redb/
multimap_table.rs

1use crate::db::TransactionGuard;
2use crate::multimap_table::DynamicCollectionType::{Inline, SubtreeV2};
3use crate::sealed::Sealed;
4use crate::table::{ReadableTableMetadata, TableStats};
5use crate::tree_store::{
6    AllPageNumbersBtreeIter, BRANCH, BranchAccessor, BranchMutator, Btree, BtreeHeader, BtreeMut,
7    BtreeRangeIter, BtreeStats, Checksum, DEFERRED, LEAF, LeafAccessor, LeafMutator,
8    MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, Page, PageHint, PageNumber, PagePath, RawBtree,
9    RawLeafBuilder, TransactionalMemory, UntypedBtree, UntypedBtreeMut, btree_stats,
10};
11use crate::types::{Key, TypeName, Value};
12use crate::{AccessGuard, MultimapTableHandle, Result, StorageError, WriteTransaction};
13use std::borrow::Borrow;
14use std::cmp::max;
15use std::collections::HashMap;
16use std::convert::TryInto;
17use std::marker::PhantomData;
18use std::mem;
19use std::mem::size_of;
20use std::ops::{RangeBounds, RangeFull};
21use std::sync::{Arc, Mutex};
22
23pub(crate) fn multimap_btree_stats(
24    root: Option<PageNumber>,
25    mem: &TransactionalMemory,
26    fixed_key_size: Option<usize>,
27    fixed_value_size: Option<usize>,
28) -> Result<BtreeStats> {
29    if let Some(root) = root {
30        multimap_stats_helper(root, mem, fixed_key_size, fixed_value_size)
31    } else {
32        Ok(BtreeStats {
33            tree_height: 0,
34            leaf_pages: 0,
35            branch_pages: 0,
36            stored_leaf_bytes: 0,
37            metadata_bytes: 0,
38            fragmented_bytes: 0,
39        })
40    }
41}
42
43fn multimap_stats_helper(
44    page_number: PageNumber,
45    mem: &TransactionalMemory,
46    fixed_key_size: Option<usize>,
47    fixed_value_size: Option<usize>,
48) -> Result<BtreeStats> {
49    let page = mem.get_page(page_number)?;
50    let node_mem = page.memory();
51    match node_mem[0] {
52        LEAF => {
53            let accessor = LeafAccessor::new(
54                page.memory(),
55                fixed_key_size,
56                DynamicCollection::<()>::fixed_width_with(fixed_value_size),
57            );
58            let mut leaf_bytes = 0u64;
59            let mut is_branch = false;
60            for i in 0..accessor.num_pairs() {
61                let entry = accessor.entry(i).unwrap();
62                let collection: &UntypedDynamicCollection =
63                    UntypedDynamicCollection::new(entry.value());
64                match collection.collection_type() {
65                    Inline => {
66                        let inline_accessor = LeafAccessor::new(
67                            collection.as_inline(),
68                            fixed_value_size,
69                            <() as Value>::fixed_width(),
70                        );
71                        leaf_bytes +=
72                            inline_accessor.length_of_pairs(0, inline_accessor.num_pairs()) as u64;
73                    }
74                    SubtreeV2 => {
75                        is_branch = true;
76                    }
77                }
78            }
79            let mut overhead_bytes = (accessor.total_length() as u64) - leaf_bytes;
80            let mut fragmented_bytes = (page.memory().len() - accessor.total_length()) as u64;
81            let mut max_child_height = 0;
82            let (mut leaf_pages, mut branch_pages) = if is_branch { (0, 1) } else { (1, 0) };
83
84            for i in 0..accessor.num_pairs() {
85                let entry = accessor.entry(i).unwrap();
86                let collection: &UntypedDynamicCollection =
87                    UntypedDynamicCollection::new(entry.value());
88                match collection.collection_type() {
89                    Inline => {
90                        // data is inline, so it was already counted above
91                    }
92                    SubtreeV2 => {
93                        // this is a sub-tree, so traverse it
94                        let stats = btree_stats(
95                            Some(collection.as_subtree().root),
96                            mem,
97                            fixed_value_size,
98                            <() as Value>::fixed_width(),
99                        )?;
100                        max_child_height = max(max_child_height, stats.tree_height);
101                        branch_pages += stats.branch_pages;
102                        leaf_pages += stats.leaf_pages;
103                        fragmented_bytes += stats.fragmented_bytes;
104                        overhead_bytes += stats.metadata_bytes;
105                        leaf_bytes += stats.stored_leaf_bytes;
106                    }
107                }
108            }
109
110            Ok(BtreeStats {
111                tree_height: max_child_height + 1,
112                leaf_pages,
113                branch_pages,
114                stored_leaf_bytes: leaf_bytes,
115                metadata_bytes: overhead_bytes,
116                fragmented_bytes,
117            })
118        }
119        BRANCH => {
120            let accessor = BranchAccessor::new(&page, fixed_key_size);
121            let mut max_child_height = 0;
122            let mut leaf_pages = 0;
123            let mut branch_pages = 1;
124            let mut stored_leaf_bytes = 0;
125            let mut metadata_bytes = accessor.total_length() as u64;
126            let mut fragmented_bytes = (page.memory().len() - accessor.total_length()) as u64;
127            for i in 0..accessor.count_children() {
128                if let Some(child) = accessor.child_page(i) {
129                    let stats =
130                        multimap_stats_helper(child, mem, fixed_key_size, fixed_value_size)?;
131                    max_child_height = max(max_child_height, stats.tree_height);
132                    leaf_pages += stats.leaf_pages;
133                    branch_pages += stats.branch_pages;
134                    stored_leaf_bytes += stats.stored_leaf_bytes;
135                    metadata_bytes += stats.metadata_bytes;
136                    fragmented_bytes += stats.fragmented_bytes;
137                }
138            }
139
140            Ok(BtreeStats {
141                tree_height: max_child_height + 1,
142                leaf_pages,
143                branch_pages,
144                stored_leaf_bytes,
145                metadata_bytes,
146                fragmented_bytes,
147            })
148        }
149        _ => unreachable!(),
150    }
151}
152
153// Verify all the checksums in the tree, including any Dynamic collection subtrees
154pub(crate) fn verify_tree_and_subtree_checksums(
155    root: Option<BtreeHeader>,
156    key_size: Option<usize>,
157    value_size: Option<usize>,
158    mem: Arc<TransactionalMemory>,
159) -> Result<bool> {
160    if let Some(header) = root {
161        if !RawBtree::new(
162            Some(header),
163            key_size,
164            DynamicCollection::<()>::fixed_width_with(value_size),
165            mem.clone(),
166        )
167        .verify_checksum()?
168        {
169            return Ok(false);
170        }
171
172        let table_pages_iter = AllPageNumbersBtreeIter::new(
173            header.root,
174            key_size,
175            DynamicCollection::<()>::fixed_width_with(value_size),
176            mem.clone(),
177        )?;
178        for table_page in table_pages_iter {
179            let page = mem.get_page(table_page?)?;
180            let subtree_roots = parse_subtree_roots(&page, key_size, value_size);
181            for header in subtree_roots {
182                if !RawBtree::new(Some(header), value_size, <()>::fixed_width(), mem.clone())
183                    .verify_checksum()?
184                {
185                    return Ok(false);
186                }
187            }
188        }
189    }
190
191    Ok(true)
192}
193
// Relocate all subtrees to lower index pages, if possible
//
// Moves the page `root.0` to the page assigned to it in `relocation_map`, and recursively
// relocates what is reachable from it: the children of a branch page, and the subtrees
// referenced by the collections stored in a leaf page.
//
// If `root.0` has no entry in `relocation_map`, `root` is returned unchanged and nothing beneath
// it is visited. Otherwise the new page number is returned with a DEFERRED checksum.
//
// The old page is freed immediately if it is uncommitted, and is appended to `freed_pages`
// otherwise.
pub(crate) fn relocate_subtrees(
    root: (PageNumber, Checksum),
    key_size: Option<usize>,
    value_size: Option<usize>,
    mem: Arc<TransactionalMemory>,
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    relocation_map: &HashMap<PageNumber, PageNumber>,
) -> Result<(PageNumber, Checksum)> {
    let old_page = mem.get_page(root.0)?;
    let mut new_page = if let Some(new_page_number) = relocation_map.get(&root.0) {
        mem.get_page_mut(*new_page_number)?
    } else {
        // This page is not scheduled to move
        return Ok(root);
    };
    let new_page_number = new_page.get_page_number();
    // Start from an exact copy of the old page. Only the references that change are patched below
    new_page.memory_mut().copy_from_slice(old_page.memory());

    match old_page.memory()[0] {
        LEAF => {
            // Read the entries from the old page, and write any changes into the new page
            let accessor = LeafAccessor::new(
                old_page.memory(),
                key_size,
                UntypedDynamicCollection::fixed_width_with(value_size),
            );
            // TODO: maybe there's a better abstraction, so that we don't need to call into this low-level method?
            let mut mutator = LeafMutator::new(
                &mut new_page,
                key_size,
                UntypedDynamicCollection::fixed_width_with(value_size),
            );
            for i in 0..accessor.num_pairs() {
                let entry = accessor.entry(i).unwrap();
                let collection = UntypedDynamicCollection::from_bytes(entry.value());
                // Inline collections live inside this page, so they moved with the copy above
                if matches!(collection.collection_type(), SubtreeV2) {
                    let sub_root = collection.as_subtree();
                    let mut tree = UntypedBtreeMut::new(
                        Some(sub_root),
                        mem.clone(),
                        freed_pages.clone(),
                        value_size,
                        <() as Value>::fixed_width(),
                    );
                    tree.relocate(relocation_map)?;
                    // Only rewrite the entry if relocating the subtree changed its header
                    if sub_root != tree.get_root().unwrap() {
                        let new_collection =
                            UntypedDynamicCollection::make_subtree_data(tree.get_root().unwrap());
                        mutator.insert(i, true, entry.key(), &new_collection);
                    }
                }
            }
        }
        BRANCH => {
            let accessor = BranchAccessor::new(&old_page, key_size);
            let mut mutator = BranchMutator::new(&mut new_page);
            for i in 0..accessor.count_children() {
                if let Some(child) = accessor.child_page(i) {
                    let child_checksum = accessor.child_checksum(i).unwrap();
                    // Relocate the child, then point the new page at wherever it ended up
                    let (new_child, new_checksum) = relocate_subtrees(
                        (child, child_checksum),
                        key_size,
                        value_size,
                        mem.clone(),
                        freed_pages.clone(),
                        relocation_map,
                    )?;
                    mutator.write_child_page(i, new_child, new_checksum);
                }
            }
        }
        _ => unreachable!(),
    }

    let old_page_number = old_page.get_page_number();
    // Release our reference to the old page before freeing it
    drop(old_page);
    if !mem.free_if_uncommitted(old_page_number) {
        // It could not be freed right away, so record it in the shared list instead
        freed_pages.lock().unwrap().push(old_page_number);
    }
    Ok((new_page_number, DEFERRED))
}
274
// Finalize all the checksums in the tree, including any Dynamic collection subtrees
// Returns the root checksum
//
// Each dirty leaf of the outer tree is visited. For every subtree collection whose root page is
// uncommitted, the subtree's checksums are finalized and the entry is rewritten with the
// resulting header. The checksums of the outer tree itself are finalized last.
pub(crate) fn finalize_tree_and_subtree_checksums(
    root: Option<BtreeHeader>,
    key_size: Option<usize>,
    value_size: Option<usize>,
    mem: Arc<TransactionalMemory>,
) -> Result<Option<BtreeHeader>> {
    // Required by UntypedBtreeMut. Asserted to still be empty at the end
    let freed_pages = Arc::new(Mutex::new(vec![]));
    let mut tree = UntypedBtreeMut::new(
        root,
        mem.clone(),
        freed_pages.clone(),
        key_size,
        DynamicCollection::<()>::fixed_width_with(value_size),
    );
    tree.dirty_leaf_visitor(|mut leaf_page| {
        // Updates are collected first and applied afterwards, because the accessor reads from
        // `leaf_page` while the mutator below needs it mutably
        let mut sub_root_updates = vec![];
        let accessor = LeafAccessor::new(
            leaf_page.memory(),
            key_size,
            DynamicCollection::<()>::fixed_width_with(value_size),
        );
        for i in 0..accessor.num_pairs() {
            let entry = accessor.entry(i).unwrap();
            let collection = <&DynamicCollection<()>>::from_bytes(entry.value());
            if matches!(collection.collection_type(), SubtreeV2) {
                let sub_root = collection.as_subtree();
                // Subtrees whose root page is not uncommitted are left as they are
                if mem.uncommitted(sub_root.root) {
                    let mut subtree = UntypedBtreeMut::new(
                        Some(sub_root),
                        mem.clone(),
                        freed_pages.clone(),
                        value_size,
                        <()>::fixed_width(),
                    );
                    let subtree_root = subtree.finalize_dirty_checksums()?.unwrap();
                    // The key is copied, so that it can be used after the accessor is gone
                    sub_root_updates.push((i, entry.key().to_vec(), subtree_root));
                }
            }
        }
        // TODO: maybe there's a better abstraction, so that we don't need to call into this low-level method?
        let mut mutator = LeafMutator::new(
            &mut leaf_page,
            key_size,
            DynamicCollection::<()>::fixed_width_with(value_size),
        );
        // Rewrite each affected entry with the finalized header of its subtree
        for (i, key, sub_root) in sub_root_updates {
            let collection = DynamicCollection::<()>::make_subtree_data(sub_root);
            mutator.insert(i, true, &key, &collection);
        }

        Ok(())
    })?;

    let root = tree.finalize_dirty_checksums()?;
    // No pages should have been freed by this operation
    assert!(freed_pages.lock().unwrap().is_empty());
    Ok(root)
}
335
336fn parse_subtree_roots<T: Page>(
337    page: &T,
338    fixed_key_size: Option<usize>,
339    fixed_value_size: Option<usize>,
340) -> Vec<BtreeHeader> {
341    match page.memory()[0] {
342        BRANCH => {
343            vec![]
344        }
345        LEAF => {
346            let mut result = vec![];
347            let accessor = LeafAccessor::new(
348                page.memory(),
349                fixed_key_size,
350                DynamicCollection::<()>::fixed_width_with(fixed_value_size),
351            );
352            for i in 0..accessor.num_pairs() {
353                let entry = accessor.entry(i).unwrap();
354                let collection = <&DynamicCollection<()>>::from_bytes(entry.value());
355                if matches!(collection.collection_type(), SubtreeV2) {
356                    result.push(collection.as_subtree());
357                }
358            }
359
360            result
361        }
362        _ => unreachable!(),
363    }
364}
365
/// A multimap btree that is accessed without knowledge of its key and value types; only their
/// fixed widths (if any) are known.
pub(crate) struct UntypedMultiBtree {
    // Used to read the pages of the tree
    mem: Arc<TransactionalMemory>,
    // Header of the outer tree, or None if the tree is empty
    root: Option<BtreeHeader>,
    // Fixed width of the keys of the outer tree, if they have one
    key_width: Option<usize>,
    // Fixed width of the multimap values, if they have one. The values are the keys of the subtrees
    value_width: Option<usize>,
}
372
373impl UntypedMultiBtree {
374    pub(crate) fn new(
375        root: Option<BtreeHeader>,
376        mem: Arc<TransactionalMemory>,
377        key_width: Option<usize>,
378        value_width: Option<usize>,
379    ) -> Self {
380        Self {
381            mem,
382            root,
383            key_width,
384            value_width,
385        }
386    }
387
388    // Applies visitor to pages in the tree
389    pub(crate) fn visit_all_pages<F>(&self, mut visitor: F) -> Result
390    where
391        F: FnMut(&PagePath) -> Result,
392    {
393        let tree = UntypedBtree::new(
394            self.root,
395            self.mem.clone(),
396            self.key_width,
397            UntypedDynamicCollection::fixed_width_with(self.value_width),
398        );
399        tree.visit_all_pages(|path| {
400            visitor(path)?;
401            let page = self.mem.get_page(path.page_number())?;
402            match page.memory()[0] {
403                LEAF => {
404                    for header in parse_subtree_roots(&page, self.key_width, self.value_width) {
405                        let subtree = UntypedBtree::new(
406                            Some(header),
407                            self.mem.clone(),
408                            self.value_width,
409                            <() as Value>::fixed_width(),
410                        );
411                        subtree.visit_all_pages(|subpath| {
412                            let full_path = path.with_subpath(subpath);
413                            visitor(&full_path)
414                        })?;
415                    }
416                }
417                BRANCH => {
418                    // No-op. The tree.visit_pages() call will process this sub-tree
419                }
420                _ => unreachable!(),
421            }
422            Ok(())
423        })?;
424
425        Ok(())
426    }
427}
428
/// Double-ended cursor over the keys of an inline collection, which is laid out like a leaf node.
///
/// The cursor is exhausted once `end_entry < start_entry`.
pub(crate) struct LeafKeyIter<'a, V: Key + 'static> {
    // The collection being iterated. Its inline data is re-parsed on every step
    inline_collection: AccessGuard<'a, &'static DynamicCollection<V>>,
    // Widths handed to LeafAccessor when parsing the inline data
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    // Signed, because `end_entry` is -1 for an empty collection
    start_entry: isize, // inclusive
    end_entry: isize,   // inclusive
}
436
437impl<'a, V: Key> LeafKeyIter<'a, V> {
438    fn new(
439        data: AccessGuard<'a, &'static DynamicCollection<V>>,
440        fixed_key_size: Option<usize>,
441        fixed_value_size: Option<usize>,
442    ) -> Self {
443        let accessor =
444            LeafAccessor::new(data.value().as_inline(), fixed_key_size, fixed_value_size);
445        let end_entry = isize::try_from(accessor.num_pairs()).unwrap() - 1;
446        Self {
447            inline_collection: data,
448            fixed_key_size,
449            fixed_value_size,
450            start_entry: 0,
451            end_entry,
452        }
453    }
454
455    fn next_key(&mut self) -> Option<&[u8]> {
456        if self.end_entry < self.start_entry {
457            return None;
458        }
459        let accessor = LeafAccessor::new(
460            self.inline_collection.value().as_inline(),
461            self.fixed_key_size,
462            self.fixed_value_size,
463        );
464        self.start_entry += 1;
465        accessor
466            .entry((self.start_entry - 1).try_into().unwrap())
467            .map(|e| e.key())
468    }
469
470    fn next_key_back(&mut self) -> Option<&[u8]> {
471        if self.end_entry < self.start_entry {
472            return None;
473        }
474        let accessor = LeafAccessor::new(
475            self.inline_collection.value().as_inline(),
476            self.fixed_key_size,
477            self.fixed_value_size,
478        );
479        self.end_entry -= 1;
480        accessor
481            .entry((self.end_entry + 1).try_into().unwrap())
482            .map(|e| e.key())
483    }
484}
485
/// How the values of a single multimap key are stored. The type is encoded in the first byte of
/// the collection.
enum DynamicCollectionType {
    // The values are stored directly in the collection, in the format of a leaf node
    Inline,
    // Was used in file format version 1
    // Subtree,
    // The values are stored in a separate btree. The collection holds that tree's header
    SubtreeV2,
}
492
493impl From<u8> for DynamicCollectionType {
494    fn from(value: u8) -> Self {
495        match value {
496            LEAF => Inline,
497            // 2 => Subtree,
498            3 => SubtreeV2,
499            _ => unreachable!(),
500        }
501    }
502}
503
504#[allow(clippy::from_over_into)]
505impl Into<u8> for DynamicCollectionType {
506    fn into(self) -> u8 {
507        match self {
508            // Reuse the LEAF type id, so that we can cast this directly into the format used by
509            // LeafAccessor
510            Inline => LEAF,
511            // Subtree => 2,
512            SubtreeV2 => 3,
513        }
514    }
515}
516
/// The collection of values stored for a single key of a multimap table.
///
/// Layout:
/// type (1 byte):
/// * 1 = inline data (the same id as a leaf page)
/// * 2 = sub tree, as written by file format version 1 (no longer decoded)
/// * 3 = sub tree
///
/// (when type = 1) data (n bytes): inlined leaf node
///
/// (when type = 3) the serialized [`BtreeHeader`] of the sub tree:
/// * root (8 bytes): sub tree root page number
/// * checksum (16 bytes): sub tree checksum
/// * length (8 bytes): number of values stored in the sub tree
///
/// NOTE: Even though the [`PhantomData`] is zero-sized, the inner data DST must be placed last.
/// See [Exotically Sized Types](https://doc.rust-lang.org/nomicon/exotic-sizes.html#dynamically-sized-types-dsts)
/// section of the Rustonomicon for more details.
#[repr(transparent)]
struct DynamicCollection<V: Key> {
    // Records the type of the stored values. Occupies no space
    _value_type: PhantomData<V>,
    // The type byte, followed by the payload described above
    data: [u8],
}
535
536impl<V: Key> std::fmt::Debug for DynamicCollection<V> {
537    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
538        f.debug_struct("DynamicCollection")
539            .field("data", &&self.data)
540            .finish()
541    }
542}
543
// Allows a collection to be stored as the value type of a btree. The stored form is exactly the
// bytes of the collection, so both conversions are zero-copy.
impl<V: Key> Value for &DynamicCollection<V> {
    type SelfType<'a>
        = &'a DynamicCollection<V>
    where
        Self: 'a;
    type AsBytes<'a>
        = &'a [u8]
    where
        Self: 'a;

