1use crate::tree_store::btree_base::{
2 BranchAccessor, BranchBuilder, BranchMutator, Checksum, LeafAccessor, LeafBuilder, LeafMutator,
3 BRANCH, DEFERRED, LEAF,
4};
5use crate::tree_store::btree_mutator::DeletionResult::{
6 DeletedBranch, DeletedLeaf, PartialBranch, PartialLeaf, Subtree,
7};
8use crate::tree_store::page_store::{Page, PageImpl, PageMut};
9use crate::tree_store::{
10 AccessGuardMut, BtreeHeader, PageNumber, RawLeafBuilder, TransactionalMemory,
11};
12use crate::types::{Key, Value};
13use crate::{AccessGuard, Result};
14use std::cmp::{max, min};
15use std::marker::PhantomData;
16use std::sync::Arc;
17
/// Outcome of removing a key from a subtree, handed from a node back to its parent
/// (or to `MutateHelper::delete` when the node is the root).
#[derive(Debug)]
enum DeletionResult {
    // Page number and checksum of a subtree that the parent can reference as-is;
    // no merging with a sibling is requested.
    Subtree(PageNumber, Checksum),
    // The leaf held exactly one pair and that pair was the one deleted.
    DeletedLeaf,
    // The leaf would need less than a third of a page after the deletion.
    // No replacement leaf has been built: the consumer rebuilds it from `page`,
    // skipping the pair at index `deleted_pair`.
    PartialLeaf {
        // Bytes of the leaf as it was before the deletion.
        page: Arc<[u8]>,
        // Index, within `page`, of the pair that was deleted.
        deleted_pair: usize,
    },
    // A freshly built branch page whose total length is below a third of a page.
    PartialBranch(PageNumber, Checksum),
    // The branch was left with a single child; carries that child and its checksum.
    DeletedBranch(PageNumber, Checksum),
}
35
/// Outcome of inserting a pair into a subtree, handed from a node back to its parent
/// (or to `MutateHelper::insert` when the node is the root).
struct InsertionResult<'a, V: Value + 'static> {
    // Page that now roots the subtree the insert was performed on.
    new_root: PageNumber,
    // Checksum to record for `new_root`.
    root_checksum: Checksum,
    // Set when the subtree was split in two: the separator key, followed by the
    // page number and checksum of the node that goes after `new_root`.
    additional_sibling: Option<(Vec<u8>, PageNumber, Checksum)>,
    // Mutable guard over the bytes of the value that was just written.
    inserted_value: AccessGuardMut<'a, V>,
    // Value previously stored under the key, when the key already existed.
    old_value: Option<AccessGuard<'a, V>>,
}
48
/// Performs inserts and deletes on a B-tree whose header is stored in `root`.
pub(crate) struct MutateHelper<'a, 'b, K: Key, V: Value> {
    // Header of the tree being mutated (`None` while the tree is empty); rewritten
    // in place by `insert`, `insert_inplace` and `delete`.
    root: &'b mut Option<BtreeHeader>,
    // When false, uncommitted pages are never edited in place and replaced pages
    // are only queued in `freed`, never released immediately.
    modify_uncommitted: bool,
    // Page store used to read, allocate and free pages.
    mem: Arc<TransactionalMemory>,
    // Pages that were replaced but not released immediately.
    freed: &'b mut Vec<PageNumber>,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
    _lifetime: PhantomData<&'a ()>,
}
58
59impl<'a, 'b, K: Key, V: Value> MutateHelper<'a, 'b, K, V> {
60 pub(crate) fn new(
61 root: &'b mut Option<BtreeHeader>,
62 mem: Arc<TransactionalMemory>,
63 freed: &'b mut Vec<PageNumber>,
64 ) -> Self {
65 Self {
66 root,
67 modify_uncommitted: true,
68 mem,
69 freed,
70 _key_type: Default::default(),
71 _value_type: Default::default(),
72 _lifetime: Default::default(),
73 }
74 }
75
76 pub(crate) fn new_do_not_modify(
79 root: &'b mut Option<BtreeHeader>,
80 mem: Arc<TransactionalMemory>,
81 freed: &'b mut Vec<PageNumber>,
82 ) -> Self {
83 Self {
84 root,
85 modify_uncommitted: false,
86 mem,
87 freed,
88 _key_type: Default::default(),
89 _value_type: Default::default(),
90 _lifetime: Default::default(),
91 }
92 }
93
94 fn conditional_free(&mut self, page_number: PageNumber) {
95 if self.modify_uncommitted {
96 if !self.mem.free_if_uncommitted(page_number) {
97 self.freed.push(page_number);
98 }
99 } else {
100 self.freed.push(page_number);
101 }
102 }
103
104 pub(crate) fn delete(&mut self, key: &K::SelfType<'_>) -> Result<Option<AccessGuard<'a, V>>> {
105 if let Some(BtreeHeader {
106 root: p,
107 checksum,
108 length,
109 }) = *self.root
110 {
111 let (deletion_result, found) =
112 self.delete_helper(self.mem.get_page(p)?, checksum, K::as_bytes(key).as_ref())?;
113 let new_length = if found.is_some() { length - 1 } else { length };
114 let new_root = match deletion_result {
115 Subtree(page, checksum) => Some(BtreeHeader::new(page, checksum, new_length)),
116 DeletedLeaf => None,
117 PartialLeaf { page, deleted_pair } => {
118 let accessor = LeafAccessor::new(&page, K::fixed_width(), V::fixed_width());
119 let mut builder = LeafBuilder::new(
120 &self.mem,
121 accessor.num_pairs() - 1,
122 K::fixed_width(),
123 V::fixed_width(),
124 );
125 builder.push_all_except(&accessor, Some(deleted_pair));
126 let page = builder.build()?;
127 assert_eq!(new_length, accessor.num_pairs() as u64 - 1);
128 Some(BtreeHeader::new(
129 page.get_page_number(),
130 DEFERRED,
131 new_length,
132 ))
133 }
134 PartialBranch(page_number, checksum) => {
135 Some(BtreeHeader::new(page_number, checksum, new_length))
136 }
137 DeletedBranch(remaining_child, checksum) => {
138 Some(BtreeHeader::new(remaining_child, checksum, new_length))
139 }
140 };
141 *self.root = new_root;
142 Ok(found)
143 } else {
144 Ok(None)
145 }
146 }
147
148 #[allow(clippy::type_complexity)]
149 pub(crate) fn insert(
150 &mut self,
151 key: &K::SelfType<'_>,
152 value: &V::SelfType<'_>,
153 ) -> Result<(Option<AccessGuard<'a, V>>, AccessGuardMut<'a, V>)> {
154 let (new_root, old_value, guard) = if let Some(BtreeHeader {
155 root: p,
156 checksum,
157 length,
158 }) = *self.root
159 {
160 let result = self.insert_helper(
161 self.mem.get_page(p)?,
162 checksum,
163 K::as_bytes(key).as_ref(),
164 V::as_bytes(value).as_ref(),
165 )?;
166
167 let new_length = if result.old_value.is_some() {
168 length
169 } else {
170 length + 1
171 };
172
173 let new_root = if let Some((key, page2, page2_checksum)) = result.additional_sibling {
174 let mut builder = BranchBuilder::new(&self.mem, 2, K::fixed_width());
175 builder.push_child(result.new_root, result.root_checksum);
176 builder.push_key(&key);
177 builder.push_child(page2, page2_checksum);
178 let new_page = builder.build()?;
179 BtreeHeader::new(new_page.get_page_number(), DEFERRED, new_length)
180 } else {
181 BtreeHeader::new(result.new_root, result.root_checksum, new_length)
182 };
183 (new_root, result.old_value, result.inserted_value)
184 } else {
185 let key_bytes = K::as_bytes(key);
186 let value_bytes = V::as_bytes(value);
187 let key_bytes = key_bytes.as_ref();
188 let value_bytes = value_bytes.as_ref();
189 let mut builder = LeafBuilder::new(&self.mem, 1, K::fixed_width(), V::fixed_width());
190 builder.push(key_bytes, value_bytes);
191 let page = builder.build()?;
192
193 let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
194 let offset = accessor.offset_of_first_value();
195 let page_num = page.get_page_number();
196 let guard = AccessGuardMut::new(page, offset, value_bytes.len());
197
198 (BtreeHeader::new(page_num, DEFERRED, 1), None, guard)
199 };
200 *self.root = Some(new_root);
201 Ok((old_value, guard))
202 }
203
/// Recursively inserts `key`/`value` into the subtree rooted at `page`.
///
/// `page_checksum` is the checksum the caller holds for `page`; it is handed back
/// unchanged in the cases where `page` itself is reused untouched.
///
/// The returned `InsertionResult` names the page that now roots this subtree, an
/// optional sibling (separator key, page, checksum) when the subtree was split,
/// a mutable guard over the written value, and the previous value when the key
/// already existed.
///
/// # Panics
/// Hits `unreachable!()` if the first byte of the page is neither `LEAF` nor `BRANCH`.
fn insert_helper(
    &mut self,
    page: PageImpl,
    page_checksum: Checksum,
    key: &[u8],
    value: &[u8],
) -> Result<InsertionResult<'a, V>> {
    // The first byte of a page identifies the node type.
    let node_mem = page.memory();
    Ok(match node_mem[0] {
        LEAF => {
            let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
            // `position` is the index at which the new pair is placed (see the rebuild
            // loop below); `found` tells whether the key is already present there.
            let (position, found) = accessor.position::<K>(key);

            // Fast path: the leaf holds one pair whose total length is at least a page.
            // Leave that leaf alone and put the new pair into a leaf of its own.
            let single_large_value = accessor.num_pairs() == 1
                && accessor.total_length() >= self.mem.get_page_size();
            if !found && single_large_value {
                let mut builder =
                    LeafBuilder::new(&self.mem, 1, K::fixed_width(), V::fixed_width());
                builder.push(key, value);
                let new_page = builder.build()?;
                let new_page_number = new_page.get_page_number();
                let new_page_accessor =
                    LeafAccessor::new(new_page.memory(), K::fixed_width(), V::fixed_width());
                let offset = new_page_accessor.offset_of_first_value();
                let guard = AccessGuardMut::new(new_page, offset, value.len());
                return if position == 0 {
                    // New pair goes first: the new leaf is the left node, the existing
                    // page (with its unchanged checksum) follows, separated by `key`.
                    Ok(InsertionResult {
                        new_root: new_page_number,
                        root_checksum: DEFERRED,
                        additional_sibling: Some((
                            key.to_vec(),
                            page.get_page_number(),
                            page_checksum,
                        )),
                        inserted_value: guard,
                        old_value: None,
                    })
                } else {
                    // New pair goes last: the existing page stays on the left and its
                    // last key becomes the separator.
                    let split_key = accessor.last_entry().key().to_vec();
                    Ok(InsertionResult {
                        new_root: page.get_page_number(),
                        root_checksum: page_checksum,
                        additional_sibling: Some((split_key, new_page_number, DEFERRED)),
                        inserted_value: guard,
                        old_value: None,
                    })
                };
            }

            // Fast path: the page is uncommitted, in-place edits are allowed and the
            // pair fits, so the page is edited directly instead of being rebuilt.
            if self.mem.uncommitted(page.get_page_number())
                && self.modify_uncommitted
                && LeafMutator::sufficient_insert_inplace_space(
                    &page,
                    position,
                    found,
                    K::fixed_width(),
                    V::fixed_width(),
                    key,
                    value,
                )
            {
                let page_number = page.get_page_number();
                // The page is about to be edited, so the old value is copied out first.
                let existing_value = if found {
                    let copied_value = accessor.entry(position).unwrap().value().to_vec();
                    Some(AccessGuard::with_owned_value(copied_value))
                } else {
                    None
                };
                // Give up the read handle before asking for the page mutably.
                drop(page);
                let mut page_mut = self.mem.get_page_mut(page_number)?;
                let mut mutator =
                    LeafMutator::new(&mut page_mut, K::fixed_width(), V::fixed_width());
                mutator.insert(position, found, key, value);
                let new_page_accessor =
                    LeafAccessor::new(page_mut.memory(), K::fixed_width(), V::fixed_width());
                let offset = new_page_accessor.offset_of_value(position).unwrap();
                let guard = AccessGuardMut::new(page_mut, offset, value.len());
                return Ok(InsertionResult {
                    new_root: page_number,
                    root_checksum: DEFERRED,
                    additional_sibling: None,
                    inserted_value: guard,
                    old_value: existing_value,
                });
            }

            // General path: rebuild the leaf with the new pair at `position`.
            let mut builder = LeafBuilder::new(
                &self.mem,
                accessor.num_pairs() + 1,
                K::fixed_width(),
                V::fixed_width(),
            );
            for i in 0..accessor.num_pairs() {
                if i == position {
                    builder.push(key, value);
                }
                // When the key already exists, the old pair at `position` is left out.
                if !found || i != position {
                    let entry = accessor.entry(i).unwrap();
                    builder.push(entry.key(), entry.value());
                }
            }
            // The new pair goes after every existing pair.
            if accessor.num_pairs() == position {
                builder.push(key, value);
            }
            if !builder.should_split() {
                let new_page = builder.build()?;

                let page_number = page.get_page_number();
                let existing_value = if found {
                    // The old-value guard keeps reading from the replaced page.
                    let (start, end) = accessor.value_range(position).unwrap();
                    if self.modify_uncommitted && self.mem.uncommitted(page_number) {
                        // Page is released right away; the guard reads from an `Arc`
                        // of the page bytes obtained before the release.
                        let arc = page.to_arc();
                        drop(page);
                        self.mem.free(page_number);
                        Some(AccessGuard::with_arc_page(arc, start..end))
                    } else {
                        // Page is only queued in `freed`; the guard holds the page itself.
                        self.freed.push(page_number);
                        Some(AccessGuard::with_page(page, start..end))
                    }
                } else {
                    drop(page);
                    self.conditional_free(page_number);
                    None
                };

                let new_page_number = new_page.get_page_number();
                let accessor =
                    LeafAccessor::new(new_page.memory(), K::fixed_width(), V::fixed_width());
                let offset = accessor.offset_of_value(position).unwrap();
                let guard = AccessGuardMut::new(new_page, offset, value.len());

                InsertionResult {
                    new_root: new_page_number,
                    root_checksum: DEFERRED,
                    additional_sibling: None,
                    inserted_value: guard,
                    old_value: existing_value,
                }
            } else {
                // The rebuilt leaf is too large: emit two leaves and a separator key.
                let (new_page1, split_key, new_page2) = builder.build_split()?;
                let split_key = split_key.to_vec();
                let page_number = page.get_page_number();
                // Same handling of the replaced page as in the non-split case above.
                let existing_value = if found {
                    let (start, end) = accessor.value_range(position).unwrap();
                    if self.modify_uncommitted && self.mem.uncommitted(page_number) {
                        let arc = page.to_arc();
                        drop(page);
                        self.mem.free(page_number);
                        Some(AccessGuard::with_arc_page(arc, start..end))
                    } else {
                        self.freed.push(page_number);
                        Some(AccessGuard::with_page(page, start..end))
                    }
                } else {
                    drop(page);
                    self.conditional_free(page_number);
                    None
                };

                let new_page_number = new_page1.get_page_number();
                let new_page_number2 = new_page2.get_page_number();
                let accessor =
                    LeafAccessor::new(new_page1.memory(), K::fixed_width(), V::fixed_width());
                // Number of pairs in the first half decides which half holds the new pair.
                let division = accessor.num_pairs();
                let guard = if position < division {
                    let accessor = LeafAccessor::new(
                        new_page1.memory(),
                        K::fixed_width(),
                        V::fixed_width(),
                    );
                    let offset = accessor.offset_of_value(position).unwrap();
                    AccessGuardMut::new(new_page1, offset, value.len())
                } else {
                    let accessor = LeafAccessor::new(
                        new_page2.memory(),
                        K::fixed_width(),
                        V::fixed_width(),
                    );
                    // Index relative to the start of the second half.
                    let offset = accessor.offset_of_value(position - division).unwrap();
                    AccessGuardMut::new(new_page2, offset, value.len())
                };

                InsertionResult {
                    new_root: new_page_number,
                    root_checksum: DEFERRED,
                    additional_sibling: Some((split_key, new_page_number2, DEFERRED)),
                    inserted_value: guard,
                    old_value: existing_value,
                }
            }
        }
        BRANCH => {
            // Descend into the child selected for `key`.
            let accessor = BranchAccessor::new(&page, K::fixed_width());
            let (child_index, child_page) = accessor.child_for_key::<K>(key);
            let child_checksum = accessor.child_checksum(child_index).unwrap();
            let sub_result =
                self.insert_helper(self.mem.get_page(child_page)?, child_checksum, key, value)?;

            // Fast path: the child did not split and this branch may be edited in
            // place, so only the child pointer is overwritten.
            if sub_result.additional_sibling.is_none()
                && self.modify_uncommitted
                && self.mem.uncommitted(page.get_page_number())
            {
                let page_number = page.get_page_number();
                drop(page);
                let mut mutpage = self.mem.get_page_mut(page_number)?;
                let mut mutator = BranchMutator::new(&mut mutpage);
                mutator.write_child_page(
                    child_index,
                    sub_result.new_root,
                    sub_result.root_checksum,
                );
                return Ok(InsertionResult {
                    new_root: mutpage.get_page_number(),
                    root_checksum: DEFERRED,
                    additional_sibling: None,
                    inserted_value: sub_result.inserted_value,
                    old_value: sub_result.old_value,
                });
            }

            // Rebuild the branch, with room for one extra child in case the child split.
            let mut builder =
                BranchBuilder::new(&self.mem, accessor.count_children() + 1, K::fixed_width());
            // Child 0 is handled outside the loop: the loop below pushes key i - 1
            // ahead of child i, and child 0 has no such key.
            if child_index == 0 {
                builder.push_child(sub_result.new_root, sub_result.root_checksum);
                if let Some((ref index_key2, page2, page2_checksum)) =
                    sub_result.additional_sibling
                {
                    builder.push_key(index_key2);
                    builder.push_child(page2, page2_checksum);
                }
            } else {
                builder.push_child(
                    accessor.child_page(0).unwrap(),
                    accessor.child_checksum(0).unwrap(),
                );
            }
            for i in 1..accessor.count_children() {
                if let Some(key) = accessor.key(i - 1) {
                    builder.push_key(key);
                    if i == child_index {
                        // Replace the descended-into child, followed by its new
                        // sibling when it split.
                        builder.push_child(sub_result.new_root, sub_result.root_checksum);
                        if let Some((ref index_key2, page2, page2_checksum)) =
                            sub_result.additional_sibling
                        {
                            builder.push_key(index_key2);
                            builder.push_child(page2, page2_checksum);
                        }
                    } else {
                        builder.push_child(
                            accessor.child_page(i).unwrap(),
                            accessor.child_checksum(i).unwrap(),
                        );
                    }
                } else {
                    unreachable!();
                }
            }

            let result = if builder.should_split() {
                let (new_page1, split_key, new_page2) = builder.build_split()?;
                InsertionResult {
                    new_root: new_page1.get_page_number(),
                    root_checksum: DEFERRED,
                    additional_sibling: Some((
                        split_key.to_vec(),
                        new_page2.get_page_number(),
                        DEFERRED,
                    )),
                    inserted_value: sub_result.inserted_value,
                    old_value: sub_result.old_value,
                }
            } else {
                let new_page = builder.build()?;
                InsertionResult {
                    new_root: new_page.get_page_number(),
                    root_checksum: DEFERRED,
                    additional_sibling: None,
                    inserted_value: sub_result.inserted_value,
                    old_value: sub_result.old_value,
                }
            };
            // The original branch has been replaced by the rebuilt page(s).
            let page_number = page.get_page_number();
            drop(page);
            self.conditional_free(page_number);

            result
        }
        _ => unreachable!(),
    })
}
498
499 pub(crate) fn insert_inplace(
500 &mut self,
501 key: &K::SelfType<'_>,
502 value: &V::SelfType<'_>,
503 ) -> Result<()> {
504 assert!(self.modify_uncommitted);
505 let header = self.root.expect("Key not found (tree is empty)");
506 self.insert_inplace_helper(
507 self.mem.get_page_mut(header.root)?,
508 K::as_bytes(key).as_ref(),
509 V::as_bytes(value).as_ref(),
510 )?;
511 *self.root = Some(BtreeHeader::new(header.root, DEFERRED, header.length));
512 Ok(())
513 }
514
515 fn insert_inplace_helper(&mut self, mut page: PageMut, key: &[u8], value: &[u8]) -> Result<()> {
516 assert!(self.mem.uncommitted(page.get_page_number()));
517
518 let node_mem = page.memory();
519 match node_mem[0] {
520 LEAF => {
521 let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
522 let (position, found) = accessor.position::<K>(key);
523 assert!(found);
524 let old_len = accessor.entry(position).unwrap().value().len();
525 assert!(value.len() <= old_len);
526 let mut mutator = LeafMutator::new(&mut page, K::fixed_width(), V::fixed_width());
527 mutator.insert(position, true, key, value);
528 }
529 BRANCH => {
530 let accessor = BranchAccessor::new(&page, K::fixed_width());
531 let (child_index, child_page) = accessor.child_for_key::<K>(key);
532 self.insert_inplace_helper(self.mem.get_page_mut(child_page)?, key, value)?;
533 let mut mutator = BranchMutator::new(&mut page);
534 mutator.write_child_page(child_index, child_page, DEFERRED);
535 }
536 _ => unreachable!(),
537 }
538
539 Ok(())
540 }
541
/// Removes `key` from the leaf `page`.
///
/// `checksum` is the checksum the caller holds for `page`; it is returned
/// unchanged when the key is not present.
///
/// Returns what the parent should do with this leaf (`DeletionResult`) and a
/// guard over the removed value, or `None` when the key was not found.
fn delete_leaf_helper(
    &mut self,
    page: PageImpl,
    checksum: Checksum,
    key: &[u8],
) -> Result<(DeletionResult, Option<AccessGuard<'a, V>>)> {
    let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
    let (position, found) = accessor.position::<K>(key);
    if !found {
        // Nothing to delete: the leaf is reported back untouched.
        return Ok((Subtree(page.get_page_number(), checksum), None));
    }
    // Key/value bytes of the leaf with the deleted pair's bytes subtracted.
    let new_kv_bytes = accessor.length_of_pairs(0, accessor.num_pairs())
        - accessor.length_of_pairs(position, position + 1);
    // Space a leaf holding the remaining pairs would need.
    let new_required_bytes = RawLeafBuilder::required_bytes(
        accessor.num_pairs() - 1,
        new_kv_bytes,
        K::fixed_width(),
        V::fixed_width(),
    );
    let uncommitted = self.mem.uncommitted(page.get_page_number());

    // Fast path: the page may be edited in place, more than one pair is present
    // and the remainder still needs at least half a page. The mutable page is
    // handed to the returned guard.
    // NOTE(review): presumably the guard removes the pair when it is dropped, as
    // the constructor name suggests - confirm in `AccessGuard::remove_on_drop`.
    if uncommitted
        && self.modify_uncommitted
        && new_required_bytes >= self.mem.get_page_size() / 2
        && accessor.num_pairs() > 1
    {
        let (start, end) = accessor.value_range(position).unwrap();
        let page_number = page.get_page_number();
        // Give up the read handle before asking for the page mutably.
        drop(page);
        let page_mut = self.mem.get_page_mut(page_number)?;

        let guard = AccessGuard::remove_on_drop(
            page_mut,
            start,
            end - start,
            position,
            K::fixed_width(),
        );
        return Ok((Subtree(page_number, DEFERRED), Some(guard)));
    }

    let result = if accessor.num_pairs() == 1 {
        // The only pair is being removed.
        DeletedLeaf
    } else if new_required_bytes < self.mem.get_page_size() / 3 {
        // The remainder needs less than a third of a page: no new leaf is built
        // here; the old bytes and the deleted index are passed up instead.
        PartialLeaf {
            page: page.to_arc(),
            deleted_pair: position,
        }
    } else {
        // Build a replacement leaf containing every pair except the deleted one.
        let mut builder = LeafBuilder::new(
            &self.mem,
            accessor.num_pairs() - 1,
            K::fixed_width(),
            V::fixed_width(),
        );
        for i in 0..accessor.num_pairs() {
            if i == position {
                continue;
            }
            let entry = accessor.entry(i).unwrap();
            builder.push(entry.key(), entry.value());
        }
        let new_page = builder.build()?;
        Subtree(new_page.get_page_number(), DEFERRED)
    };
    // The returned guard reads the removed value out of the old page.
    let (start, end) = accessor.value_range(position).unwrap();
    let guard = if uncommitted && self.modify_uncommitted {
        // The page is released immediately; the guard reads from an `Arc` of the
        // page bytes obtained before the release.
        let page_number = page.get_page_number();
        let arc = page.to_arc();
        drop(page);
        self.mem.free(page_number);
        Some(AccessGuard::with_arc_page(arc, start..end))
    } else {
        // The page is only queued in `freed`; the guard holds the page itself.
        self.freed.push(page.get_page_number());
        Some(AccessGuard::with_page(page, start..end))
    };
    Ok((result, guard))
}
625
626 fn finalize_branch_builder(&self, builder: BranchBuilder<'_, '_>) -> Result<DeletionResult> {
627 let result = if let Some((only_child, checksum)) = builder.to_single_child() {
628 DeletedBranch(only_child, checksum)
629 } else {
630 let new_page = builder.build()?;
633 let accessor = BranchAccessor::new(&new_page, K::fixed_width());
634 if accessor.total_length() < self.mem.get_page_size() / 3 {
637 PartialBranch(new_page.get_page_number(), DEFERRED)
638 } else {
639 Subtree(new_page.get_page_number(), DEFERRED)
640 }
641 };
642 Ok(result)
643 }
644
/// Removes `key` from the subtree rooted at the branch `page`.
///
/// Recurses into the child selected for `key`, then either repoints that child
/// or, when the child asks to be merged (`DeletedLeaf`, `PartialLeaf`,
/// `DeletedBranch`, `PartialBranch`), rebuilds this branch with the child
/// removed or merged into a neighbouring sibling.
///
/// `checksum` is the checksum the caller holds for `page`; it is returned
/// unchanged when the key is not present.
///
/// Returns the `DeletionResult` for this branch and a guard over the removed
/// value, or `None` when the key was not found.
fn delete_branch_helper(
    &mut self,
    page: PageImpl,
    checksum: Checksum,
    key: &[u8],
) -> Result<(DeletionResult, Option<AccessGuard<'a, V>>)> {
    let accessor = BranchAccessor::new(&page, K::fixed_width());
    let original_page_number = page.get_page_number();
    let (child_index, child_page_number) = accessor.child_for_key::<K>(key);
    let child_checksum = accessor.child_checksum(child_index).unwrap();
    let (result, found) =
        self.delete_helper(self.mem.get_page(child_page_number)?, child_checksum, key)?;
    if found.is_none() {
        // Nothing was deleted below: this branch is reported back untouched.
        return Ok((Subtree(original_page_number, checksum), None));
    }
    // The child needs no merging: only its pointer in this branch changes.
    if let Subtree(new_child, new_child_checksum) = result {
        let result_page =
            if self.mem.uncommitted(original_page_number) && self.modify_uncommitted {
                // Edit the child pointer in place.
                drop(page);
                let mut mutpage = self.mem.get_page_mut(original_page_number)?;
                let mut mutator = BranchMutator::new(&mut mutpage);
                mutator.write_child_page(child_index, new_child, new_child_checksum);
                original_page_number
            } else {
                // Build a copy of this branch with the one child replaced.
                let mut builder =
                    BranchBuilder::new(&self.mem, accessor.count_children(), K::fixed_width());
                builder.push_all(&accessor);
                builder.replace_child(child_index, new_child, new_child_checksum);
                let new_page = builder.build()?;
                self.conditional_free(original_page_number);
                new_page.get_page_number()
            };
        return Ok((Subtree(result_page, DEFERRED), found));
    }

    // From here on the child has asked to be removed or merged, so this branch
    // is rebuilt.
    let mut builder =
        BranchBuilder::new(&self.mem, accessor.count_children(), K::fixed_width());

    let final_result = match result {
        Subtree(_, _) => {
            // Handled by the early return above.
            unreachable!();
        }
        DeletedLeaf => {
            // The child is gone: keep every other child...
            for i in 0..accessor.count_children() {
                if i == child_index {
                    continue;
                }
                builder.push_child(
                    accessor.child_page(i).unwrap(),
                    accessor.child_checksum(i).unwrap(),
                );
            }
            // ...and drop exactly one key: the final key when the last child was
            // removed, otherwise the key at `child_index`.
            let end = if child_index == accessor.count_children() - 1 {
                accessor.count_children() - 2
            } else {
                accessor.count_children() - 1
            };
            for i in 0..end {
                if i == child_index {
                    continue;
                }
                builder.push_key(accessor.key(i).unwrap());
            }
            self.finalize_branch_builder(builder)?
        }
        PartialLeaf {
            page: partial_child_page,
            deleted_pair,
        } => {
            let partial_child_accessor =
                LeafAccessor::new(&partial_child_page, K::fixed_width(), V::fixed_width());
            debug_assert!(partial_child_accessor.num_pairs() > 1);

            // Merge with the previous sibling, or with the next one when the child is first.
            let merge_with = if child_index == 0 { 1 } else { child_index - 1 };
            debug_assert!(merge_with < accessor.count_children());
            let merge_with_page = self
                .mem
                .get_page(accessor.child_page(merge_with).unwrap())?;
            let merge_with_accessor =
                LeafAccessor::new(merge_with_page.memory(), K::fixed_width(), V::fixed_width());

            // The sibling holds one pair whose total length is at least a page.
            let single_large_value = merge_with_accessor.num_pairs() == 1
                && merge_with_accessor.total_length() >= self.mem.get_page_size();
            if single_large_value {
                // Do not merge: just rebuild the child without the deleted pair and
                // swap it into a copy of this branch.
                let mut child_builder = LeafBuilder::new(
                    &self.mem,
                    partial_child_accessor.num_pairs() - 1,
                    K::fixed_width(),
                    V::fixed_width(),
                );
                child_builder.push_all_except(&partial_child_accessor, Some(deleted_pair));
                let new_page = child_builder.build()?;
                builder.push_all(&accessor);
                builder.replace_child(child_index, new_page.get_page_number(), DEFERRED);

                let result = self.finalize_branch_builder(builder)?;

                // This early return skips the shared tail of the function, so the
                // original branch is released here.
                drop(page);
                self.conditional_free(original_page_number);
                return Ok((result, found));
            }

            for i in 0..accessor.count_children() {
                if i == child_index {
                    continue;
                }
                let page_number = accessor.child_page(i).unwrap();
                let page_checksum = accessor.child_checksum(i).unwrap();
                if i == merge_with {
                    // Combine the child's remaining pairs with the sibling's,
                    // pushing whichever node comes first in this branch first.
                    let mut child_builder = LeafBuilder::new(
                        &self.mem,
                        partial_child_accessor.num_pairs() - 1
                            + merge_with_accessor.num_pairs(),
                        K::fixed_width(),
                        V::fixed_width(),
                    );
                    if child_index < merge_with {
                        child_builder
                            .push_all_except(&partial_child_accessor, Some(deleted_pair));
                    }
                    child_builder.push_all_except(&merge_with_accessor, None);
                    if child_index > merge_with {
                        child_builder
                            .push_all_except(&partial_child_accessor, Some(deleted_pair));
                    }
                    if child_builder.should_split() {
                        // The combined leaf is too large: emit two leaves and their
                        // separator key instead of one.
                        let (new_page1, split_key, new_page2) = child_builder.build_split()?;
                        builder.push_key(split_key);
                        builder.push_child(new_page1.get_page_number(), DEFERRED);
                        builder.push_child(new_page2.get_page_number(), DEFERRED);
                    } else {
                        let new_page = child_builder.build()?;
                        builder.push_child(new_page.get_page_number(), DEFERRED);
                    }

                    // Key that follows the later of the two merged children, if any.
                    let merged_key_index = max(child_index, merge_with);
                    if merged_key_index < accessor.count_children() - 1 {
                        builder.push_key(accessor.key(merged_key_index).unwrap());
                    }
                } else {
                    // Untouched sibling: copy it and the key that follows it.
                    builder.push_child(page_number, page_checksum);
                    if i < accessor.count_children() - 1 {
                        builder.push_key(accessor.key(i).unwrap());
                    }
                }
            }

            let result = self.finalize_branch_builder(builder)?;

            // The sibling's pairs now live in the merged leaf, so its page is released.
            let page_number = merge_with_page.get_page_number();
            drop(merge_with_page);
            self.conditional_free(page_number);
            result
        }
        DeletedBranch(only_grandchild, grandchild_checksum) => {
            // The child branch was reduced to one child; that grandchild is attached
            // to a neighbouring sibling branch.
            let merge_with = if child_index == 0 { 1 } else { child_index - 1 };
            let merge_with_page = self
                .mem
                .get_page(accessor.child_page(merge_with).unwrap())?;
            let merge_with_accessor = BranchAccessor::new(&merge_with_page, K::fixed_width());
            debug_assert!(merge_with < accessor.count_children());
            for i in 0..accessor.count_children() {
                if i == child_index {
                    continue;
                }
                let page_number = accessor.child_page(i).unwrap();
                let page_checksum = accessor.child_checksum(i).unwrap();
                if i == merge_with {
                    let mut child_builder = BranchBuilder::new(
                        &self.mem,
                        merge_with_accessor.count_children() + 1,
                        K::fixed_width(),
                    );
                    // Key of this branch lying between the child and the sibling.
                    let separator_key = accessor.key(min(child_index, merge_with)).unwrap();
                    if child_index < merge_with {
                        child_builder.push_child(only_grandchild, grandchild_checksum);
                        child_builder.push_key(separator_key);
                    }
                    child_builder.push_all(&merge_with_accessor);
                    if child_index > merge_with {
                        child_builder.push_key(separator_key);
                        child_builder.push_child(only_grandchild, grandchild_checksum);
                    }
                    if child_builder.should_split() {
                        let (new_page1, separator, new_page2) = child_builder.build_split()?;
                        builder.push_child(new_page1.get_page_number(), DEFERRED);
                        builder.push_key(separator);
                        builder.push_child(new_page2.get_page_number(), DEFERRED);
                    } else {
                        let new_page = child_builder.build()?;
                        builder.push_child(new_page.get_page_number(), DEFERRED);
                    }

                    // Key that follows the later of the two merged children, if any.
                    let merged_key_index = max(child_index, merge_with);
                    if merged_key_index < accessor.count_children() - 1 {
                        builder.push_key(accessor.key(merged_key_index).unwrap());
                    }
                } else {
                    builder.push_child(page_number, page_checksum);
                    if i < accessor.count_children() - 1 {
                        builder.push_key(accessor.key(i).unwrap());
                    }
                }
            }
            let result = self.finalize_branch_builder(builder)?;

            // The sibling branch was rebuilt with the extra child; release the old page.
            let page_number = merge_with_page.get_page_number();
            drop(merge_with_page);
            self.conditional_free(page_number);

            result
        }
        PartialBranch(partial_child, ..) => {
            // The child branch is under-full: combine its children with a
            // neighbouring sibling branch.
            let partial_child_page = self.mem.get_page(partial_child)?;
            let partial_child_accessor =
                BranchAccessor::new(&partial_child_page, K::fixed_width());
            let merge_with = if child_index == 0 { 1 } else { child_index - 1 };
            let merge_with_page = self
                .mem
                .get_page(accessor.child_page(merge_with).unwrap())?;
            let merge_with_accessor = BranchAccessor::new(&merge_with_page, K::fixed_width());
            debug_assert!(merge_with < accessor.count_children());
            for i in 0..accessor.count_children() {
                if i == child_index {
                    continue;
                }
                let page_number = accessor.child_page(i).unwrap();
                let page_checksum = accessor.child_checksum(i).unwrap();
                if i == merge_with {
                    let mut child_builder = BranchBuilder::new(
                        &self.mem,
                        merge_with_accessor.count_children()
                            + partial_child_accessor.count_children(),
                        K::fixed_width(),
                    );
                    // Key of this branch lying between the child and the sibling.
                    let separator_key = accessor.key(min(child_index, merge_with)).unwrap();
                    if child_index < merge_with {
                        child_builder.push_all(&partial_child_accessor);
                        child_builder.push_key(separator_key);
                    }
                    child_builder.push_all(&merge_with_accessor);
                    if child_index > merge_with {
                        child_builder.push_key(separator_key);
                        child_builder.push_all(&partial_child_accessor);
                    }
                    if child_builder.should_split() {
                        let (new_page1, separator, new_page2) = child_builder.build_split()?;
                        builder.push_child(new_page1.get_page_number(), DEFERRED);
                        builder.push_key(separator);
                        builder.push_child(new_page2.get_page_number(), DEFERRED);
                    } else {
                        let new_page = child_builder.build()?;
                        builder.push_child(new_page.get_page_number(), DEFERRED);
                    }

                    // Key that follows the later of the two merged children, if any.
                    let merged_key_index = max(child_index, merge_with);
                    if merged_key_index < accessor.count_children() - 1 {
                        builder.push_key(accessor.key(merged_key_index).unwrap());
                    }
                } else {
                    builder.push_child(page_number, page_checksum);
                    if i < accessor.count_children() - 1 {
                        builder.push_key(accessor.key(i).unwrap());
                    }
                }
            }
            let result = self.finalize_branch_builder(builder)?;

            // Both source branches were copied into the merged page(s); release both.
            let page_number = merge_with_page.get_page_number();
            drop(merge_with_page);
            self.conditional_free(page_number);
            drop(partial_child_page);
            self.conditional_free(partial_child);

            result
        }
    };

    // Shared tail: the original branch has been replaced by the rebuilt one.
    drop(page);
    self.conditional_free(original_page_number);

    Ok((final_result, found))
}
938
939 fn delete_helper(
942 &mut self,
943 page: PageImpl,
944 checksum: Checksum,
945 key: &[u8],
946 ) -> Result<(DeletionResult, Option<AccessGuard<'a, V>>)> {
947 let node_mem = page.memory();
948 match node_mem[0] {
949 LEAF => self.delete_leaf_helper(page, checksum, key),
950 BRANCH => self.delete_branch_helper(page, checksum, key),
951 _ => unreachable!(),
952 }
953 }
954}