1use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
2use crate::tree_store::{
3 AllPageNumbersBtreeIter, BtreeHeader, BtreeRangeIter, FreedPageList, FreedTableKey,
4 InternalTableDefinition, PAGE_SIZE, PageHint, PageNumber, RawBtree, SerializedSavepoint,
5 TableTreeMut, TableType, TransactionalMemory,
6};
7use crate::types::{Key, Value};
8use crate::{CompactionError, DatabaseError, Error, ReadOnlyTable, SavepointError, StorageError};
9use crate::{ReadTransaction, Result, WriteTransaction};
10use std::fmt::{Debug, Display, Formatter};
11
12use std::fs::{File, OpenOptions};
13use std::marker::PhantomData;
14use std::ops::RangeFull;
15use std::path::Path;
16use std::sync::{Arc, Mutex};
17use std::{io, thread};
18
19use crate::error::TransactionError;
20use crate::sealed::Sealed;
21use crate::transactions::{
22 ALLOCATOR_STATE_TABLE_NAME, AllocatorStateKey, AllocatorStateTree, SAVEPOINT_TABLE,
23};
24use crate::tree_store::file_backend::FileBackend;
25#[cfg(feature = "logging")]
26use log::{debug, info, warn};
27
/// Pluggable byte-addressable storage that a [`Database`] reads from and writes to.
///
/// The backend is boxed (`Box<dyn StorageBackend>`) and handed to the storage layer, so it must
/// be `'static`, `Debug`, `Send` and `Sync`.
#[allow(clippy::len_without_is_empty)]
pub trait StorageBackend: 'static + Debug + Send + Sync {
    /// Returns the current length of the storage in bytes.
    fn len(&self) -> io::Result<u64>;

    /// Reads `len` bytes starting at byte `offset`.
    fn read(&self, offset: u64, len: usize) -> io::Result<Vec<u8>>;

    /// Resizes the storage to exactly `len` bytes.
    fn set_len(&self, len: u64) -> io::Result<()>;

    /// Flushes previously written data to durable storage.
    ///
    /// NOTE(review): `eventual` presumably permits a weaker, eventually-durable sync — confirm
    /// against the callers in the storage layer.
    fn sync_data(&self, eventual: bool) -> io::Result<()>;

    /// Writes `data` starting at byte `offset`.
    fn write(&self, offset: u64, data: &[u8]) -> io::Result<()>;
}
54
55pub trait TableHandle: Sealed {
56 fn name(&self) -> &str;
58}
59
60#[derive(Clone)]
61pub struct UntypedTableHandle {
62 name: String,
63}
64
65impl UntypedTableHandle {
66 pub(crate) fn new(name: String) -> Self {
67 Self { name }
68 }
69}
70
71impl TableHandle for UntypedTableHandle {
72 fn name(&self) -> &str {
73 &self.name
74 }
75}
76
77impl Sealed for UntypedTableHandle {}
78
79pub trait MultimapTableHandle: Sealed {
80 fn name(&self) -> &str;
82}
83
84#[derive(Clone)]
85pub struct UntypedMultimapTableHandle {
86 name: String,
87}
88
89impl UntypedMultimapTableHandle {
90 pub(crate) fn new(name: String) -> Self {
91 Self { name }
92 }
93}
94
95impl MultimapTableHandle for UntypedMultimapTableHandle {
96 fn name(&self) -> &str {
97 &self.name
98 }
99}
100
101impl Sealed for UntypedMultimapTableHandle {}
102
103pub struct TableDefinition<'a, K: Key + 'static, V: Value + 'static> {
110 name: &'a str,
111 _key_type: PhantomData<K>,
112 _value_type: PhantomData<V>,
113}
114
115impl<'a, K: Key + 'static, V: Value + 'static> TableDefinition<'a, K, V> {
116 pub const fn new(name: &'a str) -> Self {
122 assert!(!name.is_empty());
123 Self {
124 name,
125 _key_type: PhantomData,
126 _value_type: PhantomData,
127 }
128 }
129}
130
131impl<K: Key + 'static, V: Value + 'static> TableHandle for TableDefinition<'_, K, V> {
132 fn name(&self) -> &str {
133 self.name
134 }
135}
136
137impl<K: Key, V: Value> Sealed for TableDefinition<'_, K, V> {}
138
139impl<K: Key + 'static, V: Value + 'static> Clone for TableDefinition<'_, K, V> {
140 fn clone(&self) -> Self {
141 *self
142 }
143}
144
145impl<K: Key + 'static, V: Value + 'static> Copy for TableDefinition<'_, K, V> {}
146
147impl<K: Key + 'static, V: Value + 'static> Display for TableDefinition<'_, K, V> {
148 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
149 write!(
150 f,
151 "{}<{}, {}>",
152 self.name,
153 K::type_name().name(),
154 V::type_name().name()
155 )
156 }
157}
158
159pub struct MultimapTableDefinition<'a, K: Key + 'static, V: Key + 'static> {
168 name: &'a str,
169 _key_type: PhantomData<K>,
170 _value_type: PhantomData<V>,
171}
172
173impl<'a, K: Key + 'static, V: Key + 'static> MultimapTableDefinition<'a, K, V> {
174 pub const fn new(name: &'a str) -> Self {
175 assert!(!name.is_empty());
176 Self {
177 name,
178 _key_type: PhantomData,
179 _value_type: PhantomData,
180 }
181 }
182}
183
184impl<K: Key + 'static, V: Key + 'static> MultimapTableHandle for MultimapTableDefinition<'_, K, V> {
185 fn name(&self) -> &str {
186 self.name
187 }
188}
189
190impl<K: Key, V: Key> Sealed for MultimapTableDefinition<'_, K, V> {}
191
192impl<K: Key + 'static, V: Key + 'static> Clone for MultimapTableDefinition<'_, K, V> {
193 fn clone(&self) -> Self {
194 *self
195 }
196}
197
198impl<K: Key + 'static, V: Key + 'static> Copy for MultimapTableDefinition<'_, K, V> {}
199
200impl<K: Key + 'static, V: Key + 'static> Display for MultimapTableDefinition<'_, K, V> {
201 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
202 write!(
203 f,
204 "{}<{}, {}>",
205 self.name,
206 K::type_name().name(),
207 V::type_name().name()
208 )
209 }
210}
211
/// Snapshot of cache statistics reported by [`Database::cache_stats`].
#[derive(Debug)]
pub struct CacheStats {
    pub(crate) evictions: u64,
}