    // Collections are always treated as variable width
    fn fixed_width() -> Option<usize> {
        None
    }

    // Reinterprets `data` as a collection, without copying or validating it
    fn from_bytes<'a>(data: &'a [u8]) -> &'a DynamicCollection<V>
    where
        Self: 'a,
    {
        DynamicCollection::new(data)
    }

    // Returns the bytes of the collection, including the leading type byte
    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'a [u8]
    where
        Self: 'b,
    {
        &value.data
    }

    fn type_name() -> TypeName {
        TypeName::internal("redb::DynamicCollection")
    }
}
576
577impl<V: Key> DynamicCollection<V> {
578    fn new(data: &[u8]) -> &Self {
579        unsafe { &*(std::ptr::from_ref::<[u8]>(data) as *const DynamicCollection<V>) }
580    }
581
582    fn collection_type(&self) -> DynamicCollectionType {
583        DynamicCollectionType::from(self.data[0])
584    }
585
586    fn as_inline(&self) -> &[u8] {
587        debug_assert!(matches!(self.collection_type(), Inline));
588        &self.data[1..]
589    }
590
591    fn as_subtree(&self) -> BtreeHeader {
592        assert!(matches!(self.collection_type(), SubtreeV2));
593        BtreeHeader::from_le_bytes(
594            self.data[1..=BtreeHeader::serialized_size()]
595                .try_into()
596                .unwrap(),
597        )
598    }
599
600    fn get_num_values(&self) -> u64 {
601        match self.collection_type() {
602            Inline => {
603                let leaf_data = self.as_inline();
604                let accessor =
605                    LeafAccessor::new(leaf_data, V::fixed_width(), <() as Value>::fixed_width());
606                accessor.num_pairs() as u64
607            }
608            SubtreeV2 => {
609                let offset = 1 + PageNumber::serialized_size() + size_of::<Checksum>();
610                u64::from_le_bytes(
611                    self.data[offset..(offset + size_of::<u64>())]
612                        .try_into()
613                        .unwrap(),
614                )
615            }
616        }
617    }
618
619    fn make_inline_data(data: &[u8]) -> Vec<u8> {
620        let mut result = vec![Inline.into()];
621        result.extend_from_slice(data);
622
623        result
624    }
625
626    fn make_subtree_data(header: BtreeHeader) -> Vec<u8> {
627        let mut result = vec![SubtreeV2.into()];
628        result.extend_from_slice(&header.to_le_bytes());
629        result
630    }
631
632    pub(crate) fn fixed_width_with(_value_width: Option<usize>) -> Option<usize> {
633        None
634    }
635}
636
637impl<V: Key> DynamicCollection<V> {
638    fn iter<'a>(
639        collection: AccessGuard<'a, &'static DynamicCollection<V>>,
640        guard: Arc<TransactionGuard>,
641        mem: Arc<TransactionalMemory>,
642    ) -> Result<MultimapValue<'a, V>> {
643        Ok(match collection.value().collection_type() {
644            Inline => {
645                let leaf_iter =
646                    LeafKeyIter::new(collection, V::fixed_width(), <() as Value>::fixed_width());
647                MultimapValue::new_inline(leaf_iter, guard)
648            }
649            SubtreeV2 => {
650                let root = collection.value().as_subtree().root;
651                MultimapValue::new_subtree(
652                    BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), Some(root), mem)?,
653                    collection.value().get_num_values(),
654                    guard,
655                )
656            }
657        })
658    }
659
660    fn iter_free_on_drop<'a>(
661        collection: AccessGuard<'a, &'static DynamicCollection<V>>,
662        pages: Vec<PageNumber>,
663        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
664        guard: Arc<TransactionGuard>,
665        mem: Arc<TransactionalMemory>,
666    ) -> Result<MultimapValue<'a, V>> {
667        let num_values = collection.value().get_num_values();
668        Ok(match collection.value().collection_type() {
669            Inline => {
670                let leaf_iter =
671                    LeafKeyIter::new(collection, V::fixed_width(), <() as Value>::fixed_width());
672                MultimapValue::new_inline(leaf_iter, guard)
673            }
674            SubtreeV2 => {
675                let root = collection.value().as_subtree().root;
676                let inner = BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
677                    &(..),
678                    Some(root),
679                    mem.clone(),
680                )?;
681                MultimapValue::new_subtree_free_on_drop(
682                    inner,
683                    num_values,
684                    freed_pages,
685                    pages,
686                    guard,
687                    mem,
688                )
689            }
690        })
691    }
692}
693
/// A collection of multimap values whose value type is not known. It is a view over the same
/// bytes as `DynamicCollection`: a type byte, followed by either an inlined leaf node or the
/// header of a subtree.
#[repr(transparent)]
struct UntypedDynamicCollection {
    // The type byte followed by the payload
    data: [u8],
}
698
699impl UntypedDynamicCollection {
700    pub(crate) fn fixed_width_with(_value_width: Option<usize>) -> Option<usize> {
701        None
702    }
703
704    fn new(data: &[u8]) -> &Self {
705        unsafe { &*(std::ptr::from_ref::<[u8]>(data) as *const UntypedDynamicCollection) }
706    }
707
708    fn make_subtree_data(header: BtreeHeader) -> Vec<u8> {
709        let mut result = vec![SubtreeV2.into()];
710        result.extend_from_slice(&header.to_le_bytes());
711        result
712    }
713
714    fn from_bytes(data: &[u8]) -> &Self {
715        Self::new(data)
716    }
717
718    fn collection_type(&self) -> DynamicCollectionType {
719        DynamicCollectionType::from(self.data[0])
720    }
721
722    fn as_inline(&self) -> &[u8] {
723        debug_assert!(matches!(self.collection_type(), Inline));
724        &self.data[1..]
725    }
726
727    fn as_subtree(&self) -> BtreeHeader {
728        assert!(matches!(self.collection_type(), SubtreeV2));
729        BtreeHeader::from_le_bytes(
730            self.data[1..=BtreeHeader::serialized_size()]
731                .try_into()
732                .unwrap(),
733        )
734    }
735}
736
/// The source that a `MultimapValue` reads its values from.
enum ValueIterState<'a, V: Key + 'static> {
    // The values are the keys of a separate btree
    Subtree(BtreeRangeIter<V, ()>),
    // The values are the keys of a leaf node inlined into the collection
    InlineLeaf(LeafKeyIter<'a, V>),
}
741
/// Iterator over the values stored for a single key of a multimap table.
pub struct MultimapValue<'a, V: Key + 'static> {
    // Wrapped in an Option so that it can be released in `drop()`, before any pages are freed
    inner: Option<ValueIterState<'a, V>>,
    // The count reported by `len()`
    remaining: u64,
    // Receives the pages from `free_on_drop` that cannot be freed immediately.
    // Only set when the iterator was created with pages to free
    freed_pages: Option<Arc<Mutex<Vec<PageNumber>>>>,
    // Pages to free when the iterator is dropped
    free_on_drop: Vec<PageNumber>,
    // Never read; held for as long as the iterator exists
    // NOTE(review): presumably this keeps the transaction alive while iterating -- confirm
    _transaction_guard: Arc<TransactionGuard>,
    // Used to free the pages in `free_on_drop`. Only set when the iterator was created with
    // pages to free
    mem: Option<Arc<TransactionalMemory>>,
    _value_type: PhantomData<V>,
}
751
752impl<'a, V: Key + 'static> MultimapValue<'a, V> {
753    fn new_subtree(
754        inner: BtreeRangeIter<V, ()>,
755        num_values: u64,
756        guard: Arc<TransactionGuard>,
757    ) -> Self {
758        Self {
759            inner: Some(ValueIterState::Subtree(inner)),
760            remaining: num_values,
761            freed_pages: None,
762            free_on_drop: vec![],
763            _transaction_guard: guard,
764            mem: None,
765            _value_type: Default::default(),
766        }
767    }
768
769    fn new_subtree_free_on_drop(
770        inner: BtreeRangeIter<V, ()>,
771        num_values: u64,
772        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
773        pages: Vec<PageNumber>,
774        guard: Arc<TransactionGuard>,
775        mem: Arc<TransactionalMemory>,
776    ) -> Self {
777        Self {
778            inner: Some(ValueIterState::Subtree(inner)),
779            remaining: num_values,
780            freed_pages: Some(freed_pages),
781            free_on_drop: pages,
782            _transaction_guard: guard,
783            mem: Some(mem),
784            _value_type: Default::default(),
785        }
786    }
787
788    fn new_inline(inner: LeafKeyIter<'a, V>, guard: Arc<TransactionGuard>) -> Self {
789        let remaining = inner.inline_collection.value().get_num_values();
790        Self {
791            inner: Some(ValueIterState::InlineLeaf(inner)),
792            remaining,
793            freed_pages: None,
794            free_on_drop: vec![],
795            _transaction_guard: guard,
796            mem: None,
797            _value_type: Default::default(),
798        }
799    }
800
801    /// Returns the number of times this iterator will return `Some(Ok(_))`
802    ///
803    /// Note that `Some` may be returned from `next()` more than `len()` times if `Some(Err(_))` is returned
804    pub fn len(&self) -> u64 {
805        self.remaining
806    }
807
808    pub fn is_empty(&self) -> bool {
809        self.len() == 0
810    }
811}
812
813impl<'a, V: Key + 'static> Iterator for MultimapValue<'a, V> {
814    type Item = Result<AccessGuard<'a, V>>;
815
816    fn next(&mut self) -> Option<Self::Item> {
817        // TODO: optimize out this copy
818        let bytes = match self.inner.as_mut().unwrap() {
819            ValueIterState::Subtree(iter) => match iter.next()? {
820                Ok(e) => e.key_data(),
821                Err(err) => {
822                    return Some(Err(err));
823                }
824            },
825            ValueIterState::InlineLeaf(iter) => iter.next_key()?.to_vec(),
826        };
827        self.remaining -= 1;
828        Some(Ok(AccessGuard::with_owned_value(bytes)))
829    }
830}
831
832impl<V: Key + 'static> DoubleEndedIterator for MultimapValue<'_, V> {
833    fn next_back(&mut self) -> Option<Self::Item> {
834        // TODO: optimize out this copy
835        let bytes = match self.inner.as_mut().unwrap() {
836            ValueIterState::Subtree(iter) => match iter.next_back()? {
837                Ok(e) => e.key_data(),
838                Err(err) => {
839                    return Some(Err(err));
840                }
841            },
842            ValueIterState::InlineLeaf(iter) => iter.next_key_back()?.to_vec(),
843        };
844        Some(Ok(AccessGuard::with_owned_value(bytes)))
845    }
846}
847
848impl<V: Key + 'static> Drop for MultimapValue<'_, V> {
849    fn drop(&mut self) {
850        // Drop our references to the pages that are about to be freed
851        drop(mem::take(&mut self.inner));
852        if !self.free_on_drop.is_empty() {
853            let mut freed_pages = self.freed_pages.as_ref().unwrap().lock().unwrap();
854            for page in &self.free_on_drop {
855                if !self.mem.as_ref().unwrap().free_if_uncommitted(*page) {
856                    freed_pages.push(*page);
857                }
858            }
859        }
860    }
861}
862
/// Iterator over a range of keys of a multimap table. Each item is a key together with an
/// iterator over the values of that key.
pub struct MultimapRange<'a, K: Key + 'static, V: Key + 'static> {
    // Iterates the outer tree, which maps each key to its collection of values
    inner: BtreeRangeIter<K, &'static DynamicCollection<V>>,
    // Cloned into every value iterator that is created
    mem: Arc<TransactionalMemory>,
    // Cloned into every value iterator that is created
    transaction_guard: Arc<TransactionGuard>,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
    // Ties the iterator to the lifetime 'a, which is otherwise unused by the fields
    _lifetime: PhantomData<&'a ()>,
}
871
872impl<K: Key + 'static, V: Key + 'static> MultimapRange<'_, K, V> {
873    fn new(
874        inner: BtreeRangeIter<K, &'static DynamicCollection<V>>,
875        guard: Arc<TransactionGuard>,
876        mem: Arc<TransactionalMemory>,
877    ) -> Self {
878        Self {
879            inner,
880            mem,
881            transaction_guard: guard,
882            _key_type: Default::default(),
883            _value_type: Default::default(),
884            _lifetime: Default::default(),
885        }
886    }
887}
888
889impl<'a, K: Key + 'static, V: Key + 'static> Iterator for MultimapRange<'a, K, V> {
890    type Item = Result<(AccessGuard<'a, K>, MultimapValue<'a, V>)>;
891
892    fn next(&mut self) -> Option<Self::Item> {
893        match self.inner.next()? {
894            Ok(entry) => {
895                let key = AccessGuard::with_owned_value(entry.key_data());
896                let (page, _, value_range) = entry.into_raw();
897                let collection = AccessGuard::with_page(page, value_range);
898                Some(
899                    DynamicCollection::iter(
900                        collection,
901                        self.transaction_guard.clone(),
902                        self.mem.clone(),
903                    )
904                    .map(|iter| (key, iter)),
905                )
906            }
907            Err(err) => Some(Err(err)),
908        }
909    }
910}
911
912impl<K: Key + 'static, V: Key + 'static> DoubleEndedIterator for MultimapRange<'_, K, V> {
913    fn next_back(&mut self) -> Option<Self::Item> {
914        match self.inner.next_back()? {
915            Ok(entry) => {
916                let key = AccessGuard::with_owned_value(entry.key_data());
917                let (page, _, value_range) = entry.into_raw();
918                let collection = AccessGuard::with_page(page, value_range);
919                Some(
920                    DynamicCollection::iter(
921                        collection,
922                        self.transaction_guard.clone(),
923                        self.mem.clone(),
924                    )
925                    .map(|iter| (key, iter)),
926                )
927            }
928            Err(err) => Some(Err(err)),
929        }
930    }
931}
932
/// A multimap table
///
/// [Multimap tables](https://en.wikipedia.org/wiki/Multimap) may have multiple values associated with each key
pub struct MultimapTable<'txn, K: Key + 'static, V: Key + 'static> {
    // Table name; returned by `name()` and passed to `close_table` when the table is dropped
    name: String,
    // Running count of key-value pairs; adjusted by insert/remove and returned by `len()`
    num_values: u64,
    // Write transaction this table was opened in
    transaction: &'txn WriteTransaction,
    // Shared list of freed pages; also handed to the key tree and to per-key subtrees
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    // Maps each key to the `DynamicCollection` holding that key's values
    tree: BtreeMut<'txn, K, &'static DynamicCollection<V>>,
    // Memory handle used for page allocation, page lookup and page-size queries
    mem: Arc<TransactionalMemory>,
    _value_type: PhantomData<V>,
}
945
946impl<K: Key + 'static, V: Key + 'static> MultimapTableHandle for MultimapTable<'_, K, V> {
947    fn name(&self) -> &str {
948        &self.name
949    }
950}
951
952impl<'txn, K: Key + 'static, V: Key + 'static> MultimapTable<'txn, K, V> {
953    pub(crate) fn new(
954        name: &str,
955        table_root: Option<BtreeHeader>,
956        num_values: u64,
957        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
958        mem: Arc<TransactionalMemory>,
959        transaction: &'txn WriteTransaction,
960    ) -> MultimapTable<'txn, K, V> {
961        MultimapTable {
962            name: name.to_string(),
963            num_values,
964            transaction,
965            freed_pages: freed_pages.clone(),
966            tree: BtreeMut::new(
967                table_root,
968                transaction.transaction_guard(),
969                mem.clone(),
970                freed_pages,
971            ),
972            mem,
973            _value_type: Default::default(),
974        }
975    }
976
    /// Debugging aid: delegates to the key tree's `print_debug`, forwarding `include_values`.
    #[allow(dead_code)]
    pub(crate) fn print_debug(&self, include_values: bool) -> Result {
        self.tree.print_debug(include_values)
    }
