use crate::db::TransactionGuard;
use crate::error::CommitError;
use crate::multimap_table::ReadOnlyUntypedMultimapTable;
use crate::sealed::Sealed;
use crate::table::ReadOnlyUntypedTable;
use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
use crate::tree_store::{
Btree, BtreeHeader, BtreeMut, FreedPageList, FreedTableKey, InternalTableDefinition, Page,
PageHint, PageNumber, SerializedSavepoint, TableTree, TableTreeMut, TableType,
TransactionalMemory, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH,
};
use crate::types::{Key, Value};
use crate::{
AccessGuard, MultimapTable, MultimapTableDefinition, MultimapTableHandle, Range,
ReadOnlyMultimapTable, ReadOnlyTable, Result, Savepoint, SavepointError, StorageError, Table,
TableDefinition, TableError, TableHandle, TransactionError, TypeName,
UntypedMultimapTableHandle, UntypedTableHandle,
};
#[cfg(feature = "logging")]
use log::{debug, warn};
use std::borrow::Borrow;
use std::cmp::min;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Display, Formatter};
use std::marker::PhantomData;
use std::mem::size_of;
use std::ops::RangeBounds;
#[cfg(any(test, fuzzing))]
use std::ops::RangeFull;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::{panic, thread};
/// Limit passed to `highest_index_pages` for each table tree scanned by `compact_pages`.
const MAX_PAGES_PER_COMPACTION: usize = 1_000_000;
/// System table with a single `()` key mapping to the id the next persistent savepoint will use.
const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> =
    SystemTableDefinition::new("next_savepoint_id");
/// System table mapping each persistent savepoint's id to its serialized form.
pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition<SavepointId, SerializedSavepoint> =
    SystemTableDefinition::new("persistent_savepoints");
/// Name of the system table that is dropped on every durable commit and rebuilt only when
/// quick-repair is enabled.
pub(crate) const ALLOCATOR_STATE_TABLE_NAME: &str = "allocator_state";
/// Mutable b-tree type used for the allocator-state system table (raw byte values).
pub(crate) type AllocatorStateTree<'a> = BtreeMut<'a, AllocatorStateKey, &'static [u8]>;
/// Key of the allocator-state system table.
///
/// Serialized as a one-byte tag (0 = `Region`, 1 = `RegionTracker`, 2 = `TransactionId`)
/// followed by a little-endian `u32` payload that is only meaningful for `Region`.
/// `Key::compare` orders keys by this derived `Ord`, so the variant order is significant.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
pub(crate) enum AllocatorStateKey {
    /// State of one allocator region, identified by region number.
    Region(u32),
    /// The region-tracker entry.
    RegionTracker,
    /// The transaction-id entry.
    TransactionId,
}
impl Value for AllocatorStateKey {
    type SelfType<'a> = Self;
    type AsBytes<'a> = [u8; 1 + size_of::<u32>()];

    /// Every key occupies exactly five bytes: a tag byte plus a `u32` payload.
    fn fixed_width() -> Option<usize> {
        Some(1 + size_of::<u32>())
    }

    /// Decodes a key written by `as_bytes`.
    ///
    /// # Panics
    /// Panics if `data` is empty, if a `Region` key does not carry exactly four payload bytes,
    /// or if the tag byte is unknown.
    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        let (tag, payload) = (data[0], &data[1..]);
        match tag {
            0 => {
                let raw: [u8; 4] = payload.try_into().unwrap();
                Self::Region(u32::from_le_bytes(raw))
            }
            1 => Self::RegionTracker,
            2 => Self::TransactionId,
            _ => unreachable!(),
        }
    }

    /// Encodes the key as a tag byte followed by a little-endian payload (zero when unused).
    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
    where
        Self: 'a,
        Self: 'b,
    {
        let (tag, payload): (u8, [u8; 4]) = match *value {
            Self::Region(region) => (0, region.to_le_bytes()),
            Self::RegionTracker => (1, [0; 4]),
            Self::TransactionId => (2, [0; 4]),
        };
        let mut encoded = [0u8; 1 + size_of::<u32>()];
        encoded[0] = tag;
        encoded[1..].copy_from_slice(&payload);
        encoded
    }

    fn type_name() -> TypeName {
        TypeName::internal("redb::AllocatorStateKey")
    }
}
impl Key for AllocatorStateKey {
    /// Orders two encoded keys by decoding both and comparing with the derived `Ord`.
    fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
        let lhs = Self::from_bytes(data1);
        let rhs = Self::from_bytes(data2);
        Ord::cmp(&lhs, &rhs)
    }
}
/// Definition of an internal system table: its name plus the key and value types it stores.
pub struct SystemTableDefinition<'a, K: Key + 'static, V: Value + 'static> {
    name: &'a str,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}

impl<'a, K: Key + 'static, V: Value + 'static> SystemTableDefinition<'a, K, V> {
    /// Creates a definition for the system table called `name`.
    ///
    /// # Panics
    /// Panics if `name` is empty; in a `const` initializer this surfaces at compile time.
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            _key_type: PhantomData,
            _value_type: PhantomData,
            name,
        }
    }
}

impl<K: Key + 'static, V: Value + 'static> TableHandle for SystemTableDefinition<'_, K, V> {
    fn name(&self) -> &str {
        self.name
    }
}

impl<K: Key, V: Value> Sealed for SystemTableDefinition<'_, K, V> {}

// Implemented by hand rather than derived so that `K` and `V` need not be `Clone`/`Copy`.
impl<K: Key + 'static, V: Value + 'static> Clone for SystemTableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<K: Key + 'static, V: Value + 'static> Copy for SystemTableDefinition<'_, K, V> {}

impl<K: Key + 'static, V: Value + 'static> Display for SystemTableDefinition<'_, K, V> {
    /// Renders as `name<KeyType, ValueType>`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let key_type = K::type_name();
        let value_type = V::type_name();
        write!(f, "{}<{}, {}>", self.name, key_type.name(), value_type.name())
    }
}
/// Space-usage statistics for a database, as assembled by `WriteTransaction::stats`.
#[derive(Debug)]
pub struct DatabaseStats {
    pub(crate) tree_height: u32,
    pub(crate) allocated_pages: u64,
    pub(crate) leaf_pages: u64,
    pub(crate) branch_pages: u64,
    pub(crate) stored_leaf_bytes: u64,
    pub(crate) metadata_bytes: u64,
    pub(crate) fragmented_bytes: u64,
    pub(crate) page_size: usize,
}

impl DatabaseStats {
    /// Page size, in bytes.
    pub fn page_size(&self) -> usize {
        self.page_size
    }

    /// Height of the user data tree.
    pub fn tree_height(&self) -> u32 {
        self.tree_height
    }

    /// Total number of allocated pages.
    pub fn allocated_pages(&self) -> u64 {
        self.allocated_pages
    }

    /// Number of leaf pages in the user data tree.
    pub fn leaf_pages(&self) -> u64 {
        self.leaf_pages
    }

    /// Number of branch pages in the user data tree.
    pub fn branch_pages(&self) -> u64 {
        self.branch_pages
    }

    /// Bytes of user data stored in leaf pages.
    pub fn stored_bytes(&self) -> u64 {
        self.stored_leaf_bytes
    }

    /// Bytes used for bookkeeping rather than user data.
    pub fn metadata_bytes(&self) -> u64 {
        self.metadata_bytes
    }

