1use atomic::{Atomic, Ordering};
2
3use crate::{stream::PollRecv, Context};
4
5use super::{
6 notifier::Notifier,
7 oneshot_cell::{OneshotCell, TryRecvError},
8};
9
/// Liveness marker for one endpoint (sender or receiver) of a transfer.
///
/// `Debug`, `PartialEq` and `Eq` are derived in addition to `Copy`/`Clone` so
/// that states can be compared directly and printed in diagnostics.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum State {
    /// The endpoint has not been disconnected.
    Alive,
    /// The endpoint has been disconnected.
    Dead,
}
15
16pub struct Transfer<T: Sized> {
17 sender: Atomic<State>,
18 receiver: Atomic<State>,
19 value: OneshotCell<T>,
20 notify_rx: Notifier,
21}
22
23impl<T> Transfer<T> {
24 pub fn new() -> Self {
25 Self {
26 sender: Atomic::new(State::Alive),
27 receiver: Atomic::new(State::Alive),
28 value: OneshotCell::new(),
29 notify_rx: Notifier::new(),
30 }
31 }
32
33 pub fn send(&self, value: T) -> Result<(), T> {
34 if let State::Dead = self.receiver.load(Ordering::Acquire) {
35 return Err(value);
36 }
37
38 self.value.send(value)?;
39 self.notify_rx.notify();
40
41 Ok(())
42 }
43
44 pub fn recv(&self, cx: &Context<'_>) -> PollRecv<T> {
45 loop {
46 let guard = self.notify_rx.guard();
47 match self.value.try_recv() {
48 Ok(value) => return PollRecv::Ready(value),
49 Err(TryRecvError::Pending) => {
50 if let State::Dead = self.sender.load(Ordering::Acquire) {
51 return match self.value.try_recv() {
52 Ok(v) => PollRecv::Ready(v),
53 Err(TryRecvError::Pending) => PollRecv::Closed,
54 Err(TryRecvError::Closed) => PollRecv::Closed,
55 };
56 }
57
58 self.notify_rx.subscribe(cx);
59
60 if guard.is_expired() {
61 continue;
62 }
63
64 return PollRecv::Pending;
65 }
66 Err(TryRecvError::Closed) => return PollRecv::Closed,
67 }
68 }
69 }
70
71 pub fn sender_disconnect(&self) {
72 self.sender.store(State::Dead, Ordering::Release);
73 self.notify_rx.notify();
74 }
75
76 pub fn receiver_disconnect(&self) {
77 self.receiver.store(State::Dead, Ordering::Release);
78 }
79}