hyper_util/client/legacy/
client.rs

1//! The legacy HTTP Client from 0.14.x
2//!
3//! This `Client` will eventually be deconstructed into more composable parts.
4//! For now, to enable people to use hyper 1.0 quicker, this `Client` exists
5//! in much the same way it did in hyper 0.14.
6
7use std::error::Error as StdError;
8use std::fmt;
9use std::future::Future;
10use std::pin::Pin;
11use std::task::{self, Poll};
12use std::time::Duration;
13
14use futures_util::future::{self, Either, FutureExt, TryFutureExt};
15use http::uri::Scheme;
16use hyper::client::conn::TrySendError as ConnTrySendError;
17use hyper::header::{HeaderValue, HOST};
18use hyper::rt::Timer;
19use hyper::{body::Body, Method, Request, Response, Uri, Version};
20use tracing::{debug, trace, warn};
21
22use super::connect::capture::CaptureConnectionExtension;
23#[cfg(feature = "tokio")]
24use super::connect::HttpConnector;
25use super::connect::{Alpn, Connect, Connected, Connection};
26use super::pool::{self, Ver};
27
28use crate::common::{lazy as hyper_lazy, timer, Exec, Lazy, SyncWrapper};
29
30type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
31
/// A Client to make outgoing HTTP requests.
///
/// `Client` is cheap to clone and cloning is the recommended way to share a `Client`. The
/// underlying connection pool will be reused.
///
/// `C` is the connector type used to establish new connections and `B` is the
/// request body type.
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
pub struct Client<C, B> {
    // Request-level switches: retry policy, `Host` header handling, HTTP version.
    config: Config,
    // Cloned for each new connection attempt and asked to `connect` to the destination.
    connector: C,
    // Executor on which connection tasks and other background futures are spawned.
    exec: Exec,
    // Builder used for the HTTP/1 handshake on freshly connected IO.
    #[cfg(feature = "http1")]
    h1_builder: hyper::client::conn::http1::Builder,
    // Builder used for the HTTP/2 handshake on freshly connected IO.
    #[cfg(feature = "http2")]
    h2_builder: hyper::client::conn::http2::Builder<Exec>,
    // Connection pool holding `PoolClient`s, keyed by `PoolKey`.
    pool: pool::Pool<PoolClient<B>, PoolKey>,
}
47
// Per-client settings consulted while sending each request.
#[derive(Clone, Copy, Debug)]
struct Config {
    // When true, a request handed back unsent by a reused connection is sent
    // again, and a closed pool checkout is retried instead of failing.
    retry_canceled_requests: bool,
    // When true, HTTP/1 requests without a `Host` header get one derived from the URI.
    set_host: bool,
    // Configured HTTP version; compared against `Ver::Http2` when connecting.
    ver: Ver,
}
54
/// Client errors
pub struct Error {
    // Broad category of the failure.
    kind: ErrorKind,
    // Underlying cause, when one was captured.
    source: Option<Box<dyn StdError + Send + Sync>>,
    // Metadata about the connection the failure happened on; callers in this
    // file attach it through `with_connect_info`.
    #[cfg(any(feature = "http1", feature = "http2"))]
    connect_info: Option<Connected>,
}
62
// Internal classification stored in `Error::kind`.
#[derive(Debug)]
enum ErrorKind {
    // A send or connect attempt was abandoned before it completed.
    Canceled,
    // NOTE(review): not constructed in the code visible here; presumably
    // produced by `Error::closed` — confirm.
    ChannelClosed,
    // Connecting, or checking a connection out of the pool, failed.
    Connect,
    // The method is not allowed for the request's version (CONNECT over HTTP/1.0).
    UserUnsupportedRequestMethod,
    // The request's HTTP version is unsupported, or an HTTP/2 request was
    // paired with an HTTP/1 connection.
    UserUnsupportedVersion,
    // NOTE(review): not constructed in the code visible here; presumably
    // returned by `extract_domain` for non-absolute URIs — confirm.
    UserAbsoluteUriRequired,
    // Sending failed and the connection did not hand the request back.
    SendRequest,
}
73
// Shorthand for constructing an `Error`.
//
// `e!(Kind)` builds an error of `ErrorKind::Kind` without a source;
// `e!(Kind, src)` additionally stores `src.into()` as the source. In both
// forms `connect_info` starts out as `None`.
macro_rules! e {
    ($kind:ident) => {
        Error {
            kind: ErrorKind::$kind,
            source: None,
            connect_info: None,
        }
    };
    ($kind:ident, $src:expr) => {
        Error {
            kind: ErrorKind::$kind,
            source: Some($src.into()),
            connect_info: None,
        }
    };
}
90
// Key under which connections are pooled: a (scheme, authority) pair.
// The exact shape of the key is not considered stable and might change.
type PoolKey = (http::uri::Scheme, http::uri::Authority);
93
// Failure of a single send attempt (see `Client::try_send_request`).
enum TrySendError<B> {
    // The connection handed the request back unsent, so it may be sent again.
    Retryable {
        // Why the attempt failed.
        error: Error,
        // The request that was handed back.
        req: Request<B>,
        // Whether the connection had been reused; `send_request` only retries
        // when this is true.
        connection_reused: bool,
    },
    // The failure is final; no retry is attempted.
    Nope(Error),
}
102
/// A `Future` that will resolve to an HTTP Response.
///
/// This is returned by `Client::request` (and `Client::get`).
#[must_use = "futures do nothing unless polled"]
pub struct ResponseFuture {
    // The boxed, type-erased future doing the actual work.
    // NOTE(review): `SyncWrapper` presumably exists so this type can be `Sync`
    // even though the boxed future is only `Send` — confirm in `crate::common`.
    inner: SyncWrapper<
        Pin<Box<dyn Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send>>,
    >,
}
112
113// ===== impl Client =====
114
impl Client<(), ()> {
    /// Create a builder to configure a new `Client`.
    ///
    /// The given `executor` is passed straight to `Builder::new`.
    ///
    /// # Example
    ///
    /// ```
    /// # #[cfg(feature = "tokio")]
    /// # fn run () {
    /// use std::time::Duration;
    /// use hyper_util::client::legacy::Client;
    /// use hyper_util::rt::TokioExecutor;
    ///
    /// let client = Client::builder(TokioExecutor::new())
    ///     .pool_idle_timeout(Duration::from_secs(30))
    ///     .http2_only(true)
    ///     .build_http();
    /// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
    /// # drop(infer);
    /// # }
    /// # fn main() {}
    /// ```
    pub fn builder<E>(executor: E) -> Builder
    where
        E: hyper::rt::Executor<BoxSendFuture> + Send + Sync + Clone + 'static,
    {
        Builder::new(executor)
    }
}
143
144impl<C, B> Client<C, B>
145where
146    C: Connect + Clone + Send + Sync + 'static,
147    B: Body + Send + 'static + Unpin,
148    B::Data: Send,
149    B::Error: Into<Box<dyn StdError + Send + Sync>>,
150{
151    /// Send a `GET` request to the supplied `Uri`.
152    ///
153    /// # Note
154    ///
155    /// This requires that the `Body` type have a `Default` implementation.
156    /// It *should* return an "empty" version of itself, such that
157    /// `Body::is_end_stream` is `true`.
158    ///
159    /// # Example
160    ///
161    /// ```
162    /// # #[cfg(feature = "tokio")]
163    /// # fn run () {
164    /// use hyper::Uri;
165    /// use hyper_util::client::legacy::Client;
166    /// use hyper_util::rt::TokioExecutor;
167    /// use bytes::Bytes;
168    /// use http_body_util::Full;
169    ///
170    /// let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
171    ///
172    /// let future = client.get(Uri::from_static("http://httpbin.org/ip"));
173    /// # }
174    /// # fn main() {}
175    /// ```
176    pub fn get(&self, uri: Uri) -> ResponseFuture
177    where
178        B: Default,
179    {
180        let body = B::default();
181        if !body.is_end_stream() {
182            warn!("default Body used for get() does not return true for is_end_stream");
183        }
184
185        let mut req = Request::new(body);
186        *req.uri_mut() = uri;
187        self.request(req)
188    }
189
190    /// Send a constructed `Request` using this `Client`.
191    ///
192    /// # Example
193    ///
194    /// ```
195    /// # #[cfg(feature = "tokio")]
196    /// # fn run () {
197    /// use hyper::{Method, Request};
198    /// use hyper_util::client::legacy::Client;
199    /// use http_body_util::Full;
200    /// use hyper_util::rt::TokioExecutor;
201    /// use bytes::Bytes;
202    ///
203    /// let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
204    ///
205    /// let req: Request<Full<Bytes>> = Request::builder()
206    ///     .method(Method::POST)
207    ///     .uri("http://httpbin.org/post")
208    ///     .body(Full::from("Hallo!"))
209    ///     .expect("request builder");
210    ///
211    /// let future = client.request(req);
212    /// # }
213    /// # fn main() {}
214    /// ```
215    pub fn request(&self, mut req: Request<B>) -> ResponseFuture {
216        let is_http_connect = req.method() == Method::CONNECT;
217        match req.version() {
218            Version::HTTP_11 => (),
219            Version::HTTP_10 => {
220                if is_http_connect {
221                    warn!("CONNECT is not allowed for HTTP/1.0");
222                    return ResponseFuture::new(future::err(e!(UserUnsupportedRequestMethod)));
223                }
224            }
225            Version::HTTP_2 => (),
226            // completely unsupported HTTP version (like HTTP/0.9)!
227            other => return ResponseFuture::error_version(other),
228        };
229
230        let pool_key = match extract_domain(req.uri_mut(), is_http_connect) {
231            Ok(s) => s,
232            Err(err) => {
233                return ResponseFuture::new(future::err(err));
234            }
235        };
236
237        ResponseFuture::new(self.clone().send_request(req, pool_key))
238    }
239
240    async fn send_request(
241        self,
242        mut req: Request<B>,
243        pool_key: PoolKey,
244    ) -> Result<Response<hyper::body::Incoming>, Error> {
245        let uri = req.uri().clone();
246
247        loop {
248            req = match self.try_send_request(req, pool_key.clone()).await {
249                Ok(resp) => return Ok(resp),
250                Err(TrySendError::Nope(err)) => return Err(err),
251                Err(TrySendError::Retryable {
252                    mut req,
253                    error,
254                    connection_reused,
255                }) => {
256                    if !self.config.retry_canceled_requests || !connection_reused {
257                        // if client disabled, don't retry
258                        // a fresh connection means we definitely can't retry
259                        return Err(error);
260                    }
261
262                    trace!(
263                        "unstarted request canceled, trying again (reason={:?})",
264                        error
265                    );
266                    *req.uri_mut() = uri.clone();
267                    req
268                }
269            }
270        }
271    }
272
273    async fn try_send_request(
274        &self,
275        mut req: Request<B>,
276        pool_key: PoolKey,
277    ) -> Result<Response<hyper::body::Incoming>, TrySendError<B>> {
278        let mut pooled = self
279            .connection_for(pool_key)
280            .await
281            // `connection_for` already retries checkout errors, so if
282            // it returns an error, there's not much else to retry
283            .map_err(TrySendError::Nope)?;
284
285        req.extensions_mut()
286            .get_mut::<CaptureConnectionExtension>()
287            .map(|conn| conn.set(&pooled.conn_info));
288
289        if pooled.is_http1() {
290            if req.version() == Version::HTTP_2 {
291                warn!("Connection is HTTP/1, but request requires HTTP/2");
292                return Err(TrySendError::Nope(
293                    e!(UserUnsupportedVersion).with_connect_info(pooled.conn_info.clone()),
294                ));
295            }
296
297            if self.config.set_host {
298                let uri = req.uri().clone();
299                req.headers_mut().entry(HOST).or_insert_with(|| {
300                    let hostname = uri.host().expect("authority implies host");
301                    if let Some(port) = get_non_default_port(&uri) {
302                        let s = format!("{}:{}", hostname, port);
303                        HeaderValue::from_str(&s)
304                    } else {
305                        HeaderValue::from_str(hostname)
306                    }
307                    .expect("uri host is valid header value")
308                });
309            }
310
311            // CONNECT always sends authority-form, so check it first...
312            if req.method() == Method::CONNECT {
313                authority_form(req.uri_mut());
314            } else if pooled.conn_info.is_proxied {
315                absolute_form(req.uri_mut());
316            } else {
317                origin_form(req.uri_mut());
318            }
319        } else if req.method() == Method::CONNECT {
320            authority_form(req.uri_mut());
321        }
322
323        let mut res = match pooled.try_send_request(req).await {
324            Ok(res) => res,
325            Err(mut err) => {
326                return if let Some(req) = err.take_message() {
327                    Err(TrySendError::Retryable {
328                        connection_reused: pooled.is_reused(),
329                        error: e!(Canceled, err.into_error())
330                            .with_connect_info(pooled.conn_info.clone()),
331                        req,
332                    })
333                } else {
334                    Err(TrySendError::Nope(
335                        e!(SendRequest, err.into_error())
336                            .with_connect_info(pooled.conn_info.clone()),
337                    ))
338                }
339            }
340        };
341
342        // If the Connector included 'extra' info, add to Response...
343        if let Some(extra) = &pooled.conn_info.extra {
344            extra.set(res.extensions_mut());
345        }
346
347        // As of futures@0.1.21, there is a race condition in the mpsc
348        // channel, such that sending when the receiver is closing can
349        // result in the message being stuck inside the queue. It won't
350        // ever notify until the Sender side is dropped.
351        //
352        // To counteract this, we must check if our senders 'want' channel
353        // has been closed after having tried to send. If so, error out...
354        if pooled.is_closed() {
355            return Ok(res);
356        }
357
358        // If pooled is HTTP/2, we can toss this reference immediately.
359        //
360        // when pooled is dropped, it will try to insert back into the
361        // pool. To delay that, spawn a future that completes once the
362        // sender is ready again.
363        //
364        // This *should* only be once the related `Connection` has polled
365        // for a new request to start.
366        //
367        // It won't be ready if there is a body to stream.
368        if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() {
369            drop(pooled);
370        } else if !res.body().is_end_stream() {
371            //let (delayed_tx, delayed_rx) = oneshot::channel::<()>();
372            //res.body_mut().delayed_eof(delayed_rx);
373            let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(move |_| {
374                // At this point, `pooled` is dropped, and had a chance
375                // to insert into the pool (if conn was idle)
376                //drop(delayed_tx);
377            });
378
379            self.exec.execute(on_idle);
380        } else {
381            // There's no body to delay, but the connection isn't
382            // ready yet. Only re-insert when it's ready
383            let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ());
384
385            self.exec.execute(on_idle);
386        }
387
388        Ok(res)
389    }
390
391    async fn connection_for(
392        &self,
393        pool_key: PoolKey,
394    ) -> Result<pool::Pooled<PoolClient<B>, PoolKey>, Error> {
395        loop {
396            match self.one_connection_for(pool_key.clone()).await {
397                Ok(pooled) => return Ok(pooled),
398                Err(ClientConnectError::Normal(err)) => return Err(err),
399                Err(ClientConnectError::CheckoutIsClosed(reason)) => {
400                    if !self.config.retry_canceled_requests {
401                        return Err(e!(Connect, reason));
402                    }
403
404                    trace!(
405                        "unstarted request canceled, trying again (reason={:?})",
406                        reason,
407                    );
408                    continue;
409                }
410            };
411        }
412    }
413
    /// Makes one attempt at obtaining a connection for `pool_key`, either by
    /// checking one out of the pool or by establishing a new one.
    ///
    /// # Errors
    ///
    /// `ClientConnectError::CheckoutIsClosed` is returned only when the
    /// connect attempt was canceled, the checkout awaited instead was
    /// canceled too, and the configured version is HTTP/2. Every other
    /// failure is reported as `ClientConnectError::Normal`.
    async fn one_connection_for(
        &self,
        pool_key: PoolKey,
    ) -> Result<pool::Pooled<PoolClient<B>, PoolKey>, ClientConnectError> {
        // Return a single connection if pooling is not enabled
        if !self.pool.is_enabled() {
            return self
                .connect_to(pool_key)
                .await
                .map_err(ClientConnectError::Normal);
        }

        // This actually races 2 different futures to try to get a ready
        // connection the fastest, and to reduce connection churn.
        //
        // - If the pool has an idle connection waiting, that's used
        //   immediately.
        // - Otherwise, the Connector is asked to start connecting to
        //   the destination Uri.
        // - Meanwhile, the pool Checkout is watching to see if any other
        //   request finishes and tries to insert an idle connection.
        // - If a new connection is started, but the Checkout wins after
        //   (an idle connection became available first), the started
        //   connection future is spawned into the runtime to complete,
        //   and then be inserted into the pool as an idle connection.
        let checkout = self.pool.checkout(pool_key.clone());
        let connect = self.connect_to(pool_key);
        let is_ver_h2 = self.config.ver == Ver::Http2;

        // The order of the `select` is depended on below...

        match future::select(checkout, connect).await {
            // Checkout won, connect future may have been started or not.
            //
            // If it has, let it finish and insert back into the pool,
            // so as to not waste the socket...
            Either::Left((Ok(checked_out), connecting)) => {
                // This depends on the `select` above having the correct
                // order, such that if the checkout future were ready
                // immediately, the connect future will never have been
                // started.
                //
                // If it *wasn't* ready yet, then the connect future will
                // have been started...
                if connecting.started() {
                    let bg = connecting
                        .map_err(|err| {
                            trace!("background connect error: {}", err);
                        })
                        .map(|_pooled| {
                            // dropping here should just place it in
                            // the Pool for us...
                        });
                    // An execute error here isn't important, we're just trying
                    // to prevent a waste of a socket...
                    self.exec.execute(bg);
                }
                Ok(checked_out)
            }
            // Connect won, checkout can just be dropped.
            Either::Right((Ok(connected), _checkout)) => Ok(connected),
            // Either checkout or connect could get canceled:
            //
            // 1. Connect is canceled if this is HTTP/2 and there is
            //    an outstanding HTTP/2 connecting task.
            // 2. Checkout is canceled if the pool cannot deliver an
            //    idle connection reliably.
            //
            // In both cases, we should just wait for the other future.
            Either::Left((Err(err), connecting)) => {
                if err.is_canceled() {
                    connecting.await.map_err(ClientConnectError::Normal)
                } else {
                    Err(ClientConnectError::Normal(e!(Connect, err)))
                }
            }
            Either::Right((Err(err), checkout)) => {
                if err.is_canceled() {
                    // Connect was canceled, so fall back to the checkout. A
                    // canceled checkout is only flagged as retryable
                    // (`CheckoutIsClosed`) when the client is HTTP/2-only.
                    checkout.await.map_err(move |err| {
                        if is_ver_h2 && err.is_canceled() {
                            ClientConnectError::CheckoutIsClosed(err)
                        } else {
                            ClientConnectError::Normal(e!(Connect, err))
                        }
                    })
                } else {
                    Err(ClientConnectError::Normal(err))
                }
            }
        }
    }
