1use crate::tree_store::btree_base::{
2 BRANCH, BranchAccessor, BranchBuilder, BranchMutator, Checksum, DEFERRED, LEAF, LeafAccessor,
3 LeafBuilder, LeafMutator,
4};
5use crate::tree_store::btree_mutator::DeletionResult::{
6 DeletedBranch, DeletedLeaf, PartialBranch, PartialLeaf, Subtree,
7};
8use crate::tree_store::page_store::{Page, PageImpl, PageMut};
9use crate::tree_store::{
10 AccessGuardMut, BtreeHeader, PageNumber, PageTrackerPolicy, RawLeafBuilder, TransactionalMemory,
11};
12use crate::types::{Key, Value};
13use crate::{AccessGuard, Result};
14use std::cmp::{max, min};
15use std::marker::PhantomData;
16use std::sync::{Arc, Mutex};
17
/// Outcome of deleting a key from a subtree, reported by the delete helpers to their caller so
/// the caller can decide whether the returned node needs merging or can be linked as-is.
#[derive(Debug)]
enum DeletionResult {
    /// A subtree that can be linked by the parent without further work: its root page number
    /// and the checksum to record for it.
    Subtree(PageNumber, Checksum),
    /// The leaf contained only the deleted pair, so nothing remains of it.
    DeletedLeaf,
    /// The leaf would occupy less than a third of a page after the deletion and should be
    /// merged with a sibling (or rebuilt on its own when it is the root).
    PartialLeaf {
        /// Contents of the original leaf, still including the deleted pair.
        page: Arc<[u8]>,
        /// Index of the pair that must be skipped when copying entries out of `page`.
        deleted_pair: usize,
    },
    /// A rebuilt branch page that occupies less than a third of a page: its page number and
    /// checksum.
    PartialBranch(PageNumber, Checksum),
    /// The branch was reduced to a single child; carries that remaining child's page number
    /// and checksum.
    DeletedBranch(PageNumber, Checksum),
}
35
/// Result of inserting into a subtree, as returned by `insert_helper`.
struct InsertionResult<'a, V: Value + 'static> {
    /// Page that now roots the subtree the insertion was applied to.
    new_root: PageNumber,
    /// Checksum to record for `new_root` (`DEFERRED` for pages created or modified by the
    /// insertion).
    root_checksum: Checksum,
    /// Set when the node had to split: the separator key, followed by the page number and
    /// checksum of the sibling that comes after `new_root`.
    additional_sibling: Option<(Vec<u8>, PageNumber, Checksum)>,
    /// Mutable guard covering the bytes of the newly stored value.
    inserted_value: AccessGuardMut<'a, V>,
    /// The value previously stored under the key, if the key already existed.
    old_value: Option<AccessGuard<'a, V>>,
}
48
/// Performs insertions and deletions on a B-tree, updating the tree's root header in place.
pub(crate) struct MutateHelper<'a, 'b, K: Key, V: Value> {
    /// Header of the tree being mutated; `None` while the tree is empty. Rewritten by
    /// `insert`, `insert_inplace` and `delete`.
    root: &'b mut Option<BtreeHeader>,
    /// When `true`, uncommitted pages may be edited in place or freed immediately. When
    /// `false`, replaced pages are only ever recorded in `freed`.
    modify_uncommitted: bool,
    /// Page store used to read, allocate and free pages.
    mem: Arc<TransactionalMemory>,
    /// Collects pages that were replaced but not freed immediately.
    // NOTE(review): presumably the owner of this list releases the pages later — confirm.
    freed: &'b mut Vec<PageNumber>,
    /// Allocation tracker handed to the page builders and to the free calls.
    allocated: Arc<Mutex<PageTrackerPolicy>>,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
    _lifetime: PhantomData<&'a ()>,
}
59
60impl<'a, 'b, K: Key, V: Value> MutateHelper<'a, 'b, K, V> {
61 pub(crate) fn new(
62 root: &'b mut Option<BtreeHeader>,
63 mem: Arc<TransactionalMemory>,
64 freed: &'b mut Vec<PageNumber>,
65 allocated: Arc<Mutex<PageTrackerPolicy>>,
66 ) -> Self {
67 Self {
68 root,
69 modify_uncommitted: true,
70 mem,
71 freed,
72 allocated,
73 _key_type: Default::default(),
74 _value_type: Default::default(),
75 _lifetime: Default::default(),
76 }
77 }
78
79 pub(crate) fn new_do_not_modify(
82 root: &'b mut Option<BtreeHeader>,
83 mem: Arc<TransactionalMemory>,
84 freed: &'b mut Vec<PageNumber>,
85 allocated: Arc<Mutex<PageTrackerPolicy>>,
86 ) -> Self {
87 Self {
88 root,
89 modify_uncommitted: false,
90 mem,
91 freed,
92 allocated,
93 _key_type: Default::default(),
94 _value_type: Default::default(),
95 _lifetime: Default::default(),
96 }
97 }
98
99 fn conditional_free(&mut self, page_number: PageNumber) {
100 if self.modify_uncommitted {
101 let mut allocated = self.allocated.lock().unwrap();
102 if !self.mem.free_if_uncommitted(page_number, &mut allocated) {
103 self.freed.push(page_number);
104 }
105 } else {
106 self.freed.push(page_number);
107 }
108 }
109
110 pub(crate) fn delete(&mut self, key: &K::SelfType<'_>) -> Result<Option<AccessGuard<'a, V>>> {
111 if let Some(BtreeHeader {
112 root: p,
113 checksum,
114 length,
115 }) = *self.root
116 {
117 let (deletion_result, found) =
118 self.delete_helper(self.mem.get_page(p)?, checksum, K::as_bytes(key).as_ref())?;
119 let new_length = if found.is_some() { length - 1 } else { length };
120 let new_root = match deletion_result {
121 Subtree(page, checksum) => Some(BtreeHeader::new(page, checksum, new_length)),
122 DeletedLeaf => None,
123 PartialLeaf { page, deleted_pair } => {
124 let accessor = LeafAccessor::new(&page, K::fixed_width(), V::fixed_width());
125 let mut builder = LeafBuilder::new(
126 &self.mem,
127 &self.allocated,
128 accessor.num_pairs() - 1,
129 K::fixed_width(),
130 V::fixed_width(),
131 );
132 builder.push_all_except(&accessor, Some(deleted_pair));
133 let page = builder.build()?;
134 assert_eq!(new_length, accessor.num_pairs() as u64 - 1);
135 Some(BtreeHeader::new(
136 page.get_page_number(),
137 DEFERRED,
138 new_length,
139 ))
140 }
141 PartialBranch(page_number, checksum) => {
142 Some(BtreeHeader::new(page_number, checksum, new_length))
143 }
144 DeletedBranch(remaining_child, checksum) => {
145 Some(BtreeHeader::new(remaining_child, checksum, new_length))
146 }
147 };
148 *self.root = new_root;
149 Ok(found)
150 } else {
151 Ok(None)
152 }
153 }
154
155 #[allow(clippy::type_complexity)]
156 pub(crate) fn insert(
157 &mut self,
158 key: &K::SelfType<'_>,
159 value: &V::SelfType<'_>,
160 ) -> Result<(Option<AccessGuard<'a, V>>, AccessGuardMut<'a, V>)> {
161 let (new_root, old_value, guard) = if let Some(BtreeHeader {
162 root: p,
163 checksum,
164 length,
165 }) = *self.root
166 {
167 let result = self.insert_helper(
168 self.mem.get_page(p)?,
169 checksum,
170 K::as_bytes(key).as_ref(),
171 V::as_bytes(value).as_ref(),
172 )?;
173
174 let new_length = if result.old_value.is_some() {
175 length
176 } else {
177 length + 1
178 };
179
180 let new_root = if let Some((key, page2, page2_checksum)) = result.additional_sibling {
181 let mut builder =
182 BranchBuilder::new(&self.mem, &self.allocated, 2, K::fixed_width());
183 builder.push_child(result.new_root, result.root_checksum);
184 builder.push_key(&key);
185 builder.push_child(page2, page2_checksum);
186 let new_page = builder.build()?;
187 BtreeHeader::new(new_page.get_page_number(), DEFERRED, new_length)
188 } else {
189 BtreeHeader::new(result.new_root, result.root_checksum, new_length)
190 };
191 (new_root, result.old_value, result.inserted_value)
192 } else {
193 let key_bytes = K::as_bytes(key);
194 let value_bytes = V::as_bytes(value);
195 let key_bytes = key_bytes.as_ref();
196 let value_bytes = value_bytes.as_ref();
197 let mut builder = LeafBuilder::new(
198 &self.mem,
199 &self.allocated,
200 1,
201 K::fixed_width(),
202 V::fixed_width(),
203 );
204 builder.push(key_bytes, value_bytes);
205 let page = builder.build()?;
206
207 let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
208 let offset = accessor.offset_of_first_value();
209 let page_num = page.get_page_number();
210 let guard = AccessGuardMut::new(page, offset, value_bytes.len());
211
212 (BtreeHeader::new(page_num, DEFERRED, 1), None, guard)
213 };
214 *self.root = Some(new_root);
215 Ok((old_value, guard))
216 }
217
    /// Recursively inserts `key`/`value` into the subtree rooted at `page`.
    ///
    /// `page_checksum` is the checksum the caller holds for `page`; it is passed back unchanged
    /// whenever `page` itself is reused as-is. Pages created or modified here are reported with
    /// a `DEFERRED` checksum.
    ///
    /// Returns the page that now roots the subtree, an optional following sibling when the node
    /// had to split, a guard over the newly written value and, when the key already existed,
    /// the previous value.
    fn insert_helper(
        &mut self,
        page: PageImpl,
        page_checksum: Checksum,
        key: &[u8],
        value: &[u8],
    ) -> Result<InsertionResult<'a, V>> {
        let node_mem = page.memory();
        // The first byte of a page identifies the node type.
        Ok(match node_mem[0] {
            LEAF => {
                let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
                // `position` is the index at which the pair is written; `found` tells whether
                // an entry with this key already sits there.
                let (position, found) = accessor.position::<K>(key);

                // Fast path: the leaf holds a single pair that already fills at least one page.
                // Leave that page untouched and place the new pair in a leaf of its own.
                let single_large_value = accessor.num_pairs() == 1
                    && accessor.total_length() >= self.mem.get_page_size();
                if !found && single_large_value {
                    let mut builder = LeafBuilder::new(
                        &self.mem,
                        &self.allocated,
                        1,
                        K::fixed_width(),
                        V::fixed_width(),
                    );
                    builder.push(key, value);
                    let new_page = builder.build()?;
                    let new_page_number = new_page.get_page_number();
                    let new_page_accessor =
                        LeafAccessor::new(new_page.memory(), K::fixed_width(), V::fixed_width());
                    let offset = new_page_accessor.offset_of_first_value();
                    let guard = AccessGuardMut::new(new_page, offset, value.len());
                    return if position == 0 {
                        // The new pair goes first: the new leaf leads, the existing leaf
                        // becomes the following sibling, and the new key separates them.
                        Ok(InsertionResult {
                            new_root: new_page_number,
                            root_checksum: DEFERRED,
                            additional_sibling: Some((
                                key.to_vec(),
                                page.get_page_number(),
                                page_checksum,
                            )),
                            inserted_value: guard,
                            old_value: None,
                        })
                    } else {
                        // The new pair goes last: the existing leaf leads, separated from the
                        // new leaf by the existing leaf's last key.
                        let split_key = accessor.last_entry().key().to_vec();
                        Ok(InsertionResult {
                            new_root: page.get_page_number(),
                            root_checksum: page_checksum,
                            additional_sibling: Some((split_key, new_page_number, DEFERRED)),
                            inserted_value: guard,
                            old_value: None,
                        })
                    };
                }

                // Fast path: the page is uncommitted, in-place modification is permitted and the
                // page has enough room, so the pair is written directly into the existing page.
                if self.mem.uncommitted(page.get_page_number())
                    && self.modify_uncommitted
                    && LeafMutator::sufficient_insert_inplace_space(
                        &page,
                        position,
                        found,
                        K::fixed_width(),
                        V::fixed_width(),
                        key,
                        value,
                    )
                {
                    let page_number = page.get_page_number();
                    // Copy the old value out before the page is mutated underneath it.
                    let existing_value = if found {
                        let copied_value = accessor.entry(position).unwrap().value().to_vec();
                        Some(AccessGuard::with_owned_value(copied_value))
                    } else {
                        None
                    };
                    // Release the read-only handle before requesting the mutable one.
                    drop(page);
                    let mut page_mut = self.mem.get_page_mut(page_number)?;
                    let mut mutator =
                        LeafMutator::new(&mut page_mut, K::fixed_width(), V::fixed_width());
                    mutator.insert(position, found, key, value);
                    let new_page_accessor =
                        LeafAccessor::new(page_mut.memory(), K::fixed_width(), V::fixed_width());
                    let offset = new_page_accessor.offset_of_value(position).unwrap();
                    let guard = AccessGuardMut::new(page_mut, offset, value.len());
                    return Ok(InsertionResult {
                        new_root: page_number,
                        root_checksum: DEFERRED,
                        additional_sibling: None,
                        inserted_value: guard,
                        old_value: existing_value,
                    });
                }

                // Slow path: rebuild the leaf. Existing entries are copied in order, the new
                // pair is placed at `position`, and the old entry at `position` is skipped when
                // the key already existed.
                let mut builder = LeafBuilder::new(
                    &self.mem,
                    &self.allocated,
                    accessor.num_pairs() + 1,
                    K::fixed_width(),
                    V::fixed_width(),
                );
                for i in 0..accessor.num_pairs() {
                    if i == position {
                        builder.push(key, value);
                    }
                    if !found || i != position {
                        let entry = accessor.entry(i).unwrap();
                        builder.push(entry.key(), entry.value());
                    }
                }
                // The loop never reaches `position` when the pair belongs after the last entry.
                if accessor.num_pairs() == position {
                    builder.push(key, value);
                }
                if !builder.should_split() {
                    let new_page = builder.build()?;

                    let page_number = page.get_page_number();
                    // Dispose of the old page. When it held the previous value, the returned
                    // guard must still be able to read that value afterwards.
                    let existing_value = if found {
                        let (start, end) = accessor.value_range(position).unwrap();
                        if self.modify_uncommitted && self.mem.uncommitted(page_number) {
                            // Keep the contents alive through an `Arc` so the page itself can
                            // be freed right away.
                            let arc = page.to_arc();
                            drop(page);
                            let mut allocated = self.allocated.lock().unwrap();
                            self.mem.free(page_number, &mut allocated);
                            Some(AccessGuard::with_arc_page(arc, start..end))
                        } else {
                            // The page is only recorded in `freed`, and the guard keeps
                            // holding the page handle.
                            self.freed.push(page_number);
                            Some(AccessGuard::with_page(page, start..end))
                        }
                    } else {
                        drop(page);
                        self.conditional_free(page_number);
                        None
                    };

                    let new_page_number = new_page.get_page_number();
                    let accessor =
                        LeafAccessor::new(new_page.memory(), K::fixed_width(), V::fixed_width());
                    let offset = accessor.offset_of_value(position).unwrap();
                    let guard = AccessGuardMut::new(new_page, offset, value.len());

                    InsertionResult {
                        new_root: new_page_number,
                        root_checksum: DEFERRED,
                        additional_sibling: None,
                        inserted_value: guard,
                        old_value: existing_value,
                    }
                } else {
                    // The rebuilt leaf does not fit in one page: split it into two pages.
                    let (new_page1, split_key, new_page2) = builder.build_split()?;
                    let split_key = split_key.to_vec();
                    let page_number = page.get_page_number();
                    // Same disposal of the old page as in the non-split case above.
                    let existing_value = if found {
                        let (start, end) = accessor.value_range(position).unwrap();
                        if self.modify_uncommitted && self.mem.uncommitted(page_number) {
                            let arc = page.to_arc();
                            drop(page);
                            let mut allocated = self.allocated.lock().unwrap();
                            self.mem.free(page_number, &mut allocated);
                            Some(AccessGuard::with_arc_page(arc, start..end))
                        } else {
                            self.freed.push(page_number);
                            Some(AccessGuard::with_page(page, start..end))
                        }
                    } else {
                        drop(page);
                        self.conditional_free(page_number);
                        None
                    };

                    let new_page_number = new_page1.get_page_number();
                    let new_page_number2 = new_page2.get_page_number();
                    let accessor =
                        LeafAccessor::new(new_page1.memory(), K::fixed_width(), V::fixed_width());
                    // Number of pairs that ended up in the first page; positions at or beyond
                    // it live in the second page.
                    let division = accessor.num_pairs();
                    let guard = if position < division {
                        let accessor = LeafAccessor::new(
                            new_page1.memory(),
                            K::fixed_width(),
                            V::fixed_width(),
                        );
                        let offset = accessor.offset_of_value(position).unwrap();
                        AccessGuardMut::new(new_page1, offset, value.len())
                    } else {
                        let accessor = LeafAccessor::new(
                            new_page2.memory(),
                            K::fixed_width(),
                            V::fixed_width(),
                        );
                        let offset = accessor.offset_of_value(position - division).unwrap();
                        AccessGuardMut::new(new_page2, offset, value.len())
                    };

                    InsertionResult {
                        new_root: new_page_number,
                        root_checksum: DEFERRED,
                        additional_sibling: Some((split_key, new_page_number2, DEFERRED)),
                        inserted_value: guard,
                        old_value: existing_value,
                    }
                }
            }
            BRANCH => {
                let accessor = BranchAccessor::new(&page, K::fixed_width());
                // Descend into the child responsible for `key`.
                let (child_index, child_page) = accessor.child_for_key::<K>(key);
                let child_checksum = accessor.child_checksum(child_index).unwrap();
                let sub_result =
                    self.insert_helper(self.mem.get_page(child_page)?, child_checksum, key, value)?;

                // Fast path: the child did not split and this branch may be edited in place, so
                // only the child's page number and checksum need to be rewritten.
                if sub_result.additional_sibling.is_none()
                    && self.modify_uncommitted
                    && self.mem.uncommitted(page.get_page_number())
                {
                    let page_number = page.get_page_number();
                    drop(page);
                    let mut mutpage = self.mem.get_page_mut(page_number)?;
                    let mut mutator = BranchMutator::new(&mut mutpage);
                    mutator.write_child_page(
                        child_index,
                        sub_result.new_root,
                        sub_result.root_checksum,
                    );
                    return Ok(InsertionResult {
                        new_root: mutpage.get_page_number(),
                        root_checksum: DEFERRED,
                        additional_sibling: None,
                        inserted_value: sub_result.inserted_value,
                        old_value: sub_result.old_value,
                    });
                }

                // Slow path: rebuild the branch, with room for one extra child in case the
                // child split.
                let mut builder = BranchBuilder::new(
                    &self.mem,
                    &self.allocated,
                    accessor.count_children() + 1,
                    K::fixed_width(),
                );
                // Child 0 has no preceding key, so it is handled before the loop.
                if child_index == 0 {
                    builder.push_child(sub_result.new_root, sub_result.root_checksum);
                    if let Some((ref index_key2, page2, page2_checksum)) =
                        sub_result.additional_sibling
                    {
                        builder.push_key(index_key2);
                        builder.push_child(page2, page2_checksum);
                    }
                } else {
                    builder.push_child(
                        accessor.child_page(0).unwrap(),
                        accessor.child_checksum(0).unwrap(),
                    );
                }
                // Every later child `i` is preceded by key `i - 1`.
                for i in 1..accessor.count_children() {
                    if let Some(key) = accessor.key(i - 1) {
                        builder.push_key(key);
                        if i == child_index {
                            // Substitute the updated child, plus its new sibling if it split.
                            builder.push_child(sub_result.new_root, sub_result.root_checksum);
                            if let Some((ref index_key2, page2, page2_checksum)) =
                                sub_result.additional_sibling
                            {
                                builder.push_key(index_key2);
                                builder.push_child(page2, page2_checksum);
                            }
                        } else {
                            builder.push_child(
                                accessor.child_page(i).unwrap(),
                                accessor.child_checksum(i).unwrap(),
                            );
                        }
                    } else {
                        unreachable!();
                    }
                }

                let result = if builder.should_split() {
                    let (new_page1, split_key, new_page2) = builder.build_split()?;
                    InsertionResult {
                        new_root: new_page1.get_page_number(),
                        root_checksum: DEFERRED,
                        additional_sibling: Some((
                            split_key.to_vec(),
                            new_page2.get_page_number(),
                            DEFERRED,
                        )),
                        inserted_value: sub_result.inserted_value,
                        old_value: sub_result.old_value,
                    }
                } else {
                    let new_page = builder.build()?;
                    InsertionResult {
                        new_root: new_page.get_page_number(),
                        root_checksum: DEFERRED,
                        additional_sibling: None,
                        inserted_value: sub_result.inserted_value,
                        old_value: sub_result.old_value,
                    }
                };
                // The old branch page has been fully replaced by the rebuilt page(s).
                let page_number = page.get_page_number();
                drop(page);
                self.conditional_free(page_number);

                result
            }
            _ => unreachable!(),
        })
    }
524
525 pub(crate) fn insert_inplace(
526 &mut self,
527 key: &K::SelfType<'_>,
528 value: &V::SelfType<'_>,
529 ) -> Result<()> {
530 assert!(self.modify_uncommitted);
531 let header = self.root.expect("Key not found (tree is empty)");
532 self.insert_inplace_helper(
533 self.mem.get_page_mut(header.root)?,
534 K::as_bytes(key).as_ref(),
535 V::as_bytes(value).as_ref(),
536 )?;
537 *self.root = Some(BtreeHeader::new(header.root, DEFERRED, header.length));
538 Ok(())
539 }
540
541 fn insert_inplace_helper(&mut self, mut page: PageMut, key: &[u8], value: &[u8]) -> Result<()> {
542 assert!(self.mem.uncommitted(page.get_page_number()));
543
544 let node_mem = page.memory();
545 match node_mem[0] {
546 LEAF => {
547 let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
548 let (position, found) = accessor.position::<K>(key);
549 assert!(found);
550 let old_len = accessor.entry(position).unwrap().value().len();
551 assert!(value.len() <= old_len);
552 let mut mutator = LeafMutator::new(&mut page, K::fixed_width(), V::fixed_width());
553 mutator.insert(position, true, key, value);
554 }
555 BRANCH => {
556 let accessor = BranchAccessor::new(&page, K::fixed_width());
557 let (child_index, child_page) = accessor.child_for_key::<K>(key);
558 self.insert_inplace_helper(self.mem.get_page_mut(child_page)?, key, value)?;
559 let mut mutator = BranchMutator::new(&mut page);
560 mutator.write_child_page(child_index, child_page, DEFERRED);
561 }
562 _ => unreachable!(),
563 }
564
565 Ok(())
566 }
567
    /// Deletes `key` from the leaf `page`.
    ///
    /// `checksum` is the checksum the caller holds for `page`; it is returned unchanged when
    /// the key is absent and the leaf is left alone.
    ///
    /// Returns what became of the leaf together with a guard over the removed value (`None`
    /// when the key was not found).
    fn delete_leaf_helper(
        &mut self,
        page: PageImpl,
        checksum: Checksum,
        key: &[u8],
    ) -> Result<(DeletionResult, Option<AccessGuard<'a, V>>)> {
        let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
        let (position, found) = accessor.position::<K>(key);
        if !found {
            // Nothing to delete: hand the untouched page back.
            return Ok((Subtree(page.get_page_number(), checksum), None));
        }
        // Size the leaf would need once the pair at `position` is gone.
        let new_kv_bytes = accessor.length_of_pairs(0, accessor.num_pairs())
            - accessor.length_of_pairs(position, position + 1);
        let new_required_bytes = RawLeafBuilder::required_bytes(
            accessor.num_pairs() - 1,
            new_kv_bytes,
            K::fixed_width(),
            V::fixed_width(),
        );
        let uncommitted = self.mem.uncommitted(page.get_page_number());

        // Fast path: the page may be edited in place and would still be at least half a page
        // in size with at least one pair left, so no rebuild or merge is needed.
        if uncommitted
            && self.modify_uncommitted
            && new_required_bytes >= self.mem.get_page_size() / 2
            && accessor.num_pairs() > 1
        {
            let (start, end) = accessor.value_range(position).unwrap();
            let page_number = page.get_page_number();
            // Release the read-only handle before requesting the mutable one.
            drop(page);
            let page_mut = self.mem.get_page_mut(page_number)?;

            // NOTE(review): judging by its name, this guard removes the pair from the page
            // when it is dropped — confirm against `AccessGuard::remove_on_drop`.
            let guard = AccessGuard::remove_on_drop(
                page_mut,
                start,
                end - start,
                position,
                K::fixed_width(),
            );
            return Ok((Subtree(page_number, DEFERRED), Some(guard)));
        }

        let result = if accessor.num_pairs() == 1 {
            // Removing the only pair leaves nothing behind.
            DeletedLeaf
        } else if new_required_bytes < self.mem.get_page_size() / 3 {
            // The leaf would be less than a third of a page: hand its contents to the caller
            // so they can be merged with a sibling.
            PartialLeaf {
                page: page.to_arc(),
                deleted_pair: position,
            }
        } else {
            // Rebuild the leaf into a new page without the deleted pair.
            let mut builder = LeafBuilder::new(
                &self.mem,
                &self.allocated,
                accessor.num_pairs() - 1,
                K::fixed_width(),
                V::fixed_width(),
            );
            for i in 0..accessor.num_pairs() {
                if i == position {
                    continue;
                }
                let entry = accessor.entry(i).unwrap();
                builder.push(entry.key(), entry.value());
            }
            let new_page = builder.build()?;
            Subtree(new_page.get_page_number(), DEFERRED)
        };
        // In all three cases the original page is released here; the returned guard keeps the
        // removed value readable.
        let (start, end) = accessor.value_range(position).unwrap();
        let guard = if uncommitted && self.modify_uncommitted {
            // Keep the contents alive through an `Arc` so the page can be freed right away.
            let page_number = page.get_page_number();
            let arc = page.to_arc();
            drop(page);
            let mut allocated = self.allocated.lock().unwrap();
            self.mem.free(page_number, &mut allocated);
            Some(AccessGuard::with_arc_page(arc, start..end))
        } else {
            // The page is only recorded in `freed`, and the guard keeps holding the page
            // handle.
            // NOTE(review): presumably pages in `freed` stay readable until their owner
            // releases them — confirm.
            self.freed.push(page.get_page_number());
            Some(AccessGuard::with_page(page, start..end))
        };
        Ok((result, guard))
    }
653
654 fn finalize_branch_builder(
655 builder: BranchBuilder<'_, '_>,
656 page_size: usize,
657 ) -> Result<DeletionResult> {
658 let result = if let Some((only_child, checksum)) = builder.to_single_child() {
659 DeletedBranch(only_child, checksum)
660 } else {
661 let new_page = builder.build()?;
664 let accessor = BranchAccessor::new(&new_page, K::fixed_width());
665 if accessor.total_length() < page_size / 3 {
668 PartialBranch(new_page.get_page_number(), DEFERRED)
669 } else {
670 Subtree(new_page.get_page_number(), DEFERRED)
671 }
672 };
673 Ok(result)
674 }
675
    /// Deletes `key` from the subtree rooted at the branch `page`.
    ///
    /// `checksum` is the checksum the caller holds for `page`; it is returned unchanged when
    /// the key is absent and the branch is left alone.
    ///
    /// After recursing into the responsible child, the branch is either patched (the child is
    /// a regular subtree) or rebuilt, merging an undersized or collapsed child with one of its
    /// neighbours. Returns what became of this branch together with a guard over the removed
    /// value (`None` when the key was not found).
    fn delete_branch_helper(
        &mut self,
        page: PageImpl,
        checksum: Checksum,
        key: &[u8],
    ) -> Result<(DeletionResult, Option<AccessGuard<'a, V>>)> {
        let accessor = BranchAccessor::new(&page, K::fixed_width());
        let original_page_number = page.get_page_number();
        let (child_index, child_page_number) = accessor.child_for_key::<K>(key);
        let child_checksum = accessor.child_checksum(child_index).unwrap();
        let (result, found) =
            self.delete_helper(self.mem.get_page(child_page_number)?, child_checksum, key)?;
        if found.is_none() {
            // Key absent: nothing below changed, so this page is returned untouched.
            return Ok((Subtree(original_page_number, checksum), None));
        }
        // Simple case: the child is still a regular subtree, so only its pointer changes.
        if let Subtree(new_child, new_child_checksum) = result {
            let result_page =
                if self.mem.uncommitted(original_page_number) && self.modify_uncommitted {
                    // Patch the child pointer in place.
                    drop(page);
                    let mut mutpage = self.mem.get_page_mut(original_page_number)?;
                    let mut mutator = BranchMutator::new(&mut mutpage);
                    mutator.write_child_page(child_index, new_child, new_child_checksum);
                    original_page_number
                } else {
                    // Copy the branch into a new page with the child pointer replaced.
                    let mut builder = BranchBuilder::new(
                        &self.mem,
                        &self.allocated,
                        accessor.count_children(),
                        K::fixed_width(),
                    );
                    builder.push_all(&accessor);
                    builder.replace_child(child_index, new_child, new_child_checksum);
                    let new_page = builder.build()?;
                    self.conditional_free(original_page_number);
                    new_page.get_page_number()
                };
            return Ok((Subtree(result_page, DEFERRED), found));
        }

        // The child vanished or is undersized: this branch has to be rebuilt.
        let mut builder = BranchBuilder::new(
            &self.mem,
            &self.allocated,
            accessor.count_children(),
            K::fixed_width(),
        );

        let final_result = match result {
            Subtree(_, _) => {
                // Handled by the early return above.
                unreachable!();
            }
            DeletedLeaf => {
                // Copy every child except the deleted one.
                for i in 0..accessor.count_children() {
                    if i == child_index {
                        continue;
                    }
                    builder.push_child(
                        accessor.child_page(i).unwrap(),
                        accessor.child_checksum(i).unwrap(),
                    );
                }
                // Drop exactly one key: when the deleted child is the last one, the final key
                // (the one preceding it) is left out by shortening the range; otherwise the
                // key at `child_index` is skipped inside the loop.
                let end = if child_index == accessor.count_children() - 1 {
                    accessor.count_children() - 2
                } else {
                    accessor.count_children() - 1
                };
                for i in 0..end {
                    if i == child_index {
                        continue;
                    }
                    builder.push_key(accessor.key(i).unwrap());
                }
                Self::finalize_branch_builder(builder, self.mem.get_page_size())?
            }
            PartialLeaf {
                page: partial_child_page,
                deleted_pair,
            } => {
                let partial_child_accessor =
                    LeafAccessor::new(&partial_child_page, K::fixed_width(), V::fixed_width());
                debug_assert!(partial_child_accessor.num_pairs() > 1);

                // Merge with the previous sibling, or with the next one for the first child.
                let merge_with = if child_index == 0 { 1 } else { child_index - 1 };
                debug_assert!(merge_with < accessor.count_children());
                let merge_with_page = self
                    .mem
                    .get_page(accessor.child_page(merge_with).unwrap())?;
                let merge_with_accessor =
                    LeafAccessor::new(merge_with_page.memory(), K::fixed_width(), V::fixed_width());

                let single_large_value = merge_with_accessor.num_pairs() == 1
                    && merge_with_accessor.total_length() >= self.mem.get_page_size();
                if single_large_value {
                    // The sibling holds a single pair that already fills at least a page, so
                    // no merge is attempted: the partial child is rebuilt on its own and only
                    // its pointer is replaced in a copy of this branch.
                    let mut child_builder = LeafBuilder::new(
                        &self.mem,
                        &self.allocated,
                        partial_child_accessor.num_pairs() - 1,
                        K::fixed_width(),
                        V::fixed_width(),
                    );
                    child_builder.push_all_except(&partial_child_accessor, Some(deleted_pair));
                    let new_page = child_builder.build()?;
                    builder.push_all(&accessor);
                    builder.replace_child(child_index, new_page.get_page_number(), DEFERRED);

                    let result = Self::finalize_branch_builder(builder, self.mem.get_page_size())?;

                    drop(page);
                    self.conditional_free(original_page_number);
                    // The partial child's own page was already released by
                    // `delete_leaf_helper`, which produced the `PartialLeaf`.
                    return Ok((result, found));
                }

                for i in 0..accessor.count_children() {
                    if i == child_index {
                        // The partial child is emitted as part of the merge below.
                        continue;
                    }
                    let page_number = accessor.child_page(i).unwrap();
                    let page_checksum = accessor.child_checksum(i).unwrap();
                    if i == merge_with {
                        let mut child_builder = LeafBuilder::new(
                            &self.mem,
                            &self.allocated,
                            partial_child_accessor.num_pairs() - 1
                                + merge_with_accessor.num_pairs(),
                            K::fixed_width(),
                            V::fixed_width(),
                        );
                        // Concatenate the two leaves in key order: the partial child goes
                        // first when it precedes the sibling, last otherwise.
                        if child_index < merge_with {
                            child_builder
                                .push_all_except(&partial_child_accessor, Some(deleted_pair));
                        }
                        child_builder.push_all_except(&merge_with_accessor, None);
                        if child_index > merge_with {
                            child_builder
                                .push_all_except(&partial_child_accessor, Some(deleted_pair));
                        }
                        if child_builder.should_split() {
                            // Too large for one page: emit two leaves and a new separator.
                            let (new_page1, split_key, new_page2) = child_builder.build_split()?;
                            builder.push_key(split_key);
                            builder.push_child(new_page1.get_page_number(), DEFERRED);
                            builder.push_child(new_page2.get_page_number(), DEFERRED);
                        } else {
                            let new_page = child_builder.build()?;
                            builder.push_child(new_page.get_page_number(), DEFERRED);
                        }

                        // Keep the key that followed the later of the two merged children
                        // (if any); the key that sat between them is dropped.
                        let merged_key_index = max(child_index, merge_with);
                        if merged_key_index < accessor.count_children() - 1 {
                            builder.push_key(accessor.key(merged_key_index).unwrap());
                        }
                    } else {
                        builder.push_child(page_number, page_checksum);
                        if i < accessor.count_children() - 1 {
                            builder.push_key(accessor.key(i).unwrap());
                        }
                    }
                }

                let result = Self::finalize_branch_builder(builder, self.mem.get_page_size())?;

                // The sibling's contents now live in the merged page(s).
                let page_number = merge_with_page.get_page_number();
                drop(merge_with_page);
                self.conditional_free(page_number);
                // The partial child's own page was already released by `delete_leaf_helper`.
                result
            }
            DeletedBranch(only_grandchild, grandchild_checksum) => {
                // The child branch collapsed to a single grandchild: attach that grandchild to
                // a neighbouring branch (previous sibling, or the next one for child 0).
                let merge_with = if child_index == 0 { 1 } else { child_index - 1 };
                let merge_with_page = self
                    .mem
                    .get_page(accessor.child_page(merge_with).unwrap())?;
                let merge_with_accessor = BranchAccessor::new(&merge_with_page, K::fixed_width());
                debug_assert!(merge_with < accessor.count_children());
                for i in 0..accessor.count_children() {
                    if i == child_index {
                        continue;
                    }
                    let page_number = accessor.child_page(i).unwrap();
                    let page_checksum = accessor.child_checksum(i).unwrap();
                    if i == merge_with {
                        let mut child_builder = BranchBuilder::new(
                            &self.mem,
                            &self.allocated,
                            merge_with_accessor.count_children() + 1,
                            K::fixed_width(),
                        );
                        // The key that separated the two children in this branch becomes the
                        // separator between the grandchild and the sibling's children.
                        let separator_key = accessor.key(min(child_index, merge_with)).unwrap();
                        if child_index < merge_with {
                            child_builder.push_child(only_grandchild, grandchild_checksum);
                            child_builder.push_key(separator_key);
                        }
                        child_builder.push_all(&merge_with_accessor);
                        if child_index > merge_with {
                            child_builder.push_key(separator_key);
                            child_builder.push_child(only_grandchild, grandchild_checksum);
                        }
                        if child_builder.should_split() {
                            let (new_page1, separator, new_page2) = child_builder.build_split()?;
                            builder.push_child(new_page1.get_page_number(), DEFERRED);
                            builder.push_key(separator);
                            builder.push_child(new_page2.get_page_number(), DEFERRED);
                        } else {
                            let new_page = child_builder.build()?;
                            builder.push_child(new_page.get_page_number(), DEFERRED);
                        }

                        // Keep the key that followed the later of the two merged children.
                        let merged_key_index = max(child_index, merge_with);
                        if merged_key_index < accessor.count_children() - 1 {
                            builder.push_key(accessor.key(merged_key_index).unwrap());
                        }
                    } else {
                        builder.push_child(page_number, page_checksum);
                        if i < accessor.count_children() - 1 {
                            builder.push_key(accessor.key(i).unwrap());
                        }
                    }
                }
                let result = Self::finalize_branch_builder(builder, self.mem.get_page_size())?;

                // The sibling's contents now live in the merged page(s).
                let page_number = merge_with_page.get_page_number();
                drop(merge_with_page);
                self.conditional_free(page_number);

                result
            }
            PartialBranch(partial_child, ..) => {
                // The child branch is undersized: combine its children with a neighbouring
                // branch (previous sibling, or the next one for child 0).
                let partial_child_page = self.mem.get_page(partial_child)?;
                let partial_child_accessor =
                    BranchAccessor::new(&partial_child_page, K::fixed_width());
                let merge_with = if child_index == 0 { 1 } else { child_index - 1 };
                let merge_with_page = self
                    .mem
                    .get_page(accessor.child_page(merge_with).unwrap())?;
                let merge_with_accessor = BranchAccessor::new(&merge_with_page, K::fixed_width());
                debug_assert!(merge_with < accessor.count_children());
                for i in 0..accessor.count_children() {
                    if i == child_index {
                        continue;
                    }
                    let page_number = accessor.child_page(i).unwrap();
                    let page_checksum = accessor.child_checksum(i).unwrap();
                    if i == merge_with {
                        let mut child_builder = BranchBuilder::new(
                            &self.mem,
                            &self.allocated,
                            merge_with_accessor.count_children()
                                + partial_child_accessor.count_children(),
                            K::fixed_width(),
                        );
                        // The key that separated the two children in this branch is pulled
                        // down to sit between their concatenated contents.
                        let separator_key = accessor.key(min(child_index, merge_with)).unwrap();
                        if child_index < merge_with {
                            child_builder.push_all(&partial_child_accessor);
                            child_builder.push_key(separator_key);
                        }
                        child_builder.push_all(&merge_with_accessor);
                        if child_index > merge_with {
                            child_builder.push_key(separator_key);
                            child_builder.push_all(&partial_child_accessor);
                        }
                        if child_builder.should_split() {
                            let (new_page1, separator, new_page2) = child_builder.build_split()?;
                            builder.push_child(new_page1.get_page_number(), DEFERRED);
                            builder.push_key(separator);
                            builder.push_child(new_page2.get_page_number(), DEFERRED);
                        } else {
                            let new_page = child_builder.build()?;
                            builder.push_child(new_page.get_page_number(), DEFERRED);
                        }

                        // Keep the key that followed the later of the two merged children.
                        let merged_key_index = max(child_index, merge_with);
                        if merged_key_index < accessor.count_children() - 1 {
                            builder.push_key(accessor.key(merged_key_index).unwrap());
                        }
                    } else {
                        builder.push_child(page_number, page_checksum);
                        if i < accessor.count_children() - 1 {
                            builder.push_key(accessor.key(i).unwrap());
                        }
                    }
                }
                let result = Self::finalize_branch_builder(builder, self.mem.get_page_size())?;

                // Both the sibling and the partial child were copied into the merged page(s).
                let page_number = merge_with_page.get_page_number();
                drop(merge_with_page);
                self.conditional_free(page_number);
                drop(partial_child_page);
                self.conditional_free(partial_child);

                result
            }
        };

        // This branch page has been replaced by the rebuilt one.
        drop(page);
        self.conditional_free(original_page_number);

        Ok((final_result, found))
    }
981
982 fn delete_helper(
985 &mut self,
986 page: PageImpl,
987 checksum: Checksum,
988 key: &[u8],
989 ) -> Result<(DeletionResult, Option<AccessGuard<'a, V>>)> {
990 let node_mem = page.memory();
991 match node_mem[0] {
992 LEAF => self.delete_leaf_helper(page, checksum, key),
993 BRANCH => self.delete_branch_helper(page, checksum, key),
994 _ => unreachable!(),
995 }
996 }
997}