Skip to main content

tor_proto/
stream.rs

1//! Tor stream handling.
2//!
3//! A stream is an anonymized conversation; multiple streams can be
4//! multiplexed over a single circuit.
5
6pub(crate) mod cmdcheck;
7pub(crate) mod flow_ctrl;
8pub(crate) mod raw;
9
10#[cfg(any(feature = "hs-service", feature = "relay"))]
11pub(crate) mod incoming;
12
13pub(crate) mod queue;
14
15use futures::SinkExt as _;
16use oneshot_fused_workaround as oneshot;
17use postage::watch;
18use safelog::sensitive;
19
20use tor_async_utils::SinkCloseChannel as _;
21use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
22use tor_cell::relaycell::msg::{AnyRelayMsg, End};
23use tor_cell::relaycell::{RelayCellFormat, StreamId, UnparsedRelayMsg};
24use tor_memquota::mq_queue::{self, MpscSpec};
25
26use flow_ctrl::state::StreamRateLimit;
27
28use crate::memquota::StreamAccount;
29use crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReaderCtrl;
30use crate::stream::raw::StreamReceiver;
31use crate::{ClientTunnel, Error, HopLocation, Result};
32
33use std::pin::Pin;
34use std::sync::Arc;
35
/// Starting size of the outbound (sending) flow-control window on a stream.
pub(crate) const SEND_WINDOW_INIT: u16 = 500;
/// Starting size of the inbound (receiving) flow-control window on a stream.
pub(crate) const RECV_WINDOW_INIT: u16 = 500;