impl CacheStats {
    /// Returns the number of cache evictions recorded in this snapshot.
    pub fn evictions(&self) -> u64 {
        let CacheStats { evictions } = self;
        *evictions
    }
}
228
229pub(crate) struct TransactionGuard {
230 transaction_tracker: Option<Arc<TransactionTracker>>,
231 transaction_id: Option<TransactionId>,
232 write_transaction: bool,
233}
234
235impl TransactionGuard {
236 pub(crate) fn new_read(
237 transaction_id: TransactionId,
238 tracker: Arc<TransactionTracker>,
239 ) -> Self {
240 Self {
241 transaction_tracker: Some(tracker),
242 transaction_id: Some(transaction_id),
243 write_transaction: false,
244 }
245 }
246
247 pub(crate) fn new_write(
248 transaction_id: TransactionId,
249 tracker: Arc<TransactionTracker>,
250 ) -> Self {
251 Self {
252 transaction_tracker: Some(tracker),
253 transaction_id: Some(transaction_id),
254 write_transaction: true,
255 }
256 }
257
258 pub(crate) fn fake() -> Self {
260 Self {
261 transaction_tracker: None,
262 transaction_id: None,
263 write_transaction: false,
264 }
265 }
266
267 pub(crate) fn id(&self) -> TransactionId {
268 self.transaction_id.unwrap()
269 }
270
271 pub(crate) fn leak(mut self) -> TransactionId {
272 self.transaction_id.take().unwrap()
273 }
274}
275
276impl Drop for TransactionGuard {
277 fn drop(&mut self) {
278 if self.transaction_tracker.is_none() {
279 return;
280 }
281 if let Some(transaction_id) = self.transaction_id {
282 if self.write_transaction {
283 self.transaction_tracker
284 .as_ref()
285 .unwrap()
286 .end_write_transaction(transaction_id);
287 } else {
288 self.transaction_tracker
289 .as_ref()
290 .unwrap()
291 .deallocate_read_transaction(transaction_id);
292 }
293 }
294 }
295}
296
/// An open database.
///
/// Holds the shared transactional storage layer (`mem`) and the tracker that hands out
/// transaction ids and records live read transactions and savepoints. Transactions are started
/// with [`Database::begin_read`] and [`Database::begin_write`]. When dropped (outside a panic),
/// it makes a best-effort attempt to persist the allocator state table.
pub struct Database {
    // Shared with every transaction created from this database.
    mem: Arc<TransactionalMemory>,
    transaction_tracker: Arc<TransactionTracker>,
}
331
332impl Database {
333 pub fn create(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
338 Self::builder().create(path)
339 }
340
341 pub fn open(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
343 Self::builder().open(path)
344 }
345
346 pub fn cache_stats(&self) -> CacheStats {
350 self.mem.cache_stats()
351 }
352
353 pub(crate) fn get_memory(&self) -> Arc<TransactionalMemory> {
354 self.mem.clone()
355 }
356
    /// Verifies the checksums of every tree whose root `mem` currently reports: the user data
    /// table tree, the system table tree, and, if present, the freed-page tree.
    ///
    /// Returns `Ok(false)` as soon as any tree fails verification and `Ok(true)` when all of
    /// them pass. Returns `Err` if a storage error occurs while reading pages.
    pub(crate) fn verify_primary_checksums(mem: Arc<TransactionalMemory>) -> Result<bool> {
        // Placeholder freed-page list for the throwaway trees. Verification must leave it empty
        // (asserted below).
        let fake_freed_pages = Arc::new(Mutex::new(vec![]));
        let table_tree = TableTreeMut::new(
            mem.get_data_root(),
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
            fake_freed_pages.clone(),
        );
        if !table_tree.verify_checksums()? {
            return Ok(false);
        }
        let system_table_tree = TableTreeMut::new(
            mem.get_system_root(),
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
            fake_freed_pages.clone(),
        );
        if !system_table_tree.verify_checksums()? {
            return Ok(false);
        }
        assert!(fake_freed_pages.lock().unwrap().is_empty());

        // The freed tree is checked as a raw b-tree, using the freed-table key/value fixed
        // widths, rather than as a table tree.
        if let Some(header) = mem.get_freed_root() {
            if !RawBtree::new(
                Some(header),
                FreedTableKey::fixed_width(),
                FreedPageList::fixed_width(),
                mem.clone(),
            )
            .verify_checksum()?
            {
                return Ok(false);
            }
        }

        Ok(true)
    }
394
    /// Checks the database for inconsistencies and repairs it if needed.
    ///
    /// The method reloads state from storage and re-runs the repair pass. It then compares the
    /// resulting roots and allocator state with the values from before the repair. If anything
    /// changed, or the reload reported an unclean state, the repaired roots are committed.
    ///
    /// Returns `Ok(true)` if the database was already consistent, or `Ok(false)` if a repair
    /// was committed.
    ///
    /// # Panics
    ///
    /// Panics if `self.mem` is shared with other `Arc` handles, because `Arc::get_mut` requires
    /// exclusive ownership. NOTE(review): presumably this happens when a transaction created
    /// from this database is still alive and holds a clone of `mem`.
    pub fn check_integrity(&mut self) -> Result<bool, DatabaseError> {
        // Snapshot the allocator state before reloading, so a change made by repair can be
        // detected.
        let allocator_hash = self.mem.allocator_hash();
        let mut was_clean = Arc::get_mut(&mut self.mem)
            .unwrap()
            .clear_cache_and_reload()?;

        let old_roots = [
            self.mem.get_data_root(),
            self.mem.get_system_root(),
            self.mem.get_freed_root(),
        ];

        // The repair callback is a no-op and never aborts, so only storage errors are expected
        // here.
        let new_roots = Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
            DatabaseError::Storage(storage_err) => storage_err,
            _ => unreachable!(),
        })?;

        if old_roots != new_roots || allocator_hash != self.mem.allocator_hash() {
            was_clean = false;
        }

        // Persist the repaired roots under the next transaction id.
        if !was_clean {
            let next_transaction_id = self.mem.get_last_committed_transaction_id()?.next();
            let [data_root, system_root, freed_root] = new_roots;
            self.mem.commit(
                data_root,
                system_root,
                freed_root,
                next_transaction_id,
                false,
                true,
            )?;
        }

        self.mem.begin_writable()?;

        Ok(was_clean)
    }
