h2/proto/streams/
stream.rs

1use crate::Reason;
2
3use super::*;
4
5use std::fmt;
6use std::task::{Context, Waker};
7use std::time::Instant;
8
9/// Tracks Stream related state
10///
11/// # Reference counting
12///
13/// There can be a number of outstanding handles to a single Stream. These are
14/// tracked using reference counting. The `ref_count` field represents the
15/// number of outstanding userspace handles that can reach this stream.
16///
17/// It's important to note that when the stream is placed in an internal queue
18/// (such as an accept queue), this is **not** tracked by a reference count.
19/// Thus, `ref_count` can be zero and the stream still has to be kept around.
20pub(super) struct Stream {
21    /// The h2 stream identifier
22    pub id: StreamId,
23
24    /// Current state of the stream
25    pub state: State,
26
27    /// Set to `true` when the stream is counted against the connection's max
28    /// concurrent streams.
29    pub is_counted: bool,
30
31    /// Number of outstanding handles pointing to this stream
32    pub ref_count: usize,
33
34    // ===== Fields related to sending =====
35    /// Next node in the accept linked list
36    pub next_pending_send: Option<store::Key>,
37
38    /// Set to true when the stream is pending accept
39    pub is_pending_send: bool,
40
41    /// Send data flow control
42    pub send_flow: FlowControl,
43
44    /// Amount of send capacity that has been requested, but not yet allocated.
45    pub requested_send_capacity: WindowSize,
46
47    /// Amount of data buffered at the prioritization layer.
48    /// TODO: Technically this could be greater than the window size...
49    pub buffered_send_data: usize,
50
51    /// Task tracking additional send capacity (i.e. window updates).
52    send_task: Option<Waker>,
53
54    /// Frames pending for this stream being sent to the socket
55    pub pending_send: buffer::Deque,
56
57    /// Next node in the linked list of streams waiting for additional
58    /// connection level capacity.
59    pub next_pending_send_capacity: Option<store::Key>,
60
61    /// True if the stream is waiting for outbound connection capacity
62    pub is_pending_send_capacity: bool,
63
64    /// Set to true when the send capacity has been incremented
65    pub send_capacity_inc: bool,
66
67    /// Next node in the open linked list
68    pub next_open: Option<store::Key>,
69
70    /// Set to true when the stream is pending to be opened
71    pub is_pending_open: bool,
72
73    /// Set to true when a push is pending for this stream
74    pub is_pending_push: bool,
75
76    // ===== Fields related to receiving =====
77    /// Next node in the accept linked list
78    pub next_pending_accept: Option<store::Key>,
79
80    /// Set to true when the stream is pending accept
81    pub is_pending_accept: bool,
82
83    /// Receive data flow control
84    pub recv_flow: FlowControl,
85
86    pub in_flight_recv_data: WindowSize,
87
88    /// Next node in the linked list of streams waiting to send window updates.
89    pub next_window_update: Option<store::Key>,
90
91    /// True if the stream is waiting to send a window update
92    pub is_pending_window_update: bool,
93
94    /// The time when this stream may have been locally reset.
95    pub reset_at: Option<Instant>,
96
97    /// Next node in list of reset streams that should expire eventually
98    pub next_reset_expire: Option<store::Key>,
99
100    /// Frames pending for this stream to read
101    pub pending_recv: buffer::Deque,
102
103    /// When the RecvStream drop occurs, no data should be received.
104    pub is_recv: bool,
105
106    /// Task tracking receiving frames
107    pub recv_task: Option<Waker>,
108
109    /// Task tracking pushed promises.
110    pub push_task: Option<Waker>,
111
112    /// The stream's pending push promises
113    pub pending_push_promises: store::Queue<NextAccept>,
114
115    /// Validate content-length headers
116    pub content_length: ContentLength,
117}
118
119/// State related to validating a stream's content-length
120#[derive(Debug)]
121pub enum ContentLength {
122    Omitted,
123    Head,
124    Remaining(u64),
125}
126
127#[derive(Debug)]
128pub(super) struct NextAccept;
129
130#[derive(Debug)]
131pub(super) struct NextSend;
132
133#[derive(Debug)]
134pub(super) struct NextSendCapacity;
135
136#[derive(Debug)]
137pub(super) struct NextWindowUpdate;
138
139#[derive(Debug)]
140pub(super) struct NextOpen;
141
142#[derive(Debug)]
143pub(super) struct NextResetExpire;
144
145impl Stream {
146    pub fn new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream {
147        let mut send_flow = FlowControl::new();
148        let mut recv_flow = FlowControl::new();
149
150        recv_flow
151            .inc_window(init_recv_window)
152            .expect("invalid initial receive window");
153        // TODO: proper error handling?
154        let _res = recv_flow.assign_capacity(init_recv_window);
155        debug_assert!(_res.is_ok());
156
157        send_flow
158            .inc_window(init_send_window)
159            .expect("invalid initial send window size");
160
161        Stream {
162            id,
163            state: State::default(),
164            ref_count: 0,
165            is_counted: false,
166
167            // ===== Fields related to sending =====
168            next_pending_send: None,
169            is_pending_send: false,
170            send_flow,
171            requested_send_capacity: 0,
172            buffered_send_data: 0,
173            send_task: None,
174            pending_send: buffer::Deque::new(),
175            is_pending_send_capacity: false,
176            next_pending_send_capacity: None,
177            send_capacity_inc: false,
178            is_pending_open: false,
179            next_open: None,
180            is_pending_push: false,
181
182            // ===== Fields related to receiving =====
183            next_pending_accept: None,
184            is_pending_accept: false,
185            recv_flow,
186            in_flight_recv_data: 0,
187            next_window_update: None,
188            is_pending_window_update: false,
189            reset_at: None,
190            next_reset_expire: None,
191            pending_recv: buffer::Deque::new(),
192            is_recv: true,
193            recv_task: None,
194            push_task: None,
195            pending_push_promises: store::Queue::new(),
196            content_length: ContentLength::Omitted,
197        }
198    }
199
200    /// Increment the stream's ref count
201    pub fn ref_inc(&mut self) {
202        assert!(self.ref_count < usize::MAX);
203        self.ref_count += 1;
204    }
205
206    /// Decrements the stream's ref count
207    pub fn ref_dec(&mut self) {
208        assert!(self.ref_count > 0);
209        self.ref_count -= 1;
210    }
211
212    /// Returns true if stream is currently being held for some time because of
213    /// a local reset.
214    pub fn is_pending_reset_expiration(&self) -> bool {
215        self.reset_at.is_some()
216    }
217
218    /// Returns true if frames for this stream are ready to be sent over the wire
219    pub fn is_send_ready(&self) -> bool {
220        // Why do we check pending_open?
221        //
222        // We allow users to call send_request() which schedules a stream to be pending_open
223        // if there is no room according to the concurrency limit (max_send_streams), and we
224        // also allow data to be buffered for send with send_data() if there is no capacity for
225        // the stream to send the data, which attempts to place the stream in pending_send.
226        // If the stream is not open, we don't want the stream to be scheduled for
227        // execution (pending_send). Note that if the stream is in pending_open, it will be
228        // pushed to pending_send when there is room for an open stream.
229        //
230        // In pending_push we track whether a PushPromise still needs to be sent
231        // from a different stream before we can start sending frames on this one.
232        // This is different from the "open" check because reserved streams don't count
233        // toward the concurrency limit.
234        // See https://httpwg.org/specs/rfc7540.html#rfc.section.5.1.2
235        !self.is_pending_open && !self.is_pending_push
236    }
237
238    /// Returns true if the stream is closed
239    pub fn is_closed(&self) -> bool {
240        // The state has fully transitioned to closed.
241        self.state.is_closed() &&
242            // Because outbound frames transition the stream state before being
243            // buffered, we have to ensure that all frames have been flushed.
244            self.pending_send.is_empty() &&
245            // Sometimes large data frames are sent out in chunks. After a chunk
246            // of the frame is sent, the remainder is pushed back onto the send
247            // queue to be rescheduled.
248            //
249            // Checking for additional buffered data lets us catch this case.
250            self.buffered_send_data == 0
251    }
252
253    /// Returns true if the stream is no longer in use
254    pub fn is_released(&self) -> bool {
255        // The stream is closed and fully flushed
256        self.is_closed() &&
257            // There are no more outstanding references to the stream
258            self.ref_count == 0 &&
259            // The stream is not in any queue
260            !self.is_pending_send && !self.is_pending_send_capacity &&
261            !self.is_pending_accept && !self.is_pending_window_update &&
262            !self.is_pending_open && self.reset_at.is_none()
263    }
264
265    /// Returns true when the consumer of the stream has dropped all handles
266    /// (indicating no further interest in the stream) and the stream state is
267    /// not actually closed.
268    ///
269    /// In this case, a reset should be sent.
270    pub fn is_canceled_interest(&self) -> bool {
271        self.ref_count == 0 && !self.state.is_closed()
272    }
273
274    /// Current available stream send capacity
275    pub fn capacity(&self, max_buffer_size: usize) -> WindowSize {
276        let available = self.send_flow.available().as_size() as usize;
277        let buffered = self.buffered_send_data;
278
279        available.min(max_buffer_size).saturating_sub(buffered) as WindowSize
280    }
281
282    pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) {
283        let prev_capacity = self.capacity(max_buffer_size);
284        debug_assert!(capacity > 0);
285        // TODO: proper error handling
286        let _res = self.send_flow.assign_capacity(capacity);
287        debug_assert!(_res.is_ok());
288
289        tracing::trace!(
290            "  assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
291            self.send_flow.available(),
292            self.buffered_send_data,
293            self.id,
294            max_buffer_size,
295            prev_capacity,
296        );
297
298        if prev_capacity < self.capacity(max_buffer_size) {
299            self.notify_capacity();
300        }
301    }
302
303    pub fn send_data(&mut self, len: WindowSize, max_buffer_size: usize) {
304        let prev_capacity = self.capacity(max_buffer_size);
305
306        // TODO: proper error handling
307        let _res = self.send_flow.send_data(len);
308        debug_assert!(_res.is_ok());
309
310        // Decrement the stream's buffered data counter
311        debug_assert!(self.buffered_send_data >= len as usize);
312        self.buffered_send_data -= len as usize;
313        self.requested_send_capacity -= len;
314
315        tracing::trace!(
316            "  sent stream data; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
317            self.send_flow.available(),
318            self.buffered_send_data,
319            self.id,
320            max_buffer_size,
321            prev_capacity,
322        );
323
324        if prev_capacity < self.capacity(max_buffer_size) {
325            self.notify_capacity();
326        }
327    }
328
329    /// If the capacity was limited because of the max_send_buffer_size,
330    /// then consider waking the send task again...
331    pub fn notify_capacity(&mut self) {
332        self.send_capacity_inc = true;
333        tracing::trace!("  notifying task");
334        self.notify_send();
335    }
336
337    /// Returns `Err` when the decrement cannot be completed due to overflow.
338    pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> {
339        match self.content_length {
340            ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) {
341                Some(val) => *rem = val,
342                None => return Err(()),
343            },
344            ContentLength::Head => {
345                if len != 0 {
346                    return Err(());
347                }
348            }
349            _ => {}
350        }
351
352        Ok(())
353    }
354
355    pub fn ensure_content_length_zero(&self) -> Result<(), ()> {
356        match self.content_length {
357            ContentLength::Remaining(0) => Ok(()),
358            ContentLength::Remaining(_) => Err(()),
359            _ => Ok(()),
360        }
361    }
362
363    pub fn notify_send(&mut self) {
364        if let Some(task) = self.send_task.take() {
365            task.wake();
366        }
367    }
368
369    pub fn wait_send(&mut self, cx: &Context) {
370        self.send_task = Some(cx.waker().clone());
371    }
372
373    pub fn notify_recv(&mut self) {
374        if let Some(task) = self.recv_task.take() {
375            task.wake();
376        }
377    }
378
379    pub(super) fn notify_push(&mut self) {
380        if let Some(task) = self.push_task.take() {
381            task.wake();
382        }
383    }
384
385    /// Set the stream's state to `Closed` with the given reason and initiator.
386    /// Notify the send and receive tasks, if they exist.
387    pub(super) fn set_reset(&mut self, reason: Reason, initiator: Initiator) {
388        self.state.set_reset(self.id, reason, initiator);
389        self.notify_push();
390        self.notify_recv();
391    }
392}
393
394impl fmt::Debug for Stream {
395    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
396        f.debug_struct("Stream")
397            .field("id", &self.id)
398            .field("state", &self.state)
399            .field("is_counted", &self.is_counted)
400            .field("ref_count", &self.ref_count)
401            .field("next_pending_send", &self.next_pending_send)
402            .field("is_pending_send", &self.is_pending_send)
403            .field("send_flow", &self.send_flow)
404            .field("requested_send_capacity", &self.requested_send_capacity)
405            .field("buffered_send_data", &self.buffered_send_data)
406            .field("send_task", &self.send_task.as_ref().map(|_| ()))
407            .field("pending_send", &self.pending_send)
408            .field(
409                "next_pending_send_capacity",
410                &self.next_pending_send_capacity,
411            )
412            .field("is_pending_send_capacity", &self.is_pending_send_capacity)
413            .field("send_capacity_inc", &self.send_capacity_inc)
414            .field("next_open", &self.next_open)
415            .field("is_pending_open", &self.is_pending_open)
416            .field("is_pending_push", &self.is_pending_push)
417            .field("next_pending_accept", &self.next_pending_accept)
418            .field("is_pending_accept", &self.is_pending_accept)
419            .field("recv_flow", &self.recv_flow)
420            .field("in_flight_recv_data", &self.in_flight_recv_data)
421            .field("next_window_update", &self.next_window_update)
422            .field("is_pending_window_update", &self.is_pending_window_update)
423            .field("reset_at", &self.reset_at)
424            .field("next_reset_expire", &self.next_reset_expire)
425            .field("pending_recv", &self.pending_recv)
426            .field("is_recv", &self.is_recv)
427            .field("recv_task", &self.recv_task.as_ref().map(|_| ()))
428            .field("push_task", &self.push_task.as_ref().map(|_| ()))
429            .field("pending_push_promises", &self.pending_push_promises)
430            .field("content_length", &self.content_length)
431            .finish()
432    }
433}
434
435impl store::Next for NextAccept {
436    fn next(stream: &Stream) -> Option<store::Key> {
437        stream.next_pending_accept
438    }
439
440    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
441        stream.next_pending_accept = key;
442    }
443
444    fn take_next(stream: &mut Stream) -> Option<store::Key> {
445        stream.next_pending_accept.take()
446    }
447
448    fn is_queued(stream: &Stream) -> bool {
449        stream.is_pending_accept
450    }
451
452    fn set_queued(stream: &mut Stream, val: bool) {
453        stream.is_pending_accept = val;
454    }
455}
456
457impl store::Next for NextSend {
458    fn next(stream: &Stream) -> Option<store::Key> {
459        stream.next_pending_send
460    }
461
462    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
463        stream.next_pending_send = key;
464    }
465
466    fn take_next(stream: &mut Stream) -> Option<store::Key> {
467        stream.next_pending_send.take()
468    }
469
470    fn is_queued(stream: &Stream) -> bool {
471        stream.is_pending_send
472    }
473
474    fn set_queued(stream: &mut Stream, val: bool) {
475        if val {
476            // ensure that stream is not queued for being opened
477            // if it's being put into queue for sending data
478            debug_assert!(!stream.is_pending_open);
479        }
480        stream.is_pending_send = val;
481    }
482}
483
484impl store::Next for NextSendCapacity {
485    fn next(stream: &Stream) -> Option<store::Key> {
486        stream.next_pending_send_capacity
487    }
488
489    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
490        stream.next_pending_send_capacity = key;
491    }
492
493    fn take_next(stream: &mut Stream) -> Option<store::Key> {
494        stream.next_pending_send_capacity.take()
495    }
496
497    fn is_queued(stream: &Stream) -> bool {
498        stream.is_pending_send_capacity
499    }
500
501    fn set_queued(stream: &mut Stream, val: bool) {
502        stream.is_pending_send_capacity = val;
503    }
504}
505
506impl store::Next for NextWindowUpdate {
507    fn next(stream: &Stream) -> Option<store::Key> {
508        stream.next_window_update
509    }
510
511    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
512        stream.next_window_update = key;
513    }
514
515    fn take_next(stream: &mut Stream) -> Option<store::Key> {
516        stream.next_window_update.take()
517    }
518
519    fn is_queued(stream: &Stream) -> bool {
520        stream.is_pending_window_update
521    }
522
523    fn set_queued(stream: &mut Stream, val: bool) {
524        stream.is_pending_window_update = val;
525    }
526}
527
528impl store::Next for NextOpen {
529    fn next(stream: &Stream) -> Option<store::Key> {
530        stream.next_open
531    }
532
533    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
534        stream.next_open = key;
535    }
536
537    fn take_next(stream: &mut Stream) -> Option<store::Key> {
538        stream.next_open.take()
539    }
540
541    fn is_queued(stream: &Stream) -> bool {
542        stream.is_pending_open
543    }
544
545    fn set_queued(stream: &mut Stream, val: bool) {
546        if val {
547            // ensure that stream is not queued for being sent
548            // if it's being put into queue for opening the stream
549            debug_assert!(!stream.is_pending_send);
550        }
551        stream.is_pending_open = val;
552    }
553}
554
555impl store::Next for NextResetExpire {
556    fn next(stream: &Stream) -> Option<store::Key> {
557        stream.next_reset_expire
558    }
559
560    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
561        stream.next_reset_expire = key;
562    }
563
564    fn take_next(stream: &mut Stream) -> Option<store::Key> {
565        stream.next_reset_expire.take()
566    }
567
568    fn is_queued(stream: &Stream) -> bool {
569        stream.reset_at.is_some()
570    }
571
572    fn set_queued(stream: &mut Stream, val: bool) {
573        if val {
574            stream.reset_at = Some(Instant::now());
575        } else {
576            stream.reset_at = None;
577        }
578    }
579}
580
581// ===== impl ContentLength =====
582
583impl ContentLength {
584    pub fn is_head(&self) -> bool {
585        matches!(*self, Self::Head)
586    }
587}