1use crate::db::TransactionGuard;
2use crate::error::CommitError;
3use crate::multimap_table::ReadOnlyUntypedMultimapTable;
4use crate::sealed::Sealed;
5use crate::table::ReadOnlyUntypedTable;
6use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
7use crate::tree_store::{
8 Btree, BtreeHeader, BtreeMut, BuddyAllocator, FreedPageList, FreedTableKey,
9 InternalTableDefinition, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, Page, PageHint, PageNumber,
10 SerializedSavepoint, TableTree, TableTreeMut, TableType, TransactionalMemory,
11};
12use crate::types::{Key, Value};
13use crate::{
14 AccessGuard, MultimapTable, MultimapTableDefinition, MultimapTableHandle, Range,
15 ReadOnlyMultimapTable, ReadOnlyTable, Result, Savepoint, SavepointError, StorageError, Table,
16 TableDefinition, TableError, TableHandle, TransactionError, TypeName,
17 UntypedMultimapTableHandle, UntypedTableHandle,
18};
19#[cfg(feature = "logging")]
20use log::{debug, warn};
21use std::borrow::Borrow;
22use std::cmp::min;
23use std::collections::{BTreeMap, HashMap, HashSet};
24use std::fmt::{Debug, Display, Formatter};
25use std::marker::PhantomData;
26use std::mem::size_of;
27use std::ops::RangeBounds;
28use std::ops::RangeFull;
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::{Arc, Mutex};
31use std::{panic, thread};
32
// NOTE(review): presumably an upper bound on the pages handled by one compaction pass; the
// code that uses this constant is not visible here -- confirm before relying on it.
const MAX_PAGES_PER_COMPACTION: usize = 1_000_000;
// System table with a unit key. `persistent_savepoint()` stores the id that follows the
// savepoint it just created, and `next_persistent_savepoint_id()` reads it back.
const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> =
    SystemTableDefinition::new("next_savepoint_id");
// System table mapping each persistent savepoint id to its serialized form.
pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition<SavepointId, SerializedSavepoint> =
    SystemTableDefinition::new("persistent_savepoints");
// Name of the system table that `durable_commit()` deletes and, when quick repair is enabled,
// re-creates with the allocator state.
pub(crate) const ALLOCATOR_STATE_TABLE_NAME: &str = "allocator_state";
// Mutable B-tree keyed by `AllocatorStateKey` whose values are raw byte slices.
pub(crate) type AllocatorStateTree<'a> = BtreeMut<'a, AllocatorStateKey, &'static [u8]>;
42
/// Key type of the allocator state tree (`AllocatorStateTree`).
///
/// The `Value` impl serializes a key as one tag byte followed by a little-endian `u32`; only
/// `Region` carries a meaningful payload. The derived `Ord` (variant order, then payload) is
/// what `Key::compare` uses to order keys.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
pub(crate) enum AllocatorStateKey {
    /// Entry identified by a `u32` region number (tag byte 0).
    Region(u32),
    /// Singleton entry with tag byte 1.
    RegionTracker,
    /// Singleton entry with tag byte 2.
    TransactionId,
}
49
50impl Value for AllocatorStateKey {
51 type SelfType<'a> = Self;
52 type AsBytes<'a> = [u8; 1 + size_of::<u32>()];
53
54 fn fixed_width() -> Option<usize> {
55 Some(1 + size_of::<u32>())
56 }
57
58 fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
59 where
60 Self: 'a,
61 {
62 match data[0] {
63 0 => Self::Region(u32::from_le_bytes(data[1..].try_into().unwrap())),
64 1 => Self::RegionTracker,
65 2 => Self::TransactionId,
66 _ => unreachable!(),
67 }
68 }
69
70 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
71 where
72 Self: 'a,
73 Self: 'b,
74 {
75 let mut result = Self::AsBytes::default();
76 match value {
77 Self::Region(region) => {
78 result[0] = 0;
79 result[1..].copy_from_slice(&u32::to_le_bytes(*region));
80 }
81 Self::RegionTracker => {
82 result[0] = 1;
83 }
84 Self::TransactionId => {
85 result[0] = 2;
86 }
87 }
88
89 result
90 }
91
92 fn type_name() -> TypeName {
93 TypeName::internal("redb::AllocatorStateKey")
94 }
95}
96
97impl Key for AllocatorStateKey {
98 fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
99 Self::from_bytes(data1).cmp(&Self::from_bytes(data2))
100 }
101}
102
/// Typed definition (name plus key/value types) of a table stored in the system namespace.
///
/// The key and value types exist only at the type level; the struct stores just the name.
pub struct SystemTableDefinition<'a, K: Key + 'static, V: Value + 'static> {
    // Table name; `new` asserts that it is non-empty.
    name: &'a str,
    // Zero-sized markers tying the definition to its key and value types.
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}
108
impl<'a, K: Key + 'static, V: Value + 'static> SystemTableDefinition<'a, K, V> {
    /// Creates a definition for the system table called `name`.
    ///
    /// # Panics
    ///
    /// Panics if `name` is empty. Because this is a `const fn`, using it to initialize a
    /// `const` with an empty name fails at compile time instead.
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
119
impl<K: Key + 'static, V: Value + 'static> TableHandle for SystemTableDefinition<'_, K, V> {
    /// Returns the table name this definition was created with.
    fn name(&self) -> &str {
        self.name
    }
}
125
// Marker impl of the crate's `Sealed` trait.
// NOTE(review): presumably required as a supertrait of `TableHandle` -- confirm in `crate::sealed`.
impl<K: Key, V: Value> Sealed for SystemTableDefinition<'_, K, V> {}
127
// Implemented by hand rather than derived; cloning is a plain bitwise copy because the type
// is `Copy` (see the `Copy` impl for this type).
impl<K: Key + 'static, V: Value + 'static> Clone for SystemTableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}
133
// The definition only holds a `&str` and zero-sized markers, so it is trivially copyable.
impl<K: Key + 'static, V: Value + 'static> Copy for SystemTableDefinition<'_, K, V> {}
135
136impl<K: Key + 'static, V: Value + 'static> Display for SystemTableDefinition<'_, K, V> {
137 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
138 write!(
139 f,
140 "{}<{}, {}>",
141 self.name,
142 K::type_name().name(),
143 V::type_name().name()
144 )
145 }
146}
147
/// Statistics about a database, read through the accessor methods on this type.
///
/// NOTE(review): how each field is computed is not visible in this part of the file; the
/// field comments below only restate the names -- confirm details against the producer.
#[derive(Debug)]
pub struct DatabaseStats {
    // Returned by `tree_height()`.
    pub(crate) tree_height: u32,
    // Returned by `allocated_pages()`.
    pub(crate) allocated_pages: u64,
    // Returned by `leaf_pages()`.
    pub(crate) leaf_pages: u64,
    // Returned by `branch_pages()`.
    pub(crate) branch_pages: u64,
    // Returned by `stored_bytes()` (note the accessor name differs from the field name).
    pub(crate) stored_leaf_bytes: u64,
    // Returned by `metadata_bytes()`.
    pub(crate) metadata_bytes: u64,
    // Returned by `fragmented_bytes()`.
    pub(crate) fragmented_bytes: u64,
    // Returned by `page_size()`.
    pub(crate) page_size: usize,
}
160
// Read-only accessors; each one returns the stored field unchanged.
impl DatabaseStats {
    /// Returns the recorded tree height.
    pub fn tree_height(&self) -> u32 {
        self.tree_height
    }

    /// Returns the recorded number of allocated pages.
    pub fn allocated_pages(&self) -> u64 {
        self.allocated_pages
    }

    /// Returns the recorded number of leaf pages.
    pub fn leaf_pages(&self) -> u64 {
        self.leaf_pages
    }

    /// Returns the recorded number of branch pages.
    pub fn branch_pages(&self) -> u64 {
        self.branch_pages
    }

    /// Returns the recorded number of bytes stored in leaf pages (`stored_leaf_bytes`).
    pub fn stored_bytes(&self) -> u64 {
        self.stored_leaf_bytes
    }

