// redb/db.rs

1use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
2use crate::tree_store::{
3    AllPageNumbersBtreeIter, BtreeHeader, BtreeRangeIter, FreedPageList, FreedTableKey,
4    InternalTableDefinition, PAGE_SIZE, PageHint, PageNumber, RawBtree, SerializedSavepoint,
5    TableTreeMut, TableType, TransactionalMemory,
6};
7use crate::types::{Key, Value};
8use crate::{CompactionError, DatabaseError, Error, ReadOnlyTable, SavepointError, StorageError};
9use crate::{ReadTransaction, Result, WriteTransaction};
10use std::fmt::{Debug, Display, Formatter};
11
12use std::fs::{File, OpenOptions};
13use std::marker::PhantomData;
14use std::ops::RangeFull;
15use std::path::Path;
16use std::sync::{Arc, Mutex};
17use std::{io, thread};
18
19use crate::error::TransactionError;
20use crate::sealed::Sealed;
21use crate::transactions::{
22    ALLOCATOR_STATE_TABLE_NAME, AllocatorStateKey, AllocatorStateTree, SAVEPOINT_TABLE,
23};
24use crate::tree_store::file_backend::FileBackend;
25#[cfg(feature = "logging")]
26use log::{debug, info, warn};
27
#[allow(clippy::len_without_is_empty)]
/// Implements persistent storage for a database.
///
/// Implementors supply the raw byte-level read/write/sync primitives that the
/// database is built on (see `FileBackend` for the default file-based backend).
pub trait StorageBackend: 'static + Debug + Send + Sync {
    /// Gets the current length of the storage.
    fn len(&self) -> std::result::Result<u64, io::Error>;

    /// Reads the specified array of bytes from the storage.
    ///
    /// If `len` + `offset` exceeds the length of the storage an appropriate `Error` should be returned or a panic may occur.
    fn read(&self, offset: u64, len: usize) -> std::result::Result<Vec<u8>, io::Error>;

    /// Sets the length of the storage.
    ///
    /// When extending the storage the new positions should be zero initialized.
    fn set_len(&self, len: u64) -> std::result::Result<(), io::Error>;

    /// Syncs all buffered data with the persistent storage.
    ///
    /// If `eventual` is true, data may become persistent at some point after this call returns,
    /// but the storage must guarantee that a write barrier is inserted: i.e. all writes before this
    /// call to `sync_data()` will become persistent before any writes that occur after.
    fn sync_data(&self, eventual: bool) -> std::result::Result<(), io::Error>;

    /// Writes the specified array to the storage.
    fn write(&self, offset: u64, data: &[u8]) -> std::result::Result<(), io::Error>;
}
54
/// A named handle to a (non-multimap) table.
pub trait TableHandle: Sealed {
    /// Returns the name of the table
    fn name(&self) -> &str;
}
59
/// A table handle that carries only the table's name, with no key/value type
/// information (unlike [`TableDefinition`]).
#[derive(Clone)]
pub struct UntypedTableHandle {
    name: String,
}

impl UntypedTableHandle {
    // Crate-internal: constructed when enumerating existing tables
    pub(crate) fn new(name: String) -> Self {
        Self { name }
    }
}

impl TableHandle for UntypedTableHandle {
    fn name(&self) -> &str {
        &self.name
    }
}

impl Sealed for UntypedTableHandle {}
78
/// A named handle to a multimap table.
pub trait MultimapTableHandle: Sealed {
    /// Returns the name of the multimap table
    fn name(&self) -> &str;
}

/// A multimap-table handle that carries only the table's name, with no key/value
/// type information (unlike [`MultimapTableDefinition`]).
#[derive(Clone)]
pub struct UntypedMultimapTableHandle {
    name: String,
}

impl UntypedMultimapTableHandle {
    // Crate-internal: constructed when enumerating existing multimap tables
    pub(crate) fn new(name: String) -> Self {
        Self { name }
    }
}

impl MultimapTableHandle for UntypedMultimapTableHandle {
    fn name(&self) -> &str {
        &self.name
    }
}

impl Sealed for UntypedMultimapTableHandle {}
102
/// Defines the name and types of a table
///
/// A [`TableDefinition`] should be opened for use by calling [`ReadTransaction::open_table`] or [`WriteTransaction::open_table`]
///
/// Note that the lifetime of the `K` and `V` type parameters does not impact the lifetimes of the data
/// that is stored or retrieved from the table
pub struct TableDefinition<'a, K: Key + 'static, V: Value + 'static> {
    name: &'a str,
    // PhantomData carries the key/value types without storing any data
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}
114
115impl<'a, K: Key + 'static, V: Value + 'static> TableDefinition<'a, K, V> {
116    /// Construct a new table with given `name`
117    ///
118    /// ## Invariant
119    ///
120    /// `name` must not be empty.
121    pub const fn new(name: &'a str) -> Self {
122        assert!(!name.is_empty());
123        Self {
124            name,
125            _key_type: PhantomData,
126            _value_type: PhantomData,
127        }
128    }
129}
130
impl<K: Key + 'static, V: Value + 'static> TableHandle for TableDefinition<'_, K, V> {
    fn name(&self) -> &str {
        self.name
    }
}

impl<K: Key, V: Value> Sealed for TableDefinition<'_, K, V> {}

// Clone/Copy are implemented manually rather than derived: a derive would add
// `K: Clone`/`V: Clone` bounds, but the definition is Copy regardless of K and V
// since it only holds a `&str` and PhantomData.
impl<K: Key + 'static, V: Value + 'static> Clone for TableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<K: Key + 'static, V: Value + 'static> Copy for TableDefinition<'_, K, V> {}
146
147impl<K: Key + 'static, V: Value + 'static> Display for TableDefinition<'_, K, V> {
148    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
149        write!(
150            f,
151            "{}<{}, {}>",
152            self.name,
153            K::type_name().name(),
154            V::type_name().name()
155        )
156    }
157}
158
/// Defines the name and types of a multimap table
///
/// A [`MultimapTableDefinition`] should be opened for use by calling [`ReadTransaction::open_multimap_table`] or [`WriteTransaction::open_multimap_table`]
///
/// [Multimap tables](https://en.wikipedia.org/wiki/Multimap) may have multiple values associated with each key
///
/// Note that the lifetime of the `K` and `V` type parameters does not impact the lifetimes of the data
/// that is stored or retrieved from the table
pub struct MultimapTableDefinition<'a, K: Key + 'static, V: Key + 'static> {
    name: &'a str,
    // PhantomData carries the key/value types without storing any data.
    // Note: multimap values must themselves be `Key` (ordered), not just `Value`.
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}
172
impl<'a, K: Key + 'static, V: Key + 'static> MultimapTableDefinition<'a, K, V> {
    /// Construct a new multimap table with given `name`
    ///
    /// ## Invariant
    ///
    /// `name` must not be empty.
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
183
impl<K: Key + 'static, V: Key + 'static> MultimapTableHandle for MultimapTableDefinition<'_, K, V> {
    fn name(&self) -> &str {
        self.name
    }
}

