hyper/common/
watch.rs

1//! An SPSC broadcast channel.
2//!
3//! - The value can only be a `usize`.
4//! - The consumer is only notified if the value is different.
5//! - The value `0` is reserved for closed.
6
7use futures_util::task::AtomicWaker;
8use std::sync::{
9    atomic::{AtomicUsize, Ordering},
10    Arc,
11};
12use std::task;
13
14type Value = usize;
15
16pub(crate) const CLOSED: usize = 0;
17
18pub(crate) fn channel(initial: Value) -> (Sender, Receiver) {
19    debug_assert!(
20        initial != CLOSED,
21        "watch::channel initial state of 0 is reserved"
22    );
23
24    let shared = Arc::new(Shared {
25        value: AtomicUsize::new(initial),
26        waker: AtomicWaker::new(),
27    });
28
29    (
30        Sender {
31            shared: shared.clone(),
32        },
33        Receiver { shared },
34    )
35}
36
37pub(crate) struct Sender {
38    shared: Arc<Shared>,
39}
40
41pub(crate) struct Receiver {
42    shared: Arc<Shared>,
43}
44
45struct Shared {
46    value: AtomicUsize,
47    waker: AtomicWaker,
48}
49
50impl Sender {
51    pub(crate) fn send(&mut self, value: Value) {
52        if self.shared.value.swap(value, Ordering::SeqCst) != value {
53            self.shared.waker.wake();
54        }
55    }
56}
57
58impl Drop for Sender {
59    fn drop(&mut self) {
60        self.send(CLOSED);
61    }
62}
63
64impl Receiver {
65    pub(crate) fn load(&mut self, cx: &mut task::Context<'_>) -> Value {
66        self.shared.waker.register(cx.waker());
67        self.shared.value.load(Ordering::SeqCst)
68    }
69
70    pub(crate) fn peek(&self) -> Value {
71        self.shared.value.load(Ordering::Relaxed)
72    }
73}