tokio/sync/notify.rs
1// Allow `unreachable_pub` warnings when sync is not enabled
2// due to the usage of `Notify` within the `rt` feature set.
3// When this module is compiled with `sync` enabled we will warn on
4// this lint. When `rt` is enabled we use `pub(crate)` which
5// triggers this warning but it is safe to ignore in this case.
6#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
7
8use crate::loom::cell::UnsafeCell;
9use crate::loom::sync::atomic::AtomicUsize;
10use crate::loom::sync::Mutex;
11use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
12use crate::util::WakeList;
13
14use std::future::Future;
15use std::marker::PhantomPinned;
16use std::panic::{RefUnwindSafe, UnwindSafe};
17use std::pin::Pin;
18use std::ptr::NonNull;
19use std::sync::atomic::Ordering::{self, Acquire, Relaxed, Release, SeqCst};
20use std::task::{Context, Poll, Waker};
21
/// Linked list of `Waiter` nodes; this is the list type stored behind the
/// `waiters` mutex in `Notify`.
type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
/// Guarded counterpart of `WaitList`. It is obtained through
/// `WaitList::into_guarded` and held by `NotifyWaitersList`.
type GuardedWaitList = GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
24
25/// Notifies a single task to wake up.
26///
27/// `Notify` provides a basic mechanism to notify a single task of an event.
28/// `Notify` itself does not carry any data. Instead, it is to be used to signal
29/// another task to perform an operation.
30///
31/// A `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. The
32/// [`notified().await`] method waits for a permit to become available, and
33/// [`notify_one()`] sets a permit **if there currently are no available
34/// permits**.
35///
36/// The synchronization details of `Notify` are similar to
37/// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`]
38/// value contains a single permit. [`notified().await`] waits for the permit to
39/// be made available, consumes the permit, and resumes. [`notify_one()`] sets
40/// the permit, waking a pending task if there is one.
41///
42/// If `notify_one()` is called **before** `notified().await`, then the next
43/// call to `notified().await` will complete immediately, consuming the permit.
44/// Any subsequent calls to `notified().await` will wait for a new permit.
45///
46/// If `notify_one()` is called **multiple** times before `notified().await`,
47/// only a **single** permit is stored. The next call to `notified().await` will
48/// complete immediately, but the one after will wait for a new permit.
49///
50/// # Examples
51///
52/// Basic usage.
53///
54/// ```
55/// use tokio::sync::Notify;
56/// use std::sync::Arc;
57///
58/// #[tokio::main]
59/// async fn main() {
60/// let notify = Arc::new(Notify::new());
61/// let notify2 = notify.clone();
62///
63/// let handle = tokio::spawn(async move {
64/// notify2.notified().await;
65/// println!("received notification");
66/// });
67///
68/// println!("sending notification");
69/// notify.notify_one();
70///
71/// // Wait for task to receive notification.
72/// handle.await.unwrap();
73/// }
74/// ```
75///
/// Unbounded multi-producer single-consumer (mpsc) channel.
77///
78/// No wakeups can be lost when using this channel because the call to
79/// `notify_one()` will store a permit in the `Notify`, which the following call
80/// to `notified()` will consume.
81///
82/// ```
83/// use tokio::sync::Notify;
84///
85/// use std::collections::VecDeque;
86/// use std::sync::Mutex;
87///
88/// struct Channel<T> {
89/// values: Mutex<VecDeque<T>>,
90/// notify: Notify,
91/// }
92///
93/// impl<T> Channel<T> {
94/// pub fn send(&self, value: T) {
95/// self.values.lock().unwrap()
96/// .push_back(value);
97///
98/// // Notify the consumer a value is available
99/// self.notify.notify_one();
100/// }
101///
102/// // This is a single-consumer channel, so several concurrent calls to
103/// // `recv` are not allowed.
104/// pub async fn recv(&self) -> T {
105/// loop {
106/// // Drain values
107/// if let Some(value) = self.values.lock().unwrap().pop_front() {
108/// return value;
109/// }
110///
111/// // Wait for values to be available
112/// self.notify.notified().await;
113/// }
114/// }
115/// }
116/// ```
117///
/// Unbounded multi-producer multi-consumer (mpmc) channel.
119///
120/// The call to [`enable`] is important because otherwise if you have two
121/// calls to `recv` and two calls to `send` in parallel, the following could
122/// happen:
123///
124/// 1. Both calls to `try_recv` return `None`.
125/// 2. Both new elements are added to the vector.
126/// 3. The `notify_one` method is called twice, adding only a single
127/// permit to the `Notify`.
128/// 4. Both calls to `recv` reach the `Notified` future. One of them
129/// consumes the permit, and the other sleeps forever.
130///
131/// By adding the `Notified` futures to the list by calling `enable` before
132/// `try_recv`, the `notify_one` calls in step three would remove the
133/// futures from the list and mark them notified instead of adding a permit
134/// to the `Notify`. This ensures that both futures are woken.
135///
136/// Notice that this failure can only happen if there are two concurrent calls
137/// to `recv`. This is why the mpsc example above does not require a call to
138/// `enable`.
139///
140/// ```
141/// use tokio::sync::Notify;
142///
143/// use std::collections::VecDeque;
144/// use std::sync::Mutex;
145///
146/// struct Channel<T> {
147/// messages: Mutex<VecDeque<T>>,
148/// notify_on_sent: Notify,
149/// }
150///
151/// impl<T> Channel<T> {
152/// pub fn send(&self, msg: T) {
153/// let mut locked_queue = self.messages.lock().unwrap();
154/// locked_queue.push_back(msg);
155/// drop(locked_queue);
156///
157/// // Send a notification to one of the calls currently
158/// // waiting in a call to `recv`.
159/// self.notify_on_sent.notify_one();
160/// }
161///
162/// pub fn try_recv(&self) -> Option<T> {
163/// let mut locked_queue = self.messages.lock().unwrap();
164/// locked_queue.pop_front()
165/// }
166///
167/// pub async fn recv(&self) -> T {
168/// let future = self.notify_on_sent.notified();
169/// tokio::pin!(future);
170///
171/// loop {
172/// // Make sure that no wakeup is lost if we get
173/// // `None` from `try_recv`.
174/// future.as_mut().enable();
175///
176/// if let Some(msg) = self.try_recv() {
177/// return msg;
178/// }
179///
180/// // Wait for a call to `notify_one`.
181/// //
182/// // This uses `.as_mut()` to avoid consuming the future,
183/// // which lets us call `Pin::set` below.
184/// future.as_mut().await;
185///
186/// // Reset the future in case another call to
187/// // `try_recv` got the message before us.
188/// future.set(self.notify_on_sent.notified());
189/// }
190/// }
191/// }
192/// ```
193///
194/// [park]: std::thread::park
195/// [unpark]: std::thread::Thread::unpark
196/// [`notified().await`]: Notify::notified()
197/// [`notify_one()`]: Notify::notify_one()
198/// [`enable`]: Notified::enable()
199/// [`Semaphore`]: crate::sync::Semaphore
#[derive(Debug)]
pub struct Notify {
    // `state` packs two values into a single word:
    // - the low 2 bits hold one of `EMPTY`, `WAITING` or `NOTIFIED`;
    // - the remaining high bits count how many times `notify_waiters`
    //   has been called.
    //
    // Throughout the code there are two assumptions:
    // - state can be transitioned *from* `WAITING` only if
    //   `waiters` lock is held
    // - number of times `notify_waiters` was called can
    //   be modified only if `waiters` lock is held
    state: AtomicUsize,
    // Queue of waiters registered by `Notified` futures, protected by a mutex.
    waiters: Mutex<WaitList>,
}
215
216#[derive(Debug)]
217struct Waiter {
218 /// Intrusive linked-list pointers.
219 pointers: linked_list::Pointers<Waiter>,
220
221 /// Waiting task's waker. Depending on the value of `notification`,
222 /// this field is either protected by the `waiters` lock in
223 /// `Notify`, or it is exclusively owned by the enclosing `Waiter`.
224 waker: UnsafeCell<Option<Waker>>,
225
226 /// Notification for this waiter. Uses 2 bits to store if and how was
227 /// notified, 1 bit for storing if it was woken up using FIFO or LIFO, and
228 /// the rest of it is unused.
229 /// * if it's `None`, then `waker` is protected by the `waiters` lock.
230 /// * if it's `Some`, then `waker` is exclusively owned by the
231 /// enclosing `Waiter` and can be accessed without locking.
232 notification: AtomicNotification,
233
234 /// Should not be `Unpin`.
235 _p: PhantomPinned,
236}
237
238impl Waiter {
239 fn new() -> Waiter {
240 Waiter {
241 pointers: linked_list::Pointers::new(),
242 waker: UnsafeCell::new(None),
243 notification: AtomicNotification::none(),
244 _p: PhantomPinned,
245 }
246 }
247}
248
249generate_addr_of_methods! {
250 impl<> Waiter {
251 unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
252 &self.pointers
253 }
254 }
255}
256
// Encoding of "not notified yet".
const NOTIFICATION_NONE: usize = 0b000;

