hyper/proto/h1/
io.rs

1use std::cmp;
2use std::fmt;
3use std::io::{self, IoSlice};
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use crate::rt::{Read, ReadBuf, Write};
8use bytes::{Buf, BufMut, Bytes, BytesMut};
9use futures_util::ready;
10
11use super::{Http1Transaction, ParseContext, ParsedMessage};
12use crate::common::buf::BufList;
13
/// The initial buffer size allocated before trying to read from IO.
///
/// Also the starting (and smallest) read size of the adaptive read
/// strategy, and the initial capacity of the write headers buffer.
pub(crate) const INIT_BUFFER_SIZE: usize = 8192;

/// The minimum value that can be set to max buffer size.
pub(crate) const MINIMUM_MAX_BUFFER_SIZE: usize = INIT_BUFFER_SIZE;

/// The default maximum read buffer size. If the buffer gets this big and
/// a message is still not complete, a `TooLarge` error is triggered.
///
/// The same value is the default limit on buffered, unflushed write bytes.
// Note: if this changes, update server::conn::Http::max_buf_size docs.
pub(crate) const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;

/// The maximum number of distinct `Buf`s to hold in a list before requiring
/// a flush. Only affects when the buffer strategy is to queue buffers.
///
/// Note that a flush can happen before reaching the maximum. This simply
/// forces a flush if the queue gets this big.
const MAX_BUF_LIST_BUFFERS: usize = 16;
31
/// An IO object paired with a read buffer and a write buffer.
pub(crate) struct Buffered<T, B> {
    /// When set, `poll_flush` returns early while unread bytes remain in
    /// `read_buf`, and `can_buffer` always reports `true`.
    flush_pipeline: bool,
    /// The underlying transport.
    io: T,
    /// Length of `read_buf` recorded after the latest incomplete header
    /// parse; passed to `parse_headers` on the next attempt.
    partial_len: Option<usize>,
    /// Whether the most recent `poll_read_from_io` returned `Pending`.
    read_blocked: bool,
    /// Bytes read from `io` that have not been consumed yet.
    read_buf: BytesMut,
    /// Decides how much space to reserve before each read.
    read_buf_strategy: ReadStrategy,
    /// Bytes waiting to be written to `io`.
    write_buf: WriteBuf<B>,
}
41
42impl<T, B> fmt::Debug for Buffered<T, B>
43where
44    B: Buf,
45{
46    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47        f.debug_struct("Buffered")
48            .field("read_buf", &self.read_buf)
49            .field("write_buf", &self.write_buf)
50            .finish()
51    }
52}
53
54impl<T, B> Buffered<T, B>
55where
56    T: Read + Write + Unpin,
57    B: Buf,
58{
59    pub(crate) fn new(io: T) -> Buffered<T, B> {
60        let strategy = if io.is_write_vectored() {
61            WriteStrategy::Queue
62        } else {
63            WriteStrategy::Flatten
64        };
65        let write_buf = WriteBuf::new(strategy);
66        Buffered {
67            flush_pipeline: false,
68            io,
69            partial_len: None,
70            read_blocked: false,
71            read_buf: BytesMut::with_capacity(0),
72            read_buf_strategy: ReadStrategy::default(),
73            write_buf,
74        }
75    }
76
77    #[cfg(feature = "server")]
78    pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) {
79        debug_assert!(!self.write_buf.has_remaining());
80        self.flush_pipeline = enabled;
81        if enabled {
82            self.set_write_strategy_flatten();
83        }
84    }
85
86    pub(crate) fn set_max_buf_size(&mut self, max: usize) {
87        assert!(
88            max >= MINIMUM_MAX_BUFFER_SIZE,
89            "The max_buf_size cannot be smaller than {}.",
90            MINIMUM_MAX_BUFFER_SIZE,
91        );
92        self.read_buf_strategy = ReadStrategy::with_max(max);
93        self.write_buf.max_buf_size = max;
94    }
95
96    #[cfg(feature = "client")]
97    pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) {
98        self.read_buf_strategy = ReadStrategy::Exact(sz);
99    }
100
101    pub(crate) fn set_write_strategy_flatten(&mut self) {
102        // this should always be called only at construction time,
103        // so this assert is here to catch myself
104        debug_assert!(self.write_buf.queue.bufs_cnt() == 0);
105        self.write_buf.set_strategy(WriteStrategy::Flatten);
106    }
107
108    pub(crate) fn set_write_strategy_queue(&mut self) {
109        // this should always be called only at construction time,
110        // so this assert is here to catch myself
111        debug_assert!(self.write_buf.queue.bufs_cnt() == 0);
112        self.write_buf.set_strategy(WriteStrategy::Queue);
113    }
114
115    pub(crate) fn read_buf(&self) -> &[u8] {
116        self.read_buf.as_ref()
117    }
118
    /// Mutable access to the read buffer; only compiled for tests with the
    /// `nightly` feature enabled.
    #[cfg(test)]
    #[cfg(feature = "nightly")]
    pub(super) fn read_buf_mut(&mut self) -> &mut BytesMut {
        &mut self.read_buf
    }
