// redb/tree_store/table_tree.rs

1use crate::db::TransactionGuard;
2use crate::error::TableError;
3use crate::multimap_table::{
4    finalize_tree_and_subtree_checksums, multimap_btree_stats, verify_tree_and_subtree_checksums,
5};
6use crate::tree_store::btree::{UntypedBtreeMut, btree_stats};
7use crate::tree_store::btree_base::BtreeHeader;
8use crate::tree_store::{
9    Btree, BtreeMut, BtreeRangeIter, InternalTableDefinition, PageHint, PageNumber, PagePath,
10    PageTrackerPolicy, RawBtree, TableType, TransactionalMemory,
11};
12use crate::types::{Key, MutInPlaceValue, TypeName, Value};
13use crate::{DatabaseStats, Result};
14use std::cmp::max;
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::mem::size_of;
17use std::ops::RangeFull;
18use std::sync::{Arc, Mutex};
19use std::{mem, thread};
20
// TODO: remove this struct in 3.0 release
// Composite key for the legacy freed-pages table. Entries sort by
// `transaction_id` first, then by `pagination_id` (see the `Key` impl below).
#[derive(Debug)]
pub(crate) struct FreedTableKey {
    // Transaction this entry belongs to (primary sort key)
    pub(crate) transaction_id: u64,
    // Distinguishes multiple entries within the same transaction (secondary sort key)
    pub(crate) pagination_id: u64,
}
27
28impl Value for FreedTableKey {
29    type SelfType<'a>
30        = FreedTableKey
31    where
32        Self: 'a;
33    type AsBytes<'a>
34        = [u8; 2 * size_of::<u64>()]
35    where
36        Self: 'a;
37
38    fn fixed_width() -> Option<usize> {
39        Some(2 * size_of::<u64>())
40    }
41
42    fn from_bytes<'a>(data: &'a [u8]) -> Self
43    where
44        Self: 'a,
45    {
46        let transaction_id = u64::from_le_bytes(data[..size_of::<u64>()].try_into().unwrap());
47        let pagination_id = u64::from_le_bytes(data[size_of::<u64>()..].try_into().unwrap());
48        Self {
49            transaction_id,
50            pagination_id,
51        }
52    }
53
54    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; 2 * size_of::<u64>()]
55    where
56        Self: 'b,
57    {
58        let mut result = [0u8; 2 * size_of::<u64>()];
59        result[..size_of::<u64>()].copy_from_slice(&value.transaction_id.to_le_bytes());
60        result[size_of::<u64>()..].copy_from_slice(&value.pagination_id.to_le_bytes());
61        result
62    }
63
64    fn type_name() -> TypeName {
65        TypeName::internal("redb::FreedTableKey")
66    }
67}
68
69impl Key for FreedTableKey {
70    fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
71        let value1 = Self::from_bytes(data1);
72        let value2 = Self::from_bytes(data2);
73
74        match value1.transaction_id.cmp(&value2.transaction_id) {
75            std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
76            std::cmp::Ordering::Equal => value1.pagination_id.cmp(&value2.pagination_id),
77            std::cmp::Ordering::Less => std::cmp::Ordering::Less,
78        }
79    }
80}
81
// Format:
// 2 bytes: length
// length * size_of(PageNumber): array of page numbers
// TODO: remove this struct in 3.0 release
// Zero-copy read-only view over a serialized list of page numbers
// (wire format described above).
#[derive(Debug)]
pub(crate) struct FreedPageList<'a> {
    // Raw serialized bytes; interpreted lazily by `len()` / `get()`
    data: &'a [u8],
}
90
91impl FreedPageList<'_> {
92    pub(crate) fn required_bytes(len: usize) -> usize {
93        2 + PageNumber::serialized_size() * len
94    }
95
96    pub(crate) fn len(&self) -> usize {
97        u16::from_le_bytes(self.data[..size_of::<u16>()].try_into().unwrap()).into()
98    }
99
100    pub(crate) fn get(&self, index: usize) -> PageNumber {
101        let start = size_of::<u16>() + PageNumber::serialized_size() * index;
102        PageNumber::from_le_bytes(
103            self.data[start..(start + PageNumber::serialized_size())]
104                .try_into()
105                .unwrap(),
106        )
107    }
108}
109
// Mutable view over a serialized page list, sharing the wire format of
// `FreedPageList`. `#[repr(transparent)]` guarantees the layout matches `[u8]`,
// which `FreedPageList::from_bytes_mut` relies on for its pointer cast.
#[derive(Debug)]
#[repr(transparent)]
pub(crate) struct PageListMut {
    data: [u8],
}
115
116impl PageListMut {
117    pub(crate) fn push_back(&mut self, value: PageNumber) {
118        let len = u16::from_le_bytes(self.data[..size_of::<u16>()].try_into().unwrap());
119        self.data[..size_of::<u16>()].copy_from_slice(&(len + 1).to_le_bytes());
120        let len: usize = len.into();
121        let start = size_of::<u16>() + PageNumber::serialized_size() * len;
122        self.data[start..(start + PageNumber::serialized_size())]
123            .copy_from_slice(&value.to_le_bytes());
124    }
125
126    pub(crate) fn clear(&mut self) {
127        self.data[..size_of::<u16>()].fill(0);
128    }
129}
130
impl Value for FreedPageList<'_> {
    type SelfType<'a>
        = FreedPageList<'a>
    where
        Self: 'a;
    type AsBytes<'a>
        = &'a [u8]
    where
        Self: 'a;

    // Variable width: the 2-byte length prefix determines the payload size.
    fn fixed_width() -> Option<usize> {
        None
    }

    // Zero-copy: the list is just a view over the raw bytes.
    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        FreedPageList { data }
    }

    // Zero-copy: return the backing slice directly.
    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'b [u8]
    where
        Self: 'b,
    {
        value.data
    }

    fn type_name() -> TypeName {
        TypeName::internal("redb::FreedPageList")
    }
}
163
impl MutInPlaceValue for FreedPageList<'_> {
    type BaseRefType = PageListMut;

    // Prepares a freshly-allocated buffer to hold an empty list.
    // NOTE(review): 8 bytes are zeroed although the length prefix is only 2
    // bytes wide — presumably intentional over-clearing; confirm before changing.
    fn initialize(data: &mut [u8]) {
        assert!(data.len() >= 8);
        // Set the length to zero
        data[..8].fill(0);
    }

    fn from_bytes_mut(data: &mut [u8]) -> &mut Self::BaseRefType {
        // SAFETY: `PageListMut` is `#[repr(transparent)]` over `[u8]`, so the
        // layouts are identical and the borrow's lifetime carries over from `data`.
        unsafe { &mut *(std::ptr::from_mut::<[u8]>(data) as *mut PageListMut) }
    }
}
177
// Iterator over the names of all tables of a single `TableType`,
// filtered out of the master table tree.
pub struct TableNameIter {
    // Raw iterator over every (name, definition) entry in the master tree
    inner: BtreeRangeIter<&'static str, InternalTableDefinition>,
    // Only names of tables with this type are yielded
    table_type: TableType,
}
182
183impl Iterator for TableNameIter {
184    type Item = Result<String>;
185
186    fn next(&mut self) -> Option<Self::Item> {
187        for entry in self.inner.by_ref() {
188            match entry {
189                Ok(entry) => {
190                    if entry.value().get_type() == self.table_type {
191                        return Some(Ok(entry.key().to_string()));
192                    }
193                }
194                Err(err) => {
195                    return Some(Err(err));
196                }
197            }
198        }
199        None
200    }
201}
202
// Read-only handle to the master table tree, which maps table names to
// their `InternalTableDefinition`s.
pub(crate) struct TableTree {
    tree: Btree<&'static str, InternalTableDefinition>,
    mem: Arc<TransactionalMemory>,
}
207
208impl TableTree {
209    pub(crate) fn new(
210        master_root: Option<BtreeHeader>,
211        page_hint: PageHint,
212        guard: Arc<TransactionGuard>,
213        mem: Arc<TransactionalMemory>,
214    ) -> Result<Self> {
215        Ok(Self {
216            tree: Btree::new(master_root, page_hint, guard, mem.clone())?,
217            mem,
218        })
219    }
220
221    pub(crate) fn transaction_guard(&self) -> &Arc<TransactionGuard> {
222        self.tree.transaction_guard()
223    }
224
225    // root_page: the root of the master table
226    pub(crate) fn list_tables(&self, table_type: TableType) -> Result<Vec<String>> {
227        let iter = self.tree.range::<RangeFull, &str>(&(..))?;
228        let iter = TableNameIter {
229            inner: iter,
230            table_type,
231        };
232        let mut result = vec![];
233        for table in iter {
234            result.push(table?);
235        }
236        Ok(result)
237    }
238
239    pub(crate) fn get_table_untyped(
240        &self,
241        name: &str,
242        table_type: TableType,
243    ) -> Result<Option<InternalTableDefinition>, TableError> {
244        if let Some(guard) = self.tree.get(&name)? {
245            let definition = guard.value();
246            definition.check_match_untyped(table_type, name)?;
247            Ok(Some(definition))
248        } else {
249            Ok(None)
250        }
251    }
252
253    // root_page: the root of the master table
254    pub(crate) fn get_table<K: Key, V: Value>(
255        &self,
256        name: &str,
257        table_type: TableType,
258    ) -> Result<Option<InternalTableDefinition>, TableError> {
259        Ok(
260            if let Some(definition) = self.get_table_untyped(name, table_type)? {
261                // Do additional checks on the types to be sure they match
262                definition.check_match::<K, V>(table_type, name)?;
263                Some(definition)
264            } else {
265                None
266            },
267        )
268    }
269
270    pub(crate) fn visit_all_pages<F>(&self, mut visitor: F) -> Result
271    where
272        F: FnMut(&PagePath) -> Result,
273    {
274        // All the pages in the table tree itself
275        self.tree.visit_all_pages(&mut visitor)?;
276
277        // All the normal tables
278        for entry in self.list_tables(TableType::Normal)? {
279            let definition = self
280                .get_table_untyped(&entry, TableType::Normal)
281                .map_err(|e| e.into_storage_error_or_corrupted("Internal corruption"))?
282                .unwrap();
283            definition.visit_all_pages(self.mem.clone(), |path| visitor(path))?;
284        }
285
286        for entry in self.list_tables(TableType::Multimap)? {
287            let definition = self
288                .get_table_untyped(&entry, TableType::Multimap)
289                .map_err(|e| e.into_storage_error_or_corrupted("Internal corruption"))?
290                .unwrap();
291            definition.visit_all_pages(self.mem.clone(), |path| visitor(path))?;
292        }
293
294        Ok(())
295    }
296}
297
// Writable handle to the master table tree for a single write transaction.
pub(crate) struct TableTreeMut<'txn> {
    tree: BtreeMut<'txn, &'static str, InternalTableDefinition>,
    guard: Arc<TransactionGuard>,
    mem: Arc<TransactionalMemory>,
    // Cached updates from tables that have been closed. These must be flushed to the btree
    pending_table_updates: HashMap<String, (Option<BtreeHeader>, u64)>,
    // Pages freed during this transaction; shared with btrees opened through this struct
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    // Tracks pages allocated during this transaction; shared with btrees opened through this struct
    allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
}
307
308impl TableTreeMut<'_> {
309    pub(crate) fn new(
310        master_root: Option<BtreeHeader>,
311        guard: Arc<TransactionGuard>,
312        mem: Arc<TransactionalMemory>,
313        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
314        allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
315    ) -> Self {
316        Self {
317            tree: BtreeMut::new(
318                master_root,
319                guard.clone(),
320                mem.clone(),
321                freed_pages.clone(),
322                allocated_pages.clone(),
323            ),
324            guard,
325            mem,
326            pending_table_updates: Default::default(),
327            freed_pages,
328            allocated_pages,
329        }
330    }
331
    /// Replaces the root of the master table tree.
    pub(crate) fn set_root(&mut self, root: Option<BtreeHeader>) {
        self.tree.set_root(root);
    }
