redb/
transaction_tracker.rs

1use crate::tree_store::TransactionalMemory;
2use crate::{Key, Result, Savepoint, TypeName, Value};
3#[cfg(feature = "logging")]
4use log::debug;
5use std::cmp::Ordering;
6use std::collections::btree_map::BTreeMap;
7use std::collections::btree_set::BTreeSet;
8use std::mem::size_of;
9use std::sync::{Condvar, Mutex};
10
/// Identifier of a transaction: a thin, copyable wrapper around a raw `u64`.
///
/// Ids order by their raw value.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Debug)]
pub(crate) struct TransactionId(u64);

impl TransactionId {
    /// Wraps `value` as a transaction id.
    pub(crate) fn new(value: u64) -> TransactionId {
        TransactionId(value)
    }

    /// Returns the raw `u64` behind this id.
    pub(crate) fn raw_id(self) -> u64 {
        let TransactionId(raw) = self;
        raw
    }

    /// Returns the id whose raw value is one greater than this one.
    pub(crate) fn next(self) -> TransactionId {
        let TransactionId(raw) = self;
        TransactionId(raw + 1)
    }

    /// Advances `self` in place to its successor and returns the new value.
    pub(crate) fn increment(&mut self) -> TransactionId {
        *self = self.next();
        *self
    }

    /// Returns the id whose raw value is one less than this one, or `None`
    /// when this id is zero and therefore has no predecessor.
    pub(crate) fn parent(self) -> Option<TransactionId> {
        self.0.checked_sub(1).map(TransactionId)
    }
}
41
/// Identifier of a savepoint: a copyable, hashable wrapper around a raw `u64`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
pub(crate) struct SavepointId(pub u64);

