futures_util/future/try_future/
try_flatten.rs

1use core::pin::Pin;
2use futures_core::future::{FusedFuture, Future, TryFuture};
3use futures_core::ready;
4use futures_core::stream::{FusedStream, Stream, TryStream};
5use futures_core::task::{Context, Poll};
6#[cfg(feature = "sink")]
7use futures_sink::Sink;
8use pin_project_lite::pin_project;
9
10pin_project! {
11    #[project = TryFlattenProj]
12    #[derive(Debug)]
13    pub enum TryFlatten<Fut1, Fut2> {
14        First { #[pin] f: Fut1 },
15        Second { #[pin] f: Fut2 },
16        Empty,
17    }
18}
19
20impl<Fut1, Fut2> TryFlatten<Fut1, Fut2> {
21    pub(crate) fn new(future: Fut1) -> Self {
22        Self::First { f: future }
23    }
24}
25
26impl<Fut> FusedFuture for TryFlatten<Fut, Fut::Ok>
27where
28    Fut: TryFuture,
29    Fut::Ok: TryFuture<Error = Fut::Error>,
30{
31    fn is_terminated(&self) -> bool {
32        match self {
33            Self::Empty => true,
34            _ => false,
35        }
36    }
37}
38
39impl<Fut> Future for TryFlatten<Fut, Fut::Ok>
40where
41    Fut: TryFuture,
42    Fut::Ok: TryFuture<Error = Fut::Error>,
43{
44    type Output = Result<<Fut::Ok as TryFuture>::Ok, Fut::Error>;
45
46    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
47        Poll::Ready(loop {
48            match self.as_mut().project() {
49                TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) {
50                    Ok(f) => self.set(Self::Second { f }),
51                    Err(e) => {
52                        self.set(Self::Empty);
53                        break Err(e);
54                    }
55                },
56                TryFlattenProj::Second { f } => {
57                    let output = ready!(f.try_poll(cx));
58                    self.set(Self::Empty);
59                    break output;
60                }
61                TryFlattenProj::Empty => panic!("TryFlatten polled after completion"),
62            }
63        })
64    }
65}
66
67impl<Fut> FusedStream for TryFlatten<Fut, Fut::Ok>
68where
69    Fut: TryFuture,
70    Fut::Ok: TryStream<Error = Fut::Error>,
71{
72    fn is_terminated(&self) -> bool {
73        match self {
74            Self::Empty => true,
75            _ => false,
76        }
77    }
78}
79
80impl<Fut> Stream for TryFlatten<Fut, Fut::Ok>
81where
82    Fut: TryFuture,
83    Fut::Ok: TryStream<Error = Fut::Error>,
84{
85    type Item = Result<<Fut::Ok as TryStream>::Ok, Fut::Error>;
86
87    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
88        Poll::Ready(loop {
89            match self.as_mut().project() {
90                TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) {
91                    Ok(f) => self.set(Self::Second { f }),
92                    Err(e) => {
93                        self.set(Self::Empty);
94                        break Some(Err(e));
95                    }
96                },
97                TryFlattenProj::Second { f } => {
98                    let output = ready!(f.try_poll_next(cx));
99                    if output.is_none() {
100                        self.set(Self::Empty);
101                    }
102                    break output;
103                }
104                TryFlattenProj::Empty => break None,
105            }
106        })
107    }
108}
109
#[cfg(feature = "sink")]
impl<Fut, Item> Sink<Item> for TryFlatten<Fut, Fut::Ok>
where
    Fut: TryFuture,
    Fut::Ok: Sink<Item, Error = Fut::Error>,
{
    type Error = Fut::Error;

    /// Polls the outer future until it yields the sink, then forwards to that
    /// sink's `poll_ready`.
    ///
    /// An error from the outer future is returned and the state becomes
    /// `Empty`.
    ///
    /// # Panics
    ///
    /// Panics if called while in the `Empty` state.
    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        loop {
            // Only the successful `First` arm falls through with a value; the
            // other arms return directly.
            let sink = match self.as_mut().project() {
                TryFlattenProj::First { f: outer } => match ready!(outer.try_poll(cx)) {
                    Ok(sink) => sink,
                    Err(error) => {
                        self.set(Self::Empty);
                        return Poll::Ready(Err(error));
                    }
                },
                TryFlattenProj::Second { f: inner } => return inner.poll_ready(cx),
                TryFlattenProj::Empty => panic!("poll_ready called after eof"),
            };
            self.set(Self::Second { f: sink });
        }
    }

    /// Forwards `item` to the inner sink.
    ///
    /// # Panics
    ///
    /// Panics in the `First` state (the sink is not available yet) and in the
    /// `Empty` state.
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
        match self.project() {
            TryFlattenProj::Second { f: sink } => sink.start_send(item),
            TryFlattenProj::First { .. } => panic!("poll_ready not called first"),
            TryFlattenProj::Empty => panic!("start_send called after eof"),
        }
    }

    /// Flushes the inner sink; in the `First` state this returns
    /// `Poll::Ready(Ok(()))` without polling anything.
    ///
    /// # Panics
    ///
    /// Panics if called while in the `Empty` state.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        match self.project() {
            TryFlattenProj::Second { f: sink } => sink.poll_flush(cx),
            TryFlattenProj::First { .. } => Poll::Ready(Ok(())),
            TryFlattenProj::Empty => panic!("poll_flush called after eof"),
        }
    }

    /// Closes the inner sink when in the `Second` state; in the other states
    /// this completes with `Ok(())`.
    ///
    /// Whenever the result is ready the state is switched to `Empty`.
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // `ready!` returns early on `Pending`, leaving the state untouched.
        let outcome = match self.as_mut().project() {
            TryFlattenProj::Second { f: sink } => ready!(sink.poll_close(cx)),
            TryFlattenProj::First { .. } | TryFlattenProj::Empty => Ok(()),
        };
        self.set(Self::Empty);
        Poll::Ready(outcome)
    }
}