tokio/io/bsd/poll_aio.rs
1//! Use POSIX AIO futures with Tokio.
2
3use crate::io::interest::Interest;
4use crate::runtime::io::{ReadyEvent, Registration};
5use crate::runtime::scheduler;
6use mio::event::Source;
7use mio::Registry;
8use mio::Token;
9use std::fmt;
10use std::io;
11use std::ops::{Deref, DerefMut};
12use std::os::unix::io::AsRawFd;
13use std::os::unix::prelude::RawFd;
14use std::task::{ready, Context, Poll};
15
/// The POSIX AIO counterpart of [`mio::event::Source`].
///
/// A consumer of Tokio hands an implementor of this trait to the [`Aio`]
/// constructors in order to obtain an [`Aio`] object.
pub trait AioSource {
    /// Registers this AIO event source with Tokio's reactor.
    ///
    /// `kq` is the raw file descriptor taken from the reactor's registry and
    /// `token` is the registration token converted to a plain `usize`.
    fn register(&mut self, kq: RawFd, token: usize);

    /// Deregisters this AIO event source from Tokio's reactor.
    fn deregister(&mut self);
}
27
28/// Wraps the user's AioSource in order to implement mio::event::Source, which
29/// is what the rest of the crate wants.
30struct MioSource<T>(T);
31
32impl<T: AioSource> Source for MioSource<T> {
33 fn register(
34 &mut self,
35 registry: &Registry,
36 token: Token,
37 interests: mio::Interest,
38 ) -> io::Result<()> {
39 assert!(interests.is_aio() || interests.is_lio());
40 self.0.register(registry.as_raw_fd(), usize::from(token));
41 Ok(())
42 }
43
44 fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
45 self.0.deregister();
46 Ok(())
47 }
48
49 fn reregister(
50 &mut self,
51 registry: &Registry,
52 token: Token,
53 interests: mio::Interest,
54 ) -> io::Result<()> {
55 assert!(interests.is_aio() || interests.is_lio());
56 self.0.register(registry.as_raw_fd(), usize::from(token));
57 Ok(())
58 }
59}
60
61/// Associates a POSIX AIO control block with the reactor that drives it.
62///
63/// `Aio`'s wrapped type must implement [`AioSource`] to be driven
64/// by the reactor.
65///
66/// The wrapped source may be accessed through the `Aio` via the `Deref` and
67/// `DerefMut` traits.
68///
69/// ## Clearing readiness
70///
71/// If [`Aio::poll_ready`] returns ready, but the consumer determines that the
72/// Source is not completely ready and must return to the Pending state,
73/// [`Aio::clear_ready`] may be used. This can be useful with
74/// [`lio_listio`], which may generate a kevent when only a portion of the
75/// operations have completed.
76///
77/// ## Platforms
78///
79/// Only FreeBSD implements POSIX AIO with kqueue notification, so
80/// `Aio` is only available for that operating system.
81///
82/// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
83// Note: Unlike every other kqueue event source, POSIX AIO registers events not
84// via kevent(2) but when the aiocb is submitted to the kernel via aio_read,
85// aio_write, etc. It needs the kqueue's file descriptor to do that. So
86// AsyncFd can't be used for POSIX AIO.
87//
88// Note that Aio doesn't implement Drop. There's no need. Unlike other
89// kqueue sources, simply dropping the object effectively deregisters it.
90pub struct Aio<E> {
91 io: MioSource<E>,
92 registration: Registration,
93}
94
95// ===== impl Aio =====
96
97impl<E: AioSource> Aio<E> {
98 /// Creates a new `Aio` suitable for use with POSIX AIO functions.
99 ///
100 /// It will be associated with the default reactor. The runtime is usually
101 /// set implicitly when this function is called from a future driven by a
102 /// Tokio runtime, otherwise runtime can be set explicitly with
103 /// [`Runtime::enter`](crate::runtime::Runtime::enter) function.
104 pub fn new_for_aio(io: E) -> io::Result<Self> {
105 Self::new_with_interest(io, Interest::AIO)
106 }
107
108 /// Creates a new `Aio` suitable for use with [`lio_listio`].
109 ///
110 /// It will be associated with the default reactor. The runtime is usually
111 /// set implicitly when this function is called from a future driven by a
112 /// Tokio runtime, otherwise runtime can be set explicitly with
113 /// [`Runtime::enter`](crate::runtime::Runtime::enter) function.
114 ///
115 /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
116 pub fn new_for_lio(io: E) -> io::Result<Self> {
117 Self::new_with_interest(io, Interest::LIO)
118 }
119
120 fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
121 let mut io = MioSource(io);
122 let handle = scheduler::Handle::current();
123 let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
124 Ok(Self { io, registration })
125 }
126
127 /// Indicates to Tokio that the source is no longer ready. The internal
128 /// readiness flag will be cleared, and tokio will wait for the next
129 /// edge-triggered readiness notification from the OS.
130 ///
131 /// It is critical that this method not be called unless your code
132 /// _actually observes_ that the source is _not_ ready. The OS must
133 /// deliver a subsequent notification, or this source will block
134 /// forever. It is equally critical that you `do` call this method if you
135 /// resubmit the same structure to the kernel and poll it again.
136 ///
137 /// This method is not very useful with AIO readiness, since each `aiocb`
138 /// structure is typically only used once. It's main use with
139 /// [`lio_listio`], which will sometimes send notification when only a
140 /// portion of its elements are complete. In that case, the caller must
141 /// call `clear_ready` before resubmitting it.
142 ///
143 /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
144 pub fn clear_ready(&self, ev: AioEvent) {
145 self.registration.clear_readiness(ev.0)
146 }
147
148 /// Destroy the [`Aio`] and return its inner source.
149 pub fn into_inner(self) -> E {
150 self.io.0
151 }
152
153 /// Polls for readiness. Either AIO or LIO counts.
154 ///
155 /// This method returns:
156 /// * `Poll::Pending` if the underlying operation is not complete, whether
157 /// or not it completed successfully. This will be true if the OS is
158 /// still processing it, or if it has not yet been submitted to the OS.
159 /// * `Poll::Ready(Ok(_))` if the underlying operation is complete.
160 /// * `Poll::Ready(Err(_))` if the reactor has been shutdown. This does
161 /// _not_ indicate that the underlying operation encountered an error.
162 ///
163 /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context`
164 /// is scheduled to receive a wakeup when the underlying operation
165 /// completes. Note that on multiple calls to `poll_ready`, only the `Waker` from the
166 /// `Context` passed to the most recent call is scheduled to receive a wakeup.
167 pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<AioEvent>> {
168 let ev = ready!(self.registration.poll_read_ready(cx))?;
169 Poll::Ready(Ok(AioEvent(ev)))
170 }
171}
172
173impl<E: AioSource> Deref for Aio<E> {
174 type Target = E;
175
176 fn deref(&self) -> &E {
177 &self.io.0
178 }
179}
180
181impl<E: AioSource> DerefMut for Aio<E> {
182 fn deref_mut(&mut self) -> &mut E {
183 &mut self.io.0
184 }
185}
186
187impl<E: AioSource + fmt::Debug> fmt::Debug for Aio<E> {
188 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
189 f.debug_struct("Aio").field("io", &self.io.0).finish()
190 }
191}
192
193/// Opaque data returned by [`Aio::poll_ready`].
194///
195/// It can be fed back to [`Aio::clear_ready`].
196#[derive(Debug)]
197pub struct AioEvent(ReadyEvent);