// redb/tree_store/btree_base.rs
1use crate::tree_store::page_store::{xxh3_checksum, Page, PageImpl, PageMut, TransactionalMemory};
2use crate::tree_store::PageNumber;
3use crate::types::{Key, MutInPlaceValue, Value};
4use crate::{Result, StorageError};
5use std::cmp::Ordering;
6use std::marker::PhantomData;
7use std::mem::size_of;
8use std::ops::Range;
9use std::sync::Arc;
10use std::thread;
11
// Page-type discriminants, stored in byte 0 of every btree page
pub(crate) const LEAF: u8 = 1;
pub(crate) const BRANCH: u8 = 2;

// Checksum of a page's used bytes (see leaf_checksum/branch_checksum)
pub(crate) type Checksum = u128;
// Dummy value. Final value will be computed during commit
pub(crate) const DEFERRED: Checksum = 999;
18
19pub(super) fn leaf_checksum<T: Page>(
20    page: &T,
21    fixed_key_size: Option<usize>,
22    fixed_value_size: Option<usize>,
23) -> Result<Checksum, StorageError> {
24    let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
25    let last_pair = accessor.num_pairs().checked_sub(1).ok_or_else(|| {
26        StorageError::Corrupted(format!(
27            "Leaf page {:?} corrupted. Number of pairs is zero",
28            page.get_page_number()
29        ))
30    })?;
31    let end = accessor.value_end(last_pair).ok_or_else(|| {
32        StorageError::Corrupted(format!(
33            "Leaf page {:?} corrupted. Couldn't find offset for pair {}",
34            page.get_page_number(),
35            last_pair,
36        ))
37    })?;
38    if end > page.memory().len() {
39        Err(StorageError::Corrupted(format!(
40            "Leaf page {:?} corrupted. Last offset {} beyond end of data {}",
41            page.get_page_number(),
42            end,
43            page.memory().len()
44        )))
45    } else {
46        Ok(xxh3_checksum(&page.memory()[..end]))
47    }
48}
49
50pub(super) fn branch_checksum<T: Page>(
51    page: &T,
52    fixed_key_size: Option<usize>,
53) -> Result<Checksum, StorageError> {
54    let accessor = BranchAccessor::new(page, fixed_key_size);
55    let last_key = accessor.num_keys().checked_sub(1).ok_or_else(|| {
56        StorageError::Corrupted(format!(
57            "Branch page {:?} corrupted. Number of keys is zero",
58            page.get_page_number()
59        ))
60    })?;
61    let end = accessor.key_end(last_key).ok_or_else(|| {
62        StorageError::Corrupted(format!(
63            "Branch page {:?} corrupted. Can't find offset for key {}",
64            page.get_page_number(),
65            last_key
66        ))
67    })?;
68    if end > page.memory().len() {
69        Err(StorageError::Corrupted(format!(
70            "Branch page {:?} corrupted. Last offset {} beyond end of data {}",
71            page.get_page_number(),
72            end,
73            page.memory().len()
74        )))
75    } else {
76        Ok(xxh3_checksum(&page.memory()[..end]))
77    }
78}
79
// Serializable summary of a btree: its root page, the root's checksum, and
// the tree's length.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) struct BtreeHeader {
    pub(crate) root: PageNumber,
    pub(crate) checksum: Checksum,
    pub(crate) length: u64,
}
86
87impl BtreeHeader {
88    pub(crate) fn new(root: PageNumber, checksum: Checksum, length: u64) -> Self {
89        Self {
90            root,
91            checksum,
92            length,
93        }
94    }
95
96    pub(crate) const fn serialized_size() -> usize {
97        PageNumber::serialized_size() + size_of::<Checksum>() + size_of::<u64>()
98    }
99
100    pub(crate) fn from_le_bytes(bytes: [u8; Self::serialized_size()]) -> Self {
101        let root =
102            PageNumber::from_le_bytes(bytes[..PageNumber::serialized_size()].try_into().unwrap());
103        let mut offset = PageNumber::serialized_size();
104        let checksum = Checksum::from_le_bytes(
105            bytes[offset..(offset + size_of::<Checksum>())]
106                .try_into()
107                .unwrap(),
108        );
109        offset += size_of::<Checksum>();
110        let length = u64::from_le_bytes(
111            bytes[offset..(offset + size_of::<u64>())]
112                .try_into()
113                .unwrap(),
114        );
115
116        Self {
117            root,
118            checksum,
119            length,
120        }
121    }
122
123    pub(crate) fn to_le_bytes(self) -> [u8; Self::serialized_size()] {
124        let mut result = [0; Self::serialized_size()];
125        result[..PageNumber::serialized_size()].copy_from_slice(&self.root.to_le_bytes());
126        result[PageNumber::serialized_size()
127            ..(PageNumber::serialized_size() + size_of::<Checksum>())]
128            .copy_from_slice(&self.checksum.to_le_bytes());
129        result[(PageNumber::serialized_size() + size_of::<Checksum>())..]
130            .copy_from_slice(&self.length.to_le_bytes());
131
132        result
133    }
134}
135
// Cleanup action an AccessGuard performs when dropped
enum OnDrop {
    // Nothing to do
    None,
    // Remove the entry at `position` from the guarded (mutable) leaf page
    RemoveEntry {
        position: usize,
        fixed_key_size: Option<usize>,
    },
}
143
// The possible backing stores for an AccessGuard's bytes
enum EitherPage {
    Immutable(PageImpl),
    Mutable(PageMut),
    OwnedMemory(Vec<u8>),
    ArcMemory(Arc<[u8]>),
}
150
151impl EitherPage {
152    fn memory(&self) -> &[u8] {
153        match self {
154            EitherPage::Immutable(page) => page.memory(),
155            EitherPage::Mutable(page) => page.memory(),
156            EitherPage::OwnedMemory(mem) => mem.as_slice(),
157            EitherPage::ArcMemory(mem) => mem,
158        }
159    }
160}
161
// Read guard for a value stored in a page (or in owned/shared memory)
pub struct AccessGuard<'a, V: Value + 'static> {
    page: EitherPage,
    // Start of the value's bytes within `page`
    offset: usize,
    // Length of the value's bytes
    len: usize,
    // Optional cleanup performed when this guard is dropped
    on_drop: OnDrop,
    _value_type: PhantomData<V>,
    // Used so that logical references into a Table respect the appropriate lifetime
    _lifetime: PhantomData<&'a ()>,
}
171
172impl<'a, V: Value + 'static> AccessGuard<'a, V> {
173    pub(crate) fn with_page(page: PageImpl, range: Range<usize>) -> Self {
174        Self {
175            page: EitherPage::Immutable(page),
176            offset: range.start,
177            len: range.len(),
178            on_drop: OnDrop::None,
179            _value_type: Default::default(),
180            _lifetime: Default::default(),
181        }
182    }
183
184    pub(crate) fn with_arc_page(page: Arc<[u8]>, range: Range<usize>) -> Self {
185        Self {
186            page: EitherPage::ArcMemory(page),
187            offset: range.start,
188            len: range.len(),
189            on_drop: OnDrop::None,
190            _value_type: Default::default(),
191            _lifetime: Default::default(),
192        }
193    }
194
195    pub(crate) fn with_owned_value(value: Vec<u8>) -> Self {
196        let len = value.len();
197        Self {
198            page: EitherPage::OwnedMemory(value),
199            offset: 0,
200            len,
201            on_drop: OnDrop::None,
202            _value_type: Default::default(),
203            _lifetime: Default::default(),
204        }
205    }
206
207    pub(super) fn remove_on_drop(
208        page: PageMut,
209        offset: usize,
210        len: usize,
211        position: usize,
212        fixed_key_size: Option<usize>,
213    ) -> Self {
214        Self {
215            page: EitherPage::Mutable(page),
216            offset,
217            len,
218            on_drop: OnDrop::RemoveEntry {
219                position,
220                fixed_key_size,
221            },
222            _value_type: Default::default(),
223            _lifetime: Default::default(),
224        }
225    }
226
227    pub fn value(&self) -> V::SelfType<'_> {
228        V::from_bytes(&self.page.memory()[self.offset..(self.offset + self.len)])
229    }
230}
231
232impl<'a, V: Value + 'static> Drop for AccessGuard<'a, V> {
233    fn drop(&mut self) {
234        match self.on_drop {
235            OnDrop::None => {}
236            OnDrop::RemoveEntry {
237                position,
238                fixed_key_size,
239            } => {
240                if let EitherPage::Mutable(ref mut mut_page) = self.page {
241                    let mut mutator = LeafMutator::new(mut_page, fixed_key_size, V::fixed_width());
242                    mutator.remove(position);
243                } else if !thread::panicking() {
244                    unreachable!();
245                }
246            }
247        }
248    }
249}
250
// Write guard for a value stored in a mutable page
pub struct AccessGuardMut<'a, V: Value + 'static> {
    page: PageMut,
    // Start of the value's bytes within `page`
    offset: usize,
    // Length of the value's bytes
    len: usize,
    _value_type: PhantomData<V>,
    // Used so that logical references into a Table respect the appropriate lifetime
    _lifetime: PhantomData<&'a ()>,
}
259
260impl<'a, V: Value + 'static> AccessGuardMut<'a, V> {
261    pub(crate) fn new(page: PageMut, offset: usize, len: usize) -> Self {
262        AccessGuardMut {
263            page,
264            offset,
265            len,
266            _value_type: Default::default(),
267            _lifetime: Default::default(),
268        }
269    }
270}
271
272impl<'a, V: MutInPlaceValue + 'static> AsMut<V::BaseRefType> for AccessGuardMut<'a, V> {
273    fn as_mut(&mut self) -> &mut V::BaseRefType {
274        V::from_bytes_mut(&mut self.page.memory_mut()[self.offset..(self.offset + self.len)])
275    }
276}
277
278// Provides a simple zero-copy way to access entries
// Provides a simple zero-copy way to access entries
pub struct EntryAccessor<'a> {
    // Key bytes, borrowed from the page
    key: &'a [u8],
    // Value bytes, borrowed from the page
    value: &'a [u8],
}
283
284impl<'a> EntryAccessor<'a> {
285    fn new(key: &'a [u8], value: &'a [u8]) -> Self {
286        EntryAccessor { key, value }
287    }
288}
289
290impl<'a: 'b, 'b> EntryAccessor<'a> {
291    pub(crate) fn key(&'b self) -> &'a [u8] {
292        self.key
293    }
294
295    pub(crate) fn value(&'b self) -> &'a [u8] {
296        self.value
297    }
298}
299
300// Provides a simple zero-copy way to access a leaf page
pub(crate) struct LeafAccessor<'a> {
    // Raw bytes of the serialized leaf page
    page: &'a [u8],
    // Some(n) when every key is exactly n bytes; None for variable-width keys
    fixed_key_size: Option<usize>,
    // Some(n) when every value is exactly n bytes; None for variable-width values
    fixed_value_size: Option<usize>,
    // Pair count, cached from the page header
    num_pairs: usize,
}
307
impl<'a> LeafAccessor<'a> {
    // Wraps a serialized leaf page. Byte 0 must be the LEAF type marker;
    // the pair count is read from header bytes 2..4 (little-endian u16).
    pub(crate) fn new(
        page: &'a [u8],
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        debug_assert_eq!(page[0], LEAF);
        let num_pairs = u16::from_le_bytes(page[2..4].try_into().unwrap()) as usize;
        LeafAccessor {
            page,
            fixed_key_size,
            fixed_value_size,
            num_pairs,
        }
    }

    // Debug helper: prints every key (and optionally every value) to stderr
    pub(super) fn print_node<K: Key, V: Value>(&self, include_value: bool) {
        let mut i = 0;
        while let Some(entry) = self.entry(i) {
            eprint!(" key_{i}={:?}", K::from_bytes(entry.key()));
            if include_value {
                eprint!(" value_{i}={:?}", V::from_bytes(entry.value()));
            }
            i += 1;
        }
    }

    // Binary search for `query`. Returns (index, true) on an exact match,
    // or (insertion index, false) when absent; the insertion index may equal
    // num_pairs() when the query sorts after every key in the leaf.
    pub(crate) fn position<K: Key>(&self, query: &[u8]) -> (usize, bool) {
        // inclusive
        let mut min_entry = 0;
        // inclusive. Start past end, since it might be positioned beyond the end of the leaf
        let mut max_entry = self.num_pairs();
        while min_entry < max_entry {
            let mid = (min_entry + max_entry) / 2;
            let key = self.key_unchecked(mid);
            match K::compare(query, key) {
                Ordering::Less => {
                    max_entry = mid;
                }
                Ordering::Equal => {
                    return (mid, true);
                }
                Ordering::Greater => {
                    min_entry = mid + 1;
                }
            }
        }
        debug_assert_eq!(min_entry, max_entry);
        (min_entry, false)
    }

    // Index of `query` within this leaf, or None if it is not present
    pub(crate) fn find_key<K: Key>(&self, query: &[u8]) -> Option<usize> {
        let (entry, found) = self.position::<K>(query);
        if found {
            Some(entry)
        } else {
            None
        }
    }

    // Offset where key data begins: the 4-byte header plus the key-end and
    // value-end offset tables (each present only for variable-width entries)
    fn key_section_start(&self) -> usize {
        let mut offset = 4;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        if self.fixed_value_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }

        offset
    }

    // Start offset of key n: the start of the key section for n == 0,
    // otherwise the end of the previous key
    fn key_start(&self, n: usize) -> Option<usize> {
        if n == 0 {
            Some(self.key_section_start())
        } else {
            self.key_end(n - 1)
        }
    }

    // End offset (exclusive) of key n, or None when n is out of bounds
    fn key_end(&self, n: usize) -> Option<usize> {
        if n >= self.num_pairs() {
            None
        } else {
            if let Some(fixed) = self.fixed_key_size {
                // Fixed-width keys have no stored offsets; compute directly
                return Some(self.key_section_start() + fixed * (n + 1));
            }
            // Stored entries are absolute key-end offsets (u32), starting at byte 4
            let offset = 4 + size_of::<u32>() * n;
            let end = u32::from_le_bytes(
                self.page
                    .get(offset..(offset + size_of::<u32>()))?
                    .try_into()
                    .unwrap(),
            ) as usize;
            Some(end)
        }
    }

    // Start offset of value n. All values are stored after all keys, so
    // value 0 begins where the last key ends
    fn value_start(&self, n: usize) -> Option<usize> {
        if n == 0 {
            self.key_end(self.num_pairs() - 1)
        } else {
            self.value_end(n - 1)
        }
    }

    // End offset (exclusive) of value n, or None when n is out of bounds
    fn value_end(&self, n: usize) -> Option<usize> {
        if n >= self.num_pairs() {
            None
        } else {
            if let Some(fixed) = self.fixed_value_size {
                // Fixed-width values: computed relative to the end of the key data
                return Some(self.key_end(self.num_pairs.checked_sub(1)?)? + fixed * (n + 1));
            }
            // The value-end table follows the key-end table (when the latter exists)
            let mut offset = 4 + size_of::<u32>() * n;
            if self.fixed_key_size.is_none() {
                offset += size_of::<u32>() * self.num_pairs;
            }
            let end = u32::from_le_bytes(
                self.page
                    .get(offset..(offset + size_of::<u32>()))?
                    .try_into()
                    .unwrap(),
            ) as usize;
            Some(end)
        }
    }

    pub(crate) fn num_pairs(&self) -> usize {
        self.num_pairs
    }

    // Offset of the first value's bytes; panics on an empty leaf
    pub(super) fn offset_of_first_value(&self) -> usize {
        self.offset_of_value(0).unwrap()
    }

    // Offset of value n's bytes within the page
    pub(super) fn offset_of_value(&self, n: usize) -> Option<usize> {
        self.value_start(n)
    }

    // (start, end) byte range of value n within the page
    pub(super) fn value_range(&self, n: usize) -> Option<(usize, usize)> {
        Some((self.value_start(n)?, self.value_end(n)?))
    }

    // Returns the length of all keys and values between [start, end)
    pub(crate) fn length_of_pairs(&self, start: usize, end: usize) -> usize {
        self.length_of_values(start, end) + self.length_of_keys(start, end)
    }

    // Returns the length of all values between [start, end)
    fn length_of_values(&self, start: usize, end: usize) -> usize {
        if end == 0 {
            return 0;
        }
        let end_offset = self.value_end(end - 1).unwrap();
        let start_offset = self.value_start(start).unwrap();
        end_offset - start_offset
    }

    // Returns the length of all keys between [start, end)
    pub(crate) fn length_of_keys(&self, start: usize, end: usize) -> usize {
        if end == 0 {
            return 0;
        }
        let end_offset = self.key_end(end - 1).unwrap();
        let start_offset = self.key_start(start).unwrap();
        end_offset - start_offset
    }

    // Total bytes used by this leaf (header, offset tables, keys, and values)
    pub(crate) fn total_length(&self) -> usize {
        // Values are stored last
        self.value_end(self.num_pairs() - 1).unwrap()
    }

    // Key n's bytes; panics when n is out of bounds
    fn key_unchecked(&self, n: usize) -> &[u8] {
        &self.page[self.key_start(n).unwrap()..self.key_end(n).unwrap()]
    }

    // Zero-copy accessor for pair n, or None when n is out of bounds
    pub(crate) fn entry(&self, n: usize) -> Option<EntryAccessor<'a>> {
        let key = &self.page[self.key_start(n)?..self.key_end(n)?];
        let value = &self.page[self.value_start(n)?..self.value_end(n)?];
        Some(EntryAccessor::new(key, value))
    }

    // Byte ranges of pair n's key and value within the page
    pub(crate) fn entry_ranges(&self, n: usize) -> Option<(Range<usize>, Range<usize>)> {
        let key = self.key_start(n)?..self.key_end(n)?;
        let value = self.value_start(n)?..self.value_end(n)?;
        Some((key, value))
    }

    // Accessor for the final pair; panics on an empty leaf
    pub(super) fn last_entry(&self) -> EntryAccessor<'a> {
        self.entry(self.num_pairs() - 1).unwrap()
    }
}
500
// Accumulates key/value pairs and then serializes them into one (build) or
// two (build_split) newly-allocated leaf pages
pub(super) struct LeafBuilder<'a, 'b> {
    // Pairs accumulated so far, in insertion order
    pairs: Vec<(&'a [u8], &'a [u8])>,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    // Running byte totals, used for page sizing and split decisions
    total_key_bytes: usize,
    total_value_bytes: usize,
    mem: &'b TransactionalMemory,
}
509
impl<'a, 'b> LeafBuilder<'a, 'b> {
    // Bytes a leaf page needs to hold `num_pairs` pairs whose keys and values
    // total `keys_values_bytes`
    pub(super) fn required_bytes(&self, num_pairs: usize, keys_values_bytes: usize) -> usize {
        RawLeafBuilder::required_bytes(
            num_pairs,
            keys_values_bytes,
            self.fixed_key_size,
            self.fixed_value_size,
        )
    }

