1use std::fmt;
2#[cfg(feature = "server")]
3use std::future::Future;
4use std::io;
5use std::marker::{PhantomData, Unpin};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8#[cfg(feature = "server")]
9use std::time::{Duration, Instant};
10
11use crate::rt::{Read, Write};
12use bytes::{Buf, Bytes};
13use futures_util::ready;
14use http::header::{HeaderValue, CONNECTION, TE};
15use http::{HeaderMap, Method, Version};
16use http_body::Frame;
17use httparse::ParserConfig;
18
19use super::io::Buffered;
20use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext, Wants};
21use crate::body::DecodedLength;
22#[cfg(feature = "server")]
23use crate::common::time::Time;
24use crate::headers;
25use crate::proto::{BodyLength, MessageHead};
26#[cfg(feature = "server")]
27use crate::rt::Sleep;
28
29const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
30
31pub(crate) struct Conn<I, B, T> {
39 io: Buffered<I, EncodedBuf<B>>,
40 state: State,
41 _marker: PhantomData<fn(T)>,
42}
43
44impl<I, B, T> Conn<I, B, T>
45where
46 I: Read + Write + Unpin,
47 B: Buf,
48 T: Http1Transaction,
49{
50 pub(crate) fn new(io: I) -> Conn<I, B, T> {
51 Conn {
52 io: Buffered::new(io),
53 state: State {
54 allow_half_close: false,
55 cached_headers: None,
56 error: None,
57 keep_alive: KA::Busy,
58 method: None,
59 h1_parser_config: ParserConfig::default(),
60 h1_max_headers: None,
61 #[cfg(feature = "server")]
62 h1_header_read_timeout: None,
63 #[cfg(feature = "server")]
64 h1_header_read_timeout_fut: None,
65 #[cfg(feature = "server")]
66 h1_header_read_timeout_running: false,
67 #[cfg(feature = "server")]
68 date_header: true,
69 #[cfg(feature = "server")]
70 timer: Time::Empty,
71 preserve_header_case: false,
72 #[cfg(feature = "ffi")]
73 preserve_header_order: false,
74 title_case_headers: false,
75 h09_responses: false,
76 #[cfg(feature = "ffi")]
77 on_informational: None,
78 notify_read: false,
79 reading: Reading::Init,
80 writing: Writing::Init,
81 upgrade: None,
82 version: Version::HTTP_11,
85 allow_trailer_fields: false,
86 },
87 _marker: PhantomData,
88 }
89 }
90
91 #[cfg(feature = "server")]
92 pub(crate) fn set_timer(&mut self, timer: Time) {
93 self.state.timer = timer;
94 }
95
96 #[cfg(feature = "server")]
97 pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) {
98 self.io.set_flush_pipeline(enabled);
99 }
100
101 pub(crate) fn set_write_strategy_queue(&mut self) {
102 self.io.set_write_strategy_queue();
103 }
104
105 pub(crate) fn set_max_buf_size(&mut self, max: usize) {
106 self.io.set_max_buf_size(max);
107 }
108
109 #[cfg(feature = "client")]
110 pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) {
111 self.io.set_read_buf_exact_size(sz);
112 }
113
114 pub(crate) fn set_write_strategy_flatten(&mut self) {
115 self.io.set_write_strategy_flatten();
116 }
117
118 #[cfg(feature = "client")]
119 pub(crate) fn set_h1_parser_config(&mut self, parser_config: ParserConfig) {
120 self.state.h1_parser_config = parser_config;
121 }
122
123 pub(crate) fn set_title_case_headers(&mut self) {
124 self.state.title_case_headers = true;
125 }
126
127 pub(crate) fn set_preserve_header_case(&mut self) {
128 self.state.preserve_header_case = true;
129 }
130
131 #[cfg(feature = "ffi")]
132 pub(crate) fn set_preserve_header_order(&mut self) {
133 self.state.preserve_header_order = true;
134 }
135
136 #[cfg(feature = "client")]
137 pub(crate) fn set_h09_responses(&mut self) {
138 self.state.h09_responses = true;
139 }
140
141 pub(crate) fn set_http1_max_headers(&mut self, val: usize) {
142 self.state.h1_max_headers = Some(val);
143 }
144
145 #[cfg(feature = "server")]
146 pub(crate) fn set_http1_header_read_timeout(&mut self, val: Duration) {
147 self.state.h1_header_read_timeout = Some(val);
148 }
149
150 #[cfg(feature = "server")]
151 pub(crate) fn set_allow_half_close(&mut self) {
152 self.state.allow_half_close = true;
153 }
154
155 #[cfg(feature = "server")]
156 pub(crate) fn disable_date_header(&mut self) {
157 self.state.date_header = false;
158 }
159
160 pub(crate) fn into_inner(self) -> (I, Bytes) {
161 self.io.into_inner()
162 }
163
164 pub(crate) fn pending_upgrade(&mut self) -> Option<crate::upgrade::Pending> {
165 self.state.upgrade.take()
166 }
167
168 pub(crate) fn is_read_closed(&self) -> bool {
169 self.state.is_read_closed()
170 }
171
172 pub(crate) fn is_write_closed(&self) -> bool {
173 self.state.is_write_closed()
174 }
175
176 pub(crate) fn can_read_head(&self) -> bool {
177 if !matches!(self.state.reading, Reading::Init) {
178 return false;
179 }
180
181 if T::should_read_first() {
182 return true;
183 }
184
185 !matches!(self.state.writing, Writing::Init)
186 }
187
188 pub(crate) fn can_read_body(&self) -> bool {
189 matches!(
190 self.state.reading,
191 Reading::Body(..) | Reading::Continue(..)
192 )
193 }
194
195 #[cfg(feature = "server")]
196 pub(crate) fn has_initial_read_write_state(&self) -> bool {
197 matches!(self.state.reading, Reading::Init)
198 && matches!(self.state.writing, Writing::Init)
199 && self.io.read_buf().is_empty()
200 }
201
202 fn should_error_on_eof(&self) -> bool {
203 T::should_error_on_parse_eof() && !self.state.is_idle()
205 }
206
207 fn has_h2_prefix(&self) -> bool {
208 let read_buf = self.io.read_buf();
209 read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE
210 }
211
212 pub(super) fn poll_read_head(
213 &mut self,
214 cx: &mut Context<'_>,
215 ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> {
216 debug_assert!(self.can_read_head());
217 trace!("Conn::read_head");
218
219 #[cfg(feature = "server")]
220 if !self.state.h1_header_read_timeout_running {
221 if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout {
222 let deadline = Instant::now() + h1_header_read_timeout;
223 self.state.h1_header_read_timeout_running = true;
224 match self.state.h1_header_read_timeout_fut {
225 Some(ref mut h1_header_read_timeout_fut) => {
226 trace!("resetting h1 header read timeout timer");
227 self.state.timer.reset(h1_header_read_timeout_fut, deadline);
228 }
229 None => {
230 trace!("setting h1 header read timeout timer");
231 self.state.h1_header_read_timeout_fut =
232 Some(self.state.timer.sleep_until(deadline));
233 }
234 }
235 }
236 }
237
238 let msg = match self.io.parse::<T>(
239 cx,
240 ParseContext {
241 cached_headers: &mut self.state.cached_headers,
242 req_method: &mut self.state.method,
243 h1_parser_config: self.state.h1_parser_config.clone(),
244 h1_max_headers: self.state.h1_max_headers,
245 preserve_header_case: self.state.preserve_header_case,
246 #[cfg(feature = "ffi")]
247 preserve_header_order: self.state.preserve_header_order,
248 h09_responses: self.state.h09_responses,
249 #[cfg(feature = "ffi")]
250 on_informational: &mut self.state.on_informational,
251 },
252 ) {
253 Poll::Ready(Ok(msg)) => msg,
254 Poll::Ready(Err(e)) => return self.on_read_head_error(e),
255 Poll::Pending => {
256 #[cfg(feature = "server")]
257 if self.state.h1_header_read_timeout_running {
258 if let Some(ref mut h1_header_read_timeout_fut) =
259 self.state.h1_header_read_timeout_fut
260 {
261 if Pin::new(h1_header_read_timeout_fut).poll(cx).is_ready() {
262 self.state.h1_header_read_timeout_running = false;
263
264 warn!("read header from client timeout");
265 return Poll::Ready(Some(Err(crate::Error::new_header_timeout())));
266 }
267 }
268 }
269
270 return Poll::Pending;
271 }
272 };
273
274 #[cfg(feature = "server")]
275 {
276 self.state.h1_header_read_timeout_running = false;
277 self.state.h1_header_read_timeout_fut = None;
278 }
279
280 debug!("incoming body is {}", msg.decode);
284
285 self.state.h09_responses = false;
287
288 #[cfg(feature = "ffi")]
290 {
291 self.state.on_informational = None;
292 }
293
294 self.state.busy();
295 self.state.keep_alive &= msg.keep_alive;
296 self.state.version = msg.head.version;
297
298 let mut wants = if msg.wants_upgrade {
299 Wants::UPGRADE
300 } else {
301 Wants::EMPTY
302 };
303
304 if msg.decode == DecodedLength::ZERO {
305 if msg.expect_continue {
306 debug!("ignoring expect-continue since body is empty");
307 }
308 self.state.reading = Reading::KeepAlive;
309 if !T::should_read_first() {
310 self.try_keep_alive(cx);
311 }
312 } else if msg.expect_continue && msg.head.version.gt(&Version::HTTP_10) {
313 let h1_max_header_size = None; self.state.reading = Reading::Continue(Decoder::new(
315 msg.decode,
316 self.state.h1_max_headers,
317 h1_max_header_size,
318 ));
319 wants = wants.add(Wants::EXPECT);
320 } else {
321 let h1_max_header_size = None; self.state.reading = Reading::Body(Decoder::new(
323 msg.decode,
324 self.state.h1_max_headers,
325 h1_max_header_size,
326 ));
327 }
328
329 self.state.allow_trailer_fields = msg
330 .head
331 .headers
332 .get(TE)
333 .map_or(false, |te_header| te_header == "trailers");
334
335 Poll::Ready(Some(Ok((msg.head, msg.decode, wants))))
336 }
337
338 fn on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>> {
339 let must_error = self.should_error_on_eof();
343 self.close_read();
344 self.io.consume_leading_lines();
345 let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty();
346 if was_mid_parse || must_error {
347 debug!(
349 "parse error ({}) with {} bytes",
350 e,
351 self.io.read_buf().len()
352 );
353 match self.on_parse_error(e) {
354 Ok(()) => Poll::Pending, Err(e) => Poll::Ready(Some(Err(e))),
356 }
357 } else {
358 debug!("read eof");
359 self.close_write();
360 Poll::Ready(None)
361 }
362 }
363
364 pub(crate) fn poll_read_body(
365 &mut self,
366 cx: &mut Context<'_>,
367 ) -> Poll<Option<io::Result<Frame<Bytes>>>> {
368 debug_assert!(self.can_read_body());
369
370 let (reading, ret) = match self.state.reading {
371 Reading::Body(ref mut decoder) => {
372 match ready!(decoder.decode(cx, &mut self.io)) {
373 Ok(frame) => {
374 if frame.is_data() {
375 let slice = frame.data_ref().unwrap_or_else(|| unreachable!());
376 let (reading, maybe_frame) = if decoder.is_eof() {
377 debug!("incoming body completed");
378 (
379 Reading::KeepAlive,
380 if !slice.is_empty() {
381 Some(Ok(frame))
382 } else {
383 None
384 },
385 )
386 } else if slice.is_empty() {
387 error!("incoming body unexpectedly ended");
388 (Reading::Closed, None)
392 } else {
393 return Poll::Ready(Some(Ok(frame)));
394 };
395 (reading, Poll::Ready(maybe_frame))
396 } else if frame.is_trailers() {
397 (Reading::Closed, Poll::Ready(Some(Ok(frame))))
398 } else {
399 trace!("discarding unknown frame");
400 (Reading::Closed, Poll::Ready(None))
401 }
402 }
403 Err(e) => {
404 debug!("incoming body decode error: {}", e);
405 (Reading::Closed, Poll::Ready(Some(Err(e))))
406 }
407 }
408 }
409 Reading::Continue(ref decoder) => {
410 if let Writing::Init = self.state.writing {
412 trace!("automatically sending 100 Continue");
413 let cont = b"HTTP/1.1 100 Continue\r\n\r\n";
414 self.io.headers_buf().extend_from_slice(cont);
415 }
416
417 self.state.reading = Reading::Body(decoder.clone());
419 return self.poll_read_body(cx);
420 }
421 _ => unreachable!("poll_read_body invalid state: {:?}", self.state.reading),
422 };
423
424 self.state.reading = reading;
425 self.try_keep_alive(cx);
426 ret
427 }
428
429 pub(crate) fn wants_read_again(&mut self) -> bool {
430 let ret = self.state.notify_read;
431 self.state.notify_read = false;
432 ret
433 }
434
435 pub(crate) fn poll_read_keep_alive(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
436 debug_assert!(!self.can_read_head() && !self.can_read_body());
437
438 if self.is_read_closed() {
439 Poll::Pending
440 } else if self.is_mid_message() {
441 self.mid_message_detect_eof(cx)
442 } else {
443 self.require_empty_read(cx)
444 }
445 }
446
447 fn is_mid_message(&self) -> bool {
448 !matches!(
449 (&self.state.reading, &self.state.writing),
450 (&Reading::Init, &Writing::Init)
451 )
452 }
453
454 fn require_empty_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
459 debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
460 debug_assert!(!self.is_mid_message());
461 debug_assert!(T::is_client());
462
463 if !self.io.read_buf().is_empty() {
464 debug!("received an unexpected {} bytes", self.io.read_buf().len());
465 return Poll::Ready(Err(crate::Error::new_unexpected_message()));
466 }
467
468 let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
469
470 if num_read == 0 {
471 let ret = if self.should_error_on_eof() {
472 trace!("found unexpected EOF on busy connection: {:?}", self.state);
473 Poll::Ready(Err(crate::Error::new_incomplete()))
474 } else {
475 trace!("found EOF on idle connection, closing");
476 Poll::Ready(Ok(()))
477 };
478
479 self.state.close_read();
481 return ret;
482 }
483
484 debug!(
485 "received unexpected {} bytes on an idle connection",
486 num_read
487 );
488 Poll::Ready(Err(crate::Error::new_unexpected_message()))
489 }
490
491 fn mid_message_detect_eof(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
492 debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
493 debug_assert!(self.is_mid_message());
494
495 if self.state.allow_half_close || !self.io.read_buf().is_empty() {
496 return Poll::Pending;
497 }
498
499 let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
500
501 if num_read == 0 {
502 trace!("found unexpected EOF on busy connection: {:?}", self.state);
503 self.state.close_read();
504 Poll::Ready(Err(crate::Error::new_incomplete()))
505 } else {
506 Poll::Ready(Ok(()))
507 }
508 }
509
510 fn force_io_read(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
511 debug_assert!(!self.state.is_read_closed());
512
513 let result = ready!(self.io.poll_read_from_io(cx));
514 Poll::Ready(result.map_err(|e| {
515 trace!(error = %e, "force_io_read; io error");
516 self.state.close();
517 e
518 }))
519 }
520
521 fn maybe_notify(&mut self, cx: &mut Context<'_>) {
522 match self.state.reading {
528 Reading::Continue(..) | Reading::Body(..) | Reading::KeepAlive | Reading::Closed => {
529 return
530 }
531 Reading::Init => (),
532 };
533
534 match self.state.writing {
535 Writing::Body(..) => return,
536 Writing::Init | Writing::KeepAlive | Writing::Closed => (),
537 }
538
539 if !self.io.is_read_blocked() {
540 if self.io.read_buf().is_empty() {
541 match self.io.poll_read_from_io(cx) {
542 Poll::Ready(Ok(n)) => {
543 if n == 0 {
544 trace!("maybe_notify; read eof");
545 if self.state.is_idle() {
546 self.state.close();
547 } else {
548 self.close_read()
549 }
550 return;
551 }
552 }
553 Poll::Pending => {
554 trace!("maybe_notify; read_from_io blocked");
555 return;
556 }
557 Poll::Ready(Err(e)) => {
558 trace!("maybe_notify; read_from_io error: {}", e);
559 self.state.close();
560 self.state.error = Some(crate::Error::new_io(e));
561 }
562 }
563 }
564 self.state.notify_read = true;
565 }
566 }
567
568 fn try_keep_alive(&mut self, cx: &mut Context<'_>) {
569 self.state.try_keep_alive::<T>();
570 self.maybe_notify(cx);
571 }
572
573 pub(crate) fn can_write_head(&self) -> bool {
574 if !T::should_read_first() && matches!(self.state.reading, Reading::Closed) {
575 return false;
576 }
577
578 match self.state.writing {
579 Writing::Init => self.io.can_headers_buf(),
580 _ => false,
581 }
582 }
583
584 pub(crate) fn can_write_body(&self) -> bool {
585 match self.state.writing {
586 Writing::Body(..) => true,
587 Writing::Init | Writing::KeepAlive | Writing::Closed => false,
588 }
589 }
590
591 pub(crate) fn can_buffer_body(&self) -> bool {
592 self.io.can_buffer()
593 }
594
595 pub(crate) fn write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>) {
596 if let Some(encoder) = self.encode_head(head, body) {
597 self.state.writing = if !encoder.is_eof() {
598 Writing::Body(encoder)
599 } else if encoder.is_last() {
600 Writing::Closed
601 } else {
602 Writing::KeepAlive
603 };
604 }
605 }
606
607 fn encode_head(
608 &mut self,
609 mut head: MessageHead<T::Outgoing>,
610 body: Option<BodyLength>,
611 ) -> Option<Encoder> {
612 debug_assert!(self.can_write_head());
613
614 if !T::should_read_first() {
615 self.state.busy();
616 }
617
618 self.enforce_version(&mut head);
619
620 let buf = self.io.headers_buf();
621 match super::role::encode_headers::<T>(
622 Encode {
623 head: &mut head,
624 body,
625 #[cfg(feature = "server")]
626 keep_alive: self.state.wants_keep_alive(),
627 req_method: &mut self.state.method,
628 title_case_headers: self.state.title_case_headers,
629 #[cfg(feature = "server")]
630 date_header: self.state.date_header,
631 },
632 buf,
633 ) {
634 Ok(encoder) => {
635 debug_assert!(self.state.cached_headers.is_none());
636 debug_assert!(head.headers.is_empty());
637 self.state.cached_headers = Some(head.headers);
638
639 #[cfg(feature = "ffi")]
640 {
641 self.state.on_informational =
642 head.extensions.remove::<crate::ffi::OnInformational>();
643 }
644
645 Some(encoder)
646 }
647 Err(err) => {
648 self.state.error = Some(err);
649 self.state.writing = Writing::Closed;
650 None
651 }
652 }
653 }
654
655 fn fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>) {
657 let outgoing_is_keep_alive = head
658 .headers
659 .get(CONNECTION)
660 .map_or(false, headers::connection_keep_alive);
661
662 if !outgoing_is_keep_alive {
663 match head.version {
664 Version::HTTP_10 => self.state.disable_keep_alive(),
667 Version::HTTP_11 => {
670 if self.state.wants_keep_alive() {
671 head.headers
672 .insert(CONNECTION, HeaderValue::from_static("keep-alive"));
673 }
674 }
675 _ => (),
676 }
677 }
678 }
679
680 fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) {
683 match self.state.version {
684 Version::HTTP_10 => {
685 self.fix_keep_alive(head);
687 head.version = Version::HTTP_10;
690 }
691 Version::HTTP_11 => {
692 if let KA::Disabled = self.state.keep_alive.status() {
693 head.headers
694 .insert(CONNECTION, HeaderValue::from_static("close"));
695 }
696 }
697 _ => (),
698 }
699 }
703
704 pub(crate) fn write_body(&mut self, chunk: B) {
705 debug_assert!(self.can_write_body() && self.can_buffer_body());
706 debug_assert!(chunk.remaining() != 0);
708
709 let state = match self.state.writing {
710 Writing::Body(ref mut encoder) => {
711 self.io.buffer(encoder.encode(chunk));
712
713 if !encoder.is_eof() {
714 return;
715 }
716
717 if encoder.is_last() {
718 Writing::Closed
719 } else {
720 Writing::KeepAlive
721 }
722 }
723 _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
724 };
725
726 self.state.writing = state;
727 }
728
729 pub(crate) fn write_trailers(&mut self, trailers: HeaderMap) {
730 if T::is_server() && !self.state.allow_trailer_fields {
731 debug!("trailers not allowed to be sent");
732 return;
733 }
734 debug_assert!(self.can_write_body() && self.can_buffer_body());
735
736 match self.state.writing {
737 Writing::Body(ref encoder) => {
738 if let Some(enc_buf) =
739 encoder.encode_trailers(trailers, self.state.title_case_headers)
740 {
741 self.io.buffer(enc_buf);
742
743 self.state.writing = if encoder.is_last() || encoder.is_close_delimited() {
744 Writing::Closed
745 } else {
746 Writing::KeepAlive
747 };
748 }
749 }
750 _ => unreachable!("write_trailers invalid state: {:?}", self.state.writing),
751 }
752 }
753
754 pub(crate) fn write_body_and_end(&mut self, chunk: B) {
755 debug_assert!(self.can_write_body() && self.can_buffer_body());
756 debug_assert!(chunk.remaining() != 0);
758
759 let state = match self.state.writing {
760 Writing::Body(ref encoder) => {
761 let can_keep_alive = encoder.encode_and_end(chunk, self.io.write_buf());
762 if can_keep_alive {
763 Writing::KeepAlive
764 } else {
765 Writing::Closed
766 }
767 }
768 _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
769 };
770
771 self.state.writing = state;
772 }
773
774 pub(crate) fn end_body(&mut self) -> crate::Result<()> {
775 debug_assert!(self.can_write_body());
776
777 let encoder = match self.state.writing {
778 Writing::Body(ref mut enc) => enc,
779 _ => return Ok(()),
780 };
781
782 match encoder.end() {
784 Ok(end) => {
785 if let Some(end) = end {
786 self.io.buffer(end);
787 }
788
789 self.state.writing = if encoder.is_last() || encoder.is_close_delimited() {
790 Writing::Closed
791 } else {
792 Writing::KeepAlive
793 };
794
795 Ok(())
796 }
797 Err(not_eof) => {
798 self.state.writing = Writing::Closed;
799 Err(crate::Error::new_body_write_aborted().with(not_eof))
800 }
801 }
802 }
803
804 fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> {
810 if let Writing::Init = self.state.writing {
811 if self.has_h2_prefix() {
812 return Err(crate::Error::new_version_h2());
813 }
814 if let Some(msg) = T::on_error(&err) {
815 self.state.cached_headers.take();
818 self.write_head(msg, None);
819 self.state.error = Some(err);
820 return Ok(());
821 }
822 }
823
824 Err(err)
826 }
827
828 pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
829 ready!(Pin::new(&mut self.io).poll_flush(cx))?;
830 self.try_keep_alive(cx);
831 trace!("flushed({}): {:?}", T::LOG, self.state);
832 Poll::Ready(Ok(()))
833 }
834
835 pub(crate) fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
836 match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) {
837 Ok(()) => {
838 trace!("shut down IO complete");
839 Poll::Ready(Ok(()))
840 }
841 Err(e) => {
842 debug!("error shutting down IO: {}", e);
843 Poll::Ready(Err(e))
844 }
845 }
846 }
847
848 pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut Context<'_>) {
850 if let Reading::Continue(ref decoder) = self.state.reading {
851 self.state.reading = Reading::Body(decoder.clone());
854 }
855
856 let _ = self.poll_read_body(cx);
857
858 match self.state.reading {
860 Reading::Init | Reading::KeepAlive => {
861 trace!("body drained")
862 }
863 _ => self.close_read(),
864 }
865 }
866
867 pub(crate) fn close_read(&mut self) {
868 self.state.close_read();
869 }
870
871 pub(crate) fn close_write(&mut self) {
872 self.state.close_write();
873 }
874
875 #[cfg(feature = "server")]
876 pub(crate) fn disable_keep_alive(&mut self) {
877 if self.state.is_idle() {
878 trace!("disable_keep_alive; closing idle connection");
879 self.state.close();
880 } else {
881 trace!("disable_keep_alive; in-progress connection");
882 self.state.disable_keep_alive();
883 }
884 }
885
886 pub(crate) fn take_error(&mut self) -> crate::Result<()> {
887 if let Some(err) = self.state.error.take() {
888 Err(err)
889 } else {
890 Ok(())
891 }
892 }
893
894 pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
895 trace!("{}: prepare possible HTTP upgrade", T::LOG);
896 self.state.prepare_upgrade()
897 }
898}
899
900impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> {
901 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
902 f.debug_struct("Conn")
903 .field("state", &self.state)
904 .field("io", &self.io)
905 .finish()
906 }
907}
908
909impl<I: Unpin, B, T> Unpin for Conn<I, B, T> {}
911
912struct State {
913 allow_half_close: bool,
914 cached_headers: Option<HeaderMap>,
916 error: Option<crate::Error>,
919 keep_alive: KA,
921 method: Option<Method>,
926 h1_parser_config: ParserConfig,
927 h1_max_headers: Option<usize>,
928 #[cfg(feature = "server")]
929 h1_header_read_timeout: Option<Duration>,
930 #[cfg(feature = "server")]
931 h1_header_read_timeout_fut: Option<Pin<Box<dyn Sleep>>>,
932 #[cfg(feature = "server")]
933 h1_header_read_timeout_running: bool,
934 #[cfg(feature = "server")]
935 date_header: bool,
936 #[cfg(feature = "server")]
937 timer: Time,
938 preserve_header_case: bool,
939 #[cfg(feature = "ffi")]
940 preserve_header_order: bool,
941 title_case_headers: bool,
942 h09_responses: bool,
943 #[cfg(feature = "ffi")]
947 on_informational: Option<crate::ffi::OnInformational>,
948 notify_read: bool,
951 reading: Reading,
953 writing: Writing,
955 upgrade: Option<crate::upgrade::Pending>,
957 version: Version,
959 allow_trailer_fields: bool,
961}
962
963#[derive(Debug)]
964enum Reading {
965 Init,
966 Continue(Decoder),
967 Body(Decoder),
968 KeepAlive,
969 Closed,
970}
971
972enum Writing {
973 Init,
974 Body(Encoder),
975 KeepAlive,
976 Closed,
977}
978
979impl fmt::Debug for State {
980 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
981 let mut builder = f.debug_struct("State");
982 builder
983 .field("reading", &self.reading)
984 .field("writing", &self.writing)
985 .field("keep_alive", &self.keep_alive);
986
987 if let Some(ref error) = self.error {
989 builder.field("error", error);
990 }
991
992 if self.allow_half_close {
993 builder.field("allow_half_close", &true);
994 }
995
996 builder.finish()
999 }
1000}
1001
1002impl fmt::Debug for Writing {
1003 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1004 match *self {
1005 Writing::Init => f.write_str("Init"),
1006 Writing::Body(ref enc) => f.debug_tuple("Body").field(enc).finish(),
1007 Writing::KeepAlive => f.write_str("KeepAlive"),
1008 Writing::Closed => f.write_str("Closed"),
1009 }
1010 }
1011}
1012
1013impl std::ops::BitAndAssign<bool> for KA {
1014 fn bitand_assign(&mut self, enabled: bool) {
1015 if !enabled {
1016 trace!("remote disabling keep-alive");
1017 *self = KA::Disabled;
1018 }
1019 }
1020}
1021
1022#[derive(Clone, Copy, Debug, Default)]
1023enum KA {
1024 Idle,
1025 #[default]
1026 Busy,
1027 Disabled,
1028}
1029
1030impl KA {
1031 fn idle(&mut self) {
1032 *self = KA::Idle;
1033 }
1034
1035 fn busy(&mut self) {
1036 *self = KA::Busy;
1037 }
1038
1039 fn disable(&mut self) {
1040 *self = KA::Disabled;
1041 }
1042
1043 fn status(&self) -> KA {
1044 *self
1045 }
1046}
1047
1048impl State {
1049 fn close(&mut self) {
1050 trace!("State::close()");
1051 self.reading = Reading::Closed;
1052 self.writing = Writing::Closed;
1053 self.keep_alive.disable();
1054 }
1055
1056 fn close_read(&mut self) {
1057 trace!("State::close_read()");
1058 self.reading = Reading::Closed;
1059 self.keep_alive.disable();
1060 }
1061
1062 fn close_write(&mut self) {
1063 trace!("State::close_write()");
1064 self.writing = Writing::Closed;
1065 self.keep_alive.disable();
1066 }
1067
1068 fn wants_keep_alive(&self) -> bool {
1069 !matches!(self.keep_alive.status(), KA::Disabled)
1070 }
1071
1072 fn try_keep_alive<T: Http1Transaction>(&mut self) {
1073 match (&self.reading, &self.writing) {
1074 (&Reading::KeepAlive, &Writing::KeepAlive) => {
1075 if let KA::Busy = self.keep_alive.status() {
1076 self.idle::<T>();
1077 } else {
1078 trace!(
1079 "try_keep_alive({}): could keep-alive, but status = {:?}",
1080 T::LOG,
1081 self.keep_alive
1082 );
1083 self.close();
1084 }
1085 }
1086 (&Reading::Closed, &Writing::KeepAlive) | (&Reading::KeepAlive, &Writing::Closed) => {
1087 self.close()
1088 }
1089 _ => (),
1090 }
1091 }
1092
1093 fn disable_keep_alive(&mut self) {
1094 self.keep_alive.disable()
1095 }
1096
1097 fn busy(&mut self) {
1098 if let KA::Disabled = self.keep_alive.status() {
1099 return;
1100 }
1101 self.keep_alive.busy();
1102 }
1103
1104 fn idle<T: Http1Transaction>(&mut self) {
1105 debug_assert!(!self.is_idle(), "State::idle() called while idle");
1106
1107 self.method = None;
1108 self.keep_alive.idle();
1109
1110 if !self.is_idle() {
1111 self.close();
1112 return;
1113 }
1114
1115 self.reading = Reading::Init;
1116 self.writing = Writing::Init;
1117
1118 if !T::should_read_first() {
1124 self.notify_read = true;
1125 }
1126 }
1127
1128 fn is_idle(&self) -> bool {
1129 matches!(self.keep_alive.status(), KA::Idle)
1130 }
1131
1132 fn is_read_closed(&self) -> bool {
1133 matches!(self.reading, Reading::Closed)
1134 }
1135
1136 fn is_write_closed(&self) -> bool {
1137 matches!(self.writing, Writing::Closed)
1138 }
1139
1140 fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
1141 let (tx, rx) = crate::upgrade::pending();
1142 self.upgrade = Some(tx);
1143 rx
1144 }
1145}
1146
1147#[cfg(test)]
1148mod tests {
1149 #[cfg(all(feature = "nightly", not(miri)))]
1150 #[bench]
1151 fn bench_read_head_short(b: &mut ::test::Bencher) {
1152 use super::*;
1153 use crate::common::io::Compat;
1154 let s = b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n";
1155 let len = s.len();
1156 b.bytes = len as u64;
1157
1158 let io = Compat(tokio_test::io::Builder::new().build());
1160 let mut conn = Conn::<_, bytes::Bytes, crate::proto::h1::ServerTransaction>::new(io);
1161 *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]);
1162 conn.state.cached_headers = Some(HeaderMap::with_capacity(2));
1163
1164 let rt = tokio::runtime::Builder::new_current_thread()
1165 .enable_all()
1166 .build()
1167 .unwrap();
1168
1169 b.iter(|| {
1170 rt.block_on(futures_util::future::poll_fn(|cx| {
1171 match conn.poll_read_head(cx) {
1172 Poll::Ready(Some(Ok(x))) => {
1173 ::test::black_box(&x);
1174 let mut headers = x.0.headers;
1175 headers.clear();
1176 conn.state.cached_headers = Some(headers);
1177 }
1178 f => panic!("expected Ready(Some(Ok(..))): {:?}", f),
1179 }
1180
1181 conn.io.read_buf_mut().reserve(1);
1182 unsafe {
1183 conn.io.read_buf_mut().set_len(len);
1184 }
1185 conn.state.reading = Reading::Init;
1186 Poll::Ready(())
1187 }));
1188 });
1189 }
1190
1191 }