1#![allow(dead_code)]
2
3use std::collections::{HashMap, HashSet, VecDeque};
4use std::convert::Infallible;
5use std::error::Error as StdError;
6use std::fmt::{self, Debug};
7use std::future::Future;
8use std::hash::Hash;
9use std::ops::{Deref, DerefMut};
10use std::pin::Pin;
11use std::sync::{Arc, Mutex, Weak};
12use std::task::{self, Poll};
13
14use std::time::{Duration, Instant};
15
16use futures_channel::oneshot;
17use futures_core::ready;
18use tracing::{debug, trace};
19
20use hyper::rt::Timer as _;
21
22use crate::common::{exec, exec::Exec, timer::Timer};
23
/// A handle to a keyed pool of reusable values of type `T`.
///
/// All clones of a handle share the same state through the inner `Arc`.
#[allow(missing_debug_implementations)]
pub struct Pool<T, K: Key> {
    // `None` when the pool is disabled (see `Pool::new`); otherwise the
    // shared, mutex-protected pool state.
    inner: Option<Arc<Mutex<PoolInner<T, K>>>>,
}
30
/// A value that can be stored in, and checked out of, a [`Pool`].
pub trait Poolable: Unpin + Send + Sized + 'static {
    /// Returns `true` while the value is still usable.
    ///
    /// The pool discards values that report `false` instead of handing them
    /// out or re-inserting them.
    fn is_open(&self) -> bool;
    /// Consumes the value and yields a [`Reservation`] stating whether it
    /// needs exclusive use or can be shared.
    fn reserve(self) -> Reservation<Self>;
    /// Returns `true` if the value may be shared between several users.
    ///
    /// For shareable values the pool keeps at most one idle entry per key
    /// (`PoolInner::put`), and handles built by `Pool::reuse` carry no
    /// reference back to the pool.
    fn can_share(&self) -> bool;
}
44
/// Bounds required of the keys that partition the pool.
///
/// There is nothing to implement: the blanket impl below covers every type
/// that satisfies the bounds.
pub trait Key: Eq + Hash + Clone + Debug + Unpin + Send + 'static {}

// Blanket impl: any type meeting the bounds is automatically a `Key`.
impl<T> Key for T where T: Eq + Hash + Clone + Debug + Unpin + Send + 'static {}
48
/// Protocol-version hint passed to [`Pool::connecting`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[allow(dead_code)]
pub enum Ver {
    /// No specific version: `Pool::connecting` always hands out a
    /// `Connecting` that is not registered with the pool.
    Auto,
    /// HTTP/2: on an enabled pool, `Pool::connecting` allows only one
    /// outstanding `Connecting` per key.
    Http2,
}
56
/// The result of [`Poolable::reserve`]: how a value may be handed out.
#[allow(missing_debug_implementations)]
pub enum Reservation<T> {
    /// The value can be used by several holders at once. The first element
    /// is the copy the pool keeps (or re-inserts) as idle, the second is the
    /// copy handed to the caller.
    #[cfg(feature = "http2")]
    Shared(T, T),
    /// The value needs exclusive access and is handed out as-is.
    Unique(T),
}
74
/// The mutable state shared by every clone of a [`Pool`].
struct PoolInner<T, K: Eq + Hash> {
    // Keys with an outstanding HTTP/2 `Connecting`; inserted by
    // `Pool::connecting` and removed again by `connected`.
    connecting: HashSet<K>,
    // Idle values per key. New entries are pushed at the back together with
    // the instant they became idle.
    idle: HashMap<K, Vec<Idle<T>>>,
    // Upper bound on the idle list length per key; `put` drops values that
    // would exceed it.
    max_idle_per_host: usize,
    // Senders belonging to parked `Checkout`s, per key. `put` offers a
    // returned value to these before parking it as idle.
    waiters: HashMap<K, VecDeque<oneshot::Sender<T>>>,
    // `Some` once the idle task has been spawned. The matching receiver is
    // held by `IdleTask`; the channel carries `Infallible`, so no value is
    // ever sent and the task only learns that this sender was dropped.
    idle_interval_ref: Option<oneshot::Sender<Infallible>>,
    // Executor used to spawn the idle task.
    exec: Exec,
    // Timer driving the idle task's sleeps; without one no task is spawned.
    timer: Option<Timer>,
    // Idle timeout; `None` means idle values never expire.
    timeout: Option<Duration>,
}
104
/// An optional `Weak` pointer. `None` stands for "no referent at all", in
/// which case `upgrade` always yields `None`.
struct WeakOpt<T>(Option<Weak<T>>);
108
/// Settings used to build a [`Pool`].
#[derive(Clone, Copy, Debug)]
pub struct Config {
    /// How long a value may sit idle before it counts as expired; `None`
    /// turns expiration off.
    pub idle_timeout: Option<Duration>,
    /// Maximum number of idle values kept per key. Zero disables pooling.
    pub max_idle_per_host: usize,
}

