// tokio_stream/stream_ext/chunks_timeout.rs

1use crate::stream_ext::Fuse;
2use crate::Stream;
3use tokio::time::{sleep, Sleep};
4
5use core::future::Future;
6use core::pin::Pin;
7use core::task::{ready, Context, Poll};
8use pin_project_lite::pin_project;
9use std::time::Duration;
10
11pin_project! {
12    /// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method.
13    #[must_use = "streams do nothing unless polled"]
14    #[derive(Debug)]
15    pub struct ChunksTimeout<S: Stream> {
16        #[pin]
17        stream: Fuse<S>,
18        #[pin]
19        deadline: Option<Sleep>,
20        duration: Duration,
21        items: Vec<S::Item>,
22        cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
23    }
24}
25
26impl<S: Stream> ChunksTimeout<S> {
27    pub(super) fn new(stream: S, max_size: usize, duration: Duration) -> Self {
28        ChunksTimeout {
29            stream: Fuse::new(stream),
30            deadline: None,
31            duration,
32            items: Vec::with_capacity(max_size),
33            cap: max_size,
34        }
35    }
36}
37
38impl<S: Stream> Stream for ChunksTimeout<S> {
39    type Item = Vec<S::Item>;
40
41    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
42        let mut me = self.as_mut().project();
43        loop {
44            match me.stream.as_mut().poll_next(cx) {
45                Poll::Pending => break,
46                Poll::Ready(Some(item)) => {
47                    if me.items.is_empty() {
48                        me.deadline.set(Some(sleep(*me.duration)));
49                        me.items.reserve_exact(*me.cap);
50                    }
51                    me.items.push(item);
52                    if me.items.len() >= *me.cap {
53                        return Poll::Ready(Some(std::mem::take(me.items)));
54                    }
55                }
56                Poll::Ready(None) => {
57                    // Returning Some here is only correct because we fuse the inner stream.
58                    let last = if me.items.is_empty() {
59                        None
60                    } else {
61                        Some(std::mem::take(me.items))
62                    };
63
64                    return Poll::Ready(last);
65                }
66            }
67        }
68
69        if !me.items.is_empty() {
70            if let Some(deadline) = me.deadline.as_pin_mut() {
71                ready!(deadline.poll(cx));
72            }
73            return Poll::Ready(Some(std::mem::take(me.items)));
74        }
75
76        Poll::Pending
77    }
78
79    fn size_hint(&self) -> (usize, Option<usize>) {
80        let chunk_len = if self.items.is_empty() { 0 } else { 1 };
81        let (lower, upper) = self.stream.size_hint();
82        let lower = (lower / self.cap).saturating_add(chunk_len);
83        let upper = upper.and_then(|x| x.checked_add(chunk_len));
84        (lower, upper)
85    }
86}