tor_proto/tunnel/
halfstream.rs1use crate::congestion::sendme::{cmd_counts_towards_windows, StreamRecvWindow};
7use crate::stream::{AnyCmdChecker, StreamFlowControl, StreamStatus};
8use crate::Result;
9use tor_cell::relaycell::{RelayCmd, UnparsedRelayMsg};
10
11#[derive(Debug)]
20pub(super) struct HalfStream {
21 flow_control: StreamFlowControl,
25 recvw: StreamRecvWindow,
28 cmd_checker: AnyCmdChecker,
30}
31
32impl HalfStream {
33 pub(super) fn new(
35 flow_control: StreamFlowControl,
36 recvw: StreamRecvWindow,
37 cmd_checker: AnyCmdChecker,
38 ) -> Self {
39 HalfStream {
40 flow_control,
41 recvw,
42 cmd_checker,
43 }
44 }
45
46 pub(super) fn handle_msg(&mut self, msg: UnparsedRelayMsg) -> Result<StreamStatus> {
53 use StreamStatus::*;
54
55 match msg.cmd() {
60 RelayCmd::SENDME => {
61 self.flow_control.put_for_incoming_sendme(msg)?;
62 return Ok(Open);
63 }
64 RelayCmd::XON => {
65 self.flow_control.handle_incoming_xon(msg)?;
66 return Ok(Open);
67 }
68 RelayCmd::XOFF => {
69 self.flow_control.handle_incoming_xoff(msg)?;
70 return Ok(Open);
71 }
72 _ => {}
73 }
74
75 if cmd_counts_towards_windows(msg.cmd()) {
76 self.recvw.take()?;
77 }
78
79 let status = self.cmd_checker.check_msg(&msg)?;
80 self.cmd_checker.consume_checked_msg(msg)?;
81 Ok(status)
82 }
83}
84
85#[cfg(test)]
86mod test {
87 #![allow(clippy::bool_assert_comparison)]
89 #![allow(clippy::clone_on_copy)]
90 #![allow(clippy::dbg_macro)]
91 #![allow(clippy::mixed_attributes_style)]
92 #![allow(clippy::print_stderr)]
93 #![allow(clippy::print_stdout)]
94 #![allow(clippy::single_char_pattern)]
95 #![allow(clippy::unwrap_used)]
96 #![allow(clippy::unchecked_duration_subtraction)]
97 #![allow(clippy::useless_vec)]
98 #![allow(clippy::needless_pass_by_value)]
99 use super::*;
101 use crate::{
102 congestion::sendme::{StreamRecvWindow, StreamSendWindow},
103 stream::DataCmdChecker,
104 };
105 use rand::{CryptoRng, Rng};
106 use tor_basic_utils::test_rng::testing_rng;
107 use tor_cell::relaycell::{
108 msg::{self, AnyRelayMsg},
109 AnyRelayMsgOuter, RelayCellFormat, StreamId,
110 };
111
112 fn to_unparsed<R: Rng + CryptoRng>(rng: &mut R, val: AnyRelayMsg) -> UnparsedRelayMsg {
113 UnparsedRelayMsg::from_singleton_body(
114 RelayCellFormat::V0,
115 AnyRelayMsgOuter::new(StreamId::new(77), val)
116 .encode(RelayCellFormat::V0, rng)
117 .expect("encoding failed"),
118 )
119 .unwrap()
120 }
121
122 #[test]
123 fn halfstream_sendme() {
124 let mut rng = testing_rng();
125
126 let sendw = StreamSendWindow::new(450);
132
133 let mut hs = HalfStream::new(
134 StreamFlowControl::new_window_based(sendw),
135 StreamRecvWindow::new(20),
136 DataCmdChecker::new_any(),
137 );
138
139 let m = msg::Sendme::new_empty();
141 assert!(hs
142 .handle_msg(to_unparsed(&mut rng, m.clone().into()))
143 .is_ok());
144 let e = hs
146 .handle_msg(to_unparsed(&mut rng, m.into()))
147 .err()
148 .unwrap();
149 assert_eq!(
150 format!("{}", e),
151 "Circuit protocol violation: Unexpected stream SENDME"
152 );
153 }
154
155 fn hs_new() -> HalfStream {
156 HalfStream::new(
157 StreamFlowControl::new_window_based(StreamSendWindow::new(20)),
158 StreamRecvWindow::new(20),
159 DataCmdChecker::new_any(),
160 )
161 }
162
163 #[test]
164 fn halfstream_data() {
165 let mut hs = hs_new();
166 let mut rng = testing_rng();
167
168 hs.handle_msg(to_unparsed(&mut rng, msg::Connected::new_empty().into()))
170 .unwrap();
171
172 let m = msg::Data::new(&b"this offer is unrepeatable"[..]).unwrap();
174 for _ in 0_u8..20 {
175 assert!(hs
176 .handle_msg(to_unparsed(&mut rng, m.clone().into()))
177 .is_ok());
178 }
179
180 let e = hs
182 .handle_msg(to_unparsed(&mut rng, m.into()))
183 .err()
184 .unwrap();
185 assert_eq!(
186 format!("{}", e),
187 "Circuit protocol violation: Received a data cell in violation of a window"
188 );
189 }
190
191 #[test]
192 fn halfstream_connected() {
193 let mut hs = hs_new();
194 let mut rng = testing_rng();
195 let m = msg::Connected::new_empty();
198 assert!(hs
199 .handle_msg(to_unparsed(&mut rng, m.clone().into()))
200 .is_ok());
201 assert!(hs
202 .handle_msg(to_unparsed(&mut rng, m.clone().into()))
203 .is_err());
204
205 let mut cmd_checker = DataCmdChecker::new_any();
208 {
209 cmd_checker
210 .check_msg(&to_unparsed(&mut rng, msg::Connected::new_empty().into()))
211 .unwrap();
212 }
213 let mut hs = HalfStream::new(
214 StreamFlowControl::new_window_based(StreamSendWindow::new(20)),
215 StreamRecvWindow::new(20),
216 cmd_checker,
217 );
218 let e = hs
219 .handle_msg(to_unparsed(&mut rng, m.into()))
220 .err()
221 .unwrap();
222 assert_eq!(
223 format!("{}", e),
224 "Stream protocol violation: Received CONNECTED twice on a stream."
225 );
226 }
227
228 #[test]
229 fn halfstream_other() {
230 let mut hs = hs_new();
231 let mut rng = testing_rng();
232 let m = msg::Extended2::new(Vec::new());
233 let e = hs
234 .handle_msg(to_unparsed(&mut rng, m.into()))
235 .err()
236 .unwrap();
237 assert_eq!(
238 format!("{}", e),
239 "Stream protocol violation: Unexpected EXTENDED2 on a data stream!"
240 );
241 }
242}