hyper_util/client/legacy/
pool.rs

1#![allow(dead_code)]
2
3use std::collections::{HashMap, HashSet, VecDeque};
4use std::convert::Infallible;
5use std::error::Error as StdError;
6use std::fmt::{self, Debug};
7use std::future::Future;
8use std::hash::Hash;
9use std::ops::{Deref, DerefMut};
10use std::pin::Pin;
11use std::sync::{Arc, Mutex, Weak};
12use std::task::{self, Poll};
13
14use std::time::{Duration, Instant};
15
16use futures_channel::oneshot;
17use futures_util::ready;
18use tracing::{debug, trace};
19
20use hyper::rt::Sleep;
21use hyper::rt::Timer as _;
22
23use crate::common::{exec, exec::Exec, timer::Timer};
24
// A handle to a keyed pool of reusable values (connections).
//
// The handle only wraps an optional `Arc`, so clones of a `Pool` refer to the
// same shared `PoolInner` state.
//
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub struct Pool<T, K: Key> {
    // If the pool is disabled, this is None.
    // (`Pool::new` leaves it as None when `Config::is_enabled()` is false.)
    inner: Option<Arc<Mutex<PoolInner<T, K>>>>,
}
31
// Before using a pooled connection, make sure the sender is not dead.
//
// This is a trait to allow the `client::pool::tests` to work for `i32`.
//
// See https://github.com/hyperium/hyper/issues/1429
pub trait Poolable: Unpin + Send + Sized + 'static {
    /// Whether the value is still usable.
    ///
    /// The pool drops values that report `false` instead of storing them or
    /// handing them out.
    fn is_open(&self) -> bool;
    /// Reserve this connection.
    ///
    /// Allows for HTTP/2 to return a shared reservation.
    fn reserve(self) -> Reservation<Self>;
    /// Whether the value can be used by several holders at once.
    ///
    /// The pool consults this to decide whether a checked-out value needs a
    /// reference back to the pool so it can be reinserted on drop (only
    /// non-shareable values get one).
    fn can_share(&self) -> bool;
}
45
/// Bundle of bounds a type must satisfy to be used as a pool key.
pub trait Key: Eq + Hash + Clone + Debug + Unpin + Send + 'static {}

// Blanket impl: every type meeting the bounds is automatically a `Key`.
impl<T> Key for T where T: Eq + Hash + Clone + Debug + Unpin + Send + 'static {}
49
/// A marker to identify what version a pooled connection is.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[allow(dead_code)]
pub enum Ver {
    /// Not known to be HTTP/2; `Pool::connecting` performs no
    /// de-duplication of connect attempts for this value.
    Auto,
    /// HTTP/2; `Pool::connecting` allows only one in-flight connect attempt
    /// per key while the pool is enabled.
    Http2,
}
57
/// When checking out a pooled connection, it might be that the connection
/// only supports a single reservation, or it might be usable for many.
///
/// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be
/// used for multiple requests.
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub enum Reservation<T> {
    /// This connection could be used multiple times: the first value will be
    /// reinserted into the `idle` pool, and the second will be given to
    /// the `Checkout`.
    ///
    /// Only available when the `http2` feature is enabled.
    #[cfg(feature = "http2")]
    Shared(T, T),
    /// This connection requires unique access. It will be returned after
    /// use is complete.
    Unique(T),
}
75
// Simple type alias in case the key type needs to be adjusted.
// (Kept as a plain `//` comment so it is not picked up as rustdoc for the
// next item.)
// pub type Key = (http::uri::Scheme, http::uri::Authority); //Arc<String>;
78
// The shared, mutex-protected state behind every clone of a `Pool`.
struct PoolInner<T, K: Eq + Hash> {
    // A flag that a connection is being established, and the connection
    // should be shared. This prevents making multiple HTTP/2 connections
    // to the same host.
    connecting: HashSet<K>,
    // These are internal Conns sitting in the event loop in the KeepAlive
    // state, waiting to receive a new Request to send on the socket.
    idle: HashMap<K, Vec<Idle<T>>>,
    // Upper bound on the length of each per-key `idle` list; `put` drops the
    // value instead of storing it once the list is this long.
    max_idle_per_host: usize,
    // These are outstanding Checkouts that are waiting for a socket to be
    // able to send a Request on. This is used when "racing" for a new
    // connection.
    //
    // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait
    // for the Pool to receive an idle Conn. When a Conn becomes idle,
    // this list is checked for any parked Checkouts, and tries to notify
    // them that the Conn could be used instead of waiting for a brand new
    // connection.
    waiters: HashMap<K, VecDeque<oneshot::Sender<T>>>,
    // A oneshot channel is used to allow the interval to be notified when
    // the Pool completely drops. That way, the interval can cancel immediately.
    // `Some` also marks that the idle interval task was already spawned.
    idle_interval_ref: Option<oneshot::Sender<Infallible>>,
    // Executor used to spawn the `IdleTask`.
    exec: Exec,
    // Timer used by the `IdleTask` to sleep between sweeps; without one the
    // idle interval is never spawned.
    timer: Option<Timer>,
    // Idle timeout: entries idle for longer than this are treated as
    // expired. `None` means entries never expire.
    timeout: Option<Duration>,
}
105
// An optional weak reference: the `None` state holds no `Weak` at all.
//
// This is because `Weak::new()` *allocates* space for `T`, even if it
// doesn't need it!
struct WeakOpt<T>(Option<Weak<T>>);
109
/// Settings that control how a `Pool` keeps idle values around.
#[derive(Clone, Copy, Debug)]
pub struct Config {
    /// How long a value may sit idle before it is considered expired.
    /// `None` means idle values never expire.
    pub idle_timeout: Option<Duration>,
    /// Maximum number of idle values retained per key. A value of zero
    /// turns pooling off entirely (see [`Config::is_enabled`]).
    pub max_idle_per_host: usize,
}

