1use crate::tree_store::btree_base::{BranchAccessor, LeafAccessor};
2use crate::tree_store::btree_base::{BRANCH, LEAF};
3use crate::tree_store::btree_iters::RangeIterState::{Internal, Leaf};
4use crate::tree_store::btree_mutator::MutateHelper;
5use crate::tree_store::page_store::{Page, PageImpl, TransactionalMemory};
6use crate::tree_store::{BtreeHeader, PageNumber};
7use crate::types::{Key, Value};
8use crate::Result;
9use std::borrow::Borrow;
10use std::collections::Bound;
11use std::marker::PhantomData;
12use std::ops::{Range, RangeBounds};
13use std::sync::{Arc, Mutex};
14
15#[derive(Debug, Clone)]
16pub enum RangeIterState {
17 Leaf {
18 page: PageImpl,
19 fixed_key_size: Option<usize>,
20 fixed_value_size: Option<usize>,
21 entry: usize,
22 parent: Option<Box<RangeIterState>>,
23 },
24 Internal {
25 page: PageImpl,
26 fixed_key_size: Option<usize>,
27 fixed_value_size: Option<usize>,
28 child: usize,
29 parent: Option<Box<RangeIterState>>,
30 },
31}
32
33impl RangeIterState {
34 fn page_number(&self) -> PageNumber {
35 match self {
36 Leaf { page, .. } | Internal { page, .. } => page.get_page_number(),
37 }
38 }
39
40 fn next(self, reverse: bool, manager: &TransactionalMemory) -> Result<Option<RangeIterState>> {
41 match self {
42 Leaf {
43 page,
44 fixed_key_size,
45 fixed_value_size,
46 entry,
47 parent,
48 } => {
49 let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
50 let direction = if reverse { -1 } else { 1 };
51 let next_entry = isize::try_from(entry).unwrap() + direction;
52 if 0 <= next_entry && next_entry < accessor.num_pairs().try_into().unwrap() {
53 Ok(Some(Leaf {
54 page,
55 fixed_key_size,
56 fixed_value_size,
57 entry: next_entry.try_into().unwrap(),
58 parent,
59 }))
60 } else {
61 Ok(parent.map(|x| *x))
62 }
63 }
64 Internal {
65 page,
66 fixed_key_size,
67 fixed_value_size,
68 child,
69 mut parent,
70 } => {
71 let accessor = BranchAccessor::new(&page, fixed_key_size);
72 let child_page = accessor.child_page(child).unwrap();
73 let child_page = manager.get_page(child_page)?;
74 let direction = if reverse { -1 } else { 1 };
75 let next_child = isize::try_from(child).unwrap() + direction;
76 if 0 <= next_child && next_child < accessor.count_children().try_into().unwrap() {
77 parent = Some(Box::new(Internal {
78 page,
79 fixed_key_size,
80 fixed_value_size,
81 child: next_child.try_into().unwrap(),
82 parent,
83 }));
84 }
85 match child_page.memory()[0] {
86 LEAF => {
87 let child_accessor = LeafAccessor::new(
88 child_page.memory(),
89 fixed_key_size,
90 fixed_value_size,
91 );
92 let entry = if reverse {
93 child_accessor.num_pairs() - 1
94 } else {
95 0
96 };
97 Ok(Some(Leaf {
98 page: child_page,
99 fixed_key_size,
100 fixed_value_size,
101 entry,
102 parent,
103 }))
104 }
105 BRANCH => {
106 let child_accessor = BranchAccessor::new(&child_page, fixed_key_size);
107 let child = if reverse {
108 child_accessor.count_children() - 1
109 } else {
110 0
111 };
112 Ok(Some(Internal {
113 page: child_page,
114 fixed_key_size,
115 fixed_value_size,
116 child,
117 parent,
118 }))
119 }
120 _ => unreachable!(),
121 }
122 }
123 }
124 }
125
126 fn get_entry<K: Key, V: Value>(&self) -> Option<EntryGuard<K, V>> {
127 match self {
128 Leaf {
129 page,
130 fixed_key_size,
131 fixed_value_size,
132 entry,
133 ..
134 } => {
135 let (key, value) =
136 LeafAccessor::new(page.memory(), *fixed_key_size, *fixed_value_size)
137 .entry_ranges(*entry)?;
138 Some(EntryGuard::new(page.clone(), key, value))
139 }
140 Internal { .. } => None,
141 }
142 }
143}
144
145pub(crate) struct EntryGuard<K: Key, V: Value> {
146 page: PageImpl,
147 key_range: Range<usize>,
148 value_range: Range<usize>,
149 _key_type: PhantomData<K>,
150 _value_type: PhantomData<V>,
151}
152
153impl<K: Key, V: Value> EntryGuard<K, V> {
154 fn new(page: PageImpl, key_range: Range<usize>, value_range: Range<usize>) -> Self {
155 Self {
156 page,
157 key_range,
158 value_range,
159 _key_type: Default::default(),
160 _value_type: Default::default(),
161 }
162 }
163
164 pub(crate) fn key_data(&self) -> Vec<u8> {
165 self.page.memory()[self.key_range.clone()].to_vec()
166 }
167
168 pub(crate) fn key(&self) -> K::SelfType<'_> {
169 K::from_bytes(&self.page.memory()[self.key_range.clone()])
170 }
171
172 pub(crate) fn value(&self) -> V::SelfType<'_> {
173 V::from_bytes(&self.page.memory()[self.value_range.clone()])
174 }
175
176 pub(crate) fn into_raw(self) -> (PageImpl, Range<usize>, Range<usize>) {
177 (self.page, self.key_range, self.value_range)
178 }
179}
180
181pub(crate) struct AllPageNumbersBtreeIter {
182 next: Option<RangeIterState>,
183 manager: Arc<TransactionalMemory>,
184}
185
186impl AllPageNumbersBtreeIter {
187 pub(crate) fn new(
188 root: PageNumber,
189 fixed_key_size: Option<usize>,
190 fixed_value_size: Option<usize>,
191 manager: Arc<TransactionalMemory>,
192 ) -> Result<Self> {
193 let root_page = manager.get_page(root)?;
194 let node_mem = root_page.memory();
195 let start = match node_mem[0] {
196 LEAF => Leaf {
197 page: root_page,
198 fixed_key_size,
199 fixed_value_size,
200 entry: 0,
201 parent: None,
202 },
203 BRANCH => Internal {
204 page: root_page,
205 fixed_key_size,
206 fixed_value_size,
207 child: 0,
208 parent: None,
209 },
210 _ => unreachable!(),
211 };
212 Ok(Self {
213 next: Some(start),
214 manager,
215 })
216 }
217}
218
219impl Iterator for AllPageNumbersBtreeIter {
220 type Item = Result<PageNumber>;
221
222 fn next(&mut self) -> Option<Self::Item> {
223 loop {
224 let state = self.next.take()?;
225 let value = state.page_number();
226 let once = match state {
228 Leaf { entry, .. } => entry == 0,
229 Internal { child, .. } => child == 0,
230 };
231 match state.next(false, &self.manager) {
232 Ok(next) => {
233 self.next = next;
234 }
235 Err(err) => {
236 return Some(Err(err));
237 }
238 }
239 if once {
240 return Some(Ok(value));
241 }
242 }
243 }
244}
245
246pub(crate) struct BtreeExtractIf<
247 'a,
248 K: Key + 'static,
249 V: Value + 'static,
250 F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool,
251> {
252 root: &'a mut Option<BtreeHeader>,
253 inner: BtreeRangeIter<K, V>,
254 predicate: F,
255 free_on_drop: Vec<PageNumber>,
256 master_free_list: Arc<Mutex<Vec<PageNumber>>>,
257 mem: Arc<TransactionalMemory>,
258}
259
260impl<'a, K: Key, V: Value, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>
261 BtreeExtractIf<'a, K, V, F>
262{
263 pub(crate) fn new(
264 root: &'a mut Option<BtreeHeader>,
265 inner: BtreeRangeIter<K, V>,
266 predicate: F,
267 master_free_list: Arc<Mutex<Vec<PageNumber>>>,
268 mem: Arc<TransactionalMemory>,
269 ) -> Self {
270 Self {
271 root,
272 inner,
273 predicate,
274 free_on_drop: vec![],
275 master_free_list,
276 mem,
277 }
278 }
279}
280
281impl<K: Key, V: Value, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool> Iterator
282 for BtreeExtractIf<'_, K, V, F>
283{
284 type Item = Result<EntryGuard<K, V>>;
285
286 fn next(&mut self) -> Option<Self::Item> {
287 let mut item = self.inner.next();
288 while let Some(Ok(ref entry)) = item {
289 if (self.predicate)(entry.key(), entry.value()) {
290 let mut operation: MutateHelper<'_, '_, K, V> = MutateHelper::new_do_not_modify(
291 self.root,
292 self.mem.clone(),
293 &mut self.free_on_drop,
294 );
295 match operation.delete(&entry.key()) {
296 Ok(x) => {
297 assert!(x.is_some());
298 }
299 Err(x) => {
300 return Some(Err(x));
301 }
302 }
303 break;
304 }
305 item = self.inner.next();
306 }
307 item
308 }
309}
310
311impl<K: Key, V: Value, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>
312 DoubleEndedIterator for BtreeExtractIf<'_, K, V, F>
313{
314 fn next_back(&mut self) -> Option<Self::Item> {
315 let mut item = self.inner.next_back();
316 while let Some(Ok(ref entry)) = item {
317 if (self.predicate)(entry.key(), entry.value()) {
318 let mut operation: MutateHelper<'_, '_, K, V> = MutateHelper::new_do_not_modify(
319 self.root,
320 self.mem.clone(),
321 &mut self.free_on_drop,
322 );
323 match operation.delete(&entry.key()) {
324 Ok(x) => {
325 assert!(x.is_some());
326 }
327 Err(x) => {
328 return Some(Err(x));
329 }
330 }
331 break;
332 }
333 item = self.inner.next_back();
334 }
335 item
336 }
337}
338
339impl<K: Key, V: Value, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool> Drop
340 for BtreeExtractIf<'_, K, V, F>
341{
342 fn drop(&mut self) {
343 let mut master_free_list = self.master_free_list.lock().unwrap();
344 for page in self.free_on_drop.drain(..) {
345 if !self.mem.free_if_uncommitted(page) {
346 master_free_list.push(page);
347 }
348 }
349 }
350}
351
352#[derive(Clone)]
353pub(crate) struct BtreeRangeIter<K: Key + 'static, V: Value + 'static> {
354 left: Option<RangeIterState>, right: Option<RangeIterState>, include_left: bool, include_right: bool, manager: Arc<TransactionalMemory>,
359 _key_type: PhantomData<K>,
360 _value_type: PhantomData<V>,
361}
362
363impl<K: Key + 'static, V: Value + 'static> BtreeRangeIter<K, V> {
364 pub(crate) fn new<'a, T: RangeBounds<KR>, KR: Borrow<K::SelfType<'a>>>(
365 query_range: &'_ T,
366 table_root: Option<PageNumber>,
367 manager: Arc<TransactionalMemory>,
368 ) -> Result<Self> {
369 if let Some(root) = table_root {
370 let (include_left, left) = match query_range.start_bound() {
371 Bound::Included(k) => find_iter_left::<K, V>(
372 manager.get_page(root)?,
373 None,
374 K::as_bytes(k.borrow()).as_ref(),
375 true,
376 &manager,
377 )?,
378 Bound::Excluded(k) => find_iter_left::<K, V>(
379 manager.get_page(root)?,
380 None,
381 K::as_bytes(k.borrow()).as_ref(),
382 false,
383 &manager,
384 )?,
385 Bound::Unbounded => {
386 let state = find_iter_unbounded::<K, V>(
387 manager.get_page(root)?,
388 None,
389 false,
390 &manager,
391 )?;
392 (true, state)
393 }
394 };
395 let (include_right, right) = match query_range.end_bound() {
396 Bound::Included(k) => find_iter_right::<K, V>(
397 manager.get_page(root)?,
398 None,
399 K::as_bytes(k.borrow()).as_ref(),
400 true,
401 &manager,
402 )?,
403 Bound::Excluded(k) => find_iter_right::<K, V>(
404 manager.get_page(root)?,
405 None,
406 K::as_bytes(k.borrow()).as_ref(),
407 false,
408 &manager,
409 )?,
410 Bound::Unbounded => {
411 let state =
412 find_iter_unbounded::<K, V>(manager.get_page(root)?, None, true, &manager)?;
413 (true, state)
414 }
415 };
416 Ok(Self {
417 left,
418 right,
419 include_left,
420 include_right,
421 manager,
422 _key_type: Default::default(),
423 _value_type: Default::default(),
424 })
425 } else {
426 Ok(Self {
427 left: None,
428 right: None,
429 include_left: false,
430 include_right: false,
431 manager,
432 _key_type: Default::default(),
433 _value_type: Default::default(),
434 })
435 }
436 }
437}
438
439impl<K: Key, V: Value> Iterator for BtreeRangeIter<K, V> {
440 type Item = Result<EntryGuard<K, V>>;
441
442 fn next(&mut self) -> Option<Self::Item> {
443 if let (
444 Some(Leaf {
445 page: left_page,
446 entry: left_entry,
447 ..
448 }),
449 Some(Leaf {
450 page: right_page,
451 entry: right_entry,
452 ..
453 }),
454 ) = (&self.left, &self.right)
455 {
456 if left_page.get_page_number() == right_page.get_page_number()
457 && (left_entry > right_entry
458 || (left_entry == right_entry && (!self.include_left || !self.include_right)))
459 {
460 return None;
461 }
462 }
463
464 loop {
465 if !self.include_left {
466 match self.left.take()?.next(false, &self.manager) {
467 Ok(left) => {
468 self.left = left;
469 }
470 Err(err) => {
471 return Some(Err(err));
472 }
473 }
474 }
475 self.left.as_ref()?;
477
478 if let (
479 Some(Leaf {
480 page: left_page,
481 entry: left_entry,
482 ..
483 }),
484 Some(Leaf {
485 page: right_page,
486 entry: right_entry,
487 ..
488 }),
489 ) = (&self.left, &self.right)
490 {
491 if left_page.get_page_number() == right_page.get_page_number()
492 && (left_entry > right_entry
493 || (left_entry == right_entry && !self.include_right))
494 {
495 return None;
496 }
497 }
498
499 self.include_left = false;
500 if self.left.as_ref().unwrap().get_entry::<K, V>().is_some() {
501 return self.left.as_ref().map(|s| s.get_entry().unwrap()).map(Ok);
502 }
503 }
504 }
505}
506
507impl<K: Key, V: Value> DoubleEndedIterator for BtreeRangeIter<K, V> {
508 fn next_back(&mut self) -> Option<Self::Item> {
509 if let (
510 Some(Leaf {
511 page: left_page,
512 entry: left_entry,
513 ..
514 }),
515 Some(Leaf {
516 page: right_page,
517 entry: right_entry,
518 ..
519 }),
520 ) = (&self.left, &self.right)
521 {
522 if left_page.get_page_number() == right_page.get_page_number()
523 && (left_entry > right_entry
524 || (left_entry == right_entry && (!self.include_left || !self.include_right)))
525 {
526 return None;
527 }
528 }
529
530 loop {
531 if !self.include_right {
532 match self.right.take()?.next(true, &self.manager) {
533 Ok(right) => {
534 self.right = right;
535 }
536 Err(err) => {
537 return Some(Err(err));
538 }
539 }
540 }
541 self.right.as_ref()?;
543
544 if let (
545 Some(Leaf {
546 page: left_page,
547 entry: left_entry,
548 ..
549 }),
550 Some(Leaf {
551 page: right_page,
552 entry: right_entry,
553 ..
554 }),
555 ) = (&self.left, &self.right)
556 {
557 if left_page.get_page_number() == right_page.get_page_number()
558 && (left_entry > right_entry
559 || (left_entry == right_entry && !self.include_left))
560 {
561 return None;
562 }
563 }
564
565 self.include_right = false;
566 if self.right.as_ref().unwrap().get_entry::<K, V>().is_some() {
567 return self.right.as_ref().map(|s| s.get_entry().unwrap()).map(Ok);
568 }
569 }
570 }
571}
572
573fn find_iter_unbounded<K: Key, V: Value>(
574 page: PageImpl,
575 mut parent: Option<Box<RangeIterState>>,
576 reverse: bool,
577 manager: &TransactionalMemory,
578) -> Result<Option<RangeIterState>> {
579 let node_mem = page.memory();
580 match node_mem[0] {
581 LEAF => {
582 let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
583 let entry = if reverse { accessor.num_pairs() - 1 } else { 0 };
584 Ok(Some(Leaf {
585 page,
586 fixed_key_size: K::fixed_width(),
587 fixed_value_size: V::fixed_width(),
588 entry,
589 parent,
590 }))
591 }
592 BRANCH => {
593 let accessor = BranchAccessor::new(&page, K::fixed_width());
594 let child_index = if reverse {
595 accessor.count_children() - 1
596 } else {
597 0
598 };
599 let child_page_number = accessor.child_page(child_index).unwrap();
600 let child_page = manager.get_page(child_page_number)?;
601 let direction = if reverse { -1isize } else { 1 };
602 parent = Some(Box::new(Internal {
603 page,
604 fixed_key_size: K::fixed_width(),
605 fixed_value_size: V::fixed_width(),
606 child: (isize::try_from(child_index).unwrap() + direction)
607 .try_into()
608 .unwrap(),
609 parent,
610 }));
611 find_iter_unbounded::<K, V>(child_page, parent, reverse, manager)
612 }
613 _ => unreachable!(),
614 }
615}
616
617fn find_iter_left<K: Key, V: Value>(
620 page: PageImpl,
621 mut parent: Option<Box<RangeIterState>>,
622 query: &[u8],
623 include_query: bool,
624 manager: &TransactionalMemory,
625) -> Result<(bool, Option<RangeIterState>)> {
626 let node_mem = page.memory();
627 match node_mem[0] {
628 LEAF => {
629 let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
630 let (mut position, found) = accessor.position::<K>(query);
631 let include = if position < accessor.num_pairs() {
632 include_query || !found
633 } else {
634 position -= 1;
636 false
638 };
639 let result = Leaf {
640 page,
641 fixed_key_size: K::fixed_width(),
642 fixed_value_size: V::fixed_width(),
643 entry: position,
644 parent,
645 };
646 Ok((include, Some(result)))
647 }
648 BRANCH => {
649 let accessor = BranchAccessor::new(&page, K::fixed_width());
650 let (child_index, child_page_number) = accessor.child_for_key::<K>(query);
651 let child_page = manager.get_page(child_page_number)?;
652 if child_index < accessor.count_children() - 1 {
653 parent = Some(Box::new(Internal {
654 page,
655 fixed_key_size: K::fixed_width(),
656 fixed_value_size: V::fixed_width(),
657 child: child_index + 1,
658 parent,
659 }));
660 }
661 find_iter_left::<K, V>(child_page, parent, query, include_query, manager)
662 }
663 _ => unreachable!(),
664 }
665}
666
667fn find_iter_right<K: Key, V: Value>(
668 page: PageImpl,
669 mut parent: Option<Box<RangeIterState>>,
670 query: &[u8],
671 include_query: bool,
672 manager: &TransactionalMemory,
673) -> Result<(bool, Option<RangeIterState>)> {
674 let node_mem = page.memory();
675 match node_mem[0] {
676 LEAF => {
677 let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
678 let (mut position, found) = accessor.position::<K>(query);
679 let include = if position < accessor.num_pairs() {
680 include_query && found
681 } else {
682 position -= 1;
684 true
686 };
687 let result = Leaf {
688 page,
689 fixed_key_size: K::fixed_width(),
690 fixed_value_size: V::fixed_width(),
691 entry: position,
692 parent,
693 };
694 Ok((include, Some(result)))
695 }
696 BRANCH => {
697 let accessor = BranchAccessor::new(&page, K::fixed_width());
698 let (child_index, child_page_number) = accessor.child_for_key::<K>(query);
699 let child_page = manager.get_page(child_page_number)?;
700 if child_index > 0 && accessor.child_page(child_index - 1).is_some() {
701 parent = Some(Box::new(Internal {
702 page,
703 fixed_key_size: K::fixed_width(),
704 fixed_value_size: V::fixed_width(),
705 child: child_index - 1,
706 parent,
707 }));
708 }
709 find_iter_right::<K, V>(child_page, parent, query, include_query, manager)
710 }
711 _ => unreachable!(),
712 }
713}