tokio/sync/mpsc/bounded.rs
1use crate::loom::sync::Arc;
2use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
3use crate::sync::mpsc::chan;
4use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
5
6cfg_time! {
7 use crate::sync::mpsc::error::SendTimeoutError;
8 use crate::time::Duration;
9}
10
11use std::fmt;
12use std::task::{Context, Poll};
13
/// Sends values to the associated `Receiver`.
///
/// Instances are created by the [`channel`] function.
///
/// To convert the `Sender` into a `Sink` or use it in a poll function, you can
/// use the [`PollSender`] utility.
///
/// [`PollSender`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSender.html
pub struct Sender<T> {
    /// The transmit half of the underlying channel, parameterized by the
    /// bounded-channel `Semaphore`.
    chan: chan::Tx<T, Semaphore>,
}
25
/// A sender that does not prevent the channel from being closed.
///
/// If all [`Sender`] instances of a channel were dropped and only `WeakSender`
/// instances remain, the channel is closed.
///
/// In order to send messages, the `WeakSender` needs to be upgraded using
/// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None`
/// if all `Sender`s have been dropped, and otherwise it returns a `Sender`.
///
/// [`Sender`]: Sender
/// [`WeakSender::upgrade`]: WeakSender::upgrade
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc::channel;
///
/// #[tokio::main]
/// async fn main() {
///     let (tx, _rx) = channel::<i32>(15);
///     let tx_weak = tx.downgrade();
///
///     // Upgrading will succeed because `tx` still exists.
///     assert!(tx_weak.upgrade().is_some());
///
///     // If we drop `tx`, then it will fail.
///     drop(tx);
///     assert!(tx_weak.clone().upgrade().is_none());
/// }
/// ```
pub struct WeakSender<T> {
    /// Shared channel state, held directly through an `Arc` rather than
    /// through a `chan::Tx` handle as `Sender` does.
    chan: Arc<chan::Chan<T, Semaphore>>,
}
59
/// Permits to send one value into the channel.
///
/// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`]
/// and are used to guarantee channel capacity before generating a message to send.
///
/// [`Sender::reserve()`]: Sender::reserve
/// [`Sender::try_reserve()`]: Sender::try_reserve
pub struct Permit<'a, T> {
    /// Borrowed transmit half of the channel; the `'a` lifetime ties the
    /// permit to that borrow.
    chan: &'a chan::Tx<T, Semaphore>,
}
70
/// An [`Iterator`] of [`Permit`] that can be used to hold `n` slots in the channel.
///
/// `PermitIterator` values are returned by [`Sender::reserve_many()`] and [`Sender::try_reserve_many()`]
/// and are used to guarantee channel capacity before generating `n` messages to send.
///
/// [`Sender::reserve_many()`]: Sender::reserve_many
/// [`Sender::try_reserve_many()`]: Sender::try_reserve_many
pub struct PermitIterator<'a, T> {
    /// Borrowed transmit half of the channel; the `'a` lifetime ties the
    /// iterator to that borrow.
    chan: &'a chan::Tx<T, Semaphore>,
    /// Number of slots held by this iterator.
    // NOTE(review): presumably the count of permits not yet yielded — confirm
    // against the `Iterator` and `Drop` implementations.
    n: usize,
}
82
/// Owned permit to send one value into the channel.
///
/// This is identical to the [`Permit`] type, except that it moves the sender
/// rather than borrowing it.
///
/// `OwnedPermit` values are returned by [`Sender::reserve_owned()`] and
/// [`Sender::try_reserve_owned()`] and are used to guarantee channel capacity
/// before generating a message to send.
///
/// [`Permit`]: Permit
/// [`Sender::reserve_owned()`]: Sender::reserve_owned
/// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned
pub struct OwnedPermit<T> {
    /// Owned transmit half of the channel.
    // NOTE(review): presumably wrapped in `Option` so the handle can be moved
    // back out when the permit is consumed — confirm against the methods that
    // take the permit by value.
    chan: Option<chan::Tx<T, Semaphore>>,
}
98
/// Receives values from the associated `Sender`.
///
/// Instances are created by the [`channel`] function.
///
/// This receiver can be turned into a `Stream` using [`ReceiverStream`].
///
/// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html
pub struct Receiver<T> {
    /// The receive half of the underlying channel, parameterized by the
    /// bounded-channel `Semaphore`.
    chan: chan::Rx<T, Semaphore>,
}
110
111/// Creates a bounded mpsc channel for communicating between asynchronous tasks
112/// with backpressure.
113///
114/// The channel will buffer up to the provided number of messages. Once the
115/// buffer is full, attempts to send new messages will wait until a message is
116/// received from the channel. The provided buffer capacity must be at least 1.
117///
118/// All data sent on `Sender` will become available on `Receiver` in the same
119/// order as it was sent.
120///
121/// The `Sender` can be cloned to `send` to the same channel from multiple code
122/// locations. Only one `Receiver` is supported.
123///
124/// If the `Receiver` is disconnected while trying to `send`, the `send` method
125/// will return a `SendError`. Similarly, if `Sender` is disconnected while
126/// trying to `recv`, the `recv` method will return `None`.
127///
128/// # Panics
129///
130/// Panics if the buffer capacity is 0.
131///
132/// # Examples
133///
134/// ```rust
135/// use tokio::sync::mpsc;
136///
137/// #[tokio::main]
138/// async fn main() {
139/// let (tx, mut rx) = mpsc::channel(100);
140///
141/// tokio::spawn(async move {
142/// for i in 0..10 {
143/// if let Err(_) = tx.send(i).await {
144/// println!("receiver dropped");
145/// return;
146/// }
147/// }
148/// });
149///
150/// while let Some(i) = rx.recv().await {
151/// println!("got = {}", i);
152/// }
153/// }
154/// ```
155#[track_caller]
156pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
157 assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
158 let semaphore = Semaphore {
159 semaphore: semaphore::Semaphore::new(buffer),
160 bound: buffer,
161 };
162 let (tx, rx) = chan::channel(semaphore);
163
164 let tx = Sender::new(tx);
165 let rx = Receiver::new(rx);
166
167 (tx, rx)
168}
169
/// Channel semaphore: the semaphore implementation paired with a `usize`
/// recording the channel bound.
#[derive(Debug)]
pub(crate) struct Semaphore {
    /// The underlying semaphore; `channel` creates it with `bound` permits.
    pub(crate) semaphore: semaphore::Semaphore,
    /// The buffer size passed to `channel` when the channel was created.
    pub(crate) bound: usize,
}
177
178impl<T> Receiver<T> {
179 pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
180 Receiver { chan }
181 }
182
183 /// Receives the next value for this receiver.
184 ///
185 /// This method returns `None` if the channel has been closed and there are
186 /// no remaining messages in the channel's buffer. This indicates that no
187 /// further values can ever be received from this `Receiver`. The channel is
188 /// closed when all senders have been dropped, or when [`close`] is called.
189 ///
190 /// If there are no messages in the channel's buffer, but the channel has
191 /// not yet been closed, this method will sleep until a message is sent or
192 /// the channel is closed. Note that if [`close`] is called, but there are
193 /// still outstanding [`Permits`] from before it was closed, the channel is
194 /// not considered closed by `recv` until the permits are released.
195 ///
196 /// # Cancel safety
197 ///
198 /// This method is cancel safe. If `recv` is used as the event in a
199 /// [`tokio::select!`](crate::select) statement and some other branch
200 /// completes first, it is guaranteed that no messages were received on this
201 /// channel.
202 ///
203 /// [`close`]: Self::close
204 /// [`Permits`]: struct@crate::sync::mpsc::Permit
205 ///
206 /// # Examples
207 ///
208 /// ```
209 /// use tokio::sync::mpsc;
210 ///
211 /// #[tokio::main]
212 /// async fn main() {
213 /// let (tx, mut rx) = mpsc::channel(100);
214 ///
215 /// tokio::spawn(async move {
216 /// tx.send("hello").await.unwrap();
217 /// });
218 ///
219 /// assert_eq!(Some("hello"), rx.recv().await);
220 /// assert_eq!(None, rx.recv().await);
221 /// }
222 /// ```
223 ///
224 /// Values are buffered:
225 ///
226 /// ```
227 /// use tokio::sync::mpsc;
228 ///
229 /// #[tokio::main]
230 /// async fn main() {
231 /// let (tx, mut rx) = mpsc::channel(100);
232 ///
233 /// tx.send("hello").await.unwrap();
234 /// tx.send("world").await.unwrap();
235 ///
236 /// assert_eq!(Some("hello"), rx.recv().await);
237 /// assert_eq!(Some("world"), rx.recv().await);
238 /// }
239 /// ```
240 pub async fn recv(&mut self) -> Option<T> {
241 use std::future::poll_fn;
242 poll_fn(|cx| self.chan.recv(cx)).await
243 }
244
245 /// Receives the next values for this receiver and extends `buffer`.
246 ///
247 /// This method extends `buffer` by no more than a fixed number of values
248 /// as specified by `limit`. If `limit` is zero, the function immediately
249 /// returns `0`. The return value is the number of values added to `buffer`.
250 ///
251 /// For `limit > 0`, if there are no messages in the channel's queue, but
252 /// the channel has not yet been closed, this method will sleep until a
253 /// message is sent or the channel is closed. Note that if [`close`] is
254 /// called, but there are still outstanding [`Permits`] from before it was
255 /// closed, the channel is not considered closed by `recv_many` until the
256 /// permits are released.
257 ///
258 /// For non-zero values of `limit`, this method will never return `0` unless
259 /// the channel has been closed and there are no remaining messages in the
260 /// channel's queue. This indicates that no further values can ever be
261 /// received from this `Receiver`. The channel is closed when all senders
262 /// have been dropped, or when [`close`] is called.
263 ///
264 /// The capacity of `buffer` is increased as needed.
265 ///
266 /// # Cancel safety
267 ///
268 /// This method is cancel safe. If `recv_many` is used as the event in a
269 /// [`tokio::select!`](crate::select) statement and some other branch
270 /// completes first, it is guaranteed that no messages were received on this
271 /// channel.
272 ///
273 /// [`close`]: Self::close
274 /// [`Permits`]: struct@crate::sync::mpsc::Permit
275 ///
276 /// # Examples
277 ///
278 /// ```
279 /// use tokio::sync::mpsc;
280 ///
281 /// #[tokio::main]
282 /// async fn main() {
283 /// let mut buffer: Vec<&str> = Vec::with_capacity(2);
284 /// let limit = 2;
285 /// let (tx, mut rx) = mpsc::channel(100);
286 /// let tx2 = tx.clone();
287 /// tx2.send("first").await.unwrap();
288 /// tx2.send("second").await.unwrap();
289 /// tx2.send("third").await.unwrap();
290 ///
291 /// // Call `recv_many` to receive up to `limit` (2) values.
292 /// assert_eq!(2, rx.recv_many(&mut buffer, limit).await);
293 /// assert_eq!(vec!["first", "second"], buffer);
294 ///
295 /// // If the buffer is full, the next call to `recv_many`
296 /// // reserves additional capacity.
297 /// assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
298 ///
299 /// tokio::spawn(async move {
300 /// tx.send("fourth").await.unwrap();
301 /// });
302 ///
303 /// // 'tx' is dropped, but `recv_many`
304 /// // is guaranteed not to return 0 as the channel
305 /// // is not yet closed.
306 /// assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
307 /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
308 ///
309 /// // Once the last sender is dropped, the channel is
310 /// // closed and `recv_many` returns 0, capacity unchanged.
311 /// drop(tx2);
312 /// assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
313 /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
314 /// }
315 /// ```
316 pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
317 use std::future::poll_fn;
318 poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await
319 }
320
    /// Tries to receive the next value for this receiver.
    ///
    /// This method never waits; it delegates to the inner channel's
    /// `try_recv` and returns its result.
    ///
    /// # Errors
    ///
    /// This method returns the [`Empty`] error if the channel is currently
    /// empty, but there are still outstanding [senders] or [permits].
    ///
    /// This method returns the [`Disconnected`] error if the channel is
    /// currently empty, and there are no outstanding [senders] or [permits].
    ///
    /// Unlike the [`poll_recv`] method, this method will never return an
    /// [`Empty`] error spuriously.
    ///
    /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
    /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
    /// [`poll_recv`]: Self::poll_recv
    /// [senders]: crate::sync::mpsc::Sender
    /// [permits]: crate::sync::mpsc::Permit
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::mpsc;
    /// use tokio::sync::mpsc::error::TryRecvError;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, mut rx) = mpsc::channel(100);
    ///
    ///     tx.send("hello").await.unwrap();
    ///
    ///     assert_eq!(Ok("hello"), rx.try_recv());
    ///     assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
    ///
    ///     tx.send("hello").await.unwrap();
    ///     // Drop the last sender, closing the channel.
    ///     drop(tx);
    ///
    ///     assert_eq!(Ok("hello"), rx.try_recv());
    ///     assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
    /// }
    /// ```
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        self.chan.try_recv()
    }
