redb/tree_store/
table_tree.rs

1use crate::db::TransactionGuard;
2use crate::error::TableError;
3use crate::multimap_table::{
4    finalize_tree_and_subtree_checksums, multimap_btree_stats, verify_tree_and_subtree_checksums,
5};
6use crate::tree_store::btree::{btree_stats, UntypedBtreeMut};
7use crate::tree_store::btree_base::BtreeHeader;
8use crate::tree_store::page_store::{new_allocators, BuddyAllocator};
9use crate::tree_store::{
10    Btree, BtreeMut, BtreeRangeIter, InternalTableDefinition, PageHint, PageNumber, PagePath,
11    RawBtree, TableType, TransactionalMemory,
12};
13use crate::types::{Key, MutInPlaceValue, TypeName, Value};
14use crate::{DatabaseStats, Result};
15use std::cmp::max;
16use std::collections::{BTreeMap, HashMap};
17use std::mem::size_of;
18use std::ops::RangeFull;
19use std::sync::{Arc, Mutex};
20
21#[derive(Debug)]
22pub(crate) struct FreedTableKey {
23    pub(crate) transaction_id: u64,
24    pub(crate) pagination_id: u64,
25}
26
27impl Value for FreedTableKey {
28    type SelfType<'a> = FreedTableKey
29    where
30        Self: 'a;
31    type AsBytes<'a> = [u8; 2 * size_of::<u64>()]
32    where
33        Self: 'a;
34
35    fn fixed_width() -> Option<usize> {
36        Some(2 * size_of::<u64>())
37    }
38
39    fn from_bytes<'a>(data: &'a [u8]) -> Self
40    where
41        Self: 'a,
42    {
43        let transaction_id = u64::from_le_bytes(data[..size_of::<u64>()].try_into().unwrap());
44        let pagination_id = u64::from_le_bytes(data[size_of::<u64>()..].try_into().unwrap());
45        Self {
46            transaction_id,
47            pagination_id,
48        }
49    }
50
51    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; 2 * size_of::<u64>()]
52    where
53        Self: 'b,
54    {
55        let mut result = [0u8; 2 * size_of::<u64>()];
56        result[..size_of::<u64>()].copy_from_slice(&value.transaction_id.to_le_bytes());
57        result[size_of::<u64>()..].copy_from_slice(&value.pagination_id.to_le_bytes());
58        result
59    }
60
61    fn type_name() -> TypeName {
62        TypeName::internal("redb::FreedTableKey")
63    }
64}
65
66impl Key for FreedTableKey {
67    fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
68        let value1 = Self::from_bytes(data1);
69        let value2 = Self::from_bytes(data2);
70
71        match value1.transaction_id.cmp(&value2.transaction_id) {
72            std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
73            std::cmp::Ordering::Equal => value1.pagination_id.cmp(&value2.pagination_id),
74            std::cmp::Ordering::Less => std::cmp::Ordering::Less,
75        }
76    }
77}
78
79// Format:
80// 2 bytes: length
81// length * size_of(PageNumber): array of page numbers
82#[derive(Debug)]
83pub(crate) struct FreedPageList<'a> {
84    data: &'a [u8],
85}
86
87impl<'a> FreedPageList<'a> {
88    pub(crate) fn required_bytes(len: usize) -> usize {
89        2 + PageNumber::serialized_size() * len
90    }
91
92    pub(crate) fn len(&self) -> usize {
93        u16::from_le_bytes(self.data[..size_of::<u16>()].try_into().unwrap()).into()
94    }
95
96    pub(crate) fn get(&self, index: usize) -> PageNumber {
97        let start = size_of::<u16>() + PageNumber::serialized_size() * index;
98        PageNumber::from_le_bytes(
99            self.data[start..(start + PageNumber::serialized_size())]
100                .try_into()
101                .unwrap(),
102        )
103    }
104}
105
106#[derive(Debug)]
107#[repr(transparent)]
108pub(crate) struct FreedPageListMut {
109    data: [u8],
110}
111
112impl FreedPageListMut {
113    pub(crate) fn push_back(&mut self, value: PageNumber) {
114        let len = u16::from_le_bytes(self.data[..size_of::<u16>()].try_into().unwrap());
115        self.data[..size_of::<u16>()].copy_from_slice(&(len + 1).to_le_bytes());
116        let len: usize = len.into();
117        let start = size_of::<u16>() + PageNumber::serialized_size() * len;
118        self.data[start..(start + PageNumber::serialized_size())]
119            .copy_from_slice(&value.to_le_bytes());
120    }
121
122    pub(crate) fn clear(&mut self) {
123        self.data[..size_of::<u16>()].fill(0);
124    }
125}
126
127impl Value for FreedPageList<'_> {
128    type SelfType<'a> = FreedPageList<'a>
129        where
130            Self: 'a;
131    type AsBytes<'a> = &'a [u8]
132        where
133            Self: 'a;
134
135    fn fixed_width() -> Option<usize> {
136        None
137    }
138
139    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
140    where
141        Self: 'a,
142    {
143        FreedPageList { data }
144    }
145
146    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'b [u8]
147    where
148        Self: 'b,
149    {
150        value.data
151    }
152
153    fn type_name() -> TypeName {
154        TypeName::internal("redb::FreedPageList")
155    }
156}
157
158impl MutInPlaceValue for FreedPageList<'_> {
159    type BaseRefType = FreedPageListMut;
160
161    fn initialize(data: &mut [u8]) {
162        assert!(data.len() >= 8);
163        // Set the length to zero
164        data[..8].fill(0);
165    }
166
167    fn from_bytes_mut(data: &mut [u8]) -> &mut Self::BaseRefType {
168        unsafe { &mut *(std::ptr::from_mut::<[u8]>(data) as *mut FreedPageListMut) }
169    }
170}
171
172pub struct TableNameIter {
173    inner: BtreeRangeIter<&'static str, InternalTableDefinition>,
174    table_type: TableType,
175}
176
177impl Iterator for TableNameIter {
178    type Item = Result<String>;
179
180    fn next(&mut self) -> Option<Self::Item> {
181        for entry in self.inner.by_ref() {
182            match entry {
183                Ok(entry) => {
184                    if entry.value().get_type() == self.table_type {
185                        return Some(Ok(entry.key().to_string()));
186                    }
187                }
188                Err(err) => {
189                    return Some(Err(err));
190                }
191            }
192        }
193        None
194    }
195}
196
197pub(crate) struct TableTree {
198    tree: Btree<&'static str, InternalTableDefinition>,
199}
200
201impl TableTree {
202    pub(crate) fn new(
203        master_root: Option<BtreeHeader>,
204        page_hint: PageHint,
205        guard: Arc<TransactionGuard>,
206        mem: Arc<TransactionalMemory>,
207    ) -> Result<Self> {
208        Ok(Self {
209            tree: Btree::new(master_root, page_hint, guard, mem)?,
210        })
211    }
212
213    pub(crate) fn transaction_guard(&self) -> &Arc<TransactionGuard> {
214        self.tree.transaction_guard()
215    }
216
217    // root_page: the root of the master table
218    pub(crate) fn list_tables(&self, table_type: TableType) -> Result<Vec<String>> {
219        let iter = self.tree.range::<RangeFull, &str>(&(..))?;
220        let iter = TableNameIter {
221            inner: iter,
222            table_type,
223        };
224        let mut result = vec![];
225        for table in iter {
226            result.push(table?);
227        }
228        Ok(result)
229    }
230
231    pub(crate) fn get_table_untyped(
232        &self,
233        name: &str,
234        table_type: TableType,
235    ) -> Result<Option<InternalTableDefinition>, TableError> {
236        if let Some(guard) = self.tree.get(&name)? {
237            let definition = guard.value();
238            definition.check_match_untyped(table_type, name)?;
239            Ok(Some(definition))
240        } else {
241            Ok(None)
242        }
243    }
244
245    // root_page: the root of the master table
246    pub(crate) fn get_table<K: Key, V: Value>(
247        &self,
248        name: &str,
249        table_type: TableType,
250    ) -> Result<Option<InternalTableDefinition>, TableError> {
251        Ok(
252            if let Some(definition) = self.get_table_untyped(name, table_type)? {
253                // Do additional checks on the types to be sure they match
254                definition.check_match::<K, V>(table_type, name)?;
255                Some(definition)
256            } else {
257                None
258            },
259        )
260    }
261}
262
263pub(crate) struct TableTreeMut<'txn> {
264    tree: BtreeMut<'txn, &'static str, InternalTableDefinition>,
265    guard: Arc<TransactionGuard>,
266    mem: Arc<TransactionalMemory>,
267    // Cached updates from tables that have been closed. These must be flushed to the btree
268    pending_table_updates: HashMap<String, (Option<BtreeHeader>, u64)>,
269    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
270}
271
272impl<'txn> TableTreeMut<'txn> {
273    pub(crate) fn new(
274        master_root: Option<BtreeHeader>,
275        guard: Arc<TransactionGuard>,
276        mem: Arc<TransactionalMemory>,
277        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
278    ) -> Self {
279        Self {
280            tree: BtreeMut::new(master_root, guard.clone(), mem.clone(), freed_pages.clone()),
281            guard,
282            mem,
283            pending_table_updates: Default::default(),
284            freed_pages,
285        }
286    }
287
288    pub(crate) fn all_referenced_pages(&self) -> Result<Vec<BuddyAllocator>> {
289        let mut result = new_allocators(self.mem.get_layout());
290
291        self.visit_all_pages(|path| {
292            let page = path.page_number();
293            result[page.region as usize].record_alloc(page.page_index, page.page_order);
294            Ok(())
295        })?;
296
297        Ok(result)
298    }
299
300    pub(crate) fn visit_all_pages<F>(&self, mut visitor: F) -> Result
301    where
302        F: FnMut(&PagePath) -> Result,
303    {
304        // All the pages in the table tree itself
305        self.tree.visit_all_pages(&mut visitor)?;
306
307        // All the normal tables
308        for entry in self.list_tables(TableType::Normal)? {
309            let definition = self
310                .get_table_untyped(&entry, TableType::Normal)
311                .map_err(|e| e.into_storage_error_or_corrupted("Internal corruption"))?
312                .unwrap();
313            definition.visit_all_pages(self.mem.clone(), |path| visitor(path))?;
314        }
315
316        for entry in self.list_tables(TableType::Multimap)? {
317            let definition = self
318                .get_table_untyped(&entry, TableType::Multimap)
319                .map_err(|e| e.into_storage_error_or_corrupted("Internal corruption"))?
320                .unwrap();
321            definition.visit_all_pages(self.mem.clone(), |path| visitor(path))?;
322        }
323
324        Ok(())
325    }
326
327    // Queues an update to the table root
328    pub(crate) fn stage_update_table_root(
329        &mut self,
330        name: &str,
331        table_root: Option<BtreeHeader>,
332        length: u64,
333    ) {
334        self.pending_table_updates
335            .insert(name.to_string(), (table_root, length));
336    }
337
338    pub(crate) fn clear_table_root_updates(&mut self) {
339        self.pending_table_updates.clear();
340    }
341
342    pub(crate) fn verify_checksums(&self) -> Result<bool> {
343        assert!(self.pending_table_updates.is_empty());
344        if !self.tree.verify_checksum()? {
345            return Ok(false);
346        }
347
348        for entry in self.tree.range::<RangeFull, &str>(&(..))? {
349            let entry = entry?;
350            let definition = entry.value();
351            match definition {
352                InternalTableDefinition::Normal {
353                    table_root,
354                    fixed_key_size,
355                    fixed_value_size,
356                    ..
357                } => {
358                    if let Some(header) = table_root {
359                        if !RawBtree::new(
360                            Some(header),
361                            fixed_key_size,
362                            fixed_value_size,
363                            self.mem.clone(),
364                        )
365                        .verify_checksum()?
366                        {
367                            return Ok(false);
368                        }
369                    }
370                }
371                InternalTableDefinition::Multimap {
372                    table_root,
373                    fixed_key_size,
374                    fixed_value_size,
375                    ..
376                } => {
377                    if !verify_tree_and_subtree_checksums(
378                        table_root,
379                        fixed_key_size,
380                        fixed_value_size,
381                        self.mem.clone(),
382                    )? {
383                        return Ok(false);
384                    }
385                }
386            }
387        }
388
389        Ok(true)
390    }
391
392    pub(crate) fn flush_table_root_updates(&mut self) -> Result<&mut Self> {
393        for (name, (new_root, new_length)) in self.pending_table_updates.drain() {
394            // Bypass .get_table() since the table types are dynamic
395            let mut definition = self.tree.get(&name.as_str())?.unwrap().value();
396            // No-op if the root has not changed
397            match definition {
398                InternalTableDefinition::Normal { table_root, .. }
399                | InternalTableDefinition::Multimap { table_root, .. } => {
400                    if table_root == new_root {
401                        continue;
402                    }
403                }
404            }
405            // Finalize any dirty checksums
406            match definition {
407                InternalTableDefinition::Normal {
408                    ref mut table_root,
409                    ref mut table_length,
410                    fixed_key_size,
411                    fixed_value_size,
412                    ..
413                } => {
414                    let mut tree = UntypedBtreeMut::new(
415                        new_root,
416                        self.mem.clone(),
417                        self.freed_pages.clone(),
418                        fixed_key_size,
419                        fixed_value_size,
420                    );
421                    *table_root = tree.finalize_dirty_checksums()?;
422                    *table_length = new_length;
423                }
424                InternalTableDefinition::Multimap {
425                    ref mut table_root,
426                    ref mut table_length,
427                    fixed_key_size,
428                    fixed_value_size,
429                    ..
430                } => {
431                    *table_root = finalize_tree_and_subtree_checksums(
432                        new_root,
433                        fixed_key_size,
434                        fixed_value_size,
435                        self.mem.clone(),
436                    )?;
437                    *table_length = new_length;
438                }
439            }
440            self.tree.insert(&name.as_str(), &definition)?;
441        }
442        Ok(self)
443    }
444
445    // Creates a new table, calls the provided closure to insert entries into it, and then
446    // flushes the table root. The flush is done using insert_inplace(), so it's guaranteed
447    // that no pages will be allocated or freed after the closure returns
448    pub(crate) fn create_table_and_flush_table_root<K: Key + 'static, V: Value + 'static>(
449        &mut self,
450        name: &str,
451        f: impl FnOnce(&mut BtreeMut<K, V>) -> Result,
452    ) -> Result {
453        assert!(self.pending_table_updates.is_empty());
454        assert!(self.tree.get(&name)?.is_none());
455
456        // Reserve space in the table tree
457        self.tree.insert(
458            &name,
459            &InternalTableDefinition::new::<K, V>(TableType::Normal, None, 0),
460        )?;
461
462        // Create an empty table and call the provided closure on it
463        let mut tree: BtreeMut<K, V> = BtreeMut::new(
464            None,
465            self.guard.clone(),
466            self.mem.clone(),
467            self.freed_pages.clone(),
468        );
469        f(&mut tree)?;
470
471        // Finalize the table's checksums
472        let table_root = tree.finalize_dirty_checksums()?;
473        let table_length = tree.get_root().map(|x| x.length).unwrap_or_default();
474
475        // Flush the root to the table tree, without allocating
476        self.tree.insert_inplace(
477            &name,
478            &InternalTableDefinition::new::<K, V>(TableType::Normal, table_root, table_length),
479        )?;
480
481        Ok(())
482    }
483
484    pub(crate) fn finalize_dirty_checksums(&mut self) -> Result<Option<BtreeHeader>> {
485        self.tree.finalize_dirty_checksums()
486    }
487
488    // root_page: the root of the master table
489    pub(crate) fn list_tables(&self, table_type: TableType) -> Result<Vec<String>> {
490        let tree = TableTree::new(
491            self.tree.get_root(),
492            PageHint::None,
493            self.guard.clone(),
494            self.mem.clone(),
495        )?;
496        tree.list_tables(table_type)
497    }
498
499    pub(crate) fn get_table_untyped(
500        &self,
501        name: &str,
502        table_type: TableType,
503    ) -> Result<Option<InternalTableDefinition>, TableError> {
504        let tree = TableTree::new(
505            self.tree.get_root(),
506            PageHint::None,
507            self.guard.clone(),
508            self.mem.clone(),
509        )?;
510        let mut result = tree.get_table_untyped(name, table_type);
511
512        if let Ok(Some(definition)) = result.as_mut() {
513            if let Some((updated_root, updated_length)) = self.pending_table_updates.get(name) {
514                definition.set_header(*updated_root, *updated_length);
515            }
516        }
517
518        result
519    }
520
521    // root_page: the root of the master table
522    pub(crate) fn get_table<K: Key, V: Value>(
523        &self,
524        name: &str,
525        table_type: TableType,
526    ) -> Result<Option<InternalTableDefinition>, TableError> {
527        let tree = TableTree::new(
528            self.tree.get_root(),
529            PageHint::None,
530            self.guard.clone(),
531            self.mem.clone(),
532        )?;
533        let mut result = tree.get_table::<K, V>(name, table_type);
534
535        if let Ok(Some(definition)) = result.as_mut() {
536            if let Some((updated_root, updated_length)) = self.pending_table_updates.get(name) {
537                definition.set_header(*updated_root, *updated_length);
538            }
539        }
540
541        result
542    }
543
544    // root_page: the root of the master table
545    pub(crate) fn delete_table(
546        &mut self,
547        name: &str,
548        table_type: TableType,
549    ) -> Result<bool, TableError> {
550        if let Some(definition) = self.get_table_untyped(name, table_type)? {
551            let mut freed_pages = self.freed_pages.lock().unwrap();
552            definition.visit_all_pages(self.mem.clone(), |path| {
553                freed_pages.push(path.page_number());
554                Ok(())
555            })?;
556            drop(freed_pages);
557
558            self.pending_table_updates.remove(name);
559
560            let found = self.tree.remove(&name)?.is_some();
561            return Ok(found);
562        }
563
564        Ok(false)
565    }
566
567    pub(crate) fn get_or_create_table<K: Key, V: Value>(
568        &mut self,
569        name: &str,
570        table_type: TableType,
571    ) -> Result<(Option<BtreeHeader>, u64), TableError> {
572        let table = if let Some(found) = self.get_table::<K, V>(name, table_type)? {
573            found
574        } else {
575            let table = InternalTableDefinition::new::<K, V>(table_type, None, 0);
576            self.tree.insert(&name, &table)?;
577            table
578        };
579
580        match table {
581            InternalTableDefinition::Normal {
582                table_root,
583                table_length,
584                ..
585            }
586            | InternalTableDefinition::Multimap {
587                table_root,
588                table_length,
589                ..
590            } => Ok((table_root, table_length)),
591        }
592    }
593
594    // Returns the paths to the n pages that are closest to the end of the database
595    // The return value is sorted, according to path.page_number()'s Ord
596    pub(crate) fn highest_index_pages(
597        &self,
598        n: usize,
599        output: &mut BTreeMap<PageNumber, PagePath>,
600    ) -> Result {
601        for entry in self.tree.range::<RangeFull, &str>(&(..))? {
602            let entry = entry?;
603            let mut definition = entry.value();
604            if let Some((updated_root, updated_length)) =
605                self.pending_table_updates.get(entry.key())
606            {
607                definition.set_header(*updated_root, *updated_length);
608            }
609
610            definition.visit_all_pages(self.mem.clone(), |path| {
611                output.insert(path.page_number(), path.clone());
612                while output.len() > n {
613                    output.pop_first();
614                }
615                Ok(())
616            })?;
617        }
618
619        self.tree.visit_all_pages(|path| {
620            output.insert(path.page_number(), path.clone());
621            while output.len() > n {
622                output.pop_first();
623            }
624            Ok(())
625        })?;
626
627        Ok(())
628    }
629
630    pub(crate) fn relocate_tables(
631        &mut self,
632        relocation_map: &HashMap<PageNumber, PageNumber>,
633    ) -> Result {
634        for entry in self.tree.range::<RangeFull, &str>(&(..))? {
635            let entry = entry?;
636            let mut definition = entry.value();
637            if let Some((updated_root, updated_length)) =
638                self.pending_table_updates.get(entry.key())
639            {
640                definition.set_header(*updated_root, *updated_length);
641            }
642
643            if let Some(new_root) = definition.relocate_tree(
644                self.mem.clone(),
645                self.freed_pages.clone(),
646                relocation_map,
647            )? {
648                self.pending_table_updates
649                    .insert(entry.key().to_string(), (new_root, definition.get_length()));
650            }
651        }
652
653        self.tree.relocate(relocation_map)?;
654
655        Ok(())
656    }
657
658    pub fn stats(&self) -> Result<DatabaseStats> {
659        let master_tree_stats = self.tree.stats()?;
660        let mut max_subtree_height = 0;
661        let mut total_stored_bytes = 0;
662        // Count the master tree leaf pages as branches, since they point to the data trees
663        let mut branch_pages = master_tree_stats.branch_pages + master_tree_stats.leaf_pages;
664        let mut leaf_pages = 0;
665        // Include the master table in the overhead
666        let mut total_metadata_bytes =
667            master_tree_stats.metadata_bytes + master_tree_stats.stored_leaf_bytes;
668        let mut total_fragmented = master_tree_stats.fragmented_bytes;
669
670        for entry in self.tree.range::<RangeFull, &str>(&(..))? {
671            let entry = entry?;
672            let mut definition = entry.value();
673            if let Some((updated_root, length)) = self.pending_table_updates.get(entry.key()) {
674                definition.set_header(*updated_root, *length);
675            }
676            match definition {
677                InternalTableDefinition::Normal {
678                    table_root,
679                    fixed_key_size,
680                    fixed_value_size,
681                    ..
682                } => {
683                    let subtree_stats = btree_stats(
684                        table_root.map(|x| x.root),
685                        &self.mem,
686                        fixed_key_size,
687                        fixed_value_size,
688                    )?;
689                    max_subtree_height = max(max_subtree_height, subtree_stats.tree_height);
690                    total_stored_bytes += subtree_stats.stored_leaf_bytes;
691                    total_metadata_bytes += subtree_stats.metadata_bytes;
692                    total_fragmented += subtree_stats.fragmented_bytes;
693                    branch_pages += subtree_stats.branch_pages;
694                    leaf_pages += subtree_stats.leaf_pages;
695                }
696                InternalTableDefinition::Multimap {
697                    table_root,
698                    fixed_key_size,
699                    fixed_value_size,
700                    ..
701                } => {
702                    let subtree_stats = multimap_btree_stats(
703                        table_root.map(|x| x.root),
704                        &self.mem,
705                        fixed_key_size,
706                        fixed_value_size,
707                    )?;
708                    max_subtree_height = max(max_subtree_height, subtree_stats.tree_height);
709                    total_stored_bytes += subtree_stats.stored_leaf_bytes;
710                    total_metadata_bytes += subtree_stats.metadata_bytes;
711                    total_fragmented += subtree_stats.fragmented_bytes;
712                    branch_pages += subtree_stats.branch_pages;
713                    leaf_pages += subtree_stats.leaf_pages;
714                }
715            }
716        }
717        Ok(DatabaseStats {
718            tree_height: master_tree_stats.tree_height + max_subtree_height,
719            allocated_pages: self.mem.count_allocated_pages()?,
720            leaf_pages,
721            branch_pages,
722            stored_leaf_bytes: total_stored_bytes,
723            metadata_bytes: total_metadata_bytes,
724            fragmented_bytes: total_fragmented,
725            page_size: self.mem.get_page_size(),
726        })
727    }
728}
729
730#[cfg(test)]
731mod test {
732    use crate::tree_store::table_tree_base::InternalTableDefinition;
733    use crate::types::TypeName;
734    use crate::Value;
735
736    #[test]
737    fn round_trip() {
738        let x = InternalTableDefinition::Multimap {
739            table_root: None,
740            table_length: 0,
741            fixed_key_size: None,
742            fixed_value_size: Some(5),
743            key_alignment: 6,
744            value_alignment: 7,
745            key_type: TypeName::new("test::Key"),
746            value_type: TypeName::new("test::Value"),
747        };
748        let y = InternalTableDefinition::from_bytes(InternalTableDefinition::as_bytes(&x).as_ref());
749        assert_eq!(x, y);
750    }
751}