442
    /// Compacts the database by repeatedly running `compact_pages` in a write transaction
    /// until a pass makes no progress.
    ///
    /// Returns `Ok(true)` if at least one pass made progress, otherwise `Ok(false)`.
    ///
    /// # Errors
    ///
    /// * [`CompactionError::TransactionInProgress`] if any read transaction is live.
    /// * [`CompactionError::PersistentSavepointExists`] if a persistent savepoint exists.
    /// * [`CompactionError::EphemeralSavepointExists`] if an ephemeral savepoint exists.
    /// * Storage errors from any of the intermediate transactions.
    pub fn compact(&mut self) -> Result<bool, CompactionError> {
        if self
            .transaction_tracker
            .oldest_live_read_transaction()
            .is_some()
        {
            return Err(CompactionError::TransactionInProgress);
        }
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        if txn.list_persistent_savepoints()?.next().is_some() {
            return Err(CompactionError::PersistentSavepointExists);
        }
        if self.transaction_tracker.any_savepoint_exists() {
            return Err(CompactionError::EphemeralSavepointExists);
        }
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;
        // A second empty two-phase commit; afterwards the freed tree is asserted to be empty.
        // NOTE(review): presumably needed because the first transaction (which listed the
        // persistent savepoints) can itself leave pending frees — confirm.
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;
        assert!(self.mem.get_freed_root().is_none());

        let mut compacted = false;
        loop {
            let mut progress = false;

            // One compaction pass. It is only committed if it moved something.
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            if txn.compact_pages()? {
                progress = true;
                txn.commit().map_err(|e| e.into_storage_error())?;
            } else {
                txn.abort()?;
            }

            // An empty two-phase commit after each pass; the freed tree must then be empty again.
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            txn.set_two_phase_commit(true);
            txn.commit().map_err(|e| e.into_storage_error())?;
            assert!(self.mem.get_freed_root().is_none());

            if !progress {
                break;
            }

            compacted = true;
        }

        Ok(compacted)
    }