    pub(super) fn new(
        mem: &'b TransactionalMemory,
        capacity: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        Self {
            pairs: Vec::with_capacity(capacity),
            fixed_key_size,
            fixed_value_size,
            total_key_bytes: 0,
            total_value_bytes: 0,
            mem,
        }
    }

    // Queues a pair and updates the running byte totals
    pub(super) fn push(&mut self, key: &'a [u8], value: &'a [u8]) {
        self.total_key_bytes += key.len();
        self.total_value_bytes += value.len();
        self.pairs.push((key, value));
    }

    // Copies every pair from an existing leaf, optionally skipping the pair
    // at index `except`
    pub(super) fn push_all_except(
        &mut self,
        accessor: &'a LeafAccessor<'_>,
        except: Option<usize>,
    ) {
        for i in 0..accessor.num_pairs() {
            if let Some(except) = except {
                if except == i {
                    continue;
                }
            }
            let entry = accessor.entry(i).unwrap();
            self.push(entry.key(), entry.value());
        }
    }

    // True when the accumulated pairs exceed one page AND there is more than
    // one pair (a single pair cannot be split)
    pub(super) fn should_split(&self) -> bool {
        let required_size = self.required_bytes(
            self.pairs.len(),
            self.total_key_bytes + self.total_value_bytes,
        );
        required_size > self.mem.get_page_size() && self.pairs.len() > 1
    }