981
    /// Add the given value to the mapping of the key
    ///
    /// Returns `true` if the key-value pair was present
    ///
    /// # Errors
    ///
    /// Returns [`StorageError::ValueTooLarge`] if the serialized value or the serialized key is
    /// longer than `MAX_VALUE_LENGTH`, or if their combined length exceeds `MAX_PAIR_LENGTH`.
    /// Errors from the underlying tree operations are propagated.
    pub fn insert<'k, 'v>(
        &mut self,
        key: impl Borrow<K::SelfType<'k>>,
        value: impl Borrow<V::SelfType<'v>>,
    ) -> Result<bool> {
        // Validate the serialized sizes before touching the tree
        let value_bytes = V::as_bytes(value.borrow());
        let value_bytes_ref = value_bytes.as_ref();
        if value_bytes_ref.len() > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(value_bytes_ref.len()));
        }
        let key_len = K::as_bytes(key.borrow()).as_ref().len();
        if key_len > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(key_len));
        }
        if value_bytes_ref.len() + key_len > MAX_PAIR_LENGTH {
            return Err(StorageError::ValueTooLarge(value_bytes_ref.len() + key_len));
        }
        let get_result = self.tree.get(key.borrow())?;
        let existed = if get_result.is_some() {
            // The key already has a collection of values: extend it
            #[allow(clippy::unnecessary_unwrap)]
            let guard = get_result.unwrap();
            let collection_type = guard.value().collection_type();
            match collection_type {
                Inline => {
                    // The values are stored as leaf data embedded in the collection itself
                    let leaf_data = guard.value().as_inline();
                    let accessor = LeafAccessor::new(
                        leaf_data,
                        V::fixed_width(),
                        <() as Value>::fixed_width(),
                    );
                    let (position, found) = accessor.position::<V>(value_bytes_ref);
                    if found {
                        // Already present: nothing to change
                        return Ok(true);
                    }

                    // Size of the inline leaf once the new value has been added
                    let num_pairs = accessor.num_pairs();
                    let new_pairs = num_pairs + 1;
                    let new_pair_bytes =
                        accessor.length_of_pairs(0, accessor.num_pairs()) + value_bytes_ref.len();
                    let new_key_bytes =
                        accessor.length_of_keys(0, accessor.num_pairs()) + value_bytes_ref.len();
                    let required_inline_bytes = RawLeafBuilder::required_bytes(
                        new_pairs,
                        new_pair_bytes,
                        V::fixed_width(),
                        <() as Value>::fixed_width(),
                    );

                    // Stay inline only while the rebuilt leaf is smaller than half a page
                    if required_inline_bytes < self.mem.get_page_size() / 2 {
                        let mut data = vec![0; required_inline_bytes];
                        let mut builder = RawLeafBuilder::new(
                            &mut data,
                            new_pairs,
                            V::fixed_width(),
                            <() as Value>::fixed_width(),
                            new_key_bytes,
                        );
                        // Copy the existing entries, placing the new value at the index
                        // reported by `position`
                        for i in 0..accessor.num_pairs() {
                            if i == position {
                                builder
                                    .append(value_bytes_ref, <() as Value>::as_bytes(&()).as_ref());
                            }
                            let entry = accessor.entry(i).unwrap();
                            builder.append(entry.key(), entry.value());
                        }
                        // The new value goes after every existing entry
                        if position == accessor.num_pairs() {
                            builder.append(value_bytes_ref, <() as Value>::as_bytes(&()).as_ref());
                        }
                        drop(builder);
                        // Release the guard obtained from the tree before writing to the tree
                        drop(guard);
                        let inline_data = DynamicCollection::<V>::make_inline_data(&data);
                        self.tree
                            .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
                    } else {
                        // convert into a subtree
                        // The current inline leaf is copied into a freshly allocated page,
                        // which becomes the root of the new subtree
                        let mut page = self.mem.allocate(leaf_data.len())?;
                        page.memory_mut()[..leaf_data.len()].copy_from_slice(leaf_data);
                        let page_number = page.get_page_number();
                        drop(page);
                        drop(guard);

                        // Don't bother computing the checksum, since we're about to modify the tree
                        let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
                            Some(BtreeHeader::new(page_number, 0, num_pairs as u64)),
                            self.transaction.transaction_guard(),
                            self.mem.clone(),
                            self.freed_pages.clone(),
                        );
                        let existed = subtree.insert(value.borrow(), &())?.is_some();
                        // Sanity check: the subtree must agree with the inline lookup above
                        assert_eq!(existed, found);
                        let subtree_data =
                            DynamicCollection::<V>::make_subtree_data(subtree.get_root().unwrap());
                        self.tree
                            .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
                    }

                    // Always false at this point: the `found` case returned early above
                    found
                }
                SubtreeV2 => {
                    // The values live in their own btree: insert there, then store the
                    // subtree's updated root back under the key
                    let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
                        Some(guard.value().as_subtree()),
                        self.transaction.transaction_guard(),
                        self.mem.clone(),
                        self.freed_pages.clone(),
                    );
                    drop(guard);
                    let existed = subtree.insert(value.borrow(), &())?.is_some();
                    let subtree_data =
                        DynamicCollection::<V>::make_subtree_data(subtree.get_root().unwrap());
                    self.tree
                        .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;

                    existed
                }
            }
        } else {
            // First value for this key: create a new single-entry collection
            drop(get_result);
            let required_inline_bytes = RawLeafBuilder::required_bytes(
                1,
                value_bytes_ref.len(),
                V::fixed_width(),
                <() as Value>::fixed_width(),
            );
            // Same threshold as above: inline if the leaf is smaller than half a page
            if required_inline_bytes < self.mem.get_page_size() / 2 {
                let mut data = vec![0; required_inline_bytes];
                let mut builder = RawLeafBuilder::new(
                    &mut data,
                    1,
                    V::fixed_width(),
                    <() as Value>::fixed_width(),
                    value_bytes_ref.len(),
                );
                builder.append(value_bytes_ref, <() as Value>::as_bytes(&()).as_ref());
                drop(builder);
                let inline_data = DynamicCollection::<V>::make_inline_data(&data);
                self.tree
                    .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
            } else {
                // Too large to inline: start a brand new subtree holding just this value
                let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
                    None,
                    self.transaction.transaction_guard(),
                    self.mem.clone(),
                    self.freed_pages.clone(),
                );
                subtree.insert(value.borrow(), &())?;
                let subtree_data =
                    DynamicCollection::<V>::make_subtree_data(subtree.get_root().unwrap());
                self.tree
                    .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
            }
            false
        };

        // Only a genuinely new pair changes the table's value count
        if !existed {
            self.num_values += 1;
        }

        Ok(existed)
    }