    /// Bytes lost to fragmentation.
    pub fn fragmented_bytes(&self) -> u64 {
        self.fragmented_bytes
    }
}
/// Durability level for committing a [`WriteTransaction`].
///
/// Persistent savepoints can only be created or deleted while the level is `Immediate`, and
/// the level cannot be changed once this transaction has created or deleted one.
#[derive(Copy, Clone, Debug)]
#[non_exhaustive]
pub enum Durability {
    /// The commit is not made durable.
    None,
    /// A durable commit performed with eventual durability.
    Eventual,
    /// A durable commit performed with immediate durability (the default for new transactions).
    Immediate,
    /// Same as `Immediate`, with two-phase commit switched on.
    #[deprecated(since = "2.3.0", note = "use set_two_phase_commit(true) instead")]
    Paranoid,
}
/// `Durability` after `Paranoid` has been folded into `Immediate` plus two-phase commit.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum InternalDurability {
    None,
    Eventual,
    Immediate,
}
/// A mutable handle to one system table, borrowed from a `SystemNamespace`.
///
/// On drop it stages its current root and length back into the namespace's table tree.
pub struct SystemTable<'db, 's, K: Key + 'static, V: Value + 'static> {
    name: String,
    // Exclusive borrow of the namespace that opened this table; used by `Drop`.
    namespace: &'s mut SystemNamespace<'db>,
    tree: BtreeMut<'s, K, V>,
    transaction_guard: Arc<TransactionGuard>,
}
impl<'db, 's, K: Key + 'static, V: Value + 'static> SystemTable<'db, 's, K, V> {
    /// Wraps the b-tree rooted at `table_root` as system table `name`.
    ///
    /// Pages the tree releases are pushed onto `freed_pages`.
    fn new(
        name: &str,
        table_root: Option<BtreeHeader>,
        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
        guard: Arc<TransactionGuard>,
        mem: Arc<TransactionalMemory>,
        namespace: &'s mut SystemNamespace<'db>,
    ) -> SystemTable<'db, 's, K, V> {
        let tree = BtreeMut::new(table_root, Arc::clone(&guard), mem, freed_pages);
        SystemTable {
            name: name.to_owned(),
            namespace,
            tree,
            transaction_guard: guard,
        }
    }

    /// Looks up `key`, returning a guard over its value if present.
    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<V>>>
    where
        K: 'a,
    {
        let key_ref = key.borrow();
        self.tree.get(key_ref)
    }

    /// Iterates over the entries whose keys fall in `range`.
    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<K, V>>
    where
        K: 'a,
        KR: Borrow<K::SelfType<'a>> + 'a,
    {
        let btree_range = self.tree.range(&range)?;
        Ok(Range::new(btree_range, Arc::clone(&self.transaction_guard)))
    }

    /// Inserts `key -> value`, returning the previous value if one existed.
    ///
    /// # Errors
    /// `StorageError::ValueTooLarge` if the value exceeds `MAX_VALUE_LENGTH`, the key exceeds
    /// `MAX_VALUE_LENGTH`, or the pair exceeds `MAX_PAIR_LENGTH` — checked in that order.
    pub fn insert<'k, 'v>(
        &mut self,
        key: impl Borrow<K::SelfType<'k>>,
        value: impl Borrow<V::SelfType<'v>>,
    ) -> Result<Option<AccessGuard<V>>> {
        let value_len = V::as_bytes(value.borrow()).as_ref().len();
        if value_len > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(value_len));
        }
        let key_len = K::as_bytes(key.borrow()).as_ref().len();
        let oversized = if key_len > MAX_VALUE_LENGTH {
            Some(key_len)
        } else if value_len + key_len > MAX_PAIR_LENGTH {
            Some(value_len + key_len)
        } else {
            None
        };
        if let Some(len) = oversized {
            return Err(StorageError::ValueTooLarge(len));
        }
        self.tree.insert(key.borrow(), value.borrow())
    }

    /// Removes `key`, returning its previous value if one existed.
    pub fn remove<'a>(
        &mut self,
        key: impl Borrow<K::SelfType<'a>>,
    ) -> Result<Option<AccessGuard<V>>>
    where
        K: 'a,
    {
        let key_ref = key.borrow();
        self.tree.remove(key_ref)
    }
}
impl<'db, 's, K: Key + 'static, V: Value + 'static> Drop for SystemTable<'db, 's, K, V> {
    /// Stages this table's root and length (0 for an empty tree) back into its namespace.
    fn drop(&mut self) {
        let length = self.tree.get_root().map_or(0, |header| header.length);
        self.namespace.close_table(&self.name, &self.tree, length);
    }
}
/// The system table tree of a write transaction, through which `SystemTable`s are opened.
struct SystemNamespace<'db> {
    table_tree: TableTreeMut<'db>,
    // Shared guard handed to every system table opened from this namespace.
    transaction_guard: Arc<TransactionGuard>,
}
impl<'db> SystemNamespace<'db> {
fn open_system_table<'txn, 's, K: Key + 'static, V: Value + 'static>(
&'s mut self,
transaction: &'txn WriteTransaction,
definition: SystemTableDefinition<K, V>,
) -> Result<SystemTable<'db, 's, K, V>> {
#[cfg(feature = "logging")]
debug!("Opening system table: {}", definition);
let (root, _) = self
.table_tree
.get_or_create_table::<K, V>(definition.name(), TableType::Normal)
.map_err(|e| {
e.into_storage_error_or_corrupted("Internal error. System table is corrupted")
})?;
transaction.dirty.store(true, Ordering::Release);
Ok(SystemTable::new(
definition.name(),
root,
transaction.freed_pages.clone(),
self.transaction_guard.clone(),
transaction.mem.clone(),
self,
))
}
fn close_table<K: Key + 'static, V: Value + 'static>(
&mut self,
name: &str,
table: &BtreeMut<K, V>,
length: u64,
) {
self.table_tree
.stage_update_table_root(name, table.get_root(), length);
}
}
/// The user table tree of a write transaction, plus bookkeeping of currently open tables.
struct TableNamespace<'db> {
    // Name of each open table -> source location that opened it (reported in
    // `TableError::TableAlreadyOpen`).
    open_tables: HashMap<String, &'static panic::Location<'static>>,
    table_tree: TableTreeMut<'db>,
}
impl<'db> TableNamespace<'db> {
    /// Looks up (creating if needed) table `name` and records it as open at the caller's
    /// location.
    ///
    /// Returns the table's root and length, or `TableAlreadyOpen` carrying the location that
    /// opened it first.
    #[track_caller]
    fn inner_open<K: Key + 'static, V: Value + 'static>(
        &mut self,
        name: &str,
        table_type: TableType,
    ) -> Result<(Option<BtreeHeader>, u64), TableError> {
        if let Some(&opened_at) = self.open_tables.get(name) {
            return Err(TableError::TableAlreadyOpen(name.to_string(), opened_at));
        }
        let root_and_length = self
            .table_tree
            .get_or_create_table::<K, V>(name, table_type)?;
        // Through the `#[track_caller]` chain this is the outermost tracked caller's location.
        let caller = panic::Location::caller();
        self.open_tables.insert(name.to_string(), caller);
        Ok(root_and_length)
    }

