// sharded_slab/src/shard.rs

1use crate::{
2    cfg::{self, CfgPrivate},
3    clear::Clear,
4    page,
5    sync::{
6        alloc,
7        atomic::{
8            AtomicPtr, AtomicUsize,
9            Ordering::{self, *},
10        },
11    },
12    tid::Tid,
13    Pack,
14};
15
16use std::{fmt, ptr, slice};
17
18// ┌─────────────┐      ┌────────┐
19// │ page 1      │      │        │
20// ├─────────────┤ ┌───▶│  next──┼─┐
21// │ page 2      │ │    ├────────┤ │
22// │             │ │    │XXXXXXXX│ │
23// │ local_free──┼─┘    ├────────┤ │
24// │ global_free─┼─┐    │        │◀┘
25// ├─────────────┤ └───▶│  next──┼─┐
26// │   page 3    │      ├────────┤ │
27// └─────────────┘      │XXXXXXXX│ │
28//       ...            ├────────┤ │
29// ┌─────────────┐      │XXXXXXXX│ │
30// │ page n      │      ├────────┤ │
31// └─────────────┘      │        │◀┘
32//                      │  next──┼───▶
33//                      ├────────┤
34//                      │XXXXXXXX│
35//                      └────────┘
36//                         ...
/// A single shard of the slab, owned by one thread.
///
/// A shard consists of a set of pages (see the diagram above): per-page
/// free-list state that is only touched by the owning thread lives in
/// `local`, while state that remote threads may touch lives in `shared`.
pub(crate) struct Shard<T, C: cfg::Config> {
    /// The shard's parent thread ID.
    pub(crate) tid: usize,
    /// The local free list for each page.
    ///
    /// These are only ever accessed from this shard's thread, so they are
    /// stored separately from the shared state for the page that can be
    /// accessed concurrently, to minimize false sharing.
    local: Box<[page::Local]>,
    /// The shared state for each page in this shard.
    ///
    /// This consists of the page's metadata (size, previous size), remote free
    /// list, and a pointer to the actual array backing that page.
    shared: Box<[page::Shared<T, C>]>,
}
52
/// A lazily-populated array of shards, indexed by thread ID.
pub(crate) struct Array<T, C: cfg::Config> {
    /// One pointer slot per possible shard (`C::MAX_SHARDS`); a slot stays
    /// null until the corresponding thread first calls `current()`.
    shards: Box<[Ptr<T, C>]>,
    /// The highest shard index allocated so far; used to bound iteration
    /// in `iter_mut`, `Drop`, and `Debug` so unallocated tails are skipped.
    max: AtomicUsize,
}
57
/// An atomic pointer to a lazily-allocated, heap-allocated shard.
/// Null until the owning thread installs its shard via `Ptr::set`.
#[derive(Debug)]
struct Ptr<T, C: cfg::Config>(AtomicPtr<alloc::Track<Shard<T, C>>>);
60
/// Iterator over the allocated shards of an `Array`.
///
/// Constructed from `&mut Array` (so no new shards can be installed while
/// iterating), but yields shared `&Shard` references and skips null slots.
#[derive(Debug)]
pub(crate) struct IterMut<'a, T: 'a, C: cfg::Config + 'a>(slice::IterMut<'a, Ptr<T, C>>);
63
64// === impl Shard ===
65
66impl<T, C> Shard<T, C>
67where
68    C: cfg::Config,
69{
70    #[inline(always)]
71    pub(crate) fn with_slot<'a, U>(
72        &'a self,
73        idx: usize,
74        f: impl FnOnce(&'a page::Slot<T, C>) -> Option<U>,
75    ) -> Option<U> {
76        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
77        let (addr, page_index) = page::indices::<C>(idx);
78
79        test_println!("-> {:?}", addr);
80        if page_index >= self.shared.len() {
81            return None;
82        }
83
84        self.shared[page_index].with_slot(addr, f)
85    }
86
87    pub(crate) fn new(tid: usize) -> Self {
88        let mut total_sz = 0;
89        let shared = (0..C::MAX_PAGES)
90            .map(|page_num| {
91                let sz = C::page_size(page_num);
92                let prev_sz = total_sz;
93                total_sz += sz;
94                page::Shared::new(sz, prev_sz)
95            })
96            .collect();
97        let local = (0..C::MAX_PAGES).map(|_| page::Local::new()).collect();
98        Self { tid, local, shared }
99    }
100}
101
102impl<T, C> Shard<Option<T>, C>
103where
104    C: cfg::Config,
105{
106    /// Remove an item on the shard's local thread.
107    pub(crate) fn take_local(&self, idx: usize) -> Option<T> {
108        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
109        let (addr, page_index) = page::indices::<C>(idx);
110
111        test_println!("-> remove_local {:?}", addr);
112
113        self.shared
114            .get(page_index)?
115            .take(addr, C::unpack_gen(idx), self.local(page_index))
116    }
117
118    /// Remove an item, while on a different thread from the shard's local thread.
119    pub(crate) fn take_remote(&self, idx: usize) -> Option<T> {
120        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
121        debug_assert!(Tid::<C>::current().as_usize() != self.tid);
122
123        let (addr, page_index) = page::indices::<C>(idx);
124
125        test_println!("-> take_remote {:?}; page {:?}", addr, page_index);
126
127        let shared = self.shared.get(page_index)?;
128        shared.take(addr, C::unpack_gen(idx), shared.free_list())
129    }
130
131    pub(crate) fn remove_local(&self, idx: usize) -> bool {
132        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
133        let (addr, page_index) = page::indices::<C>(idx);
134
135        if page_index >= self.shared.len() {
136            return false;
137        }
138
139        self.shared[page_index].remove(addr, C::unpack_gen(idx), self.local(page_index))
140    }
141
142    pub(crate) fn remove_remote(&self, idx: usize) -> bool {
143        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
144        let (addr, page_index) = page::indices::<C>(idx);
145
146        if page_index >= self.shared.len() {
147            return false;
148        }
149
150        let shared = &self.shared[page_index];
151        shared.remove(addr, C::unpack_gen(idx), shared.free_list())
152    }
153
154    pub(crate) fn iter(&self) -> std::slice::Iter<'_, page::Shared<Option<T>, C>> {
155        self.shared.iter()
156    }
157}
158
159impl<T, C> Shard<T, C>
160where
161    T: Clear + Default,
162    C: cfg::Config,
163{
164    pub(crate) fn init_with<U>(
165        &self,
166        mut init: impl FnMut(usize, &page::Slot<T, C>) -> Option<U>,
167    ) -> Option<U> {
168        // Can we fit the value into an exist`ing page?
169        for (page_idx, page) in self.shared.iter().enumerate() {
170            let local = self.local(page_idx);
171
172            test_println!("-> page {}; {:?}; {:?}", page_idx, local, page);
173
174            if let Some(res) = page.init_with(local, &mut init) {
175                return Some(res);
176            }
177        }
178
179        None
180    }
181
182    pub(crate) fn mark_clear_local(&self, idx: usize) -> bool {
183        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
184        let (addr, page_index) = page::indices::<C>(idx);
185
186        if page_index >= self.shared.len() {
187            return false;
188        }
189
190        self.shared[page_index].mark_clear(addr, C::unpack_gen(idx), self.local(page_index))
191    }
192
193    pub(crate) fn mark_clear_remote(&self, idx: usize) -> bool {
194        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
195        let (addr, page_index) = page::indices::<C>(idx);
196
197        if page_index >= self.shared.len() {
198            return false;
199        }
200
201        let shared = &self.shared[page_index];
202        shared.mark_clear(addr, C::unpack_gen(idx), shared.free_list())
203    }
204
205    pub(crate) fn clear_after_release(&self, idx: usize) {
206        crate::sync::atomic::fence(crate::sync::atomic::Ordering::Acquire);
207        let tid = Tid::<C>::current().as_usize();
208        test_println!(
209            "-> clear_after_release; self.tid={:?}; current.tid={:?};",
210            tid,
211            self.tid
212        );
213        if tid == self.tid {
214            self.clear_local(idx);
215        } else {
216            self.clear_remote(idx);
217        }
218    }
219
220    fn clear_local(&self, idx: usize) -> bool {
221        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
222        let (addr, page_index) = page::indices::<C>(idx);
223
224        if page_index >= self.shared.len() {
225            return false;
226        }
227
228        self.shared[page_index].clear(addr, C::unpack_gen(idx), self.local(page_index))
229    }
230
231    fn clear_remote(&self, idx: usize) -> bool {
232        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
233        let (addr, page_index) = page::indices::<C>(idx);
234
235        if page_index >= self.shared.len() {
236            return false;
237        }
238
239        let shared = &self.shared[page_index];
240        shared.clear(addr, C::unpack_gen(idx), shared.free_list())
241    }
242
243    #[inline(always)]
244    fn local(&self, i: usize) -> &page::Local {
245        #[cfg(debug_assertions)]
246        debug_assert_eq_in_drop!(
247            Tid::<C>::current().as_usize(),
248            self.tid,
249            "tried to access local data from another thread!"
250        );
251
252        &self.local[i]
253    }
254}
255
impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Shard<T, C> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut d = f.debug_struct("Shard");

        // The owning thread ID is only included in debug builds.
        #[cfg(debug_assertions)]
        d.field("tid", &self.tid);
        d.field("shared", &self.shared).finish()
    }
}
265
266// === impl Array ===
267
impl<T, C> Array<T, C>
where
    C: cfg::Config,
{
    /// Creates a new array with `C::MAX_SHARDS` empty (null) slots.
    ///
    /// Shards themselves are allocated lazily, the first time each thread
    /// calls [`Array::current`].
    pub(crate) fn new() -> Self {
        let mut shards = Vec::with_capacity(C::MAX_SHARDS);
        for _ in 0..C::MAX_SHARDS {
            // XXX(eliza): T_T this could be avoided with maybeuninit or something...
            shards.push(Ptr::null());
        }
        Self {
            shards: shards.into(),
            max: AtomicUsize::new(0),
        }
    }

    /// Returns the shard at `idx`, or `None` if `idx` is out of range or
    /// that shard has not been allocated yet.
    #[inline]
    pub(crate) fn get(&self, idx: usize) -> Option<&Shard<T, C>> {
        test_println!("-> get shard={}", idx);
        self.shards.get(idx)?.load(Acquire)
    }

    /// Returns the current thread's ID and its shard, allocating the shard
    /// if this thread has never inserted before.
    ///
    /// Panics if the thread index exceeds the configured maximum shard count.
    #[inline]
    pub(crate) fn current(&self) -> (Tid<C>, &Shard<T, C>) {
        let tid = Tid::<C>::current();
        test_println!("current: {:?}", tid);
        let idx = tid.as_usize();
        assert!(
            idx < self.shards.len(),
            "Thread count overflowed the configured max count. \
            Thread index = {}, max threads = {}.",
            idx,
            C::MAX_SHARDS,
        );
        // It's okay for this to be relaxed. The value is only ever stored by
        // the thread that corresponds to the index, and we are that thread.
        let shard = self.shards[idx].load(Relaxed).unwrap_or_else(|| {
            // First access from this thread: allocate its shard and publish
            // the pointer into our slot (only we ever write this slot).
            let ptr = Box::into_raw(Box::new(alloc::Track::new(Shard::new(idx))));
            test_println!("-> allocated new shard for index {} at {:p}", idx, ptr);
            self.shards[idx].set(ptr);
            // Advance `max` to cover this index. Other threads may be doing
            // the same concurrently, so CAS in a loop until either we win or
            // the observed value is already >= our index.
            let mut max = self.max.load(Acquire);
            while max < idx {
                match self.max.compare_exchange(max, idx, AcqRel, Acquire) {
                    Ok(_) => break,
                    Err(actual) => max = actual,
                }
            }
            test_println!("-> highest index={}, prev={}", std::cmp::max(max, idx), max);
            unsafe {
                // Safety: we just put it there!
                &*ptr
            }
            .get_ref()
        });
        (tid, shard)
    }

    /// Returns an iterator over all shards allocated so far (indices
    /// `0..=max`). Taking `&mut self` ensures no new shards can be
    /// installed while the iterator is live.
    pub(crate) fn iter_mut(&mut self) -> IterMut<'_, T, C> {
        test_println!("Array::iter_mut");
        let max = self.max.load(Acquire);
        test_println!("-> highest index={}", max);
        IterMut(self.shards[0..=max].iter_mut())
    }
}
332
impl<T, C: cfg::Config> Drop for Array<T, C> {
    fn drop(&mut self) {
        // XXX(eliza): this could be `with_mut` if we wanted to impl a wrapper for std atomics to change `get_mut` to `with_mut`...
        let max = self.max.load(Acquire);
        // Only slots up to `max` can ever have been populated; anything past
        // it is still null, so we need not visit it.
        for shard in &self.shards[0..=max] {
            // XXX(eliza): this could be `with_mut` if we wanted to impl a wrapper for std atomics to change `get_mut` to `with_mut`...
            let ptr = shard.0.load(Acquire);
            // A null slot means the corresponding thread ID was handed out
            // but its shard was never allocated.
            if ptr.is_null() {
                continue;
            }
            let shard = unsafe {
                // Safety: this is the only place where these boxes are
                // deallocated, and we have exclusive access to the shard array,
                // because...we are dropping it...
                Box::from_raw(ptr)
            };
            drop(shard)
        }
    }
}
353
354impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Array<T, C> {
355    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
356        let max = self.max.load(Acquire);
357        let mut set = f.debug_map();
358        for shard in &self.shards[0..=max] {
359            let ptr = shard.0.load(Acquire);
360            if let Some(shard) = ptr::NonNull::new(ptr) {
361                set.entry(&format_args!("{:p}", ptr), unsafe { shard.as_ref() });
362            } else {
363                set.entry(&format_args!("{:p}", ptr), &());
364            }
365        }
366        set.finish()
367    }
368}
369
370// === impl Ptr ===
371
impl<T, C: cfg::Config> Ptr<T, C> {
    /// Returns a new, unallocated (null) shard pointer.
    #[inline]
    fn null() -> Self {
        Self(AtomicPtr::new(ptr::null_mut()))
    }

