tor_proto/
congestion.rs

1//! Congestion control subsystem.
2//!
3//! This object is attached to a circuit hop (CircHop) and controls the logic for the congestion
4//! control support of the Tor Network. It also manages the circuit level SENDME logic which is
5//! part of congestion control.
6//!
7//! # Implementation
8//!
//! The basic idea of this subsystem is that it is notified whenever a DATA cell is received or
//! sent. This in turn updates the congestion control state so that the very important
//! [`can_send`](CongestionControl::can_send) function can accurately decide whether a DATA cell
//! may be sent or not.
13//!
//! Any part of the arti code that wants to send a DATA cell on the wire needs to call
//! [`can_send`](CongestionControl::can_send) first; otherwise we risk leaving the circuit in a
//! protocol violation state.
17//!
//! Furthermore, since we both receive and emit SENDMEs, it also has entry points for those two
//! events in order to update the state.
20
21#[cfg(any(test, feature = "testing"))]
22pub(crate) mod test_utils;
23
24mod fixed;
25pub mod params;
26mod rtt;
27pub(crate) mod sendme;
28mod vegas;
29
30use crate::{Error, Result};
31
32use self::{
33    params::{Algorithm, CongestionControlParams, CongestionWindowParams},
34    rtt::RoundtripTimeEstimator,
35    sendme::SendmeValidator,
36};
37use tor_cell::relaycell::msg::SendmeTag;
38use tor_rtcompat::{DynTimeProvider, SleepProvider};
39
40/// This trait defines what a congestion control algorithm must implement in order to interface
41/// with the circuit reactor.
42///
43/// Note that all functions informing the algorithm, as in not getters, return a Result meaning
44/// that on error, it means we can't recover or that there is a protocol violation. In both
45/// cases, the circuit MUST be closed.
46pub(crate) trait CongestionControlAlgorithm: Send + std::fmt::Debug {
47    /// Return true iff this algorithm uses stream level SENDMEs.
48    fn uses_stream_sendme(&self) -> bool;
49    /// Return true iff this algorithm uses stream level XON/XOFFs.
50    fn uses_xon_xoff(&self) -> bool;
51    /// Return true iff the next cell is expected to be a SENDME.
52    fn is_next_cell_sendme(&self) -> bool;
53    /// Return true iff a cell can be sent on the wire according to the congestion control
54    /// algorithm.
55    fn can_send(&self) -> bool;
56    /// Return the congestion window object. The reason is returns an Option is because not all
57    /// algorithm uses one and so we avoid acting on it if so.
58    fn cwnd(&self) -> Option<&CongestionWindow>;
59
60    /// Inform the algorithm that we just got a DATA cell.
61    ///
62    /// Return true if a SENDME should be sent immediately or false if not.
63    fn data_received(&mut self) -> Result<bool>;
64    /// Inform the algorithm that we just sent a DATA cell.
65    fn data_sent(&mut self) -> Result<()>;
66    /// Inform the algorithm that we've just received a SENDME.
67    ///
68    /// This is a core function because the algorithm massively update its state when receiving a
69    /// SENDME by using the RTT value and congestion signals.
70    fn sendme_received(
71        &mut self,
72        state: &mut State,
73        rtt: &mut RoundtripTimeEstimator,
74        signals: CongestionSignals,
75    ) -> Result<()>;
76    /// Inform the algorithm that we just sent a SENDME.
77    fn sendme_sent(&mut self) -> Result<()>;
78
79    /// Return the number of in-flight cells (sent but awaiting SENDME ack).
80    ///
81    /// Optional, because not all algorithms track this.
82    #[cfg(feature = "conflux")]
83    fn inflight(&self) -> Option<u32>;
84
85    /// Test Only: Return the congestion window.
86    #[cfg(test)]
87    fn send_window(&self) -> u32;
88
89    /// Return the congestion control [`Algorithm`] implemented by this type.
90    fn algorithm(&self) -> Algorithm;
91}
92
/// These are congestion signals used by a congestion control algorithm to make decisions. These
/// signals are various states of our internals. This is not an exhaustive list.
#[derive(Copy, Clone)]
pub(crate) struct CongestionSignals {
    /// Indicate if the channel is blocked.
    pub(crate) channel_blocked: bool,
    /// The size of the channel outbound queue, saturated at `u32::MAX`.
    pub(crate) channel_outbound_size: u32,
}

