1use std::fmt;
7
8use super::SendMessage;
9use static_assertions::assert_impl_all;
10
11use crate::{
12 sink::{PollSend, Sink},
13 stream::{PollRecv, Stream},
14 sync::{
15 mpmc_circular_buffer::{BufferReader, MpmcCircularBuffer, TryRead, TryWrite},
16 shared, ReceiverShared, SenderShared,
17 },
18};
19
20pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
22 #[cfg(feature = "debug")]
23 log::error!("Creating broadcast channel with capacity {}", capacity);
24 let (buffer, reader) = MpmcCircularBuffer::new(capacity);
26
27 let (tx_shared, rx_shared) = shared(buffer);
28 let sender = Sender { shared: tx_shared };
29
30 let receiver = Receiver::new(rx_shared, reader);
31
32 (sender, receiver)
33}
34
35pub struct Sender<T> {
41 pub(in crate::channels::broadcast) shared: SenderShared<MpmcCircularBuffer<T>>,
42}
43
44unsafe impl<T: Send> Send for Sender<T> {}
45unsafe impl<T: Send> Sync for Sender<T> {}
46
47impl<T> Clone for Sender<T> {
48 fn clone(&self) -> Self {
49 Self {
50 shared: self.shared.clone(),
51 }
52 }
53}
54
55assert_impl_all!(Sender<SendMessage>: Send, Sync, Clone, fmt::Debug);
56
57impl<T> Sink for Sender<T>
58where
59 T: Clone,
60{
61 type Item = T;
62
63 fn poll_send(
64 self: std::pin::Pin<&mut Self>,
65 cx: &mut crate::Context<'_>,
66 value: Self::Item,
67 ) -> PollSend<Self::Item> {
68 if self.shared.is_closed() {
73 return PollSend::Rejected(value);
74 }
75
76 let buffer = self.shared.extension();
82 match buffer.try_write(value, cx) {
83 TryWrite::Pending(value) => PollSend::Pending(value),
84 TryWrite::Ready => PollSend::Ready,
85 }
86 }
87}
88
89impl<T> Sender<T> {
90 pub fn subscribe(&self) -> Receiver<T> {
95 let shared = self.shared.clone_receiver();
96 let reader = shared.extension().new_reader();
97 self.shared.notify_self();
98
99 Receiver::new(shared, reader)
100 }
101}
102
103impl<T> fmt::Debug for Sender<T> {
104 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105 f.debug_struct("Sender").finish()
106 }
107}
108
109pub struct Receiver<T> {
113 shared: ReceiverShared<MpmcCircularBuffer<T>>,
114 reader: BufferReader,
115}
116
117unsafe impl<T: Send> Send for Receiver<T> {}
118unsafe impl<T: Send> Sync for Receiver<T> {}
119
120assert_impl_all!(Receiver<SendMessage>: Send, Sync, Clone, fmt::Debug);
121
122impl<T> Receiver<T> {
123 fn new(shared: ReceiverShared<MpmcCircularBuffer<T>>, reader: BufferReader) -> Self {
124 Self { shared, reader }
125 }
126}
127
128impl<T> Stream for Receiver<T>
129where
130 T: Clone,
131{
132 type Item = T;
133
134 fn poll_recv(
135 self: std::pin::Pin<&mut Self>,
136 cx: &mut crate::Context<'_>,
137 ) -> PollRecv<Self::Item> {
138 let this = self.get_mut();
140 let reader = &mut this.reader;
141 let buffer = this.shared.extension();
142
143 match reader.try_read(buffer, cx) {
144 TryRead::Pending => {
145 this.shared.subscribe_send(cx);
146
147 if this.shared.is_closed() {
148 return PollRecv::Closed;
149 }
150
151 PollRecv::Pending
152 }
153 TryRead::Ready(value) => PollRecv::Ready(value),
154 }
155 }
156}
157
158impl<T> Clone for Receiver<T> {
159 fn clone(&self) -> Self {
160 let buffer = self.shared.extension();
161 let reader = self.reader.clone_with(buffer);
162
163 Self::new(self.shared.clone(), reader)
164 }
165}
166
167impl<T> Drop for Receiver<T> {
168 fn drop(&mut self) {
169 let buffer = self.shared.extension();
170 self.reader.drop_with(buffer);
171 }
172}
173
174impl<T> fmt::Debug for Receiver<T> {
175 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176 f.debug_struct("Receiver").finish()
177 }
178}
179
180#[cfg(test)]
181mod tests {
182 use std::pin::Pin;
183
184 use crate::{
185 sink::{PollSend, Sink},
186 stream::{PollRecv, Stream},
187 test::{noop_context, panic_context},
188 Context,
189 };
190 use futures_test::task::new_count_waker;
191
192 use super::{channel, Receiver, Sender};
193
194 fn pin<'a, 'b>(
196 chan: &mut (Sender<Message>, Receiver<Message>),
197 ) -> (Pin<&mut Sender<Message>>, Pin<&mut Receiver<Message>>) {
198 let tx = Pin::new(&mut chan.0);
199 let rx = Pin::new(&mut chan.1);
200
201 (tx, rx)
202 }
203
204 #[derive(Clone, Debug, PartialEq, Eq)]
205 struct Message(usize);
206
207 #[test]
208 fn send_accepted() {
209 let mut cx = panic_context();
211 let mut chan = channel(2);
212 let (tx, _rx) = pin(&mut chan);
213
214 assert_eq!(PollSend::Ready, tx.poll_send(&mut cx, Message(1)));
215 }
216
217 #[test]
218 fn full_send_blocks() {
219 let mut cx = panic_context();
221 let (mut tx, _rx) = channel(2);
222
223 assert_eq!(
224 PollSend::Ready,
225 Pin::new(&mut tx).poll_send(&mut cx, Message(1))
226 );
227
228 assert_eq!(
229 PollSend::Ready,
230 Pin::new(&mut tx).poll_send(&mut cx, Message(2))
231 );
232
233 assert_eq!(
234 PollSend::Pending(Message(3)),
235 Pin::new(&mut tx).poll_send(&mut noop_context(), Message(3))
236 );
237 }
238
239 #[test]
240 fn empty_send_recv() {
241 let mut cx = noop_context();
242 let (mut tx, mut rx) = channel(0);
243
244 assert_eq!(
245 PollSend::Ready,
246 Pin::new(&mut tx).poll_send(&mut cx, Message(1))
247 );
248
249 assert_eq!(
250 PollRecv::Ready(Message(1)),
251 Pin::new(&mut rx).poll_recv(&mut cx)
252 );
253 }
254
255 #[test]
256 fn send_recv() {
257 let mut cx = noop_context();
258 let mut chan = channel(2);
259 let (tx, rx) = pin(&mut chan);
260
261 assert_eq!(PollSend::Ready, tx.poll_send(&mut cx, Message(1)));
262 assert_eq!(PollRecv::Ready(Message(1)), rx.poll_recv(&mut cx));
263 }
264
265 #[test]
266 fn sender_subscribe_same_read() {
267 let mut cx = noop_context();
269 let (mut tx, mut rx) = channel(2);
270
271 assert_eq!(
272 PollSend::Ready,
273 Pin::new(&mut tx).poll_send(&mut cx, Message(1))
274 );
275 assert_eq!(
276 PollRecv::Ready(Message(1)),
277 Pin::new(&mut rx).poll_recv(&mut cx)
278 );
279
280 let mut rx2 = tx.subscribe();
281 assert_eq!(PollRecv::Pending, Pin::new(&mut rx2).poll_recv(&mut cx));
282 assert_eq!(
283 PollSend::Ready,
284 Pin::new(&mut tx).poll_send(&mut cx, Message(2))
285 );
286 assert_eq!(
287 PollRecv::Ready(Message(2)),
288 Pin::new(&mut rx2).poll_recv(&mut cx)
289 );
290 }
291
292 #[test]
293 fn sender_subscribe_different_read() {
294 let mut cx = noop_context();
297 let (mut tx, _rx) = channel(2);
298
299 assert_eq!(
300 PollSend::Ready,
301 Pin::new(&mut tx).poll_send(&mut cx, Message(1))
302 );
303
304 let mut rx2 = tx.subscribe();
305 assert_eq!(PollRecv::Pending, Pin::new(&mut rx2).poll_recv(&mut cx));
306 assert_eq!(
307 PollSend::Ready,
308 Pin::new(&mut tx).poll_send(&mut cx, Message(2))
309 );
310 assert_eq!(
311 PollRecv::Ready(Message(2)),
312 Pin::new(&mut rx2).poll_recv(&mut cx)
313 );
314 }
315
316 #[test]
317 fn two_senders_recv() {
318 let mut cx = panic_context();
321 let (mut tx, mut rx) = channel(2);
322 let mut tx2 = tx.clone();
323
324 assert_eq!(
325 PollSend::Ready,
326 Pin::new(&mut tx).poll_send(&mut cx, Message(1))
327 );
328 assert_eq!(
329 PollSend::Ready,
330 Pin::new(&mut tx2).poll_send(&mut cx, Message(2))
331 );
332
333 assert_eq!(
334 PollSend::Pending(Message(3)),
335 Pin::new(&mut tx).poll_send(&mut noop_context(), Message(3))
336 );
337 assert_eq!(
338 PollSend::Pending(Message(3)),
339 Pin::new(&mut tx2).poll_send(&mut noop_context(), Message(3))
340 );
341
342 assert_eq!(
343 PollRecv::Ready(Message(1)),
344 Pin::new(&mut rx).poll_recv(&mut cx)
345 );
346 assert_eq!(
347 PollRecv::Ready(Message(2)),
348 Pin::new(&mut rx).poll_recv(&mut cx)
349 );
350
351 assert_eq!(
352 PollRecv::Pending,
353 Pin::new(&mut rx).poll_recv(&mut noop_context())
354 );
355 }
356
357 #[test]
358 fn two_receivers() {
359 let mut cx = panic_context();
362 let (mut tx, mut rx) = channel(2);
363 let mut rx2 = rx.clone();
364
365 assert_eq!(
366 PollSend::Ready,
367 Pin::new(&mut tx).poll_send(&mut cx, Message(1))
368 );
369 assert_eq!(
370 PollSend::Ready,
371 Pin::new(&mut tx).poll_send(&mut cx, Message(2))
372 );
373 assert_eq!(
374 PollSend::Pending(Message(3)),
375 Pin::new(&mut tx).poll_send(&mut noop_context(), Message(3))
376 );
377
378 assert_eq!(
379 PollRecv::Ready(Message(1)),
380 Pin::new(&mut rx).poll_recv(&mut cx)
381 );
382 assert_eq!(
383 PollRecv::Ready(Message(2)),
384 Pin::new(&mut rx).poll_recv(&mut cx)
385 );
386 assert_eq!(
387 PollRecv::Pending,
388 Pin::new(&mut rx).poll_recv(&mut noop_context())
389 );
390
391 assert_eq!(
392 PollRecv::Ready(Message(1)),
393 Pin::new(&mut rx2).poll_recv(&mut cx)
394 );
395 assert_eq!(
396 PollRecv::Ready(Message(2)),
397 Pin::new(&mut rx2).poll_recv(&mut cx)
398 );
399 assert_eq!(
400 PollRecv::Pending,
401 Pin::new(&mut rx2).poll_recv(&mut noop_context())
402 );
403 }
404
405 #[test]
406 fn sender_disconnect() {
407 let mut cx = panic_context();
408 let (mut tx, mut rx) = channel(100);
409 let mut tx2 = tx.clone();
410
411 assert_eq!(
412 PollSend::Ready,
413 Pin::new(&mut tx).poll_send(&mut cx, Message(1))
414 );
415 assert_eq!(
416 PollSend::Ready,
417 Pin::new(&mut tx2).poll_send(&mut cx, Message(2))
418 );
419 drop(tx);
420 drop(tx2);
421 assert_eq!(
422 PollRecv::Ready(Message(1)),
423 Pin::new(&mut rx).poll_recv(&mut cx)
424 );
425 assert_eq!(
426 PollRecv::Ready(Message(2)),
427 Pin::new(&mut rx).poll_recv(&mut cx)
428 );
429 assert_eq!(PollRecv::Closed, Pin::new(&mut rx).poll_recv(&mut cx));
430 }
431
432 #[test]
433 fn receiver_disconnect() {
434 let mut cx = panic_context();
435 let (mut tx, rx) = channel(100);
436 let rx2 = rx.clone();
437
438 assert_eq!(
439 PollSend::Ready,
440 Pin::new(&mut tx).poll_send(&mut cx, Message(1))
441 );
442 drop(rx);
443 drop(rx2);
444 assert_eq!(
445 PollSend::Rejected(Message(2)),
446 Pin::new(&mut tx).poll_send(&mut cx, Message(2))
447 );
448 }
449
450 #[test]
451 fn receiver_reconnect() {
452 let mut cx = panic_context();
453 let (mut tx, rx) = channel(100);
454
455 assert_eq!(
456 PollSend::Ready,
457 Pin::new(&mut tx).poll_send(&mut cx, Message(1))
458 );
459 drop(rx);
460
461 let (w2, w2_count) = new_count_waker();
462 let mut w2_context = Context::from_waker(&w2);
463 assert_eq!(
464 PollSend::Rejected(Message(2)),
465 Pin::new(&mut tx).poll_send(&mut w2_context, Message(2))
466 );
467
468 let _rx = tx.subscribe();
469 assert_eq!(0, w2_count.get());
470 }
471
472 #[test]
473 fn wake_sender() {
474 let mut cx = panic_context();
477 let (mut tx, mut rx) = channel(2);
478
479 assert_eq!(
480 PollSend::Ready,
481 Pin::new(&mut tx).poll_send(&mut cx, Message(1))
482 );
483
484 let (w2, w2_count) = new_count_waker();
485 let w2_context = Context::from_waker(&w2);
486 assert_eq!(
487 PollSend::Ready,
488 Pin::new(&mut tx).poll_send(&mut cx, Message(2))
489 );
490 assert_eq!(
491 PollSend::Pending(Message(3)),
492 Pin::new(&mut tx).poll_send(&mut w2_context.into(), Message(3))
493 );
494
495 assert_eq!(0, w2_count.get());
496
497 assert_eq!(
498 PollRecv::Ready(Message(1)),
499 Pin::new(&mut rx).poll_recv(&mut cx)
500 );
501
502 assert_eq!(
503 PollRecv::Ready(Message(2)),
504 Pin::new(&mut rx).poll_recv(&mut cx)
505 );
506
507 assert_eq!(1, w2_count.get());
508 assert_eq!(
509 PollRecv::Pending,
510 Pin::new(&mut rx).poll_recv(&mut noop_context())
511 );
512
513 assert_eq!(1, w2_count.get());
514 }
515
516 #[test]
517 fn wake_receiver() {
518 let mut cx = panic_context();
519 let (mut tx, mut rx) = channel(100);
520
521 let (w1, w1_count) = new_count_waker();
522 let w1_context = Context::from_waker(&w1);
523
524 assert_eq!(
525 PollRecv::Pending,
526 Pin::new(&mut rx).poll_recv(&mut w1_context.into())
527 );
528
529 assert_eq!(0, w1_count.get());
530
531 assert_eq!(
532 PollSend::Ready,
533 Pin::new(&mut tx).poll_send(&mut cx, Message(1))
534 );
535
536 assert_eq!(1, w1_count.get());
537
538 assert_eq!(
539 PollSend::Ready,
540 Pin::new(&mut tx).poll_send(&mut cx, Message(2))
541 );
542
543 assert_eq!(1, w1_count.get());
544 }
545
546 #[test]
547 fn dropping_receiver_does_not_block() {
548 let mut cx = panic_context();
549 let (mut tx, mut rx) = channel(2);
550 let rx2 = rx.clone();
551
552 assert_eq!(
553 PollSend::Ready,
554 Pin::new(&mut tx).poll_send(&mut cx, Message(1))
555 );
556
557 assert_eq!(
558 PollSend::Ready,
559 Pin::new(&mut tx).poll_send(&mut cx, Message(2))
560 );
561
562 drop(rx2);
563 assert_eq!(
564 PollSend::Pending(Message(2)),
565 Pin::new(&mut tx).poll_send(&mut noop_context(), Message(2))
566 );
567
568 assert_eq!(
569 PollRecv::Ready(Message(1)),
570 Pin::new(&mut rx).poll_recv(&mut cx)
571 );
572
573 assert_eq!(
574 PollSend::Ready,
575 Pin::new(&mut tx).poll_send(&mut cx, Message(4))
576 );
577 }
578
579 #[test]
580 fn drop_receiver_frees_slot() {
581 let mut cx = panic_context();
583 let (mut tx, mut rx) = channel(2);
584 let rx2 = rx.clone();
585
586 assert_eq!(
587 PollSend::Ready,
588 Pin::new(&mut tx).poll_send(&mut cx, Message(1))
589 );
590
591 assert_eq!(
592 PollSend::Ready,
593 Pin::new(&mut tx).poll_send(&mut cx, Message(2))
594 );
595
596 assert_eq!(
597 PollRecv::Ready(Message(1)),
598 Pin::new(&mut rx).poll_recv(&mut cx)
599 );
600
601 drop(rx2);
602
603 assert_eq!(
604 PollSend::Ready,
605 Pin::new(&mut tx).poll_send(&mut cx, Message(3))
606 );
607 }
608
609 #[test]
610 fn wake_sender_on_disconnect() {
611 let (mut tx, rx) = channel(2);
612
613 let (w1, w1_count) = new_count_waker();
614 let w1_context = Context::from_waker(&w1);
615 let mut w1_context: crate::Context<'_> = w1_context.into();
616
617 assert_eq!(
618 PollSend::Ready,
619 Pin::new(&mut tx).poll_send(&mut w1_context, Message(1))
620 );
621
622 assert_eq!(
623 PollSend::Ready,
624 Pin::new(&mut tx).poll_send(&mut w1_context, Message(2))
625 );
626
627 assert_eq!(
628 PollSend::Pending(Message(3)),
629 Pin::new(&mut tx).poll_send(&mut w1_context, Message(3))
630 );
631
632 assert_eq!(0, w1_count.get());
633
634 drop(rx);
635
636 assert_eq!(1, w1_count.get());
637 }
638
639 #[test]
640 fn wake_receiver_on_disconnect() {
641 let (tx, mut rx) = channel::<()>(100);
642
643 let (w1, w1_count) = new_count_waker();
644 let w1_context = Context::from_waker(&w1);
645
646 assert_eq!(
647 PollRecv::Pending,
648 Pin::new(&mut rx).poll_recv(&mut w1_context.into())
649 );
650
651 assert_eq!(0, w1_count.get());
652
653 drop(tx);
654
655 assert_eq!(1, w1_count.get());
656 }
657
658 #[test]
659 fn reader_bounds_bug() {
660 let mut cx = noop_context();
661 let (mut tx, mut rx) = channel(2);
662
663 assert_eq!(
664 PollSend::Ready,
665 Pin::new(&mut tx).poll_send(&mut cx, Message(1))
666 );
667 assert_eq!(
668 PollRecv::Ready(Message(1)),
669 Pin::new(&mut rx).poll_recv(&mut cx)
670 );
671
672 assert_eq!(
673 PollSend::Ready,
674 Pin::new(&mut tx).poll_send(&mut cx, Message(2))
675 );
676 assert_eq!(
677 PollRecv::Ready(Message(2)),
678 Pin::new(&mut rx).poll_recv(&mut cx)
679 );
680 }
681
682 #[test]
683 fn drop_subscribe_bug() {
684 let mut cx = noop_context();
687 let (mut tx, rx) = channel(2);
688
689 drop(rx);
690 let mut rx2 = tx.subscribe();
691
692 assert_eq!(
693 PollSend::Ready,
694 Pin::new(&mut tx).poll_send(&mut cx, Message(1))
695 );
696 assert_eq!(
697 PollRecv::Ready(Message(1)),
698 Pin::new(&mut rx2).poll_recv(&mut cx)
699 );
700 }
701
702 #[test]
703 fn skips_intermediate_bug() {
704 let mut cx = noop_context();
705 let (mut tx, rx) = channel(2);
706
707 drop(rx);
708 let mut rx2 = tx.subscribe();
709
710 assert_eq!(
711 PollSend::Ready,
712 Pin::new(&mut tx).poll_send(&mut cx, Message(1))
713 );
714 assert_eq!(
715 PollSend::Ready,
716 Pin::new(&mut tx).poll_send(&mut cx, Message(2))
717 );
718 assert_eq!(
719 PollRecv::Ready(Message(1)),
720 Pin::new(&mut rx2).poll_recv(&mut cx)
721 );
722 }
723
724 #[test]
725 fn drop_subscribe_ignores_queued() {
726 let mut cx = noop_context();
727 let (mut tx, rx) = channel(2);
728
729 assert_eq!(
730 PollSend::Ready,
731 Pin::new(&mut tx).poll_send(&mut cx, Message(1))
732 );
733
734 drop(rx);
735 let mut rx2 = tx.subscribe();
736
737 assert_eq!(
738 PollSend::Ready,
739 Pin::new(&mut tx).poll_send(&mut cx, Message(2))
740 );
741 assert_eq!(
742 PollRecv::Ready(Message(2)),
743 Pin::new(&mut rx2).poll_recv(&mut cx)
744 );
745 }
746
747 #[test]
748 fn drop_preserves_read() {
749 let mut cx = noop_context();
750 let (mut tx, mut rx) = channel(2);
751
752 let _rx_pin_message_1 = rx.clone();
753
754 assert_eq!(
755 PollSend::Ready,
756 Pin::new(&mut tx).poll_send(&mut cx, Message(1))
757 );
758
759 assert_eq!(
760 PollSend::Ready,
761 Pin::new(&mut tx).poll_send(&mut cx, Message(2))
762 );
763
764 assert_eq!(
765 PollRecv::Ready(Message(1)),
766 Pin::new(&mut rx).poll_recv(&mut cx)
767 );
768
769 let rx2 = rx.clone();
770 drop(rx2);
771
772 assert_eq!(
773 PollSend::Pending(Message(3)),
774 Pin::new(&mut tx).poll_send(&mut cx, Message(3))
775 );
776 }
777}
778
779#[cfg(test)]
780mod tokio_tests {
781 use std::time::Duration;
782
783 use tokio::{
784 task::{spawn, JoinHandle},
785 time::{self, timeout},
786 };
787
788 use crate::sink::Sink;
789 use crate::{
790 stream::{Stream, TryRecvError},
791 test::{
792 capacity_iter, Channel, Channels, Message, CHANNEL_TEST_RECEIVERS,
793 CHANNEL_TEST_SENDERS, TEST_TIMEOUT,
794 },
795 };
796
797 #[tokio::test(flavor = "multi_thread")]
798 async fn simple() {
799 for cap in capacity_iter() {
801 let (mut tx, mut rx) = super::channel(cap);
802
803 spawn(async move {
804 for message in Message::new_iter(0) {
805 tx.send(message).await.expect("send failed");
806 }
807 });
808
809 let rx_handle = spawn(async move {
810 let mut channel = Channel::new(0);
811 while let Some(message) = rx.recv().await {
812 channel.assert_message(&message);
813 }
814 });
815
816 timeout(TEST_TIMEOUT, rx_handle)
817 .await
818 .expect("test timeout")
819 .expect("join failure");
820 }
821 }
822
823 #[tokio::test(flavor = "multi_thread")]
824 async fn multi_sender() {
825 for cap in capacity_iter() {
827 let (tx, mut rx) = super::channel(cap);
828
829 for i in 0..CHANNEL_TEST_SENDERS {
830 let mut tx2 = tx.clone();
831 spawn(async move {
832 for message in Message::new_multi_sender(i) {
833 tx2.send(message).await.expect("send failed");
834 }
835 });
836 }
837
838 drop(tx);
839
840 let rx_handle = spawn(async move {
841 let mut channels = Channels::new(CHANNEL_TEST_SENDERS);
842 while let Some(message) = rx.recv().await {
843 channels.assert_message(&message);
844 }
845 });
846
847 timeout(TEST_TIMEOUT, rx_handle)
848 .await
849 .expect("test timeout")
850 .expect("join failure");
851 }
852 }
853
854 #[tokio::test(flavor = "multi_thread")]
855 async fn multi_receiver() {
856 for cap in capacity_iter() {
858 let (mut tx, rx) = super::channel(cap);
859
860 spawn(async move {
861 for message in Message::new_iter(0) {
862 tx.send(message).await.expect("send failed");
863 }
864 });
865
866 let handles: Vec<JoinHandle<()>> = (0..CHANNEL_TEST_RECEIVERS)
867 .map(|_| {
868 let mut rx2 = rx.clone();
869 let mut channels = Channels::new(1);
870
871 spawn(async move {
872 while let Some(message) = rx2.recv().await {
873 channels.assert_message(&message);
874 }
875 })
876 })
877 .collect();
878
879 drop(rx);
880
881 let rx_handle = spawn(async move {
882 for handle in handles {
883 handle.await.expect("Assertion failure");
884 }
885 });
886
887 timeout(TEST_TIMEOUT, rx_handle)
888 .await
889 .expect("test timeout")
890 .expect("join failure");
891 }
892 }
893
894 #[tokio::test(flavor = "multi_thread")]
895 async fn multi_sender_multi_receiver() {
896 for cap in capacity_iter() {
898 let (tx, rx) = super::channel(cap);
899
900 for i in 0..CHANNEL_TEST_SENDERS {
901 let mut tx2 = tx.clone();
902 spawn(async move {
903 for message in Message::new_multi_sender(i) {
904 tx2.send(message).await.expect("send failed");
905 }
906 });
907 }
908
909 drop(tx);
910
911 let handles: Vec<JoinHandle<()>> = (0..CHANNEL_TEST_RECEIVERS)
912 .map(|_i| {
913 let mut rx2 = rx.clone();
914 let mut channels = Channels::new(CHANNEL_TEST_SENDERS);
915
916 spawn(async move {
917 while let Some(message) = rx2.recv().await {
918 channels.assert_message(&message);
919 }
920 })
921 })
922 .collect();
923
924 drop(rx);
925
926 let rx_handle = spawn(async move {
927 for handle in handles {
928 handle.await.expect("Assertion failure");
929 }
930 });
931
932 timeout(TEST_TIMEOUT, rx_handle)
933 .await
934 .expect("test timeout")
935 .expect("join failure");
936 }
937 }
938
939 #[tokio::test(flavor = "multi_thread")]
940 async fn clone_monster() {
941 for cap in capacity_iter() {
944 let (tx, mut rx) = super::channel(cap);
945 let (mut barrier, mut sender_quit) = crate::barrier::channel();
946
947 let mut tx2 = tx.clone();
948 spawn(async move {
949 for message in Message::new_iter(0) {
950 tx2.send(message).await.expect("send failed");
951 }
952
953 barrier.send(()).await.expect("clone task shutdown failed");
954 });
955
956 let mut rx2 = rx.clone();
957 spawn(async move {
958 loop {
959 let next = rx2.try_recv();
960
961 if let Ok(_) = next {
962 continue;
963 }
964
965 if let Err(TryRecvError::Closed) = next {
966 break;
967 }
968
969 if let Ok(_) = sender_quit.try_recv() {
970 break;
971 }
972
973 let tx3 = tx.clone();
974 let rx3 = rx2.clone();
975 let rx4 = tx.subscribe();
976 time::sleep(Duration::from_micros(100)).await;
977 drop(tx3);
978 drop(rx3);
979 drop(rx4);
980 time::sleep(Duration::from_micros(50)).await;
981 }
982
983 drop(tx);
984 });
985
986 let rx_handle = spawn(async move {
987 let mut channel = Channel::new(0);
988 while let Some(message) = rx.recv().await {
989 channel.assert_message(&message);
990 }
991 });
992
993 timeout(TEST_TIMEOUT, rx_handle)
994 .await
995 .expect("test timeout")
996 .expect("join failure");
997 }
998 }
999}
1000
1001#[cfg(test)]
1002mod async_std_tests {
1003 use std::time::Duration;
1004
1005 use async_std::{
1006 future::timeout,
1007 task::{self, spawn, JoinHandle},
1008 };
1009
1010 use crate::{
1011 sink::Sink,
1012 stream::{Stream, TryRecvError},
1013 test::{
1014 capacity_iter, Channel, Channels, Message, CHANNEL_TEST_RECEIVERS,
1015 CHANNEL_TEST_SENDERS, TEST_TIMEOUT,
1016 },
1017 };
1018
1019 #[async_std::test]
1020 async fn simple() {
1021 crate::logging::enable_log();
1022 for cap in capacity_iter() {
1023 let (mut tx, mut rx) = super::channel(cap);
1024
1025 spawn(async move {
1026 for message in Message::new_iter(0) {
1027 tx.send(message).await.expect("send failed");
1028 }
1029 });
1030
1031 let rx_handle = spawn(async move {
1032 let mut channel = Channel::new(0);
1033 while let Some(message) = rx.recv().await {
1034 channel.assert_message(&message);
1035 }
1036 });
1037
1038 timeout(TEST_TIMEOUT, rx_handle)
1039 .await
1040 .expect("test timeout");
1041 }
1042 }
1043
1044 #[async_std::test]
1045 async fn multi_sender() {
1046 for cap in capacity_iter() {
1049 let (tx, mut rx) = super::channel(cap);
1050
1051 for i in 0..CHANNEL_TEST_SENDERS {
1052 let mut tx2 = tx.clone();
1053 spawn(async move {
1054 for message in Message::new_multi_sender(i) {
1055 tx2.send(message).await.expect("send failed");
1056 }
1057 });
1058 }
1059
1060 drop(tx);
1061
1062 let rx_handle = spawn(async move {
1063 let mut channels = Channels::new(CHANNEL_TEST_SENDERS);
1064 while let Some(message) = rx.recv().await {
1065 channels.assert_message(&message);
1066 }
1067 });
1068
1069 timeout(TEST_TIMEOUT, rx_handle)
1070 .await
1071 .expect("test timeout");
1072 }
1073 }
1074
1075 #[async_std::test]
1076 async fn multi_receiver() {
1077 for cap in capacity_iter() {
1079 let (mut tx, rx) = super::channel(cap);
1080
1081 spawn(async move {
1082 for message in Message::new_iter(0) {
1083 tx.send(message).await.expect("send failed");
1084 }
1085 });
1086
1087 let handles: Vec<JoinHandle<()>> = (0..CHANNEL_TEST_RECEIVERS)
1088 .map(|_| {
1089 let mut rx2 = rx.clone();
1090 let mut channels = Channels::new(1);
1091
1092 spawn(async move {
1093 while let Some(message) = rx2.recv().await {
1094 channels.assert_message(&message);
1095 }
1096 })
1097 })
1098 .collect();
1099
1100 drop(rx);
1101
1102 let rx_handle = spawn(async move {
1103 for handle in handles {
1104 handle.await;
1105 }
1106 });
1107
1108 timeout(TEST_TIMEOUT, rx_handle)
1109 .await
1110 .expect("test timeout");
1111 }
1112 }
1113
1114 #[async_std::test]
1115 async fn multi_sender_multi_receiver() {
1116 for cap in capacity_iter() {
1119 let (tx, rx) = super::channel(cap);
1120
1121 for i in 0..CHANNEL_TEST_SENDERS {
1122 let mut tx2 = tx.clone();
1123 spawn(async move {
1124 for message in Message::new_multi_sender(i) {
1125 tx2.send(message).await.expect("send failed");
1126 }
1127 });
1128 }
1129
1130 drop(tx);
1131
1132 let handles: Vec<JoinHandle<()>> = (0..CHANNEL_TEST_RECEIVERS)
1133 .map(|_i| {
1134 let mut rx2 = rx.clone();
1135 let mut channels = Channels::new(CHANNEL_TEST_SENDERS);
1136
1137 spawn(async move {
1138 while let Some(message) = rx2.recv().await {
1139 channels.assert_message(&message);
1140 }
1141 })
1142 })
1143 .collect();
1144
1145 drop(rx);
1146
1147 let rx_handle = spawn(async move {
1148 for handle in handles {
1149 handle.await;
1150 }
1151 });
1152
1153 timeout(TEST_TIMEOUT, rx_handle)
1154 .await
1155 .expect("test timeout");
1156 }
1157 }
1158
1159 #[async_std::test]
1160 async fn clone_monster() {
1161 for cap in capacity_iter() {
1164 let (tx, mut rx) = super::channel(cap);
1165 let (mut barrier, mut sender_quit) = crate::barrier::channel();
1166
1167 let mut tx2 = tx.clone();
1168 spawn(async move {
1169 for message in Message::new_iter(0) {
1170 tx2.send(message).await.expect("send failed");
1171 }
1172
1173 barrier.send(()).await.expect("clone task shutdown failed");
1174 });
1175
1176 let mut rx2 = rx.clone();
1177 spawn(async move {
1178 loop {
1179 let next = rx2.try_recv();
1180
1181 if let Ok(_) = next {
1182 continue;
1183 }
1184
1185 if let Err(TryRecvError::Closed) = next {
1186 break;
1187 }
1188
1189 if let Ok(_) = sender_quit.try_recv() {
1190 break;
1191 }
1192
1193 let tx3 = tx.clone();
1194 let rx3 = rx2.clone();
1195 let rx4 = tx.subscribe();
1196 task::sleep(Duration::from_micros(100)).await;
1197 drop(tx3);
1198 drop(rx3);
1199 drop(rx4);
1200 task::sleep(Duration::from_micros(50)).await;
1201 }
1202
1203 drop(tx);
1204 });
1205
1206 let rx_handle = spawn(async move {
1207 let mut channel = Channel::new(0);
1208 while let Some(message) = rx.recv().await {
1209 channel.assert_message(&message);
1210 }
1211 });
1212
1213 timeout(TEST_TIMEOUT, rx_handle)
1214 .await
1215 .expect("test timeout");
1216 }
1217 }
1218}