// redb/tree_store/btree_base.rs
1use crate::tree_store::PageNumber;
2use crate::tree_store::page_store::{Page, PageImpl, PageMut, TransactionalMemory, xxh3_checksum};
3use crate::types::{Key, MutInPlaceValue, Value};
4use crate::{Result, StorageError};
5use std::cmp::Ordering;
6use std::marker::PhantomData;
7use std::mem::size_of;
8use std::ops::Range;
9use std::sync::Arc;
10use std::thread;
11
// Page type discriminant stored in byte 0 of a leaf page (see RawLeafBuilder::new)
pub(crate) const LEAF: u8 = 1;
// Page type discriminant for branch pages
pub(crate) const BRANCH: u8 = 2;

pub(crate) type Checksum = u128;
// Dummy value. Final value will be computed during commit
pub(crate) const DEFERRED: Checksum = 999;
18
19pub(super) fn leaf_checksum<T: Page>(
20    page: &T,
21    fixed_key_size: Option<usize>,
22    fixed_value_size: Option<usize>,
23) -> Result<Checksum, StorageError> {
24    let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
25    let last_pair = accessor.num_pairs().checked_sub(1).ok_or_else(|| {
26        StorageError::Corrupted(format!(
27            "Leaf page {:?} corrupted. Number of pairs is zero",
28            page.get_page_number()
29        ))
30    })?;
31    let end = accessor.value_end(last_pair).ok_or_else(|| {
32        StorageError::Corrupted(format!(
33            "Leaf page {:?} corrupted. Couldn't find offset for pair {}",
34            page.get_page_number(),
35            last_pair,
36        ))
37    })?;
38    if end > page.memory().len() {
39        Err(StorageError::Corrupted(format!(
40            "Leaf page {:?} corrupted. Last offset {} beyond end of data {}",
41            page.get_page_number(),
42            end,
43            page.memory().len()
44        )))
45    } else {
46        Ok(xxh3_checksum(&page.memory()[..end]))
47    }
48}
49
50pub(super) fn branch_checksum<T: Page>(
51    page: &T,
52    fixed_key_size: Option<usize>,
53) -> Result<Checksum, StorageError> {
54    let accessor = BranchAccessor::new(page, fixed_key_size);
55    let last_key = accessor.num_keys().checked_sub(1).ok_or_else(|| {
56        StorageError::Corrupted(format!(
57            "Branch page {:?} corrupted. Number of keys is zero",
58            page.get_page_number()
59        ))
60    })?;
61    let end = accessor.key_end(last_key).ok_or_else(|| {
62        StorageError::Corrupted(format!(
63            "Branch page {:?} corrupted. Can't find offset for key {}",
64            page.get_page_number(),
65            last_key
66        ))
67    })?;
68    if end > page.memory().len() {
69        Err(StorageError::Corrupted(format!(
70            "Branch page {:?} corrupted. Last offset {} beyond end of data {}",
71            page.get_page_number(),
72            end,
73            page.memory().len()
74        )))
75    } else {
76        Ok(xxh3_checksum(&page.memory()[..end]))
77    }
78}
79
// Serializable summary of a btree: where its root lives, how to verify it,
// and how many entries it holds.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) struct BtreeHeader {
    // Page number of the tree's root page
    pub(crate) root: PageNumber,
    // Checksum associated with the root; may hold DEFERRED until computed at commit
    pub(crate) checksum: Checksum,
    // NOTE(review): presumably the number of entries in the tree — confirm at call sites
    pub(crate) length: u64,
}
86
87impl BtreeHeader {
88    pub(crate) fn new(root: PageNumber, checksum: Checksum, length: u64) -> Self {
89        Self {
90            root,
91            checksum,
92            length,
93        }
94    }
95
96    pub(crate) const fn serialized_size() -> usize {
97        PageNumber::serialized_size() + size_of::<Checksum>() + size_of::<u64>()
98    }
99
100    pub(crate) fn from_le_bytes(bytes: [u8; Self::serialized_size()]) -> Self {
101        let root =
102            PageNumber::from_le_bytes(bytes[..PageNumber::serialized_size()].try_into().unwrap());
103        let mut offset = PageNumber::serialized_size();
104        let checksum = Checksum::from_le_bytes(
105            bytes[offset..(offset + size_of::<Checksum>())]
106                .try_into()
107                .unwrap(),
108        );
109        offset += size_of::<Checksum>();
110        let length = u64::from_le_bytes(
111            bytes[offset..(offset + size_of::<u64>())]
112                .try_into()
113                .unwrap(),
114        );
115
116        Self {
117            root,
118            checksum,
119            length,
120        }
121    }
122
123    pub(crate) fn to_le_bytes(self) -> [u8; Self::serialized_size()] {
124        let mut result = [0; Self::serialized_size()];
125        result[..PageNumber::serialized_size()].copy_from_slice(&self.root.to_le_bytes());
126        result[PageNumber::serialized_size()
127            ..(PageNumber::serialized_size() + size_of::<Checksum>())]
128            .copy_from_slice(&self.checksum.to_le_bytes());
129        result[(PageNumber::serialized_size() + size_of::<Checksum>())..]
130            .copy_from_slice(&self.length.to_le_bytes());
131
132        result
133    }
134}
135
// Action an AccessGuard performs when it is dropped
enum OnDrop {
    // Nothing to do
    None,
    // Remove the entry at `position` from the (mutable) leaf page
    RemoveEntry {
        position: usize,
        fixed_key_size: Option<usize>,
    },
}
143
// The different backing stores an AccessGuard can borrow value bytes from
enum EitherPage {
    Immutable(PageImpl),
    Mutable(PageMut),
    OwnedMemory(Vec<u8>),
    ArcMemory(Arc<[u8]>),
}
150
151impl EitherPage {
152    fn memory(&self) -> &[u8] {
153        match self {
154            EitherPage::Immutable(page) => page.memory(),
155            EitherPage::Mutable(page) => page.memory(),
156            EitherPage::OwnedMemory(mem) => mem.as_slice(),
157            EitherPage::ArcMemory(mem) => mem,
158        }
159    }
160}
161
// Read guard over a single stored value; may defer an entry removal to drop time.
pub struct AccessGuard<'a, V: Value + 'static> {
    // Backing storage holding the value bytes
    page: EitherPage,
    // Byte offset of the value within `page`
    offset: usize,
    // Byte length of the value
    len: usize,
    // Optional action performed when the guard is dropped
    on_drop: OnDrop,
    _value_type: PhantomData<V>,
    // Used so that logical references into a Table respect the appropriate lifetime
    _lifetime: PhantomData<&'a ()>,
}
171
172impl<V: Value + 'static> AccessGuard<'_, V> {
173    pub(crate) fn with_page(page: PageImpl, range: Range<usize>) -> Self {
174        Self {
175            page: EitherPage::Immutable(page),
176            offset: range.start,
177            len: range.len(),
178            on_drop: OnDrop::None,
179            _value_type: Default::default(),
180            _lifetime: Default::default(),
181        }
182    }
183
184    pub(crate) fn with_arc_page(page: Arc<[u8]>, range: Range<usize>) -> Self {
185        Self {
186            page: EitherPage::ArcMemory(page),
187            offset: range.start,
188            len: range.len(),
189            on_drop: OnDrop::None,
190            _value_type: Default::default(),
191            _lifetime: Default::default(),
192        }
193    }
194
195    pub(crate) fn with_owned_value(value: Vec<u8>) -> Self {
196        let len = value.len();
197        Self {
198            page: EitherPage::OwnedMemory(value),
199            offset: 0,
200            len,
201            on_drop: OnDrop::None,
202            _value_type: Default::default(),
203            _lifetime: Default::default(),
204        }
205    }
206
207    pub(super) fn remove_on_drop(
208        page: PageMut,
209        offset: usize,
210        len: usize,
211        position: usize,
212        fixed_key_size: Option<usize>,
213    ) -> Self {
214        Self {
215            page: EitherPage::Mutable(page),
216            offset,
217            len,
218            on_drop: OnDrop::RemoveEntry {
219                position,
220                fixed_key_size,
221            },
222            _value_type: Default::default(),
223            _lifetime: Default::default(),
224        }
225    }
226
227    pub fn value(&self) -> V::SelfType<'_> {
228        V::from_bytes(&self.page.memory()[self.offset..(self.offset + self.len)])
229    }
230}
231
impl<V: Value + 'static> Drop for AccessGuard<'_, V> {
    fn drop(&mut self) {
        match self.on_drop {
            OnDrop::None => {}
            OnDrop::RemoveEntry {
                position,
                fixed_key_size,
            } => {
                // Deferred removal: the entry stays readable through the guard
                // and is only deleted from the leaf here, at drop time.
                if let EitherPage::Mutable(ref mut mut_page) = self.page {
                    let mut mutator = LeafMutator::new(mut_page, fixed_key_size, V::fixed_width());
                    mutator.remove(position);
                } else if !thread::panicking() {
                    // RemoveEntry is only constructed with a Mutable page (see
                    // remove_on_drop), so this branch cannot be reached. Skip the
                    // assertion while unwinding to avoid a double panic (abort).
                    unreachable!();
                }
            }
        }
    }
}
250
// Write guard over a single value stored in a mutable page.
pub struct AccessGuardMut<'a, V: Value + 'static> {
    // Mutable page holding the value bytes
    page: PageMut,
    // Byte offset of the value within `page`
    offset: usize,
    // Byte length of the value
    len: usize,
    _value_type: PhantomData<V>,
    // Used so that logical references into a Table respect the appropriate lifetime
    _lifetime: PhantomData<&'a ()>,
}
259
260impl<V: Value + 'static> AccessGuardMut<'_, V> {
261    pub(crate) fn new(page: PageMut, offset: usize, len: usize) -> Self {
262        AccessGuardMut {
263            page,
264            offset,
265            len,
266            _value_type: Default::default(),
267            _lifetime: Default::default(),
268        }
269    }
270}
271
impl<V: MutInPlaceValue + 'static> AsMut<V::BaseRefType> for AccessGuardMut<'_, V> {
    // Exposes the guarded byte range as V's mutable reference type
    fn as_mut(&mut self) -> &mut V::BaseRefType {
        V::from_bytes_mut(&mut self.page.memory_mut()[self.offset..(self.offset + self.len)])
    }
}
277
// Provides a simple zero-copy way to access entries
pub struct EntryAccessor<'a> {
    key: &'a [u8],
    value: &'a [u8],
}

