tokio/sync/mpsc/
bounded.rs

1use crate::loom::sync::Arc;
2use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
3use crate::sync::mpsc::chan;
4use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
5
6cfg_time! {
7    use crate::sync::mpsc::error::SendTimeoutError;
8    use crate::time::Duration;
9}
10
11use std::fmt;
12use std::task::{Context, Poll};
13
14/// Sends values to the associated `Receiver`.
15///
16/// Instances are created by the [`channel`] function.
17///
18/// To convert the `Sender` into a `Sink` or use it in a poll function, you can
19/// use the [`PollSender`] utility.
20///
21/// [`PollSender`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSender.html
22pub struct Sender<T> {
23    chan: chan::Tx<T, Semaphore>,
24}
25
26/// A sender that does not prevent the channel from being closed.
27///
28/// If all [`Sender`] instances of a channel were dropped and only `WeakSender`
29/// instances remain, the channel is closed.
30///
31/// In order to send messages, the `WeakSender` needs to be upgraded using
32/// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None`
33/// if all `Sender`s have been dropped, and otherwise it returns a `Sender`.
34///
35/// [`Sender`]: Sender
36/// [`WeakSender::upgrade`]: WeakSender::upgrade
37///
38/// # Examples
39///
40/// ```
41/// use tokio::sync::mpsc::channel;
42///
43/// #[tokio::main]
44/// async fn main() {
45///     let (tx, _rx) = channel::<i32>(15);
46///     let tx_weak = tx.downgrade();
47///
48///     // Upgrading will succeed because `tx` still exists.
49///     assert!(tx_weak.upgrade().is_some());
50///
51///     // If we drop `tx`, then it will fail.
52///     drop(tx);
53///     assert!(tx_weak.clone().upgrade().is_none());
54/// }
55/// ```
56pub struct WeakSender<T> {
57    chan: Arc<chan::Chan<T, Semaphore>>,
58}
59
60/// Permits to send one value into the channel.
61///
62/// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`]
63/// and are used to guarantee channel capacity before generating a message to send.
64///
65/// [`Sender::reserve()`]: Sender::reserve
66/// [`Sender::try_reserve()`]: Sender::try_reserve
67pub struct Permit<'a, T> {
68    chan: &'a chan::Tx<T, Semaphore>,
69}
70
71/// An [`Iterator`] of [`Permit`] that can be used to hold `n` slots in the channel.
72///
73/// `PermitIterator` values are returned by [`Sender::reserve_many()`] and [`Sender::try_reserve_many()`]
74/// and are used to guarantee channel capacity before generating `n` messages to send.
75///
76/// [`Sender::reserve_many()`]: Sender::reserve_many
77/// [`Sender::try_reserve_many()`]: Sender::try_reserve_many
78pub struct PermitIterator<'a, T> {
79    chan: &'a chan::Tx<T, Semaphore>,
80    n: usize,
81}
82
83/// Owned permit to send one value into the channel.
84///
85/// This is identical to the [`Permit`] type, except that it moves the sender
86/// rather than borrowing it.
87///
88/// `OwnedPermit` values are returned by [`Sender::reserve_owned()`] and
89/// [`Sender::try_reserve_owned()`] and are used to guarantee channel capacity
90/// before generating a message to send.
91///
92/// [`Permit`]: Permit
93/// [`Sender::reserve_owned()`]: Sender::reserve_owned
94/// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned
95pub struct OwnedPermit<T> {
96    chan: Option<chan::Tx<T, Semaphore>>,
97}
98
99/// Receives values from the associated `Sender`.
100///
101/// Instances are created by the [`channel`] function.
102///
103/// This receiver can be turned into a `Stream` using [`ReceiverStream`].
104///
105/// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html
106pub struct Receiver<T> {
107    /// The channel receiver.
108    chan: chan::Rx<T, Semaphore>,
109}
110
111/// Creates a bounded mpsc channel for communicating between asynchronous tasks
112/// with backpressure.
113///
114/// The channel will buffer up to the provided number of messages.  Once the
115/// buffer is full, attempts to send new messages will wait until a message is
116/// received from the channel. The provided buffer capacity must be at least 1.
117///
118/// All data sent on `Sender` will become available on `Receiver` in the same
119/// order as it was sent.
120///
121/// The `Sender` can be cloned to `send` to the same channel from multiple code
122/// locations. Only one `Receiver` is supported.
123///
124/// If the `Receiver` is disconnected while trying to `send`, the `send` method
125/// will return a `SendError`. Similarly, if `Sender` is disconnected while
126/// trying to `recv`, the `recv` method will return `None`.
127///
128/// # Panics
129///
130/// Panics if the buffer capacity is 0.
131///
132/// # Examples
133///
134/// ```rust
135/// use tokio::sync::mpsc;
136///
137/// #[tokio::main]
138/// async fn main() {
139///     let (tx, mut rx) = mpsc::channel(100);
140///
141///     tokio::spawn(async move {
142///         for i in 0..10 {
143///             if let Err(_) = tx.send(i).await {
144///                 println!("receiver dropped");
145///                 return;
146///             }
147///         }
148///     });
149///
150///     while let Some(i) = rx.recv().await {
151///         println!("got = {}", i);
152///     }
153/// }
154/// ```
155#[track_caller]
156pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
157    assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
158    let semaphore = Semaphore {
159        semaphore: semaphore::Semaphore::new(buffer),
160        bound: buffer,
161    };
162    let (tx, rx) = chan::channel(semaphore);
163
164    let tx = Sender::new(tx);
165    let rx = Receiver::new(rx);
166
167    (tx, rx)
168}
169
170/// Channel semaphore is a tuple of the semaphore implementation and a `usize`
171/// representing the channel bound.
172#[derive(Debug)]
173pub(crate) struct Semaphore {
174    pub(crate) semaphore: semaphore::Semaphore,
175    pub(crate) bound: usize,
176}
177
178impl<T> Receiver<T> {
179    pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
180        Receiver { chan }
181    }
182
183    /// Receives the next value for this receiver.
184    ///
185    /// This method returns `None` if the channel has been closed and there are
186    /// no remaining messages in the channel's buffer. This indicates that no
187    /// further values can ever be received from this `Receiver`. The channel is
188    /// closed when all senders have been dropped, or when [`close`] is called.
189    ///
190    /// If there are no messages in the channel's buffer, but the channel has
191    /// not yet been closed, this method will sleep until a message is sent or
192    /// the channel is closed.  Note that if [`close`] is called, but there are
193    /// still outstanding [`Permits`] from before it was closed, the channel is
194    /// not considered closed by `recv` until the permits are released.
195    ///
196    /// # Cancel safety
197    ///
198    /// This method is cancel safe. If `recv` is used as the event in a
199    /// [`tokio::select!`](crate::select) statement and some other branch
200    /// completes first, it is guaranteed that no messages were received on this
201    /// channel.
202    ///
203    /// [`close`]: Self::close
204    /// [`Permits`]: struct@crate::sync::mpsc::Permit
205    ///
206    /// # Examples
207    ///
208    /// ```
209    /// use tokio::sync::mpsc;
210    ///
211    /// #[tokio::main]
212    /// async fn main() {
213    ///     let (tx, mut rx) = mpsc::channel(100);
214    ///
215    ///     tokio::spawn(async move {
216    ///         tx.send("hello").await.unwrap();
217    ///     });
218    ///
219    ///     assert_eq!(Some("hello"), rx.recv().await);
220    ///     assert_eq!(None, rx.recv().await);
221    /// }
222    /// ```
223    ///
224    /// Values are buffered:
225    ///
226    /// ```
227    /// use tokio::sync::mpsc;
228    ///
229    /// #[tokio::main]
230    /// async fn main() {
231    ///     let (tx, mut rx) = mpsc::channel(100);
232    ///
233    ///     tx.send("hello").await.unwrap();
234    ///     tx.send("world").await.unwrap();
235    ///
236    ///     assert_eq!(Some("hello"), rx.recv().await);
237    ///     assert_eq!(Some("world"), rx.recv().await);
238    /// }
239    /// ```
240    pub async fn recv(&mut self) -> Option<T> {
241        use std::future::poll_fn;
242        poll_fn(|cx| self.chan.recv(cx)).await
243    }
244
245    /// Receives the next values for this receiver and extends `buffer`.
246    ///
247    /// This method extends `buffer` by no more than a fixed number of values
248    /// as specified by `limit`. If `limit` is zero, the function immediately
249    /// returns `0`. The return value is the number of values added to `buffer`.
250    ///
251    /// For `limit > 0`, if there are no messages in the channel's queue, but
252    /// the channel has not yet been closed, this method will sleep until a
253    /// message is sent or the channel is closed. Note that if [`close`] is
254    /// called, but there are still outstanding [`Permits`] from before it was
255    /// closed, the channel is not considered closed by `recv_many` until the
256    /// permits are released.
257    ///
258    /// For non-zero values of `limit`, this method will never return `0` unless
259    /// the channel has been closed and there are no remaining messages in the
260    /// channel's queue. This indicates that no further values can ever be
261    /// received from this `Receiver`. The channel is closed when all senders
262    /// have been dropped, or when [`close`] is called.
263    ///
264    /// The capacity of `buffer` is increased as needed.
265    ///
266    /// # Cancel safety
267    ///
268    /// This method is cancel safe. If `recv_many` is used as the event in a
269    /// [`tokio::select!`](crate::select) statement and some other branch
270    /// completes first, it is guaranteed that no messages were received on this
271    /// channel.
272    ///
273    /// [`close`]: Self::close
274    /// [`Permits`]: struct@crate::sync::mpsc::Permit
275    ///
276    /// # Examples
277    ///
278    /// ```
279    /// use tokio::sync::mpsc;
280    ///
281    /// #[tokio::main]
282    /// async fn main() {
283    ///     let mut buffer: Vec<&str> = Vec::with_capacity(2);
284    ///     let limit = 2;
285    ///     let (tx, mut rx) = mpsc::channel(100);
286    ///     let tx2 = tx.clone();
287    ///     tx2.send("first").await.unwrap();
288    ///     tx2.send("second").await.unwrap();
289    ///     tx2.send("third").await.unwrap();
290    ///
291    ///     // Call `recv_many` to receive up to `limit` (2) values.
292    ///     assert_eq!(2, rx.recv_many(&mut buffer, limit).await);
293    ///     assert_eq!(vec!["first", "second"], buffer);
294    ///
295    ///     // If the buffer is full, the next call to `recv_many`
296    ///     // reserves additional capacity.
297    ///     assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
298    ///
299    ///     tokio::spawn(async move {
300    ///         tx.send("fourth").await.unwrap();
301    ///     });
302    ///
303    ///     // 'tx' is dropped, but `recv_many`
304    ///     // is guaranteed not to return 0 as the channel
305    ///     // is not yet closed.
306    ///     assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
307    ///     assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
308    ///
309    ///     // Once the last sender is dropped, the channel is
310    ///     // closed and `recv_many` returns 0, capacity unchanged.
311    ///     drop(tx2);
312    ///     assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
313    ///     assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
314    /// }
315    /// ```
316    pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
317        use std::future::poll_fn;
318        poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await
319    }
320
321    /// Tries to receive the next value for this receiver.
322    ///
323    /// This method returns the [`Empty`] error if the channel is currently
324    /// empty, but there are still outstanding [senders] or [permits].
325    ///
326    /// This method returns the [`Disconnected`] error if the channel is
327    /// currently empty, and there are no outstanding [senders] or [permits].
328    ///
329    /// Unlike the [`poll_recv`] method, this method will never return an
330    /// [`Empty`] error spuriously.
331    ///
332    /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
333    /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
334    /// [`poll_recv`]: Self::poll_recv
335    /// [senders]: crate::sync::mpsc::Sender
336    /// [permits]: crate::sync::mpsc::Permit
337    ///
338    /// # Examples
339    ///
340    /// ```
341    /// use tokio::sync::mpsc;
342    /// use tokio::sync::mpsc::error::TryRecvError;
343    ///
344    /// #[tokio::main]
345    /// async fn main() {
346    ///     let (tx, mut rx) = mpsc::channel(100);
347    ///
348    ///     tx.send("hello").await.unwrap();
349    ///
350    ///     assert_eq!(Ok("hello"), rx.try_recv());
351    ///     assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
352    ///
353    ///     tx.send("hello").await.unwrap();
354    ///     // Drop the last sender, closing the channel.
355    ///     drop(tx);
356    ///
357    ///     assert_eq!(Ok("hello"), rx.try_recv());
358    ///     assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
359    /// }
360    /// ```
361    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
362        self.chan.try_recv()
363    }
364
365    /// Blocking receive to call outside of asynchronous contexts.
366    ///
367    /// This method returns `None` if the channel has been closed and there are
368    /// no remaining messages in the channel's buffer. This indicates that no
369    /// further values can ever be received from this `Receiver`. The channel is
370    /// closed when all senders have been dropped, or when [`close`] is called.
371    ///
372    /// If there are no messages in the channel's buffer, but the channel has
373    /// not yet been closed, this method will block until a message is sent or
374    /// the channel is closed.
375    ///
376    /// This method is intended for use cases where you are sending from
377    /// asynchronous code to synchronous code, and will work even if the sender
378    /// is not using [`blocking_send`] to send the message.
379    ///
380    /// Note that if [`close`] is called, but there are still outstanding
381    /// [`Permits`] from before it was closed, the channel is not considered
382    /// closed by `blocking_recv` until the permits are released.
383    ///
384    /// [`close`]: Self::close
385    /// [`Permits`]: struct@crate::sync::mpsc::Permit
386    /// [`blocking_send`]: fn@crate::sync::mpsc::Sender::blocking_send
387    ///
388    /// # Panics
389    ///
390    /// This function panics if called within an asynchronous execution
391    /// context.
392    ///
393    /// # Examples
394    ///
395    /// ```
396    /// use std::thread;
397    /// use tokio::runtime::Runtime;
398    /// use tokio::sync::mpsc;
399    ///
400    /// fn main() {
401    ///     let (tx, mut rx) = mpsc::channel::<u8>(10);
402    ///
403    ///     let sync_code = thread::spawn(move || {
404    ///         assert_eq!(Some(10), rx.blocking_recv());
405    ///     });
406    ///
407    ///     Runtime::new()
408    ///         .unwrap()
409    ///         .block_on(async move {
410    ///             let _ = tx.send(10).await;
411    ///         });
412    ///     sync_code.join().unwrap()
413    /// }
414    /// ```
415    #[track_caller]
416    #[cfg(feature = "sync")]
417    #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
418    pub fn blocking_recv(&mut self) -> Option<T> {
419        crate::future::block_on(self.recv())
420    }
421
422    /// Variant of [`Self::recv_many`] for blocking contexts.
423    ///
424    /// The same conditions as in [`Self::blocking_recv`] apply.
425    #[track_caller]
426    #[cfg(feature = "sync")]
427    #[cfg_attr(docsrs, doc(alias = "recv_many_blocking"))]
428    pub fn blocking_recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
429        crate::future::block_on(self.recv_many(buffer, limit))
430    }
431
432    /// Closes the receiving half of a channel without dropping it.
433    ///
434    /// This prevents any further messages from being sent on the channel while
435    /// still enabling the receiver to drain messages that are buffered. Any
436    /// outstanding [`Permit`] values will still be able to send messages.
437    ///
438    /// To guarantee that no messages are dropped, after calling `close()`,
439    /// `recv()` must be called until `None` is returned. If there are
440    /// outstanding [`Permit`] or [`OwnedPermit`] values, the `recv` method will
441    /// not return `None` until those are released.
442    ///
443    /// [`Permit`]: Permit
444    /// [`OwnedPermit`]: OwnedPermit
445    ///
446    /// # Examples
447    ///
448    /// ```
449    /// use tokio::sync::mpsc;
450    ///
451    /// #[tokio::main]
452    /// async fn main() {
453    ///     let (tx, mut rx) = mpsc::channel(20);
454    ///
455    ///     tokio::spawn(async move {
456    ///         let mut i = 0;
457    ///         while let Ok(permit) = tx.reserve().await {
458    ///             permit.send(i);
459    ///             i += 1;
460    ///         }
461    ///     });
462    ///
463    ///     rx.close();
464    ///
465    ///     while let Some(msg) = rx.recv().await {
466    ///         println!("got {}", msg);
467    ///     }
468    ///
469    ///     // Channel closed and no messages are lost.
470    /// }
471    /// ```
472    pub fn close(&mut self) {
473        self.chan.close();
474    }
475
476    /// Checks if a channel is closed.
477    ///
478    /// This method returns `true` if the channel has been closed. The channel is closed
479    /// when all [`Sender`] have been dropped, or when [`Receiver::close`] is called.
480    ///
481    /// [`Sender`]: crate::sync::mpsc::Sender
482    /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
483    ///
484    /// # Examples
485    /// ```
486    /// use tokio::sync::mpsc;
487    ///
488    /// #[tokio::main]
489    /// async fn main() {
490    ///     let (_tx, mut rx) = mpsc::channel::<()>(10);
491    ///     assert!(!rx.is_closed());
492    ///
493    ///     rx.close();
494    ///
495    ///     assert!(rx.is_closed());
496    /// }
497    /// ```
498    pub fn is_closed(&self) -> bool {
499        self.chan.is_closed()
500    }
501
502    /// Checks if a channel is empty.
503    ///
504    /// This method returns `true` if the channel has no messages.
505    ///
506    /// # Examples
507    /// ```
508    /// use tokio::sync::mpsc;
509    ///
510    /// #[tokio::main]
511    /// async fn main() {
512    ///     let (tx, rx) = mpsc::channel(10);
513    ///     assert!(rx.is_empty());
514    ///
515    ///     tx.send(0).await.unwrap();
516    ///     assert!(!rx.is_empty());
517    /// }
518    ///
519    /// ```
520    pub fn is_empty(&self) -> bool {
521        self.chan.is_empty()
522    }
523
524    /// Returns the number of messages in the channel.
525    ///
526    /// # Examples
527    /// ```
528    /// use tokio::sync::mpsc;
529    ///
530    /// #[tokio::main]
531    /// async fn main() {
532    ///     let (tx, rx) = mpsc::channel(10);
533    ///     assert_eq!(0, rx.len());
534    ///
535    ///     tx.send(0).await.unwrap();
536    ///     assert_eq!(1, rx.len());
537    /// }
538    /// ```
539    pub fn len(&self) -> usize {
540        self.chan.len()
541    }
542
543    /// Returns the current capacity of the channel.
544    ///
545    /// The capacity goes down when the sender sends a value by calling [`Sender::send`] or by reserving
546    /// capacity with [`Sender::reserve`]. The capacity goes up when values are received.
547    /// This is distinct from [`max_capacity`], which always returns buffer capacity initially
548    /// specified when calling [`channel`].
549    ///
550    /// # Examples
551    ///
552    /// ```
553    /// use tokio::sync::mpsc;
554    ///
555    /// #[tokio::main]
556    /// async fn main() {
557    ///     let (tx, mut rx) = mpsc::channel::<()>(5);
558    ///
559    ///     assert_eq!(rx.capacity(), 5);
560    ///
561    ///     // Making a reservation drops the capacity by one.
562    ///     let permit = tx.reserve().await.unwrap();
563    ///     assert_eq!(rx.capacity(), 4);
564    ///     assert_eq!(rx.len(), 0);
565    ///
566    ///     // Sending and receiving a value increases the capacity by one.
567    ///     permit.send(());
568    ///     assert_eq!(rx.len(), 1);
569    ///     rx.recv().await.unwrap();
570    ///     assert_eq!(rx.capacity(), 5);
571    ///
572    ///     // Directly sending a message drops the capacity by one.
573    ///     tx.send(()).await.unwrap();
574    ///     assert_eq!(rx.capacity(), 4);
575    ///     assert_eq!(rx.len(), 1);
576    ///
577    ///     // Receiving the message increases the capacity by one.
578    ///     rx.recv().await.unwrap();
579    ///     assert_eq!(rx.capacity(), 5);
580    ///     assert_eq!(rx.len(), 0);
581    /// }
582    /// ```
583    /// [`capacity`]: Receiver::capacity
584    /// [`max_capacity`]: Receiver::max_capacity
585    pub fn capacity(&self) -> usize {
586        self.chan.semaphore().semaphore.available_permits()
587    }
588
589    /// Returns the maximum buffer capacity of the channel.
590    ///
591    /// The maximum capacity is the buffer capacity initially specified when calling
592    /// [`channel`]. This is distinct from [`capacity`], which returns the *current*
593    /// available buffer capacity: as messages are sent and received, the value
594    /// returned by [`capacity`] will go up or down, whereas the value
595    /// returned by [`max_capacity`] will remain constant.
596    ///
597    /// # Examples
598    ///
599    /// ```
600    /// use tokio::sync::mpsc;
601    ///
602    /// #[tokio::main]
603    /// async fn main() {
604    ///     let (tx, rx) = mpsc::channel::<()>(5);
605    ///
606    ///     // both max capacity and capacity are the same at first
607    ///     assert_eq!(rx.max_capacity(), 5);
608    ///     assert_eq!(rx.capacity(), 5);
609    ///
610    ///     // Making a reservation doesn't change the max capacity.
611    ///     let permit = tx.reserve().await.unwrap();
612    ///     assert_eq!(rx.max_capacity(), 5);
613    ///     // but drops the capacity by one
614    ///     assert_eq!(rx.capacity(), 4);
615    /// }
616    /// ```
617    /// [`capacity`]: Receiver::capacity
618    /// [`max_capacity`]: Receiver::max_capacity
619    pub fn max_capacity(&self) -> usize {
620        self.chan.semaphore().bound
621    }
622
623    /// Polls to receive the next message on this channel.
624    ///
625    /// This method returns:
626    ///
627    ///  * `Poll::Pending` if no messages are available but the channel is not
628    ///    closed, or if a spurious failure happens.
629    ///  * `Poll::Ready(Some(message))` if a message is available.
630    ///  * `Poll::Ready(None)` if the channel has been closed and all messages
631    ///    sent before it was closed have been received.
632    ///
633    /// When the method returns `Poll::Pending`, the `Waker` in the provided
634    /// `Context` is scheduled to receive a wakeup when a message is sent on any
635    /// receiver, or when the channel is closed.  Note that on multiple calls to
636    /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
637    /// passed to the most recent call is scheduled to receive a wakeup.
638    ///
639    /// If this method returns `Poll::Pending` due to a spurious failure, then
640    /// the `Waker` will be notified when the situation causing the spurious
641    /// failure has been resolved. Note that receiving such a wakeup does not
642    /// guarantee that the next call will succeed — it could fail with another
643    /// spurious failure.
644    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
645        self.chan.recv(cx)
646    }
647
648    /// Polls to receive multiple messages on this channel, extending the provided buffer.
649    ///
650    /// This method returns:
651    /// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
652    ///   spurious failure happens.
653    /// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
654    ///   stored in `buffer`. This can be less than, or equal to, `limit`.
655    /// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed.
656    ///
657    /// When the method returns `Poll::Pending`, the `Waker` in the provided
658    /// `Context` is scheduled to receive a wakeup when a message is sent on any
659    /// receiver, or when the channel is closed.  Note that on multiple calls to
660    /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
661    /// passed to the most recent call is scheduled to receive a wakeup.
662    ///
663    /// Note that this method does not guarantee that exactly `limit` messages
664    /// are received. Rather, if at least one message is available, it returns
665    /// as many messages as it can up to the given limit. This method returns
666    /// zero only if the channel is closed (or if `limit` is zero).
667    ///
668    /// # Examples
669    ///
670    /// ```
671    /// use std::task::{Context, Poll};
672    /// use std::pin::Pin;
673    /// use tokio::sync::mpsc;
674    /// use futures::Future;
675    ///
676    /// struct MyReceiverFuture<'a> {
677    ///     receiver: mpsc::Receiver<i32>,
678    ///     buffer: &'a mut Vec<i32>,
679    ///     limit: usize,
680    /// }
681    ///
682    /// impl<'a> Future for MyReceiverFuture<'a> {
683    ///     type Output = usize; // Number of messages received
684    ///
685    ///     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
686    ///         let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
687    ///
688    ///         // Now `receiver` and `buffer` are mutable references, and `limit` is copied
689    ///         match receiver.poll_recv_many(cx, *buffer, *limit) {
690    ///             Poll::Pending => Poll::Pending,
691    ///             Poll::Ready(count) => Poll::Ready(count),
692    ///         }
693    ///     }
694    /// }
695    ///
696    /// #[tokio::main]
697    /// async fn main() {
698    ///     let (tx, rx) = mpsc::channel(32);
699    ///     let mut buffer = Vec::new();
700    ///
701    ///     let my_receiver_future = MyReceiverFuture {
702    ///         receiver: rx,
703    ///         buffer: &mut buffer,
704    ///         limit: 3,
705    ///     };
706    ///
707    ///     for i in 0..10 {
708    ///         tx.send(i).await.unwrap();
709    ///     }
710    ///
711    ///     let count = my_receiver_future.await;
712    ///     assert_eq!(count, 3);
713    ///     assert_eq!(buffer, vec![0,1,2])
714    /// }
715    /// ```
716    pub fn poll_recv_many(
717        &mut self,
718        cx: &mut Context<'_>,
719        buffer: &mut Vec<T>,
720        limit: usize,
721    ) -> Poll<usize> {
722        self.chan.recv_many(cx, buffer, limit)
723    }
724
725    /// Returns the number of [`Sender`] handles.
726    pub fn sender_strong_count(&self) -> usize {
727        self.chan.sender_strong_count()
728    }
729
730    /// Returns the number of [`WeakSender`] handles.
731    pub fn sender_weak_count(&self) -> usize {
732        self.chan.sender_weak_count()
733    }
734}
735
736impl<T> fmt::Debug for Receiver<T> {
737    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
738        fmt.debug_struct("Receiver")
739            .field("chan", &self.chan)
740            .finish()
741    }
742}
743
744impl<T> Unpin for Receiver<T> {}
745
746impl<T> Sender<T> {
747    pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
748        Sender { chan }
749    }
750
751    /// Sends a value, waiting until there is capacity.
752    ///
753    /// A successful send occurs when it is determined that the other end of the
754    /// channel has not hung up already. An unsuccessful send would be one where
755    /// the corresponding receiver has already been closed. Note that a return
756    /// value of `Err` means that the data will never be received, but a return
757    /// value of `Ok` does not mean that the data will be received. It is
758    /// possible for the corresponding receiver to hang up immediately after
759    /// this function returns `Ok`.
760    ///
761    /// # Errors
762    ///
763    /// If the receive half of the channel is closed, either due to [`close`]
764    /// being called or the [`Receiver`] handle dropping, the function returns
765    /// an error. The error includes the value passed to `send`.
766    ///
767    /// [`close`]: Receiver::close
768    /// [`Receiver`]: Receiver
769    ///
770    /// # Cancel safety
771    ///
772    /// If `send` is used as the event in a [`tokio::select!`](crate::select)
773    /// statement and some other branch completes first, then it is guaranteed
774    /// that the message was not sent. **However, in that case, the message
775    /// is dropped and will be lost.**
776    ///
777    /// To avoid losing messages, use [`reserve`](Self::reserve) to reserve
778    /// capacity, then use the returned [`Permit`] to send the message.
779    ///
780    /// This channel uses a queue to ensure that calls to `send` and `reserve`
781    /// complete in the order they were requested.  Cancelling a call to
782    /// `send` makes you lose your place in the queue.
783    ///
784    /// # Examples
785    ///
786    /// In the following example, each call to `send` will block until the
787    /// previously sent value was received.
788    ///
789    /// ```rust
790    /// use tokio::sync::mpsc;
791    ///
792    /// #[tokio::main]
793    /// async fn main() {
794    ///     let (tx, mut rx) = mpsc::channel(1);
795    ///
796    ///     tokio::spawn(async move {
797    ///         for i in 0..10 {
798    ///             if let Err(_) = tx.send(i).await {
799    ///                 println!("receiver dropped");
800    ///                 return;
801    ///             }
802    ///         }
803    ///     });
804    ///
805    ///     while let Some(i) = rx.recv().await {
806    ///         println!("got = {}", i);
807    ///     }
808    /// }
809    /// ```
810    pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
811        match self.reserve().await {
812            Ok(permit) => {
813                permit.send(value);
814                Ok(())
815            }
816            Err(_) => Err(SendError(value)),
817        }
818    }
819
820    /// Completes when the receiver has dropped.
821    ///
822    /// This allows the producers to get notified when interest in the produced
823    /// values is canceled and immediately stop doing work.
824    ///
825    /// # Cancel safety
826    ///
827    /// This method is cancel safe. Once the channel is closed, it stays closed
828    /// forever and all future calls to `closed` will return immediately.
829    ///
830    /// # Examples
831    ///
832    /// ```
833    /// use tokio::sync::mpsc;
834    ///
835    /// #[tokio::main]
836    /// async fn main() {
837    ///     let (tx1, rx) = mpsc::channel::<()>(1);
838    ///     let tx2 = tx1.clone();
839    ///     let tx3 = tx1.clone();
840    ///     let tx4 = tx1.clone();
841    ///     let tx5 = tx1.clone();
842    ///     tokio::spawn(async move {
843    ///         drop(rx);
844    ///     });
845    ///
846    ///     futures::join!(
847    ///         tx1.closed(),
848    ///         tx2.closed(),
849    ///         tx3.closed(),
850    ///         tx4.closed(),
851    ///         tx5.closed()
852    ///     );
853    ///     println!("Receiver dropped");
854    /// }
855    /// ```
856    pub async fn closed(&self) {
857        self.chan.closed().await;
858    }
859
    /// Attempts to immediately send a message on this `Sender`.
