// redb — transactions.rs

1use crate::db::TransactionGuard;
2use crate::error::CommitError;
3use crate::multimap_table::ReadOnlyUntypedMultimapTable;
4use crate::sealed::Sealed;
5use crate::table::ReadOnlyUntypedTable;
6use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
7use crate::tree_store::{
8    Btree, BtreeHeader, BtreeMut, BuddyAllocator, FreedPageList, FreedTableKey,
9    InternalTableDefinition, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, Page, PageHint, PageListMut,
10    PageNumber, PageTrackerPolicy, SerializedSavepoint, TableTree, TableTreeMut, TableType,
11    TransactionalMemory,
12};
13use crate::types::{Key, Value};
14use crate::{
15    AccessGuard, AccessGuardMut, ExtractIf, MultimapTable, MultimapTableDefinition,
16    MultimapTableHandle, MutInPlaceValue, Range, ReadOnlyMultimapTable, ReadOnlyTable, Result,
17    Savepoint, SavepointError, StorageError, Table, TableDefinition, TableError, TableHandle,
18    TransactionError, TypeName, UntypedMultimapTableHandle, UntypedTableHandle,
19};
20#[cfg(feature = "logging")]
21use log::{debug, warn};
22use std::borrow::Borrow;
23use std::cmp::min;
24use std::collections::{BTreeMap, HashMap, HashSet};
25use std::fmt::{Debug, Display, Formatter};
26use std::marker::PhantomData;
27use std::mem::size_of;
28use std::ops::RangeBounds;
29use std::ops::RangeFull;
30use std::sync::atomic::{AtomicBool, Ordering};
31use std::sync::{Arc, Mutex};
32use std::{panic, thread};
33
// Upper bound on pages handled in one compaction pass.
// NOTE(review): usage site is further down in this file — confirm intent there.
const MAX_PAGES_PER_COMPACTION: usize = 1_000_000;
// Counter table (unit key => at most one row) holding the next SavepointId to hand out
const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> =
    SystemTableDefinition::new("next_savepoint_id");
// Savepoints that persist across transactions, serialized and keyed by their id
pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition<SavepointId, SerializedSavepoint> =
    SystemTableDefinition::new("persistent_savepoints");
// Pages that were allocated in the data tree by a given transaction. Only updated when a savepoint
// exists
pub(crate) const DATA_ALLOCATED_TABLE: SystemTableDefinition<
    TransactionIdWithPagination,
    PageList,
> = SystemTableDefinition::new("data_pages_allocated");
// Pages in the data tree that are in the pending free state: i.e., they are unreachable from the
// root as of the given transaction.
pub(crate) const DATA_FREED_TABLE: SystemTableDefinition<TransactionIdWithPagination, PageList> =
    SystemTableDefinition::new("data_pages_unreachable");
// Pages in the system tree that are in the pending free state: i.e., they are unreachable from the
// root as of the given transaction.
pub(crate) const SYSTEM_FREED_TABLE: SystemTableDefinition<TransactionIdWithPagination, PageList> =
    SystemTableDefinition::new("system_pages_unreachable");
// The allocator state table is stored in the system table tree, but it's accessed using
// raw btree operations rather than open_system_table(), so there's no SystemTableDefinition
pub(crate) const ALLOCATOR_STATE_TABLE_NAME: &str = "allocator_state";
pub(crate) type AllocatorStateTree<'a> = BtreeMut<'a, AllocatorStateKey, &'static [u8]>;
// Mutable btree view with the same key/value types as the *_FREED tables above
pub(crate) type SystemFreedTree<'a> = BtreeMut<'a, TransactionIdWithPagination, PageList<'static>>;
58
// Format:
// 2 bytes: length
// length * size_of(PageNumber): array of page numbers
/// Zero-copy read-only view over a serialized list of `PageNumber`s (format above).
#[derive(Debug)]
pub(crate) struct PageList<'a> {
    // Raw serialized bytes, borrowed from the storage the value was read from
    data: &'a [u8],
}
66
67impl PageList<'_> {
68    fn required_bytes(len: usize) -> usize {
69        2 + PageNumber::serialized_size() * len
70    }
71
72    pub(crate) fn len(&self) -> usize {
73        u16::from_le_bytes(self.data[..size_of::<u16>()].try_into().unwrap()).into()
74    }
75
76    pub(crate) fn get(&self, index: usize) -> PageNumber {
77        let start = size_of::<u16>() + PageNumber::serialized_size() * index;
78        PageNumber::from_le_bytes(
79            self.data[start..(start + PageNumber::serialized_size())]
80                .try_into()
81                .unwrap(),
82        )
83    }
84}
85
impl Value for PageList<'_> {
    type SelfType<'a>
        = PageList<'a>
    where
        Self: 'a;
    type AsBytes<'a>
        = &'a [u8]
    where
        Self: 'a;

    // Variable width: depends on how many page numbers are stored
    fn fixed_width() -> Option<usize> {
        None
    }

    // Zero-copy: just wraps the serialized bytes
    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        PageList { data }
    }

    // Zero-copy: the value already holds its serialized form
    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'b [u8]
    where
        Self: 'b,
    {
        value.data
    }

    fn type_name() -> TypeName {
        TypeName::internal("redb::PageList")
    }
}
118
impl MutInPlaceValue for PageList<'_> {
    type BaseRefType = PageListMut;

    fn initialize(data: &mut [u8]) {
        assert!(data.len() >= 8);
        // Set the length to zero
        // NOTE(review): zeroes 8 bytes even though the length header is only 2 bytes —
        // presumably PageListMut expects this; confirm at PageListMut's definition.
        data[..8].fill(0);
    }

    fn from_bytes_mut(data: &mut [u8]) -> &mut Self::BaseRefType {
        // SAFETY: reinterprets the byte slice as PageListMut. This is sound only if
        // PageListMut is a transparent wrapper around [u8] with identical layout —
        // TODO(review): confirm #[repr(transparent)] at PageListMut's definition.
        unsafe { &mut *(std::ptr::from_mut::<[u8]>(data) as *mut PageListMut) }
    }
}
132
/// Composite key: a transaction id plus a per-transaction chunk ordinal.
#[derive(Debug)]
pub(crate) struct TransactionIdWithPagination {
    // Id of the transaction the entry belongs to
    pub(crate) transaction_id: u64,
    // Ordinal of this chunk within the transaction — presumably lets one
    // transaction's page list span multiple rows; confirm at the usage sites
    pub(crate) pagination_id: u64,
}
138
139impl Value for TransactionIdWithPagination {
140    type SelfType<'a>
141        = TransactionIdWithPagination
142    where
143        Self: 'a;
144    type AsBytes<'a>
145        = [u8; 2 * size_of::<u64>()]
146    where
147        Self: 'a;
148
149    fn fixed_width() -> Option<usize> {
150        Some(2 * size_of::<u64>())
151    }
152
153    fn from_bytes<'a>(data: &'a [u8]) -> Self
154    where
155        Self: 'a,
156    {
157        let transaction_id = u64::from_le_bytes(data[..size_of::<u64>()].try_into().unwrap());
158        let pagination_id = u64::from_le_bytes(data[size_of::<u64>()..].try_into().unwrap());
159        Self {
160            transaction_id,
161            pagination_id,
162        }
163    }
164
165    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; 2 * size_of::<u64>()]
166    where
167        Self: 'b,
168    {
169        let mut result = [0u8; 2 * size_of::<u64>()];
170        result[..size_of::<u64>()].copy_from_slice(&value.transaction_id.to_le_bytes());
171        result[size_of::<u64>()..].copy_from_slice(&value.pagination_id.to_le_bytes());
172        result
173    }
174
175    fn type_name() -> TypeName {
176        TypeName::internal("redb::TransactionIdWithPagination")
177    }
178}
179
180impl Key for TransactionIdWithPagination {
181    fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
182        let value1 = Self::from_bytes(data1);
183        let value2 = Self::from_bytes(data2);
184
185        match value1.transaction_id.cmp(&value2.transaction_id) {
186            std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
187            std::cmp::Ordering::Equal => value1.pagination_id.cmp(&value2.pagination_id),
188            std::cmp::Ordering::Less => std::cmp::Ordering::Less,
189        }
190    }
191}
192
/// Key space of the allocator state table: one `Region` entry per region index,
/// plus singleton entries for the region tracker and a transaction id.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
pub(crate) enum AllocatorStateKey {
    Region(u32),
    RegionTracker,
    TransactionId,
}
199
impl Value for AllocatorStateKey {
    type SelfType<'a> = Self;
    type AsBytes<'a> = [u8; 1 + size_of::<u32>()];

    // Fixed width: 1 tag byte + 4-byte region payload (zero-padded for tag-only variants)
    fn fixed_width() -> Option<usize> {
        Some(1 + size_of::<u32>())
    }

    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        match data[0] {
            0 => Self::Region(u32::from_le_bytes(data[1..].try_into().unwrap())),
            1 => Self::RegionTracker,
            2 => Self::TransactionId,
            // Any other tag means the serialized bytes are corrupt; this panics
            _ => unreachable!(),
        }
    }

    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
    where
        Self: 'a,
        Self: 'b,
    {
        // Buffer starts zeroed, so tag-only variants leave the payload bytes as zero
        let mut result = Self::AsBytes::default();
        match value {
            Self::Region(region) => {
                result[0] = 0;
                result[1..].copy_from_slice(&u32::to_le_bytes(*region));
            }
            Self::RegionTracker => {
                result[0] = 1;
            }
            Self::TransactionId => {
                result[0] = 2;
            }
        }

        result
    }

    fn type_name() -> TypeName {
        TypeName::internal("redb::AllocatorStateKey")
    }
}
246
impl Key for AllocatorStateKey {
    // Deserialize and compare via the derived Ord:
    // Region(_) < RegionTracker < TransactionId, regions ordered by index
    fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
        Self::from_bytes(data1).cmp(&Self::from_bytes(data2))
    }
}
252
/// Name plus key/value types of a redb-internal table stored in the system table
/// tree — analogous to the public `TableDefinition`.
pub struct SystemTableDefinition<'a, K: Key + 'static, V: Value + 'static> {
    name: &'a str,
    // Zero-sized markers pinning the key/value types without storing them
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}
258
impl<'a, K: Key + 'static, V: Value + 'static> SystemTableDefinition<'a, K, V> {
    /// `const` so definitions can live in `const` items (see the tables above).
    ///
    /// # Panics
    /// Panics if `name` is empty (a compile error when evaluated in a const context).
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
269
impl<K: Key + 'static, V: Value + 'static> TableHandle for SystemTableDefinition<'_, K, V> {
    // The table's name as stored in the system table tree
    fn name(&self) -> &str {
        self.name
    }
}
275
impl<K: Key, V: Value> Sealed for SystemTableDefinition<'_, K, V> {}

// Manual Clone/Copy: derive would add K: Clone / V: Copy bounds that the
// PhantomData fields don't actually require.
impl<K: Key + 'static, V: Value + 'static> Clone for SystemTableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<K: Key + 'static, V: Value + 'static> Copy for SystemTableDefinition<'_, K, V> {}
285
286impl<K: Key + 'static, V: Value + 'static> Display for SystemTableDefinition<'_, K, V> {
287    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
288        write!(
289            f,
290            "{}<{}, {}>",
291            self.name,
292            K::type_name().name(),
293            V::type_name().name()
294        )
295    }
296}
297
/// Informational storage stats about the database
#[derive(Debug)]
pub struct DatabaseStats {
    // Max btree height across all tables
    pub(crate) tree_height: u32,
    pub(crate) allocated_pages: u64,
    pub(crate) leaf_pages: u64,
    pub(crate) branch_pages: u64,
    // Bytes of user keys+values; exposed via stored_bytes()
    pub(crate) stored_leaf_bytes: u64,
    pub(crate) metadata_bytes: u64,
    pub(crate) fragmented_bytes: u64,
    pub(crate) page_size: usize,
}
310
// Read-only getters; all values are computed at stats-collection time
impl DatabaseStats {
    /// Maximum traversal distance to reach the deepest (key, value) pair, across all tables
    pub fn tree_height(&self) -> u32 {
        self.tree_height
    }

