use crate::tree_store::PageNumber;
use crate::tree_store::page_store::{Page, PageImpl, PageMut, TransactionalMemory, xxh3_checksum};
use crate::types::{Key, MutInPlaceValue, Value};
use crate::{Result, StorageError};
use std::cmp::Ordering;
use std::marker::PhantomData;
use std::mem::size_of;
use std::ops::Range;
use std::sync::Arc;
use std::thread;

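// Page type discriminants stored in the first byte of every btree page.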
pub(crate) const LEAF: u8 = 1;
pub(crate) const BRANCH: u8 = 2;

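// Checksums are 128-bit xxh3 values; `DEFERRED` is a sentinel placeholder rather than a
// real hash, used where checksum computation is deferred.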
pub(crate) type Checksum = u128;
pub(crate) const DEFERRED: Checksum = 999;

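/// Computes the checksum of a leaf page by hashing every byte up to the end of the last
/// value, returning `StorageError::Corrupted` if the stored offsets are inconsistent.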
pub(super) fn leaf_checksum<T: Page>(
    page: &T,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
) -> Result<Checksum, StorageError> {
    let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
    let last_pair = accessor.num_pairs().checked_sub(1).ok_or_else(|| {
        StorageError::Corrupted(format!(
            "Leaf page {:?} corrupted. Number of pairs is zero",
            page.get_page_number()
        ))
    })?;
    let end = accessor.value_end(last_pair).ok_or_else(|| {
        StorageError::Corrupted(format!(
            "Leaf page {:?} corrupted. Couldn't find offset for pair {}",
            page.get_page_number(),
            last_pair,
        ))
    })?;
    if end > page.memory().len() {
        Err(StorageError::Corrupted(format!(
            "Leaf page {:?} corrupted. Last offset {} beyond end of data {}",
            page.get_page_number(),
            end,
            page.memory().len()
        )))
    } else {
        Ok(xxh3_checksum(&page.memory()[..end]))
    }
}

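/// Computes the checksum of a branch page by hashing every byte up to the end of the last
/// key, returning `StorageError::Corrupted` if the stored offsets are inconsistent.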
pub(super) fn branch_checksum<T: Page>(
    page: &T,
    fixed_key_size: Option<usize>,
) -> Result<Checksum, StorageError> {
    let accessor = BranchAccessor::new(page, fixed_key_size);
    let last_key = accessor.num_keys().checked_sub(1).ok_or_else(|| {
        StorageError::Corrupted(format!(
            "Branch page {:?} corrupted. Number of keys is zero",
            page.get_page_number()
        ))
    })?;
    let end = accessor.key_end(last_key).ok_or_else(|| {
        StorageError::Corrupted(format!(
            "Branch page {:?} corrupted. Can't find offset for key {}",
            page.get_page_number(),
            last_key
        ))
    })?;
    if end > page.memory().len() {
        Err(StorageError::Corrupted(format!(
            "Branch page {:?} corrupted. Last offset {} beyond end of data {}",
            page.get_page_number(),
            end,
            page.memory().len()
        )))
    } else {
        Ok(xxh3_checksum(&page.memory()[..end]))
    }
}

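/// Serialized header describing a btree: its root page, the root's checksum, and a
/// `length` counter.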
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) struct BtreeHeader {
    pub(crate) root: PageNumber,
    pub(crate) checksum: Checksum,
    pub(crate) length: u64,
}

impl BtreeHeader {
    pub(crate) fn new(root: PageNumber, checksum: Checksum, length: u64) -> Self {
        Self {
            root,
            checksum,
            length,
        }
    }

    pub(crate) const fn serialized_size() -> usize {
        PageNumber::serialized_size() + size_of::<Checksum>() + size_of::<u64>()
    }

    pub(crate) fn from_le_bytes(bytes: [u8; Self::serialized_size()]) -> Self {
        let root =
            PageNumber::from_le_bytes(bytes[..PageNumber::serialized_size()].try_into().unwrap());
        let mut offset = PageNumber::serialized_size();
        let checksum = Checksum::from_le_bytes(
            bytes[offset..(offset + size_of::<Checksum>())]
                .try_into()
                .unwrap(),
        );
        offset += size_of::<Checksum>();
        let length = u64::from_le_bytes(
            bytes[offset..(offset + size_of::<u64>())]
                .try_into()
                .unwrap(),
        );

        Self {
            root,
            checksum,
            length,
        }
    }

    pub(crate) fn to_le_bytes(self) -> [u8; Self::serialized_size()] {
        let mut result = [0; Self::serialized_size()];
        result[..PageNumber::serialized_size()].copy_from_slice(&self.root.to_le_bytes());
        result[PageNumber::serialized_size()
            ..(PageNumber::serialized_size() + size_of::<Checksum>())]
            .copy_from_slice(&self.checksum.to_le_bytes());
        result[(PageNumber::serialized_size() + size_of::<Checksum>())..]
            .copy_from_slice(&self.length.to_le_bytes());

        result
    }
}

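// Cleanup action performed when an `AccessGuard` is dropped.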
enum OnDrop {
    None,
    RemoveEntry {
        position: usize,
        fixed_key_size: Option<usize>,
    },
}

enum EitherPage {
    Immutable(PageImpl),
    Mutable(PageMut),
    OwnedMemory(Vec<u8>),
    ArcMemory(Arc<[u8]>),
}

impl EitherPage {
    fn memory(&self) -> &[u8] {
        match self {
            EitherPage::Immutable(page) => page.memory(),
            EitherPage::Mutable(page) => page.memory(),
            EitherPage::OwnedMemory(mem) => mem.as_slice(),
            EitherPage::ArcMemory(mem) => mem,
        }
    }
}

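/// Read guard over a single value; may optionally remove the entry from its leaf page when
/// dropped (see `remove_on_drop`).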
pub struct AccessGuard<'a, V: Value + 'static> {
    page: EitherPage,
    offset: usize,
    len: usize,
    on_drop: OnDrop,
    _value_type: PhantomData<V>,
    _lifetime: PhantomData<&'a ()>,
}

impl<V: Value + 'static> AccessGuard<'_, V> {
    pub(crate) fn with_page(page: PageImpl, range: Range<usize>) -> Self {
        Self {
            page: EitherPage::Immutable(page),
            offset: range.start,
            len: range.len(),
            on_drop: OnDrop::None,
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }

    pub(crate) fn with_arc_page(page: Arc<[u8]>, range: Range<usize>) -> Self {
        Self {
            page: EitherPage::ArcMemory(page),
            offset: range.start,
            len: range.len(),
            on_drop: OnDrop::None,
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }

    pub(crate) fn with_owned_value(value: Vec<u8>) -> Self {
        let len = value.len();
        Self {
            page: EitherPage::OwnedMemory(value),
            offset: 0,
            len,
            on_drop: OnDrop::None,
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }

    pub(super) fn remove_on_drop(
        page: PageMut,
        offset: usize,
        len: usize,
        position: usize,
        fixed_key_size: Option<usize>,
    ) -> Self {
        Self {
            page: EitherPage::Mutable(page),
            offset,
            len,
            on_drop: OnDrop::RemoveEntry {
                position,
                fixed_key_size,
            },
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }

    pub fn value(&self) -> V::SelfType<'_> {
        V::from_bytes(&self.page.memory()[self.offset..(self.offset + self.len)])
    }
}

impl<V: Value + 'static> Drop for AccessGuard<'_, V> {
    fn drop(&mut self) {
        match self.on_drop {
            OnDrop::None => {}
            OnDrop::RemoveEntry {
                position,
                fixed_key_size,
            } => {
                if let EitherPage::Mutable(ref mut mut_page) = self.page {
                    let mut mutator = LeafMutator::new(mut_page, fixed_key_size, V::fixed_width());
                    mutator.remove(position);
                } else if !thread::panicking() {
                    unreachable!();
                }
            }
        }
    }
}

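/// Mutable guard over a value that can be modified in place via `AsMut`.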
pub struct AccessGuardMut<'a, V: Value + 'static> {
    page: PageMut,
    offset: usize,
    len: usize,
    _value_type: PhantomData<V>,
    _lifetime: PhantomData<&'a ()>,
}

impl<V: Value + 'static> AccessGuardMut<'_, V> {
    pub(crate) fn new(page: PageMut, offset: usize, len: usize) -> Self {
        AccessGuardMut {
            page,
            offset,
            len,
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }
}

impl<V: MutInPlaceValue + 'static> AsMut<V::BaseRefType> for AccessGuardMut<'_, V> {
    fn as_mut(&mut self) -> &mut V::BaseRefType {
        V::from_bytes_mut(&mut self.page.memory_mut()[self.offset..(self.offset + self.len)])
    }
}

pub struct EntryAccessor<'a> {
    key: &'a [u8],
    value: &'a [u8],
}

impl<'a> EntryAccessor<'a> {
    fn new(key: &'a [u8], value: &'a [u8]) -> Self {
        EntryAccessor { key, value }
    }
}

impl<'a: 'b, 'b> EntryAccessor<'a> {
    pub(crate) fn key(&'b self) -> &'a [u8] {
        self.key
    }

    pub(crate) fn value(&'b self) -> &'a [u8] {
        self.value
    }
}

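/// Read-only accessor for the entries of a leaf page.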
pub(crate) struct LeafAccessor<'a> {
    page: &'a [u8],
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    num_pairs: usize,
}

impl<'a> LeafAccessor<'a> {
    pub(crate) fn new(
        page: &'a [u8],
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        debug_assert_eq!(page[0], LEAF);
        let num_pairs = u16::from_le_bytes(page[2..4].try_into().unwrap()) as usize;
        LeafAccessor {
            page,
            fixed_key_size,
            fixed_value_size,
            num_pairs,
        }
    }

    pub(super) fn print_node<K: Key, V: Value>(&self, include_value: bool) {
        let mut i = 0;
        while let Some(entry) = self.entry(i) {
            eprint!(" key_{i}={:?}", K::from_bytes(entry.key()));
            if include_value {
                eprint!(" value_{i}={:?}", V::from_bytes(entry.value()));
            }
            i += 1;
        }
    }

    pub(crate) fn position<K: Key>(&self, query: &[u8]) -> (usize, bool) {
        let mut min_entry = 0;
        let mut max_entry = self.num_pairs();
        while min_entry < max_entry {
            let mid = (min_entry + max_entry) / 2;
            let key = self.key_unchecked(mid);
            match K::compare(query, key) {
                Ordering::Less => {
                    max_entry = mid;
                }
                Ordering::Equal => {
                    return (mid, true);
                }
                Ordering::Greater => {
                    min_entry = mid + 1;
                }
            }
        }
        debug_assert_eq!(min_entry, max_entry);
        (min_entry, false)
    }

    pub(crate) fn find_key<K: Key>(&self, query: &[u8]) -> Option<usize> {
        let (entry, found) = self.position::<K>(query);
        if found { Some(entry) } else { None }
    }

    fn key_section_start(&self) -> usize {
        let mut offset = 4;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        if self.fixed_value_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }

        offset
    }

    fn key_start(&self, n: usize) -> Option<usize> {
        if n == 0 {
            Some(self.key_section_start())
        } else {
            self.key_end(n - 1)
        }
    }

    fn key_end(&self, n: usize) -> Option<usize> {
        if n >= self.num_pairs() {
            None
        } else {
            if let Some(fixed) = self.fixed_key_size {
                return Some(self.key_section_start() + fixed * (n + 1));
            }
            let offset = 4 + size_of::<u32>() * n;
            let end = u32::from_le_bytes(
                self.page
                    .get(offset..(offset + size_of::<u32>()))?
                    .try_into()
                    .unwrap(),
            ) as usize;
            Some(end)
        }
    }

    fn value_start(&self, n: usize) -> Option<usize> {
        if n == 0 {
            self.key_end(self.num_pairs() - 1)
        } else {
            self.value_end(n - 1)
        }
    }

    fn value_end(&self, n: usize) -> Option<usize> {
        if n >= self.num_pairs() {
            None
        } else {
            if let Some(fixed) = self.fixed_value_size {
                return Some(self.key_end(self.num_pairs.checked_sub(1)?)? + fixed * (n + 1));
            }
            let mut offset = 4 + size_of::<u32>() * n;
            if self.fixed_key_size.is_none() {
                offset += size_of::<u32>() * self.num_pairs;
            }
            let end = u32::from_le_bytes(
                self.page
                    .get(offset..(offset + size_of::<u32>()))?
                    .try_into()
                    .unwrap(),
            ) as usize;
            Some(end)
        }
    }

    pub(crate) fn num_pairs(&self) -> usize {
        self.num_pairs
    }

    pub(super) fn offset_of_first_value(&self) -> usize {
        self.offset_of_value(0).unwrap()
    }

    pub(super) fn offset_of_value(&self, n: usize) -> Option<usize> {
        self.value_start(n)
    }

    pub(super) fn value_range(&self, n: usize) -> Option<(usize, usize)> {
        Some((self.value_start(n)?, self.value_end(n)?))
    }

    pub(crate) fn length_of_pairs(&self, start: usize, end: usize) -> usize {
        self.length_of_values(start, end) + self.length_of_keys(start, end)
    }

    fn length_of_values(&self, start: usize, end: usize) -> usize {
        if end == 0 {
            return 0;
        }
        let end_offset = self.value_end(end - 1).unwrap();
        let start_offset = self.value_start(start).unwrap();
        end_offset - start_offset
    }

    pub(crate) fn length_of_keys(&self, start: usize, end: usize) -> usize {
        if end == 0 {
            return 0;
        }
        let end_offset = self.key_end(end - 1).unwrap();
        let start_offset = self.key_start(start).unwrap();
        end_offset - start_offset
    }

    pub(crate) fn total_length(&self) -> usize {
        self.value_end(self.num_pairs() - 1).unwrap()
    }

    fn key_unchecked(&self, n: usize) -> &[u8] {
        &self.page[self.key_start(n).unwrap()..self.key_end(n).unwrap()]
    }

    pub(crate) fn entry(&self, n: usize) -> Option<EntryAccessor<'a>> {
        let key = &self.page[self.key_start(n)?..self.key_end(n)?];
        let value = &self.page[self.value_start(n)?..self.value_end(n)?];
        Some(EntryAccessor::new(key, value))
    }

    pub(crate) fn entry_ranges(&self, n: usize) -> Option<(Range<usize>, Range<usize>)> {
        let key = self.key_start(n)?..self.key_end(n)?;
        let value = self.value_start(n)?..self.value_end(n)?;
        Some((key, value))
    }

    pub(super) fn last_entry(&self) -> EntryAccessor<'a> {
        self.entry(self.num_pairs() - 1).unwrap()
    }
}

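/// Accumulates key/value pairs and writes them out as one leaf page, or as two pages plus a
/// split key when the data does not fit (see `should_split` / `build_split`).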
pub(super) struct LeafBuilder<'a, 'b> {
    pairs: Vec<(&'a [u8], &'a [u8])>,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    total_key_bytes: usize,
    total_value_bytes: usize,
    mem: &'b TransactionalMemory,
}

impl<'a, 'b> LeafBuilder<'a, 'b> {
    pub(super) fn required_bytes(&self, num_pairs: usize, keys_values_bytes: usize) -> usize {
        RawLeafBuilder::required_bytes(
            num_pairs,
            keys_values_bytes,
            self.fixed_key_size,
            self.fixed_value_size,
        )
    }

    pub(super) fn new(
        mem: &'b TransactionalMemory,
        capacity: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        Self {
            pairs: Vec::with_capacity(capacity),
            fixed_key_size,
            fixed_value_size,
            total_key_bytes: 0,
            total_value_bytes: 0,
            mem,
        }
    }

    pub(super) fn push(&mut self, key: &'a [u8], value: &'a [u8]) {
        self.total_key_bytes += key.len();
        self.total_value_bytes += value.len();
        self.pairs.push((key, value));
    }

    pub(super) fn push_all_except(
        &mut self,
        accessor: &'a LeafAccessor<'_>,
        except: Option<usize>,
    ) {
        for i in 0..accessor.num_pairs() {
            if let Some(except) = except {
                if except == i {
                    continue;
                }
            }
            let entry = accessor.entry(i).unwrap();
            self.push(entry.key(), entry.value());
        }
    }

    pub(super) fn should_split(&self) -> bool {
        let required_size = self.required_bytes(
            self.pairs.len(),
            self.total_key_bytes + self.total_value_bytes,
        );
        required_size > self.mem.get_page_size() && self.pairs.len() > 1
    }

    pub(super) fn build_split(self) -> Result<(PageMut, &'a [u8], PageMut)> {
        let total_size = self.total_key_bytes + self.total_value_bytes;
        let mut division = 0;
        let mut first_split_key_bytes = 0;
        let mut first_split_value_bytes = 0;
        for (key, value) in self.pairs.iter().take(self.pairs.len() - 1) {
            first_split_key_bytes += key.len();
            first_split_value_bytes += value.len();
            division += 1;
            if first_split_key_bytes + first_split_value_bytes >= total_size / 2 {
                break;
            }
        }

        let required_size =
            self.required_bytes(division, first_split_key_bytes + first_split_value_bytes);
        let mut page1 = self.mem.allocate(required_size)?;
        let mut builder = RawLeafBuilder::new(
            page1.memory_mut(),
            division,
            self.fixed_key_size,
            self.fixed_value_size,
            first_split_key_bytes,
        );
        for (key, value) in self.pairs.iter().take(division) {
            builder.append(key, value);
        }
        drop(builder);

        let required_size = self.required_bytes(
            self.pairs.len() - division,
            self.total_key_bytes + self.total_value_bytes
                - first_split_key_bytes
                - first_split_value_bytes,
        );
        let mut page2 = self.mem.allocate(required_size)?;
        let mut builder = RawLeafBuilder::new(
            page2.memory_mut(),
            self.pairs.len() - division,
            self.fixed_key_size,
            self.fixed_value_size,
            self.total_key_bytes - first_split_key_bytes,
        );
        for (key, value) in &self.pairs[division..] {
            builder.append(key, value);
        }
        drop(builder);

        Ok((page1, self.pairs[division - 1].0, page2))
    }

    pub(super) fn build(self) -> Result<PageMut> {
        let required_size = self.required_bytes(
            self.pairs.len(),
            self.total_key_bytes + self.total_value_bytes,
        );
        let mut page = self.mem.allocate(required_size)?;
        let mut builder = RawLeafBuilder::new(
            page.memory_mut(),
            self.pairs.len(),
            self.fixed_key_size,
            self.fixed_value_size,
            self.total_key_bytes,
        );
        for (key, value) in self.pairs {
            builder.append(key, value);
        }
        drop(builder);
        Ok(page)
    }
}

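// Writes a leaf page directly into a provided buffer. Layout, as implied by the offsets used
// below (not an independent specification):
// * 1 byte: page type (LEAF)
// * 1 byte: unused
// * 2 bytes: number of pairs (little-endian u16)
// * (variable-width keys only) num_pairs u32 key end offsets
// * (variable-width values only) num_pairs u32 value end offsets
// * key bytes, then value bytes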
pub(crate) struct RawLeafBuilder<'a> {
    page: &'a mut [u8],
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    num_pairs: usize,
    provisioned_key_bytes: usize,
    pairs_written: usize,
}

impl<'a> RawLeafBuilder<'a> {
    pub(crate) fn required_bytes(
        num_pairs: usize,
        keys_values_bytes: usize,
        key_size: Option<usize>,
        value_size: Option<usize>,
    ) -> usize {
        let mut result = 4;
        if key_size.is_none() {
            result += num_pairs * size_of::<u32>();
        }
        if value_size.is_none() {
            result += num_pairs * size_of::<u32>();
        }
        result += keys_values_bytes;

        result
    }

    pub(crate) fn new(
        page: &'a mut [u8],
        num_pairs: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
        key_bytes: usize,
    ) -> Self {
        page[0] = LEAF;
        page[2..4].copy_from_slice(&u16::try_from(num_pairs).unwrap().to_le_bytes());
        #[cfg(debug_assertions)]
        {
            let mut last = 4;
            if fixed_key_size.is_none() {
                last += size_of::<u32>() * num_pairs;
            }
            if fixed_value_size.is_none() {
                last += size_of::<u32>() * num_pairs;
            }
            for x in &mut page[4..last] {
                *x = 0xFF;
            }
        }
        RawLeafBuilder {
            page,
            fixed_key_size,
            fixed_value_size,
            num_pairs,
            provisioned_key_bytes: key_bytes,
            pairs_written: 0,
        }
    }

    fn value_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_value_size {
            return self.key_section_start() + self.provisioned_key_bytes + fixed * (n + 1);
        }
        let mut offset = 4 + size_of::<u32>() * n;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    fn key_section_start(&self) -> usize {
        let mut offset = 4;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        if self.fixed_value_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }

        offset
    }

    fn key_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_key_size {
            return self.key_section_start() + fixed * (n + 1);
        }
        let offset = 4 + size_of::<u32>() * n;
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    pub(crate) fn append(&mut self, key: &[u8], value: &[u8]) {
        if let Some(key_width) = self.fixed_key_size {
            assert_eq!(key_width, key.len());
        }
        if let Some(value_width) = self.fixed_value_size {
            assert_eq!(value_width, value.len());
        }
        let key_offset = if self.pairs_written == 0 {
            self.key_section_start()
        } else {
            self.key_end(self.pairs_written - 1)
        };
        let value_offset = if self.pairs_written == 0 {
            self.key_section_start() + self.provisioned_key_bytes
        } else {
            self.value_end(self.pairs_written - 1)
        };

        let n = self.pairs_written;
        if self.fixed_key_size.is_none() {
            let offset = 4 + size_of::<u32>() * n;
            self.page[offset..(offset + size_of::<u32>())]
                .copy_from_slice(&u32::try_from(key_offset + key.len()).unwrap().to_le_bytes());
        }
        self.page[key_offset..(key_offset + key.len())].copy_from_slice(key);
        let written_key_len = key_offset + key.len() - self.key_section_start();
        assert!(written_key_len <= self.provisioned_key_bytes);

        if self.fixed_value_size.is_none() {
            let mut offset = 4 + size_of::<u32>() * n;
            if self.fixed_key_size.is_none() {
                offset += size_of::<u32>() * self.num_pairs;
            }
            self.page[offset..(offset + size_of::<u32>())].copy_from_slice(
                &u32::try_from(value_offset + value.len())
                    .unwrap()
                    .to_le_bytes(),
            );
        }
        self.page[value_offset..(value_offset + value.len())].copy_from_slice(value);
        self.pairs_written += 1;
    }
}

impl Drop for RawLeafBuilder<'_> {
    fn drop(&mut self) {
        if !thread::panicking() {
            assert_eq!(self.pairs_written, self.num_pairs);
            assert_eq!(
                self.key_section_start() + self.provisioned_key_bytes,
                self.key_end(self.num_pairs - 1)
            );
        }
    }
}

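/// In-place editor for leaf pages: inserts or removes a single entry by shifting the
/// existing offsets and key/value data within the page.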
pub(crate) struct LeafMutator<'b> {
    page: &'b mut PageMut,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
}

impl<'b> LeafMutator<'b> {
    pub(crate) fn new(
        page: &'b mut PageMut,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        assert_eq!(page.memory_mut()[0], LEAF);
        Self {
            page,
            fixed_key_size,
            fixed_value_size,
        }
    }

    pub(super) fn sufficient_insert_inplace_space(
        page: &'_ PageImpl,
        position: usize,
        overwrite: bool,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
        new_key: &[u8],
        new_value: &[u8],
    ) -> bool {
        let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
        if overwrite {
            let remaining = page.memory().len() - accessor.total_length();
            let required_delta = isize::try_from(new_key.len() + new_value.len()).unwrap()
                - isize::try_from(accessor.length_of_pairs(position, position + 1)).unwrap();
            required_delta <= isize::try_from(remaining).unwrap()
        } else {
            if page.get_page_number().page_order > 0 && position < accessor.num_pairs() {
                return false;
            }
            let remaining = page.memory().len() - accessor.total_length();
            let mut required_delta = new_key.len() + new_value.len();
            if fixed_key_size.is_none() {
                required_delta += size_of::<u32>();
            }
            if fixed_value_size.is_none() {
                required_delta += size_of::<u32>();
            }
            required_delta <= remaining
        }
    }

    pub(crate) fn insert(&mut self, i: usize, overwrite: bool, key: &[u8], value: &[u8]) {
        let accessor = LeafAccessor::new(
            self.page.memory(),
            self.fixed_key_size,
            self.fixed_value_size,
        );
        let required_delta = if overwrite {
            isize::try_from(key.len() + value.len()).unwrap()
                - isize::try_from(accessor.length_of_pairs(i, i + 1)).unwrap()
        } else {
            let mut delta = key.len() + value.len();
            if self.fixed_key_size.is_none() {
                delta += size_of::<u32>();
            }
            if self.fixed_value_size.is_none() {
                delta += size_of::<u32>();
            }
            delta.try_into().unwrap()
        };
        assert!(
            isize::try_from(accessor.total_length()).unwrap() + required_delta
                <= isize::try_from(self.page.memory().len()).unwrap()
        );

        let num_pairs = accessor.num_pairs();
        let last_key_end = accessor.key_end(accessor.num_pairs() - 1).unwrap();
        let last_value_end = accessor.value_end(accessor.num_pairs() - 1).unwrap();
        let shift_index = if overwrite { i + 1 } else { i };
        let shift_key_start = accessor.key_start(shift_index).unwrap_or(last_key_end);
        let shift_value_start = accessor.value_start(shift_index).unwrap_or(last_value_end);
        let existing_value_len = accessor
            .value_range(i)
            .map(|(start, end)| end - start)
            .unwrap_or_default();

        let value_delta = if overwrite {
            isize::try_from(value.len()).unwrap() - isize::try_from(existing_value_len).unwrap()
        } else {
            value.len().try_into().unwrap()
        };

        let key_ptr_size: usize = if self.fixed_key_size.is_none() { 4 } else { 0 };
        let value_ptr_size: usize = if self.fixed_value_size.is_none() {
            4
        } else {
            0
        };
        if !overwrite {
            for j in 0..i {
                self.update_key_end(j, (key_ptr_size + value_ptr_size).try_into().unwrap());
                let value_delta: isize = (key_ptr_size + value_ptr_size + key.len())
                    .try_into()
                    .unwrap();
                self.update_value_end(j, value_delta);
            }
        }
        for j in i..num_pairs {
            if overwrite {
                self.update_value_end(j, value_delta);
            } else {
                let key_delta: isize = (key_ptr_size + value_ptr_size + key.len())
                    .try_into()
                    .unwrap();
                self.update_key_end(j, key_delta);
                let value_delta = key_delta + isize::try_from(value.len()).unwrap();
                self.update_value_end(j, value_delta);
            }
        }

        let new_num_pairs = if overwrite { num_pairs } else { num_pairs + 1 };
        self.page.memory_mut()[2..4]
            .copy_from_slice(&u16::try_from(new_num_pairs).unwrap().to_le_bytes());

        let mut dest = if overwrite {
            (isize::try_from(shift_value_start).unwrap() + value_delta)
                .try_into()
                .unwrap()
        } else {
            shift_value_start + key_ptr_size + value_ptr_size + key.len() + value.len()
        };
        let start = shift_value_start;
        let end = last_value_end;
        self.page.memory_mut().copy_within(start..end, dest);

        let inserted_value_end: u32 = dest.try_into().unwrap();
        dest -= value.len();
        self.page.memory_mut()[dest..(dest + value.len())].copy_from_slice(value);

        if !overwrite {
            let start = shift_key_start;
            let end = shift_value_start;
            dest -= end - start;
            self.page.memory_mut().copy_within(start..end, dest);

            let inserted_key_end: u32 = dest.try_into().unwrap();
            dest -= key.len();
            self.page.memory_mut()[dest..(dest + key.len())].copy_from_slice(key);

            let start = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
            let end = shift_key_start;
            dest -= end - start;
            debug_assert_eq!(
                dest,
                4 + key_ptr_size * new_num_pairs + value_ptr_size * (i + 1)
            );
            self.page.memory_mut().copy_within(start..end, dest);

            if self.fixed_value_size.is_none() {
                dest -= size_of::<u32>();
                self.page.memory_mut()[dest..(dest + size_of::<u32>())]
                    .copy_from_slice(&inserted_value_end.to_le_bytes());
            }

            let start = 4 + key_ptr_size * i;
            let end = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
            dest -= end - start;
            debug_assert_eq!(dest, 4 + key_ptr_size * (i + 1));
            self.page.memory_mut().copy_within(start..end, dest);

            if self.fixed_key_size.is_none() {
                dest -= size_of::<u32>();
                self.page.memory_mut()[dest..(dest + size_of::<u32>())]
                    .copy_from_slice(&inserted_key_end.to_le_bytes());
            }
            debug_assert_eq!(dest, 4 + key_ptr_size * i);
        }
    }

    pub(super) fn remove(&mut self, i: usize) {
        let accessor = LeafAccessor::new(
            self.page.memory(),
            self.fixed_key_size,
            self.fixed_value_size,
        );
        let num_pairs = accessor.num_pairs();
        assert!(i < num_pairs);
        assert!(num_pairs > 1);
        let key_start = accessor.key_start(i).unwrap();
        let key_end = accessor.key_end(i).unwrap();
        let value_start = accessor.value_start(i).unwrap();
        let value_end = accessor.value_end(i).unwrap();
        let last_value_end = accessor.value_end(accessor.num_pairs() - 1).unwrap();

        let key_ptr_size = if self.fixed_key_size.is_none() {
            size_of::<u32>()
        } else {
            0
        };
        let value_ptr_size = if self.fixed_value_size.is_none() {
            size_of::<u32>()
        } else {
            0
        };
        for j in 0..i {
            self.update_key_end(j, -isize::try_from(key_ptr_size + value_ptr_size).unwrap());
            let value_delta = -isize::try_from(key_ptr_size + value_ptr_size).unwrap()
                - isize::try_from(key_end - key_start).unwrap();
            self.update_value_end(j, value_delta);
        }
        for j in (i + 1)..num_pairs {
            let key_delta = -isize::try_from(key_ptr_size + value_ptr_size).unwrap()
                - isize::try_from(key_end - key_start).unwrap();
            self.update_key_end(j, key_delta);
            let value_delta = key_delta - isize::try_from(value_end - value_start).unwrap();
            self.update_value_end(j, value_delta);
        }

        let new_num_pairs = num_pairs - 1;
        self.page.memory_mut()[2..4]
            .copy_from_slice(&u16::try_from(new_num_pairs).unwrap().to_le_bytes());
        let mut dest = 4 + key_ptr_size * i;
        let start = 4 + key_ptr_size * (i + 1);
        let end = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
        self.page.memory_mut().copy_within(start..end, dest);
        dest += end - start;
        debug_assert_eq!(dest, 4 + key_ptr_size * new_num_pairs + value_ptr_size * i);

        let start = 4 + key_ptr_size * num_pairs + value_ptr_size * (i + 1);
        let end = key_start;
        self.page.memory_mut().copy_within(start..end, dest);
        dest += end - start;

        let preceding_key_len = key_start - (4 + (key_ptr_size + value_ptr_size) * num_pairs);
        debug_assert_eq!(
            dest,
            4 + (key_ptr_size + value_ptr_size) * new_num_pairs + preceding_key_len
        );

        let start = key_end;
        let end = value_start;
        self.page.memory_mut().copy_within(start..end, dest);
        dest += end - start;

        let preceding_data_len =
            value_start - (4 + (key_ptr_size + value_ptr_size) * num_pairs) - (key_end - key_start);
        debug_assert_eq!(
            dest,
            4 + (key_ptr_size + value_ptr_size) * new_num_pairs + preceding_data_len
        );
        let start = value_end;
        let end = last_value_end;
        self.page.memory_mut().copy_within(start..end, dest);
    }

    fn update_key_end(&mut self, i: usize, delta: isize) {
        if self.fixed_key_size.is_some() {
            return;
        }
        let offset = 4 + size_of::<u32>() * i;
        let mut ptr = u32::from_le_bytes(
            self.page.memory()[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        );
        ptr = (isize::try_from(ptr).unwrap() + delta).try_into().unwrap();
        self.page.memory_mut()[offset..(offset + size_of::<u32>())]
            .copy_from_slice(&ptr.to_le_bytes());
    }

    fn update_value_end(&mut self, i: usize, delta: isize) {
        if self.fixed_value_size.is_some() {
            return;
        }
        let accessor = LeafAccessor::new(
            self.page.memory(),
            self.fixed_key_size,
            self.fixed_value_size,
        );
        let num_pairs = accessor.num_pairs();
        let mut offset = 4 + size_of::<u32>() * i;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * num_pairs;
        }
        let mut ptr = u32::from_le_bytes(
            self.page.memory()[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        );
        ptr = (isize::try_from(ptr).unwrap() + delta).try_into().unwrap();
        self.page.memory_mut()[offset..(offset + size_of::<u32>())]
            .copy_from_slice(&ptr.to_le_bytes());
    }
}

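/// Read-only accessor for the keys and child pointers of a branch page.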
pub(crate) struct BranchAccessor<'a: 'b, 'b, T: Page + 'a> {
    page: &'b T,
    num_keys: usize,
    fixed_key_size: Option<usize>,
    _page_lifetime: PhantomData<&'a ()>,
}

impl<'a: 'b, 'b, T: Page + 'a> BranchAccessor<'a, 'b, T> {
    pub(crate) fn new(page: &'b T, fixed_key_size: Option<usize>) -> Self {
        debug_assert_eq!(page.memory()[0], BRANCH);
        let num_keys = u16::from_le_bytes(page.memory()[2..4].try_into().unwrap()) as usize;
        BranchAccessor {
            page,
            num_keys,
            fixed_key_size,
            _page_lifetime: Default::default(),
        }
    }

    pub(super) fn print_node<K: Key>(&self) {
        eprint!(
            "Internal[ (page={:?}), child_0={:?}",
            self.page.get_page_number(),
            self.child_page(0).unwrap()
        );
        for i in 0..(self.count_children() - 1) {
            if let Some(child) = self.child_page(i + 1) {
                let key = self.key(i).unwrap();
                eprint!(" key_{i}={:?}", K::from_bytes(key));
                eprint!(" child_{}={child:?}", i + 1);
            }
        }
        eprint!("]");
    }

    pub(crate) fn total_length(&self) -> usize {
        self.key_end(self.num_keys() - 1).unwrap()
    }

    pub(super) fn child_for_key<K: Key>(&self, query: &[u8]) -> (usize, PageNumber) {
        let mut min_child = 0;
        let mut max_child = self.num_keys();
        while min_child < max_child {
            let mid = (min_child + max_child) / 2;
            match K::compare(query, self.key(mid).unwrap()) {
                Ordering::Less => {
                    max_child = mid;
                }
                Ordering::Equal => {
                    return (mid, self.child_page(mid).unwrap());
                }
                Ordering::Greater => {
                    min_child = mid + 1;
                }
            }
        }
        debug_assert_eq!(min_child, max_child);

        (min_child, self.child_page(min_child).unwrap())
    }

    fn key_section_start(&self) -> usize {
        if self.fixed_key_size.is_none() {
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
                + size_of::<u32>() * self.num_keys()
        } else {
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
        }
    }

    fn key_offset(&self, n: usize) -> usize {
        if n == 0 {
            self.key_section_start()
        } else {
            self.key_end(n - 1).unwrap()
        }
    }

    fn key_end(&self, n: usize) -> Option<usize> {
        if let Some(fixed) = self.fixed_key_size {
            return Some(self.key_section_start() + fixed * (n + 1));
        }
        let offset = 8
            + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
            + size_of::<u32>() * n;
        Some(u32::from_le_bytes(
            self.page
                .memory()
                .get(offset..(offset + size_of::<u32>()))?
                .try_into()
                .unwrap(),
        ) as usize)
    }

    pub(super) fn key(&self, n: usize) -> Option<&[u8]> {
        if n >= self.num_keys() {
            return None;
        }
        let offset = self.key_offset(n);
        let end = self.key_end(n)?;
        Some(&self.page.memory()[offset..end])
    }

    pub(crate) fn count_children(&self) -> usize {
        self.num_keys() + 1
    }

    pub(crate) fn child_checksum(&self, n: usize) -> Option<Checksum> {
        if n >= self.count_children() {
            return None;
        }

        let offset = 8 + size_of::<Checksum>() * n;
        Some(Checksum::from_le_bytes(
            self.page.memory()[offset..(offset + size_of::<Checksum>())]
                .try_into()
                .unwrap(),
        ))
    }

    pub(crate) fn child_page(&self, n: usize) -> Option<PageNumber> {
        if n >= self.count_children() {
            return None;
        }

        let offset =
            8 + size_of::<Checksum>() * self.count_children() + PageNumber::serialized_size() * n;
        Some(PageNumber::from_le_bytes(
            self.page.memory()[offset..(offset + PageNumber::serialized_size())]
                .try_into()
                .unwrap(),
        ))
    }

    fn num_keys(&self) -> usize {
        self.num_keys
    }
}

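/// Accumulates child pages and separator keys and writes them out as one branch page, or as
/// two pages plus a division key when splitting (see `build_split`).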
pub(super) struct BranchBuilder<'a, 'b> {
    children: Vec<(PageNumber, Checksum)>,
    keys: Vec<&'a [u8]>,
    total_key_bytes: usize,
    fixed_key_size: Option<usize>,
    mem: &'b TransactionalMemory,
}

impl<'a, 'b> BranchBuilder<'a, 'b> {
    pub(super) fn new(
        mem: &'b TransactionalMemory,
        child_capacity: usize,
        fixed_key_size: Option<usize>,
    ) -> Self {
        Self {
            children: Vec::with_capacity(child_capacity),
            keys: Vec::with_capacity(child_capacity - 1),
            total_key_bytes: 0,
            fixed_key_size,
            mem,
        }
    }

    pub(super) fn replace_child(&mut self, index: usize, child: PageNumber, checksum: Checksum) {
        self.children[index] = (child, checksum);
    }

    pub(super) fn push_child(&mut self, child: PageNumber, checksum: Checksum) {
        self.children.push((child, checksum));
    }

    pub(super) fn push_key(&mut self, key: &'a [u8]) {
        self.keys.push(key);
        self.total_key_bytes += key.len();
    }

    pub(super) fn push_all<T: Page>(&mut self, accessor: &'a BranchAccessor<'_, '_, T>) {
        for i in 0..accessor.count_children() {
            let child = accessor.child_page(i).unwrap();
            let checksum = accessor.child_checksum(i).unwrap();
            self.push_child(child, checksum);
        }
        for i in 0..(accessor.count_children() - 1) {
            self.push_key(accessor.key(i).unwrap());
        }
    }

    pub(super) fn to_single_child(&self) -> Option<(PageNumber, Checksum)> {
        if self.children.len() > 1 {
            None
        } else {
            Some(self.children[0])
        }
    }

    pub(super) fn build(self) -> Result<PageMut> {
        assert_eq!(self.children.len(), self.keys.len() + 1);
        let size = RawBranchBuilder::required_bytes(
            self.keys.len(),
            self.total_key_bytes,
            self.fixed_key_size,
        );
        let mut page = self.mem.allocate(size)?;
        let mut builder = RawBranchBuilder::new(&mut page, self.keys.len(), self.fixed_key_size);
        builder.write_first_page(self.children[0].0, self.children[0].1);
        for i in 1..self.children.len() {
            let key = &self.keys[i - 1];
            builder.write_nth_key(key.as_ref(), self.children[i].0, self.children[i].1, i - 1);
        }
        drop(builder);

        Ok(page)
    }

    pub(super) fn should_split(&self) -> bool {
        let size = RawBranchBuilder::required_bytes(
            self.keys.len(),
            self.total_key_bytes,
            self.fixed_key_size,
        );
        size > self.mem.get_page_size() && self.keys.len() >= 3
    }

    pub(super) fn build_split(self) -> Result<(PageMut, &'a [u8], PageMut)> {
        assert_eq!(self.children.len(), self.keys.len() + 1);
        assert!(self.keys.len() >= 3);
        let division = self.keys.len() / 2;
        let first_split_key_len: usize = self.keys.iter().take(division).map(|k| k.len()).sum();
        let division_key = self.keys[division];
        let second_split_key_len = self.total_key_bytes - first_split_key_len - division_key.len();

        let size =
            RawBranchBuilder::required_bytes(division, first_split_key_len, self.fixed_key_size);
        let mut page1 = self.mem.allocate(size)?;
        let mut builder = RawBranchBuilder::new(&mut page1, division, self.fixed_key_size);
        builder.write_first_page(self.children[0].0, self.children[0].1);
        for i in 0..division {
            let key = &self.keys[i];
            builder.write_nth_key(
                key.as_ref(),
                self.children[i + 1].0,
                self.children[i + 1].1,
                i,
            );
        }
        drop(builder);

        let size = RawBranchBuilder::required_bytes(
            self.keys.len() - division - 1,
            second_split_key_len,
            self.fixed_key_size,
        );
        let mut page2 = self.mem.allocate(size)?;
        let mut builder = RawBranchBuilder::new(
            &mut page2,
            self.keys.len() - division - 1,
            self.fixed_key_size,
        );
        builder.write_first_page(self.children[division + 1].0, self.children[division + 1].1);
        for i in (division + 1)..self.keys.len() {
            let key = &self.keys[i];
            builder.write_nth_key(
                key.as_ref(),
                self.children[i + 1].0,
                self.children[i + 1].1,
                i - division - 1,
            );
        }
        drop(builder);

        Ok((page1, division_key, page2))
    }
}

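// Writes a branch page directly into a `PageMut`. Layout, as implied by the offsets used
// below (not an independent specification):
// * 1 byte: page type (BRANCH)
// * 1 byte: unused
// * 2 bytes: number of keys (little-endian u16)
// * 4 bytes: unused
// * (num_keys + 1) child checksums
// * (num_keys + 1) child page numbers
// * (variable-width keys only) num_keys u32 key end offsets
// * key bytes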
pub(super) struct RawBranchBuilder<'b> {
    page: &'b mut PageMut,
    fixed_key_size: Option<usize>,
    num_keys: usize,
    keys_written: usize,
}

impl<'b> RawBranchBuilder<'b> {
    pub(super) fn required_bytes(
        num_keys: usize,
        size_of_keys: usize,
        fixed_key_size: Option<usize>,
    ) -> usize {
        if fixed_key_size.is_none() {
            let fixed_size = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1)
                + size_of::<u32>() * num_keys;
            size_of_keys + fixed_size
        } else {
            let fixed_size =
                8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1);
            size_of_keys + fixed_size
        }
    }

    pub(super) fn new(
        page: &'b mut PageMut,
        num_keys: usize,
        fixed_key_size: Option<usize>,
    ) -> Self {
        assert!(num_keys > 0);
        page.memory_mut()[0] = BRANCH;
        page.memory_mut()[2..4].copy_from_slice(&u16::try_from(num_keys).unwrap().to_le_bytes());
        #[cfg(debug_assertions)]
        {
            let start = 8 + size_of::<Checksum>() * (num_keys + 1);
            let last = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1)
                + size_of::<u32>() * num_keys;
            for x in &mut page.memory_mut()[start..last] {
                *x = 0xFF;
            }
        }
        RawBranchBuilder {
            page,
            fixed_key_size,
            num_keys,
            keys_written: 0,
        }
    }

    pub(super) fn write_first_page(&mut self, page_number: PageNumber, checksum: Checksum) {
        let offset = 8;
        self.page.memory_mut()[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        let offset = 8 + size_of::<Checksum>() * (self.num_keys + 1);
        self.page.memory_mut()[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());
    }

    fn key_section_start(&self) -> usize {
        let mut offset =
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1);
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_keys;
        }

        offset
    }

    fn key_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_key_size {
            return self.key_section_start() + fixed * (n + 1);
        }
        let offset = 8
            + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1)
            + size_of::<u32>() * n;
        u32::from_le_bytes(
            self.page.memory()[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    pub(super) fn write_nth_key(
        &mut self,
        key: &[u8],
        page_number: PageNumber,
        checksum: Checksum,
        n: usize,
    ) {
        assert!(n < self.num_keys);
        assert_eq!(n, self.keys_written);
        self.keys_written += 1;
        let offset = 8 + size_of::<Checksum>() * (n + 1);
        self.page.memory_mut()[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        let offset = 8
            + size_of::<Checksum>() * (self.num_keys + 1)
            + PageNumber::serialized_size() * (n + 1);
        self.page.memory_mut()[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());

        let data_offset = if n > 0 {
            self.key_end(n - 1)
        } else {
            self.key_section_start()
        };
        if self.fixed_key_size.is_none() {
            let offset = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1)
                + size_of::<u32>() * n;
            self.page.memory_mut()[offset..(offset + size_of::<u32>())].copy_from_slice(
                &u32::try_from(data_offset + key.len())
                    .unwrap()
                    .to_le_bytes(),
            );
        }

        debug_assert!(data_offset > offset);
        self.page.memory_mut()[data_offset..(data_offset + key.len())].copy_from_slice(key);
    }
}

impl Drop for RawBranchBuilder<'_> {
    fn drop(&mut self) {
        if !thread::panicking() {
            assert_eq!(self.keys_written, self.num_keys);
        }
    }
}

pub(crate) struct BranchMutator<'b> {
    page: &'b mut PageMut,
}

impl<'b> BranchMutator<'b> {
    pub(crate) fn new(page: &'b mut PageMut) -> Self {
        assert_eq!(page.memory()[0], BRANCH);
        Self { page }
    }

    fn num_keys(&self) -> usize {
        u16::from_le_bytes(self.page.memory()[2..4].try_into().unwrap()) as usize
    }

    pub(crate) fn write_child_page(
        &mut self,
        i: usize,
        page_number: PageNumber,
        checksum: Checksum,
    ) {
        debug_assert!(i <= self.num_keys());
        let offset = 8 + size_of::<Checksum>() * i;
        self.page.memory_mut()[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        let offset =
            8 + size_of::<Checksum>() * (self.num_keys() + 1) + PageNumber::serialized_size() * i;
        self.page.memory_mut()[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());
    }
}