    // Serializes the pairs into two new pages, splitting at roughly half the
    // total key+value bytes. Returns (first page, last key of first page,
    // second page).
    pub(super) fn build_split(self) -> Result<(PageMut, &'a [u8], PageMut)> {
        let total_size = self.total_key_bytes + self.total_value_bytes;
        // Choose the split point: take pairs until at least half the bytes
        // are consumed, but always leave at least one pair for the second page
        let mut division = 0;
        let mut first_split_key_bytes = 0;
        let mut first_split_value_bytes = 0;
        for (key, value) in self.pairs.iter().take(self.pairs.len() - 1) {
            first_split_key_bytes += key.len();
            first_split_value_bytes += value.len();
            division += 1;
            if first_split_key_bytes + first_split_value_bytes >= total_size / 2 {
                break;
            }
        }

        // First page: pairs [0, division)
        let required_size =
            self.required_bytes(division, first_split_key_bytes + first_split_value_bytes);
        let mut page1 = self.mem.allocate(required_size)?;
        let mut builder = RawLeafBuilder::new(
            page1.memory_mut(),
            division,
            self.fixed_key_size,
            self.fixed_value_size,
            first_split_key_bytes,
        );
        for (key, value) in self.pairs.iter().take(division) {
            builder.append(key, value);
        }
        // Dropping the builder runs its completeness assertions
        drop(builder);

        // Second page: pairs [division, len)
        let required_size = self.required_bytes(
            self.pairs.len() - division,
            self.total_key_bytes + self.total_value_bytes
                - first_split_key_bytes
                - first_split_value_bytes,
        );
        let mut page2 = self.mem.allocate(required_size)?;
        let mut builder = RawLeafBuilder::new(
            page2.memory_mut(),
            self.pairs.len() - division,
            self.fixed_key_size,
            self.fixed_value_size,
            self.total_key_bytes - first_split_key_bytes,
        );
        for (key, value) in &self.pairs[division..] {
            builder.append(key, value);
        }
        drop(builder);

        Ok((page1, self.pairs[division - 1].0, page2))
    }

