h2/proto/streams/
counts.rs

1use super::*;
2
3#[derive(Debug)]
4pub(super) struct Counts {
5    /// Acting as a client or server. This allows us to track which values to
6    /// inc / dec.
7    peer: peer::Dyn,
8
9    /// Maximum number of locally initiated streams
10    max_send_streams: usize,
11
12    /// Current number of remote initiated streams
13    num_send_streams: usize,
14
15    /// Maximum number of remote initiated streams
16    max_recv_streams: usize,
17
18    /// Current number of locally initiated streams
19    num_recv_streams: usize,
20
21    /// Maximum number of pending locally reset streams
22    max_local_reset_streams: usize,
23
24    /// Current number of pending locally reset streams
25    num_local_reset_streams: usize,
26
27    /// Max number of "pending accept" streams that were remotely reset
28    max_remote_reset_streams: usize,
29
30    /// Current number of "pending accept" streams that were remotely reset
31    num_remote_reset_streams: usize,
32
33    /// Maximum number of locally reset streams due to protocol error across
34    /// the lifetime of the connection.
35    ///
36    /// When this gets exceeded, we issue GOAWAYs.
37    max_local_error_reset_streams: Option<usize>,
38
39    /// Total number of locally reset streams due to protocol error across the
40    /// lifetime of the connection.
41    num_local_error_reset_streams: usize,
42}
43
44impl Counts {
45    /// Create a new `Counts` using the provided configuration values.
46    pub fn new(peer: peer::Dyn, config: &Config) -> Self {
47        Counts {
48            peer,
49            max_send_streams: config.initial_max_send_streams,
50            num_send_streams: 0,
51            max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX),
52            num_recv_streams: 0,
53            max_local_reset_streams: config.local_reset_max,
54            num_local_reset_streams: 0,
55            max_remote_reset_streams: config.remote_reset_max,
56            num_remote_reset_streams: 0,
57            max_local_error_reset_streams: config.local_max_error_reset_streams,
58            num_local_error_reset_streams: 0,
59        }
60    }
61
62    /// Returns true when the next opened stream will reach capacity of outbound streams
63    ///
64    /// The number of client send streams is incremented in prioritize; send_request has to guess if
65    /// it should wait before allowing another request to be sent.
66    pub fn next_send_stream_will_reach_capacity(&self) -> bool {
67        self.max_send_streams <= (self.num_send_streams + 1)
68    }
69
70    /// Returns the current peer
71    pub fn peer(&self) -> peer::Dyn {
72        self.peer
73    }
74
75    pub fn has_streams(&self) -> bool {
76        self.num_send_streams != 0 || self.num_recv_streams != 0
77    }
78
79    /// Returns true if we can issue another local reset due to protocol error.
80    pub fn can_inc_num_local_error_resets(&self) -> bool {
81        if let Some(max) = self.max_local_error_reset_streams {
82            max > self.num_local_error_reset_streams
83        } else {
84            true
85        }
86    }
87
88    pub fn inc_num_local_error_resets(&mut self) {
89        assert!(self.can_inc_num_local_error_resets());
90
91        // Increment the number of remote initiated streams
92        self.num_local_error_reset_streams += 1;
93    }
94
95    pub(crate) fn max_local_error_resets(&self) -> Option<usize> {
96        self.max_local_error_reset_streams
97    }
98
99    /// Returns true if the receive stream concurrency can be incremented
100    pub fn can_inc_num_recv_streams(&self) -> bool {
101        self.max_recv_streams > self.num_recv_streams
102    }
103
104    /// Increments the number of concurrent receive streams.
105    ///
106    /// # Panics
107    ///
108    /// Panics on failure as this should have been validated before hand.
109    pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) {
110        assert!(self.can_inc_num_recv_streams());
111        assert!(!stream.is_counted);
112
113        // Increment the number of remote initiated streams
114        self.num_recv_streams += 1;
115        stream.is_counted = true;
116    }
117
118    /// Returns true if the send stream concurrency can be incremented
119    pub fn can_inc_num_send_streams(&self) -> bool {
120        self.max_send_streams > self.num_send_streams
121    }
122
123    /// Increments the number of concurrent send streams.
124    ///
125    /// # Panics
126    ///
127    /// Panics on failure as this should have been validated before hand.
128    pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) {
129        assert!(self.can_inc_num_send_streams());
130        assert!(!stream.is_counted);
131
132        // Increment the number of remote initiated streams
133        self.num_send_streams += 1;
134        stream.is_counted = true;
135    }
136
137    /// Returns true if the number of pending reset streams can be incremented.
138    pub fn can_inc_num_reset_streams(&self) -> bool {
139        self.max_local_reset_streams > self.num_local_reset_streams
140    }
141
142    /// Increments the number of pending reset streams.
143    ///
144    /// # Panics
145    ///
146    /// Panics on failure as this should have been validated before hand.
147    pub fn inc_num_reset_streams(&mut self) {
148        assert!(self.can_inc_num_reset_streams());
149
150        self.num_local_reset_streams += 1;
151    }
152
153    pub(crate) fn max_remote_reset_streams(&self) -> usize {
154        self.max_remote_reset_streams
155    }
156
157    /// Returns true if the number of pending REMOTE reset streams can be
158    /// incremented.
159    pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool {
160        self.max_remote_reset_streams > self.num_remote_reset_streams
161    }
162
163    /// Increments the number of pending REMOTE reset streams.
164    ///
165    /// # Panics
166    ///
167    /// Panics on failure as this should have been validated before hand.
168    pub(crate) fn inc_num_remote_reset_streams(&mut self) {
169        assert!(self.can_inc_num_remote_reset_streams());
170
171        self.num_remote_reset_streams += 1;
172    }
173
174    pub(crate) fn dec_num_remote_reset_streams(&mut self) {
175        assert!(self.num_remote_reset_streams > 0);
176
177        self.num_remote_reset_streams -= 1;
178    }
179
180    pub fn apply_remote_settings(&mut self, settings: &frame::Settings, is_initial: bool) {
181        match settings.max_concurrent_streams() {
182            Some(val) => self.max_send_streams = val as usize,
183            None if is_initial => self.max_send_streams = usize::MAX,
184            None => {}
185        }
186    }
187
188    /// Run a block of code that could potentially transition a stream's state.
189    ///
190    /// If the stream state transitions to closed, this function will perform
191    /// all necessary cleanup.
192    ///
193    /// TODO: Is this function still needed?
194    pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195    where
196        F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197    {
198        // TODO: Does this need to be computed before performing the action?
199        let is_pending_reset = stream.is_pending_reset_expiration();
200
201        // Run the action
202        let ret = f(self, &mut stream);
203
204        self.transition_after(stream, is_pending_reset);
205
206        ret
207    }
208
209    // TODO: move this to macro?
210    pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) {
211        tracing::trace!(
212            "transition_after; stream={:?}; state={:?}; is_closed={:?}; \
213             pending_send_empty={:?}; buffered_send_data={}; \
214             num_recv={}; num_send={}",
215            stream.id,
216            stream.state,
217            stream.is_closed(),
218            stream.pending_send.is_empty(),
219            stream.buffered_send_data,
220            self.num_recv_streams,
221            self.num_send_streams
222        );
223
224        if stream.is_closed() {
225            if !stream.is_pending_reset_expiration() {
226                stream.unlink();
227                if is_reset_counted {
228                    self.dec_num_reset_streams();
229                }
230            }
231
232            if !stream.state.is_scheduled_reset() && stream.is_counted {
233                tracing::trace!("dec_num_streams; stream={:?}", stream.id);
234                // Decrement the number of active streams.
235                self.dec_num_streams(&mut stream);
236            }
237        }
238
239        // Release the stream if it requires releasing
240        if stream.is_released() {
241            stream.remove();
242        }
243    }
244
245    /// Returns the maximum number of streams that can be initiated by this
246    /// peer.
247    pub(crate) fn max_send_streams(&self) -> usize {
248        self.max_send_streams
249    }
250
251    /// Returns the maximum number of streams that can be initiated by the
252    /// remote peer.
253    pub(crate) fn max_recv_streams(&self) -> usize {
254        self.max_recv_streams
255    }
256
257    fn dec_num_streams(&mut self, stream: &mut store::Ptr) {
258        assert!(stream.is_counted);
259
260        if self.peer.is_local_init(stream.id) {
261            assert!(self.num_send_streams > 0);
262            self.num_send_streams -= 1;
263            stream.is_counted = false;
264        } else {
265            assert!(self.num_recv_streams > 0);
266            self.num_recv_streams -= 1;
267            stream.is_counted = false;
268        }
269    }
270
271    fn dec_num_reset_streams(&mut self) {
272        assert!(self.num_local_reset_streams > 0);
273        self.num_local_reset_streams -= 1;
274    }
275}
276
277impl Drop for Counts {
278    fn drop(&mut self) {
279        use std::thread;
280
281        if !thread::panicking() {
282            debug_assert!(!self.has_streams());
283        }
284    }
285}