hyper_util/client/legacy/client.rs
1//! The legacy HTTP Client from 0.14.x
2//!
3//! This `Client` will eventually be deconstructed into more composable parts.
4//! For now, to enable people to use hyper 1.0 quicker, this `Client` exists
5//! in much the same way it did in hyper 0.14.
6
7use std::error::Error as StdError;
8use std::fmt;
9use std::future::Future;
10use std::pin::Pin;
11use std::task::{self, Poll};
12use std::time::Duration;
13
14use futures_util::future::{self, Either, FutureExt, TryFutureExt};
15use http::uri::Scheme;
16use hyper::client::conn::TrySendError as ConnTrySendError;
17use hyper::header::{HeaderValue, HOST};
18use hyper::rt::Timer;
19use hyper::{body::Body, Method, Request, Response, Uri, Version};
20use tracing::{debug, trace, warn};
21
22use super::connect::capture::CaptureConnectionExtension;
23#[cfg(feature = "tokio")]
24use super::connect::HttpConnector;
25use super::connect::{Alpn, Connect, Connected, Connection};
26use super::pool::{self, Ver};
27
28use crate::common::{lazy as hyper_lazy, timer, Exec, Lazy, SyncWrapper};
29
/// A boxed, pinned, `Send` future with no output; the unit of work handed to
/// the executors accepted by `Client::builder` and `Builder::new`.
type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
31
/// A Client to make outgoing HTTP requests.
///
/// `Client` is cheap to clone and cloning is the recommended way to share a `Client`. The
/// underlying connection pool will be reused.
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
pub struct Client<C, B> {
    // Request-level behavior switches (retry, Host header, HTTP version).
    config: Config,
    // Connector asked to establish a new connection for a destination `Uri`.
    connector: C,
    // Executor used to spawn connection dispatchers and other background futures.
    exec: Exec,
    // Builder used for the HTTP/1 handshake on new connections.
    #[cfg(feature = "http1")]
    h1_builder: hyper::client::conn::http1::Builder,
    // Builder used for the HTTP/2 handshake on new connections.
    #[cfg(feature = "http2")]
    h2_builder: hyper::client::conn::http2::Builder<Exec>,
    // Pool of connections, keyed by `PoolKey`.
    pool: pool::Pool<PoolClient<B>, PoolKey>,
}
47
/// Per-client settings consulted while sending requests.
#[derive(Clone, Copy, Debug)]
struct Config {
    // When true, a request that was handed back unsent on a reused connection
    // is retried, and closed checkouts are retried as well.
    retry_canceled_requests: bool,
    // When true, a missing `Host` header is filled in for HTTP/1 requests.
    set_host: bool,
    // HTTP version the client is configured for.
    ver: Ver,
}
54
/// Client errors
pub struct Error {
    // Category of the failure.
    kind: ErrorKind,
    // Underlying cause, when there is one.
    source: Option<Box<dyn StdError + Send + Sync>>,
    // Information about the connection the failure occurred on, when known.
    #[cfg(any(feature = "http1", feature = "http2"))]
    connect_info: Option<Connected>,
}
62
/// The category of a client [`Error`].
#[derive(Debug)]
enum ErrorKind {
    // The request (or a connect attempt) was canceled before it started.
    Canceled,
    // NOTE(review): not constructed in the visible part of this file —
    // presumably reported when the connection's channel is closed; confirm.
    ChannelClosed,
    // Establishing or checking out a connection failed.
    Connect,
    // `CONNECT` was requested with HTTP/1.0.
    UserUnsupportedRequestMethod,
    // The request's HTTP version is unsupported, or does not match the connection.
    UserUnsupportedVersion,
    // The request URI lacked the scheme/authority needed to pick a destination.
    UserAbsoluteUriRequired,
    // Sending failed and the connection did not hand the request back.
    SendRequest,
}
73
// Shorthand for constructing an `Error`.
//
// `e!(Kind)` builds an error of `ErrorKind::Kind` with no source;
// `e!(Kind, src)` additionally stores `src.into()` as the source.
// `connect_info` always starts out as `None`.
macro_rules! e {
    ($kind:ident) => {
        Error {
            kind: ErrorKind::$kind,
            source: None,
            connect_info: None,
        }
    };
    ($kind:ident, $src:expr) => {
        Error {
            kind: ErrorKind::$kind,
            source: Some($src.into()),
            connect_info: None,
        }
    };
}
90
// Key identifying a destination in the connection pool: (scheme, authority).
// We might change this... :shrug:
type PoolKey = (http::uri::Scheme, http::uri::Authority);
93
/// Outcome of a failed single send attempt.
enum TrySendError<B> {
    /// The request was handed back unsent, so the caller may try again.
    Retryable {
        // The failure that prevented sending.
        error: Error,
        // The unsent request, returned to the caller.
        req: Request<B>,
        // Whether the connection used for the attempt was a reused one.
        connection_reused: bool,
    },
    /// The failure cannot be retried.
    Nope(Error),
}
102
/// A `Future` that will resolve to an HTTP Response.
///
/// This is returned by `Client::request` (and `Client::get`).
#[must_use = "futures do nothing unless polled"]
pub struct ResponseFuture {
    // The boxed future doing the actual work, held inside a `SyncWrapper`.
    inner: SyncWrapper<
        Pin<Box<dyn Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send>>,
    >,
}
112
113// ===== impl Client =====
114
115impl Client<(), ()> {
116 /// Create a builder to configure a new `Client`.
117 ///
118 /// # Example
119 ///
120 /// ```
121 /// # #[cfg(feature = "tokio")]
122 /// # fn run () {
123 /// use std::time::Duration;
124 /// use hyper_util::client::legacy::Client;
125 /// use hyper_util::rt::TokioExecutor;
126 ///
127 /// let client = Client::builder(TokioExecutor::new())
128 /// .pool_idle_timeout(Duration::from_secs(30))
129 /// .http2_only(true)
130 /// .build_http();
131 /// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
132 /// # drop(infer);
133 /// # }
134 /// # fn main() {}
135 /// ```
136 pub fn builder<E>(executor: E) -> Builder
137 where
138 E: hyper::rt::Executor<BoxSendFuture> + Send + Sync + Clone + 'static,
139 {
140 Builder::new(executor)
141 }
142}
143
144impl<C, B> Client<C, B>
145where
146 C: Connect + Clone + Send + Sync + 'static,
147 B: Body + Send + 'static + Unpin,
148 B::Data: Send,
149 B::Error: Into<Box<dyn StdError + Send + Sync>>,
150{
151 /// Send a `GET` request to the supplied `Uri`.
152 ///
153 /// # Note
154 ///
155 /// This requires that the `Body` type have a `Default` implementation.
156 /// It *should* return an "empty" version of itself, such that
157 /// `Body::is_end_stream` is `true`.
158 ///
159 /// # Example
160 ///
161 /// ```
162 /// # #[cfg(feature = "tokio")]
163 /// # fn run () {
164 /// use hyper::Uri;
165 /// use hyper_util::client::legacy::Client;
166 /// use hyper_util::rt::TokioExecutor;
167 /// use bytes::Bytes;
168 /// use http_body_util::Full;
169 ///
170 /// let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
171 ///
172 /// let future = client.get(Uri::from_static("http://httpbin.org/ip"));
173 /// # }
174 /// # fn main() {}
175 /// ```
176 pub fn get(&self, uri: Uri) -> ResponseFuture
177 where
178 B: Default,
179 {
180 let body = B::default();
181 if !body.is_end_stream() {
182 warn!("default Body used for get() does not return true for is_end_stream");
183 }
184
185 let mut req = Request::new(body);
186 *req.uri_mut() = uri;
187 self.request(req)
188 }
189
190 /// Send a constructed `Request` using this `Client`.
191 ///
192 /// # Example
193 ///
194 /// ```
195 /// # #[cfg(feature = "tokio")]
196 /// # fn run () {
197 /// use hyper::{Method, Request};
198 /// use hyper_util::client::legacy::Client;
199 /// use http_body_util::Full;
200 /// use hyper_util::rt::TokioExecutor;
201 /// use bytes::Bytes;
202 ///
203 /// let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
204 ///
205 /// let req: Request<Full<Bytes>> = Request::builder()
206 /// .method(Method::POST)
207 /// .uri("http://httpbin.org/post")
208 /// .body(Full::from("Hallo!"))
209 /// .expect("request builder");
210 ///
211 /// let future = client.request(req);
212 /// # }
213 /// # fn main() {}
214 /// ```
215 pub fn request(&self, mut req: Request<B>) -> ResponseFuture {
216 let is_http_connect = req.method() == Method::CONNECT;
217 match req.version() {
218 Version::HTTP_11 => (),
219 Version::HTTP_10 => {
220 if is_http_connect {
221 warn!("CONNECT is not allowed for HTTP/1.0");
222 return ResponseFuture::new(future::err(e!(UserUnsupportedRequestMethod)));
223 }
224 }
225 Version::HTTP_2 => (),
226 // completely unsupported HTTP version (like HTTP/0.9)!
227 other => return ResponseFuture::error_version(other),
228 };
229
230 let pool_key = match extract_domain(req.uri_mut(), is_http_connect) {
231 Ok(s) => s,
232 Err(err) => {
233 return ResponseFuture::new(future::err(err));
234 }
235 };
236
237 ResponseFuture::new(self.clone().send_request(req, pool_key))
238 }
239
240 async fn send_request(
241 self,
242 mut req: Request<B>,
243 pool_key: PoolKey,
244 ) -> Result<Response<hyper::body::Incoming>, Error> {
245 let uri = req.uri().clone();
246
247 loop {
248 req = match self.try_send_request(req, pool_key.clone()).await {
249 Ok(resp) => return Ok(resp),
250 Err(TrySendError::Nope(err)) => return Err(err),
251 Err(TrySendError::Retryable {
252 mut req,
253 error,
254 connection_reused,
255 }) => {
256 if !self.config.retry_canceled_requests || !connection_reused {
257 // if client disabled, don't retry
258 // a fresh connection means we definitely can't retry
259 return Err(error);
260 }
261
262 trace!(
263 "unstarted request canceled, trying again (reason={:?})",
264 error
265 );
266 *req.uri_mut() = uri.clone();
267 req
268 }
269 }
270 }
271 }
272
273 async fn try_send_request(
274 &self,
275 mut req: Request<B>,
276 pool_key: PoolKey,
277 ) -> Result<Response<hyper::body::Incoming>, TrySendError<B>> {
278 let mut pooled = self
279 .connection_for(pool_key)
280 .await
281 // `connection_for` already retries checkout errors, so if
282 // it returns an error, there's not much else to retry
283 .map_err(TrySendError::Nope)?;
284
285 req.extensions_mut()
286 .get_mut::<CaptureConnectionExtension>()
287 .map(|conn| conn.set(&pooled.conn_info));
288
289 if pooled.is_http1() {
290 if req.version() == Version::HTTP_2 {
291 warn!("Connection is HTTP/1, but request requires HTTP/2");
292 return Err(TrySendError::Nope(
293 e!(UserUnsupportedVersion).with_connect_info(pooled.conn_info.clone()),
294 ));
295 }
296
297 if self.config.set_host {
298 let uri = req.uri().clone();
299 req.headers_mut().entry(HOST).or_insert_with(|| {
300 let hostname = uri.host().expect("authority implies host");
301 if let Some(port) = get_non_default_port(&uri) {
302 let s = format!("{}:{}", hostname, port);
303 HeaderValue::from_str(&s)
304 } else {
305 HeaderValue::from_str(hostname)
306 }
307 .expect("uri host is valid header value")
308 });
309 }
310
311 // CONNECT always sends authority-form, so check it first...
312 if req.method() == Method::CONNECT {
313 authority_form(req.uri_mut());
314 } else if pooled.conn_info.is_proxied {
315 absolute_form(req.uri_mut());
316 } else {
317 origin_form(req.uri_mut());
318 }
319 } else if req.method() == Method::CONNECT {
320 authority_form(req.uri_mut());
321 }
322
323 let mut res = match pooled.try_send_request(req).await {
324 Ok(res) => res,
325 Err(mut err) => {
326 return if let Some(req) = err.take_message() {
327 Err(TrySendError::Retryable {
328 connection_reused: pooled.is_reused(),
329 error: e!(Canceled, err.into_error())
330 .with_connect_info(pooled.conn_info.clone()),
331 req,
332 })
333 } else {
334 Err(TrySendError::Nope(
335 e!(SendRequest, err.into_error())
336 .with_connect_info(pooled.conn_info.clone()),
337 ))
338 }
339 }
340 };
341
342 // If the Connector included 'extra' info, add to Response...
343 if let Some(extra) = &pooled.conn_info.extra {
344 extra.set(res.extensions_mut());
345 }
346
347 // As of futures@0.1.21, there is a race condition in the mpsc
348 // channel, such that sending when the receiver is closing can
349 // result in the message being stuck inside the queue. It won't
350 // ever notify until the Sender side is dropped.
351 //
352 // To counteract this, we must check if our senders 'want' channel
353 // has been closed after having tried to send. If so, error out...
354 if pooled.is_closed() {
355 return Ok(res);
356 }
357
358 // If pooled is HTTP/2, we can toss this reference immediately.
359 //
360 // when pooled is dropped, it will try to insert back into the
361 // pool. To delay that, spawn a future that completes once the
362 // sender is ready again.
363 //
364 // This *should* only be once the related `Connection` has polled
365 // for a new request to start.
366 //
367 // It won't be ready if there is a body to stream.
368 if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() {
369 drop(pooled);
370 } else if !res.body().is_end_stream() {
371 //let (delayed_tx, delayed_rx) = oneshot::channel::<()>();
372 //res.body_mut().delayed_eof(delayed_rx);
373 let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(move |_| {
374 // At this point, `pooled` is dropped, and had a chance
375 // to insert into the pool (if conn was idle)
376 //drop(delayed_tx);
377 });
378
379 self.exec.execute(on_idle);
380 } else {
381 // There's no body to delay, but the connection isn't
382 // ready yet. Only re-insert when it's ready
383 let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ());
384
385 self.exec.execute(on_idle);
386 }
387
388 Ok(res)
389 }
390
391 async fn connection_for(
392 &self,
393 pool_key: PoolKey,
394 ) -> Result<pool::Pooled<PoolClient<B>, PoolKey>, Error> {
395 loop {
396 match self.one_connection_for(pool_key.clone()).await {
397 Ok(pooled) => return Ok(pooled),
398 Err(ClientConnectError::Normal(err)) => return Err(err),
399 Err(ClientConnectError::CheckoutIsClosed(reason)) => {
400 if !self.config.retry_canceled_requests {
401 return Err(e!(Connect, reason));
402 }
403
404 trace!(
405 "unstarted request canceled, trying again (reason={:?})",
406 reason,
407 );
408 continue;
409 }
410 };
411 }
412 }
413
414 async fn one_connection_for(
415 &self,
416 pool_key: PoolKey,
417 ) -> Result<pool::Pooled<PoolClient<B>, PoolKey>, ClientConnectError> {
418 // Return a single connection if pooling is not enabled
419 if !self.pool.is_enabled() {
420 return self
421 .connect_to(pool_key)
422 .await
423 .map_err(ClientConnectError::Normal);
424 }
425
426 // This actually races 2 different futures to try to get a ready
427 // connection the fastest, and to reduce connection churn.
428 //
429 // - If the pool has an idle connection waiting, that's used
430 // immediately.
431 // - Otherwise, the Connector is asked to start connecting to
432 // the destination Uri.
433 // - Meanwhile, the pool Checkout is watching to see if any other
434 // request finishes and tries to insert an idle connection.
435 // - If a new connection is started, but the Checkout wins after
436 // (an idle connection became available first), the started
437 // connection future is spawned into the runtime to complete,
438 // and then be inserted into the pool as an idle connection.
439 let checkout = self.pool.checkout(pool_key.clone());
440 let connect = self.connect_to(pool_key);
441 let is_ver_h2 = self.config.ver == Ver::Http2;
442
443 // The order of the `select` is depended on below...
444
445 match future::select(checkout, connect).await {
446 // Checkout won, connect future may have been started or not.
447 //
448 // If it has, let it finish and insert back into the pool,
449 // so as to not waste the socket...
450 Either::Left((Ok(checked_out), connecting)) => {
451 // This depends on the `select` above having the correct
452 // order, such that if the checkout future were ready
453 // immediately, the connect future will never have been
454 // started.
455 //
456 // If it *wasn't* ready yet, then the connect future will
457 // have been started...
458 if connecting.started() {
459 let bg = connecting
460 .map_err(|err| {
461 trace!("background connect error: {}", err);
462 })
463 .map(|_pooled| {
464 // dropping here should just place it in
465 // the Pool for us...
466 });
467 // An execute error here isn't important, we're just trying
468 // to prevent a waste of a socket...
469 self.exec.execute(bg);
470 }
471 Ok(checked_out)
472 }
473 // Connect won, checkout can just be dropped.
474 Either::Right((Ok(connected), _checkout)) => Ok(connected),
475 // Either checkout or connect could get canceled:
476 //
477 // 1. Connect is canceled if this is HTTP/2 and there is
478 // an outstanding HTTP/2 connecting task.
479 // 2. Checkout is canceled if the pool cannot deliver an
480 // idle connection reliably.
481 //
482 // In both cases, we should just wait for the other future.
483 Either::Left((Err(err), connecting)) => {
484 if err.is_canceled() {
485 connecting.await.map_err(ClientConnectError::Normal)
486 } else {
487 Err(ClientConnectError::Normal(e!(Connect, err)))
488 }
489 }
490 Either::Right((Err(err), checkout)) => {
491 if err.is_canceled() {
492 checkout.await.map_err(move |err| {
493 if is_ver_h2 && err.is_canceled() {
494 ClientConnectError::CheckoutIsClosed(err)
495 } else {
496 ClientConnectError::Normal(e!(Connect, err))
497 }
498 })
499 } else {
500 Err(ClientConnectError::Normal(err))
501 }
502 }
503 }
504 }
505
    /// Builds a lazy future that establishes a new connection for `pool_key`
    /// and wraps it as a pooled client.
    ///
    /// Nothing happens until the returned future is first polled. It then
    /// takes the pool's "connecting lock", asks the connector for an IO
    /// object, performs the HTTP/1 or HTTP/2 handshake, spawns the connection
    /// dispatcher on the executor, and waits for the sender to become ready.
    ///
    /// The future resolves to a `Canceled` error when the connecting lock
    /// cannot be taken, or when ALPN negotiated h2 but the pool would not
    /// upgrade this connection.
    #[cfg(any(feature = "http1", feature = "http2"))]
    fn connect_to(
        &self,
        pool_key: PoolKey,
    ) -> impl Lazy<Output = Result<pool::Pooled<PoolClient<B>, PoolKey>, Error>> + Send + Unpin
    {
        // Clone everything the lazy closure needs, so it does not borrow `self`.
        let executor = self.exec.clone();
        let pool = self.pool.clone();
        #[cfg(feature = "http1")]
        let h1_builder = self.h1_builder.clone();
        #[cfg(feature = "http2")]
        let h2_builder = self.h2_builder.clone();
        let ver = self.config.ver;
        let is_ver_h2 = ver == Ver::Http2;
        let connector = self.connector.clone();
        let dst = domain_as_uri(pool_key.clone());
        hyper_lazy(move || {
            // Try to take a "connecting lock".
            //
            // If the pool_key is for HTTP/2, and there is already a
            // connection being established, then this can't take a
            // second lock. The "connect_to" future is Canceled.
            let connecting = match pool.connecting(&pool_key, ver) {
                Some(lock) => lock,
                None => {
                    let canceled = e!(Canceled);
                    // TODO
                    //crate::Error::new_canceled().with("HTTP/2 connection in progress");
                    return Either::Right(future::err(canceled));
                }
            };
            Either::Left(
                connector
                    .connect(super::connect::sealed::Internal, dst)
                    .map_err(|src| e!(Connect, src))
                    .and_then(move |io| {
                        let connected = io.connected();
                        // If ALPN is h2 and we aren't http2_only already,
                        // then we need to convert our pool checkout into
                        // a single HTTP2 one.
                        let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 {
                            match connecting.alpn_h2(&pool) {
                                Some(lock) => {
                                    trace!("ALPN negotiated h2, updating pool");
                                    lock
                                }
                                None => {
                                    // Another connection has already upgraded,
                                    // the pool checkout should finish up for us.
                                    let canceled = e!(Canceled, "ALPN upgraded to HTTP/2");
                                    return Either::Right(future::err(canceled));
                                }
                            }
                        } else {
                            connecting
                        };

                        // Use HTTP/2 when the client is configured for it or
                        // when ALPN negotiated it.
                        #[cfg_attr(not(feature = "http2"), allow(unused))]
                        let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;

                        Either::Left(Box::pin(async move {
                            let tx = if is_h2 {
                                #[cfg(feature = "http2")] {
                                    let (mut tx, conn) =
                                        h2_builder.handshake(io).await.map_err(Error::tx)?;

                                    trace!(
                                        "http2 handshake complete, spawning background dispatcher task"
                                    );
                                    executor.execute(
                                        conn.map_err(|e| debug!("client connection error: {}", e))
                                            .map(|_| ()),
                                    );

                                    // Wait for 'conn' to ready up before we
                                    // declare this tx as usable
                                    tx.ready().await.map_err(Error::tx)?;
                                    PoolTx::Http2(tx)
                                }
                                // Without the `http2` feature an HTTP/2
                                // connection cannot be driven at all.
                                #[cfg(not(feature = "http2"))]
                                panic!("http2 feature is not enabled");
                            } else {
                                #[cfg(feature = "http1")] {
                                    let (mut tx, conn) =
                                        h1_builder.handshake(io).await.map_err(Error::tx)?;

                                    trace!(
                                        "http1 handshake complete, spawning background dispatcher task"
                                    );
                                    executor.execute(
                                        conn.with_upgrades()
                                            .map_err(|e| debug!("client connection error: {}", e))
                                            .map(|_| ()),
                                    );

                                    // Wait for 'conn' to ready up before we
                                    // declare this tx as usable
                                    tx.ready().await.map_err(Error::tx)?;
                                    PoolTx::Http1(tx)
                                }
                                #[cfg(not(feature = "http1"))] {
                                    panic!("http1 feature is not enabled");
                                }
                            };

                            // Hand the new client to the pool, tied to the
                            // connecting lock taken above.
                            Ok(pool.pooled(
                                connecting,
                                PoolClient {
                                    conn_info: connected,
                                    tx,
                                },
                            ))
                        }))
                    }),
            )
        })
    }