    /// Returns the recorded number of metadata bytes.
    pub fn metadata_bytes(&self) -> u64 {
        self.metadata_bytes
    }

    /// Returns the recorded number of fragmented bytes.
    pub fn fragmented_bytes(&self) -> u64 {
        self.fragmented_bytes
    }

    /// Returns the recorded page size.
    pub fn page_size(&self) -> usize {
        self.page_size
    }
}
203
/// Durability level a caller can request through `WriteTransaction::set_durability`.
///
/// `commit_inner` selects the commit path from the level: `None` goes through
/// `non_durable_commit()`, `Eventual` through `durable_commit(true)` and `Immediate` through
/// `durable_commit(false)`.
#[derive(Copy, Clone, Debug)]
#[non_exhaustive]
pub enum Durability {
    /// Commit without the durable commit path.
    None,
    /// Commit through the durable path with its `eventual` flag set.
    Eventual,
    /// Commit through the durable path with its `eventual` flag cleared. New write
    /// transactions start at this level.
    Immediate,
    /// Deprecated: `set_durability` treats this as `Immediate` and additionally calls
    /// `set_two_phase_commit(true)`.
    #[deprecated(since = "2.3.0", note = "use set_two_phase_commit(true) instead")]
    Paranoid,
}
224
// Internal mirror of `Durability` without the deprecated `Paranoid` variant;
// `WriteTransaction::set_durability` performs the mapping.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum InternalDurability {
    None,
    Eventual,
    Immediate,
}
233
/// Open handle to a table in the system namespace.
///
/// The handle mutably borrows its `SystemNamespace` for as long as it lives; when dropped it
/// reports the tree's root and length back to that namespace.
pub struct SystemTable<'db, 's, K: Key + 'static, V: Value + 'static> {
    // Table name, used when reporting the root back to the namespace on drop.
    name: String,
    // Namespace this table was opened from.
    namespace: &'s mut SystemNamespace<'db>,
    // Mutable B-tree holding the table contents.
    tree: BtreeMut<'s, K, V>,
    // Guard handed to every `Range` created by `range()`.
    transaction_guard: Arc<TransactionGuard>,
}
241
242impl<'db, 's, K: Key + 'static, V: Value + 'static> SystemTable<'db, 's, K, V> {
243 fn new(
244 name: &str,
245 table_root: Option<BtreeHeader>,
246 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
247 guard: Arc<TransactionGuard>,
248 mem: Arc<TransactionalMemory>,
249 namespace: &'s mut SystemNamespace<'db>,
250 ) -> SystemTable<'db, 's, K, V> {
251 SystemTable {
252 name: name.to_string(),
253 namespace,
254 tree: BtreeMut::new(table_root, guard.clone(), mem, freed_pages),
255 transaction_guard: guard,
256 }
257 }
258
259 fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<V>>>
260 where
261 K: 'a,
262 {
263 self.tree.get(key.borrow())
264 }
265
266 fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<K, V>>
267 where
268 K: 'a,
269 KR: Borrow<K::SelfType<'a>> + 'a,
270 {
271 self.tree
272 .range(&range)
273 .map(|x| Range::new(x, self.transaction_guard.clone()))
274 }
275
276 pub fn insert<'k, 'v>(
277 &mut self,
278 key: impl Borrow<K::SelfType<'k>>,
279 value: impl Borrow<V::SelfType<'v>>,
280 ) -> Result<Option<AccessGuard<V>>> {
281 let value_len = V::as_bytes(value.borrow()).as_ref().len();
282 if value_len > MAX_VALUE_LENGTH {
283 return Err(StorageError::ValueTooLarge(value_len));
284 }
285 let key_len = K::as_bytes(key.borrow()).as_ref().len();
286 if key_len > MAX_VALUE_LENGTH {
287 return Err(StorageError::ValueTooLarge(key_len));
288 }
289 if value_len + key_len > MAX_PAIR_LENGTH {
290 return Err(StorageError::ValueTooLarge(value_len + key_len));
291 }
292 self.tree.insert(key.borrow(), value.borrow())
293 }
294
295 pub fn remove<'a>(
296 &mut self,
297 key: impl Borrow<K::SelfType<'a>>,
298 ) -> Result<Option<AccessGuard<V>>>
299 where
300 K: 'a,
301 {
302 self.tree.remove(key.borrow())
303 }
304}
305
306impl<K: Key + 'static, V: Value + 'static> Drop for SystemTable<'_, '_, K, V> {
307 fn drop(&mut self) {
308 self.namespace.close_table(
309 &self.name,
310 &self.tree,
311 self.tree.get_root().map(|x| x.length).unwrap_or_default(),
312 );
313 }
314}
315
// Namespace of system tables belonging to one write transaction.
struct SystemNamespace<'db> {
    // Table tree in which system tables are looked up/created and their root updates staged.
    table_tree: TableTreeMut<'db>,
    // Guard cloned into every `SystemTable` opened from this namespace.
    transaction_guard: Arc<TransactionGuard>,
}
320
321impl<'db> SystemNamespace<'db> {
322 fn open_system_table<'txn, 's, K: Key + 'static, V: Value + 'static>(
323 &'s mut self,
324 transaction: &'txn WriteTransaction,
325 definition: SystemTableDefinition<K, V>,
326 ) -> Result<SystemTable<'db, 's, K, V>> {
327 #[cfg(feature = "logging")]
328 debug!("Opening system table: {}", definition);
329 let (root, _) = self
330 .table_tree
331 .get_or_create_table::<K, V>(definition.name(), TableType::Normal)
332 .map_err(|e| {
333 e.into_storage_error_or_corrupted("Internal error. System table is corrupted")
334 })?;
335 transaction.dirty.store(true, Ordering::Release);
336
337 Ok(SystemTable::new(
338 definition.name(),
339 root,
340 transaction.freed_pages.clone(),
341 self.transaction_guard.clone(),
342 transaction.mem.clone(),
343 self,
344 ))
345 }
346
347 fn close_table<K: Key + 'static, V: Value + 'static>(
348 &mut self,
349 name: &str,
350 table: &BtreeMut<K, V>,
351 length: u64,
352 ) {
353 self.table_tree
354 .stage_update_table_root(name, table.get_root(), length);
355 }
356}
357
// Namespace of user tables belonging to one write transaction.
struct TableNamespace<'db> {
    // Names of currently open tables, mapped to the source location that opened them; the
    // location is reported in `TableError::TableAlreadyOpen`.
    open_tables: HashMap<String, &'static panic::Location<'static>>,
    // Table tree in which user tables are created, renamed, deleted and their roots staged.
    table_tree: TableTreeMut<'db>,
}
362
impl TableNamespace<'_> {
    /// Shared implementation of the `open_*` methods.
    ///
    /// Fails with `TableError::TableAlreadyOpen` if `name` is already open; otherwise gets or
    /// creates the table and records the caller's source location as the place it was opened.
    #[track_caller]
    fn inner_open<K: Key + 'static, V: Value + 'static>(
        &mut self,
        name: &str,
        table_type: TableType,
    ) -> Result<(Option<BtreeHeader>, u64), TableError> {
        if let Some(location) = self.open_tables.get(name) {
            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
        }

        let root = self
            .table_tree
            .get_or_create_table::<K, V>(name, table_type)?;
        // `#[track_caller]` makes this the location of the code that asked to open the table.
        self.open_tables
            .insert(name.to_string(), panic::Location::caller());

        Ok(root)
    }

    /// Opens the multimap table described by `definition` and marks `transaction` as dirty.
    #[track_caller]
    pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
        &mut self,
        transaction: &'txn WriteTransaction,
        definition: MultimapTableDefinition<K, V>,
    ) -> Result<MultimapTable<'txn, K, V>, TableError> {
        #[cfg(feature = "logging")]
        debug!("Opening multimap table: {}", definition);
        let (root, length) = self.inner_open::<K, V>(definition.name(), TableType::Multimap)?;
        transaction.dirty.store(true, Ordering::Release);

        Ok(MultimapTable::new(
            definition.name(),
            root,
            length,
            transaction.freed_pages.clone(),
            transaction.mem.clone(),
            transaction,
        ))
    }

    /// Opens the table described by `definition` and marks `transaction` as dirty.
    #[track_caller]
    pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
        &mut self,
        transaction: &'txn WriteTransaction,
        definition: TableDefinition<K, V>,
    ) -> Result<Table<'txn, K, V>, TableError> {
        #[cfg(feature = "logging")]
        debug!("Opening table: {}", definition);
        // The length returned by `inner_open` is not needed for a normal table.
        let (root, _) = self.inner_open::<K, V>(definition.name(), TableType::Normal)?;
        transaction.dirty.store(true, Ordering::Release);

        Ok(Table::new(
            definition.name(),
            root,
            transaction.freed_pages.clone(),
            transaction.mem.clone(),
            transaction,
        ))
    }

    /// Shared implementation of the `rename_*` methods; refuses to rename a table that is
    /// currently open.
    #[track_caller]
    fn inner_rename(
        &mut self,
        name: &str,
        new_name: &str,
        table_type: TableType,
    ) -> Result<(), TableError> {
        if let Some(location) = self.open_tables.get(name) {
            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
        }

        self.table_tree.rename_table(name, new_name, table_type)
    }

    /// Renames a normal table. The transaction is marked dirty before the rename is
    /// attempted, so it stays dirty even if the rename fails.
    #[track_caller]
    fn rename_table(
        &mut self,
        transaction: &WriteTransaction,
        name: &str,
        new_name: &str,
    ) -> Result<(), TableError> {
        #[cfg(feature = "logging")]
        debug!("Renaming table: {} to {}", name, new_name);
        transaction.dirty.store(true, Ordering::Release);
        self.inner_rename(name, new_name, TableType::Normal)
    }

    /// Renames a multimap table. The transaction is marked dirty before the rename is
    /// attempted.
    #[track_caller]
    fn rename_multimap_table(
        &mut self,
        transaction: &WriteTransaction,
        name: &str,
        new_name: &str,
    ) -> Result<(), TableError> {
        #[cfg(feature = "logging")]
        debug!("Renaming multimap table: {} to {}", name, new_name);
        transaction.dirty.store(true, Ordering::Release);
        self.inner_rename(name, new_name, TableType::Multimap)
    }

    /// Shared implementation of the `delete_*` methods; refuses to delete a table that is
    /// currently open. The boolean is whatever the table tree's `delete_table` reports.
    #[track_caller]
    fn inner_delete(&mut self, name: &str, table_type: TableType) -> Result<bool, TableError> {
        if let Some(location) = self.open_tables.get(name) {
            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
        }

        self.table_tree.delete_table(name, table_type)
    }

    /// Deletes a normal table. The transaction is marked dirty before the delete is attempted.
    #[track_caller]
    fn delete_table(
        &mut self,
        transaction: &WriteTransaction,
        name: &str,
    ) -> Result<bool, TableError> {
        #[cfg(feature = "logging")]
        debug!("Deleting table: {}", name);
        transaction.dirty.store(true, Ordering::Release);
        self.inner_delete(name, TableType::Normal)
    }

    /// Deletes a multimap table. The transaction is marked dirty before the delete is
    /// attempted.
    #[track_caller]
    fn delete_multimap_table(
        &mut self,
        transaction: &WriteTransaction,
        name: &str,
    ) -> Result<bool, TableError> {
        #[cfg(feature = "logging")]
        debug!("Deleting multimap table: {}", name);
        transaction.dirty.store(true, Ordering::Release);
        self.inner_delete(name, TableType::Multimap)
    }

    /// Marks `name` as no longer open and stages its current root and `length`.
    ///
    /// # Panics
    ///
    /// Panics if `name` is not recorded as open.
    pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
        &mut self,
        name: &str,
        table: &BtreeMut<K, V>,
        length: u64,
    ) {
        self.open_tables.remove(name).unwrap();
        self.table_tree
            .stage_update_table_root(name, table.get_root(), length);
    }
}
508
/// A read/write transaction.
///
/// Tables are opened, renamed and deleted through it, savepoints are created and restored,
/// and the whole transaction is finished with `commit()` or `abort()`.
pub struct WriteTransaction {
    // Registers read transactions and savepoints; also used to validate and invalidate
    // savepoints.
    transaction_tracker: Arc<TransactionTracker>,
    // Page store this transaction reads roots from and allocates/frees pages in.
    mem: Arc<TransactionalMemory>,
    // Guard for this transaction; clones are handed to the trees it creates.
    transaction_guard: Arc<TransactionGuard>,
    // Id taken from the guard at construction time.
    transaction_id: TransactionId,
    // Tree of pages pending free, keyed by `FreedTableKey`. Pages it frees itself are
    // collected in `post_commit_frees`.
    freed_tree: Mutex<BtreeMut<'static, FreedTableKey, FreedPageList<'static>>>,
    // Pages freed by the user and system table trees and by tables opened in this
    // transaction.
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    // Pages freed by `freed_tree`.
    // NOTE(review): the name suggests these are released after commit -- confirm in the
    // commit path, which is outside this view.
    post_commit_frees: Arc<Mutex<Vec<PageNumber>>>,
    // User table namespace.
    tables: Mutex<TableNamespace<'static>>,
    // System table namespace.
    system_tables: Mutex<SystemNamespace<'static>>,
    // Set by `commit()` and `abort()` before they run.
    completed: bool,
    // Set once a table is opened/renamed/deleted or a savepoint is restored;
    // `ephemeral_savepoint()` refuses to run when it is set.
    dirty: AtomicBool,
    // Selects the commit path in `commit_inner`; starts as `Immediate`.
    durability: InternalDurability,
    // Set via `set_two_phase_commit`; forced on at commit when `quick_repair` is set.
    two_phase_commit: bool,
    // Set via `set_quick_repair`.
    quick_repair: bool,
    // Ids of persistent savepoints created in this transaction; `abort_inner` deletes them.
    created_persistent_savepoints: Mutex<HashSet<SavepointId>>,
    // Persistent savepoints deleted in this transaction; `commit_inner` deallocates them in
    // the tracker after the commit succeeds.
    deleted_persistent_savepoints: Mutex<Vec<(SavepointId, TransactionId)>>,
}
535
536impl WriteTransaction {
537 pub(crate) fn new(
538 guard: TransactionGuard,
539 transaction_tracker: Arc<TransactionTracker>,
540 mem: Arc<TransactionalMemory>,
541 ) -> Result<Self> {
542 let transaction_id = guard.id();
543 let guard = Arc::new(guard);
544
545 let root_page = mem.get_data_root();
546 let system_page = mem.get_system_root();
547 let freed_root = mem.get_freed_root();
548 let freed_pages = Arc::new(Mutex::new(vec![]));
549 let post_commit_frees = Arc::new(Mutex::new(vec![]));
550
551 let tables = TableNamespace {
552 open_tables: Default::default(),
553 table_tree: TableTreeMut::new(
554 root_page,
555 guard.clone(),
556 mem.clone(),
557 freed_pages.clone(),
558 ),
559 };
560 let system_tables = SystemNamespace {
561 table_tree: TableTreeMut::new(
562 system_page,
563 guard.clone(),
564 mem.clone(),
565 freed_pages.clone(),
566 ),
567 transaction_guard: guard.clone(),
568 };
569
570 Ok(Self {
571 transaction_tracker,
572 mem: mem.clone(),
573 transaction_guard: guard.clone(),
574 transaction_id,
575 tables: Mutex::new(tables),
576 system_tables: Mutex::new(system_tables),
577 freed_tree: Mutex::new(BtreeMut::new(
578 freed_root,
579 guard,
580 mem,
581 post_commit_frees.clone(),
582 )),
583 freed_pages,
584 post_commit_frees,
585 completed: false,
586 dirty: AtomicBool::new(false),
587 durability: InternalDurability::Immediate,
588 two_phase_commit: false,
589 quick_repair: false,
590 created_persistent_savepoints: Mutex::new(Default::default()),
591 deleted_persistent_savepoints: Mutex::new(vec![]),
592 })
593 }
594
    /// Debug helper (test and fuzzing builds only) that prints every allocated page grouped by
    /// what references it, followed by any allocated pages that nothing accounts for.
    ///
    /// Starts from the set of all allocated pages and removes each page as it is attributed
    /// to the tracker page, the user table tree, the system table tree, the freed tree
    /// itself, the entries stored in the freed tree, or the in-memory freed lists. Whatever
    /// remains is printed under "Leaked pages".
    ///
    /// # Panics
    ///
    /// Panics on any storage error or poisoned lock.
    #[cfg(any(test, fuzzing))]
    pub fn print_allocated_page_debug(&self) {
        let mut all_allocated: HashSet<PageNumber> =
            HashSet::from_iter(self.mem.all_allocated_pages());

        let tracker = self.mem.tracker_page();
        all_allocated.remove(&tracker);
        println!("Tracker page");
        println!("{tracker:?}");

        // Pages reachable from the user table tree.
        let mut table_pages = vec![];
        self.tables
            .lock()
            .unwrap()
            .table_tree
            .visit_all_pages(|path| {
                table_pages.push(path.page_number());
                Ok(())
            })
            .unwrap();
        println!("Tables");
        for p in table_pages {
            all_allocated.remove(&p);
            println!("{p:?}");
        }

        // Pages reachable from the system table tree.
        let mut system_table_pages = vec![];
        self.system_tables
            .lock()
            .unwrap()
            .table_tree
            .visit_all_pages(|path| {
                system_table_pages.push(path.page_number());
                Ok(())
            })
            .unwrap();
        println!("System tables");
        for p in system_table_pages {
            all_allocated.remove(&p);
            println!("{p:?}");
        }

        // Pages that make up the freed tree's own structure.
        println!("Free table");
        if let Some(freed_iter) = self.freed_tree.lock().unwrap().all_pages_iter().unwrap() {
            for p in freed_iter {
                let p = p.unwrap();
                all_allocated.remove(&p);
                println!("{p:?}");
            }
        }
        // Pages listed as values inside the freed tree.
        println!("Pending free (i.e. in freed table)");
        for entry in self
            .freed_tree
            .lock()
            .unwrap()
            .range::<RangeFull, FreedTableKey>(&(..))
            .unwrap()
        {
            let entry = entry.unwrap();
            let value = entry.value();
            for i in 0..value.len() {
                let p = value.get(i);
                all_allocated.remove(&p);
                println!("{p:?}");
            }
        }
        // Pages freed in memory but not yet recorded in the freed tree.
        {
            let pages = self.freed_pages.lock().unwrap();
            if !pages.is_empty() {
                println!("Pages in in-memory freed_pages");
                for p in pages.iter() {
                    println!("{p:?}");
                    all_allocated.remove(p);
                }
            }
        }
        {
            let pages = self.post_commit_frees.lock().unwrap();
            if !pages.is_empty() {
                println!("Pages in in-memory post_commit_frees");
                for p in pages.iter() {
                    println!("{p:?}");
                    all_allocated.remove(p);
                }
            }
        }
        // Anything still in the set is allocated but not accounted for above.
        if !all_allocated.is_empty() {
            println!("Leaked pages");
            for p in all_allocated {
                println!("{p:?}");
            }
        }
    }
688
    /// Creates a savepoint that is recorded in the system tables and returns its id.
    ///
    /// The id can later be passed to `get_persistent_savepoint` and
    /// `delete_persistent_savepoint`.
    ///
    /// # Errors
    ///
    /// Returns `SavepointError::InvalidSavepoint` if the transaction's durability is not
    /// `Immediate`, or if the transaction is already dirty (see `ephemeral_savepoint`).
    /// Storage errors are propagated.
    pub fn persistent_savepoint(&self) -> Result<u64, SavepointError> {
        if self.durability != InternalDurability::Immediate {
            return Err(SavepointError::InvalidSavepoint);
        }

        let mut savepoint = self.ephemeral_savepoint()?;

        let mut system_tables = self.system_tables.lock().unwrap();

        // Record the id that follows this savepoint in the next-id table.
        let mut next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
        next_table.insert((), savepoint.get_id().next())?;
        // An open system table mutably borrows the namespace, so it must be closed before the
        // next table can be opened.
        drop(next_table);

        let mut savepoint_table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
        savepoint_table.insert(
            savepoint.get_id(),
            SerializedSavepoint::from_savepoint(&savepoint),
        )?;

        savepoint.set_persistent();

        // Remembered so that `abort_inner` can delete it again.
        self.created_persistent_savepoints
            .lock()
            .unwrap()
            .insert(savepoint.get_id());

        Ok(savepoint.get_id().0)
    }
725
726 pub(crate) fn transaction_guard(&self) -> Arc<TransactionGuard> {
727 self.transaction_guard.clone()
728 }
729
730 pub(crate) fn next_persistent_savepoint_id(&self) -> Result<Option<SavepointId>> {
731 let mut system_tables = self.system_tables.lock().unwrap();
732 let next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
733 let value = next_table.get(())?;
734 if let Some(next_id) = value {
735 Ok(Some(next_id.value()))
736 } else {
737 Ok(None)
738 }
739 }
740
741 pub fn get_persistent_savepoint(&self, id: u64) -> Result<Savepoint, SavepointError> {
743 let mut system_tables = self.system_tables.lock().unwrap();
744 let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
745 let value = table.get(SavepointId(id))?;
746
747 value
748 .map(|x| x.value().to_savepoint(self.transaction_tracker.clone()))
749 .ok_or(SavepointError::InvalidSavepoint)
750 }
751
752 pub fn delete_persistent_savepoint(&self, id: u64) -> Result<bool, SavepointError> {
759 if self.durability != InternalDurability::Immediate {
760 return Err(SavepointError::InvalidSavepoint);
761 }
762 let mut system_tables = self.system_tables.lock().unwrap();
763 let mut table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
764 let savepoint = table.remove(SavepointId(id))?;
765 if let Some(serialized) = savepoint {
766 let savepoint = serialized
767 .value()
768 .to_savepoint(self.transaction_tracker.clone());
769 self.deleted_persistent_savepoints
770 .lock()
771 .unwrap()
772 .push((savepoint.get_id(), savepoint.get_transaction_id()));
773 Ok(true)
774 } else {
775 Ok(false)
776 }
777 }
778
779 pub fn list_persistent_savepoints(&self) -> Result<impl Iterator<Item = u64>> {
781 let mut system_tables = self.system_tables.lock().unwrap();
782 let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
783 let mut savepoints = vec![];
784 for savepoint in table.range::<SavepointId>(..)? {
785 savepoints.push(savepoint?.0.value().0);
786 }
787 Ok(savepoints.into_iter())
788 }
789
790 fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
792 let id = self
793 .transaction_tracker
794 .register_read_transaction(&self.mem)?;
795
796 Ok(TransactionGuard::new_read(
797 id,
798 self.transaction_tracker.clone(),
799 ))
800 }
801
802 fn allocate_savepoint(&self) -> Result<(SavepointId, TransactionId)> {
803 let id = self.transaction_tracker.allocate_savepoint();
804 Ok((id, self.allocate_read_transaction()?.leak()))
805 }
806
807 pub fn ephemeral_savepoint(&self) -> Result<Savepoint, SavepointError> {
813 if self.dirty.load(Ordering::Acquire) {
814 return Err(SavepointError::InvalidSavepoint);
815 }
816
817 let (id, transaction_id) = self.allocate_savepoint()?;
818 #[cfg(feature = "logging")]
819 debug!(
820 "Creating savepoint id={:?}, txn_id={:?}",
821 id, transaction_id
822 );
823
824 let regional_allocators = self.mem.get_raw_allocator_states();
825 let root = self.mem.get_data_root();
826 let system_root = self.mem.get_system_root();
827 let freed_root = self.mem.get_freed_root();
828 let savepoint = Savepoint::new_ephemeral(
829 &self.mem,
830 self.transaction_tracker.clone(),
831 id,
832 transaction_id,
833 root,
834 system_root,
835 freed_root,
836 regional_allocators,
837 );
838
839 Ok(savepoint)
840 }
841
    /// Restores the user tables to the state captured by `savepoint`.
    ///
    /// Outline of what the code does:
    /// 1. Replaces the user table tree with one rooted at the savepoint's user root.
    /// 2. Rewrites the freed tree entries of every transaction after the savepoint, keeping
    ///    only pages that belong to the savepoint's system or freed tree.
    /// 3. Frees the pages allocated since the savepoint, skipping pages already awaiting a
    ///    free and pages that are part of the current system or freed tree.
    /// 4. Invalidates all newer savepoints and deletes the persistent ones among them.
    ///
    /// The transaction is marked dirty.
    ///
    /// # Errors
    ///
    /// Returns `SavepointError::InvalidSavepoint` if the tracker no longer considers the
    /// savepoint valid. Storage errors are propagated.
    ///
    /// # Panics
    ///
    /// Panics if the savepoint belongs to a different transaction tracker (database), if its
    /// version differs from the current one, or if a page listed in the savepoint's freed
    /// tree is not allocated in the savepoint's allocator state.
    pub fn restore_savepoint(&mut self, savepoint: &Savepoint) -> Result<(), SavepointError> {
        // The savepoint must come from the database this transaction belongs to.
        assert_eq!(
            std::ptr::from_ref(self.transaction_tracker.as_ref()),
            savepoint.db_address()
        );

        if !self
            .transaction_tracker
            .is_valid_savepoint(savepoint.get_id())
        {
            return Err(SavepointError::InvalidSavepoint);
        }
        #[cfg(feature = "logging")]
        debug!(
            "Beginning savepoint restore (id={:?}) in transaction id={:?}",
            savepoint.get_id(),
            self.transaction_id
        );
        // Restoring across a version change is not supported.
        assert_eq!(self.mem.get_version(), savepoint.get_version());
        self.dirty.store(true, Ordering::Release);

        // Read-only views of the system and freed trees as they were at the savepoint.
        let old_system_tree = TableTree::new(
            savepoint.get_system_root(),
            PageHint::None,
            self.transaction_guard.clone(),
            self.mem.clone(),
        )?;
        let old_freed_tree: Btree<FreedTableKey, FreedPageList<'static>> = Btree::new(
            savepoint.get_freed_root(),
            PageHint::None,
            self.transaction_guard.clone(),
            self.mem.clone(),
        )?;

        // Every page that was part of the system or freed tree at the savepoint.
        let mut old_system_and_freed_pages = HashSet::new();
        old_system_tree.visit_all_pages(|path| {
            old_system_and_freed_pages.insert(path.page_number());
            Ok(())
        })?;
        old_freed_tree.visit_all_pages(|path| {
            old_system_and_freed_pages.insert(path.page_number());
            Ok(())
        })?;

        // Step 1: point the user table tree back at the savepoint's root.
        {
            self.tables.lock().unwrap().table_tree = TableTreeMut::new(
                savepoint.get_user_root(),
                self.transaction_guard.clone(),
                self.mem.clone(),
                self.freed_pages.clone(),
            );
        }

        // Step 2: walk the freed tree one transaction id at a time, starting right after the
        // savepoint's transaction, until no entries at or above the current id remain.
        let mut txn_id = savepoint.get_transaction_id().next().raw_id();
        let mut freed_tree = self.freed_tree.lock().unwrap();
        loop {
            let lower = FreedTableKey {
                transaction_id: txn_id,
                pagination_id: 0,
            };

            if freed_tree.range(&(lower..))?.next().is_none() {
                break;
            }
            let lower = FreedTableKey {
                transaction_id: txn_id,
                pagination_id: 0,
            };
            let upper = FreedTableKey {
                transaction_id: txn_id + 1,
                pagination_id: 0,
            };

            // Remove every entry of this transaction, keeping only the pages that belonged to
            // the savepoint's system or freed tree.
            let mut pending_pages = vec![];
            for entry in freed_tree.extract_from_if(&(lower..upper), |_, _| true)? {
                let item = entry?;
                for i in 0..item.value().len() {
                    let p = item.value().get(i);
                    if old_system_and_freed_pages.contains(&p) {
                        pending_pages.push(p);
                    }
                }
            }

            // Re-insert the kept pages under the same transaction id, at most `chunk_size`
            // pages per entry, draining from the tail of `pending_pages`.
            let mut pagination_counter = 0u64;
            while !pending_pages.is_empty() {
                let chunk_size = 100;
                let buffer_size = FreedPageList::required_bytes(chunk_size);
                let key = FreedTableKey {
                    transaction_id: txn_id,
                    pagination_id: pagination_counter,
                };
                let mut access_guard =
                    freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?;

                let len = pending_pages.len();
                access_guard.as_mut().clear();
                for page in pending_pages.drain(len - min(len, chunk_size)..) {
                    access_guard.as_mut().push_back(page);
                }
                drop(access_guard);

                pagination_counter += 1;
            }

            txn_id += 1;
        }

        // Pages that make up the current system tree and the (just rewritten) freed tree.
        let mut current_system_and_freed_pages = HashSet::new();
        self.system_tables
            .lock()
            .unwrap()
            .table_tree
            .visit_all_pages(|path| {
                current_system_and_freed_pages.insert(path.page_number());
                Ok(())
            })?;
        freed_tree.visit_all_pages(|path| {
            current_system_and_freed_pages.insert(path.page_number());
            Ok(())
        })?;

        // Rebuild the allocator state recorded in the savepoint.
        let mut old_allocators: Vec<BuddyAllocator> = savepoint
            .get_regional_allocators()
            .iter()
            .map(|data| BuddyAllocator::from_savepoint_state(data))
            .collect();

        // Mark as free, in the rebuilt allocators, the pages that the savepoint's freed tree
        // lists for transactions older than the oldest entry still present in the current
        // freed tree (or older than this transaction if the freed tree is empty).
        {
            let oldest_unprocessed_transaction =
                if let Some(entry) = freed_tree.range::<RangeFull, FreedTableKey>(&(..))?.next() {
                    entry?.key().transaction_id
                } else {
                    self.transaction_id.raw_id()
                };

            let lookup_key = FreedTableKey {
                transaction_id: oldest_unprocessed_transaction,
                pagination_id: 0,
            };

            for entry in old_freed_tree.range(&(..lookup_key))? {
                let item = entry?;
                let pages: FreedPageList = item.value();
                for i in 0..pages.len() {
                    let page = pages.get(i);
                    assert!(
                        old_allocators[page.region as usize]
                            .is_allocated(page.page_index, page.page_order)
                    );
                    old_allocators[page.region as usize].free(page.page_index, page.page_order);
                }
            }
        }

        // Step 3: free everything allocated since the adjusted savepoint allocator state.
        let mut freed_pages = self.freed_pages.lock().unwrap();
        let mut already_awaiting_free: HashSet<PageNumber> = freed_pages.iter().copied().collect();
        already_awaiting_free.extend(self.post_commit_frees.lock().unwrap().iter().copied());
        let to_free = self.mem.pages_allocated_since_raw_state(&old_allocators);
        for page in to_free {
            // Already queued for freeing; queuing it again would free it twice.
            if already_awaiting_free.contains(&page) {
                continue;
            }
            // Still in use by the current system or freed tree.
            if current_system_and_freed_pages.contains(&page) {
                continue;
            }
            // Uncommitted pages are released immediately; the rest are queued.
            if self.mem.uncommitted(page) {
                self.mem.free(page);
            } else {
                freed_pages.push(page);
            }
        }
        drop(freed_pages);

        // Step 4: savepoints newer than the restored one are no longer usable.
        self.transaction_tracker
            .invalidate_savepoints_after(savepoint.get_id());
        for persistent_savepoint in self.list_persistent_savepoints()? {
            if persistent_savepoint > savepoint.get_id().0 {
                self.delete_persistent_savepoint(persistent_savepoint)?;
            }
        }

        Ok(())
    }
