futures_util/io/buf_reader.rs
1use super::DEFAULT_BUF_SIZE;
2use futures_core::future::Future;
3use futures_core::ready;
4use futures_core::task::{Context, Poll};
5use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom};
6use pin_project_lite::pin_project;
7use std::boxed::Box;
8use std::io::{self, Read};
9use std::pin::Pin;
10use std::vec;
11use std::{cmp, fmt};
12
pin_project! {
    /// The `BufReader` struct adds buffering to any reader.
    ///
    /// It can be excessively inefficient to work directly with an [`AsyncRead`]
    /// instance. A `BufReader` performs large, infrequent reads on the underlying
    /// [`AsyncRead`] and maintains an in-memory buffer of the results.
    ///
    /// `BufReader` can improve the speed of programs that make *small* and
    /// *repeated* read calls to the same file or network socket. It does not
    /// help when reading very large amounts at once, or reading just one or a few
    /// times. It also provides no advantage when reading from a source that is
    /// already in memory, like a `Vec<u8>`.
    ///
    /// When the `BufReader` is dropped, the contents of its buffer will be
    /// discarded. Creating multiple instances of a `BufReader` on the same
    /// stream can cause data loss.
    ///
    /// [`AsyncRead`]: futures_io::AsyncRead
    ///
    // TODO: Examples
    pub struct BufReader<R> {
        // The wrapped reader. It is structurally pinned so it can be polled
        // through `Pin<&mut Self>` via `project()`.
        #[pin]
        inner: R,
        // Backing storage, allocated once at construction; its length is the
        // buffer capacity.
        buffer: Box<[u8]>,
        // Index of the next unconsumed byte in `buffer`. Kept `<= cap`.
        pos: usize,
        // One past the last valid byte in `buffer`; `buffer[pos..cap]` is the
        // data that has been read from `inner` but not yet consumed.
        cap: usize,
    }
}
41
42impl<R: AsyncRead> BufReader<R> {
43 /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
44 /// but may change in the future.
45 pub fn new(inner: R) -> Self {
46 Self::with_capacity(DEFAULT_BUF_SIZE, inner)
47 }
48
49 /// Creates a new `BufReader` with the specified buffer capacity.
50 pub fn with_capacity(capacity: usize, inner: R) -> Self {
51 // TODO: consider using Box<[u8]>::new_uninit_slice once it stabilized
52 let buffer = vec![0; capacity];
53 Self { inner, buffer: buffer.into_boxed_slice(), pos: 0, cap: 0 }
54 }
55}
56
57impl<R> BufReader<R> {
58 delegate_access_inner!(inner, R, ());
59
60 /// Returns a reference to the internally buffered data.
61 ///
62 /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
63 pub fn buffer(&self) -> &[u8] {
64 &self.buffer[self.pos..self.cap]
65 }
66
67 /// Invalidates all data in the internal buffer.
68 #[inline]
69 fn discard_buffer(self: Pin<&mut Self>) {
70 let this = self.project();
71 *this.pos = 0;
72 *this.cap = 0;
73 }
74}
75
76impl<R: AsyncRead + AsyncSeek> BufReader<R> {
77 /// Seeks relative to the current position. If the new position lies within the buffer,
78 /// the buffer will not be flushed, allowing for more efficient seeks.
79 /// This method does not return the location of the underlying reader, so the caller
80 /// must track this information themselves if it is required.
81 pub fn seek_relative(self: Pin<&mut Self>, offset: i64) -> SeeKRelative<'_, R> {
82 SeeKRelative { inner: self, offset, first: true }
83 }
84
85 /// Attempts to seek relative to the current position. If the new position lies within the buffer,
86 /// the buffer will not be flushed, allowing for more efficient seeks.
87 /// This method does not return the location of the underlying reader, so the caller
88 /// must track this information themselves if it is required.
89 pub fn poll_seek_relative(
90 self: Pin<&mut Self>,
91 cx: &mut Context<'_>,
92 offset: i64,
93 ) -> Poll<io::Result<()>> {
94 let pos = self.pos as u64;
95 if offset < 0 {
96 if let Some(new_pos) = pos.checked_sub((-offset) as u64) {
97 *self.project().pos = new_pos as usize;
98 return Poll::Ready(Ok(()));
99 }
100 } else if let Some(new_pos) = pos.checked_add(offset as u64) {
101 if new_pos <= self.cap as u64 {
102 *self.project().pos = new_pos as usize;
103 return Poll::Ready(Ok(()));
104 }
105 }
106 self.poll_seek(cx, SeekFrom::Current(offset)).map(|res| res.map(|_| ()))
107 }
108}
109
110impl<R: AsyncRead> AsyncRead for BufReader<R> {
111 fn poll_read(
112 mut self: Pin<&mut Self>,
113 cx: &mut Context<'_>,
114 buf: &mut [u8],
115 ) -> Poll<io::Result<usize>> {
116 // If we don't have any buffered data and we're doing a massive read
117 // (larger than our internal buffer), bypass our internal buffer
118 // entirely.
119 if self.pos == self.cap && buf.len() >= self.buffer.len() {
120 let res = ready!(self.as_mut().project().inner.poll_read(cx, buf));
121 self.discard_buffer();
122 return Poll::Ready(res);
123 }
124 let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
125 let nread = rem.read(buf)?;
126 self.consume(nread);
127 Poll::Ready(Ok(nread))
128 }
129
130 fn poll_read_vectored(
131 mut self: Pin<&mut Self>,
132 cx: &mut Context<'_>,
133 bufs: &mut [IoSliceMut<'_>],
134 ) -> Poll<io::Result<usize>> {
135 let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
136 if self.pos == self.cap && total_len >= self.buffer.len() {
137 let res = ready!(self.as_mut().project().inner.poll_read_vectored(cx, bufs));
138 self.discard_buffer();
139 return Poll::Ready(res);
140 }
141 let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
142 let nread = rem.read_vectored(bufs)?;
143 self.consume(nread);
144 Poll::Ready(Ok(nread))
145 }
146}
147
148impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
149 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
150 let this = self.project();
151
152 // If we've reached the end of our internal buffer then we need to fetch
153 // some more data from the underlying reader.
154 // Branch using `>=` instead of the more correct `==`
155 // to tell the compiler that the pos..cap slice is always valid.
156 if *this.pos >= *this.cap {
157 debug_assert!(*this.pos == *this.cap);
158 *this.cap = ready!(this.inner.poll_read(cx, this.buffer))?;
159 *this.pos = 0;
160 }
161 Poll::Ready(Ok(&this.buffer[*this.pos..*this.cap]))
162 }
163
164 fn consume(self: Pin<&mut Self>, amt: usize) {
165 *self.project().pos = cmp::min(self.pos + amt, self.cap);
166 }
167}
168
// Makes `BufReader<R>` writable whenever `R` is, by delegating the `AsyncWrite`
// methods to the wrapped `inner` value through the crate's `delegate_async_write!`
// macro. The macro is defined elsewhere; presumably it is a plain pass-through,
// so writes never interact with the read buffer. Confirm against the macro.
impl<R: AsyncWrite> AsyncWrite for BufReader<R> {
    delegate_async_write!(inner);
}
172
173impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
174 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
175 f.debug_struct("BufReader")
176 .field("reader", &self.inner)
177 .field("buffer", &format_args!("{}/{}", self.cap - self.pos, self.buffer.len()))
178 .finish()
179 }
180}
181
182impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
183 /// Seek to an offset, in bytes, in the underlying reader.
184 ///
185 /// The position used for seeking with `SeekFrom::Current(_)` is the
186 /// position the underlying reader would be at if the `BufReader` had no
187 /// internal buffer.
188 ///
189 /// Seeking always discards the internal buffer, even if the seek position
190 /// would otherwise fall within it. This guarantees that calling
191 /// `.into_inner()` immediately after a seek yields the underlying reader
192 /// at the same position.
193 ///
194 /// To seek without discarding the internal buffer, use
195 /// [`BufReader::seek_relative`](BufReader::seek_relative) or
196 /// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative).
197 ///
198 /// See [`AsyncSeek`](futures_io::AsyncSeek) for more details.
199 ///
200 /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
201 /// where `n` minus the internal buffer length overflows an `i64`, two
202 /// seeks will be performed instead of one. If the second seek returns
203 /// `Err`, the underlying reader will be left at the same position it would
204 /// have if you called `seek` with `SeekFrom::Current(0)`.
205 fn poll_seek(
206 mut self: Pin<&mut Self>,
207 cx: &mut Context<'_>,
208 pos: SeekFrom,
209 ) -> Poll<io::Result<u64>> {
210 let result: u64;
211 if let SeekFrom::Current(n) = pos {
212 let remainder = (self.cap - self.pos) as i64;
213 // it should be safe to assume that remainder fits within an i64 as the alternative
214 // means we managed to allocate 8 exbibytes and that's absurd.
215 // But it's not out of the realm of possibility for some weird underlying reader to
216 // support seeking by i64::MIN so we need to handle underflow when subtracting
217 // remainder.
218 if let Some(offset) = n.checked_sub(remainder) {
219 result =
220 ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(offset)))?;
221 } else {
222 // seek backwards by our remainder, and then by the offset
223 ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(-remainder)))?;
224 self.as_mut().discard_buffer();
225 result = ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(n)))?;
226 }
227 } else {
228 // Seeking with Start/End doesn't care about our buffer length.
229 result = ready!(self.as_mut().project().inner.poll_seek(cx, pos))?;
230 }
231 self.discard_buffer();
232 Poll::Ready(Ok(result))
233 }
234}
235
/// Future for the [`BufReader::seek_relative`](self::BufReader::seek_relative) method.
///
/// The first poll tries to satisfy the seek inside the buffered data; if that is
/// not possible, it (and any later poll) seeks the underlying reader instead.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct SeeKRelative<'a, R> {
    // The reader being seeked.
    inner: Pin<&'a mut BufReader<R>>,
    // Offset relative to the reader's logical position, as given to `seek_relative`.
    offset: i64,
    // `true` until the first poll. The first poll goes through `poll_seek_relative`
    // (buffer fast path); later polls only retry the underlying `poll_seek`.
    first: bool,
}
244
245impl<R> Future for SeeKRelative<'_, R>
246where
247 R: AsyncRead + AsyncSeek,
248{
249 type Output = io::Result<()>;
250
251 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
252 let offset = self.offset;
253 if self.first {
254 self.first = false;
255 self.inner.as_mut().poll_seek_relative(cx, offset)
256 } else {
257 self.inner
258 .as_mut()
259 .as_mut()
260 .poll_seek(cx, SeekFrom::Current(offset))
261 .map(|res| res.map(|_| ()))
262 }
263 }
264}