impl<K: Key, V: Key> Sealed for MultimapTableDefinition<'_, K, V> {}

// Clone/Copy are implemented manually rather than derived: a derive would add
// `K: Clone`/`V: Clone` bounds, but the definition is Copy regardless of K and V
// since it only holds a `&str` and PhantomData.
impl<K: Key + 'static, V: Key + 'static> Clone for MultimapTableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<K: Key + 'static, V: Key + 'static> Copy for MultimapTableDefinition<'_, K, V> {}
199
200impl<K: Key + 'static, V: Key + 'static> Display for MultimapTableDefinition<'_, K, V> {
201    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
202        write!(
203            f,
204            "{}<{}, {}>",
205            self.name,
206            K::type_name().name(),
207            V::type_name().name()
208        )
209    }
210}
211
/// Information regarding the usage of the in-memory cache
///
/// Note: these metrics are only collected when the "`cache_metrics`" feature is enabled
#[derive(Debug)]
pub struct CacheStats {
    // Cumulative count of cache evictions since the database was opened
    pub(crate) evictions: u64,
}

impl CacheStats {
    /// Number of times that data has been evicted, due to the cache being full
    ///
    /// To increase the cache size use [`Builder::set_cache_size`]
    pub fn evictions(&self) -> u64 {
        self.evictions
    }
}
228
/// RAII guard for a live transaction: on drop it notifies the tracker that the
/// transaction has ended (see the `Drop` impl below).
pub(crate) struct TransactionGuard {
    // `None` only for guards created via `fake()`, which perform no tracking on drop
    transaction_tracker: Option<Arc<TransactionTracker>>,
    // Taken (set to `None`) by `leak()`, which disarms the drop handler
    transaction_id: Option<TransactionId>,
    // Selects the drop action: end a write transaction vs deallocate a read transaction
    write_transaction: bool,
}
234
235impl TransactionGuard {
236    pub(crate) fn new_read(
237        transaction_id: TransactionId,
238        tracker: Arc<TransactionTracker>,
239    ) -> Self {
240        Self {
241            transaction_tracker: Some(tracker),
242            transaction_id: Some(transaction_id),
243            write_transaction: false,
244        }
245    }
246
247    pub(crate) fn new_write(
248        transaction_id: TransactionId,
249        tracker: Arc<TransactionTracker>,
250    ) -> Self {
251        Self {
252            transaction_tracker: Some(tracker),
253            transaction_id: Some(transaction_id),
254            write_transaction: true,
255        }
256    }
257
258    // TODO: remove this hack
259    pub(crate) fn fake() -> Self {
260        Self {
261            transaction_tracker: None,
262            transaction_id: None,
263            write_transaction: false,
264        }
265    }
266
267    pub(crate) fn id(&self) -> TransactionId {
268        self.transaction_id.unwrap()
269    }
270
271    pub(crate) fn leak(mut self) -> TransactionId {
272        self.transaction_id.take().unwrap()
273    }
274}
275
276impl Drop for TransactionGuard {
277    fn drop(&mut self) {
278        if self.transaction_tracker.is_none() {
279            return;
280        }
281        if let Some(transaction_id) = self.transaction_id {
282            if self.write_transaction {
283                self.transaction_tracker
284                    .as_ref()
285                    .unwrap()
286                    .end_write_transaction(transaction_id);
287            } else {
288                self.transaction_tracker
289                    .as_ref()
290                    .unwrap()
291                    .deallocate_read_transaction(transaction_id);
292            }
293        }
294    }
295}
296
/// Opened redb database file
///
/// Use [`Self::begin_read`] to get a [`ReadTransaction`] object that can be used to read from the database
/// Use [`Self::begin_write`] to get a [`WriteTransaction`] object that can be used to read or write to the database
///
/// Multiple reads may be performed concurrently, with each other, and with writes. Only a single write
/// may be in progress at a time.
///
/// # Examples
///
/// Basic usage:
///
/// ```rust
/// use redb::*;
/// # use tempfile::NamedTempFile;
/// const TABLE: TableDefinition<u64, u64> = TableDefinition::new("my_data");
///
/// # fn main() -> Result<(), Error> {
/// # let tmpfile: NamedTempFile = NamedTempFile::new().unwrap();
/// # let filename = tmpfile.path();
/// let db = Database::create(filename)?;
/// let write_txn = db.begin_write()?;
/// {
///     let mut table = write_txn.open_table(TABLE)?;
///     table.insert(&0, &0)?;
/// }
/// write_txn.commit()?;
/// # Ok(())
/// # }
/// ```
pub struct Database {
    // Page-level storage layer; shared with all transactions via Arc
    mem: Arc<TransactionalMemory>,
    // Tracks live read/write transactions and savepoints
    transaction_tracker: Arc<TransactionTracker>,
}
331
332impl Database {
    /// Opens the specified file as a redb database.
    /// * if the file does not exist, or is an empty file, a new database will be initialized in it
    /// * if the file is a valid redb database, it will be opened
    /// * otherwise this function will return an error
    ///
    /// Equivalent to `Database::builder().create(path)` with default settings.
    pub fn create(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        Self::builder().create(path)
    }
340
    /// Opens an existing redb database.
    ///
    /// Equivalent to `Database::builder().open(path)` with default settings.
    pub fn open(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        Self::builder().open(path)
    }
345
    /// Information regarding the usage of the in-memory cache
    ///
    /// Note: these metrics are only collected when the "`cache_metrics`" feature is enabled
    pub fn cache_stats(&self) -> CacheStats {
        // Delegates to the storage layer, which owns the page cache
        self.mem.cache_stats()
    }
352
353    pub(crate) fn get_memory(&self) -> Arc<TransactionalMemory> {
354        self.mem.clone()
355    }
356
    /// Verifies the checksums of every node reachable from the data root, the system
    /// root, and (if present) the freed root.
    ///
    /// Returns `Ok(false)` at the first mismatch, `Ok(true)` if everything checks out.
    pub(crate) fn verify_primary_checksums(mem: Arc<TransactionalMemory>) -> Result<bool> {
        // This is a read-only walk, so nothing may be freed; asserted below
        let fake_freed_pages = Arc::new(Mutex::new(vec![]));
        let table_tree = TableTreeMut::new(
            mem.get_data_root(),
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
            fake_freed_pages.clone(),
        );
        if !table_tree.verify_checksums()? {
            return Ok(false);
        }
        let system_table_tree = TableTreeMut::new(
            mem.get_system_root(),
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
            fake_freed_pages.clone(),
        );
        if !system_table_tree.verify_checksums()? {
            return Ok(false);
        }
        // Sanity check: verification must not have freed any pages
        assert!(fake_freed_pages.lock().unwrap().is_empty());

        // The freed tree has fixed-width keys and values, so it can be checked
        // directly with RawBtree rather than a table tree
        if let Some(header) = mem.get_freed_root() {
            if !RawBtree::new(
                Some(header),
                FreedTableKey::fixed_width(),
                FreedPageList::fixed_width(),
                mem.clone(),
            )
            .verify_checksum()?
            {
                return Ok(false);
            }
        }

        Ok(true)
    }
394
    /// Force a check of the integrity of the database file, and repair it if possible.
    ///
    /// Note: Calling this function is unnecessary during normal operation. redb will automatically
    /// detect and recover from crashes, power loss, and other unclean shutdowns. This function is
    /// quite slow and should only be used when you suspect the database file may have been modified
    /// externally to redb, or that a redb bug may have left the database in a corrupted state.
    ///
    /// Returns `Ok(true)` if the database passed integrity checks; `Ok(false)` if it failed but was repaired,
    /// and `Err(Corrupted)` if the check failed and the file could not be repaired
    pub fn check_integrity(&mut self) -> Result<bool, DatabaseError> {
        // Snapshot the allocator hash so changes made by repair can be detected below
        let allocator_hash = self.mem.allocator_hash();
        // Drop cached pages and re-read from disk; reports whether the file was clean.
        // NOTE(review): Arc::get_mut().unwrap() assumes no other Arc clones of `mem`
        // are alive (e.g. from open transactions) -- `&mut self` should guarantee
        // that, but confirm no clone outlives its transaction.
        let mut was_clean = Arc::get_mut(&mut self.mem)
            .unwrap()
            .clear_cache_and_reload()?;

        let old_roots = [
            self.mem.get_data_root(),
            self.mem.get_system_root(),
            self.mem.get_freed_root(),
        ];

        // The no-op callback never aborts, so RepairAborted is impossible here;
        // only storage errors can surface
        let new_roots = Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
            DatabaseError::Storage(storage_err) => storage_err,
            _ => unreachable!(),
        })?;

        // Any change to the roots or to allocator state means the file was not clean
        if old_roots != new_roots || allocator_hash != self.mem.allocator_hash() {
            was_clean = false;
        }

        if !was_clean {
            // Persist the repaired roots with a fresh commit
            let next_transaction_id = self.mem.get_last_committed_transaction_id()?.next();
            let [data_root, system_root, freed_root] = new_roots;
            self.mem.commit(
                data_root,
                system_root,
                freed_root,
                next_transaction_id,
                false,
                true,
            )?;
        }

        self.mem.begin_writable()?;

        Ok(was_clean)
    }
