redb/
db.rs

1use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
2use crate::tree_store::{
3    AllPageNumbersBtreeIter, BtreeHeader, BtreeRangeIter, FreedPageList, FreedTableKey,
4    InternalTableDefinition, PageHint, PageNumber, RawBtree, SerializedSavepoint, TableTreeMut,
5    TableType, TransactionalMemory, PAGE_SIZE,
6};
7use crate::types::{Key, Value};
8use crate::{CompactionError, DatabaseError, Error, ReadOnlyTable, SavepointError, StorageError};
9use crate::{ReadTransaction, Result, WriteTransaction};
10use std::fmt::{Debug, Display, Formatter};
11
12use std::fs::{File, OpenOptions};
13use std::io::ErrorKind;
14use std::marker::PhantomData;
15use std::ops::RangeFull;
16use std::path::Path;
17use std::sync::{Arc, Mutex};
18use std::{io, thread};
19
20use crate::error::TransactionError;
21use crate::sealed::Sealed;
22use crate::transactions::{
23    AllocatorStateKey, AllocatorStateTree, ALLOCATOR_STATE_TABLE_NAME, SAVEPOINT_TABLE,
24};
25use crate::tree_store::file_backend::FileBackend;
26#[cfg(feature = "logging")]
27use log::{debug, info, warn};
28
29#[allow(clippy::len_without_is_empty)]
30/// Implements persistent storage for a database.
#[allow(clippy::len_without_is_empty)]
/// Implements persistent storage for a database.
pub trait StorageBackend: 'static + Debug + Send + Sync {
    /// Gets the current length of the storage.
    fn len(&self) -> std::result::Result<u64, io::Error>;

    /// Reads the specified array of bytes from the storage.
    ///
    /// If `len` + `offset` exceeds the length of the storage an appropriate `Error` should be returned or a panic may occur.
    fn read(&self, offset: u64, len: usize) -> std::result::Result<Vec<u8>, io::Error>;

    /// Sets the length of the storage.
    ///
    /// When extending the storage the new positions should be zero initialized.
    fn set_len(&self, len: u64) -> std::result::Result<(), io::Error>;

    /// Syncs all buffered data with the persistent storage.
    ///
    /// If `eventual` is true, data may become persistent at some point after this call returns,
    /// but the storage must guarantee that a write barrier is inserted: i.e. all writes before this
    /// call to `sync_data()` will become persistent before any writes that occur after.
    fn sync_data(&self, eventual: bool) -> std::result::Result<(), io::Error>;

    /// Writes the specified array to the storage, starting at the given byte `offset`.
    fn write(&self, offset: u64, data: &[u8]) -> std::result::Result<(), io::Error>;
}
55
/// Handle that identifies a table by name.
pub trait TableHandle: Sealed {
    /// Returns the name of the table
    fn name(&self) -> &str;
}
60
/// Table handle that carries only the table's name, with no compile-time
/// key/value type information.
#[derive(Clone)]
pub struct UntypedTableHandle {
    // Name of the referenced table
    name: String,
}
65
impl UntypedTableHandle {
    /// Constructs a handle wrapping the given table name.
    pub(crate) fn new(name: String) -> Self {
        Self { name }
    }
}
71
72impl TableHandle for UntypedTableHandle {
73    fn name(&self) -> &str {
74        &self.name
75    }
76}
77
// Seals `TableHandle` so it cannot be implemented outside this crate.
impl Sealed for UntypedTableHandle {}
79
/// Handle that identifies a multimap table by name.
pub trait MultimapTableHandle: Sealed {
    /// Returns the name of the multimap table
    fn name(&self) -> &str;
}
84
/// Multimap table handle that carries only the table's name, with no
/// compile-time key/value type information.
#[derive(Clone)]
pub struct UntypedMultimapTableHandle {
    // Name of the referenced multimap table
    name: String,
}
89
impl UntypedMultimapTableHandle {
    /// Constructs a handle wrapping the given multimap table name.
    pub(crate) fn new(name: String) -> Self {
        Self { name }
    }
}
95
96impl MultimapTableHandle for UntypedMultimapTableHandle {
97    fn name(&self) -> &str {
98        &self.name
99    }
100}
101
// Seals `MultimapTableHandle` so it cannot be implemented outside this crate.
impl Sealed for UntypedMultimapTableHandle {}
103
/// Defines the name and types of a table
///
/// A [`TableDefinition`] should be opened for use by calling [`ReadTransaction::open_table`] or [`WriteTransaction::open_table`]
///
/// Note that the lifetime of the `K` and `V` type parameters does not impact the lifetimes of the data
/// that is stored or retrieved from the table
pub struct TableDefinition<'a, K: Key + 'static, V: Value + 'static> {
    name: &'a str,
    // Zero-sized markers that bind the key/value types without storing any data
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}
115
impl<'a, K: Key + 'static, V: Value + 'static> TableDefinition<'a, K, V> {
    /// Construct a new table with given `name`
    ///
    /// ## Invariant
    ///
    /// `name` must not be empty.
    ///
    /// # Panics
    ///
    /// Panics if `name` is empty.
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
131
impl<'a, K: Key + 'static, V: Value + 'static> TableHandle for TableDefinition<'a, K, V> {
    /// Returns the name this table was defined with.
    fn name(&self) -> &str {
        self.name
    }
}
137
// Seals `TableHandle` for table definitions; no external impls are possible.
impl<K: Key, V: Value> Sealed for TableDefinition<'_, K, V> {}
139
// Manual impl (rather than `#[derive(Clone)]`) so cloning does not require
// `K: Clone`/`V: Clone`; the struct only holds a `&str` and `PhantomData`.
impl<'a, K: Key + 'static, V: Value + 'static> Clone for TableDefinition<'a, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}
145
// Copy is sound: the struct holds only a shared reference and PhantomData.
impl<'a, K: Key + 'static, V: Value + 'static> Copy for TableDefinition<'a, K, V> {}
147
148impl<'a, K: Key + 'static, V: Value + 'static> Display for TableDefinition<'a, K, V> {
149    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
150        write!(
151            f,
152            "{}<{}, {}>",
153            self.name,
154            K::type_name().name(),
155            V::type_name().name()
156        )
157    }
158}
159
/// Defines the name and types of a multimap table
///
/// A [`MultimapTableDefinition`] should be opened for use by calling [`ReadTransaction::open_multimap_table`] or [`WriteTransaction::open_multimap_table`]
///
/// [Multimap tables](https://en.wikipedia.org/wiki/Multimap) may have multiple values associated with each key
///
/// Note that the lifetime of the `K` and `V` type parameters does not impact the lifetimes of the data
/// that is stored or retrieved from the table
pub struct MultimapTableDefinition<'a, K: Key + 'static, V: Key + 'static> {
    name: &'a str,
    // Zero-sized markers that bind the key/value types without storing any data
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}
173
impl<'a, K: Key + 'static, V: Key + 'static> MultimapTableDefinition<'a, K, V> {
    /// Construct a new multimap table with given `name`
    ///
    /// # Panics
    ///
    /// Panics if `name` is empty.
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
184
impl<'a, K: Key + 'static, V: Key + 'static> MultimapTableHandle
    for MultimapTableDefinition<'a, K, V>
{
    /// Returns the name this multimap table was defined with.
    fn name(&self) -> &str {
        self.name
    }
}
192
// Seals `MultimapTableHandle` for multimap definitions; no external impls are possible.
impl<K: Key, V: Key> Sealed for MultimapTableDefinition<'_, K, V> {}
194
// Manual impl (rather than `#[derive(Clone)]`) so cloning does not require
// `K: Clone`/`V: Clone`; the struct only holds a `&str` and `PhantomData`.
impl<'a, K: Key + 'static, V: Key + 'static> Clone for MultimapTableDefinition<'a, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}
200
// Copy is sound: the struct holds only a shared reference and PhantomData.
impl<'a, K: Key + 'static, V: Key + 'static> Copy for MultimapTableDefinition<'a, K, V> {}
202
203impl<'a, K: Key + 'static, V: Key + 'static> Display for MultimapTableDefinition<'a, K, V> {
204    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
205        write!(
206            f,
207            "{}<{}, {}>",
208            self.name,
209            K::type_name().name(),
210            V::type_name().name()
211        )
212    }
213}
214
/// Guard that registers a live transaction with the `TransactionTracker`
/// and releases it when dropped.
pub(crate) struct TransactionGuard {
    // `None` only for guards created by `fake()`, which track nothing
    transaction_tracker: Option<Arc<TransactionTracker>>,
    // Taken (set to `None`) by `leak()`, which transfers cleanup to the caller
    transaction_id: Option<TransactionId>,
    // Selects which tracker method runs on drop: end-write vs. deallocate-read
    write_transaction: bool,
}
220
impl TransactionGuard {
    /// Creates a guard for a read transaction; on drop, the read transaction
    /// is deallocated from `tracker`.
    pub(crate) fn new_read(
        transaction_id: TransactionId,
        tracker: Arc<TransactionTracker>,
    ) -> Self {
        Self {
            transaction_tracker: Some(tracker),
            transaction_id: Some(transaction_id),
            write_transaction: false,
        }
    }

