tokio/sync/
notify.rs

1// Allow `unreachable_pub` warnings when sync is not enabled
2// due to the usage of `Notify` within the `rt` feature set.
3// When this module is compiled with `sync` enabled we will warn on
4// this lint. When `rt` is enabled we use `pub(crate)` which
5// triggers this warning but it is safe to ignore in this case.
6#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
7
8use crate::loom::cell::UnsafeCell;
9use crate::loom::sync::atomic::AtomicUsize;
10use crate::loom::sync::Mutex;
11use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
12use crate::util::WakeList;
13
14use std::future::Future;
15use std::marker::PhantomPinned;
16use std::panic::{RefUnwindSafe, UnwindSafe};
17use std::pin::Pin;
18use std::ptr::NonNull;
19use std::sync::atomic::Ordering::{self, Acquire, Relaxed, Release, SeqCst};
20use std::task::{Context, Poll, Waker};
21
22type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
23type GuardedWaitList = GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
24
25/// Notifies a single task to wake up.
26///
27/// `Notify` provides a basic mechanism to notify a single task of an event.
28/// `Notify` itself does not carry any data. Instead, it is to be used to signal
29/// another task to perform an operation.
30///
31/// A `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. The
32/// [`notified().await`] method waits for a permit to become available, and
33/// [`notify_one()`] sets a permit **if there currently are no available
34/// permits**.
35///
36/// The synchronization details of `Notify` are similar to
37/// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`]
38/// value contains a single permit. [`notified().await`] waits for the permit to
39/// be made available, consumes the permit, and resumes.  [`notify_one()`] sets
40/// the permit, waking a pending task if there is one.
41///
42/// If `notify_one()` is called **before** `notified().await`, then the next
43/// call to `notified().await` will complete immediately, consuming the permit.
44/// Any subsequent calls to `notified().await` will wait for a new permit.
45///
46/// If `notify_one()` is called **multiple** times before `notified().await`,
47/// only a **single** permit is stored. The next call to `notified().await` will
48/// complete immediately, but the one after will wait for a new permit.
49///
50/// # Examples
51///
52/// Basic usage.
53///
54/// ```
55/// use tokio::sync::Notify;
56/// use std::sync::Arc;
57///
58/// #[tokio::main]
59/// async fn main() {
60///     let notify = Arc::new(Notify::new());
61///     let notify2 = notify.clone();
62///
63///     let handle = tokio::spawn(async move {
64///         notify2.notified().await;
65///         println!("received notification");
66///     });
67///
68///     println!("sending notification");
69///     notify.notify_one();
70///
71///     // Wait for task to receive notification.
72///     handle.await.unwrap();
73/// }
74/// ```
75///
76/// Unbound multi-producer single-consumer (mpsc) channel.
77///
78/// No wakeups can be lost when using this channel because the call to
79/// `notify_one()` will store a permit in the `Notify`, which the following call
80/// to `notified()` will consume.
81///
82/// ```
83/// use tokio::sync::Notify;
84///
85/// use std::collections::VecDeque;
86/// use std::sync::Mutex;
87///
88/// struct Channel<T> {
89///     values: Mutex<VecDeque<T>>,
90///     notify: Notify,
91/// }
92///
93/// impl<T> Channel<T> {
94///     pub fn send(&self, value: T) {
95///         self.values.lock().unwrap()
96///             .push_back(value);
97///
98///         // Notify the consumer a value is available
99///         self.notify.notify_one();
100///     }
101///
102///     // This is a single-consumer channel, so several concurrent calls to
103///     // `recv` are not allowed.
104///     pub async fn recv(&self) -> T {
105///         loop {
106///             // Drain values
107///             if let Some(value) = self.values.lock().unwrap().pop_front() {
108///                 return value;
109///             }
110///
111///             // Wait for values to be available
112///             self.notify.notified().await;
113///         }
114///     }
115/// }
116/// ```
117///
118/// Unbound multi-producer multi-consumer (mpmc) channel.
119///
120/// The call to [`enable`] is important because otherwise if you have two
121/// calls to `recv` and two calls to `send` in parallel, the following could
122/// happen:
123///
124///  1. Both calls to `try_recv` return `None`.
125///  2. Both new elements are added to the vector.
126///  3. The `notify_one` method is called twice, adding only a single
127///     permit to the `Notify`.
128///  4. Both calls to `recv` reach the `Notified` future. One of them
129///     consumes the permit, and the other sleeps forever.
130///
131/// By adding the `Notified` futures to the list by calling `enable` before
132/// `try_recv`, the `notify_one` calls in step three would remove the
133/// futures from the list and mark them notified instead of adding a permit
134/// to the `Notify`. This ensures that both futures are woken.
135///
136/// Notice that this failure can only happen if there are two concurrent calls
137/// to `recv`. This is why the mpsc example above does not require a call to
138/// `enable`.
139///
140/// ```
141/// use tokio::sync::Notify;
142///
143/// use std::collections::VecDeque;
144/// use std::sync::Mutex;
145///
146/// struct Channel<T> {
147///     messages: Mutex<VecDeque<T>>,
148///     notify_on_sent: Notify,
149/// }
150///
151/// impl<T> Channel<T> {
152///     pub fn send(&self, msg: T) {
153///         let mut locked_queue = self.messages.lock().unwrap();
154///         locked_queue.push_back(msg);
155///         drop(locked_queue);
156///
157///         // Send a notification to one of the calls currently
158///         // waiting in a call to `recv`.
159///         self.notify_on_sent.notify_one();
160///     }
161///
162///     pub fn try_recv(&self) -> Option<T> {
163///         let mut locked_queue = self.messages.lock().unwrap();
164///         locked_queue.pop_front()
165///     }
166///
167///     pub async fn recv(&self) -> T {
168///         let future = self.notify_on_sent.notified();
169///         tokio::pin!(future);
170///
171///         loop {
172///             // Make sure that no wakeup is lost if we get
173///             // `None` from `try_recv`.
174///             future.as_mut().enable();
175///
176///             if let Some(msg) = self.try_recv() {
177///                 return msg;
178///             }
179///
180///             // Wait for a call to `notify_one`.
181///             //
182///             // This uses `.as_mut()` to avoid consuming the future,
183///             // which lets us call `Pin::set` below.
184///             future.as_mut().await;
185///
186///             // Reset the future in case another call to
187///             // `try_recv` got the message before us.
188///             future.set(self.notify_on_sent.notified());
189///         }
190///     }
191/// }
192/// ```
193///
194/// [park]: std::thread::park
195/// [unpark]: std::thread::Thread::unpark
196/// [`notified().await`]: Notify::notified()
197/// [`notify_one()`]: Notify::notify_one()
198/// [`enable`]: Notified::enable()
199/// [`Semaphore`]: crate::sync::Semaphore
200#[derive(Debug)]
201pub struct Notify {
202    // `state` uses 2 bits to store one of `EMPTY`,
203    // `WAITING` or `NOTIFIED`. The rest of the bits
204    // are used to store the number of times `notify_waiters`
205    // was called.
206    //
207    // Throughout the code there are two assumptions:
208    // - state can be transitioned *from* `WAITING` only if
209    //   `waiters` lock is held
210    // - number of times `notify_waiters` was called can
211    //   be modified only if `waiters` lock is held
212    state: AtomicUsize,
213    waiters: Mutex<WaitList>,
214}
215
216#[derive(Debug)]
217struct Waiter {
218    /// Intrusive linked-list pointers.
219    pointers: linked_list::Pointers<Waiter>,
220
221    /// Waiting task's waker. Depending on the value of `notification`,
222    /// this field is either protected by the `waiters` lock in
223    /// `Notify`, or it is exclusively owned by the enclosing `Waiter`.
224    waker: UnsafeCell<Option<Waker>>,
225
226    /// Notification for this waiter. Uses 2 bits to store if and how was
227    /// notified, 1 bit for storing if it was woken up using FIFO or LIFO, and
228    /// the rest of it is unused.
229    /// * if it's `None`, then `waker` is protected by the `waiters` lock.
230    /// * if it's `Some`, then `waker` is exclusively owned by the
231    ///   enclosing `Waiter` and can be accessed without locking.
232    notification: AtomicNotification,
233
234    /// Should not be `Unpin`.
235    _p: PhantomPinned,
236}
237
238impl Waiter {
239    fn new() -> Waiter {
240        Waiter {
241            pointers: linked_list::Pointers::new(),
242            waker: UnsafeCell::new(None),
243            notification: AtomicNotification::none(),
244            _p: PhantomPinned,
245        }
246    }
247}
248
249generate_addr_of_methods! {
250    impl<> Waiter {
251        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
252            &self.pointers
253        }
254    }
255}
256
257// No notification.
258const NOTIFICATION_NONE: usize = 0b000;
259
260// Notification type used by `notify_one`.
261const NOTIFICATION_ONE: usize = 0b001;
262
263// Notification type used by `notify_last`.
264const NOTIFICATION_LAST: usize = 0b101;
265
266// Notification type used by `notify_waiters`.
267const NOTIFICATION_ALL: usize = 0b010;
268
269/// Notification for a `Waiter`.
270/// This struct is equivalent to `Option<Notification>`, but uses
271/// `AtomicUsize` inside for atomic operations.
272#[derive(Debug)]
273struct AtomicNotification(AtomicUsize);
274
275impl AtomicNotification {
276    fn none() -> Self {
277        AtomicNotification(AtomicUsize::new(NOTIFICATION_NONE))
278    }
279
280    /// Store-release a notification.
281    /// This method should be called exactly once.
282    fn store_release(&self, notification: Notification) {
283        let data: usize = match notification {
284            Notification::All => NOTIFICATION_ALL,
285            Notification::One(NotifyOneStrategy::Fifo) => NOTIFICATION_ONE,
286            Notification::One(NotifyOneStrategy::Lifo) => NOTIFICATION_LAST,
287        };
288        self.0.store(data, Release);
289    }
290
291    fn load(&self, ordering: Ordering) -> Option<Notification> {
292        let data = self.0.load(ordering);
293        match data {
294            NOTIFICATION_NONE => None,
295            NOTIFICATION_ONE => Some(Notification::One(NotifyOneStrategy::Fifo)),
296            NOTIFICATION_LAST => Some(Notification::One(NotifyOneStrategy::Lifo)),
297            NOTIFICATION_ALL => Some(Notification::All),
298            _ => unreachable!(),
299        }
300    }
301
302    /// Clears the notification.
303    /// This method is used by a `Notified` future to consume the
304    /// notification. It uses relaxed ordering and should be only
305    /// used once the atomic notification is no longer shared.
306    fn clear(&self) {
307        self.0.store(NOTIFICATION_NONE, Relaxed);
308    }
309}
310
311#[derive(Debug, PartialEq, Eq)]
312#[repr(usize)]
313enum NotifyOneStrategy {
314    Fifo,
315    Lifo,
316}
317
318#[derive(Debug, PartialEq, Eq)]
319#[repr(usize)]
320enum Notification {
321    One(NotifyOneStrategy),
322    All,
323}
324
325/// List used in `Notify::notify_waiters`. It wraps a guarded linked list
326/// and gates the access to it on `notify.waiters` mutex. It also empties
327/// the list on drop.
328struct NotifyWaitersList<'a> {
329    list: GuardedWaitList,
330    is_empty: bool,
331    notify: &'a Notify,
332}
333
334impl<'a> NotifyWaitersList<'a> {
335    fn new(
336        unguarded_list: WaitList,
337        guard: Pin<&'a Waiter>,
338        notify: &'a Notify,
339    ) -> NotifyWaitersList<'a> {
340        let guard_ptr = NonNull::from(guard.get_ref());
341        let list = unguarded_list.into_guarded(guard_ptr);
342        NotifyWaitersList {
343            list,
344            is_empty: false,
345            notify,
346        }
347    }
348
349    /// Removes the last element from the guarded list. Modifying this list
350    /// requires an exclusive access to the main list in `Notify`.
351    fn pop_back_locked(&mut self, _waiters: &mut WaitList) -> Option<NonNull<Waiter>> {
352        let result = self.list.pop_back();
353        if result.is_none() {
354            // Save information about emptiness to avoid waiting for lock
355            // in the destructor.
356            self.is_empty = true;
357        }
358        result
359    }
360}
361
362impl Drop for NotifyWaitersList<'_> {
363    fn drop(&mut self) {
364        // If the list is not empty, we unlink all waiters from it.
365        // We do not wake the waiters to avoid double panics.
366        if !self.is_empty {
367            let _lock_guard = self.notify.waiters.lock();
368            while let Some(waiter) = self.list.pop_back() {
369                // Safety: we never make mutable references to waiters.
370                let waiter = unsafe { waiter.as_ref() };
371                waiter.notification.store_release(Notification::All);
372            }
373        }
374    }
375}
376
377/// Future returned from [`Notify::notified()`].
378///
379/// This future is fused, so once it has completed, any future calls to poll
380/// will immediately return `Poll::Ready`.
381#[derive(Debug)]
382#[must_use = "futures do nothing unless you `.await` or poll them"]
383pub struct Notified<'a> {
384    /// The `Notify` being received on.
385    notify: &'a Notify,
386
387    /// The current state of the receiving process.
388    state: State,
389
390    /// Number of calls to `notify_waiters` at the time of creation.
391    notify_waiters_calls: usize,
392
393    /// Entry in the waiter `LinkedList`.
394    waiter: Waiter,
395}
396
397unsafe impl<'a> Send for Notified<'a> {}
398unsafe impl<'a> Sync for Notified<'a> {}
399
400#[derive(Debug)]
401enum State {
402    Init,
403    Waiting,
404    Done,
405}
406
407const NOTIFY_WAITERS_SHIFT: usize = 2;
408const STATE_MASK: usize = (1 << NOTIFY_WAITERS_SHIFT) - 1;
409const NOTIFY_WAITERS_CALLS_MASK: usize = !STATE_MASK;
410
411/// Initial "idle" state.
412const EMPTY: usize = 0;
413
414/// One or more threads are currently waiting to be notified.
415const WAITING: usize = 1;
416
417/// Pending notification.
418const NOTIFIED: usize = 2;
419
420fn set_state(data: usize, state: usize) -> usize {
421    (data & NOTIFY_WAITERS_CALLS_MASK) | (state & STATE_MASK)
422}
423
424fn get_state(data: usize) -> usize {
425    data & STATE_MASK
426}
427
428fn get_num_notify_waiters_calls(data: usize) -> usize {
429    (data & NOTIFY_WAITERS_CALLS_MASK) >> NOTIFY_WAITERS_SHIFT
430}
431
432fn inc_num_notify_waiters_calls(data: usize) -> usize {
433    data + (1 << NOTIFY_WAITERS_SHIFT)
434}
435
436fn atomic_inc_num_notify_waiters_calls(data: &AtomicUsize) {
437    data.fetch_add(1 << NOTIFY_WAITERS_SHIFT, SeqCst);
438}
439
440impl Notify {
441    /// Create a new `Notify`, initialized without a permit.
442    ///
443    /// # Examples
444    ///
445    /// ```
446    /// use tokio::sync::Notify;
447    ///
448    /// let notify = Notify::new();
449    /// ```
450    pub fn new() -> Notify {
451        Notify {
452            state: AtomicUsize::new(0),
453            waiters: Mutex::new(LinkedList::new()),
454        }
455    }
456
457    /// Create a new `Notify`, initialized without a permit.
458    ///
459    /// When using the `tracing` [unstable feature], a `Notify` created with
460    /// `const_new` will not be instrumented. As such, it will not be visible
461    /// in [`tokio-console`]. Instead, [`Notify::new`] should be used to create
462    /// an instrumented object if that is needed.
463    ///
464    /// # Examples
465    ///
466    /// ```
467    /// use tokio::sync::Notify;
468    ///
469    /// static NOTIFY: Notify = Notify::const_new();
470    /// ```
471    ///
472    /// [`tokio-console`]: https://github.com/tokio-rs/console
473    /// [unstable feature]: crate#unstable-features
474    #[cfg(not(all(loom, test)))]
475    pub const fn const_new() -> Notify {
476        Notify {
477            state: AtomicUsize::new(0),
478            waiters: Mutex::const_new(LinkedList::new()),
479        }
480    }
481
482    /// Wait for a notification.
483    ///
484    /// Equivalent to:
485    ///
486    /// ```ignore
487    /// async fn notified(&self);
488    /// ```
489    ///
490    /// Each `Notify` value holds a single permit. If a permit is available from
491    /// an earlier call to [`notify_one()`], then `notified().await` will complete
492    /// immediately, consuming that permit. Otherwise, `notified().await` waits
493    /// for a permit to be made available by the next call to `notify_one()`.
494    ///
495    /// The `Notified` future is not guaranteed to receive wakeups from calls to
496    /// `notify_one()` if it has not yet been polled. See the documentation for
497    /// [`Notified::enable()`] for more details.
498    ///
499    /// The `Notified` future is guaranteed to receive wakeups from
500    /// `notify_waiters()` as soon as it has been created, even if it has not
501    /// yet been polled.
502    ///
503    /// [`notify_one()`]: Notify::notify_one
504    /// [`Notified::enable()`]: Notified::enable
505    ///
506    /// # Cancel safety
507    ///
508    /// This method uses a queue to fairly distribute notifications in the order
509    /// they were requested. Cancelling a call to `notified` makes you lose your
510    /// place in the queue.
511    ///
512    /// # Examples
513    ///
514    /// ```
515    /// use tokio::sync::Notify;
516    /// use std::sync::Arc;
517    ///
518    /// #[tokio::main]
519    /// async fn main() {
520    ///     let notify = Arc::new(Notify::new());
521    ///     let notify2 = notify.clone();
522    ///
523    ///     tokio::spawn(async move {
524    ///         notify2.notified().await;
525    ///         println!("received notification");
526    ///     });
527    ///
528    ///     println!("sending notification");
529    ///     notify.notify_one();
530    /// }
531    /// ```
532    pub fn notified(&self) -> Notified<'_> {
533        // we load the number of times notify_waiters
534        // was called and store that in the future.
535        let state = self.state.load(SeqCst);
536        Notified {
537            notify: self,
538            state: State::Init,
539            notify_waiters_calls: get_num_notify_waiters_calls(state),
540            waiter: Waiter::new(),
541        }
542    }
543
544    /// Notifies the first waiting task.
545    ///
546    /// If a task is currently waiting, that task is notified. Otherwise, a
547    /// permit is stored in this `Notify` value and the **next** call to
548    /// [`notified().await`] will complete immediately consuming the permit made
549    /// available by this call to `notify_one()`.
550    ///
551    /// At most one permit may be stored by `Notify`. Many sequential calls to
552    /// `notify_one` will result in a single permit being stored. The next call to
553    /// `notified().await` will complete immediately, but the one after that
554    /// will wait.
555    ///
556    /// [`notified().await`]: Notify::notified()
557    ///
558    /// # Examples
559    ///
560    /// ```
561    /// use tokio::sync::Notify;
562    /// use std::sync::Arc;
563    ///
564    /// #[tokio::main]
565    /// async fn main() {
566    ///     let notify = Arc::new(Notify::new());
567    ///     let notify2 = notify.clone();
568    ///
569    ///     tokio::spawn(async move {
570    ///         notify2.notified().await;
571    ///         println!("received notification");
572    ///     });
573    ///
574    ///     println!("sending notification");
575    ///     notify.notify_one();
576    /// }
577    /// ```
578    // Alias for old name in 0.x
579    #[cfg_attr(docsrs, doc(alias = "notify"))]
580    pub fn notify_one(&self) {
581        self.notify_with_strategy(NotifyOneStrategy::Fifo);
582    }
583
584    /// Notifies the last waiting task.
585    ///
586    /// This function behaves similar to `notify_one`. The only difference is that it wakes
587    /// the most recently added waiter instead of the oldest waiter.
588    ///
589    /// Check the [`notify_one()`] documentation for more info and
590    /// examples.
591    ///
592    /// [`notify_one()`]: Notify::notify_one
593    pub fn notify_last(&self) {
594        self.notify_with_strategy(NotifyOneStrategy::Lifo);
595    }
596
597    fn notify_with_strategy(&self, strategy: NotifyOneStrategy) {
598        // Load the current state
599        let mut curr = self.state.load(SeqCst);
600
601        // If the state is `EMPTY`, transition to `NOTIFIED` and return.
602        while let EMPTY | NOTIFIED = get_state(curr) {
603            // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A
604            // happens-before synchronization must happen between this atomic
605            // operation and a task calling `notified().await`.
606            let new = set_state(curr, NOTIFIED);
607            let res = self.state.compare_exchange(curr, new, SeqCst, SeqCst);
608
609            match res {
610                // No waiters, no further work to do
611                Ok(_) => return,
612                Err(actual) => {
613                    curr = actual;
614                }
615            }
616        }
617
618        // There are waiters, the lock must be acquired to notify.
619        let mut waiters = self.waiters.lock();
620
621        // The state must be reloaded while the lock is held. The state may only
622        // transition out of WAITING while the lock is held.
623        curr = self.state.load(SeqCst);
624
625        if let Some(waker) = notify_locked(&mut waiters, &self.state, curr, strategy) {
626            drop(waiters);
627            waker.wake();
628        }
629    }
630
631    /// Notifies all waiting tasks.
632    ///
633    /// If a task is currently waiting, that task is notified. Unlike with
634    /// `notify_one()`, no permit is stored to be used by the next call to
635    /// `notified().await`. The purpose of this method is to notify all
636    /// already registered waiters. Registering for notification is done by
637    /// acquiring an instance of the `Notified` future via calling `notified()`.
638    ///
639    /// # Examples
640    ///
641    /// ```
642    /// use tokio::sync::Notify;
643    /// use std::sync::Arc;
644    ///
645    /// #[tokio::main]
646    /// async fn main() {
647    ///     let notify = Arc::new(Notify::new());
648    ///     let notify2 = notify.clone();
649    ///
650    ///     let notified1 = notify.notified();
651    ///     let notified2 = notify.notified();
652    ///
653    ///     let handle = tokio::spawn(async move {
654    ///         println!("sending notifications");
655    ///         notify2.notify_waiters();
656    ///     });
657    ///
658    ///     notified1.await;
659    ///     notified2.await;
660    ///     println!("received notifications");
661    /// }
662    /// ```
663    pub fn notify_waiters(&self) {
664        let mut waiters = self.waiters.lock();
665
666        // The state must be loaded while the lock is held. The state may only
667        // transition out of WAITING while the lock is held.
668        let curr = self.state.load(SeqCst);
669
670        if matches!(get_state(curr), EMPTY | NOTIFIED) {
671            // There are no waiting tasks. All we need to do is increment the
672            // number of times this method was called.
673            atomic_inc_num_notify_waiters_calls(&self.state);
674            return;
675        }
676
677        // Increment the number of times this method was called
678        // and transition to empty.
679        let new_state = set_state(inc_num_notify_waiters_calls(curr), EMPTY);
680        self.state.store(new_state, SeqCst);
681
682        // It is critical for `GuardedLinkedList` safety that the guard node is
683        // pinned in memory and is not dropped until the guarded list is dropped.
684        let guard = Waiter::new();
685        pin!(guard);
686
687        // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
688        // underneath to allow every waiter to safely remove itself from it.
689        //
690        // * This list will be still guarded by the `waiters` lock.
691        //   `NotifyWaitersList` wrapper makes sure we hold the lock to modify it.
692        // * This wrapper will empty the list on drop. It is critical for safety
693        //   that we will not leave any list entry with a pointer to the local
694        //   guard node after this function returns / panics.
695        let mut list = NotifyWaitersList::new(std::mem::take(&mut *waiters), guard.as_ref(), self);
696
697        let mut wakers = WakeList::new();
698        'outer: loop {
699            while wakers.can_push() {
700                match list.pop_back_locked(&mut waiters) {
701                    Some(waiter) => {
702                        // Safety: we never make mutable references to waiters.
703                        let waiter = unsafe { waiter.as_ref() };
704
705                        // Safety: we hold the lock, so we can access the waker.
706                        if let Some(waker) =
707                            unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }
708                        {
709                            wakers.push(waker);
710                        }
711
712                        // This waiter is unlinked and will not be shared ever again, release it.
713                        waiter.notification.store_release(Notification::All);
714                    }
715                    None => {
716                        break 'outer;
717                    }
718                }
719            }
720
721            // Release the lock before notifying.
722            drop(waiters);
723
724            // One of the wakers may panic, but the remaining waiters will still
725            // be unlinked from the list in `NotifyWaitersList` destructor.
726            wakers.wake_all();
727
728            // Acquire the lock again.
729            waiters = self.waiters.lock();
730        }
731
732        // Release the lock before notifying
733        drop(waiters);
734
735        wakers.wake_all();
736    }
737}
738
739impl Default for Notify {
740    fn default() -> Notify {
741        Notify::new()
742    }
743}
744
745impl UnwindSafe for Notify {}
746impl RefUnwindSafe for Notify {}
747
748fn notify_locked(
749    waiters: &mut WaitList,
750    state: &AtomicUsize,
751    curr: usize,
752    strategy: NotifyOneStrategy,
753) -> Option<Waker> {
754    match get_state(curr) {
755        EMPTY | NOTIFIED => {
756            let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst);
757
758            match res {
759                Ok(_) => None,
760                Err(actual) => {
761                    let actual_state = get_state(actual);
762                    assert!(actual_state == EMPTY || actual_state == NOTIFIED);
763                    state.store(set_state(actual, NOTIFIED), SeqCst);
764                    None
765                }
766            }
767        }
768        WAITING => {
769            // At this point, it is guaranteed that the state will not
770            // concurrently change as holding the lock is required to
771            // transition **out** of `WAITING`.
772            //
773            // Get a pending waiter using one of the available dequeue strategies.
774            let waiter = match strategy {
775                NotifyOneStrategy::Fifo => waiters.pop_back().unwrap(),
776                NotifyOneStrategy::Lifo => waiters.pop_front().unwrap(),
777            };
778
779            // Safety: we never make mutable references to waiters.
780            let waiter = unsafe { waiter.as_ref() };
781
782            // Safety: we hold the lock, so we can access the waker.
783            let waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
784
785            // This waiter is unlinked and will not be shared ever again, release it.
786            waiter
787                .notification
788                .store_release(Notification::One(strategy));
789
790            if waiters.is_empty() {
791                // As this the **final** waiter in the list, the state
792                // must be transitioned to `EMPTY`. As transitioning
793                // **from** `WAITING` requires the lock to be held, a
794                // `store` is sufficient.
795                state.store(set_state(curr, EMPTY), SeqCst);
796            }
797            waker
798        }
799        _ => unreachable!(),
800    }
801}
802
803// ===== impl Notified =====
804
805impl Notified<'_> {
806    /// Adds this future to the list of futures that are ready to receive
807    /// wakeups from calls to [`notify_one`].
808    ///
809    /// Polling the future also adds it to the list, so this method should only
810    /// be used if you want to add the future to the list before the first call
811    /// to `poll`. (In fact, this method is equivalent to calling `poll` except
812    /// that no `Waker` is registered.)
813    ///
814    /// This has no effect on notifications sent using [`notify_waiters`], which
815    /// are received as long as they happen after the creation of the `Notified`
816    /// regardless of whether `enable` or `poll` has been called.
817    ///
818    /// This method returns true if the `Notified` is ready. This happens in the
819    /// following situations:
820    ///
821    ///  1. The `notify_waiters` method was called between the creation of the
822    ///     `Notified` and the call to this method.
823    ///  2. This is the first call to `enable` or `poll` on this future, and the
824    ///     `Notify` was holding a permit from a previous call to `notify_one`.
825    ///     The call consumes the permit in that case.
826    ///  3. The future has previously been enabled or polled, and it has since
827    ///     then been marked ready by either consuming a permit from the
828    ///     `Notify`, or by a call to `notify_one` or `notify_waiters` that
829    ///     removed it from the list of futures ready to receive wakeups.
830    ///
831    /// If this method returns true, any future calls to poll on the same future
832    /// will immediately return `Poll::Ready`.
833    ///
834    /// # Examples
835    ///
836    /// Unbound multi-producer multi-consumer (mpmc) channel.
837    ///
838    /// The call to `enable` is important because otherwise if you have two
839    /// calls to `recv` and two calls to `send` in parallel, the following could
840    /// happen:
841    ///
842    ///  1. Both calls to `try_recv` return `None`.
843    ///  2. Both new elements are added to the vector.
844    ///  3. The `notify_one` method is called twice, adding only a single
845    ///     permit to the `Notify`.
846    ///  4. Both calls to `recv` reach the `Notified` future. One of them
847    ///     consumes the permit, and the other sleeps forever.
848    ///
849    /// By adding the `Notified` futures to the list by calling `enable` before
850    /// `try_recv`, the `notify_one` calls in step three would remove the
851    /// futures from the list and mark them notified instead of adding a permit
852    /// to the `Notify`. This ensures that both futures are woken.
853    ///
854    /// ```
855    /// use tokio::sync::Notify;
856    ///
857    /// use std::collections::VecDeque;
858    /// use std::sync::Mutex;
859    ///
860    /// struct Channel<T> {
861    ///     messages: Mutex<VecDeque<T>>,
862    ///     notify_on_sent: Notify,
863    /// }
864    ///
865    /// impl<T> Channel<T> {
866    ///     pub fn send(&self, msg: T) {
867    ///         let mut locked_queue = self.messages.lock().unwrap();
868    ///         locked_queue.push_back(msg);
869    ///         drop(locked_queue);
870    ///
871    ///         // Send a notification to one of the calls currently
872    ///         // waiting in a call to `recv`.
873    ///         self.notify_on_sent.notify_one();
874    ///     }
875    ///
876    ///     pub fn try_recv(&self) -> Option<T> {
877    ///         let mut locked_queue = self.messages.lock().unwrap();
878    ///         locked_queue.pop_front()
879    ///     }
880    ///
881    ///     pub async fn recv(&self) -> T {
882    ///         let future = self.notify_on_sent.notified();
883    ///         tokio::pin!(future);
884    ///
885    ///         loop {
886    ///             // Make sure that no wakeup is lost if we get
887    ///             // `None` from `try_recv`.
888    ///             future.as_mut().enable();
889    ///
890    ///             if let Some(msg) = self.try_recv() {
891    ///                 return msg;
892    ///             }
893    ///
894    ///             // Wait for a call to `notify_one`.
895    ///             //
896    ///             // This uses `.as_mut()` to avoid consuming the future,
897    ///             // which lets us call `Pin::set` below.
898    ///             future.as_mut().await;
899    ///
900    ///             // Reset the future in case another call to
901    ///             // `try_recv` got the message before us.
902    ///             future.set(self.notify_on_sent.notified());
903    ///         }
904    ///     }
905    /// }
906    /// ```
907    ///
908    /// [`notify_one`]: Notify::notify_one()
909    /// [`notify_waiters`]: Notify::notify_waiters()
910    pub fn enable(self: Pin<&mut Self>) -> bool {
911        self.poll_notified(None).is_ready()
912    }
913
914    /// A custom `project` implementation is used in place of `pin-project-lite`
915    /// as a custom drop implementation is needed.
916    fn project(self: Pin<&mut Self>) -> (&Notify, &mut State, &usize, &Waiter) {
917        unsafe {
918            // Safety: `notify`, `state` and `notify_waiters_calls` are `Unpin`.
919
920            is_unpin::<&Notify>();
921            is_unpin::<State>();
922            is_unpin::<usize>();
923
924            let me = self.get_unchecked_mut();
925            (
926                me.notify,
927                &mut me.state,
928                &me.notify_waiters_calls,
929                &me.waiter,
930            )
931        }
932    }
933
934    fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> {
935        let (notify, state, notify_waiters_calls, waiter) = self.project();
936
937        'outer_loop: loop {
938            match *state {
939                State::Init => {
940                    let curr = notify.state.load(SeqCst);
941
942                    // Optimistically try acquiring a pending notification
943                    let res = notify.state.compare_exchange(
944                        set_state(curr, NOTIFIED),
945                        set_state(curr, EMPTY),
946                        SeqCst,
947                        SeqCst,
948                    );
949
950                    if res.is_ok() {
951                        // Acquired the notification
952                        *state = State::Done;
953                        continue 'outer_loop;
954                    }
955
956                    // Clone the waker before locking, a waker clone can be
957                    // triggering arbitrary code.
958                    let waker = waker.cloned();
959
960                    // Acquire the lock and attempt to transition to the waiting
961                    // state.
962                    let mut waiters = notify.waiters.lock();
963
964                    // Reload the state with the lock held
965                    let mut curr = notify.state.load(SeqCst);
966
967                    // if notify_waiters has been called after the future
968                    // was created, then we are done
969                    if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
970                        *state = State::Done;
971                        continue 'outer_loop;
972                    }
973
974                    // Transition the state to WAITING.
975                    loop {
976                        match get_state(curr) {
977                            EMPTY => {
978                                // Transition to WAITING
979                                let res = notify.state.compare_exchange(
980                                    set_state(curr, EMPTY),
981                                    set_state(curr, WAITING),
982                                    SeqCst,
983                                    SeqCst,
984                                );
985
986                                if let Err(actual) = res {
987                                    assert_eq!(get_state(actual), NOTIFIED);
988                                    curr = actual;
989                                } else {
990                                    break;
991                                }
992                            }
993                            WAITING => break,
994                            NOTIFIED => {
995                                // Try consuming the notification
996                                let res = notify.state.compare_exchange(
997                                    set_state(curr, NOTIFIED),
998                                    set_state(curr, EMPTY),
999                                    SeqCst,
1000                                    SeqCst,
1001                                );
1002
1003                                match res {
1004                                    Ok(_) => {
1005                                        // Acquired the notification
1006                                        *state = State::Done;
1007                                        continue 'outer_loop;
1008                                    }
1009                                    Err(actual) => {
1010                                        assert_eq!(get_state(actual), EMPTY);
1011                                        curr = actual;
1012                                    }
1013                                }
1014                            }
1015                            _ => unreachable!(),
1016                        }
1017                    }
1018
1019                    let mut old_waker = None;
1020                    if waker.is_some() {
1021                        // Safety: called while locked.
1022                        //
1023                        // The use of `old_waiter` here is not necessary, as the field is always
1024                        // None when we reach this line.
1025                        unsafe {
1026                            old_waker =
1027                                waiter.waker.with_mut(|v| std::mem::replace(&mut *v, waker));
1028                        }
1029                    }
1030
1031                    // Insert the waiter into the linked list
1032                    waiters.push_front(NonNull::from(waiter));
1033
1034                    *state = State::Waiting;
1035
1036                    drop(waiters);
1037                    drop(old_waker);
1038
1039                    return Poll::Pending;
1040                }
1041                State::Waiting => {
1042                    #[cfg(tokio_taskdump)]
1043                    if let Some(waker) = waker {
1044                        let mut ctx = Context::from_waker(waker);
1045                        std::task::ready!(crate::trace::trace_leaf(&mut ctx));
1046                    }
1047
1048                    if waiter.notification.load(Acquire).is_some() {
1049                        // Safety: waiter is already unlinked and will not be shared again,
1050                        // so we have an exclusive access to `waker`.
1051                        drop(unsafe { waiter.waker.with_mut(|waker| (*waker).take()) });
1052
1053                        waiter.notification.clear();
1054                        *state = State::Done;
1055                        return Poll::Ready(());
1056                    }
1057
1058                    // Our waiter was not notified, implying it is still stored in a waiter
1059                    // list (guarded by `notify.waiters`). In order to access the waker
1060                    // fields, we must acquire the lock.
1061
1062                    let mut old_waker = None;
1063                    let mut waiters = notify.waiters.lock();
1064
1065                    // We hold the lock and notifications are set only with the lock held,
1066                    // so this can be relaxed, because the happens-before relationship is
1067                    // established through the mutex.
1068                    if waiter.notification.load(Relaxed).is_some() {
1069                        // Safety: waiter is already unlinked and will not be shared again,
1070                        // so we have an exclusive access to `waker`.
1071                        old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
1072
1073                        waiter.notification.clear();
1074
1075                        // Drop the old waker after releasing the lock.
1076                        drop(waiters);
1077                        drop(old_waker);
1078
1079                        *state = State::Done;
1080                        return Poll::Ready(());
1081                    }
1082
1083                    // Load the state with the lock held.
1084                    let curr = notify.state.load(SeqCst);
1085
1086                    if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
1087                        // Before we add a waiter to the list we check if these numbers are
1088                        // different while holding the lock. If these numbers are different now,
1089                        // it means that there is a call to `notify_waiters` in progress and this
1090                        // waiter must be contained by a guarded list used in `notify_waiters`.
1091                        // We can treat the waiter as notified and remove it from the list, as
1092                        // it would have been notified in the `notify_waiters` call anyways.
1093
1094                        // Safety: we hold the lock, so we can modify the waker.
1095                        old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
1096
1097                        // Safety: we hold the lock, so we have an exclusive access to the list.
1098                        // The list is used in `notify_waiters`, so it must be guarded.
1099                        unsafe { waiters.remove(NonNull::from(waiter)) };
1100
1101                        *state = State::Done;
1102                    } else {
1103                        // Safety: we hold the lock, so we can modify the waker.
1104                        unsafe {
1105                            waiter.waker.with_mut(|v| {
1106                                if let Some(waker) = waker {
1107                                    let should_update = match &*v {
1108                                        Some(current_waker) => !current_waker.will_wake(waker),
1109                                        None => true,
1110                                    };
1111                                    if should_update {
1112                                        old_waker = std::mem::replace(&mut *v, Some(waker.clone()));
1113                                    }
1114                                }
1115                            });
1116                        }
1117
1118                        // Drop the old waker after releasing the lock.
1119                        drop(waiters);
1120                        drop(old_waker);
1121
1122                        return Poll::Pending;
1123                    }
1124
1125                    // Explicit drop of the lock to indicate the scope that the
1126                    // lock is held. Because holding the lock is required to
1127                    // ensure safe access to fields not held within the lock, it
1128                    // is helpful to visualize the scope of the critical
1129                    // section.
1130                    drop(waiters);
1131
1132                    // Drop the old waker after releasing the lock.
1133                    drop(old_waker);
1134                }
1135                State::Done => {
1136                    #[cfg(tokio_taskdump)]
1137                    if let Some(waker) = waker {
1138                        let mut ctx = Context::from_waker(waker);
1139                        std::task::ready!(crate::trace::trace_leaf(&mut ctx));
1140                    }
1141                    return Poll::Ready(());
1142                }
1143            }
1144        }
1145    }
1146}
1147
1148impl Future for Notified<'_> {
1149    type Output = ();
1150
1151    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
1152        self.poll_notified(Some(cx.waker()))
1153    }
1154}
1155
1156impl Drop for Notified<'_> {
1157    fn drop(&mut self) {
1158        // Safety: The type only transitions to a "Waiting" state when pinned.
1159        let (notify, state, _, waiter) = unsafe { Pin::new_unchecked(self).project() };
1160
1161        // This is where we ensure safety. The `Notified` value is being
1162        // dropped, which means we must ensure that the waiter entry is no
1163        // longer stored in the linked list.
1164        if matches!(*state, State::Waiting) {
1165            let mut waiters = notify.waiters.lock();
1166            let mut notify_state = notify.state.load(SeqCst);
1167
1168            // We hold the lock, so this field is not concurrently accessed by
1169            // `notify_*` functions and we can use the relaxed ordering.
1170            let notification = waiter.notification.load(Relaxed);
1171
1172            // remove the entry from the list (if not already removed)
1173            //
1174            // Safety: we hold the lock, so we have an exclusive access to every list the
1175            // waiter may be contained in. If the node is not contained in the `waiters`
1176            // list, then it is contained by a guarded list used by `notify_waiters`.
1177            unsafe { waiters.remove(NonNull::from(waiter)) };
1178
1179            if waiters.is_empty() && get_state(notify_state) == WAITING {
1180                notify_state = set_state(notify_state, EMPTY);
1181                notify.state.store(notify_state, SeqCst);
1182            }
1183
1184            // See if the node was notified but not received. In this case, if
1185            // the notification was triggered via `notify_one`, it must be sent
1186            // to the next waiter.
1187            if let Some(Notification::One(strategy)) = notification {
1188                if let Some(waker) =
1189                    notify_locked(&mut waiters, &notify.state, notify_state, strategy)
1190                {
1191                    drop(waiters);
1192                    waker.wake();
1193                }
1194            }
1195        }
1196    }
1197}
1198
1199/// # Safety
1200///
1201/// `Waiter` is forced to be !Unpin.
1202unsafe impl linked_list::Link for Waiter {
1203    type Handle = NonNull<Waiter>;
1204    type Target = Waiter;
1205
1206    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1207        *handle
1208    }
1209
1210    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1211        ptr
1212    }
1213
1214    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1215        Waiter::addr_of_pointers(target)
1216    }
1217}
1218
1219fn is_unpin<T: Unpin>() {}