// crossbeam_queue/seg_queue.rs

use alloc::alloc::{alloc_zeroed, handle_alloc_error, Layout};
use alloc::boxed::Box;
use core::cell::UnsafeCell;
use core::fmt;
use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::panic::{RefUnwindSafe, UnwindSafe};
use core::ptr;
use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};

use crossbeam_utils::{Backoff, CachePadded};

// Bits indicating the state of a slot:
// * If a value has been written into the slot, `WRITE` is set.
// * If a value has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;
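//
// A slot's state thus moves from 0 (empty) to `WRITE` (value present) to
// `WRITE | READ` (value consumed); `DESTROY` may additionally be set when a
// block is being torn down while some thread still occupies one of its slots.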

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of values a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Indicates that the block is not the last one.
const HAS_NEXT: usize = 1;
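//
// In other words, an index packs a position together with metadata: the low
// `SHIFT` bits carry the `HAS_NEXT` flag (set in the head index once head and
// tail live in different blocks), `index >> SHIFT` is the position, and
// `(index >> SHIFT) % LAP` is the offset within the current block. The final
// offset of each lap (`BLOCK_CAP`) never stores a value; reaching it means the
// next block must be installed first.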

/// A slot in a block.
struct Slot<T> {
    /// The value.
    value: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    /// Waits until a value is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    const LAYOUT: Layout = {
        let layout = Layout::new::<Self>();
        assert!(
            layout.size() != 0,
            "Block should never be zero-sized, as it has an AtomicPtr field"
        );
        layout
    };

    /// Creates an empty block.
    fn new() -> Box<Self> {
        // SAFETY: layout is not zero-sized
        let ptr = unsafe { alloc_zeroed(Self::LAYOUT) };
        // Handle allocation failure
        if ptr.is_null() {
            handle_alloc_error(Self::LAYOUT)
        }
        // SAFETY: This is safe because:
        //  [1] `Block::next` (AtomicPtr) may be safely zero initialized.
        //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
        //  [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it
        //       holds a MaybeUninit.
        //  [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
        // TODO: unsafe { Box::new_zeroed().assume_init() }
        unsafe { Box::from_raw(ptr.cast()) }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

/// A position in a queue.
struct Position<T> {
    /// The index in the queue.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// An unbounded multi-producer multi-consumer queue.
///
/// This queue is implemented as a linked list of segments, where each segment is a small buffer
/// that can hold a handful of elements. There is no limit to how many elements can be in the queue
/// at a time. However, since segments need to be dynamically allocated as elements get pushed,
/// this queue is somewhat slower than [`ArrayQueue`].
///
/// [`ArrayQueue`]: super::ArrayQueue
///
/// # Examples
///
/// ```
/// use crossbeam_queue::SegQueue;
///
/// let q = SegQueue::new();
///
/// q.push('a');
/// q.push('b');
///
/// assert_eq!(q.pop(), Some('a'));
/// assert_eq!(q.pop(), Some('b'));
/// assert!(q.pop().is_none());
/// ```
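///
/// The queue can also be shared between threads, with pushes and pops running
/// concurrently:
///
/// ```
/// use crossbeam_queue::SegQueue;
///
/// let q = SegQueue::new();
///
/// std::thread::scope(|s| {
///     s.spawn(|| q.push('a'));
///     s.spawn(|| q.push('b'));
/// });
///
/// assert_eq!(q.len(), 2);
/// ```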
pub struct SegQueue<T> {
    /// The head of the queue.
    head: CachePadded<Position<T>>,

    /// The tail of the queue.
    tail: CachePadded<Position<T>>,

    /// Indicates that dropping a `SegQueue<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}

unsafe impl<T: Send> Send for SegQueue<T> {}
unsafe impl<T: Send> Sync for SegQueue<T> {}

impl<T> UnwindSafe for SegQueue<T> {}
impl<T> RefUnwindSafe for SegQueue<T> {}

impl<T> SegQueue<T> {
    /// Creates a new unbounded queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::<i32>::new();
    /// ```
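    ///
    /// Because `new` is a `const fn`, a queue can also be initialized in a
    /// `static`:
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// static Q: SegQueue<i32> = SegQueue::new();
    ///
    /// Q.push(1);
    /// assert_eq!(Q.pop(), Some(1));
    /// ```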
    pub const fn new() -> SegQueue<T> {
        SegQueue {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            _marker: PhantomData,
        }
    }

    /// Pushes an element to the back of the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// q.push(20);
    /// ```
    pub fn push(&self, value: T) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Block::<T>::new());
            }

            // If this is the first push operation, we need to allocate the first block.
            if block.is_null() {
                let new = Box::into_raw(Block::<T>::new());

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        let next_index = new_tail.wrapping_add(1 << SHIFT);

                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.store(next_index, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the value into the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.value.get().write(MaybeUninit::new(value));
                    slot.state.fetch_or(WRITE, Ordering::Release);

                    return;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Pops an element from the front of the queue.
    ///
    /// If the queue is empty, `None` is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// q.push(20);
    /// assert_eq!(q.pop(), Some(10));
    /// assert_eq!(q.pop(), Some(20));
    /// assert!(q.pop().is_none());
    /// ```
    pub fn pop(&self) -> Option<T> {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & HAS_NEXT == 0 {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if head >> SHIFT == tail >> SHIFT {
                    return None;
                }

                // If head and tail are not in the same block, set `HAS_NEXT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= HAS_NEXT;
                }
            }

            // The block can be null here only if the first push operation is in progress. In that
            // case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= HAS_NEXT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    // Read the value.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let value = slot.value.get().read().assume_init();

                    // Destroy the block if we've reached the end, or if another thread wanted to
                    // destroy but couldn't because we were busy reading from the slot.
                    if offset + 1 == BLOCK_CAP {
                        Block::destroy(block, 0);
                    } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset + 1);
                    }

                    return Some(value);
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Returns `true` if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// assert!(q.is_empty());
    /// q.push(1);
    /// assert!(!q.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns the number of elements in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    /// assert_eq!(q.len(), 0);
    ///
    /// q.push(10);
    /// assert_eq!(q.len(), 1);
    ///
    /// q.push(20);
    /// assert_eq!(q.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
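                // For example, with `LAP == 32`: head = 0 and tail = 33 span
                // positions 0..33, one of which (offset 31, the block end) never
                // stores a value, so the queue holds 33 - 0 - 33 / 32 = 32
                // elements.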
                return tail - head - tail / LAP;
            }
        }
    }
}

impl<T> Drop for SegQueue<T> {
    fn drop(&mut self) {
        let mut head = *self.head.index.get_mut();
        let mut tail = *self.tail.index.get_mut();
        let mut block = *self.head.block.get_mut();

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the value in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    (*slot.value.get()).assume_init_drop();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}

impl<T> fmt::Debug for SegQueue<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("SegQueue { .. }")
    }
}

impl<T> Default for SegQueue<T> {
    fn default() -> SegQueue<T> {
        SegQueue::new()
    }
}

impl<T> IntoIterator for SegQueue<T> {
    type Item = T;

    type IntoIter = IntoIter<T>;

    fn into_iter(self) -> Self::IntoIter {
        IntoIter { value: self }
    }
}
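
/// An owning iterator over the elements of a `SegQueue<T>`, returned by its
/// `IntoIterator` implementation.
///
/// # Examples
///
/// ```
/// use crossbeam_queue::SegQueue;
///
/// let q = SegQueue::new();
/// q.push(1);
/// q.push(2);
///
/// let mut iter = q.into_iter();
/// assert_eq!(iter.next(), Some(1));
/// assert_eq!(iter.next(), Some(2));
/// assert!(iter.next().is_none());
/// ```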
#[derive(Debug)]
pub struct IntoIter<T> {
    value: SegQueue<T>,
}

impl<T> Iterator for IntoIter<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        let value = &mut self.value;
        let head = *value.head.index.get_mut();
        let tail = *value.tail.index.get_mut();
        if head >> SHIFT == tail >> SHIFT {
            None
        } else {
            let block = *value.head.block.get_mut();
            let offset = (head >> SHIFT) % LAP;

            // SAFETY: We have mutable access to this, so we can read without
            // worrying about concurrency. Furthermore, we know this is
            // initialized because it is the value pointed at by `value.head`
            // and this is a non-empty queue.
            let item = unsafe {
                let slot = (*block).slots.get_unchecked(offset);
                slot.value.get().read().assume_init()
            };
            if offset + 1 == BLOCK_CAP {
                // Deallocate the block and move to the next one.
                // SAFETY: The block is initialized because we've been reading
                // from it this entire time. We can drop it because everything
                // has been read out of it, so nothing points to it anymore.
                unsafe {
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    *value.head.block.get_mut() = next;
                }
                // The last slot of each block never holds a value, so skip past it too.
                *value.head.index.get_mut() = head.wrapping_add(2 << SHIFT);
                // Double-check that we're pointing to the first item in a block.
                debug_assert_eq!((*value.head.index.get_mut() >> SHIFT) % LAP, 0);
            } else {
                *value.head.index.get_mut() = head.wrapping_add(1 << SHIFT);
            }
            Some(item)
        }
    }
}