async_executors/exec/
tokio_tp.rs

1//! Provides TokioTp executor specific functionality.
2//
3use
4{
5	crate          :: { SpawnHandle, JoinHandle, BlockingHandle          } ,
6	std            :: { fmt, sync::Arc, future::Future, convert::TryFrom } ,
7	futures_task   :: { FutureObj, Spawn, SpawnError                     } ,
8	tokio::runtime :: { Runtime, RuntimeFlavor, Handle, Builder          } ,
9};
10
11
12/// An executor that uses [`tokio::runtime::Runtime`].
13///
14/// ## Example
15///
16/// The following example shows how to pass an executor to a library function.
17///
18/// ```rust
19/// use
20/// {
21///    futures          :: { task::{ Spawn, SpawnExt } } ,
22///    async_executors  :: { TokioTp                   } ,
23///    tokio::runtime   :: { Builder                   } ,
24///    std::convert     :: { TryFrom                   } ,
25///    futures::channel :: { oneshot, oneshot::Sender  } ,
26/// };
27///
28///
29/// fn lib_function( exec: impl Spawn, tx: Sender<&'static str> )
30/// {
31///    exec.spawn( async
32///    {
33///       tx.send( "I can spawn from a library" ).expect( "send string" );
34///
35///    }).expect( "spawn task" );
36/// }
37///
38///
39/// fn main()
40/// {
41///    // This creates the runtime with defaults. It enables io and timers based on
42///    // the features enabled on _async_executors_. You can also create `TokioTp` from
43///    // a tokio `Runtime` or a `Handle`.
44///    //
45///    let exec = TokioTp::new().expect( "create tokio threadpool" );
46///
47///    let program = async
48///    {
49///       let (tx, rx) = oneshot::channel();
50///
51///       lib_function( &exec, tx );
52///       assert_eq!( "I can spawn from a library", rx.await.expect( "receive on channel" ) );
53///    };
54///
55///    exec.block_on( program );
56/// }
57/// ```
58///
59///
60/// ## Unwind Safety.
61///
62/// You must only spawn futures to this API that are unwind safe. Tokio will wrap it in
63/// [`std::panic::AssertUnwindSafe`] and wrap the poll invocation with [`std::panic::catch_unwind`].
64///
65/// They reason that this is fine because they require `Send + 'static` on the future. As far
66/// as I can tell this is wrong. Unwind safety can be circumvented in several ways even with
67/// `Send + 'static` (eg. `parking_lot::Mutex` is `Send + 'static` but `!UnwindSafe`).
68///
69/// You should make sure that if your future panics, no code that lives on after the spawned task has
70/// unwound, nor any destructors called during the unwind can observe data in an inconsistent state.
71///
72/// If a future is run with `block_on` as opposed to `spawn`, the panic will not be caught and the
73/// thread calling `block_on` will be unwound.
74///
75/// Note that unwind safety is related to logic errors, not related to the memory safety issues that cannot happen
76/// in safe rust (memory safety, undefined behavior, unsoundness, data races, ...). See the relevant
77/// [catch_unwind RFC](https://github.com/rust-lang/rfcs/blob/master/text/1236-stabilize-catch-panic.md)
78/// and it's discussion threads for more info as well as the documentation of [`std::panic::UnwindSafe`].
79//
80#[ derive( Debug, Clone ) ]
81//
82#[ cfg_attr( nightly, doc(cfg( feature = "tokio_tp" )) ) ]
83//
84pub struct TokioTp
85{
86	spawner: Spawner,
87}
88
89
90#[derive(Debug, Clone)]
91enum Spawner
92{
93	Runtime( Arc<Runtime> ) ,
94	Handle ( Handle       ) ,
95}
96
97
98/// Allows to create a [`TokioTp`] from a [`Runtime`].
99///
100/// # Errors
101///
102/// Will fail if you pass a multithreaded runtime. In that case it will return your [`Runtime`].
103//
104impl TryFrom<Runtime> for TokioTp
105{
106	type Error = Runtime;
107
108	fn try_from( rt: Runtime ) -> Result<Self, Runtime>
109	{
110		match rt.handle().runtime_flavor()
111		{
112			RuntimeFlavor::MultiThread => Ok( Self
113			{
114				spawner: Spawner::Runtime( Arc::new(rt) ) ,
115			}),
116
117			_ => Err( rt ),
118		}
119	}
120}
121
122
123/// Allows to create a [`TokioTp`] from a [`Handle`].
124///
125/// # Errors
126///
127/// Will fail if you pass a multithreaded runtime. In that case it will return your [`Handle`].
128//
129impl TryFrom<Handle> for TokioTp
130{
131	type Error = Handle;
132
133	fn try_from( handle: Handle ) -> Result<Self, Handle>
134	{
135		match handle.runtime_flavor()
136		{
137			RuntimeFlavor::MultiThread => Ok( Self
138			{
139				spawner: Spawner::Handle( handle ) ,
140			}),
141
142			_ => Err( handle ),
143		}
144	}
145}
146
147
148
149impl TokioTp
150{
151	/// Create a new [`TokioTp`]. Uses a default multithreaded [`Runtime`] setting timers and io depending
152	/// on the features enabled on _async_executors_.
153	//
154	pub fn new() -> Result<Self, TokioTpErr>
155	{
156		let mut builder = Builder::new_multi_thread();
157
158
159		#[ cfg( feature = "tokio_io" ) ]
160		//
161		builder.enable_io();
162
163		#[ cfg( feature = "tokio_timer" ) ]
164		//
165		builder.enable_time();
166
167
168		let rt = builder.build().map_err( |e| TokioTpErr::Builder(e.kind()) )?;
169
170		Ok(Self
171		{
172			spawner: Spawner::Runtime(Arc::new( rt )),
173		})
174	}
175
176
177
178	/// Try to construct a [`TokioTp`] from the currently entered [`Runtime`]. You can do this
179	/// if you want to construct your runtime with the tokio macros eg:
180	///
181	/// ```
182	/// #[tokio::main]
183	/// async fn main()
184	/// {
185	///    // ...
186	/// }
187	/// ```
188	///
189	/// # Warning
190	///
191	/// `TokioTp::new()` is preferred over this. It's brief, doesn't require macros and is
192	/// the intended behavior for this type. The whole library aims at a paradigm without
193	/// global executors.
194	///
195	/// The main footgun here is that you are now already in async context, so you must not
196	/// call [`TokioTp::block_on`]. `block_on` will panic when run from within an existing
197	/// async context.
198	///
199	/// # Errors
200	///
201	/// Will fail if trying to construct from a current thread runtime or if no runtime
202	/// is running.
203	//
204	pub fn try_current() -> Result< Self, TokioTpErr >
205	{
206		let handle = Handle::try_current()
207			.map_err(|_| TokioTpErr::NoRuntime )?;
208
209		Self::try_from( handle )
210			.map_err(|_| TokioTpErr::WrongFlavour )
211	}
212
213
214	/// Forwards to [`Runtime::block_on`] or [`Handle::block_on`].
215	///
216	/// # Panics
217	///
218	/// If called when a runtime is already entered (eg. in async context), like when you created this
219	/// executor with [`TokioTp::try_current`], this will panic.
220	//
221	pub fn block_on< F: Future >( &self, f: F ) -> F::Output
222	{
223		match &self.spawner
224		{
225			Spawner::Runtime( rt     ) => rt    .block_on( f ) ,
226			Spawner::Handle ( handle ) => handle.block_on( f ) ,
227		}
228	}
229
230
231	/// See: [`tokio::runtime::Runtime::shutdown_timeout`]
232	///
233	///  This tries to unwrap the `Arc<Runtime>` we hold, so that works only if no other clones are around. If this is not the
234	///  only reference, self will be returned to you as an error. It means you cannot shutdown the runtime because there are
235	///  other clones of the executor still alive.
236	///
237	///  # Errors
238	///  - [`TokioTpErr::Cloned`]: if the the [`TokioTp`] has been cloned. You can only shut down the last one.
239	///  - [`TokioTpErr::Handle`]: if the the [`TokioTp`] has been created from a handle. That is we don't own the [`Runtime`].
240	//
241	pub fn shutdown_timeout( self, duration: std::time::Duration ) -> Result<(), TokioTpErr>
242	{
243		let Self{ spawner } = self;
244
245		let arc = match spawner
246		{
247			Spawner::Handle ( handle ) => return Err( TokioTpErr::Handle(Self{ spawner: Spawner::Handle(handle) }) ) ,
248			Spawner::Runtime( arc    ) => arc,
249		};
250
251
252		let rt  = match Arc::try_unwrap(arc)
253		{
254			Ok(rt) => rt,
255			Err(arc) =>
256			{
257				let this = Self{ spawner: Spawner::Runtime(arc) };
258				return Err( TokioTpErr::Cloned(this) );
259			}
260		};
261
262		rt.shutdown_timeout( duration );
263
264
265		Ok(())
266	}
267
268
269
270	/// See: [`tokio::runtime::Runtime::shutdown_background`]
271	///
272	///  This tries to unwrap the `Arc<Runtime>` we hold, so that works only if no other clones are around. If this is not the
273	///  only reference, self will be returned to you as an error. It means you cannot shutdown the runtime because there are
274	///  other clones of the executor still alive.
275	///
276	///  # Errors
277	///  - [`TokioTpErr::Cloned`]: if the the [`TokioTp`] has been cloned. You can only shut down the last one.
278	///  - [`TokioTpErr::Handle`]: if the the [`TokioTp`] has been created from a handle. That is we don't own the [`Runtime`].
279	//
280	pub fn shutdown_background( self ) -> Result<(), TokioTpErr>
281	{
282		let Self{ spawner } = self;
283
284		let arc = match spawner
285		{
286			Spawner::Handle ( handle ) => return Err( TokioTpErr::Handle(Self{ spawner: Spawner::Handle(handle) }) ) ,
287			Spawner::Runtime( arc    ) => arc,
288		};
289
290
291		let rt  = match Arc::try_unwrap(arc)
292		{
293			Ok(rt) => rt,
294			Err(arc) =>
295			{
296				let this = Self{ spawner: Spawner::Runtime(arc) };
297				return Err( TokioTpErr::Cloned(this) );
298			}
299		};
300
301		rt.shutdown_background();
302
303
304		Ok(())
305	}
306}
307
308
309#[ cfg( feature = "tokio_io" ) ]
310//
311#[ cfg_attr( nightly, doc(cfg( feature = "tokio_io" )) ) ]
312//
313impl crate::TokioIo for TokioTp {}
314
315
316impl Spawn for TokioTp
317{
318	/// Never fails.
319	fn spawn_obj( &self, future: FutureObj<'static, ()> ) -> Result<(), SpawnError>
320	{
321		// We drop the JoinHandle, so the task becomes detached.
322		//
323		match &self.spawner
324		{
325			Spawner::Runtime( rt     ) => drop( rt    .spawn(future) ) ,
326			Spawner::Handle ( handle ) => drop( handle.spawn(future) ) ,
327		}
328
329		Ok(())
330	}
331}
332
333
334
335impl<Out: 'static + Send> SpawnHandle<Out> for TokioTp
336{
337	fn spawn_handle_obj( &self, future: FutureObj<'static, Out> ) -> Result<JoinHandle<Out>, SpawnError>
338	{
339		let handle = match &self.spawner
340		{
341			Spawner::Runtime( rt     ) => rt    .spawn(future) ,
342			Spawner::Handle ( handle ) => handle.spawn(future) ,
343		};
344
345		Ok( JoinHandle::tokio(handle) )
346	}
347}
348
349
350
351impl crate::YieldNow for TokioTp {}
352
353
354
355impl<R: Send + 'static> crate::SpawnBlocking<R> for TokioTp
356{
357	fn spawn_blocking<F>( &self, f: F ) -> BlockingHandle<R>
358
359		where F: FnOnce() -> R + Send + 'static ,
360	{
361		let handle = match &self.spawner
362		{
363			Spawner::Runtime( rt     ) => rt    .spawn_blocking( f ) ,
364			Spawner::Handle ( handle ) => handle.spawn_blocking( f ) ,
365		};
366
367		BlockingHandle::tokio( handle )
368	}
369
370
371	fn spawn_blocking_dyn( &self, f: Box< dyn FnOnce()->R + Send > ) -> BlockingHandle<R>
372	{
373		self.spawn_blocking( f )
374	}
375}
376
377
378
379
380#[ cfg(all( feature = "timer", not(feature="tokio_timer" )) ) ]
381//
382#[ cfg_attr( nightly, doc(cfg(all( feature = "timer", feature = "tokio_tp" ))) ) ]
383//
384impl crate::Timer for TokioTp
385{
386	fn sleep( &self, dur: std::time::Duration ) -> futures_core::future::BoxFuture<'static, ()>
387	{
388		Box::pin( futures_timer::Delay::new(dur) )
389	}
390}
391
392
393
394#[ cfg( feature = "tokio_timer" ) ]
395//
396#[ cfg_attr( nightly, doc(cfg(all( feature = "tokio_timer", feature = "tokio_tp" ))) ) ]
397//
398impl crate::Timer for TokioTp
399{
400	fn sleep( &self, dur: std::time::Duration ) -> futures_core::future::BoxFuture<'static, ()>
401	{
402		Box::pin( tokio::time::sleep(dur) )
403	}
404}
405
406
407#[cfg( feature = "tokio_tp" )]
408/// A few errors that can happen while using _tokio_ executors.
409#[derive(Debug, Clone)]
410pub enum TokioTpErr
411{
412	/// The [`tokio::runtime::Builder`] returned an error when construting the [`Runtime`].
413	Builder( std::io::ErrorKind ),
414
415	/// There are other clones of the [`Runtime`], so we cannot shut it down.
416	Cloned ( TokioTp ),
417
418	/// This executor was constructed from the a [`Handle`], so cannot be shut down.
419	Handle ( TokioTp ),
420
421	/// Can't create from current runtime because no runtime currently entered.
422	NoRuntime,
423
424	/// Can't construct from a current thread runtime.
425	WrongFlavour,
426}
427
428
429impl fmt::Display for TokioTpErr
430{
431	fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result
432	{
433		use TokioTpErr::*;
434
435		match self
436		{
437			Builder(source) =>
438				write!( f, "tokio::runtime::Builder returned an error: {source}" ),
439			Cloned(_) => write!( f, "The TokioTp executor was cloned. Only the last copy can shut it down." ),
440			Handle(_) => write!( f, "The TokioTp was created from tokio::runtime::Handle. Only an owned executor (created from `Runtime`) can be shut down." ),
441			NoRuntime => write!( f, "Call to tokio::Handle::try_current failed, generally because no entered runtime is active." ),
442			WrongFlavour => write!( f, "Can't create TokioTp from a current thread `Runtime`." ),
443		}
444	}
445}
446
447
448impl std::error::Error for TokioTpErr {}