1064
1065 pub fn set_durability(&mut self, durability: Durability) {
1070 let no_created = self
1071 .created_persistent_savepoints
1072 .lock()
1073 .unwrap()
1074 .is_empty();
1075 let no_deleted = self
1076 .deleted_persistent_savepoints
1077 .lock()
1078 .unwrap()
1079 .is_empty();
1080 assert!(no_created && no_deleted);
1081
1082 self.durability = match durability {
1083 Durability::None => InternalDurability::None,
1084 Durability::Eventual => InternalDurability::Eventual,
1085 Durability::Immediate => InternalDurability::Immediate,
1086 #[allow(deprecated)]
1087 Durability::Paranoid => {
1088 self.set_two_phase_commit(true);
1089 InternalDurability::Immediate
1090 }
1091 };
1092 }
1093
    /// Enables or disables two-phase commit for this transaction.
    ///
    /// `commit_inner` turns it on regardless of this setting when quick repair is enabled.
    pub fn set_two_phase_commit(&mut self, enabled: bool) {
        self.two_phase_commit = enabled;
    }
1136
    /// Enables or disables quick repair for this transaction.
    ///
    /// When enabled, committing forces two-phase commit and `durable_commit` re-creates the
    /// allocator state table.
    pub fn set_quick_repair(&mut self, enabled: bool) {
        self.quick_repair = enabled;
    }