442
    /// Compacts the database file
    ///
    /// Returns `true` if compaction was performed, and `false` if no further compaction was possible
    pub fn compact(&mut self) -> Result<bool, CompactionError> {
        // Compaction relocates pages, so no reader may still reference old locations
        if self
            .transaction_tracker
            .oldest_live_read_transaction()
            .is_some()
        {
            return Err(CompactionError::TransactionInProgress);
        }
        // Commit to free up any pending free pages
        // Use 2-phase commit to avoid any possible security issues. Plus this compaction is going to be so slow that it doesn't matter.
        // Once https://github.com/cberner/redb/issues/829 is fixed, we should upgrade this to use quick-repair -- that way the user
        // can cancel the compaction without requiring a full repair afterwards
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        // Savepoints pin old page versions, which would defeat compaction
        if txn.list_persistent_savepoints()?.next().is_some() {
            return Err(CompactionError::PersistentSavepointExists);
        }
        if self.transaction_tracker.any_savepoint_exists() {
            return Err(CompactionError::EphemeralSavepointExists);
        }
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;
        // Repeat, just in case executing list_persistent_savepoints() created a new table
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;
        // There can't be any outstanding transactions because we have a `&mut self`, so all pending free pages
        // should have been cleared out by the above commit()
        assert!(self.mem.get_freed_root().is_none());

        let mut compacted = false;
        // Iteratively compact until no progress is made
        loop {
            let mut progress = false;

            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            if txn.compact_pages()? {
                progress = true;
                txn.commit().map_err(|e| e.into_storage_error())?;
            } else {
                // Nothing moved this round; discard the transaction
                txn.abort()?;
            }

            // Double commit to free up the relocated pages for reuse
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            txn.set_two_phase_commit(true);
            txn.commit().map_err(|e| e.into_storage_error())?;
            assert!(self.mem.get_freed_root().is_none());

            if !progress {
                break;
            }

            compacted = true;
        }

        Ok(compacted)
    }
