redb/
multimap_table.rs

1use crate::db::TransactionGuard;
2use crate::multimap_table::DynamicCollectionType::{Inline, SubtreeV2};
3use crate::sealed::Sealed;
4use crate::table::{ReadableTableMetadata, TableStats};
5use crate::tree_store::{
6    btree_stats, AllPageNumbersBtreeIter, BranchAccessor, BranchMutator, Btree, BtreeHeader,
7    BtreeMut, BtreeRangeIter, BtreeStats, Checksum, LeafAccessor, LeafMutator, Page, PageHint,
8    PageNumber, PagePath, RawBtree, RawLeafBuilder, TransactionalMemory, UntypedBtree,
9    UntypedBtreeMut, BRANCH, DEFERRED, LEAF, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH,
10};
11use crate::types::{Key, TypeName, Value};
12use crate::{AccessGuard, MultimapTableHandle, Result, StorageError, WriteTransaction};
13use std::borrow::Borrow;
14use std::cmp::max;
15use std::collections::HashMap;
16use std::convert::TryInto;
17use std::marker::PhantomData;
18use std::mem;
19use std::mem::size_of;
20use std::ops::{RangeBounds, RangeFull};
21use std::sync::{Arc, Mutex};
22
23pub(crate) fn multimap_btree_stats(
24    root: Option<PageNumber>,
25    mem: &TransactionalMemory,
26    fixed_key_size: Option<usize>,
27    fixed_value_size: Option<usize>,
28) -> Result<BtreeStats> {
29    if let Some(root) = root {
30        multimap_stats_helper(root, mem, fixed_key_size, fixed_value_size)
31    } else {
32        Ok(BtreeStats {
33            tree_height: 0,
34            leaf_pages: 0,
35            branch_pages: 0,
36            stored_leaf_bytes: 0,
37            metadata_bytes: 0,
38            fragmented_bytes: 0,
39        })
40    }
41}
42
43fn multimap_stats_helper(
44    page_number: PageNumber,
45    mem: &TransactionalMemory,
46    fixed_key_size: Option<usize>,
47    fixed_value_size: Option<usize>,
48) -> Result<BtreeStats> {
49    let page = mem.get_page(page_number)?;
50    let node_mem = page.memory();
51    match node_mem[0] {
52        LEAF => {
53            let accessor = LeafAccessor::new(
54                page.memory(),
55                fixed_key_size,
56                DynamicCollection::<()>::fixed_width_with(fixed_value_size),
57            );
58            let mut leaf_bytes = 0u64;
59            let mut is_branch = false;
60            for i in 0..accessor.num_pairs() {
61                let entry = accessor.entry(i).unwrap();
62                let collection: &UntypedDynamicCollection =
63                    UntypedDynamicCollection::new(entry.value());
64                match collection.collection_type() {
65                    Inline => {
66                        let inline_accessor = LeafAccessor::new(
67                            collection.as_inline(),
68                            fixed_value_size,
69                            <() as Value>::fixed_width(),
70                        );
71                        leaf_bytes +=
72                            inline_accessor.length_of_pairs(0, inline_accessor.num_pairs()) as u64;
73                    }
74                    SubtreeV2 => {
75                        is_branch = true;
76                    }
77                }
78            }
79            let mut overhead_bytes = (accessor.total_length() as u64) - leaf_bytes;
80            let mut fragmented_bytes = (page.memory().len() - accessor.total_length()) as u64;
81            let mut max_child_height = 0;
82            let (mut leaf_pages, mut branch_pages) = if is_branch { (0, 1) } else { (1, 0) };
83
84            for i in 0..accessor.num_pairs() {
85                let entry = accessor.entry(i).unwrap();
86                let collection: &UntypedDynamicCollection =
87                    UntypedDynamicCollection::new(entry.value());
88                match collection.collection_type() {
89                    Inline => {
90                        // data is inline, so it was already counted above
91                    }
92                    SubtreeV2 => {
93                        // this is a sub-tree, so traverse it
94                        let stats = btree_stats(
95                            Some(collection.as_subtree().root),
96                            mem,
97                            fixed_value_size,
98                            <() as Value>::fixed_width(),
99                        )?;
100                        max_child_height = max(max_child_height, stats.tree_height);
101                        branch_pages += stats.branch_pages;
102                        leaf_pages += stats.leaf_pages;
103                        fragmented_bytes += stats.fragmented_bytes;
104                        overhead_bytes += stats.metadata_bytes;
105                        leaf_bytes += stats.stored_leaf_bytes;
106                    }
107                }
108            }
109
110            Ok(BtreeStats {
111                tree_height: max_child_height + 1,
112                leaf_pages,
113                branch_pages,
114                stored_leaf_bytes: leaf_bytes,
115                metadata_bytes: overhead_bytes,
116                fragmented_bytes,
117            })
118        }
119        BRANCH => {
120            let accessor = BranchAccessor::new(&page, fixed_key_size);
121            let mut max_child_height = 0;
122            let mut leaf_pages = 0;
123            let mut branch_pages = 1;
124            let mut stored_leaf_bytes = 0;
125            let mut metadata_bytes = accessor.total_length() as u64;
126            let mut fragmented_bytes = (page.memory().len() - accessor.total_length()) as u64;
127            for i in 0..accessor.count_children() {
128                if let Some(child) = accessor.child_page(i) {
129                    let stats =
130                        multimap_stats_helper(child, mem, fixed_key_size, fixed_value_size)?;
131                    max_child_height = max(max_child_height, stats.tree_height);
132                    leaf_pages += stats.leaf_pages;
133                    branch_pages += stats.branch_pages;
134                    stored_leaf_bytes += stats.stored_leaf_bytes;
135                    metadata_bytes += stats.metadata_bytes;
136                    fragmented_bytes += stats.fragmented_bytes;
137                }
138            }
139
140            Ok(BtreeStats {
141                tree_height: max_child_height + 1,
142                leaf_pages,
143                branch_pages,
144                stored_leaf_bytes,
145                metadata_bytes,
146                fragmented_bytes,
147            })
148        }
149        _ => unreachable!(),
150    }
151}
152
153// Verify all the checksums in the tree, including any Dynamic collection subtrees
154pub(crate) fn verify_tree_and_subtree_checksums(
155    root: Option<BtreeHeader>,
156    key_size: Option<usize>,
157    value_size: Option<usize>,
158    mem: Arc<TransactionalMemory>,
159) -> Result<bool> {
160    if let Some(header) = root {
161        if !RawBtree::new(
162            Some(header),
163            key_size,
164            DynamicCollection::<()>::fixed_width_with(value_size),
165            mem.clone(),
166        )
167        .verify_checksum()?
168        {
169            return Ok(false);
170        }
171
172        let table_pages_iter = AllPageNumbersBtreeIter::new(
173            header.root,
174            key_size,
175            DynamicCollection::<()>::fixed_width_with(value_size),
176            mem.clone(),
177        )?;
178        for table_page in table_pages_iter {
179            let page = mem.get_page(table_page?)?;
180            let subtree_roots = parse_subtree_roots(&page, key_size, value_size);
181            for header in subtree_roots {
182                if !RawBtree::new(Some(header), value_size, <()>::fixed_width(), mem.clone())
183                    .verify_checksum()?
184                {
185                    return Ok(false);
186                }
187            }
188        }
189    }
190
191    Ok(true)
192}
193
194// Relocate all subtrees to lower index pages, if possible
195pub(crate) fn relocate_subtrees(
196    root: (PageNumber, Checksum),
197    key_size: Option<usize>,
198    value_size: Option<usize>,
199    mem: Arc<TransactionalMemory>,
200    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
201    relocation_map: &HashMap<PageNumber, PageNumber>,
202) -> Result<(PageNumber, Checksum)> {
203    let old_page = mem.get_page(root.0)?;
204    let mut new_page = if let Some(new_page_number) = relocation_map.get(&root.0) {
205        mem.get_page_mut(*new_page_number)?
206    } else {
207        return Ok(root);
208    };
209    let new_page_number = new_page.get_page_number();
210    new_page.memory_mut().copy_from_slice(old_page.memory());
211
212    match old_page.memory()[0] {
213        LEAF => {
214            let accessor = LeafAccessor::new(
215                old_page.memory(),
216                key_size,
217                UntypedDynamicCollection::fixed_width_with(value_size),
218            );
219            // TODO: maybe there's a better abstraction, so that we don't need to call into this low-level method?
220            let mut mutator = LeafMutator::new(
221                &mut new_page,
222                key_size,
223                UntypedDynamicCollection::fixed_width_with(value_size),
224            );
225            for i in 0..accessor.num_pairs() {
226                let entry = accessor.entry(i).unwrap();
227                let collection = UntypedDynamicCollection::from_bytes(entry.value());
228                if matches!(collection.collection_type(), SubtreeV2) {
229                    let sub_root = collection.as_subtree();
230                    let mut tree = UntypedBtreeMut::new(
231                        Some(sub_root),
232                        mem.clone(),
233                        freed_pages.clone(),
234                        value_size,
235                        <() as Value>::fixed_width(),
236                    );
237                    tree.relocate(relocation_map)?;
238                    if sub_root != tree.get_root().unwrap() {
239                        let new_collection =
240                            UntypedDynamicCollection::make_subtree_data(tree.get_root().unwrap());
241                        mutator.insert(i, true, entry.key(), &new_collection);
242                    }
243                }
244            }
245        }
246        BRANCH => {
247            let accessor = BranchAccessor::new(&old_page, key_size);
248            let mut mutator = BranchMutator::new(&mut new_page);
249            for i in 0..accessor.count_children() {
250                if let Some(child) = accessor.child_page(i) {
251                    let child_checksum = accessor.child_checksum(i).unwrap();
252                    let (new_child, new_checksum) = relocate_subtrees(
253                        (child, child_checksum),
254                        key_size,
255                        value_size,
256                        mem.clone(),
257                        freed_pages.clone(),
258                        relocation_map,
259                    )?;
260                    mutator.write_child_page(i, new_child, new_checksum);
261                }
262            }
263        }
264        _ => unreachable!(),
265    }
266
267    let old_page_number = old_page.get_page_number();
268    drop(old_page);
269    if !mem.free_if_uncommitted(old_page_number) {
270        freed_pages.lock().unwrap().push(old_page_number);
271    }
272    Ok((new_page_number, DEFERRED))
273}
274
275// Finalize all the checksums in the tree, including any Dynamic collection subtrees
276// Returns the root checksum
277pub(crate) fn finalize_tree_and_subtree_checksums(
278    root: Option<BtreeHeader>,
279    key_size: Option<usize>,
280    value_size: Option<usize>,
281    mem: Arc<TransactionalMemory>,
282) -> Result<Option<BtreeHeader>> {
283    let freed_pages = Arc::new(Mutex::new(vec![]));
284    let mut tree = UntypedBtreeMut::new(
285        root,
286        mem.clone(),
287        freed_pages.clone(),
288        key_size,
289        DynamicCollection::<()>::fixed_width_with(value_size),
290    );
291    tree.dirty_leaf_visitor(|mut leaf_page| {
292        let mut sub_root_updates = vec![];
293        let accessor = LeafAccessor::new(
294            leaf_page.memory(),
295            key_size,
296            DynamicCollection::<()>::fixed_width_with(value_size),
297        );
298        for i in 0..accessor.num_pairs() {
299            let entry = accessor.entry(i).unwrap();
300            let collection = <&DynamicCollection<()>>::from_bytes(entry.value());
301            if matches!(collection.collection_type(), SubtreeV2) {
302                let sub_root = collection.as_subtree();
303                if mem.uncommitted(sub_root.root) {
304                    let mut subtree = UntypedBtreeMut::new(
305                        Some(sub_root),
306                        mem.clone(),
307                        freed_pages.clone(),
308                        value_size,
309                        <()>::fixed_width(),
310                    );
311                    let subtree_root = subtree.finalize_dirty_checksums()?.unwrap();
312                    sub_root_updates.push((i, entry.key().to_vec(), subtree_root));
313                }
314            }
315        }
316        // TODO: maybe there's a better abstraction, so that we don't need to call into this low-level method?
317        let mut mutator = LeafMutator::new(
318            &mut leaf_page,
319            key_size,
320            DynamicCollection::<()>::fixed_width_with(value_size),
321        );
322        for (i, key, sub_root) in sub_root_updates {
323            let collection = DynamicCollection::<()>::make_subtree_data(sub_root);
324            mutator.insert(i, true, &key, &collection);
325        }
326
327        Ok(())
328    })?;
329
330    let root = tree.finalize_dirty_checksums()?;
331    // No pages should have been freed by this operation
332    assert!(freed_pages.lock().unwrap().is_empty());
333    Ok(root)
334}
335
336fn parse_subtree_roots<T: Page>(
337    page: &T,
338    fixed_key_size: Option<usize>,
339    fixed_value_size: Option<usize>,
340) -> Vec<BtreeHeader> {
341    match page.memory()[0] {
342        BRANCH => {
343            vec![]
344        }
345        LEAF => {
346            let mut result = vec![];
347            let accessor = LeafAccessor::new(
348                page.memory(),
349                fixed_key_size,
350                DynamicCollection::<()>::fixed_width_with(fixed_value_size),
351            );
352            for i in 0..accessor.num_pairs() {
353                let entry = accessor.entry(i).unwrap();
354                let collection = <&DynamicCollection<()>>::from_bytes(entry.value());
355                if matches!(collection.collection_type(), SubtreeV2) {
356                    result.push(collection.as_subtree());
357                }
358            }
359
360            result
361        }
362        _ => unreachable!(),
363    }
364}
365
366pub(crate) struct UntypedMultiBtree {
367    mem: Arc<TransactionalMemory>,
368    root: Option<BtreeHeader>,
369    key_width: Option<usize>,
370    value_width: Option<usize>,
371}
372
373impl UntypedMultiBtree {
374    pub(crate) fn new(
375        root: Option<BtreeHeader>,
376        mem: Arc<TransactionalMemory>,
377        key_width: Option<usize>,
378        value_width: Option<usize>,
379    ) -> Self {
380        Self {
381            mem,
382            root,
383            key_width,
384            value_width,
385        }
386    }
387
388    // Applies visitor to pages in the tree
389    pub(crate) fn visit_all_pages<F>(&self, mut visitor: F) -> Result
390    where
391        F: FnMut(&PagePath) -> Result,
392    {
393        let tree = UntypedBtree::new(
394            self.root,
395            self.mem.clone(),
396            self.key_width,
397            UntypedDynamicCollection::fixed_width_with(self.value_width),
398        );
399        tree.visit_all_pages(|path| {
400            visitor(path)?;
401            let page = self.mem.get_page(path.page_number())?;
402            match page.memory()[0] {
403                LEAF => {
404                    for header in parse_subtree_roots(&page, self.key_width, self.value_width) {
405                        let subtree = UntypedBtree::new(
406                            Some(header),
407                            self.mem.clone(),
408                            self.value_width,
409                            <() as Value>::fixed_width(),
410                        );
411                        subtree.visit_all_pages(|subpath| {
412                            let full_path = path.with_subpath(subpath);
413                            visitor(&full_path)
414                        })?;
415                    }
416                }
417                BRANCH => {
418                    // No-op. The tree.visit_pages() call will process this sub-tree
419                }
420                _ => unreachable!(),
421            }
422            Ok(())
423        })?;
424
425        Ok(())
426    }
427}
428
429pub(crate) struct LeafKeyIter<'a, V: Key + 'static> {
430    inline_collection: AccessGuard<'a, &'static DynamicCollection<V>>,
431    fixed_key_size: Option<usize>,
432    fixed_value_size: Option<usize>,
433    start_entry: isize, // inclusive
434    end_entry: isize,   // inclusive
435}
436
437impl<'a, V: Key> LeafKeyIter<'a, V> {
438    fn new(
439        data: AccessGuard<'a, &'static DynamicCollection<V>>,
440        fixed_key_size: Option<usize>,
441        fixed_value_size: Option<usize>,
442    ) -> Self {
443        let accessor =
444            LeafAccessor::new(data.value().as_inline(), fixed_key_size, fixed_value_size);
445        let end_entry = isize::try_from(accessor.num_pairs()).unwrap() - 1;
446        Self {
447            inline_collection: data,
448            fixed_key_size,
449            fixed_value_size,
450            start_entry: 0,
451            end_entry,
452        }
453    }
454
455    fn next_key(&mut self) -> Option<&[u8]> {
456        if self.end_entry < self.start_entry {
457            return None;
458        }
459        let accessor = LeafAccessor::new(
460            self.inline_collection.value().as_inline(),
461            self.fixed_key_size,
462            self.fixed_value_size,
463        );
464        self.start_entry += 1;
465        accessor
466            .entry((self.start_entry - 1).try_into().unwrap())
467            .map(|e| e.key())
468    }
469
470    fn next_key_back(&mut self) -> Option<&[u8]> {
471        if self.end_entry < self.start_entry {
472            return None;
473        }
474        let accessor = LeafAccessor::new(
475            self.inline_collection.value().as_inline(),
476            self.fixed_key_size,
477            self.fixed_value_size,
478        );
479        self.end_entry -= 1;
480        accessor
481            .entry((self.end_entry + 1).try_into().unwrap())
482            .map(|e| e.key())
483    }
484}
485
486enum DynamicCollectionType {
487    Inline,
488    // Was used in file format version 1
489    // Subtree,
490    SubtreeV2,
491}
492
493impl From<u8> for DynamicCollectionType {
494    fn from(value: u8) -> Self {
495        match value {
496            LEAF => Inline,
497            // 2 => Subtree,
498            3 => SubtreeV2,
499            _ => unreachable!(),
500        }
501    }
502}
503
504#[allow(clippy::from_over_into)]
505impl Into<u8> for DynamicCollectionType {
506    fn into(self) -> u8 {
507        match self {
508            // Reuse the LEAF type id, so that we can cast this directly into the format used by
509            // LeafAccessor
510            Inline => LEAF,
511            // Subtree => 2,
512            SubtreeV2 => 3,
513        }
514    }
515}
516
517/// Layout:
518/// type (1 byte):
519/// * 1 = inline data
520/// * 2 = sub tree
521///
522/// (when type = 1) data (n bytes): inlined leaf node
523///
524/// (when type = 2) root (8 bytes): sub tree root page number
525/// (when type = 2) checksum (16 bytes): sub tree checksum
526///
527/// NOTE: Even though the [`PhantomData`] is zero-sized, the inner data DST must be placed last.
528/// See [Exotically Sized Types](https://doc.rust-lang.org/nomicon/exotic-sizes.html#dynamically-sized-types-dsts)
529/// section of the Rustonomicon for more details.
530#[repr(transparent)]
531struct DynamicCollection<V: Key> {
532    _value_type: PhantomData<V>,
533    data: [u8],
534}
535
536impl<V: Key> std::fmt::Debug for DynamicCollection<V> {
537    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
538        f.debug_struct("DynamicCollection")
539            .field("data", &&self.data)
540            .finish()
541    }
542}
543
544impl<V: Key> Value for &DynamicCollection<V> {
545    type SelfType<'a> = &'a DynamicCollection<V>
546    where
547        Self: 'a;
548    type AsBytes<'a> = &'a [u8]
549    where
550        Self: 'a;
551
552    fn fixed_width() -> Option<usize> {
553        None
554    }
555
556    fn from_bytes<'a>(data: &'a [u8]) -> &'a DynamicCollection<V>
557    where
558        Self: 'a,
559    {
560        DynamicCollection::new(data)
561    }
562
563    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'a [u8]
564    where
565        Self: 'b,
566    {
567        &value.data
568    }
569
570    fn type_name() -> TypeName {
571        TypeName::internal("redb::DynamicCollection")
572    }
573}
574
575impl<V: Key> DynamicCollection<V> {
576    fn new(data: &[u8]) -> &Self {
577        unsafe { &*(std::ptr::from_ref::<[u8]>(data) as *const DynamicCollection<V>) }
578    }
579
580    fn collection_type(&self) -> DynamicCollectionType {
581        DynamicCollectionType::from(self.data[0])
582    }
583
584    fn as_inline(&self) -> &[u8] {
585        debug_assert!(matches!(self.collection_type(), Inline));
586        &self.data[1..]
587    }
588
589    fn as_subtree(&self) -> BtreeHeader {
590        assert!(matches!(self.collection_type(), SubtreeV2));
591        BtreeHeader::from_le_bytes(
592            self.data[1..=BtreeHeader::serialized_size()]
593                .try_into()
594                .unwrap(),
595        )
596    }
597
598    fn get_num_values(&self) -> u64 {
599        match self.collection_type() {
600            Inline => {
601                let leaf_data = self.as_inline();
602                let accessor =
603                    LeafAccessor::new(leaf_data, V::fixed_width(), <() as Value>::fixed_width());
604                accessor.num_pairs() as u64
605            }
606            SubtreeV2 => {
607                let offset = 1 + PageNumber::serialized_size() + size_of::<Checksum>();
608                u64::from_le_bytes(
609                    self.data[offset..(offset + size_of::<u64>())]
610                        .try_into()
611                        .unwrap(),
612                )
613            }
614        }
615    }
616
617    fn make_inline_data(data: &[u8]) -> Vec<u8> {
618        let mut result = vec![Inline.into()];
619        result.extend_from_slice(data);
620
621        result
622    }
623
624    fn make_subtree_data(header: BtreeHeader) -> Vec<u8> {
625        let mut result = vec![SubtreeV2.into()];
626        result.extend_from_slice(&header.to_le_bytes());
627        result
628    }
629
630    pub(crate) fn fixed_width_with(_value_width: Option<usize>) -> Option<usize> {
631        None
632    }
633}
634
635impl<V: Key> DynamicCollection<V> {
636    fn iter<'a>(
637        collection: AccessGuard<'a, &'static DynamicCollection<V>>,
638        guard: Arc<TransactionGuard>,
639        mem: Arc<TransactionalMemory>,
640    ) -> Result<MultimapValue<'a, V>> {
641        Ok(match collection.value().collection_type() {
642            Inline => {
643                let leaf_iter =
644                    LeafKeyIter::new(collection, V::fixed_width(), <() as Value>::fixed_width());
645                MultimapValue::new_inline(leaf_iter, guard)
646            }
647            SubtreeV2 => {
648                let root = collection.value().as_subtree().root;
649                MultimapValue::new_subtree(
650                    BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), Some(root), mem)?,
651                    collection.value().get_num_values(),
652                    guard,
653                )
654            }
655        })
656    }
657
658    fn iter_free_on_drop<'a>(
659        collection: AccessGuard<'a, &'static DynamicCollection<V>>,
660        pages: Vec<PageNumber>,
661        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
662        guard: Arc<TransactionGuard>,
663        mem: Arc<TransactionalMemory>,
664    ) -> Result<MultimapValue<'a, V>> {
665        let num_values = collection.value().get_num_values();
666        Ok(match collection.value().collection_type() {
667            Inline => {
668                let leaf_iter =
669                    LeafKeyIter::new(collection, V::fixed_width(), <() as Value>::fixed_width());
670                MultimapValue::new_inline(leaf_iter, guard)
671            }
672            SubtreeV2 => {
673                let root = collection.value().as_subtree().root;
674                let inner = BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
675                    &(..),
676                    Some(root),
677                    mem.clone(),
678                )?;
679                MultimapValue::new_subtree_free_on_drop(
680                    inner,
681                    num_values,
682                    freed_pages,
683                    pages,
684                    guard,
685                    mem,
686                )
687            }
688        })
689    }
690}
691
692#[repr(transparent)]
693struct UntypedDynamicCollection {
694    data: [u8],
695}
696
697impl UntypedDynamicCollection {
698    pub(crate) fn fixed_width_with(_value_width: Option<usize>) -> Option<usize> {
699        None
700    }
701
702    fn new(data: &[u8]) -> &Self {
703        unsafe { &*(std::ptr::from_ref::<[u8]>(data) as *const UntypedDynamicCollection) }
704    }
705
706    fn make_subtree_data(header: BtreeHeader) -> Vec<u8> {
707        let mut result = vec![SubtreeV2.into()];
708        result.extend_from_slice(&header.to_le_bytes());
709        result
710    }
711
712    fn from_bytes(data: &[u8]) -> &Self {
713        Self::new(data)
714    }
715
716    fn collection_type(&self) -> DynamicCollectionType {
717        DynamicCollectionType::from(self.data[0])
718    }
719
720    fn as_inline(&self) -> &[u8] {
721        debug_assert!(matches!(self.collection_type(), Inline));
722        &self.data[1..]
723    }
724
725    fn as_subtree(&self) -> BtreeHeader {
726        assert!(matches!(self.collection_type(), SubtreeV2));
727        BtreeHeader::from_le_bytes(
728            self.data[1..=BtreeHeader::serialized_size()]
729                .try_into()
730                .unwrap(),
731        )
732    }
733}
734
735enum ValueIterState<'a, V: Key + 'static> {
736    Subtree(BtreeRangeIter<V, ()>),
737    InlineLeaf(LeafKeyIter<'a, V>),
738}
739
740pub struct MultimapValue<'a, V: Key + 'static> {
741    inner: Option<ValueIterState<'a, V>>,
742    remaining: u64,
743    freed_pages: Option<Arc<Mutex<Vec<PageNumber>>>>,
744    free_on_drop: Vec<PageNumber>,
745    _transaction_guard: Arc<TransactionGuard>,
746    mem: Option<Arc<TransactionalMemory>>,
747    _value_type: PhantomData<V>,
748}
749
750impl<'a, V: Key + 'static> MultimapValue<'a, V> {
751    fn new_subtree(
752        inner: BtreeRangeIter<V, ()>,
753        num_values: u64,
754        guard: Arc<TransactionGuard>,
755    ) -> Self {
756        Self {
757            inner: Some(ValueIterState::Subtree(inner)),
758            remaining: num_values,
759            freed_pages: None,
760            free_on_drop: vec![],
761            _transaction_guard: guard,
762            mem: None,
763            _value_type: Default::default(),
764        }
765    }
766
767    fn new_subtree_free_on_drop(
768        inner: BtreeRangeIter<V, ()>,
769        num_values: u64,
770        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
771        pages: Vec<PageNumber>,
772        guard: Arc<TransactionGuard>,
773        mem: Arc<TransactionalMemory>,
774    ) -> Self {
775        Self {
776            inner: Some(ValueIterState::Subtree(inner)),
777            remaining: num_values,
778            freed_pages: Some(freed_pages),
779            free_on_drop: pages,
780            _transaction_guard: guard,
781            mem: Some(mem),
782            _value_type: Default::default(),
783        }
784    }
785
786    fn new_inline(inner: LeafKeyIter<'a, V>, guard: Arc<TransactionGuard>) -> Self {
787        let remaining = inner.inline_collection.value().get_num_values();
788        Self {
789            inner: Some(ValueIterState::InlineLeaf(inner)),
790            remaining,
791            freed_pages: None,
792            free_on_drop: vec![],
793            _transaction_guard: guard,
794            mem: None,
795            _value_type: Default::default(),
796        }
797    }
798
799    /// Returns the number of times this iterator will return `Some(Ok(_))`
800    ///
801    /// Note that `Some` may be returned from `next()` more than `len()` times if `Some(Err(_))` is returned
802    pub fn len(&self) -> u64 {
803        self.remaining
804    }
805
806    pub fn is_empty(&self) -> bool {
807        self.len() == 0
808    }
809}
810
811impl<'a, V: Key + 'static> Iterator for MultimapValue<'a, V> {
812    type Item = Result<AccessGuard<'a, V>>;
813
814    fn next(&mut self) -> Option<Self::Item> {
815        // TODO: optimize out this copy
816        let bytes = match self.inner.as_mut().unwrap() {
817            ValueIterState::Subtree(ref mut iter) => match iter.next()? {
818                Ok(e) => e.key_data(),
819                Err(err) => {
820                    return Some(Err(err));
821                }
822            },
823            ValueIterState::InlineLeaf(ref mut iter) => iter.next_key()?.to_vec(),
824        };
825        self.remaining -= 1;
826        Some(Ok(AccessGuard::with_owned_value(bytes)))
827    }
828}
829
830impl<'a, V: Key + 'static> DoubleEndedIterator for MultimapValue<'a, V> {
831    fn next_back(&mut self) -> Option<Self::Item> {
832        // TODO: optimize out this copy
833        let bytes = match self.inner.as_mut().unwrap() {
834            ValueIterState::Subtree(ref mut iter) => match iter.next_back()? {
835                Ok(e) => e.key_data(),
836                Err(err) => {
837                    return Some(Err(err));
838                }
839            },
840            ValueIterState::InlineLeaf(ref mut iter) => iter.next_key_back()?.to_vec(),
841        };
842        Some(Ok(AccessGuard::with_owned_value(bytes)))
843    }
844}
845
846impl<'a, V: Key + 'static> Drop for MultimapValue<'a, V> {
847    fn drop(&mut self) {
848        // Drop our references to the pages that are about to be freed
849        drop(mem::take(&mut self.inner));
850        if !self.free_on_drop.is_empty() {
851            let mut freed_pages = self.freed_pages.as_ref().unwrap().lock().unwrap();
852            for page in &self.free_on_drop {
853                if !self.mem.as_ref().unwrap().free_if_uncommitted(*page) {
854                    freed_pages.push(*page);
855                }
856            }
857        }
858    }
859}
860
861pub struct MultimapRange<'a, K: Key + 'static, V: Key + 'static> {
862    inner: BtreeRangeIter<K, &'static DynamicCollection<V>>,
863    mem: Arc<TransactionalMemory>,
864    transaction_guard: Arc<TransactionGuard>,
865    _key_type: PhantomData<K>,
866    _value_type: PhantomData<V>,
867    _lifetime: PhantomData<&'a ()>,
868}
869
870impl<'a, K: Key + 'static, V: Key + 'static> MultimapRange<'a, K, V> {
871    fn new(
872        inner: BtreeRangeIter<K, &'static DynamicCollection<V>>,
873        guard: Arc<TransactionGuard>,
874        mem: Arc<TransactionalMemory>,
875    ) -> Self {
876        Self {
877            inner,
878            mem,
879            transaction_guard: guard,
880            _key_type: Default::default(),
881            _value_type: Default::default(),
882            _lifetime: Default::default(),
883        }
884    }
885}
886
887impl<'a, K: Key + 'static, V: Key + 'static> Iterator for MultimapRange<'a, K, V> {
888    type Item = Result<(AccessGuard<'a, K>, MultimapValue<'a, V>)>;
889
890    fn next(&mut self) -> Option<Self::Item> {
891        match self.inner.next()? {
892            Ok(entry) => {
893                let key = AccessGuard::with_owned_value(entry.key_data());
894                let (page, _, value_range) = entry.into_raw();
895                let collection = AccessGuard::with_page(page, value_range);
896                Some(
897                    DynamicCollection::iter(
898                        collection,
899                        self.transaction_guard.clone(),
900                        self.mem.clone(),
901                    )
902                    .map(|iter| (key, iter)),
903                )
904            }
905            Err(err) => Some(Err(err)),
906        }
907    }
908}
909
910impl<'a, K: Key + 'static, V: Key + 'static> DoubleEndedIterator for MultimapRange<'a, K, V> {
911    fn next_back(&mut self) -> Option<Self::Item> {
912        match self.inner.next_back()? {
913            Ok(entry) => {
914                let key = AccessGuard::with_owned_value(entry.key_data());
915                let (page, _, value_range) = entry.into_raw();
916                let collection = AccessGuard::with_page(page, value_range);
917                Some(
918                    DynamicCollection::iter(
919                        collection,
920                        self.transaction_guard.clone(),
921                        self.mem.clone(),
922                    )
923                    .map(|iter| (key, iter)),
924                )
925            }
926            Err(err) => Some(Err(err)),
927        }
928    }
929}
930
931/// A multimap table
932///
933/// [Multimap tables](https://en.wikipedia.org/wiki/Multimap) may have multiple values associated with each key
934pub struct MultimapTable<'txn, K: Key + 'static, V: Key + 'static> {
935    name: String,
936    num_values: u64,
937    transaction: &'txn WriteTransaction,
938    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
939    tree: BtreeMut<'txn, K, &'static DynamicCollection<V>>,
940    mem: Arc<TransactionalMemory>,
941    _value_type: PhantomData<V>,
942}
943
944impl<K: Key + 'static, V: Key + 'static> MultimapTableHandle for MultimapTable<'_, K, V> {
945    fn name(&self) -> &str {
946        &self.name
947    }
948}
949
950impl<'txn, K: Key + 'static, V: Key + 'static> MultimapTable<'txn, K, V> {
951    pub(crate) fn new(
952        name: &str,
953        table_root: Option<BtreeHeader>,
954        num_values: u64,
955        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
956        mem: Arc<TransactionalMemory>,
957        transaction: &'txn WriteTransaction,
958    ) -> MultimapTable<'txn, K, V> {
959        MultimapTable {
960            name: name.to_string(),
961            num_values,
962            transaction,
963            freed_pages: freed_pages.clone(),
964            tree: BtreeMut::new(
965                table_root,
966                transaction.transaction_guard(),
967                mem.clone(),
968                freed_pages,
969            ),
970            mem,
971            _value_type: Default::default(),
972        }
973    }
974
975    #[allow(dead_code)]
976    pub(crate) fn print_debug(&self, include_values: bool) -> Result {
977        self.tree.print_debug(include_values)
978    }
979
980    /// Add the given value to the mapping of the key
981    ///
982    /// Returns `true` if the key-value pair was present
983    pub fn insert<'k, 'v>(
984        &mut self,
985        key: impl Borrow<K::SelfType<'k>>,
986        value: impl Borrow<V::SelfType<'v>>,
987    ) -> Result<bool> {
988        let value_bytes = V::as_bytes(value.borrow());
989        let value_bytes_ref = value_bytes.as_ref();
990        if value_bytes_ref.len() > MAX_VALUE_LENGTH {
991            return Err(StorageError::ValueTooLarge(value_bytes_ref.len()));
992        }
993        let key_len = K::as_bytes(key.borrow()).as_ref().len();
994        if key_len > MAX_VALUE_LENGTH {
995            return Err(StorageError::ValueTooLarge(key_len));
996        }
997        if value_bytes_ref.len() + key_len > MAX_PAIR_LENGTH {
998            return Err(StorageError::ValueTooLarge(value_bytes_ref.len() + key_len));
999        }
1000        let get_result = self.tree.get(key.borrow())?;
1001        let existed = if get_result.is_some() {
1002            #[allow(clippy::unnecessary_unwrap)]
1003            let guard = get_result.unwrap();
1004            let collection_type = guard.value().collection_type();
1005            match collection_type {
1006                Inline => {
1007                    let leaf_data = guard.value().as_inline();
1008                    let accessor = LeafAccessor::new(
1009                        leaf_data,
1010                        V::fixed_width(),
1011                        <() as Value>::fixed_width(),
1012                    );
1013                    let (position, found) = accessor.position::<V>(value_bytes_ref);
1014                    if found {
1015                        return Ok(true);
1016                    }
1017
1018                    let num_pairs = accessor.num_pairs();
1019                    let new_pairs = num_pairs + 1;
1020                    let new_pair_bytes =
1021                        accessor.length_of_pairs(0, accessor.num_pairs()) + value_bytes_ref.len();
1022                    let new_key_bytes =
1023                        accessor.length_of_keys(0, accessor.num_pairs()) + value_bytes_ref.len();
1024                    let required_inline_bytes = RawLeafBuilder::required_bytes(
1025                        new_pairs,
1026                        new_pair_bytes,
1027                        V::fixed_width(),
1028                        <() as Value>::fixed_width(),
1029                    );
1030
1031                    if required_inline_bytes < self.mem.get_page_size() / 2 {
1032                        let mut data = vec![0; required_inline_bytes];
1033                        let mut builder = RawLeafBuilder::new(
1034                            &mut data,
1035                            new_pairs,
1036                            V::fixed_width(),
1037                            <() as Value>::fixed_width(),
1038                            new_key_bytes,
1039                        );
1040                        for i in 0..accessor.num_pairs() {
1041                            if i == position {
1042                                builder
1043                                    .append(value_bytes_ref, <() as Value>::as_bytes(&()).as_ref());
1044                            }
1045                            let entry = accessor.entry(i).unwrap();
1046                            builder.append(entry.key(), entry.value());
1047                        }
1048                        if position == accessor.num_pairs() {
1049                            builder.append(value_bytes_ref, <() as Value>::as_bytes(&()).as_ref());
1050                        }
1051                        drop(builder);
1052                        drop(guard);
1053                        let inline_data = DynamicCollection::<V>::make_inline_data(&data);
1054                        self.tree
1055                            .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1056                    } else {
1057                        // convert into a subtree
1058                        let mut page = self.mem.allocate(leaf_data.len())?;
1059                        page.memory_mut()[..leaf_data.len()].copy_from_slice(leaf_data);
1060                        let page_number = page.get_page_number();
1061                        drop(page);
1062                        drop(guard);
1063
1064                        // Don't bother computing the checksum, since we're about to modify the tree
1065                        let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
1066                            Some(BtreeHeader::new(page_number, 0, num_pairs as u64)),
1067                            self.transaction.transaction_guard(),
1068                            self.mem.clone(),
1069                            self.freed_pages.clone(),
1070                        );
1071                        let existed = subtree.insert(value.borrow(), &())?.is_some();
1072                        assert_eq!(existed, found);
1073                        let subtree_data =
1074                            DynamicCollection::<V>::make_subtree_data(subtree.get_root().unwrap());
1075                        self.tree
1076                            .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1077                    }
1078
1079                    found
1080                }
1081                SubtreeV2 => {
1082                    let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
1083                        Some(guard.value().as_subtree()),
1084                        self.transaction.transaction_guard(),
1085                        self.mem.clone(),
1086                        self.freed_pages.clone(),
1087                    );
1088                    drop(guard);
1089                    let existed = subtree.insert(value.borrow(), &())?.is_some();
1090                    let subtree_data =
1091                        DynamicCollection::<V>::make_subtree_data(subtree.get_root().unwrap());
1092                    self.tree
1093                        .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1094
1095                    existed
1096                }
1097            }
1098        } else {
1099            drop(get_result);
1100            let required_inline_bytes = RawLeafBuilder::required_bytes(
1101                1,
1102                value_bytes_ref.len(),
1103                V::fixed_width(),
1104                <() as Value>::fixed_width(),
1105            );
1106            if required_inline_bytes < self.mem.get_page_size() / 2 {
1107                let mut data = vec![0; required_inline_bytes];
1108                let mut builder = RawLeafBuilder::new(
1109                    &mut data,
1110                    1,
1111                    V::fixed_width(),
1112                    <() as Value>::fixed_width(),
1113                    value_bytes_ref.len(),
1114                );
1115                builder.append(value_bytes_ref, <() as Value>::as_bytes(&()).as_ref());
1116                drop(builder);
1117                let inline_data = DynamicCollection::<V>::make_inline_data(&data);
1118                self.tree
1119                    .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1120            } else {
1121                let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
1122                    None,
1123                    self.transaction.transaction_guard(),
1124                    self.mem.clone(),
1125                    self.freed_pages.clone(),
1126                );
1127                subtree.insert(value.borrow(), &())?;
1128                let subtree_data =
1129                    DynamicCollection::<V>::make_subtree_data(subtree.get_root().unwrap());
1130                self.tree
1131                    .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1132            }
1133            false
1134        };
1135
1136        if !existed {
1137            self.num_values += 1;
1138        }
1139
1140        Ok(existed)
1141    }
1142
1143    /// Removes the given key-value pair
1144    ///
1145    /// Returns `true` if the key-value pair was present
1146    pub fn remove<'k, 'v>(
1147        &mut self,
1148        key: impl Borrow<K::SelfType<'k>>,
1149        value: impl Borrow<V::SelfType<'v>>,
1150    ) -> Result<bool> {
1151        let get_result = self.tree.get(key.borrow())?;
1152        if get_result.is_none() {
1153            return Ok(false);
1154        }
1155        let guard = get_result.unwrap();
1156        let v = guard.value();
1157        let existed = match v.collection_type() {
1158            Inline => {
1159                let leaf_data = v.as_inline();
1160                let accessor =
1161                    LeafAccessor::new(leaf_data, V::fixed_width(), <() as Value>::fixed_width());
1162                if let Some(position) = accessor.find_key::<V>(V::as_bytes(value.borrow()).as_ref())
1163                {
1164                    let old_num_pairs = accessor.num_pairs();
1165                    if old_num_pairs == 1 {
1166                        drop(guard);
1167                        self.tree.remove(key.borrow())?;
1168                    } else {
1169                        let old_pairs_len = accessor.length_of_pairs(0, old_num_pairs);
1170                        let removed_value_len = accessor.entry(position).unwrap().key().len();
1171                        let required = RawLeafBuilder::required_bytes(
1172                            old_num_pairs - 1,
1173                            old_pairs_len - removed_value_len,
1174                            V::fixed_width(),
1175                            <() as Value>::fixed_width(),
1176                        );
1177                        let mut new_data = vec![0; required];
1178                        let new_key_len =
1179                            accessor.length_of_keys(0, old_num_pairs) - removed_value_len;
1180                        let mut builder = RawLeafBuilder::new(
1181                            &mut new_data,
1182                            old_num_pairs - 1,
1183                            V::fixed_width(),
1184                            <() as Value>::fixed_width(),
1185                            new_key_len,
1186                        );
1187                        for i in 0..old_num_pairs {
1188                            if i != position {
1189                                let entry = accessor.entry(i).unwrap();
1190                                builder.append(entry.key(), entry.value());
1191                            }
1192                        }
1193                        drop(builder);
1194                        drop(guard);
1195
1196                        let inline_data = DynamicCollection::<V>::make_inline_data(&new_data);
1197                        self.tree
1198                            .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1199                    }
1200                    true
1201                } else {
1202                    drop(guard);
1203                    false
1204                }
1205            }
1206            SubtreeV2 => {
1207                let mut subtree: BtreeMut<V, ()> = BtreeMut::new(
1208                    Some(v.as_subtree()),
1209                    self.transaction.transaction_guard(),
1210                    self.mem.clone(),
1211                    self.freed_pages.clone(),
1212                );
1213                drop(guard);
1214                let existed = subtree.remove(value.borrow())?.is_some();
1215
1216                if let Some(BtreeHeader {
1217                    root: new_root,
1218                    checksum: new_checksum,
1219                    length: new_length,
1220                }) = subtree.get_root()
1221                {
1222                    let page = self.mem.get_page(new_root)?;
1223                    match page.memory()[0] {
1224                        LEAF => {
1225                            let accessor = LeafAccessor::new(
1226                                page.memory(),
1227                                V::fixed_width(),
1228                                <() as Value>::fixed_width(),
1229                            );
1230                            let len = accessor.total_length();
1231                            if len < self.mem.get_page_size() / 2 {
1232                                let inline_data =
1233                                    DynamicCollection::<V>::make_inline_data(&page.memory()[..len]);
1234                                self.tree
1235                                    .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1236                                drop(page);
1237                                if !self.mem.free_if_uncommitted(new_root) {
1238                                    (*self.freed_pages).lock().unwrap().push(new_root);
1239                                }
1240                            } else {
1241                                let subtree_data =
1242                                    DynamicCollection::<V>::make_subtree_data(BtreeHeader::new(
1243                                        new_root,
1244                                        new_checksum,
1245                                        accessor.num_pairs() as u64,
1246                                    ));
1247                                self.tree
1248                                    .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1249                            }
1250                        }
1251                        BRANCH => {
1252                            let subtree_data = DynamicCollection::<V>::make_subtree_data(
1253                                BtreeHeader::new(new_root, new_checksum, new_length),
1254                            );
1255                            self.tree
1256                                .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1257                        }
1258                        _ => unreachable!(),
1259                    }
1260                } else {
1261                    self.tree.remove(key.borrow())?;
1262                }
1263
1264                existed
1265            }
1266        };
1267
1268        if existed {
1269            self.num_values -= 1;
1270        }
1271
1272        Ok(existed)
1273    }
1274
1275    /// Removes all values for the given key
1276    ///
1277    /// Returns an iterator over the removed values. Values are in ascending order.
1278    pub fn remove_all<'a>(
1279        &mut self,
1280        key: impl Borrow<K::SelfType<'a>>,
1281    ) -> Result<MultimapValue<V>> {
1282        let iter = if let Some(collection) = self.tree.remove(key.borrow())? {
1283            let mut pages = vec![];
1284            if matches!(
1285                collection.value().collection_type(),
1286                DynamicCollectionType::SubtreeV2
1287            ) {
1288                let root = collection.value().as_subtree().root;
1289                let all_pages = AllPageNumbersBtreeIter::new(
1290                    root,
1291                    V::fixed_width(),
1292                    <() as Value>::fixed_width(),
1293                    self.mem.clone(),
1294                )?;
1295                for page in all_pages {
1296                    pages.push(page?);
1297                }
1298            }
1299
1300            self.num_values -= collection.value().get_num_values();
1301
1302            DynamicCollection::iter_free_on_drop(
1303                collection,
1304                pages,
1305                self.freed_pages.clone(),
1306                self.transaction.transaction_guard(),
1307                self.mem.clone(),
1308            )?
1309        } else {
1310            MultimapValue::new_subtree(
1311                BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), None, self.mem.clone())?,
1312                0,
1313                self.transaction.transaction_guard(),
1314            )
1315        };
1316
1317        Ok(iter)
1318    }
1319}
1320
1321impl<'txn, K: Key + 'static, V: Key + 'static> ReadableTableMetadata for MultimapTable<'txn, K, V> {
1322    fn stats(&self) -> Result<TableStats> {
1323        let tree_stats = multimap_btree_stats(
1324            self.tree.get_root().map(|x| x.root),
1325            &self.mem,
1326            K::fixed_width(),
1327            V::fixed_width(),
1328        )?;
1329
1330        Ok(TableStats {
1331            tree_height: tree_stats.tree_height,
1332            leaf_pages: tree_stats.leaf_pages,
1333            branch_pages: tree_stats.branch_pages,
1334            stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1335            metadata_bytes: tree_stats.metadata_bytes,
1336            fragmented_bytes: tree_stats.fragmented_bytes,
1337        })
1338    }
1339
1340    /// Returns the number of key-value pairs in the table
1341    fn len(&self) -> Result<u64> {
1342        Ok(self.num_values)
1343    }
1344}
1345
1346impl<'txn, K: Key + 'static, V: Key + 'static> ReadableMultimapTable<K, V>
1347    for MultimapTable<'txn, K, V>
1348{
1349    /// Returns an iterator over all values for the given key. Values are in ascending order.
1350    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>> {
1351        let guard = self.transaction.transaction_guard();
1352        let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1353            DynamicCollection::iter(collection, guard, self.mem.clone())?
1354        } else {
1355            MultimapValue::new_subtree(
1356                BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), None, self.mem.clone())?,
1357                0,
1358                guard,
1359            )
1360        };
1361
1362        Ok(iter)
1363    }
1364
1365    /// Returns a double-ended iterator over a range of elements in the table
1366    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<K, V>>
1367    where
1368        KR: Borrow<K::SelfType<'a>> + 'a,
1369    {
1370        let inner = self.tree.range(&range)?;
1371        Ok(MultimapRange::new(
1372            inner,
1373            self.transaction.transaction_guard(),
1374            self.mem.clone(),
1375        ))
1376    }
1377}
1378
1379impl<K: Key + 'static, V: Key + 'static> Sealed for MultimapTable<'_, K, V> {}
1380
1381impl<'txn, K: Key + 'static, V: Key + 'static> Drop for MultimapTable<'txn, K, V> {
1382    fn drop(&mut self) {
1383        self.transaction
1384            .close_table(&self.name, &self.tree, self.num_values);
1385    }
1386}
1387
1388pub trait ReadableMultimapTable<K: Key + 'static, V: Key + 'static>: ReadableTableMetadata {
1389    /// Returns an iterator over all values for the given key. Values are in ascending order.
1390    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>>;
1391
1392    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<K, V>>
1393    where
1394        KR: Borrow<K::SelfType<'a>> + 'a;
1395
1396    /// Returns an double-ended iterator over all elements in the table. Values are in ascending
1397    /// order.
1398    fn iter(&self) -> Result<MultimapRange<K, V>> {
1399        self.range::<K::SelfType<'_>>(..)
1400    }
1401}
1402
1403/// A read-only untyped multimap table
1404pub struct ReadOnlyUntypedMultimapTable {
1405    num_values: u64,
1406    tree: RawBtree,
1407    fixed_key_size: Option<usize>,
1408    fixed_value_size: Option<usize>,
1409    mem: Arc<TransactionalMemory>,
1410}
1411
1412impl Sealed for ReadOnlyUntypedMultimapTable {}
1413
1414impl ReadableTableMetadata for ReadOnlyUntypedMultimapTable {
1415    /// Retrieves information about storage usage for the table
1416    fn stats(&self) -> Result<TableStats> {
1417        let tree_stats = multimap_btree_stats(
1418            self.tree.get_root().map(|x| x.root),
1419            &self.mem,
1420            self.fixed_key_size,
1421            self.fixed_value_size,
1422        )?;
1423
1424        Ok(TableStats {
1425            tree_height: tree_stats.tree_height,
1426            leaf_pages: tree_stats.leaf_pages,
1427            branch_pages: tree_stats.branch_pages,
1428            stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1429            metadata_bytes: tree_stats.metadata_bytes,
1430            fragmented_bytes: tree_stats.fragmented_bytes,
1431        })
1432    }
1433
1434    fn len(&self) -> Result<u64> {
1435        Ok(self.num_values)
1436    }
1437}
1438
1439impl ReadOnlyUntypedMultimapTable {
1440    pub(crate) fn new(
1441        root: Option<BtreeHeader>,
1442        num_values: u64,
1443        fixed_key_size: Option<usize>,
1444        fixed_value_size: Option<usize>,
1445        mem: Arc<TransactionalMemory>,
1446    ) -> Self {
1447        Self {
1448            num_values,
1449            tree: RawBtree::new(
1450                root,
1451                fixed_key_size,
1452                DynamicCollection::<()>::fixed_width_with(fixed_value_size),
1453                mem.clone(),
1454            ),
1455            fixed_key_size,
1456            fixed_value_size,
1457            mem,
1458        }
1459    }
1460}
1461
1462/// A read-only multimap table
1463pub struct ReadOnlyMultimapTable<K: Key + 'static, V: Key + 'static> {
1464    tree: Btree<K, &'static DynamicCollection<V>>,
1465    num_values: u64,
1466    mem: Arc<TransactionalMemory>,
1467    transaction_guard: Arc<TransactionGuard>,
1468    _value_type: PhantomData<V>,
1469}
1470
1471impl<K: Key + 'static, V: Key + 'static> ReadOnlyMultimapTable<K, V> {
1472    pub(crate) fn new(
1473        root: Option<BtreeHeader>,
1474        num_values: u64,
1475        hint: PageHint,
1476        guard: Arc<TransactionGuard>,
1477        mem: Arc<TransactionalMemory>,
1478    ) -> Result<ReadOnlyMultimapTable<K, V>> {
1479        Ok(ReadOnlyMultimapTable {
1480            tree: Btree::new(root, hint, guard.clone(), mem.clone())?,
1481            num_values,
1482            mem,
1483            transaction_guard: guard,
1484            _value_type: Default::default(),
1485        })
1486    }
1487
1488    /// This method is like [`ReadableMultimapTable::get()`], but the iterator is reference counted and keeps the transaction
1489    /// alive until it is dropped.
1490    pub fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<'static, V>> {
1491        let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1492            DynamicCollection::iter(collection, self.transaction_guard.clone(), self.mem.clone())?
1493        } else {
1494            MultimapValue::new_subtree(
1495                BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), None, self.mem.clone())?,
1496                0,
1497                self.transaction_guard.clone(),
1498            )
1499        };
1500
1501        Ok(iter)
1502    }
1503
1504    /// This method is like [`ReadableMultimapTable::range()`], but the iterator is reference counted and keeps the transaction
1505    /// alive until it is dropped.
1506    pub fn range<'a, KR>(&self, range: impl RangeBounds<KR>) -> Result<MultimapRange<'static, K, V>>
1507    where
1508        KR: Borrow<K::SelfType<'a>>,
1509    {
1510        let inner = self.tree.range(&range)?;
1511        Ok(MultimapRange::new(
1512            inner,
1513            self.transaction_guard.clone(),
1514            self.mem.clone(),
1515        ))
1516    }
1517}
1518
1519impl<K: Key + 'static, V: Key + 'static> ReadableTableMetadata for ReadOnlyMultimapTable<K, V> {
1520    fn stats(&self) -> Result<TableStats> {
1521        let tree_stats = multimap_btree_stats(
1522            self.tree.get_root().map(|x| x.root),
1523            &self.mem,
1524            K::fixed_width(),
1525            V::fixed_width(),
1526        )?;
1527
1528        Ok(TableStats {
1529            tree_height: tree_stats.tree_height,
1530            leaf_pages: tree_stats.leaf_pages,
1531            branch_pages: tree_stats.branch_pages,
1532            stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1533            metadata_bytes: tree_stats.metadata_bytes,
1534            fragmented_bytes: tree_stats.fragmented_bytes,
1535        })
1536    }
1537
1538    fn len(&self) -> Result<u64> {
1539        Ok(self.num_values)
1540    }
1541}
1542
1543impl<K: Key + 'static, V: Key + 'static> ReadableMultimapTable<K, V>
1544    for ReadOnlyMultimapTable<K, V>
1545{
1546    /// Returns an iterator over all values for the given key. Values are in ascending order.
1547    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>> {
1548        let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1549            DynamicCollection::iter(collection, self.transaction_guard.clone(), self.mem.clone())?
1550        } else {
1551            MultimapValue::new_subtree(
1552                BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), None, self.mem.clone())?,
1553                0,
1554                self.transaction_guard.clone(),
1555            )
1556        };
1557
1558        Ok(iter)
1559    }
1560
1561    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<K, V>>
1562    where
1563        KR: Borrow<K::SelfType<'a>> + 'a,
1564    {
1565        let inner = self.tree.range(&range)?;
1566        Ok(MultimapRange::new(
1567            inner,
1568            self.transaction_guard.clone(),
1569            self.mem.clone(),
1570        ))
1571    }
1572}
1573
1574impl<K: Key, V: Key> Sealed for ReadOnlyMultimapTable<K, V> {}