tokio/runtime/io/
driver.rs

1// Signal handling
2cfg_signal_internal_and_unix! {
3    mod signal;
4}
5
6use crate::io::interest::Interest;
7use crate::io::ready::Ready;
8use crate::loom::sync::Mutex;
9use crate::runtime::driver;
10use crate::runtime::io::registration_set;
11use crate::runtime::io::{IoDriverMetrics, RegistrationSet, ScheduledIo};
12
13use mio::event::Source;
14use std::fmt;
15use std::io;
16use std::sync::Arc;
17use std::time::Duration;
18
19/// I/O driver, backed by Mio.
20pub(crate) struct Driver {
21    /// True when an event with the signal token is received
22    signal_ready: bool,
23
24    /// Reuse the `mio::Events` value across calls to poll.
25    events: mio::Events,
26
27    /// The system event queue.
28    poll: mio::Poll,
29}
30
31/// A reference to an I/O driver.
32pub(crate) struct Handle {
33    /// Registers I/O resources.
34    registry: mio::Registry,
35
36    /// Tracks all registrations
37    registrations: RegistrationSet,
38
39    /// State that should be synchronized
40    synced: Mutex<registration_set::Synced>,
41
42    /// Used to wake up the reactor from a call to `turn`.
43    /// Not supported on `Wasi` due to lack of threading support.
44    #[cfg(not(target_os = "wasi"))]
45    waker: mio::Waker,
46
47    pub(crate) metrics: IoDriverMetrics,
48}
49
50#[derive(Debug)]
51pub(crate) struct ReadyEvent {
52    pub(super) tick: u8,
53    pub(crate) ready: Ready,
54    pub(super) is_shutdown: bool,
55}
56
57cfg_net_unix!(
58    impl ReadyEvent {
59        pub(crate) fn with_ready(&self, ready: Ready) -> Self {
60            Self {
61                ready,
62                tick: self.tick,
63                is_shutdown: self.is_shutdown,
64            }
65        }
66    }
67);
68
69#[derive(Debug, Eq, PartialEq, Clone, Copy)]
70pub(super) enum Direction {
71    Read,
72    Write,
73}
74
75pub(super) enum Tick {
76    Set,
77    Clear(u8),
78}
79
80const TOKEN_WAKEUP: mio::Token = mio::Token(0);
81const TOKEN_SIGNAL: mio::Token = mio::Token(1);
82
83fn _assert_kinds() {
84    fn _assert<T: Send + Sync>() {}
85
86    _assert::<Handle>();
87}
88
89// ===== impl Driver =====
90
91impl Driver {
92    /// Creates a new event loop, returning any error that happened during the
93    /// creation.
94    pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> {
95        let poll = mio::Poll::new()?;
96        #[cfg(not(target_os = "wasi"))]
97        let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
98        let registry = poll.registry().try_clone()?;
99
100        let driver = Driver {
101            signal_ready: false,
102            events: mio::Events::with_capacity(nevents),
103            poll,
104        };
105
106        let (registrations, synced) = RegistrationSet::new();
107
108        let handle = Handle {
109            registry,
110            registrations,
111            synced: Mutex::new(synced),
112            #[cfg(not(target_os = "wasi"))]
113            waker,
114            metrics: IoDriverMetrics::default(),
115        };
116
117        Ok((driver, handle))
118    }
119
120    pub(crate) fn park(&mut self, rt_handle: &driver::Handle) {
121        let handle = rt_handle.io();
122        self.turn(handle, None);
123    }
124
125    pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
126        let handle = rt_handle.io();
127        self.turn(handle, Some(duration));
128    }
129
130    pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
131        let handle = rt_handle.io();
132        let ios = handle.registrations.shutdown(&mut handle.synced.lock());
133
134        // `shutdown()` must be called without holding the lock.
135        for io in ios {
136            io.shutdown();
137        }
138    }
139
140    fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
141        debug_assert!(!handle.registrations.is_shutdown(&handle.synced.lock()));
142
143        handle.release_pending_registrations();
144
145        let events = &mut self.events;
146
147        // Block waiting for an event to happen, peeling out how many events
148        // happened.
149        match self.poll.poll(events, max_wait) {
150            Ok(()) => {}
151            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
152            #[cfg(target_os = "wasi")]
153            Err(e) if e.kind() == io::ErrorKind::InvalidInput => {
154                // In case of wasm32_wasi this error happens, when trying to poll without subscriptions
155                // just return from the park, as there would be nothing, which wakes us up.
156            }
157            Err(e) => panic!("unexpected error when polling the I/O driver: {e:?}"),
158        }
159
160        // Process all the events that came in, dispatching appropriately
161        let mut ready_count = 0;
162        for event in events.iter() {
163            let token = event.token();
164
165            if token == TOKEN_WAKEUP {
166                // Nothing to do, the event is used to unblock the I/O driver
167            } else if token == TOKEN_SIGNAL {
168                self.signal_ready = true;
169            } else {
170                let ready = Ready::from_mio(event);
171                let ptr = super::EXPOSE_IO.from_exposed_addr(token.0);
172
173                // Safety: we ensure that the pointers used as tokens are not freed
174                // until they are both deregistered from mio **and** we know the I/O
175                // driver is not concurrently polling. The I/O driver holds ownership of
176                // an `Arc<ScheduledIo>` so we can safely cast this to a ref.
177                let io: &ScheduledIo = unsafe { &*ptr };
178
179                io.set_readiness(Tick::Set, |curr| curr | ready);
180                io.wake(ready);
181
182                ready_count += 1;
183            }
184        }
185
186        handle.metrics.incr_ready_count_by(ready_count);
187    }
188}
189
190impl fmt::Debug for Driver {
191    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
192        write!(f, "Driver")
193    }
194}
195
196impl Handle {
197    /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
198    /// makes the next call to `turn` return immediately.
199    ///
200    /// This method is intended to be used in situations where a notification
201    /// needs to otherwise be sent to the main reactor. If the reactor is
202    /// currently blocked inside of `turn` then it will wake up and soon return
203    /// after this method has been called. If the reactor is not currently
204    /// blocked in `turn`, then the next call to `turn` will not block and
205    /// return immediately.
206    pub(crate) fn unpark(&self) {
207        #[cfg(not(target_os = "wasi"))]
208        self.waker.wake().expect("failed to wake I/O driver");
209    }
210
211    /// Registers an I/O resource with the reactor for a given `mio::Ready` state.
212    ///
213    /// The registration token is returned.
214    pub(super) fn add_source(
215        &self,
216        source: &mut impl mio::event::Source,
217        interest: Interest,
218    ) -> io::Result<Arc<ScheduledIo>> {
219        let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?;
220        let token = scheduled_io.token();
221
222        // we should remove the `scheduled_io` from the `registrations` set if registering
223        // the `source` with the OS fails. Otherwise it will leak the `scheduled_io`.
224        if let Err(e) = self.registry.register(source, token, interest.to_mio()) {
225            // safety: `scheduled_io` is part of the `registrations` set.
226            unsafe {
227                self.registrations
228                    .remove(&mut self.synced.lock(), &scheduled_io)
229            };
230
231            return Err(e);
232        }
233
234        // TODO: move this logic to `RegistrationSet` and use a `CountedLinkedList`
235        self.metrics.incr_fd_count();
236
237        Ok(scheduled_io)
238    }
239
240    /// Deregisters an I/O resource from the reactor.
241    pub(super) fn deregister_source(
242        &self,
243        registration: &Arc<ScheduledIo>,
244        source: &mut impl Source,
245    ) -> io::Result<()> {
246        // Deregister the source with the OS poller **first**
247        self.registry.deregister(source)?;
248
249        if self
250            .registrations
251            .deregister(&mut self.synced.lock(), registration)
252        {
253            self.unpark();
254        }
255
256        self.metrics.dec_fd_count();
257
258        Ok(())
259    }
260
261    fn release_pending_registrations(&self) {
262        if self.registrations.needs_release() {
263            self.registrations.release(&mut self.synced.lock());
264        }
265    }
266}
267
268impl fmt::Debug for Handle {
269    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
270        write!(f, "Handle")
271    }
272}
273
274impl Direction {
275    pub(super) fn mask(self) -> Ready {
276        match self {
277            Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
278            Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
279        }
280    }
281}