// redb/transactions.rs

1use crate::db::TransactionGuard;
2use crate::error::CommitError;
3use crate::multimap_table::ReadOnlyUntypedMultimapTable;
4use crate::sealed::Sealed;
5use crate::table::ReadOnlyUntypedTable;
6use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
7use crate::tree_store::{
8    Btree, BtreeHeader, BtreeMut, FreedPageList, FreedTableKey, InternalTableDefinition, Page,
9    PageHint, PageNumber, SerializedSavepoint, TableTree, TableTreeMut, TableType,
10    TransactionalMemory, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH,
11};
12use crate::types::{Key, Value};
13use crate::{
14    AccessGuard, MultimapTable, MultimapTableDefinition, MultimapTableHandle, Range,
15    ReadOnlyMultimapTable, ReadOnlyTable, Result, Savepoint, SavepointError, StorageError, Table,
16    TableDefinition, TableError, TableHandle, TransactionError, TypeName,
17    UntypedMultimapTableHandle, UntypedTableHandle,
18};
19#[cfg(feature = "logging")]
20use log::{debug, warn};
21use std::borrow::Borrow;
22use std::cmp::min;
23use std::collections::{BTreeMap, HashMap, HashSet};
24use std::fmt::{Debug, Display, Formatter};
25use std::marker::PhantomData;
26use std::mem::size_of;
27use std::ops::RangeBounds;
28#[cfg(any(test, fuzzing))]
29use std::ops::RangeFull;
30use std::sync::atomic::{AtomicBool, Ordering};
31use std::sync::{Arc, Mutex};
32use std::{panic, thread};
33
// Upper bound on pages processed in a single compaction pass.
// NOTE(review): the consuming code is outside this chunk — confirm at the call site.
const MAX_PAGES_PER_COMPACTION: usize = 1_000_000;
// System table holding the single counter from which persistent savepoint ids are handed out
const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> =
    SystemTableDefinition::new("next_savepoint_id");
// System table mapping savepoint id -> serialized savepoint state
pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition<SavepointId, SerializedSavepoint> =
    SystemTableDefinition::new("persistent_savepoints");
// The allocator state table is stored in the system table tree, but it's accessed using
// raw btree operations rather than open_system_table(), so there's no SystemTableDefinition
pub(crate) const ALLOCATOR_STATE_TABLE_NAME: &str = "allocator_state";
pub(crate) type AllocatorStateTree<'a> = BtreeMut<'a, AllocatorStateKey, &'static [u8]>;
43
/// Key for entries in the allocator state table.
///
/// Serialized (see the `Value` impl below) as a one-byte discriminant followed by a
/// little-endian `u32` payload, which is only meaningful for the `Region` variant.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
pub(crate) enum AllocatorStateKey {
    // Per-region entry; the u32 is presumably the region index — confirm against callers
    Region(u32),
    // Entry for the region tracker page state
    RegionTracker,
    // Entry recording the transaction id associated with the saved allocator state
    TransactionId,
}
50
51impl Value for AllocatorStateKey {
52    type SelfType<'a> = Self;
53    type AsBytes<'a> = [u8; 1 + size_of::<u32>()];
54
55    fn fixed_width() -> Option<usize> {
56        Some(1 + size_of::<u32>())
57    }
58
59    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
60    where
61        Self: 'a,
62    {
63        match data[0] {
64            0 => Self::Region(u32::from_le_bytes(data[1..].try_into().unwrap())),
65            1 => Self::RegionTracker,
66            2 => Self::TransactionId,
67            _ => unreachable!(),
68        }
69    }
70
71    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
72    where
73        Self: 'a,
74        Self: 'b,
75    {
76        let mut result = Self::AsBytes::default();
77        match value {
78            Self::Region(region) => {
79                result[0] = 0;
80                result[1..].copy_from_slice(&u32::to_le_bytes(*region));
81            }
82            Self::RegionTracker => {
83                result[0] = 1;
84            }
85            Self::TransactionId => {
86                result[0] = 2;
87            }
88        }
89
90        result
91    }
92
93    fn type_name() -> TypeName {
94        TypeName::internal("redb::AllocatorStateKey")
95    }
96}
97
98impl Key for AllocatorStateKey {
99    fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
100        Self::from_bytes(data1).cmp(&Self::from_bytes(data2))
101    }
102}
103
/// Name and key/value types of a table stored in the system table tree.
///
/// Analogous to `TableDefinition`, but for redb-internal tables opened via
/// `SystemNamespace::open_system_table()`.
pub struct SystemTableDefinition<'a, K: Key + 'static, V: Value + 'static> {
    name: &'a str,
    // Zero-sized markers tying this definition to its key/value types
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}
109
impl<'a, K: Key + 'static, V: Value + 'static> SystemTableDefinition<'a, K, V> {
    /// Creates a definition for the system table named `name`.
    ///
    /// # Panics
    /// Panics if `name` is empty (at compile time when evaluated in a `const` context).
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
120
impl<'a, K: Key + 'static, V: Value + 'static> TableHandle for SystemTableDefinition<'a, K, V> {
    /// Returns the table name this definition was created with.
    fn name(&self) -> &str {
        self.name
    }
}
126
impl<K: Key, V: Value> Sealed for SystemTableDefinition<'_, K, V> {}

// Clone and Copy are implemented manually: deriving them would add `K: Clone`/`V: Clone`
// bounds on the impls, even though only `PhantomData` of those types is stored.
impl<'a, K: Key + 'static, V: Value + 'static> Clone for SystemTableDefinition<'a, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<'a, K: Key + 'static, V: Value + 'static> Copy for SystemTableDefinition<'a, K, V> {}
136
137impl<'a, K: Key + 'static, V: Value + 'static> Display for SystemTableDefinition<'a, K, V> {
138    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
139        write!(
140            f,
141            "{}<{}, {}>",
142            self.name,
143            K::type_name().name(),
144            V::type_name().name()
145        )
146    }
147}
148
/// Informational storage stats about the database
#[derive(Debug)]
pub struct DatabaseStats {
    // Max btree depth across all tables; see tree_height()
    pub(crate) tree_height: u32,
    // Total pages allocated in the file
    pub(crate) allocated_pages: u64,
    // Leaf pages holding user data
    pub(crate) leaf_pages: u64,
    // Branch (internal) pages of user-data btrees
    pub(crate) branch_pages: u64,
    // Bytes of user keys+values stored in leaf pages
    pub(crate) stored_leaf_bytes: u64,
    // Branch-page keys and other bookkeeping bytes
    pub(crate) metadata_bytes: u64,
    // Bytes lost to fragmentation
    pub(crate) fragmented_bytes: u64,
    // Page size in bytes
    pub(crate) page_size: usize,
}
161
impl DatabaseStats {
    /// Maximum traversal distance to reach the deepest (key, value) pair, across all tables
    pub fn tree_height(&self) -> u32 {
        self.tree_height
    }

    /// Number of pages allocated
    pub fn allocated_pages(&self) -> u64 {
        self.allocated_pages
    }

    /// Number of leaf pages that store user data
    pub fn leaf_pages(&self) -> u64 {
        self.leaf_pages
    }

    /// Number of branch pages in btrees that store user data
    pub fn branch_pages(&self) -> u64 {
        self.branch_pages
    }

    /// Number of bytes consumed by keys and values that have been inserted.
    /// Does not include indexing overhead (see [`Self::metadata_bytes`])
    pub fn stored_bytes(&self) -> u64 {
        self.stored_leaf_bytes
    }

    /// Number of bytes consumed by keys in internal branch pages, plus other metadata
    pub fn metadata_bytes(&self) -> u64 {
        self.metadata_bytes
    }

    /// Number of bytes consumed by fragmentation, both in data pages and internal metadata tables
    pub fn fragmented_bytes(&self) -> u64 {
        self.fragmented_bytes
    }

