1use super::*;
2use crate::codec::UserError;
3use crate::frame::{PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE};
4use crate::proto;
5
6use http::{HeaderMap, Request, Response};
7
8use std::cmp::Ordering;
9use std::io;
10use std::task::{Context, Poll, Waker};
11use std::time::Instant;
12
13#[derive(Debug)]
14pub(super) struct Recv {
15 init_window_sz: WindowSize,
17
18 flow: FlowControl,
20
21 in_flight_data: WindowSize,
23
24 next_stream_id: Result<StreamId, StreamIdOverflow>,
26
27 last_processed_id: StreamId,
29
30 max_stream_id: StreamId,
38
39 pending_window_updates: store::Queue<stream::NextWindowUpdate>,
41
42 pending_accept: store::Queue<stream::NextAccept>,
44
45 pending_reset_expired: store::Queue<stream::NextResetExpire>,
47
48 reset_duration: Duration,
50
51 buffer: Buffer<Event>,
53
54 refused: Option<StreamId>,
56
57 is_push_enabled: bool,
59
60 is_extended_connect_protocol_enabled: bool,
62}
63
64#[derive(Debug)]
65pub(super) enum Event {
66 Headers(peer::PollMessage),
67 Data(Bytes),
68 Trailers(HeaderMap),
69}
70
71#[derive(Debug)]
72pub(super) enum RecvHeaderBlockError<T> {
73 Oversize(T),
74 State(Error),
75}
76
77#[derive(Debug)]
78pub(crate) enum Open {
79 PushPromise,
80 Headers,
81}
82
83impl Recv {
84 pub fn new(peer: peer::Dyn, config: &Config) -> Self {
85 let next_stream_id = if peer.is_server() { 1 } else { 2 };
86
87 let mut flow = FlowControl::new();
88
89 flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE)
92 .expect("invalid initial remote window size");
93 flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE).unwrap();
94
95 Recv {
96 init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
97 flow,
98 in_flight_data: 0 as WindowSize,
99 next_stream_id: Ok(next_stream_id.into()),
100 pending_window_updates: store::Queue::new(),
101 last_processed_id: StreamId::ZERO,
102 max_stream_id: StreamId::MAX,
103 pending_accept: store::Queue::new(),
104 pending_reset_expired: store::Queue::new(),
105 reset_duration: config.local_reset_duration,
106 buffer: Buffer::new(),
107 refused: None,
108 is_push_enabled: config.local_push_enabled,
109 is_extended_connect_protocol_enabled: config.extended_connect_protocol_enabled,
110 }
111 }
112
113 pub fn init_window_sz(&self) -> WindowSize {
115 self.init_window_sz
116 }
117
118 pub fn last_processed_id(&self) -> StreamId {
120 self.last_processed_id
121 }
122
123 pub fn open(
127 &mut self,
128 id: StreamId,
129 mode: Open,
130 counts: &mut Counts,
131 ) -> Result<Option<StreamId>, Error> {
132 assert!(self.refused.is_none());
133
134 counts.peer().ensure_can_open(id, mode)?;
135
136 let next_id = self.next_stream_id()?;
137 if id < next_id {
138 proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id);
139 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
140 }
141
142 self.next_stream_id = id.next_id();
143
144 if !counts.can_inc_num_recv_streams() {
145 self.refused = Some(id);
146 return Ok(None);
147 }
148
149 Ok(Some(id))
150 }
151
152 pub fn recv_headers(
156 &mut self,
157 frame: frame::Headers,
158 stream: &mut store::Ptr,
159 counts: &mut Counts,
160 ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>> {
161 tracing::trace!("opening stream; init_window={}", self.init_window_sz);
162 let is_initial = stream.state.recv_open(&frame)?;
163
164 if is_initial {
165 if frame.stream_id() > self.last_processed_id {
167 self.last_processed_id = frame.stream_id();
168 }
169
170 counts.inc_num_recv_streams(stream);
172 }
173
174 if !stream.content_length.is_head() {
175 use super::stream::ContentLength;
176 use http::header;
177
178 if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) {
179 let content_length = match frame::parse_u64(content_length.as_bytes()) {
180 Ok(v) => v,
181 Err(_) => {
182 proto_err!(stream: "could not parse content-length; stream={:?}", stream.id);
183 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
184 }
185 };
186
187 stream.content_length = ContentLength::Remaining(content_length);
188 if frame.is_end_stream()
191 && content_length > 0
192 && frame
193 .pseudo()
194 .status
195 .map_or(true, |status| status != 204 && status != 304)
196 {
197 proto_err!(stream: "recv_headers with END_STREAM: content-length is not zero; stream={:?};", stream.id);
198 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
199 }
200 }
201 }
202
203 if frame.is_over_size() {
204 tracing::debug!(
216 "stream error REQUEST_HEADER_FIELDS_TOO_LARGE -- \
217 recv_headers: frame is over size; stream={:?}",
218 stream.id
219 );
220 return if counts.peer().is_server() && is_initial {
221 let mut res = frame::Headers::new(
222 stream.id,
223 frame::Pseudo::response(::http::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE),
224 HeaderMap::new(),
225 );
226 res.set_end_stream();
227 Err(RecvHeaderBlockError::Oversize(Some(res)))
228 } else {
229 Err(RecvHeaderBlockError::Oversize(None))
230 };
231 }
232
233 let stream_id = frame.stream_id();
234 let (pseudo, fields) = frame.into_parts();
235
236 if pseudo.protocol.is_some()
237 && counts.peer().is_server()
238 && !self.is_extended_connect_protocol_enabled
239 {
240 proto_err!(stream: "cannot use :protocol if extended connect protocol is disabled; stream={:?}", stream.id);
241 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
242 }
243
244 if pseudo.status.is_some() && counts.peer().is_server() {
245 proto_err!(stream: "cannot use :status header for requests; stream={:?}", stream.id);
246 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
247 }
248
249 if !pseudo.is_informational() {
250 let message = counts
251 .peer()
252 .convert_poll_message(pseudo, fields, stream_id)?;
253
254 stream
256 .pending_recv
257 .push_back(&mut self.buffer, Event::Headers(message));
258 stream.notify_recv();
259
260 if counts.peer().is_server() {
263 self.pending_accept.push(stream);
266 }
267 }
268
269 Ok(())
270 }
271
272 pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> {
279 use super::peer::PollMessage::*;
280
281 match stream.pending_recv.pop_front(&mut self.buffer) {
282 Some(Event::Headers(Server(request))) => request,
283 _ => unreachable!("server stream queue must start with Headers"),
284 }
285 }
286
287 pub fn poll_pushed(
289 &mut self,
290 cx: &Context,
291 stream: &mut store::Ptr,
292 ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>> {
293 use super::peer::PollMessage::*;
294
295 let mut ppp = stream.pending_push_promises.take();
296 let pushed = ppp.pop(stream.store_mut()).map(|mut pushed| {
297 match pushed.pending_recv.pop_front(&mut self.buffer) {
298 Some(Event::Headers(Server(headers))) => (headers, pushed.key()),
299 _ => panic!("Headers not set on pushed stream"),
302 }
303 });
304 stream.pending_push_promises = ppp;
305 if let Some(p) = pushed {
306 Poll::Ready(Some(Ok(p)))
307 } else {
308 let is_open = stream.state.ensure_recv_open()?;
309
310 if is_open {
311 stream.push_task = Some(cx.waker().clone());
312 Poll::Pending
313 } else {
314 Poll::Ready(None)
315 }
316 }
317 }
318
319 pub fn poll_response(
321 &mut self,
322 cx: &Context,
323 stream: &mut store::Ptr,
324 ) -> Poll<Result<Response<()>, proto::Error>> {
325 use super::peer::PollMessage::*;
326
327 match stream.pending_recv.pop_front(&mut self.buffer) {
330 Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response)),
331 Some(_) => panic!("poll_response called after response returned"),
332 None => {
333 if !stream.state.ensure_recv_open()? {
334 proto_err!(stream: "poll_response: stream={:?} is not opened;", stream.id);
335 return Poll::Ready(Err(Error::library_reset(
336 stream.id,
337 Reason::PROTOCOL_ERROR,
338 )));
339 }
340
341 stream.recv_task = Some(cx.waker().clone());
342 Poll::Pending
343 }
344 }
345 }
346
347 pub fn recv_trailers(
349 &mut self,
350 frame: frame::Headers,
351 stream: &mut store::Ptr,
352 ) -> Result<(), Error> {
353 stream.state.recv_close()?;
355
356 if stream.ensure_content_length_zero().is_err() {
357 proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};", stream.id);
358 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
359 }
360
361 let trailers = frame.into_fields();
362
363 stream
365 .pending_recv
366 .push_back(&mut self.buffer, Event::Trailers(trailers));
367 stream.notify_recv();
368
369 Ok(())
370 }
371
372 pub fn release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>) {
374 tracing::trace!(
375 "release_connection_capacity; size={}, connection in_flight_data={}",
376 capacity,
377 self.in_flight_data,
378 );
379
380 self.in_flight_data -= capacity;
382
383 let _res = self.flow.assign_capacity(capacity);
386 debug_assert!(_res.is_ok());
387
388 if self.flow.unclaimed_capacity().is_some() {
389 if let Some(task) = task.take() {
390 task.wake();
391 }
392 }
393 }
394
395 pub fn release_capacity(
397 &mut self,
398 capacity: WindowSize,
399 stream: &mut store::Ptr,
400 task: &mut Option<Waker>,
401 ) -> Result<(), UserError> {
402 tracing::trace!("release_capacity; size={}", capacity);
403
404 if capacity > stream.in_flight_recv_data {
405 return Err(UserError::ReleaseCapacityTooBig);
406 }
407
408 self.release_connection_capacity(capacity, task);
409
410 stream.in_flight_recv_data -= capacity;
412
413 let _res = stream.recv_flow.assign_capacity(capacity);
416 debug_assert!(_res.is_ok());
417
418 if stream.recv_flow.unclaimed_capacity().is_some() {
419 self.pending_window_updates.push(stream);
421
422 if let Some(task) = task.take() {
423 task.wake();
424 }
425 }
426
427 Ok(())
428 }
429
430 pub fn release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
432 debug_assert_eq!(stream.ref_count, 0);
433
434 if stream.in_flight_recv_data == 0 {
435 return;
436 }
437
438 tracing::trace!(
439 "auto-release closed stream ({:?}) capacity: {:?}",
440 stream.id,
441 stream.in_flight_recv_data,
442 );
443
444 self.release_connection_capacity(stream.in_flight_recv_data, task);
445 stream.in_flight_recv_data = 0;
446
447 self.clear_recv_buffer(stream);
448 }
449
450 pub fn set_target_connection_window(
463 &mut self,
464 target: WindowSize,
465 task: &mut Option<Waker>,
466 ) -> Result<(), Reason> {
467 tracing::trace!(
468 "set_target_connection_window; target={}; available={}, reserved={}",
469 target,
470 self.flow.available(),
471 self.in_flight_data,
472 );
473
474 let current = self
480 .flow
481 .available()
482 .add(self.in_flight_data)?
483 .checked_size();
484 if target > current {
485 self.flow.assign_capacity(target - current)?;
486 } else {
487 self.flow.claim_capacity(current - target)?;
488 }
489
490 if self.flow.unclaimed_capacity().is_some() {
494 if let Some(task) = task.take() {
495 task.wake();
496 }
497 }
498 Ok(())
499 }
500
501 pub(crate) fn apply_local_settings(
502 &mut self,
503 settings: &frame::Settings,
504 store: &mut Store,
505 ) -> Result<(), proto::Error> {
506 if let Some(val) = settings.is_extended_connect_protocol_enabled() {
507 self.is_extended_connect_protocol_enabled = val;
508 }
509
510 if let Some(target) = settings.initial_window_size() {
511 let old_sz = self.init_window_sz;
512 self.init_window_sz = target;
513
514 tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz,);
515
516 match target.cmp(&old_sz) {
533 Ordering::Less => {
534 let dec = old_sz - target;
536 tracing::trace!("decrementing all windows; dec={}", dec);
537
538 store.try_for_each(|mut stream| {
539 stream
540 .recv_flow
541 .dec_recv_window(dec)
542 .map_err(proto::Error::library_go_away)?;
543 Ok::<_, proto::Error>(())
544 })?;
545 }
546 Ordering::Greater => {
547 let inc = target - old_sz;
549 tracing::trace!("incrementing all windows; inc={}", inc);
550 store.try_for_each(|mut stream| {
551 stream
554 .recv_flow
555 .inc_window(inc)
556 .map_err(proto::Error::library_go_away)?;
557 stream
558 .recv_flow
559 .assign_capacity(inc)
560 .map_err(proto::Error::library_go_away)?;
561 Ok::<_, proto::Error>(())
562 })?;
563 }
564 Ordering::Equal => (),
565 }
566 }
567
568 Ok(())
569 }
570
571 pub fn is_end_stream(&self, stream: &store::Ptr) -> bool {
572 if !stream.state.is_recv_end_stream() {
573 return false;
574 }
575
576 stream.pending_recv.is_empty()
577 }
578
579 pub fn recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) -> Result<(), Error> {
580 let sz = frame.payload().len();
581
582 assert!(sz <= MAX_WINDOW_SIZE as usize);
585
586 let sz = sz as WindowSize;
587
588 let is_ignoring_frame = stream.state.is_local_error();
589
590 if !is_ignoring_frame && !stream.state.is_recv_streaming() {
591 proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id);
597 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
598 }
599
600 tracing::trace!(
601 "recv_data; size={}; connection={}; stream={}",
602 sz,
603 self.flow.window_size(),
604 stream.recv_flow.window_size()
605 );
606
607 if is_ignoring_frame {
608 tracing::trace!(
609 "recv_data; frame ignored on locally reset {:?} for some time",
610 stream.id,
611 );
612 return self.ignore_data(sz);
613 }
614
615 self.consume_connection_window(sz)?;
618
619 if stream.recv_flow.window_size() < sz {
620 return Err(Error::library_reset(stream.id, Reason::FLOW_CONTROL_ERROR));
629 }
630
631 if stream.dec_content_length(frame.payload().len()).is_err() {
632 proto_err!(stream:
633 "recv_data: content-length overflow; stream={:?}; len={:?}",
634 stream.id,
635 frame.payload().len(),
636 );
637 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
638 }
639
640 if frame.is_end_stream() {
641 if stream.ensure_content_length_zero().is_err() {
642 proto_err!(stream:
643 "recv_data: content-length underflow; stream={:?}; len={:?}",
644 stream.id,
645 frame.payload().len(),
646 );
647 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
648 }
649
650 if stream.state.recv_close().is_err() {
651 proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id);
652 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
653 }
654 }
655
656 if !stream.is_recv {
658 tracing::trace!(
659 "recv_data; frame ignored on stream release {:?} for some time",
660 stream.id,
661 );
662 self.release_connection_capacity(sz, &mut None);
663 return Ok(());
664 }
665
666 stream
668 .recv_flow
669 .send_data(sz)
670 .map_err(proto::Error::library_go_away)?;
671
672 stream.in_flight_recv_data += sz;
674
675 let event = Event::Data(frame.into_payload());
676
677 stream.pending_recv.push_back(&mut self.buffer, event);
679 stream.notify_recv();
680
681 Ok(())
682 }
683
684 pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), Error> {
685 self.consume_connection_window(sz)?;
687
688 self.release_connection_capacity(sz, &mut None);
697 Ok(())
698 }
699
700 pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), Error> {
701 if self.flow.window_size() < sz {
702 tracing::debug!(
703 "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});",
704 self.flow.window_size(),
705 sz,
706 );
707 return Err(Error::library_go_away(Reason::FLOW_CONTROL_ERROR));
708 }
709
710 self.flow.send_data(sz).map_err(Error::library_go_away)?;
712
713 self.in_flight_data += sz;
715 Ok(())
716 }
717
718 pub fn recv_push_promise(
719 &mut self,
720 frame: frame::PushPromise,
721 stream: &mut store::Ptr,
722 ) -> Result<(), Error> {
723 stream.state.reserve_remote()?;
724 if frame.is_over_size() {
725 tracing::debug!(
737 "stream error PROTOCOL_ERROR -- recv_push_promise: \
738 headers frame is over size; promised_id={:?};",
739 frame.promised_id(),
740 );
741 return Err(Error::library_reset(
742 frame.promised_id(),
743 Reason::PROTOCOL_ERROR,
744 ));
745 }
746
747 let promised_id = frame.promised_id();
748 let (pseudo, fields) = frame.into_parts();
749 let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?;
750
751 if let Err(e) = frame::PushPromise::validate_request(&req) {
752 use PushPromiseHeaderError::*;
753 match e {
754 NotSafeAndCacheable => proto_err!(
755 stream:
756 "recv_push_promise: method {} is not safe and cacheable; promised_id={:?}",
757 req.method(),
758 promised_id,
759 ),
760 InvalidContentLength(e) => proto_err!(
761 stream:
762 "recv_push_promise; promised request has invalid content-length {:?}; promised_id={:?}",
763 e,
764 promised_id,
765 ),
766 }
767 return Err(Error::library_reset(promised_id, Reason::PROTOCOL_ERROR));
768 }
769
770 use super::peer::PollMessage::*;
771 stream
772 .pending_recv
773 .push_back(&mut self.buffer, Event::Headers(Server(req)));
774 stream.notify_recv();
775 stream.notify_push();
776 Ok(())
777 }
778
779 pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
781 if let Ok(next) = self.next_stream_id {
782 if id >= next {
783 tracing::debug!(
784 "stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}",
785 id
786 );
787 return Err(Reason::PROTOCOL_ERROR);
788 }
789 }
790 Ok(())
793 }
794
795 pub fn recv_reset(
797 &mut self,
798 frame: frame::Reset,
799 stream: &mut Stream,
800 counts: &mut Counts,
801 ) -> Result<(), Error> {
802 if stream.is_pending_accept {
811 if counts.can_inc_num_remote_reset_streams() {
812 counts.inc_num_remote_reset_streams();
813 } else {
814 tracing::warn!(
815 "recv_reset; remotely-reset pending-accept streams reached limit ({:?})",
816 counts.max_remote_reset_streams(),
817 );
818 return Err(Error::library_go_away_data(
819 Reason::ENHANCE_YOUR_CALM,
820 "too_many_resets",
821 ));
822 }
823 }
824
825 stream.state.recv_reset(frame, stream.is_pending_send);
827
828 stream.notify_send();
829 stream.notify_recv();
830 stream.notify_push();
831
832 Ok(())
833 }
834
835 pub fn handle_error(&mut self, err: &proto::Error, stream: &mut Stream) {
837 stream.state.handle_error(err);
839
840 stream.notify_send();
842 stream.notify_recv();
843 stream.notify_push();
844 }
845
846 pub fn go_away(&mut self, last_processed_id: StreamId) {
847 assert!(self.max_stream_id >= last_processed_id);
848 self.max_stream_id = last_processed_id;
849 }
850
851 pub fn recv_eof(&mut self, stream: &mut Stream) {
852 stream.state.recv_eof();
853 stream.notify_send();
854 stream.notify_recv();
855 stream.notify_push();
856 }
857
858 pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) {
859 while stream.pending_recv.pop_front(&mut self.buffer).is_some() {
860 }
862 }
863
864 pub fn max_stream_id(&self) -> StreamId {
868 self.max_stream_id
869 }
870
871 pub fn next_stream_id(&self) -> Result<StreamId, Error> {
872 if let Ok(id) = self.next_stream_id {
873 Ok(id)
874 } else {
875 Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
876 }
877 }
878
879 pub fn may_have_created_stream(&self, id: StreamId) -> bool {
880 if let Ok(next_id) = self.next_stream_id {
881 debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),);
883 id < next_id
884 } else {
885 true
886 }
887 }
888
889 pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) {
890 if let Ok(next_id) = self.next_stream_id {
891 debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated());
893 if id >= next_id {
894 self.next_stream_id = id.next_id();
895 }
896 }
897 }
898
899 pub fn ensure_can_reserve(&self) -> Result<(), Error> {
901 if !self.is_push_enabled {
902 proto_err!(conn: "recv_push_promise: push is disabled");
903 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
904 }
905
906 Ok(())
907 }
908
909 pub fn enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
911 if !stream.state.is_local_error() || stream.is_pending_reset_expiration() {
912 return;
913 }
914
915 tracing::trace!("enqueue_reset_expiration; {:?}", stream.id);
916
917 if counts.can_inc_num_reset_streams() {
918 counts.inc_num_reset_streams();
919 self.pending_reset_expired.push(stream);
920 }
921 }
922
923 pub fn send_pending_refusal<T, B>(
925 &mut self,
926 cx: &mut Context,
927 dst: &mut Codec<T, Prioritized<B>>,
928 ) -> Poll<io::Result<()>>
929 where
930 T: AsyncWrite + Unpin,
931 B: Buf,
932 {
933 if let Some(stream_id) = self.refused {
934 ready!(dst.poll_ready(cx))?;
935
936 let frame = frame::Reset::new(stream_id, Reason::REFUSED_STREAM);
938
939 dst.buffer(frame.into()).expect("invalid RST_STREAM frame");
941 }
942
943 self.refused = None;
944
945 Poll::Ready(Ok(()))
946 }
947
948 pub fn clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
949 if !self.pending_reset_expired.is_empty() {
950 let now = Instant::now();
951 let reset_duration = self.reset_duration;
952 while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| {
953 let reset_at = stream.reset_at.expect("reset_at must be set if in queue");
954 now.saturating_duration_since(reset_at) > reset_duration
958 }) {
959 counts.transition_after(stream, true);
960 }
961 }
962 }
963
964 pub fn clear_queues(
965 &mut self,
966 clear_pending_accept: bool,
967 store: &mut Store,
968 counts: &mut Counts,
969 ) {
970 self.clear_stream_window_update_queue(store, counts);
971 self.clear_all_reset_streams(store, counts);
972
973 if clear_pending_accept {
974 self.clear_all_pending_accept(store, counts);
975 }
976 }
977
978 fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) {
979 while let Some(stream) = self.pending_window_updates.pop(store) {
980 counts.transition(stream, |_, stream| {
981 tracing::trace!("clear_stream_window_update_queue; stream={:?}", stream.id);
982 })
983 }
984 }
985
986 fn clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
988 while let Some(stream) = self.pending_reset_expired.pop(store) {
989 counts.transition_after(stream, true);
990 }
991 }
992
993 fn clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts) {
994 while let Some(stream) = self.pending_accept.pop(store) {
995 counts.transition_after(stream, false);
996 }
997 }
998
999 pub fn poll_complete<T, B>(
1000 &mut self,
1001 cx: &mut Context,
1002 store: &mut Store,
1003 counts: &mut Counts,
1004 dst: &mut Codec<T, Prioritized<B>>,
1005 ) -> Poll<io::Result<()>>
1006 where
1007 T: AsyncWrite + Unpin,
1008 B: Buf,
1009 {
1010 ready!(self.send_connection_window_update(cx, dst))?;
1012
1013 ready!(self.send_stream_window_updates(cx, store, counts, dst))?;
1015
1016 Poll::Ready(Ok(()))
1017 }
1018
1019 fn send_connection_window_update<T, B>(
1021 &mut self,
1022 cx: &mut Context,
1023 dst: &mut Codec<T, Prioritized<B>>,
1024 ) -> Poll<io::Result<()>>
1025 where
1026 T: AsyncWrite + Unpin,
1027 B: Buf,
1028 {
1029 if let Some(incr) = self.flow.unclaimed_capacity() {
1030 let frame = frame::WindowUpdate::new(StreamId::zero(), incr);
1031
1032 ready!(dst.poll_ready(cx))?;
1034
1035 dst.buffer(frame.into())
1037 .expect("invalid WINDOW_UPDATE frame");
1038
1039 self.flow
1041 .inc_window(incr)
1042 .expect("unexpected flow control state");
1043 }
1044
1045 Poll::Ready(Ok(()))
1046 }
1047
1048 pub fn send_stream_window_updates<T, B>(
1050 &mut self,
1051 cx: &mut Context,
1052 store: &mut Store,
1053 counts: &mut Counts,
1054 dst: &mut Codec<T, Prioritized<B>>,
1055 ) -> Poll<io::Result<()>>
1056 where
1057 T: AsyncWrite + Unpin,
1058 B: Buf,
1059 {
1060 loop {
1061 ready!(dst.poll_ready(cx))?;
1063
1064 let stream = match self.pending_window_updates.pop(store) {
1066 Some(stream) => stream,
1067 None => return Poll::Ready(Ok(())),
1068 };
1069
1070 counts.transition(stream, |_, stream| {
1071 tracing::trace!("pending_window_updates -- pop; stream={:?}", stream.id);
1072 debug_assert!(!stream.is_pending_window_update);
1073
1074 if !stream.state.is_recv_streaming() {
1075 return;
1082 }
1083
1084 if let Some(incr) = stream.recv_flow.unclaimed_capacity() {
1086 let frame = frame::WindowUpdate::new(stream.id, incr);
1088
1089 dst.buffer(frame.into())
1091 .expect("invalid WINDOW_UPDATE frame");
1092
1093 stream
1095 .recv_flow
1096 .inc_window(incr)
1097 .expect("unexpected flow control state");
1098 }
1099 })
1100 }
1101 }
1102
1103 pub fn next_incoming(&mut self, store: &mut Store) -> Option<store::Key> {
1104 self.pending_accept.pop(store).map(|ptr| ptr.key())
1105 }
1106
1107 pub fn poll_data(
1108 &mut self,
1109 cx: &Context,
1110 stream: &mut Stream,
1111 ) -> Poll<Option<Result<Bytes, proto::Error>>> {
1112 match stream.pending_recv.pop_front(&mut self.buffer) {
1113 Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))),
1114 Some(event) => {
1115 stream.pending_recv.push_front(&mut self.buffer, event);
1117
1118 stream.notify_recv();
1127
1128 Poll::Ready(None)
1130 }
1131 None => self.schedule_recv(cx, stream),
1132 }
1133 }
1134
1135 pub fn poll_trailers(
1136 &mut self,
1137 cx: &Context,
1138 stream: &mut Stream,
1139 ) -> Poll<Option<Result<HeaderMap, proto::Error>>> {
1140 match stream.pending_recv.pop_front(&mut self.buffer) {
1141 Some(Event::Trailers(trailers)) => Poll::Ready(Some(Ok(trailers))),
1142 Some(event) => {
1143 stream.pending_recv.push_front(&mut self.buffer, event);
1145
1146 Poll::Pending
1147 }
1148 None => self.schedule_recv(cx, stream),
1149 }
1150 }
1151
1152 fn schedule_recv<T>(
1153 &mut self,
1154 cx: &Context,
1155 stream: &mut Stream,
1156 ) -> Poll<Option<Result<T, proto::Error>>> {
1157 if stream.state.ensure_recv_open()? {
1158 stream.recv_task = Some(cx.waker().clone());
1160 Poll::Pending
1161 } else {
1162 Poll::Ready(None)
1164 }
1165 }
1166}
1167
1168impl Open {
1171 pub fn is_push_promise(&self) -> bool {
1172 matches!(*self, Self::PushPromise)
1173 }
1174}
1175
1176impl<T> From<Error> for RecvHeaderBlockError<T> {
1179 fn from(err: Error) -> Self {
1180 RecvHeaderBlockError::State(err)
1181 }
1182}