redb/tree_store/
btree_mutator.rs

1use crate::tree_store::btree_base::{
2    BRANCH, BranchAccessor, BranchBuilder, BranchMutator, Checksum, DEFERRED, LEAF, LeafAccessor,
3    LeafBuilder, LeafMutator,
4};
5use crate::tree_store::btree_mutator::DeletionResult::{
6    DeletedBranch, DeletedLeaf, PartialBranch, PartialLeaf, Subtree,
7};
8use crate::tree_store::page_store::{Page, PageImpl, PageMut};
9use crate::tree_store::{
10    AccessGuardMut, BtreeHeader, PageNumber, PageTrackerPolicy, RawLeafBuilder, TransactionalMemory,
11};
12use crate::types::{Key, Value};
13use crate::{AccessGuard, Result};
14use std::cmp::{max, min};
15use std::marker::PhantomData;
16use std::sync::{Arc, Mutex};
17
18// TODO: it seems like Checksum can be removed from most/all of these, now that we're using deferred checksums
19#[derive(Debug)]
20enum DeletionResult {
21    // A proper subtree
22    Subtree(PageNumber, Checksum),
23    // A leaf with zero children
24    DeletedLeaf,
25    // A leaf with fewer entries than desired
26    PartialLeaf {
27        page: Arc<[u8]>,
28        deleted_pair: usize,
29    },
30    // A branch page subtree with fewer children than desired
31    PartialBranch(PageNumber, Checksum),
32    // Indicates that the branch node was deleted, and includes the only remaining child
33    DeletedBranch(PageNumber, Checksum),
34}
35
36struct InsertionResult<'a, V: Value + 'static> {
37    // the new root page
38    new_root: PageNumber,
39    // checksum of the root page
40    root_checksum: Checksum,
41    // Following sibling, if the root had to be split
42    additional_sibling: Option<(Vec<u8>, PageNumber, Checksum)>,
43    // The inserted value for .insert_reserve() to use
44    inserted_value: AccessGuardMut<'a, V>,
45    // The previous value, if any
46    old_value: Option<AccessGuard<'a, V>>,
47}
48
49pub(crate) struct MutateHelper<'a, 'b, K: Key, V: Value> {
50    root: &'b mut Option<BtreeHeader>,
51    modify_uncommitted: bool,
52    mem: Arc<TransactionalMemory>,
53    freed: &'b mut Vec<PageNumber>,
54    allocated: Arc<Mutex<PageTrackerPolicy>>,
55    _key_type: PhantomData<K>,
56    _value_type: PhantomData<V>,
57    _lifetime: PhantomData<&'a ()>,
58}
59
60impl<'a, 'b, K: Key, V: Value> MutateHelper<'a, 'b, K, V> {
61    pub(crate) fn new(
62        root: &'b mut Option<BtreeHeader>,
63        mem: Arc<TransactionalMemory>,
64        freed: &'b mut Vec<PageNumber>,
65        allocated: Arc<Mutex<PageTrackerPolicy>>,
66    ) -> Self {
67        Self {
68            root,
69            modify_uncommitted: true,
70            mem,
71            freed,
72            allocated,
73            _key_type: Default::default(),
74            _value_type: Default::default(),
75            _lifetime: Default::default(),
76        }
77    }
78
79    // Creates a new mutator which will not modify any existing uncommitted pages, or free any existing pages.
80    // It will still queue pages for future freeing in the freed vec
81    pub(crate) fn new_do_not_modify(
82        root: &'b mut Option<BtreeHeader>,
83        mem: Arc<TransactionalMemory>,
84        freed: &'b mut Vec<PageNumber>,
85        allocated: Arc<Mutex<PageTrackerPolicy>>,
86    ) -> Self {
87        Self {
88            root,
89            modify_uncommitted: false,
90            mem,
91            freed,
92            allocated,
93            _key_type: Default::default(),
94            _value_type: Default::default(),
95            _lifetime: Default::default(),
96        }
97    }
98
99    fn conditional_free(&mut self, page_number: PageNumber) {
100        if self.modify_uncommitted {
101            let mut allocated = self.allocated.lock().unwrap();
102            if !self.mem.free_if_uncommitted(page_number, &mut allocated) {
103                self.freed.push(page_number);
104            }
105        } else {
106            self.freed.push(page_number);
107        }
108    }
109
110    pub(crate) fn delete(&mut self, key: &K::SelfType<'_>) -> Result<Option<AccessGuard<'a, V>>> {
111        if let Some(BtreeHeader {
112            root: p,
113            checksum,
114            length,
115        }) = *self.root
116        {
117            let (deletion_result, found) =
118                self.delete_helper(self.mem.get_page(p)?, checksum, K::as_bytes(key).as_ref())?;
119            let new_length = if found.is_some() { length - 1 } else { length };
120            let new_root = match deletion_result {
121                Subtree(page, checksum) => Some(BtreeHeader::new(page, checksum, new_length)),
122                DeletedLeaf => None,
123                PartialLeaf { page, deleted_pair } => {
124                    let accessor = LeafAccessor::new(&page, K::fixed_width(), V::fixed_width());
125                    let mut builder = LeafBuilder::new(
126                        &self.mem,
127                        &self.allocated,
128                        accessor.num_pairs() - 1,
129                        K::fixed_width(),
130                        V::fixed_width(),
131                    );
132                    builder.push_all_except(&accessor, Some(deleted_pair));
133                    let page = builder.build()?;
134                    assert_eq!(new_length, accessor.num_pairs() as u64 - 1);
135                    Some(BtreeHeader::new(
136                        page.get_page_number(),
137                        DEFERRED,
138                        new_length,
139                    ))
140                }
141                PartialBranch(page_number, checksum) => {
142                    Some(BtreeHeader::new(page_number, checksum, new_length))
143                }
144                DeletedBranch(remaining_child, checksum) => {
145                    Some(BtreeHeader::new(remaining_child, checksum, new_length))
146                }
147            };
148            *self.root = new_root;
149            Ok(found)
150        } else {
151            Ok(None)
152        }
153    }
154
155    #[allow(clippy::type_complexity)]
156    pub(crate) fn insert(
157        &mut self,
158        key: &K::SelfType<'_>,
159        value: &V::SelfType<'_>,
160    ) -> Result<(Option<AccessGuard<'a, V>>, AccessGuardMut<'a, V>)> {
161        let (new_root, old_value, guard) = if let Some(BtreeHeader {
162            root: p,
163            checksum,
164            length,
165        }) = *self.root
166        {
167            let result = self.insert_helper(
168                self.mem.get_page(p)?,
169                checksum,
170                K::as_bytes(key).as_ref(),
171                V::as_bytes(value).as_ref(),
172            )?;
173
174            let new_length = if result.old_value.is_some() {
175                length
176            } else {
177                length + 1
178            };
179
180            let new_root = if let Some((key, page2, page2_checksum)) = result.additional_sibling {
181                let mut builder =
182                    BranchBuilder::new(&self.mem, &self.allocated, 2, K::fixed_width());
183                builder.push_child(result.new_root, result.root_checksum);
184                builder.push_key(&key);
185                builder.push_child(page2, page2_checksum);
186                let new_page = builder.build()?;
187                BtreeHeader::new(new_page.get_page_number(), DEFERRED, new_length)
188            } else {
189                BtreeHeader::new(result.new_root, result.root_checksum, new_length)
190            };
191            (new_root, result.old_value, result.inserted_value)
192        } else {
193            let key_bytes = K::as_bytes(key);
194            let value_bytes = V::as_bytes(value);
195            let key_bytes = key_bytes.as_ref();
196            let value_bytes = value_bytes.as_ref();
197            let mut builder = LeafBuilder::new(
198                &self.mem,
199                &self.allocated,
200                1,
201                K::fixed_width(),
202                V::fixed_width(),
203            );
204            builder.push(key_bytes, value_bytes);
205            let page = builder.build()?;
206
207            let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
208            let offset = accessor.offset_of_first_value();
209            let page_num = page.get_page_number();
210            let guard = AccessGuardMut::new(page, offset, value_bytes.len());
211
212            (BtreeHeader::new(page_num, DEFERRED, 1), None, guard)
213        };
214        *self.root = Some(new_root);
215        Ok((old_value, guard))
216    }
217
218    fn insert_helper(
219        &mut self,
220        page: PageImpl,
221        page_checksum: Checksum,
222        key: &[u8],
223        value: &[u8],
224    ) -> Result<InsertionResult<'a, V>> {
225        let node_mem = page.memory();
226        Ok(match node_mem[0] {
227            LEAF => {
228                let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
229                let (position, found) = accessor.position::<K>(key);
230
231                // Fast-path to avoid re-building and splitting pages with a single large value
232                let single_large_value = accessor.num_pairs() == 1
233                    && accessor.total_length() >= self.mem.get_page_size();
234                if !found && single_large_value {
235                    let mut builder = LeafBuilder::new(
236                        &self.mem,
237                        &self.allocated,
238                        1,
239                        K::fixed_width(),
240                        V::fixed_width(),
241                    );
242                    builder.push(key, value);
243                    let new_page = builder.build()?;
244                    let new_page_number = new_page.get_page_number();
245                    let new_page_accessor =
246                        LeafAccessor::new(new_page.memory(), K::fixed_width(), V::fixed_width());
247                    let offset = new_page_accessor.offset_of_first_value();
248                    let guard = AccessGuardMut::new(new_page, offset, value.len());
249                    return if position == 0 {
250                        Ok(InsertionResult {
251                            new_root: new_page_number,
252                            root_checksum: DEFERRED,
253                            additional_sibling: Some((
254                                key.to_vec(),
255                                page.get_page_number(),
256                                page_checksum,
257                            )),
258                            inserted_value: guard,
259                            old_value: None,
260                        })
261                    } else {
262                        let split_key = accessor.last_entry().key().to_vec();
263                        Ok(InsertionResult {
264                            new_root: page.get_page_number(),
265                            root_checksum: page_checksum,
266                            additional_sibling: Some((split_key, new_page_number, DEFERRED)),
267                            inserted_value: guard,
268                            old_value: None,
269                        })
270                    };
271                }
272
273                // Fast-path for uncommitted pages, that can be modified in-place
274                if self.mem.uncommitted(page.get_page_number())
275                    && self.modify_uncommitted
276                    && LeafMutator::sufficient_insert_inplace_space(
277                        &page,
278                        position,
279                        found,
280                        K::fixed_width(),
281                        V::fixed_width(),
282                        key,
283                        value,
284                    )
285                {
286                    let page_number = page.get_page_number();
287                    let existing_value = if found {
288                        let copied_value = accessor.entry(position).unwrap().value().to_vec();
289                        Some(AccessGuard::with_owned_value(copied_value))
290                    } else {
291                        None
292                    };
293                    drop(page);
294                    let mut page_mut = self.mem.get_page_mut(page_number)?;
295                    let mut mutator =
296                        LeafMutator::new(&mut page_mut, K::fixed_width(), V::fixed_width());
297                    mutator.insert(position, found, key, value);
298                    let new_page_accessor =
299                        LeafAccessor::new(page_mut.memory(), K::fixed_width(), V::fixed_width());
300                    let offset = new_page_accessor.offset_of_value(position).unwrap();
301                    let guard = AccessGuardMut::new(page_mut, offset, value.len());
302                    return Ok(InsertionResult {
303                        new_root: page_number,
304                        root_checksum: DEFERRED,
305                        additional_sibling: None,
306                        inserted_value: guard,
307                        old_value: existing_value,
308                    });
309                }
310
311                let mut builder = LeafBuilder::new(
312                    &self.mem,
313                    &self.allocated,
314                    accessor.num_pairs() + 1,
315                    K::fixed_width(),
316                    V::fixed_width(),
317                );
318                for i in 0..accessor.num_pairs() {
319                    if i == position {
320                        builder.push(key, value);
321                    }
322                    if !found || i != position {
323                        let entry = accessor.entry(i).unwrap();
324                        builder.push(entry.key(), entry.value());
325                    }
326                }
327                if accessor.num_pairs() == position {
328                    builder.push(key, value);
329                }
330                if !builder.should_split() {
331                    let new_page = builder.build()?;
332
333                    let page_number = page.get_page_number();
334                    let existing_value = if found {
335                        let (start, end) = accessor.value_range(position).unwrap();
336                        if self.modify_uncommitted && self.mem.uncommitted(page_number) {
337                            let arc = page.to_arc();
338                            drop(page);
339                            let mut allocated = self.allocated.lock().unwrap();
340                            self.mem.free(page_number, &mut allocated);
341                            Some(AccessGuard::with_arc_page(arc, start..end))
342                        } else {
343                            self.freed.push(page_number);
344                            Some(AccessGuard::with_page(page, start..end))
345                        }
346                    } else {
347                        drop(page);
348                        self.conditional_free(page_number);
349                        None
350                    };
351
352                    let new_page_number = new_page.get_page_number();
353                    let accessor =
354                        LeafAccessor::new(new_page.memory(), K::fixed_width(), V::fixed_width());
355                    let offset = accessor.offset_of_value(position).unwrap();
356                    let guard = AccessGuardMut::new(new_page, offset, value.len());
357
358                    InsertionResult {
359                        new_root: new_page_number,
360                        root_checksum: DEFERRED,
361                        additional_sibling: None,
362                        inserted_value: guard,
363                        old_value: existing_value,
364                    }
365                } else {
366                    let (new_page1, split_key, new_page2) = builder.build_split()?;
367                    let split_key = split_key.to_vec();
368                    let page_number = page.get_page_number();
369                    let existing_value = if found {
370                        let (start, end) = accessor.value_range(position).unwrap();
371                        if self.modify_uncommitted && self.mem.uncommitted(page_number) {
372                            let arc = page.to_arc();
373                            drop(page);
374                            let mut allocated = self.allocated.lock().unwrap();
375                            self.mem.free(page_number, &mut allocated);
376                            Some(AccessGuard::with_arc_page(arc, start..end))
377                        } else {
378                            self.freed.push(page_number);
379                            Some(AccessGuard::with_page(page, start..end))
380                        }
381                    } else {
382                        drop(page);
383                        self.conditional_free(page_number);
384                        None
385                    };
386
387                    let new_page_number = new_page1.get_page_number();
388                    let new_page_number2 = new_page2.get_page_number();
389                    let accessor =
390                        LeafAccessor::new(new_page1.memory(), K::fixed_width(), V::fixed_width());
391                    let division = accessor.num_pairs();
392                    let guard = if position < division {
393                        let accessor = LeafAccessor::new(
394                            new_page1.memory(),
395                            K::fixed_width(),
396                            V::fixed_width(),
397                        );
398                        let offset = accessor.offset_of_value(position).unwrap();
399                        AccessGuardMut::new(new_page1, offset, value.len())
400                    } else {
401                        let accessor = LeafAccessor::new(
402                            new_page2.memory(),
403                            K::fixed_width(),
404                            V::fixed_width(),
405                        );
406                        let offset = accessor.offset_of_value(position - division).unwrap();
407                        AccessGuardMut::new(new_page2, offset, value.len())
408                    };
409
410                    InsertionResult {
411                        new_root: new_page_number,
412                        root_checksum: DEFERRED,
413                        additional_sibling: Some((split_key, new_page_number2, DEFERRED)),
414                        inserted_value: guard,
415                        old_value: existing_value,
416                    }
417                }
418            }
419            BRANCH => {
420                let accessor = BranchAccessor::new(&page, K::fixed_width());
421                let (child_index, child_page) = accessor.child_for_key::<K>(key);
422                let child_checksum = accessor.child_checksum(child_index).unwrap();
423                let sub_result =
424                    self.insert_helper(self.mem.get_page(child_page)?, child_checksum, key, value)?;
425
426                if sub_result.additional_sibling.is_none()
427                    && self.modify_uncommitted
428                    && self.mem.uncommitted(page.get_page_number())
429                {
430                    let page_number = page.get_page_number();
431                    drop(page);
432                    let mut mutpage = self.mem.get_page_mut(page_number)?;
433                    let mut mutator = BranchMutator::new(&mut mutpage);
434                    mutator.write_child_page(
435                        child_index,
436                        sub_result.new_root,
437                        sub_result.root_checksum,
438                    );
439                    return Ok(InsertionResult {
440                        new_root: mutpage.get_page_number(),
441                        root_checksum: DEFERRED,
442                        additional_sibling: None,
443                        inserted_value: sub_result.inserted_value,
444                        old_value: sub_result.old_value,
445                    });
446                }
447
448                // A child was added, or we couldn't use the fast-path above
449                let mut builder = BranchBuilder::new(
450                    &self.mem,
451                    &self.allocated,
452                    accessor.count_children() + 1,
453                    K::fixed_width(),
454                );
455                if child_index == 0 {
456                    builder.push_child(sub_result.new_root, sub_result.root_checksum);
457                    if let Some((ref index_key2, page2, page2_checksum)) =
458                        sub_result.additional_sibling
459                    {
460                        builder.push_key(index_key2);
461                        builder.push_child(page2, page2_checksum);
462                    }
463                } else {
464                    builder.push_child(
465                        accessor.child_page(0).unwrap(),
466                        accessor.child_checksum(0).unwrap(),
467                    );
468                }
469                for i in 1..accessor.count_children() {
470                    if let Some(key) = accessor.key(i - 1) {
471                        builder.push_key(key);
472                        if i == child_index {
473                            builder.push_child(sub_result.new_root, sub_result.root_checksum);
474                            if let Some((ref index_key2, page2, page2_checksum)) =
475                                sub_result.additional_sibling
476                            {
477                                builder.push_key(index_key2);
478                                builder.push_child(page2, page2_checksum);
479                            }
480                        } else {
481                            builder.push_child(
482                                accessor.child_page(i).unwrap(),
483                                accessor.child_checksum(i).unwrap(),
484                            );
485                        }
486                    } else {
487                        unreachable!();
488                    }
489                }
490
491                let result = if builder.should_split() {
492                    let (new_page1, split_key, new_page2) = builder.build_split()?;
493                    InsertionResult {
494                        new_root: new_page1.get_page_number(),
495                        root_checksum: DEFERRED,
496                        additional_sibling: Some((
497                            split_key.to_vec(),
498                            new_page2.get_page_number(),
499                            DEFERRED,
500                        )),
501                        inserted_value: sub_result.inserted_value,
502                        old_value: sub_result.old_value,
503                    }
504                } else {
505                    let new_page = builder.build()?;
506                    InsertionResult {
507                        new_root: new_page.get_page_number(),
508                        root_checksum: DEFERRED,
509                        additional_sibling: None,
510                        inserted_value: sub_result.inserted_value,
511                        old_value: sub_result.old_value,
512                    }
513                };
514                // Free the original page, since we've replaced it
515                let page_number = page.get_page_number();
516                drop(page);
517                self.conditional_free(page_number);
518
519                result
520            }
521            _ => unreachable!(),
522        })
523    }
524
525    pub(crate) fn insert_inplace(
526        &mut self,
527        key: &K::SelfType<'_>,
528        value: &V::SelfType<'_>,
529    ) -> Result<()> {
530        assert!(self.modify_uncommitted);
531        let header = self.root.expect("Key not found (tree is empty)");
532        self.insert_inplace_helper(
533            self.mem.get_page_mut(header.root)?,
534            K::as_bytes(key).as_ref(),
535            V::as_bytes(value).as_ref(),
536        )?;
537        *self.root = Some(BtreeHeader::new(header.root, DEFERRED, header.length));
538        Ok(())
539    }
540
541    fn insert_inplace_helper(&mut self, mut page: PageMut, key: &[u8], value: &[u8]) -> Result<()> {
542        assert!(self.mem.uncommitted(page.get_page_number()));
543
544        let node_mem = page.memory();
545        match node_mem[0] {
546            LEAF => {
547                let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
548                let (position, found) = accessor.position::<K>(key);
549                assert!(found);
550                let old_len = accessor.entry(position).unwrap().value().len();
551                assert!(value.len() <= old_len);
552                let mut mutator = LeafMutator::new(&mut page, K::fixed_width(), V::fixed_width());
553                mutator.insert(position, true, key, value);
554            }
555            BRANCH => {
556                let accessor = BranchAccessor::new(&page, K::fixed_width());
557                let (child_index, child_page) = accessor.child_for_key::<K>(key);
558                self.insert_inplace_helper(self.mem.get_page_mut(child_page)?, key, value)?;
559                let mut mutator = BranchMutator::new(&mut page);
560                mutator.write_child_page(child_index, child_page, DEFERRED);
561            }
562            _ => unreachable!(),
563        }
564
565        Ok(())
566    }
567
568    fn delete_leaf_helper(
569        &mut self,
570        page: PageImpl,
571        checksum: Checksum,
572        key: &[u8],
573    ) -> Result<(DeletionResult, Option<AccessGuard<'a, V>>)> {
574        let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width());
575        let (position, found) = accessor.position::<K>(key);
576        if !found {
577            return Ok((Subtree(page.get_page_number(), checksum), None));
578        }
579        let new_kv_bytes = accessor.length_of_pairs(0, accessor.num_pairs())
580            - accessor.length_of_pairs(position, position + 1);
581        let new_required_bytes = RawLeafBuilder::required_bytes(
582            accessor.num_pairs() - 1,
583            new_kv_bytes,
584            K::fixed_width(),
585            V::fixed_width(),
586        );
587        let uncommitted = self.mem.uncommitted(page.get_page_number());
588
589        // Fast-path for dirty pages
590        if uncommitted
591            && self.modify_uncommitted
592            && new_required_bytes >= self.mem.get_page_size() / 2
593            && accessor.num_pairs() > 1
594        {
595            let (start, end) = accessor.value_range(position).unwrap();
596            let page_number = page.get_page_number();
597            drop(page);
598            let page_mut = self.mem.get_page_mut(page_number)?;
599
600            let guard = AccessGuard::remove_on_drop(
601                page_mut,
602                start,
603                end - start,
604                position,
605                K::fixed_width(),
606            );
607            return Ok((Subtree(page_number, DEFERRED), Some(guard)));
608        }
609
610        let result = if accessor.num_pairs() == 1 {
611            DeletedLeaf
612        } else if new_required_bytes < self.mem.get_page_size() / 3 {
613            // Merge when less than 33% full. Splits occur when a page is full and produce two 50%
614            // full pages, so we use 33% instead of 50% to avoid oscillating
615            PartialLeaf {
616                page: page.to_arc(),
617                deleted_pair: position,
618            }
619        } else {
620            let mut builder = LeafBuilder::new(
621                &self.mem,
622                &self.allocated,
623                accessor.num_pairs() - 1,
624                K::fixed_width(),
625                V::fixed_width(),
626            );
627            for i in 0..accessor.num_pairs() {
628                if i == position {
629                    continue;
630                }
631                let entry = accessor.entry(i).unwrap();
632                builder.push(entry.key(), entry.value());
633            }
634            let new_page = builder.build()?;
635            Subtree(new_page.get_page_number(), DEFERRED)
636        };
637        let (start, end) = accessor.value_range(position).unwrap();
638        let guard = if uncommitted && self.modify_uncommitted {
639            let page_number = page.get_page_number();
640            let arc = page.to_arc();
641            drop(page);
642            let mut allocated = self.allocated.lock().unwrap();
643            self.mem.free(page_number, &mut allocated);
644            Some(AccessGuard::with_arc_page(arc, start..end))
645        } else {
646            // Won't be freed until the end of the transaction, so returning the page
647            // in the AccessGuard below is still safe
648            self.freed.push(page.get_page_number());
649            Some(AccessGuard::with_page(page, start..end))
650        };
651        Ok((result, guard))
652    }
653
654    fn finalize_branch_builder(
655        builder: BranchBuilder<'_, '_>,
656        page_size: usize,
657    ) -> Result<DeletionResult> {
658        let result = if let Some((only_child, checksum)) = builder.to_single_child() {
659            DeletedBranch(only_child, checksum)
660        } else {
661            // TODO: can we optimize away this page allocation?
662            // The PartialInternal gets returned, and then the caller has to merge it immediately
663            let new_page = builder.build()?;
664            let accessor = BranchAccessor::new(&new_page, K::fixed_width());
665            // Merge when less than 33% full. Splits occur when a page is full and produce two 50%
666            // full pages, so we use 33% instead of 50% to avoid oscillating
667            if accessor.total_length() < page_size / 3 {
668                PartialBranch(new_page.get_page_number(), DEFERRED)
669            } else {
670                Subtree(new_page.get_page_number(), DEFERRED)
671            }
672        };
673        Ok(result)
674    }
675
676    fn delete_branch_helper(
677        &mut self,
678        page: PageImpl,
679        checksum: Checksum,
680        key: &[u8],
681    ) -> Result<(DeletionResult, Option<AccessGuard<'a, V>>)> {
682        let accessor = BranchAccessor::new(&page, K::fixed_width());
683        let original_page_number = page.get_page_number();
684        let (child_index, child_page_number) = accessor.child_for_key::<K>(key);
685        let child_checksum = accessor.child_checksum(child_index).unwrap();
686        let (result, found) =
687            self.delete_helper(self.mem.get_page(child_page_number)?, child_checksum, key)?;
688        if found.is_none() {
689            return Ok((Subtree(original_page_number, checksum), None));
690        }
691        if let Subtree(new_child, new_child_checksum) = result {
692            let result_page =
693                if self.mem.uncommitted(original_page_number) && self.modify_uncommitted {
694                    drop(page);
695                    let mut mutpage = self.mem.get_page_mut(original_page_number)?;
696                    let mut mutator = BranchMutator::new(&mut mutpage);
697                    mutator.write_child_page(child_index, new_child, new_child_checksum);
698                    original_page_number
699                } else {
700                    let mut builder = BranchBuilder::new(
701                        &self.mem,
702                        &self.allocated,
703                        accessor.count_children(),
704                        K::fixed_width(),
705                    );
706                    builder.push_all(&accessor);
707                    builder.replace_child(child_index, new_child, new_child_checksum);
708                    let new_page = builder.build()?;
709                    self.conditional_free(original_page_number);
710                    new_page.get_page_number()
711                };
712            return Ok((Subtree(result_page, DEFERRED), found));
713        }
714
715        // Child is requesting to be merged with a sibling
716        let mut builder = BranchBuilder::new(
717            &self.mem,
718            &self.allocated,
719            accessor.count_children(),
720            K::fixed_width(),
721        );
722
723        let final_result = match result {
724            Subtree(_, _) => {
725                // Handled in the if above
726                unreachable!();
727            }
728            DeletedLeaf => {
729                for i in 0..accessor.count_children() {
730                    if i == child_index {
731                        continue;
732                    }
733                    builder.push_child(
734                        accessor.child_page(i).unwrap(),
735                        accessor.child_checksum(i).unwrap(),
736                    );
737                }
738                let end = if child_index == accessor.count_children() - 1 {
739                    // Skip the last key, which precedes the child
740                    accessor.count_children() - 2
741                } else {
742                    accessor.count_children() - 1
743                };
744                for i in 0..end {
745                    if i == child_index {
746                        continue;
747                    }
748                    builder.push_key(accessor.key(i).unwrap());
749                }
750                Self::finalize_branch_builder(builder, self.mem.get_page_size())?
751            }
752            PartialLeaf {
753                page: partial_child_page,
754                deleted_pair,
755            } => {
756                let partial_child_accessor =
757                    LeafAccessor::new(&partial_child_page, K::fixed_width(), V::fixed_width());
758                debug_assert!(partial_child_accessor.num_pairs() > 1);
759
760                let merge_with = if child_index == 0 { 1 } else { child_index - 1 };
761                debug_assert!(merge_with < accessor.count_children());
762                let merge_with_page = self
763                    .mem
764                    .get_page(accessor.child_page(merge_with).unwrap())?;
765                let merge_with_accessor =
766                    LeafAccessor::new(merge_with_page.memory(), K::fixed_width(), V::fixed_width());
767
768                let single_large_value = merge_with_accessor.num_pairs() == 1
769                    && merge_with_accessor.total_length() >= self.mem.get_page_size();
770                // Don't try to merge or rebalance, if the sibling contains a single large value
771                if single_large_value {
772                    let mut child_builder = LeafBuilder::new(
773                        &self.mem,
774                        &self.allocated,
775                        partial_child_accessor.num_pairs() - 1,
776                        K::fixed_width(),
777                        V::fixed_width(),
778                    );
779                    child_builder.push_all_except(&partial_child_accessor, Some(deleted_pair));
780                    let new_page = child_builder.build()?;
781                    builder.push_all(&accessor);
782                    builder.replace_child(child_index, new_page.get_page_number(), DEFERRED);
783
784                    let result = Self::finalize_branch_builder(builder, self.mem.get_page_size())?;
785
786                    drop(page);
787                    self.conditional_free(original_page_number);
788                    // child_page_number does not need to be freed, because it's a leaf and the
789                    // MutAccessGuard will free it
790
791                    return Ok((result, found));
792                }
793
794                for i in 0..accessor.count_children() {
795                    if i == child_index {
796                        continue;
797                    }
798                    let page_number = accessor.child_page(i).unwrap();
799                    let page_checksum = accessor.child_checksum(i).unwrap();
800                    if i == merge_with {
801                        let mut child_builder = LeafBuilder::new(
802                            &self.mem,
803                            &self.allocated,
804                            partial_child_accessor.num_pairs() - 1
805                                + merge_with_accessor.num_pairs(),
806                            K::fixed_width(),
807                            V::fixed_width(),
808                        );
809                        if child_index < merge_with {
810                            child_builder
811                                .push_all_except(&partial_child_accessor, Some(deleted_pair));
812                        }
813                        child_builder.push_all_except(&merge_with_accessor, None);
814                        if child_index > merge_with {
815                            child_builder
816                                .push_all_except(&partial_child_accessor, Some(deleted_pair));
817                        }
818                        if child_builder.should_split() {
819                            let (new_page1, split_key, new_page2) = child_builder.build_split()?;
820                            builder.push_key(split_key);
821                            builder.push_child(new_page1.get_page_number(), DEFERRED);
822                            builder.push_child(new_page2.get_page_number(), DEFERRED);
823                        } else {
824                            let new_page = child_builder.build()?;
825                            builder.push_child(new_page.get_page_number(), DEFERRED);
826                        }
827
828                        let merged_key_index = max(child_index, merge_with);
829                        if merged_key_index < accessor.count_children() - 1 {
830                            builder.push_key(accessor.key(merged_key_index).unwrap());
831                        }
832                    } else {
833                        builder.push_child(page_number, page_checksum);
834                        if i < accessor.count_children() - 1 {
835                            builder.push_key(accessor.key(i).unwrap());
836                        }
837                    }
838                }
839
840                let result = Self::finalize_branch_builder(builder, self.mem.get_page_size())?;
841
842                let page_number = merge_with_page.get_page_number();
843                drop(merge_with_page);
844                self.conditional_free(page_number);
845                // child_page_number does not need to be freed, because it's a leaf and the
846                // MutAccessGuard will free it
847
848                result
849            }
850            DeletedBranch(only_grandchild, grandchild_checksum) => {
851                let merge_with = if child_index == 0 { 1 } else { child_index - 1 };
852                let merge_with_page = self
853                    .mem
854                    .get_page(accessor.child_page(merge_with).unwrap())?;
855                let merge_with_accessor = BranchAccessor::new(&merge_with_page, K::fixed_width());
856                debug_assert!(merge_with < accessor.count_children());
857                for i in 0..accessor.count_children() {
858                    if i == child_index {
859                        continue;
860                    }
861                    let page_number = accessor.child_page(i).unwrap();
862                    let page_checksum = accessor.child_checksum(i).unwrap();
863                    if i == merge_with {
864                        let mut child_builder = BranchBuilder::new(
865                            &self.mem,
866                            &self.allocated,
867                            merge_with_accessor.count_children() + 1,
868                            K::fixed_width(),
869                        );
870                        let separator_key = accessor.key(min(child_index, merge_with)).unwrap();
871                        if child_index < merge_with {
872                            child_builder.push_child(only_grandchild, grandchild_checksum);
873                            child_builder.push_key(separator_key);
874                        }
875                        child_builder.push_all(&merge_with_accessor);
876                        if child_index > merge_with {
877                            child_builder.push_key(separator_key);
878                            child_builder.push_child(only_grandchild, grandchild_checksum);
879                        }
880                        if child_builder.should_split() {
881                            let (new_page1, separator, new_page2) = child_builder.build_split()?;
882                            builder.push_child(new_page1.get_page_number(), DEFERRED);
883                            builder.push_key(separator);
884                            builder.push_child(new_page2.get_page_number(), DEFERRED);
885                        } else {
886                            let new_page = child_builder.build()?;
887                            builder.push_child(new_page.get_page_number(), DEFERRED);
888                        }
889
890                        let merged_key_index = max(child_index, merge_with);
891                        if merged_key_index < accessor.count_children() - 1 {
892                            builder.push_key(accessor.key(merged_key_index).unwrap());
893                        }
894                    } else {
895                        builder.push_child(page_number, page_checksum);
896                        if i < accessor.count_children() - 1 {
897                            builder.push_key(accessor.key(i).unwrap());
898                        }
899                    }
900                }
901                let result = Self::finalize_branch_builder(builder, self.mem.get_page_size())?;
902
903                let page_number = merge_with_page.get_page_number();
904                drop(merge_with_page);
905                self.conditional_free(page_number);
906
907                result
908            }
909            PartialBranch(partial_child, ..) => {
910                let partial_child_page = self.mem.get_page(partial_child)?;
911                let partial_child_accessor =
912                    BranchAccessor::new(&partial_child_page, K::fixed_width());
913                let merge_with = if child_index == 0 { 1 } else { child_index - 1 };
914                let merge_with_page = self
915                    .mem
916                    .get_page(accessor.child_page(merge_with).unwrap())?;
917                let merge_with_accessor = BranchAccessor::new(&merge_with_page, K::fixed_width());
918                debug_assert!(merge_with < accessor.count_children());
919                for i in 0..accessor.count_children() {
920                    if i == child_index {
921                        continue;
922                    }
923                    let page_number = accessor.child_page(i).unwrap();
924                    let page_checksum = accessor.child_checksum(i).unwrap();
925                    if i == merge_with {
926                        let mut child_builder = BranchBuilder::new(
927                            &self.mem,
928                            &self.allocated,
929                            merge_with_accessor.count_children()
930                                + partial_child_accessor.count_children(),
931                            K::fixed_width(),
932                        );
933                        let separator_key = accessor.key(min(child_index, merge_with)).unwrap();
934                        if child_index < merge_with {
935                            child_builder.push_all(&partial_child_accessor);
936                            child_builder.push_key(separator_key);
937                        }
938                        child_builder.push_all(&merge_with_accessor);
939                        if child_index > merge_with {
940                            child_builder.push_key(separator_key);
941                            child_builder.push_all(&partial_child_accessor);
942                        }
943                        if child_builder.should_split() {
944                            let (new_page1, separator, new_page2) = child_builder.build_split()?;
945                            builder.push_child(new_page1.get_page_number(), DEFERRED);
946                            builder.push_key(separator);
947                            builder.push_child(new_page2.get_page_number(), DEFERRED);
948                        } else {
949                            let new_page = child_builder.build()?;
950                            builder.push_child(new_page.get_page_number(), DEFERRED);
951                        }
952
953                        let merged_key_index = max(child_index, merge_with);
954                        if merged_key_index < accessor.count_children() - 1 {
955                            builder.push_key(accessor.key(merged_key_index).unwrap());
956                        }
957                    } else {
958                        builder.push_child(page_number, page_checksum);
959                        if i < accessor.count_children() - 1 {
960                            builder.push_key(accessor.key(i).unwrap());
961                        }
962                    }
963                }
964                let result = Self::finalize_branch_builder(builder, self.mem.get_page_size())?;
965
966                let page_number = merge_with_page.get_page_number();
967                drop(merge_with_page);
968                self.conditional_free(page_number);
969                drop(partial_child_page);
970                self.conditional_free(partial_child);
971
972                result
973            }
974        };
975
976        drop(page);
977        self.conditional_free(original_page_number);
978
979        Ok((final_result, found))
980    }
981
982    // Returns the page number of the sub-tree with this key deleted, or None if the sub-tree is empty.
983    // If key is not found, guaranteed not to modify the tree
984    fn delete_helper(
985        &mut self,
986        page: PageImpl,
987        checksum: Checksum,
988        key: &[u8],
989    ) -> Result<(DeletionResult, Option<AccessGuard<'a, V>>)> {
990        let node_mem = page.memory();
991        match node_mem[0] {
992            LEAF => self.delete_leaf_helper(page, checksum, key),
993            BRANCH => self.delete_branch_helper(page, checksum, key),
994            _ => unreachable!(),
995        }
996    }
997}