    /// Number of bytes per page
    pub fn page_size(&self) -> usize {
        self.page_size
    }
}
204
/// Durability level applied when a [`WriteTransaction`] commits.
#[derive(Copy, Clone, Debug)]
#[non_exhaustive]
pub enum Durability {
    /// Commits with this durability level will not be persisted to disk unless followed by a
    /// commit with a higher durability level.
    ///
    /// Note: Pages are only freed during commits with higher durability levels. Exclusively using
    /// this durability level will result in rapid growth of the database file.
    None,
    /// Commits with this durability level have been queued for persistence to disk, and should be
    /// persistent some time after [`WriteTransaction::commit`] returns.
    Eventual,
    /// Commits with this durability level are guaranteed to be persistent as soon as
    /// [`WriteTransaction::commit`] returns.
    Immediate,
    /// This is identical to `Durability::Immediate`, but also enables 2-phase commit. New code
    /// should call `set_two_phase_commit(true)` directly instead.
    #[deprecated(since = "2.3.0", note = "use set_two_phase_commit(true) instead")]
    Paranoid,
}
225
// These are the actual durability levels used internally. `Durability::Paranoid` is translated
// to `InternalDurability::Immediate`, and also enables 2-phase commit
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum InternalDurability {
    // See `Durability::None`
    None,
    // See `Durability::Eventual`
    Eventual,
    // See `Durability::Immediate` (also used for the deprecated `Paranoid`)
    Immediate,
}
234
// Like a Table but only one may be open at a time to avoid possible races
pub struct SystemTable<'db, 's, K: Key + 'static, V: Value + 'static> {
    name: String,
    // Exclusive borrow of the namespace enforces the one-open-table invariant;
    // it also receives the updated root when this table is dropped
    namespace: &'s mut SystemNamespace<'db>,
    tree: BtreeMut<'s, K, V>,
    transaction_guard: Arc<TransactionGuard>,
}
242
243impl<'db, 's, K: Key + 'static, V: Value + 'static> SystemTable<'db, 's, K, V> {
244    fn new(
245        name: &str,
246        table_root: Option<BtreeHeader>,
247        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
248        guard: Arc<TransactionGuard>,
249        mem: Arc<TransactionalMemory>,
250        namespace: &'s mut SystemNamespace<'db>,
251    ) -> SystemTable<'db, 's, K, V> {
252        SystemTable {
253            name: name.to_string(),
254            namespace,
255            tree: BtreeMut::new(table_root, guard.clone(), mem, freed_pages),
256            transaction_guard: guard,
257        }
258    }
259
260    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<V>>>
261    where
262        K: 'a,
263    {
264        self.tree.get(key.borrow())
265    }
266
267    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<K, V>>
268    where
269        K: 'a,
270        KR: Borrow<K::SelfType<'a>> + 'a,
271    {
272        self.tree
273            .range(&range)
274            .map(|x| Range::new(x, self.transaction_guard.clone()))
275    }
276
277    pub fn insert<'k, 'v>(
278        &mut self,
279        key: impl Borrow<K::SelfType<'k>>,
280        value: impl Borrow<V::SelfType<'v>>,
281    ) -> Result<Option<AccessGuard<V>>> {
282        let value_len = V::as_bytes(value.borrow()).as_ref().len();
283        if value_len > MAX_VALUE_LENGTH {
284            return Err(StorageError::ValueTooLarge(value_len));
285        }
286        let key_len = K::as_bytes(key.borrow()).as_ref().len();
287        if key_len > MAX_VALUE_LENGTH {
288            return Err(StorageError::ValueTooLarge(key_len));
289        }
290        if value_len + key_len > MAX_PAIR_LENGTH {
291            return Err(StorageError::ValueTooLarge(value_len + key_len));
292        }
293        self.tree.insert(key.borrow(), value.borrow())
294    }
295
296    pub fn remove<'a>(
297        &mut self,
298        key: impl Borrow<K::SelfType<'a>>,
299    ) -> Result<Option<AccessGuard<V>>>
300    where
301        K: 'a,
302    {
303        self.tree.remove(key.borrow())
304    }
305}
306
307impl<'db, 's, K: Key + 'static, V: Value + 'static> Drop for SystemTable<'db, 's, K, V> {
308    fn drop(&mut self) {
309        self.namespace.close_table(
310            &self.name,
311            &self.tree,
312            self.tree.get_root().map(|x| x.length).unwrap_or_default(),
313        );
314    }
315}
316
// Owner of the system table tree. open_system_table() takes `&mut self`, which is
// what enforces that only one SystemTable is open at a time.
struct SystemNamespace<'db> {
    table_tree: TableTreeMut<'db>,
    transaction_guard: Arc<TransactionGuard>,
}
321
322impl<'db> SystemNamespace<'db> {
323    fn open_system_table<'txn, 's, K: Key + 'static, V: Value + 'static>(
324        &'s mut self,
325        transaction: &'txn WriteTransaction,
326        definition: SystemTableDefinition<K, V>,
327    ) -> Result<SystemTable<'db, 's, K, V>> {
328        #[cfg(feature = "logging")]
329        debug!("Opening system table: {}", definition);
330        let (root, _) = self
331            .table_tree
332            .get_or_create_table::<K, V>(definition.name(), TableType::Normal)
333            .map_err(|e| {
334                e.into_storage_error_or_corrupted("Internal error. System table is corrupted")
335            })?;
336        transaction.dirty.store(true, Ordering::Release);
337
338        Ok(SystemTable::new(
339            definition.name(),
340            root,
341            transaction.freed_pages.clone(),
342            self.transaction_guard.clone(),
343            transaction.mem.clone(),
344            self,
345        ))
346    }
347
348    fn close_table<K: Key + 'static, V: Value + 'static>(
349        &mut self,
350        name: &str,
351        table: &BtreeMut<K, V>,
352        length: u64,
353    ) {
354        self.table_tree
355            .stage_update_table_root(name, table.get_root(), length);
356    }
357}
358
// Tracks which user tables are open in this transaction, plus the user table tree.
struct TableNamespace<'db> {
    // Open tables, mapped to the source location that opened them
    // (used in TableError::TableAlreadyOpen diagnostics)
    open_tables: HashMap<String, &'static panic::Location<'static>>,
    table_tree: TableTreeMut<'db>,
}
363
364impl<'db> TableNamespace<'db> {
365    #[track_caller]
366    fn inner_open<K: Key + 'static, V: Value + 'static>(
367        &mut self,
368        name: &str,
369        table_type: TableType,
370    ) -> Result<(Option<BtreeHeader>, u64), TableError> {
371        if let Some(location) = self.open_tables.get(name) {
372            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
373        }
374
375        let root = self
376            .table_tree
377            .get_or_create_table::<K, V>(name, table_type)?;
378        self.open_tables
379            .insert(name.to_string(), panic::Location::caller());
380
381        Ok(root)
382    }
383
384    #[track_caller]
385    pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
386        &mut self,
387        transaction: &'txn WriteTransaction,
388        definition: MultimapTableDefinition<K, V>,
389    ) -> Result<MultimapTable<'txn, K, V>, TableError> {
390        #[cfg(feature = "logging")]
391        debug!("Opening multimap table: {}", definition);
392        let (root, length) = self.inner_open::<K, V>(definition.name(), TableType::Multimap)?;
393        transaction.dirty.store(true, Ordering::Release);
394
395        Ok(MultimapTable::new(
396            definition.name(),
397            root,
398            length,
399            transaction.freed_pages.clone(),
400            transaction.mem.clone(),
401            transaction,
402        ))
403    }
404
405    #[track_caller]
406    pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
407        &mut self,
408        transaction: &'txn WriteTransaction,
409        definition: TableDefinition<K, V>,
410    ) -> Result<Table<'txn, K, V>, TableError> {
411        #[cfg(feature = "logging")]
412        debug!("Opening table: {}", definition);
413        let (root, _) = self.inner_open::<K, V>(definition.name(), TableType::Normal)?;
414        transaction.dirty.store(true, Ordering::Release);
415
416        Ok(Table::new(
417            definition.name(),
418            root,
419            transaction.freed_pages.clone(),
420            transaction.mem.clone(),
421            transaction,
422        ))
423    }
424
425    #[track_caller]
426    fn inner_delete(&mut self, name: &str, table_type: TableType) -> Result<bool, TableError> {
427        if let Some(location) = self.open_tables.get(name) {
428            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
429        }
430
431        self.table_tree.delete_table(name, table_type)
432    }
433
434    #[track_caller]
435    fn delete_table(
436        &mut self,
437        transaction: &WriteTransaction,
438        name: &str,
439    ) -> Result<bool, TableError> {
440        #[cfg(feature = "logging")]
441        debug!("Deleting table: {}", name);
442        transaction.dirty.store(true, Ordering::Release);
443        self.inner_delete(name, TableType::Normal)
444    }
445
446    #[track_caller]
447    fn delete_multimap_table(
448        &mut self,
449        transaction: &WriteTransaction,
450        name: &str,
451    ) -> Result<bool, TableError> {
452        #[cfg(feature = "logging")]
453        debug!("Deleting multimap table: {}", name);
454        transaction.dirty.store(true, Ordering::Release);
455        self.inner_delete(name, TableType::Multimap)
456    }
457
458    pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
459        &mut self,
460        name: &str,
461        table: &BtreeMut<K, V>,
462        length: u64,
463    ) {
464        self.open_tables.remove(name).unwrap();
465        self.table_tree
466            .stage_update_table_root(name, table.get_root(), length);
467    }
468}
469
/// A read/write transaction
///
/// Only a single [`WriteTransaction`] may exist at a time
pub struct WriteTransaction {
    transaction_tracker: Arc<TransactionTracker>,
    mem: Arc<TransactionalMemory>,
    transaction_guard: Arc<TransactionGuard>,
    transaction_id: TransactionId,
    // The table of freed pages by transaction. FreedTableKey -> binary.
    // The binary blob is a length-prefixed array of PageNumber
    freed_tree: Mutex<BtreeMut<'static, FreedTableKey, FreedPageList<'static>>>,
    // Sink for pages freed by table operations; shared with both table trees
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    // Pages that were freed from the freed-tree. These can be freed immediately after commit(),
    // since read transactions do not access the freed-tree
    post_commit_frees: Arc<Mutex<Vec<PageNumber>>>,
    // User tables opened during this transaction
    tables: Mutex<TableNamespace<'static>>,
    // redb-internal tables (savepoints, allocator state)
    system_tables: Mutex<SystemNamespace<'static>>,
    // NOTE(review): presumably set by commit()/abort(); those methods are outside this chunk
    completed: bool,
    // Set as soon as any table is opened or deleted, or a savepoint is restored;
    // a dirty transaction cannot create savepoints
    dirty: AtomicBool,
    durability: InternalDurability,
    two_phase_commit: bool,
    quick_repair: bool,
    // Persistent savepoints created during this transaction
    created_persistent_savepoints: Mutex<HashSet<SavepointId>>,
    // Persistent savepoints deleted during this transaction (with their pinned txn ids)
    deleted_persistent_savepoints: Mutex<Vec<(SavepointId, TransactionId)>>,
}
496
497impl WriteTransaction {
    /// Constructs the write transaction, snapshotting the current data, system,
    /// and freed-tree roots from `mem`.
    pub(crate) fn new(
        guard: TransactionGuard,
        transaction_tracker: Arc<TransactionTracker>,
        mem: Arc<TransactionalMemory>,
    ) -> Result<Self> {
        let transaction_id = guard.id();
        let guard = Arc::new(guard);

        // Roots of the three btrees as of the start of this transaction
        let root_page = mem.get_data_root();
        let system_page = mem.get_system_root();
        let freed_root = mem.get_freed_root();
        // Pages freed by user/system table operations; shared with both table trees
        let freed_pages = Arc::new(Mutex::new(vec![]));
        // Pages freed from the freed-tree itself; releasable right after commit
        let post_commit_frees = Arc::new(Mutex::new(vec![]));

        let tables = TableNamespace {
            open_tables: Default::default(),
            table_tree: TableTreeMut::new(
                root_page,
                guard.clone(),
                mem.clone(),
                freed_pages.clone(),
            ),
        };
        let system_tables = SystemNamespace {
            table_tree: TableTreeMut::new(
                system_page,
                guard.clone(),
                mem.clone(),
                freed_pages.clone(),
            ),
            transaction_guard: guard.clone(),
        };

        Ok(Self {
            transaction_tracker,
            mem: mem.clone(),
            transaction_guard: guard.clone(),
            transaction_id,
            tables: Mutex::new(tables),
            system_tables: Mutex::new(system_tables),
            // The freed tree reports its own frees into post_commit_frees,
            // not freed_pages (see the field comments on the struct)
            freed_tree: Mutex::new(BtreeMut::new(
                freed_root,
                guard,
                mem,
                post_commit_frees.clone(),
            )),
            freed_pages,
            post_commit_frees,
            completed: false,
            dirty: AtomicBool::new(false),
            // Transactions default to Immediate durability
            durability: InternalDurability::Immediate,
            two_phase_commit: false,
            quick_repair: false,
            created_persistent_savepoints: Mutex::new(Default::default()),
            deleted_persistent_savepoints: Mutex::new(vec![]),
        })
    }
555
    /// Debug aid (test/fuzzing builds only): prints every allocated page, classified
    /// as tracker page, user-table page, system-table page, freed-tree page, or
    /// pending-free entry; anything left unaccounted is reported as leaked.
    #[cfg(any(test, fuzzing))]
    pub fn print_allocated_page_debug(&self) {
        // Start from the full set of allocated pages and remove each page as it
        // is attributed to a category below.
        let mut all_allocated: HashSet<PageNumber> =
            HashSet::from_iter(self.mem.all_allocated_pages());

        let tracker = self.mem.tracker_page();
        all_allocated.remove(&tracker);
        println!("Tracker page");
        println!("{tracker:?}");

        // Pages reachable from the user table tree
        let table_allocators = self
            .tables
            .lock()
            .unwrap()
            .table_tree
            .all_referenced_pages()
            .unwrap();
        let mut table_pages = vec![];
        for (i, allocator) in table_allocators.iter().enumerate() {
            allocator.get_allocated_pages(i.try_into().unwrap(), &mut table_pages);
        }
        println!("Tables");
        for p in table_pages {
            all_allocated.remove(&p);
            println!("{p:?}");
        }

        // Pages reachable from the system table tree
        let system_table_allocators = self
            .system_tables
            .lock()
            .unwrap()
            .table_tree
            .all_referenced_pages()
            .unwrap();
        let mut system_table_pages = vec![];
        for (i, allocator) in system_table_allocators.iter().enumerate() {
            allocator.get_allocated_pages(i.try_into().unwrap(), &mut system_table_pages);
        }
        println!("System tables");
        for p in system_table_pages {
            all_allocated.remove(&p);
            println!("{p:?}");
        }

        // Pages that make up the freed-tree itself
        println!("Free table");
        if let Some(freed_iter) = self.freed_tree.lock().unwrap().all_pages_iter().unwrap() {
            for p in freed_iter {
                let p = p.unwrap();
                all_allocated.remove(&p);
                println!("{p:?}");
            }
        }
        // Pages recorded in the freed-tree as pending free
        println!("Pending free (i.e. in freed table)");
        for entry in self
            .freed_tree
            .lock()
            .unwrap()
            .range::<RangeFull, FreedTableKey>(&(..))
            .unwrap()
        {
            let entry = entry.unwrap();
            let value = entry.value();
            for i in 0..value.len() {
                let p = value.get(i);
                all_allocated.remove(&p);
                println!("{p:?}");
            }
        }
        // Anything not attributed above is a leak
        if !all_allocated.is_empty() {
            println!("Leaked pages");
            for p in all_allocated {
                println!("{p:?}");
            }
        }
    }
631
    /// Creates a snapshot of the current database state, which can be used to rollback the database.
    /// This savepoint will exist until it is deleted with [`delete_savepoint()`].
    ///
    /// Note that while a savepoint exists, pages that become unused after it was created are not freed.
    /// Therefore, the lifetime of a savepoint should be minimized.
    ///
    /// Returns [`SavepointError::InvalidSavepoint`], if the transaction is "dirty" (any tables have been opened)
    /// or if the transaction's durability is less than [`Durability::Immediate`]
    pub fn persistent_savepoint(&self) -> Result<u64, SavepointError> {
        // Persistent savepoints are only permitted at Immediate durability
        if self.durability != InternalDurability::Immediate {
            return Err(SavepointError::InvalidSavepoint);
        }

        // The "dirty" check happens inside ephemeral_savepoint()
        let mut savepoint = self.ephemeral_savepoint()?;

        let mut system_tables = self.system_tables.lock().unwrap();

        // Advance the persistent-savepoint id counter past this savepoint's id
        let mut next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
        next_table.insert((), savepoint.get_id().next())?;
        drop(next_table);

        // Persist the savepoint itself
        let mut savepoint_table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
        savepoint_table.insert(
            savepoint.get_id(),
            SerializedSavepoint::from_savepoint(&savepoint),
        )?;

        savepoint.set_persistent();

        // Track the id — presumably used for cleanup if this transaction aborts
        self.created_persistent_savepoints
            .lock()
            .unwrap()
            .insert(savepoint.get_id());

        Ok(savepoint.get_id().0)
    }
668
669    pub(crate) fn transaction_guard(&self) -> Arc<TransactionGuard> {
670        self.transaction_guard.clone()
671    }
672
673    pub(crate) fn next_persistent_savepoint_id(&self) -> Result<Option<SavepointId>> {
674        let mut system_tables = self.system_tables.lock().unwrap();
675        let next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
676        let value = next_table.get(())?;
677        if let Some(next_id) = value {
678            Ok(Some(next_id.value()))
679        } else {
680            Ok(None)
681        }
682    }
683
684    /// Get a persistent savepoint given its id
685    pub fn get_persistent_savepoint(&self, id: u64) -> Result<Savepoint, SavepointError> {
686        let mut system_tables = self.system_tables.lock().unwrap();
687        let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
688        let value = table.get(SavepointId(id))?;
689
690        value
691            .map(|x| x.value().to_savepoint(self.transaction_tracker.clone()))
692            .ok_or(SavepointError::InvalidSavepoint)
693    }
694
695    /// Delete the given persistent savepoint.
696    ///
697    /// Note that if the transaction is `abort()`'ed this deletion will be rolled back.
698    ///
699    /// Returns `true` if the savepoint existed
700    /// Returns `[SavepointError::InvalidSavepoint`] if the transaction's durability is less than `[Durability::Immediate]`
701    pub fn delete_persistent_savepoint(&self, id: u64) -> Result<bool, SavepointError> {
702        if self.durability != InternalDurability::Immediate {
703            return Err(SavepointError::InvalidSavepoint);
704        }
705        let mut system_tables = self.system_tables.lock().unwrap();
706        let mut table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
707        let savepoint = table.remove(SavepointId(id))?;
708        if let Some(serialized) = savepoint {
709            let savepoint = serialized
710                .value()
711                .to_savepoint(self.transaction_tracker.clone());
712            self.deleted_persistent_savepoints
713                .lock()
714                .unwrap()
715                .push((savepoint.get_id(), savepoint.get_transaction_id()));
716            Ok(true)
717        } else {
718            Ok(false)
719        }
720    }
721
722    /// List all persistent savepoints
723    pub fn list_persistent_savepoints(&self) -> Result<impl Iterator<Item = u64>> {
724        let mut system_tables = self.system_tables.lock().unwrap();
725        let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
726        let mut savepoints = vec![];
727        for savepoint in table.range::<SavepointId>(..)? {
728            savepoints.push(savepoint?.0.value().0);
729        }
730        Ok(savepoints.into_iter())
731    }
732
733    // TODO: deduplicate this with the one in Database
734    fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
735        let id = self
736            .transaction_tracker
737            .register_read_transaction(&self.mem)?;
738
739        Ok(TransactionGuard::new_read(
740            id,
741            self.transaction_tracker.clone(),
742        ))
743    }
744
745    fn allocate_savepoint(&self) -> Result<(SavepointId, TransactionId)> {
746        let id = self.transaction_tracker.allocate_savepoint();
747        Ok((id, self.allocate_read_transaction()?.leak()))
748    }
749
    /// Creates a snapshot of the current database state, which can be used to rollback the database
    ///
    /// This savepoint will be freed as soon as the returned [`Savepoint`] is dropped.
    ///
    /// Returns [`SavepointError::InvalidSavepoint`], if the transaction is "dirty" (any tables have been opened)
    pub fn ephemeral_savepoint(&self) -> Result<Savepoint, SavepointError> {
        // A savepoint must capture a clean, committed state; any table activity
        // in this transaction makes that impossible
        if self.dirty.load(Ordering::Acquire) {
            return Err(SavepointError::InvalidSavepoint);
        }

        let (id, transaction_id) = self.allocate_savepoint()?;
        #[cfg(feature = "logging")]
        debug!(
            "Creating savepoint id={:?}, txn_id={:?}",
            id, transaction_id
        );

        // Capture the current roots and allocator state; these are what
        // restore_savepoint() rolls back to
        let regional_allocators = self.mem.get_raw_allocator_states();
        let root = self.mem.get_data_root();
        let system_root = self.mem.get_system_root();
        let freed_root = self.mem.get_freed_root();
        let savepoint = Savepoint::new_ephemeral(
            &self.mem,
            self.transaction_tracker.clone(),
            id,
            transaction_id,
            root,
            system_root,
            freed_root,
            regional_allocators,
        );

        Ok(savepoint)
    }
