postage/channels/
broadcast.rs

//! Provides a lossless, MPMC channel.  All receivers are guaranteed to receive each message.
2//!
//! When a receiver is cloned, the new receiver will observe the same series of messages as the original.
4//! When a receiver is created with `Sender::subscribe`, it will observe new messages.
5
6use std::fmt;
7
8use super::SendMessage;
9use static_assertions::assert_impl_all;
10
11use crate::{
12    sink::{PollSend, Sink},
13    stream::{PollRecv, Stream},
14    sync::{
15        mpmc_circular_buffer::{BufferReader, MpmcCircularBuffer, TryRead, TryWrite},
16        shared, ReceiverShared, SenderShared,
17    },
18};
19
20/// Constructs a pair of broadcast endpoints, with a fixed-size buffer of the given capacity
21pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
22    #[cfg(feature = "debug")]
23    log::error!("Creating broadcast channel with capacity {}", capacity);
24    // we add one spare capacity so that receivers have an empty slot to wait on
25    let (buffer, reader) = MpmcCircularBuffer::new(capacity);
26
27    let (tx_shared, rx_shared) = shared(buffer);
28    let sender = Sender { shared: tx_shared };
29
30    let receiver = Receiver::new(rx_shared, reader);
31
32    (sender, receiver)
33}
34
/// A broadcast sender that can be used with the postage::Sink trait.  Can be cloned.
///
/// The sender task is suspended when the internal buffer is filled.
///
/// New receivers can be created from a sender with `Sender::subscribe`.
///
/// Note: no implementation of the `futures::Sink` trait is provided for the broadcast Sender.
pub struct Sender<T> {
    // Sender-side handle to the state shared with the receivers; its extension is the
    // circular buffer that messages are written into.
    pub(in crate::channels::broadcast) shared: SenderShared<MpmcCircularBuffer<T>>,
}
43
// SAFETY: NOTE(review): these impls assert that a `Sender<T>` may be moved to, and shared
// between, threads whenever `T: Send`. That is only sound if `SenderShared` and
// `MpmcCircularBuffer` synchronize every access to the buffered `T` values internally.
// Neither type is defined in this file — confirm against their implementations.
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}
46
47impl<T> Clone for Sender<T> {
48    fn clone(&self) -> Self {
49        Self {
50            shared: self.shared.clone(),
51        }
52    }
53}
54
// Compile-time check that `Sender<SendMessage>` implements `Send`, `Sync`, `Clone` and `fmt::Debug`.
assert_impl_all!(Sender<SendMessage>: Send, Sync, Clone, fmt::Debug);
56
57impl<T> Sink for Sender<T>
58where
59    T: Clone,
60{
61    type Item = T;
62
63    fn poll_send(
64        self: std::pin::Pin<&mut Self>,
65        cx: &mut crate::Context<'_>,
66        value: Self::Item,
67    ) -> PollSend<Self::Item> {
68        // if all receivers have disconnected, we return Rejected like other channels.
69        // tx.subscribe() can be used to produce a new receiver.
70        // however, it would not receive this item, as it would need to be called
71        //   before the message is sent.
72        if self.shared.is_closed() {
73            return PollSend::Rejected(value);
74        }
75
76        // start at the head
77        // if the next element has references,
78        //   register for wakeup
79        // else
80        //   overwrite the element
81        let buffer = self.shared.extension();
82        match buffer.try_write(value, cx) {
83            TryWrite::Pending(value) => PollSend::Pending(value),
84            TryWrite::Ready => PollSend::Ready,
85        }
86    }
87}
88
89impl<T> Sender<T> {
90    /// Subscribes to the channel, creating a new receiver.  The receiver
91    /// will observe all messages sent after the call to subscribe.
92    ///
93    /// Messages currently in the buffer are not received.
94    pub fn subscribe(&self) -> Receiver<T> {
95        let shared = self.shared.clone_receiver();
96        let reader = shared.extension().new_reader();
97        self.shared.notify_self();
98
99        Receiver::new(shared, reader)
100    }
101}
102
103impl<T> fmt::Debug for Sender<T> {
104    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105        f.debug_struct("Sender").finish()
106    }
107}
108
/// A broadcast receiver that can be used with the postage::Stream trait.
///
/// When cloned, the new receiver will begin processing messages at the same location as the original.
pub struct Receiver<T> {
    // Receiver-side handle to the state shared with the senders; its extension is the
    // circular buffer that messages are read from.
    shared: ReceiverShared<MpmcCircularBuffer<T>>,
    // This receiver's own read cursor into the buffer.
    reader: BufferReader,
}
116
// SAFETY: NOTE(review): these impls assert that a `Receiver<T>` may be moved to, and shared
// between, threads whenever `T: Send`. That is only sound if `ReceiverShared`,
// `MpmcCircularBuffer` and `BufferReader` synchronize every access to the buffered `T`
// values internally. None of them is defined in this file — confirm against their
// implementations.
unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}
119
// Compile-time check that `Receiver<SendMessage>` implements `Send`, `Sync`, `Clone` and `fmt::Debug`.
assert_impl_all!(Receiver<SendMessage>: Send, Sync, Clone, fmt::Debug);
121
122impl<T> Receiver<T> {
123    fn new(shared: ReceiverShared<MpmcCircularBuffer<T>>, reader: BufferReader) -> Self {
124        Self { shared, reader }
125    }
126}
127
128impl<T> Stream for Receiver<T>
129where
130    T: Clone,
131{
132    type Item = T;
133
134    fn poll_recv(
135        self: std::pin::Pin<&mut Self>,
136        cx: &mut crate::Context<'_>,
137    ) -> PollRecv<Self::Item> {
138        // unpin self, so Rust can infer that the borrows of reader and buffer are disjoint
139        let this = self.get_mut();
140        let reader = &mut this.reader;
141        let buffer = this.shared.extension();
142
143        match reader.try_read(buffer, cx) {
144            TryRead::Pending => {
145                this.shared.subscribe_send(cx);
146
147                if this.shared.is_closed() {
148                    return PollRecv::Closed;
149                }
150
151                PollRecv::Pending
152            }
153            TryRead::Ready(value) => PollRecv::Ready(value),
154        }
155    }
156}
157
158impl<T> Clone for Receiver<T> {
159    fn clone(&self) -> Self {
160        let buffer = self.shared.extension();
161        let reader = self.reader.clone_with(buffer);
162
163        Self::new(self.shared.clone(), reader)
164    }
165}
166
167impl<T> Drop for Receiver<T> {
168    fn drop(&mut self) {
169        let buffer = self.shared.extension();
170        self.reader.drop_with(buffer);
171    }
172}
173
174impl<T> fmt::Debug for Receiver<T> {
175    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176        f.debug_struct("Receiver").finish()
177    }
178}
179
180#[cfg(test)]
181mod tests {
182    use std::pin::Pin;
183
184    use crate::{
185        sink::{PollSend, Sink},
186        stream::{PollRecv, Stream},
187        test::{noop_context, panic_context},
188        Context,
189    };
190    use futures_test::task::new_count_waker;
191
192    use super::{channel, Receiver, Sender};
193
194    //TODO: add test covering rx location when cloned on an in-progress channel (exercising tail)
195    fn pin<'a, 'b>(
196        chan: &mut (Sender<Message>, Receiver<Message>),
197    ) -> (Pin<&mut Sender<Message>>, Pin<&mut Receiver<Message>>) {
198        let tx = Pin::new(&mut chan.0);
199        let rx = Pin::new(&mut chan.1);
200
201        (tx, rx)
202    }
203
    /// Test payload: a `usize` wrapper that can be cloned, compared and debug-printed.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct Message(usize);
206
207    #[test]
208    fn send_accepted() {
209        // crate::logging::enable_log();
210        let mut cx = panic_context();
211        let mut chan = channel(2);
212        let (tx, _rx) = pin(&mut chan);
213
214        assert_eq!(PollSend::Ready, tx.poll_send(&mut cx, Message(1)));
215    }
216
217    #[test]
218    fn full_send_blocks() {
219        // SimpleLogger::new().init().unwrap();
220        let mut cx = panic_context();
221        let (mut tx, _rx) = channel(2);
222
223        assert_eq!(
224            PollSend::Ready,
225            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
226        );
227
228        assert_eq!(
229            PollSend::Ready,
230            Pin::new(&mut tx).poll_send(&mut cx, Message(2))
231        );
232
233        assert_eq!(
234            PollSend::Pending(Message(3)),
235            Pin::new(&mut tx).poll_send(&mut noop_context(), Message(3))
236        );
237    }
238
239    #[test]
240    fn empty_send_recv() {
241        let mut cx = noop_context();
242        let (mut tx, mut rx) = channel(0);
243
244        assert_eq!(
245            PollSend::Ready,
246            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
247        );
248
249        assert_eq!(
250            PollRecv::Ready(Message(1)),
251            Pin::new(&mut rx).poll_recv(&mut cx)
252        );
253    }
254
255    #[test]
256    fn send_recv() {
257        let mut cx = noop_context();
258        let mut chan = channel(2);
259        let (tx, rx) = pin(&mut chan);
260
261        assert_eq!(PollSend::Ready, tx.poll_send(&mut cx, Message(1)));
262        assert_eq!(PollRecv::Ready(Message(1)), rx.poll_recv(&mut cx));
263    }
264
265    #[test]
266    fn sender_subscribe_same_read() {
267        // crate::logging::enable_log();
268        let mut cx = noop_context();
269        let (mut tx, mut rx) = channel(2);
270
271        assert_eq!(
272            PollSend::Ready,
273            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
274        );
275        assert_eq!(
276            PollRecv::Ready(Message(1)),
277            Pin::new(&mut rx).poll_recv(&mut cx)
278        );
279
280        let mut rx2 = tx.subscribe();
281        assert_eq!(PollRecv::Pending, Pin::new(&mut rx2).poll_recv(&mut cx));
282        assert_eq!(
283            PollSend::Ready,
284            Pin::new(&mut tx).poll_send(&mut cx, Message(2))
285        );
286        assert_eq!(
287            PollRecv::Ready(Message(2)),
288            Pin::new(&mut rx2).poll_recv(&mut cx)
289        );
290    }
291
292    #[test]
293    fn sender_subscribe_different_read() {
294        // SimpleLogger::new().init().unwrap();
295
296        let mut cx = noop_context();
297        let (mut tx, _rx) = channel(2);
298
299        assert_eq!(
300            PollSend::Ready,
301            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
302        );
303
304        let mut rx2 = tx.subscribe();
305        assert_eq!(PollRecv::Pending, Pin::new(&mut rx2).poll_recv(&mut cx));
306        assert_eq!(
307            PollSend::Ready,
308            Pin::new(&mut tx).poll_send(&mut cx, Message(2))
309        );
310        assert_eq!(
311            PollRecv::Ready(Message(2)),
312            Pin::new(&mut rx2).poll_recv(&mut cx)
313        );
314    }
315
316    #[test]
317    fn two_senders_recv() {
318        // SimpleLogger::new().init().unwrap();
319
320        let mut cx = panic_context();
321        let (mut tx, mut rx) = channel(2);
322        let mut tx2 = tx.clone();
323
324        assert_eq!(
325            PollSend::Ready,
326            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
327        );
328        assert_eq!(
329            PollSend::Ready,
330            Pin::new(&mut tx2).poll_send(&mut cx, Message(2))
331        );
332
333        assert_eq!(
334            PollSend::Pending(Message(3)),
335            Pin::new(&mut tx).poll_send(&mut noop_context(), Message(3))
336        );
337        assert_eq!(
338            PollSend::Pending(Message(3)),
339            Pin::new(&mut tx2).poll_send(&mut noop_context(), Message(3))
340        );
341
342        assert_eq!(
343            PollRecv::Ready(Message(1)),
344            Pin::new(&mut rx).poll_recv(&mut cx)
345        );
346        assert_eq!(
347            PollRecv::Ready(Message(2)),
348            Pin::new(&mut rx).poll_recv(&mut cx)
349        );
350
351        assert_eq!(
352            PollRecv::Pending,
353            Pin::new(&mut rx).poll_recv(&mut noop_context())
354        );
355    }
356
357    #[test]
358    fn two_receivers() {
359        // SimpleLogger::new().init().unwrap();
360
361        let mut cx = panic_context();
362        let (mut tx, mut rx) = channel(2);
363        let mut rx2 = rx.clone();
364
365        assert_eq!(
366            PollSend::Ready,
367            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
368        );
369        assert_eq!(
370            PollSend::Ready,
371            Pin::new(&mut tx).poll_send(&mut cx, Message(2))
372        );
373        assert_eq!(
374            PollSend::Pending(Message(3)),
375            Pin::new(&mut tx).poll_send(&mut noop_context(), Message(3))
376        );
377
378        assert_eq!(
379            PollRecv::Ready(Message(1)),
380            Pin::new(&mut rx).poll_recv(&mut cx)
381        );
382        assert_eq!(
383            PollRecv::Ready(Message(2)),
384            Pin::new(&mut rx).poll_recv(&mut cx)
385        );
386        assert_eq!(
387            PollRecv::Pending,
388            Pin::new(&mut rx).poll_recv(&mut noop_context())
389        );
390
391        assert_eq!(
392            PollRecv::Ready(Message(1)),
393            Pin::new(&mut rx2).poll_recv(&mut cx)
394        );
395        assert_eq!(
396            PollRecv::Ready(Message(2)),
397            Pin::new(&mut rx2).poll_recv(&mut cx)
398        );
399        assert_eq!(
400            PollRecv::Pending,
401            Pin::new(&mut rx2).poll_recv(&mut noop_context())
402        );
403    }
404
405    #[test]
406    fn sender_disconnect() {
407        let mut cx = panic_context();
408        let (mut tx, mut rx) = channel(100);
409        let mut tx2 = tx.clone();
410
411        assert_eq!(
412            PollSend::Ready,
413            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
414        );
415        assert_eq!(
416            PollSend::Ready,
417            Pin::new(&mut tx2).poll_send(&mut cx, Message(2))
418        );
419        drop(tx);
420        drop(tx2);
421        assert_eq!(
422            PollRecv::Ready(Message(1)),
423            Pin::new(&mut rx).poll_recv(&mut cx)
424        );
425        assert_eq!(
426            PollRecv::Ready(Message(2)),
427            Pin::new(&mut rx).poll_recv(&mut cx)
428        );
429        assert_eq!(PollRecv::Closed, Pin::new(&mut rx).poll_recv(&mut cx));
430    }
431
432    #[test]
433    fn receiver_disconnect() {
434        let mut cx = panic_context();
435        let (mut tx, rx) = channel(100);
436        let rx2 = rx.clone();
437
438        assert_eq!(
439            PollSend::Ready,
440            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
441        );
442        drop(rx);
443        drop(rx2);
444        assert_eq!(
445            PollSend::Rejected(Message(2)),
446            Pin::new(&mut tx).poll_send(&mut cx, Message(2))
447        );
448    }
449
450    #[test]
451    fn receiver_reconnect() {
452        let mut cx = panic_context();
453        let (mut tx, rx) = channel(100);
454
455        assert_eq!(
456            PollSend::Ready,
457            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
458        );
459        drop(rx);
460
461        let (w2, w2_count) = new_count_waker();
462        let mut w2_context = Context::from_waker(&w2);
463        assert_eq!(
464            PollSend::Rejected(Message(2)),
465            Pin::new(&mut tx).poll_send(&mut w2_context, Message(2))
466        );
467
468        let _rx = tx.subscribe();
469        assert_eq!(0, w2_count.get());
470    }
471
472    #[test]
473    fn wake_sender() {
474        // SimpleLogger::new().init().unwrap();
475
476        let mut cx = panic_context();
477        let (mut tx, mut rx) = channel(2);
478
479        assert_eq!(
480            PollSend::Ready,
481            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
482        );
483
484        let (w2, w2_count) = new_count_waker();
485        let w2_context = Context::from_waker(&w2);
486        assert_eq!(
487            PollSend::Ready,
488            Pin::new(&mut tx).poll_send(&mut cx, Message(2))
489        );
490        assert_eq!(
491            PollSend::Pending(Message(3)),
492            Pin::new(&mut tx).poll_send(&mut w2_context.into(), Message(3))
493        );
494
495        assert_eq!(0, w2_count.get());
496
497        assert_eq!(
498            PollRecv::Ready(Message(1)),
499            Pin::new(&mut rx).poll_recv(&mut cx)
500        );
501
502        assert_eq!(
503            PollRecv::Ready(Message(2)),
504            Pin::new(&mut rx).poll_recv(&mut cx)
505        );
506
507        assert_eq!(1, w2_count.get());
508        assert_eq!(
509            PollRecv::Pending,
510            Pin::new(&mut rx).poll_recv(&mut noop_context())
511        );
512
513        assert_eq!(1, w2_count.get());
514    }
515
516    #[test]
517    fn wake_receiver() {
518        let mut cx = panic_context();
519        let (mut tx, mut rx) = channel(100);
520
521        let (w1, w1_count) = new_count_waker();
522        let w1_context = Context::from_waker(&w1);
523
524        assert_eq!(
525            PollRecv::Pending,
526            Pin::new(&mut rx).poll_recv(&mut w1_context.into())
527        );
528
529        assert_eq!(0, w1_count.get());
530
531        assert_eq!(
532            PollSend::Ready,
533            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
534        );
535
536        assert_eq!(1, w1_count.get());
537
538        assert_eq!(
539            PollSend::Ready,
540            Pin::new(&mut tx).poll_send(&mut cx, Message(2))
541        );
542
543        assert_eq!(1, w1_count.get());
544    }
545
546    #[test]
547    fn dropping_receiver_does_not_block() {
548        let mut cx = panic_context();
549        let (mut tx, mut rx) = channel(2);
550        let rx2 = rx.clone();
551
552        assert_eq!(
553            PollSend::Ready,
554            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
555        );
556
557        assert_eq!(
558            PollSend::Ready,
559            Pin::new(&mut tx).poll_send(&mut cx, Message(2))
560        );
561
562        drop(rx2);
563        assert_eq!(
564            PollSend::Pending(Message(2)),
565            Pin::new(&mut tx).poll_send(&mut noop_context(), Message(2))
566        );
567
568        assert_eq!(
569            PollRecv::Ready(Message(1)),
570            Pin::new(&mut rx).poll_recv(&mut cx)
571        );
572
573        assert_eq!(
574            PollSend::Ready,
575            Pin::new(&mut tx).poll_send(&mut cx, Message(4))
576        );
577    }
578
579    #[test]
580    fn drop_receiver_frees_slot() {
581        // crate::logging::enable_log();
582        let mut cx = panic_context();
583        let (mut tx, mut rx) = channel(2);
584        let rx2 = rx.clone();
585
586        assert_eq!(
587            PollSend::Ready,
588            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
589        );
590
591        assert_eq!(
592            PollSend::Ready,
593            Pin::new(&mut tx).poll_send(&mut cx, Message(2))
594        );
595
596        assert_eq!(
597            PollRecv::Ready(Message(1)),
598            Pin::new(&mut rx).poll_recv(&mut cx)
599        );
600
601        drop(rx2);
602
603        assert_eq!(
604            PollSend::Ready,
605            Pin::new(&mut tx).poll_send(&mut cx, Message(3))
606        );
607    }
608
609    #[test]
610    fn wake_sender_on_disconnect() {
611        let (mut tx, rx) = channel(2);
612
613        let (w1, w1_count) = new_count_waker();
614        let w1_context = Context::from_waker(&w1);
615        let mut w1_context: crate::Context<'_> = w1_context.into();
616
617        assert_eq!(
618            PollSend::Ready,
619            Pin::new(&mut tx).poll_send(&mut w1_context, Message(1))
620        );
621
622        assert_eq!(
623            PollSend::Ready,
624            Pin::new(&mut tx).poll_send(&mut w1_context, Message(2))
625        );
626
627        assert_eq!(
628            PollSend::Pending(Message(3)),
629            Pin::new(&mut tx).poll_send(&mut w1_context, Message(3))
630        );
631
632        assert_eq!(0, w1_count.get());
633
634        drop(rx);
635
636        assert_eq!(1, w1_count.get());
637    }
638
639    #[test]
640    fn wake_receiver_on_disconnect() {
641        let (tx, mut rx) = channel::<()>(100);
642
643        let (w1, w1_count) = new_count_waker();
644        let w1_context = Context::from_waker(&w1);
645
646        assert_eq!(
647            PollRecv::Pending,
648            Pin::new(&mut rx).poll_recv(&mut w1_context.into())
649        );
650
651        assert_eq!(0, w1_count.get());
652
653        drop(tx);
654
655        assert_eq!(1, w1_count.get());
656    }
657
658    #[test]
659    fn reader_bounds_bug() {
660        let mut cx = noop_context();
661        let (mut tx, mut rx) = channel(2);
662
663        assert_eq!(
664            PollSend::Ready,
665            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
666        );
667        assert_eq!(
668            PollRecv::Ready(Message(1)),
669            Pin::new(&mut rx).poll_recv(&mut cx)
670        );
671
672        assert_eq!(
673            PollSend::Ready,
674            Pin::new(&mut tx).poll_send(&mut cx, Message(2))
675        );
676        assert_eq!(
677            PollRecv::Ready(Message(2)),
678            Pin::new(&mut rx).poll_recv(&mut cx)
679        );
680    }
681
682    #[test]
683    fn drop_subscribe_bug() {
684        // SimpleLogger::new().init().unwrap();
685
686        let mut cx = noop_context();
687        let (mut tx, rx) = channel(2);
688
689        drop(rx);
690        let mut rx2 = tx.subscribe();
691
692        assert_eq!(
693            PollSend::Ready,
694            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
695        );
696        assert_eq!(
697            PollRecv::Ready(Message(1)),
698            Pin::new(&mut rx2).poll_recv(&mut cx)
699        );
700    }
701
702    #[test]
703    fn skips_intermediate_bug() {
704        let mut cx = noop_context();
705        let (mut tx, rx) = channel(2);
706
707        drop(rx);
708        let mut rx2 = tx.subscribe();
709
710        assert_eq!(
711            PollSend::Ready,
712            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
713        );
714        assert_eq!(
715            PollSend::Ready,
716            Pin::new(&mut tx).poll_send(&mut cx, Message(2))
717        );
718        assert_eq!(
719            PollRecv::Ready(Message(1)),
720            Pin::new(&mut rx2).poll_recv(&mut cx)
721        );
722    }
723
724    #[test]
725    fn drop_subscribe_ignores_queued() {
726        let mut cx = noop_context();
727        let (mut tx, rx) = channel(2);
728
729        assert_eq!(
730            PollSend::Ready,
731            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
732        );
733
734        drop(rx);
735        let mut rx2 = tx.subscribe();
736
737        assert_eq!(
738            PollSend::Ready,
739            Pin::new(&mut tx).poll_send(&mut cx, Message(2))
740        );
741        assert_eq!(
742            PollRecv::Ready(Message(2)),
743            Pin::new(&mut rx2).poll_recv(&mut cx)
744        );
745    }
746
747    #[test]
748    fn drop_preserves_read() {
749        let mut cx = noop_context();
750        let (mut tx, mut rx) = channel(2);
751
752        let _rx_pin_message_1 = rx.clone();
753
754        assert_eq!(
755            PollSend::Ready,
756            Pin::new(&mut tx).poll_send(&mut cx, Message(1))
757        );
758
759        assert_eq!(
760            PollSend::Ready,
761            Pin::new(&mut tx).poll_send(&mut cx, Message(2))
762        );
763
764        assert_eq!(
765            PollRecv::Ready(Message(1)),
766            Pin::new(&mut rx).poll_recv(&mut cx)
767        );
768
769        let rx2 = rx.clone();
770        drop(rx2);
771
772        assert_eq!(
773            PollSend::Pending(Message(3)),
774            Pin::new(&mut tx).poll_send(&mut cx, Message(3))
775        );
776    }
777}
778
779#[cfg(test)]
780mod tokio_tests {
781    use std::time::Duration;
782
783    use tokio::{
784        task::{spawn, JoinHandle},
785        time::{self, timeout},
786    };
787
788    use crate::sink::Sink;
789    use crate::{
790        stream::{Stream, TryRecvError},
791        test::{
792            capacity_iter, Channel, Channels, Message, CHANNEL_TEST_RECEIVERS,
793            CHANNEL_TEST_SENDERS, TEST_TIMEOUT,
794        },
795    };
796
797    #[tokio::test(flavor = "multi_thread")]
798    async fn simple() {
799        // crate::logging::enable_log();
800        for cap in capacity_iter() {
801            let (mut tx, mut rx) = super::channel(cap);
802
803            spawn(async move {
804                for message in Message::new_iter(0) {
805                    tx.send(message).await.expect("send failed");
806                }
807            });
808
809            let rx_handle = spawn(async move {
810                let mut channel = Channel::new(0);
811                while let Some(message) = rx.recv().await {
812                    channel.assert_message(&message);
813                }
814            });
815
816            timeout(TEST_TIMEOUT, rx_handle)
817                .await
818                .expect("test timeout")
819                .expect("join failure");
820        }
821    }
822
823    #[tokio::test(flavor = "multi_thread")]
824    async fn multi_sender() {
825        // crate::logging::enable_log();
826        for cap in capacity_iter() {
827            let (tx, mut rx) = super::channel(cap);
828
829            for i in 0..CHANNEL_TEST_SENDERS {
830                let mut tx2 = tx.clone();
831                spawn(async move {
832                    for message in Message::new_multi_sender(i) {
833                        tx2.send(message).await.expect("send failed");
834                    }
835                });
836            }
837
838            drop(tx);
839
840            let rx_handle = spawn(async move {
841                let mut channels = Channels::new(CHANNEL_TEST_SENDERS);
842                while let Some(message) = rx.recv().await {
843                    channels.assert_message(&message);
844                }
845            });
846
847            timeout(TEST_TIMEOUT, rx_handle)
848                .await
849                .expect("test timeout")
850                .expect("join failure");
851        }
852    }
853
854    #[tokio::test(flavor = "multi_thread")]
855    async fn multi_receiver() {
856        // crate::logging::enable_log();
857        for cap in capacity_iter() {
858            let (mut tx, rx) = super::channel(cap);
859
860            spawn(async move {
861                for message in Message::new_iter(0) {
862                    tx.send(message).await.expect("send failed");
863                }
864            });
865
866            let handles: Vec<JoinHandle<()>> = (0..CHANNEL_TEST_RECEIVERS)
867                .map(|_| {
868                    let mut rx2 = rx.clone();
869                    let mut channels = Channels::new(1);
870
871                    spawn(async move {
872                        while let Some(message) = rx2.recv().await {
873                            channels.assert_message(&message);
874                        }
875                    })
876                })
877                .collect();
878
879            drop(rx);
880
881            let rx_handle = spawn(async move {
882                for handle in handles {
883                    handle.await.expect("Assertion failure");
884                }
885            });
886
887            timeout(TEST_TIMEOUT, rx_handle)
888                .await
889                .expect("test timeout")
890                .expect("join failure");
891        }
892    }
893
894    #[tokio::test(flavor = "multi_thread")]
895    async fn multi_sender_multi_receiver() {
896        // crate::logging::enable_log();
897        for cap in capacity_iter() {
898            let (tx, rx) = super::channel(cap);
899
900            for i in 0..CHANNEL_TEST_SENDERS {
901                let mut tx2 = tx.clone();
902                spawn(async move {
903                    for message in Message::new_multi_sender(i) {
904                        tx2.send(message).await.expect("send failed");
905                    }
906                });
907            }
908
909            drop(tx);
910
911            let handles: Vec<JoinHandle<()>> = (0..CHANNEL_TEST_RECEIVERS)
912                .map(|_i| {
913                    let mut rx2 = rx.clone();
914                    let mut channels = Channels::new(CHANNEL_TEST_SENDERS);
915
916                    spawn(async move {
917                        while let Some(message) = rx2.recv().await {
918                            channels.assert_message(&message);
919                        }
920                    })
921                })
922                .collect();
923
924            drop(rx);
925
926            let rx_handle = spawn(async move {
927                for handle in handles {
928                    handle.await.expect("Assertion failure");
929                }
930            });
931
932            timeout(TEST_TIMEOUT, rx_handle)
933                .await
934                .expect("test timeout")
935                .expect("join failure");
936        }
937    }
938
939    #[tokio::test(flavor = "multi_thread")]
940    async fn clone_monster() {
941        // crate::logging::enable_log();
942
943        for cap in capacity_iter() {
944            let (tx, mut rx) = super::channel(cap);
945            let (mut barrier, mut sender_quit) = crate::barrier::channel();
946
947            let mut tx2 = tx.clone();
948            spawn(async move {
949                for message in Message::new_iter(0) {
950                    tx2.send(message).await.expect("send failed");
951                }
952
953                barrier.send(()).await.expect("clone task shutdown failed");
954            });
955
956            let mut rx2 = rx.clone();
957            spawn(async move {
958                loop {
959                    let next = rx2.try_recv();
960
961                    if let Ok(_) = next {
962                        continue;
963                    }
964
965                    if let Err(TryRecvError::Closed) = next {
966                        break;
967                    }
968
969                    if let Ok(_) = sender_quit.try_recv() {
970                        break;
971                    }
972
973                    let tx3 = tx.clone();
974                    let rx3 = rx2.clone();
975                    let rx4 = tx.subscribe();
976                    time::sleep(Duration::from_micros(100)).await;
977                    drop(tx3);
978                    drop(rx3);
979                    drop(rx4);
980                    time::sleep(Duration::from_micros(50)).await;
981                }
982
983                drop(tx);
984            });
985
986            let rx_handle = spawn(async move {
987                let mut channel = Channel::new(0);
988                while let Some(message) = rx.recv().await {
989                    channel.assert_message(&message);
990                }
991            });
992
993            timeout(TEST_TIMEOUT, rx_handle)
994                .await
995                .expect("test timeout")
996                .expect("join failure");
997        }
998    }
999}
1000
#[cfg(test)]
mod async_std_tests {
    //! Broadcast channel tests running on the async-std executor.
    //!
    //! Every test repeats its scenario for each capacity yielded by `capacity_iter`, and fails
    //! with "test timeout" if the receiving side does not finish within `TEST_TIMEOUT`.