1144
    /// Removes the given key-value pair
    ///
    /// Returns `true` if the key-value pair was present
    ///
    /// # Errors
    ///
    /// Errors from the underlying tree and page operations are propagated.
    pub fn remove<'k, 'v>(
        &mut self,
        key: impl Borrow<K::SelfType<'k>>,
        value: impl Borrow<V::SelfType<'v>>,
    ) -> Result<bool> {
        let get_result = self.tree.get(key.borrow())?;
        if get_result.is_none() {
            // The key has no values at all
            return Ok(false);
        }
        let guard = get_result.unwrap();
        let v = guard.value();
        let existed = match v.collection_type() {
            Inline => {
                // The values are stored as leaf data embedded in the collection itself
                let leaf_data = v.as_inline();
                let accessor =
                    LeafAccessor::new(leaf_data, V::fixed_width(), <() as Value>::fixed_width());
                if let Some(position) = accessor.find_key::<V>(V::as_bytes(value.borrow()).as_ref())
                {
                    let old_num_pairs = accessor.num_pairs();
                    if old_num_pairs == 1 {
                        // This was the last value for the key: remove the key itself
                        drop(guard);
                        self.tree.remove(key.borrow())?;
                    } else {
                        // Rebuild the inline leaf without the removed value
                        let old_pairs_len = accessor.length_of_pairs(0, old_num_pairs);
                        let removed_value_len = accessor.entry(position).unwrap().key().len();
                        let required = RawLeafBuilder::required_bytes(
                            old_num_pairs - 1,
                            old_pairs_len - removed_value_len,
                            V::fixed_width(),
                            <() as Value>::fixed_width(),
                        );
                        let mut new_data = vec![0; required];
                        let new_key_len =
                            accessor.length_of_keys(0, old_num_pairs) - removed_value_len;
                        let mut builder = RawLeafBuilder::new(
                            &mut new_data,
                            old_num_pairs - 1,
                            V::fixed_width(),
                            <() as Value>::fixed_width(),
                            new_key_len,
                        );
                        for i in 0..old_num_pairs {
                            if i != position {
                                let entry = accessor.entry(i).unwrap();
                                builder.append(entry.key(), entry.value());
                            }
                        }
                        drop(builder);
                        // Release the guard obtained from the tree before writing to the tree
                        drop(guard);

                        let inline_data = DynamicCollection::<V>::make_inline_data(&new_data);
                        self.tree
                            .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
                    }
                    true
                } else {
                    // The value is not part of this key's collection
                    drop(guard);
                    false
                }
            }
            SubtreeV2 => {
                // The values live in their own btree: remove there first
                let mut subtree: BtreeMut<V, ()> = BtreeMut::new(
                    Some(v.as_subtree()),
                    self.transaction.transaction_guard(),
                    self.mem.clone(),
                    self.freed_pages.clone(),
                );
                drop(guard);
                let existed = subtree.remove(value.borrow())?.is_some();

                // Then decide how whatever is left should be stored under the key
                if let Some(BtreeHeader {
                    root: new_root,
                    checksum: new_checksum,
                    length: new_length,
                }) = subtree.get_root()
                {
                    let page = self.mem.get_page(new_root)?;
                    match page.memory()[0] {
                        LEAF => {
                            let accessor = LeafAccessor::new(
                                page.memory(),
                                V::fixed_width(),
                                <() as Value>::fixed_width(),
                            );
                            let len = accessor.total_length();
                            if len < self.mem.get_page_size() / 2 {
                                // The subtree is a single leaf smaller than half a page:
                                // store its contents inline again
                                let inline_data =
                                    DynamicCollection::<V>::make_inline_data(&page.memory()[..len]);
                                self.tree
                                    .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
                                drop(page);
                                // The leaf's contents were copied inline, so release its page;
                                // if it cannot be freed right away, queue it in `freed_pages`
                                if !self.mem.free_if_uncommitted(new_root) {
                                    (*self.freed_pages).lock().unwrap().push(new_root);
                                }
                            } else {
                                // Single leaf root that stays a subtree: its pair count is
                                // used as the collection length
                                let subtree_data =
                                    DynamicCollection::<V>::make_subtree_data(BtreeHeader::new(
                                        new_root,
                                        new_checksum,
                                        accessor.num_pairs() as u64,
                                    ));
                                self.tree
                                    .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
                            }
                        }
                        BRANCH => {
                            // Multi-level subtree: store the header reported by the subtree
                            let subtree_data = DynamicCollection::<V>::make_subtree_data(
                                BtreeHeader::new(new_root, new_checksum, new_length),
                            );
                            self.tree
                                .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
                        }
                        _ => unreachable!(),
                    }
                } else {
                    // The subtree has no root left: remove the key itself
                    self.tree.remove(key.borrow())?;
                }

                existed
            }
        };

        // Only an actual removal changes the table's value count
        if existed {
            self.num_values -= 1;
        }

        Ok(existed)
    }