623}
624
625impl<C, B> tower_service::Service<Request<B>> for Client<C, B>
626where
627 C: Connect + Clone + Send + Sync + 'static,
628 B: Body + Send + 'static + Unpin,
629 B::Data: Send,
630 B::Error: Into<Box<dyn StdError + Send + Sync>>,
631{
632 type Response = Response<hyper::body::Incoming>;
633 type Error = Error;
634 type Future = ResponseFuture;
635
636 fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
637 Poll::Ready(Ok(()))
638 }
639
640 fn call(&mut self, req: Request<B>) -> Self::Future {
641 self.request(req)
642 }
643}
644
645impl<C, B> tower_service::Service<Request<B>> for &'_ Client<C, B>
646where
647 C: Connect + Clone + Send + Sync + 'static,
648 B: Body + Send + 'static + Unpin,
649 B::Data: Send,
650 B::Error: Into<Box<dyn StdError + Send + Sync>>,
651{
652 type Response = Response<hyper::body::Incoming>;
653 type Error = Error;
654 type Future = ResponseFuture;
655
656 fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
657 Poll::Ready(Ok(()))
658 }
659
660 fn call(&mut self, req: Request<B>) -> Self::Future {
661 self.request(req)
662 }
663}
664
665impl<C: Clone, B> Clone for Client<C, B> {
666 fn clone(&self) -> Client<C, B> {
667 Client {
668 config: self.config,
669 exec: self.exec.clone(),
670 #[cfg(feature = "http1")]
671 h1_builder: self.h1_builder.clone(),
672 #[cfg(feature = "http2")]
673 h2_builder: self.h2_builder.clone(),
674 connector: self.connector.clone(),
675 pool: self.pool.clone(),
676 }
677 }
678}
679
680impl<C, B> fmt::Debug for Client<C, B> {
681 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
682 f.debug_struct("Client").finish()
683 }
684}
685
686// ===== impl ResponseFuture =====
687
688impl ResponseFuture {
689 fn new<F>(value: F) -> Self
690 where
691 F: Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send + 'static,
692 {
693 Self {
694 inner: SyncWrapper::new(Box::pin(value)),
695 }
696 }
697
698 fn error_version(ver: Version) -> Self {
699 warn!("Request has unsupported version \"{:?}\"", ver);
700 ResponseFuture::new(Box::pin(future::err(e!(UserUnsupportedVersion))))
701 }
702}
703
704impl fmt::Debug for ResponseFuture {
705 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
706 f.pad("Future<Response>")
707 }
708}
709
710impl Future for ResponseFuture {
711 type Output = Result<Response<hyper::body::Incoming>, Error>;
712
713 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
714 self.inner.get_mut().as_mut().poll(cx)
715 }
716}
717
718// ===== impl PoolClient =====
719
/// A pooled connection handle: the request sender plus information about
/// the connection it belongs to.
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
struct PoolClient<B> {
    // Information reported by the connector for this connection.
    conn_info: Connected,
    // Sender half used to dispatch requests on the connection.
    tx: PoolTx<B>,
}
726
/// The protocol-specific request sender held by a `PoolClient`.
enum PoolTx<B> {
    /// Sender for an HTTP/1 connection.
    #[cfg(feature = "http1")]
    Http1(hyper::client::conn::http1::SendRequest<B>),
    /// Sender for an HTTP/2 connection.
    #[cfg(feature = "http2")]
    Http2(hyper::client::conn::http2::SendRequest<B>),
}
733
impl<B> PoolClient<B> {
    /// Polls whether the sender can accept another request.
    ///
    /// HTTP/1 delegates to the sender (mapping its error with
    /// `Error::closed`); HTTP/2 always reports ready.
    fn poll_ready(
        &mut self,
        #[allow(unused_variables)] cx: &mut task::Context<'_>,
    ) -> Poll<Result<(), Error>> {
        match self.tx {
            #[cfg(feature = "http1")]
            PoolTx::Http1(ref mut tx) => tx.poll_ready(cx).map_err(Error::closed),
            #[cfg(feature = "http2")]
            PoolTx::Http2(_) => Poll::Ready(Ok(())),
        }
    }

