1use crate::db::TransactionGuard;
2use crate::error::CommitError;
3use crate::multimap_table::ReadOnlyUntypedMultimapTable;
4use crate::sealed::Sealed;
5use crate::table::ReadOnlyUntypedTable;
6use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
7use crate::tree_store::{
8 Btree, BtreeHeader, BtreeMut, FreedPageList, FreedTableKey, InternalTableDefinition, Page,
9 PageHint, PageNumber, SerializedSavepoint, TableTree, TableTreeMut, TableType,
10 TransactionalMemory, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH,
11};
12use crate::types::{Key, Value};
13use crate::{
14 AccessGuard, MultimapTable, MultimapTableDefinition, MultimapTableHandle, Range,
15 ReadOnlyMultimapTable, ReadOnlyTable, Result, Savepoint, SavepointError, StorageError, Table,
16 TableDefinition, TableError, TableHandle, TransactionError, TypeName,
17 UntypedMultimapTableHandle, UntypedTableHandle,
18};
19#[cfg(feature = "logging")]
20use log::{debug, warn};
21use std::borrow::Borrow;
22use std::cmp::min;
23use std::collections::{BTreeMap, HashMap, HashSet};
24use std::fmt::{Debug, Display, Formatter};
25use std::marker::PhantomData;
26use std::mem::size_of;
27use std::ops::RangeBounds;
28#[cfg(any(test, fuzzing))]
29use std::ops::RangeFull;
30use std::sync::atomic::{AtomicBool, Ordering};
31use std::sync::{Arc, Mutex};
32use std::{panic, thread};
33
34const MAX_PAGES_PER_COMPACTION: usize = 1_000_000;
35const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> =
36 SystemTableDefinition::new("next_savepoint_id");
37pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition<SavepointId, SerializedSavepoint> =
38 SystemTableDefinition::new("persistent_savepoints");
39pub(crate) const ALLOCATOR_STATE_TABLE_NAME: &str = "allocator_state";
42pub(crate) type AllocatorStateTree<'a> = BtreeMut<'a, AllocatorStateKey, &'static [u8]>;
43
44#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
45pub(crate) enum AllocatorStateKey {
46 Region(u32),
47 RegionTracker,
48 TransactionId,
49}
50
51impl Value for AllocatorStateKey {
52 type SelfType<'a> = Self;
53 type AsBytes<'a> = [u8; 1 + size_of::<u32>()];
54
55 fn fixed_width() -> Option<usize> {
56 Some(1 + size_of::<u32>())
57 }
58
59 fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
60 where
61 Self: 'a,
62 {
63 match data[0] {
64 0 => Self::Region(u32::from_le_bytes(data[1..].try_into().unwrap())),
65 1 => Self::RegionTracker,
66 2 => Self::TransactionId,
67 _ => unreachable!(),
68 }
69 }
70
71 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
72 where
73 Self: 'a,
74 Self: 'b,
75 {
76 let mut result = Self::AsBytes::default();
77 match value {
78 Self::Region(region) => {
79 result[0] = 0;
80 result[1..].copy_from_slice(&u32::to_le_bytes(*region));
81 }
82 Self::RegionTracker => {
83 result[0] = 1;
84 }
85 Self::TransactionId => {
86 result[0] = 2;
87 }
88 }
89
90 result
91 }
92
93 fn type_name() -> TypeName {
94 TypeName::internal("redb::AllocatorStateKey")
95 }
96}
97
98impl Key for AllocatorStateKey {
99 fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
100 Self::from_bytes(data1).cmp(&Self::from_bytes(data2))
101 }
102}
103
104pub struct SystemTableDefinition<'a, K: Key + 'static, V: Value + 'static> {
105 name: &'a str,
106 _key_type: PhantomData<K>,
107 _value_type: PhantomData<V>,
108}
109
110impl<'a, K: Key + 'static, V: Value + 'static> SystemTableDefinition<'a, K, V> {
111 pub const fn new(name: &'a str) -> Self {
112 assert!(!name.is_empty());
113 Self {
114 name,
115 _key_type: PhantomData,
116 _value_type: PhantomData,
117 }
118 }
119}
120
121impl<'a, K: Key + 'static, V: Value + 'static> TableHandle for SystemTableDefinition<'a, K, V> {
122 fn name(&self) -> &str {
123 self.name
124 }
125}
126
127impl<K: Key, V: Value> Sealed for SystemTableDefinition<'_, K, V> {}
128
129impl<'a, K: Key + 'static, V: Value + 'static> Clone for SystemTableDefinition<'a, K, V> {
130 fn clone(&self) -> Self {
131 *self
132 }
133}
134
135impl<'a, K: Key + 'static, V: Value + 'static> Copy for SystemTableDefinition<'a, K, V> {}
136
137impl<'a, K: Key + 'static, V: Value + 'static> Display for SystemTableDefinition<'a, K, V> {
138 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
139 write!(
140 f,
141 "{}<{}, {}>",
142 self.name,
143 K::type_name().name(),
144 V::type_name().name()
145 )
146 }
147}
148
149#[derive(Debug)]
151pub struct DatabaseStats {
152 pub(crate) tree_height: u32,
153 pub(crate) allocated_pages: u64,
154 pub(crate) leaf_pages: u64,
155 pub(crate) branch_pages: u64,
156 pub(crate) stored_leaf_bytes: u64,
157 pub(crate) metadata_bytes: u64,
158 pub(crate) fragmented_bytes: u64,
159 pub(crate) page_size: usize,
160}
161
162impl DatabaseStats {
163 pub fn tree_height(&self) -> u32 {
165 self.tree_height
166 }
167
168 pub fn allocated_pages(&self) -> u64 {
170 self.allocated_pages
171 }
172
173 pub fn leaf_pages(&self) -> u64 {
175 self.leaf_pages
176 }
177
178 pub fn branch_pages(&self) -> u64 {
180 self.branch_pages
181 }
182
183 pub fn stored_bytes(&self) -> u64 {
186 self.stored_leaf_bytes
187 }
188
189 pub fn metadata_bytes(&self) -> u64 {
191 self.metadata_bytes
192 }
193
194 pub fn fragmented_bytes(&self) -> u64 {
196 self.fragmented_bytes
197 }
198
199 pub fn page_size(&self) -> usize {
201 self.page_size
202 }
203}
204
205#[derive(Copy, Clone, Debug)]
206#[non_exhaustive]
207pub enum Durability {
208 None,
214 Eventual,
217 Immediate,
220 #[deprecated(since = "2.3.0", note = "use set_two_phase_commit(true) instead")]
223 Paranoid,
224}
225
226#[derive(Copy, Clone, Debug, PartialEq, Eq)]
229enum InternalDurability {
230 None,
231 Eventual,
232 Immediate,
233}
234
235pub struct SystemTable<'db, 's, K: Key + 'static, V: Value + 'static> {
237 name: String,
238 namespace: &'s mut SystemNamespace<'db>,
239 tree: BtreeMut<'s, K, V>,
240 transaction_guard: Arc<TransactionGuard>,
241}
242
243impl<'db, 's, K: Key + 'static, V: Value + 'static> SystemTable<'db, 's, K, V> {
244 fn new(
245 name: &str,
246 table_root: Option<BtreeHeader>,
247 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
248 guard: Arc<TransactionGuard>,
249 mem: Arc<TransactionalMemory>,
250 namespace: &'s mut SystemNamespace<'db>,
251 ) -> SystemTable<'db, 's, K, V> {
252 SystemTable {
253 name: name.to_string(),
254 namespace,
255 tree: BtreeMut::new(table_root, guard.clone(), mem, freed_pages),
256 transaction_guard: guard,
257 }
258 }
259
260 fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<V>>>
261 where
262 K: 'a,
263 {
264 self.tree.get(key.borrow())
265 }
266
267 fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<K, V>>
268 where
269 K: 'a,
270 KR: Borrow<K::SelfType<'a>> + 'a,
271 {
272 self.tree
273 .range(&range)
274 .map(|x| Range::new(x, self.transaction_guard.clone()))
275 }
276
277 pub fn insert<'k, 'v>(
278 &mut self,
279 key: impl Borrow<K::SelfType<'k>>,
280 value: impl Borrow<V::SelfType<'v>>,
281 ) -> Result<Option<AccessGuard<V>>> {
282 let value_len = V::as_bytes(value.borrow()).as_ref().len();
283 if value_len > MAX_VALUE_LENGTH {
284 return Err(StorageError::ValueTooLarge(value_len));
285 }
286 let key_len = K::as_bytes(key.borrow()).as_ref().len();
287 if key_len > MAX_VALUE_LENGTH {
288 return Err(StorageError::ValueTooLarge(key_len));
289 }
290 if value_len + key_len > MAX_PAIR_LENGTH {
291 return Err(StorageError::ValueTooLarge(value_len + key_len));
292 }
293 self.tree.insert(key.borrow(), value.borrow())
294 }
295
296 pub fn remove<'a>(
297 &mut self,
298 key: impl Borrow<K::SelfType<'a>>,
299 ) -> Result<Option<AccessGuard<V>>>
300 where
301 K: 'a,
302 {
303 self.tree.remove(key.borrow())
304 }
305}
306
307impl<'db, 's, K: Key + 'static, V: Value + 'static> Drop for SystemTable<'db, 's, K, V> {
308 fn drop(&mut self) {
309 self.namespace.close_table(
310 &self.name,
311 &self.tree,
312 self.tree.get_root().map(|x| x.length).unwrap_or_default(),
313 );
314 }
315}
316
317struct SystemNamespace<'db> {
318 table_tree: TableTreeMut<'db>,
319 transaction_guard: Arc<TransactionGuard>,
320}
321
322impl<'db> SystemNamespace<'db> {
323 fn open_system_table<'txn, 's, K: Key + 'static, V: Value + 'static>(
324 &'s mut self,
325 transaction: &'txn WriteTransaction,
326 definition: SystemTableDefinition<K, V>,
327 ) -> Result<SystemTable<'db, 's, K, V>> {
328 #[cfg(feature = "logging")]
329 debug!("Opening system table: {}", definition);
330 let (root, _) = self
331 .table_tree
332 .get_or_create_table::<K, V>(definition.name(), TableType::Normal)
333 .map_err(|e| {
334 e.into_storage_error_or_corrupted("Internal error. System table is corrupted")
335 })?;
336 transaction.dirty.store(true, Ordering::Release);
337
338 Ok(SystemTable::new(
339 definition.name(),
340 root,
341 transaction.freed_pages.clone(),
342 self.transaction_guard.clone(),
343 transaction.mem.clone(),
344 self,
345 ))
346 }
347
348 fn close_table<K: Key + 'static, V: Value + 'static>(
349 &mut self,
350 name: &str,
351 table: &BtreeMut<K, V>,
352 length: u64,
353 ) {
354 self.table_tree
355 .stage_update_table_root(name, table.get_root(), length);
356 }
357}
358
359struct TableNamespace<'db> {
360 open_tables: HashMap<String, &'static panic::Location<'static>>,
361 table_tree: TableTreeMut<'db>,
362}
363
364impl<'db> TableNamespace<'db> {
365 #[track_caller]
366 fn inner_open<K: Key + 'static, V: Value + 'static>(
367 &mut self,
368 name: &str,
369 table_type: TableType,
370 ) -> Result<(Option<BtreeHeader>, u64), TableError> {
371 if let Some(location) = self.open_tables.get(name) {
372 return Err(TableError::TableAlreadyOpen(name.to_string(), location));
373 }
374
375 let root = self
376 .table_tree
377 .get_or_create_table::<K, V>(name, table_type)?;
378 self.open_tables
379 .insert(name.to_string(), panic::Location::caller());
380
381 Ok(root)
382 }
383
384 #[track_caller]
385 pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
386 &mut self,
387 transaction: &'txn WriteTransaction,
388 definition: MultimapTableDefinition<K, V>,
389 ) -> Result<MultimapTable<'txn, K, V>, TableError> {
390 #[cfg(feature = "logging")]
391 debug!("Opening multimap table: {}", definition);
392 let (root, length) = self.inner_open::<K, V>(definition.name(), TableType::Multimap)?;
393 transaction.dirty.store(true, Ordering::Release);
394
395 Ok(MultimapTable::new(
396 definition.name(),
397 root,
398 length,
399 transaction.freed_pages.clone(),
400 transaction.mem.clone(),
401 transaction,
402 ))
403 }
404
405 #[track_caller]
406 pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
407 &mut self,
408 transaction: &'txn WriteTransaction,
409 definition: TableDefinition<K, V>,
410 ) -> Result<Table<'txn, K, V>, TableError> {
411 #[cfg(feature = "logging")]
412 debug!("Opening table: {}", definition);
413 let (root, _) = self.inner_open::<K, V>(definition.name(), TableType::Normal)?;
414 transaction.dirty.store(true, Ordering::Release);
415
416 Ok(Table::new(
417 definition.name(),
418 root,
419 transaction.freed_pages.clone(),
420 transaction.mem.clone(),
421 transaction,
422 ))
423 }
424
425 #[track_caller]
426 fn inner_delete(&mut self, name: &str, table_type: TableType) -> Result<bool, TableError> {
427 if let Some(location) = self.open_tables.get(name) {
428 return Err(TableError::TableAlreadyOpen(name.to_string(), location));
429 }
430
431 self.table_tree.delete_table(name, table_type)
432 }
433
434 #[track_caller]
435 fn delete_table(
436 &mut self,
437 transaction: &WriteTransaction,
438 name: &str,
439 ) -> Result<bool, TableError> {
440 #[cfg(feature = "logging")]
441 debug!("Deleting table: {}", name);
442 transaction.dirty.store(true, Ordering::Release);
443 self.inner_delete(name, TableType::Normal)
444 }
445
446 #[track_caller]
447 fn delete_multimap_table(
448 &mut self,
449 transaction: &WriteTransaction,
450 name: &str,
451 ) -> Result<bool, TableError> {
452 #[cfg(feature = "logging")]
453 debug!("Deleting multimap table: {}", name);
454 transaction.dirty.store(true, Ordering::Release);
455 self.inner_delete(name, TableType::Multimap)
456 }
457
458 pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
459 &mut self,
460 name: &str,
461 table: &BtreeMut<K, V>,
462 length: u64,
463 ) {
464 self.open_tables.remove(name).unwrap();
465 self.table_tree
466 .stage_update_table_root(name, table.get_root(), length);
467 }
468}
469
470pub struct WriteTransaction {
474 transaction_tracker: Arc<TransactionTracker>,
475 mem: Arc<TransactionalMemory>,
476 transaction_guard: Arc<TransactionGuard>,
477 transaction_id: TransactionId,
478 freed_tree: Mutex<BtreeMut<'static, FreedTableKey, FreedPageList<'static>>>,
481 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
482 post_commit_frees: Arc<Mutex<Vec<PageNumber>>>,
485 tables: Mutex<TableNamespace<'static>>,
486 system_tables: Mutex<SystemNamespace<'static>>,
487 completed: bool,
488 dirty: AtomicBool,
489 durability: InternalDurability,
490 two_phase_commit: bool,
491 quick_repair: bool,
492 created_persistent_savepoints: Mutex<HashSet<SavepointId>>,
494 deleted_persistent_savepoints: Mutex<Vec<(SavepointId, TransactionId)>>,
495}
496
497impl WriteTransaction {
498 pub(crate) fn new(
499 guard: TransactionGuard,
500 transaction_tracker: Arc<TransactionTracker>,
501 mem: Arc<TransactionalMemory>,
502 ) -> Result<Self> {
503 let transaction_id = guard.id();
504 let guard = Arc::new(guard);
505
506 let root_page = mem.get_data_root();
507 let system_page = mem.get_system_root();
508 let freed_root = mem.get_freed_root();
509 let freed_pages = Arc::new(Mutex::new(vec![]));
510 let post_commit_frees = Arc::new(Mutex::new(vec![]));
511
512 let tables = TableNamespace {
513 open_tables: Default::default(),
514 table_tree: TableTreeMut::new(
515 root_page,
516 guard.clone(),
517 mem.clone(),
518 freed_pages.clone(),
519 ),
520 };
521 let system_tables = SystemNamespace {
522 table_tree: TableTreeMut::new(
523 system_page,
524 guard.clone(),
525 mem.clone(),
526 freed_pages.clone(),
527 ),
528 transaction_guard: guard.clone(),
529 };
530
531 Ok(Self {
532 transaction_tracker,
533 mem: mem.clone(),
534 transaction_guard: guard.clone(),
535 transaction_id,
536 tables: Mutex::new(tables),
537 system_tables: Mutex::new(system_tables),
538 freed_tree: Mutex::new(BtreeMut::new(
539 freed_root,
540 guard,
541 mem,
542 post_commit_frees.clone(),
543 )),
544 freed_pages,
545 post_commit_frees,
546 completed: false,
547 dirty: AtomicBool::new(false),
548 durability: InternalDurability::Immediate,
549 two_phase_commit: false,
550 quick_repair: false,
551 created_persistent_savepoints: Mutex::new(Default::default()),
552 deleted_persistent_savepoints: Mutex::new(vec![]),
553 })
554 }
555
556 #[cfg(any(test, fuzzing))]
557 pub fn print_allocated_page_debug(&self) {
558 let mut all_allocated: HashSet<PageNumber> =
559 HashSet::from_iter(self.mem.all_allocated_pages());
560
561 let tracker = self.mem.tracker_page();
562 all_allocated.remove(&tracker);
563 println!("Tracker page");
564 println!("{tracker:?}");
565
566 let table_allocators = self
567 .tables
568 .lock()
569 .unwrap()
570 .table_tree
571 .all_referenced_pages()
572 .unwrap();
573 let mut table_pages = vec![];
574 for (i, allocator) in table_allocators.iter().enumerate() {
575 allocator.get_allocated_pages(i.try_into().unwrap(), &mut table_pages);
576 }
577 println!("Tables");
578 for p in table_pages {
579 all_allocated.remove(&p);
580 println!("{p:?}");
581 }
582
583 let system_table_allocators = self
584 .system_tables
585 .lock()
586 .unwrap()
587 .table_tree
588 .all_referenced_pages()
589 .unwrap();
590 let mut system_table_pages = vec![];
591 for (i, allocator) in system_table_allocators.iter().enumerate() {
592 allocator.get_allocated_pages(i.try_into().unwrap(), &mut system_table_pages);
593 }
594 println!("System tables");
595 for p in system_table_pages {
596 all_allocated.remove(&p);
597 println!("{p:?}");
598 }
599
600 println!("Free table");
601 if let Some(freed_iter) = self.freed_tree.lock().unwrap().all_pages_iter().unwrap() {
602 for p in freed_iter {
603 let p = p.unwrap();
604 all_allocated.remove(&p);
605 println!("{p:?}");
606 }
607 }
608 println!("Pending free (i.e. in freed table)");
609 for entry in self
610 .freed_tree
611 .lock()
612 .unwrap()
613 .range::<RangeFull, FreedTableKey>(&(..))
614 .unwrap()
615 {
616 let entry = entry.unwrap();
617 let value = entry.value();
618 for i in 0..value.len() {
619 let p = value.get(i);
620 all_allocated.remove(&p);
621 println!("{p:?}");
622 }
623 }
624 if !all_allocated.is_empty() {
625 println!("Leaked pages");
626 for p in all_allocated {
627 println!("{p:?}");
628 }
629 }
630 }
631
632 pub fn persistent_savepoint(&self) -> Result<u64, SavepointError> {
641 if self.durability != InternalDurability::Immediate {
642 return Err(SavepointError::InvalidSavepoint);
643 }
644
645 let mut savepoint = self.ephemeral_savepoint()?;
646
647 let mut system_tables = self.system_tables.lock().unwrap();
648
649 let mut next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
650 next_table.insert((), savepoint.get_id().next())?;
651 drop(next_table);
652
653 let mut savepoint_table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
654 savepoint_table.insert(
655 savepoint.get_id(),
656 SerializedSavepoint::from_savepoint(&savepoint),
657 )?;
658
659 savepoint.set_persistent();
660
661 self.created_persistent_savepoints
662 .lock()
663 .unwrap()
664 .insert(savepoint.get_id());
665
666 Ok(savepoint.get_id().0)
667 }
668
669 pub(crate) fn transaction_guard(&self) -> Arc<TransactionGuard> {
670 self.transaction_guard.clone()
671 }
672
673 pub(crate) fn next_persistent_savepoint_id(&self) -> Result<Option<SavepointId>> {
674 let mut system_tables = self.system_tables.lock().unwrap();
675 let next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
676 let value = next_table.get(())?;
677 if let Some(next_id) = value {
678 Ok(Some(next_id.value()))
679 } else {
680 Ok(None)
681 }
682 }
683
684 pub fn get_persistent_savepoint(&self, id: u64) -> Result<Savepoint, SavepointError> {
686 let mut system_tables = self.system_tables.lock().unwrap();
687 let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
688 let value = table.get(SavepointId(id))?;
689
690 value
691 .map(|x| x.value().to_savepoint(self.transaction_tracker.clone()))
692 .ok_or(SavepointError::InvalidSavepoint)
693 }
694
695 pub fn delete_persistent_savepoint(&self, id: u64) -> Result<bool, SavepointError> {
702 if self.durability != InternalDurability::Immediate {
703 return Err(SavepointError::InvalidSavepoint);
704 }
705 let mut system_tables = self.system_tables.lock().unwrap();
706 let mut table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
707 let savepoint = table.remove(SavepointId(id))?;
708 if let Some(serialized) = savepoint {
709 let savepoint = serialized
710 .value()
711 .to_savepoint(self.transaction_tracker.clone());
712 self.deleted_persistent_savepoints
713 .lock()
714 .unwrap()
715 .push((savepoint.get_id(), savepoint.get_transaction_id()));
716 Ok(true)
717 } else {
718 Ok(false)
719 }
720 }
721
722 pub fn list_persistent_savepoints(&self) -> Result<impl Iterator<Item = u64>> {
724 let mut system_tables = self.system_tables.lock().unwrap();
725 let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
726 let mut savepoints = vec![];
727 for savepoint in table.range::<SavepointId>(..)? {
728 savepoints.push(savepoint?.0.value().0);
729 }
730 Ok(savepoints.into_iter())
731 }
732
733 fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
735 let id = self
736 .transaction_tracker
737 .register_read_transaction(&self.mem)?;
738
739 Ok(TransactionGuard::new_read(
740 id,
741 self.transaction_tracker.clone(),
742 ))
743 }
744
745 fn allocate_savepoint(&self) -> Result<(SavepointId, TransactionId)> {
746 let id = self.transaction_tracker.allocate_savepoint();
747 Ok((id, self.allocate_read_transaction()?.leak()))
748 }
749
750 pub fn ephemeral_savepoint(&self) -> Result<Savepoint, SavepointError> {
756 if self.dirty.load(Ordering::Acquire) {
757 return Err(SavepointError::InvalidSavepoint);
758 }
759
760 let (id, transaction_id) = self.allocate_savepoint()?;
761 #[cfg(feature = "logging")]
762 debug!(
763 "Creating savepoint id={:?}, txn_id={:?}",
764 id, transaction_id
765 );
766
767 let regional_allocators = self.mem.get_raw_allocator_states();
768 let root = self.mem.get_data_root();
769 let system_root = self.mem.get_system_root();
770 let freed_root = self.mem.get_freed_root();
771 let savepoint = Savepoint::new_ephemeral(
772 &self.mem,
773 self.transaction_tracker.clone(),
774 id,
775 transaction_id,
776 root,
777 system_root,
778 freed_root,
779 regional_allocators,
780 );
781
782 Ok(savepoint)
783 }
784
785 pub fn restore_savepoint(&mut self, savepoint: &Savepoint) -> Result<(), SavepointError> {
789 assert_eq!(
791 std::ptr::from_ref(self.transaction_tracker.as_ref()),
792 savepoint.db_address()
793 );
794
795 if !self
796 .transaction_tracker
797 .is_valid_savepoint(savepoint.get_id())
798 {
799 return Err(SavepointError::InvalidSavepoint);
800 }
801 #[cfg(feature = "logging")]
802 debug!(
803 "Beginning savepoint restore (id={:?}) in transaction id={:?}",
804 savepoint.get_id(),
805 self.transaction_id
806 );
807 assert_eq!(self.mem.get_version(), savepoint.get_version());
810 self.dirty.store(true, Ordering::Release);
811
812 let old_table_tree = TableTreeMut::new(
821 savepoint.get_user_root(),
822 self.transaction_guard.clone(),
823 self.mem.clone(),
824 self.freed_pages.clone(),
825 );
826 let current_root_pages = self
828 .tables
829 .lock()
830 .unwrap()
831 .table_tree
832 .all_referenced_pages()?;
833 let old_root_pages = old_table_tree.all_referenced_pages()?;
834
835 self.tables.lock().unwrap().table_tree = TableTreeMut::new(
837 savepoint.get_user_root(),
838 self.transaction_guard.clone(),
839 self.mem.clone(),
840 self.freed_pages.clone(),
841 );
842
843 let mut txn_id = savepoint.get_transaction_id().raw_id();
845 let mut freed_tree = self.freed_tree.lock().unwrap();
846 loop {
847 let lower = FreedTableKey {
848 transaction_id: txn_id,
849 pagination_id: 0,
850 };
851
852 if freed_tree.range(&(lower..))?.next().is_none() {
853 break;
854 }
855 let lower = FreedTableKey {
856 transaction_id: txn_id,
857 pagination_id: 0,
858 };
859 let upper = FreedTableKey {
860 transaction_id: txn_id + 1,
861 pagination_id: 0,
862 };
863
864 let mut pending_pages = vec![];
866 for entry in freed_tree.extract_from_if(&(lower..upper), |_, _| true)? {
867 let item = entry?;
868 for i in 0..item.value().len() {
869 let p = item.value().get(i);
870 if !old_root_pages[p.region as usize].is_allocated(p.page_index, p.page_order) {
871 pending_pages.push(p);
872 }
873 }
874 }
875
876 let mut pagination_counter = 0u64;
877 while !pending_pages.is_empty() {
878 let chunk_size = 100;
879 let buffer_size = FreedPageList::required_bytes(chunk_size);
880 let key = FreedTableKey {
881 transaction_id: txn_id,
882 pagination_id: pagination_counter,
883 };
884 let mut access_guard =
885 freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?;
886
887 let len = pending_pages.len();
888 access_guard.as_mut().clear();
889 for page in pending_pages.drain(len - min(len, chunk_size)..) {
890 access_guard.as_mut().push_back(page);
891 }
892 drop(access_guard);
893
894 pagination_counter += 1;
895 }
896
897 txn_id += 1;
898 }
899
900 let mut freed_pages = self.freed_pages.lock().unwrap();
902 for i in 0..current_root_pages.len() {
903 let mut pages = vec![];
904 current_root_pages[i].difference(i.try_into().unwrap(), &old_root_pages[i], &mut pages);
905 for page in pages {
906 if self.mem.uncommitted(page) {
907 self.mem.free(page);
908 } else {
909 freed_pages.push(page);
910 }
911 }
912 }
913 drop(freed_pages);
914
915 self.transaction_tracker
918 .invalidate_savepoints_after(savepoint.get_id());
919 for persistent_savepoint in self.list_persistent_savepoints()? {
920 if persistent_savepoint > savepoint.get_id().0 {
921 self.delete_persistent_savepoint(persistent_savepoint)?;
922 }
923 }
924
925 Ok(())
926 }
927
928 pub fn set_durability(&mut self, durability: Durability) {
933 let no_created = self
934 .created_persistent_savepoints
935 .lock()
936 .unwrap()
937 .is_empty();
938 let no_deleted = self
939 .deleted_persistent_savepoints
940 .lock()
941 .unwrap()
942 .is_empty();
943 assert!(no_created && no_deleted);
944
945 self.durability = match durability {
946 Durability::None => InternalDurability::None,
947 Durability::Eventual => InternalDurability::Eventual,
948 Durability::Immediate => InternalDurability::Immediate,
949 #[allow(deprecated)]
950 Durability::Paranoid => {
951 self.set_two_phase_commit(true);
952 InternalDurability::Immediate
953 }
954 };
955 }
956
957 pub fn set_two_phase_commit(&mut self, enabled: bool) {
997 self.two_phase_commit = enabled;
998 }
999
1000 pub fn set_quick_repair(&mut self, enabled: bool) {
1011 self.quick_repair = enabled;
1012 }
1013
1014 #[track_caller]
1018 pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
1019 &'txn self,
1020 definition: TableDefinition<K, V>,
1021 ) -> Result<Table<'txn, K, V>, TableError> {
1022 self.tables.lock().unwrap().open_table(self, definition)
1023 }
1024
1025 #[track_caller]
1029 pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
1030 &'txn self,
1031 definition: MultimapTableDefinition<K, V>,
1032 ) -> Result<MultimapTable<'txn, K, V>, TableError> {
1033 self.tables
1034 .lock()
1035 .unwrap()
1036 .open_multimap_table(self, definition)
1037 }
1038
1039 pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
1040 &self,
1041 name: &str,
1042 table: &BtreeMut<K, V>,
1043 length: u64,
1044 ) {
1045 self.tables.lock().unwrap().close_table(name, table, length);
1046 }
1047
1048 pub fn delete_table(&self, definition: impl TableHandle) -> Result<bool, TableError> {
1052 let name = definition.name().to_string();
1053 drop(definition);
1055 self.tables.lock().unwrap().delete_table(self, &name)
1056 }
1057
1058 pub fn delete_multimap_table(
1062 &self,
1063 definition: impl MultimapTableHandle,
1064 ) -> Result<bool, TableError> {
1065 let name = definition.name().to_string();
1066 drop(definition);
1068 self.tables
1069 .lock()
1070 .unwrap()
1071 .delete_multimap_table(self, &name)
1072 }
1073
1074 pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle> + '_> {
1076 self.tables
1077 .lock()
1078 .unwrap()
1079 .table_tree
1080 .list_tables(TableType::Normal)
1081 .map(|x| x.into_iter().map(UntypedTableHandle::new))
1082 }
1083
1084 pub fn list_multimap_tables(
1086 &self,
1087 ) -> Result<impl Iterator<Item = UntypedMultimapTableHandle> + '_> {
1088 self.tables
1089 .lock()
1090 .unwrap()
1091 .table_tree
1092 .list_tables(TableType::Multimap)
1093 .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
1094 }
1095
1096 pub fn commit(mut self) -> Result<(), CommitError> {
1101 self.completed = true;
1103 self.commit_inner()
1104 }
1105
1106 fn commit_inner(&mut self) -> Result<(), CommitError> {
1107 if self.quick_repair {
1109 self.two_phase_commit = true;
1110 }
1111
1112 #[cfg(feature = "logging")]
1113 debug!(
1114 "Committing transaction id={:?} with durability={:?} two_phase={} quick_repair={}",
1115 self.transaction_id, self.durability, self.two_phase_commit, self.quick_repair
1116 );
1117 match self.durability {
1118 InternalDurability::None => self.non_durable_commit()?,
1119 InternalDurability::Eventual => self.durable_commit(true)?,
1120 InternalDurability::Immediate => self.durable_commit(false)?,
1121 }
1122
1123 for (savepoint, transaction) in self.deleted_persistent_savepoints.lock().unwrap().iter() {
1124 self.transaction_tracker
1125 .deallocate_savepoint(*savepoint, *transaction);
1126 }
1127
1128 #[cfg(feature = "logging")]
1129 debug!(
1130 "Finished commit of transaction id={:?}",
1131 self.transaction_id
1132 );
1133
1134 Ok(())
1135 }
1136
1137 pub fn abort(mut self) -> Result {
1141 self.completed = true;
1143 self.abort_inner()
1144 }
1145
1146 fn abort_inner(&mut self) -> Result {
1147 #[cfg(feature = "logging")]
1148 debug!("Aborting transaction id={:?}", self.transaction_id);
1149 for savepoint in self.created_persistent_savepoints.lock().unwrap().iter() {
1150 match self.delete_persistent_savepoint(savepoint.0) {
1151 Ok(_) => {}
1152 Err(err) => match err {
1153 SavepointError::InvalidSavepoint => {
1154 unreachable!();
1155 }
1156 SavepointError::Storage(storage_err) => {
1157 return Err(storage_err);
1158 }
1159 },
1160 }
1161 }
1162 self.tables
1163 .lock()
1164 .unwrap()
1165 .table_tree
1166 .clear_table_root_updates();
1167 self.mem.rollback_uncommitted_writes()?;
1168 #[cfg(feature = "logging")]
1169 debug!("Finished abort of transaction id={:?}", self.transaction_id);
1170 Ok(())
1171 }
1172
1173 pub(crate) fn durable_commit(&mut self, eventual: bool) -> Result {
1174 let free_until_transaction = self
1175 .transaction_tracker
1176 .oldest_live_read_transaction()
1177 .map_or(self.transaction_id, |x| x.next());
1178 self.process_freed_pages(free_until_transaction)?;
1179
1180 let user_root = self
1181 .tables
1182 .lock()
1183 .unwrap()
1184 .table_tree
1185 .flush_table_root_updates()?
1186 .finalize_dirty_checksums()?;
1187
1188 let mut system_tables = self.system_tables.lock().unwrap();
1189 let system_tree = system_tables.table_tree.flush_table_root_updates()?;
1190 system_tree
1191 .delete_table(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
1192 .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?;
1193
1194 if self.quick_repair {
1195 system_tree.create_table_and_flush_table_root(
1196 ALLOCATOR_STATE_TABLE_NAME,
1197 |tree: &mut AllocatorStateTree| {
1198 let mut pagination_counter = 0;
1199
1200 loop {
1201 let num_regions = self
1202 .mem
1203 .reserve_allocator_state(tree, self.transaction_id)?;
1204
1205 self.store_freed_pages(&mut pagination_counter, true)?;
1209
1210 if self.mem.try_save_allocator_state(tree, num_regions)? {
1211 return Ok(());
1212 }
1213
1214 while let Some(guards) = tree.last()? {
1219 let key = guards.0.value();
1220 drop(guards);
1221 tree.remove(&key)?;
1222 }
1223 }
1224 },
1225 )?;
1226 } else {
1227 let savepoint_exists = self.transaction_tracker.any_savepoint_exists();
1232 self.store_freed_pages(&mut 0, savepoint_exists)?;
1233 }
1234
1235 let system_root = system_tree.finalize_dirty_checksums()?;
1236
1237 let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?;
1239
1240 self.mem.commit(
1241 user_root,
1242 system_root,
1243 freed_root,
1244 self.transaction_id,
1245 eventual,
1246 self.two_phase_commit,
1247 )?;
1248
1249 self.transaction_tracker.clear_pending_non_durable_commits();
1251
1252 for page in self.post_commit_frees.lock().unwrap().drain(..) {
1255 self.mem.free(page);
1256 }
1257
1258 Ok(())
1259 }
1260
1261 pub(crate) fn non_durable_commit(&mut self) -> Result {
1263 let user_root = self
1264 .tables
1265 .lock()
1266 .unwrap()
1267 .table_tree
1268 .flush_table_root_updates()?
1269 .finalize_dirty_checksums()?;
1270
1271 let system_root = self
1272 .system_tables
1273 .lock()
1274 .unwrap()
1275 .table_tree
1276 .flush_table_root_updates()?
1277 .finalize_dirty_checksums()?;
1278
1279 self.store_freed_pages(&mut 0, true)?;
1282
1283 let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?;
1285
1286 self.mem
1287 .non_durable_commit(user_root, system_root, freed_root, self.transaction_id)?;
1288 self.transaction_tracker
1291 .register_non_durable_commit(self.transaction_id);
1292 Ok(())
1293 }
1294
1295 pub(crate) fn compact_pages(&mut self) -> Result<bool> {
1298 let mut progress = false;
1299 if self.mem.relocate_region_tracker()? {
1301 progress = true;
1302 }
1303
1304 let mut highest_pages = BTreeMap::new();
1306 let mut tables = self.tables.lock().unwrap();
1307 let table_tree = &mut tables.table_tree;
1308 table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
1309 let mut system_tables = self.system_tables.lock().unwrap();
1310 let system_table_tree = &mut system_tables.table_tree;
1311 system_table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
1312
1313 let mut relocation_map = HashMap::new();
1315 for path in highest_pages.into_values().rev() {
1316 if relocation_map.contains_key(&path.page_number()) {
1317 continue;
1318 }
1319 let old_page = self.mem.get_page(path.page_number())?;
1320 let mut new_page = self.mem.allocate_lowest(old_page.memory().len())?;
1321 let new_page_number = new_page.get_page_number();
1322 new_page.memory_mut()[0] = old_page.memory()[0];
1325 drop(new_page);
1326 if new_page_number < path.page_number() {
1328 relocation_map.insert(path.page_number(), new_page_number);
1329 for parent in path.parents() {
1330 if relocation_map.contains_key(parent) {
1331 continue;
1332 }
1333 let old_parent = self.mem.get_page(*parent)?;
1334 let mut new_page = self.mem.allocate_lowest(old_parent.memory().len())?;
1335 let new_page_number = new_page.get_page_number();
1336 new_page.memory_mut()[0] = old_parent.memory()[0];
1339 drop(new_page);
1340 relocation_map.insert(*parent, new_page_number);
1341 }
1342 } else {
1343 self.mem.free(new_page_number);
1344 break;
1345 }
1346 }
1347
1348 if !relocation_map.is_empty() {
1349 progress = true;
1350 }
1351
1352 table_tree.relocate_tables(&relocation_map)?;
1353 system_table_tree.relocate_tables(&relocation_map)?;
1354
1355 Ok(progress)
1356 }
1357
1358 fn process_freed_pages(&mut self, free_until: TransactionId) -> Result {
1361 assert_eq!(PageNumber::serialized_size(), 8);
1363 let lookup_key = FreedTableKey {
1364 transaction_id: free_until.raw_id(),
1365 pagination_id: 0,
1366 };
1367
1368 let mut to_remove = vec![];
1369 let mut freed_tree = self.freed_tree.lock().unwrap();
1370 for entry in freed_tree.range(&(..lookup_key))? {
1371 let entry = entry?;
1372 to_remove.push(entry.key());
1373 let value = entry.value();
1374 for i in 0..value.len() {
1375 self.mem.free(value.get(i));
1376 }
1377 }
1378
1379 for key in to_remove {
1381 freed_tree.remove(&key)?;
1382 }
1383
1384 Ok(())
1385 }
1386
1387 fn store_freed_pages(
1388 &self,
1389 pagination_counter: &mut u64,
1390 include_post_commit_free: bool,
1391 ) -> Result {
1392 assert_eq!(PageNumber::serialized_size(), 8); let mut freed_tree = self.freed_tree.lock().unwrap();
1395 if include_post_commit_free {
1396 self.freed_pages
1399 .lock()
1400 .unwrap()
1401 .extend(self.post_commit_frees.lock().unwrap().drain(..));
1402 }
1403 while !self.freed_pages.lock().unwrap().is_empty() {
1404 let chunk_size = 100;
1405 let buffer_size = FreedPageList::required_bytes(chunk_size);
1406 let key = FreedTableKey {
1407 transaction_id: self.transaction_id.raw_id(),
1408 pagination_id: *pagination_counter,
1409 };
1410 let mut access_guard =
1411 freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?;
1412
1413 let mut freed_pages = self.freed_pages.lock().unwrap();
1414 let len = freed_pages.len();
1415 access_guard.as_mut().clear();
1416 for page in freed_pages.drain(len - min(len, chunk_size)..) {
1417 access_guard.as_mut().push_back(page);
1418 }
1419 drop(access_guard);
1420
1421 *pagination_counter += 1;
1422
1423 if include_post_commit_free {
1424 freed_pages.extend(self.post_commit_frees.lock().unwrap().drain(..));
1427 }
1428 }
1429
1430 Ok(())
1431 }
1432
1433 pub fn stats(&self) -> Result<DatabaseStats> {
1435 let tables = self.tables.lock().unwrap();
1436 let table_tree = &tables.table_tree;
1437 let data_tree_stats = table_tree.stats()?;
1438
1439 let system_tables = self.system_tables.lock().unwrap();
1440 let system_table_tree = &system_tables.table_tree;
1441 let system_tree_stats = system_table_tree.stats()?;
1442
1443 let freed_tree_stats = self.freed_tree.lock().unwrap().stats()?;
1444
1445 let total_metadata_bytes = data_tree_stats.metadata_bytes()
1446 + system_tree_stats.metadata_bytes
1447 + system_tree_stats.stored_leaf_bytes
1448 + freed_tree_stats.metadata_bytes
1449 + freed_tree_stats.stored_leaf_bytes;
1450 let total_fragmented = data_tree_stats.fragmented_bytes()
1451 + system_tree_stats.fragmented_bytes
1452 + freed_tree_stats.fragmented_bytes;
1453
1454 Ok(DatabaseStats {
1455 tree_height: data_tree_stats.tree_height(),
1456 allocated_pages: self.mem.count_allocated_pages()?,
1457 leaf_pages: data_tree_stats.leaf_pages(),
1458 branch_pages: data_tree_stats.branch_pages(),
1459 stored_leaf_bytes: data_tree_stats.stored_bytes(),
1460 metadata_bytes: total_metadata_bytes,
1461 fragmented_bytes: total_fragmented,
1462 page_size: self.mem.get_page_size(),
1463 })
1464 }
1465
1466 #[cfg(any(test, fuzzing))]
1467 pub fn num_region_tracker_pages(&self) -> u64 {
1468 1 << self.mem.tracker_page().page_order
1469 }
1470
1471 #[allow(dead_code)]
1472 pub(crate) fn print_debug(&self) -> Result {
1473 let mut tables = self.tables.lock().unwrap();
1475 if let Some(page) = tables
1476 .table_tree
1477 .flush_table_root_updates()
1478 .unwrap()
1479 .finalize_dirty_checksums()
1480 .unwrap()
1481 {
1482 eprintln!("Master tree:");
1483 let master_tree: Btree<&str, InternalTableDefinition> = Btree::new(
1484 Some(page),
1485 PageHint::None,
1486 self.transaction_guard.clone(),
1487 self.mem.clone(),
1488 )?;
1489 master_tree.print_debug(true)?;
1490 }
1491
1492 Ok(())
1493 }
1494}
1495
1496impl Drop for WriteTransaction {
1497 fn drop(&mut self) {
1498 if !self.completed && !thread::panicking() && !self.mem.storage_failure() {
1499 #[allow(unused_variables)]
1500 if let Err(error) = self.abort_inner() {
1501 #[cfg(feature = "logging")]
1502 warn!("Failure automatically aborting transaction: {}", error);
1503 }
1504 }
1505 }
1506}
1507
1508pub struct ReadTransaction {
1512 mem: Arc<TransactionalMemory>,
1513 tree: TableTree,
1514}
1515
1516impl ReadTransaction {
1517 pub(crate) fn new(
1518 mem: Arc<TransactionalMemory>,
1519 guard: TransactionGuard,
1520 ) -> Result<Self, TransactionError> {
1521 let root_page = mem.get_data_root();
1522 let guard = Arc::new(guard);
1523 Ok(Self {
1524 mem: mem.clone(),
1525 tree: TableTree::new(root_page, PageHint::Clean, guard, mem)
1526 .map_err(TransactionError::Storage)?,
1527 })
1528 }
1529
1530 pub fn open_table<K: Key + 'static, V: Value + 'static>(
1532 &self,
1533 definition: TableDefinition<K, V>,
1534 ) -> Result<ReadOnlyTable<K, V>, TableError> {
1535 let header = self
1536 .tree
1537 .get_table::<K, V>(definition.name(), TableType::Normal)?
1538 .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
1539
1540 match header {
1541 InternalTableDefinition::Normal { table_root, .. } => Ok(ReadOnlyTable::new(
1542 definition.name().to_string(),
1543 table_root,
1544 PageHint::Clean,
1545 self.tree.transaction_guard().clone(),
1546 self.mem.clone(),
1547 )?),
1548 InternalTableDefinition::Multimap { .. } => unreachable!(),
1549 }
1550 }
1551
1552 pub fn open_untyped_table(
1554 &self,
1555 handle: impl TableHandle,
1556 ) -> Result<ReadOnlyUntypedTable, TableError> {
1557 let header = self
1558 .tree
1559 .get_table_untyped(handle.name(), TableType::Normal)?
1560 .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
1561
1562 match header {
1563 InternalTableDefinition::Normal {
1564 table_root,
1565 fixed_key_size,
1566 fixed_value_size,
1567 ..
1568 } => Ok(ReadOnlyUntypedTable::new(
1569 table_root,
1570 fixed_key_size,
1571 fixed_value_size,
1572 self.mem.clone(),
1573 )),
1574 InternalTableDefinition::Multimap { .. } => unreachable!(),
1575 }
1576 }
1577
1578 pub fn open_multimap_table<K: Key + 'static, V: Key + 'static>(
1580 &self,
1581 definition: MultimapTableDefinition<K, V>,
1582 ) -> Result<ReadOnlyMultimapTable<K, V>, TableError> {
1583 let header = self
1584 .tree
1585 .get_table::<K, V>(definition.name(), TableType::Multimap)?
1586 .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
1587
1588 match header {
1589 InternalTableDefinition::Normal { .. } => unreachable!(),
1590 InternalTableDefinition::Multimap {
1591 table_root,
1592 table_length,
1593 ..
1594 } => Ok(ReadOnlyMultimapTable::new(
1595 table_root,
1596 table_length,
1597 PageHint::Clean,
1598 self.tree.transaction_guard().clone(),
1599 self.mem.clone(),
1600 )?),
1601 }
1602 }
1603
1604 pub fn open_untyped_multimap_table(
1606 &self,
1607 handle: impl MultimapTableHandle,
1608 ) -> Result<ReadOnlyUntypedMultimapTable, TableError> {
1609 let header = self
1610 .tree
1611 .get_table_untyped(handle.name(), TableType::Multimap)?
1612 .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
1613
1614 match header {
1615 InternalTableDefinition::Normal { .. } => unreachable!(),
1616 InternalTableDefinition::Multimap {
1617 table_root,
1618 table_length,
1619 fixed_key_size,
1620 fixed_value_size,
1621 ..
1622 } => Ok(ReadOnlyUntypedMultimapTable::new(
1623 table_root,
1624 table_length,
1625 fixed_key_size,
1626 fixed_value_size,
1627 self.mem.clone(),
1628 )),
1629 }
1630 }
1631
1632 pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle>> {
1634 self.tree
1635 .list_tables(TableType::Normal)
1636 .map(|x| x.into_iter().map(UntypedTableHandle::new))
1637 }
1638
1639 pub fn list_multimap_tables(&self) -> Result<impl Iterator<Item = UntypedMultimapTableHandle>> {
1641 self.tree
1642 .list_tables(TableType::Multimap)
1643 .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
1644 }
1645
1646 pub fn close(self) -> Result<(), TransactionError> {
1654 if Arc::strong_count(self.tree.transaction_guard()) > 1 {
1655 return Err(TransactionError::ReadTransactionStillInUse(self));
1656 }
1657 Ok(())
1659 }
1660}
1661
1662impl Debug for ReadTransaction {
1663 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1664 f.write_str("ReadTransaction")
1665 }
1666}
1667
1668#[cfg(test)]
1669mod test {
1670 use crate::{Database, TableDefinition};
1671
1672 const X: TableDefinition<&str, &str> = TableDefinition::new("x");
1673
1674 #[test]
1675 fn transaction_id_persistence() {
1676 let tmpfile = crate::create_tempfile();
1677 let db = Database::create(tmpfile.path()).unwrap();
1678 let write_txn = db.begin_write().unwrap();
1679 {
1680 let mut table = write_txn.open_table(X).unwrap();
1681 table.insert("hello", "world").unwrap();
1682 }
1683 let first_txn_id = write_txn.transaction_id;
1684 write_txn.commit().unwrap();
1685 drop(db);
1686
1687 let db2 = Database::create(tmpfile.path()).unwrap();
1688 let write_txn = db2.begin_write().unwrap();
1689 assert!(write_txn.transaction_id > first_txn_id);
1690 }
1691}