    /// Opens multimap table `definition` for `transaction` and marks the transaction dirty.
    #[track_caller]
    pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
        &mut self,
        transaction: &'txn WriteTransaction,
        definition: MultimapTableDefinition<K, V>,
    ) -> Result<MultimapTable<'txn, K, V>, TableError> {
        #[cfg(feature = "logging")]
        debug!("Opening multimap table: {}", definition);
        let name = definition.name();
        let (root, length) = self.inner_open::<K, V>(name, TableType::Multimap)?;
        transaction.dirty.store(true, Ordering::Release);
        Ok(MultimapTable::new(
            name,
            root,
            length,
            Arc::clone(&transaction.freed_pages),
            Arc::clone(&transaction.mem),
            transaction,
        ))
    }

    /// Opens table `definition` for `transaction` and marks the transaction dirty.
    #[track_caller]
    pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
        &mut self,
        transaction: &'txn WriteTransaction,
        definition: TableDefinition<K, V>,
    ) -> Result<Table<'txn, K, V>, TableError> {
        #[cfg(feature = "logging")]
        debug!("Opening table: {}", definition);
        let name = definition.name();
        let (root, _) = self.inner_open::<K, V>(name, TableType::Normal)?;
        transaction.dirty.store(true, Ordering::Release);
        Ok(Table::new(
            name,
            root,
            Arc::clone(&transaction.freed_pages),
            Arc::clone(&transaction.mem),
            transaction,
        ))
    }

    /// Deletes table `name` unless it is currently open; returns whether a table was deleted.
    #[track_caller]
    fn inner_delete(&mut self, name: &str, table_type: TableType) -> Result<bool, TableError> {
        match self.open_tables.get(name) {
            Some(&opened_at) => Err(TableError::TableAlreadyOpen(name.to_string(), opened_at)),
            None => self.table_tree.delete_table(name, table_type),
        }
    }

    /// Deletes normal table `name`. The transaction is marked dirty even if deletion fails.
    #[track_caller]
    fn delete_table(
        &mut self,
        transaction: &WriteTransaction,
        name: &str,
    ) -> Result<bool, TableError> {
        #[cfg(feature = "logging")]
        debug!("Deleting table: {}", name);
        transaction.dirty.store(true, Ordering::Release);
        self.inner_delete(name, TableType::Normal)
    }

    /// Deletes multimap table `name`. The transaction is marked dirty even if deletion fails.
    #[track_caller]
    fn delete_multimap_table(
        &mut self,
        transaction: &WriteTransaction,
        name: &str,
    ) -> Result<bool, TableError> {
        #[cfg(feature = "logging")]
        debug!("Deleting multimap table: {}", name);
        transaction.dirty.store(true, Ordering::Release);
        self.inner_delete(name, TableType::Multimap)
    }

    /// Marks table `name` as closed and stages its new root and length.
    ///
    /// # Panics
    /// Panics if `name` is not currently open.
    pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
        &mut self,
        name: &str,
        table: &BtreeMut<K, V>,
        length: u64,
    ) {
        self.open_tables.remove(name).unwrap();
        let root = table.get_root();
        self.table_tree.stage_update_table_root(name, root, length);
    }
}
/// A read-write transaction over the user tables, with its own system-table namespace and
/// freed-page bookkeeping.
///
/// Dropping it without calling `commit` or `abort` runs the abort path.
pub struct WriteTransaction {
    transaction_tracker: Arc<TransactionTracker>,
    mem: Arc<TransactionalMemory>,
    // Shared with every tree this transaction opens.
    transaction_guard: Arc<TransactionGuard>,
    // Taken from the guard at construction.
    transaction_id: TransactionId,
    // Pages whose release is pending, keyed by (transaction id, pagination id).
    freed_tree: Mutex<BtreeMut<'static, FreedTableKey, FreedPageList<'static>>>,
    // Pages released by the user and system table trees during this transaction.
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    // Pages released by the freed tree itself.
    post_commit_frees: Arc<Mutex<Vec<PageNumber>>>,
    tables: Mutex<TableNamespace<'static>>,
    system_tables: Mutex<SystemNamespace<'static>>,
    // Set by `commit`/`abort` so `Drop` does not abort again.
    completed: bool,
    // Set once a table is opened or deleted, or a savepoint restored; blocks ephemeral savepoints.
    dirty: AtomicBool,
    durability: InternalDurability,
    two_phase_commit: bool,
    quick_repair: bool,
    // Persistent savepoints created here; deleted again if the transaction aborts.
    created_persistent_savepoints: Mutex<HashSet<SavepointId>>,
    // Persistent savepoints deleted here; deallocated in the tracker after a successful commit.
    deleted_persistent_savepoints: Mutex<Vec<(SavepointId, TransactionId)>>,
}
impl WriteTransaction {
pub(crate) fn new(
guard: TransactionGuard,
transaction_tracker: Arc<TransactionTracker>,
mem: Arc<TransactionalMemory>,
) -> Result<Self> {
let transaction_id = guard.id();
let guard = Arc::new(guard);
let root_page = mem.get_data_root();
let system_page = mem.get_system_root();
let freed_root = mem.get_freed_root();
let freed_pages = Arc::new(Mutex::new(vec![]));
let post_commit_frees = Arc::new(Mutex::new(vec![]));
let tables = TableNamespace {
open_tables: Default::default(),
table_tree: TableTreeMut::new(
root_page,
guard.clone(),
mem.clone(),
freed_pages.clone(),
),
};
let system_tables = SystemNamespace {
table_tree: TableTreeMut::new(
system_page,
guard.clone(),
mem.clone(),
freed_pages.clone(),
),
transaction_guard: guard.clone(),
};
Ok(Self {
transaction_tracker,
mem: mem.clone(),
transaction_guard: guard.clone(),
transaction_id,
tables: Mutex::new(tables),
system_tables: Mutex::new(system_tables),
freed_tree: Mutex::new(BtreeMut::new(
freed_root,
guard,
mem,
post_commit_frees.clone(),
)),
freed_pages,
post_commit_frees,
completed: false,
dirty: AtomicBool::new(false),
durability: InternalDurability::Immediate,
two_phase_commit: false,
quick_repair: false,
created_persistent_savepoints: Mutex::new(Default::default()),
deleted_persistent_savepoints: Mutex::new(vec![]),
})
}
/// Debugging aid (test/fuzzing builds only): prints every allocated page grouped by owner.
///
/// The groups are the region tracker page, pages referenced by the user tables, pages
/// referenced by the system tables, pages of the freed tree itself, and pages pending free
/// inside the freed tree. Any allocated page not claimed by one of these groups is printed
/// under "Leaked pages". Storage errors panic via `unwrap`.
#[cfg(any(test, fuzzing))]
pub fn print_allocated_page_debug(&self) {
    // Start from every allocated page and remove each one as it is attributed to an owner.
    let mut all_allocated: HashSet<PageNumber> =
        HashSet::from_iter(self.mem.all_allocated_pages());
    let tracker = self.mem.tracker_page();
    all_allocated.remove(&tracker);
    println!("Tracker page");
    println!("{tracker:?}");
    let table_allocators = self
        .tables
        .lock()
        .unwrap()
        .table_tree
        .all_referenced_pages()
        .unwrap();
    let mut table_pages = vec![];
    // One entry per region; the enumeration index is passed as the region number.
    for (i, allocator) in table_allocators.iter().enumerate() {
        allocator.get_allocated_pages(i.try_into().unwrap(), &mut table_pages);
    }
    println!("Tables");
    for p in table_pages {
        all_allocated.remove(&p);
        println!("{p:?}");
    }
    let system_table_allocators = self
        .system_tables
        .lock()
        .unwrap()
        .table_tree
        .all_referenced_pages()
        .unwrap();
    let mut system_table_pages = vec![];
    for (i, allocator) in system_table_allocators.iter().enumerate() {
        allocator.get_allocated_pages(i.try_into().unwrap(), &mut system_table_pages);
    }
    println!("System tables");
    for p in system_table_pages {
        all_allocated.remove(&p);
        println!("{p:?}");
    }
    // Pages that make up the freed tree's own structure.
    println!("Free table");
    if let Some(freed_iter) = self.freed_tree.lock().unwrap().all_pages_iter().unwrap() {
        for p in freed_iter {
            let p = p.unwrap();
            all_allocated.remove(&p);
            println!("{p:?}");
        }
    }
    // Pages listed as values inside the freed tree (released but not yet reusable).
    println!("Pending free (i.e. in freed table)");
    for entry in self
        .freed_tree
        .lock()
        .unwrap()
        .range::<RangeFull, FreedTableKey>(&(..))
        .unwrap()
    {
        let entry = entry.unwrap();
        let value = entry.value();
        for i in 0..value.len() {
            let p = value.get(i);
            all_allocated.remove(&p);
            println!("{p:?}");
        }
    }
    if !all_allocated.is_empty() {
        println!("Leaked pages");
        for p in all_allocated {
            println!("{p:?}");
        }
    }
}
/// Creates a savepoint that is stored in the database and returns its id.
///
/// Bumps the next-savepoint counter, stores the serialized savepoint, and remembers the id so
/// an abort can remove it again.
///
/// # Errors
/// `SavepointError::InvalidSavepoint` unless durability is `Immediate`, or if the transaction
/// is already dirty; storage errors are propagated.
pub fn persistent_savepoint(&self) -> Result<u64, SavepointError> {
    if self.durability != InternalDurability::Immediate {
        return Err(SavepointError::InvalidSavepoint);
    }
    let mut savepoint = self.ephemeral_savepoint()?;
    let id = savepoint.get_id();

    let mut system_tables = self.system_tables.lock().unwrap();
    {
        // Scoped so this table is closed before the savepoint table is opened.
        let mut next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
        next_table.insert((), id.next())?;
    }
    let mut savepoint_table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
    savepoint_table.insert(id, SerializedSavepoint::from_savepoint(&savepoint))?;

    // Only flagged persistent once it is stored; on any earlier error it stays ephemeral.
    savepoint.set_persistent();
    self.created_persistent_savepoints.lock().unwrap().insert(id);
    Ok(id.0)
}
/// Returns a new shared handle to this transaction's guard.
pub(crate) fn transaction_guard(&self) -> Arc<TransactionGuard> {
    Arc::clone(&self.transaction_guard)
}
/// Reads the id the next persistent savepoint will get, or `None` if none was ever recorded.
pub(crate) fn next_persistent_savepoint_id(&self) -> Result<Option<SavepointId>> {
    let mut system_tables = self.system_tables.lock().unwrap();
    let next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
    let next_id = next_table.get(())?.map(|guard| guard.value());
    Ok(next_id)
}
pub fn get_persistent_savepoint(&self, id: u64) -> Result<Savepoint, SavepointError> {
let mut system_tables = self.system_tables.lock().unwrap();
let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
let value = table.get(SavepointId(id))?;
value
.map(|x| x.value().to_savepoint(self.transaction_tracker.clone()))
.ok_or(SavepointError::InvalidSavepoint)
}
pub fn delete_persistent_savepoint(&self, id: u64) -> Result<bool, SavepointError> {
if self.durability != InternalDurability::Immediate {
return Err(SavepointError::InvalidSavepoint);
}
let mut system_tables = self.system_tables.lock().unwrap();
let mut table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
let savepoint = table.remove(SavepointId(id))?;
if let Some(serialized) = savepoint {
let savepoint = serialized
.value()
.to_savepoint(self.transaction_tracker.clone());
self.deleted_persistent_savepoints
.lock()
.unwrap()
.push((savepoint.get_id(), savepoint.get_transaction_id()));
Ok(true)
} else {
Ok(false)
}
}
/// Returns the ids of all stored persistent savepoints, in key order.
pub fn list_persistent_savepoints(&self) -> Result<impl Iterator<Item = u64>> {
    let mut system_tables = self.system_tables.lock().unwrap();
    let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
    let ids = table
        .range::<SavepointId>(..)?
        .map(|entry| entry.map(|(key, _)| key.value().0))
        .collect::<Result<Vec<u64>>>()?;
    Ok(ids.into_iter())
}
fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
let id = self
.transaction_tracker
.register_read_transaction(&self.mem)?;
Ok(TransactionGuard::new_read(
id,
self.transaction_tracker.clone(),
))
}
/// Allocates a savepoint id together with a read transaction whose guard is leaked.
///
/// Returns the savepoint id and the leaked read transaction's id.
fn allocate_savepoint(&self) -> Result<(SavepointId, TransactionId)> {
    let savepoint_id = self.transaction_tracker.allocate_savepoint();
    let read_guard = self.allocate_read_transaction()?;
    Ok((savepoint_id, read_guard.leak()))
}
pub fn ephemeral_savepoint(&self) -> Result<Savepoint, SavepointError> {
if self.dirty.load(Ordering::Acquire) {
return Err(SavepointError::InvalidSavepoint);
}
let (id, transaction_id) = self.allocate_savepoint()?;
#[cfg(feature = "logging")]
debug!(
"Creating savepoint id={:?}, txn_id={:?}",
id, transaction_id
);
let regional_allocators = self.mem.get_raw_allocator_states();
let root = self.mem.get_data_root();
let system_root = self.mem.get_system_root();
let freed_root = self.mem.get_freed_root();
let savepoint = Savepoint::new_ephemeral(
&self.mem,
self.transaction_tracker.clone(),
id,
transaction_id,
root,
system_root,
freed_root,
regional_allocators,
);
Ok(savepoint)
}
/// Rolls the user tables back to the state captured by `savepoint`.
///
/// Steps, in the order implemented below:
/// 1. Replace the user table tree with one rooted at the savepoint's user root.
/// 2. For each transaction id from the savepoint's onward, rewrite that id's freed-tree
///    entries, dropping pages that are still allocated in the restored tree.
/// 3. Release pages reachable from the current tree but not from the restored one: uncommitted
///    pages are freed immediately, others are queued in `freed_pages`.
/// 4. Invalidate savepoints newer than `savepoint` in the tracker and delete newer persistent
///    savepoints.
///
/// Marks the transaction dirty.
///
/// # Errors
/// `SavepointError::InvalidSavepoint` if the tracker no longer considers `savepoint` valid;
/// storage errors are propagated.
///
/// # Panics
/// Panics if `savepoint` was created by a different database (tracker address mismatch) or
/// its version differs from `mem`'s.
pub fn restore_savepoint(&mut self, savepoint: &Savepoint) -> Result<(), SavepointError> {
    assert_eq!(
        std::ptr::from_ref(self.transaction_tracker.as_ref()),
        savepoint.db_address()
    );
    if !self
        .transaction_tracker
        .is_valid_savepoint(savepoint.get_id())
    {
        return Err(SavepointError::InvalidSavepoint);
    }
    #[cfg(feature = "logging")]
    debug!(
        "Beginning savepoint restore (id={:?}) in transaction id={:?}",
        savepoint.get_id(),
        self.transaction_id
    );
    assert_eq!(self.mem.get_version(), savepoint.get_version());
    self.dirty.store(true, Ordering::Release);
    // Snapshot the page sets of both trees before the current one is replaced.
    let old_table_tree = TableTreeMut::new(
        savepoint.get_user_root(),
        self.transaction_guard.clone(),
        self.mem.clone(),
        self.freed_pages.clone(),
    );
    let current_root_pages = self
        .tables
        .lock()
        .unwrap()
        .table_tree
        .all_referenced_pages()?;
    let old_root_pages = old_table_tree.all_referenced_pages()?;
    // Step 1: restore the user table tree.
    self.tables.lock().unwrap().table_tree = TableTreeMut::new(
        savepoint.get_user_root(),
        self.transaction_guard.clone(),
        self.mem.clone(),
        self.freed_pages.clone(),
    );
    // Step 2: walk freed-tree entries one transaction id at a time, starting at the savepoint's.
    let mut txn_id = savepoint.get_transaction_id().raw_id();
    let mut freed_tree = self.freed_tree.lock().unwrap();
    loop {
        let lower = FreedTableKey {
            transaction_id: txn_id,
            pagination_id: 0,
        };
        // Stop once no entries remain at or after this transaction id.
        if freed_tree.range(&(lower..))?.next().is_none() {
            break;
        }
        let lower = FreedTableKey {
            transaction_id: txn_id,
            pagination_id: 0,
        };
        let upper = FreedTableKey {
            transaction_id: txn_id + 1,
            pagination_id: 0,
        };
        // Pull out every entry for `txn_id`, keeping only pages the restored tree does not
        // reference.
        let mut pending_pages = vec![];
        for entry in freed_tree.extract_from_if(&(lower..upper), |_, _| true)? {
            let item = entry?;
            for i in 0..item.value().len() {
                let p = item.value().get(i);
                if !old_root_pages[p.region as usize].is_allocated(p.page_index, p.page_order) {
                    pending_pages.push(p);
                }
            }
        }
        // Re-insert the surviving pages under `txn_id`, 100 per entry, paginating from 0.
        let mut pagination_counter = 0u64;
        while !pending_pages.is_empty() {
            let chunk_size = 100;
            let buffer_size = FreedPageList::required_bytes(chunk_size);
            let key = FreedTableKey {
                transaction_id: txn_id,
                pagination_id: pagination_counter,
            };
            let mut access_guard =
                freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?;
            let len = pending_pages.len();
            access_guard.as_mut().clear();
            for page in pending_pages.drain(len - min(len, chunk_size)..) {
                access_guard.as_mut().push_back(page);
            }
            drop(access_guard);
            pagination_counter += 1;
        }
        txn_id += 1;
    }
    // Step 3: release pages only reachable from the tree that was replaced.
    let mut freed_pages = self.freed_pages.lock().unwrap();
    for i in 0..current_root_pages.len() {
        let mut pages = vec![];
        current_root_pages[i].difference(i.try_into().unwrap(), &old_root_pages[i], &mut pages);
        for page in pages {
            if self.mem.uncommitted(page) {
                self.mem.free(page);
            } else {
                freed_pages.push(page);
            }
        }
    }
    drop(freed_pages);
    // Step 4: savepoints taken after `savepoint` are no longer restorable.
    self.transaction_tracker
        .invalidate_savepoints_after(savepoint.get_id());
    for persistent_savepoint in self.list_persistent_savepoints()? {
        if persistent_savepoint > savepoint.get_id().0 {
            self.delete_persistent_savepoint(persistent_savepoint)?;
        }
    }
    Ok(())
}
/// Sets the durability used when this transaction commits.
///
/// `Durability::Paranoid` is mapped to `Immediate` and also enables two-phase commit.
///
/// # Panics
/// Panics if this transaction has already created or deleted a persistent savepoint.
pub fn set_durability(&mut self, durability: Durability) {
    let no_created = self
        .created_persistent_savepoints
        .lock()
        .unwrap()
        .is_empty();
    let no_deleted = self
        .deleted_persistent_savepoints
        .lock()
        .unwrap()
        .is_empty();
    assert!(no_created && no_deleted);

    let internal = match durability {
        Durability::None => InternalDurability::None,
        Durability::Eventual => InternalDurability::Eventual,
        Durability::Immediate => InternalDurability::Immediate,
        #[allow(deprecated)]
        Durability::Paranoid => {
            self.set_two_phase_commit(true);
            InternalDurability::Immediate
        }
    };
    self.durability = internal;
}
/// Enables or disables two-phase commit for this transaction's commit.
///
/// Has no effect when quick-repair is on: committing then forces two-phase commit.
pub fn set_two_phase_commit(&mut self, enabled: bool) {
    self.two_phase_commit = enabled;
}
/// Enables or disables quick-repair.
///
/// When enabled, the commit forces two-phase commit, and a durable commit also records
/// allocator state in a system table.
pub fn set_quick_repair(&mut self, enabled: bool) {
    self.quick_repair = enabled;
}
/// Opens (creating if necessary) the table described by `definition`.
///
/// # Errors
/// `TableError::TableAlreadyOpen` if the table is already open in this transaction, carrying
/// the caller location that opened it.
#[track_caller]
pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
    &'txn self,
    definition: TableDefinition<K, V>,
) -> Result<Table<'txn, K, V>, TableError> {
    let mut namespace = self.tables.lock().unwrap();
    namespace.open_table(self, definition)
}
/// Opens (creating if necessary) the multimap table described by `definition`.
///
/// # Errors
/// `TableError::TableAlreadyOpen` if the table is already open in this transaction.
#[track_caller]
pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
    &'txn self,
    definition: MultimapTableDefinition<K, V>,
) -> Result<MultimapTable<'txn, K, V>, TableError> {
    let mut namespace = self.tables.lock().unwrap();
    namespace.open_multimap_table(self, definition)
}
/// Records that table `name` was closed, staging its new root and length.
pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
    &self,
    name: &str,
    table: &BtreeMut<K, V>,
    length: u64,
) {
    let mut namespace = self.tables.lock().unwrap();
    namespace.close_table(name, table, length);
}
/// Deletes the table named by `definition`; returns whether it existed.
///
/// # Errors
/// `TableError::TableAlreadyOpen` if the table is currently open.
pub fn delete_table(&self, definition: impl TableHandle) -> Result<bool, TableError> {
    let name = String::from(definition.name());
    // The handle is dropped before the namespace lock is taken. NOTE(review): presumably so
    // any `Drop` logic of the handle cannot contend for that lock; confirm.
    drop(definition);
    let mut namespace = self.tables.lock().unwrap();
    namespace.delete_table(self, &name)
}
/// Deletes the multimap table named by `definition`; returns whether it existed.
///
/// # Errors
/// `TableError::TableAlreadyOpen` if the table is currently open.
pub fn delete_multimap_table(
    &self,
    definition: impl MultimapTableHandle,
) -> Result<bool, TableError> {
    let name = String::from(definition.name());
    drop(definition);
    let mut namespace = self.tables.lock().unwrap();
    namespace.delete_multimap_table(self, &name)
}
/// Lists handles to every normal (non-multimap) table.
pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle> + '_> {
    let names = self
        .tables
        .lock()
        .unwrap()
        .table_tree
        .list_tables(TableType::Normal)?;
    Ok(names.into_iter().map(UntypedTableHandle::new))
}
/// Lists handles to every multimap table.
pub fn list_multimap_tables(
    &self,
) -> Result<impl Iterator<Item = UntypedMultimapTableHandle> + '_> {
    let names = self
        .tables
        .lock()
        .unwrap()
        .table_tree
        .list_tables(TableType::Multimap)?;
    Ok(names.into_iter().map(UntypedMultimapTableHandle::new))
}
/// Commits the transaction with the configured durability.
///
/// The transaction is marked completed before committing, so a failed commit does not trigger
/// the abort path when the value is dropped.
pub fn commit(mut self) -> Result<(), CommitError> {
    self.completed = true;
    self.commit_inner()
}
/// Commits according to `durability`, then deallocates persistent savepoints deleted by this
/// transaction.
fn commit_inner(&mut self) -> Result<(), CommitError> {
    // Quick-repair always commits in two phases.
    self.two_phase_commit |= self.quick_repair;
    #[cfg(feature = "logging")]
    debug!(
        "Committing transaction id={:?} with durability={:?} two_phase={} quick_repair={}",
        self.transaction_id, self.durability, self.two_phase_commit, self.quick_repair
    );
    match self.durability {
        InternalDurability::None => self.non_durable_commit()?,
        InternalDurability::Eventual | InternalDurability::Immediate => {
            let eventual = self.durability == InternalDurability::Eventual;
            self.durable_commit(eventual)?;
        }
    }
    for &(savepoint, transaction) in self.deleted_persistent_savepoints.lock().unwrap().iter() {
        self.transaction_tracker
            .deallocate_savepoint(savepoint, transaction);
    }
    #[cfg(feature = "logging")]
    debug!(
        "Finished commit of transaction id={:?}",
        self.transaction_id
    );
    Ok(())
}
/// Aborts the transaction, discarding all of its changes.
///
/// # Errors
/// Propagates storage errors from removing created persistent savepoints or from rolling back
/// uncommitted writes.
pub fn abort(mut self) -> Result {
    self.completed = true;
    self.abort_inner()
}
/// Shared abort path used by `abort` and by `Drop`.
fn abort_inner(&mut self) -> Result {
    #[cfg(feature = "logging")]
    debug!("Aborting transaction id={:?}", self.transaction_id);
    // Remove persistent savepoints this transaction created. They can only exist under
    // `Immediate` durability (which `set_durability` then freezes), so deletion cannot report
    // `InvalidSavepoint`.
    let created = self.created_persistent_savepoints.lock().unwrap();
    for savepoint_id in created.iter() {
        match self.delete_persistent_savepoint(savepoint_id.0) {
            Ok(_) => {}
            Err(SavepointError::InvalidSavepoint) => unreachable!(),
            Err(SavepointError::Storage(storage_err)) => return Err(storage_err),
        }
    }
    drop(created);
    self.tables
        .lock()
        .unwrap()
        .table_tree
        .clear_table_root_updates();
    self.mem.rollback_uncommitted_writes()?;
    #[cfg(feature = "logging")]
    debug!("Finished abort of transaction id={:?}", self.transaction_id);
    Ok(())
}
/// Commits this transaction durably; `eventual` is forwarded to `mem.commit`.
///
/// Order of operations:
/// 1. Free pages recorded in the freed tree by transactions older than
///    `free_until_transaction`.
/// 2. Flush the user and system table trees.
/// 3. Drop the allocator-state table. With quick-repair, rebuild it while storing freed
///    pages; otherwise just store freed pages.
/// 4. Finalize checksums and hand the three roots to `mem.commit`.
/// 5. Free the post-commit pages immediately.
pub(crate) fn durable_commit(&mut self, eventual: bool) -> Result {
    // The oldest live reader's successor, or this transaction's own id if no reader is live.
    let free_until_transaction = self
        .transaction_tracker
        .oldest_live_read_transaction()
        .map_or(self.transaction_id, |x| x.next());
    self.process_freed_pages(free_until_transaction)?;
    let user_root = self
        .tables
        .lock()
        .unwrap()
        .table_tree
        .flush_table_root_updates()?
        .finalize_dirty_checksums()?;
    // The system-tables lock is held until the end of the commit.
    let mut system_tables = self.system_tables.lock().unwrap();
    let system_tree = system_tables.table_tree.flush_table_root_updates()?;
    system_tree
        .delete_table(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
        .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?;
    if self.quick_repair {
        system_tree.create_table_and_flush_table_root(
            ALLOCATOR_STATE_TABLE_NAME,
            |tree: &mut AllocatorStateTree| {
                let mut pagination_counter = 0;
                // Retry until the allocator state can be saved into the reserved space.
                loop {
                    let num_regions = self
                        .mem
                        .reserve_allocator_state(tree, self.transaction_id)?;
                    // All frees, including post-commit ones, go through the freed tree here.
                    // NOTE(review): presumably so the saved allocator state stays accurate
                    // after commit; confirm.
                    self.store_freed_pages(&mut pagination_counter, true)?;
                    if self.mem.try_save_allocator_state(tree, num_regions)? {
                        return Ok(());
                    }
                    // Saving failed: empty the table (last key first) before retrying.
                    while let Some(guards) = tree.last()? {
                        let key = guards.0.value();
                        drop(guards);
                        tree.remove(&key)?;
                    }
                }
            },
        )?;
    } else {
        // Post-commit frees are routed through the freed tree only while a savepoint exists;
        // otherwise they are freed right after `mem.commit` below.
        let savepoint_exists = self.transaction_tracker.any_savepoint_exists();
        self.store_freed_pages(&mut 0, savepoint_exists)?;
    }
    let system_root = system_tree.finalize_dirty_checksums()?;
    let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?;
    self.mem.commit(
        user_root,
        system_root,
        freed_root,
        self.transaction_id,
        eventual,
        self.two_phase_commit,
    )?;
    self.transaction_tracker.clear_pending_non_durable_commits();
    for page in self.post_commit_frees.lock().unwrap().drain(..) {
        self.mem.free(page);
    }
    Ok(())
}
/// Commits this transaction without making it durable.
///
/// Flushes both table trees and records every freed page, including post-commit frees, in
/// the freed tree instead of freeing it. It then hands the roots to `mem.non_durable_commit`
/// and registers the commit with the tracker. NOTE(review): nothing is freed here, presumably
/// because a non-durable commit can still be lost; confirm.
pub(crate) fn non_durable_commit(&mut self) -> Result {
    let user_root = self
        .tables
        .lock()
        .unwrap()
        .table_tree
        .flush_table_root_updates()?
        .finalize_dirty_checksums()?;
    let system_root = self
        .system_tables
        .lock()
        .unwrap()
        .table_tree
        .flush_table_root_updates()?
        .finalize_dirty_checksums()?;
    self.store_freed_pages(&mut 0, true)?;
    // Must run after `store_freed_pages`, which modifies the freed tree.
    let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?;
    self.mem
        .non_durable_commit(user_root, system_root, freed_root, self.transaction_id)?;
    self.transaction_tracker
        .register_non_durable_commit(self.transaction_id);
    Ok(())
}
/// Runs one compaction pass, moving high-numbered pages into lower free pages.
///
/// Moves the region tracker if possible. It then takes the highest-index pages of the user
/// and system trees and processes them from the highest down. Each page gets the lowest free
/// page; if that is lower, the page and all of its parents are mapped to new pages. The first
/// page that cannot move lower ends the pass. The mapping is applied to both trees.
///
/// Returns `true` if anything was relocated. Both namespace locks are held for the whole pass.
pub(crate) fn compact_pages(&mut self) -> Result<bool> {
    let mut progress = false;
    if self.mem.relocate_region_tracker()? {
        progress = true;
    }
    let mut highest_pages = BTreeMap::new();
    let mut tables = self.tables.lock().unwrap();
    let table_tree = &mut tables.table_tree;
    table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
    let mut system_tables = self.system_tables.lock().unwrap();
    let system_table_tree = &mut system_tables.table_tree;
    system_table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
    // old page number -> newly allocated page number
    let mut relocation_map = HashMap::new();
    // `rev()` visits the map's largest keys first.
    for path in highest_pages.into_values().rev() {
        if relocation_map.contains_key(&path.page_number()) {
            continue;
        }
        let old_page = self.mem.get_page(path.page_number())?;
        let mut new_page = self.mem.allocate_lowest(old_page.memory().len())?;
        let new_page_number = new_page.get_page_number();
        // Only the first byte is copied here. NOTE(review): presumably the page-type tag, with
        // full contents written by `relocate_tables`; confirm.
        new_page.memory_mut()[0] = old_page.memory()[0];
        drop(new_page);
        if new_page_number < path.page_number() {
            relocation_map.insert(path.page_number(), new_page_number);
            // Every ancestor must be rewritten too, since it points at the moved page.
            for parent in path.parents() {
                if relocation_map.contains_key(parent) {
                    continue;
                }
                let old_parent = self.mem.get_page(*parent)?;
                let mut new_page = self.mem.allocate_lowest(old_parent.memory().len())?;
                let new_page_number = new_page.get_page_number();
                new_page.memory_mut()[0] = old_parent.memory()[0];
                drop(new_page);
                relocation_map.insert(*parent, new_page_number);
            }
        } else {
            // No lower page was available: undo the allocation and stop this pass.
            self.mem.free(new_page_number);
            break;
        }
    }
    if !relocation_map.is_empty() {
        progress = true;
    }
    table_tree.relocate_tables(&relocation_map)?;
    system_table_tree.relocate_tables(&relocation_map)?;
    Ok(progress)
}
/// Frees every page recorded in the freed tree by a transaction older than `free_until`, and
/// removes those entries.
fn process_freed_pages(&mut self, free_until: TransactionId) -> Result {
    // The freed-page lists are read as 8-byte serialized page numbers.
    assert_eq!(PageNumber::serialized_size(), 8);
    let end = FreedTableKey {
        transaction_id: free_until.raw_id(),
        pagination_id: 0,
    };
    let mut released_keys = Vec::new();
    let mut freed_tree = self.freed_tree.lock().unwrap();
    for entry in freed_tree.range(&(..end))? {
        let entry = entry?;
        released_keys.push(entry.key());
        let page_list = entry.value();
        for index in 0..page_list.len() {
            self.mem.free(page_list.get(index));
        }
    }
    // Removal happens only after the range iterator is finished with the tree.
    for key in &released_keys {
        freed_tree.remove(key)?;
    }
    Ok(())
}
/// Moves this transaction's pending frees (`freed_pages`) into the freed tree.
///
/// Pages are written in chunks of up to 100, each under the key
/// `(self.transaction_id, *pagination_counter)`. The counter advances once per chunk, so
/// repeated calls with the same counter never overwrite earlier chunks.
///
/// When `include_post_commit_free` is set, `post_commit_frees` is folded into the pending
/// list up front and again after every chunk. The freed tree was built with
/// `post_commit_frees` as its release sink, so writing a chunk may add to that list.
fn store_freed_pages(
    &self,
    pagination_counter: &mut u64,
    include_post_commit_free: bool,
) -> Result {
    // The freed-page list encoding is assumed to use 8 bytes per page number.
    assert_eq!(PageNumber::serialized_size(), 8);

    let mut freed_tree = self.freed_tree.lock().unwrap();
    if include_post_commit_free {
        self.freed_pages
            .lock()
            .unwrap()
            .extend(self.post_commit_frees.lock().unwrap().drain(..));
    }
    // The condition's lock guard is a temporary, released before the body re-locks below.
    while !self.freed_pages.lock().unwrap().is_empty() {
        let chunk_size = 100;
        let buffer_size = FreedPageList::required_bytes(chunk_size);
        let key = FreedTableKey {
            transaction_id: self.transaction_id.raw_id(),
            pagination_id: *pagination_counter,
        };
        let mut access_guard =
            freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?;

        // Take up to `chunk_size` pages off the tail of the pending list.
        let mut freed_pages = self.freed_pages.lock().unwrap();
        let len = freed_pages.len();
        access_guard.as_mut().clear();
        for page in freed_pages.drain(len - min(len, chunk_size)..) {
            access_guard.as_mut().push_back(page);
        }
        drop(access_guard);

        *pagination_counter += 1;

        if include_post_commit_free {
            freed_pages.extend(self.post_commit_frees.lock().unwrap().drain(..));
        }
    }

    Ok(())
}
/// Collects space-usage statistics for this transaction's view of the database.
///
/// Height, page counts and stored bytes describe the user data tree. The system and freed
/// trees add to metadata, with their leaf bytes counted as metadata too, and to fragmentation.
pub fn stats(&self) -> Result<DatabaseStats> {
    let tables = self.tables.lock().unwrap();
    let user = tables.table_tree.stats()?;
    let system_tables = self.system_tables.lock().unwrap();
    let system = system_tables.table_tree.stats()?;
    let freed = self.freed_tree.lock().unwrap().stats()?;

    let metadata_bytes = user.metadata_bytes()
        + system.metadata_bytes
        + system.stored_leaf_bytes
        + freed.metadata_bytes
        + freed.stored_leaf_bytes;
    let fragmented_bytes =
        user.fragmented_bytes() + system.fragmented_bytes + freed.fragmented_bytes;

    Ok(DatabaseStats {
        tree_height: user.tree_height(),
        allocated_pages: self.mem.count_allocated_pages()?,
        leaf_pages: user.leaf_pages(),
        branch_pages: user.branch_pages(),
        stored_leaf_bytes: user.stored_bytes(),
        metadata_bytes,
        fragmented_bytes,
        page_size: self.mem.get_page_size(),
    })
}
/// Number of pages spanned by the region tracker page: 2 raised to its page order.
#[cfg(any(test, fuzzing))]
pub fn num_region_tracker_pages(&self) -> u64 {
    let order = self.mem.tracker_page().page_order;
    1u64 << order
}
/// Flushes pending user-table root updates and dumps the user table tree ("Master tree") to
/// stderr.
///
/// # Errors
/// Propagates storage errors from flushing, checksum finalization, and tree traversal. The
/// previous version panicked via `unwrap` on the first two, although the function already
/// returns `Result`.
#[allow(dead_code)] // debugging helper, deliberately kept without callers
pub(crate) fn print_debug(&self) -> Result {
    let mut tables = self.tables.lock().unwrap();
    let root = tables
        .table_tree
        .flush_table_root_updates()?
        .finalize_dirty_checksums()?;
    if let Some(page) = root {
        eprintln!("Master tree:");
        let master_tree: Btree<&str, InternalTableDefinition> = Btree::new(
            Some(page),
            PageHint::None,
            self.transaction_guard.clone(),
            self.mem.clone(),
        )?;
        master_tree.print_debug(true)?;
    }
    Ok(())
}
}
impl Drop for WriteTransaction {
    /// Automatically aborts the transaction when `completed` is not set.
    ///
    /// The automatic abort is skipped while the current thread is panicking, or when
    /// `mem.storage_failure()` reports a failure. An error from the abort is logged with `warn!`
    /// when the `logging` feature is enabled; otherwise it is discarded.
    fn drop(&mut self) {
        let skip_abort = self.completed || thread::panicking() || self.mem.storage_failure();
        if skip_abort {
            return;
        }
        // `abort_err` is only read when the `logging` feature is on.
        #[allow(unused_variables)]
        if let Err(abort_err) = self.abort_inner() {
            #[cfg(feature = "logging")]
            warn!("Failure automatically aborting transaction: {}", abort_err);
        }
    }
}
/// A read-only transaction.
///
/// It views the data table tree as rooted at the data root that `ReadTransaction::new` reads
/// from the transactional memory when the transaction is created.
pub struct ReadTransaction {
    // Shared handle to the transactional memory; cloned into every table opened from here.
    mem: Arc<TransactionalMemory>,
    // Data table tree. It also owns the `Arc`'d transaction guard, whose strong count `close`
    // inspects.
    tree: TableTree,
}
impl ReadTransaction {
    /// Builds a read transaction over the data root currently recorded in `mem`.
    ///
    /// `guard` is wrapped in an `Arc` owned by the table tree. Typed tables opened from this
    /// transaction receive clones of that `Arc`.
    ///
    /// # Errors
    /// A storage error while constructing the table tree is returned as
    /// `TransactionError::Storage`.
    pub(crate) fn new(
        mem: Arc<TransactionalMemory>,
        guard: TransactionGuard,
    ) -> Result<Self, TransactionError> {
        let data_root = mem.get_data_root();
        let shared_guard = Arc::new(guard);
        let tree = match TableTree::new(data_root, PageHint::Clean, shared_guard, mem.clone()) {
            Ok(tree) => tree,
            Err(storage_err) => return Err(TransactionError::Storage(storage_err)),
        };
        Ok(Self { mem, tree })
    }

