postage/
sync.rs

1use std::sync::Arc;
2
3use notifier::Notifier;
4use ref_count::RefCount;
5use std::fmt::Debug;
6
7use crate::Context;
8
9use self::{notifier::NotificationGuard, ref_count::TryDecrement};
10
11pub mod mpmc_circular_buffer;
12pub mod notifier;
13mod oneshot_cell;
14mod ref_count;
15// mod rr_lock;
16mod state_cell;
17pub(crate) mod transfer;
18
19pub(crate) fn shared<E>(extension: E) -> (SenderShared<E>, ReceiverShared<E>) {
20    let inner = Arc::new(Shared::new(extension));
21
22    let sender = SenderShared {
23        inner: inner.clone(),
24    };
25
26    let receiver = ReceiverShared { inner };
27
28    (sender, receiver)
29}
30
31#[derive(Debug)]
32pub struct Shared<E> {
33    sender_notify: Notifier,
34    sender_count: RefCount,
35    receiver_notify: Notifier,
36    receiver_count: RefCount,
37    pub(crate) extension: E,
38}
39
40impl<E> Shared<E> {
41    pub fn new(extension: E) -> Self {
42        Self {
43            sender_notify: Notifier::new(),
44            sender_count: RefCount::new(1),
45            receiver_notify: Notifier::new(),
46            receiver_count: RefCount::new(1),
47            extension,
48        }
49    }
50}
51
52pub struct SenderShared<E> {
53    inner: Arc<Shared<E>>,
54}
55
56impl<E> SenderShared<E> {
57    pub fn extension(&self) -> &E {
58        &self.inner.extension
59    }
60
61    pub fn notify_receivers(&self) {
62        self.inner.receiver_notify.notify();
63    }
64
65    pub fn notify_self(&self) {
66        self.inner.sender_notify.notify();
67    }
68
69    pub fn subscribe_recv(&self, cx: &Context<'_>) {
70        self.inner.sender_notify.subscribe(cx);
71    }
72
73    pub fn recv_guard(&self) -> NotificationGuard {
74        self.inner.sender_notify.guard()
75    }
76
77    pub fn is_alive(&self) -> bool {
78        self.inner.receiver_count.is_alive()
79    }
80
81    pub fn clone_receiver(&self) -> ReceiverShared<E> {
82        self.inner.receiver_count.increment();
83
84        ReceiverShared {
85            inner: self.inner.clone(),
86        }
87    }
88
89    pub fn is_closed(&self) -> bool {
90        !self.is_alive()
91    }
92}
93
94impl<E> Debug for SenderShared<E>
95where
96    E: Debug,
97{
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        self.inner.fmt(f)
100    }
101}
102
103impl<E> Clone for SenderShared<E> {
104    fn clone(&self) -> Self {
105        let inner = self.inner.clone();
106        inner.sender_count.increment();
107
108        Self { inner }
109    }
110}
111
112impl<E> Drop for SenderShared<E> {
113    fn drop(&mut self) {
114        match self.inner.sender_count.decrement() {
115            TryDecrement::Alive(_) => {}
116            TryDecrement::Dead => {
117                self.notify_receivers();
118            }
119        }
120    }
121}
122
123pub struct ReceiverShared<E> {
124    pub(crate) inner: Arc<Shared<E>>,
125}
126
127impl<E> ReceiverShared<E> {
128    pub fn extension(&self) -> &E {
129        &self.inner.extension
130    }
131
132    pub fn notify_senders(&self) {
133        self.inner.sender_notify.notify();
134    }
135
136    pub fn subscribe_send(&self, cx: &Context<'_>) {
137        self.inner.receiver_notify.subscribe(cx);
138    }
139
140    pub fn send_guard(&self) -> NotificationGuard {
141        self.inner.receiver_notify.guard()
142    }
143
144    pub fn is_alive(&self) -> bool {
145        self.inner.sender_count.is_alive()
146    }
147
148    pub fn is_closed(&self) -> bool {
149        !self.is_alive()
150    }
151}
152
153impl<E> Clone for ReceiverShared<E> {
154    fn clone(&self) -> Self {
155        let inner = self.inner.clone();
156        inner.receiver_count.increment();
157
158        Self { inner }
159    }
160}
161
162impl<E> Drop for ReceiverShared<E> {
163    fn drop(&mut self) {
164        match self.inner.receiver_count.decrement() {
165            TryDecrement::Alive(_) => {}
166            TryDecrement::Dead => {
167                self.notify_senders();
168            }
169        }
170    }
171}