503
    /// Debug-build validation run after repair: for every persistent savepoint stored
    /// in the system tree, asserts that all pages reachable from the savepoint's user
    /// root are marked allocated.
    fn check_repaired_persistent_savepoints(
        system_root: Option<BtreeHeader>,
        mem: Arc<TransactionalMemory>,
    ) -> Result {
        let freed_list = Arc::new(Mutex::new(vec![]));
        let table_tree = TableTreeMut::new(
            system_root,
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
            freed_list,
        );
        // Tracker is only needed to deserialize savepoints; its state is irrelevant here
        let fake_transaction_tracker = Arc::new(TransactionTracker::new(TransactionId::new(0)));
        // The savepoint table may legitimately not exist (no persistent savepoints)
        if let Some(savepoint_table_def) = table_tree
            .get_table::<SavepointId, SerializedSavepoint>(
                SAVEPOINT_TABLE.name(),
                TableType::Normal,
            )
            .map_err(|e| {
                e.into_storage_error_or_corrupted("Persistent savepoint table corrupted")
            })?
        {
            let savepoint_table_root =
                if let InternalTableDefinition::Normal { table_root, .. } = savepoint_table_def {
                    table_root
                } else {
                    // The savepoint table is always created as a normal (non-multimap) table
                    unreachable!()
                };
            let savepoint_table: ReadOnlyTable<SavepointId, SerializedSavepoint> =
                ReadOnlyTable::new(
                    "internal savepoint table".to_string(),
                    savepoint_table_root,
                    PageHint::None,
                    Arc::new(TransactionGuard::fake()),
                    mem.clone(),
                )?;
            // Check every stored savepoint's user root
            for result in savepoint_table.range::<SavepointId>(..)? {
                let (_, savepoint_data) = result?;
                let savepoint = savepoint_data
                    .value()
                    .to_savepoint(fake_transaction_tracker.clone());
                if let Some(header) = savepoint.get_user_root() {
                    Self::check_pages_allocated_recursive(header.root, mem.clone())?;
                }
            }
        }

        Ok(())
    }
552
    /// Marks as allocated: (1) every btree node page of the freed tree itself, and
    /// (2) every page number stored as a value inside the freed table.
    fn mark_freed_tree(freed_root: Option<BtreeHeader>, mem: Arc<TransactionalMemory>) -> Result {
        // (1) the freed tree's own node pages
        if let Some(header) = freed_root {
            let freed_pages_iter = AllPageNumbersBtreeIter::new(
                header.root,
                FreedTableKey::fixed_width(),
                FreedPageList::fixed_width(),
                mem.clone(),
            )?;
            for page in freed_pages_iter {
                mem.mark_page_allocated(page?);
            }
        }

        // (2) the page numbers recorded in the freed table's values
        let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
            "internal freed table".to_string(),
            freed_root,
            PageHint::None,
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
        )?;
        for result in freed_table.range::<FreedTableKey>(..)? {
            let (_, freed_page_list) = result?;
            for i in 0..freed_page_list.value().len() {
                mem.mark_page_allocated(freed_page_list.value().get(i));
            }
        }

        Ok(())
    }
582
    /// Asserts that every page reachable from the table tree at `root` -- the tree's
    /// own node pages plus all pages of every table defined in it -- is marked
    /// allocated. (Assertion-only counterpart of `mark_tables_recursive`.)
    fn check_pages_allocated_recursive(root: PageNumber, mem: Arc<TransactionalMemory>) -> Result {
        // Repair the allocator state
        // All pages in the master table
        let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, mem.clone())?;
        for result in master_pages_iter {
            let page = result?;
            assert!(mem.is_allocated(page));
        }

        // Iterate over all other tables
        let iter: BtreeRangeIter<&str, InternalTableDefinition> =
            BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem.clone())?;

        // Chain all the other tables to the master table iter
        for entry in iter {
            let definition = entry?.value();
            definition.visit_all_pages(mem.clone(), |path| {
                assert!(mem.is_allocated(path.page_number()));
                Ok(())
            })?;
        }

        Ok(())
    }
607
    /// Marks every page reachable from the table tree at `root` as allocated: the
    /// tree's own node pages plus all pages of every table defined in it.
    /// (Mutating counterpart of `check_pages_allocated_recursive`.)
    fn mark_tables_recursive(root: PageNumber, mem: Arc<TransactionalMemory>) -> Result {
        // Repair the allocator state
        // All pages in the master table
        let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, mem.clone())?;
        for page in master_pages_iter {
            mem.mark_page_allocated(page?);
        }

        // Iterate over all other tables
        let iter: BtreeRangeIter<&str, InternalTableDefinition> =
            BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem.clone())?;

        // Chain all the other tables to the master table iter
        for entry in iter {
            let definition = entry?.value();
            definition.visit_all_pages(mem.clone(), |path| {
                mem.mark_page_allocated(path.page_number());
                Ok(())
            })?;
        }

        Ok(())
    }
631
    /// Repairs the database after an unclean shutdown: verifies (and if necessary
    /// rolls back to) a valid primary, then rebuilds the allocator state by walking
    /// every reachable page. Progress is reported through `repair_callback`.
    ///
    /// Returns the `[data, system, freed]` roots that the caller should commit.
    fn do_repair(
        mem: &mut Arc<TransactionalMemory>, // Only &mut to ensure exclusivity
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<[Option<BtreeHeader>; 3], DatabaseError> {
        if !Self::verify_primary_checksums(mem.clone())? {
            // A 2-phase commit should never leave a corrupted primary, so this
            // indicates corruption beyond a torn commit -- unrepairable here
            if mem.used_two_phase_commit() {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Primary is corrupted despite 2-phase commit".to_string(),
                )));
            }

            // 0.3 because the repair takes 3 full scans and the first is done now
            let mut handle = RepairSession::new(0.3);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }

            // Roll back to the secondary copy
            mem.repair_primary_corrupted();
            // We need to invalidate the userspace cache, because walking the tree in verify_primary_checksums() may
            // have poisoned it with pages that just got rolled back by repair_primary_corrupted(), since
            // that rolls back a partially committed transaction.
            mem.clear_read_cache();
            if !Self::verify_primary_checksums(mem.clone())? {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Failed to repair database. All roots are corrupted".to_string(),
                )));
            }
        }
        // 0.6 because the repair takes 3 full scans and the second is done now
        let mut handle = RepairSession::new(0.6);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        // Reset allocator state; reachable pages are re-marked below
        mem.begin_repair()?;

        let data_root = mem.get_data_root();
        if let Some(header) = data_root {
            Self::mark_tables_recursive(header.root, mem.clone())?;
        }

        let freed_root = mem.get_freed_root();
        // Allow processing of all transactions, since this is the main freed tree
        Self::mark_freed_tree(freed_root, mem.clone())?;
        // NOTE(review): this table is constructed and immediately dropped; presumably
        // ReadOnlyTable::new() validates the freed root, but confirm this is not
        // simply dead code left over from a refactor
        let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
            "internal freed table".to_string(),
            freed_root,
            PageHint::None,
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
        )?;
        drop(freed_table);

        // 0.9 because the repair takes 3 full scans and the third is done now. There is just some system tables left
        let mut handle = RepairSession::new(0.9);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        let system_root = mem.get_system_root();
        if let Some(header) = system_root {
            Self::mark_tables_recursive(header.root, mem.clone())?;
        }
        #[cfg(debug_assertions)]
        {
            // Savepoints only reference pages from a previous commit, so those are either referenced
            // in the current root, or are in the freed tree
            Self::check_repaired_persistent_savepoints(system_root, mem.clone())?;
        }

        mem.end_repair()?;

        // We need to invalidate the userspace cache, because we're about to implicitly free the freed table
        // by storing an empty root during the below commit()
        mem.clear_read_cache();

        Ok([data_root, system_root, freed_root])
    }
