tokio/runtime/
runtime.rs

1use super::BOX_FUTURE_THRESHOLD;
2use crate::runtime::blocking::BlockingPool;
3use crate::runtime::scheduler::CurrentThread;
4use crate::runtime::{context, EnterGuard, Handle};
5use crate::task::JoinHandle;
6use crate::util::trace::SpawnMeta;
7
8use std::future::Future;
9use std::mem;
10use std::time::Duration;
11
12cfg_rt_multi_thread! {
13    use crate::runtime::Builder;
14    use crate::runtime::scheduler::MultiThread;
15
16    cfg_unstable! {
17        use crate::runtime::scheduler::MultiThreadAlt;
18    }
19}
20
21/// The Tokio runtime.
22///
23/// The runtime provides an I/O driver, task scheduler, [timer], and
24/// blocking pool, necessary for running asynchronous tasks.
25///
26/// Instances of `Runtime` can be created using [`new`], or [`Builder`].
27/// However, most users will use the [`#[tokio::main]`][main] annotation on
28/// their entry point instead.
29///
30/// See [module level][mod] documentation for more details.
31///
32/// # Shutdown
33///
34/// Shutting down the runtime is done by dropping the value, or calling
35/// [`shutdown_background`] or [`shutdown_timeout`].
36///
37/// Tasks spawned through [`Runtime::spawn`] keep running until they yield.
38/// Then they are dropped. They are not *guaranteed* to run to completion, but
39/// *might* do so if they do not yield until completion.
40///
41/// Blocking functions spawned through [`Runtime::spawn_blocking`] keep running
42/// until they return.
43///
44/// The thread initiating the shutdown blocks until all spawned work has been
45/// stopped. This can take an indefinite amount of time. The `Drop`
46/// implementation waits forever for this.
47///
48/// The [`shutdown_background`] and [`shutdown_timeout`] methods can be used if
49/// waiting forever is undesired. When the timeout is reached, spawned work that
50/// did not stop in time and threads running it are leaked. The work continues
51/// to run until one of the stopping conditions is fulfilled, but the thread
52/// initiating the shutdown is unblocked.
53///
54/// Once the runtime has been dropped, any outstanding I/O resources bound to
55/// it will no longer function. Calling any method on them will result in an
56/// error.
57///
58/// # Sharing
59///
60/// There are several ways to establish shared access to a Tokio runtime:
61///
62///  * Using an <code>[Arc]\<Runtime></code>.
63///  * Using a [`Handle`].
64///  * Entering the runtime context.
65///
66/// Using an <code>[Arc]\<Runtime></code> or [`Handle`] allows you to do various
67/// things with the runtime such as spawning new tasks or entering the runtime
68/// context. Both types can be cloned to create a new handle that allows access
69/// to the same runtime. By passing clones into different tasks or threads, you
70/// will be able to access the runtime from those tasks or threads.
71///
72/// The difference between <code>[Arc]\<Runtime></code> and [`Handle`] is that
73/// an <code>[Arc]\<Runtime></code> will prevent the runtime from shutting down,
74/// whereas a [`Handle`] does not prevent that. This is because shutdown of the
75/// runtime happens when the destructor of the `Runtime` object runs.
76///
77/// Calls to [`shutdown_background`] and [`shutdown_timeout`] require exclusive
78/// ownership of the `Runtime` type. When using an <code>[Arc]\<Runtime></code>,
79/// this can be achieved via [`Arc::try_unwrap`] when only one strong count
80/// reference is left over.
81///
82/// The runtime context is entered using the [`Runtime::enter`] or
83/// [`Handle::enter`] methods, which use a thread-local variable to store the
84/// current runtime. Whenever you are inside the runtime context, methods such
85/// as [`tokio::spawn`] will use the runtime whose context you are inside.
86///
87/// [timer]: crate::time
88/// [mod]: index.html
89/// [`new`]: method@Self::new
90/// [`Builder`]: struct@Builder
91/// [`Handle`]: struct@Handle
92/// [main]: macro@crate::main
93/// [`tokio::spawn`]: crate::spawn
94/// [`Arc::try_unwrap`]: std::sync::Arc::try_unwrap
95/// [Arc]: std::sync::Arc
96/// [`shutdown_background`]: method@Runtime::shutdown_background
97/// [`shutdown_timeout`]: method@Runtime::shutdown_timeout
98#[derive(Debug)]
99pub struct Runtime {
100    /// Task scheduler
101    scheduler: Scheduler,
102
103    /// Handle to runtime, also contains driver handles
104    handle: Handle,
105
106    /// Blocking pool handle, used to signal shutdown
107    blocking_pool: BlockingPool,
108}
109
110/// The flavor of a `Runtime`.
111///
112/// This is the return type for [`Handle::runtime_flavor`](crate::runtime::Handle::runtime_flavor()).
113#[derive(Debug, PartialEq, Eq)]
114#[non_exhaustive]
115pub enum RuntimeFlavor {
116    /// The flavor that executes all tasks on the current thread.
117    CurrentThread,
118    /// The flavor that executes tasks across multiple threads.
119    MultiThread,
120    /// The flavor that executes tasks across multiple threads.
121    #[cfg(tokio_unstable)]
122    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
123    MultiThreadAlt,
124}
125
126/// The runtime scheduler is either a multi-thread or a current-thread executor.
127#[derive(Debug)]
128pub(super) enum Scheduler {
129    /// Execute all tasks on the current-thread.
130    CurrentThread(CurrentThread),
131
132    /// Execute tasks across multiple threads.
133    #[cfg(feature = "rt-multi-thread")]
134    MultiThread(MultiThread),
135
136    /// Execute tasks across multiple threads.
137    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
138    MultiThreadAlt(MultiThreadAlt),
139}
140
141impl Runtime {
142    pub(super) fn from_parts(
143        scheduler: Scheduler,
144        handle: Handle,
145        blocking_pool: BlockingPool,
146    ) -> Runtime {
147        Runtime {
148            scheduler,
149            handle,
150            blocking_pool,
151        }
152    }
153
154    /// Creates a new runtime instance with default configuration values.
155    ///
156    /// This results in the multi threaded scheduler, I/O driver, and time driver being
157    /// initialized.
158    ///
159    /// Most applications will not need to call this function directly. Instead,
160    /// they will use the  [`#[tokio::main]` attribute][main]. When a more complex
161    /// configuration is necessary, the [runtime builder] may be used.
162    ///
163    /// See [module level][mod] documentation for more details.
164    ///
165    /// # Examples
166    ///
167    /// Creating a new `Runtime` with default configuration values.
168    ///
169    /// ```
170    /// use tokio::runtime::Runtime;
171    ///
172    /// let rt = Runtime::new()
173    ///     .unwrap();
174    ///
175    /// // Use the runtime...
176    /// ```
177    ///
178    /// [mod]: index.html
179    /// [main]: ../attr.main.html
180    /// [threaded scheduler]: index.html#threaded-scheduler
181    /// [runtime builder]: crate::runtime::Builder
182    #[cfg(feature = "rt-multi-thread")]
183    #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
184    pub fn new() -> std::io::Result<Runtime> {
185        Builder::new_multi_thread().enable_all().build()
186    }
187
188    /// Returns a handle to the runtime's spawner.
189    ///
190    /// The returned handle can be used to spawn tasks that run on this runtime, and can
191    /// be cloned to allow moving the `Handle` to other threads.
192    ///
193    /// Calling [`Handle::block_on`] on a handle to a `current_thread` runtime is error-prone.
194    /// Refer to the documentation of [`Handle::block_on`] for more.
195    ///
196    /// # Examples
197    ///
198    /// ```
199    /// use tokio::runtime::Runtime;
200    ///
201    /// let rt = Runtime::new()
202    ///     .unwrap();
203    ///
204    /// let handle = rt.handle();
205    ///
206    /// // Use the handle...
207    /// ```
208    pub fn handle(&self) -> &Handle {
209        &self.handle
210    }
211
212    /// Spawns a future onto the Tokio runtime.
213    ///
214    /// This spawns the given future onto the runtime's executor, usually a
215    /// thread pool. The thread pool is then responsible for polling the future
216    /// until it completes.
217    ///
218    /// The provided future will start running in the background immediately
219    /// when `spawn` is called, even if you don't await the returned
220    /// `JoinHandle`.
221    ///
222    /// See [module level][mod] documentation for more details.
223    ///
224    /// [mod]: index.html
225    ///
226    /// # Examples
227    ///
228    /// ```
229    /// use tokio::runtime::Runtime;
230    ///
231    /// # fn dox() {
232    /// // Create the runtime
233    /// let rt = Runtime::new().unwrap();
234    ///
235    /// // Spawn a future onto the runtime
236    /// rt.spawn(async {
237    ///     println!("now running on a worker thread");
238    /// });
239    /// # }
240    /// ```
241    #[track_caller]
242    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
243    where
244        F: Future + Send + 'static,
245        F::Output: Send + 'static,
246    {
247        let fut_size = mem::size_of::<F>();
248        if fut_size > BOX_FUTURE_THRESHOLD {
249            self.handle
250                .spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
251        } else {
252            self.handle
253                .spawn_named(future, SpawnMeta::new_unnamed(fut_size))
254        }
255    }
256
257    /// Runs the provided function on an executor dedicated to blocking operations.
258    ///
259    /// # Examples
260    ///
261    /// ```
262    /// use tokio::runtime::Runtime;
263    ///
264    /// # fn dox() {
265    /// // Create the runtime
266    /// let rt = Runtime::new().unwrap();
267    ///
268    /// // Spawn a blocking function onto the runtime
269    /// rt.spawn_blocking(|| {
270    ///     println!("now running on a worker thread");
271    /// });
272    /// # }
273    /// ```
274    #[track_caller]
275    pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
276    where
277        F: FnOnce() -> R + Send + 'static,
278        R: Send + 'static,
279    {
280        self.handle.spawn_blocking(func)
281    }
282
283    /// Runs a future to completion on the Tokio runtime. This is the
284    /// runtime's entry point.
285    ///
286    /// This runs the given future on the current thread, blocking until it is
287    /// complete, and yielding its resolved result. Any tasks or timers
288    /// which the future spawns internally will be executed on the runtime.
289    ///
290    /// # Non-worker future
291    ///
292    /// Note that the future required by this function does not run as a
293    /// worker. The expectation is that other tasks are spawned by the future here.
294    /// Awaiting on other futures from the future provided here will not
295    /// perform as fast as those spawned as workers.
296    ///
297    /// # Multi thread scheduler
298    ///
299    /// When the multi thread scheduler is used this will allow futures
300    /// to run within the io driver and timer context of the overall runtime.
301    ///
302    /// Any spawned tasks will continue running after `block_on` returns.
303    ///
304    /// # Current thread scheduler
305    ///
306    /// When the current thread scheduler is enabled `block_on`
307    /// can be called concurrently from multiple threads. The first call
308    /// will take ownership of the io and timer drivers. This means
309    /// other threads which do not own the drivers will hook into that one.
310    /// When the first `block_on` completes, other threads will be able to
311    /// "steal" the driver to allow continued execution of their futures.
312    ///
313    /// Any spawned tasks will be suspended after `block_on` returns. Calling
314    /// `block_on` again will resume previously spawned tasks.
315    ///
316    /// # Panics
317    ///
318    /// This function panics if the provided future panics, or if called within an
319    /// asynchronous execution context.
320    ///
321    /// # Examples
322    ///
323    /// ```no_run
324    /// use tokio::runtime::Runtime;
325    ///
326    /// // Create the runtime
327    /// let rt  = Runtime::new().unwrap();
328    ///
329    /// // Execute the future, blocking the current thread until completion
330    /// rt.block_on(async {
331    ///     println!("hello");
332    /// });
333    /// ```
334    ///
335    /// [handle]: fn@Handle::block_on
336    #[track_caller]
337    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
338        let fut_size = mem::size_of::<F>();
339        if fut_size > BOX_FUTURE_THRESHOLD {
340            self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
341        } else {
342            self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size))
343        }
344    }
345
346    #[track_caller]
347    fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output {
348        #[cfg(all(
349            tokio_unstable,
350            tokio_taskdump,
351            feature = "rt",
352            target_os = "linux",
353            any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
354        ))]
355        let future = super::task::trace::Trace::root(future);
356
357        #[cfg(all(tokio_unstable, feature = "tracing"))]
358        let future = crate::util::trace::task(
359            future,
360            "block_on",
361            _meta,
362            crate::runtime::task::Id::next().as_u64(),
363        );
364
365        let _enter = self.enter();
366
367        match &self.scheduler {
368            Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future),
369            #[cfg(feature = "rt-multi-thread")]
370            Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future),
371            #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
372            Scheduler::MultiThreadAlt(exec) => exec.block_on(&self.handle.inner, future),
373        }
374    }
375
376    /// Enters the runtime context.
377    ///
378    /// This allows you to construct types that must have an executor
379    /// available on creation such as [`Sleep`] or [`TcpStream`]. It will
380    /// also allow you to call methods such as [`tokio::spawn`].
381    ///
382    /// [`Sleep`]: struct@crate::time::Sleep
383    /// [`TcpStream`]: struct@crate::net::TcpStream
384    /// [`tokio::spawn`]: fn@crate::spawn
385    ///
386    /// # Example
387    ///
388    /// ```
389    /// use tokio::runtime::Runtime;
390    /// use tokio::task::JoinHandle;
391    ///
392    /// fn function_that_spawns(msg: String) -> JoinHandle<()> {
393    ///     // Had we not used `rt.enter` below, this would panic.
394    ///     tokio::spawn(async move {
395    ///         println!("{}", msg);
396    ///     })
397    /// }
398    ///
399    /// fn main() {
400    ///     let rt = Runtime::new().unwrap();
401    ///
402    ///     let s = "Hello World!".to_string();
403    ///
404    ///     // By entering the context, we tie `tokio::spawn` to this executor.
405    ///     let _guard = rt.enter();
406    ///     let handle = function_that_spawns(s);
407    ///
408    ///     // Wait for the task before we end the test.
409    ///     rt.block_on(handle).unwrap();
410    /// }
411    /// ```
412    pub fn enter(&self) -> EnterGuard<'_> {
413        self.handle.enter()
414    }
415
416    /// Shuts down the runtime, waiting for at most `duration` for all spawned
417    /// work to stop.
418    ///
419    /// See the [struct level documentation](Runtime#shutdown) for more details.
420    ///
421    /// # Examples
422    ///
423    /// ```
424    /// use tokio::runtime::Runtime;
425    /// use tokio::task;
426    ///
427    /// use std::thread;
428    /// use std::time::Duration;
429    ///
430    /// fn main() {
431    ///    let runtime = Runtime::new().unwrap();
432    ///
433    ///    runtime.block_on(async move {
434    ///        task::spawn_blocking(move || {
435    ///            thread::sleep(Duration::from_secs(10_000));
436    ///        });
437    ///    });
438    ///
439    ///    runtime.shutdown_timeout(Duration::from_millis(100));
440    /// }
441    /// ```
442    pub fn shutdown_timeout(mut self, duration: Duration) {
443        // Wakeup and shutdown all the worker threads
444        self.handle.inner.shutdown();
445        self.blocking_pool.shutdown(Some(duration));
446    }
447
448    /// Shuts down the runtime, without waiting for any spawned work to stop.
449    ///
450    /// This can be useful if you want to drop a runtime from within another runtime.
451    /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks
452    /// to complete, which would normally not be permitted within an asynchronous context.
453    /// By calling `shutdown_background()`, you can drop the runtime from such a context.
454    ///
455    /// Note however, that because we do not wait for any blocking tasks to complete, this
456    /// may result in a resource leak (in that any blocking tasks are still running until they
457    /// return.
458    ///
459    /// See the [struct level documentation](Runtime#shutdown) for more details.
460    ///
461    /// This function is equivalent to calling `shutdown_timeout(Duration::from_nanos(0))`.
462    ///
463    /// ```
464    /// use tokio::runtime::Runtime;
465    ///
466    /// fn main() {
467    ///    let runtime = Runtime::new().unwrap();
468    ///
469    ///    runtime.block_on(async move {
470    ///        let inner_runtime = Runtime::new().unwrap();
471    ///        // ...
472    ///        inner_runtime.shutdown_background();
473    ///    });
474    /// }
475    /// ```
476    pub fn shutdown_background(self) {
477        self.shutdown_timeout(Duration::from_nanos(0));
478    }
479
480    /// Returns a view that lets you get information about how the runtime
481    /// is performing.
482    pub fn metrics(&self) -> crate::runtime::RuntimeMetrics {
483        self.handle.metrics()
484    }
485}
486
487#[allow(clippy::single_match)] // there are comments in the error branch, so we don't want if-let
488impl Drop for Runtime {
489    fn drop(&mut self) {
490        match &mut self.scheduler {
491            Scheduler::CurrentThread(current_thread) => {
492                // This ensures that tasks spawned on the current-thread
493                // runtime are dropped inside the runtime's context.
494                let _guard = context::try_set_current(&self.handle.inner);
495                current_thread.shutdown(&self.handle.inner);
496            }
497            #[cfg(feature = "rt-multi-thread")]
498            Scheduler::MultiThread(multi_thread) => {
499                // The threaded scheduler drops its tasks on its worker threads, which is
500                // already in the runtime's context.
501                multi_thread.shutdown(&self.handle.inner);
502            }
503            #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
504            Scheduler::MultiThreadAlt(multi_thread) => {
505                // The threaded scheduler drops its tasks on its worker threads, which is
506                // already in the runtime's context.
507                multi_thread.shutdown(&self.handle.inner);
508            }
509        }
510    }
511}
512
513impl std::panic::UnwindSafe for Runtime {}
514
515impl std::panic::RefUnwindSafe for Runtime {}