futures_util/io/
buf_reader.rs

1use super::DEFAULT_BUF_SIZE;
2use futures_core::future::Future;
3use futures_core::ready;
4use futures_core::task::{Context, Poll};
5use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom};
6use pin_project_lite::pin_project;
7use std::boxed::Box;
8use std::io::{self, Read};
9use std::pin::Pin;
10use std::vec;
11use std::{cmp, fmt};
12
13pin_project! {
14    /// The `BufReader` struct adds buffering to any reader.
15    ///
16    /// It can be excessively inefficient to work directly with a [`AsyncRead`]
17    /// instance. A `BufReader` performs large, infrequent reads on the underlying
18    /// [`AsyncRead`] and maintains an in-memory buffer of the results.
19    ///
20    /// `BufReader` can improve the speed of programs that make *small* and
21    /// *repeated* read calls to the same file or network socket. It does not
22    /// help when reading very large amounts at once, or reading just one or a few
23    /// times. It also provides no advantage when reading from a source that is
24    /// already in memory, like a `Vec<u8>`.
25    ///
26    /// When the `BufReader` is dropped, the contents of its buffer will be
27    /// discarded. Creating multiple instances of a `BufReader` on the same
28    /// stream can cause data loss.
29    ///
30    /// [`AsyncRead`]: futures_io::AsyncRead
31    ///
32    // TODO: Examples
33    pub struct BufReader<R> {
34        #[pin]
35        inner: R,
36        buffer: Box<[u8]>,
37        pos: usize,
38        cap: usize,
39    }
40}
41
42impl<R: AsyncRead> BufReader<R> {
43    /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
44    /// but may change in the future.
45    pub fn new(inner: R) -> Self {
46        Self::with_capacity(DEFAULT_BUF_SIZE, inner)
47    }
48
49    /// Creates a new `BufReader` with the specified buffer capacity.
50    pub fn with_capacity(capacity: usize, inner: R) -> Self {
51        // TODO: consider using Box<[u8]>::new_uninit_slice once it stabilized
52        let buffer = vec![0; capacity];
53        Self { inner, buffer: buffer.into_boxed_slice(), pos: 0, cap: 0 }
54    }
55}
56
57impl<R> BufReader<R> {
58    delegate_access_inner!(inner, R, ());
59
60    /// Returns a reference to the internally buffered data.
61    ///
62    /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
63    pub fn buffer(&self) -> &[u8] {
64        &self.buffer[self.pos..self.cap]
65    }
66
67    /// Invalidates all data in the internal buffer.
68    #[inline]
69    fn discard_buffer(self: Pin<&mut Self>) {
70        let this = self.project();
71        *this.pos = 0;
72        *this.cap = 0;
73    }
74}
75
76impl<R: AsyncRead + AsyncSeek> BufReader<R> {
77    /// Seeks relative to the current position. If the new position lies within the buffer,
78    /// the buffer will not be flushed, allowing for more efficient seeks.
79    /// This method does not return the location of the underlying reader, so the caller
80    /// must track this information themselves if it is required.
81    pub fn seek_relative(self: Pin<&mut Self>, offset: i64) -> SeeKRelative<'_, R> {
82        SeeKRelative { inner: self, offset, first: true }
83    }
84
85    /// Attempts to seek relative to the current position. If the new position lies within the buffer,
86    /// the buffer will not be flushed, allowing for more efficient seeks.
87    /// This method does not return the location of the underlying reader, so the caller
88    /// must track this information themselves if it is required.
89    pub fn poll_seek_relative(
90        self: Pin<&mut Self>,
91        cx: &mut Context<'_>,
92        offset: i64,
93    ) -> Poll<io::Result<()>> {
94        let pos = self.pos as u64;
95        if offset < 0 {
96            if let Some(new_pos) = pos.checked_sub((-offset) as u64) {
97                *self.project().pos = new_pos as usize;
98                return Poll::Ready(Ok(()));
99            }
100        } else if let Some(new_pos) = pos.checked_add(offset as u64) {
101            if new_pos <= self.cap as u64 {
102                *self.project().pos = new_pos as usize;
103                return Poll::Ready(Ok(()));
104            }
105        }
106        self.poll_seek(cx, SeekFrom::Current(offset)).map(|res| res.map(|_| ()))
107    }
108}
109
110impl<R: AsyncRead> AsyncRead for BufReader<R> {
111    fn poll_read(
112        mut self: Pin<&mut Self>,
113        cx: &mut Context<'_>,
114        buf: &mut [u8],
115    ) -> Poll<io::Result<usize>> {
116        // If we don't have any buffered data and we're doing a massive read
117        // (larger than our internal buffer), bypass our internal buffer
118        // entirely.
119        if self.pos == self.cap && buf.len() >= self.buffer.len() {
120            let res = ready!(self.as_mut().project().inner.poll_read(cx, buf));
121            self.discard_buffer();
122            return Poll::Ready(res);
123        }
124        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
125        let nread = rem.read(buf)?;
126        self.consume(nread);
127        Poll::Ready(Ok(nread))
128    }
129
130    fn poll_read_vectored(
131        mut self: Pin<&mut Self>,
132        cx: &mut Context<'_>,
133        bufs: &mut [IoSliceMut<'_>],
134    ) -> Poll<io::Result<usize>> {
135        let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
136        if self.pos == self.cap && total_len >= self.buffer.len() {
137            let res = ready!(self.as_mut().project().inner.poll_read_vectored(cx, bufs));
138            self.discard_buffer();
139            return Poll::Ready(res);
140        }
141        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
142        let nread = rem.read_vectored(bufs)?;
143        self.consume(nread);
144        Poll::Ready(Ok(nread))
145    }
146}
147
148impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
149    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
150        let this = self.project();
151
152        // If we've reached the end of our internal buffer then we need to fetch
153        // some more data from the underlying reader.
154        // Branch using `>=` instead of the more correct `==`
155        // to tell the compiler that the pos..cap slice is always valid.
156        if *this.pos >= *this.cap {
157            debug_assert!(*this.pos == *this.cap);
158            *this.cap = ready!(this.inner.poll_read(cx, this.buffer))?;
159            *this.pos = 0;
160        }
161        Poll::Ready(Ok(&this.buffer[*this.pos..*this.cap]))
162    }
163
164    fn consume(self: Pin<&mut Self>, amt: usize) {
165        *self.project().pos = cmp::min(self.pos + amt, self.cap);
166    }
167}
168
169impl<R: AsyncWrite> AsyncWrite for BufReader<R> {
170    delegate_async_write!(inner);
171}
172
173impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
174    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
175        f.debug_struct("BufReader")
176            .field("reader", &self.inner)
177            .field("buffer", &format_args!("{}/{}", self.cap - self.pos, self.buffer.len()))
178            .finish()
179    }
180}
181
182impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
183    /// Seek to an offset, in bytes, in the underlying reader.
184    ///
185    /// The position used for seeking with `SeekFrom::Current(_)` is the
186    /// position the underlying reader would be at if the `BufReader` had no
187    /// internal buffer.
188    ///
189    /// Seeking always discards the internal buffer, even if the seek position
190    /// would otherwise fall within it. This guarantees that calling
191    /// `.into_inner()` immediately after a seek yields the underlying reader
192    /// at the same position.
193    ///
194    /// To seek without discarding the internal buffer, use
195    /// [`BufReader::seek_relative`](BufReader::seek_relative) or
196    /// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative).
197    ///
198    /// See [`AsyncSeek`](futures_io::AsyncSeek) for more details.
199    ///
200    /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
201    /// where `n` minus the internal buffer length overflows an `i64`, two
202    /// seeks will be performed instead of one. If the second seek returns
203    /// `Err`, the underlying reader will be left at the same position it would
204    /// have if you called `seek` with `SeekFrom::Current(0)`.
205    fn poll_seek(
206        mut self: Pin<&mut Self>,
207        cx: &mut Context<'_>,
208        pos: SeekFrom,
209    ) -> Poll<io::Result<u64>> {
210        let result: u64;
211        if let SeekFrom::Current(n) = pos {
212            let remainder = (self.cap - self.pos) as i64;
213            // it should be safe to assume that remainder fits within an i64 as the alternative
214            // means we managed to allocate 8 exbibytes and that's absurd.
215            // But it's not out of the realm of possibility for some weird underlying reader to
216            // support seeking by i64::MIN so we need to handle underflow when subtracting
217            // remainder.
218            if let Some(offset) = n.checked_sub(remainder) {
219                result =
220                    ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(offset)))?;
221            } else {
222                // seek backwards by our remainder, and then by the offset
223                ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(-remainder)))?;
224                self.as_mut().discard_buffer();
225                result = ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(n)))?;
226            }
227        } else {
228            // Seeking with Start/End doesn't care about our buffer length.
229            result = ready!(self.as_mut().project().inner.poll_seek(cx, pos))?;
230        }
231        self.discard_buffer();
232        Poll::Ready(Ok(result))
233    }
234}
235
236/// Future for the [`BufReader::seek_relative`](self::BufReader::seek_relative) method.
237#[derive(Debug)]
238#[must_use = "futures do nothing unless polled"]
239pub struct SeeKRelative<'a, R> {
240    inner: Pin<&'a mut BufReader<R>>,
241    offset: i64,
242    first: bool,
243}
244
245impl<R> Future for SeeKRelative<'_, R>
246where
247    R: AsyncRead + AsyncSeek,
248{
249    type Output = io::Result<()>;
250
251    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
252        let offset = self.offset;
253        if self.first {
254            self.first = false;
255            self.inner.as_mut().poll_seek_relative(cx, offset)
256        } else {
257            self.inner
258                .as_mut()
259                .as_mut()
260                .poll_seek(cx, SeekFrom::Current(offset))
261                .map(|res| res.map(|_| ()))
262        }
263    }
264}