124
125    /// Return the "allocated" available space, not the potential space
126    /// that could be allocated in the future.
127    fn read_buf_remaining_mut(&self) -> usize {
128        self.read_buf.capacity() - self.read_buf.len()
129    }
130
131    /// Return whether we can append to the headers buffer.
132    ///
133    /// Reasons we can't:
134    /// - The write buf is in queue mode, and some of the past body is still
135    ///   needing to be flushed.
136    pub(crate) fn can_headers_buf(&self) -> bool {
137        !self.write_buf.queue.has_remaining()
138    }
139
140    pub(crate) fn headers_buf(&mut self) -> &mut Vec<u8> {
141        let buf = self.write_buf.headers_mut();
142        &mut buf.bytes
143    }
144
    /// Returns mutable access to the write buffer.
    pub(super) fn write_buf(&mut self) -> &mut WriteBuf<B> {
        &mut self.write_buf
    }
148
    /// Adds `buf` to the write buffer; see `WriteBuf::buffer` for how the
    /// active write strategy stores it.
    pub(crate) fn buffer<BB: Buf + Into<B>>(&mut self, buf: BB) {
        self.write_buf.buffer(buf)
    }
152
153    pub(crate) fn can_buffer(&self) -> bool {
154        self.flush_pipeline || self.write_buf.can_buffer()
155    }
156
157    pub(crate) fn consume_leading_lines(&mut self) {
158        if !self.read_buf.is_empty() {
159            let mut i = 0;
160            while i < self.read_buf.len() {
161                match self.read_buf[i] {
162                    b'\r' | b'\n' => i += 1,
163                    _ => break,
164                }
165            }
166            self.read_buf.advance(i);
167        }
168    }
169
170    pub(super) fn parse<S>(
171        &mut self,
172        cx: &mut Context<'_>,
173        parse_ctx: ParseContext<'_>,
174    ) -> Poll<crate::Result<ParsedMessage<S::Incoming>>>
175    where
176        S: Http1Transaction,
177    {
178        loop {
179            match super::role::parse_headers::<S>(
180                &mut self.read_buf,
181                self.partial_len,
182                ParseContext {
183                    cached_headers: parse_ctx.cached_headers,
184                    req_method: parse_ctx.req_method,
185                    h1_parser_config: parse_ctx.h1_parser_config.clone(),
186                    h1_max_headers: parse_ctx.h1_max_headers,
187                    preserve_header_case: parse_ctx.preserve_header_case,
188                    #[cfg(feature = "ffi")]
189                    preserve_header_order: parse_ctx.preserve_header_order,
190                    h09_responses: parse_ctx.h09_responses,
191                    #[cfg(feature = "ffi")]
192                    on_informational: parse_ctx.on_informational,
193                },
194            )? {
195                Some(msg) => {
196                    debug!("parsed {} headers", msg.head.headers.len());
197                    self.partial_len = None;
198                    return Poll::Ready(Ok(msg));
199                }
200                None => {
201                    let max = self.read_buf_strategy.max();
202                    let curr_len = self.read_buf.len();
203                    if curr_len >= max {
204                        debug!("max_buf_size ({}) reached, closing", max);
205                        return Poll::Ready(Err(crate::Error::new_too_large()));
206                    }
207                    if curr_len > 0 {
208                        trace!("partial headers; {} bytes so far", curr_len);
209                        self.partial_len = Some(curr_len);
210                    } else {
211                        // 1xx gobled some bytes
212                        self.partial_len = None;
213                    }
214                }
215            }
216            if ready!(self.poll_read_from_io(cx)).map_err(crate::Error::new_io)? == 0 {
217                trace!("parse eof");
218                return Poll::Ready(Err(crate::Error::new_incomplete()));
219            }
220        }
221    }
222
223    pub(crate) fn poll_read_from_io(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
224        self.read_blocked = false;
225        let next = self.read_buf_strategy.next();
226        if self.read_buf_remaining_mut() < next {
227            self.read_buf.reserve(next);
228        }
229
230        // SAFETY: ReadBuf and poll_read promise not to set any uninitialized
231        // bytes onto `dst`.
232        let dst = unsafe { self.read_buf.chunk_mut().as_uninit_slice_mut() };
233        let mut buf = ReadBuf::uninit(dst);
234        match Pin::new(&mut self.io).poll_read(cx, buf.unfilled()) {
235            Poll::Ready(Ok(_)) => {
236                let n = buf.filled().len();
237                trace!("received {} bytes", n);
238                unsafe {
239                    // Safety: we just read that many bytes into the
240                    // uninitialized part of the buffer, so this is okay.
241                    // @tokio pls give me back `poll_read_buf` thanks
242                    self.read_buf.advance_mut(n);
243                }
244                self.read_buf_strategy.record(n);
245                Poll::Ready(Ok(n))
246            }
247            Poll::Pending => {
248                self.read_blocked = true;
249                Poll::Pending
250            }
251            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
252        }
253    }
254
255    pub(crate) fn into_inner(self) -> (T, Bytes) {
256        (self.io, self.read_buf.freeze())
257    }
258
    /// Returns mutable access to the underlying IO object.
    pub(crate) fn io_mut(&mut self) -> &mut T {
        &mut self.io
    }