/// Capacity of the buffer that sits between the reactor and a `StreamReader`.
///
/// FIXME(eta): We pick 2× the receive window, which is very conservative (we arguably shouldn't
///             get sent more than the receive window anyway!). We might exceed it, though, due
///             to things that don't count towards the window.
pub(crate) const STREAM_READER_BUFFER: usize = 2 * (RECV_WINDOW_INIT as usize);
47
/// Sending half of the MPSC queue relating to a stream (either inbound or outbound).
///
/// This is an [`mq_queue::Sender`] specialized to [`MpscSpec`].
pub(crate) type StreamMpscSender<T> = mq_queue::Sender<T, MpscSpec>;
/// Receiving half of the MPSC queue relating to a stream (either inbound or outbound).
///
/// This is an [`mq_queue::Receiver`] specialized to [`MpscSpec`].
pub(crate) type StreamMpscReceiver<T> = mq_queue::Receiver<T, MpscSpec>;
52
/// A behavior to perform when closing a stream.
///
/// We don't use `Option<End>` here, since the behavior of `SendNothing` is so surprising
/// that we shouldn't let it pass unremarked.
///
/// The [`Default`] value is `SendEnd` carrying [`End::new_misc()`].
#[derive(Clone, Debug)]
pub(crate) enum CloseStreamBehavior {
    /// Send nothing at all, so that the other side will not realize we have
    /// closed the stream.
    ///
    /// We should only do this for incoming onion service streams when we
    /// want to black-hole the client's requests.
    SendNothing,
    /// Send the given END message, if we haven't already sent one.
    SendEnd(End),
}
68
69impl Default for CloseStreamBehavior {
70    fn default() -> Self {
71        Self::SendEnd(End::new_misc())
72    }
73}
74
/// A collection of components that can be combined to implement a Tor stream,
/// or anything that requires a stream ID.
///
/// Not all components may be needed, depending on the purpose of the "stream".
/// For example, we build `RELAY_RESOLVE` requests like we do data streams,
/// but they won't use the `StreamTarget` as they don't need to send additional
/// messages.
#[derive(Debug)]
pub(crate) struct StreamComponents {
    /// A [`Stream`](futures::Stream) of incoming relay messages for this Tor stream.
    pub(crate) stream_receiver: StreamReceiver,
    /// A handle that can communicate messages to the circuit reactor for this stream.
    pub(crate) target: StreamTarget,
    /// The memquota [account](tor_memquota::Account) to use for data on this stream.
    pub(crate) memquota: StreamAccount,
    /// The control information needed to add XON/XOFF flow control to the stream.
    pub(crate) xon_xoff_reader_ctrl: XonXoffReaderCtrl,
}
93
/// Internal handle, used to implement a stream on a particular tunnel.
///
/// The reader and the writer for a stream should hold a `StreamTarget` for the stream;
/// the reader should additionally hold an `mpsc::Receiver` to get
/// relay messages for the stream.
///
/// When all the `StreamTarget`s for a stream are dropped, the Reactor will
/// close the stream by sending an END message to the other side.
/// You can close a stream earlier by using [`StreamTarget::close`]
/// or [`StreamTarget::close_pending`].
#[derive(Clone, Debug)]
pub(crate) struct StreamTarget {
    /// Which hop of the circuit this stream is with.
    // NOTE(review): only the `Tunnel::Client` code paths pass this value along;
    // presumably it is `None` for relay tunnels -- confirm against the constructors.
    pub(crate) hop: Option<HopLocation>,
    /// Reactor ID for this stream.
    pub(crate) stream_id: StreamId,
    /// Encoding to use for relay cells sent on this stream.
    ///
    /// This is mostly irrelevant, except when deciding
    /// how many bytes we can pack in a DATA message.
    pub(crate) relay_cell_format: RelayCellFormat,
    /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
    // TODO(arti#2068): we should consider making this an `Option`
    pub(crate) rate_limit_stream: watch::Receiver<StreamRateLimit>,
    /// Channel to send cells down.
    ///
    /// Closing this channel (see [`StreamTarget::close`]) queues a close of the stream.
    pub(crate) tx: StreamMpscSender<AnyRelayMsg>,
    /// Reference to the tunnel that this stream is on.
    pub(crate) tunnel: Tunnel,
}
123
/// A client or relay tunnel.
///
/// Each variant holds a shared (`Arc`) handle to the underlying tunnel, so cloning a
/// `Tunnel` only clones the handle.
/// The `derive_more::From` derive provides `From` conversions from the wrapped handle types.
#[derive(Debug, Clone, derive_more::From)]
pub(crate) enum Tunnel {
    /// A client tunnel.
    Client(Arc<ClientTunnel>),
    /// A relay tunnel.
    ///
    /// Only available when the `relay` feature is enabled.
    #[cfg(feature = "relay")]
    Relay(Arc<crate::relay::RelayCirc>),
}
133
134impl StreamTarget {
135    /// Deliver a relay message for the stream that owns this StreamTarget.
136    ///
137    /// The StreamTarget will set the correct stream ID and pick the
138    /// right hop, but will not validate that the message is well-formed
139    /// or meaningful in context.
140    pub(crate) async fn send(&mut self, msg: AnyRelayMsg) -> Result<()> {
141        self.tx.send(msg).await.map_err(|_| Error::CircuitClosed)?;
142        Ok(())
143    }
144
145    /// Close the pending stream that owns this StreamTarget, delivering the specified
146    /// END message (if any)
147    ///
148    /// The stream is closed by sending a control message (`CtrlMsg::ClosePendingStream`)
149    /// to the reactor.
150    ///
151    /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
152    ///
153    /// The StreamTarget will set the correct stream ID and pick the
154    /// right hop, but will not validate that the message is well-formed
155    /// or meaningful in context.
156    ///
157    /// Note that in many cases, the actual contents of an END message can leak unwanted
158    /// information. Please consider carefully before sending anything but an
159    /// [`End::new_misc()`](tor_cell::relaycell::msg::End::new_misc) message over a `ClientTunnel`.
160    /// (For onion services, we send [`DONE`](tor_cell::relaycell::msg::EndReason::DONE) )
161    ///
162    /// In addition to sending the END message, this function also ensures
163    /// the state of the stream map entry of this stream is updated
164    /// accordingly.
165    ///
166    /// Normally, you shouldn't need to call this function, as streams are implicitly closed by the
167    /// reactor when their corresponding `StreamTarget` is dropped. The only valid use of this
168    /// function is for closing pending incoming streams (a stream is said to be pending if we have
169    /// received the message initiating the stream but have not responded to it yet).
170    ///
171    /// **NOTE**: This function should be called at most once per request.
172    /// Calling it twice is an error.
173    #[cfg(any(feature = "hs-service", feature = "relay"))]
174    pub(crate) fn close_pending(
175        &self,
176        message: crate::stream::CloseStreamBehavior,
177    ) -> Result<oneshot::Receiver<Result<()>>> {
178        match &self.tunnel {
179            Tunnel::Client(t) => {
180                cfg_if::cfg_if! {
181                    if #[cfg(feature = "hs-service")] {
182                        t.close_pending(self.stream_id, self.hop, message)
183                    } else {
184                        Err(tor_error::internal!("close_pending() called on client stream?!").into())
185                    }
186                }
187            }
188            #[cfg(feature = "relay")]
189            Tunnel::Relay(t) => t.close_pending(self.stream_id, message),
190        }
191    }
192
193    /// Queue a "close" for the stream corresponding to this StreamTarget.
194    ///
195    /// Unlike `close_pending`, this method does not allow the caller to provide an `END` message.
196    ///
197    /// Once this method has been called, no more messages may be sent with [`StreamTarget::send`],
198    /// on this `StreamTarget`` or any clone of it.
199    /// The reactor *will* try to flush any already-send messages before it closes the stream.
200    ///
201    /// You don't need to call this method if the stream is closing because all of its StreamTargets
202    /// have been dropped.
203    pub(crate) fn close(&mut self) {
204        Pin::new(&mut self.tx).close_channel();
205    }
206
207    /// Called when a circuit-level protocol error has occurred and the
208    /// tunnel needs to shut down.
209    pub(crate) fn protocol_error(&mut self) {
210        match &self.tunnel {
211            Tunnel::Client(t) => t.terminate(),
212            #[cfg(feature = "relay")]
213            Tunnel::Relay(t) => t.terminate(),
214        }
215    }
216
217    /// Request to send a SENDME cell for this stream.
218    ///
219    /// This sends a request to the circuit reactor to send a stream-level SENDME, but it does not
220    /// block or wait for a response from the circuit reactor.
221    /// An error is only returned if we are unable to send the request.
222    /// This means that if the circuit reactor is unable to send the SENDME, we are not notified of
223    /// this here and an error will not be returned.
224    pub(crate) fn send_sendme(&mut self) -> Result<()> {
225        match &self.tunnel {
226            Tunnel::Client(t) => t.send_sendme(self.stream_id, self.hop),
227            #[cfg(feature = "relay")]
228            Tunnel::Relay(t) => t.send_sendme(self.stream_id),
229        }
230    }
231
232    /// Inform the circuit reactor that there has been a change in the drain rate for this stream.
233    ///
234    /// Typically the circuit reactor would send this new rate in an XON message to the other end of
235    /// the stream.
236    /// But it may decide not to, and may discard this update.
237    /// For example the stream may have a large amount of buffered data, and the reactor may not
238    /// want to send an XON while the buffer is large.
239    ///
240    /// This sends a message to inform the circuit reactor of the new drain rate,
241    /// but it does not block or wait for a response from the reactor.
242    /// An error is only returned if we are unable to send the update.
243    pub(crate) fn drain_rate_update(&mut self, rate: XonKbpsEwma) -> Result<()> {
244        match &mut self.tunnel {
245            Tunnel::Client(t) => t.drain_rate_update(self.stream_id, self.hop, rate),
246            #[cfg(feature = "relay")]
247            Tunnel::Relay(t) => t.drain_rate_update(self.stream_id, rate),
248        }
249    }
250
251    /// Return a reference to the tunnel that this `StreamTarget` is using.
252    #[cfg(any(feature = "experimental-api", feature = "stream-ctrl"))]
253    pub(crate) fn tunnel(&self) -> &Tunnel {
254        &self.tunnel
255    }
256
257    /// Return the kind of relay cell in use on this `StreamTarget`.
258    pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
259        self.relay_cell_format
260    }
261
262    /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
263    pub(crate) fn rate_limit_stream(&self) -> &watch::Receiver<StreamRateLimit> {
264        &self.rate_limit_stream
265    }
266}
267
268/// Return the stream ID of `msg`, if it has one.
269///
270/// Returns `Ok(None)` if `msg` is a meta cell.
271pub(crate) fn msg_streamid(msg: &UnparsedRelayMsg) -> Result<Option<StreamId>> {
272    let cmd = msg.cmd();
273    let streamid = msg.stream_id();
274    if !cmd.accepts_streamid_val(streamid) {
275        return Err(Error::CircProto(format!(
276            "Invalid stream ID {} for relay command {}",
277            sensitive(StreamId::get_or_zero(streamid)),
278            msg.cmd()
279        )));
280    }
281
282    Ok(streamid)
283}