futures_channel/mpsc/
mod.rs

1//! A multi-producer, single-consumer queue for sending values across
2//! asynchronous tasks.
3//!
//! Similarly to `std`'s channels, channel creation provides [`Receiver`] and
5//! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to
6//! read values out of the channel. If there is no message to read from the
7//! channel, the current task will be notified when a new value is sent.
8//! [`Sender`] implements the `Sink` trait and allows a task to send messages into
9//! the channel. If the channel is at capacity, the send will be rejected and
10//! the task will be notified when additional capacity is available. In other
11//! words, the channel provides backpressure.
12//!
13//! Unbounded channels are also available using the `unbounded` constructor.
14//!
15//! # Disconnection
16//!
17//! When all [`Sender`] handles have been dropped, it is no longer
18//! possible to send values into the channel. This is considered the termination
19//! event of the stream. As such, [`Receiver::poll_next`]
//! will return `Poll::Ready(None)`.
21//!
22//! If the [`Receiver`] handle is dropped, then messages can no longer
23//! be read out of the channel. In this case, all further attempts to send will
24//! result in an error.
25//!
26//! # Clean Shutdown
27//!
28//! If the [`Receiver`] is simply dropped, then it is possible for
29//! there to be messages still in the channel that will not be processed. As
30//! such, it is usually desirable to perform a "clean" shutdown. To do this, the
//! receiver will first call `close`, which will prevent any further messages
//! from being sent into the channel. Then, the receiver consumes the channel to
33//! completion, at which point the receiver can be dropped.
34//!
35//! [`Sender`]: struct.Sender.html
36//! [`Receiver`]: struct.Receiver.html
37//! [`Stream`]: ../../futures_core/stream/trait.Stream.html
38//! [`Receiver::poll_next`]:
39//!     ../../futures_core/stream/trait.Stream.html#tymethod.poll_next
40
41// At the core, the channel uses an atomic FIFO queue for message passing. This
42// queue is used as the primary coordination primitive. In order to enforce
43// capacity limits and handle back pressure, a secondary FIFO queue is used to
44// send parked task handles.
45//
46// The general idea is that the channel is created with a `buffer` size of `n`.
47// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
48// slot to hold a message. This allows `Sender` to know for a fact that a send
49// will succeed *before* starting to do the actual work of sending the value.
50// Since most of this work is lock-free, once the work starts, it is impossible
51// to safely revert.
52//
53// If the sender is unable to process a send operation, then the current
54// task is parked and the handle is sent on the parked task queue.
55//
56// Note that the implementation guarantees that the channel capacity will never
57// exceed the configured limit, however there is no *strict* guarantee that the
58// receiver will wake up a parked task *immediately* when a slot becomes
59// available. However, it will almost always unpark a task when a slot becomes
60// available and it is *guaranteed* that a sender will be unparked when the
61// message that caused the sender to become parked is read out of the channel.
62//
63// The steps for sending a message are roughly:
64//
65// 1) Increment the channel message count
66// 2) If the channel is at capacity, push the task handle onto the wait queue
67// 3) Push the message onto the message queue.
68//
69// The steps for receiving a message are roughly:
70//
71// 1) Pop a message from the message queue
72// 2) Pop a task handle from the wait queue
73// 3) Decrement the channel message count.
74//
75// It's important for the order of operations on lock-free structures to happen
76// in reverse order between the sender and receiver. This makes the message
77// queue the primary coordination structure and establishes the necessary
78// happens-before semantics required for the acquire / release semantics used
79// by the queue structure.
80
81use futures_core::stream::{FusedStream, Stream};
82use futures_core::task::__internal::AtomicWaker;
83use futures_core::task::{Context, Poll, Waker};
84use std::fmt;
85use std::pin::Pin;
86use std::sync::atomic::AtomicUsize;
87use std::sync::atomic::Ordering::SeqCst;
88use std::sync::{Arc, Mutex};
89use std::thread;
90
91use crate::mpsc::queue::Queue;
92
93mod queue;
94#[cfg(feature = "sink")]
95mod sink_impl;
96
// Sending half of an unbounded channel. It carries nothing but a handle to
// the state shared with the receiver.
struct UnboundedSenderInner<T> {
    // Channel state shared between the sender and receiver.
    inner: Arc<UnboundedInner<T>>,
}
101
// Sending half of a bounded channel: the shared channel state plus this
// sender's own parking bookkeeping.
struct BoundedSenderInner<T> {
    // Channel state shared between the sender and receiver.
    inner: Arc<BoundedInner<T>>,

    // Handle to the task that is blocked on this sender. This handle is sent
    // to the receiver half in order to be notified when the sender becomes
    // unblocked.
    sender_task: Arc<Mutex<SenderTask>>,

    // `true` if the sender might be blocked. This is an optimization to avoid
    // having to lock the mutex most of the time.
    maybe_parked: bool,
}
115
// We never project `Pin<&mut UnboundedSenderInner<T>>` or
// `Pin<&mut BoundedSenderInner<T>>` to `Pin<&mut T>`, so both sender inners
// are `Unpin` regardless of `T`.
impl<T> Unpin for UnboundedSenderInner<T> {}
impl<T> Unpin for BoundedSenderInner<T> {}
119
/// The transmission end of a bounded mpsc channel.
///
/// This value is created by the [`channel`] function.
///
/// A sender can be detached from its channel with
/// [`disconnect`](Sender::disconnect); afterwards every send attempt fails
/// with a "disconnected" error.
pub struct Sender<T>(Option<BoundedSenderInner<T>>);
124
/// The transmission end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`] function.
///
/// A sender can be detached from its channel with
/// [`disconnect`](UnboundedSender::disconnect); afterwards every send attempt
/// fails with a "disconnected" error.
pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>);
129
// Compile-time assertion only: the impl below fails to build unless
// `UnboundedSender<u32>` is `Send + Sync + Clone`. Nothing calls into the
// trait, hence the `dead_code` allowance.
#[allow(dead_code)]
trait AssertKinds: Send + Sync + Clone {}
impl AssertKinds for UnboundedSender<u32> {}
133
/// The receiving end of a bounded mpsc channel.
///
/// This value is created by the [`channel`] function.
pub struct Receiver<T> {
    // Handle to the channel state shared with every `Sender`. Created as
    // `Some(..)` by `channel`.
    inner: Option<Arc<BoundedInner<T>>>,
}
140
/// The receiving end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`] function.
pub struct UnboundedReceiver<T> {
    // Handle to the channel state shared with every `UnboundedSender`.
    // Created as `Some(..)` by `unbounded`.
    inner: Option<Arc<UnboundedInner<T>>>,
}