// Encoding written by `notify_one`.
const NOTIFICATION_ONE: usize = 0b001;

// Encoding written by `notify_last`.
const NOTIFICATION_LAST: usize = 0b101;

// Encoding written by `notify_waiters`.
const NOTIFICATION_ALL: usize = 0b010;

/// Atomic cell holding the notification of a `Waiter`.
///
/// Conceptually this is an `Option<Notification>`; it is stored as an
/// `AtomicUsize` so that it can be read and written atomically.
#[derive(Debug)]
struct AtomicNotification(AtomicUsize);

impl AtomicNotification {
    /// Creates a cell in the "no notification" state.
    fn none() -> Self {
        Self(AtomicUsize::new(NOTIFICATION_NONE))
    }

    /// Publishes `notification` with release ordering.
    /// This method should be called exactly once.
    fn store_release(&self, notification: Notification) {
        let encoded = match notification {
            Notification::One(NotifyOneStrategy::Fifo) => NOTIFICATION_ONE,
            Notification::One(NotifyOneStrategy::Lifo) => NOTIFICATION_LAST,
            Notification::All => NOTIFICATION_ALL,
        };
        self.0.store(encoded, Release);
    }

    /// Reads the cell with the given `ordering` and decodes it; `None` means
    /// that no notification has been stored.
    fn load(&self, ordering: Ordering) -> Option<Notification> {
        let decoded = match self.0.load(ordering) {
            NOTIFICATION_NONE => return None,
            NOTIFICATION_ALL => Notification::All,
            NOTIFICATION_ONE => Notification::One(NotifyOneStrategy::Fifo),
            NOTIFICATION_LAST => Notification::One(NotifyOneStrategy::Lifo),
            _ => unreachable!(),
        };
        Some(decoded)
    }

