h2/proto/streams/
state.rs

1use std::fmt;
2use std::io;
3
4use crate::codec::UserError;
5use crate::frame::{self, Reason, StreamId};
6use crate::proto::{self, Error, Initiator, PollReset};
7
8use self::Inner::*;
9use self::Peer::*;
10
/// Represents the state of an H2 stream
///
/// This is a thin wrapper around the private `Inner` enum, so the concrete
/// state can only be inspected and changed through the methods on `State`.
/// The diagram below shows the transitions between those states.
///
/// ```not_rust
///                              +--------+
///                      send PP |        | recv PP
///                     ,--------|  idle  |--------.
///                    /         |        |         \
///                   v          +--------+          v
///            +----------+          |           +----------+
///            |          |          | send H /  |          |
///     ,------| reserved |          | recv H    | reserved |------.
///     |      | (local)  |          |           | (remote) |      |
///     |      +----------+          v           +----------+      |
///     |          |             +--------+             |          |
///     |          |     recv ES |        | send ES     |          |
///     |   send H |     ,-------|  open  |-------.     | recv H   |
///     |          |    /        |        |        \    |          |
///     |          v   v         +--------+         v   v          |
///     |      +----------+          |           +----------+      |
///     |      |   half   |          |           |   half   |      |
///     |      |  closed  |          | send R /  |  closed  |      |
///     |      | (remote) |          | recv R    | (local)  |      |
///     |      +----------+          |           +----------+      |
///     |           |                |                 |           |
///     |           | send ES /      |       recv ES / |           |
///     |           | send R /       v        send R / |           |
///     |           | recv R     +--------+   recv R   |           |
///     | send R /  `----------->|        |<-----------'  send R / |
///     | recv R                 | closed |               recv R   |
///     `----------------------->|        |<----------------------'
///                              +--------+
///
///        send:   endpoint sends this frame
///        recv:   endpoint receives this frame
///
///        H:  HEADERS frame (with implied CONTINUATIONs)
///        PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
///        ES: END_STREAM flag
///        R:  RST_STREAM frame
/// ```
#[derive(Clone)]
pub struct State {
    /// The current position in the state machine drawn above.
    inner: Inner,
}
55
/// The concrete stream states, one variant per box in the diagram on
/// `State`.
#[derive(Debug, Clone)]
enum Inner {
    /// Starting state; this is what `State::default()` produces.
    Idle,
    // TODO: these states shouldn't count against concurrency limits:
    /// Entered from `Idle` by `State::reserve_local`.
    ReservedLocal,
    /// Entered from `Idle` by `State::reserve_remote`.
    ReservedRemote,
    /// Neither half is closed; `local` and `remote` record how far each side
    /// has progressed (awaiting headers vs. streaming).
    Open { local: Peer, remote: Peer },
    /// The local side has finished sending. The payload is the progress of
    /// the *remote* side (`send_close` stores the `remote` value here).
    HalfClosedLocal(Peer), // TODO: explicitly name this value
    /// The remote side has finished sending. The payload is the progress of
    /// the *local* side (`recv_close` stores the `local` value here).
    HalfClosedRemote(Peer),
    /// Terminal state, together with the reason the stream was closed.
    Closed(Cause),
}
67
/// Progress of one side (local or remote) of a stream.
#[derive(Debug, Copy, Clone, Default)]
enum Peer {
    /// The headers for this side have not been processed yet. For the remote
    /// side, informational (1xx) HEADERS frames leave it in this state.
    #[default]
    AwaitingHeaders,
    /// Headers have been sent (local) or received (remote) for this side.
    Streaming,
}
74
/// Why a stream ended up in `Inner::Closed`.
#[derive(Debug, Clone)]
enum Cause {
    /// Both halves were closed via END_STREAM.
    EndStream,
    /// The stream was closed by an error: a reset, a connection-level error,
    /// or an unexpected EOF on the connection.
    Error(Error),

