h2/proto/streams/
prioritize.rs

1use super::store::Resolve;
2use super::*;
3
4use crate::frame::Reason;
5
6use crate::codec::UserError;
7use crate::codec::UserError::*;
8
9use bytes::buf::Take;
10use std::{
11    cmp::{self, Ordering},
12    fmt, io, mem,
13    task::{Context, Poll, Waker},
14};
15
/// Send-side scheduling state for a connection: the queues of streams that
/// are waiting to send, waiting for capacity, or waiting to be opened,
/// together with the connection-level send flow control.
///
/// # Warning
///
/// Queued streams are ordered by stream ID, as we need to ensure that
/// lower-numbered streams are sent headers before higher-numbered ones.
/// This is because "idle" stream IDs – those which have been initiated but
/// have yet to receive frames – will be implicitly closed on receipt of a
/// frame on a higher stream ID. If these queues were not ordered by stream
/// ID, some mechanism would be necessary to ensure that the lowest-numbered
/// idle stream is opened first.
#[derive(Debug)]
pub(super) struct Prioritize {
    /// Queue of streams waiting for socket capacity to send a frame.
    pending_send: store::Queue<stream::NextSend>,

    /// Queue of streams waiting for window capacity to produce data.
    pending_capacity: store::Queue<stream::NextSendCapacity>,

    /// Streams waiting for capacity due to max concurrency
    ///
    /// The `SendRequest` handle is `Clone`. This enables initiating requests
    /// from many tasks. However, offering this capability while supporting
    /// backpressure at some level is tricky. If there are many `SendRequest`
    /// handles and a single stream becomes available, which handle gets
    /// assigned that stream? Maybe that handle is no longer ready to send a
    /// request.
    ///
    /// The strategy used is to allow each `SendRequest` handle one buffered
    /// request. A `SendRequest` handle is ready to send a request if it has no
    /// associated buffered requests. This is the same strategy as `mpsc` in the
    /// futures library.
    pending_open: store::Queue<stream::NextOpen>,

    /// Connection level flow control governing sent data
    flow: FlowControl,

    /// Stream ID of the last stream opened.
    last_opened_id: StreamId,

    /// What `DATA` frame is currently being sent in the codec.
    in_flight_data_frame: InFlightData,

    /// The maximum amount of bytes a stream should buffer.
    max_buffer_size: usize,
}
60
/// Tracks which stream, if any, owns the `DATA` frame that has been handed to
/// the codec and may still need to be reclaimed.
#[derive(Debug, Eq, PartialEq)]
enum InFlightData {
    /// There is no `DATA` frame in flight.
    Nothing,
    /// There is a `DATA` frame in flight belonging to the given stream.
    DataFrame(store::Key),
    /// There was a `DATA` frame, but the stream's queue was since cleared.
    ///
    /// A frame reclaimed in this state is discarded rather than pushed back
    /// onto the stream.
    Drop,
}
70
/// A `DATA` payload as handed to the codec: the user's buffer limited to the
/// number of bytes that may be written for this frame, plus the bookkeeping
/// needed to reclaim whatever was left unwritten.
pub(crate) struct Prioritized<B> {
    /// The buffer, capped (via `Take`) at the length chosen for this frame.
    inner: Take<B>,

    /// Whether the frame this payload was taken from was marked end-of-stream.
    /// Used to restore the flag if the remaining payload is reclaimed.
    end_of_stream: bool,

