tokio/runtime/io/scheduled_io.rs
use crate::io::interest::Interest;
use crate::io::ready::Ready;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::runtime::io::{Direction, ReadyEvent, Tick};
use crate::util::bit;
use crate::util::linked_list::{self, LinkedList};
use crate::util::WakeList;

use std::cell::UnsafeCell;
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{AcqRel, Acquire};
use std::task::{Context, Poll, Waker};

/// Stored in the I/O driver resource slab.
#[derive(Debug)]
// # This struct should be cache padded to avoid false sharing. The cache padding rules are copied
// from crossbeam-utils/src/cache_padded.rs
//
// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache
// lines at a time, so we have to align to 128 bytes rather than 64.
//
// Sources:
// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107
//
// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size.
//
// Sources:
// - https://www.mono-project.com/news/2016/09/12/arm64-icache/
//
// powerpc64 has 128-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
#[cfg_attr(
    any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
    ),
    repr(align(128))
)]
// arm, mips, mips64, sparc, and hexagon have 32-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L17
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/hexagon/include/asm/cache.h#L12
#[cfg_attr(
    any(
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "sparc",
        target_arch = "hexagon",
    ),
    repr(align(32))
)]
// m68k has 16-byte cache line size.
//
// Sources:
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/m68k/include/asm/cache.h#L9
#[cfg_attr(target_arch = "m68k", repr(align(16)))]
// s390x has 256-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/s390/include/asm/cache.h#L13
#[cfg_attr(target_arch = "s390x", repr(align(256)))]
// x86, riscv, wasm, and sparc64 have 64-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L19
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/riscv/include/asm/cache.h#L10
//
// All others are assumed to have 64-byte cache line size.
#[cfg_attr(
    not(any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "sparc",
        target_arch = "hexagon",
        target_arch = "m68k",
        target_arch = "s390x",
    )),
    repr(align(64))
)]
pub(crate) struct ScheduledIo {
    pub(super) linked_list_pointers: UnsafeCell<linked_list::Pointers<Self>>,

    /// Packs the resource's readiness and I/O driver latest tick.
    readiness: AtomicUsize,

    /// Wakers for tasks waiting on readiness of this resource.
    waiters: Mutex<Waiters>,
}

type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;

#[derive(Debug, Default)]
struct Waiters {
    /// List of all current waiters.
    list: WaitList,

    /// Waker used for `AsyncRead`.
    reader: Option<Waker>,

    /// Waker used for `AsyncWrite`.
    writer: Option<Waker>,
}

#[derive(Debug)]
struct Waiter {
    pointers: linked_list::Pointers<Waiter>,

    /// The waker for this task.
    waker: Option<Waker>,

    /// The interest this waiter is waiting on.
    interest: Interest,

    /// Set to `true` by `ScheduledIo::wake` when this waiter's waker is taken
    /// for notification.
    is_ready: bool,

    /// Should never be `Unpin`.
    _p: PhantomPinned,
}

generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}
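
// `addr_of_pointers` is generated this way (taking `self: NonNull<Self>`) so
// that the intrusive list can reach the `pointers` field through a raw pointer
// without ever creating a reference to the whole `Waiter`. Roughly speaking,
// once a waiter is linked into `ScheduledIo::waiters` and shared with the
// driver, materializing a `&mut Waiter` here would risk undefined behavior, so
// the field projection stays in raw-pointer land.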

/// Future returned by `readiness()`.
struct Readiness<'a> {
    scheduled_io: &'a ScheduledIo,

    state: State,

    /// Entry in the waiter `LinkedList`.
    waiter: UnsafeCell<Waiter>,
}

enum State {
    Init,
    Waiting,
    Done,
}
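
// The `Readiness` future drives a small state machine:
//
//   Init    -- already ready (or shut down)                  --> Done
//   Init    -- not ready; waiter pushed onto `waiters.list`  --> Waiting
//   Waiting -- `ScheduledIo::wake` set `is_ready`            --> Done
//   Done    -- readiness re-read and returned as `Poll::Ready(ReadyEvent)`
//
// See `impl Future for Readiness` near the bottom of this file.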

// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
//
// | shutdown | driver tick | readiness |
// |----------+-------------+-----------|
// |   1 bit  |   15 bits   |  16 bits  |

const READINESS: bit::Pack = bit::Pack::least_significant(16);

const TICK: bit::Pack = READINESS.then(15);

const SHUTDOWN: bit::Pack = TICK.then(1);
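
// Illustrative layout of the packed word (the exact shifts come from the
// `bit::Pack` definitions above):
//
//   bits  0..=15   readiness   (`READINESS`)
//   bits 16..=30   driver tick (`TICK`)
//   bit  31        shutdown    (`SHUTDOWN`)
//
// For example, `SHUTDOWN.pack(1, 0)` in `shutdown()` below produces a mask with
// only the shutdown bit set, which is then OR-ed into `readiness`.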

// ===== impl ScheduledIo =====

impl Default for ScheduledIo {
    fn default() -> ScheduledIo {
        ScheduledIo {
            linked_list_pointers: UnsafeCell::new(linked_list::Pointers::new()),
            readiness: AtomicUsize::new(0),
            waiters: Mutex::new(Waiters::default()),
        }
    }
}

impl ScheduledIo {
    pub(crate) fn token(&self) -> mio::Token {
        mio::Token(super::EXPOSE_IO.expose_provenance(self))
    }
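
    // Note: the token is just this `ScheduledIo`'s address with its provenance
    // exposed through `EXPOSE_IO`; presumably the driver's dispatch path turns
    // the token back into a `ScheduledIo` pointer the same way when mio reports
    // an event.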

    /// Invoked when the IO driver is shut down; forces this `ScheduledIo` into a
    /// permanently shutdown state.
    pub(super) fn shutdown(&self) {
        let mask = SHUTDOWN.pack(1, 0);
        self.readiness.fetch_or(mask, AcqRel);
        self.wake(Ready::ALL);
    }

    /// Sets the readiness on this `ScheduledIo` by invoking the given closure on
    /// the current readiness value.
    ///
    /// # Arguments
    /// - `tick_op`: whether setting the tick or trying to clear readiness for a
    ///   specific tick.
    /// - `f`: a closure returning a new readiness value given the previous
    ///   readiness.
    pub(super) fn set_readiness(&self, tick_op: Tick, f: impl Fn(Ready) -> Ready) {
        let _ = self.readiness.fetch_update(AcqRel, Acquire, |curr| {
            // If the io driver is shut down, then you are only allowed to clear readiness.
            debug_assert!(SHUTDOWN.unpack(curr) == 0 || matches!(tick_op, Tick::Clear(_)));

            const MAX_TICK: usize = TICK.max_value() + 1;
            let tick = TICK.unpack(curr);

            let new_tick = match tick_op {
                // Trying to clear readiness with an old event!
                Tick::Clear(t) if tick as u8 != t => return None,
                Tick::Clear(t) => t as usize,
                Tick::Set => tick.wrapping_add(1) % MAX_TICK,
            };
            let ready = Ready::from_usize(READINESS.unpack(curr));
            Some(TICK.pack(new_tick, f(ready).as_usize()))
        });
    }
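
    // Worked example of the tick guard above: the driver observes an event and
    // bumps the tick to 5; a task grabs a `ReadyEvent { tick: 5, .. }`. If a
    // fresh driver event lands first (`Tick::Set` bumps the tick to 6 and the
    // driver typically ORs in new readiness), the task's later
    // `set_readiness(Tick::Clear(5), ..)` sees `5 != 6`, the closure bails with
    // `None`, and the fresh readiness is preserved rather than cleared away.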

    /// Notifies all pending waiters that have registered interest in `ready`.
    ///
    /// There may be many waiters to notify. Waking the pending task **must** be
    /// done from outside of the lock otherwise there is a potential for a
    /// deadlock.
    ///
    /// A stack array of wakers is created and filled with wakers to notify, the
    /// lock is released, and the wakers are notified. Because there may be more
    /// than 32 wakers to notify, if the stack array fills up, the lock is
    /// released, the array is cleared, and the iteration continues.
    pub(super) fn wake(&self, ready: Ready) {
        let mut wakers = WakeList::new();

        let mut waiters = self.waiters.lock();

        // check for AsyncRead slot
        if ready.is_readable() {
            if let Some(waker) = waiters.reader.take() {
                wakers.push(waker);
            }
        }

        // check for AsyncWrite slot
        if ready.is_writable() {
            if let Some(waker) = waiters.writer.take() {
                wakers.push(waker);
            }
        }

        'outer: loop {
            let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));

            while wakers.can_push() {
                match iter.next() {
                    Some(waiter) => {
                        let waiter = unsafe { &mut *waiter.as_ptr() };

                        if let Some(waker) = waiter.waker.take() {
                            waiter.is_ready = true;
                            wakers.push(waker);
                        }
                    }
                    None => {
                        break 'outer;
                    }
                }
            }

            drop(waiters);

            wakers.wake_all();

            // Acquire the lock again.
            waiters = self.waiters.lock();
        }

        // Release the lock before notifying
        drop(waiters);

        wakers.wake_all();
    }

    pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
        let curr = self.readiness.load(Acquire);

        ReadyEvent {
            tick: TICK.unpack(curr) as u8,
            ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)),
            is_shutdown: SHUTDOWN.unpack(curr) != 0,
        }
    }

    /// Polls for readiness events in a given direction.
    ///
    /// These are to support `AsyncRead` and `AsyncWrite` polling methods,
    /// which cannot use the `async fn` version. This uses reserved reader
    /// and writer slots.
    pub(super) fn poll_readiness(
        &self,
        cx: &mut Context<'_>,
        direction: Direction,
    ) -> Poll<ReadyEvent> {
        let curr = self.readiness.load(Acquire);

        let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
        let is_shutdown = SHUTDOWN.unpack(curr) != 0;

        if ready.is_empty() && !is_shutdown {
            // Update the task info
            let mut waiters = self.waiters.lock();
            let waker = match direction {
                Direction::Read => &mut waiters.reader,
                Direction::Write => &mut waiters.writer,
            };

            // Avoid cloning the waker if one is already stored that matches the
            // current task.
            match waker {
                Some(waker) => waker.clone_from(cx.waker()),
                None => *waker = Some(cx.waker().clone()),
            }

            // Try again, in case the readiness was changed while we were
            // taking the waiters lock
            let curr = self.readiness.load(Acquire);
            let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
            let is_shutdown = SHUTDOWN.unpack(curr) != 0;
            if is_shutdown {
                Poll::Ready(ReadyEvent {
                    tick: TICK.unpack(curr) as u8,
                    ready: direction.mask(),
                    is_shutdown,
                })
            } else if ready.is_empty() {
                Poll::Pending
            } else {
                Poll::Ready(ReadyEvent {
                    tick: TICK.unpack(curr) as u8,
                    ready,
                    is_shutdown,
                })
            }
        } else {
            Poll::Ready(ReadyEvent {
                tick: TICK.unpack(curr) as u8,
                ready,
                is_shutdown,
            })
        }
    }

    pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
        // This consumes the current readiness state **except** for closed
        // states. Closed states are excluded because they are final states.
        let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED;
        self.set_readiness(Tick::Clear(event.tick), |curr| curr - mask_no_closed);
    }

    pub(crate) fn clear_wakers(&self) {
        let mut waiters = self.waiters.lock();
        waiters.reader.take();
        waiters.writer.take();
    }
}

impl Drop for ScheduledIo {
    fn drop(&mut self) {
        self.wake(Ready::ALL);
    }
}

unsafe impl Send for ScheduledIo {}
unsafe impl Sync for ScheduledIo {}

impl ScheduledIo {
    /// An async version of `poll_readiness` which uses a linked list of wakers.
    pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
        self.readiness_fut(interest).await
    }

    // This is in a separate function so that the borrow checker doesn't think
    // we are borrowing the `UnsafeCell` possibly over await boundaries.
    //
    // Go figure.
    fn readiness_fut(&self, interest: Interest) -> Readiness<'_> {
        Readiness {
            scheduled_io: self,
            state: State::Init,
            waiter: UnsafeCell::new(Waiter {
                pointers: linked_list::Pointers::new(),
                waker: None,
                is_ready: false,
                interest,
                _p: PhantomPinned,
            }),
        }
    }
}
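
// A rough sketch of how `readiness` is meant to be consumed by the I/O types
// (illustrative only; `try_read` and `buf` below are stand-ins, not items from
// this crate):
//
//     loop {
//         let event = scheduled_io.readiness(Interest::READABLE).await;
//         match try_read(buf) {
//             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
//                 scheduled_io.clear_readiness(event);
//             }
//             result => return result,
//         }
//     }
//
// Readiness is cached until a syscall actually reports `WouldBlock`, at which
// point the stale `ReadyEvent` is cleared and the task waits for the next
// driver event.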

unsafe impl linked_list::Link for Waiter {
    type Handle = NonNull<Waiter>;
    type Target = Waiter;

    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
        *handle
    }

    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
        ptr
    }

    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
        Waiter::addr_of_pointers(target)
    }
}
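
// The `Link` impl is sound only because a `Waiter` never moves while it is
// linked: it lives inside the `Readiness` future (which is pinned, and
// `PhantomPinned` keeps `Waiter` `!Unpin`), and `Readiness::drop` below unlinks
// the waiter before its memory can be reused.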

// ===== impl Readiness =====

impl Future for Readiness<'_> {
    type Output = ReadyEvent;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        use std::sync::atomic::Ordering::SeqCst;

        let (scheduled_io, state, waiter) = unsafe {
            let me = self.get_unchecked_mut();
            (&me.scheduled_io, &mut me.state, &me.waiter)
        };

        loop {
            match *state {
                State::Init => {
                    // Optimistically check existing readiness
                    let curr = scheduled_io.readiness.load(SeqCst);
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    // Safety: `waiter.interest` never changes
                    let interest = unsafe { (*waiter.get()).interest };
                    let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(interest);

                    if !ready.is_empty() || is_shutdown {
                        // Currently ready!
                        let tick = TICK.unpack(curr) as u8;
                        *state = State::Done;
                        return Poll::Ready(ReadyEvent {
                            tick,
                            ready,
                            is_shutdown,
                        });
                    }

                    // Wasn't ready, take the lock (and check again while locked).
                    let mut waiters = scheduled_io.waiters.lock();

                    let curr = scheduled_io.readiness.load(SeqCst);
                    let mut ready = Ready::from_usize(READINESS.unpack(curr));
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    if is_shutdown {
                        ready = Ready::ALL;
                    }

                    let ready = ready.intersection(interest);

                    if !ready.is_empty() || is_shutdown {
                        // Currently ready!
                        let tick = TICK.unpack(curr) as u8;
                        *state = State::Done;
                        return Poll::Ready(ReadyEvent {
                            tick,
                            ready,
                            is_shutdown,
                        });
                    }

                    // Not ready even after locked, insert into list...

                    // Safety: called while locked
                    unsafe {
                        (*waiter.get()).waker = Some(cx.waker().clone());
                    }

                    // Insert the waiter into the linked list
                    //
                    // safety: pointers from `UnsafeCell` are never null.
                    waiters
                        .list
                        .push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
                    *state = State::Waiting;
                }
                State::Waiting => {
                    // Currently in the "Waiting" state, implying the caller has
                    // a waiter stored in the waiter list (guarded by
                    // `scheduled_io.waiters`). In order to access the waker
                    // fields, we must hold the lock.

                    let waiters = scheduled_io.waiters.lock();

                    // Safety: called while locked
                    let w = unsafe { &mut *waiter.get() };

                    if w.is_ready {
                        // Our waker has been notified.
                        *state = State::Done;
                    } else {
                        // Update the waker, if necessary.
                        w.waker.as_mut().unwrap().clone_from(cx.waker());
                        return Poll::Pending;
                    }

                    // Explicit drop of the lock to indicate the scope that the
                    // lock is held. Because holding the lock is required to
                    // ensure safe access to fields not held within the lock, it
                    // is helpful to visualize the scope of the critical
                    // section.
                    drop(waiters);
                }
                State::Done => {
                    // Safety: State::Done means it is no longer shared
                    let w = unsafe { &mut *waiter.get() };

                    let curr = scheduled_io.readiness.load(Acquire);
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    // The returned tick might be newer than the event
                    // which notified our waker. This is ok because the future
                    // still didn't return `Poll::Ready`.
                    let tick = TICK.unpack(curr) as u8;

                    // The readiness state could have been cleared in the meantime,
                    // but we allow the returned ready set to be empty.
                    let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(w.interest);

                    return Poll::Ready(ReadyEvent {
                        tick,
                        ready,
                        is_shutdown,
                    });
                }
            }
        }
    }
}

impl Drop for Readiness<'_> {
    fn drop(&mut self) {
        let mut waiters = self.scheduled_io.waiters.lock();

        // Safety: `waiter` is only ever stored in `waiters`
        unsafe {
            waiters
                .list
                .remove(NonNull::new_unchecked(self.waiter.get()))
        };
    }
}
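
// Without the `Drop` impl above, a `Readiness` future cancelled while in the
// `Waiting` state would leave its `Waiter` linked in `ScheduledIo::waiters`,
// and the list would then hold a dangling pointer into freed memory.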

unsafe impl Send for Readiness<'_> {}
unsafe impl Sync for Readiness<'_> {}