impl CongestionSignals {
    /// Constructor.
    ///
    /// `channel_outbound_size` is narrowed to `u32`, saturating at `u32::MAX`. A plain `as`
    /// cast would silently wrap on platforms where `usize` is wider than 32 bits (e.g. a queue
    /// of exactly 2^32 cells would be reported as empty).
    pub(crate) fn new(channel_blocked: bool, channel_outbound_size: usize) -> Self {
        Self {
            channel_blocked,
            channel_outbound_size: u32::try_from(channel_outbound_size).unwrap_or(u32::MAX),
        }
    }
}
112
/// The phase a congestion control algorithm is currently in.
#[derive(Copy, Clone, Default)]
pub(crate) enum State {
    /// The phase every circuit starts in: the amount of data being transmitted is ramped up
    /// gradually so that it converges towards the optimal capacity.
    #[default]
    SlowStart,
    /// The phase we believe to be optimal. It is only ever entered after slow start.
    Steady,
}

impl State {
    /// Return true iff this is [`State::SlowStart`].
    pub(crate) fn in_slow_start(&self) -> bool {
        match *self {
            State::SlowStart => true,
            State::Steady => false,
        }
    }
}
130
131/// A congestion window. This is generic for all algorithms but their parameters' value will differ
132/// depending on the selected algorithm.
133#[derive(Clone, Debug)]
134pub(crate) struct CongestionWindow {
135    /// Congestion window parameters from the consensus.
136    params: CongestionWindowParams,
137    /// The actual value of our congestion window.
138    value: u32,
139    /// The congestion window is full.
140    is_full: bool,
141}
142
143impl CongestionWindow {
144    /// Constructor taking consensus parameters.
145    fn new(params: &CongestionWindowParams) -> Self {
146        Self {
147            value: params.cwnd_init(),
148            params: params.clone(),
149            is_full: false,
150        }
151    }
152
153    /// Decrement the window by the increment value.
154    pub(crate) fn dec(&mut self) {
155        self.value = self
156            .value
157            .saturating_sub(self.increment())
158            .max(self.params.cwnd_min());
159    }
160
161    /// Increment the window by the increment value.
162    pub(crate) fn inc(&mut self) {
163        self.value = self
164            .value
165            .saturating_add(self.increment())
166            .min(self.params.cwnd_max());
167    }
168
169    /// Return the current value.
170    pub(crate) fn get(&self) -> u32 {
171        self.value
172    }
173
174    /// Return the expected rate for which the congestion window should be updated at.
175    ///
176    /// See `CWND_UPDATE_RATE` in prop324.
177    pub(crate) fn update_rate(&self, state: &State) -> u32 {
178        if state.in_slow_start() {
179            1
180        } else {
181            (self.get() + self.increment_rate() * self.sendme_inc() / 2)
182                / (self.increment_rate() * self.sendme_inc())
183        }
184    }
185
186    /// Return minimum value of the congestion window.
187    pub(crate) fn min(&self) -> u32 {
188        self.params.cwnd_min()
189    }
190
191    /// Set the congestion window value with a new value.
192    pub(crate) fn set(&mut self, value: u32) {
193        self.value = value;
194    }
195
196    /// Return the increment value.
197    pub(crate) fn increment(&self) -> u32 {
198        self.params.cwnd_inc()
199    }
200
201    /// Return the rate at which we should increment the window.
202    pub(crate) fn increment_rate(&self) -> u32 {
203        self.params.cwnd_inc_rate()
204    }
205
206    /// Return true iff this congestion window is full.
207    pub(crate) fn is_full(&self) -> bool {
208        self.is_full
209    }
210
211    /// Reset the full flag meaning it is now not full.
212    pub(crate) fn reset_full(&mut self) {
213        self.is_full = false;
214    }
215
216    /// Return the number of expected SENDMEs per congestion window.
217    ///
218    /// Spec: prop324 SENDME_PER_CWND definition
219    pub(crate) fn sendme_per_cwnd(&self) -> u32 {
220        (self.get() + (self.sendme_inc() / 2)) / self.sendme_inc()
221    }
222
223    /// Return the RFC3742 slow start increment value.
224    ///
225    /// Spec: prop324 rfc3742_ss_inc definition
226    pub(crate) fn rfc3742_ss_inc(&mut self, ss_cap: u32) -> u32 {
227        let inc = if self.get() <= ss_cap {
228            ((self.params.cwnd_inc_pct_ss().as_percent() * self.sendme_inc()) + 50) / 100
229        } else {
230            (((self.sendme_inc() * ss_cap) + self.get()) / (self.get() * 2)).max(1)
231        };
232        self.value += inc;
233        inc
234    }
235
236    /// Evaluate the fullness of the window with the given parameters.
237    ///
238    /// Spec: prop324 see cwnd_is_full and cwnd_is_nonfull definition.
239    /// C-tor: cwnd_became_full() and cwnd_became_nonfull()
240    pub(crate) fn eval_fullness(&mut self, inflight: u32, full_gap: u32, full_minpct: u32) {
241        if (inflight + (self.sendme_inc() * full_gap)) >= self.get() {
242            self.is_full = true;
243        } else if (100 * inflight) < (full_minpct * self.get()) {
244            self.is_full = false;
245        }
246    }
247
248    /// Return the SENDME increment value.
249    pub(crate) fn sendme_inc(&self) -> u32 {
250        self.params.sendme_inc()
251    }
252
253    /// Return the congestion window params.
254    #[cfg(any(test, feature = "conflux"))]
255    pub(crate) fn params(&self) -> &CongestionWindowParams {
256        &self.params
257    }
258}
259
260/// Congestion control state of a hop on a circuit.
261///
262/// This controls the entire logic of congestion control and circuit level SENDMEs.
263pub(crate) struct CongestionControl {
264    /// Which congestion control state are we in?
265    state: State,
266    /// This is the SENDME validator as in it keeps track of the circuit tag found within an
267    /// authenticated SENDME cell. It can store the tags and validate a tag against our queue of
268    /// expected values.
269    sendme_validator: SendmeValidator<SendmeTag>,
270    /// The RTT estimator for the circuit we are attached on.
271    rtt: RoundtripTimeEstimator,
272    /// The congestion control algorithm.
273    algorithm: Box<dyn CongestionControlAlgorithm>,
274}
275
276impl CongestionControl {
277    /// Construct a new CongestionControl
278    pub(crate) fn new(params: &CongestionControlParams) -> Self {
279        let state = State::default();
280        // Use what the consensus tells us to use.
281        let algorithm: Box<dyn CongestionControlAlgorithm> = match params.alg() {
282            Algorithm::FixedWindow(p) => Box::new(fixed::FixedWindow::new(*p)),
283            Algorithm::Vegas(ref p) => {
284                let cwnd = CongestionWindow::new(params.cwnd_params());
285                Box::new(vegas::Vegas::new(*p, &state, cwnd))
286            }
287        };
288        Self {
289            algorithm,
290            rtt: RoundtripTimeEstimator::new(params.rtt_params()),
291            sendme_validator: SendmeValidator::new(),
292            state,
293        }
294    }
295
296    /// Return true iff the underlying algorithm uses stream level SENDMEs.
297    /// At the moment, only FixedWindow uses it. It has been eliminated with Vegas.
298    pub(crate) fn uses_stream_sendme(&self) -> bool {
299        self.algorithm.uses_stream_sendme()
300    }
301
302    /// Return true iff the underlying algorithm uses stream level XON/XOFFs.
303    /// At the moment, only Vegas uses it.
304    pub(crate) fn uses_xon_xoff(&self) -> bool {
305        self.algorithm.uses_xon_xoff()
306    }
307
308    /// Return true iff a DATA cell is allowed to be sent based on the congestion control state.
309    pub(crate) fn can_send(&self) -> bool {
310        self.algorithm.can_send()
311    }
312
313    /// Called when a SENDME cell is received.
314    ///
315    /// An error is returned if there is a protocol violation with regards to congestion control.
316    pub(crate) fn note_sendme_received(
317        &mut self,
318        runtime: &DynTimeProvider,
319        tag: SendmeTag,
320        signals: CongestionSignals,
321    ) -> Result<()> {
322        // This MUST be the first thing that we do that is validate the SENDME. Any error leads to
323        // closing the circuit.
324        self.sendme_validator.validate(Some(tag))?;
325
326        let now = runtime.now();
327        // Update our RTT estimate if the algorithm yields back a congestion window. RTT
328        // measurements only make sense for a congestion window. For example, FixedWindow here
329        // doesn't use it and so no need for the RTT.
330        if let Some(cwnd) = self.algorithm.cwnd() {
331            self.rtt
332                .update(now, &self.state, cwnd)
333                .map_err(|e| Error::CircProto(e.to_string()))?;
334        }
335
336        // Notify the algorithm that we've received a SENDME.
337        self.algorithm
338            .sendme_received(&mut self.state, &mut self.rtt, signals)
339    }
340
341    /// Called when a SENDME cell is sent.
342    pub(crate) fn note_sendme_sent(&mut self) -> Result<()> {
343        self.algorithm.sendme_sent()
344    }
345
346    /// Called when a DATA cell is received.
347    ///
348    /// Returns true iff a SENDME should be sent false otherwise. An error is returned if there is
349    /// a protocol violation with regards to flow or congestion control.
350    pub(crate) fn note_data_received(&mut self) -> Result<bool> {
351        self.algorithm.data_received()
352    }
353
354    /// Called when a DATA cell is sent.
355    ///
356    /// An error is returned if there is a protocol violation with regards to flow or congestion
357    /// control.
358    pub(crate) fn note_data_sent<U>(&mut self, runtime: &DynTimeProvider, tag: &U) -> Result<()>
359    where
360        U: Clone + Into<SendmeTag>,
361    {
362        // Inform the algorithm that the data was just sent. This is important to be the very first
363        // thing so the congestion window can be updated accordingly making the following calls
364        // using the latest data.
365        self.algorithm.data_sent()?;
366
367        // If next cell is a SENDME, we need to record the tag of this cell in order to validate
368        // the next SENDME when it arrives.
369        if self.algorithm.is_next_cell_sendme() {
370            self.sendme_validator.record(tag);
371            // Only keep the SENDME timestamp if the algorithm has a congestion window.
372            if self.algorithm.cwnd().is_some() {
373                self.rtt.expect_sendme(runtime.now());
374            }
375        }
376
377        Ok(())
378    }
379
380    /// Return the number of in-flight cells (sent but awaiting SENDME ack).
381    ///
382    /// Optional, because not all algorithms track this.
383    #[cfg(feature = "conflux")]
384    pub(crate) fn inflight(&self) -> Option<u32> {
385        self.algorithm.inflight()
386    }
387
388    /// Return the congestion window object.
389    ///
390    /// Optional, because not all algorithms track this.
391    #[cfg(feature = "conflux")]
392    pub(crate) fn cwnd(&self) -> Option<&CongestionWindow> {
393        self.algorithm.cwnd()
394    }
395
396    /// Return a reference to the RTT estimator.
397    ///
398    /// Used for conflux, for choosing the best circuit to send on.
399    #[cfg(feature = "conflux")]
400    pub(crate) fn rtt(&self) -> &RoundtripTimeEstimator {
401        &self.rtt
402    }
403
404    /// Return the congestion control algorithm.
405    #[cfg(feature = "conflux")]
406    pub(crate) fn algorithm(&self) -> Algorithm {
407        self.algorithm.algorithm()
408    }
409}
410
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use crate::congestion::test_utils::new_cwnd;

    use super::CongestionControl;
    use tor_cell::relaycell::msg::SendmeTag;

    impl CongestionControl {
        /// For testing: snapshot the algorithm's current send window together with the SENDME
        /// tags we still expect to receive.
        pub(crate) fn send_window_and_expected_tags(&self) -> (u32, Vec<SendmeTag>) {
            let window = self.algorithm.send_window();
            let tags = self.sendme_validator.expected_tags();
            (window, tags)
        }
    }

    #[test]
    fn test_cwnd() {
        let mut cwnd = new_cwnd();
        // Snapshot the parameters so they remain usable while `cwnd` is mutated below.
        let params = cwnd.params().clone();

        // A fresh window mirrors its parameters.
        assert_eq!(cwnd.get(), params.cwnd_init());
        assert_eq!(cwnd.min(), params.cwnd_min());
        assert_eq!(cwnd.increment(), params.cwnd_inc());
        assert_eq!(cwnd.increment_rate(), params.cwnd_inc_rate());
        assert_eq!(cwnd.sendme_inc(), params.sendme_inc());
        assert!(!cwnd.is_full());

        // One increment followed by one decrement lands back on the initial value.
        cwnd.inc();
        assert_eq!(cwnd.get(), params.cwnd_init() + params.cwnd_inc());
        cwnd.dec();
        assert_eq!(cwnd.get(), params.cwnd_init());
    }
}