    /// The stream that this is associated with
    stream: store::Key,
}
80
81// ===== impl Prioritize =====
82
83impl Prioritize {
84    pub fn new(config: &Config) -> Prioritize {
85        let mut flow = FlowControl::new();
86
87        flow.inc_window(config.remote_init_window_sz)
88            .expect("invalid initial window size");
89
90        // TODO: proper error handling
91        let _res = flow.assign_capacity(config.remote_init_window_sz);
92        debug_assert!(_res.is_ok());
93
94        tracing::trace!("Prioritize::new; flow={:?}", flow);
95
96        Prioritize {
97            pending_send: store::Queue::new(),
98            pending_capacity: store::Queue::new(),
99            pending_open: store::Queue::new(),
100            flow,
101            last_opened_id: StreamId::ZERO,
102            in_flight_data_frame: InFlightData::Nothing,
103            max_buffer_size: config.local_max_buffer_size,
104        }
105    }
106
    /// Returns the maximum number of bytes a stream should buffer, as
    /// configured when this `Prioritize` was created.
    pub(crate) fn max_buffer_size(&self) -> usize {
        self.max_buffer_size
    }
110
111    /// Queue a frame to be sent to the remote
112    pub fn queue_frame<B>(
113        &mut self,
114        frame: Frame<B>,
115        buffer: &mut Buffer<Frame<B>>,
116        stream: &mut store::Ptr,
117        task: &mut Option<Waker>,
118    ) {
119        let span = tracing::trace_span!("Prioritize::queue_frame", ?stream.id);
120        let _e = span.enter();
121        // Queue the frame in the buffer
122        stream.pending_send.push_back(buffer, frame);
123        self.schedule_send(stream, task);
124    }
125
126    pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
127        // If the stream is waiting to be opened, nothing more to do.
128        if stream.is_send_ready() {
129            tracing::trace!(?stream.id, "schedule_send");
130            // Queue the stream
131            self.pending_send.push(stream);
132
133            // Notify the connection.
134            if let Some(task) = task.take() {
135                task.wake();
136            }
137        }
138    }
139
    /// Add `stream` to the queue of streams waiting to be opened.
    pub fn queue_open(&mut self, stream: &mut store::Ptr) {
        self.pending_open.push(stream);
    }
143
    /// Send a data frame
    ///
    /// Buffers `frame` on `stream`, implicitly raising the stream's requested
    /// send capacity so that it covers all buffered data. The stream is
    /// scheduled for sending right away when it has capacity available, or
    /// when nothing is buffered (so a zero-length frame is not held back);
    /// otherwise the frame is only stored until capacity arrives.
    ///
    /// # Errors
    ///
    /// * `PayloadTooBig` if the payload is larger than `MAX_WINDOW_SIZE`.
    /// * `InactiveStreamId` if the stream is not send-streaming and is closed.
    /// * `UnexpectedFrameType` if the stream is not send-streaming for any
    ///   other reason.
    pub fn send_data<B>(
        &mut self,
        frame: frame::Data<B>,
        buffer: &mut Buffer<Frame<B>>,
        stream: &mut store::Ptr,
        counts: &mut Counts,
        task: &mut Option<Waker>,
    ) -> Result<(), UserError>
    where
        B: Buf,
    {
        let sz = frame.payload().remaining();

        // Reject payloads that cannot be represented as a window size.
        if sz > MAX_WINDOW_SIZE as usize {
            return Err(UserError::PayloadTooBig);
        }

        let sz = sz as WindowSize;

        if !stream.state.is_send_streaming() {
            if stream.state.is_closed() {
                return Err(InactiveStreamId);
            } else {
                return Err(UnexpectedFrameType);
            }
        }

        // Update the buffered data counter
        stream.buffered_send_data += sz as usize;

        let span =
            tracing::trace_span!("send_data", sz, requested = stream.requested_send_capacity);
        let _e = span.enter();
        tracing::trace!(buffered = stream.buffered_send_data);

        // Implicitly request more send capacity if not enough has been
        // requested yet.
        if (stream.requested_send_capacity as usize) < stream.buffered_send_data {
            // Update the target requested capacity
            stream.requested_send_capacity =
                cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize;

            // `try_assign_capacity` will queue the stream to `pending_capacity` if the capacity
            // cannot be assigned at the time it is called.
            //
            // Streams over the max concurrent count will still call `send_data` so we should be
            // careful not to put it into `pending_capacity` as it will starve the connection
            // capacity for other streams
            if !stream.is_pending_open {
                self.try_assign_capacity(stream);
            }
        }

        if frame.is_end_stream() {
            stream.state.send_close();
            // With a reservation of 0 the requested capacity becomes exactly
            // the amount of buffered data.
            self.reserve_capacity(0, stream, counts);
        }

        tracing::trace!(
            available = %stream.send_flow.available(),
            buffered = stream.buffered_send_data,
        );

        // The `stream.buffered_send_data == 0` check is here so that, if a zero
        // length data frame is queued to the front (there is no previously
        // queued data), it gets sent out immediately even if there is no
        // available send window.
        //
        // Sending out zero length data frames can be done to signal
        // end-of-stream.
        //
        if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 {
            // The stream currently has capacity to send the data frame, so
            // queue it up and notify the connection task.
            self.queue_frame(frame.into(), buffer, stream, task);
        } else {
            // The stream has no capacity to send the frame now, save it but
            // don't notify the connection task. Once additional capacity
            // becomes available, the frame will be flushed.
            stream.pending_send.push_back(buffer, frame.into());
        }

        Ok(())
    }