impl<'a> EntryAccessor<'a> {
    /// Bundles borrowed key and value slices into an accessor.
    fn new(key: &'a [u8], value: &'a [u8]) -> Self {
        Self { key, value }
    }
}

impl<'a: 'b, 'b> EntryAccessor<'a> {
    /// Key bytes of this entry, borrowed for the page's lifetime `'a`.
    pub(crate) fn key(&'b self) -> &'a [u8] {
        self.key
    }

    /// Value bytes of this entry, borrowed for the page's lifetime `'a`.
    pub(crate) fn value(&'b self) -> &'a [u8] {
        self.value
    }
}
299
// Provides a simple zero-copy way to access a leaf page
pub(crate) struct LeafAccessor<'a> {
    // Raw bytes of the leaf page
    page: &'a [u8],
    // Some(n) if every key is exactly n bytes; None for variable-width keys
    fixed_key_size: Option<usize>,
    // Some(n) if every value is exactly n bytes; None for variable-width values
    fixed_value_size: Option<usize>,
    // Number of key-value pairs, cached from the page header
    num_pairs: usize,
}
307
impl<'a> LeafAccessor<'a> {
    /// Wraps raw leaf-page bytes. The pair count is read from the 2-byte
    /// little-endian field at offset 2 (layout documented above RawLeafBuilder).
    pub(crate) fn new(
        page: &'a [u8],
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        debug_assert_eq!(page[0], LEAF);
        let num_pairs = u16::from_le_bytes(page[2..4].try_into().unwrap()) as usize;
        LeafAccessor {
            page,
            fixed_key_size,
            fixed_value_size,
            num_pairs,
        }
    }

    /// Debug helper: prints every key (and optionally every value) to stderr.
    pub(super) fn print_node<K: Key, V: Value>(&self, include_value: bool) {
        let mut i = 0;
        while let Some(entry) = self.entry(i) {
            eprint!(" key_{i}={:?}", K::from_bytes(entry.key()));
            if include_value {
                eprint!(" value_{i}={:?}", V::from_bytes(entry.value()));
            }
            i += 1;
        }
    }

    /// Binary searches for `query`. Returns `(index, true)` on an exact match,
    /// otherwise `(insertion_index, false)`, where the insertion index may
    /// equal `num_pairs()` (i.e. past the last entry).
    pub(crate) fn position<K: Key>(&self, query: &[u8]) -> (usize, bool) {
        // inclusive
        let mut min_entry = 0;
        // inclusive. Start past end, since it might be positioned beyond the end of the leaf
        let mut max_entry = self.num_pairs();
        while min_entry < max_entry {
            let mid = (min_entry + max_entry) / 2;
            let key = self.key_unchecked(mid);
            match K::compare(query, key) {
                Ordering::Less => {
                    max_entry = mid;
                }
                Ordering::Equal => {
                    return (mid, true);
                }
                Ordering::Greater => {
                    min_entry = mid + 1;
                }
            }
        }
        debug_assert_eq!(min_entry, max_entry);
        (min_entry, false)
    }

    /// Returns the index of `query`, or `None` if it is not present.
    pub(crate) fn find_key<K: Key>(&self, query: &[u8]) -> Option<usize> {
        let (entry, found) = self.position::<K>(query);
        if found { Some(entry) } else { None }
    }

    /// Byte offset where key data begins: the 4-byte header plus one u32
    /// end-offset per pair for each variable-width dimension (keys/values).
    fn key_section_start(&self) -> usize {
        let mut offset = 4;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        if self.fixed_value_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }

        offset
    }

    /// Start offset of the nth key (the end of the previous key), or `None`
    /// if `n` is out of range.
    fn key_start(&self, n: usize) -> Option<usize> {
        if n == 0 {
            Some(self.key_section_start())
        } else {
            self.key_end(n - 1)
        }
    }

    /// End offset of the nth key. For variable-width keys this is read from
    /// the u32 offset table right after the header; `None` if `n` is out of
    /// range or the table is truncated.
    fn key_end(&self, n: usize) -> Option<usize> {
        if n >= self.num_pairs() {
            None
        } else {
            if let Some(fixed) = self.fixed_key_size {
                return Some(self.key_section_start() + fixed * (n + 1));
            }
            let offset = 4 + size_of::<u32>() * n;
            let end = u32::from_le_bytes(
                self.page
                    .get(offset..(offset + size_of::<u32>()))?
                    .try_into()
                    .unwrap(),
            ) as usize;
            Some(end)
        }
    }

    /// Start offset of the nth value. Value data begins immediately after the
    /// last key, so value 0 starts at the final key's end offset.
    fn value_start(&self, n: usize) -> Option<usize> {
        if n == 0 {
            self.key_end(self.num_pairs() - 1)
        } else {
            self.value_end(n - 1)
        }
    }

