tokio/io/poll_evented.rs
1use crate::io::interest::Interest;
2use crate::runtime::io::Registration;
3use crate::runtime::scheduler;
4
5use mio::event::Source;
6use std::fmt;
7use std::io;
8use std::ops::Deref;
9use std::panic::{RefUnwindSafe, UnwindSafe};
10use std::task::ready;
11
12cfg_io_driver! {
13 /// Associates an I/O resource that implements the [`std::io::Read`] and/or
14 /// [`std::io::Write`] traits with the reactor that drives it.
15 ///
16 /// `PollEvented` uses [`Registration`] internally to take a type that
17 /// implements [`mio::event::Source`] as well as [`std::io::Read`] and/or
18 /// [`std::io::Write`] and associate it with a reactor that will drive it.
19 ///
20 /// Once the [`mio::event::Source`] type is wrapped by `PollEvented`, it can be
21 /// used from within the future's execution model. As such, the
22 /// `PollEvented` type provides [`AsyncRead`] and [`AsyncWrite`]
23 /// implementations using the underlying I/O resource as well as readiness
24 /// events provided by the reactor.
25 ///
26 /// **Note**: While `PollEvented` is `Sync` (if the underlying I/O type is
27 /// `Sync`), the caller must ensure that there are at most two tasks that
28 /// use a `PollEvented` instance concurrently. One for reading and one for
29 /// writing. While violating this requirement is "safe" from a Rust memory
30 /// model point of view, it will result in unexpected behavior in the form
31 /// of lost notifications and tasks hanging.
32 ///
33 /// ## Readiness events
34 ///
35 /// Besides just providing [`AsyncRead`] and [`AsyncWrite`] implementations,
36 /// this type also supports access to the underlying readiness event stream.
37 /// While similar in function to what [`Registration`] provides, the
38 /// semantics are a bit different.
39 ///
40 /// Two functions are provided to access the readiness events:
41 /// [`poll_read_ready`] and [`poll_write_ready`]. These functions return the
42 /// current readiness state of the `PollEvented` instance. If
43 /// [`poll_read_ready`] indicates read readiness, immediately calling
44 /// [`poll_read_ready`] again will also indicate read readiness.
45 ///
46 /// When the operation is attempted and is unable to succeed due to the I/O
47 /// resource not being ready, the caller must call [`clear_readiness`].
48 /// This clears the readiness state until a new readiness event is received.
49 ///
50 /// This allows the caller to implement additional functions. For example,
51 /// [`TcpListener`] implements `poll_accept` by using [`poll_read_ready`] and
52 /// [`clear_readiness`].
53 ///
54 /// ## Platform-specific events
55 ///
56 /// `PollEvented` also allows receiving platform-specific `mio::Ready` events.
57 /// These events are included as part of the read readiness event stream. The
58 /// write readiness event stream is only for `Ready::writable()` events.
59 ///
60 /// [`AsyncRead`]: crate::io::AsyncRead
61 /// [`AsyncWrite`]: crate::io::AsyncWrite
62 /// [`TcpListener`]: crate::net::TcpListener
63 /// [`clear_readiness`]: Registration::clear_readiness
64 /// [`poll_read_ready`]: Registration::poll_read_ready
65 /// [`poll_write_ready`]: Registration::poll_write_ready
66 pub(crate) struct PollEvented<E: Source> {
67 io: Option<E>,
68 registration: Registration,
69 }
70}
71
72// ===== impl PollEvented =====
73
74impl<E: Source> PollEvented<E> {
75 /// Creates a new `PollEvented` associated with the default reactor.
76 ///
77 /// The returned `PollEvented` has readable and writable interests. For more control, use
78 /// [`Self::new_with_interest`].
79 ///
80 /// # Panics
81 ///
82 /// This function panics if thread-local runtime is not set.
83 ///
84 /// The runtime is usually set implicitly when this function is called
85 /// from a future driven by a tokio runtime, otherwise runtime can be set
86 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
87 #[track_caller]
88 #[cfg_attr(feature = "signal", allow(unused))]
89 pub(crate) fn new(io: E) -> io::Result<Self> {
90 PollEvented::new_with_interest(io, Interest::READABLE | Interest::WRITABLE)
91 }
92
93 /// Creates a new `PollEvented` associated with the default reactor, for
94 /// specific `Interest` state. `new_with_interest` should be used over `new`
95 /// when you need control over the readiness state, such as when a file
96 /// descriptor only allows reads. This does not add `hup` or `error` so if
97 /// you are interested in those states, you will need to add them to the
98 /// readiness state passed to this function.
99 ///
100 /// # Panics
101 ///
102 /// This function panics if thread-local runtime is not set.
103 ///
104 /// The runtime is usually set implicitly when this function is called from
105 /// a future driven by a tokio runtime, otherwise runtime can be set
106 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter)
107 /// function.
108 #[track_caller]
109 #[cfg_attr(feature = "signal", allow(unused))]
110 pub(crate) fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
111 Self::new_with_interest_and_handle(io, interest, scheduler::Handle::current())
112 }
113
114 #[track_caller]
115 pub(crate) fn new_with_interest_and_handle(
116 mut io: E,
117 interest: Interest,
118 handle: scheduler::Handle,
119 ) -> io::Result<Self> {
120 let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
121 Ok(Self {
122 io: Some(io),
123 registration,
124 })
125 }
126
127 /// Returns a reference to the registration.
128 #[cfg(any(feature = "net", all(feature = "process", target_os = "linux")))]
129 pub(crate) fn registration(&self) -> &Registration {
130 &self.registration
131 }
132
133 /// Deregisters the inner io from the registration and returns a Result containing the inner io.
134 #[cfg(any(feature = "net", feature = "process"))]
135 pub(crate) fn into_inner(mut self) -> io::Result<E> {
136 let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here.
137 self.registration.deregister(&mut inner)?;
138 Ok(inner)
139 }
140
141 /// Re-register under new runtime with `interest`.
142 #[cfg(all(feature = "process", target_os = "linux"))]
143 pub(crate) fn reregister(&mut self, interest: Interest) -> io::Result<()> {
144 let io = self.io.as_mut().unwrap(); // As io shouldn't ever be None, just unwrap here.
145 let _ = self.registration.deregister(io);
146 self.registration =
147 Registration::new_with_interest_and_handle(io, interest, scheduler::Handle::current())?;
148
149 Ok(())
150 }
151}
152
153feature! {
154 #![any(feature = "net", all(unix, feature = "process"))]
155
156 use crate::io::ReadBuf;
157 use std::task::{Context, Poll};
158
159 impl<E: Source> PollEvented<E> {
160 // Safety: The caller must ensure that `E` can read into uninitialized memory
161 pub(crate) unsafe fn poll_read<'a>(
162 &'a self,
163 cx: &mut Context<'_>,
164 buf: &mut ReadBuf<'_>,
165 ) -> Poll<io::Result<()>>
166 where
167 &'a E: io::Read + 'a,
168 {
169 use std::io::Read;
170
171 loop {
172 let evt = ready!(self.registration.poll_read_ready(cx))?;
173
174 let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
175
176 // used only when the cfgs below apply
177 #[allow(unused_variables)]
178 let len = b.len();
179
180 match self.io.as_ref().unwrap().read(b) {
181 Ok(n) => {
182 // When mio is using the epoll or kqueue selector, reading a partially full
183 // buffer is sufficient to show that the socket buffer has been drained.
184 //
185 // This optimization does not work for level-triggered selectors such as
186 // windows or when poll is used.
187 //
188 // Read more:
189 // https://github.com/tokio-rs/tokio/issues/5866
190 #[cfg(all(
191 not(mio_unsupported_force_poll_poll),
192 any(
193 // epoll
194 target_os = "android",
195 target_os = "illumos",
196 target_os = "linux",
197 target_os = "redox",
198 // kqueue
199 target_os = "dragonfly",
200 target_os = "freebsd",
201 target_os = "ios",
202 target_os = "macos",
203 target_os = "netbsd",
204 target_os = "openbsd",
205 target_os = "tvos",
206 target_os = "visionos",
207 target_os = "watchos",
208 )
209 ))]
210 if 0 < n && n < len {
211 self.registration.clear_readiness(evt);
212 }
213
214 // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
215 // buffer.
216 buf.assume_init(n);
217 buf.advance(n);
218 return Poll::Ready(Ok(()));
219 },
220 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
221 self.registration.clear_readiness(evt);
222 }
223 Err(e) => return Poll::Ready(Err(e)),
224 }
225 }
226 }
227
228 pub(crate) fn poll_write<'a>(&'a self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>>
229 where
230 &'a E: io::Write + 'a,
231 {
232 use std::io::Write;
233
234 loop {
235 let evt = ready!(self.registration.poll_write_ready(cx))?;
236
237 match self.io.as_ref().unwrap().write(buf) {
238 Ok(n) => {
239 // if we write only part of our buffer, this is sufficient on unix to show
240 // that the socket buffer is full. Unfortunately this assumption
241 // fails for level-triggered selectors (like on Windows or poll even for
242 // UNIX): https://github.com/tokio-rs/tokio/issues/5866
243 if n > 0 && (!cfg!(windows) && !cfg!(mio_unsupported_force_poll_poll) && n < buf.len()) {
244 self.registration.clear_readiness(evt);
245 }
246
247 return Poll::Ready(Ok(n));
248 },
249 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
250 self.registration.clear_readiness(evt);
251 }
252 Err(e) => return Poll::Ready(Err(e)),
253 }
254 }
255 }
256
257 #[cfg(any(feature = "net", feature = "process"))]
258 pub(crate) fn poll_write_vectored<'a>(
259 &'a self,
260 cx: &mut Context<'_>,
261 bufs: &[io::IoSlice<'_>],
262 ) -> Poll<io::Result<usize>>
263 where
264 &'a E: io::Write + 'a,
265 {
266 use std::io::Write;
267 self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write_vectored(bufs))
268 }
269 }
270}
271
272impl<E: Source> UnwindSafe for PollEvented<E> {}
273
274impl<E: Source> RefUnwindSafe for PollEvented<E> {}
275
276impl<E: Source> Deref for PollEvented<E> {
277 type Target = E;
278
279 fn deref(&self) -> &E {
280 self.io.as_ref().unwrap()
281 }
282}
283
284impl<E: Source + fmt::Debug> fmt::Debug for PollEvented<E> {
285 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
286 f.debug_struct("PollEvented").field("io", &self.io).finish()
287 }
288}
289
290impl<E: Source> Drop for PollEvented<E> {
291 fn drop(&mut self) {
292 if let Some(mut io) = self.io.take() {
293 // Ignore errors
294 let _ = self.registration.deregister(&mut io);
295 }
296 }
297}