262
    /// Returns whether the most recent `poll_read_from_io` call returned
    /// `Pending`.
    pub(crate) fn is_read_blocked(&self) -> bool {
        self.read_blocked
    }
266
267    pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
268        if self.flush_pipeline && !self.read_buf.is_empty() {
269            Poll::Ready(Ok(()))
270        } else if self.write_buf.remaining() == 0 {
271            Pin::new(&mut self.io).poll_flush(cx)
272        } else {
273            if let WriteStrategy::Flatten = self.write_buf.strategy {
274                return self.poll_flush_flattened(cx);
275            }
276
277            const MAX_WRITEV_BUFS: usize = 64;
278            loop {
279                let n = {
280                    let mut iovs = [IoSlice::new(&[]); MAX_WRITEV_BUFS];
281                    let len = self.write_buf.chunks_vectored(&mut iovs);
282                    ready!(Pin::new(&mut self.io).poll_write_vectored(cx, &iovs[..len]))?
283                };
284                // TODO(eliza): we have to do this manually because
285                // `poll_write_buf` doesn't exist in Tokio 0.3 yet...when
286                // `poll_write_buf` comes back, the manual advance will need to leave!
287                self.write_buf.advance(n);
288                debug!("flushed {} bytes", n);
289                if self.write_buf.remaining() == 0 {
290                    break;
291                } else if n == 0 {
292                    trace!(
293                        "write returned zero, but {} bytes remaining",
294                        self.write_buf.remaining()
295                    );
296                    return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
297                }
298            }
299            Pin::new(&mut self.io).poll_flush(cx)
300        }
301    }
302
303    /// Specialized version of `flush` when strategy is Flatten.
304    ///
305    /// Since all buffered bytes are flattened into the single headers buffer,
306    /// that skips some bookkeeping around using multiple buffers.
307    fn poll_flush_flattened(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
308        loop {
309            let n = ready!(Pin::new(&mut self.io).poll_write(cx, self.write_buf.headers.chunk()))?;
310            debug!("flushed {} bytes", n);
311            self.write_buf.headers.advance(n);
312            if self.write_buf.headers.remaining() == 0 {
313                self.write_buf.headers.reset();
314                break;
315            } else if n == 0 {
316                trace!(
317                    "write returned zero, but {} bytes remaining",
318                    self.write_buf.remaining()
319                );
320                return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
321            }
322        }
323        Pin::new(&mut self.io).poll_flush(cx)
324    }
325
    /// Test-only helper: wraps `poll_flush` in a future so tests can
    /// `.await` a flush.
    #[cfg(test)]
    fn flush(&mut self) -> impl std::future::Future<Output = io::Result<()>> + '_ {
        futures_util::future::poll_fn(move |cx| self.poll_flush(cx))
    }
