1use std::task::Poll;
2
/// Translates the result of a receiver's `poll_recv` into the
/// `Poll<Option<_>>` shape used by `futures::stream::Stream::poll_next`.
///
/// * `$self` - identifier of the receiver to poll (the `self` of `poll_next`).
/// * `$cx` - identifier of the caller's task context; it is converted with
///   `.into()` into whatever context type `poll_recv` accepts.
///
/// The macro evaluates to:
/// * `Poll::Ready(Some(v))` when `poll_recv` yields `PollRecv::Ready(v)`,
/// * `Poll::Pending` when it yields `PollRecv::Pending`,
/// * `Poll::Ready(None)` when it yields `PollRecv::Closed`.
macro_rules! poll {
    ($self:ident, $cx:ident) => {{
        // Bring the crate's own `Stream` trait into scope so that `poll_recv`
        // resolves on the receiver.
        use crate::prelude::Stream;

        let mut cx = $cx.into();

        // Tail expression instead of `return`: the macro is an expression and
        // the caller decides what to do with its value.
        match $self.poll_recv(&mut cx) {
            crate::stream::PollRecv::Ready(v) => Poll::Ready(Some(v)),
            crate::stream::PollRecv::Pending => Poll::Pending,
            crate::stream::PollRecv::Closed => Poll::Ready(None),
        }
    }};
}
16
17impl futures::stream::Stream for crate::barrier::Receiver {
18 type Item = ();
19
20 fn poll_next(
21 self: std::pin::Pin<&mut Self>,
22 cx: &mut std::task::Context<'_>,
23 ) -> Poll<Option<Self::Item>> {
24 poll!(self, cx)
25 }
26}
27
28impl<T: Clone> futures::stream::Stream for crate::broadcast::Receiver<T> {
29 type Item = T;
30
31 fn poll_next(
32 self: std::pin::Pin<&mut Self>,
33 cx: &mut std::task::Context<'_>,
34 ) -> Poll<Option<Self::Item>> {
35 poll!(self, cx)
36 }
37}
38
39impl<T> futures::stream::Stream for crate::dispatch::Receiver<T> {
40 type Item = T;
41
42 fn poll_next(
43 self: std::pin::Pin<&mut Self>,
44 cx: &mut std::task::Context<'_>,
45 ) -> Poll<Option<Self::Item>> {
46 poll!(self, cx)
47 }
48}
49
50impl<T> futures::stream::Stream for crate::mpsc::Receiver<T> {
51 type Item = T;
52
53 fn poll_next(
54 self: std::pin::Pin<&mut Self>,
55 cx: &mut std::task::Context<'_>,
56 ) -> Poll<Option<Self::Item>> {
57 poll!(self, cx)
58 }
59}
60
61impl<T> futures::stream::Stream for crate::oneshot::Receiver<T> {
62 type Item = T;
63
64 fn poll_next(
65 self: std::pin::Pin<&mut Self>,
66 cx: &mut std::task::Context<'_>,
67 ) -> Poll<Option<Self::Item>> {
68 poll!(self, cx)
69 }
70}
71
72impl<T: Clone> futures::stream::Stream for crate::watch::Receiver<T> {
73 type Item = T;
74
75 fn poll_next(
76 self: std::pin::Pin<&mut Self>,
77 cx: &mut std::task::Context<'_>,
78 ) -> Poll<Option<Self::Item>> {
79 poll!(self, cx)
80 }
81}
82
#[cfg(test)]
mod sink_tests {
    use std::{pin::Pin, task::Poll};

    use crate::{barrier, dispatch, mpsc, oneshot, sink::SendError, watch};
    use futures::Sink;

