1use crate::db::TransactionGuard;
2use crate::error::CommitError;
3use crate::multimap_table::ReadOnlyUntypedMultimapTable;
4use crate::sealed::Sealed;
5use crate::table::ReadOnlyUntypedTable;
6use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
7use crate::tree_store::{
8 Btree, BtreeHeader, BtreeMut, BuddyAllocator, FreedPageList, FreedTableKey,
9 InternalTableDefinition, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, Page, PageHint, PageListMut,
10 PageNumber, PageTrackerPolicy, SerializedSavepoint, TableTree, TableTreeMut, TableType,
11 TransactionalMemory,
12};
13use crate::types::{Key, Value};
14use crate::{
15 AccessGuard, AccessGuardMut, ExtractIf, MultimapTable, MultimapTableDefinition,
16 MultimapTableHandle, MutInPlaceValue, Range, ReadOnlyMultimapTable, ReadOnlyTable, Result,
17 Savepoint, SavepointError, StorageError, Table, TableDefinition, TableError, TableHandle,
18 TransactionError, TypeName, UntypedMultimapTableHandle, UntypedTableHandle,
19};
20#[cfg(feature = "logging")]
21use log::{debug, warn};
22use std::borrow::Borrow;
23use std::cmp::min;
24use std::collections::{BTreeMap, HashMap, HashSet};
25use std::fmt::{Debug, Display, Formatter};
26use std::marker::PhantomData;
27use std::mem::size_of;
28use std::ops::RangeBounds;
29use std::ops::RangeFull;
30use std::sync::atomic::{AtomicBool, Ordering};
31use std::sync::{Arc, Mutex};
32use std::{panic, thread};
33
// Page-count limit associated with compaction.
// NOTE(review): the code that consumes this limit is outside this section; confirm how it is applied.
const MAX_PAGES_PER_COMPACTION: usize = 1_000_000;
// System table "next_savepoint_id": a single entry stored under the unit key `()` holding a
// `SavepointId` (written by `persistent_savepoint`, read by `next_persistent_savepoint_id`).
const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> =
    SystemTableDefinition::new("next_savepoint_id");
// System table "persistent_savepoints": maps a `SavepointId` to its `SerializedSavepoint`.
pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition<SavepointId, SerializedSavepoint> =
    SystemTableDefinition::new("persistent_savepoints");
// System table "data_pages_allocated": page lists keyed by (transaction id, pagination id).
pub(crate) const DATA_ALLOCATED_TABLE: SystemTableDefinition<
    TransactionIdWithPagination,
    PageList,
> = SystemTableDefinition::new("data_pages_allocated");
// System table "data_pages_unreachable": page lists keyed by (transaction id, pagination id).
pub(crate) const DATA_FREED_TABLE: SystemTableDefinition<TransactionIdWithPagination, PageList> =
    SystemTableDefinition::new("data_pages_unreachable");
// System table "system_pages_unreachable": page lists keyed by (transaction id, pagination id).
pub(crate) const SYSTEM_FREED_TABLE: SystemTableDefinition<TransactionIdWithPagination, PageList> =
    SystemTableDefinition::new("system_pages_unreachable");
// Name of the allocator-state table; only the name is defined here, not a typed definition.
pub(crate) const ALLOCATOR_STATE_TABLE_NAME: &str = "allocator_state";
// Mutable B-tree keyed by `AllocatorStateKey` whose values are raw byte slices.
pub(crate) type AllocatorStateTree<'a> = BtreeMut<'a, AllocatorStateKey, &'static [u8]>;
// Mutable B-tree with the same key/value types as `SYSTEM_FREED_TABLE`.
pub(crate) type SystemFreedTree<'a> = BtreeMut<'a, TransactionIdWithPagination, PageList<'static>>;
58
59#[derive(Debug)]
63pub(crate) struct PageList<'a> {
64 data: &'a [u8],
65}
66
67impl PageList<'_> {
68 fn required_bytes(len: usize) -> usize {
69 2 + PageNumber::serialized_size() * len
70 }
71
72 pub(crate) fn len(&self) -> usize {
73 u16::from_le_bytes(self.data[..size_of::<u16>()].try_into().unwrap()).into()
74 }
75
76 pub(crate) fn get(&self, index: usize) -> PageNumber {
77 let start = size_of::<u16>() + PageNumber::serialized_size() * index;
78 PageNumber::from_le_bytes(
79 self.data[start..(start + PageNumber::serialized_size())]
80 .try_into()
81 .unwrap(),
82 )
83 }
84}
85
86impl Value for PageList<'_> {
87 type SelfType<'a>
88 = PageList<'a>
89 where
90 Self: 'a;
91 type AsBytes<'a>
92 = &'a [u8]
93 where
94 Self: 'a;
95
96 fn fixed_width() -> Option<usize> {
97 None
98 }
99
100 fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
101 where
102 Self: 'a,
103 {
104 PageList { data }
105 }
106
107 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'b [u8]
108 where
109 Self: 'b,
110 {
111 value.data
112 }
113
114 fn type_name() -> TypeName {
115 TypeName::internal("redb::PageList")
116 }
117}
118
// In-place mutable access to a stored page list, exposed through `PageListMut`.
impl MutInPlaceValue for PageList<'_> {
    type BaseRefType = PageListMut;

    // Zeroes the first 8 bytes of a freshly reserved value. The `u16` count prefix read by
    // `PageList::len` lives in the first two of those bytes, so the list starts out empty.
    // Panics if `data` is shorter than 8 bytes.
    fn initialize(data: &mut [u8]) {
        assert!(data.len() >= 8);
        data[..8].fill(0);
    }

    // Reinterprets the raw value bytes as a `PageListMut` without copying.
    fn from_bytes_mut(data: &mut [u8]) -> &mut Self::BaseRefType {
        // SAFETY / NOTE(review): this pointer cast is only sound if `PageListMut` has the same
        // layout as `[u8]` (e.g. a transparent wrapper). Its definition lives in `tree_store`
        // and is not visible here — confirm before relying on this.
        unsafe { &mut *(std::ptr::from_mut::<[u8]>(data) as *mut PageListMut) }
    }
}
132
// Composite table key: a transaction id plus a pagination id.
// Keys order by `transaction_id` first and `pagination_id` second (see the `Key` impl).
// NOTE(review): presumably the pagination id lets one transaction spread its pages over
// several entries — confirm against the code that writes these keys.
#[derive(Debug)]
pub(crate) struct TransactionIdWithPagination {
    pub(crate) transaction_id: u64,
    pub(crate) pagination_id: u64,
}
138
139impl Value for TransactionIdWithPagination {
140 type SelfType<'a>
141 = TransactionIdWithPagination
142 where
143 Self: 'a;
144 type AsBytes<'a>
145 = [u8; 2 * size_of::<u64>()]
146 where
147 Self: 'a;
148
149 fn fixed_width() -> Option<usize> {
150 Some(2 * size_of::<u64>())
151 }
152
153 fn from_bytes<'a>(data: &'a [u8]) -> Self
154 where
155 Self: 'a,
156 {
157 let transaction_id = u64::from_le_bytes(data[..size_of::<u64>()].try_into().unwrap());
158 let pagination_id = u64::from_le_bytes(data[size_of::<u64>()..].try_into().unwrap());
159 Self {
160 transaction_id,
161 pagination_id,
162 }
163 }
164
165 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; 2 * size_of::<u64>()]
166 where
167 Self: 'b,
168 {
169 let mut result = [0u8; 2 * size_of::<u64>()];
170 result[..size_of::<u64>()].copy_from_slice(&value.transaction_id.to_le_bytes());
171 result[size_of::<u64>()..].copy_from_slice(&value.pagination_id.to_le_bytes());
172 result
173 }
174
175 fn type_name() -> TypeName {
176 TypeName::internal("redb::TransactionIdWithPagination")
177 }
178}
179
180impl Key for TransactionIdWithPagination {
181 fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
182 let value1 = Self::from_bytes(data1);
183 let value2 = Self::from_bytes(data2);
184
185 match value1.transaction_id.cmp(&value2.transaction_id) {
186 std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
187 std::cmp::Ordering::Equal => value1.pagination_id.cmp(&value2.pagination_id),
188 std::cmp::Ordering::Less => std::cmp::Ordering::Less,
189 }
190 }
191}
192
// Key type of the allocator-state tree (see `AllocatorStateTree`).
// The derived `Ord` is what `Key::compare` uses, so variant declaration order is significant:
// every `Region(_)` (ordered by its number) sorts before `RegionTracker`, which sorts before
// `TransactionId`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
pub(crate) enum AllocatorStateKey {
    Region(u32),
    RegionTracker,
    TransactionId,
}
199
200impl Value for AllocatorStateKey {
201 type SelfType<'a> = Self;
202 type AsBytes<'a> = [u8; 1 + size_of::<u32>()];
203
204 fn fixed_width() -> Option<usize> {
205 Some(1 + size_of::<u32>())
206 }
207
208 fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
209 where
210 Self: 'a,
211 {
212 match data[0] {
213 0 => Self::Region(u32::from_le_bytes(data[1..].try_into().unwrap())),
214 1 => Self::RegionTracker,
215 2 => Self::TransactionId,
216 _ => unreachable!(),
217 }
218 }
219
220 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
221 where
222 Self: 'a,
223 Self: 'b,
224 {
225 let mut result = Self::AsBytes::default();
226 match value {
227 Self::Region(region) => {
228 result[0] = 0;
229 result[1..].copy_from_slice(&u32::to_le_bytes(*region));
230 }
231 Self::RegionTracker => {
232 result[0] = 1;
233 }
234 Self::TransactionId => {
235 result[0] = 2;
236 }
237 }
238
239 result
240 }
241
242 fn type_name() -> TypeName {
243 TypeName::internal("redb::AllocatorStateKey")
244 }
245}
246
247impl Key for AllocatorStateKey {
248 fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
249 Self::from_bytes(data1).cmp(&Self::from_bytes(data2))
250 }
251}
252
/// Typed definition of a system table: a name plus compile-time key and value types.
///
/// The key/value types exist only as `PhantomData` markers; no key or value is stored here.
pub struct SystemTableDefinition<'a, K: Key + 'static, V: Value + 'static> {
    name: &'a str,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}
