redb/
transaction_tracker.rs1use crate::tree_store::TransactionalMemory;
2use crate::{Key, Result, Savepoint, TypeName, Value};
3#[cfg(feature = "logging")]
4use log::debug;
5use std::cmp::Ordering;
6use std::collections::btree_map::BTreeMap;
7use std::collections::btree_set::BTreeSet;
8use std::mem::size_of;
9use std::sync::{Condvar, Mutex};
10
11#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Debug)]
12pub(crate) struct TransactionId(u64);
13
14impl TransactionId {
15 pub(crate) fn new(value: u64) -> TransactionId {
16 Self(value)
17 }
18
19 pub(crate) fn raw_id(self) -> u64 {
20 self.0
21 }
22
23 pub(crate) fn next(self) -> TransactionId {
24 TransactionId(self.0 + 1)
25 }
26
27 pub(crate) fn increment(&mut self) -> TransactionId {
28 let next = self.next();
29 *self = next;
30 next
31 }
32
33 pub(crate) fn parent(self) -> Option<TransactionId> {
34 if self.0 == 0 {
35 None
36 } else {
37 Some(TransactionId(self.0 - 1))
38 }
39 }
40}
41
42#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
43pub(crate) struct SavepointId(pub u64);
44
45impl SavepointId {
46 pub(crate) fn next(self) -> SavepointId {
47 SavepointId(self.0 + 1)
48 }
49}
50
51impl Value for SavepointId {
52 type SelfType<'a> = SavepointId;
53 type AsBytes<'a> = [u8; size_of::<u64>()];
54
55 fn fixed_width() -> Option<usize> {
56 Some(size_of::<u64>())
57 }
58
59 fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
60 where
61 Self: 'a,
62 {
63 SavepointId(u64::from_le_bytes(data.try_into().unwrap()))
64 }
65
66 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
67 where
68 Self: 'b,
69 {
70 value.0.to_le_bytes()
71 }
72
73 fn type_name() -> TypeName {
74 TypeName::internal("redb::SavepointId")
75 }
76}
77
78impl Key for SavepointId {
79 fn compare(data1: &[u8], data2: &[u8]) -> Ordering {
80 Self::from_bytes(data1).0.cmp(&Self::from_bytes(data2).0)
81 }
82}
83
84struct State {
85 next_savepoint_id: SavepointId,
86 live_read_transactions: BTreeMap<TransactionId, u64>,
88 next_transaction_id: TransactionId,
89 live_write_transaction: Option<TransactionId>,
90 valid_savepoints: BTreeSet<SavepointId>,
91 pending_non_durable_commits: Vec<TransactionId>,
95}
96
97pub(crate) struct TransactionTracker {
98 state: Mutex<State>,
99 live_write_transaction_available: Condvar,
100}
101
102impl TransactionTracker {
103 pub(crate) fn new(next_transaction_id: TransactionId) -> Self {
104 Self {
105 state: Mutex::new(State {
106 next_savepoint_id: SavepointId(0),
107 live_read_transactions: Default::default(),
108 next_transaction_id,
109 live_write_transaction: None,
110 valid_savepoints: Default::default(),
111 pending_non_durable_commits: Default::default(),
112 }),
113 live_write_transaction_available: Condvar::new(),
114 }
115 }
116
117 pub(crate) fn start_write_transaction(&self) -> TransactionId {
118 let mut state = self.state.lock().unwrap();
119 while state.live_write_transaction.is_some() {
120 state = self.live_write_transaction_available.wait(state).unwrap();
121 }
122 assert!(state.live_write_transaction.is_none());
123 let transaction_id = state.next_transaction_id.increment();
124 #[cfg(feature = "logging")]
125 debug!("Beginning write transaction id={:?}", transaction_id);
126 state.live_write_transaction = Some(transaction_id);
127
128 transaction_id
129 }
130
131 pub(crate) fn end_write_transaction(&self, id: TransactionId) {
132 let mut state = self.state.lock().unwrap();
133 assert_eq!(state.live_write_transaction.unwrap(), id);
134 state.live_write_transaction = None;
135 self.live_write_transaction_available.notify_one();
136 }
137
138 pub(crate) fn clear_pending_non_durable_commits(&self) {
139 let mut state = self.state.lock().unwrap();
140 let ids: Vec<TransactionId> = state.pending_non_durable_commits.drain(..).collect();
141 for id in ids {
142 if let Some(parent) = id.parent() {
143 let ref_count = state.live_read_transactions.get_mut(&parent).unwrap();
144 *ref_count -= 1;
145 if *ref_count == 0 {
146 state.live_read_transactions.remove(&parent);
147 }
148 }
149 }
150 }
151
152 pub(crate) fn register_non_durable_commit(&self, id: TransactionId) {
153 let mut state = self.state.lock().unwrap();
154 if let Some(parent) = id.parent() {
155 state
156 .live_read_transactions
157 .entry(parent)
158 .and_modify(|x| *x += 1)
159 .or_insert(1);
160 }
161 state.pending_non_durable_commits.push(id);
162 }
163
164 pub(crate) fn restore_savepoint_counter_state(&self, next_savepoint: SavepointId) {
165 let mut state = self.state.lock().unwrap();
166 assert!(state.valid_savepoints.is_empty());
167 state.next_savepoint_id = next_savepoint;
168 }
169
170 pub(crate) fn register_persistent_savepoint(&self, savepoint: &Savepoint) {
171 let mut state = self.state.lock().unwrap();
172 state
173 .live_read_transactions
174 .entry(savepoint.get_transaction_id())
175 .and_modify(|x| *x += 1)
176 .or_insert(1);
177 state.valid_savepoints.insert(savepoint.get_id());
178 }
179
180 pub(crate) fn register_read_transaction(
181 &self,
182 mem: &TransactionalMemory,
183 ) -> Result<TransactionId> {
184 let mut state = self.state.lock().unwrap();
185 let id = mem.get_last_committed_transaction_id()?;
186 state
187 .live_read_transactions
188 .entry(id)
189 .and_modify(|x| *x += 1)
190 .or_insert(1);
191
192 Ok(id)
193 }
194
195 pub(crate) fn deallocate_read_transaction(&self, id: TransactionId) {
196 let mut state = self.state.lock().unwrap();
197 let ref_count = state.live_read_transactions.get_mut(&id).unwrap();
198 *ref_count -= 1;
199 if *ref_count == 0 {
200 state.live_read_transactions.remove(&id);
201 }
202 }
203
204 pub(crate) fn any_savepoint_exists(&self) -> bool {
205 !self.state.lock().unwrap().valid_savepoints.is_empty()
206 }
207
208 pub(crate) fn allocate_savepoint(&self) -> SavepointId {
209 let mut state = self.state.lock().unwrap();
210 let id = state.next_savepoint_id.next();
211 state.next_savepoint_id = id;
212 state.valid_savepoints.insert(id);
213 id
214 }
215
216 pub(crate) fn deallocate_savepoint(&self, savepoint: SavepointId, transaction: TransactionId) {
218 self.state
219 .lock()
220 .unwrap()
221 .valid_savepoints
222 .remove(&savepoint);
223 self.deallocate_read_transaction(transaction);
224 }
225
226 pub(crate) fn is_valid_savepoint(&self, id: SavepointId) -> bool {
227 self.state.lock().unwrap().valid_savepoints.contains(&id)
228 }
229
230 pub(crate) fn invalidate_savepoints_after(&self, id: SavepointId) {
231 self.state
232 .lock()
233 .unwrap()
234 .valid_savepoints
235 .retain(|x| *x <= id);
236 }
237
238 pub(crate) fn oldest_live_read_transaction(&self) -> Option<TransactionId> {
239 self.state
240 .lock()
241 .unwrap()
242 .live_read_transactions
243 .keys()
244 .next()
245 .copied()
246 }
247}