    // Checks a sender that stops accepting values after the first one:
    //   1. the sink is ready and the first `start_send` succeeds,
    //   2. `poll_ready` is then pending,
    //   3. after the receiver is dropped `poll_ready` is ready again, but
    //      `start_send` returns the value wrapped in `SendError`.
    macro_rules! test_sink {
        ($chan:expr, $val:expr) => {
            let mut task_cx = futures_test::task::noop_context();

            let (mut sender, receiver) = $chan;

            // Step 1: first value goes through.
            assert_eq!(
                Poll::Ready(Ok(())),
                Pin::new(&mut sender).poll_ready(&mut task_cx)
            );
            assert_eq!(Ok(()), Pin::new(&mut sender).start_send($val));

            // Step 2: nothing has been received, so the sink is not ready.
            assert_eq!(
                Poll::Pending,
                Pin::new(&mut sender).poll_ready(&mut task_cx)
            );

            // Step 3: the receiver goes away; readiness returns, sending fails.
            drop(receiver);
            assert_eq!(
                Poll::Ready(Ok(())),
                Pin::new(&mut sender).poll_ready(&mut task_cx)
            );
            assert_eq!(
                Err(SendError($val)),
                Pin::new(&mut sender).start_send($val)
            );
        };
    }

    // Checks a sender that stays ready: two consecutive `poll_ready` /
    // `start_send` rounds both succeed while the receiver is kept alive.
    macro_rules! test_sink_ready {
        ($chan:expr, $val:expr) => {
            let mut task_cx = futures_test::task::noop_context();

            // The receiver is bound (not dropped) for the whole test.
            let (mut sender, _receiver) = $chan;

            // First round.
            assert_eq!(
                Poll::Ready(Ok(())),
                Pin::new(&mut sender).poll_ready(&mut task_cx)
            );
            assert_eq!(Ok(()), Pin::new(&mut sender).start_send($val));

            // Second round behaves exactly like the first.
            assert_eq!(
                Poll::Ready(Ok(())),
                Pin::new(&mut sender).poll_ready(&mut task_cx)
            );
            assert_eq!(Ok(()), Pin::new(&mut sender).start_send($val));
        };
    }

    #[test]
    fn barrier() {
        let mut task_cx = futures_test::task::noop_context();

        let (mut sender, _receiver) = barrier::channel();

        // The first send is accepted.
        assert_eq!(
            Poll::Ready(Ok(())),
            Pin::new(&mut sender).poll_ready(&mut task_cx)
        );
        assert_eq!(Ok(()), Pin::new(&mut sender).start_send(()));

        // After that, both readiness and sending report `SendError(())`.
        assert_eq!(
            Poll::Ready(Err(SendError(()))),
            Pin::new(&mut sender).poll_ready(&mut task_cx)
        );
        assert_eq!(Err(SendError(())), Pin::new(&mut sender).start_send(()));
    }

    #[test]
    fn dispatch() {
        test_sink!(dispatch::channel(1), 1usize);
    }

    #[test]
    fn mpsc() {
        test_sink!(mpsc::channel(1), 1usize);
    }

    #[test]
    fn oneshot() {
        let mut task_cx = futures_test::task::noop_context();

        let (mut sender, receiver) = oneshot::channel();

        // The first send is accepted.
        assert_eq!(
            Poll::Ready(Ok(())),
            Pin::new(&mut sender).poll_ready(&mut task_cx)
        );
        assert_eq!(Ok(()), Pin::new(&mut sender).start_send(1usize));

        // `poll_ready` still reports ready, but a second send is rejected.
        assert_eq!(
            Poll::Ready(Ok(())),
            Pin::new(&mut sender).poll_ready(&mut task_cx)
        );
        assert_eq!(
            Err(SendError(1usize)),
            Pin::new(&mut sender).start_send(1usize)
        );

        drop(receiver);

        // Dropping the receiver does not change either observation.
        assert_eq!(
            Poll::Ready(Ok(())),
            Pin::new(&mut sender).poll_ready(&mut task_cx)
        );
        assert_eq!(
            Err(SendError(1usize)),
            Pin::new(&mut sender).start_send(1usize)
        );
    }

