redb/tree_store/
btree_iters.rs

1use crate::tree_store::btree_base::{BranchAccessor, LeafAccessor};
2use crate::tree_store::btree_base::{BRANCH, LEAF};
3use crate::tree_store::btree_iters::RangeIterState::{Internal, Leaf};
4use crate::tree_store::btree_mutator::MutateHelper;
5use crate::tree_store::page_store::{Page, PageImpl, TransactionalMemory};
6use crate::tree_store::{BtreeHeader, PageNumber};
7use crate::types::{Key, Value};
8use crate::Result;
9use std::borrow::Borrow;
10use std::collections::Bound;
11use std::marker::PhantomData;
12use std::ops::{Range, RangeBounds};
13use std::sync::{Arc, Mutex};
14
15#[derive(Debug, Clone)]
16pub enum RangeIterState {
17    Leaf {
18        page: PageImpl,
19        fixed_key_size: Option<usize>,
20        fixed_value_size: Option<usize>,
21        entry: usize,
22        parent: Option<Box<RangeIterState>>,
23    },
24    Internal {
25        page: PageImpl,
26        fixed_key_size: Option<usize>,
27        fixed_value_size: Option<usize>,
28        child: usize,
29        parent: Option<Box<RangeIterState>>,
30    },
31}
32
33impl RangeIterState {
34    fn page_number(&self) -> PageNumber {
35        match self {
36            Leaf { page, .. } | Internal { page, .. } => page.get_page_number(),
37        }
38    }
39
40    fn next(self, reverse: bool, manager: &TransactionalMemory) -> Result<Option<RangeIterState>> {
41        match self {
42            Leaf {
43                page,
44                fixed_key_size,
45                fixed_value_size,
46                entry,
47                parent,
48            } => {
49                let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
50                let direction = if reverse { -1 } else { 1 };
51                let next_entry = isize::try_from(entry).unwrap() + direction;
52                if 0 <= next_entry && next_entry < accessor.num_pairs().try_into().unwrap() {
53                    Ok(Some(Leaf {
54                        page,
55                        fixed_key_size,
56                        fixed_value_size,
57                        entry: next_entry.try_into().unwrap(),
58                        parent,
59                    }))
60                } else {
61                    Ok(parent.map(|x| *x))
62                }
63            }
64            Internal {
65                page,
66                fixed_key_size,
67                fixed_value_size,
68                child,
69                mut parent,
70            } => {
71                let accessor = BranchAccessor::new(&page, fixed_key_size);
72                let child_page = accessor.child_page(child).unwrap();
73                let child_page = manager.get_page(child_page)?;
74                let direction = if reverse { -1 } else { 1 };
75                let next_child = isize::try_from(child).unwrap() + direction;
76                if 0 <= next_child && next_child < accessor.count_children().try_into().unwrap() {
77                    parent = Some(Box::new(Internal {
78                        page,
79                        fixed_key_size,
80                        fixed_value_size,
81                        child: next_child.try_into().unwrap(),
82                        parent,
83                    }));
84                }
85                match child_page.memory()[0] {
86                    LEAF => {
87                        let child_accessor = LeafAccessor::new(
88                            child_page.memory(),
89                            fixed_key_size,
90                            fixed_value_size,
91                        );
92                        let entry = if reverse {
93                            child_accessor.num_pairs() - 1
94                        } else {
95                            0
96                        };
97                        Ok(Some(Leaf {
98                            page: child_page,
99                            fixed_key_size,
100                            fixed_value_size,
101                            entry,
102                            parent,
103                        }))
104                    }
105                    BRANCH => {
106                        let child_accessor = BranchAccessor::new(&child_page, fixed_key_size);
107                        let child = if reverse {
108                            child_accessor.count_children() - 1
109                        } else {
110                            0
111                        };
112                        Ok(Some(Internal {
113                            page: child_page,
114                            fixed_key_size,
115                            fixed_value_size,
116                            child,
117                            parent,
118                        }))
119                    }
120                    _ => unreachable!(),
121                }
122            }
123        }
124    }
125
126    fn get_entry<K: Key, V: Value>(&self) -> Option<EntryGuard<K, V>> {
127        match self {
128            Leaf {
129                page,
130                fixed_key_size,
131                fixed_value_size,
132                entry,
133                ..
134            } => {
135                let (key, value) =
136                    LeafAccessor::new(page.memory(), *fixed_key_size, *fixed_value_size)
137                        .entry_ranges(*entry)?;
138                Some(EntryGuard::new(page.clone(), key, value))
139            }
140            Internal { .. } => None,
141        }
142    }
143}
144
145pub(crate) struct EntryGuard<K: Key, V: Value> {
146    page: PageImpl,
147    key_range: Range<usize>,
148    value_range: Range<usize>,
149    _key_type: PhantomData<K>,
150    _value_type: PhantomData<V>,
151}
152
153impl<K: Key, V: Value> EntryGuard<K, V> {
154    fn new(page: PageImpl, key_range: Range<usize>, value_range: Range<usize>) -> Self {
155        Self {
156            page,
157            key_range,
158            value_range,
159            _key_type: Default::default(),
160            _value_type: Default::default(),
161        }
162    }
163
164    pub(crate) fn key_data(&self) -> Vec<u8> {
165        self.page.memory()[self.key_range.clone()].to_vec()
166    }
167
168    pub(crate) fn key(&self) -> K::SelfType<'_> {
169        K::from_bytes(&self.page.memory()[self.key_range.clone()])
170    }
171
172    pub(crate) fn value(&self) -> V::SelfType<'_> {
173        V::from_bytes(&self.page.memory()[self.value_range.clone()])
174    }
175
176    pub(crate) fn into_raw(self) -> (PageImpl, Range<usize>, Range<usize>) {
177        (self.page, self.key_range, self.value_range)
178    }
179}
180
181pub(crate) struct AllPageNumbersBtreeIter {
182    next: Option<RangeIterState>,
183    manager: Arc<TransactionalMemory>,
184}
185
186impl AllPageNumbersBtreeIter {
187    pub(crate) fn new(
188        root: PageNumber,
189        fixed_key_size: Option<usize>,
190        fixed_value_size: Option<usize>,
191        manager: Arc<TransactionalMemory>,
192    ) -> Result<Self> {
193        let root_page = manager.get_page(root)?;
194        let node_mem = root_page.memory();
195        let start = match node_mem[0] {
196            LEAF => Leaf {
197                page: root_page,
198                fixed_key_size,
199                fixed_value_size,
200                entry: 0,
201                parent: None,
202            },
203            BRANCH => Internal {
204                page: root_page,
205                fixed_key_size,
206                fixed_value_size,
207                child: 0,
208                parent: None,
209            },
210            _ => unreachable!(),
211        };
212        Ok(Self {
213            next: Some(start),
214            manager,
215        })
216    }
217}
218
219impl Iterator for AllPageNumbersBtreeIter {
220    type Item = Result<PageNumber>;
221
222    fn next(&mut self) -> Option<Self::Item> {
223        loop {
224            let state = self.next.take()?;
225            let value = state.page_number();
226            // Only return each page number once
227            let once = match state {
228                Leaf { entry, .. } => entry == 0,
229                Internal { child, .. } => child == 0,
230            };
231            match state.next(false, &self.manager) {
232                Ok(next) => {
233                    self.next = next;
234                }
235                Err(err) => {
236                    return Some(Err(err));
237                }
238            }
239            if once {
240                return Some(Ok(value));
241            }
242        }
243    }
244}
245
246pub(crate) struct BtreeExtractIf<
247    'a,
248    K: Key + 'static,
249    V: Value + 'static,
250    F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool,
251> {
252    root: &'a mut Option<BtreeHeader>,
253    inner: BtreeRangeIter<K, V>,
254    predicate: F,
255    free_on_drop: Vec<PageNumber>,
256    master_free_list: Arc<Mutex<Vec<PageNumber>>>,
257    mem: Arc<TransactionalMemory>,
258}
259
260impl<'a, K: Key, V: Value, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>
261    BtreeExtractIf<'a, K, V, F>
262{
263    pub(crate) fn new(
264        root: &'a mut Option<BtreeHeader>,
265        inner: BtreeRangeIter<K, V>,
266        predicate: F,
267        master_free_list: Arc<Mutex<Vec<PageNumber>>>,
268        mem: Arc<TransactionalMemory>,
269    ) -> Self {
270        Self {
271            root,
272            inner,
273            predicate,
274            free_on_drop: vec![],
275            master_free_list,
276            mem,
277        }
278    }
279}
280
281impl<K: Key, V: Value, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool> Iterator
282    for BtreeExtractIf<'_, K, V, F>
283{
284    type Item = Result<EntryGuard<K, V>>;
285
286    fn next(&mut self) -> Option<Self::Item> {
287        let mut item = self.inner.next();
288        while let Some(Ok(ref entry)) = item {
289            if (self.predicate)(entry.key(), entry.value()) {
290                let mut operation: MutateHelper<'_, '_, K, V> = MutateHelper::new_do_not_modify(
291                    self.root,
292                    self.mem.clone(),
293                    &mut self.free_on_drop,
294                );
295                match operation.delete(&entry.key()) {
296                    Ok(x) => {
297                        assert!(x.is_some());
298                    }
299                    Err(x) => {
300                        return Some(Err(x));
301                    }
302                }
303                break;
304            }
305            item = self.inner.next();
306        }
307        item
308    }
309}
310
311impl<K: Key, V: Value, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>
312    DoubleEndedIterator for BtreeExtractIf<'_, K, V, F>
313{
314    fn next_back(&mut self) -> Option<Self::Item> {
315        let mut item = self.inner.next_back();
316        while let Some(Ok(ref entry)) = item {
317            if (self.predicate)(entry.key(), entry.value()) {
318                let mut operation: MutateHelper<'_, '_, K, V> = MutateHelper::new_do_not_modify(
319                    self.root,
320                    self.mem.clone(),
321                    &mut self.free_on_drop,
322                );
323                match operation.delete(&entry.key()) {
324                    Ok(x) => {
325                        assert!(x.is_some());
326                    }
327                    Err(x) => {
328                        return Some(Err(x));
329                    }
330                }
331                break;
332            }
333            item = self.inner.next_back();
334        }
335        item
336    }
337}
338
339impl<K: Key, V: Value, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool> Drop
340    for BtreeExtractIf<'_, K, V, F>
341{
342    fn drop(&mut self) {
343        let mut master_free_list = self.master_free_list.lock().unwrap();
344        for page in self.free_on_drop.drain(..) {
345            if !self.mem.free_if_uncommitted(page) {
346                master_free_list.push(page);
347            }
348        }
349    }
350}
351
352#[derive(Clone)]
353pub(crate) struct BtreeRangeIter<K: Key + 'static, V: Value + 'static> {
354    left: Option<RangeIterState>, // Exclusive. The previous element returned
355    right: Option<RangeIterState>, // Exclusive. The previous element returned
356    include_left: bool,           // left is inclusive, instead of exclusive
357    include_right: bool,          // right is inclusive, instead of exclusive
358    manager: Arc<TransactionalMemory>,
359    _key_type: PhantomData<K>,
360    _value_type: PhantomData<V>,
361}
362
363impl<K: Key + 'static, V: Value + 'static> BtreeRangeIter<K, V> {
364    pub(crate) fn new<'a, T: RangeBounds<KR>, KR: Borrow<K::SelfType<'a>>>(
365        query_range: &'_ T,
366        table_root: Option<PageNumber>,
367        manager: Arc<TransactionalMemory>,
368    ) -> Result<Self> {
369        if let Some(root) = table_root {
370            let (include_left, left) = match query_range.start_bound() {
371                Bound::Included(k) => find_iter_left::<K, V>(
372                    manager.get_page(root)?,
373                    None,
374                    K::as_bytes(k.borrow()).as_ref(),
375                    true,
376                    &manager,
377                )?,
378                Bound::Excluded(k) => find_iter_left::<K, V>(
379                    manager.get_page(root)?,
380                    None,
381                    K::as_bytes(k.borrow()).as_ref(),
382                    false,
383                    &manager,
384                )?,
385                Bound::Unbounded => {
386                    let state = find_iter_unbounded::<K, V>(
387                        manager.get_page(root)?,
388                        None,
389                        false,
390                        &manager,
391                    )?;
392                    (true, state)
393                }
394            };
395            let (include_right, right) = match query_range.end_bound() {
396                Bound::Included(k) => find_iter_right::<K, V>(
397                    manager.get_page(root)?,
398                    None,
399                    K::as_bytes(k.borrow()).as_ref(),
400                    true,
401                    &manager,
402                )?,
403                Bound::Excluded(k) => find_iter_right::<K, V>(
404                    manager.get_page(root)?,
405                    None,
406                    K::as_bytes(k.borrow()).as_ref(),
407                    false,
408                    &manager,
409                )?,
410                Bound::Unbounded => {
411                    let state =
412                        find_iter_unbounded::<K, V>(manager.get_page(root)?, None, true, &manager)?;
413                    (true, state)
414                }
415            };
416            Ok(Self {
417                left,
418                right,
419                include_left,
420                include_right,
421                manager,
422                _key_type: Default::default(),
423                _value_type: Default::default(),
424            })
425        } else {
426            Ok(Self {
427                left: None,
428                right: None,
429                include_left: false,
430                include_right: false,
431                manager,
432                _key_type: Default::default(),
433                _value_type: Default::default(),
434            })
435        }
436    }
437}
438
439impl<K: Key, V: Value> Iterator for BtreeRangeIter<K, V> {
440    type Item = Result<EntryGuard<K, V>>;
441
442    fn next(&mut self) -> Option<Self::Item> {
443        if let (
444            Some(Leaf {
445                page: left_page,
446                entry: left_entry,
447                ..
448            }),
449            Some(Leaf {
450                page: right_page,
451                entry: right_entry,
452                ..
453            }),
454        ) = (&self.left, &self.right)
455        {
456            if left_page.get_page_number() == right_page.get_page_number()
457                && (left_entry > right_entry
458                    || (left_entry == right_entry && (!self.include_left || !self.include_right)))
459            {
460                return None;
461            }
462        }
463
464        loop {
465            if !self.include_left {
466                match self.left.take()?.next(false, &self.manager) {
467                    Ok(left) => {
468                        self.left = left;
469                    }
470                    Err(err) => {
471                        return Some(Err(err));
472                    }
473                }
474            }
475            // Return None if the next state is None
476            self.left.as_ref()?;
477
478            if let (
479                Some(Leaf {
480                    page: left_page,
481                    entry: left_entry,
482                    ..
483                }),
484                Some(Leaf {
485                    page: right_page,
486                    entry: right_entry,
487                    ..
488                }),
489            ) = (&self.left, &self.right)
490            {
491                if left_page.get_page_number() == right_page.get_page_number()
492                    && (left_entry > right_entry
493                        || (left_entry == right_entry && !self.include_right))
494                {
495                    return None;
496                }
497            }
498
499            self.include_left = false;
500            if self.left.as_ref().unwrap().get_entry::<K, V>().is_some() {
501                return self.left.as_ref().map(|s| s.get_entry().unwrap()).map(Ok);
502            }
503        }
504    }
505}
506
507impl<K: Key, V: Value> DoubleEndedIterator for BtreeRangeIter<K, V> {
508    fn next_back(&mut self) -> Option<Self::Item> {
509        if let (
510            Some(Leaf {
511                page: left_page,
512                entry: left_entry,
513                ..
514            }),
515            Some(Leaf {
516                page: right_page,
517                entry: right_entry,
518                ..
519            }),
520        ) = (&self.left, &self.right)
521        {
522            if left_page.get_page_number() == right_page.get_page_number()
523                && (left_entry > right_entry
524                    || (left_entry == right_entry && (!self.include_left || !self.include_right)))
525            {
526                return None;
527            }
528        }
529
530        loop {
531            if !self.include_right {
532                match self.right.take()?.next(true, &self.manager) {
533                    Ok(right) => {
534                        self.right = right;
535                    }
536                    Err(err) => {
537                        return Some(Err(err));
538                    }
539                }
540            }
541            // Return None if the next state is None
542            self.right.as_ref()?;
543
544            if let (
545                Some(Leaf {
546                    page: left_page,
547                    entry: left_entry,
548                    ..
549                }),
550                Some(Leaf {
551                    page: right_page,
552                    entry: right_entry,
553                    ..
554                }),
555            ) = (&self.left, &self.right)
556            {
557                if left_page.get_page_number() == right_page.get_page_number()
558                    && (left_entry > right_entry
559                        || (left_entry == right_entry && !self.include_left))
560                {
561                    return None;
562                }
563            }
564
565            self.include_right = false;
566            if self.right.as_ref().unwrap().get_entry::<K, V>().is_some() {
567                return self.right.as_ref().map(|s| s.get_entry().unwrap()).map(Ok);
568            }
569        }
570    }
571}
572
573fn find_iter_unbounded<K: Key, V: Value>(
574    page: PageImpl,
575    mut parent: Option<Box<RangeIterState>>,
576    reverse: bool,
577    manager: &TransactionalMemory,
578) -> Result<Option<RangeIterState>> {
579    let node_mem = page.memory();
580    match node_mem[0] {
581        LEAF => {
582            let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
583            let entry = if reverse { accessor.num_pairs() - 1 } else { 0 };
584            Ok(Some(Leaf {
585                page,
586                fixed_key_size: K::fixed_width(),
587                fixed_value_size: V::fixed_width(),
588                entry,
589                parent,
590            }))
591        }
592        BRANCH => {
593            let accessor = BranchAccessor::new(&page, K::fixed_width());
594            let child_index = if reverse {
595                accessor.count_children() - 1
596            } else {
597                0
598            };
599            let child_page_number = accessor.child_page(child_index).unwrap();
600            let child_page = manager.get_page(child_page_number)?;
601            let direction = if reverse { -1isize } else { 1 };
602            parent = Some(Box::new(Internal {
603                page,
604                fixed_key_size: K::fixed_width(),
605                fixed_value_size: V::fixed_width(),
606                child: (isize::try_from(child_index).unwrap() + direction)
607                    .try_into()
608                    .unwrap(),
609                parent,
610            }));
611            find_iter_unbounded::<K, V>(child_page, parent, reverse, manager)
612        }
613        _ => unreachable!(),
614    }
615}
616
617// Returns a bool indicating whether the first entry pointed to by the state is included in the
618// queried range
619fn find_iter_left<K: Key, V: Value>(
620    page: PageImpl,
621    mut parent: Option<Box<RangeIterState>>,
622    query: &[u8],
623    include_query: bool,
624    manager: &TransactionalMemory,
625) -> Result<(bool, Option<RangeIterState>)> {
626    let node_mem = page.memory();
627    match node_mem[0] {
628        LEAF => {
629            let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
630            let (mut position, found) = accessor.position::<K>(query);
631            let include = if position < accessor.num_pairs() {
632                include_query || !found
633            } else {
634                // Back up to the last valid position
635                position -= 1;
636                // and exclude it
637                false
638            };
639            let result = Leaf {
640                page,
641                fixed_key_size: K::fixed_width(),
642                fixed_value_size: V::fixed_width(),
643                entry: position,
644                parent,
645            };
646            Ok((include, Some(result)))
647        }
648        BRANCH => {
649            let accessor = BranchAccessor::new(&page, K::fixed_width());
650            let (child_index, child_page_number) = accessor.child_for_key::<K>(query);
651            let child_page = manager.get_page(child_page_number)?;
652            if child_index < accessor.count_children() - 1 {
653                parent = Some(Box::new(Internal {
654                    page,
655                    fixed_key_size: K::fixed_width(),
656                    fixed_value_size: V::fixed_width(),
657                    child: child_index + 1,
658                    parent,
659                }));
660            }
661            find_iter_left::<K, V>(child_page, parent, query, include_query, manager)
662        }
663        _ => unreachable!(),
664    }
665}
666
667fn find_iter_right<K: Key, V: Value>(
668    page: PageImpl,
669    mut parent: Option<Box<RangeIterState>>,
670    query: &[u8],
671    include_query: bool,
672    manager: &TransactionalMemory,
673) -> Result<(bool, Option<RangeIterState>)> {
674    let node_mem = page.memory();
675    match node_mem[0] {
676        LEAF => {
677            let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
678            let (mut position, found) = accessor.position::<K>(query);
679            let include = if position < accessor.num_pairs() {
680                include_query && found
681            } else {
682                // Back up to the last valid position
683                position -= 1;
684                // and include it
685                true
686            };
687            let result = Leaf {
688                page,
689                fixed_key_size: K::fixed_width(),
690                fixed_value_size: V::fixed_width(),
691                entry: position,
692                parent,
693            };
694            Ok((include, Some(result)))
695        }
696        BRANCH => {
697            let accessor = BranchAccessor::new(&page, K::fixed_width());
698            let (child_index, child_page_number) = accessor.child_for_key::<K>(query);
699            let child_page = manager.get_page(child_page_number)?;
700            if child_index > 0 && accessor.child_page(child_index - 1).is_some() {
701                parent = Some(Box::new(Internal {
702                    page,
703                    fixed_key_size: K::fixed_width(),
704                    fixed_value_size: V::fixed_width(),
705                    child: child_index - 1,
706                    parent,
707                }));
708            }
709            find_iter_right::<K, V>(child_page, parent, query, include_query, manager)
710        }
711        _ => unreachable!(),
712    }
713}