    // Serializes all accumulated pairs into a single newly-allocated page
    pub(super) fn build(self) -> Result<PageMut> {
        let required_size = self.required_bytes(
            self.pairs.len(),
            self.total_key_bytes + self.total_value_bytes,
        );
        let mut page = self.mem.allocate(required_size)?;
        let mut builder = RawLeafBuilder::new(
            page.memory_mut(),
            self.pairs.len(),
            self.fixed_key_size,
            self.fixed_value_size,
            self.total_key_bytes,
        );
        for (key, value) in self.pairs {
            builder.append(key, value);
        }
        drop(builder);
        Ok(page)
    }
}
637
638// Note the caller is responsible for ensuring that the buffer is large enough
639// and rewriting all fields if any dynamically sized fields are written
640// Layout is:
641// 1 byte: type
642// 1 byte: reserved (padding to 32bits aligned)
643// 2 bytes: num_entries (number of pairs)
644// (optional) repeating (num_entries times):
645// 4 bytes: key_end
646// (optional) repeating (num_entries times):
647// 4 bytes: value_end
648// repeating (num_entries times):
649// * n bytes: key data
650// repeating (num_entries times):
651// * n bytes: value data
pub(crate) struct RawLeafBuilder<'a> {
    // Destination buffer; must be at least required_bytes() long
    page: &'a mut [u8],
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    // Number of pairs the caller promised to append
    num_pairs: usize,
    // Total bytes reserved for key data; value data begins right after
    provisioned_key_bytes: usize,
    pairs_written: usize, // used for debugging
}
660
impl<'a> RawLeafBuilder<'a> {
    // Total bytes needed for a leaf with `num_pairs` pairs whose keys and
    // values total `keys_values_bytes`. Fixed-width keys/values need no
    // per-entry offset tables.
    pub(crate) fn required_bytes(
        num_pairs: usize,
        keys_values_bytes: usize,
        key_size: Option<usize>,
        value_size: Option<usize>,
    ) -> usize {
        // Page id & header;
        let mut result = 4;
        // key & value lengths
        if key_size.is_none() {
            result += num_pairs * size_of::<u32>();
        }
        if value_size.is_none() {
            result += num_pairs * size_of::<u32>();
        }
        result += keys_values_bytes;

        result
    }

    // Begins writing a leaf into `page`. `key_bytes` is the total size of all
    // keys that will be appended; the caller must append exactly `num_pairs`
    // pairs before dropping the builder (enforced by Drop in non-panicking
    // paths).
    pub(crate) fn new(
        page: &'a mut [u8],
        num_pairs: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
        key_bytes: usize,
    ) -> Self {
        page[0] = LEAF;
        page[2..4].copy_from_slice(&u16::try_from(num_pairs).unwrap().to_le_bytes());
        #[cfg(debug_assertions)]
        {
            // Poison all the key & value offsets, in case the caller forgets to write them
            let mut last = 4;
            if fixed_key_size.is_none() {
                last += size_of::<u32>() * num_pairs;
            }
            if fixed_value_size.is_none() {
                last += size_of::<u32>() * num_pairs;
            }
            for x in &mut page[4..last] {
                *x = 0xFF;
            }
        }
        RawLeafBuilder {
            page,
            fixed_key_size,
            fixed_value_size,
            num_pairs,
            provisioned_key_bytes: key_bytes,
            pairs_written: 0,
        }
    }

    // End offset (exclusive) of value n: computed for fixed-width values,
    // otherwise read back from the value-end offset table
    fn value_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_value_size {
            return self.key_section_start() + self.provisioned_key_bytes + fixed * (n + 1);
        }
        // The value-end table follows the key-end table (when the latter exists)
        let mut offset = 4 + size_of::<u32>() * n;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    // Offset where key data begins: the 4-byte header plus any offset tables
    fn key_section_start(&self) -> usize {
        let mut offset = 4;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        if self.fixed_value_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }

