tor_proto/streammap/
halfstream.rs1use crate::Result;
7use crate::congestion::sendme::{StreamRecvWindow, cmd_counts_towards_windows};
8use crate::stream::cmdcheck::{AnyCmdChecker, StreamStatus};
9use crate::stream::flow_ctrl::state::{FlowCtrlHooks, StreamFlowCtrl};
10use tor_cell::relaycell::{RelayCmd, UnparsedRelayMsg};
11
12#[derive(Debug)]
21pub(crate) struct HalfStream {
22 flow_control: StreamFlowCtrl,
26 recvw: StreamRecvWindow,
29 cmd_checker: AnyCmdChecker,
31}
32
33impl HalfStream {
34 pub(crate) fn new(
36 flow_control: StreamFlowCtrl,
37 recvw: StreamRecvWindow,
38 cmd_checker: AnyCmdChecker,
39 ) -> Self {
40 HalfStream {
41 flow_control,
42 recvw,
43 cmd_checker,
44 }
45 }
46
47 pub(crate) fn handle_msg(&mut self, msg: UnparsedRelayMsg) -> Result<StreamStatus> {
54 use StreamStatus::*;
55
56 match msg.cmd() {
61 RelayCmd::SENDME => {
62 self.flow_control.put_for_incoming_sendme(msg)?;
63 return Ok(Open);
64 }
65 RelayCmd::XON => {
66 self.flow_control.handle_incoming_xon(msg)?;
67 return Ok(Open);
68 }
69 RelayCmd::XOFF => {
70 self.flow_control.handle_incoming_xoff(msg)?;
71 return Ok(Open);
72 }
73 _ => {}
74 }
75
76 if cmd_counts_towards_windows(msg.cmd()) {
77 self.recvw.take()?;
78 }
79
80 let status = self.cmd_checker.check_msg(&msg)?;
81 self.cmd_checker.consume_checked_msg(msg)?;
82 Ok(status)
83 }
84}
85
86#[cfg(test)]
87mod test {
88 #![allow(clippy::bool_assert_comparison)]
90 #![allow(clippy::clone_on_copy)]
91 #![allow(clippy::dbg_macro)]
92 #![allow(clippy::mixed_attributes_style)]
93 #![allow(clippy::print_stderr)]
94 #![allow(clippy::print_stdout)]
95 #![allow(clippy::single_char_pattern)]
96 #![allow(clippy::unwrap_used)]
97 #![allow(clippy::unchecked_time_subtraction)]
98 #![allow(clippy::useless_vec)]
99 #![allow(clippy::needless_pass_by_value)]
100 use super::*;
102 use crate::{
103 client::stream::OutboundDataCmdChecker,
104 congestion::sendme::{StreamRecvWindow, StreamSendWindow},
105 };
106 use rand::{CryptoRng, Rng};
107 use tor_basic_utils::test_rng::testing_rng;
108 use tor_cell::relaycell::{
109 AnyRelayMsgOuter, RelayCellFormat, StreamId,
110 msg::{self, AnyRelayMsg},
111 };
112
113 fn to_unparsed<R: Rng + CryptoRng>(rng: &mut R, val: AnyRelayMsg) -> UnparsedRelayMsg {
114 UnparsedRelayMsg::from_singleton_body(
115 RelayCellFormat::V0,
116 AnyRelayMsgOuter::new(StreamId::new(77), val)
117 .encode(RelayCellFormat::V0, rng)
118 .expect("encoding failed"),
119 )
120 .unwrap()
121 }
122
123 #[test]
124 fn halfstream_sendme() {
125 let mut rng = testing_rng();
126
127 let sendw = StreamSendWindow::new(450);
133
134 let mut hs = HalfStream::new(
135 StreamFlowCtrl::new_window(sendw),
136 StreamRecvWindow::new(20),
137 OutboundDataCmdChecker::new_any(),
138 );
139
140 let m = msg::Sendme::new_empty();
142 assert!(
143 hs.handle_msg(to_unparsed(&mut rng, m.clone().into()))
144 .is_ok()
145 );
146 let e = hs
148 .handle_msg(to_unparsed(&mut rng, m.into()))
149 .err()
150 .unwrap();
151 assert_eq!(
152 format!("{}", e),
153 "Circuit protocol violation: Unexpected stream SENDME"
154 );
155 }
156
157 fn hs_new() -> HalfStream {
158 HalfStream::new(
159 StreamFlowCtrl::new_window(StreamSendWindow::new(20)),
160 StreamRecvWindow::new(20),
161 OutboundDataCmdChecker::new_any(),
162 )
163 }
164
165 #[test]
166 fn halfstream_data() {
167 let mut hs = hs_new();
168 let mut rng = testing_rng();
169
170 hs.handle_msg(to_unparsed(&mut rng, msg::Connected::new_empty().into()))
172 .unwrap();
173
174 let m = msg::Data::new(&b"this offer is unrepeatable"[..]).unwrap();
176 for _ in 0_u8..20 {
177 assert!(
178 hs.handle_msg(to_unparsed(&mut rng, m.clone().into()))
179 .is_ok()
180 );
181 }
182
183 let e = hs
185 .handle_msg(to_unparsed(&mut rng, m.into()))
186 .err()
187 .unwrap();
188 assert_eq!(
189 format!("{}", e),
190 "Circuit protocol violation: Received a data cell in violation of a window"
191 );
192 }
193
194 #[test]
195 fn halfstream_connected() {
196 let mut hs = hs_new();
197 let mut rng = testing_rng();
198 let m = msg::Connected::new_empty();
201 assert!(
202 hs.handle_msg(to_unparsed(&mut rng, m.clone().into()))
203 .is_ok()
204 );
205 assert!(
206 hs.handle_msg(to_unparsed(&mut rng, m.clone().into()))
207 .is_err()
208 );
209
210 let mut cmd_checker = OutboundDataCmdChecker::new_any();
213 {
214 cmd_checker
215 .check_msg(&to_unparsed(&mut rng, msg::Connected::new_empty().into()))
216 .unwrap();
217 }
218 let mut hs = HalfStream::new(
219 StreamFlowCtrl::new_window(StreamSendWindow::new(20)),
220 StreamRecvWindow::new(20),
221 cmd_checker,
222 );
223 let e = hs
224 .handle_msg(to_unparsed(&mut rng, m.into()))
225 .err()
226 .unwrap();
227 assert_eq!(
228 format!("{}", e),
229 "Stream protocol violation: Received CONNECTED twice on a stream."
230 );
231 }
232
233 #[test]
234 fn halfstream_other() {
235 let mut hs = hs_new();
236 let mut rng = testing_rng();
237 let m = msg::Extended2::new(Vec::new());
238 let e = hs
239 .handle_msg(to_unparsed(&mut rng, m.into()))
240 .err()
241 .unwrap();
242 assert_eq!(
243 format!("{}", e),
244 "Stream protocol violation: Unexpected EXTENDED2 on a data stream!"
245 );
246 }
247}