hyper/proto/h1/
conn.rs

1use std::fmt;
2#[cfg(feature = "server")]
3use std::future::Future;
4use std::io;
5use std::marker::{PhantomData, Unpin};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8#[cfg(feature = "server")]
9use std::time::{Duration, Instant};
10
11use crate::rt::{Read, Write};
12use bytes::{Buf, Bytes};
13use futures_util::ready;
14use http::header::{HeaderValue, CONNECTION, TE};
15use http::{HeaderMap, Method, Version};
16use http_body::Frame;
17use httparse::ParserConfig;
18
19use super::io::Buffered;
20use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext, Wants};
21use crate::body::DecodedLength;
22#[cfg(feature = "server")]
23use crate::common::time::Time;
24use crate::headers;
25use crate::proto::{BodyLength, MessageHead};
26#[cfg(feature = "server")]
27use crate::rt::Sleep;
28
/// The client connection preface that opens an HTTP/2 connection.
///
/// Finding these bytes at the front of the read buffer after a parse
/// failure means the peer is speaking HTTP/2 to this HTTP/1 connection.
const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
30
31/// This handles a connection, which will have been established over an
32/// `Read + Write` (like a socket), and will likely include multiple
33/// `Transaction`s over HTTP.
34///
35/// The connection will determine when a message begins and ends as well as
36/// determine if this connection can be kept alive after the message,
37/// or if it is complete.
38pub(crate) struct Conn<I, B, T> {
39    io: Buffered<I, EncodedBuf<B>>,
40    state: State,
41    _marker: PhantomData<fn(T)>,
42}
43
44impl<I, B, T> Conn<I, B, T>
45where
46    I: Read + Write + Unpin,
47    B: Buf,
48    T: Http1Transaction,
49{
50    pub(crate) fn new(io: I) -> Conn<I, B, T> {
51        Conn {
52            io: Buffered::new(io),
53            state: State {
54                allow_half_close: false,
55                cached_headers: None,
56                error: None,
57                keep_alive: KA::Busy,
58                method: None,
59                h1_parser_config: ParserConfig::default(),
60                h1_max_headers: None,
61                #[cfg(feature = "server")]
62                h1_header_read_timeout: None,
63                #[cfg(feature = "server")]
64                h1_header_read_timeout_fut: None,
65                #[cfg(feature = "server")]
66                h1_header_read_timeout_running: false,
67                #[cfg(feature = "server")]
68                date_header: true,
69                #[cfg(feature = "server")]
70                timer: Time::Empty,
71                preserve_header_case: false,
72                #[cfg(feature = "ffi")]
73                preserve_header_order: false,
74                title_case_headers: false,
75                h09_responses: false,
76                #[cfg(feature = "ffi")]
77                on_informational: None,
78                notify_read: false,
79                reading: Reading::Init,
80                writing: Writing::Init,
81                upgrade: None,
82                // We assume a modern world where the remote speaks HTTP/1.1.
83                // If they tell us otherwise, we'll downgrade in `read_head`.
84                version: Version::HTTP_11,
85                allow_trailer_fields: false,
86            },
87            _marker: PhantomData,
88        }
89    }
90
91    #[cfg(feature = "server")]
92    pub(crate) fn set_timer(&mut self, timer: Time) {
93        self.state.timer = timer;
94    }
95
96    #[cfg(feature = "server")]
97    pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) {
98        self.io.set_flush_pipeline(enabled);
99    }
100
101    pub(crate) fn set_write_strategy_queue(&mut self) {
102        self.io.set_write_strategy_queue();
103    }
104
105    pub(crate) fn set_max_buf_size(&mut self, max: usize) {
106        self.io.set_max_buf_size(max);
107    }
108
109    #[cfg(feature = "client")]
110    pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) {
111        self.io.set_read_buf_exact_size(sz);
112    }
113
114    pub(crate) fn set_write_strategy_flatten(&mut self) {
115        self.io.set_write_strategy_flatten();
116    }
117
118    #[cfg(feature = "client")]
119    pub(crate) fn set_h1_parser_config(&mut self, parser_config: ParserConfig) {
120        self.state.h1_parser_config = parser_config;
121    }
122
123    pub(crate) fn set_title_case_headers(&mut self) {
124        self.state.title_case_headers = true;
125    }
126
127    pub(crate) fn set_preserve_header_case(&mut self) {
128        self.state.preserve_header_case = true;
129    }
130
131    #[cfg(feature = "ffi")]
132    pub(crate) fn set_preserve_header_order(&mut self) {
133        self.state.preserve_header_order = true;
134    }
135
136    #[cfg(feature = "client")]
137    pub(crate) fn set_h09_responses(&mut self) {
138        self.state.h09_responses = true;
139    }
140
141    pub(crate) fn set_http1_max_headers(&mut self, val: usize) {
142        self.state.h1_max_headers = Some(val);
143    }
144
145    #[cfg(feature = "server")]
146    pub(crate) fn set_http1_header_read_timeout(&mut self, val: Duration) {
147        self.state.h1_header_read_timeout = Some(val);
148    }
149
150    #[cfg(feature = "server")]
151    pub(crate) fn set_allow_half_close(&mut self) {
152        self.state.allow_half_close = true;
153    }
154
155    #[cfg(feature = "server")]
156    pub(crate) fn disable_date_header(&mut self) {
157        self.state.date_header = false;
158    }
159
160    pub(crate) fn into_inner(self) -> (I, Bytes) {
161        self.io.into_inner()
162    }
163
164    pub(crate) fn pending_upgrade(&mut self) -> Option<crate::upgrade::Pending> {
165        self.state.upgrade.take()
166    }
167
168    pub(crate) fn is_read_closed(&self) -> bool {
169        self.state.is_read_closed()
170    }
171
172    pub(crate) fn is_write_closed(&self) -> bool {
173        self.state.is_write_closed()
174    }
175
176    pub(crate) fn can_read_head(&self) -> bool {
177        if !matches!(self.state.reading, Reading::Init) {
178            return false;
179        }
180
181        if T::should_read_first() {
182            return true;
183        }
184
185        !matches!(self.state.writing, Writing::Init)
186    }
187
188    pub(crate) fn can_read_body(&self) -> bool {
189        matches!(
190            self.state.reading,
191            Reading::Body(..) | Reading::Continue(..)
192        )
193    }
194
195    #[cfg(feature = "server")]
196    pub(crate) fn has_initial_read_write_state(&self) -> bool {
197        matches!(self.state.reading, Reading::Init)
198            && matches!(self.state.writing, Writing::Init)
199            && self.io.read_buf().is_empty()
200    }
201
202    fn should_error_on_eof(&self) -> bool {
203        // If we're idle, it's probably just the connection closing gracefully.
204        T::should_error_on_parse_eof() && !self.state.is_idle()
205    }
206
207    fn has_h2_prefix(&self) -> bool {
208        let read_buf = self.io.read_buf();
209        read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE
210    }
211
212    pub(super) fn poll_read_head(
213        &mut self,
214        cx: &mut Context<'_>,
215    ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> {
216        debug_assert!(self.can_read_head());
217        trace!("Conn::read_head");
218
219        #[cfg(feature = "server")]
220        if !self.state.h1_header_read_timeout_running {
221            if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout {
222                let deadline = Instant::now() + h1_header_read_timeout;
223                self.state.h1_header_read_timeout_running = true;
224                match self.state.h1_header_read_timeout_fut {
225                    Some(ref mut h1_header_read_timeout_fut) => {
226                        trace!("resetting h1 header read timeout timer");
227                        self.state.timer.reset(h1_header_read_timeout_fut, deadline);
228                    }
229                    None => {
230                        trace!("setting h1 header read timeout timer");
231                        self.state.h1_header_read_timeout_fut =
232                            Some(self.state.timer.sleep_until(deadline));
233                    }
234                }
235            }
236        }
237
238        let msg = match self.io.parse::<T>(
239            cx,
240            ParseContext {
241                cached_headers: &mut self.state.cached_headers,
242                req_method: &mut self.state.method,
243                h1_parser_config: self.state.h1_parser_config.clone(),
244                h1_max_headers: self.state.h1_max_headers,
245                preserve_header_case: self.state.preserve_header_case,
246                #[cfg(feature = "ffi")]
247                preserve_header_order: self.state.preserve_header_order,
248                h09_responses: self.state.h09_responses,
249                #[cfg(feature = "ffi")]
250                on_informational: &mut self.state.on_informational,
251            },
252        ) {
253            Poll::Ready(Ok(msg)) => msg,
254            Poll::Ready(Err(e)) => return self.on_read_head_error(e),
255            Poll::Pending => {
256                #[cfg(feature = "server")]
257                if self.state.h1_header_read_timeout_running {
258                    if let Some(ref mut h1_header_read_timeout_fut) =
259                        self.state.h1_header_read_timeout_fut
260                    {
261                        if Pin::new(h1_header_read_timeout_fut).poll(cx).is_ready() {
262                            self.state.h1_header_read_timeout_running = false;
263
264                            warn!("read header from client timeout");
265                            return Poll::Ready(Some(Err(crate::Error::new_header_timeout())));
266                        }
267                    }
268                }
269
270                return Poll::Pending;
271            }
272        };
273
274        #[cfg(feature = "server")]
275        {
276            self.state.h1_header_read_timeout_running = false;
277            self.state.h1_header_read_timeout_fut = None;
278        }
279
280        // Note: don't deconstruct `msg` into local variables, it appears
281        // the optimizer doesn't remove the extra copies.
282
283        debug!("incoming body is {}", msg.decode);
284
285        // Prevent accepting HTTP/0.9 responses after the initial one, if any.
286        self.state.h09_responses = false;
287
288        // Drop any OnInformational callbacks, we're done there!
289        #[cfg(feature = "ffi")]
290        {
291            self.state.on_informational = None;
292        }
293
294        self.state.busy();
295        self.state.keep_alive &= msg.keep_alive;
296        self.state.version = msg.head.version;
297
298        let mut wants = if msg.wants_upgrade {
299            Wants::UPGRADE
300        } else {
301            Wants::EMPTY
302        };
303
304        if msg.decode == DecodedLength::ZERO {
305            if msg.expect_continue {
306                debug!("ignoring expect-continue since body is empty");
307            }
308            self.state.reading = Reading::KeepAlive;
309            if !T::should_read_first() {
310                self.try_keep_alive(cx);
311            }
312        } else if msg.expect_continue && msg.head.version.gt(&Version::HTTP_10) {
313            let h1_max_header_size = None; // TODO: remove this when we land h1_max_header_size support
314            self.state.reading = Reading::Continue(Decoder::new(
315                msg.decode,
316                self.state.h1_max_headers,
317                h1_max_header_size,
318            ));
319            wants = wants.add(Wants::EXPECT);
320        } else {
321            let h1_max_header_size = None; // TODO: remove this when we land h1_max_header_size support
322            self.state.reading = Reading::Body(Decoder::new(
323                msg.decode,
324                self.state.h1_max_headers,
325                h1_max_header_size,
326            ));
327        }
328
329        self.state.allow_trailer_fields = msg
330            .head
331            .headers
332            .get(TE)
333            .map_or(false, |te_header| te_header == "trailers");
334
335        Poll::Ready(Some(Ok((msg.head, msg.decode, wants))))
336    }
337
338    fn on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>> {
339        // If we are currently waiting on a message, then an empty
340        // message should be reported as an error. If not, it is just
341        // the connection closing gracefully.
342        let must_error = self.should_error_on_eof();
343        self.close_read();
344        self.io.consume_leading_lines();
345        let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty();
346        if was_mid_parse || must_error {
347            // We check if the buf contains the h2 Preface
348            debug!(
349                "parse error ({}) with {} bytes",
350                e,
351                self.io.read_buf().len()
352            );
353            match self.on_parse_error(e) {
354                Ok(()) => Poll::Pending, // XXX: wat?
355                Err(e) => Poll::Ready(Some(Err(e))),
356            }
357        } else {
358            debug!("read eof");
359            self.close_write();
360            Poll::Ready(None)
361        }
362    }
363
364    pub(crate) fn poll_read_body(
365        &mut self,
366        cx: &mut Context<'_>,
367    ) -> Poll<Option<io::Result<Frame<Bytes>>>> {
368        debug_assert!(self.can_read_body());
369
370        let (reading, ret) = match self.state.reading {
371            Reading::Body(ref mut decoder) => {
372                match ready!(decoder.decode(cx, &mut self.io)) {
373                    Ok(frame) => {
374                        if frame.is_data() {
375                            let slice = frame.data_ref().unwrap_or_else(|| unreachable!());
376                            let (reading, maybe_frame) = if decoder.is_eof() {
377                                debug!("incoming body completed");
378                                (
379                                    Reading::KeepAlive,
380                                    if !slice.is_empty() {
381                                        Some(Ok(frame))
382                                    } else {
383                                        None
384                                    },
385                                )
386                            } else if slice.is_empty() {
387                                error!("incoming body unexpectedly ended");
388                                // This should be unreachable, since all 3 decoders
389                                // either set eof=true or return an Err when reading
390                                // an empty slice...
391                                (Reading::Closed, None)
392                            } else {
393                                return Poll::Ready(Some(Ok(frame)));
394                            };
395                            (reading, Poll::Ready(maybe_frame))
396                        } else if frame.is_trailers() {
397                            (Reading::Closed, Poll::Ready(Some(Ok(frame))))
398                        } else {
399                            trace!("discarding unknown frame");
400                            (Reading::Closed, Poll::Ready(None))
401                        }
402                    }
403                    Err(e) => {
404                        debug!("incoming body decode error: {}", e);
405                        (Reading::Closed, Poll::Ready(Some(Err(e))))
406                    }
407                }
408            }
409            Reading::Continue(ref decoder) => {
410                // Write the 100 Continue if not already responded...
411                if let Writing::Init = self.state.writing {
412                    trace!("automatically sending 100 Continue");
413                    let cont = b"HTTP/1.1 100 Continue\r\n\r\n";
414                    self.io.headers_buf().extend_from_slice(cont);
415                }
416
417                // And now recurse once in the Reading::Body state...
418                self.state.reading = Reading::Body(decoder.clone());
419                return self.poll_read_body(cx);
420            }
421            _ => unreachable!("poll_read_body invalid state: {:?}", self.state.reading),
422        };
423
424        self.state.reading = reading;
425        self.try_keep_alive(cx);
426        ret
427    }
428
429    pub(crate) fn wants_read_again(&mut self) -> bool {
430        let ret = self.state.notify_read;
431        self.state.notify_read = false;
432        ret
433    }
434
435    pub(crate) fn poll_read_keep_alive(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
436        debug_assert!(!self.can_read_head() && !self.can_read_body());
437
438        if self.is_read_closed() {
439            Poll::Pending
440        } else if self.is_mid_message() {
441            self.mid_message_detect_eof(cx)
442        } else {
443            self.require_empty_read(cx)
444        }
445    }
446
447    fn is_mid_message(&self) -> bool {
448        !matches!(
449            (&self.state.reading, &self.state.writing),
450            (&Reading::Init, &Writing::Init)
451        )
452    }
453
454    // This will check to make sure the io object read is empty.
455    //
456    // This should only be called for Clients wanting to enter the idle
457    // state.
458    fn require_empty_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
459        debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
460        debug_assert!(!self.is_mid_message());
461        debug_assert!(T::is_client());
462
463        if !self.io.read_buf().is_empty() {
464            debug!("received an unexpected {} bytes", self.io.read_buf().len());
465            return Poll::Ready(Err(crate::Error::new_unexpected_message()));
466        }
467
468        let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
469
470        if num_read == 0 {
471            let ret = if self.should_error_on_eof() {
472                trace!("found unexpected EOF on busy connection: {:?}", self.state);
473                Poll::Ready(Err(crate::Error::new_incomplete()))
474            } else {
475                trace!("found EOF on idle connection, closing");
476                Poll::Ready(Ok(()))
477            };
478
479            // order is important: should_error needs state BEFORE close_read
480            self.state.close_read();
481            return ret;
482        }
483
484        debug!(
485            "received unexpected {} bytes on an idle connection",
486            num_read
487        );
488        Poll::Ready(Err(crate::Error::new_unexpected_message()))
489    }
490
491    fn mid_message_detect_eof(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
492        debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
493        debug_assert!(self.is_mid_message());
494
495        if self.state.allow_half_close || !self.io.read_buf().is_empty() {
496            return Poll::Pending;
497        }
498
499        let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
500
501        if num_read == 0 {
502            trace!("found unexpected EOF on busy connection: {:?}", self.state);
503            self.state.close_read();
504            Poll::Ready(Err(crate::Error::new_incomplete()))
505        } else {
506            Poll::Ready(Ok(()))
507        }
508    }
509
510    fn force_io_read(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
511        debug_assert!(!self.state.is_read_closed());
512
513        let result = ready!(self.io.poll_read_from_io(cx));
514        Poll::Ready(result.map_err(|e| {
515            trace!(error = %e, "force_io_read; io error");
516            self.state.close();
517            e
518        }))
519    }
520
521    fn maybe_notify(&mut self, cx: &mut Context<'_>) {
522        // its possible that we returned NotReady from poll() without having
523        // exhausted the underlying Io. We would have done this when we
524        // determined we couldn't keep reading until we knew how writing
525        // would finish.
526
527        match self.state.reading {
528            Reading::Continue(..) | Reading::Body(..) | Reading::KeepAlive | Reading::Closed => {
529                return
530            }
531            Reading::Init => (),
532        };
533
534        match self.state.writing {
535            Writing::Body(..) => return,
536            Writing::Init | Writing::KeepAlive | Writing::Closed => (),
537        }
538
539        if !self.io.is_read_blocked() {
540            if self.io.read_buf().is_empty() {
541                match self.io.poll_read_from_io(cx) {
542                    Poll::Ready(Ok(n)) => {
543                        if n == 0 {
544                            trace!("maybe_notify; read eof");
545                            if self.state.is_idle() {
546                                self.state.close();
547                            } else {
548                                self.close_read()
549                            }
550                            return;
551                        }
552                    }
553                    Poll::Pending => {
554                        trace!("maybe_notify; read_from_io blocked");
555                        return;
556                    }
557                    Poll::Ready(Err(e)) => {
558                        trace!("maybe_notify; read_from_io error: {}", e);
559                        self.state.close();
560                        self.state.error = Some(crate::Error::new_io(e));
561                    }
562                }
563            }
564            self.state.notify_read = true;
565        }
566    }
567
568    fn try_keep_alive(&mut self, cx: &mut Context<'_>) {
569        self.state.try_keep_alive::<T>();
570        self.maybe_notify(cx);
571    }
572
573    pub(crate) fn can_write_head(&self) -> bool {
574        if !T::should_read_first() && matches!(self.state.reading, Reading::Closed) {
575            return false;
576        }
577
578        match self.state.writing {
579            Writing::Init => self.io.can_headers_buf(),
580            _ => false,
581        }
582    }
583
584    pub(crate) fn can_write_body(&self) -> bool {
585        match self.state.writing {
586            Writing::Body(..) => true,
587            Writing::Init | Writing::KeepAlive | Writing::Closed => false,
588        }
589    }
590
591    pub(crate) fn can_buffer_body(&self) -> bool {
592        self.io.can_buffer()
593    }
594
595    pub(crate) fn write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>) {
596        if let Some(encoder) = self.encode_head(head, body) {
597            self.state.writing = if !encoder.is_eof() {
598                Writing::Body(encoder)
599            } else if encoder.is_last() {
600                Writing::Closed
601            } else {
602                Writing::KeepAlive
603            };
604        }
605    }
606
607    fn encode_head(
608        &mut self,
609        mut head: MessageHead<T::Outgoing>,
610        body: Option<BodyLength>,
611    ) -> Option<Encoder> {
612        debug_assert!(self.can_write_head());
613
614        if !T::should_read_first() {
615            self.state.busy();
616        }
617
618        self.enforce_version(&mut head);
619
620        let buf = self.io.headers_buf();
621        match super::role::encode_headers::<T>(
622            Encode {
623                head: &mut head,
624                body,
625                #[cfg(feature = "server")]
626                keep_alive: self.state.wants_keep_alive(),
627                req_method: &mut self.state.method,
628                title_case_headers: self.state.title_case_headers,
629                #[cfg(feature = "server")]
630                date_header: self.state.date_header,
631            },
632            buf,
633        ) {
634            Ok(encoder) => {
635                debug_assert!(self.state.cached_headers.is_none());
636                debug_assert!(head.headers.is_empty());
637                self.state.cached_headers = Some(head.headers);
638
639                #[cfg(feature = "ffi")]
640                {
641                    self.state.on_informational =
642                        head.extensions.remove::<crate::ffi::OnInformational>();
643                }
644
645                Some(encoder)
646            }
647            Err(err) => {
648                self.state.error = Some(err);
649                self.state.writing = Writing::Closed;
650                None
651            }
652        }
653    }
654
655    // Fix keep-alive when Connection: keep-alive header is not present
656    fn fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>) {
657        let outgoing_is_keep_alive = head
658            .headers
659            .get(CONNECTION)
660            .map_or(false, headers::connection_keep_alive);
661
662        if !outgoing_is_keep_alive {
663            match head.version {
664                // If response is version 1.0 and keep-alive is not present in the response,
665                // disable keep-alive so the server closes the connection
666                Version::HTTP_10 => self.state.disable_keep_alive(),
667                // If response is version 1.1 and keep-alive is wanted, add
668                // Connection: keep-alive header when not present
669                Version::HTTP_11 => {
670                    if self.state.wants_keep_alive() {
671                        head.headers
672                            .insert(CONNECTION, HeaderValue::from_static("keep-alive"));
673                    }
674                }
675                _ => (),
676            }
677        }
678    }
679
680    // If we know the remote speaks an older version, we try to fix up any messages
681    // to work with our older peer.
682    fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) {
683        match self.state.version {
684            Version::HTTP_10 => {
685                // Fixes response or connection when keep-alive header is not present
686                self.fix_keep_alive(head);
687                // If the remote only knows HTTP/1.0, we should force ourselves
688                // to do only speak HTTP/1.0 as well.
689                head.version = Version::HTTP_10;
690            }
691            Version::HTTP_11 => {
692                if let KA::Disabled = self.state.keep_alive.status() {
693                    head.headers
694                        .insert(CONNECTION, HeaderValue::from_static("close"));
695                }
696            }
697            _ => (),
698        }
699        // If the remote speaks HTTP/1.1, then it *should* be fine with
700        // both HTTP/1.0 and HTTP/1.1 from us. So again, we just let
701        // the user's headers be.
702    }
703
704    pub(crate) fn write_body(&mut self, chunk: B) {
705        debug_assert!(self.can_write_body() && self.can_buffer_body());
706        // empty chunks should be discarded at Dispatcher level
707        debug_assert!(chunk.remaining() != 0);
708
709        let state = match self.state.writing {
710            Writing::Body(ref mut encoder) => {
711                self.io.buffer(encoder.encode(chunk));
712
713                if !encoder.is_eof() {
714                    return;
715                }
716
717                if encoder.is_last() {
718                    Writing::Closed
719                } else {
720                    Writing::KeepAlive
721                }
722            }
723            _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
724        };
725
726        self.state.writing = state;
727    }
728
729    pub(crate) fn write_trailers(&mut self, trailers: HeaderMap) {
730        if T::is_server() && !self.state.allow_trailer_fields {
731            debug!("trailers not allowed to be sent");
732            return;
733        }
734        debug_assert!(self.can_write_body() && self.can_buffer_body());
735
736        match self.state.writing {
737            Writing::Body(ref encoder) => {
738                if let Some(enc_buf) =
739                    encoder.encode_trailers(trailers, self.state.title_case_headers)
740                {
741                    self.io.buffer(enc_buf);
742
743                    self.state.writing = if encoder.is_last() || encoder.is_close_delimited() {
744                        Writing::Closed
745                    } else {
746                        Writing::KeepAlive
747                    };
748                }
749            }
750            _ => unreachable!("write_trailers invalid state: {:?}", self.state.writing),
751        }
752    }
753
754    pub(crate) fn write_body_and_end(&mut self, chunk: B) {
755        debug_assert!(self.can_write_body() && self.can_buffer_body());
756        // empty chunks should be discarded at Dispatcher level
757        debug_assert!(chunk.remaining() != 0);
758
759        let state = match self.state.writing {
760            Writing::Body(ref encoder) => {
761                let can_keep_alive = encoder.encode_and_end(chunk, self.io.write_buf());
762                if can_keep_alive {
763                    Writing::KeepAlive
764                } else {
765                    Writing::Closed
766                }
767            }
768            _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
769        };
770
771        self.state.writing = state;
772    }
773
774    pub(crate) fn end_body(&mut self) -> crate::Result<()> {
775        debug_assert!(self.can_write_body());
776
777        let encoder = match self.state.writing {
778            Writing::Body(ref mut enc) => enc,
779            _ => return Ok(()),
780        };
781
782        // end of stream, that means we should try to eof
783        match encoder.end() {
784            Ok(end) => {
785                if let Some(end) = end {
786                    self.io.buffer(end);
787                }
788
789                self.state.writing = if encoder.is_last() || encoder.is_close_delimited() {
790                    Writing::Closed
791                } else {
792                    Writing::KeepAlive
793                };
794
795                Ok(())
796            }
797            Err(not_eof) => {
798                self.state.writing = Writing::Closed;
799                Err(crate::Error::new_body_write_aborted().with(not_eof))
800            }
801        }
802    }
803
804    // When we get a parse error, depending on what side we are, we might be able
805    // to write a response before closing the connection.
806    //
807    // - Client: there is nothing we can do
808    // - Server: if Response hasn't been written yet, we can send a 4xx response
809    fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> {
810        if let Writing::Init = self.state.writing {
811            if self.has_h2_prefix() {
812                return Err(crate::Error::new_version_h2());
813            }
814            if let Some(msg) = T::on_error(&err) {
815                // Drop the cached headers so as to not trigger a debug
816                // assert in `write_head`...
817                self.state.cached_headers.take();
818                self.write_head(msg, None);
819                self.state.error = Some(err);
820                return Ok(());
821            }
822        }
823
824        // fallback is pass the error back up
825        Err(err)
826    }
827
828    pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
829        ready!(Pin::new(&mut self.io).poll_flush(cx))?;
830        self.try_keep_alive(cx);
831        trace!("flushed({}): {:?}", T::LOG, self.state);
832        Poll::Ready(Ok(()))
833    }
834
835    pub(crate) fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
836        match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) {
837            Ok(()) => {
838                trace!("shut down IO complete");
839                Poll::Ready(Ok(()))
840            }
841            Err(e) => {
842                debug!("error shutting down IO: {}", e);
843                Poll::Ready(Err(e))
844            }
845        }
846    }
847
848    /// If the read side can be cheaply drained, do so. Otherwise, close.
849    pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut Context<'_>) {
850        if let Reading::Continue(ref decoder) = self.state.reading {
851            // skip sending the 100-continue
852            // just move forward to a read, in case a tiny body was included
853            self.state.reading = Reading::Body(decoder.clone());
854        }
855
856        let _ = self.poll_read_body(cx);
857
858        // If still in Reading::Body, just give up
859        match self.state.reading {
860            Reading::Init | Reading::KeepAlive => {
861                trace!("body drained")
862            }
863            _ => self.close_read(),
864        }
865    }
866
    /// Closes the read half of the connection.
    ///
    /// Delegates to `State::close_read`, which marks reading as closed and
    /// disables keep-alive.
    pub(crate) fn close_read(&mut self) {
        self.state.close_read();
    }