    /// End offset of the nth value. For variable-width values this is read
    /// from the u32 offset table that follows the key offset table (if any).
    fn value_end(&self, n: usize) -> Option<usize> {
        if n >= self.num_pairs() {
            None
        } else {
            if let Some(fixed) = self.fixed_value_size {
                return Some(self.key_end(self.num_pairs.checked_sub(1)?)? + fixed * (n + 1));
            }
            let mut offset = 4 + size_of::<u32>() * n;
            if self.fixed_key_size.is_none() {
                offset += size_of::<u32>() * self.num_pairs;
            }
            let end = u32::from_le_bytes(
                self.page
                    .get(offset..(offset + size_of::<u32>()))?
                    .try_into()
                    .unwrap(),
            ) as usize;
            Some(end)
        }
    }

    /// Number of key-value pairs stored in this leaf.
    pub(crate) fn num_pairs(&self) -> usize {
        self.num_pairs
    }

    /// Offset of the first value's data. Panics if the leaf is empty.
    pub(super) fn offset_of_first_value(&self) -> usize {
        self.offset_of_value(0).unwrap()
    }

    /// Offset of the nth value's data, or `None` if out of range.
    pub(super) fn offset_of_value(&self, n: usize) -> Option<usize> {
        self.value_start(n)
    }

    /// (start, end) byte range of the nth value, or `None` if out of range.
    pub(super) fn value_range(&self, n: usize) -> Option<(usize, usize)> {
        Some((self.value_start(n)?, self.value_end(n)?))
    }

    // Returns the length of all keys and values between [start, end)
    pub(crate) fn length_of_pairs(&self, start: usize, end: usize) -> usize {
        self.length_of_values(start, end) + self.length_of_keys(start, end)
    }

    // Returns the length of all values between [start, end)
    fn length_of_values(&self, start: usize, end: usize) -> usize {
        if end == 0 {
            return 0;
        }
        let end_offset = self.value_end(end - 1).unwrap();
        let start_offset = self.value_start(start).unwrap();
        end_offset - start_offset
    }

    // Returns the length of all keys between [start, end)
    pub(crate) fn length_of_keys(&self, start: usize, end: usize) -> usize {
        if end == 0 {
            return 0;
        }
        let end_offset = self.key_end(end - 1).unwrap();
        let start_offset = self.key_start(start).unwrap();
        end_offset - start_offset
    }

    /// Total number of meaningful bytes in the page (header, offset tables,
    /// keys, and values). Panics if the leaf is empty.
    pub(crate) fn total_length(&self) -> usize {
        // Values are stored last
        self.value_end(self.num_pairs() - 1).unwrap()
    }

    /// Key bytes of entry `n`. Panics if `n` is out of range.
    fn key_unchecked(&self, n: usize) -> &[u8] {
        &self.page[self.key_start(n).unwrap()..self.key_end(n).unwrap()]
    }

    /// Zero-copy accessor for entry `n`, or `None` if out of range.
    pub(crate) fn entry(&self, n: usize) -> Option<EntryAccessor<'a>> {
        let key = &self.page[self.key_start(n)?..self.key_end(n)?];
        let value = &self.page[self.value_start(n)?..self.value_end(n)?];
        Some(EntryAccessor::new(key, value))
    }

    /// Byte ranges of entry `n`'s key and value, or `None` if out of range.
    pub(crate) fn entry_ranges(&self, n: usize) -> Option<(Range<usize>, Range<usize>)> {
        let key = self.key_start(n)?..self.key_end(n)?;
        let value = self.value_start(n)?..self.value_end(n)?;
        Some((key, value))
    }

    /// Accessor for the last entry. Panics if the leaf is empty.
    pub(super) fn last_entry(&self) -> EntryAccessor<'a> {
        self.entry(self.num_pairs() - 1).unwrap()
    }
}
496
// Accumulates borrowed key-value pairs, then builds one leaf page (or two,
// when splitting) from them.
pub(super) struct LeafBuilder<'a, 'b> {
    // Pairs in insertion order; nothing is copied until build
    pairs: Vec<(&'a [u8], &'a [u8])>,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    // Running total of key bytes across all pushed pairs
    total_key_bytes: usize,
    // Running total of value bytes across all pushed pairs
    total_value_bytes: usize,
    // Used to allocate the output page(s)
    mem: &'b TransactionalMemory,
}
505
impl<'a, 'b> LeafBuilder<'a, 'b> {
    /// Bytes required to store `num_pairs` pairs whose keys+values total
    /// `keys_values_bytes`, given this builder's fixed-width settings.
    pub(super) fn required_bytes(&self, num_pairs: usize, keys_values_bytes: usize) -> usize {
        RawLeafBuilder::required_bytes(
            num_pairs,
            keys_values_bytes,
            self.fixed_key_size,
            self.fixed_value_size,
        )
    }

    /// Creates an empty builder with room reserved for `capacity` pairs.
    pub(super) fn new(
        mem: &'b TransactionalMemory,
        capacity: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        Self {
            pairs: Vec::with_capacity(capacity),
            fixed_key_size,
            fixed_value_size,
            total_key_bytes: 0,
            total_value_bytes: 0,
            mem,
        }
    }

    /// Appends a key-value pair (borrowed; copied only at build time).
    pub(super) fn push(&mut self, key: &'a [u8], value: &'a [u8]) {
        self.total_key_bytes += key.len();
        self.total_value_bytes += value.len();
        self.pairs.push((key, value));
    }

    /// Copies all pairs from `accessor`, optionally skipping the pair at
    /// index `except`.
    pub(super) fn push_all_except(
        &mut self,
        accessor: &'a LeafAccessor<'_>,
        except: Option<usize>,
    ) {
        for i in 0..accessor.num_pairs() {
            if let Some(except) = except {
                if except == i {
                    continue;
                }
            }
            let entry = accessor.entry(i).unwrap();
            self.push(entry.key(), entry.value());
        }
    }

    /// True when the accumulated pairs exceed one page and there are at
    /// least two pairs, i.e. the leaf must be split.
    pub(super) fn should_split(&self) -> bool {
        let required_size = self.required_bytes(
            self.pairs.len(),
            self.total_key_bytes + self.total_value_bytes,
        );
        required_size > self.mem.get_page_size() && self.pairs.len() > 1
    }

    /// Builds two leaf pages split near the byte-size midpoint.
    /// Returns (first page, greatest key in the first page, second page).
    pub(super) fn build_split(self) -> Result<(PageMut, &'a [u8], PageMut)> {
        let total_size = self.total_key_bytes + self.total_value_bytes;
        // Choose the split point: take pairs until roughly half the bytes are
        // consumed, but always leave at least one pair for the second page
        // (hence the take(len - 1)).
        let mut division = 0;
        let mut first_split_key_bytes = 0;
        let mut first_split_value_bytes = 0;
        for (key, value) in self.pairs.iter().take(self.pairs.len() - 1) {
            first_split_key_bytes += key.len();
            first_split_value_bytes += value.len();
            division += 1;
            if first_split_key_bytes + first_split_value_bytes >= total_size / 2 {
                break;
            }
        }

        // Write the first page with pairs [0, division)
        let required_size =
            self.required_bytes(division, first_split_key_bytes + first_split_value_bytes);
        let mut page1 = self.mem.allocate(required_size)?;
        let mut builder = RawLeafBuilder::new(
            page1.memory_mut(),
            division,
            self.fixed_key_size,
            self.fixed_value_size,
            first_split_key_bytes,
        );
        for (key, value) in self.pairs.iter().take(division) {
            builder.append(key, value);
        }
        // RawLeafBuilder's Drop asserts the page was completely filled in
        drop(builder);

        // Write the second page with the remaining pairs [division, len)
        let required_size = self.required_bytes(
            self.pairs.len() - division,
            self.total_key_bytes + self.total_value_bytes
                - first_split_key_bytes
                - first_split_value_bytes,
        );
        let mut page2 = self.mem.allocate(required_size)?;
        let mut builder = RawLeafBuilder::new(
            page2.memory_mut(),
            self.pairs.len() - division,
            self.fixed_key_size,
            self.fixed_value_size,
            self.total_key_bytes - first_split_key_bytes,
        );
        for (key, value) in &self.pairs[division..] {
            builder.append(key, value);
        }
        drop(builder);

        Ok((page1, self.pairs[division - 1].0, page2))
    }

