tokio/runtime/
driver.rs

1//! Abstracts out the entire chain of runtime sub-drivers into common types.
2
3// Eventually, this file will see significant refactoring / cleanup. For now, we
4// don't need to worry much about dead code with certain feature permutations.
5#![cfg_attr(
6    any(not(all(tokio_unstable, feature = "full")), target_family = "wasm"),
7    allow(dead_code)
8)]
9
10use crate::runtime::park::{ParkThread, UnparkThread};
11
12use std::io;
13use std::time::Duration;
14
15#[derive(Debug)]
16pub(crate) struct Driver {
17    inner: TimeDriver,
18}
19
20#[derive(Debug)]
21pub(crate) struct Handle {
22    /// IO driver handle
23    pub(crate) io: IoHandle,
24
25    /// Signal driver handle
26    #[cfg_attr(any(not(unix), loom), allow(dead_code))]
27    pub(crate) signal: SignalHandle,
28
29    /// Time driver handle
30    pub(crate) time: TimeHandle,
31
32    /// Source of `Instant::now()`
33    #[cfg_attr(not(all(feature = "time", feature = "test-util")), allow(dead_code))]
34    pub(crate) clock: Clock,
35}
36
/// Settings consumed by `Driver::new` when assembling the driver stack.
pub(crate) struct Cfg {
    /// Passed to `create_io_stack` as its `enabled` flag.
    pub(crate) enable_io: bool,
    /// Passed to `create_time_driver` as its `enable` flag.
    pub(crate) enable_time: bool,
    /// First argument handed to `create_clock`.
    pub(crate) enable_pause_time: bool,
    /// Second argument handed to `create_clock`.
    pub(crate) start_paused: bool,
    /// Forwarded to the IO driver constructor.
    // NOTE(review): presumably the capacity of the event buffer -- confirm.
    pub(crate) nevents: usize,
    /// Forwarded to the time driver constructor.
    pub(crate) workers: usize,
}
45
46impl Driver {
47    pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
48        let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
49
50        let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);
51
52        let (time_driver, time_handle) =
53            create_time_driver(cfg.enable_time, io_stack, &clock, cfg.workers);
54
55        Ok((
56            Self { inner: time_driver },
57            Handle {
58                io: io_handle,
59                signal: signal_handle,
60                time: time_handle,
61                clock,
62            },
63        ))
64    }
65
66    pub(crate) fn is_enabled(&self) -> bool {
67        self.inner.is_enabled()
68    }
69
70    pub(crate) fn park(&mut self, handle: &Handle) {
71        self.inner.park(handle);
72    }
73
74    pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) {
75        self.inner.park_timeout(handle, duration);
76    }
77
78    pub(crate) fn shutdown(&mut self, handle: &Handle) {
79        self.inner.shutdown(handle);
80    }
81}
82
83impl Handle {
84    pub(crate) fn unpark(&self) {
85        #[cfg(feature = "time")]
86        if let Some(handle) = &self.time {
87            handle.unpark();
88        }
89
90        self.io.unpark();
91    }
92
93    cfg_io_driver! {
94        #[track_caller]
95        pub(crate) fn io(&self) -> &crate::runtime::io::Handle {
96            self.io
97                .as_ref()
98                .expect("A Tokio 1.x context was found, but IO is disabled. Call `enable_io` on the runtime builder to enable IO.")
99        }
100    }
101
102    cfg_signal_internal_and_unix! {
103        #[track_caller]
104        pub(crate) fn signal(&self) -> &crate::runtime::signal::Handle {
105            self.signal
106                .as_ref()
107                .expect("there is no signal driver running, must be called from the context of Tokio runtime")
108        }
109    }
110
111    cfg_time! {
112        /// Returns a reference to the time driver handle.
113        ///
114        /// Panics if no time driver is present.
115        #[track_caller]
116        pub(crate) fn time(&self) -> &crate::runtime::time::Handle {
117            self.time
118                .as_ref()
119                .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
120        }
121
122        pub(crate) fn clock(&self) -> &Clock {
123            &self.clock
124        }
125    }
126}
127
128// ===== io driver =====
129
130cfg_io_driver! {
131    pub(crate) type IoDriver = crate::runtime::io::Driver;
132
133    #[derive(Debug)]
134    pub(crate) enum IoStack {
135        Enabled(ProcessDriver),
136        Disabled(ParkThread),
137    }
138
139    #[derive(Debug)]
140    pub(crate) enum IoHandle {
141        Enabled(crate::runtime::io::Handle),
142        Disabled(UnparkThread),
143    }
144
145    fn create_io_stack(enabled: bool, nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
146        #[cfg(loom)]
147        assert!(!enabled);
148
149        let ret = if enabled {
150            let (io_driver, io_handle) = crate::runtime::io::Driver::new(nevents)?;
151
152            let (signal_driver, signal_handle) = create_signal_driver(io_driver, &io_handle)?;
153            let process_driver = create_process_driver(signal_driver);
154
155            (IoStack::Enabled(process_driver), IoHandle::Enabled(io_handle), signal_handle)
156        } else {
157            let park_thread = ParkThread::new();
158            let unpark_thread = park_thread.unpark();
159            (IoStack::Disabled(park_thread), IoHandle::Disabled(unpark_thread), Default::default())
160        };
161
162        Ok(ret)
163    }
164
165    impl IoStack {
166        pub(crate) fn is_enabled(&self) -> bool {
167            match self {
168                IoStack::Enabled(..) => true,
169                IoStack::Disabled(..) => false,
170            }
171        }
172
173        pub(crate) fn park(&mut self, handle: &Handle) {
174            match self {
175                IoStack::Enabled(v) => v.park(handle),
176                IoStack::Disabled(v) => v.park(),
177            }
178        }
179
180        pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) {
181            match self {
182                IoStack::Enabled(v) => v.park_timeout(handle, duration),
183                IoStack::Disabled(v) => v.park_timeout(duration),
184            }
185        }
186
187        pub(crate) fn shutdown(&mut self, handle: &Handle) {
188            match self {
189                IoStack::Enabled(v) => v.shutdown(handle),
190                IoStack::Disabled(v) => v.shutdown(),
191            }
192        }
193    }
194
195    impl IoHandle {
196        pub(crate) fn unpark(&self) {
197            match self {
198                IoHandle::Enabled(handle) => handle.unpark(),
199                IoHandle::Disabled(handle) => handle.unpark(),
200            }
201        }
202
203        pub(crate) fn as_ref(&self) -> Option<&crate::runtime::io::Handle> {
204            match self {
205                IoHandle::Enabled(v) => Some(v),
206                IoHandle::Disabled(..) => None,
207            }
208        }
209    }
210}
211
212cfg_not_io_driver! {
213    pub(crate) type IoHandle = UnparkThread;
214
215    #[derive(Debug)]
216    pub(crate) struct IoStack(ParkThread);
217
218    fn create_io_stack(_enabled: bool, _nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
219        let park_thread = ParkThread::new();
220        let unpark_thread = park_thread.unpark();
221        Ok((IoStack(park_thread), unpark_thread, Default::default()))
222    }
223
224    impl IoStack {
225        pub(crate) fn park(&mut self, _handle: &Handle) {
226            self.0.park();
227        }
228
229        pub(crate) fn park_timeout(&mut self, _handle: &Handle, duration: Duration) {
230            self.0.park_timeout(duration);
231        }
232
233        pub(crate) fn shutdown(&mut self, _handle: &Handle) {
234            self.0.shutdown();
235        }
236
237        /// This is not a "real" driver, so it is not considered enabled.
238        pub(crate) fn is_enabled(&self) -> bool {
239            false
240        }
241    }
242}
243
244// ===== signal driver =====
245
246cfg_signal_internal_and_unix! {
247    type SignalDriver = crate::runtime::signal::Driver;
248    pub(crate) type SignalHandle = Option<crate::runtime::signal::Handle>;
249
250    fn create_signal_driver(io_driver: IoDriver, io_handle: &crate::runtime::io::Handle) -> io::Result<(SignalDriver, SignalHandle)> {
251        let driver = crate::runtime::signal::Driver::new(io_driver, io_handle)?;
252        let handle = driver.handle();
253        Ok((driver, Some(handle)))
254    }
255}
256
257cfg_not_signal_internal! {
258    pub(crate) type SignalHandle = ();
259
260    cfg_io_driver! {
261        type SignalDriver = IoDriver;
262
263        fn create_signal_driver(io_driver: IoDriver, _io_handle: &crate::runtime::io::Handle) -> io::Result<(SignalDriver, SignalHandle)> {
264            Ok((io_driver, ()))
265        }
266    }
267}
268
269// ===== process driver =====
270
271cfg_process_driver! {
272    type ProcessDriver = crate::runtime::process::Driver;
273
274    fn create_process_driver(signal_driver: SignalDriver) -> ProcessDriver {
275        ProcessDriver::new(signal_driver)
276    }
277}
278
279cfg_not_process_driver! {
280    cfg_io_driver! {
281        type ProcessDriver = SignalDriver;
282
283        fn create_process_driver(signal_driver: SignalDriver) -> ProcessDriver {
284            signal_driver
285        }
286    }
287}
288
289// ===== time driver =====
290
291cfg_time! {
292    #[derive(Debug)]
293    pub(crate) enum TimeDriver {
294        Enabled {
295            driver: crate::runtime::time::Driver,
296        },
297        Disabled(IoStack),
298    }
299
300    pub(crate) type Clock = crate::time::Clock;
301    pub(crate) type TimeHandle = Option<crate::runtime::time::Handle>;
302
303    fn create_clock(enable_pausing: bool, start_paused: bool) -> Clock {
304        crate::time::Clock::new(enable_pausing, start_paused)
305    }
306
307    fn create_time_driver(
308        enable: bool,
309        io_stack: IoStack,
310        clock: &Clock,
311        workers: usize,
312    ) -> (TimeDriver, TimeHandle) {
313        if enable {
314            let (driver, handle) = crate::runtime::time::Driver::new(io_stack, clock, workers as u32);
315
316            (TimeDriver::Enabled { driver }, Some(handle))
317        } else {
318            (TimeDriver::Disabled(io_stack), None)
319        }
320    }
321
322    impl TimeDriver {
323        pub(crate) fn is_enabled(&self) -> bool {
324            match self {
325                TimeDriver::Enabled { .. } => true,
326                TimeDriver::Disabled(inner) => inner.is_enabled(),
327            }
328        }
329
330        pub(crate) fn park(&mut self, handle: &Handle) {
331            match self {
332                TimeDriver::Enabled { driver, .. } => driver.park(handle),
333                TimeDriver::Disabled(v) => v.park(handle),
334            }
335        }
336
337        pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) {
338            match self {
339                TimeDriver::Enabled { driver } => driver.park_timeout(handle, duration),
340                TimeDriver::Disabled(v) => v.park_timeout(handle, duration),
341            }
342        }
343
344        pub(crate) fn shutdown(&mut self, handle: &Handle) {
345            match self {
346                TimeDriver::Enabled { driver } => driver.shutdown(handle),
347                TimeDriver::Disabled(v) => v.shutdown(handle),
348            }
349        }
350    }
351}
352
353cfg_not_time! {
354    type TimeDriver = IoStack;
355
356    pub(crate) type Clock = ();
357    pub(crate) type TimeHandle = ();
358
359    fn create_clock(_enable_pausing: bool, _start_paused: bool) -> Clock {
360        ()
361    }
362
363    fn create_time_driver(
364        _enable: bool,
365        io_stack: IoStack,
366        _clock: &Clock,
367        _workers: usize,
368    ) -> (TimeDriver, TimeHandle) {
369        (io_stack, ())
370    }
371}