    /// Resets the cell to "no notification".
    ///
    /// A `Notified` future calls this to consume its notification. The store
    /// is relaxed, so it must only be used once the cell is no longer shared.
    fn clear(&self) {
        self.0.store(NOTIFICATION_NONE, Relaxed);
    }
}

/// Which end of the queue a single-waiter notification targets.
#[derive(Debug, PartialEq, Eq)]
#[repr(usize)]
enum NotifyOneStrategy {
    Fifo,
    Lifo,
}

/// The kind of notification delivered to a waiter.
#[derive(Debug, PartialEq, Eq)]
#[repr(usize)]
enum Notification {
    One(NotifyOneStrategy),
    All,
}
324
325/// List used in `Notify::notify_waiters`. It wraps a guarded linked list
326/// and gates the access to it on `notify.waiters` mutex. It also empties
327/// the list on drop.
328struct NotifyWaitersList<'a> {
329 list: GuardedWaitList,
330 is_empty: bool,
331 notify: &'a Notify,
332}
333
334impl<'a> NotifyWaitersList<'a> {
335 fn new(
336 unguarded_list: WaitList,
337 guard: Pin<&'a Waiter>,
338 notify: &'a Notify,
339 ) -> NotifyWaitersList<'a> {
340 let guard_ptr = NonNull::from(guard.get_ref());
341 let list = unguarded_list.into_guarded(guard_ptr);
342 NotifyWaitersList {
343 list,
344 is_empty: false,
345 notify,
346 }
347 }
348
349 /// Removes the last element from the guarded list. Modifying this list
350 /// requires an exclusive access to the main list in `Notify`.
351 fn pop_back_locked(&mut self, _waiters: &mut WaitList) -> Option<NonNull<Waiter>> {
352 let result = self.list.pop_back();
353 if result.is_none() {
354 // Save information about emptiness to avoid waiting for lock
355 // in the destructor.
356 self.is_empty = true;
357 }
358 result
359 }
360}
361
362impl Drop for NotifyWaitersList<'_> {
363 fn drop(&mut self) {
364 // If the list is not empty, we unlink all waiters from it.
365 // We do not wake the waiters to avoid double panics.
366 if !self.is_empty {
367 let _lock_guard = self.notify.waiters.lock();
368 while let Some(waiter) = self.list.pop_back() {
369 // Safety: we never make mutable references to waiters.
370 let waiter = unsafe { waiter.as_ref() };
371 waiter.notification.store_release(Notification::All);
372 }
373 }
374 }
375}
376
377/// Future returned from [`Notify::notified()`].
378///
379/// This future is fused, so once it has completed, any future calls to poll
380/// will immediately return `Poll::Ready`.
381#[derive(Debug)]
382#[must_use = "futures do nothing unless you `.await` or poll them"]
383pub struct Notified<'a> {
384 /// The `Notify` being received on.
385 notify: &'a Notify,
386
387 /// The current state of the receiving process.
388 state: State,
389
390 /// Number of calls to `notify_waiters` at the time of creation.
391 notify_waiters_calls: usize,
392
393 /// Entry in the waiter `LinkedList`.
394 waiter: Waiter,
395}
396
397unsafe impl<'a> Send for Notified<'a> {}
398unsafe impl<'a> Sync for Notified<'a> {}
399
/// Progress of a `Notified` future.
#[derive(Debug)]
enum State {
    /// State a `Notified` is created in by `Notify::notified`.
    Init,
    /// The waiter has been pushed onto the `Notify` waiter list.
    Waiting,
    /// A permit or notification was received; the future is complete.
    Done,
}
406
/// Number of low bits of the state word that hold the state tag.
const NOTIFY_WAITERS_SHIFT: usize = 2;
/// Mask selecting the state tag (`EMPTY`, `WAITING` or `NOTIFIED`).
const STATE_MASK: usize = (1 << NOTIFY_WAITERS_SHIFT) - 1;
/// Mask selecting the `notify_waiters` call counter stored above the tag.
const NOTIFY_WAITERS_CALLS_MASK: usize = !STATE_MASK;

/// Initial "idle" state.
const EMPTY: usize = 0;

/// One or more threads are currently waiting to be notified.
const WAITING: usize = 1;

/// Pending notification.
const NOTIFIED: usize = 2;

/// Returns `data` with its state tag replaced by `state`; the call counter
/// bits are left untouched.
fn set_state(data: usize, state: usize) -> usize {
    (data & NOTIFY_WAITERS_CALLS_MASK) | (state & STATE_MASK)
}

/// Extracts the state tag from `data`.
fn get_state(data: usize) -> usize {
    data & STATE_MASK
}

/// Extracts the `notify_waiters` call counter from `data`.
fn get_num_notify_waiters_calls(data: usize) -> usize {
    (data & NOTIFY_WAITERS_CALLS_MASK) >> NOTIFY_WAITERS_SHIFT
}

/// Returns `data` with the `notify_waiters` call counter incremented by one.
///
/// The counter wraps around on overflow, exactly like the `fetch_add` in
/// `atomic_inc_num_notify_waiters_calls`. A plain `+` would panic in builds
/// with overflow checks once the counter is saturated. Because the increment
/// is a multiple of `1 << NOTIFY_WAITERS_SHIFT`, wrapping never changes the
/// state tag in the low bits.
fn inc_num_notify_waiters_calls(data: usize) -> usize {
    data.wrapping_add(1 << NOTIFY_WAITERS_SHIFT)
}