    /// Creates a guard for a write transaction; on drop, the write transaction
    /// is ended in `tracker`.
    pub(crate) fn new_write(
        transaction_id: TransactionId,
        tracker: Arc<TransactionTracker>,
    ) -> Self {
        Self {
            transaction_tracker: Some(tracker),
            transaction_id: Some(transaction_id),
            write_transaction: true,
        }
    }

    // TODO: remove this hack
    /// Creates a guard that tracks nothing; used by the verification/repair
    /// paths in this file, which need a guard but have no real transaction.
    pub(crate) fn fake() -> Self {
        Self {
            transaction_tracker: None,
            transaction_id: None,
            write_transaction: false,
        }
    }

    /// Returns the guarded transaction's id.
    ///
    /// # Panics
    ///
    /// Panics if this guard was created via `fake()` or was already leaked.
    pub(crate) fn id(&self) -> TransactionId {
        self.transaction_id.unwrap()
    }

    /// Takes the transaction id out of the guard so that drop performs no
    /// cleanup; the caller becomes responsible for ending the transaction.
    ///
    /// # Panics
    ///
    /// Panics if this guard was created via `fake()` or was already leaked.
    pub(crate) fn leak(mut self) -> TransactionId {
        self.transaction_id.take().unwrap()
    }
}
261
262impl Drop for TransactionGuard {
263    fn drop(&mut self) {
264        if self.transaction_tracker.is_none() {
265            return;
266        }
267        if let Some(transaction_id) = self.transaction_id {
268            if self.write_transaction {
269                self.transaction_tracker
270                    .as_ref()
271                    .unwrap()
272                    .end_write_transaction(transaction_id);
273            } else {
274                self.transaction_tracker
275                    .as_ref()
276                    .unwrap()
277                    .deallocate_read_transaction(transaction_id);
278            }
279        }
280    }
281}
282
/// Opened redb database file
///
/// Use [`Self::begin_read`] to get a [`ReadTransaction`] object that can be used to read from the database
/// Use [`Self::begin_write`] to get a [`WriteTransaction`] object that can be used to read or write to the database
///
/// Multiple reads may be performed concurrently, with each other, and with writes. Only a single write
/// may be in progress at a time.
///
/// # Examples
///
/// Basic usage:
///
/// ```rust
/// use redb::*;
/// # use tempfile::NamedTempFile;
/// const TABLE: TableDefinition<u64, u64> = TableDefinition::new("my_data");
///
/// # fn main() -> Result<(), Error> {
/// # let tmpfile: NamedTempFile = NamedTempFile::new().unwrap();
/// # let filename = tmpfile.path();
/// let db = Database::create(filename)?;
/// let write_txn = db.begin_write()?;
/// {
///     let mut table = write_txn.open_table(TABLE)?;
///     table.insert(&0, &0)?;
/// }
/// write_txn.commit()?;
/// # Ok(())
/// # }
/// ```
pub struct Database {
    // Storage layer: page management, caching, and commit machinery
    mem: Arc<TransactionalMemory>,
    // Tracks live read/write transactions and savepoints
    transaction_tracker: Arc<TransactionTracker>,
}
317
impl Database {
    /// Opens the specified file as a redb database.
    /// * if the file does not exist, or is an empty file, a new database will be initialized in it
    /// * if the file is a valid redb database, it will be opened
    /// * otherwise this function will return an error
    pub fn create(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        Self::builder().create(path)
    }

