tokio/runtime/scheduler/
mod.rs

1cfg_rt! {
2    pub(crate) mod current_thread;
3    pub(crate) use current_thread::CurrentThread;
4
5    mod defer;
6    use defer::Defer;
7
8    pub(crate) mod inject;
9    pub(crate) use inject::Inject;
10
11    use crate::runtime::TaskHooks;
12}
13
14cfg_rt_multi_thread! {
15    mod block_in_place;
16    pub(crate) use block_in_place::block_in_place;
17
18    mod lock;
19    use lock::Lock;
20
21    pub(crate) mod multi_thread;
22    pub(crate) use multi_thread::MultiThread;
23
24    cfg_unstable! {
25        pub(crate) mod multi_thread_alt;
26        pub(crate) use multi_thread_alt::MultiThread as MultiThreadAlt;
27    }
28}
29
30use crate::runtime::driver;
31
/// Handle to a runtime scheduler, with one variant per scheduler flavor that
/// is compiled in.
///
/// Each flavor variant wraps an `Arc` around that flavor's own handle type.
#[derive(Debug, Clone)]
pub(crate) enum Handle {
    /// Handle to the `current_thread` scheduler. Present whenever the `rt`
    /// feature is enabled.
    #[cfg(feature = "rt")]
    CurrentThread(Arc<current_thread::Handle>),

    /// Handle to the `multi_thread` scheduler. Requires the `rt-multi-thread`
    /// feature.
    #[cfg(feature = "rt-multi-thread")]
    MultiThread(Arc<multi_thread::Handle>),

    /// Handle to the `multi_thread_alt` scheduler. Requires both
    /// `tokio_unstable` and the `rt-multi-thread` feature.
    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
    MultiThreadAlt(Arc<multi_thread_alt::Handle>),

    // TODO: This is to avoid triggering "dead code" warnings in many other
    // places in the codebase. Remove this during a later cleanup.
    #[cfg(not(feature = "rt"))]
    #[allow(dead_code)]
    Disabled,
}
49
/// Scheduler context, with one variant per scheduler flavor that is compiled
/// in. Only exists when the `rt` feature is enabled.
#[cfg(feature = "rt")]
pub(super) enum Context {
    /// Context of the `current_thread` scheduler.
    CurrentThread(current_thread::Context),

    /// Context of the `multi_thread` scheduler. Requires the
    /// `rt-multi-thread` feature.
    #[cfg(feature = "rt-multi-thread")]
    MultiThread(multi_thread::Context),

