1#![allow(dead_code)]
2
3use std::collections::{HashMap, HashSet, VecDeque};
4use std::convert::Infallible;
5use std::error::Error as StdError;
6use std::fmt::{self, Debug};
7use std::future::Future;
8use std::hash::Hash;
9use std::ops::{Deref, DerefMut};
10use std::pin::Pin;
11use std::sync::{Arc, Mutex, Weak};
12use std::task::{self, Poll};
13
14use std::time::{Duration, Instant};
15
16use futures_channel::oneshot;
17use futures_util::ready;
18use tracing::{debug, trace};
19
20use hyper::rt::Sleep;
21use hyper::rt::Timer as _;
22
23use crate::common::{exec, exec::Exec, timer::Timer};
24
/// A cloneable handle to a keyed pool of idle values of type `T`.
///
/// Cloning the handle shares the same underlying state.
// No `Debug` impl is provided, hence the lint allowance.
#[allow(missing_debug_implementations)]
pub struct Pool<T, K: Key> {
    // `None` when the pool was constructed from a disabled `Config`;
    // in that case nothing is ever stored.
    inner: Option<Arc<Mutex<PoolInner<T, K>>>>,
}
31
/// A value that can be stored in, and checked out of, a `Pool`.
pub trait Poolable: Unpin + Send + Sized + 'static {
    /// Reports whether the value is still usable. The pool discards values
    /// for which this returns `false` instead of handing them out or
    /// re-inserting them.
    fn is_open(&self) -> bool;
    /// Consumes the value and describes how it may be handed out; see
    /// `Reservation`.
    fn reserve(self) -> Reservation<Self>;
    /// Reports whether the value can be shared. The pool uses this to decide
    /// whether a checked-out value needs a reference back to the pool.
    fn can_share(&self) -> bool;
}
45
/// Bounds required of a type used to key pool entries.
pub trait Key: Eq + Hash + Clone + Debug + Unpin + Send + 'static {}

// Blanket impl: every type meeting the bounds is automatically a `Key`.
impl<T> Key for T where T: Eq + Hash + Clone + Debug + Unpin + Send + 'static {}
49
/// Protocol flavour a new connection is being established for.
///
/// `Pool::connecting` only tracks in-progress connections for `Http2`.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
#[allow(dead_code)]
pub enum Ver {
    /// No specific version requested.
    Auto,
    /// HTTP/2; at most one in-progress connection is tracked per key.
    Http2,
}
57
/// How a `Poolable` value may be handed out, as returned by
/// `Poolable::reserve`.
#[allow(missing_debug_implementations)]
pub enum Reservation<T> {
    /// Two handles to the same value: the pool keeps the first and hands
    /// out the second. Only available with the `http2` feature.
    #[cfg(feature = "http2")]
    Shared(T, T),
    /// A single handle; whoever holds it has the only copy.
    Unique(T),
}
75
/// The shared, mutex-protected state behind an enabled `Pool`.
struct PoolInner<T, K: Eq + Hash> {
    // Keys with an HTTP/2 connection currently being established; entries
    // are added by `Pool::connecting` and removed by `connected`.
    connecting: HashSet<K>,
    // Idle values per key, newest pushed last.
    idle: HashMap<K, Vec<Idle<T>>>,
    // Upper bound on the length of each per-key idle list.
    max_idle_per_host: usize,
    // Checkouts waiting for a value to be returned for a key; `put` hands
    // values to these senders before storing anything as idle.
    waiters: HashMap<K, VecDeque<oneshot::Sender<T>>>,
    // Sender half of the channel observed by the idle-eviction task.
    // Nothing is ever sent (`Infallible`); dropping it signals the task.
    // `Some` also marks that the task has already been spawned.
    idle_interval_ref: Option<oneshot::Sender<Infallible>>,
    // Executor used to spawn the idle-eviction task.
    exec: Exec,
    // Timer used by the idle-eviction task; without one, no task is spawned.
    timer: Option<Timer>,
    // Idle timeout; `None` means idle values never expire.
    timeout: Option<Duration>,
}
105
/// An optional `Weak` reference; `None` means there is nothing to upgrade.
struct WeakOpt<T>(Option<Weak<T>>);
109
/// Settings used to construct a `Pool`.
#[derive(Debug, Clone, Copy)]
pub struct Config {
    /// How long a value may sit idle before it is considered expired;
    /// `None` disables expiry.
    pub idle_timeout: Option<Duration>,
    /// Maximum number of idle values kept per key; `0` disables pooling.
    pub max_idle_per_host: usize,
}

