1use std::fmt;
8
9use super::SendMessage;
10use crate::{
11 sink::{PollSend, Sink},
12 stream::{PollRecv, Stream},
13 sync::{shared, ReceiverShared, SenderShared},
14};
15use crossbeam_queue::ArrayQueue;
16use static_assertions::assert_impl_all;
17
18pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
20 #[cfg(feature = "debug")]
21 log::error!("Creating dispatch channel with capacity {}", capacity);
22 let (tx_shared, rx_shared) = shared(StateExtension::new(capacity));
23 let sender = Sender { shared: tx_shared };
24
25 let receiver = Receiver { shared: rx_shared };
26
27 (sender, receiver)
28}
29
/// The sending half of a dispatch channel.
///
/// Senders can be cloned, and new receivers can be created from a sender with
/// `subscribe`. Messages are pushed onto a bounded queue that all handles share.
pub struct Sender<T> {
    // Sender-side handle to the state shared with every other sender and receiver.
    shared: SenderShared<StateExtension<T>>,
}

// Compile-time check that the sender keeps the traits callers rely on.
assert_impl_all!(Sender<String>: Clone, Send, Sync, fmt::Debug);
38
39impl<T> Clone for Sender<T> {
40 fn clone(&self) -> Self {
41 Self {
42 shared: self.shared.clone(),
43 }
44 }
45}
46
47impl<T> Sink for Sender<T> {
48 type Item = T;
49
50 fn poll_send(
51 self: std::pin::Pin<&mut Self>,
52 cx: &mut crate::Context<'_>,
53 mut value: Self::Item,
54 ) -> PollSend<Self::Item> {
55 loop {
56 if self.shared.is_closed() {
57 return PollSend::Rejected(value);
58 }
59
60 let queue = &self.shared.extension().queue;
61 let guard = self.shared.recv_guard();
62
63 match queue.push(value) {
64 Ok(_) => {
65 self.shared.notify_receivers();
66 return PollSend::Ready;
67 }
68 Err(v) => {
69 self.shared.subscribe_recv(cx);
70 if guard.is_expired() {
71 value = v;
72 continue;
73 }
74
75 return PollSend::Pending(v);
76 }
77 }
78 }
79 }
80}
81
82impl<T> fmt::Debug for Sender<T> {
83 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84 f.debug_struct("Sender").finish()
85 }
86}
87
/// `futures::sink::Sink` adapter for the dispatch `Sender`, compiled only with the
/// `futures-traits` feature.
#[cfg(feature = "futures-traits")]
mod impl_futures {
    use crate::sink::SendError;
    use std::task::Poll;

    impl<T> futures::sink::Sink<T> for super::Sender<T> {
        type Error = SendError<T>;

