postage/sync/
notifier.rs

1use atomic::Ordering;
2use crossbeam_queue::SegQueue;
3use std::{sync::atomic::AtomicUsize, task::Waker};
4
5#[derive(Debug)]
6pub struct Notifier {
7    generation: AtomicUsize,
8    wakers: SegQueue<Waker>,
9}
10
11impl Notifier {
12    pub fn new() -> Self {
13        Self {
14            generation: AtomicUsize::new(0),
15            wakers: SegQueue::new(),
16        }
17    }
18
19    pub fn guard(&self) -> NotificationGuard {
20        let generation = self.generation.load(Ordering::Relaxed);
21
22        NotificationGuard {
23            generation,
24            stored_generation: &self.generation,
25        }
26    }
27
28    pub fn notify(&self) {
29        self.generation.fetch_add(1, Ordering::AcqRel);
30
31        #[cfg(feature = "debug")]
32        let mut woken = 0usize;
33
34        while let Some(waker) = self.wakers.pop() {
35            #[cfg(feature = "debug")]
36            {
37                woken += 1;
38            }
39
40            waker.wake();
41        }
42
43        #[cfg(feature = "debug")]
44        if woken > 0 {
45            log::info!("Woke {} tasks", woken);
46        }
47    }
48
49    pub fn subscribe(&self, cx: &crate::Context<'_>) {
50        if let Some(waker) = cx.waker() {
51            self.wakers.push(waker.clone());
52        }
53    }
54}
55
/// A snapshot of a generation counter together with a borrow of the live
/// counter it was taken from.
///
/// Produced by `Notifier::guard`; lets the holder detect whether the counter
/// has changed since the snapshot.
#[derive(Debug)]
pub struct NotificationGuard<'a> {
    /// Counter value captured when the guard was created.
    generation: usize,
    /// The live counter the snapshot is compared against.
    stored_generation: &'a AtomicUsize,
}

impl NotificationGuard<'_> {
    /// Returns `true` when the live counter differs from the captured value.
    ///
    /// The load is `SeqCst` rather than `Relaxed` so that it takes part in
    /// the total order of sequentially consistent operations. A relaxed load
    /// is formally allowed to return a stale value even after the caller has
    /// performed an earlier synchronising operation (such as queueing a
    /// waker), which could hide a concurrent increment.
    pub fn is_expired(&self) -> bool {
        self.stored_generation.load(Ordering::SeqCst) != self.generation
    }
}
65}