364
    /// Blocking receive to call outside of asynchronous contexts.
    ///
    /// This method returns `None` if the channel has been closed and there are
    /// no remaining messages in the channel's buffer. This indicates that no
    /// further values can ever be received from this `Receiver`. The channel is
    /// closed when all senders have been dropped, or when [`close`] is called.
    ///
    /// If there are no messages in the channel's buffer, but the channel has
    /// not yet been closed, this method will block until a message is sent or
    /// the channel is closed.
    ///
    /// This method is intended for use cases where you are sending from
    /// asynchronous code to synchronous code, and will work even if the sender
    /// is not using [`blocking_send`] to send the message.
    ///
    /// Note that if [`close`] is called, but there are still outstanding
    /// [`Permits`] from before it was closed, the channel is not considered
    /// closed by `blocking_recv` until the permits are released.
    ///
    /// [`close`]: Self::close
    /// [`Permits`]: struct@crate::sync::mpsc::Permit
    /// [`blocking_send`]: fn@crate::sync::mpsc::Sender::blocking_send
    ///
    /// # Panics
    ///
    /// This function panics if called within an asynchronous execution
    /// context.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::thread;
    /// use tokio::runtime::Runtime;
    /// use tokio::sync::mpsc;
    ///
    /// fn main() {
    ///     let (tx, mut rx) = mpsc::channel::<u8>(10);
    ///
    ///     let sync_code = thread::spawn(move || {
    ///         assert_eq!(Some(10), rx.blocking_recv());
    ///     });
    ///
    ///     Runtime::new()
    ///         .unwrap()
    ///         .block_on(async move {
    ///             let _ = tx.send(10).await;
    ///         });
    ///     sync_code.join().unwrap()
    /// }
    /// ```
    #[track_caller]
    #[cfg(feature = "sync")]
    #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
    pub fn blocking_recv(&mut self) -> Option<T> {
        // Drive the async `recv` future to completion on the current thread.
        crate::future::block_on(self.recv())
    }
421
    /// Variant of [`Self::recv_many`] for blocking contexts.
    ///
    /// Blocks the current thread until the [`Self::recv_many`] future
    /// completes, and returns its result: the number of values appended to
    /// `buffer`, which is at most `limit`.
    ///
    /// The same conditions as in [`Self::blocking_recv`] apply.
    #[track_caller]
    #[cfg(feature = "sync")]
    #[cfg_attr(docsrs, doc(alias = "recv_many_blocking"))]
    pub fn blocking_recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
        // Drive the async `recv_many` future to completion on the current thread.
        crate::future::block_on(self.recv_many(buffer, limit))
    }
431
    /// Closes the receiving half of a channel without dropping it.
    ///
    /// This prevents any further messages from being sent on the channel while
    /// still enabling the receiver to drain messages that are buffered. Any
    /// outstanding [`Permit`] values will still be able to send messages.
    ///
    /// To guarantee that no messages are dropped, after calling `close()`,
    /// `recv()` must be called until `None` is returned. If there are
    /// outstanding [`Permit`] or [`OwnedPermit`] values, the `recv` method will
    /// not return `None` until those are released.
    ///
    /// [`Permit`]: Permit
    /// [`OwnedPermit`]: OwnedPermit
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::mpsc;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, mut rx) = mpsc::channel(20);
    ///
    ///     tokio::spawn(async move {
    ///         let mut i = 0;
    ///         while let Ok(permit) = tx.reserve().await {
    ///             permit.send(i);
    ///             i += 1;
    ///         }
    ///     });
    ///
    ///     rx.close();
    ///
    ///     while let Some(msg) = rx.recv().await {
    ///         println!("got {}", msg);
    ///     }
    ///
    ///     // Channel closed and no messages are lost.
    /// }
    /// ```
    pub fn close(&mut self) {
        // Closing is handled entirely by the inner channel.
        self.chan.close();
    }
