1use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
2use crate::tree_store::{
3 AllPageNumbersBtreeIter, BtreeHeader, BtreeRangeIter, FreedPageList, FreedTableKey,
4 InternalTableDefinition, PageHint, PageNumber, RawBtree, SerializedSavepoint, TableTreeMut,
5 TableType, TransactionalMemory, PAGE_SIZE,
6};
7use crate::types::{Key, Value};
8use crate::{CompactionError, DatabaseError, Error, ReadOnlyTable, SavepointError, StorageError};
9use crate::{ReadTransaction, Result, WriteTransaction};
10use std::fmt::{Debug, Display, Formatter};
11
12use std::fs::{File, OpenOptions};
13use std::io::ErrorKind;
14use std::marker::PhantomData;
15use std::ops::RangeFull;
16use std::path::Path;
17use std::sync::{Arc, Mutex};
18use std::{io, thread};
19
20use crate::error::TransactionError;
21use crate::sealed::Sealed;
22use crate::transactions::{
23 AllocatorStateKey, AllocatorStateTree, ALLOCATOR_STATE_TABLE_NAME, SAVEPOINT_TABLE,
24};
25use crate::tree_store::file_backend::FileBackend;
26#[cfg(feature = "logging")]
27use log::{debug, info, warn};
28
/// Storage medium the database reads pages from and writes pages to.
///
/// Implementations must be `'static`, `Debug`, `Send` and `Sync`, because the database keeps a
/// boxed backend (`Box<dyn StorageBackend>`) for its whole lifetime.
// An `is_empty` method is not part of this contract; only the length is needed.
#[allow(clippy::len_without_is_empty)]
pub trait StorageBackend: 'static + Debug + Send + Sync {
    /// Returns the current length of the storage (presumably in bytes, the same unit as the
    /// `offset` arguments below — NOTE(review): confirm against implementations).
    fn len(&self) -> io::Result<u64>;

    /// Reads `len` bytes starting at `offset`.
    fn read(&self, offset: u64, len: usize) -> io::Result<Vec<u8>>;

    /// Sets the length of the storage to `len`.
    fn set_len(&self, len: u64) -> io::Result<()>;

    /// Flushes written data to durable storage.
    ///
    /// NOTE(review): the exact meaning of `eventual` (e.g. a weaker, deferred sync) is defined
    /// by implementations — confirm before relying on it.
    fn sync_data(&self, eventual: bool) -> io::Result<()>;

    /// Writes `data` starting at `offset`.
    fn write(&self, offset: u64, data: &[u8]) -> io::Result<()>;
}
55
56pub trait TableHandle: Sealed {
57 fn name(&self) -> &str;
59}
60
61#[derive(Clone)]
62pub struct UntypedTableHandle {
63 name: String,
64}
65
66impl UntypedTableHandle {
67 pub(crate) fn new(name: String) -> Self {
68 Self { name }
69 }
70}
71
72impl TableHandle for UntypedTableHandle {
73 fn name(&self) -> &str {
74 &self.name
75 }
76}
77
78impl Sealed for UntypedTableHandle {}
79
80pub trait MultimapTableHandle: Sealed {
81 fn name(&self) -> &str;
83}
84
85#[derive(Clone)]
86pub struct UntypedMultimapTableHandle {
87 name: String,
88}
89
90impl UntypedMultimapTableHandle {
91 pub(crate) fn new(name: String) -> Self {
92 Self { name }
93 }
94}
95
96impl MultimapTableHandle for UntypedMultimapTableHandle {
97 fn name(&self) -> &str {
98 &self.name
99 }
100}
101
102impl Sealed for UntypedMultimapTableHandle {}
103
104pub struct TableDefinition<'a, K: Key + 'static, V: Value + 'static> {
111 name: &'a str,
112 _key_type: PhantomData<K>,
113 _value_type: PhantomData<V>,
114}
115
116impl<'a, K: Key + 'static, V: Value + 'static> TableDefinition<'a, K, V> {
117 pub const fn new(name: &'a str) -> Self {
123 assert!(!name.is_empty());
124 Self {
125 name,
126 _key_type: PhantomData,
127 _value_type: PhantomData,
128 }
129 }
130}
131
132impl<'a, K: Key + 'static, V: Value + 'static> TableHandle for TableDefinition<'a, K, V> {
133 fn name(&self) -> &str {
134 self.name
135 }
136}
137
138impl<K: Key, V: Value> Sealed for TableDefinition<'_, K, V> {}
139
140impl<'a, K: Key + 'static, V: Value + 'static> Clone for TableDefinition<'a, K, V> {
141 fn clone(&self) -> Self {
142 *self
143 }
144}
145
146impl<'a, K: Key + 'static, V: Value + 'static> Copy for TableDefinition<'a, K, V> {}
147
148impl<'a, K: Key + 'static, V: Value + 'static> Display for TableDefinition<'a, K, V> {
149 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
150 write!(
151 f,
152 "{}<{}, {}>",
153 self.name,
154 K::type_name().name(),
155 V::type_name().name()
156 )
157 }
158}
159
160pub struct MultimapTableDefinition<'a, K: Key + 'static, V: Key + 'static> {
169 name: &'a str,
170 _key_type: PhantomData<K>,
171 _value_type: PhantomData<V>,
172}
173
174impl<'a, K: Key + 'static, V: Key + 'static> MultimapTableDefinition<'a, K, V> {
175 pub const fn new(name: &'a str) -> Self {
176 assert!(!name.is_empty());
177 Self {
178 name,
179 _key_type: PhantomData,
180 _value_type: PhantomData,
181 }
182 }
183}
184
185impl<'a, K: Key + 'static, V: Key + 'static> MultimapTableHandle
186 for MultimapTableDefinition<'a, K, V>
187{
188 fn name(&self) -> &str {
189 self.name
190 }
191}
192
193impl<K: Key, V: Key> Sealed for MultimapTableDefinition<'_, K, V> {}
194
195impl<'a, K: Key + 'static, V: Key + 'static> Clone for MultimapTableDefinition<'a, K, V> {
196 fn clone(&self) -> Self {
197 *self
198 }
199}
200
201impl<'a, K: Key + 'static, V: Key + 'static> Copy for MultimapTableDefinition<'a, K, V> {}
202
203impl<'a, K: Key + 'static, V: Key + 'static> Display for MultimapTableDefinition<'a, K, V> {
204 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
205 write!(
206 f,
207 "{}<{}, {}>",
208 self.name,
209 K::type_name().name(),
210 V::type_name().name()
211 )
212 }
213}
214
215pub(crate) struct TransactionGuard {
216 transaction_tracker: Option<Arc<TransactionTracker>>,
217 transaction_id: Option<TransactionId>,
218 write_transaction: bool,
219}
220
221impl TransactionGuard {
222 pub(crate) fn new_read(
223 transaction_id: TransactionId,
224 tracker: Arc<TransactionTracker>,
225 ) -> Self {
226 Self {
227 transaction_tracker: Some(tracker),
228 transaction_id: Some(transaction_id),
229 write_transaction: false,
230 }
231 }
232
233 pub(crate) fn new_write(
234 transaction_id: TransactionId,
235 tracker: Arc<TransactionTracker>,
236 ) -> Self {
237 Self {
238 transaction_tracker: Some(tracker),
239 transaction_id: Some(transaction_id),
240 write_transaction: true,
241 }
242 }
243
244 pub(crate) fn fake() -> Self {
246 Self {
247 transaction_tracker: None,
248 transaction_id: None,
249 write_transaction: false,
250 }
251 }
252
253 pub(crate) fn id(&self) -> TransactionId {
254 self.transaction_id.unwrap()
255 }
256
257 pub(crate) fn leak(mut self) -> TransactionId {
258 self.transaction_id.take().unwrap()
259 }
260}
261
262impl Drop for TransactionGuard {
263 fn drop(&mut self) {
264 if self.transaction_tracker.is_none() {
265 return;
266 }
267 if let Some(transaction_id) = self.transaction_id {
268 if self.write_transaction {
269 self.transaction_tracker
270 .as_ref()
271 .unwrap()
272 .end_write_transaction(transaction_id);
273 } else {
274 self.transaction_tracker
275 .as_ref()
276 .unwrap()
277 .deallocate_read_transaction(transaction_id);
278 }
279 }
280 }
281}
282
/// An open database.
///
/// Created through [`Database::create`], [`Database::open`] or a [`Builder`]. Transactions
/// are started with [`Database::begin_read`] and [`Database::begin_write`].
pub struct Database {
    // Transactional page storage; every transaction receives its own `Arc` clone of it.
    mem: Arc<TransactionalMemory>,
    // Tracks live read/write transactions and savepoints for this database.
    transaction_tracker: Arc<TransactionTracker>,
}
317
318impl Database {
319 pub fn create(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
324 Self::builder().create(path)
325 }
326
327 pub fn open(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
329 Self::builder().open(path)
330 }
331
332 pub(crate) fn get_memory(&self) -> Arc<TransactionalMemory> {
333 self.mem.clone()
334 }
335
336 pub(crate) fn verify_primary_checksums(mem: Arc<TransactionalMemory>) -> Result<bool> {
337 let fake_freed_pages = Arc::new(Mutex::new(vec![]));
338 let table_tree = TableTreeMut::new(
339 mem.get_data_root(),
340 Arc::new(TransactionGuard::fake()),
341 mem.clone(),
342 fake_freed_pages.clone(),
343 );
344 if !table_tree.verify_checksums()? {
345 return Ok(false);
346 }
347 let system_table_tree = TableTreeMut::new(
348 mem.get_system_root(),
349 Arc::new(TransactionGuard::fake()),
350 mem.clone(),
351 fake_freed_pages.clone(),
352 );
353 if !system_table_tree.verify_checksums()? {
354 return Ok(false);
355 }
356 assert!(fake_freed_pages.lock().unwrap().is_empty());
357
358 if let Some(header) = mem.get_freed_root() {
359 if !RawBtree::new(
360 Some(header),
361 FreedTableKey::fixed_width(),
362 FreedPageList::fixed_width(),
363 mem.clone(),
364 )
365 .verify_checksum()?
366 {
367 return Ok(false);
368 }
369 }
370
371 Ok(true)
372 }
373
374 pub fn check_integrity(&mut self) -> Result<bool, DatabaseError> {
384 let allocator_hash = self.mem.allocator_hash();
385 let mut was_clean = Arc::get_mut(&mut self.mem)
386 .unwrap()
387 .clear_cache_and_reload()?;
388
389 let old_roots = [
390 self.mem.get_data_root(),
391 self.mem.get_system_root(),
392 self.mem.get_freed_root(),
393 ];
394
395 let new_roots = Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
396 DatabaseError::Storage(storage_err) => storage_err,
397 _ => unreachable!(),
398 })?;
399
400 if old_roots != new_roots || allocator_hash != self.mem.allocator_hash() {
401 was_clean = false;
402 }
403
404 if !was_clean {
405 let next_transaction_id = self.mem.get_last_committed_transaction_id()?.next();
406 let [data_root, system_root, freed_root] = new_roots;
407 self.mem.commit(
408 data_root,
409 system_root,
410 freed_root,
411 next_transaction_id,
412 false,
413 true,
414 )?;
415 }
416
417 self.mem.begin_writable()?;
418
419 Ok(was_clean)
420 }
421
422 pub fn compact(&mut self) -> Result<bool, CompactionError> {
426 if self
427 .transaction_tracker
428 .oldest_live_read_transaction()
429 .is_some()
430 {
431 return Err(CompactionError::TransactionInProgress);
432 }
433 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
438 if txn.list_persistent_savepoints()?.next().is_some() {
439 return Err(CompactionError::PersistentSavepointExists);
440 }
441 if self.transaction_tracker.any_savepoint_exists() {
442 return Err(CompactionError::EphemeralSavepointExists);
443 }
444 txn.set_two_phase_commit(true);
445 txn.commit().map_err(|e| e.into_storage_error())?;
446 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
448 txn.set_two_phase_commit(true);
449 txn.commit().map_err(|e| e.into_storage_error())?;
450 assert!(self.mem.get_freed_root().is_none());
453
454 let mut compacted = false;
455 loop {
457 let mut progress = false;
458
459 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
460 if txn.compact_pages()? {
461 progress = true;
462 txn.commit().map_err(|e| e.into_storage_error())?;
463 } else {
464 txn.abort()?;
465 }
466
467 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
469 txn.set_two_phase_commit(true);
470 txn.commit().map_err(|e| e.into_storage_error())?;
471 assert!(self.mem.get_freed_root().is_none());
472
473 if !progress {
474 break;
475 }
476
477 compacted = true;
478 }
479
480 Ok(compacted)
481 }
482
483 fn check_repaired_persistent_savepoints(
484 system_root: Option<BtreeHeader>,
485 mem: Arc<TransactionalMemory>,
486 ) -> Result {
487 let freed_list = Arc::new(Mutex::new(vec![]));
488 let table_tree = TableTreeMut::new(
489 system_root,
490 Arc::new(TransactionGuard::fake()),
491 mem.clone(),
492 freed_list,
493 );
494 let fake_transaction_tracker = Arc::new(TransactionTracker::new(TransactionId::new(0)));
495 if let Some(savepoint_table_def) = table_tree
496 .get_table::<SavepointId, SerializedSavepoint>(
497 SAVEPOINT_TABLE.name(),
498 TableType::Normal,
499 )
500 .map_err(|e| {
501 e.into_storage_error_or_corrupted("Persistent savepoint table corrupted")
502 })?
503 {
504 let savepoint_table_root =
505 if let InternalTableDefinition::Normal { table_root, .. } = savepoint_table_def {
506 table_root
507 } else {
508 unreachable!()
509 };
510 let savepoint_table: ReadOnlyTable<SavepointId, SerializedSavepoint> =
511 ReadOnlyTable::new(
512 "internal savepoint table".to_string(),
513 savepoint_table_root,
514 PageHint::None,
515 Arc::new(TransactionGuard::fake()),
516 mem.clone(),
517 )?;
518 for result in savepoint_table.range::<SavepointId>(..)? {
519 let (_, savepoint_data) = result?;
520 let savepoint = savepoint_data
521 .value()
522 .to_savepoint(fake_transaction_tracker.clone());
523 if let Some(header) = savepoint.get_user_root() {
524 Self::check_pages_allocated_recursive(header.root, mem.clone())?;
525 }
526 }
527 }
528
529 Ok(())
530 }
531
532 fn mark_freed_tree(freed_root: Option<BtreeHeader>, mem: Arc<TransactionalMemory>) -> Result {
533 if let Some(header) = freed_root {
534 let freed_pages_iter = AllPageNumbersBtreeIter::new(
535 header.root,
536 FreedTableKey::fixed_width(),
537 FreedPageList::fixed_width(),
538 mem.clone(),
539 )?;
540 for page in freed_pages_iter {
541 mem.mark_page_allocated(page?);
542 }
543 }
544
545 let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
546 "internal freed table".to_string(),
547 freed_root,
548 PageHint::None,
549 Arc::new(TransactionGuard::fake()),
550 mem.clone(),
551 )?;
552 for result in freed_table.range::<FreedTableKey>(..)? {
553 let (_, freed_page_list) = result?;
554 for i in 0..freed_page_list.value().len() {
555 mem.mark_page_allocated(freed_page_list.value().get(i));
556 }
557 }
558
559 Ok(())
560 }
561
562 fn check_pages_allocated_recursive(root: PageNumber, mem: Arc<TransactionalMemory>) -> Result {
563 let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, mem.clone())?;
566 for result in master_pages_iter {
567 let page = result?;
568 assert!(mem.is_allocated(page));
569 }
570
571 let iter: BtreeRangeIter<&str, InternalTableDefinition> =
573 BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem.clone())?;
574
575 for entry in iter {
577 let definition = entry?.value();
578 definition.visit_all_pages(mem.clone(), |path| {
579 assert!(mem.is_allocated(path.page_number()));
580 Ok(())
581 })?;
582 }
583
584 Ok(())
585 }
586
587 fn mark_tables_recursive(root: PageNumber, mem: Arc<TransactionalMemory>) -> Result {
588 let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, mem.clone())?;
591 for page in master_pages_iter {
592 mem.mark_page_allocated(page?);
593 }
594
595 let iter: BtreeRangeIter<&str, InternalTableDefinition> =
597 BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem.clone())?;
598
599 for entry in iter {
601 let definition = entry?.value();
602 definition.visit_all_pages(mem.clone(), |path| {
603 mem.mark_page_allocated(path.page_number());
604 Ok(())
605 })?;
606 }
607
608 Ok(())
609 }
610
611 fn do_repair(
612 mem: &mut Arc<TransactionalMemory>, repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
614 ) -> Result<[Option<BtreeHeader>; 3], DatabaseError> {
615 if !Self::verify_primary_checksums(mem.clone())? {
616 if mem.used_two_phase_commit() {
617 return Err(DatabaseError::Storage(StorageError::Corrupted(
618 "Primary is corrupted despite 2-phase commit".to_string(),
619 )));
620 }
621
622 let mut handle = RepairSession::new(0.3);
624 repair_callback(&mut handle);
625 if handle.aborted() {
626 return Err(DatabaseError::RepairAborted);
627 }
628
629 mem.repair_primary_corrupted();
630 mem.clear_read_cache();
634 if !Self::verify_primary_checksums(mem.clone())? {
635 return Err(DatabaseError::Storage(StorageError::Corrupted(
636 "Failed to repair database. All roots are corrupted".to_string(),
637 )));
638 }
639 }
640 let mut handle = RepairSession::new(0.6);
642 repair_callback(&mut handle);
643 if handle.aborted() {
644 return Err(DatabaseError::RepairAborted);
645 }
646
647 mem.begin_repair()?;
648
649 let data_root = mem.get_data_root();
650 if let Some(header) = data_root {
651 Self::mark_tables_recursive(header.root, mem.clone())?;
652 }
653
654 let freed_root = mem.get_freed_root();
655 Self::mark_freed_tree(freed_root, mem.clone())?;
657 let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
658 "internal freed table".to_string(),
659 freed_root,
660 PageHint::None,
661 Arc::new(TransactionGuard::fake()),
662 mem.clone(),
663 )?;
664 drop(freed_table);
665
666 let mut handle = RepairSession::new(0.9);
668 repair_callback(&mut handle);
669 if handle.aborted() {
670 return Err(DatabaseError::RepairAborted);
671 }
672
673 let system_root = mem.get_system_root();
674 if let Some(header) = system_root {
675 Self::mark_tables_recursive(header.root, mem.clone())?;
676 }
677 #[cfg(debug_assertions)]
678 {
679 Self::check_repaired_persistent_savepoints(system_root, mem.clone())?;
682 }
683
684 mem.end_repair()?;
685
686 mem.clear_read_cache();
689
690 Ok([data_root, system_root, freed_root])
691 }
692
693 fn new(
694 file: Box<dyn StorageBackend>,
695 page_size: usize,
696 region_size: Option<u64>,
697 read_cache_size_bytes: usize,
698 write_cache_size_bytes: usize,
699 repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
700 ) -> Result<Self, DatabaseError> {
701 #[cfg(feature = "logging")]
702 let file_path = format!("{:?}", &file);
703 #[cfg(feature = "logging")]
704 info!("Opening database {:?}", &file_path);
705 let mem = TransactionalMemory::new(
706 file,
707 page_size,
708 region_size,
709 read_cache_size_bytes,
710 write_cache_size_bytes,
711 )?;
712 let mut mem = Arc::new(mem);
713 if mem.needs_repair()? {
714 if let Some(tree) = Self::get_allocator_state_table(&mem)? {
717 #[cfg(feature = "logging")]
718 info!("Found valid allocator state, full repair not needed");
719 mem.load_allocator_state(&tree)?;
720 } else {
721 #[cfg(feature = "logging")]
722 warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
723 let mut handle = RepairSession::new(0.0);
724 repair_callback(&mut handle);
725 if handle.aborted() {
726 return Err(DatabaseError::RepairAborted);
727 }
728 let [data_root, system_root, freed_root] =
729 Self::do_repair(&mut mem, repair_callback)?;
730 let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
731 mem.commit(
732 data_root,
733 system_root,
734 freed_root,
735 next_transaction_id,
736 false,
737 true,
738 )?;
739 }
740 }
741
742 mem.begin_writable()?;
743 let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
744
745 let db = Database {
746 mem,
747 transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
748 };
749
750 let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
752 if let Some(next_id) = txn.next_persistent_savepoint_id()? {
753 db.transaction_tracker
754 .restore_savepoint_counter_state(next_id);
755 }
756 for id in txn.list_persistent_savepoints()? {
757 let savepoint = match txn.get_persistent_savepoint(id) {
758 Ok(savepoint) => savepoint,
759 Err(err) => match err {
760 SavepointError::InvalidSavepoint => unreachable!(),
761 SavepointError::Storage(storage) => {
762 return Err(storage.into());
763 }
764 },
765 };
766 db.transaction_tracker
767 .register_persistent_savepoint(&savepoint);
768 }
769 txn.abort()?;
770
771 Ok(db)
772 }
773
774 fn get_allocator_state_table(
775 mem: &Arc<TransactionalMemory>,
776 ) -> Result<Option<AllocatorStateTree>> {
777 if !mem.used_two_phase_commit() {
779 return Ok(None);
780 }
781
782 let fake_freed_pages = Arc::new(Mutex::new(vec![]));
784 let system_table_tree = TableTreeMut::new(
785 mem.get_system_root(),
786 Arc::new(TransactionGuard::fake()),
787 mem.clone(),
788 fake_freed_pages.clone(),
789 );
790 let Some(allocator_state_table) = system_table_tree
791 .get_table::<AllocatorStateKey, &[u8]>(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
792 .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?
793 else {
794 return Ok(None);
795 };
796
797 let InternalTableDefinition::Normal { table_root, .. } = allocator_state_table else {
799 unreachable!();
800 };
801 let tree = AllocatorStateTree::new(
802 table_root,
803 Arc::new(TransactionGuard::fake()),
804 mem.clone(),
805 fake_freed_pages,
806 );
807
808 if !mem.is_valid_allocator_state(&tree)? {
810 return Ok(None);
811 }
812
813 Ok(Some(tree))
814 }
815
816 fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
817 let id = self
818 .transaction_tracker
819 .register_read_transaction(&self.mem)?;
820
821 Ok(TransactionGuard::new_read(
822 id,
823 self.transaction_tracker.clone(),
824 ))
825 }
826
827 pub fn builder() -> Builder {
829 Builder::new()
830 }
831
832 pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
838 self.mem.check_io_errors()?;
840 let guard = TransactionGuard::new_write(
841 self.transaction_tracker.start_write_transaction(),
842 self.transaction_tracker.clone(),
843 );
844 WriteTransaction::new(guard, self.transaction_tracker.clone(), self.mem.clone())
845 .map_err(|e| e.into())
846 }
847
848 pub fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
856 let guard = self.allocate_read_transaction()?;
857 #[cfg(feature = "logging")]
858 debug!("Beginning read transaction id={:?}", guard.id());
859 ReadTransaction::new(self.get_memory(), guard)
860 }
861
862 fn ensure_allocator_state_table(&self) -> Result<(), Error> {
863 if Self::get_allocator_state_table(&self.mem)?.is_some() {
865 return Ok(());
866 }
867
868 #[cfg(feature = "logging")]
870 debug!("Writing allocator state table");
871 let mut tx = self.begin_write()?;
872 tx.set_quick_repair(true);
873 tx.commit()?;
874
875 Ok(())
876 }
877}
878
879impl Drop for Database {
880 fn drop(&mut self) {
881 if thread::panicking() {
882 return;
883 }
884
885 if self.ensure_allocator_state_table().is_err() {
886 #[cfg(feature = "logging")]
887 warn!("Failed to write allocator state table. Repair may be required at restart.")
888 }
889 }
890}
891
/// Progress report handed to the repair callback while a database is being repaired.
///
/// The callback may call [`RepairSession::abort`] to stop the repair. The open then fails with
/// `DatabaseError::RepairAborted`.
pub struct RepairSession {
    progress: f64,
    aborted: bool,
}