        offset
    }

    // End offset (exclusive) of key n: computed for fixed-width keys,
    // otherwise read back from the key-end offset table
    fn key_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_key_size {
            return self.key_section_start() + fixed * (n + 1);
        }
        let offset = 4 + size_of::<u32>() * n;
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    // Writes the next pair: records its end offsets (for variable-width
    // entries) and copies the key and value bytes into place
    pub(crate) fn append(&mut self, key: &[u8], value: &[u8]) {
        if let Some(key_width) = self.fixed_key_size {
            assert_eq!(key_width, key.len());
        }
        if let Some(value_width) = self.fixed_value_size {
            assert_eq!(value_width, value.len());
        }
        // The next key starts where the previous one ended
        let key_offset = if self.pairs_written == 0 {
            self.key_section_start()
        } else {
            self.key_end(self.pairs_written - 1)
        };
        // Value data begins after the full provisioned key region
        let value_offset = if self.pairs_written == 0 {
            self.key_section_start() + self.provisioned_key_bytes
        } else {
            self.value_end(self.pairs_written - 1)
        };

        let n = self.pairs_written;
        if self.fixed_key_size.is_none() {
            // Record this key's absolute end offset in the key-end table
            let offset = 4 + size_of::<u32>() * n;
            self.page[offset..(offset + size_of::<u32>())]
                .copy_from_slice(&u32::try_from(key_offset + key.len()).unwrap().to_le_bytes());
        }
        self.page[key_offset..(key_offset + key.len())].copy_from_slice(key);
        // Keys must not overflow the region provisioned for them
        let written_key_len = key_offset + key.len() - self.key_section_start();
        assert!(written_key_len <= self.provisioned_key_bytes);

        if self.fixed_value_size.is_none() {
            // Record this value's absolute end offset in the value-end table
            let mut offset = 4 + size_of::<u32>() * n;
            if self.fixed_key_size.is_none() {
                offset += size_of::<u32>() * self.num_pairs;
            }
            self.page[offset..(offset + size_of::<u32>())].copy_from_slice(
                &u32::try_from(value_offset + value.len())
                    .unwrap()
                    .to_le_bytes(),
            );
        }
        self.page[value_offset..(value_offset + value.len())].copy_from_slice(value);
        self.pairs_written += 1;
    }
}
797
798impl<'a> Drop for RawLeafBuilder<'a> {
799    fn drop(&mut self) {
800        if !thread::panicking() {
801            assert_eq!(self.pairs_written, self.num_pairs);
802            assert_eq!(
803                self.key_section_start() + self.provisioned_key_bytes,
804                self.key_end(self.num_pairs - 1)
805            );
806        }
807    }
808}
809
// In-place editor for an existing leaf page
pub(crate) struct LeafMutator<'b> {
    page: &'b mut PageMut,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
}
815
816impl<'b> LeafMutator<'b> {
    // Wraps a mutable leaf page; panics if byte 0 is not the LEAF type marker
    pub(crate) fn new(
        page: &'b mut PageMut,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        assert_eq!(page.memory_mut()[0], LEAF);
        Self {
            page,
            fixed_key_size,
            fixed_value_size,
        }
    }
829
    // Whether `new_key`/`new_value` can be inserted (or, with `overwrite`,
    // substituted at `position`) without growing the page
    pub(super) fn sufficient_insert_inplace_space(
        page: &'_ PageImpl,
        position: usize,
        overwrite: bool,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
        new_key: &[u8],
        new_value: &[u8],
    ) -> bool {
        let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
        if overwrite {
            // Overwrite: only the size difference versus the replaced pair matters
            let remaining = page.memory().len() - accessor.total_length();
            let required_delta = isize::try_from(new_key.len() + new_value.len()).unwrap()
                - isize::try_from(accessor.length_of_pairs(position, position + 1)).unwrap();
            required_delta <= isize::try_from(remaining).unwrap()
        } else {
            // If this is a large page, only allow in-place appending to avoid write amplification
            //
            // Note: this check is also required to avoid inserting an unbounded number of small values
            // into a large page, which could result in overflowing a u16 which is the maximum number of entries per leaf
            if page.get_page_number().page_order > 0 && position < accessor.num_pairs() {
                return false;
            }
            let remaining = page.memory().len() - accessor.total_length();
            // A new pair also needs offset-table slots for variable-width keys/values
            let mut required_delta = new_key.len() + new_value.len();
            if fixed_key_size.is_none() {
                required_delta += size_of::<u32>();
            }
            if fixed_value_size.is_none() {
                required_delta += size_of::<u32>();
            }
            required_delta <= remaining
        }
    }
864
865    // Insert the given key, value pair at index i and shift all following pairs to the right
866    pub(crate) fn insert(&mut self, i: usize, overwrite: bool, key: &[u8], value: &[u8]) {
867        let accessor = LeafAccessor::new(
868            self.page.memory(),
869            self.fixed_key_size,
870            self.fixed_value_size,
871        );
872        let required_delta = if overwrite {
873            isize::try_from(key.len() + value.len()).unwrap()
874                - isize::try_from(accessor.length_of_pairs(i, i + 1)).unwrap()
875        } else {
876            let mut delta = key.len() + value.len();
877            if self.fixed_key_size.is_none() {
878                delta += size_of::<u32>();
879            }
880            if self.fixed_value_size.is_none() {
881                delta += size_of::<u32>();
882            }
883            delta.try_into().unwrap()
884        };
885        assert!(
886            isize::try_from(accessor.total_length()).unwrap() + required_delta
887                <= isize::try_from(self.page.memory().len()).unwrap()
888        );
889
890        let num_pairs = accessor.num_pairs();
891        let last_key_end = accessor.key_end(accessor.num_pairs() - 1).unwrap();
892        let last_value_end = accessor.value_end(accessor.num_pairs() - 1).unwrap();
893        let shift_index = if overwrite { i + 1 } else { i };
894        let shift_key_start = accessor.key_start(shift_index).unwrap_or(last_key_end);
895        let shift_value_start = accessor.value_start(shift_index).unwrap_or(last_value_end);
896        let existing_value_len = accessor
897            .value_range(i)
898            .map(|(start, end)| end - start)
899            .unwrap_or_default();
900
901        let value_delta = if overwrite {
902            isize::try_from(value.len()).unwrap() - isize::try_from(existing_value_len).unwrap()
903        } else {
904            value.len().try_into().unwrap()
905        };
906
907        // Update all the pointers
908        let key_ptr_size: usize = if self.fixed_key_size.is_none() { 4 } else { 0 };
909        let value_ptr_size: usize = if self.fixed_value_size.is_none() {
910            4
911        } else {
912            0
913        };
914        if !overwrite {
915            for j in 0..i {
916                self.update_key_end(j, (key_ptr_size + value_ptr_size).try_into().unwrap());
917                let value_delta: isize = (key_ptr_size + value_ptr_size + key.len())
918                    .try_into()
919                    .unwrap();
920                self.update_value_end(j, value_delta);
921            }
922        }
923        for j in i..num_pairs {
924            if overwrite {
925                self.update_value_end(j, value_delta);
926            } else {
927                let key_delta: isize = (key_ptr_size + value_ptr_size + key.len())
928                    .try_into()
929                    .unwrap();
930                self.update_key_end(j, key_delta);
931                let value_delta = key_delta + isize::try_from(value.len()).unwrap();
932                self.update_value_end(j, value_delta);
933            }
934        }
935
936        let new_num_pairs = if overwrite { num_pairs } else { num_pairs + 1 };
937        self.page.memory_mut()[2..4]
938            .copy_from_slice(&u16::try_from(new_num_pairs).unwrap().to_le_bytes());
939
940        // Right shift the trailing values
941        let mut dest = if overwrite {
942            (isize::try_from(shift_value_start).unwrap() + value_delta)
943                .try_into()
944                .unwrap()
945        } else {
946            shift_value_start + key_ptr_size + value_ptr_size + key.len() + value.len()
947        };
948        let start = shift_value_start;
949        let end = last_value_end;
950        self.page.memory_mut().copy_within(start..end, dest);
951
952        // Insert the value
953        let inserted_value_end: u32 = dest.try_into().unwrap();
954        dest -= value.len();
955        self.page.memory_mut()[dest..(dest + value.len())].copy_from_slice(value);
956
957        if !overwrite {
958            // Right shift the trailing key data & preceding value data
959            let start = shift_key_start;
960            let end = shift_value_start;
961            dest -= end - start;
962            self.page.memory_mut().copy_within(start..end, dest);
963
964            // Insert the key
965            let inserted_key_end: u32 = dest.try_into().unwrap();
966            dest -= key.len();
967            self.page.memory_mut()[dest..(dest + key.len())].copy_from_slice(key);
968
969            // Right shift the trailing value pointers & preceding key data
970            let start = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
971            let end = shift_key_start;
972            dest -= end - start;
973            debug_assert_eq!(
974                dest,
975                4 + key_ptr_size * new_num_pairs + value_ptr_size * (i + 1)
976            );
977            self.page.memory_mut().copy_within(start..end, dest);
978
979            // Insert the value pointer
980            if self.fixed_value_size.is_none() {
981                dest -= size_of::<u32>();
982                self.page.memory_mut()[dest..(dest + size_of::<u32>())]
983                    .copy_from_slice(&inserted_value_end.to_le_bytes());
984            }
985
986            // Right shift the trailing key pointers & preceding value pointers
987            let start = 4 + key_ptr_size * i;
988            let end = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
989            dest -= end - start;
990            debug_assert_eq!(dest, 4 + key_ptr_size * (i + 1));
991            self.page.memory_mut().copy_within(start..end, dest);
992
993            // Insert the key pointer
994            if self.fixed_key_size.is_none() {
995                dest -= size_of::<u32>();
996                self.page.memory_mut()[dest..(dest + size_of::<u32>())]
997                    .copy_from_slice(&inserted_key_end.to_le_bytes());
998            }
999            debug_assert_eq!(dest, 4 + key_ptr_size * i);
1000        }
1001    }
1002
    /// Removes the `i`th key/value pair from this leaf page, in place.
    ///
    /// All entries after `i` are shifted left, and (when key/value sizes are
    /// dynamic) every stored end-offset is rewritten to account for the removed
    /// pointer slots and data bytes.
    ///
    /// # Panics
    /// Panics if `i >= num_pairs`, or if the page holds only one pair — a leaf
    /// is never emptied in place; the caller must handle that case instead.
    pub(super) fn remove(&mut self, i: usize) {
        let accessor = LeafAccessor::new(
            self.page.memory(),
            self.fixed_key_size,
            self.fixed_value_size,
        );
        let num_pairs = accessor.num_pairs();
        assert!(i < num_pairs);
        assert!(num_pairs > 1);
        // Byte ranges of the entry being removed, plus the end of the whole
        // value-data region (everything up to last_value_end may need to move).
        let key_start = accessor.key_start(i).unwrap();
        let key_end = accessor.key_end(i).unwrap();
        let value_start = accessor.value_start(i).unwrap();
        let value_end = accessor.value_end(i).unwrap();
        let last_value_end = accessor.value_end(accessor.num_pairs() - 1).unwrap();

        // Update all the pointers.
        // Offset slots are only stored for dynamically sized keys/values; with
        // a fixed size the offsets are computed, so the slot width is zero.
        let key_ptr_size = if self.fixed_key_size.is_none() {
            size_of::<u32>()
        } else {
            0
        };
        let value_ptr_size = if self.fixed_value_size.is_none() {
            size_of::<u32>()
        } else {
            0
        };
        // Entries before `i`: the pointer tables in front of their data shrink
        // by one key slot + one value slot, and their value data additionally
        // moves left by the removed key's length (key data precedes value data).
        for j in 0..i {
            self.update_key_end(j, -isize::try_from(key_ptr_size + value_ptr_size).unwrap());
            let value_delta = -isize::try_from(key_ptr_size + value_ptr_size).unwrap()
                - isize::try_from(key_end - key_start).unwrap();
            self.update_value_end(j, value_delta);
        }
        // Entries after `i`: additionally shift by the removed key bytes (their
        // keys) and by the removed key + value bytes (their values).
        for j in (i + 1)..num_pairs {
            let key_delta = -isize::try_from(key_ptr_size + value_ptr_size).unwrap()
                - isize::try_from(key_end - key_start).unwrap();
            self.update_key_end(j, key_delta);
            let value_delta = key_delta - isize::try_from(value_end - value_start).unwrap();
            self.update_value_end(j, value_delta);
        }

        // Left shift all the pointers & data

        let new_num_pairs = num_pairs - 1;
        // Rewrite the pair count in the page header (bytes 2..4, little-endian u16).
        self.page.memory_mut()[2..4]
            .copy_from_slice(&u16::try_from(new_num_pairs).unwrap().to_le_bytes());
        // Left shift the trailing key pointers & preceding value pointers
        let mut dest = 4 + key_ptr_size * i;
        // First trailing key pointer
        let start = 4 + key_ptr_size * (i + 1);
        // Last preceding value pointer
        let end = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
        self.page.memory_mut().copy_within(start..end, dest);
        dest += end - start;
        debug_assert_eq!(dest, 4 + key_ptr_size * new_num_pairs + value_ptr_size * i);

        // Left shift the trailing value pointers & preceding key data
        let start = 4 + key_ptr_size * num_pairs + value_ptr_size * (i + 1);
        let end = key_start;
        self.page.memory_mut().copy_within(start..end, dest);
        dest += end - start;

        let preceding_key_len = key_start - (4 + (key_ptr_size + value_ptr_size) * num_pairs);
        debug_assert_eq!(
            dest,
            4 + (key_ptr_size + value_ptr_size) * new_num_pairs + preceding_key_len
        );

        // Left shift the trailing key data & preceding value data
        let start = key_end;
        let end = value_start;
        self.page.memory_mut().copy_within(start..end, dest);
        dest += end - start;

        // Left shift the trailing value data
        let preceding_data_len =
            value_start - (4 + (key_ptr_size + value_ptr_size) * num_pairs) - (key_end - key_start);
        debug_assert_eq!(
            dest,
            4 + (key_ptr_size + value_ptr_size) * new_num_pairs + preceding_data_len
        );
        let start = value_end;
        let end = last_value_end;
        self.page.memory_mut().copy_within(start..end, dest);
    }
1087
1088    fn update_key_end(&mut self, i: usize, delta: isize) {
1089        if self.fixed_key_size.is_some() {
1090            return;
1091        }
1092        let offset = 4 + size_of::<u32>() * i;
1093        let mut ptr = u32::from_le_bytes(
1094            self.page.memory()[offset..(offset + size_of::<u32>())]
1095                .try_into()
1096                .unwrap(),
1097        );
1098        ptr = (isize::try_from(ptr).unwrap() + delta).try_into().unwrap();
1099        self.page.memory_mut()[offset..(offset + size_of::<u32>())]
1100            .copy_from_slice(&ptr.to_le_bytes());
1101    }
1102
1103    fn update_value_end(&mut self, i: usize, delta: isize) {
1104        if self.fixed_value_size.is_some() {
1105            return;
1106        }
1107        let accessor = LeafAccessor::new(
1108            self.page.memory(),
1109            self.fixed_key_size,
1110            self.fixed_value_size,
1111        );
1112        let num_pairs = accessor.num_pairs();
1113        let mut offset = 4 + size_of::<u32>() * i;
1114        if self.fixed_key_size.is_none() {
1115            offset += size_of::<u32>() * num_pairs;
1116        }
1117        let mut ptr = u32::from_le_bytes(
1118            self.page.memory()[offset..(offset + size_of::<u32>())]
1119                .try_into()
1120                .unwrap(),
1121        );
1122        ptr = (isize::try_from(ptr).unwrap() + delta).try_into().unwrap();
1123        self.page.memory_mut()[offset..(offset + size_of::<u32>())]
1124            .copy_from_slice(&ptr.to_le_bytes());
1125    }
1126}
1127
// Provides a simple zero-copy way to access a branch page
// TODO: this should be pub(super) and the multimap btree stuff should be moved into this package
pub(crate) struct BranchAccessor<'a: 'b, 'b, T: Page + 'a> {
    // Borrowed page whose raw bytes are interpreted as a branch node
    page: &'b T,
    // Number of keys, cached from the page header (bytes 2..4) at construction
    num_keys: usize,
    // Some(n) when every key is exactly n bytes (then no key-end table is stored)
    fixed_key_size: Option<usize>,
    _page_lifetime: PhantomData<&'a ()>,
}
1136
impl<'a: 'b, 'b, T: Page + 'a> BranchAccessor<'a, 'b, T> {
    /// Wraps `page` as a branch node, caching the key count from the header.
    ///
    /// Debug-asserts that the page's type byte actually marks it as a BRANCH.
    pub(crate) fn new(page: &'b T, fixed_key_size: Option<usize>) -> Self {
        debug_assert_eq!(page.memory()[0], BRANCH);
        let num_keys = u16::from_le_bytes(page.memory()[2..4].try_into().unwrap()) as usize;
        BranchAccessor {
            page,
            num_keys,
            fixed_key_size,
            _page_lifetime: Default::default(),
        }
    }