        /// Reports whether the queue currently has room for another item.
        ///
        /// A closed channel also yields `Ready(Ok(()))`: `SendError` wraps the rejected
        /// item, and there is no item at this point, so the failure is reported by the
        /// following `start_send` call instead.
        fn poll_ready(
            self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> Poll<Result<(), Self::Error>> {
            loop {
                if self.shared.is_closed() {
                    return Poll::Ready(Ok(()));
                }

                let queue = &self.shared.extension().queue;
                // Acquired before inspecting the queue and consulted after subscribing.
                // NOTE(review): presumably expires when a receiver notification raced
                // with the subscription — confirm in `sync`.
                let guard = self.shared.recv_guard();

                if queue.is_full() {
                    // Convert the std task context into the crate's context type.
                    let mut cx = cx.into();
                    self.shared.subscribe_recv(&mut cx);

                    if guard.is_expired() {
                        continue;
                    }

                    return Poll::Pending;
                } else {
                    return Poll::Ready(Ok(()));
                }
            }
        }

        /// Pushes `item` onto the queue and notifies receivers on success.
        ///
        /// # Errors
        ///
        /// Returns `SendError(item)` when the channel is closed or the queue is full.
        fn start_send(self: std::pin::Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
            if self.shared.is_closed() {
                return Err(SendError(item));
            }

            let result = self
                .shared
                .extension()
                .queue
                .push(item)
                .map_err(SendError);

            if result.is_ok() {
                self.shared.notify_receivers();
            }

            result
        }

        /// Nothing is buffered outside the shared queue, so flushing always completes.
        fn poll_flush(
            self: std::pin::Pin<&mut Self>,
            _cx: &mut std::task::Context<'_>,
        ) -> Poll<Result<(), Self::Error>> {
            Poll::Ready(Ok(()))
        }

        /// Closing performs no work here and always completes immediately.
        fn poll_close(
            self: std::pin::Pin<&mut Self>,
            _cx: &mut std::task::Context<'_>,
        ) -> Poll<Result<(), Self::Error>> {
            Poll::Ready(Ok(()))
        }
    }
}
157
158impl<T> Sender<T> {
159 pub fn subscribe(&self) -> Receiver<T> {
161 Receiver {
162 shared: self.shared.clone_receiver(),
163 }
164 }
165}
166
/// The receiving half of a dispatch channel.
///
/// Receivers can be cloned; all clones pop from the same queue, so each message is
/// taken by exactly one of them.
pub struct Receiver<T> {
    // Receiver-side handle to the state shared with every other sender and receiver.
    shared: ReceiverShared<StateExtension<T>>,
}

// Compile-time check that the receiver keeps the traits callers rely on.
assert_impl_all!(Receiver<SendMessage>: Clone, Send, Sync, fmt::Debug);
175
176impl<T> Stream for Receiver<T> {
177 type Item = T;
178
179 fn poll_recv(
180 self: std::pin::Pin<&mut Self>,
181 cx: &mut crate::Context<'_>,
182 ) -> PollRecv<Self::Item> {
183 loop {
184 let guard = self.shared.send_guard();
185 match self.shared.extension().queue.pop() {
186 Some(v) => {
187 self.shared.notify_senders();
188 return PollRecv::Ready(v);
189 }
190 None => {
191 if self.shared.is_closed() {
192 return PollRecv::Closed;
193 }
194
195 self.shared.subscribe_send(cx);
196 if guard.is_expired() {
197 continue;
198 }
199
200 return PollRecv::Pending;
201 }
202 }
203 }
204 }
205}
206
207impl<T> Clone for Receiver<T> {
208 fn clone(&self) -> Self {
209 Self {
210 shared: self.shared.clone(),
211 }
212 }
213}
214
215impl<T> fmt::Debug for Receiver<T> {
216 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217 f.debug_struct("Receiver").finish()
218 }
219}
220
/// Channel-specific data stored inside the shared sender/receiver state.
struct StateExtension<T> {
    // Bounded queue holding the messages that have been sent but not yet received.
    queue: ArrayQueue<T>,
}
224
225impl<T> StateExtension<T> {
226 pub fn new(capacity: usize) -> Self {
227 Self {
228 queue: ArrayQueue::new(capacity),
229 }
230 }
231}
232
/// Synchronous unit tests that drive `poll_send` / `poll_recv` by hand.
#[cfg(test)]
mod tests {
    use std::{pin::Pin, task::Context};

    use crate::{
        sink::{PollSend, Sink},
        stream::{PollRecv, Stream},
        test::{noop_context, panic_context},
    };
    use futures_test::task::new_count_waker;

    use super::{channel, Receiver, Sender};

    /// Pins both halves of a channel pair in place.
    fn pin(
        chan: &mut (Sender<Message>, Receiver<Message>),
    ) -> (Pin<&mut Sender<Message>>, Pin<&mut Receiver<Message>>) {
        let tx = Pin::new(&mut chan.0);
        let rx = Pin::new(&mut chan.1);

        (tx, rx)
    }

    #[derive(Debug, PartialEq, Eq)]
    struct Message(usize);

