async_executors/iface/
join_handle.rs

1#[ allow(unused_imports) ] // some imports are conditional on features
2//
3use
4{
5	std         :: { future::Future, sync::atomic::{ AtomicBool, Ordering } } ,
6	std         :: { task::{ Poll, Context }, pin::Pin                      } ,
7	futures_util:: { future::{ AbortHandle, Aborted, RemoteHandle }, ready  } ,
8	super :: *,
9};
10
11
12
13
14/// A framework agnostic JoinHandle type. Cancels the future on dropping the handle.
15/// You can call [`detach`](JoinHandle::detach) to leave the future running when dropping the handle.
16///
17/// This leverages the performance gains from the native join handles compared to
18/// [RemoteHandle](futures_util::future::RemoteHandle) where possible.
19///
20/// It does wrap futures in [Abortable](futures_util::future::Abortable) where needed as
21/// [_async-std_](async_std_crate)'s canceling is asynchronous, which we can't call during drop.
22///
23/// # Panics
24///
25/// There is an inconsistency between executors when it comes to a panicking task.
26/// Generally we unwind the thread on which the handle is awaited when a task panics,
27/// but async-std will also let the executor working thread unwind. No `catch_unwind` was added to
28/// bring async-std in line with the other executors here.
29///
30/// Awaiting the JoinHandle can also panic if you drop the executor before it completes.
31//
32#[ derive( Debug ) ]
33//
34#[ must_use = "JoinHandle will cancel your future when dropped unless you await it." ]
35//
36pub struct JoinHandle<T> { inner: InnerJh<T> }
37
38
39
40impl<T> JoinHandle<T>
41{
42	/// Make a wrapper around [`tokio::task::JoinHandle`].
43	//
44	#[ cfg(any( feature = "tokio_tp", feature = "tokio_ct" )) ]
45	//
46	pub fn tokio( handle: TokioJoinHandle<T> ) -> Self
47	{
48		let detached = false;
49		let inner    = InnerJh::Tokio { handle, detached };
50
51		Self{ inner }
52	}
53
54
55
56	/// Make a wrapper around [`async_global_executor::Task`].
57	//
58	#[ cfg( feature = "async_global" ) ]
59	//
60	pub fn async_global( task: AsyncGlobalTask<T> ) -> Self
61	{
62		let task  = Some( task );
63		let inner = InnerJh::AsyncGlobal{ task };
64
65		Self{ inner }
66	}
67
68
69
70	/// Make a wrapper around [`async_std::task::JoinHandle`](async_std_crate::task::JoinHandle). The task needs to
71	/// be wrapped in an abortable so we can cancel it on drop.
72	//
73	#[ cfg( feature = "async_std" ) ]
74	//
75	pub fn async_std
76	(
77		handle  : AsyncStdJoinHandle<Result<T, Aborted>> ,
78		a_handle: AbortHandle                            ,
79
80	) -> Self
81	{
82		let detached = false;
83		let inner    = InnerJh::AsyncStd{ handle, a_handle, detached };
84
85		Self{ inner }
86	}
87
88
89	/// Make a wrapper around [`futures_util::future::RemoteHandle`].
90	//
91	pub fn remote_handle( handle: RemoteHandle<T> ) -> Self
92	{
93		let inner = InnerJh::RemoteHandle{ handle: Some(handle) };
94
95		Self{ inner }
96	}
97}
98
99
100
101#[ derive(Debug) ] #[ allow(dead_code) ]
102//
103enum InnerJh<T>
104{
105	/// Wrapper around tokio JoinHandle.
106	//
107	#[ cfg(any( feature = "tokio_tp", feature = "tokio_ct" )) ]
108	//
109	Tokio
110	{
111		handle  : TokioJoinHandle<T> ,
112		detached: bool               ,
113	},
114
115	/// Wrapper around AsyncStd JoinHandle.
116	//
117	#[ cfg( feature = "async_global" ) ]
118	//
119	AsyncGlobal
120	{
121		task: Option< AsyncGlobalTask<T> > ,
122	},
123
124	/// Wrapper around AsyncStd JoinHandle.
125	//
126	#[ cfg( feature = "async_std" ) ]
127	//
128	AsyncStd
129	{
130		handle  : AsyncStdJoinHandle<Result<T, Aborted>> ,
131		a_handle: AbortHandle                            ,
132		detached: bool                                   ,
133	},
134
135	/// Wrapper around futures RemoteHandle.
136	//
137	RemoteHandle
138	{
139		handle: Option<RemoteHandle<T>>,
140	},
141}
142
143
144
145impl<T> JoinHandle<T>
146{
147	/// Drops this handle without canceling the underlying future.
148	///
149	/// This method can be used if you want to drop the handle, but let the execution continue.
150	//
151	pub fn detach( mut self )
152	{
153		match &mut self.inner
154		{
155			#[ cfg(any( feature = "tokio_tp", feature = "tokio_ct" )) ]
156			//
157			InnerJh::Tokio{ ref mut detached, .. } =>
158			{
159				// only other use of this is in Drop impl and we consume self here,
160				// so there cannot be any race as this does not sync things across threads,
161				// hence Relaxed ordering.
162				//
163				*detached = true;
164			}
165
166			#[ cfg( feature = "async_global" ) ] InnerJh::AsyncGlobal{ task } =>
167			{
168				let task = task.take();
169				task.unwrap().detach();
170			}
171
172			#[ cfg( feature = "async_std" ) ] InnerJh::AsyncStd{ ref mut detached, .. } =>
173			{
174				*detached = true;
175			}
176
177			InnerJh::RemoteHandle{ handle } =>
178			{
179				if let Some(rh) = handle.take() { rh.forget() };
180			}
181		}
182	}
183}
184
185
186
187impl<T: 'static> Future for JoinHandle<T>
188{
189	type Output = T;
190
191	fn poll( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Self::Output>
192	{
193		match &mut self.get_mut().inner
194		{
195			#[ cfg(any( feature = "tokio_tp", feature = "tokio_ct" )) ]
196			//
197			InnerJh::Tokio{ handle, .. } =>
198			{
199				match ready!( Pin::new( handle ).poll( cx ) )
200				{
201					Ok (t) => Poll::Ready( t ),
202
203					Err(e) =>
204					{
205						panic!( "Task has been canceled or it has panicked. Are you dropping the executor to early? Error: {}", e );
206					}
207				}
208			}
209
210
211			#[ cfg( feature = "async_std" ) ] InnerJh::AsyncStd{ handle, .. } =>
212			{
213				match ready!( Pin::new( handle ).poll( cx ) )
214				{
215					Ok (t) => Poll::Ready( t ),
216					Err(_) => unreachable!(),
217				}
218			}
219
220
221			#[ cfg( feature = "async_global" ) ] InnerJh::AsyncGlobal{ task, .. } =>
222			{
223				Pin::new( task.as_mut().unwrap() ).poll( cx )
224			}
225
226
227			InnerJh::RemoteHandle{ ref mut handle } => Pin::new( handle ).as_pin_mut().expect( "no polling after detach" ).poll( cx ),
228		}
229	}
230}
231
232
233
234impl<T> Drop for JoinHandle<T>
235{
236	// see reasoning about Relaxed atomic in detach().
237	//
238	fn drop( &mut self )
239	{
240		match &mut self.inner
241		{
242			#[ cfg(any( feature = "tokio_tp", feature = "tokio_ct" )) ]
243			//
244			InnerJh::Tokio{ handle, detached, .. } =>
245
246				if !*detached { handle.abort() },
247
248
249			#[ cfg( feature = "async_std" ) ] InnerJh::AsyncStd { a_handle, detached, .. } =>
250
251				if !*detached { a_handle.abort() },
252
253
254			// Nothing needs to be done, just drop it.
255			//
256			#[ cfg( feature = "async_global" ) ] InnerJh::AsyncGlobal { .. } => {}
257
258
259			InnerJh::RemoteHandle{ .. } => {},
260		};
261	}
262}