    /// Opens an existing redb database.
    pub fn open(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        Self::builder().open(path)
    }

    /// Returns a handle to the underlying storage layer.
    pub(crate) fn get_memory(&self) -> Arc<TransactionalMemory> {
        self.mem.clone()
    }

    /// Verifies the checksums of the data tree, the system tree, and (if present)
    /// the freed tree. Returns `Ok(false)` on the first checksum mismatch.
    pub(crate) fn verify_primary_checksums(mem: Arc<TransactionalMemory>) -> Result<bool> {
        let fake_freed_pages = Arc::new(Mutex::new(vec![]));
        let table_tree = TableTreeMut::new(
            mem.get_data_root(),
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
            fake_freed_pages.clone(),
        );
        if !table_tree.verify_checksums()? {
            return Ok(false);
        }
        let system_table_tree = TableTreeMut::new(
            mem.get_system_root(),
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
            fake_freed_pages.clone(),
        );
        if !system_table_tree.verify_checksums()? {
            return Ok(false);
        }
        // Verification is read-only, so nothing should have been queued for freeing
        assert!(fake_freed_pages.lock().unwrap().is_empty());

        if let Some(header) = mem.get_freed_root() {
            if !RawBtree::new(
                Some(header),
                FreedTableKey::fixed_width(),
                FreedPageList::fixed_width(),
                mem.clone(),
            )
            .verify_checksum()?
            {
                return Ok(false);
            }
        }

        Ok(true)
    }