    /// Builds a single leaf page containing all accumulated pairs.
    pub(super) fn build(self) -> Result<PageMut> {
        let required_size = self.required_bytes(
            self.pairs.len(),
            self.total_key_bytes + self.total_value_bytes,
        );
        let mut page = self.mem.allocate(required_size)?;
        let mut builder = RawLeafBuilder::new(
            page.memory_mut(),
            self.pairs.len(),
            self.fixed_key_size,
            self.fixed_value_size,
            self.total_key_bytes,
        );
        for (key, value) in self.pairs {
            builder.append(key, value);
        }
        drop(builder);
        Ok(page)
    }
}
633
634// Note the caller is responsible for ensuring that the buffer is large enough
635// and rewriting all fields if any dynamically sized fields are written
636// Layout is:
637// 1 byte: type
638// 1 byte: reserved (padding to 32bits aligned)
639// 2 bytes: num_entries (number of pairs)
640// (optional) repeating (num_entries times):
641// 4 bytes: key_end
642// (optional) repeating (num_entries times):
643// 4 bytes: value_end
644// repeating (num_entries times):
645// * n bytes: key data
646// repeating (num_entries times):
647// * n bytes: value data
pub(crate) struct RawLeafBuilder<'a> {
    // Destination buffer for the leaf page being built
    page: &'a mut [u8],
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    // Total number of pairs that will be appended
    num_pairs: usize,
    // Total key bytes the caller promised to write; value data starts
    // immediately after this many key bytes
    provisioned_key_bytes: usize,
    pairs_written: usize, // used for debugging
}
656
impl<'a> RawLeafBuilder<'a> {
    /// Bytes required for a leaf holding `num_pairs` pairs totalling
    /// `keys_values_bytes` of key+value data. Each variable-width dimension
    /// adds a u32 end-offset table with one entry per pair.
    pub(crate) fn required_bytes(
        num_pairs: usize,
        keys_values_bytes: usize,
        key_size: Option<usize>,
        value_size: Option<usize>,
    ) -> usize {
        // Page id & header;
        let mut result = 4;
        // key & value lengths
        if key_size.is_none() {
            result += num_pairs * size_of::<u32>();
        }
        if value_size.is_none() {
            result += num_pairs * size_of::<u32>();
        }
        result += keys_values_bytes;

        result
    }

    /// Writes the page header into `page` and prepares to append `num_pairs`
    /// pairs whose keys must total exactly `key_bytes` bytes.
    pub(crate) fn new(
        page: &'a mut [u8],
        num_pairs: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
        key_bytes: usize,
    ) -> Self {
        page[0] = LEAF;
        page[2..4].copy_from_slice(&u16::try_from(num_pairs).unwrap().to_le_bytes());
        #[cfg(debug_assertions)]
        {
            // Poison all the key & value offsets, in case the caller forgets to write them
            let mut last = 4;
            if fixed_key_size.is_none() {
                last += size_of::<u32>() * num_pairs;
            }
            if fixed_value_size.is_none() {
                last += size_of::<u32>() * num_pairs;
            }
            for x in &mut page[4..last] {
                *x = 0xFF;
            }
        }
        RawLeafBuilder {
            page,
            fixed_key_size,
            fixed_value_size,
            num_pairs,
            provisioned_key_bytes: key_bytes,
            pairs_written: 0,
        }
    }

    /// End offset of the nth value, either derived from the fixed width or
    /// read back from the value offset table already written into the page.
    fn value_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_value_size {
            return self.key_section_start() + self.provisioned_key_bytes + fixed * (n + 1);
        }
        let mut offset = 4 + size_of::<u32>() * n;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    /// Offset where key data begins (header plus any offset tables).
    fn key_section_start(&self) -> usize {
        let mut offset = 4;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        if self.fixed_value_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }

