tokio_stream/stream_ext/
next.rs

1use crate::Stream;
2
3use core::future::Future;
4use core::marker::PhantomPinned;
5use core::pin::Pin;
6use core::task::{Context, Poll};
7use pin_project_lite::pin_project;
8
9pin_project! {
10    /// Future for the [`next`](super::StreamExt::next) method.
11    ///
12    /// # Cancel safety
13    ///
14    /// This method is cancel safe. It only
15    /// holds onto a reference to the underlying stream,
16    /// so dropping it will never lose a value.
17    ///
18    #[derive(Debug)]
19    #[must_use = "futures do nothing unless you `.await` or poll them"]
20    pub struct Next<'a, St: ?Sized> {
21        stream: &'a mut St,
22        // Make this future `!Unpin` for compatibility with async trait methods.
23        #[pin]
24        _pin: PhantomPinned,
25    }
26}
27
28impl<'a, St: ?Sized> Next<'a, St> {
29    pub(super) fn new(stream: &'a mut St) -> Self {
30        Next {
31            stream,
32            _pin: PhantomPinned,
33        }
34    }
35}
36
37impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> {
38    type Output = Option<St::Item>;
39
40    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
41        let me = self.project();
42        Pin::new(me.stream).poll_next(cx)
43    }
44}