1150
    /// Opens the table described by `definition`.
    ///
    /// Marks the transaction as dirty on success.
    ///
    /// # Errors
    ///
    /// Returns `TableError::TableAlreadyOpen` (carrying the location that opened it) if the
    /// table is already open; other table errors are propagated.
    // `#[track_caller]` lets the namespace record the location of the code calling this method.
    #[track_caller]
    pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
        &'txn self,
        definition: TableDefinition<K, V>,
    ) -> Result<Table<'txn, K, V>, TableError> {
        self.tables.lock().unwrap().open_table(self, definition)
    }
1161
    /// Opens the multimap table described by `definition`.
    ///
    /// Marks the transaction as dirty on success.
    ///
    /// # Errors
    ///
    /// Returns `TableError::TableAlreadyOpen` (carrying the location that opened it) if the
    /// table is already open; other table errors are propagated.
    // `#[track_caller]` lets the namespace record the location of the code calling this method.
    #[track_caller]
    pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
        &'txn self,
        definition: MultimapTableDefinition<K, V>,
    ) -> Result<MultimapTable<'txn, K, V>, TableError> {
        self.tables
            .lock()
            .unwrap()
            .open_multimap_table(self, definition)
    }
1175
1176 pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
1177 &self,
1178 name: &str,
1179 table: &BtreeMut<K, V>,
1180 length: u64,
1181 ) {
1182 self.tables.lock().unwrap().close_table(name, table, length);
1183 }
1184
1185 pub fn rename_table(
1187 &self,
1188 definition: impl TableHandle,
1189 new_name: impl TableHandle,
1190 ) -> Result<(), TableError> {
1191 let name = definition.name().to_string();
1192 drop(definition);
1194 self.tables
1195 .lock()
1196 .unwrap()
1197 .rename_table(self, &name, new_name.name())
1198 }
1199
1200 pub fn rename_multimap_table(
1202 &self,
1203 definition: impl MultimapTableHandle,
1204 new_name: impl MultimapTableHandle,
1205 ) -> Result<(), TableError> {
1206 let name = definition.name().to_string();
1207 drop(definition);
1209 self.tables
1210 .lock()
1211 .unwrap()
1212 .rename_multimap_table(self, &name, new_name.name())
1213 }
1214
1215 pub fn delete_table(&self, definition: impl TableHandle) -> Result<bool, TableError> {
1219 let name = definition.name().to_string();
1220 drop(definition);
1222 self.tables.lock().unwrap().delete_table(self, &name)
1223 }
1224
1225 pub fn delete_multimap_table(
1229 &self,
1230 definition: impl MultimapTableHandle,
1231 ) -> Result<bool, TableError> {
1232 let name = definition.name().to_string();
1233 drop(definition);
1235 self.tables
1236 .lock()
1237 .unwrap()
1238 .delete_multimap_table(self, &name)
1239 }
1240
1241 pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle> + '_> {
1243 self.tables
1244 .lock()
1245 .unwrap()
1246 .table_tree
1247 .list_tables(TableType::Normal)
1248 .map(|x| x.into_iter().map(UntypedTableHandle::new))
1249 }
1250
1251 pub fn list_multimap_tables(
1253 &self,
1254 ) -> Result<impl Iterator<Item = UntypedMultimapTableHandle> + '_> {
1255 self.tables
1256 .lock()
1257 .unwrap()
1258 .table_tree
1259 .list_tables(TableType::Multimap)
1260 .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
1261 }
1262
    /// Commits the transaction, consuming it.
    ///
    /// # Errors
    ///
    /// Propagates any error from the commit path selected by the durability level.
    pub fn commit(mut self) -> Result<(), CommitError> {
        // Marked completed before the attempt, so the flag is set even if the commit fails.
        self.completed = true;
        self.commit_inner()
    }