784
785    /// Restore the state of the database to the given [`Savepoint`]
786    ///
787    /// Calling this method invalidates all [`Savepoint`]s created after savepoint
788    pub fn restore_savepoint(&mut self, savepoint: &Savepoint) -> Result<(), SavepointError> {
789        // Ensure that user does not try to restore a Savepoint that is from a different Database
790        assert_eq!(
791            std::ptr::from_ref(self.transaction_tracker.as_ref()),
792            savepoint.db_address()
793        );
794
795        if !self
796            .transaction_tracker
797            .is_valid_savepoint(savepoint.get_id())
798        {
799            return Err(SavepointError::InvalidSavepoint);
800        }
801        #[cfg(feature = "logging")]
802        debug!(
803            "Beginning savepoint restore (id={:?}) in transaction id={:?}",
804            savepoint.get_id(),
805            self.transaction_id
806        );
807        // Restoring a savepoint that reverted a file format or checksum type change could corrupt
808        // the database
809        assert_eq!(self.mem.get_version(), savepoint.get_version());
810        self.dirty.store(true, Ordering::Release);
811
812        // Restoring a savepoint needs to accomplish the following:
813        // 1) restore the table tree. This is trivial, since we have the old root
814        // 1a) we also filter the freed tree to remove any pages referenced by the old root
815        // 2) free all pages that were allocated since the savepoint and are unreachable
816        //    from the restored table tree root. Here we diff the reachable pages from the old
817        //    and new roots
818        // 3) update the system tree to remove invalid persistent savepoints.
819
820        let old_table_tree = TableTreeMut::new(
821            savepoint.get_user_root(),
822            self.transaction_guard.clone(),
823            self.mem.clone(),
824            self.freed_pages.clone(),
825        );
826        // TODO: traversing these can be very slow in a large database. Speed this up.
827        let current_root_pages = self
828            .tables
829            .lock()
830            .unwrap()
831            .table_tree
832            .all_referenced_pages()?;
833        let old_root_pages = old_table_tree.all_referenced_pages()?;
834
835        // 1) restore the table tree
836        self.tables.lock().unwrap().table_tree = TableTreeMut::new(
837            savepoint.get_user_root(),
838            self.transaction_guard.clone(),
839            self.mem.clone(),
840            self.freed_pages.clone(),
841        );
842
843        // 1a) filter any pages referenced by the old data root to bring them back to the committed state
844        let mut txn_id = savepoint.get_transaction_id().raw_id();
845        let mut freed_tree = self.freed_tree.lock().unwrap();
846        loop {
847            let lower = FreedTableKey {
848                transaction_id: txn_id,
849                pagination_id: 0,
850            };
851
852            if freed_tree.range(&(lower..))?.next().is_none() {
853                break;
854            }
855            let lower = FreedTableKey {
856                transaction_id: txn_id,
857                pagination_id: 0,
858            };
859            let upper = FreedTableKey {
860                transaction_id: txn_id + 1,
861                pagination_id: 0,
862            };
863
864            // Find all the pending pages for this txn and filter them
865            let mut pending_pages = vec![];
866            for entry in freed_tree.extract_from_if(&(lower..upper), |_, _| true)? {
867                let item = entry?;
868                for i in 0..item.value().len() {
869                    let p = item.value().get(i);
870                    if !old_root_pages[p.region as usize].is_allocated(p.page_index, p.page_order) {
871                        pending_pages.push(p);
872                    }
873                }
874            }
875
876            let mut pagination_counter = 0u64;
877            while !pending_pages.is_empty() {
878                let chunk_size = 100;
879                let buffer_size = FreedPageList::required_bytes(chunk_size);
880                let key = FreedTableKey {
881                    transaction_id: txn_id,
882                    pagination_id: pagination_counter,
883                };
884                let mut access_guard =
885                    freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?;
886
887                let len = pending_pages.len();
888                access_guard.as_mut().clear();
889                for page in pending_pages.drain(len - min(len, chunk_size)..) {
890                    access_guard.as_mut().push_back(page);
891                }
892                drop(access_guard);
893
894                pagination_counter += 1;
895            }
896
897            txn_id += 1;
898        }
899
900        // 2) free all pages that became unreachable
901        let mut freed_pages = self.freed_pages.lock().unwrap();
902        for i in 0..current_root_pages.len() {
903            let mut pages = vec![];
904            current_root_pages[i].difference(i.try_into().unwrap(), &old_root_pages[i], &mut pages);
905            for page in pages {
906                if self.mem.uncommitted(page) {
907                    self.mem.free(page);
908                } else {
909                    freed_pages.push(page);
910                }
911            }
912        }
913        drop(freed_pages);
914
915        // 3) Invalidate all savepoints that are newer than the one being applied to prevent the user
916        // from later trying to restore a savepoint "on another timeline"
917        self.transaction_tracker
918            .invalidate_savepoints_after(savepoint.get_id());
919        for persistent_savepoint in self.list_persistent_savepoints()? {
920            if persistent_savepoint > savepoint.get_id().0 {
921                self.delete_persistent_savepoint(persistent_savepoint)?;
922            }
923        }
924
925        Ok(())
926    }
927
928    /// Set the desired durability level for writes made in this transaction
929    /// Defaults to [`Durability::Immediate`]
930    ///
931    /// Will panic if the durability is reduced below `[Durability::Immediate]` after a persistent savepoint has been created or deleted.
932    pub fn set_durability(&mut self, durability: Durability) {
933        let no_created = self
934            .created_persistent_savepoints
935            .lock()
936            .unwrap()
937            .is_empty();
938        let no_deleted = self
939            .deleted_persistent_savepoints
940            .lock()
941            .unwrap()
942            .is_empty();
943        assert!(no_created && no_deleted);
944
945        self.durability = match durability {
946            Durability::None => InternalDurability::None,
947            Durability::Eventual => InternalDurability::Eventual,
948            Durability::Immediate => InternalDurability::Immediate,
949            #[allow(deprecated)]
950            Durability::Paranoid => {
951                self.set_two_phase_commit(true);
952                InternalDurability::Immediate
953            }
954        };
955    }
956
    /// Enable or disable 2-phase commit (defaults to disabled)
    ///
    /// By default, data is written using the following 1-phase commit algorithm:
    ///
    /// 1. Update the inactive commit slot with the new database state
    /// 2. Flip the god byte primary bit to activate the newly updated commit slot
    /// 3. Call `fsync` to ensure all writes have been persisted to disk
    ///
    /// All data is written with checksums. When opening the database after a crash, the most
    /// recent of the two commit slots with a valid checksum is used.
    ///
    /// Security considerations: The checksum used is xxhash, a fast, non-cryptographic hash
    /// function with close to perfect collision resistance when used with non-malicious input. An
    /// attacker with an extremely high degree of control over the database's workload, including
    /// the ability to cause the database process to crash, can cause invalid data to be written
    /// with a valid checksum, leaving the database in an invalid, attacker-controlled state.
    ///
    /// Alternatively, you can enable 2-phase commit, which writes data like this:
    ///
    /// 1. Update the inactive commit slot with the new database state
    /// 2. Call `fsync` to ensure the database state and commit slot update have been persisted
    /// 3. Flip the god byte primary bit to activate the newly updated commit slot
    /// 4. Call `fsync` to ensure the write to the god byte has been persisted
    ///
    /// This mitigates a theoretical attack where an attacker who
    /// 1. can control the order in which pages are flushed to disk
    /// 2. can introduce crashes during `fsync`,
    /// 3. has knowledge of the database file contents, and
    /// 4. can include arbitrary data in a write transaction
    ///
    /// could cause a transaction to partially commit (some but not all of the data is written).
    /// This is described in the design doc in further detail.
    ///
    /// Security considerations: Many hard disk drives and SSDs do not actually guarantee that data
    /// has been persisted to disk after calling `fsync`. Even with 2-phase commit, an attacker with
    /// a high degree of control over the database's workload, including the ability to cause the
    /// database process to crash, can cause the database to crash with the god byte primary bit
    /// pointing to an invalid commit slot, leaving the database in an invalid, potentially attacker-
    /// controlled state.
    pub fn set_two_phase_commit(&mut self, enabled: bool) {
        self.two_phase_commit = enabled;
    }