/// Atomically increments the `notify_waiters` call counter stored in `data`,
/// wrapping around on overflow.
fn atomic_inc_num_notify_waiters_calls(data: &AtomicUsize) {
    data.fetch_add(1 << NOTIFY_WAITERS_SHIFT, SeqCst);
}
439
440impl Notify {
441 /// Create a new `Notify`, initialized without a permit.
442 ///
443 /// # Examples
444 ///
445 /// ```
446 /// use tokio::sync::Notify;
447 ///
448 /// let notify = Notify::new();
449 /// ```
450 pub fn new() -> Notify {
451 Notify {
452 state: AtomicUsize::new(0),
453 waiters: Mutex::new(LinkedList::new()),
454 }
455 }
456
457 /// Create a new `Notify`, initialized without a permit.
458 ///
459 /// When using the `tracing` [unstable feature], a `Notify` created with
460 /// `const_new` will not be instrumented. As such, it will not be visible
461 /// in [`tokio-console`]. Instead, [`Notify::new`] should be used to create
462 /// an instrumented object if that is needed.
463 ///
464 /// # Examples
465 ///
466 /// ```
467 /// use tokio::sync::Notify;
468 ///
469 /// static NOTIFY: Notify = Notify::const_new();
470 /// ```
471 ///
472 /// [`tokio-console`]: https://github.com/tokio-rs/console
473 /// [unstable feature]: crate#unstable-features
474 #[cfg(not(all(loom, test)))]
475 pub const fn const_new() -> Notify {
476 Notify {
477 state: AtomicUsize::new(0),
478 waiters: Mutex::const_new(LinkedList::new()),
479 }
480 }
481
482 /// Wait for a notification.
483 ///
484 /// Equivalent to:
485 ///
486 /// ```ignore
487 /// async fn notified(&self);
488 /// ```
489 ///
490 /// Each `Notify` value holds a single permit. If a permit is available from
491 /// an earlier call to [`notify_one()`], then `notified().await` will complete
492 /// immediately, consuming that permit. Otherwise, `notified().await` waits
493 /// for a permit to be made available by the next call to `notify_one()`.
494 ///
495 /// The `Notified` future is not guaranteed to receive wakeups from calls to
496 /// `notify_one()` if it has not yet been polled. See the documentation for
497 /// [`Notified::enable()`] for more details.
498 ///
499 /// The `Notified` future is guaranteed to receive wakeups from
500 /// `notify_waiters()` as soon as it has been created, even if it has not
501 /// yet been polled.
502 ///
503 /// [`notify_one()`]: Notify::notify_one
504 /// [`Notified::enable()`]: Notified::enable
505 ///
506 /// # Cancel safety
507 ///
508 /// This method uses a queue to fairly distribute notifications in the order
509 /// they were requested. Cancelling a call to `notified` makes you lose your
510 /// place in the queue.
511 ///
512 /// # Examples
513 ///
514 /// ```
515 /// use tokio::sync::Notify;
516 /// use std::sync::Arc;
517 ///
518 /// #[tokio::main]
519 /// async fn main() {
520 /// let notify = Arc::new(Notify::new());
521 /// let notify2 = notify.clone();
522 ///
523 /// tokio::spawn(async move {
524 /// notify2.notified().await;
525 /// println!("received notification");
526 /// });
527 ///
528 /// println!("sending notification");
529 /// notify.notify_one();
530 /// }
531 /// ```
532 pub fn notified(&self) -> Notified<'_> {
533 // we load the number of times notify_waiters
534 // was called and store that in the future.
535 let state = self.state.load(SeqCst);
536 Notified {
537 notify: self,
538 state: State::Init,
539 notify_waiters_calls: get_num_notify_waiters_calls(state),
540 waiter: Waiter::new(),
541 }
542 }
543
544 /// Notifies the first waiting task.
545 ///
546 /// If a task is currently waiting, that task is notified. Otherwise, a
547 /// permit is stored in this `Notify` value and the **next** call to
548 /// [`notified().await`] will complete immediately consuming the permit made
549 /// available by this call to `notify_one()`.
550 ///
551 /// At most one permit may be stored by `Notify`. Many sequential calls to
552 /// `notify_one` will result in a single permit being stored. The next call to
553 /// `notified().await` will complete immediately, but the one after that
554 /// will wait.
555 ///
556 /// [`notified().await`]: Notify::notified()
557 ///
558 /// # Examples
559 ///
560 /// ```
561 /// use tokio::sync::Notify;
562 /// use std::sync::Arc;
563 ///
564 /// #[tokio::main]
565 /// async fn main() {
566 /// let notify = Arc::new(Notify::new());
567 /// let notify2 = notify.clone();
568 ///
569 /// tokio::spawn(async move {
570 /// notify2.notified().await;
571 /// println!("received notification");
572 /// });
573 ///
574 /// println!("sending notification");
575 /// notify.notify_one();
576 /// }
577 /// ```
578 // Alias for old name in 0.x
579 #[cfg_attr(docsrs, doc(alias = "notify"))]
580 pub fn notify_one(&self) {
581 self.notify_with_strategy(NotifyOneStrategy::Fifo);
582 }
583
584 /// Notifies the last waiting task.
585 ///
    /// This function behaves similarly to `notify_one`. The only difference is that it wakes