impl SavepointId {
    /// Returns the id whose raw value is one greater than this one.
    pub(crate) fn next(self) -> SavepointId {
        let SavepointId(raw) = self;
        SavepointId(raw + 1)
    }
}
50
51impl Value for SavepointId {
52    type SelfType<'a> = SavepointId;
53    type AsBytes<'a> = [u8; size_of::<u64>()];
54
55    fn fixed_width() -> Option<usize> {
56        Some(size_of::<u64>())
57    }
58
59    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
60    where
61        Self: 'a,
62    {
63        SavepointId(u64::from_le_bytes(data.try_into().unwrap()))
64    }
65
66    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
67    where
68        Self: 'b,
69    {
70        value.0.to_le_bytes()
71    }
72
73    fn type_name() -> TypeName {
74        TypeName::internal("redb::SavepointId")
75    }
76}
77
78impl Key for SavepointId {
79    fn compare(data1: &[u8], data2: &[u8]) -> Ordering {
80        Self::from_bytes(data1).0.cmp(&Self::from_bytes(data2).0)
81    }
82}
83
84struct State {
85    next_savepoint_id: SavepointId,
86    // reference count of read transactions per transaction id
87    live_read_transactions: BTreeMap<TransactionId, u64>,
88    next_transaction_id: TransactionId,
89    live_write_transaction: Option<TransactionId>,
90    valid_savepoints: BTreeSet<SavepointId>,
91    // Non-durable commits that are still in-memory, and waiting for a durable commit to get flushed
92    // We need to make sure that the freed-table does not get processed for these, since they are not durable yet
93    // Therefore, we hold a read transaction on their parent
94    pending_non_durable_commits: Vec<TransactionId>,
95}
96
97pub(crate) struct TransactionTracker {
98    state: Mutex<State>,
99    live_write_transaction_available: Condvar,
100}
101
102impl TransactionTracker {
103    pub(crate) fn new(next_transaction_id: TransactionId) -> Self {
104        Self {
105            state: Mutex::new(State {
106                next_savepoint_id: SavepointId(0),
107                live_read_transactions: Default::default(),
108                next_transaction_id,
109                live_write_transaction: None,
110                valid_savepoints: Default::default(),
111                pending_non_durable_commits: Default::default(),
112            }),
113            live_write_transaction_available: Condvar::new(),
114        }
115    }
116
117    pub(crate) fn start_write_transaction(&self) -> TransactionId {
118        let mut state = self.state.lock().unwrap();
119        while state.live_write_transaction.is_some() {
120            state = self.live_write_transaction_available.wait(state).unwrap();
121        }
122        assert!(state.live_write_transaction.is_none());
123        let transaction_id = state.next_transaction_id.increment();
124        #[cfg(feature = "logging")]
125        debug!("Beginning write transaction id={:?}", transaction_id);
126        state.live_write_transaction = Some(transaction_id);
127
128        transaction_id
129    }
130
131    pub(crate) fn end_write_transaction(&self, id: TransactionId) {
132        let mut state = self.state.lock().unwrap();
133        assert_eq!(state.live_write_transaction.unwrap(), id);
134        state.live_write_transaction = None;
135        self.live_write_transaction_available.notify_one();
136    }
137
138    pub(crate) fn clear_pending_non_durable_commits(&self) {
139        let mut state = self.state.lock().unwrap();
140        let ids: Vec<TransactionId> = state.pending_non_durable_commits.drain(..).collect();
141        for id in ids {
142            if let Some(parent) = id.parent() {
143                let ref_count = state.live_read_transactions.get_mut(&parent).unwrap();
144                *ref_count -= 1;
145                if *ref_count == 0 {
146                    state.live_read_transactions.remove(&parent);
147                }
148            }
149        }
150    }
151
152    pub(crate) fn register_non_durable_commit(&self, id: TransactionId) {
153        let mut state = self.state.lock().unwrap();
154        if let Some(parent) = id.parent() {
155            state
156                .live_read_transactions
157                .entry(parent)
158                .and_modify(|x| *x += 1)
159                .or_insert(1);
160        }
161        state.pending_non_durable_commits.push(id);
162    }
163
164    pub(crate) fn restore_savepoint_counter_state(&self, next_savepoint: SavepointId) {
165        let mut state = self.state.lock().unwrap();
166        assert!(state.valid_savepoints.is_empty());
167        state.next_savepoint_id = next_savepoint;
168    }
169
170    pub(crate) fn register_persistent_savepoint(&self, savepoint: &Savepoint) {
171        let mut state = self.state.lock().unwrap();
172        state
173            .live_read_transactions
174            .entry(savepoint.get_transaction_id())
175            .and_modify(|x| *x += 1)
176            .or_insert(1);
177        state.valid_savepoints.insert(savepoint.get_id());
178    }
179
180    pub(crate) fn register_read_transaction(
181        &self,
182        mem: &TransactionalMemory,
183    ) -> Result<TransactionId> {
184        let mut state = self.state.lock().unwrap();
185        let id = mem.get_last_committed_transaction_id()?;
186        state
187            .live_read_transactions
188            .entry(id)
189            .and_modify(|x| *x += 1)
190            .or_insert(1);
191
192        Ok(id)
193    }
194
195    pub(crate) fn deallocate_read_transaction(&self, id: TransactionId) {
196        let mut state = self.state.lock().unwrap();
197        let ref_count = state.live_read_transactions.get_mut(&id).unwrap();
198        *ref_count -= 1;
199        if *ref_count == 0 {
200            state.live_read_transactions.remove(&id);
201        }
202    }
203
204    pub(crate) fn any_savepoint_exists(&self) -> bool {
205        !self.state.lock().unwrap().valid_savepoints.is_empty()
206    }
207
208    pub(crate) fn allocate_savepoint(&self) -> SavepointId {
209        let mut state = self.state.lock().unwrap();
210        let id = state.next_savepoint_id.next();
211        state.next_savepoint_id = id;
212        state.valid_savepoints.insert(id);
213        id
214    }
215
216    // Deallocates the given savepoint and its matching reference count on the transcation
217    pub(crate) fn deallocate_savepoint(&self, savepoint: SavepointId, transaction: TransactionId) {
218        self.state
219            .lock()
220            .unwrap()
221            .valid_savepoints
222            .remove(&savepoint);
223        self.deallocate_read_transaction(transaction);
224    }
225
226    pub(crate) fn is_valid_savepoint(&self, id: SavepointId) -> bool {
227        self.state.lock().unwrap().valid_savepoints.contains(&id)
228    }
229
230    pub(crate) fn invalidate_savepoints_after(&self, id: SavepointId) {
231        self.state
232            .lock()
233            .unwrap()
234            .valid_savepoints
235            .retain(|x| *x <= id);
236    }
237
238    pub(crate) fn oldest_live_read_transaction(&self) -> Option<TransactionId> {
239        self.state
240            .lock()
241            .unwrap()
242            .live_read_transactions
243            .keys()
244            .next()
245            .copied()
246    }
247}