870
    /// Closes the write half of the connection.
    ///
    /// Delegates to `State::close_write`, which marks writing as closed and
    /// disables keep-alive.
    pub(crate) fn close_write(&mut self) {
        self.state.close_write();
    }
874
875    #[cfg(feature = "server")]
876    pub(crate) fn disable_keep_alive(&mut self) {
877        if self.state.is_idle() {
878            trace!("disable_keep_alive; closing idle connection");
879            self.state.close();
880        } else {
881            trace!("disable_keep_alive; in-progress connection");
882            self.state.disable_keep_alive();
883        }
884    }
885
886    pub(crate) fn take_error(&mut self) -> crate::Result<()> {
887        if let Some(err) = self.state.error.take() {
888            Err(err)
889        } else {
890            Ok(())
891        }
892    }
893
    /// Prepares the connection for a possible HTTP upgrade.
    ///
    /// Delegates to `State::prepare_upgrade`, which stores the pending half
    /// of a fresh upgrade pair in the state, and returns the `OnUpgrade`
    /// half to the caller.
    pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
        trace!("{}: prepare possible HTTP upgrade", T::LOG);
        self.state.prepare_upgrade()
    }
898}
899
900impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> {
901    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
902        f.debug_struct("Conn")
903            .field("state", &self.state)
904            .field("io", &self.io)
905            .finish()
906    }
907}
908
// `Conn` is `Unpin` whenever the IO type `I` is, regardless of `B` and `T`:
// `T` only appears in the `PhantomData<fn(T)>` marker field, and values of
// `B` are never pinned.
impl<I: Unpin, B, T> Unpin for Conn<I, B, T> {}
911
/// Per-connection bookkeeping for `Conn`: configuration flags together with
/// the current positions of the read, write and keep-alive state machines.
struct State {
    /// Only reported by the `Debug` impl when it is `true`.
    ///
    /// NOTE(review): presumably allows the connection to keep going after
    /// the peer closed its half; the use site is outside this part of the
    /// file, so confirm there.
    allow_half_close: bool,
    /// Re-usable HeaderMap to reduce allocating new ones.
    cached_headers: Option<HeaderMap>,
    /// If an error occurs when there wasn't a direct way to return it
    /// back to the user, this is set.
    ///
    /// `Conn::take_error` removes and returns it.
    error: Option<crate::Error>,
    /// Current keep-alive status.
    keep_alive: KA,
    /// If mid-message, the HTTP Method that started it.
    ///
    /// This is used to know things such as if the message can include
    /// a body or not. Reset to `None` when the state goes idle.
    method: Option<Method>,
    /// `httparse` parser configuration.
    /// NOTE(review): presumably applied when parsing message heads; confirm.
    h1_parser_config: ParserConfig,
    /// Optional limit on the number of headers.
    /// NOTE(review): presumably enforced by the head parser; confirm.
    h1_max_headers: Option<usize>,
    /// Optional duration allowed for reading a request head (server only).
    /// NOTE(review): semantics inferred from the name; confirm at use site.
    #[cfg(feature = "server")]
    h1_header_read_timeout: Option<Duration>,
    /// Sleep future backing the header read timeout, if one was created.
    #[cfg(feature = "server")]
    h1_header_read_timeout_fut: Option<Pin<Box<dyn Sleep>>>,
    /// NOTE(review): presumably `true` while the header read timeout is
    /// armed; confirm at use site.
    #[cfg(feature = "server")]
    h1_header_read_timeout_running: bool,
    /// NOTE(review): presumably controls whether a `Date` header is added
    /// to responses; confirm at use site.
    #[cfg(feature = "server")]
    date_header: bool,
    /// Timer handle (server only).
    /// NOTE(review): presumably used to create the header read timeout.
    #[cfg(feature = "server")]
    timer: Time,
    /// NOTE(review): header-formatting option, presumably keeps the original
    /// casing of header names; confirm at use site.
    preserve_header_case: bool,
    /// NOTE(review): header-formatting option (ffi only), presumably keeps
    /// the original order of headers; confirm at use site.
    #[cfg(feature = "ffi")]
    preserve_header_order: bool,
    /// NOTE(review): header-formatting option, presumably writes header
    /// names in Title-Case; confirm at use site.
    title_case_headers: bool,
    /// NOTE(review): presumably allows HTTP/0.9 style responses; confirm at
    /// use site.
    h09_responses: bool,
    /// If set, called with each 1xx informational response received for
    /// the current request. MUST be unset after a non-1xx response is
    /// received.
    #[cfg(feature = "ffi")]
    on_informational: Option<crate::ffi::OnInformational>,
    /// Set to true when the Dispatcher should poll read operations
    /// again. See the `maybe_notify` method for more.
    notify_read: bool,
    /// State of allowed reads
    reading: Reading,
    /// State of allowed writes
    writing: Writing,
    /// An expected pending HTTP upgrade.
    ///
    /// Filled in by `State::prepare_upgrade`.
    upgrade: Option<crate::upgrade::Pending>,
    /// Either HTTP/1.0 or 1.1 connection
    version: Version,
    /// Flag to track if trailer fields are allowed to be sent
    allow_trailer_fields: bool,
}
962
/// Position of the read half of the connection.
#[derive(Debug)]
enum Reading {
    /// Ready for a new message; `State::idle` resets reading to this.
    Init,
    /// A body is expected, but the 100-continue has not been sent yet.
    /// `poll_drain_or_close_read` turns this into `Body` without sending it.
    Continue(Decoder),
    /// A message body is being read with the given decoder.
    Body(Decoder),
    /// Reading finished in a way that lets the connection be reused; going
    /// idle additionally requires the write half to be in `KeepAlive`.
    KeepAlive,
    /// No further reads are allowed.
    Closed,
}
971
/// Position of the write half of the connection.
///
/// `Debug` is implemented by hand further down in this file.
enum Writing {
    /// Nothing has been written for the current message yet. Only in this
    /// state can `on_parse_error` still queue an error response.
    Init,
    /// A message body is being written with the given encoder.
    Body(Encoder),
    /// Writing finished in a way that lets the connection be reused; going
    /// idle additionally requires the read half to be in `KeepAlive`.
    KeepAlive,
    /// No further writes are allowed.
    Closed,
}
978
979impl fmt::Debug for State {
980    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
981        let mut builder = f.debug_struct("State");
982        builder
983            .field("reading", &self.reading)
984            .field("writing", &self.writing)
985            .field("keep_alive", &self.keep_alive);
986
987        // Only show error field if it's interesting...
988        if let Some(ref error) = self.error {
989            builder.field("error", error);
990        }
991
992        if self.allow_half_close {
993            builder.field("allow_half_close", &true);
994        }
995
996        // Purposefully leaving off other fields..
997
998        builder.finish()
999    }
1000}
1001
1002impl fmt::Debug for Writing {
1003    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1004        match *self {
1005            Writing::Init => f.write_str("Init"),
1006            Writing::Body(ref enc) => f.debug_tuple("Body").field(enc).finish(),
1007            Writing::KeepAlive => f.write_str("KeepAlive"),
1008            Writing::Closed => f.write_str("Closed"),
1009        }
1010    }
1011}
1012
1013impl std::ops::BitAndAssign<bool> for KA {
1014    fn bitand_assign(&mut self, enabled: bool) {
1015        if !enabled {
1016            trace!("remote disabling keep-alive");
1017            *self = KA::Disabled;
1018        }
1019    }
1020}
1021
/// Keep-alive status of a connection.
#[derive(Clone, Copy, Debug, Default)]
enum KA {
    /// Set by `KA::idle`.
    Idle,
    /// Set by `KA::busy`; this is also the default value.
    #[default]
    Busy,
    /// Set by `KA::disable`.
    Disabled,
}