1272
1273 fn commit_inner(&mut self) -> Result<(), CommitError> {
1274 if self.quick_repair {
1276 self.two_phase_commit = true;
1277 }
1278
1279 #[cfg(feature = "logging")]
1280 debug!(
1281 "Committing transaction id={:?} with durability={:?} two_phase={} quick_repair={}",
1282 self.transaction_id, self.durability, self.two_phase_commit, self.quick_repair
1283 );
1284 match self.durability {
1285 InternalDurability::None => self.non_durable_commit()?,
1286 InternalDurability::Eventual => self.durable_commit(true)?,
1287 InternalDurability::Immediate => self.durable_commit(false)?,
1288 }
1289
1290 for (savepoint, transaction) in self.deleted_persistent_savepoints.lock().unwrap().iter() {
1291 self.transaction_tracker
1292 .deallocate_savepoint(*savepoint, *transaction);
1293 }
1294
1295 #[cfg(feature = "logging")]
1296 debug!(
1297 "Finished commit of transaction id={:?}",
1298 self.transaction_id
1299 );
1300
1301 Ok(())
1302 }
1303
    /// Aborts the transaction, consuming it and rolling back its uncommitted writes.
    ///
    /// # Errors
    ///
    /// Propagates storage errors from the rollback.
    pub fn abort(mut self) -> Result {
        // Marked completed before the attempt, so the flag is set even if the abort fails.
        self.completed = true;
        self.abort_inner()
    }