1276
1277    /// Removes all values for the given key
1278    ///
1279    /// Returns an iterator over the removed values. Values are in ascending order.
1280    pub fn remove_all<'a>(
1281        &mut self,
1282        key: impl Borrow<K::SelfType<'a>>,
1283    ) -> Result<MultimapValue<V>> {
1284        let iter = if let Some(collection) = self.tree.remove(key.borrow())? {
1285            let mut pages = vec![];
1286            if matches!(
1287                collection.value().collection_type(),
1288                DynamicCollectionType::SubtreeV2
1289            ) {
1290                let root = collection.value().as_subtree().root;
1291                let all_pages = AllPageNumbersBtreeIter::new(
1292                    root,
1293                    V::fixed_width(),
1294                    <() as Value>::fixed_width(),
1295                    self.mem.clone(),
1296                )?;
1297                for page in all_pages {
1298                    pages.push(page?);
1299                }
1300            }
1301
1302            self.num_values -= collection.value().get_num_values();
1303
1304            DynamicCollection::iter_free_on_drop(
1305                collection,
1306                pages,
1307                self.freed_pages.clone(),
1308                self.transaction.transaction_guard(),
1309                self.mem.clone(),
1310            )?
1311        } else {
1312            MultimapValue::new_subtree(
1313                BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), None, self.mem.clone())?,
1314                0,
1315                self.transaction.transaction_guard(),
1316            )
1317        };
1318
1319        Ok(iter)
1320    }
1321}
1322
1323impl<K: Key + 'static, V: Key + 'static> ReadableTableMetadata for MultimapTable<'_, K, V> {
1324    fn stats(&self) -> Result<TableStats> {
1325        let tree_stats = multimap_btree_stats(
1326            self.tree.get_root().map(|x| x.root),
1327            &self.mem,
1328            K::fixed_width(),
1329            V::fixed_width(),
1330        )?;
1331
1332        Ok(TableStats {
1333            tree_height: tree_stats.tree_height,
1334            leaf_pages: tree_stats.leaf_pages,
1335            branch_pages: tree_stats.branch_pages,
1336            stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1337            metadata_bytes: tree_stats.metadata_bytes,
1338            fragmented_bytes: tree_stats.fragmented_bytes,
1339        })
1340    }
1341
1342    /// Returns the number of key-value pairs in the table
1343    fn len(&self) -> Result<u64> {
1344        Ok(self.num_values)
1345    }
1346}
1347
1348impl<K: Key + 'static, V: Key + 'static> ReadableMultimapTable<K, V> for MultimapTable<'_, K, V> {
1349    /// Returns an iterator over all values for the given key. Values are in ascending order.
1350    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>> {
1351        let guard = self.transaction.transaction_guard();
1352        let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1353            DynamicCollection::iter(collection, guard, self.mem.clone())?
1354        } else {
1355            MultimapValue::new_subtree(
1356                BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), None, self.mem.clone())?,
1357                0,
1358                guard,
1359            )
1360        };
1361
1362        Ok(iter)
1363    }
1364
1365    /// Returns a double-ended iterator over a range of elements in the table
1366    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<K, V>>
1367    where
1368        KR: Borrow<K::SelfType<'a>> + 'a,
1369    {
1370        let inner = self.tree.range(&range)?;
1371        Ok(MultimapRange::new(
1372            inner,
1373            self.transaction.transaction_guard(),
1374            self.mem.clone(),
1375        ))
1376    }
1377}
1378
// Marker impl of `crate::sealed::Sealed` for the writable multimap table.
// NOTE(review): presumably required by the table traits implemented above; confirm in `crate::sealed`
impl<K: Key + 'static, V: Key + 'static> Sealed for MultimapTable<'_, K, V> {}
1380
impl<K: Key + 'static, V: Key + 'static> Drop for MultimapTable<'_, K, V> {
    /// Hands the table's name, key tree and current value count back to the owning
    /// transaction via `close_table`.
    fn drop(&mut self) {
        self.transaction
            .close_table(&self.name, &self.tree, self.num_values);
    }
}
1387
/// Read operations available on multimap tables
pub trait ReadableMultimapTable<K: Key + 'static, V: Key + 'static>: ReadableTableMetadata {
    /// Returns an iterator over all values for the given key. Values are in ascending order.
    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>>;