713
    /// Opens (or, if `allow_initialize`, initializes) a database on `file`.
    ///
    /// If the file was not shut down cleanly, either restores allocator state from
    /// the allocator-state table (fast path, requires a 2-phase-commit primary) or
    /// performs a full repair. Finally restores transaction-tracker state for any
    /// persistent savepoints.
    fn new(
        file: Box<dyn StorageBackend>,
        allow_initialize: bool,
        page_size: usize,
        region_size: Option<u64>,
        read_cache_size_bytes: usize,
        write_cache_size_bytes: usize,
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<Self, DatabaseError> {
        #[cfg(feature = "logging")]
        let file_path = format!("{:?}", &file);
        #[cfg(feature = "logging")]
        info!("Opening database {:?}", &file_path);
        let mem = TransactionalMemory::new(
            file,
            allow_initialize,
            page_size,
            region_size,
            read_cache_size_bytes,
            write_cache_size_bytes,
        )?;
        let mut mem = Arc::new(mem);
        if mem.needs_repair()? {
            // If the last transaction used 2-phase commit and updated the allocator state table, then
            // we can just load the allocator state from there. Otherwise, we need a full repair
            if let Some(tree) = Self::get_allocator_state_table(&mem)? {
                #[cfg(feature = "logging")]
                info!("Found valid allocator state, full repair not needed");
                mem.load_allocator_state(&tree)?;
            } else {
                #[cfg(feature = "logging")]
                warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
                // 0.0 progress: repair is about to start
                let mut handle = RepairSession::new(0.0);
                repair_callback(&mut handle);
                if handle.aborted() {
                    return Err(DatabaseError::RepairAborted);
                }
                let [data_root, system_root, freed_root] =
                    Self::do_repair(&mut mem, repair_callback)?;
                // Persist the repaired roots with a fresh commit
                let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
                mem.commit(
                    data_root,
                    system_root,
                    freed_root,
                    next_transaction_id,
                    false,
                    true,
                )?;
            }
        }

        mem.begin_writable()?;
        let next_transaction_id = mem.get_last_committed_transaction_id()?.next();

        let db = Database {
            mem,
            transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
        };

        // Restore the tracker state for any persistent savepoints
        let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
        if let Some(next_id) = txn.next_persistent_savepoint_id()? {
            db.transaction_tracker
                .restore_savepoint_counter_state(next_id);
        }
        for id in txn.list_persistent_savepoints()? {
            let savepoint = match txn.get_persistent_savepoint(id) {
                Ok(savepoint) => savepoint,
                Err(err) => match err {
                    // The id came from list_persistent_savepoints(), so it must be valid
                    SavepointError::InvalidSavepoint => unreachable!(),
                    SavepointError::Storage(storage) => {
                        return Err(storage.into());
                    }
                },
            };
            db.transaction_tracker
                .register_persistent_savepoint(&savepoint);
        }
        // This transaction made no changes; discard it
        txn.abort()?;

        Ok(db)
    }
796
    /// Returns the allocator state tree if it exists and is valid for fast startup,
    /// or `None` if a full repair is required instead.
    fn get_allocator_state_table(
        mem: &Arc<TransactionalMemory>,
    ) -> Result<Option<AllocatorStateTree>> {
        // The allocator state table is only valid if the primary was written using 2-phase commit
        if !mem.used_two_phase_commit() {
            return Ok(None);
        }

        // See if it's present in the system table tree
        let fake_freed_pages = Arc::new(Mutex::new(vec![]));
        let system_table_tree = TableTreeMut::new(
            mem.get_system_root(),
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
            fake_freed_pages.clone(),
        );
        let Some(allocator_state_table) = system_table_tree
            .get_table::<AllocatorStateKey, &[u8]>(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
            .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?
        else {
            return Ok(None);
        };

        // Load the allocator state table
        let InternalTableDefinition::Normal { table_root, .. } = allocator_state_table else {
            // The allocator state table is always created as a normal table
            unreachable!();
        };
        let tree = AllocatorStateTree::new(
            table_root,
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
            fake_freed_pages,
        );

        // Make sure this isn't stale allocator state left over from a previous transaction
        if !mem.is_valid_allocator_state(&tree)? {
            return Ok(None);
        }

        Ok(Some(tree))
    }
838
839    fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
840        let id = self
841            .transaction_tracker
842            .register_read_transaction(&self.mem)?;
843
844        Ok(TransactionGuard::new_read(
845            id,
846            self.transaction_tracker.clone(),
847        ))
848    }
849
850    /// Convenience method for [`Builder::new`]
851    pub fn builder() -> Builder {
852        Builder::new()
853    }
854
855    /// Begins a write transaction
856    ///
857    /// Returns a [`WriteTransaction`] which may be used to read/write to the database. Only a single
858    /// write may be in progress at a time. If a write is in progress, this function will block
859    /// until it completes.
860    pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
861        // Fail early if there has been an I/O error -- nothing can be committed in that case
862        self.mem.check_io_errors()?;
863        let guard = TransactionGuard::new_write(
864            self.transaction_tracker.start_write_transaction(),
865            self.transaction_tracker.clone(),
866        );
867        WriteTransaction::new(guard, self.transaction_tracker.clone(), self.mem.clone())
868            .map_err(|e| e.into())
869    }
870
871    /// Begins a read transaction
872    ///
873    /// Captures a snapshot of the database, so that only data committed before calling this method
874    /// is visible in the transaction
875    ///
876    /// Returns a [`ReadTransaction`] which may be used to read from the database. Read transactions
877    /// may exist concurrently with writes
878    pub fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
879        let guard = self.allocate_read_transaction()?;
880        #[cfg(feature = "logging")]
881        debug!("Beginning read transaction id={:?}", guard.id());
882        ReadTransaction::new(self.get_memory(), guard)
883    }
884
885    fn ensure_allocator_state_table(&self) -> Result<(), Error> {
886        // If the allocator state table is already up to date, we're done
887        if Self::get_allocator_state_table(&self.mem)?.is_some() {
888            return Ok(());
889        }
890
891        // Make a new quick-repair commit to update the allocator state table
892        #[cfg(feature = "logging")]
893        debug!("Writing allocator state table");
894        let mut tx = self.begin_write()?;
895        tx.set_quick_repair(true);
896        tx.commit()?;
897
898        Ok(())
899    }
900}
901
902impl Drop for Database {
903    fn drop(&mut self) {
904        if thread::panicking() {
905            return;
906        }
907
908        if self.ensure_allocator_state_table().is_err() {
909            #[cfg(feature = "logging")]
910            warn!("Failed to write allocator state table. Repair may be required at restart.")
911        }
912    }
913}
914
pub struct RepairSession {
    // Estimated fraction of the repair completed so far, in [0.0, 1.0)
    progress: f64,
    // Set to true once the user has requested the repair be abandoned
    aborted: bool,
}
919
impl RepairSession {
    /// Creates a session reporting the given initial progress estimate.
    pub(crate) fn new(progress: f64) -> Self {
        Self {
            progress,
            aborted: false,
        }
    }