    /// Loads the pointer with the given `order`, returning a reference to
    /// the shard if one has been allocated, or `None` if the slot is null.
    #[inline]
    fn load(&self, order: Ordering) -> Option<&Shard<T, C>> {
        let ptr = self.0.load(order);
        test_println!("---> loaded={:p} (order={:?})", ptr, order);
        if ptr.is_null() {
            test_println!("---> null");
            return None;
        }
        let track = unsafe {
            // Safety: The returned reference will have the same lifetime as the
            // reference to the shard pointer, which (morally, if not actually)
            // owns the shard. The shard is only deallocated when the shard
            // array is dropped, and it won't be dropped while this pointer is
            // borrowed --- and the returned reference has the same lifetime.
            //
            // We know that the pointer is not null, because we just
            // null-checked it immediately prior.
            &*ptr
        };

        Some(track.get_ref())
    }

    /// Installs `new` into this slot, which must currently be null.
    ///
    /// Only the thread owning the slot's index may call this, and only once
    /// per slot; a failed compare-exchange therefore indicates a bug.
    #[inline]
    fn set(&self, new: *mut alloc::Track<Shard<T, C>>) {
        self.0
            .compare_exchange(ptr::null_mut(), new, AcqRel, Acquire)
            .expect("a shard can only be inserted by the thread that owns it, this is a bug!");
    }
}
408
409// === Iterators ===
410
impl<'a, T, C> Iterator for IterMut<'a, T, C>
where
    T: 'a,
    C: cfg::Config + 'a,
{
    type Item = &'a Shard<T, C>;
    /// Yields the next allocated shard, skipping null slots; ends when the
    /// underlying slice iterator is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        test_println!("IterMut::next");
        loop {
            // Skip over empty indices if they are less than the highest
            // allocated shard. Some threads may have accessed the slab
            // (generating a thread ID) but never actually inserted data, so
            // they may have never allocated a shard.
            let next = self.0.next();
            test_println!("-> next.is_some={}", next.is_some());
            // `next?` ends iteration; a `Some` holding a null slot loops again.
            if let Some(shard) = next?.load(Acquire) {
                test_println!("-> done");
                return Some(shard);
            }
        }
    }
}