475
    /// Checks if a channel is closed.
    ///
    /// This method returns `true` if the channel has been closed. The channel is closed
    /// when all [`Sender`] have been dropped, or when [`Receiver::close`] is called.
    ///
    /// [`Sender`]: crate::sync::mpsc::Sender
    /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::mpsc;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (_tx, mut rx) = mpsc::channel::<()>(10);
    ///     assert!(!rx.is_closed());
    ///
    ///     rx.close();
    ///
    ///     assert!(rx.is_closed());
    /// }
    /// ```
    pub fn is_closed(&self) -> bool {
        self.chan.is_closed()
    }
501
    /// Checks if a channel is empty.
    ///
    /// This method returns `true` if the channel has no messages.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::mpsc;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, rx) = mpsc::channel(10);
    ///     assert!(rx.is_empty());
    ///
    ///     tx.send(0).await.unwrap();
    ///     assert!(!rx.is_empty());
    /// }
    /// ```
    pub fn is_empty(&self) -> bool {
        self.chan.is_empty()
    }
523
    /// Returns the number of messages in the channel.
    ///
    /// As the example below shows, a value that has been sent but not yet
    /// received counts towards the length.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::mpsc;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, rx) = mpsc::channel(10);
    ///     assert_eq!(0, rx.len());
    ///
    ///     tx.send(0).await.unwrap();
    ///     assert_eq!(1, rx.len());
    /// }
    /// ```
    pub fn len(&self) -> usize {
        self.chan.len()
    }
542
543 /// Returns the current capacity of the channel.
544 ///
545 /// The capacity goes down when the sender sends a value by calling [`Sender::send`] or by reserving
546 /// capacity with [`Sender::reserve`]. The capacity goes up when values are received.
547 /// This is distinct from [`max_capacity`], which always returns buffer capacity initially
548 /// specified when calling [`channel`].
549 ///
550 /// # Examples
551 ///
552 /// ```
553 /// use tokio::sync::mpsc;
554 ///
555 /// #[tokio::main]
556 /// async fn main() {
557 /// let (tx, mut rx) = mpsc::channel::<()>(5);
558 ///
559 /// assert_eq!(rx.capacity(), 5);
560 ///
561 /// // Making a reservation drops the capacity by one.
562 /// let permit = tx.reserve().await.unwrap();
563 /// assert_eq!(rx.capacity(), 4);
564 /// assert_eq!(rx.len(), 0);
565 ///
566 /// // Sending and receiving a value increases the capacity by one.
567 /// permit.send(());
568 /// assert_eq!(rx.len(), 1);
569 /// rx.recv().await.unwrap();
570 /// assert_eq!(rx.capacity(), 5);
571 ///
572 /// // Directly sending a message drops the capacity by one.
573 /// tx.send(()).await.unwrap();
574 /// assert_eq!(rx.capacity(), 4);
575 /// assert_eq!(rx.len(), 1);
576 ///
577 /// // Receiving the message increases the capacity by one.
578 /// rx.recv().await.unwrap();
579 /// assert_eq!(rx.capacity(), 5);
580 /// assert_eq!(rx.len(), 0);
581 /// }
582 /// ```
583 /// [`capacity`]: Receiver::capacity
584 /// [`max_capacity`]: Receiver::max_capacity
585 pub fn capacity(&self) -> usize {
586 self.chan.semaphore().semaphore.available_permits()
587 }
588
589 /// Returns the maximum buffer capacity of the channel.
590 ///
591 /// The maximum capacity is the buffer capacity initially specified when calling
592 /// [`channel`]. This is distinct from [`capacity`], which returns the *current*
593 /// available buffer capacity: as messages are sent and received, the value
594 /// returned by [`capacity`] will go up or down, whereas the value
595 /// returned by [`max_capacity`] will remain constant.
596 ///
597 /// # Examples
598 ///
599 /// ```
600 /// use tokio::sync::mpsc;
601 ///
602 /// #[tokio::main]
603 /// async fn main() {
604 /// let (tx, rx) = mpsc::channel::<()>(5);
605 ///
606 /// // both max capacity and capacity are the same at first
607 /// assert_eq!(rx.max_capacity(), 5);
608 /// assert_eq!(rx.capacity(), 5);
609 ///
610 /// // Making a reservation doesn't change the max capacity.
611 /// let permit = tx.reserve().await.unwrap();
612 /// assert_eq!(rx.max_capacity(), 5);
613 /// // but drops the capacity by one
614 /// assert_eq!(rx.capacity(), 4);
615 /// }
616 /// ```
617 /// [`capacity`]: Receiver::capacity
618 /// [`max_capacity`]: Receiver::max_capacity
619 pub fn max_capacity(&self) -> usize {
620 self.chan.semaphore().bound
621 }
622
    /// Polls to receive the next message on this channel.
    ///
    /// This method returns:
    ///
    ///  * `Poll::Pending` if no messages are available but the channel is not
    ///    closed, or if a spurious failure happens.
    ///  * `Poll::Ready(Some(message))` if a message is available.
    ///  * `Poll::Ready(None)` if the channel has been closed and all messages
    ///    sent before it was closed have been received.
    ///
    /// When the method returns `Poll::Pending`, the `Waker` in the provided
    /// `Context` is scheduled to receive a wakeup when a message is sent by any
    /// sender, or when the channel is closed. Note that on multiple calls to
    /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
    /// passed to the most recent call is scheduled to receive a wakeup.
    ///
    /// If this method returns `Poll::Pending` due to a spurious failure, then
    /// the `Waker` will be notified when the situation causing the spurious
    /// failure has been resolved. Note that receiving such a wakeup does not
    /// guarantee that the next call will succeed — it could fail with another
    /// spurious failure.
    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        self.chan.recv(cx)
    }
647
    /// Polls to receive multiple messages on this channel, extending the provided buffer.
    ///
    /// This method returns:
    ///
    /// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
    ///   spurious failure happens.
    /// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
    ///   stored in `buffer`. This can be less than, or equal to, `limit`.
    /// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed.
    ///
    /// When the method returns `Poll::Pending`, the `Waker` in the provided
    /// `Context` is scheduled to receive a wakeup when a message is sent by any
    /// sender, or when the channel is closed. Note that on multiple calls to
    /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
    /// passed to the most recent call is scheduled to receive a wakeup.
    ///
    /// Note that this method does not guarantee that exactly `limit` messages
    /// are received. Rather, if at least one message is available, it returns
    /// as many messages as it can up to the given limit. This method returns
    /// zero only if the channel is closed (or if `limit` is zero).
    ///
    /// # Examples
    ///
    /// ```
    /// use std::task::{Context, Poll};
    /// use std::pin::Pin;
    /// use tokio::sync::mpsc;
    /// use futures::Future;
    ///
    /// struct MyReceiverFuture<'a> {
    ///     receiver: mpsc::Receiver<i32>,
    ///     buffer: &'a mut Vec<i32>,
    ///     limit: usize,
    /// }
    ///
    /// impl<'a> Future for MyReceiverFuture<'a> {
    ///     type Output = usize; // Number of messages received
    ///
    ///     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    ///         let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
    ///
    ///         // Now `receiver` and `buffer` are mutable references, and `limit` is copied
    ///         match receiver.poll_recv_many(cx, *buffer, *limit) {
    ///             Poll::Pending => Poll::Pending,
    ///             Poll::Ready(count) => Poll::Ready(count),
    ///         }
    ///     }
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, rx) = mpsc::channel(32);
    ///     let mut buffer = Vec::new();
    ///
    ///     let my_receiver_future = MyReceiverFuture {
    ///         receiver: rx,
    ///         buffer: &mut buffer,
    ///         limit: 3,
    ///     };
    ///
    ///     for i in 0..10 {
    ///         tx.send(i).await.unwrap();
    ///     }
    ///
    ///     let count = my_receiver_future.await;
    ///     assert_eq!(count, 3);
    ///     assert_eq!(buffer, vec![0,1,2])
    /// }
    /// ```
    pub fn poll_recv_many(
        &mut self,
        cx: &mut Context<'_>,
        buffer: &mut Vec<T>,
        limit: usize,
    ) -> Poll<usize> {
        self.chan.recv_many(cx, buffer, limit)
    }