    /// Returns true once [`Self::abort`] has been called.
    pub(crate) fn aborted(&self) -> bool {
        self.aborted
    }

    /// Abort the repair process. The corresponding call to [`Builder::open`] or [`Builder::create`] will return an error
    pub fn abort(&mut self) {
        self.aborted = true;
    }

    /// Returns an estimate of the repair progress in the range [0.0, 1.0). At 1.0 the repair is complete.
    pub fn progress(&self) -> f64 {
        self.progress
    }
}
942
/// Configuration builder of a redb [Database].
pub struct Builder {
    // Page size in bytes; fixed to PAGE_SIZE except under test/fuzzing builds
    page_size: usize,
    // Optional region-size override (test/fuzzing only); None selects the default
    region_size: Option<u64>,
    // Bytes of memory budgeted for the read cache
    read_cache_size_bytes: usize,
    // Bytes of memory budgeted for the write cache
    write_cache_size_bytes: usize,
    // Invoked periodically while a damaged database file is being repaired
    repair_callback: Box<dyn Fn(&mut RepairSession)>,
}
951
952impl Builder {
953    /// Construct a new [Builder] with sensible defaults.
954    ///
955    /// ## Defaults
956    ///
957    /// - `cache_size_bytes`: 1GiB
958    #[allow(clippy::new_without_default)]
959    pub fn new() -> Self {
960        let mut result = Self {
961            // Default to 4k pages. Benchmarking showed that this was a good default on all platforms,
962            // including MacOS with 16k pages. Therefore, users are not allowed to configure it at the moment.
963            // It is part of the file format, so can be enabled in the future.
964            page_size: PAGE_SIZE,
965            region_size: None,
966            // TODO: Default should probably take into account the total system memory
967            read_cache_size_bytes: 0,
968            // TODO: Default should probably take into account the total system memory
969            write_cache_size_bytes: 0,
970            repair_callback: Box::new(|_| {}),
971        };
972
973        result.set_cache_size(1024 * 1024 * 1024);
974        result
975    }
976
977    /// Set a callback which will be invoked periodically in the event that the database file needs
978    /// to be repaired.
979    ///
980    /// The [`RepairSession`] argument can be used to control the repair process.
981    ///
982    /// If the database file needs repair, the callback will be invoked at least once.
983    /// There is no upper limit on the number of times it may be called.
984    pub fn set_repair_callback(
985        &mut self,
986        callback: impl Fn(&mut RepairSession) + 'static,
987    ) -> &mut Self {
988        self.repair_callback = Box::new(callback);
989        self
990    }
991
992    /// Set the internal page size of the database
993    ///
994    /// Valid values are powers of two, greater than or equal to 512
995    ///
996    /// ## Defaults
997    ///
998    /// Default to 4 Kib pages.
999    #[cfg(any(fuzzing, test))]
1000    pub fn set_page_size(&mut self, size: usize) -> &mut Self {
1001        assert!(size.is_power_of_two());
1002        self.page_size = std::cmp::max(size, 512);
1003        self
1004    }
1005
1006    /// Set the amount of memory (in bytes) used for caching data
1007    pub fn set_cache_size(&mut self, bytes: usize) -> &mut Self {
1008        // TODO: allow dynamic expansion of the read/write cache
1009        self.read_cache_size_bytes = bytes / 10 * 9;
1010        self.write_cache_size_bytes = bytes / 10;
1011        self
1012    }
1013
1014    #[cfg(any(test, fuzzing))]
1015    pub fn set_region_size(&mut self, size: u64) -> &mut Self {
1016        assert!(size.is_power_of_two());
1017        self.region_size = Some(size);
1018        self
1019    }
1020
1021    /// Opens the specified file as a redb database.
1022    /// * if the file does not exist, or is an empty file, a new database will be initialized in it
1023    /// * if the file is a valid redb database, it will be opened
1024    /// * otherwise this function will return an error
1025    pub fn create(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
1026        let file = OpenOptions::new()
1027            .read(true)
1028            .write(true)
1029            .create(true)
1030            .truncate(false)
1031            .open(path)?;
1032
1033        Database::new(
1034            Box::new(FileBackend::new(file)?),
1035            true,
1036            self.page_size,
1037            self.region_size,
1038            self.read_cache_size_bytes,
1039            self.write_cache_size_bytes,
1040            &self.repair_callback,
1041        )
1042    }
1043
1044    /// Opens an existing redb database.
1045    pub fn open(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
1046        let file = OpenOptions::new().read(true).write(true).open(path)?;
1047
1048        Database::new(
1049            Box::new(FileBackend::new(file)?),
1050            false,
1051            self.page_size,
1052            None,
1053            self.read_cache_size_bytes,
1054            self.write_cache_size_bytes,
1055            &self.repair_callback,
1056        )
1057    }
1058
1059    /// Open an existing or create a new database in the given `file`.
1060    ///
1061    /// The file must be empty or contain a valid database.
1062    pub fn create_file(&self, file: File) -> Result<Database, DatabaseError> {
1063        Database::new(
1064            Box::new(FileBackend::new(file)?),
1065            true,
1066            self.page_size,
1067            self.region_size,
1068            self.read_cache_size_bytes,
1069            self.write_cache_size_bytes,
1070            &self.repair_callback,
1071        )
1072    }
1073
1074    /// Open an existing or create a new database with the given backend.
1075    pub fn create_with_backend(
1076        &self,
1077        backend: impl StorageBackend,
1078    ) -> Result<Database, DatabaseError> {
1079        Database::new(
1080            Box::new(backend),
1081            true,
1082            self.page_size,
1083            self.region_size,
1084            self.read_cache_size_bytes,
1085            self.write_cache_size_bytes,
1086            &self.repair_callback,
1087        )
1088    }
1089}
1090
1091impl std::fmt::Debug for Database {
1092    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1093        f.debug_struct("Database").finish()
1094    }
1095}
1096
1097#[cfg(test)]
1098mod test {
1099    use crate::backends::FileBackend;
1100    use crate::{
1101        CommitError, Database, DatabaseError, Durability, ReadableTable, StorageBackend,
1102        StorageError, TableDefinition, TransactionError,
1103    };
1104    use std::fs::File;
1105    use std::io::{ErrorKind, Read, Seek, SeekFrom};
1106    use std::sync::Arc;
1107    use std::sync::atomic::{AtomicU64, Ordering};
1108
    // Test backend that wraps a real FileBackend and starts returning I/O
    // errors after a configurable number of writes, to simulate crashes and
    // transient storage failures.
    #[derive(Debug)]
    struct FailingBackend {
        inner: FileBackend,
        // Remaining writes before failures begin; shared so tests can adjust it
        countdown: Arc<AtomicU64>,
    }
1114
1115    impl FailingBackend {
1116        fn new(backend: FileBackend, countdown: u64) -> Self {
1117            Self {
1118                inner: backend,
1119                countdown: Arc::new(AtomicU64::new(countdown)),
1120            }
1121        }
1122
1123        fn check_countdown(&self) -> Result<(), std::io::Error> {
1124            if self.countdown.load(Ordering::SeqCst) == 0 {
1125                return Err(std::io::Error::from(ErrorKind::Other));
1126            }
1127
1128            Ok(())
1129        }
1130
1131        fn decrement_countdown(&self) -> Result<(), std::io::Error> {
1132            if self
1133                .countdown
1134                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
1135                    if x > 0 { Some(x - 1) } else { None }
1136                })
1137                .is_err()
1138            {
1139                return Err(std::io::Error::from(ErrorKind::Other));
1140            }
1141
1142            Ok(())
1143        }
1144    }
1145
1146    impl StorageBackend for FailingBackend {
1147        fn len(&self) -> Result<u64, std::io::Error> {
1148            self.inner.len()
1149        }
1150
1151        fn read(&self, offset: u64, len: usize) -> Result<Vec<u8>, std::io::Error> {
1152            self.check_countdown()?;
1153            self.inner.read(offset, len)
1154        }
1155
1156        fn set_len(&self, len: u64) -> Result<(), std::io::Error> {
1157            self.inner.set_len(len)
1158        }
1159
1160        fn sync_data(&self, eventual: bool) -> Result<(), std::io::Error> {
1161            self.check_countdown()?;
1162            self.inner.sync_data(eventual)
1163        }
1164
1165        fn write(&self, offset: u64, data: &[u8]) -> Result<(), std::io::Error> {
1166            self.decrement_countdown()?;
1167            self.inner.write(offset, data)
1168        }
1169    }
1170
    // Regression test: a commit that fails partway through (simulated crash via
    // an exhausted write budget) must leave the file recoverable on re-open.
    // The specific cache size, keys, and write budget reproduce the original
    // failure; do not simplify.
    #[test]
    fn crash_regression4() {
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();

        // Allow exactly 23 writes to succeed before the backend starts erroring
        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), 23);
        let db = Database::builder()
            .set_cache_size(12686)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let tx = db.begin_write().unwrap();
        let _savepoint = tx.ephemeral_savepoint().unwrap();
        let _persistent_savepoint = tx.persistent_savepoint().unwrap();
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            let _ = table.insert_reserve(118821, 360).unwrap();
        }
        // The write budget runs out during this commit: the simulated crash
        let result = tx.commit();
        assert!(result.is_err());

        drop(db);
        // Re-opening the same file must succeed despite the partial commit
        Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create(&path)
            .unwrap();
    }