impl Config {
    /// Returns `true` when at least one idle value per key may be kept.
    pub fn is_enabled(&self) -> bool {
        self.max_idle_per_host != 0
    }
}
121
122impl<T, K: Key> Pool<T, K> {
123 pub fn new<E, M>(config: Config, executor: E, timer: Option<M>) -> Pool<T, K>
124 where
125 E: hyper::rt::Executor<exec::BoxSendFuture> + Send + Sync + Clone + 'static,
126 M: hyper::rt::Timer + Send + Sync + Clone + 'static,
127 {
128 let exec = Exec::new(executor);
129 let timer = timer.map(|t| Timer::new(t));
130 let inner = if config.is_enabled() {
131 Some(Arc::new(Mutex::new(PoolInner {
132 connecting: HashSet::new(),
133 idle: HashMap::new(),
134 idle_interval_ref: None,
135 max_idle_per_host: config.max_idle_per_host,
136 waiters: HashMap::new(),
137 exec,
138 timer,
139 timeout: config.idle_timeout,
140 })))
141 } else {
142 None
143 };
144
145 Pool { inner }
146 }
147
148 pub(crate) fn is_enabled(&self) -> bool {
149 self.inner.is_some()
150 }
151
152 #[cfg(test)]
153 pub(super) fn no_timer(&self) {
154 {
156 let mut inner = self.inner.as_ref().unwrap().lock().unwrap();
157 assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
158 let (tx, _) = oneshot::channel();
159 inner.idle_interval_ref = Some(tx);
160 }
161 }
162}
163
164impl<T: Poolable, K: Key> Pool<T, K> {
165 pub fn checkout(&self, key: K) -> Checkout<T, K> {
168 Checkout {
169 key,
170 pool: self.clone(),
171 waiter: None,
172 }
173 }
174
175 pub fn connecting(&self, key: &K, ver: Ver) -> Option<Connecting<T, K>> {
178 if ver == Ver::Http2 {
179 if let Some(ref enabled) = self.inner {
180 let mut inner = enabled.lock().unwrap();
181 return if inner.connecting.insert(key.clone()) {
182 let connecting = Connecting {
183 key: key.clone(),
184 pool: WeakOpt::downgrade(enabled),
185 };
186 Some(connecting)
187 } else {
188 trace!("HTTP/2 connecting already in progress for {:?}", key);
189 None
190 };
191 }
192 }
193
194 Some(Connecting {
196 key: key.clone(),
197 pool: WeakOpt::none(),
200 })
201 }
202
203 #[cfg(test)]
204 fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T, K>> {
205 self.inner.as_ref().expect("enabled").lock().expect("lock")
206 }
207
208 pub fn pooled(
226 &self,
227 #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T, K>,
228 value: T,
229 ) -> Pooled<T, K> {
230 let (value, pool_ref) = if let Some(ref enabled) = self.inner {
231 match value.reserve() {
232 #[cfg(feature = "http2")]
233 Reservation::Shared(to_insert, to_return) => {
234 let mut inner = enabled.lock().unwrap();
235 inner.put(connecting.key.clone(), to_insert, enabled);
236 inner.connected(&connecting.key);
239 connecting.pool = WeakOpt::none();
241
242 (to_return, WeakOpt::none())
245 }
246 Reservation::Unique(value) => {
247 (value, WeakOpt::downgrade(enabled))
251 }
252 }
253 } else {
254 debug_assert!(connecting.pool.upgrade().is_none());
258
259 (value, WeakOpt::none())
260 };
261 Pooled {
262 key: connecting.key.clone(),
263 is_reused: false,
264 pool: pool_ref,
265 value: Some(value),
266 }
267 }
268
269 fn reuse(&self, key: &K, value: T) -> Pooled<T, K> {
270 debug!("reuse idle connection for {:?}", key);
271 let mut pool_ref = WeakOpt::none();
280 if !value.can_share() {
281 if let Some(ref enabled) = self.inner {
282 pool_ref = WeakOpt::downgrade(enabled);
283 }
284 }
285
286 Pooled {
287 is_reused: true,
288 key: key.clone(),
289 pool: pool_ref,
290 value: Some(value),
291 }
292 }
293}
294
/// Helper that pops a usable entry from one key's idle list.
struct IdlePopper<'a, T, K> {
    // Only used for trace output.
    key: &'a K,
    // The idle list being popped from.
    list: &'a mut Vec<Idle<T>>,
}
300
301impl<'a, T: Poolable + 'a, K: Debug> IdlePopper<'a, T, K> {
302 fn pop(self, expiration: &Expiration) -> Option<Idle<T>> {
303 while let Some(entry) = self.list.pop() {
304 if !entry.value.is_open() {
307 trace!("removing closed connection for {:?}", self.key);
308 continue;
309 }
310 if expiration.expires(entry.idle_at) {
317 trace!("removing expired connection for {:?}", self.key);
318 continue;
319 }
320
321 let value = match entry.value.reserve() {
322 #[cfg(feature = "http2")]
323 Reservation::Shared(to_reinsert, to_checkout) => {
324 self.list.push(Idle {
325 idle_at: Instant::now(),
326 value: to_reinsert,
327 });
328 to_checkout
329 }
330 Reservation::Unique(unique) => unique,
331 };
332
333 return Some(Idle {
334 idle_at: entry.idle_at,
335 value,
336 });
337 }
338
339 None
340 }
341}
342
impl<T: Poolable, K: Key> PoolInner<T, K> {
    /// Returns `value` to the pool for `key`.
    ///
    /// Waiting checkouts are served first; only if the value is still held
    /// afterwards is it stored as idle (subject to `max_idle_per_host`), in
    /// which case the idle-eviction task is started if needed.
    /// `__pool_ref` is the `Arc` this state lives in, passed on to
    /// `spawn_idle_interval`.
    fn put(&mut self, key: K, value: T, __pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
        // A shareable value is only needed once per key.
        if value.can_share() && self.idle.contains_key(&key) {
            trace!("put; existing idle HTTP/2 connection for {:?}", key);
            return;
        }
        trace!("put; add idle connection for {:?}", key);
        let mut remove_waiters = false;
        // `Some` while we still hold a value to give away or store.
        let mut value = Some(value);
        if let Some(waiters) = self.waiters.get_mut(&key) {
            while let Some(tx) = waiters.pop_front() {
                if !tx.is_canceled() {
                    let reserved = value.take().expect("value already sent");
                    let reserved = match reserved.reserve() {
                        #[cfg(feature = "http2")]
                        Reservation::Shared(to_keep, to_send) => {
                            // Keep one half so further waiters can be served.
                            value = Some(to_keep);
                            to_send
                        }
                        Reservation::Unique(uniq) => uniq,
                    };
                    match tx.send(reserved) {
                        Ok(()) => {
                            // Nothing left to give: stop. Otherwise keep
                            // serving the remaining waiters.
                            if value.is_none() {
                                break;
                            } else {
                                continue;
                            }
                        }
                        Err(e) => {
                            // The receiver went away; take the value back.
                            value = Some(e);
                        }
                    }
                }

                trace!("put; removing canceled waiter for {:?}", key);
            }
            remove_waiters = waiters.is_empty();
        }
        // Done outside the `get_mut` borrow above.
        if remove_waiters {
            self.waiters.remove(&key);
        }

        match value {
            Some(value) => {
                // Inner scope ends the borrow of `self.idle` before
                // `spawn_idle_interval` needs `&mut self`.
                {
                    let idle_list = self.idle.entry(key.clone()).or_default();
                    if self.max_idle_per_host <= idle_list.len() {
                        trace!("max idle per host for {:?}, dropping connection", key);
                        return;
                    }

                    debug!("pooling idle connection for {:?}", key);
                    idle_list.push(Idle {
                        value,
                        idle_at: Instant::now(),
                    });
                }

                self.spawn_idle_interval(__pool_ref);
            }
            None => trace!("put; found waiter for {:?}", key),
        }
    }

