1use crate::Result;
2use crate::tree_store::btree_base::{BRANCH, LEAF};
3use crate::tree_store::btree_base::{BranchAccessor, LeafAccessor};
4use crate::tree_store::btree_iters::RangeIterState::{Internal, Leaf};
5use crate::tree_store::btree_mutator::MutateHelper;
6use crate::tree_store::page_store::{Page, PageImpl, TransactionalMemory};
7use crate::tree_store::{BtreeHeader, PageNumber, PageTrackerPolicy};
8use crate::types::{Key, Value};
9use std::borrow::Borrow;
10use std::collections::Bound;
11use std::marker::PhantomData;
12use std::ops::{Range, RangeBounds};
13use std::sync::{Arc, Mutex};
14
15#[derive(Debug, Clone)]
16pub enum RangeIterState {
17 Leaf {
18 page: PageImpl,
19 fixed_key_size: Option<usize>,
20 fixed_value_size: Option<usize>,
21 entry: usize,
22 parent: Option<Box<RangeIterState>>,
23 },
24 Internal {
25 page: PageImpl,
26 fixed_key_size: Option<usize>,
27 fixed_value_size: Option<usize>,
28 child: usize,
29 parent: Option<Box<RangeIterState>>,
30 },
31}
32
33impl RangeIterState {
34 fn page_number(&self) -> PageNumber {
35 match self {
36 Leaf { page, .. } | Internal { page, .. } => page.get_page_number(),
37 }
38 }
39
40 fn next(self, reverse: bool, manager: &TransactionalMemory) -> Result<Option<RangeIterState>> {
41 match self {
42 Leaf {
43 page,
44 fixed_key_size,
45 fixed_value_size,
46 entry,
47 parent,
48 } => {
49 let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
50 let direction = if reverse { -1 } else { 1 };
51 let next_entry = isize::try_from(entry).unwrap() + direction;
52 if 0 <= next_entry && next_entry < accessor.num_pairs().try_into().unwrap() {
53 Ok(Some(Leaf {
54 page,
55 fixed_key_size,
56 fixed_value_size,
57 entry: next_entry.try_into().unwrap(),
58 parent,
59 }))
60 } else {
61 Ok(parent.map(|x| *x))
62 }
63 }
64 Internal {
65 page,
66 fixed_key_size,
67 fixed_value_size,
68 child,
69 mut parent,
70 } => {
71 let accessor = BranchAccessor::new(&page, fixed_key_size);
72 let child_page = accessor.child_page(child).unwrap();
73 let child_page = manager.get_page(child_page)?;
74 let direction = if reverse { -1 } else { 1 };
75 let next_child = isize::try_from(child).unwrap() + direction;
76 if 0 <= next_child && next_child < accessor.count_children().try_into().unwrap() {
77 parent = Some(Box::new(Internal {
78 page,
79 fixed_key_size,
80 fixed_value_size,
81 child: next_child.try_into().unwrap(),
82 parent,
83 }));
84 }
85 match child_page.memory()[0] {
86 LEAF => {
87 let child_accessor = LeafAccessor::new(
88 child_page.memory(),
89 fixed_key_size,
90 fixed_value_size,
91 );
92 let entry = if reverse {
93 child_accessor.num_pairs() - 1
94 } else {
95 0
96 };
97 Ok(Some(Leaf {
98 page: child_page,
99 fixed_key_size,
100 fixed_value_size,
101 entry,
102 parent,
103 }))
104 }
105 BRANCH => {
106 let child_accessor = BranchAccessor::new(&child_page, fixed_key_size);
107 let child = if reverse {
108 child_accessor.count_children() - 1
109 } else {
110 0
111 };
112 Ok(Some(Internal {
113 page: child_page,
114 fixed_key_size,
115 fixed_value_size,
116 child,
117 parent,
118 }))
119 }
120 _ => unreachable!(),
121 }
122 }
123 }
124 }
125
126 fn get_entry<K: Key, V: Value>(&self) -> Option<EntryGuard<K, V>> {
127 match self {
128 Leaf {
129 page,
130 fixed_key_size,
131 fixed_value_size,
132 entry,
133 ..
134 } => {
135 let (key, value) =
136 LeafAccessor::new(page.memory(), *fixed_key_size, *fixed_value_size)
137 .entry_ranges(*entry)?;
138 Some(EntryGuard::new(page.clone(), key, value))
139 }
140 Internal { .. } => None,
141 }
142 }
143}
144
145pub(crate) struct EntryGuard<K: Key, V: Value> {
146 page: PageImpl,
147 key_range: Range<usize>,
148 value_range: Range<usize>,
149 _key_type: PhantomData<K>,
150 _value_type: PhantomData<V>,
151}
152
153impl<K: Key, V: Value> EntryGuard<K, V> {
154 fn new(page: PageImpl, key_range: Range<usize>, value_range: Range<usize>) -> Self {
155 Self {
156 page,
157 key_range,
158 value_range,
159 _key_type: Default::default(),
160 _value_type: Default::default(),
161 }
162 }
163
164 pub(crate) fn key_data(&self) -> Vec<u8> {
165 self.page.memory()[self.key_range.clone()].to_vec()
166 }
167
168 pub(crate) fn key(&self) -> K::SelfType<'_> {
169 K::from_bytes(&self.page.memory()[self.key_range.clone()])
170 }
171
172 pub(crate) fn value(&self) -> V::SelfType<'_> {
173 V::from_bytes(&self.page.memory()[self.value_range.clone()])
174 }
175
176 pub(crate) fn into_raw(self) -> (PageImpl, Range<usize>, Range<usize>) {
177 (self.page, self.key_range, self.value_range)
178 }
179}
180
181pub(crate) struct AllPageNumbersBtreeIter {
182 next: Option<RangeIterState>,
183 manager: Arc<TransactionalMemory>,
184}
185
186impl AllPageNumbersBtreeIter {
187 pub(crate) fn new(
188 root: PageNumber,
189 fixed_key_size: Option<usize>,
190 fixed_value_size: Option<usize>,
191 manager: Arc<TransactionalMemory>,
192 ) -> Result<Self> {
193 let root_page = manager.get_page(root)?;
194 let node_mem = root_page.memory();
195 let start = match node_mem[0] {
196 LEAF => Leaf {
197 page: root_page,
198 fixed_key_size,
199 fixed_value_size,
200 entry: 0,
201 parent: None,
202 },
203 BRANCH => Internal {
204 page: root_page,
205 fixed_key_size,
206 fixed_value_size,
207 child: 0,
208 parent: None,
209 },
210 _ => unreachable!(),
211 };
212 Ok(Self {
213 next: Some(start),
214 manager,
215 })
216 }
217}
218
219impl Iterator for AllPageNumbersBtreeIter {
220 type Item = Result<PageNumber>;
221
222 fn next(&mut self) -> Option<Self::Item> {
223 loop {
224 let state = self.next.take()?;
225 let value = state.page_number();
226 let once = match state {
228 Leaf { entry, .. } => entry == 0,
229 Internal { child, .. } => child == 0,
230 };
231 match state.next(false, &self.manager) {
232 Ok(next) => {
233 self.next = next;
234 }
235 Err(err) => {
236 return Some(Err(err));
237 }
238 }
239 if once {
240 return Some(Ok(value));
241 }
242 }
243 }
244}
245
246pub(crate) struct BtreeExtractIf<
247 'a,
248 K: Key + 'static,
249 V: Value + 'static,
250 F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool,
251> {
252 root: &'a mut Option<BtreeHeader>,
253 inner: BtreeRangeIter<K, V>,
254 predicate: F,
255 free_on_drop: Vec<PageNumber>,
256 master_free_list: Arc<Mutex<Vec<PageNumber>>>,
257 allocated: Arc<Mutex<PageTrackerPolicy>>,
258 mem: Arc<TransactionalMemory>,
259}
260
261impl<'a, K: Key, V: Value, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>
262 BtreeExtractIf<'a, K, V, F>
263{
264 pub(crate) fn new(
265 root: &'a mut Option<BtreeHeader>,
266 inner: BtreeRangeIter<K, V>,
267 predicate: F,
268 master_free_list: Arc<Mutex<Vec<PageNumber>>>,
269 allocated: Arc<Mutex<PageTrackerPolicy>>,
270 mem: Arc<TransactionalMemory>,
271 ) -> Self {
272 Self {
273 root,
274 inner,
275 predicate,
276 free_on_drop: vec![],
277 master_free_list,
278 allocated,
279 mem,
280 }
281 }
282}
283
284impl<K: Key, V: Value, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool> Iterator
285 for BtreeExtractIf<'_, K, V, F>
286{
287 type Item = Result<EntryGuard<K, V>>;
288
289 fn next(&mut self) -> Option<Self::Item> {
290 let mut item = self.inner.next();
291 while let Some(Ok(ref entry)) = item {
292 if (self.predicate)(entry.key(), entry.value()) {
293 let mut operation: MutateHelper<'_, '_, K, V> = MutateHelper::new_do_not_modify(
294 self.root,
295 self.mem.clone(),
296 &mut self.free_on_drop,
297 self.allocated.clone(),
298 );
299 match operation.delete(&entry.key()) {
300 Ok(x) => {
301 assert!(x.is_some());
302 }
303 Err(x) => {
304 return Some(Err(x));
305 }
306 }
307 break;
308 }
309 item = self.inner.next();
310 }
311 item
312 }
313}
314
315impl<K: Key, V: Value, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>
316 DoubleEndedIterator for BtreeExtractIf<'_, K, V, F>
317{
318 fn next_back(&mut self) -> Option<Self::Item> {
319 let mut item = self.inner.next_back();
320 while let Some(Ok(ref entry)) = item {
321 if (self.predicate)(entry.key(), entry.value()) {
322 let mut operation: MutateHelper<'_, '_, K, V> = MutateHelper::new_do_not_modify(
323 self.root,
324 self.mem.clone(),
325 &mut self.free_on_drop,
326 self.allocated.clone(),
327 );
328 match operation.delete(&entry.key()) {
329 Ok(x) => {
330 assert!(x.is_some());
331 }
332 Err(x) => {
333 return Some(Err(x));
334 }
335 }
336 break;
337 }
338 item = self.inner.next_back();
339 }
340 item
341 }
342}
343
344impl<K: Key, V: Value, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool> Drop
345 for BtreeExtractIf<'_, K, V, F>
346{
347 fn drop(&mut self) {
348 let mut master_free_list = self.master_free_list.lock().unwrap();
349 let mut allocated = self.allocated.lock().unwrap();
350 for page in self.free_on_drop.drain(..) {
351 if !self.mem.free_if_uncommitted(page, &mut allocated) {
352 master_free_list.push(page);
353 }
354 }
355 }
356}
357
358#[derive(Clone)]
359pub(crate) struct BtreeRangeIter<K: Key + 'static, V: Value + 'static> {
360 left: Option<RangeIterState>, right: Option<RangeIterState>, include_left: bool, include_right: bool, manager: Arc<TransactionalMemory>,
365 _key_type: PhantomData<K>,
366 _value_type: PhantomData<V>,
367}
368
369impl<K: Key + 'static, V: Value + 'static> BtreeRangeIter<K, V> {
370 pub(crate) fn new<'a, T: RangeBounds<KR>, KR: Borrow<K::SelfType<'a>>>(
371 query_range: &'_ T,
372 table_root: Option<PageNumber>,
373 manager: Arc<TransactionalMemory>,
374 ) -> Result<Self> {
375 if let Some(root) = table_root {
376 let (include_left, left) = match query_range.start_bound() {
377 Bound::Included(k) => find_iter_left::<K, V>(
378 manager.get_page(root)?,
379 None,
380 K::as_bytes(k.borrow()).as_ref(),
381 true,
382 &manager,
383 )?,
384 Bound::Excluded(k) => find_iter_left::<K, V>(
385 manager.get_page(root)?,
386 None,
387 K::as_bytes(k.borrow()).as_ref(),
388 false,
389 &manager,
390 )?,
391 Bound::Unbounded => {
392 let state = find_iter_unbounded::<K, V>(
393 manager.get_page(root)?,
394 None,
395 false,
396 &manager,
397 )?;
398 (true, state)
399 }
400 };
401 let (include_right, right) = match query_range.end_bound() {
402 Bound::Included(k) => find_iter_right::<K, V>(
403 manager.get_page(root)?,
404 None,
405 K::as_bytes(k.borrow()).as_ref(),
406 true,
407 &manager,
408 )?,
409 Bound::Excluded(k) => find_iter_right::<K, V>(
410 manager.get_page(root)?,
411 None,
412 K::as_bytes(k.borrow()).as_ref(),
413 false,
414 &manager,
415 )?,
416 Bound::Unbounded => {
417 let state =
418 find_iter_unbounded::<K, V>(manager.get_page(root)?, None, true, &manager)?;
419 (true, state)
420 }
421 };
422 Ok(Self {
423 left,
424 right,
425 include_left,
426 include_right,
427 manager,
428 _key_type: Default::default(),
429 _value_type: Default::default(),
430 })
431 } else {
432 Ok(Self {
433 left: None,
434 right: None,
435 include_left: false,
436 include_right: false,
437 manager,
438 _key_type: Default::default(),
439 _value_type: Default::default(),
440 })
441 }
442 }
443}
444
445impl<K: Key, V: Value> Iterator for BtreeRangeIter<K, V> {
446 type Item = Result<EntryGuard<K, V>>;
447
448 fn next(&mut self) -> Option<Self::Item> {
449 if let (
450 Some(Leaf {
451 page: left_page,
452 entry: left_entry,
453 ..
454 }),
455 Some(Leaf {
456 page: right_page,
457 entry: right_entry,
458 ..
459 }),
460 ) = (&self.left, &self.right)
461 {
462 if left_page.get_page_number() == right_page.get_page_number()
463 && (left_entry > right_entry
464 || (left_entry == right_entry && (!self.include_left || !self.include_right)))
465 {
466 return None;
467 }
468 }
469
470 loop {
471 if !self.include_left {
472 match self.left.take()?.next(false, &self.manager) {
473 Ok(left) => {
474 self.left = left;
475 }
476 Err(err) => {
477 return Some(Err(err));
478 }
479 }
480 }
481 self.left.as_ref()?;
483
484 if let (
485 Some(Leaf {
486 page: left_page,
487 entry: left_entry,
488 ..
489 }),
490 Some(Leaf {
491 page: right_page,
492 entry: right_entry,
493 ..
494 }),
495 ) = (&self.left, &self.right)
496 {
497 if left_page.get_page_number() == right_page.get_page_number()
498 && (left_entry > right_entry
499 || (left_entry == right_entry && !self.include_right))
500 {
501 return None;
502 }
503 }
504
505 self.include_left = false;
506 if self.left.as_ref().unwrap().get_entry::<K, V>().is_some() {
507 return self.left.as_ref().map(|s| s.get_entry().unwrap()).map(Ok);
508 }
509 }
510 }
511}
512
513impl<K: Key, V: Value> DoubleEndedIterator for BtreeRangeIter<K, V> {
514 fn next_back(&mut self) -> Option<Self::Item> {
515 if let (
516 Some(Leaf {
517 page: left_page,
518 entry: left_entry,
519 ..
520 }),
521 Some(Leaf {
522 page: right_page,
523 entry: right_entry,
524 ..
525 }),
526 ) = (&self.left, &self.right)
527 {
528 if left_page.get_page_number() == right_page.get_page_number()
529 && (left_entry > right_entry
530 || (left_entry == right_entry && (!self.include_left || !self.include_right)))
531 {
532 return None;
533 }
534 }
535
536 loop {
537 if !self.include_right {
538 match self.right.take()?.next(true, &self.manager) {
539 Ok(right) => {
540 self.right = right;
541 }
542 Err(err) => {
543 return Some(Err(err));
544 }
545 }
546 }
547 self.right.as_ref()?;
549
550 if let (
551 Some(Leaf {
552 page: left_page,
553 entry: left_entry,
554 ..
555 }),
556 Some(Leaf {
557 page: right_page,
558 entry: right_entry,
559 ..
560 }),
561 ) = (&self.left, &self.right)
562 {
563 if left_page.get_page_number() == right_page.get_page_number()
564 && (left_entry > right_entry
565 || (left_entry == right_entry && !self.include_left))
566 {
567 return None;
568 }
569 }
570
571 self.include_right = false;
572 if self.right.as_ref().unwrap().get_entry::<K, V>().is_some() {
573 return self.right.as_ref().map(|s| s.get_entry().unwrap()).map(Ok);
574 }
575 }
576 }
577}
578
579fn find_iter_unbounded<K: Key, V: Value>(
580 page: PageImpl,
581 mut parent: Option<Box<RangeIterState>>,
582 reverse: bool,
583 manager: &TransactionalMemory,
584) -> Result<Option<RangeIterState>> {
585 let node_mem = page.memory();
586 match node_mem[0] {
587 LEAF => {
588 let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
589 let entry = if reverse { accessor.num_pairs() - 1 } else { 0 };
590 Ok(Some(Leaf {
591 page,
592 fixed_key_size: K::fixed_width(),
593 fixed_value_size: V::fixed_width(),
594 entry,
595 parent,
596 }))
597 }
598 BRANCH => {
599 let accessor = BranchAccessor::new(&page, K::fixed_width());
600 let child_index = if reverse {
601 accessor.count_children() - 1
602 } else {
603 0
604 };
605 let child_page_number = accessor.child_page(child_index).unwrap();
606 let child_page = manager.get_page(child_page_number)?;
607 let direction = if reverse { -1isize } else { 1 };
608 parent = Some(Box::new(Internal {
609 page,
610 fixed_key_size: K::fixed_width(),
611 fixed_value_size: V::fixed_width(),
612 child: (isize::try_from(child_index).unwrap() + direction)
613 .try_into()
614 .unwrap(),
615 parent,
616 }));
617 find_iter_unbounded::<K, V>(child_page, parent, reverse, manager)
618 }
619 _ => unreachable!(),
620 }
621}
622
623fn find_iter_left<K: Key, V: Value>(
626 page: PageImpl,
627 mut parent: Option<Box<RangeIterState>>,
628 query: &[u8],
629 include_query: bool,
630 manager: &TransactionalMemory,
631) -> Result<(bool, Option<RangeIterState>)> {
632 let node_mem = page.memory();
633 match node_mem[0] {
634 LEAF => {
635 let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
636 let (mut position, found) = accessor.position::<K>(query);
637 let include = if position < accessor.num_pairs() {
638 include_query || !found
639 } else {
640 position -= 1;
642 false
644 };
645 let result = Leaf {
646 page,
647 fixed_key_size: K::fixed_width(),
648 fixed_value_size: V::fixed_width(),
649 entry: position,
650 parent,
651 };
652 Ok((include, Some(result)))
653 }
654 BRANCH => {
655 let accessor = BranchAccessor::new(&page, K::fixed_width());
656 let (child_index, child_page_number) = accessor.child_for_key::<K>(query);
657 let child_page = manager.get_page(child_page_number)?;
658 if child_index < accessor.count_children() - 1 {
659 parent = Some(Box::new(Internal {
660 page,
661 fixed_key_size: K::fixed_width(),
662 fixed_value_size: V::fixed_width(),
663 child: child_index + 1,
664 parent,
665 }));
666 }
667 find_iter_left::<K, V>(child_page, parent, query, include_query, manager)
668 }
669 _ => unreachable!(),
670 }
671}
672
673fn find_iter_right<K: Key, V: Value>(
674 page: PageImpl,
675 mut parent: Option<Box<RangeIterState>>,
676 query: &[u8],
677 include_query: bool,
678 manager: &TransactionalMemory,
679) -> Result<(bool, Option<RangeIterState>)> {
680 let node_mem = page.memory();
681 match node_mem[0] {
682 LEAF => {
683 let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
684 let (mut position, found) = accessor.position::<K>(query);
685 let include = if position < accessor.num_pairs() {
686 include_query && found
687 } else {
688 position -= 1;
690 true
692 };
693 let result = Leaf {
694 page,
695 fixed_key_size: K::fixed_width(),
696 fixed_value_size: V::fixed_width(),
697 entry: position,
698 parent,
699 };
700 Ok((include, Some(result)))
701 }
702 BRANCH => {
703 let accessor = BranchAccessor::new(&page, K::fixed_width());
704 let (child_index, child_page_number) = accessor.child_for_key::<K>(query);
705 let child_page = manager.get_page(child_page_number)?;
706 if child_index > 0 && accessor.child_page(child_index - 1).is_some() {
707 parent = Some(Box::new(Internal {
708 page,
709 fixed_key_size: K::fixed_width(),
710 fixed_value_size: V::fixed_width(),
711 child: child_index - 1,
712 parent,
713 }));
714 }
715 find_iter_right::<K, V>(child_page, parent, query, include_query, manager)
716 }
717 _ => unreachable!(),
718 }
719}