    /// Number of pages allocated
    pub fn allocated_pages(&self) -> u64 {
        self.allocated_pages
    }

    /// Number of leaf pages that store user data
    pub fn leaf_pages(&self) -> u64 {
        self.leaf_pages
    }

    /// Number of branch pages in btrees that store user data
    pub fn branch_pages(&self) -> u64 {
        self.branch_pages
    }

    /// Number of bytes consumed by keys and values that have been inserted.
    /// Does not include indexing overhead
    pub fn stored_bytes(&self) -> u64 {
        // Note: getter name differs from the field name (stored_leaf_bytes)
        self.stored_leaf_bytes
    }

    /// Number of bytes consumed by keys in internal branch pages, plus other metadata
    pub fn metadata_bytes(&self) -> u64 {
        self.metadata_bytes
    }

    /// Number of bytes consumed by fragmentation, both in data pages and internal metadata tables
    pub fn fragmented_bytes(&self) -> u64 {
        self.fragmented_bytes
    }

    /// Number of bytes per page
    pub fn page_size(&self) -> usize {
        self.page_size
    }
}
353
/// Durability level of a commit, from weakest (`None`) to strongest (`Immediate`).
#[derive(Copy, Clone, Debug)]
#[non_exhaustive]
pub enum Durability {
    /// Commits with this durability level will not be persisted to disk unless followed by a
    /// commit with a higher durability level.
    ///
    /// Note: Pages are only freed during commits with higher durability levels. Exclusively using
    /// this durability level will result in rapid growth of the database file.
    None,
    /// Commits with this durability level have been queued for persistence to disk, and should be
    /// persistent some time after [`WriteTransaction::commit`] returns.
    Eventual,
    /// Commits with this durability level are guaranteed to be persistent as soon as
    /// [`WriteTransaction::commit`] returns.
    Immediate,
    /// This is identical to `Durability::Immediate`, but also enables 2-phase commit. New code
    /// should call `set_two_phase_commit(true)` directly instead.
    #[deprecated(since = "2.3.0", note = "use set_two_phase_commit(true) instead")]
    Paranoid,
}
374
// These are the actual durability levels used internally. `Durability::Paranoid` is translated
// to `InternalDurability::Immediate`, and also enables 2-phase commit
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum InternalDurability {
    None,
    Eventual,
    Immediate,
}
383
// Like a Table but only one may be open at a time to avoid possible races
pub struct SystemTable<'db, 's, K: Key + 'static, V: Value + 'static> {
    name: String,
    // The exclusive borrow enforces single-open; the namespace also receives the
    // updated root when this table is dropped (see the Drop impl below)
    namespace: &'s mut SystemNamespace<'db>,
    tree: BtreeMut<'s, K, V>,
    // Keeps the owning transaction alive for the Range guards handed out
    transaction_guard: Arc<TransactionGuard>,
}
391
impl<'db, 's, K: Key + 'static, V: Value + 'static> SystemTable<'db, 's, K, V> {
    /// Wraps `table_root` in a mutable btree; pages freed while mutating it are
    /// pushed onto `freed_pages` (the system tree's freed list).
    fn new(
        name: &str,
        table_root: Option<BtreeHeader>,
        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
        guard: Arc<TransactionGuard>,
        mem: Arc<TransactionalMemory>,
        namespace: &'s mut SystemNamespace<'db>,
    ) -> SystemTable<'db, 's, K, V> {
        // No need to track allocations in the system tree. Savepoint restoration only relies on
        // freeing in the data tree
        let ignore = Arc::new(Mutex::new(PageTrackerPolicy::Ignore));
        SystemTable {
            name: name.to_string(),
            namespace,
            tree: BtreeMut::new(table_root, guard.clone(), mem, freed_pages, ignore),
            transaction_guard: guard,
        }
    }

    /// Looks up a single key; `None` if absent.
    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<V>>>
    where
        K: 'a,
    {
        self.tree.get(key.borrow())
    }

    /// Iterates the given key range; the returned Range holds the transaction guard
    /// so the transaction outlives the iteration.
    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<K, V>>
    where
        K: 'a,
        KR: Borrow<K::SelfType<'a>> + 'a,
    {
        self.tree
            .range(&range)
            .map(|x| Range::new(x, self.transaction_guard.clone()))
    }