    /// Marks the in-progress connection for `key` as finished.
    ///
    /// Any waiters still queued for the key are removed, which drops their
    /// senders.
    fn connected(&mut self, key: &K) {
        let existed = self.connecting.remove(key);
        debug_assert!(existed, "Connecting dropped, key not in pool.connecting");
        self.waiters.remove(key);
    }

    /// Spawns the idle-eviction task once.
    ///
    /// Does nothing if the task was already spawned, or if no idle timeout
    /// or no timer is configured.
    fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
        if self.idle_interval_ref.is_some() {
            return;
        }
        let dur = if let Some(dur) = self.timeout {
            dur
        } else {
            return;
        };
        let timer = if let Some(timer) = self.timer.clone() {
            timer
        } else {
            return;
        };
        let (tx, rx) = oneshot::channel();
        // Keeping the sender records that the task exists; dropping it later
        // lets the task observe that the pool is gone.
        self.idle_interval_ref = Some(tx);

        let interval = IdleTask {
            timer: timer.clone(),
            duration: dur,
            deadline: Instant::now(),
            // First sleep targets "now".
            fut: timer.sleep_until(Instant::now()),
            pool: WeakOpt::downgrade(pool_ref),
            pool_drop_notifier: rx,
        };

        self.exec.execute(interval);
    }
}
449
450impl<T, K: Eq + Hash> PoolInner<T, K> {
451 fn clean_waiters(&mut self, key: &K) {
456 let mut remove_waiters = false;
457 if let Some(waiters) = self.waiters.get_mut(key) {
458 waiters.retain(|tx| !tx.is_canceled());
459 remove_waiters = waiters.is_empty();
460 }
461 if remove_waiters {
462 self.waiters.remove(key);
463 }
464 }
465}
466
467impl<T: Poolable, K: Key> PoolInner<T, K> {
468 fn clear_expired(&mut self) {
470 let dur = self.timeout.expect("interval assumes timeout");
471
472 let now = Instant::now();
473 self.idle.retain(|key, values| {
476 values.retain(|entry| {
477 if !entry.value.is_open() {
478 trace!("idle interval evicting closed for {:?}", key);
479 return false;
480 }
481
482 if now.saturating_duration_since(entry.idle_at) > dur {
484 trace!("idle interval evicting expired for {:?}", key);
485 return false;
486 }
487
488 true
490 });
491
492 !values.is_empty()
494 });
495 }
496}
497
498impl<T, K: Key> Clone for Pool<T, K> {
499 fn clone(&self) -> Pool<T, K> {
500 Pool {
501 inner: self.inner.clone(),
502 }
503 }
504}
505
/// A checked-out value that is offered back to its pool when dropped.
pub struct Pooled<T: Poolable, K: Key> {
    // `Some` until the value is taken out in `Drop`.
    value: Option<T>,
    // Whether the value came out of the pool rather than from a fresh
    // connection.
    is_reused: bool,
    // Key the value is filed under.
    key: K,
    // Weak link back to the pool; empty when the value is not to be
    // re-inserted on drop.
    pool: WeakOpt<Mutex<PoolInner<T, K>>>,
}
514
515impl<T: Poolable, K: Key> Pooled<T, K> {
516 pub fn is_reused(&self) -> bool {
517 self.is_reused
518 }
519
520 pub fn is_pool_enabled(&self) -> bool {
521 self.pool.0.is_some()
522 }
523
524 fn as_ref(&self) -> &T {
525 self.value.as_ref().expect("not dropped")
526 }
527
528 fn as_mut(&mut self) -> &mut T {
529 self.value.as_mut().expect("not dropped")
530 }
531}
532
533impl<T: Poolable, K: Key> Deref for Pooled<T, K> {
534 type Target = T;
535 fn deref(&self) -> &T {
536 self.as_ref()
537 }
538}
539
540impl<T: Poolable, K: Key> DerefMut for Pooled<T, K> {
541 fn deref_mut(&mut self) -> &mut T {
542 self.as_mut()
543 }
544}
545
546impl<T: Poolable, K: Key> Drop for Pooled<T, K> {
547 fn drop(&mut self) {
548 if let Some(value) = self.value.take() {
549 if !value.is_open() {
550 return;
553 }
554
555 if let Some(pool) = self.pool.upgrade() {
556 if let Ok(mut inner) = pool.lock() {
557 inner.put(self.key.clone(), value, &pool);
558 }
559 } else if !value.can_share() {
560 trace!("pool dropped, dropping pooled ({:?})", self.key);
561 }
562 }
565 }
566}
567
568impl<T: Poolable, K: Key> fmt::Debug for Pooled<T, K> {
569 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
570 f.debug_struct("Pooled").field("key", &self.key).finish()
571 }
572}
573
/// A value sitting in the pool, together with when it became idle.
struct Idle<T> {
    // When the value was stored; compared against the idle timeout.
    idle_at: Instant,
    // The stored value itself.
    value: T,
}
578
/// Future returned by `Pool::checkout`; resolves to a pooled value for
/// `key` or an `Error`.
#[allow(missing_debug_implementations)]
pub struct Checkout<T, K: Key> {
    // Key a value is requested for.
    key: K,
    // Pool being checked out from.
    pool: Pool<T, K>,
    // Receiving end registered in the pool's waiter queue once no idle
    // value was immediately available.
    waiter: Option<oneshot::Receiver<T>>,
}
586
/// Reasons a `Checkout` can fail.
#[derive(Debug)]
#[non_exhaustive]
pub enum Error {
    /// The pool is disabled, so there is nothing to check out.
    PoolDisabled,
    /// The waiter's sender was dropped before a value was delivered.
    CheckoutNoLongerWanted,
    /// A value was delivered to the waiter but it was no longer open.
    CheckedOutClosedValue,
}
594
595impl Error {
596 pub(super) fn is_canceled(&self) -> bool {
597 matches!(self, Error::CheckedOutClosedValue)
598 }
599}
600
601impl fmt::Display for Error {
602 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
603 f.write_str(match self {
604 Error::PoolDisabled => "pool is disabled",
605 Error::CheckedOutClosedValue => "checked out connection was closed",
606 Error::CheckoutNoLongerWanted => "request was canceled",
607 })
608 }
609}
610
// Marker impl: the default `std::error::Error` methods are used as-is.
impl StdError for Error {}
612
613impl<T: Poolable, K: Key> Checkout<T, K> {
614 fn poll_waiter(
615 &mut self,
616 cx: &mut task::Context<'_>,
617 ) -> Poll<Option<Result<Pooled<T, K>, Error>>> {
618 if let Some(mut rx) = self.waiter.take() {
619 match Pin::new(&mut rx).poll(cx) {
620 Poll::Ready(Ok(value)) => {
621 if value.is_open() {
622 Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value))))
623 } else {
624 Poll::Ready(Some(Err(Error::CheckedOutClosedValue)))
625 }
626 }
627 Poll::Pending => {
628 self.waiter = Some(rx);
629 Poll::Pending
630 }
631 Poll::Ready(Err(_canceled)) => {
632 Poll::Ready(Some(Err(Error::CheckoutNoLongerWanted)))
633 }
634 }
635 } else {
636 Poll::Ready(None)
637 }
638 }
639
640 fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option<Pooled<T, K>> {
641 let entry = {
642 let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
643 let expiration = Expiration::new(inner.timeout);
644 let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| {
645 trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
646 {
649 let popper = IdlePopper {
650 key: &self.key,
651 list,
652 };
653 popper.pop(&expiration)
654 }
655 .map(|e| (e, list.is_empty()))
656 });
657
658 let (entry, empty) = if let Some((e, empty)) = maybe_entry {
659 (Some(e), empty)
660 } else {
661 (None, true)
663 };
664 if empty {
665 inner.idle.remove(&self.key);
667 }
668
669 if entry.is_none() && self.waiter.is_none() {
670 let (tx, mut rx) = oneshot::channel();
671 trace!("checkout waiting for idle connection: {:?}", self.key);
672 inner
673 .waiters
674 .entry(self.key.clone())
675 .or_insert_with(VecDeque::new)
676 .push_back(tx);
677
678 assert!(Pin::new(&mut rx).poll(cx).is_pending());
680 self.waiter = Some(rx);
681 }
682
683 entry
684 };
685
686 entry.map(|e| self.pool.reuse(&self.key, e.value))
687 }
688}
689
690impl<T: Poolable, K: Key> Future for Checkout<T, K> {
691 type Output = Result<Pooled<T, K>, Error>;
692
693 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
694 if let Some(pooled) = ready!(self.poll_waiter(cx)?) {
695 return Poll::Ready(Ok(pooled));
696 }
697
698 if let Some(pooled) = self.checkout(cx) {
699 Poll::Ready(Ok(pooled))
700 } else if !self.pool.is_enabled() {
701 Poll::Ready(Err(Error::PoolDisabled))
702 } else {
703 debug_assert!(self.waiter.is_some());
705 Poll::Pending
706 }
707 }
708}
709
710impl<T, K: Key> Drop for Checkout<T, K> {
711 fn drop(&mut self) {
712 if self.waiter.take().is_some() {
713 trace!("checkout dropped for {:?}", self.key);
714 if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) {
715 inner.clean_waiters(&self.key);
716 }
717 }
718 }
719}
720
/// Handle representing a connection attempt for a key, as returned by
/// `Pool::connecting`.
#[allow(missing_debug_implementations)]
pub struct Connecting<T: Poolable, K: Key> {
    // Key the connection is being established for.
    key: K,
    // Weak link to the pool when the attempt is tracked there; empty
    // otherwise.
    pool: WeakOpt<Mutex<PoolInner<T, K>>>,
}
727
728impl<T: Poolable, K: Key> Connecting<T, K> {
729 pub fn alpn_h2(self, pool: &Pool<T, K>) -> Option<Self> {
730 debug_assert!(
731 self.pool.0.is_none(),
732 "Connecting::alpn_h2 but already Http2"
733 );
734
735 pool.connecting(&self.key, Ver::Http2)
736 }
737}
738
739impl<T: Poolable, K: Key> Drop for Connecting<T, K> {
740 fn drop(&mut self) {
741 if let Some(pool) = self.pool.upgrade() {
742 if let Ok(mut inner) = pool.lock() {
744 inner.connected(&self.key);
745 }
746 }
747 }
748}
749
/// An optional idle timeout used to decide whether an entry has expired.
struct Expiration(Option<Duration>);

