arc_swap/debt/list.rs
1//! A linked list of debt nodes.
2//!
3//! A node may or may not be owned by a thread. Reader debts are allocated in its owned node,
4//! writer walks everything (but may also use some owned values).
5//!
6//! The list is prepend-only ‒ if thread dies, the node lives on (and can be claimed by another
7//! thread later on). This makes the implementation much simpler, since everything here is
8//! `'static` and we don't have to care about knowing when to free stuff.
9//!
10//! The nodes contain both the fast primary slots and a secondary fallback ones.
11//!
12//! # Synchronization
13//!
14//! We synchronize several things here.
15//!
16//! The addition of nodes is synchronized through the head (Load on each read, AcqReal on each
17//! attempt to add another node). Note that certain parts never change after that (they aren't even
18//! atomic) and other things that do change take care of themselves (the debt slots have their own
19//! synchronization, etc).
20//!
21//! The ownership is acquire-release lock pattern.
22//!
23//! Similar, the counting of active writers is an acquire-release lock pattern.
24//!
25//! We also do release-acquire "send" from the start-cooldown to check-cooldown to make sure we see
26//! at least as up to date value of the writers as when the cooldown started. That we if we see 0,
27//! we know it must have happened since then.
28
29use core::cell::Cell;
30use core::ptr;
31use core::slice::Iter;
32use core::sync::atomic::Ordering::*;
33use core::sync::atomic::{AtomicPtr, AtomicUsize};
34
35#[cfg(feature = "experimental-thread-local")]
36use core::cell::OnceCell;
37
38use alloc::boxed::Box;
39
40use super::fast::{Local as FastLocal, Slots as FastSlots};
41use super::helping::{Local as HelpingLocal, Slots as HelpingSlots};
42use super::Debt;
43use crate::RefCnt;
44
45const NODE_UNUSED: usize = 0;
46const NODE_USED: usize = 1;
47const NODE_COOLDOWN: usize = 2;
48
49/// The head of the debt linked list.
50static LIST_HEAD: AtomicPtr<Node> = AtomicPtr::new(ptr::null_mut());
51
52pub struct NodeReservation<'a>(&'a Node);
53
54impl Drop for NodeReservation<'_> {
55 fn drop(&mut self) {
56 self.0.active_writers.fetch_sub(1, Release);
57 }
58}
59
60/// One thread-local node for debts.
61#[repr(C, align(64))]
62pub(crate) struct Node {
63 fast: FastSlots,
64 helping: HelpingSlots,
65 in_use: AtomicUsize,
66 // Next node in the list.
67 //
68 // It is a pointer because we touch it before synchronization (we don't _dereference_ it before
69 // synchronization, only manipulate the pointer itself). That is illegal according to strict
70 // interpretation of the rules by MIRI on references.
71 next: *const Node,
72 active_writers: AtomicUsize,
73}
74
75impl Default for Node {
76 fn default() -> Self {
77 Node {
78 fast: FastSlots::default(),
79 helping: HelpingSlots::default(),
80 in_use: AtomicUsize::new(NODE_USED),
81 next: ptr::null(),
82 active_writers: AtomicUsize::new(0),
83 }
84 }
85}
86
87impl Node {
88 /// Goes through the debt linked list.
89 ///
90 /// This traverses the linked list, calling the closure on each node. If the closure returns
91 /// `Some`, it terminates with that value early, otherwise it runs to the end.
92 pub(crate) fn traverse<R, F: FnMut(&'static Node) -> Option<R>>(mut f: F) -> Option<R> {
93 // Acquire ‒ we want to make sure we read the correct version of data at the end of the
94 // pointer. Any write to the DEBT_HEAD is with Release.
95 //
96 // Furthermore, we need to see the newest version of the list in case we examine the debts
97 // - if a new one is added recently, we don't want a stale read -> SeqCst.
98 //
99 // Note that the other pointers in the chain never change and are *ordinary* pointers. The
100 // whole linked list is synchronized through the head.
101 let mut current = unsafe { LIST_HEAD.load(SeqCst).as_ref() };
102 while let Some(node) = current {
103 let result = f(node);
104 if result.is_some() {
105 return result;
106 }
107 current = unsafe { node.next.as_ref() };
108 }
109 None
110 }
111
112 /// Put the current thread node into cooldown
113 fn start_cooldown(&self) {
114 // Trick: Make sure we have an up to date value of the active_writers in this thread, so we
115 // can properly release it below.
116 let _reservation = self.reserve_writer();
117 assert_eq!(NODE_USED, self.in_use.swap(NODE_COOLDOWN, Release));
118 }
119
120 /// Perform a cooldown if the node is ready.
121 ///
122 /// See the ABA protection at the [helping].
123 fn check_cooldown(&self) {
124 // Check if the node is in cooldown, for two reasons:
125 // * Skip most of nodes fast, without dealing with them.
126 // * More importantly, sync the value of active_writers to be at least the value when the
127 // cooldown started. That way we know the 0 we observe happened some time after
128 // start_cooldown.
129 if self.in_use.load(Acquire) == NODE_COOLDOWN {
130 // The rest can be nicely relaxed ‒ no memory is being synchronized by these
131 // operations. We just see an up to date 0 and allow someone (possibly us) to claim the
132 // node later on.
133 if self.active_writers.load(Relaxed) == 0 {
134 let _ = self
135 .in_use
136 .compare_exchange(NODE_COOLDOWN, NODE_UNUSED, Relaxed, Relaxed);
137 }
138 }
139 }
140
141 /// Mark this node that a writer is currently playing with it.
142 pub fn reserve_writer(&self) -> NodeReservation {
143 self.active_writers.fetch_add(1, Acquire);
144 NodeReservation(self)
145 }
146
147 /// "Allocate" a node.
148 ///
149 /// Either a new one is created, or previous one is reused. The node is claimed to become
150 /// in_use.
151 fn get() -> &'static Self {
152 // Try to find an unused one in the chain and reuse it.
153 Self::traverse(|node| {
154 node.check_cooldown();
155 if node
156 .in_use
157 // We claim a unique control over the generation and the right to write to slots if
158 // they are NO_DEPT
159 .compare_exchange(NODE_UNUSED, NODE_USED, SeqCst, Relaxed)
160 .is_ok()
161 {
162 Some(node)
163 } else {
164 None
165 }
166 })
167 // If that didn't work, create a new one and prepend to the list.
168 .unwrap_or_else(|| {
169 let node = Box::leak(Box::<Node>::default());
170 node.helping.init();
171 // We don't want to read any data in addition to the head, Relaxed is fine
172 // here.
173 //
174 // We do need to release the data to others, but for that, we acquire in the
175 // compare_exchange below.
176 let mut head = LIST_HEAD.load(Relaxed);
177 loop {
178 node.next = head;
179 if let Err(old) = LIST_HEAD.compare_exchange_weak(
180 head, node,
181 // We need to release *the whole chain* here. For that, we need to
182 // acquire it first.
183 //
184 // SeqCst because we need to make sure it is properly set "before" we do
185 // anything to the debts.
186 SeqCst, Relaxed, // Nothing changed, go next round of the loop.
187 ) {
188 head = old;
189 } else {
190 return node;
191 }
192 }
193 })
194 }
195
196 /// Iterate over the fast slots.
197 pub(crate) fn fast_slots(&self) -> Iter<Debt> {
198 self.fast.into_iter()
199 }
200
201 /// Access the helping slot.
202 pub(crate) fn helping_slot(&self) -> &Debt {
203 self.helping.slot()
204 }
205}
206
207/// A wrapper around a node pointer, to un-claim the node on thread shutdown.
208pub(crate) struct LocalNode {
209 /// Node for this thread, if any.
210 ///
211 /// We don't necessarily have to own one, but if we don't, we'll get one before the first use.
212 node: Cell<Option<&'static Node>>,
213
214 /// Thread-local data for the fast slots.
215 fast: FastLocal,
216
217 /// Thread local data for the helping strategy.
218 helping: HelpingLocal,
219}
220
221impl LocalNode {
222 #[cfg(not(feature = "experimental-thread-local"))]
223 pub(crate) fn with<R, F: FnOnce(&LocalNode) -> R>(f: F) -> R {
224 let f = Cell::new(Some(f));
225 THREAD_HEAD
226 .try_with(|head| {
227 if head.node.get().is_none() {
228 head.node.set(Some(Node::get()));
229 }
230 let f = f.take().unwrap();
231 f(head)
232 })
233 // During the application shutdown, the thread local storage may be already
234 // deallocated. In that case, the above fails but we still need something. So we just
235 // find or allocate a node and use it just once.
236 //
237 // Note that the situation should be very very rare and not happen often, so the slower
238 // performance doesn't matter that much.
239 .unwrap_or_else(|_| {
240 let tmp_node = LocalNode {
241 node: Cell::new(Some(Node::get())),
242 fast: FastLocal::default(),
243 helping: HelpingLocal::default(),
244 };
245 let f = f.take().unwrap();
246 f(&tmp_node)
247 // Drop of tmp_node -> sends the node we just used into cooldown.
248 })
249 }
250
251 #[cfg(feature = "experimental-thread-local")]
252 pub(crate) fn with<R, F: FnOnce(&LocalNode) -> R>(f: F) -> R {
253 let thread_head = THREAD_HEAD.get_or_init(|| LocalNode {
254 node: Cell::new(None),
255 fast: FastLocal::default(),
256 helping: HelpingLocal::default(),
257 });
258 if thread_head.node.get().is_none() {
259 thread_head.node.set(Some(Node::get()));
260 }
261 f(&thread_head)
262 }
263
264 /// Creates a new debt.
265 ///
266 /// This stores the debt of the given pointer (untyped, casted into an usize) and returns a
267 /// reference to that slot, or gives up with `None` if all the slots are currently full.
268 #[inline]
269 pub(crate) fn new_fast(&self, ptr: usize) -> Option<&'static Debt> {
270 let node = &self.node.get().expect("LocalNode::with ensures it is set");
271 debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
272 node.fast.get_debt(ptr, &self.fast)
273 }
274
275 /// Initializes a helping slot transaction.
276 ///
277 /// Returns the generation (with tag).
278 pub(crate) fn new_helping(&self, ptr: usize) -> usize {
279 let node = &self.node.get().expect("LocalNode::with ensures it is set");
280 debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
281 let (gen, discard) = node.helping.get_debt(ptr, &self.helping);
282 if discard {
283 // Too many generations happened, make sure the writers give the poor node a break for
284 // a while so they don't observe the generation wrapping around.
285 node.start_cooldown();
286 self.node.take();
287 }
288 gen
289 }
290
291 /// Confirm the helping transaction.
292 ///
293 /// The generation comes from previous new_helping.
294 ///
295 /// Will either return a debt with the pointer, or a debt to pay and a replacement (already
296 /// protected) address.
297 pub(crate) fn confirm_helping(
298 &self,
299 gen: usize,
300 ptr: usize,
301 ) -> Result<&'static Debt, (&'static Debt, usize)> {
302 let node = &self.node.get().expect("LocalNode::with ensures it is set");
303 debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
304 let slot = node.helping_slot();
305 node.helping
306 .confirm(gen, ptr)
307 .map(|()| slot)
308 .map_err(|repl| (slot, repl))
309 }
310
311 /// The writer side of a helping slot.
312 ///
313 /// This potentially helps the `who` node (uses self as the local node, which must be
314 /// different) by loading the address that one is trying to load.
315 pub(super) fn help<R, T>(&self, who: &Node, storage_addr: usize, replacement: &R)
316 where
317 T: RefCnt,
318 R: Fn() -> T,
319 {
320 let node = &self.node.get().expect("LocalNode::with ensures it is set");
321 debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
322 node.helping.help(&who.helping, storage_addr, replacement)
323 }
324}
325
326impl Drop for LocalNode {
327 fn drop(&mut self) {
328 if let Some(node) = self.node.get() {
329 // Release - syncing writes/ownership of this Node
330 node.start_cooldown();
331 }
332 }
333}
334
335#[cfg(not(feature = "experimental-thread-local"))]
336thread_local! {
337 /// A debt node assigned to this thread.
338 static THREAD_HEAD: LocalNode = LocalNode {
339 node: Cell::new(None),
340 fast: FastLocal::default(),
341 helping: HelpingLocal::default(),
342 };
343}
344
345#[cfg(feature = "experimental-thread-local")]
346#[thread_local]
347/// A debt node assigned to this thread.
348static THREAD_HEAD: OnceCell<LocalNode> = OnceCell::new();
349
350#[cfg(test)]
351mod tests {
352 use super::*;
353
354 impl Node {
355 fn is_empty(&self) -> bool {
356 self.fast_slots()
357 .chain(core::iter::once(self.helping_slot()))
358 .all(|d| d.0.load(Relaxed) == Debt::NONE)
359 }
360
361 fn get_thread() -> &'static Self {
362 LocalNode::with(|h| h.node.get().unwrap())
363 }
364 }
365
366 /// A freshly acquired thread local node is empty.
367 #[test]
368 fn new_empty() {
369 assert!(Node::get_thread().is_empty());
370 }
371}