1use std::cmp;
2use std::fmt;
3use std::io::{self, IoSlice};
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use crate::rt::{Read, ReadBuf, Write};
8use bytes::{Buf, BufMut, Bytes, BytesMut};
9use futures_util::ready;
10
11use super::{Http1Transaction, ParseContext, ParsedMessage};
12use crate::common::buf::BufList;
13
/// Starting size, in bytes, of the adaptive read strategy's `next` value and
/// the initial capacity of the flat header buffer created by `WriteBuf::new`.
pub(crate) const INIT_BUFFER_SIZE: usize = 8192;

/// Smallest value accepted by `Buffered::set_max_buf_size`, which asserts
/// against this bound.
pub(crate) const MINIMUM_MAX_BUFFER_SIZE: usize = INIT_BUFFER_SIZE;

/// Default cap used by `ReadStrategy::default` and `WriteBuf::new`.
///
/// `Buffered::parse` fails with a "too large" error once the read buffer has
/// reached the read strategy's cap without producing a complete message.
pub(crate) const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;

/// Number of queued buffers at which `WriteBuf::can_buffer` starts returning
/// `false` under `WriteStrategy::Queue`.
const MAX_BUF_LIST_BUFFERS: usize = 16;
31
/// An IO object paired with a `BytesMut` read buffer and a `WriteBuf<B>` of
/// pending outgoing data.
pub(crate) struct Buffered<T, B> {
    /// When set, `poll_flush` returns early while `read_buf` still holds
    /// bytes, and `can_buffer` always reports `true`.
    flush_pipeline: bool,
    /// The wrapped IO object.
    io: T,
    /// Length of `read_buf` after the last parse attempt that did not yield a
    /// message; passed to `parse_headers` on the next attempt.
    partial_len: Option<usize>,
    /// `true` when the most recent `poll_read_from_io` returned `Poll::Pending`.
    read_blocked: bool,
    /// Bytes read from `io` that have not been consumed yet.
    read_buf: BytesMut,
    /// Chooses how much spare capacity to ensure before each read.
    read_buf_strategy: ReadStrategy,
    /// Data waiting to be written to `io`.
    write_buf: WriteBuf<B>,
}
41
42impl<T, B> fmt::Debug for Buffered<T, B>
43where
44 B: Buf,
45{
46 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47 f.debug_struct("Buffered")
48 .field("read_buf", &self.read_buf)
49 .field("write_buf", &self.write_buf)
50 .finish()
51 }
52}
53
54impl<T, B> Buffered<T, B>
55where
56 T: Read + Write + Unpin,
57 B: Buf,
58{
59 pub(crate) fn new(io: T) -> Buffered<T, B> {
60 let strategy = if io.is_write_vectored() {
61 WriteStrategy::Queue
62 } else {
63 WriteStrategy::Flatten
64 };
65 let write_buf = WriteBuf::new(strategy);
66 Buffered {
67 flush_pipeline: false,
68 io,
69 partial_len: None,
70 read_blocked: false,
71 read_buf: BytesMut::with_capacity(0),
72 read_buf_strategy: ReadStrategy::default(),
73 write_buf,
74 }
75 }
76
/// Turns pipeline flushing on or off.
///
/// Enabling it also switches the write buffer to the `Flatten` strategy.
/// Debug builds assert that the write buffer is empty when this is called.
#[cfg(feature = "server")]
pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) {
    debug_assert!(!self.write_buf.has_remaining());
    self.flush_pipeline = enabled;
    if !enabled {
        return;
    }
    self.set_write_strategy_flatten();
}
85
86 pub(crate) fn set_max_buf_size(&mut self, max: usize) {
87 assert!(
88 max >= MINIMUM_MAX_BUFFER_SIZE,
89 "The max_buf_size cannot be smaller than {}.",
90 MINIMUM_MAX_BUFFER_SIZE,
91 );
92 self.read_buf_strategy = ReadStrategy::with_max(max);
93 self.write_buf.max_buf_size = max;
94 }
95
/// Replaces the read strategy with `ReadStrategy::Exact(sz)`.
#[cfg(feature = "client")]
pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) {
    let exact = ReadStrategy::Exact(sz);
    self.read_buf_strategy = exact;
}
100
101 pub(crate) fn set_write_strategy_flatten(&mut self) {
102 debug_assert!(self.write_buf.queue.bufs_cnt() == 0);
105 self.write_buf.set_strategy(WriteStrategy::Flatten);
106 }
107
108 pub(crate) fn set_write_strategy_queue(&mut self) {
109 debug_assert!(self.write_buf.queue.bufs_cnt() == 0);
112 self.write_buf.set_strategy(WriteStrategy::Queue);
113 }
114
115 pub(crate) fn read_buf(&self) -> &[u8] {
116 self.read_buf.as_ref()
117 }
118
/// Mutable access to the read buffer; compiled only for tests with the
/// `nightly` feature enabled.
#[cfg(test)]
#[cfg(feature = "nightly")]
pub(super) fn read_buf_mut(&mut self) -> &mut BytesMut {
    let Self { read_buf, .. } = self;
    read_buf
}
124
125 fn read_buf_remaining_mut(&self) -> usize {
128 self.read_buf.capacity() - self.read_buf.len()
129 }
130
131 pub(crate) fn can_headers_buf(&self) -> bool {
137 !self.write_buf.queue.has_remaining()
138 }
139
140 pub(crate) fn headers_buf(&mut self) -> &mut Vec<u8> {
141 let buf = self.write_buf.headers_mut();
142 &mut buf.bytes
143 }
144
145 pub(super) fn write_buf(&mut self) -> &mut WriteBuf<B> {
146 &mut self.write_buf
147 }
148
149 pub(crate) fn buffer<BB: Buf + Into<B>>(&mut self, buf: BB) {
150 self.write_buf.buffer(buf)
151 }
152
153 pub(crate) fn can_buffer(&self) -> bool {
154 self.flush_pipeline || self.write_buf.can_buffer()
155 }
156
157 pub(crate) fn consume_leading_lines(&mut self) {
158 if !self.read_buf.is_empty() {
159 let mut i = 0;
160 while i < self.read_buf.len() {
161 match self.read_buf[i] {
162 b'\r' | b'\n' => i += 1,
163 _ => break,
164 }
165 }
166 self.read_buf.advance(i);
167 }
168 }
169
170 pub(super) fn parse<S>(
171 &mut self,
172 cx: &mut Context<'_>,
173 parse_ctx: ParseContext<'_>,
174 ) -> Poll<crate::Result<ParsedMessage<S::Incoming>>>
175 where
176 S: Http1Transaction,
177 {
178 loop {
179 match super::role::parse_headers::<S>(
180 &mut self.read_buf,
181 self.partial_len,
182 ParseContext {
183 cached_headers: parse_ctx.cached_headers,
184 req_method: parse_ctx.req_method,
185 h1_parser_config: parse_ctx.h1_parser_config.clone(),
186 h1_max_headers: parse_ctx.h1_max_headers,
187 preserve_header_case: parse_ctx.preserve_header_case,
188 #[cfg(feature = "ffi")]
189 preserve_header_order: parse_ctx.preserve_header_order,
190 h09_responses: parse_ctx.h09_responses,
191 #[cfg(feature = "ffi")]
192 on_informational: parse_ctx.on_informational,
193 },
194 )? {
195 Some(msg) => {
196 debug!("parsed {} headers", msg.head.headers.len());
197 self.partial_len = None;
198 return Poll::Ready(Ok(msg));
199 }
200 None => {
201 let max = self.read_buf_strategy.max();
202 let curr_len = self.read_buf.len();
203 if curr_len >= max {
204 debug!("max_buf_size ({}) reached, closing", max);
205 return Poll::Ready(Err(crate::Error::new_too_large()));
206 }
207 if curr_len > 0 {
208 trace!("partial headers; {} bytes so far", curr_len);
209 self.partial_len = Some(curr_len);
210 } else {
211 self.partial_len = None;
213 }
214 }
215 }
216 if ready!(self.poll_read_from_io(cx)).map_err(crate::Error::new_io)? == 0 {
217 trace!("parse eof");
218 return Poll::Ready(Err(crate::Error::new_incomplete()));
219 }
220 }
221 }
222
223 pub(crate) fn poll_read_from_io(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
224 self.read_blocked = false;
225 let next = self.read_buf_strategy.next();
226 if self.read_buf_remaining_mut() < next {
227 self.read_buf.reserve(next);
228 }
229
230 let dst = unsafe { self.read_buf.chunk_mut().as_uninit_slice_mut() };
233 let mut buf = ReadBuf::uninit(dst);
234 match Pin::new(&mut self.io).poll_read(cx, buf.unfilled()) {
235 Poll::Ready(Ok(_)) => {
236 let n = buf.filled().len();
237 trace!("received {} bytes", n);
238 unsafe {
239 self.read_buf.advance_mut(n);
243 }
244 self.read_buf_strategy.record(n);
245 Poll::Ready(Ok(n))
246 }
247 Poll::Pending => {
248 self.read_blocked = true;
249 Poll::Pending
250 }
251 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
252 }
253 }
254
255 pub(crate) fn into_inner(self) -> (T, Bytes) {
256 (self.io, self.read_buf.freeze())
257 }
258
259 pub(crate) fn io_mut(&mut self) -> &mut T {
260 &mut self.io
261 }
262
263 pub(crate) fn is_read_blocked(&self) -> bool {
264 self.read_blocked
265 }
266
267 pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
268 if self.flush_pipeline && !self.read_buf.is_empty() {
269 Poll::Ready(Ok(()))
270 } else if self.write_buf.remaining() == 0 {
271 Pin::new(&mut self.io).poll_flush(cx)
272 } else {
273 if let WriteStrategy::Flatten = self.write_buf.strategy {
274 return self.poll_flush_flattened(cx);
275 }
276
277 const MAX_WRITEV_BUFS: usize = 64;
278 loop {
279 let n = {
280 let mut iovs = [IoSlice::new(&[]); MAX_WRITEV_BUFS];
281 let len = self.write_buf.chunks_vectored(&mut iovs);
282 ready!(Pin::new(&mut self.io).poll_write_vectored(cx, &iovs[..len]))?
283 };
284 self.write_buf.advance(n);
288 debug!("flushed {} bytes", n);
289 if self.write_buf.remaining() == 0 {
290 break;
291 } else if n == 0 {
292 trace!(
293 "write returned zero, but {} bytes remaining",
294 self.write_buf.remaining()
295 );
296 return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
297 }
298 }
299 Pin::new(&mut self.io).poll_flush(cx)
300 }
301 }
302
303 fn poll_flush_flattened(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
308 loop {
309 let n = ready!(Pin::new(&mut self.io).poll_write(cx, self.write_buf.headers.chunk()))?;
310 debug!("flushed {} bytes", n);
311 self.write_buf.headers.advance(n);
312 if self.write_buf.headers.remaining() == 0 {
313 self.write_buf.headers.reset();
314 break;
315 } else if n == 0 {
316 trace!(
317 "write returned zero, but {} bytes remaining",
318 self.write_buf.remaining()
319 );
320 return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
321 }
322 }
323 Pin::new(&mut self.io).poll_flush(cx)
324 }
325
/// Test-only helper: a future that resolves with the result of `poll_flush`.
#[cfg(test)]
fn flush(&mut self) -> impl std::future::Future<Output = io::Result<()>> + '_ {
    use futures_util::future::poll_fn;

    poll_fn(move |cx| self.poll_flush(cx))
}
330}
331
// `Buffered` is `Unpin` whenever `T` is, regardless of `B`: within this file
// a `Pin` is only ever built around `io` (`Pin::new(&mut self.io)`), never
// around a buffered `B` value.
impl<T: Unpin, B> Unpin for Buffered<T, B> {}
334
/// A source of in-memory byte chunks that may have to poll for more data.
pub(crate) trait MemRead {
    /// Polls for a chunk of bytes. The `Buffered` implementation in this file
    /// returns at most `len` bytes per call.
    fn read_mem(&mut self, cx: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>>;
}
339
340impl<T, B> MemRead for Buffered<T, B>
341where
342 T: Read + Write + Unpin,
343 B: Buf,
344{
345 fn read_mem(&mut self, cx: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
346 if !self.read_buf.is_empty() {
347 let n = std::cmp::min(len, self.read_buf.len());
348 Poll::Ready(Ok(self.read_buf.split_to(n).freeze()))
349 } else {
350 let n = ready!(self.poll_read_from_io(cx))?;
351 Poll::Ready(Ok(self.read_buf.split_to(::std::cmp::min(len, n)).freeze()))
352 }
353 }
354}
355
/// Policy for how much read-buffer space to make available before a read.
#[derive(Clone, Copy, Debug)]
enum ReadStrategy {
    /// The size follows the amounts actually read (see `ReadStrategy::record`).
    Adaptive {
        /// Set after one read that fell below the shrink threshold; a second
        /// such read in a row performs the shrink.
        decrease_now: bool,
        /// Size suggested for the next read.
        next: usize,
        /// Cap applied whenever `next` grows.
        max: usize,
    },
    /// Always suggest exactly this size; `record` leaves it untouched.
    #[cfg(feature = "client")]
    Exact(usize),
}
366
367impl ReadStrategy {
368 fn with_max(max: usize) -> ReadStrategy {
369 ReadStrategy::Adaptive {
370 decrease_now: false,
371 next: INIT_BUFFER_SIZE,
372 max,
373 }
374 }
375
376 fn next(&self) -> usize {
377 match *self {
378 ReadStrategy::Adaptive { next, .. } => next,
379 #[cfg(feature = "client")]
380 ReadStrategy::Exact(exact) => exact,
381 }
382 }
383
384 fn max(&self) -> usize {
385 match *self {
386 ReadStrategy::Adaptive { max, .. } => max,
387 #[cfg(feature = "client")]
388 ReadStrategy::Exact(exact) => exact,
389 }
390 }
391
392 fn record(&mut self, bytes_read: usize) {
393 match *self {
394 ReadStrategy::Adaptive {
395 ref mut decrease_now,
396 ref mut next,
397 max,
398 ..
399 } => {
400 if bytes_read >= *next {
401 *next = cmp::min(incr_power_of_two(*next), max);
402 *decrease_now = false;
403 } else {
404 let decr_to = prev_power_of_two(*next);
405 if bytes_read < decr_to {
406 if *decrease_now {
407 *next = cmp::max(decr_to, INIT_BUFFER_SIZE);
408 *decrease_now = false;
409 } else {
410 *decrease_now = true;
412 }
413 } else {
414 *decrease_now = false;
418 }
419 }
420 }
421 #[cfg(feature = "client")]
422 ReadStrategy::Exact(_) => (),
423 }
424 }
425}
426
/// Doubles `n`, clamping at `usize::MAX` instead of overflowing.
fn incr_power_of_two(n: usize) -> usize {
    match n.checked_mul(2) {
        Some(doubled) => doubled,
        None => usize::MAX,
    }
}
430
/// Returns the power of two one step below the highest set bit of `n`
/// (for example 16384 -> 8192 and 12288 -> 4096).
///
/// `n` must be at least 4; debug builds assert this, because a smaller value
/// would make the shift amount reach the bit width of `usize`.
fn prev_power_of_two(n: usize) -> usize {
    debug_assert!(n >= 4);
    let shift = n.leading_zeros() + 2;
    let low_bits = usize::MAX >> shift;
    low_bits + 1
}
437
438impl Default for ReadStrategy {
439 fn default() -> ReadStrategy {
440 ReadStrategy::with_max(DEFAULT_MAX_BUFFER_SIZE)
441 }
442}
443
/// A byte container together with a read position into it.
#[derive(Clone)]
pub(crate) struct Cursor<T> {
    /// The underlying bytes.
    bytes: T,
    /// Offset of the first byte that has not been consumed.
    pos: usize,
}

impl<T: AsRef<[u8]>> Cursor<T> {
    /// Wraps `bytes` with the position at the start.
    #[inline]
    pub(crate) fn new(bytes: T) -> Cursor<T> {
        Self { bytes, pos: 0 }
    }
}

impl Cursor<Vec<u8>> {
    /// Makes room for `additional` bytes by dropping the already-consumed
    /// prefix, but only when there is such a prefix and the vector's spare
    /// capacity is too small to hold `additional` bytes as it is.
    fn maybe_unshift(&mut self, additional: usize) {
        let consumed = self.pos;
        let spare = self.bytes.capacity() - self.bytes.len();
        if consumed == 0 || spare >= additional {
            return;
        }

        self.bytes.drain(..consumed);
        self.pos = 0;
    }

