postage/sync/
transfer.rs

1use atomic::{Atomic, Ordering};
2
3use crate::{stream::PollRecv, Context};
4
5use super::{
6    notifier::Notifier,
7    oneshot_cell::{OneshotCell, TryRecvError},
8};
9
/// Liveness flag for one endpoint (sender or receiver) of a `Transfer`.
///
/// Stored inside an `Atomic`, so it must stay `Copy`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum State {
    /// The endpoint has not been disconnected.
    Alive,
    /// The endpoint has been disconnected.
    Dead,
}
15
16pub struct Transfer<T: Sized> {
17    sender: Atomic<State>,
18    receiver: Atomic<State>,
19    value: OneshotCell<T>,
20    notify_rx: Notifier,
21}
22
23impl<T> Transfer<T> {
24    pub fn new() -> Self {
25        Self {
26            sender: Atomic::new(State::Alive),
27            receiver: Atomic::new(State::Alive),
28            value: OneshotCell::new(),
29            notify_rx: Notifier::new(),
30        }
31    }
32
33    pub fn send(&self, value: T) -> Result<(), T> {
34        if let State::Dead = self.receiver.load(Ordering::Acquire) {
35            return Err(value);
36        }
37
38        self.value.send(value)?;
39        self.notify_rx.notify();
40
41        Ok(())
42    }
43
44    pub fn recv(&self, cx: &Context<'_>) -> PollRecv<T> {
45        loop {
46            let guard = self.notify_rx.guard();
47            match self.value.try_recv() {
48                Ok(value) => return PollRecv::Ready(value),
49                Err(TryRecvError::Pending) => {
50                    if let State::Dead = self.sender.load(Ordering::Acquire) {
51                        return match self.value.try_recv() {
52                            Ok(v) => PollRecv::Ready(v),
53                            Err(TryRecvError::Pending) => PollRecv::Closed,
54                            Err(TryRecvError::Closed) => PollRecv::Closed,
55                        };
56                    }
57
58                    self.notify_rx.subscribe(cx);
59
60                    if guard.is_expired() {
61                        continue;
62                    }
63
64                    return PollRecv::Pending;
65                }
66                Err(TryRecvError::Closed) => return PollRecv::Closed,
67            }
68        }
69    }
70
71    pub fn sender_disconnect(&self) {
72        self.sender.store(State::Dead, Ordering::Release);
73        self.notify_rx.notify();
74    }
75
76    pub fn receiver_disconnect(&self) {
77        self.receiver.store(State::Dead, Ordering::Release);
78    }
79}