tor_proto/tunnel/
halfstream.rs

1//! Type and code for handling a "half-closed" stream.
2//!
3//! A half-closed stream is one that we've sent an END on, but where
4//! we might still receive some cells.
5
6use crate::congestion::sendme::{cmd_counts_towards_windows, StreamRecvWindow};
7use crate::stream::{AnyCmdChecker, StreamFlowControl, StreamStatus};
8use crate::Result;
9use tor_cell::relaycell::{RelayCmd, UnparsedRelayMsg};
10
11/// Type to track state of half-closed streams.
12///
13/// A half-closed stream is one where we've sent an END cell, but where
14/// the other side might still send us data.
15///
16/// We need to track these streams instead of forgetting about them entirely,
17/// since otherwise we'd be vulnerable to a class of "DropMark" attacks;
18/// see <https://gitlab.torproject.org/tpo/core/tor/-/issues/25573>.
19#[derive(Debug)]
20pub(super) struct HalfStream {
21    /// Flow control for this stream.
22    ///
23    /// Used to process incoming flow control messages (SENDME, XON, etc).
24    flow_control: StreamFlowControl,
25    /// Receive window for this stream. Used to detect whether we get too
26    /// many data cells.
27    recvw: StreamRecvWindow,
28    /// Object to tell us which cells to accept on this stream.
29    cmd_checker: AnyCmdChecker,
30}
31
32impl HalfStream {
33    /// Create a new half-closed stream.
34    pub(super) fn new(
35        flow_control: StreamFlowControl,
36        recvw: StreamRecvWindow,
37        cmd_checker: AnyCmdChecker,
38    ) -> Self {
39        HalfStream {
40            flow_control,
41            recvw,
42            cmd_checker,
43        }
44    }
45
46    /// Process an incoming message and adjust this HalfStream accordingly.
47    /// Give an error if the protocol has been violated.
48    ///
49    /// The caller must handle END cells; it is an internal error to pass
50    /// END cells to this method.
51    /// no ends here.
52    pub(super) fn handle_msg(&mut self, msg: UnparsedRelayMsg) -> Result<StreamStatus> {
53        use StreamStatus::*;
54
55        // We handle SENDME/XON/XOFF separately, and don't give it to the checker.
56        //
57        // TODO: this logic is the same as `CircHop::deliver_msg_to_stream`; we should refactor this
58        // if possible
59        match msg.cmd() {
60            RelayCmd::SENDME => {
61                self.flow_control.put_for_incoming_sendme(msg)?;
62                return Ok(Open);
63            }
64            RelayCmd::XON => {
65                self.flow_control.handle_incoming_xon(msg)?;
66                return Ok(Open);
67            }
68            RelayCmd::XOFF => {
69                self.flow_control.handle_incoming_xoff(msg)?;
70                return Ok(Open);
71            }
72            _ => {}
73        }
74
75        if cmd_counts_towards_windows(msg.cmd()) {
76            self.recvw.take()?;
77        }
78
79        let status = self.cmd_checker.check_msg(&msg)?;
80        self.cmd_checker.consume_checked_msg(msg)?;
81        Ok(status)
82    }
83}
84
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::{
        congestion::sendme::{StreamRecvWindow, StreamSendWindow},
        stream::DataCmdChecker,
    };
    use rand::{CryptoRng, Rng};
    use tor_basic_utils::test_rng::testing_rng;
    use tor_cell::relaycell::{
        msg::{self, AnyRelayMsg},
        AnyRelayMsgOuter, RelayCellFormat, StreamId,
    };

    /// Encode `val` as a V0 relay cell on stream 77, then wrap the encoded
    /// body back up as an `UnparsedRelayMsg`.
    fn to_unparsed<R: Rng + CryptoRng>(rng: &mut R, val: AnyRelayMsg) -> UnparsedRelayMsg {
        let body = AnyRelayMsgOuter::new(StreamId::new(77), val)
            .encode(RelayCellFormat::V0, rng)
            .expect("encoding failed");
        UnparsedRelayMsg::from_singleton_body(RelayCellFormat::V0, body).unwrap()
    }

    /// Deliver `val` to `hs`, insist that it is rejected, and return the
    /// error rendered through its `Display` implementation.
    fn rejection_text<R: Rng + CryptoRng>(
        hs: &mut HalfStream,
        rng: &mut R,
        val: AnyRelayMsg,
    ) -> String {
        hs.handle_msg(to_unparsed(rng, val))
            .err()
            .unwrap()
            .to_string()
    }

    #[test]
    fn halfstream_sendme() {
        let mut rng = testing_rng();

        // Stream-level SENDMEs are not authenticated, so the only way to tell
        // that one was unexpected is for the window to exceed its maximum.
        //
        // With the window starting at 450, the first SENDME raises it to 500
        // (the maximum); the second one overshoots, and we notice that it
        // was unexpected.
        let flow_control = StreamFlowControl::new_window_based(StreamSendWindow::new(450));
        let mut hs = HalfStream::new(
            flow_control,
            StreamRecvWindow::new(20),
            DataCmdChecker::new_any(),
        );

        let sendme = msg::Sendme::new_empty();

        // The first SENDME is accepted...
        let first = hs.handle_msg(to_unparsed(&mut rng, sendme.clone().into()));
        assert!(first.is_ok());

        // ...but a second one was not expected.
        assert_eq!(
            rejection_text(&mut hs, &mut rng, sendme.into()),
            "Circuit protocol violation: Unexpected stream SENDME"
        );
    }

    /// A half-stream with send and receive windows of 20 and a data checker
    /// created with `DataCmdChecker::new_any()`.
    fn hs_new() -> HalfStream {
        let flow_control = StreamFlowControl::new_window_based(StreamSendWindow::new(20));
        let recvw = StreamRecvWindow::new(20);
        HalfStream::new(flow_control, recvw, DataCmdChecker::new_any())
    }

    #[test]
    fn halfstream_data() {
        let mut hs = hs_new();
        let mut rng = testing_rng();

        // No CONNECTED cell was supplied during setup, so deliver one first.
        let connected = msg::Connected::new_empty();
        hs.handle_msg(to_unparsed(&mut rng, connected.into()))
            .unwrap();

        // The receive window allows exactly 20 data cells.
        let data = msg::Data::new(&b"this offer is unrepeatable"[..]).unwrap();
        for _ in 0..20 {
            let accepted = hs.handle_msg(to_unparsed(&mut rng, data.clone().into()));
            assert!(accepted.is_ok());
        }

        // A 21st data cell is a protocol violation.
        assert_eq!(
            rejection_text(&mut hs, &mut rng, data.into()),
            "Circuit protocol violation: Received a data cell in violation of a window"
        );
    }

    #[test]
    fn halfstream_connected() {
        let mut hs = hs_new();
        let mut rng = testing_rng();

        // A fresh half-stream takes one CONNECTED, and rejects a second.
        let connected = msg::Connected::new_empty();
        let first = hs.handle_msg(to_unparsed(&mut rng, connected.clone().into()));
        assert!(first.is_ok());
        let second = hs.handle_msg(to_unparsed(&mut rng, connected.clone().into()));
        assert!(second.is_err());

        // Now start from a checker that has already checked a CONNECTED
        // before the half-stream is built: the very first CONNECTED the
        // half-stream sees must be rejected, with this specific error.
        let mut cmd_checker = DataCmdChecker::new_any();
        let already_seen = to_unparsed(&mut rng, msg::Connected::new_empty().into());
        cmd_checker.check_msg(&already_seen).unwrap();

        let flow_control = StreamFlowControl::new_window_based(StreamSendWindow::new(20));
        let mut hs = HalfStream::new(flow_control, StreamRecvWindow::new(20), cmd_checker);
        assert_eq!(
            rejection_text(&mut hs, &mut rng, connected.into()),
            "Stream protocol violation: Received CONNECTED twice on a stream."
        );
    }

    #[test]
    fn halfstream_other() {
        let mut hs = hs_new();
        let mut rng = testing_rng();

        // EXTENDED2 is rejected on a data stream.
        let extended = msg::Extended2::new(Vec::new());
        assert_eq!(
            rejection_text(&mut hs, &mut rng, extended.into()),
            "Stream protocol violation: Unexpected EXTENDED2 on a data stream!"
        );
    }
}