    /// Debug helper: prints this node's children and keys to stderr.
    pub(super) fn print_node<K: Key>(&self) {
        eprint!(
            "Internal[ (page={:?}), child_0={:?}",
            self.page.get_page_number(),
            self.child_page(0).unwrap()
        );
        for i in 0..(self.count_children() - 1) {
            if let Some(child) = self.child_page(i + 1) {
                let key = self.key(i).unwrap();
                eprint!(" key_{i}={:?}", K::from_bytes(key));
                eprint!(" child_{}={child:?}", i + 1);
            }
        }
        eprint!("]");
    }

    /// Total number of bytes used by this node, i.e. the end of the last key.
    pub(crate) fn total_length(&self) -> usize {
        // Keys are stored at the end
        self.key_end(self.num_keys() - 1).unwrap()
    }

    /// Binary-searches for the child subtree that could contain `query`.
    ///
    /// Returns the child index together with its page number. On an exact key
    /// match, returns the child at the matching key's index.
    pub(super) fn child_for_key<K: Key>(&self, query: &[u8]) -> (usize, PageNumber) {
        let mut min_child = 0; // inclusive
        let mut max_child = self.num_keys(); // inclusive
        while min_child < max_child {
            let mid = (min_child + max_child) / 2;
            match K::compare(query, self.key(mid).unwrap()) {
                Ordering::Less => {
                    max_child = mid;
                }
                Ordering::Equal => {
                    return (mid, self.child_page(mid).unwrap());
                }
                Ordering::Greater => {
                    min_child = mid + 1;
                }
            }
        }
        debug_assert_eq!(min_child, max_child);

        (min_child, self.child_page(min_child).unwrap())
    }

    /// Byte offset where key data begins: after the 8-byte header, the
    /// checksum and page-number tables, and (for variable-size keys) the
    /// key-end offset table.
    fn key_section_start(&self) -> usize {
        if self.fixed_key_size.is_none() {
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
                + size_of::<u32>() * self.num_keys()
        } else {
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
        }
    }

    /// Starting byte offset of the `n`th key (the previous key's end, or the
    /// start of the key section for n == 0).
    fn key_offset(&self, n: usize) -> usize {
        if n == 0 {
            self.key_section_start()
        } else {
            self.key_end(n - 1).unwrap()
        }
    }

