1use crate::db::TransactionGuard;
2use crate::multimap_table::DynamicCollectionType::{Inline, SubtreeV2};
3use crate::sealed::Sealed;
4use crate::table::{ReadableTableMetadata, TableStats};
5use crate::tree_store::{
6 btree_stats, AllPageNumbersBtreeIter, BranchAccessor, BranchMutator, Btree, BtreeHeader,
7 BtreeMut, BtreeRangeIter, BtreeStats, Checksum, LeafAccessor, LeafMutator, Page, PageHint,
8 PageNumber, PagePath, RawBtree, RawLeafBuilder, TransactionalMemory, UntypedBtree,
9 UntypedBtreeMut, BRANCH, DEFERRED, LEAF, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH,
10};
11use crate::types::{Key, TypeName, Value};
12use crate::{AccessGuard, MultimapTableHandle, Result, StorageError, WriteTransaction};
13use std::borrow::Borrow;
14use std::cmp::max;
15use std::collections::HashMap;
16use std::convert::TryInto;
17use std::marker::PhantomData;
18use std::mem;
19use std::mem::size_of;
20use std::ops::{RangeBounds, RangeFull};
21use std::sync::{Arc, Mutex};
22
23pub(crate) fn multimap_btree_stats(
24 root: Option<PageNumber>,
25 mem: &TransactionalMemory,
26 fixed_key_size: Option<usize>,
27 fixed_value_size: Option<usize>,
28) -> Result<BtreeStats> {
29 if let Some(root) = root {
30 multimap_stats_helper(root, mem, fixed_key_size, fixed_value_size)
31 } else {
32 Ok(BtreeStats {
33 tree_height: 0,
34 leaf_pages: 0,
35 branch_pages: 0,
36 stored_leaf_bytes: 0,
37 metadata_bytes: 0,
38 fragmented_bytes: 0,
39 })
40 }
41}
42
43fn multimap_stats_helper(
44 page_number: PageNumber,
45 mem: &TransactionalMemory,
46 fixed_key_size: Option<usize>,
47 fixed_value_size: Option<usize>,
48) -> Result<BtreeStats> {
49 let page = mem.get_page(page_number)?;
50 let node_mem = page.memory();
51 match node_mem[0] {
52 LEAF => {
53 let accessor = LeafAccessor::new(
54 page.memory(),
55 fixed_key_size,
56 DynamicCollection::<()>::fixed_width_with(fixed_value_size),
57 );
58 let mut leaf_bytes = 0u64;
59 let mut is_branch = false;
60 for i in 0..accessor.num_pairs() {
61 let entry = accessor.entry(i).unwrap();
62 let collection: &UntypedDynamicCollection =
63 UntypedDynamicCollection::new(entry.value());
64 match collection.collection_type() {
65 Inline => {
66 let inline_accessor = LeafAccessor::new(
67 collection.as_inline(),
68 fixed_value_size,
69 <() as Value>::fixed_width(),
70 );
71 leaf_bytes +=
72 inline_accessor.length_of_pairs(0, inline_accessor.num_pairs()) as u64;
73 }
74 SubtreeV2 => {
75 is_branch = true;
76 }
77 }
78 }
79 let mut overhead_bytes = (accessor.total_length() as u64) - leaf_bytes;
80 let mut fragmented_bytes = (page.memory().len() - accessor.total_length()) as u64;
81 let mut max_child_height = 0;
82 let (mut leaf_pages, mut branch_pages) = if is_branch { (0, 1) } else { (1, 0) };
83
84 for i in 0..accessor.num_pairs() {
85 let entry = accessor.entry(i).unwrap();
86 let collection: &UntypedDynamicCollection =
87 UntypedDynamicCollection::new(entry.value());
88 match collection.collection_type() {
89 Inline => {
90 }
92 SubtreeV2 => {
93 let stats = btree_stats(
95 Some(collection.as_subtree().root),
96 mem,
97 fixed_value_size,
98 <() as Value>::fixed_width(),
99 )?;
100 max_child_height = max(max_child_height, stats.tree_height);
101 branch_pages += stats.branch_pages;
102 leaf_pages += stats.leaf_pages;
103 fragmented_bytes += stats.fragmented_bytes;
104 overhead_bytes += stats.metadata_bytes;
105 leaf_bytes += stats.stored_leaf_bytes;
106 }
107 }
108 }
109
110 Ok(BtreeStats {
111 tree_height: max_child_height + 1,
112 leaf_pages,
113 branch_pages,
114 stored_leaf_bytes: leaf_bytes,
115 metadata_bytes: overhead_bytes,
116 fragmented_bytes,
117 })
118 }
119 BRANCH => {
120 let accessor = BranchAccessor::new(&page, fixed_key_size);
121 let mut max_child_height = 0;
122 let mut leaf_pages = 0;
123 let mut branch_pages = 1;
124 let mut stored_leaf_bytes = 0;
125 let mut metadata_bytes = accessor.total_length() as u64;
126 let mut fragmented_bytes = (page.memory().len() - accessor.total_length()) as u64;
127 for i in 0..accessor.count_children() {
128 if let Some(child) = accessor.child_page(i) {
129 let stats =
130 multimap_stats_helper(child, mem, fixed_key_size, fixed_value_size)?;
131 max_child_height = max(max_child_height, stats.tree_height);
132 leaf_pages += stats.leaf_pages;
133 branch_pages += stats.branch_pages;
134 stored_leaf_bytes += stats.stored_leaf_bytes;
135 metadata_bytes += stats.metadata_bytes;
136 fragmented_bytes += stats.fragmented_bytes;
137 }
138 }
139
140 Ok(BtreeStats {
141 tree_height: max_child_height + 1,
142 leaf_pages,
143 branch_pages,
144 stored_leaf_bytes,
145 metadata_bytes,
146 fragmented_bytes,
147 })
148 }
149 _ => unreachable!(),
150 }
151}
152
153pub(crate) fn verify_tree_and_subtree_checksums(
155 root: Option<BtreeHeader>,
156 key_size: Option<usize>,
157 value_size: Option<usize>,
158 mem: Arc<TransactionalMemory>,
159) -> Result<bool> {
160 if let Some(header) = root {
161 if !RawBtree::new(
162 Some(header),
163 key_size,
164 DynamicCollection::<()>::fixed_width_with(value_size),
165 mem.clone(),
166 )
167 .verify_checksum()?
168 {
169 return Ok(false);
170 }
171
172 let table_pages_iter = AllPageNumbersBtreeIter::new(
173 header.root,
174 key_size,
175 DynamicCollection::<()>::fixed_width_with(value_size),
176 mem.clone(),
177 )?;
178 for table_page in table_pages_iter {
179 let page = mem.get_page(table_page?)?;
180 let subtree_roots = parse_subtree_roots(&page, key_size, value_size);
181 for header in subtree_roots {
182 if !RawBtree::new(Some(header), value_size, <()>::fixed_width(), mem.clone())
183 .verify_checksum()?
184 {
185 return Ok(false);
186 }
187 }
188 }
189 }
190
191 Ok(true)
192}
193
194pub(crate) fn relocate_subtrees(
196 root: (PageNumber, Checksum),
197 key_size: Option<usize>,
198 value_size: Option<usize>,
199 mem: Arc<TransactionalMemory>,
200 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
201 relocation_map: &HashMap<PageNumber, PageNumber>,
202) -> Result<(PageNumber, Checksum)> {
203 let old_page = mem.get_page(root.0)?;
204 let mut new_page = if let Some(new_page_number) = relocation_map.get(&root.0) {
205 mem.get_page_mut(*new_page_number)?
206 } else {
207 return Ok(root);
208 };
209 let new_page_number = new_page.get_page_number();
210 new_page.memory_mut().copy_from_slice(old_page.memory());
211
212 match old_page.memory()[0] {
213 LEAF => {
214 let accessor = LeafAccessor::new(
215 old_page.memory(),
216 key_size,
217 UntypedDynamicCollection::fixed_width_with(value_size),
218 );
219 let mut mutator = LeafMutator::new(
221 &mut new_page,
222 key_size,
223 UntypedDynamicCollection::fixed_width_with(value_size),
224 );
225 for i in 0..accessor.num_pairs() {
226 let entry = accessor.entry(i).unwrap();
227 let collection = UntypedDynamicCollection::from_bytes(entry.value());
228 if matches!(collection.collection_type(), SubtreeV2) {
229 let sub_root = collection.as_subtree();
230 let mut tree = UntypedBtreeMut::new(
231 Some(sub_root),
232 mem.clone(),
233 freed_pages.clone(),
234 value_size,
235 <() as Value>::fixed_width(),
236 );
237 tree.relocate(relocation_map)?;
238 if sub_root != tree.get_root().unwrap() {
239 let new_collection =
240 UntypedDynamicCollection::make_subtree_data(tree.get_root().unwrap());
241 mutator.insert(i, true, entry.key(), &new_collection);
242 }
243 }
244 }
245 }
246 BRANCH => {
247 let accessor = BranchAccessor::new(&old_page, key_size);
248 let mut mutator = BranchMutator::new(&mut new_page);
249 for i in 0..accessor.count_children() {
250 if let Some(child) = accessor.child_page(i) {
251 let child_checksum = accessor.child_checksum(i).unwrap();
252 let (new_child, new_checksum) = relocate_subtrees(
253 (child, child_checksum),
254 key_size,
255 value_size,
256 mem.clone(),
257 freed_pages.clone(),
258 relocation_map,
259 )?;
260 mutator.write_child_page(i, new_child, new_checksum);
261 }
262 }
263 }
264 _ => unreachable!(),
265 }
266
267 let old_page_number = old_page.get_page_number();
268 drop(old_page);
269 if !mem.free_if_uncommitted(old_page_number) {
270 freed_pages.lock().unwrap().push(old_page_number);
271 }
272 Ok((new_page_number, DEFERRED))
273}
274
275pub(crate) fn finalize_tree_and_subtree_checksums(
278 root: Option<BtreeHeader>,
279 key_size: Option<usize>,
280 value_size: Option<usize>,
281 mem: Arc<TransactionalMemory>,
282) -> Result<Option<BtreeHeader>> {
283 let freed_pages = Arc::new(Mutex::new(vec![]));
284 let mut tree = UntypedBtreeMut::new(
285 root,
286 mem.clone(),
287 freed_pages.clone(),
288 key_size,
289 DynamicCollection::<()>::fixed_width_with(value_size),
290 );
291 tree.dirty_leaf_visitor(|mut leaf_page| {
292 let mut sub_root_updates = vec![];
293 let accessor = LeafAccessor::new(
294 leaf_page.memory(),
295 key_size,
296 DynamicCollection::<()>::fixed_width_with(value_size),
297 );
298 for i in 0..accessor.num_pairs() {
299 let entry = accessor.entry(i).unwrap();
300 let collection = <&DynamicCollection<()>>::from_bytes(entry.value());
301 if matches!(collection.collection_type(), SubtreeV2) {
302 let sub_root = collection.as_subtree();
303 if mem.uncommitted(sub_root.root) {
304 let mut subtree = UntypedBtreeMut::new(
305 Some(sub_root),
306 mem.clone(),
307 freed_pages.clone(),
308 value_size,
309 <()>::fixed_width(),
310 );
311 let subtree_root = subtree.finalize_dirty_checksums()?.unwrap();
312 sub_root_updates.push((i, entry.key().to_vec(), subtree_root));
313 }
314 }
315 }
316 let mut mutator = LeafMutator::new(
318 &mut leaf_page,
319 key_size,
320 DynamicCollection::<()>::fixed_width_with(value_size),
321 );
322 for (i, key, sub_root) in sub_root_updates {
323 let collection = DynamicCollection::<()>::make_subtree_data(sub_root);
324 mutator.insert(i, true, &key, &collection);
325 }
326
327 Ok(())
328 })?;
329
330 let root = tree.finalize_dirty_checksums()?;
331 assert!(freed_pages.lock().unwrap().is_empty());
333 Ok(root)
334}
335
336fn parse_subtree_roots<T: Page>(
337 page: &T,
338 fixed_key_size: Option<usize>,
339 fixed_value_size: Option<usize>,
340) -> Vec<BtreeHeader> {
341 match page.memory()[0] {
342 BRANCH => {
343 vec![]
344 }
345 LEAF => {
346 let mut result = vec![];
347 let accessor = LeafAccessor::new(
348 page.memory(),
349 fixed_key_size,
350 DynamicCollection::<()>::fixed_width_with(fixed_value_size),
351 );
352 for i in 0..accessor.num_pairs() {
353 let entry = accessor.entry(i).unwrap();
354 let collection = <&DynamicCollection<()>>::from_bytes(entry.value());
355 if matches!(collection.collection_type(), SubtreeV2) {
356 result.push(collection.as_subtree());
357 }
358 }
359
360 result
361 }
362 _ => unreachable!(),
363 }
364}
365
366pub(crate) struct UntypedMultiBtree {
367 mem: Arc<TransactionalMemory>,
368 root: Option<BtreeHeader>,
369 key_width: Option<usize>,
370 value_width: Option<usize>,
371}
372
373impl UntypedMultiBtree {
374 pub(crate) fn new(
375 root: Option<BtreeHeader>,
376 mem: Arc<TransactionalMemory>,
377 key_width: Option<usize>,
378 value_width: Option<usize>,
379 ) -> Self {
380 Self {
381 mem,
382 root,
383 key_width,
384 value_width,
385 }
386 }
387
388 pub(crate) fn visit_all_pages<F>(&self, mut visitor: F) -> Result
390 where
391 F: FnMut(&PagePath) -> Result,
392 {
393 let tree = UntypedBtree::new(
394 self.root,
395 self.mem.clone(),
396 self.key_width,
397 UntypedDynamicCollection::fixed_width_with(self.value_width),
398 );
399 tree.visit_all_pages(|path| {
400 visitor(path)?;
401 let page = self.mem.get_page(path.page_number())?;
402 match page.memory()[0] {
403 LEAF => {
404 for header in parse_subtree_roots(&page, self.key_width, self.value_width) {
405 let subtree = UntypedBtree::new(
406 Some(header),
407 self.mem.clone(),
408 self.value_width,
409 <() as Value>::fixed_width(),
410 );
411 subtree.visit_all_pages(|subpath| {
412 let full_path = path.with_subpath(subpath);
413 visitor(&full_path)
414 })?;
415 }
416 }
417 BRANCH => {
418 }
420 _ => unreachable!(),
421 }
422 Ok(())
423 })?;
424
425 Ok(())
426 }
427}
428
429pub(crate) struct LeafKeyIter<'a, V: Key + 'static> {
430 inline_collection: AccessGuard<'a, &'static DynamicCollection<V>>,
431 fixed_key_size: Option<usize>,
432 fixed_value_size: Option<usize>,
433 start_entry: isize, end_entry: isize, }
436
437impl<'a, V: Key> LeafKeyIter<'a, V> {
438 fn new(
439 data: AccessGuard<'a, &'static DynamicCollection<V>>,
440 fixed_key_size: Option<usize>,
441 fixed_value_size: Option<usize>,
442 ) -> Self {
443 let accessor =
444 LeafAccessor::new(data.value().as_inline(), fixed_key_size, fixed_value_size);
445 let end_entry = isize::try_from(accessor.num_pairs()).unwrap() - 1;
446 Self {
447 inline_collection: data,
448 fixed_key_size,
449 fixed_value_size,
450 start_entry: 0,
451 end_entry,
452 }
453 }
454
455 fn next_key(&mut self) -> Option<&[u8]> {
456 if self.end_entry < self.start_entry {
457 return None;
458 }
459 let accessor = LeafAccessor::new(
460 self.inline_collection.value().as_inline(),
461 self.fixed_key_size,
462 self.fixed_value_size,
463 );
464 self.start_entry += 1;
465 accessor
466 .entry((self.start_entry - 1).try_into().unwrap())
467 .map(|e| e.key())
468 }
469
470 fn next_key_back(&mut self) -> Option<&[u8]> {
471 if self.end_entry < self.start_entry {
472 return None;
473 }
474 let accessor = LeafAccessor::new(
475 self.inline_collection.value().as_inline(),
476 self.fixed_key_size,
477 self.fixed_value_size,
478 );
479 self.end_entry -= 1;
480 accessor
481 .entry((self.end_entry + 1).try_into().unwrap())
482 .map(|e| e.key())
483 }
484}
485
486enum DynamicCollectionType {
487 Inline,
488 SubtreeV2,
491}
492
493impl From<u8> for DynamicCollectionType {
494 fn from(value: u8) -> Self {
495 match value {
496 LEAF => Inline,
497 3 => SubtreeV2,
499 _ => unreachable!(),
500 }
501 }
502}
503
504#[allow(clippy::from_over_into)]
505impl Into<u8> for DynamicCollectionType {
506 fn into(self) -> u8 {
507 match self {
508 Inline => LEAF,
511 SubtreeV2 => 3,
513 }
514 }
515}
516
517#[repr(transparent)]
531struct DynamicCollection<V: Key> {
532 _value_type: PhantomData<V>,
533 data: [u8],
534}
535
536impl<V: Key> std::fmt::Debug for DynamicCollection<V> {
537 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
538 f.debug_struct("DynamicCollection")
539 .field("data", &&self.data)
540 .finish()
541 }
542}
543
/// Lets `&DynamicCollection<V>` be used as a stored value type: the
/// collection is read in place from, and written back as, its raw bytes.
impl<V: Key> Value for &DynamicCollection<V> {
    type SelfType<'a> = &'a DynamicCollection<V>
    where
        Self: 'a;
    type AsBytes<'a> = &'a [u8]
    where
        Self: 'a;

