tokio/io/
poll_evented.rs

1use crate::io::interest::Interest;
2use crate::runtime::io::Registration;
3use crate::runtime::scheduler;
4
5use mio::event::Source;
6use std::fmt;
7use std::io;
8use std::ops::Deref;
9use std::panic::{RefUnwindSafe, UnwindSafe};
10use std::task::ready;
11
cfg_io_driver! {
    /// Associates an I/O resource that implements the [`std::io::Read`] and/or
    /// [`std::io::Write`] traits with the reactor that drives it.
    ///
    /// `PollEvented` uses [`Registration`] internally to take a type that
    /// implements [`mio::event::Source`] as well as [`std::io::Read`] and/or
    /// [`std::io::Write`] and associate it with a reactor that will drive it.
    ///
    /// Once the [`mio::event::Source`] type is wrapped by `PollEvented`, it can be
    /// used from within the future's execution model. As such, the
    /// `PollEvented` type provides [`AsyncRead`] and [`AsyncWrite`]
    /// implementations using the underlying I/O resource as well as readiness
    /// events provided by the reactor.
    ///
    /// **Note**: While `PollEvented` is `Sync` (if the underlying I/O type is
    /// `Sync`), the caller must ensure that there are at most two tasks that
    /// use a `PollEvented` instance concurrently. One for reading and one for
    /// writing. While violating this requirement is "safe" from a Rust memory
    /// model point of view, it will result in unexpected behavior in the form
    /// of lost notifications and tasks hanging.
    ///
    /// ## Readiness events
    ///
    /// Besides just providing [`AsyncRead`] and [`AsyncWrite`] implementations,
    /// this type also supports access to the underlying readiness event stream.
    /// While similar in function to what [`Registration`] provides, the
    /// semantics are a bit different.
    ///
    /// Two functions are provided to access the readiness events:
    /// [`poll_read_ready`] and [`poll_write_ready`]. These functions return the
    /// current readiness state of the `PollEvented` instance. If
    /// [`poll_read_ready`] indicates read readiness, immediately calling
    /// [`poll_read_ready`] again will also indicate read readiness.
    ///
    /// When the operation is attempted and is unable to succeed due to the I/O
    /// resource not being ready, the caller must call [`clear_readiness`].
    /// This clears the readiness state until a new readiness event is received.
    ///
    /// This allows the caller to implement additional functions. For example,
    /// [`TcpListener`] implements `poll_accept` by using [`poll_read_ready`] and
    /// [`clear_readiness`].
    ///
    /// ## Platform-specific events
    ///
    /// `PollEvented` also allows receiving platform-specific `mio::Ready` events.
    /// These events are included as part of the read readiness event stream. The
    /// write readiness event stream is only for `Ready::writable()` events.
    ///
    /// [`AsyncRead`]: crate::io::AsyncRead
    /// [`AsyncWrite`]: crate::io::AsyncWrite
    /// [`TcpListener`]: crate::net::TcpListener
    /// [`clear_readiness`]: Registration::clear_readiness
    /// [`poll_read_ready`]: Registration::poll_read_ready
    /// [`poll_write_ready`]: Registration::poll_write_ready
    pub(crate) struct PollEvented<E: Source> {
        /// The wrapped I/O resource.
        ///
        /// This is `Some` for the whole usable lifetime of the value. It is only
        /// taken out (leaving `None`) by `into_inner` and by the `Drop`
        /// implementation, both of which deregister the resource; every other
        /// accessor unwraps it.
        io: Option<E>,
        /// Registration of `io` with the reactor; readiness events for the
        /// resource are obtained and cleared through it.
        registration: Registration,
    }
}
71
72// ===== impl PollEvented =====
73
74impl<E: Source> PollEvented<E> {
75    /// Creates a new `PollEvented` associated with the default reactor.
76    ///
77    /// The returned `PollEvented` has readable and writable interests. For more control, use
78    /// [`Self::new_with_interest`].
79    ///
80    /// # Panics
81    ///
82    /// This function panics if thread-local runtime is not set.
83    ///
84    /// The runtime is usually set implicitly when this function is called
85    /// from a future driven by a tokio runtime, otherwise runtime can be set
86    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
87    #[track_caller]
88    #[cfg_attr(feature = "signal", allow(unused))]
89    pub(crate) fn new(io: E) -> io::Result<Self> {
90        PollEvented::new_with_interest(io, Interest::READABLE | Interest::WRITABLE)
91    }
92
93    /// Creates a new `PollEvented` associated with the default reactor, for
94    /// specific `Interest` state. `new_with_interest` should be used over `new`
95    /// when you need control over the readiness state, such as when a file
96    /// descriptor only allows reads. This does not add `hup` or `error` so if
97    /// you are interested in those states, you will need to add them to the
98    /// readiness state passed to this function.
99    ///
100    /// # Panics
101    ///
102    /// This function panics if thread-local runtime is not set.
103    ///
104    /// The runtime is usually set implicitly when this function is called from
105    /// a future driven by a tokio runtime, otherwise runtime can be set
106    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter)
107    /// function.
108    #[track_caller]
109    #[cfg_attr(feature = "signal", allow(unused))]
110    pub(crate) fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
111        Self::new_with_interest_and_handle(io, interest, scheduler::Handle::current())
112    }
113
114    #[track_caller]
115    pub(crate) fn new_with_interest_and_handle(
116        mut io: E,
117        interest: Interest,
118        handle: scheduler::Handle,
119    ) -> io::Result<Self> {
120        let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
121        Ok(Self {
122            io: Some(io),
123            registration,
124        })
125    }
126
127    /// Returns a reference to the registration.
128    #[cfg(feature = "net")]
129    pub(crate) fn registration(&self) -> &Registration {
130        &self.registration
131    }
132
133    /// Deregisters the inner io from the registration and returns a Result containing the inner io.
134    #[cfg(any(feature = "net", feature = "process"))]
135    pub(crate) fn into_inner(mut self) -> io::Result<E> {
136        let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here.
137        self.registration.deregister(&mut inner)?;
138        Ok(inner)
139    }
140
141    #[cfg(all(feature = "process", target_os = "linux"))]
142    pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
143        self.registration
144            .poll_read_ready(cx)
145            .map_err(io::Error::from)
146            .map_ok(|_| ())
147    }
148
149    /// Re-register under new runtime with `interest`.
150    #[cfg(all(feature = "process", target_os = "linux"))]
151    pub(crate) fn reregister(&mut self, interest: Interest) -> io::Result<()> {
152        let io = self.io.as_mut().unwrap(); // As io shouldn't ever be None, just unwrap here.
153        let _ = self.registration.deregister(io);
154        self.registration =
155            Registration::new_with_interest_and_handle(io, interest, scheduler::Handle::current())?;
156
157        Ok(())
158    }
159}
160
161feature! {
162    #![any(feature = "net", all(unix, feature = "process"))]
163
164    use crate::io::ReadBuf;
165    use std::task::{Context, Poll};
166
167    impl<E: Source> PollEvented<E> {
168        // Safety: The caller must ensure that `E` can read into uninitialized memory
169        pub(crate) unsafe fn poll_read<'a>(
170            &'a self,
171            cx: &mut Context<'_>,
172            buf: &mut ReadBuf<'_>,
173        ) -> Poll<io::Result<()>>
174        where
175            &'a E: io::Read + 'a,
176        {
177            use std::io::Read;
178
179            loop {
180                let evt = ready!(self.registration.poll_read_ready(cx))?;
181
182                let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
183
184                // used only when the cfgs below apply
185                #[allow(unused_variables)]
186                let len = b.len();
187
188                match self.io.as_ref().unwrap().read(b) {
189                    Ok(n) => {
190                        // When mio is using the epoll or kqueue selector, reading a partially full
191                        // buffer is sufficient to show that the socket buffer has been drained.
192                        //
193                        // This optimization does not work for level-triggered selectors such as
194                        // windows or when poll is used.
195                        //
196                        // Read more:
197                        // https://github.com/tokio-rs/tokio/issues/5866
198                        #[cfg(all(
199                            not(mio_unsupported_force_poll_poll),
200                            any(
201                                // epoll
202                                target_os = "android",
203                                target_os = "illumos",
204                                target_os = "linux",
205                                target_os = "redox",
206                                // kqueue
207                                target_os = "dragonfly",
208                                target_os = "freebsd",
209                                target_os = "ios",
210                                target_os = "macos",
211                                target_os = "netbsd",
212                                target_os = "openbsd",
213                                target_os = "tvos",
214                                target_os = "visionos",
215                                target_os = "watchos",
216                            )
217                        ))]
218                        if 0 < n && n < len {
219                            self.registration.clear_readiness(evt);
220                        }
221
222                        // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
223                        // buffer.
224                        buf.assume_init(n);
225                        buf.advance(n);
226                        return Poll::Ready(Ok(()));
227                    },
228                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
229                        self.registration.clear_readiness(evt);
230                    }
231                    Err(e) => return Poll::Ready(Err(e)),
232                }
233            }
234        }
235
236        pub(crate) fn poll_write<'a>(&'a self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>>
237        where
238            &'a E: io::Write + 'a,
239        {
240            use std::io::Write;
241
242            loop {
243                let evt = ready!(self.registration.poll_write_ready(cx))?;
244
245                match self.io.as_ref().unwrap().write(buf) {
246                    Ok(n) => {
247                        // if we write only part of our buffer, this is sufficient on unix to show
248                        // that the socket buffer is full.  Unfortunately this assumption
249                        // fails for level-triggered selectors (like on Windows or poll even for
250                        // UNIX): https://github.com/tokio-rs/tokio/issues/5866
251                        if n > 0 && (!cfg!(windows) && !cfg!(mio_unsupported_force_poll_poll) && n < buf.len()) {
252                            self.registration.clear_readiness(evt);
253                        }
254
255                        return Poll::Ready(Ok(n));
256                    },
257                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
258                        self.registration.clear_readiness(evt);
259                    }
260                    Err(e) => return Poll::Ready(Err(e)),
261                }
262            }
263        }
264
265        #[cfg(any(feature = "net", feature = "process"))]
266        pub(crate) fn poll_write_vectored<'a>(
267            &'a self,
268            cx: &mut Context<'_>,
269            bufs: &[io::IoSlice<'_>],
270        ) -> Poll<io::Result<usize>>
271        where
272            &'a E: io::Write + 'a,
273        {
274            use std::io::Write;
275            self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write_vectored(bufs))
276        }
277    }
278}
279
// Marker impls: `PollEvented` is declared unwind safe, both by value and
// behind a shared reference, for every `E: Source`.
// NOTE(review): these impls put no `UnwindSafe`/`RefUnwindSafe` bound on `E`
// itself — presumably intentional; confirm before tightening.
impl<E: Source> UnwindSafe for PollEvented<E> {}

impl<E: Source> RefUnwindSafe for PollEvented<E> {}
283
284impl<E: Source> Deref for PollEvented<E> {
285    type Target = E;
286
287    fn deref(&self) -> &E {
288        self.io.as_ref().unwrap()
289    }
290}
291
292impl<E: Source + fmt::Debug> fmt::Debug for PollEvented<E> {
293    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
294        f.debug_struct("PollEvented").field("io", &self.io).finish()
295    }
296}
297
298impl<E: Source> Drop for PollEvented<E> {
299    fn drop(&mut self) {
300        if let Some(mut io) = self.io.take() {
301            // Ignore errors
302            let _ = self.registration.deregister(&mut io);
303        }
304    }
305}