335
336    pub(crate) fn visit_all_pages<F>(&self, mut visitor: F) -> Result
337    where
338        F: FnMut(&PagePath) -> Result,
339    {
340        // All the pages in the table tree itself
341        self.tree.visit_all_pages(&mut visitor)?;
342
343        // All the normal tables
344        for entry in self.list_tables(TableType::Normal)? {
345            let definition = self
346                .get_table_untyped(&entry, TableType::Normal)
347                .map_err(|e| e.into_storage_error_or_corrupted("Internal corruption"))?
348                .unwrap();
349            definition.visit_all_pages(self.mem.clone(), |path| visitor(path))?;
350        }
351
352        for entry in self.list_tables(TableType::Multimap)? {
353            let definition = self
354                .get_table_untyped(&entry, TableType::Multimap)
355                .map_err(|e| e.into_storage_error_or_corrupted("Internal corruption"))?
356                .unwrap();
357            definition.visit_all_pages(self.mem.clone(), |path| visitor(path))?;
358        }
359
360        Ok(())
361    }
362
    // Queues an update to the table root.
    // The update is only applied to the master tree when
    // `flush_table_root_updates()` is called.
    pub(crate) fn stage_update_table_root(
        &mut self,
        name: &str,
        table_root: Option<BtreeHeader>,
        length: u64,
    ) {
        self.pending_table_updates
            .insert(name.to_string(), (table_root, length));
    }