330}
331
// `Buffered` is `Unpin` whenever the IO type is, regardless of `B`:
// the `B` is a `Buf`, we never project a pin to it
impl<T: Unpin, B> Unpin for Buffered<T, B> {}
334
// TODO: This trait is old... at least rename to PollBytes or something...
/// A source that can be polled for a chunk of at most `len` bytes.
pub(crate) trait MemRead {
    /// Polls for up to `len` bytes, returned as an owned `Bytes`.
    fn read_mem(&mut self, cx: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>>;
}
339
340impl<T, B> MemRead for Buffered<T, B>
341where
342    T: Read + Write + Unpin,
343    B: Buf,
344{
345    fn read_mem(&mut self, cx: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
346        if !self.read_buf.is_empty() {
347            let n = std::cmp::min(len, self.read_buf.len());
348            Poll::Ready(Ok(self.read_buf.split_to(n).freeze()))
349        } else {
350            let n = ready!(self.poll_read_from_io(cx))?;
351            Poll::Ready(Ok(self.read_buf.split_to(::std::cmp::min(len, n)).freeze()))
352        }
353    }
354}
355
/// Policy deciding how much buffer space to reserve before each read.
#[derive(Clone, Copy, Debug)]
enum ReadStrategy {
    /// Grows and shrinks the read size based on how many bytes recent
    /// reads returned (see `record`).
    Adaptive {
        /// Set after one read noticeably smaller than `next`; a second
        /// such read while this is set shrinks `next`.
        decrease_now: bool,
        /// Number of bytes to make room for on the next read.
        next: usize,
        /// Upper bound for `next`; also reported by `max()`.
        max: usize,
    },
    /// Always reports the given size for both `next()` and `max()`;
    /// `record` does nothing.
    #[cfg(feature = "client")]
    Exact(usize),
}
366
367impl ReadStrategy {
368    fn with_max(max: usize) -> ReadStrategy {
369        ReadStrategy::Adaptive {
370            decrease_now: false,
371            next: INIT_BUFFER_SIZE,
372            max,
373        }
374    }
375
376    fn next(&self) -> usize {
377        match *self {
378            ReadStrategy::Adaptive { next, .. } => next,
379            #[cfg(feature = "client")]
380            ReadStrategy::Exact(exact) => exact,
381        }
382    }
383
384    fn max(&self) -> usize {
385        match *self {
386            ReadStrategy::Adaptive { max, .. } => max,
387            #[cfg(feature = "client")]
388            ReadStrategy::Exact(exact) => exact,
389        }
390    }
391
392    fn record(&mut self, bytes_read: usize) {
393        match *self {
394            ReadStrategy::Adaptive {
395                ref mut decrease_now,
396                ref mut next,
397                max,
398                ..
399            } => {
400                if bytes_read >= *next {
401                    *next = cmp::min(incr_power_of_two(*next), max);
402                    *decrease_now = false;
403                } else {
404                    let decr_to = prev_power_of_two(*next);
405                    if bytes_read < decr_to {
406                        if *decrease_now {
407                            *next = cmp::max(decr_to, INIT_BUFFER_SIZE);
408                            *decrease_now = false;
409                        } else {
410                            // Decreasing is a two "record" process.
411                            *decrease_now = true;
412                        }
413                    } else {
414                        // A read within the current range should cancel
415                        // a potential decrease, since we just saw proof
416                        // that we still need this size.
417                        *decrease_now = false;
418                    }
419                }
420            }
421            #[cfg(feature = "client")]
422            ReadStrategy::Exact(_) => (),
423        }
424    }
425}
426
/// Doubles `n`, clamping to `usize::MAX` instead of overflowing.
fn incr_power_of_two(n: usize) -> usize {
    n.checked_mul(2).unwrap_or(usize::MAX)
}
430
/// Returns half of the largest power of two that is not greater than `n`.
///
/// `n` must be at least 4 (checked in debug builds only).
fn prev_power_of_two(n: usize) -> usize {
    // For n < 4 the shift amount would reach the full bit width of `usize`
    // (e.g. `usize::MAX >> 64`), which overflows the shift.
    debug_assert!(n >= 4);
    let shift = n.leading_zeros() + 2;
    let below = usize::MAX >> shift;
    below + 1
}
437
438impl Default for ReadStrategy {
439    fn default() -> ReadStrategy {
440        ReadStrategy::with_max(DEFAULT_MAX_BUFFER_SIZE)
441    }
442}
443
/// A byte container paired with a read position.
#[derive(Clone)]
pub(crate) struct Cursor<T> {
    /// The underlying bytes.
    bytes: T,
    /// Offset of the first byte that has not been consumed yet.
    pos: usize,
}

impl<T: AsRef<[u8]>> Cursor<T> {
    /// Creates a cursor positioned at the start of `bytes`.
    #[inline]
    pub(crate) fn new(bytes: T) -> Cursor<T> {
        Cursor { pos: 0, bytes }
    }
}

impl Cursor<Vec<u8>> {
    /// Makes room for `additional` bytes by discarding the already-consumed
    /// prefix, but only when the vector's spare capacity is too small.
    ///
    /// Nothing happens if no bytes have been consumed or if `additional`
    /// bytes already fit.
    fn maybe_unshift(&mut self, additional: usize) {
        let spare = self.bytes.capacity() - self.bytes.len();
        if self.pos == 0 || spare >= additional {
            return;
        }

        self.bytes.drain(..self.pos);
        self.pos = 0;
    }