    /// A send into an empty channel is accepted immediately.
    #[test]
    fn send_accepted() {
        let mut cx = panic_context();
        let mut chan = channel(2);
        let (tx, _) = pin(&mut chan);

        assert_eq!(PollSend::Ready, tx.poll_send(&mut cx, Message(1)));
    }

    /// Once the queue holds `capacity` messages, the next send is pending and the
    /// value is handed back.
    #[test]
    fn send_blocks() {
        let mut cx = panic_context();
        let (mut tx, _rx) = channel(2);

        assert_eq!(
            PollSend::Ready,
            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
        );
        assert_eq!(
            PollSend::Ready,
            Pin::new(&mut tx).poll_send(&mut cx, Message(2))
        );
        // The pending send subscribes its waker, which is woken when `_rx` is dropped
        // at the end of the test, so a no-op context is used instead of the panicking one.
        assert_eq!(
            PollSend::Pending(Message(3)),
            Pin::new(&mut tx).poll_send(&mut noop_context(), Message(3))
        );
    }

    /// Messages come out in the order they were sent, and an empty queue is pending.
    #[test]
    fn send_recv() {
        let mut cx = panic_context();
        let (mut tx, mut rx) = channel(2);

        assert_eq!(
            PollSend::Ready,
            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
        );
        assert_eq!(
            PollSend::Ready,
            Pin::new(&mut tx).poll_send(&mut cx, Message(2))
        );
        assert_eq!(
            PollSend::Pending(Message(3)),
            Pin::new(&mut tx).poll_send(&mut noop_context(), Message(3))
        );

        assert_eq!(
            PollRecv::Ready(Message(1)),
            Pin::new(&mut rx).poll_recv(&mut cx)
        );

        assert_eq!(
            PollRecv::Ready(Message(2)),
            Pin::new(&mut rx).poll_recv(&mut cx)
        );

        assert_eq!(
            PollRecv::Pending,
            Pin::new(&mut rx).poll_recv(&mut noop_context())
        );
    }

    /// After every sender is dropped, queued messages are still delivered and the
    /// stream then reports `Closed`.
    #[test]
    fn sender_disconnect() {
        let mut cx = panic_context();
        let (mut tx, mut rx) = channel(100);
        let mut tx2 = tx.clone();

        assert_eq!(
            PollSend::Ready,
            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
        );

        assert_eq!(
            PollSend::Ready,
            Pin::new(&mut tx2).poll_send(&mut cx, Message(2))
        );

        drop(tx);
        drop(tx2);

        assert_eq!(
            PollRecv::Ready(Message(1)),
            Pin::new(&mut rx).poll_recv(&mut cx)
        );

        assert_eq!(
            PollRecv::Ready(Message(2)),
            Pin::new(&mut rx).poll_recv(&mut cx)
        );

        assert_eq!(PollRecv::Closed, Pin::new(&mut rx).poll_recv(&mut cx));
    }

    /// After the only receiver is dropped, every sender has its value rejected.
    #[test]
    fn receiver_disconnect() {
        let mut cx = panic_context();
        let (mut tx, rx) = channel(100);
        let mut tx2 = tx.clone();

        assert_eq!(
            PollSend::Ready,
            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
        );

        assert_eq!(
            PollSend::Ready,
            Pin::new(&mut tx2).poll_send(&mut cx, Message(2))
        );

        drop(rx);

        assert_eq!(
            PollSend::Rejected(Message(3)),
            Pin::new(&mut tx).poll_send(&mut cx, Message(3))
        );

        assert_eq!(
            PollSend::Rejected(Message(4)),
            Pin::new(&mut tx2).poll_send(&mut cx, Message(4))
        );
    }

