h2/proto/streams/prioritize.rs
use super::store::Resolve;
use super::*;

use crate::frame::Reason;

use crate::codec::UserError;
use crate::codec::UserError::*;

use bytes::buf::Take;
use std::{
    cmp::{self, Ordering},
    fmt, io, mem,
    task::{Context, Poll, Waker},
};

/// # Warning
///
/// Queued streams are ordered by stream ID, as we need to ensure that
/// lower-numbered streams are sent headers before higher-numbered ones.
/// This is because "idle" stream IDs – those which have been initiated but
/// have yet to receive frames – will be implicitly closed on receipt of a
/// frame on a higher stream ID. If these queues were not ordered by stream
/// IDs, some mechanism would be necessary to ensure that the lowest-numbered
/// idle stream is opened first.
#[derive(Debug)]
pub(super) struct Prioritize {
    /// Queue of streams waiting for socket capacity to send a frame.
    pending_send: store::Queue<stream::NextSend>,

    /// Queue of streams waiting for window capacity to produce data.
    pending_capacity: store::Queue<stream::NextSendCapacity>,

    /// Streams waiting for capacity due to max concurrency
    ///
    /// The `SendRequest` handle is `Clone`. This enables initiating requests
    /// from many tasks. However, offering this capability while supporting
    /// backpressure at some level is tricky. If there are many `SendRequest`
    /// handles and a single stream becomes available, which handle gets
    /// assigned that stream? Maybe that handle is no longer ready to send a
    /// request.
    ///
    /// The strategy used is to allow each `SendRequest` handle one buffered
    /// request. A `SendRequest` handle is ready to send a request if it has no
    /// associated buffered requests. This is the same strategy as `mpsc` in the
    /// futures library.
    pending_open: store::Queue<stream::NextOpen>,

    /// Connection level flow control governing sent data
    flow: FlowControl,

    /// Stream ID of the last stream opened.
    last_opened_id: StreamId,

    /// What `DATA` frame is currently being sent in the codec.
    in_flight_data_frame: InFlightData,

    /// The maximum number of bytes a stream should buffer.
    max_buffer_size: usize,
}

#[derive(Debug, Eq, PartialEq)]
enum InFlightData {
    /// There is no `DATA` frame in flight.
    Nothing,
    /// There is a `DATA` frame in flight belonging to the given stream.
    DataFrame(store::Key),
    /// There was a `DATA` frame, but the stream's queue was since cleared.
    Drop,
}

pub(crate) struct Prioritized<B> {
    // The buffer
    inner: Take<B>,

    end_of_stream: bool,

    // The stream that this is associated with
    stream: store::Key,
}

// ===== impl Prioritize =====

impl Prioritize {
    pub fn new(config: &Config) -> Prioritize {
        let mut flow = FlowControl::new();

        flow.inc_window(config.remote_init_window_sz)
            .expect("invalid initial window size");

        // TODO: proper error handling
        let _res = flow.assign_capacity(config.remote_init_window_sz);
        debug_assert!(_res.is_ok());

        tracing::trace!("Prioritize::new; flow={:?}", flow);

        Prioritize {
            pending_send: store::Queue::new(),
            pending_capacity: store::Queue::new(),
            pending_open: store::Queue::new(),
            flow,
            last_opened_id: StreamId::ZERO,
            in_flight_data_frame: InFlightData::Nothing,
            max_buffer_size: config.local_max_buffer_size,
        }
    }

    pub(crate) fn max_buffer_size(&self) -> usize {
        self.max_buffer_size
    }

    /// Queue a frame to be sent to the remote
    pub fn queue_frame<B>(
        &mut self,
        frame: Frame<B>,
        buffer: &mut Buffer<Frame<B>>,
        stream: &mut store::Ptr,
        task: &mut Option<Waker>,
    ) {
        let span = tracing::trace_span!("Prioritize::queue_frame", ?stream.id);
        let _e = span.enter();
        // Queue the frame in the buffer
        stream.pending_send.push_back(buffer, frame);
        self.schedule_send(stream, task);
    }

    pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
        // If the stream is waiting to be opened, nothing more to do.
        if stream.is_send_ready() {
            tracing::trace!(?stream.id, "schedule_send");
            // Queue the stream
            self.pending_send.push(stream);

            // Notify the connection.
            if let Some(task) = task.take() {
                task.wake();
            }
        }
    }

    pub fn queue_open(&mut self, stream: &mut store::Ptr) {
        self.pending_open.push(stream);
    }

    /// Send a data frame
    pub fn send_data<B>(
        &mut self,
        frame: frame::Data<B>,
        buffer: &mut Buffer<Frame<B>>,
        stream: &mut store::Ptr,
        counts: &mut Counts,
        task: &mut Option<Waker>,
    ) -> Result<(), UserError>
    where
        B: Buf,
    {
        let sz = frame.payload().remaining();

        if sz > MAX_WINDOW_SIZE as usize {
            return Err(UserError::PayloadTooBig);
        }

        let sz = sz as WindowSize;

        if !stream.state.is_send_streaming() {
            if stream.state.is_closed() {
                return Err(InactiveStreamId);
            } else {
                return Err(UnexpectedFrameType);
            }
        }

        // Update the buffered data counter
        stream.buffered_send_data += sz as usize;

        let span =
            tracing::trace_span!("send_data", sz, requested = stream.requested_send_capacity);
        let _e = span.enter();
        tracing::trace!(buffered = stream.buffered_send_data);

        // Implicitly request more send capacity if not enough has been
        // requested yet.
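        //
        // As an illustrative (non-normative) example: if 10_000 bytes were
        // already buffered and this frame adds 5_000 more, `buffered_send_data`
        // becomes 15_000; if only 10_000 bytes of capacity had been requested
        // so far, the request below is raised to 15_000 (capped at
        // `WindowSize::MAX`).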
        if (stream.requested_send_capacity as usize) < stream.buffered_send_data {
            // Update the target requested capacity
            stream.requested_send_capacity =
                cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize;

            // `try_assign_capacity` will queue the stream to `pending_capacity` if the capacity
            // cannot be assigned at the time it is called.
            //
            // Streams over the max concurrent count will still call `send_data`, so we should be
            // careful not to put them into `pending_capacity`, as that would starve connection
            // capacity needed by other streams.
            if !stream.is_pending_open {
                self.try_assign_capacity(stream);
            }
        }

        if frame.is_end_stream() {
            stream.state.send_close();
            self.reserve_capacity(0, stream, counts);
        }

        tracing::trace!(
            available = %stream.send_flow.available(),
            buffered = stream.buffered_send_data,
        );

        // The `stream.buffered_send_data == 0` check is here so that, if a zero
        // length data frame is queued to the front (there is no previously
        // queued data), it gets sent out immediately even if there is no
        // available send window.
        //
        // Sending out zero length data frames can be done to signal
        // end-of-stream.
        //
        if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 {
            // The stream currently has capacity to send the data frame, so
            // queue it up and notify the connection task.
            self.queue_frame(frame.into(), buffer, stream, task);
        } else {
            // The stream has no capacity to send the frame now, save it but
            // don't notify the connection task. Once additional capacity
            // becomes available, the frame will be flushed.
            stream.pending_send.push_back(buffer, frame.into());
        }

        Ok(())
    }

    /// Request capacity to send data
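    ///
    /// The effective request is `capacity` plus whatever is already buffered on
    /// the stream. As an illustrative example, reserving 4_000 bytes while
    /// 6_000 bytes are still buffered targets 10_000 bytes of send capacity in
    /// total.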
    pub fn reserve_capacity(
        &mut self,
        capacity: WindowSize,
        stream: &mut store::Ptr,
        counts: &mut Counts,
    ) {
        let span = tracing::trace_span!(
            "reserve_capacity",
            ?stream.id,
            requested = capacity,
            effective = (capacity as usize) + stream.buffered_send_data,
            curr = stream.requested_send_capacity
        );
        let _e = span.enter();

        // Actual capacity is `capacity` + the current amount of buffered data.
        // If it were less, then we could never send out the buffered data.
        let capacity = (capacity as usize) + stream.buffered_send_data;

        match capacity.cmp(&(stream.requested_send_capacity as usize)) {
            Ordering::Equal => {
                // Nothing to do
            }
            Ordering::Less => {
                // Update the target requested capacity
                stream.requested_send_capacity = capacity as WindowSize;

                // Currently available capacity assigned to the stream
                let available = stream.send_flow.available().as_size();

                // If the stream has more assigned capacity than requested, reclaim
                // some for the connection
                if available as usize > capacity {
                    let diff = available - capacity as WindowSize;

                    // TODO: proper error handling
                    let _res = stream.send_flow.claim_capacity(diff);
                    debug_assert!(_res.is_ok());

                    self.assign_connection_capacity(diff, stream, counts);
                }
            }
            Ordering::Greater => {
                // If trying to *add* capacity, but the stream send side is closed,
                // there's nothing to be done.
                if stream.state.is_send_closed() {
                    return;
                }

                // Update the target requested capacity
                stream.requested_send_capacity =
                    cmp::min(capacity, WindowSize::MAX as usize) as WindowSize;

                // Try to assign additional capacity to the stream. If none is
                // currently available, the stream will be queued to receive some
                // when more becomes available.
                self.try_assign_capacity(stream);
            }
        }
    }

    pub fn recv_stream_window_update(
        &mut self,
        inc: WindowSize,
        stream: &mut store::Ptr,
    ) -> Result<(), Reason> {
        let span = tracing::trace_span!(
            "recv_stream_window_update",
            ?stream.id,
            ?stream.state,
            inc,
            flow = ?stream.send_flow
        );
        let _e = span.enter();

        if stream.state.is_send_closed() && stream.buffered_send_data == 0 {
            // We can't send any data, so don't bother doing anything else.
            return Ok(());
        }

        // Update the stream level flow control.
        stream.send_flow.inc_window(inc)?;

        // If the stream is waiting on additional capacity, then this will
        // assign it (if available on the connection) and notify the producer
        self.try_assign_capacity(stream);

        Ok(())
    }

    pub fn recv_connection_window_update(
        &mut self,
        inc: WindowSize,
        store: &mut Store,
        counts: &mut Counts,
    ) -> Result<(), Reason> {
        // Update the connection's window
        self.flow.inc_window(inc)?;

        self.assign_connection_capacity(inc, store, counts);
        Ok(())
    }

    /// Reclaim all capacity assigned to the stream and re-assign it to the
    /// connection
    pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
        let available = stream.send_flow.available().as_size();
        if available > 0 {
            // TODO: proper error handling
            let _res = stream.send_flow.claim_capacity(available);
            debug_assert!(_res.is_ok());
            // Re-assign all capacity to the connection
            self.assign_connection_capacity(available, stream, counts);
        }
    }

    /// Reclaim just reserved capacity, not buffered capacity, and re-assign
    /// it to the connection
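    ///
    /// As an illustrative example, if 10_000 bytes are available to the stream
    /// but 3_000 bytes are still buffered, the 7_000-byte difference is
    /// returned to the connection.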
    pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
        // only reclaim reserved capacity that isn't already buffered
        if stream.send_flow.available().as_size() as usize > stream.buffered_send_data {
            let reserved =
                stream.send_flow.available().as_size() - stream.buffered_send_data as WindowSize;

            // Panic safety: due to how `reserved` is computed it can't be greater
            // than what's available.
            stream
                .send_flow
                .claim_capacity(reserved)
                .expect("window size should be greater than reserved");

            self.assign_connection_capacity(reserved, stream, counts);
        }
    }

    pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) {
        let span = tracing::trace_span!("clear_pending_capacity");
        let _e = span.enter();
        while let Some(stream) = self.pending_capacity.pop(store) {
            counts.transition(stream, |_, stream| {
                tracing::trace!(?stream.id, "clear_pending_capacity");
            })
        }
    }

    pub fn assign_connection_capacity<R>(
        &mut self,
        inc: WindowSize,
        store: &mut R,
        counts: &mut Counts,
    ) where
        R: Resolve,
    {
        let span = tracing::trace_span!("assign_connection_capacity", inc);
        let _e = span.enter();

        // TODO: proper error handling
        let _res = self.flow.assign_capacity(inc);
        debug_assert!(_res.is_ok());

        // Assign newly acquired capacity to streams pending capacity.
        while self.flow.available() > 0 {
            let stream = match self.pending_capacity.pop(store) {
                Some(stream) => stream,
                None => return,
            };

            // Streams pending capacity may have been reset before capacity
            // became available. In that case, the stream won't want any
            // capacity, and so we shouldn't "transition" on it, but just evict
            // it and continue the loop.
            if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) {
                continue;
            }

            counts.transition(stream, |_, stream| {
                // Try to assign capacity to the stream. This will also re-queue the
                // stream if there isn't enough connection level capacity to fulfill
                // the capacity request.
                self.try_assign_capacity(stream);
            })
        }
    }

    /// Try to assign available connection-level capacity to the stream
    fn try_assign_capacity(&mut self, stream: &mut store::Ptr) {
        let total_requested = stream.requested_send_capacity;

        // Total requested should never go below actual assigned
        // (Note: the window size can go lower than assigned)
        debug_assert!(stream.send_flow.available() <= total_requested as usize);

        // The amount of additional capacity that the stream requests.
        // Don't assign more than the window has available!
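        //
        // As an illustrative example: with 100 bytes requested, 30 bytes
        // already assigned, and a 60-byte stream window, at most
        // min(100 - 30, 60 - 30) = 30 additional bytes can be assigned here.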
        let additional = cmp::min(
            total_requested - stream.send_flow.available().as_size(),
            // Can't assign more than what is available
            stream.send_flow.window_size() - stream.send_flow.available().as_size(),
        );
        let span = tracing::trace_span!("try_assign_capacity", ?stream.id);
        let _e = span.enter();
        tracing::trace!(
            requested = total_requested,
            additional,
            buffered = stream.buffered_send_data,
            window = stream.send_flow.window_size(),
            conn = %self.flow.available()
        );

        if additional == 0 {
            // Nothing more to do
            return;
        }

        // If the stream has requested capacity, then it must be in the
        // streaming state (more data could be sent) or there is buffered data
        // waiting to be sent.
        debug_assert!(
            stream.state.is_send_streaming() || stream.buffered_send_data > 0,
            "state={:?}",
            stream.state
        );

        // The amount of currently available capacity on the connection
        let conn_available = self.flow.available().as_size();

        // First check if capacity is immediately available
        if conn_available > 0 {
            // The amount of capacity to assign to the stream
            // TODO: Should prioritization factor into this?
            let assign = cmp::min(conn_available, additional);

            tracing::trace!(capacity = assign, "assigning");

            // Assign the capacity to the stream
            stream.assign_capacity(assign, self.max_buffer_size);

            // Claim the capacity from the connection
            // TODO: proper error handling
            let _res = self.flow.claim_capacity(assign);
            debug_assert!(_res.is_ok());
        }

        tracing::trace!(
            available = %stream.send_flow.available(),
            requested = stream.requested_send_capacity,
            buffered = stream.buffered_send_data,
            has_unavailable = %stream.send_flow.has_unavailable()
        );

        if stream.send_flow.available() < stream.requested_send_capacity as usize
            && stream.send_flow.has_unavailable()
        {
            // The stream requires additional capacity and the stream's
            // window has available capacity, but the connection window
            // does not.
            //
            // In this case, the stream needs to be queued up for when the
            // connection has more capacity.
            self.pending_capacity.push(stream);
        }

        // If data is buffered and the stream is send ready, then
        // schedule the stream for execution
        if stream.buffered_send_data > 0 && stream.is_send_ready() {
            // TODO: This assertion isn't *exactly* correct. There can still be
            // buffered send data while the stream's pending send queue is
            // empty. This can happen when a large data frame is in the process
            // of being **partially** sent. Once the window has been sent, the
            // data frame will be returned to the prioritization layer to be
            // re-scheduled.
            //
            // That said, it would be nice to figure out how to make this
            // assertion correct.
            //
            // debug_assert!(!stream.pending_send.is_empty());

            self.pending_send.push(stream);
        }
    }

    pub fn poll_complete<T, B>(
        &mut self,
        cx: &mut Context,
        buffer: &mut Buffer<Frame<B>>,
        store: &mut Store,
        counts: &mut Counts,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        // Ensure codec is ready
        ready!(dst.poll_ready(cx))?;

        // Reclaim any frame that has previously been written
        self.reclaim_frame(buffer, store, dst);

        // The max frame length
        let max_frame_len = dst.max_send_frame_size();

        tracing::trace!("poll_complete");

        loop {
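            // On each iteration: promote a pending-open stream if the
            // concurrency limit allows it, pop the next sendable frame, hand
            // it to the codec, and reclaim any partially written DATA frame.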
            if let Some(mut stream) = self.pop_pending_open(store, counts) {
                self.pending_send.push_front(&mut stream);
                self.try_assign_capacity(&mut stream);
            }

            match self.pop_frame(buffer, store, max_frame_len, counts) {
                Some(frame) => {
                    tracing::trace!(?frame, "writing");

                    debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing);
                    if let Frame::Data(ref frame) = frame {
                        self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream);
                    }
                    dst.buffer(frame).expect("invalid frame");

                    // Ensure the codec is ready to try the loop again.
                    ready!(dst.poll_ready(cx))?;

                    // Always try to reclaim; the frame just buffered may have
                    // been written only partially.
                    self.reclaim_frame(buffer, store, dst);
                }
                None => {
                    // Try to flush the codec.
                    ready!(dst.flush(cx))?;

                    // This might release a data frame...
                    if !self.reclaim_frame(buffer, store, dst) {
                        return Poll::Ready(Ok(()));
                    }

                    // No need to poll ready as poll_complete() does this for
                    // us...
                }
            }
        }
    }

    /// Tries to reclaim a pending data frame from the codec.
    ///
    /// Returns true if a frame was reclaimed.
    ///
    /// When a data frame is written to the codec, it may not be written in its
    /// entirety (large chunks are split up into potentially many data frames).
    /// In this case, the stream needs to be reprioritized.
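    ///
    /// The unwritten remainder is pushed back to the front of the stream's
    /// send queue (see `push_back_frame`) so it is retried before any newer
    /// frames for that stream.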
    fn reclaim_frame<T, B>(
        &mut self,
        buffer: &mut Buffer<Frame<B>>,
        store: &mut Store,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> bool
    where
        B: Buf,
    {
        let span = tracing::trace_span!("try_reclaim_frame");
        let _e = span.enter();

        // First check if there are any data chunks to take back
        if let Some(frame) = dst.take_last_data_frame() {
            self.reclaim_frame_inner(buffer, store, frame)
        } else {
            false
        }
    }

    fn reclaim_frame_inner<B>(
        &mut self,
        buffer: &mut Buffer<Frame<B>>,
        store: &mut Store,
        frame: frame::Data<Prioritized<B>>,
    ) -> bool
    where
        B: Buf,
    {
        tracing::trace!(
            ?frame,
            sz = frame.payload().inner.get_ref().remaining(),
            "reclaimed"
        );

        let mut eos = false;
        let key = frame.payload().stream;

        match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) {
            InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"),
            InFlightData::Drop => {
                tracing::trace!("not reclaiming frame for cancelled stream");
                return false;
            }
            InFlightData::DataFrame(k) => {
                debug_assert_eq!(k, key);
            }
        }

        let mut frame = frame.map(|prioritized| {
            // TODO: Ensure fully written
            eos = prioritized.end_of_stream;
            prioritized.inner.into_inner()
        });

        if frame.payload().has_remaining() {
            let mut stream = store.resolve(key);

            if eos {
                frame.set_end_stream(true);
            }

            self.push_back_frame(frame.into(), buffer, &mut stream);

            return true;
        }

        false
    }

    /// Push the frame to the front of the stream's deque, scheduling the
    /// stream if needed.
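    ///
    /// The frame goes to the *front* because it was popped from the front when
    /// it was handed to the codec; requeueing it there keeps the stream's
    /// frames in order.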
    fn push_back_frame<B>(
        &mut self,
        frame: Frame<B>,
        buffer: &mut Buffer<Frame<B>>,
        stream: &mut store::Ptr,
    ) {
        // Push the frame to the front of the stream's deque
        stream.pending_send.push_front(buffer, frame);

        // If needed, schedule the sender
        if stream.send_flow.available() > 0 {
            debug_assert!(!stream.pending_send.is_empty());
            self.pending_send.push(stream);
        }
    }

    pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) {
        let span = tracing::trace_span!("clear_queue", ?stream.id);
        let _e = span.enter();

        // TODO: make this more efficient?
        while let Some(frame) = stream.pending_send.pop_front(buffer) {
            tracing::trace!(?frame, "dropping");
        }

        stream.buffered_send_data = 0;
        stream.requested_send_capacity = 0;
        if let InFlightData::DataFrame(key) = self.in_flight_data_frame {
            if stream.key() == key {
                // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed.
                self.in_flight_data_frame = InFlightData::Drop;
            }
        }
    }

    pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) {
        while let Some(mut stream) = self.pending_send.pop(store) {
            let is_pending_reset = stream.is_pending_reset_expiration();
            if let Some(reason) = stream.state.get_scheduled_reset() {
                stream.set_reset(reason, Initiator::Library);
            }
            counts.transition_after(stream, is_pending_reset);
        }
    }

    pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
        while let Some(stream) = self.pending_open.pop(store) {
            let is_pending_reset = stream.is_pending_reset_expiration();
            counts.transition_after(stream, is_pending_reset);
        }
    }

    fn pop_frame<B>(
        &mut self,
        buffer: &mut Buffer<Frame<B>>,
        store: &mut Store,
        max_len: usize,
        counts: &mut Counts,
    ) -> Option<Frame<Prioritized<B>>>
    where
        B: Buf,
    {
        let span = tracing::trace_span!("pop_frame");
        let _e = span.enter();

        loop {
            match self.pending_send.pop(store) {
                Some(mut stream) => {
                    let span = tracing::trace_span!("popped", ?stream.id, ?stream.state);
                    let _e = span.enter();

                    // It's possible that this stream, besides having data to send,
                    // is also queued to send a reset, and thus is already in the queue
                    // to wait for "some time" after a reset.
                    //
                    // To be safe, we just always ask the stream.
                    let is_pending_reset = stream.is_pending_reset_expiration();

                    tracing::trace!(is_pending_reset);

                    let frame = match stream.pending_send.pop_front(buffer) {
                        Some(Frame::Data(mut frame)) => {
                            // Get the amount of capacity remaining for stream's
                            // window.
                            let stream_capacity = stream.send_flow.available();
                            let sz = frame.payload().remaining();

                            tracing::trace!(
                                sz,
                                eos = frame.is_end_stream(),
                                window = %stream_capacity,
                                available = %stream.send_flow.available(),
                                requested = stream.requested_send_capacity,
                                buffered = stream.buffered_send_data,
                                "data frame"
                            );

                            // Zero length data frames always have capacity to
                            // be sent.
                            if sz > 0 && stream_capacity == 0 {
                                tracing::trace!("stream capacity is 0");

                                // Ensure that the stream is waiting for
                                // connection level capacity
                                //
                                // TODO: uncomment
                                // debug_assert!(stream.is_pending_send_capacity);

                                // The stream has no more capacity, this can
                                // happen if the remote reduced the stream
                                // window. In this case, we need to buffer the
                                // frame and wait for a window update...
                                stream.pending_send.push_front(buffer, frame.into());

                                continue;
                            }

                            // Only send up to the max frame length
                            let len = cmp::min(sz, max_len);

                            // Only send up to the stream's window capacity
                            let len =
                                cmp::min(len, stream_capacity.as_size() as usize) as WindowSize;
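                            // As an illustrative example: a 100_000-byte
                            // payload with a 16_384-byte max frame size and
                            // ample window is sent 16_384 bytes at a time; the
                            // unwritten remainder is reclaimed and requeued
                            // after each write.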

                            // There *must* be enough connection level
                            // capacity at this point.
                            debug_assert!(len <= self.flow.window_size());

                            // Check the stream-level window as the peer sees it. In some
                            // scenarios the capacity we track locally may be available while
                            // the window the peer knows about is not.
                            if len > 0 && len > stream.send_flow.window_size() {
                                stream.pending_send.push_front(buffer, frame.into());
                                continue;
                            }

                            tracing::trace!(len, "sending data frame");

                            // Update the flow control
                            tracing::trace_span!("updating stream flow").in_scope(|| {
                                stream.send_data(len, self.max_buffer_size);

                                // Assign the capacity back to the connection that
                                // was just consumed from the stream in the previous
                                // line.
                                // TODO: proper error handling
                                let _res = self.flow.assign_capacity(len);
                                debug_assert!(_res.is_ok());
                            });

                            let (eos, len) = tracing::trace_span!("updating connection flow")
                                .in_scope(|| {
                                    // TODO: proper error handling
                                    let _res = self.flow.send_data(len);
                                    debug_assert!(_res.is_ok());

                                    // Wrap the frame's data payload to ensure that the
                                    // correct amount of data gets written.

                                    let eos = frame.is_end_stream();
                                    let len = len as usize;

                                    if frame.payload().remaining() > len {
                                        frame.set_end_stream(false);
                                    }
                                    (eos, len)
                                });

                            Frame::Data(frame.map(|buf| Prioritized {
                                inner: buf.take(len),
                                end_of_stream: eos,
                                stream: stream.key(),
                            }))
                        }
                        Some(Frame::PushPromise(pp)) => {
                            let mut pushed =
                                stream.store_mut().find_mut(&pp.promised_id()).unwrap();
                            pushed.is_pending_push = false;
                            // Transition stream from pending_push to pending_open
                            // if possible
                            if !pushed.pending_send.is_empty() {
                                if counts.can_inc_num_send_streams() {
                                    counts.inc_num_send_streams(&mut pushed);
                                    self.pending_send.push(&mut pushed);
                                } else {
                                    self.queue_open(&mut pushed);
                                }
                            }
                            Frame::PushPromise(pp)
                        }
                        Some(frame) => frame.map(|_| {
                            unreachable!(
                                "Frame::map closure will only be called \
                                 on DATA frames."
                            )
                        }),
                        None => {
                            if let Some(reason) = stream.state.get_scheduled_reset() {
                                stream.set_reset(reason, Initiator::Library);

                                let frame = frame::Reset::new(stream.id, reason);
                                Frame::Reset(frame)
                            } else {
                                // If the stream receives a RESET from the peer, it may have
                                // had data buffered to be sent, but all the frames are cleared
                                // in clear_queue(). Instead of doing an O(N) traversal through
                                // the queue to remove them, let's just ignore the stream here.
                                tracing::trace!("removing dangling stream from pending_send");
                                // Since this should only happen as a consequence of `clear_queue`,
                                // we must be in a closed state of some kind.
                                debug_assert!(stream.state.is_closed());
                                counts.transition_after(stream, is_pending_reset);
                                continue;
                            }
                        }
                    };

                    tracing::trace!("pop_frame; frame={:?}", frame);

                    if cfg!(debug_assertions) && stream.state.is_idle() {
                        debug_assert!(stream.id > self.last_opened_id);
                        self.last_opened_id = stream.id;
                    }

                    if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() {
                        // TODO: Only requeue the sender IF it is ready to send
                        // the next frame. i.e. don't requeue it if the next
                        // frame is a data frame and the stream does not have
                        // any more capacity.
                        self.pending_send.push(&mut stream);
                    }

                    counts.transition_after(stream, is_pending_reset);

                    return Some(frame);
                }
                None => return None,
            }
        }
    }

    fn pop_pending_open<'s>(
        &mut self,
        store: &'s mut Store,
        counts: &mut Counts,
    ) -> Option<store::Ptr<'s>> {
        tracing::trace!("schedule_pending_open");
        // check for any pending open streams
        if counts.can_inc_num_send_streams() {
            if let Some(mut stream) = self.pending_open.pop(store) {
                tracing::trace!("schedule_pending_open; stream={:?}", stream.id);

                counts.inc_num_send_streams(&mut stream);
                stream.notify_send();
                return Some(stream);
            }
        }

        None
    }
}

// ===== impl Prioritized =====

impl<B> Buf for Prioritized<B>
where
    B: Buf,
{
    fn remaining(&self) -> usize {
        self.inner.remaining()
    }

    fn chunk(&self) -> &[u8] {
        self.inner.chunk()
    }

    fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize {
        self.inner.chunks_vectored(dst)
    }

    fn advance(&mut self, cnt: usize) {
        self.inner.advance(cnt)
    }
}

impl<B: Buf> fmt::Debug for Prioritized<B> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("Prioritized")
            .field("remaining", &self.inner.get_ref().remaining())
            .field("end_of_stream", &self.end_of_stream)
            .field("stream", &self.stream)
            .finish()
    }
}