redb/tree_store/btree.rs

use crate::db::TransactionGuard;
use crate::tree_store::btree_base::{
    BRANCH, BranchAccessor, BranchMutator, BtreeHeader, Checksum, DEFERRED, LEAF, LeafAccessor,
    branch_checksum, leaf_checksum,
};
use crate::tree_store::btree_iters::BtreeExtractIf;
use crate::tree_store::btree_mutator::MutateHelper;
use crate::tree_store::page_store::{Page, PageImpl, PageMut, TransactionalMemory};
use crate::tree_store::{
    AccessGuardMut, AllPageNumbersBtreeIter, BtreeRangeIter, PageHint, PageNumber,
};
use crate::types::{Key, MutInPlaceValue, Value};
use crate::{AccessGuard, Result};
#[cfg(feature = "logging")]
use log::trace;
use std::borrow::Borrow;
use std::cmp::max;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::ops::RangeBounds;
use std::sync::{Arc, Mutex};

pub(crate) struct BtreeStats {
    pub(crate) tree_height: u32,
    pub(crate) leaf_pages: u64,
    pub(crate) branch_pages: u64,
    pub(crate) stored_leaf_bytes: u64,
    pub(crate) metadata_bytes: u64,
    pub(crate) fragmented_bytes: u64,
}

#[derive(Clone)]
pub(crate) struct PagePath {
    path: Vec<PageNumber>,
}

impl PagePath {
    pub(crate) fn new_root(page_number: PageNumber) -> Self {
        Self {
            path: vec![page_number],
        }
    }

    pub(crate) fn with_child(&self, page_number: PageNumber) -> Self {
        let mut path = self.path.clone();
        path.push(page_number);
        Self { path }
    }

    pub(crate) fn with_subpath(&self, other: &Self) -> Self {
        let mut path = self.path.clone();
        path.extend(&other.path);
        Self { path }
    }

    pub(crate) fn parents(&self) -> &[PageNumber] {
        &self.path[..self.path.len() - 1]
    }

    pub(crate) fn page_number(&self) -> PageNumber {
        self.path[self.path.len() - 1]
    }
}
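// Illustrative sketch (not from the original source): a `PagePath` records the chain of
// page numbers from the root down to a particular page. For a root page `r` with a child
// `c` (both `PageNumber`s already in hand), one might build and inspect a path like this:
//
//     let path = PagePath::new_root(r).with_child(c);
//     assert_eq!(path.page_number(), c);
//     assert_eq!(path.parents(), &[r]);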

pub(crate) struct UntypedBtree {
    mem: Arc<TransactionalMemory>,
    root: Option<BtreeHeader>,
    key_width: Option<usize>,
    _value_width: Option<usize>,
}

impl UntypedBtree {
    pub(crate) fn new(
        root: Option<BtreeHeader>,
        mem: Arc<TransactionalMemory>,
        key_width: Option<usize>,
        value_width: Option<usize>,
    ) -> Self {
        Self {
            mem,
            root,
            key_width,
            _value_width: value_width,
        }
    }

    // Applies visitor to pages in the tree
    pub(crate) fn visit_all_pages<F>(&self, mut visitor: F) -> Result
    where
        F: FnMut(&PagePath) -> Result,
    {
        if let Some(page_number) = self.root.map(|x| x.root) {
            self.visit_pages_helper(PagePath::new_root(page_number), &mut visitor)?;
        }

        Ok(())
    }

    fn visit_pages_helper<F>(&self, path: PagePath, visitor: &mut F) -> Result
    where
        F: FnMut(&PagePath) -> Result,
    {
        visitor(&path)?;
        let page = self.mem.get_page(path.page_number())?;

        match page.memory()[0] {
            LEAF => {
                // No-op
            }
            BRANCH => {
                let accessor = BranchAccessor::new(&page, self.key_width);
                for i in 0..accessor.count_children() {
                    let child_page = accessor.child_page(i).unwrap();
                    let child_path = path.with_child(child_page);
                    self.visit_pages_helper(child_path, visitor)?;
                }
            }
            _ => unreachable!(),
        }

        Ok(())
    }
}
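// Illustrative sketch (not part of the original file): `visit_all_pages` drives a
// pre-order walk, so collecting every page number in the tree could look like the
// following, assuming `tree` is an `UntypedBtree` constructed via `UntypedBtree::new`:
//
//     let mut pages = vec![];
//     tree.visit_all_pages(|path| {
//         pages.push(path.page_number());
//         Ok(())
//     })?;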

pub(crate) struct UntypedBtreeMut {
    mem: Arc<TransactionalMemory>,
    root: Option<BtreeHeader>,
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    key_width: Option<usize>,
    value_width: Option<usize>,
}

impl UntypedBtreeMut {
    pub(crate) fn new(
        root: Option<BtreeHeader>,
        mem: Arc<TransactionalMemory>,
        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
        key_width: Option<usize>,
        value_width: Option<usize>,
    ) -> Self {
        Self {
            mem,
            root,
            freed_pages,
            key_width,
            value_width,
        }
    }

    pub(crate) fn get_root(&self) -> Option<BtreeHeader> {
        self.root
    }

    // Recomputes the checksum for all pages that are uncommitted
    pub(crate) fn finalize_dirty_checksums(&mut self) -> Result<Option<BtreeHeader>> {
        let mut root = self.root;
        if let Some(BtreeHeader {
            root: ref p,
            ref mut checksum,
            length: _,
        }) = root
        {
            if !self.mem.uncommitted(*p) {
                // root page is clean
                return Ok(root);
            }

            *checksum = self.finalize_dirty_checksums_helper(*p)?;
            self.root = root;
        }

        Ok(root)
    }

    fn finalize_dirty_checksums_helper(&mut self, page_number: PageNumber) -> Result<Checksum> {
        assert!(self.mem.uncommitted(page_number));
        let mut page = self.mem.get_page_mut(page_number)?;

        match page.memory()[0] {
            LEAF => leaf_checksum(&page, self.key_width, self.value_width),
            BRANCH => {
                let accessor = BranchAccessor::new(&page, self.key_width);
                let mut new_children = vec![];
                for i in 0..accessor.count_children() {
                    let child_page = accessor.child_page(i).unwrap();
                    if self.mem.uncommitted(child_page) {
                        let new_checksum = self.finalize_dirty_checksums_helper(child_page)?;
                        new_children.push(Some((i, child_page, new_checksum)));
                    } else {
                        // Child is clean, skip it
                        new_children.push(None);
                    }
                }

                let mut mutator = BranchMutator::new(&mut page);
                for (child_index, child_page, child_checksum) in new_children.into_iter().flatten()
                {
                    mutator.write_child_page(child_index, child_page, child_checksum);
                }

                branch_checksum(&page, self.key_width)
            }
            _ => unreachable!(),
        }
    }

    // Applies visitor to all dirty leaf pages in the tree
    pub(crate) fn dirty_leaf_visitor<F>(&mut self, visitor: F) -> Result
    where
        F: Fn(PageMut) -> Result,
    {
        if let Some(page_number) = self.root.map(|x| x.root) {
            if !self.mem.uncommitted(page_number) {
                // root page is clean
                return Ok(());
            }

            let page = self.mem.get_page_mut(page_number)?;
            match page.memory()[0] {
                LEAF => {
                    visitor(page)?;
                }
                BRANCH => {
                    drop(page);
                    self.dirty_leaf_visitor_helper(page_number, &visitor)?;
                }
                _ => unreachable!(),
            }
        }

        Ok(())
    }

    fn dirty_leaf_visitor_helper<F>(&mut self, page_number: PageNumber, visitor: &F) -> Result
    where
        F: Fn(PageMut) -> Result,
    {
        assert!(self.mem.uncommitted(page_number));
        let page = self.mem.get_page_mut(page_number)?;

        match page.memory()[0] {
            LEAF => {
                visitor(page)?;
            }
            BRANCH => {
                let accessor = BranchAccessor::new(&page, self.key_width);
                for i in 0..accessor.count_children() {
                    let child_page = accessor.child_page(i).unwrap();
                    if self.mem.uncommitted(child_page) {
                        self.dirty_leaf_visitor_helper(child_page, visitor)?;
                    }
                }
            }
            _ => unreachable!(),
        }

        Ok(())
    }

    pub(crate) fn relocate(
        &mut self,
        relocation_map: &HashMap<PageNumber, PageNumber>,
    ) -> Result<bool> {
        if let Some(root) = self.get_root() {
            if let Some((new_root, new_checksum)) =
                self.relocate_helper(root.root, relocation_map)?
            {
                self.root = Some(BtreeHeader::new(new_root, new_checksum, root.length));
                return Ok(true);
            }
        }
        Ok(false)
    }

    // Relocates the given subtree to the pages specified in relocation_map
    fn relocate_helper(
        &mut self,
        page_number: PageNumber,
        relocation_map: &HashMap<PageNumber, PageNumber>,
    ) -> Result<Option<(PageNumber, Checksum)>> {
        let old_page = self.mem.get_page(page_number)?;
        let mut new_page = if let Some(new_page_number) = relocation_map.get(&page_number) {
            self.mem.get_page_mut(*new_page_number)?
        } else {
            return Ok(None);
        };
        new_page.memory_mut().copy_from_slice(old_page.memory());

        let node_mem = old_page.memory();
        match node_mem[0] {
            LEAF => {
                // No-op
            }
            BRANCH => {
                let accessor = BranchAccessor::new(&old_page, self.key_width);
                let mut mutator = BranchMutator::new(&mut new_page);
                for i in 0..accessor.count_children() {
                    let child = accessor.child_page(i).unwrap();
                    if let Some((new_child, new_checksum)) =
                        self.relocate_helper(child, relocation_map)?
                    {
                        mutator.write_child_page(i, new_child, new_checksum);
                    }
                }
            }
            _ => unreachable!(),
        }

        let mut freed_pages = self.freed_pages.lock().unwrap();
        if !self.mem.free_if_uncommitted(page_number) {
            freed_pages.push(page_number);
        }

        Ok(Some((new_page.get_page_number(), DEFERRED)))
    }
}
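// Illustrative sketch (not from the original source): `relocate` copies the tree onto the
// destination pages named in the map and frees (or queues for freeing) the originals.
// Assuming `tree` is an `UntypedBtreeMut` and `old` / `new` are `PageNumber`s where `new`
// refers to a page that already exists, a caller might do:
//
//     let mut map = HashMap::new();
//     map.insert(old, new);
//     if tree.relocate(&map)? {
//         // The root header now points at the relocated subtree; its checksum stays
//         // DEFERRED until finalize_dirty_checksums() runs.
//         let new_root = tree.get_root();
//     }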

pub(crate) struct BtreeMut<'a, K: Key + 'static, V: Value + 'static> {
    mem: Arc<TransactionalMemory>,
    transaction_guard: Arc<TransactionGuard>,
    root: Option<BtreeHeader>,
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
    _lifetime: PhantomData<&'a ()>,
}

impl<K: Key + 'static, V: Value + 'static> BtreeMut<'_, K, V> {
    pub(crate) fn new(
        root: Option<BtreeHeader>,
        guard: Arc<TransactionGuard>,
        mem: Arc<TransactionalMemory>,
        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    ) -> Self {
        Self {
            mem,
            transaction_guard: guard,
            root,
            freed_pages,
            _key_type: Default::default(),
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }

    pub(crate) fn verify_checksum(&self) -> Result<bool> {
        RawBtree::new(
            self.get_root(),
            K::fixed_width(),
            V::fixed_width(),
            self.mem.clone(),
        )
        .verify_checksum()
    }

    pub(crate) fn finalize_dirty_checksums(&mut self) -> Result<Option<BtreeHeader>> {
        let mut tree = UntypedBtreeMut::new(
            self.get_root(),
            self.mem.clone(),
            self.freed_pages.clone(),
            K::fixed_width(),
            V::fixed_width(),
        );
        self.root = tree.finalize_dirty_checksums()?;
        Ok(self.root)
    }

    #[allow(dead_code)]
    pub(crate) fn all_pages_iter(&self) -> Result<Option<AllPageNumbersBtreeIter>> {
        if let Some(root) = self.root.map(|x| x.root) {
            Ok(Some(AllPageNumbersBtreeIter::new(
                root,
                K::fixed_width(),
                V::fixed_width(),
                self.mem.clone(),
            )?))
        } else {
            Ok(None)
        }
    }

    pub(crate) fn visit_all_pages<F>(&self, visitor: F) -> Result
    where
        F: FnMut(&PagePath) -> Result,
    {
        self.read_tree()?.visit_all_pages(visitor)
    }

    pub(crate) fn get_root(&self) -> Option<BtreeHeader> {
        self.root
    }

    pub(crate) fn relocate(
        &mut self,
        relocation_map: &HashMap<PageNumber, PageNumber>,
    ) -> Result<bool> {
        let mut tree = UntypedBtreeMut::new(
            self.get_root(),
            self.mem.clone(),
            self.freed_pages.clone(),
            K::fixed_width(),
            V::fixed_width(),
        );
        if tree.relocate(relocation_map)? {
            self.root = tree.get_root();
            Ok(true)
        } else {
            Ok(false)
        }
    }

    pub(crate) fn insert(
        &mut self,
        key: &K::SelfType<'_>,
        value: &V::SelfType<'_>,
    ) -> Result<Option<AccessGuard<V>>> {
        #[cfg(feature = "logging")]
        trace!(
            "Btree(root={:?}): Inserting {:?} with value of length {}",
            &self.root,
            key,
            V::as_bytes(value).as_ref().len()
        );
        let mut freed_pages = self.freed_pages.lock().unwrap();
        let mut operation: MutateHelper<'_, '_, K, V> =
            MutateHelper::new(&mut self.root, self.mem.clone(), freed_pages.as_mut());
        let (old_value, _) = operation.insert(key, value)?;
        Ok(old_value)
    }

    // Insert without allocating or freeing any pages. This requires that you've previously
    // inserted the same key, with a value of at least the same serialized length, earlier
    // in the same transaction. If those preconditions aren't satisfied, insert_inplace()
    // will panic; it won't allocate under any circumstances
    pub(crate) fn insert_inplace(
        &mut self,
        key: &K::SelfType<'_>,
        value: &V::SelfType<'_>,
    ) -> Result<()> {
        let mut fake_freed_pages = vec![];
        let mut operation =
            MutateHelper::<K, V>::new(&mut self.root, self.mem.clone(), fake_freed_pages.as_mut());
        operation.insert_inplace(key, value)?;
        assert!(fake_freed_pages.is_empty());
        Ok(())
    }
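    // Illustrative sketch (not from the original source): insert_inplace() can only
    // overwrite a slot that already exists with enough room, so it must follow a regular
    // insert of the same key in the same transaction. Assuming `tree: BtreeMut<u64, &[u8]>`:
    //
    //     tree.insert(&1, &[0u8; 16].as_slice())?;         // allocates space for the value
    //     tree.insert_inplace(&1, &[1u8; 16].as_slice())?; // overwrites it without allocating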

    pub(crate) fn remove(&mut self, key: &K::SelfType<'_>) -> Result<Option<AccessGuard<V>>> {
        #[cfg(feature = "logging")]
        trace!("Btree(root={:?}): Deleting {:?}", &self.root, key);
        let mut freed_pages = self.freed_pages.lock().unwrap();
        let mut operation: MutateHelper<'_, '_, K, V> =
            MutateHelper::new(&mut self.root, self.mem.clone(), freed_pages.as_mut());
        let result = operation.delete(key)?;
        Ok(result)
    }

    #[allow(dead_code)]
    pub(crate) fn print_debug(&self, include_values: bool) -> Result {
        self.read_tree()?.print_debug(include_values)
    }

    pub(crate) fn stats(&self) -> Result<BtreeStats> {
        btree_stats(
            self.get_root().map(|x| x.root),
            &self.mem,
            K::fixed_width(),
            V::fixed_width(),
        )
    }

    fn read_tree(&self) -> Result<Btree<K, V>> {
        Btree::new(
            self.get_root(),
            PageHint::None,
            self.transaction_guard.clone(),
            self.mem.clone(),
        )
    }

    pub(crate) fn get(&self, key: &K::SelfType<'_>) -> Result<Option<AccessGuard<'_, V>>> {
        self.read_tree()?.get(key)
    }

    pub(crate) fn first(
        &self,
    ) -> Result<Option<(AccessGuard<'static, K>, AccessGuard<'static, V>)>> {
        self.read_tree()?.first()
    }

    pub(crate) fn last(
        &self,
    ) -> Result<Option<(AccessGuard<'static, K>, AccessGuard<'static, V>)>> {
        self.read_tree()?.last()
    }

    pub(crate) fn range<'a0, T: RangeBounds<KR> + 'a0, KR: Borrow<K::SelfType<'a0>> + 'a0>(
        &self,
        range: &'_ T,
    ) -> Result<BtreeRangeIter<K, V>>
    where
        K: 'a0,
    {
        self.read_tree()?.range(range)
    }

    pub(crate) fn extract_from_if<
        'a,
        'a0,
        T: RangeBounds<KR> + 'a0,
        KR: Borrow<K::SelfType<'a0>> + 'a0,
        F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool,
    >(
        &'a mut self,
        range: &'_ T,
        predicate: F,
    ) -> Result<BtreeExtractIf<'a, K, V, F>>
    where
        K: 'a0,
    {
        let iter = self.range(range)?;

        let result = BtreeExtractIf::new(
            &mut self.root,
            iter,
            predicate,
            self.freed_pages.clone(),
            self.mem.clone(),
        );

        Ok(result)
    }

    pub(crate) fn retain_in<'a, KR, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>(
        &mut self,
        mut predicate: F,
        range: impl RangeBounds<KR> + 'a,
    ) -> Result
    where
        KR: Borrow<K::SelfType<'a>> + 'a,
    {
        let iter = self.range(&range)?;
        let mut freed = vec![];
        // Do not modify the existing tree, because we're iterating over it concurrently with the removals
        // TODO: optimize this to iterate and remove at the same time
        let mut operation: MutateHelper<'_, '_, K, V> =
            MutateHelper::new_do_not_modify(&mut self.root, self.mem.clone(), &mut freed);
        for entry in iter {
            let entry = entry?;
            if !predicate(entry.key(), entry.value()) {
                assert!(operation.delete(&entry.key())?.is_some());
            }
        }
        let mut freed_pages = self.freed_pages.lock().unwrap();
        for page in freed {
            if !self.mem.free_if_uncommitted(page) {
                freed_pages.push(page);
            }
        }

        Ok(())
    }

    pub(crate) fn len(&self) -> Result<u64> {
        self.read_tree()?.len()
    }
}

impl<'a, K: Key + 'a, V: MutInPlaceValue + 'a> BtreeMut<'a, K, V> {
    /// Reserve space to insert a key-value pair
    /// The returned reference will have length equal to `value_length`
    // Return type has the same lifetime as &self, because the tree must not be modified until the mutable guard is dropped
    pub(crate) fn insert_reserve(
        &mut self,
        key: &K::SelfType<'_>,
        value_length: u32,
    ) -> Result<AccessGuardMut<V>> {
        #[cfg(feature = "logging")]
        trace!(
            "Btree(root={:?}): Inserting {:?} with {} reserved bytes for the value",
            &self.root, key, value_length
        );
        let mut freed_pages = self.freed_pages.lock().unwrap();
        let mut value = vec![0u8; value_length as usize];
        V::initialize(&mut value);
        let mut operation =
            MutateHelper::<K, V>::new(&mut self.root, self.mem.clone(), freed_pages.as_mut());
        let (_, guard) = operation.insert(key, &V::from_bytes(&value))?;
        Ok(guard)
    }
}
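// Illustrative sketch (not from the original source): insert_reserve() zero-initializes
// `value_length` bytes via V::initialize and returns a mutable guard over that slot, so a
// caller fills the value in place instead of passing it up front. Assuming
// `tree: BtreeMut<u64, &[u8]>` (where the value type implements MutInPlaceValue):
//
//     let guard = tree.insert_reserve(&1, 8)?;
//     // ... write the 8 reserved bytes through `guard` (see AccessGuardMut) ...
//     drop(guard); // the tree must not be modified while the guard is alive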

pub(crate) struct RawBtree {
    mem: Arc<TransactionalMemory>,
    root: Option<BtreeHeader>,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
}

impl RawBtree {
    pub(crate) fn new(
        root: Option<BtreeHeader>,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
        mem: Arc<TransactionalMemory>,
    ) -> Self {
        Self {
            mem,
            root,
            fixed_key_size,
            fixed_value_size,
        }
    }

    pub(crate) fn get_root(&self) -> Option<BtreeHeader> {
        self.root
    }

    pub(crate) fn stats(&self) -> Result<BtreeStats> {
        btree_stats(
            self.root.map(|x| x.root),
            &self.mem,
            self.fixed_key_size,
            self.fixed_value_size,
        )
    }

    pub(crate) fn len(&self) -> Result<u64> {
        Ok(self.root.map_or(0, |x| x.length))
    }

    pub(crate) fn verify_checksum(&self) -> Result<bool> {
        if let Some(header) = self.root {
            self.verify_checksum_helper(header.root, header.checksum)
        } else {
            Ok(true)
        }
    }

    fn verify_checksum_helper(
        &self,
        page_number: PageNumber,
        expected_checksum: Checksum,
    ) -> Result<bool> {
        let page = self.mem.get_page(page_number)?;
        let node_mem = page.memory();
        Ok(match node_mem[0] {
            LEAF => {
                if let Ok(computed) =
                    leaf_checksum(&page, self.fixed_key_size, self.fixed_value_size)
                {
                    expected_checksum == computed
                } else {
                    false
                }
            }
            BRANCH => {
                if let Ok(computed) = branch_checksum(&page, self.fixed_key_size) {
                    if expected_checksum != computed {
                        return Ok(false);
                    }
                } else {
                    return Ok(false);
                }
                let accessor = BranchAccessor::new(&page, self.fixed_key_size);
                for i in 0..accessor.count_children() {
                    if !self.verify_checksum_helper(
                        accessor.child_page(i).unwrap(),
                        accessor.child_checksum(i).unwrap(),
                    )? {
                        return Ok(false);
                    }
                }
                true
            }
            _ => false,
        })
    }
}

pub(crate) struct Btree<K: Key + 'static, V: Value + 'static> {
    mem: Arc<TransactionalMemory>,
    transaction_guard: Arc<TransactionGuard>,
    // Cache of the root page to avoid repeated lookups
    cached_root: Option<PageImpl>,
    root: Option<BtreeHeader>,
    hint: PageHint,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}

impl<K: Key, V: Value> Btree<K, V> {
    pub(crate) fn new(
        root: Option<BtreeHeader>,
        hint: PageHint,
        guard: Arc<TransactionGuard>,
        mem: Arc<TransactionalMemory>,
    ) -> Result<Self> {
        let cached_root = if let Some(header) = root {
            Some(mem.get_page_extended(header.root, hint)?)
        } else {
            None
        };
        Ok(Self {
            mem,
            transaction_guard: guard,
            cached_root,
            root,
            hint,
            _key_type: Default::default(),
            _value_type: Default::default(),
        })
    }

    pub(crate) fn transaction_guard(&self) -> &Arc<TransactionGuard> {
        &self.transaction_guard
    }

    pub(crate) fn get_root(&self) -> Option<BtreeHeader> {
        self.root
    }

    pub(crate) fn visit_all_pages<F>(&self, visitor: F) -> Result
    where
        F: FnMut(&PagePath) -> Result,
    {
        let tree = UntypedBtree::new(
            self.root,
            self.mem.clone(),
            K::fixed_width(),
            V::fixed_width(),
        );
        tree.visit_all_pages(visitor)
    }

    pub(crate) fn get(&self, key: &K::SelfType<'_>) -> Result<Option<AccessGuard<'static, V>>> {
        if let Some(ref root_page) = self.cached_root {
            self.get_helper(root_page.clone(), K::as_bytes(key).as_ref())
        } else {
            Ok(None)
        }
    }

    // Returns the value for the queried key, if present
    fn get_helper(&self, page: PageImpl, query: &[u8]) -> Result<Option<AccessGuard<'static, V>>> {
        let node_mem = page.memory();
        match node_mem[0] {
            LEAF => {
                let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
                if let Some(entry_index) = accessor.find_key::<K>(query) {
                    let (start, end) = accessor.value_range(entry_index).unwrap();
                    let guard = AccessGuard::with_page(page, start..end);
                    Ok(Some(guard))
                } else {
                    Ok(None)
                }
            }
            BRANCH => {
                let accessor = BranchAccessor::new(&page, K::fixed_width());
                let (_, child_page) = accessor.child_for_key::<K>(query);
                self.get_helper(self.mem.get_page_extended(child_page, self.hint)?, query)
            }
            _ => unreachable!(),
        }
    }

    pub(crate) fn first(
        &self,
    ) -> Result<Option<(AccessGuard<'static, K>, AccessGuard<'static, V>)>> {
        if let Some(ref root) = self.cached_root {
            self.first_helper(root.clone())
        } else {
            Ok(None)
        }
    }

    fn first_helper(
        &self,
        page: PageImpl,
    ) -> Result<Option<(AccessGuard<'static, K>, AccessGuard<'static, V>)>> {
        let node_mem = page.memory();
        match node_mem[0] {
            LEAF => {
                let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
                let (key_range, value_range) = accessor.entry_ranges(0).unwrap();
                let key_guard = AccessGuard::with_page(page.clone(), key_range);
                let value_guard = AccessGuard::with_page(page, value_range);
                Ok(Some((key_guard, value_guard)))
            }
            BRANCH => {
                let accessor = BranchAccessor::new(&page, K::fixed_width());
                let child_page = accessor.child_page(0).unwrap();
                self.first_helper(self.mem.get_page_extended(child_page, self.hint)?)
            }
            _ => unreachable!(),
        }
    }

    pub(crate) fn last(
        &self,
    ) -> Result<Option<(AccessGuard<'static, K>, AccessGuard<'static, V>)>> {
        if let Some(ref root) = self.cached_root {
            self.last_helper(root.clone())
        } else {
            Ok(None)
        }
    }

    fn last_helper(
        &self,
        page: PageImpl,
    ) -> Result<Option<(AccessGuard<'static, K>, AccessGuard<'static, V>)>> {
        let node_mem = page.memory();
        match node_mem[0] {
            LEAF => {
                let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
                let (key_range, value_range) =
                    accessor.entry_ranges(accessor.num_pairs() - 1).unwrap();
                let key_guard = AccessGuard::with_page(page.clone(), key_range);
                let value_guard = AccessGuard::with_page(page, value_range);
                Ok(Some((key_guard, value_guard)))
            }
            BRANCH => {
                let accessor = BranchAccessor::new(&page, K::fixed_width());
                let child_page = accessor.child_page(accessor.count_children() - 1).unwrap();
                self.last_helper(self.mem.get_page_extended(child_page, self.hint)?)
            }
            _ => unreachable!(),
        }
    }

    pub(crate) fn range<'a0, T: RangeBounds<KR>, KR: Borrow<K::SelfType<'a0>>>(
        &self,
        range: &'_ T,
    ) -> Result<BtreeRangeIter<K, V>> {
        BtreeRangeIter::new(range, self.root.map(|x| x.root), self.mem.clone())
    }

    pub(crate) fn len(&self) -> Result<u64> {
        Ok(self.root.map_or(0, |x| x.length))
    }

    pub(crate) fn stats(&self) -> Result<BtreeStats> {
        btree_stats(
            self.root.map(|x| x.root),
            &self.mem,
            K::fixed_width(),
            V::fixed_width(),
        )
    }

    #[allow(dead_code)]
    pub(crate) fn print_debug(&self, include_values: bool) -> Result {
        if let Some(p) = self.root.map(|x| x.root) {
            let mut pages = vec![self.mem.get_page(p)?];
            while !pages.is_empty() {
                let mut next_children = vec![];
                for page in pages.drain(..) {
                    let node_mem = page.memory();
                    match node_mem[0] {
                        LEAF => {
                            eprint!("Leaf[ (page={:?})", page.get_page_number());
                            LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width())
                                .print_node::<K, V>(include_values);
                            eprint!("]");
                        }
                        BRANCH => {
                            let accessor = BranchAccessor::new(&page, K::fixed_width());
                            for i in 0..accessor.count_children() {
                                let child = accessor.child_page(i).unwrap();
                                next_children.push(self.mem.get_page(child)?);
                            }
                            accessor.print_node::<K>();
                        }
                        _ => unreachable!(),
                    }
                    eprint!("  ");
                }
                eprintln!();

                pages = next_children;
            }
        }

        Ok(())
    }
}
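// Illustrative sketch (not from the original source): the read-only Btree resolves lookups
// from the cached root page. Assuming `tree: Btree<u64, &[u8]>` was built with Btree::new:
//
//     if let Some(value) = tree.get(&1)? {
//         // `value` is an AccessGuard borrowing the leaf page that holds the entry
//     }
//     for entry in tree.range(&(0u64..10))? {
//         let entry = entry?;
//         let (k, v) = (entry.key(), entry.value());
//     }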

impl<K: Key, V: Value> Drop for Btree<K, V> {
    fn drop(&mut self) {
        // Make sure that we clear our reference to the root page, before the transaction guard goes out of scope
        self.cached_root = None;
    }
}

pub(crate) fn btree_stats(
    root: Option<PageNumber>,
    mem: &TransactionalMemory,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
) -> Result<BtreeStats> {
    if let Some(root) = root {
        stats_helper(root, mem, fixed_key_size, fixed_value_size)
    } else {
        Ok(BtreeStats {
            tree_height: 0,
            leaf_pages: 0,
            branch_pages: 0,
            stored_leaf_bytes: 0,
            metadata_bytes: 0,
            fragmented_bytes: 0,
        })
    }
}

fn stats_helper(
    page_number: PageNumber,
    mem: &TransactionalMemory,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
) -> Result<BtreeStats> {
    let page = mem.get_page(page_number)?;
    let node_mem = page.memory();
    match node_mem[0] {
        LEAF => {
            let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
            let leaf_bytes = accessor.length_of_pairs(0, accessor.num_pairs());
            let overhead_bytes = accessor.total_length() - leaf_bytes;
            let fragmented_bytes = (page.memory().len() - accessor.total_length()) as u64;
            Ok(BtreeStats {
                tree_height: 1,
                leaf_pages: 1,
                branch_pages: 0,
                stored_leaf_bytes: leaf_bytes.try_into().unwrap(),
                metadata_bytes: overhead_bytes.try_into().unwrap(),
                fragmented_bytes,
            })
        }
        BRANCH => {
            let accessor = BranchAccessor::new(&page, fixed_key_size);
            let mut max_child_height = 0;
            let mut leaf_pages = 0;
            let mut branch_pages = 1;
            let mut stored_leaf_bytes = 0;
            let mut metadata_bytes = accessor.total_length() as u64;
            let mut fragmented_bytes = (page.memory().len() - accessor.total_length()) as u64;
            for i in 0..accessor.count_children() {
                if let Some(child) = accessor.child_page(i) {
                    let stats = stats_helper(child, mem, fixed_key_size, fixed_value_size)?;
                    max_child_height = max(max_child_height, stats.tree_height);
                    leaf_pages += stats.leaf_pages;
                    branch_pages += stats.branch_pages;
                    stored_leaf_bytes += stats.stored_leaf_bytes;
                    metadata_bytes += stats.metadata_bytes;
                    fragmented_bytes += stats.fragmented_bytes;
                }
            }

            Ok(BtreeStats {
                tree_height: max_child_height + 1,
                leaf_pages,
                branch_pages,
                stored_leaf_bytes,
                metadata_bytes,
                fragmented_bytes,
            })
        }
        _ => unreachable!(),
    }
}
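
// Illustrative worked example (not from the original source): for a single 4096-byte leaf
// page whose key/value pairs occupy 1000 bytes and whose accessor reports a total_length
// of 1100 bytes, stats_helper reports stored_leaf_bytes = 1000,
// metadata_bytes = 1100 - 1000 = 100, fragmented_bytes = 4096 - 1100 = 2996, and
// tree_height = 1. Branch pages contribute their entire total_length to metadata_bytes
// and sum their children's figures, with tree_height = max(child heights) + 1.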