        offset
    }

    /// End offset of the nth key, either derived from the fixed width or
    /// read back from the key offset table already written into the page.
    fn key_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_key_size {
            return self.key_section_start() + fixed * (n + 1);
        }
        let offset = 4 + size_of::<u32>() * n;
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    /// Appends the next key-value pair. Pairs must be appended in order,
    /// since pair n's offsets are derived from the already-written pair n-1.
    pub(crate) fn append(&mut self, key: &[u8], value: &[u8]) {
        if let Some(key_width) = self.fixed_key_size {
            assert_eq!(key_width, key.len());
        }
        if let Some(value_width) = self.fixed_value_size {
            assert_eq!(value_width, value.len());
        }
        // Key data is contiguous; the first key starts at the key section
        let key_offset = if self.pairs_written == 0 {
            self.key_section_start()
        } else {
            self.key_end(self.pairs_written - 1)
        };
        // Value data starts right after the provisioned key bytes
        let value_offset = if self.pairs_written == 0 {
            self.key_section_start() + self.provisioned_key_bytes
        } else {
            self.value_end(self.pairs_written - 1)
        };

        let n = self.pairs_written;
        if self.fixed_key_size.is_none() {
            // Record this key's end offset in the key offset table
            let offset = 4 + size_of::<u32>() * n;
            self.page[offset..(offset + size_of::<u32>())]
                .copy_from_slice(&u32::try_from(key_offset + key.len()).unwrap().to_le_bytes());
        }
        self.page[key_offset..(key_offset + key.len())].copy_from_slice(key);
        let written_key_len = key_offset + key.len() - self.key_section_start();
        assert!(written_key_len <= self.provisioned_key_bytes);

        if self.fixed_value_size.is_none() {
            // Record this value's end offset in the value offset table,
            // which follows the key offset table (if present)
            let mut offset = 4 + size_of::<u32>() * n;
            if self.fixed_key_size.is_none() {
                offset += size_of::<u32>() * self.num_pairs;
            }
            self.page[offset..(offset + size_of::<u32>())].copy_from_slice(
                &u32::try_from(value_offset + value.len())
                    .unwrap()
                    .to_le_bytes(),
            );
        }
        self.page[value_offset..(value_offset + value.len())].copy_from_slice(value);
        self.pairs_written += 1;
    }
}
793
impl Drop for RawLeafBuilder<'_> {
    fn drop(&mut self) {
        // Sanity-check that the builder was driven to completion: every
        // declared pair was appended and the keys exactly filled the
        // provisioned key section. Skipped while unwinding to avoid a
        // double panic (which would abort).
        if !thread::panicking() {
            assert_eq!(self.pairs_written, self.num_pairs);
            assert_eq!(
                self.key_section_start() + self.provisioned_key_bytes,
                self.key_end(self.num_pairs - 1)
            );
        }
    }
}
805
// In-place editor for an existing mutable leaf page
pub(crate) struct LeafMutator<'b> {
    page: &'b mut PageMut,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
}
811
812impl<'b> LeafMutator<'b> {
    /// Wraps a mutable leaf page for in-place edits.
    /// Panics (via assert) if the page is not a leaf.
    pub(crate) fn new(
        page: &'b mut PageMut,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        assert_eq!(page.memory_mut()[0], LEAF);
        Self {
            page,
            fixed_key_size,
            fixed_value_size,
        }
    }
825
    /// Returns true if `new_key`/`new_value` can be inserted at `position`
    /// (or overwrite the entry there, when `overwrite` is set) without
    /// growing the page beyond its current size.
    pub(super) fn sufficient_insert_inplace_space(
        page: &'_ PageImpl,
        position: usize,
        overwrite: bool,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
        new_key: &[u8],
        new_value: &[u8],
    ) -> bool {
        let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
        if overwrite {
            // Overwriting replaces one pair's data bytes; the offset tables
            // keep the same number of entries
            let remaining = page.memory().len() - accessor.total_length();
            let required_delta = isize::try_from(new_key.len() + new_value.len()).unwrap()
                - isize::try_from(accessor.length_of_pairs(position, position + 1)).unwrap();
            required_delta <= isize::try_from(remaining).unwrap()
        } else {
            // If this is a large page, only allow in-place appending to avoid write amplification
            //
            // Note: this check is also required to avoid inserting an unbounded number of small values
            // into a large page, which could result in overflowing a u16 which is the maximum number of entries per leaf
            if page.get_page_number().page_order > 0 && position < accessor.num_pairs() {
                return false;
            }
            let remaining = page.memory().len() - accessor.total_length();
            // A brand-new pair also consumes a u32 offset-table slot for each
            // variable-width dimension
            let mut required_delta = new_key.len() + new_value.len();
            if fixed_key_size.is_none() {
                required_delta += size_of::<u32>();
            }
            if fixed_value_size.is_none() {
                required_delta += size_of::<u32>();
            }
            required_delta <= remaining
        }
    }
860
861    // Insert the given key, value pair at index i and shift all following pairs to the right
862    pub(crate) fn insert(&mut self, i: usize, overwrite: bool, key: &[u8], value: &[u8]) {
863        let accessor = LeafAccessor::new(
864            self.page.memory(),
865            self.fixed_key_size,
866            self.fixed_value_size,
867        );
868        let required_delta = if overwrite {
869            isize::try_from(key.len() + value.len()).unwrap()
870                - isize::try_from(accessor.length_of_pairs(i, i + 1)).unwrap()
871        } else {
872            let mut delta = key.len() + value.len();
873            if self.fixed_key_size.is_none() {
874                delta += size_of::<u32>();
875            }
876            if self.fixed_value_size.is_none() {
877                delta += size_of::<u32>();
878            }
879            delta.try_into().unwrap()
880        };
881        assert!(
882            isize::try_from(accessor.total_length()).unwrap() + required_delta
883                <= isize::try_from(self.page.memory().len()).unwrap()
884        );
885
886        let num_pairs = accessor.num_pairs();
887        let last_key_end = accessor.key_end(accessor.num_pairs() - 1).unwrap();
888        let last_value_end = accessor.value_end(accessor.num_pairs() - 1).unwrap();
889        let shift_index = if overwrite { i + 1 } else { i };
890        let shift_key_start = accessor.key_start(shift_index).unwrap_or(last_key_end);
891        let shift_value_start = accessor.value_start(shift_index).unwrap_or(last_value_end);
892        let existing_value_len = accessor
893            .value_range(i)
894            .map(|(start, end)| end - start)
895            .unwrap_or_default();
896
897        let value_delta = if overwrite {
898            isize::try_from(value.len()).unwrap() - isize::try_from(existing_value_len).unwrap()
899        } else {
900            value.len().try_into().unwrap()
901        };
902
903        // Update all the pointers
904        let key_ptr_size: usize = if self.fixed_key_size.is_none() { 4 } else { 0 };
905        let value_ptr_size: usize = if self.fixed_value_size.is_none() {
906            4
907        } else {
908            0
909        };
910        if !overwrite {
911            for j in 0..i {
912                self.update_key_end(j, (key_ptr_size + value_ptr_size).try_into().unwrap());
913                let value_delta: isize = (key_ptr_size + value_ptr_size + key.len())
914                    .try_into()
915                    .unwrap();
916                self.update_value_end(j, value_delta);
917            }
918        }
919        for j in i..num_pairs {
920            if overwrite {
921                self.update_value_end(j, value_delta);
922            } else {
923                let key_delta: isize = (key_ptr_size + value_ptr_size + key.len())
924                    .try_into()
925                    .unwrap();
926                self.update_key_end(j, key_delta);
927                let value_delta = key_delta + isize::try_from(value.len()).unwrap();
928                self.update_value_end(j, value_delta);
929            }
930        }
931
932        let new_num_pairs = if overwrite { num_pairs } else { num_pairs + 1 };
933        self.page.memory_mut()[2..4]
934            .copy_from_slice(&u16::try_from(new_num_pairs).unwrap().to_le_bytes());
935
936        // Right shift the trailing values
937        let mut dest = if overwrite {
938            (isize::try_from(shift_value_start).unwrap() + value_delta)
939                .try_into()
940                .unwrap()
941        } else {
942            shift_value_start + key_ptr_size + value_ptr_size + key.len() + value.len()
943        };
944        let start = shift_value_start;
945        let end = last_value_end;
946        self.page.memory_mut().copy_within(start..end, dest);
947
948        // Insert the value
949        let inserted_value_end: u32 = dest.try_into().unwrap();
950        dest -= value.len();
951        self.page.memory_mut()[dest..(dest + value.len())].copy_from_slice(value);
952
953        if !overwrite {
954            // Right shift the trailing key data & preceding value data
955            let start = shift_key_start;
956            let end = shift_value_start;
957            dest -= end - start;
958            self.page.memory_mut().copy_within(start..end, dest);
959
960            // Insert the key
961            let inserted_key_end: u32 = dest.try_into().unwrap();
962            dest -= key.len();
963            self.page.memory_mut()[dest..(dest + key.len())].copy_from_slice(key);
964
965            // Right shift the trailing value pointers & preceding key data
966            let start = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
967            let end = shift_key_start;
968            dest -= end - start;
969            debug_assert_eq!(
970                dest,
971                4 + key_ptr_size * new_num_pairs + value_ptr_size * (i + 1)
972            );
973            self.page.memory_mut().copy_within(start..end, dest);
974
975            // Insert the value pointer
976            if self.fixed_value_size.is_none() {
977                dest -= size_of::<u32>();
978                self.page.memory_mut()[dest..(dest + size_of::<u32>())]
979                    .copy_from_slice(&inserted_value_end.to_le_bytes());
980            }
981
982            // Right shift the trailing key pointers & preceding value pointers
983            let start = 4 + key_ptr_size * i;
984            let end = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
985            dest -= end - start;
986            debug_assert_eq!(dest, 4 + key_ptr_size * (i + 1));
987            self.page.memory_mut().copy_within(start..end, dest);
988
989            // Insert the key pointer
990            if self.fixed_key_size.is_none() {
991                dest -= size_of::<u32>();
992                self.page.memory_mut()[dest..(dest + size_of::<u32>())]
993                    .copy_from_slice(&inserted_key_end.to_le_bytes());
994            }
995            debug_assert_eq!(dest, 4 + key_ptr_size * i);
996        }
997    }
998
    /// Removes the key-value pair at index `i`, compacting the page in place.
    ///
    /// Panics if `i` is out of range, or if this is the only pair in the page
    /// (the page must never be emptied through this method).
    pub(super) fn remove(&mut self, i: usize) {
        let accessor = LeafAccessor::new(
            self.page.memory(),
            self.fixed_key_size,
            self.fixed_value_size,
        );
        let num_pairs = accessor.num_pairs();
        assert!(i < num_pairs);
        assert!(num_pairs > 1);
        // Byte ranges of the removed entry's key & value, and the end of the
        // last value (the page's live data extent)
        let key_start = accessor.key_start(i).unwrap();
        let key_end = accessor.key_end(i).unwrap();
        let value_start = accessor.value_start(i).unwrap();
        let value_end = accessor.value_end(i).unwrap();
        let last_value_end = accessor.value_end(accessor.num_pairs() - 1).unwrap();

        // Update all the pointers
        // End-offset entries only exist for variable-size keys/values
        let key_ptr_size = if self.fixed_key_size.is_none() {
            size_of::<u32>()
        } else {
            0
        };
        let value_ptr_size = if self.fixed_value_size.is_none() {
            size_of::<u32>()
        } else {
            0
        };
        // Pairs before i: their key data shifts left only by the removed
        // pointer entries; their value data additionally shifts past the
        // removed key bytes (all keys precede all values on the page)
        for j in 0..i {
            self.update_key_end(j, -isize::try_from(key_ptr_size + value_ptr_size).unwrap());
            let value_delta = -isize::try_from(key_ptr_size + value_ptr_size).unwrap()
                - isize::try_from(key_end - key_start).unwrap();
            self.update_value_end(j, value_delta);
        }
        // Pairs after i: keys shift past the removed pointers & key bytes;
        // values also shift past the removed value bytes
        for j in (i + 1)..num_pairs {
            let key_delta = -isize::try_from(key_ptr_size + value_ptr_size).unwrap()
                - isize::try_from(key_end - key_start).unwrap();
            self.update_key_end(j, key_delta);
            let value_delta = key_delta - isize::try_from(value_end - value_start).unwrap();
            self.update_value_end(j, value_delta);
        }

        // Left shift all the pointers & data

        let new_num_pairs = num_pairs - 1;
        // num_pairs is a little-endian u16 at bytes 2..4 of the header
        self.page.memory_mut()[2..4]
            .copy_from_slice(&u16::try_from(new_num_pairs).unwrap().to_le_bytes());
        // Left shift the trailing key pointers & preceding value pointers
        let mut dest = 4 + key_ptr_size * i;
        // First trailing key pointer
        let start = 4 + key_ptr_size * (i + 1);
        // Last preceding value pointer
        let end = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
        self.page.memory_mut().copy_within(start..end, dest);
        dest += end - start;
        debug_assert_eq!(dest, 4 + key_ptr_size * new_num_pairs + value_ptr_size * i);

        // Left shift the trailing value pointers & preceding key data
        let start = 4 + key_ptr_size * num_pairs + value_ptr_size * (i + 1);
        let end = key_start;
        self.page.memory_mut().copy_within(start..end, dest);
        dest += end - start;

        let preceding_key_len = key_start - (4 + (key_ptr_size + value_ptr_size) * num_pairs);
        debug_assert_eq!(
            dest,
            4 + (key_ptr_size + value_ptr_size) * new_num_pairs + preceding_key_len
        );

        // Left shift the trailing key data & preceding value data
        let start = key_end;
        let end = value_start;
        self.page.memory_mut().copy_within(start..end, dest);
        dest += end - start;

        // Left shift the trailing value data
        let preceding_data_len =
            value_start - (4 + (key_ptr_size + value_ptr_size) * num_pairs) - (key_end - key_start);
        debug_assert_eq!(
            dest,
            4 + (key_ptr_size + value_ptr_size) * new_num_pairs + preceding_data_len
        );
        let start = value_end;
        let end = last_value_end;
        self.page.memory_mut().copy_within(start..end, dest);
    }
1083
1084    fn update_key_end(&mut self, i: usize, delta: isize) {
1085        if self.fixed_key_size.is_some() {
1086            return;
1087        }
1088        let offset = 4 + size_of::<u32>() * i;
1089        let mut ptr = u32::from_le_bytes(
1090            self.page.memory()[offset..(offset + size_of::<u32>())]
1091                .try_into()
1092                .unwrap(),
1093        );
1094        ptr = (isize::try_from(ptr).unwrap() + delta).try_into().unwrap();
1095        self.page.memory_mut()[offset..(offset + size_of::<u32>())]
1096            .copy_from_slice(&ptr.to_le_bytes());
1097    }
1098
1099    fn update_value_end(&mut self, i: usize, delta: isize) {
1100        if self.fixed_value_size.is_some() {
1101            return;
1102        }
1103        let accessor = LeafAccessor::new(
1104            self.page.memory(),
1105            self.fixed_key_size,
1106            self.fixed_value_size,
1107        );
1108        let num_pairs = accessor.num_pairs();
1109        let mut offset = 4 + size_of::<u32>() * i;
1110        if self.fixed_key_size.is_none() {
1111            offset += size_of::<u32>() * num_pairs;
1112        }
1113        let mut ptr = u32::from_le_bytes(
1114            self.page.memory()[offset..(offset + size_of::<u32>())]
1115                .try_into()
1116                .unwrap(),
1117        );
1118        ptr = (isize::try_from(ptr).unwrap() + delta).try_into().unwrap();
1119        self.page.memory_mut()[offset..(offset + size_of::<u32>())]
1120            .copy_from_slice(&ptr.to_le_bytes());
1121    }
1122}
1123
// Provides a simple zero-copy way to access a branch page
// TODO: this should be pub(super) and the multimap btree stuff should be moved into this package
pub(crate) struct BranchAccessor<'a: 'b, 'b, T: Page + 'a> {
    page: &'b T,
    // Cached from the page header at construction time
    num_keys: usize,
    // Byte length of every key when keys are fixed-size; None for variable-size keys
    fixed_key_size: Option<usize>,
    _page_lifetime: PhantomData<&'a ()>,
}
1132
impl<'a: 'b, 'b, T: Page + 'a> BranchAccessor<'a, 'b, T> {
    /// Wraps `page` for read access, caching the key count from the header.
    pub(crate) fn new(page: &'b T, fixed_key_size: Option<usize>) -> Self {
        debug_assert_eq!(page.memory()[0], BRANCH);
        // num_keys is a little-endian u16 at bytes 2..4 of the header
        let num_keys = u16::from_le_bytes(page.memory()[2..4].try_into().unwrap()) as usize;
        BranchAccessor {
            page,
            num_keys,
            fixed_key_size,
            _page_lifetime: Default::default(),
        }
    }