    /// Collections are variable-width, so no fixed width is reported.
    fn fixed_width() -> Option<usize> {
        None
    }

    /// Reinterprets `data` as a collection without copying it.
    fn from_bytes<'a>(data: &'a [u8]) -> &'a DynamicCollection<V>
    where
        Self: 'a,
    {
        DynamicCollection::new(data)
    }

    /// Returns the collection's underlying bytes, including the leading type
    /// tag byte.
    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'a [u8]
    where
        Self: 'b,
    {
        &value.data
    }

    /// Internal type name used to identify this value type.
    fn type_name() -> TypeName {
        TypeName::internal("redb::DynamicCollection")
    }
}
574
575impl<V: Key> DynamicCollection<V> {
576 fn new(data: &[u8]) -> &Self {
577 unsafe { &*(std::ptr::from_ref::<[u8]>(data) as *const DynamicCollection<V>) }
578 }
579
580 fn collection_type(&self) -> DynamicCollectionType {
581 DynamicCollectionType::from(self.data[0])
582 }
583
584 fn as_inline(&self) -> &[u8] {
585 debug_assert!(matches!(self.collection_type(), Inline));
586 &self.data[1..]
587 }
588
589 fn as_subtree(&self) -> BtreeHeader {
590 assert!(matches!(self.collection_type(), SubtreeV2));
591 BtreeHeader::from_le_bytes(
592 self.data[1..=BtreeHeader::serialized_size()]
593 .try_into()
594 .unwrap(),
595 )
596 }
597
598 fn get_num_values(&self) -> u64 {
599 match self.collection_type() {
600 Inline => {
601 let leaf_data = self.as_inline();
602 let accessor =
603 LeafAccessor::new(leaf_data, V::fixed_width(), <() as Value>::fixed_width());
604 accessor.num_pairs() as u64
605 }
606 SubtreeV2 => {
607 let offset = 1 + PageNumber::serialized_size() + size_of::<Checksum>();
608 u64::from_le_bytes(
609 self.data[offset..(offset + size_of::<u64>())]
610 .try_into()
611 .unwrap(),
612 )
613 }
614 }
615 }
616
617 fn make_inline_data(data: &[u8]) -> Vec<u8> {
618 let mut result = vec![Inline.into()];
619 result.extend_from_slice(data);
620
621 result
622 }
623
624 fn make_subtree_data(header: BtreeHeader) -> Vec<u8> {
625 let mut result = vec![SubtreeV2.into()];
626 result.extend_from_slice(&header.to_le_bytes());
627 result
628 }
629
630 pub(crate) fn fixed_width_with(_value_width: Option<usize>) -> Option<usize> {
631 None
632 }
633}
634
635impl<V: Key> DynamicCollection<V> {
636 fn iter<'a>(
637 collection: AccessGuard<'a, &'static DynamicCollection<V>>,
638 guard: Arc<TransactionGuard>,
639 mem: Arc<TransactionalMemory>,
640 ) -> Result<MultimapValue<'a, V>> {
641 Ok(match collection.value().collection_type() {
642 Inline => {
643 let leaf_iter =
644 LeafKeyIter::new(collection, V::fixed_width(), <() as Value>::fixed_width());
645 MultimapValue::new_inline(leaf_iter, guard)
646 }
647 SubtreeV2 => {
648 let root = collection.value().as_subtree().root;
649 MultimapValue::new_subtree(
650 BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), Some(root), mem)?,
651 collection.value().get_num_values(),
652 guard,
653 )
654 }
655 })
656 }
657
658 fn iter_free_on_drop<'a>(
659 collection: AccessGuard<'a, &'static DynamicCollection<V>>,
660 pages: Vec<PageNumber>,
661 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
662 guard: Arc<TransactionGuard>,
663 mem: Arc<TransactionalMemory>,
664 ) -> Result<MultimapValue<'a, V>> {
665 let num_values = collection.value().get_num_values();
666 Ok(match collection.value().collection_type() {
667 Inline => {
668 let leaf_iter =
669 LeafKeyIter::new(collection, V::fixed_width(), <() as Value>::fixed_width());
670 MultimapValue::new_inline(leaf_iter, guard)
671 }
672 SubtreeV2 => {
673 let root = collection.value().as_subtree().root;
674 let inner = BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
675 &(..),
676 Some(root),
677 mem.clone(),
678 )?;
679 MultimapValue::new_subtree_free_on_drop(
680 inner,
681 num_values,
682 freed_pages,
683 pages,
684 guard,
685 mem,
686 )
687 }
688 })
689 }
690}
691
692#[repr(transparent)]
693struct UntypedDynamicCollection {
694 data: [u8],
695}
696
697impl UntypedDynamicCollection {
698 pub(crate) fn fixed_width_with(_value_width: Option<usize>) -> Option<usize> {
699 None
700 }
701
702 fn new(data: &[u8]) -> &Self {
703 unsafe { &*(std::ptr::from_ref::<[u8]>(data) as *const UntypedDynamicCollection) }
704 }
705
706 fn make_subtree_data(header: BtreeHeader) -> Vec<u8> {
707 let mut result = vec![SubtreeV2.into()];
708 result.extend_from_slice(&header.to_le_bytes());
709 result
710 }
711
712 fn from_bytes(data: &[u8]) -> &Self {
713 Self::new(data)
714 }
715
716 fn collection_type(&self) -> DynamicCollectionType {
717 DynamicCollectionType::from(self.data[0])
718 }
719
720 fn as_inline(&self) -> &[u8] {
721 debug_assert!(matches!(self.collection_type(), Inline));
722 &self.data[1..]
723 }
724
725 fn as_subtree(&self) -> BtreeHeader {
726 assert!(matches!(self.collection_type(), SubtreeV2));
727 BtreeHeader::from_le_bytes(
728 self.data[1..=BtreeHeader::serialized_size()]
729 .try_into()
730 .unwrap(),
731 )
732 }
733}
734
735enum ValueIterState<'a, V: Key + 'static> {
736 Subtree(BtreeRangeIter<V, ()>),
737 InlineLeaf(LeafKeyIter<'a, V>),
738}
739
740pub struct MultimapValue<'a, V: Key + 'static> {
741 inner: Option<ValueIterState<'a, V>>,
742 remaining: u64,
743 freed_pages: Option<Arc<Mutex<Vec<PageNumber>>>>,
744 free_on_drop: Vec<PageNumber>,
745 _transaction_guard: Arc<TransactionGuard>,
746 mem: Option<Arc<TransactionalMemory>>,
747 _value_type: PhantomData<V>,
748}
749
750impl<'a, V: Key + 'static> MultimapValue<'a, V> {
751 fn new_subtree(
752 inner: BtreeRangeIter<V, ()>,
753 num_values: u64,
754 guard: Arc<TransactionGuard>,
755 ) -> Self {
756 Self {
757 inner: Some(ValueIterState::Subtree(inner)),
758 remaining: num_values,
759 freed_pages: None,
760 free_on_drop: vec![],
761 _transaction_guard: guard,
762 mem: None,
763 _value_type: Default::default(),
764 }
765 }
766
767 fn new_subtree_free_on_drop(
768 inner: BtreeRangeIter<V, ()>,
769 num_values: u64,
770 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
771 pages: Vec<PageNumber>,
772 guard: Arc<TransactionGuard>,
773 mem: Arc<TransactionalMemory>,
774 ) -> Self {
775 Self {
776 inner: Some(ValueIterState::Subtree(inner)),
777 remaining: num_values,
778 freed_pages: Some(freed_pages),
779 free_on_drop: pages,
780 _transaction_guard: guard,
781 mem: Some(mem),
782 _value_type: Default::default(),
783 }
784 }
785
786 fn new_inline(inner: LeafKeyIter<'a, V>, guard: Arc<TransactionGuard>) -> Self {
787 let remaining = inner.inline_collection.value().get_num_values();
788 Self {
789 inner: Some(ValueIterState::InlineLeaf(inner)),
790 remaining,
791 freed_pages: None,
792 free_on_drop: vec![],
793 _transaction_guard: guard,
794 mem: None,
795 _value_type: Default::default(),
796 }
797 }
798
799 pub fn len(&self) -> u64 {
803 self.remaining
804 }
805
806 pub fn is_empty(&self) -> bool {
807 self.len() == 0
808 }
809}
810
811impl<'a, V: Key + 'static> Iterator for MultimapValue<'a, V> {
812 type Item = Result<AccessGuard<'a, V>>;
813
814 fn next(&mut self) -> Option<Self::Item> {
815 let bytes = match self.inner.as_mut().unwrap() {
817 ValueIterState::Subtree(ref mut iter) => match iter.next()? {
818 Ok(e) => e.key_data(),
819 Err(err) => {
820 return Some(Err(err));
821 }
822 },
823 ValueIterState::InlineLeaf(ref mut iter) => iter.next_key()?.to_vec(),
824 };
825 self.remaining -= 1;
826 Some(Ok(AccessGuard::with_owned_value(bytes)))
827 }
828}
829
830impl<'a, V: Key + 'static> DoubleEndedIterator for MultimapValue<'a, V> {
831 fn next_back(&mut self) -> Option<Self::Item> {
832 let bytes = match self.inner.as_mut().unwrap() {
834 ValueIterState::Subtree(ref mut iter) => match iter.next_back()? {
835 Ok(e) => e.key_data(),
836 Err(err) => {
837 return Some(Err(err));
838 }
839 },
840 ValueIterState::InlineLeaf(ref mut iter) => iter.next_key_back()?.to_vec(),
841 };
842 Some(Ok(AccessGuard::with_owned_value(bytes)))
843 }
844}
845
846impl<'a, V: Key + 'static> Drop for MultimapValue<'a, V> {
847 fn drop(&mut self) {
848 drop(mem::take(&mut self.inner));
850 if !self.free_on_drop.is_empty() {
851 let mut freed_pages = self.freed_pages.as_ref().unwrap().lock().unwrap();
852 for page in &self.free_on_drop {
853 if !self.mem.as_ref().unwrap().free_if_uncommitted(*page) {
854 freed_pages.push(*page);
855 }
856 }
857 }
858 }
859}
860
861pub struct MultimapRange<'a, K: Key + 'static, V: Key + 'static> {
862 inner: BtreeRangeIter<K, &'static DynamicCollection<V>>,
863 mem: Arc<TransactionalMemory>,
864 transaction_guard: Arc<TransactionGuard>,
865 _key_type: PhantomData<K>,
866 _value_type: PhantomData<V>,
867 _lifetime: PhantomData<&'a ()>,
868}
869
870impl<'a, K: Key + 'static, V: Key + 'static> MultimapRange<'a, K, V> {
871 fn new(
872 inner: BtreeRangeIter<K, &'static DynamicCollection<V>>,
873 guard: Arc<TransactionGuard>,
874 mem: Arc<TransactionalMemory>,
875 ) -> Self {
876 Self {
877 inner,
878 mem,
879 transaction_guard: guard,
880 _key_type: Default::default(),
881 _value_type: Default::default(),
882 _lifetime: Default::default(),
883 }
884 }
885}
886
887impl<'a, K: Key + 'static, V: Key + 'static> Iterator for MultimapRange<'a, K, V> {
888 type Item = Result<(AccessGuard<'a, K>, MultimapValue<'a, V>)>;
889
890 fn next(&mut self) -> Option<Self::Item> {
891 match self.inner.next()? {
892 Ok(entry) => {
893 let key = AccessGuard::with_owned_value(entry.key_data());
894 let (page, _, value_range) = entry.into_raw();
895 let collection = AccessGuard::with_page(page, value_range);
896 Some(
897 DynamicCollection::iter(
898 collection,
899 self.transaction_guard.clone(),
900 self.mem.clone(),
901 )
902 .map(|iter| (key, iter)),
903 )
904 }
905 Err(err) => Some(Err(err)),
906 }
907 }
908}
909
910impl<'a, K: Key + 'static, V: Key + 'static> DoubleEndedIterator for MultimapRange<'a, K, V> {
911 fn next_back(&mut self) -> Option<Self::Item> {
912 match self.inner.next_back()? {
913 Ok(entry) => {
914 let key = AccessGuard::with_owned_value(entry.key_data());
915 let (page, _, value_range) = entry.into_raw();
916 let collection = AccessGuard::with_page(page, value_range);
917 Some(
918 DynamicCollection::iter(
919 collection,
920 self.transaction_guard.clone(),
921 self.mem.clone(),
922 )
923 .map(|iter| (key, iter)),
924 )
925 }
926 Err(err) => Some(Err(err)),
927 }
928 }
929}
930
931pub struct MultimapTable<'txn, K: Key + 'static, V: Key + 'static> {
935 name: String,
936 num_values: u64,
937 transaction: &'txn WriteTransaction,
938 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
939 tree: BtreeMut<'txn, K, &'static DynamicCollection<V>>,
940 mem: Arc<TransactionalMemory>,
941 _value_type: PhantomData<V>,
942}
943
944impl<K: Key + 'static, V: Key + 'static> MultimapTableHandle for MultimapTable<'_, K, V> {
945 fn name(&self) -> &str {
946 &self.name
947 }
948}
949
950impl<'txn, K: Key + 'static, V: Key + 'static> MultimapTable<'txn, K, V> {
951 pub(crate) fn new(
952 name: &str,
953 table_root: Option<BtreeHeader>,
954 num_values: u64,
955 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
956 mem: Arc<TransactionalMemory>,
957 transaction: &'txn WriteTransaction,
958 ) -> MultimapTable<'txn, K, V> {
959 MultimapTable {
960 name: name.to_string(),
961 num_values,
962 transaction,
963 freed_pages: freed_pages.clone(),
964 tree: BtreeMut::new(
965 table_root,
966 transaction.transaction_guard(),
967 mem.clone(),
968 freed_pages,
969 ),
970 mem,
971 _value_type: Default::default(),
972 }
973 }
974
    /// Debugging aid: forwards to the underlying tree's `print_debug`,
    /// passing `include_values` through unchanged.
    // NOTE(review): presumably only called from tests or ad-hoc debugging,
    // which would explain the `dead_code` allowance — confirm.
    #[allow(dead_code)]
    pub(crate) fn print_debug(&self, include_values: bool) -> Result {
        self.tree.print_debug(include_values)
    }