    /// This indicates to the connection that a reset frame must be sent out
    /// once the send queue has been flushed.
    ///
    /// Examples of when this could happen:
    /// - User drops all references to a stream, so we want to CANCEL it.
    /// - Header block size was too large, so we want to REFUSE, possibly
    ///   after sending a 431 response frame.
    ScheduledLibraryReset(Reason),
}
89
90impl State {
91    /// Opens the send-half of a stream if it is not already open.
92    pub fn send_open(&mut self, eos: bool) -> Result<(), UserError> {
93        let local = Streaming;
94
95        self.inner = match self.inner {
96            Idle => {
97                if eos {
98                    HalfClosedLocal(AwaitingHeaders)
99                } else {
100                    Open {
101                        local,
102                        remote: AwaitingHeaders,
103                    }
104                }
105            }
106            Open {
107                local: AwaitingHeaders,
108                remote,
109            } => {
110                if eos {
111                    HalfClosedLocal(remote)
112                } else {
113                    Open { local, remote }
114                }
115            }
116            HalfClosedRemote(AwaitingHeaders) | ReservedLocal => {
117                if eos {
118                    Closed(Cause::EndStream)
119                } else {
120                    HalfClosedRemote(local)
121                }
122            }
123            _ => {
124                // All other transitions result in a protocol error
125                return Err(UserError::UnexpectedFrameType);
126            }
127        };
128
129        Ok(())
130    }
131
132    /// Opens the receive-half of the stream when a HEADERS frame is received.
133    ///
134    /// Returns true if this transitions the state to Open.
135    pub fn recv_open(&mut self, frame: &frame::Headers) -> Result<bool, Error> {
136        let mut initial = false;
137        let eos = frame.is_end_stream();
138
139        self.inner = match self.inner {
140            Idle => {
141                initial = true;
142
143                if eos {
144                    HalfClosedRemote(AwaitingHeaders)
145                } else {
146                    Open {
147                        local: AwaitingHeaders,
148                        remote: if frame.is_informational() {
149                            tracing::trace!("skipping 1xx response headers");
150                            AwaitingHeaders
151                        } else {
152                            Streaming
153                        },
154                    }
155                }
156            }
157            ReservedRemote => {
158                initial = true;
159
160                if eos {
161                    Closed(Cause::EndStream)
162                } else if frame.is_informational() {
163                    tracing::trace!("skipping 1xx response headers");
164                    ReservedRemote
165                } else {
166                    HalfClosedLocal(Streaming)
167                }
168            }
169            Open {
170                local,
171                remote: AwaitingHeaders,
172            } => {
173                if eos {
174                    HalfClosedRemote(local)
175                } else {
176                    Open {
177                        local,
178                        remote: if frame.is_informational() {
179                            tracing::trace!("skipping 1xx response headers");
180                            AwaitingHeaders
181                        } else {
182                            Streaming
183                        },
184                    }
185                }
186            }
187            HalfClosedLocal(AwaitingHeaders) => {
188                if eos {
189                    Closed(Cause::EndStream)
190                } else if frame.is_informational() {
191                    tracing::trace!("skipping 1xx response headers");
192                    HalfClosedLocal(AwaitingHeaders)
193                } else {
194                    HalfClosedLocal(Streaming)
195                }
196            }
197            ref state => {
198                // All other transitions result in a protocol error
199                proto_err!(conn: "recv_open: in unexpected state {:?}", state);
200                return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
201            }
202        };
203
204        Ok(initial)
205    }
206
207    /// Transition from Idle -> ReservedRemote
208    pub fn reserve_remote(&mut self) -> Result<(), Error> {
209        match self.inner {
210            Idle => {
211                self.inner = ReservedRemote;
212                Ok(())
213            }
214            ref state => {
215                proto_err!(conn: "reserve_remote: in unexpected state {:?}", state);
216                Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
217            }
218        }
219    }
220
221    /// Transition from Idle -> ReservedLocal
222    pub fn reserve_local(&mut self) -> Result<(), UserError> {
223        match self.inner {
224            Idle => {
225                self.inner = ReservedLocal;
226                Ok(())
227            }
228            _ => Err(UserError::UnexpectedFrameType),
229        }
230    }
231
232    /// Indicates that the remote side will not send more data to the local.
233    pub fn recv_close(&mut self) -> Result<(), Error> {
234        match self.inner {
235            Open { local, .. } => {
236                // The remote side will continue to receive data.
237                tracing::trace!("recv_close: Open => HalfClosedRemote({:?})", local);
238                self.inner = HalfClosedRemote(local);
239                Ok(())
240            }
241            HalfClosedLocal(..) => {
242                tracing::trace!("recv_close: HalfClosedLocal => Closed");
243                self.inner = Closed(Cause::EndStream);
244                Ok(())
245            }
246            ref state => {
247                proto_err!(conn: "recv_close: in unexpected state {:?}", state);
248                Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
249            }
250        }
251    }
252
253    /// The remote explicitly sent a RST_STREAM.
254    ///
255    /// # Arguments
256    /// - `frame`: the received RST_STREAM frame.
257    /// - `queued`: true if this stream has frames in the pending send queue.
258    pub fn recv_reset(&mut self, frame: frame::Reset, queued: bool) {
259        match self.inner {
260            // If the stream is already in a `Closed` state, do nothing,
261            // provided that there are no frames still in the send queue.
262            Closed(..) if !queued => {}
263            // A notionally `Closed` stream may still have queued frames in
264            // the following cases:
265            //
266            // - if the cause is `Cause::Scheduled(..)` (i.e. we have not
267            //   actually closed the stream yet).
268            // - if the cause is `Cause::EndStream`: we transition to this
269            //   state when an EOS frame is *enqueued* (so that it's invalid
270            //   to enqueue more frames), not when the EOS frame is *sent*;
271            //   therefore, there may still be frames ahead of the EOS frame
272            //   in the send queue.
273            //
274            // In either of these cases, we want to overwrite the stream's
275            // previous state with the received RST_STREAM, so that the queue
276            // will be cleared by `Prioritize::pop_frame`.
277            ref state => {
278                tracing::trace!(
279                    "recv_reset; frame={:?}; state={:?}; queued={:?}",
280                    frame,
281                    state,
282                    queued
283                );
284                self.inner = Closed(Cause::Error(Error::remote_reset(
285                    frame.stream_id(),
286                    frame.reason(),
287                )));
288            }
289        }
290    }
291
292    /// Handle a connection-level error.
293    pub fn handle_error(&mut self, err: &proto::Error) {
294        match self.inner {
295            Closed(..) => {}
296            _ => {
297                tracing::trace!("handle_error; err={:?}", err);
298                self.inner = Closed(Cause::Error(err.clone()));
299            }
300        }
301    }
302
303    pub fn recv_eof(&mut self) {
304        match self.inner {
305            Closed(..) => {}
306            ref state => {
307                tracing::trace!("recv_eof; state={:?}", state);
308                self.inner = Closed(Cause::Error(
309                    io::Error::new(
310                        io::ErrorKind::BrokenPipe,
311                        "stream closed because of a broken pipe",
312                    )
313                    .into(),
314                ));
315            }
316        }
317    }
318
319    /// Indicates that the local side will not send more data to the local.
320    pub fn send_close(&mut self) {
321        match self.inner {
322            Open { remote, .. } => {
323                // The remote side will continue to receive data.
324                tracing::trace!("send_close: Open => HalfClosedLocal({:?})", remote);
325                self.inner = HalfClosedLocal(remote);
326            }
327            HalfClosedRemote(..) => {
328                tracing::trace!("send_close: HalfClosedRemote => Closed");
329                self.inner = Closed(Cause::EndStream);
330            }
331            ref state => panic!("send_close: unexpected state {:?}", state),
332        }
333    }
334
335    /// Set the stream state to reset locally.
336    pub fn set_reset(&mut self, stream_id: StreamId, reason: Reason, initiator: Initiator) {
337        self.inner = Closed(Cause::Error(Error::Reset(stream_id, reason, initiator)));
338    }
339
340    /// Set the stream state to a scheduled reset.
341    pub fn set_scheduled_reset(&mut self, reason: Reason) {
342        debug_assert!(!self.is_closed());
343        self.inner = Closed(Cause::ScheduledLibraryReset(reason));
344    }
345
346    pub fn get_scheduled_reset(&self) -> Option<Reason> {
347        match self.inner {
348            Closed(Cause::ScheduledLibraryReset(reason)) => Some(reason),
349            _ => None,
350        }
351    }
352
353    pub fn is_scheduled_reset(&self) -> bool {
354        matches!(self.inner, Closed(Cause::ScheduledLibraryReset(..)))
355    }
356
357    pub fn is_local_error(&self) -> bool {
358        match self.inner {
359            Closed(Cause::Error(ref e)) => e.is_local(),
360            Closed(Cause::ScheduledLibraryReset(..)) => true,
361            _ => false,
362        }
363    }
364
365    pub fn is_remote_reset(&self) -> bool {
366        matches!(
367            self.inner,
368            Closed(Cause::Error(Error::Reset(_, _, Initiator::Remote)))
369        )
370    }
371
372    /// Returns true if the stream is already reset.
373    pub fn is_reset(&self) -> bool {
374        match self.inner {
375            Closed(Cause::EndStream) => false,
376            Closed(_) => true,
377            _ => false,
378        }
379    }
380
381    pub fn is_send_streaming(&self) -> bool {
382        matches!(
383            self.inner,
384            Open {
385                local: Streaming,
386                ..
387            } | HalfClosedRemote(Streaming)
388        )
389    }
390
391    /// Returns true when the stream is in a state to receive headers
392    pub fn is_recv_headers(&self) -> bool {
393        matches!(
394            self.inner,
395            Idle | Open {
396                remote: AwaitingHeaders,
397                ..
398            } | HalfClosedLocal(AwaitingHeaders)
399                | ReservedRemote
400        )
401    }
402
403    pub fn is_recv_streaming(&self) -> bool {
404        matches!(
405            self.inner,
406            Open {
407                remote: Streaming,
408                ..
409            } | HalfClosedLocal(Streaming)
410        )
411    }
412
413    pub fn is_recv_end_stream(&self) -> bool {
414        // In either case END_STREAM has been received
415        matches!(self.inner, Closed(Cause::EndStream) | HalfClosedRemote(..))
416    }
417
418    pub fn is_closed(&self) -> bool {
419        matches!(self.inner, Closed(_))
420    }
421
422    pub fn is_send_closed(&self) -> bool {
423        matches!(
424            self.inner,
425            Closed(..) | HalfClosedLocal(..) | ReservedRemote
426        )
427    }
428
429    pub fn is_idle(&self) -> bool {
430        matches!(self.inner, Idle)
431    }
432
433    pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> {
434        // TODO: Is this correct?
435        match self.inner {
436            Closed(Cause::Error(ref e)) => Err(e.clone()),
437            Closed(Cause::ScheduledLibraryReset(reason)) => {
438                Err(proto::Error::library_go_away(reason))
439            }
440            Closed(Cause::EndStream) | HalfClosedRemote(..) | ReservedLocal => Ok(false),
441            _ => Ok(true),
442        }
443    }
444
445    /// Returns a reason if the stream has been reset.
446    pub(super) fn ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error> {
447        match self.inner {
448            Closed(Cause::Error(Error::Reset(_, reason, _)))
449            | Closed(Cause::Error(Error::GoAway(_, reason, _)))
450            | Closed(Cause::ScheduledLibraryReset(reason)) => Ok(Some(reason)),
451            Closed(Cause::Error(ref e)) => Err(e.clone().into()),
452            Open {
453                local: Streaming, ..
454            }
455            | HalfClosedRemote(Streaming) => match mode {
456                PollReset::AwaitingHeaders => Err(UserError::PollResetAfterSendResponse.into()),
457                PollReset::Streaming => Ok(None),
458            },
459            _ => Ok(None),
460        }
461    }
462}
463
464impl Default for State {
465    fn default() -> State {
466        State { inner: Inner::Idle }
467    }
468}
469
470// remove some noise for debug output
471impl fmt::Debug for State {
472    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
473        self.inner.fmt(f)
474    }
475}