tokio/runtime/
driver.rs

1//! Abstracts out the entire chain of runtime sub-drivers into common types.
2
3// Eventually, this file will see significant refactoring / cleanup. For now, we
4// don't need to worry much about dead code with certain feature permutations.
5#![cfg_attr(
6    any(not(all(tokio_unstable, feature = "full")), target_family = "wasm"),
7    allow(dead_code)
8)]
9
10use crate::runtime::park::{ParkThread, UnparkThread};
11
12use std::io;
13use std::time::Duration;
14
15#[derive(Debug)]
16pub(crate) struct Driver {
17    inner: TimeDriver,
18}
19
20#[derive(Debug)]
21pub(crate) struct Handle {
22    /// IO driver handle
23    pub(crate) io: IoHandle,
24
25    /// Signal driver handle
26    #[cfg_attr(any(not(unix), loom), allow(dead_code))]
27    pub(crate) signal: SignalHandle,
28
29    /// Time driver handle
30    pub(crate) time: TimeHandle,
31
32    /// Source of `Instant::now()`
33    #[cfg_attr(not(all(feature = "time", feature = "test-util")), allow(dead_code))]
34    pub(crate) clock: Clock,
35}
36
/// Settings consumed by `Driver::new` when assembling the sub-drivers.
pub(crate) struct Cfg {
    /// Passed to `create_io_stack` to choose between the real IO stack and
    /// the thread-parking fallback.
    pub(crate) enable_io: bool,
    /// Passed to `create_time_driver` to choose whether a time driver is
    /// layered on top of the IO stack.
    pub(crate) enable_time: bool,
    /// First argument handed to `create_clock`.
    pub(crate) enable_pause_time: bool,
    /// Second argument handed to `create_clock`.
    pub(crate) start_paused: bool,
    /// Passed to `create_io_stack`, which forwards it to the IO driver
    /// constructor when IO is enabled.
    pub(crate) nevents: usize,
}
44
45impl Driver {
46    pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
47        let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
48
49        let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);
50
51        let (time_driver, time_handle) = create_time_driver(cfg.enable_time, io_stack, &clock);
52
53        Ok((
54            Self { inner: time_driver },
55            Handle {
56                io: io_handle,
57                signal: signal_handle,
58                time: time_handle,
59                clock,
60            },
61        ))
62    }
63
64    pub(crate) fn park(&mut self, handle: &Handle) {
65        self.inner.park(handle);
66    }
67
68    pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) {
69        self.inner.park_timeout(handle, duration);
70    }
71
72    pub(crate) fn shutdown(&mut self, handle: &Handle) {
73        self.inner.shutdown(handle);
74    }
75}
76
77impl Handle {
78    pub(crate) fn unpark(&self) {
79        #[cfg(feature = "time")]
80        if let Some(handle) = &self.time {
81            handle.unpark();
82        }
83
84        self.io.unpark();
85    }
86
87    cfg_io_driver! {
88        #[track_caller]
89        pub(crate) fn io(&self) -> &crate::runtime::io::Handle {
90            self.io
91                .as_ref()
92                .expect("A Tokio 1.x context was found, but IO is disabled. Call `enable_io` on the runtime builder to enable IO.")
93        }
94    }
95
96    cfg_signal_internal_and_unix! {
97        #[track_caller]
98        pub(crate) fn signal(&self) -> &crate::runtime::signal::Handle {
99            self.signal
100                .as_ref()
101                .expect("there is no signal driver running, must be called from the context of Tokio runtime")
102        }
103    }
104
105    cfg_time! {
106        /// Returns a reference to the time driver handle.
107        ///
108        /// Panics if no time driver is present.
109        #[track_caller]
110        pub(crate) fn time(&self) -> &crate::runtime::time::Handle {
111            self.time
112                .as_ref()
113                .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
114        }
115
116        pub(crate) fn clock(&self) -> &Clock {
117            &self.clock
118        }
119    }
120}
121
122// ===== io driver =====
123
124cfg_io_driver! {
125    pub(crate) type IoDriver = crate::runtime::io::Driver;
126
127    #[derive(Debug)]
128    pub(crate) enum IoStack {
129        Enabled(ProcessDriver),
130        Disabled(ParkThread),
131    }
132
133    #[derive(Debug)]
134    pub(crate) enum IoHandle {
135        Enabled(crate::runtime::io::Handle),
136        Disabled(UnparkThread),
137    }
138
139    fn create_io_stack(enabled: bool, nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
140        #[cfg(loom)]
141        assert!(!enabled);
142
143        let ret = if enabled {
144            let (io_driver, io_handle) = crate::runtime::io::Driver::new(nevents)?;
145
146            let (signal_driver, signal_handle) = create_signal_driver(io_driver, &io_handle)?;
147            let process_driver = create_process_driver(signal_driver);
148
149            (IoStack::Enabled(process_driver), IoHandle::Enabled(io_handle), signal_handle)
150        } else {
151            let park_thread = ParkThread::new();
152            let unpark_thread = park_thread.unpark();
153            (IoStack::Disabled(park_thread), IoHandle::Disabled(unpark_thread), Default::default())
154        };
155
156        Ok(ret)
157    }
158
159    impl IoStack {
160        pub(crate) fn park(&mut self, handle: &Handle) {
161            match self {
162                IoStack::Enabled(v) => v.park(handle),
163                IoStack::Disabled(v) => v.park(),
164            }
165        }
166
167        pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) {
168            match self {
169                IoStack::Enabled(v) => v.park_timeout(handle, duration),
170                IoStack::Disabled(v) => v.park_timeout(duration),
171            }
172        }
173
174        pub(crate) fn shutdown(&mut self, handle: &Handle) {
175            match self {
176                IoStack::Enabled(v) => v.shutdown(handle),
177                IoStack::Disabled(v) => v.shutdown(),
178            }
179        }
180    }
181
182    impl IoHandle {
183        pub(crate) fn unpark(&self) {
184            match self {
185                IoHandle::Enabled(handle) => handle.unpark(),
186                IoHandle::Disabled(handle) => handle.unpark(),
187            }
188        }
189
190        pub(crate) fn as_ref(&self) -> Option<&crate::runtime::io::Handle> {
191            match self {
192                IoHandle::Enabled(v) => Some(v),
193                IoHandle::Disabled(..) => None,
194            }
195        }
196    }
197}
198
199cfg_not_io_driver! {
200    pub(crate) type IoHandle = UnparkThread;
201
202    #[derive(Debug)]
203    pub(crate) struct IoStack(ParkThread);
204
205    fn create_io_stack(_enabled: bool, _nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
206        let park_thread = ParkThread::new();
207        let unpark_thread = park_thread.unpark();
208        Ok((IoStack(park_thread), unpark_thread, Default::default()))
209    }
210
211    impl IoStack {
212        pub(crate) fn park(&mut self, _handle: &Handle) {
213            self.0.park();
214        }
215
216        pub(crate) fn park_timeout(&mut self, _handle: &Handle, duration: Duration) {
217            self.0.park_timeout(duration);
218        }
219
220        pub(crate) fn shutdown(&mut self, _handle: &Handle) {
221            self.0.shutdown();
222        }
223
224        /// This is not a "real" driver, so it is not considered enabled.
225        pub(crate) fn is_enabled(&self) -> bool {
226            false
227        }
228    }
229}
230
231// ===== signal driver =====
232
233cfg_signal_internal_and_unix! {
234    type SignalDriver = crate::runtime::signal::Driver;
235    pub(crate) type SignalHandle = Option<crate::runtime::signal::Handle>;
236
237    fn create_signal_driver(io_driver: IoDriver, io_handle: &crate::runtime::io::Handle) -> io::Result<(SignalDriver, SignalHandle)> {
238        let driver = crate::runtime::signal::Driver::new(io_driver, io_handle)?;
239        let handle = driver.handle();
240        Ok((driver, Some(handle)))
241    }
242}
243
244cfg_not_signal_internal! {
245    pub(crate) type SignalHandle = ();
246
247    cfg_io_driver! {
248        type SignalDriver = IoDriver;
249
250        fn create_signal_driver(io_driver: IoDriver, _io_handle: &crate::runtime::io::Handle) -> io::Result<(SignalDriver, SignalHandle)> {
251            Ok((io_driver, ()))
252        }
253    }
254}
255
256// ===== process driver =====
257
258cfg_process_driver! {
259    type ProcessDriver = crate::runtime::process::Driver;
260
261    fn create_process_driver(signal_driver: SignalDriver) -> ProcessDriver {
262        ProcessDriver::new(signal_driver)
263    }
264}
265
266cfg_not_process_driver! {
267    cfg_io_driver! {
268        type ProcessDriver = SignalDriver;
269
270        fn create_process_driver(signal_driver: SignalDriver) -> ProcessDriver {
271            signal_driver
272        }
273    }
274}
275
276// ===== time driver =====
277
278cfg_time! {
279    #[derive(Debug)]
280    pub(crate) enum TimeDriver {
281        Enabled {
282            driver: crate::runtime::time::Driver,
283        },
284        Disabled(IoStack),
285    }
286
287    pub(crate) type Clock = crate::time::Clock;
288    pub(crate) type TimeHandle = Option<crate::runtime::time::Handle>;
289
290    fn create_clock(enable_pausing: bool, start_paused: bool) -> Clock {
291        crate::time::Clock::new(enable_pausing, start_paused)
292    }
293
294    fn create_time_driver(
295        enable: bool,
296        io_stack: IoStack,
297        clock: &Clock,
298    ) -> (TimeDriver, TimeHandle) {
299        if enable {
300            let (driver, handle) = crate::runtime::time::Driver::new(io_stack, clock);
301
302            (TimeDriver::Enabled { driver }, Some(handle))
303        } else {
304            (TimeDriver::Disabled(io_stack), None)
305        }
306    }
307
308    impl TimeDriver {
309        pub(crate) fn park(&mut self, handle: &Handle) {
310            match self {
311                TimeDriver::Enabled { driver, .. } => driver.park(handle),
312                TimeDriver::Disabled(v) => v.park(handle),
313            }
314        }
315
316        pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) {
317            match self {
318                TimeDriver::Enabled { driver } => driver.park_timeout(handle, duration),
319                TimeDriver::Disabled(v) => v.park_timeout(handle, duration),
320            }
321        }
322
323        pub(crate) fn shutdown(&mut self, handle: &Handle) {
324            match self {
325                TimeDriver::Enabled { driver } => driver.shutdown(handle),
326                TimeDriver::Disabled(v) => v.shutdown(handle),
327            }
328        }
329    }
330}
331
332cfg_not_time! {
333    type TimeDriver = IoStack;
334
335    pub(crate) type Clock = ();
336    pub(crate) type TimeHandle = ();
337
338    fn create_clock(_enable_pausing: bool, _start_paused: bool) -> Clock {
339        ()
340    }
341
342    fn create_time_driver(
343        _enable: bool,
344        io_stack: IoStack,
345        _clock: &Clock,
346    ) -> (TimeDriver, TimeHandle) {
347        (io_stack, ())
348    }
349}