1//! Unbounded channel implemented as a linked list.
23use std::alloc::{alloc_zeroed, handle_alloc_error, Layout};
4use std::boxed::Box;
5use std::cell::UnsafeCell;
6use std::marker::PhantomData;
7use std::mem::MaybeUninit;
8use std::ptr;
9use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
10use std::time::Instant;
1112use crossbeam_utils::{Backoff, CachePadded};
1314use crate::context::Context;
15use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
16use crate::select::{Operation, SelectHandle, Selected, Token};
17use crate::waker::SyncWaker;
1819// TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the
20// following changes by @kleimkuhler:
21//
22// 1. https://github.com/crossbeam-rs/crossbeam-channel/pull/100
23// 2. https://github.com/crossbeam-rs/crossbeam-channel/pull/101
// Bits indicating the state of a slot:
// * If a message has been written into the slot, `WRITE` is set.
// * If a message has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of messages a block can hold.
// One index per lap is sacrificed as a block boundary (`offset == BLOCK_CAP`
// means "end of block" in the send/receive loops below).
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
// The actual message index is `index >> SHIFT`.
const SHIFT: usize = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the channel is disconnected.
// This occupies the single metadata bit below `SHIFT`.
const MARK_BIT: usize = 1;
/// A slot in a block.
struct Slot<T> {
    /// The message. Starts out uninitialized; a sender writes it before
    /// setting `WRITE` in `state`.
    msg: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot: a bitwise OR of `WRITE`, `READ`, and `DESTROY`.
    state: AtomicUsize,
}
5253impl<T> Slot<T> {
54/// Waits until a message is written into the slot.
55fn wait_write(&self) {
56let backoff = Backoff::new();
57while self.state.load(Ordering::Acquire) & WRITE == 0 {
58 backoff.snooze();
59 }
60 }
61}
/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
struct Block<T> {
    /// The next block in the linked list. Null until the next block is installed.
    next: AtomicPtr<Block<T>>,

    /// Slots for messages.
    slots: [Slot<T>; BLOCK_CAP],
}
impl<T> Block<T> {
    /// Memory layout of a `Block<T>`, with a compile-time check that it is not
    /// zero-sized (zero-sized layouts are invalid for `alloc_zeroed`).
    const LAYOUT: Layout = {
        let layout = Layout::new::<Self>();
        assert!(
            layout.size() != 0,
            "Block should never be zero-sized, as it has an AtomicPtr field"
        );
        layout
    };

    /// Creates an empty block.
    fn new() -> Box<Self> {
        // SAFETY: layout is not zero-sized
        let ptr = unsafe { alloc_zeroed(Self::LAYOUT) };
        // Handle allocation failure
        if ptr.is_null() {
            handle_alloc_error(Self::LAYOUT)
        }
        // SAFETY: This is safe because:
        //  [1] `Block::next` (AtomicPtr) may be safely zero initialized.
        //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
        //  [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it
        //       holds a MaybeUninit.
        //  [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
        // TODO: unsafe { Box::new_zeroed().assume_init() }
        unsafe { Box::from_raw(ptr.cast()) }
    }

    /// Waits until the next pointer is set, spinning until the thread that is
    /// installing the next block finishes.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    ///
    /// # Safety
    ///
    /// `this` must point to a valid, heap-allocated block whose slots in
    /// `start..BLOCK_CAP` are no longer needed by this thread.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}
/// A position in a channel: an index paired with the block that index falls into.
#[derive(Debug)]
struct Position<T> {
    /// The index in the channel (shifted left by `SHIFT`, with metadata in the low bits).
    index: AtomicUsize,

    /// The block in the linked list. Null until the first block is installed.
    block: AtomicPtr<Block<T>>,
}
/// The token type for the list flavor.
///
/// Filled in by `start_send`/`start_recv` and consumed by `write`/`read`.
/// A null `block` signals that the channel is disconnected.
#[derive(Debug)]
pub(crate) struct ListToken {
    /// The block of slots.
    block: *const u8,

    /// The offset into the block.
    offset: usize,
}
154155impl Default for ListToken {
156#[inline]
157fn default() -> Self {
158 ListToken {
159 block: ptr::null(),
160 offset: 0,
161 }
162 }
163}
/// Unbounded channel implemented as a linked list.
///
/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
/// represented as numbers of type `usize` and wrap on overflow.
///
/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
/// improve cache efficiency.
pub(crate) struct Channel<T> {
    /// The head of the channel.
    /// Cache-padded so receivers don't falsely share a cache line with senders.
    head: CachePadded<Position<T>>,

