1#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![allow(renamed_and_removed_lints)] #![allow(unknown_lints)] #![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_duration_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] #![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)] #![allow(clippy::needless_raw_string_hashes)] #![allow(clippy::needless_lifetimes)] #![allow(mismatched_lifetime_syntaxes)] #![cfg_attr(not(all(feature = "full")), allow(unused))]
49
50#[cfg(all(
51 any(feature = "native-tls", feature = "rustls"),
52 any(feature = "async-std", feature = "tokio")
53))]
54pub(crate) mod impls;
55pub mod task;
56
57mod coarse_time;
58mod compound;
59mod dyn_time;
60pub mod general;
61mod opaque;
62pub mod scheduler;
63mod timer;
64mod traits;
65pub mod unimpl;
66pub mod unix;
67
68#[cfg(any(feature = "async-std", feature = "tokio"))]
69use std::io;
70pub use traits::{
71 Blocking, CertifiedConn, CoarseTimeProvider, NetStreamListener, NetStreamProvider,
72 NoOpStreamOpsHandle, Runtime, SleepProvider, StreamOps, TlsProvider, ToplevelBlockOn,
73 ToplevelRuntime, UdpProvider, UdpSocket, UnsupportedStreamOp,
74};
75
76pub use coarse_time::{CoarseDuration, CoarseInstant, RealCoarseTimeProvider};
77pub use dyn_time::DynTimeProvider;
78pub use timer::{SleepProviderExt, Timeout, TimeoutError};
79
80pub mod tls {
83 pub use crate::traits::{CertifiedConn, TlsConnector};
84
85 #[cfg(all(feature = "native-tls", any(feature = "tokio", feature = "async-std")))]
86 pub use crate::impls::native_tls::NativeTlsProvider;
87 #[cfg(all(feature = "rustls", any(feature = "tokio", feature = "async-std")))]
88 pub use crate::impls::rustls::RustlsProvider;
89}
90
91#[cfg(all(any(feature = "native-tls", feature = "rustls"), feature = "tokio"))]
92pub mod tokio;
93
94#[cfg(all(any(feature = "native-tls", feature = "rustls"), feature = "async-std"))]
95pub mod async_std;
96
97pub use compound::{CompoundRuntime, RuntimeSubstExt};
98
99#[cfg(all(
100 any(feature = "native-tls", feature = "rustls"),
101 feature = "async-std",
102 not(feature = "tokio")
103))]
104use async_std as preferred_backend_mod;
105#[cfg(all(any(feature = "native-tls", feature = "rustls"), feature = "tokio"))]
106use tokio as preferred_backend_mod;
107
/// The runtime that this crate prefers, given the enabled cargo features.
///
/// This is a thin wrapper around the `PreferredRuntime` type of the selected
/// backend module: the `tokio` backend whenever the `tokio` feature is
/// enabled, and the `async_std` backend only when `async-std` is enabled and
/// `tokio` is not.
///
/// The type only exists when at least one TLS feature (`native-tls` or
/// `rustls`) and at least one runtime feature (`tokio` or `async-std`) are
/// enabled.
#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "async-std", feature = "tokio")
))]
#[derive(Clone)]
pub struct PreferredRuntime {
    /// The backend runtime object that this wrapper holds.
    inner: preferred_backend_mod::PreferredRuntime,
}

// NOTE(review): presumably this implements the crate's runtime traits for
// `PreferredRuntime` by forwarding every call to `inner` -- confirm against
// the macro definition in the `opaque` module.
#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "async-std", feature = "tokio")
))]
crate::opaque::implement_opaque_runtime! {
    PreferredRuntime { inner : preferred_backend_mod::PreferredRuntime }
}
136
#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "async-std", feature = "tokio")
))]
impl PreferredRuntime {
    /// Wrap whatever the selected backend reports as its "current" runtime.
    ///
    /// This simply delegates to the backend's `PreferredRuntime::current()`.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the backend if it cannot provide a
    /// current runtime.
    pub fn current() -> io::Result<Self> {
        Ok(Self {
            inner: preferred_backend_mod::PreferredRuntime::current()?,
        })
    }