impl Config {
    /// Reports whether pooling is turned on, i.e. whether at least one idle
    /// value per key is allowed.
    pub fn is_enabled(&self) -> bool {
        self.max_idle_per_host != 0
    }
}
120
121impl<T, K: Key> Pool<T, K> {
122 pub fn new<E, M>(config: Config, executor: E, timer: Option<M>) -> Pool<T, K>
123 where
124 E: hyper::rt::Executor<exec::BoxSendFuture> + Send + Sync + Clone + 'static,
125 M: hyper::rt::Timer + Send + Sync + Clone + 'static,
126 {
127 let exec = Exec::new(executor);
128 let timer = timer.map(|t| Timer::new(t));
129 let inner = if config.is_enabled() {
130 Some(Arc::new(Mutex::new(PoolInner {
131 connecting: HashSet::new(),
132 idle: HashMap::new(),
133 idle_interval_ref: None,
134 max_idle_per_host: config.max_idle_per_host,
135 waiters: HashMap::new(),
136 exec,
137 timer,
138 timeout: config.idle_timeout,
139 })))
140 } else {
141 None
142 };
143
144 Pool { inner }
145 }
146
147 pub(crate) fn is_enabled(&self) -> bool {
148 self.inner.is_some()
149 }
150
151 #[cfg(test)]
152 pub(super) fn no_timer(&self) {
153 {
155 let mut inner = self.inner.as_ref().unwrap().lock().unwrap();
156 assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
157 let (tx, _) = oneshot::channel();
158 inner.idle_interval_ref = Some(tx);
159 }
160 }
161}
162
163impl<T: Poolable, K: Key> Pool<T, K> {
164 pub fn checkout(&self, key: K) -> Checkout<T, K> {
167 Checkout {
168 key,
169 pool: self.clone(),
170 waiter: None,
171 }
172 }
173
174 pub fn connecting(&self, key: &K, ver: Ver) -> Option<Connecting<T, K>> {
177 if ver == Ver::Http2 {
178 if let Some(ref enabled) = self.inner {
179 let mut inner = enabled.lock().unwrap();
180 return if inner.connecting.insert(key.clone()) {
181 let connecting = Connecting {
182 key: key.clone(),
183 pool: WeakOpt::downgrade(enabled),
184 };
185 Some(connecting)
186 } else {
187 trace!("HTTP/2 connecting already in progress for {:?}", key);
188 None
189 };
190 }
191 }
192
193 Some(Connecting {
195 key: key.clone(),
196 pool: WeakOpt::none(),
199 })
200 }
201
202 #[cfg(test)]
203 fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T, K>> {
204 self.inner.as_ref().expect("enabled").lock().expect("lock")
205 }
206
207 pub fn pooled(
225 &self,
226 #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T, K>,
227 value: T,
228 ) -> Pooled<T, K> {
229 let (value, pool_ref) = if let Some(ref enabled) = self.inner {
230 match value.reserve() {
231 #[cfg(feature = "http2")]
232 Reservation::Shared(to_insert, to_return) => {
233 let mut inner = enabled.lock().unwrap();
234 inner.put(connecting.key.clone(), to_insert, enabled);
235 inner.connected(&connecting.key);
238 connecting.pool = WeakOpt::none();
240
241 (to_return, WeakOpt::none())
244 }
245 Reservation::Unique(value) => {
246 (value, WeakOpt::downgrade(enabled))
250 }
251 }
252 } else {
253 debug_assert!(connecting.pool.upgrade().is_none());
257
258 (value, WeakOpt::none())
259 };
260 Pooled {
261 key: connecting.key.clone(),
262 is_reused: false,
263 pool: pool_ref,
264 value: Some(value),
265 }
266 }
267
268 fn reuse(&self, key: &K, value: T) -> Pooled<T, K> {
269 debug!("reuse idle connection for {:?}", key);
270 let mut pool_ref = WeakOpt::none();
279 if !value.can_share() {
280 if let Some(ref enabled) = self.inner {
281 pool_ref = WeakOpt::downgrade(enabled);
282 }
283 }
284
285 Pooled {
286 is_reused: true,
287 key: key.clone(),
288 pool: pool_ref,
289 value: Some(value),
290 }
291 }
292}
293
294struct IdlePopper<'a, T, K> {
296 key: &'a K,
297 list: &'a mut Vec<Idle<T>>,
298}
299
300impl<'a, T: Poolable + 'a, K: Debug> IdlePopper<'a, T, K> {
301 fn pop(self, expiration: &Expiration) -> Option<Idle<T>> {
302 while let Some(entry) = self.list.pop() {
303 if !entry.value.is_open() {
306 trace!("removing closed connection for {:?}", self.key);
307 continue;
308 }
309 if expiration.expires(entry.idle_at) {
316 trace!("removing expired connection for {:?}", self.key);
317 continue;
318 }
319
320 let value = match entry.value.reserve() {
321 #[cfg(feature = "http2")]
322 Reservation::Shared(to_reinsert, to_checkout) => {
323 self.list.push(Idle {
324 idle_at: Instant::now(),
325 value: to_reinsert,
326 });
327 to_checkout
328 }
329 Reservation::Unique(unique) => unique,
330 };
331
332 return Some(Idle {
333 idle_at: entry.idle_at,
334 value,
335 });
336 }
337
338 None
339 }
340}
341
342impl<T: Poolable, K: Key> PoolInner<T, K> {
343 fn put(&mut self, key: K, value: T, __pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
344 if value.can_share() && self.idle.contains_key(&key) {
345 trace!("put; existing idle HTTP/2 connection for {:?}", key);
346 return;
347 }
348 trace!("put; add idle connection for {:?}", key);
349 let mut remove_waiters = false;
350 let mut value = Some(value);
351 if let Some(waiters) = self.waiters.get_mut(&key) {
352 while let Some(tx) = waiters.pop_front() {
353 if !tx.is_canceled() {
354 let reserved = value.take().expect("value already sent");
355 let reserved = match reserved.reserve() {
356 #[cfg(feature = "http2")]
357 Reservation::Shared(to_keep, to_send) => {
358 value = Some(to_keep);
359 to_send
360 }
361 Reservation::Unique(uniq) => uniq,
362 };
363 match tx.send(reserved) {
364 Ok(()) => {
365 if value.is_none() {
366 break;
367 } else {
368 continue;
369 }
370 }
371 Err(e) => {
372 value = Some(e);
373 }
374 }
375 }
376
377 trace!("put; removing canceled waiter for {:?}", key);
378 }
379 remove_waiters = waiters.is_empty();
380 }
381 if remove_waiters {
382 self.waiters.remove(&key);
383 }
384
385 match value {
386 Some(value) => {
387 {
389 let idle_list = self.idle.entry(key.clone()).or_default();
390 if self.max_idle_per_host <= idle_list.len() {
391 trace!("max idle per host for {:?}, dropping connection", key);
392 return;
393 }
394
395 debug!("pooling idle connection for {:?}", key);
396 idle_list.push(Idle {
397 value,
398 idle_at: Instant::now(),
399 });
400 }
401
402 self.spawn_idle_interval(__pool_ref);
403 }
404 None => trace!("put; found waiter for {:?}", key),
405 }
406 }
407
408 fn connected(&mut self, key: &K) {
411 let existed = self.connecting.remove(key);
412 debug_assert!(existed, "Connecting dropped, key not in pool.connecting");
413 self.waiters.remove(key);
417 }
418
419 fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
420 if self.idle_interval_ref.is_some() {
421 return;
422 }
423 let dur = if let Some(dur) = self.timeout {
424 dur
425 } else {
426 return;
427 };
428 if dur == Duration::ZERO {
429 return;
430 }
431 let timer = if let Some(timer) = self.timer.clone() {
432 timer
433 } else {
434 return;
435 };
436
437 const MIN_CHECK: Duration = Duration::from_millis(90);
441
442 let dur = dur.max(MIN_CHECK);
443
444 let (tx, rx) = oneshot::channel();
445 self.idle_interval_ref = Some(tx);
446
447 let interval = IdleTask {
448 timer: timer.clone(),
449 duration: dur,
450 pool: WeakOpt::downgrade(pool_ref),
451 pool_drop_notifier: rx,
452 };
453
454 self.exec.execute(interval.run());
455 }
456}
457
458impl<T, K: Eq + Hash> PoolInner<T, K> {
459 fn clean_waiters(&mut self, key: &K) {
464 let mut remove_waiters = false;
465 if let Some(waiters) = self.waiters.get_mut(key) {
466 waiters.retain(|tx| !tx.is_canceled());
467 remove_waiters = waiters.is_empty();
468 }
469 if remove_waiters {
470 self.waiters.remove(key);
471 }
472 }
473}
474
475impl<T: Poolable, K: Key> PoolInner<T, K> {
476 fn clear_expired(&mut self) {
478 let dur = self.timeout.expect("interval assumes timeout");
479
480 let now = Instant::now();
481 self.idle.retain(|key, values| {
484 values.retain(|entry| {
485 if !entry.value.is_open() {
486 trace!("idle interval evicting closed for {:?}", key);
487 return false;
488 }
489
490 if now.saturating_duration_since(entry.idle_at) > dur {
492 trace!("idle interval evicting expired for {:?}", key);
493 return false;
494 }
495
496 true
498 });
499
500 !values.is_empty()
502 });
503 }
504}
505
506impl<T, K: Key> Clone for Pool<T, K> {
507 fn clone(&self) -> Pool<T, K> {
508 Pool {
509 inner: self.inner.clone(),
510 }
511 }
512}
513
514pub struct Pooled<T: Poolable, K: Key> {
517 value: Option<T>,
518 is_reused: bool,
519 key: K,
520 pool: WeakOpt<Mutex<PoolInner<T, K>>>,
521}
522
523impl<T: Poolable, K: Key> Pooled<T, K> {
524 pub fn is_reused(&self) -> bool {
525 self.is_reused
526 }
527
528 pub fn is_pool_enabled(&self) -> bool {
529 self.pool.0.is_some()
530 }
531
532 fn as_ref(&self) -> &T {
533 self.value.as_ref().expect("not dropped")
534 }
535
536 fn as_mut(&mut self) -> &mut T {
537 self.value.as_mut().expect("not dropped")
538 }
539}
540
541impl<T: Poolable, K: Key> Deref for Pooled<T, K> {
542 type Target = T;
543 fn deref(&self) -> &T {
544 self.as_ref()
545 }
546}
547
548impl<T: Poolable, K: Key> DerefMut for Pooled<T, K> {
549 fn deref_mut(&mut self) -> &mut T {
550 self.as_mut()
551 }
552}
553
554impl<T: Poolable, K: Key> Drop for Pooled<T, K> {
555 fn drop(&mut self) {
556 if let Some(value) = self.value.take() {
557 if !value.is_open() {
558 return;
561 }
562
563 if let Some(pool) = self.pool.upgrade() {
564 if let Ok(mut inner) = pool.lock() {
565 inner.put(self.key.clone(), value, &pool);
566 }
567 } else if !value.can_share() {
568 trace!("pool dropped, dropping pooled ({:?})", self.key);
569 }
570 }
573 }
574}
575
576impl<T: Poolable, K: Key> fmt::Debug for Pooled<T, K> {
577 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
578 f.debug_struct("Pooled").field("key", &self.key).finish()
579 }
580}
581
/// An entry in a key's idle list.
struct Idle<T> {
    // When the value was parked; compared against the idle timeout.
    idle_at: Instant,
    // The parked value itself.
    value: T,
}
586
/// Future returned by [`Pool::checkout`]; resolves to a [`Pooled`] once a
/// usable value for `key` is available.
#[allow(missing_debug_implementations)]
pub struct Checkout<T, K: Key> {
    // Key being checked out.
    key: K,
    // Handle to the pool being queried.
    pool: Pool<T, K>,
    // Receiver half of the waiter parked in the pool, set once a poll found
    // no idle value.
    waiter: Option<oneshot::Receiver<T>>,
}
594
/// Reasons a [`Checkout`] can fail.
#[derive(Debug)]
#[non_exhaustive]
pub enum Error {
    /// The pool is disabled, so there is nothing to check out.
    PoolDisabled,
    /// The parked waiter's sender was dropped without delivering a value.
    CheckoutNoLongerWanted,
    /// A value was delivered to the waiter but was no longer open.
    CheckedOutClosedValue,
}
602
603impl Error {
604 pub(super) fn is_canceled(&self) -> bool {
605 matches!(self, Error::CheckedOutClosedValue)
606 }
607}
608
609impl fmt::Display for Error {
610 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
611 f.write_str(match self {
612 Error::PoolDisabled => "pool is disabled",
613 Error::CheckedOutClosedValue => "checked out connection was closed",
614 Error::CheckoutNoLongerWanted => "request was canceled",
615 })
616 }
617}
618
// Empty impl: the trait's default methods are used as-is, so no `source` is
// reported for any variant.
impl StdError for Error {}
620
621impl<T: Poolable, K: Key> Checkout<T, K> {
622 fn poll_waiter(
623 &mut self,
624 cx: &mut task::Context<'_>,
625 ) -> Poll<Option<Result<Pooled<T, K>, Error>>> {
626 if let Some(mut rx) = self.waiter.take() {
627 match Pin::new(&mut rx).poll(cx) {
628 Poll::Ready(Ok(value)) => {
629 if value.is_open() {
630 Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value))))
631 } else {
632 Poll::Ready(Some(Err(Error::CheckedOutClosedValue)))
633 }
634 }
635 Poll::Pending => {
636 self.waiter = Some(rx);
637 Poll::Pending
638 }
639 Poll::Ready(Err(_canceled)) => {
640 Poll::Ready(Some(Err(Error::CheckoutNoLongerWanted)))
641 }
642 }
643 } else {
644 Poll::Ready(None)
645 }
646 }
647
648 fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option<Pooled<T, K>> {
649 let entry = {
650 let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
651 let expiration = Expiration::new(inner.timeout);
652 let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| {
653 trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
654 {
657 let popper = IdlePopper {
658 key: &self.key,
659 list,
660 };
661 popper.pop(&expiration)
662 }
663 .map(|e| (e, list.is_empty()))
664 });
665
666 let (entry, empty) = if let Some((e, empty)) = maybe_entry {
667 (Some(e), empty)
668 } else {
669 (None, true)
671 };
672 if empty {
673 inner.idle.remove(&self.key);
675 }
676
677 if entry.is_none() && self.waiter.is_none() {
678 let (tx, mut rx) = oneshot::channel();
679 trace!("checkout waiting for idle connection: {:?}", self.key);
680 inner
681 .waiters
682 .entry(self.key.clone())
683 .or_insert_with(VecDeque::new)
684 .push_back(tx);
685
686 assert!(Pin::new(&mut rx).poll(cx).is_pending());
688 self.waiter = Some(rx);
689 }
690
691 entry
692 };
693
694 entry.map(|e| self.pool.reuse(&self.key, e.value))
695 }
696}
697
698impl<T: Poolable, K: Key> Future for Checkout<T, K> {
699 type Output = Result<Pooled<T, K>, Error>;
700
701 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
702 if let Some(pooled) = ready!(self.poll_waiter(cx)?) {
703 return Poll::Ready(Ok(pooled));
704 }
705
706 if let Some(pooled) = self.checkout(cx) {
707 Poll::Ready(Ok(pooled))
708 } else if !self.pool.is_enabled() {
709 Poll::Ready(Err(Error::PoolDisabled))
710 } else {
711 debug_assert!(self.waiter.is_some());
713 Poll::Pending
714 }
715 }
716}
717
718impl<T, K: Key> Drop for Checkout<T, K> {
719 fn drop(&mut self) {
720 if self.waiter.take().is_some() {
721 trace!("checkout dropped for {:?}", self.key);
722 if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) {
723 inner.clean_waiters(&self.key);
724 }
725 }
726 }
727}
728
/// Guard for an in-progress connection attempt for `key`, created by
/// [`Pool::connecting`].
#[allow(missing_debug_implementations)]
pub struct Connecting<T: Poolable, K: Key> {
    // Key the connection is being established for.
    key: K,
    // Set only when the attempt was registered in `PoolInner::connecting`
    // (HTTP/2 on an enabled pool); `Drop` then unregisters it.
    pool: WeakOpt<Mutex<PoolInner<T, K>>>,
}
735
736impl<T: Poolable, K: Key> Connecting<T, K> {
737 pub fn alpn_h2(self, pool: &Pool<T, K>) -> Option<Self> {
738 debug_assert!(
739 self.pool.0.is_none(),
740 "Connecting::alpn_h2 but already Http2"
741 );
742
743 pool.connecting(&self.key, Ver::Http2)
744 }
745}
746
747impl<T: Poolable, K: Key> Drop for Connecting<T, K> {
748 fn drop(&mut self) {
749 if let Some(pool) = self.pool.upgrade() {
750 if let Ok(mut inner) = pool.lock() {
752 inner.connected(&self.key);
753 }
754 }
755 }
756}
757
/// An optional idle timeout, used to decide whether an idle entry is stale.
struct Expiration(Option<Duration>);