    /// Removes and yields entries in `range` for which `predicate` returns true.
    pub fn extract_from_if<'a, KR, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>(
        &mut self,
        range: impl RangeBounds<KR> + 'a,
        predicate: F,
    ) -> Result<ExtractIf<K, V, F>>
    where
        KR: Borrow<K::SelfType<'a>> + 'a,
    {
        self.tree
            .extract_from_if(&range, predicate)
            .map(ExtractIf::new)
    }

    /// Inserts a key/value pair, returning the previous value if any.
    ///
    /// # Errors
    /// `StorageError::ValueTooLarge` if the value, the key, or the pair exceeds
    /// the respective size limit.
    /// NOTE(review): an oversized key is also reported as ValueTooLarge — confirm
    /// this matches the error convention used by the public Table::insert.
    pub fn insert<'k, 'v>(
        &mut self,
        key: impl Borrow<K::SelfType<'k>>,
        value: impl Borrow<V::SelfType<'v>>,
    ) -> Result<Option<AccessGuard<V>>> {
        let value_len = V::as_bytes(value.borrow()).as_ref().len();
        if value_len > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(value_len));
        }
        let key_len = K::as_bytes(key.borrow()).as_ref().len();
        if key_len > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(key_len));
        }
        if value_len + key_len > MAX_PAIR_LENGTH {
            return Err(StorageError::ValueTooLarge(value_len + key_len));
        }
        self.tree.insert(key.borrow(), value.borrow())
    }

    /// Removes a key, returning its previous value if it was present.
    pub fn remove<'a>(
        &mut self,
        key: impl Borrow<K::SelfType<'a>>,
    ) -> Result<Option<AccessGuard<V>>>
    where
        K: 'a,
    {
        self.tree.remove(key.borrow())
    }
}
471
// insert_reserve is only available for values that can be mutated in place
impl<K: Key + 'static, V: MutInPlaceValue + 'static> SystemTable<'_, '_, K, V> {
    /// Inserts `key` with an uninitialized value of `value_length` bytes and
    /// returns a guard for writing it in place.
    ///
    /// # Errors
    /// `StorageError::ValueTooLarge` under the same size limits as `insert`.
    pub fn insert_reserve<'a>(
        &mut self,
        key: impl Borrow<K::SelfType<'a>>,
        value_length: u32,
    ) -> Result<AccessGuardMut<V>> {
        if value_length as usize > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(value_length as usize));
        }
        let key_len = K::as_bytes(key.borrow()).as_ref().len();
        if key_len > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(key_len));
        }
        if value_length as usize + key_len > MAX_PAIR_LENGTH {
            return Err(StorageError::ValueTooLarge(value_length as usize + key_len));
        }
        self.tree.insert_reserve(key.borrow(), value_length)
    }
}
491
492impl<K: Key + 'static, V: Value + 'static> Drop for SystemTable<'_, '_, K, V> {
493    fn drop(&mut self) {
494        self.namespace.close_table(
495            &self.name,
496            &self.tree,
497            self.tree.get_root().map(|x| x.length).unwrap_or_default(),
498        );
499    }
500}
501
/// Owner of the system table tree for a write transaction; system tables are
/// opened through it (one at a time — see SystemTable).
struct SystemNamespace<'db> {
    table_tree: TableTreeMut<'db>,
    // Pages freed from the system tree during this transaction
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    transaction_guard: Arc<TransactionGuard>,
}
507
508impl<'db> SystemNamespace<'db> {
509    fn new(
510        root_page: Option<BtreeHeader>,
511        guard: Arc<TransactionGuard>,
512        mem: Arc<TransactionalMemory>,
513    ) -> Self {
514        // No need to track allocations in the system tree. Savepoint restoration only relies on
515        // freeing in the data tree
516        let ignore = Arc::new(Mutex::new(PageTrackerPolicy::Ignore));
517        let freed_pages = Arc::new(Mutex::new(vec![]));
518        Self {
519            table_tree: TableTreeMut::new(
520                root_page,
521                guard.clone(),
522                mem,
523                freed_pages.clone(),
524                ignore,
525            ),
526            freed_pages,
527            transaction_guard: guard.clone(),
528        }
529    }
530
531    fn system_freed_pages(&self) -> Arc<Mutex<Vec<PageNumber>>> {
532        self.freed_pages.clone()
533    }
534
535    fn open_system_table<'txn, 's, K: Key + 'static, V: Value + 'static>(
536        &'s mut self,
537        transaction: &'txn WriteTransaction,
538        definition: SystemTableDefinition<K, V>,
539    ) -> Result<SystemTable<'db, 's, K, V>> {
540        #[cfg(feature = "logging")]
541        debug!("Opening system table: {}", definition);
542        let (root, _) = self
543            .table_tree
544            .get_or_create_table::<K, V>(definition.name(), TableType::Normal)
545            .map_err(|e| {
546                e.into_storage_error_or_corrupted("Internal error. System table is corrupted")
547            })?;
548        transaction.dirty.store(true, Ordering::Release);
549
550        Ok(SystemTable::new(
551            definition.name(),
552            root,
553            self.freed_pages.clone(),
554            self.transaction_guard.clone(),
555            transaction.mem.clone(),
556            self,
557        ))
558    }
559
560    fn close_table<K: Key + 'static, V: Value + 'static>(
561        &mut self,
562        name: &str,
563        table: &BtreeMut<K, V>,
564        length: u64,
565    ) {
566        self.table_tree
567            .stage_update_table_root(name, table.get_root(), length);
568    }
569}
570
/// Owner of the user-data table tree for a write transaction; tracks which tables
/// are currently open so double-opens can be rejected.
struct TableNamespace<'db> {
    // Caller location of each currently-open table, keyed by table name;
    // used to build TableError::TableAlreadyOpen diagnostics
    open_tables: HashMap<String, &'static panic::Location<'static>>,
    // Policy/state for tracking pages allocated in the data tree
    allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    table_tree: TableTreeMut<'db>,
}
577
impl TableNamespace<'_> {
    /// Builds the namespace around the data tree's `root_page`. Allocation tracking
    /// is enabled only for the v3 file format.
    fn new(
        root_page: Option<BtreeHeader>,
        guard: Arc<TransactionGuard>,
        mem: Arc<TransactionalMemory>,
    ) -> Self {
        let allocated = if mem.file_format_v3() {
            Arc::new(Mutex::new(PageTrackerPolicy::new_tracking()))
        } else {
            Arc::new(Mutex::new(PageTrackerPolicy::Ignore))
        };
        let freed_pages = Arc::new(Mutex::new(vec![]));
        let table_tree = TableTreeMut::new(
            root_page,
            guard,
            mem,
            // Committed pages which are no longer reachable and will be queued for free'ing
            // These are separated from the system freed pages
            freed_pages.clone(),
            allocated.clone(),
        );
        Self {
            open_tables: Default::default(),
            table_tree,
            freed_pages,
            allocated_pages: allocated,
        }
    }

    /// Marks the transaction dirty; if no savepoint exists, allocation tracking is
    /// switched off since nothing will need to undo these allocations.
    fn set_dirty(&mut self, transaction: &WriteTransaction) {
        transaction.dirty.store(true, Ordering::Release);
        if !transaction.transaction_tracker.any_savepoint_exists() {
            // No savepoints exist, and we don't allow savepoints to be created in a dirty transaction
            // so we can disable allocation tracking now
            *self.allocated_pages.lock().unwrap() = PageTrackerPolicy::Ignore;
        }
    }

    /// Replaces the data tree root; only legal while no tables are open.
    fn set_root(&mut self, root: Option<BtreeHeader>) {
        assert!(self.open_tables.is_empty());
        self.table_tree.set_root(root);
    }

    /// Shared open path: rejects double-opens, then gets/creates the table and
    /// records the caller's location for diagnostics.
    #[track_caller]
    fn inner_open<K: Key + 'static, V: Value + 'static>(
        &mut self,
        name: &str,
        table_type: TableType,
    ) -> Result<(Option<BtreeHeader>, u64), TableError> {
        if let Some(location) = self.open_tables.get(name) {
            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
        }

        let root = self
            .table_tree
            .get_or_create_table::<K, V>(name, table_type)?;
        self.open_tables
            .insert(name.to_string(), panic::Location::caller());

        Ok(root)
    }

    /// Opens (creating if needed) a multimap table; marks the transaction dirty.
    #[track_caller]
    pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
        &mut self,
        transaction: &'txn WriteTransaction,
        definition: MultimapTableDefinition<K, V>,
    ) -> Result<MultimapTable<'txn, K, V>, TableError> {
        #[cfg(feature = "logging")]
        debug!("Opening multimap table: {}", definition);
        let (root, length) = self.inner_open::<K, V>(definition.name(), TableType::Multimap)?;
        self.set_dirty(transaction);

        Ok(MultimapTable::new(
            definition.name(),
            root,
            length,
            self.freed_pages.clone(),
            self.allocated_pages.clone(),
            transaction.mem.clone(),
            transaction,
        ))
    }

    /// Opens (creating if needed) a normal table; marks the transaction dirty.
    #[track_caller]
    pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
        &mut self,
        transaction: &'txn WriteTransaction,
        definition: TableDefinition<K, V>,
    ) -> Result<Table<'txn, K, V>, TableError> {
        #[cfg(feature = "logging")]
        debug!("Opening table: {}", definition);
        let (root, _) = self.inner_open::<K, V>(definition.name(), TableType::Normal)?;
        self.set_dirty(transaction);

        Ok(Table::new(
            definition.name(),
            root,
            self.freed_pages.clone(),
            self.allocated_pages.clone(),
            transaction.mem.clone(),
            transaction,
        ))
    }

    /// Shared rename path: rejects the rename while the table is open.
    #[track_caller]
    fn inner_rename(
        &mut self,
        name: &str,
        new_name: &str,
        table_type: TableType,
    ) -> Result<(), TableError> {
        if let Some(location) = self.open_tables.get(name) {
            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
        }

        self.table_tree.rename_table(name, new_name, table_type)
    }

    /// Renames a normal table; marks the transaction dirty.
    #[track_caller]
    fn rename_table(
        &mut self,
        transaction: &WriteTransaction,
        name: &str,
        new_name: &str,
    ) -> Result<(), TableError> {
        #[cfg(feature = "logging")]
        debug!("Renaming table: {} to {}", name, new_name);
        self.set_dirty(transaction);
        self.inner_rename(name, new_name, TableType::Normal)
    }

    /// Renames a multimap table; marks the transaction dirty.
    #[track_caller]
    fn rename_multimap_table(
        &mut self,
        transaction: &WriteTransaction,
        name: &str,
        new_name: &str,
    ) -> Result<(), TableError> {
        #[cfg(feature = "logging")]
        debug!("Renaming multimap table: {} to {}", name, new_name);
        self.set_dirty(transaction);
        self.inner_rename(name, new_name, TableType::Multimap)
    }

    /// Shared delete path: rejects the delete while the table is open.
    /// Returns whether a table was actually deleted.
    #[track_caller]
    fn inner_delete(&mut self, name: &str, table_type: TableType) -> Result<bool, TableError> {
        if let Some(location) = self.open_tables.get(name) {
            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
        }

        self.table_tree.delete_table(name, table_type)
    }

    /// Deletes a normal table; marks the transaction dirty.
    #[track_caller]
    fn delete_table(
        &mut self,
        transaction: &WriteTransaction,
        name: &str,
    ) -> Result<bool, TableError> {
        #[cfg(feature = "logging")]
        debug!("Deleting table: {}", name);
        self.set_dirty(transaction);
        self.inner_delete(name, TableType::Normal)
    }

    /// Deletes a multimap table; marks the transaction dirty.
    #[track_caller]
    fn delete_multimap_table(
        &mut self,
        transaction: &WriteTransaction,
        name: &str,
    ) -> Result<bool, TableError> {
        #[cfg(feature = "logging")]
        debug!("Deleting multimap table: {}", name);
        self.set_dirty(transaction);
        self.inner_delete(name, TableType::Multimap)
    }

    /// Releases the open-table entry and stages the table's new root for commit.
    /// Panics if the table wasn't recorded as open (an internal invariant).
    pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
        &mut self,
        name: &str,
        table: &BtreeMut<K, V>,
        length: u64,
    ) {
        self.open_tables.remove(name).unwrap();
        self.table_tree
            .stage_update_table_root(name, table.get_root(), length);
    }
}
767
/// A read/write transaction
///
/// Only a single [`WriteTransaction`] may exist at a time
pub struct WriteTransaction {
    transaction_tracker: Arc<TransactionTracker>,
    mem: Arc<TransactionalMemory>,
    transaction_guard: Arc<TransactionGuard>,
    transaction_id: TransactionId,
    // The table of freed pages by transaction. FreedTableKey -> binary.
    // The binary blob is a length-prefixed array of PageNumber
    freed_tree: Mutex<BtreeMut<'static, FreedTableKey, FreedPageList<'static>>>,
    // Legacy freed pages list. This is only for pages where we don't know whether they originated
    // in the data or the system tree
    legacy_freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    // Pages that were freed from the freed-tree. These can be freed immediately after commit(),
    // since read transactions do not access the freed-tree
    post_commit_frees: Arc<Mutex<Vec<PageNumber>>>,
    // User-data tables namespace
    tables: Mutex<TableNamespace<'static>>,
    // Internal system tables namespace
    system_tables: Mutex<SystemNamespace<'static>>,
    // Presumably set once the transaction has committed or aborted — confirm at those sites
    completed: bool,
    // Set (Release ordering) as soon as any table is opened/renamed/deleted
    dirty: AtomicBool,
    durability: InternalDurability,
    // Commit-strategy flags; see Durability::Paranoid and set_two_phase_commit
    two_phase_commit: bool,
    quick_repair: bool,
    // Persistent savepoints created during this transaction
    created_persistent_savepoints: Mutex<HashSet<SavepointId>>,
    deleted_persistent_savepoints: Mutex<Vec<(SavepointId, TransactionId)>>,
}
796
797impl WriteTransaction {
    /// Builds a write transaction from the current data/system/freed roots held
    /// by `mem`. Starts clean (not dirty) with `Immediate` durability.
    pub(crate) fn new(
        guard: TransactionGuard,
        transaction_tracker: Arc<TransactionTracker>,
        mem: Arc<TransactionalMemory>,
    ) -> Result<Self> {
        let transaction_id = guard.id();
        let guard = Arc::new(guard);

        // Snapshot the three tree roots as of this transaction's start
        let root_page = mem.get_data_root();
        let system_page = mem.get_system_root();
        let freed_root = mem.get_freed_root();
        let post_commit_frees = Arc::new(Mutex::new(vec![]));

        let tables = TableNamespace::new(root_page, guard.clone(), mem.clone());
        let system_tables = SystemNamespace::new(system_page, guard.clone(), mem.clone());
        // No need to track allocations in the system tree. Savepoint restoration only relies on
        // freeing in the data tree
        let ignore = Arc::new(Mutex::new(PageTrackerPolicy::Ignore));

        Ok(Self {
            transaction_tracker,
            mem: mem.clone(),
            transaction_guard: guard.clone(),
            transaction_id,
            tables: Mutex::new(tables),
            system_tables: Mutex::new(system_tables),
            // Pages freed while mutating the freed-tree itself go straight to
            // post_commit_frees (safe to free right after commit)
            freed_tree: Mutex::new(BtreeMut::new(
                freed_root,
                guard,
                mem,
                post_commit_frees.clone(),
                ignore,
            )),
            legacy_freed_pages: Arc::new(Mutex::new(vec![])),
            post_commit_frees,
            completed: false,
            dirty: AtomicBool::new(false),
            durability: InternalDurability::Immediate,
            two_phase_commit: false,
            quick_repair: false,
            created_persistent_savepoints: Mutex::new(Default::default()),
            deleted_persistent_savepoints: Mutex::new(vec![]),
        })
    }
842
    /// Debug checks that freed-page bookkeeping matches the file format version:
    /// v3 files must have an empty legacy freed tree, while pre-v3 files must have
    /// empty v3-style allocated/freed system tables.
    pub(crate) fn version_asserts(&self) -> Result {
        if self.mem.file_format_v3() {
            assert_eq!(self.freed_tree.lock().unwrap().len()?, 0);
        } else {
            let mut system_tables = self.system_tables.lock().unwrap();
            // Each table is dropped before the next is opened: only one system
            // table may be open at a time
            let data_allocated = system_tables.open_system_table(self, DATA_ALLOCATED_TABLE)?;
            assert!(
                data_allocated
                    .range::<TransactionIdWithPagination>(..)?
                    .next()
                    .is_none()
            );
            drop(data_allocated);
            let data_freed = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
            assert!(
                data_freed
                    .range::<TransactionIdWithPagination>(..)?
                    .next()
                    .is_none()
            );
            drop(data_freed);
            let system_freed = system_tables.open_system_table(self, SYSTEM_FREED_TABLE)?;
            assert!(
                system_freed
                    .range::<TransactionIdWithPagination>(..)?
                    .next()
                    .is_none()
            );
        }

        Ok(())
    }
875
876    pub(crate) fn pending_free_pages(&self) -> Result<bool> {
877        if self.mem.get_freed_root().is_some() {
878            return Ok(true);
879        }
880        let mut system_tables = self.system_tables.lock().unwrap();
881        if system_tables
882            .open_system_table(self, DATA_FREED_TABLE)?
883            .tree
884            .get_root()
885            .is_some()
886        {
887            return Ok(true);
888        }
889        if system_tables
890            .open_system_table(self, SYSTEM_FREED_TABLE)?
891            .tree
892            .get_root()
893            .is_some()
894        {
895            return Ok(true);
896        }
897
898        Ok(false)
899    }
900
    /// Test/fuzzing helper: prints every allocated page grouped by what references it
    /// (tracker page, table trees, freed trees/tables, in-memory pending-free lists),
    /// then prints whatever remains as "Leaked pages".
    ///
    /// Works by starting from the full set of allocated pages reported by `self.mem`
    /// and removing each page as a referent is found; anything left at the end is
    /// unaccounted for.
    #[cfg(any(test, fuzzing))]
    pub fn print_allocated_page_debug(&self) {
        let mut all_allocated: HashSet<PageNumber> =
            HashSet::from_iter(self.mem.all_allocated_pages());

        // The allocator tracker page is always allocated and never part of any tree.
        let tracker = self.mem.tracker_page();
        all_allocated.remove(&tracker);
        println!("Tracker page");
        println!("{tracker:?}");

        // Pages reachable from the user data table tree.
        let mut table_pages = vec![];
        self.tables
            .lock()
            .unwrap()
            .table_tree
            .visit_all_pages(|path| {
                table_pages.push(path.page_number());
                Ok(())
            })
            .unwrap();
        println!("Tables");
        for p in table_pages {
            all_allocated.remove(&p);
            println!("{p:?}");
        }

        // Pages reachable from the system table tree.
        let mut system_table_pages = vec![];
        self.system_tables
            .lock()
            .unwrap()
            .table_tree
            .visit_all_pages(|path| {
                system_table_pages.push(path.page_number());
                Ok(())
            })
            .unwrap();
        println!("System tables");
        for p in system_table_pages {
            all_allocated.remove(&p);
            println!("{p:?}");
        }

        // Pages that make up the legacy freed tree itself (its internal nodes/leaves).
        println!("Free table");
        if let Some(freed_iter) = self.freed_tree.lock().unwrap().all_pages_iter().unwrap() {
            for p in freed_iter {
                let p = p.unwrap();
                all_allocated.remove(&p);
                println!("{p:?}");
            }
        }
        // Pages listed as pending-free inside the legacy freed tree's entries.
        println!("Pending free (in legacy freed tree)");
        for entry in self
            .freed_tree
            .lock()
            .unwrap()
            .range::<RangeFull, FreedTableKey>(&(..))
            .unwrap()
        {
            let entry = entry.unwrap();
            let value = entry.value();
            for i in 0..value.len() {
                let p = value.get(i);
                all_allocated.remove(&p);
                println!("{p:?}");
            }
        }
        // Pages listed as pending-free in the v3 data freed system table.
        {
            println!("Pending free (in data freed table)");
            let mut system_tables = self.system_tables.lock().unwrap();
            let data_freed = system_tables
                .open_system_table(self, DATA_FREED_TABLE)
                .unwrap();
            for entry in data_freed.range::<TransactionIdWithPagination>(..).unwrap() {
                let (_, entry) = entry.unwrap();
                let value = entry.value();
                for i in 0..value.len() {
                    let p = value.get(i);
                    all_allocated.remove(&p);
                    println!("{p:?}");
                }
            }
        }
        // Pages listed as pending-free in the v3 system freed system table.
        {
            println!("Pending free (in system freed table)");
            let mut system_tables = self.system_tables.lock().unwrap();
            let system_freed = system_tables
                .open_system_table(self, SYSTEM_FREED_TABLE)
                .unwrap();
            for entry in system_freed
                .range::<TransactionIdWithPagination>(..)
                .unwrap()
            {
                let (_, entry) = entry.unwrap();
                let value = entry.value();
                for i in 0..value.len() {
                    let p = value.get(i);
                    all_allocated.remove(&p);
                    println!("{p:?}");
                }
            }
        }
        // In-memory pending-free lists that have not yet been persisted anywhere.
        {
            let pages = self.legacy_freed_pages.lock().unwrap();
            if !pages.is_empty() {
                println!("Pages in in-memory legacy freed_pages");
                for p in pages.iter() {
                    println!("{p:?}");
                    all_allocated.remove(p);
                }
            }
        }
        {
            let tables = self.tables.lock().unwrap();
            let pages = tables.freed_pages.lock().unwrap();
            if !pages.is_empty() {
                println!("Pages in in-memory data freed_pages");
                for p in pages.iter() {
                    println!("{p:?}");
                    all_allocated.remove(p);
                }
            }
        }
        {
            let system_tables = self.system_tables.lock().unwrap();
            let pages = system_tables.freed_pages.lock().unwrap();
            if !pages.is_empty() {
                println!("Pages in in-memory system freed_pages");
                for p in pages.iter() {
                    println!("{p:?}");
                    all_allocated.remove(p);
                }
            }
        }
        {
            let pages = self.post_commit_frees.lock().unwrap();
            if !pages.is_empty() {
                println!("Pages in in-memory post_commit_frees");
                for p in pages.iter() {
                    println!("{p:?}");
                    all_allocated.remove(p);
                }
            }
        }
        // Anything still in the set is allocated but referenced by nothing above.
        if !all_allocated.is_empty() {
            println!("Leaked pages");
            for p in all_allocated {
                println!("{p:?}");
            }
        }
    }
1051
1052    /// Creates a snapshot of the current database state, which can be used to rollback the database.
1053    /// This savepoint will exist until it is deleted with `[delete_savepoint()]`.
1054    ///
1055    /// Note that while a savepoint exists, pages that become unused after it was created are not freed.
1056    /// Therefore, the lifetime of a savepoint should be minimized.
1057    ///
1058    /// Returns `[SavepointError::InvalidSavepoint`], if the transaction is "dirty" (any tables have been opened)
1059    /// or if the transaction's durability is less than `[Durability::Immediate]`
1060    pub fn persistent_savepoint(&self) -> Result<u64, SavepointError> {
1061        if self.durability != InternalDurability::Immediate {
1062            return Err(SavepointError::InvalidSavepoint);
1063        }
1064
1065        let mut savepoint = self.ephemeral_savepoint()?;
1066
1067        let mut system_tables = self.system_tables.lock().unwrap();
1068
1069        let mut next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
1070        next_table.insert((), savepoint.get_id().next())?;
1071        drop(next_table);
1072
1073        let mut savepoint_table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
1074        savepoint_table.insert(
1075            savepoint.get_id(),
1076            SerializedSavepoint::from_savepoint(&savepoint),
1077        )?;
1078
1079        savepoint.set_persistent();
1080
1081        self.created_persistent_savepoints
1082            .lock()
1083            .unwrap()
1084            .insert(savepoint.get_id());
1085
1086        Ok(savepoint.get_id().0)
1087    }
1088
1089    pub(crate) fn transaction_guard(&self) -> Arc<TransactionGuard> {
1090        self.transaction_guard.clone()
1091    }
1092
1093    pub(crate) fn next_persistent_savepoint_id(&self) -> Result<Option<SavepointId>> {
1094        let mut system_tables = self.system_tables.lock().unwrap();
1095        let next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
1096        let value = next_table.get(())?;
1097        if let Some(next_id) = value {
1098            Ok(Some(next_id.value()))
1099        } else {
1100            Ok(None)
1101        }
1102    }
1103
1104    /// Get a persistent savepoint given its id
1105    pub fn get_persistent_savepoint(&self, id: u64) -> Result<Savepoint, SavepointError> {
1106        let mut system_tables = self.system_tables.lock().unwrap();
1107        let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
1108        let value = table.get(SavepointId(id))?;
1109
1110        value
1111            .map(|x| x.value().to_savepoint(self.transaction_tracker.clone()))
1112            .ok_or(SavepointError::InvalidSavepoint)
1113    }
1114
1115    /// Delete the given persistent savepoint.
1116    ///
1117    /// Note that if the transaction is `abort()`'ed this deletion will be rolled back.
1118    ///
1119    /// Returns `true` if the savepoint existed
1120    /// Returns `[SavepointError::InvalidSavepoint`] if the transaction's durability is less than `[Durability::Immediate]`
1121    pub fn delete_persistent_savepoint(&self, id: u64) -> Result<bool, SavepointError> {
1122        if self.durability != InternalDurability::Immediate {
1123            return Err(SavepointError::InvalidSavepoint);
1124        }
1125        let mut system_tables = self.system_tables.lock().unwrap();
1126        let mut table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
1127        let savepoint = table.remove(SavepointId(id))?;
1128        if let Some(serialized) = savepoint {
1129            let savepoint = serialized
1130                .value()
1131                .to_savepoint(self.transaction_tracker.clone());
1132            self.deleted_persistent_savepoints
1133                .lock()
1134                .unwrap()
1135                .push((savepoint.get_id(), savepoint.get_transaction_id()));
1136            Ok(true)
1137        } else {
1138            Ok(false)
1139        }
1140    }
1141
1142    /// List all persistent savepoints
1143    pub fn list_persistent_savepoints(&self) -> Result<impl Iterator<Item = u64>> {
1144        let mut system_tables = self.system_tables.lock().unwrap();
1145        let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
1146        let mut savepoints = vec![];
1147        for savepoint in table.range::<SavepointId>(..)? {
1148            savepoints.push(savepoint?.0.value().0);
1149        }
1150        Ok(savepoints.into_iter())
1151    }
1152
1153    // TODO: deduplicate this with the one in Database
1154    fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
1155        let id = self
1156            .transaction_tracker
1157            .register_read_transaction(&self.mem)?;
1158
1159        Ok(TransactionGuard::new_read(
1160            id,
1161            self.transaction_tracker.clone(),
1162        ))
1163    }
1164
1165    fn allocate_savepoint(&self) -> Result<(SavepointId, TransactionId)> {
1166        let transaction_id = self.allocate_read_transaction()?.leak();
1167        let id = self.transaction_tracker.allocate_savepoint(transaction_id);
1168        Ok((id, transaction_id))
1169    }
1170
1171    /// Creates a snapshot of the current database state, which can be used to rollback the database
1172    ///
1173    /// This savepoint will be freed as soon as the returned `[Savepoint]` is dropped.
1174    ///
1175    /// Returns `[SavepointError::InvalidSavepoint`], if the transaction is "dirty" (any tables have been opened)
1176    pub fn ephemeral_savepoint(&self) -> Result<Savepoint, SavepointError> {
1177        if self.dirty.load(Ordering::Acquire) {
1178            return Err(SavepointError::InvalidSavepoint);
1179        }
1180
1181        let (id, transaction_id) = self.allocate_savepoint()?;
1182        #[cfg(feature = "logging")]
1183        debug!(
1184            "Creating savepoint id={:?}, txn_id={:?}",
1185            id, transaction_id
1186        );
1187
1188        let (system_root, freed_root, regional_allocators) = if self.mem.file_format_v3() {
1189            (None, None, vec![])
1190        } else {
1191            let system_root = self.mem.get_system_root();
1192            let freed_root = self.mem.get_freed_root();
1193            let regional_allocators = self.mem.get_raw_allocator_states();
1194
1195            (system_root, freed_root, regional_allocators)
1196        };
1197
1198        let root = self.mem.get_data_root();
1199        let savepoint = Savepoint::new_ephemeral(
1200            &self.mem,
1201            self.transaction_tracker.clone(),
1202            id,
1203            transaction_id,
1204            root,
1205            system_root,
1206            freed_root,
1207            regional_allocators,
1208        );
1209
1210        Ok(savepoint)
1211    }
1212
1213    /// Restore the state of the database to the given [`Savepoint`]
1214    ///
1215    /// Calling this method invalidates all [`Savepoint`]s created after savepoint
1216    pub fn restore_savepoint(&mut self, savepoint: &Savepoint) -> Result<(), SavepointError> {
1217        // Ensure that user does not try to restore a Savepoint that is from a different Database
1218        assert_eq!(
1219            std::ptr::from_ref(self.transaction_tracker.as_ref()),
1220            savepoint.db_address()
1221        );
1222
1223        if !self
1224            .transaction_tracker
1225            .is_valid_savepoint(savepoint.get_id())
1226        {
1227            return Err(SavepointError::InvalidSavepoint);
1228        }
1229        #[cfg(feature = "logging")]
1230        debug!(
1231            "Beginning savepoint restore (id={:?}) in transaction id={:?}",
1232            savepoint.get_id(),
1233            self.transaction_id
1234        );
1235        // Restoring a savepoint that reverted a file format or checksum type change could corrupt
1236        // the database
1237        assert_eq!(self.mem.get_version(), savepoint.get_version());
1238        self.dirty.store(true, Ordering::Release);
1239
1240        // Restoring a savepoint needs to accomplish the following:
1241        // 1) restore the table tree. This is trivial, since we have the old root
1242        // 1a) we also filter the freed tree to remove any pages referenced by the old root
1243        // 2) free all pages that were allocated since the savepoint and are unreachable
1244        //    from the restored table tree root. Here we diff the reachable pages from the old
1245        //    and new roots
1246        // 3) update the system tree to remove invalid persistent savepoints.
1247
1248        if self.mem.file_format_v3() {
1249            self.restore_savepoint_v2(savepoint)
1250        } else {
1251            self.restore_savepoint_v1(savepoint)
1252        }
1253    }
1254
    /// Savepoint restore for the pre-v3 file format: rebuilds allocator state from the
    /// savepoint's captured regional allocators and rewrites the legacy freed tree so
    /// that only pages from the savepoint's system/freed trees remain pending.
    ///
    /// Statement order here is load-bearing: the freed tree must be filtered before
    /// the current system/freed pages are collected, and the allocator diff must be
    /// computed before any pages are freed.
    fn restore_savepoint_v1(&mut self, savepoint: &Savepoint) -> Result<(), SavepointError> {
        // Read-only views of the system and freed trees as they were at the savepoint.
        let old_system_tree = TableTree::new(
            savepoint.get_system_root(),
            PageHint::None,
            self.transaction_guard.clone(),
            self.mem.clone(),
        )?;
        let old_freed_tree: Btree<FreedTableKey, FreedPageList<'static>> = Btree::new(
            savepoint.get_freed_root(),
            PageHint::None,
            self.transaction_guard.clone(),
            self.mem.clone(),
        )?;

        // Pages which are part of the system and freed trees in the savepoint, should be freed
        // even after the savepoint is restored, because the system and freed trees only roll
        // forward
        let mut old_system_and_freed_pages = HashSet::new();
        old_system_tree.visit_all_pages(|path| {
            old_system_and_freed_pages.insert(path.page_number());
            Ok(())
        })?;
        old_freed_tree.visit_all_pages(|path| {
            old_system_and_freed_pages.insert(path.page_number());
            Ok(())
        })?;

        // 1) restore the table tree
        {
            self.tables
                .lock()
                .unwrap()
                .set_root(savepoint.get_user_root());
        }

        // 1a) purge all transactions that happened after the savepoint from freed tree,
        // except pages from the old system or freed tree in the savepoint. Those still need to be
        // freed, since the system tree only rolls forward, never back. This brings all pages in the
        // old data root back to the committed state
        // This operation will also leak everything else that was allocated since the savepoint,
        // but we fix that below -- noting that all the system trees that existed between the savepoint
        // and now which might be referenced by other savepoints will become unreachable, since those
        // savepoints are invalidated by this restoration
        let mut txn_id = savepoint.get_transaction_id().next().raw_id();
        let mut freed_tree = self.freed_tree.lock().unwrap();
        loop {
            let lower = FreedTableKey {
                transaction_id: txn_id,
                pagination_id: 0,
            };

            // No entries at or after this transaction: all later txns processed, done.
            if freed_tree.range(&(lower..))?.next().is_none() {
                break;
            }
            // Rebuild the bounds because the range above consumed the first `lower`.
            let lower = FreedTableKey {
                transaction_id: txn_id,
                pagination_id: 0,
            };
            let upper = FreedTableKey {
                transaction_id: txn_id + 1,
                pagination_id: 0,
            };

            // Find all the pending pages for this txn and filter them
            let mut pending_pages = vec![];
            for entry in freed_tree.extract_from_if(&(lower..upper), |_, _| true)? {
                let item = entry?;
                for i in 0..item.value().len() {
                    let p = item.value().get(i);
                    // Keep the old system and freed tree pages, but purge anything else
                    if old_system_and_freed_pages.contains(&p) {
                        pending_pages.push(p);
                    }
                }
            }

            // Re-insert the surviving pages under the same txn id, in fixed-size chunks
            // keyed by a pagination counter.
            let mut pagination_counter = 0u64;
            while !pending_pages.is_empty() {
                let chunk_size = 100;
                let buffer_size = FreedPageList::required_bytes(chunk_size);
                let key = FreedTableKey {
                    transaction_id: txn_id,
                    pagination_id: pagination_counter,
                };
                let mut access_guard =
                    freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?;

                let len = pending_pages.len();
                access_guard.as_mut().clear();
                for page in pending_pages.drain(len - min(len, chunk_size)..) {
                    access_guard.as_mut().push_back(page);
                }
                drop(access_guard);

                pagination_counter += 1;
            }

            txn_id += 1;
        }

        // Pages belonging to the *current* system and freed trees must never be freed
        // below, since those trees only roll forward.
        let mut current_system_and_freed_pages = HashSet::new();
        self.system_tables
            .lock()
            .unwrap()
            .table_tree
            .visit_all_pages(|path| {
                current_system_and_freed_pages.insert(path.page_number());
                Ok(())
            })?;
        freed_tree.visit_all_pages(|path| {
            current_system_and_freed_pages.insert(path.page_number());
            Ok(())
        })?;

        // Reconstruct the allocator state as it was when the savepoint was captured.
        let mut old_allocators: Vec<BuddyAllocator> = savepoint
            .get_regional_allocators()
            .iter()
            .map(|data| BuddyAllocator::from_savepoint_state(data))
            .collect();

        // Find the oldest transaction in the current freed tree, for use below
        {
            let oldest_unprocessed_transaction =
                if let Some(entry) = freed_tree.range::<RangeFull, FreedTableKey>(&(..))?.next() {
                    entry?.key().transaction_id
                } else {
                    self.transaction_id.raw_id()
                };

            let lookup_key = FreedTableKey {
                transaction_id: oldest_unprocessed_transaction,
                pagination_id: 0,
            };

            // Replay all finalized frees into the old allocator state to ensure that a page which
            // was pending free, freed, and then reallocated does not leak
            for entry in old_freed_tree.range(&(..lookup_key))? {
                let item = entry?;
                let pages: FreedPageList = item.value();
                for i in 0..pages.len() {
                    let page = pages.get(i);
                    assert!(
                        old_allocators[page.region as usize]
                            .is_allocated(page.page_index, page.page_order)
                    );
                    old_allocators[page.region as usize].free(page.page_index, page.page_order);
                }
            }

            // Sanity check: a v1-format savepoint must not carry data in the v3 freed
            // tables; their roots must be empty if the table definitions exist at all.
            if let Some(header) = old_system_tree
                .get_table::<TransactionIdWithPagination, PageList>(
                    DATA_FREED_TABLE.name(),
                    TableType::Normal,
                )
                .map_err(|e| e.into_storage_error_or_corrupted("Data freed table is corrupted"))?
            {
                match header {
                    InternalTableDefinition::Normal { table_root, .. } => {
                        assert!(table_root.is_none());
                    }
                    InternalTableDefinition::Multimap { .. } => unreachable!(),
                };
            }

            if let Some(header) = old_system_tree
                .get_table::<TransactionIdWithPagination, PageList>(
                    SYSTEM_FREED_TABLE.name(),
                    TableType::Normal,
                )
                .map_err(|e| e.into_storage_error_or_corrupted("System freed table is corrupted"))?
            {
                match header {
                    InternalTableDefinition::Normal { table_root, .. } => {
                        assert!(table_root.is_none());
                    }
                    InternalTableDefinition::Multimap { .. } => unreachable!(),
                };
            }
        }

        // 2) free all pages that became unreachable
        let mut legacy_freed_pages = self.legacy_freed_pages.lock().unwrap();
        let tables = self.tables.lock().unwrap();
        let mut allocated = tables.allocated_pages.lock().unwrap();
        let mut already_awaiting_free: HashSet<PageNumber> =
            legacy_freed_pages.iter().copied().collect();
        already_awaiting_free.extend(self.post_commit_frees.lock().unwrap().iter().copied());
        already_awaiting_free.extend(tables.freed_pages.lock().unwrap().iter().copied());
        already_awaiting_free.extend(
            self.system_tables
                .lock()
                .unwrap()
                .system_freed_pages()
                .lock()
                .unwrap()
                .iter()
                .copied(),
        );
        // Every page allocated since the savepoint (diff against the old allocator state).
        let to_free = self.mem.pages_allocated_since_raw_state(&old_allocators);
        for page in to_free {
            if already_awaiting_free.contains(&page) {
                // Make sure that we don't double free something that is already going to be freed
                continue;
            }
            if current_system_and_freed_pages.contains(&page) {
                // Don't free pages which are part of the current system or freed tree, even though
                // these pages are new. Again this is because these trees only move forward;
                // never backwards as part of a savepoint restore
                continue;
            }
            // Uncommitted pages can be freed immediately; committed ones must go
            // through the pending-free list.
            if self.mem.uncommitted(page) {
                self.mem.free(page, &mut allocated);
            } else {
                legacy_freed_pages.push(page);
            }
        }
        // Release all locks before re-entering self.list_persistent_savepoints() /
        // delete_persistent_savepoint(), which take their own locks.
        drop(legacy_freed_pages);
        drop(freed_tree);
        drop(allocated);
        drop(tables);

        // 3) Invalidate all savepoints that are newer than the one being applied to prevent the user
        // from later trying to restore a savepoint "on another timeline"
        self.transaction_tracker
            .invalidate_savepoints_after(savepoint.get_id());
        for persistent_savepoint in self.list_persistent_savepoints()? {
            if persistent_savepoint > savepoint.get_id().0 {
                self.delete_persistent_savepoint(persistent_savepoint)?;
            }
        }

        Ok(())
    }
