tokio/io/util/
split.rs

1use crate::io::util::read_until::read_until_internal;
2use crate::io::AsyncBufRead;
3
4use pin_project_lite::pin_project;
5use std::io;
6use std::mem;
7use std::pin::Pin;
8use std::task::{ready, Context, Poll};
9
pin_project! {
    /// Splitter for the [`split`](crate::io::AsyncBufReadExt::split) method.
    ///
    /// Segments are produced as `Vec<u8>` values; a trailing delimiter byte
    /// is removed from a segment before it is returned.
    ///
    /// A `Split` can be turned into a `Stream` with [`SplitStream`].
    ///
    /// [`SplitStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.SplitStream.html
    #[derive(Debug)]
    #[must_use = "streams do nothing unless polled"]
    #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
    pub struct Split<R> {
        // Underlying reader. Structurally pinned so that it can be handed to
        // `read_until_internal` from a `Pin<&mut Self>`.
        #[pin]
        reader: R,
        // Bytes collected so far for the segment in progress. The contents are
        // moved out (leaving an empty `Vec`) each time a segment is returned.
        buf: Vec<u8>,
        // Byte that separates one segment from the next.
        delim: u8,
        // Progress state passed to `read_until_internal` on every poll. It
        // starts at zero and is expected to be zero again whenever that
        // helper reports completion.
        read: usize,
    }
}
27
28pub(crate) fn split<R>(reader: R, delim: u8) -> Split<R>
29where
30    R: AsyncBufRead,
31{
32    Split {
33        reader,
34        buf: Vec::new(),
35        delim,
36        read: 0,
37    }
38}
39
40impl<R> Split<R>
41where
42    R: AsyncBufRead + Unpin,
43{
44    /// Returns the next segment in the stream.
45    ///
46    /// # Examples
47    ///
48    /// ```
49    /// # use tokio::io::AsyncBufRead;
50    /// use tokio::io::AsyncBufReadExt;
51    ///
52    /// # async fn dox(my_buf_read: impl AsyncBufRead + Unpin) -> std::io::Result<()> {
53    /// let mut segments = my_buf_read.split(b'f');
54    ///
55    /// while let Some(segment) = segments.next_segment().await? {
56    ///     println!("length = {}", segment.len())
57    /// }
58    /// # Ok(())
59    /// # }
60    /// ```
61    pub async fn next_segment(&mut self) -> io::Result<Option<Vec<u8>>> {
62        use std::future::poll_fn;
63
64        poll_fn(|cx| Pin::new(&mut *self).poll_next_segment(cx)).await
65    }
66}
67
68impl<R> Split<R>
69where
70    R: AsyncBufRead,
71{
72    /// Polls for the next segment in the stream.
73    ///
74    /// This method returns:
75    ///
76    ///  * `Poll::Pending` if the next segment is not yet available.
77    ///  * `Poll::Ready(Ok(Some(segment)))` if the next segment is available.
78    ///  * `Poll::Ready(Ok(None))` if there are no more segments in this stream.
79    ///  * `Poll::Ready(Err(err))` if an IO error occurred while reading the
80    ///    next segment.
81    ///
82    /// When the method returns `Poll::Pending`, the `Waker` in the provided
83    /// `Context` is scheduled to receive a wakeup when more bytes become
84    /// available on the underlying IO resource.
85    ///
86    /// Note that on multiple calls to `poll_next_segment`, only the `Waker`
87    /// from the `Context` passed to the most recent call is scheduled to
88    /// receive a wakeup.
89    pub fn poll_next_segment(
90        self: Pin<&mut Self>,
91        cx: &mut Context<'_>,
92    ) -> Poll<io::Result<Option<Vec<u8>>>> {
93        let me = self.project();
94
95        let n = ready!(read_until_internal(
96            me.reader, cx, *me.delim, me.buf, me.read,
97        ))?;
98        // read_until_internal resets me.read to zero once it finds the delimiter
99        debug_assert_eq!(*me.read, 0);
100
101        if n == 0 && me.buf.is_empty() {
102            return Poll::Ready(Ok(None));
103        }
104
105        if me.buf.last() == Some(me.delim) {
106            me.buf.pop();
107        }
108
109        Poll::Ready(Ok(Some(mem::take(me.buf))))
110    }
111}
112
#[cfg(test)]
mod tests {
    use super::Split;

    // Checks the `Unpin` status of `Split<()>` through the crate's
    // `is_unpin` helper (presumably a compile-time bound check; verify
    // against its definition).
    #[test]
    fn assert_unpin() {
        crate::is_unpin::<Split<()>>();
    }
}