h2/proto/streams/state.rs
1use std::fmt;
2use std::io;
3
4use crate::codec::UserError;
5use crate::frame::{self, Reason, StreamId};
6use crate::proto::{self, Error, Initiator, PollReset};
7
8use self::Inner::*;
9use self::Peer::*;
10
11/// Represents the state of an H2 stream
12///
13/// ```not_rust
14/// +--------+
15/// send PP | | recv PP
16/// ,--------| idle |--------.
17/// / | | \
18/// v +--------+ v
19/// +----------+ | +----------+
20/// | | | send H / | |
21/// ,------| reserved | | recv H | reserved |------.
22/// | | (local) | | | (remote) | |
23/// | +----------+ v +----------+ |
24/// | | +--------+ | |
25/// | | recv ES | | send ES | |
26/// | send H | ,-------| open |-------. | recv H |
27/// | | / | | \ | |
28/// | v v +--------+ v v |
29/// | +----------+ | +----------+ |
30/// | | half | | | half | |
31/// | | closed | | send R / | closed | |
32/// | | (remote) | | recv R | (local) | |
33/// | +----------+ | +----------+ |
34/// | | | | |
35/// | | send ES / | recv ES / | |
36/// | | send R / v send R / | |
37/// | | recv R +--------+ recv R | |
38/// | send R / `----------->| |<-----------' send R / |
39/// | recv R | closed | recv R |
40/// `----------------------->| |<----------------------'
41/// +--------+
42///
43/// send: endpoint sends this frame
44/// recv: endpoint receives this frame
45///
46/// H: HEADERS frame (with implied CONTINUATIONs)
47/// PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
48/// ES: END_STREAM flag
49/// R: RST_STREAM frame
50/// ```
51#[derive(Clone)]
52pub struct State {
53 inner: Inner,
54}
55
56#[derive(Debug, Clone)]
57enum Inner {
58 Idle,
59 // TODO: these states shouldn't count against concurrency limits:
60 ReservedLocal,
61 ReservedRemote,
62 Open { local: Peer, remote: Peer },
63 HalfClosedLocal(Peer), // TODO: explicitly name this value
64 HalfClosedRemote(Peer),
65 Closed(Cause),
66}
67
/// Progress of one direction (local or remote) of a stream.
#[derive(Debug, Copy, Clone, Default)]
enum Peer {
    /// This side has not yet produced its (non-informational) HEADERS frame.
    /// This is the default.
    #[default]
    AwaitingHeaders,
    /// This side has produced its HEADERS frame and has not closed its half.
    Streaming,
}
74
75#[derive(Debug, Clone)]
76enum Cause {
77 EndStream,
78 Error(Error),
79
80 /// This indicates to the connection that a reset frame must be sent out
81 /// once the send queue has been flushed.
82 ///
83 /// Examples of when this could happen:
84 /// - User drops all references to a stream, so we want to CANCEL the it.
85 /// - Header block size was too large, so we want to REFUSE, possibly
86 /// after sending a 431 response frame.
87 ScheduledLibraryReset(Reason),
88}
89
90impl State {
91 /// Opens the send-half of a stream if it is not already open.
92 pub fn send_open(&mut self, eos: bool) -> Result<(), UserError> {
93 let local = Streaming;
94
95 self.inner = match self.inner {
96 Idle => {
97 if eos {
98 HalfClosedLocal(AwaitingHeaders)
99 } else {
100 Open {
101 local,
102 remote: AwaitingHeaders,
103 }
104 }
105 }
106 Open {
107 local: AwaitingHeaders,
108 remote,
109 } => {
110 if eos {
111 HalfClosedLocal(remote)
112 } else {
113 Open { local, remote }
114 }
115 }
116 HalfClosedRemote(AwaitingHeaders) | ReservedLocal => {
117 if eos {
118 Closed(Cause::EndStream)
119 } else {
120 HalfClosedRemote(local)
121 }
122 }
123 _ => {
124 // All other transitions result in a protocol error
125 return Err(UserError::UnexpectedFrameType);
126 }
127 };
128
129 Ok(())
130 }
131
132 /// Opens the receive-half of the stream when a HEADERS frame is received.
133 ///
134 /// Returns true if this transitions the state to Open.
135 pub fn recv_open(&mut self, frame: &frame::Headers) -> Result<bool, Error> {
136 let mut initial = false;
137 let eos = frame.is_end_stream();
138
139 self.inner = match self.inner {
140 Idle => {
141 initial = true;
142
143 if eos {
144 HalfClosedRemote(AwaitingHeaders)
145 } else {
146 Open {
147 local: AwaitingHeaders,
148 remote: if frame.is_informational() {
149 tracing::trace!("skipping 1xx response headers");
150 AwaitingHeaders
151 } else {
152 Streaming
153 },
154 }
155 }
156 }
157 ReservedRemote => {
158 initial = true;
159
160 if eos {
161 Closed(Cause::EndStream)
162 } else if frame.is_informational() {
163 tracing::trace!("skipping 1xx response headers");
164 ReservedRemote
165 } else {
166 HalfClosedLocal(Streaming)
167 }
168 }
169 Open {
170 local,
171 remote: AwaitingHeaders,
172 } => {
173 if eos {
174 HalfClosedRemote(local)
175 } else {
176 Open {
177 local,
178 remote: if frame.is_informational() {
179 tracing::trace!("skipping 1xx response headers");
180 AwaitingHeaders
181 } else {
182 Streaming
183 },
184 }
185 }
186 }
187 HalfClosedLocal(AwaitingHeaders) => {
188 if eos {
189 Closed(Cause::EndStream)
190 } else if frame.is_informational() {
191 tracing::trace!("skipping 1xx response headers");
192 HalfClosedLocal(AwaitingHeaders)
193 } else {
194 HalfClosedLocal(Streaming)
195 }
196 }
197 ref state => {
198 // All other transitions result in a protocol error
199 proto_err!(conn: "recv_open: in unexpected state {:?}", state);
200 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
201 }
202 };
203
204 Ok(initial)
205 }
206
207 /// Transition from Idle -> ReservedRemote
208 pub fn reserve_remote(&mut self) -> Result<(), Error> {
209 match self.inner {
210 Idle => {
211 self.inner = ReservedRemote;
212 Ok(())
213 }
214 ref state => {
215 proto_err!(conn: "reserve_remote: in unexpected state {:?}", state);
216 Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
217 }
218 }
219 }
220
221 /// Transition from Idle -> ReservedLocal
222 pub fn reserve_local(&mut self) -> Result<(), UserError> {
223 match self.inner {
224 Idle => {
225 self.inner = ReservedLocal;
226 Ok(())
227 }
228 _ => Err(UserError::UnexpectedFrameType),
229 }
230 }
231
232 /// Indicates that the remote side will not send more data to the local.
233 pub fn recv_close(&mut self) -> Result<(), Error> {
234 match self.inner {
235 Open { local, .. } => {
236 // The remote side will continue to receive data.
237 tracing::trace!("recv_close: Open => HalfClosedRemote({:?})", local);
238 self.inner = HalfClosedRemote(local);
239 Ok(())
240 }
241 HalfClosedLocal(..) => {
242 tracing::trace!("recv_close: HalfClosedLocal => Closed");
243 self.inner = Closed(Cause::EndStream);
244 Ok(())
245 }
246 ref state => {
247 proto_err!(conn: "recv_close: in unexpected state {:?}", state);
248 Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
249 }
250 }
251 }
252
253 /// The remote explicitly sent a RST_STREAM.
254 ///
255 /// # Arguments
256 /// - `frame`: the received RST_STREAM frame.
257 /// - `queued`: true if this stream has frames in the pending send queue.
258 pub fn recv_reset(&mut self, frame: frame::Reset, queued: bool) {
259 match self.inner {
260 // If the stream is already in a `Closed` state, do nothing,
261 // provided that there are no frames still in the send queue.
262 Closed(..) if !queued => {}
263 // A notionally `Closed` stream may still have queued frames in
264 // the following cases:
265 //
266 // - if the cause is `Cause::Scheduled(..)` (i.e. we have not
267 // actually closed the stream yet).
268 // - if the cause is `Cause::EndStream`: we transition to this
269 // state when an EOS frame is *enqueued* (so that it's invalid
270 // to enqueue more frames), not when the EOS frame is *sent*;
271 // therefore, there may still be frames ahead of the EOS frame
272 // in the send queue.
273 //
274 // In either of these cases, we want to overwrite the stream's
275 // previous state with the received RST_STREAM, so that the queue
276 // will be cleared by `Prioritize::pop_frame`.
277 ref state => {
278 tracing::trace!(
279 "recv_reset; frame={:?}; state={:?}; queued={:?}",
280 frame,
281 state,
282 queued
283 );
284 self.inner = Closed(Cause::Error(Error::remote_reset(
285 frame.stream_id(),
286 frame.reason(),
287 )));
288 }
289 }
290 }
291
292 /// Handle a connection-level error.
293 pub fn handle_error(&mut self, err: &proto::Error) {
294 match self.inner {
295 Closed(..) => {}
296 _ => {
297 tracing::trace!("handle_error; err={:?}", err);
298 self.inner = Closed(Cause::Error(err.clone()));
299 }
300 }
301 }
302
303 pub fn recv_eof(&mut self) {
304 match self.inner {
305 Closed(..) => {}
306 ref state => {
307 tracing::trace!("recv_eof; state={:?}", state);
308 self.inner = Closed(Cause::Error(
309 io::Error::new(
310 io::ErrorKind::BrokenPipe,
311 "stream closed because of a broken pipe",
312 )
313 .into(),
314 ));
315 }
316 }
317 }
318
319 /// Indicates that the local side will not send more data to the local.
320 pub fn send_close(&mut self) {
321 match self.inner {
322 Open { remote, .. } => {
323 // The remote side will continue to receive data.
324 tracing::trace!("send_close: Open => HalfClosedLocal({:?})", remote);
325 self.inner = HalfClosedLocal(remote);
326 }
327 HalfClosedRemote(..) => {
328 tracing::trace!("send_close: HalfClosedRemote => Closed");
329 self.inner = Closed(Cause::EndStream);
330 }
331 ref state => panic!("send_close: unexpected state {:?}", state),
332 }
333 }
334
335 /// Set the stream state to reset locally.
336 pub fn set_reset(&mut self, stream_id: StreamId, reason: Reason, initiator: Initiator) {
337 self.inner = Closed(Cause::Error(Error::Reset(stream_id, reason, initiator)));
338 }
339
340 /// Set the stream state to a scheduled reset.
341 pub fn set_scheduled_reset(&mut self, reason: Reason) {
342 debug_assert!(!self.is_closed());
343 self.inner = Closed(Cause::ScheduledLibraryReset(reason));
344 }
345
346 pub fn get_scheduled_reset(&self) -> Option<Reason> {
347 match self.inner {
348 Closed(Cause::ScheduledLibraryReset(reason)) => Some(reason),
349 _ => None,
350 }
351 }
352
353 pub fn is_scheduled_reset(&self) -> bool {
354 matches!(self.inner, Closed(Cause::ScheduledLibraryReset(..)))
355 }
356
357 pub fn is_local_error(&self) -> bool {
358 match self.inner {
359 Closed(Cause::Error(ref e)) => e.is_local(),
360 Closed(Cause::ScheduledLibraryReset(..)) => true,
361 _ => false,
362 }
363 }
364
365 pub fn is_remote_reset(&self) -> bool {
366 matches!(
367 self.inner,
368 Closed(Cause::Error(Error::Reset(_, _, Initiator::Remote)))
369 )
370 }
371
372 /// Returns true if the stream is already reset.
373 pub fn is_reset(&self) -> bool {
374 match self.inner {
375 Closed(Cause::EndStream) => false,
376 Closed(_) => true,
377 _ => false,
378 }
379 }
380
381 pub fn is_send_streaming(&self) -> bool {
382 matches!(
383 self.inner,
384 Open {
385 local: Streaming,
386 ..
387 } | HalfClosedRemote(Streaming)
388 )
389 }
390
391 /// Returns true when the stream is in a state to receive headers
392 pub fn is_recv_headers(&self) -> bool {
393 matches!(
394 self.inner,
395 Idle | Open {
396 remote: AwaitingHeaders,
397 ..
398 } | HalfClosedLocal(AwaitingHeaders)
399 | ReservedRemote
400 )
401 }
402
403 pub fn is_recv_streaming(&self) -> bool {
404 matches!(
405 self.inner,
406 Open {
407 remote: Streaming,
408 ..
409 } | HalfClosedLocal(Streaming)
410 )
411 }
412
413 pub fn is_recv_end_stream(&self) -> bool {
414 // In either case END_STREAM has been received
415 matches!(self.inner, Closed(Cause::EndStream) | HalfClosedRemote(..))
416 }
417
418 pub fn is_closed(&self) -> bool {
419 matches!(self.inner, Closed(_))
420 }
421
422 pub fn is_send_closed(&self) -> bool {
423 matches!(
424 self.inner,
425 Closed(..) | HalfClosedLocal(..) | ReservedRemote
426 )
427 }
428
429 pub fn is_idle(&self) -> bool {
430 matches!(self.inner, Idle)
431 }
432
433 pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> {
434 // TODO: Is this correct?
435 match self.inner {
436 Closed(Cause::Error(ref e)) => Err(e.clone()),
437 Closed(Cause::ScheduledLibraryReset(reason)) => {
438 Err(proto::Error::library_go_away(reason))
439 }
440 Closed(Cause::EndStream) | HalfClosedRemote(..) | ReservedLocal => Ok(false),
441 _ => Ok(true),
442 }
443 }
444
445 /// Returns a reason if the stream has been reset.
446 pub(super) fn ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error> {
447 match self.inner {
448 Closed(Cause::Error(Error::Reset(_, reason, _)))
449 | Closed(Cause::Error(Error::GoAway(_, reason, _)))
450 | Closed(Cause::ScheduledLibraryReset(reason)) => Ok(Some(reason)),
451 Closed(Cause::Error(ref e)) => Err(e.clone().into()),
452 Open {
453 local: Streaming, ..
454 }
455 | HalfClosedRemote(Streaming) => match mode {
456 PollReset::AwaitingHeaders => Err(UserError::PollResetAfterSendResponse.into()),
457 PollReset::Streaming => Ok(None),
458 },
459 _ => Ok(None),
460 }
461 }
462}
463
464impl Default for State {
465 fn default() -> State {
466 State { inner: Inner::Idle }
467 }
468}
469
470// remove some noise for debug output
471impl fmt::Debug for State {
472 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
473 self.inner.fmt(f)
474 }
475}