1use std::{marker::PhantomData, pin::Pin};
2
3use crate::stream::{PollRecv, Stream};
4use crate::Context;
5use pin_project::pin_project;
6
7#[pin_project]
8pub struct MapStream<From, Map, Into> {
9 #[pin]
10 from: From,
11
12 map: Map,
13 into: PhantomData<Into>,
14}
15
16impl<From, Map, Into> MapStream<From, Map, Into>
17where
18 From: Stream,
19 Map: Fn(From::Item) -> Into,
20{
21 pub fn new(from: From, map: Map) -> Self {
22 Self {
23 from,
24 map,
25 into: PhantomData,
26 }
27 }
28}
29
30impl<From, Map, Into> Stream for MapStream<From, Map, Into>
31where
32 From: Stream,
33 Map: Fn(From::Item) -> Into,
34{
35 type Item = Into;
36
37 fn poll_recv(self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollRecv<Self::Item> {
38 let this = self.project();
39
40 match this.from.poll_recv(cx) {
41 PollRecv::Ready(v) => PollRecv::Ready((this.map)(v)),
42 PollRecv::Pending => PollRecv::Pending,
43 PollRecv::Closed => PollRecv::Closed,
44 }
45 }
46}
47
#[cfg(test)]
mod tests {
    use std::convert::identity;
    use std::pin::Pin;

    use super::MapStream;
    use crate::stream::{PollRecv, Stream};
    use crate::test::stream::*;
    use crate::Context;

    /// Each source item is passed through the mapping function, and the end
    /// of the source is reported as `Closed`.
    #[test]
    fn map() {
        let mut mapped = MapStream::new(from_iter(vec![1, 2, 3]), |n| n + 10);
        let mut cx = Context::empty();

        for expected in 11..=13 {
            assert_eq!(
                PollRecv::Ready(expected),
                Pin::new(&mut mapped).poll_recv(&mut cx)
            );
        }

        assert_eq!(PollRecv::Closed, Pin::new(&mut mapped).poll_recv(&mut cx));
    }

    /// A pending source makes the mapped stream pending as well.
    #[test]
    fn forward_pending() {
        let mut mapped = MapStream::new(pending::<usize>(), identity);
        let mut cx = Context::empty();

        let polled = Pin::new(&mut mapped).poll_recv(&mut cx);

        assert_eq!(PollRecv::Pending, polled);
    }

    /// A closed source makes the mapped stream closed as well.
    #[test]
    fn forward_closed() {
        let mut mapped = MapStream::new(closed::<usize>(), identity);
        let mut cx = Context::empty();

        let polled = Pin::new(&mut mapped).poll_recv(&mut cx);

        assert_eq!(PollRecv::Closed, polled);
    }
}