Skip to main content

tor_proto/stream/
incoming.rs

1//! Incoming data stream cell handlers, shared by the relay and onion service implementations.
2
3use bitvec::prelude::*;
4use derive_deftly::Deftly;
5use oneshot_fused_workaround as oneshot;
6use postage::watch;
7
8use tor_cell::relaycell::msg::AnyRelayMsg;
9use tor_cell::relaycell::{RelayCellFormat, RelayCmd, StreamId, UnparsedRelayMsg, msg};
10use tor_cell::restricted_msg;
11use tor_error::internal;
12use tor_memquota::derive_deftly_template_HasMemoryCost;
13use tor_memquota::mq_queue::{self, MpscSpec};
14use tor_rtcompat::DynTimeProvider;
15
16use crate::circuit::CircHopSyncView;
17use crate::stream::cmdcheck::{AnyCmdChecker, CmdChecker, StreamStatus};
18use crate::stream::{CloseStreamBehavior, StreamComponents};
19use crate::{Error, Result};
20
21// TODO(relay): move these to a shared module
22use crate::client::stream::DataStream;
23
24use crate::memquota::StreamAccount;
25use crate::stream::StreamMpscSender;
26use crate::stream::flow_ctrl::state::StreamRateLimit;
27use crate::stream::flow_ctrl::xon_xoff::reader::DrainRateRequest;
28use crate::stream::queue::StreamQueueReceiver;
29use crate::util::notify::NotifyReceiver;
30use crate::{HopLocation, HopNum};
31
32use std::mem::size_of;
33
/// A `CmdChecker` that enforces invariants for inbound data streams.
///
/// Its `check_msg` implementation accepts only `DATA` and `END`; every other
/// command is reported as a stream protocol error.
#[derive(Debug, Default)]
pub(crate) struct InboundDataCmdChecker;
37
restricted_msg! {
    /// An allowable incoming message on an incoming data stream.
    ///
    /// Used to verify that a message on such a stream actually decodes.
    enum IncomingDataStreamMsg:RelayMsg {
        // SENDME is not listed here: it is handled by the reactor.
        Data, End,
    }
}
45
46impl CmdChecker for InboundDataCmdChecker {
47    fn check_msg(&mut self, msg: &tor_cell::relaycell::UnparsedRelayMsg) -> Result<StreamStatus> {
48        use StreamStatus::*;
49        match msg.cmd() {
50            RelayCmd::DATA => Ok(Open),
51            RelayCmd::END => Ok(Closed),
52            _ => Err(Error::StreamProto(format!(
53                "Unexpected {} on an incoming data stream!",
54                msg.cmd()
55            ))),
56        }
57    }
58
59    fn consume_checked_msg(&mut self, msg: tor_cell::relaycell::UnparsedRelayMsg) -> Result<()> {
60        let _ = msg
61            .decode::<IncomingDataStreamMsg>()
62            .map_err(|err| Error::from_bytes_err(err, "cell on half-closed stream"))?;
63        Ok(())
64    }
65}
66
67impl InboundDataCmdChecker {
68    /// Return a new boxed `DataCmdChecker` in a state suitable for a
69    /// connection where an initial CONNECTED cell is not expected.
70    ///
71    /// This is used by hidden services, exit relays, and directory servers
72    /// to accept streams.
73    pub(crate) fn new_connected() -> AnyCmdChecker {
74        Box::new(Self)
75    }
76}
77
/// A pending request from the other end of the circuit for us to open a new
/// stream.
///
/// Exits, directory caches, and onion services expect to receive these; others
/// do not.
///
/// On receiving one of these objects, the party handling it should accept it
/// (`accept_data`), reject it (`reject`), or ignore it (`discard`).  If it is
/// dropped without being explicitly handled, a reject message will be sent
/// anyway.
#[derive(Debug)]
pub struct IncomingStream {
    /// The runtime's time provider, handed on to the [`DataStream`] on accept.
    time_provider: DynTimeProvider,
    /// The message that the client sent us to begin the stream.
    request: IncomingStreamRequest,
    /// Stream components used to assemble the [`DataStream`].
    components: StreamComponents,
}
96
97impl IncomingStream {
98    /// Create a new `IncomingStream`.
99    pub(crate) fn new(
100        time_provider: DynTimeProvider,
101        request: IncomingStreamRequest,
102        components: StreamComponents,
103    ) -> Self {
104        Self {
105            time_provider,
106            request,
107            components,
108        }
109    }
110
111    /// Return the underlying message that was used to try to begin this stream.
112    pub fn request(&self) -> &IncomingStreamRequest {
113        &self.request
114    }
115
116    /// Accept this stream as a new [`DataStream`], and send the client a
117    /// message letting them know the stream was accepted.
118    pub async fn accept_data(self, message: msg::Connected) -> Result<DataStream> {
119        let Self {
120            time_provider,
121            request,
122            components:
123                StreamComponents {
124                    mut target,
125                    stream_receiver,
126                    xon_xoff_reader_ctrl,
127                    memquota,
128                },
129        } = self;
130
131        match request {
132            IncomingStreamRequest::Begin(_) | IncomingStreamRequest::BeginDir(_) => {
133                target.send(message.into()).await?;
134                Ok(DataStream::new_connected(
135                    time_provider,
136                    stream_receiver,
137                    xon_xoff_reader_ctrl,
138                    target,
139                    memquota,
140                ))
141            }
142            IncomingStreamRequest::Resolve(_) => {
143                Err(internal!("Cannot accept data on a RESOLVE stream").into())
144            }
145        }
146    }
147
148    /// Reject this request and send an error message to the client.
149    pub async fn reject(mut self, message: msg::End) -> Result<()> {
150        let rx = self.reject_inner(CloseStreamBehavior::SendEnd(message))?;
151
152        rx.await.map_err(|_| Error::CircuitClosed)?.map(|_| ())
153    }
154
155    /// Reject this request and possibly send an error message to the client.
156    ///
157    /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
158    fn reject_inner(
159        &mut self,
160        message: CloseStreamBehavior,
161    ) -> Result<oneshot::Receiver<Result<()>>> {
162        self.components.target.close_pending(message)
163    }
164
165    /// Ignore this request without replying to the client.
166    ///
167    /// (If you drop an [`IncomingStream`] without calling `accept_data`,
168    /// `reject`, or this method, the drop handler will cause it to be
169    /// rejected.)
170    pub async fn discard(mut self) -> Result<()> {
171        let rx = self.reject_inner(CloseStreamBehavior::SendNothing)?;
172
173        rx.await.map_err(|_| Error::CircuitClosed)?.map(|_| ())
174    }
175}
176
177// NOTE: We do not need to `impl Drop for IncomingStream { .. }`: when its
178// StreamTarget is dropped, this will drop its internal mpsc::Sender, and the
179// circuit reactor will see a close on its mpsc::Receiver, and the circuit
180// reactor will itself send an End.
181
restricted_msg! {
    /// The allowed incoming messages on an `IncomingStream`.
    ///
    /// Each variant corresponds to one kind of request that can ask us to
    /// open a new stream.
    #[derive(Clone, Debug, Deftly)]
    #[derive_deftly(HasMemoryCost)]
    #[non_exhaustive]
    pub enum IncomingStreamRequest: RelayMsg {
        /// A BEGIN message.
        Begin,
        /// A BEGIN_DIR message.
        BeginDir,
        /// A RESOLVE message.
        Resolve,
    }
}
196
/// Bit-vector used to represent a list of permitted commands.
///
/// It holds 256 bits and is indexed by the `u8` value of a relay command.
///
/// This is cheaper and faster than using a vec, and avoids side-channel
/// attacks.
type RelayCmdSet = bitvec::BitArr!(for 256);
202
/// A `CmdChecker` that enforces correctness for incoming commands on unrecognized streams that
/// have a non-zero stream ID.
///
/// A command is accepted only if its bit is set in `allow_commands`.
#[derive(Debug)]
pub(crate) struct IncomingCmdChecker {
    /// The "begin" commands that can be received on this type of circuit:
    ///
    ///   * onion service circuits only accept `BEGIN`
    ///   * all relay circuits accept `BEGIN_DIR`
    ///   * exit relays additionally accept `BEGIN` or `RESOLVE` on relay circuits
    ///   * once CONNECT_UDP is implemented, relays and later onion services may accept CONNECT_UDP
    ///     as well
    allow_commands: RelayCmdSet,
}
216
217impl IncomingCmdChecker {
218    /// Create a new boxed `IncomingCmdChecker`.
219    pub(crate) fn new_any(allow_commands: &[RelayCmd]) -> AnyCmdChecker {
220        let mut array = BitArray::ZERO;
221        for c in allow_commands {
222            array.set(u8::from(*c) as usize, true);
223        }
224        Box::new(Self {
225            allow_commands: array,
226        })
227    }
228}
229
230impl CmdChecker for IncomingCmdChecker {
231    fn check_msg(&mut self, msg: &UnparsedRelayMsg) -> Result<StreamStatus> {
232        if self.allow_commands[u8::from(msg.cmd()) as usize] {
233            Ok(StreamStatus::Open)
234        } else {
235            Err(Error::StreamProto(format!(
236                "Unexpected {} on incoming stream",
237                msg.cmd()
238            )))
239        }
240    }
241
242    fn consume_checked_msg(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
243        let _ = msg
244            .decode::<IncomingStreamRequest>()
245            .map_err(|err| Error::from_bytes_err(err, "invalid message on incoming stream"))?;
246
247        Ok(())
248    }
249}
250
/// A callback that can check whether a given stream request is acceptable
/// immediately on its receipt.
///
/// This should only be used for checks that need to be done immediately, with a
/// view of the state of the circuit hop the stream request arrived on.
/// Any other checks should, if possible,
/// be done on the [`IncomingStream`] objects as they are received.
pub trait IncomingStreamRequestFilter: Send + 'static {
    /// Check an incoming stream request, and decide what to do with it.
    ///
    /// `ctx` gives access to the request message, and `circ` is a view of the
    /// circuit hop the request arrived on.
    fn disposition(
        &mut self,
        ctx: &IncomingStreamRequestContext<'_>,
        circ: &CircHopSyncView<'_>,
    ) -> Result<IncomingStreamRequestDisposition>;
}
266
/// What action to take with an incoming stream request.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum IncomingStreamRequestDisposition {
    /// Accept the request (for now) and pass it to the mpsc::Receiver
    /// that is yielding them as [`IncomingStream`].
    Accept,
    /// Reject the request, and close the circuit on which it was received.
    CloseCircuit,
    /// Reject the request and send an END message.
    RejectRequest(msg::End),
}
279
/// Information about a stream request, as passed to an [`IncomingStreamRequestFilter`].
///
/// The context only borrows the request, for the lifetime `'a`.
pub struct IncomingStreamRequestContext<'a> {
    /// The request message itself.
    pub(crate) request: &'a IncomingStreamRequest,
}
285impl<'a> IncomingStreamRequestContext<'a> {
286    /// Return a reference to the message used to request this stream.
287    pub fn request(&self) -> &'a IncomingStreamRequest {
288        self.request
289    }
290}
291
/// Information about an incoming stream request.
///
/// This is the item type carried by the [`StreamReqSender`] queue.
#[derive(Debug, Deftly)]
#[derive_deftly(HasMemoryCost)]
pub(crate) struct StreamReqInfo {
    /// The [`IncomingStreamRequest`].
    pub(crate) req: IncomingStreamRequest,
    /// The ID of the stream being requested.
    pub(crate) stream_id: StreamId,
    /// The hop the request arrived from, as a [`HopLocation`].
    ///
    /// Set to `None` if we are an exit relay.
    //
    // TODO: For onion services, we might be able to enforce the HopNum earlier: we would never accept an
    // incoming stream request from two separate hops.  (There is only one that's valid.)
    pub(crate) hop: Option<HopLocation>,
    /// The format which must be used with this stream to encode messages.
    #[deftly(has_memory_cost(indirect_size = "0"))]
    pub(crate) relay_cell_format: RelayCellFormat,
    /// A channel for receiving messages from this stream.
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate
    pub(crate) receiver: StreamQueueReceiver,
    /// A channel for sending messages to be sent on this stream.
    #[deftly(has_memory_cost(indirect_size = "size_of::<AnyRelayMsg>()"))] // estimate
    pub(crate) msg_tx: StreamMpscSender<AnyRelayMsg>,
    /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
    // TODO(arti#2068): we should consider making this an `Option`
    // the `watch::Sender` owns the indirect data
    #[deftly(has_memory_cost(indirect_size = "0"))]
    pub(crate) rate_limit_stream: watch::Receiver<StreamRateLimit>,
    /// A [`Stream`](futures::Stream) that provides notifications when a new drain rate is
    /// requested.
    #[deftly(has_memory_cost(indirect_size = "0"))]
    pub(crate) drain_rate_request_stream: NotifyReceiver<DrainRateRequest>,
    /// The memory quota account to be used for this stream.
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate (it contains an Arc)
    pub(crate) memquota: StreamAccount,
}
329
/// Sending half of the memory-quota-tracked MPSC queue that carries stream
/// requests ([`StreamReqInfo`]).
#[cfg(any(feature = "hs-service", feature = "relay"))]
pub(crate) type StreamReqSender = mq_queue::Sender<StreamReqInfo, MpscSpec>;
333
/// Data required for handling an incoming stream request.
#[derive(educe::Educe)]
#[educe(Debug)]
#[cfg(any(feature = "hs-service", feature = "relay"))]
pub(crate) struct IncomingStreamRequestHandler {
    /// A sender for sharing information about an incoming stream request.
    pub(crate) incoming_sender: StreamReqSender,
    /// The hop to expect incoming stream requests from.
    ///
    /// Set to `None` if we are a relay.
    pub(crate) hop_num: Option<HopNum>,
    /// A [`CmdChecker`] for validating incoming streams.
    pub(crate) cmd_checker: AnyCmdChecker,
    /// An [`IncomingStreamRequestFilter`] for checking whether the user wants
    /// this request, or wants to reject it immediately.
    ///
    /// Left out of the `Debug` output.
    #[educe(Debug(ignore))]
    pub(crate) filter: Box<dyn IncomingStreamRequestFilter>,
}
352
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use tor_cell::relaycell::{
        AnyRelayMsgOuter, RelayCellFormat,
        msg::{Begin, BeginDir, Data, Resolve},
    };

    use super::*;

    /// Check which messages `IncomingCmdChecker` lets through for several
    /// different sets of permitted commands.
    #[test]
    fn incoming_cmd_checker() {
        // Encode a message as a V0 relay cell body, then read it back unparsed.
        let to_unparsed = |msg| {
            let outer = AnyRelayMsgOuter::new(None, msg);
            let body = outer
                .encode(RelayCellFormat::V0, &mut rand::rng())
                .unwrap();
            UnparsedRelayMsg::from_singleton_body(RelayCellFormat::V0, body).unwrap()
        };
        let begin = to_unparsed(Begin::new("allium.example.com", 443, 0).unwrap().into());
        let begin_dir = to_unparsed(BeginDir::default().into());
        let resolve = to_unparsed(Resolve::new("allium.example.com").into());
        let data = to_unparsed(Data::new(&[1, 2, 3]).unwrap().into());

        // Each case: (permitted commands, messages that pass, messages that fail).
        let cases: [(&[RelayCmd], &[&UnparsedRelayMsg], &[&UnparsedRelayMsg]); 3] = [
            (&[], &[], &[&begin, &begin_dir, &resolve, &data]),
            (&[RelayCmd::BEGIN], &[&begin], &[&begin_dir, &resolve, &data]),
            (
                &[RelayCmd::BEGIN, RelayCmd::BEGIN_DIR, RelayCmd::RESOLVE],
                &[&begin, &begin_dir, &resolve],
                &[&data],
            ),
        ];

        for &(permitted, accepted, rejected) in &cases {
            let mut checker = IncomingCmdChecker::new_any(permitted);
            for &m in accepted {
                assert_eq!(checker.check_msg(m).unwrap(), StreamStatus::Open);
            }
            for &m in rejected {
                assert!(checker.check_msg(m).is_err());
            }
        }
    }
}