tokio/sync/mpsc/unbounded.rs
1use crate::loom::sync::{atomic::AtomicUsize, Arc};
2use crate::sync::mpsc::chan;
3use crate::sync::mpsc::error::{SendError, TryRecvError};
4
5use std::fmt;
6use std::task::{Context, Poll};
7
8/// Send values to the associated `UnboundedReceiver`.
9///
10/// Instances are created by the [`unbounded_channel`] function.
11pub struct UnboundedSender<T> {
12 chan: chan::Tx<T, Semaphore>,
13}
14
15/// An unbounded sender that does not prevent the channel from being closed.
16///
17/// If all [`UnboundedSender`] instances of a channel were dropped and only
18/// `WeakUnboundedSender` instances remain, the channel is closed.
19///
20/// In order to send messages, the `WeakUnboundedSender` needs to be upgraded using
21/// [`WeakUnboundedSender::upgrade`], which returns `Option<UnboundedSender>`. It returns `None`
22/// if all `UnboundedSender`s have been dropped, and otherwise it returns an `UnboundedSender`.
23///
24/// [`UnboundedSender`]: UnboundedSender
25/// [`WeakUnboundedSender::upgrade`]: WeakUnboundedSender::upgrade
26///
27/// # Examples
28///
29/// ```
30/// use tokio::sync::mpsc::unbounded_channel;
31///
32/// #[tokio::main]
33/// async fn main() {
34/// let (tx, _rx) = unbounded_channel::<i32>();
35/// let tx_weak = tx.downgrade();
36///
37/// // Upgrading will succeed because `tx` still exists.
38/// assert!(tx_weak.upgrade().is_some());
39///
40/// // If we drop `tx`, then it will fail.
41/// drop(tx);
42/// assert!(tx_weak.clone().upgrade().is_none());
43/// }
44/// ```
45pub struct WeakUnboundedSender<T> {
46 chan: Arc<chan::Chan<T, Semaphore>>,
47}
48
49impl<T> Clone for UnboundedSender<T> {
50 fn clone(&self) -> Self {
51 UnboundedSender {
52 chan: self.chan.clone(),
53 }
54 }
55}
56
57impl<T> fmt::Debug for UnboundedSender<T> {
58 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
59 fmt.debug_struct("UnboundedSender")
60 .field("chan", &self.chan)
61 .finish()
62 }
63}
64
65/// Receive values from the associated `UnboundedSender`.
66///
67/// Instances are created by the [`unbounded_channel`] function.
68///
69/// This receiver can be turned into a `Stream` using [`UnboundedReceiverStream`].
70///
71/// [`UnboundedReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnboundedReceiverStream.html
72pub struct UnboundedReceiver<T> {
73 /// The channel receiver
74 chan: chan::Rx<T, Semaphore>,
75}
76
77impl<T> fmt::Debug for UnboundedReceiver<T> {
78 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
79 fmt.debug_struct("UnboundedReceiver")
80 .field("chan", &self.chan)
81 .finish()
82 }
83}
84
85/// Creates an unbounded mpsc channel for communicating between asynchronous
86/// tasks without backpressure.
87///
88/// A `send` on this channel will always succeed as long as the receive half has
89/// not been closed. If the receiver falls behind, messages will be arbitrarily
90/// buffered.
91///
92/// **Note** that the amount of available system memory is an implicit bound to
93/// the channel. Using an `unbounded` channel has the ability of causing the
94/// process to run out of memory. In this case, the process will be aborted.
95pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
96 let (tx, rx) = chan::channel(Semaphore(AtomicUsize::new(0)));
97
98 let tx = UnboundedSender::new(tx);
99 let rx = UnboundedReceiver::new(rx);
100
101 (tx, rx)
102}
103
104/// No capacity
105#[derive(Debug)]
106pub(crate) struct Semaphore(pub(crate) AtomicUsize);
107
108impl<T> UnboundedReceiver<T> {
109 pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> UnboundedReceiver<T> {
110 UnboundedReceiver { chan }
111 }
112
113 /// Receives the next value for this receiver.
114 ///
115 /// This method returns `None` if the channel has been closed and there are
116 /// no remaining messages in the channel's buffer. This indicates that no
117 /// further values can ever be received from this `Receiver`. The channel is
118 /// closed when all senders have been dropped, or when [`close`] is called.
119 ///
120 /// If there are no messages in the channel's buffer, but the channel has
121 /// not yet been closed, this method will sleep until a message is sent or
122 /// the channel is closed.
123 ///
124 /// # Cancel safety
125 ///
126 /// This method is cancel safe. If `recv` is used as the event in a
127 /// [`tokio::select!`](crate::select) statement and some other branch
128 /// completes first, it is guaranteed that no messages were received on this
129 /// channel.
130 ///
131 /// [`close`]: Self::close
132 ///
133 /// # Examples
134 ///
135 /// ```
136 /// use tokio::sync::mpsc;
137 ///
138 /// #[tokio::main]
139 /// async fn main() {
140 /// let (tx, mut rx) = mpsc::unbounded_channel();
141 ///
142 /// tokio::spawn(async move {
143 /// tx.send("hello").unwrap();
144 /// });
145 ///
146 /// assert_eq!(Some("hello"), rx.recv().await);
147 /// assert_eq!(None, rx.recv().await);
148 /// }
149 /// ```
150 ///
151 /// Values are buffered:
152 ///
153 /// ```
154 /// use tokio::sync::mpsc;
155 ///
156 /// #[tokio::main]
157 /// async fn main() {
158 /// let (tx, mut rx) = mpsc::unbounded_channel();
159 ///
160 /// tx.send("hello").unwrap();
161 /// tx.send("world").unwrap();
162 ///
163 /// assert_eq!(Some("hello"), rx.recv().await);
164 /// assert_eq!(Some("world"), rx.recv().await);
165 /// }
166 /// ```
167 pub async fn recv(&mut self) -> Option<T> {
168 use std::future::poll_fn;
169
170 poll_fn(|cx| self.poll_recv(cx)).await
171 }
172
173 /// Receives the next values for this receiver and extends `buffer`.
174 ///
175 /// This method extends `buffer` by no more than a fixed number of values
176 /// as specified by `limit`. If `limit` is zero, the function returns
177 /// immediately with `0`. The return value is the number of values added to
178 /// `buffer`.
179 ///
180 /// For `limit > 0`, if there are no messages in the channel's queue,
181 /// but the channel has not yet been closed, this method will sleep
182 /// until a message is sent or the channel is closed.
183 ///
184 /// For non-zero values of `limit`, this method will never return `0` unless
185 /// the channel has been closed and there are no remaining messages in the
186 /// channel's queue. This indicates that no further values can ever be
187 /// received from this `Receiver`. The channel is closed when all senders
188 /// have been dropped, or when [`close`] is called.
189 ///
190 /// The capacity of `buffer` is increased as needed.
191 ///
192 /// # Cancel safety
193 ///
194 /// This method is cancel safe. If `recv_many` is used as the event in a
195 /// [`tokio::select!`](crate::select) statement and some other branch
196 /// completes first, it is guaranteed that no messages were received on this
197 /// channel.
198 ///
199 /// [`close`]: Self::close
200 ///
201 /// # Examples
202 ///
203 /// ```
204 /// use tokio::sync::mpsc;
205 ///
206 /// #[tokio::main]
207 /// async fn main() {
208 /// let mut buffer: Vec<&str> = Vec::with_capacity(2);
209 /// let limit = 2;
210 /// let (tx, mut rx) = mpsc::unbounded_channel();
211 /// let tx2 = tx.clone();
212 /// tx2.send("first").unwrap();
213 /// tx2.send("second").unwrap();
214 /// tx2.send("third").unwrap();
215 ///
216 /// // Call `recv_many` to receive up to `limit` (2) values.
217 /// assert_eq!(2, rx.recv_many(&mut buffer, limit).await);
218 /// assert_eq!(vec!["first", "second"], buffer);
219 ///
220 /// // If the buffer is full, the next call to `recv_many`
221 /// // reserves additional capacity.
222 /// assert_eq!(1, rx.recv_many(&mut buffer, limit).await);
223 ///
224 /// tokio::spawn(async move {
225 /// tx.send("fourth").unwrap();
226 /// });
227 ///
228 /// // 'tx' is dropped, but `recv_many`
229 /// // is guaranteed not to return 0 as the channel
230 /// // is not yet closed.
231 /// assert_eq!(1, rx.recv_many(&mut buffer, limit).await);
232 /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
233 ///
234 /// // Once the last sender is dropped, the channel is
235 /// // closed and `recv_many` returns 0, capacity unchanged.
236 /// drop(tx2);
237 /// assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
238 /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
239 /// }
240 /// ```
241 pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
242 use std::future::poll_fn;
243 poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await
244 }
245
246 /// Tries to receive the next value for this receiver.
247 ///
248 /// This method returns the [`Empty`] error if the channel is currently
249 /// empty, but there are still outstanding [senders] or [permits].
250 ///
251 /// This method returns the [`Disconnected`] error if the channel is
252 /// currently empty, and there are no outstanding [senders] or [permits].
253 ///
254 /// Unlike the [`poll_recv`] method, this method will never return an
255 /// [`Empty`] error spuriously.
256 ///
257 /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
258 /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
259 /// [`poll_recv`]: Self::poll_recv
260 /// [senders]: crate::sync::mpsc::Sender
261 /// [permits]: crate::sync::mpsc::Permit
262 ///
263 /// # Examples
264 ///
265 /// ```
266 /// use tokio::sync::mpsc;
267 /// use tokio::sync::mpsc::error::TryRecvError;
268 ///
269 /// #[tokio::main]
270 /// async fn main() {
271 /// let (tx, mut rx) = mpsc::unbounded_channel();
272 ///
273 /// tx.send("hello").unwrap();
274 ///
275 /// assert_eq!(Ok("hello"), rx.try_recv());
276 /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
277 ///
278 /// tx.send("hello").unwrap();
279 /// // Drop the last sender, closing the channel.
280 /// drop(tx);
281 ///
282 /// assert_eq!(Ok("hello"), rx.try_recv());
283 /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
284 /// }
285 /// ```
286 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
287 self.chan.try_recv()
288 }
289
290 /// Blocking receive to call outside of asynchronous contexts.
291 ///
292 /// # Panics
293 ///
294 /// This function panics if called within an asynchronous execution
295 /// context.
296 ///
297 /// # Examples
298 ///
299 /// ```
300 /// use std::thread;
301 /// use tokio::sync::mpsc;
302 ///
303 /// #[tokio::main]
304 /// async fn main() {
305 /// let (tx, mut rx) = mpsc::unbounded_channel::<u8>();
306 ///
307 /// let sync_code = thread::spawn(move || {
308 /// assert_eq!(Some(10), rx.blocking_recv());
309 /// });
310 ///
311 /// let _ = tx.send(10);
312 /// sync_code.join().unwrap();
313 /// }
314 /// ```
315 #[track_caller]
316 #[cfg(feature = "sync")]
317 #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
318 pub fn blocking_recv(&mut self) -> Option<T> {
319 crate::future::block_on(self.recv())
320 }
321
322 /// Variant of [`Self::recv_many`] for blocking contexts.
323 ///
324 /// The same conditions as in [`Self::blocking_recv`] apply.
325 #[track_caller]
326 #[cfg(feature = "sync")]
327 #[cfg_attr(docsrs, doc(alias = "recv_many_blocking"))]
328 pub fn blocking_recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
329 crate::future::block_on(self.recv_many(buffer, limit))
330 }
331
332 /// Closes the receiving half of a channel, without dropping it.
333 ///
334 /// This prevents any further messages from being sent on the channel while
335 /// still enabling the receiver to drain messages that are buffered.
336 ///
337 /// To guarantee that no messages are dropped, after calling `close()`,
338 /// `recv()` must be called until `None` is returned.
339 pub fn close(&mut self) {
340 self.chan.close();
341 }
342
343 /// Checks if a channel is closed.
344 ///
345 /// This method returns `true` if the channel has been closed. The channel is closed
346 /// when all [`UnboundedSender`] have been dropped, or when [`UnboundedReceiver::close`] is called.
347 ///
348 /// [`UnboundedSender`]: crate::sync::mpsc::UnboundedSender
349 /// [`UnboundedReceiver::close`]: crate::sync::mpsc::UnboundedReceiver::close
350 ///
351 /// # Examples
352 /// ```
353 /// use tokio::sync::mpsc;
354 ///
355 /// #[tokio::main]
356 /// async fn main() {
357 /// let (_tx, mut rx) = mpsc::unbounded_channel::<()>();
358 /// assert!(!rx.is_closed());
359 ///
360 /// rx.close();
361 ///
362 /// assert!(rx.is_closed());
363 /// }
364 /// ```
365 pub fn is_closed(&self) -> bool {
366 self.chan.is_closed()
367 }
368
369 /// Checks if a channel is empty.
370 ///
371 /// This method returns `true` if the channel has no messages.
372 ///
373 /// # Examples
374 /// ```
375 /// use tokio::sync::mpsc;
376 ///
377 /// #[tokio::main]
378 /// async fn main() {
379 /// let (tx, rx) = mpsc::unbounded_channel();
380 /// assert!(rx.is_empty());
381 ///
382 /// tx.send(0).unwrap();
383 /// assert!(!rx.is_empty());
384 /// }
385 ///
386 /// ```
387 pub fn is_empty(&self) -> bool {
388 self.chan.is_empty()
389 }
390
391 /// Returns the number of messages in the channel.
392 ///
393 /// # Examples
394 /// ```
395 /// use tokio::sync::mpsc;
396 ///
397 /// #[tokio::main]
398 /// async fn main() {
399 /// let (tx, rx) = mpsc::unbounded_channel();
400 /// assert_eq!(0, rx.len());
401 ///
402 /// tx.send(0).unwrap();
403 /// assert_eq!(1, rx.len());
404 /// }
405 /// ```
406 pub fn len(&self) -> usize {
407 self.chan.len()
408 }
409
410 /// Polls to receive the next message on this channel.
411 ///
412 /// This method returns:
413 ///
414 /// * `Poll::Pending` if no messages are available but the channel is not
415 /// closed, or if a spurious failure happens.
416 /// * `Poll::Ready(Some(message))` if a message is available.
417 /// * `Poll::Ready(None)` if the channel has been closed and all messages
418 /// sent before it was closed have been received.
419 ///
420 /// When the method returns `Poll::Pending`, the `Waker` in the provided
421 /// `Context` is scheduled to receive a wakeup when a message is sent on any
422 /// receiver, or when the channel is closed. Note that on multiple calls to
423 /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
424 /// passed to the most recent call is scheduled to receive a wakeup.
425 ///
426 /// If this method returns `Poll::Pending` due to a spurious failure, then
427 /// the `Waker` will be notified when the situation causing the spurious
428 /// failure has been resolved. Note that receiving such a wakeup does not
429 /// guarantee that the next call will succeed — it could fail with another
430 /// spurious failure.
431 pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
432 self.chan.recv(cx)
433 }
434
435 /// Polls to receive multiple messages on this channel, extending the provided buffer.
436 ///
437 /// This method returns:
438 /// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
439 /// spurious failure happens.
440 /// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
441 /// stored in `buffer`. This can be less than, or equal to, `limit`.
442 /// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed.
443 ///
444 /// When the method returns `Poll::Pending`, the `Waker` in the provided
445 /// `Context` is scheduled to receive a wakeup when a message is sent on any
446 /// receiver, or when the channel is closed. Note that on multiple calls to
447 /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
448 /// passed to the most recent call is scheduled to receive a wakeup.
449 ///
450 /// Note that this method does not guarantee that exactly `limit` messages
451 /// are received. Rather, if at least one message is available, it returns
452 /// as many messages as it can up to the given limit. This method returns
453 /// zero only if the channel is closed (or if `limit` is zero).
454 ///
455 /// # Examples
456 ///
457 /// ```
458 /// use std::task::{Context, Poll};
459 /// use std::pin::Pin;
460 /// use tokio::sync::mpsc;
461 /// use futures::Future;
462 ///
463 /// struct MyReceiverFuture<'a> {
464 /// receiver: mpsc::UnboundedReceiver<i32>,
465 /// buffer: &'a mut Vec<i32>,
466 /// limit: usize,
467 /// }
468 ///
469 /// impl<'a> Future for MyReceiverFuture<'a> {
470 /// type Output = usize; // Number of messages received
471 ///
472 /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
473 /// let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
474 ///
475 /// // Now `receiver` and `buffer` are mutable references, and `limit` is copied
476 /// match receiver.poll_recv_many(cx, *buffer, *limit) {
477 /// Poll::Pending => Poll::Pending,
478 /// Poll::Ready(count) => Poll::Ready(count),
479 /// }
480 /// }
481 /// }
482 ///
483 /// #[tokio::main]
484 /// async fn main() {
485 /// let (tx, rx) = mpsc::unbounded_channel::<i32>();
486 /// let mut buffer = Vec::new();
487 ///
488 /// let my_receiver_future = MyReceiverFuture {
489 /// receiver: rx,
490 /// buffer: &mut buffer,
491 /// limit: 3,
492 /// };
493 ///
494 /// for i in 0..10 {
495 /// tx.send(i).expect("Unable to send integer");
496 /// }
497 ///
498 /// let count = my_receiver_future.await;
499 /// assert_eq!(count, 3);
500 /// assert_eq!(buffer, vec![0,1,2])
501 /// }
502 /// ```
503 pub fn poll_recv_many(
504 &mut self,
505 cx: &mut Context<'_>,
506 buffer: &mut Vec<T>,
507 limit: usize,
508 ) -> Poll<usize> {
509 self.chan.recv_many(cx, buffer, limit)
510 }
511
512 /// Returns the number of [`UnboundedSender`] handles.
513 pub fn sender_strong_count(&self) -> usize {
514 self.chan.sender_strong_count()
515 }
516
517 /// Returns the number of [`WeakUnboundedSender`] handles.
518 pub fn sender_weak_count(&self) -> usize {
519 self.chan.sender_weak_count()
520 }
521}
522
523impl<T> UnboundedSender<T> {
524 pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> UnboundedSender<T> {
525 UnboundedSender { chan }
526 }
527
528 /// Attempts to send a message on this `UnboundedSender` without blocking.
529 ///
530 /// This method is not marked async because sending a message to an unbounded channel
531 /// never requires any form of waiting. Because of this, the `send` method can be
532 /// used in both synchronous and asynchronous code without problems.
533 ///
534 /// If the receive half of the channel is closed, either due to [`close`]
535 /// being called or the [`UnboundedReceiver`] having been dropped, this
536 /// function returns an error. The error includes the value passed to `send`.
537 ///
538 /// [`close`]: UnboundedReceiver::close
539 /// [`UnboundedReceiver`]: UnboundedReceiver
540 pub fn send(&self, message: T) -> Result<(), SendError<T>> {
541 if !self.inc_num_messages() {
542 return Err(SendError(message));
543 }
544
545 self.chan.send(message);
546 Ok(())
547 }
548
549 fn inc_num_messages(&self) -> bool {
550 use std::process;
551 use std::sync::atomic::Ordering::{AcqRel, Acquire};
552
553 let mut curr = self.chan.semaphore().0.load(Acquire);
554
555 loop {
556 if curr & 1 == 1 {
557 return false;
558 }
559
560 if curr == usize::MAX ^ 1 {
561 // Overflowed the ref count. There is no safe way to recover, so
562 // abort the process. In practice, this should never happen.
563 process::abort()
564 }
565
566 match self
567 .chan
568 .semaphore()
569 .0
570 .compare_exchange(curr, curr + 2, AcqRel, Acquire)
571 {
572 Ok(_) => return true,
573 Err(actual) => {
574 curr = actual;
575 }
576 }
577 }
578 }
579
580 /// Completes when the receiver has dropped.
581 ///
582 /// This allows the producers to get notified when interest in the produced
583 /// values is canceled and immediately stop doing work.
584 ///
585 /// # Cancel safety
586 ///
587 /// This method is cancel safe. Once the channel is closed, it stays closed
588 /// forever and all future calls to `closed` will return immediately.
589 ///
590 /// # Examples
591 ///
592 /// ```
593 /// use tokio::sync::mpsc;
594 ///
595 /// #[tokio::main]
596 /// async fn main() {
597 /// let (tx1, rx) = mpsc::unbounded_channel::<()>();
598 /// let tx2 = tx1.clone();
599 /// let tx3 = tx1.clone();
600 /// let tx4 = tx1.clone();
601 /// let tx5 = tx1.clone();
602 /// tokio::spawn(async move {
603 /// drop(rx);
604 /// });
605 ///
606 /// futures::join!(
607 /// tx1.closed(),
608 /// tx2.closed(),
609 /// tx3.closed(),
610 /// tx4.closed(),
611 /// tx5.closed()
612 /// );
613 //// println!("Receiver dropped");
614 /// }
615 /// ```
616 pub async fn closed(&self) {
617 self.chan.closed().await;
618 }
619
620 /// Checks if the channel has been closed. This happens when the
621 /// [`UnboundedReceiver`] is dropped, or when the
622 /// [`UnboundedReceiver::close`] method is called.
623 ///
624 /// [`UnboundedReceiver`]: crate::sync::mpsc::UnboundedReceiver
625 /// [`UnboundedReceiver::close`]: crate::sync::mpsc::UnboundedReceiver::close
626 ///
627 /// ```
628 /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>();
629 /// assert!(!tx.is_closed());
630 ///
631 /// let tx2 = tx.clone();
632 /// assert!(!tx2.is_closed());
633 ///
634 /// drop(rx);
635 /// assert!(tx.is_closed());
636 /// assert!(tx2.is_closed());
637 /// ```
638 pub fn is_closed(&self) -> bool {
639 self.chan.is_closed()
640 }
641
642 /// Returns `true` if senders belong to the same channel.
643 ///
644 /// # Examples
645 ///
646 /// ```
647 /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>();
648 /// let tx2 = tx.clone();
649 /// assert!(tx.same_channel(&tx2));
650 ///
651 /// let (tx3, rx3) = tokio::sync::mpsc::unbounded_channel::<()>();
652 /// assert!(!tx3.same_channel(&tx2));
653 /// ```
654 pub fn same_channel(&self, other: &Self) -> bool {
655 self.chan.same_channel(&other.chan)
656 }
657
658 /// Converts the `UnboundedSender` to a [`WeakUnboundedSender`] that does not count
659 /// towards RAII semantics, i.e. if all `UnboundedSender` instances of the
660 /// channel were dropped and only `WeakUnboundedSender` instances remain,
661 /// the channel is closed.
662 #[must_use = "Downgrade creates a WeakSender without destroying the original non-weak sender."]
663 pub fn downgrade(&self) -> WeakUnboundedSender<T> {
664 WeakUnboundedSender {
665 chan: self.chan.downgrade(),
666 }
667 }
668
669 /// Returns the number of [`UnboundedSender`] handles.
670 pub fn strong_count(&self) -> usize {
671 self.chan.strong_count()
672 }
673
674 /// Returns the number of [`WeakUnboundedSender`] handles.
675 pub fn weak_count(&self) -> usize {
676 self.chan.weak_count()
677 }
678}
679
680impl<T> Clone for WeakUnboundedSender<T> {
681 fn clone(&self) -> Self {
682 self.chan.increment_weak_count();
683
684 WeakUnboundedSender {
685 chan: self.chan.clone(),
686 }
687 }
688}
689
690impl<T> Drop for WeakUnboundedSender<T> {
691 fn drop(&mut self) {
692 self.chan.decrement_weak_count();
693 }
694}
695
696impl<T> WeakUnboundedSender<T> {
697 /// Tries to convert a `WeakUnboundedSender` into an [`UnboundedSender`].
698 /// This will return `Some` if there are other `Sender` instances alive and
699 /// the channel wasn't previously dropped, otherwise `None` is returned.
700 pub fn upgrade(&self) -> Option<UnboundedSender<T>> {
701 chan::Tx::upgrade(self.chan.clone()).map(UnboundedSender::new)
702 }
703
704 /// Returns the number of [`UnboundedSender`] handles.
705 pub fn strong_count(&self) -> usize {
706 self.chan.strong_count()
707 }
708
709 /// Returns the number of [`WeakUnboundedSender`] handles.
710 pub fn weak_count(&self) -> usize {
711 self.chan.weak_count()
712 }
713}
714
715impl<T> fmt::Debug for WeakUnboundedSender<T> {
716 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
717 fmt.debug_struct("WeakUnboundedSender").finish()
718 }
719}