h2/proto/streams/
recv.rs

1use super::*;
2use crate::codec::UserError;
3use crate::frame::{PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE};
4use crate::proto;
5
6use http::{HeaderMap, Request, Response};
7
8use std::cmp::Ordering;
9use std::io;
10use std::task::{Context, Poll, Waker};
11use std::time::Instant;
12
13#[derive(Debug)]
14pub(super) struct Recv {
15    /// Initial window size of remote initiated streams
16    init_window_sz: WindowSize,
17
18    /// Connection level flow control governing received data
19    flow: FlowControl,
20
21    /// Amount of connection window capacity currently used by outstanding streams.
22    in_flight_data: WindowSize,
23
24    /// The lowest stream ID that is still idle
25    next_stream_id: Result<StreamId, StreamIdOverflow>,
26
27    /// The stream ID of the last processed stream
28    last_processed_id: StreamId,
29
30    /// Any streams with a higher ID are ignored.
31    ///
32    /// This starts as MAX, but is lowered when a GOAWAY is received.
33    ///
34    /// > After sending a GOAWAY frame, the sender can discard frames for
35    /// > streams initiated by the receiver with identifiers higher than
36    /// > the identified last stream.
37    max_stream_id: StreamId,
38
39    /// Streams that have pending window updates
40    pending_window_updates: store::Queue<stream::NextWindowUpdate>,
41
42    /// New streams to be accepted
43    pending_accept: store::Queue<stream::NextAccept>,
44
45    /// Locally reset streams that should be reaped when they expire
46    pending_reset_expired: store::Queue<stream::NextResetExpire>,
47
48    /// How long locally reset streams should ignore received frames
49    reset_duration: Duration,
50
51    /// Holds frames that are waiting to be read
52    buffer: Buffer<Event>,
53
54    /// Refused StreamId, this represents a frame that must be sent out.
55    refused: Option<StreamId>,
56
57    /// If push promises are allowed to be received.
58    is_push_enabled: bool,
59
60    /// If extended connect protocol is enabled.
61    is_extended_connect_protocol_enabled: bool,
62}
63
64#[derive(Debug)]
65pub(super) enum Event {
66    Headers(peer::PollMessage),
67    Data(Bytes),
68    Trailers(HeaderMap),
69}
70
71#[derive(Debug)]
72pub(super) enum RecvHeaderBlockError<T> {
73    Oversize(T),
74    State(Error),
75}
76
/// The kind of frame that is opening a stream; passed as `mode` to
/// `Recv::open`.
#[derive(Debug)]
pub(crate) enum Open {
    /// The stream is being opened by a PUSH_PROMISE.
    PushPromise,
    /// The stream is being opened by HEADERS.
    Headers,
}
82
83impl Recv {
84    pub fn new(peer: peer::Dyn, config: &Config) -> Self {
85        let next_stream_id = if peer.is_server() { 1 } else { 2 };
86
87        let mut flow = FlowControl::new();
88
89        // connections always have the default window size, regardless of
90        // settings
91        flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE)
92            .expect("invalid initial remote window size");
93        flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE).unwrap();
94
95        Recv {
96            init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
97            flow,
98            in_flight_data: 0 as WindowSize,
99            next_stream_id: Ok(next_stream_id.into()),
100            pending_window_updates: store::Queue::new(),
101            last_processed_id: StreamId::ZERO,
102            max_stream_id: StreamId::MAX,
103            pending_accept: store::Queue::new(),
104            pending_reset_expired: store::Queue::new(),
105            reset_duration: config.local_reset_duration,
106            buffer: Buffer::new(),
107            refused: None,
108            is_push_enabled: config.local_push_enabled,
109            is_extended_connect_protocol_enabled: config.extended_connect_protocol_enabled,
110        }
111    }
112
113    /// Returns the initial receive window size
114    pub fn init_window_sz(&self) -> WindowSize {
115        self.init_window_sz
116    }
117
118    /// Returns the ID of the last processed stream
119    pub fn last_processed_id(&self) -> StreamId {
120        self.last_processed_id
121    }
122
123    /// Update state reflecting a new, remotely opened stream
124    ///
125    /// Returns the stream state if successful. `None` if refused
126    pub fn open(
127        &mut self,
128        id: StreamId,
129        mode: Open,
130        counts: &mut Counts,
131    ) -> Result<Option<StreamId>, Error> {
132        assert!(self.refused.is_none());
133
134        counts.peer().ensure_can_open(id, mode)?;
135
136        let next_id = self.next_stream_id()?;
137        if id < next_id {
138            proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id);
139            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
140        }
141
142        self.next_stream_id = id.next_id();
143
144        if !counts.can_inc_num_recv_streams() {
145            self.refused = Some(id);
146            return Ok(None);
147        }
148
149        Ok(Some(id))
150    }
151
152    /// Transition the stream state based on receiving headers
153    ///
154    /// The caller ensures that the frame represents headers and not trailers.
155    pub fn recv_headers(
156        &mut self,
157        frame: frame::Headers,
158        stream: &mut store::Ptr,
159        counts: &mut Counts,
160    ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>> {
161        tracing::trace!("opening stream; init_window={}", self.init_window_sz);
162        let is_initial = stream.state.recv_open(&frame)?;
163
164        if is_initial {
165            // TODO: be smarter about this logic
166            if frame.stream_id() > self.last_processed_id {
167                self.last_processed_id = frame.stream_id();
168            }
169
170            // Increment the number of concurrent streams
171            counts.inc_num_recv_streams(stream);
172        }
173
174        if !stream.content_length.is_head() {
175            use super::stream::ContentLength;
176            use http::header;
177
178            if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) {
179                let content_length = match frame::parse_u64(content_length.as_bytes()) {
180                    Ok(v) => v,
181                    Err(_) => {
182                        proto_err!(stream: "could not parse content-length; stream={:?}", stream.id);
183                        return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
184                    }
185                };
186
187                stream.content_length = ContentLength::Remaining(content_length);
188                // END_STREAM on headers frame with non-zero content-length is malformed.
189                // https://datatracker.ietf.org/doc/html/rfc9113#section-8.1.1
190                if frame.is_end_stream()
191                    && content_length > 0
192                    && frame
193                        .pseudo()
194                        .status
195                        .map_or(true, |status| status != 204 && status != 304)
196                {
197                    proto_err!(stream: "recv_headers with END_STREAM: content-length is not zero; stream={:?};", stream.id);
198                    return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
199                }
200            }
201        }
202
203        if frame.is_over_size() {
204            // A frame is over size if the decoded header block was bigger than
205            // SETTINGS_MAX_HEADER_LIST_SIZE.
206            //
207            // > A server that receives a larger header block than it is willing
208            // > to handle can send an HTTP 431 (Request Header Fields Too
209            // > Large) status code [RFC6585]. A client can discard responses
210            // > that it cannot process.
211            //
212            // So, if peer is a server, we'll send a 431. In either case,
213            // an error is recorded, which will send a REFUSED_STREAM,
214            // since we don't want any of the data frames either.
215            tracing::debug!(
216                "stream error REQUEST_HEADER_FIELDS_TOO_LARGE -- \
217                 recv_headers: frame is over size; stream={:?}",
218                stream.id
219            );
220            return if counts.peer().is_server() && is_initial {
221                let mut res = frame::Headers::new(
222                    stream.id,
223                    frame::Pseudo::response(::http::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE),
224                    HeaderMap::new(),
225                );
226                res.set_end_stream();
227                Err(RecvHeaderBlockError::Oversize(Some(res)))
228            } else {
229                Err(RecvHeaderBlockError::Oversize(None))
230            };
231        }
232
233        let stream_id = frame.stream_id();
234        let (pseudo, fields) = frame.into_parts();
235
236        if pseudo.protocol.is_some()
237            && counts.peer().is_server()
238            && !self.is_extended_connect_protocol_enabled
239        {
240            proto_err!(stream: "cannot use :protocol if extended connect protocol is disabled; stream={:?}", stream.id);
241            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
242        }
243
244        if pseudo.status.is_some() && counts.peer().is_server() {
245            proto_err!(stream: "cannot use :status header for requests; stream={:?}", stream.id);
246            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
247        }
248
249        if !pseudo.is_informational() {
250            let message = counts
251                .peer()
252                .convert_poll_message(pseudo, fields, stream_id)?;
253
254            // Push the frame onto the stream's recv buffer
255            stream
256                .pending_recv
257                .push_back(&mut self.buffer, Event::Headers(message));
258            stream.notify_recv();
259
260            // Only servers can receive a headers frame that initiates the stream.
261            // This is verified in `Streams` before calling this function.
262            if counts.peer().is_server() {
263                // Correctness: never push a stream to `pending_accept` without having the
264                // corresponding headers frame pushed to `stream.pending_recv`.
265                self.pending_accept.push(stream);
266            }
267        }
268
269        Ok(())
270    }
271
272    /// Called by the server to get the request
273    ///
274    /// # Panics
275    ///
276    /// Panics if `stream.pending_recv` has no `Event::Headers` queued.
277    ///
278    pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> {
279        use super::peer::PollMessage::*;
280
281        match stream.pending_recv.pop_front(&mut self.buffer) {
282            Some(Event::Headers(Server(request))) => request,
283            _ => unreachable!("server stream queue must start with Headers"),
284        }
285    }
286
287    /// Called by the client to get pushed response
288    pub fn poll_pushed(
289        &mut self,
290        cx: &Context,
291        stream: &mut store::Ptr,
292    ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>> {
293        use super::peer::PollMessage::*;
294
295        let mut ppp = stream.pending_push_promises.take();
296        let pushed = ppp.pop(stream.store_mut()).map(|mut pushed| {
297            match pushed.pending_recv.pop_front(&mut self.buffer) {
298                Some(Event::Headers(Server(headers))) => (headers, pushed.key()),
299                // When frames are pushed into the queue, it is verified that
300                // the first frame is a HEADERS frame.
301                _ => panic!("Headers not set on pushed stream"),
302            }
303        });
304        stream.pending_push_promises = ppp;
305        if let Some(p) = pushed {
306            Poll::Ready(Some(Ok(p)))
307        } else {
308            let is_open = stream.state.ensure_recv_open()?;
309
310            if is_open {
311                stream.push_task = Some(cx.waker().clone());
312                Poll::Pending
313            } else {
314                Poll::Ready(None)
315            }
316        }
317    }
318
319    /// Called by the client to get the response
320    pub fn poll_response(
321        &mut self,
322        cx: &Context,
323        stream: &mut store::Ptr,
324    ) -> Poll<Result<Response<()>, proto::Error>> {
325        use super::peer::PollMessage::*;
326
327        // If the buffer is not empty, then the first frame must be a HEADERS
328        // frame or the user violated the contract.
329        match stream.pending_recv.pop_front(&mut self.buffer) {
330            Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response)),
331            Some(_) => panic!("poll_response called after response returned"),
332            None => {
333                if !stream.state.ensure_recv_open()? {
334                    proto_err!(stream: "poll_response: stream={:?} is not opened;",  stream.id);
335                    return Poll::Ready(Err(Error::library_reset(
336                        stream.id,
337                        Reason::PROTOCOL_ERROR,
338                    )));
339                }
340
341                stream.recv_task = Some(cx.waker().clone());
342                Poll::Pending
343            }
344        }
345    }
346
347    /// Transition the stream based on receiving trailers
348    pub fn recv_trailers(
349        &mut self,
350        frame: frame::Headers,
351        stream: &mut store::Ptr,
352    ) -> Result<(), Error> {
353        // Transition the state
354        stream.state.recv_close()?;
355
356        if stream.ensure_content_length_zero().is_err() {
357            proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};",  stream.id);
358            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
359        }
360
361        let trailers = frame.into_fields();
362
363        // Push the frame onto the stream's recv buffer
364        stream
365            .pending_recv
366            .push_back(&mut self.buffer, Event::Trailers(trailers));
367        stream.notify_recv();
368
369        Ok(())
370    }
371
372    /// Releases capacity of the connection
373    pub fn release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>) {
374        tracing::trace!(
375            "release_connection_capacity; size={}, connection in_flight_data={}",
376            capacity,
377            self.in_flight_data,
378        );
379
380        // Decrement in-flight data
381        self.in_flight_data -= capacity;
382
383        // Assign capacity to connection
384        // TODO: proper error handling
385        let _res = self.flow.assign_capacity(capacity);
386        debug_assert!(_res.is_ok());
387
388        if self.flow.unclaimed_capacity().is_some() {
389            if let Some(task) = task.take() {
390                task.wake();
391            }
392        }
393    }
394
395    /// Releases capacity back to the connection & stream
396    pub fn release_capacity(
397        &mut self,
398        capacity: WindowSize,
399        stream: &mut store::Ptr,
400        task: &mut Option<Waker>,
401    ) -> Result<(), UserError> {
402        tracing::trace!("release_capacity; size={}", capacity);
403
404        if capacity > stream.in_flight_recv_data {
405            return Err(UserError::ReleaseCapacityTooBig);
406        }
407
408        self.release_connection_capacity(capacity, task);
409
410        // Decrement in-flight data
411        stream.in_flight_recv_data -= capacity;
412
413        // Assign capacity to stream
414        // TODO: proper error handling
415        let _res = stream.recv_flow.assign_capacity(capacity);
416        debug_assert!(_res.is_ok());
417
418        if stream.recv_flow.unclaimed_capacity().is_some() {
419            // Queue the stream for sending the WINDOW_UPDATE frame.
420            self.pending_window_updates.push(stream);
421
422            if let Some(task) = task.take() {
423                task.wake();
424            }
425        }
426
427        Ok(())
428    }
429
430    /// Release any unclaimed capacity for a closed stream.
431    pub fn release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
432        debug_assert_eq!(stream.ref_count, 0);
433
434        if stream.in_flight_recv_data == 0 {
435            return;
436        }
437
438        tracing::trace!(
439            "auto-release closed stream ({:?}) capacity: {:?}",
440            stream.id,
441            stream.in_flight_recv_data,
442        );
443
444        self.release_connection_capacity(stream.in_flight_recv_data, task);
445        stream.in_flight_recv_data = 0;
446
447        self.clear_recv_buffer(stream);
448    }
449
450    /// Set the "target" connection window size.
451    ///
452    /// By default, all new connections start with 64kb of window size. As
453    /// streams used and release capacity, we will send WINDOW_UPDATEs for the
454    /// connection to bring it back up to the initial "target".
455    ///
456    /// Setting a target means that we will try to tell the peer about
457    /// WINDOW_UPDATEs so the peer knows it has about `target` window to use
458    /// for the whole connection.
459    ///
460    /// The `task` is an optional parked task for the `Connection` that might
461    /// be blocked on needing more window capacity.
462    pub fn set_target_connection_window(
463        &mut self,
464        target: WindowSize,
465        task: &mut Option<Waker>,
466    ) -> Result<(), Reason> {
467        tracing::trace!(
468            "set_target_connection_window; target={}; available={}, reserved={}",
469            target,
470            self.flow.available(),
471            self.in_flight_data,
472        );
473
474        // The current target connection window is our `available` plus any
475        // in-flight data reserved by streams.
476        //
477        // Update the flow controller with the difference between the new
478        // target and the current target.
479        let current = self
480            .flow
481            .available()
482            .add(self.in_flight_data)?
483            .checked_size();
484        if target > current {
485            self.flow.assign_capacity(target - current)?;
486        } else {
487            self.flow.claim_capacity(current - target)?;
488        }
489
490        // If changing the target capacity means we gained a bunch of capacity,
491        // enough that we went over the update threshold, then schedule sending
492        // a connection WINDOW_UPDATE.
493        if self.flow.unclaimed_capacity().is_some() {
494            if let Some(task) = task.take() {
495                task.wake();
496            }
497        }
498        Ok(())
499    }
500
501    pub(crate) fn apply_local_settings(
502        &mut self,
503        settings: &frame::Settings,
504        store: &mut Store,
505    ) -> Result<(), proto::Error> {
506        if let Some(val) = settings.is_extended_connect_protocol_enabled() {
507            self.is_extended_connect_protocol_enabled = val;
508        }
509
510        if let Some(target) = settings.initial_window_size() {
511            let old_sz = self.init_window_sz;
512            self.init_window_sz = target;
513
514            tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz,);
515
516            // Per RFC 7540 §6.9.2:
517            //
518            // In addition to changing the flow-control window for streams that are
519            // not yet active, a SETTINGS frame can alter the initial flow-control
520            // window size for streams with active flow-control windows (that is,
521            // streams in the "open" or "half-closed (remote)" state). When the
522            // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
523            // the size of all stream flow-control windows that it maintains by the
524            // difference between the new value and the old value.
525            //
526            // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
527            // space in a flow-control window to become negative. A sender MUST
528            // track the negative flow-control window and MUST NOT send new
529            // flow-controlled frames until it receives WINDOW_UPDATE frames that
530            // cause the flow-control window to become positive.
531
532            match target.cmp(&old_sz) {
533                Ordering::Less => {
534                    // We must decrease the (local) window on every open stream.
535                    let dec = old_sz - target;
536                    tracing::trace!("decrementing all windows; dec={}", dec);
537
538                    store.try_for_each(|mut stream| {
539                        stream
540                            .recv_flow
541                            .dec_recv_window(dec)
542                            .map_err(proto::Error::library_go_away)?;
543                        Ok::<_, proto::Error>(())
544                    })?;
545                }
546                Ordering::Greater => {
547                    // We must increase the (local) window on every open stream.
548                    let inc = target - old_sz;
549                    tracing::trace!("incrementing all windows; inc={}", inc);
550                    store.try_for_each(|mut stream| {
551                        // XXX: Shouldn't the peer have already noticed our
552                        // overflow and sent us a GOAWAY?
553                        stream
554                            .recv_flow
555                            .inc_window(inc)
556                            .map_err(proto::Error::library_go_away)?;
557                        stream
558                            .recv_flow
559                            .assign_capacity(inc)
560                            .map_err(proto::Error::library_go_away)?;
561                        Ok::<_, proto::Error>(())
562                    })?;
563                }
564                Ordering::Equal => (),
565            }
566        }
567
568        Ok(())
569    }
570
571    pub fn is_end_stream(&self, stream: &store::Ptr) -> bool {
572        if !stream.state.is_recv_end_stream() {
573            return false;
574        }
575
576        stream.pending_recv.is_empty()
577    }
578
579    pub fn recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) -> Result<(), Error> {
580        let sz = frame.payload().len();
581
582        // This should have been enforced at the codec::FramedRead layer, so
583        // this is just a sanity check.
584        assert!(sz <= MAX_WINDOW_SIZE as usize);
585
586        let sz = sz as WindowSize;
587
588        let is_ignoring_frame = stream.state.is_local_error();
589
590        if !is_ignoring_frame && !stream.state.is_recv_streaming() {
591            // TODO: There are cases where this can be a stream error of
592            // STREAM_CLOSED instead...
593
594            // Receiving a DATA frame when not expecting one is a protocol
595            // error.
596            proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id);
597            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
598        }
599
600        tracing::trace!(
601            "recv_data; size={}; connection={}; stream={}",
602            sz,
603            self.flow.window_size(),
604            stream.recv_flow.window_size()
605        );
606
607        if is_ignoring_frame {
608            tracing::trace!(
609                "recv_data; frame ignored on locally reset {:?} for some time",
610                stream.id,
611            );
612            return self.ignore_data(sz);
613        }
614
615        // Ensure that there is enough capacity on the connection before acting
616        // on the stream.
617        self.consume_connection_window(sz)?;
618
619        if stream.recv_flow.window_size() < sz {
620            // http://httpwg.org/specs/rfc7540.html#WINDOW_UPDATE
621            // > A receiver MAY respond with a stream error (Section 5.4.2) or
622            // > connection error (Section 5.4.1) of type FLOW_CONTROL_ERROR if
623            // > it is unable to accept a frame.
624            //
625            // So, for violating the **stream** window, we can send either a
626            // stream or connection error. We've opted to send a stream
627            // error.
628            return Err(Error::library_reset(stream.id, Reason::FLOW_CONTROL_ERROR));
629        }
630
631        if stream.dec_content_length(frame.payload().len()).is_err() {
632            proto_err!(stream:
633                "recv_data: content-length overflow; stream={:?}; len={:?}",
634                stream.id,
635                frame.payload().len(),
636            );
637            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
638        }
639
640        if frame.is_end_stream() {
641            if stream.ensure_content_length_zero().is_err() {
642                proto_err!(stream:
643                    "recv_data: content-length underflow; stream={:?}; len={:?}",
644                    stream.id,
645                    frame.payload().len(),
646                );
647                return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
648            }
649
650            if stream.state.recv_close().is_err() {
651                proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id);
652                return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
653            }
654        }
655
656        // Received a frame, but no one cared about it. fix issue#648
657        if !stream.is_recv {
658            tracing::trace!(
659                "recv_data; frame ignored on stream release {:?} for some time",
660                stream.id,
661            );
662            self.release_connection_capacity(sz, &mut None);
663            return Ok(());
664        }
665
666        // Update stream level flow control
667        stream
668            .recv_flow
669            .send_data(sz)
670            .map_err(proto::Error::library_go_away)?;
671
672        // Track the data as in-flight
673        stream.in_flight_recv_data += sz;
674
675        let event = Event::Data(frame.into_payload());
676
677        // Push the frame onto the recv buffer
678        stream.pending_recv.push_back(&mut self.buffer, event);
679        stream.notify_recv();
680
681        Ok(())
682    }
683
684    pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), Error> {
685        // Ensure that there is enough capacity on the connection...
686        self.consume_connection_window(sz)?;
687
688        // Since we are ignoring this frame,
689        // we aren't returning the frame to the user. That means they
690        // have no way to release the capacity back to the connection. So
691        // we have to release it automatically.
692        //
693        // This call doesn't send a WINDOW_UPDATE immediately, just marks
694        // the capacity as available to be reclaimed. When the available
695        // capacity meets a threshold, a WINDOW_UPDATE is then sent.
696        self.release_connection_capacity(sz, &mut None);
697        Ok(())
698    }
699
700    pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), Error> {
701        if self.flow.window_size() < sz {
702            tracing::debug!(
703                "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});",
704                self.flow.window_size(),
705                sz,
706            );
707            return Err(Error::library_go_away(Reason::FLOW_CONTROL_ERROR));
708        }
709
710        // Update connection level flow control
711        self.flow.send_data(sz).map_err(Error::library_go_away)?;
712
713        // Track the data as in-flight
714        self.in_flight_data += sz;
715        Ok(())
716    }
717
718    pub fn recv_push_promise(
719        &mut self,
720        frame: frame::PushPromise,
721        stream: &mut store::Ptr,
722    ) -> Result<(), Error> {
723        stream.state.reserve_remote()?;
724        if frame.is_over_size() {
725            // A frame is over size if the decoded header block was bigger than
726            // SETTINGS_MAX_HEADER_LIST_SIZE.
727            //
728            // > A server that receives a larger header block than it is willing
729            // > to handle can send an HTTP 431 (Request Header Fields Too
730            // > Large) status code [RFC6585]. A client can discard responses
731            // > that it cannot process.
732            //
733            // So, if peer is a server, we'll send a 431. In either case,
734            // an error is recorded, which will send a PROTOCOL_ERROR,
735            // since we don't want any of the data frames either.
736            tracing::debug!(
737                "stream error PROTOCOL_ERROR -- recv_push_promise: \
738                 headers frame is over size; promised_id={:?};",
739                frame.promised_id(),
740            );
741            return Err(Error::library_reset(
742                frame.promised_id(),
743                Reason::PROTOCOL_ERROR,
744            ));
745        }
746
747        let promised_id = frame.promised_id();
748        let (pseudo, fields) = frame.into_parts();
749        let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?;
750
751        if let Err(e) = frame::PushPromise::validate_request(&req) {
752            use PushPromiseHeaderError::*;
753            match e {
754                NotSafeAndCacheable => proto_err!(
755                    stream:
756                    "recv_push_promise: method {} is not safe and cacheable; promised_id={:?}",
757                    req.method(),
758                    promised_id,
759                ),
760                InvalidContentLength(e) => proto_err!(
761                    stream:
762                    "recv_push_promise; promised request has invalid content-length {:?}; promised_id={:?}",
763                    e,
764                    promised_id,
765                ),
766            }
767            return Err(Error::library_reset(promised_id, Reason::PROTOCOL_ERROR));
768        }
769
770        use super::peer::PollMessage::*;
771        stream
772            .pending_recv
773            .push_back(&mut self.buffer, Event::Headers(Server(req)));
774        stream.notify_recv();
775        stream.notify_push();
776        Ok(())
777    }
778
779    /// Ensures that `id` is not in the `Idle` state.
780    pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
781        if let Ok(next) = self.next_stream_id {
782            if id >= next {
783                tracing::debug!(
784                    "stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}",
785                    id
786                );
787                return Err(Reason::PROTOCOL_ERROR);
788            }
789        }
790        // if next_stream_id is overflowed, that's ok.
791
792        Ok(())
793    }
794
795    /// Handle remote sending an explicit RST_STREAM.
796    pub fn recv_reset(
797        &mut self,
798        frame: frame::Reset,
799        stream: &mut Stream,
800        counts: &mut Counts,
801    ) -> Result<(), Error> {
802        // Reseting a stream that the user hasn't accepted is possible,
803        // but should be done with care. These streams will continue
804        // to take up memory in the accept queue, but will no longer be
805        // counted as "concurrent" streams.
806        //
807        // So, we have a separate limit for these.
808        //
809        // See https://github.com/hyperium/hyper/issues/2877
810        if stream.is_pending_accept {
811            if counts.can_inc_num_remote_reset_streams() {
812                counts.inc_num_remote_reset_streams();
813            } else {
814                tracing::warn!(
815                    "recv_reset; remotely-reset pending-accept streams reached limit ({:?})",
816                    counts.max_remote_reset_streams(),
817                );
818                return Err(Error::library_go_away_data(
819                    Reason::ENHANCE_YOUR_CALM,
820                    "too_many_resets",
821                ));
822            }
823        }
824
825        // Notify the stream
826        stream.state.recv_reset(frame, stream.is_pending_send);
827
828        stream.notify_send();
829        stream.notify_recv();
830        stream.notify_push();
831
832        Ok(())
833    }
834
835    /// Handle a connection-level error
836    pub fn handle_error(&mut self, err: &proto::Error, stream: &mut Stream) {
837        // Receive an error
838        stream.state.handle_error(err);
839
840        // If a receiver is waiting, notify it
841        stream.notify_send();
842        stream.notify_recv();
843        stream.notify_push();
844    }
845
846    pub fn go_away(&mut self, last_processed_id: StreamId) {
847        assert!(self.max_stream_id >= last_processed_id);
848        self.max_stream_id = last_processed_id;
849    }
850
851    pub fn recv_eof(&mut self, stream: &mut Stream) {
852        stream.state.recv_eof();
853        stream.notify_send();
854        stream.notify_recv();
855        stream.notify_push();
856    }
857
858    pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) {
859        while stream.pending_recv.pop_front(&mut self.buffer).is_some() {
860            // drop it
861        }
862    }
863
864    /// Get the max ID of streams we can receive.
865    ///
866    /// This gets lowered if we send a GOAWAY frame.
867    pub fn max_stream_id(&self) -> StreamId {
868        self.max_stream_id
869    }
870
871    pub fn next_stream_id(&self) -> Result<StreamId, Error> {
872        if let Ok(id) = self.next_stream_id {
873            Ok(id)
874        } else {
875            Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
876        }
877    }
878
879    pub fn may_have_created_stream(&self, id: StreamId) -> bool {
880        if let Ok(next_id) = self.next_stream_id {
881            // Peer::is_local_init should have been called beforehand
882            debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),);
883            id < next_id
884        } else {
885            true
886        }
887    }
888
889    pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) {
890        if let Ok(next_id) = self.next_stream_id {
891            // !Peer::is_local_init should have been called beforehand
892            debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated());
893            if id >= next_id {
894                self.next_stream_id = id.next_id();
895            }
896        }
897    }
898
899    /// Returns true if the remote peer can reserve a stream with the given ID.
900    pub fn ensure_can_reserve(&self) -> Result<(), Error> {
901        if !self.is_push_enabled {
902            proto_err!(conn: "recv_push_promise: push is disabled");
903            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
904        }
905
906        Ok(())
907    }
908
909    /// Add a locally reset stream to queue to be eventually reaped.
910    pub fn enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
911        if !stream.state.is_local_error() || stream.is_pending_reset_expiration() {
912            return;
913        }
914
915        tracing::trace!("enqueue_reset_expiration; {:?}", stream.id);
916
917        if counts.can_inc_num_reset_streams() {
918            counts.inc_num_reset_streams();
919            self.pending_reset_expired.push(stream);
920        }
921    }
922
923    /// Send any pending refusals.
924    pub fn send_pending_refusal<T, B>(
925        &mut self,
926        cx: &mut Context,
927        dst: &mut Codec<T, Prioritized<B>>,
928    ) -> Poll<io::Result<()>>
929    where
930        T: AsyncWrite + Unpin,
931        B: Buf,
932    {
933        if let Some(stream_id) = self.refused {
934            ready!(dst.poll_ready(cx))?;
935
936            // Create the RST_STREAM frame
937            let frame = frame::Reset::new(stream_id, Reason::REFUSED_STREAM);
938
939            // Buffer the frame
940            dst.buffer(frame.into()).expect("invalid RST_STREAM frame");
941        }
942
943        self.refused = None;
944
945        Poll::Ready(Ok(()))
946    }
947
948    pub fn clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
949        if !self.pending_reset_expired.is_empty() {
950            let now = Instant::now();
951            let reset_duration = self.reset_duration;
952            while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| {
953                let reset_at = stream.reset_at.expect("reset_at must be set if in queue");
954                // rust-lang/rust#86470 tracks a bug in the standard library where `Instant`
955                // subtraction can panic (because, on some platforms, `Instant` isn't actually
956                // monotonic). We use a saturating operation to avoid this panic here.
957                now.saturating_duration_since(reset_at) > reset_duration
958            }) {
959                counts.transition_after(stream, true);
960            }
961        }
962    }
963
964    pub fn clear_queues(
965        &mut self,
966        clear_pending_accept: bool,
967        store: &mut Store,
968        counts: &mut Counts,
969    ) {
970        self.clear_stream_window_update_queue(store, counts);
971        self.clear_all_reset_streams(store, counts);
972
973        if clear_pending_accept {
974            self.clear_all_pending_accept(store, counts);
975        }
976    }
977
978    fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) {
979        while let Some(stream) = self.pending_window_updates.pop(store) {
980            counts.transition(stream, |_, stream| {
981                tracing::trace!("clear_stream_window_update_queue; stream={:?}", stream.id);
982            })
983        }
984    }
985
986    /// Called on EOF
987    fn clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
988        while let Some(stream) = self.pending_reset_expired.pop(store) {
989            counts.transition_after(stream, true);
990        }
991    }
992
993    fn clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts) {
994        while let Some(stream) = self.pending_accept.pop(store) {
995            counts.transition_after(stream, false);
996        }
997    }
998
999    pub fn poll_complete<T, B>(
1000        &mut self,
1001        cx: &mut Context,
1002        store: &mut Store,
1003        counts: &mut Counts,
1004        dst: &mut Codec<T, Prioritized<B>>,
1005    ) -> Poll<io::Result<()>>
1006    where
1007        T: AsyncWrite + Unpin,
1008        B: Buf,
1009    {
1010        // Send any pending connection level window updates
1011        ready!(self.send_connection_window_update(cx, dst))?;
1012
1013        // Send any pending stream level window updates
1014        ready!(self.send_stream_window_updates(cx, store, counts, dst))?;
1015
1016        Poll::Ready(Ok(()))
1017    }
1018
1019    /// Send connection level window update
1020    fn send_connection_window_update<T, B>(
1021        &mut self,
1022        cx: &mut Context,
1023        dst: &mut Codec<T, Prioritized<B>>,
1024    ) -> Poll<io::Result<()>>
1025    where
1026        T: AsyncWrite + Unpin,
1027        B: Buf,
1028    {
1029        if let Some(incr) = self.flow.unclaimed_capacity() {
1030            let frame = frame::WindowUpdate::new(StreamId::zero(), incr);
1031
1032            // Ensure the codec has capacity
1033            ready!(dst.poll_ready(cx))?;
1034
1035            // Buffer the WINDOW_UPDATE frame
1036            dst.buffer(frame.into())
1037                .expect("invalid WINDOW_UPDATE frame");
1038
1039            // Update flow control
1040            self.flow
1041                .inc_window(incr)
1042                .expect("unexpected flow control state");
1043        }
1044
1045        Poll::Ready(Ok(()))
1046    }
1047
1048    /// Send stream level window update
1049    pub fn send_stream_window_updates<T, B>(
1050        &mut self,
1051        cx: &mut Context,
1052        store: &mut Store,
1053        counts: &mut Counts,
1054        dst: &mut Codec<T, Prioritized<B>>,
1055    ) -> Poll<io::Result<()>>
1056    where
1057        T: AsyncWrite + Unpin,
1058        B: Buf,
1059    {
1060        loop {
1061            // Ensure the codec has capacity
1062            ready!(dst.poll_ready(cx))?;
1063
1064            // Get the next stream
1065            let stream = match self.pending_window_updates.pop(store) {
1066                Some(stream) => stream,
1067                None => return Poll::Ready(Ok(())),
1068            };
1069
1070            counts.transition(stream, |_, stream| {
1071                tracing::trace!("pending_window_updates -- pop; stream={:?}", stream.id);
1072                debug_assert!(!stream.is_pending_window_update);
1073
1074                if !stream.state.is_recv_streaming() {
1075                    // No need to send window updates on the stream if the stream is
1076                    // no longer receiving data.
1077                    //
1078                    // TODO: is this correct? We could possibly send a window
1079                    // update on a ReservedRemote stream if we already know
1080                    // we want to stream the data faster...
1081                    return;
1082                }
1083
1084                // TODO: de-dup
1085                if let Some(incr) = stream.recv_flow.unclaimed_capacity() {
1086                    // Create the WINDOW_UPDATE frame
1087                    let frame = frame::WindowUpdate::new(stream.id, incr);
1088
1089                    // Buffer it
1090                    dst.buffer(frame.into())
1091                        .expect("invalid WINDOW_UPDATE frame");
1092
1093                    // Update flow control
1094                    stream
1095                        .recv_flow
1096                        .inc_window(incr)
1097                        .expect("unexpected flow control state");
1098                }
1099            })
1100        }
1101    }
1102
1103    pub fn next_incoming(&mut self, store: &mut Store) -> Option<store::Key> {
1104        self.pending_accept.pop(store).map(|ptr| ptr.key())
1105    }
1106
1107    pub fn poll_data(
1108        &mut self,
1109        cx: &Context,
1110        stream: &mut Stream,
1111    ) -> Poll<Option<Result<Bytes, proto::Error>>> {
1112        match stream.pending_recv.pop_front(&mut self.buffer) {
1113            Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))),
1114            Some(event) => {
1115                // Frame is trailer
1116                stream.pending_recv.push_front(&mut self.buffer, event);
1117
1118                // Notify the recv task. This is done just in case
1119                // `poll_trailers` was called.
1120                //
1121                // It is very likely that `notify_recv` will just be a no-op (as
1122                // the task will be None), so this isn't really much of a
1123                // performance concern. It also means we don't have to track
1124                // state to see if `poll_trailers` was called before `poll_data`
1125                // returned `None`.
1126                stream.notify_recv();
1127
1128                // No more data frames
1129                Poll::Ready(None)
1130            }
1131            None => self.schedule_recv(cx, stream),
1132        }
1133    }
1134
1135    pub fn poll_trailers(
1136        &mut self,
1137        cx: &Context,
1138        stream: &mut Stream,
1139    ) -> Poll<Option<Result<HeaderMap, proto::Error>>> {
1140        match stream.pending_recv.pop_front(&mut self.buffer) {
1141            Some(Event::Trailers(trailers)) => Poll::Ready(Some(Ok(trailers))),
1142            Some(event) => {
1143                // Frame is not trailers.. not ready to poll trailers yet.
1144                stream.pending_recv.push_front(&mut self.buffer, event);
1145
1146                Poll::Pending
1147            }
1148            None => self.schedule_recv(cx, stream),
1149        }
1150    }
1151
1152    fn schedule_recv<T>(
1153        &mut self,
1154        cx: &Context,
1155        stream: &mut Stream,
1156    ) -> Poll<Option<Result<T, proto::Error>>> {
1157        if stream.state.ensure_recv_open()? {
1158            // Request to get notified once more frames arrive
1159            stream.recv_task = Some(cx.waker().clone());
1160            Poll::Pending
1161        } else {
1162            // No more frames will be received
1163            Poll::Ready(None)
1164        }
1165    }
1166}
1167
1168// ===== impl Open =====
1169
1170impl Open {
1171    pub fn is_push_promise(&self) -> bool {
1172        matches!(*self, Self::PushPromise)
1173    }
1174}
1175
1176// ===== impl RecvHeaderBlockError =====
1177
1178impl<T> From<Error> for RecvHeaderBlockError<T> {
1179    fn from(err: Error) -> Self {
1180        RecvHeaderBlockError::State(err)
1181    }
1182}