229
230    /// Request capacity to send data
231    pub fn reserve_capacity(
232        &mut self,
233        capacity: WindowSize,
234        stream: &mut store::Ptr,
235        counts: &mut Counts,
236    ) {
237        let span = tracing::trace_span!(
238            "reserve_capacity",
239            ?stream.id,
240            requested = capacity,
241            effective = (capacity as usize) + stream.buffered_send_data,
242            curr = stream.requested_send_capacity
243        );
244        let _e = span.enter();
245
246        // Actual capacity is `capacity` + the current amount of buffered data.
247        // If it were less, then we could never send out the buffered data.
248        let capacity = (capacity as usize) + stream.buffered_send_data;
249
250        match capacity.cmp(&(stream.requested_send_capacity as usize)) {
251            Ordering::Equal => {
252                // Nothing to do
253            }
254            Ordering::Less => {
255                // Update the target requested capacity
256                stream.requested_send_capacity = capacity as WindowSize;
257
258                // Currently available capacity assigned to the stream
259                let available = stream.send_flow.available().as_size();
260
261                // If the stream has more assigned capacity than requested, reclaim
262                // some for the connection
263                if available as usize > capacity {
264                    let diff = available - capacity as WindowSize;
265
266                    // TODO: proper error handling
267                    let _res = stream.send_flow.claim_capacity(diff);
268                    debug_assert!(_res.is_ok());
269
270                    self.assign_connection_capacity(diff, stream, counts);
271                }
272            }
273            Ordering::Greater => {
274                // If trying to *add* capacity, but the stream send side is closed,
275                // there's nothing to be done.
276                if stream.state.is_send_closed() {
277                    return;
278                }
279
280                // Update the target requested capacity
281                stream.requested_send_capacity =
282                    cmp::min(capacity, WindowSize::MAX as usize) as WindowSize;
283
284                // Try to assign additional capacity to the stream. If none is
285                // currently available, the stream will be queued to receive some
286                // when more becomes available.
287                self.try_assign_capacity(stream);
288            }
289        }
290    }
291
292    pub fn recv_stream_window_update(
293        &mut self,
294        inc: WindowSize,
295        stream: &mut store::Ptr,
296    ) -> Result<(), Reason> {
297        let span = tracing::trace_span!(
298            "recv_stream_window_update",
299            ?stream.id,
300            ?stream.state,
301            inc,
302            flow = ?stream.send_flow
303        );
304        let _e = span.enter();
305
306        if stream.state.is_send_closed() && stream.buffered_send_data == 0 {
307            // We can't send any data, so don't bother doing anything else.
308            return Ok(());
309        }
310
311        // Update the stream level flow control.
312        stream.send_flow.inc_window(inc)?;
313
314        // If the stream is waiting on additional capacity, then this will
315        // assign it (if available on the connection) and notify the producer
316        self.try_assign_capacity(stream);
317
318        Ok(())
319    }
320
    /// Handle a window update received for the connection as a whole.
    ///
    /// Grows the connection-level send window by `inc` and distributes the
    /// newly available capacity to streams queued in `pending_capacity`.
    ///
    /// # Errors
    ///
    /// Returns the `Reason` reported by the connection's flow control when the
    /// window cannot be increased by `inc`; no capacity is assigned then.
    pub fn recv_connection_window_update(
        &mut self,
        inc: WindowSize,
        store: &mut Store,
        counts: &mut Counts,
    ) -> Result<(), Reason> {
        // Update the connection's window
        self.flow.inc_window(inc)?;

        self.assign_connection_capacity(inc, store, counts);
        Ok(())
    }