    /// Empties the vector and rewinds the position, keeping the allocation.
    fn reset(&mut self) {
        self.bytes.clear();
        self.pos = 0;
    }
}

/// Debug output shows the position and total length, not the bytes.
impl<T: AsRef<[u8]>> fmt::Debug for Cursor<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let total = self.bytes.as_ref().len();
        let mut out = f.debug_struct("Cursor");
        out.field("pos", &self.pos);
        out.field("len", &total);
        out.finish()
    }
}
490
491impl<T: AsRef<[u8]>> Buf for Cursor<T> {
492 #[inline]
493 fn remaining(&self) -> usize {
494 self.bytes.as_ref().len() - self.pos
495 }
496
497 #[inline]
498 fn chunk(&self) -> &[u8] {
499 &self.bytes.as_ref()[self.pos..]
500 }
501
502 #[inline]
503 fn advance(&mut self, cnt: usize) {
504 debug_assert!(self.pos + cnt <= self.bytes.as_ref().len());
505 self.pos += cnt;
506 }
507}
508
/// Pending outgoing data: one flat byte buffer followed by a queue of `B`
/// buffers.
pub(super) struct WriteBuf<B> {
    /// Flat buffer. Under `WriteStrategy::Flatten`, `buffer` copies every
    /// incoming `Buf` into it.
    headers: Cursor<Vec<u8>>,
    /// Threshold compared against `remaining()` in `can_buffer`.
    max_buf_size: usize,
    /// Buffers pushed as-is under `WriteStrategy::Queue`.
    queue: BufList<B>,
    /// Selects whether `buffer` copies or queues.
    strategy: WriteStrategy,
}
518
519impl<B: Buf> WriteBuf<B> {
520 fn new(strategy: WriteStrategy) -> WriteBuf<B> {
521 WriteBuf {
522 headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)),
523 max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
524 queue: BufList::new(),
525 strategy,
526 }
527 }
528}
529
530impl<B> WriteBuf<B>
531where
532 B: Buf,
533{
534 fn set_strategy(&mut self, strategy: WriteStrategy) {
535 self.strategy = strategy;
536 }
537
538 pub(super) fn buffer<BB: Buf + Into<B>>(&mut self, mut buf: BB) {
539 debug_assert!(buf.has_remaining());
540 match self.strategy {
541 WriteStrategy::Flatten => {
542 let head = self.headers_mut();
543
544 head.maybe_unshift(buf.remaining());
545 trace!(
546 self.len = head.remaining(),
547 buf.len = buf.remaining(),
548 "buffer.flatten"
549 );
550 loop {
553 let adv = {
554 let slice = buf.chunk();
555 if slice.is_empty() {
556 return;
557 }
558 head.bytes.extend_from_slice(slice);
559 slice.len()
560 };
561 buf.advance(adv);
562 }
563 }
564 WriteStrategy::Queue => {
565 trace!(
566 self.len = self.remaining(),
567 buf.len = buf.remaining(),
568 "buffer.queue"
569 );
570 self.queue.push(buf.into());
571 }
572 }
573 }
574
575 fn can_buffer(&self) -> bool {
576 match self.strategy {
577 WriteStrategy::Flatten => self.remaining() < self.max_buf_size,
578 WriteStrategy::Queue => {
579 self.queue.bufs_cnt() < MAX_BUF_LIST_BUFFERS && self.remaining() < self.max_buf_size
580 }
581 }
582 }
583
584 fn headers_mut(&mut self) -> &mut Cursor<Vec<u8>> {
585 debug_assert!(!self.queue.has_remaining());
586 &mut self.headers
587 }
588}
589
590impl<B: Buf> fmt::Debug for WriteBuf<B> {
591 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
592 f.debug_struct("WriteBuf")
593 .field("remaining", &self.remaining())
594 .field("strategy", &self.strategy)
595 .finish()
596 }
597}
598
599impl<B: Buf> Buf for WriteBuf<B> {
600 #[inline]
601 fn remaining(&self) -> usize {
602 self.headers.remaining() + self.queue.remaining()
603 }
604
605 #[inline]
606 fn chunk(&self) -> &[u8] {
607 let headers = self.headers.chunk();
608 if !headers.is_empty() {
609 headers
610 } else {
611 self.queue.chunk()
612 }
613 }
614
615 #[inline]
616 fn advance(&mut self, cnt: usize) {
617 let hrem = self.headers.remaining();
618
619 match hrem.cmp(&cnt) {
620 cmp::Ordering::Equal => self.headers.reset(),
621 cmp::Ordering::Greater => self.headers.advance(cnt),
622 cmp::Ordering::Less => {
623 let qcnt = cnt - hrem;
624 self.headers.reset();
625 self.queue.advance(qcnt);
626 }
627 }
628 }
629
630 #[inline]
631 fn chunks_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
632 let n = self.headers.chunks_vectored(dst);
633 self.queue.chunks_vectored(&mut dst[n..]) + n
634 }
635}
636
/// How `WriteBuf::buffer` stores incoming buffers.
#[derive(Debug)]
enum WriteStrategy {
    /// Copy every buffer's bytes into the single flat header buffer.
    Flatten,
    /// Keep each buffer as a separate entry in the queue.
    Queue,
}
642
643#[cfg(test)]
644mod tests {
645 use super::*;
646 use crate::common::io::Compat;
647 use std::time::Duration;
648
649 use tokio_test::io::Builder as Mock;
650
// Placeholder: the body is empty and the test is marked `#[ignore]`, so it
// currently verifies nothing.
#[tokio::test]
#[ignore]
async fn iobuf_write_empty_slice() {
}
684
685 #[cfg(not(miri))]
686 #[tokio::test]
687 async fn parse_reads_until_blocked() {
688 use crate::proto::h1::ClientTransaction;
689
690 let _ = pretty_env_logger::try_init();
691 let mock = Mock::new()
692 .read(b"HTTP/1.1 200 OK\r\n")
694 .read(b"Server: hyper\r\n")
695 .wait(Duration::from_secs(1))
697 .build();
698
699 let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(Compat::new(mock));
700
701 futures_util::future::poll_fn(|cx| {
704 let parse_ctx = ParseContext {
705 cached_headers: &mut None,
706 req_method: &mut None,
707 h1_parser_config: Default::default(),
708 h1_max_headers: None,
709 preserve_header_case: false,
710 #[cfg(feature = "ffi")]
711 preserve_header_order: false,
712 h09_responses: false,
713 #[cfg(feature = "ffi")]
714 on_informational: &mut None,
715 };
716 assert!(buffered
717 .parse::<ClientTransaction>(cx, parse_ctx)
718 .is_pending());
719 Poll::Ready(())
720 })
721 .await;
722
723 assert_eq!(
724 buffered.read_buf,
725 b"HTTP/1.1 200 OK\r\nServer: hyper\r\n"[..]
726 );
727 }
728
// Reads that fill the suggested size double it, up to (never past) the cap.
#[test]
fn read_strategy_adaptive_increments() {
    let mut strat = ReadStrategy::default();
    assert_eq!(strat.next(), 8192);

    // (bytes recorded, size suggested afterwards)
    let growth: [(usize, usize); 3] = [(8192, 16384), (16384, 32768), (usize::MAX, 65536)];
    for &(recorded, expected_next) in growth.iter() {
        strat.record(recorded);
        assert_eq!(strat.next(), expected_next);
    }

    let cap = strat.max();
    while strat.next() < cap {
        strat.record(cap);
    }

    assert_eq!(strat.next(), cap, "never goes over max");
    strat.record(cap + 1);
    assert_eq!(strat.next(), cap, "never goes over max");
}
754
// Shrinking needs two small reads in a row and never goes below the minimum.
#[test]
fn read_strategy_adaptive_decrements() {
    let mut strat = ReadStrategy::default();
    strat.record(8192);
    assert_eq!(strat.next(), 16384);

    // (bytes recorded, size suggested afterwards, failure message)
    let steps: [(usize, usize, &str); 6] = [
        (1, 16384, "first smaller record doesn't decrement yet"),
        (8192, 16384, "record was with range"),
        (
            1,
            16384,
            "in-range record should make this the 'first' again",
        ),
        (1, 8192, "second smaller record decrements"),
        (1, 8192, "first doesn't decrement"),
        (1, 8192, "doesn't decrement under minimum"),
    ];
    for &(recorded, expected_next, why) in steps.iter() {
        strat.record(recorded);
        assert_eq!(strat.next(), expected_next, "{}", why);
    }
}
785
// Reads inside the current range neither grow nor shrink the suggestion.
#[test]
fn read_strategy_adaptive_stays_the_same() {
    let mut strat = ReadStrategy::default();
    strat.record(8192);
    assert_eq!(strat.next(), 16384);

    let messages = [
        "first smaller record doesn't decrement yet",
        "with current step does not decrement",
    ];
    for why in messages.iter() {
        strat.record(8193);
        assert_eq!(strat.next(), 16384, "{}", why);
    }
}
806
// For a spread of caps, grow the strategy to its cap and then shrink it back
// down, checking that every shrink step lands on a power of two.
#[test]
fn read_strategy_adaptive_max_fuzz() {
    /// Grows a strategy capped at `max` until it reaches the cap, then feeds
    /// it pairs of one-byte reads until it is back at 8192.
    fn fuzz(max: usize) {
        let mut strategy = ReadStrategy::with_max(max);
        while strategy.next() < max {
            strategy.record(usize::MAX);
        }
        let mut next = strategy.next();
        while next > 8192 {
            // Two small reads in a row are needed for one shrink step.
            strategy.record(1);
            strategy.record(1);
            next = strategy.next();
            assert!(
                next.is_power_of_two(),
                "decrement should be powers of two: {} (max = {})",
                next,
                max,
            );
        }
    }

    // Caps grow by a factor of 1.5 until the multiplication saturates.
    // Uses the associated constant `usize::MAX`, as the rest of this file
    // does, rather than the legacy `std::usize::MAX` module constant.
    let mut max = 8192;
    while max < usize::MAX {
        fuzz(max);
        max = (max / 2).saturating_mul(3);
    }
    fuzz(usize::MAX);
}
835
// Buffering an empty `Buf` must trip the `debug_assert!` in
// `WriteBuf::buffer`, so this only runs when debug assertions are enabled.
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn write_buf_requires_non_empty_bufs() {
    let io = Mock::new().build();
    let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(Compat::new(io));

    let empty = Cursor::new(Vec::<u8>::new());
    buffered.buffer(empty);
}
845
846 #[cfg(not(miri))]
870 #[tokio::test]
871 async fn write_buf_flatten() {
872 let _ = pretty_env_logger::try_init();
873
874 let mock = Mock::new().write(b"hello world, it's hyper!").build();
875
876 let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(Compat::new(mock));
877 buffered.write_buf.set_strategy(WriteStrategy::Flatten);
878
879 buffered.headers_buf().extend(b"hello ");
880 buffered.buffer(Cursor::new(b"world, ".to_vec()));
881 buffered.buffer(Cursor::new(b"it's ".to_vec()));
882 buffered.buffer(Cursor::new(b"hyper!".to_vec()));
883 assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
884
885 buffered.flush().await.expect("flush");
886 }
887
// After a partial flush, further buffering keeps the consumed prefix while
// there is spare capacity, and drops it once more room is needed.
#[test]
fn write_buf_flatten_partially_flushed() {
    fn cursor_of(s: &str) -> Cursor<Vec<u8>> {
        Cursor::new(s.as_bytes().to_vec())
    }

    let _ = pretty_env_logger::try_init();

    let mut wb = WriteBuf::<Cursor<Vec<u8>>>::new(WriteStrategy::Flatten);

    wb.buffer(cursor_of("hello "));
    wb.buffer(cursor_of("world, "));

    assert_eq!(wb.chunk(), b"hello world, ");

    // Pretend 11 bytes were written out.
    wb.advance(11);

    assert_eq!(wb.chunk(), b", ");
    assert_eq!(wb.headers.pos, 11);
    assert_eq!(wb.headers.bytes.capacity(), INIT_BUFFER_SIZE);

    // Still plenty of spare capacity: the consumed prefix stays in place.
    wb.buffer(cursor_of("it's hyper!"));

    assert_eq!(wb.chunk(), b", it's hyper!");
    assert_eq!(wb.headers.pos, 11);

    let pending_before = wb.remaining();
    let capacity = wb.headers.bytes.capacity();

    // Larger than the spare capacity: the consumed prefix is shifted out.
    wb.buffer(Cursor::new(vec![b'X'; capacity]));
    assert_eq!(wb.remaining(), capacity + pending_before);
    assert_eq!(wb.headers.pos, 0);
}
922
923 #[cfg(not(miri))]
924 #[tokio::test]
925 async fn write_buf_queue_disable_auto() {
926 let _ = pretty_env_logger::try_init();
927
928 let mock = Mock::new()
929 .write(b"hello ")
930 .write(b"world, ")
931 .write(b"it's ")
932 .write(b"hyper!")
933 .build();
934
935 let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(Compat::new(mock));
936 buffered.write_buf.set_strategy(WriteStrategy::Queue);
937
938 buffered.headers_buf().extend(b"hello ");
942 buffered.buffer(Cursor::new(b"world, ".to_vec()));
943 buffered.buffer(Cursor::new(b"it's ".to_vec()));
944 buffered.buffer(Cursor::new(b"hyper!".to_vec()));
945 assert_eq!(buffered.write_buf.queue.bufs_cnt(), 3);
946
947 buffered.flush().await.expect("flush");
948
949 assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
950 }
951
952 }