1use std::sync::Arc;
2
3use notifier::Notifier;
4use ref_count::RefCount;
5use std::fmt::Debug;
6
7use crate::Context;
8
9use self::{notifier::NotificationGuard, ref_count::TryDecrement};
10
11pub mod mpmc_circular_buffer;
12pub mod notifier;
13mod oneshot_cell;
14mod ref_count;
15mod state_cell;
17pub(crate) mod transfer;
18
19pub(crate) fn shared<E>(extension: E) -> (SenderShared<E>, ReceiverShared<E>) {
20 let inner = Arc::new(Shared::new(extension));
21
22 let sender = SenderShared {
23 inner: inner.clone(),
24 };
25
26 let receiver = ReceiverShared { inner };
27
28 (sender, receiver)
29}
30
31#[derive(Debug)]
32pub struct Shared<E> {
33 sender_notify: Notifier,
34 sender_count: RefCount,
35 receiver_notify: Notifier,
36 receiver_count: RefCount,
37 pub(crate) extension: E,
38}
39
40impl<E> Shared<E> {
41 pub fn new(extension: E) -> Self {
42 Self {
43 sender_notify: Notifier::new(),
44 sender_count: RefCount::new(1),
45 receiver_notify: Notifier::new(),
46 receiver_count: RefCount::new(1),
47 extension,
48 }
49 }
50}
51
52pub struct SenderShared<E> {
53 inner: Arc<Shared<E>>,
54}
55
56impl<E> SenderShared<E> {
57 pub fn extension(&self) -> &E {
58 &self.inner.extension
59 }
60
61 pub fn notify_receivers(&self) {
62 self.inner.receiver_notify.notify();
63 }
64
65 pub fn notify_self(&self) {
66 self.inner.sender_notify.notify();
67 }
68
69 pub fn subscribe_recv(&self, cx: &Context<'_>) {
70 self.inner.sender_notify.subscribe(cx);
71 }
72
73 pub fn recv_guard(&self) -> NotificationGuard {
74 self.inner.sender_notify.guard()
75 }
76
77 pub fn is_alive(&self) -> bool {
78 self.inner.receiver_count.is_alive()
79 }
80
81 pub fn clone_receiver(&self) -> ReceiverShared<E> {
82 self.inner.receiver_count.increment();
83
84 ReceiverShared {
85 inner: self.inner.clone(),
86 }
87 }
88
89 pub fn is_closed(&self) -> bool {
90 !self.is_alive()
91 }
92}
93
94impl<E> Debug for SenderShared<E>
95where
96 E: Debug,
97{
98 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99 self.inner.fmt(f)
100 }
101}
102
103impl<E> Clone for SenderShared<E> {
104 fn clone(&self) -> Self {
105 let inner = self.inner.clone();
106 inner.sender_count.increment();
107
108 Self { inner }
109 }
110}
111
112impl<E> Drop for SenderShared<E> {
113 fn drop(&mut self) {
114 match self.inner.sender_count.decrement() {
115 TryDecrement::Alive(_) => {}
116 TryDecrement::Dead => {
117 self.notify_receivers();
118 }
119 }
120 }
121}
122
123pub struct ReceiverShared<E> {
124 pub(crate) inner: Arc<Shared<E>>,
125}
126
127impl<E> ReceiverShared<E> {
128 pub fn extension(&self) -> &E {
129 &self.inner.extension
130 }
131
132 pub fn notify_senders(&self) {
133 self.inner.sender_notify.notify();
134 }
135
136 pub fn subscribe_send(&self, cx: &Context<'_>) {
137 self.inner.receiver_notify.subscribe(cx);
138 }
139
140 pub fn send_guard(&self) -> NotificationGuard {
141 self.inner.receiver_notify.guard()
142 }
143
144 pub fn is_alive(&self) -> bool {
145 self.inner.sender_count.is_alive()
146 }
147
148 pub fn is_closed(&self) -> bool {
149 !self.is_alive()
150 }
151}
152
153impl<E> Clone for ReceiverShared<E> {
154 fn clone(&self) -> Self {
155 let inner = self.inner.clone();
156 inner.receiver_count.increment();
157
158 Self { inner }
159 }
160}
161
162impl<E> Drop for ReceiverShared<E> {
163 fn drop(&mut self) {
164 match self.inner.receiver_count.decrement() {
165 TryDecrement::Alive(_) => {}
166 TryDecrement::Dead => {
167 self.notify_senders();
168 }
169 }
170 }
171}