    /// Debug helper: prints this node's children and keys to stderr.
    pub(super) fn print_node<K: Key>(&self) {
        eprint!(
            "Internal[ (page={:?}), child_0={:?}",
            self.page.get_page_number(),
            self.child_page(0).unwrap()
        );
        for i in 0..(self.count_children() - 1) {
            if let Some(child) = self.child_page(i + 1) {
                let key = self.key(i).unwrap();
                eprint!(" key_{i}={:?}", K::from_bytes(key));
                eprint!(" child_{}={child:?}", i + 1);
            }
        }
        eprint!("]");
    }

    /// Total number of bytes used by this page's serialized data.
    pub(crate) fn total_length(&self) -> usize {
        // Keys are stored at the end
        self.key_end(self.num_keys() - 1).unwrap()
    }

    /// Binary-searches the keys for `query`, returning the index and page
    /// number of the child subtree that could contain it.
    pub(super) fn child_for_key<K: Key>(&self, query: &[u8]) -> (usize, PageNumber) {
        let mut min_child = 0; // inclusive
        let mut max_child = self.num_keys(); // inclusive
        while min_child < max_child {
            let mid = (min_child + max_child) / 2;
            match K::compare(query, self.key(mid).unwrap()) {
                Ordering::Less => {
                    max_child = mid;
                }
                Ordering::Equal => {
                    // An exact match belongs to the child at the same index:
                    // child n holds values <= key n (see write_nth_key)
                    return (mid, self.child_page(mid).unwrap());
                }
                Ordering::Greater => {
                    min_child = mid + 1;
                }
            }
        }
        debug_assert_eq!(min_child, max_child);

        (min_child, self.child_page(min_child).unwrap())
    }