1206
    // A transient I/O error during commit must poison the database (further
    // writes refused) and leave the on-disk recovery flag set, even if the
    // error clears before the database is dropped.
    #[test]
    fn transient_io_error() {
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();

        // Start with an effectively unlimited write budget; the shared
        // countdown handle lets the test inject a failure at a precise moment
        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), u64::MAX);
        let countdown = backend.countdown.clone();
        let db = Database::builder()
            .set_cache_size(0)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, u64> = TableDefinition::new("x");

        // Create some garbage
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            table.insert(0, 0).unwrap();
        }
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            table.insert(0, 1).unwrap();
        }
        tx.commit().unwrap();

        let tx = db.begin_write().unwrap();
        // Cause an error in the commit
        countdown.store(0, Ordering::SeqCst);
        let result = tx.commit().err().unwrap();
        assert!(matches!(result, CommitError::Storage(StorageError::Io(_))));
        // After an I/O error, new write transactions must be refused
        let result = db.begin_write().err().unwrap();
        assert!(matches!(
            result,
            TransactionError::Storage(StorageError::PreviousIo)
        ));
        // Simulate a transient error
        countdown.store(u64::MAX, Ordering::SeqCst);
        drop(db);

        // Check that recovery flag is set, even though the error has "cleared"
        // NOTE(review): byte at offset 9 with mask 0x2 is assumed to be the
        // recovery-needed bit of the "god byte" -- verify against the header
        // layout if this assertion ever breaks
        let mut file = File::open(&path).unwrap();
        file.seek(SeekFrom::Start(9)).unwrap();
        let mut god_byte = vec![0u8];
        assert_eq!(file.read(&mut god_byte).unwrap(), 1);
        assert_ne!(god_byte[0] & 2, 0);
    }