724
    /// Returns the number of [`Sender`] handles.
    ///
    /// The count is reported by the inner channel. See
    /// [`sender_weak_count`](Self::sender_weak_count) for the number of
    /// [`WeakSender`] handles.
    pub fn sender_strong_count(&self) -> usize {
        self.chan.sender_strong_count()
    }
729
    /// Returns the number of [`WeakSender`] handles.
    ///
    /// The count is reported by the inner channel. See
    /// [`sender_strong_count`](Self::sender_strong_count) for the number of
    /// [`Sender`] handles.
    pub fn sender_weak_count(&self) -> usize {
        self.chan.sender_weak_count()
    }
734}
735
736impl<T> fmt::Debug for Receiver<T> {
737 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
738 fmt.debug_struct("Receiver")
739 .field("chan", &self.chan)
740 .finish()
741 }
742}
743
// `Receiver<T>` is declared `Unpin` for every `T`, with no `T: Unpin` bound.
impl<T> Unpin for Receiver<T> {}
745
746impl<T> Sender<T> {
747 pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
748 Sender { chan }
749 }
750
751 /// Sends a value, waiting until there is capacity.
752 ///
753 /// A successful send occurs when it is determined that the other end of the
754 /// channel has not hung up already. An unsuccessful send would be one where
755 /// the corresponding receiver has already been closed. Note that a return
756 /// value of `Err` means that the data will never be received, but a return
757 /// value of `Ok` does not mean that the data will be received. It is
758 /// possible for the corresponding receiver to hang up immediately after
759 /// this function returns `Ok`.
760 ///
761 /// # Errors
762 ///
763 /// If the receive half of the channel is closed, either due to [`close`]
764 /// being called or the [`Receiver`] handle dropping, the function returns
765 /// an error. The error includes the value passed to `send`.
766 ///
767 /// [`close`]: Receiver::close
768 /// [`Receiver`]: Receiver
769 ///
770 /// # Cancel safety
771 ///
772 /// If `send` is used as the event in a [`tokio::select!`](crate::select)
773 /// statement and some other branch completes first, then it is guaranteed
774 /// that the message was not sent. **However, in that case, the message
775 /// is dropped and will be lost.**
776 ///
777 /// To avoid losing messages, use [`reserve`](Self::reserve) to reserve
778 /// capacity, then use the returned [`Permit`] to send the message.
779 ///
780 /// This channel uses a queue to ensure that calls to `send` and `reserve`
781 /// complete in the order they were requested. Cancelling a call to
782 /// `send` makes you lose your place in the queue.
783 ///
784 /// # Examples
785 ///
786 /// In the following example, each call to `send` will block until the
787 /// previously sent value was received.
788 ///
789 /// ```rust
790 /// use tokio::sync::mpsc;
791 ///
792 /// #[tokio::main]
793 /// async fn main() {
794 /// let (tx, mut rx) = mpsc::channel(1);
795 ///
796 /// tokio::spawn(async move {
797 /// for i in 0..10 {
798 /// if let Err(_) = tx.send(i).await {
799 /// println!("receiver dropped");
800 /// return;
801 /// }
802 /// }
803 /// });
804 ///
805 /// while let Some(i) = rx.recv().await {
806 /// println!("got = {}", i);
807 /// }
808 /// }
809 /// ```
810 pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
811 match self.reserve().await {
812 Ok(permit) => {
813 permit.send(value);
814 Ok(())
815 }
816 Err(_) => Err(SendError(value)),
817 }
818 }
819
    /// Completes when the receiver has dropped.
    ///
    /// This allows the producers to get notified when interest in the produced
    /// values is canceled and immediately stop doing work.
    ///
    // NOTE(review): presumably this also completes once `Receiver::close` has
    // been called, not only when the receiver is dropped — confirm against the
    // inner channel's `closed` implementation.
    ///
    /// # Cancel safety
    ///
    /// This method is cancel safe. Once the channel is closed, it stays closed
    /// forever and all future calls to `closed` will return immediately.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::mpsc;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx1, rx) = mpsc::channel::<()>(1);
    ///     let tx2 = tx1.clone();
    ///     let tx3 = tx1.clone();
    ///     let tx4 = tx1.clone();
    ///     let tx5 = tx1.clone();
    ///     tokio::spawn(async move {
    ///         drop(rx);
    ///     });
    ///
    ///     futures::join!(
    ///         tx1.closed(),
    ///         tx2.closed(),
    ///         tx3.closed(),
    ///         tx4.closed(),
    ///         tx5.closed()
    ///     );
    ///     println!("Receiver dropped");
    /// }
    /// ```
    pub async fn closed(&self) {
        // Waiting for closure is handled entirely by the inner channel.
        self.chan.closed().await;
    }