979
980 pub fn insert<'k, 'v>(
984 &mut self,
985 key: impl Borrow<K::SelfType<'k>>,
986 value: impl Borrow<V::SelfType<'v>>,
987 ) -> Result<bool> {
988 let value_bytes = V::as_bytes(value.borrow());
989 let value_bytes_ref = value_bytes.as_ref();
990 if value_bytes_ref.len() > MAX_VALUE_LENGTH {
991 return Err(StorageError::ValueTooLarge(value_bytes_ref.len()));
992 }
993 let key_len = K::as_bytes(key.borrow()).as_ref().len();
994 if key_len > MAX_VALUE_LENGTH {
995 return Err(StorageError::ValueTooLarge(key_len));
996 }
997 if value_bytes_ref.len() + key_len > MAX_PAIR_LENGTH {
998 return Err(StorageError::ValueTooLarge(value_bytes_ref.len() + key_len));
999 }
1000 let get_result = self.tree.get(key.borrow())?;
1001 let existed = if get_result.is_some() {
1002 #[allow(clippy::unnecessary_unwrap)]
1003 let guard = get_result.unwrap();
1004 let collection_type = guard.value().collection_type();
1005 match collection_type {
1006 Inline => {
1007 let leaf_data = guard.value().as_inline();
1008 let accessor = LeafAccessor::new(
1009 leaf_data,
1010 V::fixed_width(),
1011 <() as Value>::fixed_width(),
1012 );
1013 let (position, found) = accessor.position::<V>(value_bytes_ref);
1014 if found {
1015 return Ok(true);
1016 }
1017
1018 let num_pairs = accessor.num_pairs();
1019 let new_pairs = num_pairs + 1;
1020 let new_pair_bytes =
1021 accessor.length_of_pairs(0, accessor.num_pairs()) + value_bytes_ref.len();
1022 let new_key_bytes =
1023 accessor.length_of_keys(0, accessor.num_pairs()) + value_bytes_ref.len();
1024 let required_inline_bytes = RawLeafBuilder::required_bytes(
1025 new_pairs,
1026 new_pair_bytes,
1027 V::fixed_width(),
1028 <() as Value>::fixed_width(),
1029 );
1030
1031 if required_inline_bytes < self.mem.get_page_size() / 2 {
1032 let mut data = vec![0; required_inline_bytes];
1033 let mut builder = RawLeafBuilder::new(
1034 &mut data,
1035 new_pairs,
1036 V::fixed_width(),
1037 <() as Value>::fixed_width(),
1038 new_key_bytes,
1039 );
1040 for i in 0..accessor.num_pairs() {
1041 if i == position {
1042 builder
1043 .append(value_bytes_ref, <() as Value>::as_bytes(&()).as_ref());
1044 }
1045 let entry = accessor.entry(i).unwrap();
1046 builder.append(entry.key(), entry.value());
1047 }
1048 if position == accessor.num_pairs() {
1049 builder.append(value_bytes_ref, <() as Value>::as_bytes(&()).as_ref());
1050 }
1051 drop(builder);
1052 drop(guard);
1053 let inline_data = DynamicCollection::<V>::make_inline_data(&data);
1054 self.tree
1055 .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1056 } else {
1057 let mut page = self.mem.allocate(leaf_data.len())?;
1059 page.memory_mut()[..leaf_data.len()].copy_from_slice(leaf_data);
1060 let page_number = page.get_page_number();
1061 drop(page);
1062 drop(guard);
1063
1064 let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
1066 Some(BtreeHeader::new(page_number, 0, num_pairs as u64)),
1067 self.transaction.transaction_guard(),
1068 self.mem.clone(),
1069 self.freed_pages.clone(),
1070 );
1071 let existed = subtree.insert(value.borrow(), &())?.is_some();
1072 assert_eq!(existed, found);
1073 let subtree_data =
1074 DynamicCollection::<V>::make_subtree_data(subtree.get_root().unwrap());
1075 self.tree
1076 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1077 }
1078
1079 found
1080 }
1081 SubtreeV2 => {
1082 let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
1083 Some(guard.value().as_subtree()),
1084 self.transaction.transaction_guard(),
1085 self.mem.clone(),
1086 self.freed_pages.clone(),
1087 );
1088 drop(guard);
1089 let existed = subtree.insert(value.borrow(), &())?.is_some();
1090 let subtree_data =
1091 DynamicCollection::<V>::make_subtree_data(subtree.get_root().unwrap());
1092 self.tree
1093 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1094
1095 existed
1096 }
1097 }
1098 } else {
1099 drop(get_result);
1100 let required_inline_bytes = RawLeafBuilder::required_bytes(
1101 1,
1102 value_bytes_ref.len(),
1103 V::fixed_width(),
1104 <() as Value>::fixed_width(),
1105 );
1106 if required_inline_bytes < self.mem.get_page_size() / 2 {
1107 let mut data = vec![0; required_inline_bytes];
1108 let mut builder = RawLeafBuilder::new(
1109 &mut data,
1110 1,
1111 V::fixed_width(),
1112 <() as Value>::fixed_width(),
1113 value_bytes_ref.len(),
1114 );
1115 builder.append(value_bytes_ref, <() as Value>::as_bytes(&()).as_ref());
1116 drop(builder);
1117 let inline_data = DynamicCollection::<V>::make_inline_data(&data);
1118 self.tree
1119 .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1120 } else {
1121 let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
1122 None,
1123 self.transaction.transaction_guard(),
1124 self.mem.clone(),
1125 self.freed_pages.clone(),
1126 );
1127 subtree.insert(value.borrow(), &())?;
1128 let subtree_data =
1129 DynamicCollection::<V>::make_subtree_data(subtree.get_root().unwrap());
1130 self.tree
1131 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1132 }
1133 false
1134 };
1135
1136 if !existed {
1137 self.num_values += 1;
1138 }
1139
1140 Ok(existed)
1141 }
1142
1143 pub fn remove<'k, 'v>(
1147 &mut self,
1148 key: impl Borrow<K::SelfType<'k>>,
1149 value: impl Borrow<V::SelfType<'v>>,
1150 ) -> Result<bool> {
1151 let get_result = self.tree.get(key.borrow())?;
1152 if get_result.is_none() {
1153 return Ok(false);
1154 }
1155 let guard = get_result.unwrap();
1156 let v = guard.value();
1157 let existed = match v.collection_type() {
1158 Inline => {
1159 let leaf_data = v.as_inline();
1160 let accessor =
1161 LeafAccessor::new(leaf_data, V::fixed_width(), <() as Value>::fixed_width());
1162 if let Some(position) = accessor.find_key::<V>(V::as_bytes(value.borrow()).as_ref())
1163 {
1164 let old_num_pairs = accessor.num_pairs();
1165 if old_num_pairs == 1 {
1166 drop(guard);
1167 self.tree.remove(key.borrow())?;
1168 } else {
1169 let old_pairs_len = accessor.length_of_pairs(0, old_num_pairs);
1170 let removed_value_len = accessor.entry(position).unwrap().key().len();
1171 let required = RawLeafBuilder::required_bytes(
1172 old_num_pairs - 1,
1173 old_pairs_len - removed_value_len,
1174 V::fixed_width(),
1175 <() as Value>::fixed_width(),
1176 );
1177 let mut new_data = vec![0; required];
1178 let new_key_len =
1179 accessor.length_of_keys(0, old_num_pairs) - removed_value_len;
1180 let mut builder = RawLeafBuilder::new(
1181 &mut new_data,
1182 old_num_pairs - 1,
1183 V::fixed_width(),
1184 <() as Value>::fixed_width(),
1185 new_key_len,
1186 );
1187 for i in 0..old_num_pairs {
1188 if i != position {
1189 let entry = accessor.entry(i).unwrap();
1190 builder.append(entry.key(), entry.value());
1191 }
1192 }
1193 drop(builder);
1194 drop(guard);
1195
1196 let inline_data = DynamicCollection::<V>::make_inline_data(&new_data);
1197 self.tree
1198 .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1199 }
1200 true
1201 } else {
1202 drop(guard);
1203 false
1204 }
1205 }
1206 SubtreeV2 => {
1207 let mut subtree: BtreeMut<V, ()> = BtreeMut::new(
1208 Some(v.as_subtree()),
1209 self.transaction.transaction_guard(),
1210 self.mem.clone(),
1211 self.freed_pages.clone(),
1212 );
1213 drop(guard);
1214 let existed = subtree.remove(value.borrow())?.is_some();
1215
1216 if let Some(BtreeHeader {
1217 root: new_root,
1218 checksum: new_checksum,
1219 length: new_length,
1220 }) = subtree.get_root()
1221 {
1222 let page = self.mem.get_page(new_root)?;
1223 match page.memory()[0] {
1224 LEAF => {
1225 let accessor = LeafAccessor::new(
1226 page.memory(),
1227 V::fixed_width(),
1228 <() as Value>::fixed_width(),
1229 );
1230 let len = accessor.total_length();
1231 if len < self.mem.get_page_size() / 2 {
1232 let inline_data =
1233 DynamicCollection::<V>::make_inline_data(&page.memory()[..len]);
1234 self.tree
1235 .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1236 drop(page);
1237 if !self.mem.free_if_uncommitted(new_root) {
1238 (*self.freed_pages).lock().unwrap().push(new_root);
1239 }
1240 } else {
1241 let subtree_data =
1242 DynamicCollection::<V>::make_subtree_data(BtreeHeader::new(
1243 new_root,
1244 new_checksum,
1245 accessor.num_pairs() as u64,
1246 ));
1247 self.tree
1248 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1249 }
1250 }
1251 BRANCH => {
1252 let subtree_data = DynamicCollection::<V>::make_subtree_data(
1253 BtreeHeader::new(new_root, new_checksum, new_length),
1254 );
1255 self.tree
1256 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1257 }
1258 _ => unreachable!(),
1259 }
1260 } else {
1261 self.tree.remove(key.borrow())?;
1262 }
1263
1264 existed
1265 }
1266 };
1267
1268 if existed {
1269 self.num_values -= 1;
1270 }
1271
1272 Ok(existed)
1273 }
1274
1275 pub fn remove_all<'a>(
1279 &mut self,
1280 key: impl Borrow<K::SelfType<'a>>,
1281 ) -> Result<MultimapValue<V>> {
1282 let iter = if let Some(collection) = self.tree.remove(key.borrow())? {
1283 let mut pages = vec![];
1284 if matches!(
1285 collection.value().collection_type(),
1286 DynamicCollectionType::SubtreeV2
1287 ) {
1288 let root = collection.value().as_subtree().root;
1289 let all_pages = AllPageNumbersBtreeIter::new(
1290 root,
1291 V::fixed_width(),
1292 <() as Value>::fixed_width(),
1293 self.mem.clone(),
1294 )?;
1295 for page in all_pages {
1296 pages.push(page?);
1297 }
1298 }
1299
1300 self.num_values -= collection.value().get_num_values();
1301
1302 DynamicCollection::iter_free_on_drop(
1303 collection,
1304 pages,
1305 self.freed_pages.clone(),
1306 self.transaction.transaction_guard(),
1307 self.mem.clone(),
1308 )?
1309 } else {
1310 MultimapValue::new_subtree(
1311 BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), None, self.mem.clone())?,
1312 0,
1313 self.transaction.transaction_guard(),
1314 )
1315 };
1316
1317 Ok(iter)
1318 }
1319}
1320
1321impl<'txn, K: Key + 'static, V: Key + 'static> ReadableTableMetadata for MultimapTable<'txn, K, V> {
1322 fn stats(&self) -> Result<TableStats> {
1323 let tree_stats = multimap_btree_stats(
1324 self.tree.get_root().map(|x| x.root),
1325 &self.mem,
1326 K::fixed_width(),
1327 V::fixed_width(),
1328 )?;
1329
1330 Ok(TableStats {
1331 tree_height: tree_stats.tree_height,
1332 leaf_pages: tree_stats.leaf_pages,
1333 branch_pages: tree_stats.branch_pages,
1334 stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1335 metadata_bytes: tree_stats.metadata_bytes,
1336 fragmented_bytes: tree_stats.fragmented_bytes,
1337 })
1338 }
1339
1340 fn len(&self) -> Result<u64> {
1342 Ok(self.num_values)
1343 }
1344}
1345
1346impl<'txn, K: Key + 'static, V: Key + 'static> ReadableMultimapTable<K, V>
1347 for MultimapTable<'txn, K, V>
1348{
1349 fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>> {
1351 let guard = self.transaction.transaction_guard();
1352 let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1353 DynamicCollection::iter(collection, guard, self.mem.clone())?
1354 } else {
1355 MultimapValue::new_subtree(
1356 BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), None, self.mem.clone())?,
1357 0,
1358 guard,
1359 )
1360 };
1361
1362 Ok(iter)
1363 }
1364
1365 fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<K, V>>
1367 where
1368 KR: Borrow<K::SelfType<'a>> + 'a,
1369 {
1370 let inner = self.tree.range(&range)?;
1371 Ok(MultimapRange::new(
1372 inner,
1373 self.transaction.transaction_guard(),
1374 self.mem.clone(),
1375 ))
1376 }
1377}
1378
1379impl<K: Key + 'static, V: Key + 'static> Sealed for MultimapTable<'_, K, V> {}
1380
1381impl<'txn, K: Key + 'static, V: Key + 'static> Drop for MultimapTable<'txn, K, V> {
1382 fn drop(&mut self) {
1383 self.transaction
1384 .close_table(&self.name, &self.tree, self.num_values);
1385 }
1386}
1387
1388pub trait ReadableMultimapTable<K: Key + 'static, V: Key + 'static>: ReadableTableMetadata {
1389 fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>>;
1391
1392 fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<K, V>>
1393 where
1394 KR: Borrow<K::SelfType<'a>> + 'a;
1395
1396 fn iter(&self) -> Result<MultimapRange<K, V>> {
1399 self.range::<K::SelfType<'_>>(..)
1400 }
1401}
1402
1403pub struct ReadOnlyUntypedMultimapTable {
1405 num_values: u64,
1406 tree: RawBtree,
1407 fixed_key_size: Option<usize>,
1408 fixed_value_size: Option<usize>,
1409 mem: Arc<TransactionalMemory>,
1410}
1411
1412impl Sealed for ReadOnlyUntypedMultimapTable {}
1413
1414impl ReadableTableMetadata for ReadOnlyUntypedMultimapTable {
1415 fn stats(&self) -> Result<TableStats> {
1417 let tree_stats = multimap_btree_stats(
1418 self.tree.get_root().map(|x| x.root),
1419 &self.mem,
1420 self.fixed_key_size,
1421 self.fixed_value_size,
1422 )?;
1423
1424 Ok(TableStats {
1425 tree_height: tree_stats.tree_height,
1426 leaf_pages: tree_stats.leaf_pages,
1427 branch_pages: tree_stats.branch_pages,
1428 stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1429 metadata_bytes: tree_stats.metadata_bytes,
1430 fragmented_bytes: tree_stats.fragmented_bytes,
1431 })
1432 }
1433
1434 fn len(&self) -> Result<u64> {
1435 Ok(self.num_values)
1436 }
1437}
1438
1439impl ReadOnlyUntypedMultimapTable {
1440 pub(crate) fn new(
1441 root: Option<BtreeHeader>,
1442 num_values: u64,
1443 fixed_key_size: Option<usize>,
1444 fixed_value_size: Option<usize>,
1445 mem: Arc<TransactionalMemory>,
1446 ) -> Self {
1447 Self {
1448 num_values,
1449 tree: RawBtree::new(
1450 root,
1451 fixed_key_size,
1452 DynamicCollection::<()>::fixed_width_with(fixed_value_size),
1453 mem.clone(),
1454 ),
1455 fixed_key_size,
1456 fixed_value_size,
1457 mem,
1458 }
1459 }
1460}
1461
1462pub struct ReadOnlyMultimapTable<K: Key + 'static, V: Key + 'static> {
1464 tree: Btree<K, &'static DynamicCollection<V>>,
1465 num_values: u64,
1466 mem: Arc<TransactionalMemory>,
1467 transaction_guard: Arc<TransactionGuard>,
1468 _value_type: PhantomData<V>,
1469}
1470
1471impl<K: Key + 'static, V: Key + 'static> ReadOnlyMultimapTable<K, V> {
1472 pub(crate) fn new(
1473 root: Option<BtreeHeader>,
1474 num_values: u64,
1475 hint: PageHint,
1476 guard: Arc<TransactionGuard>,
1477 mem: Arc<TransactionalMemory>,
1478 ) -> Result<ReadOnlyMultimapTable<K, V>> {
1479 Ok(ReadOnlyMultimapTable {
1480 tree: Btree::new(root, hint, guard.clone(), mem.clone())?,
1481 num_values,
1482 mem,
1483 transaction_guard: guard,
1484 _value_type: Default::default(),
1485 })
1486 }
1487
1488 pub fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<'static, V>> {
1491 let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1492 DynamicCollection::iter(collection, self.transaction_guard.clone(), self.mem.clone())?
1493 } else {
1494 MultimapValue::new_subtree(
1495 BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), None, self.mem.clone())?,
1496 0,
1497 self.transaction_guard.clone(),
1498 )
1499 };
1500
1501 Ok(iter)
1502 }
1503
1504 pub fn range<'a, KR>(&self, range: impl RangeBounds<KR>) -> Result<MultimapRange<'static, K, V>>
1507 where
1508 KR: Borrow<K::SelfType<'a>>,
1509 {
1510 let inner = self.tree.range(&range)?;
1511 Ok(MultimapRange::new(
1512 inner,
1513 self.transaction_guard.clone(),
1514 self.mem.clone(),
1515 ))
1516 }
1517}
1518
1519impl<K: Key + 'static, V: Key + 'static> ReadableTableMetadata for ReadOnlyMultimapTable<K, V> {
1520 fn stats(&self) -> Result<TableStats> {
1521 let tree_stats = multimap_btree_stats(
1522 self.tree.get_root().map(|x| x.root),
1523 &self.mem,
1524 K::fixed_width(),
1525 V::fixed_width(),
1526 )?;
1527
1528 Ok(TableStats {
1529 tree_height: tree_stats.tree_height,
1530 leaf_pages: tree_stats.leaf_pages,
1531 branch_pages: tree_stats.branch_pages,
1532 stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1533 metadata_bytes: tree_stats.metadata_bytes,
1534 fragmented_bytes: tree_stats.fragmented_bytes,
1535 })
1536 }
1537
1538 fn len(&self) -> Result<u64> {
1539 Ok(self.num_values)
1540 }
1541}
1542
1543impl<K: Key + 'static, V: Key + 'static> ReadableMultimapTable<K, V>
1544 for ReadOnlyMultimapTable<K, V>
1545{
1546 fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>> {
1548 let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1549 DynamicCollection::iter(collection, self.transaction_guard.clone(), self.mem.clone())?
1550 } else {
1551 MultimapValue::new_subtree(
1552 BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), None, self.mem.clone())?,
1553 0,
1554 self.transaction_guard.clone(),
1555 )
1556 };
1557
1558 Ok(iter)
1559 }
1560
1561 fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<K, V>>
1562 where
1563 KR: Borrow<K::SelfType<'a>> + 'a,
1564 {
1565 let inner = self.tree.range(&range)?;
1566 Ok(MultimapRange::new(
1567 inner,
1568 self.transaction_guard.clone(),
1569 self.mem.clone(),
1570 ))
1571 }
1572}
1573
1574impl<K: Key, V: Key> Sealed for ReadOnlyMultimapTable<K, V> {}