crossbeam_queue/
array_queue.rs

1//! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
2//!
3//! Source:
4//!   - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue>
5
6use alloc::boxed::Box;
7use core::cell::UnsafeCell;
8use core::fmt;
9use core::mem::{self, MaybeUninit};
10use core::panic::{RefUnwindSafe, UnwindSafe};
11use core::sync::atomic::{self, AtomicUsize, Ordering};
12
13use crossbeam_utils::{Backoff, CachePadded};
14
/// A slot in a queue.
///
/// Pairs the storage for one element with the stamp that pushers and poppers inspect to
/// decide whether the slot may currently be written to or read from.
struct Slot<T> {
    /// The current stamp.
    ///
    /// If the stamp equals the tail, this slot will be written to next. If it equals
    /// head + 1, this slot will be read from next.
    stamp: AtomicUsize,

    /// The value in this slot.
    ///
    /// Wrapped in `MaybeUninit` because a slot starts out uninitialized (see
    /// `ArrayQueue::new`) and only holds a live value after a push has written it and
    /// before that value has been read out again.
    value: UnsafeCell<MaybeUninit<T>>,
}
26
/// A bounded multi-producer multi-consumer queue.
///
/// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed
/// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an
/// element into a full queue will fail. Alternatively, [`force_push`] makes it possible for
/// this queue to be used as a ring-buffer. Having a buffer allocated upfront makes this queue
/// a bit faster than [`SegQueue`].
///
/// [`force_push`]: ArrayQueue::force_push
/// [`SegQueue`]: super::SegQueue
///
/// # Examples
///
/// ```
/// use crossbeam_queue::ArrayQueue;
///
/// let q = ArrayQueue::new(2);
///
/// assert_eq!(q.push('a'), Ok(()));
/// assert_eq!(q.push('b'), Ok(()));
/// assert_eq!(q.push('c'), Err('c'));
/// assert_eq!(q.pop(), Some('a'));
/// ```
pub struct ArrayQueue<T> {
    /// The head of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
    /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
    ///
    /// Elements are popped from the head of the queue.
    head: CachePadded<AtomicUsize>,

    /// The tail of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
    /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
    ///
    /// Elements are pushed into the tail of the queue.
    tail: CachePadded<AtomicUsize>,

    /// The buffer holding slots.
    ///
    /// Allocated once in `new` with exactly `cap` slots.
    buffer: Box<[Slot<T>]>,

    /// The queue capacity, i.e. the number of slots in `buffer`.
    cap: usize,