impl Config {
    /// Reports whether pooling is turned on, i.e. whether at least one idle
    /// value per key may be retained.
    pub fn is_enabled(&self) -> bool {
        self.max_idle_per_host != 0
    }
}
121
122impl<T, K: Key> Pool<T, K> {
123    pub fn new<E, M>(config: Config, executor: E, timer: Option<M>) -> Pool<T, K>
124    where
125        E: hyper::rt::Executor<exec::BoxSendFuture> + Send + Sync + Clone + 'static,
126        M: hyper::rt::Timer + Send + Sync + Clone + 'static,
127    {
128        let exec = Exec::new(executor);
129        let timer = timer.map(|t| Timer::new(t));
130        let inner = if config.is_enabled() {
131            Some(Arc::new(Mutex::new(PoolInner {
132                connecting: HashSet::new(),
133                idle: HashMap::new(),
134                idle_interval_ref: None,
135                max_idle_per_host: config.max_idle_per_host,
136                waiters: HashMap::new(),
137                exec,
138                timer,
139                timeout: config.idle_timeout,
140            })))
141        } else {
142            None
143        };
144
145        Pool { inner }
146    }
147
148    pub(crate) fn is_enabled(&self) -> bool {
149        self.inner.is_some()
150    }
151
152    #[cfg(test)]
153    pub(super) fn no_timer(&self) {
154        // Prevent an actual interval from being created for this pool...
155        {
156            let mut inner = self.inner.as_ref().unwrap().lock().unwrap();
157            assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
158            let (tx, _) = oneshot::channel();
159            inner.idle_interval_ref = Some(tx);
160        }
161    }
162}
163
164impl<T: Poolable, K: Key> Pool<T, K> {
165    /// Returns a `Checkout` which is a future that resolves if an idle
166    /// connection becomes available.
167    pub fn checkout(&self, key: K) -> Checkout<T, K> {
168        Checkout {
169            key,
170            pool: self.clone(),
171            waiter: None,
172        }
173    }
174
175    /// Ensure that there is only ever 1 connecting task for HTTP/2
176    /// connections. This does nothing for HTTP/1.
177    pub fn connecting(&self, key: &K, ver: Ver) -> Option<Connecting<T, K>> {
178        if ver == Ver::Http2 {
179            if let Some(ref enabled) = self.inner {
180                let mut inner = enabled.lock().unwrap();
181                return if inner.connecting.insert(key.clone()) {
182                    let connecting = Connecting {
183                        key: key.clone(),
184                        pool: WeakOpt::downgrade(enabled),
185                    };
186                    Some(connecting)
187                } else {
188                    trace!("HTTP/2 connecting already in progress for {:?}", key);
189                    None
190                };
191            }
192        }
193
194        // else
195        Some(Connecting {
196            key: key.clone(),
197            // in HTTP/1's case, there is never a lock, so we don't
198            // need to do anything in Drop.
199            pool: WeakOpt::none(),
200        })
201    }
202
203    #[cfg(test)]
204    fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T, K>> {
205        self.inner.as_ref().expect("enabled").lock().expect("lock")
206    }
207
208    /* Used in client/tests.rs...
209    #[cfg(test)]
210    pub(super) fn h1_key(&self, s: &str) -> Key {
211        Arc::new(s.to_string())
212    }
213
214    #[cfg(test)]
215    pub(super) fn idle_count(&self, key: &Key) -> usize {
216        self
217            .locked()
218            .idle
219            .get(key)
220            .map(|list| list.len())
221            .unwrap_or(0)
222    }
223    */
224
225    pub fn pooled(
226        &self,
227        #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T, K>,
228        value: T,
229    ) -> Pooled<T, K> {
230        let (value, pool_ref) = if let Some(ref enabled) = self.inner {
231            match value.reserve() {
232                #[cfg(feature = "http2")]
233                Reservation::Shared(to_insert, to_return) => {
234                    let mut inner = enabled.lock().unwrap();
235                    inner.put(connecting.key.clone(), to_insert, enabled);
236                    // Do this here instead of Drop for Connecting because we
237                    // already have a lock, no need to lock the mutex twice.
238                    inner.connected(&connecting.key);
239                    // prevent the Drop of Connecting from repeating inner.connected()
240                    connecting.pool = WeakOpt::none();
241
242                    // Shared reservations don't need a reference to the pool,
243                    // since the pool always keeps a copy.
244                    (to_return, WeakOpt::none())
245                }
246                Reservation::Unique(value) => {
247                    // Unique reservations must take a reference to the pool
248                    // since they hope to reinsert once the reservation is
249                    // completed
250                    (value, WeakOpt::downgrade(enabled))
251                }
252            }
253        } else {
254            // If pool is not enabled, skip all the things...
255
256            // The Connecting should have had no pool ref
257            debug_assert!(connecting.pool.upgrade().is_none());
258
259            (value, WeakOpt::none())
260        };
261        Pooled {
262            key: connecting.key.clone(),
263            is_reused: false,
264            pool: pool_ref,
265            value: Some(value),
266        }
267    }
268
269    fn reuse(&self, key: &K, value: T) -> Pooled<T, K> {
270        debug!("reuse idle connection for {:?}", key);
271        // TODO: unhack this
272        // In Pool::pooled(), which is used for inserting brand new connections,
273        // there's some code that adjusts the pool reference taken depending
274        // on if the Reservation can be shared or is unique. By the time
275        // reuse() is called, the reservation has already been made, and
276        // we just have the final value, without knowledge of if this is
277        // unique or shared. So, the hack is to just assume Ver::Http2 means
278        // shared... :(
279        let mut pool_ref = WeakOpt::none();
280        if !value.can_share() {
281            if let Some(ref enabled) = self.inner {
282                pool_ref = WeakOpt::downgrade(enabled);
283            }
284        }
285
286        Pooled {
287            is_reused: true,
288            key: key.clone(),
289            pool: pool_ref,
290            value: Some(value),
291        }
292    }
293}
294
295/// Pop off this list, looking for a usable connection that hasn't expired.
296struct IdlePopper<'a, T, K> {
297    key: &'a K,
298    list: &'a mut Vec<Idle<T>>,
299}
300
301impl<'a, T: Poolable + 'a, K: Debug> IdlePopper<'a, T, K> {
302    fn pop(self, expiration: &Expiration) -> Option<Idle<T>> {
303        while let Some(entry) = self.list.pop() {
304            // If the connection has been closed, or is older than our idle
305            // timeout, simply drop it and keep looking...
306            if !entry.value.is_open() {
307                trace!("removing closed connection for {:?}", self.key);
308                continue;
309            }
310            // TODO: Actually, since the `idle` list is pushed to the end always,
311            // that would imply that if *this* entry is expired, then anything
312            // "earlier" in the list would *have* to be expired also... Right?
313            //
314            // In that case, we could just break out of the loop and drop the
315            // whole list...
316            if expiration.expires(entry.idle_at) {
317                trace!("removing expired connection for {:?}", self.key);
318                continue;
319            }
320
321            let value = match entry.value.reserve() {
322                #[cfg(feature = "http2")]
323                Reservation::Shared(to_reinsert, to_checkout) => {
324                    self.list.push(Idle {
325                        idle_at: Instant::now(),
326                        value: to_reinsert,
327                    });
328                    to_checkout
329                }
330                Reservation::Unique(unique) => unique,
331            };
332
333            return Some(Idle {
334                idle_at: entry.idle_at,
335                value,
336            });
337        }
338
339        None
340    }
341}
342
343impl<T: Poolable, K: Key> PoolInner<T, K> {
344    fn put(&mut self, key: K, value: T, __pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
345        if value.can_share() && self.idle.contains_key(&key) {
346            trace!("put; existing idle HTTP/2 connection for {:?}", key);
347            return;
348        }
349        trace!("put; add idle connection for {:?}", key);
350        let mut remove_waiters = false;
351        let mut value = Some(value);
352        if let Some(waiters) = self.waiters.get_mut(&key) {
353            while let Some(tx) = waiters.pop_front() {
354                if !tx.is_canceled() {
355                    let reserved = value.take().expect("value already sent");
356                    let reserved = match reserved.reserve() {
357                        #[cfg(feature = "http2")]
358                        Reservation::Shared(to_keep, to_send) => {
359                            value = Some(to_keep);
360                            to_send
361                        }
362                        Reservation::Unique(uniq) => uniq,
363                    };
364                    match tx.send(reserved) {
365                        Ok(()) => {
366                            if value.is_none() {
367                                break;
368                            } else {
369                                continue;
370                            }
371                        }
372                        Err(e) => {
373                            value = Some(e);
374                        }
375                    }
376                }
377
378                trace!("put; removing canceled waiter for {:?}", key);
379            }
380            remove_waiters = waiters.is_empty();
381        }
382        if remove_waiters {
383            self.waiters.remove(&key);
384        }
385
386        match value {
387            Some(value) => {
388                // borrow-check scope...
389                {
390                    let idle_list = self.idle.entry(key.clone()).or_default();
391                    if self.max_idle_per_host <= idle_list.len() {
392                        trace!("max idle per host for {:?}, dropping connection", key);
393                        return;
394                    }
395
396                    debug!("pooling idle connection for {:?}", key);
397                    idle_list.push(Idle {
398                        value,
399                        idle_at: Instant::now(),
400                    });
401                }
402
403                self.spawn_idle_interval(__pool_ref);
404            }
405            None => trace!("put; found waiter for {:?}", key),
406        }
407    }
408
409    /// A `Connecting` task is complete. Not necessarily successfully,
410    /// but the lock is going away, so clean up.
411    fn connected(&mut self, key: &K) {
412        let existed = self.connecting.remove(key);
413        debug_assert!(existed, "Connecting dropped, key not in pool.connecting");
414        // cancel any waiters. if there are any, it's because
415        // this Connecting task didn't complete successfully.
416        // those waiters would never receive a connection.
417        self.waiters.remove(key);
418    }
419
420    fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
421        if self.idle_interval_ref.is_some() {
422            return;
423        }
424        let dur = if let Some(dur) = self.timeout {
425            dur
426        } else {
427            return;
428        };
429        let timer = if let Some(timer) = self.timer.clone() {
430            timer
431        } else {
432            return;
433        };
434        let (tx, rx) = oneshot::channel();
435        self.idle_interval_ref = Some(tx);
436
437        let interval = IdleTask {
438            timer: timer.clone(),
439            duration: dur,
440            deadline: Instant::now(),
441            fut: timer.sleep_until(Instant::now()), // ready at first tick
442            pool: WeakOpt::downgrade(pool_ref),
443            pool_drop_notifier: rx,
444        };
445
446        self.exec.execute(interval);
447    }
448}
449
450impl<T, K: Eq + Hash> PoolInner<T, K> {
451    /// Any `FutureResponse`s that were created will have made a `Checkout`,
452    /// and possibly inserted into the pool that it is waiting for an idle
453    /// connection. If a user ever dropped that future, we need to clean out
454    /// those parked senders.
455    fn clean_waiters(&mut self, key: &K) {
456        let mut remove_waiters = false;
457        if let Some(waiters) = self.waiters.get_mut(key) {
458            waiters.retain(|tx| !tx.is_canceled());
459            remove_waiters = waiters.is_empty();
460        }
461        if remove_waiters {
462            self.waiters.remove(key);
463        }
464    }
465}
466
467impl<T: Poolable, K: Key> PoolInner<T, K> {
468    /// This should *only* be called by the IdleTask
469    fn clear_expired(&mut self) {
470        let dur = self.timeout.expect("interval assumes timeout");
471
472        let now = Instant::now();
473        //self.last_idle_check_at = now;
474
475        self.idle.retain(|key, values| {
476            values.retain(|entry| {
477                if !entry.value.is_open() {
478                    trace!("idle interval evicting closed for {:?}", key);
479                    return false;
480                }
481
482                // Avoid `Instant::sub` to avoid issues like rust-lang/rust#86470.
483                if now.saturating_duration_since(entry.idle_at) > dur {
484                    trace!("idle interval evicting expired for {:?}", key);
485                    return false;
486                }
487
488                // Otherwise, keep this value...
489                true
490            });
491
492            // returning false evicts this key/val
493            !values.is_empty()
494        });
495    }
496}
497
498impl<T, K: Key> Clone for Pool<T, K> {
499    fn clone(&self) -> Pool<T, K> {
500        Pool {
501            inner: self.inner.clone(),
502        }
503    }
504}
505
/// A wrapped poolable value that tries to reinsert to the Pool on Drop.
// Note: The bounds `T: Poolable` is needed for the Drop impl.
pub struct Pooled<T: Poolable, K: Key> {
    // `Some` until `Drop` takes the value out to hand it back to the pool.
    value: Option<T>,
    // `true` when the value came out of the pool (`Pool::reuse`), `false`
    // when it was wrapped as a new connection (`Pool::pooled`).
    is_reused: bool,
    // Pool key the value is reinserted under.
    key: K,
    // Weak reference back to the pool; empty when no reinsertion on drop is
    // needed (disabled pool or shareable value).
    pool: WeakOpt<Mutex<PoolInner<T, K>>>,
}
514
515impl<T: Poolable, K: Key> Pooled<T, K> {
516    pub fn is_reused(&self) -> bool {
517        self.is_reused
518    }
519
520    pub fn is_pool_enabled(&self) -> bool {
521        self.pool.0.is_some()
522    }
523
524    fn as_ref(&self) -> &T {
525        self.value.as_ref().expect("not dropped")
526    }
527
528    fn as_mut(&mut self) -> &mut T {
529        self.value.as_mut().expect("not dropped")
530    }
531}
532
533impl<T: Poolable, K: Key> Deref for Pooled<T, K> {
534    type Target = T;
535    fn deref(&self) -> &T {
536        self.as_ref()
537    }
538}
539
540impl<T: Poolable, K: Key> DerefMut for Pooled<T, K> {
541    fn deref_mut(&mut self) -> &mut T {
542        self.as_mut()
543    }
544}
545
546impl<T: Poolable, K: Key> Drop for Pooled<T, K> {
547    fn drop(&mut self) {
548        if let Some(value) = self.value.take() {
549            if !value.is_open() {
550                // If we *already* know the connection is done here,
551                // it shouldn't be re-inserted back into the pool.
552                return;
553            }
554
555            if let Some(pool) = self.pool.upgrade() {
556                if let Ok(mut inner) = pool.lock() {
557                    inner.put(self.key.clone(), value, &pool);
558                }
559            } else if !value.can_share() {
560                trace!("pool dropped, dropping pooled ({:?})", self.key);
561            }
562            // Ver::Http2 is already in the Pool (or dead), so we wouldn't
563            // have an actual reference to the Pool.
564        }
565    }
566}
567
568impl<T: Poolable, K: Key> fmt::Debug for Pooled<T, K> {
569    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
570        f.debug_struct("Pooled").field("key", &self.key).finish()
571    }
572}
573
// One entry of a per-key idle list.
struct Idle<T> {
    // When the entry was placed in the idle list; compared against the pool
    // timeout to decide whether it has expired.
    idle_at: Instant,
    // The idle value itself.
    value: T,
}
578
// A future that resolves to a `Pooled` value once an idle value for `key`
// can be taken from the pool.
//
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub struct Checkout<T, K: Key> {
    // Key to look up in the pool.
    key: K,
    // Handle to the pool being checked.
    pool: Pool<T, K>,
    // Receiving half of the oneshot parked in the pool's `waiters` list;
    // `None` until the first poll finds no idle value.
    waiter: Option<oneshot::Receiver<T>>,
}
586
/// Reasons a `Checkout` can fail.
#[derive(Debug)]
#[non_exhaustive]
pub enum Error {
    /// The checkout was polled on a pool that is disabled.
    PoolDisabled,
    /// The parked waiter's sender was dropped without delivering a value.
    CheckoutNoLongerWanted,
    /// A value was delivered to the waiter, but it was no longer open.
    CheckedOutClosedValue,
}
594
595impl Error {
596    pub(super) fn is_canceled(&self) -> bool {
597        matches!(self, Error::CheckedOutClosedValue)
598    }
599}
600
601impl fmt::Display for Error {
602    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
603        f.write_str(match self {
604            Error::PoolDisabled => "pool is disabled",
605            Error::CheckedOutClosedValue => "checked out connection was closed",
606            Error::CheckoutNoLongerWanted => "request was canceled",
607        })
608    }
609}
610
611impl StdError for Error {}
612
613impl<T: Poolable, K: Key> Checkout<T, K> {
614    fn poll_waiter(
615        &mut self,
616        cx: &mut task::Context<'_>,
617    ) -> Poll<Option<Result<Pooled<T, K>, Error>>> {
618        if let Some(mut rx) = self.waiter.take() {
619            match Pin::new(&mut rx).poll(cx) {
620                Poll::Ready(Ok(value)) => {
621                    if value.is_open() {
622                        Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value))))
623                    } else {
624                        Poll::Ready(Some(Err(Error::CheckedOutClosedValue)))
625                    }
626                }
627                Poll::Pending => {
628                    self.waiter = Some(rx);
629                    Poll::Pending
630                }
631                Poll::Ready(Err(_canceled)) => {
632                    Poll::Ready(Some(Err(Error::CheckoutNoLongerWanted)))
633                }
634            }
635        } else {
636            Poll::Ready(None)
637        }
638    }
639
640    fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option<Pooled<T, K>> {
641        let entry = {
642            let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
643            let expiration = Expiration::new(inner.timeout);
644            let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| {
645                trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
646                // A block to end the mutable borrow on list,
647                // so the map below can check is_empty()
648                {
649                    let popper = IdlePopper {
650                        key: &self.key,
651                        list,
652                    };
653                    popper.pop(&expiration)
654                }
655                .map(|e| (e, list.is_empty()))
656            });
657
658            let (entry, empty) = if let Some((e, empty)) = maybe_entry {
659                (Some(e), empty)
660            } else {
661                // No entry found means nuke the list for sure.
662                (None, true)
663            };
664            if empty {
665                //TODO: This could be done with the HashMap::entry API instead.
666                inner.idle.remove(&self.key);
667            }
668
669            if entry.is_none() && self.waiter.is_none() {
670                let (tx, mut rx) = oneshot::channel();
671                trace!("checkout waiting for idle connection: {:?}", self.key);
672                inner
673                    .waiters
674                    .entry(self.key.clone())
675                    .or_insert_with(VecDeque::new)
676                    .push_back(tx);
677
678                // register the waker with this oneshot
679                assert!(Pin::new(&mut rx).poll(cx).is_pending());
680                self.waiter = Some(rx);
681            }
682
683            entry
684        };
685
686        entry.map(|e| self.pool.reuse(&self.key, e.value))
687    }
688}
689
690impl<T: Poolable, K: Key> Future for Checkout<T, K> {
691    type Output = Result<Pooled<T, K>, Error>;
692
693    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
694        if let Some(pooled) = ready!(self.poll_waiter(cx)?) {
695            return Poll::Ready(Ok(pooled));
696        }
697
698        if let Some(pooled) = self.checkout(cx) {
699            Poll::Ready(Ok(pooled))
700        } else if !self.pool.is_enabled() {
701            Poll::Ready(Err(Error::PoolDisabled))
702        } else {
703            // There's a new waiter, already registered in self.checkout()
704            debug_assert!(self.waiter.is_some());
705            Poll::Pending
706        }
707    }
708}
709
710impl<T, K: Key> Drop for Checkout<T, K> {
711    fn drop(&mut self) {
712        if self.waiter.take().is_some() {
713            trace!("checkout dropped for {:?}", self.key);
714            if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) {
715                inner.clean_waiters(&self.key);
716            }
717        }
718    }
719}
720
// A guard representing an in-progress connect attempt for `key`.
//
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub struct Connecting<T: Poolable, K: Key> {
    // Key the connection is being established for.
    key: K,
    // Weak reference to the pool; only set when `Pool::connecting` recorded
    // this key in `PoolInner::connecting` (HTTP/2 on an enabled pool), so
    // that Drop can clear the record.
    pool: WeakOpt<Mutex<PoolInner<T, K>>>,
}
727
728impl<T: Poolable, K: Key> Connecting<T, K> {
729    pub fn alpn_h2(self, pool: &Pool<T, K>) -> Option<Self> {
730        debug_assert!(
731            self.pool.0.is_none(),
732            "Connecting::alpn_h2 but already Http2"
733        );
734
735        pool.connecting(&self.key, Ver::Http2)
736    }
737}
738
739impl<T: Poolable, K: Key> Drop for Connecting<T, K> {
740    fn drop(&mut self) {
741        if let Some(pool) = self.pool.upgrade() {
742            // No need to panic on drop, that could abort!
743            if let Ok(mut inner) = pool.lock() {
744                inner.connected(&self.key);
745            }
746        }
747    }
748}
749
/// An optional idle timeout used to decide whether an entry has expired.
struct Expiration(Option<Duration>);