503
504 fn check_repaired_persistent_savepoints(
505 system_root: Option<BtreeHeader>,
506 mem: Arc<TransactionalMemory>,
507 ) -> Result {
508 let freed_list = Arc::new(Mutex::new(vec![]));
509 let table_tree = TableTreeMut::new(
510 system_root,
511 Arc::new(TransactionGuard::fake()),
512 mem.clone(),
513 freed_list,
514 );
515 let fake_transaction_tracker = Arc::new(TransactionTracker::new(TransactionId::new(0)));
516 if let Some(savepoint_table_def) = table_tree
517 .get_table::<SavepointId, SerializedSavepoint>(
518 SAVEPOINT_TABLE.name(),
519 TableType::Normal,
520 )
521 .map_err(|e| {
522 e.into_storage_error_or_corrupted("Persistent savepoint table corrupted")
523 })?
524 {
525 let savepoint_table_root =
526 if let InternalTableDefinition::Normal { table_root, .. } = savepoint_table_def {
527 table_root
528 } else {
529 unreachable!()
530 };
531 let savepoint_table: ReadOnlyTable<SavepointId, SerializedSavepoint> =
532 ReadOnlyTable::new(
533 "internal savepoint table".to_string(),
534 savepoint_table_root,
535 PageHint::None,
536 Arc::new(TransactionGuard::fake()),
537 mem.clone(),
538 )?;
539 for result in savepoint_table.range::<SavepointId>(..)? {
540 let (_, savepoint_data) = result?;
541 let savepoint = savepoint_data
542 .value()
543 .to_savepoint(fake_transaction_tracker.clone());
544 if let Some(header) = savepoint.get_user_root() {
545 Self::check_pages_allocated_recursive(header.root, mem.clone())?;
546 }
547 }
548 }
549
550 Ok(())
551 }
552
553 fn mark_freed_tree(freed_root: Option<BtreeHeader>, mem: Arc<TransactionalMemory>) -> Result {
554 if let Some(header) = freed_root {
555 let freed_pages_iter = AllPageNumbersBtreeIter::new(
556 header.root,
557 FreedTableKey::fixed_width(),
558 FreedPageList::fixed_width(),
559 mem.clone(),
560 )?;
561 for page in freed_pages_iter {
562 mem.mark_page_allocated(page?);
563 }
564 }
565
566 let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
567 "internal freed table".to_string(),
568 freed_root,
569 PageHint::None,
570 Arc::new(TransactionGuard::fake()),
571 mem.clone(),
572 )?;
573 for result in freed_table.range::<FreedTableKey>(..)? {
574 let (_, freed_page_list) = result?;
575 for i in 0..freed_page_list.value().len() {
576 mem.mark_page_allocated(freed_page_list.value().get(i));
577 }
578 }
579
580 Ok(())
581 }
582
583 fn check_pages_allocated_recursive(root: PageNumber, mem: Arc<TransactionalMemory>) -> Result {
584 let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, mem.clone())?;
587 for result in master_pages_iter {
588 let page = result?;
589 assert!(mem.is_allocated(page));
590 }
591
592 let iter: BtreeRangeIter<&str, InternalTableDefinition> =
594 BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem.clone())?;
595
596 for entry in iter {
598 let definition = entry?.value();
599 definition.visit_all_pages(mem.clone(), |path| {
600 assert!(mem.is_allocated(path.page_number()));
601 Ok(())
602 })?;
603 }
604
605 Ok(())
606 }
607
608 fn mark_tables_recursive(root: PageNumber, mem: Arc<TransactionalMemory>) -> Result {
609 let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, mem.clone())?;
612 for page in master_pages_iter {
613 mem.mark_page_allocated(page?);
614 }
615
616 let iter: BtreeRangeIter<&str, InternalTableDefinition> =
618 BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem.clone())?;
619
620 for entry in iter {
622 let definition = entry?.value();
623 definition.visit_all_pages(mem.clone(), |path| {
624 mem.mark_page_allocated(path.page_number());
625 Ok(())
626 })?;
627 }
628
629 Ok(())
630 }
631
    /// Rebuilds the page-allocation state by marking every page reachable from the data tree,
    /// the freed tree and the system tree (between `begin_repair` and `end_repair`).
    ///
    /// Returns `[data_root, system_root, freed_root]` for the caller to commit.
    ///
    /// If the current roots fail checksum verification, the method first falls back via
    /// `repair_primary_corrupted`. That fallback is refused with `StorageError::Corrupted` when
    /// the storage layer reports that two-phase commit was used. `repair_callback` is invoked
    /// with progress 0.3 (only on the fallback path), 0.6 and 0.9. If the callback aborts its
    /// session, the method returns [`DatabaseError::RepairAborted`].
    fn do_repair(
        mem: &mut Arc<TransactionalMemory>,
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<[Option<BtreeHeader>; 3], DatabaseError> {
        if !Self::verify_primary_checksums(mem.clone())? {
            if mem.used_two_phase_commit() {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Primary is corrupted despite 2-phase commit".to_string(),
                )));
            }

            let mut handle = RepairSession::new(0.3);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }

            mem.repair_primary_corrupted();
            // NOTE(review): presumably clears pages cached during the verification above that
            // are stale after repair_primary_corrupted() — confirm.
            mem.clear_read_cache();
            if !Self::verify_primary_checksums(mem.clone())? {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Failed to repair database. All roots are corrupted".to_string(),
                )));
            }
        }
        let mut handle = RepairSession::new(0.6);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        mem.begin_repair()?;

        let data_root = mem.get_data_root();
        if let Some(header) = data_root {
            Self::mark_tables_recursive(header.root, mem.clone())?;
        }

        let freed_root = mem.get_freed_root();
        Self::mark_freed_tree(freed_root, mem.clone())?;
        // NOTE(review): this opens the freed table and immediately drops it. Its purpose is not
        // clear from here; possibly it only checks that `freed_root` opens as a table.
        let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
            "internal freed table".to_string(),
            freed_root,
            PageHint::None,
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
        )?;
        drop(freed_table);

        let mut handle = RepairSession::new(0.9);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        let system_root = mem.get_system_root();
        if let Some(header) = system_root {
            Self::mark_tables_recursive(header.root, mem.clone())?;
        }
        // Debug builds only: the savepoint check asserts that pages reachable from persistent
        // savepoints were marked by the passes above.
        #[cfg(debug_assertions)]
        {
            Self::check_repaired_persistent_savepoints(system_root, mem.clone())?;
        }

        mem.end_repair()?;

        mem.clear_read_cache();

        Ok([data_root, system_root, freed_root])
    }