impl RepairSession {
    /// Creates a session reporting `progress` that has not been aborted.
    pub(crate) fn new(progress: f64) -> Self {
        RepairSession {
            aborted: false,
            progress,
        }
    }

    /// Whether [`RepairSession::abort`] was called on this session.
    pub(crate) fn aborted(&self) -> bool {
        self.aborted
    }

    /// Requests that the repair be aborted.
    pub fn abort(&mut self) {
        self.aborted = true;
    }

    /// Returns the repair progress reported with this session.
    ///
    /// The database reports 0.0 before repairing. It reports 0.3 (only when the primary commit
    /// failed checksum verification), then 0.6 and 0.9 as the repair advances.
    pub fn progress(&self) -> f64 {
        self.progress
    }
}
919
920pub struct Builder {
922 page_size: usize,
923 region_size: Option<u64>,
924 read_cache_size_bytes: usize,
925 write_cache_size_bytes: usize,
926 repair_callback: Box<dyn Fn(&mut RepairSession)>,
927}
928
929impl Builder {
930 #[allow(clippy::new_without_default)]
936 pub fn new() -> Self {
937 let mut result = Self {
938 page_size: PAGE_SIZE,
942 region_size: None,
943 read_cache_size_bytes: 0,
945 write_cache_size_bytes: 0,
947 repair_callback: Box::new(|_| {}),
948 };
949
950 result.set_cache_size(1024 * 1024 * 1024);
951 result
952 }
953
954 pub fn set_repair_callback(
962 &mut self,
963 callback: impl Fn(&mut RepairSession) + 'static,
964 ) -> &mut Self {
965 self.repair_callback = Box::new(callback);
966 self
967 }
968
969 #[cfg(any(fuzzing, test))]
977 pub fn set_page_size(&mut self, size: usize) -> &mut Self {
978 assert!(size.is_power_of_two());
979 self.page_size = std::cmp::max(size, 512);
980 self
981 }
982
983 pub fn set_cache_size(&mut self, bytes: usize) -> &mut Self {
985 self.read_cache_size_bytes = bytes / 10 * 9;
987 self.write_cache_size_bytes = bytes / 10;
988 self
989 }
990
991 #[cfg(any(test, fuzzing))]
992 pub fn set_region_size(&mut self, size: u64) -> &mut Self {
993 assert!(size.is_power_of_two());
994 self.region_size = Some(size);
995 self
996 }
997
998 pub fn create(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
1003 let file = OpenOptions::new()
1004 .read(true)
1005 .write(true)
1006 .create(true)
1007 .truncate(false)
1008 .open(path)?;
1009
1010 Database::new(
1011 Box::new(FileBackend::new(file)?),
1012 self.page_size,
1013 self.region_size,
1014 self.read_cache_size_bytes,
1015 self.write_cache_size_bytes,
1016 &self.repair_callback,
1017 )
1018 }
1019
1020 pub fn open(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
1022 let file = OpenOptions::new().read(true).write(true).open(path)?;
1023
1024 if file.metadata()?.len() == 0 {
1025 return Err(StorageError::Io(ErrorKind::InvalidData.into()).into());
1026 }
1027
1028 Database::new(
1029 Box::new(FileBackend::new(file)?),
1030 self.page_size,
1031 None,
1032 self.read_cache_size_bytes,
1033 self.write_cache_size_bytes,
1034 &self.repair_callback,
1035 )
1036 }
1037
1038 pub fn create_file(&self, file: File) -> Result<Database, DatabaseError> {
1042 Database::new(
1043 Box::new(FileBackend::new(file)?),
1044 self.page_size,
1045 self.region_size,
1046 self.read_cache_size_bytes,
1047 self.write_cache_size_bytes,
1048 &self.repair_callback,
1049 )
1050 }
1051
1052 pub fn create_with_backend(
1054 &self,
1055 backend: impl StorageBackend,
1056 ) -> Result<Database, DatabaseError> {
1057 Database::new(
1058 Box::new(backend),
1059 self.page_size,
1060 self.region_size,
1061 self.read_cache_size_bytes,
1062 self.write_cache_size_bytes,
1063 &self.repair_callback,
1064 )
1065 }
1066}
1067
1068impl std::fmt::Debug for Database {
1069 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1070 f.debug_struct("Database").finish()
1071 }
1072}
1073
1074#[cfg(test)]
1075mod test {
1076 use crate::backends::FileBackend;
1077 use crate::{
1078 CommitError, Database, DatabaseError, Durability, ReadableTable, StorageBackend,
1079 StorageError, TableDefinition, TransactionError,
1080 };
1081 use std::fs::File;
1082 use std::io::{ErrorKind, Read, Seek, SeekFrom};
1083 use std::sync::atomic::{AtomicU64, Ordering};
1084 use std::sync::Arc;
1085
1086 #[derive(Debug)]
1087 struct FailingBackend {
1088 inner: FileBackend,
1089 countdown: Arc<AtomicU64>,
1090 }
1091
1092 impl FailingBackend {
1093 fn new(backend: FileBackend, countdown: u64) -> Self {
1094 Self {
1095 inner: backend,
1096 countdown: Arc::new(AtomicU64::new(countdown)),
1097 }
1098 }
1099
1100 fn check_countdown(&self) -> Result<(), std::io::Error> {
1101 if self.countdown.load(Ordering::SeqCst) == 0 {
1102 return Err(std::io::Error::from(ErrorKind::Other));
1103 }
1104
1105 Ok(())
1106 }
1107
1108 fn decrement_countdown(&self) -> Result<(), std::io::Error> {
1109 if self
1110 .countdown
1111 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
1112 if x > 0 {
1113 Some(x - 1)
1114 } else {
1115 None
1116 }
1117 })
1118 .is_err()
1119 {
1120 return Err(std::io::Error::from(ErrorKind::Other));
1121 }
1122
1123 Ok(())
1124 }
1125 }
1126
1127 impl StorageBackend for FailingBackend {
1128 fn len(&self) -> Result<u64, std::io::Error> {
1129 self.inner.len()
1130 }
1131
1132 fn read(&self, offset: u64, len: usize) -> Result<Vec<u8>, std::io::Error> {
1133 self.check_countdown()?;
1134 self.inner.read(offset, len)
1135 }
1136
1137 fn set_len(&self, len: u64) -> Result<(), std::io::Error> {
1138 self.inner.set_len(len)
1139 }
1140
1141 fn sync_data(&self, eventual: bool) -> Result<(), std::io::Error> {
1142 self.check_countdown()?;
1143 self.inner.sync_data(eventual)
1144 }
1145
1146 fn write(&self, offset: u64, data: &[u8]) -> Result<(), std::io::Error> {
1147 self.decrement_countdown()?;
1148 self.inner.write(offset, data)
1149 }
1150 }
1151
    /// Regression test: a commit that runs into injected write failures must still leave a
    /// file that can be reopened afterwards.
    ///
    /// NOTE(review): the cache/page/region sizes and the write budget look minimized from a
    /// failing case; keep them unchanged unless the regression is re-confirmed.
    #[test]
    fn crash_regression4() {
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();

        // The backend accepts 23 writes; after that, writes, reads and syncs all fail.
        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), 23);
        let db = Database::builder()
            .set_cache_size(12686)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        // First commit: creates one ephemeral and one persistent savepoint.
        let tx = db.begin_write().unwrap();
        let _savepoint = tx.ephemeral_savepoint().unwrap();
        let _persistent_savepoint = tx.persistent_savepoint().unwrap();
        tx.commit().unwrap();
        // Second commit: expected to exhaust the write budget and fail.
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            let _ = table.insert_reserve(118821, 360).unwrap();
        }
        let result = tx.commit();
        assert!(result.is_err());

        drop(db);
        // Reopening the same file through the regular file backend must succeed.
        Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create(&path)
            .unwrap();
    }