    /// Force a check of the integrity of the database file, and repair it if possible.
    ///
    /// Note: Calling this function is unnecessary during normal operation. redb will automatically
    /// detect and recover from crashes, power loss, and other unclean shutdowns. This function is
    /// quite slow and should only be used when you suspect the database file may have been modified
    /// externally to redb, or that a redb bug may have left the database in a corrupted state.
    ///
    /// Returns `Ok(true)` if the database passed integrity checks; `Ok(false)` if it failed but was repaired,
    /// and `Err(Corrupted)` if the check failed and the file could not be repaired
    pub fn check_integrity(&mut self) -> Result<bool, DatabaseError> {
        // Snapshot the allocator hash so changes made by repair can be detected below
        let allocator_hash = self.mem.allocator_hash();
        let mut was_clean = Arc::get_mut(&mut self.mem)
            .unwrap()
            .clear_cache_and_reload()?;

        let old_roots = [
            self.mem.get_data_root(),
            self.mem.get_system_root(),
            self.mem.get_freed_root(),
        ];

        // do_repair() can only surface storage errors here; other variants are impossible
        let new_roots = Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
            DatabaseError::Storage(storage_err) => storage_err,
            _ => unreachable!(),
        })?;

        // Any change to the roots or the allocator state means the file was not clean
        if old_roots != new_roots || allocator_hash != self.mem.allocator_hash() {
            was_clean = false;
        }

        if !was_clean {
            // Persist the repaired roots with a fresh commit
            let next_transaction_id = self.mem.get_last_committed_transaction_id()?.next();
            let [data_root, system_root, freed_root] = new_roots;
            self.mem.commit(
                data_root,
                system_root,
                freed_root,
                next_transaction_id,
                false,
                true,
            )?;
        }

        self.mem.begin_writable()?;

        Ok(was_clean)
    }

    /// Compacts the database file
    ///
    /// Returns `true` if compaction was performed, and `false` if no further compaction was possible
    pub fn compact(&mut self) -> Result<bool, CompactionError> {
        if self
            .transaction_tracker
            .oldest_live_read_transaction()
            .is_some()
        {
            return Err(CompactionError::TransactionInProgress);
        }
        // Commit to free up any pending free pages
        // Use 2-phase commit to avoid any possible security issues. Plus this compaction is going to be so slow that it doesn't matter.
        // Once https://github.com/cberner/redb/issues/829 is fixed, we should upgrade this to use quick-repair -- that way the user
        // can cancel the compaction without requiring a full repair afterwards
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        if txn.list_persistent_savepoints()?.next().is_some() {
            return Err(CompactionError::PersistentSavepointExists);
        }
        if self.transaction_tracker.any_savepoint_exists() {
            return Err(CompactionError::EphemeralSavepointExists);
        }
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;
        // Repeat, just in case executing list_persistent_savepoints() created a new table
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;
        // There can't be any outstanding transactions because we have a `&mut self`, so all pending free pages
        // should have been cleared out by the above commit()
        assert!(self.mem.get_freed_root().is_none());

        let mut compacted = false;
        // Iteratively compact until no progress is made
        loop {
            let mut progress = false;

            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            if txn.compact_pages()? {
                progress = true;
                txn.commit().map_err(|e| e.into_storage_error())?;
            } else {
                txn.abort()?;
            }

            // Double commit to free up the relocated pages for reuse
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            txn.set_two_phase_commit(true);
            txn.commit().map_err(|e| e.into_storage_error())?;
            assert!(self.mem.get_freed_root().is_none());

            if !progress {
                break;
            }

            compacted = true;
        }

        Ok(compacted)
    }

    /// Debug-build check used after repair: asserts that every page reachable from
    /// a persistent savepoint's user root is marked allocated.
    fn check_repaired_persistent_savepoints(
        system_root: Option<BtreeHeader>,
        mem: Arc<TransactionalMemory>,
    ) -> Result {
        let freed_list = Arc::new(Mutex::new(vec![]));
        let table_tree = TableTreeMut::new(
            system_root,
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
            freed_list,
        );
        // Savepoints deserialized here are only inspected, never committed, so a
        // throwaway tracker is sufficient
        let fake_transaction_tracker = Arc::new(TransactionTracker::new(TransactionId::new(0)));
        if let Some(savepoint_table_def) = table_tree
            .get_table::<SavepointId, SerializedSavepoint>(
                SAVEPOINT_TABLE.name(),
                TableType::Normal,
            )
            .map_err(|e| {
                e.into_storage_error_or_corrupted("Persistent savepoint table corrupted")
            })?
        {
            let savepoint_table_root =
                if let InternalTableDefinition::Normal { table_root, .. } = savepoint_table_def {
                    table_root
                } else {
                    unreachable!()
                };
            let savepoint_table: ReadOnlyTable<SavepointId, SerializedSavepoint> =
                ReadOnlyTable::new(
                    "internal savepoint table".to_string(),
                    savepoint_table_root,
                    PageHint::None,
                    Arc::new(TransactionGuard::fake()),
                    mem.clone(),
                )?;
            for result in savepoint_table.range::<SavepointId>(..)? {
                let (_, savepoint_data) = result?;
                let savepoint = savepoint_data
                    .value()
                    .to_savepoint(fake_transaction_tracker.clone());
                if let Some(header) = savepoint.get_user_root() {
                    Self::check_pages_allocated_recursive(header.root, mem.clone())?;
                }
            }
        }

        Ok(())
    }

    /// Marks as allocated both the pages of the freed tree itself and every page
    /// recorded as pending-free inside it.
    fn mark_freed_tree(freed_root: Option<BtreeHeader>, mem: Arc<TransactionalMemory>) -> Result {
        if let Some(header) = freed_root {
            // Pages that make up the freed tree structure
            let freed_pages_iter = AllPageNumbersBtreeIter::new(
                header.root,
                FreedTableKey::fixed_width(),
                FreedPageList::fixed_width(),
                mem.clone(),
            )?;
            for page in freed_pages_iter {
                mem.mark_page_allocated(page?);
            }
        }

        // Pages listed as pending-free in the tree's values
        let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
            "internal freed table".to_string(),
            freed_root,
            PageHint::None,
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
        )?;
        for result in freed_table.range::<FreedTableKey>(..)? {
            let (_, freed_page_list) = result?;
            for i in 0..freed_page_list.value().len() {
                mem.mark_page_allocated(freed_page_list.value().get(i));
            }
        }

        Ok(())
    }

    /// Asserts that every page reachable from `root` (the master table and all
    /// tables it references) is marked allocated. Read-only counterpart of
    /// `mark_tables_recursive`.
    fn check_pages_allocated_recursive(root: PageNumber, mem: Arc<TransactionalMemory>) -> Result {
        // Repair the allocator state
        // All pages in the master table
        let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, mem.clone())?;
        for result in master_pages_iter {
            let page = result?;
            assert!(mem.is_allocated(page));
        }

        // Iterate over all other tables
        let iter: BtreeRangeIter<&str, InternalTableDefinition> =
            BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem.clone())?;

        // Chain all the other tables to the master table iter
        for entry in iter {
            let definition = entry?.value();
            definition.visit_all_pages(mem.clone(), |path| {
                assert!(mem.is_allocated(path.page_number()));
                Ok(())
            })?;
        }

        Ok(())
    }

    /// Marks every page reachable from `root` (the master table and all tables it
    /// references) as allocated, rebuilding allocator state during repair.
    fn mark_tables_recursive(root: PageNumber, mem: Arc<TransactionalMemory>) -> Result {
        // Repair the allocator state
        // All pages in the master table
        let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, mem.clone())?;
        for page in master_pages_iter {
            mem.mark_page_allocated(page?);
        }

        // Iterate over all other tables
        let iter: BtreeRangeIter<&str, InternalTableDefinition> =
            BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem.clone())?;

        // Chain all the other tables to the master table iter
        for entry in iter {
            let definition = entry?.value();
            definition.visit_all_pages(mem.clone(), |path| {
                mem.mark_page_allocated(path.page_number());
                Ok(())
            })?;
        }

        Ok(())
    }

    /// Runs a full repair: verifies (and if needed repairs) the primary, then
    /// rebuilds the allocator state by walking every reachable tree.
    ///
    /// Returns the `[data_root, system_root, freed_root]` headers for the caller
    /// to commit. `repair_callback` is invoked at each progress milestone and may
    /// abort the repair.
    fn do_repair(
        mem: &mut Arc<TransactionalMemory>, // Only &mut to ensure exclusivity
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<[Option<BtreeHeader>; 3], DatabaseError> {
        if !Self::verify_primary_checksums(mem.clone())? {
            if mem.used_two_phase_commit() {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Primary is corrupted despite 2-phase commit".to_string(),
                )));
            }

            // 0.3 because the repair takes 3 full scans and the first is done now
            let mut handle = RepairSession::new(0.3);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }

            mem.repair_primary_corrupted();
            // We need to invalidate the userspace cache, because walking the tree in verify_primary_checksums() may
            // have poisoned it with pages that just got rolled back by repair_primary_corrupted(), since
            // that rolls back a partially committed transaction.
            mem.clear_read_cache();
            if !Self::verify_primary_checksums(mem.clone())? {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Failed to repair database. All roots are corrupted".to_string(),
                )));
            }
        }
        // 0.6 because the repair takes 3 full scans and the second is done now
        let mut handle = RepairSession::new(0.6);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        mem.begin_repair()?;

        let data_root = mem.get_data_root();
        if let Some(header) = data_root {
            Self::mark_tables_recursive(header.root, mem.clone())?;
        }

        let freed_root = mem.get_freed_root();
        // Allow processing of all transactions, since this is the main freed tree
        Self::mark_freed_tree(freed_root, mem.clone())?;
        // NOTE(review): this table is constructed and immediately dropped, and looks
        // redundant with mark_freed_tree() above -- confirm whether it can be removed
        let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
            "internal freed table".to_string(),
            freed_root,
            PageHint::None,
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
        )?;
        drop(freed_table);

        // 0.9 because the repair takes 3 full scans and the third is done now. There is just some system tables left
        let mut handle = RepairSession::new(0.9);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        let system_root = mem.get_system_root();
        if let Some(header) = system_root {
            Self::mark_tables_recursive(header.root, mem.clone())?;
        }
        #[cfg(debug_assertions)]
        {
            // Savepoints only reference pages from a previous commit, so those are either referenced
            // in the current root, or are in the freed tree
            Self::check_repaired_persistent_savepoints(system_root, mem.clone())?;
        }

        mem.end_repair()?;

        // We need to invalidate the userspace cache, because we're about to implicitly free the freed table
        // by storing an empty root during the below commit()
        mem.clear_read_cache();

        Ok([data_root, system_root, freed_root])
    }

    /// Opens a database on the given storage backend, running recovery/repair when
    /// the previous shutdown was not clean, and restores the tracker state for any
    /// persistent savepoints.
    fn new(
        file: Box<dyn StorageBackend>,
        page_size: usize,
        region_size: Option<u64>,
        read_cache_size_bytes: usize,
        write_cache_size_bytes: usize,
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<Self, DatabaseError> {
        #[cfg(feature = "logging")]
        let file_path = format!("{:?}", &file);
        #[cfg(feature = "logging")]
        info!("Opening database {:?}", &file_path);
        let mem = TransactionalMemory::new(
            file,
            page_size,
            region_size,
            read_cache_size_bytes,
            write_cache_size_bytes,
        )?;
        let mut mem = Arc::new(mem);
        if mem.needs_repair()? {
            // If the last transaction used 2-phase commit and updated the allocator state table, then
            // we can just load the allocator state from there. Otherwise, we need a full repair
            if let Some(tree) = Self::get_allocator_state_table(&mem)? {
                #[cfg(feature = "logging")]
                info!("Found valid allocator state, full repair not needed");
                mem.load_allocator_state(&tree)?;
            } else {
                #[cfg(feature = "logging")]
                warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
                let mut handle = RepairSession::new(0.0);
                repair_callback(&mut handle);
                if handle.aborted() {
                    return Err(DatabaseError::RepairAborted);
                }
                let [data_root, system_root, freed_root] =
                    Self::do_repair(&mut mem, repair_callback)?;
                // Commit the repaired roots so the repair becomes durable
                let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
                mem.commit(
                    data_root,
                    system_root,
                    freed_root,
                    next_transaction_id,
                    false,
                    true,
                )?;
            }
        }

        mem.begin_writable()?;
        let next_transaction_id = mem.get_last_committed_transaction_id()?.next();

        let db = Database {
            mem,
            transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
        };

        // Restore the tracker state for any persistent savepoints
        let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
        if let Some(next_id) = txn.next_persistent_savepoint_id()? {
            db.transaction_tracker
                .restore_savepoint_counter_state(next_id);
        }
        for id in txn.list_persistent_savepoints()? {
            let savepoint = match txn.get_persistent_savepoint(id) {
                Ok(savepoint) => savepoint,
                Err(err) => match err {
                    SavepointError::InvalidSavepoint => unreachable!(),
                    SavepointError::Storage(storage) => {
                        return Err(storage.into());
                    }
                },
            };
            db.transaction_tracker
                .register_persistent_savepoint(&savepoint);
        }
        txn.abort()?;

        Ok(db)
    }

    /// Returns the allocator state tree, if one exists and is valid for the current
    /// state of the database; `Ok(None)` means a full repair is needed instead.
    fn get_allocator_state_table(
        mem: &Arc<TransactionalMemory>,
    ) -> Result<Option<AllocatorStateTree>> {
        // The allocator state table is only valid if the primary was written using 2-phase commit
        if !mem.used_two_phase_commit() {
            return Ok(None);
        }

        // See if it's present in the system table tree
        let fake_freed_pages = Arc::new(Mutex::new(vec![]));
        let system_table_tree = TableTreeMut::new(
            mem.get_system_root(),
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
            fake_freed_pages.clone(),
        );
        let Some(allocator_state_table) = system_table_tree
            .get_table::<AllocatorStateKey, &[u8]>(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
            .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?
        else {
            return Ok(None);
        };

        // Load the allocator state table
        let InternalTableDefinition::Normal { table_root, .. } = allocator_state_table else {
            unreachable!();
        };
        let tree = AllocatorStateTree::new(
            table_root,
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
            fake_freed_pages,
        );

        // Make sure this isn't stale allocator state left over from a previous transaction
        if !mem.is_valid_allocator_state(&tree)? {
            return Ok(None);
        }

        Ok(Some(tree))
    }

    /// Registers a new read transaction with the tracker and wraps its id in a
    /// guard that deallocates it on drop.
    fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
        let id = self
            .transaction_tracker
            .register_read_transaction(&self.mem)?;

        Ok(TransactionGuard::new_read(
            id,
            self.transaction_tracker.clone(),
        ))
    }

    /// Convenience method for [`Builder::new`]
    pub fn builder() -> Builder {
        Builder::new()
    }

    /// Begins a write transaction
    ///
    /// Returns a [`WriteTransaction`] which may be used to read/write to the database. Only a single
    /// write may be in progress at a time. If a write is in progress, this function will block
    /// until it completes.
    pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
        // Fail early if there has been an I/O error -- nothing can be committed in that case
        self.mem.check_io_errors()?;
        let guard = TransactionGuard::new_write(
            self.transaction_tracker.start_write_transaction(),
            self.transaction_tracker.clone(),
        );
        WriteTransaction::new(guard, self.transaction_tracker.clone(), self.mem.clone())
            .map_err(|e| e.into())
    }

    /// Begins a read transaction
    ///
    /// Captures a snapshot of the database, so that only data committed before calling this method
    /// is visible in the transaction
    ///
    /// Returns a [`ReadTransaction`] which may be used to read from the database. Read transactions
    /// may exist concurrently with writes
    pub fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
        let guard = self.allocate_read_transaction()?;
        #[cfg(feature = "logging")]
        debug!("Beginning read transaction id={:?}", guard.id());
        ReadTransaction::new(self.get_memory(), guard)
    }

    /// Ensures a valid allocator state table exists so the next open can skip a
    /// full repair; called from `Drop` for a clean shutdown.
    fn ensure_allocator_state_table(&self) -> Result<(), Error> {
        // If the allocator state table is already up to date, we're done
        if Self::get_allocator_state_table(&self.mem)?.is_some() {
            return Ok(());
        }

        // Make a new quick-repair commit to update the allocator state table
        #[cfg(feature = "logging")]
        debug!("Writing allocator state table");
        let mut tx = self.begin_write()?;
        tx.set_quick_repair(true);
        tx.commit()?;

        Ok(())
    }
}
878
879impl Drop for Database {
880    fn drop(&mut self) {
881        if thread::panicking() {
882            return;
883        }
884
885        if self.ensure_allocator_state_table().is_err() {
886            #[cfg(feature = "logging")]
887            warn!("Failed to write allocator state table. Repair may be required at restart.")
888        }
889    }
890}
891
/// Handle passed to the repair callback registered via [`Builder::set_repair_callback`],
/// allowing the callback to observe progress and to abort the repair.
pub struct RepairSession {
    // Estimated fraction of the repair that is complete, in [0.0, 1.0)
    progress: f64,
    // Set to true once the callback has requested that the repair be aborted
    aborted: bool,
}
896
897impl RepairSession {
898    pub(crate) fn new(progress: f64) -> Self {
899        Self {
900            progress,
901            aborted: false,
902        }
903    }
904
905    pub(crate) fn aborted(&self) -> bool {
906        self.aborted
907    }
908
909    /// Abort the repair process. The coorresponding call to [`Builder::open`] or [`Builder::create`] will return an error
910    pub fn abort(&mut self) {
911        self.aborted = true;
912    }
913
914    /// Returns an estimate of the repair progress in the range [0.0, 1.0). At 1.0 the repair is complete.
915    pub fn progress(&self) -> f64 {
916        self.progress
917    }
918}
919
/// Configuration builder of a redb [Database].
pub struct Builder {
    // Page size in bytes; fixed to PAGE_SIZE except under test/fuzzing builds
    page_size: usize,
    // Region size override in bytes; None selects the default
    region_size: Option<u64>,
    // Bytes of cache devoted to reads
    read_cache_size_bytes: usize,
    // Bytes of cache devoted to writes
    write_cache_size_bytes: usize,
    // Invoked periodically while a damaged database file is being repaired
    repair_callback: Box<dyn Fn(&mut RepairSession)>,
}
928
929impl Builder {
930    /// Construct a new [Builder] with sensible defaults.
931    ///
932    /// ## Defaults
933    ///
934    /// - `cache_size_bytes`: 1GiB
935    #[allow(clippy::new_without_default)]
936    pub fn new() -> Self {
937        let mut result = Self {
938            // Default to 4k pages. Benchmarking showed that this was a good default on all platforms,
939            // including MacOS with 16k pages. Therefore, users are not allowed to configure it at the moment.
940            // It is part of the file format, so can be enabled in the future.
941            page_size: PAGE_SIZE,
942            region_size: None,
943            // TODO: Default should probably take into account the total system memory
944            read_cache_size_bytes: 0,
945            // TODO: Default should probably take into account the total system memory
946            write_cache_size_bytes: 0,
947            repair_callback: Box::new(|_| {}),
948        };
949
950        result.set_cache_size(1024 * 1024 * 1024);
951        result
952    }
953
954    /// Set a callback which will be invoked periodically in the event that the database file needs
955    /// to be repaired.
956    ///
957    /// The [`RepairSession`] argument can be used to control the repair process.
958    ///
959    /// If the database file needs repair, the callback will be invoked at least once.
960    /// There is no upper limit on the number of times it may be called.
961    pub fn set_repair_callback(
962        &mut self,
963        callback: impl Fn(&mut RepairSession) + 'static,
964    ) -> &mut Self {
965        self.repair_callback = Box::new(callback);
966        self
967    }
968
969    /// Set the internal page size of the database
970    ///
971    /// Valid values are powers of two, greater than or equal to 512
972    ///
973    /// ## Defaults
974    ///
975    /// Default to 4 Kib pages.
976    #[cfg(any(fuzzing, test))]
977    pub fn set_page_size(&mut self, size: usize) -> &mut Self {
978        assert!(size.is_power_of_two());
979        self.page_size = std::cmp::max(size, 512);
980        self
981    }
982
983    /// Set the amount of memory (in bytes) used for caching data
984    pub fn set_cache_size(&mut self, bytes: usize) -> &mut Self {
985        // TODO: allow dynamic expansion of the read/write cache
986        self.read_cache_size_bytes = bytes / 10 * 9;
987        self.write_cache_size_bytes = bytes / 10;
988        self
989    }
990
991    #[cfg(any(test, fuzzing))]
992    pub fn set_region_size(&mut self, size: u64) -> &mut Self {
993        assert!(size.is_power_of_two());
994        self.region_size = Some(size);
995        self
996    }
997
998    /// Opens the specified file as a redb database.
999    /// * if the file does not exist, or is an empty file, a new database will be initialized in it
1000    /// * if the file is a valid redb database, it will be opened
1001    /// * otherwise this function will return an error
1002    pub fn create(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
1003        let file = OpenOptions::new()
1004            .read(true)
1005            .write(true)
1006            .create(true)
1007            .truncate(false)
1008            .open(path)?;
1009
1010        Database::new(
1011            Box::new(FileBackend::new(file)?),
1012            self.page_size,
1013            self.region_size,
1014            self.read_cache_size_bytes,
1015            self.write_cache_size_bytes,
1016            &self.repair_callback,
1017        )
1018    }
1019
1020    /// Opens an existing redb database.
1021    pub fn open(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
1022        let file = OpenOptions::new().read(true).write(true).open(path)?;
1023
1024        if file.metadata()?.len() == 0 {
1025            return Err(StorageError::Io(ErrorKind::InvalidData.into()).into());
1026        }
1027
1028        Database::new(
1029            Box::new(FileBackend::new(file)?),
1030            self.page_size,
1031            None,
1032            self.read_cache_size_bytes,
1033            self.write_cache_size_bytes,
1034            &self.repair_callback,
1035        )
1036    }
1037
1038    /// Open an existing or create a new database in the given `file`.
1039    ///
1040    /// The file must be empty or contain a valid database.
1041    pub fn create_file(&self, file: File) -> Result<Database, DatabaseError> {
1042        Database::new(
1043            Box::new(FileBackend::new(file)?),
1044            self.page_size,
1045            self.region_size,
1046            self.read_cache_size_bytes,
1047            self.write_cache_size_bytes,
1048            &self.repair_callback,
1049        )
1050    }
1051
1052    /// Open an existing or create a new database with the given backend.
1053    pub fn create_with_backend(
1054        &self,
1055        backend: impl StorageBackend,
1056    ) -> Result<Database, DatabaseError> {
1057        Database::new(
1058            Box::new(backend),
1059            self.page_size,
1060            self.region_size,
1061            self.read_cache_size_bytes,
1062            self.write_cache_size_bytes,
1063            &self.repair_callback,
1064        )
1065    }
1066}
1067
1068impl std::fmt::Debug for Database {
1069    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1070        f.debug_struct("Database").finish()
1071    }
1072}
1073
1074#[cfg(test)]
1075mod test {
1076    use crate::backends::FileBackend;
1077    use crate::{
1078        CommitError, Database, DatabaseError, Durability, ReadableTable, StorageBackend,
1079        StorageError, TableDefinition, TransactionError,
1080    };
1081    use std::fs::File;
1082    use std::io::{ErrorKind, Read, Seek, SeekFrom};
1083    use std::sync::atomic::{AtomicU64, Ordering};
1084    use std::sync::Arc;
1085
    /// Test backend that delegates to a [`FileBackend`] but injects I/O failures
    /// once `countdown` reaches zero.
    #[derive(Debug)]
    struct FailingBackend {
        inner: FileBackend,
        // Remaining successful writes before failures begin; Arc so tests can adjust it
        countdown: Arc<AtomicU64>,
    }
1091
1092    impl FailingBackend {
1093        fn new(backend: FileBackend, countdown: u64) -> Self {
1094            Self {
1095                inner: backend,
1096                countdown: Arc::new(AtomicU64::new(countdown)),
1097            }
1098        }
1099
1100        fn check_countdown(&self) -> Result<(), std::io::Error> {
1101            if self.countdown.load(Ordering::SeqCst) == 0 {
1102                return Err(std::io::Error::from(ErrorKind::Other));
1103            }
1104
1105            Ok(())
1106        }
1107
1108        fn decrement_countdown(&self) -> Result<(), std::io::Error> {
1109            if self
1110                .countdown
1111                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
1112                    if x > 0 {
1113                        Some(x - 1)
1114                    } else {
1115                        None
1116                    }
1117                })
1118                .is_err()
1119            {
1120                return Err(std::io::Error::from(ErrorKind::Other));
1121            }
1122
1123            Ok(())
1124        }
1125    }
1126
    impl StorageBackend for FailingBackend {
        fn len(&self) -> Result<u64, std::io::Error> {
            // Length queries are never subject to fault injection
            self.inner.len()
        }

        fn read(&self, offset: u64, len: usize) -> Result<Vec<u8>, std::io::Error> {
            // Fails once the countdown is exhausted, without consuming it
            self.check_countdown()?;
            self.inner.read(offset, len)
        }

        fn set_len(&self, len: u64) -> Result<(), std::io::Error> {
            // Resizing is never subject to fault injection
            self.inner.set_len(len)
        }

        fn sync_data(&self, eventual: bool) -> Result<(), std::io::Error> {
            // Fails once the countdown is exhausted, without consuming it
            self.check_countdown()?;
            self.inner.sync_data(eventual)
        }

        fn write(&self, offset: u64, data: &[u8]) -> Result<(), std::io::Error> {
            // Each write consumes one unit of the countdown and fails at zero
            self.decrement_countdown()?;
            self.inner.write(offset, data)
        }
    }