    use std::time::Duration;

    use async_std::{
        future::timeout,
        task::{self, spawn, JoinHandle},
    };

    use crate::{
        sink::Sink,
        stream::{Stream, TryRecvError},
        test::{
            capacity_iter, Channel, Channels, Message, CHANNEL_TEST_RECEIVERS,
            CHANNEL_TEST_SENDERS, TEST_TIMEOUT,
        },
    };

    /// Awaits `handle`, panicking with "test timeout" if it is not done within `TEST_TIMEOUT`.
    async fn join_with_timeout(handle: JoinHandle<()>) {
        timeout(TEST_TIMEOUT, handle)
            .await
            .expect("test timeout");
    }

    /// Spawns a task that awaits each of `handles` in order, so a whole group of tasks can be
    /// waited on through a single handle.
    fn spawn_join_all(handles: Vec<JoinHandle<()>>) -> JoinHandle<()> {
        spawn(async move {
            for handle in handles {
                handle.await;
            }
        })
    }

    /// One sender task, one receiver task: the receiver checks every message it gets
    /// with `Channel::assert_message` until the stream ends.
    #[async_std::test]
    async fn simple() {
        crate::logging::enable_log();
        for cap in capacity_iter() {
            let (mut tx, mut rx) = super::channel(cap);

            // The sender task is detached; its `tx` is dropped when the task finishes.
            spawn(async move {
                for message in Message::new_iter(0) {
                    tx.send(message).await.expect("send failed");
                }
            });

            let rx_handle = spawn(async move {
                let mut channel = Channel::new(0);
                while let Some(message) = rx.recv().await {
                    channel.assert_message(&message);
                }
            });

            join_with_timeout(rx_handle).await;
        }
    }