1312
1313 fn abort_inner(&mut self) -> Result {
1314 #[cfg(feature = "logging")]
1315 debug!("Aborting transaction id={:?}", self.transaction_id);
1316 for savepoint in self.created_persistent_savepoints.lock().unwrap().iter() {
1317 match self.delete_persistent_savepoint(savepoint.0) {
1318 Ok(_) => {}
1319 Err(err) => match err {
1320 SavepointError::InvalidSavepoint => {
1321 unreachable!();
1322 }
1323 SavepointError::Storage(storage_err) => {
1324 return Err(storage_err);
1325 }
1326 },
1327 }
1328 }
1329 self.tables
1330 .lock()
1331 .unwrap()
1332 .table_tree
1333 .clear_table_root_updates();
1334 self.mem.rollback_uncommitted_writes()?;
1335 #[cfg(feature = "logging")]
1336 debug!("Finished abort of transaction id={:?}", self.transaction_id);
1337 Ok(())
1338 }
1339
1340 pub(crate) fn durable_commit(&mut self, eventual: bool) -> Result {
1341 let free_until_transaction = self
1342 .transaction_tracker
1343 .oldest_live_read_transaction()
1344 .map_or(self.transaction_id, |x| x.next());
1345 self.process_freed_pages(free_until_transaction)?;
1346
1347 let user_root = self
1348 .tables
1349 .lock()
1350 .unwrap()
1351 .table_tree
1352 .flush_table_root_updates()?
1353 .finalize_dirty_checksums()?;
1354
1355 let mut system_tables = self.system_tables.lock().unwrap();
1356 let system_tree = system_tables.table_tree.flush_table_root_updates()?;
1357 system_tree
1358 .delete_table(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
1359 .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?;
1360
1361 if self.quick_repair {
1362 system_tree.create_table_and_flush_table_root(
1363 ALLOCATOR_STATE_TABLE_NAME,
1364 |tree: &mut AllocatorStateTree| {
1365 let mut pagination_counter = 0;
1366
1367 loop {
1368 let num_regions = self
1369 .mem
1370 .reserve_allocator_state(tree, self.transaction_id)?;
1371
1372 self.store_freed_pages(&mut pagination_counter, true)?;
1376
1377 if self.mem.try_save_allocator_state(tree, num_regions)? {
1378 return Ok(());
1379 }
1380
1381 while let Some(guards) = tree.last()? {
1386 let key = guards.0.value();
1387 drop(guards);
1388 tree.remove(&key)?;
1389 }
1390 }
1391 },
1392 )?;
1393 } else {
1394 let savepoint_exists = self.transaction_tracker.any_savepoint_exists();
1399 self.store_freed_pages(&mut 0, savepoint_exists)?;
1400 }
1401
1402 let system_root = system_tree.finalize_dirty_checksums()?;
1403
1404 let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?;
1406
1407 self.mem.commit(
1408 user_root,
1409 system_root,
1410 freed_root,
1411 self.transaction_id,
1412 eventual,
1413 self.two_phase_commit,
1414 )?;
1415
1416 self.transaction_tracker.clear_pending_non_durable_commits();
1418
1419 for page in self.post_commit_frees.lock().unwrap().drain(..) {
1422 self.mem.free(page);
1423 }
1424
1425 Ok(())
1426 }
1427
1428 pub(crate) fn non_durable_commit(&mut self) -> Result {
1430 let user_root = self
1431 .tables
1432 .lock()
1433 .unwrap()
1434 .table_tree
1435 .flush_table_root_updates()?
1436 .finalize_dirty_checksums()?;
1437
1438 let system_root = self
1439 .system_tables
1440 .lock()
1441 .unwrap()
1442 .table_tree
1443 .flush_table_root_updates()?
1444 .finalize_dirty_checksums()?;
1445
1446 self.store_freed_pages(&mut 0, true)?;
1449
1450 let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?;
1452
1453 self.mem
1454 .non_durable_commit(user_root, system_root, freed_root, self.transaction_id)?;
1455 self.transaction_tracker
1458 .register_non_durable_commit(self.transaction_id);
1459 Ok(())
1460 }
1461
1462 pub(crate) fn compact_pages(&mut self) -> Result<bool> {
1465 let mut progress = false;
1466 if self.mem.relocate_region_tracker()? {
1468 progress = true;
1469 }
1470
1471 let mut highest_pages = BTreeMap::new();
1473 let mut tables = self.tables.lock().unwrap();
1474 let table_tree = &mut tables.table_tree;
1475 table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
1476 let mut system_tables = self.system_tables.lock().unwrap();
1477 let system_table_tree = &mut system_tables.table_tree;
1478 system_table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
1479
1480 let mut relocation_map = HashMap::new();
1482 for path in highest_pages.into_values().rev() {
1483 if relocation_map.contains_key(&path.page_number()) {
1484 continue;
1485 }
1486 let old_page = self.mem.get_page(path.page_number())?;
1487 let mut new_page = self.mem.allocate_lowest(old_page.memory().len())?;
1488 let new_page_number = new_page.get_page_number();
1489 new_page.memory_mut()[0] = old_page.memory()[0];
1492 drop(new_page);
1493 if new_page_number < path.page_number() {
1495 relocation_map.insert(path.page_number(), new_page_number);
1496 for parent in path.parents() {
1497 if relocation_map.contains_key(parent) {
1498 continue;
1499 }
1500 let old_parent = self.mem.get_page(*parent)?;
1501 let mut new_page = self.mem.allocate_lowest(old_parent.memory().len())?;
1502 let new_page_number = new_page.get_page_number();
1503 new_page.memory_mut()[0] = old_parent.memory()[0];
1506 drop(new_page);
1507 relocation_map.insert(*parent, new_page_number);
1508 }
1509 } else {
1510 self.mem.free(new_page_number);
1511 break;
1512 }
1513 }
1514
1515 if !relocation_map.is_empty() {
1516 progress = true;
1517 }
1518
1519 table_tree.relocate_tables(&relocation_map)?;
1520 system_table_tree.relocate_tables(&relocation_map)?;
1521
1522 Ok(progress)
1523 }
1524
1525 fn process_freed_pages(&mut self, free_until: TransactionId) -> Result {
1528 assert_eq!(PageNumber::serialized_size(), 8);
1530 let lookup_key = FreedTableKey {
1531 transaction_id: free_until.raw_id(),
1532 pagination_id: 0,
1533 };
1534
1535 let mut to_remove = vec![];
1536 let mut freed_tree = self.freed_tree.lock().unwrap();
1537 for entry in freed_tree.range(&(..lookup_key))? {
1538 let entry = entry?;
1539 to_remove.push(entry.key());
1540 let value = entry.value();
1541 for i in 0..value.len() {
1542 self.mem.free(value.get(i));
1543 }
1544 }
1545
1546 for key in to_remove {
1548 freed_tree.remove(&key)?;
1549 }
1550
1551 Ok(())
1552 }
1553
1554 fn store_freed_pages(
1555 &self,
1556 pagination_counter: &mut u64,
1557 include_post_commit_free: bool,
1558 ) -> Result {
1559 assert_eq!(PageNumber::serialized_size(), 8); let mut freed_tree = self.freed_tree.lock().unwrap();
1562 if include_post_commit_free {
1563 self.freed_pages
1566 .lock()
1567 .unwrap()
1568 .extend(self.post_commit_frees.lock().unwrap().drain(..));
1569 }
1570 while !self.freed_pages.lock().unwrap().is_empty() {
1571 let chunk_size = 100;
1572 let buffer_size = FreedPageList::required_bytes(chunk_size);
1573 let key = FreedTableKey {
1574 transaction_id: self.transaction_id.raw_id(),
1575 pagination_id: *pagination_counter,
1576 };
1577 let mut access_guard =
1578 freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?;
1579
1580 let mut freed_pages = self.freed_pages.lock().unwrap();
1581 let len = freed_pages.len();
1582 access_guard.as_mut().clear();
1583 for page in freed_pages.drain(len - min(len, chunk_size)..) {
1584 access_guard.as_mut().push_back(page);
1585 }
1586 drop(access_guard);
1587
1588 *pagination_counter += 1;
1589
1590 if include_post_commit_free {
1591 freed_pages.extend(self.post_commit_frees.lock().unwrap().drain(..));
1594 }
1595 }
1596
1597 Ok(())
1598 }
1599
1600 pub fn stats(&self) -> Result<DatabaseStats> {
1602 let tables = self.tables.lock().unwrap();
1603 let table_tree = &tables.table_tree;
1604 let data_tree_stats = table_tree.stats()?;
1605
1606 let system_tables = self.system_tables.lock().unwrap();
1607 let system_table_tree = &system_tables.table_tree;
1608 let system_tree_stats = system_table_tree.stats()?;
1609
1610 let freed_tree_stats = self.freed_tree.lock().unwrap().stats()?;
1611
1612 let total_metadata_bytes = data_tree_stats.metadata_bytes()
1613 + system_tree_stats.metadata_bytes
1614 + system_tree_stats.stored_leaf_bytes
1615 + freed_tree_stats.metadata_bytes
1616 + freed_tree_stats.stored_leaf_bytes;
1617 let total_fragmented = data_tree_stats.fragmented_bytes()
1618 + system_tree_stats.fragmented_bytes
1619 + freed_tree_stats.fragmented_bytes
1620 + self.mem.count_free_pages()? * (self.mem.get_page_size() as u64);
1621
1622 Ok(DatabaseStats {
1623 tree_height: data_tree_stats.tree_height(),
1624 allocated_pages: self.mem.count_allocated_pages()?,
1625 leaf_pages: data_tree_stats.leaf_pages(),
1626 branch_pages: data_tree_stats.branch_pages(),
1627 stored_leaf_bytes: data_tree_stats.stored_bytes(),
1628 metadata_bytes: total_metadata_bytes,
1629 fragmented_bytes: total_fragmented,
1630 page_size: self.mem.get_page_size(),
1631 })
1632 }
1633
/// Returns `2^page_order` for the region tracker page, i.e. the number of pages
/// implied by its `page_order`. Only compiled for tests and fuzzing.
#[cfg(any(test, fuzzing))]
pub fn num_region_tracker_pages(&self) -> u64 {
    let order = self.mem.tracker_page().page_order;
    1u64 << order
}
1638
1639 #[allow(dead_code)]
1640 pub(crate) fn print_debug(&self) -> Result {
1641 let mut tables = self.tables.lock().unwrap();
1643 if let Some(page) = tables
1644 .table_tree
1645 .flush_table_root_updates()
1646 .unwrap()
1647 .finalize_dirty_checksums()
1648 .unwrap()
1649 {
1650 eprintln!("Master tree:");
1651 let master_tree: Btree<&str, InternalTableDefinition> = Btree::new(
1652 Some(page),
1653 PageHint::None,
1654 self.transaction_guard.clone(),
1655 self.mem.clone(),
1656 )?;
1657 master_tree.print_debug(true)?;
1658 }
1659
1660 Ok(())
1661 }
1662}
1663
1664impl Drop for WriteTransaction {
1665 fn drop(&mut self) {
1666 if !self.completed && !thread::panicking() && !self.mem.storage_failure() {
1667 #[allow(unused_variables)]
1668 if let Err(error) = self.abort_inner() {
1669 #[cfg(feature = "logging")]
1670 warn!("Failure automatically aborting transaction: {}", error);
1671 }
1672 }
1673 }
1674}
1675
/// A read-only transaction.
///
/// Tables opened from it are read through `tree`, which is built from the data root
/// that `mem` reports when the transaction is created.
pub struct ReadTransaction {
    // Shared handle to the transactional memory; cloned into every table that is opened.
    mem: Arc<TransactionalMemory>,
    // Table tree rooted at the data root; also owns the shared transaction guard.
    tree: TableTree,
}
1683
1684impl ReadTransaction {
1685 pub(crate) fn new(
1686 mem: Arc<TransactionalMemory>,
1687 guard: TransactionGuard,
1688 ) -> Result<Self, TransactionError> {
1689 let root_page = mem.get_data_root();
1690 let guard = Arc::new(guard);
1691 Ok(Self {
1692 mem: mem.clone(),
1693 tree: TableTree::new(root_page, PageHint::Clean, guard, mem)
1694 .map_err(TransactionError::Storage)?,
1695 })
1696 }
1697
1698 pub fn open_table<K: Key + 'static, V: Value + 'static>(
1700 &self,
1701 definition: TableDefinition<K, V>,
1702 ) -> Result<ReadOnlyTable<K, V>, TableError> {
1703 let header = self
1704 .tree
1705 .get_table::<K, V>(definition.name(), TableType::Normal)?
1706 .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
1707
1708 match header {
1709 InternalTableDefinition::Normal { table_root, .. } => Ok(ReadOnlyTable::new(
1710 definition.name().to_string(),
1711 table_root,
1712 PageHint::Clean,
1713 self.tree.transaction_guard().clone(),
1714 self.mem.clone(),
1715 )?),
1716 InternalTableDefinition::Multimap { .. } => unreachable!(),
1717 }
1718 }
1719
1720 pub fn open_untyped_table(
1722 &self,
1723 handle: impl TableHandle,
1724 ) -> Result<ReadOnlyUntypedTable, TableError> {
1725 let header = self
1726 .tree
1727 .get_table_untyped(handle.name(), TableType::Normal)?
1728 .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
1729
1730 match header {
1731 InternalTableDefinition::Normal {
1732 table_root,
1733 fixed_key_size,
1734 fixed_value_size,
1735 ..
1736 } => Ok(ReadOnlyUntypedTable::new(
1737 table_root,
1738 fixed_key_size,
1739 fixed_value_size,
1740 self.mem.clone(),
1741 )),
1742 InternalTableDefinition::Multimap { .. } => unreachable!(),
1743 }
1744 }
1745
1746 pub fn open_multimap_table<K: Key + 'static, V: Key + 'static>(
1748 &self,
1749 definition: MultimapTableDefinition<K, V>,
1750 ) -> Result<ReadOnlyMultimapTable<K, V>, TableError> {
1751 let header = self
1752 .tree
1753 .get_table::<K, V>(definition.name(), TableType::Multimap)?
1754 .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
1755
1756 match header {
1757 InternalTableDefinition::Normal { .. } => unreachable!(),
1758 InternalTableDefinition::Multimap {
1759 table_root,
1760 table_length,
1761 ..
1762 } => Ok(ReadOnlyMultimapTable::new(
1763 table_root,
1764 table_length,
1765 PageHint::Clean,
1766 self.tree.transaction_guard().clone(),
1767 self.mem.clone(),
1768 )?),
1769 }
1770 }
1771
1772 pub fn open_untyped_multimap_table(
1774 &self,
1775 handle: impl MultimapTableHandle,
1776 ) -> Result<ReadOnlyUntypedMultimapTable, TableError> {
1777 let header = self
1778 .tree
1779 .get_table_untyped(handle.name(), TableType::Multimap)?
1780 .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
1781
1782 match header {
1783 InternalTableDefinition::Normal { .. } => unreachable!(),
1784 InternalTableDefinition::Multimap {
1785 table_root,
1786 table_length,
1787 fixed_key_size,
1788 fixed_value_size,
1789 ..
1790 } => Ok(ReadOnlyUntypedMultimapTable::new(
1791 table_root,
1792 table_length,
1793 fixed_key_size,
1794 fixed_value_size,
1795 self.mem.clone(),
1796 )),
1797 }
1798 }
1799
1800 pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle>> {
1802 self.tree
1803 .list_tables(TableType::Normal)
1804 .map(|x| x.into_iter().map(UntypedTableHandle::new))
1805 }
1806
1807 pub fn list_multimap_tables(&self) -> Result<impl Iterator<Item = UntypedMultimapTableHandle>> {
1809 self.tree
1810 .list_tables(TableType::Multimap)
1811 .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
1812 }
1813
1814 pub fn close(self) -> Result<(), TransactionError> {
1822 if Arc::strong_count(self.tree.transaction_guard()) > 1 {
1823 return Err(TransactionError::ReadTransactionStillInUse(self));
1824 }
1825 Ok(())
1827 }
1828}
1829
1830impl Debug for ReadTransaction {
1831 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1832 f.write_str("ReadTransaction")
1833 }
1834}
1835
#[cfg(test)]
mod test {
    use crate::{Database, TableDefinition};

    const X: TableDefinition<&str, &str> = TableDefinition::new("x");

    /// A write transaction started after reopening the database file must get a
    /// transaction id greater than the one committed before the database was closed.
    #[test]
    fn transaction_id_persistence() {
        let tmpfile = crate::create_tempfile();

        // First session: commit one insert and remember the transaction id.
        let first_txn_id = {
            let db = Database::create(tmpfile.path()).unwrap();
            let write_txn = db.begin_write().unwrap();
            {
                let mut table = write_txn.open_table(X).unwrap();
                table.insert("hello", "world").unwrap();
            }
            let committed_id = write_txn.transaction_id;
            write_txn.commit().unwrap();
            committed_id
            // `db` is dropped here, closing the first session.
        };

        // Second session on the same file.
        let reopened = Database::create(tmpfile.path()).unwrap();
        let next_txn = reopened.begin_write().unwrap();
        assert!(next_txn.transaction_id > first_txn_id);
    }
}