    /// A stamp with the value of `{ lap: 1, index: 0 }`.
    ///
    /// Computed in `new` as the smallest power of two greater than `cap`, so that
    /// `one_lap - 1` is a bit mask selecting the index part of a stamp.
    one_lap: usize,
}
76
// SAFETY: elements are only ever moved into and out of the queue by value (`push`,
// `force_push`, `pop`); none of the methods in this file hand out a reference to a stored
// `T`, so `T: Send` is the only bound needed to share or send the queue across threads.
// NOTE(review): soundness additionally relies on the stamp protocol in `push_or_else` and
// `pop` giving each slot at most one writer or reader at a time.
unsafe impl<T: Send> Sync for ArrayQueue<T> {}
unsafe impl<T: Send> Send for ArrayQueue<T> {}
79
// Explicitly mark the queue as unwind-safe for every `T`. The slots store their values in
// `UnsafeCell`, which is not `RefUnwindSafe`, so these auto traits would not be implemented
// without the manual impls below.
impl<T> UnwindSafe for ArrayQueue<T> {}
impl<T> RefUnwindSafe for ArrayQueue<T> {}
82
83impl<T> ArrayQueue<T> {
84    /// Creates a new bounded queue with the given capacity.
85    ///
86    /// # Panics
87    ///
88    /// Panics if the capacity is zero.
89    ///
90    /// # Examples
91    ///
92    /// ```
93    /// use crossbeam_queue::ArrayQueue;
94    ///
95    /// let q = ArrayQueue::<i32>::new(100);
96    /// ```
97    pub fn new(cap: usize) -> ArrayQueue<T> {
98        assert!(cap > 0, "capacity must be non-zero");
99
100        // Head is initialized to `{ lap: 0, index: 0 }`.
101        // Tail is initialized to `{ lap: 0, index: 0 }`.
102        let head = 0;
103        let tail = 0;
104
105        // Allocate a buffer of `cap` slots initialized
106        // with stamps.
107        let buffer: Box<[Slot<T>]> = (0..cap)
108            .map(|i| {
109                // Set the stamp to `{ lap: 0, index: i }`.
110                Slot {
111                    stamp: AtomicUsize::new(i),
112                    value: UnsafeCell::new(MaybeUninit::uninit()),
113                }
114            })
115            .collect();
116
117        // One lap is the smallest power of two greater than `cap`.
118        let one_lap = (cap + 1).next_power_of_two();
119
120        ArrayQueue {
121            buffer,
122            cap,
123            one_lap,
124            head: CachePadded::new(AtomicUsize::new(head)),
125            tail: CachePadded::new(AtomicUsize::new(tail)),
126        }
127    }
128
129    fn push_or_else<F>(&self, mut value: T, f: F) -> Result<(), T>
130    where
131        F: Fn(T, usize, usize, &Slot<T>) -> Result<T, T>,
132    {
133        let backoff = Backoff::new();
134        let mut tail = self.tail.load(Ordering::Relaxed);
135
136        loop {
137            // Deconstruct the tail.
138            let index = tail & (self.one_lap - 1);
139            let lap = tail & !(self.one_lap - 1);
140
141            let new_tail = if index + 1 < self.cap {
142                // Same lap, incremented index.
143                // Set to `{ lap: lap, index: index + 1 }`.
144                tail + 1
145            } else {
146                // One lap forward, index wraps around to zero.
147                // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
148                lap.wrapping_add(self.one_lap)
149            };
150
151            // Inspect the corresponding slot.
152            debug_assert!(index < self.buffer.len());
153            let slot = unsafe { self.buffer.get_unchecked(index) };
154            let stamp = slot.stamp.load(Ordering::Acquire);
155
156            // If the tail and the stamp match, we may attempt to push.
157            if tail == stamp {
158                // Try moving the tail.
159                match self.tail.compare_exchange_weak(
160                    tail,
161                    new_tail,
162                    Ordering::SeqCst,
163                    Ordering::Relaxed,
164                ) {
165                    Ok(_) => {
166                        // Write the value into the slot and update the stamp.
167                        unsafe {
168                            slot.value.get().write(MaybeUninit::new(value));
169                        }
170                        slot.stamp.store(tail + 1, Ordering::Release);
171                        return Ok(());
172                    }
173                    Err(t) => {
174                        tail = t;
175                        backoff.spin();
176                    }
177                }
178            } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
179                atomic::fence(Ordering::SeqCst);
180                value = f(value, tail, new_tail, slot)?;
181                backoff.spin();
182                tail = self.tail.load(Ordering::Relaxed);
183            } else {
184                // Snooze because we need to wait for the stamp to get updated.
185                backoff.snooze();
186                tail = self.tail.load(Ordering::Relaxed);
187            }
188        }
189    }
190
191    /// Attempts to push an element into the queue.
192    ///
193    /// If the queue is full, the element is returned back as an error.
194    ///
195    /// # Examples
196    ///
197    /// ```
198    /// use crossbeam_queue::ArrayQueue;
199    ///
200    /// let q = ArrayQueue::new(1);
201    ///
202    /// assert_eq!(q.push(10), Ok(()));
203    /// assert_eq!(q.push(20), Err(20));
204    /// ```
205    pub fn push(&self, value: T) -> Result<(), T> {
206        self.push_or_else(value, |v, tail, _, _| {
207            let head = self.head.load(Ordering::Relaxed);
208
209            // If the head lags one lap behind the tail as well...
210            if head.wrapping_add(self.one_lap) == tail {
211                // ...then the queue is full.
212                Err(v)
213            } else {
214                Ok(v)
215            }
216        })
217    }
218
219    /// Pushes an element into the queue, replacing the oldest element if necessary.
220    ///
221    /// If the queue is full, the oldest element is replaced and returned,
222    /// otherwise `None` is returned.
223    ///
224    /// # Examples
225    ///
226    /// ```
227    /// use crossbeam_queue::ArrayQueue;
228    ///
229    /// let q = ArrayQueue::new(2);
230    ///
231    /// assert_eq!(q.force_push(10), None);
232    /// assert_eq!(q.force_push(20), None);
233    /// assert_eq!(q.force_push(30), Some(10));
234    /// assert_eq!(q.pop(), Some(20));
235    /// ```
236    pub fn force_push(&self, value: T) -> Option<T> {
237        self.push_or_else(value, |v, tail, new_tail, slot| {
238            let head = tail.wrapping_sub(self.one_lap);
239            let new_head = new_tail.wrapping_sub(self.one_lap);
240
241            // Try moving the head.
242            if self
243                .head
244                .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Relaxed)
245                .is_ok()
246            {
247                // Move the tail.
248                self.tail.store(new_tail, Ordering::SeqCst);
249
250                // Swap the previous value.
251                let old = unsafe { slot.value.get().replace(MaybeUninit::new(v)).assume_init() };
252
253                // Update the stamp.
254                slot.stamp.store(tail + 1, Ordering::Release);
255
256                Err(old)
257            } else {
258                Ok(v)
259            }
260        })
261        .err()
262    }
263
264    /// Attempts to pop an element from the queue.
265    ///
266    /// If the queue is empty, `None` is returned.
267    ///
268    /// # Examples
269    ///
270    /// ```
271    /// use crossbeam_queue::ArrayQueue;
272    ///
273    /// let q = ArrayQueue::new(1);
274    /// assert_eq!(q.push(10), Ok(()));
275    ///
276    /// assert_eq!(q.pop(), Some(10));
277    /// assert!(q.pop().is_none());
278    /// ```
279    pub fn pop(&self) -> Option<T> {
280        let backoff = Backoff::new();
281        let mut head = self.head.load(Ordering::Relaxed);
282
283        loop {
284            // Deconstruct the head.
285            let index = head & (self.one_lap - 1);
286            let lap = head & !(self.one_lap - 1);
287
288            // Inspect the corresponding slot.
289            debug_assert!(index < self.buffer.len());
290            let slot = unsafe { self.buffer.get_unchecked(index) };
291            let stamp = slot.stamp.load(Ordering::Acquire);
292
293            // If the stamp is ahead of the head by 1, we may attempt to pop.
294            if head + 1 == stamp {
295                let new = if index + 1 < self.cap {
296                    // Same lap, incremented index.
297                    // Set to `{ lap: lap, index: index + 1 }`.
298                    head + 1
299                } else {
300                    // One lap forward, index wraps around to zero.
301                    // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
302                    lap.wrapping_add(self.one_lap)
303                };
304
305                // Try moving the head.
306                match self.head.compare_exchange_weak(
307                    head,
308                    new,
309                    Ordering::SeqCst,
310                    Ordering::Relaxed,
311                ) {
312                    Ok(_) => {
313                        // Read the value from the slot and update the stamp.
314                        let msg = unsafe { slot.value.get().read().assume_init() };
315                        slot.stamp
316                            .store(head.wrapping_add(self.one_lap), Ordering::Release);
317                        return Some(msg);
318                    }
319                    Err(h) => {
320                        head = h;
321                        backoff.spin();
322                    }
323                }
324            } else if stamp == head {
325                atomic::fence(Ordering::SeqCst);
326                let tail = self.tail.load(Ordering::Relaxed);
327
328                // If the tail equals the head, that means the channel is empty.
329                if tail == head {
330                    return None;
331                }
332
333                backoff.spin();
334                head = self.head.load(Ordering::Relaxed);
335            } else {
336                // Snooze because we need to wait for the stamp to get updated.
337                backoff.snooze();
338                head = self.head.load(Ordering::Relaxed);
339            }
340        }
341    }
342
343    /// Returns the capacity of the queue.
344    ///
345    /// # Examples
346    ///
347    /// ```
348    /// use crossbeam_queue::ArrayQueue;
349    ///
350    /// let q = ArrayQueue::<i32>::new(100);
351    ///
352    /// assert_eq!(q.capacity(), 100);
353    /// ```
354    pub fn capacity(&self) -> usize {
355        self.cap
356    }
357
358    /// Returns `true` if the queue is empty.
359    ///
360    /// # Examples
361    ///
362    /// ```
363    /// use crossbeam_queue::ArrayQueue;
364    ///
365    /// let q = ArrayQueue::new(100);
366    ///
367    /// assert!(q.is_empty());
368    /// q.push(1).unwrap();
369    /// assert!(!q.is_empty());
370    /// ```
371    pub fn is_empty(&self) -> bool {
372        let head = self.head.load(Ordering::SeqCst);
373        let tail = self.tail.load(Ordering::SeqCst);
374
375        // Is the tail lagging one lap behind head?
376        // Is the tail equal to the head?
377        //
378        // Note: If the head changes just before we load the tail, that means there was a moment
379        // when the channel was not empty, so it is safe to just return `false`.
380        tail == head
381    }
382
383    /// Returns `true` if the queue is full.
384    ///
385    /// # Examples
386    ///
387    /// ```
388    /// use crossbeam_queue::ArrayQueue;
389    ///
390    /// let q = ArrayQueue::new(1);
391    ///
392    /// assert!(!q.is_full());
393    /// q.push(1).unwrap();
394    /// assert!(q.is_full());
395    /// ```
396    pub fn is_full(&self) -> bool {
397        let tail = self.tail.load(Ordering::SeqCst);
398        let head = self.head.load(Ordering::SeqCst);
399
400        // Is the head lagging one lap behind tail?
401        //
402        // Note: If the tail changes just before we load the head, that means there was a moment
403        // when the queue was not full, so it is safe to just return `false`.
404        head.wrapping_add(self.one_lap) == tail
405    }
406
407    /// Returns the number of elements in the queue.
408    ///
409    /// # Examples
410    ///
411    /// ```
412    /// use crossbeam_queue::ArrayQueue;
413    ///
414    /// let q = ArrayQueue::new(100);
415    /// assert_eq!(q.len(), 0);
416    ///
417    /// q.push(10).unwrap();
418    /// assert_eq!(q.len(), 1);
419    ///
420    /// q.push(20).unwrap();
421    /// assert_eq!(q.len(), 2);
422    /// ```
423    pub fn len(&self) -> usize {
424        loop {
425            // Load the tail, then load the head.
426            let tail = self.tail.load(Ordering::SeqCst);
427            let head = self.head.load(Ordering::SeqCst);
428
429            // If the tail didn't change, we've got consistent values to work with.
430            if self.tail.load(Ordering::SeqCst) == tail {
431                let hix = head & (self.one_lap - 1);
432                let tix = tail & (self.one_lap - 1);
433
434                return if hix < tix {
435                    tix - hix
436                } else if hix > tix {
437                    self.cap - hix + tix
438                } else if tail == head {
439                    0
440                } else {
441                    self.cap
442                };
443            }
444        }
445    }
446}
447
448impl<T> Drop for ArrayQueue<T> {
449    fn drop(&mut self) {
450        if mem::needs_drop::<T>() {
451            // Get the index of the head.
452            let head = *self.head.get_mut();
453            let tail = *self.tail.get_mut();
454
455            let hix = head & (self.one_lap - 1);
456            let tix = tail & (self.one_lap - 1);
457
458            let len = if hix < tix {
459                tix - hix
460            } else if hix > tix {
461                self.cap - hix + tix
462            } else if tail == head {
463                0
464            } else {
465                self.cap
466            };
467
468            // Loop over all slots that hold a message and drop them.
469            for i in 0..len {
470                // Compute the index of the next slot holding a message.
471                let index = if hix + i < self.cap {
472                    hix + i
473                } else {
474                    hix + i - self.cap
475                };
476
477                unsafe {
478                    debug_assert!(index < self.buffer.len());
479                    let slot = self.buffer.get_unchecked_mut(index);
480                    (*slot.value.get()).assume_init_drop();
481                }
482            }
483        }
484    }
485}
486
487impl<T> fmt::Debug for ArrayQueue<T> {
488    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
489        f.pad("ArrayQueue { .. }")
490    }
491}
492
493impl<T> IntoIterator for ArrayQueue<T> {
494    type Item = T;
495
496    type IntoIter = IntoIter<T>;
497
498    fn into_iter(self) -> Self::IntoIter {
499        IntoIter { value: self }
500    }
501}
502
/// An owning iterator over the elements of an [`ArrayQueue`].
///
/// Created by the `IntoIterator` implementation for `ArrayQueue`; elements are yielded by
/// value, starting at the head of the queue.
#[derive(Debug)]
pub struct IntoIter<T> {
    /// The queue being drained; `next` advances its head as elements are taken out.
    value: ArrayQueue<T>,
}
507
508impl<T> Iterator for IntoIter<T> {
509    type Item = T;
510
511    fn next(&mut self) -> Option<Self::Item> {
512        let value = &mut self.value;
513        let head = *value.head.get_mut();
514        if value.head.get_mut() != value.tail.get_mut() {
515            let index = head & (value.one_lap - 1);
516            let lap = head & !(value.one_lap - 1);
517            // SAFETY: We have mutable access to this, so we can read without
518            // worrying about concurrency. Furthermore, we know this is
519            // initialized because it is the value pointed at by `value.head`
520            // and this is a non-empty queue.
521            let val = unsafe {
522                debug_assert!(index < value.buffer.len());
523                let slot = value.buffer.get_unchecked_mut(index);
524                slot.value.get().read().assume_init()
525            };
526            let new = if index + 1 < value.cap {
527                // Same lap, incremented index.
528                // Set to `{ lap: lap, index: index + 1 }`.
529                head + 1
530            } else {
531                // One lap forward, index wraps around to zero.
532                // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
533                lap.wrapping_add(value.one_lap)
534            };
535            *value.head.get_mut() = new;
536            Option::Some(val)
537        } else {
538            Option::None
539        }
540    }
541}