    /// Offset where key data begins: after the 8-byte header, the checksum
    /// and page-number arrays, and (for variable-size keys) the key-end array.
    fn key_section_start(&self) -> usize {
        if self.fixed_key_size.is_none() {
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
                + size_of::<u32>() * self.num_keys()
        } else {
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
        }
    }

    /// Start offset of the nth key (i.e. the end of the previous key).
    fn key_offset(&self, n: usize) -> usize {
        if n == 0 {
            self.key_section_start()
        } else {
            self.key_end(n - 1).unwrap()
        }
    }

    /// Exclusive end offset of the nth key. Computed directly for fixed-size
    /// keys; otherwise read from the key-end array. Returns None if that read
    /// would fall outside the page (corrupted data).
    fn key_end(&self, n: usize) -> Option<usize> {
        if let Some(fixed) = self.fixed_key_size {
            return Some(self.key_section_start() + fixed * (n + 1));
        }
        let offset = 8
            + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
            + size_of::<u32>() * n;
        Some(u32::from_le_bytes(
            self.page
                .memory()
                .get(offset..(offset + size_of::<u32>()))?
                .try_into()
                .unwrap(),
        ) as usize)
    }

    /// Byte slice of the nth key, or None if `n` is out of range.
    pub(super) fn key(&self, n: usize) -> Option<&[u8]> {
        if n >= self.num_keys() {
            return None;
        }
        let offset = self.key_offset(n);
        let end = self.key_end(n)?;
        Some(&self.page.memory()[offset..end])
    }

    /// A branch always has one more child than it has keys.
    pub(crate) fn count_children(&self) -> usize {
        self.num_keys() + 1
    }

    /// Checksum of the nth child, or None if `n` is out of range.
    pub(crate) fn child_checksum(&self, n: usize) -> Option<Checksum> {
        if n >= self.count_children() {
            return None;
        }

        // Checksums are stored first, directly after the 8-byte header
        let offset = 8 + size_of::<Checksum>() * n;
        Some(Checksum::from_le_bytes(
            self.page.memory()[offset..(offset + size_of::<Checksum>())]
                .try_into()
                .unwrap(),
        ))
    }

    /// Page number of the nth child, or None if `n` is out of range.
    pub(crate) fn child_page(&self, n: usize) -> Option<PageNumber> {
        if n >= self.count_children() {
            return None;
        }

        // Page numbers follow the checksum array
        let offset =
            8 + size_of::<Checksum>() * self.count_children() + PageNumber::serialized_size() * n;
        Some(PageNumber::from_le_bytes(
            self.page.memory()[offset..(offset + PageNumber::serialized_size())]
                .try_into()
                .unwrap(),
        ))
    }

    fn num_keys(&self) -> usize {
        self.num_keys
    }
}
1265
// Accumulates children and keys for a branch node, then serializes them via
// RawBranchBuilder. Invariant at build time: children.len() == keys.len() + 1.
pub(super) struct BranchBuilder<'a, 'b> {
    children: Vec<(PageNumber, Checksum)>,
    keys: Vec<&'a [u8]>,
    // Running total of pushed key lengths, used for page-size calculations
    total_key_bytes: usize,
    fixed_key_size: Option<usize>,
    mem: &'b TransactionalMemory,
}
1273
impl<'a, 'b> BranchBuilder<'a, 'b> {
    /// Creates a builder with capacity for `child_capacity` children
    /// (and therefore `child_capacity - 1` keys).
    pub(super) fn new(
        mem: &'b TransactionalMemory,
        child_capacity: usize,
        fixed_key_size: Option<usize>,
    ) -> Self {
        Self {
            children: Vec::with_capacity(child_capacity),
            keys: Vec::with_capacity(child_capacity - 1),
            total_key_bytes: 0,
            fixed_key_size,
            mem,
        }
    }

    /// Replaces a previously pushed child in place.
    pub(super) fn replace_child(&mut self, index: usize, child: PageNumber, checksum: Checksum) {
        self.children[index] = (child, checksum);
    }

    pub(super) fn push_child(&mut self, child: PageNumber, checksum: Checksum) {
        self.children.push((child, checksum));
    }

    pub(super) fn push_key(&mut self, key: &'a [u8]) {
        self.keys.push(key);
        self.total_key_bytes += key.len();
    }

    /// Copies every child and key from an existing branch page into this builder.
    pub(super) fn push_all<T: Page>(&mut self, accessor: &'a BranchAccessor<'_, '_, T>) {
        for i in 0..accessor.count_children() {
            let child = accessor.child_page(i).unwrap();
            let checksum = accessor.child_checksum(i).unwrap();
            self.push_child(child, checksum);
        }
        // A branch always has one fewer key than children
        for i in 0..(accessor.count_children() - 1) {
            self.push_key(accessor.key(i).unwrap());
        }
    }

    /// If at most one child has been pushed, returns it (such a branch would
    /// be redundant). Panics if no children were pushed at all.
    pub(super) fn to_single_child(&self) -> Option<(PageNumber, Checksum)> {
        if self.children.len() > 1 {
            None
        } else {
            Some(self.children[0])
        }
    }

    /// Allocates a page and serializes the accumulated children & keys into it.
    pub(super) fn build(self) -> Result<PageMut> {
        assert_eq!(self.children.len(), self.keys.len() + 1);
        let size = RawBranchBuilder::required_bytes(
            self.keys.len(),
            self.total_key_bytes,
            self.fixed_key_size,
        );
        let mut page = self.mem.allocate(size)?;
        let mut builder = RawBranchBuilder::new(&mut page, self.keys.len(), self.fixed_key_size);
        builder.write_first_page(self.children[0].0, self.children[0].1);
        for i in 1..self.children.len() {
            // key i-1 separates child i-1 from child i
            let key = &self.keys[i - 1];
            builder.write_nth_key(key.as_ref(), self.children[i].0, self.children[i].1, i - 1);
        }
        // Dropping the builder asserts that all keys were written
        drop(builder);

        Ok(page)
    }

    /// True when the serialized node would exceed one page and there are
    /// enough keys (>= 3) for build_split() to produce two valid nodes.
    pub(super) fn should_split(&self) -> bool {
        let size = RawBranchBuilder::required_bytes(
            self.keys.len(),
            self.total_key_bytes,
            self.fixed_key_size,
        );
        size > self.mem.get_page_size() && self.keys.len() >= 3
    }