    /// The tail of the channel.
    tail: CachePadded<Position<T>>,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,

    /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
    _marker: PhantomData<T>,
}
185186impl<T> Channel<T> {
187/// Creates a new unbounded channel.
188pub(crate) fn new() -> Self {
189 Channel {
190 head: CachePadded::new(Position {
191 block: AtomicPtr::new(ptr::null_mut()),
192 index: AtomicUsize::new(0),
193 }),
194 tail: CachePadded::new(Position {
195 block: AtomicPtr::new(ptr::null_mut()),
196 index: AtomicUsize::new(0),
197 }),
198 receivers: SyncWaker::new(),
199 _marker: PhantomData,
200 }
201 }
    /// Returns a receiver handle to the channel.
    pub(crate) fn receiver(&self) -> Receiver<'_, T> {
        Receiver(self)
    }
    /// Returns a sender handle to the channel.
    pub(crate) fn sender(&self) -> Sender<'_, T> {
        Sender(self)
    }
    /// Attempts to reserve a slot for sending a message.
    ///
    /// Always returns `true` for this unbounded flavor. On success the token holds
    /// the reserved block/offset; if the channel is disconnected, the token's block
    /// is set to null so that `write` reports the disconnection.
    fn start_send(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Check if the channel is disconnected.
            if tail & MARK_BIT != 0 {
                token.list.block = ptr::null();
                return true;
            }

            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Block::<T>::new());
            }

            // If this is the first message to be sent into the channel, we need to allocate the
            // first block and install it.
            if block.is_null() {
                let new = Box::into_raw(Block::<T>::new());

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    // Another sender won the race; keep our allocation as the
                    // pre-allocated next block and retry with fresh tail state.
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        self.tail.block.store(next_block, Ordering::Release);
                        // Skip the boundary index (`offset == BLOCK_CAP`).
                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }
    /// Writes a message into the channel.
    ///
    /// Returns `Err(msg)` if the channel was disconnected (null block in the token).
    ///
    /// # Safety
    ///
    /// `token` must have been filled in by a successful `start_send` on this channel.
    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
        // If there is no slot, the channel is disconnected.
        if token.list.block.is_null() {
            return Err(msg);
        }

        // Write the message into the slot.
        let block = token.list.block.cast::<Block<T>>();
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        slot.msg.get().write(MaybeUninit::new(msg));
        // Setting `WRITE` with `Release` publishes the message to any reader
        // spinning in `Slot::wait_write`.
        slot.state.fetch_or(WRITE, Ordering::Release);

        // Wake a sleeping receiver.
        self.receivers.notify();
        Ok(())
    }
    /// Attempts to reserve a slot for receiving a message.
    ///
    /// Returns `false` when the channel is empty but connected. Returns `true`
    /// when a slot was reserved, or when the channel is disconnected and drained
    /// (in which case the token's block is null and `read` will report an error).
    fn start_recv(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            // In head, `MARK_BIT` means the current block is known not to be the last
            // one; in that case the emptiness check below can be skipped.
            if new_head & MARK_BIT == 0 {
                // Synchronize with the `SeqCst` tail CAS in `start_send` before
                // reading the tail index.
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the channel is empty.
                if head >> SHIFT == tail >> SHIFT {
                    // If the channel is disconnected...
                    if tail & MARK_BIT != 0 {
                        // ...then receive an error.
                        token.list.block = ptr::null();
                        return true;
                    } else {
                        // Otherwise, the receive operation is not ready.
                        return false;
                    }
                }

                // If head and tail are not in the same block, set `MARK_BIT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= MARK_BIT;
                }
            }

            // The block can be null here only if the first message is being sent into the channel.
            // In that case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        // Skip the boundary index; re-mark if the next block already
                        // has a successor installed.
                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= MARK_BIT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }
    /// Reads a message from the channel.
    ///
    /// Returns `Err(())` if the channel is disconnected (null block in the token).
    ///
    /// # Safety
    ///
    /// `token` must have been filled in by a successful `start_recv` on this channel.
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
        if token.list.block.is_null() {
            // The channel is disconnected.
            return Err(());
        }

        // Read the message.
        let block = token.list.block as *mut Block<T>;
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        // The sender may not have finished writing yet — wait for the `WRITE` bit.
        slot.wait_write();
        let msg = slot.msg.get().read().assume_init();

        // Destroy the block if we've reached the end, or if another thread wanted to destroy but
        // couldn't because we were busy reading from the slot.
        if offset + 1 == BLOCK_CAP {
            Block::destroy(block, 0);
        } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
            Block::destroy(block, offset + 1);
        }

        Ok(msg)
    }
