tor_proto/stream.rs
1//! Tor stream handling.
2//!
3//! A stream is an anonymized conversation; multiple streams can be
4//! multiplexed over a single circuit.
5
6pub(crate) mod cmdcheck;
7pub(crate) mod flow_ctrl;
8pub(crate) mod raw;
9
10#[cfg(any(feature = "hs-service", feature = "relay"))]
11pub(crate) mod incoming;
12
13pub(crate) mod queue;
14
15use futures::SinkExt as _;
16use oneshot_fused_workaround as oneshot;
17use postage::watch;
18use safelog::sensitive;
19
20use tor_async_utils::SinkCloseChannel as _;
21use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
22use tor_cell::relaycell::msg::{AnyRelayMsg, End};
23use tor_cell::relaycell::{RelayCellFormat, StreamId, UnparsedRelayMsg};
24use tor_memquota::mq_queue::{self, MpscSpec};
25
26use flow_ctrl::state::StreamRateLimit;
27
28use crate::memquota::StreamAccount;
29use crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReaderCtrl;
30use crate::stream::raw::StreamReceiver;
31use crate::{ClientTunnel, Error, HopLocation, Result};
32
33use std::pin::Pin;
34use std::sync::Arc;
35
36/// Initial value for outbound flow-control window on streams.
37pub(crate) const SEND_WINDOW_INIT: u16 = 500;
38/// Initial value for inbound flow-control window on streams.
39pub(crate) const RECV_WINDOW_INIT: u16 = 500;
40
41/// Size of the buffer used between the reactor and a `StreamReader`.
42///
43/// FIXME(eta): We pick 2× the receive window, which is very conservative (we arguably shouldn't
44/// get sent more than the receive window anyway!). We might do due to things that
45/// don't count towards the window though.
46pub(crate) const STREAM_READER_BUFFER: usize = (2 * RECV_WINDOW_INIT) as usize;
47
48/// MPSC queue relating to a stream (either inbound or outbound), sender
49pub(crate) type StreamMpscSender<T> = mq_queue::Sender<T, MpscSpec>;
50/// MPSC queue relating to a stream (either inbound or outbound), receiver
51pub(crate) type StreamMpscReceiver<T> = mq_queue::Receiver<T, MpscSpec>;
52
53/// A behavior to perform when closing a stream.
54///
55/// We don't use `Option<End>` here, since the behavior of `SendNothing` is so surprising
56/// that we shouldn't let it pass unremarked.
57#[derive(Clone, Debug)]
58pub(crate) enum CloseStreamBehavior {
59 /// Send nothing at all, so that the other side will not realize we have
60 /// closed the stream.
61 ///
62 /// We should only do this for incoming onion service streams when we
63 /// want to black-hole the client's requests.
64 SendNothing,
65 /// Send an End cell, if we haven't already sent one.
66 SendEnd(End),
67}
68
69impl Default for CloseStreamBehavior {
70 fn default() -> Self {
71 Self::SendEnd(End::new_misc())
72 }
73}
74
75/// A collection of components that can be combined to implement a Tor stream,
76/// or anything that requires a stream ID.
77///
78/// Not all components may be needed, depending on the purpose of the "stream".
79/// For example we build `RELAY_RESOLVE` requests like we do data streams,
80/// but they won't use the `StreamTarget` as they don't need to send additional
81/// messages.
82#[derive(Debug)]
83pub(crate) struct StreamComponents {
84 /// A [`Stream`](futures::Stream) of incoming relay messages for this Tor stream.
85 pub(crate) stream_receiver: StreamReceiver,
86 /// A handle that can communicate messages to the circuit reactor for this stream.
87 pub(crate) target: StreamTarget,
88 /// The memquota [account](tor_memquota::Account) to use for data on this stream.
89 pub(crate) memquota: StreamAccount,
90 /// The control information needed to add XON/XOFF flow control to the stream.
91 pub(crate) xon_xoff_reader_ctrl: XonXoffReaderCtrl,
92}
93
94/// Internal handle, used to implement a stream on a particular tunnel.
95///
96/// The reader and the writer for a stream should hold a `StreamTarget` for the stream;
97/// the reader should additionally hold an `mpsc::Receiver` to get
98/// relay messages for the stream.
99///
100/// When all the `StreamTarget`s for a stream are dropped, the Reactor will
101/// close the stream by sending an END message to the other side.
102/// You can close a stream earlier by using [`StreamTarget::close`]
103/// or [`StreamTarget::close_pending`].
104#[derive(Clone, Debug)]
105pub(crate) struct StreamTarget {
106 /// Which hop of the circuit this stream is with.
107 pub(crate) hop: Option<HopLocation>,
108 /// Reactor ID for this stream.
109 pub(crate) stream_id: StreamId,
110 /// Encoding to use for relay cells sent on this stream.
111 ///
112 /// This is mostly irrelevant, except when deciding
113 /// how many bytes we can pack in a DATA message.
114 pub(crate) relay_cell_format: RelayCellFormat,
115 /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
116 // TODO(arti#2068): we should consider making this an `Option`
117 pub(crate) rate_limit_stream: watch::Receiver<StreamRateLimit>,
118 /// Channel to send cells down.
119 pub(crate) tx: StreamMpscSender<AnyRelayMsg>,
120 /// Reference to the tunnel that this stream is on.
121 pub(crate) tunnel: Tunnel,
122}
123
124/// A client or relay tunnel.
125#[derive(Debug, Clone, derive_more::From)]
126pub(crate) enum Tunnel {
127 /// A client tunnel.
128 Client(Arc<ClientTunnel>),
129 /// A relay tunnel.
130 #[cfg(feature = "relay")]
131 Relay(Arc<crate::relay::RelayCirc>),
132}
133
134impl StreamTarget {
135 /// Deliver a relay message for the stream that owns this StreamTarget.
136 ///
137 /// The StreamTarget will set the correct stream ID and pick the
138 /// right hop, but will not validate that the message is well-formed
139 /// or meaningful in context.
140 pub(crate) async fn send(&mut self, msg: AnyRelayMsg) -> Result<()> {
141 self.tx.send(msg).await.map_err(|_| Error::CircuitClosed)?;
142 Ok(())
143 }
144
145 /// Close the pending stream that owns this StreamTarget, delivering the specified
146 /// END message (if any)
147 ///
148 /// The stream is closed by sending a control message (`CtrlMsg::ClosePendingStream`)
149 /// to the reactor.
150 ///
151 /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
152 ///
153 /// The StreamTarget will set the correct stream ID and pick the
154 /// right hop, but will not validate that the message is well-formed
155 /// or meaningful in context.
156 ///
157 /// Note that in many cases, the actual contents of an END message can leak unwanted
158 /// information. Please consider carefully before sending anything but an
159 /// [`End::new_misc()`](tor_cell::relaycell::msg::End::new_misc) message over a `ClientTunnel`.
160 /// (For onion services, we send [`DONE`](tor_cell::relaycell::msg::EndReason::DONE) )
161 ///
162 /// In addition to sending the END message, this function also ensures
163 /// the state of the stream map entry of this stream is updated
164 /// accordingly.
165 ///
166 /// Normally, you shouldn't need to call this function, as streams are implicitly closed by the
167 /// reactor when their corresponding `StreamTarget` is dropped. The only valid use of this
168 /// function is for closing pending incoming streams (a stream is said to be pending if we have
169 /// received the message initiating the stream but have not responded to it yet).
170 ///
171 /// **NOTE**: This function should be called at most once per request.
172 /// Calling it twice is an error.
173 #[cfg(any(feature = "hs-service", feature = "relay"))]
174 pub(crate) fn close_pending(
175 &self,
176 message: crate::stream::CloseStreamBehavior,
177 ) -> Result<oneshot::Receiver<Result<()>>> {
178 match &self.tunnel {
179 Tunnel::Client(t) => {
180 cfg_if::cfg_if! {
181 if #[cfg(feature = "hs-service")] {
182 t.close_pending(self.stream_id, self.hop, message)
183 } else {
184 Err(tor_error::internal!("close_pending() called on client stream?!").into())
185 }
186 }
187 }
188 #[cfg(feature = "relay")]
189 Tunnel::Relay(t) => t.close_pending(self.stream_id, message),
190 }
191 }
192
193 /// Queue a "close" for the stream corresponding to this StreamTarget.
194 ///
195 /// Unlike `close_pending`, this method does not allow the caller to provide an `END` message.
196 ///
197 /// Once this method has been called, no more messages may be sent with [`StreamTarget::send`],
198 /// on this `StreamTarget`` or any clone of it.
199 /// The reactor *will* try to flush any already-send messages before it closes the stream.
200 ///
201 /// You don't need to call this method if the stream is closing because all of its StreamTargets
202 /// have been dropped.
203 pub(crate) fn close(&mut self) {
204 Pin::new(&mut self.tx).close_channel();
205 }
206
207 /// Called when a circuit-level protocol error has occurred and the
208 /// tunnel needs to shut down.
209 pub(crate) fn protocol_error(&mut self) {
210 match &self.tunnel {
211 Tunnel::Client(t) => t.terminate(),
212 #[cfg(feature = "relay")]
213 Tunnel::Relay(t) => t.terminate(),
214 }
215 }
216
217 /// Request to send a SENDME cell for this stream.
218 ///
219 /// This sends a request to the circuit reactor to send a stream-level SENDME, but it does not
220 /// block or wait for a response from the circuit reactor.
221 /// An error is only returned if we are unable to send the request.
222 /// This means that if the circuit reactor is unable to send the SENDME, we are not notified of
223 /// this here and an error will not be returned.
224 pub(crate) fn send_sendme(&mut self) -> Result<()> {
225 match &self.tunnel {
226 Tunnel::Client(t) => t.send_sendme(self.stream_id, self.hop),
227 #[cfg(feature = "relay")]
228 Tunnel::Relay(t) => t.send_sendme(self.stream_id),
229 }
230 }
231
232 /// Inform the circuit reactor that there has been a change in the drain rate for this stream.
233 ///
234 /// Typically the circuit reactor would send this new rate in an XON message to the other end of
235 /// the stream.
236 /// But it may decide not to, and may discard this update.
237 /// For example the stream may have a large amount of buffered data, and the reactor may not
238 /// want to send an XON while the buffer is large.
239 ///
240 /// This sends a message to inform the circuit reactor of the new drain rate,
241 /// but it does not block or wait for a response from the reactor.
242 /// An error is only returned if we are unable to send the update.
243 pub(crate) fn drain_rate_update(&mut self, rate: XonKbpsEwma) -> Result<()> {
244 match &mut self.tunnel {
245 Tunnel::Client(t) => t.drain_rate_update(self.stream_id, self.hop, rate),
246 #[cfg(feature = "relay")]
247 Tunnel::Relay(t) => t.drain_rate_update(self.stream_id, rate),
248 }
249 }
250
251 /// Return a reference to the tunnel that this `StreamTarget` is using.
252 #[cfg(any(feature = "experimental-api", feature = "stream-ctrl"))]
253 pub(crate) fn tunnel(&self) -> &Tunnel {
254 &self.tunnel
255 }
256
257 /// Return the kind of relay cell in use on this `StreamTarget`.
258 pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
259 self.relay_cell_format
260 }
261
262 /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
263 pub(crate) fn rate_limit_stream(&self) -> &watch::Receiver<StreamRateLimit> {
264 &self.rate_limit_stream
265 }
266}
267
268/// Return the stream ID of `msg`, if it has one.
269///
270/// Returns `Ok(None)` if `msg` is a meta cell.
271pub(crate) fn msg_streamid(msg: &UnparsedRelayMsg) -> Result<Option<StreamId>> {
272 let cmd = msg.cmd();
273 let streamid = msg.stream_id();
274 if !cmd.accepts_streamid_val(streamid) {
275 return Err(Error::CircProto(format!(
276 "Invalid stream ID {} for relay command {}",
277 sensitive(StreamId::get_or_zero(streamid)),
278 msg.cmd()
279 )));
280 }
281
282 Ok(streamid)
283}