    /// `CHANNEL_TEST_SENDERS` sender tasks feeding a single receiver, which checks the
    /// messages with a `Channels` tracker sized for that many senders.
    #[async_std::test]
    async fn multi_sender() {
        // Uncomment for log output while debugging:
        // crate::logging::enable_log();

        for cap in capacity_iter() {
            let (tx, mut rx) = super::channel(cap);

            for i in 0..CHANNEL_TEST_SENDERS {
                let mut tx2 = tx.clone();
                spawn(async move {
                    for message in Message::new_multi_sender(i) {
                        tx2.send(message).await.expect("send failed");
                    }
                });
            }

            // Only the clones owned by the sender tasks remain after this.
            drop(tx);

            let rx_handle = spawn(async move {
                let mut channels = Channels::new(CHANNEL_TEST_SENDERS);
                while let Some(message) = rx.recv().await {
                    channels.assert_message(&message);
                }
            });

            join_with_timeout(rx_handle).await;
        }
    }

    /// One sender task and `CHANNEL_TEST_RECEIVERS` cloned receivers, each of which checks
    /// the messages it gets with its own `Channels` tracker.
    #[async_std::test]
    async fn multi_receiver() {
        // Uncomment for log output while debugging:
        // crate::logging::enable_log();

        for cap in capacity_iter() {
            let (mut tx, rx) = super::channel(cap);

            spawn(async move {
                for message in Message::new_iter(0) {
                    tx.send(message).await.expect("send failed");
                }
            });

            let handles: Vec<JoinHandle<()>> = (0..CHANNEL_TEST_RECEIVERS)
                .map(|_| {
                    let mut rx2 = rx.clone();
                    let mut channels = Channels::new(1);

                    spawn(async move {
                        while let Some(message) = rx2.recv().await {
                            channels.assert_message(&message);
                        }
                    })
                })
                .collect();

            // The original receiver is never read from; only the clones consume messages.
            drop(rx);

            join_with_timeout(spawn_join_all(handles)).await;
        }
    }