impl Expiration {
    /// Wraps `dur`; `None` means entries never expire.
    fn new(dur: Option<Duration>) -> Expiration {
        Self(dur)
    }

    /// Returns `true` if strictly more than the timeout has passed since
    /// `instant`. Always `false` when no timeout is set.
    fn expires(&self, instant: Instant) -> bool {
        self.0.map_or(false, |timeout| {
            // Saturating: an `instant` later than now counts as zero elapsed
            // time instead of panicking.
            Instant::now().saturating_duration_since(instant) > timeout
        })
    }
}
773
/// State of the background task that periodically evicts expired idle
/// entries (see `IdleTask::run`).
struct IdleTask<T, K: Key> {
    // Timer used to sleep between checks.
    timer: Timer,
    // Time between two checks.
    duration: Duration,
    // Weak reference to the pool state; the task does not keep the pool
    // alive.
    pool: WeakOpt<Mutex<PoolInner<T, K>>>,
    // Receiver whose sender lives in `PoolInner::idle_interval_ref`. No
    // value is ever sent (the payload is `Infallible`); the task stops as
    // soon as this receiver resolves.
    pool_drop_notifier: oneshot::Receiver<Infallible>,
}
783
784impl<T: Poolable + 'static, K: Key> IdleTask<T, K> {
785 async fn run(self) {
786 use futures_util::future;
787
788 let mut sleep = self.timer.sleep_until(Instant::now() + self.duration);
789 let mut on_pool_drop = self.pool_drop_notifier;
790 loop {
791 match future::select(&mut on_pool_drop, &mut sleep).await {
792 future::Either::Left(_) => {
793 break;
795 }
796 future::Either::Right(((), _)) => {
797 if let Some(inner) = self.pool.upgrade() {
798 if let Ok(mut inner) = inner.lock() {
799 trace!("idle interval checking for expired");
800 inner.clear_expired();
801 }
802 }
803
804 let deadline = Instant::now() + self.duration;
805 self.timer.reset(&mut sleep, deadline);
806 }
807 }
808 }
809
810 trace!("pool closed, canceling idle interval");
811 return;
812 }
813}
814
815impl<T> WeakOpt<T> {
816 fn none() -> Self {
817 WeakOpt(None)
818 }
819
820 fn downgrade(arc: &Arc<T>) -> Self {
821 WeakOpt(Some(Arc::downgrade(arc)))
822 }
823
824 fn upgrade(&self) -> Option<Arc<T>> {
825 self.0.as_ref().and_then(Weak::upgrade)
826 }
827}
828
// Unit tests for the pool. Compiled out under miri.
#[cfg(all(test, not(miri)))]
mod tests {
    use std::fmt::Debug;
    use std::future::Future;
    use std::hash::Hash;
    use std::pin::Pin;
    use std::task::{self, Poll};
    use std::time::Duration;