    /// Splits the accumulated entries around the middle key and serializes
    /// each half into its own page. Returns (left page, division key, right
    /// page); the division key is stored in neither page and is returned for
    /// the caller (typically to be inserted into the parent node).
    pub(super) fn build_split(self) -> Result<(PageMut, &'a [u8], PageMut)> {
        assert_eq!(self.children.len(), self.keys.len() + 1);
        assert!(self.keys.len() >= 3);
        let division = self.keys.len() / 2;
        let first_split_key_len: usize = self.keys.iter().take(division).map(|k| k.len()).sum();
        let division_key = self.keys[division];
        let second_split_key_len = self.total_key_bytes - first_split_key_len - division_key.len();

        // First page: keys [0, division) with children [0, division]
        let size =
            RawBranchBuilder::required_bytes(division, first_split_key_len, self.fixed_key_size);
        let mut page1 = self.mem.allocate(size)?;
        let mut builder = RawBranchBuilder::new(&mut page1, division, self.fixed_key_size);
        builder.write_first_page(self.children[0].0, self.children[0].1);
        for i in 0..division {
            let key = &self.keys[i];
            builder.write_nth_key(
                key.as_ref(),
                self.children[i + 1].0,
                self.children[i + 1].1,
                i,
            );
        }
        drop(builder);

        // Second page: keys (division, end) with children [division + 1, end]
        let size = RawBranchBuilder::required_bytes(
            self.keys.len() - division - 1,
            second_split_key_len,
            self.fixed_key_size,
        );
        let mut page2 = self.mem.allocate(size)?;
        let mut builder = RawBranchBuilder::new(
            &mut page2,
            self.keys.len() - division - 1,
            self.fixed_key_size,
        );
        builder.write_first_page(self.children[division + 1].0, self.children[division + 1].1);
        for i in (division + 1)..self.keys.len() {
            let key = &self.keys[i];
            builder.write_nth_key(
                key.as_ref(),
                self.children[i + 1].0,
                self.children[i + 1].1,
                i - division - 1,
            );
        }
        drop(builder);

        Ok((page1, division_key, page2))
    }
}
1399
1400// Note the caller is responsible for ensuring that the buffer is large enough
1401// and rewriting all fields if any dynamically sized fields are written
1402// Layout is:
1403// 1 byte: type
1404// 1 byte: padding (padding to 16bits aligned)
1405// 2 bytes: num_keys (number of keys)
// 4 bytes: padding (padding to 64bits aligned)
1407// repeating (num_keys + 1 times):
1408// 16 bytes: child page checksum
1409// repeating (num_keys + 1 times):
1410// 8 bytes: page number
1411// (optional) repeating (num_keys times):
1412// * 4 bytes: key end. Ending offset of the key, exclusive
1413// repeating (num_keys times):
1414// * n bytes: key data
pub(super) struct RawBranchBuilder<'b> {
    page: &'b mut PageMut,
    // Byte length of every key when fixed-size; None when a key-end offset array is stored
    fixed_key_size: Option<usize>,
    num_keys: usize,
    keys_written: usize, // used for debugging
}
1421
impl<'b> RawBranchBuilder<'b> {
    /// Number of bytes needed to serialize a branch with `num_keys` keys
    /// totalling `size_of_keys` bytes of key data.
    pub(super) fn required_bytes(
        num_keys: usize,
        size_of_keys: usize,
        fixed_key_size: Option<usize>,
    ) -> usize {
        if fixed_key_size.is_none() {
            // Variable-size keys additionally need a u32 end offset per key
            let fixed_size = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1)
                + size_of::<u32>() * num_keys;
            size_of_keys + fixed_size
        } else {
            let fixed_size =
                8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1);
            size_of_keys + fixed_size
        }
    }

    // Caller MUST write num_keys values
    pub(super) fn new(
        page: &'b mut PageMut,
        num_keys: usize,
        fixed_key_size: Option<usize>,
    ) -> Self {
        assert!(num_keys > 0);
        page.memory_mut()[0] = BRANCH;
        // num_keys is stored as a little-endian u16 at bytes 2..4
        page.memory_mut()[2..4].copy_from_slice(&u16::try_from(num_keys).unwrap().to_le_bytes());
        #[cfg(debug_assertions)]
        {
            // Poison all the child pointers & key offsets, in case the caller forgets to write them
            let start = 8 + size_of::<Checksum>() * (num_keys + 1);
            let last = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1)
                + size_of::<u32>() * num_keys;
            for x in &mut page.memory_mut()[start..last] {
                *x = 0xFF;
            }
        }
        RawBranchBuilder {
            page,
            fixed_key_size,
            num_keys,
            keys_written: 0,
        }
    }

    /// Writes the checksum & page number for the left-most child (child 0).
    pub(super) fn write_first_page(&mut self, page_number: PageNumber, checksum: Checksum) {
        // Checksums start directly after the 8-byte header
        let offset = 8;
        self.page.memory_mut()[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        // Page numbers follow the full checksum array (num_keys + 1 entries)
        let offset = 8 + size_of::<Checksum>() * (self.num_keys + 1);
        self.page.memory_mut()[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());
    }

    /// Offset where key data begins: after the header, checksum and
    /// page-number arrays, and (for variable-size keys) the key-end array.
    fn key_section_start(&self) -> usize {
        let mut offset =
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1);
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_keys;
        }

        offset
    }

    /// Exclusive end offset of the nth key. For variable-size keys this reads
    /// back the offset recorded by write_nth_key, so key n must already have
    /// been written.
    fn key_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_key_size {
            return self.key_section_start() + fixed * (n + 1);
        }
        let offset = 8
            + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1)
            + size_of::<u32>() * n;
        u32::from_le_bytes(
            self.page.memory()[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    // Write the nth key and page of values greater than this key, but less than or equal to the next
    // Caller must write keys & pages in increasing order
    pub(super) fn write_nth_key(
        &mut self,
        key: &[u8],
        page_number: PageNumber,
        checksum: Checksum,
        n: usize,
    ) {
        assert!(n < self.num_keys);
        assert_eq!(n, self.keys_written);
        self.keys_written += 1;
        // Checksum slot for child n+1
        let offset = 8 + size_of::<Checksum>() * (n + 1);
        self.page.memory_mut()[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        // Page-number slot for child n+1
        let offset = 8
            + size_of::<Checksum>() * (self.num_keys + 1)
            + PageNumber::serialized_size() * (n + 1);
        self.page.memory_mut()[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());

        // Key data is appended immediately after the previous key
        let data_offset = if n > 0 {
            self.key_end(n - 1)
        } else {
            self.key_section_start()
        };
        if self.fixed_key_size.is_none() {
            // Record this key's end offset in the key-end array
            let offset = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1)
                + size_of::<u32>() * n;
            self.page.memory_mut()[offset..(offset + size_of::<u32>())].copy_from_slice(
                &u32::try_from(data_offset + key.len())
                    .unwrap()
                    .to_le_bytes(),
            );
        }

        debug_assert!(data_offset > offset);
        self.page.memory_mut()[data_offset..(data_offset + key.len())].copy_from_slice(key);
    }
}
1542
1543impl Drop for RawBranchBuilder<'_> {
1544    fn drop(&mut self) {
1545        if !thread::panicking() {
1546            assert_eq!(self.keys_written, self.num_keys);
1547        }
1548    }
1549}
1550
// In-place editor for an existing branch page: overwrites child pointers &
// checksums without changing the page's key structure.
pub(crate) struct BranchMutator<'b> {
    page: &'b mut PageMut,
}
1554
1555impl<'b> BranchMutator<'b> {
1556    pub(crate) fn new(page: &'b mut PageMut) -> Self {
1557        assert_eq!(page.memory()[0], BRANCH);
1558        Self { page }
1559    }
1560
1561    fn num_keys(&self) -> usize {
1562        u16::from_le_bytes(self.page.memory()[2..4].try_into().unwrap()) as usize
1563    }
1564
1565    pub(crate) fn write_child_page(
1566        &mut self,
1567        i: usize,
1568        page_number: PageNumber,
1569        checksum: Checksum,
1570    ) {
1571        debug_assert!(i <= self.num_keys());
1572        let offset = 8 + size_of::<Checksum>() * i;
1573        self.page.memory_mut()[offset..(offset + size_of::<Checksum>())]
1574            .copy_from_slice(&checksum.to_le_bytes());
1575        let offset =
1576            8 + size_of::<Checksum>() * (self.num_keys() + 1) + PageNumber::serialized_size() * i;
1577        self.page.memory_mut()[offset..(offset + PageNumber::serialized_size())]
1578            .copy_from_slice(&page_number.to_le_bytes());
1579    }
1580}