    /// A pending sender is woken exactly once when a receiver frees a slot.
    #[test]
    fn wake_sender() {
        let mut cx = panic_context();
        let (mut tx, mut rx) = channel(1);

        assert_eq!(
            PollSend::Ready,
            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
        );

        let (w2, w2_count) = new_count_waker();
        let w2_context = Context::from_waker(&w2);
        assert_eq!(
            PollSend::Pending(Message(2)),
            Pin::new(&mut tx).poll_send(&mut w2_context.into(), Message(2))
        );

        assert_eq!(0, w2_count.get());

        assert_eq!(
            PollRecv::Ready(Message(1)),
            Pin::new(&mut rx).poll_recv(&mut cx)
        );

        assert_eq!(1, w2_count.get());
        assert_eq!(
            PollRecv::Pending,
            Pin::new(&mut rx).poll_recv(&mut noop_context())
        );

        assert_eq!(1, w2_count.get());
    }

    /// A pending receiver is woken exactly once when a message arrives.
    #[test]
    fn wake_receiver() {
        let mut cx = panic_context();
        let (mut tx, mut rx) = channel(100);

        let (w1, w1_count) = new_count_waker();
        let w1_context = Context::from_waker(&w1);

        assert_eq!(
            PollRecv::Pending,
            Pin::new(&mut rx).poll_recv(&mut w1_context.into())
        );

        assert_eq!(0, w1_count.get());

        assert_eq!(
            PollSend::Ready,
            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
        );

        assert_eq!(1, w1_count.get());

        assert_eq!(
            PollSend::Ready,
            Pin::new(&mut tx).poll_send(&mut cx, Message(2))
        );

        assert_eq!(1, w1_count.get());
    }

    /// Dropping the receiver wakes a sender that was waiting for space.
    #[test]
    fn wake_sender_on_disconnect() {
        let (mut tx, rx) = channel(1);

        let (w1, w1_count) = new_count_waker();
        let w1_context = Context::from_waker(&w1);
        let mut w1_context: crate::Context<'_> = w1_context.into();

        assert_eq!(
            PollSend::Ready,
            Pin::new(&mut tx).poll_send(&mut w1_context, Message(1))
        );

        assert_eq!(
            PollSend::Pending(Message(2)),
            Pin::new(&mut tx).poll_send(&mut w1_context, Message(2))
        );

        assert_eq!(0, w1_count.get());

        drop(rx);

        assert_eq!(1, w1_count.get());
    }

    /// Dropping the sender wakes every receiver that was waiting for a message.
    #[test]
    fn wake_receivers_on_disconnect() {
        let (tx, mut rx) = channel::<()>(100);
        let mut rx2 = rx.clone();

        let (w1, w1_count) = new_count_waker();
        let w1_context = Context::from_waker(&w1);

        let (w2, w2_count) = new_count_waker();
        let w2_context = Context::from_waker(&w2);

        assert_eq!(
            PollRecv::Pending,
            Pin::new(&mut rx).poll_recv(&mut w1_context.into())
        );

        assert_eq!(
            PollRecv::Pending,
            Pin::new(&mut rx2).poll_recv(&mut w2_context.into())
        );

        assert_eq!(0, w1_count.get());
        assert_eq!(0, w2_count.get());

        drop(tx);

        assert_eq!(1, w1_count.get());
        assert_eq!(1, w2_count.get());
    }

    /// Each message is taken by exactly one of the cloned receivers.
    #[test]
    fn multi_receiver() {
        let mut cx = noop_context();
        let (mut tx, mut rx) = channel(100);
        let mut rx2 = rx.clone();

        assert_eq!(
            PollSend::Ready,
            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
        );
        assert_eq!(
            PollSend::Ready,
            Pin::new(&mut tx).poll_send(&mut cx, Message(2))
        );

        assert_eq!(
            PollRecv::Ready(Message(1)),
            Pin::new(&mut rx).poll_recv(&mut cx)
        );
        assert_eq!(
            PollRecv::Ready(Message(2)),
            Pin::new(&mut rx2).poll_recv(&mut cx)
        );

        assert_eq!(PollRecv::Pending, Pin::new(&mut rx).poll_recv(&mut cx));
        assert_eq!(PollRecv::Pending, Pin::new(&mut rx2).poll_recv(&mut cx));
    }
}
523
/// End-to-end tests of the dispatch channel on the tokio multi-threaded runtime.
#[cfg(test)]
mod tokio_tests {
    use std::time::Duration;