333
334    /// Reclaim all capacity assigned to the stream and re-assign it to the
335    /// connection
336    pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
337        let available = stream.send_flow.available().as_size();
338        if available > 0 {
339            // TODO: proper error handling
340            let _res = stream.send_flow.claim_capacity(available);
341            debug_assert!(_res.is_ok());
342            // Re-assign all capacity to the connection
343            self.assign_connection_capacity(available, stream, counts);
344        }
345    }
346
347    /// Reclaim just reserved capacity, not buffered capacity, and re-assign
348    /// it to the connection
349    pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
350        // only reclaim reserved capacity that isn't already buffered
351        if stream.send_flow.available().as_size() as usize > stream.buffered_send_data {
352            let reserved =
353                stream.send_flow.available().as_size() - stream.buffered_send_data as WindowSize;
354
355            // Panic safety: due to how `reserved` is computed it can't be greater
356            // than what's available.
357            stream
358                .send_flow
359                .claim_capacity(reserved)
360                .expect("window size should be greater than reserved");
361
362            self.assign_connection_capacity(reserved, stream, counts);
363        }
364    }
365
366    pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) {
367        let span = tracing::trace_span!("clear_pending_capacity");
368        let _e = span.enter();
369        while let Some(stream) = self.pending_capacity.pop(store) {
370            counts.transition(stream, |_, stream| {
371                tracing::trace!(?stream.id, "clear_pending_capacity");
372            })
373        }
374    }
375
376    pub fn assign_connection_capacity<R>(
377        &mut self,
378        inc: WindowSize,
379        store: &mut R,
380        counts: &mut Counts,
381    ) where
382        R: Resolve,
383    {
384        let span = tracing::trace_span!("assign_connection_capacity", inc);
385        let _e = span.enter();
386
387        // TODO: proper error handling
388        let _res = self.flow.assign_capacity(inc);
389        debug_assert!(_res.is_ok());
390
391        // Assign newly acquired capacity to streams pending capacity.
392        while self.flow.available() > 0 {
393            let stream = match self.pending_capacity.pop(store) {
394                Some(stream) => stream,
395                None => return,
396            };
397
398            // Streams pending capacity may have been reset before capacity
399            // became available. In that case, the stream won't want any
400            // capacity, and so we shouldn't "transition" on it, but just evict
401            // it and continue the loop.
402            if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) {
403                continue;
404            }
405
406            counts.transition(stream, |_, stream| {
407                // Try to assign capacity to the stream. This will also re-queue the
408                // stream if there isn't enough connection level capacity to fulfill
409                // the capacity request.
410                self.try_assign_capacity(stream);
411            })
412        }
413    }
414
    /// Try to assign connection-level capacity to `stream`.
    ///
    /// The amount wanted is the gap between the stream's requested capacity
    /// and what it already has, bounded by the stream's own window. As much of
    /// that as the connection currently has available is moved to the stream.
    ///
    /// Afterwards, if the request is still not satisfied and the stream
    /// reports `has_unavailable()`, the stream is queued in
    /// `pending_capacity`. Independently, a stream with buffered data that is
    /// send ready is queued in `pending_send`.
    fn try_assign_capacity(&mut self, stream: &mut store::Ptr) {
        let total_requested = stream.requested_send_capacity;

        // Total requested should never go below actual assigned
        // (Note: the window size can go lower than assigned)
        debug_assert!(stream.send_flow.available() <= total_requested as usize);

        // The amount of additional capacity that the stream requests.
        // Don't assign more than the window has available!
        let additional = cmp::min(
            total_requested - stream.send_flow.available().as_size(),
            // Can't assign more than what is available
            stream.send_flow.window_size() - stream.send_flow.available().as_size(),
        );
        let span = tracing::trace_span!("try_assign_capacity", ?stream.id);
        let _e = span.enter();
        tracing::trace!(
            requested = total_requested,
            additional,
            buffered = stream.buffered_send_data,
            window = stream.send_flow.window_size(),
            conn = %self.flow.available()
        );

        if additional == 0 {
            // Nothing more to do
            return;
        }

        // If the stream has requested capacity, then it must be in the
        // streaming state (more data could be sent) or there is buffered data
        // waiting to be sent.
        debug_assert!(
            stream.state.is_send_streaming() || stream.buffered_send_data > 0,
            "state={:?}",
            stream.state
        );

        // The amount of currently available capacity on the connection
        let conn_available = self.flow.available().as_size();

        // First check if capacity is immediately available
        if conn_available > 0 {
            // The amount of capacity to assign to the stream
            // TODO: Should prioritization factor into this?
            let assign = cmp::min(conn_available, additional);

            tracing::trace!(capacity = assign, "assigning");

            // Assign the capacity to the stream
            stream.assign_capacity(assign, self.max_buffer_size);

            // Claim the capacity from the connection
            // TODO: proper error handling
            let _res = self.flow.claim_capacity(assign);
            debug_assert!(_res.is_ok());
        }

        tracing::trace!(
            available = %stream.send_flow.available(),
            requested = stream.requested_send_capacity,
            buffered = stream.buffered_send_data,
            has_unavailable = %stream.send_flow.has_unavailable()
        );

        if stream.send_flow.available() < stream.requested_send_capacity as usize
            && stream.send_flow.has_unavailable()
        {
            // The stream requires additional capacity and the stream's
            // window has available capacity, but the connection window
            // does not.
            //
            // In this case, the stream needs to be queued up for when the
            // connection has more capacity.
            self.pending_capacity.push(stream);
        }

        // If data is buffered and the stream is send ready, then
        // schedule the stream for execution
        if stream.buffered_send_data > 0 && stream.is_send_ready() {
            // TODO: This assertion isn't *exactly* correct. There can still be
            // buffered send data while the stream's pending send queue is
            // empty. This can happen when a large data frame is in the process
            // of being **partially** sent. Once the window has been sent, the
            // data frame will be returned to the prioritization layer to be
            // re-scheduled.
            //
            // That said, it would be nice to figure out how to make this
            // assertion correctly.
            //
            // debug_assert!(!stream.pending_send.is_empty());

            self.pending_send.push(stream);
        }
    }
