redb/tree_store/
btree_mutator.rs

1use crate::tree_store::btree_base::{
2    BranchAccessor, BranchBuilder, BranchMutator, Checksum, LeafAccessor, LeafBuilder, LeafMutator,
3    BRANCH, DEFERRED, LEAF,
4};
5use crate::tree_store::btree_mutator::DeletionResult::{
6    DeletedBranch, DeletedLeaf, PartialBranch, PartialLeaf, Subtree,
7};
8use crate::tree_store::page_store::{Page, PageImpl, PageMut};
9use crate::tree_store::{
10    AccessGuardMut, BtreeHeader, PageNumber, RawLeafBuilder, TransactionalMemory,
11};
12use crate::types::{Key, Value};
13use crate::{AccessGuard, Result};
14use std::cmp::{max, min};
15use std::marker::PhantomData;
16use std::sync::Arc;
17
18// TODO: it seems like Checksum can be removed from most/all of these, now that we're using deferred checksums
19#[derive(Debug)]
20enum DeletionResult {
21    // A proper subtree
22    Subtree(PageNumber, Checksum),
23    // A leaf with zero children
24    DeletedLeaf,
25    // A leaf with fewer entries than desired
26    PartialLeaf {
27        page: Arc<[u8]>,
28        deleted_pair: usize,
29    },
30    // A branch page subtree with fewer children than desired
31    PartialBranch(PageNumber, Checksum),
32    // Indicates that the branch node was deleted, and includes the only remaining child
33    DeletedBranch(PageNumber, Checksum),
34}
35
36struct InsertionResult<'a, V: Value + 'static> {
37    // the new root page
38    new_root: PageNumber,
39    // checksum of the root page
40    root_checksum: Checksum,
41    // Following sibling, if the root had to be split
42    additional_sibling: Option<(Vec<u8>, PageNumber, Checksum)>,
43    // The inserted value for .insert_reserve() to use
44    inserted_value: AccessGuardMut<'a, V>,
45    // The previous value, if any
46    old_value: Option<AccessGuard<'a, V>>,
47}
48
49pub(crate) struct MutateHelper<'a, 'b, K: Key, V: Value> {
50    root: &'b mut Option<BtreeHeader>,
51    modify_uncommitted: bool,
52    mem: Arc<TransactionalMemory>,
53    freed: &'b mut Vec<PageNumber>,
54    _key_type: PhantomData<K>,
55    _value_type: PhantomData<V>,
56    _lifetime: PhantomData<&'a ()>,
57}
58
59impl<'a, 'b, K: Key, V: Value> MutateHelper<'a, 'b, K, V> {
60    pub(crate) fn new(
61        root: &'b mut Option<BtreeHeader>,
62        mem: Arc<TransactionalMemory>,
63        freed: &'b mut Vec<PageNumber>,
64    ) -> Self {
65        Self {
66            root,
67            modify_uncommitted: true,
68            mem,
69            freed,
70            _key_type: Default::default(),
71            _value_type: Default::default(),
72            _lifetime: Default::default(),
73        }
74    }
75
76    // Creates a new mutator which will not modify any existing uncommitted pages, or free any existing pages.
77    // It will still queue pages for future freeing in the freed vec
78    pub(crate) fn new_do_not_modify(
79        root: &'b mut Option<BtreeHeader>,
80        mem: Arc<TransactionalMemory>,
81        freed: &'b mut Vec<PageNumber>,
82    ) -> Self {
83        Self {
84            root,
85            modify_uncommitted: false,
86            mem,
87            freed,
88            _key_type: Default::default(),
89            _value_type: Default::default(),
90            _lifetime: Default::default(),
91        }
92    }
93
94    fn conditional_free(&mut self, page_number: PageNumber) {
95        if self.modify_uncommitted {
96            if !self.mem.free_if_uncommitted(page_number) {
97                self.freed.push(page_number);
98            }
99        } else {
100            self.freed.push(page_number);
101        }
102    }
103
104    pub(crate) fn delete(&mut self, key: &K::SelfType<'_>) -> Result<Option<AccessGuard<'a, V>>> {
105        if let Some(BtreeHeader {
106            root: p,
107            checksum,
108            length,
109        }) = *self.root
110        {
111            let (deletion_result, found) =
112                self.delete_helper(self.mem.get_page(p)?, checksum, K::as_bytes(key).as_ref())?;
113            let new_length = if found.is_some() { length - 1 } else { length };
114            let new_root = match deletion_result {
115                Subtree(page, checksum) => Some(BtreeHeader::new(page, checksum, new_length)),
116                DeletedLeaf => None,
117                PartialLeaf { page, deleted_pair } => {
118                    let accessor = LeafAccessor::new(&page, K::fixed_width(), V::fixed_width());
119                    let mut builder = LeafBuilder::new(
120                        &self.mem,
121                        accessor.num_pairs() - 1,
122                        K::fixed_width(),
123                        V::fixed_width(),
124                    );
125                    builder.push_all_except(&accessor, Some(deleted_pair));
126                    let page = builder.build()?;
127                    assert_eq!(new_length, accessor.num_pairs() as u64 - 1);
128                    Some(BtreeHeader::new(
129                        page.get_page_number(),
130                        DEFERRED,
131                        new_length,
132                    ))
133                }
134                PartialBranch(page_number, checksum) => {
135                    Some(BtreeHeader::new(page_number, checksum, new_length))
136                }
137                DeletedBranch(remaining_child, checksum) => {
138                    Some(BtreeHeader::new(remaining_child, checksum, new_length))
139                }
140            };
141            *self.root = new_root;
142            Ok(found)
143        } else {
144            Ok(None)
145        }
146    }
147
148    #[allow(clippy::type_complexity)]
149    pub(crate) fn insert(
150        &mut self,
151        key: &K::SelfType<'_>,
152        value: &V::SelfType<'_>,
153    ) -> Result<(Option<AccessGuard<'a, V>>, AccessGuardMut<'a, V>)> {
154        let (new_root, old_value, guard) = if let Some(BtreeHeader {
155            root: p,
156            checksum,
157            length,
158        }) = *self.root
159        {
160            let result = self.insert_helper(
161                self.mem.get_page(p)?,
162                checksum,
163                K::as_bytes(key).as_ref(),
164                V::as_bytes(value).as_ref(),
165            )?;
166
167            let new_length = if result.old_value.is_some() {
168                length
169            } else {
170                length + 1
171            };
172
173            let new_root = if let Some((key, page2, page2_checksum)) = result.additional_sibling {
174                let mut builder = BranchBuilder::new(&self.mem, 2, K::fixed_width());
175                builder.push_child(result.new_root, result.root_checksum);
176                builder.push_key(&key);
177                builder.push_child(page2, page2_checksum);
178                let new_page = builder.build()?;
179                BtreeHeader::new(new_page.get_page_number(), DEFERRED, new_length)
180            } else {
181                BtreeHeader::new(result.new_root, result.root_checksum, new_length)
182            };
183            (new_root, result.old_value, result.inserted_value)
184        } else {
185            let key_bytes = K::as_bytes(key);
186            let value_bytes = V::as_bytes(value);
187            let key_bytes = key_bytes.as_ref();
188            let value_bytes = value_bytes.as_ref();
189            let mut builder = LeafBuilder::new(&self.mem, 1, K::fixed_width(), V::fixed_width());
190            builder.push(key_bytes, value_bytes);
191            let page = builder.build()?;
192
193            let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
194            let offset = accessor.offset_of_first_value();
195            let page_num = page.get_page_number();
196            let guard = AccessGuardMut::new(page, offset, value_bytes.len());
197
198            (BtreeHeader::new(page_num, DEFERRED, 1), None, guard)
199        };
200        *self.root = Some(new_root);
201        Ok((old_value, guard))
202    }
203
204    fn insert_helper(
205        &mut self,
206        page: PageImpl,
207        page_checksum: Checksum,
208        key: &[u8],
209        value: &[u8],
210    ) -> Result<InsertionResult<'a, V>> {
211        let node_mem = page.memory();
212        Ok(match node_mem[0] {
213            LEAF => {
214                let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
215                let (position, found) = accessor.position::<K>(key);
216
217                // Fast-path to avoid re-building and splitting pages with a single large value
218                let single_large_value = accessor.num_pairs() == 1
219                    && accessor.total_length() >= self.mem.get_page_size();
220                if !found && single_large_value {
221                    let mut builder =
222                        LeafBuilder::new(&self.mem, 1, K::fixed_width(), V::fixed_width());
223                    builder.push(key, value);
224                    let new_page = builder.build()?;
225                    let new_page_number = new_page.get_page_number();
226                    let new_page_accessor =
227                        LeafAccessor::new(new_page.memory(), K::fixed_width(), V::fixed_width());
228                    let offset = new_page_accessor.offset_of_first_value();
229                    let guard = AccessGuardMut::new(new_page, offset, value.len());
230                    return if position == 0 {
231                        Ok(InsertionResult {
232                            new_root: new_page_number,
233                            root_checksum: DEFERRED,
234                            additional_sibling: Some((
235                                key.to_vec(),
236                                page.get_page_number(),
237                                page_checksum,
238                            )),
239                            inserted_value: guard,
240                            old_value: None,
241                        })
242                    } else {
243                        let split_key = accessor.last_entry().key().to_vec();
244                        Ok(InsertionResult {
245                            new_root: page.get_page_number(),
246                            root_checksum: page_checksum,
247                            additional_sibling: Some((split_key, new_page_number, DEFERRED)),
248                            inserted_value: guard,
249                            old_value: None,
250                        })
251                    };
252                }
253
254                // Fast-path for uncommitted pages, that can be modified in-place
255                if self.mem.uncommitted(page.get_page_number())
256                    && self.modify_uncommitted
257                    && LeafMutator::sufficient_insert_inplace_space(
258                        &page,
259                        position,
260                        found,
261                        K::fixed_width(),
262                        V::fixed_width(),
263                        key,
264                        value,
265                    )
266                {
267                    let page_number = page.get_page_number();
268                    let existing_value = if found {
269                        let copied_value = accessor.entry(position).unwrap().value().to_vec();
270                        Some(AccessGuard::with_owned_value(copied_value))
271                    } else {
272                        None
273                    };
274                    drop(page);
275                    let mut page_mut = self.mem.get_page_mut(page_number)?;
276                    let mut mutator =
277                        LeafMutator::new(&mut page_mut, K::fixed_width(), V::fixed_width());
278                    mutator.insert(position, found, key, value);
279                    let new_page_accessor =
280                        LeafAccessor::new(page_mut.memory(), K::fixed_width(), V::fixed_width());
281                    let offset = new_page_accessor.offset_of_value(position).unwrap();
282                    let guard = AccessGuardMut::new(page_mut, offset, value.len());
283                    return Ok(InsertionResult {
284                        new_root: page_number,
285                        root_checksum: DEFERRED,
286                        additional_sibling: None,
287                        inserted_value: guard,
288                        old_value: existing_value,
289                    });
290                }
291
292                let mut builder = LeafBuilder::new(
293                    &self.mem,
294                    accessor.num_pairs() + 1,
295                    K::fixed_width(),
296                    V::fixed_width(),
297                );
298                for i in 0..accessor.num_pairs() {
299                    if i == position {
300                        builder.push(key, value);
301                    }
302                    if !found || i != position {
303                        let entry = accessor.entry(i).unwrap();
304                        builder.push(entry.key(), entry.value());
305                    }
306                }
307                if accessor.num_pairs() == position {
308                    builder.push(key, value);
309                }
310                if !builder.should_split() {
311                    let new_page = builder.build()?;
312
313                    let page_number = page.get_page_number();
314                    let existing_value = if found {
315                        let (start, end) = accessor.value_range(position).unwrap();
316                        if self.modify_uncommitted && self.mem.uncommitted(page_number) {
317                            let arc = page.to_arc();
318                            drop(page);
319                            self.mem.free(page_number);
320                            Some(AccessGuard::with_arc_page(arc, start..end))
321                        } else {
322                            self.freed.push(page_number);
323                            Some(AccessGuard::with_page(page, start..end))
324                        }
325                    } else {
326                        drop(page);
327                        self.conditional_free(page_number);
328                        None
329                    };
330
331                    let new_page_number = new_page.get_page_number();
332                    let accessor =
333                        LeafAccessor::new(new_page.memory(), K::fixed_width(), V::fixed_width());
334                    let offset = accessor.offset_of_value(position).unwrap();
335                    let guard = AccessGuardMut::new(new_page, offset, value.len());
336
337                    InsertionResult {
338                        new_root: new_page_number,
339                        root_checksum: DEFERRED,
340                        additional_sibling: None,
341                        inserted_value: guard,
342                        old_value: existing_value,
343                    }
344                } else {
345                    let (new_page1, split_key, new_page2) = builder.build_split()?;
346                    let split_key = split_key.to_vec();
347                    let page_number = page.get_page_number();
348                    let existing_value = if found {
349                        let (start, end) = accessor.value_range(position).unwrap();
350                        if self.modify_uncommitted && self.mem.uncommitted(page_number) {
351                            let arc = page.to_arc();
352                            drop(page);
353                            self.mem.free(page_number);
354                            Some(AccessGuard::with_arc_page(arc, start..end))
355                        } else {
356                            self.freed.push(page_number);
357                            Some(AccessGuard::with_page(page, start..end))
358                        }
359                    } else {
360                        drop(page);
361                        self.conditional_free(page_number);
362                        None
363                    };
364
365                    let new_page_number = new_page1.get_page_number();
366                    let new_page_number2 = new_page2.get_page_number();
367                    let accessor =
368                        LeafAccessor::new(new_page1.memory(), K::fixed_width(), V::fixed_width());
369                    let division = accessor.num_pairs();
370                    let guard = if position < division {
371                        let accessor = LeafAccessor::new(
372                            new_page1.memory(),
373                            K::fixed_width(),
374                            V::fixed_width(),
375                        );
376                        let offset = accessor.offset_of_value(position).unwrap();
377                        AccessGuardMut::new(new_page1, offset, value.len())
378                    } else {
379                        let accessor = LeafAccessor::new(
380                            new_page2.memory(),
381                            K::fixed_width(),
382                            V::fixed_width(),
383                        );
384                        let offset = accessor.offset_of_value(position - division).unwrap();
385                        AccessGuardMut::new(new_page2, offset, value.len())
386                    };
387
388                    InsertionResult {
389                        new_root: new_page_number,
390                        root_checksum: DEFERRED,
391                        additional_sibling: Some((split_key, new_page_number2, DEFERRED)),
392                        inserted_value: guard,
393                        old_value: existing_value,
394                    }
395                }
396            }
397            BRANCH => {
398                let accessor = BranchAccessor::new(&page, K::fixed_width());
399                let (child_index, child_page) = accessor.child_for_key::<K>(key);
400                let child_checksum = accessor.child_checksum(child_index).unwrap();
401                let sub_result =
402                    self.insert_helper(self.mem.get_page(child_page)?, child_checksum, key, value)?;
403
404                if sub_result.additional_sibling.is_none()
405                    && self.modify_uncommitted
406                    && self.mem.uncommitted(page.get_page_number())
407                {
408                    let page_number = page.get_page_number();
409                    drop(page);
410                    let mut mutpage = self.mem.get_page_mut(page_number)?;
411                    let mut mutator = BranchMutator::new(&mut mutpage);
412                    mutator.write_child_page(
413                        child_index,
414                        sub_result.new_root,
415                        sub_result.root_checksum,
416                    );
417                    return Ok(InsertionResult {
418                        new_root: mutpage.get_page_number(),
419                        root_checksum: DEFERRED,
420                        additional_sibling: None,
421                        inserted_value: sub_result.inserted_value,
422                        old_value: sub_result.old_value,
423                    });
424                }
425
426                // A child was added, or we couldn't use the fast-path above
427                let mut builder =
428                    BranchBuilder::new(&self.mem, accessor.count_children() + 1, K::fixed_width());
429                if child_index == 0 {
430                    builder.push_child(sub_result.new_root, sub_result.root_checksum);
431                    if let Some((ref index_key2, page2, page2_checksum)) =
432                        sub_result.additional_sibling
433                    {
434                        builder.push_key(index_key2);
435                        builder.push_child(page2, page2_checksum);
436                    }
437                } else {
438                    builder.push_child(
439                        accessor.child_page(0).unwrap(),
440                        accessor.child_checksum(0).unwrap(),
441                    );
442                }
443                for i in 1..accessor.count_children() {
444                    if let Some(key) = accessor.key(i - 1) {
445                        builder.push_key(key);
446                        if i == child_index {
447                            builder.push_child(sub_result.new_root, sub_result.root_checksum);
448                            if let Some((ref index_key2, page2, page2_checksum)) =
449                                sub_result.additional_sibling
450                            {
451                                builder.push_key(index_key2);
452                                builder.push_child(page2, page2_checksum);
453                            }
454                        } else {
455                            builder.push_child(
456                                accessor.child_page(i).unwrap(),
457                                accessor.child_checksum(i).unwrap(),
458                            );
459                        }
460                    } else {
461                        unreachable!();
462                    }
463                }
464
465                let result = if builder.should_split() {
466                    let (new_page1, split_key, new_page2) = builder.build_split()?;
467                    InsertionResult {
468                        new_root: new_page1.get_page_number(),
469                        root_checksum: DEFERRED,
470                        additional_sibling: Some((
471                            split_key.to_vec(),
472                            new_page2.get_page_number(),
473                            DEFERRED,
474                        )),
475                        inserted_value: sub_result.inserted_value,
476                        old_value: sub_result.old_value,
477                    }
478                } else {
479                    let new_page = builder.build()?;
480                    InsertionResult {
481                        new_root: new_page.get_page_number(),
482                        root_checksum: DEFERRED,
483                        additional_sibling: None,
484                        inserted_value: sub_result.inserted_value,
485                        old_value: sub_result.old_value,
486                    }
487                };
488                // Free the original page, since we've replaced it
489                let page_number = page.get_page_number();
490                drop(page);
491                self.conditional_free(page_number);
492
493                result
494            }
495            _ => unreachable!(),
496        })
497    }
498
499    pub(crate) fn insert_inplace(
500        &mut self,
501        key: &K::SelfType<'_>,
502        value: &V::SelfType<'_>,
503    ) -> Result<()> {
504        assert!(self.modify_uncommitted);
505        let header = self.root.expect("Key not found (tree is empty)");
506        self.insert_inplace_helper(
507            self.mem.get_page_mut(header.root)?,
508            K::as_bytes(key).as_ref(),
509            V::as_bytes(value).as_ref(),
510        )?;
511        *self.root = Some(BtreeHeader::new(header.root, DEFERRED, header.length));
512        Ok(())
513    }
514
515    fn insert_inplace_helper(&mut self, mut page: PageMut, key: &[u8], value: &[u8]) -> Result<()> {
516        assert!(self.mem.uncommitted(page.get_page_number()));
517
518        let node_mem = page.memory();
519        match node_mem[0] {
520            LEAF => {
521                let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
522                let (position, found) = accessor.position::<K>(key);
523                assert!(found);
524                let old_len = accessor.entry(position).unwrap().value().len();
525                assert!(value.len() <= old_len);
526                let mut mutator = LeafMutator::new(&mut page, K::fixed_width(), V::fixed_width());
527                mutator.insert(position, true, key, value);
528            }
529            BRANCH => {
530                let accessor = BranchAccessor::new(&page, K::fixed_width());
531                let (child_index, child_page) = accessor.child_for_key::<K>(key);
532                self.insert_inplace_helper(self.mem.get_page_mut(child_page)?, key, value)?;
533                let mut mutator = BranchMutator::new(&mut page);
534                mutator.write_child_page(child_index, child_page, DEFERRED);
535            }
536            _ => unreachable!(),
537        }
538
539        Ok(())
540    }
541
542    fn delete_leaf_helper(
543        &mut self,
544        page: PageImpl,
545        checksum: Checksum,
546        key: &[u8],
547    ) -> Result<(DeletionResult, Option<AccessGuard<'a, V>>)> {
548        let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
549        let (position, found) = accessor.position::<K>(key);
550        if !found {
551            return Ok((Subtree(page.get_page_number(), checksum), None));
552        }
553        let new_kv_bytes = accessor.length_of_pairs(0, accessor.num_pairs())
554            - accessor.length_of_pairs(position, position + 1);
555        let new_required_bytes = RawLeafBuilder::required_bytes(
556            accessor.num_pairs() - 1,
557            new_kv_bytes,
558            K::fixed_width(),
559            V::fixed_width(),
560        );
561        let uncommitted = self.mem.uncommitted(page.get_page_number());
562
563        // Fast-path for dirty pages
564        if uncommitted
565            && self.modify_uncommitted
566            && new_required_bytes >= self.mem.get_page_size() / 2
567            && accessor.num_pairs() > 1
568        {
569            let (start, end) = accessor.value_range(position).unwrap();
570            let page_number = page.get_page_number();
571            drop(page);
572            let page_mut = self.mem.get_page_mut(page_number)?;
573
574            let guard = AccessGuard::remove_on_drop(
575                page_mut,
576                start,
577                end - start,
578                position,
579                K::fixed_width(),
580            );
581            return Ok((Subtree(page_number, DEFERRED), Some(guard)));
582        }
583
584        let result = if accessor.num_pairs() == 1 {
585            DeletedLeaf
586        } else if new_required_bytes < self.mem.get_page_size() / 3 {
587            // Merge when less than 33% full. Splits occur when a page is full and produce two 50%
588            // full pages, so we use 33% instead of 50% to avoid oscillating
589            PartialLeaf {
590                page: page.to_arc(),
591                deleted_pair: position,
592            }
593        } else {
594            let mut builder = LeafBuilder::new(
595                &self.mem,
596                accessor.num_pairs() - 1,
597                K::fixed_width(),
598                V::fixed_width(),
599            );
600            for i in 0..accessor.num_pairs() {
601                if i == position {
602                    continue;
603                }
604                let entry = accessor.entry(i).unwrap();
605                builder.push(entry.key(), entry.value());
606            }
607            let new_page = builder.build()?;
608            Subtree(new_page.get_page_number(), DEFERRED)
609        };
610        let (start, end) = accessor.value_range(position).unwrap();
611        let guard = if uncommitted && self.modify_uncommitted {
612            let page_number = page.get_page_number();
613            let arc = page.to_arc();
614            drop(page);
615            self.mem.free(page_number);
616            Some(AccessGuard::with_arc_page(arc, start..end))
617        } else {
618            // Won't be freed until the end of the transaction, so returning the page
619            // in the AccessGuard below is still safe
620            self.freed.push(page.get_page_number());
621            Some(AccessGuard::with_page(page, start..end))
622        };
623        Ok((result, guard))
624    }
625
626    fn finalize_branch_builder(&self, builder: BranchBuilder<'_, '_>) -> Result<DeletionResult> {
627        let result = if let Some((only_child, checksum)) = builder.to_single_child() {
628            DeletedBranch(only_child, checksum)
629        } else {
630            // TODO: can we optimize away this page allocation?
631            // The PartialInternal gets returned, and then the caller has to merge it immediately
632            let new_page = builder.build()?;
633            let accessor = BranchAccessor::new(&new_page, K::fixed_width());
634            // Merge when less than 33% full. Splits occur when a page is full and produce two 50%
635            // full pages, so we use 33% instead of 50% to avoid oscillating
636            if accessor.total_length() < self.mem.get_page_size() / 3 {
637                PartialBranch(new_page.get_page_number(), DEFERRED)
638            } else {
639                Subtree(new_page.get_page_number(), DEFERRED)
640            }
641        };
642        Ok(result)
643    }
644
645    fn delete_branch_helper(
646        &mut self,
647        page: PageImpl,
648        checksum: Checksum,
649        key: &[u8],
650    ) -> Result<(DeletionResult, Option<AccessGuard<'a, V>>)> {
651        let accessor = BranchAccessor::new(&page, K::fixed_width());
652        let original_page_number = page.get_page_number();
653        let (child_index, child_page_number) = accessor.child_for_key::<K>(key);
654        let child_checksum = accessor.child_checksum(child_index).unwrap();
655        let (result, found) =
656            self.delete_helper(self.mem.get_page(child_page_number)?, child_checksum, key)?;
657        if found.is_none() {
658            return Ok((Subtree(original_page_number, checksum), None));
659        }
660        if let Subtree(new_child, new_child_checksum) = result {
661            let result_page =
662                if self.mem.uncommitted(original_page_number) && self.modify_uncommitted {
663                    drop(page);
664                    let mut mutpage = self.mem.get_page_mut(original_page_number)?;
665                    let mut mutator = BranchMutator::new(&mut mutpage);
666                    mutator.write_child_page(child_index, new_child, new_child_checksum);
667                    original_page_number
668                } else {
669                    let mut builder =
670                        BranchBuilder::new(&self.mem, accessor.count_children(), K::fixed_width());
671                    builder.push_all(&accessor);
672                    builder.replace_child(child_index, new_child, new_child_checksum);
673                    let new_page = builder.build()?;
674                    self.conditional_free(original_page_number);
675                    new_page.get_page_number()
676                };
677            return Ok((Subtree(result_page, DEFERRED), found));
678        }
679
680        // Child is requesting to be merged with a sibling
681        let mut builder =
682            BranchBuilder::new(&self.mem, accessor.count_children(), K::fixed_width());
683
684        let final_result = match result {
685            Subtree(_, _) => {
686                // Handled in the if above
687                unreachable!();
688            }
689            DeletedLeaf => {
690                for i in 0..accessor.count_children() {
691                    if i == child_index {
692                        continue;
693                    }
694                    builder.push_child(
695                        accessor.child_page(i).unwrap(),
696                        accessor.child_checksum(i).unwrap(),
697                    );
698                }
699                let end = if child_index == accessor.count_children() - 1 {
700                    // Skip the last key, which precedes the child
701                    accessor.count_children() - 2
702                } else {
703                    accessor.count_children() - 1
704                };
705                for i in 0..end {
706                    if i == child_index {
707                        continue;
708                    }
709                    builder.push_key(accessor.key(i).unwrap());
710                }
711                self.finalize_branch_builder(builder)?
712            }
713            PartialLeaf {
714                page: partial_child_page,
715                deleted_pair,
716            } => {
717                let partial_child_accessor =
718                    LeafAccessor::new(&partial_child_page, K::fixed_width(), V::fixed_width());
719                debug_assert!(partial_child_accessor.num_pairs() > 1);
720
721                let merge_with = if child_index == 0 { 1 } else { child_index - 1 };
722                debug_assert!(merge_with < accessor.count_children());
723                let merge_with_page = self
724                    .mem
725                    .get_page(accessor.child_page(merge_with).unwrap())?;
726                let merge_with_accessor =
727                    LeafAccessor::new(merge_with_page.memory(), K::fixed_width(), V::fixed_width());
728
729                let single_large_value = merge_with_accessor.num_pairs() == 1
730                    && merge_with_accessor.total_length() >= self.mem.get_page_size();
731                // Don't try to merge or rebalance, if the sibling contains a single large value
732                if single_large_value {
733                    let mut child_builder = LeafBuilder::new(
734                        &self.mem,
735                        partial_child_accessor.num_pairs() - 1,
736                        K::fixed_width(),
737                        V::fixed_width(),
738                    );
739                    child_builder.push_all_except(&partial_child_accessor, Some(deleted_pair));
740                    let new_page = child_builder.build()?;
741                    builder.push_all(&accessor);
742                    builder.replace_child(child_index, new_page.get_page_number(), DEFERRED);
743
744                    let result = self.finalize_branch_builder(builder)?;
745
746                    drop(page);
747                    self.conditional_free(original_page_number);
748                    // child_page_number does not need to be freed, because it's a leaf and the
749                    // MutAccessGuard will free it
750
751                    return Ok((result, found));
752                }
753
754                for i in 0..accessor.count_children() {
755                    if i == child_index {
756                        continue;
757                    }
758                    let page_number = accessor.child_page(i).unwrap();
759                    let page_checksum = accessor.child_checksum(i).unwrap();
760                    if i == merge_with {
761                        let mut child_builder = LeafBuilder::new(
762                            &self.mem,
763                            partial_child_accessor.num_pairs() - 1
764                                + merge_with_accessor.num_pairs(),
765                            K::fixed_width(),
766                            V::fixed_width(),
767                        );
768                        if child_index < merge_with {
769                            child_builder
770                                .push_all_except(&partial_child_accessor, Some(deleted_pair));
771                        }
772                        child_builder.push_all_except(&merge_with_accessor, None);
773                        if child_index > merge_with {
774                            child_builder
775                                .push_all_except(&partial_child_accessor, Some(deleted_pair));
776                        }
777                        if child_builder.should_split() {
778                            let (new_page1, split_key, new_page2) = child_builder.build_split()?;
779                            builder.push_key(split_key);
780                            builder.push_child(new_page1.get_page_number(), DEFERRED);
781                            builder.push_child(new_page2.get_page_number(), DEFERRED);
782                        } else {
783                            let new_page = child_builder.build()?;
784                            builder.push_child(new_page.get_page_number(), DEFERRED);
785                        }
786
787                        let merged_key_index = max(child_index, merge_with);
788                        if merged_key_index < accessor.count_children() - 1 {
789                            builder.push_key(accessor.key(merged_key_index).unwrap());
790                        }
791                    } else {
792                        builder.push_child(page_number, page_checksum);
793                        if i < accessor.count_children() - 1 {
794                            builder.push_key(accessor.key(i).unwrap());
795                        }
796                    }
797                }
798
799                let result = self.finalize_branch_builder(builder)?;
800
801                let page_number = merge_with_page.get_page_number();
802                drop(merge_with_page);
803                self.conditional_free(page_number);
804                // child_page_number does not need to be freed, because it's a leaf and the
805                // MutAccessGuard will free it
806
807                result
808            }
809            DeletedBranch(only_grandchild, grandchild_checksum) => {
810                let merge_with = if child_index == 0 { 1 } else { child_index - 1 };
811                let merge_with_page = self
812                    .mem
813                    .get_page(accessor.child_page(merge_with).unwrap())?;
814                let merge_with_accessor = BranchAccessor::new(&merge_with_page, K::fixed_width());
815                debug_assert!(merge_with < accessor.count_children());
816                for i in 0..accessor.count_children() {
817                    if i == child_index {
818                        continue;
819                    }
820                    let page_number = accessor.child_page(i).unwrap();
821                    let page_checksum = accessor.child_checksum(i).unwrap();
822                    if i == merge_with {
823                        let mut child_builder = BranchBuilder::new(
824                            &self.mem,
825                            merge_with_accessor.count_children() + 1,
826                            K::fixed_width(),
827                        );
828                        let separator_key = accessor.key(min(child_index, merge_with)).unwrap();
829                        if child_index < merge_with {
830                            child_builder.push_child(only_grandchild, grandchild_checksum);
831                            child_builder.push_key(separator_key);
832                        }
833                        child_builder.push_all(&merge_with_accessor);
834                        if child_index > merge_with {
835                            child_builder.push_key(separator_key);
836                            child_builder.push_child(only_grandchild, grandchild_checksum);
837                        }
838                        if child_builder.should_split() {
839                            let (new_page1, separator, new_page2) = child_builder.build_split()?;
840                            builder.push_child(new_page1.get_page_number(), DEFERRED);
841                            builder.push_key(separator);
842                            builder.push_child(new_page2.get_page_number(), DEFERRED);
843                        } else {
844                            let new_page = child_builder.build()?;
845                            builder.push_child(new_page.get_page_number(), DEFERRED);
846                        }
847
848                        let merged_key_index = max(child_index, merge_with);
849                        if merged_key_index < accessor.count_children() - 1 {
850                            builder.push_key(accessor.key(merged_key_index).unwrap());
851                        }
852                    } else {
853                        builder.push_child(page_number, page_checksum);
854                        if i < accessor.count_children() - 1 {
855                            builder.push_key(accessor.key(i).unwrap());
856                        }
857                    }
858                }
859                let result = self.finalize_branch_builder(builder)?;
860
861                let page_number = merge_with_page.get_page_number();
862                drop(merge_with_page);
863                self.conditional_free(page_number);
864
865                result
866            }
867            PartialBranch(partial_child, ..) => {
868                let partial_child_page = self.mem.get_page(partial_child)?;
869                let partial_child_accessor =
870                    BranchAccessor::new(&partial_child_page, K::fixed_width());
871                let merge_with = if child_index == 0 { 1 } else { child_index - 1 };
872                let merge_with_page = self
873                    .mem
874                    .get_page(accessor.child_page(merge_with).unwrap())?;
875                let merge_with_accessor = BranchAccessor::new(&merge_with_page, K::fixed_width());
876                debug_assert!(merge_with < accessor.count_children());
877                for i in 0..accessor.count_children() {
878                    if i == child_index {
879                        continue;
880                    }
881                    let page_number = accessor.child_page(i).unwrap();
882                    let page_checksum = accessor.child_checksum(i).unwrap();
883                    if i == merge_with {
884                        let mut child_builder = BranchBuilder::new(
885                            &self.mem,
886                            merge_with_accessor.count_children()
887                                + partial_child_accessor.count_children(),
888                            K::fixed_width(),
889                        );
890                        let separator_key = accessor.key(min(child_index, merge_with)).unwrap();
891                        if child_index < merge_with {
892                            child_builder.push_all(&partial_child_accessor);
893                            child_builder.push_key(separator_key);
894                        }
895                        child_builder.push_all(&merge_with_accessor);
896                        if child_index > merge_with {
897                            child_builder.push_key(separator_key);
898                            child_builder.push_all(&partial_child_accessor);
899                        }
900                        if child_builder.should_split() {
901                            let (new_page1, separator, new_page2) = child_builder.build_split()?;
902                            builder.push_child(new_page1.get_page_number(), DEFERRED);
903                            builder.push_key(separator);
904                            builder.push_child(new_page2.get_page_number(), DEFERRED);
905                        } else {
906                            let new_page = child_builder.build()?;
907                            builder.push_child(new_page.get_page_number(), DEFERRED);
908                        }
909
910                        let merged_key_index = max(child_index, merge_with);
911                        if merged_key_index < accessor.count_children() - 1 {
912                            builder.push_key(accessor.key(merged_key_index).unwrap());
913                        }
914                    } else {
915                        builder.push_child(page_number, page_checksum);
916                        if i < accessor.count_children() - 1 {
917                            builder.push_key(accessor.key(i).unwrap());
918                        }
919                    }
920                }
921                let result = self.finalize_branch_builder(builder)?;
922
923                let page_number = merge_with_page.get_page_number();
924                drop(merge_with_page);
925                self.conditional_free(page_number);
926                drop(partial_child_page);
927                self.conditional_free(partial_child);
928
929                result
930            }
931        };
932
933        drop(page);
934        self.conditional_free(original_page_number);
935
936        Ok((final_result, found))
937    }
938
939    // Returns the page number of the sub-tree with this key deleted, or None if the sub-tree is empty.
940    // If key is not found, guaranteed not to modify the tree
941    fn delete_helper(
942        &mut self,
943        page: PageImpl,
944        checksum: Checksum,
945        key: &[u8],
946    ) -> Result<(DeletionResult, Option<AccessGuard<'a, V>>)> {
947        let node_mem = page.memory();
948        match node_mem[0] {
949            LEAF => self.delete_leaf_helper(page, checksum, key),
950            BRANCH => self.delete_branch_helper(page, checksum, key),
951            _ => unreachable!(),
952        }
953    }
954}