    use tokio::{
        task::{spawn, JoinHandle},
        time::{sleep, timeout},
    };

    use crate::{
        sink::Sink,
        stream::Stream,
        test::{
            capacity_iter, Channel, Channels, Message, CHANNEL_TEST_RECEIVERS,
            CHANNEL_TEST_SENDERS, TEST_TIMEOUT,
        },
    };

    /// One sender task, one receiver task, for every tested capacity.
    #[tokio::test(flavor = "multi_thread")]
    async fn simple() {
        for cap in capacity_iter() {
            let (mut tx, mut rx) = super::channel(cap);

            let join = spawn(async move {
                for message in Message::new_iter(0) {
                    tx.send(message).await.expect("send failed");
                }
            });

            let rx_handle = spawn(async move {
                let mut channel = Channel::new(0);
                while let Some(message) = rx.recv().await {
                    channel.assert_message(&message);
                }
                // Surface a panic from the sender task instead of losing it.
                join.await.expect("Join failed");
            });

            timeout(TEST_TIMEOUT, rx_handle)
                .await
                .expect("test timeout")
                .expect("join error");
        }
    }

    /// Several sender tasks feeding a single receiver.
    #[tokio::test(flavor = "multi_thread")]
    async fn multi_sender() {
        for cap in capacity_iter() {
            let (tx, mut rx) = super::channel(cap);

            for i in 0..CHANNEL_TEST_SENDERS {
                let mut tx2 = tx.clone();
                spawn(async move {
                    for message in Message::new_multi_sender(i) {
                        tx2.send(message).await.expect("send failed");
                    }
                });
            }

            // Drop the original sender so the receiver loop ends once the spawned
            // senders are done.
            drop(tx);

            let rx_handle = spawn(async move {
                let mut channel = Channels::new(CHANNEL_TEST_SENDERS);
                while let Some(message) = rx.recv().await {
                    channel.assert_message(&message);
                }
            });

            timeout(TEST_TIMEOUT, rx_handle)
                .await
                .expect("test timeout")
                .expect("join error");
        }
    }

    /// A single sender feeding several receiver tasks; each receiver only sees a
    /// subset of the messages, so skips are allowed.
    #[tokio::test(flavor = "multi_thread")]
    async fn multi_receiver() {
        for cap in capacity_iter() {
            let (mut tx, rx) = super::channel(cap);

            spawn(async move {
                for message in Message::new_iter(0) {
                    tx.send(message).await.expect("send failed");
                }
            });

            let handles: Vec<JoinHandle<()>> = (0..CHANNEL_TEST_RECEIVERS)
                .map(|_| {
                    let mut rx2 = rx.clone();
                    let mut channels = Channels::new(1).allow_skips();

                    spawn(async move {
                        while let Some(message) = rx2.recv().await {
                            channels.assert_message(&message);
                        }
                    })
                })
                .collect();

            drop(rx);

            let rx_handle = spawn(async move {
                for handle in handles {
                    handle.await.expect("Assertion failure");
                }
            });

            timeout(TEST_TIMEOUT, rx_handle)
                .await
                .expect("test timeout")
                .expect("join failure");
        }
    }