    /// Context of the `multi_thread_alt` scheduler. Requires both
    /// `tokio_unstable` and the `rt-multi-thread` feature.
    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
    MultiThreadAlt(multi_thread_alt::Context),
}
60
61impl Handle {
62    #[cfg_attr(not(feature = "full"), allow(dead_code))]
63    pub(crate) fn driver(&self) -> &driver::Handle {
64        match *self {
65            #[cfg(feature = "rt")]
66            Handle::CurrentThread(ref h) => &h.driver,
67
68            #[cfg(feature = "rt-multi-thread")]
69            Handle::MultiThread(ref h) => &h.driver,
70
71            #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
72            Handle::MultiThreadAlt(ref h) => &h.driver,
73
74            #[cfg(not(feature = "rt"))]
75            Handle::Disabled => unreachable!(),
76        }
77    }
78}
79
80cfg_rt! {
81    use crate::future::Future;
82    use crate::loom::sync::Arc;
83    use crate::runtime::{blocking, task::Id};
84    use crate::runtime::context;
85    use crate::task::JoinHandle;
86    use crate::util::RngSeedGenerator;
87    use std::task::Waker;
88
89    macro_rules! match_flavor {
90        ($self:expr, $ty:ident($h:ident) => $e:expr) => {
91            match $self {
92                $ty::CurrentThread($h) => $e,
93
94                #[cfg(feature = "rt-multi-thread")]
95                $ty::MultiThread($h) => $e,
96
97                #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
98                $ty::MultiThreadAlt($h) => $e,
99            }
100        }
101    }
102
103    impl Handle {
104        #[track_caller]
105        pub(crate) fn current() -> Handle {
106            match context::with_current(Clone::clone) {
107                Ok(handle) => handle,
108                Err(e) => panic!("{}", e),
109            }
110        }
111
112        pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner {
113            match_flavor!(self, Handle(h) => &h.blocking_spawner)
114        }
115
116        pub(crate) fn is_local(&self) -> bool {
117            match self {
118                Handle::CurrentThread(h) => h.local_tid.is_some(),
119
120                #[cfg(feature = "rt-multi-thread")]
121                Handle::MultiThread(_) => false,
122
123                #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
124                Handle::MultiThreadAlt(_) => false,
125            }
126        }
127
128        /// Returns true if this is a local runtime and the runtime is owned by the current thread.
129        pub(crate) fn can_spawn_local_on_local_runtime(&self) -> bool {
130            match self {
131                Handle::CurrentThread(h) => h.local_tid.map(|x| std::thread::current().id() == x).unwrap_or(false),
132
133                #[cfg(feature = "rt-multi-thread")]
134                Handle::MultiThread(_) => false,
135
136                #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
137                Handle::MultiThreadAlt(_) => false,
138            }
139        }
140
141        pub(crate) fn spawn<F>(&self, future: F, id: Id) -> JoinHandle<F::Output>
142        where
143            F: Future + Send + 'static,
144            F::Output: Send + 'static,
145        {
146            match self {
147                Handle::CurrentThread(h) => current_thread::Handle::spawn(h, future, id),
148
149                #[cfg(feature = "rt-multi-thread")]
150                Handle::MultiThread(h) => multi_thread::Handle::spawn(h, future, id),
151
152                #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
153                Handle::MultiThreadAlt(h) => multi_thread_alt::Handle::spawn(h, future, id),
154            }
155        }
156
157        /// Spawn a local task
158        ///
159        /// # Safety
160        /// This should only be called in `LocalRuntime` if the runtime has been verified to be owned
161        /// by the current thread.
162        #[allow(irrefutable_let_patterns)]
163        pub(crate) unsafe fn spawn_local<F>(&self, future: F, id: Id) -> JoinHandle<F::Output>
164        where
165            F: Future + 'static,
166            F::Output: 'static,
167        {
168            if let Handle::CurrentThread(h) = self {
169                current_thread::Handle::spawn_local(h, future, id)
170            } else {
171                panic!("Only current_thread and LocalSet have spawn_local internals implemented")
172            }
173        }
174
175        pub(crate) fn shutdown(&self) {
176            match *self {
177                Handle::CurrentThread(_) => {},
178
179                #[cfg(feature = "rt-multi-thread")]
180                Handle::MultiThread(ref h) => h.shutdown(),
181
182                #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
183                Handle::MultiThreadAlt(ref h) => h.shutdown(),
184            }
185        }
186
187        pub(crate) fn seed_generator(&self) -> &RngSeedGenerator {
188            match_flavor!(self, Handle(h) => &h.seed_generator)
189        }
190
191        pub(crate) fn as_current_thread(&self) -> &Arc<current_thread::Handle> {
192            match self {
193                Handle::CurrentThread(handle) => handle,
194                #[cfg(feature = "rt-multi-thread")]
195                _ => panic!("not a CurrentThread handle"),
196            }
197        }
198
199        pub(crate) fn hooks(&self) -> &TaskHooks {
200            match self {
201                Handle::CurrentThread(h) => &h.task_hooks,
202                #[cfg(feature = "rt-multi-thread")]
203                Handle::MultiThread(h) => &h.task_hooks,
204                #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
205                Handle::MultiThreadAlt(h) => &h.task_hooks,
206            }
207        }
208
209        cfg_rt_multi_thread! {
210            cfg_unstable! {
211                pub(crate) fn expect_multi_thread_alt(&self) -> &Arc<multi_thread_alt::Handle> {
212                    match self {
213                        Handle::MultiThreadAlt(handle) => handle,
214                        _ => panic!("not a `MultiThreadAlt` handle"),
215                    }
216                }
217            }
218        }
219    }
220
221    impl Handle {
222        pub(crate) fn num_workers(&self) -> usize {
223            match self {
224                Handle::CurrentThread(_) => 1,
225                #[cfg(feature = "rt-multi-thread")]
226                Handle::MultiThread(handle) => handle.num_workers(),
227                #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
228                Handle::MultiThreadAlt(handle) => handle.num_workers(),
229            }
230        }
231
232        pub(crate) fn num_alive_tasks(&self) -> usize {
233            match_flavor!(self, Handle(handle) => handle.num_alive_tasks())
234        }
235
236        pub(crate) fn injection_queue_depth(&self) -> usize {
237            match_flavor!(self, Handle(handle) => handle.injection_queue_depth())
238        }
239    }
240
241    cfg_unstable_metrics! {
242        use crate::runtime::{SchedulerMetrics, WorkerMetrics};
243
244        impl Handle {
245            cfg_64bit_metrics! {
246                pub(crate) fn spawned_tasks_count(&self) -> u64 {
247                    match_flavor!(self, Handle(handle) => handle.spawned_tasks_count())
248                }
249            }
250
251            pub(crate) fn num_blocking_threads(&self) -> usize {
252                match_flavor!(self, Handle(handle) => handle.num_blocking_threads())
253            }
254
255            pub(crate) fn num_idle_blocking_threads(&self) -> usize {
256                match_flavor!(self, Handle(handle) => handle.num_idle_blocking_threads())
257            }
258
259            pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
260                match_flavor!(self, Handle(handle) => handle.scheduler_metrics())
261            }
262
263            pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
264                match_flavor!(self, Handle(handle) => handle.worker_metrics(worker))
265            }
266
267            pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
268                match_flavor!(self, Handle(handle) => handle.worker_local_queue_depth(worker))
269            }
270
271            pub(crate) fn blocking_queue_depth(&self) -> usize {
272                match_flavor!(self, Handle(handle) => handle.blocking_queue_depth())
273            }
274        }
275    }
276
277    impl Context {
278        #[track_caller]
279        pub(crate) fn expect_current_thread(&self) -> &current_thread::Context {
280            match self {
281                Context::CurrentThread(context) => context,
282                #[cfg(feature = "rt-multi-thread")]
283                _ => panic!("expected `CurrentThread::Context`")
284            }
285        }
286
287        pub(crate) fn defer(&self, waker: &Waker) {
288            match_flavor!(self, Context(context) => context.defer(waker));
289        }
290
291        cfg_rt_multi_thread! {
292            #[track_caller]
293            pub(crate) fn expect_multi_thread(&self) -> &multi_thread::Context {
294                match self {
295                    Context::MultiThread(context) => context,
296                    _ => panic!("expected `MultiThread::Context`")
297                }
298            }
299
300            cfg_unstable! {
301                #[track_caller]
302                pub(crate) fn expect_multi_thread_alt(&self) -> &multi_thread_alt::Context {
303                    match self {
304                        Context::MultiThreadAlt(context) => context,
305                        _ => panic!("expected `MultiThreadAlt::Context`")
306                    }
307                }
308            }
309        }
310    }
311}
312
313cfg_not_rt! {
314    #[cfg(any(
315        feature = "net",
316        all(unix, feature = "process"),
317        all(unix, feature = "signal"),
318        feature = "time",
319    ))]
320    impl Handle {
321        #[track_caller]
322        pub(crate) fn current() -> Handle {
323            panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
324        }
325    }
326}