424425/// Attempts to send a message into the channel.
426pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
427self.send(msg, None).map_err(|err| match err {
428 SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
429 SendTimeoutError::Timeout(_) => unreachable!(),
430 })
431 }
    /// Sends a message into the channel.
    ///
    /// The channel is unbounded, so sending never blocks and `_deadline` is
    /// ignored; the only possible error is disconnection.
    pub(crate) fn send(
        &self,
        msg: T,
        _deadline: Option<Instant>,
    ) -> Result<(), SendTimeoutError<T>> {
        let token = &mut Token::default();
        // `start_send` always returns `true` for the list flavor (it sets a null
        // block in the token when the channel is disconnected).
        assert!(self.start_send(token));
        unsafe {
            self.write(token, msg)
                .map_err(SendTimeoutError::Disconnected)
        }
    }
446447/// Attempts to receive a message without blocking.
448pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
449let token = &mut Token::default();
450451if self.start_recv(token) {
452unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
453 } else {
454Err(TryRecvError::Empty)
455 }
456 }
    /// Receives a message from the channel, blocking until one arrives, the
    /// channel is disconnected, or the optional `deadline` passes.
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
        let token = &mut Token::default();
        loop {
            // Try receiving a message several times.
            let backoff = Backoff::new();
            loop {
                if self.start_recv(token) {
                    unsafe {
                        return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
                    }
                }

                if backoff.is_completed() {
                    break;
                } else {
                    backoff.snooze();
                }
            }

            if let Some(d) = deadline {
                if Instant::now() >= d {
                    return Err(RecvTimeoutError::Timeout);
                }
            }

            // Prepare for blocking until a sender wakes us up.
            Context::with(|cx| {
                let oper = Operation::hook(token);
                self.receivers.register(oper, cx);

                // Has the channel become ready just now?
                if !self.is_empty() || self.is_disconnected() {
                    let _ = cx.try_select(Selected::Aborted);
                }

                // Block the current thread.
                let sel = cx.wait_until(deadline);

                match sel {
                    Selected::Waiting => unreachable!(),
                    Selected::Aborted | Selected::Disconnected => {
                        self.receivers.unregister(oper).unwrap();
                        // If the channel was disconnected, we still have to check for remaining
                        // messages.
                    }
                    // A sender already unregistered us when it notified.
                    Selected::Operation(_) => {}
                }
            });
            // Loop back and retry the receive after waking.
        }
    }
    /// Returns the current number of messages inside the channel.
    ///
    /// Retries until it observes a consistent snapshot of the tail index
    /// (i.e. the tail did not change between the two loads).
    pub(crate) fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits (metadata such as `MARK_BIT`).
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head
                // (one index per lap is a block boundary, not a message).
                return tail - head - tail / LAP;
            }
        }
    }
    /// Returns the capacity of the channel.
    ///
    /// Always `None`, since the list flavor is unbounded.
    pub(crate) fn capacity(&self) -> Option<usize> {
        None
    }
