redb/
transactions.rs

1use crate::db::TransactionGuard;
2use crate::error::CommitError;
3use crate::multimap_table::ReadOnlyUntypedMultimapTable;
4use crate::sealed::Sealed;
5use crate::table::ReadOnlyUntypedTable;
6use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
7use crate::tree_store::{
8    Btree, BtreeHeader, BtreeMut, BuddyAllocator, FreedPageList, FreedTableKey,
9    InternalTableDefinition, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, Page, PageHint, PageNumber,
10    SerializedSavepoint, TableTree, TableTreeMut, TableType, TransactionalMemory,
11};
12use crate::types::{Key, Value};
13use crate::{
14    AccessGuard, MultimapTable, MultimapTableDefinition, MultimapTableHandle, Range,
15    ReadOnlyMultimapTable, ReadOnlyTable, Result, Savepoint, SavepointError, StorageError, Table,
16    TableDefinition, TableError, TableHandle, TransactionError, TypeName,
17    UntypedMultimapTableHandle, UntypedTableHandle,
18};
19#[cfg(feature = "logging")]
20use log::{debug, warn};
21use std::borrow::Borrow;
22use std::cmp::min;
23use std::collections::{BTreeMap, HashMap, HashSet};
24use std::fmt::{Debug, Display, Formatter};
25use std::marker::PhantomData;
26use std::mem::size_of;
27use std::ops::RangeBounds;
28use std::ops::RangeFull;
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::{Arc, Mutex};
31use std::{panic, thread};
32
33const MAX_PAGES_PER_COMPACTION: usize = 1_000_000;
34const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> =
35    SystemTableDefinition::new("next_savepoint_id");
36pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition<SavepointId, SerializedSavepoint> =
37    SystemTableDefinition::new("persistent_savepoints");
38// The allocator state table is stored in the system table tree, but it's accessed using
39// raw btree operations rather than open_system_table(), so there's no SystemTableDefinition
40pub(crate) const ALLOCATOR_STATE_TABLE_NAME: &str = "allocator_state";
41pub(crate) type AllocatorStateTree<'a> = BtreeMut<'a, AllocatorStateKey, &'static [u8]>;
42
43#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
44pub(crate) enum AllocatorStateKey {
45    Region(u32),
46    RegionTracker,
47    TransactionId,
48}
49
50impl Value for AllocatorStateKey {
51    type SelfType<'a> = Self;
52    type AsBytes<'a> = [u8; 1 + size_of::<u32>()];
53
54    fn fixed_width() -> Option<usize> {
55        Some(1 + size_of::<u32>())
56    }
57
58    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
59    where
60        Self: 'a,
61    {
62        match data[0] {
63            0 => Self::Region(u32::from_le_bytes(data[1..].try_into().unwrap())),
64            1 => Self::RegionTracker,
65            2 => Self::TransactionId,
66            _ => unreachable!(),
67        }
68    }
69
70    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
71    where
72        Self: 'a,
73        Self: 'b,
74    {
75        let mut result = Self::AsBytes::default();
76        match value {
77            Self::Region(region) => {
78                result[0] = 0;
79                result[1..].copy_from_slice(&u32::to_le_bytes(*region));
80            }
81            Self::RegionTracker => {
82                result[0] = 1;
83            }
84            Self::TransactionId => {
85                result[0] = 2;
86            }
87        }
88
89        result
90    }
91
92    fn type_name() -> TypeName {
93        TypeName::internal("redb::AllocatorStateKey")
94    }
95}
96
97impl Key for AllocatorStateKey {
98    fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
99        Self::from_bytes(data1).cmp(&Self::from_bytes(data2))
100    }
101}
102
103pub struct SystemTableDefinition<'a, K: Key + 'static, V: Value + 'static> {
104    name: &'a str,
105    _key_type: PhantomData<K>,
106    _value_type: PhantomData<V>,
107}
108
109impl<'a, K: Key + 'static, V: Value + 'static> SystemTableDefinition<'a, K, V> {
110    pub const fn new(name: &'a str) -> Self {
111        assert!(!name.is_empty());
112        Self {
113            name,
114            _key_type: PhantomData,
115            _value_type: PhantomData,
116        }
117    }
118}
119
120impl<K: Key + 'static, V: Value + 'static> TableHandle for SystemTableDefinition<'_, K, V> {
121    fn name(&self) -> &str {
122        self.name
123    }
124}
125
126impl<K: Key, V: Value> Sealed for SystemTableDefinition<'_, K, V> {}
127
128impl<K: Key + 'static, V: Value + 'static> Clone for SystemTableDefinition<'_, K, V> {
129    fn clone(&self) -> Self {
130        *self
131    }
132}
133
134impl<K: Key + 'static, V: Value + 'static> Copy for SystemTableDefinition<'_, K, V> {}
135
136impl<K: Key + 'static, V: Value + 'static> Display for SystemTableDefinition<'_, K, V> {
137    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
138        write!(
139            f,
140            "{}<{}, {}>",
141            self.name,
142            K::type_name().name(),
143            V::type_name().name()
144        )
145    }
146}
147
148/// Informational storage stats about the database
149#[derive(Debug)]
150pub struct DatabaseStats {
151    pub(crate) tree_height: u32,
152    pub(crate) allocated_pages: u64,
153    pub(crate) leaf_pages: u64,
154    pub(crate) branch_pages: u64,
155    pub(crate) stored_leaf_bytes: u64,
156    pub(crate) metadata_bytes: u64,
157    pub(crate) fragmented_bytes: u64,
158    pub(crate) page_size: usize,
159}
160
161impl DatabaseStats {
162    /// Maximum traversal distance to reach the deepest (key, value) pair, across all tables
163    pub fn tree_height(&self) -> u32 {
164        self.tree_height
165    }
166
167    /// Number of pages allocated
168    pub fn allocated_pages(&self) -> u64 {
169        self.allocated_pages
170    }
171
172    /// Number of leaf pages that store user data
173    pub fn leaf_pages(&self) -> u64 {
174        self.leaf_pages
175    }
176
177    /// Number of branch pages in btrees that store user data
178    pub fn branch_pages(&self) -> u64 {
179        self.branch_pages
180    }
181
182    /// Number of bytes consumed by keys and values that have been inserted.
183    /// Does not include indexing overhead
184    pub fn stored_bytes(&self) -> u64 {
185        self.stored_leaf_bytes
186    }
187
188    /// Number of bytes consumed by keys in internal branch pages, plus other metadata
189    pub fn metadata_bytes(&self) -> u64 {
190        self.metadata_bytes
191    }
192
193    /// Number of bytes consumed by fragmentation, both in data pages and internal metadata tables
194    pub fn fragmented_bytes(&self) -> u64 {
195        self.fragmented_bytes
196    }
197
198    /// Number of bytes per page
199    pub fn page_size(&self) -> usize {
200        self.page_size
201    }
202}
203
204#[derive(Copy, Clone, Debug)]
205#[non_exhaustive]
206pub enum Durability {
207    /// Commits with this durability level will not be persisted to disk unless followed by a
208    /// commit with a higher durability level.
209    ///
210    /// Note: Pages are only freed during commits with higher durability levels. Exclusively using
211    /// this durability level will result in rapid growth of the database file.
212    None,
213    /// Commits with this durability level have been queued for persistence to disk, and should be
214    /// persisted some time after [`WriteTransaction::commit`] returns.
215    Eventual,
216    /// Commits with this durability level are guaranteed to be persistent as soon as
217    /// [`WriteTransaction::commit`] returns.
218    Immediate,
219    /// This is identical to `Durability::Immediate`, but also enables 2-phase commit. New code
220    /// should call `set_two_phase_commit(true)` directly instead.
221    #[deprecated(since = "2.3.0", note = "use set_two_phase_commit(true) instead")]
222    Paranoid,
223}
224
225// These are the actual durability levels used internally. `Durability::Paranoid` is translated
226// to `InternalDurability::Immediate`, and also enables 2-phase commit
227#[derive(Copy, Clone, Debug, PartialEq, Eq)]
228enum InternalDurability {
229    None,
230    Eventual,
231    Immediate,
232}
233
234// Like a Table but only one may be open at a time to avoid possible races
235pub struct SystemTable<'db, 's, K: Key + 'static, V: Value + 'static> {
236    name: String,
237    namespace: &'s mut SystemNamespace<'db>,
238    tree: BtreeMut<'s, K, V>,
239    transaction_guard: Arc<TransactionGuard>,
240}
241
242impl<'db, 's, K: Key + 'static, V: Value + 'static> SystemTable<'db, 's, K, V> {
243    fn new(
244        name: &str,
245        table_root: Option<BtreeHeader>,
246        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
247        guard: Arc<TransactionGuard>,
248        mem: Arc<TransactionalMemory>,
249        namespace: &'s mut SystemNamespace<'db>,
250    ) -> SystemTable<'db, 's, K, V> {
251        SystemTable {
252            name: name.to_string(),
253            namespace,
254            tree: BtreeMut::new(table_root, guard.clone(), mem, freed_pages),
255            transaction_guard: guard,
256        }
257    }
258
259    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<V>>>
260    where
261        K: 'a,
262    {
263        self.tree.get(key.borrow())
264    }
265
266    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<K, V>>
267    where
268        K: 'a,
269        KR: Borrow<K::SelfType<'a>> + 'a,
270    {
271        self.tree
272            .range(&range)
273            .map(|x| Range::new(x, self.transaction_guard.clone()))
274    }
275
276    pub fn insert<'k, 'v>(
277        &mut self,
278        key: impl Borrow<K::SelfType<'k>>,
279        value: impl Borrow<V::SelfType<'v>>,
280    ) -> Result<Option<AccessGuard<V>>> {
281        let value_len = V::as_bytes(value.borrow()).as_ref().len();
282        if value_len > MAX_VALUE_LENGTH {
283            return Err(StorageError::ValueTooLarge(value_len));
284        }
285        let key_len = K::as_bytes(key.borrow()).as_ref().len();
286        if key_len > MAX_VALUE_LENGTH {
287            return Err(StorageError::ValueTooLarge(key_len));
288        }
289        if value_len + key_len > MAX_PAIR_LENGTH {
290            return Err(StorageError::ValueTooLarge(value_len + key_len));
291        }
292        self.tree.insert(key.borrow(), value.borrow())
293    }
294
295    pub fn remove<'a>(
296        &mut self,
297        key: impl Borrow<K::SelfType<'a>>,
298    ) -> Result<Option<AccessGuard<V>>>
299    where
300        K: 'a,
301    {
302        self.tree.remove(key.borrow())
303    }
304}
305
306impl<K: Key + 'static, V: Value + 'static> Drop for SystemTable<'_, '_, K, V> {
307    fn drop(&mut self) {
308        self.namespace.close_table(
309            &self.name,
310            &self.tree,
311            self.tree.get_root().map(|x| x.length).unwrap_or_default(),
312        );
313    }
314}
315
316struct SystemNamespace<'db> {
317    table_tree: TableTreeMut<'db>,
318    transaction_guard: Arc<TransactionGuard>,
319}
320
321impl<'db> SystemNamespace<'db> {
322    fn open_system_table<'txn, 's, K: Key + 'static, V: Value + 'static>(
323        &'s mut self,
324        transaction: &'txn WriteTransaction,
325        definition: SystemTableDefinition<K, V>,
326    ) -> Result<SystemTable<'db, 's, K, V>> {
327        #[cfg(feature = "logging")]
328        debug!("Opening system table: {}", definition);
329        let (root, _) = self
330            .table_tree
331            .get_or_create_table::<K, V>(definition.name(), TableType::Normal)
332            .map_err(|e| {
333                e.into_storage_error_or_corrupted("Internal error. System table is corrupted")
334            })?;
335        transaction.dirty.store(true, Ordering::Release);
336
337        Ok(SystemTable::new(
338            definition.name(),
339            root,
340            transaction.freed_pages.clone(),
341            self.transaction_guard.clone(),
342            transaction.mem.clone(),
343            self,
344        ))
345    }
346
347    fn close_table<K: Key + 'static, V: Value + 'static>(
348        &mut self,
349        name: &str,
350        table: &BtreeMut<K, V>,
351        length: u64,
352    ) {
353        self.table_tree
354            .stage_update_table_root(name, table.get_root(), length);
355    }
356}
357
358struct TableNamespace<'db> {
359    open_tables: HashMap<String, &'static panic::Location<'static>>,
360    table_tree: TableTreeMut<'db>,
361}
362
363impl TableNamespace<'_> {
364    #[track_caller]
365    fn inner_open<K: Key + 'static, V: Value + 'static>(
366        &mut self,
367        name: &str,
368        table_type: TableType,
369    ) -> Result<(Option<BtreeHeader>, u64), TableError> {
370        if let Some(location) = self.open_tables.get(name) {
371            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
372        }
373
374        let root = self
375            .table_tree
376            .get_or_create_table::<K, V>(name, table_type)?;
377        self.open_tables
378            .insert(name.to_string(), panic::Location::caller());
379
380        Ok(root)
381    }
382
383    #[track_caller]
384    pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
385        &mut self,
386        transaction: &'txn WriteTransaction,
387        definition: MultimapTableDefinition<K, V>,
388    ) -> Result<MultimapTable<'txn, K, V>, TableError> {
389        #[cfg(feature = "logging")]
390        debug!("Opening multimap table: {}", definition);
391        let (root, length) = self.inner_open::<K, V>(definition.name(), TableType::Multimap)?;
392        transaction.dirty.store(true, Ordering::Release);
393
394        Ok(MultimapTable::new(
395            definition.name(),
396            root,
397            length,
398            transaction.freed_pages.clone(),
399            transaction.mem.clone(),
400            transaction,
401        ))
402    }
403
404    #[track_caller]
405    pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
406        &mut self,
407        transaction: &'txn WriteTransaction,
408        definition: TableDefinition<K, V>,
409    ) -> Result<Table<'txn, K, V>, TableError> {
410        #[cfg(feature = "logging")]
411        debug!("Opening table: {}", definition);
412        let (root, _) = self.inner_open::<K, V>(definition.name(), TableType::Normal)?;
413        transaction.dirty.store(true, Ordering::Release);
414
415        Ok(Table::new(
416            definition.name(),
417            root,
418            transaction.freed_pages.clone(),
419            transaction.mem.clone(),
420            transaction,
421        ))
422    }
423
424    #[track_caller]
425    fn inner_rename(
426        &mut self,
427        name: &str,
428        new_name: &str,
429        table_type: TableType,
430    ) -> Result<(), TableError> {
431        if let Some(location) = self.open_tables.get(name) {
432            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
433        }
434
435        self.table_tree.rename_table(name, new_name, table_type)
436    }
437
438    #[track_caller]
439    fn rename_table(
440        &mut self,
441        transaction: &WriteTransaction,
442        name: &str,
443        new_name: &str,
444    ) -> Result<(), TableError> {
445        #[cfg(feature = "logging")]
446        debug!("Renaming table: {} to {}", name, new_name);
447        transaction.dirty.store(true, Ordering::Release);
448        self.inner_rename(name, new_name, TableType::Normal)
449    }
450
451    #[track_caller]
452    fn rename_multimap_table(
453        &mut self,
454        transaction: &WriteTransaction,
455        name: &str,
456        new_name: &str,
457    ) -> Result<(), TableError> {
458        #[cfg(feature = "logging")]
459        debug!("Renaming multimap table: {} to {}", name, new_name);
460        transaction.dirty.store(true, Ordering::Release);
461        self.inner_rename(name, new_name, TableType::Multimap)
462    }
463
464    #[track_caller]
465    fn inner_delete(&mut self, name: &str, table_type: TableType) -> Result<bool, TableError> {
466        if let Some(location) = self.open_tables.get(name) {
467            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
468        }
469
470        self.table_tree.delete_table(name, table_type)
471    }
472
473    #[track_caller]
474    fn delete_table(
475        &mut self,
476        transaction: &WriteTransaction,
477        name: &str,
478    ) -> Result<bool, TableError> {
479        #[cfg(feature = "logging")]
480        debug!("Deleting table: {}", name);
481        transaction.dirty.store(true, Ordering::Release);
482        self.inner_delete(name, TableType::Normal)
483    }
484
485    #[track_caller]
486    fn delete_multimap_table(
487        &mut self,
488        transaction: &WriteTransaction,
489        name: &str,
490    ) -> Result<bool, TableError> {
491        #[cfg(feature = "logging")]
492        debug!("Deleting multimap table: {}", name);
493        transaction.dirty.store(true, Ordering::Release);
494        self.inner_delete(name, TableType::Multimap)
495    }
496
497    pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
498        &mut self,
499        name: &str,
500        table: &BtreeMut<K, V>,
501        length: u64,
502    ) {
503        self.open_tables.remove(name).unwrap();
504        self.table_tree
505            .stage_update_table_root(name, table.get_root(), length);
506    }
507}
508
509/// A read/write transaction
510///
511/// Only a single [`WriteTransaction`] may exist at a time
512pub struct WriteTransaction {
513    transaction_tracker: Arc<TransactionTracker>,
514    mem: Arc<TransactionalMemory>,
515    transaction_guard: Arc<TransactionGuard>,
516    transaction_id: TransactionId,
517    // The table of freed pages by transaction. FreedTableKey -> binary.
518    // The binary blob is a length-prefixed array of PageNumber
519    freed_tree: Mutex<BtreeMut<'static, FreedTableKey, FreedPageList<'static>>>,
520    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
521    // Pages that were freed from the freed-tree. These can be freed immediately after commit(),
522    // since read transactions do not access the freed-tree
523    post_commit_frees: Arc<Mutex<Vec<PageNumber>>>,
524    tables: Mutex<TableNamespace<'static>>,
525    system_tables: Mutex<SystemNamespace<'static>>,
526    completed: bool,
527    dirty: AtomicBool,
528    durability: InternalDurability,
529    two_phase_commit: bool,
530    quick_repair: bool,
531    // Persistent savepoints created during this transaction
532    created_persistent_savepoints: Mutex<HashSet<SavepointId>>,
533    deleted_persistent_savepoints: Mutex<Vec<(SavepointId, TransactionId)>>,
534}
535
536impl WriteTransaction {
537    pub(crate) fn new(
538        guard: TransactionGuard,
539        transaction_tracker: Arc<TransactionTracker>,
540        mem: Arc<TransactionalMemory>,
541    ) -> Result<Self> {
542        let transaction_id = guard.id();
543        let guard = Arc::new(guard);
544
545        let root_page = mem.get_data_root();
546        let system_page = mem.get_system_root();
547        let freed_root = mem.get_freed_root();
548        let freed_pages = Arc::new(Mutex::new(vec![]));
549        let post_commit_frees = Arc::new(Mutex::new(vec![]));
550
551        let tables = TableNamespace {
552            open_tables: Default::default(),
553            table_tree: TableTreeMut::new(
554                root_page,
555                guard.clone(),
556                mem.clone(),
557                freed_pages.clone(),
558            ),
559        };
560        let system_tables = SystemNamespace {
561            table_tree: TableTreeMut::new(
562                system_page,
563                guard.clone(),
564                mem.clone(),
565                freed_pages.clone(),
566            ),
567            transaction_guard: guard.clone(),
568        };
569
570        Ok(Self {
571            transaction_tracker,
572            mem: mem.clone(),
573            transaction_guard: guard.clone(),
574            transaction_id,
575            tables: Mutex::new(tables),
576            system_tables: Mutex::new(system_tables),
577            freed_tree: Mutex::new(BtreeMut::new(
578                freed_root,
579                guard,
580                mem,
581                post_commit_frees.clone(),
582            )),
583            freed_pages,
584            post_commit_frees,
585            completed: false,
586            dirty: AtomicBool::new(false),
587            durability: InternalDurability::Immediate,
588            two_phase_commit: false,
589            quick_repair: false,
590            created_persistent_savepoints: Mutex::new(Default::default()),
591            deleted_persistent_savepoints: Mutex::new(vec![]),
592        })
593    }
594
595    #[cfg(any(test, fuzzing))]
596    pub fn print_allocated_page_debug(&self) {
597        let mut all_allocated: HashSet<PageNumber> =
598            HashSet::from_iter(self.mem.all_allocated_pages());
599
600        let tracker = self.mem.tracker_page();
601        all_allocated.remove(&tracker);
602        println!("Tracker page");
603        println!("{tracker:?}");
604
605        let mut table_pages = vec![];
606        self.tables
607            .lock()
608            .unwrap()
609            .table_tree
610            .visit_all_pages(|path| {
611                table_pages.push(path.page_number());
612                Ok(())
613            })
614            .unwrap();
615        println!("Tables");
616        for p in table_pages {
617            all_allocated.remove(&p);
618            println!("{p:?}");
619        }
620
621        let mut system_table_pages = vec![];
622        self.system_tables
623            .lock()
624            .unwrap()
625            .table_tree
626            .visit_all_pages(|path| {
627                system_table_pages.push(path.page_number());
628                Ok(())
629            })
630            .unwrap();
631        println!("System tables");
632        for p in system_table_pages {
633            all_allocated.remove(&p);
634            println!("{p:?}");
635        }
636
637        println!("Free table");
638        if let Some(freed_iter) = self.freed_tree.lock().unwrap().all_pages_iter().unwrap() {
639            for p in freed_iter {
640                let p = p.unwrap();
641                all_allocated.remove(&p);
642                println!("{p:?}");
643            }
644        }
645        println!("Pending free (i.e. in freed table)");
646        for entry in self
647            .freed_tree
648            .lock()
649            .unwrap()
650            .range::<RangeFull, FreedTableKey>(&(..))
651            .unwrap()
652        {
653            let entry = entry.unwrap();
654            let value = entry.value();
655            for i in 0..value.len() {
656                let p = value.get(i);
657                all_allocated.remove(&p);
658                println!("{p:?}");
659            }
660        }
661        {
662            let pages = self.freed_pages.lock().unwrap();
663            if !pages.is_empty() {
664                println!("Pages in in-memory freed_pages");
665                for p in pages.iter() {
666                    println!("{p:?}");
667                    all_allocated.remove(p);
668                }
669            }
670        }
671        {
672            let pages = self.post_commit_frees.lock().unwrap();
673            if !pages.is_empty() {
674                println!("Pages in in-memory post_commit_frees");
675                for p in pages.iter() {
676                    println!("{p:?}");
677                    all_allocated.remove(p);
678                }
679            }
680        }
681        if !all_allocated.is_empty() {
682            println!("Leaked pages");
683            for p in all_allocated {
684                println!("{p:?}");
685            }
686        }
687    }
688
689    /// Creates a snapshot of the current database state, which can be used to roll back the database.
690    /// This savepoint will exist until it is deleted with [`Self::delete_persistent_savepoint()`].
691    ///
692    /// Note that while a savepoint exists, pages that become unused after it was created are not freed.
693    /// Therefore, the lifetime of a savepoint should be minimized.
694    ///
695    /// Returns [`SavepointError::InvalidSavepoint`] if the transaction is "dirty" (any tables have been opened),
696    /// or if the transaction's durability is less than [`Durability::Immediate`]
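    ///
    /// # Example
    ///
    /// A minimal sketch of creating a persistent savepoint; the database path below is
    /// illustrative, and error handling is elided:
    ///
    /// ```no_run
    /// use redb::Database;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let db = Database::create("my_db.redb")?;
    /// let txn = db.begin_write()?;
    /// // The returned id can be passed to get_persistent_savepoint() in a later transaction,
    /// // even after the database has been reopened.
    /// let savepoint_id = txn.persistent_savepoint()?;
    /// txn.commit()?;
    /// # let _ = savepoint_id;
    /// # Ok(())
    /// # }
    /// ```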
697    pub fn persistent_savepoint(&self) -> Result<u64, SavepointError> {
698        if self.durability != InternalDurability::Immediate {
699            return Err(SavepointError::InvalidSavepoint);
700        }
701
702        let mut savepoint = self.ephemeral_savepoint()?;
703
704        let mut system_tables = self.system_tables.lock().unwrap();
705
706        let mut next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
707        next_table.insert((), savepoint.get_id().next())?;
708        drop(next_table);
709
710        let mut savepoint_table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
711        savepoint_table.insert(
712            savepoint.get_id(),
713            SerializedSavepoint::from_savepoint(&savepoint),
714        )?;
715
716        savepoint.set_persistent();
717
718        self.created_persistent_savepoints
719            .lock()
720            .unwrap()
721            .insert(savepoint.get_id());
722
723        Ok(savepoint.get_id().0)
724    }
725
726    pub(crate) fn transaction_guard(&self) -> Arc<TransactionGuard> {
727        self.transaction_guard.clone()
728    }
729
730    pub(crate) fn next_persistent_savepoint_id(&self) -> Result<Option<SavepointId>> {
731        let mut system_tables = self.system_tables.lock().unwrap();
732        let next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
733        let value = next_table.get(())?;
734        if let Some(next_id) = value {
735            Ok(Some(next_id.value()))
736        } else {
737            Ok(None)
738        }
739    }
740
741    /// Get a persistent savepoint given its id
742    pub fn get_persistent_savepoint(&self, id: u64) -> Result<Savepoint, SavepointError> {
743        let mut system_tables = self.system_tables.lock().unwrap();
744        let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
745        let value = table.get(SavepointId(id))?;
746
747        value
748            .map(|x| x.value().to_savepoint(self.transaction_tracker.clone()))
749            .ok_or(SavepointError::InvalidSavepoint)
750    }
751
752    /// Delete the given persistent savepoint.
753    ///
754    /// Note that if the transaction is aborted with [`Self::abort`], this deletion will be rolled back.
755    ///
756    /// Returns `true` if the savepoint existed.
757    /// Returns [`SavepointError::InvalidSavepoint`] if the transaction's durability is less than [`Durability::Immediate`]
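    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `savepoint_id` was previously returned by
    /// [`Self::persistent_savepoint`] (database setup hidden, error handling elided):
    ///
    /// ```no_run
    /// # use redb::Database;
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let db = Database::create("my_db.redb")?;
    /// # let savepoint_id = 0;
    /// let txn = db.begin_write()?;
    /// let existed = txn.delete_persistent_savepoint(savepoint_id)?;
    /// txn.commit()?;
    /// # let _ = existed;
    /// # Ok(())
    /// # }
    /// ```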
758    pub fn delete_persistent_savepoint(&self, id: u64) -> Result<bool, SavepointError> {
759        if self.durability != InternalDurability::Immediate {
760            return Err(SavepointError::InvalidSavepoint);
761        }
762        let mut system_tables = self.system_tables.lock().unwrap();
763        let mut table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
764        let savepoint = table.remove(SavepointId(id))?;
765        if let Some(serialized) = savepoint {
766            let savepoint = serialized
767                .value()
768                .to_savepoint(self.transaction_tracker.clone());
769            self.deleted_persistent_savepoints
770                .lock()
771                .unwrap()
772                .push((savepoint.get_id(), savepoint.get_transaction_id()));
773            Ok(true)
774        } else {
775            Ok(false)
776        }
777    }
778
779    /// List all persistent savepoints
780    pub fn list_persistent_savepoints(&self) -> Result<impl Iterator<Item = u64>> {
781        let mut system_tables = self.system_tables.lock().unwrap();
782        let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
783        let mut savepoints = vec![];
784        for savepoint in table.range::<SavepointId>(..)? {
785            savepoints.push(savepoint?.0.value().0);
786        }
787        Ok(savepoints.into_iter())
788    }
789
790    // TODO: deduplicate this with the one in Database
791    fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
792        let id = self
793            .transaction_tracker
794            .register_read_transaction(&self.mem)?;
795
796        Ok(TransactionGuard::new_read(
797            id,
798            self.transaction_tracker.clone(),
799        ))
800    }
801
802    fn allocate_savepoint(&self) -> Result<(SavepointId, TransactionId)> {
803        let id = self.transaction_tracker.allocate_savepoint();
804        Ok((id, self.allocate_read_transaction()?.leak()))
805    }
806
807    /// Creates a snapshot of the current database state, which can be used to roll back the database
808    ///
809    /// This savepoint will be freed as soon as the returned [`Savepoint`] is dropped.
810    ///
811    /// Returns [`SavepointError::InvalidSavepoint`] if the transaction is "dirty" (any tables have been opened)
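    ///
    /// # Example
    ///
    /// A minimal sketch (database setup hidden, error handling elided):
    ///
    /// ```no_run
    /// # use redb::Database;
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let db = Database::create("my_db.redb")?;
    /// let txn = db.begin_write()?;
    /// // Must be called before any tables are opened in this transaction
    /// let savepoint = txn.ephemeral_savepoint()?;
    /// // ... open tables and make changes ...
    /// // The savepoint is freed once it is dropped
    /// drop(savepoint);
    /// txn.commit()?;
    /// # Ok(())
    /// # }
    /// ```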
812    pub fn ephemeral_savepoint(&self) -> Result<Savepoint, SavepointError> {
813        if self.dirty.load(Ordering::Acquire) {
814            return Err(SavepointError::InvalidSavepoint);
815        }
816
817        let (id, transaction_id) = self.allocate_savepoint()?;
818        #[cfg(feature = "logging")]
819        debug!(
820            "Creating savepoint id={:?}, txn_id={:?}",
821            id, transaction_id
822        );
823
824        let regional_allocators = self.mem.get_raw_allocator_states();
825        let root = self.mem.get_data_root();
826        let system_root = self.mem.get_system_root();
827        let freed_root = self.mem.get_freed_root();
828        let savepoint = Savepoint::new_ephemeral(
829            &self.mem,
830            self.transaction_tracker.clone(),
831            id,
832            transaction_id,
833            root,
834            system_root,
835            freed_root,
836            regional_allocators,
837        );
838
839        Ok(savepoint)
840    }
841
842    /// Restore the state of the database to the given [`Savepoint`]
843    ///
844    /// Calling this method invalidates all [`Savepoint`]s created after the given savepoint
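    ///
    /// # Example
    ///
    /// A minimal sketch of rolling back changes made within the same transaction; the table
    /// definition is illustrative, and error handling is elided:
    ///
    /// ```no_run
    /// use redb::{Database, TableDefinition};
    ///
    /// const TABLE: TableDefinition<&str, u64> = TableDefinition::new("example");
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let db = Database::create("my_db.redb")?;
    /// let mut txn = db.begin_write()?;
    /// let savepoint = txn.ephemeral_savepoint()?;
    /// {
    ///     let mut table = txn.open_table(TABLE)?;
    ///     table.insert("key", &123)?;
    /// }
    /// // Undo the insert above by restoring the savepoint
    /// txn.restore_savepoint(&savepoint)?;
    /// txn.commit()?;
    /// # Ok(())
    /// # }
    /// ```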
845    pub fn restore_savepoint(&mut self, savepoint: &Savepoint) -> Result<(), SavepointError> {
846        // Ensure that user does not try to restore a Savepoint that is from a different Database
847        assert_eq!(
848            std::ptr::from_ref(self.transaction_tracker.as_ref()),
849            savepoint.db_address()
850        );
851
852        if !self
853            .transaction_tracker
854            .is_valid_savepoint(savepoint.get_id())
855        {
856            return Err(SavepointError::InvalidSavepoint);
857        }
858        #[cfg(feature = "logging")]
859        debug!(
860            "Beginning savepoint restore (id={:?}) in transaction id={:?}",
861            savepoint.get_id(),
862            self.transaction_id
863        );
864        // Restoring a savepoint that reverted a file format or checksum type change could corrupt
865        // the database
866        assert_eq!(self.mem.get_version(), savepoint.get_version());
867        self.dirty.store(true, Ordering::Release);
868
869        // Restoring a savepoint needs to accomplish the following:
870        // 1) restore the table tree. This is trivial, since we have the old root
871        // 1a) we also filter the freed tree to remove any pages referenced by the old root
872        // 2) free all pages that were allocated since the savepoint and are unreachable
873        //    from the restored table tree root. Here we diff the reachable pages from the old
874        //    and new roots
875        // 3) update the system tree to remove invalid persistent savepoints.
876
877        let old_system_tree = TableTree::new(
878            savepoint.get_system_root(),
879            PageHint::None,
880            self.transaction_guard.clone(),
881            self.mem.clone(),
882        )?;
883        let old_freed_tree: Btree<FreedTableKey, FreedPageList<'static>> = Btree::new(
884            savepoint.get_freed_root(),
885            PageHint::None,
886            self.transaction_guard.clone(),
887            self.mem.clone(),
888        )?;
889
890        // Pages which are part of the system and freed trees in the savepoint should be freed
891        // even after the savepoint is restored, because the system and freed trees only roll
892        // forward
893        let mut old_system_and_freed_pages = HashSet::new();
894        old_system_tree.visit_all_pages(|path| {
895            old_system_and_freed_pages.insert(path.page_number());
896            Ok(())
897        })?;
898        old_freed_tree.visit_all_pages(|path| {
899            old_system_and_freed_pages.insert(path.page_number());
900            Ok(())
901        })?;
902
903        // 1) restore the table tree
904        {
905            self.tables.lock().unwrap().table_tree = TableTreeMut::new(
906                savepoint.get_user_root(),
907                self.transaction_guard.clone(),
908                self.mem.clone(),
909                self.freed_pages.clone(),
910            );
911        }
912
913        // 1a) purge all transactions that happened after the savepoint from freed tree,
914        // except pages from the old system or freed tree in the savepoint. Those still need to be
915        // freed, since the system tree only rolls forward, never back. This brings all pages in the
916        // old data root back to the committed state
917        // This operation will also leak everything else that was allocated since the savepoint,
918        // but we fix that below -- noting that all the system trees that existed between the savepoint
919        // and now which might be referenced by other savepoints will become unreachable, since those
920        // savepoints are invalidated by this restoration
921        let mut txn_id = savepoint.get_transaction_id().next().raw_id();
922        let mut freed_tree = self.freed_tree.lock().unwrap();
923        loop {
924            let lower = FreedTableKey {
925                transaction_id: txn_id,
926                pagination_id: 0,
927            };
928
929            if freed_tree.range(&(lower..))?.next().is_none() {
930                break;
931            }
932            let lower = FreedTableKey {
933                transaction_id: txn_id,
934                pagination_id: 0,
935            };
936            let upper = FreedTableKey {
937                transaction_id: txn_id + 1,
938                pagination_id: 0,
939            };
940
941            // Find all the pending pages for this txn and filter them
942            let mut pending_pages = vec![];
943            for entry in freed_tree.extract_from_if(&(lower..upper), |_, _| true)? {
944                let item = entry?;
945                for i in 0..item.value().len() {
946                    let p = item.value().get(i);
947                    // Keep the old system and freed tree pages, but purge anything else
948                    if old_system_and_freed_pages.contains(&p) {
949                        pending_pages.push(p);
950                    }
951                }
952            }
953
954            let mut pagination_counter = 0u64;
955            while !pending_pages.is_empty() {
956                let chunk_size = 100;
957                let buffer_size = FreedPageList::required_bytes(chunk_size);
958                let key = FreedTableKey {
959                    transaction_id: txn_id,
960                    pagination_id: pagination_counter,
961                };
962                let mut access_guard =
963                    freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?;
964
965                let len = pending_pages.len();
966                access_guard.as_mut().clear();
967                for page in pending_pages.drain(len - min(len, chunk_size)..) {
968                    access_guard.as_mut().push_back(page);
969                }
970                drop(access_guard);
971
972                pagination_counter += 1;
973            }
974
975            txn_id += 1;
976        }
977
978        let mut current_system_and_freed_pages = HashSet::new();
979        self.system_tables
980            .lock()
981            .unwrap()
982            .table_tree
983            .visit_all_pages(|path| {
984                current_system_and_freed_pages.insert(path.page_number());
985                Ok(())
986            })?;
987        freed_tree.visit_all_pages(|path| {
988            current_system_and_freed_pages.insert(path.page_number());
989            Ok(())
990        })?;
991
992        let mut old_allocators: Vec<BuddyAllocator> = savepoint
993            .get_regional_allocators()
994            .iter()
995            .map(|data| BuddyAllocator::from_savepoint_state(data))
996            .collect();
997
998        // Find the oldest transaction in the current freed tree, for use below
999        {
1000            let oldest_unprocessed_transaction =
1001                if let Some(entry) = freed_tree.range::<RangeFull, FreedTableKey>(&(..))?.next() {
1002                    entry?.key().transaction_id
1003                } else {
1004                    self.transaction_id.raw_id()
1005                };
1006
1007            let lookup_key = FreedTableKey {
1008                transaction_id: oldest_unprocessed_transaction,
1009                pagination_id: 0,
1010            };
1011
1012            // Replay all finalized frees into the old allocator state to ensure that a page which
1013            // was pending free, freed, and then reallocated does not leak
1014            for entry in old_freed_tree.range(&(..lookup_key))? {
1015                let item = entry?;
1016                let pages: FreedPageList = item.value();
1017                for i in 0..pages.len() {
1018                    let page = pages.get(i);
1019                    assert!(
1020                        old_allocators[page.region as usize]
1021                            .is_allocated(page.page_index, page.page_order)
1022                    );
1023                    old_allocators[page.region as usize].free(page.page_index, page.page_order);
1024                }
1025            }
1026        }
1027
1028        // 2) free all pages that became unreachable
1029        let mut freed_pages = self.freed_pages.lock().unwrap();
1030        let mut already_awaiting_free: HashSet<PageNumber> = freed_pages.iter().copied().collect();
1031        already_awaiting_free.extend(self.post_commit_frees.lock().unwrap().iter().copied());
1032        let to_free = self.mem.pages_allocated_since_raw_state(&old_allocators);
1033        for page in to_free {
1034            if already_awaiting_free.contains(&page) {
1035                // Make sure that we don't double free something that is already going to be freed
1036                continue;
1037            }
1038            if current_system_and_freed_pages.contains(&page) {
1039                // Don't free pages which are part of the current system or freed tree, even though
1040                // these pages are new. Again this is because these trees only move forward;
1041                // never backwards as part of a savepoint restore
1042                continue;
1043            }
1044            if self.mem.uncommitted(page) {
1045                self.mem.free(page);
1046            } else {
1047                freed_pages.push(page);
1048            }
1049        }
1050        drop(freed_pages);
1051
1052        // 3) Invalidate all savepoints that are newer than the one being applied to prevent the user
1053        // from later trying to restore a savepoint "on another timeline"
1054        self.transaction_tracker
1055            .invalidate_savepoints_after(savepoint.get_id());
1056        for persistent_savepoint in self.list_persistent_savepoints()? {
1057            if persistent_savepoint > savepoint.get_id().0 {
1058                self.delete_persistent_savepoint(persistent_savepoint)?;
1059            }
1060        }
1061
1062        Ok(())
1063    }
1064
1065    /// Set the desired durability level for writes made in this transaction.
1066    /// Defaults to [`Durability::Immediate`].
1067    ///
1068    /// Will panic if the durability is reduced below [`Durability::Immediate`] after a persistent savepoint has been created or deleted.
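    ///
    /// # Example
    ///
    /// A minimal sketch (database setup hidden, error handling elided):
    ///
    /// ```no_run
    /// # use redb::{Database, Durability};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let db = Database::create("my_db.redb")?;
    /// let mut txn = db.begin_write()?;
    /// // Trade durability for speed: this commit may be lost on a crash,
    /// // but will be persisted by a later commit with a higher durability level.
    /// txn.set_durability(Durability::None);
    /// txn.commit()?;
    /// # Ok(())
    /// # }
    /// ```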
1069    pub fn set_durability(&mut self, durability: Durability) {
1070        let no_created = self
1071            .created_persistent_savepoints
1072            .lock()
1073            .unwrap()
1074            .is_empty();
1075        let no_deleted = self
1076            .deleted_persistent_savepoints
1077            .lock()
1078            .unwrap()
1079            .is_empty();
1080        assert!(no_created && no_deleted);
1081
1082        self.durability = match durability {
1083            Durability::None => InternalDurability::None,
1084            Durability::Eventual => InternalDurability::Eventual,
1085            Durability::Immediate => InternalDurability::Immediate,
1086            #[allow(deprecated)]
1087            Durability::Paranoid => {
1088                self.set_two_phase_commit(true);
1089                InternalDurability::Immediate
1090            }
1091        };
1092    }
1093
1094    /// Enable or disable 2-phase commit (defaults to disabled)
1095    ///
1096    /// By default, data is written using the following 1-phase commit algorithm:
1097    ///
1098    /// 1. Update the inactive commit slot with the new database state
1099    /// 2. Flip the god byte primary bit to activate the newly updated commit slot
1100    /// 3. Call `fsync` to ensure all writes have been persisted to disk
1101    ///
1102    /// All data is written with checksums. When opening the database after a crash, the most
1103    /// recent of the two commit slots with a valid checksum is used.
1104    ///
1105    /// Security considerations: The checksum used is xxhash, a fast, non-cryptographic hash
1106    /// function with close to perfect collision resistance when used with non-malicious input. An
1107    /// attacker with an extremely high degree of control over the database's workload, including
1108    /// the ability to cause the database process to crash, can cause invalid data to be written
1109    /// with a valid checksum, leaving the database in an invalid, attacker-controlled state.
1110    ///
1111    /// Alternatively, you can enable 2-phase commit, which writes data like this:
1112    ///
1113    /// 1. Update the inactive commit slot with the new database state
1114    /// 2. Call `fsync` to ensure the new database state and commit slot update have been persisted
1115    /// 3. Flip the god byte primary bit to activate the newly updated commit slot
1116    /// 4. Call `fsync` to ensure the write to the god byte has been persisted
1117    ///
1118    /// This mitigates a theoretical attack where an attacker who
1119    /// 1. can control the order in which pages are flushed to disk
1120    /// 2. can introduce crashes during `fsync`,
1121    /// 3. has knowledge of the database file contents, and
1122    /// 4. can include arbitrary data in a write transaction
1123    ///
1124    /// could cause a transaction to partially commit (some but not all of the data is written).
1125    /// This is described in the design doc in further detail.
1126    ///
1127    /// Security considerations: Many hard disk drives and SSDs do not actually guarantee that data
1128    /// has been persisted to disk after calling `fsync`. Even with 2-phase commit, an attacker with
1129    /// a high degree of control over the database's workload, including the ability to cause the
1130    /// database process to crash, can cause the database to crash with the god byte primary bit
1131    /// pointing to an invalid commit slot, leaving the database in an invalid, potentially attacker-
1132    /// controlled state.
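    ///
    /// # Example
    ///
    /// A minimal sketch (database setup hidden, error handling elided):
    ///
    /// ```no_run
    /// # use redb::Database;
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let db = Database::create("my_db.redb")?;
    /// let mut txn = db.begin_write()?;
    /// // Opt in to the 4-step commit sequence described above for this transaction
    /// txn.set_two_phase_commit(true);
    /// txn.commit()?;
    /// # Ok(())
    /// # }
    /// ```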
1133    pub fn set_two_phase_commit(&mut self, enabled: bool) {
1134        self.two_phase_commit = enabled;
1135    }
1136
1137    /// Enable or disable quick-repair (defaults to disabled)
1138    ///
1139    /// By default, when reopening the database after a crash, redb needs to do a full repair.
1140    /// This involves walking the entire database to verify the checksums and reconstruct the
1141    /// allocator state, so it can be very slow if the database is large.
1142    ///
1143    /// Alternatively, you can enable quick-repair. In this mode, redb saves the allocator state
1144    /// as part of each commit (so it doesn't need to be reconstructed), and enables 2-phase commit
1145    /// (which guarantees that the primary commit slot is valid without needing to look at the
1146    /// checksums). This means commits are slower, but recovery after a crash is almost instant.
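    ///
    /// # Example
    ///
    /// A minimal sketch (database setup hidden, error handling elided):
    ///
    /// ```no_run
    /// # use redb::Database;
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let db = Database::create("my_db.redb")?;
    /// let mut txn = db.begin_write()?;
    /// // Slower commit, but near-instant recovery after a crash.
    /// // This also implicitly enables 2-phase commit.
    /// txn.set_quick_repair(true);
    /// txn.commit()?;
    /// # Ok(())
    /// # }
    /// ```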
1147    pub fn set_quick_repair(&mut self, enabled: bool) {
1148        self.quick_repair = enabled;
1149    }
1150
1151    /// Open the given table
1152    ///
1153    /// The table will be created if it does not exist
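    ///
    /// # Example
    ///
    /// A minimal sketch; the table definition is illustrative, and error handling is elided:
    ///
    /// ```no_run
    /// use redb::{Database, TableDefinition};
    ///
    /// const TABLE: TableDefinition<&str, u64> = TableDefinition::new("example");
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let db = Database::create("my_db.redb")?;
    /// let txn = db.begin_write()?;
    /// {
    ///     let mut table = txn.open_table(TABLE)?;
    ///     table.insert("key", &123)?;
    /// }
    /// // The table borrow must end (e.g. by dropping it) before the transaction is committed
    /// txn.commit()?;
    /// # Ok(())
    /// # }
    /// ```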
1154    #[track_caller]
1155    pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
1156        &'txn self,
1157        definition: TableDefinition<K, V>,
1158    ) -> Result<Table<'txn, K, V>, TableError> {
1159        self.tables.lock().unwrap().open_table(self, definition)
1160    }
1161
1162    /// Open the given table
1163    ///
1164    /// The table will be created if it does not exist
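    ///
    /// # Example
    ///
    /// A minimal sketch; the table definition is illustrative, and error handling is elided:
    ///
    /// ```no_run
    /// use redb::{Database, MultimapTableDefinition};
    ///
    /// const TABLE: MultimapTableDefinition<&str, u64> = MultimapTableDefinition::new("example");
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let db = Database::create("my_db.redb")?;
    /// let txn = db.begin_write()?;
    /// {
    ///     let mut table = txn.open_multimap_table(TABLE)?;
    ///     // A multimap table may hold multiple values for the same key
    ///     table.insert("key", &1)?;
    ///     table.insert("key", &2)?;
    /// }
    /// txn.commit()?;
    /// # Ok(())
    /// # }
    /// ```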
1165    #[track_caller]
1166    pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
1167        &'txn self,
1168        definition: MultimapTableDefinition<K, V>,
1169    ) -> Result<MultimapTable<'txn, K, V>, TableError> {
1170        self.tables
1171            .lock()
1172            .unwrap()
1173            .open_multimap_table(self, definition)
1174    }
1175
1176    pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
1177        &self,
1178        name: &str,
1179        table: &BtreeMut<K, V>,
1180        length: u64,
1181    ) {
1182        self.tables.lock().unwrap().close_table(name, table, length);
1183    }
1184
1185    /// Rename the given table
1186    pub fn rename_table(
1187        &self,
1188        definition: impl TableHandle,
1189        new_name: impl TableHandle,
1190    ) -> Result<(), TableError> {
1191        let name = definition.name().to_string();
1192        // Drop the definition so that callers can pass in a `Table` to rename, without getting a TableAlreadyOpen error
1193        drop(definition);
1194        self.tables
1195            .lock()
1196            .unwrap()
1197            .rename_table(self, &name, new_name.name())
1198    }
1199
1200    /// Rename the given multimap table
1201    pub fn rename_multimap_table(
1202        &self,
1203        definition: impl MultimapTableHandle,
1204        new_name: impl MultimapTableHandle,
1205    ) -> Result<(), TableError> {
1206        let name = definition.name().to_string();
1207        // Drop the definition so that callers can pass in a `MultimapTable` to rename, without getting a TableAlreadyOpen error
1208        drop(definition);
1209        self.tables
1210            .lock()
1211            .unwrap()
1212            .rename_multimap_table(self, &name, new_name.name())
1213    }
1214
1215    /// Delete the given table
1216    ///
1217    /// Returns a bool indicating whether the table existed
1218    pub fn delete_table(&self, definition: impl TableHandle) -> Result<bool, TableError> {
1219        let name = definition.name().to_string();
1220        // Drop the definition so that callers can pass in a `Table` or `MultimapTable` to delete, without getting a TableAlreadyOpen error
1221        drop(definition);
1222        self.tables.lock().unwrap().delete_table(self, &name)
1223    }
1224
1225    /// Delete the given table
1226    ///
1227    /// Returns a bool indicating whether the table existed
1228    pub fn delete_multimap_table(
1229        &self,
1230        definition: impl MultimapTableHandle,
1231    ) -> Result<bool, TableError> {
1232        let name = definition.name().to_string();
1233        // Drop the definition so that callers can pass in a `Table` or `MultimapTable` to delete, without getting a TableAlreadyOpen error
1234        drop(definition);
1235        self.tables
1236            .lock()
1237            .unwrap()
1238            .delete_multimap_table(self, &name)
1239    }
1240
1241    /// List all the tables
1242    pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle> + '_> {
1243        self.tables
1244            .lock()
1245            .unwrap()
1246            .table_tree
1247            .list_tables(TableType::Normal)
1248            .map(|x| x.into_iter().map(UntypedTableHandle::new))
1249    }
1250
1251    /// List all the multimap tables
1252    pub fn list_multimap_tables(
1253        &self,
1254    ) -> Result<impl Iterator<Item = UntypedMultimapTableHandle> + '_> {
1255        self.tables
1256            .lock()
1257            .unwrap()
1258            .table_tree
1259            .list_tables(TableType::Multimap)
1260            .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
1261    }
1262
1263    /// Commit the transaction
1264    ///
1265    /// All writes performed in this transaction will be visible to future transactions, and are
1266    /// durable according to the [`Durability`] level set by [`Self::set_durability`]
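    ///
    /// # Example
    ///
    /// A minimal sketch showing that a committed write is visible to a later read transaction;
    /// the table definition is illustrative, and error handling is elided:
    ///
    /// ```no_run
    /// use redb::{Database, TableDefinition};
    ///
    /// const TABLE: TableDefinition<&str, u64> = TableDefinition::new("example");
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let db = Database::create("my_db.redb")?;
    /// let txn = db.begin_write()?;
    /// {
    ///     let mut table = txn.open_table(TABLE)?;
    ///     table.insert("key", &123)?;
    /// }
    /// // Consumes the transaction; the write becomes visible to subsequent transactions
    /// txn.commit()?;
    ///
    /// let read_txn = db.begin_read()?;
    /// let table = read_txn.open_table(TABLE)?;
    /// assert_eq!(table.get("key")?.unwrap().value(), 123);
    /// # Ok(())
    /// # }
    /// ```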
1267    pub fn commit(mut self) -> Result<(), CommitError> {
1268        // Set completed flag first, so that we don't go through the abort() path on drop, if this fails
1269        self.completed = true;
1270        self.commit_inner()
1271    }
1272
1273    fn commit_inner(&mut self) -> Result<(), CommitError> {
1274        // Quick-repair requires 2-phase commit
1275        if self.quick_repair {
1276            self.two_phase_commit = true;
1277        }
1278
1279        #[cfg(feature = "logging")]
1280        debug!(
1281            "Committing transaction id={:?} with durability={:?} two_phase={} quick_repair={}",
1282            self.transaction_id, self.durability, self.two_phase_commit, self.quick_repair
1283        );
1284        match self.durability {
1285            InternalDurability::None => self.non_durable_commit()?,
1286            InternalDurability::Eventual => self.durable_commit(true)?,
1287            InternalDurability::Immediate => self.durable_commit(false)?,
1288        }
1289
1290        for (savepoint, transaction) in self.deleted_persistent_savepoints.lock().unwrap().iter() {
1291            self.transaction_tracker
1292                .deallocate_savepoint(*savepoint, *transaction);
1293        }
1294
1295        #[cfg(feature = "logging")]
1296        debug!(
1297            "Finished commit of transaction id={:?}",
1298            self.transaction_id
1299        );
1300
1301        Ok(())
1302    }
1303
1304    /// Abort the transaction
1305    ///
1306    /// All writes performed in this transaction will be rolled back
1307    pub fn abort(mut self) -> Result {
1308        // Set completed flag first, so that we don't go through the abort() path on drop, if this fails
1309        self.completed = true;
1310        self.abort_inner()
1311    }
1312
1313    fn abort_inner(&mut self) -> Result {
1314        #[cfg(feature = "logging")]
1315        debug!("Aborting transaction id={:?}", self.transaction_id);
1316        for savepoint in self.created_persistent_savepoints.lock().unwrap().iter() {
1317            match self.delete_persistent_savepoint(savepoint.0) {
1318                Ok(_) => {}
1319                Err(err) => match err {
1320                    SavepointError::InvalidSavepoint => {
1321                        unreachable!();
1322                    }
1323                    SavepointError::Storage(storage_err) => {
1324                        return Err(storage_err);
1325                    }
1326                },
1327            }
1328        }
1329        self.tables
1330            .lock()
1331            .unwrap()
1332            .table_tree
1333            .clear_table_root_updates();
1334        self.mem.rollback_uncommitted_writes()?;
1335        #[cfg(feature = "logging")]
1336        debug!("Finished abort of transaction id={:?}", self.transaction_id);
1337        Ok(())
1338    }
1339
1340    pub(crate) fn durable_commit(&mut self, eventual: bool) -> Result {
1341        let free_until_transaction = self
1342            .transaction_tracker
1343            .oldest_live_read_transaction()
1344            .map_or(self.transaction_id, |x| x.next());
1345        self.process_freed_pages(free_until_transaction)?;
1346
1347        let user_root = self
1348            .tables
1349            .lock()
1350            .unwrap()
1351            .table_tree
1352            .flush_table_root_updates()?
1353            .finalize_dirty_checksums()?;
1354
1355        let mut system_tables = self.system_tables.lock().unwrap();
1356        let system_tree = system_tables.table_tree.flush_table_root_updates()?;
1357        system_tree
1358            .delete_table(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
1359            .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?;
1360
1361        if self.quick_repair {
1362            system_tree.create_table_and_flush_table_root(
1363                ALLOCATOR_STATE_TABLE_NAME,
1364                |tree: &mut AllocatorStateTree| {
1365                    let mut pagination_counter = 0;
1366
1367                    loop {
1368                        let num_regions = self
1369                            .mem
1370                            .reserve_allocator_state(tree, self.transaction_id)?;
1371
1372                        // We can't free pages after the commit, because that would invalidate our
1373                        // saved allocator state. Everything needs to go through the transactional
1374                        // free mechanism
1375                        self.store_freed_pages(&mut pagination_counter, true)?;
1376
1377                        if self.mem.try_save_allocator_state(tree, num_regions)? {
1378                            return Ok(());
1379                        }
1380
1381                        // Clear out the table before retrying, just in case the number of regions
1382                        // has somehow shrunk. Don't use retain_in() for this, since it doesn't
1383                        // free the pages immediately -- we need to reuse those pages to guarantee
1384                        // that our retry loop will eventually terminate
1385                        while let Some(guards) = tree.last()? {
1386                            let key = guards.0.value();
1387                            drop(guards);
1388                            tree.remove(&key)?;
1389                        }
1390                    }
1391                },
1392            )?;
1393        } else {
1394            // If a savepoint exists it might reference the freed-tree, since it holds a reference to the
1395            // root of the freed-tree. Therefore, we must use the transactional free mechanism to free
1396            // those pages. If there are no savepoints then these can be immediately freed, which is
1397            // done at the end of this function.
1398            let savepoint_exists = self.transaction_tracker.any_savepoint_exists();
1399            self.store_freed_pages(&mut 0, savepoint_exists)?;
1400        }
1401
1402        let system_root = system_tree.finalize_dirty_checksums()?;
1403
1404        // Finalize freed table checksums, before doing the final commit
1405        let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?;
1406
1407        self.mem.commit(
1408            user_root,
1409            system_root,
1410            freed_root,
1411            self.transaction_id,
1412            eventual,
1413            self.two_phase_commit,
1414        )?;
1415
1416        // Mark any pending non-durable commits as fully committed.
1417        self.transaction_tracker.clear_pending_non_durable_commits();
1418
1419        // Immediately free the pages that were freed from the freed-tree itself. These are only
1420        // accessed by write transactions, so it's safe to free them as soon as the commit is done.
1421        for page in self.post_commit_frees.lock().unwrap().drain(..) {
1422            self.mem.free(page);
1423        }
1424
1425        Ok(())
1426    }
1427
1428    // Commit without a durability guarantee
1429    pub(crate) fn non_durable_commit(&mut self) -> Result {
1430        let user_root = self
1431            .tables
1432            .lock()
1433            .unwrap()
1434            .table_tree
1435            .flush_table_root_updates()?
1436            .finalize_dirty_checksums()?;
1437
1438        let system_root = self
1439            .system_tables
1440            .lock()
1441            .unwrap()
1442            .table_tree
1443            .flush_table_root_updates()?
1444            .finalize_dirty_checksums()?;
1445
1446        // Store all freed pages for a future commit(), since we can't free pages during a
1447        // non-durable commit (it's non-durable, so could be rolled back anytime in the future)
1448        self.store_freed_pages(&mut 0, true)?;
1449
1450        // Finalize all checksums, before doing the final commit
1451        let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?;
1452
1453        self.mem
1454            .non_durable_commit(user_root, system_root, freed_root, self.transaction_id)?;
1455        // Register this as a non-durable transaction to ensure that the freed pages we just pushed
1456        // are only processed after this has been persisted
1457        self.transaction_tracker
1458            .register_non_durable_commit(self.transaction_id);
1459        Ok(())
1460    }
1461
1462    // Relocate pages to lower-numbered regions/pages
1463    // Returns true if any pages were moved
1464    pub(crate) fn compact_pages(&mut self) -> Result<bool> {
1465        let mut progress = false;
1466        // Relocate the region tracker page
1467        if self.mem.relocate_region_tracker()? {
1468            progress = true;
1469        }
1470
1471        // Find the highest-numbered pages (up to MAX_PAGES_PER_COMPACTION of them)
1472        let mut highest_pages = BTreeMap::new();
1473        let mut tables = self.tables.lock().unwrap();
1474        let table_tree = &mut tables.table_tree;
1475        table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
1476        let mut system_tables = self.system_tables.lock().unwrap();
1477        let system_table_tree = &mut system_tables.table_tree;
1478        system_table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
1479
1480        // Calculate how many of them can be relocated to lower pages, starting from the last page
1481        let mut relocation_map = HashMap::new();
1482        for path in highest_pages.into_values().rev() {
1483            if relocation_map.contains_key(&path.page_number()) {
1484                continue;
1485            }
1486            let old_page = self.mem.get_page(path.page_number())?;
1487            let mut new_page = self.mem.allocate_lowest(old_page.memory().len())?;
1488            let new_page_number = new_page.get_page_number();
1489            // We have to copy at least the page type into the new page.
1490            // Otherwise its cache priority will be calculated incorrectly
1491            new_page.memory_mut()[0] = old_page.memory()[0];
1492            drop(new_page);
1493            // If we were able to move this to a lower page, record it and relocate all of its parents too
1494            if new_page_number < path.page_number() {
1495                relocation_map.insert(path.page_number(), new_page_number);
1496                for parent in path.parents() {
1497                    if relocation_map.contains_key(parent) {
1498                        continue;
1499                    }
1500                    let old_parent = self.mem.get_page(*parent)?;
1501                    let mut new_page = self.mem.allocate_lowest(old_parent.memory().len())?;
1502                    let new_page_number = new_page.get_page_number();
1503                    // We have to copy at least the page type into the new page.
1504                    // Otherwise its cache priority will be calculated incorrectly
1505                    new_page.memory_mut()[0] = old_parent.memory()[0];
1506                    drop(new_page);
1507                    relocation_map.insert(*parent, new_page_number);
1508                }
1509            } else {
1510                self.mem.free(new_page_number);
1511                break;
1512            }
1513        }
1514
1515        if !relocation_map.is_empty() {
1516            progress = true;
1517        }
1518
1519        table_tree.relocate_tables(&relocation_map)?;
1520        system_table_tree.relocate_tables(&relocation_map)?;
1521
1522        Ok(progress)
1523    }
1524
1525    // NOTE: must be called before store_freed_pages() during commit, since it can add more
1526    // pages to the set freed by the current transaction
1527    fn process_freed_pages(&mut self, free_until: TransactionId) -> Result {
1528        // We assume below that PageNumber is 8 bytes
1529        assert_eq!(PageNumber::serialized_size(), 8);
1530        let lookup_key = FreedTableKey {
1531            transaction_id: free_until.raw_id(),
1532            pagination_id: 0,
1533        };
1534
1535        let mut to_remove = vec![];
1536        let mut freed_tree = self.freed_tree.lock().unwrap();
1537        for entry in freed_tree.range(&(..lookup_key))? {
1538            let entry = entry?;
1539            to_remove.push(entry.key());
1540            let value = entry.value();
1541            for i in 0..value.len() {
1542                self.mem.free(value.get(i));
1543            }
1544        }
1545
1546        // Remove the entries for all of the old transactions
1547        for key in to_remove {
1548            freed_tree.remove(&key)?;
1549        }
1550
1551        Ok(())
1552    }
1553
1554    fn store_freed_pages(
1555        &self,
1556        pagination_counter: &mut u64,
1557        include_post_commit_free: bool,
1558    ) -> Result {
1559        assert_eq!(PageNumber::serialized_size(), 8); // We assume below that PageNumber is 8 bytes
1560
1561        let mut freed_tree = self.freed_tree.lock().unwrap();
1562        if include_post_commit_free {
1563            // Move all the post-commit pages that came from the freed-tree. These need to be stored
1564            // since we can't free pages until a durable commit
1565            self.freed_pages
1566                .lock()
1567                .unwrap()
1568                .extend(self.post_commit_frees.lock().unwrap().drain(..));
1569        }
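        // Descriptive note (added): freed pages are written to the freed tree in chunks,
        // keyed by (transaction_id, pagination_id), so one transaction's freed pages may
        // span multiple entries.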
1570        while !self.freed_pages.lock().unwrap().is_empty() {
1571            let chunk_size = 100;
1572            let buffer_size = FreedPageList::required_bytes(chunk_size);
1573            let key = FreedTableKey {
1574                transaction_id: self.transaction_id.raw_id(),
1575                pagination_id: *pagination_counter,
1576            };
1577            let mut access_guard =
1578                freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?;
1579
1580            let mut freed_pages = self.freed_pages.lock().unwrap();
1581            let len = freed_pages.len();
1582            access_guard.as_mut().clear();
1583            for page in freed_pages.drain(len - min(len, chunk_size)..) {
1584                access_guard.as_mut().push_back(page);
1585            }
1586            drop(access_guard);
1587
1588            *pagination_counter += 1;
1589
1590            if include_post_commit_free {
1591                // Move all the post-commit pages that came from the freed-tree. These need to be stored
1592                // since we can't free pages until a durable commit
1593                freed_pages.extend(self.post_commit_frees.lock().unwrap().drain(..));
1594            }
1595        }
1596
1597        Ok(())
1598    }
1599
1600    /// Retrieves information about storage usage in the database
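    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); it assumes a `Database` like the one
    /// created in the tests at the bottom of this file and that `DatabaseStats` implements
    /// `Debug`:
    ///
    /// ```ignore
    /// let write_txn = db.begin_write()?;
    /// let stats = write_txn.stats()?;
    /// // Tree height, allocated pages, metadata and fragmented bytes, page size, etc.
    /// println!("{stats:?}");
    /// ```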
1601    pub fn stats(&self) -> Result<DatabaseStats> {
1602        let tables = self.tables.lock().unwrap();
1603        let table_tree = &tables.table_tree;
1604        let data_tree_stats = table_tree.stats()?;
1605
1606        let system_tables = self.system_tables.lock().unwrap();
1607        let system_table_tree = &system_tables.table_tree;
1608        let system_tree_stats = system_table_tree.stats()?;
1609
1610        let freed_tree_stats = self.freed_tree.lock().unwrap().stats()?;
1611
1612        let total_metadata_bytes = data_tree_stats.metadata_bytes()
1613            + system_tree_stats.metadata_bytes
1614            + system_tree_stats.stored_leaf_bytes
1615            + freed_tree_stats.metadata_bytes
1616            + freed_tree_stats.stored_leaf_bytes;
1617        let total_fragmented = data_tree_stats.fragmented_bytes()
1618            + system_tree_stats.fragmented_bytes
1619            + freed_tree_stats.fragmented_bytes
1620            + self.mem.count_free_pages()? * (self.mem.get_page_size() as u64);
1621
1622        Ok(DatabaseStats {
1623            tree_height: data_tree_stats.tree_height(),
1624            allocated_pages: self.mem.count_allocated_pages()?,
1625            leaf_pages: data_tree_stats.leaf_pages(),
1626            branch_pages: data_tree_stats.branch_pages(),
1627            stored_leaf_bytes: data_tree_stats.stored_bytes(),
1628            metadata_bytes: total_metadata_bytes,
1629            fragmented_bytes: total_fragmented,
1630            page_size: self.mem.get_page_size(),
1631        })
1632    }
1633
1634    #[cfg(any(test, fuzzing))]
1635    pub fn num_region_tracker_pages(&self) -> u64 {
1636        1 << self.mem.tracker_page().page_order
1637    }
1638
1639    #[allow(dead_code)]
1640    pub(crate) fn print_debug(&self) -> Result {
1641        // Flush any pending updates to make sure we get the latest root
1642        let mut tables = self.tables.lock().unwrap();
1643        if let Some(page) = tables
1644            .table_tree
1645            .flush_table_root_updates()
1646            .unwrap()
1647            .finalize_dirty_checksums()
1648            .unwrap()
1649        {
1650            eprintln!("Master tree:");
1651            let master_tree: Btree<&str, InternalTableDefinition> = Btree::new(
1652                Some(page),
1653                PageHint::None,
1654                self.transaction_guard.clone(),
1655                self.mem.clone(),
1656            )?;
1657            master_tree.print_debug(true)?;
1658        }
1659
1660        Ok(())
1661    }
1662}
1663
1664impl Drop for WriteTransaction {
1665    fn drop(&mut self) {
1666        if !self.completed && !thread::panicking() && !self.mem.storage_failure() {
1667            #[allow(unused_variables)]
1668            if let Err(error) = self.abort_inner() {
1669                #[cfg(feature = "logging")]
1670                warn!("Failure automatically aborting transaction: {}", error);
1671            }
1672        }
1673    }
1674}
1675
1676/// A read-only transaction
1677///
1678/// Read-only transactions may exist concurrently with writes
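///
/// # Example
///
/// A minimal sketch (not compiled as a doctest); it assumes a `Database` like the one created
/// in the tests at the bottom of this file:
///
/// ```ignore
/// // A read transaction can be started while a write transaction is in progress;
/// // it sees the most recently committed state of the database.
/// let write_txn = db.begin_write()?;
/// let read_txn = db.begin_read()?;
/// // ... use both transactions ...
/// write_txn.commit()?;
/// read_txn.close()?;
/// ```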
1679pub struct ReadTransaction {
1680    mem: Arc<TransactionalMemory>,
1681    tree: TableTree,
1682}
1683
1684impl ReadTransaction {
1685    pub(crate) fn new(
1686        mem: Arc<TransactionalMemory>,
1687        guard: TransactionGuard,
1688    ) -> Result<Self, TransactionError> {
1689        let root_page = mem.get_data_root();
1690        let guard = Arc::new(guard);
1691        Ok(Self {
1692            mem: mem.clone(),
1693            tree: TableTree::new(root_page, PageHint::Clean, guard, mem)
1694                .map_err(TransactionError::Storage)?,
1695        })
1696    }
1697
1698    /// Open the given table
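    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); it assumes the table was previously
    /// created and populated by a write transaction, as in the tests at the bottom of this
    /// file, and that values are read through the `get()` accessor on the returned table:
    ///
    /// ```ignore
    /// const X: TableDefinition<&str, &str> = TableDefinition::new("x");
    ///
    /// let read_txn = db.begin_read()?;
    /// let table = read_txn.open_table(X)?;
    /// assert_eq!(table.get("hello")?.unwrap().value(), "world");
    /// ```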
1699    pub fn open_table<K: Key + 'static, V: Value + 'static>(
1700        &self,
1701        definition: TableDefinition<K, V>,
1702    ) -> Result<ReadOnlyTable<K, V>, TableError> {
1703        let header = self
1704            .tree
1705            .get_table::<K, V>(definition.name(), TableType::Normal)?
1706            .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
1707
1708        match header {
1709            InternalTableDefinition::Normal { table_root, .. } => Ok(ReadOnlyTable::new(
1710                definition.name().to_string(),
1711                table_root,
1712                PageHint::Clean,
1713                self.tree.transaction_guard().clone(),
1714                self.mem.clone(),
1715            )?),
1716            InternalTableDefinition::Multimap { .. } => unreachable!(),
1717        }
1718    }
1719
1720    /// Open the given table without a type
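    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); it assumes at least one table exists and
    /// uses a handle produced by [`Self::list_tables`]:
    ///
    /// ```ignore
    /// let read_txn = db.begin_read()?;
    /// if let Some(handle) = read_txn.list_tables()?.next() {
    ///     let _table = read_txn.open_untyped_table(handle)?;
    /// }
    /// ```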
1721    pub fn open_untyped_table(
1722        &self,
1723        handle: impl TableHandle,
1724    ) -> Result<ReadOnlyUntypedTable, TableError> {
1725        let header = self
1726            .tree
1727            .get_table_untyped(handle.name(), TableType::Normal)?
1728            .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
1729
1730        match header {
1731            InternalTableDefinition::Normal {
1732                table_root,
1733                fixed_key_size,
1734                fixed_value_size,
1735                ..
1736            } => Ok(ReadOnlyUntypedTable::new(
1737                table_root,
1738                fixed_key_size,
1739                fixed_value_size,
1740                self.mem.clone(),
1741            )),
1742            InternalTableDefinition::Multimap { .. } => unreachable!(),
1743        }
1744    }
1745
1746    /// Open the given multimap table
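    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); it assumes the multimap table was
    /// previously created by a write transaction and that the returned table exposes a
    /// `len()` accessor:
    ///
    /// ```ignore
    /// const M: MultimapTableDefinition<&str, &str> = MultimapTableDefinition::new("m");
    ///
    /// let read_txn = db.begin_read()?;
    /// let table = read_txn.open_multimap_table(M)?;
    /// println!("entries: {}", table.len()?);
    /// ```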
1747    pub fn open_multimap_table<K: Key + 'static, V: Key + 'static>(
1748        &self,
1749        definition: MultimapTableDefinition<K, V>,
1750    ) -> Result<ReadOnlyMultimapTable<K, V>, TableError> {
1751        let header = self
1752            .tree
1753            .get_table::<K, V>(definition.name(), TableType::Multimap)?
1754            .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
1755
1756        match header {
1757            InternalTableDefinition::Normal { .. } => unreachable!(),
1758            InternalTableDefinition::Multimap {
1759                table_root,
1760                table_length,
1761                ..
1762            } => Ok(ReadOnlyMultimapTable::new(
1763                table_root,
1764                table_length,
1765                PageHint::Clean,
1766                self.tree.transaction_guard().clone(),
1767                self.mem.clone(),
1768            )?),
1769        }
1770    }
1771
1772    /// Open the given multimap table without a type
1773    pub fn open_untyped_multimap_table(
1774        &self,
1775        handle: impl MultimapTableHandle,
1776    ) -> Result<ReadOnlyUntypedMultimapTable, TableError> {
1777        let header = self
1778            .tree
1779            .get_table_untyped(handle.name(), TableType::Multimap)?
1780            .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
1781
1782        match header {
1783            InternalTableDefinition::Normal { .. } => unreachable!(),
1784            InternalTableDefinition::Multimap {
1785                table_root,
1786                table_length,
1787                fixed_key_size,
1788                fixed_value_size,
1789                ..
1790            } => Ok(ReadOnlyUntypedMultimapTable::new(
1791                table_root,
1792                table_length,
1793                fixed_key_size,
1794                fixed_value_size,
1795                self.mem.clone(),
1796            )),
1797        }
1798    }
1799
1800    /// List all the tables
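    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); it prints the name of every normal
    /// (non-multimap) table, using the `name()` accessor on the returned handles:
    ///
    /// ```ignore
    /// let read_txn = db.begin_read()?;
    /// for handle in read_txn.list_tables()? {
    ///     println!("table: {}", handle.name());
    /// }
    /// ```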
1801    pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle>> {
1802        self.tree
1803            .list_tables(TableType::Normal)
1804            .map(|x| x.into_iter().map(UntypedTableHandle::new))
1805    }
1806
1807    /// List all the multimap tables
1808    pub fn list_multimap_tables(&self) -> Result<impl Iterator<Item = UntypedMultimapTableHandle>> {
1809        self.tree
1810            .list_tables(TableType::Multimap)
1811            .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
1812    }
1813
1814    /// Close the transaction
1815    ///
1816    /// Transactions are automatically closed when they and all objects referencing them have been dropped,
1817    /// so this method does not normally need to be called.
1818    /// This method can be used to ensure that there are no outstanding objects remaining.
1819    ///
1820    /// Returns a `ReadTransactionStillInUse` error if a table or other object retrieved from the transaction still references this transaction.
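    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); it assumes a previously created table
    /// definition `X`, and the table is dropped before the call so that nothing still
    /// references the transaction:
    ///
    /// ```ignore
    /// let read_txn = db.begin_read()?;
    /// {
    ///     let table = read_txn.open_table(X)?;
    ///     // ... read from `table` ...
    /// }
    /// // All objects referencing the transaction have been dropped, so this succeeds.
    /// read_txn.close()?;
    /// ```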
1821    pub fn close(self) -> Result<(), TransactionError> {
1822        if Arc::strong_count(self.tree.transaction_guard()) > 1 {
1823            return Err(TransactionError::ReadTransactionStillInUse(self));
1824        }
1825        // No-op, just drop ourself
1826        Ok(())
1827    }
1828}
1829
1830impl Debug for ReadTransaction {
1831    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1832        f.write_str("ReadTransaction")
1833    }
1834}
1835
1836#[cfg(test)]
1837mod test {
1838    use crate::{Database, TableDefinition};
1839
1840    const X: TableDefinition<&str, &str> = TableDefinition::new("x");
1841
1842    #[test]
1843    fn transaction_id_persistence() {
1844        let tmpfile = crate::create_tempfile();
1845        let db = Database::create(tmpfile.path()).unwrap();
1846        let write_txn = db.begin_write().unwrap();
1847        {
1848            let mut table = write_txn.open_table(X).unwrap();
1849            table.insert("hello", "world").unwrap();
1850        }
1851        let first_txn_id = write_txn.transaction_id;
1852        write_txn.commit().unwrap();
1853        drop(db);
1854
1855        let db2 = Database::create(tmpfile.path()).unwrap();
1856        let write_txn = db2.begin_write().unwrap();
1857        assert!(write_txn.transaction_id > first_txn_id);
1858    }
1859}