1use super::*;
23#[derive(Debug)]
4pub(super) struct Counts {
5/// Acting as a client or server. This allows us to track which values to
6 /// inc / dec.
7peer: peer::Dyn,
89/// Maximum number of locally initiated streams
10max_send_streams: usize,
1112/// Current number of remote initiated streams
13num_send_streams: usize,
1415/// Maximum number of remote initiated streams
16max_recv_streams: usize,
1718/// Current number of locally initiated streams
19num_recv_streams: usize,
2021/// Maximum number of pending locally reset streams
22max_local_reset_streams: usize,
2324/// Current number of pending locally reset streams
25num_local_reset_streams: usize,
2627/// Max number of "pending accept" streams that were remotely reset
28max_remote_reset_streams: usize,
2930/// Current number of "pending accept" streams that were remotely reset
31num_remote_reset_streams: usize,
3233/// Maximum number of locally reset streams due to protocol error across
34 /// the lifetime of the connection.
35 ///
36 /// When this gets exceeded, we issue GOAWAYs.
37max_local_error_reset_streams: Option<usize>,
3839/// Total number of locally reset streams due to protocol error across the
40 /// lifetime of the connection.
41num_local_error_reset_streams: usize,
42}
4344impl Counts {
45/// Create a new `Counts` using the provided configuration values.
46pub fn new(peer: peer::Dyn, config: &Config) -> Self {
47 Counts {
48 peer,
49 max_send_streams: config.initial_max_send_streams,
50 num_send_streams: 0,
51 max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX),
52 num_recv_streams: 0,
53 max_local_reset_streams: config.local_reset_max,
54 num_local_reset_streams: 0,
55 max_remote_reset_streams: config.remote_reset_max,
56 num_remote_reset_streams: 0,
57 max_local_error_reset_streams: config.local_max_error_reset_streams,
58 num_local_error_reset_streams: 0,
59 }
60 }
6162/// Returns true when the next opened stream will reach capacity of outbound streams
63 ///
64 /// The number of client send streams is incremented in prioritize; send_request has to guess if
65 /// it should wait before allowing another request to be sent.
66pub fn next_send_stream_will_reach_capacity(&self) -> bool {
67self.max_send_streams <= (self.num_send_streams + 1)
68 }
6970/// Returns the current peer
71pub fn peer(&self) -> peer::Dyn {
72self.peer
73 }
7475pub fn has_streams(&self) -> bool {
76self.num_send_streams != 0 || self.num_recv_streams != 0
77}
7879/// Returns true if we can issue another local reset due to protocol error.
80pub fn can_inc_num_local_error_resets(&self) -> bool {
81if let Some(max) = self.max_local_error_reset_streams {
82 max > self.num_local_error_reset_streams
83 } else {
84true
85}
86 }
8788pub fn inc_num_local_error_resets(&mut self) {
89assert!(self.can_inc_num_local_error_resets());
9091// Increment the number of remote initiated streams
92self.num_local_error_reset_streams += 1;
93 }
9495pub(crate) fn max_local_error_resets(&self) -> Option<usize> {
96self.max_local_error_reset_streams
97 }
9899/// Returns true if the receive stream concurrency can be incremented
100pub fn can_inc_num_recv_streams(&self) -> bool {
101self.max_recv_streams > self.num_recv_streams
102 }
103104/// Increments the number of concurrent receive streams.
105 ///
106 /// # Panics
107 ///
108 /// Panics on failure as this should have been validated before hand.
109pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) {
110assert!(self.can_inc_num_recv_streams());
111assert!(!stream.is_counted);
112113// Increment the number of remote initiated streams
114self.num_recv_streams += 1;
115 stream.is_counted = true;
116 }
117118/// Returns true if the send stream concurrency can be incremented
119pub fn can_inc_num_send_streams(&self) -> bool {
120self.max_send_streams > self.num_send_streams
121 }
122123/// Increments the number of concurrent send streams.
124 ///
125 /// # Panics
126 ///
127 /// Panics on failure as this should have been validated before hand.
128pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) {
129assert!(self.can_inc_num_send_streams());
130assert!(!stream.is_counted);
131132// Increment the number of remote initiated streams
133self.num_send_streams += 1;
134 stream.is_counted = true;
135 }
136137/// Returns true if the number of pending reset streams can be incremented.
138pub fn can_inc_num_reset_streams(&self) -> bool {
139self.max_local_reset_streams > self.num_local_reset_streams
140 }
141142/// Increments the number of pending reset streams.
143 ///
144 /// # Panics
145 ///
146 /// Panics on failure as this should have been validated before hand.
147pub fn inc_num_reset_streams(&mut self) {
148assert!(self.can_inc_num_reset_streams());
149150self.num_local_reset_streams += 1;
151 }
152153pub(crate) fn max_remote_reset_streams(&self) -> usize {
154self.max_remote_reset_streams
155 }
156157/// Returns true if the number of pending REMOTE reset streams can be
158 /// incremented.
159pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool {
160self.max_remote_reset_streams > self.num_remote_reset_streams
161 }
162163/// Increments the number of pending REMOTE reset streams.
164 ///
165 /// # Panics
166 ///
167 /// Panics on failure as this should have been validated before hand.
168pub(crate) fn inc_num_remote_reset_streams(&mut self) {
169assert!(self.can_inc_num_remote_reset_streams());
170171self.num_remote_reset_streams += 1;
172 }
173174pub(crate) fn dec_num_remote_reset_streams(&mut self) {
175assert!(self.num_remote_reset_streams > 0);
176177self.num_remote_reset_streams -= 1;
178 }
179180pub fn apply_remote_settings(&mut self, settings: &frame::Settings, is_initial: bool) {
181match settings.max_concurrent_streams() {
182Some(val) => self.max_send_streams = val as usize,
183None if is_initial => self.max_send_streams = usize::MAX,
184None => {}
185 }
186 }
187188/// Run a block of code that could potentially transition a stream's state.
189 ///
190 /// If the stream state transitions to closed, this function will perform
191 /// all necessary cleanup.
192 ///
193 /// TODO: Is this function still needed?
194pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195where
196F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197 {
198// TODO: Does this need to be computed before performing the action?
199let is_pending_reset = stream.is_pending_reset_expiration();
200201// Run the action
202let ret = f(self, &mut stream);
203204self.transition_after(stream, is_pending_reset);
205206 ret
207 }
208209// TODO: move this to macro?
210pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) {
211tracing::trace!(
212"transition_after; stream={:?}; state={:?}; is_closed={:?}; \
213 pending_send_empty={:?}; buffered_send_data={}; \
214 num_recv={}; num_send={}",
215 stream.id,
216 stream.state,
217 stream.is_closed(),
218 stream.pending_send.is_empty(),
219 stream.buffered_send_data,
220self.num_recv_streams,
221self.num_send_streams
222 );
223224if stream.is_closed() {
225if !stream.is_pending_reset_expiration() {
226 stream.unlink();
227if is_reset_counted {
228self.dec_num_reset_streams();
229 }
230 }
231232if !stream.state.is_scheduled_reset() && stream.is_counted {
233tracing::trace!("dec_num_streams; stream={:?}", stream.id);
234// Decrement the number of active streams.
235self.dec_num_streams(&mut stream);
236 }
237 }
238239// Release the stream if it requires releasing
240if stream.is_released() {
241 stream.remove();
242 }
243 }
244245/// Returns the maximum number of streams that can be initiated by this
246 /// peer.
247pub(crate) fn max_send_streams(&self) -> usize {
248self.max_send_streams
249 }
250251/// Returns the maximum number of streams that can be initiated by the
252 /// remote peer.
253pub(crate) fn max_recv_streams(&self) -> usize {
254self.max_recv_streams
255 }
256257fn dec_num_streams(&mut self, stream: &mut store::Ptr) {
258assert!(stream.is_counted);
259260if self.peer.is_local_init(stream.id) {
261assert!(self.num_send_streams > 0);
262self.num_send_streams -= 1;
263 stream.is_counted = false;
264 } else {
265assert!(self.num_recv_streams > 0);
266self.num_recv_streams -= 1;
267 stream.is_counted = false;
268 }
269 }
270271fn dec_num_reset_streams(&mut self) {
272assert!(self.num_local_reset_streams > 0);
273self.num_local_reset_streams -= 1;
274 }
275}
276277impl Drop for Counts {
278fn drop(&mut self) {
279use std::thread;
280281if !thread::panicking() {
282debug_assert!(!self.has_streams());
283 }
284 }
285}