    /// Opens the normal (non-multimap) table described by `definition` for reading.
    ///
    /// # Errors
    /// Returns `TableError::TableDoesNotExist` when no table has that name. Errors from the
    /// table lookup and from building the table are propagated.
    pub fn open_table<K: Key + 'static, V: Value + 'static>(
        &self,
        definition: TableDefinition<K, V>,
    ) -> Result<ReadOnlyTable<K, V>, TableError> {
        let table_name = definition.name();
        let header = match self.tree.get_table::<K, V>(table_name, TableType::Normal)? {
            Some(found) => found,
            None => return Err(TableError::TableDoesNotExist(table_name.to_string())),
        };
        match header {
            InternalTableDefinition::Normal { table_root, .. } => {
                let guard = self.tree.transaction_guard().clone();
                let table = ReadOnlyTable::new(
                    table_name.to_string(),
                    table_root,
                    PageHint::Clean,
                    guard,
                    self.mem.clone(),
                )?;
                Ok(table)
            }
            InternalTableDefinition::Multimap { .. } => unreachable!(),
        }
    }

    /// Opens the normal table named by `handle` without static key/value types.
    ///
    /// # Errors
    /// Returns `TableError::TableDoesNotExist` when no table has that name. Errors from the
    /// table lookup are propagated.
    pub fn open_untyped_table(
        &self,
        handle: impl TableHandle,
    ) -> Result<ReadOnlyUntypedTable, TableError> {
        let table_name = handle.name();
        let header = match self.tree.get_table_untyped(table_name, TableType::Normal)? {
            Some(found) => found,
            None => return Err(TableError::TableDoesNotExist(table_name.to_string())),
        };
        match header {
            InternalTableDefinition::Normal {
                table_root,
                fixed_key_size,
                fixed_value_size,
                ..
            } => {
                let table = ReadOnlyUntypedTable::new(
                    table_root,
                    fixed_key_size,
                    fixed_value_size,
                    self.mem.clone(),
                );
                Ok(table)
            }
            InternalTableDefinition::Multimap { .. } => unreachable!(),
        }
    }