861    ///
862    /// This method differs from [`send`] by returning immediately if the channel's
863    /// buffer is full or no receiver is waiting to acquire some data. Compared
864    /// with [`send`], this function has two failure cases instead of one (one for
865    /// disconnection, one for a full buffer).
866    ///
867    /// # Errors
868    ///
869    /// If the channel capacity has been reached, i.e., the channel has `n`
870    /// buffered values where `n` is the argument passed to [`channel`], then an
871    /// error is returned.
872    ///
873    /// If the receive half of the channel is closed, either due to [`close`]
874    /// being called or the [`Receiver`] handle dropping, the function returns
875    /// an error. The error includes the value passed to `send`.
876    ///
877    /// [`send`]: Sender::send
878    /// [`channel`]: channel
879    /// [`close`]: Receiver::close
880    ///
881    /// # Examples
882    ///
883    /// ```
884    /// use tokio::sync::mpsc;
885    ///
886    /// #[tokio::main]
887    /// async fn main() {
888    ///     // Create a channel with buffer size 1
889    ///     let (tx1, mut rx) = mpsc::channel(1);
890    ///     let tx2 = tx1.clone();
891    ///
892    ///     tokio::spawn(async move {
893    ///         tx1.send(1).await.unwrap();
894    ///         tx1.send(2).await.unwrap();
895    ///         // task waits until the receiver receives a value.
896    ///     });
897    ///
898    ///     tokio::spawn(async move {
899    ///         // This will return an error and send
900    ///         // no message if the buffer is full
901    ///         let _ = tx2.try_send(3);
902    ///     });
903    ///
904    ///     let mut msg;
905    ///     msg = rx.recv().await.unwrap();
906    ///     println!("message {} received", msg);
907    ///
908    ///     msg = rx.recv().await.unwrap();
909    ///     println!("message {} received", msg);
910    ///
911    ///     // Third message may have never been sent
912    ///     match rx.recv().await {
913    ///         Some(msg) => println!("message {} received", msg),
914    ///         None => println!("the third message was never sent"),
915    ///     }
916    /// }
917    /// ```
918    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
919        match self.chan.semaphore().semaphore.try_acquire(1) {
920            Ok(()) => {}
921            Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)),
922            Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)),
923        }
924
925        // Send the message
926        self.chan.send(message);
927        Ok(())
928    }
929
930    /// Sends a value, waiting until there is capacity, but only for a limited time.
931    ///
932    /// Shares the same success and error conditions as [`send`], adding one more
933    /// condition for an unsuccessful send, which is when the provided timeout has
934    /// elapsed, and there is no capacity available.
935    ///
936    /// [`send`]: Sender::send
937    ///
938    /// # Errors
939    ///
940    /// If the receive half of the channel is closed, either due to [`close`]
941    /// being called or the [`Receiver`] having been dropped,
942    /// the function returns an error. The error includes the value passed to `send`.
943    ///
944    /// [`close`]: Receiver::close
945    /// [`Receiver`]: Receiver
946    ///
947    /// # Panics
948    ///
949    /// This function panics if it is called outside the context of a Tokio
950    /// runtime [with time enabled](crate::runtime::Builder::enable_time).
951    ///
952    /// # Examples
953    ///
954    /// In the following example, each call to `send_timeout` will block until the
955    /// previously sent value was received, unless the timeout has elapsed.
956    ///
957    /// ```rust
958    /// use tokio::sync::mpsc;
959    /// use tokio::time::{sleep, Duration};
960    ///
961    /// #[tokio::main]
962    /// async fn main() {
963    ///     let (tx, mut rx) = mpsc::channel(1);
964    ///
965    ///     tokio::spawn(async move {
966    ///         for i in 0..10 {
967    ///             if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
968    ///                 println!("send error: #{:?}", e);
969    ///                 return;
970    ///             }
971    ///         }
972    ///     });
973    ///
974    ///     while let Some(i) = rx.recv().await {
975    ///         println!("got = {}", i);
976    ///         sleep(Duration::from_millis(200)).await;
977    ///     }
978    /// }
979    /// ```
980    #[cfg(feature = "time")]
981    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
982    pub async fn send_timeout(
983        &self,
984        value: T,
985        timeout: Duration,
986    ) -> Result<(), SendTimeoutError<T>> {
987        let permit = match crate::time::timeout(timeout, self.reserve()).await {
988            Err(_) => {
989                return Err(SendTimeoutError::Timeout(value));
990            }
991            Ok(Err(_)) => {
992                return Err(SendTimeoutError::Closed(value));
993            }
994            Ok(Ok(permit)) => permit,
995        };
996
997        permit.send(value);
998        Ok(())
999    }
1000
1001    /// Blocking send to call outside of asynchronous contexts.
1002    ///
1003    /// This method is intended for use cases where you are sending from
1004    /// synchronous code to asynchronous code, and will work even if the
1005    /// receiver is not using [`blocking_recv`] to receive the message.
1006    ///
1007    /// [`blocking_recv`]: fn@crate::sync::mpsc::Receiver::blocking_recv
1008    ///
1009    /// # Panics
1010    ///
1011    /// This function panics if called within an asynchronous execution
1012    /// context.
1013    ///
1014    /// # Examples
1015    ///
1016    /// ```
1017    /// use std::thread;
1018    /// use tokio::runtime::Runtime;
1019    /// use tokio::sync::mpsc;
1020    ///
1021    /// fn main() {
1022    ///     let (tx, mut rx) = mpsc::channel::<u8>(1);
1023    ///
1024    ///     let sync_code = thread::spawn(move || {
1025    ///         tx.blocking_send(10).unwrap();
1026    ///     });
1027    ///
1028    ///     Runtime::new().unwrap().block_on(async move {
1029    ///         assert_eq!(Some(10), rx.recv().await);
1030    ///     });
1031    ///     sync_code.join().unwrap()
1032    /// }
1033    /// ```
1034    #[track_caller]
1035    #[cfg(feature = "sync")]
1036    #[cfg_attr(docsrs, doc(alias = "send_blocking"))]
1037    pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> {
1038        crate::future::block_on(self.send(value))
1039    }
1040
1041    /// Checks if the channel has been closed. This happens when the
1042    /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is
1043    /// called.
1044    ///
1045    /// [`Receiver`]: crate::sync::mpsc::Receiver
1046    /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
1047    ///
1048    /// ```
1049    /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(42);
1050    /// assert!(!tx.is_closed());
1051    ///
1052    /// let tx2 = tx.clone();
1053    /// assert!(!tx2.is_closed());
1054    ///
1055    /// drop(rx);
1056    /// assert!(tx.is_closed());
1057    /// assert!(tx2.is_closed());
1058    /// ```
1059    pub fn is_closed(&self) -> bool {
1060        self.chan.is_closed()
1061    }
1062
1063    /// Waits for channel capacity. Once capacity to send one message is
1064    /// available, it is reserved for the caller.
1065    ///
1066    /// If the channel is full, the function waits for the number of unreceived
1067    /// messages to become less than the channel capacity. Capacity to send one
1068    /// message is reserved for the caller. A [`Permit`] is returned to track
1069    /// the reserved capacity. The [`send`] function on [`Permit`] consumes the
1070    /// reserved capacity.
1071    ///
1072    /// Dropping [`Permit`] without sending a message releases the capacity back
1073    /// to the channel.
1074    ///
1075    /// [`Permit`]: Permit
1076    /// [`send`]: Permit::send
1077    ///
1078    /// # Cancel safety
1079    ///
1080    /// This channel uses a queue to ensure that calls to `send` and `reserve`
1081    /// complete in the order they were requested.  Cancelling a call to
1082    /// `reserve` makes you lose your place in the queue.
1083    ///
1084    /// # Examples
1085    ///
1086    /// ```
1087    /// use tokio::sync::mpsc;
1088    ///
1089    /// #[tokio::main]
1090    /// async fn main() {
1091    ///     let (tx, mut rx) = mpsc::channel(1);
1092    ///
1093    ///     // Reserve capacity
1094    ///     let permit = tx.reserve().await.unwrap();
1095    ///
1096    ///     // Trying to send directly on the `tx` will fail due to no
1097    ///     // available capacity.
1098    ///     assert!(tx.try_send(123).is_err());
1099    ///
1100    ///     // Sending on the permit succeeds
1101    ///     permit.send(456);
1102    ///
1103    ///     // The value sent on the permit is received
1104    ///     assert_eq!(rx.recv().await.unwrap(), 456);
1105    /// }
1106    /// ```
1107    pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
1108        self.reserve_inner(1).await?;
1109        Ok(Permit { chan: &self.chan })
1110    }
1111
1112    /// Waits for channel capacity. Once capacity to send `n` messages is
1113    /// available, it is reserved for the caller.
1114    ///
1115    /// If the channel is full or if there are fewer than `n` permits available, the function waits
1116    /// for the number of unreceived messages to become `n` less than the channel capacity.
    /// Capacity to send `n` messages is then reserved for the caller.