impl Expiration {
    /// Wraps an optional timeout.
    fn new(dur: Option<Duration>) -> Expiration {
        Expiration(dur)
    }

    /// Returns `true` when a timeout is set and more than that amount of
    /// time has passed since `instant`. An `instant` later than the current
    /// time counts as zero elapsed time.
    fn expires(&self, instant: Instant) -> bool {
        self.0.map_or(false, |timeout| {
            Instant::now().saturating_duration_since(instant) > timeout
        })
    }
}
765
pin_project_lite::pin_project! {
    // Background future that periodically evicts expired idle entries.
    struct IdleTask<T, K: Key> {
        // Timer used to create each sleep.
        timer: Timer,
        // Interval between eviction passes (the pool's idle timeout).
        duration: Duration,
        // When the current sleep is due.
        deadline: Instant,
        // The sleep currently being awaited.
        fut: Pin<Box<dyn Sleep>>,
        // Weak link to the pool state; the task ends once it cannot be
        // upgraded.
        pool: WeakOpt<Mutex<PoolInner<T, K>>>,
        // Nothing is ever sent on this channel (`Infallible`); it resolves
        // with an error once the sender stored in the pool is dropped, which
        // ends the task.
        #[pin]
        pool_drop_notifier: oneshot::Receiver<Infallible>,
    }
}
780
781impl<T: Poolable + 'static, K: Key> Future for IdleTask<T, K> {
782 type Output = ();
783
784 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
785 let mut this = self.project();
786 loop {
787 match this.pool_drop_notifier.as_mut().poll(cx) {
788 Poll::Ready(Ok(n)) => match n {},
789 Poll::Pending => (),
790 Poll::Ready(Err(_canceled)) => {
791 trace!("pool closed, canceling idle interval");
792 return Poll::Ready(());
793 }
794 }
795
796 ready!(Pin::new(&mut this.fut).poll(cx));
797 *this.deadline += *this.duration;
801 if *this.deadline < Instant::now() - Duration::from_millis(5) {
802 *this.deadline = Instant::now() + *this.duration;
803 }
804 *this.fut = this.timer.sleep_until(*this.deadline);
805
806 if let Some(inner) = this.pool.upgrade() {
807 if let Ok(mut inner) = inner.lock() {
808 trace!("idle interval checking for expired");
809 inner.clear_expired();
810 continue;
811 }
812 }
813 return Poll::Ready(());
814 }
815 }
816}
817
818impl<T> WeakOpt<T> {
819 fn none() -> Self {
820 WeakOpt(None)
821 }
822
823 fn downgrade(arc: &Arc<T>) -> Self {
824 WeakOpt(Some(Arc::downgrade(arc)))
825 }
826
827 fn upgrade(&self) -> Option<Arc<T>> {
828 self.0.as_ref().and_then(Weak::upgrade)
829 }
830}
831
#[cfg(all(test, not(miri)))]
mod tests {
    use std::fmt::Debug;
    use std::future::Future;
    use std::hash::Hash;
    use std::pin::Pin;
    use std::task::{self, Poll};
    use std::time::Duration;

