postage/
futures.rs

1use std::task::Poll;
2
// Shared body for the `futures::stream::Stream::poll_next` implementations below.
//
// `$self` is the pinned receiver and `$cx` is the `&mut std::task::Context`
// passed to `poll_next`. The context is converted with `.into()` to whatever
// `poll_recv` takes, and the resulting `PollRecv` is mapped onto
// `Poll<Option<_>>`: a received value becomes `Some`, a closed channel `None`.
macro_rules! poll {
    ($self:ident, $cx:ident) => {{
        use crate::{prelude::Stream, stream::PollRecv};

        let mut recv_cx = $cx.into();

        match $self.poll_recv(&mut recv_cx) {
            PollRecv::Ready(value) => Poll::Ready(Some(value)),
            PollRecv::Pending => Poll::Pending,
            PollRecv::Closed => Poll::Ready(None),
        }
    }};
}
16
17impl futures::stream::Stream for crate::barrier::Receiver {
18    type Item = ();
19
20    fn poll_next(
21        self: std::pin::Pin<&mut Self>,
22        cx: &mut std::task::Context<'_>,
23    ) -> Poll<Option<Self::Item>> {
24        poll!(self, cx)
25    }
26}
27
28impl<T: Clone> futures::stream::Stream for crate::broadcast::Receiver<T> {
29    type Item = T;
30
31    fn poll_next(
32        self: std::pin::Pin<&mut Self>,
33        cx: &mut std::task::Context<'_>,
34    ) -> Poll<Option<Self::Item>> {
35        poll!(self, cx)
36    }
37}
38
39impl<T> futures::stream::Stream for crate::dispatch::Receiver<T> {
40    type Item = T;
41
42    fn poll_next(
43        self: std::pin::Pin<&mut Self>,
44        cx: &mut std::task::Context<'_>,
45    ) -> Poll<Option<Self::Item>> {
46        poll!(self, cx)
47    }
48}
49
50impl<T> futures::stream::Stream for crate::mpsc::Receiver<T> {
51    type Item = T;
52
53    fn poll_next(
54        self: std::pin::Pin<&mut Self>,
55        cx: &mut std::task::Context<'_>,
56    ) -> Poll<Option<Self::Item>> {
57        poll!(self, cx)
58    }
59}
60
61impl<T> futures::stream::Stream for crate::oneshot::Receiver<T> {
62    type Item = T;
63
64    fn poll_next(
65        self: std::pin::Pin<&mut Self>,
66        cx: &mut std::task::Context<'_>,
67    ) -> Poll<Option<Self::Item>> {
68        poll!(self, cx)
69    }
70}
71
72impl<T: Clone> futures::stream::Stream for crate::watch::Receiver<T> {
73    type Item = T;
74
75    fn poll_next(
76        self: std::pin::Pin<&mut Self>,
77        cx: &mut std::task::Context<'_>,
78    ) -> Poll<Option<Self::Item>> {
79        poll!(self, cx)
80    }
81}
82
#[cfg(test)]
mod sink_tests {
    // Drives each channel's sender through the `futures::Sink` interface
    // (`poll_ready` / `start_send`) using a no-op task context.

    use std::{pin::Pin, task::Poll};

    use crate::{barrier, dispatch, mpsc, oneshot, sink::SendError, watch};
    use futures::Sink;

