// postage/stream/map.rs

1use std::{marker::PhantomData, pin::Pin};
2
3use crate::stream::{PollRecv, Stream};
4use crate::Context;
5use pin_project::pin_project;
6
7#[pin_project]
8pub struct MapStream<From, Map, Into> {
9    #[pin]
10    from: From,
11
12    map: Map,
13    into: PhantomData<Into>,
14}
15
16impl<From, Map, Into> MapStream<From, Map, Into>
17where
18    From: Stream,
19    Map: Fn(From::Item) -> Into,
20{
21    pub fn new(from: From, map: Map) -> Self {
22        Self {
23            from,
24            map,
25            into: PhantomData,
26        }
27    }
28}
29
30impl<From, Map, Into> Stream for MapStream<From, Map, Into>
31where
32    From: Stream,
33    Map: Fn(From::Item) -> Into,
34{
35    type Item = Into;
36
37    fn poll_recv(self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollRecv<Self::Item> {
38        let this = self.project();
39
40        match this.from.poll_recv(cx) {
41            PollRecv::Ready(v) => PollRecv::Ready((this.map)(v)),
42            PollRecv::Pending => PollRecv::Pending,
43            PollRecv::Closed => PollRecv::Closed,
44        }
45    }
46}
47
#[cfg(test)]
mod tests {
    use std::convert::identity;
    use std::pin::Pin;

    use super::MapStream;
    use crate::test::stream::*;
    use crate::{
        stream::{PollRecv, Stream},
        Context,
    };

    /// Every source item is passed through the closure, and the end of the
    /// source is reported as `Closed`.
    #[test]
    fn map() {
        let mut cx = Context::empty();
        let mut mapped = MapStream::new(from_iter(vec![1, 2, 3]), |i| i + 10);

        for expected in 11..=13 {
            let polled = Pin::new(&mut mapped).poll_recv(&mut cx);
            assert_eq!(PollRecv::Ready(expected), polled);
        }

        let polled = Pin::new(&mut mapped).poll_recv(&mut cx);
        assert_eq!(PollRecv::Closed, polled);
    }

    /// A pending source yields a pending adaptor.
    #[test]
    fn forward_pending() {
        let mut cx = Context::empty();
        let mut mapped = MapStream::new(pending::<usize>(), identity);

        let polled = Pin::new(&mut mapped).poll_recv(&mut cx);
        assert_eq!(PollRecv::Pending, polled);
    }

    /// A closed source yields a closed adaptor.
    #[test]
    fn forward_closed() {
        let mut cx = Context::empty();
        let mut mapped = MapStream::new(closed::<usize>(), identity);

        let polled = Pin::new(&mut mapped).poll_recv(&mut cx);
        assert_eq!(PollRecv::Closed, polled);
    }
}