    use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
    use crate::rt::{TokioExecutor, TokioTimer};

    use crate::common::timer;

    /// Pool key used throughout the tests.
    #[derive(Clone, Debug, PartialEq, Eq, Hash)]
    struct KeyImpl(http::uri::Scheme, http::uri::Authority);

    type KeyTuple = (http::uri::Scheme, http::uri::Authority);

    /// Test-only `Poolable` that is always open and never shareable.
    #[derive(Debug, PartialEq, Eq)]
    struct Uniq<T>(T);

    impl<T: Send + 'static + Unpin> Poolable for Uniq<T> {
        fn is_open(&self) -> bool {
            true
        }

        fn reserve(self) -> Reservation<Self> {
            Reservation::Unique(self)
        }

        fn can_share(&self) -> bool {
            false
        }
    }

    /// Builds a `Connecting` that is not tracked by any pool.
    fn c<T: Poolable, K: Key>(key: K) -> Connecting<T, K> {
        Connecting {
            key,
            pool: WeakOpt::none(),
        }
    }

    /// Builds an HTTP key for the given authority.
    fn host_key(s: &str) -> KeyImpl {
        KeyImpl(http::uri::Scheme::HTTP, s.parse().expect("host key"))
    }

    /// Pool with no practical idle limit and no eviction task.
    fn pool_no_timer<T, K: Key>() -> Pool<T, K> {
        pool_max_idle_no_timer(usize::MAX)
    }

    /// Pool with the given idle limit, a 100ms timeout and no eviction task.
    fn pool_max_idle_no_timer<T, K: Key>(max_idle: usize) -> Pool<T, K> {
        let config = super::Config {
            idle_timeout: Some(Duration::from_millis(100)),
            max_idle_per_host: max_idle,
        };
        let pool = Pool::new(config, TokioExecutor::new(), Option::<timer::Timer>::None);
        pool.no_timer();
        pool
    }

    #[tokio::test]
    async fn test_pool_checkout_smoke() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        // Dropping returns the value to the pool.
        drop(pooled);

        match pool.checkout(key).await {
            Ok(pooled) => assert_eq!(*pooled, Uniq(41)),
            Err(_) => panic!("not ready"),
        };
    }

    /// Helper to check if the future is ready after polling once.
    struct PollOnce<'a, F>(&'a mut F);

    impl<F, T, U> Future for PollOnce<'_, F>
    where
        F: Future<Output = Result<T, U>> + Unpin,
    {
        type Output = Option<()>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
            match Pin::new(&mut self.0).poll(cx) {
                // Either outcome counts as "ready".
                Poll::Ready(_) => Poll::Ready(Some(())),
                Poll::Pending => Poll::Ready(None),
            }
        }
    }

    #[tokio::test]
    async fn test_pool_checkout_returns_none_if_expired() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        drop(pooled);
        tokio::time::sleep(pool.locked().timeout.unwrap()).await;
        let mut checkout = pool.checkout(key);
        let poll_once = PollOnce(&mut checkout);
        let is_not_ready = poll_once.await.is_none();
        assert!(is_not_ready);
    }

    #[tokio::test]
    async fn test_pool_checkout_removes_expired() {
        let pool = pool_no_timer();
        let key = host_key("foo");

        // Each handle is dropped immediately, which pools the value.
        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        assert_eq!(pool.locked().idle.get(&key).map(Vec::len), Some(3));
        tokio::time::sleep(pool.locked().timeout.unwrap()).await;

        let mut checkout = pool.checkout(key.clone());
        let poll_once = PollOnce(&mut checkout);
        // The checkout pass drops every expired entry.
        poll_once.await;
        assert!(!pool.locked().idle.contains_key(&key));
    }

    #[test]
    fn test_pool_max_idle_per_host() {
        let pool = pool_max_idle_no_timer(2);
        let key = host_key("foo");

        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        // Three were offered, but only the configured maximum is kept.
        assert_eq!(pool.locked().idle.get(&key).map(Vec::len), Some(2));
    }

    #[tokio::test]
    async fn test_pool_timer_removes_expired() {
        let config = super::Config {
            idle_timeout: Some(Duration::from_millis(10)),
            max_idle_per_host: usize::MAX,
        };
        let pool = Pool::new(config, TokioExecutor::new(), Some(TokioTimer::new()));

        let key = host_key("foo");

        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        assert_eq!(pool.locked().idle.get(&key).map(Vec::len), Some(3));

        // Give the eviction task time to run at least once.
        tokio::time::sleep(Duration::from_millis(30)).await;
        tokio::task::yield_now().await;

        assert!(!pool.locked().idle.contains_key(&key));
    }

    #[tokio::test]
    async fn test_pool_checkout_task_unparked() {
        use futures_util::future::join;
        use futures_util::FutureExt;

        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        let checkout = join(pool.checkout(key), async {
            // Returning the value should wake the pending checkout.
            drop(pooled);
        })
        .map(|(entry, _)| entry);

        assert_eq!(*checkout.await.unwrap(), Uniq(41));
    }

    #[tokio::test]
    async fn test_pool_checkout_drop_cleans_up_waiters() {
        let pool = pool_no_timer::<Uniq<i32>, KeyImpl>();
        let key = host_key("foo");

        let mut checkout1 = pool.checkout(key.clone());
        let mut checkout2 = pool.checkout(key.clone());

        let poll_once1 = PollOnce(&mut checkout1);
        let poll_once2 = PollOnce(&mut checkout2);

        // Each first poll registers one waiter.
        poll_once1.await;
        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
        poll_once2.await;
        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);

        // Dropping a checkout removes its waiter.
        drop(checkout1);
        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);

        drop(checkout2);
        assert!(!pool.locked().waiters.contains_key(&key));
    }

    /// Test-only `Poolable` whose open state is controlled by a flag.
    #[derive(Debug)]
    struct CanClose {
        #[allow(unused)]
        val: i32,
        closed: bool,
    }

    impl Poolable for CanClose {
        fn is_open(&self) -> bool {
            !self.closed
        }

        fn reserve(self) -> Reservation<Self> {
            Reservation::Unique(self)
        }

        fn can_share(&self) -> bool {
            false
        }
    }

    #[test]
    fn pooled_drop_if_closed_doesnt_reinsert() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        let closed = CanClose {
            val: 57,
            closed: true,
        };
        // The handle is dropped at the end of the statement.
        pool.pooled(c(key.clone()), closed);

        assert!(!pool.locked().idle.contains_key(&key));
    }
}