impl KA {
    /// Switches the status to `Idle`, whatever it was before.
    fn idle(&mut self) {
        self.set(Self::Idle);
    }

    /// Switches the status to `Busy`, whatever it was before.
    fn busy(&mut self) {
        self.set(Self::Busy);
    }

    /// Switches the status to `Disabled`, whatever it was before.
    fn disable(&mut self) {
        self.set(Self::Disabled);
    }

    /// Returns a copy of the current status.
    fn status(&self) -> KA {
        *self
    }

    /// Overwrites the current status with `next`.
    fn set(&mut self, next: Self) {
        *self = next;
    }
}
1047
1048impl State {
1049    fn close(&mut self) {
1050        trace!("State::close()");
1051        self.reading = Reading::Closed;
1052        self.writing = Writing::Closed;
1053        self.keep_alive.disable();
1054    }
1055
1056    fn close_read(&mut self) {
1057        trace!("State::close_read()");
1058        self.reading = Reading::Closed;
1059        self.keep_alive.disable();
1060    }
1061
1062    fn close_write(&mut self) {
1063        trace!("State::close_write()");
1064        self.writing = Writing::Closed;
1065        self.keep_alive.disable();
1066    }
1067
1068    fn wants_keep_alive(&self) -> bool {
1069        !matches!(self.keep_alive.status(), KA::Disabled)
1070    }
1071
1072    fn try_keep_alive<T: Http1Transaction>(&mut self) {
1073        match (&self.reading, &self.writing) {
1074            (&Reading::KeepAlive, &Writing::KeepAlive) => {
1075                if let KA::Busy = self.keep_alive.status() {
1076                    self.idle::<T>();
1077                } else {
1078                    trace!(
1079                        "try_keep_alive({}): could keep-alive, but status = {:?}",
1080                        T::LOG,
1081                        self.keep_alive
1082                    );
1083                    self.close();
1084                }
1085            }
1086            (&Reading::Closed, &Writing::KeepAlive) | (&Reading::KeepAlive, &Writing::Closed) => {
1087                self.close()
1088            }
1089            _ => (),
1090        }
1091    }
1092
1093    fn disable_keep_alive(&mut self) {
1094        self.keep_alive.disable()
1095    }
1096
1097    fn busy(&mut self) {
1098        if let KA::Disabled = self.keep_alive.status() {
1099            return;
1100        }
1101        self.keep_alive.busy();
1102    }
1103
1104    fn idle<T: Http1Transaction>(&mut self) {
1105        debug_assert!(!self.is_idle(), "State::idle() called while idle");
1106
1107        self.method = None;
1108        self.keep_alive.idle();
1109
1110        if !self.is_idle() {
1111            self.close();
1112            return;
1113        }
1114
1115        self.reading = Reading::Init;
1116        self.writing = Writing::Init;
1117
1118        // !T::should_read_first() means Client.
1119        //
1120        // If Client connection has just gone idle, the Dispatcher
1121        // should try the poll loop one more time, so as to poll the
1122        // pending requests stream.
1123        if !T::should_read_first() {
1124            self.notify_read = true;
1125        }
1126    }
1127
1128    fn is_idle(&self) -> bool {
1129        matches!(self.keep_alive.status(), KA::Idle)
1130    }
1131
1132    fn is_read_closed(&self) -> bool {
1133        matches!(self.reading, Reading::Closed)
1134    }
1135
1136    fn is_write_closed(&self) -> bool {
1137        matches!(self.writing, Writing::Closed)
1138    }
1139
1140    fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
1141        let (tx, rx) = crate::upgrade::pending();
1142        self.upgrade = Some(tx);
1143        rx
1144    }
1145}
1146
1147#[cfg(test)]
1148mod tests {
1149    #[cfg(all(feature = "nightly", not(miri)))]
1150    #[bench]
1151    fn bench_read_head_short(b: &mut ::test::Bencher) {
1152        use super::*;
1153        use crate::common::io::Compat;
1154        let s = b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n";
1155        let len = s.len();
1156        b.bytes = len as u64;
1157
1158        // an empty IO, we'll be skipping and using the read buffer anyways
1159        let io = Compat(tokio_test::io::Builder::new().build());
1160        let mut conn = Conn::<_, bytes::Bytes, crate::proto::h1::ServerTransaction>::new(io);
1161        *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]);
1162        conn.state.cached_headers = Some(HeaderMap::with_capacity(2));
1163
1164        let rt = tokio::runtime::Builder::new_current_thread()
1165            .enable_all()
1166            .build()
1167            .unwrap();
1168
1169        b.iter(|| {
1170            rt.block_on(futures_util::future::poll_fn(|cx| {
1171                match conn.poll_read_head(cx) {
1172                    Poll::Ready(Some(Ok(x))) => {
1173                        ::test::black_box(&x);
1174                        let mut headers = x.0.headers;
1175                        headers.clear();
1176                        conn.state.cached_headers = Some(headers);
1177                    }
1178                    f => panic!("expected Ready(Some(Ok(..))): {:?}", f),
1179                }
1180
1181                conn.io.read_buf_mut().reserve(1);
1182                unsafe {
1183                    conn.io.read_buf_mut().set_len(len);
1184                }
1185                conn.state.reading = Reading::Init;
1186                Poll::Ready(())
1187            }));
1188        });
1189    }
1190
1191    /*
1192    //TODO: rewrite these using dispatch... someday...
1193    use futures::{Async, Future, Stream, Sink};
1194    use futures::future;
1195
1196    use proto::{self, ClientTransaction, MessageHead, ServerTransaction};
1197    use super::super::Encoder;
1198    use mock::AsyncIo;
1199
1200    use super::{Conn, Decoder, Reading, Writing};
1201    use ::uri::Uri;
1202
1203    use std::str::FromStr;
1204
1205    #[test]
1206    fn test_conn_init_read() {
1207        let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec();
1208        let len = good_message.len();
1209        let io = AsyncIo::new_buf(good_message, len);
1210        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1211
1212        match conn.poll().unwrap() {
1213            Async::Ready(Some(Frame::Message { message, body: false })) => {
1214                assert_eq!(message, MessageHead {
1215                    subject: ::proto::RequestLine(::Get, Uri::from_str("/").unwrap()),
1216                    .. MessageHead::default()
1217                })
1218            },
1219            f => panic!("frame is not Frame::Message: {:?}", f)
1220        }
1221    }
1222
1223    #[test]
1224    fn test_conn_parse_partial() {
1225        let _: Result<(), ()> = future::lazy(|| {
1226            let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec();
1227            let io = AsyncIo::new_buf(good_message, 10);
1228            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1229            assert!(conn.poll().unwrap().is_not_ready());
1230            conn.io.io_mut().block_in(50);
1231            let async = conn.poll().unwrap();
1232            assert!(async.is_ready());
1233            match async {
1234                Async::Ready(Some(Frame::Message { .. })) => (),
1235                f => panic!("frame is not Message: {:?}", f),
1236            }
1237            Ok(())
1238        }).wait();
1239    }
1240
1241    #[test]
1242    fn test_conn_init_read_eof_idle() {
1243        let io = AsyncIo::new_buf(vec![], 1);
1244        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1245        conn.state.idle();
1246
1247        match conn.poll().unwrap() {
1248            Async::Ready(None) => {},
1249            other => panic!("frame is not None: {:?}", other)
1250        }
1251    }
1252
1253    #[test]
1254    fn test_conn_init_read_eof_idle_partial_parse() {
1255        let io = AsyncIo::new_buf(b"GET / HTTP/1.1".to_vec(), 100);
1256        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1257        conn.state.idle();
1258
1259        match conn.poll() {
1260            Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
1261            other => panic!("unexpected frame: {:?}", other)
1262        }
1263    }
1264
1265    #[test]
1266    fn test_conn_init_read_eof_busy() {
1267        let _: Result<(), ()> = future::lazy(|| {
1268            // server ignores
1269            let io = AsyncIo::new_eof();
1270            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1271            conn.state.busy();
1272
1273            match conn.poll().unwrap() {
1274                Async::Ready(None) => {},
1275                other => panic!("unexpected frame: {:?}", other)
1276            }
1277
1278            // client
1279            let io = AsyncIo::new_eof();
1280            let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1281            conn.state.busy();
1282
1283            match conn.poll() {
1284                Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
1285                other => panic!("unexpected frame: {:?}", other)
1286            }
1287            Ok(())
1288        }).wait();
1289    }
1290
1291    #[test]
1292    fn test_conn_body_finish_read_eof() {
1293        let _: Result<(), ()> = future::lazy(|| {
1294            let io = AsyncIo::new_eof();
1295            let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1296            conn.state.busy();
1297            conn.state.writing = Writing::KeepAlive;
1298            conn.state.reading = Reading::Body(Decoder::length(0));
1299
1300            match conn.poll() {
1301                Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
1302                other => panic!("unexpected frame: {:?}", other)
1303            }
1304
1305            // conn eofs, but tokio-proto will call poll() again, before calling flush()
1306            // the conn eof in this case is perfectly fine
1307
1308            match conn.poll() {
1309                Ok(Async::Ready(None)) => (),
1310                other => panic!("unexpected frame: {:?}", other)
1311            }
1312            Ok(())
1313        }).wait();
1314    }
1315
1316    #[test]
1317    fn test_conn_message_empty_body_read_eof() {
1318        let _: Result<(), ()> = future::lazy(|| {
1319            let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec(), 1024);
1320            let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1321            conn.state.busy();
1322            conn.state.writing = Writing::KeepAlive;
1323
1324            match conn.poll() {
1325                Ok(Async::Ready(Some(Frame::Message { body: false, .. }))) => (),
1326                other => panic!("unexpected frame: {:?}", other)
1327            }
1328
1329            // conn eofs, but tokio-proto will call poll() again, before calling flush()
1330            // the conn eof in this case is perfectly fine
1331
1332            match conn.poll() {
1333                Ok(Async::Ready(None)) => (),
1334                other => panic!("unexpected frame: {:?}", other)
1335            }
1336            Ok(())
1337        }).wait();
1338    }
1339
1340    #[test]
1341    fn test_conn_read_body_end() {
1342        let _: Result<(), ()> = future::lazy(|| {
1343            let io = AsyncIo::new_buf(b"POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n12345".to_vec(), 1024);
1344            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1345            conn.state.busy();
1346
1347            match conn.poll() {
1348                Ok(Async::Ready(Some(Frame::Message { body: true, .. }))) => (),
1349                other => panic!("unexpected frame: {:?}", other)
1350            }
1351
1352            match conn.poll() {
1353                Ok(Async::Ready(Some(Frame::Body { chunk: Some(_) }))) => (),
1354                other => panic!("unexpected frame: {:?}", other)
1355            }
1356
1357            // When the body is done, `poll` MUST return a `Body` frame with chunk set to `None`
1358            match conn.poll() {
1359                Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
1360                other => panic!("unexpected frame: {:?}", other)
1361            }
1362
1363            match conn.poll() {
1364                Ok(Async::NotReady) => (),
1365                other => panic!("unexpected frame: {:?}", other)
1366            }
1367            Ok(())
1368        }).wait();
1369    }
1370
1371    #[test]
1372    fn test_conn_closed_read() {
1373        let io = AsyncIo::new_buf(vec![], 0);
1374        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1375        conn.state.close();
1376
1377        match conn.poll().unwrap() {
1378            Async::Ready(None) => {},
1379            other => panic!("frame is not None: {:?}", other)
1380        }
1381    }
1382
1383    #[test]
1384    fn test_conn_body_write_length() {
1385        let _ = pretty_env_logger::try_init();
1386        let _: Result<(), ()> = future::lazy(|| {
1387            let io = AsyncIo::new_buf(vec![], 0);
1388            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1389            let max = super::super::io::DEFAULT_MAX_BUFFER_SIZE + 4096;
1390            conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64));
1391
1392            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; max].into()) }).unwrap().is_ready());
1393            assert!(!conn.can_buffer_body());
1394
1395            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 8].into()) }).unwrap().is_not_ready());
1396
1397            conn.io.io_mut().block_in(1024 * 3);
1398            assert!(conn.poll_complete().unwrap().is_not_ready());
1399            conn.io.io_mut().block_in(1024 * 3);
1400            assert!(conn.poll_complete().unwrap().is_not_ready());
1401            conn.io.io_mut().block_in(max * 2);
1402            assert!(conn.poll_complete().unwrap().is_ready());
1403
1404            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'c'; 1024 * 8].into()) }).unwrap().is_ready());
1405            Ok(())
1406        }).wait();
1407    }
1408
1409    #[test]
1410    fn test_conn_body_write_chunked() {
1411        let _: Result<(), ()> = future::lazy(|| {
1412            let io = AsyncIo::new_buf(vec![], 4096);
1413            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1414            conn.state.writing = Writing::Body(Encoder::chunked());
1415
1416            assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready());
1417            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'x'; 8192].into()) }).unwrap().is_ready());
1418            Ok(())
1419        }).wait();
1420    }
1421
1422    #[test]
1423    fn test_conn_body_flush() {
1424        let _: Result<(), ()> = future::lazy(|| {
1425            let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5);
1426            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1427            conn.state.writing = Writing::Body(Encoder::length(1024 * 1024));
1428            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready());
1429            assert!(!conn.can_buffer_body());
1430            conn.io.io_mut().block_in(1024 * 1024 * 5);
1431            assert!(conn.poll_complete().unwrap().is_ready());
1432            assert!(conn.can_buffer_body());
1433            assert!(conn.io.io_mut().flushed());
1434
1435            Ok(())
1436        }).wait();
1437    }
1438
1439    #[test]
1440    fn test_conn_parking() {
1441        use std::sync::Arc;
1442        use futures::executor::Notify;
1443        use futures::executor::NotifyHandle;
1444
1445        struct Car {
1446            permit: bool,
1447        }
1448        impl Notify for Car {
1449            fn notify(&self, _id: usize) {
1450                assert!(self.permit, "unparked without permit");
1451            }
1452        }
1453
1454        fn car(permit: bool) -> NotifyHandle {
1455            Arc::new(Car {
1456                permit: permit,
1457            }).into()
1458        }
1459
1460        // test that once writing is done, unparks
1461        let f = future::lazy(|| {
1462            let io = AsyncIo::new_buf(vec![], 4096);
1463            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1464            conn.state.reading = Reading::KeepAlive;
1465            assert!(conn.poll().unwrap().is_not_ready());
1466
1467            conn.state.writing = Writing::KeepAlive;
1468            assert!(conn.poll_complete().unwrap().is_ready());
1469            Ok::<(), ()>(())
1470        });
1471        ::futures::executor::spawn(f).poll_future_notify(&car(true), 0).unwrap();
1472
1473
1474        // test that flushing when not waiting on read doesn't unpark
1475        let f = future::lazy(|| {
1476            let io = AsyncIo::new_buf(vec![], 4096);
1477            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1478            conn.state.writing = Writing::KeepAlive;
1479            assert!(conn.poll_complete().unwrap().is_ready());
1480            Ok::<(), ()>(())
1481        });
1482        ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
1483
1484
1485        // test that flushing and writing isn't done doesn't unpark
1486        let f = future::lazy(|| {
1487            let io = AsyncIo::new_buf(vec![], 4096);
1488            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1489            conn.state.reading = Reading::KeepAlive;
1490            assert!(conn.poll().unwrap().is_not_ready());
1491            conn.state.writing = Writing::Body(Encoder::length(5_000));
1492            assert!(conn.poll_complete().unwrap().is_ready());
1493            Ok::<(), ()>(())
1494        });
1495        ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
1496    }
1497
1498    #[test]
1499    fn test_conn_closed_write() {
1500        let io = AsyncIo::new_buf(vec![], 0);
1501        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1502        conn.state.close();
1503
1504        match conn.start_send(Frame::Body { chunk: Some(b"foobar".to_vec().into()) }) {
1505            Err(_e) => {},
1506            other => panic!("did not return Err: {:?}", other)
1507        }
1508
1509        assert!(conn.state.is_write_closed());
1510    }
1511
1512    #[test]
1513    fn test_conn_write_empty_chunk() {
1514        let io = AsyncIo::new_buf(vec![], 0);
1515        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1516        conn.state.writing = Writing::KeepAlive;
1517
1518        assert!(conn.start_send(Frame::Body { chunk: None }).unwrap().is_ready());
1519        assert!(conn.start_send(Frame::Body { chunk: Some(Vec::new().into()) }).unwrap().is_ready());
1520        conn.start_send(Frame::Body { chunk: Some(vec![b'a'].into()) }).unwrap_err();
1521    }
1522    */
1523}