// postage/stream/once.rs

1use std::{cell::UnsafeCell, pin::Pin};
2
3use atomic::{Atomic, Ordering};
4
5use crate::stream::{PollRecv, Stream};
6
7use crate::Context;
/// Whether a `OnceStream` still holds its item.
#[derive(Clone, Copy)]
enum State {
    /// The item has not been handed out yet; this is the state a new stream starts in.
    Ready,
    /// The item has already been returned by a poll.
    Taken,
}
13
14pub struct OnceStream<T> {
15    state: Atomic<State>,
16    data: UnsafeCell<Option<T>>,
17}
18
19impl<T> OnceStream<T> {
20    pub fn new(item: T) -> Self {
21        Self {
22            state: Atomic::new(State::Ready),
23            data: UnsafeCell::new(Some(item)),
24        }
25    }
26}
27
28impl<T> Stream for OnceStream<T> {
29    type Item = T;
30
31    fn poll_recv(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> PollRecv<Self::Item> {
32        if self
33            .state
34            .compare_exchange(
35                State::Ready,
36                State::Taken,
37                Ordering::AcqRel,
38                Ordering::Relaxed,
39            )
40            .is_ok()
41        {
42            let value = unsafe {
43                let reference = self.data.get().as_mut().unwrap();
44                reference.take().unwrap()
45            };
46
47            return PollRecv::Ready(value);
48        }
49
50        PollRecv::Closed
51    }
52}
53
#[cfg(test)]
mod tests {
    use std::pin::Pin;

    use crate::{
        stream::{PollRecv, Stream},
        Context,
    };

    /// A once-stream yields its item on the first poll and is closed on the second.
    #[test]
    fn test() {
        let mut stream = crate::stream::once(1usize);
        let mut cx = Context::empty();

        let first = Pin::new(&mut stream).poll_recv(&mut cx);
        assert_eq!(PollRecv::Ready(1), first);

        let second = Pin::new(&mut stream).poll_recv(&mut cx);
        assert_eq!(PollRecv::Closed, second);
    }
}