1488
    /// Savepoint restore for the v3 file format (invoked when `file_format_v3()` is
    /// true): freed/allocated page bookkeeping lives in system tables, so the restore
    /// reduces to truncating the data-freed table and queueing post-savepoint
    /// allocations for freeing.
    fn restore_savepoint_v2(&mut self, savepoint: &Savepoint) -> Result<(), SavepointError> {
        // 1) restore the table tree
        {
            self.tables
                .lock()
                .unwrap()
                .set_root(savepoint.get_user_root());
        }

        // 1a) purge all transactions that happened after the savepoint from the data freed tree
        let txn_id = savepoint.get_transaction_id().next().raw_id();
        {
            let lower = TransactionIdWithPagination {
                transaction_id: txn_id,
                pagination_id: 0,
            };
            let mut system_tables = self.system_tables.lock().unwrap();
            let mut data_freed = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
            // extract_from_if removes the entries; the loop just drains the iterator
            // and propagates any storage error.
            for entry in data_freed.extract_from_if(lower.., |_, _| true)? {
                entry?;
            }
            // No need to process the system freed table, because it only rolls forward
        }
        // In the v3 format the legacy freed tree must be empty.
        let freed_tree = self.freed_tree.lock().unwrap();
        assert!(freed_tree.get_root().is_none());

        // 2) queue all pages that became unreachable
        {
            let tables = self.tables.lock().unwrap();
            let mut data_freed_pages = tables.freed_pages.lock().unwrap();
            let mut system_tables = self.system_tables.lock().unwrap();
            let data_allocated = system_tables.open_system_table(self, DATA_ALLOCATED_TABLE)?;
            let lower = TransactionIdWithPagination {
                transaction_id: txn_id,
                pagination_id: 0,
            };
            // Everything allocated by transactions after the savepoint is now
            // unreachable from the restored root: queue it for freeing.
            for entry in data_allocated.range(lower..)? {
                let (_, value) = entry?;
                for i in 0..value.value().len() {
                    data_freed_pages.push(value.value().get(i));
                }
            }
        }

        // 3) Invalidate all savepoints that are newer than the one being applied to prevent the user
        // from later trying to restore a savepoint "on another timeline"
        self.transaction_tracker
            .invalidate_savepoints_after(savepoint.get_id());
        for persistent_savepoint in self.list_persistent_savepoints()? {
            if persistent_savepoint > savepoint.get_id().0 {
                self.delete_persistent_savepoint(persistent_savepoint)?;
            }
        }

        Ok(())
    }