587 /// the most recently added waiter instead of the oldest waiter.
588 ///
589 /// Check the [`notify_one()`] documentation for more info and
590 /// examples.
591 ///
592 /// [`notify_one()`]: Notify::notify_one
593 pub fn notify_last(&self) {
594 self.notify_with_strategy(NotifyOneStrategy::Lifo);
595 }
596
597 fn notify_with_strategy(&self, strategy: NotifyOneStrategy) {
598 // Load the current state
599 let mut curr = self.state.load(SeqCst);
600
601 // If the state is `EMPTY`, transition to `NOTIFIED` and return.
602 while let EMPTY | NOTIFIED = get_state(curr) {
603 // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A
604 // happens-before synchronization must happen between this atomic
605 // operation and a task calling `notified().await`.
606 let new = set_state(curr, NOTIFIED);
607 let res = self.state.compare_exchange(curr, new, SeqCst, SeqCst);
608
609 match res {
610 // No waiters, no further work to do
611 Ok(_) => return,
612 Err(actual) => {
613 curr = actual;
614 }
615 }
616 }
617
618 // There are waiters, the lock must be acquired to notify.
619 let mut waiters = self.waiters.lock();
620
621 // The state must be reloaded while the lock is held. The state may only
622 // transition out of WAITING while the lock is held.
623 curr = self.state.load(SeqCst);
624
625 if let Some(waker) = notify_locked(&mut waiters, &self.state, curr, strategy) {
626 drop(waiters);
627 waker.wake();
628 }
629 }
630
631 /// Notifies all waiting tasks.
632 ///
633 /// If a task is currently waiting, that task is notified. Unlike with
634 /// `notify_one()`, no permit is stored to be used by the next call to
635 /// `notified().await`. The purpose of this method is to notify all
636 /// already registered waiters. Registering for notification is done by
637 /// acquiring an instance of the `Notified` future via calling `notified()`.
638 ///
639 /// # Examples
640 ///
641 /// ```
642 /// use tokio::sync::Notify;
643 /// use std::sync::Arc;
644 ///
645 /// #[tokio::main]
646 /// async fn main() {
647 /// let notify = Arc::new(Notify::new());
648 /// let notify2 = notify.clone();
649 ///
650 /// let notified1 = notify.notified();
651 /// let notified2 = notify.notified();
652 ///
653 /// let handle = tokio::spawn(async move {
654 /// println!("sending notifications");
655 /// notify2.notify_waiters();
656 /// });
657 ///
658 /// notified1.await;
659 /// notified2.await;
660 /// println!("received notifications");
661 /// }
662 /// ```
663 pub fn notify_waiters(&self) {
664 let mut waiters = self.waiters.lock();
665
666 // The state must be loaded while the lock is held. The state may only
667 // transition out of WAITING while the lock is held.
668 let curr = self.state.load(SeqCst);
669
670 if matches!(get_state(curr), EMPTY | NOTIFIED) {
671 // There are no waiting tasks. All we need to do is increment the
672 // number of times this method was called.
673 atomic_inc_num_notify_waiters_calls(&self.state);
674 return;
675 }
676
677 // Increment the number of times this method was called
678 // and transition to empty.
679 let new_state = set_state(inc_num_notify_waiters_calls(curr), EMPTY);
680 self.state.store(new_state, SeqCst);
681
682 // It is critical for `GuardedLinkedList` safety that the guard node is
683 // pinned in memory and is not dropped until the guarded list is dropped.
684 let guard = Waiter::new();
685 pin!(guard);
686
687 // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
688 // underneath to allow every waiter to safely remove itself from it.
689 //
690 // * This list will be still guarded by the `waiters` lock.
691 // `NotifyWaitersList` wrapper makes sure we hold the lock to modify it.
692 // * This wrapper will empty the list on drop. It is critical for safety
693 // that we will not leave any list entry with a pointer to the local
694 // guard node after this function returns / panics.
695 let mut list = NotifyWaitersList::new(std::mem::take(&mut *waiters), guard.as_ref(), self);
696
697 let mut wakers = WakeList::new();
698 'outer: loop {
699 while wakers.can_push() {
700 match list.pop_back_locked(&mut waiters) {
701 Some(waiter) => {
702 // Safety: we never make mutable references to waiters.
703 let waiter = unsafe { waiter.as_ref() };
704
705 // Safety: we hold the lock, so we can access the waker.
706 if let Some(waker) =
707 unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }
708 {
709 wakers.push(waker);
710 }
711
712 // This waiter is unlinked and will not be shared ever again, release it.
713 waiter.notification.store_release(Notification::All);
714 }
715 None => {
716 break 'outer;
717 }
718 }
719 }
720
721 // Release the lock before notifying.
722 drop(waiters);
723
724 // One of the wakers may panic, but the remaining waiters will still
725 // be unlinked from the list in `NotifyWaitersList` destructor.
726 wakers.wake_all();
727
728 // Acquire the lock again.
729 waiters = self.waiters.lock();
730 }
731
732 // Release the lock before notifying
733 drop(waiters);
734
735 wakers.wake_all();
736 }
737}
738
739impl Default for Notify {
740 fn default() -> Notify {
741 Notify::new()
742 }
743}
744
745impl UnwindSafe for Notify {}
746impl RefUnwindSafe for Notify {}
747
748fn notify_locked(
749 waiters: &mut WaitList,
750 state: &AtomicUsize,
751 curr: usize,
752 strategy: NotifyOneStrategy,
753) -> Option<Waker> {
754 match get_state(curr) {
755 EMPTY | NOTIFIED => {
756 let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst);
757
758 match res {
759 Ok(_) => None,
760 Err(actual) => {
761 let actual_state = get_state(actual);
762 assert!(actual_state == EMPTY || actual_state == NOTIFIED);
763 state.store(set_state(actual, NOTIFIED), SeqCst);
764 None
765 }
766 }
767 }
768 WAITING => {
769 // At this point, it is guaranteed that the state will not
770 // concurrently change as holding the lock is required to
771 // transition **out** of `WAITING`.
772 //
773 // Get a pending waiter using one of the available dequeue strategies.
774 let waiter = match strategy {
775 NotifyOneStrategy::Fifo => waiters.pop_back().unwrap(),
776 NotifyOneStrategy::Lifo => waiters.pop_front().unwrap(),
777 };
778
779 // Safety: we never make mutable references to waiters.
780 let waiter = unsafe { waiter.as_ref() };
781
782 // Safety: we hold the lock, so we can access the waker.
783 let waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
784
785 // This waiter is unlinked and will not be shared ever again, release it.
786 waiter
787 .notification
788 .store_release(Notification::One(strategy));
789
790 if waiters.is_empty() {
791 // As this the **final** waiter in the list, the state
792 // must be transitioned to `EMPTY`. As transitioning
793 // **from** `WAITING` requires the lock to be held, a
794 // `store` is sufficient.
795 state.store(set_state(curr, EMPTY), SeqCst);
796 }
797 waker
798 }
799 _ => unreachable!(),
800 }
801}
802
803// ===== impl Notified =====
804
805impl Notified<'_> {
806 /// Adds this future to the list of futures that are ready to receive
807 /// wakeups from calls to [`notify_one`].
808 ///
809 /// Polling the future also adds it to the list, so this method should only
810 /// be used if you want to add the future to the list before the first call
811 /// to `poll`. (In fact, this method is equivalent to calling `poll` except
812 /// that no `Waker` is registered.)
813 ///
814 /// This has no effect on notifications sent using [`notify_waiters`], which
815 /// are received as long as they happen after the creation of the `Notified`
816 /// regardless of whether `enable` or `poll` has been called.
817 ///
818 /// This method returns true if the `Notified` is ready. This happens in the
819 /// following situations:
820 ///
821 /// 1. The `notify_waiters` method was called between the creation of the
822 /// `Notified` and the call to this method.
823 /// 2. This is the first call to `enable` or `poll` on this future, and the
824 /// `Notify` was holding a permit from a previous call to `notify_one`.
825 /// The call consumes the permit in that case.
826 /// 3. The future has previously been enabled or polled, and it has since
827 /// then been marked ready by either consuming a permit from the
828 /// `Notify`, or by a call to `notify_one` or `notify_waiters` that
829 /// removed it from the list of futures ready to receive wakeups.
830 ///
831 /// If this method returns true, any future calls to poll on the same future
832 /// will immediately return `Poll::Ready`.
833 ///
834 /// # Examples
835 ///
    /// Unbounded multi-producer multi-consumer (mpmc) channel.