    /// Empties the vector and moves the position back to the start.
    fn reset(&mut self) {
        self.bytes.clear();
        self.pos = 0;
    }
}

impl<T: AsRef<[u8]>> fmt::Debug for Cursor<T> {
    /// Prints the position and the total length, not the bytes themselves.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let len = self.bytes.as_ref().len();
        f.debug_struct("Cursor")
            .field("pos", &self.pos)
            .field("len", &len)
            .finish()
    }
}
490
491impl<T: AsRef<[u8]>> Buf for Cursor<T> {
492    #[inline]
493    fn remaining(&self) -> usize {
494        self.bytes.as_ref().len() - self.pos
495    }
496
497    #[inline]
498    fn chunk(&self) -> &[u8] {
499        &self.bytes.as_ref()[self.pos..]
500    }
501
502    #[inline]
503    fn advance(&mut self, cnt: usize) {
504        debug_assert!(self.pos + cnt <= self.bytes.as_ref().len());
505        self.pos += cnt;
506    }
507}
508
// an internal buffer to collect writes before flushes
pub(super) struct WriteBuf<B> {
    /// Re-usable buffer that holds message headers
    ///
    /// With the `Flatten` strategy, `buffer()` appends body bytes here too.
    headers: Cursor<Vec<u8>>,
    /// `can_buffer()` reports `false` once this many bytes are pending.
    max_buf_size: usize,
    /// Deque of user buffers if strategy is Queue
    queue: BufList<B>,
    /// How `buffer()` stores incoming buffers.
    strategy: WriteStrategy,
}
518
519impl<B: Buf> WriteBuf<B> {
520    fn new(strategy: WriteStrategy) -> WriteBuf<B> {
521        WriteBuf {
522            headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)),
523            max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
524            queue: BufList::new(),
525            strategy,
526        }
527    }
528}
529
530impl<B> WriteBuf<B>
531where
532    B: Buf,
533{
534    fn set_strategy(&mut self, strategy: WriteStrategy) {
535        self.strategy = strategy;
536    }
537
538    pub(super) fn buffer<BB: Buf + Into<B>>(&mut self, mut buf: BB) {
539        debug_assert!(buf.has_remaining());
540        match self.strategy {
541            WriteStrategy::Flatten => {
542                let head = self.headers_mut();
543
544                head.maybe_unshift(buf.remaining());
545                trace!(
546                    self.len = head.remaining(),
547                    buf.len = buf.remaining(),
548                    "buffer.flatten"
549                );
550                //perf: This is a little faster than <Vec as BufMut>>::put,
551                //but accomplishes the same result.
552                loop {
553                    let adv = {
554                        let slice = buf.chunk();
555                        if slice.is_empty() {
556                            return;
557                        }
558                        head.bytes.extend_from_slice(slice);
559                        slice.len()
560                    };
561                    buf.advance(adv);
562                }
563            }
564            WriteStrategy::Queue => {
565                trace!(
566                    self.len = self.remaining(),
567                    buf.len = buf.remaining(),
568                    "buffer.queue"
569                );
570                self.queue.push(buf.into());
571            }
572        }
573    }
574
575    fn can_buffer(&self) -> bool {
576        match self.strategy {
577            WriteStrategy::Flatten => self.remaining() < self.max_buf_size,
578            WriteStrategy::Queue => {
579                self.queue.bufs_cnt() < MAX_BUF_LIST_BUFFERS && self.remaining() < self.max_buf_size
580            }
581        }
582    }
583
584    fn headers_mut(&mut self) -> &mut Cursor<Vec<u8>> {
585        debug_assert!(!self.queue.has_remaining());
586        &mut self.headers
587    }
588}
589
590impl<B: Buf> fmt::Debug for WriteBuf<B> {
591    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
592        f.debug_struct("WriteBuf")
593            .field("remaining", &self.remaining())
594            .field("strategy", &self.strategy)
595            .finish()
596    }
597}
598
599impl<B: Buf> Buf for WriteBuf<B> {
600    #[inline]
601    fn remaining(&self) -> usize {
602        self.headers.remaining() + self.queue.remaining()
603    }
604
605    #[inline]
606    fn chunk(&self) -> &[u8] {
607        let headers = self.headers.chunk();
608        if !headers.is_empty() {
609            headers
610        } else {
611            self.queue.chunk()
612        }
613    }
614
615    #[inline]
616    fn advance(&mut self, cnt: usize) {
617        let hrem = self.headers.remaining();
618
619        match hrem.cmp(&cnt) {
620            cmp::Ordering::Equal => self.headers.reset(),
621            cmp::Ordering::Greater => self.headers.advance(cnt),
622            cmp::Ordering::Less => {
623                let qcnt = cnt - hrem;
624                self.headers.reset();
625                self.queue.advance(qcnt);
626            }
627        }
628    }
629
630    #[inline]
631    fn chunks_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
632        let n = self.headers.chunks_vectored(dst);
633        self.queue.chunks_vectored(&mut dst[n..]) + n
634    }
635}
636
/// How `WriteBuf::buffer` stores buffers until the next flush.
#[derive(Debug)]
enum WriteStrategy {
    /// Copy every buffer's bytes into the single headers buffer.
    Flatten,
    /// Keep each buffer as-is in a queue; flushed with vectored writes.
    Queue,
}
642
643#[cfg(test)]
644mod tests {
645    use super::*;
646    use crate::common::io::Compat;
647    use std::time::Duration;
648
649    use tokio_test::io::Builder as Mock;
650
651    // #[cfg(feature = "nightly")]
652    // use test::Bencher;
653
654    /*
655    impl<T: Read> MemRead for AsyncIo<T> {
656        fn read_mem(&mut self, len: usize) -> Poll<Bytes, io::Error> {
657            let mut v = vec![0; len];
658            let n = try_nb!(self.read(v.as_mut_slice()));
659            Ok(Async::Ready(BytesMut::from(&v[..n]).freeze()))
660        }
661    }
662    */
663
    // Currently a no-op: the whole body is commented out and the test is
    // marked `#[ignore]`.
    #[tokio::test]
    #[ignore]
    async fn iobuf_write_empty_slice() {
        // TODO(eliza): can i have writev back pls T_T
        // // First, let's just check that the Mock would normally return an
        // // error on an unexpected write, even if the buffer is empty...
        // let mut mock = Mock::new().build();
        // futures_util::future::poll_fn(|cx| {
        //     Pin::new(&mut mock).poll_write_buf(cx, &mut Cursor::new(&[]))
        // })
        // .await
        // .expect_err("should be a broken pipe");

        // // underlying io will return the logic error upon write,
        // // so we are testing that the io_buf does not trigger a write
        // // when there is nothing to flush
        // let mock = Mock::new().build();
        // let mut io_buf = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
        // io_buf.flush().await.expect("should short-circuit flush");
    }
684
    // `parse` must keep reading from the IO until it would block, leaving
    // every byte received so far in the read buffer.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn parse_reads_until_blocked() {
        use crate::proto::h1::ClientTransaction;

        let _ = pretty_env_logger::try_init();
        let mock = Mock::new()
            // Split over multiple reads will read all of it
            .read(b"HTTP/1.1 200 OK\r\n")
            .read(b"Server: hyper\r\n")
            // missing last line ending
            .wait(Duration::from_secs(1))
            .build();

        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(Compat::new(mock));

        // We expect a `parse` to be not ready, and so can't await it directly.
        // Rather, this `poll_fn` will wrap the `Poll` result.
        futures_util::future::poll_fn(|cx| {
            let parse_ctx = ParseContext {
                cached_headers: &mut None,
                req_method: &mut None,
                h1_parser_config: Default::default(),
                h1_max_headers: None,
                preserve_header_case: false,
                #[cfg(feature = "ffi")]
                preserve_header_order: false,
                h09_responses: false,
                #[cfg(feature = "ffi")]
                on_informational: &mut None,
            };
            assert!(buffered
                .parse::<ClientTransaction>(cx, parse_ctx)
                .is_pending());
            Poll::Ready(())
        })
        .await;

        // Both mock reads ended up in the buffer, in order.
        assert_eq!(
            buffered.read_buf,
            b"HTTP/1.1 200 OK\r\nServer: hyper\r\n"[..]
        );
    }