999
    /// Enable or disable quick-repair (defaults to disabled)
    ///
    /// By default, when reopening the database after a crash, redb needs to do a full repair.
    /// This involves walking the entire database to verify the checksums and reconstruct the
    /// allocator state, so it can be very slow if the database is large.
    ///
    /// Alternatively, you can enable quick-repair. In this mode, redb saves the allocator state
    /// as part of each commit (so it doesn't need to be reconstructed), and enables 2-phase commit
    /// (which guarantees that the primary commit slot is valid without needing to look at the
    /// checksums). This means commits are slower, but recovery after a crash is almost instant.
    pub fn set_quick_repair(&mut self, enabled: bool) {
        // Checked in commit_inner(), which forces two_phase_commit on when this is enabled
        self.quick_repair = enabled;
    }
1013
1014    /// Open the given table
1015    ///
1016    /// The table will be created if it does not exist
1017    #[track_caller]
1018    pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
1019        &'txn self,
1020        definition: TableDefinition<K, V>,
1021    ) -> Result<Table<'txn, K, V>, TableError> {
1022        self.tables.lock().unwrap().open_table(self, definition)
1023    }
1024
1025    /// Open the given table
1026    ///
1027    /// The table will be created if it does not exist
1028    #[track_caller]
1029    pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
1030        &'txn self,
1031        definition: MultimapTableDefinition<K, V>,
1032    ) -> Result<MultimapTable<'txn, K, V>, TableError> {
1033        self.tables
1034            .lock()
1035            .unwrap()
1036            .open_multimap_table(self, definition)
1037    }
1038
1039    pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
1040        &self,
1041        name: &str,
1042        table: &BtreeMut<K, V>,
1043        length: u64,
1044    ) {
1045        self.tables.lock().unwrap().close_table(name, table, length);
1046    }
1047
1048    /// Delete the given table
1049    ///
1050    /// Returns a bool indicating whether the table existed
1051    pub fn delete_table(&self, definition: impl TableHandle) -> Result<bool, TableError> {
1052        let name = definition.name().to_string();
1053        // Drop the definition so that callers can pass in a `Table` or `MultimapTable` to delete, without getting a TableAlreadyOpen error
1054        drop(definition);
1055        self.tables.lock().unwrap().delete_table(self, &name)
1056    }
1057
1058    /// Delete the given table
1059    ///
1060    /// Returns a bool indicating whether the table existed
1061    pub fn delete_multimap_table(
1062        &self,
1063        definition: impl MultimapTableHandle,
1064    ) -> Result<bool, TableError> {
1065        let name = definition.name().to_string();
1066        // Drop the definition so that callers can pass in a `Table` or `MultimapTable` to delete, without getting a TableAlreadyOpen error
1067        drop(definition);
1068        self.tables
1069            .lock()
1070            .unwrap()
1071            .delete_multimap_table(self, &name)
1072    }
1073
1074    /// List all the tables
1075    pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle> + '_> {
1076        self.tables
1077            .lock()
1078            .unwrap()
1079            .table_tree
1080            .list_tables(TableType::Normal)
1081            .map(|x| x.into_iter().map(UntypedTableHandle::new))
1082    }
1083
1084    /// List all the multimap tables
1085    pub fn list_multimap_tables(
1086        &self,
1087    ) -> Result<impl Iterator<Item = UntypedMultimapTableHandle> + '_> {
1088        self.tables
1089            .lock()
1090            .unwrap()
1091            .table_tree
1092            .list_tables(TableType::Multimap)
1093            .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
1094    }
1095
    /// Commit the transaction
    ///
    /// All writes performed in this transaction will be visible to future transactions, and are
    /// durable as consistent with the [`Durability`] level set by [`Self::set_durability`]
    pub fn commit(mut self) -> Result<(), CommitError> {
        // Set completed flag first, so that we don't go through the abort() path on drop, if this fails.
        // Note: even if commit_inner() errors, Drop will therefore not attempt an automatic abort.
        self.completed = true;
        self.commit_inner()
    }
