redb/tree_store/
btree.rs

1use crate::db::TransactionGuard;
2use crate::tree_store::btree_base::{
3    BRANCH, BranchAccessor, BranchMutator, BtreeHeader, Checksum, DEFERRED, LEAF, LeafAccessor,
4    branch_checksum, leaf_checksum,
5};
6use crate::tree_store::btree_iters::BtreeExtractIf;
7use crate::tree_store::btree_mutator::MutateHelper;
8use crate::tree_store::page_store::{Page, PageImpl, PageMut, TransactionalMemory};
9use crate::tree_store::{
10    AccessGuardMut, AllPageNumbersBtreeIter, BtreeRangeIter, PageHint, PageNumber,
11    PageTrackerPolicy,
12};
13use crate::types::{Key, MutInPlaceValue, Value};
14use crate::{AccessGuard, Result};
15#[cfg(feature = "logging")]
16use log::trace;
17use std::borrow::Borrow;
18use std::cmp::max;
19use std::collections::HashMap;
20use std::marker::PhantomData;
21use std::ops::RangeBounds;
22use std::sync::{Arc, Mutex};
23
// Aggregate statistics collected by walking the pages of a btree
pub(crate) struct BtreeStats {
    // Number of levels in the tree: 0 when there is no root, 1 for a single leaf
    pub(crate) tree_height: u32,
    // Number of leaf pages
    pub(crate) leaf_pages: u64,
    // Number of branch pages
    pub(crate) branch_pages: u64,
    // Bytes occupied by the key-value pairs stored in leaf pages
    pub(crate) stored_leaf_bytes: u64,
    // Bytes of leaf pages in use beyond the pairs themselves, plus the used bytes of branch pages
    pub(crate) metadata_bytes: u64,
    // Bytes of each page that lie beyond the used portion of that page
    pub(crate) fragmented_bytes: u64,
}
32
// Sequence of page numbers leading from the root of a tree down to one of its pages
#[derive(Clone)]
pub(crate) struct PagePath {
    // First element is the root page; last element is the page this path identifies
    path: Vec<PageNumber>,
}
37
38impl PagePath {
39    pub(crate) fn new_root(page_number: PageNumber) -> Self {
40        Self {
41            path: vec![page_number],
42        }
43    }
44
45    pub(crate) fn with_child(&self, page_number: PageNumber) -> Self {
46        let mut path = self.path.clone();
47        path.push(page_number);
48        Self { path }
49    }
50
51    pub(crate) fn with_subpath(&self, other: &Self) -> Self {
52        let mut path = self.path.clone();
53        path.extend(&other.path);
54        Self { path }
55    }
56
57    pub(crate) fn parents(&self) -> &[PageNumber] {
58        &self.path[..self.path.len() - 1]
59    }
60
61    pub(crate) fn page_number(&self) -> PageNumber {
62        self.path[self.path.len() - 1]
63    }
64}
65
// Read-only view of a btree that walks its pages without knowing the key and value types
pub(crate) struct UntypedBtree {
    // Storage that pages are read from
    mem: Arc<TransactionalMemory>,
    // Header of the tree; None means there is no root page to visit
    root: Option<BtreeHeader>,
    // Passed to BranchAccessor when reading branch pages
    key_width: Option<usize>,
    // Stored by the constructor, but not read by any method of this type
    _value_width: Option<usize>,
}
72
73impl UntypedBtree {
74    pub(crate) fn new(
75        root: Option<BtreeHeader>,
76        mem: Arc<TransactionalMemory>,
77        key_width: Option<usize>,
78        value_width: Option<usize>,
79    ) -> Self {
80        Self {
81            mem,
82            root,
83            key_width,
84            _value_width: value_width,
85        }
86    }
87
88    // Applies visitor to pages in the tree
89    pub(crate) fn visit_all_pages<F>(&self, mut visitor: F) -> Result
90    where
91        F: FnMut(&PagePath) -> Result,
92    {
93        if let Some(page_number) = self.root.map(|x| x.root) {
94            self.visit_pages_helper(PagePath::new_root(page_number), &mut visitor)?;
95        }
96
97        Ok(())
98    }
99
100    fn visit_pages_helper<F>(&self, path: PagePath, visitor: &mut F) -> Result
101    where
102        F: FnMut(&PagePath) -> Result,
103    {
104        visitor(&path)?;
105        let page = self.mem.get_page(path.page_number())?;
106
107        match page.memory()[0] {
108            LEAF => {
109                // No-op
110            }
111            BRANCH => {
112                let accessor = BranchAccessor::new(&page, self.key_width);
113                for i in 0..accessor.count_children() {
114                    let child_page = accessor.child_page(i).unwrap();
115                    let child_path = path.with_child(child_page);
116                    self.visit_pages_helper(child_path, visitor)?;
117                }
118            }
119            _ => unreachable!(),
120        }
121
122        Ok(())
123    }
124}
125
// Mutable view of a btree that operates on pages without knowing the key and value types
pub(crate) struct UntypedBtreeMut {
    // Storage that pages are read from and written to
    mem: Arc<TransactionalMemory>,
    // Header of the tree; None means there is no root page
    root: Option<BtreeHeader>,
    // Shared list that receives relocated pages which could not be freed immediately
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    // Passed to BranchAccessor and to the checksum functions
    key_width: Option<usize>,
    // Passed to leaf_checksum
    value_width: Option<usize>,
}
133
134impl UntypedBtreeMut {
135    pub(crate) fn new(
136        root: Option<BtreeHeader>,
137        mem: Arc<TransactionalMemory>,
138        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
139        key_width: Option<usize>,
140        value_width: Option<usize>,
141    ) -> Self {
142        Self {
143            mem,
144            root,
145            freed_pages,
146            key_width,
147            value_width,
148        }
149    }
150
151    pub(crate) fn get_root(&self) -> Option<BtreeHeader> {
152        self.root
153    }
154
155    // Recomputes the checksum for all pages that are uncommitted
156    pub(crate) fn finalize_dirty_checksums(&mut self) -> Result<Option<BtreeHeader>> {
157        let mut root = self.root;
158        if let Some(BtreeHeader {
159            root: ref p,
160            ref mut checksum,
161            length: _,
162        }) = root
163        {
164            if !self.mem.uncommitted(*p) {
165                // root page is clean
166                return Ok(root);
167            }
168
169            *checksum = self.finalize_dirty_checksums_helper(*p)?;
170            self.root = root;
171        }
172
173        Ok(root)
174    }
175
176    fn finalize_dirty_checksums_helper(&mut self, page_number: PageNumber) -> Result<Checksum> {
177        assert!(self.mem.uncommitted(page_number));
178        let mut page = self.mem.get_page_mut(page_number)?;
179
180        match page.memory()[0] {
181            LEAF => leaf_checksum(&page, self.key_width, self.value_width),
182            BRANCH => {
183                let accessor = BranchAccessor::new(&page, self.key_width);
184                let mut new_children = vec![];
185                for i in 0..accessor.count_children() {
186                    let child_page = accessor.child_page(i).unwrap();
187                    if self.mem.uncommitted(child_page) {
188                        let new_checksum = self.finalize_dirty_checksums_helper(child_page)?;
189                        new_children.push(Some((i, child_page, new_checksum)));
190                    } else {
191                        // Child is clean, skip it
192                        new_children.push(None);
193                    }
194                }
195
196                let mut mutator = BranchMutator::new(&mut page);
197                for (child_index, child_page, child_checksum) in new_children.into_iter().flatten()
198                {
199                    mutator.write_child_page(child_index, child_page, child_checksum);
200                }
201
202                branch_checksum(&page, self.key_width)
203            }
204            _ => unreachable!(),
205        }
206    }
207
208    // Applies visitor to all dirty leaf pages in the tree
209    pub(crate) fn dirty_leaf_visitor<F>(&mut self, visitor: F) -> Result
210    where
211        F: Fn(PageMut) -> Result,
212    {
213        if let Some(page_number) = self.root.map(|x| x.root) {
214            if !self.mem.uncommitted(page_number) {
215                // root page is clean
216                return Ok(());
217            }
218
219            let page = self.mem.get_page_mut(page_number)?;
220            match page.memory()[0] {
221                LEAF => {
222                    visitor(page)?;
223                }
224                BRANCH => {
225                    drop(page);
226                    self.dirty_leaf_visitor_helper(page_number, &visitor)?;
227                }
228                _ => unreachable!(),
229            }
230        }
231
232        Ok(())
233    }
234
235    fn dirty_leaf_visitor_helper<F>(&mut self, page_number: PageNumber, visitor: &F) -> Result
236    where
237        F: Fn(PageMut) -> Result,
238    {
239        assert!(self.mem.uncommitted(page_number));
240        let page = self.mem.get_page_mut(page_number)?;
241
242        match page.memory()[0] {
243            LEAF => {
244                visitor(page)?;
245            }
246            BRANCH => {
247                let accessor = BranchAccessor::new(&page, self.key_width);
248                for i in 0..accessor.count_children() {
249                    let child_page = accessor.child_page(i).unwrap();
250                    if self.mem.uncommitted(child_page) {
251                        self.dirty_leaf_visitor_helper(child_page, visitor)?;
252                    }
253                }
254            }
255            _ => unreachable!(),
256        }
257
258        Ok(())
259    }
260
261    pub(crate) fn relocate(
262        &mut self,
263        relocation_map: &HashMap<PageNumber, PageNumber>,
264    ) -> Result<bool> {
265        if let Some(root) = self.get_root() {
266            if let Some((new_root, new_checksum)) =
267                self.relocate_helper(root.root, relocation_map)?
268            {
269                self.root = Some(BtreeHeader::new(new_root, new_checksum, root.length));
270                return Ok(true);
271            }
272        }
273        Ok(false)
274    }
275
276    // Relocates the given subtree to the pages specified in relocation_map
277    fn relocate_helper(
278        &mut self,
279        page_number: PageNumber,
280        relocation_map: &HashMap<PageNumber, PageNumber>,
281    ) -> Result<Option<(PageNumber, Checksum)>> {
282        let old_page = self.mem.get_page(page_number)?;
283        let mut new_page = if let Some(new_page_number) = relocation_map.get(&page_number) {
284            self.mem.get_page_mut(*new_page_number)?
285        } else {
286            return Ok(None);
287        };
288        new_page.memory_mut().copy_from_slice(old_page.memory());
289
290        let node_mem = old_page.memory();
291        match node_mem[0] {
292            LEAF => {
293                // No-op
294            }
295            BRANCH => {
296                let accessor = BranchAccessor::new(&old_page, self.key_width);
297                let mut mutator = BranchMutator::new(&mut new_page);
298                for i in 0..accessor.count_children() {
299                    let child = accessor.child_page(i).unwrap();
300                    if let Some((new_child, new_checksum)) =
301                        self.relocate_helper(child, relocation_map)?
302                    {
303                        mutator.write_child_page(i, new_child, new_checksum);
304                    }
305                }
306            }
307            _ => unreachable!(),
308        }
309
310        let mut freed_pages = self.freed_pages.lock().unwrap();
311        // No need to track allocations, because this method is only called during compaction when
312        // there can't be any savepoints
313        let mut ignore = PageTrackerPolicy::Ignore;
314        if !self.mem.free_if_uncommitted(page_number, &mut ignore) {
315            freed_pages.push(page_number);
316        }
317
318        Ok(Some((new_page.get_page_number(), DEFERRED)))
319    }
320}
321
// Mutable, typed handle to a btree with keys of type K and values of type V
pub(crate) struct BtreeMut<'a, K: Key + 'static, V: Value + 'static> {
    // Storage that pages are read from and written to
    mem: Arc<TransactionalMemory>,
    // Guard handed to the read-only Btree views that this tree creates
    transaction_guard: Arc<TransactionGuard>,
    // Header of the tree; None means there is no root page
    root: Option<BtreeHeader>,
    // Shared list of freed pages, handed to the mutating operations
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    // Shared page tracking policy, handed to the mutating operations
    allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
    _lifetime: PhantomData<&'a ()>,
}
332
333impl<K: Key + 'static, V: Value + 'static> BtreeMut<'_, K, V> {
334    pub(crate) fn new(
335        root: Option<BtreeHeader>,
336        guard: Arc<TransactionGuard>,
337        mem: Arc<TransactionalMemory>,
338        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
339        allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
340    ) -> Self {
341        Self {
342            mem,
343            transaction_guard: guard,
344            root,
345            freed_pages,
346            allocated_pages,
347            _key_type: Default::default(),
348            _value_type: Default::default(),
349            _lifetime: Default::default(),
350        }
351    }
352
353    pub(crate) fn verify_checksum(&self) -> Result<bool> {
354        RawBtree::new(
355            self.get_root(),
356            K::fixed_width(),
357            V::fixed_width(),
358            self.mem.clone(),
359        )
360        .verify_checksum()
361    }
362
363    pub(crate) fn finalize_dirty_checksums(&mut self) -> Result<Option<BtreeHeader>> {
364        let mut tree = UntypedBtreeMut::new(
365            self.get_root(),
366            self.mem.clone(),
367            self.freed_pages.clone(),
368            K::fixed_width(),
369            V::fixed_width(),
370        );
371        self.root = tree.finalize_dirty_checksums()?;
372        Ok(self.root)
373    }
374
375    #[allow(dead_code)]
376    pub(crate) fn all_pages_iter(&self) -> Result<Option<AllPageNumbersBtreeIter>> {
377        if let Some(root) = self.root.map(|x| x.root) {
378            Ok(Some(AllPageNumbersBtreeIter::new(
379                root,
380                K::fixed_width(),
381                V::fixed_width(),
382                self.mem.clone(),
383            )?))
384        } else {
385            Ok(None)
386        }
387    }
388
389    pub(crate) fn visit_all_pages<F>(&self, visitor: F) -> Result
390    where
391        F: FnMut(&PagePath) -> Result,
392    {
393        self.read_tree()?.visit_all_pages(visitor)
394    }
395
396    pub(crate) fn get_root(&self) -> Option<BtreeHeader> {
397        self.root
398    }
399
400    pub(crate) fn set_root(&mut self, root: Option<BtreeHeader>) {
401        self.root = root;
402    }
403
404    pub(crate) fn relocate(
405        &mut self,
406        relocation_map: &HashMap<PageNumber, PageNumber>,
407    ) -> Result<bool> {
408        let mut tree = UntypedBtreeMut::new(
409            self.get_root(),
410            self.mem.clone(),
411            self.freed_pages.clone(),
412            K::fixed_width(),
413            V::fixed_width(),
414        );
415        if tree.relocate(relocation_map)? {
416            self.root = tree.get_root();
417            Ok(true)
418        } else {
419            Ok(false)
420        }
421    }
422
423    pub(crate) fn insert(
424        &mut self,
425        key: &K::SelfType<'_>,
426        value: &V::SelfType<'_>,
427    ) -> Result<Option<AccessGuard<V>>> {
428        #[cfg(feature = "logging")]
429        trace!(
430            "Btree(root={:?}): Inserting {:?} with value of length {}",
431            &self.root,
432            key,
433            V::as_bytes(value).as_ref().len()
434        );
435        let mut freed_pages = self.freed_pages.lock().unwrap();
436        let mut operation: MutateHelper<'_, '_, K, V> = MutateHelper::new(
437            &mut self.root,
438            self.mem.clone(),
439            freed_pages.as_mut(),
440            self.allocated_pages.clone(),
441        );
442        let (old_value, _) = operation.insert(key, value)?;
443        Ok(old_value)
444    }
445
446    // Insert without allocating or freeing any pages. This requires that you've previously
447    // inserted the same key, with a value of at least the same serialized length, earlier
448    // in the same transaction. If those preconditions aren't satisfied, insert_inplace()
449    // will panic; it won't allocate under any circumstances
450    pub(crate) fn insert_inplace(
451        &mut self,
452        key: &K::SelfType<'_>,
453        value: &V::SelfType<'_>,
454    ) -> Result<()> {
455        let mut fake_freed_pages = vec![];
456        let fake_allocated_pages = Arc::new(Mutex::new(PageTrackerPolicy::Closed));
457        let mut operation = MutateHelper::<K, V>::new(
458            &mut self.root,
459            self.mem.clone(),
460            fake_freed_pages.as_mut(),
461            fake_allocated_pages,
462        );
463        operation.insert_inplace(key, value)?;
464        assert!(fake_freed_pages.is_empty());
465        Ok(())
466    }
467
468    pub(crate) fn remove(&mut self, key: &K::SelfType<'_>) -> Result<Option<AccessGuard<V>>> {
469        #[cfg(feature = "logging")]
470        trace!("Btree(root={:?}): Deleting {:?}", &self.root, key);
471        let mut freed_pages = self.freed_pages.lock().unwrap();
472        let mut operation: MutateHelper<'_, '_, K, V> = MutateHelper::new(
473            &mut self.root,
474            self.mem.clone(),
475            freed_pages.as_mut(),
476            self.allocated_pages.clone(),
477        );
478        let result = operation.delete(key)?;
479        Ok(result)
480    }
481
482    #[allow(dead_code)]
483    pub(crate) fn print_debug(&self, include_values: bool) -> Result {
484        self.read_tree()?.print_debug(include_values)
485    }
486
487    pub(crate) fn stats(&self) -> Result<BtreeStats> {
488        btree_stats(
489            self.get_root().map(|x| x.root),
490            &self.mem,
491            K::fixed_width(),
492            V::fixed_width(),
493        )
494    }
495
496    fn read_tree(&self) -> Result<Btree<K, V>> {
497        Btree::new(
498            self.get_root(),
499            PageHint::None,
500            self.transaction_guard.clone(),
501            self.mem.clone(),
502        )
503    }
504
505    pub(crate) fn get(&self, key: &K::SelfType<'_>) -> Result<Option<AccessGuard<'_, V>>> {
506        self.read_tree()?.get(key)
507    }
508
509    pub(crate) fn first(
510        &self,
511    ) -> Result<Option<(AccessGuard<'static, K>, AccessGuard<'static, V>)>> {
512        self.read_tree()?.first()
513    }
514
515    pub(crate) fn last(
516        &self,
517    ) -> Result<Option<(AccessGuard<'static, K>, AccessGuard<'static, V>)>> {
518        self.read_tree()?.last()
519    }
520
521    pub(crate) fn range<'a0, T: RangeBounds<KR> + 'a0, KR: Borrow<K::SelfType<'a0>> + 'a0>(
522        &self,
523        range: &'_ T,
524    ) -> Result<BtreeRangeIter<K, V>>
525    where
526        K: 'a0,
527    {
528        self.read_tree()?.range(range)
529    }
530
531    pub(crate) fn extract_from_if<
532        'a,
533        'a0,
534        T: RangeBounds<KR> + 'a0,
535        KR: Borrow<K::SelfType<'a0>> + 'a0,
536        F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool,
537    >(
538        &'a mut self,
539        range: &'_ T,
540        predicate: F,
541    ) -> Result<BtreeExtractIf<'a, K, V, F>>
542    where
543        K: 'a0,
544    {
545        let iter = self.range(range)?;
546
547        let result = BtreeExtractIf::new(
548            &mut self.root,
549            iter,
550            predicate,
551            self.freed_pages.clone(),
552            self.allocated_pages.clone(),
553            self.mem.clone(),
554        );
555
556        Ok(result)
557    }
558
559    pub(crate) fn retain_in<'a, KR, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>(
560        &mut self,
561        mut predicate: F,
562        range: impl RangeBounds<KR> + 'a,
563    ) -> Result
564    where
565        KR: Borrow<K::SelfType<'a>> + 'a,
566    {
567        let iter = self.range(&range)?;
568        let mut freed = vec![];
569        // Do not modify the existing tree, because we're iterating over it concurrently with the removals
570        // TODO: optimize this to iterate and remove at the same time
571        let mut operation: MutateHelper<'_, '_, K, V> = MutateHelper::new_do_not_modify(
572            &mut self.root,
573            self.mem.clone(),
574            &mut freed,
575            self.allocated_pages.clone(),
576        );
577        for entry in iter {
578            let entry = entry?;
579            if !predicate(entry.key(), entry.value()) {
580                assert!(operation.delete(&entry.key())?.is_some());
581            }
582        }
583        let mut freed_pages = self.freed_pages.lock().unwrap();
584        let mut allocated_pages = self.allocated_pages.lock().unwrap();
585        for page in freed {
586            if !self.mem.free_if_uncommitted(page, &mut allocated_pages) {
587                freed_pages.push(page);
588            }
589        }
590
591        Ok(())
592    }
593
594    pub(crate) fn len(&self) -> Result<u64> {
595        self.read_tree()?.len()
596    }
597}
598
599impl<'a, K: Key + 'a, V: MutInPlaceValue + 'a> BtreeMut<'a, K, V> {
600    /// Reserve space to insert a key-value pair
601    /// The returned reference will have length equal to `value_length`
602    // Return type has the same lifetime as &self, because the tree must not be modified until the mutable guard is dropped
603    pub(crate) fn insert_reserve(
604        &mut self,
605        key: &K::SelfType<'_>,
606        value_length: u32,
607    ) -> Result<AccessGuardMut<V>> {
608        #[cfg(feature = "logging")]
609        trace!(
610            "Btree(root={:?}): Inserting {:?} with {} reserved bytes for the value",
611            &self.root, key, value_length
612        );
613        let mut freed_pages = self.freed_pages.lock().unwrap();
614        let mut value = vec![0u8; value_length as usize];
615        V::initialize(&mut value);
616        let mut operation = MutateHelper::<K, V>::new(
617            &mut self.root,
618            self.mem.clone(),
619            freed_pages.as_mut(),
620            self.allocated_pages.clone(),
621        );
622        let (_, guard) = operation.insert(key, &V::from_bytes(&value))?;
623        Ok(guard)
624    }
625}
626
// Read-only btree handle that takes the key and value widths as runtime parameters
pub(crate) struct RawBtree {
    // Storage that pages are read from
    mem: Arc<TransactionalMemory>,
    // Header of the tree; None means there is no root page
    root: Option<BtreeHeader>,
    // Passed to the accessors and checksum functions as the key width
    fixed_key_size: Option<usize>,
    // Passed to the accessors and checksum functions as the value width
    fixed_value_size: Option<usize>,
}
633
634impl RawBtree {
635    pub(crate) fn new(
636        root: Option<BtreeHeader>,
637        fixed_key_size: Option<usize>,
638        fixed_value_size: Option<usize>,
639        mem: Arc<TransactionalMemory>,
640    ) -> Self {
641        Self {
642            mem,
643            root,
644            fixed_key_size,
645            fixed_value_size,
646        }
647    }
648
649    pub(crate) fn get_root(&self) -> Option<BtreeHeader> {
650        self.root
651    }
652
653    pub(crate) fn stats(&self) -> Result<BtreeStats> {
654        btree_stats(
655            self.root.map(|x| x.root),
656            &self.mem,
657            self.fixed_key_size,
658            self.fixed_value_size,
659        )
660    }
661
662    pub(crate) fn len(&self) -> Result<u64> {
663        Ok(self.root.map_or(0, |x| x.length))
664    }
665
666    pub(crate) fn verify_checksum(&self) -> Result<bool> {
667        if let Some(header) = self.root {
668            self.verify_checksum_helper(header.root, header.checksum)
669        } else {
670            Ok(true)
671        }
672    }
673
674    fn verify_checksum_helper(
675        &self,
676        page_number: PageNumber,
677        expected_checksum: Checksum,
678    ) -> Result<bool> {
679        let page = self.mem.get_page(page_number)?;
680        let node_mem = page.memory();
681        Ok(match node_mem[0] {
682            LEAF => {
683                if let Ok(computed) =
684                    leaf_checksum(&page, self.fixed_key_size, self.fixed_value_size)
685                {
686                    expected_checksum == computed
687                } else {
688                    false
689                }
690            }
691            BRANCH => {
692                if let Ok(computed) = branch_checksum(&page, self.fixed_key_size) {
693                    if expected_checksum != computed {
694                        return Ok(false);
695                    }
696                } else {
697                    return Ok(false);
698                }
699                let accessor = BranchAccessor::new(&page, self.fixed_key_size);
700                for i in 0..accessor.count_children() {
701                    if !self.verify_checksum_helper(
702                        accessor.child_page(i).unwrap(),
703                        accessor.child_checksum(i).unwrap(),
704                    )? {
705                        return Ok(false);
706                    }
707                }
708                true
709            }
710            _ => false,
711        })
712    }
713}
714
// Read-only, typed handle to a btree with keys of type K and values of type V
pub(crate) struct Btree<K: Key + 'static, V: Value + 'static> {
    // Storage that pages are read from
    mem: Arc<TransactionalMemory>,
    // Guard of the transaction this tree is read under; exposed via transaction_guard()
    transaction_guard: Arc<TransactionGuard>,
    // Cache of the root page to avoid repeated lookups
    cached_root: Option<PageImpl>,
    // Header of the tree; None means there is no root page
    root: Option<BtreeHeader>,
    // Hint passed along whenever a page is fetched during a lookup
    hint: PageHint,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}
