Skip to main content

tor_proto/streammap/
halfstream.rs

1//! Type and code for handling a "half-closed" stream.
2//!
3//! A half-closed stream is one that we've sent an END on, but where
4//! we might still receive some cells.
5
6use crate::Result;
7use crate::congestion::sendme::{StreamRecvWindow, cmd_counts_towards_windows};
8use crate::stream::cmdcheck::{AnyCmdChecker, StreamStatus};
9use crate::stream::flow_ctrl::state::{FlowCtrlHooks, StreamFlowCtrl};
10use tor_cell::relaycell::{RelayCmd, UnparsedRelayMsg};
11
12/// Type to track state of half-closed streams.
13///
14/// A half-closed stream is one where we've sent an END cell, but where
15/// the other side might still send us data.
16///
17/// We need to track these streams instead of forgetting about them entirely,
18/// since otherwise we'd be vulnerable to a class of "DropMark" attacks;
19/// see <https://gitlab.torproject.org/tpo/core/tor/-/issues/25573>.
20#[derive(Debug)]
21pub(crate) struct HalfStream {
22    /// Flow control for this stream.
23    ///
24    /// Used to process incoming flow control messages (SENDME, XON, etc).
25    flow_control: StreamFlowCtrl,
26    /// Receive window for this stream. Used to detect whether we get too
27    /// many data cells.
28    recvw: StreamRecvWindow,
29    /// Object to tell us which cells to accept on this stream.
30    cmd_checker: AnyCmdChecker,
31}
32
33impl HalfStream {
34    /// Create a new half-closed stream.
35    pub(crate) fn new(
36        flow_control: StreamFlowCtrl,
37        recvw: StreamRecvWindow,
38        cmd_checker: AnyCmdChecker,
39    ) -> Self {
40        HalfStream {
41            flow_control,
42            recvw,
43            cmd_checker,
44        }
45    }
46
47    /// Process an incoming message and adjust this HalfStream accordingly.
48    /// Give an error if the protocol has been violated.
49    ///
50    /// The caller must handle END cells; it is an internal error to pass
51    /// END cells to this method.
52    /// no ends here.
53    pub(crate) fn handle_msg(&mut self, msg: UnparsedRelayMsg) -> Result<StreamStatus> {
54        use StreamStatus::*;
55
56        // We handle SENDME/XON/XOFF separately, and don't give it to the checker.
57        //
58        // TODO: this logic is the same as `CircHop::deliver_msg_to_stream`; we should refactor this
59        // if possible
60        match msg.cmd() {
61            RelayCmd::SENDME => {
62                self.flow_control.put_for_incoming_sendme(msg)?;
63                return Ok(Open);
64            }
65            RelayCmd::XON => {
66                self.flow_control.handle_incoming_xon(msg)?;
67                return Ok(Open);
68            }
69            RelayCmd::XOFF => {
70                self.flow_control.handle_incoming_xoff(msg)?;
71                return Ok(Open);
72            }
73            _ => {}
74        }
75
76        if cmd_counts_towards_windows(msg.cmd()) {
77            self.recvw.take()?;
78        }
79
80        let status = self.cmd_checker.check_msg(&msg)?;
81        self.cmd_checker.consume_checked_msg(msg)?;
82        Ok(status)
83    }
84}
85
86#[cfg(test)]
87mod test {
88    // @@ begin test lint list maintained by maint/add_warning @@
89    #![allow(clippy::bool_assert_comparison)]
90    #![allow(clippy::clone_on_copy)]
91    #![allow(clippy::dbg_macro)]
92    #![allow(clippy::mixed_attributes_style)]
93    #![allow(clippy::print_stderr)]
94    #![allow(clippy::print_stdout)]
95    #![allow(clippy::single_char_pattern)]
96    #![allow(clippy::unwrap_used)]
97    #![allow(clippy::unchecked_time_subtraction)]
98    #![allow(clippy::useless_vec)]
99    #![allow(clippy::needless_pass_by_value)]
100    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
101    use super::*;
102    use crate::{
103        client::stream::OutboundDataCmdChecker,
104        congestion::sendme::{StreamRecvWindow, StreamSendWindow},
105    };
106    use rand::{CryptoRng, Rng};
107    use tor_basic_utils::test_rng::testing_rng;
108    use tor_cell::relaycell::{
109        AnyRelayMsgOuter, RelayCellFormat, StreamId,
110        msg::{self, AnyRelayMsg},
111    };
112
113    fn to_unparsed<R: Rng + CryptoRng>(rng: &mut R, val: AnyRelayMsg) -> UnparsedRelayMsg {
114        UnparsedRelayMsg::from_singleton_body(
115            RelayCellFormat::V0,
116            AnyRelayMsgOuter::new(StreamId::new(77), val)
117                .encode(RelayCellFormat::V0, rng)
118                .expect("encoding failed"),
119        )
120        .unwrap()
121    }
122
123    #[test]
124    fn halfstream_sendme() {
125        let mut rng = testing_rng();
126
127        // Stream level SENDMEs are not authenticated and so the only way to make sure we were not
128        // expecting one is if the window busts its maximum.
129        //
130        // Starting the window at 450, the first SENDME will increment it to 500 (the maximum)
131        // meaning that the second SENDME will bust that and we'll noticed that it was unexpected.
132        let sendw = StreamSendWindow::new(450);
133
134        let mut hs = HalfStream::new(
135            StreamFlowCtrl::new_window(sendw),
136            StreamRecvWindow::new(20),
137            OutboundDataCmdChecker::new_any(),
138        );
139
140        // one sendme is fine
141        let m = msg::Sendme::new_empty();
142        assert!(
143            hs.handle_msg(to_unparsed(&mut rng, m.clone().into()))
144                .is_ok()
145        );
146        // but no more were expected!
147        let e = hs
148            .handle_msg(to_unparsed(&mut rng, m.into()))
149            .err()
150            .unwrap();
151        assert_eq!(
152            format!("{}", e),
153            "Circuit protocol violation: Unexpected stream SENDME"
154        );
155    }
156
157    fn hs_new() -> HalfStream {
158        HalfStream::new(
159            StreamFlowCtrl::new_window(StreamSendWindow::new(20)),
160            StreamRecvWindow::new(20),
161            OutboundDataCmdChecker::new_any(),
162        )
163    }
164
165    #[test]
166    fn halfstream_data() {
167        let mut hs = hs_new();
168        let mut rng = testing_rng();
169
170        // we didn't give a connected cell during setup, so do it now.
171        hs.handle_msg(to_unparsed(&mut rng, msg::Connected::new_empty().into()))
172            .unwrap();
173
174        // 20 data cells are okay.
175        let m = msg::Data::new(&b"this offer is unrepeatable"[..]).unwrap();
176        for _ in 0_u8..20 {
177            assert!(
178                hs.handle_msg(to_unparsed(&mut rng, m.clone().into()))
179                    .is_ok()
180            );
181        }
182
183        // But one more is a protocol violation.
184        let e = hs
185            .handle_msg(to_unparsed(&mut rng, m.into()))
186            .err()
187            .unwrap();
188        assert_eq!(
189            format!("{}", e),
190            "Circuit protocol violation: Received a data cell in violation of a window"
191        );
192    }
193
194    #[test]
195    fn halfstream_connected() {
196        let mut hs = hs_new();
197        let mut rng = testing_rng();
198        // We were told to accept a connected, so we'll accept one
199        // and no more.
200        let m = msg::Connected::new_empty();
201        assert!(
202            hs.handle_msg(to_unparsed(&mut rng, m.clone().into()))
203                .is_ok()
204        );
205        assert!(
206            hs.handle_msg(to_unparsed(&mut rng, m.clone().into()))
207                .is_err()
208        );
209
210        // If we try that again _after getting a connected_,
211        // accept any.
212        let mut cmd_checker = OutboundDataCmdChecker::new_any();
213        {
214            cmd_checker
215                .check_msg(&to_unparsed(&mut rng, msg::Connected::new_empty().into()))
216                .unwrap();
217        }
218        let mut hs = HalfStream::new(
219            StreamFlowCtrl::new_window(StreamSendWindow::new(20)),
220            StreamRecvWindow::new(20),
221            cmd_checker,
222        );
223        let e = hs
224            .handle_msg(to_unparsed(&mut rng, m.into()))
225            .err()
226            .unwrap();
227        assert_eq!(
228            format!("{}", e),
229            "Stream protocol violation: Received CONNECTED twice on a stream."
230        );
231    }
232
233    #[test]
234    fn halfstream_other() {
235        let mut hs = hs_new();
236        let mut rng = testing_rng();
237        let m = msg::Extended2::new(Vec::new());
238        let e = hs
239            .handle_msg(to_unparsed(&mut rng, m.into()))
240            .err()
241            .unwrap();
242        assert_eq!(
243            format!("{}", e),
244            "Stream protocol violation: Unexpected EXTENDED2 on a data stream!"
245        );
246    }
247}