redb/tree_store/
btree_iters.rs

1use crate::Result;
2use crate::tree_store::btree_base::{BRANCH, LEAF};
3use crate::tree_store::btree_base::{BranchAccessor, LeafAccessor};
4use crate::tree_store::btree_iters::RangeIterState::{Internal, Leaf};
5use crate::tree_store::btree_mutator::MutateHelper;
6use crate::tree_store::page_store::{Page, PageImpl, TransactionalMemory};
7use crate::tree_store::{BtreeHeader, PageNumber, PageTrackerPolicy};
8use crate::types::{Key, Value};
9use Bound::{Excluded, Included, Unbounded};
10use std::borrow::Borrow;
11use std::collections::Bound;
12use std::marker::PhantomData;
13use std::ops::{Range, RangeBounds};
14use std::sync::{Arc, Mutex};
15
16#[derive(Debug, Clone)]
17pub enum RangeIterState {
18    Leaf {
19        page: PageImpl,
20        fixed_key_size: Option<usize>,
21        fixed_value_size: Option<usize>,
22        entry: usize,
23        parent: Option<Box<RangeIterState>>,
24    },
25    Internal {
26        page: PageImpl,
27        fixed_key_size: Option<usize>,
28        fixed_value_size: Option<usize>,
29        child: usize,
30        parent: Option<Box<RangeIterState>>,
31    },
32}
33
34impl RangeIterState {
35    fn page_number(&self) -> PageNumber {
36        match self {
37            Leaf { page, .. } | Internal { page, .. } => page.get_page_number(),
38        }
39    }
40
41    fn next(self, reverse: bool, manager: &TransactionalMemory) -> Result<Option<RangeIterState>> {
42        match self {
43            Leaf {
44                page,
45                fixed_key_size,
46                fixed_value_size,
47                entry,
48                parent,
49            } => {
50                let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
51                let direction = if reverse { -1 } else { 1 };
52                let next_entry = isize::try_from(entry).unwrap() + direction;
53                if 0 <= next_entry && next_entry < accessor.num_pairs().try_into().unwrap() {
54                    Ok(Some(Leaf {
55                        page,
56                        fixed_key_size,
57                        fixed_value_size,
58                        entry: next_entry.try_into().unwrap(),
59                        parent,
60                    }))
61                } else {
62                    Ok(parent.map(|x| *x))
63                }
64            }
65            Internal {
66                page,
67                fixed_key_size,
68                fixed_value_size,
69                child,
70                mut parent,
71            } => {
72                let accessor = BranchAccessor::new(&page, fixed_key_size);
73                let child_page = accessor.child_page(child).unwrap();
74                let child_page = manager.get_page(child_page)?;
75                let direction = if reverse { -1 } else { 1 };
76                let next_child = isize::try_from(child).unwrap() + direction;
77                if 0 <= next_child && next_child < accessor.count_children().try_into().unwrap() {
78                    parent = Some(Box::new(Internal {
79                        page,
80                        fixed_key_size,
81                        fixed_value_size,
82                        child: next_child.try_into().unwrap(),
83                        parent,
84                    }));
85                }
86                match child_page.memory()[0] {
87                    LEAF => {
88                        let child_accessor = LeafAccessor::new(
89                            child_page.memory(),
90                            fixed_key_size,
91                            fixed_value_size,
92                        );
93                        let entry = if reverse {
94                            child_accessor.num_pairs() - 1
95                        } else {
96                            0
97                        };
98                        Ok(Some(Leaf {
99                            page: child_page,
100                            fixed_key_size,
101                            fixed_value_size,
102                            entry,
103                            parent,
104                        }))
105                    }
106                    BRANCH => {
107                        let child_accessor = BranchAccessor::new(&child_page, fixed_key_size);
108                        let child = if reverse {
109                            child_accessor.count_children() - 1
110                        } else {
111                            0
112                        };
113                        Ok(Some(Internal {
114                            page: child_page,
115                            fixed_key_size,
116                            fixed_value_size,
117                            child,
118                            parent,
119                        }))
120                    }
121                    _ => unreachable!(),
122                }
123            }
124        }
125    }
126
127    fn get_entry<K: Key, V: Value>(&self) -> Option<EntryGuard<K, V>> {
128        match self {
129            Leaf {
130                page,
131                fixed_key_size,
132                fixed_value_size,
133                entry,
134                ..
135            } => {
136                let (key, value) =
137                    LeafAccessor::new(page.memory(), *fixed_key_size, *fixed_value_size)
138                        .entry_ranges(*entry)?;
139                Some(EntryGuard::new(page.clone(), key, value))
140            }
141            Internal { .. } => None,
142        }
143    }
144}
145
146pub(crate) struct EntryGuard<K: Key, V: Value> {
147    page: PageImpl,
148    key_range: Range<usize>,
149    value_range: Range<usize>,
150    _key_type: PhantomData<K>,
151    _value_type: PhantomData<V>,
152}
153
154impl<K: Key, V: Value> EntryGuard<K, V> {
155    fn new(page: PageImpl, key_range: Range<usize>, value_range: Range<usize>) -> Self {
156        Self {
157            page,
158            key_range,
159            value_range,
160            _key_type: Default::default(),
161            _value_type: Default::default(),
162        }
163    }
164
165    pub(crate) fn key_data(&self) -> Vec<u8> {
166        self.page.memory()[self.key_range.clone()].to_vec()
167    }
168
169    pub(crate) fn key(&self) -> K::SelfType<'_> {
170        K::from_bytes(&self.page.memory()[self.key_range.clone()])
171    }
172
173    pub(crate) fn value(&self) -> V::SelfType<'_> {
174        V::from_bytes(&self.page.memory()[self.value_range.clone()])
175    }
176
177    pub(crate) fn into_raw(self) -> (PageImpl, Range<usize>, Range<usize>) {
178        (self.page, self.key_range, self.value_range)
179    }
180}
181
182pub(crate) struct AllPageNumbersBtreeIter {
183    next: Option<RangeIterState>,
184    manager: Arc<TransactionalMemory>,
185}
186
187impl AllPageNumbersBtreeIter {
188    pub(crate) fn new(
189        root: PageNumber,
190        fixed_key_size: Option<usize>,
191        fixed_value_size: Option<usize>,
192        manager: Arc<TransactionalMemory>,
193    ) -> Result<Self> {
194        let root_page = manager.get_page(root)?;
195        let node_mem = root_page.memory();
196        let start = match node_mem[0] {
197            LEAF => Leaf {
198                page: root_page,
199                fixed_key_size,
200                fixed_value_size,
201                entry: 0,
202                parent: None,
203            },
204            BRANCH => Internal {
205                page: root_page,
206                fixed_key_size,
207                fixed_value_size,
208                child: 0,
209                parent: None,
210            },
211            _ => unreachable!(),
212        };
213        Ok(Self {
214            next: Some(start),
215            manager,
216        })
217    }
218}
219
220impl Iterator for AllPageNumbersBtreeIter {
221    type Item = Result<PageNumber>;
222
223    fn next(&mut self) -> Option<Self::Item> {
224        loop {
225            let state = self.next.take()?;
226            let value = state.page_number();
227            // Only return each page number once
228            let once = match state {
229                Leaf { entry, .. } => entry == 0,
230                Internal { child, .. } => child == 0,
231            };
232            match state.next(false, &self.manager) {
233                Ok(next) => {
234                    self.next = next;
235                }
236                Err(err) => {
237                    return Some(Err(err));
238                }
239            }
240            if once {
241                return Some(Ok(value));
242            }
243        }
244    }
245}
246
247pub(crate) struct BtreeExtractIf<
248    'a,
249    K: Key + 'static,
250    V: Value + 'static,
251    F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool,
252> {
253    root: &'a mut Option<BtreeHeader>,
254    inner: BtreeRangeIter<K, V>,
255    predicate: F,
256    free_on_drop: Vec<PageNumber>,
257    master_free_list: Arc<Mutex<Vec<PageNumber>>>,
258    allocated: Arc<Mutex<PageTrackerPolicy>>,
259    mem: Arc<TransactionalMemory>,
260}
261
262impl<'a, K: Key, V: Value, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>
263    BtreeExtractIf<'a, K, V, F>
264{
265    pub(crate) fn new(
266        root: &'a mut Option<BtreeHeader>,
267        inner: BtreeRangeIter<K, V>,
268        predicate: F,
269        master_free_list: Arc<Mutex<Vec<PageNumber>>>,
270        allocated: Arc<Mutex<PageTrackerPolicy>>,
271        mem: Arc<TransactionalMemory>,
272    ) -> Self {
273        Self {
274            root,
275            inner,
276            predicate,
277            free_on_drop: vec![],
278            master_free_list,
279            allocated,
280            mem,
281        }
282    }
283}
284
285impl<K: Key, V: Value, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool> Iterator
286    for BtreeExtractIf<'_, K, V, F>
287{
288    type Item = Result<EntryGuard<K, V>>;
289
290    fn next(&mut self) -> Option<Self::Item> {
291        let mut item = self.inner.next();
292        while let Some(Ok(ref entry)) = item {
293            if (self.predicate)(entry.key(), entry.value()) {
294                let mut operation: MutateHelper<'_, '_, K, V> = MutateHelper::new_do_not_modify(
295                    self.root,
296                    self.mem.clone(),
297                    &mut self.free_on_drop,
298                    self.allocated.clone(),
299                );
300                match operation.delete(&entry.key()) {
301                    Ok(x) => {
302                        assert!(x.is_some());
303                    }
304                    Err(x) => {
305                        return Some(Err(x));
306                    }
307                }
308                break;
309            }
310            item = self.inner.next();
311        }
312        item
313    }
314}
315
316impl<K: Key, V: Value, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>
317    DoubleEndedIterator for BtreeExtractIf<'_, K, V, F>
318{
319    fn next_back(&mut self) -> Option<Self::Item> {
320        let mut item = self.inner.next_back();
321        while let Some(Ok(ref entry)) = item {
322            if (self.predicate)(entry.key(), entry.value()) {
323                let mut operation: MutateHelper<'_, '_, K, V> = MutateHelper::new_do_not_modify(
324                    self.root,
325                    self.mem.clone(),
326                    &mut self.free_on_drop,
327                    self.allocated.clone(),
328                );
329                match operation.delete(&entry.key()) {
330                    Ok(x) => {
331                        assert!(x.is_some());
332                    }
333                    Err(x) => {
334                        return Some(Err(x));
335                    }
336                }
337                break;
338            }
339            item = self.inner.next_back();
340        }
341        item
342    }
343}
344
345impl<K: Key, V: Value, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool> Drop
346    for BtreeExtractIf<'_, K, V, F>
347{
348    fn drop(&mut self) {
349        let mut master_free_list = self.master_free_list.lock().unwrap();
350        let mut allocated = self.allocated.lock().unwrap();
351        for page in self.free_on_drop.drain(..) {
352            if !self.mem.free_if_uncommitted(page, &mut allocated) {
353                master_free_list.push(page);
354            }
355        }
356    }
357}
358
359#[derive(Clone)]
360pub(crate) struct BtreeRangeIter<K: Key + 'static, V: Value + 'static> {
361    left: Option<RangeIterState>, // Exclusive. The previous element returned
362    right: Option<RangeIterState>, // Exclusive. The previous element returned
363    include_left: bool,           // left is inclusive, instead of exclusive
364    include_right: bool,          // right is inclusive, instead of exclusive
365    manager: Arc<TransactionalMemory>,
366    _key_type: PhantomData<K>,
367    _value_type: PhantomData<V>,
368}
369
370fn range_is_empty<'a, K: Key + 'static, KR: Borrow<K::SelfType<'a>>, T: RangeBounds<KR>>(
371    range: &T,
372) -> bool {
373    match (range.start_bound(), range.end_bound()) {
374        (Unbounded, _) | (_, Unbounded) => false,
375        (Included(start), Excluded(end)) | (Excluded(start), Included(end) | Excluded(end)) => {
376            let start_tmp = K::as_bytes(start.borrow());
377            let start_value = start_tmp.as_ref();
378            let end_tmp = K::as_bytes(end.borrow());
379            let end_value = end_tmp.as_ref();
380            K::compare(start_value, end_value).is_ge()
381        }
382        (Included(start), Included(end)) => {
383            let start_tmp = K::as_bytes(start.borrow());
384            let start_value = start_tmp.as_ref();
385            let end_tmp = K::as_bytes(end.borrow());
386            let end_value = end_tmp.as_ref();
387            K::compare(start_value, end_value).is_gt()
388        }
389    }
390}
391
392impl<K: Key + 'static, V: Value + 'static> BtreeRangeIter<K, V> {
393    pub(crate) fn new<'a, T: RangeBounds<KR>, KR: Borrow<K::SelfType<'a>>>(
394        query_range: &'_ T,
395        table_root: Option<PageNumber>,
396        manager: Arc<TransactionalMemory>,
397    ) -> Result<Self> {
398        if range_is_empty::<K, KR, T>(query_range) {
399            return Ok(Self {
400                left: None,
401                right: None,
402                include_left: false,
403                include_right: false,
404                manager,
405                _key_type: Default::default(),
406                _value_type: Default::default(),
407            });
408        }
409        if let Some(root) = table_root {
410            let (include_left, left) = match query_range.start_bound() {
411                Included(k) => find_iter_left::<K, V>(
412                    manager.get_page(root)?,
413                    None,
414                    K::as_bytes(k.borrow()).as_ref(),
415                    true,
416                    &manager,
417                )?,
418                Excluded(k) => find_iter_left::<K, V>(
419                    manager.get_page(root)?,
420                    None,
421                    K::as_bytes(k.borrow()).as_ref(),
422                    false,
423                    &manager,
424                )?,
425                Unbounded => {
426                    let state = find_iter_unbounded::<K, V>(
427                        manager.get_page(root)?,
428                        None,
429                        false,
430                        &manager,
431                    )?;
432                    (true, state)
433                }
434            };
435            let (include_right, right) = match query_range.end_bound() {
436                Included(k) => find_iter_right::<K, V>(
437                    manager.get_page(root)?,
438                    None,
439                    K::as_bytes(k.borrow()).as_ref(),
440                    true,
441                    &manager,
442                )?,
443                Excluded(k) => find_iter_right::<K, V>(
444                    manager.get_page(root)?,
445                    None,
446                    K::as_bytes(k.borrow()).as_ref(),
447                    false,
448                    &manager,
449                )?,
450                Unbounded => {
451                    let state =
452                        find_iter_unbounded::<K, V>(manager.get_page(root)?, None, true, &manager)?;
453                    (true, state)
454                }
455            };
456            Ok(Self {
457                left,
458                right,
459                include_left,
460                include_right,
461                manager,
462                _key_type: Default::default(),
463                _value_type: Default::default(),
464            })
465        } else {
466            Ok(Self {
467                left: None,
468                right: None,
469                include_left: false,
470                include_right: false,
471                manager,
472                _key_type: Default::default(),
473                _value_type: Default::default(),
474            })
475        }
476    }
477}
478
479impl<K: Key, V: Value> Iterator for BtreeRangeIter<K, V> {
480    type Item = Result<EntryGuard<K, V>>;
481
482    fn next(&mut self) -> Option<Self::Item> {
483        if let (
484            Some(Leaf {
485                page: left_page,
486                entry: left_entry,
487                ..
488            }),
489            Some(Leaf {
490                page: right_page,
491                entry: right_entry,
492                ..
493            }),
494        ) = (&self.left, &self.right)
495        {
496            if left_page.get_page_number() == right_page.get_page_number()
497                && (left_entry > right_entry
498                    || (left_entry == right_entry && (!self.include_left || !self.include_right)))
499            {
500                return None;
501            }
502        }
503
504        loop {
505            if !self.include_left {
506                match self.left.take()?.next(false, &self.manager) {
507                    Ok(left) => {
508                        self.left = left;
509                    }
510                    Err(err) => {
511                        return Some(Err(err));
512                    }
513                }
514            }
515            // Return None if the next state is None
516            self.left.as_ref()?;
517
518            if let (
519                Some(Leaf {
520                    page: left_page,
521                    entry: left_entry,
522                    ..
523                }),
524                Some(Leaf {
525                    page: right_page,
526                    entry: right_entry,
527                    ..
528                }),
529            ) = (&self.left, &self.right)
530            {
531                if left_page.get_page_number() == right_page.get_page_number()
532                    && (left_entry > right_entry
533                        || (left_entry == right_entry && !self.include_right))
534                {
535                    return None;
536                }
537            }
538
539            self.include_left = false;
540            if self.left.as_ref().unwrap().get_entry::<K, V>().is_some() {
541                return self.left.as_ref().map(|s| s.get_entry().unwrap()).map(Ok);
542            }
543        }
544    }
545}
546
547impl<K: Key, V: Value> DoubleEndedIterator for BtreeRangeIter<K, V> {
548    fn next_back(&mut self) -> Option<Self::Item> {
549        if let (
550            Some(Leaf {
551                page: left_page,
552                entry: left_entry,
553                ..
554            }),
555            Some(Leaf {
556                page: right_page,
557                entry: right_entry,
558                ..
559            }),
560        ) = (&self.left, &self.right)
561        {
562            if left_page.get_page_number() == right_page.get_page_number()
563                && (left_entry > right_entry
564                    || (left_entry == right_entry && (!self.include_left || !self.include_right)))
565            {
566                return None;
567            }
568        }
569
570        loop {
571            if !self.include_right {
572                match self.right.take()?.next(true, &self.manager) {
573                    Ok(right) => {
574                        self.right = right;
575                    }
576                    Err(err) => {
577                        return Some(Err(err));
578                    }
579                }
580            }
581            // Return None if the next state is None
582            self.right.as_ref()?;
583
584            if let (
585                Some(Leaf {
586                    page: left_page,
587                    entry: left_entry,
588                    ..
589                }),
590                Some(Leaf {
591                    page: right_page,
592                    entry: right_entry,
593                    ..
594                }),
595            ) = (&self.left, &self.right)
596            {
597                if left_page.get_page_number() == right_page.get_page_number()
598                    && (left_entry > right_entry
599                        || (left_entry == right_entry && !self.include_left))
600                {
601                    return None;
602                }
603            }
604
605            self.include_right = false;
606            if self.right.as_ref().unwrap().get_entry::<K, V>().is_some() {
607                return self.right.as_ref().map(|s| s.get_entry().unwrap()).map(Ok);
608            }
609        }
610    }
611}
612
613fn find_iter_unbounded<K: Key, V: Value>(
614    page: PageImpl,
615    mut parent: Option<Box<RangeIterState>>,
616    reverse: bool,
617    manager: &TransactionalMemory,
618) -> Result<Option<RangeIterState>> {
619    let node_mem = page.memory();
620    match node_mem[0] {
621        LEAF => {
622            let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
623            let entry = if reverse { accessor.num_pairs() - 1 } else { 0 };
624            Ok(Some(Leaf {
625                page,
626                fixed_key_size: K::fixed_width(),
627                fixed_value_size: V::fixed_width(),
628                entry,
629                parent,
630            }))
631        }
632        BRANCH => {
633            let accessor = BranchAccessor::new(&page, K::fixed_width());
634            let child_index = if reverse {
635                accessor.count_children() - 1
636            } else {
637                0
638            };
639            let child_page_number = accessor.child_page(child_index).unwrap();
640            let child_page = manager.get_page(child_page_number)?;
641            let direction = if reverse { -1isize } else { 1 };
642            parent = Some(Box::new(Internal {
643                page,
644                fixed_key_size: K::fixed_width(),
645                fixed_value_size: V::fixed_width(),
646                child: (isize::try_from(child_index).unwrap() + direction)
647                    .try_into()
648                    .unwrap(),
649                parent,
650            }));
651            find_iter_unbounded::<K, V>(child_page, parent, reverse, manager)
652        }
653        _ => unreachable!(),
654    }
655}
656
657// Returns a bool indicating whether the first entry pointed to by the state is included in the
658// queried range
659fn find_iter_left<K: Key, V: Value>(
660    page: PageImpl,
661    mut parent: Option<Box<RangeIterState>>,
662    query: &[u8],
663    include_query: bool,
664    manager: &TransactionalMemory,
665) -> Result<(bool, Option<RangeIterState>)> {
666    let node_mem = page.memory();
667    match node_mem[0] {
668        LEAF => {
669            let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
670            let (mut position, found) = accessor.position::<K>(query);
671            let include = if position < accessor.num_pairs() {
672                include_query || !found
673            } else {
674                // Back up to the last valid position
675                position -= 1;
676                // and exclude it
677                false
678            };
679            let result = Leaf {
680                page,
681                fixed_key_size: K::fixed_width(),
682                fixed_value_size: V::fixed_width(),
683                entry: position,
684                parent,
685            };
686            Ok((include, Some(result)))
687        }
688        BRANCH => {
689            let accessor = BranchAccessor::new(&page, K::fixed_width());
690            let (child_index, child_page_number) = accessor.child_for_key::<K>(query);
691            let child_page = manager.get_page(child_page_number)?;
692            if child_index < accessor.count_children() - 1 {
693                parent = Some(Box::new(Internal {
694                    page,
695                    fixed_key_size: K::fixed_width(),
696                    fixed_value_size: V::fixed_width(),
697                    child: child_index + 1,
698                    parent,
699                }));
700            }
701            find_iter_left::<K, V>(child_page, parent, query, include_query, manager)
702        }
703        _ => unreachable!(),
704    }
705}
706
707fn find_iter_right<K: Key, V: Value>(
708    page: PageImpl,
709    mut parent: Option<Box<RangeIterState>>,
710    query: &[u8],
711    include_query: bool,
712    manager: &TransactionalMemory,
713) -> Result<(bool, Option<RangeIterState>)> {
714    let node_mem = page.memory();
715    match node_mem[0] {
716        LEAF => {
717            let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
718            let (mut position, found) = accessor.position::<K>(query);
719            let include = if position < accessor.num_pairs() {
720                include_query && found
721            } else {
722                // Back up to the last valid position
723                position -= 1;
724                // and include it
725                true
726            };
727            let result = Leaf {
728                page,
729                fixed_key_size: K::fixed_width(),
730                fixed_value_size: V::fixed_width(),
731                entry: position,
732                parent,
733            };
734            Ok((include, Some(result)))
735        }
736        BRANCH => {
737            let accessor = BranchAccessor::new(&page, K::fixed_width());
738            let (child_index, child_page_number) = accessor.child_for_key::<K>(query);
739            let child_page = manager.get_page(child_page_number)?;
740            if child_index > 0 && accessor.child_page(child_index - 1).is_some() {
741                parent = Some(Box::new(Internal {
742                    page,
743                    fixed_key_size: K::fixed_width(),
744                    fixed_value_size: V::fixed_width(),
745                    child: child_index - 1,
746                    parent,
747                }));
748            }
749            find_iter_right::<K, V>(child_page, parent, query, include_query, manager)
750        }
751        _ => unreachable!(),
752    }
753}