258
259impl<'a, K: Key + 'static, V: Value + 'static> SystemTableDefinition<'a, K, V> {
260 pub const fn new(name: &'a str) -> Self {
261 assert!(!name.is_empty());
262 Self {
263 name,
264 _key_type: PhantomData,
265 _value_type: PhantomData,
266 }
267 }
268}
269
// Exposes the table name through the common `TableHandle` interface.
impl<K: Key + 'static, V: Value + 'static> TableHandle for SystemTableDefinition<'_, K, V> {
    fn name(&self) -> &str {
        self.name
    }
}

// Marker impl of the crate's `Sealed` trait for this type.
impl<K: Key, V: Value> Sealed for SystemTableDefinition<'_, K, V> {}

// `Clone`/`Copy` are written by hand rather than derived; a derive would add `K: Clone` /
// `V: Clone` bounds, whereas these impls only require the bounds listed here.
impl<K: Key + 'static, V: Value + 'static> Clone for SystemTableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<K: Key + 'static, V: Value + 'static> Copy for SystemTableDefinition<'_, K, V> {}
285
286impl<K: Key + 'static, V: Value + 'static> Display for SystemTableDefinition<'_, K, V> {
287 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
288 write!(
289 f,
290 "{}<{}, {}>",
291 self.name,
292 K::type_name().name(),
293 V::type_name().name()
294 )
295 }
296}
297
/// Informational storage statistics.
///
/// NOTE(review): the code that populates these counters is outside this section, so the
/// exact meaning of each field should be confirmed there.
#[derive(Debug)]
pub struct DatabaseStats {
    pub(crate) tree_height: u32,
    pub(crate) allocated_pages: u64,
    pub(crate) leaf_pages: u64,
    pub(crate) branch_pages: u64,
    // Exposed publicly as `stored_bytes()`.
    pub(crate) stored_leaf_bytes: u64,
    pub(crate) metadata_bytes: u64,
    pub(crate) fragmented_bytes: u64,
    pub(crate) page_size: usize,
}
310
impl DatabaseStats {
    /// Returns the recorded tree height.
    pub fn tree_height(&self) -> u32 {
        self.tree_height
    }

    /// Returns the recorded number of allocated pages.
    pub fn allocated_pages(&self) -> u64 {
        self.allocated_pages
    }

    /// Returns the recorded number of leaf pages.
    pub fn leaf_pages(&self) -> u64 {
        self.leaf_pages
    }

    /// Returns the recorded number of branch pages.
    pub fn branch_pages(&self) -> u64 {
        self.branch_pages
    }

    /// Returns the recorded number of bytes stored in leaf pages
    /// (the `stored_leaf_bytes` counter).
    pub fn stored_bytes(&self) -> u64 {
        self.stored_leaf_bytes
    }

    /// Returns the recorded number of metadata bytes.
    pub fn metadata_bytes(&self) -> u64 {
        self.metadata_bytes
    }

    /// Returns the recorded number of fragmented bytes.
    pub fn fragmented_bytes(&self) -> u64 {
        self.fragmented_bytes
    }

