futures_util/stream/stream/
enumerate.rs

1use core::pin::Pin;
2use futures_core::ready;
3use futures_core::stream::{FusedStream, Stream};
4use futures_core::task::{Context, Poll};
5#[cfg(feature = "sink")]
6use futures_sink::Sink;
7use pin_project_lite::pin_project;
8
9pin_project! {
10    /// Stream for the [`enumerate`](super::StreamExt::enumerate) method.
11    #[derive(Debug)]
12    #[must_use = "streams do nothing unless polled"]
13    pub struct Enumerate<St> {
14        #[pin]
15        stream: St,
16        count: usize,
17    }
18}
19
20impl<St: Stream> Enumerate<St> {
21    pub(super) fn new(stream: St) -> Self {
22        Self { stream, count: 0 }
23    }
24
25    delegate_access_inner!(stream, St, ());
26}
27
28impl<St: Stream + FusedStream> FusedStream for Enumerate<St> {
29    fn is_terminated(&self) -> bool {
30        self.stream.is_terminated()
31    }
32}
33
34impl<St: Stream> Stream for Enumerate<St> {
35    type Item = (usize, St::Item);
36
37    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
38        let this = self.project();
39
40        match ready!(this.stream.poll_next(cx)) {
41            Some(item) => {
42                let prev_count = *this.count;
43                *this.count += 1;
44                Poll::Ready(Some((prev_count, item)))
45            }
46            None => Poll::Ready(None),
47        }
48    }
49
50    fn size_hint(&self) -> (usize, Option<usize>) {
51        self.stream.size_hint()
52    }
53}
54
// When the wrapped type is also a `Sink`, expose that sink on `Enumerate`
// itself. Only compiled with the `sink` feature enabled.
#[cfg(feature = "sink")]
impl<S: Stream + Sink<Item>, Item> Sink<Item> for Enumerate<S> {
    type Error = <S as Sink<Item>>::Error;

    // Macro defined outside this file; presumably expands to the `Sink`
    // methods forwarding to the `stream` field — confirm against its definition.
    delegate_sink!(stream, Item);
}