    /// Opens the multimap table described by `definition` for reading.
    ///
    /// # Errors
    /// Returns `TableError::TableDoesNotExist` when no multimap table has that name. Errors from
    /// the table lookup and from building the table are propagated.
    pub fn open_multimap_table<K: Key + 'static, V: Key + 'static>(
        &self,
        definition: MultimapTableDefinition<K, V>,
    ) -> Result<ReadOnlyMultimapTable<K, V>, TableError> {
        let table_name = definition.name();
        let header = match self.tree.get_table::<K, V>(table_name, TableType::Multimap)? {
            Some(found) => found,
            None => return Err(TableError::TableDoesNotExist(table_name.to_string())),
        };
        match header {
            InternalTableDefinition::Normal { .. } => unreachable!(),
            InternalTableDefinition::Multimap {
                table_root,
                table_length,
                ..
            } => {
                let guard = self.tree.transaction_guard().clone();
                let table = ReadOnlyMultimapTable::new(
                    table_root,
                    table_length,
                    PageHint::Clean,
                    guard,
                    self.mem.clone(),
                )?;
                Ok(table)
            }
        }
    }

    /// Opens the multimap table named by `handle` without static key/value types.
    ///
    /// # Errors
    /// Returns `TableError::TableDoesNotExist` when no multimap table has that name. Errors from
    /// the table lookup are propagated.
    pub fn open_untyped_multimap_table(
        &self,
        handle: impl MultimapTableHandle,
    ) -> Result<ReadOnlyUntypedMultimapTable, TableError> {
        let table_name = handle.name();
        let header = match self.tree.get_table_untyped(table_name, TableType::Multimap)? {
            Some(found) => found,
            None => return Err(TableError::TableDoesNotExist(table_name.to_string())),
        };
        match header {
            InternalTableDefinition::Normal { .. } => unreachable!(),
            InternalTableDefinition::Multimap {
                table_root,
                table_length,
                fixed_key_size,
                fixed_value_size,
                ..
            } => {
                let table = ReadOnlyUntypedMultimapTable::new(
                    table_root,
                    table_length,
                    fixed_key_size,
                    fixed_value_size,
                    self.mem.clone(),
                );
                Ok(table)
            }
        }
    }

