tokio/io/async_fd.rs
1use crate::io::{Interest, Ready};
2use crate::runtime::io::{ReadyEvent, Registration};
3use crate::runtime::scheduler;
4
5use mio::unix::SourceFd;
6use std::error::Error;
7use std::fmt;
8use std::io;
9use std::os::unix::io::{AsRawFd, RawFd};
10use std::task::{ready, Context, Poll};
11
12/// Associates an IO object backed by a Unix file descriptor with the tokio
13/// reactor, allowing for readiness to be polled. The file descriptor must be of
/// a type that can be used with the OS polling facilities (i.e., `poll`, `epoll`,
15/// `kqueue`, etc), such as a network socket or pipe, and the file descriptor
16/// must have the nonblocking mode set to true.
17///
18/// Creating an [`AsyncFd`] registers the file descriptor with the current tokio
19/// Reactor, allowing you to directly await the file descriptor being readable
20/// or writable. Once registered, the file descriptor remains registered until
21/// the [`AsyncFd`] is dropped.
22///
23/// The [`AsyncFd`] takes ownership of an arbitrary object to represent the IO
24/// object. It is intended that the inner object will handle closing the file
25/// descriptor when it is dropped, avoiding resource leaks and ensuring that the
26/// [`AsyncFd`] can clean up the registration before closing the file descriptor.
27/// The [`AsyncFd::into_inner`] function can be used to extract the inner object
28/// to retake control from the tokio IO reactor. The [`OwnedFd`] type is often
29/// used as the inner object, as it is the simplest type that closes the fd on
30/// drop.
31///
32/// The inner object is required to implement [`AsRawFd`]. This file descriptor
33/// must not change while [`AsyncFd`] owns the inner object, i.e. the
34/// [`AsRawFd::as_raw_fd`] method on the inner type must always return the same
35/// file descriptor when called multiple times. Failure to uphold this results
36/// in unspecified behavior in the IO driver, which may include breaking
37/// notifications for other sockets/etc.
38///
39/// Polling for readiness is done by calling the async functions [`readable`]
40/// and [`writable`]. These functions complete when the associated readiness
41/// condition is observed. Any number of tasks can query the same `AsyncFd` in
42/// parallel, on the same or different conditions.
43///
44/// On some platforms, the readiness detecting mechanism relies on
45/// edge-triggered notifications. This means that the OS will only notify Tokio
46/// when the file descriptor transitions from not-ready to ready. For this to
47/// work you should first try to read or write and only poll for readiness
48/// if that fails with an error of [`std::io::ErrorKind::WouldBlock`].
49///
50/// Tokio internally tracks when it has received a ready notification, and when
51/// readiness checking functions like [`readable`] and [`writable`] are called,
52/// if the readiness flag is set, these async functions will complete
53/// immediately. This however does mean that it is critical to ensure that this
54/// ready flag is cleared when (and only when) the file descriptor ceases to be
55/// ready. The [`AsyncFdReadyGuard`] returned from readiness checking functions
56/// serves this function; after calling a readiness-checking async function,
57/// you must use this [`AsyncFdReadyGuard`] to signal to tokio whether the file
58/// descriptor is no longer in a ready state.
59///
/// ## Use with a poll-based API
61///
62/// In some cases it may be desirable to use `AsyncFd` from APIs similar to
63/// [`TcpStream::poll_read_ready`]. The [`AsyncFd::poll_read_ready`] and
64/// [`AsyncFd::poll_write_ready`] functions are provided for this purpose.
65/// Because these functions don't create a future to hold their state, they have
66/// the limitation that only one task can wait on each direction (read or write)
67/// at a time.
68///
69/// # Examples
70///
71/// This example shows how to turn [`std::net::TcpStream`] asynchronous using
72/// `AsyncFd`. It implements the read/write operations both as an `async fn`
73/// and using the IO traits [`AsyncRead`] and [`AsyncWrite`].
74///
75/// ```no_run
76/// use std::io::{self, Read, Write};
77/// use std::net::TcpStream;
78/// use std::pin::Pin;
79/// use std::task::{ready, Context, Poll};
80/// use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
81/// use tokio::io::unix::AsyncFd;
82///
83/// pub struct AsyncTcpStream {
84/// inner: AsyncFd<TcpStream>,
85/// }
86///
87/// impl AsyncTcpStream {
88/// pub fn new(tcp: TcpStream) -> io::Result<Self> {
89/// tcp.set_nonblocking(true)?;
90/// Ok(Self {
91/// inner: AsyncFd::new(tcp)?,
92/// })
93/// }
94///
95/// pub async fn read(&self, out: &mut [u8]) -> io::Result<usize> {
96/// loop {
97/// let mut guard = self.inner.readable().await?;
98///
99/// match guard.try_io(|inner| inner.get_ref().read(out)) {
100/// Ok(result) => return result,
101/// Err(_would_block) => continue,
102/// }
103/// }
104/// }
105///
106/// pub async fn write(&self, buf: &[u8]) -> io::Result<usize> {
107/// loop {
108/// let mut guard = self.inner.writable().await?;
109///
110/// match guard.try_io(|inner| inner.get_ref().write(buf)) {
111/// Ok(result) => return result,
112/// Err(_would_block) => continue,
113/// }
114/// }
115/// }
116/// }
117///
118/// impl AsyncRead for AsyncTcpStream {
119/// fn poll_read(
120/// self: Pin<&mut Self>,
121/// cx: &mut Context<'_>,
122/// buf: &mut ReadBuf<'_>
123/// ) -> Poll<io::Result<()>> {
124/// loop {
125/// let mut guard = ready!(self.inner.poll_read_ready(cx))?;
126///
127/// let unfilled = buf.initialize_unfilled();
128/// match guard.try_io(|inner| inner.get_ref().read(unfilled)) {
129/// Ok(Ok(len)) => {
130/// buf.advance(len);
131/// return Poll::Ready(Ok(()));
132/// },
133/// Ok(Err(err)) => return Poll::Ready(Err(err)),
134/// Err(_would_block) => continue,
135/// }
136/// }
137/// }
138/// }
139///
140/// impl AsyncWrite for AsyncTcpStream {
141/// fn poll_write(
142/// self: Pin<&mut Self>,
143/// cx: &mut Context<'_>,
144/// buf: &[u8]
145/// ) -> Poll<io::Result<usize>> {
146/// loop {
147/// let mut guard = ready!(self.inner.poll_write_ready(cx))?;
148///
149/// match guard.try_io(|inner| inner.get_ref().write(buf)) {
150/// Ok(result) => return Poll::Ready(result),
151/// Err(_would_block) => continue,
152/// }
153/// }
154/// }
155///
156/// fn poll_flush(
157/// self: Pin<&mut Self>,
158/// cx: &mut Context<'_>,
159/// ) -> Poll<io::Result<()>> {
160/// // tcp flush is a no-op
161/// Poll::Ready(Ok(()))
162/// }
163///
164/// fn poll_shutdown(
165/// self: Pin<&mut Self>,
166/// cx: &mut Context<'_>,
167/// ) -> Poll<io::Result<()>> {
168/// self.inner.get_ref().shutdown(std::net::Shutdown::Write)?;
169/// Poll::Ready(Ok(()))
170/// }
171/// }
172/// ```
173///
174/// [`readable`]: method@Self::readable
175/// [`writable`]: method@Self::writable
176/// [`AsyncFdReadyGuard`]: struct@self::AsyncFdReadyGuard
177/// [`TcpStream::poll_read_ready`]: struct@crate::net::TcpStream
178/// [`AsyncRead`]: trait@crate::io::AsyncRead
179/// [`AsyncWrite`]: trait@crate::io::AsyncWrite
180/// [`OwnedFd`]: struct@std::os::fd::OwnedFd
pub struct AsyncFd<T: AsRawFd> {
    // Registration of the file descriptor with the IO driver. Readiness is
    // polled through it, and the descriptor is deregistered through it when
    // the inner object is taken back out.
    registration: Registration,
    // The inner value is always present. The `Option` is required for `drop` and `into_inner`
    // (`into_inner` moves the value out via `Option::take`).
    // In all other methods `unwrap` is valid, and will never panic.
    inner: Option<T>,
}
187
188/// Represents an IO-ready event detected on a particular file descriptor that
189/// has not yet been acknowledged. This is a `must_use` structure to help ensure
190/// that you do not forget to explicitly clear (or not clear) the event.
191///
192/// This type exposes an immutable reference to the underlying IO object.
#[must_use = "You must explicitly choose whether to clear the readiness state by calling a method on ReadyGuard"]
pub struct AsyncFdReadyGuard<'a, T: AsRawFd> {
    // Shared borrow of the `AsyncFd` this guard was created from.
    async_fd: &'a AsyncFd<T>,
    // Readiness event observed when the guard was created. The constructors
    // visible in this file always store `Some`.
    // NOTE(review): presumably becomes `None` once the event has been
    // consumed/cleared — confirm against the guard's methods.
    event: Option<ReadyEvent>,
}
198
199/// Represents an IO-ready event detected on a particular file descriptor that
200/// has not yet been acknowledged. This is a `must_use` structure to help ensure
201/// that you do not forget to explicitly clear (or not clear) the event.
202///
203/// This type exposes a mutable reference to the underlying IO object.
#[must_use = "You must explicitly choose whether to clear the readiness state by calling a method on ReadyGuard"]
pub struct AsyncFdReadyMutGuard<'a, T: AsRawFd> {
    // Exclusive borrow of the `AsyncFd` this guard was created from; this is
    // what allows mutable access to the inner IO object through the guard.
    async_fd: &'a mut AsyncFd<T>,
    // Readiness event observed when the guard was created. The constructors
    // visible in this file always store `Some`.
    // NOTE(review): presumably becomes `None` once the event has been
    // consumed/cleared — confirm against the guard's methods.
    event: Option<ReadyEvent>,
}
209
210impl<T: AsRawFd> AsyncFd<T> {
211 /// Creates an [`AsyncFd`] backed by (and taking ownership of) an object
212 /// implementing [`AsRawFd`]. The backing file descriptor is cached at the
213 /// time of creation.
214 ///
215 /// Only configures the [`Interest::READABLE`] and [`Interest::WRITABLE`] interests. For more
216 /// control, use [`AsyncFd::with_interest`].
217 ///
218 /// This method must be called in the context of a tokio runtime.
219 ///
220 /// # Panics
221 ///
222 /// This function panics if there is no current reactor set, or if the `rt`
223 /// feature flag is not enabled.
224 #[inline]
225 #[track_caller]
226 pub fn new(inner: T) -> io::Result<Self>
227 where
228 T: AsRawFd,
229 {
230 Self::with_interest(inner, Interest::READABLE | Interest::WRITABLE)
231 }
232
233 /// Creates an [`AsyncFd`] backed by (and taking ownership of) an object
234 /// implementing [`AsRawFd`], with a specific [`Interest`]. The backing
235 /// file descriptor is cached at the time of creation.
236 ///
237 /// # Panics
238 ///
239 /// This function panics if there is no current reactor set, or if the `rt`
240 /// feature flag is not enabled.
241 #[inline]
242 #[track_caller]
243 pub fn with_interest(inner: T, interest: Interest) -> io::Result<Self>
244 where
245 T: AsRawFd,
246 {
247 Self::new_with_handle_and_interest(inner, scheduler::Handle::current(), interest)
248 }
249
250 #[track_caller]
251 pub(crate) fn new_with_handle_and_interest(
252 inner: T,
253 handle: scheduler::Handle,
254 interest: Interest,
255 ) -> io::Result<Self> {
256 Self::try_new_with_handle_and_interest(inner, handle, interest).map_err(Into::into)
257 }
258
259 /// Creates an [`AsyncFd`] backed by (and taking ownership of) an object
260 /// implementing [`AsRawFd`]. The backing file descriptor is cached at the
261 /// time of creation.
262 ///
263 /// Only configures the [`Interest::READABLE`] and [`Interest::WRITABLE`] interests. For more
264 /// control, use [`AsyncFd::try_with_interest`].
265 ///
266 /// This method must be called in the context of a tokio runtime.
267 ///
268 /// In the case of failure, it returns [`AsyncFdTryNewError`] that contains the original object
269 /// passed to this function.
270 ///
271 /// # Panics
272 ///
273 /// This function panics if there is no current reactor set, or if the `rt`
274 /// feature flag is not enabled.
275 #[inline]
276 #[track_caller]
277 pub fn try_new(inner: T) -> Result<Self, AsyncFdTryNewError<T>>
278 where
279 T: AsRawFd,
280 {
281 Self::try_with_interest(inner, Interest::READABLE | Interest::WRITABLE)
282 }
283
284 /// Creates an [`AsyncFd`] backed by (and taking ownership of) an object
285 /// implementing [`AsRawFd`], with a specific [`Interest`]. The backing
286 /// file descriptor is cached at the time of creation.
287 ///
288 /// In the case of failure, it returns [`AsyncFdTryNewError`] that contains the original object
289 /// passed to this function.
290 ///
291 /// # Panics
292 ///
293 /// This function panics if there is no current reactor set, or if the `rt`
294 /// feature flag is not enabled.
295 #[inline]
296 #[track_caller]
297 pub fn try_with_interest(inner: T, interest: Interest) -> Result<Self, AsyncFdTryNewError<T>>
298 where
299 T: AsRawFd,
300 {
301 Self::try_new_with_handle_and_interest(inner, scheduler::Handle::current(), interest)
302 }
303
304 #[track_caller]
305 pub(crate) fn try_new_with_handle_and_interest(
306 inner: T,
307 handle: scheduler::Handle,
308 interest: Interest,
309 ) -> Result<Self, AsyncFdTryNewError<T>> {
310 let fd = inner.as_raw_fd();
311
312 match Registration::new_with_interest_and_handle(&mut SourceFd(&fd), interest, handle) {
313 Ok(registration) => Ok(AsyncFd {
314 registration,
315 inner: Some(inner),
316 }),
317 Err(cause) => Err(AsyncFdTryNewError { inner, cause }),
318 }
319 }
320
321 /// Returns a shared reference to the backing object of this [`AsyncFd`].
322 #[inline]
323 pub fn get_ref(&self) -> &T {
324 self.inner.as_ref().unwrap()
325 }
326
327 /// Returns a mutable reference to the backing object of this [`AsyncFd`].
328 #[inline]
329 pub fn get_mut(&mut self) -> &mut T {
330 self.inner.as_mut().unwrap()
331 }
332
333 fn take_inner(&mut self) -> Option<T> {
334 let inner = self.inner.take()?;
335 let fd = inner.as_raw_fd();
336
337 let _ = self.registration.deregister(&mut SourceFd(&fd));
338
339 Some(inner)
340 }
341
342 /// Deregisters this file descriptor and returns ownership of the backing
343 /// object.
344 pub fn into_inner(mut self) -> T {
345 self.take_inner().unwrap()
346 }
347
348 /// Polls for read readiness.
349 ///
350 /// If the file descriptor is not currently ready for reading, this method
351 /// will store a clone of the [`Waker`] from the provided [`Context`]. When the
352 /// file descriptor becomes ready for reading, [`Waker::wake`] will be called.
353 ///
354 /// Note that on multiple calls to [`poll_read_ready`] or
355 /// [`poll_read_ready_mut`], only the `Waker` from the `Context` passed to the
356 /// most recent call is scheduled to receive a wakeup. (However,
357 /// [`poll_write_ready`] retains a second, independent waker).
358 ///
359 /// This method is intended for cases where creating and pinning a future
360 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
361 /// preferred, as this supports polling from multiple tasks at once.
362 ///
363 /// This method takes `&self`, so it is possible to call this method
364 /// concurrently with other methods on this struct. This method only
365 /// provides shared access to the inner IO resource when handling the
366 /// [`AsyncFdReadyGuard`].
367 ///
368 /// [`poll_read_ready`]: method@Self::poll_read_ready
369 /// [`poll_read_ready_mut`]: method@Self::poll_read_ready_mut
370 /// [`poll_write_ready`]: method@Self::poll_write_ready
371 /// [`readable`]: method@Self::readable
372 /// [`Context`]: struct@std::task::Context
373 /// [`Waker`]: struct@std::task::Waker
374 /// [`Waker::wake`]: method@std::task::Waker::wake
375 pub fn poll_read_ready<'a>(
376 &'a self,
377 cx: &mut Context<'_>,
378 ) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>> {
379 let event = ready!(self.registration.poll_read_ready(cx))?;
380
381 Poll::Ready(Ok(AsyncFdReadyGuard {
382 async_fd: self,
383 event: Some(event),
384 }))
385 }
386
387 /// Polls for read readiness.
388 ///
389 /// If the file descriptor is not currently ready for reading, this method
390 /// will store a clone of the [`Waker`] from the provided [`Context`]. When the
391 /// file descriptor becomes ready for reading, [`Waker::wake`] will be called.
392 ///
393 /// Note that on multiple calls to [`poll_read_ready`] or
394 /// [`poll_read_ready_mut`], only the `Waker` from the `Context` passed to the
395 /// most recent call is scheduled to receive a wakeup. (However,
396 /// [`poll_write_ready`] retains a second, independent waker).
397 ///
398 /// This method is intended for cases where creating and pinning a future
399 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
400 /// preferred, as this supports polling from multiple tasks at once.
401 ///
402 /// This method takes `&mut self`, so it is possible to access the inner IO
403 /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
404 ///
405 /// [`poll_read_ready`]: method@Self::poll_read_ready
406 /// [`poll_read_ready_mut`]: method@Self::poll_read_ready_mut
407 /// [`poll_write_ready`]: method@Self::poll_write_ready
408 /// [`readable`]: method@Self::readable
409 /// [`Context`]: struct@std::task::Context
410 /// [`Waker`]: struct@std::task::Waker
411 /// [`Waker::wake`]: method@std::task::Waker::wake
412 pub fn poll_read_ready_mut<'a>(
413 &'a mut self,
414 cx: &mut Context<'_>,
415 ) -> Poll<io::Result<AsyncFdReadyMutGuard<'a, T>>> {
416 let event = ready!(self.registration.poll_read_ready(cx))?;
417
418 Poll::Ready(Ok(AsyncFdReadyMutGuard {
419 async_fd: self,
420 event: Some(event),
421 }))
422 }
423
424 /// Polls for write readiness.
425 ///
426 /// If the file descriptor is not currently ready for writing, this method
427 /// will store a clone of the [`Waker`] from the provided [`Context`]. When the
428 /// file descriptor becomes ready for writing, [`Waker::wake`] will be called.
429 ///
430 /// Note that on multiple calls to [`poll_write_ready`] or
431 /// [`poll_write_ready_mut`], only the `Waker` from the `Context` passed to the
432 /// most recent call is scheduled to receive a wakeup. (However,
433 /// [`poll_read_ready`] retains a second, independent waker).
434 ///
435 /// This method is intended for cases where creating and pinning a future
436 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
437 /// preferred, as this supports polling from multiple tasks at once.
438 ///
439 /// This method takes `&self`, so it is possible to call this method
440 /// concurrently with other methods on this struct. This method only
441 /// provides shared access to the inner IO resource when handling the
442 /// [`AsyncFdReadyGuard`].
443 ///
444 /// [`poll_read_ready`]: method@Self::poll_read_ready
445 /// [`poll_write_ready`]: method@Self::poll_write_ready
446 /// [`poll_write_ready_mut`]: method@Self::poll_write_ready_mut
/// [`writable`]: method@Self::writable
448 /// [`Context`]: struct@std::task::Context
449 /// [`Waker`]: struct@std::task::Waker
450 /// [`Waker::wake`]: method@std::task::Waker::wake
451 pub fn poll_write_ready<'a>(
452 &'a self,
453 cx: &mut Context<'_>,
454 ) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>> {
455 let event = ready!(self.registration.poll_write_ready(cx))?;
456
457 Poll::Ready(Ok(AsyncFdReadyGuard {
458 async_fd: self,
459 event: Some(event),
460 }))
461 }
462
463 /// Polls for write readiness.
464 ///
465 /// If the file descriptor is not currently ready for writing, this method
466 /// will store a clone of the [`Waker`] from the provided [`Context`]. When the
467 /// file descriptor becomes ready for writing, [`Waker::wake`] will be called.
468 ///
469 /// Note that on multiple calls to [`poll_write_ready`] or
470 /// [`poll_write_ready_mut`], only the `Waker` from the `Context` passed to the
471 /// most recent call is scheduled to receive a wakeup. (However,
472 /// [`poll_read_ready`] retains a second, independent waker).
473 ///
474 /// This method is intended for cases where creating and pinning a future
475 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
476 /// preferred, as this supports polling from multiple tasks at once.
477 ///
478 /// This method takes `&mut self`, so it is possible to access the inner IO
479 /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
480 ///
481 /// [`poll_read_ready`]: method@Self::poll_read_ready
482 /// [`poll_write_ready`]: method@Self::poll_write_ready
483 /// [`poll_write_ready_mut`]: method@Self::poll_write_ready_mut
/// [`writable`]: method@Self::writable
485 /// [`Context`]: struct@std::task::Context
486 /// [`Waker`]: struct@std::task::Waker
487 /// [`Waker::wake`]: method@std::task::Waker::wake
488 pub fn poll_write_ready_mut<'a>(
489 &'a mut self,
490 cx: &mut Context<'_>,
491 ) -> Poll<io::Result<AsyncFdReadyMutGuard<'a, T>>> {
492 let event = ready!(self.registration.poll_write_ready(cx))?;
493
494 Poll::Ready(Ok(AsyncFdReadyMutGuard {
495 async_fd: self,
496 event: Some(event),
497 }))
498 }
499
500 /// Waits for any of the requested ready states, returning a
501 /// [`AsyncFdReadyGuard`] that must be dropped to resume
502 /// polling for the requested ready states.
503 ///
504 /// The function may complete without the file descriptor being ready. This is a
505 /// false-positive and attempting an operation will return with
506 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
507 /// [`Ready`] set, so you should always check the returned value and possibly
508 /// wait again if the requested states are not set.
509 ///
510 /// When an IO operation does return `io::ErrorKind::WouldBlock`, the readiness must be cleared.
511 /// When a combined interest is used, it is important to clear only the readiness
512 /// that is actually observed to block. For instance when the combined
513 /// interest `Interest::READABLE | Interest::WRITABLE` is used, and a read blocks, only
514 /// read readiness should be cleared using the [`AsyncFdReadyGuard::clear_ready_matching`] method:
515 /// `guard.clear_ready_matching(Ready::READABLE)`.
516 /// Also clearing the write readiness in this case would be incorrect. The [`AsyncFdReadyGuard::clear_ready`]
517 /// method clears all readiness flags.
518 ///
519 /// This method takes `&self`, so it is possible to call this method
520 /// concurrently with other methods on this struct. This method only
521 /// provides shared access to the inner IO resource when handling the
522 /// [`AsyncFdReadyGuard`].
523 ///
524 /// # Examples
525 ///
526 /// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
527 /// splitting.
528 ///
529 /// ```no_run
530 /// use std::error::Error;
531 /// use std::io;
532 /// use std::io::{Read, Write};
533 /// use std::net::TcpStream;
534 /// use tokio::io::unix::AsyncFd;
535 /// use tokio::io::{Interest, Ready};
536 ///
537 /// #[tokio::main]
538 /// async fn main() -> Result<(), Box<dyn Error>> {
539 /// let stream = TcpStream::connect("127.0.0.1:8080")?;
540 /// stream.set_nonblocking(true)?;
541 /// let stream = AsyncFd::new(stream)?;
542 ///
543 /// loop {
544 /// let mut guard = stream
545 /// .ready(Interest::READABLE | Interest::WRITABLE)
546 /// .await?;
547 ///
548 /// if guard.ready().is_readable() {
549 /// let mut data = vec![0; 1024];
550 /// // Try to read data, this may still fail with `WouldBlock`
551 /// // if the readiness event is a false positive.
552 /// match stream.get_ref().read(&mut data) {
553 /// Ok(n) => {
554 /// println!("read {} bytes", n);
555 /// }
556 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
557 /// // a read has blocked, but a write might still succeed.
558 /// // clear only the read readiness.
559 /// guard.clear_ready_matching(Ready::READABLE);
560 /// continue;
561 /// }
562 /// Err(e) => {
563 /// return Err(e.into());
564 /// }
565 /// }
566 /// }
567 ///
568 /// if guard.ready().is_writable() {
569 /// // Try to write data, this may still fail with `WouldBlock`
570 /// // if the readiness event is a false positive.
571 /// match stream.get_ref().write(b"hello world") {
572 /// Ok(n) => {
573 /// println!("write {} bytes", n);
574 /// }
575 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
576 /// // a write has blocked, but a read might still succeed.
577 /// // clear only the write readiness.
578 /// guard.clear_ready_matching(Ready::WRITABLE);
579 /// continue;
580 /// }
581 /// Err(e) => {
582 /// return Err(e.into());
583 /// }
584 /// }
585 /// }
586 /// }
587 /// }
588 /// ```
589 pub async fn ready(&self, interest: Interest) -> io::Result<AsyncFdReadyGuard<'_, T>> {
590 let event = self.registration.readiness(interest).await?;
591
592 Ok(AsyncFdReadyGuard {
593 async_fd: self,
594 event: Some(event),
595 })
596 }
597
598 /// Waits for any of the requested ready states, returning a
599 /// [`AsyncFdReadyMutGuard`] that must be dropped to resume
600 /// polling for the requested ready states.
601 ///
602 /// The function may complete without the file descriptor being ready. This is a
603 /// false-positive and attempting an operation will return with
604 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
605 /// [`Ready`] set, so you should always check the returned value and possibly
606 /// wait again if the requested states are not set.
607 ///
608 /// When an IO operation does return `io::ErrorKind::WouldBlock`, the readiness must be cleared.
609 /// When a combined interest is used, it is important to clear only the readiness
610 /// that is actually observed to block. For instance when the combined
611 /// interest `Interest::READABLE | Interest::WRITABLE` is used, and a read blocks, only
612 /// read readiness should be cleared using the [`AsyncFdReadyMutGuard::clear_ready_matching`] method:
613 /// `guard.clear_ready_matching(Ready::READABLE)`.
614 /// Also clearing the write readiness in this case would be incorrect.
615 /// The [`AsyncFdReadyMutGuard::clear_ready`] method clears all readiness flags.
616 ///
617 /// This method takes `&mut self`, so it is possible to access the inner IO
618 /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
619 ///
620 /// # Examples
621 ///
622 /// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
623 /// splitting.
624 ///
625 /// ```no_run
626 /// use std::error::Error;
627 /// use std::io;
628 /// use std::io::{Read, Write};
629 /// use std::net::TcpStream;
630 /// use tokio::io::unix::AsyncFd;
631 /// use tokio::io::{Interest, Ready};
632 ///
633 /// #[tokio::main]
634 /// async fn main() -> Result<(), Box<dyn Error>> {
635 /// let stream = TcpStream::connect("127.0.0.1:8080")?;
636 /// stream.set_nonblocking(true)?;
637 /// let mut stream = AsyncFd::new(stream)?;
638 ///
639 /// loop {
640 /// let mut guard = stream
641 /// .ready_mut(Interest::READABLE | Interest::WRITABLE)
642 /// .await?;
643 ///
644 /// if guard.ready().is_readable() {
645 /// let mut data = vec![0; 1024];
646 /// // Try to read data, this may still fail with `WouldBlock`
647 /// // if the readiness event is a false positive.
648 /// match guard.get_inner_mut().read(&mut data) {
649 /// Ok(n) => {
650 /// println!("read {} bytes", n);
651 /// }
652 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
653 /// // a read has blocked, but a write might still succeed.
654 /// // clear only the read readiness.
655 /// guard.clear_ready_matching(Ready::READABLE);
656 /// continue;
657 /// }
658 /// Err(e) => {
659 /// return Err(e.into());
660 /// }
661 /// }
662 /// }
663 ///
664 /// if guard.ready().is_writable() {
665 /// // Try to write data, this may still fail with `WouldBlock`
666 /// // if the readiness event is a false positive.
667 /// match guard.get_inner_mut().write(b"hello world") {
668 /// Ok(n) => {
669 /// println!("write {} bytes", n);
670 /// }
671 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
672 /// // a write has blocked, but a read might still succeed.
673 /// // clear only the write readiness.
674 /// guard.clear_ready_matching(Ready::WRITABLE);
675 /// continue;
676 /// }
677 /// Err(e) => {
678 /// return Err(e.into());
679 /// }
680 /// }
681 /// }
682 /// }
683 /// }
684 /// ```
685 pub async fn ready_mut(
686 &mut self,
687 interest: Interest,
688 ) -> io::Result<AsyncFdReadyMutGuard<'_, T>> {
689 let event = self.registration.readiness(interest).await?;
690
691 Ok(AsyncFdReadyMutGuard {
692 async_fd: self,
693 event: Some(event),
694 })
695 }
696
697 /// Waits for the file descriptor to become readable, returning a
698 /// [`AsyncFdReadyGuard`] that must be dropped to resume read-readiness
699 /// polling.
700 ///
701 /// This method takes `&self`, so it is possible to call this method
702 /// concurrently with other methods on this struct. This method only
703 /// provides shared access to the inner IO resource when handling the
704 /// [`AsyncFdReadyGuard`].
705 ///
706 /// # Cancel safety
707 ///
708 /// This method is cancel safe. Once a readiness event occurs, the method
709 /// will continue to return immediately until the readiness event is
710 /// consumed by an attempt to read or write that fails with `WouldBlock` or
711 /// `Poll::Pending`.
712 #[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering.
713 pub async fn readable<'a>(&'a self) -> io::Result<AsyncFdReadyGuard<'a, T>> {
714 self.ready(Interest::READABLE).await
715 }
716
717 /// Waits for the file descriptor to become readable, returning a
718 /// [`AsyncFdReadyMutGuard`] that must be dropped to resume read-readiness
719 /// polling.
720 ///
721 /// This method takes `&mut self`, so it is possible to access the inner IO
722 /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
723 ///
724 /// # Cancel safety
725 ///
726 /// This method is cancel safe. Once a readiness event occurs, the method
727 /// will continue to return immediately until the readiness event is
728 /// consumed by an attempt to read or write that fails with `WouldBlock` or
729 /// `Poll::Pending`.
730 #[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering.
731 pub async fn readable_mut<'a>(&'a mut self) -> io::Result<AsyncFdReadyMutGuard<'a, T>> {
732 self.ready_mut(Interest::READABLE).await
733 }
734
735 /// Waits for the file descriptor to become writable, returning a
736 /// [`AsyncFdReadyGuard`] that must be dropped to resume write-readiness
737 /// polling.
738 ///
739 /// This method takes `&self`, so it is possible to call this method
740 /// concurrently with other methods on this struct. This method only
741 /// provides shared access to the inner IO resource when handling the
742 /// [`AsyncFdReadyGuard`].
743 ///
744 /// # Cancel safety
745 ///
746 /// This method is cancel safe. Once a readiness event occurs, the method
747 /// will continue to return immediately until the readiness event is
748 /// consumed by an attempt to read or write that fails with `WouldBlock` or
749 /// `Poll::Pending`.
750 #[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering.
751 pub async fn writable<'a>(&'a self) -> io::Result<AsyncFdReadyGuard<'a, T>> {
752 self.ready(Interest::WRITABLE).await
753 }
754
755 /// Waits for the file descriptor to become writable, returning a
756 /// [`AsyncFdReadyMutGuard`] that must be dropped to resume write-readiness
757 /// polling.
758 ///
759 /// This method takes `&mut self`, so it is possible to access the inner IO
760 /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
761 ///
762 /// # Cancel safety
763 ///
764 /// This method is cancel safe. Once a readiness event occurs, the method
765 /// will continue to return immediately until the readiness event is
766 /// consumed by an attempt to read or write that fails with `WouldBlock` or
767 /// `Poll::Pending`.
768 #[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering.
769 pub async fn writable_mut<'a>(&'a mut self) -> io::Result<AsyncFdReadyMutGuard<'a, T>> {
770 self.ready_mut(Interest::WRITABLE).await
771 }
772
773 /// Reads or writes from the file descriptor using a user-provided IO operation.
774 ///
775 /// The `async_io` method is a convenience utility that waits for the file
776 /// descriptor to become ready, and then executes the provided IO operation.
777 /// Since file descriptors may be marked ready spuriously, the closure will
778 /// be called repeatedly until it returns something other than a
779 /// [`WouldBlock`] error. This is done using the following loop:
780 ///
781 /// ```no_run
782 /// # use std::io::{self, Result};
783 /// # struct Dox<T> { inner: T }
784 /// # impl<T> Dox<T> {
785 /// # async fn writable(&self) -> Result<&Self> {
786 /// # Ok(self)
787 /// # }
788 /// # fn try_io<R>(&self, _: impl FnMut(&T) -> Result<R>) -> Result<Result<R>> {
789 /// # panic!()
790 /// # }
791 /// async fn async_io<R>(&self, mut f: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
792 /// loop {
793 /// // or `readable` if called with the read interest.
794 /// let guard = self.writable().await?;
795 ///
796 /// match guard.try_io(&mut f) {
797 /// Ok(result) => return result,
798 /// Err(_would_block) => continue,
799 /// }
800 /// }
801 /// }
802 /// # }
803 /// ```
804 ///
805 /// The closure should only return a [`WouldBlock`] error if it has performed
806 /// an IO operation on the file descriptor that failed due to the file descriptor not being
807 /// ready. Returning a [`WouldBlock`] error in any other situation will
808 /// incorrectly clear the readiness flag, which can cause the file descriptor to
809 /// behave incorrectly.
810 ///
811 /// The closure should not perform the IO operation using any of the methods
812 /// defined on the Tokio [`AsyncFd`] type, as this will mess with the
813 /// readiness flag and can cause the file descriptor to behave incorrectly.
814 ///
815 /// This method is not intended to be used with combined interests.
816 /// The closure should perform only one type of IO operation, so it should not
817 /// require more than one ready state. This method may panic or sleep forever
818 /// if it is called with a combined interest.
819 ///
820 /// # Examples
821 ///
822 /// This example sends some bytes on the inner [`std::net::UdpSocket`]. The `async_io`
823 /// method waits for readiness, and retries if the send operation does block. This example
824 /// is equivalent to the one given for [`try_io`].
825 ///
826 /// ```no_run
827 /// use tokio::io::{Interest, unix::AsyncFd};
828 ///
829 /// use std::io;
830 /// use std::net::UdpSocket;
831 ///
832 /// #[tokio::main]
833 /// async fn main() -> io::Result<()> {
834 /// let socket = UdpSocket::bind("0.0.0.0:8080")?;
835 /// socket.set_nonblocking(true)?;
836 /// let async_fd = AsyncFd::new(socket)?;
837 ///
838 /// let written = async_fd
839 /// .async_io(Interest::WRITABLE, |inner| inner.send(&[1, 2]))
840 /// .await?;
841 ///
842 /// println!("wrote {written} bytes");
843 ///
844 /// Ok(())
845 /// }
846 /// ```
847 ///
848 /// [`try_io`]: AsyncFdReadyGuard::try_io
849 /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
850 pub async fn async_io<R>(
851 &self,
852 interest: Interest,
853 mut f: impl FnMut(&T) -> io::Result<R>,
854 ) -> io::Result<R> {
855 self.registration
856 .async_io(interest, || f(self.get_ref()))
857 .await
858 }
859
860 /// Reads or writes from the file descriptor using a user-provided IO operation.
861 ///
862 /// The behavior is the same as [`async_io`], except that the closure can mutate the inner
863 /// value of the [`AsyncFd`].
864 ///
865 /// [`async_io`]: AsyncFd::async_io
866 pub async fn async_io_mut<R>(
867 &mut self,
868 interest: Interest,
869 mut f: impl FnMut(&mut T) -> io::Result<R>,
870 ) -> io::Result<R> {
871 self.registration
872 .async_io(interest, || f(self.inner.as_mut().unwrap()))
873 .await
874 }
875
876 /// Tries to read or write from the file descriptor using a user-provided IO operation.
877 ///
878 /// If the file descriptor is ready, the provided closure is called. The closure
879 /// should attempt to perform IO operation on the file descriptor by manually
880 /// calling the appropriate syscall. If the operation fails because the
881 /// file descriptor is not actually ready, then the closure should return a
882 /// `WouldBlock` error and the readiness flag is cleared. The return value
883 /// of the closure is then returned by `try_io`.
884 ///
885 /// If the file descriptor is not ready, then the closure is not called
886 /// and a `WouldBlock` error is returned.
887 ///
888 /// The closure should only return a `WouldBlock` error if it has performed
889 /// an IO operation on the file descriptor that failed due to the file descriptor not being
890 /// ready. Returning a `WouldBlock` error in any other situation will
891 /// incorrectly clear the readiness flag, which can cause the file descriptor to
892 /// behave incorrectly.
893 ///
894 /// The closure should not perform the IO operation using any of the methods
895 /// defined on the Tokio `AsyncFd` type, as this will mess with the
896 /// readiness flag and can cause the file descriptor to behave incorrectly.
897 ///
898 /// This method is not intended to be used with combined interests.
899 /// The closure should perform only one type of IO operation, so it should not
900 /// require more than one ready state. This method may panic or sleep forever
901 /// if it is called with a combined interest.
902 pub fn try_io<R>(
903 &self,
904 interest: Interest,
905 f: impl FnOnce(&T) -> io::Result<R>,
906 ) -> io::Result<R> {
907 self.registration
908 .try_io(interest, || f(self.inner.as_ref().unwrap()))
909 }
910
911 /// Tries to read or write from the file descriptor using a user-provided IO operation.
912 ///
913 /// The behavior is the same as [`try_io`], except that the closure can mutate the inner
914 /// value of the [`AsyncFd`].
915 ///
916 /// [`try_io`]: AsyncFd::try_io
917 pub fn try_io_mut<R>(
918 &mut self,
919 interest: Interest,
920 f: impl FnOnce(&mut T) -> io::Result<R>,
921 ) -> io::Result<R> {
922 self.registration
923 .try_io(interest, || f(self.inner.as_mut().unwrap()))
924 }
925}
926
927impl<T: AsRawFd> AsRawFd for AsyncFd<T> {
928 fn as_raw_fd(&self) -> RawFd {
929 self.inner.as_ref().unwrap().as_raw_fd()
930 }
931}
932
933impl<T: AsRawFd> std::os::unix::io::AsFd for AsyncFd<T> {
934 fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> {
935 unsafe { std::os::unix::io::BorrowedFd::borrow_raw(self.as_raw_fd()) }
936 }
937}
938
939impl<T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFd<T> {
940 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
941 f.debug_struct("AsyncFd")
942 .field("inner", &self.inner)
943 .finish()
944 }
945}
946
947impl<T: AsRawFd> Drop for AsyncFd<T> {
948 fn drop(&mut self) {
949 let _ = self.take_inner();
950 }
951}
952
953impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> {
954 /// Indicates to tokio that the file descriptor is no longer ready. All
955 /// internal readiness flags will be cleared, and tokio will wait for the
956 /// next edge-triggered readiness notification from the OS.
957 ///
958 /// This function is commonly used with guards returned by [`AsyncFd::readable`] and
959 /// [`AsyncFd::writable`].
960 ///
961 /// It is critical that this function not be called unless your code
962 /// _actually observes_ that the file descriptor is _not_ ready. Do not call
963 /// it simply because, for example, a read succeeded; it should be called
964 /// when a read is observed to block.
965 ///
966 /// This method only clears readiness events that happened before the creation of this guard.
967 /// In other words, if the IO resource becomes ready between the creation of the guard and
968 /// this call to `clear_ready`, then the readiness is not actually cleared.
969 pub fn clear_ready(&mut self) {
970 if let Some(event) = self.event.take() {
971 self.async_fd.registration.clear_readiness(event);
972 }
973 }
974
975 /// Indicates to tokio that the file descriptor no longer has a specific readiness.
976 /// The internal readiness flag will be cleared, and tokio will wait for the
977 /// next edge-triggered readiness notification from the OS.
978 ///
979 /// This function is useful in combination with the [`AsyncFd::ready`] method when a
980 /// combined interest like `Interest::READABLE | Interest::WRITABLE` is used.
981 ///
982 /// It is critical that this function not be called unless your code
983 /// _actually observes_ that the file descriptor is _not_ ready for the provided `Ready`.
984 /// Do not call it simply because, for example, a read succeeded; it should be called
985 /// when a read is observed to block. Only clear the specific readiness that is observed to
986 /// block. For example when a read blocks when using a combined interest,
987 /// only clear `Ready::READABLE`.
988 ///
/// This method only clears readiness events that happened before the creation of this guard.
/// In other words, if the IO resource becomes ready between the creation of the guard and
/// this call to `clear_ready_matching`, then the readiness is not actually cleared.
992 ///
993 /// # Examples
994 ///
995 /// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
996 /// splitting.
997 ///
998 /// ```no_run
999 /// use std::error::Error;
1000 /// use std::io;
1001 /// use std::io::{Read, Write};
1002 /// use std::net::TcpStream;
1003 /// use tokio::io::unix::AsyncFd;
1004 /// use tokio::io::{Interest, Ready};
1005 ///
1006 /// #[tokio::main]
1007 /// async fn main() -> Result<(), Box<dyn Error>> {
1008 /// let stream = TcpStream::connect("127.0.0.1:8080")?;
1009 /// stream.set_nonblocking(true)?;
1010 /// let stream = AsyncFd::new(stream)?;
1011 ///
1012 /// loop {
1013 /// let mut guard = stream
1014 /// .ready(Interest::READABLE | Interest::WRITABLE)
1015 /// .await?;
1016 ///
1017 /// if guard.ready().is_readable() {
1018 /// let mut data = vec![0; 1024];
1019 /// // Try to read data, this may still fail with `WouldBlock`
1020 /// // if the readiness event is a false positive.
1021 /// match stream.get_ref().read(&mut data) {
1022 /// Ok(n) => {
1023 /// println!("read {} bytes", n);
1024 /// }
1025 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1026 /// // a read has blocked, but a write might still succeed.
1027 /// // clear only the read readiness.
1028 /// guard.clear_ready_matching(Ready::READABLE);
1029 /// continue;
1030 /// }
1031 /// Err(e) => {
1032 /// return Err(e.into());
1033 /// }
1034 /// }
1035 /// }
1036 ///
1037 /// if guard.ready().is_writable() {
1038 /// // Try to write data, this may still fail with `WouldBlock`
1039 /// // if the readiness event is a false positive.
1040 /// match stream.get_ref().write(b"hello world") {
1041 /// Ok(n) => {
1042 /// println!("write {} bytes", n);
1043 /// }
1044 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1045 /// // a write has blocked, but a read might still succeed.
1046 /// // clear only the write readiness.
1047 /// guard.clear_ready_matching(Ready::WRITABLE);
1048 /// continue;
1049 /// }
1050 /// Err(e) => {
1051 /// return Err(e.into());
1052 /// }
1053 /// }
1054 /// }
1055 /// }
1056 /// }
1057 /// ```
1058 pub fn clear_ready_matching(&mut self, ready: Ready) {
1059 if let Some(mut event) = self.event.take() {
1060 self.async_fd
1061 .registration
1062 .clear_readiness(event.with_ready(ready));
1063
1064 // the event is no longer ready for the readiness that was just cleared
1065 event.ready = event.ready - ready;
1066
1067 if !event.ready.is_empty() {
1068 self.event = Some(event);
1069 }
1070 }
1071 }
1072
/// Keeps the ready flag asserted on purpose.
///
/// This method should be invoked when you intentionally want to leave the
/// readiness state untouched.
///
/// While this function is itself a no-op, it satisfies the `#[must_use]`
/// constraint on the [`AsyncFdReadyGuard`] type.
pub fn retain_ready(&mut self) {
    // no-op: the guard's stored event is deliberately left as it is
}
1081
1082 /// Get the [`Ready`] value associated with this guard.
1083 ///
1084 /// This method will return the empty readiness state if
1085 /// [`AsyncFdReadyGuard::clear_ready`] has been called on
1086 /// the guard.
1087 ///
1088 /// [`Ready`]: crate::io::Ready
1089 pub fn ready(&self) -> Ready {
1090 match &self.event {
1091 Some(event) => event.ready,
1092 None => Ready::EMPTY,
1093 }
1094 }
1095
1096 /// Performs the provided IO operation.
1097 ///
/// If `f` returns a [`WouldBlock`] error, the readiness state associated
/// with this file descriptor is cleared, and the method returns an `Err`
/// holding a [`TryIoError`]. You will typically need to poll the
/// `AsyncFd` again when this happens.
1102 ///
1103 /// This method helps ensure that the readiness state of the underlying file
1104 /// descriptor remains in sync with the tokio-side readiness state, by
1105 /// clearing the tokio-side state only when a [`WouldBlock`] condition
1106 /// occurs. It is the responsibility of the caller to ensure that `f`
1107 /// returns [`WouldBlock`] only if the file descriptor that originated this
1108 /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to
1109 /// create this `AsyncFdReadyGuard`.
1110 ///
1111 /// # Examples
1112 ///
1113 /// This example sends some bytes to the inner [`std::net::UdpSocket`]. Waiting
1114 /// for write-readiness and retrying when the send operation does block are explicit.
1115 /// This example can be written more succinctly using [`AsyncFd::async_io`].
1116 ///
1117 /// ```no_run
1118 /// use tokio::io::unix::AsyncFd;
1119 ///
1120 /// use std::io;
1121 /// use std::net::UdpSocket;
1122 ///
1123 /// #[tokio::main]
1124 /// async fn main() -> io::Result<()> {
1125 /// let socket = UdpSocket::bind("0.0.0.0:8080")?;
1126 /// socket.set_nonblocking(true)?;
1127 /// let async_fd = AsyncFd::new(socket)?;
1128 ///
1129 /// let written = loop {
1130 /// let mut guard = async_fd.writable().await?;
1131 /// match guard.try_io(|inner| inner.get_ref().send(&[1, 2])) {
1132 /// Ok(result) => {
1133 /// break result?;
1134 /// }
1135 /// Err(_would_block) => {
1136 /// // try_io already cleared the file descriptor's readiness state
1137 /// continue;
1138 /// }
1139 /// }
1140 /// };
1141 ///
1142 /// println!("wrote {written} bytes");
1143 ///
1144 /// Ok(())
1145 /// }
1146 /// ```
1147 ///
1148 /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
1149 // Alias for old name in 0.x
1150 #[cfg_attr(docsrs, doc(alias = "with_io"))]
1151 pub fn try_io<R>(
1152 &mut self,
1153 f: impl FnOnce(&'a AsyncFd<Inner>) -> io::Result<R>,
1154 ) -> Result<io::Result<R>, TryIoError> {
1155 let result = f(self.async_fd);
1156
1157 match result {
1158 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
1159 self.clear_ready();
1160 Err(TryIoError(()))
1161 }
1162 result => Ok(result),
1163 }
1164 }
1165
1166 /// Returns a shared reference to the inner [`AsyncFd`].
1167 pub fn get_ref(&self) -> &'a AsyncFd<Inner> {
1168 self.async_fd
1169 }
1170
1171 /// Returns a shared reference to the backing object of the inner [`AsyncFd`].
1172 pub fn get_inner(&self) -> &'a Inner {
1173 self.get_ref().get_ref()
1174 }
1175}
1176
1177impl<'a, Inner: AsRawFd> AsyncFdReadyMutGuard<'a, Inner> {
1178 /// Indicates to tokio that the file descriptor is no longer ready. All
1179 /// internal readiness flags will be cleared, and tokio will wait for the
1180 /// next edge-triggered readiness notification from the OS.
1181 ///
1182 /// This function is commonly used with guards returned by [`AsyncFd::readable_mut`] and
1183 /// [`AsyncFd::writable_mut`].
1184 ///
1185 /// It is critical that this function not be called unless your code
1186 /// _actually observes_ that the file descriptor is _not_ ready. Do not call
1187 /// it simply because, for example, a read succeeded; it should be called
1188 /// when a read is observed to block.
1189 ///
1190 /// This method only clears readiness events that happened before the creation of this guard.
1191 /// In other words, if the IO resource becomes ready between the creation of the guard and
1192 /// this call to `clear_ready`, then the readiness is not actually cleared.
1193 pub fn clear_ready(&mut self) {
1194 if let Some(event) = self.event.take() {
1195 self.async_fd.registration.clear_readiness(event);
1196 }
1197 }
1198
1199 /// Indicates to tokio that the file descriptor no longer has a specific readiness.
1200 /// The internal readiness flag will be cleared, and tokio will wait for the
1201 /// next edge-triggered readiness notification from the OS.
1202 ///
1203 /// This function is useful in combination with the [`AsyncFd::ready_mut`] method when a
1204 /// combined interest like `Interest::READABLE | Interest::WRITABLE` is used.
1205 ///
1206 /// It is critical that this function not be called unless your code
1207 /// _actually observes_ that the file descriptor is _not_ ready for the provided `Ready`.
1208 /// Do not call it simply because, for example, a read succeeded; it should be called
1209 /// when a read is observed to block. Only clear the specific readiness that is observed to
1210 /// block. For example when a read blocks when using a combined interest,
1211 /// only clear `Ready::READABLE`.
1212 ///
/// This method only clears readiness events that happened before the creation of this guard.
/// In other words, if the IO resource becomes ready between the creation of the guard and
/// this call to `clear_ready_matching`, then the readiness is not actually cleared.
1216 ///
1217 /// # Examples
1218 ///
1219 /// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
1220 /// splitting.
1221 ///
1222 /// ```no_run
1223 /// use std::error::Error;
1224 /// use std::io;
1225 /// use std::io::{Read, Write};
1226 /// use std::net::TcpStream;
1227 /// use tokio::io::unix::AsyncFd;
1228 /// use tokio::io::{Interest, Ready};
1229 ///
1230 /// #[tokio::main]
1231 /// async fn main() -> Result<(), Box<dyn Error>> {
1232 /// let stream = TcpStream::connect("127.0.0.1:8080")?;
1233 /// stream.set_nonblocking(true)?;
1234 /// let mut stream = AsyncFd::new(stream)?;
1235 ///
1236 /// loop {
1237 /// let mut guard = stream
1238 /// .ready_mut(Interest::READABLE | Interest::WRITABLE)
1239 /// .await?;
1240 ///
1241 /// if guard.ready().is_readable() {
1242 /// let mut data = vec![0; 1024];
1243 /// // Try to read data, this may still fail with `WouldBlock`
1244 /// // if the readiness event is a false positive.
1245 /// match guard.get_inner_mut().read(&mut data) {
1246 /// Ok(n) => {
1247 /// println!("read {} bytes", n);
1248 /// }
1249 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1250 /// // a read has blocked, but a write might still succeed.
1251 /// // clear only the read readiness.
1252 /// guard.clear_ready_matching(Ready::READABLE);
1253 /// continue;
1254 /// }
1255 /// Err(e) => {
1256 /// return Err(e.into());
1257 /// }
1258 /// }
1259 /// }
1260 ///
1261 /// if guard.ready().is_writable() {
1262 /// // Try to write data, this may still fail with `WouldBlock`
1263 /// // if the readiness event is a false positive.
1264 /// match guard.get_inner_mut().write(b"hello world") {
1265 /// Ok(n) => {
1266 /// println!("write {} bytes", n);
1267 /// }
1268 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1269 /// // a write has blocked, but a read might still succeed.
1270 /// // clear only the write readiness.
1271 /// guard.clear_ready_matching(Ready::WRITABLE);
1272 /// continue;
1273 /// }
1274 /// Err(e) => {
1275 /// return Err(e.into());
1276 /// }
1277 /// }
1278 /// }
1279 /// }
1280 /// }
1281 /// ```
1282 pub fn clear_ready_matching(&mut self, ready: Ready) {
1283 if let Some(mut event) = self.event.take() {
1284 self.async_fd
1285 .registration
1286 .clear_readiness(event.with_ready(ready));
1287
1288 // the event is no longer ready for the readiness that was just cleared
1289 event.ready = event.ready - ready;
1290
1291 if !event.ready.is_empty() {
1292 self.event = Some(event);
1293 }
1294 }
1295 }
1296
/// Keeps the ready flag asserted on purpose.
///
/// This method should be invoked when you intentionally want to leave the
/// readiness state untouched.
///
/// While this function is itself a no-op, it satisfies the `#[must_use]`
/// constraint on the [`AsyncFdReadyMutGuard`] type.
pub fn retain_ready(&mut self) {
    // no-op: the guard's stored event is deliberately left as it is
}
1305
1306 /// Get the [`Ready`] value associated with this guard.
1307 ///
1308 /// This method will return the empty readiness state if
1309 /// [`AsyncFdReadyGuard::clear_ready`] has been called on
1310 /// the guard.
1311 ///
1312 /// [`Ready`]: super::Ready
1313 pub fn ready(&self) -> Ready {
1314 match &self.event {
1315 Some(event) => event.ready,
1316 None => Ready::EMPTY,
1317 }
1318 }
1319
1320 /// Performs the provided IO operation.
1321 ///
1322 /// If `f` returns a [`WouldBlock`] error, the readiness state associated
1323 /// with this file descriptor is cleared, and the method returns
1324 /// `Err(TryIoError::WouldBlock)`. You will typically need to poll the
1325 /// `AsyncFd` again when this happens.
1326 ///
1327 /// This method helps ensure that the readiness state of the underlying file
1328 /// descriptor remains in sync with the tokio-side readiness state, by
1329 /// clearing the tokio-side state only when a [`WouldBlock`] condition
1330 /// occurs. It is the responsibility of the caller to ensure that `f`
1331 /// returns [`WouldBlock`] only if the file descriptor that originated this
1332 /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to
1333 /// create this `AsyncFdReadyGuard`.
1334 ///
1335 /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
1336 pub fn try_io<R>(
1337 &mut self,
1338 f: impl FnOnce(&mut AsyncFd<Inner>) -> io::Result<R>,
1339 ) -> Result<io::Result<R>, TryIoError> {
1340 let result = f(self.async_fd);
1341
1342 match result {
1343 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
1344 self.clear_ready();
1345 Err(TryIoError(()))
1346 }
1347 result => Ok(result),
1348 }
1349 }
1350
1351 /// Returns a shared reference to the inner [`AsyncFd`].
1352 pub fn get_ref(&self) -> &AsyncFd<Inner> {
1353 self.async_fd
1354 }
1355
1356 /// Returns a mutable reference to the inner [`AsyncFd`].
1357 pub fn get_mut(&mut self) -> &mut AsyncFd<Inner> {
1358 self.async_fd
1359 }
1360
1361 /// Returns a shared reference to the backing object of the inner [`AsyncFd`].
1362 pub fn get_inner(&self) -> &Inner {
1363 self.get_ref().get_ref()
1364 }
1365
1366 /// Returns a mutable reference to the backing object of the inner [`AsyncFd`].
1367 pub fn get_inner_mut(&mut self) -> &mut Inner {
1368 self.get_mut().get_mut()
1369 }
1370}
1371
1372impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyGuard<'a, T> {
1373 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1374 f.debug_struct("ReadyGuard")
1375 .field("async_fd", &self.async_fd)
1376 .finish()
1377 }
1378}
1379
1380impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyMutGuard<'a, T> {
1381 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1382 f.debug_struct("MutReadyGuard")
1383 .field("async_fd", &self.async_fd)
1384 .finish()
1385 }
1386}
1387
/// The error type returned by [`try_io`].
///
/// This error indicates that the IO resource returned a [`WouldBlock`] error.
/// The type is opaque: its only field is private, so it carries no further
/// detail for callers to inspect.
///
/// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
/// [`try_io`]: method@AsyncFdReadyGuard::try_io
#[derive(Debug)]
pub struct TryIoError(());
1396
/// Error returned by [`try_new`] or [`try_with_interest`].
///
/// It keeps both the object that was passed in and the underlying IO error, so
/// the caller can get the object back via [`into_parts`].
///
/// [`try_new`]: AsyncFd::try_new
/// [`try_with_interest`]: AsyncFd::try_with_interest
/// [`into_parts`]: AsyncFdTryNewError::into_parts
pub struct AsyncFdTryNewError<T> {
    // The original object handed to the failed constructor.
    inner: T,
    // The IO error that made the constructor fail.
    cause: io::Error,
}
1405
1406impl<T> AsyncFdTryNewError<T> {
1407 /// Returns the original object passed to [`try_new`] or [`try_with_interest`]
1408 /// alongside the error that caused these functions to fail.
1409 ///
1410 /// [`try_new`]: AsyncFd::try_new
1411 /// [`try_with_interest`]: AsyncFd::try_with_interest
1412 pub fn into_parts(self) -> (T, io::Error) {
1413 (self.inner, self.cause)
1414 }
1415}
1416
1417impl<T> fmt::Display for AsyncFdTryNewError<T> {
1418 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1419 fmt::Display::fmt(&self.cause, f)
1420 }
1421}
1422
1423impl<T> fmt::Debug for AsyncFdTryNewError<T> {
1424 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1425 fmt::Debug::fmt(&self.cause, f)
1426 }
1427}
1428
1429impl<T> Error for AsyncFdTryNewError<T> {
1430 fn source(&self) -> Option<&(dyn Error + 'static)> {
1431 Some(&self.cause)
1432 }
1433}
1434
1435impl<T> From<AsyncFdTryNewError<T>> for io::Error {
1436 fn from(value: AsyncFdTryNewError<T>) -> Self {
1437 value.cause
1438 }
1439}