tokio/io/util/
read_until.rs

1use crate::io::AsyncBufRead;
2use crate::util::memchr;
3
4use pin_project_lite::pin_project;
5use std::future::Future;
6use std::io;
7use std::marker::PhantomPinned;
8use std::mem;
9use std::pin::Pin;
10use std::task::{ready, Context, Poll};
11
12pin_project! {
13    /// Future for the [`read_until`](crate::io::AsyncBufReadExt::read_until) method.
14    /// The delimiter is included in the resulting vector.
15    #[derive(Debug)]
16    #[must_use = "futures do nothing unless you `.await` or poll them"]
17    pub struct ReadUntil<'a, R: ?Sized> {
18        reader: &'a mut R,
19        delimiter: u8,
20        buf: &'a mut Vec<u8>,
21        // The number of bytes appended to buf. This can be less than buf.len() if
22        // the buffer was not empty when the operation was started.
23        read: usize,
24        // Make this future `!Unpin` for compatibility with async trait methods.
25        #[pin]
26        _pin: PhantomPinned,
27    }
28}
29
30pub(crate) fn read_until<'a, R>(
31    reader: &'a mut R,
32    delimiter: u8,
33    buf: &'a mut Vec<u8>,
34) -> ReadUntil<'a, R>
35where
36    R: AsyncBufRead + ?Sized + Unpin,
37{
38    ReadUntil {
39        reader,
40        delimiter,
41        buf,
42        read: 0,
43        _pin: PhantomPinned,
44    }
45}
46
47pub(super) fn read_until_internal<R: AsyncBufRead + ?Sized>(
48    mut reader: Pin<&mut R>,
49    cx: &mut Context<'_>,
50    delimiter: u8,
51    buf: &mut Vec<u8>,
52    read: &mut usize,
53) -> Poll<io::Result<usize>> {
54    loop {
55        let (done, used) = {
56            let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
57            if let Some(i) = memchr::memchr(delimiter, available) {
58                buf.extend_from_slice(&available[..=i]);
59                (true, i + 1)
60            } else {
61                buf.extend_from_slice(available);
62                (false, available.len())
63            }
64        };
65        reader.as_mut().consume(used);
66        *read += used;
67        if done || used == 0 {
68            return Poll::Ready(Ok(mem::replace(read, 0)));
69        }
70    }
71}
72
73impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadUntil<'_, R> {
74    type Output = io::Result<usize>;
75
76    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
77        let me = self.project();
78        read_until_internal(Pin::new(*me.reader), cx, *me.delimiter, me.buf, me.read)
79    }
80}