tor_proto/congestion.rs
1//! Congestion control subsystem.
2//!
3//! This object is attached to a circuit hop (CircHop) and controls the logic for the congestion
4//! control support of the Tor Network. It also manages the circuit level SENDME logic which is
5//! part of congestion control.
6//!
7//! # Implementation
8//!
//! The basic idea of this subsystem is that it is notified whenever a DATA cell is received or
//! sent. This in turn updates the congestion control state so that the very important
//! [`can_send`](CongestionControl::can_send) function accurately decides whether a DATA cell can
//! be sent or not.
//!
//! Any part of the arti code that wants to send a DATA cell on the wire needs to call
//! [`can_send`](CongestionControl::can_send) first; otherwise we risk leaving the circuit in a
//! protocol violation state.
//!
//! Furthermore, as we receive and emit SENDMEs, this subsystem also has an entry point for each of
//! those two events in order to update the state.
20
21#[cfg(any(test, feature = "testing"))]
22pub(crate) mod test_utils;
23
24mod fixed;
25pub mod params;
26mod rtt;
27pub(crate) mod sendme;
28mod vegas;
29
30use crate::{Error, Result};
31
32use self::{
33 params::{Algorithm, CongestionControlParams, CongestionWindowParams},
34 rtt::RoundtripTimeEstimator,
35 sendme::SendmeValidator,
36};
37use tor_cell::relaycell::msg::SendmeTag;
38use tor_rtcompat::{DynTimeProvider, SleepProvider};
39
40/// This trait defines what a congestion control algorithm must implement in order to interface
41/// with the circuit reactor.
42///
43/// Note that all functions informing the algorithm, as in not getters, return a Result meaning
44/// that on error, it means we can't recover or that there is a protocol violation. In both
45/// cases, the circuit MUST be closed.
46pub(crate) trait CongestionControlAlgorithm: Send + std::fmt::Debug {
47 /// Return true iff this algorithm uses stream level SENDMEs.
48 fn uses_stream_sendme(&self) -> bool;
49 /// Return true iff this algorithm uses stream level XON/XOFFs.
50 fn uses_xon_xoff(&self) -> bool;
51 /// Return true iff the next cell is expected to be a SENDME.
52 fn is_next_cell_sendme(&self) -> bool;
53 /// Return true iff a cell can be sent on the wire according to the congestion control
54 /// algorithm.
55 fn can_send(&self) -> bool;
56 /// Return the congestion window object. The reason is returns an Option is because not all
57 /// algorithm uses one and so we avoid acting on it if so.
58 fn cwnd(&self) -> Option<&CongestionWindow>;
59
60 /// Inform the algorithm that we just got a DATA cell.
61 ///
62 /// Return true if a SENDME should be sent immediately or false if not.
63 fn data_received(&mut self) -> Result<bool>;
64 /// Inform the algorithm that we just sent a DATA cell.
65 fn data_sent(&mut self) -> Result<()>;
66 /// Inform the algorithm that we've just received a SENDME.
67 ///
68 /// This is a core function because the algorithm massively update its state when receiving a
69 /// SENDME by using the RTT value and congestion signals.
70 fn sendme_received(
71 &mut self,
72 state: &mut State,
73 rtt: &mut RoundtripTimeEstimator,
74 signals: CongestionSignals,
75 ) -> Result<()>;
76 /// Inform the algorithm that we just sent a SENDME.
77 fn sendme_sent(&mut self) -> Result<()>;
78
79 /// Return the number of in-flight cells (sent but awaiting SENDME ack).
80 ///
81 /// Optional, because not all algorithms track this.
82 #[cfg(feature = "conflux")]
83 fn inflight(&self) -> Option<u32>;
84
85 /// Test Only: Return the congestion window.
86 #[cfg(test)]
87 fn send_window(&self) -> u32;
88
89 /// Return the congestion control [`Algorithm`] implemented by this type.
90 fn algorithm(&self) -> Algorithm;
91}
92
/// These are congestion signals used by a congestion control algorithm to make decisions. These
/// signals are various states of our internals. This is not an exhaustive list.
#[derive(Copy, Clone, Debug)]
pub(crate) struct CongestionSignals {
    /// Indicate if the channel is blocked.
    pub(crate) channel_blocked: bool,
    /// The size of the channel outbound queue, saturated at `u32::MAX`.
    pub(crate) channel_outbound_size: u32,
}