373
    /// Verifies the checksums of the master tree and of every table it
    /// references, returning `Ok(false)` on the first mismatch.
    ///
    /// All pending table updates must have been flushed before calling this.
    pub(crate) fn verify_checksums(&self) -> Result<bool> {
        assert!(self.pending_table_updates.is_empty());
        if !self.tree.verify_checksum()? {
            return Ok(false);
        }

        for entry in self.tree.range::<RangeFull, &str>(&(..))? {
            let entry = entry?;
            let definition = entry.value();
            match definition {
                InternalTableDefinition::Normal {
                    table_root,
                    fixed_key_size,
                    fixed_value_size,
                    ..
                } => {
                    // An empty table (no root) trivially passes
                    if let Some(header) = table_root {
                        if !RawBtree::new(
                            Some(header),
                            fixed_key_size,
                            fixed_value_size,
                            self.mem.clone(),
                        )
                        .verify_checksum()?
                        {
                            return Ok(false);
                        }
                    }
                }
                InternalTableDefinition::Multimap {
                    table_root,
                    fixed_key_size,
                    fixed_value_size,
                    ..
                } => {
                    // Multimap tables also need every value subtree checked
                    if !verify_tree_and_subtree_checksums(
                        table_root,
                        fixed_key_size,
                        fixed_value_size,
                        self.mem.clone(),
                    )? {
                        return Ok(false);
                    }
                }
            }
        }

        Ok(true)
    }
