tokio_stream/stream_ext/
all.rs

1use crate::Stream;
2
3use core::future::Future;
4use core::marker::PhantomPinned;
5use core::pin::Pin;
6use core::task::{ready, Context, Poll};
7use pin_project_lite::pin_project;
8
9pin_project! {
10    /// Future for the [`all`](super::StreamExt::all) method.
11    #[derive(Debug)]
12    #[must_use = "futures do nothing unless you `.await` or poll them"]
13    pub struct AllFuture<'a, St: ?Sized, F> {
14        stream: &'a mut St,
15        f: F,
16        // Make this future `!Unpin` for compatibility with async trait methods.
17        #[pin]
18        _pin: PhantomPinned,
19    }
20}
21
22impl<'a, St: ?Sized, F> AllFuture<'a, St, F> {
23    pub(super) fn new(stream: &'a mut St, f: F) -> Self {
24        Self {
25            stream,
26            f,
27            _pin: PhantomPinned,
28        }
29    }
30}
31
32impl<St, F> Future for AllFuture<'_, St, F>
33where
34    St: ?Sized + Stream + Unpin,
35    F: FnMut(St::Item) -> bool,
36{
37    type Output = bool;
38
39    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
40        let me = self.project();
41        let mut stream = Pin::new(me.stream);
42
43        // Take a maximum of 32 items from the stream before yielding.
44        for _ in 0..32 {
45            match ready!(stream.as_mut().poll_next(cx)) {
46                Some(v) => {
47                    if !(me.f)(v) {
48                        return Poll::Ready(false);
49                    }
50                }
51                None => return Poll::Ready(true),
52            }
53        }
54
55        cx.waker().wake_by_ref();
56        Poll::Pending
57    }
58}