crossbeam_queue/array_queue.rs
1//! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
2//!
3//! Source:
4//! - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue>
5
6use alloc::boxed::Box;
7use core::cell::UnsafeCell;
8use core::fmt;
9use core::mem::{self, MaybeUninit};
10use core::panic::{RefUnwindSafe, UnwindSafe};
11use core::sync::atomic::{self, AtomicUsize, Ordering};
12
13use crossbeam_utils::{Backoff, CachePadded};
14
/// One cell of the ring buffer that backs the queue.
struct Slot<T> {
    /// Sequence stamp saying which operation may touch this slot next.
    ///
    /// When the stamp equals the queue's tail, this slot is the next one to be written.
    /// When it equals `head + 1`, this slot is the next one to be read.
    stamp: AtomicUsize,

    /// Storage for the element kept in this slot (possibly uninitialized).
    value: UnsafeCell<MaybeUninit<T>>,
}
26
27/// A bounded multi-producer multi-consumer queue.
28///
29/// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed
30/// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an
31/// element into a full queue will fail. Alternatively, [`force_push`] makes it possible for
32/// this queue to be used as a ring-buffer. Having a buffer allocated upfront makes this queue
33/// a bit faster than [`SegQueue`].
34///
35/// [`force_push`]: ArrayQueue::force_push
36/// [`SegQueue`]: super::SegQueue
37///
38/// # Examples
39///
40/// ```
41/// use crossbeam_queue::ArrayQueue;
42///
43/// let q = ArrayQueue::new(2);
44///
45/// assert_eq!(q.push('a'), Ok(()));
46/// assert_eq!(q.push('b'), Ok(()));
47/// assert_eq!(q.push('c'), Err('c'));
48/// assert_eq!(q.pop(), Some('a'));
49/// ```
50pub struct ArrayQueue<T> {
51 /// The head of the queue.
52 ///
53 /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
54 /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
55 ///
56 /// Elements are popped from the head of the queue.
57 head: CachePadded<AtomicUsize>,
58
59 /// The tail of the queue.
60 ///
61 /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
62 /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
63 ///
64 /// Elements are pushed into the tail of the queue.
65 tail: CachePadded<AtomicUsize>,
66
67 /// The buffer holding slots.
68 buffer: Box<[Slot<T>]>,
69
70 /// The queue capacity.
71 cap: usize,
72
73 /// A stamp with the value of `{ lap: 1, index: 0 }`.
74 one_lap: usize,
75}
76
77unsafe impl<T: Send> Sync for ArrayQueue<T> {}
78unsafe impl<T: Send> Send for ArrayQueue<T> {}
79
80impl<T> UnwindSafe for ArrayQueue<T> {}
81impl<T> RefUnwindSafe for ArrayQueue<T> {}
82
83impl<T> ArrayQueue<T> {
84 /// Creates a new bounded queue with the given capacity.
85 ///
86 /// # Panics
87 ///
88 /// Panics if the capacity is zero.
89 ///
90 /// # Examples
91 ///
92 /// ```
93 /// use crossbeam_queue::ArrayQueue;
94 ///
95 /// let q = ArrayQueue::<i32>::new(100);
96 /// ```
97 pub fn new(cap: usize) -> ArrayQueue<T> {
98 assert!(cap > 0, "capacity must be non-zero");
99
100 // Head is initialized to `{ lap: 0, index: 0 }`.
101 // Tail is initialized to `{ lap: 0, index: 0 }`.
102 let head = 0;
103 let tail = 0;
104
105 // Allocate a buffer of `cap` slots initialized
106 // with stamps.
107 let buffer: Box<[Slot<T>]> = (0..cap)
108 .map(|i| {
109 // Set the stamp to `{ lap: 0, index: i }`.
110 Slot {
111 stamp: AtomicUsize::new(i),
112 value: UnsafeCell::new(MaybeUninit::uninit()),
113 }
114 })
115 .collect();
116
117 // One lap is the smallest power of two greater than `cap`.
118 let one_lap = (cap + 1).next_power_of_two();
119
120 ArrayQueue {
121 buffer,
122 cap,
123 one_lap,
124 head: CachePadded::new(AtomicUsize::new(head)),
125 tail: CachePadded::new(AtomicUsize::new(tail)),
126 }
127 }
128
129 fn push_or_else<F>(&self, mut value: T, f: F) -> Result<(), T>
130 where
131 F: Fn(T, usize, usize, &Slot<T>) -> Result<T, T>,
132 {
133 let backoff = Backoff::new();
134 let mut tail = self.tail.load(Ordering::Relaxed);
135
136 loop {
137 // Deconstruct the tail.
138 let index = tail & (self.one_lap - 1);
139 let lap = tail & !(self.one_lap - 1);
140
141 let new_tail = if index + 1 < self.cap {
142 // Same lap, incremented index.
143 // Set to `{ lap: lap, index: index + 1 }`.
144 tail + 1
145 } else {
146 // One lap forward, index wraps around to zero.
147 // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
148 lap.wrapping_add(self.one_lap)
149 };
150
151 // Inspect the corresponding slot.
152 debug_assert!(index < self.buffer.len());
153 let slot = unsafe { self.buffer.get_unchecked(index) };
154 let stamp = slot.stamp.load(Ordering::Acquire);
155
156 // If the tail and the stamp match, we may attempt to push.
157 if tail == stamp {
158 // Try moving the tail.
159 match self.tail.compare_exchange_weak(
160 tail,
161 new_tail,
162 Ordering::SeqCst,
163 Ordering::Relaxed,
164 ) {
165 Ok(_) => {
166 // Write the value into the slot and update the stamp.
167 unsafe {
168 slot.value.get().write(MaybeUninit::new(value));
169 }
170 slot.stamp.store(tail + 1, Ordering::Release);
171 return Ok(());
172 }
173 Err(t) => {
174 tail = t;
175 backoff.spin();
176 }
177 }
178 } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
179 atomic::fence(Ordering::SeqCst);
180 value = f(value, tail, new_tail, slot)?;
181 backoff.spin();
182 tail = self.tail.load(Ordering::Relaxed);
183 } else {
184 // Snooze because we need to wait for the stamp to get updated.
185 backoff.snooze();
186 tail = self.tail.load(Ordering::Relaxed);
187 }
188 }
189 }
190
191 /// Attempts to push an element into the queue.
192 ///
193 /// If the queue is full, the element is returned back as an error.
194 ///
195 /// # Examples
196 ///
197 /// ```
198 /// use crossbeam_queue::ArrayQueue;
199 ///
200 /// let q = ArrayQueue::new(1);
201 ///
202 /// assert_eq!(q.push(10), Ok(()));
203 /// assert_eq!(q.push(20), Err(20));
204 /// ```
205 pub fn push(&self, value: T) -> Result<(), T> {
206 self.push_or_else(value, |v, tail, _, _| {
207 let head = self.head.load(Ordering::Relaxed);
208
209 // If the head lags one lap behind the tail as well...
210 if head.wrapping_add(self.one_lap) == tail {
211 // ...then the queue is full.
212 Err(v)
213 } else {
214 Ok(v)
215 }
216 })
217 }
218
219 /// Pushes an element into the queue, replacing the oldest element if necessary.
220 ///
221 /// If the queue is full, the oldest element is replaced and returned,
222 /// otherwise `None` is returned.
223 ///
224 /// # Examples
225 ///
226 /// ```
227 /// use crossbeam_queue::ArrayQueue;
228 ///
229 /// let q = ArrayQueue::new(2);
230 ///
231 /// assert_eq!(q.force_push(10), None);
232 /// assert_eq!(q.force_push(20), None);
233 /// assert_eq!(q.force_push(30), Some(10));
234 /// assert_eq!(q.pop(), Some(20));
235 /// ```
236 pub fn force_push(&self, value: T) -> Option<T> {
237 self.push_or_else(value, |v, tail, new_tail, slot| {
238 let head = tail.wrapping_sub(self.one_lap);
239 let new_head = new_tail.wrapping_sub(self.one_lap);
240
241 // Try moving the head.
242 if self
243 .head
244 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Relaxed)
245 .is_ok()
246 {
247 // Move the tail.
248 self.tail.store(new_tail, Ordering::SeqCst);
249
250 // Swap the previous value.
251 let old = unsafe { slot.value.get().replace(MaybeUninit::new(v)).assume_init() };
252
253 // Update the stamp.
254 slot.stamp.store(tail + 1, Ordering::Release);
255
256 Err(old)
257 } else {
258 Ok(v)
259 }
260 })
261 .err()
262 }
263
264 /// Attempts to pop an element from the queue.
265 ///
266 /// If the queue is empty, `None` is returned.
267 ///
268 /// # Examples
269 ///
270 /// ```
271 /// use crossbeam_queue::ArrayQueue;
272 ///
273 /// let q = ArrayQueue::new(1);
274 /// assert_eq!(q.push(10), Ok(()));
275 ///
276 /// assert_eq!(q.pop(), Some(10));
277 /// assert!(q.pop().is_none());
278 /// ```
279 pub fn pop(&self) -> Option<T> {
280 let backoff = Backoff::new();
281 let mut head = self.head.load(Ordering::Relaxed);
282
283 loop {
284 // Deconstruct the head.
285 let index = head & (self.one_lap - 1);
286 let lap = head & !(self.one_lap - 1);
287
288 // Inspect the corresponding slot.
289 debug_assert!(index < self.buffer.len());
290 let slot = unsafe { self.buffer.get_unchecked(index) };
291 let stamp = slot.stamp.load(Ordering::Acquire);
292
293 // If the stamp is ahead of the head by 1, we may attempt to pop.
294 if head + 1 == stamp {
295 let new = if index + 1 < self.cap {
296 // Same lap, incremented index.
297 // Set to `{ lap: lap, index: index + 1 }`.
298 head + 1
299 } else {
300 // One lap forward, index wraps around to zero.
301 // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
302 lap.wrapping_add(self.one_lap)
303 };
304
305 // Try moving the head.
306 match self.head.compare_exchange_weak(
307 head,
308 new,
309 Ordering::SeqCst,
310 Ordering::Relaxed,
311 ) {
312 Ok(_) => {
313 // Read the value from the slot and update the stamp.
314 let msg = unsafe { slot.value.get().read().assume_init() };
315 slot.stamp
316 .store(head.wrapping_add(self.one_lap), Ordering::Release);
317 return Some(msg);
318 }
319 Err(h) => {
320 head = h;
321 backoff.spin();
322 }
323 }
324 } else if stamp == head {
325 atomic::fence(Ordering::SeqCst);
326 let tail = self.tail.load(Ordering::Relaxed);
327
328 // If the tail equals the head, that means the channel is empty.
329 if tail == head {
330 return None;
331 }
332
333 backoff.spin();
334 head = self.head.load(Ordering::Relaxed);
335 } else {
336 // Snooze because we need to wait for the stamp to get updated.
337 backoff.snooze();
338 head = self.head.load(Ordering::Relaxed);
339 }
340 }
341 }
342
343 /// Returns the capacity of the queue.
344 ///
345 /// # Examples
346 ///
347 /// ```
348 /// use crossbeam_queue::ArrayQueue;
349 ///
350 /// let q = ArrayQueue::<i32>::new(100);
351 ///
352 /// assert_eq!(q.capacity(), 100);
353 /// ```
354 pub fn capacity(&self) -> usize {
355 self.cap
356 }
357
358 /// Returns `true` if the queue is empty.
359 ///
360 /// # Examples
361 ///
362 /// ```
363 /// use crossbeam_queue::ArrayQueue;
364 ///
365 /// let q = ArrayQueue::new(100);
366 ///
367 /// assert!(q.is_empty());
368 /// q.push(1).unwrap();
369 /// assert!(!q.is_empty());
370 /// ```
371 pub fn is_empty(&self) -> bool {
372 let head = self.head.load(Ordering::SeqCst);
373 let tail = self.tail.load(Ordering::SeqCst);
374
375 // Is the tail lagging one lap behind head?
376 // Is the tail equal to the head?
377 //
378 // Note: If the head changes just before we load the tail, that means there was a moment
379 // when the channel was not empty, so it is safe to just return `false`.
380 tail == head
381 }
382
383 /// Returns `true` if the queue is full.
384 ///
385 /// # Examples
386 ///
387 /// ```
388 /// use crossbeam_queue::ArrayQueue;
389 ///
390 /// let q = ArrayQueue::new(1);
391 ///
392 /// assert!(!q.is_full());
393 /// q.push(1).unwrap();
394 /// assert!(q.is_full());
395 /// ```
396 pub fn is_full(&self) -> bool {
397 let tail = self.tail.load(Ordering::SeqCst);
398 let head = self.head.load(Ordering::SeqCst);
399
400 // Is the head lagging one lap behind tail?
401 //
402 // Note: If the tail changes just before we load the head, that means there was a moment
403 // when the queue was not full, so it is safe to just return `false`.
404 head.wrapping_add(self.one_lap) == tail
405 }
406
407 /// Returns the number of elements in the queue.
408 ///
409 /// # Examples
410 ///
411 /// ```
412 /// use crossbeam_queue::ArrayQueue;
413 ///
414 /// let q = ArrayQueue::new(100);
415 /// assert_eq!(q.len(), 0);
416 ///
417 /// q.push(10).unwrap();
418 /// assert_eq!(q.len(), 1);
419 ///
420 /// q.push(20).unwrap();
421 /// assert_eq!(q.len(), 2);
422 /// ```
423 pub fn len(&self) -> usize {
424 loop {
425 // Load the tail, then load the head.
426 let tail = self.tail.load(Ordering::SeqCst);
427 let head = self.head.load(Ordering::SeqCst);
428
429 // If the tail didn't change, we've got consistent values to work with.
430 if self.tail.load(Ordering::SeqCst) == tail {
431 let hix = head & (self.one_lap - 1);
432 let tix = tail & (self.one_lap - 1);
433
434 return if hix < tix {
435 tix - hix
436 } else if hix > tix {
437 self.cap - hix + tix
438 } else if tail == head {
439 0
440 } else {
441 self.cap
442 };
443 }
444 }
445 }
446}
447
448impl<T> Drop for ArrayQueue<T> {
449 fn drop(&mut self) {
450 if mem::needs_drop::<T>() {
451 // Get the index of the head.
452 let head = *self.head.get_mut();
453 let tail = *self.tail.get_mut();
454
455 let hix = head & (self.one_lap - 1);
456 let tix = tail & (self.one_lap - 1);
457
458 let len = if hix < tix {
459 tix - hix
460 } else if hix > tix {
461 self.cap - hix + tix
462 } else if tail == head {
463 0
464 } else {
465 self.cap
466 };
467
468 // Loop over all slots that hold a message and drop them.
469 for i in 0..len {
470 // Compute the index of the next slot holding a message.
471 let index = if hix + i < self.cap {
472 hix + i
473 } else {
474 hix + i - self.cap
475 };
476
477 unsafe {
478 debug_assert!(index < self.buffer.len());
479 let slot = self.buffer.get_unchecked_mut(index);
480 (*slot.value.get()).assume_init_drop();
481 }
482 }
483 }
484 }
485}
486
487impl<T> fmt::Debug for ArrayQueue<T> {
488 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
489 f.pad("ArrayQueue { .. }")
490 }
491}
492
493impl<T> IntoIterator for ArrayQueue<T> {
494 type Item = T;
495
496 type IntoIter = IntoIter<T>;
497
498 fn into_iter(self) -> Self::IntoIter {
499 IntoIter { value: self }
500 }
501}
502
503#[derive(Debug)]
504pub struct IntoIter<T> {
505 value: ArrayQueue<T>,
506}
507
508impl<T> Iterator for IntoIter<T> {
509 type Item = T;
510
511 fn next(&mut self) -> Option<Self::Item> {
512 let value = &mut self.value;
513 let head = *value.head.get_mut();
514 if value.head.get_mut() != value.tail.get_mut() {
515 let index = head & (value.one_lap - 1);
516 let lap = head & !(value.one_lap - 1);
517 // SAFETY: We have mutable access to this, so we can read without
518 // worrying about concurrency. Furthermore, we know this is
519 // initialized because it is the value pointed at by `value.head`
520 // and this is a non-empty queue.
521 let val = unsafe {
522 debug_assert!(index < value.buffer.len());
523 let slot = value.buffer.get_unchecked_mut(index);
524 slot.value.get().read().assume_init()
525 };
526 let new = if index + 1 < value.cap {
527 // Same lap, incremented index.
528 // Set to `{ lap: lap, index: index + 1 }`.
529 head + 1
530 } else {
531 // One lap forward, index wraps around to zero.
532 // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
533 lap.wrapping_add(value.one_lap)
534 };
535 *value.head.get_mut() = new;
536 Option::Some(val)
537 } else {
538 Option::None
539 }
540 }
541}