tokio_stream/stream_ext/
fold.rs

1use crate::Stream;
2
3use core::future::Future;
4use core::marker::PhantomPinned;
5use core::pin::Pin;
6use core::task::{ready, Context, Poll};
7use pin_project_lite::pin_project;
8
9pin_project! {
10    /// Future returned by the [`fold`](super::StreamExt::fold) method.
11    #[derive(Debug)]
12    #[must_use = "futures do nothing unless you `.await` or poll them"]
13    pub struct FoldFuture<St, B, F> {
14        #[pin]
15        stream: St,
16        acc: Option<B>,
17        f: F,
18        // Make this future `!Unpin` for compatibility with async trait methods.
19        #[pin]
20        _pin: PhantomPinned,
21    }
22}
23
24impl<St, B, F> FoldFuture<St, B, F> {
25    pub(super) fn new(stream: St, init: B, f: F) -> Self {
26        Self {
27            stream,
28            acc: Some(init),
29            f,
30            _pin: PhantomPinned,
31        }
32    }
33}
34
35impl<St, B, F> Future for FoldFuture<St, B, F>
36where
37    St: Stream,
38    F: FnMut(B, St::Item) -> B,
39{
40    type Output = B;
41
42    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
43        let mut me = self.project();
44        loop {
45            let next = ready!(me.stream.as_mut().poll_next(cx));
46
47            match next {
48                Some(v) => {
49                    let old = me.acc.take().unwrap();
50                    let new = (me.f)(old, v);
51                    *me.acc = Some(new);
52                }
53                None => return Poll::Ready(me.acc.take().unwrap()),
54            }
55        }
56    }
57}