tor_proto/stream/xon_xoff.rs
//! A wrapper for an [`AsyncRead`] to support XON/XOFF flow control.
//!
//! This allows any `AsyncRead` that implements [`BufferIsEmpty`] to be used with XON/XOFF flow
//! control.
5
6use std::io::Error;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10use futures::{AsyncRead, Stream};
11use pin_project::pin_project;
12use tor_basic_utils::assert_val_impl_trait;
13use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
14
15use crate::stream::DrainRateRequest;
16use crate::tunnel::StreamTarget;
17use crate::util::notify::NotifyReceiver;
18
19/// A wrapper for an [`AsyncRead`] to support XON/XOFF flow control.
20///
21/// This reader will take care of communicating with the circuit reactor to handle XON/XOFF-related
22/// events.
23#[derive(Debug)]
24#[pin_project]
25pub(crate) struct XonXoffReader<R> {
26 /// How we communicate with the circuit reactor.
27 #[pin]
28 ctrl: XonXoffReaderCtrl,
29 /// The inner reader.
30 #[pin]
31 reader: R,
32 /// Have we received a drain rate request notification from the reactor,
33 /// but haven't yet sent a drain rate update back to the reactor?
34 pending_drain_rate_update: bool,
35}
36
37impl<R> XonXoffReader<R> {
38 /// Create a new [`XonXoffReader`].
39 ///
40 /// The reader must implement [`BufferIsEmpty`], which allows the `XonXoffReader` to check if
41 /// the incoming stream buffer is empty or not.
42 pub(crate) fn new(ctrl: XonXoffReaderCtrl, reader: R) -> Self {
43 Self {
44 ctrl,
45 reader,
46 pending_drain_rate_update: false,
47 }
48 }
49
50 /// Get a reference to the inner [`AsyncRead`].
51 ///
52 /// NOTE: This will bypass the [`XonXoffReader`] and may cause incorrect behaviour depending on
53 /// how you use the returned reader (for example if it uses interior mutability).
54 pub(crate) fn inner(&self) -> &R {
55 &self.reader
56 }
57
58 /// Get a mutable reference to the inner [`AsyncRead`].
59 ///
60 /// NOTE: This will bypass the [`XonXoffReader`] and may cause incorrect behaviour depending on
61 /// how you use the returned reader (for example if you read bytes directly).
62 pub(crate) fn inner_mut(&mut self) -> &mut R {
63 &mut self.reader
64 }
65}
66
67impl<R: AsyncRead + BufferIsEmpty> AsyncRead for XonXoffReader<R> {
68 fn poll_read(
69 self: Pin<&mut Self>,
70 cx: &mut Context<'_>,
71 buf: &mut [u8],
72 ) -> Poll<Result<usize, Error>> {
73 let mut self_ = self.project();
74
75 // ensure that `drain_rate_request_stream` is a `FusedStream`,
76 // which means that we don't need to worry about calling `poll_next()` repeatedly
77 assert_val_impl_trait!(
78 self_.ctrl.drain_rate_request_stream,
79 futures::stream::FusedStream,
80 );
81
82 // check if the circuit reactor has requested a drain rate update
83 if let Poll::Ready(Some(())) = self_
84 .ctrl
85 .as_mut()
86 .project()
87 .drain_rate_request_stream
88 .poll_next(cx)
89 {
90 // a drain rate update was requested, so we need to send a drain rate update once we
91 // have no more bytes buffered
92 *self_.pending_drain_rate_update = true;
93 }
94
95 // try reading from the inner reader
96 let res = self_.reader.as_mut().poll_read(cx, buf);
97
98 // if we need to send a drain rate update and the stream buffer is empty, inform the reactor
99 if *self_.pending_drain_rate_update && self_.reader.is_empty() {
100 // TODO(arti#534): in the future we want to do rate estimation, but for now we'll just
101 // send an "unlimited" drain rate
102 self_
103 .ctrl
104 .stream_target
105 .drain_rate_update(XonKbpsEwma::Unlimited)?;
106 *self_.pending_drain_rate_update = false;
107 }
108
109 res
110 }
111}
112
113/// The control structure for a stream that partakes in XON/XOFF flow control.
114#[derive(Debug)]
115#[pin_project]
116pub(crate) struct XonXoffReaderCtrl {
117 /// Receive notifications when the reactor requests a new drain rate.
118 /// When we do, we should begin waiting for the receive buffer to clear.
119 /// Then when the buffer clears, we should send a new drain rate update to the reactor.
120 #[pin]
121 drain_rate_request_stream: NotifyReceiver<DrainRateRequest>,
122 /// A handle to the reactor for this stream.
123 /// This allows us to send drain rate updates to the circuit reactor.
124 stream_target: StreamTarget,
125}
126
127impl XonXoffReaderCtrl {
128 /// Create a new [`XonXoffReaderCtrl`].
129 pub(crate) fn new(
130 drain_rate_request_stream: NotifyReceiver<DrainRateRequest>,
131 stream_target: StreamTarget,
132 ) -> Self {
133 Self {
134 drain_rate_request_stream,
135 stream_target,
136 }
137 }
138}
139
/// Lets the [`XonXoffReader`] decide when a drain rate update should be sent
/// (which typically results in an XON message).
pub(crate) trait BufferIsEmpty {
    /// Report whether this stream currently has no incoming bytes buffered.
    ///
    /// Returns `true` when nothing is buffered.
    ///
    /// The receiver is a pinned `&mut` so that implementers can
    /// [`unobtrusive_peek()`](tor_async_utils::peekable_stream::UnobtrusivePeekableStream::unobtrusive_peek)
    /// a stream if necessary.
    fn is_empty(self: Pin<&mut Self>) -> bool;
}