mio/
io_source.rs

1use std::ops::{Deref, DerefMut};
2#[cfg(any(unix, target_os = "wasi"))]
3use std::os::fd::AsRawFd;
4// TODO: once <https://github.com/rust-lang/rust/issues/126198> is fixed this
5// can use `std::os::fd` and be merged with the above.
6#[cfg(target_os = "hermit")]
7use std::os::hermit::io::AsRawFd;
8#[cfg(windows)]
9use std::os::windows::io::AsRawSocket;
10#[cfg(debug_assertions)]
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::{fmt, io};
13
14use crate::sys::IoSourceState;
15use crate::{event, Interest, Registry, Token};
16
17/// Adapter for a [`RawFd`] or [`RawSocket`] providing an [`event::Source`]
18/// implementation.
19///
20/// `IoSource` enables registering any FD or socket wrapper with [`Poll`].
21///
22/// While only implementations for TCP, UDP, and UDS (Unix only) are provided,
23/// Mio supports registering any FD or socket that can be registered with the
24/// underlying OS selector. `IoSource` provides the necessary bridge.
25///
26/// [`RawFd`]: std::os::fd::RawFd
27/// [`RawSocket`]: std::os::windows::io::RawSocket
28///
29/// # Notes
30///
31/// To handle the registrations and events properly **all** I/O operations (such
32/// as `read`, `write`, etc.) must go through the [`do_io`] method to ensure the
33/// internal state is updated accordingly.
34///
35/// [`Poll`]: crate::Poll
36/// [`do_io`]: IoSource::do_io
37pub struct IoSource<T> {
38    state: IoSourceState,
39    inner: T,
40    #[cfg(debug_assertions)]
41    selector_id: SelectorId,
42}
43
44impl<T> IoSource<T> {
45    /// Create a new `IoSource`.
46    pub fn new(io: T) -> IoSource<T> {
47        IoSource {
48            state: IoSourceState::new(),
49            inner: io,
50            #[cfg(debug_assertions)]
51            selector_id: SelectorId::new(),
52        }
53    }
54
55    /// Execute an I/O operations ensuring that the socket receives more events
56    /// if it hits a [`WouldBlock`] error.
57    ///
58    /// # Notes
59    ///
60    /// This method is required to be called for **all** I/O operations to
61    /// ensure the user will receive events once the socket is ready again after
62    /// returning a [`WouldBlock`] error.
63    ///
64    /// [`WouldBlock`]: io::ErrorKind::WouldBlock
65    pub fn do_io<F, R>(&self, f: F) -> io::Result<R>
66    where
67        F: FnOnce(&T) -> io::Result<R>,
68    {
69        self.state.do_io(f, &self.inner)
70    }
71
72    /// Returns the I/O source, dropping the state.
73    ///
74    /// # Notes
75    ///
76    /// To ensure no more events are to be received for this I/O source first
77    /// [`deregister`] it.
78    ///
79    /// [`deregister`]: Registry::deregister
80    pub fn into_inner(self) -> T {
81        self.inner
82    }
83}
84
85/// Be careful when using this method. All I/O operations that may block must go
86/// through the [`do_io`] method.
87///
88/// [`do_io`]: IoSource::do_io
89impl<T> Deref for IoSource<T> {
90    type Target = T;
91
92    fn deref(&self) -> &Self::Target {
93        &self.inner
94    }
95}
96
97/// Be careful when using this method. All I/O operations that may block must go
98/// through the [`do_io`] method.
99///
100/// [`do_io`]: IoSource::do_io
101impl<T> DerefMut for IoSource<T> {
102    fn deref_mut(&mut self) -> &mut Self::Target {
103        &mut self.inner
104    }
105}
106
107#[cfg(any(unix, target_os = "hermit"))]
108impl<T> event::Source for IoSource<T>
109where
110    T: AsRawFd,
111{
112    fn register(
113        &mut self,
114        registry: &Registry,
115        token: Token,
116        interests: Interest,
117    ) -> io::Result<()> {
118        #[cfg(debug_assertions)]
119        self.selector_id.associate(registry)?;
120        self.state
121            .register(registry, token, interests, self.inner.as_raw_fd())
122    }
123
124    fn reregister(
125        &mut self,
126        registry: &Registry,
127        token: Token,
128        interests: Interest,
129    ) -> io::Result<()> {
130        #[cfg(debug_assertions)]
131        self.selector_id.check_association(registry)?;
132        self.state
133            .reregister(registry, token, interests, self.inner.as_raw_fd())
134    }
135
136    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
137        #[cfg(debug_assertions)]
138        self.selector_id.remove_association(registry)?;
139        self.state.deregister(registry, self.inner.as_raw_fd())
140    }
141}
142
/// [`event::Source`] implementation for Windows, where the wrapped object
/// exposes a raw socket.
#[cfg(windows)]
impl<T: AsRawSocket> event::Source for IoSource<T> {
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        // Debug builds only: record the association with `registry`, bailing
        // out if the source is already associated.
        #[cfg(debug_assertions)]
        self.selector_id.associate(registry)?;
        let socket = self.inner.as_raw_socket();
        self.state.register(registry, token, interests, socket)
    }

    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        // Debug builds only: the source must already be associated with this
        // very `registry`.
        #[cfg(debug_assertions)]
        self.selector_id.check_association(registry)?;
        // Unlike `register`, the raw socket is not passed along here.
        self.state.reregister(registry, token, interests)
    }

    // `_registry` is only read by the debug-build association check, hence
    // the leading underscore.
    fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.remove_association(_registry)?;
        self.state.deregister()
    }
}
177
/// [`event::Source`] implementation for WASI. The calls go straight to the
/// registry's selector; `IoSourceState` is not involved.
#[cfg(target_os = "wasi")]
impl<T: AsRawFd> event::Source for IoSource<T> {
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        // Debug builds only: record the association with `registry`, bailing
        // out if the source is already associated.
        #[cfg(debug_assertions)]
        self.selector_id.associate(registry)?;
        let selector = registry.selector();
        let fd = self.inner.as_raw_fd();
        // The cast converts the raw fd into whatever type the selector takes.
        selector.register(fd as _, token, interests)
    }

    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        // Debug builds only: the source must already be associated with this
        // very `registry`.
        #[cfg(debug_assertions)]
        self.selector_id.check_association(registry)?;
        let selector = registry.selector();
        let fd = self.inner.as_raw_fd();
        selector.reregister(fd as _, token, interests)
    }

    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
        // Debug builds only: drop the association before deregistering.
        #[cfg(debug_assertions)]
        self.selector_id.remove_association(registry)?;
        let selector = registry.selector();
        let fd = self.inner.as_raw_fd();
        selector.deregister(fd as _)
    }
}
215
216impl<T> fmt::Debug for IoSource<T>
217where
218    T: fmt::Debug,
219{
220    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221        self.inner.fmt(f)
222    }
223}
224
/// Remembers which `sys::Selector` an `IoSource` is associated with.
///
/// Only compiled when debug assertions are enabled.
#[cfg(debug_assertions)]
#[derive(Debug)]
struct SelectorId {
    // Id of the associated selector, stored atomically.
    id: AtomicUsize,
}
231
232#[cfg(debug_assertions)]
233impl SelectorId {
234    /// Value of `id` if `SelectorId` is not associated with any
235    /// `sys::Selector`. Valid selector ids start at 1.
236    const UNASSOCIATED: usize = 0;
237
238    /// Create a new `SelectorId`.
239    const fn new() -> SelectorId {
240        SelectorId {
241            id: AtomicUsize::new(Self::UNASSOCIATED),
242        }
243    }
244
245    /// Associate an I/O source with `registry`, returning an error if its
246    /// already registered.
247    fn associate(&self, registry: &Registry) -> io::Result<()> {
248        let registry_id = registry.selector().id();
249        let previous_id = self.id.swap(registry_id, Ordering::AcqRel);
250
251        if previous_id == Self::UNASSOCIATED {
252            Ok(())
253        } else {
254            Err(io::Error::new(
255                io::ErrorKind::AlreadyExists,
256                "I/O source already registered with a `Registry`",
257            ))
258        }
259    }
260
261    /// Check the association of an I/O source with `registry`, returning an
262    /// error if its registered with a different `Registry` or not registered at
263    /// all.
264    fn check_association(&self, registry: &Registry) -> io::Result<()> {
265        let registry_id = registry.selector().id();
266        let id = self.id.load(Ordering::Acquire);
267
268        if id == registry_id {
269            Ok(())
270        } else if id == Self::UNASSOCIATED {
271            Err(io::Error::new(
272                io::ErrorKind::NotFound,
273                "I/O source not registered with `Registry`",
274            ))
275        } else {
276            Err(io::Error::new(
277                io::ErrorKind::AlreadyExists,
278                "I/O source already registered with a different `Registry`",
279            ))
280        }
281    }
282
283    /// Remove a previously made association from `registry`, returns an error
284    /// if it was not previously associated with `registry`.
285    fn remove_association(&self, registry: &Registry) -> io::Result<()> {
286        let registry_id = registry.selector().id();
287        let previous_id = self.id.swap(Self::UNASSOCIATED, Ordering::AcqRel);
288
289        if previous_id == registry_id {
290            Ok(())
291        } else {
292            Err(io::Error::new(
293                io::ErrorKind::NotFound,
294                "I/O source not registered with `Registry`",
295            ))
296        }
297    }
298}
299
300#[cfg(debug_assertions)]
301impl Clone for SelectorId {
302    fn clone(&self) -> SelectorId {
303        SelectorId {
304            id: AtomicUsize::new(self.id.load(Ordering::Acquire)),
305        }
306    }
307}