tokio/runtime/scheduler/multi_thread/worker.rs

//! A scheduler is initialized with a fixed number of workers. Each worker is
//! driven by a thread. Each worker has a "core" which contains data such as the
//! run queue and other state. When `block_in_place` is called, the worker's
//! "core" is handed off to a new thread, allowing the scheduler to continue to
//! make progress while the originating thread blocks.
//!
//! # Shutdown
//!
//! Shutting down the runtime involves the following steps:
//!
//!  1. The `Shared::close` method is called. This closes the inject queue and
//!     the `OwnedTasks` instance and wakes up all worker threads.
//!
//!  2. Each worker thread observes the close signal the next time it runs
//!     `Core::maintenance` by checking whether the inject queue is closed.
//!     The `Core::is_shutdown` flag is set to true.
//!
//!  3. Each worker thread calls `pre_shutdown` in parallel. Here, the worker
//!     will keep removing tasks from `OwnedTasks` until it is empty. No new
//!     tasks can be pushed to the `OwnedTasks` during or after this step as it
//!     was closed in step 1.
//!
//!  4. The workers call `Shared::shutdown` to enter the single-threaded phase of
//!     shutdown. These calls push their cores to `Shared::shutdown_cores`,
//!     and the last thread to push its core finishes the shutdown procedure.
//!
//!  5. The local run queue of each core is emptied, then the inject queue is
//!     emptied.
//!
//! At this point, shutdown has completed. It is not possible for any of the
//! collections to contain any tasks, as each collection was closed first and
//! emptied afterwards.
//!
//! ## Spawns during shutdown
//!
//! When spawning tasks during shutdown, there are two cases:
//!
//!  * The spawner observes the `OwnedTasks` being open and the inject queue
//!    being closed.
//!  * The spawner observes the `OwnedTasks` being closed and doesn't check the
//!    inject queue.
//!
//! The first case can only happen if the `OwnedTasks::bind` call happens before
//! or during step 1 of shutdown. In this case, the runtime will clean up the
//! task in step 3 of shutdown.
//!
//! In the second case, the task was not spawned and is immediately cancelled by
//! the spawner.
//!
//! The correctness of shutdown requires both the inject queue and the `OwnedTasks`
//! collection to have a closed bit. With a close bit on only the inject queue,
//! spawning could run into a situation where a task is successfully bound long
//! after the runtime has shut down. With a close bit on only the `OwnedTasks`,
//! the first spawning situation could result in the notification being pushed
//! to the inject queue after step 5 of shutdown, which would leave a task in
//! the inject queue indefinitely. This would be a ref-count cycle and a memory
//! leak.
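//!
//! ## Example: triggering shutdown
//!
//! Dropping the runtime (or calling one of its explicit shutdown methods) from
//! user code is what eventually drives step 1 above. A minimal sketch using
//! only public `tokio` APIs; the worker count and timeout are arbitrary:
//!
//! ```ignore
//! use std::time::Duration;
//! use tokio::runtime::Builder;
//!
//! fn main() {
//!     // Build a multi-threaded runtime with a fixed number of workers.
//!     let rt = Builder::new_multi_thread()
//!         .worker_threads(4)
//!         .enable_all()
//!         .build()
//!         .unwrap();
//!
//!     rt.block_on(async {
//!         // Spawn some work onto the worker run queues.
//!         for i in 0..8 {
//!             tokio::spawn(async move {
//!                 let _ = i;
//!             });
//!         }
//!     });
//!
//!     // Start the shutdown steps described above, waiting up to one second
//!     // for blocking operations before giving up on them.
//!     rt.shutdown_timeout(Duration::from_secs(1));
//! }
//! ```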
59use crate::loom::sync::{Arc, Mutex};
60use crate::runtime;
61use crate::runtime::scheduler::multi_thread::{
62    idle, queue, Counters, Handle, Idle, Overflow, Parker, Stats, TraceStatus, Unparker,
63};
64use crate::runtime::scheduler::{inject, Defer, Lock};
65use crate::runtime::task::{OwnedTasks, TaskHarnessScheduleHooks};
66use crate::runtime::{
67    blocking, coop, driver, scheduler, task, Config, SchedulerMetrics, WorkerMetrics,
68};
69use crate::runtime::{context, TaskHooks};
70use crate::util::atomic_cell::AtomicCell;
71use crate::util::rand::{FastRand, RngSeedGenerator};
72
73use std::cell::RefCell;
74use std::task::Waker;
75use std::thread;
76use std::time::Duration;
77
78mod metrics;
79
80cfg_taskdump! {
81    mod taskdump;
82}
83
84cfg_not_taskdump! {
85    mod taskdump_mock;
86}
87
88/// A scheduler worker
89pub(super) struct Worker {
90    /// Reference to scheduler's handle
91    handle: Arc<Handle>,
92
93    /// Index holding this worker's remote state
94    index: usize,
95
96    /// Used to hand-off a worker's core to another thread.
97    core: AtomicCell<Core>,
98}
99
100/// Core data
101struct Core {
102    /// Used to schedule bookkeeping tasks every so often.
103    tick: u32,
104
    /// When a task is scheduled from a worker, it is stored in this slot. The
    /// worker will check this slot for a task **before** checking the run
    /// queue. This effectively results in the **last** scheduled task being
    /// run next (LIFO). This is an optimization that improves locality, which
    /// benefits message passing patterns and helps to reduce latency.
110    lifo_slot: Option<Notified>,
111
112    /// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
113    /// they go to the back of the `run_queue`.
114    lifo_enabled: bool,
115
116    /// The worker-local run queue.
117    run_queue: queue::Local<Arc<Handle>>,
118
119    /// True if the worker is currently searching for more work. Searching
120    /// involves attempting to steal from other workers.
121    is_searching: bool,
122
123    /// True if the scheduler is being shutdown
124    is_shutdown: bool,
125
126    /// True if the scheduler is being traced
127    is_traced: bool,
128
129    /// Parker
130    ///
131    /// Stored in an `Option` as the parker is added / removed to make the
132    /// borrow checker happy.
133    park: Option<Parker>,
134
135    /// Per-worker runtime stats
136    stats: Stats,
137
138    /// How often to check the global queue
139    global_queue_interval: u32,
140
141    /// Fast random number generator.
142    rand: FastRand,
143}
144
145/// State shared across all workers
146pub(crate) struct Shared {
    /// Per-worker remote state. All other workers have access to this, and it
    /// is how they communicate with each other.
149    remotes: Box<[Remote]>,
150
    /// Global task queue used to:
    ///  1. Submit work to the scheduler while **not** currently on a worker thread.
    ///  2. Submit work to the scheduler when a worker run queue is saturated.
154    pub(super) inject: inject::Shared<Arc<Handle>>,
155
156    /// Coordinates idle workers
157    idle: Idle,
158
159    /// Collection of all active tasks spawned onto this executor.
160    pub(crate) owned: OwnedTasks<Arc<Handle>>,
161
162    /// Data synchronized by the scheduler mutex
163    pub(super) synced: Mutex<Synced>,
164
165    /// Cores that have observed the shutdown signal
166    ///
    /// The core is **not** placed back in the worker to prevent it from being
    /// stolen by a thread that was spawned as part of `block_in_place`.
169    #[allow(clippy::vec_box)] // we're moving an already-boxed value
170    shutdown_cores: Mutex<Vec<Box<Core>>>,
171
172    /// The number of cores that have observed the trace signal.
173    pub(super) trace_status: TraceStatus,
174
175    /// Scheduler configuration options
176    config: Config,
177
178    /// Collects metrics from the runtime.
179    pub(super) scheduler_metrics: SchedulerMetrics,
180
181    pub(super) worker_metrics: Box<[WorkerMetrics]>,
182
183    /// Only held to trigger some code on drop. This is used to get internal
184    /// runtime metrics that can be useful when doing performance
185    /// investigations. This does nothing (empty struct, no drop impl) unless
186    /// the `tokio_internal_mt_counters` `cfg` flag is set.
187    _counters: Counters,
188}
189
190/// Data synchronized by the scheduler mutex
191pub(crate) struct Synced {
192    /// Synchronized state for `Idle`.
193    pub(super) idle: idle::Synced,
194
195    /// Synchronized state for `Inject`.
196    pub(crate) inject: inject::Synced,
197}
198
199/// Used to communicate with a worker from other threads.
200struct Remote {
201    /// Steals tasks from this worker.
202    pub(super) steal: queue::Steal<Arc<Handle>>,
203
204    /// Unparks the associated worker thread
205    unpark: Unparker,
206}
207
208/// Thread-local context
209pub(crate) struct Context {
210    /// Worker
211    worker: Arc<Worker>,
212
213    /// Core data
214    core: RefCell<Option<Box<Core>>>,
215
216    /// Tasks to wake after resource drivers are polled. This is mostly to
217    /// handle yielded tasks.
218    pub(crate) defer: Defer,
219}
220
221/// Starts the workers
222pub(crate) struct Launch(Vec<Arc<Worker>>);
223
224/// Running a task may consume the core. If the core is still available when
225/// running the task completes, it is returned. Otherwise, the worker will need
226/// to stop processing.
227type RunResult = Result<Box<Core>, ()>;
228
229/// A task handle
230type Task = task::Task<Arc<Handle>>;
231
232/// A notified task handle
233type Notified = task::Notified<Arc<Handle>>;
234
/// Value picked out of thin air. Running the LIFO slot a handful of times
/// seems sufficient to benefit from locality. More than 3 times is probably
/// overdoing it. The value can be tuned in the future with data that shows
/// improvements.
239const MAX_LIFO_POLLS_PER_TICK: usize = 3;
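// Illustrative only: the kind of "ping-pong" message passing that the LIFO slot
// (and the cap above) is aimed at. Task A's send wakes task B, whose reply wakes
// task A again; without the cap, the pair could monopolize a worker. A sketch
// using public APIs; channel sizes and the iteration bound are arbitrary:
//
// ```ignore
// use tokio::sync::mpsc;
//
// async fn ping_pong() {
//     let (to_a, mut from_b) = mpsc::channel::<u32>(1);
//     let (to_b, mut from_a) = mpsc::channel::<u32>(1);
//
//     // Task B: echo every message back, incremented.
//     tokio::spawn(async move {
//         while let Some(n) = from_a.recv().await {
//             if to_a.send(n + 1).await.is_err() {
//                 break;
//             }
//         }
//     });
//
//     // Task A: start the exchange and bounce a bounded number of times.
//     to_b.send(0).await.unwrap();
//     while let Some(n) = from_b.recv().await {
//         if n > 100 {
//             break;
//         }
//         to_b.send(n + 1).await.unwrap();
//     }
// }
// ```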
240
241pub(super) fn create(
242    size: usize,
243    park: Parker,
244    driver_handle: driver::Handle,
245    blocking_spawner: blocking::Spawner,
246    seed_generator: RngSeedGenerator,
247    config: Config,
248) -> (Arc<Handle>, Launch) {
249    let mut cores = Vec::with_capacity(size);
250    let mut remotes = Vec::with_capacity(size);
251    let mut worker_metrics = Vec::with_capacity(size);
252
253    // Create the local queues
254    for _ in 0..size {
255        let (steal, run_queue) = queue::local();
256
257        let park = park.clone();
258        let unpark = park.unpark();
259        let metrics = WorkerMetrics::from_config(&config);
260        let stats = Stats::new(&metrics);
261
262        cores.push(Box::new(Core {
263            tick: 0,
264            lifo_slot: None,
265            lifo_enabled: !config.disable_lifo_slot,
266            run_queue,
267            is_searching: false,
268            is_shutdown: false,
269            is_traced: false,
270            park: Some(park),
271            global_queue_interval: stats.tuned_global_queue_interval(&config),
272            stats,
273            rand: FastRand::from_seed(config.seed_generator.next_seed()),
274        }));
275
276        remotes.push(Remote { steal, unpark });
277        worker_metrics.push(metrics);
278    }
279
280    let (idle, idle_synced) = Idle::new(size);
281    let (inject, inject_synced) = inject::Shared::new();
282
283    let remotes_len = remotes.len();
284    let handle = Arc::new(Handle {
285        task_hooks: TaskHooks {
286            task_spawn_callback: config.before_spawn.clone(),
287            task_terminate_callback: config.after_termination.clone(),
288        },
289        shared: Shared {
290            remotes: remotes.into_boxed_slice(),
291            inject,
292            idle,
293            owned: OwnedTasks::new(size),
294            synced: Mutex::new(Synced {
295                idle: idle_synced,
296                inject: inject_synced,
297            }),
298            shutdown_cores: Mutex::new(vec![]),
299            trace_status: TraceStatus::new(remotes_len),
300            config,
301            scheduler_metrics: SchedulerMetrics::new(),
302            worker_metrics: worker_metrics.into_boxed_slice(),
303            _counters: Counters,
304        },
305        driver: driver_handle,
306        blocking_spawner,
307        seed_generator,
308    });
309
310    let mut launch = Launch(vec![]);
311
312    for (index, core) in cores.drain(..).enumerate() {
313        launch.0.push(Arc::new(Worker {
314            handle: handle.clone(),
315            index,
316            core: AtomicCell::new(Some(core)),
317        }));
318    }
319
320    (handle, launch)
321}
322
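/// Runs the provided blocking closure in place, handing this worker's core off
/// to another thread (as described in the module docs) so that the scheduler
/// keeps making progress while the closure blocks.
///
/// A minimal usage sketch of the public wrapper, `tokio::task::block_in_place`,
/// which (on the multi-threaded runtime) ends up in this function; the file
/// read is just a placeholder for any blocking work:
///
/// ```ignore
/// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
/// async fn main() {
///     let contents = tokio::task::block_in_place(|| {
///         // Synchronous, potentially long-running work. The current worker's
///         // core has been handed to another thread, so other tasks keep running.
///         std::fs::read_to_string("Cargo.toml")
///     });
///     println!("read {} bytes", contents.unwrap().len());
/// }
/// ```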
323#[track_caller]
324pub(crate) fn block_in_place<F, R>(f: F) -> R
325where
326    F: FnOnce() -> R,
327{
328    // Try to steal the worker core back
329    struct Reset {
330        take_core: bool,
331        budget: coop::Budget,
332    }
333
334    impl Drop for Reset {
335        fn drop(&mut self) {
336            with_current(|maybe_cx| {
337                if let Some(cx) = maybe_cx {
338                    if self.take_core {
339                        let core = cx.worker.core.take();
340
341                        if core.is_some() {
342                            cx.worker.handle.shared.worker_metrics[cx.worker.index]
343                                .set_thread_id(thread::current().id());
344                        }
345
346                        let mut cx_core = cx.core.borrow_mut();
347                        assert!(cx_core.is_none());
348                        *cx_core = core;
349                    }
350
351                    // Reset the task budget as we are re-entering the
352                    // runtime.
353                    coop::set(self.budget);
354                }
355            });
356        }
357    }
358
359    let mut had_entered = false;
360    let mut take_core = false;
361
362    let setup_result = with_current(|maybe_cx| {
363        match (
364            crate::runtime::context::current_enter_context(),
365            maybe_cx.is_some(),
366        ) {
367            (context::EnterRuntime::Entered { .. }, true) => {
368                // We are on a thread pool runtime thread, so we just need to
369                // set up blocking.
370                had_entered = true;
371            }
372            (
373                context::EnterRuntime::Entered {
374                    allow_block_in_place,
375                },
376                false,
377            ) => {
378                // We are on an executor, but _not_ on the thread pool.  That is
379                // _only_ okay if we are in a thread pool runtime's block_on
380                // method:
381                if allow_block_in_place {
382                    had_entered = true;
383                    return Ok(());
384                } else {
385                    // This probably means we are on the current_thread runtime or in a
386                    // LocalSet, where it is _not_ okay to block.
387                    return Err(
388                        "can call blocking only when running on the multi-threaded runtime",
389                    );
390                }
391            }
392            (context::EnterRuntime::NotEntered, true) => {
393                // This is a nested call to block_in_place (we already exited).
394                // All the necessary setup has already been done.
395                return Ok(());
396            }
397            (context::EnterRuntime::NotEntered, false) => {
398                // We are outside of the tokio runtime, so blocking is fine.
399                // We can also skip all of the thread pool blocking setup steps.
400                return Ok(());
401            }
402        }
403
404        let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");
405
406        // Get the worker core. If none is set, then blocking is fine!
407        let mut core = match cx.core.borrow_mut().take() {
408            Some(core) => core,
409            None => return Ok(()),
410        };
411
412        // If we heavily call `spawn_blocking`, there might be no available thread to
413        // run this core. Except for the task in the lifo_slot, all tasks can be
414        // stolen, so we move the task out of the lifo_slot to the run_queue.
415        if let Some(task) = core.lifo_slot.take() {
416            core.run_queue
417                .push_back_or_overflow(task, &*cx.worker.handle, &mut core.stats);
418        }
419
420        // We are taking the core from the context and sending it to another
421        // thread.
422        take_core = true;
423
424        // The parker should be set here
425        assert!(core.park.is_some());
426
427        // In order to block, the core must be sent to another thread for
428        // execution.
429        //
430        // First, move the core back into the worker's shared core slot.
431        cx.worker.core.set(core);
432
433        // Next, clone the worker handle and send it to a new thread for
434        // processing.
435        //
436        // Once the blocking task is done executing, we will attempt to
437        // steal the core back.
438        let worker = cx.worker.clone();
439        runtime::spawn_blocking(move || run(worker));
440        Ok(())
441    });
442
443    if let Err(panic_message) = setup_result {
444        panic!("{}", panic_message);
445    }
446
447    if had_entered {
448        // Unset the current task's budget. Blocking sections are not
449        // constrained by task budgets.
450        let _reset = Reset {
451            take_core,
452            budget: coop::stop(),
453        };
454
455        crate::runtime::context::exit_runtime(f)
456    } else {
457        f()
458    }
459}
460
461impl Launch {
462    pub(crate) fn launch(mut self) {
463        for worker in self.0.drain(..) {
464            runtime::spawn_blocking(move || run(worker));
465        }
466    }
467}
468
469fn run(worker: Arc<Worker>) {
470    #[allow(dead_code)]
471    struct AbortOnPanic;
472
473    impl Drop for AbortOnPanic {
474        fn drop(&mut self) {
475            if std::thread::panicking() {
476                eprintln!("worker thread panicking; aborting process");
477                std::process::abort();
478            }
479        }
480    }
481
482    // Catching panics on worker threads in tests is quite tricky. Instead, when
483    // debug assertions are enabled, we just abort the process.
484    #[cfg(debug_assertions)]
485    let _abort_on_panic = AbortOnPanic;
486
487    // Acquire a core. If this fails, then another thread is running this
488    // worker and there is nothing further to do.
489    let core = match worker.core.take() {
490        Some(core) => core,
491        None => return,
492    };
493
494    worker.handle.shared.worker_metrics[worker.index].set_thread_id(thread::current().id());
495
496    let handle = scheduler::Handle::MultiThread(worker.handle.clone());
497
498    crate::runtime::context::enter_runtime(&handle, true, |_| {
499        // Set the worker context.
500        let cx = scheduler::Context::MultiThread(Context {
501            worker,
502            core: RefCell::new(None),
503            defer: Defer::new(),
504        });
505
506        context::set_scheduler(&cx, || {
507            let cx = cx.expect_multi_thread();
508
509            // This should always be an error. It only returns a `Result` to support
510            // using `?` to short circuit.
511            assert!(cx.run(core).is_err());
512
513            // Check if there are any deferred tasks to notify. This can happen when
514            // the worker core is lost due to `block_in_place()` being called from
515            // within the task.
516            cx.defer.wake();
517        });
518    });
519}
520
521impl Context {
522    fn run(&self, mut core: Box<Core>) -> RunResult {
523        // Reset `lifo_enabled` here in case the core was previously stolen from
524        // a task that had the LIFO slot disabled.
525        self.reset_lifo_enabled(&mut core);
526
        // Start in the "processing scheduled tasks" state, as polling tasks
        // from the local queue will be one of the first things we do.
529        core.stats.start_processing_scheduled_tasks();
530
531        while !core.is_shutdown {
532            self.assert_lifo_enabled_is_correct(&core);
533
534            if core.is_traced {
535                core = self.worker.handle.trace_core(core);
536            }
537
538            // Increment the tick
539            core.tick();
540
541            // Run maintenance, if needed
542            core = self.maintenance(core);
543
544            // First, check work available to the current worker.
545            if let Some(task) = core.next_task(&self.worker) {
546                core = self.run_task(task, core)?;
547                continue;
548            }
549
550            // We consumed all work in the queues and will start searching for work.
551            core.stats.end_processing_scheduled_tasks();
552
553            // There is no more **local** work to process, try to steal work
554            // from other workers.
555            if let Some(task) = core.steal_work(&self.worker) {
556                // Found work, switch back to processing
557                core.stats.start_processing_scheduled_tasks();
558                core = self.run_task(task, core)?;
559            } else {
560                // Wait for work
561                core = if !self.defer.is_empty() {
562                    self.park_timeout(core, Some(Duration::from_millis(0)))
563                } else {
564                    self.park(core)
565                };
566                core.stats.start_processing_scheduled_tasks();
567            }
568        }
569
570        core.pre_shutdown(&self.worker);
571        // Signal shutdown
572        self.worker.handle.shutdown_core(core);
573        Err(())
574    }
575
576    fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
577        let task = self.worker.handle.shared.owned.assert_owner(task);
578
579        // Make sure the worker is not in the **searching** state. This enables
580        // another idle worker to try to steal work.
581        core.transition_from_searching(&self.worker);
582
583        self.assert_lifo_enabled_is_correct(&core);
584
        // Measure the poll start time. Note that we may end up polling other
        // tasks under this measurement. In this case, the tasks came from the
        // LIFO slot and are considered part of the current task for scheduling
        // purposes. These tasks inherit the "parent" task's limits.
589        core.stats.start_poll();
590
591        // Make the core available to the runtime context
592        *self.core.borrow_mut() = Some(core);
593
594        // Run the task
595        coop::budget(|| {
596            task.run();
597            let mut lifo_polls = 0;
598
599            // As long as there is budget remaining and a task exists in the
600            // `lifo_slot`, then keep running.
601            loop {
602                // Check if we still have the core. If not, the core was stolen
603                // by another worker.
604                let mut core = match self.core.borrow_mut().take() {
605                    Some(core) => core,
606                    None => {
607                        // In this case, we cannot call `reset_lifo_enabled()`
608                        // because the core was stolen. The stealer will handle
609                        // that at the top of `Context::run`
610                        return Err(());
611                    }
612                };
613
614                // Check for a task in the LIFO slot
615                let task = match core.lifo_slot.take() {
616                    Some(task) => task,
617                    None => {
618                        self.reset_lifo_enabled(&mut core);
619                        core.stats.end_poll();
620                        return Ok(core);
621                    }
622                };
623
624                if !coop::has_budget_remaining() {
625                    core.stats.end_poll();
626
627                    // Not enough budget left to run the LIFO task, push it to
628                    // the back of the queue and return.
629                    core.run_queue.push_back_or_overflow(
630                        task,
631                        &*self.worker.handle,
632                        &mut core.stats,
633                    );
634                    // If we hit this point, the LIFO slot should be enabled.
635                    // There is no need to reset it.
636                    debug_assert!(core.lifo_enabled);
637                    return Ok(core);
638                }
639
640                // Track that we are about to run a task from the LIFO slot.
641                lifo_polls += 1;
642                super::counters::inc_lifo_schedules();
643
644                // Disable the LIFO slot if we reach our limit
645                //
                // In ping-pong style workloads where task A notifies task B,
                // which notifies task A again, continuously prioritizing the
                // LIFO slot can cause starvation as these two tasks will
                // repeatedly schedule each other. To mitigate this, we limit the
                // number of times the LIFO slot is prioritized.
651                if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
652                    core.lifo_enabled = false;
653                    super::counters::inc_lifo_capped();
654                }
655
656                // Run the LIFO task, then loop
657                *self.core.borrow_mut() = Some(core);
658                let task = self.worker.handle.shared.owned.assert_owner(task);
659                task.run();
660            }
661        })
662    }
663
664    fn reset_lifo_enabled(&self, core: &mut Core) {
665        core.lifo_enabled = !self.worker.handle.shared.config.disable_lifo_slot;
666    }
667
668    fn assert_lifo_enabled_is_correct(&self, core: &Core) {
669        debug_assert_eq!(
670            core.lifo_enabled,
671            !self.worker.handle.shared.config.disable_lifo_slot
672        );
673    }
674
675    fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
676        if core.tick % self.worker.handle.shared.config.event_interval == 0 {
677            super::counters::inc_num_maintenance();
678
679            core.stats.end_processing_scheduled_tasks();
680
681            // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
682            // to run without actually putting the thread to sleep.
683            core = self.park_timeout(core, Some(Duration::from_millis(0)));
684
685            // Run regularly scheduled maintenance
686            core.maintenance(&self.worker);
687
688            core.stats.start_processing_scheduled_tasks();
689        }
690
691        core
692    }
693
    /// Parks the worker thread while waiting for tasks to execute.
    ///
    /// This function checks that there is indeed no more work left to be done
    /// before parking. Note that, before parking, the worker thread will try to
    /// take ownership of the Driver (IO/Time) and dispatch any events that might
    /// have fired. Whenever a worker thread executes the Driver loop, all woken
    /// tasks are scheduled in its own local queue until the queue saturates
    /// (ntasks > `LOCAL_QUEUE_CAPACITY`). When the local queue is saturated, the
    /// overflowing tasks are added to the injection queue, from where other
    /// workers can pick them up. Finally, we rely on the work-stealing algorithm
    /// to spread the tasks amongst workers after all the I/O events have been
    /// dispatched.
705    fn park(&self, mut core: Box<Core>) -> Box<Core> {
706        if let Some(f) = &self.worker.handle.shared.config.before_park {
707            f();
708        }
709
710        if core.transition_to_parked(&self.worker) {
711            while !core.is_shutdown && !core.is_traced {
712                core.stats.about_to_park();
713                core.stats
714                    .submit(&self.worker.handle.shared.worker_metrics[self.worker.index]);
715
716                core = self.park_timeout(core, None);
717
718                core.stats.unparked();
719
720                // Run regularly scheduled maintenance
721                core.maintenance(&self.worker);
722
723                if core.transition_from_parked(&self.worker) {
724                    break;
725                }
726            }
727        }
728
729        if let Some(f) = &self.worker.handle.shared.config.after_unpark {
730            f();
731        }
732        core
733    }
734
735    fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
736        self.assert_lifo_enabled_is_correct(&core);
737
738        // Take the parker out of core
739        let mut park = core.park.take().expect("park missing");
740
741        // Store `core` in context
742        *self.core.borrow_mut() = Some(core);
743
744        // Park thread
745        if let Some(timeout) = duration {
746            park.park_timeout(&self.worker.handle.driver, timeout);
747        } else {
748            park.park(&self.worker.handle.driver);
749        }
750
751        self.defer.wake();
752
753        // Remove `core` from context
754        core = self.core.borrow_mut().take().expect("core missing");
755
756        // Place `park` back in `core`
757        core.park = Some(park);
758
759        if core.should_notify_others() {
760            self.worker.handle.notify_parked_local();
761        }
762
763        core
764    }
765
766    pub(crate) fn defer(&self, waker: &Waker) {
767        self.defer.defer(waker);
768    }
769
770    #[allow(dead_code)]
771    pub(crate) fn get_worker_index(&self) -> usize {
772        self.worker.index
773    }
774}
775
776impl Core {
777    /// Increment the tick
778    fn tick(&mut self) {
779        self.tick = self.tick.wrapping_add(1);
780    }
781
782    /// Return the next notified task available to this worker.
783    fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
784        if self.tick % self.global_queue_interval == 0 {
785            // Update the global queue interval, if needed
786            self.tune_global_queue_interval(worker);
787
788            worker
789                .handle
790                .next_remote_task()
791                .or_else(|| self.next_local_task())
792        } else {
793            let maybe_task = self.next_local_task();
794
795            if maybe_task.is_some() {
796                return maybe_task;
797            }
798
799            if worker.inject().is_empty() {
800                return None;
801            }
802
803            // Other threads can only **remove** tasks from the current worker's
804            // `run_queue`. So, we can be confident that by the time we call
805            // `run_queue.push_back` below, there will be *at least* `cap`
806            // available slots in the queue.
807            let cap = usize::min(
808                self.run_queue.remaining_slots(),
809                self.run_queue.max_capacity() / 2,
810            );
811
812            // The worker is currently idle, pull a batch of work from the
813            // injection queue. We don't want to pull *all* the work so other
814            // workers can also get some.
815            let n = usize::min(
816                worker.inject().len() / worker.handle.shared.remotes.len() + 1,
817                cap,
818            );
819
820            // Take at least one task since the first task is returned directly
821            // and not pushed onto the local queue.
822            let n = usize::max(1, n);
823
824            let mut synced = worker.handle.shared.synced.lock();
825            // safety: passing in the correct `inject::Synced`.
826            let mut tasks = unsafe { worker.inject().pop_n(&mut synced.inject, n) };
827
828            // Pop the first task to return immediately
829            let ret = tasks.next();
830
            // Push the rest of the tasks onto the run queue
832            self.run_queue.push_back(tasks);
833
834            ret
835        }
836    }
837
838    fn next_local_task(&mut self) -> Option<Notified> {
839        self.lifo_slot.take().or_else(|| self.run_queue.pop())
840    }
841
    /// Function responsible for stealing tasks from another worker.
    ///
    /// Note: a worker will only attempt to steal if fewer than half of the
    /// workers are already searching for tasks to steal. The idea is to make
    /// sure not all workers are trying to steal at the same time.
847    fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
848        if !self.transition_to_searching(worker) {
849            return None;
850        }
851
852        let num = worker.handle.shared.remotes.len();
853        // Start from a random worker
854        let start = self.rand.fastrand_n(num as u32) as usize;
855
856        for i in 0..num {
857            let i = (start + i) % num;
858
859            // Don't steal from ourself! We know we don't have work.
860            if i == worker.index {
861                continue;
862            }
863
864            let target = &worker.handle.shared.remotes[i];
865            if let Some(task) = target
866                .steal
867                .steal_into(&mut self.run_queue, &mut self.stats)
868            {
869                return Some(task);
870            }
871        }
872
        // Fall back on checking the global queue
874        worker.handle.next_remote_task()
875    }
876
877    fn transition_to_searching(&mut self, worker: &Worker) -> bool {
878        if !self.is_searching {
879            self.is_searching = worker.handle.shared.idle.transition_worker_to_searching();
880        }
881
882        self.is_searching
883    }
884
885    fn transition_from_searching(&mut self, worker: &Worker) {
886        if !self.is_searching {
887            return;
888        }
889
890        self.is_searching = false;
891        worker.handle.transition_worker_from_searching();
892    }
893
894    fn has_tasks(&self) -> bool {
895        self.lifo_slot.is_some() || self.run_queue.has_tasks()
896    }
897
898    fn should_notify_others(&self) -> bool {
899        // If there are tasks available to steal, but this worker is not
900        // looking for tasks to steal, notify another worker.
901        if self.is_searching {
902            return false;
903        }
904        self.lifo_slot.is_some() as usize + self.run_queue.len() > 1
905    }
906
907    /// Prepares the worker state for parking.
908    ///
909    /// Returns true if the transition happened, false if there is work to do first.
910    fn transition_to_parked(&mut self, worker: &Worker) -> bool {
911        // Workers should not park if they have work to do
912        if self.has_tasks() || self.is_traced {
913            return false;
914        }
915
916        // When the final worker transitions **out** of searching to parked, it
917        // must check all the queues one last time in case work materialized
918        // between the last work scan and transitioning out of searching.
919        let is_last_searcher = worker.handle.shared.idle.transition_worker_to_parked(
920            &worker.handle.shared,
921            worker.index,
922            self.is_searching,
923        );
924
        // The worker is no longer searching. Setting this only updates the
        // locally cached state.
927        self.is_searching = false;
928
929        if is_last_searcher {
930            worker.handle.notify_if_work_pending();
931        }
932
933        true
934    }
935
936    /// Returns `true` if the transition happened.
937    fn transition_from_parked(&mut self, worker: &Worker) -> bool {
        // If a task is in the lifo slot/run queue, then we must unpark regardless
        // of whether we were notified.
        if self.has_tasks() {
            // When a worker wakes, it should only transition to the "searching"
            // state when the wake originates from another worker *or* a new task
            // is pushed. We do *not* want the worker to transition to "searching"
            // when it wakes because the I/O driver received new events.
945            self.is_searching = !worker
946                .handle
947                .shared
948                .idle
949                .unpark_worker_by_id(&worker.handle.shared, worker.index);
950            return true;
951        }
952
953        if worker
954            .handle
955            .shared
956            .idle
957            .is_parked(&worker.handle.shared, worker.index)
958        {
959            return false;
960        }
961
962        // When unparked, the worker is in the searching state.
963        self.is_searching = true;
964        true
965    }
966
967    /// Runs maintenance work such as checking the pool's state.
968    fn maintenance(&mut self, worker: &Worker) {
969        self.stats
970            .submit(&worker.handle.shared.worker_metrics[worker.index]);
971
972        if !self.is_shutdown {
973            // Check if the scheduler has been shutdown
974            let synced = worker.handle.shared.synced.lock();
975            self.is_shutdown = worker.inject().is_closed(&synced.inject);
976        }
977
978        if !self.is_traced {
979            // Check if the worker should be tracing.
980            self.is_traced = worker.handle.shared.trace_status.trace_requested();
981        }
982    }
983
984    /// Signals all tasks to shut down, and waits for them to complete. Must run
985    /// before we enter the single-threaded phase of shutdown processing.
986    fn pre_shutdown(&mut self, worker: &Worker) {
987        // Start from a random inner list
988        let start = self
989            .rand
990            .fastrand_n(worker.handle.shared.owned.get_shard_size() as u32);
991        // Signal to all tasks to shut down.
992        worker
993            .handle
994            .shared
995            .owned
996            .close_and_shutdown_all(start as usize);
997
998        self.stats
999            .submit(&worker.handle.shared.worker_metrics[worker.index]);
1000    }
1001
1002    /// Shuts down the core.
1003    fn shutdown(&mut self, handle: &Handle) {
        // Take the parker out of the core
1005        let mut park = self.park.take().expect("park missing");
1006
1007        // Drain the queue
1008        while self.next_local_task().is_some() {}
1009
1010        park.shutdown(&handle.driver);
1011    }
1012
1013    fn tune_global_queue_interval(&mut self, worker: &Worker) {
1014        let next = self
1015            .stats
1016            .tuned_global_queue_interval(&worker.handle.shared.config);
1017
1018        // Smooth out jitter
1019        if u32::abs_diff(self.global_queue_interval, next) > 2 {
1020            self.global_queue_interval = next;
1021        }
1022    }
1023}
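// Illustrative only: both intervals used by the scheduler above are tunable
// through the public runtime builder (`global_queue_interval` controls how
// often a worker checks the injection queue; `event_interval` controls how
// often it runs maintenance and polls the driver). The values below are
// arbitrary examples, not recommendations:
//
// ```ignore
// use tokio::runtime::Builder;
//
// let rt = Builder::new_multi_thread()
//     .worker_threads(4)
//     // Check the global (injection) queue every 31 ticks instead of relying
//     // on the adaptive value computed by `tuned_global_queue_interval`.
//     .global_queue_interval(31)
//     // Run maintenance (driver poll, shutdown/trace checks) every 61 ticks.
//     .event_interval(61)
//     .enable_all()
//     .build()
//     .unwrap();
// ```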
1024
1025impl Worker {
1026    /// Returns a reference to the scheduler's injection queue.
1027    fn inject(&self) -> &inject::Shared<Arc<Handle>> {
1028        &self.handle.shared.inject
1029    }
1030}
1031
1032// TODO: Move `Handle` impls into handle.rs
1033impl task::Schedule for Arc<Handle> {
1034    fn release(&self, task: &Task) -> Option<Task> {
1035        self.shared.owned.remove(task)
1036    }
1037
1038    fn schedule(&self, task: Notified) {
1039        self.schedule_task(task, false);
1040    }
1041
1042    fn hooks(&self) -> TaskHarnessScheduleHooks {
1043        TaskHarnessScheduleHooks {
1044            task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
1045        }
1046    }
1047
1048    fn yield_now(&self, task: Notified) {
1049        self.schedule_task(task, true);
1050    }
1051}
1052
1053impl Handle {
1054    pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
1055        with_current(|maybe_cx| {
1056            if let Some(cx) = maybe_cx {
1057                // Make sure the task is part of the **current** scheduler.
1058                if self.ptr_eq(&cx.worker.handle) {
1059                    // And the current thread still holds a core
1060                    if let Some(core) = cx.core.borrow_mut().as_mut() {
1061                        self.schedule_local(core, task, is_yield);
1062                        return;
1063                    }
1064                }
1065            }
1066
1067            // Otherwise, use the inject queue.
1068            self.push_remote_task(task);
1069            self.notify_parked_remote();
1070        });
1071    }
1072
1073    pub(super) fn schedule_option_task_without_yield(&self, task: Option<Notified>) {
1074        if let Some(task) = task {
1075            self.schedule_task(task, false);
1076        }
1077    }
1078
1079    fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
1080        core.stats.inc_local_schedule_count();
1081
1082        // Spawning from the worker thread. If scheduling a "yield" then the
1083        // task must always be pushed to the back of the queue, enabling other
1084        // tasks to be executed. If **not** a yield, then there is more
1085        // flexibility and the task may go to the front of the queue.
1086        let should_notify = if is_yield || !core.lifo_enabled {
1087            core.run_queue
1088                .push_back_or_overflow(task, self, &mut core.stats);
1089            true
1090        } else {
1091            // Push to the LIFO slot
1092            let prev = core.lifo_slot.take();
1093            let ret = prev.is_some();
1094
1095            if let Some(prev) = prev {
1096                core.run_queue
1097                    .push_back_or_overflow(prev, self, &mut core.stats);
1098            }
1099
1100            core.lifo_slot = Some(task);
1101
1102            ret
1103        };
1104
1105        // Only notify if not currently parked. If `park` is `None`, then the
1106        // scheduling is from a resource driver. As notifications often come in
1107        // batches, the notification is delayed until the park is complete.
1108        if should_notify && core.park.is_some() {
1109            self.notify_parked_local();
1110        }
1111    }
1112
1113    fn next_remote_task(&self) -> Option<Notified> {
1114        if self.shared.inject.is_empty() {
1115            return None;
1116        }
1117
1118        let mut synced = self.shared.synced.lock();
        // safety: passing in the correct `inject::Synced`
1120        unsafe { self.shared.inject.pop(&mut synced.inject) }
1121    }
1122
1123    fn push_remote_task(&self, task: Notified) {
1124        self.shared.scheduler_metrics.inc_remote_schedule_count();
1125
1126        let mut synced = self.shared.synced.lock();
        // safety: passing in the correct `inject::Synced`
1128        unsafe {
1129            self.shared.inject.push(&mut synced.inject, task);
1130        }
1131    }
1132
1133    pub(super) fn close(&self) {
1134        if self
1135            .shared
1136            .inject
1137            .close(&mut self.shared.synced.lock().inject)
1138        {
1139            self.notify_all();
1140        }
1141    }
1142
1143    fn notify_parked_local(&self) {
1144        super::counters::inc_num_inc_notify_local();
1145
1146        if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
1147            super::counters::inc_num_unparks_local();
1148            self.shared.remotes[index].unpark.unpark(&self.driver);
1149        }
1150    }
1151
1152    fn notify_parked_remote(&self) {
1153        if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
1154            self.shared.remotes[index].unpark.unpark(&self.driver);
1155        }
1156    }
1157
1158    pub(super) fn notify_all(&self) {
1159        for remote in &self.shared.remotes[..] {
1160            remote.unpark.unpark(&self.driver);
1161        }
1162    }
1163
1164    fn notify_if_work_pending(&self) {
1165        for remote in &self.shared.remotes[..] {
1166            if !remote.steal.is_empty() {
1167                self.notify_parked_local();
1168                return;
1169            }
1170        }
1171
1172        if !self.shared.inject.is_empty() {
1173            self.notify_parked_local();
1174        }
1175    }
1176
1177    fn transition_worker_from_searching(&self) {
1178        if self.shared.idle.transition_worker_from_searching() {
1179            // We are the final searching worker. Because work was found, we
1180            // need to notify another worker.
1181            self.notify_parked_local();
1182        }
1183    }
1184
1185    /// Signals that a worker has observed the shutdown signal and has replaced
1186    /// its core back into its handle.
1187    ///
1188    /// If all workers have reached this point, the final cleanup is performed.
1189    fn shutdown_core(&self, core: Box<Core>) {
1190        let mut cores = self.shared.shutdown_cores.lock();
1191        cores.push(core);
1192
1193        if cores.len() != self.shared.remotes.len() {
1194            return;
1195        }
1196
1197        debug_assert!(self.shared.owned.is_empty());
1198
1199        for mut core in cores.drain(..) {
1200            core.shutdown(self);
1201        }
1202
1203        // Drain the injection queue
1204        //
1205        // We already shut down every task, so we can simply drop the tasks.
1206        while let Some(task) = self.next_remote_task() {
1207            drop(task);
1208        }
1209    }
1210
1211    fn ptr_eq(&self, other: &Handle) -> bool {
1212        std::ptr::eq(self, other)
1213    }
1214}
1215
1216impl Overflow<Arc<Handle>> for Handle {
1217    fn push(&self, task: task::Notified<Arc<Handle>>) {
1218        self.push_remote_task(task);
1219    }
1220
1221    fn push_batch<I>(&self, iter: I)
1222    where
1223        I: Iterator<Item = task::Notified<Arc<Handle>>>,
1224    {
1225        unsafe {
1226            self.shared.inject.push_batch(self, iter);
1227        }
1228    }
1229}
1230
1231pub(crate) struct InjectGuard<'a> {
1232    lock: crate::loom::sync::MutexGuard<'a, Synced>,
1233}
1234
1235impl<'a> AsMut<inject::Synced> for InjectGuard<'a> {
1236    fn as_mut(&mut self) -> &mut inject::Synced {
1237        &mut self.lock.inject
1238    }
1239}
1240
1241impl<'a> Lock<inject::Synced> for &'a Handle {
1242    type Handle = InjectGuard<'a>;
1243
1244    fn lock(self) -> Self::Handle {
1245        InjectGuard {
1246            lock: self.shared.synced.lock(),
1247        }
1248    }
1249}
1250
1251#[track_caller]
1252fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R {
1253    use scheduler::Context::MultiThread;
1254
1255    context::with_scheduler(|ctx| match ctx {
1256        Some(MultiThread(ctx)) => f(Some(ctx)),
1257        _ => f(None),
1258    })
1259}