    /// Ask the selected backend to create a runtime, and wrap it.
    ///
    /// This simply delegates to the backend's `PreferredRuntime::create()`.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the backend if the runtime cannot be
    /// created.
    pub fn create() -> io::Result<Self> {
        Ok(Self {
            inner: preferred_backend_mod::PreferredRuntime::create()?,
        })
    }

    /// Create a fresh runtime, hand it to `func`, and block on the future
    /// that `func` returns, yielding that future's output.
    ///
    /// This is the helper used by the `test_with_one_runtime!` macro.
    ///
    /// # Panics
    ///
    /// Panics if the runtime cannot be created.
    #[doc(hidden)]
    pub fn run_test<P, F, O>(func: P) -> O
    where
        P: FnOnce(Self) -> F,
        F: futures::Future<Output = O>,
    {
        let runtime = Self::create().expect("Failed to create runtime");
        // One handle drives the future; the other is given to the test body.
        let driver = runtime.clone();
        driver.block_on(func(runtime))
    }
}
231
/// Helpers used by the test macros exported from this crate.
///
/// Hidden from the documentation: only the macros are meant to use it.
#[doc(hidden)]
pub mod testing__ {
    /// A value that a test body may return, and that can be checked for
    /// success.
    pub trait TestOutcome {
        /// Panic if this outcome represents a failure; otherwise do nothing.
        fn check_ok(&self);
    }

    /// A test returning `()` has nothing to report, so it always passes.
    impl TestOutcome for () {
        fn check_ok(&self) {}
    }