423
    /// Discards all staged table root updates and closes the allocated-pages
    /// tracker without flushing anything to the master tree.
    pub(crate) fn clear_root_updates_and_close(&mut self) {
        self.pending_table_updates.clear();
        self.allocated_pages.lock().unwrap().close();
    }
428
    /// Flushes staged root updates, finalizes checksums, and closes the page
    /// trackers.
    ///
    /// Returns the new master tree root, the pages allocated during the
    /// transaction, and the pages freed during it.
    pub(crate) fn flush_and_close(
        &mut self,
    ) -> Result<(Option<BtreeHeader>, HashSet<PageNumber>, Vec<PageNumber>)> {
        match self.flush_inner() {
            Ok(header) => {
                let allocated = self.allocated_pages.lock()?.close();
                let mut old = vec![];
                let mut freed_pages = self.freed_pages.lock()?;
                // Take ownership of the freed-pages list, leaving it empty
                mem::swap(freed_pages.as_mut(), &mut old);
                Ok((header, allocated, old))
            }
            Err(err) => {
                // Ensure that the allocated pages get cleared. Otherwise it will cause a panic
                // when they are dropped
                self.allocated_pages.lock()?.close();
                Err(err)
            }
        }
    }
448
    // Flushes the staged root updates, then finalizes the master tree's dirty
    // checksums and returns the new master root.
    fn flush_inner(&mut self) -> Result<Option<BtreeHeader>> {
        self.flush_table_root_updates()?.finalize_dirty_checksums()
    }