// `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>`, so
// the receiver is `Unpin` regardless of `T`.
impl<T> Unpin for UnboundedReceiver<T> {}
150
/// The error type for [`Sender`s](Sender) used as `Sink`s.
///
/// Use [`is_full`](SendError::is_full) and
/// [`is_disconnected`](SendError::is_disconnected) to find out why the send
/// failed.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SendError {
    // The reason the send was rejected.
    kind: SendErrorKind,
}
156
/// The error type returned from [`try_send`](Sender::try_send).
///
/// Besides the failure reason it carries the rejected message, which can be
/// recovered with [`into_inner`](TrySendError::into_inner).
#[derive(Clone, PartialEq, Eq)]
pub struct TrySendError<T> {
    // Why the send failed.
    err: SendError,
    // The message that could not be sent, handed back to the caller.
    val: T,
}
163
// The reason a send was rejected.
#[derive(Clone, Debug, PartialEq, Eq)]
enum SendErrorKind {
    // The sender is still parked, i.e. the channel has no room for it yet.
    Full,
    // The channel is closed or the sender has been disconnected from it.
    Disconnected,
}
169
/// The error type returned from [`try_next`](Receiver::try_next).
pub struct TryRecvError {
    // Private unit field: keeps the type from being constructed outside this
    // module.
    _priv: (),
}
174
175impl fmt::Display for SendError {
176    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
177        if self.is_full() {
178            write!(f, "send failed because channel is full")
179        } else {
180            write!(f, "send failed because receiver is gone")
181        }
182    }
183}
184
185impl std::error::Error for SendError {}
186
187impl SendError {
188    /// Returns `true` if this error is a result of the channel being full.
189    pub fn is_full(&self) -> bool {
190        match self.kind {
191            SendErrorKind::Full => true,
192            _ => false,
193        }
194    }
195
196    /// Returns `true` if this error is a result of the receiver being dropped.
197    pub fn is_disconnected(&self) -> bool {
198        match self.kind {
199            SendErrorKind::Disconnected => true,
200            _ => false,
201        }
202    }
203}
204
205impl<T> fmt::Debug for TrySendError<T> {
206    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
207        f.debug_struct("TrySendError").field("kind", &self.err.kind).finish()
208    }
209}
210
211impl<T> fmt::Display for TrySendError<T> {
212    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
213        if self.is_full() {
214            write!(f, "send failed because channel is full")
215        } else {
216            write!(f, "send failed because receiver is gone")
217        }
218    }
219}
220
221impl<T: core::any::Any> std::error::Error for TrySendError<T> {}
222
223impl<T> TrySendError<T> {
224    /// Returns `true` if this error is a result of the channel being full.
225    pub fn is_full(&self) -> bool {
226        self.err.is_full()
227    }
228
229    /// Returns `true` if this error is a result of the receiver being dropped.
230    pub fn is_disconnected(&self) -> bool {
231        self.err.is_disconnected()
232    }
233
234    /// Returns the message that was attempted to be sent but failed.
235    pub fn into_inner(self) -> T {
236        self.val
237    }
238
239    /// Drops the message and converts into a `SendError`.
240    pub fn into_send_error(self) -> SendError {
241        self.err
242    }
243}
244
245impl fmt::Debug for TryRecvError {
246    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
247        f.debug_tuple("TryRecvError").finish()
248    }
249}
250
251impl fmt::Display for TryRecvError {
252    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
253        write!(f, "receiver channel is empty")
254    }
255}
256
257impl std::error::Error for TryRecvError {}
258
// State of one unbounded channel, shared (through an `Arc`) by every
// `UnboundedSender` and the `UnboundedReceiver`.
struct UnboundedInner<T> {
    // Internal channel state. Consists of the number of messages stored in the
    // channel as well as a flag signalling that the channel is closed.
    state: AtomicUsize,

    // Atomic, FIFO queue used to send messages to the receiver
    message_queue: Queue<T>,

    // Number of senders in existence
    num_senders: AtomicUsize,

    // Handle to the receiver's task.
    recv_task: AtomicWaker,
}
273
// State of one bounded channel, shared (through an `Arc`) by every `Sender`
// and the `Receiver`.
struct BoundedInner<T> {
    // Configured buffer size of the channel. A sender parks itself once the
    // number of queued messages exceeds this value.
    buffer: usize,

    // Internal channel state. Consists of the number of messages stored in the
    // channel as well as a flag signalling that the channel is closed.
    state: AtomicUsize,

    // Atomic, FIFO queue used to send messages to the receiver
    message_queue: Queue<T>,

    // Atomic, FIFO queue used to send parked task handles to the receiver.
    parked_queue: Queue<Arc<Mutex<SenderTask>>>,

    // Number of senders in existence
    num_senders: AtomicUsize,