1118    ///
1119    /// A [`PermitIterator`] is returned to track the reserved capacity.
1120    /// You can call this [`Iterator`] until it is exhausted to
1121    /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to
1122    /// [`try_reserve_many`] except it awaits for the slots to become available.
1123    ///
1124    /// If the channel is closed, the function returns a [`SendError`].
1125    ///
1126    /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining
1127    /// permits back to the channel.
1128    ///
1129    /// [`PermitIterator`]: PermitIterator
1130    /// [`Permit`]: Permit
1131    /// [`send`]: Permit::send
1132    /// [`try_reserve_many`]: Sender::try_reserve_many
1133    ///
1134    /// # Cancel safety
1135    ///
1136    /// This channel uses a queue to ensure that calls to `send` and `reserve_many`
1137    /// complete in the order they were requested. Cancelling a call to
1138    /// `reserve_many` makes you lose your place in the queue.
1139    ///
1140    /// # Examples
1141    ///
1142    /// ```
1143    /// use tokio::sync::mpsc;
1144    ///
1145    /// #[tokio::main]
1146    /// async fn main() {
1147    ///     let (tx, mut rx) = mpsc::channel(2);
1148    ///
1149    ///     // Reserve capacity
1150    ///     let mut permit = tx.reserve_many(2).await.unwrap();
1151    ///
1152    ///     // Trying to send directly on the `tx` will fail due to no
1153    ///     // available capacity.
1154    ///     assert!(tx.try_send(123).is_err());
1155    ///
1156    ///     // Sending with the permit iterator succeeds
1157    ///     permit.next().unwrap().send(456);
1158    ///     permit.next().unwrap().send(457);
1159    ///
1160    ///     // The iterator should now be exhausted
1161    ///     assert!(permit.next().is_none());
1162    ///
1163    ///     // The value sent on the permit is received
1164    ///     assert_eq!(rx.recv().await.unwrap(), 456);
1165    ///     assert_eq!(rx.recv().await.unwrap(), 457);
1166    /// }
1167    /// ```
1168    pub async fn reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, SendError<()>> {
1169        self.reserve_inner(n).await?;
1170        Ok(PermitIterator {
1171            chan: &self.chan,
1172            n,
1173        })
1174    }
1175
1176    /// Waits for channel capacity, moving the `Sender` and returning an owned
1177    /// permit. Once capacity to send one message is available, it is reserved
1178    /// for the caller.
1179    ///
1180    /// This moves the sender _by value_, and returns an owned permit that can
1181    /// be used to send a message into the channel. Unlike [`Sender::reserve`],
1182    /// this method may be used in cases where the permit must be valid for the
1183    /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
1184    /// essentially a reference count increment, comparable to [`Arc::clone`]),
1185    /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1186    /// moved, it can be cloned prior to calling `reserve_owned`.
1187    ///
1188    /// If the channel is full, the function waits for the number of unreceived
1189    /// messages to become less than the channel capacity. Capacity to send one
1190    /// message is reserved for the caller. An [`OwnedPermit`] is returned to
1191    /// track the reserved capacity. The [`send`] function on [`OwnedPermit`]
1192    /// consumes the reserved capacity.
1193    ///
1194    /// Dropping the [`OwnedPermit`] without sending a message releases the
1195    /// capacity back to the channel.
1196    ///
1197    /// # Cancel safety
1198    ///
1199    /// This channel uses a queue to ensure that calls to `send` and `reserve`
1200    /// complete in the order they were requested.  Cancelling a call to
1201    /// `reserve_owned` makes you lose your place in the queue.
1202    ///
1203    /// # Examples
1204    /// Sending a message using an [`OwnedPermit`]:
1205    /// ```
1206    /// use tokio::sync::mpsc;
1207    ///
1208    /// #[tokio::main]
1209    /// async fn main() {
1210    ///     let (tx, mut rx) = mpsc::channel(1);
1211    ///
1212    ///     // Reserve capacity, moving the sender.
1213    ///     let permit = tx.reserve_owned().await.unwrap();
1214    ///
1215    ///     // Send a message, consuming the permit and returning
1216    ///     // the moved sender.
1217    ///     let tx = permit.send(123);
1218    ///
1219    ///     // The value sent on the permit is received.
1220    ///     assert_eq!(rx.recv().await.unwrap(), 123);
1221    ///
1222    ///     // The sender can now be used again.
1223    ///     tx.send(456).await.unwrap();
1224    /// }
1225    /// ```
1226    ///
1227    /// When multiple [`OwnedPermit`]s are needed, or the sender cannot be moved
1228    /// by value, it can be inexpensively cloned before calling `reserve_owned`:
1229    ///
1230    /// ```
1231    /// use tokio::sync::mpsc;
1232    ///
1233    /// #[tokio::main]
1234    /// async fn main() {
1235    ///     let (tx, mut rx) = mpsc::channel(1);
1236    ///
1237    ///     // Clone the sender and reserve capacity.
1238    ///     let permit = tx.clone().reserve_owned().await.unwrap();
1239    ///
1240    ///     // Trying to send directly on the `tx` will fail due to no
1241    ///     // available capacity.
1242    ///     assert!(tx.try_send(123).is_err());
1243    ///
1244    ///     // Sending on the permit succeeds.
1245    ///     permit.send(456);
1246    ///
1247    ///     // The value sent on the permit is received
1248    ///     assert_eq!(rx.recv().await.unwrap(), 456);
1249    /// }
1250    /// ```
1251    ///
1252    /// [`Sender::reserve`]: Sender::reserve
1253    /// [`OwnedPermit`]: OwnedPermit
1254    /// [`send`]: OwnedPermit::send
1255    /// [`Arc::clone`]: std::sync::Arc::clone
1256    pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
1257        self.reserve_inner(1).await?;
1258        Ok(OwnedPermit {
1259            chan: Some(self.chan),
1260        })
1261    }
1262
1263    async fn reserve_inner(&self, n: usize) -> Result<(), SendError<()>> {
1264        crate::trace::async_trace_leaf().await;
1265
1266        if n > self.max_capacity() {
1267            return Err(SendError(()));
1268        }
1269        match self.chan.semaphore().semaphore.acquire(n).await {
1270            Ok(()) => Ok(()),
1271            Err(_) => Err(SendError(())),
1272        }
1273    }
1274
1275    /// Tries to acquire a slot in the channel without waiting for the slot to become
1276    /// available.
1277    ///
1278    /// If the channel is full this function will return [`TrySendError`], otherwise
1279    /// if there is a slot available it will return a [`Permit`] that will then allow you
1280    /// to [`send`] on the channel with a guaranteed slot. This function is similar to
1281    /// [`reserve`] except it does not await for the slot to become available.
1282    ///
1283    /// Dropping [`Permit`] without sending a message releases the capacity back
1284    /// to the channel.
1285    ///
1286    /// [`Permit`]: Permit
1287    /// [`send`]: Permit::send
1288    /// [`reserve`]: Sender::reserve
1289    ///
1290    /// # Examples
1291    ///
1292    /// ```
1293    /// use tokio::sync::mpsc;
1294    ///
1295    /// #[tokio::main]
1296    /// async fn main() {
1297    ///     let (tx, mut rx) = mpsc::channel(1);
1298    ///
1299    ///     // Reserve capacity
1300    ///     let permit = tx.try_reserve().unwrap();
1301    ///
1302    ///     // Trying to send directly on the `tx` will fail due to no
1303    ///     // available capacity.
1304    ///     assert!(tx.try_send(123).is_err());
1305    ///
1306    ///     // Trying to reserve an additional slot on the `tx` will
1307    ///     // fail because there is no capacity.
1308    ///     assert!(tx.try_reserve().is_err());
1309    ///
1310    ///     // Sending on the permit succeeds
1311    ///     permit.send(456);
1312    ///
1313    ///     // The value sent on the permit is received
1314    ///     assert_eq!(rx.recv().await.unwrap(), 456);
1315    ///
1316    /// }
1317    /// ```
1318    pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
1319        match self.chan.semaphore().semaphore.try_acquire(1) {
1320            Ok(()) => {}
1321            Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
1322            Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1323        }
1324
1325        Ok(Permit { chan: &self.chan })
1326    }
1327
    /// Tries to acquire `n` slots in the channel without waiting for the slots to become
    /// available.