    #[test]
    fn watch() {
        test_sink_ready!(watch::channel(), 1usize);
    }
}
199
#[cfg(test)]
mod stream_tests {
    use std::{pin::Pin, task::Poll};

    use crate::{
        barrier, broadcast, dispatch, mpsc, oneshot,
        sink::{PollSend, Sink},
        watch,
    };
    use futures::Stream;

    // Checks the `futures::Stream` view of a receiver:
    //   1. an empty channel is pending,
    //   2. a value sent with the crate's own `poll_send` comes out as `Some`,
    //   3. once the sender is dropped the stream ends with `None`.
    macro_rules! test_stream {
        ($chan:expr, $val:expr) => {
            // Two contexts: one for the `futures` side, one for the crate's sink.
            let mut futures_cx = futures_test::task::noop_context();
            let mut sink_cx = crate::test::noop_context();

            let (mut sender, mut receiver) = $chan;

            // Step 1: nothing to read yet.
            assert_eq!(
                Poll::Pending,
                Pin::new(&mut receiver).poll_next(&mut futures_cx)
            );

            // Step 2: send one value and read it back.
            assert_eq!(
                PollSend::Ready,
                Pin::new(&mut sender).poll_send(&mut sink_cx, $val)
            );
            assert_eq!(
                Poll::Ready(Some($val)),
                Pin::new(&mut receiver).poll_next(&mut futures_cx)
            );

            // Step 3: closing the sending side terminates the stream.
            drop(sender);
            assert_eq!(
                Poll::Ready(None),
                Pin::new(&mut receiver).poll_next(&mut futures_cx)
            );
        };
    }

    #[test]
    fn barrier() {
        let mut futures_cx = futures_test::task::noop_context();
        let mut sink_cx = crate::test::noop_context();

        let (mut sender, mut receiver) = barrier::channel();

        // Nothing has been sent yet.
        assert_eq!(
            Poll::Pending,
            Pin::new(&mut receiver).poll_next(&mut futures_cx)
        );

        assert_eq!(
            PollSend::Ready,
            Pin::new(&mut sender).poll_send(&mut sink_cx, ())
        );

        assert_eq!(
            Poll::Ready(Some(())),
            Pin::new(&mut receiver).poll_next(&mut futures_cx)
        );

        drop(sender);

        // Unlike the other channels, the stream keeps yielding `Some(())`
        // after the sender is dropped instead of ending with `None`.
        assert_eq!(
            Poll::Ready(Some(())),
            Pin::new(&mut receiver).poll_next(&mut futures_cx)
        );
    }

    #[test]
    fn broadcast() {
        test_stream!(broadcast::channel(4), 1usize);
    }

    #[test]
    fn dispatch() {
        test_stream!(dispatch::channel(4), 1usize);
    }

    #[test]
    fn mpsc() {
        test_stream!(mpsc::channel(4), 1usize);
    }

    #[test]
    fn oneshot() {
        test_stream!(oneshot::channel(), 1usize);
    }

    #[test]
    fn watch() {
        let mut futures_cx = futures_test::task::noop_context();
        let mut sink_cx = crate::test::noop_context();

        let (mut sender, mut receiver) = watch::channel();

        // A value (`0usize`) is available before anything has been sent.
        assert_eq!(
            Poll::Ready(Some(0usize)),
            Pin::new(&mut receiver).poll_next(&mut futures_cx)
        );

        assert_eq!(
            PollSend::Ready,
            Pin::new(&mut sender).poll_send(&mut sink_cx, 1usize)
        );

        // The newly sent value is observed next.
        assert_eq!(
            Poll::Ready(Some(1usize)),
            Pin::new(&mut receiver).poll_next(&mut futures_cx)
        );

        drop(sender);

        // With the sender gone the stream ends.
        assert_eq!(
            Poll::Ready(None),
            Pin::new(&mut receiver).poll_next(&mut futures_cx)
        );
    }
}