impl Expiration {
    /// Wraps the optional timeout.
    fn new(dur: Option<Duration>) -> Expiration {
        Expiration(dur)
    }

    /// Whether more than the timeout has elapsed since `instant`.
    ///
    /// Always `false` when no timeout is set.
    fn expires(&self, instant: Instant) -> bool {
        self.0.map_or(false, |timeout| {
            // Avoid `Instant::elapsed` to avoid issues like rust-lang/rust#86470.
            let idle_for = Instant::now().saturating_duration_since(instant);
            idle_for > timeout
        })
    }
}
765
// Background future that periodically asks the pool to evict expired idle
// entries. It is spawned by `PoolInner::spawn_idle_interval`.
pin_project_lite::pin_project! {
    struct IdleTask<T, K: Key> {
        // Timer used to create the sleep for each tick.
        timer: Timer,
        // Time between ticks (the pool's idle timeout).
        duration: Duration,
        // When the current tick is due.
        deadline: Instant,
        // Sleep future for the current tick.
        fut: Pin<Box<dyn Sleep>>,
        // Weak reference to the pool state that gets swept.
        pool: WeakOpt<Mutex<PoolInner<T, K>>>,
        // This allows the IdleTask to be notified as soon as the entire
        // Pool is fully dropped, and shutdown. This channel is never sent on,
        // but Err(Canceled) will be received when the Pool is dropped.
        #[pin]
        pool_drop_notifier: oneshot::Receiver<Infallible>,
    }
}
780
781impl<T: Poolable + 'static, K: Key> Future for IdleTask<T, K> {
782    type Output = ();
783
784    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
785        let mut this = self.project();
786        loop {
787            match this.pool_drop_notifier.as_mut().poll(cx) {
788                Poll::Ready(Ok(n)) => match n {},
789                Poll::Pending => (),
790                Poll::Ready(Err(_canceled)) => {
791                    trace!("pool closed, canceling idle interval");
792                    return Poll::Ready(());
793                }
794            }
795
796            ready!(Pin::new(&mut this.fut).poll(cx));
797            // Set this task to run after the next deadline
798            // If the poll missed the deadline by a lot, set the deadline
799            // from the current time instead
800            *this.deadline += *this.duration;
801            if *this.deadline < Instant::now() - Duration::from_millis(5) {
802                *this.deadline = Instant::now() + *this.duration;
803            }
804            *this.fut = this.timer.sleep_until(*this.deadline);
805
806            if let Some(inner) = this.pool.upgrade() {
807                if let Ok(mut inner) = inner.lock() {
808                    trace!("idle interval checking for expired");
809                    inner.clear_expired();
810                    continue;
811                }
812            }
813            return Poll::Ready(());
814        }
815    }
816}
817
818impl<T> WeakOpt<T> {
819    fn none() -> Self {
820        WeakOpt(None)
821    }
822
823    fn downgrade(arc: &Arc<T>) -> Self {
824        WeakOpt(Some(Arc::downgrade(arc)))
825    }
826
827    fn upgrade(&self) -> Option<Arc<T>> {
828        self.0.as_ref().and_then(Weak::upgrade)
829    }
830}
831
832#[cfg(all(test, not(miri)))]
833mod tests {
834    use std::fmt::Debug;
835    use std::future::Future;
836    use std::hash::Hash;
837    use std::pin::Pin;
838    use std::task::{self, Poll};
839    use std::time::Duration;
840
841    use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
842    use crate::rt::{TokioExecutor, TokioTimer};
843
844    use crate::common::timer;
845
846    #[derive(Clone, Debug, PartialEq, Eq, Hash)]
847    struct KeyImpl(http::uri::Scheme, http::uri::Authority);
848
849    type KeyTuple = (http::uri::Scheme, http::uri::Authority);
850
851    /// Test unique reservations.
852    #[derive(Debug, PartialEq, Eq)]
853    struct Uniq<T>(T);
854
855    impl<T: Send + 'static + Unpin> Poolable for Uniq<T> {
856        fn is_open(&self) -> bool {
857            true
858        }
859
860        fn reserve(self) -> Reservation<Self> {
861            Reservation::Unique(self)
862        }
863
864        fn can_share(&self) -> bool {
865            false
866        }
867    }
868
869    fn c<T: Poolable, K: Key>(key: K) -> Connecting<T, K> {
870        Connecting {
871            key,
872            pool: WeakOpt::none(),
873        }
874    }
875
876    fn host_key(s: &str) -> KeyImpl {
877        KeyImpl(http::uri::Scheme::HTTP, s.parse().expect("host key"))
878    }
879
880    fn pool_no_timer<T, K: Key>() -> Pool<T, K> {
881        pool_max_idle_no_timer(::std::usize::MAX)
882    }
883
884    fn pool_max_idle_no_timer<T, K: Key>(max_idle: usize) -> Pool<T, K> {
885        let pool = Pool::new(
886            super::Config {
887                idle_timeout: Some(Duration::from_millis(100)),
888                max_idle_per_host: max_idle,
889            },
890            TokioExecutor::new(),
891            Option::<timer::Timer>::None,
892        );
893        pool.no_timer();
894        pool
895    }
896
897    #[tokio::test]
898    async fn test_pool_checkout_smoke() {
899        let pool = pool_no_timer();
900        let key = host_key("foo");
901        let pooled = pool.pooled(c(key.clone()), Uniq(41));
902
903        drop(pooled);
904
905        match pool.checkout(key).await {
906            Ok(pooled) => assert_eq!(*pooled, Uniq(41)),
907            Err(_) => panic!("not ready"),
908        };
909    }
910
911    /// Helper to check if the future is ready after polling once.
912    struct PollOnce<'a, F>(&'a mut F);
913
914    impl<F, T, U> Future for PollOnce<'_, F>
915    where
916        F: Future<Output = Result<T, U>> + Unpin,
917    {
918        type Output = Option<()>;
919
920        fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
921            match Pin::new(&mut self.0).poll(cx) {
922                Poll::Ready(Ok(_)) => Poll::Ready(Some(())),
923                Poll::Ready(Err(_)) => Poll::Ready(Some(())),
924                Poll::Pending => Poll::Ready(None),
925            }
926        }
927    }
928
929    #[tokio::test]
930    async fn test_pool_checkout_returns_none_if_expired() {
931        let pool = pool_no_timer();
932        let key = host_key("foo");
933        let pooled = pool.pooled(c(key.clone()), Uniq(41));
934
935        drop(pooled);
936        tokio::time::sleep(pool.locked().timeout.unwrap()).await;
937        let mut checkout = pool.checkout(key);
938        let poll_once = PollOnce(&mut checkout);
939        let is_not_ready = poll_once.await.is_none();
940        assert!(is_not_ready);
941    }
942
943    #[tokio::test]
944    async fn test_pool_checkout_removes_expired() {
945        let pool = pool_no_timer();
946        let key = host_key("foo");
947
948        pool.pooled(c(key.clone()), Uniq(41));
949        pool.pooled(c(key.clone()), Uniq(5));
950        pool.pooled(c(key.clone()), Uniq(99));
951
952        assert_eq!(
953            pool.locked().idle.get(&key).map(|entries| entries.len()),
954            Some(3)
955        );
956        tokio::time::sleep(pool.locked().timeout.unwrap()).await;
957
958        let mut checkout = pool.checkout(key.clone());
959        let poll_once = PollOnce(&mut checkout);
960        // checkout.await should clean out the expired
961        poll_once.await;
962        assert!(pool.locked().idle.get(&key).is_none());
963    }
964
965    #[test]
966    fn test_pool_max_idle_per_host() {
967        let pool = pool_max_idle_no_timer(2);
968        let key = host_key("foo");
969
970        pool.pooled(c(key.clone()), Uniq(41));
971        pool.pooled(c(key.clone()), Uniq(5));
972        pool.pooled(c(key.clone()), Uniq(99));
973
974        // pooled and dropped 3, max_idle should only allow 2
975        assert_eq!(
976            pool.locked().idle.get(&key).map(|entries| entries.len()),
977            Some(2)
978        );
979    }
980
981    #[tokio::test]
982    async fn test_pool_timer_removes_expired() {
983        let pool = Pool::new(
984            super::Config {
985                idle_timeout: Some(Duration::from_millis(10)),
986                max_idle_per_host: std::usize::MAX,
987            },
988            TokioExecutor::new(),
989            Some(TokioTimer::new()),
990        );
991
992        let key = host_key("foo");
993
994        pool.pooled(c(key.clone()), Uniq(41));
995        pool.pooled(c(key.clone()), Uniq(5));
996        pool.pooled(c(key.clone()), Uniq(99));
997
998        assert_eq!(
999            pool.locked().idle.get(&key).map(|entries| entries.len()),
1000            Some(3)
1001        );
1002
1003        // Let the timer tick passed the expiration...
1004        tokio::time::sleep(Duration::from_millis(30)).await;
1005        // Yield so the Interval can reap...
1006        tokio::task::yield_now().await;
1007
1008        assert!(pool.locked().idle.get(&key).is_none());
1009    }
1010
1011    #[tokio::test]
1012    async fn test_pool_checkout_task_unparked() {
1013        use futures_util::future::join;
1014        use futures_util::FutureExt;
1015
1016        let pool = pool_no_timer();
1017        let key = host_key("foo");
1018        let pooled = pool.pooled(c(key.clone()), Uniq(41));
1019
1020        let checkout = join(pool.checkout(key), async {
1021            // the checkout future will park first,
1022            // and then this lazy future will be polled, which will insert
1023            // the pooled back into the pool
1024            //
1025            // this test makes sure that doing so will unpark the checkout
1026            drop(pooled);
1027        })
1028        .map(|(entry, _)| entry);
1029
1030        assert_eq!(*checkout.await.unwrap(), Uniq(41));
1031    }
1032
1033    #[tokio::test]
1034    async fn test_pool_checkout_drop_cleans_up_waiters() {
1035        let pool = pool_no_timer::<Uniq<i32>, KeyImpl>();
1036        let key = host_key("foo");
1037
1038        let mut checkout1 = pool.checkout(key.clone());
1039        let mut checkout2 = pool.checkout(key.clone());
1040
1041        let poll_once1 = PollOnce(&mut checkout1);
1042        let poll_once2 = PollOnce(&mut checkout2);
1043
1044        // first poll needed to get into Pool's parked
1045        poll_once1.await;
1046        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
1047        poll_once2.await;
1048        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);
1049
1050        // on drop, clean up Pool
1051        drop(checkout1);
1052        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
1053
1054        drop(checkout2);
1055        assert!(pool.locked().waiters.get(&key).is_none());
1056    }
1057
    /// Test connection whose open/closed state is chosen at construction via
    /// the `closed` field.
    #[derive(Debug)]
    struct CanClose {
        // Set by the test but never read back, hence the lint is silenced.
        #[allow(unused)]
        val: i32,
        // When `true`, `is_open()` reports the connection as not open.
        closed: bool,
    }
1064
1065    impl Poolable for CanClose {
1066        fn is_open(&self) -> bool {
1067            !self.closed
1068        }
1069
1070        fn reserve(self) -> Reservation<Self> {
1071            Reservation::Unique(self)
1072        }
1073
1074        fn can_share(&self) -> bool {
1075            false
1076        }
1077    }
1078
1079    #[test]
1080    fn pooled_drop_if_closed_doesnt_reinsert() {
1081        let pool = pool_no_timer();
1082        let key = host_key("foo");
1083        pool.pooled(
1084            c(key.clone()),
1085            CanClose {
1086                val: 57,
1087                closed: true,
1088            },
1089        );
1090
1091        assert!(!pool.locked().idle.contains_key(&key));
1092    }
1093}