1187
    /// A failed commit must poison the database for later write transactions, and the header
    /// left on disk must have bit `2` of the byte at offset 9 set.
    ///
    /// NOTE(review): that bit is presumably the header's "repair needed" flag — confirm
    /// against the file-header layout.
    #[test]
    fn transient_io_error() {
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();

        // Effectively unlimited write budget; the test zeroes it later through `countdown`.
        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), u64::MAX);
        let countdown = backend.countdown.clone();
        let db = Database::builder()
            .set_cache_size(0)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, u64> = TableDefinition::new("x");

        // Two successful commits.
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            table.insert(0, 0).unwrap();
        }
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            table.insert(0, 1).unwrap();
        }
        tx.commit().unwrap();

        // Make every I/O call fail, so the next commit hits an I/O error...
        let tx = db.begin_write().unwrap();
        countdown.store(0, Ordering::SeqCst);
        let result = tx.commit().err().unwrap();
        assert!(matches!(result, CommitError::Storage(StorageError::Io(_))));
        // ...after which new write transactions are refused with `PreviousIo`.
        let result = db.begin_write().err().unwrap();
        assert!(matches!(
            result,
            TransactionError::Storage(StorageError::PreviousIo)
        ));
        // Re-enable I/O before dropping, so only the recorded error affects the drop.
        countdown.store(u64::MAX, Ordering::SeqCst);
        drop(db);

        // Inspect the on-disk header directly.
        let mut file = File::open(&path).unwrap();
        file.seek(SeekFrom::Start(9)).unwrap();
        let mut god_byte = vec![0u8];
        assert_eq!(file.read(&mut god_byte).unwrap(), 1);
        assert_ne!(god_byte[0] & 2, 0);
    }