1256
1257    #[test]
1258    fn small_pages() {
1259        let tmpfile = crate::create_tempfile();
1260
1261        let db = Database::builder()
1262            .set_page_size(512)
1263            .create(tmpfile.path())
1264            .unwrap();
1265
1266        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1267        let txn = db.begin_write().unwrap();
1268        {
1269            txn.open_table(table_definition).unwrap();
1270        }
1271        txn.commit().unwrap();
1272    }
1273
    // Fuzzer-derived regression: a long sequence of ephemeral savepoints being
    // created, restored, and dropped across 2-phase commits on 512-byte pages.
    // The exact ordering, keys, and value sizes reproduce the original
    // failure; do not simplify.
    #[test]
    fn small_pages2() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(512)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint0 = tx.ephemeral_savepoint().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint0).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&660503, 489).unwrap().as_mut().fill(0xFF);
            assert!(t.remove(&291295).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint0).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint2 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint0);
        tx.restore_savepoint(&savepoint2).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&2059).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint3 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint1);
        tx.restore_savepoint(&savepoint3).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint4 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint2);
        tx.restore_savepoint(&savepoint3).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.remove(&207936).unwrap().is_none());
        }
        tx.abort().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint5 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint3);
        // savepoint4 is expected to no longer be restorable at this point
        assert!(tx.restore_savepoint(&savepoint4).is_err());
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint5).unwrap();
        tx.set_durability(Durability::None);
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();
    }
1367
    // Fuzzer-derived regression: aborted non-durable transactions combined
    // with ephemeral savepoints on 1KiB pages. The exact keys and value sizes
    // reproduce the original failure; do not simplify.
    #[test]
    fn small_pages3() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        let _savepoint0 = tx.ephemeral_savepoint().unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 306];
            t.insert(&539717, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();

        let mut tx = db.begin_write().unwrap();
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        // Restore a savepoint taken within the same transaction, then abort
        tx.restore_savepoint(&savepoint1).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 2008];
            t.insert(&784384, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();
    }
1400
    // Fuzzer-derived regression: aborted transactions mixing insert,
    // insert_reserve, and lookups on 1KiB pages with a small cache. The exact
    // keys and value sizes reproduce the original failure; do not simplify.
    #[test]
    fn small_pages4() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let tx = db.begin_write().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&131072).unwrap().is_none());
            let value = vec![0xFF; 1130];
            t.insert(&42394, value.as_slice()).unwrap();
            t.insert_reserve(&744037, 3645).unwrap().as_mut().fill(0xFF);
            assert!(t.get(&0).unwrap().is_none());
        }
        tx.abort().unwrap();

        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&118749, 734).unwrap().as_mut().fill(0xFF);
        }
        tx.abort().unwrap();
    }
1437
1438    #[test]
1439    fn dynamic_shrink() {
1440        let tmpfile = crate::create_tempfile();
1441        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1442        let big_value = vec![0u8; 1024];
1443
1444        let db = Database::builder()
1445            .set_region_size(1024 * 1024)
1446            .create(tmpfile.path())
1447            .unwrap();
1448
1449        let txn = db.begin_write().unwrap();
1450        {
1451            let mut table = txn.open_table(table_definition).unwrap();
1452            for i in 0..2048 {
1453                table.insert(&i, big_value.as_slice()).unwrap();
1454            }
1455        }
1456        txn.commit().unwrap();
1457
1458        let file_size = tmpfile.as_file().metadata().unwrap().len();
1459
1460        let txn = db.begin_write().unwrap();
1461        {
1462            let mut table = txn.open_table(table_definition).unwrap();
1463            for i in 0..2048 {
1464                table.remove(&i).unwrap();
1465            }
1466        }
1467        txn.commit().unwrap();
1468
1469        // Perform a couple more commits to be sure the database has a chance to compact
1470        let txn = db.begin_write().unwrap();
1471        {
1472            let mut table = txn.open_table(table_definition).unwrap();
1473            table.insert(0, [].as_slice()).unwrap();
1474        }
1475        txn.commit().unwrap();
1476        let txn = db.begin_write().unwrap();
1477        {
1478            let mut table = txn.open_table(table_definition).unwrap();
1479            table.remove(0).unwrap();
1480        }
1481        txn.commit().unwrap();
1482        let txn = db.begin_write().unwrap();
1483        txn.commit().unwrap();
1484
1485        let final_file_size = tmpfile.as_file().metadata().unwrap().len();
1486        assert!(final_file_size < file_size);
1487    }
1488
1489    #[test]
1490    fn create_new_db_in_empty_file() {
1491        let tmpfile = crate::create_tempfile();
1492
1493        let _db = Database::builder()
1494            .create_file(tmpfile.into_file())
1495            .unwrap();
1496    }
1497
1498    #[test]
1499    fn open_missing_file() {
1500        let tmpfile = crate::create_tempfile();
1501
1502        let err = Database::builder()
1503            .open(tmpfile.path().with_extension("missing"))
1504            .unwrap_err();
1505
1506        match err {
1507            DatabaseError::Storage(StorageError::Io(err)) if err.kind() == ErrorKind::NotFound => {}
1508            err => panic!("Unexpected error for empty file: {err}"),
1509        }
1510    }
1511
1512    #[test]
1513    fn open_empty_file() {
1514        let tmpfile = crate::create_tempfile();
1515
1516        let err = Database::builder().open(tmpfile.path()).unwrap_err();
1517
1518        match err {
1519            DatabaseError::Storage(StorageError::Io(err))
1520                if err.kind() == ErrorKind::InvalidData => {}
1521            err => panic!("Unexpected error for empty file: {err}"),
1522        }
1523    }
1524}