hyper_util/client/legacy/connect/
capture.rs

1use std::{ops::Deref, sync::Arc};
2
3use http::Request;
4use tokio::sync::watch;
5
6use super::Connected;
7
8/// [`CaptureConnection`] allows callers to capture [`Connected`] information
9///
10/// To capture a connection for a request, use [`capture_connection`].
11#[derive(Debug, Clone)]
12pub struct CaptureConnection {
13    rx: watch::Receiver<Option<Connected>>,
14}
15
16/// Capture the connection for a given request
17///
18/// When making a request with Hyper, the underlying connection must implement the [`Connection`] trait.
19/// [`capture_connection`] allows a caller to capture the returned [`Connected`] structure as soon
20/// as the connection is established.
21///
22/// *Note*: If establishing a connection fails, [`CaptureConnection::connection_metadata`] will always return none.
23///
24/// # Examples
25///
26/// **Synchronous access**:
27/// The [`CaptureConnection::connection_metadata`] method allows callers to check if a connection has been
28/// established. This is ideal for situations where you are certain the connection has already
29/// been established (e.g. after the response future has already completed).
30/// ```rust
31/// use hyper_util::client::legacy::connect::capture_connection;
32/// let mut request = http::Request::builder()
33///   .uri("http://foo.com")
34///   .body(())
35///   .unwrap();
36///
37/// let captured_connection = capture_connection(&mut request);
38/// // some time later after the request has been sent...
39/// let connection_info = captured_connection.connection_metadata();
40/// println!("we are connected! {:?}", connection_info.as_ref());
41/// ```
42///
43/// **Asynchronous access**:
44/// The [`CaptureConnection::wait_for_connection_metadata`] method returns a future resolves as soon as the
45/// connection is available.
46///
47/// ```rust
48/// # #[cfg(feature  = "tokio")]
49/// # async fn example() {
50/// use hyper_util::client::legacy::connect::capture_connection;
51/// use hyper_util::client::legacy::Client;
52/// use hyper_util::rt::TokioExecutor;
53/// use bytes::Bytes;
54/// use http_body_util::Empty;
55/// let mut request = http::Request::builder()
56///   .uri("http://foo.com")
57///   .body(Empty::<Bytes>::new())
58///   .unwrap();
59///
60/// let mut captured = capture_connection(&mut request);
61/// tokio::task::spawn(async move {
62///     let connection_info = captured.wait_for_connection_metadata().await;
63///     println!("we are connected! {:?}", connection_info.as_ref());
64/// });
65///
66/// let client = Client::builder(TokioExecutor::new()).build_http();
67/// client.request(request).await.expect("request failed");
68/// # }
69/// ```
70pub fn capture_connection<B>(request: &mut Request<B>) -> CaptureConnection {
71    let (tx, rx) = CaptureConnection::new();
72    request.extensions_mut().insert(tx);
73    rx
74}
75
76/// TxSide for [`CaptureConnection`]
77///
78/// This is inserted into `Extensions` to allow Hyper to back channel connection info
79#[derive(Clone)]
80pub(crate) struct CaptureConnectionExtension {
81    tx: Arc<watch::Sender<Option<Connected>>>,
82}
83
84impl CaptureConnectionExtension {
85    pub(crate) fn set(&self, connected: &Connected) {
86        self.tx.send_replace(Some(connected.clone()));
87    }
88}
89
90impl CaptureConnection {
91    /// Internal API to create the tx and rx half of [`CaptureConnection`]
92    pub(crate) fn new() -> (CaptureConnectionExtension, Self) {
93        let (tx, rx) = watch::channel(None);
94        (
95            CaptureConnectionExtension { tx: Arc::new(tx) },
96            CaptureConnection { rx },
97        )
98    }
99
100    /// Retrieve the connection metadata, if available
101    pub fn connection_metadata(&self) -> impl Deref<Target = Option<Connected>> + '_ {
102        self.rx.borrow()
103    }
104
105    /// Wait for the connection to be established
106    ///
107    /// If a connection was established, this will always return `Some(...)`. If the request never
108    /// successfully connected (e.g. DNS resolution failure), this method will never return.
109    pub async fn wait_for_connection_metadata(
110        &mut self,
111    ) -> impl Deref<Target = Option<Connected>> + '_ {
112        if self.rx.borrow().is_some() {
113            return self.rx.borrow();
114        }
115        let _ = self.rx.changed().await;
116        self.rx.borrow()
117    }
118}
119
#[cfg(all(test, not(miri)))]
mod test {
    use super::*;

    // Metadata published through the sender is visible through the synchronous accessor,
    // and reading it does not consume it.
    #[test]
    fn test_sync_capture_connection() {
        let (tx, rx) = CaptureConnection::new();
        assert!(
            rx.connection_metadata().is_none(),
            "connection has not been set"
        );
        tx.set(&Connected::new().proxy(true));
        assert!(rx
            .connection_metadata()
            .as_ref()
            .expect("connected should be set")
            .is_proxied());

        // ensure it can be called multiple times
        assert!(rx
            .connection_metadata()
            .as_ref()
            .expect("connected should be set")
            .is_proxied());
    }

    // A task waiting on the metadata only completes after the sender publishes it.
    #[tokio::test]
    async fn async_capture_connection() {
        let (tx, mut rx) = CaptureConnection::new();
        assert!(
            rx.connection_metadata().is_none(),
            "connection has not been set"
        );
        let test_task = tokio::spawn(async move {
            assert!(rx
                .wait_for_connection_metadata()
                .await
                .as_ref()
                .expect("connection should be set")
                .is_proxied());
            // can be awaited multiple times
            assert!(
                rx.wait_for_connection_metadata().await.is_some(),
                "should be awaitable multiple times"
            );

            assert!(rx.connection_metadata().is_some());
        });
        // can't be finished, we haven't set the connection yet
        assert!(!test_task.is_finished());
        tx.set(&Connected::new().proxy(true));

        assert!(test_task.await.is_ok());
    }

    // Dropping the only sender without publishing makes the wait resolve to `None`
    // instead of hanging.
    #[tokio::test]
    async fn capture_connection_sender_side_dropped() {
        let (tx, mut rx) = CaptureConnection::new();
        assert!(
            rx.connection_metadata().is_none(),
            "connection has not been set"
        );
        drop(tx);
        assert!(rx.wait_for_connection_metadata().await.is_none());
    }
}