1330    ///
1331    /// A [`PermitIterator`] is returned to track the reserved capacity.
1332    /// You can call this [`Iterator`] until it is exhausted to
1333    /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to
1334    /// [`reserve_many`] except it does not await for the slots to become available.
1335    ///
1336    /// If there are fewer than `n` permits available on the channel, then
1337    /// this function will return a [`TrySendError::Full`]. If the channel is closed
1338    /// this function will return a [`TrySendError::Closed`].
1339    ///
1340    /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining
1341    /// permits back to the channel.
1342    ///
1343    /// [`PermitIterator`]: PermitIterator
1344    /// [`send`]: Permit::send
1345    /// [`reserve_many`]: Sender::reserve_many
1346    ///
1347    /// # Examples
1348    ///
1349    /// ```
1350    /// use tokio::sync::mpsc;
1351    ///
1352    /// #[tokio::main]
1353    /// async fn main() {
1354    ///     let (tx, mut rx) = mpsc::channel(2);
1355    ///
1356    ///     // Reserve capacity
1357    ///     let mut permit = tx.try_reserve_many(2).unwrap();
1358    ///
1359    ///     // Trying to send directly on the `tx` will fail due to no
1360    ///     // available capacity.
1361    ///     assert!(tx.try_send(123).is_err());
1362    ///
1363    ///     // Trying to reserve an additional slot on the `tx` will
1364    ///     // fail because there is no capacity.
1365    ///     assert!(tx.try_reserve().is_err());
1366    ///
1367    ///     // Sending with the permit iterator succeeds
1368    ///     permit.next().unwrap().send(456);
1369    ///     permit.next().unwrap().send(457);
1370    ///
1371    ///     // The iterator should now be exhausted
1372    ///     assert!(permit.next().is_none());
1373    ///
1374    ///     // The value sent on the permit is received
1375    ///     assert_eq!(rx.recv().await.unwrap(), 456);
1376    ///     assert_eq!(rx.recv().await.unwrap(), 457);
1377    ///
1378    ///     // Trying to call try_reserve_many with 0 will return an empty iterator
1379    ///     let mut permit = tx.try_reserve_many(0).unwrap();
1380    ///     assert!(permit.next().is_none());
1381    ///
1382    ///     // Trying to call try_reserve_many with a number greater than the channel
1383    ///     // capacity will return an error
1384    ///     let permit = tx.try_reserve_many(3);
1385    ///     assert!(permit.is_err());
1386    ///
1387    ///     // Trying to call try_reserve_many on a closed channel will return an error
1388    ///     drop(rx);
1389    ///     let permit = tx.try_reserve_many(1);
1390    ///     assert!(permit.is_err());
1391    ///
1392    ///     let permit = tx.try_reserve_many(0);
1393    ///     assert!(permit.is_err());
1394    /// }
1395    /// ```
1396    pub fn try_reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, TrySendError<()>> {
1397        if n > self.max_capacity() {
1398            return Err(TrySendError::Full(()));
1399        }
1400
1401        match self.chan.semaphore().semaphore.try_acquire(n) {
1402            Ok(()) => {}
1403            Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
1404            Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1405        }
1406
1407        Ok(PermitIterator {
1408            chan: &self.chan,
1409            n,
1410        })
1411    }
1412
1413    /// Tries to acquire a slot in the channel without waiting for the slot to become
1414    /// available, returning an owned permit.
1415    ///
1416    /// This moves the sender _by value_, and returns an owned permit that can
1417    /// be used to send a message into the channel. Unlike [`Sender::try_reserve`],
1418    /// this method may be used in cases where the permit must be valid for the
1419    /// `'static` lifetime.  `Sender`s may be cloned cheaply (`Sender::clone` is
1420    /// essentially a reference count increment, comparable to [`Arc::clone`]),
1421    /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1422    /// moved, it can be cloned prior to calling `try_reserve_owned`.
1423    ///
1424    /// If the channel is full this function will return a [`TrySendError`].
1425    /// Since the sender is taken by value, the `TrySendError` returned in this
1426    /// case contains the sender, so that it may be used again. Otherwise, if
1427    /// there is a slot available, this method will return an [`OwnedPermit`]
1428    /// that can then be used to [`send`] on the channel with a guaranteed slot.
1429    /// This function is similar to  [`reserve_owned`] except it does not await
1430    /// for the slot to become available.
1431    ///
1432    /// Dropping the [`OwnedPermit`] without sending a message releases the capacity back
1433    /// to the channel.
1434    ///
1435    /// [`OwnedPermit`]: OwnedPermit
1436    /// [`send`]: OwnedPermit::send
1437    /// [`reserve_owned`]: Sender::reserve_owned
1438    /// [`Arc::clone`]: std::sync::Arc::clone
1439    ///
1440    /// # Examples
1441    ///
1442    /// ```
1443    /// use tokio::sync::mpsc;
1444    ///
1445    /// #[tokio::main]
1446    /// async fn main() {
1447    ///     let (tx, mut rx) = mpsc::channel(1);
1448    ///
1449    ///     // Reserve capacity
1450    ///     let permit = tx.clone().try_reserve_owned().unwrap();
1451    ///
1452    ///     // Trying to send directly on the `tx` will fail due to no
1453    ///     // available capacity.
1454    ///     assert!(tx.try_send(123).is_err());
1455    ///
1456    ///     // Trying to reserve an additional slot on the `tx` will
1457    ///     // fail because there is no capacity.
1458    ///     assert!(tx.try_reserve().is_err());
1459    ///
1460    ///     // Sending on the permit succeeds
1461    ///     permit.send(456);
1462    ///
1463    ///     // The value sent on the permit is received
1464    ///     assert_eq!(rx.recv().await.unwrap(), 456);
1465    ///
1466    /// }
1467    /// ```
1468    pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
1469        match self.chan.semaphore().semaphore.try_acquire(1) {
1470            Ok(()) => {}
1471            Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(self)),
1472            Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(self)),
1473        }
1474
1475        Ok(OwnedPermit {
1476            chan: Some(self.chan),
1477        })
1478    }
1479
1480    /// Returns `true` if senders belong to the same channel.
1481    ///
1482    /// # Examples
1483    ///
1484    /// ```
1485    /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
1486    /// let  tx2 = tx.clone();
1487    /// assert!(tx.same_channel(&tx2));
1488    ///
1489    /// let (tx3, rx3) = tokio::sync::mpsc::channel::<()>(1);
1490    /// assert!(!tx3.same_channel(&tx2));
1491    /// ```
1492    pub fn same_channel(&self, other: &Self) -> bool {
1493        self.chan.same_channel(&other.chan)
1494    }
1495
1496    /// Returns the current capacity of the channel.
1497    ///
1498    /// The capacity goes down when sending a value by calling [`send`] or by reserving capacity
1499    /// with [`reserve`]. The capacity goes up when values are received by the [`Receiver`].
    /// This is distinct from [`max_capacity`], which always returns the buffer capacity
    /// initially specified when calling [`channel`].