713
    /// Opens a database on `file`. When `allow_initialize` is true, this is forwarded to
    /// `TransactionalMemory::new`, presumably so an empty backend can be initialized.
    ///
    /// If the storage layer reports that repair is needed, the method first tries to load a
    /// valid allocator state table. If there is none, it runs a full repair and commits the
    /// repaired roots. In that case `repair_callback` is notified at progress 0.0 first, and
    /// aborting the session returns [`DatabaseError::RepairAborted`]. Finally, the transaction
    /// tracker is seeded from the persisted persistent-savepoint state.
    fn new(
        file: Box<dyn StorageBackend>,
        allow_initialize: bool,
        page_size: usize,
        region_size: Option<u64>,
        read_cache_size_bytes: usize,
        write_cache_size_bytes: usize,
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<Self, DatabaseError> {
        #[cfg(feature = "logging")]
        let file_path = format!("{:?}", &file);
        #[cfg(feature = "logging")]
        info!("Opening database {:?}", &file_path);
        let mem = TransactionalMemory::new(
            file,
            allow_initialize,
            page_size,
            region_size,
            read_cache_size_bytes,
            write_cache_size_bytes,
        )?;
        let mut mem = Arc::new(mem);
        if mem.needs_repair()? {
            // A valid persisted allocator state lets us skip the full repair.
            if let Some(tree) = Self::get_allocator_state_table(&mem)? {
                #[cfg(feature = "logging")]
                info!("Found valid allocator state, full repair not needed");
                mem.load_allocator_state(&tree)?;
            } else {
                #[cfg(feature = "logging")]
                warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
                let mut handle = RepairSession::new(0.0);
                repair_callback(&mut handle);
                if handle.aborted() {
                    return Err(DatabaseError::RepairAborted);
                }
                let [data_root, system_root, freed_root] =
                    Self::do_repair(&mut mem, repair_callback)?;
                let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
                mem.commit(
                    data_root,
                    system_root,
                    freed_root,
                    next_transaction_id,
                    false,
                    true,
                )?;
            }
        }

        mem.begin_writable()?;
        let next_transaction_id = mem.get_last_committed_transaction_id()?.next();

        let db = Database {
            mem,
            transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
        };

        // Restore persistent-savepoint bookkeeping in the tracker using a read-only pass (the
        // write transaction is aborted at the end).
        let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
        if let Some(next_id) = txn.next_persistent_savepoint_id()? {
            db.transaction_tracker
                .restore_savepoint_counter_state(next_id);
        }
        for id in txn.list_persistent_savepoints()? {
            let savepoint = match txn.get_persistent_savepoint(id) {
                Ok(savepoint) => savepoint,
                Err(err) => match err {
                    // The id was just listed by the same transaction, so it must be valid.
                    SavepointError::InvalidSavepoint => unreachable!(),
                    SavepointError::Storage(storage) => {
                        return Err(storage.into());
                    }
                },
            };
            db.transaction_tracker
                .register_persistent_savepoint(&savepoint);
        }
        txn.abort()?;

        Ok(db)
    }
796
    /// Returns the persisted allocator state tree if it can be used, or `None` if it cannot.
    ///
    /// A tree is returned only when three conditions hold: the storage layer reports that
    /// two-phase commit was used, the allocator state table exists in the system table tree,
    /// and `mem.is_valid_allocator_state` accepts it.
    fn get_allocator_state_table(
        mem: &Arc<TransactionalMemory>,
    ) -> Result<Option<AllocatorStateTree>> {
        // The stored allocator state is only consulted after a two-phase commit.
        if !mem.used_two_phase_commit() {
            return Ok(None);
        }

        // Look the table up in the system table tree, using a placeholder freed-page list.
        let fake_freed_pages = Arc::new(Mutex::new(vec![]));
        let system_table_tree = TableTreeMut::new(
            mem.get_system_root(),
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
            fake_freed_pages.clone(),
        );
        let Some(allocator_state_table) = system_table_tree
            .get_table::<AllocatorStateKey, &[u8]>(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
            .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?
        else {
            return Ok(None);
        };

        // The table was requested as `TableType::Normal`, so any other definition kind is a bug.
        let InternalTableDefinition::Normal { table_root, .. } = allocator_state_table else {
            unreachable!();
        };
        let tree = AllocatorStateTree::new(
            table_root,
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
            fake_freed_pages,
        );

        // NOTE(review): presumably rejects stale state written by an earlier transaction —
        // confirm against `is_valid_allocator_state`.
        if !mem.is_valid_allocator_state(&tree)? {
            return Ok(None);
        }

        Ok(Some(tree))
    }
838
839 fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
840 let id = self
841 .transaction_tracker
842 .register_read_transaction(&self.mem)?;
843
844 Ok(TransactionGuard::new_read(
845 id,
846 self.transaction_tracker.clone(),
847 ))
848 }
849
850 pub fn builder() -> Builder {
852 Builder::new()
853 }
854
855 pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
861 self.mem.check_io_errors()?;
863 let guard = TransactionGuard::new_write(
864 self.transaction_tracker.start_write_transaction(),
865 self.transaction_tracker.clone(),
866 );
867 WriteTransaction::new(guard, self.transaction_tracker.clone(), self.mem.clone())
868 .map_err(|e| e.into())
869 }
870
871 pub fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
879 let guard = self.allocate_read_transaction()?;
880 #[cfg(feature = "logging")]
881 debug!("Beginning read transaction id={:?}", guard.id());
882 ReadTransaction::new(self.get_memory(), guard)
883 }
884
885 fn ensure_allocator_state_table(&self) -> Result<(), Error> {
886 if Self::get_allocator_state_table(&self.mem)?.is_some() {
888 return Ok(());
889 }
890
891 #[cfg(feature = "logging")]
893 debug!("Writing allocator state table");
894 let mut tx = self.begin_write()?;
895 tx.set_quick_repair(true);
896 tx.commit()?;
897
898 Ok(())
899 }
900}
901
902impl Drop for Database {
903 fn drop(&mut self) {
904 if thread::panicking() {
905 return;
906 }
907
908 if self.ensure_allocator_state_table().is_err() {
909 #[cfg(feature = "logging")]
910 warn!("Failed to write allocator state table. Repair may be required at restart.")
911 }
912 }
913}
914
/// Handle passed to the repair callback while a database is being repaired.
///
/// [`RepairSession::progress`] reports how far the repair has gone (this file uses values from
/// 0.0 to 0.9). Calling [`RepairSession::abort`] makes the repair stop with
/// `DatabaseError::RepairAborted`.
pub struct RepairSession {
    progress: f64,
    aborted: bool,
}