1545
1546    /// Set the desired durability level for writes made in this transaction
1547    /// Defaults to [`Durability::Immediate`]
1548    ///
1549    /// Will panic if the durability is reduced below `[Durability::Immediate]` after a persistent savepoint has been created or deleted.
1550    pub fn set_durability(&mut self, durability: Durability) {
1551        let no_created = self
1552            .created_persistent_savepoints
1553            .lock()
1554            .unwrap()
1555            .is_empty();
1556        let no_deleted = self
1557            .deleted_persistent_savepoints
1558            .lock()
1559            .unwrap()
1560            .is_empty();
1561        assert!(no_created && no_deleted);
1562
1563        self.durability = match durability {
1564            Durability::None => InternalDurability::None,
1565            Durability::Eventual => InternalDurability::Eventual,
1566            Durability::Immediate => InternalDurability::Immediate,
1567            #[allow(deprecated)]
1568            Durability::Paranoid => {
1569                self.set_two_phase_commit(true);
1570                InternalDurability::Immediate
1571            }
1572        };
1573    }
1574
    /// Enable or disable 2-phase commit (defaults to disabled)
    ///
    /// By default, data is written using the following 1-phase commit algorithm:
    ///
    /// 1. Update the inactive commit slot with the new database state
    /// 2. Flip the god byte primary bit to activate the newly updated commit slot
    /// 3. Call `fsync` to ensure all writes have been persisted to disk
    ///
    /// All data is written with checksums. When opening the database after a crash, the most
    /// recent of the two commit slots with a valid checksum is used.
    ///
    /// Security considerations: The checksum used is xxhash, a fast, non-cryptographic hash
    /// function with close to perfect collision resistance when used with non-malicious input. An
    /// attacker with an extremely high degree of control over the database's workload, including
    /// the ability to cause the database process to crash, can cause invalid data to be written
    /// with a valid checksum, leaving the database in an invalid, attacker-controlled state.
    ///
    /// Alternatively, you can enable 2-phase commit, which writes data like this:
    ///
    /// 1. Update the inactive commit slot with the new database state
    /// 2. Call `fsync` to ensure the database state and commit slot update have been persisted
    /// 3. Flip the god byte primary bit to activate the newly updated commit slot
    /// 4. Call `fsync` to ensure the write to the god byte has been persisted
    ///
    /// This mitigates a theoretical attack where an attacker who
    /// 1. can control the order in which pages are flushed to disk
    /// 2. can introduce crashes during `fsync`,
    /// 3. has knowledge of the database file contents, and
    /// 4. can include arbitrary data in a write transaction
    ///
    /// could cause a transaction to partially commit (some but not all of the data is written).
    /// This is described in the design doc in further detail.
    ///
    /// Security considerations: Many hard disk drives and SSDs do not actually guarantee that data
    /// has been persisted to disk after calling `fsync`. Even with 2-phase commit, an attacker with
    /// a high degree of control over the database's workload, including the ability to cause the
    /// database process to crash, can cause the database to crash with the god byte primary bit
    /// pointing to an invalid commit slot, leaving the database in an invalid, potentially attacker-
    /// controlled state.
    pub fn set_two_phase_commit(&mut self, enabled: bool) {
        self.two_phase_commit = enabled;
    }