725
726impl<K: Key, V: Value> Btree<K, V> {
727    pub(crate) fn new(
728        root: Option<BtreeHeader>,
729        hint: PageHint,
730        guard: Arc<TransactionGuard>,
731        mem: Arc<TransactionalMemory>,
732    ) -> Result<Self> {
733        let cached_root = if let Some(header) = root {
734            Some(mem.get_page_extended(header.root, hint)?)
735        } else {
736            None
737        };
738        Ok(Self {
739            mem,
740            transaction_guard: guard,
741            cached_root,
742            root,
743            hint,
744            _key_type: Default::default(),
745            _value_type: Default::default(),
746        })
747    }
748
749    pub(crate) fn transaction_guard(&self) -> &Arc<TransactionGuard> {
750        &self.transaction_guard
751    }
752
753    pub(crate) fn get_root(&self) -> Option<BtreeHeader> {
754        self.root
755    }
756
757    pub(crate) fn visit_all_pages<F>(&self, visitor: F) -> Result
758    where
759        F: FnMut(&PagePath) -> Result,
760    {
761        let tree = UntypedBtree::new(
762            self.root,
763            self.mem.clone(),
764            K::fixed_width(),
765            V::fixed_width(),
766        );
767        tree.visit_all_pages(visitor)
768    }
769
770    pub(crate) fn get(&self, key: &K::SelfType<'_>) -> Result<Option<AccessGuard<'static, V>>> {
771        if let Some(ref root_page) = self.cached_root {
772            self.get_helper(root_page.clone(), K::as_bytes(key).as_ref())
773        } else {
774            Ok(None)
775        }
776    }
777
778    // Returns the value for the queried key, if present
779    fn get_helper(&self, page: PageImpl, query: &[u8]) -> Result<Option<AccessGuard<'static, V>>> {
780        let node_mem = page.memory();
781        match node_mem[0] {
782            LEAF => {
783                let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
784                if let Some(entry_index) = accessor.find_key::<K>(query) {
785                    let (start, end) = accessor.value_range(entry_index).unwrap();
786                    let guard = AccessGuard::with_page(page, start..end);
787                    Ok(Some(guard))
788                } else {
789                    Ok(None)
790                }
791            }
792            BRANCH => {
793                let accessor = BranchAccessor::new(&page, K::fixed_width());
794                let (_, child_page) = accessor.child_for_key::<K>(query);
795                self.get_helper(self.mem.get_page_extended(child_page, self.hint)?, query)
796            }
797            _ => unreachable!(),
798        }
799    }
800
801    pub(crate) fn first(
802        &self,
803    ) -> Result<Option<(AccessGuard<'static, K>, AccessGuard<'static, V>)>> {
804        if let Some(ref root) = self.cached_root {
805            self.first_helper(root.clone())
806        } else {
807            Ok(None)
808        }
809    }
810
811    fn first_helper(
812        &self,
813        page: PageImpl,
814    ) -> Result<Option<(AccessGuard<'static, K>, AccessGuard<'static, V>)>> {
815        let node_mem = page.memory();
816        match node_mem[0] {
817            LEAF => {
818                let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
819                let (key_range, value_range) = accessor.entry_ranges(0).unwrap();
820                let key_guard = AccessGuard::with_page(page.clone(), key_range);
821                let value_guard = AccessGuard::with_page(page, value_range);
822                Ok(Some((key_guard, value_guard)))
823            }
824            BRANCH => {
825                let accessor = BranchAccessor::new(&page, K::fixed_width());
826                let child_page = accessor.child_page(0).unwrap();
827                self.first_helper(self.mem.get_page_extended(child_page, self.hint)?)
828            }
829            _ => unreachable!(),
830        }
831    }
832
833    pub(crate) fn last(
834        &self,
835    ) -> Result<Option<(AccessGuard<'static, K>, AccessGuard<'static, V>)>> {
836        if let Some(ref root) = self.cached_root {
837            self.last_helper(root.clone())
838        } else {
839            Ok(None)
840        }
841    }
842
843    fn last_helper(
844        &self,
845        page: PageImpl,
846    ) -> Result<Option<(AccessGuard<'static, K>, AccessGuard<'static, V>)>> {
847        let node_mem = page.memory();
848        match node_mem[0] {
849            LEAF => {
850                let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
851                let (key_range, value_range) =
852                    accessor.entry_ranges(accessor.num_pairs() - 1).unwrap();
853                let key_guard = AccessGuard::with_page(page.clone(), key_range);
854                let value_guard = AccessGuard::with_page(page, value_range);
855                Ok(Some((key_guard, value_guard)))
856            }
857            BRANCH => {
858                let accessor = BranchAccessor::new(&page, K::fixed_width());
859                let child_page = accessor.child_page(accessor.count_children() - 1).unwrap();
860                self.last_helper(self.mem.get_page_extended(child_page, self.hint)?)
861            }
862            _ => unreachable!(),
863        }
864    }
865
866    pub(crate) fn range<'a0, T: RangeBounds<KR>, KR: Borrow<K::SelfType<'a0>>>(
867        &self,
868        range: &'_ T,
869    ) -> Result<BtreeRangeIter<K, V>> {
870        BtreeRangeIter::new(range, self.root.map(|x| x.root), self.mem.clone())
871    }
872
873    pub(crate) fn len(&self) -> Result<u64> {
874        Ok(self.root.map_or(0, |x| x.length))
875    }
876
877    pub(crate) fn stats(&self) -> Result<BtreeStats> {
878        btree_stats(
879            self.root.map(|x| x.root),
880            &self.mem,
881            K::fixed_width(),
882            V::fixed_width(),
883        )
884    }
885
886    #[allow(dead_code)]
887    pub(crate) fn print_debug(&self, include_values: bool) -> Result {
888        if let Some(p) = self.root.map(|x| x.root) {
889            let mut pages = vec![self.mem.get_page(p)?];
890            while !pages.is_empty() {
891                let mut next_children = vec![];
892                for page in pages.drain(..) {
893                    let node_mem = page.memory();
894                    match node_mem[0] {
895                        LEAF => {
896                            eprint!("Leaf[ (page={:?})", page.get_page_number());
897                            LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width())
898                                .print_node::<K, V>(include_values);
899                            eprint!("]");
900                        }
901                        BRANCH => {
902                            let accessor = BranchAccessor::new(&page, K::fixed_width());
903                            for i in 0..accessor.count_children() {
904                                let child = accessor.child_page(i).unwrap();
905                                next_children.push(self.mem.get_page(child)?);
906                            }
907                            accessor.print_node::<K>();
908                        }
909                        _ => unreachable!(),
910                    }
911                    eprint!("  ");
912                }
913                eprintln!();
914
915                pages = next_children;
916            }
917        }
918
919        Ok(())
920    }
921}
922
923impl<K: Key, V: Value> Drop for Btree<K, V> {
924    fn drop(&mut self) {
925        // Make sure that we clear our reference to the root page, before the transaction guard goes out of scope
926        self.cached_root = None;
927    }
928}
929
930pub(crate) fn btree_stats(
931    root: Option<PageNumber>,
932    mem: &TransactionalMemory,
933    fixed_key_size: Option<usize>,
934    fixed_value_size: Option<usize>,
935) -> Result<BtreeStats> {
936    if let Some(root) = root {
937        stats_helper(root, mem, fixed_key_size, fixed_value_size)
938    } else {
939        Ok(BtreeStats {
940            tree_height: 0,
941            leaf_pages: 0,
942            branch_pages: 0,
943            stored_leaf_bytes: 0,
944            metadata_bytes: 0,
945            fragmented_bytes: 0,
946        })
947    }
948}
949
950fn stats_helper(
951    page_number: PageNumber,
952    mem: &TransactionalMemory,
953    fixed_key_size: Option<usize>,
954    fixed_value_size: Option<usize>,
955) -> Result<BtreeStats> {
956    let page = mem.get_page(page_number)?;
957    let node_mem = page.memory();
958    match node_mem[0] {
959        LEAF => {
960            let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
961            let leaf_bytes = accessor.length_of_pairs(0, accessor.num_pairs());
962            let overhead_bytes = accessor.total_length() - leaf_bytes;
963            let fragmented_bytes = (page.memory().len() - accessor.total_length()) as u64;
964            Ok(BtreeStats {
965                tree_height: 1,
966                leaf_pages: 1,
967                branch_pages: 0,
968                stored_leaf_bytes: leaf_bytes.try_into().unwrap(),
969                metadata_bytes: overhead_bytes.try_into().unwrap(),
970                fragmented_bytes,
971            })
972        }
973        BRANCH => {
974            let accessor = BranchAccessor::new(&page, fixed_key_size);
975            let mut max_child_height = 0;
976            let mut leaf_pages = 0;
977            let mut branch_pages = 1;
978            let mut stored_leaf_bytes = 0;
979            let mut metadata_bytes = accessor.total_length() as u64;
980            let mut fragmented_bytes = (page.memory().len() - accessor.total_length()) as u64;
981            for i in 0..accessor.count_children() {
982                if let Some(child) = accessor.child_page(i) {
983                    let stats = stats_helper(child, mem, fixed_key_size, fixed_value_size)?;
984                    max_child_height = max(max_child_height, stats.tree_height);
985                    leaf_pages += stats.leaf_pages;
986                    branch_pages += stats.branch_pages;
987                    stored_leaf_bytes += stats.stored_leaf_bytes;
988                    metadata_bytes += stats.metadata_bytes;
989                    fragmented_bytes += stats.fragmented_bytes;
990                }
991            }
992
993            Ok(BtreeStats {
994                tree_height: max_child_height + 1,
995                leaf_pages,
996                branch_pages,
997                stored_leaf_bytes,
998                metadata_bytes,
999                fragmented_bytes,
1000            })
1001        }
1002        _ => unreachable!(),
1003    }
1004}