use crate::tree_store::page_store::{xxh3_checksum, Page, PageImpl, PageMut, TransactionalMemory};
use crate::tree_store::PageNumber;
use crate::types::{Key, MutInPlaceValue, Value};
use crate::{Result, StorageError};
use std::cmp::Ordering;
use std::marker::PhantomData;
use std::mem::size_of;
use std::ops::Range;
use std::sync::Arc;
use std::thread;

pub(crate) const LEAF: u8 = 1;
pub(crate) const BRANCH: u8 = 2;

pub(crate) type Checksum = u128;
pub(crate) const DEFERRED: Checksum = 999;

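// `leaf_checksum` and `branch_checksum` hash a page's contents with xxh3 up to the end of its
// last key/value, reporting corruption if the stored offsets are inconsistent. `DEFERRED` is a
// sentinel constant, which (inferred from its name) stands in for a checksum that has not been
// computed yet.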
pub(super) fn leaf_checksum<T: Page>(
    page: &T,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
) -> Result<Checksum, StorageError> {
    let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
    let last_pair = accessor.num_pairs().checked_sub(1).ok_or_else(|| {
        StorageError::Corrupted(format!(
            "Leaf page {:?} corrupted. Number of pairs is zero",
            page.get_page_number()
        ))
    })?;
    let end = accessor.value_end(last_pair).ok_or_else(|| {
        StorageError::Corrupted(format!(
            "Leaf page {:?} corrupted. Couldn't find offset for pair {}",
            page.get_page_number(),
            last_pair,
        ))
    })?;
    if end > page.memory().len() {
        Err(StorageError::Corrupted(format!(
            "Leaf page {:?} corrupted. Last offset {} beyond end of data {}",
            page.get_page_number(),
            end,
            page.memory().len()
        )))
    } else {
        Ok(xxh3_checksum(&page.memory()[..end]))
    }
}

pub(super) fn branch_checksum<T: Page>(
    page: &T,
    fixed_key_size: Option<usize>,
) -> Result<Checksum, StorageError> {
    let accessor = BranchAccessor::new(page, fixed_key_size);
    let last_key = accessor.num_keys().checked_sub(1).ok_or_else(|| {
        StorageError::Corrupted(format!(
            "Branch page {:?} corrupted. Number of keys is zero",
            page.get_page_number()
        ))
    })?;
    let end = accessor.key_end(last_key).ok_or_else(|| {
        StorageError::Corrupted(format!(
            "Branch page {:?} corrupted. Can't find offset for key {}",
            page.get_page_number(),
            last_key
        ))
    })?;
    if end > page.memory().len() {
        Err(StorageError::Corrupted(format!(
            "Branch page {:?} corrupted. Last offset {} beyond end of data {}",
            page.get_page_number(),
            end,
            page.memory().len()
        )))
    } else {
        Ok(xxh3_checksum(&page.memory()[..end]))
    }
}

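// Serialized b-tree root header: the root page number, its checksum, and the number of entries
// in the tree, stored little-endian in that order.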
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) struct BtreeHeader {
    pub(crate) root: PageNumber,
    pub(crate) checksum: Checksum,
    pub(crate) length: u64,
}

impl BtreeHeader {
    pub(crate) fn new(root: PageNumber, checksum: Checksum, length: u64) -> Self {
        Self {
            root,
            checksum,
            length,
        }
    }

    pub(crate) const fn serialized_size() -> usize {
        PageNumber::serialized_size() + size_of::<Checksum>() + size_of::<u64>()
    }

    pub(crate) fn from_le_bytes(bytes: [u8; Self::serialized_size()]) -> Self {
        let root =
            PageNumber::from_le_bytes(bytes[..PageNumber::serialized_size()].try_into().unwrap());
        let mut offset = PageNumber::serialized_size();
        let checksum = Checksum::from_le_bytes(
            bytes[offset..(offset + size_of::<Checksum>())]
                .try_into()
                .unwrap(),
        );
        offset += size_of::<Checksum>();
        let length = u64::from_le_bytes(
            bytes[offset..(offset + size_of::<u64>())]
                .try_into()
                .unwrap(),
        );

        Self {
            root,
            checksum,
            length,
        }
    }

    pub(crate) fn to_le_bytes(self) -> [u8; Self::serialized_size()] {
        let mut result = [0; Self::serialized_size()];
        result[..PageNumber::serialized_size()].copy_from_slice(&self.root.to_le_bytes());
        result[PageNumber::serialized_size()
            ..(PageNumber::serialized_size() + size_of::<Checksum>())]
            .copy_from_slice(&self.checksum.to_le_bytes());
        result[(PageNumber::serialized_size() + size_of::<Checksum>())..]
            .copy_from_slice(&self.length.to_le_bytes());

        result
    }
}

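// Support types for `AccessGuard`: `OnDrop` records whether dropping the guard should also
// remove the entry it points at, and `EitherPage` abstracts over the different backing buffers
// a guard can borrow its bytes from.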
enum OnDrop {
    None,
    RemoveEntry {
        position: usize,
        fixed_key_size: Option<usize>,
    },
}

enum EitherPage {
    Immutable(PageImpl),
    Mutable(PageMut),
    OwnedMemory(Vec<u8>),
    ArcMemory(Arc<[u8]>),
}

impl EitherPage {
    fn memory(&self) -> &[u8] {
        match self {
            EitherPage::Immutable(page) => page.memory(),
            EitherPage::Mutable(page) => page.memory(),
            EitherPage::OwnedMemory(mem) => mem.as_slice(),
            EitherPage::ArcMemory(mem) => mem,
        }
    }
}

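// Read guard over a single value: exposes a byte range of the backing page (or owned buffer)
// as `V`. When constructed with `remove_on_drop`, dropping the guard deletes the entry from
// the leaf via `LeafMutator`.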
pub struct AccessGuard<'a, V: Value + 'static> {
    page: EitherPage,
    offset: usize,
    len: usize,
    on_drop: OnDrop,
    _value_type: PhantomData<V>,
    _lifetime: PhantomData<&'a ()>,
}

impl<'a, V: Value + 'static> AccessGuard<'a, V> {
    pub(crate) fn with_page(page: PageImpl, range: Range<usize>) -> Self {
        Self {
            page: EitherPage::Immutable(page),
            offset: range.start,
            len: range.len(),
            on_drop: OnDrop::None,
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }

    pub(crate) fn with_arc_page(page: Arc<[u8]>, range: Range<usize>) -> Self {
        Self {
            page: EitherPage::ArcMemory(page),
            offset: range.start,
            len: range.len(),
            on_drop: OnDrop::None,
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }

    pub(crate) fn with_owned_value(value: Vec<u8>) -> Self {
        let len = value.len();
        Self {
            page: EitherPage::OwnedMemory(value),
            offset: 0,
            len,
            on_drop: OnDrop::None,
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }

    pub(super) fn remove_on_drop(
        page: PageMut,
        offset: usize,
        len: usize,
        position: usize,
        fixed_key_size: Option<usize>,
    ) -> Self {
        Self {
            page: EitherPage::Mutable(page),
            offset,
            len,
            on_drop: OnDrop::RemoveEntry {
                position,
                fixed_key_size,
            },
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }

    pub fn value(&self) -> V::SelfType<'_> {
        V::from_bytes(&self.page.memory()[self.offset..(self.offset + self.len)])
    }
}

impl<'a, V: Value + 'static> Drop for AccessGuard<'a, V> {
    fn drop(&mut self) {
        match self.on_drop {
            OnDrop::None => {}
            OnDrop::RemoveEntry {
                position,
                fixed_key_size,
            } => {
                if let EitherPage::Mutable(ref mut mut_page) = self.page {
                    let mut mutator = LeafMutator::new(mut_page, fixed_key_size, V::fixed_width());
                    mutator.remove(position);
                } else if !thread::panicking() {
                    unreachable!();
                }
            }
        }
    }
}

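// Write guard over a single value: exposes the value's bytes in a mutable leaf page so the
// caller can modify them in place (see the `AsMut` impl for `MutInPlaceValue` types below).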
pub struct AccessGuardMut<'a, V: Value + 'static> {
    page: PageMut,
    offset: usize,
    len: usize,
    _value_type: PhantomData<V>,
    _lifetime: PhantomData<&'a ()>,
}

impl<'a, V: Value + 'static> AccessGuardMut<'a, V> {
    pub(crate) fn new(page: PageMut, offset: usize, len: usize) -> Self {
        AccessGuardMut {
            page,
            offset,
            len,
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }
}

impl<'a, V: MutInPlaceValue + 'static> AsMut<V::BaseRefType> for AccessGuardMut<'a, V> {
    fn as_mut(&mut self) -> &mut V::BaseRefType {
        V::from_bytes_mut(&mut self.page.memory_mut()[self.offset..(self.offset + self.len)])
    }
}

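// Zero-copy view of a single key/value pair borrowed from a leaf page.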
pub struct EntryAccessor<'a> {
    key: &'a [u8],
    value: &'a [u8],
}

impl<'a> EntryAccessor<'a> {
    fn new(key: &'a [u8], value: &'a [u8]) -> Self {
        EntryAccessor { key, value }
    }
}

impl<'a: 'b, 'b> EntryAccessor<'a> {
    pub(crate) fn key(&'b self) -> &'a [u8] {
        self.key
    }

    pub(crate) fn value(&'b self) -> &'a [u8] {
        self.value
    }
}

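// Reads a leaf page. Its layout, as implied by the offsets used below:
//   1 byte: page type (LEAF)
//   1 byte: unused padding
//   2 bytes: num_pairs (u16, little-endian)
//   (only if keys are variable width) num_pairs x 4 bytes: key_end offsets (u32, little-endian)
//   (only if values are variable width) num_pairs x 4 bytes: value_end offsets (u32, little-endian)
//   key data for all pairs, followed by value data for all pairs
// All stored offsets are absolute positions within the page, so entry n spans
// key_end(n-1)..key_end(n) and value_end(n-1)..value_end(n).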
pub(crate) struct LeafAccessor<'a> {
    page: &'a [u8],
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    num_pairs: usize,
}

impl<'a> LeafAccessor<'a> {
    pub(crate) fn new(
        page: &'a [u8],
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        debug_assert_eq!(page[0], LEAF);
        let num_pairs = u16::from_le_bytes(page[2..4].try_into().unwrap()) as usize;
        LeafAccessor {
            page,
            fixed_key_size,
            fixed_value_size,
            num_pairs,
        }
    }

    pub(super) fn print_node<K: Key, V: Value>(&self, include_value: bool) {
        let mut i = 0;
        while let Some(entry) = self.entry(i) {
            eprint!(" key_{i}={:?}", K::from_bytes(entry.key()));
            if include_value {
                eprint!(" value_{i}={:?}", V::from_bytes(entry.value()));
            }
            i += 1;
        }
    }

    pub(crate) fn position<K: Key>(&self, query: &[u8]) -> (usize, bool) {
        let mut min_entry = 0;
        let mut max_entry = self.num_pairs();
        while min_entry < max_entry {
            let mid = (min_entry + max_entry) / 2;
            let key = self.key_unchecked(mid);
            match K::compare(query, key) {
                Ordering::Less => {
                    max_entry = mid;
                }
                Ordering::Equal => {
                    return (mid, true);
                }
                Ordering::Greater => {
                    min_entry = mid + 1;
                }
            }
        }
        debug_assert_eq!(min_entry, max_entry);
        (min_entry, false)
    }

    pub(crate) fn find_key<K: Key>(&self, query: &[u8]) -> Option<usize> {
        let (entry, found) = self.position::<K>(query);
        if found {
            Some(entry)
        } else {
            None
        }
    }

    fn key_section_start(&self) -> usize {
        let mut offset = 4;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        if self.fixed_value_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }

        offset
    }

    fn key_start(&self, n: usize) -> Option<usize> {
        if n == 0 {
            Some(self.key_section_start())
        } else {
            self.key_end(n - 1)
        }
    }

    fn key_end(&self, n: usize) -> Option<usize> {
        if n >= self.num_pairs() {
            None
        } else {
            if let Some(fixed) = self.fixed_key_size {
                return Some(self.key_section_start() + fixed * (n + 1));
            }
            let offset = 4 + size_of::<u32>() * n;
            let end = u32::from_le_bytes(
                self.page
                    .get(offset..(offset + size_of::<u32>()))?
                    .try_into()
                    .unwrap(),
            ) as usize;
            Some(end)
        }
    }

    fn value_start(&self, n: usize) -> Option<usize> {
        if n == 0 {
            self.key_end(self.num_pairs() - 1)
        } else {
            self.value_end(n - 1)
        }
    }

    fn value_end(&self, n: usize) -> Option<usize> {
        if n >= self.num_pairs() {
            None
        } else {
            if let Some(fixed) = self.fixed_value_size {
                return Some(self.key_end(self.num_pairs.checked_sub(1)?)? + fixed * (n + 1));
            }
            let mut offset = 4 + size_of::<u32>() * n;
            if self.fixed_key_size.is_none() {
                offset += size_of::<u32>() * self.num_pairs;
            }
            let end = u32::from_le_bytes(
                self.page
                    .get(offset..(offset + size_of::<u32>()))?
                    .try_into()
                    .unwrap(),
            ) as usize;
            Some(end)
        }
    }

    pub(crate) fn num_pairs(&self) -> usize {
        self.num_pairs
    }

    pub(super) fn offset_of_first_value(&self) -> usize {
        self.offset_of_value(0).unwrap()
    }

    pub(super) fn offset_of_value(&self, n: usize) -> Option<usize> {
        self.value_start(n)
    }

    pub(super) fn value_range(&self, n: usize) -> Option<(usize, usize)> {
        Some((self.value_start(n)?, self.value_end(n)?))
    }

    pub(crate) fn length_of_pairs(&self, start: usize, end: usize) -> usize {
        self.length_of_values(start, end) + self.length_of_keys(start, end)
    }

    fn length_of_values(&self, start: usize, end: usize) -> usize {
        if end == 0 {
            return 0;
        }
        let end_offset = self.value_end(end - 1).unwrap();
        let start_offset = self.value_start(start).unwrap();
        end_offset - start_offset
    }

    pub(crate) fn length_of_keys(&self, start: usize, end: usize) -> usize {
        if end == 0 {
            return 0;
        }
        let end_offset = self.key_end(end - 1).unwrap();
        let start_offset = self.key_start(start).unwrap();
        end_offset - start_offset
    }

    pub(crate) fn total_length(&self) -> usize {
        self.value_end(self.num_pairs() - 1).unwrap()
    }

    fn key_unchecked(&self, n: usize) -> &[u8] {
        &self.page[self.key_start(n).unwrap()..self.key_end(n).unwrap()]
    }

    pub(crate) fn entry(&self, n: usize) -> Option<EntryAccessor<'a>> {
        let key = &self.page[self.key_start(n)?..self.key_end(n)?];
        let value = &self.page[self.value_start(n)?..self.value_end(n)?];
        Some(EntryAccessor::new(key, value))
    }

    pub(crate) fn entry_ranges(&self, n: usize) -> Option<(Range<usize>, Range<usize>)> {
        let key = self.key_start(n)?..self.key_end(n)?;
        let value = self.value_start(n)?..self.value_end(n)?;
        Some((key, value))
    }

    pub(super) fn last_entry(&self) -> EntryAccessor<'a> {
        self.entry(self.num_pairs() - 1).unwrap()
    }
}

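// Builds a new leaf from a list of key/value pairs, allocating the page(s) from the
// transaction's memory. `should_split` reports whether the pairs exceed a single page, and
// `build_split` then writes two pages and returns the key on which they were divided.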
pub(super) struct LeafBuilder<'a, 'b> {
    pairs: Vec<(&'a [u8], &'a [u8])>,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    total_key_bytes: usize,
    total_value_bytes: usize,
    mem: &'b TransactionalMemory,
}

impl<'a, 'b> LeafBuilder<'a, 'b> {
    pub(super) fn required_bytes(&self, num_pairs: usize, keys_values_bytes: usize) -> usize {
        RawLeafBuilder::required_bytes(
            num_pairs,
            keys_values_bytes,
            self.fixed_key_size,
            self.fixed_value_size,
        )
    }

    pub(super) fn new(
        mem: &'b TransactionalMemory,
        capacity: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        Self {
            pairs: Vec::with_capacity(capacity),
            fixed_key_size,
            fixed_value_size,
            total_key_bytes: 0,
            total_value_bytes: 0,
            mem,
        }
    }

    pub(super) fn push(&mut self, key: &'a [u8], value: &'a [u8]) {
        self.total_key_bytes += key.len();
        self.total_value_bytes += value.len();
        self.pairs.push((key, value));
    }

    pub(super) fn push_all_except(
        &mut self,
        accessor: &'a LeafAccessor<'_>,
        except: Option<usize>,
    ) {
        for i in 0..accessor.num_pairs() {
            if let Some(except) = except {
                if except == i {
                    continue;
                }
            }
            let entry = accessor.entry(i).unwrap();
            self.push(entry.key(), entry.value());
        }
    }

    pub(super) fn should_split(&self) -> bool {
        let required_size = self.required_bytes(
            self.pairs.len(),
            self.total_key_bytes + self.total_value_bytes,
        );
        required_size > self.mem.get_page_size() && self.pairs.len() > 1
    }

    pub(super) fn build_split(self) -> Result<(PageMut, &'a [u8], PageMut)> {
        let total_size = self.total_key_bytes + self.total_value_bytes;
        let mut division = 0;
        let mut first_split_key_bytes = 0;
        let mut first_split_value_bytes = 0;
        for (key, value) in self.pairs.iter().take(self.pairs.len() - 1) {
            first_split_key_bytes += key.len();
            first_split_value_bytes += value.len();
            division += 1;
            if first_split_key_bytes + first_split_value_bytes >= total_size / 2 {
                break;
            }
        }

        let required_size =
            self.required_bytes(division, first_split_key_bytes + first_split_value_bytes);
        let mut page1 = self.mem.allocate(required_size)?;
        let mut builder = RawLeafBuilder::new(
            page1.memory_mut(),
            division,
            self.fixed_key_size,
            self.fixed_value_size,
            first_split_key_bytes,
        );
        for (key, value) in self.pairs.iter().take(division) {
            builder.append(key, value);
        }
        drop(builder);

        let required_size = self.required_bytes(
            self.pairs.len() - division,
            self.total_key_bytes + self.total_value_bytes
                - first_split_key_bytes
                - first_split_value_bytes,
        );
        let mut page2 = self.mem.allocate(required_size)?;
        let mut builder = RawLeafBuilder::new(
            page2.memory_mut(),
            self.pairs.len() - division,
            self.fixed_key_size,
            self.fixed_value_size,
            self.total_key_bytes - first_split_key_bytes,
        );
        for (key, value) in &self.pairs[division..] {
            builder.append(key, value);
        }
        drop(builder);

        Ok((page1, self.pairs[division - 1].0, page2))
    }

    pub(super) fn build(self) -> Result<PageMut> {
        let required_size = self.required_bytes(
            self.pairs.len(),
            self.total_key_bytes + self.total_value_bytes,
        );
        let mut page = self.mem.allocate(required_size)?;
        let mut builder = RawLeafBuilder::new(
            page.memory_mut(),
            self.pairs.len(),
            self.fixed_key_size,
            self.fixed_value_size,
            self.total_key_bytes,
        );
        for (key, value) in self.pairs {
            builder.append(key, value);
        }
        drop(builder);
        Ok(page)
    }
}

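// Serializes a leaf page directly into a byte buffer, writing the layout that `LeafAccessor`
// reads (see the layout notes above it). Pairs are written in the order they are appended, so
// callers are responsible for supplying them in sorted key order. The `Drop` impl asserts that
// exactly `num_pairs` pairs were appended and that the provisioned key bytes were fully used.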
pub(crate) struct RawLeafBuilder<'a> {
    page: &'a mut [u8],
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    num_pairs: usize,
    provisioned_key_bytes: usize,
    pairs_written: usize,
}

impl<'a> RawLeafBuilder<'a> {
    pub(crate) fn required_bytes(
        num_pairs: usize,
        keys_values_bytes: usize,
        key_size: Option<usize>,
        value_size: Option<usize>,
    ) -> usize {
        let mut result = 4;
        if key_size.is_none() {
            result += num_pairs * size_of::<u32>();
        }
        if value_size.is_none() {
            result += num_pairs * size_of::<u32>();
        }
        result += keys_values_bytes;

        result
    }

    pub(crate) fn new(
        page: &'a mut [u8],
        num_pairs: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
        key_bytes: usize,
    ) -> Self {
        page[0] = LEAF;
        page[2..4].copy_from_slice(&u16::try_from(num_pairs).unwrap().to_le_bytes());
        #[cfg(debug_assertions)]
        {
            let mut last = 4;
            if fixed_key_size.is_none() {
                last += size_of::<u32>() * num_pairs;
            }
            if fixed_value_size.is_none() {
                last += size_of::<u32>() * num_pairs;
            }
            for x in &mut page[4..last] {
                *x = 0xFF;
            }
        }
        RawLeafBuilder {
            page,
            fixed_key_size,
            fixed_value_size,
            num_pairs,
            provisioned_key_bytes: key_bytes,
            pairs_written: 0,
        }
    }

    fn value_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_value_size {
            return self.key_section_start() + self.provisioned_key_bytes + fixed * (n + 1);
        }
        let mut offset = 4 + size_of::<u32>() * n;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    fn key_section_start(&self) -> usize {
        let mut offset = 4;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        if self.fixed_value_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }

        offset
    }

    fn key_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_key_size {
            return self.key_section_start() + fixed * (n + 1);
        }
        let offset = 4 + size_of::<u32>() * n;
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    pub(crate) fn append(&mut self, key: &[u8], value: &[u8]) {
        if let Some(key_width) = self.fixed_key_size {
            assert_eq!(key_width, key.len());
        }
        if let Some(value_width) = self.fixed_value_size {
            assert_eq!(value_width, value.len());
        }
        let key_offset = if self.pairs_written == 0 {
            self.key_section_start()
        } else {
            self.key_end(self.pairs_written - 1)
        };
        let value_offset = if self.pairs_written == 0 {
            self.key_section_start() + self.provisioned_key_bytes
        } else {
            self.value_end(self.pairs_written - 1)
        };

        let n = self.pairs_written;
        if self.fixed_key_size.is_none() {
            let offset = 4 + size_of::<u32>() * n;
            self.page[offset..(offset + size_of::<u32>())]
                .copy_from_slice(&u32::try_from(key_offset + key.len()).unwrap().to_le_bytes());
        }
        self.page[key_offset..(key_offset + key.len())].copy_from_slice(key);
        let written_key_len = key_offset + key.len() - self.key_section_start();
        assert!(written_key_len <= self.provisioned_key_bytes);

        if self.fixed_value_size.is_none() {
            let mut offset = 4 + size_of::<u32>() * n;
            if self.fixed_key_size.is_none() {
                offset += size_of::<u32>() * self.num_pairs;
            }
            self.page[offset..(offset + size_of::<u32>())].copy_from_slice(
                &u32::try_from(value_offset + value.len())
                    .unwrap()
                    .to_le_bytes(),
            );
        }
        self.page[value_offset..(value_offset + value.len())].copy_from_slice(value);
        self.pairs_written += 1;
    }
}

impl<'a> Drop for RawLeafBuilder<'a> {
    fn drop(&mut self) {
        if !thread::panicking() {
            assert_eq!(self.pairs_written, self.num_pairs);
            assert_eq!(
                self.key_section_start() + self.provisioned_key_bytes,
                self.key_end(self.num_pairs - 1)
            );
        }
    }
}

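// Edits an existing leaf page in place: `insert` overwrites a pair or splices in a new one,
// and `remove` deletes one, shifting the offset tables and the key/value data as needed.
// `sufficient_insert_inplace_space` checks up front whether such an edit fits in the page.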
pub(crate) struct LeafMutator<'b> {
    page: &'b mut PageMut,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
}

impl<'b> LeafMutator<'b> {
    pub(crate) fn new(
        page: &'b mut PageMut,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        assert_eq!(page.memory_mut()[0], LEAF);
        Self {
            page,
            fixed_key_size,
            fixed_value_size,
        }
    }

    pub(super) fn sufficient_insert_inplace_space(
        page: &'_ PageImpl,
        position: usize,
        overwrite: bool,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
        new_key: &[u8],
        new_value: &[u8],
    ) -> bool {
        let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
        if overwrite {
            let remaining = page.memory().len() - accessor.total_length();
            let required_delta = isize::try_from(new_key.len() + new_value.len()).unwrap()
                - isize::try_from(accessor.length_of_pairs(position, position + 1)).unwrap();
            required_delta <= isize::try_from(remaining).unwrap()
        } else {
            if page.get_page_number().page_order > 0 && position < accessor.num_pairs() {
                return false;
            }
            let remaining = page.memory().len() - accessor.total_length();
            let mut required_delta = new_key.len() + new_value.len();
            if fixed_key_size.is_none() {
                required_delta += size_of::<u32>();
            }
            if fixed_value_size.is_none() {
                required_delta += size_of::<u32>();
            }
            required_delta <= remaining
        }
    }

    pub(crate) fn insert(&mut self, i: usize, overwrite: bool, key: &[u8], value: &[u8]) {
        let accessor = LeafAccessor::new(
            self.page.memory(),
            self.fixed_key_size,
            self.fixed_value_size,
        );
        let required_delta = if overwrite {
            isize::try_from(key.len() + value.len()).unwrap()
                - isize::try_from(accessor.length_of_pairs(i, i + 1)).unwrap()
        } else {
            let mut delta = key.len() + value.len();
            if self.fixed_key_size.is_none() {
                delta += size_of::<u32>();
            }
            if self.fixed_value_size.is_none() {
                delta += size_of::<u32>();
            }
            delta.try_into().unwrap()
        };
        assert!(
            isize::try_from(accessor.total_length()).unwrap() + required_delta
                <= isize::try_from(self.page.memory().len()).unwrap()
        );

        let num_pairs = accessor.num_pairs();
        let last_key_end = accessor.key_end(accessor.num_pairs() - 1).unwrap();
        let last_value_end = accessor.value_end(accessor.num_pairs() - 1).unwrap();
        let shift_index = if overwrite { i + 1 } else { i };
        let shift_key_start = accessor.key_start(shift_index).unwrap_or(last_key_end);
        let shift_value_start = accessor.value_start(shift_index).unwrap_or(last_value_end);
        let existing_value_len = accessor
            .value_range(i)
            .map(|(start, end)| end - start)
            .unwrap_or_default();

        let value_delta = if overwrite {
            isize::try_from(value.len()).unwrap() - isize::try_from(existing_value_len).unwrap()
        } else {
            value.len().try_into().unwrap()
        };

        let key_ptr_size: usize = if self.fixed_key_size.is_none() { 4 } else { 0 };
        let value_ptr_size: usize = if self.fixed_value_size.is_none() {
            4
        } else {
            0
        };
        if !overwrite {
            for j in 0..i {
                self.update_key_end(j, (key_ptr_size + value_ptr_size).try_into().unwrap());
                let value_delta: isize = (key_ptr_size + value_ptr_size + key.len())
                    .try_into()
                    .unwrap();
                self.update_value_end(j, value_delta);
            }
        }
        for j in i..num_pairs {
            if overwrite {
                self.update_value_end(j, value_delta);
            } else {
                let key_delta: isize = (key_ptr_size + value_ptr_size + key.len())
                    .try_into()
                    .unwrap();
                self.update_key_end(j, key_delta);
                let value_delta = key_delta + isize::try_from(value.len()).unwrap();
                self.update_value_end(j, value_delta);
            }
        }

        let new_num_pairs = if overwrite { num_pairs } else { num_pairs + 1 };
        self.page.memory_mut()[2..4]
            .copy_from_slice(&u16::try_from(new_num_pairs).unwrap().to_le_bytes());

        let mut dest = if overwrite {
            (isize::try_from(shift_value_start).unwrap() + value_delta)
                .try_into()
                .unwrap()
        } else {
            shift_value_start + key_ptr_size + value_ptr_size + key.len() + value.len()
        };
        let start = shift_value_start;
        let end = last_value_end;
        self.page.memory_mut().copy_within(start..end, dest);

        let inserted_value_end: u32 = dest.try_into().unwrap();
        dest -= value.len();
        self.page.memory_mut()[dest..(dest + value.len())].copy_from_slice(value);

        if !overwrite {
            let start = shift_key_start;
            let end = shift_value_start;
            dest -= end - start;
            self.page.memory_mut().copy_within(start..end, dest);

            let inserted_key_end: u32 = dest.try_into().unwrap();
            dest -= key.len();
            self.page.memory_mut()[dest..(dest + key.len())].copy_from_slice(key);

            let start = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
            let end = shift_key_start;
            dest -= end - start;
            debug_assert_eq!(
                dest,
                4 + key_ptr_size * new_num_pairs + value_ptr_size * (i + 1)
            );
            self.page.memory_mut().copy_within(start..end, dest);

            if self.fixed_value_size.is_none() {
                dest -= size_of::<u32>();
                self.page.memory_mut()[dest..(dest + size_of::<u32>())]
                    .copy_from_slice(&inserted_value_end.to_le_bytes());
            }

            let start = 4 + key_ptr_size * i;
            let end = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
            dest -= end - start;
            debug_assert_eq!(dest, 4 + key_ptr_size * (i + 1));
            self.page.memory_mut().copy_within(start..end, dest);

            if self.fixed_key_size.is_none() {
                dest -= size_of::<u32>();
                self.page.memory_mut()[dest..(dest + size_of::<u32>())]
                    .copy_from_slice(&inserted_key_end.to_le_bytes());
            }
            debug_assert_eq!(dest, 4 + key_ptr_size * i);
        }
    }

    pub(super) fn remove(&mut self, i: usize) {
        let accessor = LeafAccessor::new(
            self.page.memory(),
            self.fixed_key_size,
            self.fixed_value_size,
        );
        let num_pairs = accessor.num_pairs();
        assert!(i < num_pairs);
        assert!(num_pairs > 1);
        let key_start = accessor.key_start(i).unwrap();
        let key_end = accessor.key_end(i).unwrap();
        let value_start = accessor.value_start(i).unwrap();
        let value_end = accessor.value_end(i).unwrap();
        let last_value_end = accessor.value_end(accessor.num_pairs() - 1).unwrap();

        let key_ptr_size = if self.fixed_key_size.is_none() {
            size_of::<u32>()
        } else {
            0
        };
        let value_ptr_size = if self.fixed_value_size.is_none() {
            size_of::<u32>()
        } else {
            0
        };
        for j in 0..i {
            self.update_key_end(j, -isize::try_from(key_ptr_size + value_ptr_size).unwrap());
            let value_delta = -isize::try_from(key_ptr_size + value_ptr_size).unwrap()
                - isize::try_from(key_end - key_start).unwrap();
            self.update_value_end(j, value_delta);
        }
        for j in (i + 1)..num_pairs {
            let key_delta = -isize::try_from(key_ptr_size + value_ptr_size).unwrap()
                - isize::try_from(key_end - key_start).unwrap();
            self.update_key_end(j, key_delta);
            let value_delta = key_delta - isize::try_from(value_end - value_start).unwrap();
            self.update_value_end(j, value_delta);
        }

        let new_num_pairs = num_pairs - 1;
        self.page.memory_mut()[2..4]
            .copy_from_slice(&u16::try_from(new_num_pairs).unwrap().to_le_bytes());
        let mut dest = 4 + key_ptr_size * i;
        let start = 4 + key_ptr_size * (i + 1);
        let end = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
        self.page.memory_mut().copy_within(start..end, dest);
        dest += end - start;
        debug_assert_eq!(dest, 4 + key_ptr_size * new_num_pairs + value_ptr_size * i);

        let start = 4 + key_ptr_size * num_pairs + value_ptr_size * (i + 1);
        let end = key_start;
        self.page.memory_mut().copy_within(start..end, dest);
        dest += end - start;

        let preceding_key_len = key_start - (4 + (key_ptr_size + value_ptr_size) * num_pairs);
        debug_assert_eq!(
            dest,
            4 + (key_ptr_size + value_ptr_size) * new_num_pairs + preceding_key_len
        );

        let start = key_end;
        let end = value_start;
        self.page.memory_mut().copy_within(start..end, dest);
        dest += end - start;

        let preceding_data_len =
            value_start - (4 + (key_ptr_size + value_ptr_size) * num_pairs) - (key_end - key_start);
        debug_assert_eq!(
            dest,
            4 + (key_ptr_size + value_ptr_size) * new_num_pairs + preceding_data_len
        );
        let start = value_end;
        let end = last_value_end;
        self.page.memory_mut().copy_within(start..end, dest);
    }

    fn update_key_end(&mut self, i: usize, delta: isize) {
        if self.fixed_key_size.is_some() {
            return;
        }
        let offset = 4 + size_of::<u32>() * i;
        let mut ptr = u32::from_le_bytes(
            self.page.memory()[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        );
        ptr = (isize::try_from(ptr).unwrap() + delta).try_into().unwrap();
        self.page.memory_mut()[offset..(offset + size_of::<u32>())]
            .copy_from_slice(&ptr.to_le_bytes());
    }

    fn update_value_end(&mut self, i: usize, delta: isize) {
        if self.fixed_value_size.is_some() {
            return;
        }
        let accessor = LeafAccessor::new(
            self.page.memory(),
            self.fixed_key_size,
            self.fixed_value_size,
        );
        let num_pairs = accessor.num_pairs();
        let mut offset = 4 + size_of::<u32>() * i;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * num_pairs;
        }
        let mut ptr = u32::from_le_bytes(
            self.page.memory()[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        );
        ptr = (isize::try_from(ptr).unwrap() + delta).try_into().unwrap();
        self.page.memory_mut()[offset..(offset + size_of::<u32>())]
            .copy_from_slice(&ptr.to_le_bytes());
    }
}

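// Reads a branch (internal) page. Its layout, as implied by the offsets used below:
//   1 byte: page type (BRANCH)
//   1 byte: unused padding
//   2 bytes: num_keys (u16, little-endian)
//   4 bytes: unused padding (child data starts at offset 8)
//   (num_keys + 1) x 16 bytes: child checksums
//   (num_keys + 1) x PageNumber::serialized_size() bytes: child page numbers
//   (only if keys are variable width) num_keys x 4 bytes: key_end offsets (u32, little-endian)
//   key data
// `child_for_key` descends into child i for the smallest i with query <= key i, and into the
// last child when the query is greater than every key.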
pub(crate) struct BranchAccessor<'a: 'b, 'b, T: Page + 'a> {
    page: &'b T,
    num_keys: usize,
    fixed_key_size: Option<usize>,
    _page_lifetime: PhantomData<&'a ()>,
}

impl<'a: 'b, 'b, T: Page + 'a> BranchAccessor<'a, 'b, T> {
    pub(crate) fn new(page: &'b T, fixed_key_size: Option<usize>) -> Self {
        debug_assert_eq!(page.memory()[0], BRANCH);
        let num_keys = u16::from_le_bytes(page.memory()[2..4].try_into().unwrap()) as usize;
        BranchAccessor {
            page,
            num_keys,
            fixed_key_size,
            _page_lifetime: Default::default(),
        }
    }

    pub(super) fn print_node<K: Key>(&self) {
        eprint!(
            "Internal[ (page={:?}), child_0={:?}",
            self.page.get_page_number(),
            self.child_page(0).unwrap()
        );
        for i in 0..(self.count_children() - 1) {
            if let Some(child) = self.child_page(i + 1) {
                let key = self.key(i).unwrap();
                eprint!(" key_{i}={:?}", K::from_bytes(key));
                eprint!(" child_{}={child:?}", i + 1);
            }
        }
        eprint!("]");
    }

    pub(crate) fn total_length(&self) -> usize {
        self.key_end(self.num_keys() - 1).unwrap()
    }

    pub(super) fn child_for_key<K: Key>(&self, query: &[u8]) -> (usize, PageNumber) {
        let mut min_child = 0;
        let mut max_child = self.num_keys();
        while min_child < max_child {
            let mid = (min_child + max_child) / 2;
            match K::compare(query, self.key(mid).unwrap()) {
                Ordering::Less => {
                    max_child = mid;
                }
                Ordering::Equal => {
                    return (mid, self.child_page(mid).unwrap());
                }
                Ordering::Greater => {
                    min_child = mid + 1;
                }
            }
        }
        debug_assert_eq!(min_child, max_child);

        (min_child, self.child_page(min_child).unwrap())
    }

    fn key_section_start(&self) -> usize {
        if self.fixed_key_size.is_none() {
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
                + size_of::<u32>() * self.num_keys()
        } else {
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
        }
    }

    fn key_offset(&self, n: usize) -> usize {
        if n == 0 {
            self.key_section_start()
        } else {
            self.key_end(n - 1).unwrap()
        }
    }

    fn key_end(&self, n: usize) -> Option<usize> {
        if let Some(fixed) = self.fixed_key_size {
            return Some(self.key_section_start() + fixed * (n + 1));
        }
        let offset = 8
            + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
            + size_of::<u32>() * n;
        Some(u32::from_le_bytes(
            self.page
                .memory()
                .get(offset..(offset + size_of::<u32>()))?
                .try_into()
                .unwrap(),
        ) as usize)
    }

    pub(super) fn key(&self, n: usize) -> Option<&[u8]> {
        if n >= self.num_keys() {
            return None;
        }
        let offset = self.key_offset(n);
        let end = self.key_end(n)?;
        Some(&self.page.memory()[offset..end])
    }

    pub(crate) fn count_children(&self) -> usize {
        self.num_keys() + 1
    }

    pub(crate) fn child_checksum(&self, n: usize) -> Option<Checksum> {
        if n >= self.count_children() {
            return None;
        }

        let offset = 8 + size_of::<Checksum>() * n;
        Some(Checksum::from_le_bytes(
            self.page.memory()[offset..(offset + size_of::<Checksum>())]
                .try_into()
                .unwrap(),
        ))
    }

    pub(crate) fn child_page(&self, n: usize) -> Option<PageNumber> {
        if n >= self.count_children() {
            return None;
        }

        let offset =
            8 + size_of::<Checksum>() * self.count_children() + PageNumber::serialized_size() * n;
        Some(PageNumber::from_le_bytes(
            self.page.memory()[offset..(offset + PageNumber::serialized_size())]
                .try_into()
                .unwrap(),
        ))
    }

    fn num_keys(&self) -> usize {
        self.num_keys
    }
}

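// Builds a new branch page from child (page number, checksum) pairs and the keys that separate
// them; there is always exactly one more child than keys. `build_split` divides the children
// across two pages and returns the key to promote into the parent.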
pub(super) struct BranchBuilder<'a, 'b> {
    children: Vec<(PageNumber, Checksum)>,
    keys: Vec<&'a [u8]>,
    total_key_bytes: usize,
    fixed_key_size: Option<usize>,
    mem: &'b TransactionalMemory,
}

impl<'a, 'b> BranchBuilder<'a, 'b> {
    pub(super) fn new(
        mem: &'b TransactionalMemory,
        child_capacity: usize,
        fixed_key_size: Option<usize>,
    ) -> Self {
        Self {
            children: Vec::with_capacity(child_capacity),
            keys: Vec::with_capacity(child_capacity - 1),
            total_key_bytes: 0,
            fixed_key_size,
            mem,
        }
    }

    pub(super) fn replace_child(&mut self, index: usize, child: PageNumber, checksum: Checksum) {
        self.children[index] = (child, checksum);
    }

    pub(super) fn push_child(&mut self, child: PageNumber, checksum: Checksum) {
        self.children.push((child, checksum));
    }

    pub(super) fn push_key(&mut self, key: &'a [u8]) {
        self.keys.push(key);
        self.total_key_bytes += key.len();
    }

    pub(super) fn push_all<T: Page>(&mut self, accessor: &'a BranchAccessor<'_, '_, T>) {
        for i in 0..accessor.count_children() {
            let child = accessor.child_page(i).unwrap();
            let checksum = accessor.child_checksum(i).unwrap();
            self.push_child(child, checksum);
        }
        for i in 0..(accessor.count_children() - 1) {
            self.push_key(accessor.key(i).unwrap());
        }
    }

    pub(super) fn to_single_child(&self) -> Option<(PageNumber, Checksum)> {
        if self.children.len() > 1 {
            None
        } else {
            Some(self.children[0])
        }
    }

    pub(super) fn build(self) -> Result<PageMut> {
        assert_eq!(self.children.len(), self.keys.len() + 1);
        let size = RawBranchBuilder::required_bytes(
            self.keys.len(),
            self.total_key_bytes,
            self.fixed_key_size,
        );
        let mut page = self.mem.allocate(size)?;
        let mut builder = RawBranchBuilder::new(&mut page, self.keys.len(), self.fixed_key_size);
        builder.write_first_page(self.children[0].0, self.children[0].1);
        for i in 1..self.children.len() {
            let key = &self.keys[i - 1];
            builder.write_nth_key(key.as_ref(), self.children[i].0, self.children[i].1, i - 1);
        }
        drop(builder);

        Ok(page)
    }

    pub(super) fn should_split(&self) -> bool {
        let size = RawBranchBuilder::required_bytes(
            self.keys.len(),
            self.total_key_bytes,
            self.fixed_key_size,
        );
        size > self.mem.get_page_size() && self.keys.len() >= 3
    }

    pub(super) fn build_split(self) -> Result<(PageMut, &'a [u8], PageMut)> {
        assert_eq!(self.children.len(), self.keys.len() + 1);
        assert!(self.keys.len() >= 3);
        let division = self.keys.len() / 2;
        let first_split_key_len: usize = self.keys.iter().take(division).map(|k| k.len()).sum();
        let division_key = self.keys[division];
        let second_split_key_len = self.total_key_bytes - first_split_key_len - division_key.len();

        let size =
            RawBranchBuilder::required_bytes(division, first_split_key_len, self.fixed_key_size);
        let mut page1 = self.mem.allocate(size)?;
        let mut builder = RawBranchBuilder::new(&mut page1, division, self.fixed_key_size);
        builder.write_first_page(self.children[0].0, self.children[0].1);
        for i in 0..division {
            let key = &self.keys[i];
            builder.write_nth_key(
                key.as_ref(),
                self.children[i + 1].0,
                self.children[i + 1].1,
                i,
            );
        }
        drop(builder);

        let size = RawBranchBuilder::required_bytes(
            self.keys.len() - division - 1,
            second_split_key_len,
            self.fixed_key_size,
        );
        let mut page2 = self.mem.allocate(size)?;
        let mut builder = RawBranchBuilder::new(
            &mut page2,
            self.keys.len() - division - 1,
            self.fixed_key_size,
        );
        builder.write_first_page(self.children[division + 1].0, self.children[division + 1].1);
        for i in (division + 1)..self.keys.len() {
            let key = &self.keys[i];
            builder.write_nth_key(
                key.as_ref(),
                self.children[i + 1].0,
                self.children[i + 1].1,
                i - division - 1,
            );
        }
        drop(builder);

        Ok((page1, division_key, page2))
    }
}

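// Serializes a branch page directly into a `PageMut`, using the layout that `BranchAccessor`
// reads (see the layout notes above it). `write_first_page` stores child 0; `write_nth_key`
// stores key n together with child n + 1 and must be called for every key, in order, before
// the builder is dropped (the `Drop` impl asserts this).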
pub(super) struct RawBranchBuilder<'b> {
    page: &'b mut PageMut,
    fixed_key_size: Option<usize>,
    num_keys: usize,
    keys_written: usize,
}

impl<'b> RawBranchBuilder<'b> {
    pub(super) fn required_bytes(
        num_keys: usize,
        size_of_keys: usize,
        fixed_key_size: Option<usize>,
    ) -> usize {
        if fixed_key_size.is_none() {
            let fixed_size = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1)
                + size_of::<u32>() * num_keys;
            size_of_keys + fixed_size
        } else {
            let fixed_size =
                8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1);
            size_of_keys + fixed_size
        }
    }

    pub(super) fn new(
        page: &'b mut PageMut,
        num_keys: usize,
        fixed_key_size: Option<usize>,
    ) -> Self {
        assert!(num_keys > 0);
        page.memory_mut()[0] = BRANCH;
        page.memory_mut()[2..4].copy_from_slice(&u16::try_from(num_keys).unwrap().to_le_bytes());
        #[cfg(debug_assertions)]
        {
            let start = 8 + size_of::<Checksum>() * (num_keys + 1);
            let last = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1)
                + size_of::<u32>() * num_keys;
            for x in &mut page.memory_mut()[start..last] {
                *x = 0xFF;
            }
        }
        RawBranchBuilder {
            page,
            fixed_key_size,
            num_keys,
            keys_written: 0,
        }
    }

    pub(super) fn write_first_page(&mut self, page_number: PageNumber, checksum: Checksum) {
        let offset = 8;
        self.page.memory_mut()[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        let offset = 8 + size_of::<Checksum>() * (self.num_keys + 1);
        self.page.memory_mut()[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());
    }

    fn key_section_start(&self) -> usize {
        let mut offset =
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1);
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_keys;
        }

        offset
    }

    fn key_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_key_size {
            return self.key_section_start() + fixed * (n + 1);
        }
        let offset = 8
            + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1)
            + size_of::<u32>() * n;
        u32::from_le_bytes(
            self.page.memory()[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    pub(super) fn write_nth_key(
        &mut self,
        key: &[u8],
        page_number: PageNumber,
        checksum: Checksum,
        n: usize,
    ) {
        assert!(n < self.num_keys);
        assert_eq!(n, self.keys_written);
        self.keys_written += 1;
        let offset = 8 + size_of::<Checksum>() * (n + 1);
        self.page.memory_mut()[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        let offset = 8
            + size_of::<Checksum>() * (self.num_keys + 1)
            + PageNumber::serialized_size() * (n + 1);
        self.page.memory_mut()[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());

        let data_offset = if n > 0 {
            self.key_end(n - 1)
        } else {
            self.key_section_start()
        };
        if self.fixed_key_size.is_none() {
            let offset = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1)
                + size_of::<u32>() * n;
            self.page.memory_mut()[offset..(offset + size_of::<u32>())].copy_from_slice(
                &u32::try_from(data_offset + key.len())
                    .unwrap()
                    .to_le_bytes(),
            );
        }

        debug_assert!(data_offset > offset);
        self.page.memory_mut()[data_offset..(data_offset + key.len())].copy_from_slice(key);
    }
}

impl<'b> Drop for RawBranchBuilder<'b> {
    fn drop(&mut self) {
        if !thread::panicking() {
            assert_eq!(self.keys_written, self.num_keys);
        }
    }
}

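// Updates a single child pointer (page number and checksum) in an existing branch page without
// touching its keys.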
pub(crate) struct BranchMutator<'b> {
    page: &'b mut PageMut,
}

impl<'b> BranchMutator<'b> {
    pub(crate) fn new(page: &'b mut PageMut) -> Self {
        assert_eq!(page.memory()[0], BRANCH);
        Self { page }
    }

    fn num_keys(&self) -> usize {
        u16::from_le_bytes(self.page.memory()[2..4].try_into().unwrap()) as usize
    }

    pub(crate) fn write_child_page(
        &mut self,
        i: usize,
        page_number: PageNumber,
        checksum: Checksum,
    ) {
        debug_assert!(i <= self.num_keys());
        let offset = 8 + size_of::<Checksum>() * i;
        self.page.memory_mut()[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        let offset =
            8 + size_of::<Checksum>() * (self.num_keys() + 1) + PageNumber::serialized_size() * i;
        self.page.memory_mut()[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());
    }
}