859
860 /// Attempts to immediately send a message on this `Sender`
861 ///
862 /// This method differs from [`send`] by returning immediately if the channel's
863 /// buffer is full or no receiver is waiting to acquire some data. Compared
864 /// with [`send`], this function has two failure cases instead of one (one for
865 /// disconnection, one for a full buffer).
866 ///
867 /// # Errors
868 ///
869 /// If the channel capacity has been reached, i.e., the channel has `n`
870 /// buffered values where `n` is the argument passed to [`channel`], then an
871 /// error is returned.
872 ///
873 /// If the receive half of the channel is closed, either due to [`close`]
874 /// being called or the [`Receiver`] handle dropping, the function returns
875 /// an error. The error includes the value passed to `send`.
876 ///
877 /// [`send`]: Sender::send
878 /// [`channel`]: channel
879 /// [`close`]: Receiver::close
880 ///
881 /// # Examples
882 ///
883 /// ```
884 /// use tokio::sync::mpsc;
885 ///
886 /// #[tokio::main]
887 /// async fn main() {
888 /// // Create a channel with buffer size 1
889 /// let (tx1, mut rx) = mpsc::channel(1);
890 /// let tx2 = tx1.clone();
891 ///
892 /// tokio::spawn(async move {
893 /// tx1.send(1).await.unwrap();
894 /// tx1.send(2).await.unwrap();
895 /// // task waits until the receiver receives a value.
896 /// });
897 ///
898 /// tokio::spawn(async move {
899 /// // This will return an error and send
900 /// // no message if the buffer is full
901 /// let _ = tx2.try_send(3);
902 /// });
903 ///
904 /// let mut msg;
905 /// msg = rx.recv().await.unwrap();
906 /// println!("message {} received", msg);
907 ///
908 /// msg = rx.recv().await.unwrap();
909 /// println!("message {} received", msg);
910 ///
911 /// // Third message may have never been sent
912 /// match rx.recv().await {
913 /// Some(msg) => println!("message {} received", msg),
914 /// None => println!("the third message was never sent"),
915 /// }
916 /// }
917 /// ```
918 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
919 match self.chan.semaphore().semaphore.try_acquire(1) {
920 Ok(()) => {}
921 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)),
922 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)),
923 }
924
925 // Send the message
926 self.chan.send(message);
927 Ok(())
928 }
929
930 /// Sends a value, waiting until there is capacity, but only for a limited time.
931 ///
932 /// Shares the same success and error conditions as [`send`], adding one more
933 /// condition for an unsuccessful send, which is when the provided timeout has
934 /// elapsed, and there is no capacity available.
935 ///
936 /// [`send`]: Sender::send
937 ///
938 /// # Errors
939 ///
940 /// If the receive half of the channel is closed, either due to [`close`]
941 /// being called or the [`Receiver`] having been dropped,
942 /// the function returns an error. The error includes the value passed to `send`.
943 ///
944 /// [`close`]: Receiver::close
945 /// [`Receiver`]: Receiver
946 ///
947 /// # Panics
948 ///
949 /// This function panics if it is called outside the context of a Tokio
950 /// runtime [with time enabled](crate::runtime::Builder::enable_time).
951 ///
952 /// # Examples
953 ///
954 /// In the following example, each call to `send_timeout` will block until the
955 /// previously sent value was received, unless the timeout has elapsed.
956 ///
957 /// ```rust
958 /// use tokio::sync::mpsc;
959 /// use tokio::time::{sleep, Duration};
960 ///
961 /// #[tokio::main]
962 /// async fn main() {
963 /// let (tx, mut rx) = mpsc::channel(1);
964 ///
965 /// tokio::spawn(async move {
966 /// for i in 0..10 {
967 /// if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
968 /// println!("send error: #{:?}", e);
969 /// return;
970 /// }
971 /// }
972 /// });
973 ///
974 /// while let Some(i) = rx.recv().await {
975 /// println!("got = {}", i);
976 /// sleep(Duration::from_millis(200)).await;
977 /// }
978 /// }
979 /// ```
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
pub async fn send_timeout(
    &self,
    value: T,
    timeout: Duration,
) -> Result<(), SendTimeoutError<T>> {
    // The outer `Result` comes from the timer, the inner one from `reserve`.
    // In both failure cases the unsent value is returned to the caller.
    match crate::time::timeout(timeout, self.reserve()).await {
        Ok(Ok(permit)) => {
            permit.send(value);
            Ok(())
        }
        Ok(Err(_)) => Err(SendTimeoutError::Closed(value)),
        Err(_) => Err(SendTimeoutError::Timeout(value)),
    }
}
1000
1001 /// Blocking send to call outside of asynchronous contexts.
1002 ///
1003 /// This method is intended for use cases where you are sending from
1004 /// synchronous code to asynchronous code, and will work even if the
1005 /// receiver is not using [`blocking_recv`] to receive the message.
1006 ///
1007 /// [`blocking_recv`]: fn@crate::sync::mpsc::Receiver::blocking_recv
1008 ///
1009 /// # Panics
1010 ///
1011 /// This function panics if called within an asynchronous execution
1012 /// context.
1013 ///
1014 /// # Examples
1015 ///
1016 /// ```
1017 /// use std::thread;
1018 /// use tokio::runtime::Runtime;
1019 /// use tokio::sync::mpsc;
1020 ///
1021 /// fn main() {
1022 /// let (tx, mut rx) = mpsc::channel::<u8>(1);
1023 ///
1024 /// let sync_code = thread::spawn(move || {
1025 /// tx.blocking_send(10).unwrap();
1026 /// });
1027 ///
1028 /// Runtime::new().unwrap().block_on(async move {
1029 /// assert_eq!(Some(10), rx.recv().await);
1030 /// });
1031 /// sync_code.join().unwrap()
1032 /// }
1033 /// ```
#[track_caller]
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(alias = "send_blocking"))]
pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> {
    // Build the asynchronous `send` future, then hand it to the crate's
    // `block_on` helper so it is run to completion from synchronous code.
    let pending_send = self.send(value);
    crate::future::block_on(pending_send)
}
1040
1041 /// Checks if the channel has been closed. This happens when the
1042 /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is
1043 /// called.
1044 ///
1045 /// [`Receiver`]: crate::sync::mpsc::Receiver
1046 /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
1047 ///
1048 /// ```
1049 /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(42);
1050 /// assert!(!tx.is_closed());
1051 ///
1052 /// let tx2 = tx.clone();
1053 /// assert!(!tx2.is_closed());
1054 ///
1055 /// drop(rx);
1056 /// assert!(tx.is_closed());
1057 /// assert!(tx2.is_closed());
1058 /// ```
1059 pub fn is_closed(&self) -> bool {
1060 self.chan.is_closed()
1061 }
1062
1063 /// Waits for channel capacity. Once capacity to send one message is
1064 /// available, it is reserved for the caller.
1065 ///
1066 /// If the channel is full, the function waits for the number of unreceived
1067 /// messages to become less than the channel capacity. Capacity to send one
1068 /// message is reserved for the caller. A [`Permit`] is returned to track
1069 /// the reserved capacity. The [`send`] function on [`Permit`] consumes the
1070 /// reserved capacity.
1071 ///
1072 /// Dropping [`Permit`] without sending a message releases the capacity back
1073 /// to the channel.
1074 ///
1075 /// [`Permit`]: Permit
1076 /// [`send`]: Permit::send
1077 ///
1078 /// # Cancel safety
1079 ///
1080 /// This channel uses a queue to ensure that calls to `send` and `reserve`
1081 /// complete in the order they were requested. Cancelling a call to
1082 /// `reserve` makes you lose your place in the queue.
1083 ///
1084 /// # Examples
1085 ///
1086 /// ```
1087 /// use tokio::sync::mpsc;
1088 ///
1089 /// #[tokio::main]
1090 /// async fn main() {
1091 /// let (tx, mut rx) = mpsc::channel(1);
1092 ///
1093 /// // Reserve capacity
1094 /// let permit = tx.reserve().await.unwrap();
1095 ///
1096 /// // Trying to send directly on the `tx` will fail due to no
1097 /// // available capacity.
1098 /// assert!(tx.try_send(123).is_err());
1099 ///
1100 /// // Sending on the permit succeeds
1101 /// permit.send(456);
1102 ///
1103 /// // The value sent on the permit is received
1104 /// assert_eq!(rx.recv().await.unwrap(), 456);
1105 /// }
1106 /// ```
1107 pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
1108 self.reserve_inner(1).await?;
1109 Ok(Permit { chan: &self.chan })
1110 }
1111
1112 /// Waits for channel capacity. Once capacity to send `n` messages is
1113 /// available, it is reserved for the caller.
1114 ///
1115 /// If the channel is full or if there are fewer than `n` permits available, the function waits
1116 /// for the number of unreceived messages to become `n` less than the channel capacity.
1117 /// Capacity to send `n` messages is then reserved for the caller.
1118 ///
1119 /// A [`PermitIterator`] is returned to track the reserved capacity.
1120 /// You can call this [`Iterator`] until it is exhausted to
1121 /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to
1122 /// [`try_reserve_many`] except it awaits for the slots to become available.
1123 ///
1124 /// If the channel is closed, the function returns a [`SendError`].
1125 ///
1126 /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining
1127 /// permits back to the channel.
1128 ///
1129 /// [`PermitIterator`]: PermitIterator
1130 /// [`Permit`]: Permit
1131 /// [`send`]: Permit::send
1132 /// [`try_reserve_many`]: Sender::try_reserve_many
1133 ///
1134 /// # Cancel safety
1135 ///
1136 /// This channel uses a queue to ensure that calls to `send` and `reserve_many`
1137 /// complete in the order they were requested. Cancelling a call to
1138 /// `reserve_many` makes you lose your place in the queue.
1139 ///
1140 /// # Examples
1141 ///
1142 /// ```
1143 /// use tokio::sync::mpsc;
1144 ///
1145 /// #[tokio::main]
1146 /// async fn main() {
1147 /// let (tx, mut rx) = mpsc::channel(2);
1148 ///
1149 /// // Reserve capacity
1150 /// let mut permit = tx.reserve_many(2).await.unwrap();
1151 ///
1152 /// // Trying to send directly on the `tx` will fail due to no
1153 /// // available capacity.
1154 /// assert!(tx.try_send(123).is_err());
1155 ///
1156 /// // Sending with the permit iterator succeeds
1157 /// permit.next().unwrap().send(456);
1158 /// permit.next().unwrap().send(457);
1159 ///
1160 /// // The iterator should now be exhausted
1161 /// assert!(permit.next().is_none());
1162 ///
1163 /// // The value sent on the permit is received
1164 /// assert_eq!(rx.recv().await.unwrap(), 456);
1165 /// assert_eq!(rx.recv().await.unwrap(), 457);
1166 /// }
1167 /// ```
1168 pub async fn reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, SendError<()>> {
1169 self.reserve_inner(n).await?;
1170 Ok(PermitIterator {
1171 chan: &self.chan,
1172 n,
1173 })
1174 }
1175
1176 /// Waits for channel capacity, moving the `Sender` and returning an owned
1177 /// permit. Once capacity to send one message is available, it is reserved
1178 /// for the caller.
1179 ///
1180 /// This moves the sender _by value_, and returns an owned permit that can
1181 /// be used to send a message into the channel. Unlike [`Sender::reserve`],
1182 /// this method may be used in cases where the permit must be valid for the
1183 /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
1184 /// essentially a reference count increment, comparable to [`Arc::clone`]),
1185 /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1186 /// moved, it can be cloned prior to calling `reserve_owned`.
1187 ///
1188 /// If the channel is full, the function waits for the number of unreceived
1189 /// messages to become less than the channel capacity. Capacity to send one
1190 /// message is reserved for the caller. An [`OwnedPermit`] is returned to
1191 /// track the reserved capacity. The [`send`] function on [`OwnedPermit`]
1192 /// consumes the reserved capacity.
1193 ///
1194 /// Dropping the [`OwnedPermit`] without sending a message releases the
1195 /// capacity back to the channel.
1196 ///
1197 /// # Cancel safety
1198 ///
1199 /// This channel uses a queue to ensure that calls to `send` and `reserve`
1200 /// complete in the order they were requested. Cancelling a call to
1201 /// `reserve_owned` makes you lose your place in the queue.
1202 ///
1203 /// # Examples
1204 /// Sending a message using an [`OwnedPermit`]:
1205 /// ```
1206 /// use tokio::sync::mpsc;
1207 ///
1208 /// #[tokio::main]
1209 /// async fn main() {
1210 /// let (tx, mut rx) = mpsc::channel(1);
1211 ///
1212 /// // Reserve capacity, moving the sender.
1213 /// let permit = tx.reserve_owned().await.unwrap();
1214 ///
1215 /// // Send a message, consuming the permit and returning
1216 /// // the moved sender.
1217 /// let tx = permit.send(123);
1218 ///
1219 /// // The value sent on the permit is received.
1220 /// assert_eq!(rx.recv().await.unwrap(), 123);
1221 ///
1222 /// // The sender can now be used again.
1223 /// tx.send(456).await.unwrap();
1224 /// }
1225 /// ```
1226 ///
1227 /// When multiple [`OwnedPermit`]s are needed, or the sender cannot be moved
1228 /// by value, it can be inexpensively cloned before calling `reserve_owned`:
1229 ///
1230 /// ```
1231 /// use tokio::sync::mpsc;
1232 ///
1233 /// #[tokio::main]
1234 /// async fn main() {
1235 /// let (tx, mut rx) = mpsc::channel(1);
1236 ///
1237 /// // Clone the sender and reserve capacity.
1238 /// let permit = tx.clone().reserve_owned().await.unwrap();
1239 ///
1240 /// // Trying to send directly on the `tx` will fail due to no
1241 /// // available capacity.
1242 /// assert!(tx.try_send(123).is_err());
1243 ///
1244 /// // Sending on the permit succeeds.
1245 /// permit.send(456);
1246 ///
1247 /// // The value sent on the permit is received
1248 /// assert_eq!(rx.recv().await.unwrap(), 456);
1249 /// }
1250 /// ```
1251 ///
1252 /// [`Sender::reserve`]: Sender::reserve
1253 /// [`OwnedPermit`]: OwnedPermit
1254 /// [`send`]: OwnedPermit::send
1255 /// [`Arc::clone`]: std::sync::Arc::clone
1256 pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
1257 self.reserve_inner(1).await?;
1258 Ok(OwnedPermit {
1259 chan: Some(self.chan),
1260 })
1261 }
1262
1263 async fn reserve_inner(&self, n: usize) -> Result<(), SendError<()>> {
1264 crate::trace::async_trace_leaf().await;
1265
1266 if n > self.max_capacity() {
1267 return Err(SendError(()));
1268 }
1269 match self.chan.semaphore().semaphore.acquire(n).await {
1270 Ok(()) => Ok(()),
1271 Err(_) => Err(SendError(())),
1272 }
1273 }
1274
1275 /// Tries to acquire a slot in the channel without waiting for the slot to become
1276 /// available.
1277 ///
1278 /// If the channel is full this function will return [`TrySendError`], otherwise
1279 /// if there is a slot available it will return a [`Permit`] that will then allow you
1280 /// to [`send`] on the channel with a guaranteed slot. This function is similar to
1281 /// [`reserve`] except it does not await for the slot to become available.
1282 ///
1283 /// Dropping [`Permit`] without sending a message releases the capacity back
1284 /// to the channel.
1285 ///
1286 /// [`Permit`]: Permit
1287 /// [`send`]: Permit::send
1288 /// [`reserve`]: Sender::reserve
1289 ///
1290 /// # Examples
1291 ///
1292 /// ```
1293 /// use tokio::sync::mpsc;
1294 ///
1295 /// #[tokio::main]
1296 /// async fn main() {
1297 /// let (tx, mut rx) = mpsc::channel(1);
1298 ///
1299 /// // Reserve capacity
1300 /// let permit = tx.try_reserve().unwrap();
1301 ///
1302 /// // Trying to send directly on the `tx` will fail due to no
1303 /// // available capacity.
1304 /// assert!(tx.try_send(123).is_err());
1305 ///
1306 /// // Trying to reserve an additional slot on the `tx` will
1307 /// // fail because there is no capacity.
1308 /// assert!(tx.try_reserve().is_err());
1309 ///
1310 /// // Sending on the permit succeeds
1311 /// permit.send(456);
1312 ///
1313 /// // The value sent on the permit is received
1314 /// assert_eq!(rx.recv().await.unwrap(), 456);
1315 ///
1316 /// }
1317 /// ```
1318 pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
1319 match self.chan.semaphore().semaphore.try_acquire(1) {
1320 Ok(()) => {}
1321 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
1322 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1323 }
1324
1325 Ok(Permit { chan: &self.chan })
1326 }
1327
1328 /// Tries to acquire `n` slots in the channel without waiting for the slot to become
1329 /// available.
1330 ///
1331 /// A [`PermitIterator`] is returned to track the reserved capacity.
1332 /// You can call this [`Iterator`] until it is exhausted to
1333 /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to
1334 /// [`reserve_many`] except it does not await for the slots to become available.
1335 ///
1336 /// If there are fewer than `n` permits available on the channel, then
1337 /// this function will return a [`TrySendError::Full`]. If the channel is closed
1338 /// this function will return a [`TrySendError::Closed`].
1339 ///
1340 /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining
1341 /// permits back to the channel.
1342 ///
1343 /// [`PermitIterator`]: PermitIterator
1344 /// [`send`]: Permit::send
1345 /// [`reserve_many`]: Sender::reserve_many
1346 ///
1347 /// # Examples
1348 ///
1349 /// ```
1350 /// use tokio::sync::mpsc;
1351 ///
1352 /// #[tokio::main]
1353 /// async fn main() {
1354 /// let (tx, mut rx) = mpsc::channel(2);
1355 ///
1356 /// // Reserve capacity
1357 /// let mut permit = tx.try_reserve_many(2).unwrap();
1358 ///
1359 /// // Trying to send directly on the `tx` will fail due to no
1360 /// // available capacity.
1361 /// assert!(tx.try_send(123).is_err());
1362 ///
1363 /// // Trying to reserve an additional slot on the `tx` will
1364 /// // fail because there is no capacity.
1365 /// assert!(tx.try_reserve().is_err());
1366 ///
1367 /// // Sending with the permit iterator succeeds
1368 /// permit.next().unwrap().send(456);
1369 /// permit.next().unwrap().send(457);
1370 ///
1371 /// // The iterator should now be exhausted
1372 /// assert!(permit.next().is_none());
1373 ///
1374 /// // The value sent on the permit is received
1375 /// assert_eq!(rx.recv().await.unwrap(), 456);
1376 /// assert_eq!(rx.recv().await.unwrap(), 457);
1377 ///
1378 /// // Trying to call try_reserve_many with 0 will return an empty iterator
1379 /// let mut permit = tx.try_reserve_many(0).unwrap();
1380 /// assert!(permit.next().is_none());
1381 ///
1382 /// // Trying to call try_reserve_many with a number greater than the channel
1383 /// // capacity will return an error
1384 /// let permit = tx.try_reserve_many(3);
1385 /// assert!(permit.is_err());
1386 ///
1387 /// // Trying to call try_reserve_many on a closed channel will return an error
1388 /// drop(rx);
1389 /// let permit = tx.try_reserve_many(1);
1390 /// assert!(permit.is_err());
1391 ///
1392 /// let permit = tx.try_reserve_many(0);
1393 /// assert!(permit.is_err());
1394 /// }
1395 /// ```
1396 pub fn try_reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, TrySendError<()>> {
1397 if n > self.max_capacity() {
1398 return Err(TrySendError::Full(()));
1399 }
1400
1401 match self.chan.semaphore().semaphore.try_acquire(n) {
1402 Ok(()) => {}
1403 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
1404 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1405 }
1406
1407 Ok(PermitIterator {
1408 chan: &self.chan,
1409 n,
1410 })
1411 }
1412
1413 /// Tries to acquire a slot in the channel without waiting for the slot to become
1414 /// available, returning an owned permit.
1415 ///
1416 /// This moves the sender _by value_, and returns an owned permit that can
1417 /// be used to send a message into the channel. Unlike [`Sender::try_reserve`],
1418 /// this method may be used in cases where the permit must be valid for the
1419 /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
1420 /// essentially a reference count increment, comparable to [`Arc::clone`]),
1421 /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1422 /// moved, it can be cloned prior to calling `try_reserve_owned`.
1423 ///
1424 /// If the channel is full this function will return a [`TrySendError`].
1425 /// Since the sender is taken by value, the `TrySendError` returned in this
1426 /// case contains the sender, so that it may be used again. Otherwise, if
1427 /// there is a slot available, this method will return an [`OwnedPermit`]
1428 /// that can then be used to [`send`] on the channel with a guaranteed slot.
1429 /// This function is similar to [`reserve_owned`] except it does not await
1430 /// for the slot to become available.
1431 ///
1432 /// Dropping the [`OwnedPermit`] without sending a message releases the capacity back
1433 /// to the channel.
1434 ///
1435 /// [`OwnedPermit`]: OwnedPermit
1436 /// [`send`]: OwnedPermit::send
1437 /// [`reserve_owned`]: Sender::reserve_owned
1438 /// [`Arc::clone`]: std::sync::Arc::clone
1439 ///
1440 /// # Examples
1441 ///
1442 /// ```
1443 /// use tokio::sync::mpsc;
1444 ///
1445 /// #[tokio::main]
1446 /// async fn main() {
1447 /// let (tx, mut rx) = mpsc::channel(1);
1448 ///
1449 /// // Reserve capacity
1450 /// let permit = tx.clone().try_reserve_owned().unwrap();
1451 ///
1452 /// // Trying to send directly on the `tx` will fail due to no
1453 /// // available capacity.
1454 /// assert!(tx.try_send(123).is_err());
1455 ///
1456 /// // Trying to reserve an additional slot on the `tx` will
1457 /// // fail because there is no capacity.
1458 /// assert!(tx.try_reserve().is_err());
1459 ///
1460 /// // Sending on the permit succeeds
1461 /// permit.send(456);
1462 ///
1463 /// // The value sent on the permit is received
1464 /// assert_eq!(rx.recv().await.unwrap(), 456);
1465 ///
1466 /// }
1467 /// ```
1468 pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
1469 match self.chan.semaphore().semaphore.try_acquire(1) {
1470 Ok(()) => {}
1471 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(self)),
1472 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(self)),
1473 }
1474
1475 Ok(OwnedPermit {
1476 chan: Some(self.chan),
1477 })
1478 }
1479
1480 /// Returns `true` if senders belong to the same channel.
1481 ///
1482 /// # Examples
1483 ///
1484 /// ```
1485 /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
1486 /// let tx2 = tx.clone();
1487 /// assert!(tx.same_channel(&tx2));
1488 ///
1489 /// let (tx3, rx3) = tokio::sync::mpsc::channel::<()>(1);
1490 /// assert!(!tx3.same_channel(&tx2));
1491 /// ```
1492 pub fn same_channel(&self, other: &Self) -> bool {
1493 self.chan.same_channel(&other.chan)
1494 }
1495
1496 /// Returns the current capacity of the channel.
1497 ///
1498 /// The capacity goes down when sending a value by calling [`send`] or by reserving capacity
1499 /// with [`reserve`]. The capacity goes up when values are received by the [`Receiver`].
1500 /// This is distinct from [`max_capacity`], which always returns buffer capacity initially
1501 /// specified when calling [`channel`].
1502 ///
1503 /// # Examples
1504 ///
1505 /// ```
1506 /// use tokio::sync::mpsc;
1507 ///
1508 /// #[tokio::main]
1509 /// async fn main() {
1510 /// let (tx, mut rx) = mpsc::channel::<()>(5);
1511 ///
1512 /// assert_eq!(tx.capacity(), 5);
1513 ///
1514 /// // Making a reservation drops the capacity by one.
1515 /// let permit = tx.reserve().await.unwrap();
1516 /// assert_eq!(tx.capacity(), 4);
1517 ///
1518 /// // Sending and receiving a value increases the capacity by one.
1519 /// permit.send(());
1520 /// rx.recv().await.unwrap();
1521 /// assert_eq!(tx.capacity(), 5);
1522 /// }
1523 /// ```
1524 ///
1525 /// [`send`]: Sender::send
1526 /// [`reserve`]: Sender::reserve
1527 /// [`channel`]: channel
1528 /// [`max_capacity`]: Sender::max_capacity
1529 pub fn capacity(&self) -> usize {
1530 self.chan.semaphore().semaphore.available_permits()
1531 }
1532
1533 /// Converts the `Sender` to a [`WeakSender`] that does not count
1534 /// towards RAII semantics, i.e. if all `Sender` instances of the
1535 /// channel were dropped and only `WeakSender` instances remain,
1536 /// the channel is closed.
1537 #[must_use = "Downgrade creates a WeakSender without destroying the original non-weak sender."]
1538 pub fn downgrade(&self) -> WeakSender<T> {
1539 WeakSender {
1540 chan: self.chan.downgrade(),
1541 }
1542 }
1543
1544 /// Returns the maximum buffer capacity of the channel.
1545 ///
1546 /// The maximum capacity is the buffer capacity initially specified when calling
1547 /// [`channel`]. This is distinct from [`capacity`], which returns the *current*
1548 /// available buffer capacity: as messages are sent and received, the
1549 /// value returned by [`capacity`] will go up or down, whereas the value
1550 /// returned by [`max_capacity`] will remain constant.
1551 ///
1552 /// # Examples
1553 ///
1554 /// ```
1555 /// use tokio::sync::mpsc;
1556 ///
1557 /// #[tokio::main]
1558 /// async fn main() {
1559 /// let (tx, _rx) = mpsc::channel::<()>(5);
1560 ///
1561 /// // both max capacity and capacity are the same at first
1562 /// assert_eq!(tx.max_capacity(), 5);
1563 /// assert_eq!(tx.capacity(), 5);
1564 ///
1565 /// // Making a reservation doesn't change the max capacity.
1566 /// let permit = tx.reserve().await.unwrap();
1567 /// assert_eq!(tx.max_capacity(), 5);
1568 /// // but drops the capacity by one
1569 /// assert_eq!(tx.capacity(), 4);
1570 /// }
1571 /// ```
1572 ///
1573 /// [`channel`]: channel
1574 /// [`max_capacity`]: Sender::max_capacity
1575 /// [`capacity`]: Sender::capacity
1576 pub fn max_capacity(&self) -> usize {
1577 self.chan.semaphore().bound
1578 }
1579
1580 /// Returns the number of [`Sender`] handles.
1581 pub fn strong_count(&self) -> usize {
1582 self.chan.strong_count()
1583 }
1584
1585 /// Returns the number of [`WeakSender`] handles.
1586 pub fn weak_count(&self) -> usize {
1587 self.chan.weak_count()
1588 }
1589}
1590
1591impl<T> Clone for Sender<T> {
1592 fn clone(&self) -> Self {
1593 Sender {
1594 chan: self.chan.clone(),
1595 }
1596 }
1597}
1598
1599impl<T> fmt::Debug for Sender<T> {
1600 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1601 fmt.debug_struct("Sender")
1602 .field("chan", &self.chan)
1603 .finish()
1604 }
1605}
1606
1607impl<T> Clone for WeakSender<T> {
1608 fn clone(&self) -> Self {
1609 self.chan.increment_weak_count();
1610
1611 WeakSender {
1612 chan: self.chan.clone(),
1613 }
1614 }
1615}
1616
1617impl<T> Drop for WeakSender<T> {
1618 fn drop(&mut self) {
1619 self.chan.decrement_weak_count();
1620 }
1621}
1622
1623impl<T> WeakSender<T> {
1624 /// Tries to convert a `WeakSender` into a [`Sender`]. This will return `Some`
1625 /// if there are other `Sender` instances alive and the channel wasn't
1626 /// previously dropped, otherwise `None` is returned.
1627 pub fn upgrade(&self) -> Option<Sender<T>> {
1628 chan::Tx::upgrade(self.chan.clone()).map(Sender::new)
1629 }
1630
1631 /// Returns the number of [`Sender`] handles.
1632 pub fn strong_count(&self) -> usize {
1633 self.chan.strong_count()
1634 }
1635
1636 /// Returns the number of [`WeakSender`] handles.
1637 pub fn weak_count(&self) -> usize {
1638 self.chan.weak_count()
1639 }
1640}
1641
1642impl<T> fmt::Debug for WeakSender<T> {
1643 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1644 fmt.debug_struct("WeakSender").finish()
1645 }
1646}
1647
1648// ===== impl Permit =====
1649
1650impl<T> Permit<'_, T> {
1651 /// Sends a value using the reserved capacity.
1652 ///
1653 /// Capacity for the message has already been reserved. The message is sent
1654 /// to the receiver and the permit is consumed. The operation will succeed
1655 /// even if the receiver half has been closed. See [`Receiver::close`] for
1656 /// more details on performing a clean shutdown.
1657 ///
1658 /// [`Receiver::close`]: Receiver::close
1659 ///
1660 /// # Examples
1661 ///
1662 /// ```
1663 /// use tokio::sync::mpsc;
1664 ///
1665 /// #[tokio::main]
1666 /// async fn main() {
1667 /// let (tx, mut rx) = mpsc::channel(1);
1668 ///
1669 /// // Reserve capacity
1670 /// let permit = tx.reserve().await.unwrap();
1671 ///
1672 /// // Trying to send directly on the `tx` will fail due to no
1673 /// // available capacity.
1674 /// assert!(tx.try_send(123).is_err());
1675 ///
1676 /// // Send a message on the permit
1677 /// permit.send(456);
1678 ///
1679 /// // The value sent on the permit is received
1680 /// assert_eq!(rx.recv().await.unwrap(), 456);
1681 /// }
1682 /// ```
1683 pub fn send(self, value: T) {
1684 use std::mem;
1685
1686 self.chan.send(value);
1687
1688 // Avoid the drop logic
1689 mem::forget(self);
1690 }
1691}
1692
1693impl<T> Drop for Permit<'_, T> {
1694 fn drop(&mut self) {
1695 use chan::Semaphore;
1696
1697 let semaphore = self.chan.semaphore();
1698
1699 // Add the permit back to the semaphore
1700 semaphore.add_permit();
1701
1702 // If this is the last sender for this channel, wake the receiver so
1703 // that it can be notified that the channel is closed.
1704 if semaphore.is_closed() && semaphore.is_idle() {
1705 self.chan.wake_rx();
1706 }
1707 }
1708}
1709
1710impl<T> fmt::Debug for Permit<'_, T> {
1711 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1712 fmt.debug_struct("Permit")
1713 .field("chan", &self.chan)
1714 .finish()
1715 }
1716}
1717
1718// ===== impl PermitIterator =====
1719
1720impl<'a, T> Iterator for PermitIterator<'a, T> {
1721 type Item = Permit<'a, T>;
1722
1723 fn next(&mut self) -> Option<Self::Item> {
1724 if self.n == 0 {
1725 return None;
1726 }
1727
1728 self.n -= 1;
1729 Some(Permit { chan: self.chan })
1730 }
1731
1732 fn size_hint(&self) -> (usize, Option<usize>) {
1733 let n = self.n;
1734 (n, Some(n))
1735 }
1736}
1737impl<T> ExactSizeIterator for PermitIterator<'_, T> {}
1738impl<T> std::iter::FusedIterator for PermitIterator<'_, T> {}
1739
1740impl<T> Drop for PermitIterator<'_, T> {
1741 fn drop(&mut self) {
1742 use chan::Semaphore;
1743
1744 if self.n == 0 {
1745 return;
1746 }
1747
1748 let semaphore = self.chan.semaphore();
1749
1750 // Add the remaining permits back to the semaphore
1751 semaphore.add_permits(self.n);
1752
1753 // If this is the last sender for this channel, wake the receiver so
1754 // that it can be notified that the channel is closed.
1755 if semaphore.is_closed() && semaphore.is_idle() {
1756 self.chan.wake_rx();
1757 }
1758 }
1759}
1760
1761impl<T> fmt::Debug for PermitIterator<'_, T> {
1762 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1763 fmt.debug_struct("PermitIterator")
1764 .field("chan", &self.chan)
1765 .field("capacity", &self.n)
1766 .finish()
1767 }
1768}
1769
1770// ===== impl OwnedPermit =====
1771
1772impl<T> OwnedPermit<T> {
1773 /// Sends a value using the reserved capacity.
1774 ///
1775 /// Capacity for the message has already been reserved. The message is sent
1776 /// to the receiver and the permit is consumed. The operation will succeed
1777 /// even if the receiver half has been closed. See [`Receiver::close`] for
1778 /// more details on performing a clean shutdown.
1779 ///
1780 /// Unlike [`Permit::send`], this method returns the [`Sender`] from which
1781 /// the `OwnedPermit` was reserved.
1782 ///
1783 /// [`Receiver::close`]: Receiver::close
1784 ///
1785 /// # Examples
1786 ///
1787 /// ```
1788 /// use tokio::sync::mpsc;
1789 ///
1790 /// #[tokio::main]
1791 /// async fn main() {
1792 /// let (tx, mut rx) = mpsc::channel(1);
1793 ///
1794 /// // Reserve capacity
1795 /// let permit = tx.reserve_owned().await.unwrap();
1796 ///
1797 /// // Send a message on the permit, returning the sender.
1798 /// let tx = permit.send(456);
1799 ///
1800 /// // The value sent on the permit is received
1801 /// assert_eq!(rx.recv().await.unwrap(), 456);
1802 ///
1803 /// // We may now reuse `tx` to send another message.
1804 /// tx.send(789).await.unwrap();
1805 /// }
1806 /// ```
1807 pub fn send(mut self, value: T) -> Sender<T> {
1808 let chan = self.chan.take().unwrap_or_else(|| {
1809 unreachable!("OwnedPermit channel is only taken when the permit is moved")
1810 });
1811 chan.send(value);
1812
1813 Sender { chan }
1814 }
1815
1816 /// Releases the reserved capacity *without* sending a message, returning the
1817 /// [`Sender`].
1818 ///
1819 /// # Examples
1820 ///
1821 /// ```
1822 /// use tokio::sync::mpsc;
1823 ///
1824 /// #[tokio::main]
1825 /// async fn main() {
1826 /// let (tx, rx) = mpsc::channel(1);
1827 ///
1828 /// // Clone the sender and reserve capacity
1829 /// let permit = tx.clone().reserve_owned().await.unwrap();
1830 ///
1831 /// // Trying to send on the original `tx` will fail, since the `permit`
1832 /// // has reserved all the available capacity.
1833 /// assert!(tx.try_send(123).is_err());
1834 ///
1835 /// // Release the permit without sending a message, returning the clone
1836 /// // of the sender.
1837 /// let tx2 = permit.release();
1838 ///
1839 /// // We may now reuse `tx` to send another message.
1840 /// tx.send(789).await.unwrap();
1841 /// # drop(rx); drop(tx2);
1842 /// }
1843 /// ```
1844 ///
1845 /// [`Sender`]: Sender
1846 pub fn release(mut self) -> Sender<T> {
1847 use chan::Semaphore;
1848
1849 let chan = self.chan.take().unwrap_or_else(|| {
1850 unreachable!("OwnedPermit channel is only taken when the permit is moved")
1851 });
1852
1853 // Add the permit back to the semaphore
1854 chan.semaphore().add_permit();
1855 Sender { chan }
1856 }
1857}
1858
1859impl<T> Drop for OwnedPermit<T> {
1860 fn drop(&mut self) {
1861 use chan::Semaphore;
1862
1863 // Are we still holding onto the sender?
1864 if let Some(chan) = self.chan.take() {
1865 let semaphore = chan.semaphore();
1866
1867 // Add the permit back to the semaphore
1868 semaphore.add_permit();
1869
1870 // If this `OwnedPermit` is holding the last sender for this
1871 // channel, wake the receiver so that it can be notified that the
1872 // channel is closed.
1873 if semaphore.is_closed() && semaphore.is_idle() {
1874 chan.wake_rx();
1875 }
1876 }
1877
1878 // Otherwise, do nothing.
1879 }
1880}
1881
1882impl<T> fmt::Debug for OwnedPermit<T> {
1883 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1884 fmt.debug_struct("OwnedPermit")
1885 .field("chan", &self.chan)
1886 .finish()
1887 }
1888}