550551/// Disconnects senders and wakes up all blocked receivers.
552 ///
553 /// Returns `true` if this call disconnected the channel.
554pub(crate) fn disconnect_senders(&self) -> bool {
555let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
556557if tail & MARK_BIT == 0 {
558self.receivers.disconnect();
559true
560} else {
561false
562}
563 }
564565/// Disconnects receivers.
566 ///
567 /// Returns `true` if this call disconnected the channel.
568pub(crate) fn disconnect_receivers(&self) -> bool {
569let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
570571if tail & MARK_BIT == 0 {
572// If receivers are dropped first, discard all messages to free
573 // memory eagerly.
574self.discard_all_messages();
575true
576} else {
577false
578}
579 }
    /// Discards all messages.
    ///
    /// This method should only be called when all receivers are dropped.
    fn discard_all_messages(&self) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        loop {
            let offset = (tail >> SHIFT) % LAP;
            if offset != BLOCK_CAP {
                break;
            }

            // New updates to tail will be rejected by MARK_BIT and aborted unless it's
            // at boundary. We need to wait for the updates to take effect otherwise there
            // can be memory leaks.
            backoff.snooze();
            tail = self.tail.index.load(Ordering::Acquire);
        }

        let mut head = self.head.index.load(Ordering::Acquire);
        // The channel may be uninitialized, so we have to swap to avoid overwriting any sender's attempts
        // to initialize the first block before noticing that the receivers disconnected. Late allocations
        // will be deallocated by the sender in Drop.
        let mut block = self.head.block.swap(ptr::null_mut(), Ordering::AcqRel);

        // If we're going to be dropping messages we need to synchronize with initialization.
        if head >> SHIFT != tail >> SHIFT {
            // The block can be null here only if a sender is in the process of initializing the
            // channel while another sender managed to send a message by inserting it into the
            // semi-initialized channel and advanced the tail.
            // In that case, just wait until it gets initialized.
            while block.is_null() {
                backoff.snooze();
                block = self.head.block.load(Ordering::Acquire);
            }
        }

        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head >> SHIFT != tail >> SHIFT {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    // The sender may still be mid-write — wait for `WRITE` before dropping.
                    slot.wait_write();
                    (*slot.msg.get()).assume_init_drop();
                } else {
                    // Block boundary index: wait for the next block to be installed.
                    (*block).wait_next();
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Acquire);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
        head &= !MARK_BIT;
        self.head.index.store(head, Ordering::Release);
    }
    /// Returns `true` if the channel is disconnected (i.e. `MARK_BIT` is set in the tail index).
    pub(crate) fn is_disconnected(&self) -> bool {
        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
    }
    /// Returns `true` if the channel is empty.
    ///
    /// Note: head is loaded before tail, so this check errs on the side of
    /// reporting "empty" rather than seeing a tail behind a newer head.
    pub(crate) fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }
    /// Returns `true` if the channel is full.
    ///
    /// Always `false`, since the list flavor is unbounded.
    pub(crate) fn is_full(&self) -> bool {
        false
    }
664}
impl<T> Drop for Channel<T> {
    fn drop(&mut self) {
        // `&mut self` guarantees exclusive access, so non-atomic `get_mut`
        // reads of the indices and block pointer are sufficient here.
        let mut head = *self.head.index.get_mut();
        let mut tail = *self.tail.index.get_mut();
        let mut block = *self.head.block.get_mut();

        // Erase the lower bits (metadata such as `MARK_BIT`).
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    (*slot.msg.get()).assume_init_drop();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}
/// Receiver handle to a channel. Borrows the channel for its lifetime.
pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
/// Sender handle to a channel. Borrows the channel for its lifetime.
pub(crate) struct Sender<'a, T>(&'a Channel<T>);
impl<T> SelectHandle for Receiver<'_, T> {
    /// Attempts to reserve a message slot; succeeds if a message or a
    /// disconnection notice is available.
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_recv(token)
    }

    /// Receive operations carry no intrinsic deadline.
    fn deadline(&self) -> Option<Instant> {
        None
    }

    fn register(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.register(oper, cx);
        // Report readiness after registering so a concurrent send isn't missed.
        self.is_ready()
    }

    fn unregister(&self, oper: Operation) {
        self.0.receivers.unregister(oper);
    }

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    /// Ready when there is a message to take or the channel is disconnected.
    fn is_ready(&self) -> bool {
        !self.0.is_empty() || self.0.is_disconnected()
    }

    fn watch(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.watch(oper, cx);
        self.is_ready()
    }

    fn unwatch(&self, oper: Operation) {
        self.0.receivers.unwatch(oper);
    }
}
impl<T> SelectHandle for Sender<'_, T> {
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_send(token)
    }

    /// Send operations carry no intrinsic deadline.
    fn deadline(&self) -> Option<Instant> {
        None
    }

    // Senders never need to wait on an unbounded channel, so registration is a no-op.
    fn register(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unregister(&self, _oper: Operation) {}

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    /// Sends into an unbounded channel never block, so a sender is always ready.
    fn is_ready(&self) -> bool {
        true
    }

    fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unwatch(&self, _oper: Operation) {}
}