impl RepairSession {
    pub(crate) fn new(progress: f64) -> Self {
        RepairSession {
            aborted: false,
            progress,
        }
    }

    /// Whether [`RepairSession::abort`] has been called.
    pub(crate) fn aborted(&self) -> bool {
        self.aborted
    }

    /// Requests that the repair be aborted.
    pub fn abort(&mut self) {
        self.aborted = true;
    }

    /// Returns the repair progress at the time this session was created.
    pub fn progress(&self) -> f64 {
        self.progress
    }
}
942
943pub struct Builder {
945 page_size: usize,
946 region_size: Option<u64>,
947 read_cache_size_bytes: usize,
948 write_cache_size_bytes: usize,
949 repair_callback: Box<dyn Fn(&mut RepairSession)>,
950}
951
952impl Builder {
953 #[allow(clippy::new_without_default)]
959 pub fn new() -> Self {
960 let mut result = Self {
961 page_size: PAGE_SIZE,
965 region_size: None,
966 read_cache_size_bytes: 0,
968 write_cache_size_bytes: 0,
970 repair_callback: Box::new(|_| {}),
971 };
972
973 result.set_cache_size(1024 * 1024 * 1024);
974 result
975 }
976
977 pub fn set_repair_callback(
985 &mut self,
986 callback: impl Fn(&mut RepairSession) + 'static,
987 ) -> &mut Self {
988 self.repair_callback = Box::new(callback);
989 self
990 }
991
992 #[cfg(any(fuzzing, test))]
1000 pub fn set_page_size(&mut self, size: usize) -> &mut Self {
1001 assert!(size.is_power_of_two());
1002 self.page_size = std::cmp::max(size, 512);
1003 self
1004 }
1005
1006 pub fn set_cache_size(&mut self, bytes: usize) -> &mut Self {
1008 self.read_cache_size_bytes = bytes / 10 * 9;
1010 self.write_cache_size_bytes = bytes / 10;
1011 self
1012 }
1013
1014 #[cfg(any(test, fuzzing))]
1015 pub fn set_region_size(&mut self, size: u64) -> &mut Self {
1016 assert!(size.is_power_of_two());
1017 self.region_size = Some(size);
1018 self
1019 }
1020
1021 pub fn create(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
1026 let file = OpenOptions::new()
1027 .read(true)
1028 .write(true)
1029 .create(true)
1030 .truncate(false)
1031 .open(path)?;
1032
1033 Database::new(
1034 Box::new(FileBackend::new(file)?),
1035 true,
1036 self.page_size,
1037 self.region_size,
1038 self.read_cache_size_bytes,
1039 self.write_cache_size_bytes,
1040 &self.repair_callback,
1041 )
1042 }
1043
1044 pub fn open(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
1046 let file = OpenOptions::new().read(true).write(true).open(path)?;
1047
1048 Database::new(
1049 Box::new(FileBackend::new(file)?),
1050 false,
1051 self.page_size,
1052 None,
1053 self.read_cache_size_bytes,
1054 self.write_cache_size_bytes,
1055 &self.repair_callback,
1056 )
1057 }
1058
1059 pub fn create_file(&self, file: File) -> Result<Database, DatabaseError> {
1063 Database::new(
1064 Box::new(FileBackend::new(file)?),
1065 true,
1066 self.page_size,
1067 self.region_size,
1068 self.read_cache_size_bytes,
1069 self.write_cache_size_bytes,
1070 &self.repair_callback,
1071 )
1072 }
1073
1074 pub fn create_with_backend(
1076 &self,
1077 backend: impl StorageBackend,
1078 ) -> Result<Database, DatabaseError> {
1079 Database::new(
1080 Box::new(backend),
1081 true,
1082 self.page_size,
1083 self.region_size,
1084 self.read_cache_size_bytes,
1085 self.write_cache_size_bytes,
1086 &self.repair_callback,
1087 )
1088 }
1089}
1090
1091impl std::fmt::Debug for Database {
1092 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1093 f.debug_struct("Database").finish()
1094 }
1095}
1096
1097#[cfg(test)]
1098mod test {
1099 use crate::backends::FileBackend;
1100 use crate::{
1101 CommitError, Database, DatabaseError, Durability, ReadableTable, StorageBackend,
1102 StorageError, TableDefinition, TransactionError,
1103 };
1104 use std::fs::File;
1105 use std::io::{ErrorKind, Read, Seek, SeekFrom};
1106 use std::sync::Arc;
1107 use std::sync::atomic::{AtomicU64, Ordering};
1108
1109 #[derive(Debug)]
1110 struct FailingBackend {
1111 inner: FileBackend,
1112 countdown: Arc<AtomicU64>,
1113 }
1114
1115 impl FailingBackend {
1116 fn new(backend: FileBackend, countdown: u64) -> Self {
1117 Self {
1118 inner: backend,
1119 countdown: Arc::new(AtomicU64::new(countdown)),
1120 }
1121 }
1122
1123 fn check_countdown(&self) -> Result<(), std::io::Error> {
1124 if self.countdown.load(Ordering::SeqCst) == 0 {
1125 return Err(std::io::Error::from(ErrorKind::Other));
1126 }
1127
1128 Ok(())
1129 }
1130
1131 fn decrement_countdown(&self) -> Result<(), std::io::Error> {
1132 if self
1133 .countdown
1134 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
1135 if x > 0 { Some(x - 1) } else { None }
1136 })
1137 .is_err()
1138 {
1139 return Err(std::io::Error::from(ErrorKind::Other));
1140 }
1141
1142 Ok(())
1143 }
1144 }
1145
1146 impl StorageBackend for FailingBackend {
1147 fn len(&self) -> Result<u64, std::io::Error> {
1148 self.inner.len()
1149 }
1150
1151 fn read(&self, offset: u64, len: usize) -> Result<Vec<u8>, std::io::Error> {
1152 self.check_countdown()?;
1153 self.inner.read(offset, len)
1154 }
1155
1156 fn set_len(&self, len: u64) -> Result<(), std::io::Error> {
1157 self.inner.set_len(len)
1158 }
1159
1160 fn sync_data(&self, eventual: bool) -> Result<(), std::io::Error> {
1161 self.check_countdown()?;
1162 self.inner.sync_data(eventual)
1163 }
1164
1165 fn write(&self, offset: u64, data: &[u8]) -> Result<(), std::io::Error> {
1166 self.decrement_countdown()?;
1167 self.inner.write(offset, data)
1168 }
1169 }
1170
    /// Regression test: after a commit fails because the backend starts returning write errors,
    /// the database file must still reopen successfully.
    #[test]
    fn crash_regression4() {
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();

        // 23 writes are allowed. After that, writes, reads and syncs all fail.
        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), 23);
        let db = Database::builder()
            .set_cache_size(12686)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let tx = db.begin_write().unwrap();
        let _savepoint = tx.ephemeral_savepoint().unwrap();
        let _persistent_savepoint = tx.persistent_savepoint().unwrap();
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            let _ = table.insert_reserve(118821, 360).unwrap();
        }
        // This commit is expected to exhaust the write budget and fail.
        let result = tx.commit();
        assert!(result.is_err());

        drop(db);
        // Reopen the same file through a normal (non-failing) backend; this must succeed.
        Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create(&path)
            .unwrap();
    }