    /// Returns handles for every normal table visible to this transaction.
    pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle>> {
        let names = self.tree.list_tables(TableType::Normal)?;
        Ok(names.into_iter().map(UntypedTableHandle::new))
    }

    /// Returns handles for every multimap table visible to this transaction.
    pub fn list_multimap_tables(&self) -> Result<impl Iterator<Item = UntypedMultimapTableHandle>> {
        let names = self.tree.list_tables(TableType::Multimap)?;
        Ok(names.into_iter().map(UntypedMultimapTableHandle::new))
    }

    /// Closes the transaction.
    ///
    /// # Errors
    /// Returns `TransactionError::ReadTransactionStillInUse`, carrying the transaction back,
    /// when other owners of the transaction guard remain. Typed tables opened from this
    /// transaction and not yet dropped are such owners.
    pub fn close(self) -> Result<(), TransactionError> {
        let guard_owners = Arc::strong_count(self.tree.transaction_guard());
        if guard_owners <= 1 {
            Ok(())
        } else {
            Err(TransactionError::ReadTransactionStillInUse(self))
        }
    }
}
impl Debug for ReadTransaction {
    /// Formats as the bare type name `ReadTransaction`; internal state is not exposed.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // A field-less debug_struct writes only the name, matching a plain write_str.
        f.debug_struct("ReadTransaction").finish()
    }
}
#[cfg(test)]
mod test {
    use crate::{Database, TableDefinition};

    const X: TableDefinition<&str, &str> = TableDefinition::new("x");

    /// A write transaction started after reopening the database must get an id strictly greater
    /// than the one committed before the database was closed.
    #[test]
    fn transaction_id_persistence() {
        let tmpfile = crate::create_tempfile();

        // First session: commit a single insert and remember its transaction id.
        // The database is dropped at the end of this block.
        let committed_id = {
            let db = Database::create(tmpfile.path()).unwrap();
            let txn = db.begin_write().unwrap();
            {
                let mut table = txn.open_table(X).unwrap();
                table.insert("hello", "world").unwrap();
            }
            let id = txn.transaction_id;
            txn.commit().unwrap();
            id
        };

        // Second session on the same file.
        let reopened = Database::create(tmpfile.path()).unwrap();
        let next_txn = reopened.begin_write().unwrap();
        assert!(next_txn.transaction_id > committed_id);
    }
}