postage/channels/
dispatch.rs

1//! A fixed-capacity multi-producer, multi-consumer queue.  At most one receiver will observe each value.
2//!
3//! Senders and recievers can be cloned, and additional recievers can be created with `tx.subscribe()`
4//!
5//! The producer can be cloned, and the sender task is suspended if the channel becomes full.
6
7use std::fmt;
8
9use super::SendMessage;
10use crate::{
11    sink::{PollSend, Sink},
12    stream::{PollRecv, Stream},
13    sync::{shared, ReceiverShared, SenderShared},
14};
15use crossbeam_queue::ArrayQueue;
16use static_assertions::assert_impl_all;
17
18/// Constructs a pair of dispatch endpoints, with a fixed-size buffer of the given capacity
19pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
20    #[cfg(feature = "debug")]
21    log::error!("Creating dispatch channel with capacity {}", capacity);
22    let (tx_shared, rx_shared) = shared(StateExtension::new(capacity));
23    let sender = Sender { shared: tx_shared };
24
25    let receiver = Receiver { shared: rx_shared };
26
27    (sender, receiver)
28}
29
30/// The sender half of a dispatch channel.  Can send messages with the `postage::Sink` trait.
31///
32/// Can be cloned.
33pub struct Sender<T> {
34    shared: SenderShared<StateExtension<T>>,
35}
36
37assert_impl_all!(Sender<String>: Clone, Send, Sync, fmt::Debug);
38
39impl<T> Clone for Sender<T> {
40    fn clone(&self) -> Self {
41        Self {
42            shared: self.shared.clone(),
43        }
44    }
45}
46
47impl<T> Sink for Sender<T> {
48    type Item = T;
49
50    fn poll_send(
51        self: std::pin::Pin<&mut Self>,
52        cx: &mut crate::Context<'_>,
53        mut value: Self::Item,
54    ) -> PollSend<Self::Item> {
55        loop {
56            if self.shared.is_closed() {
57                return PollSend::Rejected(value);
58            }
59
60            let queue = &self.shared.extension().queue;
61            let guard = self.shared.recv_guard();
62
63            match queue.push(value) {
64                Ok(_) => {
65                    self.shared.notify_receivers();
66                    return PollSend::Ready;
67                }
68                Err(v) => {
69                    self.shared.subscribe_recv(cx);
70                    if guard.is_expired() {
71                        value = v;
72                        continue;
73                    }
74
75                    return PollSend::Pending(v);
76                }
77            }
78        }
79    }
80}
81
82impl<T> fmt::Debug for Sender<T> {
83    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84        f.debug_struct("Sender").finish()
85    }
86}
87
88#[cfg(feature = "futures-traits")]
89mod impl_futures {
90    use crate::sink::SendError;
91    use std::task::Poll;
92
93    impl<T> futures::sink::Sink<T> for super::Sender<T> {
94        type Error = SendError<T>;
95
96        fn poll_ready(
97            self: std::pin::Pin<&mut Self>,
98            cx: &mut std::task::Context<'_>,
99        ) -> Poll<Result<(), Self::Error>> {
100            loop {
101                if self.shared.is_closed() {
102                    return Poll::Ready(Ok(()));
103                }
104
105                let queue = &self.shared.extension().queue;
106                let guard = self.shared.recv_guard();
107
108                if queue.is_full() {
109                    let mut cx = cx.into();
110                    self.shared.subscribe_recv(&mut cx);
111
112                    if guard.is_expired() {
113                        continue;
114                    }
115
116                    return Poll::Pending;
117                } else {
118                    return Poll::Ready(Ok(()));
119                }
120            }
121        }
122
123        fn start_send(self: std::pin::Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
124            if self.shared.is_closed() {
125                return Err(SendError(item));
126            }
127
128            let result = self
129                .shared
130                .extension()
131                .queue
132                .push(item)
133                .map_err(|item| SendError(item));
134
135            if result.is_ok() {
136                self.shared.notify_receivers();
137            }
138
139            result
140        }
141
142        fn poll_flush(
143            self: std::pin::Pin<&mut Self>,
144            _cx: &mut std::task::Context<'_>,
145        ) -> Poll<Result<(), Self::Error>> {
146            Poll::Ready(Ok(()))
147        }
148
149        fn poll_close(
150            self: std::pin::Pin<&mut Self>,
151            _cx: &mut std::task::Context<'_>,
152        ) -> Poll<Result<(), Self::Error>> {
153            Poll::Ready(Ok(()))
154        }
155    }
156}
157
158impl<T> Sender<T> {
159    /// Creates a new Receiver that listens to this channel.
160    pub fn subscribe(&self) -> Receiver<T> {
161        Receiver {
162            shared: self.shared.clone_receiver(),
163        }
164    }
165}
166
167/// The receiver half of a dispatch channel.
168///
169/// Can receive messages with the `postage::Stream` trait.
170pub struct Receiver<T> {
171    shared: ReceiverShared<StateExtension<T>>,
172}
173
174assert_impl_all!(Receiver<SendMessage>: Clone, Send, Sync, fmt::Debug);
175
176impl<T> Stream for Receiver<T> {
177    type Item = T;
178
179    fn poll_recv(
180        self: std::pin::Pin<&mut Self>,
181        cx: &mut crate::Context<'_>,
182    ) -> PollRecv<Self::Item> {
183        loop {
184            let guard = self.shared.send_guard();
185            match self.shared.extension().queue.pop() {
186                Some(v) => {
187                    self.shared.notify_senders();
188                    return PollRecv::Ready(v);
189                }
190                None => {
191                    if self.shared.is_closed() {
192                        return PollRecv::Closed;
193                    }
194
195                    self.shared.subscribe_send(cx);
196                    if guard.is_expired() {
197                        continue;
198                    }
199
200                    return PollRecv::Pending;
201                }
202            }
203        }
204    }
205}
206
207impl<T> Clone for Receiver<T> {
208    fn clone(&self) -> Self {
209        Self {
210            shared: self.shared.clone(),
211        }
212    }
213}
214
215impl<T> fmt::Debug for Receiver<T> {
216    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217        f.debug_struct("Receiver").finish()
218    }
219}
220
221struct StateExtension<T> {
222    queue: ArrayQueue<T>,
223}
224
225impl<T> StateExtension<T> {
226    pub fn new(capacity: usize) -> Self {
227        Self {
228            queue: ArrayQueue::new(capacity),
229        }
230    }
231}
232
233#[cfg(test)]
234mod tests {
235    use std::{pin::Pin, task::Context};
236
237    use crate::{
238        sink::{PollSend, Sink},
239        stream::{PollRecv, Stream},
240        test::{noop_context, panic_context},
241    };
242    use futures_test::task::new_count_waker;
243
244    use super::{channel, Receiver, Sender};
245
246    fn pin<'a, 'b>(
247        chan: &mut (Sender<Message>, Receiver<Message>),
248    ) -> (Pin<&mut Sender<Message>>, Pin<&mut Receiver<Message>>) {
249        let tx = Pin::new(&mut chan.0);
250        let rx = Pin::new(&mut chan.1);
251
252        (tx, rx)
253    }
254
255    #[derive(Debug, PartialEq, Eq)]
256    struct Message(usize);
257
258    #[test]
259    fn send_accepted() {
260        let mut cx = panic_context();
261        let mut chan = channel(2);
262        let (tx, _) = pin(&mut chan);
263
264        assert_eq!(PollSend::Ready, tx.poll_send(&mut cx, Message(1)));
265    }
266
267    #[test]
268    fn send_blocks() {
269        let mut cx = panic_context();
270        let (mut tx, _rx) = channel(2);
271
272        assert_eq!(
273            PollSend::Ready,
274            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
275        );
276        assert_eq!(
277            PollSend::Ready,
278            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
279        );
280    }
281
282    #[test]
283    fn send_recv() {
284        let mut cx = panic_context();
285        let (mut tx, mut rx) = channel(2);
286
287        assert_eq!(
288            PollSend::Ready,
289            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
290        );
291        assert_eq!(
292            PollSend::Ready,
293            Pin::new(&mut tx).poll_send(&mut cx, Message(2))
294        );
295        assert_eq!(
296            PollSend::Pending(Message(3)),
297            Pin::new(&mut tx).poll_send(&mut noop_context(), Message(3))
298        );
299
300        assert_eq!(
301            PollRecv::Ready(Message(1)),
302            Pin::new(&mut rx).poll_recv(&mut cx)
303        );
304
305        assert_eq!(
306            PollRecv::Ready(Message(2)),
307            Pin::new(&mut rx).poll_recv(&mut cx)
308        );
309
310        assert_eq!(
311            PollRecv::Pending,
312            Pin::new(&mut rx).poll_recv(&mut noop_context())
313        );
314    }
315
316    #[test]
317    fn sender_disconnect() {
318        let mut cx = panic_context();
319        let (mut tx, mut rx) = channel(100);
320        let mut tx2 = tx.clone();
321
322        assert_eq!(
323            PollSend::Ready,
324            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
325        );
326
327        assert_eq!(
328            PollSend::Ready,
329            Pin::new(&mut tx2).poll_send(&mut cx, Message(2))
330        );
331
332        drop(tx);
333        drop(tx2);
334
335        assert_eq!(
336            PollRecv::Ready(Message(1)),
337            Pin::new(&mut rx).poll_recv(&mut cx)
338        );
339
340        assert_eq!(
341            PollRecv::Ready(Message(2)),
342            Pin::new(&mut rx).poll_recv(&mut cx)
343        );
344
345        assert_eq!(PollRecv::Closed, Pin::new(&mut rx).poll_recv(&mut cx));
346    }
347
348    #[test]
349    fn receiver_disconnect() {
350        let mut cx = panic_context();
351        let (mut tx, rx) = channel(100);
352        let mut tx2 = tx.clone();
353
354        assert_eq!(
355            PollSend::Ready,
356            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
357        );
358
359        assert_eq!(
360            PollSend::Ready,
361            Pin::new(&mut tx2).poll_send(&mut cx, Message(2))
362        );
363
364        drop(rx);
365
366        assert_eq!(
367            PollSend::Rejected(Message(3)),
368            Pin::new(&mut tx).poll_send(&mut cx, Message(3))
369        );
370
371        assert_eq!(
372            PollSend::Rejected(Message(4)),
373            Pin::new(&mut tx2).poll_send(&mut cx, Message(4))
374        );
375    }
376
377    #[test]
378    fn wake_sender() {
379        let mut cx = panic_context();
380        let (mut tx, mut rx) = channel(1);
381
382        assert_eq!(
383            PollSend::Ready,
384            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
385        );
386
387        let (w2, w2_count) = new_count_waker();
388        let w2_context = Context::from_waker(&w2);
389        assert_eq!(
390            PollSend::Pending(Message(2)),
391            Pin::new(&mut tx).poll_send(&mut w2_context.into(), Message(2))
392        );
393
394        assert_eq!(0, w2_count.get());
395
396        assert_eq!(
397            PollRecv::Ready(Message(1)),
398            Pin::new(&mut rx).poll_recv(&mut cx)
399        );
400
401        assert_eq!(1, w2_count.get());
402        assert_eq!(
403            PollRecv::Pending,
404            Pin::new(&mut rx).poll_recv(&mut noop_context())
405        );
406
407        assert_eq!(1, w2_count.get());
408    }
409
410    #[test]
411    fn wake_receiver() {
412        let mut cx = panic_context();
413        let (mut tx, mut rx) = channel(100);
414
415        let (w1, w1_count) = new_count_waker();
416        let w1_context = Context::from_waker(&w1);
417
418        assert_eq!(
419            PollRecv::Pending,
420            Pin::new(&mut rx).poll_recv(&mut w1_context.into())
421        );
422
423        assert_eq!(0, w1_count.get());
424
425        assert_eq!(
426            PollSend::Ready,
427            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
428        );
429
430        assert_eq!(1, w1_count.get());
431
432        assert_eq!(
433            PollSend::Ready,
434            Pin::new(&mut tx).poll_send(&mut cx, Message(2))
435        );
436
437        assert_eq!(1, w1_count.get());
438    }
439
440    #[test]
441    fn wake_sender_on_disconnect() {
442        let (mut tx, rx) = channel(1);
443
444        let (w1, w1_count) = new_count_waker();
445        let w1_context = Context::from_waker(&w1);
446        let mut w1_context: crate::Context<'_> = w1_context.into();
447
448        assert_eq!(
449            PollSend::Ready,
450            Pin::new(&mut tx).poll_send(&mut w1_context, Message(1))
451        );
452
453        assert_eq!(
454            PollSend::Pending(Message(2)),
455            Pin::new(&mut tx).poll_send(&mut w1_context, Message(2))
456        );
457
458        assert_eq!(0, w1_count.get());
459
460        drop(rx);
461
462        assert_eq!(1, w1_count.get());
463    }
464
465    #[test]
466    fn wake_receivers_on_disconnect() {
467        let (tx, mut rx) = channel::<()>(100);
468        let mut rx2 = rx.clone();
469
470        let (w1, w1_count) = new_count_waker();
471        let w1_context = Context::from_waker(&w1);
472
473        let (w2, w2_count) = new_count_waker();
474        let w2_context = Context::from_waker(&w2);
475
476        assert_eq!(
477            PollRecv::Pending,
478            Pin::new(&mut rx).poll_recv(&mut w1_context.into())
479        );
480
481        assert_eq!(
482            PollRecv::Pending,
483            Pin::new(&mut rx2).poll_recv(&mut w2_context.into())
484        );
485
486        assert_eq!(0, w1_count.get());
487        assert_eq!(0, w2_count.get());
488
489        drop(tx);
490
491        assert_eq!(1, w1_count.get());
492        assert_eq!(1, w2_count.get());
493    }
494
495    #[test]
496    fn multi_receiver() {
497        let mut cx = noop_context();
498        let (mut tx, mut rx) = channel(100);
499        let mut rx2 = rx.clone();
500
501        assert_eq!(
502            PollSend::Ready,
503            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
504        );
505        assert_eq!(
506            PollSend::Ready,
507            Pin::new(&mut tx).poll_send(&mut cx, Message(2))
508        );
509
510        assert_eq!(
511            PollRecv::Ready(Message(1)),
512            Pin::new(&mut rx).poll_recv(&mut cx)
513        );
514        assert_eq!(
515            PollRecv::Ready(Message(2)),
516            Pin::new(&mut rx2).poll_recv(&mut cx)
517        );
518
519        assert_eq!(PollRecv::Pending, Pin::new(&mut rx).poll_recv(&mut cx));
520        assert_eq!(PollRecv::Pending, Pin::new(&mut rx2).poll_recv(&mut cx));
521    }
522}
523
524#[cfg(test)]
525mod tokio_tests {
526    use std::time::Duration;
527
528    use tokio::{
529        task::{spawn, JoinHandle},
530        time::{sleep, timeout},
531    };
532
533    use crate::{
534        sink::Sink,
535        stream::Stream,
536        test::{
537            capacity_iter, Channel, Channels, Message, CHANNEL_TEST_RECEIVERS,
538            CHANNEL_TEST_SENDERS, TEST_TIMEOUT,
539        },
540    };
541
542    #[tokio::test(flavor = "multi_thread")]
543    async fn simple() {
544        // crate::logging::enable_log();
545
546        for cap in capacity_iter() {
547            let (mut tx, mut rx) = super::channel(cap);
548
549            let join = spawn(async move {
550                for message in Message::new_iter(0) {
551                    tx.send(message).await.expect("send failed");
552                }
553            });
554
555            let rx_handle = spawn(async move {
556                let mut channel = Channel::new(0);
557                while let Some(message) = rx.recv().await {
558                    channel.assert_message(&message);
559                }
560                join.await.expect("Join failed");
561            });
562
563            timeout(TEST_TIMEOUT, rx_handle)
564                .await
565                .expect("test timeout")
566                .expect("join error");
567        }
568    }
569
570    #[tokio::test(flavor = "multi_thread")]
571    async fn multi_sender() {
572        for cap in capacity_iter() {
573            let (tx, mut rx) = super::channel(cap);
574
575            for i in 0..CHANNEL_TEST_SENDERS {
576                let mut tx2 = tx.clone();
577                spawn(async move {
578                    for message in Message::new_multi_sender(i) {
579                        tx2.send(message).await.expect("send failed");
580                    }
581                });
582            }
583
584            drop(tx);
585
586            let rx_handle = spawn(async move {
587                let mut channel = Channels::new(CHANNEL_TEST_SENDERS);
588                while let Some(message) = rx.recv().await {
589                    channel.assert_message(&message);
590                }
591            });
592
593            timeout(TEST_TIMEOUT, rx_handle)
594                .await
595                .expect("test timeout")
596                .expect("join error");
597        }
598    }
599
600    #[tokio::test(flavor = "multi_thread")]
601    async fn multi_receiver() {
602        // crate::logging::enable_log();
603        for cap in capacity_iter() {
604            let (mut tx, rx) = super::channel(cap);
605
606            spawn(async move {
607                for message in Message::new_iter(0) {
608                    tx.send(message).await.expect("send failed");
609                }
610            });
611
612            let handles: Vec<JoinHandle<()>> = (0..CHANNEL_TEST_RECEIVERS)
613                .map(|_| {
614                    let mut rx2 = rx.clone();
615                    let mut channels = Channels::new(1).allow_skips();
616
617                    spawn(async move {
618                        while let Some(message) = rx2.recv().await {
619                            channels.assert_message(&message);
620                        }
621                    })
622                })
623                .collect();
624
625            drop(rx);
626
627            let rx_handle = spawn(async move {
628                for handle in handles {
629                    handle.await.expect("Assertion failure");
630                }
631            });
632
633            timeout(TEST_TIMEOUT, rx_handle)
634                .await
635                .expect("test timeout")
636                .expect("join failure");
637        }
638    }
639
640    #[tokio::test(flavor = "multi_thread")]
641    async fn multi_sender_multi_receiver() {
642        // crate::logging::enable_log();
643        for cap in capacity_iter() {
644            let (tx, rx) = super::channel(cap);
645
646            for i in 0..CHANNEL_TEST_SENDERS {
647                let mut tx2 = tx.clone();
648                spawn(async move {
649                    for message in Message::new_multi_sender(i) {
650                        tx2.send(message).await.expect("send failed");
651                    }
652                });
653            }
654
655            drop(tx);
656
657            let handles: Vec<JoinHandle<()>> = (0..CHANNEL_TEST_RECEIVERS)
658                .map(|_i| {
659                    let mut rx2 = rx.clone();
660                    let mut channels = Channels::new(CHANNEL_TEST_SENDERS).allow_skips();
661
662                    spawn(async move {
663                        while let Some(message) = rx2.recv().await {
664                            channels.assert_message(&message);
665                        }
666                    })
667                })
668                .collect();
669
670            drop(rx);
671
672            let rx_handle = spawn(async move {
673                for handle in handles {
674                    handle.await.expect("Assertion failure");
675                }
676            });
677
678            timeout(TEST_TIMEOUT, rx_handle)
679                .await
680                .expect("test timeout")
681                .expect("join failure");
682        }
683    }
684
685    #[tokio::test(flavor = "multi_thread")]
686    async fn clone_monster() {
687        for cap in capacity_iter() {
688            // SimpleLogger::new()
689            //     .with_level(LevelFilter::Warn)
690            //     .init()
691            //     .unwrap();
692
693            let (tx, mut rx) = super::channel(cap);
694            let (mut barrier, mut sender_quit) = crate::barrier::channel();
695
696            let mut tx2 = tx.clone();
697            spawn(async move {
698                for message in Message::new_iter(0) {
699                    tx2.send(message).await.expect("send failed");
700                }
701
702                barrier.send(()).await.expect("clone task shutdown failed");
703            });
704
705            spawn(async move {
706                loop {
707                    if let Ok(_) = sender_quit.try_recv() {
708                        break;
709                    }
710
711                    let tx2 = tx.clone();
712                    let rx2 = tx.subscribe();
713                    let rx3 = rx2.clone();
714                    sleep(Duration::from_micros(100)).await;
715                    drop(tx2);
716                    drop(rx2);
717                    drop(rx3);
718
719                    sleep(Duration::from_micros(50)).await;
720                }
721            });
722
723            let rx_handle = spawn(async move {
724                let mut channel = Channel::new(0);
725
726                while let Some(message) = rx.recv().await {
727                    channel.assert_message(&message);
728                }
729            });
730
731            timeout(TEST_TIMEOUT, rx_handle)
732                .await
733                .expect("test timeout")
734                .expect("join failed");
735        }
736    }
737}
738
739#[cfg(test)]
740mod async_std_tests {
741    use std::time::Duration;
742
743    use async_std::{
744        future::timeout,
745        task::{self, spawn, JoinHandle},
746    };
747
748    use crate::{
749        sink::Sink,
750        stream::Stream,
751        test::{
752            capacity_iter, Channel, Channels, Message, CHANNEL_TEST_RECEIVERS,
753            CHANNEL_TEST_SENDERS, TEST_TIMEOUT,
754        },
755    };
756
757    #[async_std::test]
758    async fn simple() {
759        for cap in capacity_iter() {
760            let (mut tx, mut rx) = super::channel(cap);
761
762            spawn(async move {
763                for message in Message::new_iter(0) {
764                    tx.send(message).await.expect("send failed");
765                }
766            });
767
768            let rx_handle = spawn(async move {
769                let mut channel = Channel::new(0);
770                while let Some(message) = rx.recv().await {
771                    channel.assert_message(&message);
772                }
773            });
774
775            timeout(TEST_TIMEOUT, rx_handle)
776                .await
777                .expect("test timeout");
778        }
779    }
780
781    #[async_std::test]
782    async fn multi_sender() {
783        for cap in capacity_iter() {
784            let (tx, mut rx) = super::channel(cap);
785
786            for i in 0..CHANNEL_TEST_SENDERS {
787                let mut tx2 = tx.clone();
788                spawn(async move {
789                    for message in Message::new_multi_sender(i) {
790                        tx2.send(message).await.expect("send failed");
791                    }
792                });
793            }
794
795            drop(tx);
796
797            let rx_handle = spawn(async move {
798                let mut channel = Channels::new(CHANNEL_TEST_SENDERS);
799                while let Some(message) = rx.recv().await {
800                    channel.assert_message(&message);
801                }
802            });
803
804            timeout(TEST_TIMEOUT, rx_handle)
805                .await
806                .expect("test timeout");
807        }
808    }
809
810    #[async_std::test]
811    async fn multi_receiver() {
812        // crate::logging::enable_log();
813        for cap in capacity_iter() {
814            let (mut tx, rx) = super::channel(cap);
815
816            spawn(async move {
817                for message in Message::new_iter(0) {
818                    tx.send(message).await.expect("send failed");
819                }
820            });
821
822            let handles: Vec<JoinHandle<()>> = (0..CHANNEL_TEST_RECEIVERS)
823                .map(|_| {
824                    let mut rx2 = rx.clone();
825                    let mut channels = Channels::new(1).allow_skips();
826
827                    spawn(async move {
828                        while let Some(message) = rx2.recv().await {
829                            channels.assert_message(&message);
830                        }
831                    })
832                })
833                .collect();
834
835            drop(rx);
836
837            let rx_handle = spawn(async move {
838                for handle in handles {
839                    handle.await;
840                }
841            });
842
843            timeout(TEST_TIMEOUT, rx_handle)
844                .await
845                .expect("test timeout");
846        }
847    }
848
849    #[async_std::test]
850    async fn multi_sender_multi_receiver() {
851        // crate::logging::enable_log();
852
853        for cap in capacity_iter() {
854            let (tx, rx) = super::channel(cap);
855
856            for i in 0..CHANNEL_TEST_SENDERS {
857                let mut tx2 = tx.clone();
858                spawn(async move {
859                    for message in Message::new_multi_sender(i) {
860                        tx2.send(message).await.expect("send failed");
861                    }
862                });
863            }
864
865            drop(tx);
866
867            let handles: Vec<JoinHandle<()>> = (0..CHANNEL_TEST_RECEIVERS)
868                .map(|_i| {
869                    let mut rx2 = rx.clone();
870                    let mut channels = Channels::new(CHANNEL_TEST_SENDERS).allow_skips();
871
872                    spawn(async move {
873                        while let Some(message) = rx2.recv().await {
874                            channels.assert_message(&message);
875                        }
876                    })
877                })
878                .collect();
879
880            drop(rx);
881
882            let rx_handle = spawn(async move {
883                for handle in handles {
884                    handle.await;
885                }
886            });
887
888            timeout(TEST_TIMEOUT, rx_handle)
889                .await
890                .expect("test timeout");
891        }
892    }
893
894    #[tokio::test(flavor = "multi_thread")]
895    async fn clone_monster() {
896        // crate::logging::enable_log();
897
898        for cap in capacity_iter() {
899            let (tx, mut rx) = super::channel(cap);
900            let (mut barrier, mut sender_quit) = crate::barrier::channel();
901
902            let mut tx2 = tx.clone();
903            spawn(async move {
904                for message in Message::new_iter(0) {
905                    tx2.send(message).await.expect("send failed");
906                }
907
908                barrier.send(()).await.expect("clone task shutdown failed");
909            });
910
911            spawn(async move {
912                loop {
913                    if let Ok(_) = sender_quit.try_recv() {
914                        break;
915                    }
916
917                    let tx2 = tx.clone();
918                    let rx2 = tx.subscribe();
919                    let rx3 = rx2.clone();
920                    task::sleep(Duration::from_micros(100)).await;
921
922                    drop(tx2);
923                    drop(rx2);
924                    drop(rx3);
925
926                    task::sleep(Duration::from_micros(50)).await;
927                }
928            });
929
930            let rx_handle = spawn(async move {
931                let mut channel = Channel::new(0);
932
933                while let Some(message) = rx.recv().await {
934                    channel.assert_message(&message);
935                }
936            });
937
938            timeout(TEST_TIMEOUT, rx_handle)
939                .await
940                .expect("test timeout");
941        }
942    }
943}