1151
    /// Regression test (fuzzer-derived): a commit that fails partway through its
    /// writes must leave the file in a state that can still be reopened.
    #[test]
    fn crash_regression4() {
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();

        // Allow exactly 23 writes to succeed, then inject I/O errors
        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), 23);
        let db = Database::builder()
            .set_cache_size(12686)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        // Commit some savepoint state while writes still succeed
        let tx = db.begin_write().unwrap();
        let _savepoint = tx.ephemeral_savepoint().unwrap();
        let _persistent_savepoint = tx.persistent_savepoint().unwrap();
        tx.commit().unwrap();
        // This commit exhausts the write countdown and must fail
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            let _ = table.insert_reserve(118821, 360).unwrap();
        }
        let result = tx.commit();
        assert!(result.is_err());

        // Reopening the file must still succeed after the failed commit
        drop(db);
        Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create(&path)
            .unwrap();
    }
1187
    /// Verifies that an I/O error during commit poisons the database handle, and
    /// that the on-disk recovery flag stays set even if the error later clears.
    #[test]
    fn transient_io_error() {
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();

        // u64::MAX countdown: no failures are injected until the test says so
        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), u64::MAX);
        let countdown = backend.countdown.clone();
        let db = Database::builder()
            .set_cache_size(0)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, u64> = TableDefinition::new("x");

        // Create some garbage
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            table.insert(0, 0).unwrap();
        }
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            table.insert(0, 1).unwrap();
        }
        tx.commit().unwrap();

        let tx = db.begin_write().unwrap();
        // Cause an error in the commit
        countdown.store(0, Ordering::SeqCst);
        let result = tx.commit().err().unwrap();
        assert!(matches!(result, CommitError::Storage(StorageError::Io(_))));
        // Later writes must be refused because of the earlier failure
        let result = db.begin_write().err().unwrap();
        assert!(matches!(
            result,
            TransactionError::Storage(StorageError::PreviousIo)
        ));
        // Simulate a transient error
        countdown.store(u64::MAX, Ordering::SeqCst);
        drop(db);

        // Check that recovery flag is set, even though the error has "cleared"
        // NOTE(review): offset 9 / mask 0x2 presumably locate the recovery bit in the
        // header's god byte -- confirm against the file-format documentation
        let mut file = File::open(&path).unwrap();
        file.seek(SeekFrom::Start(9)).unwrap();
        let mut god_byte = vec![0u8];
        assert_eq!(file.read(&mut god_byte).unwrap(), 1);
        assert_ne!(god_byte[0] & 2, 0);
    }