    /// A test returning a `Result` passes only when the result is `Ok`.
    impl<E: std::fmt::Debug> TestOutcome for Result<(), E> {
        fn check_ok(&self) {
            // Borrow the contents so the outcome itself is left untouched.
            let borrowed: Result<&(), &E> = self.as_ref();
            borrowed.expect("Test failure");
        }
    }
}
254
/// Declare an exported macro whose expansion depends on two cargo features
/// of *this* crate.
///
/// The generated macro takes a single token tree.  When both named features
/// are enabled it expands to that token tree; otherwise it expands to
/// nothing.  Any attributes given before `macro` are applied to both
/// variants of the generated macro.
macro_rules! declare_conditional_macro {
    ( $(#[$attr:meta])* macro $name:ident = ($feat_a:expr, $feat_b:expr) ) => {
        // Variant used when both features are enabled: pass the body through.
        $( #[$attr] )*
        #[cfg(all(feature = $feat_a, feature = $feat_b))]
        #[macro_export]
        macro_rules! $name {
            ($body:tt) => {
                $body
            };
        }

        // Variant used otherwise: swallow the body.
        $( #[$attr] )*
        #[cfg(not(all(feature = $feat_a, feature = $feat_b)))]
        #[macro_export]
        macro_rules! $name {
            ($body:tt) => {};
        }

        // Re-export the macro by path from the module that invoked us.
        pub use $name;
    };
}
280
281#[doc(hidden)]
283pub mod cond {
284 declare_conditional_macro! {
285 #[doc(hidden)]
287 macro if_tokio_native_tls_present = ("tokio", "native-tls")
288 }
289 declare_conditional_macro! {
290 #[doc(hidden)]
292 macro if_tokio_rustls_present = ("tokio", "rustls")
293 }
294 declare_conditional_macro! {
295 #[doc(hidden)]
297 macro if_async_std_native_tls_present = ("async-std", "native-tls")
298 }
299 declare_conditional_macro! {
300 #[doc(hidden)]
302 macro if_async_std_rustls_present = ("async-std", "rustls")
303 }
304}
305
/// Run a test once on every runtime/TLS combination compiled into this crate.
///
/// The argument is an expression that is passed to the `run_test` function
/// of each enabled runtime type; `check_ok()` is then called on the value
/// that comes back, so the test's output must implement `TestOutcome`
/// (that is, be `()` or `Result<(), E>` with `E: Debug`).
///
/// The expression is expanded once per enabled combination, so it is
/// evaluated afresh for each runtime.
///
/// This macro is only defined when at least one TLS feature and at least one
/// runtime feature are enabled.
#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "tokio", feature = "async-std"),
))]
#[macro_export]
macro_rules! test_with_all_runtimes {
    ( $test_fn:expr ) => {{
        use $crate::cond::*;
        use $crate::testing__::TestOutcome;

        // Each `if_*_present!` keeps its block only when both of the
        // corresponding features are enabled in this crate.
        if_tokio_native_tls_present! {{
            $crate::tokio::TokioNativeTlsRuntime::run_test($test_fn).check_ok();
        }}
        if_tokio_rustls_present! {{
            $crate::tokio::TokioRustlsRuntime::run_test($test_fn).check_ok();
        }}
        if_async_std_native_tls_present! {{
            $crate::async_std::AsyncStdNativeTlsRuntime::run_test($test_fn).check_ok();
        }}
        if_async_std_rustls_present! {{
            $crate::async_std::AsyncStdRustlsRuntime::run_test($test_fn).check_ok();
        }}
    }};
}
348
/// Run a test on a single runtime: this crate's `PreferredRuntime`.
///
/// The argument is an expression that is passed to
/// `PreferredRuntime::run_test`; the macro evaluates to whatever that call
/// returns (the output of the test's future).
///
/// This macro is only defined when at least one TLS feature and at least one
/// runtime feature are enabled.
#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "tokio", feature = "async-std"),
))]
#[macro_export]
macro_rules! test_with_one_runtime {
    ( $test_fn:expr ) => {{
        $crate::PreferredRuntime::run_test($test_fn)
    }};
}
370
371#[cfg(all(
372 test,
373 any(feature = "native-tls", feature = "rustls"),
374 any(feature = "async-std", feature = "tokio"),
375 not(miri), ))]
377mod test {
378 #![allow(clippy::unwrap_used, clippy::unnecessary_wraps)]
379 use crate::SleepProviderExt;
380 use crate::ToplevelRuntime;
381
382 use crate::traits::*;
383
384 use futures::io::{AsyncReadExt, AsyncWriteExt};
385 use futures::stream::StreamExt;
386 use native_tls_crate as native_tls;
387 use std::io::Result as IoResult;
388 use std::net::SocketAddr;
389 use std::net::{Ipv4Addr, SocketAddrV4};
390 use std::time::{Duration, Instant};
391
392 fn small_delay<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
395 let rt = runtime.clone();
396 runtime.block_on(async {
397 let i1 = Instant::now();
398 let one_msec = Duration::from_millis(1);
399 rt.sleep(one_msec).await;
400 let i2 = Instant::now();
401 assert!(i2 >= i1 + one_msec);
402 });
403 Ok(())
404 }
405
406 fn small_timeout_ok<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
408 let rt = runtime.clone();
409 runtime.block_on(async {
410 let one_day = Duration::from_secs(86400);
411 let outcome = rt.timeout(one_day, async { 413_u32 }).await;
412 assert_eq!(outcome, Ok(413));
413 });
414 Ok(())
415 }
416
417 fn small_timeout_expire<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
419 use futures::future::pending;
420
421 let rt = runtime.clone();
422 runtime.block_on(async {
423 let one_micros = Duration::from_micros(1);
424 let outcome = rt.timeout(one_micros, pending::<()>()).await;
425 assert_eq!(outcome, Err(crate::TimeoutError));
426 assert_eq!(
427 outcome.err().unwrap().to_string(),
428 "Timeout expired".to_string()
429 );
430 });
431 Ok(())
432 }
433 fn tiny_wallclock<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
438 let rt = runtime.clone();
439 runtime.block_on(async {
440 let i1 = Instant::now();
441 let now = runtime.wallclock();
442 let one_millis = Duration::from_millis(1);
443 let one_millis_later = now + one_millis;
444
445 rt.sleep_until_wallclock(one_millis_later).await;
446
447 let i2 = Instant::now();
448 let newtime = runtime.wallclock();
449 assert!(newtime >= one_millis_later);
450 assert!(i2 - i1 >= one_millis);
451 });
452 Ok(())
453 }
454
455 fn self_connect_tcp<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
459 let localhost = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
460 let rt1 = runtime.clone();
461
462 let listener = runtime.block_on(rt1.listen(&(SocketAddr::from(localhost))))?;
463 let addr = listener.local_addr()?;
464
465 runtime.block_on(async {
466 let task1 = async {
467 let mut buf = vec![0_u8; 11];
468 let (mut con, _addr) = listener.incoming().next().await.expect("closed?")?;
469 con.read_exact(&mut buf[..]).await?;
470 IoResult::Ok(buf)
471 };
472 let task2 = async {
473 let mut con = rt1.connect(&addr).await?;
474 con.write_all(b"Hello world").await?;
475 con.flush().await?;
476 IoResult::Ok(())
477 };
478
479 let (data, send_r) = futures::join!(task1, task2);
480 send_r?;
481
482 assert_eq!(&data?[..], b"Hello world");
483
484 Ok(())
485 })
486 }
487
488 fn self_connect_udp<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
492 let localhost = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
493 let rt1 = runtime.clone();
494
495 let socket1 = runtime.block_on(rt1.bind(&(localhost.into())))?;
496 let addr1 = socket1.local_addr()?;
497
498 let socket2 = runtime.block_on(rt1.bind(&(localhost.into())))?;
499 let addr2 = socket2.local_addr()?;
500
501 runtime.block_on(async {
502 let task1 = async {
503 let mut buf = [0_u8; 16];
504 let (len, addr) = socket1.recv(&mut buf[..]).await?;
505 IoResult::Ok((buf[..len].to_vec(), addr))
506 };
507 let task2 = async {
508 socket2.send(b"Hello world", &addr1).await?;
509 IoResult::Ok(())
510 };
511
512 let (recv_r, send_r) = futures::join!(task1, task2);
513 send_r?;
514 let (buff, addr) = recv_r?;
515 assert_eq!(addr2, addr);
516 assert_eq!(&buff, b"Hello world");
517
518 Ok(())
519 })
520 }
521
522 fn listener_stream<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
527 let localhost = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
528 let rt1 = runtime.clone();
529
530 let listener = runtime
531 .block_on(rt1.listen(&SocketAddr::from(localhost)))
532 .unwrap();
533 let addr = listener.local_addr().unwrap();
534 let mut stream = listener.incoming();
535
536 runtime.block_on(async {
537 let task1 = async {
538 let mut n = 0_u32;
539 loop {
540 let (mut con, _addr) = stream.next().await.unwrap()?;
541 let mut buf = [0_u8; 11];
542 con.read_exact(&mut buf[..]).await?;
543 n += 1;
544 if &buf[..] == b"world done!" {
545 break IoResult::Ok(n);
546 }
547 }
548 };
549 let task2 = async {
550 for _ in 0_u8..5 {
551 let mut con = rt1.connect(&addr).await?;
552 con.write_all(b"Hello world").await?;
553 con.flush().await?;
554 }
555 let mut con = rt1.connect(&addr).await?;
556 con.write_all(b"world done!").await?;
557 con.flush().await?;
558 con.close().await?;
559 IoResult::Ok(())
560 };
561
562 let (n, send_r) = futures::join!(task1, task2);
563 send_r?;
564
565 assert_eq!(n?, 6);
566
567 Ok(())
568 })
569 }
570
571 fn simple_tls<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
576 static PFX_ID: &[u8] = include_bytes!("test.pfx");
587 static PFX_PASSWORD: &str = "abc";
590
591 let localhost = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
592 let listener = std::net::TcpListener::bind(localhost)?;
593 let addr = listener.local_addr()?;
594
595 let identity = native_tls::Identity::from_pkcs12(PFX_ID, PFX_PASSWORD).unwrap();
596
597 let th = std::thread::spawn(move || {
599 use std::io::{Read, Write};
601 let acceptor = native_tls::TlsAcceptor::new(identity).unwrap();
602 let (con, _addr) = listener.accept()?;
603 let mut con = acceptor.accept(con).unwrap();
604 let mut buf = [0_u8; 16];
605 loop {
606 let n = con.read(&mut buf)?;
607 if n == 0 {
608 break;
609 }
610 con.write_all(&buf[..n])?;
611 }
612 IoResult::Ok(())
613 });
614
615 let connector = runtime.tls_connector();
616
617 runtime.block_on(async {
618 let text = b"I Suddenly Dont Understand Anything";
619 let mut buf = vec![0_u8; text.len()];
620 let conn = runtime.connect(&addr).await?;
621 let mut conn = connector.negotiate_unvalidated(conn, "Kan.Aya").await?;
622 assert!(conn.peer_certificate()?.is_some());
623 conn.write_all(text).await?;
624 conn.flush().await?;
625 conn.read_exact(&mut buf[..]).await?;
626 assert_eq!(&buf[..], text);
627 conn.close().await?;
628 IoResult::Ok(())
629 })?;
630
631 th.join().unwrap()?;
632 IoResult::Ok(())
633 }
634
635 macro_rules! tests_with_runtime {
636 { $runtime:expr => $($id:ident),* $(,)? } => {
637 $(
638 #[test]
639 fn $id() -> std::io::Result<()> {
640 super::$id($runtime)
641 }
642 )*
643 }
644 }
645
646 macro_rules! runtime_tests {
647 { $($id:ident),* $(,)? } =>
648 {
649 #[cfg(feature="tokio")]
650 mod tokio_runtime_tests {
651 tests_with_runtime! { &crate::tokio::PreferredRuntime::create()? => $($id),* }
652 }
653 #[cfg(feature="async-std")]
654 mod async_std_runtime_tests {
655 tests_with_runtime! { &crate::async_std::PreferredRuntime::create()? => $($id),* }
656 }
657 mod default_runtime_tests {
658 tests_with_runtime! { &crate::PreferredRuntime::create()? => $($id),* }
659 }
660 }
661 }
662
663 macro_rules! tls_runtime_tests {
664 { $($id:ident),* $(,)? } =>
665 {
666 #[cfg(all(feature="tokio", feature = "native-tls"))]
667 mod tokio_native_tls_tests {
668 tests_with_runtime! { &crate::tokio::TokioNativeTlsRuntime::create()? => $($id),* }
669 }
670 #[cfg(all(feature="async-std", feature = "native-tls"))]
671 mod async_std_native_tls_tests {
672 tests_with_runtime! { &crate::async_std::AsyncStdNativeTlsRuntime::create()? => $($id),* }
673 }
674 #[cfg(all(feature="tokio", feature="rustls"))]
675 mod tokio_rustls_tests {
676 tests_with_runtime! { &crate::tokio::TokioRustlsRuntime::create()? => $($id),* }
677 }
678 #[cfg(all(feature="async-std", feature="rustls"))]
679 mod async_std_rustls_tests {
680 tests_with_runtime! { &crate::async_std::AsyncStdRustlsRuntime::create()? => $($id),* }
681 }
682 mod default_runtime_tls_tests {
683 tests_with_runtime! { &crate::PreferredRuntime::create()? => $($id),* }
684 }
685 }
686 }
687
688 runtime_tests! {
689 small_delay,
690 small_timeout_ok,
691 small_timeout_expire,
692 tiny_wallclock,
693 self_connect_tcp,
694 self_connect_udp,
695 listener_stream,
696 }
697
698 tls_runtime_tests! {
699 simple_tls,
700 }
701}