    /// Several senders feeding several receivers.
    #[tokio::test(flavor = "multi_thread")]
    async fn multi_sender_multi_receiver() {
        for cap in capacity_iter() {
            let (tx, rx) = super::channel(cap);

            for i in 0..CHANNEL_TEST_SENDERS {
                let mut tx2 = tx.clone();
                spawn(async move {
                    for message in Message::new_multi_sender(i) {
                        tx2.send(message).await.expect("send failed");
                    }
                });
            }

            drop(tx);

            let handles: Vec<JoinHandle<()>> = (0..CHANNEL_TEST_RECEIVERS)
                .map(|_| {
                    let mut rx2 = rx.clone();
                    let mut channels = Channels::new(CHANNEL_TEST_SENDERS).allow_skips();

                    spawn(async move {
                        while let Some(message) = rx2.recv().await {
                            channels.assert_message(&message);
                        }
                    })
                })
                .collect();

            drop(rx);

            let rx_handle = spawn(async move {
                for handle in handles {
                    handle.await.expect("Assertion failure");
                }
            });

            timeout(TEST_TIMEOUT, rx_handle)
                .await
                .expect("test timeout")
                .expect("join failure");
        }
    }

    /// Sends a message stream while another task continuously clones and drops
    /// senders and receivers, to check that handle churn does not disturb delivery.
    #[tokio::test(flavor = "multi_thread")]
    async fn clone_monster() {
        for cap in capacity_iter() {
            let (tx, mut rx) = super::channel(cap);
            let (mut barrier, mut sender_quit) = crate::barrier::channel();

            let mut tx2 = tx.clone();
            spawn(async move {
                for message in Message::new_iter(0) {
                    tx2.send(message).await.expect("send failed");
                }

                // Tell the cloning task that all messages have been sent.
                barrier.send(()).await.expect("clone task shutdown failed");
            });

            spawn(async move {
                loop {
                    if sender_quit.try_recv().is_ok() {
                        break;
                    }

                    let tx2 = tx.clone();
                    let rx2 = tx.subscribe();
                    let rx3 = rx2.clone();
                    sleep(Duration::from_micros(100)).await;
                    drop(tx2);
                    drop(rx2);
                    drop(rx3);

                    sleep(Duration::from_micros(50)).await;
                }
            });

            let rx_handle = spawn(async move {
                let mut channel = Channel::new(0);

                while let Some(message) = rx.recv().await {
                    channel.assert_message(&message);
                }
            });

            timeout(TEST_TIMEOUT, rx_handle)
                .await
                .expect("test timeout")
                .expect("join failed");
        }
    }
}
738
/// End-to-end tests of the dispatch channel on the async-std runtime.
#[cfg(test)]
mod async_std_tests {
    use std::time::Duration;

    use async_std::{
        future::timeout,
        task::{self, spawn, JoinHandle},
    };

    use crate::{
        sink::Sink,
        stream::Stream,
        test::{
            capacity_iter, Channel, Channels, Message, CHANNEL_TEST_RECEIVERS,
            CHANNEL_TEST_SENDERS, TEST_TIMEOUT,
        },
    };

    /// One sender task, one receiver task, for every tested capacity.
    #[async_std::test]
    async fn simple() {
        for cap in capacity_iter() {
            let (mut tx, mut rx) = super::channel(cap);

            spawn(async move {
                for message in Message::new_iter(0) {
                    tx.send(message).await.expect("send failed");
                }
            });

            let rx_handle = spawn(async move {
                let mut channel = Channel::new(0);
                while let Some(message) = rx.recv().await {
                    channel.assert_message(&message);
                }
            });

            timeout(TEST_TIMEOUT, rx_handle)
                .await
                .expect("test timeout");
        }
    }

    /// Several sender tasks feeding a single receiver.
    #[async_std::test]
    async fn multi_sender() {
        for cap in capacity_iter() {
            let (tx, mut rx) = super::channel(cap);

            for i in 0..CHANNEL_TEST_SENDERS {
                let mut tx2 = tx.clone();
                spawn(async move {
                    for message in Message::new_multi_sender(i) {
                        tx2.send(message).await.expect("send failed");
                    }
                });
            }

            // Drop the original sender so the receiver loop ends once the spawned
            // senders are done.
            drop(tx);

            let rx_handle = spawn(async move {
                let mut channel = Channels::new(CHANNEL_TEST_SENDERS);
                while let Some(message) = rx.recv().await {
                    channel.assert_message(&message);
                }
            });

            timeout(TEST_TIMEOUT, rx_handle)
                .await
                .expect("test timeout");
        }
    }