1105
1106    fn commit_inner(&mut self) -> Result<(), CommitError> {
1107        // Quick-repair requires 2-phase commit
1108        if self.quick_repair {
1109            self.two_phase_commit = true;
1110        }
1111
1112        #[cfg(feature = "logging")]
1113        debug!(
1114            "Committing transaction id={:?} with durability={:?} two_phase={} quick_repair={}",
1115            self.transaction_id, self.durability, self.two_phase_commit, self.quick_repair
1116        );
1117        match self.durability {
1118            InternalDurability::None => self.non_durable_commit()?,
1119            InternalDurability::Eventual => self.durable_commit(true)?,
1120            InternalDurability::Immediate => self.durable_commit(false)?,
1121        }
1122
1123        for (savepoint, transaction) in self.deleted_persistent_savepoints.lock().unwrap().iter() {
1124            self.transaction_tracker
1125                .deallocate_savepoint(*savepoint, *transaction);
1126        }
1127
1128        #[cfg(feature = "logging")]
1129        debug!(
1130            "Finished commit of transaction id={:?}",
1131            self.transaction_id
1132        );
1133
1134        Ok(())
1135    }
1136
    /// Abort the transaction
    ///
    /// All writes performed in this transaction will be rolled back
    pub fn abort(mut self) -> Result {
        // Set completed flag first, so that we don't go through the abort() path on drop, if this fails.
        // This prevents Drop from calling abort_inner() a second time.
        self.completed = true;
        self.abort_inner()
    }
