// tokio/io/util/async_buf_read_ext.rs

1use crate::io::util::fill_buf::{fill_buf, FillBuf};
2use crate::io::util::lines::{lines, Lines};
3use crate::io::util::read_line::{read_line, ReadLine};
4use crate::io::util::read_until::{read_until, ReadUntil};
5use crate::io::util::split::{split, Split};
6use crate::io::AsyncBufRead;
7
8cfg_io_util! {
9    /// An extension trait which adds utility methods to [`AsyncBufRead`] types.
10    ///
11    /// [`AsyncBufRead`]: crate::io::AsyncBufRead
12    pub trait AsyncBufReadExt: AsyncBufRead {
13        /// Reads all bytes into `buf` until the delimiter `byte` or EOF is reached.
14        ///
15        /// Equivalent to:
16        ///
17        /// ```ignore
18        /// async fn read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> io::Result<usize>;
19        /// ```
20        ///
21        /// This function will read bytes from the underlying stream until the
22        /// delimiter or EOF is found. Once found, all bytes up to, and including,
23        /// the delimiter (if found) will be appended to `buf`.
24        ///
25        /// If successful, this function will return the total number of bytes read.
26        ///
27        /// If this function returns `Ok(0)`, the stream has reached EOF.
28        ///
29        /// # Errors
30        ///
31        /// This function will ignore all instances of [`ErrorKind::Interrupted`] and
32        /// will otherwise return any errors returned by [`fill_buf`].
33        ///
34        /// If an I/O error is encountered then all bytes read so far will be
35        /// present in `buf` and its length will have been adjusted appropriately.
36        ///
37        /// [`fill_buf`]: AsyncBufRead::poll_fill_buf
38        /// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted
39        ///
40        /// # Cancel safety
41        ///
42        /// If the method is used as the event in a
43        /// [`tokio::select!`](crate::select) statement and some other branch
44        /// completes first, then some data may have been partially read. Any
45        /// partially read bytes are appended to `buf`, and the method can be
46        /// called again to continue reading until `byte`.
47        ///
48        /// This method returns the total number of bytes read. If you cancel
49        /// the call to `read_until` and then call it again to continue reading,
50        /// the counter is reset.
51        ///
52        /// # Examples
53        ///
54        /// [`std::io::Cursor`][`Cursor`] is a type that implements `BufRead`. In
55        /// this example, we use [`Cursor`] to read all the bytes in a byte slice
56        /// in hyphen delimited segments:
57        ///
58        /// [`Cursor`]: std::io::Cursor
59        ///
60        /// ```
61        /// use tokio::io::AsyncBufReadExt;
62        ///
63        /// use std::io::Cursor;
64        ///
65        /// #[tokio::main]
66        /// async fn main() {
67        ///     let mut cursor = Cursor::new(b"lorem-ipsum");
68        ///     let mut buf = vec![];
69        ///
70        ///     // cursor is at 'l'
71        ///     let num_bytes = cursor.read_until(b'-', &mut buf)
72        ///         .await
73        ///         .expect("reading from cursor won't fail");
74        ///
75        ///     assert_eq!(num_bytes, 6);
76        ///     assert_eq!(buf, b"lorem-");
77        ///     buf.clear();
78        ///
79        ///     // cursor is at 'i'
80        ///     let num_bytes = cursor.read_until(b'-', &mut buf)
81        ///         .await
82        ///         .expect("reading from cursor won't fail");
83        ///
84        ///     assert_eq!(num_bytes, 5);
85        ///     assert_eq!(buf, b"ipsum");
86        ///     buf.clear();
87        ///
88        ///     // cursor is at EOF
89        ///     let num_bytes = cursor.read_until(b'-', &mut buf)
90        ///         .await
91        ///         .expect("reading from cursor won't fail");
92        ///     assert_eq!(num_bytes, 0);
93        ///     assert_eq!(buf, b"");
94        /// }
95        /// ```
96        fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntil<'a, Self>
97        where
98            Self: Unpin,
99        {
100            read_until(self, byte, buf)
101        }
102
103        /// Reads all bytes until a newline (the 0xA byte) is reached, and append
104        /// them to the provided buffer.
105        ///
106        /// Equivalent to:
107        ///
108        /// ```ignore
109        /// async fn read_line(&mut self, buf: &mut String) -> io::Result<usize>;
110        /// ```
111        ///
112        /// This function will read bytes from the underlying stream until the
113        /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes
114        /// up to, and including, the delimiter (if found) will be appended to
115        /// `buf`.
116        ///
117        /// If successful, this function will return the total number of bytes read.
118        ///
119        /// If this function returns `Ok(0)`, the stream has reached EOF.
120        ///
121        /// # Errors
122        ///
123        /// This function has the same error semantics as [`read_until`] and will
124        /// also return an error if the read bytes are not valid UTF-8. If an I/O
125        /// error is encountered then `buf` may contain some bytes already read in
126        /// the event that all data read so far was valid UTF-8.
127        ///
128        /// [`read_until`]: AsyncBufReadExt::read_until
129        ///
130        /// # Cancel safety
131        ///
132        /// This method is not cancellation safe. If the method is used as the
133        /// event in a [`tokio::select!`](crate::select) statement and some
134        /// other branch completes first, then some data may have been partially
135        /// read, and this data is lost. There are no guarantees regarding the
136        /// contents of `buf` when the call is cancelled. The current
137        /// implementation replaces `buf` with the empty string, but this may
138        /// change in the future.
139        ///
140        /// This function does not behave like [`read_until`] because of the
141        /// requirement that a string contains only valid utf-8. If you need a
142        /// cancellation safe `read_line`, there are three options:
143        ///
144        ///  * Call [`read_until`] with a newline character and manually perform the utf-8 check.
145        ///  * The stream returned by [`lines`] has a cancellation safe
146        ///    [`next_line`] method.
147        ///  * Use [`tokio_util::codec::LinesCodec`][LinesCodec].
148        ///
149        /// [LinesCodec]: https://docs.rs/tokio-util/latest/tokio_util/codec/struct.LinesCodec.html
150        /// [`read_until`]: Self::read_until
151        /// [`lines`]: Self::lines
152        /// [`next_line`]: crate::io::Lines::next_line
153        ///
154        /// # Examples
155        ///
156        /// [`std::io::Cursor`][`Cursor`] is a type that implements
157        /// `AsyncBufRead`. In this example, we use [`Cursor`] to read all the
158        /// lines in a byte slice:
159        ///
160        /// [`Cursor`]: std::io::Cursor
161        ///
162        /// ```
163        /// use tokio::io::AsyncBufReadExt;
164        ///
165        /// use std::io::Cursor;
166        ///
167        /// #[tokio::main]
168        /// async fn main() {
169        ///     let mut cursor = Cursor::new(b"foo\nbar");
170        ///     let mut buf = String::new();
171        ///
172        ///     // cursor is at 'f'
173        ///     let num_bytes = cursor.read_line(&mut buf)
174        ///         .await
175        ///         .expect("reading from cursor won't fail");
176        ///
177        ///     assert_eq!(num_bytes, 4);
178        ///     assert_eq!(buf, "foo\n");
179        ///     buf.clear();
180        ///
181        ///     // cursor is at 'b'
182        ///     let num_bytes = cursor.read_line(&mut buf)
183        ///         .await
184        ///         .expect("reading from cursor won't fail");
185        ///
186        ///     assert_eq!(num_bytes, 3);
187        ///     assert_eq!(buf, "bar");
188        ///     buf.clear();
189        ///
190        ///     // cursor is at EOF
191        ///     let num_bytes = cursor.read_line(&mut buf)
192        ///         .await
193        ///         .expect("reading from cursor won't fail");
194        ///
195        ///     assert_eq!(num_bytes, 0);
196        ///     assert_eq!(buf, "");
197        /// }
198        /// ```
199        fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self>
200        where
201            Self: Unpin,
202        {
203            read_line(self, buf)
204        }
205
206        /// Returns a stream of the contents of this reader split on the byte
207        /// `byte`.
208        ///
209        /// This method is the asynchronous equivalent to
210        /// [`BufRead::split`](std::io::BufRead::split).
211        ///
212        /// The stream returned from this function will yield instances of
213        /// [`io::Result`]`<`[`Option`]`<`[`Vec<u8>`]`>>`. Each vector returned will *not* have
214        /// the delimiter byte at the end.
215        ///
216        /// [`io::Result`]: std::io::Result
217        /// [`Option`]: core::option::Option
218        /// [`Vec<u8>`]: std::vec::Vec
219        ///
220        /// # Errors
221        ///
222        /// Each item of the stream has the same error semantics as
223        /// [`AsyncBufReadExt::read_until`](AsyncBufReadExt::read_until).
224        ///
225        /// # Examples
226        ///
227        /// ```
228        /// # use tokio::io::AsyncBufRead;
229        /// use tokio::io::AsyncBufReadExt;
230        ///
231        /// # async fn dox(my_buf_read: impl AsyncBufRead + Unpin) -> std::io::Result<()> {
232        /// let mut segments = my_buf_read.split(b'f');
233        ///
234        /// while let Some(segment) = segments.next_segment().await? {
235        ///     println!("length = {}", segment.len())
236        /// }
237        /// # Ok(())
238        /// # }
239        /// ```
240        fn split(self, byte: u8) -> Split<Self>
241        where
242            Self: Sized + Unpin,
243        {
244            split(self, byte)
245        }
246
247        /// Returns the contents of the internal buffer, filling it with more
248        /// data from the inner reader if it is empty.
249        ///
250        /// This function is a lower-level call. It needs to be paired with the
251        /// [`consume`] method to function properly. When calling this method,
252        /// none of the contents will be "read" in the sense that later calling
253        /// `read` may return the same contents. As such, [`consume`] must be
254        /// called with the number of bytes that are consumed from this buffer
255        /// to ensure that the bytes are never returned twice.
256        ///
257        /// An empty buffer returned indicates that the stream has reached EOF.
258        ///
259        /// Equivalent to:
260        ///
261        /// ```ignore
262        /// async fn fill_buf(&mut self) -> io::Result<&[u8]>;
263        /// ```
264        ///
265        /// # Errors
266        ///
267        /// This function will return an I/O error if the underlying reader was
268        /// read, but returned an error.
269        ///
270        /// # Cancel safety
271        ///
272        /// This method is cancel safe. If you use it as the event in a
273        /// [`tokio::select!`](crate::select) statement and some other branch
274        /// completes first, then it is guaranteed that no data was read.
275        ///
276        /// [`consume`]: crate::io::AsyncBufReadExt::consume
277        fn fill_buf(&mut self) -> FillBuf<'_, Self>
278        where
279            Self: Unpin,
280        {
281            fill_buf(self)
282        }
283
284        /// Tells this buffer that `amt` bytes have been consumed from the
285        /// buffer, so they should no longer be returned in calls to [`read`].
286        ///
287        /// This function is a lower-level call. It needs to be paired with the
288        /// [`fill_buf`] method to function properly. This function does not
289        /// perform any I/O, it simply informs this object that some amount of
290        /// its buffer, returned from [`fill_buf`], has been consumed and should
291        /// no longer be returned. As such, this function may do odd things if
292        /// [`fill_buf`] isn't called before calling it.
293        ///
294        /// The `amt` must be less than the number of bytes in the buffer
295        /// returned by [`fill_buf`].
296        ///
297        /// [`read`]: crate::io::AsyncReadExt::read
298        /// [`fill_buf`]: crate::io::AsyncBufReadExt::fill_buf
299        fn consume(&mut self, amt: usize)
300        where
301            Self: Unpin,
302        {
303            std::pin::Pin::new(self).consume(amt);
304        }
305
306        /// Returns a stream over the lines of this reader.
307        /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines).
308        ///
309        /// The stream returned from this function will yield instances of
310        /// [`io::Result`]`<`[`Option`]`<`[`String`]`>>`. Each string returned will *not* have a newline
311        /// byte (the 0xA byte) or `CRLF` (0xD, 0xA bytes) at the end.
312        ///
313        /// [`io::Result`]: std::io::Result
314        /// [`Option`]: core::option::Option
315        /// [`String`]: String
316        ///
317        /// # Errors
318        ///
319        /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`].
320        ///
321        /// # Examples
322        ///
323        /// [`std::io::Cursor`][`Cursor`] is a type that implements `BufRead`. In
324        /// this example, we use [`Cursor`] to iterate over all the lines in a byte
325        /// slice.
326        ///
327        /// [`Cursor`]: std::io::Cursor
328        ///
329        /// ```
330        /// use tokio::io::AsyncBufReadExt;
331        ///
332        /// use std::io::Cursor;
333        ///
334        /// #[tokio::main]
335        /// async fn main() {
336        ///     let cursor = Cursor::new(b"lorem\nipsum\r\ndolor");
337        ///
338        ///     let mut lines = cursor.lines();
339        ///
340        ///     assert_eq!(lines.next_line().await.unwrap(), Some(String::from("lorem")));
341        ///     assert_eq!(lines.next_line().await.unwrap(), Some(String::from("ipsum")));
342        ///     assert_eq!(lines.next_line().await.unwrap(), Some(String::from("dolor")));
343        ///     assert_eq!(lines.next_line().await.unwrap(), None);
344        /// }
345        /// ```
346        ///
347        /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line
348        fn lines(self) -> Lines<Self>
349        where
350            Self: Sized,
351        {
352            lines(self)
353        }
354    }
355}
356
357impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}