505
    /// Builds a lazy future that establishes a new connection for `pool_key`.
    ///
    /// Nothing happens until the returned future is first polled. Once
    /// started it:
    ///
    /// 1. takes a "connecting lock" from the pool,
    /// 2. asks the connector to connect to the destination,
    /// 3. performs the HTTP/1 or HTTP/2 handshake (HTTP/2 when the client is
    ///    configured for it or ALPN negotiated `h2`),
    /// 4. spawns the connection task on the executor, and
    /// 5. waits for the sender to be ready before wrapping it in a pooled
    ///    `PoolClient`.
    ///
    /// # Errors
    ///
    /// Resolves to a `Canceled` error when no connecting lock is available or
    /// when another connection already upgraded the pool to HTTP/2 via ALPN,
    /// to a `Connect` error when the connector fails, and to the error built
    /// by `Error::tx` when the handshake or readiness check fails.
    ///
    /// # Panics
    ///
    /// Panics if the selected protocol's cargo feature is not enabled.
    #[cfg(any(feature = "http1", feature = "http2"))]
    fn connect_to(
        &self,
        pool_key: PoolKey,
    ) -> impl Lazy<Output = Result<pool::Pooled<PoolClient<B>, PoolKey>, Error>> + Send + Unpin
    {
        // Clone everything the lazy future needs so it does not borrow `self`.
        let executor = self.exec.clone();
        let pool = self.pool.clone();
        #[cfg(feature = "http1")]
        let h1_builder = self.h1_builder.clone();
        #[cfg(feature = "http2")]
        let h2_builder = self.h2_builder.clone();
        let ver = self.config.ver;
        let is_ver_h2 = ver == Ver::Http2;
        let connector = self.connector.clone();
        let dst = domain_as_uri(pool_key.clone());
        hyper_lazy(move || {
            // Try to take a "connecting lock".
            //
            // If the pool_key is for HTTP/2, and there is already a
            // connection being established, then this can't take a
            // second lock. The "connect_to" future is Canceled.
            let connecting = match pool.connecting(&pool_key, ver) {
                Some(lock) => lock,
                None => {
                    let canceled = e!(Canceled);
                    // TODO
                    //crate::Error::new_canceled().with("HTTP/2 connection in progress");
                    return Either::Right(future::err(canceled));
                }
            };
            Either::Left(
                connector
                    .connect(super::connect::sealed::Internal, dst)
                    .map_err(|src| e!(Connect, src))
                    .and_then(move |io| {
                        let connected = io.connected();
                        // If ALPN is h2 and we aren't http2_only already,
                        // then we need to convert our pool checkout into
                        // a single HTTP2 one.
                        let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 {
                            match connecting.alpn_h2(&pool) {
                                Some(lock) => {
                                    trace!("ALPN negotiated h2, updating pool");
                                    lock
                                }
                                None => {
                                    // Another connection has already upgraded,
                                    // the pool checkout should finish up for us.
                                    let canceled = e!(Canceled, "ALPN upgraded to HTTP/2");
                                    return Either::Right(future::err(canceled));
                                }
                            }
                        } else {
                            connecting
                        };

                        #[cfg_attr(not(feature = "http2"), allow(unused))]
                        let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;

                        Either::Left(Box::pin(async move {
                            let tx = if is_h2 {
                                #[cfg(feature = "http2")] {
                                    let (mut tx, conn) =
                                        h2_builder.handshake(io).await.map_err(Error::tx)?;

                                    trace!(
                                        "http2 handshake complete, spawning background dispatcher task"
                                    );
                                    executor.execute(
                                        conn.map_err(|e| debug!("client connection error: {}", e))
                                            .map(|_| ()),
                                    );

                                    // Wait for 'conn' to ready up before we
                                    // declare this tx as usable
                                    tx.ready().await.map_err(Error::tx)?;
                                    PoolTx::Http2(tx)
                                }
                                #[cfg(not(feature = "http2"))]
                                panic!("http2 feature is not enabled");
                            } else {
                                #[cfg(feature = "http1")] {
                                    let (mut tx, conn) =
                                        h1_builder.handshake(io).await.map_err(Error::tx)?;

                                    trace!(
                                        "http1 handshake complete, spawning background dispatcher task"
                                    );
                                    executor.execute(
                                        conn.with_upgrades()
                                            .map_err(|e| debug!("client connection error: {}", e))
                                            .map(|_| ()),
                                    );

                                    // Wait for 'conn' to ready up before we
                                    // declare this tx as usable
                                    tx.ready().await.map_err(Error::tx)?;
                                    PoolTx::Http1(tx)
                                }
                                #[cfg(not(feature = "http1"))] {
                                    panic!("http1 feature is not enabled");
                                }
                            };

                            Ok(pool.pooled(
                                connecting,
                                PoolClient {
                                    conn_info: connected,
                                    tx,
                                },
                            ))
                        }))
                    }),
            )
        })
    }