1145
1146    fn abort_inner(&mut self) -> Result {
1147        #[cfg(feature = "logging")]
1148        debug!("Aborting transaction id={:?}", self.transaction_id);
1149        for savepoint in self.created_persistent_savepoints.lock().unwrap().iter() {
1150            match self.delete_persistent_savepoint(savepoint.0) {
1151                Ok(_) => {}
1152                Err(err) => match err {
1153                    SavepointError::InvalidSavepoint => {
1154                        unreachable!();
1155                    }
1156                    SavepointError::Storage(storage_err) => {
1157                        return Err(storage_err);
1158                    }
1159                },
1160            }
1161        }
1162        self.tables
1163            .lock()
1164            .unwrap()
1165            .table_tree
1166            .clear_table_root_updates();
1167        self.mem.rollback_uncommitted_writes()?;
1168        #[cfg(feature = "logging")]
1169        debug!("Finished abort of transaction id={:?}", self.transaction_id);
1170        Ok(())
1171    }
1172
    // Commit durably: release old freed pages, flush and checksum all three trees
    // (user, system, freed), then write the new roots via TransactionalMemory::commit().
    //
    // `eventual`: passed through to TransactionalMemory::commit(); `true` corresponds to
    // Durability::Eventual, `false` to Durability::Immediate (see commit_inner()).
    pub(crate) fn durable_commit(&mut self, eventual: bool) -> Result {
        // Pages freed by transactions older than every live read transaction can no longer
        // be referenced by any reader, so they are eligible to be fully freed now
        let free_until_transaction = self
            .transaction_tracker
            .oldest_live_read_transaction()
            .map_or(self.transaction_id, |x| x.next());
        self.process_freed_pages(free_until_transaction)?;

        let user_root = self
            .tables
            .lock()
            .unwrap()
            .table_tree
            .flush_table_root_updates()?
            .finalize_dirty_checksums()?;

        let mut system_tables = self.system_tables.lock().unwrap();
        let system_tree = system_tables.table_tree.flush_table_root_updates()?;
        // Remove any stale allocator state from a previous quick-repair commit; it is
        // re-created below if quick-repair is enabled for this commit
        system_tree
            .delete_table(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
            .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?;

        if self.quick_repair {
            system_tree.create_table_and_flush_table_root(
                ALLOCATOR_STATE_TABLE_NAME,
                |tree: &mut AllocatorStateTree| {
                    let mut pagination_counter = 0;

                    // Retry loop: saving the allocator state can itself allocate pages, which
                    // changes the allocator state. reserve + store + try_save iterates until
                    // the saved snapshot is consistent.
                    loop {
                        let num_regions = self
                            .mem
                            .reserve_allocator_state(tree, self.transaction_id)?;

                        // We can't free pages after the commit, because that would invalidate our
                        // saved allocator state. Everything needs to go through the transactional
                        // free mechanism
                        self.store_freed_pages(&mut pagination_counter, true)?;

                        if self.mem.try_save_allocator_state(tree, num_regions)? {
                            return Ok(());
                        }

                        // Clear out the table before retrying, just in case the number of regions
                        // has somehow shrunk. Don't use retain_in() for this, since it doesn't
                        // free the pages immediately -- we need to reuse those pages to guarantee
                        // that our retry loop will eventually terminate
                        while let Some(guards) = tree.last()? {
                            let key = guards.0.value();
                            drop(guards);
                            tree.remove(&key)?;
                        }
                    }
                },
            )?;
        } else {
            // If a savepoint exists it might reference the freed-tree, since it holds a reference to the
            // root of the freed-tree. Therefore, we must use the transactional free mechanism to free
            // those pages. If there are no save points then these can be immediately freed, which is
            // done at the end of this function.
            let savepoint_exists = self.transaction_tracker.any_savepoint_exists();
            self.store_freed_pages(&mut 0, savepoint_exists)?;
        }

        let system_root = system_tree.finalize_dirty_checksums()?;

        // Finalize freed table checksums, before doing the final commit
        let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?;

        self.mem.commit(
            user_root,
            system_root,
            freed_root,
            self.transaction_id,
            eventual,
            self.two_phase_commit,
        )?;

        // Mark any pending non-durable commits as fully committed.
        self.transaction_tracker.clear_pending_non_durable_commits();

        // Immediately free the pages that were freed from the freed-tree itself. These are only
        // accessed by write transactions, so it's safe to free them as soon as the commit is done.
        for page in self.post_commit_frees.lock().unwrap().drain(..) {
            self.mem.free(page);
        }

        Ok(())
    }
1260
1261    // Commit without a durability guarantee
1262    pub(crate) fn non_durable_commit(&mut self) -> Result {
1263        let user_root = self
1264            .tables
1265            .lock()
1266            .unwrap()
1267            .table_tree
1268            .flush_table_root_updates()?
1269            .finalize_dirty_checksums()?;
1270
1271        let system_root = self
1272            .system_tables
1273            .lock()
1274            .unwrap()
1275            .table_tree
1276            .flush_table_root_updates()?
1277            .finalize_dirty_checksums()?;
1278
1279        // Store all freed pages for a future commit(), since we can't free pages during a
1280        // non-durable commit (it's non-durable, so could be rolled back anytime in the future)
1281        self.store_freed_pages(&mut 0, true)?;
1282
1283        // Finalize all checksums, before doing the final commit
1284        let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?;
1285
1286        self.mem
1287            .non_durable_commit(user_root, system_root, freed_root, self.transaction_id)?;
1288        // Register this as a non-durable transaction to ensure that the freed pages we just pushed
1289        // are only processed after this has been persisted
1290        self.transaction_tracker
1291            .register_non_durable_commit(self.transaction_id);
1292        Ok(())
1293    }
1294
1295    // Relocate pages to lower number regions/pages
1296    // Returns true if a page(s) was moved
1297    pub(crate) fn compact_pages(&mut self) -> Result<bool> {
1298        let mut progress = false;
1299        // Relocate the region tracker page
1300        if self.mem.relocate_region_tracker()? {
1301            progress = true;
1302        }
1303
1304        // Find the 1M highest pages
1305        let mut highest_pages = BTreeMap::new();
1306        let mut tables = self.tables.lock().unwrap();
1307        let table_tree = &mut tables.table_tree;
1308        table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
1309        let mut system_tables = self.system_tables.lock().unwrap();
1310        let system_table_tree = &mut system_tables.table_tree;
1311        system_table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
1312
1313        // Calculate how many of them can be relocated to lower pages, starting from the last page
1314        let mut relocation_map = HashMap::new();
1315        for path in highest_pages.into_values().rev() {
1316            if relocation_map.contains_key(&path.page_number()) {
1317                continue;
1318            }
1319            let old_page = self.mem.get_page(path.page_number())?;
1320            let mut new_page = self.mem.allocate_lowest(old_page.memory().len())?;
1321            let new_page_number = new_page.get_page_number();
1322            // We have to copy at least the page type into the new page.
1323            // Otherwise its cache priority will be calculated incorrectly
1324            new_page.memory_mut()[0] = old_page.memory()[0];
1325            drop(new_page);
1326            // We're able to move this to a lower page, so insert it and rewrite all its parents
1327            if new_page_number < path.page_number() {
1328                relocation_map.insert(path.page_number(), new_page_number);
1329                for parent in path.parents() {
1330                    if relocation_map.contains_key(parent) {
1331                        continue;
1332                    }
1333                    let old_parent = self.mem.get_page(*parent)?;
1334                    let mut new_page = self.mem.allocate_lowest(old_parent.memory().len())?;
1335                    let new_page_number = new_page.get_page_number();
1336                    // We have to copy at least the page type into the new page.
1337                    // Otherwise its cache priority will be calculated incorrectly
1338                    new_page.memory_mut()[0] = old_parent.memory()[0];
1339                    drop(new_page);
1340                    relocation_map.insert(*parent, new_page_number);
1341                }
1342            } else {
1343                self.mem.free(new_page_number);
1344                break;
1345            }
1346        }
1347
1348        if !relocation_map.is_empty() {
1349            progress = true;
1350        }
1351
1352        table_tree.relocate_tables(&relocation_map)?;
1353        system_table_tree.relocate_tables(&relocation_map)?;
1354
1355        Ok(progress)
1356    }
1357
1358    // NOTE: must be called before store_freed_pages() during commit, since this can create
1359    // more pages freed by the current transaction
1360    fn process_freed_pages(&mut self, free_until: TransactionId) -> Result {
1361        // We assume below that PageNumber is length 8
1362        assert_eq!(PageNumber::serialized_size(), 8);
1363        let lookup_key = FreedTableKey {
1364            transaction_id: free_until.raw_id(),
1365            pagination_id: 0,
1366        };
1367
1368        let mut to_remove = vec![];
1369        let mut freed_tree = self.freed_tree.lock().unwrap();
1370        for entry in freed_tree.range(&(..lookup_key))? {
1371            let entry = entry?;
1372            to_remove.push(entry.key());
1373            let value = entry.value();
1374            for i in 0..value.len() {
1375                self.mem.free(value.get(i));
1376            }
1377        }
1378
1379        // Remove all the old transactions
1380        for key in to_remove {
1381            freed_tree.remove(&key)?;
1382        }
1383
1384        Ok(())
1385    }
1386
    // Drain self.freed_pages into the freed tree, keyed by (transaction_id, pagination_id),
    // in chunks of up to 100 pages per entry.
    //
    // `pagination_counter`: the next pagination_id to use (incremented per chunk written);
    // passed by reference so the quick-repair retry loop in durable_commit() can continue
    // numbering across calls.
    // `include_post_commit_free`: when true, pages queued in post_commit_frees are folded
    // into freed_pages and stored transactionally instead of being freed after commit.
    fn store_freed_pages(
        &self,
        pagination_counter: &mut u64,
        include_post_commit_free: bool,
    ) -> Result {
        assert_eq!(PageNumber::serialized_size(), 8); // We assume below that PageNumber is length 8

        let mut freed_tree = self.freed_tree.lock().unwrap();
        if include_post_commit_free {
            // Move all the post-commit pages that came from the freed-tree. These need to be stored
            // since we can't free pages until a durable commit
            self.freed_pages
                .lock()
                .unwrap()
                .extend(self.post_commit_frees.lock().unwrap().drain(..));
        }
        // NOTE: the freed_pages lock is re-acquired each iteration rather than held across
        // the loop, because insert_reserve() below may itself add pages to the queues
        while !self.freed_pages.lock().unwrap().is_empty() {
            let chunk_size = 100;
            let buffer_size = FreedPageList::required_bytes(chunk_size);
            let key = FreedTableKey {
                transaction_id: self.transaction_id.raw_id(),
                pagination_id: *pagination_counter,
            };
            // Reserve space in the freed tree, then fill it with up to chunk_size pages
            let mut access_guard =
                freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?;

            let mut freed_pages = self.freed_pages.lock().unwrap();
            let len = freed_pages.len();
            access_guard.as_mut().clear();
            // Drain from the tail so the drain is O(chunk) rather than shifting the vec
            for page in freed_pages.drain(len - min(len, chunk_size)..) {
                access_guard.as_mut().push_back(page);
            }
            drop(access_guard);

            *pagination_counter += 1;

            if include_post_commit_free {
                // Move all the post-commit pages that came from the freed-tree. These need to be stored
                // since we can't free pages until a durable commit
                freed_pages.extend(self.post_commit_frees.lock().unwrap().drain(..));
            }
        }

        Ok(())
    }
1432
1433    /// Retrieves information about storage usage in the database
1434    pub fn stats(&self) -> Result<DatabaseStats> {
1435        let tables = self.tables.lock().unwrap();
1436        let table_tree = &tables.table_tree;
1437        let data_tree_stats = table_tree.stats()?;
1438
1439        let system_tables = self.system_tables.lock().unwrap();
1440        let system_table_tree = &system_tables.table_tree;
1441        let system_tree_stats = system_table_tree.stats()?;
1442
1443        let freed_tree_stats = self.freed_tree.lock().unwrap().stats()?;
1444
1445        let total_metadata_bytes = data_tree_stats.metadata_bytes()
1446            + system_tree_stats.metadata_bytes
1447            + system_tree_stats.stored_leaf_bytes
1448            + freed_tree_stats.metadata_bytes
1449            + freed_tree_stats.stored_leaf_bytes;
1450        let total_fragmented = data_tree_stats.fragmented_bytes()
1451            + system_tree_stats.fragmented_bytes
1452            + freed_tree_stats.fragmented_bytes;
1453
1454        Ok(DatabaseStats {
1455            tree_height: data_tree_stats.tree_height(),
1456            allocated_pages: self.mem.count_allocated_pages()?,
1457            leaf_pages: data_tree_stats.leaf_pages(),
1458            branch_pages: data_tree_stats.branch_pages(),
1459            stored_leaf_bytes: data_tree_stats.stored_bytes(),
1460            metadata_bytes: total_metadata_bytes,
1461            fragmented_bytes: total_fragmented,
1462            page_size: self.mem.get_page_size(),
1463        })
1464    }
1465
    #[cfg(any(test, fuzzing))]
    // Number of pages occupied by the region tracker: its allocation covers
    // 2^page_order contiguous pages
    pub fn num_region_tracker_pages(&self) -> u64 {
        1 << self.mem.tracker_page().page_order
    }
1470
1471    #[allow(dead_code)]
1472    pub(crate) fn print_debug(&self) -> Result {
1473        // Flush any pending updates to make sure we get the latest root
1474        let mut tables = self.tables.lock().unwrap();
1475        if let Some(page) = tables
1476            .table_tree
1477            .flush_table_root_updates()
1478            .unwrap()
1479            .finalize_dirty_checksums()
1480            .unwrap()
1481        {
1482            eprintln!("Master tree:");
1483            let master_tree: Btree<&str, InternalTableDefinition> = Btree::new(
1484                Some(page),
1485                PageHint::None,
1486                self.transaction_guard.clone(),
1487                self.mem.clone(),
1488            )?;
1489            master_tree.print_debug(true)?;
1490        }
1491
1492        Ok(())
1493    }
1494}
1495
1496impl Drop for WriteTransaction {
1497    fn drop(&mut self) {
1498        if !self.completed && !thread::panicking() && !self.mem.storage_failure() {
1499            #[allow(unused_variables)]
1500            if let Err(error) = self.abort_inner() {
1501                #[cfg(feature = "logging")]
1502                warn!("Failure automatically aborting transaction: {}", error);
1503            }
1504        }
1505    }
1506}
1507
/// A read-only transaction
///
/// Read-only transactions may exist concurrently with writes
pub struct ReadTransaction {
    // Shared handle to the database's page storage
    mem: Arc<TransactionalMemory>,
    // Read-only table tree rooted at the data root captured when this transaction began
    tree: TableTree,
}
1515
1516impl ReadTransaction {
1517    pub(crate) fn new(
1518        mem: Arc<TransactionalMemory>,
1519        guard: TransactionGuard,
1520    ) -> Result<Self, TransactionError> {
1521        let root_page = mem.get_data_root();
1522        let guard = Arc::new(guard);
1523        Ok(Self {
1524            mem: mem.clone(),
1525            tree: TableTree::new(root_page, PageHint::Clean, guard, mem)
1526                .map_err(TransactionError::Storage)?,
1527        })
1528    }
1529
1530    /// Open the given table
1531    pub fn open_table<K: Key + 'static, V: Value + 'static>(
1532        &self,
1533        definition: TableDefinition<K, V>,
1534    ) -> Result<ReadOnlyTable<K, V>, TableError> {
1535        let header = self
1536            .tree
1537            .get_table::<K, V>(definition.name(), TableType::Normal)?
1538            .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
1539
1540        match header {
1541            InternalTableDefinition::Normal { table_root, .. } => Ok(ReadOnlyTable::new(
1542                definition.name().to_string(),
1543                table_root,
1544                PageHint::Clean,
1545                self.tree.transaction_guard().clone(),
1546                self.mem.clone(),
1547            )?),
1548            InternalTableDefinition::Multimap { .. } => unreachable!(),
1549        }
1550    }
1551
1552    /// Open the given table without a type
1553    pub fn open_untyped_table(
1554        &self,
1555        handle: impl TableHandle,
1556    ) -> Result<ReadOnlyUntypedTable, TableError> {
1557        let header = self
1558            .tree
1559            .get_table_untyped(handle.name(), TableType::Normal)?
1560            .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
1561
1562        match header {
1563            InternalTableDefinition::Normal {
1564                table_root,
1565                fixed_key_size,
1566                fixed_value_size,
1567                ..
1568            } => Ok(ReadOnlyUntypedTable::new(
1569                table_root,
1570                fixed_key_size,
1571                fixed_value_size,
1572                self.mem.clone(),
1573            )),
1574            InternalTableDefinition::Multimap { .. } => unreachable!(),
1575        }
1576    }
1577
1578    /// Open the given table
1579    pub fn open_multimap_table<K: Key + 'static, V: Key + 'static>(
1580        &self,
1581        definition: MultimapTableDefinition<K, V>,
1582    ) -> Result<ReadOnlyMultimapTable<K, V>, TableError> {
1583        let header = self
1584            .tree
1585            .get_table::<K, V>(definition.name(), TableType::Multimap)?
1586            .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
1587
1588        match header {
1589            InternalTableDefinition::Normal { .. } => unreachable!(),
1590            InternalTableDefinition::Multimap {
1591                table_root,
1592                table_length,
1593                ..
1594            } => Ok(ReadOnlyMultimapTable::new(
1595                table_root,
1596                table_length,
1597                PageHint::Clean,
1598                self.tree.transaction_guard().clone(),
1599                self.mem.clone(),
1600            )?),
1601        }
1602    }
1603
1604    /// Open the given table without a type
1605    pub fn open_untyped_multimap_table(
1606        &self,
1607        handle: impl MultimapTableHandle,
1608    ) -> Result<ReadOnlyUntypedMultimapTable, TableError> {
1609        let header = self
1610            .tree
1611            .get_table_untyped(handle.name(), TableType::Multimap)?
1612            .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
1613
1614        match header {
1615            InternalTableDefinition::Normal { .. } => unreachable!(),
1616            InternalTableDefinition::Multimap {
1617                table_root,
1618                table_length,
1619                fixed_key_size,
1620                fixed_value_size,
1621                ..
1622            } => Ok(ReadOnlyUntypedMultimapTable::new(
1623                table_root,
1624                table_length,
1625                fixed_key_size,
1626                fixed_value_size,
1627                self.mem.clone(),
1628            )),
1629        }
1630    }
1631
1632    /// List all the tables
1633    pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle>> {
1634        self.tree
1635            .list_tables(TableType::Normal)
1636            .map(|x| x.into_iter().map(UntypedTableHandle::new))
1637    }
1638
1639    /// List all the multimap tables
1640    pub fn list_multimap_tables(&self) -> Result<impl Iterator<Item = UntypedMultimapTableHandle>> {
1641        self.tree
1642            .list_tables(TableType::Multimap)
1643            .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
1644    }
1645
1646    /// Close the transaction
1647    ///
1648    /// Transactions are automatically closed when they and all objects referencing them have been dropped,
1649    /// so this method does not normally need to be called.
1650    /// This method can be used to ensure that there are no outstanding objects remaining.
1651    ///
1652    /// Returns `ReadTransactionStillInUse` error if a table or other object retrieved from the transaction still references this transaction
1653    pub fn close(self) -> Result<(), TransactionError> {
1654        if Arc::strong_count(self.tree.transaction_guard()) > 1 {
1655            return Err(TransactionError::ReadTransactionStillInUse(self));
1656        }
1657        // No-op, just drop ourself
1658        Ok(())
1659    }
1660}
1661
1662impl Debug for ReadTransaction {
1663    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1664        f.write_str("ReadTransaction")
1665    }
1666}
1667
#[cfg(test)]
mod test {
    use crate::{Database, TableDefinition};

    const X: TableDefinition<&str, &str> = TableDefinition::new("x");

    /// Transaction ids must survive a close/reopen of the database file and
    /// keep increasing monotonically.
    #[test]
    fn transaction_id_persistence() {
        let tmpfile = crate::create_tempfile();

        // First session: perform one committed write and remember its id.
        // The scope ensures the database is dropped before it is reopened.
        let first_txn_id = {
            let db = Database::create(tmpfile.path()).unwrap();
            let write_txn = db.begin_write().unwrap();
            {
                let mut table = write_txn.open_table(X).unwrap();
                table.insert("hello", "world").unwrap();
            }
            let id = write_txn.transaction_id;
            write_txn.commit().unwrap();
            id
        };

        // Second session: a fresh write transaction must get a strictly larger id.
        let db2 = Database::create(tmpfile.path()).unwrap();
        let write_txn = db2.begin_write().unwrap();
        assert!(write_txn.transaction_id > first_txn_id);
    }
}