1206
    /// A failed commit poisons the database: later `begin_write` calls report `PreviousIo`.
    /// After the database is dropped, a flag bit is expected in byte 9 of the file.
    #[test]
    fn transient_io_error() {
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();

        // Start with an effectively unlimited write budget.
        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), u64::MAX);
        let countdown = backend.countdown.clone();
        let db = Database::builder()
            .set_cache_size(0)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, u64> = TableDefinition::new("x");

        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            table.insert(0, 0).unwrap();
        }
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            table.insert(0, 1).unwrap();
        }
        tx.commit().unwrap();

        // Exhaust the budget so this commit's I/O fails.
        let tx = db.begin_write().unwrap();
        countdown.store(0, Ordering::SeqCst);
        let result = tx.commit().err().unwrap();
        assert!(matches!(result, CommitError::Storage(StorageError::Io(_))));
        // Even though no further I/O is attempted, new write transactions are refused.
        let result = db.begin_write().err().unwrap();
        assert!(matches!(
            result,
            TransactionError::Storage(StorageError::PreviousIo)
        ));
        // Restore the budget so dropping the database can perform its I/O.
        countdown.store(u64::MAX, Ordering::SeqCst);
        drop(db);

        // NOTE(review): bit 0b10 of header byte 9 is presumably the "recovery required" flag —
        // confirm against the on-disk header layout.
        let mut file = File::open(&path).unwrap();
        file.seek(SeekFrom::Start(9)).unwrap();
        let mut god_byte = vec![0u8];
        assert_eq!(file.read(&mut god_byte).unwrap(), 1);
        assert_ne!(god_byte[0] & 2, 0);
    }
1256
1257 #[test]
1258 fn small_pages() {
1259 let tmpfile = crate::create_tempfile();
1260
1261 let db = Database::builder()
1262 .set_page_size(512)
1263 .create(tmpfile.path())
1264 .unwrap();
1265
1266 let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1267 let txn = db.begin_write().unwrap();
1268 {
1269 txn.open_table(table_definition).unwrap();
1270 }
1271 txn.commit().unwrap();
1272 }
1273
    /// Regression sequence that interleaves creating, restoring and dropping ephemeral
    /// savepoints with two-phase commits on 512-byte pages. The exact order of operations is
    /// the point of the test, so do not reorder it.
    #[test]
    fn small_pages2() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(512)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint0 = tx.ephemeral_savepoint().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint0).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&660503, 489).unwrap().as_mut().fill(0xFF);
            assert!(t.remove(&291295).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint0).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint2 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint0);
        tx.restore_savepoint(&savepoint2).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&2059).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint3 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint1);
        tx.restore_savepoint(&savepoint3).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // savepoint4 is created in a transaction that is then aborted.
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint4 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint2);
        tx.restore_savepoint(&savepoint3).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.remove(&207936).unwrap().is_none());
        }
        tx.abort().unwrap();

        // Restoring savepoint4 must fail. NOTE(review): presumably because the transaction that
        // created it was aborted — confirm.
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint5 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint3);
        assert!(tx.restore_savepoint(&savepoint4).is_err());
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint5).unwrap();
        tx.set_durability(Durability::None);
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();
    }
