async_executors/iface/
blocking_handle.rs

1#[ allow(unused_imports) ] // some imports are conditional on features
2//
3use
4{
5	std         :: { future::Future, sync::atomic::{ AtomicBool, Ordering } } ,
6	std         :: { task::{ Poll, Context }, pin::Pin                      } ,
7	futures_util:: { future::{ AbortHandle, Aborted, RemoteHandle }, ready  } ,
8	super       :: *,
9};
10
11
12#[ cfg( feature = "async_global" ) ]
13//
14type BoxedFut<T> = Pin<Box< dyn Future<Output=T> + Send >>;
15
16
17/// A framework agnostic BlockingHandle type. This is returned by [`SpawnBlocking`](crate::SpawnBlocking).
18/// Await this handle for the output of the task. As opposed to a [`JoinHandle`], you can't cancel a blocking
19/// task once it has started running. If you drop this after the task starts running, it will just detach
20/// and let the task run in the background.
21//
22#[ derive( Debug ) ]
23//
24pub struct BlockingHandle<T>( InnerBh<T> );
25
26
27impl<T> BlockingHandle<T>
28{
29	/// Make a wrapper around [`tokio::task::JoinHandle`].
30	//
31	#[ cfg(any( feature = "tokio_tp", feature = "tokio_ct" )) ]
32	//
33	pub fn tokio( handle: TokioJoinHandle<T> ) -> Self
34	{
35		Self( InnerBh::Tokio(handle) )
36	}
37
38
39	/// Make a wrapper around [`async_global_executor::Task`].
40	//
41	#[ cfg( feature = "async_global" ) ]
42	//
43	pub fn async_global( task: BoxedFut<T> ) -> Self
44	{
45		Self( InnerBh::AsyncGlobal(task) )
46	}
47
48
49	/// Make a wrapper around [`async_std::task::JoinHandle`](async_std_crate::task::JoinHandle).
50	//
51	#[ cfg( feature = "async_std" ) ]
52	//
53	pub fn async_std( handle: AsyncStdJoinHandle<T> ) -> Self
54	{
55		Self( InnerBh::AsyncStd(handle) )
56	}
57}
58
59
60
61#[ allow(dead_code) ]
62//
63enum InnerBh<T>
64{
65	/// Wrapper around tokio BlockingHandle.
66	//
67	#[ cfg(any( feature = "tokio_tp", feature = "tokio_ct" )) ]
68	//
69	Tokio( TokioJoinHandle<T> ),
70
71	/// Wrapper around AsyncStd BlockingHandle.
72	//
73	#[ cfg( feature = "async_global" ) ]
74	//
75	AsyncGlobal( BoxedFut<T> ),
76
77	/// Wrapper around AsyncStd BlockingHandle.
78	//
79	#[ cfg( feature = "async_std" ) ]
80	//
81	AsyncStd( AsyncStdJoinHandle<T> ),
82
83	// Since the other variants are behind feature flags, the generic won't be
84	// used if we don't include this.
85	//
86	Phantom( std::marker::PhantomData< fn()->T > ),
87}
88
89
90
91impl<T: 'static> Future for BlockingHandle<T>
92{
93	type Output = T;
94
95	fn poll( self: Pin<&mut Self>, _cx: &mut Context<'_> ) -> Poll<Self::Output>
96	{
97		match &mut self.get_mut().0
98		{
99			#[ cfg(any( feature = "tokio_tp", feature = "tokio_ct" )) ]
100			//
101			InnerBh::Tokio(handle) =>
102			{
103				match ready!( Pin::new( handle ).poll( _cx ) )
104				{
105					Ok(t)  => Poll::Ready( t ),
106
107					Err(e) => panic!( "Task has been canceled or has panicked. \
108						Are you dropping the executor to early? Error: {}", e ),
109				}
110			}
111
112			#[ cfg( feature = "async_std"    ) ] InnerBh::AsyncStd   ( handle ) => Pin::new( handle ).poll( _cx ) ,
113			#[ cfg( feature = "async_global" ) ] InnerBh::AsyncGlobal( task   ) => Pin::new( task   ).poll( _cx ) ,
114
115			InnerBh::Phantom(_) => unreachable!(),
116		}
117	}
118}
119
120
121
122impl<T> std::fmt::Debug for InnerBh<T>
123{
124	fn fmt( &self,	f: &mut std::fmt::Formatter<'_> ) -> std::fmt::Result
125	{
126		write!( f, "InnerBh" )
127	}
128}