    // Scenario used below with `channel(1)`: the first value is accepted, the
    // sender is then `Pending`, and after the receiver is dropped the sender
    // reports ready again while `start_send` returns the value in a `SendError`.
    macro_rules! test_sink {
        ($chan:expr, $val:expr) => {
            let mut task_cx = futures_test::task::noop_context();

            let (mut sender, receiver) = $chan;

            // First value: ready, then accepted.
            let readiness = Pin::new(&mut sender).poll_ready(&mut task_cx);
            assert_eq!(Poll::Ready(Ok(())), readiness);
            let outcome = Pin::new(&mut sender).start_send($val);
            assert_eq!(Ok(()), outcome);

            // The value has not been received, so the sender is not ready.
            let readiness = Pin::new(&mut sender).poll_ready(&mut task_cx);
            assert_eq!(Poll::Pending, readiness);

            // Once the receiver is gone, readiness succeeds but the send fails.
            drop(receiver);
            let readiness = Pin::new(&mut sender).poll_ready(&mut task_cx);
            assert_eq!(Poll::Ready(Ok(())), readiness);
            let outcome = Pin::new(&mut sender).start_send($val);
            assert_eq!(Err(SendError($val)), outcome);
        };
    }

    // Scenario for a sender that stays ready: two consecutive
    // `poll_ready` / `start_send` rounds must both succeed while the receiver
    // is kept alive.
    macro_rules! test_sink_ready {
        ($chan:expr, $val:expr) => {
            let mut task_cx = futures_test::task::noop_context();

            let (mut sender, _receiver) = $chan;

            // First round.
            let readiness = Pin::new(&mut sender).poll_ready(&mut task_cx);
            assert_eq!(Poll::Ready(Ok(())), readiness);
            let outcome = Pin::new(&mut sender).start_send($val);
            assert_eq!(Ok(()), outcome);

            // Second round.
            let readiness = Pin::new(&mut sender).poll_ready(&mut task_cx);
            assert_eq!(Poll::Ready(Ok(())), readiness);
            let outcome = Pin::new(&mut sender).start_send($val);
            assert_eq!(Ok(()), outcome);
        };
    }

    #[test]
    fn barrier() {
        let mut task_cx = futures_test::task::noop_context();

        let (mut sender, _receiver) = barrier::channel();

        // The first send goes through.
        let readiness = Pin::new(&mut sender).poll_ready(&mut task_cx);
        assert_eq!(Poll::Ready(Ok(())), readiness);
        let outcome = Pin::new(&mut sender).start_send(());
        assert_eq!(Ok(()), outcome);

        // After that, both readiness and sending are expected to fail.
        let readiness = Pin::new(&mut sender).poll_ready(&mut task_cx);
        assert_eq!(Poll::Ready(Err(SendError(()))), readiness);
        let outcome = Pin::new(&mut sender).start_send(());
        assert_eq!(Err(SendError(())), outcome);
    }

    // There is no test for the broadcast channel because `futures::Sink` could
    // not be implemented for it. Values and Contexts are intricately linked to
    // the internal structure of the broadcast channel, whereas `futures::Sink`
    // provides the Context and the Item separately, so there was no reasonable
    // way to implement the futures trait.

    #[test]
    fn dispatch() {
        test_sink!(dispatch::channel(1), 1usize);
    }

    #[test]
    fn mpsc() {
        test_sink!(mpsc::channel(1), 1usize);
    }

    #[test]
    fn oneshot() {
        let mut task_cx = futures_test::task::noop_context();

        let (mut sender, receiver) = oneshot::channel();

        // The first value is accepted.
        let readiness = Pin::new(&mut sender).poll_ready(&mut task_cx);
        assert_eq!(Poll::Ready(Ok(())), readiness);
        let outcome = Pin::new(&mut sender).start_send(1usize);
        assert_eq!(Ok(()), outcome);

        // A second value is handed back, even though readiness still succeeds.
        let readiness = Pin::new(&mut sender).poll_ready(&mut task_cx);
        assert_eq!(Poll::Ready(Ok(())), readiness);
        let outcome = Pin::new(&mut sender).start_send(1usize);
        assert_eq!(Err(SendError(1usize)), outcome);

        drop(receiver);

        // Dropping the receiver leaves the same observable results.
        let readiness = Pin::new(&mut sender).poll_ready(&mut task_cx);
        assert_eq!(Poll::Ready(Ok(())), readiness);
        let outcome = Pin::new(&mut sender).start_send(1usize);
        assert_eq!(Err(SendError(1usize)), outcome);
    }