1367
1368 #[test]
1369 fn small_pages3() {
1370 let tmpfile = crate::create_tempfile();
1371
1372 let db = Database::builder()
1373 .set_page_size(1024)
1374 .create(tmpfile.path())
1375 .unwrap();
1376
1377 let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1378
1379 let mut tx = db.begin_write().unwrap();
1380 let _savepoint0 = tx.ephemeral_savepoint().unwrap();
1381 tx.set_durability(Durability::None);
1382 {
1383 let mut t = tx.open_table(table_def).unwrap();
1384 let value = vec![0; 306];
1385 t.insert(&539717, value.as_slice()).unwrap();
1386 }
1387 tx.abort().unwrap();
1388
1389 let mut tx = db.begin_write().unwrap();
1390 let savepoint1 = tx.ephemeral_savepoint().unwrap();
1391 tx.restore_savepoint(&savepoint1).unwrap();
1392 tx.set_durability(Durability::None);
1393 {
1394 let mut t = tx.open_table(table_def).unwrap();
1395 let value = vec![0; 2008];
1396 t.insert(&784384, value.as_slice()).unwrap();
1397 }
1398 tx.abort().unwrap();
1399 }
1400
1401 #[test]
1402 fn small_pages4() {
1403 let tmpfile = crate::create_tempfile();
1404
1405 let db = Database::builder()
1406 .set_cache_size(1024 * 1024)
1407 .set_page_size(1024)
1408 .create(tmpfile.path())
1409 .unwrap();
1410
1411 let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1412
1413 let tx = db.begin_write().unwrap();
1414 {
1415 tx.open_table(table_def).unwrap();
1416 }
1417 tx.commit().unwrap();
1418
1419 let tx = db.begin_write().unwrap();
1420 {
1421 let mut t = tx.open_table(table_def).unwrap();
1422 assert!(t.get(&131072).unwrap().is_none());
1423 let value = vec![0xFF; 1130];
1424 t.insert(&42394, value.as_slice()).unwrap();
1425 t.insert_reserve(&744037, 3645).unwrap().as_mut().fill(0xFF);
1426 assert!(t.get(&0).unwrap().is_none());
1427 }
1428 tx.abort().unwrap();
1429
1430 let tx = db.begin_write().unwrap();
1431 {
1432 let mut t = tx.open_table(table_def).unwrap();
1433 t.insert_reserve(&118749, 734).unwrap().as_mut().fill(0xFF);
1434 }
1435 tx.abort().unwrap();
1436 }
1437
1438 #[test]
1439 fn dynamic_shrink() {
1440 let tmpfile = crate::create_tempfile();
1441 let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1442 let big_value = vec![0u8; 1024];
1443
1444 let db = Database::builder()
1445 .set_region_size(1024 * 1024)
1446 .create(tmpfile.path())
1447 .unwrap();
1448
1449 let txn = db.begin_write().unwrap();
1450 {
1451 let mut table = txn.open_table(table_definition).unwrap();
1452 for i in 0..2048 {
1453 table.insert(&i, big_value.as_slice()).unwrap();
1454 }
1455 }
1456 txn.commit().unwrap();
1457
1458 let file_size = tmpfile.as_file().metadata().unwrap().len();
1459
1460 let txn = db.begin_write().unwrap();
1461 {
1462 let mut table = txn.open_table(table_definition).unwrap();
1463 for i in 0..2048 {
1464 table.remove(&i).unwrap();
1465 }
1466 }
1467 txn.commit().unwrap();
1468
1469 let txn = db.begin_write().unwrap();
1471 {
1472 let mut table = txn.open_table(table_definition).unwrap();
1473 table.insert(0, [].as_slice()).unwrap();
1474 }
1475 txn.commit().unwrap();
1476 let txn = db.begin_write().unwrap();
1477 {
1478 let mut table = txn.open_table(table_definition).unwrap();
1479 table.remove(0).unwrap();
1480 }
1481 txn.commit().unwrap();
1482 let txn = db.begin_write().unwrap();
1483 txn.commit().unwrap();
1484
1485 let final_file_size = tmpfile.as_file().metadata().unwrap().len();
1486 assert!(final_file_size < file_size);
1487 }
1488
1489 #[test]
1490 fn create_new_db_in_empty_file() {
1491 let tmpfile = crate::create_tempfile();
1492
1493 let _db = Database::builder()
1494 .create_file(tmpfile.into_file())
1495 .unwrap();
1496 }
1497
1498 #[test]
1499 fn open_missing_file() {
1500 let tmpfile = crate::create_tempfile();
1501
1502 let err = Database::builder()
1503 .open(tmpfile.path().with_extension("missing"))
1504 .unwrap_err();
1505
1506 match err {
1507 DatabaseError::Storage(StorageError::Io(err)) if err.kind() == ErrorKind::NotFound => {}
1508 err => panic!("Unexpected error for empty file: {err}"),
1509 }
1510 }
1511
1512 #[test]
1513 fn open_empty_file() {
1514 let tmpfile = crate::create_tempfile();
1515
1516 let err = Database::builder().open(tmpfile.path()).unwrap_err();
1517
1518 match err {
1519 DatabaseError::Storage(StorageError::Io(err))
1520 if err.kind() == ErrorKind::InvalidData => {}
1521 err => panic!("Unexpected error for empty file: {err}"),
1522 }
1523 }
1524}