tokio/runtime/builder.rs
1#![cfg_attr(loom, allow(unused_imports))]
2
3use crate::runtime::handle::Handle;
4use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback};
5#[cfg(tokio_unstable)]
6use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta};
7use crate::util::rand::{RngSeed, RngSeedGenerator};
8
9use crate::runtime::blocking::BlockingPool;
10use crate::runtime::scheduler::CurrentThread;
11use std::fmt;
12use std::io;
13use std::thread::ThreadId;
14use std::time::Duration;
15
16/// Builds Tokio Runtime with custom configuration values.
17///
18/// Methods can be chained in order to set the configuration values. The
19/// Runtime is constructed by calling [`build`].
20///
21/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
22/// or [`Builder::new_current_thread`].
23///
24/// See function level documentation for details on the various configuration
25/// settings.
26///
27/// [`build`]: method@Self::build
28/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
29/// [`Builder::new_current_thread`]: method@Self::new_current_thread
30///
31/// # Examples
32///
33/// ```
34/// use tokio::runtime::Builder;
35///
36/// fn main() {
37/// // build runtime
38/// let runtime = Builder::new_multi_thread()
39/// .worker_threads(4)
40/// .thread_name("my-custom-name")
41/// .thread_stack_size(3 * 1024 * 1024)
42/// .build()
43/// .unwrap();
44///
45/// // use runtime ...
46/// }
47/// ```
48pub struct Builder {
49 /// Runtime type
50 kind: Kind,
51
52 /// Whether or not to enable the I/O driver
53 enable_io: bool,
54 nevents: usize,
55
56 /// Whether or not to enable the time driver
57 enable_time: bool,
58
59 /// Whether or not the clock should start paused.
60 start_paused: bool,
61
62 /// The number of worker threads, used by Runtime.
63 ///
64 /// Only used when not using the current-thread executor.
65 worker_threads: Option<usize>,
66
67 /// Cap on thread usage.
68 max_blocking_threads: usize,
69
70 /// Name fn used for threads spawned by the runtime.
71 pub(super) thread_name: ThreadNameFn,
72
73 /// Stack size used for threads spawned by the runtime.
74 pub(super) thread_stack_size: Option<usize>,
75
76 /// Callback to run after each thread starts.
77 pub(super) after_start: Option<Callback>,
78
79 /// To run before each worker thread stops
80 pub(super) before_stop: Option<Callback>,
81
82 /// To run before each worker thread is parked.
83 pub(super) before_park: Option<Callback>,
84
85 /// To run after each thread is unparked.
86 pub(super) after_unpark: Option<Callback>,
87
88 /// To run before each task is spawned.
89 pub(super) before_spawn: Option<TaskCallback>,
90
91 /// To run after each task is terminated.
92 pub(super) after_termination: Option<TaskCallback>,
93
94 /// Customizable keep alive timeout for `BlockingPool`
95 pub(super) keep_alive: Option<Duration>,
96
97 /// How many ticks before pulling a task from the global/remote queue?
98 ///
99 /// When `None`, the value is unspecified and behavior details are left to
100 /// the scheduler. Each scheduler flavor could choose to either pick its own
101 /// default value or use some other strategy to decide when to poll from the
102 /// global queue. For example, the multi-threaded scheduler uses a
103 /// self-tuning strategy based on mean task poll times.
104 pub(super) global_queue_interval: Option<u32>,
105
106 /// How many ticks before yielding to the driver for timer and I/O events?
107 pub(super) event_interval: u32,
108
109 pub(super) local_queue_capacity: usize,
110
111 /// When true, the multi-threade scheduler LIFO slot should not be used.
112 ///
113 /// This option should only be exposed as unstable.
114 pub(super) disable_lifo_slot: bool,
115
116 /// Specify a random number generator seed to provide deterministic results
117 pub(super) seed_generator: RngSeedGenerator,
118
119 /// When true, enables task poll count histogram instrumentation.
120 pub(super) metrics_poll_count_histogram_enable: bool,
121
122 /// Configures the task poll count histogram
123 pub(super) metrics_poll_count_histogram: HistogramBuilder,
124
125 #[cfg(tokio_unstable)]
126 pub(super) unhandled_panic: UnhandledPanic,
127}
128
129cfg_unstable! {
130 /// How the runtime should respond to unhandled panics.
131 ///
132 /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
133 /// to configure the runtime behavior when a spawned task panics.
134 ///
135 /// See [`Builder::unhandled_panic`] for more details.
136 #[derive(Debug, Clone)]
137 #[non_exhaustive]
138 pub enum UnhandledPanic {
139 /// The runtime should ignore panics on spawned tasks.
140 ///
141 /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
142 /// tasks continue running normally.
143 ///
144 /// This is the default behavior.
145 ///
146 /// # Examples
147 ///
148 /// ```
149 /// use tokio::runtime::{self, UnhandledPanic};
150 ///
151 /// # pub fn main() {
152 /// let rt = runtime::Builder::new_current_thread()
153 /// .unhandled_panic(UnhandledPanic::Ignore)
154 /// .build()
155 /// .unwrap();
156 ///
157 /// let task1 = rt.spawn(async { panic!("boom"); });
158 /// let task2 = rt.spawn(async {
159 /// // This task completes normally
160 /// "done"
161 /// });
162 ///
163 /// rt.block_on(async {
164 /// // The panic on the first task is forwarded to the `JoinHandle`
165 /// assert!(task1.await.is_err());
166 ///
167 /// // The second task completes normally
168 /// assert!(task2.await.is_ok());
169 /// })
170 /// # }
171 /// ```
172 ///
173 /// [`JoinHandle`]: struct@crate::task::JoinHandle
174 Ignore,
175
176 /// The runtime should immediately shutdown if a spawned task panics.
177 ///
178 /// The runtime will immediately shutdown even if the panicked task's
179 /// [`JoinHandle`] is still available. All further spawned tasks will be
180 /// immediately dropped and call to [`Runtime::block_on`] will panic.
181 ///
182 /// # Examples
183 ///
184 /// ```should_panic
185 /// use tokio::runtime::{self, UnhandledPanic};
186 ///
187 /// # pub fn main() {
188 /// let rt = runtime::Builder::new_current_thread()
189 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
190 /// .build()
191 /// .unwrap();
192 ///
193 /// rt.spawn(async { panic!("boom"); });
194 /// rt.spawn(async {
195 /// // This task never completes.
196 /// });
197 ///
198 /// rt.block_on(async {
199 /// // Do some work
200 /// # loop { tokio::task::yield_now().await; }
201 /// })
202 /// # }
203 /// ```
204 ///
205 /// [`JoinHandle`]: struct@crate::task::JoinHandle
206 ShutdownRuntime,
207 }
208}
209
210pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
211
212#[derive(Clone, Copy)]
213pub(crate) enum Kind {
214 CurrentThread,
215 #[cfg(feature = "rt-multi-thread")]
216 MultiThread,
217 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
218 MultiThreadAlt,
219}
220
221impl Builder {
222 /// Returns a new builder with the current thread scheduler selected.
223 ///
224 /// Configuration methods can be chained on the return value.
225 ///
226 /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
227 /// [`LocalSet`].
228 ///
229 /// [`LocalSet`]: crate::task::LocalSet
230 pub fn new_current_thread() -> Builder {
231 #[cfg(loom)]
232 const EVENT_INTERVAL: u32 = 4;
233 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
234 #[cfg(not(loom))]
235 const EVENT_INTERVAL: u32 = 61;
236
237 Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
238 }
239
240 /// Returns a new builder with the multi thread scheduler selected.
241 ///
242 /// Configuration methods can be chained on the return value.
243 #[cfg(feature = "rt-multi-thread")]
244 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
245 pub fn new_multi_thread() -> Builder {
246 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
247 Builder::new(Kind::MultiThread, 61)
248 }
249
250 cfg_unstable! {
251 /// Returns a new builder with the alternate multi thread scheduler
252 /// selected.
253 ///
254 /// The alternate multi threaded scheduler is an in-progress
255 /// candidate to replace the existing multi threaded scheduler. It
256 /// currently does not scale as well to 16+ processors.
257 ///
258 /// This runtime flavor is currently **not considered production
259 /// ready**.
260 ///
261 /// Configuration methods can be chained on the return value.
262 #[cfg(feature = "rt-multi-thread")]
263 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
264 pub fn new_multi_thread_alt() -> Builder {
265 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
266 Builder::new(Kind::MultiThreadAlt, 61)
267 }
268 }
269
270 /// Returns a new runtime builder initialized with default configuration
271 /// values.
272 ///
273 /// Configuration methods can be chained on the return value.
274 pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
275 Builder {
276 kind,
277
278 // I/O defaults to "off"
279 enable_io: false,
280 nevents: 1024,
281
282 // Time defaults to "off"
283 enable_time: false,
284
285 // The clock starts not-paused
286 start_paused: false,
287
288 // Read from environment variable first in multi-threaded mode.
289 // Default to lazy auto-detection (one thread per CPU core)
290 worker_threads: None,
291
292 max_blocking_threads: 512,
293
294 // Default thread name
295 thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
296
297 // Do not set a stack size by default
298 thread_stack_size: None,
299
300 // No worker thread callbacks
301 after_start: None,
302 before_stop: None,
303 before_park: None,
304 after_unpark: None,
305
306 before_spawn: None,
307 after_termination: None,
308
309 keep_alive: None,
310
311 // Defaults for these values depend on the scheduler kind, so we get them
312 // as parameters.
313 global_queue_interval: None,
314 event_interval,
315
316 #[cfg(not(loom))]
317 local_queue_capacity: 256,
318
319 #[cfg(loom)]
320 local_queue_capacity: 4,
321
322 seed_generator: RngSeedGenerator::new(RngSeed::new()),
323
324 #[cfg(tokio_unstable)]
325 unhandled_panic: UnhandledPanic::Ignore,
326
327 metrics_poll_count_histogram_enable: false,
328
329 metrics_poll_count_histogram: HistogramBuilder::default(),
330
331 disable_lifo_slot: false,
332 }
333 }
334
335 /// Enables both I/O and time drivers.
336 ///
337 /// Doing this is a shorthand for calling `enable_io` and `enable_time`
338 /// individually. If additional components are added to Tokio in the future,
339 /// `enable_all` will include these future components.
340 ///
341 /// # Examples
342 ///
343 /// ```
344 /// use tokio::runtime;
345 ///
346 /// let rt = runtime::Builder::new_multi_thread()
347 /// .enable_all()
348 /// .build()
349 /// .unwrap();
350 /// ```
351 pub fn enable_all(&mut self) -> &mut Self {
352 #[cfg(any(
353 feature = "net",
354 all(unix, feature = "process"),
355 all(unix, feature = "signal")
356 ))]
357 self.enable_io();
358 #[cfg(feature = "time")]
359 self.enable_time();
360
361 self
362 }
363
364 /// Sets the number of worker threads the `Runtime` will use.
365 ///
366 /// This can be any number above 0 though it is advised to keep this value
367 /// on the smaller side.
368 ///
369 /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
370 ///
371 /// # Default
372 ///
373 /// The default value is the number of cores available to the system.
374 ///
375 /// When using the `current_thread` runtime this method has no effect.
376 ///
377 /// # Examples
378 ///
379 /// ## Multi threaded runtime with 4 threads
380 ///
381 /// ```
382 /// use tokio::runtime;
383 ///
384 /// // This will spawn a work-stealing runtime with 4 worker threads.
385 /// let rt = runtime::Builder::new_multi_thread()
386 /// .worker_threads(4)
387 /// .build()
388 /// .unwrap();
389 ///
390 /// rt.spawn(async move {});
391 /// ```
392 ///
393 /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
394 ///
395 /// ```
396 /// use tokio::runtime;
397 ///
398 /// // Create a runtime that _must_ be driven from a call
399 /// // to `Runtime::block_on`.
400 /// let rt = runtime::Builder::new_current_thread()
401 /// .build()
402 /// .unwrap();
403 ///
404 /// // This will run the runtime and future on the current thread
405 /// rt.block_on(async move {});
406 /// ```
407 ///
408 /// # Panics
409 ///
410 /// This will panic if `val` is not larger than `0`.
411 #[track_caller]
412 pub fn worker_threads(&mut self, val: usize) -> &mut Self {
413 assert!(val > 0, "Worker threads cannot be set to 0");
414 self.worker_threads = Some(val);
415 self
416 }
417
418 /// Specifies the limit for additional threads spawned by the Runtime.
419 ///
420 /// These threads are used for blocking operations like tasks spawned
421 /// through [`spawn_blocking`], this includes but is not limited to:
422 /// - [`fs`] operations
423 /// - dns resolution through [`ToSocketAddrs`]
424 /// - writing to [`Stdout`] or [`Stderr`]
425 /// - reading from [`Stdin`]
426 ///
427 /// Unlike the [`worker_threads`], they are not always active and will exit
428 /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
429 ///
430 /// It's recommended to not set this limit too low in order to avoid hanging on operations
431 /// requiring [`spawn_blocking`].
432 ///
433 /// The default value is 512.
434 ///
435 /// # Panics
436 ///
437 /// This will panic if `val` is not larger than `0`.
438 ///
439 /// # Upgrading from 0.x
440 ///
441 /// In old versions `max_threads` limited both blocking and worker threads, but the
442 /// current `max_blocking_threads` does not include async worker threads in the count.
443 ///
444 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
445 /// [`fs`]: mod@crate::fs
446 /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
447 /// [`Stdout`]: struct@crate::io::Stdout
448 /// [`Stdin`]: struct@crate::io::Stdin
449 /// [`Stderr`]: struct@crate::io::Stderr
450 /// [`worker_threads`]: Self::worker_threads
451 /// [`thread_keep_alive`]: Self::thread_keep_alive
452 #[track_caller]
453 #[cfg_attr(docsrs, doc(alias = "max_threads"))]
454 pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
455 assert!(val > 0, "Max blocking threads cannot be set to 0");
456 self.max_blocking_threads = val;
457 self
458 }
459
460 /// Sets name of threads spawned by the `Runtime`'s thread pool.
461 ///
462 /// The default name is "tokio-runtime-worker".
463 ///
464 /// # Examples
465 ///
466 /// ```
467 /// # use tokio::runtime;
468 ///
469 /// # pub fn main() {
470 /// let rt = runtime::Builder::new_multi_thread()
471 /// .thread_name("my-pool")
472 /// .build();
473 /// # }
474 /// ```
475 pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
476 let val = val.into();
477 self.thread_name = std::sync::Arc::new(move || val.clone());
478 self
479 }
480
481 /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
482 ///
483 /// The default name fn is `|| "tokio-runtime-worker".into()`.
484 ///
485 /// # Examples
486 ///
487 /// ```
488 /// # use tokio::runtime;
489 /// # use std::sync::atomic::{AtomicUsize, Ordering};
490 /// # pub fn main() {
491 /// let rt = runtime::Builder::new_multi_thread()
492 /// .thread_name_fn(|| {
493 /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
494 /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
495 /// format!("my-pool-{}", id)
496 /// })
497 /// .build();
498 /// # }
499 /// ```
500 pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
501 where
502 F: Fn() -> String + Send + Sync + 'static,
503 {
504 self.thread_name = std::sync::Arc::new(f);
505 self
506 }
507
508 /// Sets the stack size (in bytes) for worker threads.
509 ///
510 /// The actual stack size may be greater than this value if the platform
511 /// specifies minimal stack size.
512 ///
513 /// The default stack size for spawned threads is 2 MiB, though this
514 /// particular stack size is subject to change in the future.
515 ///
516 /// # Examples
517 ///
518 /// ```
519 /// # use tokio::runtime;
520 ///
521 /// # pub fn main() {
522 /// let rt = runtime::Builder::new_multi_thread()
523 /// .thread_stack_size(32 * 1024)
524 /// .build();
525 /// # }
526 /// ```
527 pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
528 self.thread_stack_size = Some(val);
529 self
530 }
531
532 /// Executes function `f` after each thread is started but before it starts
533 /// doing work.
534 ///
535 /// This is intended for bookkeeping and monitoring use cases.
536 ///
537 /// # Examples
538 ///
539 /// ```
540 /// # use tokio::runtime;
541 /// # pub fn main() {
542 /// let runtime = runtime::Builder::new_multi_thread()
543 /// .on_thread_start(|| {
544 /// println!("thread started");
545 /// })
546 /// .build();
547 /// # }
548 /// ```
549 #[cfg(not(loom))]
550 pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
551 where
552 F: Fn() + Send + Sync + 'static,
553 {
554 self.after_start = Some(std::sync::Arc::new(f));
555 self
556 }
557
558 /// Executes function `f` before each thread stops.
559 ///
560 /// This is intended for bookkeeping and monitoring use cases.
561 ///
562 /// # Examples
563 ///
564 /// ```
565 /// # use tokio::runtime;
566 /// # pub fn main() {
567 /// let runtime = runtime::Builder::new_multi_thread()
568 /// .on_thread_stop(|| {
569 /// println!("thread stopping");
570 /// })
571 /// .build();
572 /// # }
573 /// ```
574 #[cfg(not(loom))]
575 pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
576 where
577 F: Fn() + Send + Sync + 'static,
578 {
579 self.before_stop = Some(std::sync::Arc::new(f));
580 self
581 }
582
583 /// Executes function `f` just before a thread is parked (goes idle).
584 /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
585 /// can be called, and may result in this thread being unparked immediately.
586 ///
587 /// This can be used to start work only when the executor is idle, or for bookkeeping
588 /// and monitoring purposes.
589 ///
590 /// Note: There can only be one park callback for a runtime; calling this function
591 /// more than once replaces the last callback defined, rather than adding to it.
592 ///
593 /// # Examples
594 ///
595 /// ## Multithreaded executor
596 /// ```
597 /// # use std::sync::Arc;
598 /// # use std::sync::atomic::{AtomicBool, Ordering};
599 /// # use tokio::runtime;
600 /// # use tokio::sync::Barrier;
601 /// # pub fn main() {
602 /// let once = AtomicBool::new(true);
603 /// let barrier = Arc::new(Barrier::new(2));
604 ///
605 /// let runtime = runtime::Builder::new_multi_thread()
606 /// .worker_threads(1)
607 /// .on_thread_park({
608 /// let barrier = barrier.clone();
609 /// move || {
610 /// let barrier = barrier.clone();
611 /// if once.swap(false, Ordering::Relaxed) {
612 /// tokio::spawn(async move { barrier.wait().await; });
613 /// }
614 /// }
615 /// })
616 /// .build()
617 /// .unwrap();
618 ///
619 /// runtime.block_on(async {
620 /// barrier.wait().await;
621 /// })
622 /// # }
623 /// ```
624 /// ## Current thread executor
625 /// ```
626 /// # use std::sync::Arc;
627 /// # use std::sync::atomic::{AtomicBool, Ordering};
628 /// # use tokio::runtime;
629 /// # use tokio::sync::Barrier;
630 /// # pub fn main() {
631 /// let once = AtomicBool::new(true);
632 /// let barrier = Arc::new(Barrier::new(2));
633 ///
634 /// let runtime = runtime::Builder::new_current_thread()
635 /// .on_thread_park({
636 /// let barrier = barrier.clone();
637 /// move || {
638 /// let barrier = barrier.clone();
639 /// if once.swap(false, Ordering::Relaxed) {
640 /// tokio::spawn(async move { barrier.wait().await; });
641 /// }
642 /// }
643 /// })
644 /// .build()
645 /// .unwrap();
646 ///
647 /// runtime.block_on(async {
648 /// barrier.wait().await;
649 /// })
650 /// # }
651 /// ```
652 #[cfg(not(loom))]
653 pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
654 where
655 F: Fn() + Send + Sync + 'static,
656 {
657 self.before_park = Some(std::sync::Arc::new(f));
658 self
659 }
660
661 /// Executes function `f` just after a thread unparks (starts executing tasks).
662 ///
663 /// This is intended for bookkeeping and monitoring use cases; note that work
664 /// in this callback will increase latencies when the application has allowed one or
665 /// more runtime threads to go idle.
666 ///
667 /// Note: There can only be one unpark callback for a runtime; calling this function
668 /// more than once replaces the last callback defined, rather than adding to it.
669 ///
670 /// # Examples
671 ///
672 /// ```
673 /// # use tokio::runtime;
674 /// # pub fn main() {
675 /// let runtime = runtime::Builder::new_multi_thread()
676 /// .on_thread_unpark(|| {
677 /// println!("thread unparking");
678 /// })
679 /// .build();
680 ///
681 /// runtime.unwrap().block_on(async {
682 /// tokio::task::yield_now().await;
683 /// println!("Hello from Tokio!");
684 /// })
685 /// # }
686 /// ```
687 #[cfg(not(loom))]
688 pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
689 where
690 F: Fn() + Send + Sync + 'static,
691 {
692 self.after_unpark = Some(std::sync::Arc::new(f));
693 self
694 }
695
696 /// Executes function `f` just before a task is spawned.
697 ///
698 /// `f` is called within the Tokio context, so functions like
699 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
700 /// invoked immediately.
701 ///
702 /// This can be used for bookkeeping or monitoring purposes.
703 ///
704 /// Note: There can only be one spawn callback for a runtime; calling this function more
705 /// than once replaces the last callback defined, rather than adding to it.
706 ///
707 /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
708 ///
709 /// **Note**: This is an [unstable API][unstable]. The public API of this type
710 /// may break in 1.x releases. See [the documentation on unstable
711 /// features][unstable] for details.
712 ///
713 /// [unstable]: crate#unstable-features
714 ///
715 /// # Examples
716 ///
717 /// ```
718 /// # use tokio::runtime;
719 /// # pub fn main() {
720 /// let runtime = runtime::Builder::new_current_thread()
721 /// .on_task_spawn(|_| {
722 /// println!("spawning task");
723 /// })
724 /// .build()
725 /// .unwrap();
726 ///
727 /// runtime.block_on(async {
728 /// tokio::task::spawn(std::future::ready(()));
729 ///
730 /// for _ in 0..64 {
731 /// tokio::task::yield_now().await;
732 /// }
733 /// })
734 /// # }
735 /// ```
736 #[cfg(all(not(loom), tokio_unstable))]
737 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
738 pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
739 where
740 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
741 {
742 self.before_spawn = Some(std::sync::Arc::new(f));
743 self
744 }
745
746 /// Executes function `f` just after a task is terminated.
747 ///
748 /// `f` is called within the Tokio context, so functions like
749 /// [`tokio::spawn`](crate::spawn) can be called.
750 ///
751 /// This can be used for bookkeeping or monitoring purposes.
752 ///
753 /// Note: There can only be one task termination callback for a runtime; calling this
754 /// function more than once replaces the last callback defined, rather than adding to it.
755 ///
756 /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
757 ///
758 /// **Note**: This is an [unstable API][unstable]. The public API of this type
759 /// may break in 1.x releases. See [the documentation on unstable
760 /// features][unstable] for details.
761 ///
762 /// [unstable]: crate#unstable-features
763 ///
764 /// # Examples
765 ///
766 /// ```
767 /// # use tokio::runtime;
768 /// # pub fn main() {
769 /// let runtime = runtime::Builder::new_current_thread()
770 /// .on_task_terminate(|_| {
771 /// println!("killing task");
772 /// })
773 /// .build()
774 /// .unwrap();
775 ///
776 /// runtime.block_on(async {
777 /// tokio::task::spawn(std::future::ready(()));
778 ///
779 /// for _ in 0..64 {
780 /// tokio::task::yield_now().await;
781 /// }
782 /// })
783 /// # }
784 /// ```
785 #[cfg(all(not(loom), tokio_unstable))]
786 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
787 pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
788 where
789 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
790 {
791 self.after_termination = Some(std::sync::Arc::new(f));
792 self
793 }
794
795 /// Creates the configured `Runtime`.
796 ///
797 /// The returned `Runtime` instance is ready to spawn tasks.
798 ///
799 /// # Examples
800 ///
801 /// ```
802 /// use tokio::runtime::Builder;
803 ///
804 /// let rt = Builder::new_multi_thread().build().unwrap();
805 ///
806 /// rt.block_on(async {
807 /// println!("Hello from the Tokio runtime");
808 /// });
809 /// ```
810 pub fn build(&mut self) -> io::Result<Runtime> {
811 match &self.kind {
812 Kind::CurrentThread => self.build_current_thread_runtime(),
813 #[cfg(feature = "rt-multi-thread")]
814 Kind::MultiThread => self.build_threaded_runtime(),
815 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
816 Kind::MultiThreadAlt => self.build_alt_threaded_runtime(),
817 }
818 }
819
820 /// Creates the configured `LocalRuntime`.
821 ///
822 /// The returned `LocalRuntime` instance is ready to spawn tasks.
823 ///
824 /// # Panics
825 /// This will panic if `current_thread` is not the selected runtime flavor.
826 /// All other runtime flavors are unsupported by [`LocalRuntime`].
827 ///
828 /// [`LocalRuntime`]: [crate::runtime::LocalRuntime]
829 ///
830 /// # Examples
831 ///
832 /// ```
833 /// use tokio::runtime::Builder;
834 ///
835 /// let rt = Builder::new_current_thread().build_local(&mut Default::default()).unwrap();
836 ///
837 /// rt.block_on(async {
838 /// println!("Hello from the Tokio runtime");
839 /// });
840 /// ```
841 #[allow(unused_variables, unreachable_patterns)]
842 #[cfg(tokio_unstable)]
843 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
844 pub fn build_local(&mut self, options: &LocalOptions) -> io::Result<LocalRuntime> {
845 match &self.kind {
846 Kind::CurrentThread => self.build_current_thread_local_runtime(),
847 _ => panic!("Only current_thread is supported when building a local runtime"),
848 }
849 }
850
851 fn get_cfg(&self, workers: usize) -> driver::Cfg {
852 driver::Cfg {
853 enable_pause_time: match self.kind {
854 Kind::CurrentThread => true,
855 #[cfg(feature = "rt-multi-thread")]
856 Kind::MultiThread => false,
857 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
858 Kind::MultiThreadAlt => false,
859 },
860 enable_io: self.enable_io,
861 enable_time: self.enable_time,
862 start_paused: self.start_paused,
863 nevents: self.nevents,
864 workers,
865 }
866 }
867
868 /// Sets a custom timeout for a thread in the blocking pool.
869 ///
870 /// By default, the timeout for a thread is set to 10 seconds. This can
871 /// be overridden using `.thread_keep_alive()`.
872 ///
873 /// # Example
874 ///
875 /// ```
876 /// # use tokio::runtime;
877 /// # use std::time::Duration;
878 /// # pub fn main() {
879 /// let rt = runtime::Builder::new_multi_thread()
880 /// .thread_keep_alive(Duration::from_millis(100))
881 /// .build();
882 /// # }
883 /// ```
884 pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
885 self.keep_alive = Some(duration);
886 self
887 }
888
889 /// Sets the number of scheduler ticks after which the scheduler will poll the global
890 /// task queue.
891 ///
892 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
893 ///
894 /// By default the global queue interval is 31 for the current-thread scheduler. Please see
895 /// [the module documentation] for the default behavior of the multi-thread scheduler.
896 ///
897 /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
898 /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
899 /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
900 /// getting started on new work, especially if tasks frequently yield rather than complete
901 /// or await on further I/O. Conversely, a higher value prioritizes existing work, and
902 /// is a good choice when most tasks quickly complete polling.
903 ///
904 /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
905 ///
906 /// # Panics
907 ///
908 /// This function will panic if 0 is passed as an argument.
909 ///
910 /// # Examples
911 ///
912 /// ```
913 /// # use tokio::runtime;
914 /// # pub fn main() {
915 /// let rt = runtime::Builder::new_multi_thread()
916 /// .global_queue_interval(31)
917 /// .build();
918 /// # }
919 /// ```
920 #[track_caller]
921 pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
922 assert!(val > 0, "global_queue_interval must be greater than 0");
923 self.global_queue_interval = Some(val);
924 self
925 }
926
927 /// Sets the number of scheduler ticks after which the scheduler will poll for
928 /// external events (timers, I/O, and so on).
929 ///
930 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
931 ///
932 /// By default, the event interval is `61` for all scheduler types.
933 ///
934 /// Setting the event interval determines the effective "priority" of delivering
935 /// these external events (which may wake up additional tasks), compared to
936 /// executing tasks that are currently ready to run. A smaller value is useful
937 /// when tasks frequently spend a long time in polling, or frequently yield,
938 /// which can result in overly long delays picking up I/O events. Conversely,
939 /// picking up new events requires extra synchronization and syscall overhead,
940 /// so if tasks generally complete their polling quickly, a higher event interval
941 /// will minimize that overhead while still keeping the scheduler responsive to
942 /// events.
943 ///
944 /// # Examples
945 ///
946 /// ```
947 /// # use tokio::runtime;
948 /// # pub fn main() {
949 /// let rt = runtime::Builder::new_multi_thread()
950 /// .event_interval(31)
951 /// .build();
952 /// # }
953 /// ```
954 pub fn event_interval(&mut self, val: u32) -> &mut Self {
955 self.event_interval = val;
956 self
957 }
958
959 cfg_unstable! {
960 /// Configure how the runtime responds to an unhandled panic on a
961 /// spawned task.
962 ///
963 /// By default, an unhandled panic (i.e. a panic not caught by
964 /// [`std::panic::catch_unwind`]) has no impact on the runtime's
965 /// execution. The panic's error value is forwarded to the task's
966 /// [`JoinHandle`] and all other spawned tasks continue running.
967 ///
968 /// The `unhandled_panic` option enables configuring this behavior.
969 ///
970 /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
971 /// spawned tasks have no impact on the runtime's execution.
972 /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
973 /// shutdown immediately when a spawned task panics even if that
974 /// task's `JoinHandle` has not been dropped. All other spawned tasks
975 /// will immediately terminate and further calls to
976 /// [`Runtime::block_on`] will panic.
977 ///
978 /// # Panics
979 /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
980 /// on a runtime other than the current thread runtime.
981 ///
982 /// # Unstable
983 ///
984 /// This option is currently unstable and its implementation is
985 /// incomplete. The API may change or be removed in the future. See
986 /// issue [tokio-rs/tokio#4516] for more details.
987 ///
988 /// # Examples
989 ///
990 /// The following demonstrates a runtime configured to shutdown on
991 /// panic. The first spawned task panics and results in the runtime
992 /// shutting down. The second spawned task never has a chance to
993 /// execute. The call to `block_on` will panic due to the runtime being
994 /// forcibly shutdown.
995 ///
996 /// ```should_panic
997 /// use tokio::runtime::{self, UnhandledPanic};
998 ///
999 /// # pub fn main() {
1000 /// let rt = runtime::Builder::new_current_thread()
1001 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
1002 /// .build()
1003 /// .unwrap();
1004 ///
1005 /// rt.spawn(async { panic!("boom"); });
1006 /// rt.spawn(async {
1007 /// // This task never completes.
1008 /// });
1009 ///
1010 /// rt.block_on(async {
1011 /// // Do some work
1012 /// # loop { tokio::task::yield_now().await; }
1013 /// })
1014 /// # }
1015 /// ```
1016 ///
1017 /// [`JoinHandle`]: struct@crate::task::JoinHandle
1018 /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
1019 pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
1020 if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
1021 panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
1022 }
1023
1024 self.unhandled_panic = behavior;
1025 self
1026 }
1027
1028 /// Disables the LIFO task scheduler heuristic.
1029 ///
1030 /// The multi-threaded scheduler includes a heuristic for optimizing
1031 /// message-passing patterns. This heuristic results in the **last**
1032 /// scheduled task being polled first.
1033 ///
1034 /// To implement this heuristic, each worker thread has a slot which
1035 /// holds the task that should be polled next. However, this slot cannot
1036 /// be stolen by other worker threads, which can result in lower total
1037 /// throughput when tasks tend to have longer poll times.
1038 ///
1039 /// This configuration option will disable this heuristic resulting in
1040 /// all scheduled tasks being pushed into the worker-local queue, which
1041 /// is stealable.
1042 ///
1043 /// Consider trying this option when the task "scheduled" time is high
1044 /// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to
1045 /// collect this data.
1046 ///
1047 /// # Unstable
1048 ///
1049 /// This configuration option is considered a workaround for the LIFO
1050 /// slot not being stealable. When the slot becomes stealable, we will
1051 /// revisit whether or not this option is necessary. See
1052 /// issue [tokio-rs/tokio#4941].
1053 ///
1054 /// # Examples
1055 ///
1056 /// ```
1057 /// use tokio::runtime;
1058 ///
1059 /// let rt = runtime::Builder::new_multi_thread()
1060 /// .disable_lifo_slot()
1061 /// .build()
1062 /// .unwrap();
1063 /// ```
1064 ///
1065 /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
1066 /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
1067 pub fn disable_lifo_slot(&mut self) -> &mut Self {
1068 self.disable_lifo_slot = true;
1069 self
1070 }
1071
1072 /// Specifies the random number generation seed to use within all
1073 /// threads associated with the runtime being built.
1074 ///
1075 /// This option is intended to make certain parts of the runtime
1076 /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
1077 /// [`tokio::select!`] it will ensure that the order that branches are
1078 /// polled is deterministic.
1079 ///
1080 /// In addition to the code specifying `rng_seed` and interacting with
1081 /// the runtime, the internals of Tokio and the Rust compiler may affect
1082 /// the sequences of random numbers. In order to ensure repeatable
1083 /// results, the version of Tokio, the versions of all other
1084 /// dependencies that interact with Tokio, and the Rust compiler version
1085 /// should also all remain constant.
1086 ///
1087 /// # Examples
1088 ///
1089 /// ```
1090 /// # use tokio::runtime::{self, RngSeed};
1091 /// # pub fn main() {
1092 /// let seed = RngSeed::from_bytes(b"place your seed here");
1093 /// let rt = runtime::Builder::new_current_thread()
1094 /// .rng_seed(seed)
1095 /// .build();
1096 /// # }
1097 /// ```
1098 ///
1099 /// [`tokio::select!`]: crate::select
1100 pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
1101 self.seed_generator = RngSeedGenerator::new(seed);
1102 self
1103 }
1104 }
1105
1106 cfg_unstable_metrics! {
1107 /// Enables tracking the distribution of task poll times.
1108 ///
1109 /// Task poll times are not instrumented by default as doing so requires
1110 /// calling [`Instant::now()`] twice per task poll, which could add
1111 /// measurable overhead. Use the [`Handle::metrics()`] to access the
1112 /// metrics data.
1113 ///
1114 /// The histogram uses fixed bucket sizes. In other words, the histogram
1115 /// buckets are not dynamic based on input values. Use the
1116 /// `metrics_poll_time_histogram` builder methods to configure the
1117 /// histogram details.
1118 ///
1119 /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1120 /// This has an extremely low memory footprint, but may not provide enough granularity. For
1121 /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`]
1122 /// to select [`LogHistogram`] instead.
1123 ///
1124 /// # Examples
1125 ///
1126 /// ```
1127 /// use tokio::runtime;
1128 ///
1129 /// let rt = runtime::Builder::new_multi_thread()
1130 /// .enable_metrics_poll_time_histogram()
1131 /// .build()
1132 /// .unwrap();
1133 /// # // Test default values here
1134 /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
1135 /// # let m = rt.handle().metrics();
1136 /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10);
1137 /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100));
1138 /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200));
1139 /// ```
1140 ///
1141 /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
1142 /// [`Instant::now()`]: std::time::Instant::now
1143 /// [`LogHistogram`]: crate::runtime::LogHistogram
1144 /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration
1145 pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self {
1146 self.metrics_poll_count_histogram_enable = true;
1147 self
1148 }
1149
1150 /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead.
1151 ///
1152 /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram
1153 #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")]
1154 #[doc(hidden)]
1155 pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
1156 self.enable_metrics_poll_time_histogram()
1157 }
1158
1159 /// Sets the histogram scale for tracking the distribution of task poll
1160 /// times.
1161 ///
1162 /// Tracking the distribution of task poll times can be done using a
1163 /// linear or log scale. When using linear scale, each histogram bucket
1164 /// will represent the same range of poll times. When using log scale,
1165 /// each histogram bucket will cover a range twice as big as the
1166 /// previous bucket.
1167 ///
1168 /// **Default:** linear scale.
1169 ///
1170 /// # Examples
1171 ///
1172 /// ```
1173 /// use tokio::runtime::{self, HistogramScale};
1174 ///
1175 /// # #[allow(deprecated)]
1176 /// let rt = runtime::Builder::new_multi_thread()
1177 /// .enable_metrics_poll_time_histogram()
1178 /// .metrics_poll_count_histogram_scale(HistogramScale::Log)
1179 /// .build()
1180 /// .unwrap();
1181 /// ```
1182 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1183 pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1184 self.metrics_poll_count_histogram.legacy_mut(|b|b.scale = histogram_scale);
1185 self
1186 }
1187
1188 /// Configure the histogram for tracking poll times
1189 ///
1190 /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1191 /// This has an extremely low memory footprint, but may not provide enough granularity. For
1192 /// better granularity with low memory usage, use [`LogHistogram`] instead.
1193 ///
1194 /// # Examples
1195 /// Configure a [`LogHistogram`] with [default configuration]:
1196 /// ```
1197 /// use tokio::runtime;
1198 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1199 ///
1200 /// let rt = runtime::Builder::new_multi_thread()
1201 /// .enable_metrics_poll_time_histogram()
1202 /// .metrics_poll_time_histogram_configuration(
1203 /// HistogramConfiguration::log(LogHistogram::default())
1204 /// )
1205 /// .build()
1206 /// .unwrap();
1207 /// ```
1208 ///
1209 /// Configure a linear histogram with 100 buckets, each 10μs wide
1210 /// ```
1211 /// use tokio::runtime;
1212 /// use std::time::Duration;
1213 /// use tokio::runtime::HistogramConfiguration;
1214 ///
1215 /// let rt = runtime::Builder::new_multi_thread()
1216 /// .enable_metrics_poll_time_histogram()
1217 /// .metrics_poll_time_histogram_configuration(
1218 /// HistogramConfiguration::linear(Duration::from_micros(10), 100)
1219 /// )
1220 /// .build()
1221 /// .unwrap();
1222 /// ```
1223 ///
1224 /// Configure a [`LogHistogram`] with the following settings:
1225 /// - Measure times from 100ns to 120s
1226 /// - Max error of 0.1
1227 /// - No more than 1024 buckets
1228 /// ```
1229 /// use std::time::Duration;
1230 /// use tokio::runtime;
1231 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1232 ///
1233 /// let rt = runtime::Builder::new_multi_thread()
1234 /// .enable_metrics_poll_time_histogram()
1235 /// .metrics_poll_time_histogram_configuration(
1236 /// HistogramConfiguration::log(LogHistogram::builder()
1237 /// .max_value(Duration::from_secs(120))
1238 /// .min_value(Duration::from_nanos(100))
1239 /// .max_error(0.1)
1240 /// .max_buckets(1024)
1241 /// .expect("configuration uses 488 buckets")
1242 /// )
1243 /// )
1244 /// .build()
1245 /// .unwrap();
1246 /// ```
1247 ///
1248 /// [`LogHistogram`]: crate::runtime::LogHistogram
1249 /// [default configuration]: crate::runtime::LogHistogramBuilder
1250 pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self {
1251 self.metrics_poll_count_histogram.histogram_type = configuration.inner;
1252 self
1253 }
1254
1255 /// Sets the histogram resolution for tracking the distribution of task
1256 /// poll times.
1257 ///
1258 /// The resolution is the histogram's first bucket's range. When using a
1259 /// linear histogram scale, each bucket will cover the same range. When
1260 /// using a log scale, each bucket will cover a range twice as big as
1261 /// the previous bucket. In the log case, the resolution represents the
1262 /// smallest bucket range.
1263 ///
1264 /// Note that, when using log scale, the resolution is rounded up to the
1265 /// nearest power of 2 in nanoseconds.
1266 ///
1267 /// **Default:** 100 microseconds.
1268 ///
1269 /// # Examples
1270 ///
1271 /// ```
1272 /// use tokio::runtime;
1273 /// use std::time::Duration;
1274 ///
1275 /// # #[allow(deprecated)]
1276 /// let rt = runtime::Builder::new_multi_thread()
1277 /// .enable_metrics_poll_time_histogram()
1278 /// .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1279 /// .build()
1280 /// .unwrap();
1281 /// ```
1282 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1283 pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1284 assert!(resolution > Duration::from_secs(0));
1285 // Sanity check the argument and also make the cast below safe.
1286 assert!(resolution <= Duration::from_secs(1));
1287
1288 let resolution = resolution.as_nanos() as u64;
1289
1290 self.metrics_poll_count_histogram.legacy_mut(|b|b.resolution = resolution);
1291 self
1292 }
1293
1294 /// Sets the number of buckets for the histogram tracking the
1295 /// distribution of task poll times.
1296 ///
1297 /// The last bucket tracks all greater values that fall out of other
1298 /// ranges. So, configuring the histogram using a linear scale,
1299 /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1300 /// polls that take more than 450ms to complete.
1301 ///
1302 /// **Default:** 10
1303 ///
1304 /// # Examples
1305 ///
1306 /// ```
1307 /// use tokio::runtime;
1308 ///
1309 /// # #[allow(deprecated)]
1310 /// let rt = runtime::Builder::new_multi_thread()
1311 /// .enable_metrics_poll_time_histogram()
1312 /// .metrics_poll_count_histogram_buckets(15)
1313 /// .build()
1314 /// .unwrap();
1315 /// ```
1316 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1317 pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1318 self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets);
1319 self
1320 }
1321 }
1322
1323 cfg_loom! {
1324 pub(crate) fn local_queue_capacity(&mut self, value: usize) -> &mut Self {
1325 assert!(value.is_power_of_two());
1326 self.local_queue_capacity = value;
1327 self
1328 }
1329 }
1330
1331 fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1332 use crate::runtime::runtime::Scheduler;
1333
1334 let (scheduler, handle, blocking_pool) =
1335 self.build_current_thread_runtime_components(None)?;
1336
1337 Ok(Runtime::from_parts(
1338 Scheduler::CurrentThread(scheduler),
1339 handle,
1340 blocking_pool,
1341 ))
1342 }
1343
1344 #[cfg(tokio_unstable)]
1345 fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
1346 use crate::runtime::local_runtime::LocalRuntimeScheduler;
1347
1348 let tid = std::thread::current().id();
1349
1350 let (scheduler, handle, blocking_pool) =
1351 self.build_current_thread_runtime_components(Some(tid))?;
1352
1353 Ok(LocalRuntime::from_parts(
1354 LocalRuntimeScheduler::CurrentThread(scheduler),
1355 handle,
1356 blocking_pool,
1357 ))
1358 }
1359
1360 fn build_current_thread_runtime_components(
1361 &mut self,
1362 local_tid: Option<ThreadId>,
1363 ) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
1364 use crate::runtime::scheduler;
1365 use crate::runtime::Config;
1366
1367 let (driver, driver_handle) = driver::Driver::new(self.get_cfg(1))?;
1368
1369 // Blocking pool
1370 let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
1371 let blocking_spawner = blocking_pool.spawner().clone();
1372
1373 // Generate a rng seed for this runtime.
1374 let seed_generator_1 = self.seed_generator.next_generator();
1375 let seed_generator_2 = self.seed_generator.next_generator();
1376
1377 // And now put a single-threaded scheduler on top of the timer. When
1378 // there are no futures ready to do something, it'll let the timer or
1379 // the reactor to generate some new stimuli for the futures to continue
1380 // in their life.
1381 let (scheduler, handle) = CurrentThread::new(
1382 driver,
1383 driver_handle,
1384 blocking_spawner,
1385 seed_generator_2,
1386 Config {
1387 before_park: self.before_park.clone(),
1388 after_unpark: self.after_unpark.clone(),
1389 before_spawn: self.before_spawn.clone(),
1390 after_termination: self.after_termination.clone(),
1391 global_queue_interval: self.global_queue_interval,
1392 event_interval: self.event_interval,
1393 local_queue_capacity: self.local_queue_capacity,
1394 #[cfg(tokio_unstable)]
1395 unhandled_panic: self.unhandled_panic.clone(),
1396 disable_lifo_slot: self.disable_lifo_slot,
1397 seed_generator: seed_generator_1,
1398 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1399 },
1400 local_tid,
1401 );
1402
1403 let handle = Handle {
1404 inner: scheduler::Handle::CurrentThread(handle),
1405 };
1406
1407 Ok((scheduler, handle, blocking_pool))
1408 }
1409
1410 fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1411 if self.metrics_poll_count_histogram_enable {
1412 Some(self.metrics_poll_count_histogram.clone())
1413 } else {
1414 None
1415 }
1416 }
1417}
1418
1419cfg_io_driver! {
1420 impl Builder {
1421 /// Enables the I/O driver.
1422 ///
1423 /// Doing this enables using net, process, signal, and some I/O types on
1424 /// the runtime.
1425 ///
1426 /// # Examples
1427 ///
1428 /// ```
1429 /// use tokio::runtime;
1430 ///
1431 /// let rt = runtime::Builder::new_multi_thread()
1432 /// .enable_io()
1433 /// .build()
1434 /// .unwrap();
1435 /// ```
1436 pub fn enable_io(&mut self) -> &mut Self {
1437 self.enable_io = true;
1438 self
1439 }
1440
1441 /// Enables the I/O driver and configures the max number of events to be
1442 /// processed per tick.
1443 ///
1444 /// # Examples
1445 ///
1446 /// ```
1447 /// use tokio::runtime;
1448 ///
1449 /// let rt = runtime::Builder::new_current_thread()
1450 /// .enable_io()
1451 /// .max_io_events_per_tick(1024)
1452 /// .build()
1453 /// .unwrap();
1454 /// ```
1455 pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
1456 self.nevents = capacity;
1457 self
1458 }
1459 }
1460}
1461
1462cfg_time! {
1463 impl Builder {
1464 /// Enables the time driver.
1465 ///
1466 /// Doing this enables using `tokio::time` on the runtime.
1467 ///
1468 /// # Examples
1469 ///
1470 /// ```
1471 /// use tokio::runtime;
1472 ///
1473 /// let rt = runtime::Builder::new_multi_thread()
1474 /// .enable_time()
1475 /// .build()
1476 /// .unwrap();
1477 /// ```
1478 pub fn enable_time(&mut self) -> &mut Self {
1479 self.enable_time = true;
1480 self
1481 }
1482 }
1483}
1484
1485cfg_test_util! {
1486 impl Builder {
1487 /// Controls if the runtime's clock starts paused or advancing.
1488 ///
1489 /// Pausing time requires the current-thread runtime; construction of
1490 /// the runtime will panic otherwise.
1491 ///
1492 /// # Examples
1493 ///
1494 /// ```
1495 /// use tokio::runtime;
1496 ///
1497 /// let rt = runtime::Builder::new_current_thread()
1498 /// .enable_time()
1499 /// .start_paused(true)
1500 /// .build()
1501 /// .unwrap();
1502 /// ```
1503 pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
1504 self.start_paused = start_paused;
1505 self
1506 }
1507 }
1508}
1509
1510cfg_rt_multi_thread! {
1511 impl Builder {
1512 fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
1513 use crate::loom::sys::num_cpus;
1514 use crate::runtime::{Config, runtime::Scheduler};
1515 use crate::runtime::scheduler::{self, MultiThread};
1516
1517 let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
1518
1519 let (driver, driver_handle) = driver::Driver::new(self.get_cfg(core_threads))?;
1520
1521 // Create the blocking pool
1522 let blocking_pool =
1523 blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
1524 let blocking_spawner = blocking_pool.spawner().clone();
1525
1526 // Generate a rng seed for this runtime.
1527 let seed_generator_1 = self.seed_generator.next_generator();
1528 let seed_generator_2 = self.seed_generator.next_generator();
1529
1530 let (scheduler, handle, launch) = MultiThread::new(
1531 core_threads,
1532 driver,
1533 driver_handle,
1534 blocking_spawner,
1535 seed_generator_2,
1536 Config {
1537 before_park: self.before_park.clone(),
1538 after_unpark: self.after_unpark.clone(),
1539 before_spawn: self.before_spawn.clone(),
1540 after_termination: self.after_termination.clone(),
1541 global_queue_interval: self.global_queue_interval,
1542 event_interval: self.event_interval,
1543 local_queue_capacity: self.local_queue_capacity,
1544 #[cfg(tokio_unstable)]
1545 unhandled_panic: self.unhandled_panic.clone(),
1546 disable_lifo_slot: self.disable_lifo_slot,
1547 seed_generator: seed_generator_1,
1548 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1549 },
1550 );
1551
1552 let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
1553
1554 // Spawn the thread pool workers
1555 let _enter = handle.enter();
1556 launch.launch();
1557
1558 Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
1559 }
1560
1561 cfg_unstable! {
1562 fn build_alt_threaded_runtime(&mut self) -> io::Result<Runtime> {
1563 use crate::loom::sys::num_cpus;
1564 use crate::runtime::{Config, runtime::Scheduler};
1565 use crate::runtime::scheduler::MultiThreadAlt;
1566
1567 let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
1568 let (driver, driver_handle) = driver::Driver::new(self.get_cfg(core_threads))?;
1569
1570 // Create the blocking pool
1571 let blocking_pool =
1572 blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
1573 let blocking_spawner = blocking_pool.spawner().clone();
1574
1575 // Generate a rng seed for this runtime.
1576 let seed_generator_1 = self.seed_generator.next_generator();
1577 let seed_generator_2 = self.seed_generator.next_generator();
1578
1579 let (scheduler, handle) = MultiThreadAlt::new(
1580 core_threads,
1581 driver,
1582 driver_handle,
1583 blocking_spawner,
1584 seed_generator_2,
1585 Config {
1586 before_park: self.before_park.clone(),
1587 after_unpark: self.after_unpark.clone(),
1588 before_spawn: self.before_spawn.clone(),
1589 after_termination: self.after_termination.clone(),
1590 global_queue_interval: self.global_queue_interval,
1591 event_interval: self.event_interval,
1592 local_queue_capacity: self.local_queue_capacity,
1593 #[cfg(tokio_unstable)]
1594 unhandled_panic: self.unhandled_panic.clone(),
1595 disable_lifo_slot: self.disable_lifo_slot,
1596 seed_generator: seed_generator_1,
1597 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1598 },
1599 );
1600
1601 Ok(Runtime::from_parts(Scheduler::MultiThreadAlt(scheduler), handle, blocking_pool))
1602 }
1603 }
1604 }
1605}
1606
1607impl fmt::Debug for Builder {
1608 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1609 fmt.debug_struct("Builder")
1610 .field("worker_threads", &self.worker_threads)
1611 .field("max_blocking_threads", &self.max_blocking_threads)
1612 .field(
1613 "thread_name",
1614 &"<dyn Fn() -> String + Send + Sync + 'static>",
1615 )
1616 .field("thread_stack_size", &self.thread_stack_size)
1617 .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1618 .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1619 .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1620 .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1621 .finish()
1622 }
1623}