tor_proto/stream/
xon_xoff.rs

1//! A wrapper for an [`AsyncRead`] to support XON/XOFF flow control.
2//!
3//! This allows any `AsyncRead` that implements [`BufferIsEmpty`] to be used with XON/XOFF flow
4//! control.
5
6use std::io::Error;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10use futures::{AsyncRead, Stream};
11use pin_project::pin_project;
12use tor_basic_utils::assert_val_impl_trait;
13use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
14
15use crate::stream::DrainRateRequest;
16use crate::tunnel::StreamTarget;
17use crate::util::notify::NotifyReceiver;
18
19/// A wrapper for an [`AsyncRead`] to support XON/XOFF flow control.
20///
21/// This reader will take care of communicating with the circuit reactor to handle XON/XOFF-related
22/// events.
23#[derive(Debug)]
24#[pin_project]
25pub(crate) struct XonXoffReader<R> {
26    /// How we communicate with the circuit reactor.
27    #[pin]
28    ctrl: XonXoffReaderCtrl,
29    /// The inner reader.
30    #[pin]
31    reader: R,
32    /// Have we received a drain rate request notification from the reactor,
33    /// but haven't yet sent a drain rate update back to the reactor?
34    pending_drain_rate_update: bool,
35}
36
37impl<R> XonXoffReader<R> {
38    /// Create a new [`XonXoffReader`].
39    ///
40    /// The reader must implement [`BufferIsEmpty`], which allows the `XonXoffReader` to check if
41    /// the incoming stream buffer is empty or not.
42    pub(crate) fn new(ctrl: XonXoffReaderCtrl, reader: R) -> Self {
43        Self {
44            ctrl,
45            reader,
46            pending_drain_rate_update: false,
47        }
48    }
49
50    /// Get a reference to the inner [`AsyncRead`].
51    ///
52    /// NOTE: This will bypass the [`XonXoffReader`] and may cause incorrect behaviour depending on
53    /// how you use the returned reader (for example if it uses interior mutability).
54    pub(crate) fn inner(&self) -> &R {
55        &self.reader
56    }
57
58    /// Get a mutable reference to the inner [`AsyncRead`].
59    ///
60    /// NOTE: This will bypass the [`XonXoffReader`] and may cause incorrect behaviour depending on
61    /// how you use the returned reader (for example if you read bytes directly).
62    pub(crate) fn inner_mut(&mut self) -> &mut R {
63        &mut self.reader
64    }
65}
66
67impl<R: AsyncRead + BufferIsEmpty> AsyncRead for XonXoffReader<R> {
68    fn poll_read(
69        self: Pin<&mut Self>,
70        cx: &mut Context<'_>,
71        buf: &mut [u8],
72    ) -> Poll<Result<usize, Error>> {
73        let mut self_ = self.project();
74
75        // ensure that `drain_rate_request_stream` is a `FusedStream`,
76        // which means that we don't need to worry about calling `poll_next()` repeatedly
77        assert_val_impl_trait!(
78            self_.ctrl.drain_rate_request_stream,
79            futures::stream::FusedStream,
80        );
81
82        // check if the circuit reactor has requested a drain rate update
83        if let Poll::Ready(Some(())) = self_
84            .ctrl
85            .as_mut()
86            .project()
87            .drain_rate_request_stream
88            .poll_next(cx)
89        {
90            // a drain rate update was requested, so we need to send a drain rate update once we
91            // have no more bytes buffered
92            *self_.pending_drain_rate_update = true;
93        }
94
95        // try reading from the inner reader
96        let res = self_.reader.as_mut().poll_read(cx, buf);
97
98        // if we need to send a drain rate update and the stream buffer is empty, inform the reactor
99        if *self_.pending_drain_rate_update && self_.reader.is_empty() {
100            // TODO(arti#534): in the future we want to do rate estimation, but for now we'll just
101            // send an "unlimited" drain rate
102            self_
103                .ctrl
104                .stream_target
105                .drain_rate_update(XonKbpsEwma::Unlimited)?;
106            *self_.pending_drain_rate_update = false;
107        }
108
109        res
110    }
111}
112
113/// The control structure for a stream that partakes in XON/XOFF flow control.
114#[derive(Debug)]
115#[pin_project]
116pub(crate) struct XonXoffReaderCtrl {
117    /// Receive notifications when the reactor requests a new drain rate.
118    /// When we do, we should begin waiting for the receive buffer to clear.
119    /// Then when the buffer clears, we should send a new drain rate update to the reactor.
120    #[pin]
121    drain_rate_request_stream: NotifyReceiver<DrainRateRequest>,
122    /// A handle to the reactor for this stream.
123    /// This allows us to send drain rate updates to the circuit reactor.
124    stream_target: StreamTarget,
125}
126
127impl XonXoffReaderCtrl {
128    /// Create a new [`XonXoffReaderCtrl`].
129    pub(crate) fn new(
130        drain_rate_request_stream: NotifyReceiver<DrainRateRequest>,
131        stream_target: StreamTarget,
132    ) -> Self {
133        Self {
134            drain_rate_request_stream,
135            stream_target,
136        }
137    }
138}
139
/// Lets an [`XonXoffReader`] work out when it is time to send a drain rate update
/// (which typically results in an XON message).
pub(crate) trait BufferIsEmpty {
    /// Reports whether this stream currently has no incoming bytes buffered.
    ///
    /// The receiver is a pinned `&mut` so that implementers are able to
    /// [`unobtrusive_peek()`](tor_async_utils::peekable_stream::UnobtrusivePeekableStream::unobtrusive_peek)
    /// a stream if they need to.
    fn is_empty(self: Pin<&mut Self>) -> bool;
}