    /// `CHANNEL_TEST_SENDERS` sender tasks and `CHANNEL_TEST_RECEIVERS` cloned receivers,
    /// each receiver checking its messages with its own `Channels` tracker.
    #[async_std::test]
    async fn multi_sender_multi_receiver() {
        // Uncomment for log output while debugging:
        // crate::logging::enable_log();

        for cap in capacity_iter() {
            let (tx, rx) = super::channel(cap);

            for i in 0..CHANNEL_TEST_SENDERS {
                let mut tx2 = tx.clone();
                spawn(async move {
                    for message in Message::new_multi_sender(i) {
                        tx2.send(message).await.expect("send failed");
                    }
                });
            }

            // Only the clones owned by the sender tasks remain after this.
            drop(tx);

            let handles: Vec<JoinHandle<()>> = (0..CHANNEL_TEST_RECEIVERS)
                .map(|_| {
                    let mut rx2 = rx.clone();
                    let mut channels = Channels::new(CHANNEL_TEST_SENDERS);

                    spawn(async move {
                        while let Some(message) = rx2.recv().await {
                            channels.assert_message(&message);
                        }
                    })
                })
                .collect();

            // The original receiver is never read from; only the clones consume messages.
            drop(rx);

            join_with_timeout(spawn_join_all(handles)).await;
        }
    }