837 ///
838 /// The call to `enable` is important because otherwise if you have two
839 /// calls to `recv` and two calls to `send` in parallel, the following could
840 /// happen:
841 ///
842 /// 1. Both calls to `try_recv` return `None`.
843 /// 2. Both new elements are added to the vector.
844 /// 3. The `notify_one` method is called twice, adding only a single
845 /// permit to the `Notify`.
846 /// 4. Both calls to `recv` reach the `Notified` future. One of them
847 /// consumes the permit, and the other sleeps forever.
848 ///
849 /// By adding the `Notified` futures to the list by calling `enable` before
850 /// `try_recv`, the `notify_one` calls in step three would remove the
851 /// futures from the list and mark them notified instead of adding a permit
852 /// to the `Notify`. This ensures that both futures are woken.
853 ///
854 /// ```
855 /// use tokio::sync::Notify;
856 ///
857 /// use std::collections::VecDeque;
858 /// use std::sync::Mutex;
859 ///
860 /// struct Channel<T> {
861 /// messages: Mutex<VecDeque<T>>,
862 /// notify_on_sent: Notify,
863 /// }
864 ///
865 /// impl<T> Channel<T> {
866 /// pub fn send(&self, msg: T) {
867 /// let mut locked_queue = self.messages.lock().unwrap();
868 /// locked_queue.push_back(msg);
869 /// drop(locked_queue);
870 ///
871 /// // Send a notification to one of the calls currently
872 /// // waiting in a call to `recv`.
873 /// self.notify_on_sent.notify_one();
874 /// }
875 ///
876 /// pub fn try_recv(&self) -> Option<T> {
877 /// let mut locked_queue = self.messages.lock().unwrap();
878 /// locked_queue.pop_front()
879 /// }
880 ///
881 /// pub async fn recv(&self) -> T {
882 /// let future = self.notify_on_sent.notified();
883 /// tokio::pin!(future);
884 ///
885 /// loop {
886 /// // Make sure that no wakeup is lost if we get
887 /// // `None` from `try_recv`.
888 /// future.as_mut().enable();
889 ///
890 /// if let Some(msg) = self.try_recv() {
891 /// return msg;
892 /// }
893 ///
894 /// // Wait for a call to `notify_one`.
895 /// //
896 /// // This uses `.as_mut()` to avoid consuming the future,
897 /// // which lets us call `Pin::set` below.
898 /// future.as_mut().await;
899 ///
900 /// // Reset the future in case another call to
901 /// // `try_recv` got the message before us.
902 /// future.set(self.notify_on_sent.notified());
903 /// }
904 /// }
905 /// }
906 /// ```
907 ///
908 /// [`notify_one`]: Notify::notify_one()
909 /// [`notify_waiters`]: Notify::notify_waiters()
910 pub fn enable(self: Pin<&mut Self>) -> bool {
911 self.poll_notified(None).is_ready()
912 }
913
914 /// A custom `project` implementation is used in place of `pin-project-lite`
915 /// as a custom drop implementation is needed.
916 fn project(self: Pin<&mut Self>) -> (&Notify, &mut State, &usize, &Waiter) {
917 unsafe {
918 // Safety: `notify`, `state` and `notify_waiters_calls` are `Unpin`.
919
920 is_unpin::<&Notify>();
921 is_unpin::<State>();
922 is_unpin::<usize>();
923
924 let me = self.get_unchecked_mut();
925 (
926 me.notify,
927 &mut me.state,
928 &me.notify_waiters_calls,
929 &me.waiter,
930 )
931 }
932 }
933
/// Drives the `Notified` state machine one step.
///
/// `waker` is the waker to register if the future has to wait. Callers that
/// only want to know whether the future is ready may pass `None`; in that
/// case the waiter is still queued, but no waker is stored for it.
///
/// Returns `Poll::Ready(())` once a notification has been consumed (or a
/// `notify_waiters` call has happened since this future was created), and
/// `Poll::Pending` after the waiter has been queued or its waker refreshed.
fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> {
    let (notify, state, notify_waiters_calls, waiter) = self.project();

    // Every `continue 'outer_loop` below re-enters this `match` with the
    // freshly written `*state`, which ends in the `State::Done` arm.
    'outer_loop: loop {
        match *state {
            State::Init => {
                let curr = notify.state.load(SeqCst);

                // Optimistically try acquiring a pending notification
                let res = notify.state.compare_exchange(
                    set_state(curr, NOTIFIED),
                    set_state(curr, EMPTY),
                    SeqCst,
                    SeqCst,
                );

                if res.is_ok() {
                    // Acquired the notification
                    *state = State::Done;
                    continue 'outer_loop;
                }

                // Clone the waker before locking, a waker clone can be
                // triggering arbitrary code.
                let waker = waker.cloned();

                // Acquire the lock and attempt to transition to the waiting
                // state.
                let mut waiters = notify.waiters.lock();

                // Reload the state with the lock held
                let mut curr = notify.state.load(SeqCst);

                // if notify_waiters has been called after the future
                // was created, then we are done
                if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
                    *state = State::Done;
                    continue 'outer_loop;
                }

                // Transition the state to WAITING.
                //
                // The loop retries with the freshly observed value whenever a
                // compare-exchange loses a race.
                loop {
                    match get_state(curr) {
                        EMPTY => {
                            // Transition to WAITING
                            let res = notify.state.compare_exchange(
                                set_state(curr, EMPTY),
                                set_state(curr, WAITING),
                                SeqCst,
                                SeqCst,
                            );

                            if let Err(actual) = res {
                                assert_eq!(get_state(actual), NOTIFIED);
                                curr = actual;
                            } else {
                                break;
                            }
                        }
                        // Already WAITING: nothing to change, just enqueue.
                        WAITING => break,
                        NOTIFIED => {
                            // Try consuming the notification
                            let res = notify.state.compare_exchange(
                                set_state(curr, NOTIFIED),
                                set_state(curr, EMPTY),
                                SeqCst,
                                SeqCst,
                            );

                            match res {
                                Ok(_) => {
                                    // Acquired the notification
                                    *state = State::Done;
                                    continue 'outer_loop;
                                }
                                Err(actual) => {
                                    assert_eq!(get_state(actual), EMPTY);
                                    curr = actual;
                                }
                            }
                        }
                        _ => unreachable!(),
                    }
                }

                let mut old_waker = None;
                if waker.is_some() {
                    // Safety: called while locked.
                    //
                    // The use of `old_waker` here is not necessary, as the field is always
                    // None when we reach this line.
                    unsafe {
                        old_waker =
                            waiter.waker.with_mut(|v| std::mem::replace(&mut *v, waker));
                    }
                }

                // Insert the waiter into the linked list
                waiters.push_front(NonNull::from(waiter));

                *state = State::Waiting;

                // Release the lock first, then drop any previous waker, so that
                // the waker's destructor never runs with the lock held.
                drop(waiters);
                drop(old_waker);

                return Poll::Pending;
            }
            State::Waiting => {
                #[cfg(tokio_taskdump)]
                if let Some(waker) = waker {
                    let mut ctx = Context::from_waker(waker);
                    std::task::ready!(crate::trace::trace_leaf(&mut ctx));
                }

                // Fast path: check for a notification without taking the lock.
                if waiter.notification.load(Acquire).is_some() {
                    // Safety: waiter is already unlinked and will not be shared again,
                    // so we have an exclusive access to `waker`.
                    drop(unsafe { waiter.waker.with_mut(|waker| (*waker).take()) });

                    waiter.notification.clear();
                    *state = State::Done;
                    return Poll::Ready(());
                }

                // Our waiter was not notified, implying it is still stored in a waiter
                // list (guarded by `notify.waiters`). In order to access the waker
                // fields, we must acquire the lock.

                let mut old_waker = None;
                let mut waiters = notify.waiters.lock();

                // We hold the lock and notifications are set only with the lock held,
                // so this can be relaxed, because the happens-before relationship is
                // established through the mutex.
                if waiter.notification.load(Relaxed).is_some() {
                    // Safety: waiter is already unlinked and will not be shared again,
                    // so we have an exclusive access to `waker`.
                    old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };

                    waiter.notification.clear();

                    // Drop the old waker after releasing the lock.
                    drop(waiters);
                    drop(old_waker);

                    *state = State::Done;
                    return Poll::Ready(());
                }

                // Load the state with the lock held.
                let curr = notify.state.load(SeqCst);

                if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
                    // Before we add a waiter to the list we check if these numbers are
                    // different while holding the lock. If these numbers are different now,
                    // it means that there is a call to `notify_waiters` in progress and this
                    // waiter must be contained by a guarded list used in `notify_waiters`.
                    // We can treat the waiter as notified and remove it from the list, as
                    // it would have been notified in the `notify_waiters` call anyways.

                    // Safety: we hold the lock, so we can modify the waker.
                    old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };

                    // Safety: we hold the lock, so we have an exclusive access to the list.
                    // The list is used in `notify_waiters`, so it must be guarded.
                    unsafe { waiters.remove(NonNull::from(waiter)) };

                    // Falls through to the drops below; the next loop iteration
                    // then returns `Ready` from the `State::Done` arm.
                    *state = State::Done;
                } else {
                    // Still waiting: store the caller's waker, but only clone it
                    // when the stored one would not already wake the same task.
                    //
                    // Safety: we hold the lock, so we can modify the waker.
                    unsafe {
                        waiter.waker.with_mut(|v| {
                            if let Some(waker) = waker {
                                let should_update = match &*v {
                                    Some(current_waker) => !current_waker.will_wake(waker),
                                    None => true,
                                };
                                if should_update {
                                    old_waker = std::mem::replace(&mut *v, Some(waker.clone()));
                                }
                            }
                        });
                    }

                    // Drop the old waker after releasing the lock.
                    drop(waiters);
                    drop(old_waker);

                    return Poll::Pending;
                }

                // Explicit drop of the lock to indicate the scope that the
                // lock is held. Because holding the lock is required to
                // ensure safe access to fields not held within the lock, it
                // is helpful to visualize the scope of the critical
                // section.
                drop(waiters);

                // Drop the old waker after releasing the lock.
                drop(old_waker);
            }
            State::Done => {
                #[cfg(tokio_taskdump)]
                if let Some(waker) = waker {
                    let mut ctx = Context::from_waker(waker);
                    std::task::ready!(crate::trace::trace_leaf(&mut ctx));
                }
                return Poll::Ready(());
            }
        }
    }
}
1146}
1147
1148impl Future for Notified<'_> {
1149 type Output = ();
1150
1151 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
1152 self.poll_notified(Some(cx.waker()))
1153 }
1154}
1155
1156impl Drop for Notified<'_> {
1157 fn drop(&mut self) {
1158 // Safety: The type only transitions to a "Waiting" state when pinned.
1159 let (notify, state, _, waiter) = unsafe { Pin::new_unchecked(self).project() };
1160
1161 // This is where we ensure safety. The `Notified` value is being
1162 // dropped, which means we must ensure that the waiter entry is no
1163 // longer stored in the linked list.
1164 if matches!(*state, State::Waiting) {
1165 let mut waiters = notify.waiters.lock();
1166 let mut notify_state = notify.state.load(SeqCst);
1167
1168 // We hold the lock, so this field is not concurrently accessed by
1169 // `notify_*` functions and we can use the relaxed ordering.
1170 let notification = waiter.notification.load(Relaxed);
1171
1172 // remove the entry from the list (if not already removed)
1173 //
1174 // Safety: we hold the lock, so we have an exclusive access to every list the
1175 // waiter may be contained in. If the node is not contained in the `waiters`
1176 // list, then it is contained by a guarded list used by `notify_waiters`.
1177 unsafe { waiters.remove(NonNull::from(waiter)) };
1178
1179 if waiters.is_empty() && get_state(notify_state) == WAITING {
1180 notify_state = set_state(notify_state, EMPTY);
1181 notify.state.store(notify_state, SeqCst);
1182 }
1183
1184 // See if the node was notified but not received. In this case, if
1185 // the notification was triggered via `notify_one`, it must be sent
1186 // to the next waiter.
1187 if let Some(Notification::One(strategy)) = notification {
1188 if let Some(waker) =
1189 notify_locked(&mut waiters, ¬ify.state, notify_state, strategy)
1190 {
1191 drop(waiters);
1192 waker.wake();
1193 }
1194 }
1195 }
1196 }
1197}
1198
1199/// # Safety
1200///
1201/// `Waiter` is forced to be !Unpin.
1202unsafe impl linked_list::Link for Waiter {
1203 type Handle = NonNull<Waiter>;
1204 type Target = Waiter;
1205
1206 fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1207 *handle
1208 }
1209
1210 unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1211 ptr
1212 }
1213
1214 unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1215 Waiter::addr_of_pointers(target)
1216 }
1217}
1218
/// Compile-time assertion that `T` implements `Unpin`.
///
/// The body is empty: a call only type-checks when the bound holds and does
/// nothing at runtime.
fn is_unpin<T>()
where
    T: Unpin,
{
}