redb/tree_store/
btree_iters.rs

1use crate::Result;
2use crate::tree_store::btree_base::{BRANCH, LEAF};
3use crate::tree_store::btree_base::{BranchAccessor, LeafAccessor};
4use crate::tree_store::btree_iters::RangeIterState::{Internal, Leaf};
5use crate::tree_store::btree_mutator::MutateHelper;
6use crate::tree_store::page_store::{Page, PageImpl, TransactionalMemory};
7use crate::tree_store::{BtreeHeader, PageNumber, PageTrackerPolicy};
8use crate::types::{Key, Value};
9use std::borrow::Borrow;
10use std::collections::Bound;
11use std::marker::PhantomData;
12use std::ops::{Range, RangeBounds};
13use std::sync::{Arc, Mutex};
14
15#[derive(Debug, Clone)]
16pub enum RangeIterState {
17    Leaf {
18        page: PageImpl,
19        fixed_key_size: Option<usize>,
20        fixed_value_size: Option<usize>,
21        entry: usize,
22        parent: Option<Box<RangeIterState>>,
23    },
24    Internal {
25        page: PageImpl,
26        fixed_key_size: Option<usize>,
27        fixed_value_size: Option<usize>,
28        child: usize,
29        parent: Option<Box<RangeIterState>>,
30    },
31}
32
33impl RangeIterState {
34    fn page_number(&self) -> PageNumber {
35        match self {
36            Leaf { page, .. } | Internal { page, .. } => page.get_page_number(),
37        }
38    }
39
40    fn next(self, reverse: bool, manager: &TransactionalMemory) -> Result<Option<RangeIterState>> {
41        match self {
42            Leaf {
43                page,
44                fixed_key_size,
45                fixed_value_size,
46                entry,
47                parent,
48            } => {
49                let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
50                let direction = if reverse { -1 } else { 1 };
51                let next_entry = isize::try_from(entry).unwrap() + direction;
52                if 0 <= next_entry && next_entry < accessor.num_pairs().try_into().unwrap() {
53                    Ok(Some(Leaf {
54                        page,
55                        fixed_key_size,
56                        fixed_value_size,
57                        entry: next_entry.try_into().unwrap(),
58                        parent,
59                    }))
60                } else {
61                    Ok(parent.map(|x| *x))
62                }
63            }
64            Internal {
65                page,
66                fixed_key_size,
67                fixed_value_size,
68                child,
69                mut parent,
70            } => {
71                let accessor = BranchAccessor::new(&page, fixed_key_size);
72                let child_page = accessor.child_page(child).unwrap();
73                let child_page = manager.get_page(child_page)?;
74                let direction = if reverse { -1 } else { 1 };
75                let next_child = isize::try_from(child).unwrap() + direction;
76                if 0 <= next_child && next_child < accessor.count_children().try_into().unwrap() {
77                    parent = Some(Box::new(Internal {
78                        page,
79                        fixed_key_size,
80                        fixed_value_size,
81                        child: next_child.try_into().unwrap(),
82                        parent,
83                    }));
84                }
85                match child_page.memory()[0] {
86                    LEAF => {
87                        let child_accessor = LeafAccessor::new(
88                            child_page.memory(),
89                            fixed_key_size,
90                            fixed_value_size,
91                        );
92                        let entry = if reverse {
93                            child_accessor.num_pairs() - 1
94                        } else {
95                            0
96                        };
97                        Ok(Some(Leaf {
98                            page: child_page,
99                            fixed_key_size,
100                            fixed_value_size,
101                            entry,
102                            parent,
103                        }))
104                    }
105                    BRANCH => {
106                        let child_accessor = BranchAccessor::new(&child_page, fixed_key_size);
107                        let child = if reverse {
108                            child_accessor.count_children() - 1
109                        } else {
110                            0
111                        };
112                        Ok(Some(Internal {
113                            page: child_page,
114                            fixed_key_size,
115                            fixed_value_size,
116                            child,
117                            parent,
118                        }))
119                    }
120                    _ => unreachable!(),
121                }
122            }
123        }
124    }
125
126    fn get_entry<K: Key, V: Value>(&self) -> Option<EntryGuard<K, V>> {
127        match self {
128            Leaf {
129                page,
130                fixed_key_size,
131                fixed_value_size,
132                entry,
133                ..
134            } => {
135                let (key, value) =
136                    LeafAccessor::new(page.memory(), *fixed_key_size, *fixed_value_size)
137                        .entry_ranges(*entry)?;
138                Some(EntryGuard::new(page.clone(), key, value))
139            }
140            Internal { .. } => None,
141        }
142    }
143}
144
145pub(crate) struct EntryGuard<K: Key, V: Value> {
146    page: PageImpl,
147    key_range: Range<usize>,
148    value_range: Range<usize>,
149    _key_type: PhantomData<K>,
150    _value_type: PhantomData<V>,
151}
152
153impl<K: Key, V: Value> EntryGuard<K, V> {
154    fn new(page: PageImpl, key_range: Range<usize>, value_range: Range<usize>) -> Self {
155        Self {
156            page,
157            key_range,
158            value_range,
159            _key_type: Default::default(),
160            _value_type: Default::default(),
161        }
162    }
163
164    pub(crate) fn key_data(&self) -> Vec<u8> {
165        self.page.memory()[self.key_range.clone()].to_vec()
166    }
167
168    pub(crate) fn key(&self) -> K::SelfType<'_> {
169        K::from_bytes(&self.page.memory()[self.key_range.clone()])
170    }
171
172    pub(crate) fn value(&self) -> V::SelfType<'_> {
173        V::from_bytes(&self.page.memory()[self.value_range.clone()])
174    }
175
176    pub(crate) fn into_raw(self) -> (PageImpl, Range<usize>, Range<usize>) {
177        (self.page, self.key_range, self.value_range)
178    }
179}
180
181pub(crate) struct AllPageNumbersBtreeIter {
182    next: Option<RangeIterState>,
183    manager: Arc<TransactionalMemory>,
184}
185
186impl AllPageNumbersBtreeIter {
187    pub(crate) fn new(
188        root: PageNumber,
189        fixed_key_size: Option<usize>,
190        fixed_value_size: Option<usize>,
191        manager: Arc<TransactionalMemory>,
192    ) -> Result<Self> {
193        let root_page = manager.get_page(root)?;
194        let node_mem = root_page.memory();
195        let start = match node_mem[0] {
196            LEAF => Leaf {
197                page: root_page,
198                fixed_key_size,
199                fixed_value_size,
200                entry: 0,
201                parent: None,
202            },
203            BRANCH => Internal {
204                page: root_page,
205                fixed_key_size,
206                fixed_value_size,
207                child: 0,
208                parent: None,
209            },
210            _ => unreachable!(),
211        };
212        Ok(Self {
213            next: Some(start),
214            manager,
215        })
216    }
217}
218
219impl Iterator for AllPageNumbersBtreeIter {
220    type Item = Result<PageNumber>;
221
222    fn next(&mut self) -> Option<Self::Item> {
223        loop {
224            let state = self.next.take()?;
225            let value = state.page_number();
226            // Only return each page number once
227            let once = match state {
228                Leaf { entry, .. } => entry == 0,
229                Internal { child, .. } => child == 0,
230            };
231            match state.next(false, &self.manager) {
232                Ok(next) => {
233                    self.next = next;
234                }
235                Err(err) => {
236                    return Some(Err(err));
237                }
238            }
239            if once {
240                return Some(Ok(value));
241            }
242        }
243    }
244}
245
246pub(crate) struct BtreeExtractIf<
247    'a,
248    K: Key + 'static,
249    V: Value + 'static,
250    F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool,
251> {
252    root: &'a mut Option<BtreeHeader>,
253    inner: BtreeRangeIter<K, V>,
254    predicate: F,
255    free_on_drop: Vec<PageNumber>,
256    master_free_list: Arc<Mutex<Vec<PageNumber>>>,
257    allocated: Arc<Mutex<PageTrackerPolicy>>,
258    mem: Arc<TransactionalMemory>,
259}
260
261impl<'a, K: Key, V: Value, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>
262    BtreeExtractIf<'a, K, V, F>
263{
264    pub(crate) fn new(
265        root: &'a mut Option<BtreeHeader>,
266        inner: BtreeRangeIter<K, V>,
267        predicate: F,
268        master_free_list: Arc<Mutex<Vec<PageNumber>>>,
269        allocated: Arc<Mutex<PageTrackerPolicy>>,
270        mem: Arc<TransactionalMemory>,
271    ) -> Self {
272        Self {
273            root,
274            inner,
275            predicate,
276            free_on_drop: vec![],
277            master_free_list,
278            allocated,
279            mem,
280        }
281    }
282}
283
284impl<K: Key, V: Value, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool> Iterator
285    for BtreeExtractIf<'_, K, V, F>
286{
287    type Item = Result<EntryGuard<K, V>>;
288
289    fn next(&mut self) -> Option<Self::Item> {
290        let mut item = self.inner.next();
291        while let Some(Ok(ref entry)) = item {
292            if (self.predicate)(entry.key(), entry.value()) {
293                let mut operation: MutateHelper<'_, '_, K, V> = MutateHelper::new_do_not_modify(
294                    self.root,
295                    self.mem.clone(),
296                    &mut self.free_on_drop,
297                    self.allocated.clone(),
298                );
299                match operation.delete(&entry.key()) {
300                    Ok(x) => {
301                        assert!(x.is_some());
302                    }
303                    Err(x) => {
304                        return Some(Err(x));
305                    }
306                }
307                break;
308            }
309            item = self.inner.next();
310        }
311        item
312    }
313}
314
315impl<K: Key, V: Value, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>
316    DoubleEndedIterator for BtreeExtractIf<'_, K, V, F>
317{
318    fn next_back(&mut self) -> Option<Self::Item> {
319        let mut item = self.inner.next_back();
320        while let Some(Ok(ref entry)) = item {
321            if (self.predicate)(entry.key(), entry.value()) {
322                let mut operation: MutateHelper<'_, '_, K, V> = MutateHelper::new_do_not_modify(
323                    self.root,
324                    self.mem.clone(),
325                    &mut self.free_on_drop,
326                    self.allocated.clone(),
327                );
328                match operation.delete(&entry.key()) {
329                    Ok(x) => {
330                        assert!(x.is_some());
331                    }
332                    Err(x) => {
333                        return Some(Err(x));
334                    }
335                }
336                break;
337            }
338            item = self.inner.next_back();
339        }
340        item
341    }
342}
343
344impl<K: Key, V: Value, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool> Drop
345    for BtreeExtractIf<'_, K, V, F>
346{
347    fn drop(&mut self) {
348        let mut master_free_list = self.master_free_list.lock().unwrap();
349        let mut allocated = self.allocated.lock().unwrap();
350        for page in self.free_on_drop.drain(..) {
351            if !self.mem.free_if_uncommitted(page, &mut allocated) {
352                master_free_list.push(page);
353            }
354        }
355    }
356}
357
358#[derive(Clone)]
359pub(crate) struct BtreeRangeIter<K: Key + 'static, V: Value + 'static> {
360    left: Option<RangeIterState>, // Exclusive. The previous element returned
361    right: Option<RangeIterState>, // Exclusive. The previous element returned
362    include_left: bool,           // left is inclusive, instead of exclusive
363    include_right: bool,          // right is inclusive, instead of exclusive
364    manager: Arc<TransactionalMemory>,
365    _key_type: PhantomData<K>,
366    _value_type: PhantomData<V>,
367}
368
369impl<K: Key + 'static, V: Value + 'static> BtreeRangeIter<K, V> {
370    pub(crate) fn new<'a, T: RangeBounds<KR>, KR: Borrow<K::SelfType<'a>>>(
371        query_range: &'_ T,
372        table_root: Option<PageNumber>,
373        manager: Arc<TransactionalMemory>,
374    ) -> Result<Self> {
375        if let Some(root) = table_root {
376            let (include_left, left) = match query_range.start_bound() {
377                Bound::Included(k) => find_iter_left::<K, V>(
378                    manager.get_page(root)?,
379                    None,
380                    K::as_bytes(k.borrow()).as_ref(),
381                    true,
382                    &manager,
383                )?,
384                Bound::Excluded(k) => find_iter_left::<K, V>(
385                    manager.get_page(root)?,
386                    None,
387                    K::as_bytes(k.borrow()).as_ref(),
388                    false,
389                    &manager,
390                )?,
391                Bound::Unbounded => {
392                    let state = find_iter_unbounded::<K, V>(
393                        manager.get_page(root)?,
394                        None,
395                        false,
396                        &manager,
397                    )?;
398                    (true, state)
399                }
400            };
401            let (include_right, right) = match query_range.end_bound() {
402                Bound::Included(k) => find_iter_right::<K, V>(
403                    manager.get_page(root)?,
404                    None,
405                    K::as_bytes(k.borrow()).as_ref(),
406                    true,
407                    &manager,
408                )?,
409                Bound::Excluded(k) => find_iter_right::<K, V>(
410                    manager.get_page(root)?,
411                    None,
412                    K::as_bytes(k.borrow()).as_ref(),
413                    false,
414                    &manager,
415                )?,
416                Bound::Unbounded => {
417                    let state =
418                        find_iter_unbounded::<K, V>(manager.get_page(root)?, None, true, &manager)?;
419                    (true, state)
420                }
421            };
422            Ok(Self {
423                left,
424                right,
425                include_left,
426                include_right,
427                manager,
428                _key_type: Default::default(),
429                _value_type: Default::default(),
430            })
431        } else {
432            Ok(Self {
433                left: None,
434                right: None,
435                include_left: false,
436                include_right: false,
437                manager,
438                _key_type: Default::default(),
439                _value_type: Default::default(),
440            })
441        }
442    }
443}
444
445impl<K: Key, V: Value> Iterator for BtreeRangeIter<K, V> {
446    type Item = Result<EntryGuard<K, V>>;
447
448    fn next(&mut self) -> Option<Self::Item> {
449        if let (
450            Some(Leaf {
451                page: left_page,
452                entry: left_entry,
453                ..
454            }),
455            Some(Leaf {
456                page: right_page,
457                entry: right_entry,
458                ..
459            }),
460        ) = (&self.left, &self.right)
461        {
462            if left_page.get_page_number() == right_page.get_page_number()
463                && (left_entry > right_entry
464                    || (left_entry == right_entry && (!self.include_left || !self.include_right)))
465            {
466                return None;
467            }
468        }
469
470        loop {
471            if !self.include_left {
472                match self.left.take()?.next(false, &self.manager) {
473                    Ok(left) => {
474                        self.left = left;
475                    }
476                    Err(err) => {
477                        return Some(Err(err));
478                    }
479                }
480            }
481            // Return None if the next state is None
482            self.left.as_ref()?;
483
484            if let (
485                Some(Leaf {
486                    page: left_page,
487                    entry: left_entry,
488                    ..
489                }),
490                Some(Leaf {
491                    page: right_page,
492                    entry: right_entry,
493                    ..
494                }),
495            ) = (&self.left, &self.right)
496            {
497                if left_page.get_page_number() == right_page.get_page_number()
498                    && (left_entry > right_entry
499                        || (left_entry == right_entry && !self.include_right))
500                {
501                    return None;
502                }
503            }
504
505            self.include_left = false;
506            if self.left.as_ref().unwrap().get_entry::<K, V>().is_some() {
507                return self.left.as_ref().map(|s| s.get_entry().unwrap()).map(Ok);
508            }
509        }
510    }
511}
512
513impl<K: Key, V: Value> DoubleEndedIterator for BtreeRangeIter<K, V> {
514    fn next_back(&mut self) -> Option<Self::Item> {
515        if let (
516            Some(Leaf {
517                page: left_page,
518                entry: left_entry,
519                ..
520            }),
521            Some(Leaf {
522                page: right_page,
523                entry: right_entry,
524                ..
525            }),
526        ) = (&self.left, &self.right)
527        {
528            if left_page.get_page_number() == right_page.get_page_number()
529                && (left_entry > right_entry
530                    || (left_entry == right_entry && (!self.include_left || !self.include_right)))
531            {
532                return None;
533            }
534        }
535
536        loop {
537            if !self.include_right {
538                match self.right.take()?.next(true, &self.manager) {
539                    Ok(right) => {
540                        self.right = right;
541                    }
542                    Err(err) => {
543                        return Some(Err(err));
544                    }
545                }
546            }
547            // Return None if the next state is None
548            self.right.as_ref()?;
549
550            if let (
551                Some(Leaf {
552                    page: left_page,
553                    entry: left_entry,
554                    ..
555                }),
556                Some(Leaf {
557                    page: right_page,
558                    entry: right_entry,
559                    ..
560                }),
561            ) = (&self.left, &self.right)
562            {
563                if left_page.get_page_number() == right_page.get_page_number()
564                    && (left_entry > right_entry
565                        || (left_entry == right_entry && !self.include_left))
566                {
567                    return None;
568                }
569            }
570
571            self.include_right = false;
572            if self.right.as_ref().unwrap().get_entry::<K, V>().is_some() {
573                return self.right.as_ref().map(|s| s.get_entry().unwrap()).map(Ok);
574            }
575        }
576    }
577}
578
579fn find_iter_unbounded<K: Key, V: Value>(
580    page: PageImpl,
581    mut parent: Option<Box<RangeIterState>>,
582    reverse: bool,
583    manager: &TransactionalMemory,
584) -> Result<Option<RangeIterState>> {
585    let node_mem = page.memory();
586    match node_mem[0] {
587        LEAF => {
588            let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
589            let entry = if reverse { accessor.num_pairs() - 1 } else { 0 };
590            Ok(Some(Leaf {
591                page,
592                fixed_key_size: K::fixed_width(),
593                fixed_value_size: V::fixed_width(),
594                entry,
595                parent,
596            }))
597        }
598        BRANCH => {
599            let accessor = BranchAccessor::new(&page, K::fixed_width());
600            let child_index = if reverse {
601                accessor.count_children() - 1
602            } else {
603                0
604            };
605            let child_page_number = accessor.child_page(child_index).unwrap();
606            let child_page = manager.get_page(child_page_number)?;
607            let direction = if reverse { -1isize } else { 1 };
608            parent = Some(Box::new(Internal {
609                page,
610                fixed_key_size: K::fixed_width(),
611                fixed_value_size: V::fixed_width(),
612                child: (isize::try_from(child_index).unwrap() + direction)
613                    .try_into()
614                    .unwrap(),
615                parent,
616            }));
617            find_iter_unbounded::<K, V>(child_page, parent, reverse, manager)
618        }
619        _ => unreachable!(),
620    }
621}
622
623// Returns a bool indicating whether the first entry pointed to by the state is included in the
624// queried range
625fn find_iter_left<K: Key, V: Value>(
626    page: PageImpl,
627    mut parent: Option<Box<RangeIterState>>,
628    query: &[u8],
629    include_query: bool,
630    manager: &TransactionalMemory,
631) -> Result<(bool, Option<RangeIterState>)> {
632    let node_mem = page.memory();
633    match node_mem[0] {
634        LEAF => {
635            let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
636            let (mut position, found) = accessor.position::<K>(query);
637            let include = if position < accessor.num_pairs() {
638                include_query || !found
639            } else {
640                // Back up to the last valid position
641                position -= 1;
642                // and exclude it
643                false
644            };
645            let result = Leaf {
646                page,
647                fixed_key_size: K::fixed_width(),
648                fixed_value_size: V::fixed_width(),
649                entry: position,
650                parent,
651            };
652            Ok((include, Some(result)))
653        }
654        BRANCH => {
655            let accessor = BranchAccessor::new(&page, K::fixed_width());
656            let (child_index, child_page_number) = accessor.child_for_key::<K>(query);
657            let child_page = manager.get_page(child_page_number)?;
658            if child_index < accessor.count_children() - 1 {
659                parent = Some(Box::new(Internal {
660                    page,
661                    fixed_key_size: K::fixed_width(),
662                    fixed_value_size: V::fixed_width(),
663                    child: child_index + 1,
664                    parent,
665                }));
666            }
667            find_iter_left::<K, V>(child_page, parent, query, include_query, manager)
668        }
669        _ => unreachable!(),
670    }
671}
672
673fn find_iter_right<K: Key, V: Value>(
674    page: PageImpl,
675    mut parent: Option<Box<RangeIterState>>,
676    query: &[u8],
677    include_query: bool,
678    manager: &TransactionalMemory,
679) -> Result<(bool, Option<RangeIterState>)> {
680    let node_mem = page.memory();
681    match node_mem[0] {
682        LEAF => {
683            let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
684            let (mut position, found) = accessor.position::<K>(query);
685            let include = if position < accessor.num_pairs() {
686                include_query && found
687            } else {
688                // Back up to the last valid position
689                position -= 1;
690                // and include it
691                true
692            };
693            let result = Leaf {
694                page,
695                fixed_key_size: K::fixed_width(),
696                fixed_value_size: V::fixed_width(),
697                entry: position,
698                parent,
699            };
700            Ok((include, Some(result)))
701        }
702        BRANCH => {
703            let accessor = BranchAccessor::new(&page, K::fixed_width());
704            let (child_index, child_page_number) = accessor.child_for_key::<K>(query);
705            let child_page = manager.get_page(child_page_number)?;
706            if child_index > 0 && accessor.child_page(child_index - 1).is_some() {
707                parent = Some(Box::new(Internal {
708                    page,
709                    fixed_key_size: K::fixed_width(),
710                    fixed_value_size: V::fixed_width(),
711                    child: child_index - 1,
712                    parent,
713                }));
714            }
715            find_iter_right::<K, V>(child_page, parent, query, include_query, manager)
716        }
717        _ => unreachable!(),
718    }
719}