1use std::{cell::UnsafeCell, pin::Pin};
2
3use atomic::{Atomic, Ordering};
4
5use crate::stream::{PollRecv, Stream};
6
7use crate::Context;
8#[derive(Copy, Clone)]
9enum State {
10 Ready,
11 Taken,
12}
13
14pub struct OnceStream<T> {
15 state: Atomic<State>,
16 data: UnsafeCell<Option<T>>,
17}
18
19impl<T> OnceStream<T> {
20 pub fn new(item: T) -> Self {
21 Self {
22 state: Atomic::new(State::Ready),
23 data: UnsafeCell::new(Some(item)),
24 }
25 }
26}
27
28impl<T> Stream for OnceStream<T> {
29 type Item = T;
30
31 fn poll_recv(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> PollRecv<Self::Item> {
32 if self
33 .state
34 .compare_exchange(
35 State::Ready,
36 State::Taken,
37 Ordering::AcqRel,
38 Ordering::Relaxed,
39 )
40 .is_ok()
41 {
42 let value = unsafe {
43 let reference = self.data.get().as_mut().unwrap();
44 reference.take().unwrap()
45 };
46
47 return PollRecv::Ready(value);
48 }
49
50 PollRecv::Closed
51 }
52}
53
54#[cfg(test)]
55mod tests {
56 use std::pin::Pin;
57
58 use crate::{
59 stream::{PollRecv, Stream},
60 Context,
61 };
62
63 #[test]
64 fn test() {
65 let mut repeat = crate::stream::once(1usize);
66 let mut cx = Context::empty();
67
68 assert_eq!(PollRecv::Ready(1), Pin::new(&mut repeat).poll_recv(&mut cx));
69 assert_eq!(PollRecv::Closed, Pin::new(&mut repeat).poll_recv(&mut cx));
70 }
71}