1617
    /// Enable or disable quick-repair (defaults to disabled)
    ///
    /// By default, when reopening the database after a crash, redb needs to do a full repair.
    /// This involves walking the entire database to verify the checksums and reconstruct the
    /// allocator state, so it can be very slow if the database is large.
    ///
    /// Alternatively, you can enable quick-repair. In this mode, redb saves the allocator state
    /// as part of each commit (so it doesn't need to be reconstructed), and enables 2-phase commit
    /// (which guarantees that the primary commit slot is valid without needing to look at the
    /// checksums). This means commits are slower, but recovery after a crash is almost instant.
    pub fn set_quick_repair(&mut self, enabled: bool) {
        // Read at commit time: commit_inner() forces two_phase_commit on when this is set
        self.quick_repair = enabled;
    }
1631
1632    /// Open the given table
1633    ///
1634    /// The table will be created if it does not exist
1635    #[track_caller]
1636    pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
1637        &'txn self,
1638        definition: TableDefinition<K, V>,
1639    ) -> Result<Table<'txn, K, V>, TableError> {
1640        self.tables.lock().unwrap().open_table(self, definition)
1641    }
1642
1643    /// Open the given table
1644    ///
1645    /// The table will be created if it does not exist
1646    #[track_caller]
1647    pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
1648        &'txn self,
1649        definition: MultimapTableDefinition<K, V>,
1650    ) -> Result<MultimapTable<'txn, K, V>, TableError> {
1651        self.tables
1652            .lock()
1653            .unwrap()
1654            .open_multimap_table(self, definition)
1655    }
1656
1657    pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
1658        &self,
1659        name: &str,
1660        table: &BtreeMut<K, V>,
1661        length: u64,
1662    ) {
1663        self.tables.lock().unwrap().close_table(name, table, length);
1664    }
1665
1666    /// Rename the given table
1667    pub fn rename_table(
1668        &self,
1669        definition: impl TableHandle,
1670        new_name: impl TableHandle,
1671    ) -> Result<(), TableError> {
1672        let name = definition.name().to_string();
1673        // Drop the definition so that callers can pass in a `Table` to rename, without getting a TableAlreadyOpen error
1674        drop(definition);
1675        self.tables
1676            .lock()
1677            .unwrap()
1678            .rename_table(self, &name, new_name.name())
1679    }
1680
1681    /// Rename the given multimap table
1682    pub fn rename_multimap_table(
1683        &self,
1684        definition: impl MultimapTableHandle,
1685        new_name: impl MultimapTableHandle,
1686    ) -> Result<(), TableError> {
1687        let name = definition.name().to_string();
1688        // Drop the definition so that callers can pass in a `MultimapTable` to rename, without getting a TableAlreadyOpen error
1689        drop(definition);
1690        self.tables
1691            .lock()
1692            .unwrap()
1693            .rename_multimap_table(self, &name, new_name.name())
1694    }
1695
1696    /// Delete the given table
1697    ///
1698    /// Returns a bool indicating whether the table existed
1699    pub fn delete_table(&self, definition: impl TableHandle) -> Result<bool, TableError> {
1700        let name = definition.name().to_string();
1701        // Drop the definition so that callers can pass in a `Table` or `MultimapTable` to delete, without getting a TableAlreadyOpen error
1702        drop(definition);
1703        self.tables.lock().unwrap().delete_table(self, &name)
1704    }
1705
1706    /// Delete the given table
1707    ///
1708    /// Returns a bool indicating whether the table existed
1709    pub fn delete_multimap_table(
1710        &self,
1711        definition: impl MultimapTableHandle,
1712    ) -> Result<bool, TableError> {
1713        let name = definition.name().to_string();
1714        // Drop the definition so that callers can pass in a `Table` or `MultimapTable` to delete, without getting a TableAlreadyOpen error
1715        drop(definition);
1716        self.tables
1717            .lock()
1718            .unwrap()
1719            .delete_multimap_table(self, &name)
1720    }
1721
1722    /// List all the tables
1723    pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle> + '_> {
1724        self.tables
1725            .lock()
1726            .unwrap()
1727            .table_tree
1728            .list_tables(TableType::Normal)
1729            .map(|x| x.into_iter().map(UntypedTableHandle::new))
1730    }
1731
1732    /// List all the multimap tables
1733    pub fn list_multimap_tables(
1734        &self,
1735    ) -> Result<impl Iterator<Item = UntypedMultimapTableHandle> + '_> {
1736        self.tables
1737            .lock()
1738            .unwrap()
1739            .table_tree
1740            .list_tables(TableType::Multimap)
1741            .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
1742    }
1743
    /// Commit the transaction
    ///
    /// All writes performed in this transaction will be visible to future transactions, and are
    /// durable as consistent with the [`Durability`] level set by [`Self::set_durability`]
    pub fn commit(mut self) -> Result<(), CommitError> {
        // Set completed flag first, so that we don't go through the abort() path on drop, if this fails
        // (`completed` is also checked by the Drop impl's storage-failure branch)
        self.completed = true;
        self.commit_inner()
    }
1753
    // Shared commit path: flushes the data tree, persists the allocated/freed page
    // bookkeeping, then dispatches to the durable or non-durable commit routine
    fn commit_inner(&mut self) -> Result<(), CommitError> {
        // Quick-repair requires 2-phase commit
        if self.quick_repair {
            self.two_phase_commit = true;
        }

        // Flush all open tables and collect the new user root, plus the pages this
        // transaction allocated in / freed from the data tree
        let (user_root, allocated_pages, data_freed) =
            self.tables.lock().unwrap().table_tree.flush_and_close()?;

        // v3 file format: data-tree freed pages go into the DATA_FREED_TABLE system
        // table. Older formats: accumulate them in legacy_freed_pages, which
        // store_freed_pages() persists via the old freed tree
        if self.mem.file_format_v3() {
            self.store_data_freed_pages(data_freed)?;
        } else {
            self.legacy_freed_pages.lock().unwrap().extend(data_freed);
        }
        self.store_allocated_pages(allocated_pages.into_iter().collect())?;

        #[cfg(feature = "logging")]
        debug!(
            "Committing transaction id={:?} with durability={:?} two_phase={} quick_repair={}",
            self.transaction_id, self.durability, self.two_phase_commit, self.quick_repair
        );
        match self.durability {
            InternalDurability::None => self.non_durable_commit(user_root)?,
            InternalDurability::Eventual => self.durable_commit(user_root, true)?,
            InternalDurability::Immediate => self.durable_commit(user_root, false)?,
        }

        // The commit succeeded, so savepoints deleted during this transaction can
        // now be deallocated from the tracker
        for (savepoint, transaction) in self.deleted_persistent_savepoints.lock().unwrap().iter() {
            self.transaction_tracker
                .deallocate_savepoint(*savepoint, *transaction);
        }

        // Sanity checks: every pending page-free list must have been fully drained
        // by the commit path above
        assert!(self.legacy_freed_pages.lock().unwrap().is_empty());
        assert!(
            self.system_tables
                .lock()
                .unwrap()
                .system_freed_pages()
                .lock()
                .unwrap()
                .is_empty()
        );
        assert!(
            self.tables
                .lock()
                .unwrap()
                .freed_pages
                .lock()
                .unwrap()
                .is_empty()
        );
        assert!(self.post_commit_frees.lock().unwrap().is_empty());

        #[cfg(feature = "logging")]
        debug!(
            "Finished commit of transaction id={:?}",
            self.transaction_id
        );

        Ok(())
    }
1815
    // Persists the data-tree pages freed by this transaction into DATA_FREED_TABLE,
    // chunked under (transaction id, pagination counter) keys. Only called for the
    // v3 file format (see commit_inner)
    fn store_data_freed_pages(&self, mut freed_pages: Vec<PageNumber>) -> Result {
        let mut system_tables = self.system_tables.lock().unwrap();
        let mut freed_table = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
        let mut pagination_counter = 0;
        while !freed_pages.is_empty() {
            // At most 400 page numbers per stored PageList entry
            let chunk_size = 400;
            let buffer_size = PageList::required_bytes(chunk_size);
            let key = TransactionIdWithPagination {
                transaction_id: self.transaction_id.raw_id(),
                pagination_id: pagination_counter,
            };
            // Reserve the value buffer in-place and fill it below
            let mut access_guard =
                freed_table.insert_reserve(&key, buffer_size.try_into().unwrap())?;

            // Drain the tail chunk of the list into the reserved buffer
            let len = freed_pages.len();
            access_guard.as_mut().clear();
            for page in freed_pages.drain(len - min(len, chunk_size)..) {
                // Make sure that the page is currently allocated
                debug_assert!(
                    self.mem.is_allocated(page),
                    "Page is not allocated: {page:?}"
                );
                debug_assert!(!self.mem.uncommitted(page), "Page is uncommitted: {page:?}");
                access_guard.as_mut().push_back(page);
            }

            pagination_counter += 1;
        }

        Ok(())
    }
1847
    // Persists the data-tree pages allocated by this transaction into
    // DATA_ALLOCATED_TABLE, chunked under (transaction id, pagination counter) keys,
    // then purges entries for transactions older than the oldest savepoint
    fn store_allocated_pages(&self, mut data_allocated_pages: Vec<PageNumber>) -> Result {
        let mut system_tables = self.system_tables.lock().unwrap();
        let mut allocated_table = system_tables.open_system_table(self, DATA_ALLOCATED_TABLE)?;
        let mut pagination_counter = 0;
        while !data_allocated_pages.is_empty() {
            // At most 400 page numbers per stored PageList entry
            let chunk_size = 400;
            let buffer_size = PageList::required_bytes(chunk_size);
            let key = TransactionIdWithPagination {
                transaction_id: self.transaction_id.raw_id(),
                pagination_id: pagination_counter,
            };
            // Reserve the value buffer in-place and fill it below
            let mut access_guard =
                allocated_table.insert_reserve(&key, buffer_size.try_into().unwrap())?;

            // Drain the tail chunk of the list into the reserved buffer
            let len = data_allocated_pages.len();
            access_guard.as_mut().clear();
            for page in data_allocated_pages.drain(len - min(len, chunk_size)..) {
                // Make sure that the page is currently allocated. This is to catch scenarios like
                // a page getting allocated, and then deallocated within the same transaction,
                // but errantly being left in the allocated pages list
                debug_assert!(
                    self.mem.is_allocated(page),
                    "Page is not allocated: {page:?}"
                );
                debug_assert!(self.mem.uncommitted(page), "Page is committed: {page:?}");
                access_guard.as_mut().push_back(page);
            }

            pagination_counter += 1;
        }

        // Purge any transactions that are no longer referenced
        // (everything below the oldest savepoint's transaction id; when no savepoint
        // exists, u64::MAX makes the range cover all stored entries)
        let oldest = self
            .transaction_tracker
            .oldest_savepoint()
            .map_or(u64::MAX, |(_, x)| x.raw_id());
        let key = TransactionIdWithPagination {
            transaction_id: oldest,
            pagination_id: 0,
        };
        for entry in allocated_table.extract_from_if(..key, |_, _| true)? {
            entry?;
        }

        Ok(())
    }
1894
    /// Abort the transaction
    ///
    /// All writes performed in this transaction will be rolled back
    pub fn abort(mut self) -> Result {
        // Set completed flag first, so that we don't go through the abort() path on drop, if this fails
        // (ensures abort_inner() runs exactly once, here rather than again in Drop)
        self.completed = true;
        self.abort_inner()
    }
