// tokio_stream/stream_ext/timeout_repeating.rs

1use crate::stream_ext::Fuse;
2use crate::{Elapsed, Stream};
3use tokio::time::Interval;
4
5use core::pin::Pin;
6use core::task::{ready, Context, Poll};
7use pin_project_lite::pin_project;
8
9pin_project! {
10    /// Stream returned by the [`timeout_repeating`](super::StreamExt::timeout_repeating) method.
11    #[must_use = "streams do nothing unless polled"]
12    #[derive(Debug)]
13    pub struct TimeoutRepeating<S> {
14        #[pin]
15        stream: Fuse<S>,
16        #[pin]
17        interval: Interval,
18    }
19}
20
21impl<S: Stream> TimeoutRepeating<S> {
22    pub(super) fn new(stream: S, interval: Interval) -> Self {
23        TimeoutRepeating {
24            stream: Fuse::new(stream),
25            interval,
26        }
27    }
28}
29
30impl<S: Stream> Stream for TimeoutRepeating<S> {
31    type Item = Result<S::Item, Elapsed>;
32
33    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
34        let mut me = self.project();
35
36        match me.stream.poll_next(cx) {
37            Poll::Ready(v) => {
38                if v.is_some() {
39                    me.interval.reset();
40                }
41                return Poll::Ready(v.map(Ok));
42            }
43            Poll::Pending => {}
44        };
45
46        ready!(me.interval.poll_tick(cx));
47        Poll::Ready(Some(Err(Elapsed::new())))
48    }
49
50    fn size_hint(&self) -> (usize, Option<usize>) {
51        let (lower, _) = self.stream.size_hint();
52
53        // The timeout stream may insert an error an infinite number of times.
54        (lower, None)
55    }
56}