    /// Runs a plain sender/receiver pair while a third task repeatedly clones and drops
    /// senders and receivers (including one made with `subscribe`) on the same channel.
    #[async_std::test]
    async fn clone_monster() {
        // Uncomment for log output while debugging:
        // crate::logging::enable_log();

        for cap in capacity_iter() {
            let (tx, mut rx) = super::channel(cap);
            let (mut barrier, mut sender_quit) = crate::barrier::channel();

            let mut tx2 = tx.clone();
            spawn(async move {
                for message in Message::new_iter(0) {
                    tx2.send(message).await.expect("send failed");
                }

                // Tell the clone task that sending has finished.
                barrier.send(()).await.expect("clone task shutdown failed");
            });

            let mut rx2 = rx.clone();
            spawn(async move {
                loop {
                    match rx2.try_recv() {
                        // A message was available: discard it and poll again immediately.
                        Ok(_) => continue,
                        Err(TryRecvError::Closed) => break,
                        // Any other error: fall through to the clone/drop churn below.
                        Err(_) => {}
                    }

                    if sender_quit.try_recv().is_ok() {
                        break;
                    }

                    // Create short-lived endpoints, hold them briefly, then drop them.
                    let tx3 = tx.clone();
                    let rx3 = rx2.clone();
                    let rx4 = tx.subscribe();
                    task::sleep(Duration::from_micros(100)).await;
                    drop(tx3);
                    drop(rx3);
                    drop(rx4);
                    task::sleep(Duration::from_micros(50)).await;
                }

                drop(tx);
            });

            let rx_handle = spawn(async move {
                let mut channel = Channel::new(0);
                while let Some(message) = rx.recv().await {
                    channel.assert_message(&message);
                }
            });

            join_with_timeout(rx_handle).await;
        }
    }
}