1502    ///
1503    /// # Examples
1504    ///
1505    /// ```
1506    /// use tokio::sync::mpsc;
1507    ///
1508    /// #[tokio::main]
1509    /// async fn main() {
1510    ///     let (tx, mut rx) = mpsc::channel::<()>(5);
1511    ///
1512    ///     assert_eq!(tx.capacity(), 5);
1513    ///
1514    ///     // Making a reservation drops the capacity by one.
1515    ///     let permit = tx.reserve().await.unwrap();
1516    ///     assert_eq!(tx.capacity(), 4);
1517    ///
1518    ///     // Sending and receiving a value increases the capacity by one.
1519    ///     permit.send(());
1520    ///     rx.recv().await.unwrap();
1521    ///     assert_eq!(tx.capacity(), 5);
1522    /// }
1523    /// ```
1524    ///
1525    /// [`send`]: Sender::send
1526    /// [`reserve`]: Sender::reserve
1527    /// [`channel`]: channel
1528    /// [`max_capacity`]: Sender::max_capacity
1529    pub fn capacity(&self) -> usize {
1530        self.chan.semaphore().semaphore.available_permits()
1531    }
1532
1533    /// Converts the `Sender` to a [`WeakSender`] that does not count
1534    /// towards RAII semantics, i.e. if all `Sender` instances of the
1535    /// channel were dropped and only `WeakSender` instances remain,
1536    /// the channel is closed.
1537    #[must_use = "Downgrade creates a WeakSender without destroying the original non-weak sender."]
1538    pub fn downgrade(&self) -> WeakSender<T> {
1539        WeakSender {
1540            chan: self.chan.downgrade(),
1541        }
1542    }
1543
1544    /// Returns the maximum buffer capacity of the channel.
1545    ///
1546    /// The maximum capacity is the buffer capacity initially specified when calling
1547    /// [`channel`]. This is distinct from [`capacity`], which returns the *current*
1548    /// available buffer capacity: as messages are sent and received, the
1549    /// value returned by [`capacity`] will go up or down, whereas the value
1550    /// returned by [`max_capacity`] will remain constant.
1551    ///
1552    /// # Examples
1553    ///
1554    /// ```
1555    /// use tokio::sync::mpsc;
1556    ///
1557    /// #[tokio::main]
1558    /// async fn main() {
1559    ///     let (tx, _rx) = mpsc::channel::<()>(5);
1560    ///
1561    ///     // both max capacity and capacity are the same at first
1562    ///     assert_eq!(tx.max_capacity(), 5);
1563    ///     assert_eq!(tx.capacity(), 5);
1564    ///
1565    ///     // Making a reservation doesn't change the max capacity.
1566    ///     let permit = tx.reserve().await.unwrap();
1567    ///     assert_eq!(tx.max_capacity(), 5);
1568    ///     // but drops the capacity by one
1569    ///     assert_eq!(tx.capacity(), 4);
1570    /// }
1571    /// ```
1572    ///
1573    /// [`channel`]: channel
1574    /// [`max_capacity`]: Sender::max_capacity
1575    /// [`capacity`]: Sender::capacity
1576    pub fn max_capacity(&self) -> usize {
1577        self.chan.semaphore().bound
1578    }
1579
1580    /// Returns the number of [`Sender`] handles.
1581    pub fn strong_count(&self) -> usize {
1582        self.chan.strong_count()
1583    }
1584
1585    /// Returns the number of [`WeakSender`] handles.
1586    pub fn weak_count(&self) -> usize {
1587        self.chan.weak_count()
1588    }
1589}
1590
1591impl<T> Clone for Sender<T> {
1592    fn clone(&self) -> Self {
1593        Sender {
1594            chan: self.chan.clone(),
1595        }
1596    }
1597}
1598
1599impl<T> fmt::Debug for Sender<T> {
1600    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1601        fmt.debug_struct("Sender")
1602            .field("chan", &self.chan)
1603            .finish()
1604    }
1605}
1606
1607impl<T> Clone for WeakSender<T> {
1608    fn clone(&self) -> Self {
1609        self.chan.increment_weak_count();
1610
1611        WeakSender {
1612            chan: self.chan.clone(),
1613        }
1614    }
1615}
1616
1617impl<T> Drop for WeakSender<T> {
1618    fn drop(&mut self) {
1619        self.chan.decrement_weak_count();
1620    }
1621}
1622
1623impl<T> WeakSender<T> {
1624    /// Tries to convert a `WeakSender` into a [`Sender`]. This will return `Some`
1625    /// if there are other `Sender` instances alive and the channel wasn't
1626    /// previously dropped, otherwise `None` is returned.
1627    pub fn upgrade(&self) -> Option<Sender<T>> {
1628        chan::Tx::upgrade(self.chan.clone()).map(Sender::new)
1629    }
1630
1631    /// Returns the number of [`Sender`] handles.
1632    pub fn strong_count(&self) -> usize {
1633        self.chan.strong_count()
1634    }
1635
1636    /// Returns the number of [`WeakSender`] handles.
1637    pub fn weak_count(&self) -> usize {
1638        self.chan.weak_count()
1639    }
1640}
1641
1642impl<T> fmt::Debug for WeakSender<T> {
1643    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1644        fmt.debug_struct("WeakSender").finish()
1645    }
1646}
1647
1648// ===== impl Permit =====
1649
1650impl<T> Permit<'_, T> {
1651    /// Sends a value using the reserved capacity.
1652    ///
1653    /// Capacity for the message has already been reserved. The message is sent
1654    /// to the receiver and the permit is consumed. The operation will succeed
1655    /// even if the receiver half has been closed. See [`Receiver::close`] for
1656    /// more details on performing a clean shutdown.
1657    ///
1658    /// [`Receiver::close`]: Receiver::close
1659    ///
1660    /// # Examples
1661    ///
1662    /// ```
1663    /// use tokio::sync::mpsc;
1664    ///
1665    /// #[tokio::main]
1666    /// async fn main() {
1667    ///     let (tx, mut rx) = mpsc::channel(1);
1668    ///
1669    ///     // Reserve capacity
1670    ///     let permit = tx.reserve().await.unwrap();
1671    ///
1672    ///     // Trying to send directly on the `tx` will fail due to no
1673    ///     // available capacity.
1674    ///     assert!(tx.try_send(123).is_err());
1675    ///
1676    ///     // Send a message on the permit
1677    ///     permit.send(456);
1678    ///
1679    ///     // The value sent on the permit is received
1680    ///     assert_eq!(rx.recv().await.unwrap(), 456);
1681    /// }
1682    /// ```
1683    pub fn send(self, value: T) {
1684        use std::mem;
1685
1686        self.chan.send(value);
1687
1688        // Avoid the drop logic
1689        mem::forget(self);
1690    }
1691}
1692
1693impl<T> Drop for Permit<'_, T> {
1694    fn drop(&mut self) {
1695        use chan::Semaphore;
1696
1697        let semaphore = self.chan.semaphore();
1698
1699        // Add the permit back to the semaphore
1700        semaphore.add_permit();
1701
1702        // If this is the last sender for this channel, wake the receiver so
1703        // that it can be notified that the channel is closed.
1704        if semaphore.is_closed() && semaphore.is_idle() {
1705            self.chan.wake_rx();
1706        }
1707    }
1708}
1709
1710impl<T> fmt::Debug for Permit<'_, T> {
1711    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1712        fmt.debug_struct("Permit")
1713            .field("chan", &self.chan)
1714            .finish()
1715    }
1716}
1717
1718// ===== impl PermitIterator =====
1719
1720impl<'a, T> Iterator for PermitIterator<'a, T> {
1721    type Item = Permit<'a, T>;
1722
1723    fn next(&mut self) -> Option<Self::Item> {
1724        if self.n == 0 {
1725            return None;
1726        }
1727
1728        self.n -= 1;
1729        Some(Permit { chan: self.chan })
1730    }
1731
1732    fn size_hint(&self) -> (usize, Option<usize>) {
1733        let n = self.n;
1734        (n, Some(n))
1735    }
1736}
1737impl<T> ExactSizeIterator for PermitIterator<'_, T> {}
1738impl<T> std::iter::FusedIterator for PermitIterator<'_, T> {}
1739
1740impl<T> Drop for PermitIterator<'_, T> {
1741    fn drop(&mut self) {
1742        use chan::Semaphore;
1743
1744        if self.n == 0 {
1745            return;
1746        }
1747
1748        let semaphore = self.chan.semaphore();
1749
1750        // Add the remaining permits back to the semaphore
1751        semaphore.add_permits(self.n);
1752
1753        // If this is the last sender for this channel, wake the receiver so
1754        // that it can be notified that the channel is closed.
1755        if semaphore.is_closed() && semaphore.is_idle() {
1756            self.chan.wake_rx();
1757        }
1758    }
1759}
1760
1761impl<T> fmt::Debug for PermitIterator<'_, T> {
1762    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1763        fmt.debug_struct("PermitIterator")
1764            .field("chan", &self.chan)
1765            .field("capacity", &self.n)
1766            .finish()
1767    }
1768}
1769
// ===== impl OwnedPermit =====
1771
1772impl<T> OwnedPermit<T> {
1773    /// Sends a value using the reserved capacity.
1774    ///
1775    /// Capacity for the message has already been reserved. The message is sent
1776    /// to the receiver and the permit is consumed. The operation will succeed
1777    /// even if the receiver half has been closed. See [`Receiver::close`] for
1778    /// more details on performing a clean shutdown.
1779    ///
1780    /// Unlike [`Permit::send`], this method returns the [`Sender`] from which
1781    /// the `OwnedPermit` was reserved.
1782    ///
1783    /// [`Receiver::close`]: Receiver::close
1784    ///
1785    /// # Examples
1786    ///
1787    /// ```
1788    /// use tokio::sync::mpsc;
1789    ///
1790    /// #[tokio::main]
1791    /// async fn main() {
1792    ///     let (tx, mut rx) = mpsc::channel(1);
1793    ///
1794    ///     // Reserve capacity
1795    ///     let permit = tx.reserve_owned().await.unwrap();
1796    ///
1797    ///     // Send a message on the permit, returning the sender.
1798    ///     let tx = permit.send(456);
1799    ///
1800    ///     // The value sent on the permit is received
1801    ///     assert_eq!(rx.recv().await.unwrap(), 456);
1802    ///
1803    ///     // We may now reuse `tx` to send another message.
1804    ///     tx.send(789).await.unwrap();
1805    /// }
1806    /// ```
1807    pub fn send(mut self, value: T) -> Sender<T> {
1808        let chan = self.chan.take().unwrap_or_else(|| {
1809            unreachable!("OwnedPermit channel is only taken when the permit is moved")
1810        });
1811        chan.send(value);
1812
1813        Sender { chan }
1814    }
1815
1816    /// Releases the reserved capacity *without* sending a message, returning the
1817    /// [`Sender`].
1818    ///
1819    /// # Examples
1820    ///
1821    /// ```
1822    /// use tokio::sync::mpsc;
1823    ///
1824    /// #[tokio::main]
1825    /// async fn main() {
1826    ///     let (tx, rx) = mpsc::channel(1);
1827    ///
1828    ///     // Clone the sender and reserve capacity
1829    ///     let permit = tx.clone().reserve_owned().await.unwrap();
1830    ///
1831    ///     // Trying to send on the original `tx` will fail, since the `permit`
1832    ///     // has reserved all the available capacity.
1833    ///     assert!(tx.try_send(123).is_err());
1834    ///
1835    ///     // Release the permit without sending a message, returning the clone
1836    ///     // of the sender.
1837    ///     let tx2 = permit.release();
1838    ///
1839    ///     // We may now reuse `tx` to send another message.
1840    ///     tx.send(789).await.unwrap();
1841    ///     # drop(rx); drop(tx2);
1842    /// }
1843    /// ```
1844    ///
1845    /// [`Sender`]: Sender
1846    pub fn release(mut self) -> Sender<T> {
1847        use chan::Semaphore;
1848
1849        let chan = self.chan.take().unwrap_or_else(|| {
1850            unreachable!("OwnedPermit channel is only taken when the permit is moved")
1851        });
1852
1853        // Add the permit back to the semaphore
1854        chan.semaphore().add_permit();
1855        Sender { chan }
1856    }
1857}
1858
1859impl<T> Drop for OwnedPermit<T> {
1860    fn drop(&mut self) {
1861        use chan::Semaphore;
1862
1863        // Are we still holding onto the sender?
1864        if let Some(chan) = self.chan.take() {
1865            let semaphore = chan.semaphore();
1866
1867            // Add the permit back to the semaphore
1868            semaphore.add_permit();
1869
1870            // If this `OwnedPermit` is holding the last sender for this
1871            // channel, wake the receiver so that it can be notified that the
1872            // channel is closed.
1873            if semaphore.is_closed() && semaphore.is_idle() {
1874                chan.wake_rx();
1875            }
1876        }
1877
1878        // Otherwise, do nothing.
1879    }
1880}
1881
1882impl<T> fmt::Debug for OwnedPermit<T> {
1883    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1884        fmt.debug_struct("OwnedPermit")
1885            .field("chan", &self.chan)
1886            .finish()
1887    }
1888}