1use crate::Result;
2use crate::tree_store::btree_base::{BRANCH, LEAF};
3use crate::tree_store::btree_base::{BranchAccessor, LeafAccessor};
4use crate::tree_store::btree_iters::RangeIterState::{Internal, Leaf};
5use crate::tree_store::btree_mutator::MutateHelper;
6use crate::tree_store::page_store::{Page, PageImpl, TransactionalMemory};
7use crate::tree_store::{BtreeHeader, PageNumber, PageTrackerPolicy};
8use crate::types::{Key, Value};
9use Bound::{Excluded, Included, Unbounded};
10use std::borrow::Borrow;
11use std::collections::Bound;
12use std::marker::PhantomData;
13use std::ops::{Range, RangeBounds};
14use std::sync::{Arc, Mutex};
15
16#[derive(Debug, Clone)]
17pub enum RangeIterState {
18 Leaf {
19 page: PageImpl,
20 fixed_key_size: Option<usize>,
21 fixed_value_size: Option<usize>,
22 entry: usize,
23 parent: Option<Box<RangeIterState>>,
24 },
25 Internal {
26 page: PageImpl,
27 fixed_key_size: Option<usize>,
28 fixed_value_size: Option<usize>,
29 child: usize,
30 parent: Option<Box<RangeIterState>>,
31 },
32}
33
34impl RangeIterState {
35 fn page_number(&self) -> PageNumber {
36 match self {
37 Leaf { page, .. } | Internal { page, .. } => page.get_page_number(),
38 }
39 }
40
41 fn next(self, reverse: bool, manager: &TransactionalMemory) -> Result<Option<RangeIterState>> {
42 match self {
43 Leaf {
44 page,
45 fixed_key_size,
46 fixed_value_size,
47 entry,
48 parent,
49 } => {
50 let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
51 let direction = if reverse { -1 } else { 1 };
52 let next_entry = isize::try_from(entry).unwrap() + direction;
53 if 0 <= next_entry && next_entry < accessor.num_pairs().try_into().unwrap() {
54 Ok(Some(Leaf {
55 page,
56 fixed_key_size,
57 fixed_value_size,
58 entry: next_entry.try_into().unwrap(),
59 parent,
60 }))
61 } else {
62 Ok(parent.map(|x| *x))
63 }
64 }
65 Internal {
66 page,
67 fixed_key_size,
68 fixed_value_size,
69 child,
70 mut parent,
71 } => {
72 let accessor = BranchAccessor::new(&page, fixed_key_size);
73 let child_page = accessor.child_page(child).unwrap();
74 let child_page = manager.get_page(child_page)?;
75 let direction = if reverse { -1 } else { 1 };
76 let next_child = isize::try_from(child).unwrap() + direction;
77 if 0 <= next_child && next_child < accessor.count_children().try_into().unwrap() {
78 parent = Some(Box::new(Internal {
79 page,
80 fixed_key_size,
81 fixed_value_size,
82 child: next_child.try_into().unwrap(),
83 parent,
84 }));
85 }
86 match child_page.memory()[0] {
87 LEAF => {
88 let child_accessor = LeafAccessor::new(
89 child_page.memory(),
90 fixed_key_size,
91 fixed_value_size,
92 );
93 let entry = if reverse {
94 child_accessor.num_pairs() - 1
95 } else {
96 0
97 };
98 Ok(Some(Leaf {
99 page: child_page,
100 fixed_key_size,
101 fixed_value_size,
102 entry,
103 parent,
104 }))
105 }
106 BRANCH => {
107 let child_accessor = BranchAccessor::new(&child_page, fixed_key_size);
108 let child = if reverse {
109 child_accessor.count_children() - 1
110 } else {
111 0
112 };
113 Ok(Some(Internal {
114 page: child_page,
115 fixed_key_size,
116 fixed_value_size,
117 child,
118 parent,
119 }))
120 }
121 _ => unreachable!(),
122 }
123 }
124 }
125 }
126
127 fn get_entry<K: Key, V: Value>(&self) -> Option<EntryGuard<K, V>> {
128 match self {
129 Leaf {
130 page,
131 fixed_key_size,
132 fixed_value_size,
133 entry,
134 ..
135 } => {
136 let (key, value) =
137 LeafAccessor::new(page.memory(), *fixed_key_size, *fixed_value_size)
138 .entry_ranges(*entry)?;
139 Some(EntryGuard::new(page.clone(), key, value))
140 }
141 Internal { .. } => None,
142 }
143 }
144}
145
146pub(crate) struct EntryGuard<K: Key, V: Value> {
147 page: PageImpl,
148 key_range: Range<usize>,
149 value_range: Range<usize>,
150 _key_type: PhantomData<K>,
151 _value_type: PhantomData<V>,
152}
153
154impl<K: Key, V: Value> EntryGuard<K, V> {
155 fn new(page: PageImpl, key_range: Range<usize>, value_range: Range<usize>) -> Self {
156 Self {
157 page,
158 key_range,
159 value_range,
160 _key_type: Default::default(),
161 _value_type: Default::default(),
162 }
163 }
164
165 pub(crate) fn key_data(&self) -> Vec<u8> {
166 self.page.memory()[self.key_range.clone()].to_vec()
167 }
168
169 pub(crate) fn key(&self) -> K::SelfType<'_> {
170 K::from_bytes(&self.page.memory()[self.key_range.clone()])
171 }
172
173 pub(crate) fn value(&self) -> V::SelfType<'_> {
174 V::from_bytes(&self.page.memory()[self.value_range.clone()])
175 }
176
177 pub(crate) fn into_raw(self) -> (PageImpl, Range<usize>, Range<usize>) {
178 (self.page, self.key_range, self.value_range)
179 }
180}
181
182pub(crate) struct AllPageNumbersBtreeIter {
183 next: Option<RangeIterState>,
184 manager: Arc<TransactionalMemory>,
185}
186
187impl AllPageNumbersBtreeIter {
188 pub(crate) fn new(
189 root: PageNumber,
190 fixed_key_size: Option<usize>,
191 fixed_value_size: Option<usize>,
192 manager: Arc<TransactionalMemory>,
193 ) -> Result<Self> {
194 let root_page = manager.get_page(root)?;
195 let node_mem = root_page.memory();
196 let start = match node_mem[0] {
197 LEAF => Leaf {
198 page: root_page,
199 fixed_key_size,
200 fixed_value_size,
201 entry: 0,
202 parent: None,
203 },
204 BRANCH => Internal {
205 page: root_page,
206 fixed_key_size,
207 fixed_value_size,
208 child: 0,
209 parent: None,
210 },
211 _ => unreachable!(),
212 };
213 Ok(Self {
214 next: Some(start),
215 manager,
216 })
217 }
218}
219
220impl Iterator for AllPageNumbersBtreeIter {
221 type Item = Result<PageNumber>;
222
223 fn next(&mut self) -> Option<Self::Item> {
224 loop {
225 let state = self.next.take()?;
226 let value = state.page_number();
227 let once = match state {
229 Leaf { entry, .. } => entry == 0,
230 Internal { child, .. } => child == 0,
231 };
232 match state.next(false, &self.manager) {
233 Ok(next) => {
234 self.next = next;
235 }
236 Err(err) => {
237 return Some(Err(err));
238 }
239 }
240 if once {
241 return Some(Ok(value));
242 }
243 }
244 }
245}
246
247pub(crate) struct BtreeExtractIf<
248 'a,
249 K: Key + 'static,
250 V: Value + 'static,
251 F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool,
252> {
253 root: &'a mut Option<BtreeHeader>,
254 inner: BtreeRangeIter<K, V>,
255 predicate: F,
256 free_on_drop: Vec<PageNumber>,
257 master_free_list: Arc<Mutex<Vec<PageNumber>>>,
258 allocated: Arc<Mutex<PageTrackerPolicy>>,
259 mem: Arc<TransactionalMemory>,
260}
261
262impl<'a, K: Key, V: Value, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>
263 BtreeExtractIf<'a, K, V, F>
264{
265 pub(crate) fn new(
266 root: &'a mut Option<BtreeHeader>,
267 inner: BtreeRangeIter<K, V>,
268 predicate: F,
269 master_free_list: Arc<Mutex<Vec<PageNumber>>>,
270 allocated: Arc<Mutex<PageTrackerPolicy>>,
271 mem: Arc<TransactionalMemory>,
272 ) -> Self {
273 Self {
274 root,
275 inner,
276 predicate,
277 free_on_drop: vec![],
278 master_free_list,
279 allocated,
280 mem,
281 }
282 }
283}
284
285impl<K: Key, V: Value, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool> Iterator
286 for BtreeExtractIf<'_, K, V, F>
287{
288 type Item = Result<EntryGuard<K, V>>;
289
290 fn next(&mut self) -> Option<Self::Item> {
291 let mut item = self.inner.next();
292 while let Some(Ok(ref entry)) = item {
293 if (self.predicate)(entry.key(), entry.value()) {
294 let mut operation: MutateHelper<'_, '_, K, V> = MutateHelper::new_do_not_modify(
295 self.root,
296 self.mem.clone(),
297 &mut self.free_on_drop,
298 self.allocated.clone(),
299 );
300 match operation.delete(&entry.key()) {
301 Ok(x) => {
302 assert!(x.is_some());
303 }
304 Err(x) => {
305 return Some(Err(x));
306 }
307 }
308 break;
309 }
310 item = self.inner.next();
311 }
312 item
313 }
314}
315
316impl<K: Key, V: Value, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>
317 DoubleEndedIterator for BtreeExtractIf<'_, K, V, F>
318{
319 fn next_back(&mut self) -> Option<Self::Item> {
320 let mut item = self.inner.next_back();
321 while let Some(Ok(ref entry)) = item {
322 if (self.predicate)(entry.key(), entry.value()) {
323 let mut operation: MutateHelper<'_, '_, K, V> = MutateHelper::new_do_not_modify(
324 self.root,
325 self.mem.clone(),
326 &mut self.free_on_drop,
327 self.allocated.clone(),
328 );
329 match operation.delete(&entry.key()) {
330 Ok(x) => {
331 assert!(x.is_some());
332 }
333 Err(x) => {
334 return Some(Err(x));
335 }
336 }
337 break;
338 }
339 item = self.inner.next_back();
340 }
341 item
342 }
343}
344
345impl<K: Key, V: Value, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool> Drop
346 for BtreeExtractIf<'_, K, V, F>
347{
348 fn drop(&mut self) {
349 let mut master_free_list = self.master_free_list.lock().unwrap();
350 let mut allocated = self.allocated.lock().unwrap();
351 for page in self.free_on_drop.drain(..) {
352 if !self.mem.free_if_uncommitted(page, &mut allocated) {
353 master_free_list.push(page);
354 }
355 }
356 }
357}
358
359#[derive(Clone)]
360pub(crate) struct BtreeRangeIter<K: Key + 'static, V: Value + 'static> {
361 left: Option<RangeIterState>, right: Option<RangeIterState>, include_left: bool, include_right: bool, manager: Arc<TransactionalMemory>,
366 _key_type: PhantomData<K>,
367 _value_type: PhantomData<V>,
368}
369
370fn range_is_empty<'a, K: Key + 'static, KR: Borrow<K::SelfType<'a>>, T: RangeBounds<KR>>(
371 range: &T,
372) -> bool {
373 match (range.start_bound(), range.end_bound()) {
374 (Unbounded, _) | (_, Unbounded) => false,
375 (Included(start), Excluded(end)) | (Excluded(start), Included(end) | Excluded(end)) => {
376 let start_tmp = K::as_bytes(start.borrow());
377 let start_value = start_tmp.as_ref();
378 let end_tmp = K::as_bytes(end.borrow());
379 let end_value = end_tmp.as_ref();
380 K::compare(start_value, end_value).is_ge()
381 }
382 (Included(start), Included(end)) => {
383 let start_tmp = K::as_bytes(start.borrow());
384 let start_value = start_tmp.as_ref();
385 let end_tmp = K::as_bytes(end.borrow());
386 let end_value = end_tmp.as_ref();
387 K::compare(start_value, end_value).is_gt()
388 }
389 }
390}
391
392impl<K: Key + 'static, V: Value + 'static> BtreeRangeIter<K, V> {
393 pub(crate) fn new<'a, T: RangeBounds<KR>, KR: Borrow<K::SelfType<'a>>>(
394 query_range: &'_ T,
395 table_root: Option<PageNumber>,
396 manager: Arc<TransactionalMemory>,
397 ) -> Result<Self> {
398 if range_is_empty::<K, KR, T>(query_range) {
399 return Ok(Self {
400 left: None,
401 right: None,
402 include_left: false,
403 include_right: false,
404 manager,
405 _key_type: Default::default(),
406 _value_type: Default::default(),
407 });
408 }
409 if let Some(root) = table_root {
410 let (include_left, left) = match query_range.start_bound() {
411 Included(k) => find_iter_left::<K, V>(
412 manager.get_page(root)?,
413 None,
414 K::as_bytes(k.borrow()).as_ref(),
415 true,
416 &manager,
417 )?,
418 Excluded(k) => find_iter_left::<K, V>(
419 manager.get_page(root)?,
420 None,
421 K::as_bytes(k.borrow()).as_ref(),
422 false,
423 &manager,
424 )?,
425 Unbounded => {
426 let state = find_iter_unbounded::<K, V>(
427 manager.get_page(root)?,
428 None,
429 false,
430 &manager,
431 )?;
432 (true, state)
433 }
434 };
435 let (include_right, right) = match query_range.end_bound() {
436 Included(k) => find_iter_right::<K, V>(
437 manager.get_page(root)?,
438 None,
439 K::as_bytes(k.borrow()).as_ref(),
440 true,
441 &manager,
442 )?,
443 Excluded(k) => find_iter_right::<K, V>(
444 manager.get_page(root)?,
445 None,
446 K::as_bytes(k.borrow()).as_ref(),
447 false,
448 &manager,
449 )?,
450 Unbounded => {
451 let state =
452 find_iter_unbounded::<K, V>(manager.get_page(root)?, None, true, &manager)?;
453 (true, state)
454 }
455 };
456 Ok(Self {
457 left,
458 right,
459 include_left,
460 include_right,
461 manager,
462 _key_type: Default::default(),
463 _value_type: Default::default(),
464 })
465 } else {
466 Ok(Self {
467 left: None,
468 right: None,
469 include_left: false,
470 include_right: false,
471 manager,
472 _key_type: Default::default(),
473 _value_type: Default::default(),
474 })
475 }
476 }
477}
478
479impl<K: Key, V: Value> Iterator for BtreeRangeIter<K, V> {
480 type Item = Result<EntryGuard<K, V>>;
481
482 fn next(&mut self) -> Option<Self::Item> {
483 if let (
484 Some(Leaf {
485 page: left_page,
486 entry: left_entry,
487 ..
488 }),
489 Some(Leaf {
490 page: right_page,
491 entry: right_entry,
492 ..
493 }),
494 ) = (&self.left, &self.right)
495 {
496 if left_page.get_page_number() == right_page.get_page_number()
497 && (left_entry > right_entry
498 || (left_entry == right_entry && (!self.include_left || !self.include_right)))
499 {
500 return None;
501 }
502 }
503
504 loop {
505 if !self.include_left {
506 match self.left.take()?.next(false, &self.manager) {
507 Ok(left) => {
508 self.left = left;
509 }
510 Err(err) => {
511 return Some(Err(err));
512 }
513 }
514 }
515 self.left.as_ref()?;
517
518 if let (
519 Some(Leaf {
520 page: left_page,
521 entry: left_entry,
522 ..
523 }),
524 Some(Leaf {
525 page: right_page,
526 entry: right_entry,
527 ..
528 }),
529 ) = (&self.left, &self.right)
530 {
531 if left_page.get_page_number() == right_page.get_page_number()
532 && (left_entry > right_entry
533 || (left_entry == right_entry && !self.include_right))
534 {
535 return None;
536 }
537 }
538
539 self.include_left = false;
540 if self.left.as_ref().unwrap().get_entry::<K, V>().is_some() {
541 return self.left.as_ref().map(|s| s.get_entry().unwrap()).map(Ok);
542 }
543 }
544 }
545}
546
547impl<K: Key, V: Value> DoubleEndedIterator for BtreeRangeIter<K, V> {
548 fn next_back(&mut self) -> Option<Self::Item> {
549 if let (
550 Some(Leaf {
551 page: left_page,
552 entry: left_entry,
553 ..
554 }),
555 Some(Leaf {
556 page: right_page,
557 entry: right_entry,
558 ..
559 }),
560 ) = (&self.left, &self.right)
561 {
562 if left_page.get_page_number() == right_page.get_page_number()
563 && (left_entry > right_entry
564 || (left_entry == right_entry && (!self.include_left || !self.include_right)))
565 {
566 return None;
567 }
568 }
569
570 loop {
571 if !self.include_right {
572 match self.right.take()?.next(true, &self.manager) {
573 Ok(right) => {
574 self.right = right;
575 }
576 Err(err) => {
577 return Some(Err(err));
578 }
579 }
580 }
581 self.right.as_ref()?;
583
584 if let (
585 Some(Leaf {
586 page: left_page,
587 entry: left_entry,
588 ..
589 }),
590 Some(Leaf {
591 page: right_page,
592 entry: right_entry,
593 ..
594 }),
595 ) = (&self.left, &self.right)
596 {
597 if left_page.get_page_number() == right_page.get_page_number()
598 && (left_entry > right_entry
599 || (left_entry == right_entry && !self.include_left))
600 {
601 return None;
602 }
603 }
604
605 self.include_right = false;
606 if self.right.as_ref().unwrap().get_entry::<K, V>().is_some() {
607 return self.right.as_ref().map(|s| s.get_entry().unwrap()).map(Ok);
608 }
609 }
610 }
611}
612
613fn find_iter_unbounded<K: Key, V: Value>(
614 page: PageImpl,
615 mut parent: Option<Box<RangeIterState>>,
616 reverse: bool,
617 manager: &TransactionalMemory,
618) -> Result<Option<RangeIterState>> {
619 let node_mem = page.memory();
620 match node_mem[0] {
621 LEAF => {
622 let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
623 let entry = if reverse { accessor.num_pairs() - 1 } else { 0 };
624 Ok(Some(Leaf {
625 page,
626 fixed_key_size: K::fixed_width(),
627 fixed_value_size: V::fixed_width(),
628 entry,
629 parent,
630 }))
631 }
632 BRANCH => {
633 let accessor = BranchAccessor::new(&page, K::fixed_width());
634 let child_index = if reverse {
635 accessor.count_children() - 1
636 } else {
637 0
638 };
639 let child_page_number = accessor.child_page(child_index).unwrap();
640 let child_page = manager.get_page(child_page_number)?;
641 let direction = if reverse { -1isize } else { 1 };
642 parent = Some(Box::new(Internal {
643 page,
644 fixed_key_size: K::fixed_width(),
645 fixed_value_size: V::fixed_width(),
646 child: (isize::try_from(child_index).unwrap() + direction)
647 .try_into()
648 .unwrap(),
649 parent,
650 }));
651 find_iter_unbounded::<K, V>(child_page, parent, reverse, manager)
652 }
653 _ => unreachable!(),
654 }
655}
656
657fn find_iter_left<K: Key, V: Value>(
660 page: PageImpl,
661 mut parent: Option<Box<RangeIterState>>,
662 query: &[u8],
663 include_query: bool,
664 manager: &TransactionalMemory,
665) -> Result<(bool, Option<RangeIterState>)> {
666 let node_mem = page.memory();
667 match node_mem[0] {
668 LEAF => {
669 let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
670 let (mut position, found) = accessor.position::<K>(query);
671 let include = if position < accessor.num_pairs() {
672 include_query || !found
673 } else {
674 position -= 1;
676 false
678 };
679 let result = Leaf {
680 page,
681 fixed_key_size: K::fixed_width(),
682 fixed_value_size: V::fixed_width(),
683 entry: position,
684 parent,
685 };
686 Ok((include, Some(result)))
687 }
688 BRANCH => {
689 let accessor = BranchAccessor::new(&page, K::fixed_width());
690 let (child_index, child_page_number) = accessor.child_for_key::<K>(query);
691 let child_page = manager.get_page(child_page_number)?;
692 if child_index < accessor.count_children() - 1 {
693 parent = Some(Box::new(Internal {
694 page,
695 fixed_key_size: K::fixed_width(),
696 fixed_value_size: V::fixed_width(),
697 child: child_index + 1,
698 parent,
699 }));
700 }
701 find_iter_left::<K, V>(child_page, parent, query, include_query, manager)
702 }
703 _ => unreachable!(),
704 }
705}
706
707fn find_iter_right<K: Key, V: Value>(
708 page: PageImpl,
709 mut parent: Option<Box<RangeIterState>>,
710 query: &[u8],
711 include_query: bool,
712 manager: &TransactionalMemory,
713) -> Result<(bool, Option<RangeIterState>)> {
714 let node_mem = page.memory();
715 match node_mem[0] {
716 LEAF => {
717 let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
718 let (mut position, found) = accessor.position::<K>(query);
719 let include = if position < accessor.num_pairs() {
720 include_query && found
721 } else {
722 position -= 1;
724 true
726 };
727 let result = Leaf {
728 page,
729 fixed_key_size: K::fixed_width(),
730 fixed_value_size: V::fixed_width(),
731 entry: position,
732 parent,
733 };
734 Ok((include, Some(result)))
735 }
736 BRANCH => {
737 let accessor = BranchAccessor::new(&page, K::fixed_width());
738 let (child_index, child_page_number) = accessor.child_for_key::<K>(query);
739 let child_page = manager.get_page(child_page_number)?;
740 if child_index > 0 && accessor.child_page(child_index - 1).is_some() {
741 parent = Some(Box::new(Internal {
742 page,
743 fixed_key_size: K::fixed_width(),
744 fixed_value_size: V::fixed_width(),
745 child: child_index - 1,
746 parent,
747 }));
748 }
749 find_iter_right::<K, V>(child_page, parent, query, include_query, manager)
750 }
751 _ => unreachable!(),
752 }
753}