452
    /// Applies every staged (root, length) update to the corresponding table
    /// definition in the master tree, finalizing each table's dirty checksums
    /// along the way. Unchanged roots are skipped.
    pub(crate) fn flush_table_root_updates(&mut self) -> Result<&mut Self> {
        for (name, (new_root, new_length)) in self.pending_table_updates.drain() {
            // Bypass .get_table() since the table types are dynamic
            let mut definition = self.tree.get(&name.as_str())?.unwrap().value();
            // No-op if the root has not changed
            match definition {
                InternalTableDefinition::Normal { table_root, .. }
                | InternalTableDefinition::Multimap { table_root, .. } => {
                    if table_root == new_root {
                        continue;
                    }
                }
            }
            // Finalize any dirty checksums
            match definition {
                InternalTableDefinition::Normal {
                    ref mut table_root,
                    ref mut table_length,
                    fixed_key_size,
                    fixed_value_size,
                    ..
                } => {
                    let mut tree = UntypedBtreeMut::new(
                        new_root,
                        self.mem.clone(),
                        self.freed_pages.clone(),
                        fixed_key_size,
                        fixed_value_size,
                    );
                    // The finalized root (with fresh checksums) replaces the staged one
                    *table_root = tree.finalize_dirty_checksums()?;
                    *table_length = new_length;
                }
                InternalTableDefinition::Multimap {
                    ref mut table_root,
                    ref mut table_length,
                    fixed_key_size,
                    fixed_value_size,
                    ..
                } => {
                    // Multimap tables must also finalize their value subtrees
                    *table_root = finalize_tree_and_subtree_checksums(
                        new_root,
                        fixed_key_size,
                        fixed_value_size,
                        self.mem.clone(),
                    )?;
                    *table_length = new_length;
                }
            }
            // Persist the updated definition back into the master tree
            self.tree.insert(&name.as_str(), &definition)?;
        }
        Ok(self)
    }
505
    // Opens a table, calls the provided closure to insert entries into it, and then
    // flushes the table root. The flush is done using insert_inplace(), so it's guaranteed
    // that no pages will be allocated or freed after the closure returns
    pub(crate) fn open_table_and_flush_table_root<K: Key + 'static, V: Value + 'static>(
        &mut self,
        name: &str,
        f: impl FnOnce(&mut BtreeMut<K, V>) -> Result,
    ) -> Result {
        assert!(self.pending_table_updates.is_empty());

        // Reserve space in the table tree
        // TODO: maybe we should have a more explicit method, like "force_uncommitted()"
        let existing = self
            .tree
            .insert(
                &name,
                &InternalTableDefinition::new::<K, V>(TableType::Normal, None, 0),
            )?
            .map(|x| x.value());
        // Re-insert the previous definition so its contents are not lost;
        // the insert above only served to make the tree entry uncommitted
        if let Some(existing) = existing {
            self.tree.insert(&name, &existing)?;
        }

        let table_root = match self.tree.get(&name)?.unwrap().value() {
            InternalTableDefinition::Normal { table_root, .. } => table_root,
            InternalTableDefinition::Multimap { .. } => {
                // This method only operates on normal tables
                unreachable!()
            }
        };

        // Open the table and call the provided closure on it
        let mut tree: BtreeMut<K, V> = BtreeMut::new(
            table_root,
            self.guard.clone(),
            self.mem.clone(),
            self.freed_pages.clone(),
            self.allocated_pages.clone(),
        );
        f(&mut tree)?;

        // Finalize the table's checksums
        let table_root = tree.finalize_dirty_checksums()?;
        let table_length = tree.get_root().map(|x| x.length).unwrap_or_default();

        // Flush the root to the table tree, without allocating
        self.tree.insert_inplace(
            &name,
            &InternalTableDefinition::new::<K, V>(TableType::Normal, table_root, table_length),
        )?;

        Ok(())
    }
