async_executors/exec/tokio_tp.rs
1//! Provides TokioTp executor specific functionality.
2//
3use
4{
5 crate :: { SpawnHandle, JoinHandle, BlockingHandle } ,
6 std :: { fmt, sync::Arc, future::Future, convert::TryFrom } ,
7 futures_task :: { FutureObj, Spawn, SpawnError } ,
8 tokio::runtime :: { Runtime, RuntimeFlavor, Handle, Builder } ,
9};
10
11
12/// An executor that uses [`tokio::runtime::Runtime`].
13///
14/// ## Example
15///
16/// The following example shows how to pass an executor to a library function.
17///
18/// ```rust
19/// use
20/// {
21/// futures :: { task::{ Spawn, SpawnExt } } ,
22/// async_executors :: { TokioTp } ,
23/// tokio::runtime :: { Builder } ,
24/// std::convert :: { TryFrom } ,
25/// futures::channel :: { oneshot, oneshot::Sender } ,
26/// };
27///
28///
29/// fn lib_function( exec: impl Spawn, tx: Sender<&'static str> )
30/// {
31/// exec.spawn( async
32/// {
33/// tx.send( "I can spawn from a library" ).expect( "send string" );
34///
35/// }).expect( "spawn task" );
36/// }
37///
38///
39/// fn main()
40/// {
41/// // This creates the runtime with defaults. It enables io and timers based on
42/// // the features enabled on _async_executors_. You can also create `TokioTp` from
43/// // a tokio `Runtime` or a `Handle`.
44/// //
45/// let exec = TokioTp::new().expect( "create tokio threadpool" );
46///
47/// let program = async
48/// {
49/// let (tx, rx) = oneshot::channel();
50///
51/// lib_function( &exec, tx );
52/// assert_eq!( "I can spawn from a library", rx.await.expect( "receive on channel" ) );
53/// };
54///
55/// exec.block_on( program );
56/// }
57/// ```
58///
59///
60/// ## Unwind Safety.
61///
62/// You must only spawn futures to this API that are unwind safe. Tokio will wrap it in
63/// [`std::panic::AssertUnwindSafe`] and wrap the poll invocation with [`std::panic::catch_unwind`].
64///
65/// They reason that this is fine because they require `Send + 'static` on the future. As far
66/// as I can tell this is wrong. Unwind safety can be circumvented in several ways even with
67/// `Send + 'static` (eg. `parking_lot::Mutex` is `Send + 'static` but `!UnwindSafe`).
68///
69/// You should make sure that if your future panics, no code that lives on after the spawned task has
70/// unwound, nor any destructors called during the unwind can observe data in an inconsistent state.
71///
72/// If a future is run with `block_on` as opposed to `spawn`, the panic will not be caught and the
73/// thread calling `block_on` will be unwound.
74///
75/// Note that unwind safety is related to logic errors, not related to the memory safety issues that cannot happen
76/// in safe rust (memory safety, undefined behavior, unsoundness, data races, ...). See the relevant
77/// [catch_unwind RFC](https://github.com/rust-lang/rfcs/blob/master/text/1236-stabilize-catch-panic.md)
78/// and it's discussion threads for more info as well as the documentation of [`std::panic::UnwindSafe`].
79//
80#[ derive( Debug, Clone ) ]
81//
82#[ cfg_attr( nightly, doc(cfg( feature = "tokio_tp" )) ) ]
83//
84pub struct TokioTp
85{
86 spawner: Spawner,
87}
88
89
90#[derive(Debug, Clone)]
91enum Spawner
92{
93 Runtime( Arc<Runtime> ) ,
94 Handle ( Handle ) ,
95}
96
97
98/// Allows to create a [`TokioTp`] from a [`Runtime`].
99///
100/// # Errors
101///
102/// Will fail if you pass a multithreaded runtime. In that case it will return your [`Runtime`].
103//
104impl TryFrom<Runtime> for TokioTp
105{
106 type Error = Runtime;
107
108 fn try_from( rt: Runtime ) -> Result<Self, Runtime>
109 {
110 match rt.handle().runtime_flavor()
111 {
112 RuntimeFlavor::MultiThread => Ok( Self
113 {
114 spawner: Spawner::Runtime( Arc::new(rt) ) ,
115 }),
116
117 _ => Err( rt ),
118 }
119 }
120}
121
122
123/// Allows to create a [`TokioTp`] from a [`Handle`].
124///
125/// # Errors
126///
127/// Will fail if you pass a multithreaded runtime. In that case it will return your [`Handle`].
128//
129impl TryFrom<Handle> for TokioTp
130{
131 type Error = Handle;
132
133 fn try_from( handle: Handle ) -> Result<Self, Handle>
134 {
135 match handle.runtime_flavor()
136 {
137 RuntimeFlavor::MultiThread => Ok( Self
138 {
139 spawner: Spawner::Handle( handle ) ,
140 }),
141
142 _ => Err( handle ),
143 }
144 }
145}
146
147
148
149impl TokioTp
150{
151 /// Create a new [`TokioTp`]. Uses a default multithreaded [`Runtime`] setting timers and io depending
152 /// on the features enabled on _async_executors_.
153 //
154 pub fn new() -> Result<Self, TokioTpErr>
155 {
156 let mut builder = Builder::new_multi_thread();
157
158
159 #[ cfg( feature = "tokio_io" ) ]
160 //
161 builder.enable_io();
162
163 #[ cfg( feature = "tokio_timer" ) ]
164 //
165 builder.enable_time();
166
167
168 let rt = builder.build().map_err( |e| TokioTpErr::Builder(e.kind()) )?;
169
170 Ok(Self
171 {
172 spawner: Spawner::Runtime(Arc::new( rt )),
173 })
174 }
175
176
177
178 /// Try to construct a [`TokioTp`] from the currently entered [`Runtime`]. You can do this
179 /// if you want to construct your runtime with the tokio macros eg:
180 ///
181 /// ```
182 /// #[tokio::main]
183 /// async fn main()
184 /// {
185 /// // ...
186 /// }
187 /// ```
188 ///
189 /// # Warning
190 ///
191 /// `TokioTp::new()` is preferred over this. It's brief, doesn't require macros and is
192 /// the intended behavior for this type. The whole library aims at a paradigm without
193 /// global executors.
194 ///
195 /// The main footgun here is that you are now already in async context, so you must not
196 /// call [`TokioTp::block_on`]. `block_on` will panic when run from within an existing
197 /// async context.
198 ///
199 /// # Errors
200 ///
201 /// Will fail if trying to construct from a current thread runtime or if no runtime
202 /// is running.
203 //
204 pub fn try_current() -> Result< Self, TokioTpErr >
205 {
206 let handle = Handle::try_current()
207 .map_err(|_| TokioTpErr::NoRuntime )?;
208
209 Self::try_from( handle )
210 .map_err(|_| TokioTpErr::WrongFlavour )
211 }
212
213
214 /// Forwards to [`Runtime::block_on`] or [`Handle::block_on`].
215 ///
216 /// # Panics
217 ///
218 /// If called when a runtime is already entered (eg. in async context), like when you created this
219 /// executor with [`TokioTp::try_current`], this will panic.
220 //
221 pub fn block_on< F: Future >( &self, f: F ) -> F::Output
222 {
223 match &self.spawner
224 {
225 Spawner::Runtime( rt ) => rt .block_on( f ) ,
226 Spawner::Handle ( handle ) => handle.block_on( f ) ,
227 }
228 }
229
230
231 /// See: [`tokio::runtime::Runtime::shutdown_timeout`]
232 ///
233 /// This tries to unwrap the `Arc<Runtime>` we hold, so that works only if no other clones are around. If this is not the
234 /// only reference, self will be returned to you as an error. It means you cannot shutdown the runtime because there are
235 /// other clones of the executor still alive.
236 ///
237 /// # Errors
238 /// - [`TokioTpErr::Cloned`]: if the the [`TokioTp`] has been cloned. You can only shut down the last one.
239 /// - [`TokioTpErr::Handle`]: if the the [`TokioTp`] has been created from a handle. That is we don't own the [`Runtime`].
240 //
241 pub fn shutdown_timeout( self, duration: std::time::Duration ) -> Result<(), TokioTpErr>
242 {
243 let Self{ spawner } = self;
244
245 let arc = match spawner
246 {
247 Spawner::Handle ( handle ) => return Err( TokioTpErr::Handle(Self{ spawner: Spawner::Handle(handle) }) ) ,
248 Spawner::Runtime( arc ) => arc,
249 };
250
251
252 let rt = match Arc::try_unwrap(arc)
253 {
254 Ok(rt) => rt,
255 Err(arc) =>
256 {
257 let this = Self{ spawner: Spawner::Runtime(arc) };
258 return Err( TokioTpErr::Cloned(this) );
259 }
260 };
261
262 rt.shutdown_timeout( duration );
263
264
265 Ok(())
266 }
267
268
269
270 /// See: [`tokio::runtime::Runtime::shutdown_background`]
271 ///
272 /// This tries to unwrap the `Arc<Runtime>` we hold, so that works only if no other clones are around. If this is not the
273 /// only reference, self will be returned to you as an error. It means you cannot shutdown the runtime because there are
274 /// other clones of the executor still alive.
275 ///
276 /// # Errors
277 /// - [`TokioTpErr::Cloned`]: if the the [`TokioTp`] has been cloned. You can only shut down the last one.
278 /// - [`TokioTpErr::Handle`]: if the the [`TokioTp`] has been created from a handle. That is we don't own the [`Runtime`].
279 //
280 pub fn shutdown_background( self ) -> Result<(), TokioTpErr>
281 {
282 let Self{ spawner } = self;
283
284 let arc = match spawner
285 {
286 Spawner::Handle ( handle ) => return Err( TokioTpErr::Handle(Self{ spawner: Spawner::Handle(handle) }) ) ,
287 Spawner::Runtime( arc ) => arc,
288 };
289
290
291 let rt = match Arc::try_unwrap(arc)
292 {
293 Ok(rt) => rt,
294 Err(arc) =>
295 {
296 let this = Self{ spawner: Spawner::Runtime(arc) };
297 return Err( TokioTpErr::Cloned(this) );
298 }
299 };
300
301 rt.shutdown_background();
302
303
304 Ok(())
305 }
306}
307
308
309#[ cfg( feature = "tokio_io" ) ]
310//
311#[ cfg_attr( nightly, doc(cfg( feature = "tokio_io" )) ) ]
312//
313impl crate::TokioIo for TokioTp {}
314
315
316impl Spawn for TokioTp
317{
318 /// Never fails.
319 fn spawn_obj( &self, future: FutureObj<'static, ()> ) -> Result<(), SpawnError>
320 {
321 // We drop the JoinHandle, so the task becomes detached.
322 //
323 match &self.spawner
324 {
325 Spawner::Runtime( rt ) => drop( rt .spawn(future) ) ,
326 Spawner::Handle ( handle ) => drop( handle.spawn(future) ) ,
327 }
328
329 Ok(())
330 }
331}
332
333
334
335impl<Out: 'static + Send> SpawnHandle<Out> for TokioTp
336{
337 fn spawn_handle_obj( &self, future: FutureObj<'static, Out> ) -> Result<JoinHandle<Out>, SpawnError>
338 {
339 let handle = match &self.spawner
340 {
341 Spawner::Runtime( rt ) => rt .spawn(future) ,
342 Spawner::Handle ( handle ) => handle.spawn(future) ,
343 };
344
345 Ok( JoinHandle::tokio(handle) )
346 }
347}
348
349
350
351impl crate::YieldNow for TokioTp {}
352
353
354
355impl<R: Send + 'static> crate::SpawnBlocking<R> for TokioTp
356{
357 fn spawn_blocking<F>( &self, f: F ) -> BlockingHandle<R>
358
359 where F: FnOnce() -> R + Send + 'static ,
360 {
361 let handle = match &self.spawner
362 {
363 Spawner::Runtime( rt ) => rt .spawn_blocking( f ) ,
364 Spawner::Handle ( handle ) => handle.spawn_blocking( f ) ,
365 };
366
367 BlockingHandle::tokio( handle )
368 }
369
370
371 fn spawn_blocking_dyn( &self, f: Box< dyn FnOnce()->R + Send > ) -> BlockingHandle<R>
372 {
373 self.spawn_blocking( f )
374 }
375}
376
377
378
379
380#[ cfg(all( feature = "timer", not(feature="tokio_timer" )) ) ]
381//
382#[ cfg_attr( nightly, doc(cfg(all( feature = "timer", feature = "tokio_tp" ))) ) ]
383//
384impl crate::Timer for TokioTp
385{
386 fn sleep( &self, dur: std::time::Duration ) -> futures_core::future::BoxFuture<'static, ()>
387 {
388 Box::pin( futures_timer::Delay::new(dur) )
389 }
390}
391
392
393
394#[ cfg( feature = "tokio_timer" ) ]
395//
396#[ cfg_attr( nightly, doc(cfg(all( feature = "tokio_timer", feature = "tokio_tp" ))) ) ]
397//
398impl crate::Timer for TokioTp
399{
400 fn sleep( &self, dur: std::time::Duration ) -> futures_core::future::BoxFuture<'static, ()>
401 {
402 Box::pin( tokio::time::sleep(dur) )
403 }
404}
405
406
407#[cfg( feature = "tokio_tp" )]
408/// A few errors that can happen while using _tokio_ executors.
409#[derive(Debug, Clone)]
410pub enum TokioTpErr
411{
412 /// The [`tokio::runtime::Builder`] returned an error when construting the [`Runtime`].
413 Builder( std::io::ErrorKind ),
414
415 /// There are other clones of the [`Runtime`], so we cannot shut it down.
416 Cloned ( TokioTp ),
417
418 /// This executor was constructed from the a [`Handle`], so cannot be shut down.
419 Handle ( TokioTp ),
420
421 /// Can't create from current runtime because no runtime currently entered.
422 NoRuntime,
423
424 /// Can't construct from a current thread runtime.
425 WrongFlavour,
426}
427
428
429impl fmt::Display for TokioTpErr
430{
431 fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result
432 {
433 use TokioTpErr::*;
434
435 match self
436 {
437 Builder(source) =>
438 write!( f, "tokio::runtime::Builder returned an error: {source}" ),
439 Cloned(_) => write!( f, "The TokioTp executor was cloned. Only the last copy can shut it down." ),
440 Handle(_) => write!( f, "The TokioTp was created from tokio::runtime::Handle. Only an owned executor (created from `Runtime`) can be shut down." ),
441 NoRuntime => write!( f, "Call to tokio::Handle::try_current failed, generally because no entered runtime is active." ),
442 WrongFlavour => write!( f, "Can't create TokioTp from a current thread `Runtime`." ),
443 }
444 }
445}
446
447
448impl std::error::Error for TokioTpErr {}