    /// Returns the recorded page size.
    pub fn page_size(&self) -> usize {
        self.page_size
    }
}
353
/// Durability level requested for a transaction.
///
/// The enum is `#[non_exhaustive]`, so callers matching on it need a wildcard arm.
/// NOTE(review): the guarantees attached to each level are implemented by the commit path,
/// which is outside this section — confirm there before documenting them in detail.
#[derive(Copy, Clone, Debug)]
#[non_exhaustive]
pub enum Durability {
    None,
    Eventual,
    Immediate,
    /// Deprecated since 2.3.0; the deprecation note points to `set_two_phase_commit(true)`.
    #[deprecated(since = "2.3.0", note = "use set_two_phase_commit(true) instead")]
    Paranoid,
}
374
// Internal durability level stored on `WriteTransaction`. It has no counterpart for the
// deprecated `Durability::Paranoid`; two-phase commit is tracked by a separate flag.
// New transactions start at `Immediate`, and persistent savepoints can only be created or
// deleted at that level.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum InternalDurability {
    None,
    Eventual,
    Immediate,
}
383
/// Open handle to a system table.
///
/// Holds an exclusive borrow of the owning `SystemNamespace` for as long as it lives, so only
/// one system table can be open at a time. When dropped, the handle reports its current root
/// back to the namespace (see the `Drop` impl).
pub struct SystemTable<'db, 's, K: Key + 'static, V: Value + 'static> {
    // Table name, used to stage the updated root on drop.
    name: String,
    namespace: &'s mut SystemNamespace<'db>,
    tree: BtreeMut<'s, K, V>,
    // Cloned into every `Range` iterator handed out by `range`.
    transaction_guard: Arc<TransactionGuard>,
}
391
392impl<'db, 's, K: Key + 'static, V: Value + 'static> SystemTable<'db, 's, K, V> {
393 fn new(
394 name: &str,
395 table_root: Option<BtreeHeader>,
396 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
397 guard: Arc<TransactionGuard>,
398 mem: Arc<TransactionalMemory>,
399 namespace: &'s mut SystemNamespace<'db>,
400 ) -> SystemTable<'db, 's, K, V> {
401 let ignore = Arc::new(Mutex::new(PageTrackerPolicy::Ignore));
404 SystemTable {
405 name: name.to_string(),
406 namespace,
407 tree: BtreeMut::new(table_root, guard.clone(), mem, freed_pages, ignore),
408 transaction_guard: guard,
409 }
410 }
411
412 fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<V>>>
413 where
414 K: 'a,
415 {
416 self.tree.get(key.borrow())
417 }
418
419 fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<K, V>>
420 where
421 K: 'a,
422 KR: Borrow<K::SelfType<'a>> + 'a,
423 {
424 self.tree
425 .range(&range)
426 .map(|x| Range::new(x, self.transaction_guard.clone()))
427 }
428
429 pub fn extract_from_if<'a, KR, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>(
430 &mut self,
431 range: impl RangeBounds<KR> + 'a,
432 predicate: F,
433 ) -> Result<ExtractIf<K, V, F>>
434 where
435 KR: Borrow<K::SelfType<'a>> + 'a,
436 {
437 self.tree
438 .extract_from_if(&range, predicate)
439 .map(ExtractIf::new)
440 }
441
442 pub fn insert<'k, 'v>(
443 &mut self,
444 key: impl Borrow<K::SelfType<'k>>,
445 value: impl Borrow<V::SelfType<'v>>,
446 ) -> Result<Option<AccessGuard<V>>> {
447 let value_len = V::as_bytes(value.borrow()).as_ref().len();
448 if value_len > MAX_VALUE_LENGTH {
449 return Err(StorageError::ValueTooLarge(value_len));
450 }
451 let key_len = K::as_bytes(key.borrow()).as_ref().len();
452 if key_len > MAX_VALUE_LENGTH {
453 return Err(StorageError::ValueTooLarge(key_len));
454 }
455 if value_len + key_len > MAX_PAIR_LENGTH {
456 return Err(StorageError::ValueTooLarge(value_len + key_len));
457 }
458 self.tree.insert(key.borrow(), value.borrow())
459 }
460
461 pub fn remove<'a>(
462 &mut self,
463 key: impl Borrow<K::SelfType<'a>>,
464 ) -> Result<Option<AccessGuard<V>>>
465 where
466 K: 'a,
467 {
468 self.tree.remove(key.borrow())
469 }
470}
471
472impl<K: Key + 'static, V: MutInPlaceValue + 'static> SystemTable<'_, '_, K, V> {
473 pub fn insert_reserve<'a>(
474 &mut self,
475 key: impl Borrow<K::SelfType<'a>>,
476 value_length: u32,
477 ) -> Result<AccessGuardMut<V>> {
478 if value_length as usize > MAX_VALUE_LENGTH {
479 return Err(StorageError::ValueTooLarge(value_length as usize));
480 }
481 let key_len = K::as_bytes(key.borrow()).as_ref().len();
482 if key_len > MAX_VALUE_LENGTH {
483 return Err(StorageError::ValueTooLarge(key_len));
484 }
485 if value_length as usize + key_len > MAX_PAIR_LENGTH {
486 return Err(StorageError::ValueTooLarge(value_length as usize + key_len));
487 }
488 self.tree.insert_reserve(key.borrow(), value_length)
489 }
490}
491
492impl<K: Key + 'static, V: Value + 'static> Drop for SystemTable<'_, '_, K, V> {
493 fn drop(&mut self) {
494 self.namespace.close_table(
495 &self.name,
496 &self.tree,
497 self.tree.get_root().map(|x| x.length).unwrap_or_default(),
498 );
499 }
500}
501
// Owns the table tree that holds the system tables, plus the state shared with every
// `SystemTable` opened from it.
struct SystemNamespace<'db> {
    table_tree: TableTreeMut<'db>,
    // Shared with `table_tree` and with every opened system table's B-tree.
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    transaction_guard: Arc<TransactionGuard>,
}
507
508impl<'db> SystemNamespace<'db> {
509 fn new(
510 root_page: Option<BtreeHeader>,
511 guard: Arc<TransactionGuard>,
512 mem: Arc<TransactionalMemory>,
513 ) -> Self {
514 let ignore = Arc::new(Mutex::new(PageTrackerPolicy::Ignore));
517 let freed_pages = Arc::new(Mutex::new(vec![]));
518 Self {
519 table_tree: TableTreeMut::new(
520 root_page,
521 guard.clone(),
522 mem,
523 freed_pages.clone(),
524 ignore,
525 ),
526 freed_pages,
527 transaction_guard: guard.clone(),
528 }
529 }
530
531 fn system_freed_pages(&self) -> Arc<Mutex<Vec<PageNumber>>> {
532 self.freed_pages.clone()
533 }
534
535 fn open_system_table<'txn, 's, K: Key + 'static, V: Value + 'static>(
536 &'s mut self,
537 transaction: &'txn WriteTransaction,
538 definition: SystemTableDefinition<K, V>,
539 ) -> Result<SystemTable<'db, 's, K, V>> {
540 #[cfg(feature = "logging")]
541 debug!("Opening system table: {}", definition);
542 let (root, _) = self
543 .table_tree
544 .get_or_create_table::<K, V>(definition.name(), TableType::Normal)
545 .map_err(|e| {
546 e.into_storage_error_or_corrupted("Internal error. System table is corrupted")
547 })?;
548 transaction.dirty.store(true, Ordering::Release);
549
550 Ok(SystemTable::new(
551 definition.name(),
552 root,
553 self.freed_pages.clone(),
554 self.transaction_guard.clone(),
555 transaction.mem.clone(),
556 self,
557 ))
558 }
559
560 fn close_table<K: Key + 'static, V: Value + 'static>(
561 &mut self,
562 name: &str,
563 table: &BtreeMut<K, V>,
564 length: u64,
565 ) {
566 self.table_tree
567 .stage_update_table_root(name, table.get_root(), length);
568 }
569}
570
// Owns the user table tree and tracks which tables are currently open.
struct TableNamespace<'db> {
    // Open table name -> source location of the call that opened it; the location is
    // reported in `TableError::TableAlreadyOpen`.
    open_tables: HashMap<String, &'static panic::Location<'static>>,
    // Allocation tracking policy shared with `table_tree` and with every opened table.
    allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
    // Freed-page list shared with `table_tree` and with every opened table.
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    table_tree: TableTreeMut<'db>,
}
577
578impl TableNamespace<'_> {
579 fn new(
580 root_page: Option<BtreeHeader>,
581 guard: Arc<TransactionGuard>,
582 mem: Arc<TransactionalMemory>,
583 ) -> Self {
584 let allocated = if mem.file_format_v3() {
585 Arc::new(Mutex::new(PageTrackerPolicy::new_tracking()))
586 } else {
587 Arc::new(Mutex::new(PageTrackerPolicy::Ignore))
588 };
589 let freed_pages = Arc::new(Mutex::new(vec![]));
590 let table_tree = TableTreeMut::new(
591 root_page,
592 guard,
593 mem,
594 freed_pages.clone(),
597 allocated.clone(),
598 );
599 Self {
600 open_tables: Default::default(),
601 table_tree,
602 freed_pages,
603 allocated_pages: allocated,
604 }
605 }
606
607 fn set_dirty(&mut self, transaction: &WriteTransaction) {
608 transaction.dirty.store(true, Ordering::Release);
609 if !transaction.transaction_tracker.any_savepoint_exists() {
610 *self.allocated_pages.lock().unwrap() = PageTrackerPolicy::Ignore;
613 }
614 }
615
616 fn set_root(&mut self, root: Option<BtreeHeader>) {
617 assert!(self.open_tables.is_empty());
618 self.table_tree.set_root(root);
619 }
620
621 #[track_caller]
622 fn inner_open<K: Key + 'static, V: Value + 'static>(
623 &mut self,
624 name: &str,
625 table_type: TableType,
626 ) -> Result<(Option<BtreeHeader>, u64), TableError> {
627 if let Some(location) = self.open_tables.get(name) {
628 return Err(TableError::TableAlreadyOpen(name.to_string(), location));
629 }
630
631 let root = self
632 .table_tree
633 .get_or_create_table::<K, V>(name, table_type)?;
634 self.open_tables
635 .insert(name.to_string(), panic::Location::caller());
636
637 Ok(root)
638 }
639
640 #[track_caller]
641 pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
642 &mut self,
643 transaction: &'txn WriteTransaction,
644 definition: MultimapTableDefinition<K, V>,
645 ) -> Result<MultimapTable<'txn, K, V>, TableError> {
646 #[cfg(feature = "logging")]
647 debug!("Opening multimap table: {}", definition);
648 let (root, length) = self.inner_open::<K, V>(definition.name(), TableType::Multimap)?;
649 self.set_dirty(transaction);
650
651 Ok(MultimapTable::new(
652 definition.name(),
653 root,
654 length,
655 self.freed_pages.clone(),
656 self.allocated_pages.clone(),
657 transaction.mem.clone(),
658 transaction,
659 ))
660 }
661
662 #[track_caller]
663 pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
664 &mut self,
665 transaction: &'txn WriteTransaction,
666 definition: TableDefinition<K, V>,
667 ) -> Result<Table<'txn, K, V>, TableError> {
668 #[cfg(feature = "logging")]
669 debug!("Opening table: {}", definition);
670 let (root, _) = self.inner_open::<K, V>(definition.name(), TableType::Normal)?;
671 self.set_dirty(transaction);
672
673 Ok(Table::new(
674 definition.name(),
675 root,
676 self.freed_pages.clone(),
677 self.allocated_pages.clone(),
678 transaction.mem.clone(),
679 transaction,
680 ))
681 }
682
683 #[track_caller]
684 fn inner_rename(
685 &mut self,
686 name: &str,
687 new_name: &str,
688 table_type: TableType,
689 ) -> Result<(), TableError> {
690 if let Some(location) = self.open_tables.get(name) {
691 return Err(TableError::TableAlreadyOpen(name.to_string(), location));
692 }
693
694 self.table_tree.rename_table(name, new_name, table_type)
695 }
696
697 #[track_caller]
698 fn rename_table(
699 &mut self,
700 transaction: &WriteTransaction,
701 name: &str,
702 new_name: &str,
703 ) -> Result<(), TableError> {
704 #[cfg(feature = "logging")]
705 debug!("Renaming table: {} to {}", name, new_name);
706 self.set_dirty(transaction);
707 self.inner_rename(name, new_name, TableType::Normal)
708 }
709
710 #[track_caller]
711 fn rename_multimap_table(
712 &mut self,
713 transaction: &WriteTransaction,
714 name: &str,
715 new_name: &str,
716 ) -> Result<(), TableError> {
717 #[cfg(feature = "logging")]
718 debug!("Renaming multimap table: {} to {}", name, new_name);
719 self.set_dirty(transaction);
720 self.inner_rename(name, new_name, TableType::Multimap)
721 }
722
723 #[track_caller]
724 fn inner_delete(&mut self, name: &str, table_type: TableType) -> Result<bool, TableError> {
725 if let Some(location) = self.open_tables.get(name) {
726 return Err(TableError::TableAlreadyOpen(name.to_string(), location));
727 }
728
729 self.table_tree.delete_table(name, table_type)
730 }
731
732 #[track_caller]
733 fn delete_table(
734 &mut self,
735 transaction: &WriteTransaction,
736 name: &str,
737 ) -> Result<bool, TableError> {
738 #[cfg(feature = "logging")]
739 debug!("Deleting table: {}", name);
740 self.set_dirty(transaction);
741 self.inner_delete(name, TableType::Normal)
742 }
743
744 #[track_caller]
745 fn delete_multimap_table(
746 &mut self,
747 transaction: &WriteTransaction,
748 name: &str,
749 ) -> Result<bool, TableError> {
750 #[cfg(feature = "logging")]
751 debug!("Deleting multimap table: {}", name);
752 self.set_dirty(transaction);
753 self.inner_delete(name, TableType::Multimap)
754 }
755
756 pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
757 &mut self,
758 name: &str,
759 table: &BtreeMut<K, V>,
760 length: u64,
761 ) {
762 self.open_tables.remove(name).unwrap();
763 self.table_tree
764 .stage_update_table_root(name, table.get_root(), length);
765 }
766}
767
/// A read/write transaction.
///
/// NOTE(review): commit and abort handling live outside this section; the field notes below
/// describe only what the visible code does with each field.
pub struct WriteTransaction {
    transaction_tracker: Arc<TransactionTracker>,
    mem: Arc<TransactionalMemory>,
    transaction_guard: Arc<TransactionGuard>,
    transaction_id: TransactionId,
    // B-tree rooted at `mem.get_freed_root()`. `version_asserts` requires it to be empty on
    // v3-format files, and the debug printer labels it the "legacy freed tree".
    freed_tree: Mutex<BtreeMut<'static, FreedTableKey, FreedPageList<'static>>>,
    // Starts out empty; NOTE(review): populated outside this section.
    legacy_freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    // Page list handed to `freed_tree` at construction (presumably as its freed-page sink —
    // confirm against `BtreeMut::new`).
    post_commit_frees: Arc<Mutex<Vec<PageNumber>>>,
    tables: Mutex<TableNamespace<'static>>,
    system_tables: Mutex<SystemNamespace<'static>>,
    completed: bool,
    // Set once a table is opened/renamed/deleted or a savepoint is restored; a dirty
    // transaction can no longer create savepoints.
    dirty: AtomicBool,
    durability: InternalDurability,
    two_phase_commit: bool,
    quick_repair: bool,
    // Ids of persistent savepoints created by this transaction.
    created_persistent_savepoints: Mutex<HashSet<SavepointId>>,
    // (savepoint id, transaction id) of persistent savepoints deleted by this transaction.
    deleted_persistent_savepoints: Mutex<Vec<(SavepointId, TransactionId)>>,
}
796
797impl WriteTransaction {
798 pub(crate) fn new(
799 guard: TransactionGuard,
800 transaction_tracker: Arc<TransactionTracker>,
801 mem: Arc<TransactionalMemory>,
802 ) -> Result<Self> {
803 let transaction_id = guard.id();
804 let guard = Arc::new(guard);
805
806 let root_page = mem.get_data_root();
807 let system_page = mem.get_system_root();
808 let freed_root = mem.get_freed_root();
809 let post_commit_frees = Arc::new(Mutex::new(vec![]));
810
811 let tables = TableNamespace::new(root_page, guard.clone(), mem.clone());
812 let system_tables = SystemNamespace::new(system_page, guard.clone(), mem.clone());
813 let ignore = Arc::new(Mutex::new(PageTrackerPolicy::Ignore));
816
817 Ok(Self {
818 transaction_tracker,
819 mem: mem.clone(),
820 transaction_guard: guard.clone(),
821 transaction_id,
822 tables: Mutex::new(tables),
823 system_tables: Mutex::new(system_tables),
824 freed_tree: Mutex::new(BtreeMut::new(
825 freed_root,
826 guard,
827 mem,
828 post_commit_frees.clone(),
829 ignore,
830 )),
831 legacy_freed_pages: Arc::new(Mutex::new(vec![])),
832 post_commit_frees,
833 completed: false,
834 dirty: AtomicBool::new(false),
835 durability: InternalDurability::Immediate,
836 two_phase_commit: false,
837 quick_repair: false,
838 created_persistent_savepoints: Mutex::new(Default::default()),
839 deleted_persistent_savepoints: Mutex::new(vec![]),
840 })
841 }
842
843 pub(crate) fn version_asserts(&self) -> Result {
844 if self.mem.file_format_v3() {
845 assert_eq!(self.freed_tree.lock().unwrap().len()?, 0);
846 } else {
847 let mut system_tables = self.system_tables.lock().unwrap();
848 let data_allocated = system_tables.open_system_table(self, DATA_ALLOCATED_TABLE)?;
849 assert!(
850 data_allocated
851 .range::<TransactionIdWithPagination>(..)?
852 .next()
853 .is_none()
854 );
855 drop(data_allocated);
856 let data_freed = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
857 assert!(
858 data_freed
859 .range::<TransactionIdWithPagination>(..)?
860 .next()
861 .is_none()
862 );
863 drop(data_freed);
864 let system_freed = system_tables.open_system_table(self, SYSTEM_FREED_TABLE)?;
865 assert!(
866 system_freed
867 .range::<TransactionIdWithPagination>(..)?
868 .next()
869 .is_none()
870 );
871 }
872
873 Ok(())
874 }
875
876 pub(crate) fn pending_free_pages(&self) -> Result<bool> {
877 if self.mem.get_freed_root().is_some() {
878 return Ok(true);
879 }
880 let mut system_tables = self.system_tables.lock().unwrap();
881 if system_tables
882 .open_system_table(self, DATA_FREED_TABLE)?
883 .tree
884 .get_root()
885 .is_some()
886 {
887 return Ok(true);
888 }
889 if system_tables
890 .open_system_table(self, SYSTEM_FREED_TABLE)?
891 .tree
892 .get_root()
893 .is_some()
894 {
895 return Ok(true);
896 }
897
898 Ok(false)
899 }
900
    /// Debug helper, compiled only for tests and fuzzing: prints every page this transaction
    /// can account for, grouped by where it was found, and finally prints whatever allocated
    /// pages remain unaccounted for under "Leaked pages".
    ///
    /// Starts from the set of all allocated pages and removes each page as it is printed.
    /// Panics (via `unwrap`) on any storage error or poisoned lock.
    #[cfg(any(test, fuzzing))]
    pub fn print_allocated_page_debug(&self) {
        let mut all_allocated: HashSet<PageNumber> =
            HashSet::from_iter(self.mem.all_allocated_pages());

        let tracker = self.mem.tracker_page();
        all_allocated.remove(&tracker);
        println!("Tracker page");
        println!("{tracker:?}");

        // Pages visited while walking the user table tree.
        let mut table_pages = vec![];
        self.tables
            .lock()
            .unwrap()
            .table_tree
            .visit_all_pages(|path| {
                table_pages.push(path.page_number());
                Ok(())
            })
            .unwrap();
        println!("Tables");
        for p in table_pages {
            all_allocated.remove(&p);
            println!("{p:?}");
        }

        // Pages visited while walking the system table tree.
        let mut system_table_pages = vec![];
        self.system_tables
            .lock()
            .unwrap()
            .table_tree
            .visit_all_pages(|path| {
                system_table_pages.push(path.page_number());
                Ok(())
            })
            .unwrap();
        println!("System tables");
        for p in system_table_pages {
            all_allocated.remove(&p);
            println!("{p:?}");
        }

        // Pages yielded by the freed tree's own page iterator
        // (presumably the pages that make up the tree itself — confirm against `all_pages_iter`).
        println!("Free table");
        if let Some(freed_iter) = self.freed_tree.lock().unwrap().all_pages_iter().unwrap() {
            for p in freed_iter {
                let p = p.unwrap();
                all_allocated.remove(&p);
                println!("{p:?}");
            }
        }
        // Pages listed in the entries stored in the freed tree.
        println!("Pending free (in legacy freed tree)");
        for entry in self
            .freed_tree
            .lock()
            .unwrap()
            .range::<RangeFull, FreedTableKey>(&(..))
            .unwrap()
        {
            let entry = entry.unwrap();
            let value = entry.value();
            for i in 0..value.len() {
                let p = value.get(i);
                all_allocated.remove(&p);
                println!("{p:?}");
            }
        }
        // Each system-table section below runs in its own scope so the namespace lock and
        // the table handle are released before the next section starts.
        {
            println!("Pending free (in data freed table)");
            let mut system_tables = self.system_tables.lock().unwrap();
            let data_freed = system_tables
                .open_system_table(self, DATA_FREED_TABLE)
                .unwrap();
            for entry in data_freed.range::<TransactionIdWithPagination>(..).unwrap() {
                let (_, entry) = entry.unwrap();
                let value = entry.value();
                for i in 0..value.len() {
                    let p = value.get(i);
                    all_allocated.remove(&p);
                    println!("{p:?}");
                }
            }
        }
        {
            println!("Pending free (in system freed table)");
            let mut system_tables = self.system_tables.lock().unwrap();
            let system_freed = system_tables
                .open_system_table(self, SYSTEM_FREED_TABLE)
                .unwrap();
            for entry in system_freed
                .range::<TransactionIdWithPagination>(..)
                .unwrap()
            {
                let (_, entry) = entry.unwrap();
                let value = entry.value();
                for i in 0..value.len() {
                    let p = value.get(i);
                    all_allocated.remove(&p);
                    println!("{p:?}");
                }
            }
        }
        // In-memory page lists; each one is printed only when it is non-empty.
        {
            let pages = self.legacy_freed_pages.lock().unwrap();
            if !pages.is_empty() {
                println!("Pages in in-memory legacy freed_pages");
                for p in pages.iter() {
                    println!("{p:?}");
                    all_allocated.remove(p);
                }
            }
        }
        {
            let tables = self.tables.lock().unwrap();
            let pages = tables.freed_pages.lock().unwrap();
            if !pages.is_empty() {
                println!("Pages in in-memory data freed_pages");
                for p in pages.iter() {
                    println!("{p:?}");
                    all_allocated.remove(p);
                }
            }
        }
        {
            let system_tables = self.system_tables.lock().unwrap();
            let pages = system_tables.freed_pages.lock().unwrap();
            if !pages.is_empty() {
                println!("Pages in in-memory system freed_pages");
                for p in pages.iter() {
                    println!("{p:?}");
                    all_allocated.remove(p);
                }
            }
        }
        {
            let pages = self.post_commit_frees.lock().unwrap();
            if !pages.is_empty() {
                println!("Pages in in-memory post_commit_frees");
                for p in pages.iter() {
                    println!("{p:?}");
                    all_allocated.remove(p);
                }
            }
        }
        // Anything still in the set was allocated but not found in any category above.
        if !all_allocated.is_empty() {
            println!("Leaked pages");
            for p in all_allocated {
                println!("{p:?}");
            }
        }
    }
1051
1052 pub fn persistent_savepoint(&self) -> Result<u64, SavepointError> {
1061 if self.durability != InternalDurability::Immediate {
1062 return Err(SavepointError::InvalidSavepoint);
1063 }
1064
1065 let mut savepoint = self.ephemeral_savepoint()?;
1066
1067 let mut system_tables = self.system_tables.lock().unwrap();
1068
1069 let mut next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
1070 next_table.insert((), savepoint.get_id().next())?;
1071 drop(next_table);
1072
1073 let mut savepoint_table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
1074 savepoint_table.insert(
1075 savepoint.get_id(),
1076 SerializedSavepoint::from_savepoint(&savepoint),
1077 )?;
1078
1079 savepoint.set_persistent();
1080
1081 self.created_persistent_savepoints
1082 .lock()
1083 .unwrap()
1084 .insert(savepoint.get_id());
1085
1086 Ok(savepoint.get_id().0)
1087 }
1088
1089 pub(crate) fn transaction_guard(&self) -> Arc<TransactionGuard> {
1090 self.transaction_guard.clone()
1091 }
1092
1093 pub(crate) fn next_persistent_savepoint_id(&self) -> Result<Option<SavepointId>> {
1094 let mut system_tables = self.system_tables.lock().unwrap();
1095 let next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
1096 let value = next_table.get(())?;
1097 if let Some(next_id) = value {
1098 Ok(Some(next_id.value()))
1099 } else {
1100 Ok(None)
1101 }
1102 }
1103
1104 pub fn get_persistent_savepoint(&self, id: u64) -> Result<Savepoint, SavepointError> {
1106 let mut system_tables = self.system_tables.lock().unwrap();
1107 let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
1108 let value = table.get(SavepointId(id))?;
1109
1110 value
1111 .map(|x| x.value().to_savepoint(self.transaction_tracker.clone()))
1112 .ok_or(SavepointError::InvalidSavepoint)
1113 }
1114
1115 pub fn delete_persistent_savepoint(&self, id: u64) -> Result<bool, SavepointError> {
1122 if self.durability != InternalDurability::Immediate {
1123 return Err(SavepointError::InvalidSavepoint);
1124 }
1125 let mut system_tables = self.system_tables.lock().unwrap();
1126 let mut table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
1127 let savepoint = table.remove(SavepointId(id))?;
1128 if let Some(serialized) = savepoint {
1129 let savepoint = serialized
1130 .value()
1131 .to_savepoint(self.transaction_tracker.clone());
1132 self.deleted_persistent_savepoints
1133 .lock()
1134 .unwrap()
1135 .push((savepoint.get_id(), savepoint.get_transaction_id()));
1136 Ok(true)
1137 } else {
1138 Ok(false)
1139 }
1140 }
1141
1142 pub fn list_persistent_savepoints(&self) -> Result<impl Iterator<Item = u64>> {
1144 let mut system_tables = self.system_tables.lock().unwrap();
1145 let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
1146 let mut savepoints = vec![];
1147 for savepoint in table.range::<SavepointId>(..)? {
1148 savepoints.push(savepoint?.0.value().0);
1149 }
1150 Ok(savepoints.into_iter())
1151 }
1152
1153 fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
1155 let id = self
1156 .transaction_tracker
1157 .register_read_transaction(&self.mem)?;
1158
1159 Ok(TransactionGuard::new_read(
1160 id,
1161 self.transaction_tracker.clone(),
1162 ))
1163 }
1164
1165 fn allocate_savepoint(&self) -> Result<(SavepointId, TransactionId)> {
1166 let transaction_id = self.allocate_read_transaction()?.leak();
1167 let id = self.transaction_tracker.allocate_savepoint(transaction_id);
1168 Ok((id, transaction_id))
1169 }
1170
1171 pub fn ephemeral_savepoint(&self) -> Result<Savepoint, SavepointError> {
1177 if self.dirty.load(Ordering::Acquire) {
1178 return Err(SavepointError::InvalidSavepoint);
1179 }
1180
1181 let (id, transaction_id) = self.allocate_savepoint()?;
1182 #[cfg(feature = "logging")]
1183 debug!(
1184 "Creating savepoint id={:?}, txn_id={:?}",
1185 id, transaction_id
1186 );
1187
1188 let (system_root, freed_root, regional_allocators) = if self.mem.file_format_v3() {
1189 (None, None, vec![])
1190 } else {
1191 let system_root = self.mem.get_system_root();
1192 let freed_root = self.mem.get_freed_root();
1193 let regional_allocators = self.mem.get_raw_allocator_states();
1194
1195 (system_root, freed_root, regional_allocators)
1196 };
1197
1198 let root = self.mem.get_data_root();
1199 let savepoint = Savepoint::new_ephemeral(
1200 &self.mem,
1201 self.transaction_tracker.clone(),
1202 id,
1203 transaction_id,
1204 root,
1205 system_root,
1206 freed_root,
1207 regional_allocators,
1208 );
1209
1210 Ok(savepoint)
1211 }
1212
1213 pub fn restore_savepoint(&mut self, savepoint: &Savepoint) -> Result<(), SavepointError> {
1217 assert_eq!(
1219 std::ptr::from_ref(self.transaction_tracker.as_ref()),
1220 savepoint.db_address()
1221 );
1222
1223 if !self
1224 .transaction_tracker
1225 .is_valid_savepoint(savepoint.get_id())
1226 {
1227 return Err(SavepointError::InvalidSavepoint);
1228 }
1229 #[cfg(feature = "logging")]
1230 debug!(
1231 "Beginning savepoint restore (id={:?}) in transaction id={:?}",
1232 savepoint.get_id(),
1233 self.transaction_id
1234 );
1235 assert_eq!(self.mem.get_version(), savepoint.get_version());
1238 self.dirty.store(true, Ordering::Release);
1239
1240 if self.mem.file_format_v3() {
1249 self.restore_savepoint_v2(savepoint)
1250 } else {
1251 self.restore_savepoint_v1(savepoint)
1252 }
1253 }
1254
1255 fn restore_savepoint_v1(&mut self, savepoint: &Savepoint) -> Result<(), SavepointError> {
1256 let old_system_tree = TableTree::new(
1257 savepoint.get_system_root(),
1258 PageHint::None,
1259 self.transaction_guard.clone(),
1260 self.mem.clone(),
1261 )?;
1262 let old_freed_tree: Btree<FreedTableKey, FreedPageList<'static>> = Btree::new(
1263 savepoint.get_freed_root(),
1264 PageHint::None,
1265 self.transaction_guard.clone(),
1266 self.mem.clone(),
1267 )?;
1268
1269 let mut old_system_and_freed_pages = HashSet::new();
1273 old_system_tree.visit_all_pages(|path| {
1274 old_system_and_freed_pages.insert(path.page_number());
1275 Ok(())
1276 })?;
1277 old_freed_tree.visit_all_pages(|path| {
1278 old_system_and_freed_pages.insert(path.page_number());
1279 Ok(())
1280 })?;
1281
1282 {
1284 self.tables
1285 .lock()
1286 .unwrap()
1287 .set_root(savepoint.get_user_root());
1288 }
1289
1290 let mut txn_id = savepoint.get_transaction_id().next().raw_id();
1299 let mut freed_tree = self.freed_tree.lock().unwrap();
1300 loop {
1301 let lower = FreedTableKey {
1302 transaction_id: txn_id,
1303 pagination_id: 0,
1304 };
1305
1306 if freed_tree.range(&(lower..))?.next().is_none() {
1307 break;
1308 }
1309 let lower = FreedTableKey {
1310 transaction_id: txn_id,
1311 pagination_id: 0,
1312 };
1313 let upper = FreedTableKey {
1314 transaction_id: txn_id + 1,
1315 pagination_id: 0,
1316 };
1317
1318 let mut pending_pages = vec![];
1320 for entry in freed_tree.extract_from_if(&(lower..upper), |_, _| true)? {
1321 let item = entry?;
1322 for i in 0..item.value().len() {
1323 let p = item.value().get(i);
1324 if old_system_and_freed_pages.contains(&p) {
1326 pending_pages.push(p);
1327 }
1328 }
1329 }
1330
1331 let mut pagination_counter = 0u64;
1332 while !pending_pages.is_empty() {
1333 let chunk_size = 100;
1334 let buffer_size = FreedPageList::required_bytes(chunk_size);
1335 let key = FreedTableKey {
1336 transaction_id: txn_id,
1337 pagination_id: pagination_counter,
1338 };
1339 let mut access_guard =
1340 freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?;
1341
1342 let len = pending_pages.len();
1343 access_guard.as_mut().clear();
1344 for page in pending_pages.drain(len - min(len, chunk_size)..) {
1345 access_guard.as_mut().push_back(page);
1346 }
1347 drop(access_guard);
1348
1349 pagination_counter += 1;
1350 }
1351
1352 txn_id += 1;
1353 }
1354
1355 let mut current_system_and_freed_pages = HashSet::new();
1356 self.system_tables
1357 .lock()
1358 .unwrap()
1359 .table_tree
1360 .visit_all_pages(|path| {
1361 current_system_and_freed_pages.insert(path.page_number());
1362 Ok(())
1363 })?;
1364 freed_tree.visit_all_pages(|path| {
1365 current_system_and_freed_pages.insert(path.page_number());
1366 Ok(())
1367 })?;
1368
1369 let mut old_allocators: Vec<BuddyAllocator> = savepoint
1370 .get_regional_allocators()
1371 .iter()
1372 .map(|data| BuddyAllocator::from_savepoint_state(data))
1373 .collect();
1374
1375 {
1377 let oldest_unprocessed_transaction =
1378 if let Some(entry) = freed_tree.range::<RangeFull, FreedTableKey>(&(..))?.next() {
1379 entry?.key().transaction_id
1380 } else {
1381 self.transaction_id.raw_id()
1382 };
1383
1384 let lookup_key = FreedTableKey {
1385 transaction_id: oldest_unprocessed_transaction,
1386 pagination_id: 0,
1387 };
1388
1389 for entry in old_freed_tree.range(&(..lookup_key))? {
1392 let item = entry?;
1393 let pages: FreedPageList = item.value();
1394 for i in 0..pages.len() {
1395 let page = pages.get(i);
1396 assert!(
1397 old_allocators[page.region as usize]
1398 .is_allocated(page.page_index, page.page_order)
1399 );
1400 old_allocators[page.region as usize].free(page.page_index, page.page_order);
1401 }
1402 }
1403
1404 if let Some(header) = old_system_tree
1405 .get_table::<TransactionIdWithPagination, PageList>(
1406 DATA_FREED_TABLE.name(),
1407 TableType::Normal,
1408 )
1409 .map_err(|e| e.into_storage_error_or_corrupted("Data freed table is corrupted"))?
1410 {
1411 match header {
1412 InternalTableDefinition::Normal { table_root, .. } => {
1413 assert!(table_root.is_none());
1414 }
1415 InternalTableDefinition::Multimap { .. } => unreachable!(),
1416 };
1417 }
1418
1419 if let Some(header) = old_system_tree
1420 .get_table::<TransactionIdWithPagination, PageList>(
1421 SYSTEM_FREED_TABLE.name(),
1422 TableType::Normal,
1423 )
1424 .map_err(|e| e.into_storage_error_or_corrupted("System freed table is corrupted"))?
1425 {
1426 match header {
1427 InternalTableDefinition::Normal { table_root, .. } => {
1428 assert!(table_root.is_none());
1429 }
1430 InternalTableDefinition::Multimap { .. } => unreachable!(),
1431 };
1432 }
1433 }
1434
1435 let mut legacy_freed_pages = self.legacy_freed_pages.lock().unwrap();
1437 let tables = self.tables.lock().unwrap();
1438 let mut allocated = tables.allocated_pages.lock().unwrap();
1439 let mut already_awaiting_free: HashSet<PageNumber> =
1440 legacy_freed_pages.iter().copied().collect();
1441 already_awaiting_free.extend(self.post_commit_frees.lock().unwrap().iter().copied());
1442 already_awaiting_free.extend(tables.freed_pages.lock().unwrap().iter().copied());
1443 already_awaiting_free.extend(
1444 self.system_tables
1445 .lock()
1446 .unwrap()
1447 .system_freed_pages()
1448 .lock()
1449 .unwrap()
1450 .iter()
1451 .copied(),
1452 );
1453 let to_free = self.mem.pages_allocated_since_raw_state(&old_allocators);
1454 for page in to_free {
1455 if already_awaiting_free.contains(&page) {
1456 continue;
1458 }
1459 if current_system_and_freed_pages.contains(&page) {
1460 continue;
1464 }
1465 if self.mem.uncommitted(page) {
1466 self.mem.free(page, &mut allocated);
1467 } else {
1468 legacy_freed_pages.push(page);
1469 }
1470 }
1471 drop(legacy_freed_pages);
1472 drop(freed_tree);
1473 drop(allocated);
1474 drop(tables);
1475
1476 self.transaction_tracker
1479 .invalidate_savepoints_after(savepoint.get_id());
1480 for persistent_savepoint in self.list_persistent_savepoints()? {
1481 if persistent_savepoint > savepoint.get_id().0 {
1482 self.delete_persistent_savepoint(persistent_savepoint)?;
1483 }
1484 }
1485
1486 Ok(())
1487 }
1488
1489 fn restore_savepoint_v2(&mut self, savepoint: &Savepoint) -> Result<(), SavepointError> {
1490 {
1492 self.tables
1493 .lock()
1494 .unwrap()
1495 .set_root(savepoint.get_user_root());
1496 }
1497
1498 let txn_id = savepoint.get_transaction_id().next().raw_id();
1500 {
1501 let lower = TransactionIdWithPagination {
1502 transaction_id: txn_id,
1503 pagination_id: 0,
1504 };
1505 let mut system_tables = self.system_tables.lock().unwrap();
1506 let mut data_freed = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
1507 for entry in data_freed.extract_from_if(lower.., |_, _| true)? {
1508 entry?;
1509 }
1510 }
1512 let freed_tree = self.freed_tree.lock().unwrap();
1513 assert!(freed_tree.get_root().is_none());
1514
1515 {
1517 let tables = self.tables.lock().unwrap();
1518 let mut data_freed_pages = tables.freed_pages.lock().unwrap();
1519 let mut system_tables = self.system_tables.lock().unwrap();
1520 let data_allocated = system_tables.open_system_table(self, DATA_ALLOCATED_TABLE)?;
1521 let lower = TransactionIdWithPagination {
1522 transaction_id: txn_id,
1523 pagination_id: 0,
1524 };
1525 for entry in data_allocated.range(lower..)? {
1526 let (_, value) = entry?;
1527 for i in 0..value.value().len() {
1528 data_freed_pages.push(value.value().get(i));
1529 }
1530 }
1531 }
1532
1533 self.transaction_tracker
1536 .invalidate_savepoints_after(savepoint.get_id());
1537 for persistent_savepoint in self.list_persistent_savepoints()? {
1538 if persistent_savepoint > savepoint.get_id().0 {
1539 self.delete_persistent_savepoint(persistent_savepoint)?;
1540 }
1541 }
1542
1543 Ok(())
1544 }
1545
1546 pub fn set_durability(&mut self, durability: Durability) {
1551 let no_created = self
1552 .created_persistent_savepoints
1553 .lock()
1554 .unwrap()
1555 .is_empty();
1556 let no_deleted = self
1557 .deleted_persistent_savepoints
1558 .lock()
1559 .unwrap()
1560 .is_empty();
1561 assert!(no_created && no_deleted);
1562
1563 self.durability = match durability {
1564 Durability::None => InternalDurability::None,
1565 Durability::Eventual => InternalDurability::Eventual,
1566 Durability::Immediate => InternalDurability::Immediate,
1567 #[allow(deprecated)]
1568 Durability::Paranoid => {
1569 self.set_two_phase_commit(true);
1570 InternalDurability::Immediate
1571 }
1572 };
1573 }
1574
1575 pub fn set_two_phase_commit(&mut self, enabled: bool) {
1615 self.two_phase_commit = enabled;
1616 }
1617
1618 pub fn set_quick_repair(&mut self, enabled: bool) {
1629 self.quick_repair = enabled;
1630 }
1631
1632 #[track_caller]
1636 pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
1637 &'txn self,
1638 definition: TableDefinition<K, V>,
1639 ) -> Result<Table<'txn, K, V>, TableError> {
1640 self.tables.lock().unwrap().open_table(self, definition)
1641 }
1642
1643 #[track_caller]
1647 pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
1648 &'txn self,
1649 definition: MultimapTableDefinition<K, V>,
1650 ) -> Result<MultimapTable<'txn, K, V>, TableError> {
1651 self.tables
1652 .lock()
1653 .unwrap()
1654 .open_multimap_table(self, definition)
1655 }
1656
1657 pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
1658 &self,
1659 name: &str,
1660 table: &BtreeMut<K, V>,
1661 length: u64,
1662 ) {
1663 self.tables.lock().unwrap().close_table(name, table, length);
1664 }
1665
1666 pub fn rename_table(
1668 &self,
1669 definition: impl TableHandle,
1670 new_name: impl TableHandle,
1671 ) -> Result<(), TableError> {
1672 let name = definition.name().to_string();
1673 drop(definition);
1675 self.tables
1676 .lock()
1677 .unwrap()
1678 .rename_table(self, &name, new_name.name())
1679 }
1680
1681 pub fn rename_multimap_table(
1683 &self,
1684 definition: impl MultimapTableHandle,
1685 new_name: impl MultimapTableHandle,
1686 ) -> Result<(), TableError> {
1687 let name = definition.name().to_string();
1688 drop(definition);
1690 self.tables
1691 .lock()
1692 .unwrap()
1693 .rename_multimap_table(self, &name, new_name.name())
1694 }
1695
1696 pub fn delete_table(&self, definition: impl TableHandle) -> Result<bool, TableError> {
1700 let name = definition.name().to_string();
1701 drop(definition);
1703 self.tables.lock().unwrap().delete_table(self, &name)
1704 }
1705
1706 pub fn delete_multimap_table(
1710 &self,
1711 definition: impl MultimapTableHandle,
1712 ) -> Result<bool, TableError> {
1713 let name = definition.name().to_string();
1714 drop(definition);
1716 self.tables
1717 .lock()
1718 .unwrap()
1719 .delete_multimap_table(self, &name)
1720 }
1721
1722 pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle> + '_> {
1724 self.tables
1725 .lock()
1726 .unwrap()
1727 .table_tree
1728 .list_tables(TableType::Normal)
1729 .map(|x| x.into_iter().map(UntypedTableHandle::new))
1730 }
1731
1732 pub fn list_multimap_tables(
1734 &self,
1735 ) -> Result<impl Iterator<Item = UntypedMultimapTableHandle> + '_> {
1736 self.tables
1737 .lock()
1738 .unwrap()
1739 .table_tree
1740 .list_tables(TableType::Multimap)
1741 .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
1742 }
1743
1744 pub fn commit(mut self) -> Result<(), CommitError> {
1749 self.completed = true;
1751 self.commit_inner()
1752 }
1753
1754 fn commit_inner(&mut self) -> Result<(), CommitError> {
1755 if self.quick_repair {
1757 self.two_phase_commit = true;
1758 }
1759
1760 let (user_root, allocated_pages, data_freed) =
1761 self.tables.lock().unwrap().table_tree.flush_and_close()?;
1762
1763 if self.mem.file_format_v3() {
1764 self.store_data_freed_pages(data_freed)?;
1765 } else {
1766 self.legacy_freed_pages.lock().unwrap().extend(data_freed);
1767 }
1768 self.store_allocated_pages(allocated_pages.into_iter().collect())?;
1769
1770 #[cfg(feature = "logging")]
1771 debug!(
1772 "Committing transaction id={:?} with durability={:?} two_phase={} quick_repair={}",
1773 self.transaction_id, self.durability, self.two_phase_commit, self.quick_repair
1774 );
1775 match self.durability {
1776 InternalDurability::None => self.non_durable_commit(user_root)?,
1777 InternalDurability::Eventual => self.durable_commit(user_root, true)?,
1778 InternalDurability::Immediate => self.durable_commit(user_root, false)?,
1779 }
1780
1781 for (savepoint, transaction) in self.deleted_persistent_savepoints.lock().unwrap().iter() {
1782 self.transaction_tracker
1783 .deallocate_savepoint(*savepoint, *transaction);
1784 }
1785
1786 assert!(self.legacy_freed_pages.lock().unwrap().is_empty());
1787 assert!(
1788 self.system_tables
1789 .lock()
1790 .unwrap()
1791 .system_freed_pages()
1792 .lock()
1793 .unwrap()
1794 .is_empty()
1795 );
1796 assert!(
1797 self.tables
1798 .lock()
1799 .unwrap()
1800 .freed_pages
1801 .lock()
1802 .unwrap()
1803 .is_empty()
1804 );
1805 assert!(self.post_commit_frees.lock().unwrap().is_empty());
1806
1807 #[cfg(feature = "logging")]
1808 debug!(
1809 "Finished commit of transaction id={:?}",
1810 self.transaction_id
1811 );
1812
1813 Ok(())
1814 }
1815
1816 fn store_data_freed_pages(&self, mut freed_pages: Vec<PageNumber>) -> Result {
1817 let mut system_tables = self.system_tables.lock().unwrap();
1818 let mut freed_table = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
1819 let mut pagination_counter = 0;
1820 while !freed_pages.is_empty() {
1821 let chunk_size = 400;
1822 let buffer_size = PageList::required_bytes(chunk_size);
1823 let key = TransactionIdWithPagination {
1824 transaction_id: self.transaction_id.raw_id(),
1825 pagination_id: pagination_counter,
1826 };
1827 let mut access_guard =
1828 freed_table.insert_reserve(&key, buffer_size.try_into().unwrap())?;
1829
1830 let len = freed_pages.len();
1831 access_guard.as_mut().clear();
1832 for page in freed_pages.drain(len - min(len, chunk_size)..) {
1833 debug_assert!(
1835 self.mem.is_allocated(page),
1836 "Page is not allocated: {page:?}"
1837 );
1838 debug_assert!(!self.mem.uncommitted(page), "Page is uncommitted: {page:?}");
1839 access_guard.as_mut().push_back(page);
1840 }
1841
1842 pagination_counter += 1;
1843 }
1844
1845 Ok(())
1846 }
1847
1848 fn store_allocated_pages(&self, mut data_allocated_pages: Vec<PageNumber>) -> Result {
1849 let mut system_tables = self.system_tables.lock().unwrap();
1850 let mut allocated_table = system_tables.open_system_table(self, DATA_ALLOCATED_TABLE)?;
1851 let mut pagination_counter = 0;
1852 while !data_allocated_pages.is_empty() {
1853 let chunk_size = 400;
1854 let buffer_size = PageList::required_bytes(chunk_size);
1855 let key = TransactionIdWithPagination {
1856 transaction_id: self.transaction_id.raw_id(),
1857 pagination_id: pagination_counter,
1858 };
1859 let mut access_guard =
1860 allocated_table.insert_reserve(&key, buffer_size.try_into().unwrap())?;
1861
1862 let len = data_allocated_pages.len();
1863 access_guard.as_mut().clear();
1864 for page in data_allocated_pages.drain(len - min(len, chunk_size)..) {
1865 debug_assert!(
1869 self.mem.is_allocated(page),
1870 "Page is not allocated: {page:?}"
1871 );
1872 debug_assert!(self.mem.uncommitted(page), "Page is committed: {page:?}");
1873 access_guard.as_mut().push_back(page);
1874 }
1875
1876 pagination_counter += 1;
1877 }
1878
1879 let oldest = self
1881 .transaction_tracker
1882 .oldest_savepoint()
1883 .map_or(u64::MAX, |(_, x)| x.raw_id());
1884 let key = TransactionIdWithPagination {
1885 transaction_id: oldest,
1886 pagination_id: 0,
1887 };
1888 for entry in allocated_table.extract_from_if(..key, |_, _| true)? {
1889 entry?;
1890 }
1891
1892 Ok(())
1893 }
1894
1895 pub fn abort(mut self) -> Result {
1899 self.completed = true;
1901 self.abort_inner()
1902 }
1903
1904 fn abort_inner(&mut self) -> Result {
1905 #[cfg(feature = "logging")]
1906 debug!("Aborting transaction id={:?}", self.transaction_id);
1907 self.tables
1908 .lock()
1909 .unwrap()
1910 .table_tree
1911 .clear_root_updates_and_close();
1912 for savepoint in self.created_persistent_savepoints.lock().unwrap().iter() {
1913 match self.delete_persistent_savepoint(savepoint.0) {
1914 Ok(_) => {}
1915 Err(err) => match err {
1916 SavepointError::InvalidSavepoint => {
1917 unreachable!();
1918 }
1919 SavepointError::Storage(storage_err) => {
1920 return Err(storage_err);
1921 }
1922 },
1923 }
1924 }
1925 self.mem.rollback_uncommitted_writes()?;
1926 #[cfg(feature = "logging")]
1927 debug!("Finished abort of transaction id={:?}", self.transaction_id);
1928 Ok(())
1929 }
1930
1931 pub(crate) fn durable_commit(
1932 &mut self,
1933 user_root: Option<BtreeHeader>,
1934 eventual: bool,
1935 ) -> Result {
1936 let free_until_transaction = self
1937 .transaction_tracker
1938 .oldest_live_read_transaction()
1939 .map_or(self.transaction_id, |x| x.next());
1940 self.process_freed_pages(free_until_transaction)?;
1941
1942 let mut system_tables = self.system_tables.lock().unwrap();
1943 let system_freed_pages = system_tables.system_freed_pages();
1944 let system_tree = system_tables.table_tree.flush_table_root_updates()?;
1945 system_tree
1946 .delete_table(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
1947 .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?;
1948
1949 if self.quick_repair {
1950 system_tree.create_table_and_flush_table_root(
1951 ALLOCATOR_STATE_TABLE_NAME,
1952 |system_tree_ref, tree: &mut AllocatorStateTree| {
1953 let mut legacy_pagination_counter = 0;
1954 let mut v2_pagination_counter = 0;
1955
1956 loop {
1957 let num_regions = self
1958 .mem
1959 .reserve_allocator_state(tree, self.transaction_id)?;
1960
1961 self.store_freed_pages(
1965 system_tree_ref,
1966 system_freed_pages.clone(),
1967 &mut legacy_pagination_counter,
1968 &mut v2_pagination_counter,
1969 true,
1970 )?;
1971
1972 if self.mem.try_save_allocator_state(tree, num_regions)? {
1973 return Ok(());
1974 }
1975
1976 while let Some(guards) = tree.last()? {
1981 let key = guards.0.value();
1982 drop(guards);
1983 tree.remove(&key)?;
1984 }
1985 }
1986 },
1987 )?;
1988 } else {
1989 let savepoint_exists = self.transaction_tracker.any_savepoint_exists();
1994 self.store_freed_pages(
1995 system_tree,
1996 system_freed_pages,
1997 &mut 0,
1998 &mut 0,
1999 savepoint_exists,
2000 )?;
2001 }
2002
2003 let system_root = system_tree.finalize_dirty_checksums()?;
2004
2005 let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?;
2007
2008 self.mem.commit(
2009 user_root,
2010 system_root,
2011 freed_root,
2012 self.transaction_id,
2013 eventual,
2014 self.two_phase_commit,
2015 )?;
2016
2017 self.transaction_tracker.clear_pending_non_durable_commits();
2019
2020 for page in self.post_commit_frees.lock().unwrap().drain(..) {
2023 self.mem.free(page, &mut PageTrackerPolicy::Ignore);
2024 }
2025
2026 Ok(())
2027 }
2028
2029 pub(crate) fn non_durable_commit(&mut self, user_root: Option<BtreeHeader>) -> Result {
2031 let system_root = {
2032 let mut system_tables = self.system_tables.lock().unwrap();
2033 let system_freed_pages = system_tables.system_freed_pages();
2034 system_tables.table_tree.flush_table_root_updates()?;
2035 self.store_freed_pages(
2038 &mut system_tables.table_tree,
2039 system_freed_pages,
2040 &mut 0,
2041 &mut 0,
2042 true,
2043 )?;
2044
2045 system_tables
2046 .table_tree
2047 .flush_table_root_updates()?
2048 .finalize_dirty_checksums()?
2049 };
2050
2051 let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?;
2053
2054 self.mem
2055 .non_durable_commit(user_root, system_root, freed_root, self.transaction_id)?;
2056 self.transaction_tracker
2059 .register_non_durable_commit(self.transaction_id);
2060 Ok(())
2061 }
2062
2063 pub(crate) fn compact_pages(&mut self) -> Result<bool> {
2066 let mut progress = false;
2067 if !self.mem.file_format_v3() && self.mem.relocate_region_tracker()? {
2069 progress = true;
2070 }
2071
2072 let mut highest_pages = BTreeMap::new();
2074 let mut tables = self.tables.lock().unwrap();
2075 let table_tree = &mut tables.table_tree;
2076 table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
2077 let mut system_tables = self.system_tables.lock().unwrap();
2078 let system_table_tree = &mut system_tables.table_tree;
2079 system_table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
2080
2081 let mut relocation_map = HashMap::new();
2083 for path in highest_pages.into_values().rev() {
2084 if relocation_map.contains_key(&path.page_number()) {
2085 continue;
2086 }
2087 let old_page = self.mem.get_page(path.page_number())?;
2088 let mut new_page = self.mem.allocate_lowest(old_page.memory().len())?;
2089 let new_page_number = new_page.get_page_number();
2090 new_page.memory_mut()[0] = old_page.memory()[0];
2093 drop(new_page);
2094 if new_page_number < path.page_number() {
2096 relocation_map.insert(path.page_number(), new_page_number);
2097 for parent in path.parents() {
2098 if relocation_map.contains_key(parent) {
2099 continue;
2100 }
2101 let old_parent = self.mem.get_page(*parent)?;
2102 let mut new_page = self.mem.allocate_lowest(old_parent.memory().len())?;
2103 let new_page_number = new_page.get_page_number();
2104 new_page.memory_mut()[0] = old_parent.memory()[0];
2107 drop(new_page);
2108 relocation_map.insert(*parent, new_page_number);
2109 }
2110 } else {
2111 self.mem
2112 .free(new_page_number, &mut PageTrackerPolicy::Ignore);
2113 break;
2114 }
2115 }
2116
2117 if !relocation_map.is_empty() {
2118 progress = true;
2119 }
2120
2121 table_tree.relocate_tables(&relocation_map)?;
2122 system_table_tree.relocate_tables(&relocation_map)?;
2123
2124 Ok(progress)
2125 }
2126
2127 fn process_freed_pages(&mut self, free_until: TransactionId) -> Result {
2130 assert_eq!(PageNumber::serialized_size(), 8);
2132 let lookup_key = FreedTableKey {
2134 transaction_id: free_until.raw_id(),
2135 pagination_id: 0,
2136 };
2137
2138 let mut to_remove = vec![];
2139 let mut freed_tree = self.freed_tree.lock().unwrap();
2140 for entry in freed_tree.range(&(..lookup_key))? {
2141 let entry = entry?;
2142 to_remove.push(entry.key());
2143 let value = entry.value();
2144 for i in 0..value.len() {
2145 self.mem.free(value.get(i), &mut PageTrackerPolicy::Ignore);
2146 }
2147 }
2148
2149 for key in to_remove {
2151 freed_tree.remove(&key)?;
2152 }
2153
2154 let mut system_tables = self.system_tables.lock().unwrap();
2156 {
2157 let mut data_freed = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
2158 let key = TransactionIdWithPagination {
2159 transaction_id: free_until.raw_id(),
2160 pagination_id: 0,
2161 };
2162 for entry in data_freed.extract_from_if(..key, |_, _| true)? {
2163 let (_, page_list) = entry?;
2164 for i in 0..page_list.value().len() {
2165 self.mem
2166 .free(page_list.value().get(i), &mut PageTrackerPolicy::Ignore);
2167 }
2168 }
2169 }
2170
2171 {
2173 let mut system_freed = system_tables.open_system_table(self, SYSTEM_FREED_TABLE)?;
2174 let key = TransactionIdWithPagination {
2175 transaction_id: free_until.raw_id(),
2176 pagination_id: 0,
2177 };
2178 for entry in system_freed.extract_from_if(..key, |_, _| true)? {
2179 let (_, page_list) = entry?;
2180 for i in 0..page_list.value().len() {
2181 self.mem
2182 .free(page_list.value().get(i), &mut PageTrackerPolicy::Ignore);
2183 }
2184 }
2185 }
2186
2187 Ok(())
2188 }
2189
2190 fn store_freed_pages(
2191 &self,
2192 system_tree: &mut TableTreeMut,
2193 system_freed_pages: Arc<Mutex<Vec<PageNumber>>>,
2194 legacy_pagination_counter: &mut u64,
2195 v2_pagination_counter: &mut u64,
2196 include_post_commit_free: bool,
2197 ) -> Result {
2198 assert_eq!(PageNumber::serialized_size(), 8); if self.mem.file_format_v3() {
2201 if include_post_commit_free {
2202 system_tree.open_table_and_flush_table_root(
2203 SYSTEM_FREED_TABLE.name(),
2204 |system_freed_tree: &mut SystemFreedTree| {
2205 while !system_freed_pages.lock().unwrap().is_empty() {
2206 let chunk_size = 200;
2207 let buffer_size = PageList::required_bytes(chunk_size);
2208 let key = TransactionIdWithPagination {
2209 transaction_id: self.transaction_id.raw_id(),
2210 pagination_id: *v2_pagination_counter,
2211 };
2212 let mut access_guard = system_freed_tree
2213 .insert_reserve(&key, buffer_size.try_into().unwrap())?;
2214
2215 let mut freed_pages = system_freed_pages.lock().unwrap();
2216 let len = freed_pages.len();
2217 access_guard.as_mut().clear();
2218 for page in freed_pages.drain(len - min(len, chunk_size)..) {
2219 access_guard.as_mut().push_back(page);
2220 }
2221 drop(access_guard);
2222
2223 *v2_pagination_counter += 1;
2224 }
2225 Ok(())
2226 },
2227 )?;
2228 } else {
2229 self.post_commit_frees
2230 .lock()
2231 .unwrap()
2232 .extend(system_freed_pages.lock().unwrap().drain(..));
2233 }
2234 } else {
2235 self.legacy_freed_pages
2236 .lock()
2237 .unwrap()
2238 .extend(system_freed_pages.lock().unwrap().drain(..));
2239 }
2240
2241 let mut freed_tree = self.freed_tree.lock().unwrap();
2243 if include_post_commit_free {
2244 self.legacy_freed_pages
2247 .lock()
2248 .unwrap()
2249 .extend(self.post_commit_frees.lock().unwrap().drain(..));
2250 }
2251 while !self.legacy_freed_pages.lock().unwrap().is_empty() {
2252 let chunk_size = 100;
2253 let buffer_size = FreedPageList::required_bytes(chunk_size);
2254 let key = FreedTableKey {
2255 transaction_id: self.transaction_id.raw_id(),
2256 pagination_id: *legacy_pagination_counter,
2257 };
2258 let mut access_guard =
2259 freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?;
2260
2261 let mut freed_pages = self.legacy_freed_pages.lock().unwrap();
2262 let len = freed_pages.len();
2263 access_guard.as_mut().clear();
2264 for page in freed_pages.drain(len - min(len, chunk_size)..) {
2265 access_guard.as_mut().push_back(page);
2266 }
2267 drop(access_guard);
2268
2269 *legacy_pagination_counter += 1;
2270
2271 if include_post_commit_free {
2272 freed_pages.extend(self.post_commit_frees.lock().unwrap().drain(..));
2275 }
2276 }
2277
2278 Ok(())
2279 }
2280
2281 pub fn stats(&self) -> Result<DatabaseStats> {
2283 let tables = self.tables.lock().unwrap();
2284 let table_tree = &tables.table_tree;
2285 let data_tree_stats = table_tree.stats()?;
2286
2287 let system_tables = self.system_tables.lock().unwrap();
2288 let system_table_tree = &system_tables.table_tree;
2289 let system_tree_stats = system_table_tree.stats()?;
2290
2291 let freed_tree_stats = self.freed_tree.lock().unwrap().stats()?;
2292
2293 let total_metadata_bytes = data_tree_stats.metadata_bytes()
2294 + system_tree_stats.metadata_bytes
2295 + system_tree_stats.stored_leaf_bytes
2296 + freed_tree_stats.metadata_bytes
2297 + freed_tree_stats.stored_leaf_bytes;
2298 let total_fragmented = data_tree_stats.fragmented_bytes()
2299 + system_tree_stats.fragmented_bytes
2300 + freed_tree_stats.fragmented_bytes
2301 + self.mem.count_free_pages()? * (self.mem.get_page_size() as u64);
2302
2303 Ok(DatabaseStats {
2304 tree_height: data_tree_stats.tree_height(),
2305 allocated_pages: self.mem.count_allocated_pages()?,
2306 leaf_pages: data_tree_stats.leaf_pages(),
2307 branch_pages: data_tree_stats.branch_pages(),
2308 stored_leaf_bytes: data_tree_stats.stored_bytes(),
2309 metadata_bytes: total_metadata_bytes,
2310 fragmented_bytes: total_fragmented,
2311 page_size: self.mem.get_page_size(),
2312 })
2313 }
2314
2315 #[cfg(any(test, fuzzing))]
2316 pub fn num_region_tracker_pages(&self) -> u64 {
2317 if self.mem.file_format_v3() {
2318 0
2319 } else {
2320 1 << self.mem.tracker_page().page_order
2321 }
2322 }
2323
2324 #[allow(dead_code)]
2325 pub(crate) fn print_debug(&self) -> Result {
2326 let mut tables = self.tables.lock().unwrap();
2328 if let Some(page) = tables
2329 .table_tree
2330 .flush_table_root_updates()
2331 .unwrap()
2332 .finalize_dirty_checksums()
2333 .unwrap()
2334 {
2335 eprintln!("Master tree:");
2336 let master_tree: Btree<&str, InternalTableDefinition> = Btree::new(
2337 Some(page),
2338 PageHint::None,
2339 self.transaction_guard.clone(),
2340 self.mem.clone(),
2341 )?;
2342 master_tree.print_debug(true)?;
2343 }
2344
2345 Ok(())
2346 }
2347}
2348
2349impl Drop for WriteTransaction {
2350 fn drop(&mut self) {
2351 if !self.completed && !thread::panicking() && !self.mem.storage_failure() {
2352 #[allow(unused_variables)]
2353 if let Err(error) = self.abort_inner() {
2354 #[cfg(feature = "logging")]
2355 warn!("Failure automatically aborting transaction: {}", error);
2356 }
2357 } else if !self.completed && self.mem.storage_failure() {
2358 self.tables
2359 .lock()
2360 .unwrap()
2361 .table_tree
2362 .clear_root_updates_and_close();
2363 }
2364 }
2365}
2366
2367pub struct ReadTransaction {
2371 mem: Arc<TransactionalMemory>,
2372 tree: TableTree,
2373}
2374
2375impl ReadTransaction {
2376 pub(crate) fn new(
2377 mem: Arc<TransactionalMemory>,
2378 guard: TransactionGuard,
2379 ) -> Result<Self, TransactionError> {
2380 let root_page = mem.get_data_root();
2381 let guard = Arc::new(guard);
2382 Ok(Self {
2383 mem: mem.clone(),
2384 tree: TableTree::new(root_page, PageHint::Clean, guard, mem)
2385 .map_err(TransactionError::Storage)?,
2386 })
2387 }
2388
2389 pub fn open_table<K: Key + 'static, V: Value + 'static>(
2391 &self,
2392 definition: TableDefinition<K, V>,
2393 ) -> Result<ReadOnlyTable<K, V>, TableError> {
2394 let header = self
2395 .tree
2396 .get_table::<K, V>(definition.name(), TableType::Normal)?
2397 .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
2398
2399 match header {
2400 InternalTableDefinition::Normal { table_root, .. } => Ok(ReadOnlyTable::new(
2401 definition.name().to_string(),
2402 table_root,
2403 PageHint::Clean,
2404 self.tree.transaction_guard().clone(),
2405 self.mem.clone(),
2406 )?),
2407 InternalTableDefinition::Multimap { .. } => unreachable!(),
2408 }
2409 }
2410
2411 pub fn open_untyped_table(
2413 &self,
2414 handle: impl TableHandle,
2415 ) -> Result<ReadOnlyUntypedTable, TableError> {
2416 let header = self
2417 .tree
2418 .get_table_untyped(handle.name(), TableType::Normal)?
2419 .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
2420
2421 match header {
2422 InternalTableDefinition::Normal {
2423 table_root,
2424 fixed_key_size,
2425 fixed_value_size,
2426 ..
2427 } => Ok(ReadOnlyUntypedTable::new(
2428 table_root,
2429 fixed_key_size,
2430 fixed_value_size,
2431 self.mem.clone(),
2432 )),
2433 InternalTableDefinition::Multimap { .. } => unreachable!(),
2434 }
2435 }
2436
2437 pub fn open_multimap_table<K: Key + 'static, V: Key + 'static>(
2439 &self,
2440 definition: MultimapTableDefinition<K, V>,
2441 ) -> Result<ReadOnlyMultimapTable<K, V>, TableError> {
2442 let header = self
2443 .tree
2444 .get_table::<K, V>(definition.name(), TableType::Multimap)?
2445 .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
2446
2447 match header {
2448 InternalTableDefinition::Normal { .. } => unreachable!(),
2449 InternalTableDefinition::Multimap {
2450 table_root,
2451 table_length,
2452 ..
2453 } => Ok(ReadOnlyMultimapTable::new(
2454 table_root,
2455 table_length,
2456 PageHint::Clean,
2457 self.tree.transaction_guard().clone(),
2458 self.mem.clone(),
2459 )?),
2460 }
2461 }
2462
2463 pub fn open_untyped_multimap_table(
2465 &self,
2466 handle: impl MultimapTableHandle,
2467 ) -> Result<ReadOnlyUntypedMultimapTable, TableError> {
2468 let header = self
2469 .tree
2470 .get_table_untyped(handle.name(), TableType::Multimap)?
2471 .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
2472
2473 match header {
2474 InternalTableDefinition::Normal { .. } => unreachable!(),
2475 InternalTableDefinition::Multimap {
2476 table_root,
2477 table_length,
2478 fixed_key_size,
2479 fixed_value_size,
2480 ..
2481 } => Ok(ReadOnlyUntypedMultimapTable::new(
2482 table_root,
2483 table_length,
2484 fixed_key_size,
2485 fixed_value_size,
2486 self.mem.clone(),
2487 )),
2488 }
2489 }
2490
2491 pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle>> {
2493 self.tree
2494 .list_tables(TableType::Normal)
2495 .map(|x| x.into_iter().map(UntypedTableHandle::new))
2496 }
2497
2498 pub fn list_multimap_tables(&self) -> Result<impl Iterator<Item = UntypedMultimapTableHandle>> {
2500 self.tree
2501 .list_tables(TableType::Multimap)
2502 .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
2503 }
2504
2505 pub fn close(self) -> Result<(), TransactionError> {
2513 if Arc::strong_count(self.tree.transaction_guard()) > 1 {
2514 return Err(TransactionError::ReadTransactionStillInUse(self));
2515 }
2516 Ok(())
2518 }
2519}
2520
2521impl Debug for ReadTransaction {
2522 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2523 f.write_str("ReadTransaction")
2524 }
2525}
2526
#[cfg(test)]
mod test {
    use crate::{Database, TableDefinition};

    const X: TableDefinition<&str, &str> = TableDefinition::new("x");

    /// A write transaction begun after closing and reopening the database file
    /// must get a larger id than the one committed before the close.
    #[test]
    fn transaction_id_persistence() {
        let tmpfile = crate::create_tempfile();

        // First session: commit a single insert and remember the transaction's id.
        let first_txn_id = {
            let db = Database::create(tmpfile.path()).unwrap();
            let txn = db.begin_write().unwrap();

            let mut table = txn.open_table(X).unwrap();
            table.insert("hello", "world").unwrap();
            // The table must be gone before the transaction can be committed.
            drop(table);

            let committed_id = txn.transaction_id;
            txn.commit().unwrap();
            committed_id
            // `db` is dropped here, closing the database before it is reopened.
        };

        // Second session: reopen the same file and begin another write.
        let reopened = Database::create(tmpfile.path()).unwrap();
        let next_txn = reopened.begin_write().unwrap();
        assert!(next_txn.transaction_id > first_txn_id);
    }
}