    /// Returns `true` when this is not an HTTP/2 connection.
    fn is_http1(&self) -> bool {
        !self.is_http2()
    }

    /// Returns `true` when the sender is the HTTP/2 variant.
    fn is_http2(&self) -> bool {
        match self.tx {
            #[cfg(feature = "http1")]
            PoolTx::Http1(_) => false,
            #[cfg(feature = "http2")]
            PoolTx::Http2(_) => true,
        }
    }

    /// Returns whether the connection info has been marked as poisoned.
    fn is_poisoned(&self) -> bool {
        self.conn_info.poisoned.poisoned()
    }

    /// Returns whether the underlying sender reports itself ready.
    fn is_ready(&self) -> bool {
        match self.tx {
            #[cfg(feature = "http1")]
            PoolTx::Http1(ref tx) => tx.is_ready(),
            #[cfg(feature = "http2")]
            PoolTx::Http2(ref tx) => tx.is_ready(),
        }
    }

    /// Returns whether the underlying sender reports itself closed.
    fn is_closed(&self) -> bool {
        match self.tx {
            #[cfg(feature = "http1")]
            PoolTx::Http1(ref tx) => tx.is_closed(),
            #[cfg(feature = "http2")]
            PoolTx::Http2(ref tx) => tx.is_closed(),
        }
    }
}
782
impl<B: Body + 'static> PoolClient<B> {
    /// Dispatches `req` on the underlying sender via its `try_send_request`.
    ///
    /// The error type is hyper's connection-level `TrySendError`, which the
    /// caller inspects to recover the request if it was not sent.
    ///
    /// Exactly one of the three `return` statements below is compiled,
    /// depending on which of the `http1` / `http2` features are enabled.
    fn try_send_request(
        &mut self,
        req: Request<B>,
    ) -> impl Future<Output = Result<Response<hyper::body::Incoming>, ConnTrySendError<Request<B>>>>
    where
        B: Send,
    {
        // Both protocols enabled: the two sender futures have different
        // types, so they are unified through `Either`.
        #[cfg(all(feature = "http1", feature = "http2"))]
        return match self.tx {
            #[cfg(feature = "http1")]
            PoolTx::Http1(ref mut tx) => Either::Left(tx.try_send_request(req)),
            #[cfg(feature = "http2")]
            PoolTx::Http2(ref mut tx) => Either::Right(tx.try_send_request(req)),
        };

        // HTTP/1 only.
        #[cfg(feature = "http1")]
        #[cfg(not(feature = "http2"))]
        return match self.tx {
            #[cfg(feature = "http1")]
            PoolTx::Http1(ref mut tx) => tx.try_send_request(req),
        };

        // HTTP/2 only.
        #[cfg(not(feature = "http1"))]
        #[cfg(feature = "http2")]
        return match self.tx {
            #[cfg(feature = "http2")]
            PoolTx::Http2(ref mut tx) => tx.try_send_request(req),
        };
    }
}
814
815impl<B> pool::Poolable for PoolClient<B>
816where
817 B: Send + 'static,
818{
819 fn is_open(&self) -> bool {
820 !self.is_poisoned() && self.is_ready()
821 }
822
823 fn reserve(self) -> pool::Reservation<Self> {
824 match self.tx {
825 #[cfg(feature = "http1")]
826 PoolTx::Http1(tx) => pool::Reservation::Unique(PoolClient {
827 conn_info: self.conn_info,
828 tx: PoolTx::Http1(tx),
829 }),
830 #[cfg(feature = "http2")]
831 PoolTx::Http2(tx) => {
832 let b = PoolClient {
833 conn_info: self.conn_info.clone(),
834 tx: PoolTx::Http2(tx.clone()),
835 };
836 let a = PoolClient {
837 conn_info: self.conn_info,
838 tx: PoolTx::Http2(tx),
839 };
840 pool::Reservation::Shared(a, b)
841 }
842 }
843 }
844
845 fn can_share(&self) -> bool {
846 self.is_http2()
847 }
848}
849
/// Failure modes of a single attempt to obtain a connection.
enum ClientConnectError {
    /// An error that is returned to the caller as-is.
    Normal(Error),
    /// The pool checkout was canceled; `connection_for` may retry on this.
    CheckoutIsClosed(pool::Error),
}
854
855fn origin_form(uri: &mut Uri) {
856 let path = match uri.path_and_query() {
857 Some(path) if path.as_str() != "/" => {
858 let mut parts = ::http::uri::Parts::default();
859 parts.path_and_query = Some(path.clone());
860 Uri::from_parts(parts).expect("path is valid uri")
861 }
862 _none_or_just_slash => {
863 debug_assert!(Uri::default() == "/");
864 Uri::default()
865 }
866 };
867 *uri = path
868}
869
870fn absolute_form(uri: &mut Uri) {
871 debug_assert!(uri.scheme().is_some(), "absolute_form needs a scheme");
872 debug_assert!(
873 uri.authority().is_some(),
874 "absolute_form needs an authority"
875 );
876 // If the URI is to HTTPS, and the connector claimed to be a proxy,
877 // then it *should* have tunneled, and so we don't want to send
878 // absolute-form in that case.
879 if uri.scheme() == Some(&Scheme::HTTPS) {
880 origin_form(uri);
881 }
882}
883
884fn authority_form(uri: &mut Uri) {
885 if let Some(path) = uri.path_and_query() {
886 // `https://hyper.rs` would parse with `/` path, don't
887 // annoy people about that...
888 if path != "/" {
889 warn!("HTTP/1.1 CONNECT request stripping path: {:?}", path);
890 }
891 }
892 *uri = match uri.authority() {
893 Some(auth) => {
894 let mut parts = ::http::uri::Parts::default();
895 parts.authority = Some(auth.clone());
896 Uri::from_parts(parts).expect("authority is valid")
897 }
898 None => {
899 unreachable!("authority_form with relative uri");
900 }
901 };
902}
903
904fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> Result<PoolKey, Error> {
905 let uri_clone = uri.clone();
906 match (uri_clone.scheme(), uri_clone.authority()) {
907 (Some(scheme), Some(auth)) => Ok((scheme.clone(), auth.clone())),
908 (None, Some(auth)) if is_http_connect => {
909 let scheme = match auth.port_u16() {
910 Some(443) => {
911 set_scheme(uri, Scheme::HTTPS);
912 Scheme::HTTPS
913 }
914 _ => {
915 set_scheme(uri, Scheme::HTTP);
916 Scheme::HTTP
917 }
918 };
919 Ok((scheme, auth.clone()))
920 }
921 _ => {
922 debug!("Client requires absolute-form URIs, received: {:?}", uri);
923 Err(e!(UserAbsoluteUriRequired))
924 }
925 }
926}
927
928fn domain_as_uri((scheme, auth): PoolKey) -> Uri {
929 http::uri::Builder::new()
930 .scheme(scheme)
931 .authority(auth)
932 .path_and_query("/")
933 .build()
934 .expect("domain is valid Uri")
935}
936
937fn set_scheme(uri: &mut Uri, scheme: Scheme) {
938 debug_assert!(
939 uri.scheme().is_none(),
940 "set_scheme expects no existing scheme"
941 );
942 let old = std::mem::take(uri);
943 let mut parts: ::http::uri::Parts = old.into();
944 parts.scheme = Some(scheme);
945 parts.path_and_query = Some("/".parse().expect("slash is a valid path"));
946 *uri = Uri::from_parts(parts).expect("scheme is valid");
947}
948
949fn get_non_default_port(uri: &Uri) -> Option<http::uri::Port<&str>> {
950 match (uri.port().map(|p| p.as_u16()), is_schema_secure(uri)) {
951 (Some(443), true) => None,
952 (Some(80), false) => None,
953 _ => uri.port(),
954 }
955}
956
957fn is_schema_secure(uri: &Uri) -> bool {
958 uri.scheme_str()
959 .map(|scheme_str| matches!(scheme_str, "wss" | "https"))
960 .unwrap_or_default()
961}
962
/// A builder to configure a new [`Client`](Client).
///
/// # Example
///
/// ```
/// # #[cfg(feature = "tokio")]
/// # fn run () {
/// use std::time::Duration;
/// use hyper_util::client::legacy::Client;
/// use hyper_util::rt::TokioExecutor;
///
/// let client = Client::builder(TokioExecutor::new())
///     .pool_idle_timeout(Duration::from_secs(30))
///     .http2_only(true)
///     .build_http();
/// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
/// # drop(infer);
/// # }
/// # fn main() {}
/// ```
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
#[derive(Clone)]
pub struct Builder {
    // Settings for the client's request handling.
    client_config: Config,
    // Executor wrapper created from the user-supplied executor.
    exec: Exec,
    // HTTP/1 connection options.
    #[cfg(feature = "http1")]
    h1_builder: hyper::client::conn::http1::Builder,
    // HTTP/2 connection options.
    #[cfg(feature = "http2")]
    h2_builder: hyper::client::conn::http2::Builder<Exec>,
    // Connection pool settings (idle timeout, max idle connections per host).
    pool_config: pool::Config,
    // Optional timer; see `pool_idle_timeout`, which requires one to take effect.
    pool_timer: Option<timer::Timer>,
}
995
996impl Builder {
997 /// Construct a new Builder.
998 pub fn new<E>(executor: E) -> Self
999 where
1000 E: hyper::rt::Executor<BoxSendFuture> + Send + Sync + Clone + 'static,
1001 {
1002 let exec = Exec::new(executor);
1003 Self {
1004 client_config: Config {
1005 retry_canceled_requests: true,
1006 set_host: true,
1007 ver: Ver::Auto,
1008 },
1009 exec: exec.clone(),
1010 #[cfg(feature = "http1")]
1011 h1_builder: hyper::client::conn::http1::Builder::new(),
1012 #[cfg(feature = "http2")]
1013 h2_builder: hyper::client::conn::http2::Builder::new(exec),
1014 pool_config: pool::Config {
1015 idle_timeout: Some(Duration::from_secs(90)),
1016 max_idle_per_host: usize::MAX,
1017 },
1018 pool_timer: None,
1019 }
1020 }
1021 /// Set an optional timeout for idle sockets being kept-alive.
1022 /// A `Timer` is required for this to take effect. See `Builder::pool_timer`
1023 ///
1024 /// Pass `None` to disable timeout.
1025 ///
1026 /// Default is 90 seconds.
1027 ///
1028 /// # Example
1029 ///
1030 /// ```
1031 /// # #[cfg(feature = "tokio")]
1032 /// # fn run () {
1033 /// use std::time::Duration;
1034 /// use hyper_util::client::legacy::Client;
1035 /// use hyper_util::rt::{TokioExecutor, TokioTimer};
1036 ///
1037 /// let client = Client::builder(TokioExecutor::new())
1038 /// .pool_idle_timeout(Duration::from_secs(30))
1039 /// .pool_timer(TokioTimer::new())
1040 /// .build_http();
1041 ///
1042 /// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
1043 /// # }
1044 /// # fn main() {}
1045 /// ```
1046 pub fn pool_idle_timeout<D>(&mut self, val: D) -> &mut Self
1047 where
1048 D: Into<Option<Duration>>,
1049 {
1050 self.pool_config.idle_timeout = val.into();
1051 self
1052 }
1053
1054 #[doc(hidden)]
1055 #[deprecated(note = "renamed to `pool_max_idle_per_host`")]
1056 pub fn max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
1057 self.pool_config.max_idle_per_host = max_idle;
1058 self
1059 }
1060
1061 /// Sets the maximum idle connection per host allowed in the pool.
1062 ///
1063 /// Default is `usize::MAX` (no limit).
1064 pub fn pool_max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
1065 self.pool_config.max_idle_per_host = max_idle;
1066 self
1067 }
1068
1069 // HTTP/1 options
1070
1071 /// Sets the exact size of the read buffer to *always* use.
1072 ///
1073 /// Note that setting this option unsets the `http1_max_buf_size` option.
1074 ///
1075 /// Default is an adaptive read buffer.
1076 #[cfg(feature = "http1")]
1077 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1078 pub fn http1_read_buf_exact_size(&mut self, sz: usize) -> &mut Self {
1079 self.h1_builder.read_buf_exact_size(Some(sz));
1080 self
1081 }
1082
1083 /// Set the maximum buffer size for the connection.
1084 ///
1085 /// Default is ~400kb.
1086 ///
1087 /// Note that setting this option unsets the `http1_read_exact_buf_size` option.
1088 ///
1089 /// # Panics
1090 ///
1091 /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
1092 #[cfg(feature = "http1")]
1093 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1094 pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self {
1095 self.h1_builder.max_buf_size(max);
1096 self
1097 }
1098
1099 /// Set whether HTTP/1 connections will accept spaces between header names
1100 /// and the colon that follow them in responses.
1101 ///
1102 /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
1103 /// parsing.
1104 ///
1105 /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
1106 /// to say about it:
1107 ///
1108 /// > No whitespace is allowed between the header field-name and colon. In
1109 /// > the past, differences in the handling of such whitespace have led to
1110 /// > security vulnerabilities in request routing and response handling. A
1111 /// > server MUST reject any received request message that contains
1112 /// > whitespace between a header field-name and colon with a response code
1113 /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
1114 /// > response message before forwarding the message downstream.
1115 ///
1116 /// Note that this setting does not affect HTTP/2.
1117 ///
1118 /// Default is false.
1119 ///
1120 /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
1121 #[cfg(feature = "http1")]
1122 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1123 pub fn http1_allow_spaces_after_header_name_in_responses(&mut self, val: bool) -> &mut Self {
1124 self.h1_builder
1125 .allow_spaces_after_header_name_in_responses(val);
1126 self
1127 }
1128
1129 /// Set whether HTTP/1 connections will accept obsolete line folding for
1130 /// header values.
1131 ///
1132 /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
1133 /// to say about it:
1134 ///
1135 /// > A server that receives an obs-fold in a request message that is not
1136 /// > within a message/http container MUST either reject the message by
1137 /// > sending a 400 (Bad Request), preferably with a representation
1138 /// > explaining that obsolete line folding is unacceptable, or replace
1139 /// > each received obs-fold with one or more SP octets prior to
1140 /// > interpreting the field value or forwarding the message downstream.
1141 ///
1142 /// > A proxy or gateway that receives an obs-fold in a response message
1143 /// > that is not within a message/http container MUST either discard the
1144 /// > message and replace it with a 502 (Bad Gateway) response, preferably
1145 /// > with a representation explaining that unacceptable line folding was
1146 /// > received, or replace each received obs-fold with one or more SP
1147 /// > octets prior to interpreting the field value or forwarding the
1148 /// > message downstream.
1149 ///
1150 /// > A user agent that receives an obs-fold in a response message that is
1151 /// > not within a message/http container MUST replace each received
1152 /// > obs-fold with one or more SP octets prior to interpreting the field
1153 /// > value.
1154 ///
1155 /// Note that this setting does not affect HTTP/2.
1156 ///
1157 /// Default is false.
1158 ///
1159 /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
1160 #[cfg(feature = "http1")]
1161 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1162 pub fn http1_allow_obsolete_multiline_headers_in_responses(&mut self, val: bool) -> &mut Self {
1163 self.h1_builder
1164 .allow_obsolete_multiline_headers_in_responses(val);
1165 self
1166 }
1167
1168 /// Sets whether invalid header lines should be silently ignored in HTTP/1 responses.
1169 ///
1170 /// This mimics the behaviour of major browsers. You probably don't want this.
1171 /// You should only want this if you are implementing a proxy whose main
1172 /// purpose is to sit in front of browsers whose users access arbitrary content
1173 /// which may be malformed, and they expect everything that works without
1174 /// the proxy to keep working with the proxy.
1175 ///
1176 /// This option will prevent Hyper's client from returning an error encountered
1177 /// when parsing a header, except if the error was caused by the character NUL
1178 /// (ASCII code 0), as Chrome specifically always reject those.
1179 ///
1180 /// The ignorable errors are:
1181 /// * empty header names;
1182 /// * characters that are not allowed in header names, except for `\0` and `\r`;
1183 /// * when `allow_spaces_after_header_name_in_responses` is not enabled,
1184 /// spaces and tabs between the header name and the colon;
1185 /// * missing colon between header name and colon;
1186 /// * characters that are not allowed in header values except for `\0` and `\r`.
1187 ///
1188 /// If an ignorable error is encountered, the parser tries to find the next
1189 /// line in the input to resume parsing the rest of the headers. An error
1190 /// will be emitted nonetheless if it finds `\0` or a lone `\r` while
1191 /// looking for the next line.
1192 #[cfg(feature = "http1")]
1193 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1194 pub fn http1_ignore_invalid_headers_in_responses(&mut self, val: bool) -> &mut Builder {
1195 self.h1_builder.ignore_invalid_headers_in_responses(val);
1196 self
1197 }
1198
1199 /// Set whether HTTP/1 connections should try to use vectored writes,
1200 /// or always flatten into a single buffer.
1201 ///
1202 /// Note that setting this to false may mean more copies of body data,
1203 /// but may also improve performance when an IO transport doesn't
1204 /// support vectored writes well, such as most TLS implementations.
1205 ///
1206 /// Setting this to true will force hyper to use queued strategy
1207 /// which may eliminate unnecessary cloning on some TLS backends
1208 ///
1209 /// Default is `auto`. In this mode hyper will try to guess which
1210 /// mode to use
1211 #[cfg(feature = "http1")]
1212 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1213 pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder {
1214 self.h1_builder.writev(enabled);
1215 self
1216 }
1217
1218 /// Set whether HTTP/1 connections will write header names as title case at
1219 /// the socket level.
1220 ///
1221 /// Note that this setting does not affect HTTP/2.
1222 ///
1223 /// Default is false.
1224 #[cfg(feature = "http1")]
1225 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1226 pub fn http1_title_case_headers(&mut self, val: bool) -> &mut Self {
1227 self.h1_builder.title_case_headers(val);
1228 self
1229 }
1230
1231 /// Set whether to support preserving original header cases.
1232 ///
1233 /// Currently, this will record the original cases received, and store them
1234 /// in a private extension on the `Response`. It will also look for and use
1235 /// such an extension in any provided `Request`.
1236 ///
1237 /// Since the relevant extension is still private, there is no way to
1238 /// interact with the original cases. The only effect this can have now is
1239 /// to forward the cases in a proxy-like fashion.
1240 ///
1241 /// Note that this setting does not affect HTTP/2.
1242 ///
1243 /// Default is false.
1244 #[cfg(feature = "http1")]
1245 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1246 pub fn http1_preserve_header_case(&mut self, val: bool) -> &mut Self {
1247 self.h1_builder.preserve_header_case(val);
1248 self
1249 }
1250
1251 /// Set the maximum number of headers.
1252 ///
1253 /// When a response is received, the parser will reserve a buffer to store headers for optimal
1254 /// performance.
1255 ///
1256 /// If client receives more headers than the buffer size, the error "message header too large"
1257 /// is returned.
1258 ///
1259 /// The headers is allocated on the stack by default, which has higher performance. After
1260 /// setting this value, headers will be allocated in heap memory, that is, heap memory
1261 /// allocation will occur for each response, and there will be a performance drop of about 5%.
1262 ///
1263 /// Note that this setting does not affect HTTP/2.
1264 ///
1265 /// Default is 100.
1266 #[cfg(feature = "http1")]
1267 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1268 pub fn http1_max_headers(&mut self, val: usize) -> &mut Self {
1269 self.h1_builder.max_headers(val);
1270 self
1271 }
1272
1273 /// Set whether HTTP/0.9 responses should be tolerated.
1274 ///
1275 /// Default is false.
1276 #[cfg(feature = "http1")]
1277 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1278 pub fn http09_responses(&mut self, val: bool) -> &mut Self {
1279 self.h1_builder.http09_responses(val);
1280 self
1281 }
1282
1283 /// Set whether the connection **must** use HTTP/2.
1284 ///
1285 /// The destination must either allow HTTP2 Prior Knowledge, or the
1286 /// `Connect` should be configured to do use ALPN to upgrade to `h2`
1287 /// as part of the connection process. This will not make the `Client`
1288 /// utilize ALPN by itself.
1289 ///
1290 /// Note that setting this to true prevents HTTP/1 from being allowed.
1291 ///
1292 /// Default is false.
1293 #[cfg(feature = "http2")]
1294 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1295 pub fn http2_only(&mut self, val: bool) -> &mut Self {
1296 self.client_config.ver = if val { Ver::Http2 } else { Ver::Auto };
1297 self
1298 }
1299
1300 /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
1301 ///
1302 /// This will default to the default value set by the [`h2` crate](https://crates.io/crates/h2).
1303 /// As of v0.4.0, it is 20.
1304 ///
1305 /// See <https://github.com/hyperium/hyper/issues/2877> for more information.
1306 #[cfg(feature = "http2")]
1307 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1308 pub fn http2_max_pending_accept_reset_streams(
1309 &mut self,
1310 max: impl Into<Option<usize>>,
1311 ) -> &mut Self {
1312 self.h2_builder.max_pending_accept_reset_streams(max.into());
1313 self
1314 }
1315
1316 /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
1317 /// stream-level flow control.
1318 ///
1319 /// Passing `None` will do nothing.
1320 ///
1321 /// If not set, hyper will use a default.
1322 ///
1323 /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
1324 #[cfg(feature = "http2")]
1325 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1326 pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
1327 self.h2_builder.initial_stream_window_size(sz.into());
1328 self
1329 }
1330
1331 /// Sets the max connection-level flow control for HTTP2
1332 ///
1333 /// Passing `None` will do nothing.
1334 ///
1335 /// If not set, hyper will use a default.
1336 #[cfg(feature = "http2")]
1337 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1338 pub fn http2_initial_connection_window_size(
1339 &mut self,
1340 sz: impl Into<Option<u32>>,
1341 ) -> &mut Self {
1342 self.h2_builder.initial_connection_window_size(sz.into());
1343 self
1344 }
1345
1346 /// Sets the initial maximum of locally initiated (send) streams.
1347 ///
1348 /// This value will be overwritten by the value included in the initial
1349 /// SETTINGS frame received from the peer as part of a [connection preface].
1350 ///
1351 /// Passing `None` will do nothing.
1352 ///
1353 /// If not set, hyper will use a default.
1354 ///
1355 /// [connection preface]: https://httpwg.org/specs/rfc9113.html#preface
1356 #[cfg(feature = "http2")]
1357 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1358 pub fn http2_initial_max_send_streams(
1359 &mut self,
1360 initial: impl Into<Option<usize>>,
1361 ) -> &mut Self {
1362 self.h2_builder.initial_max_send_streams(initial);
1363 self
1364 }
1365
1366 /// Sets whether to use an adaptive flow control.
1367 ///
1368 /// Enabling this will override the limits set in
1369 /// `http2_initial_stream_window_size` and
1370 /// `http2_initial_connection_window_size`.
1371 #[cfg(feature = "http2")]
1372 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1373 pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
1374 self.h2_builder.adaptive_window(enabled);
1375 self
1376 }
1377
1378 /// Sets the maximum frame size to use for HTTP2.
1379 ///
1380 /// Passing `None` will do nothing.
1381 ///
1382 /// If not set, hyper will use a default.
1383 #[cfg(feature = "http2")]
1384 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1385 pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
1386 self.h2_builder.max_frame_size(sz);
1387 self
1388 }
1389
1390 /// Sets the max size of received header frames for HTTP2.
1391 ///
1392 /// Default is currently 16KB, but can change.
1393 #[cfg(feature = "http2")]
1394 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1395 pub fn http2_max_header_list_size(&mut self, max: u32) -> &mut Self {
1396 self.h2_builder.max_header_list_size(max);
1397 self
1398 }
1399
1400 /// Sets an interval for HTTP2 Ping frames should be sent to keep a
1401 /// connection alive.
1402 ///
1403 /// Pass `None` to disable HTTP2 keep-alive.
1404 ///
1405 /// Default is currently disabled.
1406 ///
1407 /// # Cargo Feature
1408 ///
1409 /// Requires the `tokio` cargo feature to be enabled.
1410 #[cfg(feature = "tokio")]
1411 #[cfg(feature = "http2")]
1412 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1413 pub fn http2_keep_alive_interval(
1414 &mut self,
1415 interval: impl Into<Option<Duration>>,
1416 ) -> &mut Self {
1417 self.h2_builder.keep_alive_interval(interval);
1418 self
1419 }
1420
1421 /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
1422 ///
1423 /// If the ping is not acknowledged within the timeout, the connection will
1424 /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
1425 ///
1426 /// Default is 20 seconds.
1427 ///
1428 /// # Cargo Feature
1429 ///
1430 /// Requires the `tokio` cargo feature to be enabled.
1431 #[cfg(feature = "tokio")]
1432 #[cfg(feature = "http2")]
1433 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1434 pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
1435 self.h2_builder.keep_alive_timeout(timeout);
1436 self
1437 }
1438
1439 /// Sets whether HTTP2 keep-alive should apply while the connection is idle.
1440 ///
1441 /// If disabled, keep-alive pings are only sent while there are open
1442 /// request/responses streams. If enabled, pings are also sent when no
1443 /// streams are active. Does nothing if `http2_keep_alive_interval` is
1444 /// disabled.
1445 ///
1446 /// Default is `false`.
1447 ///
1448 /// # Cargo Feature
1449 ///
1450 /// Requires the `tokio` cargo feature to be enabled.
1451 #[cfg(feature = "tokio")]
1452 #[cfg(feature = "http2")]
1453 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1454 pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
1455 self.h2_builder.keep_alive_while_idle(enabled);
1456 self
1457 }
1458
1459 /// Sets the maximum number of HTTP2 concurrent locally reset streams.
1460 ///
1461 /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
1462 /// details.
1463 ///
1464 /// The default value is determined by the `h2` crate.
1465 ///
1466 /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
1467 #[cfg(feature = "http2")]
1468 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1469 pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
1470 self.h2_builder.max_concurrent_reset_streams(max);
1471 self
1472 }
1473
1474 /// Provide a timer to be used for h2
1475 ///
1476 /// See the documentation of [`h2::client::Builder::timer`] for more
1477 /// details.
1478 ///
1479 /// [`h2::client::Builder::timer`]: https://docs.rs/h2/client/struct.Builder.html#method.timer
1480 pub fn timer<M>(&mut self, timer: M) -> &mut Self
1481 where
1482 M: Timer + Send + Sync + 'static,
1483 {
1484 #[cfg(feature = "http2")]
1485 self.h2_builder.timer(timer);
1486 self
1487 }
1488
1489 /// Provide a timer to be used for timeouts and intervals in connection pools.
1490 pub fn pool_timer<M>(&mut self, timer: M) -> &mut Self
1491 where
1492 M: Timer + Clone + Send + Sync + 'static,
1493 {
1494 self.pool_timer = Some(timer::Timer::new(timer.clone()));
1495 self
1496 }
1497
1498 /// Set the maximum write buffer size for each HTTP/2 stream.
1499 ///
1500 /// Default is currently 1MB, but may change.
1501 ///
1502 /// # Panics
1503 ///
1504 /// The value must be no larger than `u32::MAX`.
1505 #[cfg(feature = "http2")]
1506 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1507 pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
1508 self.h2_builder.max_send_buf_size(max);
1509 self
1510 }
1511
1512 /// Set whether to retry requests that get disrupted before ever starting
1513 /// to write.
1514 ///
1515 /// This means a request that is queued, and gets given an idle, reused
1516 /// connection, and then encounters an error immediately as the idle
1517 /// connection was found to be unusable.
1518 ///
1519 /// When this is set to `false`, the related `ResponseFuture` would instead
1520 /// resolve to an `Error::Cancel`.
1521 ///
1522 /// Default is `true`.
1523 #[inline]
1524 pub fn retry_canceled_requests(&mut self, val: bool) -> &mut Self {
1525 self.client_config.retry_canceled_requests = val;
1526 self
1527 }
1528
1529 /// Set whether to automatically add the `Host` header to requests.
1530 ///
1531 /// If true, and a request does not include a `Host` header, one will be
1532 /// added automatically, derived from the authority of the `Uri`.
1533 ///
1534 /// Default is `true`.
1535 #[inline]
1536 pub fn set_host(&mut self, val: bool) -> &mut Self {
1537 self.client_config.set_host = val;
1538 self
1539 }
1540
1541 /// Build a client with this configuration and the default `HttpConnector`.
1542 #[cfg(feature = "tokio")]
1543 pub fn build_http<B>(&self) -> Client<HttpConnector, B>
1544 where
1545 B: Body + Send,
1546 B::Data: Send,
1547 {
1548 let mut connector = HttpConnector::new();
1549 if self.pool_config.is_enabled() {
1550 connector.set_keepalive(self.pool_config.idle_timeout);
1551 }
1552 self.build(connector)
1553 }
1554
1555 /// Combine the configuration of this builder with a connector to create a `Client`.
1556 pub fn build<C, B>(&self, connector: C) -> Client<C, B>
1557 where
1558 C: Connect + Clone,
1559 B: Body + Send,
1560 B::Data: Send,
1561 {
1562 let exec = self.exec.clone();
1563 let timer = self.pool_timer.clone();
1564 Client {
1565 config: self.client_config,
1566 exec: exec.clone(),
1567 #[cfg(feature = "http1")]
1568 h1_builder: self.h1_builder.clone(),
1569 #[cfg(feature = "http2")]
1570 h2_builder: self.h2_builder.clone(),
1571 connector,
1572 pool: pool::Pool::new(self.pool_config, exec, timer),
1573 }
1574 }
1575}
1576
1577impl fmt::Debug for Builder {
1578 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1579 f.debug_struct("Builder")
1580 .field("client_config", &self.client_config)
1581 .field("pool_config", &self.pool_config)
1582 .finish()
1583 }
1584}
1585
1586// ==== impl Error ====
1587
1588impl fmt::Debug for Error {
1589 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1590 let mut f = f.debug_tuple("hyper_util::client::legacy::Error");
1591 f.field(&self.kind);
1592 if let Some(ref cause) = self.source {
1593 f.field(cause);
1594 }
1595 f.finish()
1596 }
1597}
1598
1599impl fmt::Display for Error {
1600 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1601 write!(f, "client error ({:?})", self.kind)
1602 }
1603}
1604
1605impl StdError for Error {
1606 fn source(&self) -> Option<&(dyn StdError + 'static)> {
1607 self.source.as_ref().map(|e| &**e as _)
1608 }
1609}
1610
1611impl Error {
1612 /// Returns true if this was an error from `Connect`.
1613 pub fn is_connect(&self) -> bool {
1614 matches!(self.kind, ErrorKind::Connect)
1615 }
1616
1617 /// Returns the info of the client connection on which this error occurred.
1618 #[cfg(any(feature = "http1", feature = "http2"))]
1619 pub fn connect_info(&self) -> Option<&Connected> {
1620 self.connect_info.as_ref()
1621 }
1622
1623 #[cfg(any(feature = "http1", feature = "http2"))]
1624 fn with_connect_info(self, connect_info: Connected) -> Self {
1625 Self {
1626 connect_info: Some(connect_info),
1627 ..self
1628 }
1629 }
1630 fn is_canceled(&self) -> bool {
1631 matches!(self.kind, ErrorKind::Canceled)
1632 }
1633
1634 fn tx(src: hyper::Error) -> Self {
1635 e!(SendRequest, src)
1636 }
1637
1638 fn closed(src: hyper::Error) -> Self {
1639 e!(ChannelClosed, src)
1640 }
1641}