558
    // Creates a new table, calls the provided closure to insert entries into it, and then
    // flushes the table root. The flush is done using insert_inplace(), so it's guaranteed
    // that no pages will be allocated or freed after the closure returns
    pub(crate) fn create_table_and_flush_table_root<K: Key + 'static, V: Value + 'static>(
        &mut self,
        name: &str,
        f: impl FnOnce(&mut Self, &mut BtreeMut<K, V>) -> Result,
    ) -> Result {
        assert!(self.pending_table_updates.is_empty());
        // The table must not already exist
        assert!(self.tree.get(&name)?.is_none());

        // Reserve space in the table tree
        self.tree.insert(
            &name,
            &InternalTableDefinition::new::<K, V>(TableType::Normal, None, 0),
        )?;

        // Create an empty table and call the provided closure on it
        let mut tree: BtreeMut<K, V> = BtreeMut::new(
            None,
            self.guard.clone(),
            self.mem.clone(),
            self.freed_pages.clone(),
            self.allocated_pages.clone(),
        );
        f(self, &mut tree)?;

        // Finalize the table's checksums
        let table_root = tree.finalize_dirty_checksums()?;
        let table_length = tree.get_root().map(|x| x.length).unwrap_or_default();

        // Flush the root to the table tree, without allocating
        self.tree.insert_inplace(
            &name,
            &InternalTableDefinition::new::<K, V>(TableType::Normal, table_root, table_length),
        )?;

        Ok(())
    }
598
    /// Finalizes any dirty checksums in the master tree and returns its root.
    pub(crate) fn finalize_dirty_checksums(&mut self) -> Result<Option<BtreeHeader>> {
        self.tree.finalize_dirty_checksums()
    }
602
603    // root_page: the root of the master table
604    pub(crate) fn list_tables(&self, table_type: TableType) -> Result<Vec<String>> {
605        let tree = TableTree::new(
606            self.tree.get_root(),
607            PageHint::None,
608            self.guard.clone(),
609            self.mem.clone(),
610        )?;
611        tree.list_tables(table_type)
612    }
613
    /// Looks up a table by name, validating only its table type.
    /// Any staged-but-unflushed root update for the table is overlaid onto the
    /// returned definition, so callers see the in-transaction state.
    pub(crate) fn get_table_untyped(
        &self,
        name: &str,
        table_type: TableType,
    ) -> Result<Option<InternalTableDefinition>, TableError> {
        let tree = TableTree::new(
            self.tree.get_root(),
            PageHint::None,
            self.guard.clone(),
            self.mem.clone(),
        )?;
        let mut result = tree.get_table_untyped(name, table_type);

        // Overlay any pending (unflushed) root update for this table
        if let Ok(Some(definition)) = result.as_mut() {
            if let Some((updated_root, updated_length)) = self.pending_table_updates.get(name) {
                definition.set_header(*updated_root, *updated_length);
            }
        }

        result
    }
635
    // root_page: the root of the master table
    /// Looks up a table by name, validating its type and its K/V types.
    /// Any staged-but-unflushed root update for the table is overlaid onto the
    /// returned definition, so callers see the in-transaction state.
    pub(crate) fn get_table<K: Key, V: Value>(
        &self,
        name: &str,
        table_type: TableType,
    ) -> Result<Option<InternalTableDefinition>, TableError> {
        let tree = TableTree::new(
            self.tree.get_root(),
            PageHint::None,
            self.guard.clone(),
            self.mem.clone(),
        )?;
        let mut result = tree.get_table::<K, V>(name, table_type);

        // Overlay any pending (unflushed) root update for this table
        if let Ok(Some(definition)) = result.as_mut() {
            if let Some((updated_root, updated_length)) = self.pending_table_updates.get(name) {
                definition.set_header(*updated_root, *updated_length);
            }
        }

        result
    }