1903
1904    fn abort_inner(&mut self) -> Result {
1905        #[cfg(feature = "logging")]
1906        debug!("Aborting transaction id={:?}", self.transaction_id);
1907        self.tables
1908            .lock()
1909            .unwrap()
1910            .table_tree
1911            .clear_root_updates_and_close();
1912        for savepoint in self.created_persistent_savepoints.lock().unwrap().iter() {
1913            match self.delete_persistent_savepoint(savepoint.0) {
1914                Ok(_) => {}
1915                Err(err) => match err {
1916                    SavepointError::InvalidSavepoint => {
1917                        unreachable!();
1918                    }
1919                    SavepointError::Storage(storage_err) => {
1920                        return Err(storage_err);
1921                    }
1922                },
1923            }
1924        }
1925        self.mem.rollback_uncommitted_writes()?;
1926        #[cfg(feature = "logging")]
1927        debug!("Finished abort of transaction id={:?}", self.transaction_id);
1928        Ok(())
1929    }
1930
    // Durable commit path: releases pages no reader can still see, persists the
    // remaining freed-page bookkeeping (and, with quick-repair, the allocator
    // state), then performs the storage-layer commit
    pub(crate) fn durable_commit(
        &mut self,
        user_root: Option<BtreeHeader>,
        eventual: bool,
    ) -> Result {
        // Pages freed by transactions older than the oldest live read transaction
        // can be released now; anything newer must stay pending
        let free_until_transaction = self
            .transaction_tracker
            .oldest_live_read_transaction()
            .map_or(self.transaction_id, |x| x.next());
        self.process_freed_pages(free_until_transaction)?;

        let mut system_tables = self.system_tables.lock().unwrap();
        let system_freed_pages = system_tables.system_freed_pages();
        let system_tree = system_tables.table_tree.flush_table_root_updates()?;
        // Remove any previously saved allocator state; when quick-repair is enabled
        // a fresh copy is written below
        system_tree
            .delete_table(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
            .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?;

        if self.quick_repair {
            system_tree.create_table_and_flush_table_root(
                ALLOCATOR_STATE_TABLE_NAME,
                |system_tree_ref, tree: &mut AllocatorStateTree| {
                    let mut legacy_pagination_counter = 0;
                    let mut v2_pagination_counter = 0;

                    // Retry loop: reserve space, persist pending freed pages, then
                    // attempt to save the allocator state; loop until it fits
                    loop {
                        let num_regions = self
                            .mem
                            .reserve_allocator_state(tree, self.transaction_id)?;

                        // We can't free pages after the commit, because that would invalidate our
                        // saved allocator state. Everything needs to go through the transactional
                        // free mechanism
                        self.store_freed_pages(
                            system_tree_ref,
                            system_freed_pages.clone(),
                            &mut legacy_pagination_counter,
                            &mut v2_pagination_counter,
                            true,
                        )?;

                        if self.mem.try_save_allocator_state(tree, num_regions)? {
                            return Ok(());
                        }

                        // Clear out the table before retrying, just in case the number of regions
                        // has somehow shrunk. Don't use retain_in() for this, since it doesn't
                        // free the pages immediately -- we need to reuse those pages to guarantee
                        // that our retry loop will eventually terminate
                        while let Some(guards) = tree.last()? {
                            let key = guards.0.value();
                            drop(guards);
                            tree.remove(&key)?;
                        }
                    }
                },
            )?;
        } else {
            // If a savepoint exists it might reference the freed-tree, since it holds a reference to the
            // root of the freed-tree. Therefore, we must use the transactional free mechanism to free
            // those pages. If there are no save points then these can be immediately freed, which is
            // done at the end of this function.
            let savepoint_exists = self.transaction_tracker.any_savepoint_exists();
            self.store_freed_pages(
                system_tree,
                system_freed_pages,
                &mut 0,
                &mut 0,
                savepoint_exists,
            )?;
        }

        let system_root = system_tree.finalize_dirty_checksums()?;

        // Finalize freed table checksums, before doing the final commit
        let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?;

        self.mem.commit(
            user_root,
            system_root,
            freed_root,
            self.transaction_id,
            eventual,
            self.two_phase_commit,
        )?;

        // Mark any pending non-durable commits as fully committed.
        self.transaction_tracker.clear_pending_non_durable_commits();

        // Immediately free the pages that were freed from the freed-tree itself. These are only
        // accessed by write transactions, so it's safe to free them as soon as the commit is done.
        for page in self.post_commit_frees.lock().unwrap().drain(..) {
            self.mem.free(page, &mut PageTrackerPolicy::Ignore);
        }

        Ok(())
    }
2028
    // Commit without a durability guarantee
    pub(crate) fn non_durable_commit(&mut self, user_root: Option<BtreeHeader>) -> Result {
        let system_root = {
            let mut system_tables = self.system_tables.lock().unwrap();
            let system_freed_pages = system_tables.system_freed_pages();
            system_tables.table_tree.flush_table_root_updates()?;
            // Store all freed pages for a future commit(), since we can't free pages during a
            // non-durable commit (it's non-durable, so could be rolled back anytime in the future)
            self.store_freed_pages(
                &mut system_tables.table_tree,
                system_freed_pages,
                &mut 0,
                &mut 0,
                true,
            )?;

            // Flush again to pick up root changes made by store_freed_pages(), then
            // finalize the system tree checksums
            system_tables
                .table_tree
                .flush_table_root_updates()?
                .finalize_dirty_checksums()?
        };

        // Finalize all checksums, before doing the final commit
        let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?;

        self.mem
            .non_durable_commit(user_root, system_root, freed_root, self.transaction_id)?;
        // Register this as a non-durable transaction to ensure that the freed pages we just pushed
        // are only processed after this has been persisted
        self.transaction_tracker
            .register_non_durable_commit(self.transaction_id);
        Ok(())
    }
2062
    // Relocate pages to lower number regions/pages
    // Returns true if a page(s) was moved
    pub(crate) fn compact_pages(&mut self) -> Result<bool> {
        let mut progress = false;
        // Relocate the region tracker page (only present in pre-v3 file formats)
        if !self.mem.file_format_v3() && self.mem.relocate_region_tracker()? {
            progress = true;
        }

        // Find the 1M highest pages
        let mut highest_pages = BTreeMap::new();
        let mut tables = self.tables.lock().unwrap();
        let table_tree = &mut tables.table_tree;
        table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
        let mut system_tables = self.system_tables.lock().unwrap();
        let system_table_tree = &mut system_tables.table_tree;
        system_table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;

        // Calculate how many of them can be relocated to lower pages, starting from the last page
        let mut relocation_map = HashMap::new();
        for path in highest_pages.into_values().rev() {
            if relocation_map.contains_key(&path.page_number()) {
                continue;
            }
            let old_page = self.mem.get_page(path.page_number())?;
            // Speculatively allocate the lowest available page of the same size
            let mut new_page = self.mem.allocate_lowest(old_page.memory().len())?;
            let new_page_number = new_page.get_page_number();
            // We have to copy at least the page type into the new page.
            // Otherwise its cache priority will be calculated incorrectly
            new_page.memory_mut()[0] = old_page.memory()[0];
            drop(new_page);
            // We're able to move this to a lower page, so insert it and rewrite all its parents
            if new_page_number < path.page_number() {
                relocation_map.insert(path.page_number(), new_page_number);
                for parent in path.parents() {
                    if relocation_map.contains_key(parent) {
                        continue;
                    }
                    let old_parent = self.mem.get_page(*parent)?;
                    let mut new_page = self.mem.allocate_lowest(old_parent.memory().len())?;
                    let new_page_number = new_page.get_page_number();
                    // We have to copy at least the page type into the new page.
                    // Otherwise its cache priority will be calculated incorrectly
                    new_page.memory_mut()[0] = old_parent.memory()[0];
                    drop(new_page);
                    relocation_map.insert(*parent, new_page_number);
                }
            } else {
                // No lower page available: free the speculative allocation and stop
                self.mem
                    .free(new_page_number, &mut PageTrackerPolicy::Ignore);
                break;
            }
        }

        if !relocation_map.is_empty() {
            progress = true;
        }

        // Apply the old-page -> new-page mapping to both trees
        table_tree.relocate_tables(&relocation_map)?;
        system_table_tree.relocate_tables(&relocation_map)?;

        Ok(progress)
    }
2126
    // NOTE: must be called before store_freed_pages() during commit, since this can create
    // more pages freed by the current transaction
    //
    // Releases all pages pending-free from transactions older than `free_until`:
    // first from the legacy freed tree, then from the v3 data-freed and
    // system-freed system tables
    fn process_freed_pages(&mut self, free_until: TransactionId) -> Result {
        // We assume below that PageNumber is length 8
        assert_eq!(PageNumber::serialized_size(), 8);
        // TODO: remove all this old code for the original freed tree
        let lookup_key = FreedTableKey {
            transaction_id: free_until.raw_id(),
            pagination_id: 0,
        };

        // Free every page recorded under transaction ids strictly below free_until
        let mut to_remove = vec![];
        let mut freed_tree = self.freed_tree.lock().unwrap();
        for entry in freed_tree.range(&(..lookup_key))? {
            let entry = entry?;
            to_remove.push(entry.key());
            let value = entry.value();
            for i in 0..value.len() {
                self.mem.free(value.get(i), &mut PageTrackerPolicy::Ignore);
            }
        }

        // Remove all the old transactions
        for key in to_remove {
            freed_tree.remove(&key)?;
        }

        // Handle the data freed tree
        let mut system_tables = self.system_tables.lock().unwrap();
        {
            let mut data_freed = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
            let key = TransactionIdWithPagination {
                transaction_id: free_until.raw_id(),
                pagination_id: 0,
            };
            // extract_from_if removes entries as it yields them
            for entry in data_freed.extract_from_if(..key, |_, _| true)? {
                let (_, page_list) = entry?;
                for i in 0..page_list.value().len() {
                    self.mem
                        .free(page_list.value().get(i), &mut PageTrackerPolicy::Ignore);
                }
            }
        }

        // Handle the system freed tree
        {
            let mut system_freed = system_tables.open_system_table(self, SYSTEM_FREED_TABLE)?;
            let key = TransactionIdWithPagination {
                transaction_id: free_until.raw_id(),
                pagination_id: 0,
            };
            for entry in system_freed.extract_from_if(..key, |_, _| true)? {
                let (_, page_list) = entry?;
                for i in 0..page_list.value().len() {
                    self.mem
                        .free(page_list.value().get(i), &mut PageTrackerPolicy::Ignore);
                }
            }
        }

        Ok(())
    }
2189
    // Persists the pending freed-page lists gathered during this transaction.
    // v3 format: system-tree freed pages are written to SYSTEM_FREED_TABLE when
    // `include_post_commit_free` is set, otherwise deferred to post_commit_frees.
    // Pre-v3 format: everything funnels through legacy_freed_pages into the old
    // freed tree.
    fn store_freed_pages(
        &self,
        system_tree: &mut TableTreeMut,
        system_freed_pages: Arc<Mutex<Vec<PageNumber>>>,
        legacy_pagination_counter: &mut u64,
        v2_pagination_counter: &mut u64,
        include_post_commit_free: bool,
    ) -> Result {
        assert_eq!(PageNumber::serialized_size(), 8); // We assume below that PageNumber is length 8

        if self.mem.file_format_v3() {
            if include_post_commit_free {
                system_tree.open_table_and_flush_table_root(
                    SYSTEM_FREED_TABLE.name(),
                    |system_freed_tree: &mut SystemFreedTree| {
                        while !system_freed_pages.lock().unwrap().is_empty() {
                            // At most 200 page numbers per stored PageList entry
                            let chunk_size = 200;
                            let buffer_size = PageList::required_bytes(chunk_size);
                            let key = TransactionIdWithPagination {
                                transaction_id: self.transaction_id.raw_id(),
                                pagination_id: *v2_pagination_counter,
                            };
                            let mut access_guard = system_freed_tree
                                .insert_reserve(&key, buffer_size.try_into().unwrap())?;

                            // Drain the tail chunk into the reserved value buffer
                            let mut freed_pages = system_freed_pages.lock().unwrap();
                            let len = freed_pages.len();
                            access_guard.as_mut().clear();
                            for page in freed_pages.drain(len - min(len, chunk_size)..) {
                                access_guard.as_mut().push_back(page);
                            }
                            drop(access_guard);

                            *v2_pagination_counter += 1;
                        }
                        Ok(())
                    },
                )?;
            } else {
                // No transactional storage required: defer these to be freed
                // immediately after the commit completes
                self.post_commit_frees
                    .lock()
                    .unwrap()
                    .extend(system_freed_pages.lock().unwrap().drain(..));
            }
        } else {
            // Pre-v3: route system freed pages through the legacy freed tree below
            self.legacy_freed_pages
                .lock()
                .unwrap()
                .extend(system_freed_pages.lock().unwrap().drain(..));
        }

        // TODO: remove all this code for the old free tree
        let mut freed_tree = self.freed_tree.lock().unwrap();
        if include_post_commit_free {
            // Move all the post-commit pages that came from the freed-tree. These need to be stored
            // since we can't free pages until a durable commit
            self.legacy_freed_pages
                .lock()
                .unwrap()
                .extend(self.post_commit_frees.lock().unwrap().drain(..));
        }
        while !self.legacy_freed_pages.lock().unwrap().is_empty() {
            // At most 100 page numbers per freed-tree entry
            let chunk_size = 100;
            let buffer_size = FreedPageList::required_bytes(chunk_size);
            let key = FreedTableKey {
                transaction_id: self.transaction_id.raw_id(),
                pagination_id: *legacy_pagination_counter,
            };
            let mut access_guard =
                freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?;

            // Drain the tail chunk into the reserved value buffer
            let mut freed_pages = self.legacy_freed_pages.lock().unwrap();
            let len = freed_pages.len();
            access_guard.as_mut().clear();
            for page in freed_pages.drain(len - min(len, chunk_size)..) {
                access_guard.as_mut().push_back(page);
            }
            drop(access_guard);

            *legacy_pagination_counter += 1;

            if include_post_commit_free {
                // Move all the post-commit pages that came from the freed-tree. These need to be stored
                // since we can't free pages until a durable commit
                // NOTE(review): re-drained every iteration -- apparently the insert
                // above can itself add to post_commit_frees; confirm against
                // insert_reserve's behavior
                freed_pages.extend(self.post_commit_frees.lock().unwrap().drain(..));
            }
        }

        Ok(())
    }
2280
    /// Retrieves information about storage usage in the database
    pub fn stats(&self) -> Result<DatabaseStats> {
        let tables = self.tables.lock().unwrap();
        let table_tree = &tables.table_tree;
        let data_tree_stats = table_tree.stats()?;

        let system_tables = self.system_tables.lock().unwrap();
        let system_table_tree = &system_tables.table_tree;
        let system_tree_stats = system_table_tree.stats()?;

        let freed_tree_stats = self.freed_tree.lock().unwrap().stats()?;

        // The system and freed trees are internal bookkeeping, so even their stored
        // leaf bytes count as metadata from the user's point of view
        let total_metadata_bytes = data_tree_stats.metadata_bytes()
            + system_tree_stats.metadata_bytes
            + system_tree_stats.stored_leaf_bytes
            + freed_tree_stats.metadata_bytes
            + freed_tree_stats.stored_leaf_bytes;
        // Free pages count toward fragmentation, at one full page size each
        let total_fragmented = data_tree_stats.fragmented_bytes()
            + system_tree_stats.fragmented_bytes
            + freed_tree_stats.fragmented_bytes
            + self.mem.count_free_pages()? * (self.mem.get_page_size() as u64);

        Ok(DatabaseStats {
            tree_height: data_tree_stats.tree_height(),
            allocated_pages: self.mem.count_allocated_pages()?,
            leaf_pages: data_tree_stats.leaf_pages(),
            branch_pages: data_tree_stats.branch_pages(),
            stored_leaf_bytes: data_tree_stats.stored_bytes(),
            metadata_bytes: total_metadata_bytes,
            fragmented_bytes: total_fragmented,
            page_size: self.mem.get_page_size(),
        })
    }
2314
2315    #[cfg(any(test, fuzzing))]
2316    pub fn num_region_tracker_pages(&self) -> u64 {
2317        if self.mem.file_format_v3() {
2318            0
2319        } else {
2320            1 << self.mem.tracker_page().page_order
2321        }
2322    }
2323
    // Debugging aid: prints the master table tree (table name -> definition) to stderr
    #[allow(dead_code)]
    pub(crate) fn print_debug(&self) -> Result {
        // Flush any pending updates to make sure we get the latest root
        let mut tables = self.tables.lock().unwrap();
        if let Some(page) = tables
            .table_tree
            .flush_table_root_updates()
            .unwrap()
            .finalize_dirty_checksums()
            .unwrap()
        {
            eprintln!("Master tree:");
            let master_tree: Btree<&str, InternalTableDefinition> = Btree::new(
                Some(page),
                PageHint::None,
                self.transaction_guard.clone(),
                self.mem.clone(),
            )?;
            master_tree.print_debug(true)?;
        }

        Ok(())
    }
2347}
2348
impl Drop for WriteTransaction {
    fn drop(&mut self) {
        // Auto-abort a transaction that was neither committed nor explicitly
        // aborted. Skipped while panicking (to avoid aborting during unwind) and
        // after a storage failure
        if !self.completed && !thread::panicking() && !self.mem.storage_failure() {
            #[allow(unused_variables)]
            if let Err(error) = self.abort_inner() {
                #[cfg(feature = "logging")]
                warn!("Failure automatically aborting transaction: {}", error);
            }
        } else if !self.completed && self.mem.storage_failure() {
            // After a storage failure, still discard pending table-root updates so
            // the table tree's open-table state is released
            self.tables
                .lock()
                .unwrap()
                .table_tree
                .clear_root_updates_and_close();
        }
    }
}
2366
/// A read-only transaction
///
/// Read-only transactions may exist concurrently with writes
pub struct ReadTransaction {
    /// Shared handle to the database's transactional page store; cloned into
    /// every table opened through this transaction
    mem: Arc<TransactionalMemory>,
    /// Table tree rooted at the data root captured when the transaction began
    tree: TableTree,
}
2374
2375impl ReadTransaction {
2376    pub(crate) fn new(
2377        mem: Arc<TransactionalMemory>,
2378        guard: TransactionGuard,
2379    ) -> Result<Self, TransactionError> {
2380        let root_page = mem.get_data_root();
2381        let guard = Arc::new(guard);
2382        Ok(Self {
2383            mem: mem.clone(),
2384            tree: TableTree::new(root_page, PageHint::Clean, guard, mem)
2385                .map_err(TransactionError::Storage)?,
2386        })
2387    }
2388
2389    /// Open the given table
2390    pub fn open_table<K: Key + 'static, V: Value + 'static>(
2391        &self,
2392        definition: TableDefinition<K, V>,
2393    ) -> Result<ReadOnlyTable<K, V>, TableError> {
2394        let header = self
2395            .tree
2396            .get_table::<K, V>(definition.name(), TableType::Normal)?
2397            .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
2398
2399        match header {
2400            InternalTableDefinition::Normal { table_root, .. } => Ok(ReadOnlyTable::new(
2401                definition.name().to_string(),
2402                table_root,
2403                PageHint::Clean,
2404                self.tree.transaction_guard().clone(),
2405                self.mem.clone(),
2406            )?),
2407            InternalTableDefinition::Multimap { .. } => unreachable!(),
2408        }
2409    }
2410
2411    /// Open the given table without a type
2412    pub fn open_untyped_table(
2413        &self,
2414        handle: impl TableHandle,
2415    ) -> Result<ReadOnlyUntypedTable, TableError> {
2416        let header = self
2417            .tree
2418            .get_table_untyped(handle.name(), TableType::Normal)?
2419            .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
2420
2421        match header {
2422            InternalTableDefinition::Normal {
2423                table_root,
2424                fixed_key_size,
2425                fixed_value_size,
2426                ..
2427            } => Ok(ReadOnlyUntypedTable::new(
2428                table_root,
2429                fixed_key_size,
2430                fixed_value_size,
2431                self.mem.clone(),
2432            )),
2433            InternalTableDefinition::Multimap { .. } => unreachable!(),
2434        }
2435    }
2436
2437    /// Open the given table
2438    pub fn open_multimap_table<K: Key + 'static, V: Key + 'static>(
2439        &self,
2440        definition: MultimapTableDefinition<K, V>,
2441    ) -> Result<ReadOnlyMultimapTable<K, V>, TableError> {
2442        let header = self
2443            .tree
2444            .get_table::<K, V>(definition.name(), TableType::Multimap)?
2445            .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
2446
2447        match header {
2448            InternalTableDefinition::Normal { .. } => unreachable!(),
2449            InternalTableDefinition::Multimap {
2450                table_root,
2451                table_length,
2452                ..
2453            } => Ok(ReadOnlyMultimapTable::new(
2454                table_root,
2455                table_length,
2456                PageHint::Clean,
2457                self.tree.transaction_guard().clone(),
2458                self.mem.clone(),
2459            )?),
2460        }
2461    }
2462
2463    /// Open the given table without a type
2464    pub fn open_untyped_multimap_table(
2465        &self,
2466        handle: impl MultimapTableHandle,
2467    ) -> Result<ReadOnlyUntypedMultimapTable, TableError> {
2468        let header = self
2469            .tree
2470            .get_table_untyped(handle.name(), TableType::Multimap)?
2471            .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
2472
2473        match header {
2474            InternalTableDefinition::Normal { .. } => unreachable!(),
2475            InternalTableDefinition::Multimap {
2476                table_root,
2477                table_length,
2478                fixed_key_size,
2479                fixed_value_size,
2480                ..
2481            } => Ok(ReadOnlyUntypedMultimapTable::new(
2482                table_root,
2483                table_length,
2484                fixed_key_size,
2485                fixed_value_size,
2486                self.mem.clone(),
2487            )),
2488        }
2489    }
2490
2491    /// List all the tables
2492    pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle>> {
2493        self.tree
2494            .list_tables(TableType::Normal)
2495            .map(|x| x.into_iter().map(UntypedTableHandle::new))
2496    }
2497
2498    /// List all the multimap tables
2499    pub fn list_multimap_tables(&self) -> Result<impl Iterator<Item = UntypedMultimapTableHandle>> {
2500        self.tree
2501            .list_tables(TableType::Multimap)
2502            .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
2503    }
2504
2505    /// Close the transaction
2506    ///
2507    /// Transactions are automatically closed when they and all objects referencing them have been dropped,
2508    /// so this method does not normally need to be called.
2509    /// This method can be used to ensure that there are no outstanding objects remaining.
2510    ///
2511    /// Returns `ReadTransactionStillInUse` error if a table or other object retrieved from the transaction still references this transaction
2512    pub fn close(self) -> Result<(), TransactionError> {
2513        if Arc::strong_count(self.tree.transaction_guard()) > 1 {
2514            return Err(TransactionError::ReadTransactionStillInUse(self));
2515        }
2516        // No-op, just drop ourself
2517        Ok(())
2518    }
2519}
2520
2521impl Debug for ReadTransaction {
2522    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2523        f.write_str("ReadTransaction")
2524    }
2525}
2526
#[cfg(test)]
mod test {
    use crate::{Database, TableDefinition};

    const X: TableDefinition<&str, &str> = TableDefinition::new("x");

    #[test]
    fn transaction_id_persistence() {
        let tmpfile = crate::create_tempfile();

        // First session: perform a committed write and record its transaction id.
        // The scope drops the database before it is reopened below.
        let first_txn_id = {
            let db = Database::create(tmpfile.path()).unwrap();
            let txn = db.begin_write().unwrap();
            {
                let mut table = txn.open_table(X).unwrap();
                table.insert("hello", "world").unwrap();
            }
            let id = txn.transaction_id;
            txn.commit().unwrap();
            id
        };

        // Second session: transaction ids must keep increasing across reopen
        let db2 = Database::create(tmpfile.path()).unwrap();
        let txn = db2.begin_write().unwrap();
        assert!(txn.transaction_id > first_txn_id);
    }
}