futures_util/stream/stream/
map.rs

1use core::fmt;
2use core::pin::Pin;
3use futures_core::ready;
4use futures_core::stream::{FusedStream, Stream};
5use futures_core::task::{Context, Poll};
6#[cfg(feature = "sink")]
7use futures_sink::Sink;
8use pin_project_lite::pin_project;
9
10use crate::fns::FnMut1;
11
pin_project! {
    /// Stream for the [`map`](super::StreamExt::map) method.
    ///
    /// Wraps an inner stream together with a function; every item the inner
    /// stream yields is passed through that function before being returned
    /// from `poll_next`.
    #[must_use = "streams do nothing unless polled"]
    pub struct Map<St, F> {
        // The wrapped stream. Marked `#[pin]` so the projection exposes it as
        // `Pin<&mut St>`, which is what `poll_next` needs to poll it.
        #[pin]
        stream: St,
        // The mapping function. Not pinned: the projection exposes it as a
        // plain mutable reference.
        f: F,
    }
}
21
22impl<St, F> fmt::Debug for Map<St, F>
23where
24    St: fmt::Debug,
25{
26    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27        f.debug_struct("Map").field("stream", &self.stream).finish()
28    }
29}
30
31impl<St, F> Map<St, F> {
32    pub(crate) fn new(stream: St, f: F) -> Self {
33        Self { stream, f }
34    }
35
36    delegate_access_inner!(stream, St, ());
37}
38
39impl<St, F> FusedStream for Map<St, F>
40where
41    St: FusedStream,
42    F: FnMut1<St::Item>,
43{
44    fn is_terminated(&self) -> bool {
45        self.stream.is_terminated()
46    }
47}
48
49impl<St, F> Stream for Map<St, F>
50where
51    St: Stream,
52    F: FnMut1<St::Item>,
53{
54    type Item = F::Output;
55
56    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
57        let mut this = self.project();
58        let res = ready!(this.stream.as_mut().poll_next(cx));
59        Poll::Ready(res.map(|x| this.f.call_mut(x)))
60    }
61
62    fn size_hint(&self) -> (usize, Option<usize>) {
63        self.stream.size_hint()
64    }
65}
66
// Forwards the `Sink` implementation of the wrapped stream, so a `Map` can
// still be used as a sink whenever the underlying stream is one. Only compiled
// when the `sink` feature is enabled.
#[cfg(feature = "sink")]
impl<St, F, Item> Sink<Item> for Map<St, F>
where
    St: Stream + Sink<Item>,
    F: FnMut1<St::Item>,
{
    // The error type is exactly that of the wrapped sink.
    type Error = St::Error;

    // Crate-internal macro defined outside this file; presumably it expands to
    // the `Sink` methods, each forwarding to the `stream` field.
    // NOTE(review): confirm against the macro definition.
    delegate_sink!(stream, Item);
}