658
659    pub(crate) fn rename_table(
660        &mut self,
661        name: &str,
662        new_name: &str,
663        table_type: TableType,
664    ) -> Result<(), TableError> {
665        if let Some(definition) = self.get_table_untyped(name, table_type)? {
666            if self.get_table_untyped(new_name, table_type)?.is_some() {
667                return Err(TableError::TableExists(new_name.to_string()));
668            }
669            if let Some(update) = self.pending_table_updates.remove(name) {
670                self.pending_table_updates
671                    .insert(new_name.to_string(), update);
672            }
673            assert!(self.tree.remove(&name)?.is_some());
674            assert!(self.tree.insert(&new_name, &definition)?.is_none());
675        } else {
676            return Err(TableError::TableDoesNotExist(name.to_string()));
677        }
678
679        Ok(())
680    }
681
    /// Deletes the named table, queuing all of its pages onto the freed list.
    ///
    /// Returns `Ok(true)` if the table existed; a type mismatch is an error.
    pub(crate) fn delete_table(
        &mut self,
        name: &str,
        table_type: TableType,
    ) -> Result<bool, TableError> {
        if let Some(definition) = self.get_table_untyped(name, table_type)? {
            // Queue every page of the table for freeing
            let mut freed_pages = self.freed_pages.lock().unwrap();
            definition.visit_all_pages(self.mem.clone(), |path| {
                freed_pages.push(path.page_number());
                Ok(())
            })?;
            drop(freed_pages);

            // A staged update for a deleted table must never be flushed
            self.pending_table_updates.remove(name);

            let found = self.tree.remove(&name)?.is_some();
            return Ok(found);
        }

        Ok(false)
    }
703
704    pub(crate) fn get_or_create_table<K: Key, V: Value>(
705        &mut self,
706        name: &str,
707        table_type: TableType,
708    ) -> Result<(Option<BtreeHeader>, u64), TableError> {
709        let table = if let Some(found) = self.get_table::<K, V>(name, table_type)? {
710            found
711        } else {
712            let table = InternalTableDefinition::new::<K, V>(table_type, None, 0);
713            self.tree.insert(&name, &table)?;
714            table
715        };
716
717        match table {
718            InternalTableDefinition::Normal {
719                table_root,
720                table_length,
721                ..
722            }
723            | InternalTableDefinition::Multimap {
724                table_root,
725                table_length,
726                ..
727            } => Ok((table_root, table_length)),
728        }
729    }
730
    // Returns the paths to the n pages that are closest to the end of the database
    // The return value is sorted, according to path.page_number()'s Ord
    pub(crate) fn highest_index_pages(
        &self,
        n: usize,
        output: &mut BTreeMap<PageNumber, PagePath>,
    ) -> Result {
        for entry in self.tree.range::<RangeFull, &str>(&(..))? {
            let entry = entry?;
            let mut definition = entry.value();
            // Overlay any staged (unflushed) root update so the pages visited
            // reflect the in-transaction state
            if let Some((updated_root, updated_length)) =
                self.pending_table_updates.get(entry.key())
            {
                definition.set_header(*updated_root, *updated_length);
            }

            definition.visit_all_pages(self.mem.clone(), |path| {
                output.insert(path.page_number(), path.clone());
                // Keep only the n highest page numbers: evict from the low end
                while output.len() > n {
                    output.pop_first();
                }
                Ok(())
            })?;
        }

        // Also consider the pages of the table tree itself
        self.tree.visit_all_pages(|path| {
            output.insert(path.page_number(), path.clone());
            while output.len() > n {
                output.pop_first();
            }
            Ok(())
        })?;

        Ok(())
    }
766
    /// Moves table pages according to `relocation_map` (old page -> new page),
    /// staging a root update for every table whose root moved, then relocates
    /// the master tree's own pages.
    pub(crate) fn relocate_tables(
        &mut self,
        relocation_map: &HashMap<PageNumber, PageNumber>,
    ) -> Result {
        for entry in self.tree.range::<RangeFull, &str>(&(..))? {
            let entry = entry?;
            let mut definition = entry.value();
            // Overlay any staged (unflushed) root update before relocating
            if let Some((updated_root, updated_length)) =
                self.pending_table_updates.get(entry.key())
            {
                definition.set_header(*updated_root, *updated_length);
            }

            if let Some(new_root) = definition.relocate_tree(
                self.mem.clone(),
                self.freed_pages.clone(),
                relocation_map,
            )? {
                // The root moved: stage the new root so it gets flushed later
                self.pending_table_updates
                    .insert(entry.key().to_string(), (new_root, definition.get_length()));
            }
        }

        self.tree.relocate(relocation_map)?;

        Ok(())
    }
