1use crate::db::TransactionGuard;
2use crate::multimap_table::DynamicCollectionType::{Inline, SubtreeV2};
3use crate::sealed::Sealed;
4use crate::table::{ReadableTableMetadata, TableStats};
5use crate::tree_store::{
6 AllPageNumbersBtreeIter, BRANCH, BranchAccessor, BranchMutator, Btree, BtreeHeader, BtreeMut,
7 BtreeRangeIter, BtreeStats, Checksum, DEFERRED, LEAF, LeafAccessor, LeafMutator,
8 MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, Page, PageHint, PageNumber, PagePath, RawBtree,
9 RawLeafBuilder, TransactionalMemory, UntypedBtree, UntypedBtreeMut, btree_stats,
10};
11use crate::types::{Key, TypeName, Value};
12use crate::{AccessGuard, MultimapTableHandle, Result, StorageError, WriteTransaction};
13use std::borrow::Borrow;
14use std::cmp::max;
15use std::collections::HashMap;
16use std::convert::TryInto;
17use std::marker::PhantomData;
18use std::mem;
19use std::mem::size_of;
20use std::ops::{RangeBounds, RangeFull};
21use std::sync::{Arc, Mutex};
22
23pub(crate) fn multimap_btree_stats(
24 root: Option<PageNumber>,
25 mem: &TransactionalMemory,
26 fixed_key_size: Option<usize>,
27 fixed_value_size: Option<usize>,
28) -> Result<BtreeStats> {
29 if let Some(root) = root {
30 multimap_stats_helper(root, mem, fixed_key_size, fixed_value_size)
31 } else {
32 Ok(BtreeStats {
33 tree_height: 0,
34 leaf_pages: 0,
35 branch_pages: 0,
36 stored_leaf_bytes: 0,
37 metadata_bytes: 0,
38 fragmented_bytes: 0,
39 })
40 }
41}
42
43fn multimap_stats_helper(
44 page_number: PageNumber,
45 mem: &TransactionalMemory,
46 fixed_key_size: Option<usize>,
47 fixed_value_size: Option<usize>,
48) -> Result<BtreeStats> {
49 let page = mem.get_page(page_number)?;
50 let node_mem = page.memory();
51 match node_mem[0] {
52 LEAF => {
53 let accessor = LeafAccessor::new(
54 page.memory(),
55 fixed_key_size,
56 DynamicCollection::<()>::fixed_width_with(fixed_value_size),
57 );
58 let mut leaf_bytes = 0u64;
59 let mut is_branch = false;
60 for i in 0..accessor.num_pairs() {
61 let entry = accessor.entry(i).unwrap();
62 let collection: &UntypedDynamicCollection =
63 UntypedDynamicCollection::new(entry.value());
64 match collection.collection_type() {
65 Inline => {
66 let inline_accessor = LeafAccessor::new(
67 collection.as_inline(),
68 fixed_value_size,
69 <() as Value>::fixed_width(),
70 );
71 leaf_bytes +=
72 inline_accessor.length_of_pairs(0, inline_accessor.num_pairs()) as u64;
73 }
74 SubtreeV2 => {
75 is_branch = true;
76 }
77 }
78 }
79 let mut overhead_bytes = (accessor.total_length() as u64) - leaf_bytes;
80 let mut fragmented_bytes = (page.memory().len() - accessor.total_length()) as u64;
81 let mut max_child_height = 0;
82 let (mut leaf_pages, mut branch_pages) = if is_branch { (0, 1) } else { (1, 0) };
83
84 for i in 0..accessor.num_pairs() {
85 let entry = accessor.entry(i).unwrap();
86 let collection: &UntypedDynamicCollection =
87 UntypedDynamicCollection::new(entry.value());
88 match collection.collection_type() {
89 Inline => {
90 }
92 SubtreeV2 => {
93 let stats = btree_stats(
95 Some(collection.as_subtree().root),
96 mem,
97 fixed_value_size,
98 <() as Value>::fixed_width(),
99 )?;
100 max_child_height = max(max_child_height, stats.tree_height);
101 branch_pages += stats.branch_pages;
102 leaf_pages += stats.leaf_pages;
103 fragmented_bytes += stats.fragmented_bytes;
104 overhead_bytes += stats.metadata_bytes;
105 leaf_bytes += stats.stored_leaf_bytes;
106 }
107 }
108 }
109
110 Ok(BtreeStats {
111 tree_height: max_child_height + 1,
112 leaf_pages,
113 branch_pages,
114 stored_leaf_bytes: leaf_bytes,
115 metadata_bytes: overhead_bytes,
116 fragmented_bytes,
117 })
118 }
119 BRANCH => {
120 let accessor = BranchAccessor::new(&page, fixed_key_size);
121 let mut max_child_height = 0;
122 let mut leaf_pages = 0;
123 let mut branch_pages = 1;
124 let mut stored_leaf_bytes = 0;
125 let mut metadata_bytes = accessor.total_length() as u64;
126 let mut fragmented_bytes = (page.memory().len() - accessor.total_length()) as u64;
127 for i in 0..accessor.count_children() {
128 if let Some(child) = accessor.child_page(i) {
129 let stats =
130 multimap_stats_helper(child, mem, fixed_key_size, fixed_value_size)?;
131 max_child_height = max(max_child_height, stats.tree_height);
132 leaf_pages += stats.leaf_pages;
133 branch_pages += stats.branch_pages;
134 stored_leaf_bytes += stats.stored_leaf_bytes;
135 metadata_bytes += stats.metadata_bytes;
136 fragmented_bytes += stats.fragmented_bytes;
137 }
138 }
139
140 Ok(BtreeStats {
141 tree_height: max_child_height + 1,
142 leaf_pages,
143 branch_pages,
144 stored_leaf_bytes,
145 metadata_bytes,
146 fragmented_bytes,
147 })
148 }
149 _ => unreachable!(),
150 }
151}
152
153pub(crate) fn verify_tree_and_subtree_checksums(
155 root: Option<BtreeHeader>,
156 key_size: Option<usize>,
157 value_size: Option<usize>,
158 mem: Arc<TransactionalMemory>,
159) -> Result<bool> {
160 if let Some(header) = root {
161 if !RawBtree::new(
162 Some(header),
163 key_size,
164 DynamicCollection::<()>::fixed_width_with(value_size),
165 mem.clone(),
166 )
167 .verify_checksum()?
168 {
169 return Ok(false);
170 }
171
172 let table_pages_iter = AllPageNumbersBtreeIter::new(
173 header.root,
174 key_size,
175 DynamicCollection::<()>::fixed_width_with(value_size),
176 mem.clone(),
177 )?;
178 for table_page in table_pages_iter {
179 let page = mem.get_page(table_page?)?;
180 let subtree_roots = parse_subtree_roots(&page, key_size, value_size);
181 for header in subtree_roots {
182 if !RawBtree::new(Some(header), value_size, <()>::fixed_width(), mem.clone())
183 .verify_checksum()?
184 {
185 return Ok(false);
186 }
187 }
188 }
189 }
190
191 Ok(true)
192}
193
194pub(crate) fn relocate_subtrees(
196 root: (PageNumber, Checksum),
197 key_size: Option<usize>,
198 value_size: Option<usize>,
199 mem: Arc<TransactionalMemory>,
200 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
201 relocation_map: &HashMap<PageNumber, PageNumber>,
202) -> Result<(PageNumber, Checksum)> {
203 let old_page = mem.get_page(root.0)?;
204 let mut new_page = if let Some(new_page_number) = relocation_map.get(&root.0) {
205 mem.get_page_mut(*new_page_number)?
206 } else {
207 return Ok(root);
208 };
209 let new_page_number = new_page.get_page_number();
210 new_page.memory_mut().copy_from_slice(old_page.memory());
211
212 match old_page.memory()[0] {
213 LEAF => {
214 let accessor = LeafAccessor::new(
215 old_page.memory(),
216 key_size,
217 UntypedDynamicCollection::fixed_width_with(value_size),
218 );
219 let mut mutator = LeafMutator::new(
221 &mut new_page,
222 key_size,
223 UntypedDynamicCollection::fixed_width_with(value_size),
224 );
225 for i in 0..accessor.num_pairs() {
226 let entry = accessor.entry(i).unwrap();
227 let collection = UntypedDynamicCollection::from_bytes(entry.value());
228 if matches!(collection.collection_type(), SubtreeV2) {
229 let sub_root = collection.as_subtree();
230 let mut tree = UntypedBtreeMut::new(
231 Some(sub_root),
232 mem.clone(),
233 freed_pages.clone(),
234 value_size,
235 <() as Value>::fixed_width(),
236 );
237 tree.relocate(relocation_map)?;
238 if sub_root != tree.get_root().unwrap() {
239 let new_collection =
240 UntypedDynamicCollection::make_subtree_data(tree.get_root().unwrap());
241 mutator.insert(i, true, entry.key(), &new_collection);
242 }
243 }
244 }
245 }
246 BRANCH => {
247 let accessor = BranchAccessor::new(&old_page, key_size);
248 let mut mutator = BranchMutator::new(&mut new_page);
249 for i in 0..accessor.count_children() {
250 if let Some(child) = accessor.child_page(i) {
251 let child_checksum = accessor.child_checksum(i).unwrap();
252 let (new_child, new_checksum) = relocate_subtrees(
253 (child, child_checksum),
254 key_size,
255 value_size,
256 mem.clone(),
257 freed_pages.clone(),
258 relocation_map,
259 )?;
260 mutator.write_child_page(i, new_child, new_checksum);
261 }
262 }
263 }
264 _ => unreachable!(),
265 }
266
267 let old_page_number = old_page.get_page_number();
268 drop(old_page);
269 if !mem.free_if_uncommitted(old_page_number) {
270 freed_pages.lock().unwrap().push(old_page_number);
271 }
272 Ok((new_page_number, DEFERRED))
273}
274
275pub(crate) fn finalize_tree_and_subtree_checksums(
278 root: Option<BtreeHeader>,
279 key_size: Option<usize>,
280 value_size: Option<usize>,
281 mem: Arc<TransactionalMemory>,
282) -> Result<Option<BtreeHeader>> {
283 let freed_pages = Arc::new(Mutex::new(vec![]));
284 let mut tree = UntypedBtreeMut::new(
285 root,
286 mem.clone(),
287 freed_pages.clone(),
288 key_size,
289 DynamicCollection::<()>::fixed_width_with(value_size),
290 );
291 tree.dirty_leaf_visitor(|mut leaf_page| {
292 let mut sub_root_updates = vec![];
293 let accessor = LeafAccessor::new(
294 leaf_page.memory(),
295 key_size,
296 DynamicCollection::<()>::fixed_width_with(value_size),
297 );
298 for i in 0..accessor.num_pairs() {
299 let entry = accessor.entry(i).unwrap();
300 let collection = <&DynamicCollection<()>>::from_bytes(entry.value());
301 if matches!(collection.collection_type(), SubtreeV2) {
302 let sub_root = collection.as_subtree();
303 if mem.uncommitted(sub_root.root) {
304 let mut subtree = UntypedBtreeMut::new(
305 Some(sub_root),
306 mem.clone(),
307 freed_pages.clone(),
308 value_size,
309 <()>::fixed_width(),
310 );
311 let subtree_root = subtree.finalize_dirty_checksums()?.unwrap();
312 sub_root_updates.push((i, entry.key().to_vec(), subtree_root));
313 }
314 }
315 }
316 let mut mutator = LeafMutator::new(
318 &mut leaf_page,
319 key_size,
320 DynamicCollection::<()>::fixed_width_with(value_size),
321 );
322 for (i, key, sub_root) in sub_root_updates {
323 let collection = DynamicCollection::<()>::make_subtree_data(sub_root);
324 mutator.insert(i, true, &key, &collection);
325 }
326
327 Ok(())
328 })?;
329
330 let root = tree.finalize_dirty_checksums()?;
331 assert!(freed_pages.lock().unwrap().is_empty());
333 Ok(root)
334}
335
336fn parse_subtree_roots<T: Page>(
337 page: &T,
338 fixed_key_size: Option<usize>,
339 fixed_value_size: Option<usize>,
340) -> Vec<BtreeHeader> {
341 match page.memory()[0] {
342 BRANCH => {
343 vec![]
344 }
345 LEAF => {
346 let mut result = vec![];
347 let accessor = LeafAccessor::new(
348 page.memory(),
349 fixed_key_size,
350 DynamicCollection::<()>::fixed_width_with(fixed_value_size),
351 );
352 for i in 0..accessor.num_pairs() {
353 let entry = accessor.entry(i).unwrap();
354 let collection = <&DynamicCollection<()>>::from_bytes(entry.value());
355 if matches!(collection.collection_type(), SubtreeV2) {
356 result.push(collection.as_subtree());
357 }
358 }
359
360 result
361 }
362 _ => unreachable!(),
363 }
364}
365
366pub(crate) struct UntypedMultiBtree {
367 mem: Arc<TransactionalMemory>,
368 root: Option<BtreeHeader>,
369 key_width: Option<usize>,
370 value_width: Option<usize>,
371}
372
373impl UntypedMultiBtree {
374 pub(crate) fn new(
375 root: Option<BtreeHeader>,
376 mem: Arc<TransactionalMemory>,
377 key_width: Option<usize>,
378 value_width: Option<usize>,
379 ) -> Self {
380 Self {
381 mem,
382 root,
383 key_width,
384 value_width,
385 }
386 }
387
388 pub(crate) fn visit_all_pages<F>(&self, mut visitor: F) -> Result
390 where
391 F: FnMut(&PagePath) -> Result,
392 {
393 let tree = UntypedBtree::new(
394 self.root,
395 self.mem.clone(),
396 self.key_width,
397 UntypedDynamicCollection::fixed_width_with(self.value_width),
398 );
399 tree.visit_all_pages(|path| {
400 visitor(path)?;
401 let page = self.mem.get_page(path.page_number())?;
402 match page.memory()[0] {
403 LEAF => {
404 for header in parse_subtree_roots(&page, self.key_width, self.value_width) {
405 let subtree = UntypedBtree::new(
406 Some(header),
407 self.mem.clone(),
408 self.value_width,
409 <() as Value>::fixed_width(),
410 );
411 subtree.visit_all_pages(|subpath| {
412 let full_path = path.with_subpath(subpath);
413 visitor(&full_path)
414 })?;
415 }
416 }
417 BRANCH => {
418 }
420 _ => unreachable!(),
421 }
422 Ok(())
423 })?;
424
425 Ok(())
426 }
427}
428
429pub(crate) struct LeafKeyIter<'a, V: Key + 'static> {
430 inline_collection: AccessGuard<'a, &'static DynamicCollection<V>>,
431 fixed_key_size: Option<usize>,
432 fixed_value_size: Option<usize>,
433 start_entry: isize, end_entry: isize, }
436
437impl<'a, V: Key> LeafKeyIter<'a, V> {
438 fn new(
439 data: AccessGuard<'a, &'static DynamicCollection<V>>,
440 fixed_key_size: Option<usize>,
441 fixed_value_size: Option<usize>,
442 ) -> Self {
443 let accessor =
444 LeafAccessor::new(data.value().as_inline(), fixed_key_size, fixed_value_size);
445 let end_entry = isize::try_from(accessor.num_pairs()).unwrap() - 1;
446 Self {
447 inline_collection: data,
448 fixed_key_size,
449 fixed_value_size,
450 start_entry: 0,
451 end_entry,
452 }
453 }
454
455 fn next_key(&mut self) -> Option<&[u8]> {
456 if self.end_entry < self.start_entry {
457 return None;
458 }
459 let accessor = LeafAccessor::new(
460 self.inline_collection.value().as_inline(),
461 self.fixed_key_size,
462 self.fixed_value_size,
463 );
464 self.start_entry += 1;
465 accessor
466 .entry((self.start_entry - 1).try_into().unwrap())
467 .map(|e| e.key())
468 }
469
470 fn next_key_back(&mut self) -> Option<&[u8]> {
471 if self.end_entry < self.start_entry {
472 return None;
473 }
474 let accessor = LeafAccessor::new(
475 self.inline_collection.value().as_inline(),
476 self.fixed_key_size,
477 self.fixed_value_size,
478 );
479 self.end_entry -= 1;
480 accessor
481 .entry((self.end_entry + 1).try_into().unwrap())
482 .map(|e| e.key())
483 }
484}
485
486enum DynamicCollectionType {
487 Inline,
488 SubtreeV2,
491}
492
493impl From<u8> for DynamicCollectionType {
494 fn from(value: u8) -> Self {
495 match value {
496 LEAF => Inline,
497 3 => SubtreeV2,
499 _ => unreachable!(),
500 }
501 }
502}
503
504#[allow(clippy::from_over_into)]
505impl Into<u8> for DynamicCollectionType {
506 fn into(self) -> u8 {
507 match self {
508 Inline => LEAF,
511 SubtreeV2 => 3,
513 }
514 }
515}
516
517#[repr(transparent)]
531struct DynamicCollection<V: Key> {
532 _value_type: PhantomData<V>,
533 data: [u8],
534}
535
536impl<V: Key> std::fmt::Debug for DynamicCollection<V> {
537 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
538 f.debug_struct("DynamicCollection")
539 .field("data", &&self.data)
540 .finish()
541 }
542}
543
544impl<V: Key> Value for &DynamicCollection<V> {
545 type SelfType<'a>
546 = &'a DynamicCollection<V>
547 where
548 Self: 'a;
549 type AsBytes<'a>
550 = &'a [u8]
551 where
552 Self: 'a;
553
554 fn fixed_width() -> Option<usize> {
555 None
556 }
557
558 fn from_bytes<'a>(data: &'a [u8]) -> &'a DynamicCollection<V>
559 where
560 Self: 'a,
561 {
562 DynamicCollection::new(data)
563 }
564
565 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'a [u8]
566 where
567 Self: 'b,
568 {
569 &value.data
570 }
571
572 fn type_name() -> TypeName {
573 TypeName::internal("redb::DynamicCollection")
574 }
575}
576
577impl<V: Key> DynamicCollection<V> {
578 fn new(data: &[u8]) -> &Self {
579 unsafe { &*(std::ptr::from_ref::<[u8]>(data) as *const DynamicCollection<V>) }
580 }
581
582 fn collection_type(&self) -> DynamicCollectionType {
583 DynamicCollectionType::from(self.data[0])
584 }
585
586 fn as_inline(&self) -> &[u8] {
587 debug_assert!(matches!(self.collection_type(), Inline));
588 &self.data[1..]
589 }
590
591 fn as_subtree(&self) -> BtreeHeader {
592 assert!(matches!(self.collection_type(), SubtreeV2));
593 BtreeHeader::from_le_bytes(
594 self.data[1..=BtreeHeader::serialized_size()]
595 .try_into()
596 .unwrap(),
597 )
598 }
599
600 fn get_num_values(&self) -> u64 {
601 match self.collection_type() {
602 Inline => {
603 let leaf_data = self.as_inline();
604 let accessor =
605 LeafAccessor::new(leaf_data, V::fixed_width(), <() as Value>::fixed_width());
606 accessor.num_pairs() as u64
607 }
608 SubtreeV2 => {
609 let offset = 1 + PageNumber::serialized_size() + size_of::<Checksum>();
610 u64::from_le_bytes(
611 self.data[offset..(offset + size_of::<u64>())]
612 .try_into()
613 .unwrap(),
614 )
615 }
616 }
617 }
618
619 fn make_inline_data(data: &[u8]) -> Vec<u8> {
620 let mut result = vec![Inline.into()];
621 result.extend_from_slice(data);
622
623 result
624 }
625
626 fn make_subtree_data(header: BtreeHeader) -> Vec<u8> {
627 let mut result = vec![SubtreeV2.into()];
628 result.extend_from_slice(&header.to_le_bytes());
629 result
630 }
631
632 pub(crate) fn fixed_width_with(_value_width: Option<usize>) -> Option<usize> {
633 None
634 }
635}
636
637impl<V: Key> DynamicCollection<V> {
638 fn iter<'a>(
639 collection: AccessGuard<'a, &'static DynamicCollection<V>>,
640 guard: Arc<TransactionGuard>,
641 mem: Arc<TransactionalMemory>,
642 ) -> Result<MultimapValue<'a, V>> {
643 Ok(match collection.value().collection_type() {
644 Inline => {
645 let leaf_iter =
646 LeafKeyIter::new(collection, V::fixed_width(), <() as Value>::fixed_width());
647 MultimapValue::new_inline(leaf_iter, guard)
648 }
649 SubtreeV2 => {
650 let root = collection.value().as_subtree().root;
651 MultimapValue::new_subtree(
652 BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), Some(root), mem)?,
653 collection.value().get_num_values(),
654 guard,
655 )
656 }
657 })
658 }
659
660 fn iter_free_on_drop<'a>(
661 collection: AccessGuard<'a, &'static DynamicCollection<V>>,
662 pages: Vec<PageNumber>,
663 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
664 guard: Arc<TransactionGuard>,
665 mem: Arc<TransactionalMemory>,
666 ) -> Result<MultimapValue<'a, V>> {
667 let num_values = collection.value().get_num_values();
668 Ok(match collection.value().collection_type() {
669 Inline => {
670 let leaf_iter =
671 LeafKeyIter::new(collection, V::fixed_width(), <() as Value>::fixed_width());
672 MultimapValue::new_inline(leaf_iter, guard)
673 }
674 SubtreeV2 => {
675 let root = collection.value().as_subtree().root;
676 let inner = BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
677 &(..),
678 Some(root),
679 mem.clone(),
680 )?;
681 MultimapValue::new_subtree_free_on_drop(
682 inner,
683 num_values,
684 freed_pages,
685 pages,
686 guard,
687 mem,
688 )
689 }
690 })
691 }
692}
693
694#[repr(transparent)]
695struct UntypedDynamicCollection {
696 data: [u8],
697}
698
699impl UntypedDynamicCollection {
700 pub(crate) fn fixed_width_with(_value_width: Option<usize>) -> Option<usize> {
701 None
702 }
703
704 fn new(data: &[u8]) -> &Self {
705 unsafe { &*(std::ptr::from_ref::<[u8]>(data) as *const UntypedDynamicCollection) }
706 }
707
708 fn make_subtree_data(header: BtreeHeader) -> Vec<u8> {
709 let mut result = vec![SubtreeV2.into()];
710 result.extend_from_slice(&header.to_le_bytes());
711 result
712 }
713
714 fn from_bytes(data: &[u8]) -> &Self {
715 Self::new(data)
716 }
717
718 fn collection_type(&self) -> DynamicCollectionType {
719 DynamicCollectionType::from(self.data[0])
720 }
721
722 fn as_inline(&self) -> &[u8] {
723 debug_assert!(matches!(self.collection_type(), Inline));
724 &self.data[1..]
725 }
726
727 fn as_subtree(&self) -> BtreeHeader {
728 assert!(matches!(self.collection_type(), SubtreeV2));
729 BtreeHeader::from_le_bytes(
730 self.data[1..=BtreeHeader::serialized_size()]
731 .try_into()
732 .unwrap(),
733 )
734 }
735}
736
737enum ValueIterState<'a, V: Key + 'static> {
738 Subtree(BtreeRangeIter<V, ()>),
739 InlineLeaf(LeafKeyIter<'a, V>),
740}
741
742pub struct MultimapValue<'a, V: Key + 'static> {
743 inner: Option<ValueIterState<'a, V>>,
744 remaining: u64,
745 freed_pages: Option<Arc<Mutex<Vec<PageNumber>>>>,
746 free_on_drop: Vec<PageNumber>,
747 _transaction_guard: Arc<TransactionGuard>,
748 mem: Option<Arc<TransactionalMemory>>,
749 _value_type: PhantomData<V>,
750}
751
752impl<'a, V: Key + 'static> MultimapValue<'a, V> {
753 fn new_subtree(
754 inner: BtreeRangeIter<V, ()>,
755 num_values: u64,
756 guard: Arc<TransactionGuard>,
757 ) -> Self {
758 Self {
759 inner: Some(ValueIterState::Subtree(inner)),
760 remaining: num_values,
761 freed_pages: None,
762 free_on_drop: vec![],
763 _transaction_guard: guard,
764 mem: None,
765 _value_type: Default::default(),
766 }
767 }
768
769 fn new_subtree_free_on_drop(
770 inner: BtreeRangeIter<V, ()>,
771 num_values: u64,
772 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
773 pages: Vec<PageNumber>,
774 guard: Arc<TransactionGuard>,
775 mem: Arc<TransactionalMemory>,
776 ) -> Self {
777 Self {
778 inner: Some(ValueIterState::Subtree(inner)),
779 remaining: num_values,
780 freed_pages: Some(freed_pages),
781 free_on_drop: pages,
782 _transaction_guard: guard,
783 mem: Some(mem),
784 _value_type: Default::default(),
785 }
786 }
787
788 fn new_inline(inner: LeafKeyIter<'a, V>, guard: Arc<TransactionGuard>) -> Self {
789 let remaining = inner.inline_collection.value().get_num_values();
790 Self {
791 inner: Some(ValueIterState::InlineLeaf(inner)),
792 remaining,
793 freed_pages: None,
794 free_on_drop: vec![],
795 _transaction_guard: guard,
796 mem: None,
797 _value_type: Default::default(),
798 }
799 }
800
801 pub fn len(&self) -> u64 {
805 self.remaining
806 }
807
808 pub fn is_empty(&self) -> bool {
809 self.len() == 0
810 }
811}
812
813impl<'a, V: Key + 'static> Iterator for MultimapValue<'a, V> {
814 type Item = Result<AccessGuard<'a, V>>;
815
816 fn next(&mut self) -> Option<Self::Item> {
817 let bytes = match self.inner.as_mut().unwrap() {
819 ValueIterState::Subtree(iter) => match iter.next()? {
820 Ok(e) => e.key_data(),
821 Err(err) => {
822 return Some(Err(err));
823 }
824 },
825 ValueIterState::InlineLeaf(iter) => iter.next_key()?.to_vec(),
826 };
827 self.remaining -= 1;
828 Some(Ok(AccessGuard::with_owned_value(bytes)))
829 }
830}
831
832impl<V: Key + 'static> DoubleEndedIterator for MultimapValue<'_, V> {
833 fn next_back(&mut self) -> Option<Self::Item> {
834 let bytes = match self.inner.as_mut().unwrap() {
836 ValueIterState::Subtree(iter) => match iter.next_back()? {
837 Ok(e) => e.key_data(),
838 Err(err) => {
839 return Some(Err(err));
840 }
841 },
842 ValueIterState::InlineLeaf(iter) => iter.next_key_back()?.to_vec(),
843 };
844 Some(Ok(AccessGuard::with_owned_value(bytes)))
845 }
846}
847
848impl<V: Key + 'static> Drop for MultimapValue<'_, V> {
849 fn drop(&mut self) {
850 drop(mem::take(&mut self.inner));
852 if !self.free_on_drop.is_empty() {
853 let mut freed_pages = self.freed_pages.as_ref().unwrap().lock().unwrap();
854 for page in &self.free_on_drop {
855 if !self.mem.as_ref().unwrap().free_if_uncommitted(*page) {
856 freed_pages.push(*page);
857 }
858 }
859 }
860 }
861}
862
863pub struct MultimapRange<'a, K: Key + 'static, V: Key + 'static> {
864 inner: BtreeRangeIter<K, &'static DynamicCollection<V>>,
865 mem: Arc<TransactionalMemory>,
866 transaction_guard: Arc<TransactionGuard>,
867 _key_type: PhantomData<K>,
868 _value_type: PhantomData<V>,
869 _lifetime: PhantomData<&'a ()>,
870}
871
872impl<K: Key + 'static, V: Key + 'static> MultimapRange<'_, K, V> {
873 fn new(
874 inner: BtreeRangeIter<K, &'static DynamicCollection<V>>,
875 guard: Arc<TransactionGuard>,
876 mem: Arc<TransactionalMemory>,
877 ) -> Self {
878 Self {
879 inner,
880 mem,
881 transaction_guard: guard,
882 _key_type: Default::default(),
883 _value_type: Default::default(),
884 _lifetime: Default::default(),
885 }
886 }
887}
888
889impl<'a, K: Key + 'static, V: Key + 'static> Iterator for MultimapRange<'a, K, V> {
890 type Item = Result<(AccessGuard<'a, K>, MultimapValue<'a, V>)>;
891
892 fn next(&mut self) -> Option<Self::Item> {
893 match self.inner.next()? {
894 Ok(entry) => {
895 let key = AccessGuard::with_owned_value(entry.key_data());
896 let (page, _, value_range) = entry.into_raw();
897 let collection = AccessGuard::with_page(page, value_range);
898 Some(
899 DynamicCollection::iter(
900 collection,
901 self.transaction_guard.clone(),
902 self.mem.clone(),
903 )
904 .map(|iter| (key, iter)),
905 )
906 }
907 Err(err) => Some(Err(err)),
908 }
909 }
910}
911
912impl<K: Key + 'static, V: Key + 'static> DoubleEndedIterator for MultimapRange<'_, K, V> {
913 fn next_back(&mut self) -> Option<Self::Item> {
914 match self.inner.next_back()? {
915 Ok(entry) => {
916 let key = AccessGuard::with_owned_value(entry.key_data());
917 let (page, _, value_range) = entry.into_raw();
918 let collection = AccessGuard::with_page(page, value_range);
919 Some(
920 DynamicCollection::iter(
921 collection,
922 self.transaction_guard.clone(),
923 self.mem.clone(),
924 )
925 .map(|iter| (key, iter)),
926 )
927 }
928 Err(err) => Some(Err(err)),
929 }
930 }
931}
932
933pub struct MultimapTable<'txn, K: Key + 'static, V: Key + 'static> {
937 name: String,
938 num_values: u64,
939 transaction: &'txn WriteTransaction,
940 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
941 tree: BtreeMut<'txn, K, &'static DynamicCollection<V>>,
942 mem: Arc<TransactionalMemory>,
943 _value_type: PhantomData<V>,
944}
945
946impl<K: Key + 'static, V: Key + 'static> MultimapTableHandle for MultimapTable<'_, K, V> {
947 fn name(&self) -> &str {
948 &self.name
949 }
950}
951
952impl<'txn, K: Key + 'static, V: Key + 'static> MultimapTable<'txn, K, V> {
953 pub(crate) fn new(
954 name: &str,
955 table_root: Option<BtreeHeader>,
956 num_values: u64,
957 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
958 mem: Arc<TransactionalMemory>,
959 transaction: &'txn WriteTransaction,
960 ) -> MultimapTable<'txn, K, V> {
961 MultimapTable {
962 name: name.to_string(),
963 num_values,
964 transaction,
965 freed_pages: freed_pages.clone(),
966 tree: BtreeMut::new(
967 table_root,
968 transaction.transaction_guard(),
969 mem.clone(),
970 freed_pages,
971 ),
972 mem,
973 _value_type: Default::default(),
974 }
975 }
976
977 #[allow(dead_code)]
978 pub(crate) fn print_debug(&self, include_values: bool) -> Result {
979 self.tree.print_debug(include_values)
980 }
981
982 pub fn insert<'k, 'v>(
986 &mut self,
987 key: impl Borrow<K::SelfType<'k>>,
988 value: impl Borrow<V::SelfType<'v>>,
989 ) -> Result<bool> {
990 let value_bytes = V::as_bytes(value.borrow());
991 let value_bytes_ref = value_bytes.as_ref();
992 if value_bytes_ref.len() > MAX_VALUE_LENGTH {
993 return Err(StorageError::ValueTooLarge(value_bytes_ref.len()));
994 }
995 let key_len = K::as_bytes(key.borrow()).as_ref().len();
996 if key_len > MAX_VALUE_LENGTH {
997 return Err(StorageError::ValueTooLarge(key_len));
998 }
999 if value_bytes_ref.len() + key_len > MAX_PAIR_LENGTH {
1000 return Err(StorageError::ValueTooLarge(value_bytes_ref.len() + key_len));
1001 }
1002 let get_result = self.tree.get(key.borrow())?;
1003 let existed = if get_result.is_some() {
1004 #[allow(clippy::unnecessary_unwrap)]
1005 let guard = get_result.unwrap();
1006 let collection_type = guard.value().collection_type();
1007 match collection_type {
1008 Inline => {
1009 let leaf_data = guard.value().as_inline();
1010 let accessor = LeafAccessor::new(
1011 leaf_data,
1012 V::fixed_width(),
1013 <() as Value>::fixed_width(),
1014 );
1015 let (position, found) = accessor.position::<V>(value_bytes_ref);
1016 if found {
1017 return Ok(true);
1018 }
1019
1020 let num_pairs = accessor.num_pairs();
1021 let new_pairs = num_pairs + 1;
1022 let new_pair_bytes =
1023 accessor.length_of_pairs(0, accessor.num_pairs()) + value_bytes_ref.len();
1024 let new_key_bytes =
1025 accessor.length_of_keys(0, accessor.num_pairs()) + value_bytes_ref.len();
1026 let required_inline_bytes = RawLeafBuilder::required_bytes(
1027 new_pairs,
1028 new_pair_bytes,
1029 V::fixed_width(),
1030 <() as Value>::fixed_width(),
1031 );
1032
1033 if required_inline_bytes < self.mem.get_page_size() / 2 {
1034 let mut data = vec![0; required_inline_bytes];
1035 let mut builder = RawLeafBuilder::new(
1036 &mut data,
1037 new_pairs,
1038 V::fixed_width(),
1039 <() as Value>::fixed_width(),
1040 new_key_bytes,
1041 );
1042 for i in 0..accessor.num_pairs() {
1043 if i == position {
1044 builder
1045 .append(value_bytes_ref, <() as Value>::as_bytes(&()).as_ref());
1046 }
1047 let entry = accessor.entry(i).unwrap();
1048 builder.append(entry.key(), entry.value());
1049 }
1050 if position == accessor.num_pairs() {
1051 builder.append(value_bytes_ref, <() as Value>::as_bytes(&()).as_ref());
1052 }
1053 drop(builder);
1054 drop(guard);
1055 let inline_data = DynamicCollection::<V>::make_inline_data(&data);
1056 self.tree
1057 .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1058 } else {
1059 let mut page = self.mem.allocate(leaf_data.len())?;
1061 page.memory_mut()[..leaf_data.len()].copy_from_slice(leaf_data);
1062 let page_number = page.get_page_number();
1063 drop(page);
1064 drop(guard);
1065
1066 let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
1068 Some(BtreeHeader::new(page_number, 0, num_pairs as u64)),
1069 self.transaction.transaction_guard(),
1070 self.mem.clone(),
1071 self.freed_pages.clone(),
1072 );
1073 let existed = subtree.insert(value.borrow(), &())?.is_some();
1074 assert_eq!(existed, found);
1075 let subtree_data =
1076 DynamicCollection::<V>::make_subtree_data(subtree.get_root().unwrap());
1077 self.tree
1078 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1079 }
1080
1081 found
1082 }
1083 SubtreeV2 => {
1084 let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
1085 Some(guard.value().as_subtree()),
1086 self.transaction.transaction_guard(),
1087 self.mem.clone(),
1088 self.freed_pages.clone(),
1089 );
1090 drop(guard);
1091 let existed = subtree.insert(value.borrow(), &())?.is_some();
1092 let subtree_data =
1093 DynamicCollection::<V>::make_subtree_data(subtree.get_root().unwrap());
1094 self.tree
1095 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1096
1097 existed
1098 }
1099 }
1100 } else {
1101 drop(get_result);
1102 let required_inline_bytes = RawLeafBuilder::required_bytes(
1103 1,
1104 value_bytes_ref.len(),
1105 V::fixed_width(),
1106 <() as Value>::fixed_width(),
1107 );
1108 if required_inline_bytes < self.mem.get_page_size() / 2 {
1109 let mut data = vec![0; required_inline_bytes];
1110 let mut builder = RawLeafBuilder::new(
1111 &mut data,
1112 1,
1113 V::fixed_width(),
1114 <() as Value>::fixed_width(),
1115 value_bytes_ref.len(),
1116 );
1117 builder.append(value_bytes_ref, <() as Value>::as_bytes(&()).as_ref());
1118 drop(builder);
1119 let inline_data = DynamicCollection::<V>::make_inline_data(&data);
1120 self.tree
1121 .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1122 } else {
1123 let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
1124 None,
1125 self.transaction.transaction_guard(),
1126 self.mem.clone(),
1127 self.freed_pages.clone(),
1128 );
1129 subtree.insert(value.borrow(), &())?;
1130 let subtree_data =
1131 DynamicCollection::<V>::make_subtree_data(subtree.get_root().unwrap());
1132 self.tree
1133 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1134 }
1135 false
1136 };
1137
1138 if !existed {
1139 self.num_values += 1;
1140 }
1141
1142 Ok(existed)
1143 }
1144
1145 pub fn remove<'k, 'v>(
1149 &mut self,
1150 key: impl Borrow<K::SelfType<'k>>,
1151 value: impl Borrow<V::SelfType<'v>>,
1152 ) -> Result<bool> {
1153 let get_result = self.tree.get(key.borrow())?;
1154 if get_result.is_none() {
1155 return Ok(false);
1156 }
1157 let guard = get_result.unwrap();
1158 let v = guard.value();
1159 let existed = match v.collection_type() {
1160 Inline => {
1161 let leaf_data = v.as_inline();
1162 let accessor =
1163 LeafAccessor::new(leaf_data, V::fixed_width(), <() as Value>::fixed_width());
1164 if let Some(position) = accessor.find_key::<V>(V::as_bytes(value.borrow()).as_ref())
1165 {
1166 let old_num_pairs = accessor.num_pairs();
1167 if old_num_pairs == 1 {
1168 drop(guard);
1169 self.tree.remove(key.borrow())?;
1170 } else {
1171 let old_pairs_len = accessor.length_of_pairs(0, old_num_pairs);
1172 let removed_value_len = accessor.entry(position).unwrap().key().len();
1173 let required = RawLeafBuilder::required_bytes(
1174 old_num_pairs - 1,
1175 old_pairs_len - removed_value_len,
1176 V::fixed_width(),
1177 <() as Value>::fixed_width(),
1178 );
1179 let mut new_data = vec![0; required];
1180 let new_key_len =
1181 accessor.length_of_keys(0, old_num_pairs) - removed_value_len;
1182 let mut builder = RawLeafBuilder::new(
1183 &mut new_data,
1184 old_num_pairs - 1,
1185 V::fixed_width(),
1186 <() as Value>::fixed_width(),
1187 new_key_len,
1188 );
1189 for i in 0..old_num_pairs {
1190 if i != position {
1191 let entry = accessor.entry(i).unwrap();
1192 builder.append(entry.key(), entry.value());
1193 }
1194 }
1195 drop(builder);
1196 drop(guard);
1197
1198 let inline_data = DynamicCollection::<V>::make_inline_data(&new_data);
1199 self.tree
1200 .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1201 }
1202 true
1203 } else {
1204 drop(guard);
1205 false
1206 }
1207 }
1208 SubtreeV2 => {
1209 let mut subtree: BtreeMut<V, ()> = BtreeMut::new(
1210 Some(v.as_subtree()),
1211 self.transaction.transaction_guard(),
1212 self.mem.clone(),
1213 self.freed_pages.clone(),
1214 );
1215 drop(guard);
1216 let existed = subtree.remove(value.borrow())?.is_some();
1217
1218 if let Some(BtreeHeader {
1219 root: new_root,
1220 checksum: new_checksum,
1221 length: new_length,
1222 }) = subtree.get_root()
1223 {
1224 let page = self.mem.get_page(new_root)?;
1225 match page.memory()[0] {
1226 LEAF => {
1227 let accessor = LeafAccessor::new(
1228 page.memory(),
1229 V::fixed_width(),
1230 <() as Value>::fixed_width(),
1231 );
1232 let len = accessor.total_length();
1233 if len < self.mem.get_page_size() / 2 {
1234 let inline_data =
1235 DynamicCollection::<V>::make_inline_data(&page.memory()[..len]);
1236 self.tree
1237 .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1238 drop(page);
1239 if !self.mem.free_if_uncommitted(new_root) {
1240 (*self.freed_pages).lock().unwrap().push(new_root);
1241 }
1242 } else {
1243 let subtree_data =
1244 DynamicCollection::<V>::make_subtree_data(BtreeHeader::new(
1245 new_root,
1246 new_checksum,
1247 accessor.num_pairs() as u64,
1248 ));
1249 self.tree
1250 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1251 }
1252 }
1253 BRANCH => {
1254 let subtree_data = DynamicCollection::<V>::make_subtree_data(
1255 BtreeHeader::new(new_root, new_checksum, new_length),
1256 );
1257 self.tree
1258 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1259 }
1260 _ => unreachable!(),
1261 }
1262 } else {
1263 self.tree.remove(key.borrow())?;
1264 }
1265
1266 existed
1267 }
1268 };
1269
1270 if existed {
1271 self.num_values -= 1;
1272 }
1273
1274 Ok(existed)
1275 }
1276
1277 pub fn remove_all<'a>(
1281 &mut self,
1282 key: impl Borrow<K::SelfType<'a>>,
1283 ) -> Result<MultimapValue<V>> {
1284 let iter = if let Some(collection) = self.tree.remove(key.borrow())? {
1285 let mut pages = vec![];
1286 if matches!(
1287 collection.value().collection_type(),
1288 DynamicCollectionType::SubtreeV2
1289 ) {
1290 let root = collection.value().as_subtree().root;
1291 let all_pages = AllPageNumbersBtreeIter::new(
1292 root,
1293 V::fixed_width(),
1294 <() as Value>::fixed_width(),
1295 self.mem.clone(),
1296 )?;
1297 for page in all_pages {
1298 pages.push(page?);
1299 }
1300 }
1301
1302 self.num_values -= collection.value().get_num_values();
1303
1304 DynamicCollection::iter_free_on_drop(
1305 collection,
1306 pages,
1307 self.freed_pages.clone(),
1308 self.transaction.transaction_guard(),
1309 self.mem.clone(),
1310 )?
1311 } else {
1312 MultimapValue::new_subtree(
1313 BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), None, self.mem.clone())?,
1314 0,
1315 self.transaction.transaction_guard(),
1316 )
1317 };
1318
1319 Ok(iter)
1320 }
1321}
1322
1323impl<K: Key + 'static, V: Key + 'static> ReadableTableMetadata for MultimapTable<'_, K, V> {
1324 fn stats(&self) -> Result<TableStats> {
1325 let tree_stats = multimap_btree_stats(
1326 self.tree.get_root().map(|x| x.root),
1327 &self.mem,
1328 K::fixed_width(),
1329 V::fixed_width(),
1330 )?;
1331
1332 Ok(TableStats {
1333 tree_height: tree_stats.tree_height,
1334 leaf_pages: tree_stats.leaf_pages,
1335 branch_pages: tree_stats.branch_pages,
1336 stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1337 metadata_bytes: tree_stats.metadata_bytes,
1338 fragmented_bytes: tree_stats.fragmented_bytes,
1339 })
1340 }
1341
1342 fn len(&self) -> Result<u64> {
1344 Ok(self.num_values)
1345 }
1346}
1347
1348impl<K: Key + 'static, V: Key + 'static> ReadableMultimapTable<K, V> for MultimapTable<'_, K, V> {
1349 fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>> {
1351 let guard = self.transaction.transaction_guard();
1352 let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1353 DynamicCollection::iter(collection, guard, self.mem.clone())?
1354 } else {
1355 MultimapValue::new_subtree(
1356 BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), None, self.mem.clone())?,
1357 0,
1358 guard,
1359 )
1360 };
1361
1362 Ok(iter)
1363 }
1364
1365 fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<K, V>>
1367 where
1368 KR: Borrow<K::SelfType<'a>> + 'a,
1369 {
1370 let inner = self.tree.range(&range)?;
1371 Ok(MultimapRange::new(
1372 inner,
1373 self.transaction.transaction_guard(),
1374 self.mem.clone(),
1375 ))
1376 }
1377}
1378
1379impl<K: Key + 'static, V: Key + 'static> Sealed for MultimapTable<'_, K, V> {}
1380
1381impl<K: Key + 'static, V: Key + 'static> Drop for MultimapTable<'_, K, V> {
1382 fn drop(&mut self) {
1383 self.transaction
1384 .close_table(&self.name, &self.tree, self.num_values);
1385 }
1386}
1387
1388pub trait ReadableMultimapTable<K: Key + 'static, V: Key + 'static>: ReadableTableMetadata {
1389 fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>>;
1391
1392 fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<K, V>>
1393 where
1394 KR: Borrow<K::SelfType<'a>> + 'a;
1395
1396 fn iter(&self) -> Result<MultimapRange<K, V>> {
1399 self.range::<K::SelfType<'_>>(..)
1400 }
1401}
1402
1403pub struct ReadOnlyUntypedMultimapTable {
1405 num_values: u64,
1406 tree: RawBtree,
1407 fixed_key_size: Option<usize>,
1408 fixed_value_size: Option<usize>,
1409 mem: Arc<TransactionalMemory>,
1410}
1411
1412impl Sealed for ReadOnlyUntypedMultimapTable {}
1413
1414impl ReadableTableMetadata for ReadOnlyUntypedMultimapTable {
1415 fn stats(&self) -> Result<TableStats> {
1417 let tree_stats = multimap_btree_stats(
1418 self.tree.get_root().map(|x| x.root),
1419 &self.mem,
1420 self.fixed_key_size,
1421 self.fixed_value_size,
1422 )?;
1423
1424 Ok(TableStats {
1425 tree_height: tree_stats.tree_height,
1426 leaf_pages: tree_stats.leaf_pages,
1427 branch_pages: tree_stats.branch_pages,
1428 stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1429 metadata_bytes: tree_stats.metadata_bytes,
1430 fragmented_bytes: tree_stats.fragmented_bytes,
1431 })
1432 }
1433
1434 fn len(&self) -> Result<u64> {
1435 Ok(self.num_values)
1436 }
1437}
1438
1439impl ReadOnlyUntypedMultimapTable {
1440 pub(crate) fn new(
1441 root: Option<BtreeHeader>,
1442 num_values: u64,
1443 fixed_key_size: Option<usize>,
1444 fixed_value_size: Option<usize>,
1445 mem: Arc<TransactionalMemory>,
1446 ) -> Self {
1447 Self {
1448 num_values,
1449 tree: RawBtree::new(
1450 root,
1451 fixed_key_size,
1452 DynamicCollection::<()>::fixed_width_with(fixed_value_size),
1453 mem.clone(),
1454 ),
1455 fixed_key_size,
1456 fixed_value_size,
1457 mem,
1458 }
1459 }
1460}
1461
1462pub struct ReadOnlyMultimapTable<K: Key + 'static, V: Key + 'static> {
1464 tree: Btree<K, &'static DynamicCollection<V>>,
1465 num_values: u64,
1466 mem: Arc<TransactionalMemory>,
1467 transaction_guard: Arc<TransactionGuard>,
1468 _value_type: PhantomData<V>,
1469}
1470
1471impl<K: Key + 'static, V: Key + 'static> ReadOnlyMultimapTable<K, V> {
1472 pub(crate) fn new(
1473 root: Option<BtreeHeader>,
1474 num_values: u64,
1475 hint: PageHint,
1476 guard: Arc<TransactionGuard>,
1477 mem: Arc<TransactionalMemory>,
1478 ) -> Result<ReadOnlyMultimapTable<K, V>> {
1479 Ok(ReadOnlyMultimapTable {
1480 tree: Btree::new(root, hint, guard.clone(), mem.clone())?,
1481 num_values,
1482 mem,
1483 transaction_guard: guard,
1484 _value_type: Default::default(),
1485 })
1486 }
1487
1488 pub fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<'static, V>> {
1491 let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1492 DynamicCollection::iter(collection, self.transaction_guard.clone(), self.mem.clone())?
1493 } else {
1494 MultimapValue::new_subtree(
1495 BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), None, self.mem.clone())?,
1496 0,
1497 self.transaction_guard.clone(),
1498 )
1499 };
1500
1501 Ok(iter)
1502 }
1503
1504 pub fn range<'a, KR>(&self, range: impl RangeBounds<KR>) -> Result<MultimapRange<'static, K, V>>
1507 where
1508 KR: Borrow<K::SelfType<'a>>,
1509 {
1510 let inner = self.tree.range(&range)?;
1511 Ok(MultimapRange::new(
1512 inner,
1513 self.transaction_guard.clone(),
1514 self.mem.clone(),
1515 ))
1516 }
1517}
1518
1519impl<K: Key + 'static, V: Key + 'static> ReadableTableMetadata for ReadOnlyMultimapTable<K, V> {
1520 fn stats(&self) -> Result<TableStats> {
1521 let tree_stats = multimap_btree_stats(
1522 self.tree.get_root().map(|x| x.root),
1523 &self.mem,
1524 K::fixed_width(),
1525 V::fixed_width(),
1526 )?;
1527
1528 Ok(TableStats {
1529 tree_height: tree_stats.tree_height,
1530 leaf_pages: tree_stats.leaf_pages,
1531 branch_pages: tree_stats.branch_pages,
1532 stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1533 metadata_bytes: tree_stats.metadata_bytes,
1534 fragmented_bytes: tree_stats.fragmented_bytes,
1535 })
1536 }
1537
1538 fn len(&self) -> Result<u64> {
1539 Ok(self.num_values)
1540 }
1541}
1542
1543impl<K: Key + 'static, V: Key + 'static> ReadableMultimapTable<K, V>
1544 for ReadOnlyMultimapTable<K, V>
1545{
1546 fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>> {
1548 let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1549 DynamicCollection::iter(collection, self.transaction_guard.clone(), self.mem.clone())?
1550 } else {
1551 MultimapValue::new_subtree(
1552 BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), None, self.mem.clone())?,
1553 0,
1554 self.transaction_guard.clone(),
1555 )
1556 };
1557
1558 Ok(iter)
1559 }
1560
1561 fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<K, V>>
1562 where
1563 KR: Borrow<K::SelfType<'a>> + 'a,
1564 {
1565 let inner = self.tree.range(&range)?;
1566 Ok(MultimapRange::new(
1567 inner,
1568 self.transaction_guard.clone(),
1569 self.mem.clone(),
1570 ))
1571 }
1572}
1573
1574impl<K: Key, V: Key> Sealed for ReadOnlyMultimapTable<K, V> {}