    /// Exclusive end offset of the `n`th key.
    ///
    /// For fixed-size keys this is computed directly. For variable-size keys
    /// it is read from the stored offset table; returns `None` if the table
    /// slot lies beyond the page's memory (e.g. a truncated/corrupted page).
    fn key_end(&self, n: usize) -> Option<usize> {
        if let Some(fixed) = self.fixed_key_size {
            return Some(self.key_section_start() + fixed * (n + 1));
        }
        let offset = 8
            + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
            + size_of::<u32>() * n;
        Some(u32::from_le_bytes(
            self.page
                .memory()
                .get(offset..(offset + size_of::<u32>()))?
                .try_into()
                .unwrap(),
        ) as usize)
    }

    /// Returns the raw bytes of the `n`th key, or `None` if `n` is out of
    /// range (or the page's offset table is unreadable).
    pub(super) fn key(&self, n: usize) -> Option<&[u8]> {
        if n >= self.num_keys() {
            return None;
        }
        let offset = self.key_offset(n);
        let end = self.key_end(n)?;
        Some(&self.page.memory()[offset..end])
    }

    /// Number of child pages: always one more than the number of keys.
    pub(crate) fn count_children(&self) -> usize {
        self.num_keys() + 1
    }

    /// Checksum of the `n`th child, read from the checksum table that starts
    /// at byte 8. `None` if `n` is out of range.
    pub(crate) fn child_checksum(&self, n: usize) -> Option<Checksum> {
        if n >= self.count_children() {
            return None;
        }

        let offset = 8 + size_of::<Checksum>() * n;
        Some(Checksum::from_le_bytes(
            self.page.memory()[offset..(offset + size_of::<Checksum>())]
                .try_into()
                .unwrap(),
        ))
    }

    /// Page number of the `n`th child, read from the page-number table that
    /// follows the checksum table. `None` if `n` is out of range.
    pub(crate) fn child_page(&self, n: usize) -> Option<PageNumber> {
        if n >= self.count_children() {
            return None;
        }

        let offset =
            8 + size_of::<Checksum>() * self.count_children() + PageNumber::serialized_size() * n;
        Some(PageNumber::from_le_bytes(
            self.page.memory()[offset..(offset + PageNumber::serialized_size())]
                .try_into()
                .unwrap(),
        ))
    }

    /// Number of keys stored in this branch node.
    fn num_keys(&self) -> usize {
        self.num_keys
    }
}
1269
// Accumulates children and keys for a new branch node, then serializes them
// into freshly allocated page(s) via RawBranchBuilder.
pub(super) struct BranchBuilder<'a, 'b> {
    // Child (page, checksum) pairs; always one more entry than `keys` at build time
    children: Vec<(PageNumber, Checksum)>,
    // Borrowed key bytes separating the children
    keys: Vec<&'a [u8]>,
    // Running sum of all key lengths, used for page-size calculations
    total_key_bytes: usize,
    // Some(n) when every key is exactly n bytes (then no key-end table is written)
    fixed_key_size: Option<usize>,
    // Allocator for the destination page(s)
    mem: &'b TransactionalMemory,
}
1277
impl<'a, 'b> BranchBuilder<'a, 'b> {
    /// Creates a builder sized for up to `child_capacity` children.
    ///
    /// NOTE(review): `child_capacity` must be at least 1, since the key
    /// capacity is computed as `child_capacity - 1` (underflows otherwise).
    pub(super) fn new(
        mem: &'b TransactionalMemory,
        child_capacity: usize,
        fixed_key_size: Option<usize>,
    ) -> Self {
        Self {
            children: Vec::with_capacity(child_capacity),
            keys: Vec::with_capacity(child_capacity - 1),
            total_key_bytes: 0,
            fixed_key_size,
            mem,
        }
    }

    /// Replaces an already-pushed child at `index` with a new page/checksum.
    pub(super) fn replace_child(&mut self, index: usize, child: PageNumber, checksum: Checksum) {
        self.children[index] = (child, checksum);
    }

    /// Appends a child page. Children must be pushed in key order.
    pub(super) fn push_child(&mut self, child: PageNumber, checksum: Checksum) {
        self.children.push((child, checksum));
    }

    /// Appends a separator key and tracks its length for size accounting.
    pub(super) fn push_key(&mut self, key: &'a [u8]) {
        self.keys.push(key);
        self.total_key_bytes += key.len();
    }

    /// Copies every child and key from an existing branch node into this builder.
    pub(super) fn push_all<T: Page>(&mut self, accessor: &'a BranchAccessor<'_, '_, T>) {
        for i in 0..accessor.count_children() {
            let child = accessor.child_page(i).unwrap();
            let checksum = accessor.child_checksum(i).unwrap();
            self.push_child(child, checksum);
        }
        for i in 0..(accessor.count_children() - 1) {
            self.push_key(accessor.key(i).unwrap());
        }
    }

    /// If exactly one child has been pushed, returns it (the branch would be
    /// redundant); `None` when there are multiple children.
    ///
    /// NOTE(review): panics if no children were pushed — callers appear to
    /// guarantee at least one child; confirm at call sites.
    pub(super) fn to_single_child(&self) -> Option<(PageNumber, Checksum)> {
        if self.children.len() > 1 {
            None
        } else {
            Some(self.children[0])
        }
    }

    /// Allocates a page and serializes all accumulated children/keys into it.
    ///
    /// # Errors
    /// Propagates allocation failures from `TransactionalMemory::allocate`.
    pub(super) fn build(self) -> Result<PageMut> {
        assert_eq!(self.children.len(), self.keys.len() + 1);
        let size = RawBranchBuilder::required_bytes(
            self.keys.len(),
            self.total_key_bytes,
            self.fixed_key_size,
        );
        let mut page = self.mem.allocate(size)?;
        let mut builder = RawBranchBuilder::new(&mut page, self.keys.len(), self.fixed_key_size);
        builder.write_first_page(self.children[0].0, self.children[0].1);
        for i in 1..self.children.len() {
            // keys[i - 1] separates children[i - 1] and children[i]
            let key = &self.keys[i - 1];
            builder.write_nth_key(key.as_ref(), self.children[i].0, self.children[i].1, i - 1);
        }
        drop(builder);

        Ok(page)
    }

    /// True when the accumulated node would overflow a page and there are
    /// enough keys (>= 3) to split into two valid nodes plus a promoted key.
    pub(super) fn should_split(&self) -> bool {
        let size = RawBranchBuilder::required_bytes(
            self.keys.len(),
            self.total_key_bytes,
            self.fixed_key_size,
        );
        size > self.mem.get_page_size() && self.keys.len() >= 3
    }