794
    /// Aggregates storage statistics across the master tree and every table.
    ///
    /// Note: staged `pending_table_updates` are NOT overlaid here; the stats
    /// reflect the definitions currently stored in the master tree.
    pub fn stats(&self) -> Result<DatabaseStats> {
        let master_tree_stats = self.tree.stats()?;
        let mut max_subtree_height = 0;
        let mut total_stored_bytes = 0;
        // Count the master tree leaf pages as branches, since they point to the data trees
        let mut branch_pages = master_tree_stats.branch_pages + master_tree_stats.leaf_pages;
        let mut leaf_pages = 0;
        // Include the master table in the overhead
        let mut total_metadata_bytes =
            master_tree_stats.metadata_bytes + master_tree_stats.stored_leaf_bytes;
        let mut total_fragmented = master_tree_stats.fragmented_bytes;

        for entry in self.tree.range::<RangeFull, &str>(&(..))? {
            let entry = entry?;
            let mut definition = entry.value();
            // Overlay any staged (unflushed) root update for this table
            if let Some((updated_root, length)) = self.pending_table_updates.get(entry.key()) {
                definition.set_header(*updated_root, *length);
            }
            match definition {
                InternalTableDefinition::Normal {
                    table_root,
                    fixed_key_size,
                    fixed_value_size,
                    ..
                } => {
                    let subtree_stats = btree_stats(
                        table_root.map(|x| x.root),
                        &self.mem,
                        fixed_key_size,
                        fixed_value_size,
                    )?;
                    // Fold this table's stats into the totals
                    max_subtree_height = max(max_subtree_height, subtree_stats.tree_height);
                    total_stored_bytes += subtree_stats.stored_leaf_bytes;
                    total_metadata_bytes += subtree_stats.metadata_bytes;
                    total_fragmented += subtree_stats.fragmented_bytes;
                    branch_pages += subtree_stats.branch_pages;
                    leaf_pages += subtree_stats.leaf_pages;
                }
                InternalTableDefinition::Multimap {
                    table_root,
                    fixed_key_size,
                    fixed_value_size,
                    ..
                } => {
                    // Multimap stats include the value subtrees
                    let subtree_stats = multimap_btree_stats(
                        table_root.map(|x| x.root),
                        &self.mem,
                        fixed_key_size,
                        fixed_value_size,
                    )?;
                    max_subtree_height = max(max_subtree_height, subtree_stats.tree_height);
                    total_stored_bytes += subtree_stats.stored_leaf_bytes;
                    total_metadata_bytes += subtree_stats.metadata_bytes;
                    total_fragmented += subtree_stats.fragmented_bytes;
                    branch_pages += subtree_stats.branch_pages;
                    leaf_pages += subtree_stats.leaf_pages;
                }
            }
        }
        Ok(DatabaseStats {
            // Worst-case lookup path: master tree height plus the tallest subtree
            tree_height: master_tree_stats.tree_height + max_subtree_height,
            allocated_pages: self.mem.count_allocated_pages()?,
            leaf_pages,
            branch_pages,
            stored_leaf_bytes: total_stored_bytes,
            metadata_bytes: total_metadata_bytes,
            fragmented_bytes: total_fragmented,
            page_size: self.mem.get_page_size(),
        })
    }
865}
866
867impl Drop for TableTreeMut<'_> {
868    fn drop(&mut self) {
869        if thread::panicking() {
870            return;
871        }
872        assert!(self.allocated_pages.lock().unwrap().is_empty());
873    }
874}
875
#[cfg(test)]
mod test {
    use crate::Value;
    use crate::tree_store::table_tree_base::InternalTableDefinition;
    use crate::types::TypeName;

    // Serializing a definition and deserializing the bytes must reproduce the input.
    #[test]
    fn round_trip() {
        let x = InternalTableDefinition::Multimap {
            table_root: None,
            table_length: 0,
            fixed_key_size: None,
            fixed_value_size: Some(5),
            key_alignment: 6,
            value_alignment: 7,
            key_type: TypeName::new("test::Key"),
            value_type: TypeName::new("test::Value"),
        };
        let y = InternalTableDefinition::from_bytes(InternalTableDefinition::as_bytes(&x).as_ref());
        assert_eq!(x, y);
    }
}
897}