    // Handle to the receiver's task.
    recv_task: AtomicWaker,
}
294
// Decoded form of the packed `state` word stored in `UnboundedInner` and
// `BoundedInner`. Converted to and from the raw `usize` with
// `encode_state` / `decode_state`.
#[derive(Clone, Copy)]
struct State {
    // `true` when the channel is open
    is_open: bool,

    // Number of messages in the channel
    num_messages: usize,
}
304
// The `is_open` flag lives in the most significant bit of the packed `state`
// word; every lower bit belongs to the message count.
const OPEN_MASK: usize = !(usize::MAX >> 1);

// A freshly created channel is open and holds no messages.
const INIT_STATE: usize = OPEN_MASK;

// Largest message count the packed state can represent: `usize::MAX >> 1`,
// i.e. every bit except the open flag.
const MAX_CAPACITY: usize = usize::MAX ^ OPEN_MASK;

// The maximum requested buffer size must be less than the maximum capacity of
// a channel, because each sender gets a guaranteed slot on top of the buffer.
const MAX_BUFFER: usize = MAX_CAPACITY / 2;
318
// Parking record for one bounded sender. It is handed to the consumer so the
// consumer can wake the blocked producer.
struct SenderTask {
    // Waker of the task to notify, if one has been registered.
    task: Option<Waker>,
    // `true` while the sender counts as parked.
    is_parked: bool,
}

impl SenderTask {
    // A record that is not parked and has no waker registered.
    fn new() -> Self {
        SenderTask { is_parked: false, task: None }
    }