728
    // The adaptive strategy doubles `next` after each full read and never
    // exceeds its maximum.
    #[test]
    fn read_strategy_adaptive_increments() {
        let mut strategy = ReadStrategy::default();
        assert_eq!(strategy.next(), 8192);

        // Grows if record == next
        strategy.record(8192);
        assert_eq!(strategy.next(), 16384);

        strategy.record(16384);
        assert_eq!(strategy.next(), 32768);

        // Enormous records still increment at same rate
        strategy.record(usize::MAX);
        assert_eq!(strategy.next(), 65536);

        // Keep growing until the cap is reached.
        let max = strategy.max();
        while strategy.next() < max {
            strategy.record(max);
        }

        assert_eq!(strategy.next(), max, "never goes over max");
        strategy.record(max + 1);
        assert_eq!(strategy.next(), max, "never goes over max");
    }
754
    // Shrinking takes two consecutive small reads, is cancelled by an
    // in-range read, and never goes below the initial size.
    #[test]
    fn read_strategy_adaptive_decrements() {
        let mut strategy = ReadStrategy::default();
        strategy.record(8192);
        assert_eq!(strategy.next(), 16384);

        strategy.record(1);
        assert_eq!(
            strategy.next(),
            16384,
            "first smaller record doesn't decrement yet"
        );
        // An in-range read clears the pending decrease.
        strategy.record(8192);
        assert_eq!(strategy.next(), 16384, "record was with range");

        strategy.record(1);
        assert_eq!(
            strategy.next(),
            16384,
            "in-range record should make this the 'first' again"
        );

        strategy.record(1);
        assert_eq!(strategy.next(), 8192, "second smaller record decrements");

        strategy.record(1);
        assert_eq!(strategy.next(), 8192, "first doesn't decrement");
        strategy.record(1);
        assert_eq!(strategy.next(), 8192, "doesn't decrement under minimum");
    }