    /// Splits the accumulated entries into two pages around the middle key.
    ///
    /// The division key itself is written to neither page; it is returned so
    /// the caller can promote it into the parent. Page 1 receives keys
    /// `[0, division)` and children `[0, division]`; page 2 the remainder.
    ///
    /// # Errors
    /// Propagates allocation failures from `TransactionalMemory::allocate`.
    pub(super) fn build_split(self) -> Result<(PageMut, &'a [u8], PageMut)> {
        assert_eq!(self.children.len(), self.keys.len() + 1);
        assert!(self.keys.len() >= 3);
        let division = self.keys.len() / 2;
        let first_split_key_len: usize = self.keys.iter().take(division).map(|k| k.len()).sum();
        let division_key = self.keys[division];
        let second_split_key_len = self.total_key_bytes - first_split_key_len - division_key.len();

        let size =
            RawBranchBuilder::required_bytes(division, first_split_key_len, self.fixed_key_size);
        let mut page1 = self.mem.allocate(size)?;
        let mut builder = RawBranchBuilder::new(&mut page1, division, self.fixed_key_size);
        builder.write_first_page(self.children[0].0, self.children[0].1);
        for i in 0..division {
            let key = &self.keys[i];
            builder.write_nth_key(
                key.as_ref(),
                self.children[i + 1].0,
                self.children[i + 1].1,
                i,
            );
        }
        drop(builder);

        let size = RawBranchBuilder::required_bytes(
            self.keys.len() - division - 1,
            second_split_key_len,
            self.fixed_key_size,
        );
        let mut page2 = self.mem.allocate(size)?;
        let mut builder = RawBranchBuilder::new(
            &mut page2,
            self.keys.len() - division - 1,
            self.fixed_key_size,
        );
        builder.write_first_page(self.children[division + 1].0, self.children[division + 1].1);
        for i in (division + 1)..self.keys.len() {
            let key = &self.keys[i];
            builder.write_nth_key(
                key.as_ref(),
                self.children[i + 1].0,
                self.children[i + 1].1,
                i - division - 1,
            );
        }
        drop(builder);

        Ok((page1, division_key, page2))
    }
}
1403
// Note the caller is responsible for ensuring that the buffer is large enough
// and rewriting all fields if any dynamically sized fields are written
// Layout is:
// 1 byte: type
// 1 byte: padding (to 16-bit alignment)
// 2 bytes: num_keys (number of keys)
// 4 bytes: padding (to 64-bit alignment)
// repeating (num_keys + 1 times):
// 16 bytes: child page checksum
// repeating (num_keys + 1 times):
// 8 bytes: page number
// (optional, only for dynamically sized keys) repeating (num_keys times):
// * 4 bytes: key end. Ending offset of the key, exclusive
// repeating (num_keys times):
// * n bytes: key data
pub(super) struct RawBranchBuilder<'b> {
    // Destination page, written in place
    page: &'b mut PageMut,
    // Some(n) when every key is exactly n bytes (then no key-end table is written)
    fixed_key_size: Option<usize>,
    // Total number of keys this node will hold; fixed at construction
    num_keys: usize,
    keys_written: usize, // used for debugging
}
1425
impl<'b> RawBranchBuilder<'b> {
    /// Total bytes a branch page needs for `num_keys` keys whose combined
    /// length is `size_of_keys`: the 8-byte header, the checksum and
    /// page-number tables, the key-end table (variable-size keys only), and
    /// the key data itself.
    pub(super) fn required_bytes(
        num_keys: usize,
        size_of_keys: usize,
        fixed_key_size: Option<usize>,
    ) -> usize {
        if fixed_key_size.is_none() {
            let fixed_size = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1)
                + size_of::<u32>() * num_keys;
            size_of_keys + fixed_size
        } else {
            let fixed_size =
                8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1);
            size_of_keys + fixed_size
        }
    }

    // Caller MUST write num_keys values
    /// Initializes the page header (type byte + key count) and returns the
    /// builder. The Drop impl asserts that all `num_keys` keys were written.
    pub(super) fn new(
        page: &'b mut PageMut,
        num_keys: usize,
        fixed_key_size: Option<usize>,
    ) -> Self {
        assert!(num_keys > 0);
        page.memory_mut()[0] = BRANCH;
        page.memory_mut()[2..4].copy_from_slice(&u16::try_from(num_keys).unwrap().to_le_bytes());
        #[cfg(debug_assertions)]
        {
            // Poison all the child pointers & key offsets, in case the caller forgets to write them
            let start = 8 + size_of::<Checksum>() * (num_keys + 1);
            let last = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1)
                + size_of::<u32>() * num_keys;
            for x in &mut page.memory_mut()[start..last] {
                *x = 0xFF;
            }
        }
        RawBranchBuilder {
            page,
            fixed_key_size,
            num_keys,
            keys_written: 0,
        }
    }

    /// Writes the leftmost child: its checksum into slot 0 of the checksum
    /// table and its page number into slot 0 of the page-number table.
    pub(super) fn write_first_page(&mut self, page_number: PageNumber, checksum: Checksum) {
        let offset = 8;
        self.page.memory_mut()[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        let offset = 8 + size_of::<Checksum>() * (self.num_keys + 1);
        self.page.memory_mut()[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());
    }

    /// Byte offset where key data begins (after the fixed tables, and after
    /// the key-end table when keys are dynamically sized).
    fn key_section_start(&self) -> usize {
        let mut offset =
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1);
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_keys;
        }

        offset
    }

    /// Exclusive end offset of the `n`th key. For variable-size keys this
    /// reads back the offset previously stored by `write_nth_key`, so key `n`
    /// must already have been written.
    fn key_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_key_size {
            return self.key_section_start() + fixed * (n + 1);
        }
        let offset = 8
            + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1)
            + size_of::<u32>() * n;
        u32::from_le_bytes(
            self.page.memory()[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    // Write the nth key and page of values greater than this key, but less than or equal to the next
    // Caller must write keys & pages in increasing order
    pub(super) fn write_nth_key(
        &mut self,
        key: &[u8],
        page_number: PageNumber,
        checksum: Checksum,
        n: usize,
    ) {
        assert!(n < self.num_keys);
        // Keys must be written strictly in order, exactly once each
        assert_eq!(n, self.keys_written);
        self.keys_written += 1;
        // Child n+1 sits to the right of key n, hence the +1 slot indices
        let offset = 8 + size_of::<Checksum>() * (n + 1);
        self.page.memory_mut()[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        let offset = 8
            + size_of::<Checksum>() * (self.num_keys + 1)
            + PageNumber::serialized_size() * (n + 1);
        self.page.memory_mut()[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());

        // Key data goes immediately after the previous key (or at the start
        // of the key section for the first key)
        let data_offset = if n > 0 {
            self.key_end(n - 1)
        } else {
            self.key_section_start()
        };
        if self.fixed_key_size.is_none() {
            // Record this key's exclusive end offset in the key-end table
            let offset = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1)
                + size_of::<u32>() * n;
            self.page.memory_mut()[offset..(offset + size_of::<u32>())].copy_from_slice(
                &u32::try_from(data_offset + key.len())
                    .unwrap()
                    .to_le_bytes(),
            );
        }

        debug_assert!(data_offset > offset);
        self.page.memory_mut()[data_offset..(data_offset + key.len())].copy_from_slice(key);
    }
}
1546
1547impl<'b> Drop for RawBranchBuilder<'b> {
1548    fn drop(&mut self) {
1549        if !thread::panicking() {
1550            assert_eq!(self.keys_written, self.num_keys);
1551        }
1552    }
1553}
1554
// In-place mutator for an existing branch page: rewrites child pointers and
// checksums without changing the page's keys or structure.
pub(crate) struct BranchMutator<'b> {
    // Mutable branch page being edited in place
    page: &'b mut PageMut,
}
1558
1559impl<'b> BranchMutator<'b> {
1560    pub(crate) fn new(page: &'b mut PageMut) -> Self {
1561        assert_eq!(page.memory()[0], BRANCH);
1562        Self { page }
1563    }
1564
1565    fn num_keys(&self) -> usize {
1566        u16::from_le_bytes(self.page.memory()[2..4].try_into().unwrap()) as usize
1567    }
1568
1569    pub(crate) fn write_child_page(
1570        &mut self,
1571        i: usize,
1572        page_number: PageNumber,
1573        checksum: Checksum,
1574    ) {
1575        debug_assert!(i <= self.num_keys());
1576        let offset = 8 + size_of::<Checksum>() * i;
1577        self.page.memory_mut()[offset..(offset + size_of::<Checksum>())]
1578            .copy_from_slice(&checksum.to_le_bytes());
1579        let offset =
1580            8 + size_of::<Checksum>() * (self.num_keys() + 1) + PageNumber::serialized_size() * i;
1581        self.page.memory_mut()[offset..(offset + PageNumber::serialized_size())]
1582            .copy_from_slice(&page_number.to_le_bytes());
1583    }
1584}