    // Marks the sender as no longer parked and wakes the registered task, if
    // any. The waker is consumed, so a second call wakes nothing.
    fn notify(&mut self) {
        let pending = self.task.take();
        self.is_parked = false;

        if let Some(waker) = pending {
            waker.wake();
        }
    }
}
338
339/// Creates a bounded mpsc channel for communicating between asynchronous tasks.
340///
341/// Being bounded, this channel provides backpressure to ensure that the sender
342/// outpaces the receiver by only a limited amount. The channel's capacity is
343/// equal to `buffer + num-senders`. In other words, each sender gets a
344/// guaranteed slot in the channel capacity, and on top of that there are
345/// `buffer` "first come, first serve" slots available to all senders.
346///
347/// The [`Receiver`] returned implements the [`Stream`] trait, while [`Sender`]
348/// implements `Sink`.
349pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
350    // Check that the requested buffer size does not exceed the maximum buffer
351    // size permitted by the system.
352    assert!(buffer < MAX_BUFFER, "requested buffer size too large");
353
354    let inner = Arc::new(BoundedInner {
355        buffer,
356        state: AtomicUsize::new(INIT_STATE),
357        message_queue: Queue::new(),
358        parked_queue: Queue::new(),
359        num_senders: AtomicUsize::new(1),
360        recv_task: AtomicWaker::new(),
361    });
362
363    let tx = BoundedSenderInner {
364        inner: inner.clone(),
365        sender_task: Arc::new(Mutex::new(SenderTask::new())),
366        maybe_parked: false,
367    };
368
369    let rx = Receiver { inner: Some(inner) };
370
371    (Sender(Some(tx)), rx)
372}
373
374/// Creates an unbounded mpsc channel for communicating between asynchronous
375/// tasks.
376///
377/// A `send` on this channel will always succeed as long as the receive half has
378/// not been closed. If the receiver falls behind, messages will be arbitrarily
379/// buffered.
380///
381/// **Note** that the amount of available system memory is an implicit bound to
382/// the channel. Using an `unbounded` channel has the ability of causing the
383/// process to run out of memory. In this case, the process will be aborted.
384pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
385    let inner = Arc::new(UnboundedInner {
386        state: AtomicUsize::new(INIT_STATE),
387        message_queue: Queue::new(),
388        num_senders: AtomicUsize::new(1),
389        recv_task: AtomicWaker::new(),
390    });
391
392    let tx = UnboundedSenderInner { inner: inner.clone() };
393
394    let rx = UnboundedReceiver { inner: Some(inner) };
395
396    (UnboundedSender(Some(tx)), rx)
397}
398
399/*
400 *
401 * ===== impl Sender =====
402 *
403 */
404
405impl<T> UnboundedSenderInner<T> {
406    fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> {
407        let state = decode_state(self.inner.state.load(SeqCst));
408        if state.is_open {
409            Poll::Ready(Ok(()))
410        } else {
411            Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }))
412        }
413    }
414
415    // Push message to the queue and signal to the receiver
416    fn queue_push_and_signal(&self, msg: T) {
417        // Push the message onto the message queue
418        self.inner.message_queue.push(msg);
419
420        // Signal to the receiver that a message has been enqueued. If the
421        // receiver is parked, this will unpark the task.
422        self.inner.recv_task.wake();
423    }
424
425    // Increment the number of queued messages. Returns the resulting number.
426    fn inc_num_messages(&self) -> Option<usize> {
427        let mut curr = self.inner.state.load(SeqCst);
428
429        loop {
430            let mut state = decode_state(curr);
431
432            // The receiver end closed the channel.
433            if !state.is_open {
434                return None;
435            }
436
437            // This probably is never hit? Odds are the process will run out of
438            // memory first. It may be worth to return something else in this
439            // case?
440            assert!(
441                state.num_messages < MAX_CAPACITY,
442                "buffer space \
443                    exhausted; sending this messages would overflow the state"
444            );
445
446            state.num_messages += 1;
447
448            let next = encode_state(&state);
449            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
450                Ok(_) => return Some(state.num_messages),
451                Err(actual) => curr = actual,
452            }
453        }
454    }
455
456    /// Returns whether the senders send to the same receiver.
457    fn same_receiver(&self, other: &Self) -> bool {
458        Arc::ptr_eq(&self.inner, &other.inner)
459    }
460
461    /// Returns whether the sender send to this receiver.
462    fn is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool {
463        Arc::ptr_eq(&self.inner, inner)
464    }
465
466    /// Returns pointer to the Arc containing sender
467    ///
468    /// The returned pointer is not referenced and should be only used for hashing!
469    fn ptr(&self) -> *const UnboundedInner<T> {
470        &*self.inner
471    }
472
473    /// Returns whether this channel is closed without needing a context.
474    fn is_closed(&self) -> bool {
475        !decode_state(self.inner.state.load(SeqCst)).is_open
476    }
477
478    /// Closes this channel from the sender side, preventing any new messages.
479    fn close_channel(&self) {
480        // There's no need to park this sender, its dropping,
481        // and we don't want to check for capacity, so skip
482        // that stuff from `do_send`.
483
484        self.inner.set_closed();
485        self.inner.recv_task.wake();
486    }
487}
488
impl<T> BoundedSenderInner<T> {
    /// Attempts to send a message on this `Sender`, returning the message
    /// if there was an error.
    ///
    /// Fails with a "full" error while this sender is still parked, and with
    /// a "disconnected" error if the channel has been closed.
    fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
        // If the sender is currently blocked, reject the message
        if !self.poll_unparked(None).is_ready() {
            return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg });
        }

        // The channel has capacity to accept the message, so send it
        self.do_send_b(msg)
    }

    // Do the send without checking for capacity; the only failure is a closed
    // channel, in which case the message is handed back.
    // Can be called only by bounded sender.
    fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> {
        // Anyone calling do_send *should* make sure there is room first,
        // but assert here for tests as a sanity check.
        debug_assert!(self.poll_unparked(None).is_ready());

        // First, increment the number of messages contained by the channel.
        // This operation will also atomically determine if the sender task
        // should be parked.
        //
        // `None` is returned in the case that the channel has been closed by the
        // receiver. This happens when `Receiver::close` is called or the
        // receiver is dropped.
        let park_self = match self.inc_num_messages() {
            Some(num_messages) => {
                // Block if the current number of pending messages has exceeded
                // the configured buffer size
                num_messages > self.inner.buffer
            }
            None => {
                return Err(TrySendError {
                    err: SendError { kind: SendErrorKind::Disconnected },
                    val: msg,
                })
            }
        };

        // If the channel has reached capacity, then the sender task needs to
        // be parked. This will send the task handle on the parked task queue.
        //
        // `park` records no waker of its own; the waker is filled in by
        // `poll_unparked` the next time the sender is polled with a context.
        if park_self {
            self.park();
        }

        // The handle (if any) is queued before the message; see the ordering
        // notes at the top of this file.
        self.queue_push_and_signal(msg);

        Ok(())
    }

    // Push message to the queue and signal to the receiver
    fn queue_push_and_signal(&self, msg: T) {
        // Push the message onto the message queue
        self.inner.message_queue.push(msg);

        // Signal to the receiver that a message has been enqueued. If the
        // receiver is parked, this will unpark the task.
        self.inner.recv_task.wake();
    }

    // Increment the number of queued messages. Returns the resulting number,
    // or `None` if the channel is no longer open.
    fn inc_num_messages(&self) -> Option<usize> {
        let mut curr = self.inner.state.load(SeqCst);

        loop {
            let mut state = decode_state(curr);

            // The receiver end closed the channel.
            if !state.is_open {
                return None;
            }

            // This probably is never hit? Odds are the process will run out of
            // memory first. It may be worth to return something else in this
            // case?
            assert!(
                state.num_messages < MAX_CAPACITY,
                "buffer space \
                    exhausted; sending this messages would overflow the state"
            );

            state.num_messages += 1;

            let next = encode_state(&state);
            // On contention, retry with the value another thread installed.
            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
                Ok(_) => return Some(state.num_messages),
                Err(actual) => curr = actual,
            }
        }
    }

    // Marks this sender as parked and hands its task handle to the receiver
    // through the parked queue.
    fn park(&mut self) {
        {
            // Scope the lock so it is released before the handle is queued.
            let mut sender = self.sender_task.lock().unwrap();
            sender.task = None;
            sender.is_parked = true;
        }

        // Send handle over queue
        let t = self.sender_task.clone();
        self.inner.parked_queue.push(t);

        // Check to make sure we weren't closed after we sent our task on the
        // queue
        let state = decode_state(self.inner.state.load(SeqCst));
        self.maybe_parked = state.is_open;
    }

    /// Polls the channel to determine if there is guaranteed capacity to send
    /// at least one item without waiting.
    ///
    /// # Return value
    ///
    /// This method returns:
    ///
    /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
    /// - `Poll::Pending` if the channel may not have
    ///   capacity, in which case the current task is queued to be notified once
    ///   capacity is available;
    /// - `Poll::Ready(Err(SendError))` if the channel is no longer open.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
        let state = decode_state(self.inner.state.load(SeqCst));
        if !state.is_open {
            return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }));
        }

        self.poll_unparked(Some(cx)).map(Ok)
    }

    /// Returns whether the senders send to the same receiver.
    fn same_receiver(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }

    /// Returns whether the sender send to this receiver.
    fn is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool {
        Arc::ptr_eq(&self.inner, receiver)
    }

    /// Returns pointer to the Arc containing sender
    ///
    /// The returned pointer is never dereferenced and should be only used for hashing!
    fn ptr(&self) -> *const BoundedInner<T> {
        &*self.inner
    }

    /// Returns whether this channel is closed without needing a context.
    fn is_closed(&self) -> bool {
        !decode_state(self.inner.state.load(SeqCst)).is_open
    }

    /// Closes this channel from the sender side, preventing any new messages.
    fn close_channel(&self) {
        // There's no need to park this sender, and we don't want to check for
        // capacity, so skip that stuff from `do_send_b`: just mark the
        // channel closed and wake the receiver.

        self.inner.set_closed();
        self.inner.recv_task.wake();
    }

    // Reports whether this sender is currently unparked. While it is still
    // parked, the waker from `cx` is stored so the right task gets notified;
    // passing `None` clears any previously stored waker.
    fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> {
        // First check the `maybe_parked` variable. This avoids acquiring the
        // lock in most cases
        if self.maybe_parked {
            // Get a lock on the task handle
            let mut task = self.sender_task.lock().unwrap();

            if !task.is_parked {
                self.maybe_parked = false;
                return Poll::Ready(());
            }

            // At this point, an unpark request is pending, so there will be an
            // unpark sometime in the future. We just need to make sure that
            // the correct task will be notified.
            //
            // Update the task in case the `Sender` has been moved to another
            // task
            task.task = cx.map(|cx| cx.waker().clone());

            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}
683
684impl<T> Sender<T> {
685    /// Attempts to send a message on this `Sender`, returning the message
686    /// if there was an error.
687    pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
688        if let Some(inner) = &mut self.0 {
689            inner.try_send(msg)
690        } else {
691            Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
692        }
693    }
694
695    /// Send a message on the channel.
696    ///
697    /// This function should only be called after
698    /// [`poll_ready`](Sender::poll_ready) has reported that the channel is
699    /// ready to receive a message.
700    pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
701        self.try_send(msg).map_err(|e| e.err)
702    }
703
704    /// Polls the channel to determine if there is guaranteed capacity to send
705    /// at least one item without waiting.
706    ///
707    /// # Return value
708    ///
709    /// This method returns:
710    ///
711    /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
712    /// - `Poll::Pending` if the channel may not have
713    ///   capacity, in which case the current task is queued to be notified once
714    ///   capacity is available;
715    /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
716    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
717        let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
718        inner.poll_ready(cx)
719    }
720
721    /// Returns whether this channel is closed without needing a context.
722    pub fn is_closed(&self) -> bool {
723        self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true)
724    }
725
726    /// Closes this channel from the sender side, preventing any new messages.
727    pub fn close_channel(&mut self) {
728        if let Some(inner) = &mut self.0 {
729            inner.close_channel();
730        }
731    }
732
733    /// Disconnects this sender from the channel, closing it if there are no more senders left.
734    pub fn disconnect(&mut self) {
735        self.0 = None;
736    }
737
738    /// Returns whether the senders send to the same receiver.
739    pub fn same_receiver(&self, other: &Self) -> bool {
740        match (&self.0, &other.0) {
741            (Some(inner), Some(other)) => inner.same_receiver(other),
742            _ => false,
743        }
744    }
745
746    /// Returns whether the sender send to this receiver.
747    pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool {
748        match (&self.0, &receiver.inner) {
749            (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
750            _ => false,
751        }
752    }
753
754    /// Hashes the receiver into the provided hasher
755    pub fn hash_receiver<H>(&self, hasher: &mut H)
756    where
757        H: std::hash::Hasher,
758    {
759        use std::hash::Hash;
760
761        let ptr = self.0.as_ref().map(|inner| inner.ptr());
762        ptr.hash(hasher);
763    }
764}
765
766impl<T> UnboundedSender<T> {
767    /// Check if the channel is ready to receive a message.
768    pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> {
769        let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
770        inner.poll_ready_nb()
771    }
772
773    /// Returns whether this channel is closed without needing a context.
774    pub fn is_closed(&self) -> bool {
775        self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true)
776    }
777
778    /// Closes this channel from the sender side, preventing any new messages.
779    pub fn close_channel(&self) {
780        if let Some(inner) = &self.0 {
781            inner.close_channel();
782        }
783    }
784
785    /// Disconnects this sender from the channel, closing it if there are no more senders left.
786    pub fn disconnect(&mut self) {
787        self.0 = None;
788    }
789
790    // Do the send without parking current task.
791    fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> {
792        if let Some(inner) = &self.0 {
793            if inner.inc_num_messages().is_some() {
794                inner.queue_push_and_signal(msg);
795                return Ok(());
796            }
797        }
798
799        Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
800    }
801
802    /// Send a message on the channel.
803    ///
804    /// This method should only be called after `poll_ready` has been used to
805    /// verify that the channel is ready to receive a message.
806    pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
807        self.do_send_nb(msg).map_err(|e| e.err)
808    }
809
810    /// Sends a message along this channel.
811    ///
812    /// This is an unbounded sender, so this function differs from `Sink::send`
813    /// by ensuring the return type reflects that the channel is always ready to
814    /// receive messages.
815    pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
816        self.do_send_nb(msg)
817    }
818
819    /// Returns whether the senders send to the same receiver.
820    pub fn same_receiver(&self, other: &Self) -> bool {
821        match (&self.0, &other.0) {
822            (Some(inner), Some(other)) => inner.same_receiver(other),
823            _ => false,
824        }
825    }
826
827    /// Returns whether the sender send to this receiver.
828    pub fn is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool {
829        match (&self.0, &receiver.inner) {
830            (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
831            _ => false,
832        }
833    }
834
835    /// Hashes the receiver into the provided hasher
836    pub fn hash_receiver<H>(&self, hasher: &mut H)
837    where
838        H: std::hash::Hasher,
839    {
840        use std::hash::Hash;
841
842        let ptr = self.0.as_ref().map(|inner| inner.ptr());
843        ptr.hash(hasher);
844    }
845
846    /// Return the number of messages in the queue or 0 if channel is disconnected.
847    pub fn len(&self) -> usize {
848        if let Some(sender) = &self.0 {
849            decode_state(sender.inner.state.load(SeqCst)).num_messages
850        } else {
851            0
852        }
853    }
854
855    /// Return false is channel has no queued messages, true otherwise.
856    pub fn is_empty(&self) -> bool {
857        self.len() == 0
858    }
859}
860
impl<T> Clone for Sender<T> {
    /// Creates another `Sender` by cloning the wrapped field; all bookkeeping
    /// is left to that field's own `Clone` implementation.
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}
866
impl<T> Clone for UnboundedSender<T> {
    /// Creates another `UnboundedSender` by cloning the wrapped field; all
    /// bookkeeping is left to that field's own `Clone` implementation.
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}
872
873impl<T> Clone for UnboundedSenderInner<T> {
874    fn clone(&self) -> Self {
875        // Since this atomic op isn't actually guarding any memory and we don't
876        // care about any orderings besides the ordering on the single atomic
877        // variable, a relaxed ordering is acceptable.
878        let mut curr = self.inner.num_senders.load(SeqCst);
879
880        loop {
881            // If the maximum number of senders has been reached, then fail
882            if curr == MAX_BUFFER {
883                panic!("cannot clone `Sender` -- too many outstanding senders");
884            }
885
886            debug_assert!(curr < MAX_BUFFER);
887
888            let next = curr + 1;
889            match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
890                Ok(_) => {
891                    // The ABA problem doesn't matter here. We only care that the
892                    // number of senders never exceeds the maximum.
893                    return Self { inner: self.inner.clone() };
894                }
895                Err(actual) => curr = actual,
896            }
897        }
898    }
899}
900
901impl<T> Clone for BoundedSenderInner<T> {
902    fn clone(&self) -> Self {
903        // Since this atomic op isn't actually guarding any memory and we don't
904        // care about any orderings besides the ordering on the single atomic
905        // variable, a relaxed ordering is acceptable.
906        let mut curr = self.inner.num_senders.load(SeqCst);
907
908        loop {
909            // If the maximum number of senders has been reached, then fail
910            if curr == self.inner.max_senders() {
911                panic!("cannot clone `Sender` -- too many outstanding senders");
912            }
913
914            debug_assert!(curr < self.inner.max_senders());
915
916            let next = curr + 1;
917            match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
918                Ok(_) => {
919                    // The ABA problem doesn't matter here. We only care that the
920                    // number of senders never exceeds the maximum.
921                    return Self {
922                        inner: self.inner.clone(),
923                        sender_task: Arc::new(Mutex::new(SenderTask::new())),
924                        maybe_parked: false,
925                    };
926                }
927                Err(actual) => curr = actual,
928            }
929        }
930    }
931}
932
933impl<T> Drop for UnboundedSenderInner<T> {
934    fn drop(&mut self) {
935        // Ordering between variables don't matter here
936        let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
937
938        if prev == 1 {
939            self.close_channel();
940        }
941    }
942}
943
944impl<T> Drop for BoundedSenderInner<T> {
945    fn drop(&mut self) {
946        // Ordering between variables don't matter here
947        let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
948
949        if prev == 1 {
950            self.close_channel();
951        }
952    }
953}
954
955impl<T> fmt::Debug for Sender<T> {
956    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
957        f.debug_struct("Sender").field("closed", &self.is_closed()).finish()
958    }
959}
960
961impl<T> fmt::Debug for UnboundedSender<T> {
962    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
963        f.debug_struct("UnboundedSender").field("closed", &self.is_closed()).finish()
964    }
965}
966
967/*
968 *
969 * ===== impl Receiver =====
970 *
971 */
972
973impl<T> Receiver<T> {
974    /// Closes the receiving half of a channel, without dropping it.
975    ///
976    /// This prevents any further messages from being sent on the channel while
977    /// still enabling the receiver to drain messages that are buffered.
978    pub fn close(&mut self) {
979        if let Some(inner) = &mut self.inner {
980            inner.set_closed();
981
982            // Wake up any threads waiting as they'll see that we've closed the
983            // channel and will continue on their merry way.
984            while let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
985                task.lock().unwrap().notify();
986            }
987        }
988    }
989
990    /// Tries to receive the next message without notifying a context if empty.
991    ///
992    /// It is not recommended to call this function from inside of a future,
993    /// only when you've otherwise arranged to be notified when the channel is
994    /// no longer empty.
995    ///
996    /// This function returns:
997    /// * `Ok(Some(t))` when message is fetched
998    /// * `Ok(None)` when channel is closed and no messages left in the queue
999    /// * `Err(e)` when there are no messages available, but channel is not yet closed
1000    pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
1001        match self.next_message() {
1002            Poll::Ready(msg) => Ok(msg),
1003            Poll::Pending => Err(TryRecvError { _priv: () }),
1004        }
1005    }
1006
1007    fn next_message(&mut self) -> Poll<Option<T>> {
1008        let inner = match self.inner.as_mut() {
1009            None => return Poll::Ready(None),
1010            Some(inner) => inner,
1011        };
1012        // Pop off a message
1013        match unsafe { inner.message_queue.pop_spin() } {
1014            Some(msg) => {
1015                // If there are any parked task handles in the parked queue,
1016                // pop one and unpark it.
1017                self.unpark_one();
1018
1019                // Decrement number of messages
1020                self.dec_num_messages();
1021
1022                Poll::Ready(Some(msg))
1023            }
1024            None => {
1025                let state = decode_state(inner.state.load(SeqCst));
1026                if state.is_closed() {
1027                    // If closed flag is set AND there are no pending messages
1028                    // it means end of stream
1029                    self.inner = None;
1030                    Poll::Ready(None)
1031                } else {
1032                    // If queue is open, we need to return Pending
1033                    // to be woken up when new messages arrive.
1034                    // If queue is closed but num_messages is non-zero,
1035                    // it means that senders updated the state,
1036                    // but didn't put message to queue yet,
1037                    // so we need to park until sender unparks the task
1038                    // after queueing the message.
1039                    Poll::Pending
1040                }
1041            }
1042        }
1043    }
1044
1045    // Unpark a single task handle if there is one pending in the parked queue
1046    fn unpark_one(&mut self) {
1047        if let Some(inner) = &mut self.inner {
1048            if let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
1049                task.lock().unwrap().notify();
1050            }
1051        }
1052    }
1053
1054    fn dec_num_messages(&self) {
1055        if let Some(inner) = &self.inner {
1056            // OPEN_MASK is highest bit, so it's unaffected by subtraction
1057            // unless there's underflow, and we know there's no underflow
1058            // because number of messages at this point is always > 0.
1059            inner.state.fetch_sub(1, SeqCst);
1060        }
1061    }
1062}
1063
// The receiver does not ever take a Pin to the inner T, so it is declared
// `Unpin` for every `T`, including message types that are not `Unpin`.
impl<T> Unpin for Receiver<T> {}
1066
impl<T> FusedStream for Receiver<T> {
    /// Reports the stream as terminated once `inner` has been cleared, which
    /// the receive path does after observing a closed, fully drained channel.
    fn is_terminated(&self) -> bool {
        self.inner.is_none()
    }
}
1072
1073impl<T> Stream for Receiver<T> {
1074    type Item = T;
1075
1076    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
1077        // Try to read a message off of the message queue.
1078        match self.next_message() {
1079            Poll::Ready(msg) => {
1080                if msg.is_none() {
1081                    self.inner = None;
1082                }
1083                Poll::Ready(msg)
1084            }
1085            Poll::Pending => {
1086                // There are no messages to read, in this case, park.
1087                self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1088                // Check queue again after parking to prevent race condition:
1089                // a message could be added to the queue after previous `next_message`
1090                // before `register` call.
1091                self.next_message()
1092            }
1093        }
1094    }
1095
1096    fn size_hint(&self) -> (usize, Option<usize>) {
1097        if let Some(inner) = &self.inner {
1098            decode_state(inner.state.load(SeqCst)).size_hint()
1099        } else {
1100            (0, Some(0))
1101        }
1102    }
1103}
1104
1105impl<T> Drop for Receiver<T> {
1106    fn drop(&mut self) {
1107        // Drain the channel of all pending messages
1108        self.close();
1109        if self.inner.is_some() {
1110            loop {
1111                match self.next_message() {
1112                    Poll::Ready(Some(_)) => {}
1113                    Poll::Ready(None) => break,
1114                    Poll::Pending => {
1115                        let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1116
1117                        // If the channel is closed, then there is no need to park.
1118                        if state.is_closed() {
1119                            break;
1120                        }
1121
1122                        // TODO: Spinning isn't ideal, it might be worth
1123                        // investigating using a condvar or some other strategy
1124                        // here. That said, if this case is hit, then another thread
1125                        // is about to push the value into the queue and this isn't
1126                        // the only spinlock in the impl right now.
1127                        thread::yield_now();
1128                    }
1129                }
1130            }
1131        }
1132    }
1133}
1134
1135impl<T> fmt::Debug for Receiver<T> {
1136    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1137        let closed = if let Some(ref inner) = self.inner {
1138            decode_state(inner.state.load(SeqCst)).is_closed()
1139        } else {
1140            false
1141        };
1142
1143        f.debug_struct("Receiver").field("closed", &closed).finish()
1144    }
1145}
1146
1147impl<T> UnboundedReceiver<T> {
1148    /// Closes the receiving half of a channel, without dropping it.
1149    ///
1150    /// This prevents any further messages from being sent on the channel while
1151    /// still enabling the receiver to drain messages that are buffered.
1152    pub fn close(&mut self) {
1153        if let Some(inner) = &mut self.inner {
1154            inner.set_closed();
1155        }
1156    }
1157
1158    /// Tries to receive the next message without notifying a context if empty.
1159    ///
1160    /// It is not recommended to call this function from inside of a future,
1161    /// only when you've otherwise arranged to be notified when the channel is
1162    /// no longer empty.
1163    ///
1164    /// This function returns:
1165    /// * `Ok(Some(t))` when message is fetched
1166    /// * `Ok(None)` when channel is closed and no messages left in the queue
1167    /// * `Err(e)` when there are no messages available, but channel is not yet closed
1168    pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
1169        match self.next_message() {
1170            Poll::Ready(msg) => Ok(msg),
1171            Poll::Pending => Err(TryRecvError { _priv: () }),
1172        }
1173    }
1174
1175    fn next_message(&mut self) -> Poll<Option<T>> {
1176        let inner = match self.inner.as_mut() {
1177            None => return Poll::Ready(None),
1178            Some(inner) => inner,
1179        };
1180        // Pop off a message
1181        match unsafe { inner.message_queue.pop_spin() } {
1182            Some(msg) => {
1183                // Decrement number of messages
1184                self.dec_num_messages();
1185
1186                Poll::Ready(Some(msg))
1187            }
1188            None => {
1189                let state = decode_state(inner.state.load(SeqCst));
1190                if state.is_closed() {
1191                    // If closed flag is set AND there are no pending messages
1192                    // it means end of stream
1193                    self.inner = None;
1194                    Poll::Ready(None)
1195                } else {
1196                    // If queue is open, we need to return Pending
1197                    // to be woken up when new messages arrive.
1198                    // If queue is closed but num_messages is non-zero,
1199                    // it means that senders updated the state,
1200                    // but didn't put message to queue yet,
1201                    // so we need to park until sender unparks the task
1202                    // after queueing the message.
1203                    Poll::Pending
1204                }
1205            }
1206        }
1207    }
1208
1209    fn dec_num_messages(&self) {
1210        if let Some(inner) = &self.inner {
1211            // OPEN_MASK is highest bit, so it's unaffected by subtraction
1212            // unless there's underflow, and we know there's no underflow
1213            // because number of messages at this point is always > 0.
1214            inner.state.fetch_sub(1, SeqCst);
1215        }
1216    }
1217}
1218
impl<T> FusedStream for UnboundedReceiver<T> {
    /// Reports the stream as terminated once `inner` has been cleared, which
    /// the receive path does after observing a closed, fully drained channel.
    fn is_terminated(&self) -> bool {
        self.inner.is_none()
    }
}
1224
1225impl<T> Stream for UnboundedReceiver<T> {
1226    type Item = T;
1227
1228    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
1229        // Try to read a message off of the message queue.
1230        match self.next_message() {
1231            Poll::Ready(msg) => {
1232                if msg.is_none() {
1233                    self.inner = None;
1234                }
1235                Poll::Ready(msg)
1236            }
1237            Poll::Pending => {
1238                // There are no messages to read, in this case, park.
1239                self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1240                // Check queue again after parking to prevent race condition:
1241                // a message could be added to the queue after previous `next_message`
1242                // before `register` call.
1243                self.next_message()
1244            }
1245        }
1246    }
1247
1248    fn size_hint(&self) -> (usize, Option<usize>) {
1249        if let Some(inner) = &self.inner {
1250            decode_state(inner.state.load(SeqCst)).size_hint()
1251        } else {
1252            (0, Some(0))
1253        }
1254    }
1255}
1256
1257impl<T> Drop for UnboundedReceiver<T> {
1258    fn drop(&mut self) {
1259        // Drain the channel of all pending messages
1260        self.close();
1261        if self.inner.is_some() {
1262            loop {
1263                match self.next_message() {
1264                    Poll::Ready(Some(_)) => {}
1265                    Poll::Ready(None) => break,
1266                    Poll::Pending => {
1267                        let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1268
1269                        // If the channel is closed, then there is no need to park.
1270                        if state.is_closed() {
1271                            break;
1272                        }
1273
1274                        // TODO: Spinning isn't ideal, it might be worth
1275                        // investigating using a condvar or some other strategy
1276                        // here. That said, if this case is hit, then another thread
1277                        // is about to push the value into the queue and this isn't
1278                        // the only spinlock in the impl right now.
1279                        thread::yield_now();
1280                    }
1281                }
1282            }
1283        }
1284    }
1285}
1286
1287impl<T> fmt::Debug for UnboundedReceiver<T> {
1288    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1289        let closed = if let Some(ref inner) = self.inner {
1290            decode_state(inner.state.load(SeqCst)).is_closed()
1291        } else {
1292            false
1293        };
1294
1295        f.debug_struct("Receiver").field("closed", &closed).finish()
1296    }
1297}
1298
1299/*
1300 *
1301 * ===== impl Inner =====
1302 *
1303 */
1304
1305impl<T> UnboundedInner<T> {
1306    // Clear `open` flag in the state, keep `num_messages` intact.
1307    fn set_closed(&self) {
1308        let curr = self.state.load(SeqCst);
1309        if !decode_state(curr).is_open {
1310            return;
1311        }
1312
1313        self.state.fetch_and(!OPEN_MASK, SeqCst);
1314    }
1315}
1316
1317impl<T> BoundedInner<T> {
1318    // The return value is such that the total number of messages that can be
1319    // enqueued into the channel will never exceed MAX_CAPACITY
1320    fn max_senders(&self) -> usize {
1321        MAX_CAPACITY - self.buffer
1322    }
1323
1324    // Clear `open` flag in the state, keep `num_messages` intact.
1325    fn set_closed(&self) {
1326        let curr = self.state.load(SeqCst);
1327        if !decode_state(curr).is_open {
1328            return;
1329        }
1330
1331        self.state.fetch_and(!OPEN_MASK, SeqCst);
1332    }
1333}
1334
// Hand-written `Send`/`Sync` impls for the shared state of the unbounded
// channel; both apply whenever the message type `T` is `Send`.
// NOTE(review): the soundness argument depends on the fields of
// `UnboundedInner`, which are declared outside this section -- confirm there.
unsafe impl<T: Send> Send for UnboundedInner<T> {}
unsafe impl<T: Send> Sync for UnboundedInner<T> {}

// Same pair of impls for the shared state of the bounded channel.
// NOTE(review): likewise depends on the fields of `BoundedInner` -- confirm.
unsafe impl<T: Send> Send for BoundedInner<T> {}
unsafe impl<T: Send> Sync for BoundedInner<T> {}
1340
1341impl State {
1342    fn is_closed(&self) -> bool {
1343        !self.is_open && self.num_messages == 0
1344    }
1345
1346    fn size_hint(&self) -> (usize, Option<usize>) {
1347        if self.is_open {
1348            (self.num_messages, None)
1349        } else {
1350            (self.num_messages, Some(self.num_messages))
1351        }
1352    }
1353}
1354
1355/*
1356 *
1357 * ===== Helpers =====
1358 *
1359 */
1360
1361fn decode_state(num: usize) -> State {
1362    State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY }
1363}
1364
1365fn encode_state(state: &State) -> usize {
1366    let mut num = state.num_messages;
1367
1368    if state.is_open {
1369        num |= OPEN_MASK;
1370    }
1371
1372    num
1373}