crossbeam_queue/seg_queue.rs
use alloc::alloc::{alloc_zeroed, handle_alloc_error, Layout};
use alloc::boxed::Box;
use core::cell::UnsafeCell;
use core::fmt;
use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::panic::{RefUnwindSafe, UnwindSafe};
use core::ptr;
use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};

use crossbeam_utils::{Backoff, CachePadded};

// Bits indicating the state of a slot:
// * If a value has been written into the slot, `WRITE` is set.
// * If a value has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of values a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Indicates that the block is not the last one.
const HAS_NEXT: usize = 1;
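// Illustration (not used by the code below): with `SHIFT = 1` and `LAP = 32`,
// a raw head/tail index `i` decomposes as
//   offset within the block: (i >> SHIFT) % LAP  (offsets 0..BLOCK_CAP hold values;
//                                                 offset BLOCK_CAP marks the block end)
//   block ("lap") number:    (i >> SHIFT) / LAP
// while the reserved low bit of the head index carries the `HAS_NEXT` flag, which is
// set once the head block is known to have a successor.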

/// A slot in a block.
struct Slot<T> {
    /// The value.
    value: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    /// Waits until a value is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    const LAYOUT: Layout = {
        let layout = Layout::new::<Self>();
        assert!(
            layout.size() != 0,
            "Block should never be zero-sized, as it has an AtomicPtr field"
        );
        layout
    };

    /// Creates an empty block.
    fn new() -> Box<Self> {
        // SAFETY: layout is not zero-sized
        let ptr = unsafe { alloc_zeroed(Self::LAYOUT) };
        // Handle allocation failure
        if ptr.is_null() {
            handle_alloc_error(Self::LAYOUT)
        }
        // SAFETY: This is safe because:
        //  [1] `Block::next` (AtomicPtr) may be safely zero initialized.
        //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
        //  [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it
        //      holds a MaybeUninit.
        //  [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
        // TODO: unsafe { Box::new_zeroed().assume_init() }
        unsafe { Box::from_raw(ptr.cast()) }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

/// A position in a queue.
struct Position<T> {
    /// The index in the queue.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// An unbounded multi-producer multi-consumer queue.
///
/// This queue is implemented as a linked list of segments, where each segment is a small buffer
/// that can hold a handful of elements. There is no limit to how many elements can be in the queue
/// at a time. However, since segments need to be dynamically allocated as elements get pushed,
/// this queue is somewhat slower than [`ArrayQueue`].
///
/// [`ArrayQueue`]: super::ArrayQueue
///
/// # Examples
///
/// ```
/// use crossbeam_queue::SegQueue;
///
/// let q = SegQueue::new();
///
/// q.push('a');
/// q.push('b');
///
/// assert_eq!(q.pop(), Some('a'));
/// assert_eq!(q.pop(), Some('b'));
/// assert!(q.pop().is_none());
/// ```
pub struct SegQueue<T> {
    /// The head of the queue.
    head: CachePadded<Position<T>>,

    /// The tail of the queue.
    tail: CachePadded<Position<T>>,

    /// Indicates that dropping a `SegQueue<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}

unsafe impl<T: Send> Send for SegQueue<T> {}
unsafe impl<T: Send> Sync for SegQueue<T> {}

impl<T> UnwindSafe for SegQueue<T> {}
impl<T> RefUnwindSafe for SegQueue<T> {}

impl<T> SegQueue<T> {
    /// Creates a new unbounded queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::<i32>::new();
    /// ```
    pub const fn new() -> SegQueue<T> {
        SegQueue {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            _marker: PhantomData,
        }
    }

    /// Pushes an element to the back of the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// q.push(20);
    /// ```
    pub fn push(&self, value: T) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Block::<T>::new());
            }

            // If this is the first push operation, we need to allocate the first block.
            if block.is_null() {
                let new = Box::into_raw(Block::<T>::new());

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        let next_index = new_tail.wrapping_add(1 << SHIFT);

                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.store(next_index, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the value into the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.value.get().write(MaybeUninit::new(value));
                    slot.state.fetch_or(WRITE, Ordering::Release);

                    return;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Pops an element from the front of the queue.
    ///
    /// If the queue is empty, `None` is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// q.push(20);
    /// assert_eq!(q.pop(), Some(10));
    /// assert_eq!(q.pop(), Some(20));
    /// assert!(q.pop().is_none());
    /// ```
    pub fn pop(&self) -> Option<T> {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & HAS_NEXT == 0 {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if head >> SHIFT == tail >> SHIFT {
                    return None;
                }

                // If head and tail are not in the same block, set `HAS_NEXT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= HAS_NEXT;
                }
            }

            // The block can be null here only if the first push operation is in progress. In that
            // case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= HAS_NEXT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    // Read the value.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let value = slot.value.get().read().assume_init();

                    // Destroy the block if we've reached the end, or if another thread wanted to
                    // destroy but couldn't because we were busy reading from the slot.
                    if offset + 1 == BLOCK_CAP {
                        Block::destroy(block, 0);
                    } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset + 1);
                    }

                    return Some(value);
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Returns `true` if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// assert!(q.is_empty());
    /// q.push(1);
    /// assert!(!q.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns the number of elements in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    /// assert_eq!(q.len(), 0);
    ///
    /// q.push(10);
    /// assert_eq!(q.len(), 1);
    ///
    /// q.push(20);
    /// assert_eq!(q.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
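                // Illustration (not part of the algorithm): with LAP = 32 and
                // BLOCK_CAP = 31, a full first block plus three values in the
                // second block gives head = 0 and tail = 35 at this point, so
                // the result is 35 - 0 - 35 / 32 = 34 stored elements.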
                return tail - head - tail / LAP;
            }
        }
    }
}

impl<T> Drop for SegQueue<T> {
    fn drop(&mut self) {
        let mut head = *self.head.index.get_mut();
        let mut tail = *self.tail.index.get_mut();
        let mut block = *self.head.block.get_mut();

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the value in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    (*slot.value.get()).assume_init_drop();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}

impl<T> fmt::Debug for SegQueue<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("SegQueue { .. }")
    }
}

impl<T> Default for SegQueue<T> {
    fn default() -> SegQueue<T> {
        SegQueue::new()
    }
}

impl<T> IntoIterator for SegQueue<T> {
    type Item = T;

    type IntoIter = IntoIter<T>;

    fn into_iter(self) -> Self::IntoIter {
        IntoIter { value: self }
    }
}

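/// An owning iterator over the elements of a `SegQueue<T>`, created by
/// `SegQueue::into_iter`. It yields the elements in the order they were pushed.
///
/// # Examples
///
/// ```
/// use crossbeam_queue::SegQueue;
///
/// let q = SegQueue::new();
/// q.push(1);
/// q.push(2);
///
/// let v: Vec<i32> = q.into_iter().collect();
/// assert_eq!(v, [1, 2]);
/// ```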
#[derive(Debug)]
pub struct IntoIter<T> {
    value: SegQueue<T>,
}

impl<T> Iterator for IntoIter<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        let value = &mut self.value;
        let head = *value.head.index.get_mut();
        let tail = *value.tail.index.get_mut();
        if head >> SHIFT == tail >> SHIFT {
            None
        } else {
            let block = *value.head.block.get_mut();
            let offset = (head >> SHIFT) % LAP;

            // SAFETY: We have mutable access to this, so we can read without
            // worrying about concurrency. Furthermore, we know this is
            // initialized because it is the value pointed at by `value.head`
            // and this is a non-empty queue.
            let item = unsafe {
                let slot = (*block).slots.get_unchecked(offset);
                slot.value.get().read().assume_init()
            };
            if offset + 1 == BLOCK_CAP {
                // Deallocate the block and move to the next one.
                // SAFETY: The block is initialized because we've been reading
                // from it this entire time. We can drop it because everything
                // has been read out of it, so nothing points to it anymore.
                unsafe {
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    *value.head.block.get_mut() = next;
                }
                // The last slot in a block is never used, so skip it.
                *value.head.index.get_mut() = head.wrapping_add(2 << SHIFT);
                // Double-check that we're pointing to the first item in a block.
                debug_assert_eq!((*value.head.index.get_mut() >> SHIFT) % LAP, 0);
            } else {
                *value.head.index.get_mut() = head.wrapping_add(1 << SHIFT);
            }
            Some(item)
        }
    }
}