785
    // Reads smaller than `next` but not below the previous power of two
    // leave the size unchanged, however often they occur.
    #[test]
    fn read_strategy_adaptive_stays_the_same() {
        let mut strategy = ReadStrategy::default();
        strategy.record(8192);
        assert_eq!(strategy.next(), 16384);

        strategy.record(8193);
        assert_eq!(
            strategy.next(),
            16384,
            "first smaller record doesn't decrement yet"
        );

        strategy.record(8193);
        assert_eq!(
            strategy.next(),
            16384,
            "with current step does not decrement"
        );
    }
806
807    #[test]
808    fn read_strategy_adaptive_max_fuzz() {
809        fn fuzz(max: usize) {
810            let mut strategy = ReadStrategy::with_max(max);
811            while strategy.next() < max {
812                strategy.record(usize::MAX);
813            }
814            let mut next = strategy.next();
815            while next > 8192 {
816                strategy.record(1);
817                strategy.record(1);
818                next = strategy.next();
819                assert!(
820                    next.is_power_of_two(),
821                    "decrement should be powers of two: {} (max = {})",
822                    next,
823                    max,
824                );
825            }
826        }
827
828        let mut max = 8192;
829        while max < std::usize::MAX {
830            fuzz(max);
831            max = (max / 2).saturating_mul(3);
832        }
833        fuzz(usize::MAX);
834    }
835
    // Buffering an empty buf must trip the `debug_assert!` in
    // `WriteBuf::buffer`.
    #[test]
    #[should_panic]
    #[cfg(debug_assertions)] // needs to trigger a debug_assert
    fn write_buf_requires_non_empty_bufs() {
        let mock = Mock::new().build();
        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(Compat::new(mock));

        buffered.buffer(Cursor::new(Vec::new()));
    }