    use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
    use crate::rt::{TokioExecutor, TokioTimer};

    use crate::common::timer;

    // Key type used by the tests: a (scheme, authority) pair.
    #[derive(Clone, Debug, PartialEq, Eq, Hash)]
    struct KeyImpl(http::uri::Scheme, http::uri::Authority);

    // NOTE(review): not referenced anywhere else in this module.
    type KeyTuple = (http::uri::Scheme, http::uri::Authority);

    /// Test value: always open, never shareable.
    #[derive(Debug, PartialEq, Eq)]
    struct Uniq<T>(T);

    impl<T: Send + 'static + Unpin> Poolable for Uniq<T> {
        fn is_open(&self) -> bool {
            true
        }

        fn reserve(self) -> Reservation<Self> {
            Reservation::Unique(self)
        }

        fn can_share(&self) -> bool {
            false
        }
    }

    /// Builds a `Connecting` that is not registered with any pool.
    fn c<T: Poolable, K: Key>(key: K) -> Connecting<T, K> {
        Connecting {
            key,
            pool: WeakOpt::none(),
        }
    }

    /// Builds an `http` key for the given authority string.
    fn host_key(s: &str) -> KeyImpl {
        KeyImpl(http::uri::Scheme::HTTP, s.parse().expect("host key"))
    }

    /// An enabled pool without an idle task and without an idle limit.
    fn pool_no_timer<T, K: Key>() -> Pool<T, K> {
        pool_max_idle_no_timer(usize::MAX)
    }

    /// An enabled pool with a 100ms idle timeout, at most `max_idle` idle
    /// values per key, and no idle task.
    fn pool_max_idle_no_timer<T, K: Key>(max_idle: usize) -> Pool<T, K> {
        let pool = Pool::new(
            super::Config {
                idle_timeout: Some(Duration::from_millis(100)),
                max_idle_per_host: max_idle,
            },
            TokioExecutor::new(),
            Option::<timer::Timer>::None,
        );
        // Installs a placeholder so `spawn_idle_interval` returns early.
        pool.no_timer();
        pool
    }

    #[tokio::test]
    async fn test_pool_checkout_smoke() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        // Dropping the handle parks the value in the idle list.
        drop(pooled);

        match pool.checkout(key).await {
            Ok(pooled) => assert_eq!(*pooled, Uniq(41)),
            Err(_) => panic!("not ready"),
        };
    }

    /// Polls the wrapped future exactly once: yields `Some(())` if it
    /// completed (with `Ok` or `Err`) and `None` if it was still pending.
    struct PollOnce<'a, F>(&'a mut F);

    impl<F, T, U> Future for PollOnce<'_, F>
    where
        F: Future<Output = Result<T, U>> + Unpin,
    {
        type Output = Option<()>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
            match Pin::new(&mut self.0).poll(cx) {
                Poll::Ready(Ok(_)) => Poll::Ready(Some(())),
                Poll::Ready(Err(_)) => Poll::Ready(Some(())),
                Poll::Pending => Poll::Ready(None),
            }
        }
    }

    #[tokio::test]
    async fn test_pool_checkout_returns_none_if_expired() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        drop(pooled);
        // Wait out the idle timeout so the parked value is stale.
        tokio::time::sleep(pool.locked().timeout.unwrap()).await;
        let mut checkout = pool.checkout(key);
        let poll_once = PollOnce(&mut checkout);
        let is_not_ready = poll_once.await.is_none();
        assert!(is_not_ready);
    }

    #[tokio::test]
    async fn test_pool_checkout_removes_expired() {
        let pool = pool_no_timer();
        let key = host_key("foo");

        // Each handle is dropped at the end of its statement, which parks
        // the value as idle.
        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        assert_eq!(
            pool.locked().idle.get(&key).map(|entries| entries.len()),
            Some(3)
        );
        tokio::time::sleep(pool.locked().timeout.unwrap()).await;

        let mut checkout = pool.checkout(key.clone());
        let poll_once = PollOnce(&mut checkout);
        // A single poll pops (and discards) every expired entry.
        poll_once.await;
        assert!(!pool.locked().idle.contains_key(&key));
    }

    #[test]
    fn test_pool_max_idle_per_host() {
        let pool = pool_max_idle_no_timer(2);
        let key = host_key("foo");

        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        // The third value exceeds the limit of 2 and is dropped by `put`.
        assert_eq!(
            pool.locked().idle.get(&key).map(|entries| entries.len()),
            Some(2)
        );
    }

    #[tokio::test]
    async fn test_pool_timer_removes_expired() {
        // A real timer and no `no_timer()` call: the idle task is spawned on
        // the first `put`.
        let pool = Pool::new(
            super::Config {
                idle_timeout: Some(Duration::from_millis(10)),
                max_idle_per_host: usize::MAX,
            },
            TokioExecutor::new(),
            Some(TokioTimer::new()),
        );

        let key = host_key("foo");

        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        assert_eq!(
            pool.locked().idle.get(&key).map(|entries| entries.len()),
            Some(3)
        );

        tokio::time::sleep(Duration::from_millis(30)).await;

        // Past the 10ms timeout, but the idle task wakes no more often than
        // `MIN_CHECK` (90ms), so nothing has been evicted yet.
        assert_eq!(
            pool.locked().idle.get(&key).map(|entries| entries.len()),
            Some(3)
        );

        // 30ms + 70ms brings the total past the 90ms check interval.
        tokio::time::sleep(Duration::from_millis(70)).await;
        // Give the spawned idle task a chance to run.
        tokio::task::yield_now().await;

        assert!(!pool.locked().idle.contains_key(&key));
    }

    #[tokio::test]
    async fn test_pool_checkout_task_unparked() {
        use futures_util::future::join;
        use futures_util::FutureExt;

        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        // The second future returns the value to the pool; the checkout must
        // resolve with it. Presumably `join` polls the checkout first, so it
        // is parked as a waiter and then woken — TODO confirm polling order.
        let checkout = join(pool.checkout(key), async {
            drop(pooled);
        })
        .map(|(entry, _)| entry);

        assert_eq!(*checkout.await.unwrap(), Uniq(41));
    }

    #[tokio::test]
    async fn test_pool_checkout_drop_cleans_up_waiters() {
        let pool = pool_no_timer::<Uniq<i32>, KeyImpl>();
        let key = host_key("foo");

        let mut checkout1 = pool.checkout(key.clone());
        let mut checkout2 = pool.checkout(key.clone());

        let poll_once1 = PollOnce(&mut checkout1);
        let poll_once2 = PollOnce(&mut checkout2);

        // Each first poll finds nothing idle and parks one waiter.
        poll_once1.await;
        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
        poll_once2.await;
        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);

        // Dropping a checkout removes its canceled waiter.
        drop(checkout1);
        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);

        // Once the last waiter is gone, the key is removed as well.
        drop(checkout2);
        assert!(!pool.locked().waiters.contains_key(&key));
    }

    /// Test value whose open/closed state is chosen by the test.
    #[derive(Debug)]
    struct CanClose {
        #[allow(unused)]
        val: i32,
        closed: bool,
    }

    impl Poolable for CanClose {
        fn is_open(&self) -> bool {
            !self.closed
        }

        fn reserve(self) -> Reservation<Self> {
            Reservation::Unique(self)
        }

        fn can_share(&self) -> bool {
            false
        }
    }

    #[test]
    fn pooled_drop_if_closed_doesnt_reinsert() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        // The handle is dropped immediately; since the value reports closed,
        // `Drop` must not put it back.
        pool.pooled(
            c(key.clone()),
            CanClose {
                val: 57,
                closed: true,
            },
        );

        assert!(!pool.locked().idle.contains_key(&key));
    }
}
1099}