511
    /// Write as many pending frames as possible into the codec, then flush it.
    ///
    /// After waiting for the codec to be ready and reclaiming any previously
    /// written data frame, each loop iteration:
    ///
    /// 1. moves a stream yielded by `pop_pending_open` (if any) to the front
    ///    of the send queue and tries to assign it capacity;
    /// 2. pops the next frame and buffers it in the codec, or, when there is
    ///    no frame, flushes the codec.
    ///
    /// Returns `Poll::Ready(Ok(()))` once a flush completes and no data frame
    /// with remaining payload was reclaimed afterwards. Returns
    /// `Poll::Pending` whenever the codec is not ready or the flush has not
    /// completed; I/O errors from the codec are propagated.
    ///
    /// # Panics
    ///
    /// Panics if the codec rejects a frame passed to `buffer`.
    pub fn poll_complete<T, B>(
        &mut self,
        cx: &mut Context,
        buffer: &mut Buffer<Frame<B>>,
        store: &mut Store,
        counts: &mut Counts,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        // Ensure codec is ready
        ready!(dst.poll_ready(cx))?;

        // Reclaim any frame that has previously been written
        self.reclaim_frame(buffer, store, dst);

        // The max frame length
        let max_frame_len = dst.max_send_frame_size();

        tracing::trace!("poll_complete");

        loop {
            // Put a newly opened stream at the *front* of the send queue and
            // see whether it can get capacity right away.
            if let Some(mut stream) = self.pop_pending_open(store, counts) {
                self.pending_send.push_front(&mut stream);
                self.try_assign_capacity(&mut stream);
            }

            match self.pop_frame(buffer, store, max_frame_len, counts) {
                Some(frame) => {
                    tracing::trace!(?frame, "writing");

                    // Only one data frame may be in flight at a time; remember
                    // which stream it belongs to so it can be reclaimed.
                    debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing);
                    if let Frame::Data(ref frame) = frame {
                        self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream);
                    }
                    dst.buffer(frame).expect("invalid frame");

                    // Ensure the codec is ready to try the loop again.
                    ready!(dst.poll_ready(cx))?;

                    // Because, always try to reclaim...
                    self.reclaim_frame(buffer, store, dst);
                }
                None => {
                    // Try to flush the codec.
                    ready!(dst.flush(cx))?;

                    // This might release a data frame...
                    if !self.reclaim_frame(buffer, store, dst) {
                        return Poll::Ready(Ok(()));
                    }

                    // No need to poll ready as poll_complete() does this for
                    // us...
                }
            }
        }
    }