1237
1238 #[test]
1239 fn small_pages() {
1240 let tmpfile = crate::create_tempfile();
1241
1242 let db = Database::builder()
1243 .set_page_size(512)
1244 .create(tmpfile.path())
1245 .unwrap();
1246
1247 let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1248 let txn = db.begin_write().unwrap();
1249 {
1250 txn.open_table(table_definition).unwrap();
1251 }
1252 txn.commit().unwrap();
1253 }
1254
    /// Interleaves ephemeral savepoints, savepoint restores, savepoint drops and
    /// `Durability::None` commits on a database with 512-byte pages, using two-phase commit
    /// throughout.
    ///
    /// NOTE(review): the key/value constants and the exact order of operations look
    /// minimized from a failing case; keep them unchanged unless re-confirmed.
    #[test]
    fn small_pages2() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(512)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint0 = tx.ephemeral_savepoint().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint0).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&660503, 489).unwrap().as_mut().fill(0xFF);
            assert!(t.remove(&291295).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint0).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint2 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint0);
        tx.restore_savepoint(&savepoint2).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&2059).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint3 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint1);
        tx.restore_savepoint(&savepoint3).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // savepoint4 is taken in a transaction that is then aborted.
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint4 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint2);
        tx.restore_savepoint(&savepoint3).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.remove(&207936).unwrap().is_none());
        }
        tx.abort().unwrap();

        // Restoring savepoint4 must fail, presumably because its transaction was aborted.
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint5 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint3);
        assert!(tx.restore_savepoint(&savepoint4).is_err());
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint5).unwrap();
        tx.set_durability(Durability::None);
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();
    }
