postage/
stream.rs

1//! A stream of values which are asynchronously produced, until the source is closed.
2//!
3//! Postage channel receivers implement Stream:
4//! ```rust
5//! use postage::mpsc::channel;
6//! use postage::sink::Sink;
7//! use postage::stream::Stream;
8//!
9//! #[tokio::main]
10//! async fn main() {
11//!     let (mut tx, mut rx) = channel(16);
12//!     tx.send(true).await;
13//!     drop(tx);
14//!     assert_eq!(Some(true), rx.recv().await);
15//!     assert_eq!(None, rx.recv().await);
16//! }
17//! ```
18//!
19//! Streams produce `Option<T>`.  When a None value is recieved, the stream is closed and
20//! will never produce another item.  Loops can be concicely written with `while let Some(v) = rx.recv().await {}`
21//! ```rust
22//! use postage::mpsc::channel;
23//! use postage::sink::Sink;
24//! use postage::stream::Stream;
25//!
26//! #[tokio::main]
27//! async fn main() {
28//!     let (mut tx, mut rx) = channel(16);
29//!     tx.send(true).await;
30//!     tx.send(true).await;
31//!     drop(tx);
32//!
33//!     while let Some(v) = rx.recv().await {
34//!         println!("Value received!: {}", v);
35//!     }
36//! }
37//! ```
38use std::{future::Future, marker::PhantomPinned, ops::DerefMut, pin::Pin};
39
40use crate::Context;
41use pin_project::pin_project;
42use std::task::Poll;
43
44use self::{
45    chain::ChainStream, filter::FilterStream, find::FindStream, map::MapStream, merge::MergeStream,
46    once::OnceStream, repeat::RepeatStream,
47};
48
49mod chain;
50mod errors;
51mod filter;
52mod find;
53mod map;
54mod merge;
55mod once;
56mod repeat;
57
58#[cfg(feature = "logging")]
59mod stream_log;
60
61pub use errors::*;
62
63/// An asynchronous stream, which produces a series of messages until closed.
64///
65/// Streams implement `poll_recv`, a poll-based method very similar to `std::future::Future`.
66///
67/// Streams can be used in async code with `stream.recv().await`, or with `stream.try_recv()`.
68///
69/// ```rust
70/// use postage::mpsc::channel;
71/// use postage::sink::Sink;
72/// use postage::stream::Stream;
73///
74/// #[tokio::main]
75/// async fn main() {
76///     let (mut tx, mut rx) = channel(16);
77///     tx.send(true).await;
78///     tx.send(true).await;
79///     drop(tx);
80///
81///     while let Some(_v) = rx.recv().await {
82///         println!("Value received!");
83///         if let Ok(_v) = rx.try_recv() {
84///             println!("Extra value received!");
85///         }
86///     }
87/// }
88/// ```
89///
90/// Streams also support combinators, such as map, filter, find, and log.
91/// ```rust
92/// use postage::mpsc::channel;
93/// use postage::sink::Sink;
94/// use postage::stream::{Stream, TryRecvError};
95///
96/// #[tokio::main]
97/// async fn main() {
98///     let (mut tx, rx) = channel(16);
99///
100///     tx.send(1usize).await;
101///     tx.send(2usize).await;
102///     tx.send(3usize).await;
103///     drop(tx);
104///
105///     let mut rx = rx
106///         .map(|i| i * 2)
107///         .filter(|i| *i >= 4)
108///         .find(|i| *i == 6);
109///
110///     // The `logging` feature enables a combinator that logs values using the Debug trait.
111///     #[cfg(feature = "logging")]
112///     let mut rx = rx
113///         .log(log::Level::Info);
114///
115///     assert_eq!(Ok(6), rx.try_recv());
116///     assert_eq!(Err(TryRecvError::Closed), rx.try_recv());
117/// }
118/// ```
119#[must_use = "streams do nothing unless polled"]
120pub trait Stream {
121    type Item;
122
123    /// Attempts to retrieve an item from the stream, without blocking.
124    ///
125    /// Returns:
126    /// - `PollRecv::Ready(value)` if a message is ready
127    /// - `PollRecv::Pending` if the stream is open, but no message is currently available.
128    /// - `PollRecv::Closed` if the stream is closed, and no messages are expected.
129    fn poll_recv(self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollRecv<Self::Item>;
130
131    /// Retrieves a message from the stream.
132    ///
133    /// Returns:
134    /// - `Some(value)` if the stream is open
135    /// - `None` if the stream is closed, and no further messages are expected.
136    fn recv(&mut self) -> RecvFuture<'_, Self>
137    where
138        Self: Unpin,
139    {
140        RecvFuture::new(self)
141    }
142
143    /// Attempts to retrive a message from the stream, without blocking.
144    ///
145    /// Returns:
146    /// - `Ok(value)` if a message is ready.
147    /// - `TryRecvError::Pending` if the stream is open, but no messages are available.
148    /// - `TryRecvError::Closed` if the stream has been closed, and no items are expected.
149    fn try_recv(&mut self) -> Result<Self::Item, TryRecvError>
150    where
151        Self: Unpin,
152    {
153        let pin = Pin::new(self);
154
155        match pin.poll_recv(&mut Context::empty()) {
156            PollRecv::Ready(value) => Ok(value),
157            PollRecv::Pending => Err(TryRecvError::Pending),
158            PollRecv::Closed => Err(TryRecvError::Closed),
159        }
160    }
161
162    /// Retrieves a message from the stream, blocking the current thread until one is available.
163    ///
164    /// Returns:
165    /// - `Some(value)` if the stream is open
166    /// - `None` if the stream is closed, and no further messages are expected.
167    #[cfg(feature = "blocking")]
168    fn blocking_recv(&mut self) -> Option<Self::Item>
169    where
170        Self: Unpin,
171    {
172        pollster::block_on(self.recv())
173    }
174
175    /// Transforms the stream with a map function.
176    fn map<Map, Into>(self, map: Map) -> MapStream<Self, Map, Into>
177    where
178        Map: Fn(Self::Item) -> Into,
179        Self: Sized,
180    {
181        MapStream::new(self, map)
182    }
183
184    /// Filters messages returned by the stream, ignoring messages where `filter` returns false.
185    fn filter<Filter>(self, filter: Filter) -> FilterStream<Self, Filter>
186    where
187        Self: Sized + Unpin,
188        Filter: FnMut(&Self::Item) -> bool + Unpin,
189    {
190        FilterStream::new(self, filter)
191    }
192
193    /// Merges two streams, returning values from both at once, until both are closed.
194    fn merge<Other>(self, other: Other) -> MergeStream<Self, Other>
195    where
196        Other: Stream<Item = Self::Item>,
197        Self: Sized,
198    {
199        MergeStream::new(self, other)
200    }
201
202    /// Chains two streams, returning values from `self` until it is closed, and then returning values from `other`.
203    fn chain<Other>(self, other: Other) -> ChainStream<Self, Other>
204    where
205        Other: Stream<Item = Self::Item>,
206        Self: Sized,
207    {
208        ChainStream::new(self, other)
209    }
210
211    /// Finds a message matching a condition.  When the condition is matched, a single value will be returned.
212    /// Then the stream will be closed.
213    fn find<Condition>(self, condition: Condition) -> FindStream<Self, Condition>
214    where
215        Self: Sized + Unpin,
216        Condition: Fn(&Self::Item) -> bool + Unpin,
217    {
218        FindStream::new(self, condition)
219    }
220
221    /// Logs messages that are produced by the stream using the Debug trait, at the provided log level.
222    ///
223    /// Requires the `logging` feature
224    #[cfg(feature = "logging")]
225    fn log(self, level: log::Level) -> stream_log::StreamLog<Self>
226    where
227        Self: Sized,
228        Self::Item: std::fmt::Debug,
229    {
230        stream_log::StreamLog::new(self, level)
231    }
232}
233
234impl<S> Stream for &mut S
235where
236    S: Stream + Unpin + ?Sized,
237{
238    type Item = S::Item;
239
240    fn poll_recv(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollRecv<Self::Item> {
241        S::poll_recv(Pin::new(&mut **self), cx)
242    }
243}
244
245impl<P, S> Stream for Pin<P>
246where
247    P: DerefMut<Target = S> + Unpin,
248    S: Stream + Unpin + ?Sized,
249{
250    type Item = <S as Stream>::Item;
251
252    fn poll_recv(self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollRecv<Self::Item> {
253        Pin::get_mut(self).as_mut().poll_recv(cx)
254    }
255}
256
257/// Returns a stream which produces a single value, and then is closed.
258pub fn once<T>(item: T) -> OnceStream<T> {
259    OnceStream::new(item)
260}
261
262/// Returns a stream which infiniately produces a clonable value.
263pub fn repeat<T>(item: T) -> RepeatStream<T>
264where
265    T: Clone,
266{
267    RepeatStream::new(item)
268}
269
270/// An enum of poll responses that are produced by Stream implementations.
271#[derive(Debug, Clone, PartialEq, Eq)]
272pub enum PollRecv<T> {
273    /// An item is ready
274    Ready(T),
275    /// The channel is open, but no messages are ready and the receiver has registered with the waker context
276    Pending,
277    /// The channel is closed, and no messages will ever be delivered
278    Closed,
279}
280
281/// A future returned by `Stream::recv`.
282#[pin_project]
283#[must_use = "futures do nothing unless polled"]
284pub struct RecvFuture<'s, S>
285where
286    S: Stream + ?Sized,
287{
288    recv: &'s mut S,
289    #[pin]
290    _pin: PhantomPinned,
291}
292
293impl<'s, S: Stream> RecvFuture<'s, S>
294where
295    S: ?Sized,
296{
297    pub fn new(recv: &'s mut S) -> RecvFuture<'s, S> {
298        Self {
299            recv,
300            _pin: PhantomPinned,
301        }
302    }
303}
304
305impl<'s, S> Future for RecvFuture<'s, S>
306where
307    S: Stream + Unpin + ?Sized,
308{
309    type Output = Option<S::Item>;
310
311    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
312        let this = self.project();
313
314        let mut cx: crate::Context<'_> = cx.into();
315        match Pin::new(this.recv).poll_recv(&mut cx) {
316            PollRecv::Ready(v) => Poll::Ready(Some(v)),
317            PollRecv::Pending => Poll::Pending,
318            PollRecv::Closed => Poll::Ready(None),
319        }
320    }
321}
322
323#[cfg(test)]
324mod tests {
325
326    #[cfg(feature = "blocking")]
327    #[test]
328    fn test_blocking() {
329        use super::Stream;
330        use crate::test::stream::ready;
331
332        let mut stream = ready(1usize);
333        assert_eq!(Some(1usize), stream.blocking_recv());
334    }
335}