    #[test]
    fn watch() {
        test_sink_ready!(watch::channel(), 1usize);
    }
}
199
#[cfg(test)]
mod stream_tests {
    // Polls each channel's receiver through `futures::Stream::poll_next`,
    // while values are pushed with the crate's own `Sink::poll_send`.

    use std::{pin::Pin, task::Poll};

    use crate::{
        barrier, broadcast, dispatch, mpsc, oneshot,
        sink::{PollSend, Sink},
        watch,
    };
    use futures::Stream;

    // Common scenario: the stream is `Pending` while empty, yields the value
    // that was sent, and ends with `None` once the sender has been dropped.
    macro_rules! test_stream {
        ($chan:expr, $val:expr) => {
            let mut task_cx = futures_test::task::noop_context();
            let mut postage_cx = crate::test::noop_context();

            let (mut sender, mut receiver) = $chan;

            // Nothing has been sent yet.
            let polled = Pin::new(&mut receiver).poll_next(&mut task_cx);
            assert_eq!(Poll::Pending, polled);

            let sent = Pin::new(&mut sender).poll_send(&mut postage_cx, $val);
            assert_eq!(PollSend::Ready, sent);

            // The sent value comes out of the stream.
            let polled = Pin::new(&mut receiver).poll_next(&mut task_cx);
            assert_eq!(Poll::Ready(Some($val)), polled);

            drop(sender);

            // With the sender dropped, the stream terminates.
            let polled = Pin::new(&mut receiver).poll_next(&mut task_cx);
            assert_eq!(Poll::Ready(None), polled);
        };
    }

    #[test]
    fn barrier() {
        let mut task_cx = futures_test::task::noop_context();
        let mut postage_cx = crate::test::noop_context();

        let (mut sender, mut receiver) = barrier::channel();

        // Nothing has been sent yet.
        let polled = Pin::new(&mut receiver).poll_next(&mut task_cx);
        assert_eq!(Poll::Pending, polled);

        let sent = Pin::new(&mut sender).poll_send(&mut postage_cx, ());
        assert_eq!(PollSend::Ready, sent);

        let polled = Pin::new(&mut receiver).poll_next(&mut task_cx);
        assert_eq!(Poll::Ready(Some(())), polled);

        drop(sender);

        // Unlike the other channels, the receiver is expected to keep
        // yielding `()` after the sender is dropped.
        let polled = Pin::new(&mut receiver).poll_next(&mut task_cx);
        assert_eq!(Poll::Ready(Some(())), polled);
    }

    #[test]
    fn broadcast() {
        test_stream!(broadcast::channel(4), 1usize);
    }

    #[test]
    fn dispatch() {
        test_stream!(dispatch::channel(4), 1usize);
    }

    #[test]
    fn mpsc() {
        test_stream!(mpsc::channel(4), 1usize);
    }

    #[test]
    fn oneshot() {
        test_stream!(oneshot::channel(), 1usize);
    }

    #[test]
    fn watch() {
        let mut task_cx = futures_test::task::noop_context();
        let mut postage_cx = crate::test::noop_context();

        let (mut sender, mut receiver) = watch::channel();

        // The first poll is expected to yield `0usize` before anything is sent.
        let polled = Pin::new(&mut receiver).poll_next(&mut task_cx);
        assert_eq!(Poll::Ready(Some(0usize)), polled);

        let sent = Pin::new(&mut sender).poll_send(&mut postage_cx, 1usize);
        assert_eq!(PollSend::Ready, sent);

        // The updated value is observed next.
        let polled = Pin::new(&mut receiver).poll_next(&mut task_cx);
        assert_eq!(Poll::Ready(Some(1usize)), polled);

        drop(sender);

        // With the sender dropped, the stream terminates.
        let polled = Pin::new(&mut receiver).poll_next(&mut task_cx);
        assert_eq!(Poll::Ready(None), polled);
    }
}