572
573    /// Tries to reclaim a pending data frame from the codec.
574    ///
575    /// Returns true if a frame was reclaimed.
576    ///
577    /// When a data frame is written to the codec, it may not be written in its
578    /// entirety (large chunks are split up into potentially many data frames).
579    /// In this case, the stream needs to be reprioritized.
580    fn reclaim_frame<T, B>(
581        &mut self,
582        buffer: &mut Buffer<Frame<B>>,
583        store: &mut Store,
584        dst: &mut Codec<T, Prioritized<B>>,
585    ) -> bool
586    where
587        B: Buf,
588    {
589        let span = tracing::trace_span!("try_reclaim_frame");
590        let _e = span.enter();
591
592        // First check if there are any data chunks to take back
593        if let Some(frame) = dst.take_last_data_frame() {
594            self.reclaim_frame_inner(buffer, store, frame)
595        } else {
596            false
597        }
598    }
599
600    fn reclaim_frame_inner<B>(
601        &mut self,
602        buffer: &mut Buffer<Frame<B>>,
603        store: &mut Store,
604        frame: frame::Data<Prioritized<B>>,
605    ) -> bool
606    where
607        B: Buf,
608    {
609        tracing::trace!(
610            ?frame,
611            sz = frame.payload().inner.get_ref().remaining(),
612            "reclaimed"
613        );
614
615        let mut eos = false;
616        let key = frame.payload().stream;
617
618        match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) {
619            InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"),
620            InFlightData::Drop => {
621                tracing::trace!("not reclaiming frame for cancelled stream");
622                return false;
623            }
624            InFlightData::DataFrame(k) => {
625                debug_assert_eq!(k, key);
626            }
627        }
628
629        let mut frame = frame.map(|prioritized| {
630            // TODO: Ensure fully written
631            eos = prioritized.end_of_stream;
632            prioritized.inner.into_inner()
633        });
634
635        if frame.payload().has_remaining() {
636            let mut stream = store.resolve(key);
637
638            if eos {
639                frame.set_end_stream(true);
640            }
641
642            self.push_back_frame(frame.into(), buffer, &mut stream);
643
644            return true;
645        }
646
647        false
648    }
649
650    /// Push the frame to the front of the stream's deque, scheduling the
651    /// stream if needed.
652    fn push_back_frame<B>(
653        &mut self,
654        frame: Frame<B>,
655        buffer: &mut Buffer<Frame<B>>,
656        stream: &mut store::Ptr,
657    ) {
658        // Push the frame to the front of the stream's deque
659        stream.pending_send.push_front(buffer, frame);
660
661        // If needed, schedule the sender
662        if stream.send_flow.available() > 0 {
663            debug_assert!(!stream.pending_send.is_empty());
664            self.pending_send.push(stream);
665        }
666    }
667
668    pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) {
669        let span = tracing::trace_span!("clear_queue", ?stream.id);
670        let _e = span.enter();
671
672        // TODO: make this more efficient?
673        while let Some(frame) = stream.pending_send.pop_front(buffer) {
674            tracing::trace!(?frame, "dropping");
675        }
676
677        stream.buffered_send_data = 0;
678        stream.requested_send_capacity = 0;
679        if let InFlightData::DataFrame(key) = self.in_flight_data_frame {
680            if stream.key() == key {
681                // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed.
682                self.in_flight_data_frame = InFlightData::Drop;
683            }
684        }
685    }
686
687    pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) {
688        while let Some(mut stream) = self.pending_send.pop(store) {
689            let is_pending_reset = stream.is_pending_reset_expiration();
690            if let Some(reason) = stream.state.get_scheduled_reset() {
691                stream.set_reset(reason, Initiator::Library);
692            }
693            counts.transition_after(stream, is_pending_reset);
694        }
695    }
696
697    pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
698        while let Some(stream) = self.pending_open.pop(store) {
699            let is_pending_reset = stream.is_pending_reset_expiration();
700            counts.transition_after(stream, is_pending_reset);
701        }
702    }
703
704    fn pop_frame<B>(
705        &mut self,
706        buffer: &mut Buffer<Frame<B>>,
707        store: &mut Store,
708        max_len: usize,
709        counts: &mut Counts,
710    ) -> Option<Frame<Prioritized<B>>>
711    where
712        B: Buf,
713    {
714        let span = tracing::trace_span!("pop_frame");
715        let _e = span.enter();
716
717        loop {
718            match self.pending_send.pop(store) {
719                Some(mut stream) => {
720                    let span = tracing::trace_span!("popped", ?stream.id, ?stream.state);
721                    let _e = span.enter();
722
723                    // It's possible that this stream, besides having data to send,
724                    // is also queued to send a reset, and thus is already in the queue
725                    // to wait for "some time" after a reset.
726                    //
727                    // To be safe, we just always ask the stream.
728                    let is_pending_reset = stream.is_pending_reset_expiration();
729
730                    tracing::trace!(is_pending_reset);
731
732                    let frame = match stream.pending_send.pop_front(buffer) {
733                        Some(Frame::Data(mut frame)) => {
734                            // Get the amount of capacity remaining for stream's
735                            // window.
736                            let stream_capacity = stream.send_flow.available();
737                            let sz = frame.payload().remaining();
738
739                            tracing::trace!(
740                                sz,
741                                eos = frame.is_end_stream(),
742                                window = %stream_capacity,
743                                available = %stream.send_flow.available(),
744                                requested = stream.requested_send_capacity,
745                                buffered = stream.buffered_send_data,
746                                "data frame"
747                            );
748
749                            // Zero length data frames always have capacity to
750                            // be sent.
751                            if sz > 0 && stream_capacity == 0 {
752                                tracing::trace!("stream capacity is 0");
753
754                                // Ensure that the stream is waiting for
755                                // connection level capacity
756                                //
757                                // TODO: uncomment
758                                // debug_assert!(stream.is_pending_send_capacity);
759
760                                // The stream has no more capacity, this can
761                                // happen if the remote reduced the stream
762                                // window. In this case, we need to buffer the
763                                // frame and wait for a window update...
764                                stream.pending_send.push_front(buffer, frame.into());
765
766                                continue;
767                            }
768
769                            // Only send up to the max frame length
770                            let len = cmp::min(sz, max_len);
771
772                            // Only send up to the stream's window capacity
773                            let len =
774                                cmp::min(len, stream_capacity.as_size() as usize) as WindowSize;
775
776                            // There *must* be be enough connection level
777                            // capacity at this point.
778                            debug_assert!(len <= self.flow.window_size());
779
780                            // Check if the stream level window the peer knows is available. In some
781                            // scenarios, maybe the window we know is available but the window which
782                            // peer knows is not.
783                            if len > 0 && len > stream.send_flow.window_size() {
784                                stream.pending_send.push_front(buffer, frame.into());
785                                continue;
786                            }
787
788                            tracing::trace!(len, "sending data frame");
789
790                            // Update the flow control
791                            tracing::trace_span!("updating stream flow").in_scope(|| {
792                                stream.send_data(len, self.max_buffer_size);
793
794                                // Assign the capacity back to the connection that
795                                // was just consumed from the stream in the previous
796                                // line.
797                                // TODO: proper error handling
798                                let _res = self.flow.assign_capacity(len);
799                                debug_assert!(_res.is_ok());
800                            });
801
802                            let (eos, len) = tracing::trace_span!("updating connection flow")
803                                .in_scope(|| {
804                                    // TODO: proper error handling
805                                    let _res = self.flow.send_data(len);
806                                    debug_assert!(_res.is_ok());
807
808                                    // Wrap the frame's data payload to ensure that the
809                                    // correct amount of data gets written.
810
811                                    let eos = frame.is_end_stream();
812                                    let len = len as usize;
813
814                                    if frame.payload().remaining() > len {
815                                        frame.set_end_stream(false);
816                                    }
817                                    (eos, len)
818                                });
819
820                            Frame::Data(frame.map(|buf| Prioritized {
821                                inner: buf.take(len),
822                                end_of_stream: eos,
823                                stream: stream.key(),
824                            }))
825                        }
826                        Some(Frame::PushPromise(pp)) => {
827                            let mut pushed =
828                                stream.store_mut().find_mut(&pp.promised_id()).unwrap();
829                            pushed.is_pending_push = false;
830                            // Transition stream from pending_push to pending_open
831                            // if possible
832                            if !pushed.pending_send.is_empty() {
833                                if counts.can_inc_num_send_streams() {
834                                    counts.inc_num_send_streams(&mut pushed);
835                                    self.pending_send.push(&mut pushed);
836                                } else {
837                                    self.queue_open(&mut pushed);
838                                }
839                            }
840                            Frame::PushPromise(pp)
841                        }
842                        Some(frame) => frame.map(|_| {
843                            unreachable!(
844                                "Frame::map closure will only be called \
845                                 on DATA frames."
846                            )
847                        }),
848                        None => {
849                            if let Some(reason) = stream.state.get_scheduled_reset() {
850                                stream.set_reset(reason, Initiator::Library);
851
852                                let frame = frame::Reset::new(stream.id, reason);
853                                Frame::Reset(frame)
854                            } else {
855                                // If the stream receives a RESET from the peer, it may have
856                                // had data buffered to be sent, but all the frames are cleared
857                                // in clear_queue(). Instead of doing O(N) traversal through queue
858                                // to remove, lets just ignore the stream here.
859                                tracing::trace!("removing dangling stream from pending_send");
860                                // Since this should only happen as a consequence of `clear_queue`,
861                                // we must be in a closed state of some kind.
862                                debug_assert!(stream.state.is_closed());
863                                counts.transition_after(stream, is_pending_reset);
864                                continue;
865                            }
866                        }
867                    };
868
869                    tracing::trace!("pop_frame; frame={:?}", frame);
870
871                    if cfg!(debug_assertions) && stream.state.is_idle() {
872                        debug_assert!(stream.id > self.last_opened_id);
873                        self.last_opened_id = stream.id;
874                    }
875
876                    if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() {
877                        // TODO: Only requeue the sender IF it is ready to send
878                        // the next frame. i.e. don't requeue it if the next
879                        // frame is a data frame and the stream does not have
880                        // any more capacity.
881                        self.pending_send.push(&mut stream);
882                    }
883
884                    counts.transition_after(stream, is_pending_reset);
885
886                    return Some(frame);
887                }
888                None => return None,
889            }
890        }
891    }
892
893    fn pop_pending_open<'s>(
894        &mut self,
895        store: &'s mut Store,
896        counts: &mut Counts,
897    ) -> Option<store::Ptr<'s>> {
898        tracing::trace!("schedule_pending_open");
899        // check for any pending open streams
900        if counts.can_inc_num_send_streams() {
901            if let Some(mut stream) = self.pending_open.pop(store) {
902                tracing::trace!("schedule_pending_open; stream={:?}", stream.id);
903
904                counts.inc_num_send_streams(&mut stream);
905                stream.notify_send();
906                return Some(stream);
907            }
908        }
909
910        None
911    }
912}
913
914// ===== impl Prioritized =====
915
916impl<B> Buf for Prioritized<B>
917where
918    B: Buf,
919{
920    fn remaining(&self) -> usize {
921        self.inner.remaining()
922    }
923
924    fn chunk(&self) -> &[u8] {
925        self.inner.chunk()
926    }
927
928    fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize {
929        self.inner.chunks_vectored(dst)
930    }
931
932    fn advance(&mut self, cnt: usize) {
933        self.inner.advance(cnt)
934    }
935}
936
937impl<B: Buf> fmt::Debug for Prioritized<B> {
938    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
939        fmt.debug_struct("Prioritized")
940            .field("remaining", &self.inner.get_ref().remaining())
941            .field("end_of_stream", &self.end_of_stream)
942            .field("stream", &self.stream)
943            .finish()
944    }
945}