1348
1349 #[test]
1350 fn small_pages3() {
1351 let tmpfile = crate::create_tempfile();
1352
1353 let db = Database::builder()
1354 .set_page_size(1024)
1355 .create(tmpfile.path())
1356 .unwrap();
1357
1358 let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1359
1360 let mut tx = db.begin_write().unwrap();
1361 let _savepoint0 = tx.ephemeral_savepoint().unwrap();
1362 tx.set_durability(Durability::None);
1363 {
1364 let mut t = tx.open_table(table_def).unwrap();
1365 let value = vec![0; 306];
1366 t.insert(&539717, value.as_slice()).unwrap();
1367 }
1368 tx.abort().unwrap();
1369
1370 let mut tx = db.begin_write().unwrap();
1371 let savepoint1 = tx.ephemeral_savepoint().unwrap();
1372 tx.restore_savepoint(&savepoint1).unwrap();
1373 tx.set_durability(Durability::None);
1374 {
1375 let mut t = tx.open_table(table_def).unwrap();
1376 let value = vec![0; 2008];
1377 t.insert(&784384, value.as_slice()).unwrap();
1378 }
1379 tx.abort().unwrap();
1380 }
1381
1382 #[test]
1383 fn small_pages4() {
1384 let tmpfile = crate::create_tempfile();
1385
1386 let db = Database::builder()
1387 .set_cache_size(1024 * 1024)
1388 .set_page_size(1024)
1389 .create(tmpfile.path())
1390 .unwrap();
1391
1392 let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1393
1394 let tx = db.begin_write().unwrap();
1395 {
1396 tx.open_table(table_def).unwrap();
1397 }
1398 tx.commit().unwrap();
1399
1400 let tx = db.begin_write().unwrap();
1401 {
1402 let mut t = tx.open_table(table_def).unwrap();
1403 assert!(t.get(&131072).unwrap().is_none());
1404 let value = vec![0xFF; 1130];
1405 t.insert(&42394, value.as_slice()).unwrap();
1406 t.insert_reserve(&744037, 3645).unwrap().as_mut().fill(0xFF);
1407 assert!(t.get(&0).unwrap().is_none());
1408 }
1409 tx.abort().unwrap();
1410
1411 let tx = db.begin_write().unwrap();
1412 {
1413 let mut t = tx.open_table(table_def).unwrap();
1414 t.insert_reserve(&118749, 734).unwrap().as_mut().fill(0xFF);
1415 }
1416 tx.abort().unwrap();
1417 }
1418
1419 #[test]
1420 fn dynamic_shrink() {
1421 let tmpfile = crate::create_tempfile();
1422 let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1423 let big_value = vec![0u8; 1024];
1424
1425 let db = Database::builder()
1426 .set_region_size(1024 * 1024)
1427 .create(tmpfile.path())
1428 .unwrap();
1429
1430 let txn = db.begin_write().unwrap();
1431 {
1432 let mut table = txn.open_table(table_definition).unwrap();
1433 for i in 0..2048 {
1434 table.insert(&i, big_value.as_slice()).unwrap();
1435 }
1436 }
1437 txn.commit().unwrap();
1438
1439 let file_size = tmpfile.as_file().metadata().unwrap().len();
1440
1441 let txn = db.begin_write().unwrap();
1442 {
1443 let mut table = txn.open_table(table_definition).unwrap();
1444 for i in 0..2048 {
1445 table.remove(&i).unwrap();
1446 }
1447 }
1448 txn.commit().unwrap();
1449
1450 let txn = db.begin_write().unwrap();
1452 {
1453 let mut table = txn.open_table(table_definition).unwrap();
1454 table.insert(0, [].as_slice()).unwrap();
1455 }
1456 txn.commit().unwrap();
1457 let txn = db.begin_write().unwrap();
1458 {
1459 let mut table = txn.open_table(table_definition).unwrap();
1460 table.remove(0).unwrap();
1461 }
1462 txn.commit().unwrap();
1463 let txn = db.begin_write().unwrap();
1464 txn.commit().unwrap();
1465
1466 let final_file_size = tmpfile.as_file().metadata().unwrap().len();
1467 assert!(final_file_size < file_size);
1468 }
1469
1470 #[test]
1471 fn create_new_db_in_empty_file() {
1472 let tmpfile = crate::create_tempfile();
1473
1474 let _db = Database::builder()
1475 .create_file(tmpfile.into_file())
1476 .unwrap();
1477 }
1478
1479 #[test]
1480 fn open_missing_file() {
1481 let tmpfile = crate::create_tempfile();
1482
1483 let err = Database::builder()
1484 .open(tmpfile.path().with_extension("missing"))
1485 .unwrap_err();
1486
1487 match err {
1488 DatabaseError::Storage(StorageError::Io(err)) if err.kind() == ErrorKind::NotFound => {}
1489 err => panic!("Unexpected error for empty file: {err}"),
1490 }
1491 }
1492
1493 #[test]
1494 fn open_empty_file() {
1495 let tmpfile = crate::create_tempfile();
1496
1497 let err = Database::builder().open(tmpfile.path()).unwrap_err();
1498
1499 match err {
1500 DatabaseError::Storage(StorageError::Io(err))
1501 if err.kind() == ErrorKind::InvalidData => {}
1502 err => panic!("Unexpected error for empty file: {err}"),
1503 }
1504 }
1505}