tokio/io/util/
lines.rs

1use crate::io::util::read_line::read_line_internal;
2use crate::io::AsyncBufRead;
3
4use pin_project_lite::pin_project;
5use std::io;
6use std::mem;
7use std::pin::Pin;
8use std::task::{ready, Context, Poll};
9
10pin_project! {
11    /// Reads lines from an [`AsyncBufRead`].
12    ///
13    /// A `Lines` can be turned into a `Stream` with [`LinesStream`].
14    ///
15    /// This type is usually created using the [`lines`] method.
16    ///
17    /// [`AsyncBufRead`]: crate::io::AsyncBufRead
18    /// [`LinesStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.LinesStream.html
19    /// [`lines`]: crate::io::AsyncBufReadExt::lines
20    #[derive(Debug)]
21    #[must_use = "streams do nothing unless polled"]
22    #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
23    pub struct Lines<R> {
24        #[pin]
25        reader: R,
26        buf: String,
27        bytes: Vec<u8>,
28        read: usize,
29    }
30}
31
32pub(crate) fn lines<R>(reader: R) -> Lines<R>
33where
34    R: AsyncBufRead,
35{
36    Lines {
37        reader,
38        buf: String::new(),
39        bytes: Vec::new(),
40        read: 0,
41    }
42}
43
44impl<R> Lines<R>
45where
46    R: AsyncBufRead + Unpin,
47{
48    /// Returns the next line in the stream.
49    ///
50    /// # Cancel safety
51    ///
52    /// This method is cancellation safe.
53    ///
54    /// # Examples
55    ///
56    /// ```
57    /// # use tokio::io::AsyncBufRead;
58    /// use tokio::io::AsyncBufReadExt;
59    ///
60    /// # async fn dox(my_buf_read: impl AsyncBufRead + Unpin) -> std::io::Result<()> {
61    /// let mut lines = my_buf_read.lines();
62    ///
63    /// while let Some(line) = lines.next_line().await? {
64    ///     println!("length = {}", line.len())
65    /// }
66    /// # Ok(())
67    /// # }
68    /// ```
69    pub async fn next_line(&mut self) -> io::Result<Option<String>> {
70        use std::future::poll_fn;
71
72        poll_fn(|cx| Pin::new(&mut *self).poll_next_line(cx)).await
73    }
74
75    /// Obtains a mutable reference to the underlying reader.
76    pub fn get_mut(&mut self) -> &mut R {
77        &mut self.reader
78    }
79
80    /// Obtains a reference to the underlying reader.
81    pub fn get_ref(&mut self) -> &R {
82        &self.reader
83    }
84
85    /// Unwraps this `Lines<R>`, returning the underlying reader.
86    ///
87    /// Note that any leftover data in the internal buffer is lost.
88    /// Therefore, a following read from the underlying reader may lead to data loss.
89    pub fn into_inner(self) -> R {
90        self.reader
91    }
92}
93
94impl<R> Lines<R>
95where
96    R: AsyncBufRead,
97{
98    /// Polls for the next line in the stream.
99    ///
100    /// This method returns:
101    ///
102    ///  * `Poll::Pending` if the next line is not yet available.
103    ///  * `Poll::Ready(Ok(Some(line)))` if the next line is available.
104    ///  * `Poll::Ready(Ok(None))` if there are no more lines in this stream.
105    ///  * `Poll::Ready(Err(err))` if an IO error occurred while reading the next line.
106    ///
107    /// When the method returns `Poll::Pending`, the `Waker` in the provided
108    /// `Context` is scheduled to receive a wakeup when more bytes become
109    /// available on the underlying IO resource.  Note that on multiple calls to
110    /// `poll_next_line`, only the `Waker` from the `Context` passed to the most
111    /// recent call is scheduled to receive a wakeup.
112    pub fn poll_next_line(
113        self: Pin<&mut Self>,
114        cx: &mut Context<'_>,
115    ) -> Poll<io::Result<Option<String>>> {
116        let me = self.project();
117
118        let n = ready!(read_line_internal(me.reader, cx, me.buf, me.bytes, me.read))?;
119        debug_assert_eq!(*me.read, 0);
120
121        if n == 0 && me.buf.is_empty() {
122            return Poll::Ready(Ok(None));
123        }
124
125        if me.buf.ends_with('\n') {
126            me.buf.pop();
127
128            if me.buf.ends_with('\r') {
129                me.buf.pop();
130            }
131        }
132
133        Poll::Ready(Ok(Some(mem::take(me.buf))))
134    }
135}
136
137#[cfg(test)]
138mod tests {
139    use super::*;
140
141    #[test]
142    fn assert_unpin() {
143        crate::is_unpin::<Lines<()>>();
144    }
145}