arc_swap/debt/list.rs

//! A linked list of debt nodes.
//!
//! A node may or may not be owned by a thread. Reader debts are allocated in its owned node; a
//! writer walks everything (but may also use some owned values).
//!
//! The list is prepend-only ‒ if a thread dies, the node lives on (and can be claimed by another
//! thread later on). This makes the implementation much simpler, since everything here is
//! `'static` and we don't have to care about knowing when to free stuff.
//!
//! The nodes contain both the fast primary slots and the secondary fallback ones.
//!
//! # Synchronization
//!
//! We synchronize several things here.
//!
//! The addition of nodes is synchronized through the head (a load on each read, AcqRel on each
//! attempt to add another node). Note that certain parts never change after that (they aren't even
//! atomic) and other things that do change take care of themselves (the debt slots have their own
//! synchronization, etc).
//!
//! The ownership is an acquire-release lock pattern.
//!
//! Similarly, the counting of active writers is an acquire-release lock pattern.
//!
//! We also do a release-acquire "send" from start-cooldown to check-cooldown to make sure we see
//! an at-least-as-up-to-date value of the writers as when the cooldown started. That way, if we
//! see 0, we know it must have happened since then.
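//!
//! As a minimal standalone sketch of that acquire-release lock pattern (a simplified model for
//! illustration only, not this module's actual types):
//!
//! ```ignore
//! use core::sync::atomic::{AtomicUsize, Ordering::*};
//!
//! const UNUSED: usize = 0;
//! const USED: usize = 1;
//!
//! static FLAG: AtomicUsize = AtomicUsize::new(UNUSED);
//!
//! // "Lock": claiming the flag with Acquire makes the previous owner's writes visible here.
//! if FLAG.compare_exchange(UNUSED, USED, Acquire, Relaxed).is_ok() {
//!     // ... exclusive access to the associated data ...
//!
//!     // "Unlock": the Release publishes our writes to whoever claims the flag next.
//!     FLAG.store(UNUSED, Release);
//! }
//! ```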

use core::cell::Cell;
use core::ptr;
use core::slice::Iter;
use core::sync::atomic::Ordering::*;
use core::sync::atomic::{AtomicPtr, AtomicUsize};

#[cfg(feature = "experimental-thread-local")]
use core::cell::OnceCell;

use alloc::boxed::Box;

use super::fast::{Local as FastLocal, Slots as FastSlots};
use super::helping::{Local as HelpingLocal, Slots as HelpingSlots};
use super::Debt;
use crate::RefCnt;
const NODE_UNUSED: usize = 0;
const NODE_USED: usize = 1;
const NODE_COOLDOWN: usize = 2;
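
// A node moves through these states: NODE_UNUSED -> NODE_USED (claimed by a thread in
// `Node::get`) -> NODE_COOLDOWN (`start_cooldown`, on thread shutdown or generation overflow)
// -> NODE_UNUSED again, once `check_cooldown` observes no active writers.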

/// The head of the debt linked list.
static LIST_HEAD: AtomicPtr<Node> = AtomicPtr::new(ptr::null_mut());

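/// A writer's reservation of a `Node`; dropping it releases the active_writers count.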
pub struct NodeReservation<'a>(&'a Node);

impl Drop for NodeReservation<'_> {
    fn drop(&mut self) {
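        // Release ‒ the "unlock" half of the acquire-release pattern on active_writers; pairs
        // with the Acquire in reserve_writer.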
        self.0.active_writers.fetch_sub(1, Release);
    }
}

/// One thread-local node for debts.
#[repr(C, align(64))]
pub(crate) struct Node {
    fast: FastSlots,
    helping: HelpingSlots,
    in_use: AtomicUsize,
    // Next node in the list.
    //
    // It is a pointer because we touch it before synchronization (we don't _dereference_ it
    // before synchronization, only manipulate the pointer itself). Doing that through a
    // reference would be illegal under a strict interpretation of the rules, as checked by MIRI.
    next: *const Node,
    active_writers: AtomicUsize,
}

impl Default for Node {
    fn default() -> Self {
        Node {
            fast: FastSlots::default(),
            helping: HelpingSlots::default(),
            in_use: AtomicUsize::new(NODE_USED),
            next: ptr::null(),
            active_writers: AtomicUsize::new(0),
        }
    }
}

impl Node {
    /// Goes through the debt linked list.
    ///
    /// This traverses the linked list, calling the closure on each node. If the closure returns
    /// `Some`, it terminates with that value early; otherwise it runs to the end.
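    ///
    /// A minimal usage sketch (the counting closure is illustrative only, not part of this
    /// module):
    ///
    /// ```ignore
    /// // Count the nodes currently in the list.
    /// let mut count = 0;
    /// let _: Option<()> = Node::traverse(|_node| {
    ///     count += 1;
    ///     None // Never short-circuit; walk the whole list.
    /// });
    /// ```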
    pub(crate) fn traverse<R, F: FnMut(&'static Node) -> Option<R>>(mut f: F) -> Option<R> {
        // Acquire ‒ we want to make sure we read the correct version of data at the end of the
        // pointer. Any write to LIST_HEAD is with Release.
        //
        // Furthermore, we need to see the newest version of the list in case we examine the debts
        // ‒ if a new one was added recently, we don't want a stale read -> SeqCst.
        //
        // Note that the other pointers in the chain never change and are *ordinary* pointers. The
        // whole linked list is synchronized through the head.
        let mut current = unsafe { LIST_HEAD.load(SeqCst).as_ref() };
        while let Some(node) = current {
            let result = f(node);
            if result.is_some() {
                return result;
            }
            current = unsafe { node.next.as_ref() };
        }
        None
    }

    /// Puts the current thread's node into cooldown.
    fn start_cooldown(&self) {
        // Trick: Make sure we have an up to date value of active_writers in this thread, so we
        // can properly release it below.
        let _reservation = self.reserve_writer();
        assert_eq!(NODE_USED, self.in_use.swap(NODE_COOLDOWN, Release));
    }

    /// Performs a cooldown if the node is ready.
    ///
    /// See the ABA protection in the [helping] module.
    fn check_cooldown(&self) {
        // Check if the node is in cooldown, for two reasons:
        // * Skip most nodes fast, without dealing with them.
        // * More importantly, sync the value of active_writers to be at least the value from
        //   when the cooldown started. That way we know the 0 we observe happened some time
        //   after start_cooldown.
        if self.in_use.load(Acquire) == NODE_COOLDOWN {
            // The rest can be nicely relaxed ‒ no memory is being synchronized by these
            // operations. We just see an up to date 0 and allow someone (possibly us) to claim the
            // node later on.
            if self.active_writers.load(Relaxed) == 0 {
                let _ = self
                    .in_use
                    .compare_exchange(NODE_COOLDOWN, NODE_UNUSED, Relaxed, Relaxed);
            }
        }
    }

    /// Marks this node as having a writer currently playing with it.
    pub fn reserve_writer(&self) -> NodeReservation {
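        // Acquire ‒ the "lock" half of the acquire-release pattern on active_writers; pairs
        // with the Release in NodeReservation's drop.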
        self.active_writers.fetch_add(1, Acquire);
        NodeReservation(self)
    }

    /// "Allocates" a node.
    ///
    /// Either a new one is created, or a previous one is reused. The claimed node becomes
    /// in_use.
    fn get() -> &'static Self {
        // Try to find an unused one in the chain and reuse it.
        Self::traverse(|node| {
            node.check_cooldown();
            if node
                .in_use
                // We claim unique control over the generation and the right to write to slots if
                // they are NO_DEBT.
                .compare_exchange(NODE_UNUSED, NODE_USED, SeqCst, Relaxed)
                .is_ok()
            {
                Some(node)
            } else {
                None
            }
        })
        // If that didn't work, create a new one and prepend it to the list.
        .unwrap_or_else(|| {
            let node = Box::leak(Box::<Node>::default());
            node.helping.init();
            // We don't want to read any data in addition to the head, so Relaxed is fine
            // here.
            //
            // We do need to release the data to others, but for that we first acquire the
            // current chain in the compare_exchange below.
            let mut head = LIST_HEAD.load(Relaxed);
            loop {
                node.next = head;
                if let Err(old) = LIST_HEAD.compare_exchange_weak(
                    head, node,
                    // We need to release *the whole chain* here. For that, we need to
                    // acquire it first.
                    //
                    // SeqCst because we need to make sure it is properly set "before" we do
                    // anything to the debts.
                    SeqCst, Relaxed, // Nothing changed, go for the next round of the loop.
                ) {
                    head = old;
                } else {
                    return node;
                }
            }
        })
    }

    /// Iterates over the fast slots.
    pub(crate) fn fast_slots(&self) -> Iter<Debt> {
        self.fast.into_iter()
    }

    /// Accesses the helping slot.
    pub(crate) fn helping_slot(&self) -> &Debt {
        self.helping.slot()
    }
}

/// A wrapper around a node pointer, to un-claim the node on thread shutdown.
pub(crate) struct LocalNode {
    /// Node for this thread, if any.
    ///
    /// We don't necessarily have to own one, but if we don't, we'll get one before the first use.
    node: Cell<Option<&'static Node>>,

    /// Thread-local data for the fast slots.
    fast: FastLocal,

    /// Thread-local data for the helping strategy.
    helping: HelpingLocal,
}

impl LocalNode {
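    /// Runs `f` with this thread's `LocalNode`, claiming a `Node` from the global list first if
    /// the thread doesn't own one yet.
    ///
    /// A minimal crate-internal usage sketch (the closure body is illustrative only):
    ///
    /// ```ignore
    /// LocalNode::with(|local| {
    ///     // The thread's node is guaranteed to be claimed in here.
    ///     debug_assert!(local.node.get().is_some());
    /// });
    /// ```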
    #[cfg(not(feature = "experimental-thread-local"))]
    pub(crate) fn with<R, F: FnOnce(&LocalNode) -> R>(f: F) -> R {
        let f = Cell::new(Some(f));
        THREAD_HEAD
            .try_with(|head| {
                if head.node.get().is_none() {
                    head.node.set(Some(Node::get()));
                }
                let f = f.take().unwrap();
                f(head)
            })
            // During application shutdown, the thread local storage may already be deallocated.
            // In that case, the above fails but we still need something. So we just find or
            // allocate a node and use it just once.
            //
            // Note that this situation should be very rare, so the slower performance doesn't
            // matter that much.
            .unwrap_or_else(|_| {
                let tmp_node = LocalNode {
                    node: Cell::new(Some(Node::get())),
                    fast: FastLocal::default(),
                    helping: HelpingLocal::default(),
                };
                let f = f.take().unwrap();
                f(&tmp_node)
                // Drop of tmp_node -> sends the node we just used into cooldown.
            })
    }

    #[cfg(feature = "experimental-thread-local")]
    pub(crate) fn with<R, F: FnOnce(&LocalNode) -> R>(f: F) -> R {
        let thread_head = THREAD_HEAD.get_or_init(|| LocalNode {
            node: Cell::new(None),
            fast: FastLocal::default(),
            helping: HelpingLocal::default(),
        });
        if thread_head.node.get().is_none() {
            thread_head.node.set(Some(Node::get()));
        }
        f(thread_head)
    }

    /// Creates a new debt.
    ///
    /// This stores the debt of the given pointer (untyped, cast into a usize) and returns a
    /// reference to that slot, or gives up with `None` if all the slots are currently full.
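    ///
    /// A sketch of the intended use (crate-internal, simplified; `ptr_addr` is a hypothetical
    /// address value):
    ///
    /// ```ignore
    /// LocalNode::with(|local| {
    ///     match local.new_fast(ptr_addr) {
    ///         Some(debt) => { /* Protected; proceed with the fast path. */ }
    ///         None => { /* All fast slots are full; fall back to the helping strategy. */ }
    ///     }
    /// });
    /// ```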
    #[inline]
    pub(crate) fn new_fast(&self, ptr: usize) -> Option<&'static Debt> {
        let node = &self.node.get().expect("LocalNode::with ensures it is set");
        debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
        node.fast.get_debt(ptr, &self.fast)
    }

    /// Initializes a helping slot transaction.
    ///
    /// Returns the generation (with tag).
    pub(crate) fn new_helping(&self, ptr: usize) -> usize {
        let node = &self.node.get().expect("LocalNode::with ensures it is set");
        debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
        let (gen, discard) = node.helping.get_debt(ptr, &self.helping);
        if discard {
            // Too many generations have happened; make sure the writers give the poor node a
            // break for a while so they don't observe the generation wrapping around.
            node.start_cooldown();
            self.node.take();
        }
        gen
    }

    /// Confirms the helping transaction.
    ///
    /// The generation comes from a previous `new_helping`.
    ///
    /// Will either return a debt with the pointer, or a debt to pay and a replacement (already
    /// protected) address.
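    ///
    /// A sketch of the reader-side transaction (crate-internal, simplified; `ptr_addr` is a
    /// hypothetical address value):
    ///
    /// ```ignore
    /// LocalNode::with(|local| {
    ///     let gen = local.new_helping(ptr_addr);
    ///     match local.confirm_helping(gen, ptr_addr) {
    ///         Ok(debt) => { /* We own the debt for ptr_addr. */ }
    ///         Err((debt, replacement)) => {
    ///             // A writer helped us: pay the debt and use the replacement address instead.
    ///         }
    ///     }
    /// });
    /// ```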
    pub(crate) fn confirm_helping(
        &self,
        gen: usize,
        ptr: usize,
    ) -> Result<&'static Debt, (&'static Debt, usize)> {
        let node = &self.node.get().expect("LocalNode::with ensures it is set");
        debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
        let slot = node.helping_slot();
        node.helping
            .confirm(gen, ptr)
            .map(|()| slot)
            .map_err(|repl| (slot, repl))
    }

    /// The writer side of a helping slot.
    ///
    /// This potentially helps the `who` node (using `self` as the local node, which must be a
    /// different one) by loading the address that one is trying to load.
    pub(super) fn help<R, T>(&self, who: &Node, storage_addr: usize, replacement: &R)
    where
        T: RefCnt,
        R: Fn() -> T,
    {
        let node = &self.node.get().expect("LocalNode::with ensures it is set");
        debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
        node.helping.help(&who.helping, storage_addr, replacement)
    }
}

impl Drop for LocalNode {
    fn drop(&mut self) {
        if let Some(node) = self.node.get() {
            // Release ‒ syncing writes/ownership of this Node.
            node.start_cooldown();
        }
    }
}

#[cfg(not(feature = "experimental-thread-local"))]
thread_local! {
    /// A debt node assigned to this thread.
    static THREAD_HEAD: LocalNode = LocalNode {
        node: Cell::new(None),
        fast: FastLocal::default(),
        helping: HelpingLocal::default(),
    };
}

/// A debt node assigned to this thread.
#[cfg(feature = "experimental-thread-local")]
#[thread_local]
static THREAD_HEAD: OnceCell<LocalNode> = OnceCell::new();

#[cfg(test)]
mod tests {
    use super::*;

    impl Node {
        fn is_empty(&self) -> bool {
            self.fast_slots()
                .chain(core::iter::once(self.helping_slot()))
                .all(|d| d.0.load(Relaxed) == Debt::NONE)
        }

        fn get_thread() -> &'static Self {
            LocalNode::with(|h| h.node.get().unwrap())
        }
    }

    /// A freshly acquired thread local node is empty.
    #[test]
    fn new_empty() {
        assert!(Node::get_thread().is_empty());
    }
}