    /// A single sender feeding several receiver tasks; each receiver only sees a
    /// subset of the messages, so skips are allowed.
    #[async_std::test]
    async fn multi_receiver() {
        for cap in capacity_iter() {
            let (mut tx, rx) = super::channel(cap);

            spawn(async move {
                for message in Message::new_iter(0) {
                    tx.send(message).await.expect("send failed");
                }
            });

            let handles: Vec<JoinHandle<()>> = (0..CHANNEL_TEST_RECEIVERS)
                .map(|_| {
                    let mut rx2 = rx.clone();
                    let mut channels = Channels::new(1).allow_skips();

                    spawn(async move {
                        while let Some(message) = rx2.recv().await {
                            channels.assert_message(&message);
                        }
                    })
                })
                .collect();

            drop(rx);

            let rx_handle = spawn(async move {
                for handle in handles {
                    handle.await;
                }
            });

            timeout(TEST_TIMEOUT, rx_handle)
                .await
                .expect("test timeout");
        }
    }

    /// Several senders feeding several receivers.
    #[async_std::test]
    async fn multi_sender_multi_receiver() {
        for cap in capacity_iter() {
            let (tx, rx) = super::channel(cap);

            for i in 0..CHANNEL_TEST_SENDERS {
                let mut tx2 = tx.clone();
                spawn(async move {
                    for message in Message::new_multi_sender(i) {
                        tx2.send(message).await.expect("send failed");
                    }
                });
            }

            drop(tx);

            let handles: Vec<JoinHandle<()>> = (0..CHANNEL_TEST_RECEIVERS)
                .map(|_| {
                    let mut rx2 = rx.clone();
                    let mut channels = Channels::new(CHANNEL_TEST_SENDERS).allow_skips();

                    spawn(async move {
                        while let Some(message) = rx2.recv().await {
                            channels.assert_message(&message);
                        }
                    })
                })
                .collect();

            drop(rx);

            let rx_handle = spawn(async move {
                for handle in handles {
                    handle.await;
                }
            });

            timeout(TEST_TIMEOUT, rx_handle)
                .await
                .expect("test timeout");
        }
    }

    /// Sends a message stream while another task continuously clones and drops
    /// senders and receivers, to check that handle churn does not disturb delivery.
    ///
    /// Runs under the async-std test harness like the rest of this module; every
    /// primitive used here (`spawn`, `task::sleep`, `timeout`) comes from async-std.
    #[async_std::test]
    async fn clone_monster() {
        for cap in capacity_iter() {
            let (tx, mut rx) = super::channel(cap);
            let (mut barrier, mut sender_quit) = crate::barrier::channel();

            let mut tx2 = tx.clone();
            spawn(async move {
                for message in Message::new_iter(0) {
                    tx2.send(message).await.expect("send failed");
                }

                // Tell the cloning task that all messages have been sent.
                barrier.send(()).await.expect("clone task shutdown failed");
            });

            spawn(async move {
                loop {
                    if sender_quit.try_recv().is_ok() {
                        break;
                    }

                    let tx2 = tx.clone();
                    let rx2 = tx.subscribe();
                    let rx3 = rx2.clone();
                    task::sleep(Duration::from_micros(100)).await;

                    drop(tx2);
                    drop(rx2);
                    drop(rx3);

                    task::sleep(Duration::from_micros(50)).await;
                }
            });

            let rx_handle = spawn(async move {
                let mut channel = Channel::new(0);

                while let Some(message) = rx.recv().await {
                    channel.assert_message(&message);
                }
            });

            timeout(TEST_TIMEOUT, rx_handle)
                .await
                .expect("test timeout");
        }
    }
}