    /// Returns a double-ended iterator over a range of elements in the table
    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<K, V>>
    where
        KR: Borrow<K::SelfType<'a>> + 'a;

    /// Returns a double-ended iterator over all elements in the table. Values are in ascending
    /// order.
    fn iter(&self) -> Result<MultimapRange<K, V>> {
        // Provided method: an unbounded `range` covers the whole table
        self.range::<K::SelfType<'_>>(..)
    }
}
1402
/// A read-only untyped multimap table
///
/// Only exposes table metadata (`stats()` and `len()`); key and value types are described
/// solely by their optional fixed sizes.
pub struct ReadOnlyUntypedMultimapTable {
    // Number of key-value pairs; returned by `len()`
    num_values: u64,
    // Untyped view of the key tree; its root is the starting point for `stats()`
    tree: RawBtree,
    // Fixed serialized key size, if any
    fixed_key_size: Option<usize>,
    // Fixed serialized value size, if any
    fixed_value_size: Option<usize>,
    mem: Arc<TransactionalMemory>,
}
1411
// Marker impl of `crate::sealed::Sealed` for the untyped read-only multimap table.
// NOTE(review): presumably required by `ReadableTableMetadata`; confirm in `crate::sealed`
impl Sealed for ReadOnlyUntypedMultimapTable {}
1413
1414impl ReadableTableMetadata for ReadOnlyUntypedMultimapTable {
1415    /// Retrieves information about storage usage for the table
1416    fn stats(&self) -> Result<TableStats> {
1417        let tree_stats = multimap_btree_stats(
1418            self.tree.get_root().map(|x| x.root),
1419            &self.mem,
1420            self.fixed_key_size,
1421            self.fixed_value_size,
1422        )?;
1423
1424        Ok(TableStats {
1425            tree_height: tree_stats.tree_height,
1426            leaf_pages: tree_stats.leaf_pages,
1427            branch_pages: tree_stats.branch_pages,
1428            stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1429            metadata_bytes: tree_stats.metadata_bytes,
1430            fragmented_bytes: tree_stats.fragmented_bytes,
1431        })
1432    }
1433
1434    fn len(&self) -> Result<u64> {
1435        Ok(self.num_values)
1436    }
1437}
1438
1439impl ReadOnlyUntypedMultimapTable {
1440    pub(crate) fn new(
1441        root: Option<BtreeHeader>,
1442        num_values: u64,
1443        fixed_key_size: Option<usize>,
1444        fixed_value_size: Option<usize>,
1445        mem: Arc<TransactionalMemory>,
1446    ) -> Self {
1447        Self {
1448            num_values,
1449            tree: RawBtree::new(
1450                root,
1451                fixed_key_size,
1452                DynamicCollection::<()>::fixed_width_with(fixed_value_size),
1453                mem.clone(),
1454            ),
1455            fixed_key_size,
1456            fixed_value_size,
1457            mem,
1458        }
1459    }
1460}
1461
/// A read-only multimap table
pub struct ReadOnlyMultimapTable<K: Key + 'static, V: Key + 'static> {
    // Maps each key to the `DynamicCollection` holding that key's values
    tree: Btree<K, &'static DynamicCollection<V>>,
    // Number of key-value pairs; returned by `len()`
    num_values: u64,
    // Cloned into every iterator handed out by `get` and `range`
    mem: Arc<TransactionalMemory>,
    // Cloned into every iterator handed out by `get` and `range`
    transaction_guard: Arc<TransactionGuard>,
    _value_type: PhantomData<V>,
}
1470
1471impl<K: Key + 'static, V: Key + 'static> ReadOnlyMultimapTable<K, V> {
1472    pub(crate) fn new(
1473        root: Option<BtreeHeader>,
1474        num_values: u64,
1475        hint: PageHint,
1476        guard: Arc<TransactionGuard>,
1477        mem: Arc<TransactionalMemory>,
1478    ) -> Result<ReadOnlyMultimapTable<K, V>> {
1479        Ok(ReadOnlyMultimapTable {
1480            tree: Btree::new(root, hint, guard.clone(), mem.clone())?,
1481            num_values,
1482            mem,
1483            transaction_guard: guard,
1484            _value_type: Default::default(),
1485        })
1486    }
1487
1488    /// This method is like [`ReadableMultimapTable::get()`], but the iterator is reference counted and keeps the transaction
1489    /// alive until it is dropped.
1490    pub fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<'static, V>> {
1491        let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1492            DynamicCollection::iter(collection, self.transaction_guard.clone(), self.mem.clone())?
1493        } else {
1494            MultimapValue::new_subtree(
1495                BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), None, self.mem.clone())?,
1496                0,
1497                self.transaction_guard.clone(),
1498            )
1499        };
1500
1501        Ok(iter)
1502    }
1503
1504    /// This method is like [`ReadableMultimapTable::range()`], but the iterator is reference counted and keeps the transaction
1505    /// alive until it is dropped.
1506    pub fn range<'a, KR>(&self, range: impl RangeBounds<KR>) -> Result<MultimapRange<'static, K, V>>
1507    where
1508        KR: Borrow<K::SelfType<'a>>,
1509    {
1510        let inner = self.tree.range(&range)?;
1511        Ok(MultimapRange::new(
1512            inner,
1513            self.transaction_guard.clone(),
1514            self.mem.clone(),
1515        ))
1516    }
1517}
1518
1519impl<K: Key + 'static, V: Key + 'static> ReadableTableMetadata for ReadOnlyMultimapTable<K, V> {
1520    fn stats(&self) -> Result<TableStats> {
1521        let tree_stats = multimap_btree_stats(
1522            self.tree.get_root().map(|x| x.root),
1523            &self.mem,
1524            K::fixed_width(),
1525            V::fixed_width(),
1526        )?;
1527
1528        Ok(TableStats {
1529            tree_height: tree_stats.tree_height,
1530            leaf_pages: tree_stats.leaf_pages,
1531            branch_pages: tree_stats.branch_pages,
1532            stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1533            metadata_bytes: tree_stats.metadata_bytes,
1534            fragmented_bytes: tree_stats.fragmented_bytes,
1535        })
1536    }
1537
1538    fn len(&self) -> Result<u64> {
1539        Ok(self.num_values)
1540    }
1541}
1542
1543impl<K: Key + 'static, V: Key + 'static> ReadableMultimapTable<K, V>
1544    for ReadOnlyMultimapTable<K, V>
1545{
1546    /// Returns an iterator over all values for the given key. Values are in ascending order.
1547    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>> {
1548        let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1549            DynamicCollection::iter(collection, self.transaction_guard.clone(), self.mem.clone())?
1550        } else {
1551            MultimapValue::new_subtree(
1552                BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), None, self.mem.clone())?,
1553                0,
1554                self.transaction_guard.clone(),
1555            )
1556        };
1557
1558        Ok(iter)
1559    }
1560
1561    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<K, V>>
1562    where
1563        KR: Borrow<K::SelfType<'a>> + 'a,
1564    {
1565        let inner = self.tree.range(&range)?;
1566        Ok(MultimapRange::new(
1567            inner,
1568            self.transaction_guard.clone(),
1569            self.mem.clone(),
1570        ))
1571    }
1572}
1573
// Marker impl of `crate::sealed::Sealed` for the typed read-only multimap table.
// NOTE(review): presumably required by the table traits implemented above; confirm in `crate::sealed`
impl<K: Key, V: Key> Sealed for ReadOnlyMultimapTable<K, V> {}