1237
1238    #[test]
1239    fn small_pages() {
1240        let tmpfile = crate::create_tempfile();
1241
1242        let db = Database::builder()
1243            .set_page_size(512)
1244            .create(tmpfile.path())
1245            .unwrap();
1246
1247        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1248        let txn = db.begin_write().unwrap();
1249        {
1250            txn.open_table(table_definition).unwrap();
1251        }
1252        txn.commit().unwrap();
1253    }
1254
    /// Fuzzer-derived regression: a long sequence of savepoint create / restore /
    /// drop operations with 512-byte pages and two-phase commit. The exact order
    /// of operations matters; do not simplify.
    #[test]
    fn small_pages2() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(512)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint0 = tx.ephemeral_savepoint().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint0).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&660503, 489).unwrap().as_mut().fill(0xFF);
            assert!(t.remove(&291295).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint0).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint2 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint0);
        tx.restore_savepoint(&savepoint2).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&2059).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint3 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint1);
        tx.restore_savepoint(&savepoint3).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // This transaction is aborted rather than committed
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint4 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint2);
        tx.restore_savepoint(&savepoint3).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.remove(&207936).unwrap().is_none());
        }
        tx.abort().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint5 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint3);
        // savepoint4 must no longer be restorable at this point
        assert!(tx.restore_savepoint(&savepoint4).is_err());
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint5).unwrap();
        tx.set_durability(Durability::None);
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();
    }