623}
624
625impl<C, B> tower_service::Service<Request<B>> for Client<C, B>
626where
627    C: Connect + Clone + Send + Sync + 'static,
628    B: Body + Send + 'static + Unpin,
629    B::Data: Send,
630    B::Error: Into<Box<dyn StdError + Send + Sync>>,
631{
632    type Response = Response<hyper::body::Incoming>;
633    type Error = Error;
634    type Future = ResponseFuture;
635
636    fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
637        Poll::Ready(Ok(()))
638    }
639
640    fn call(&mut self, req: Request<B>) -> Self::Future {
641        self.request(req)
642    }
643}
644
645impl<C, B> tower_service::Service<Request<B>> for &'_ Client<C, B>
646where
647    C: Connect + Clone + Send + Sync + 'static,
648    B: Body + Send + 'static + Unpin,
649    B::Data: Send,
650    B::Error: Into<Box<dyn StdError + Send + Sync>>,
651{
652    type Response = Response<hyper::body::Incoming>;
653    type Error = Error;
654    type Future = ResponseFuture;
655
656    fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
657        Poll::Ready(Ok(()))
658    }
659
660    fn call(&mut self, req: Request<B>) -> Self::Future {
661        self.request(req)
662    }
663}
664
665impl<C: Clone, B> Clone for Client<C, B> {
666    fn clone(&self) -> Client<C, B> {
667        Client {
668            config: self.config,
669            exec: self.exec.clone(),
670            #[cfg(feature = "http1")]
671            h1_builder: self.h1_builder.clone(),
672            #[cfg(feature = "http2")]
673            h2_builder: self.h2_builder.clone(),
674            connector: self.connector.clone(),
675            pool: self.pool.clone(),
676        }
677    }
678}
679
680impl<C, B> fmt::Debug for Client<C, B> {
681    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
682        f.debug_struct("Client").finish()
683    }
684}
685
686// ===== impl ResponseFuture =====
687
688impl ResponseFuture {
689    fn new<F>(value: F) -> Self
690    where
691        F: Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send + 'static,
692    {
693        Self {
694            inner: SyncWrapper::new(Box::pin(value)),
695        }
696    }
697
698    fn error_version(ver: Version) -> Self {
699        warn!("Request has unsupported version \"{:?}\"", ver);
700        ResponseFuture::new(Box::pin(future::err(e!(UserUnsupportedVersion))))
701    }
702}
703
704impl fmt::Debug for ResponseFuture {
705    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
706        f.pad("Future<Response>")
707    }
708}
709
710impl Future for ResponseFuture {
711    type Output = Result<Response<hyper::body::Incoming>, Error>;
712
713    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
714        self.inner.get_mut().as_mut().poll(cx)
715    }
716}
717
718// ===== impl PoolClient =====
719
// A pooled connection: the request sender together with the metadata of the
// connection it belongs to.
//
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
struct PoolClient<B> {
    // Metadata reported by the connector for this connection.
    conn_info: Connected,
    // Protocol-specific handle used to send requests on the connection.
    tx: PoolTx<B>,
}
726
// Request sender of a pooled connection, one variant per enabled protocol.
enum PoolTx<B> {
    // Sender for an HTTP/1 connection.
    #[cfg(feature = "http1")]
    Http1(hyper::client::conn::http1::SendRequest<B>),
    // Sender for an HTTP/2 connection.
    #[cfg(feature = "http2")]
    Http2(hyper::client::conn::http2::SendRequest<B>),
}
733
734impl<B> PoolClient<B> {
735    fn poll_ready(
736        &mut self,
737        #[allow(unused_variables)] cx: &mut task::Context<'_>,
738    ) -> Poll<Result<(), Error>> {
739        match self.tx {
740            #[cfg(feature = "http1")]
741            PoolTx::Http1(ref mut tx) => tx.poll_ready(cx).map_err(Error::closed),
742            #[cfg(feature = "http2")]
743            PoolTx::Http2(_) => Poll::Ready(Ok(())),
744        }
745    }
746
747    fn is_http1(&self) -> bool {
748        !self.is_http2()
749    }
750
751    fn is_http2(&self) -> bool {
752        match self.tx {
753            #[cfg(feature = "http1")]
754            PoolTx::Http1(_) => false,
755            #[cfg(feature = "http2")]
756            PoolTx::Http2(_) => true,
757        }
758    }
759
760    fn is_poisoned(&self) -> bool {
761        self.conn_info.poisoned.poisoned()
762    }
763
764    fn is_ready(&self) -> bool {
765        match self.tx {
766            #[cfg(feature = "http1")]
767            PoolTx::Http1(ref tx) => tx.is_ready(),
768            #[cfg(feature = "http2")]
769            PoolTx::Http2(ref tx) => tx.is_ready(),
770        }
771    }
772
773    fn is_closed(&self) -> bool {
774        match self.tx {
775            #[cfg(feature = "http1")]
776            PoolTx::Http1(ref tx) => tx.is_closed(),
777            #[cfg(feature = "http2")]
778            PoolTx::Http2(ref tx) => tx.is_closed(),
779        }
780    }
781}
782
783impl<B: Body + 'static> PoolClient<B> {
784    fn try_send_request(
785        &mut self,
786        req: Request<B>,
787    ) -> impl Future<Output = Result<Response<hyper::body::Incoming>, ConnTrySendError<Request<B>>>>
788    where
789        B: Send,
790    {
791        #[cfg(all(feature = "http1", feature = "http2"))]
792        return match self.tx {
793            #[cfg(feature = "http1")]
794            PoolTx::Http1(ref mut tx) => Either::Left(tx.try_send_request(req)),
795            #[cfg(feature = "http2")]
796            PoolTx::Http2(ref mut tx) => Either::Right(tx.try_send_request(req)),
797        };
798
799        #[cfg(feature = "http1")]
800        #[cfg(not(feature = "http2"))]
801        return match self.tx {
802            #[cfg(feature = "http1")]
803            PoolTx::Http1(ref mut tx) => tx.try_send_request(req),
804        };
805
806        #[cfg(not(feature = "http1"))]
807        #[cfg(feature = "http2")]
808        return match self.tx {
809            #[cfg(feature = "http2")]
810            PoolTx::Http2(ref mut tx) => tx.try_send_request(req),
811        };
812    }
813}
814
815impl<B> pool::Poolable for PoolClient<B>
816where
817    B: Send + 'static,
818{
819    fn is_open(&self) -> bool {
820        !self.is_poisoned() && self.is_ready()
821    }
822
823    fn reserve(self) -> pool::Reservation<Self> {
824        match self.tx {
825            #[cfg(feature = "http1")]
826            PoolTx::Http1(tx) => pool::Reservation::Unique(PoolClient {
827                conn_info: self.conn_info,
828                tx: PoolTx::Http1(tx),
829            }),
830            #[cfg(feature = "http2")]
831            PoolTx::Http2(tx) => {
832                let b = PoolClient {
833                    conn_info: self.conn_info.clone(),
834                    tx: PoolTx::Http2(tx.clone()),
835                };
836                let a = PoolClient {
837                    conn_info: self.conn_info,
838                    tx: PoolTx::Http2(tx),
839                };
840                pool::Reservation::Shared(a, b)
841            }
842        }
843    }
844
845    fn can_share(&self) -> bool {
846        self.is_http2()
847    }
848}
849
850enum ClientConnectError {
851    Normal(Error),
852    CheckoutIsClosed(pool::Error),
853}
854
855fn origin_form(uri: &mut Uri) {
856    let path = match uri.path_and_query() {
857        Some(path) if path.as_str() != "/" => {
858            let mut parts = ::http::uri::Parts::default();
859            parts.path_and_query = Some(path.clone());
860            Uri::from_parts(parts).expect("path is valid uri")
861        }
862        _none_or_just_slash => {
863            debug_assert!(Uri::default() == "/");
864            Uri::default()
865        }
866    };
867    *uri = path
868}
869
870fn absolute_form(uri: &mut Uri) {
871    debug_assert!(uri.scheme().is_some(), "absolute_form needs a scheme");
872    debug_assert!(
873        uri.authority().is_some(),
874        "absolute_form needs an authority"
875    );
876    // If the URI is to HTTPS, and the connector claimed to be a proxy,
877    // then it *should* have tunneled, and so we don't want to send
878    // absolute-form in that case.
879    if uri.scheme() == Some(&Scheme::HTTPS) {
880        origin_form(uri);
881    }
882}
883
884fn authority_form(uri: &mut Uri) {
885    if let Some(path) = uri.path_and_query() {
886        // `https://hyper.rs` would parse with `/` path, don't
887        // annoy people about that...
888        if path != "/" {
889            warn!("HTTP/1.1 CONNECT request stripping path: {:?}", path);
890        }
891    }
892    *uri = match uri.authority() {
893        Some(auth) => {
894            let mut parts = ::http::uri::Parts::default();
895            parts.authority = Some(auth.clone());
896            Uri::from_parts(parts).expect("authority is valid")
897        }
898        None => {
899            unreachable!("authority_form with relative uri");
900        }
901    };
902}
903
904fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> Result<PoolKey, Error> {
905    let uri_clone = uri.clone();
906    match (uri_clone.scheme(), uri_clone.authority()) {
907        (Some(scheme), Some(auth)) => Ok((scheme.clone(), auth.clone())),
908        (None, Some(auth)) if is_http_connect => {
909            let scheme = match auth.port_u16() {
910                Some(443) => {
911                    set_scheme(uri, Scheme::HTTPS);
912                    Scheme::HTTPS
913                }
914                _ => {
915                    set_scheme(uri, Scheme::HTTP);
916                    Scheme::HTTP
917                }
918            };
919            Ok((scheme, auth.clone()))
920        }
921        _ => {
922            debug!("Client requires absolute-form URIs, received: {:?}", uri);
923            Err(e!(UserAbsoluteUriRequired))
924        }
925    }
926}
927
928fn domain_as_uri((scheme, auth): PoolKey) -> Uri {
929    http::uri::Builder::new()
930        .scheme(scheme)
931        .authority(auth)
932        .path_and_query("/")
933        .build()
934        .expect("domain is valid Uri")
935}
936
937fn set_scheme(uri: &mut Uri, scheme: Scheme) {
938    debug_assert!(
939        uri.scheme().is_none(),
940        "set_scheme expects no existing scheme"
941    );
942    let old = std::mem::take(uri);
943    let mut parts: ::http::uri::Parts = old.into();
944    parts.scheme = Some(scheme);
945    parts.path_and_query = Some("/".parse().expect("slash is a valid path"));
946    *uri = Uri::from_parts(parts).expect("scheme is valid");
947}
948
949fn get_non_default_port(uri: &Uri) -> Option<http::uri::Port<&str>> {
950    match (uri.port().map(|p| p.as_u16()), is_schema_secure(uri)) {
951        (Some(443), true) => None,
952        (Some(80), false) => None,
953        _ => uri.port(),
954    }
955}
956
957fn is_schema_secure(uri: &Uri) -> bool {
958    uri.scheme_str()
959        .map(|scheme_str| matches!(scheme_str, "wss" | "https"))
960        .unwrap_or_default()
961}
962
963/// A builder to configure a new [`Client`](Client).
964///
965/// # Example
966///
967/// ```
968/// # #[cfg(feature = "tokio")]
969/// # fn run () {
970/// use std::time::Duration;
971/// use hyper_util::client::legacy::Client;
972/// use hyper_util::rt::TokioExecutor;
973///
974/// let client = Client::builder(TokioExecutor::new())
975///     .pool_idle_timeout(Duration::from_secs(30))
976///     .http2_only(true)
977///     .build_http();
978/// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
979/// # drop(infer);
980/// # }
981/// # fn main() {}
982/// ```
983#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
984#[derive(Clone)]
985pub struct Builder {
986    client_config: Config,
987    exec: Exec,
988    #[cfg(feature = "http1")]
989    h1_builder: hyper::client::conn::http1::Builder,
990    #[cfg(feature = "http2")]
991    h2_builder: hyper::client::conn::http2::Builder<Exec>,
992    pool_config: pool::Config,
993    pool_timer: Option<timer::Timer>,
994}
995
996impl Builder {
997    /// Construct a new Builder.
998    pub fn new<E>(executor: E) -> Self
999    where
1000        E: hyper::rt::Executor<BoxSendFuture> + Send + Sync + Clone + 'static,
1001    {
1002        let exec = Exec::new(executor);
1003        Self {
1004            client_config: Config {
1005                retry_canceled_requests: true,
1006                set_host: true,
1007                ver: Ver::Auto,
1008            },
1009            exec: exec.clone(),
1010            #[cfg(feature = "http1")]
1011            h1_builder: hyper::client::conn::http1::Builder::new(),
1012            #[cfg(feature = "http2")]
1013            h2_builder: hyper::client::conn::http2::Builder::new(exec),
1014            pool_config: pool::Config {
1015                idle_timeout: Some(Duration::from_secs(90)),
1016                max_idle_per_host: usize::MAX,
1017            },
1018            pool_timer: None,
1019        }
1020    }
1021    /// Set an optional timeout for idle sockets being kept-alive.
1022    /// A `Timer` is required for this to take effect. See `Builder::pool_timer`
1023    ///
1024    /// Pass `None` to disable timeout.
1025    ///
1026    /// Default is 90 seconds.
1027    ///
1028    /// # Example
1029    ///
1030    /// ```
1031    /// # #[cfg(feature = "tokio")]
1032    /// # fn run () {
1033    /// use std::time::Duration;
1034    /// use hyper_util::client::legacy::Client;
1035    /// use hyper_util::rt::{TokioExecutor, TokioTimer};
1036    ///
1037    /// let client = Client::builder(TokioExecutor::new())
1038    ///     .pool_idle_timeout(Duration::from_secs(30))
1039    ///     .pool_timer(TokioTimer::new())
1040    ///     .build_http();
1041    ///
1042    /// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
1043    /// # }
1044    /// # fn main() {}
1045    /// ```
1046    pub fn pool_idle_timeout<D>(&mut self, val: D) -> &mut Self
1047    where
1048        D: Into<Option<Duration>>,
1049    {
1050        self.pool_config.idle_timeout = val.into();
1051        self
1052    }
1053
1054    #[doc(hidden)]
1055    #[deprecated(note = "renamed to `pool_max_idle_per_host`")]
1056    pub fn max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
1057        self.pool_config.max_idle_per_host = max_idle;
1058        self
1059    }
1060
1061    /// Sets the maximum idle connection per host allowed in the pool.
1062    ///
1063    /// Default is `usize::MAX` (no limit).
1064    pub fn pool_max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
1065        self.pool_config.max_idle_per_host = max_idle;
1066        self
1067    }
1068
1069    // HTTP/1 options
1070
1071    /// Sets the exact size of the read buffer to *always* use.
1072    ///
1073    /// Note that setting this option unsets the `http1_max_buf_size` option.
1074    ///
1075    /// Default is an adaptive read buffer.
1076    #[cfg(feature = "http1")]
1077    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1078    pub fn http1_read_buf_exact_size(&mut self, sz: usize) -> &mut Self {
1079        self.h1_builder.read_buf_exact_size(Some(sz));
1080        self
1081    }
1082
1083    /// Set the maximum buffer size for the connection.
1084    ///
1085    /// Default is ~400kb.
1086    ///
1087    /// Note that setting this option unsets the `http1_read_exact_buf_size` option.
1088    ///
1089    /// # Panics
1090    ///
1091    /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
1092    #[cfg(feature = "http1")]
1093    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1094    pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self {
1095        self.h1_builder.max_buf_size(max);
1096        self
1097    }
1098
1099    /// Set whether HTTP/1 connections will accept spaces between header names
1100    /// and the colon that follow them in responses.
1101    ///
1102    /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
1103    /// parsing.
1104    ///
1105    /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
1106    /// to say about it:
1107    ///
1108    /// > No whitespace is allowed between the header field-name and colon. In
1109    /// > the past, differences in the handling of such whitespace have led to
1110    /// > security vulnerabilities in request routing and response handling. A
1111    /// > server MUST reject any received request message that contains
1112    /// > whitespace between a header field-name and colon with a response code
1113    /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
1114    /// > response message before forwarding the message downstream.
1115    ///
1116    /// Note that this setting does not affect HTTP/2.
1117    ///
1118    /// Default is false.
1119    ///
1120    /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
1121    #[cfg(feature = "http1")]
1122    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1123    pub fn http1_allow_spaces_after_header_name_in_responses(&mut self, val: bool) -> &mut Self {
1124        self.h1_builder
1125            .allow_spaces_after_header_name_in_responses(val);
1126        self
1127    }
1128
1129    /// Set whether HTTP/1 connections will accept obsolete line folding for
1130    /// header values.
1131    ///
1132    /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
1133    /// to say about it:
1134    ///
1135    /// > A server that receives an obs-fold in a request message that is not
1136    /// > within a message/http container MUST either reject the message by
1137    /// > sending a 400 (Bad Request), preferably with a representation
1138    /// > explaining that obsolete line folding is unacceptable, or replace
1139    /// > each received obs-fold with one or more SP octets prior to
1140    /// > interpreting the field value or forwarding the message downstream.
1141    ///
1142    /// > A proxy or gateway that receives an obs-fold in a response message
1143    /// > that is not within a message/http container MUST either discard the
1144    /// > message and replace it with a 502 (Bad Gateway) response, preferably
1145    /// > with a representation explaining that unacceptable line folding was
1146    /// > received, or replace each received obs-fold with one or more SP
1147    /// > octets prior to interpreting the field value or forwarding the
1148    /// > message downstream.
1149    ///
1150    /// > A user agent that receives an obs-fold in a response message that is
1151    /// > not within a message/http container MUST replace each received
1152    /// > obs-fold with one or more SP octets prior to interpreting the field
1153    /// > value.
1154    ///
1155    /// Note that this setting does not affect HTTP/2.
1156    ///
1157    /// Default is false.
1158    ///
1159    /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
1160    #[cfg(feature = "http1")]
1161    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1162    pub fn http1_allow_obsolete_multiline_headers_in_responses(&mut self, val: bool) -> &mut Self {
1163        self.h1_builder
1164            .allow_obsolete_multiline_headers_in_responses(val);
1165        self
1166    }
1167
1168    /// Sets whether invalid header lines should be silently ignored in HTTP/1 responses.
1169    ///
1170    /// This mimics the behaviour of major browsers. You probably don't want this.
1171    /// You should only want this if you are implementing a proxy whose main
1172    /// purpose is to sit in front of browsers whose users access arbitrary content
1173    /// which may be malformed, and they expect everything that works without
1174    /// the proxy to keep working with the proxy.
1175    ///
1176    /// This option will prevent Hyper's client from returning an error encountered
1177    /// when parsing a header, except if the error was caused by the character NUL
1178    /// (ASCII code 0), as Chrome specifically always reject those.
1179    ///
1180    /// The ignorable errors are:
1181    /// * empty header names;
1182    /// * characters that are not allowed in header names, except for `\0` and `\r`;
1183    /// * when `allow_spaces_after_header_name_in_responses` is not enabled,
1184    ///   spaces and tabs between the header name and the colon;
1185    /// * missing colon between header name and colon;
1186    /// * characters that are not allowed in header values except for `\0` and `\r`.
1187    ///
1188    /// If an ignorable error is encountered, the parser tries to find the next
1189    /// line in the input to resume parsing the rest of the headers. An error
1190    /// will be emitted nonetheless if it finds `\0` or a lone `\r` while
1191    /// looking for the next line.
1192    #[cfg(feature = "http1")]
1193    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1194    pub fn http1_ignore_invalid_headers_in_responses(&mut self, val: bool) -> &mut Builder {
1195        self.h1_builder.ignore_invalid_headers_in_responses(val);
1196        self
1197    }
1198
1199    /// Set whether HTTP/1 connections should try to use vectored writes,
1200    /// or always flatten into a single buffer.
1201    ///
1202    /// Note that setting this to false may mean more copies of body data,
1203    /// but may also improve performance when an IO transport doesn't
1204    /// support vectored writes well, such as most TLS implementations.
1205    ///
1206    /// Setting this to true will force hyper to use queued strategy
1207    /// which may eliminate unnecessary cloning on some TLS backends
1208    ///
1209    /// Default is `auto`. In this mode hyper will try to guess which
1210    /// mode to use
1211    #[cfg(feature = "http1")]
1212    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1213    pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder {
1214        self.h1_builder.writev(enabled);
1215        self
1216    }
1217
1218    /// Set whether HTTP/1 connections will write header names as title case at
1219    /// the socket level.
1220    ///
1221    /// Note that this setting does not affect HTTP/2.
1222    ///
1223    /// Default is false.
1224    #[cfg(feature = "http1")]
1225    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1226    pub fn http1_title_case_headers(&mut self, val: bool) -> &mut Self {
1227        self.h1_builder.title_case_headers(val);
1228        self
1229    }
1230
1231    /// Set whether to support preserving original header cases.
1232    ///
1233    /// Currently, this will record the original cases received, and store them
1234    /// in a private extension on the `Response`. It will also look for and use
1235    /// such an extension in any provided `Request`.
1236    ///
1237    /// Since the relevant extension is still private, there is no way to
1238    /// interact with the original cases. The only effect this can have now is
1239    /// to forward the cases in a proxy-like fashion.
1240    ///
1241    /// Note that this setting does not affect HTTP/2.
1242    ///
1243    /// Default is false.
1244    #[cfg(feature = "http1")]
1245    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1246    pub fn http1_preserve_header_case(&mut self, val: bool) -> &mut Self {
1247        self.h1_builder.preserve_header_case(val);
1248        self
1249    }
1250
1251    /// Set the maximum number of headers.
1252    ///
1253    /// When a response is received, the parser will reserve a buffer to store headers for optimal
1254    /// performance.
1255    ///
1256    /// If client receives more headers than the buffer size, the error "message header too large"
1257    /// is returned.
1258    ///
1259    /// The headers is allocated on the stack by default, which has higher performance. After
1260    /// setting this value, headers will be allocated in heap memory, that is, heap memory
1261    /// allocation will occur for each response, and there will be a performance drop of about 5%.
1262    ///
1263    /// Note that this setting does not affect HTTP/2.
1264    ///
1265    /// Default is 100.
1266    #[cfg(feature = "http1")]
1267    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1268    pub fn http1_max_headers(&mut self, val: usize) -> &mut Self {
1269        self.h1_builder.max_headers(val);
1270        self
1271    }
1272
1273    /// Set whether HTTP/0.9 responses should be tolerated.
1274    ///
1275    /// Default is false.
1276    #[cfg(feature = "http1")]
1277    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1278    pub fn http09_responses(&mut self, val: bool) -> &mut Self {
1279        self.h1_builder.http09_responses(val);
1280        self
1281    }
1282
1283    /// Set whether the connection **must** use HTTP/2.
1284    ///
1285    /// The destination must either allow HTTP2 Prior Knowledge, or the
1286    /// `Connect` should be configured to do use ALPN to upgrade to `h2`
1287    /// as part of the connection process. This will not make the `Client`
1288    /// utilize ALPN by itself.
1289    ///
1290    /// Note that setting this to true prevents HTTP/1 from being allowed.
1291    ///
1292    /// Default is false.
1293    #[cfg(feature = "http2")]
1294    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1295    pub fn http2_only(&mut self, val: bool) -> &mut Self {
1296        self.client_config.ver = if val { Ver::Http2 } else { Ver::Auto };
1297        self
1298    }
1299
1300    /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
1301    ///
1302    /// This will default to the default value set by the [`h2` crate](https://crates.io/crates/h2).
1303    /// As of v0.4.0, it is 20.
1304    ///
1305    /// See <https://github.com/hyperium/hyper/issues/2877> for more information.
1306    #[cfg(feature = "http2")]
1307    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1308    pub fn http2_max_pending_accept_reset_streams(
1309        &mut self,
1310        max: impl Into<Option<usize>>,
1311    ) -> &mut Self {
1312        self.h2_builder.max_pending_accept_reset_streams(max.into());
1313        self
1314    }
1315
1316    /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
1317    /// stream-level flow control.
1318    ///
1319    /// Passing `None` will do nothing.
1320    ///
1321    /// If not set, hyper will use a default.
1322    ///
1323    /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
1324    #[cfg(feature = "http2")]
1325    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1326    pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
1327        self.h2_builder.initial_stream_window_size(sz.into());
1328        self
1329    }
1330
1331    /// Sets the max connection-level flow control for HTTP2
1332    ///
1333    /// Passing `None` will do nothing.
1334    ///
1335    /// If not set, hyper will use a default.
1336    #[cfg(feature = "http2")]
1337    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1338    pub fn http2_initial_connection_window_size(
1339        &mut self,
1340        sz: impl Into<Option<u32>>,
1341    ) -> &mut Self {
1342        self.h2_builder.initial_connection_window_size(sz.into());
1343        self
1344    }
1345
1346    /// Sets the initial maximum of locally initiated (send) streams.
1347    ///
1348    /// This value will be overwritten by the value included in the initial
1349    /// SETTINGS frame received from the peer as part of a [connection preface].
1350    ///
1351    /// Passing `None` will do nothing.
1352    ///
1353    /// If not set, hyper will use a default.
1354    ///
1355    /// [connection preface]: https://httpwg.org/specs/rfc9113.html#preface
1356    #[cfg(feature = "http2")]
1357    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1358    pub fn http2_initial_max_send_streams(
1359        &mut self,
1360        initial: impl Into<Option<usize>>,
1361    ) -> &mut Self {
1362        self.h2_builder.initial_max_send_streams(initial);
1363        self
1364    }
1365
1366    /// Sets whether to use an adaptive flow control.
1367    ///
1368    /// Enabling this will override the limits set in
1369    /// `http2_initial_stream_window_size` and
1370    /// `http2_initial_connection_window_size`.
1371    #[cfg(feature = "http2")]
1372    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1373    pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
1374        self.h2_builder.adaptive_window(enabled);
1375        self
1376    }
1377
1378    /// Sets the maximum frame size to use for HTTP2.
1379    ///
1380    /// Passing `None` will do nothing.
1381    ///
1382    /// If not set, hyper will use a default.
1383    #[cfg(feature = "http2")]
1384    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1385    pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
1386        self.h2_builder.max_frame_size(sz);
1387        self
1388    }
1389
1390    /// Sets the max size of received header frames for HTTP2.
1391    ///
1392    /// Default is currently 16KB, but can change.
1393    #[cfg(feature = "http2")]
1394    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1395    pub fn http2_max_header_list_size(&mut self, max: u32) -> &mut Self {
1396        self.h2_builder.max_header_list_size(max);
1397        self
1398    }
1399
1400    /// Sets an interval for HTTP2 Ping frames should be sent to keep a
1401    /// connection alive.
1402    ///
1403    /// Pass `None` to disable HTTP2 keep-alive.
1404    ///
1405    /// Default is currently disabled.
1406    ///
1407    /// # Cargo Feature
1408    ///
1409    /// Requires the `tokio` cargo feature to be enabled.
1410    #[cfg(feature = "tokio")]
1411    #[cfg(feature = "http2")]
1412    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1413    pub fn http2_keep_alive_interval(
1414        &mut self,
1415        interval: impl Into<Option<Duration>>,
1416    ) -> &mut Self {
1417        self.h2_builder.keep_alive_interval(interval);
1418        self
1419    }
1420
1421    /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
1422    ///
1423    /// If the ping is not acknowledged within the timeout, the connection will
1424    /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
1425    ///
1426    /// Default is 20 seconds.
1427    ///
1428    /// # Cargo Feature
1429    ///
1430    /// Requires the `tokio` cargo feature to be enabled.
1431    #[cfg(feature = "tokio")]
1432    #[cfg(feature = "http2")]
1433    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1434    pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
1435        self.h2_builder.keep_alive_timeout(timeout);
1436        self
1437    }
1438
1439    /// Sets whether HTTP2 keep-alive should apply while the connection is idle.
1440    ///
1441    /// If disabled, keep-alive pings are only sent while there are open
1442    /// request/responses streams. If enabled, pings are also sent when no
1443    /// streams are active. Does nothing if `http2_keep_alive_interval` is
1444    /// disabled.
1445    ///
1446    /// Default is `false`.
1447    ///
1448    /// # Cargo Feature
1449    ///
1450    /// Requires the `tokio` cargo feature to be enabled.
1451    #[cfg(feature = "tokio")]
1452    #[cfg(feature = "http2")]
1453    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1454    pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
1455        self.h2_builder.keep_alive_while_idle(enabled);
1456        self
1457    }
1458
1459    /// Sets the maximum number of HTTP2 concurrent locally reset streams.
1460    ///
1461    /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
1462    /// details.
1463    ///
1464    /// The default value is determined by the `h2` crate.
1465    ///
1466    /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
1467    #[cfg(feature = "http2")]
1468    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1469    pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
1470        self.h2_builder.max_concurrent_reset_streams(max);
1471        self
1472    }
1473
1474    /// Provide a timer to be used for h2
1475    ///
1476    /// See the documentation of [`h2::client::Builder::timer`] for more
1477    /// details.
1478    ///
1479    /// [`h2::client::Builder::timer`]: https://docs.rs/h2/client/struct.Builder.html#method.timer
1480    pub fn timer<M>(&mut self, timer: M) -> &mut Self
1481    where
1482        M: Timer + Send + Sync + 'static,
1483    {
1484        #[cfg(feature = "http2")]
1485        self.h2_builder.timer(timer);
1486        self
1487    }
1488
1489    /// Provide a timer to be used for timeouts and intervals in connection pools.
1490    pub fn pool_timer<M>(&mut self, timer: M) -> &mut Self
1491    where
1492        M: Timer + Clone + Send + Sync + 'static,
1493    {
1494        self.pool_timer = Some(timer::Timer::new(timer.clone()));
1495        self
1496    }
1497
1498    /// Set the maximum write buffer size for each HTTP/2 stream.
1499    ///
1500    /// Default is currently 1MB, but may change.
1501    ///
1502    /// # Panics
1503    ///
1504    /// The value must be no larger than `u32::MAX`.
1505    #[cfg(feature = "http2")]
1506    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1507    pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
1508        self.h2_builder.max_send_buf_size(max);
1509        self
1510    }
1511
1512    /// Set whether to retry requests that get disrupted before ever starting
1513    /// to write.
1514    ///
1515    /// This means a request that is queued, and gets given an idle, reused
1516    /// connection, and then encounters an error immediately as the idle
1517    /// connection was found to be unusable.
1518    ///
1519    /// When this is set to `false`, the related `ResponseFuture` would instead
1520    /// resolve to an `Error::Cancel`.
1521    ///
1522    /// Default is `true`.
1523    #[inline]
1524    pub fn retry_canceled_requests(&mut self, val: bool) -> &mut Self {
1525        self.client_config.retry_canceled_requests = val;
1526        self
1527    }
1528
1529    /// Set whether to automatically add the `Host` header to requests.
1530    ///
1531    /// If true, and a request does not include a `Host` header, one will be
1532    /// added automatically, derived from the authority of the `Uri`.
1533    ///
1534    /// Default is `true`.
1535    #[inline]
1536    pub fn set_host(&mut self, val: bool) -> &mut Self {
1537        self.client_config.set_host = val;
1538        self
1539    }
1540
1541    /// Build a client with this configuration and the default `HttpConnector`.
1542    #[cfg(feature = "tokio")]
1543    pub fn build_http<B>(&self) -> Client<HttpConnector, B>
1544    where
1545        B: Body + Send,
1546        B::Data: Send,
1547    {
1548        let mut connector = HttpConnector::new();
1549        if self.pool_config.is_enabled() {
1550            connector.set_keepalive(self.pool_config.idle_timeout);
1551        }
1552        self.build(connector)
1553    }
1554
1555    /// Combine the configuration of this builder with a connector to create a `Client`.
1556    pub fn build<C, B>(&self, connector: C) -> Client<C, B>
1557    where
1558        C: Connect + Clone,
1559        B: Body + Send,
1560        B::Data: Send,
1561    {
1562        let exec = self.exec.clone();
1563        let timer = self.pool_timer.clone();
1564        Client {
1565            config: self.client_config,
1566            exec: exec.clone(),
1567            #[cfg(feature = "http1")]
1568            h1_builder: self.h1_builder.clone(),
1569            #[cfg(feature = "http2")]
1570            h2_builder: self.h2_builder.clone(),
1571            connector,
1572            pool: pool::Pool::new(self.pool_config, exec, timer),
1573        }
1574    }
1575}
1576
1577impl fmt::Debug for Builder {
1578    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1579        f.debug_struct("Builder")
1580            .field("client_config", &self.client_config)
1581            .field("pool_config", &self.pool_config)
1582            .finish()
1583    }
1584}
1585
1586// ==== impl Error ====
1587
1588impl fmt::Debug for Error {
1589    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1590        let mut f = f.debug_tuple("hyper_util::client::legacy::Error");
1591        f.field(&self.kind);
1592        if let Some(ref cause) = self.source {
1593            f.field(cause);
1594        }
1595        f.finish()
1596    }
1597}
1598
1599impl fmt::Display for Error {
1600    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1601        write!(f, "client error ({:?})", self.kind)
1602    }
1603}
1604
1605impl StdError for Error {
1606    fn source(&self) -> Option<&(dyn StdError + 'static)> {
1607        self.source.as_ref().map(|e| &**e as _)
1608    }
1609}
1610
1611impl Error {
1612    /// Returns true if this was an error from `Connect`.
1613    pub fn is_connect(&self) -> bool {
1614        matches!(self.kind, ErrorKind::Connect)
1615    }
1616
1617    /// Returns the info of the client connection on which this error occurred.
1618    #[cfg(any(feature = "http1", feature = "http2"))]
1619    pub fn connect_info(&self) -> Option<&Connected> {
1620        self.connect_info.as_ref()
1621    }
1622
1623    #[cfg(any(feature = "http1", feature = "http2"))]
1624    fn with_connect_info(self, connect_info: Connected) -> Self {
1625        Self {
1626            connect_info: Some(connect_info),
1627            ..self
1628        }
1629    }
1630    fn is_canceled(&self) -> bool {
1631        matches!(self.kind, ErrorKind::Canceled)
1632    }
1633
1634    fn tx(src: hyper::Error) -> Self {
1635        e!(SendRequest, src)
1636    }
1637
1638    fn closed(src: hyper::Error) -> Self {
1639        e!(ChannelClosed, src)
1640    }
1641}