impl CongestionSignals {
    /// Constructor.
    ///
    /// `channel_outbound_size` is narrowed to `u32`. A value that does not fit saturates to
    /// `u32::MAX` rather than being truncated, so a very large queue never looks small to the
    /// algorithm.
    pub(crate) fn new(channel_blocked: bool, channel_outbound_size: usize) -> Self {
        Self {
            channel_blocked,
            channel_outbound_size: u32::try_from(channel_outbound_size).unwrap_or(u32::MAX),
        }
    }
}
111}
112
/// Congestion control state.
#[derive(Copy, Clone, Default)]
pub(crate) enum State {
    /// The state every circuit starts in. The amount of data in flight is ramped up gradually
    /// in order to converge towards the optimal capacity.
    #[default]
    SlowStart,
    /// The steady state, representing what we believe is optimal. Always reached after slow
    /// start.
    Steady,
}

impl State {
    /// Return true iff this is [`State::SlowStart`].
    pub(crate) fn in_slow_start(&self) -> bool {
        match self {
            State::SlowStart => true,
            State::Steady => false,
        }
    }
}
130
131/// A congestion window. This is generic for all algorithms but their parameters' value will differ
132/// depending on the selected algorithm.
133#[derive(Clone, Debug)]
134pub(crate) struct CongestionWindow {
135 /// Congestion window parameters from the consensus.
136 params: CongestionWindowParams,
137 /// The actual value of our congestion window.
138 value: u32,
139 /// The congestion window is full.
140 is_full: bool,
141}
142
143impl CongestionWindow {
144 /// Constructor taking consensus parameters.
145 fn new(params: &CongestionWindowParams) -> Self {
146 Self {
147 value: params.cwnd_init(),
148 params: params.clone(),
149 is_full: false,
150 }
151 }
152
153 /// Decrement the window by the increment value.
154 pub(crate) fn dec(&mut self) {
155 self.value = self
156 .value
157 .saturating_sub(self.increment())
158 .max(self.params.cwnd_min());
159 }
160
161 /// Increment the window by the increment value.
162 pub(crate) fn inc(&mut self) {
163 self.value = self
164 .value
165 .saturating_add(self.increment())
166 .min(self.params.cwnd_max());
167 }
168
169 /// Return the current value.
170 pub(crate) fn get(&self) -> u32 {
171 self.value
172 }
173
174 /// Return the expected rate for which the congestion window should be updated at.
175 ///
176 /// See `CWND_UPDATE_RATE` in prop324.
177 pub(crate) fn update_rate(&self, state: &State) -> u32 {
178 if state.in_slow_start() {
179 1
180 } else {
181 (self.get() + self.increment_rate() * self.sendme_inc() / 2)
182 / (self.increment_rate() * self.sendme_inc())
183 }
184 }
185
186 /// Return minimum value of the congestion window.
187 pub(crate) fn min(&self) -> u32 {
188 self.params.cwnd_min()
189 }
190
191 /// Set the congestion window value with a new value.
192 pub(crate) fn set(&mut self, value: u32) {
193 self.value = value;
194 }
195
196 /// Return the increment value.
197 pub(crate) fn increment(&self) -> u32 {
198 self.params.cwnd_inc()
199 }
200
201 /// Return the rate at which we should increment the window.
202 pub(crate) fn increment_rate(&self) -> u32 {
203 self.params.cwnd_inc_rate()
204 }
205
206 /// Return true iff this congestion window is full.
207 pub(crate) fn is_full(&self) -> bool {
208 self.is_full
209 }
210
211 /// Reset the full flag meaning it is now not full.
212 pub(crate) fn reset_full(&mut self) {
213 self.is_full = false;
214 }
215
216 /// Return the number of expected SENDMEs per congestion window.
217 ///
218 /// Spec: prop324 SENDME_PER_CWND definition
219 pub(crate) fn sendme_per_cwnd(&self) -> u32 {
220 (self.get() + (self.sendme_inc() / 2)) / self.sendme_inc()
221 }
222
223 /// Return the RFC3742 slow start increment value.
224 ///
225 /// Spec: prop324 rfc3742_ss_inc definition
226 pub(crate) fn rfc3742_ss_inc(&mut self, ss_cap: u32) -> u32 {
227 let inc = if self.get() <= ss_cap {
228 ((self.params.cwnd_inc_pct_ss().as_percent() * self.sendme_inc()) + 50) / 100
229 } else {
230 (((self.sendme_inc() * ss_cap) + self.get()) / (self.get() * 2)).max(1)
231 };
232 self.value += inc;
233 inc
234 }
235
236 /// Evaluate the fullness of the window with the given parameters.
237 ///
238 /// Spec: prop324 see cwnd_is_full and cwnd_is_nonfull definition.
239 /// C-tor: cwnd_became_full() and cwnd_became_nonfull()
240 pub(crate) fn eval_fullness(&mut self, inflight: u32, full_gap: u32, full_minpct: u32) {
241 if (inflight + (self.sendme_inc() * full_gap)) >= self.get() {
242 self.is_full = true;
243 } else if (100 * inflight) < (full_minpct * self.get()) {
244 self.is_full = false;
245 }
246 }
247
248 /// Return the SENDME increment value.
249 pub(crate) fn sendme_inc(&self) -> u32 {
250 self.params.sendme_inc()
251 }
252
253 /// Return the congestion window params.
254 #[cfg(any(test, feature = "conflux"))]
255 pub(crate) fn params(&self) -> &CongestionWindowParams {
256 &self.params
257 }
258}
259
260/// Congestion control state of a hop on a circuit.
261///
262/// This controls the entire logic of congestion control and circuit level SENDMEs.
263pub(crate) struct CongestionControl {
264 /// Which congestion control state are we in?
265 state: State,
266 /// This is the SENDME validator as in it keeps track of the circuit tag found within an
267 /// authenticated SENDME cell. It can store the tags and validate a tag against our queue of
268 /// expected values.
269 sendme_validator: SendmeValidator<SendmeTag>,
270 /// The RTT estimator for the circuit we are attached on.
271 rtt: RoundtripTimeEstimator,
272 /// The congestion control algorithm.
273 algorithm: Box<dyn CongestionControlAlgorithm>,
274}
275
276impl CongestionControl {
277 /// Construct a new CongestionControl
278 pub(crate) fn new(params: &CongestionControlParams) -> Self {
279 let state = State::default();
280 // Use what the consensus tells us to use.
281 let algorithm: Box<dyn CongestionControlAlgorithm> = match params.alg() {
282 Algorithm::FixedWindow(p) => Box::new(fixed::FixedWindow::new(*p)),
283 Algorithm::Vegas(ref p) => {
284 let cwnd = CongestionWindow::new(params.cwnd_params());
285 Box::new(vegas::Vegas::new(*p, &state, cwnd))
286 }
287 };
288 Self {
289 algorithm,
290 rtt: RoundtripTimeEstimator::new(params.rtt_params()),
291 sendme_validator: SendmeValidator::new(),
292 state,
293 }
294 }
295
296 /// Return true iff the underlying algorithm uses stream level SENDMEs.
297 /// At the moment, only FixedWindow uses it. It has been eliminated with Vegas.
298 pub(crate) fn uses_stream_sendme(&self) -> bool {
299 self.algorithm.uses_stream_sendme()
300 }
301
302 /// Return true iff the underlying algorithm uses stream level XON/XOFFs.
303 /// At the moment, only Vegas uses it.
304 pub(crate) fn uses_xon_xoff(&self) -> bool {
305 self.algorithm.uses_xon_xoff()
306 }
307
308 /// Return true iff a DATA cell is allowed to be sent based on the congestion control state.
309 pub(crate) fn can_send(&self) -> bool {
310 self.algorithm.can_send()
311 }
312
313 /// Called when a SENDME cell is received.
314 ///
315 /// An error is returned if there is a protocol violation with regards to congestion control.
316 pub(crate) fn note_sendme_received(
317 &mut self,
318 runtime: &DynTimeProvider,
319 tag: SendmeTag,
320 signals: CongestionSignals,
321 ) -> Result<()> {
322 // This MUST be the first thing that we do that is validate the SENDME. Any error leads to
323 // closing the circuit.
324 self.sendme_validator.validate(Some(tag))?;
325
326 let now = runtime.now();
327 // Update our RTT estimate if the algorithm yields back a congestion window. RTT
328 // measurements only make sense for a congestion window. For example, FixedWindow here
329 // doesn't use it and so no need for the RTT.
330 if let Some(cwnd) = self.algorithm.cwnd() {
331 self.rtt
332 .update(now, &self.state, cwnd)
333 .map_err(|e| Error::CircProto(e.to_string()))?;
334 }
335
336 // Notify the algorithm that we've received a SENDME.
337 self.algorithm
338 .sendme_received(&mut self.state, &mut self.rtt, signals)
339 }
340
341 /// Called when a SENDME cell is sent.
342 pub(crate) fn note_sendme_sent(&mut self) -> Result<()> {
343 self.algorithm.sendme_sent()
344 }
345
346 /// Called when a DATA cell is received.
347 ///
348 /// Returns true iff a SENDME should be sent false otherwise. An error is returned if there is
349 /// a protocol violation with regards to flow or congestion control.
350 pub(crate) fn note_data_received(&mut self) -> Result<bool> {
351 self.algorithm.data_received()
352 }
353
354 /// Called when a DATA cell is sent.
355 ///
356 /// An error is returned if there is a protocol violation with regards to flow or congestion
357 /// control.
358 pub(crate) fn note_data_sent<U>(&mut self, runtime: &DynTimeProvider, tag: &U) -> Result<()>
359 where
360 U: Clone + Into<SendmeTag>,
361 {
362 // Inform the algorithm that the data was just sent. This is important to be the very first
363 // thing so the congestion window can be updated accordingly making the following calls
364 // using the latest data.
365 self.algorithm.data_sent()?;
366
367 // If next cell is a SENDME, we need to record the tag of this cell in order to validate
368 // the next SENDME when it arrives.
369 if self.algorithm.is_next_cell_sendme() {
370 self.sendme_validator.record(tag);
371 // Only keep the SENDME timestamp if the algorithm has a congestion window.
372 if self.algorithm.cwnd().is_some() {
373 self.rtt.expect_sendme(runtime.now());
374 }
375 }
376
377 Ok(())
378 }
379
380 /// Return the number of in-flight cells (sent but awaiting SENDME ack).
381 ///
382 /// Optional, because not all algorithms track this.
383 #[cfg(feature = "conflux")]
384 pub(crate) fn inflight(&self) -> Option<u32> {
385 self.algorithm.inflight()
386 }
387
388 /// Return the congestion window object.
389 ///
390 /// Optional, because not all algorithms track this.
391 #[cfg(feature = "conflux")]
392 pub(crate) fn cwnd(&self) -> Option<&CongestionWindow> {
393 self.algorithm.cwnd()
394 }
395
396 /// Return a reference to the RTT estimator.
397 ///
398 /// Used for conflux, for choosing the best circuit to send on.
399 #[cfg(feature = "conflux")]
400 pub(crate) fn rtt(&self) -> &RoundtripTimeEstimator {
401 &self.rtt
402 }
403
404 /// Return the congestion control algorithm.
405 #[cfg(feature = "conflux")]
406 pub(crate) fn algorithm(&self) -> Algorithm {
407 self.algorithm.algorithm()
408 }
409}
410
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use crate::congestion::test_utils::new_cwnd;

    use super::CongestionControl;
    use tor_cell::relaycell::msg::SendmeTag;

    impl CongestionControl {
        /// For testing: return the current send window together with the SENDME tags we are
        /// still expecting to receive.
        pub(crate) fn send_window_and_expected_tags(&self) -> (u32, Vec<SendmeTag>) {
            let window = self.algorithm.send_window();
            let expected = self.sendme_validator.expected_tags();
            (window, expected)
        }
    }

    #[test]
    fn test_cwnd() {
        let mut cwnd = new_cwnd();

        // Snapshot the parameters the window was built from.
        let init = cwnd.params().cwnd_init();
        let min = cwnd.params().cwnd_min();
        let step = cwnd.params().cwnd_inc();
        let step_rate = cwnd.params().cwnd_inc_rate();
        let sendme_inc = cwnd.params().sendme_inc();

        // The getters must agree with those parameters right after construction.
        assert_eq!(cwnd.get(), init);
        assert_eq!(cwnd.min(), min);
        assert_eq!(cwnd.increment(), step);
        assert_eq!(cwnd.increment_rate(), step_rate);
        assert_eq!(cwnd.sendme_inc(), sendme_inc);
        assert!(!cwnd.is_full());

        // One increment followed by one decrement returns to the initial value.
        cwnd.inc();
        assert_eq!(cwnd.get(), init + step);
        cwnd.dec();
        assert_eq!(cwnd.get(), init);
    }
}