tokio/runtime/builder.rs

1#![cfg_attr(loom, allow(unused_imports))]
2
3use crate::runtime::handle::Handle;
4use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback};
5#[cfg(tokio_unstable)]
6use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta};
7use crate::util::rand::{RngSeed, RngSeedGenerator};
8
9use crate::runtime::blocking::BlockingPool;
10use crate::runtime::scheduler::CurrentThread;
11use std::fmt;
12use std::io;
13use std::thread::ThreadId;
14use std::time::Duration;
15
16/// Builds Tokio Runtime with custom configuration values.
17///
18/// Methods can be chained in order to set the configuration values. The
19/// Runtime is constructed by calling [`build`].
20///
21/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
22/// or [`Builder::new_current_thread`].
23///
24/// See function level documentation for details on the various configuration
25/// settings.
26///
27/// [`build`]: method@Self::build
28/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
29/// [`Builder::new_current_thread`]: method@Self::new_current_thread
30///
31/// # Examples
32///
33/// ```
34/// use tokio::runtime::Builder;
35///
36/// fn main() {
37///     // build runtime
38///     let runtime = Builder::new_multi_thread()
39///         .worker_threads(4)
40///         .thread_name("my-custom-name")
41///         .thread_stack_size(3 * 1024 * 1024)
42///         .build()
43///         .unwrap();
44///
45///     // use runtime ...
46/// }
47/// ```
48pub struct Builder {
49    /// Runtime type
50    kind: Kind,
51
52    /// Whether or not to enable the I/O driver
53    enable_io: bool,
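
    /// Maximum number of I/O events to process per tick. Set by
    /// `max_io_events_per_tick`.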
54    nevents: usize,
55
56    /// Whether or not to enable the time driver
57    enable_time: bool,
58
59    /// Whether or not the clock should start paused.
60    start_paused: bool,
61
62    /// The number of worker threads, used by Runtime.
63    ///
64    /// Only used when not using the current-thread executor.
65    worker_threads: Option<usize>,
66
67    /// Cap on the number of additional blocking threads.
68    max_blocking_threads: usize,
69
70    /// Name fn used for threads spawned by the runtime.
71    pub(super) thread_name: ThreadNameFn,
72
73    /// Stack size used for threads spawned by the runtime.
74    pub(super) thread_stack_size: Option<usize>,
75
76    /// Callback to run after each thread starts.
77    pub(super) after_start: Option<Callback>,
78
79    /// To run before each worker thread stops
80    pub(super) before_stop: Option<Callback>,
81
82    /// To run before each worker thread is parked.
83    pub(super) before_park: Option<Callback>,
84
85    /// To run after each thread is unparked.
86    pub(super) after_unpark: Option<Callback>,
87
88    /// To run before each task is spawned.
89    pub(super) before_spawn: Option<TaskCallback>,
90
91    /// To run after each task is terminated.
92    pub(super) after_termination: Option<TaskCallback>,
93
94    /// Customizable keep alive timeout for `BlockingPool`
95    pub(super) keep_alive: Option<Duration>,
96
97    /// How many ticks before pulling a task from the global/remote queue?
98    ///
99    /// When `None`, the value is unspecified and behavior details are left to
100    /// the scheduler. Each scheduler flavor could choose to either pick its own
101    /// default value or use some other strategy to decide when to poll from the
102    /// global queue. For example, the multi-threaded scheduler uses a
103    /// self-tuning strategy based on mean task poll times.
104    pub(super) global_queue_interval: Option<u32>,
105
106    /// How many ticks before yielding to the driver for timer and I/O events?
107    pub(super) event_interval: u32,
108
109    pub(super) local_queue_capacity: usize,
110
111    /// When true, the multi-threaded scheduler LIFO slot should not be used.
112    ///
113    /// This option should only be exposed as unstable.
114    pub(super) disable_lifo_slot: bool,
115
116    /// Specify a random number generator seed to provide deterministic results
117    pub(super) seed_generator: RngSeedGenerator,
118
119    /// When true, enables task poll count histogram instrumentation.
120    pub(super) metrics_poll_count_histogram_enable: bool,
121
122    /// Configures the task poll count histogram
123    pub(super) metrics_poll_count_histogram: HistogramBuilder,
124
125    #[cfg(tokio_unstable)]
126    pub(super) unhandled_panic: UnhandledPanic,
127}
128
129cfg_unstable! {
130    /// How the runtime should respond to unhandled panics.
131    ///
132    /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
133    /// to configure the runtime behavior when a spawned task panics.
134    ///
135    /// See [`Builder::unhandled_panic`] for more details.
136    #[derive(Debug, Clone)]
137    #[non_exhaustive]
138    pub enum UnhandledPanic {
139        /// The runtime should ignore panics on spawned tasks.
140        ///
141        /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
142        /// tasks continue running normally.
143        ///
144        /// This is the default behavior.
145        ///
146        /// # Examples
147        ///
148        /// ```
149        /// use tokio::runtime::{self, UnhandledPanic};
150        ///
151        /// # pub fn main() {
152        /// let rt = runtime::Builder::new_current_thread()
153        ///     .unhandled_panic(UnhandledPanic::Ignore)
154        ///     .build()
155        ///     .unwrap();
156        ///
157        /// let task1 = rt.spawn(async { panic!("boom"); });
158        /// let task2 = rt.spawn(async {
159        ///     // This task completes normally
160        ///     "done"
161        /// });
162        ///
163        /// rt.block_on(async {
164        ///     // The panic on the first task is forwarded to the `JoinHandle`
165        ///     assert!(task1.await.is_err());
166        ///
167        ///     // The second task completes normally
168        ///     assert!(task2.await.is_ok());
169        /// })
170        /// # }
171        /// ```
172        ///
173        /// [`JoinHandle`]: struct@crate::task::JoinHandle
174        Ignore,
175
176        /// The runtime should immediately shut down if a spawned task panics.
177        ///
178        /// The runtime will immediately shut down even if the panicked task's
179        /// [`JoinHandle`] is still available. All further spawned tasks will be
180        /// immediately dropped and calls to [`Runtime::block_on`] will panic.
181        ///
182        /// # Examples
183        ///
184        /// ```should_panic
185        /// use tokio::runtime::{self, UnhandledPanic};
186        ///
187        /// # pub fn main() {
188        /// let rt = runtime::Builder::new_current_thread()
189        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
190        ///     .build()
191        ///     .unwrap();
192        ///
193        /// rt.spawn(async { panic!("boom"); });
194        /// rt.spawn(async {
195        ///     // This task never completes.
196        /// });
197        ///
198        /// rt.block_on(async {
199        ///     // Do some work
200        /// # loop { tokio::task::yield_now().await; }
201        /// })
202        /// # }
203        /// ```
204        ///
205        /// [`JoinHandle`]: struct@crate::task::JoinHandle
206        ShutdownRuntime,
207    }
208}
209
210pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
211
212#[derive(Clone, Copy)]
213pub(crate) enum Kind {
214    CurrentThread,
215    #[cfg(feature = "rt-multi-thread")]
216    MultiThread,
217    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
218    MultiThreadAlt,
219}
220
221impl Builder {
222    /// Returns a new builder with the current thread scheduler selected.
223    ///
224    /// Configuration methods can be chained on the return value.
225    ///
226    /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
227    /// [`LocalSet`].
228    ///
229    /// [`LocalSet`]: crate::task::LocalSet
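    ///
    /// # Examples
    ///
    /// A minimal sketch that pairs the current-thread runtime with a
    /// [`LocalSet`] so that `!Send` tasks can be spawned:
    ///
    /// ```
    /// use tokio::runtime::Builder;
    /// use tokio::task::LocalSet;
    ///
    /// let rt = Builder::new_current_thread()
    ///     .build()
    ///     .unwrap();
    ///
    /// // `LocalSet::block_on` drives the local set on this runtime, so tasks
    /// // spawned with `spawn_local` do not need to be `Send`.
    /// let local = LocalSet::new();
    /// local.block_on(&rt, async {
    ///     tokio::task::spawn_local(async {
    ///         println!("running on the current thread");
    ///     })
    ///     .await
    ///     .unwrap();
    /// });
    /// ```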
230    pub fn new_current_thread() -> Builder {
231        #[cfg(loom)]
232        const EVENT_INTERVAL: u32 = 4;
233        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
234        #[cfg(not(loom))]
235        const EVENT_INTERVAL: u32 = 61;
236
237        Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
238    }
239
240    /// Returns a new builder with the multi thread scheduler selected.
241    ///
242    /// Configuration methods can be chained on the return value.
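    ///
    /// # Examples
    ///
    /// A minimal sketch of constructing the work-stealing runtime with all
    /// drivers enabled:
    ///
    /// ```
    /// use tokio::runtime::Builder;
    ///
    /// let rt = Builder::new_multi_thread()
    ///     .enable_all()
    ///     .build()
    ///     .unwrap();
    ///
    /// rt.block_on(async {
    ///     println!("running on the multi thread runtime");
    /// });
    /// ```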
243    #[cfg(feature = "rt-multi-thread")]
244    #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
245    pub fn new_multi_thread() -> Builder {
246        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
247        Builder::new(Kind::MultiThread, 61)
248    }
249
250    cfg_unstable! {
251        /// Returns a new builder with the alternate multi thread scheduler
252        /// selected.
253        ///
254        /// The alternate multi threaded scheduler is an in-progress
255        /// candidate to replace the existing multi threaded scheduler. It
256        /// currently does not scale as well to 16+ processors.
257        ///
258        /// This runtime flavor is currently **not considered production
259        /// ready**.
260        ///
261        /// Configuration methods can be chained on the return value.
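        ///
        /// # Examples
        ///
        /// A minimal sketch; the builder is configured the same way as the
        /// standard multi thread scheduler:
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_multi_thread_alt()
        ///     .worker_threads(4)
        ///     .build()
        ///     .unwrap();
        ///
        /// rt.block_on(async {
        ///     println!("running on the alternate scheduler");
        /// });
        /// ```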
262        #[cfg(feature = "rt-multi-thread")]
263        #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
264        pub fn new_multi_thread_alt() -> Builder {
265            // The number `61` is fairly arbitrary. I believe this value was copied from golang.
266            Builder::new(Kind::MultiThreadAlt, 61)
267        }
268    }
269
270    /// Returns a new runtime builder initialized with default configuration
271    /// values.
272    ///
273    /// Configuration methods can be chained on the return value.
274    pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
275        Builder {
276            kind,
277
278            // I/O defaults to "off"
279            enable_io: false,
280            nevents: 1024,
281
282            // Time defaults to "off"
283            enable_time: false,
284
285            // The clock starts not-paused
286            start_paused: false,
287
288            // Read from environment variable first in multi-threaded mode.
289            // Default to lazy auto-detection (one thread per CPU core)
290            worker_threads: None,
291
292            max_blocking_threads: 512,
293
294            // Default thread name
295            thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
296
297            // Do not set a stack size by default
298            thread_stack_size: None,
299
300            // No worker thread callbacks
301            after_start: None,
302            before_stop: None,
303            before_park: None,
304            after_unpark: None,
305
306            before_spawn: None,
307            after_termination: None,
308
309            keep_alive: None,
310
311            // Defaults for these values depend on the scheduler kind, so we get them
312            // as parameters.
313            global_queue_interval: None,
314            event_interval,
315
316            #[cfg(not(loom))]
317            local_queue_capacity: 256,
318
319            #[cfg(loom)]
320            local_queue_capacity: 4,
321
322            seed_generator: RngSeedGenerator::new(RngSeed::new()),
323
324            #[cfg(tokio_unstable)]
325            unhandled_panic: UnhandledPanic::Ignore,
326
327            metrics_poll_count_histogram_enable: false,
328
329            metrics_poll_count_histogram: HistogramBuilder::default(),
330
331            disable_lifo_slot: false,
332        }
333    }
334
335    /// Enables both I/O and time drivers.
336    ///
337    /// Doing this is a shorthand for calling `enable_io` and `enable_time`
338    /// individually. If additional components are added to Tokio in the future,
339    /// `enable_all` will include these future components.
340    ///
341    /// # Examples
342    ///
343    /// ```
344    /// use tokio::runtime;
345    ///
346    /// let rt = runtime::Builder::new_multi_thread()
347    ///     .enable_all()
348    ///     .build()
349    ///     .unwrap();
350    /// ```
351    pub fn enable_all(&mut self) -> &mut Self {
352        #[cfg(any(
353            feature = "net",
354            all(unix, feature = "process"),
355            all(unix, feature = "signal")
356        ))]
357        self.enable_io();
358        #[cfg(feature = "time")]
359        self.enable_time();
360
361        self
362    }
363
364    /// Sets the number of worker threads the `Runtime` will use.
365    ///
366    /// This can be any number above 0, though it is advised to keep this value
367    /// on the smaller side.
368    ///
369    /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
370    ///
371    /// # Default
372    ///
373    /// The default value is the number of cores available to the system.
374    ///
375    /// When using the `current_thread` runtime this method has no effect.
376    ///
377    /// # Examples
378    ///
379    /// ## Multi threaded runtime with 4 threads
380    ///
381    /// ```
382    /// use tokio::runtime;
383    ///
384    /// // This will spawn a work-stealing runtime with 4 worker threads.
385    /// let rt = runtime::Builder::new_multi_thread()
386    ///     .worker_threads(4)
387    ///     .build()
388    ///     .unwrap();
389    ///
390    /// rt.spawn(async move {});
391    /// ```
392    ///
393    /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
394    ///
395    /// ```
396    /// use tokio::runtime;
397    ///
398    /// // Create a runtime that _must_ be driven from a call
399    /// // to `Runtime::block_on`.
400    /// let rt = runtime::Builder::new_current_thread()
401    ///     .build()
402    ///     .unwrap();
403    ///
404    /// // This will run the runtime and future on the current thread
405    /// rt.block_on(async move {});
406    /// ```
407    ///
408    /// # Panics
409    ///
410    /// This will panic if `val` is not larger than `0`.
411    #[track_caller]
412    pub fn worker_threads(&mut self, val: usize) -> &mut Self {
413        assert!(val > 0, "Worker threads cannot be set to 0");
414        self.worker_threads = Some(val);
415        self
416    }
417
418    /// Specifies the limit for additional threads spawned by the Runtime.
419    ///
420    /// These threads are used for blocking operations like tasks spawned
421    /// through [`spawn_blocking`]. This includes, but is not limited to:
422    /// - [`fs`] operations
423    /// - dns resolution through [`ToSocketAddrs`]
424    /// - writing to [`Stdout`] or [`Stderr`]
425    /// - reading from [`Stdin`]
426    ///
427    /// Unlike [`worker_threads`], these threads are not always active and will exit
428    /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
429    ///
430    /// It's recommended to not set this limit too low in order to avoid hanging on operations
431    /// requiring [`spawn_blocking`].
432    ///
433    /// The default value is 512.
434    ///
435    /// # Panics
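    /// # Examples
    ///
    /// A sketch that caps the blocking pool and shortens its idle timeout; the
    /// values used here are purely illustrative:
    ///
    /// ```
    /// use std::time::Duration;
    /// use tokio::runtime;
    ///
    /// let rt = runtime::Builder::new_multi_thread()
    ///     .max_blocking_threads(32)
    ///     .thread_keep_alive(Duration::from_secs(5))
    ///     .build()
    ///     .unwrap();
    ///
    /// rt.block_on(async {
    ///     // Blocking work runs on the (capped) blocking pool.
    ///     let sum = tokio::task::spawn_blocking(|| (1..=10).sum::<u32>())
    ///         .await
    ///         .unwrap();
    ///     assert_eq!(sum, 55);
    /// });
    /// ```
    ///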
436    ///
437    /// This will panic if `val` is not larger than `0`.
438    ///
439    /// # Upgrading from 0.x
440    ///
441    /// In old versions `max_threads` limited both blocking and worker threads, but the
442    /// current `max_blocking_threads` does not include async worker threads in the count.
443    ///
444    /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
445    /// [`fs`]: mod@crate::fs
446    /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
447    /// [`Stdout`]: struct@crate::io::Stdout
448    /// [`Stdin`]: struct@crate::io::Stdin
449    /// [`Stderr`]: struct@crate::io::Stderr
450    /// [`worker_threads`]: Self::worker_threads
451    /// [`thread_keep_alive`]: Self::thread_keep_alive
452    #[track_caller]
453    #[cfg_attr(docsrs, doc(alias = "max_threads"))]
454    pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
455        assert!(val > 0, "Max blocking threads cannot be set to 0");
456        self.max_blocking_threads = val;
457        self
458    }
459
460    /// Sets name of threads spawned by the `Runtime`'s thread pool.
461    ///
462    /// The default name is "tokio-runtime-worker".
463    ///
464    /// # Examples
465    ///
466    /// ```
467    /// # use tokio::runtime;
468    ///
469    /// # pub fn main() {
470    /// let rt = runtime::Builder::new_multi_thread()
471    ///     .thread_name("my-pool")
472    ///     .build();
473    /// # }
474    /// ```
475    pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
476        let val = val.into();
477        self.thread_name = std::sync::Arc::new(move || val.clone());
478        self
479    }
480
481    /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
482    ///
483    /// The default name fn is `|| "tokio-runtime-worker".into()`.
484    ///
485    /// # Examples
486    ///
487    /// ```
488    /// # use tokio::runtime;
489    /// # use std::sync::atomic::{AtomicUsize, Ordering};
490    /// # pub fn main() {
491    /// let rt = runtime::Builder::new_multi_thread()
492    ///     .thread_name_fn(|| {
493    ///        static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
494    ///        let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
495    ///        format!("my-pool-{}", id)
496    ///     })
497    ///     .build();
498    /// # }
499    /// ```
500    pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
501    where
502        F: Fn() -> String + Send + Sync + 'static,
503    {
504        self.thread_name = std::sync::Arc::new(f);
505        self
506    }
507
508    /// Sets the stack size (in bytes) for worker threads.
509    ///
510    /// The actual stack size may be greater than this value if the platform
511    /// specifies a minimal stack size.
512    ///
513    /// The default stack size for spawned threads is 2 MiB, though this
514    /// particular stack size is subject to change in the future.
515    ///
516    /// # Examples
517    ///
518    /// ```
519    /// # use tokio::runtime;
520    ///
521    /// # pub fn main() {
522    /// let rt = runtime::Builder::new_multi_thread()
523    ///     .thread_stack_size(32 * 1024)
524    ///     .build();
525    /// # }
526    /// ```
527    pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
528        self.thread_stack_size = Some(val);
529        self
530    }
531
532    /// Executes function `f` after each thread is started but before it starts
533    /// doing work.
534    ///
535    /// This is intended for bookkeeping and monitoring use cases.
536    ///
537    /// # Examples
538    ///
539    /// ```
540    /// # use tokio::runtime;
541    /// # pub fn main() {
542    /// let runtime = runtime::Builder::new_multi_thread()
543    ///     .on_thread_start(|| {
544    ///         println!("thread started");
545    ///     })
546    ///     .build();
547    /// # }
548    /// ```
549    #[cfg(not(loom))]
550    pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
551    where
552        F: Fn() + Send + Sync + 'static,
553    {
554        self.after_start = Some(std::sync::Arc::new(f));
555        self
556    }
557
558    /// Executes function `f` before each thread stops.
559    ///
560    /// This is intended for bookkeeping and monitoring use cases.
561    ///
562    /// # Examples
563    ///
564    /// ```
565    /// # use tokio::runtime;
566    /// # pub fn main() {
567    /// let runtime = runtime::Builder::new_multi_thread()
568    ///     .on_thread_stop(|| {
569    ///         println!("thread stopping");
570    ///     })
571    ///     .build();
572    /// # }
573    /// ```
574    #[cfg(not(loom))]
575    pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
576    where
577        F: Fn() + Send + Sync + 'static,
578    {
579        self.before_stop = Some(std::sync::Arc::new(f));
580        self
581    }
582
583    /// Executes function `f` just before a thread is parked (goes idle).
584    /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
585    /// can be called, and may result in this thread being unparked immediately.
586    ///
587    /// This can be used to start work only when the executor is idle, or for bookkeeping
588    /// and monitoring purposes.
589    ///
590    /// Note: There can only be one park callback for a runtime; calling this function
591    /// more than once replaces the last callback defined, rather than adding to it.
592    ///
593    /// # Examples
594    ///
595    /// ## Multithreaded executor
596    /// ```
597    /// # use std::sync::Arc;
598    /// # use std::sync::atomic::{AtomicBool, Ordering};
599    /// # use tokio::runtime;
600    /// # use tokio::sync::Barrier;
601    /// # pub fn main() {
602    /// let once = AtomicBool::new(true);
603    /// let barrier = Arc::new(Barrier::new(2));
604    ///
605    /// let runtime = runtime::Builder::new_multi_thread()
606    ///     .worker_threads(1)
607    ///     .on_thread_park({
608    ///         let barrier = barrier.clone();
609    ///         move || {
610    ///             let barrier = barrier.clone();
611    ///             if once.swap(false, Ordering::Relaxed) {
612    ///                 tokio::spawn(async move { barrier.wait().await; });
613    ///            }
614    ///         }
615    ///     })
616    ///     .build()
617    ///     .unwrap();
618    ///
619    /// runtime.block_on(async {
620    ///    barrier.wait().await;
621    /// })
622    /// # }
623    /// ```
624    /// ## Current thread executor
625    /// ```
626    /// # use std::sync::Arc;
627    /// # use std::sync::atomic::{AtomicBool, Ordering};
628    /// # use tokio::runtime;
629    /// # use tokio::sync::Barrier;
630    /// # pub fn main() {
631    /// let once = AtomicBool::new(true);
632    /// let barrier = Arc::new(Barrier::new(2));
633    ///
634    /// let runtime = runtime::Builder::new_current_thread()
635    ///     .on_thread_park({
636    ///         let barrier = barrier.clone();
637    ///         move || {
638    ///             let barrier = barrier.clone();
639    ///             if once.swap(false, Ordering::Relaxed) {
640    ///                 tokio::spawn(async move { barrier.wait().await; });
641    ///            }
642    ///         }
643    ///     })
644    ///     .build()
645    ///     .unwrap();
646    ///
647    /// runtime.block_on(async {
648    ///    barrier.wait().await;
649    /// })
650    /// # }
651    /// ```
652    #[cfg(not(loom))]
653    pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
654    where
655        F: Fn() + Send + Sync + 'static,
656    {
657        self.before_park = Some(std::sync::Arc::new(f));
658        self
659    }
660
661    /// Executes function `f` just after a thread unparks (starts executing tasks).
662    ///
663    /// This is intended for bookkeeping and monitoring use cases; note that work
664    /// in this callback will increase latencies when the application has allowed one or
665    /// more runtime threads to go idle.
666    ///
667    /// Note: There can only be one unpark callback for a runtime; calling this function
668    /// more than once replaces the last callback defined, rather than adding to it.
669    ///
670    /// # Examples
671    ///
672    /// ```
673    /// # use tokio::runtime;
674    /// # pub fn main() {
675    /// let runtime = runtime::Builder::new_multi_thread()
676    ///     .on_thread_unpark(|| {
677    ///         println!("thread unparking");
678    ///     })
679    ///     .build();
680    ///
681    /// runtime.unwrap().block_on(async {
682    ///    tokio::task::yield_now().await;
683    ///    println!("Hello from Tokio!");
684    /// })
685    /// # }
686    /// ```
687    #[cfg(not(loom))]
688    pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
689    where
690        F: Fn() + Send + Sync + 'static,
691    {
692        self.after_unpark = Some(std::sync::Arc::new(f));
693        self
694    }
695
696    /// Executes function `f` just before a task is spawned.
697    ///
698    /// `f` is called within the Tokio context, so functions like
699    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
700    /// invoked immediately.
701    ///
702    /// This can be used for bookkeeping or monitoring purposes.
703    ///
704    /// Note: There can only be one spawn callback for a runtime; calling this function more
705    /// than once replaces the last callback defined, rather than adding to it.
706    ///
707    /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
708    ///
709    /// **Note**: This is an [unstable API][unstable]. The public API of this type
710    /// may break in 1.x releases. See [the documentation on unstable
711    /// features][unstable] for details.
712    ///
713    /// [unstable]: crate#unstable-features
714    ///
715    /// # Examples
716    ///
717    /// ```
718    /// # use tokio::runtime;
719    /// # pub fn main() {
720    /// let runtime = runtime::Builder::new_current_thread()
721    ///     .on_task_spawn(|_| {
722    ///         println!("spawning task");
723    ///     })
724    ///     .build()
725    ///     .unwrap();
726    ///
727    /// runtime.block_on(async {
728    ///     tokio::task::spawn(std::future::ready(()));
729    ///
730    ///     for _ in 0..64 {
731    ///         tokio::task::yield_now().await;
732    ///     }
733    /// })
734    /// # }
735    /// ```
736    #[cfg(all(not(loom), tokio_unstable))]
737    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
738    pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
739    where
740        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
741    {
742        self.before_spawn = Some(std::sync::Arc::new(f));
743        self
744    }
745
746    /// Executes function `f` just after a task is terminated.
747    ///
748    /// `f` is called within the Tokio context, so functions like
749    /// [`tokio::spawn`](crate::spawn) can be called.
750    ///
751    /// This can be used for bookkeeping or monitoring purposes.
752    ///
753    /// Note: There can only be one task termination callback for a runtime; calling this
754    /// function more than once replaces the last callback defined, rather than adding to it.
755    ///
756    /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
757    ///
758    /// **Note**: This is an [unstable API][unstable]. The public API of this type
759    /// may break in 1.x releases. See [the documentation on unstable
760    /// features][unstable] for details.
761    ///
762    /// [unstable]: crate#unstable-features
763    ///
764    /// # Examples
765    ///
766    /// ```
767    /// # use tokio::runtime;
768    /// # pub fn main() {
769    /// let runtime = runtime::Builder::new_current_thread()
770    ///     .on_task_terminate(|_| {
771    ///         println!("killing task");
772    ///     })
773    ///     .build()
774    ///     .unwrap();
775    ///
776    /// runtime.block_on(async {
777    ///     tokio::task::spawn(std::future::ready(()));
778    ///
779    ///     for _ in 0..64 {
780    ///         tokio::task::yield_now().await;
781    ///     }
782    /// })
783    /// # }
784    /// ```
785    #[cfg(all(not(loom), tokio_unstable))]
786    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
787    pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
788    where
789        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
790    {
791        self.after_termination = Some(std::sync::Arc::new(f));
792        self
793    }
794
795    /// Creates the configured `Runtime`.
796    ///
797    /// The returned `Runtime` instance is ready to spawn tasks.
798    ///
799    /// # Examples
800    ///
801    /// ```
802    /// use tokio::runtime::Builder;
803    ///
804    /// let rt = Builder::new_multi_thread().build().unwrap();
805    ///
806    /// rt.block_on(async {
807    ///     println!("Hello from the Tokio runtime");
808    /// });
809    /// ```
810    pub fn build(&mut self) -> io::Result<Runtime> {
811        match &self.kind {
812            Kind::CurrentThread => self.build_current_thread_runtime(),
813            #[cfg(feature = "rt-multi-thread")]
814            Kind::MultiThread => self.build_threaded_runtime(),
815            #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
816            Kind::MultiThreadAlt => self.build_alt_threaded_runtime(),
817        }
818    }
819
820    /// Creates the configured `LocalRuntime`.
821    ///
822    /// The returned `LocalRuntime` instance is ready to spawn tasks.
823    ///
824    /// # Panics
825    /// This will panic if `current_thread` is not the selected runtime flavor.
826    /// All other runtime flavors are unsupported by [`LocalRuntime`].
827    ///
828    /// [`LocalRuntime`]: crate::runtime::LocalRuntime
829    ///
830    /// # Examples
831    ///
832    /// ```
833    /// use tokio::runtime::Builder;
834    ///
835    /// let rt = Builder::new_current_thread().build_local(&mut Default::default()).unwrap();
836    ///
837    /// rt.block_on(async {
838    ///     println!("Hello from the Tokio runtime");
839    /// });
840    /// ```
841    #[allow(unused_variables, unreachable_patterns)]
842    #[cfg(tokio_unstable)]
843    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
844    pub fn build_local(&mut self, options: &LocalOptions) -> io::Result<LocalRuntime> {
845        match &self.kind {
846            Kind::CurrentThread => self.build_current_thread_local_runtime(),
847            _ => panic!("Only current_thread is supported when building a local runtime"),
848        }
849    }
850
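    /// Assembles the driver configuration used when constructing the runtime.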
851    fn get_cfg(&self, workers: usize) -> driver::Cfg {
852        driver::Cfg {
853            enable_pause_time: match self.kind {
854                Kind::CurrentThread => true,
855                #[cfg(feature = "rt-multi-thread")]
856                Kind::MultiThread => false,
857                #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
858                Kind::MultiThreadAlt => false,
859            },
860            enable_io: self.enable_io,
861            enable_time: self.enable_time,
862            start_paused: self.start_paused,
863            nevents: self.nevents,
864            workers,
865        }
866    }
867
868    /// Sets a custom timeout for a thread in the blocking pool.
869    ///
870    /// By default, the timeout for a thread is set to 10 seconds. This can
871    /// be overridden using `.thread_keep_alive()`.
872    ///
873    /// # Example
874    ///
875    /// ```
876    /// # use tokio::runtime;
877    /// # use std::time::Duration;
878    /// # pub fn main() {
879    /// let rt = runtime::Builder::new_multi_thread()
880    ///     .thread_keep_alive(Duration::from_millis(100))
881    ///     .build();
882    /// # }
883    /// ```
884    pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
885        self.keep_alive = Some(duration);
886        self
887    }
888
889    /// Sets the number of scheduler ticks after which the scheduler will poll the global
890    /// task queue.
891    ///
892    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
893    ///
894    /// By default the global queue interval is 31 for the current-thread scheduler. Please see
895    /// [the module documentation] for the default behavior of the multi-thread scheduler.
896    ///
897    /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
898    /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
899    /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
900    /// getting started on new work, especially if tasks frequently yield rather than complete
901    /// or await on further I/O. Conversely, a higher value prioritizes existing work, and
902    /// is a good choice when most tasks quickly complete polling.
903    ///
904    /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
905    ///
906    /// # Panics
907    ///
908    /// This function will panic if 0 is passed as an argument.
909    ///
910    /// # Examples
911    ///
912    /// ```
913    /// # use tokio::runtime;
914    /// # pub fn main() {
915    /// let rt = runtime::Builder::new_multi_thread()
916    ///     .global_queue_interval(31)
917    ///     .build();
918    /// # }
919    /// ```
920    #[track_caller]
921    pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
922        assert!(val > 0, "global_queue_interval must be greater than 0");
923        self.global_queue_interval = Some(val);
924        self
925    }
926
927    /// Sets the number of scheduler ticks after which the scheduler will poll for
928    /// external events (timers, I/O, and so on).
929    ///
930    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
931    ///
932    /// By default, the event interval is `61` for all scheduler types.
933    ///
934    /// Setting the event interval determines the effective "priority" of delivering
935    /// these external events (which may wake up additional tasks), compared to
936    /// executing tasks that are currently ready to run. A smaller value is useful
937    /// when tasks frequently spend a long time in polling, or frequently yield,
938    /// which can result in overly long delays picking up I/O events. Conversely,
939    /// picking up new events requires extra synchronization and syscall overhead,
940    /// so if tasks generally complete their polling quickly, a higher event interval
941    /// will minimize that overhead while still keeping the scheduler responsive to
942    /// events.
943    ///
944    /// # Examples
945    ///
946    /// ```
947    /// # use tokio::runtime;
948    /// # pub fn main() {
949    /// let rt = runtime::Builder::new_multi_thread()
950    ///     .event_interval(31)
951    ///     .build();
952    /// # }
953    /// ```
954    pub fn event_interval(&mut self, val: u32) -> &mut Self {
955        self.event_interval = val;
956        self
957    }
958
959    cfg_unstable! {
960        /// Configure how the runtime responds to an unhandled panic on a
961        /// spawned task.
962        ///
963        /// By default, an unhandled panic (i.e. a panic not caught by
964        /// [`std::panic::catch_unwind`]) has no impact on the runtime's
965        /// execution. The panic's error value is forwarded to the task's
966        /// [`JoinHandle`] and all other spawned tasks continue running.
967        ///
968        /// The `unhandled_panic` option enables configuring this behavior.
969        ///
970        /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
971        ///   spawned tasks have no impact on the runtime's execution.
972        /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
973        ///   shut down immediately when a spawned task panics even if that
974        ///   task's `JoinHandle` has not been dropped. All other spawned tasks
975        ///   will immediately terminate and further calls to
976        ///   [`Runtime::block_on`] will panic.
977        ///
978        /// # Panics
979        /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
980        /// on a runtime other than the current thread runtime.
981        ///
982        /// # Unstable
983        ///
984        /// This option is currently unstable and its implementation is
985        /// incomplete. The API may change or be removed in the future. See
986        /// issue [tokio-rs/tokio#4516] for more details.
987        ///
988        /// # Examples
989        ///
990        /// The following demonstrates a runtime configured to shut down on
991        /// panic. The first spawned task panics and results in the runtime
992        /// shutting down. The second spawned task never has a chance to
993        /// execute. The call to `block_on` will panic due to the runtime being
994        /// forcibly shut down.
995        ///
996        /// ```should_panic
997        /// use tokio::runtime::{self, UnhandledPanic};
998        ///
999        /// # pub fn main() {
1000        /// let rt = runtime::Builder::new_current_thread()
1001        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
1002        ///     .build()
1003        ///     .unwrap();
1004        ///
1005        /// rt.spawn(async { panic!("boom"); });
1006        /// rt.spawn(async {
1007        ///     // This task never completes.
1008        /// });
1009        ///
1010        /// rt.block_on(async {
1011        ///     // Do some work
1012        /// # loop { tokio::task::yield_now().await; }
1013        /// })
1014        /// # }
1015        /// ```
1016        ///
1017        /// [`JoinHandle`]: struct@crate::task::JoinHandle
1018        /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
1019        pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
1020            if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
1021                panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
1022            }
1023
1024            self.unhandled_panic = behavior;
1025            self
1026        }
1027
1028        /// Disables the LIFO task scheduler heuristic.
1029        ///
1030        /// The multi-threaded scheduler includes a heuristic for optimizing
1031        /// message-passing patterns. This heuristic results in the **last**
1032        /// scheduled task being polled first.
1033        ///
1034        /// To implement this heuristic, each worker thread has a slot which
1035        /// holds the task that should be polled next. However, this slot cannot
1036        /// be stolen by other worker threads, which can result in lower total
1037        /// throughput when tasks tend to have longer poll times.
1038        ///
1039        /// This configuration option will disable this heuristic resulting in
1040        /// all scheduled tasks being pushed into the worker-local queue, which
1041        /// is stealable.
1042        ///
1043        /// Consider trying this option when the task "scheduled" time is high
1044        /// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to
1045        /// collect this data.
1046        ///
1047        /// # Unstable
1048        ///
1049        /// This configuration option is considered a workaround for the LIFO
1050        /// slot not being stealable. When the slot becomes stealable, we will
1051        /// revisit whether or not this option is necessary. See
1052        /// issue [tokio-rs/tokio#4941].
1053        ///
1054        /// # Examples
1055        ///
1056        /// ```
1057        /// use tokio::runtime;
1058        ///
1059        /// let rt = runtime::Builder::new_multi_thread()
1060        ///     .disable_lifo_slot()
1061        ///     .build()
1062        ///     .unwrap();
1063        /// ```
1064        ///
1065        /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
1066        /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
1067        pub fn disable_lifo_slot(&mut self) -> &mut Self {
1068            self.disable_lifo_slot = true;
1069            self
1070        }
1071
1072        /// Specifies the random number generation seed to use within all
1073        /// threads associated with the runtime being built.
1074        ///
1075        /// This option is intended to make certain parts of the runtime
1076        /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
1077        /// [`tokio::select!`] it will ensure that the order that branches are
1078        /// polled is deterministic.
1079        ///
1080        /// In addition to the code specifying `rng_seed` and interacting with
1081        /// the runtime, the internals of Tokio and the Rust compiler may affect
1082        /// the sequences of random numbers. In order to ensure repeatable
1083        /// results, the version of Tokio, the versions of all other
1084        /// dependencies that interact with Tokio, and the Rust compiler version
1085        /// should also all remain constant.
1086        ///
1087        /// # Examples
1088        ///
1089        /// ```
1090        /// # use tokio::runtime::{self, RngSeed};
1091        /// # pub fn main() {
1092        /// let seed = RngSeed::from_bytes(b"place your seed here");
1093        /// let rt = runtime::Builder::new_current_thread()
1094        ///     .rng_seed(seed)
1095        ///     .build();
1096        /// # }
1097        /// ```
1098        ///
1099        /// [`tokio::select!`]: crate::select
1100        pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
1101            self.seed_generator = RngSeedGenerator::new(seed);
1102            self
1103        }
1104    }
1105
1106    cfg_unstable_metrics! {
1107        /// Enables tracking the distribution of task poll times.
1108        ///
1109        /// Task poll times are not instrumented by default as doing so requires
1110        /// calling [`Instant::now()`] twice per task poll, which could add
1111        /// measurable overhead. Use [`Handle::metrics()`] to access the
1112        /// metrics data.
1113        ///
1114        /// The histogram uses fixed bucket sizes. In other words, the histogram
1115        /// buckets are not dynamic based on input values. Use the
1116        /// `metrics_poll_time_histogram` builder methods to configure the
1117        /// histogram details.
1118        ///
1119        /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1120        /// This has an extremely low memory footprint, but may not provide enough granularity. For
1121        /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`]
1122        /// to select [`LogHistogram`] instead.
1123        ///
1124        /// # Examples
1125        ///
1126        /// ```
1127        /// use tokio::runtime;
1128        ///
1129        /// let rt = runtime::Builder::new_multi_thread()
1130        ///     .enable_metrics_poll_time_histogram()
1131        ///     .build()
1132        ///     .unwrap();
1133        /// # // Test default values here
1134        /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
1135        /// # let m = rt.handle().metrics();
1136        /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10);
1137        /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100));
1138        /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200));
1139        /// ```
1140        ///
1141        /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
1142        /// [`Instant::now()`]: std::time::Instant::now
1143        /// [`LogHistogram`]: crate::runtime::LogHistogram
1144        /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration
1145        pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self {
1146            self.metrics_poll_count_histogram_enable = true;
1147            self
1148        }
1149
1150        /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead.
1151        ///
1152        /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram
1153        #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")]
1154        #[doc(hidden)]
1155        pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
1156            self.enable_metrics_poll_time_histogram()
1157        }
1158
1159        /// Sets the histogram scale for tracking the distribution of task poll
1160        /// times.
1161        ///
1162        /// Tracking the distribution of task poll times can be done using a
1163        /// linear or log scale. When using linear scale, each histogram bucket
1164        /// will represent the same range of poll times. When using log scale,
1165        /// each histogram bucket will cover a range twice as big as the
1166        /// previous bucket.
1167        ///
1168        /// **Default:** linear scale.
1169        ///
1170        /// # Examples
1171        ///
1172        /// ```
1173        /// use tokio::runtime::{self, HistogramScale};
1174        ///
1175        /// # #[allow(deprecated)]
1176        /// let rt = runtime::Builder::new_multi_thread()
1177        ///     .enable_metrics_poll_time_histogram()
1178        ///     .metrics_poll_count_histogram_scale(HistogramScale::Log)
1179        ///     .build()
1180        ///     .unwrap();
1181        /// ```
1182        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1183        pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1184            self.metrics_poll_count_histogram.legacy_mut(|b| b.scale = histogram_scale);
1185            self
1186        }
1187
1188        /// Configure the histogram for tracking poll times
1189        ///
1190        /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1191        /// This has an extremely low memory footprint, but may not provide enough granularity. For
1192        /// better granularity with low memory usage, use [`LogHistogram`] instead.
1193        ///
1194        /// # Examples
1195        /// Configure a [`LogHistogram`] with [default configuration]:
1196        /// ```
1197        /// use tokio::runtime;
1198        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1199        ///
1200        /// let rt = runtime::Builder::new_multi_thread()
1201        ///     .enable_metrics_poll_time_histogram()
1202        ///     .metrics_poll_time_histogram_configuration(
1203        ///         HistogramConfiguration::log(LogHistogram::default())
1204        ///     )
1205        ///     .build()
1206        ///     .unwrap();
1207        /// ```
1208        ///
1209        /// Configure a linear histogram with 100 buckets, each 10μs wide:
1210        /// ```
1211        /// use tokio::runtime;
1212        /// use std::time::Duration;
1213        /// use tokio::runtime::HistogramConfiguration;
1214        ///
1215        /// let rt = runtime::Builder::new_multi_thread()
1216        ///     .enable_metrics_poll_time_histogram()
1217        ///     .metrics_poll_time_histogram_configuration(
1218        ///         HistogramConfiguration::linear(Duration::from_micros(10), 100)
1219        ///     )
1220        ///     .build()
1221        ///     .unwrap();
1222        /// ```
1223        ///
1224        /// Configure a [`LogHistogram`] with the following settings:
1225        /// - Measure times from 100ns to 120s
1226        /// - Max error of 0.1
1227        /// - No more than 1024 buckets
1228        /// ```
1229        /// use std::time::Duration;
1230        /// use tokio::runtime;
1231        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1232        ///
1233        /// let rt = runtime::Builder::new_multi_thread()
1234        ///     .enable_metrics_poll_time_histogram()
1235        ///     .metrics_poll_time_histogram_configuration(
1236        ///         HistogramConfiguration::log(LogHistogram::builder()
1237        ///             .max_value(Duration::from_secs(120))
1238        ///             .min_value(Duration::from_nanos(100))
1239        ///             .max_error(0.1)
1240        ///             .max_buckets(1024)
1241        ///             .expect("configuration uses 488 buckets")
1242        ///         )
1243        ///     )
1244        ///     .build()
1245        ///     .unwrap();
1246        /// ```
1247        ///
1248        /// [`LogHistogram`]: crate::runtime::LogHistogram
1249        /// [default configuration]: crate::runtime::LogHistogramBuilder
1250        pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self {
1251            self.metrics_poll_count_histogram.histogram_type = configuration.inner;
1252            self
1253        }
1254
1255        /// Sets the histogram resolution for tracking the distribution of task
1256        /// poll times.
1257        ///
1258        /// The resolution is the histogram's first bucket's range. When using a
1259        /// linear histogram scale, each bucket will cover the same range. When
1260        /// using a log scale, each bucket will cover a range twice as big as
1261        /// the previous bucket. In the log case, the resolution represents the
1262        /// smallest bucket range.
1263        ///
1264        /// Note that, when using log scale, the resolution is rounded up to the
1265        /// nearest power of 2 in nanoseconds.
1266        ///
1267        /// **Default:** 100 microseconds.
1268        ///
1269        /// # Examples
1270        ///
1271        /// ```
1272        /// use tokio::runtime;
1273        /// use std::time::Duration;
1274        ///
1275        /// # #[allow(deprecated)]
1276        /// let rt = runtime::Builder::new_multi_thread()
1277        ///     .enable_metrics_poll_time_histogram()
1278        ///     .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1279        ///     .build()
1280        ///     .unwrap();
1281        /// ```
1282        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1283        pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1284            assert!(resolution > Duration::from_secs(0));
1285            // Sanity check the argument and also make the cast below safe.
1286            assert!(resolution <= Duration::from_secs(1));
1287
1288            let resolution = resolution.as_nanos() as u64;
1289
1290            self.metrics_poll_count_histogram.legacy_mut(|b| b.resolution = resolution);
1291            self
1292        }
1293
1294        /// Sets the number of buckets for the histogram tracking the
1295        /// distribution of task poll times.
1296        ///
1297        /// The last bucket tracks all greater values that fall out of other
1298        /// ranges. So, configuring the histogram using a linear scale,
1299        /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1300        /// polls that take more than 450ms to complete.
1301        ///
1302        /// **Default:** 10
1303        ///
1304        /// # Examples
1305        ///
1306        /// ```
1307        /// use tokio::runtime;
1308        ///
1309        /// # #[allow(deprecated)]
1310        /// let rt = runtime::Builder::new_multi_thread()
1311        ///     .enable_metrics_poll_time_histogram()
1312        ///     .metrics_poll_count_histogram_buckets(15)
1313        ///     .build()
1314        ///     .unwrap();
1315        /// ```
1316        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1317        pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1318            self.metrics_poll_count_histogram.legacy_mut(|b| b.num_buckets = buckets);
1319            self
1320        }
1321    }
1322
1323    cfg_loom! {
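        /// Overrides the capacity of each worker's local run queue. Only
        /// available to loom tests, which use a smaller queue than the
        /// default (under loom the default is 4). The value must be a power
        /// of two.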
1324        pub(crate) fn local_queue_capacity(&mut self, value: usize) -> &mut Self {
1325            assert!(value.is_power_of_two());
1326            self.local_queue_capacity = value;
1327            self
1328        }
1329    }
1330
1331    fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1332        use crate::runtime::runtime::Scheduler;
1333
1334        let (scheduler, handle, blocking_pool) =
1335            self.build_current_thread_runtime_components(None)?;
1336
1337        Ok(Runtime::from_parts(
1338            Scheduler::CurrentThread(scheduler),
1339            handle,
1340            blocking_pool,
1341        ))
1342    }
1343
1344    #[cfg(tokio_unstable)]
1345    fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
1346        use crate::runtime::local_runtime::LocalRuntimeScheduler;
1347
1348        let tid = std::thread::current().id();
1349
1350        let (scheduler, handle, blocking_pool) =
1351            self.build_current_thread_runtime_components(Some(tid))?;
1352
1353        Ok(LocalRuntime::from_parts(
1354            LocalRuntimeScheduler::CurrentThread(scheduler),
1355            handle,
1356            blocking_pool,
1357        ))
1358    }
1359
1360    fn build_current_thread_runtime_components(
1361        &mut self,
1362        local_tid: Option<ThreadId>,
1363    ) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
1364        use crate::runtime::scheduler;
1365        use crate::runtime::Config;
1366
1367        let (driver, driver_handle) = driver::Driver::new(self.get_cfg(1))?;
1368
1369        // Blocking pool
1370        let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
1371        let blocking_spawner = blocking_pool.spawner().clone();
1372
1373        // Generate a rng seed for this runtime.
1374        let seed_generator_1 = self.seed_generator.next_generator();
1375        let seed_generator_2 = self.seed_generator.next_generator();
1376
1377        // And now put a single-threaded scheduler on top of the timer. When
1378        // there are no futures ready to make progress, it lets the timer or
1379        // the reactor generate new stimuli so the futures can continue making
1380        // progress.
1381        let (scheduler, handle) = CurrentThread::new(
1382            driver,
1383            driver_handle,
1384            blocking_spawner,
1385            seed_generator_2,
1386            Config {
1387                before_park: self.before_park.clone(),
1388                after_unpark: self.after_unpark.clone(),
1389                before_spawn: self.before_spawn.clone(),
1390                after_termination: self.after_termination.clone(),
1391                global_queue_interval: self.global_queue_interval,
1392                event_interval: self.event_interval,
1393                local_queue_capacity: self.local_queue_capacity,
1394                #[cfg(tokio_unstable)]
1395                unhandled_panic: self.unhandled_panic.clone(),
1396                disable_lifo_slot: self.disable_lifo_slot,
1397                seed_generator: seed_generator_1,
1398                metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1399            },
1400            local_tid,
1401        );
1402
1403        let handle = Handle {
1404            inner: scheduler::Handle::CurrentThread(handle),
1405        };
1406
1407        Ok((scheduler, handle, blocking_pool))
1408    }
1409
1410    fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1411        if self.metrics_poll_count_histogram_enable {
1412            Some(self.metrics_poll_count_histogram.clone())
1413        } else {
1414            None
1415        }
1416    }
1417}
1418
1419cfg_io_driver! {
1420    impl Builder {
1421        /// Enables the I/O driver.
1422        ///
1423        /// Doing this enables using net, process, signal, and some I/O types on
1424        /// the runtime.
1425        ///
1426        /// # Examples
1427        ///
1428        /// ```
1429        /// use tokio::runtime;
1430        ///
1431        /// let rt = runtime::Builder::new_multi_thread()
1432        ///     .enable_io()
1433        ///     .build()
1434        ///     .unwrap();
1435        /// ```
1436        pub fn enable_io(&mut self) -> &mut Self {
1437            self.enable_io = true;
1438            self
1439        }
1440
1441        /// Enables the I/O driver and configures the max number of events to be
1442        /// processed per tick.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_current_thread()
        ///     .enable_io()
        ///     .max_io_events_per_tick(1024)
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
            self.nevents = capacity;
            self
        }
    }
}

cfg_time! {
    impl Builder {
        /// Enables the time driver.
        ///
        /// Doing this enables using `tokio::time` on the runtime.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_multi_thread()
        ///     .enable_time()
        ///     .build()
        ///     .unwrap();
        /// ```
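        ///
        /// A sketch of what the time driver enables (`tokio::time::sleep`
        /// here); it is only meant to illustrate the knob, not a full program:
        ///
        /// ```
        /// use tokio::runtime;
        /// use tokio::time::{sleep, Duration};
        ///
        /// let rt = runtime::Builder::new_current_thread()
        ///     .enable_time()
        ///     .build()
        ///     .unwrap();
        ///
        /// rt.block_on(async {
        ///     // Without `enable_time`, this timer could not be driven.
        ///     sleep(Duration::from_millis(10)).await;
        /// });
        /// ```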
        pub fn enable_time(&mut self) -> &mut Self {
            self.enable_time = true;
            self
        }
    }
}

cfg_test_util! {
    impl Builder {
        /// Controls whether the runtime's clock starts paused or advancing.
        ///
        /// Pausing time requires the current-thread runtime; construction of
        /// the runtime will panic otherwise.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_current_thread()
        ///     .enable_time()
        ///     .start_paused(true)
        ///     .build()
        ///     .unwrap();
        /// ```
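        ///
        /// With a paused clock, time only moves forward when the runtime
        /// advances it; once the runtime has nothing else to do, the clock is
        /// auto-advanced past pending timers, so a sketch like the following
        /// completes without waiting in real time:
        ///
        /// ```
        /// use tokio::runtime;
        /// use tokio::time::{sleep, Duration};
        ///
        /// let rt = runtime::Builder::new_current_thread()
        ///     .enable_time()
        ///     .start_paused(true)
        ///     .build()
        ///     .unwrap();
        ///
        /// rt.block_on(async {
        ///     // The paused clock is advanced automatically once the runtime
        ///     // is otherwise idle, so this does not sleep for a real second.
        ///     sleep(Duration::from_secs(1)).await;
        /// });
        /// ```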
        pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
            self.start_paused = start_paused;
            self
        }
    }
}

cfg_rt_multi_thread! {
    impl Builder {
        fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
            use crate::loom::sys::num_cpus;
            use crate::runtime::{Config, runtime::Scheduler};
            use crate::runtime::scheduler::{self, MultiThread};

            let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

            let (driver, driver_handle) = driver::Driver::new(self.get_cfg(core_threads))?;

            // Create the blocking pool
            let blocking_pool =
                blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
            let blocking_spawner = blocking_pool.spawner().clone();

            // Generate a rng seed for this runtime.
            let seed_generator_1 = self.seed_generator.next_generator();
            let seed_generator_2 = self.seed_generator.next_generator();

            let (scheduler, handle, launch) = MultiThread::new(
                core_threads,
                driver,
                driver_handle,
                blocking_spawner,
                seed_generator_2,
                Config {
                    before_park: self.before_park.clone(),
                    after_unpark: self.after_unpark.clone(),
                    before_spawn: self.before_spawn.clone(),
                    after_termination: self.after_termination.clone(),
                    global_queue_interval: self.global_queue_interval,
                    event_interval: self.event_interval,
                    local_queue_capacity: self.local_queue_capacity,
                    #[cfg(tokio_unstable)]
                    unhandled_panic: self.unhandled_panic.clone(),
                    disable_lifo_slot: self.disable_lifo_slot,
                    seed_generator: seed_generator_1,
                    metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
                },
            );

            let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };

            // Spawn the thread pool workers
            let _enter = handle.enter();
            launch.launch();

            Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
        }

        cfg_unstable! {
            fn build_alt_threaded_runtime(&mut self) -> io::Result<Runtime> {
                use crate::loom::sys::num_cpus;
                use crate::runtime::{Config, runtime::Scheduler};
                use crate::runtime::scheduler::MultiThreadAlt;

                let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
                let (driver, driver_handle) = driver::Driver::new(self.get_cfg(core_threads))?;

                // Create the blocking pool
                let blocking_pool =
                    blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
                let blocking_spawner = blocking_pool.spawner().clone();

                // Generate a rng seed for this runtime.
                let seed_generator_1 = self.seed_generator.next_generator();
                let seed_generator_2 = self.seed_generator.next_generator();

                let (scheduler, handle) = MultiThreadAlt::new(
                    core_threads,
                    driver,
                    driver_handle,
                    blocking_spawner,
                    seed_generator_2,
                    Config {
                        before_park: self.before_park.clone(),
                        after_unpark: self.after_unpark.clone(),
                        before_spawn: self.before_spawn.clone(),
                        after_termination: self.after_termination.clone(),
                        global_queue_interval: self.global_queue_interval,
                        event_interval: self.event_interval,
                        local_queue_capacity: self.local_queue_capacity,
                        #[cfg(tokio_unstable)]
                        unhandled_panic: self.unhandled_panic.clone(),
                        disable_lifo_slot: self.disable_lifo_slot,
                        seed_generator: seed_generator_1,
                        metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
                    },
                );

                Ok(Runtime::from_parts(Scheduler::MultiThreadAlt(scheduler), handle, blocking_pool))
            }
        }
    }
}

impl fmt::Debug for Builder {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Builder")
            .field("worker_threads", &self.worker_threads)
            .field("max_blocking_threads", &self.max_blocking_threads)
            .field(
                "thread_name",
                &"<dyn Fn() -> String + Send + Sync + 'static>",
            )
            .field("thread_stack_size", &self.thread_stack_size)
            .field("after_start", &self.after_start.as_ref().map(|_| "..."))
            .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
            .field("before_park", &self.before_park.as_ref().map(|_| "..."))
            .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
            .finish()
    }
}