845
846    /*
847    TODO: needs tokio_test::io to allow configure write_buf calls
848    #[test]
849    fn write_buf_queue() {
850        let _ = pretty_env_logger::try_init();
851
852        let mock = AsyncIo::new_buf(vec![], 1024);
853        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
854
855
856        buffered.headers_buf().extend(b"hello ");
857        buffered.buffer(Cursor::new(b"world, ".to_vec()));
858        buffered.buffer(Cursor::new(b"it's ".to_vec()));
859        buffered.buffer(Cursor::new(b"hyper!".to_vec()));
860        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 3);
861        buffered.flush().unwrap();
862
863        assert_eq!(buffered.io, b"hello world, it's hyper!");
864        assert_eq!(buffered.io.num_writes(), 1);
865        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
866    }
867    */
868
    // With the Flatten strategy, headers and all buffered bodies go out as
    // one contiguous write and nothing is ever queued.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn write_buf_flatten() {
        let _ = pretty_env_logger::try_init();

        // The mock expects a single write containing everything.
        let mock = Mock::new().write(b"hello world, it's hyper!").build();

        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(Compat::new(mock));
        buffered.write_buf.set_strategy(WriteStrategy::Flatten);

        buffered.headers_buf().extend(b"hello ");
        buffered.buffer(Cursor::new(b"world, ".to_vec()));
        buffered.buffer(Cursor::new(b"it's ".to_vec()));
        buffered.buffer(Cursor::new(b"hyper!".to_vec()));
        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);

        buffered.flush().await.expect("flush");
    }
887
    // After a partial flush in Flatten mode, new data is appended while room
    // remains; the consumed prefix is dropped only when more room is needed.
    #[test]
    fn write_buf_flatten_partially_flushed() {
        let _ = pretty_env_logger::try_init();

        // Shorthand for building an owned buffer from a string.
        let b = |s: &str| Cursor::new(s.as_bytes().to_vec());

        let mut write_buf = WriteBuf::<Cursor<Vec<u8>>>::new(WriteStrategy::Flatten);

        write_buf.buffer(b("hello "));
        write_buf.buffer(b("world, "));

        assert_eq!(write_buf.chunk(), b"hello world, ");

        // advance most of the way, but not all
        write_buf.advance(11);

        assert_eq!(write_buf.chunk(), b", ");
        assert_eq!(write_buf.headers.pos, 11);
        assert_eq!(write_buf.headers.bytes.capacity(), INIT_BUFFER_SIZE);

        // there's still room in the headers buffer, so just push on the end
        write_buf.buffer(b("it's hyper!"));

        assert_eq!(write_buf.chunk(), b", it's hyper!");
        assert_eq!(write_buf.headers.pos, 11);

        let rem1 = write_buf.remaining();
        let cap = write_buf.headers.bytes.capacity();

        // but when this would go over capacity, don't copy the old bytes
        write_buf.buffer(Cursor::new(vec![b'X'; cap]));
        assert_eq!(write_buf.remaining(), cap + rem1);
        assert_eq!(write_buf.headers.pos, 0);
    }
922
    // With the Queue strategy forced, each buffer is kept separate until the
    // flush and the queue is empty afterwards.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn write_buf_queue_disable_auto() {
        let _ = pretty_env_logger::try_init();

        // The mock expects four separate writes, one per buffer.
        let mock = Mock::new()
            .write(b"hello ")
            .write(b"world, ")
            .write(b"it's ")
            .write(b"hyper!")
            .build();

        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(Compat::new(mock));
        buffered.write_buf.set_strategy(WriteStrategy::Queue);

        // we have 4 buffers, and vec IO disabled, but explicitly said
        // don't try to auto detect (via setting strategy above)

        buffered.headers_buf().extend(b"hello ");
        buffered.buffer(Cursor::new(b"world, ".to_vec()));
        buffered.buffer(Cursor::new(b"it's ".to_vec()));
        buffered.buffer(Cursor::new(b"hyper!".to_vec()));
        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 3);

        buffered.flush().await.expect("flush");

        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
    }
951
952    // #[cfg(feature = "nightly")]
953    // #[bench]
954    // fn bench_write_buf_flatten_buffer_chunk(b: &mut Bencher) {
955    //     let s = "Hello, World!";
956    //     b.bytes = s.len() as u64;
957
958    //     let mut write_buf = WriteBuf::<bytes::Bytes>::new();
959    //     write_buf.set_strategy(WriteStrategy::Flatten);
960    //     b.iter(|| {
961    //         let chunk = bytes::Bytes::from(s);
962    //         write_buf.buffer(chunk);
963    //         ::test::black_box(&write_buf);
964    //         write_buf.headers.bytes.clear();
965    //     })
966    // }
967}