futures_util/io/
lines.rs

1use super::read_line::read_line_internal;
2use futures_core::ready;
3use futures_core::stream::Stream;
4use futures_core::task::{Context, Poll};
5use futures_io::AsyncBufRead;
6use pin_project_lite::pin_project;
7use std::io;
8use std::mem;
9use std::pin::Pin;
10use std::string::String;
11use std::vec::Vec;
12
13pin_project! {
14    /// Stream for the [`lines`](super::AsyncBufReadExt::lines) method.
15    #[derive(Debug)]
16    #[must_use = "streams do nothing unless polled"]
17    pub struct Lines<R> {
18        #[pin]
19        reader: R,
20        buf: String,
21        bytes: Vec<u8>,
22        read: usize,
23    }
24}
25
26impl<R: AsyncBufRead> Lines<R> {
27    pub(super) fn new(reader: R) -> Self {
28        Self { reader, buf: String::new(), bytes: Vec::new(), read: 0 }
29    }
30}
31
32impl<R: AsyncBufRead> Stream for Lines<R> {
33    type Item = io::Result<String>;
34
35    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
36        let this = self.project();
37        let n = ready!(read_line_internal(this.reader, cx, this.buf, this.bytes, this.read))?;
38        *this.read = 0;
39        if n == 0 && this.buf.is_empty() {
40            return Poll::Ready(None);
41        }
42        if this.buf.ends_with('\n') {
43            this.buf.pop();
44            if this.buf.ends_with('\r') {
45                this.buf.pop();
46            }
47        }
48        Poll::Ready(Some(Ok(mem::take(this.buf))))
49    }
50}