1348
    /// Fuzzer-derived regression: aborted transactions holding savepoints with
    /// 1024-byte pages. Both transactions abort; nothing is durably committed.
    #[test]
    fn small_pages3() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        let _savepoint0 = tx.ephemeral_savepoint().unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 306];
            t.insert(&539717, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();

        let mut tx = db.begin_write().unwrap();
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        // Restoring a savepoint taken within the same transaction
        tx.restore_savepoint(&savepoint1).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 2008];
            t.insert(&784384, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();
    }
1381
    /// Fuzzer-derived regression: aborted inserts and reserves with a small cache
    /// and 1024-byte pages.
    #[test]
    fn small_pages4() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let tx = db.begin_write().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // Values larger than a single 1024-byte page; transaction is aborted
        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&131072).unwrap().is_none());
            let value = vec![0xFF; 1130];
            t.insert(&42394, value.as_slice()).unwrap();
            t.insert_reserve(&744037, 3645).unwrap().as_mut().fill(0xFF);
            assert!(t.get(&0).unwrap().is_none());
        }
        tx.abort().unwrap();

        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&118749, 734).unwrap().as_mut().fill(0xFF);
        }
        tx.abort().unwrap();
    }
1418
1419    #[test]
1420    fn dynamic_shrink() {
1421        let tmpfile = crate::create_tempfile();
1422        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1423        let big_value = vec![0u8; 1024];
1424
1425        let db = Database::builder()
1426            .set_region_size(1024 * 1024)
1427            .create(tmpfile.path())
1428            .unwrap();
1429
1430        let txn = db.begin_write().unwrap();
1431        {
1432            let mut table = txn.open_table(table_definition).unwrap();
1433            for i in 0..2048 {
1434                table.insert(&i, big_value.as_slice()).unwrap();
1435            }
1436        }
1437        txn.commit().unwrap();
1438
1439        let file_size = tmpfile.as_file().metadata().unwrap().len();
1440
1441        let txn = db.begin_write().unwrap();
1442        {
1443            let mut table = txn.open_table(table_definition).unwrap();
1444            for i in 0..2048 {
1445                table.remove(&i).unwrap();
1446            }
1447        }
1448        txn.commit().unwrap();
1449
1450        // Perform a couple more commits to be sure the database has a chance to compact
1451        let txn = db.begin_write().unwrap();
1452        {
1453            let mut table = txn.open_table(table_definition).unwrap();
1454            table.insert(0, [].as_slice()).unwrap();
1455        }
1456        txn.commit().unwrap();
1457        let txn = db.begin_write().unwrap();
1458        {
1459            let mut table = txn.open_table(table_definition).unwrap();
1460            table.remove(0).unwrap();
1461        }
1462        txn.commit().unwrap();
1463        let txn = db.begin_write().unwrap();
1464        txn.commit().unwrap();
1465
1466        let final_file_size = tmpfile.as_file().metadata().unwrap().len();
1467        assert!(final_file_size < file_size);
1468    }
1469
1470    #[test]
1471    fn create_new_db_in_empty_file() {
1472        let tmpfile = crate::create_tempfile();
1473
1474        let _db = Database::builder()
1475            .create_file(tmpfile.into_file())
1476            .unwrap();
1477    }
1478
1479    #[test]
1480    fn open_missing_file() {
1481        let tmpfile = crate::create_tempfile();
1482
1483        let err = Database::builder()
1484            .open(tmpfile.path().with_extension("missing"))
1485            .unwrap_err();
1486
1487        match err {
1488            DatabaseError::Storage(StorageError::Io(err)) if err.kind() == ErrorKind::NotFound => {}
1489            err => panic!("Unexpected error for empty file: {err}"),
1490        }
1491    }
1492
1493    #[test]
1494    fn open_empty_file() {
1495        let tmpfile = crate::create_tempfile();
1496
1497        let err = Database::builder().open(tmpfile.path()).unwrap_err();
1498
1499        match err {
1500            DatabaseError::Storage(StorageError::Io(err))
1501                if err.kind() == ErrorKind::InvalidData => {}
1502            err => panic!("Unexpected error for empty file: {err}"),
1503        }
1504    }
1505}