tor_proto/circuit/reactor/stream.rs
1//! The stream reactor.
2
3use crate::circuit::circhop::CircHopOutbound;
4use crate::circuit::reactor::macros::derive_deftly_template_CircuitReactor;
5use crate::circuit::{CircHopSyncView, UniqId};
6use crate::congestion::{CongestionControl, sendme};
7use crate::memquota::{CircuitAccount, SpecificAccount as _, StreamAccount};
8use crate::stream::CloseStreamBehavior;
9use crate::stream::cmdcheck::StreamStatus;
10use crate::stream::flow_ctrl::state::StreamRateLimit;
11use crate::stream::queue::stream_queue;
12use crate::streammap;
13use crate::util::err::ReactorError;
14use crate::util::notify::NotifySender;
15use crate::{Error, HopNum};
16
17#[cfg(any(feature = "hs-service", feature = "relay"))]
18use crate::stream::incoming::{
19 InboundDataCmdChecker, IncomingStreamRequest, IncomingStreamRequestContext,
20 IncomingStreamRequestDisposition, IncomingStreamRequestHandler, StreamReqInfo,
21};
22
23use tor_async_utils::{SinkTrySend as _, SinkTrySendError as _};
24use tor_cell::relaycell::msg::{AnyRelayMsg, Begin, End, EndReason};
25use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
26use tor_error::into_internal;
27use tor_log_ratelim::log_ratelim;
28use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
29use tor_rtcompat::{DynTimeProvider, Runtime, SleepProvider as _};
30
31use derive_deftly::Deftly;
32use futures::SinkExt;
33use futures::channel::mpsc;
34use futures::{FutureExt as _, StreamExt as _, future, select_biased};
35use postage::watch;
36use tracing::debug;
37
38use std::pin::Pin;
39use std::result::Result as StdResult;
40use std::sync::{Arc, Mutex};
41use std::task::Poll;
42use std::time::Duration;
43
/// Size of the buffer for communication between a StreamTarget and the reactor.
///
/// This is the capacity passed to `MpscSpec::new()` when
/// `StreamReactor::handle_incoming_stream_request` creates the per-stream
/// message queue for an incoming stream request it is about to accept.
//
// TODO(tuning): figure out if this is a good size for this buffer
const CIRCUIT_BUFFER_SIZE: usize = 128;
48
/// Trait for customizing the behavior of the stream reactor.
///
/// Used for plugging in the implementation-dependent (client vs relay)
/// parts of the implementation into the generic one.
///
/// The reactor holds its handler as an `Arc<dyn StreamHandler>`
/// (see the `inner` field of [`StreamReactor`]).
pub(crate) trait StreamHandler: Send + Sync + 'static {
    /// Return the amount of time a newly closed stream
    /// should be kept in the stream map for.
    ///
    /// This is the amount of time we are willing to wait for
    /// an END ack before removing the half-stream from the map.
    ///
    /// `hop` is the state of the hop the reactor is driving.
    /// When a stream is closed, the reactor adds the returned duration
    /// to the current time to obtain the instant at which
    /// the half-stream expires.
    fn halfstream_expiry(&self, hop: &CircHopOutbound) -> Duration;
}
61
/// The stream reactor for a given hop.
///
/// Drives the application streams.
///
/// This reactor accepts [`StreamMsg`]s from the forward reactor over its [`Self::cell_rx`]
/// MPSC channel, and delivers them to the corresponding stream entries in the stream map.
///
/// The local streams are polled from the main loop, and any ready messages are sent
/// to the backward reactor over the `bwd_tx` MPSC channel for packaging and delivery.
///
/// Shuts down if an error occurs, or if the sending end
/// of the `cell_rx` MPSC channel, i.e. the forward reactor, closes.
#[derive(Deftly)]
#[derive_deftly(CircuitReactor)]
#[deftly(reactor_name = "stream reactor")]
#[deftly(run_inner_fn = "Self::run_once")]
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub(crate) struct StreamReactor {
    /// The hop this stream reactor is for.
    ///
    /// This is `None` for relays.
    hopnum: Option<HopNum>,
    /// The state of this circuit hop.
    hop: CircHopOutbound,
    /// The time provider.
    time_provider: DynTimeProvider,
    /// An identifier for logging about this reactor's circuit.
    unique_id: UniqId,
    /// Receiver for Tor stream data that need to be delivered to a Tor stream.
    ///
    /// The sender is in [`ForwardReactor`](super::ForwardReactor), which will forward all cells
    /// carrying Tor stream data to us.
    ///
    /// This serves a dual purpose:
    ///
    ///   * it enables the `ForwardReactor` to deliver Tor stream data received from the client
    ///   * it lets the `StreamReactor` know if the `ForwardReactor` has shut down:
    ///     we select! on this MPSC channel in the main loop, so if the `ForwardReactor`
    ///     shuts down, we will get EOS upon calling `.next()`
    cell_rx: mpsc::Receiver<StreamMsg>,
    /// Sender for sending Tor stream data to [`BackwardReactor`](super::BackwardReactor).
    bwd_tx: mpsc::Sender<ReadyStreamMsg>,
    /// A handler for incoming streams.
    ///
    /// Set to `None` if incoming streams are not allowed on this circuit.
    ///
    /// This handler is shared with the [`HopMgr`](super::hop_mgr::HopMgr) of this reactor,
    /// which can install a new handler at runtime (for example, in response to a CtrlMsg).
    /// The ability to update the handler after the reactor is launched is needed
    /// for onion services, where the incoming stream request handler only gets installed
    /// after the virtual hop is created.
    #[cfg(any(feature = "hs-service", feature = "relay"))]
    incoming: Arc<Mutex<Option<IncomingStreamRequestHandler>>>,
    /// A handler for customizing the stream reactor behavior.
    inner: Arc<dyn StreamHandler>,
    /// Memory quota account
    memquota: CircuitAccount,
}
120
121#[allow(unused)] // TODO(relay)
122impl StreamReactor {
123 /// Create a new [`StreamReactor`].
124 #[allow(clippy::too_many_arguments)] // TODO
125 pub(crate) fn new<R: Runtime>(
126 runtime: R,
127 hopnum: Option<HopNum>,
128 hop: CircHopOutbound,
129 unique_id: UniqId,
130 cell_rx: mpsc::Receiver<StreamMsg>,
131 bwd_tx: mpsc::Sender<ReadyStreamMsg>,
132 inner: Arc<dyn StreamHandler>,
133 #[cfg(any(feature = "hs-service", feature = "relay"))] //
134 incoming: Arc<Mutex<Option<IncomingStreamRequestHandler>>>,
135 memquota: CircuitAccount,
136 ) -> Self {
137 Self {
138 hopnum,
139 hop,
140 time_provider: DynTimeProvider::new(runtime),
141 unique_id,
142 #[cfg(any(feature = "hs-service", feature = "relay"))]
143 incoming,
144 cell_rx,
145 bwd_tx,
146 inner,
147 memquota,
148 }
149 }
150
151 /// Helper for [`run`](Self::run).
152 ///
153 /// Polls the stream map for messages
154 /// that need to be delivered to the other endpoint,
155 /// and the `cells_rx` MPSC stream for stream messages received
156 /// from the `ForwardReactor` that need to be delivered to the application streams.
157 async fn run_once(&mut self) -> StdResult<(), ReactorError> {
158 use postage::prelude::{Sink as _, Stream as _};
159
160 // Garbage-collect all halfstreams that have expired.
161 //
162 // Note: this will iterate over the closed streams of this hop.
163 // If we think this will cause perf issues, one idea would be to make
164 // StreamMap::closed_streams into a min-heap, and add a branch to the
165 // select_biased! below to sleep until the first expiry is due
166 // (but my gut feeling is that iterating is cheaper)
167 self.hop
168 .stream_map()
169 .lock()
170 .expect("poisoned lock")
171 .remove_expired_halfstreams(self.time_provider.now());
172
173 let mut streams = Arc::clone(self.hop.stream_map());
174 let can_send = self
175 .hop
176 .ccontrol()
177 .lock()
178 .expect("poisoned lock")
179 .can_send();
180 let mut ready_streams_fut = future::poll_fn(move |cx| {
181 if !can_send {
182 // We can't send anything on this hop that counts towards SENDME windows.
183 //
184 // Note: this does not block outgoing flow-control messages:
185 //
186 // * circuit SENDMEs are initiated by the forward reactor,
187 // by sending a BackwardReactorCmd::SendRelayMsg to BWD,
188 // * stream SENDMEs will be initiated by StreamTarget::send_sendme(),
189 // by sending a control message to the reactor
190 // (TODO(relay): not yet implemented)
191 // * XOFFs are sent in response to messages on streams
192 // (i.e. RELAY messages with non-zero stream IDs).
193 // These messages are delivered to us by the forward reactor
194 // inside BackwardReactorCmd::HandleMsg
195 // * XON will be initiated by StreamTarget::drain_rate_update(),
196 // by sending a control message to the reactor
197 // (TODO(relay): not yet implemented)\
198 return Poll::Pending;
199 }
200
201 let mut streams = streams.lock().expect("lock poisoned");
202 let Some((sid, msg)) = streams.poll_ready_streams_iter(cx).next() else {
203 // No ready streams
204 //
205 // TODO(flushing): if there are no ready Tor streams, we might want to defer
206 // flushing until stream data becomes available (or until a timeout elapses).
207 // The deferred flushing approach should enable us to send
208 // more than one message at a time to the channel reactor.
209 return Poll::Pending;
210 };
211
212 if msg.is_none() {
213 // This means the local sender has been dropped,
214 // which presumably can only happen if an error occurs,
215 // or if the Tor stream ends. In both cases, we're going to
216 // want to send an END to the client to let them know,
217 // and to remove the stream from the stream map.
218 //
219 // TODO(relay): the local sender part is not implemented yet
220 return Poll::Ready(StreamEvent::Closed {
221 sid,
222 behav: CloseStreamBehavior::default(),
223 reason: streammap::TerminateReason::StreamTargetClosed,
224 });
225 };
226
227 let msg = streams.take_ready_msg(sid).expect("msg disappeared");
228
229 Poll::Ready(StreamEvent::ReadyMsg { sid, msg })
230 });
231
232 select_biased! {
233 res = self.cell_rx.next().fuse() => {
234 let Some(cmd) = res else {
235 // The forward reactor has shut down
236 return Err(ReactorError::Shutdown);
237 };
238
239 self.handle_reactor_cmd(cmd).await?;
240 }
241 event = ready_streams_fut.fuse() => {
242 self.handle_stream_event(event).await?;
243 }
244 }
245
246 Ok(())
247 }
248
249 /// Handle a stream message sent to us by the forward reactor.
250 ///
251 /// Delivers the message to its corresponding application stream.
252 async fn handle_reactor_cmd(&mut self, msg: StreamMsg) -> StdResult<(), ReactorError> {
253 let StreamMsg {
254 sid,
255 msg,
256 cell_counts_toward_windows,
257 } = msg;
258
259 // We need to apply stream-level flow control *before* encoding the message.
260 // May optionally return a message that needs to be sent back to the client.
261 let bwd_msg = self.handle_msg(sid, msg, cell_counts_toward_windows)?;
262
263 // TODO(DEDUP): this contains parts of Circuit::send_relay_cell_inner()
264 if let Some(bwd_msg) = bwd_msg {
265 // We might be out of capacity entirely; see if we are about to hit a limit.
266 //
267 // TODO: If we ever add a notion of _recoverable_ errors below, we'll
268 // need a way to restore this limit, and similarly for about_to_send().
269 self.hop.decrement_cell_limit()?;
270
271 let c_t_w = sendme::cmd_counts_towards_windows(bwd_msg.cmd());
272
273 // We need to apply stream-level flow control *before* encoding the message
274 // (the BWD handles the encoding)
275 if c_t_w {
276 if let Some(stream_id) = bwd_msg.stream_id() {
277 self.hop
278 .about_to_send(self.unique_id, stream_id, bwd_msg.msg())?;
279 }
280 }
281
282 // NOTE: on the client side, we call note_data_sent()
283 // just before writing the cell to the channel.
284 // We can't do that here, because we're not the ones
285 // encoding the cell, so we don't have the SENDME tag
286 // which is needed for note_data_sent().
287 //
288 // Instead, we notify the CC algorithm in the BWD,
289 // right after we've finished sending the cell.
290
291 self.send_msg_to_bwd(bwd_msg).await?;
292 }
293
294 Ok(())
295 }
296
297 /// Handle a RELAY message that has a non-zero stream ID.
298 ///
299 /// A returned message is one that we need to send back to the client.
300 //
301 // TODO(relay): this is very similar to the client impl from
302 // Circuit::handle_in_order_relay_msg()
303 fn handle_msg(
304 &mut self,
305 streamid: StreamId,
306 msg: UnparsedRelayMsg,
307 cell_counts_toward_windows: bool,
308 ) -> StdResult<Option<AnyRelayMsgOuter>, ReactorError> {
309 let cmd = msg.cmd();
310 let possible_proto_violation_err = move |streamid: StreamId| {
311 Error::StreamProto(format!(
312 "Unexpected {cmd:?} message on unknown stream {streamid}"
313 ))
314 };
315 let now = self.time_provider.now();
316
317 // Check if any of our already-open streams want this message
318 let res = self.hop.handle_msg(
319 possible_proto_violation_err,
320 cell_counts_toward_windows,
321 streamid,
322 msg,
323 now,
324 )?;
325
326 // If it was an incoming stream request, we don't need to worry about
327 // sending an XOFF as there's no stream data within this message.
328 if let Some(msg) = res {
329 cfg_if::cfg_if! {
330 if #[cfg(any(feature = "hs-service", feature = "relay"))] {
331 return self.handle_incoming_stream_request(streamid, msg);
332 } else {
333 return Err(
334 tor_error::internal!(
335 "incoming stream not rejected, but relay and hs-service features are disabled?!"
336 ).into()
337 );
338 }
339 }
340 }
341
342 // We may want to send an XOFF if the incoming buffer is too large.
343 if let Some(cell) = self.hop.maybe_send_xoff(streamid)? {
344 let cell = AnyRelayMsgOuter::new(Some(streamid), cell.into());
345 return Ok(Some(cell));
346 }
347
348 Ok(None)
349 }
350
/// Handle a message that may be an incoming stream request.
///
/// If the request is acceptable, a new entry is added to our stream map,
/// and the request is queued for the external consumer of our
/// [Stream](futures::Stream) of incoming Tor streams.
///
/// Returns the cell we need to send back to the client
/// if the stream cannot be opened (an END), and `None` otherwise.
/// Note that `None` does not mean a CONNECTED will be sent right away:
/// that only happens if the consumer manages to establish the connection
/// to the address specified in the BEGIN.
///
/// Any error returned from this function will shut down the reactor.
#[cfg(any(feature = "hs-service", feature = "relay"))]
fn handle_incoming_stream_request(
    &mut self,
    sid: StreamId,
    msg: UnparsedRelayMsg,
) -> StdResult<Option<AnyRelayMsgOuter>, ReactorError> {
    /// Wrap anything convertible to [`Error`] in a [`ReactorError::Err`].
    fn into_reactor_err<E: Into<Error>>(e: E) -> ReactorError {
        ReactorError::Err(e.into())
    }

    // The handler stays locked until we return.
    let mut guard = self.incoming.lock().expect("poisoned lock");
    let handler = match guard.as_mut() {
        Some(handler) => handler,
        None => {
            return Err(
                Error::CircProto("Cannot handle BEGIN cells on this circuit".into()).into(),
            );
        }
    };

    // The handler only accepts streams coming from one specific hop.
    if self.hopnum != handler.hop_num {
        let expected_hopnum = handler.hop_num.map_or_else(
            || "client".to_string(),
            |hopnum| hopnum.display().to_string(),
        );
        let actual_hopnum = self.hopnum.map_or_else(
            || "None".to_string(),
            |hopnum| hopnum.display().to_string(),
        );
        let err = Error::CircProto(format!(
            "Expecting incoming streams from {}, but received {} cell from unexpected hop {}",
            expected_hopnum,
            msg.cmd(),
            actual_hopnum,
        ));

        return Err(err.into());
    }

    // A message that closes the stream is not a request at all:
    // record it in the stream map, and we are done.
    if handler.cmd_checker.check_msg(&msg)? == StreamStatus::Closed {
        self.hop
            .stream_map()
            .lock()
            .expect("poisoned lock")
            .ending_msg_received(sid)?;

        return Ok(None);
    }

    let req = parse_incoming_stream_req(msg)?;
    let view = CircHopSyncView::new(&self.hop);

    if let Some(end) = Self::should_reject_incoming(handler, sid, &req, &view)? {
        // We can't honor this request, so we bail by sending an END.
        return Ok(Some(end));
    }

    // The request is acceptable: set up the plumbing
    // between the new stream and the reactor.
    let memquota = StreamAccount::new(&self.memquota).map_err(into_reactor_err)?;

    let (sender, receiver) = stream_queue(
        #[cfg(not(feature = "flowctl-cc"))]
        crate::stream::STREAM_READER_BUFFER,
        &memquota,
        &self.time_provider,
    )
    .map_err(into_reactor_err)?;

    let (msg_tx, msg_rx) = MpscSpec::new(CIRCUIT_BUFFER_SIZE)
        .new_mq(self.time_provider.clone(), memquota.as_raw_account())
        .map_err(into_reactor_err)?;

    let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);

    // A channel for the reactor to request a new drain rate from the reader.
    // Typically this notification will be sent after an XOFF is sent so that the reader can
    // send us a new drain rate when the stream data queue becomes empty.
    let mut drain_rate_request_tx = NotifySender::new_typed();
    let drain_rate_request_rx = drain_rate_request_tx.subscribe();

    let cmd_checker = InboundDataCmdChecker::new_connected();
    self.hop.add_ent_with_id(
        sender,
        msg_rx,
        rate_limit_tx,
        drain_rate_request_tx,
        sid,
        cmd_checker,
    )?;

    // Hand the other halves of the channels over to the consumer of incoming streams.
    let info = StreamReqInfo {
        req,
        stream_id: sid,
        hop: None,
        msg_tx,
        receiver,
        rate_limit_stream: rate_limit_rx,
        drain_rate_request_stream: drain_rate_request_rx,
        memquota,
        relay_cell_format: self.hop.relay_cell_format(),
    };
    let outcome = Pin::new(&mut handler.incoming_sender).try_send(info);

    log_ratelim!("Delivering message to incoming stream handler"; outcome);

    match outcome {
        Ok(_) => Ok(None),
        Err(e) if e.is_full() => {
            // The IncomingStreamRequestHandler's stream is full; it isn't
            // handling requests fast enough. So instead, we reply with an
            // END cell.
            //
            // NOTE(review): the stream has already been added to the stream map
            // above, and nothing on this path removes it again; confirm that
            // the entry gets cleaned up elsewhere.
            let end_msg = AnyRelayMsgOuter::new(
                Some(sid),
                End::new_with_reason(EndReason::RESOURCELIMIT).into(),
            );

            Ok(Some(end_msg))
        }
        Err(e) if e.is_disconnected() => {
            // The IncomingStreamRequestHandler's stream has been dropped.
            // In the Tor protocol as it stands, this always means that the
            // circuit itself is out-of-use and should be closed.
            //
            // Note that we will _not_ reach this point immediately after
            // the IncomingStreamRequestHandler is dropped; we won't hit it
            // until we next get an incoming request. Thus, if we later
            // want to add early detection for a dropped
            // IncomingStreamRequestHandler, we need to do it elsewhere, in
            // a different way.
            debug!(
                circ_id = %self.unique_id,
                "Incoming stream request receiver dropped",
            );
            // This will _cause_ the circuit to get closed.
            Err(ReactorError::Err(Error::CircuitClosed))
        }
        Err(e) => {
            // There are no errors like this with the current design of
            // futures::mpsc, but we shouldn't just ignore the possibility
            // that they'll be added later.
            Err(Error::from((into_internal!("try_send failed unexpectedly"))(e)).into())
        }
    }
}
506
/// Ask the handler's filter what to do with an incoming stream request.
///
/// Returns the END cell to send back to the client if the filter
/// rejects the request, or `None` if the filter accepts it.
///
/// Returns [`ReactorError::Shutdown`] if the filter asks for the circuit
/// to be closed. An error from the filter itself is propagated.
/// Any error returned from this function will shut down the reactor.
#[cfg(any(feature = "hs-service", feature = "relay"))]
fn should_reject_incoming<'a>(
    handler: &mut IncomingStreamRequestHandler,
    sid: StreamId,
    request: &IncomingStreamRequest,
    view: &CircHopSyncView<'a>,
) -> StdResult<Option<AnyRelayMsgOuter>, ReactorError> {
    use IncomingStreamRequestDisposition as Disposition;

    let ctx = IncomingStreamRequestContext { request };

    // The filter is provided externally; it has the final say
    // on whether the stream gets opened or not.
    let verdict = handler.filter.as_mut().disposition(&ctx, view)?;

    match verdict {
        Disposition::RejectRequest(end) => {
            Ok(Some(AnyRelayMsgOuter::new(Some(sid), end.into())))
        }
        Disposition::CloseCircuit => Err(ReactorError::Shutdown),
        // All is well, we can accept the stream request
        Disposition::Accept => Ok(None),
    }
}
539
540 /// Handle a [`StreamEvent`].
541 async fn handle_stream_event(&mut self, event: StreamEvent) -> StdResult<(), ReactorError> {
542 match event {
543 StreamEvent::Closed { sid, behav, reason } => {
544 let timeout = self.inner.halfstream_expiry(&self.hop);
545 let expire_at = self.time_provider.now() + timeout;
546 let res =
547 self.hop
548 .close_stream(self.unique_id, sid, None, behav, reason, expire_at)?;
549 let Some(msg) = res else {
550 // We may not need to send anything at all...
551 return Ok(());
552 };
553
554 self.send_msg_to_bwd(msg.cell).await
555 }
556 StreamEvent::ReadyMsg { sid, msg } => {
557 self.send_msg_to_bwd(AnyRelayMsgOuter::new(Some(sid), msg))
558 .await
559 }
560 }
561 }
562
563 /// Wrap `msg` in [`ReadyStreamMsg`], and send it to the backward reactor.
564 async fn send_msg_to_bwd(&mut self, msg: AnyRelayMsgOuter) -> StdResult<(), ReactorError> {
565 let msg = ReadyStreamMsg {
566 hop: self.hopnum,
567 relay_cell_format: self.hop.relay_cell_format(),
568 ccontrol: Arc::clone(self.hop.ccontrol()),
569 msg,
570 };
571
572 self.bwd_tx
573 .send(msg)
574 .await
575 .map_err(|_| ReactorError::Shutdown)?;
576
577 Ok(())
578 }
579}
580
/// A Tor stream-related event.
///
/// Produced by the stream-polling future in `StreamReactor::run_once`,
/// and consumed by `StreamReactor::handle_stream_event`.
enum StreamEvent {
    /// A stream was closed.
    ///
    /// It needs to be removed from the reactor's stream map.
    Closed {
        /// The ID of the stream to close.
        sid: StreamId,
        /// The stream-closing behavior.
        behav: CloseStreamBehavior,
        /// The reason for closing the stream.
        reason: streammap::TerminateReason,
    },
    /// A stream has a ready message.
    ReadyMsg {
        /// The ID of the stream the message belongs to.
        sid: StreamId,
        /// The message.
        msg: AnyRelayMsg,
    },
}
602
/// Decode a stream-initiating message (BEGIN, BEGIN_DIR, RESOLVE, etc.)
/// into an [`IncomingStreamRequest`].
///
/// Only BEGIN is supported for now: the message is decoded as a [`Begin`],
/// and a decoding failure is returned as an "Invalid Begin message" error.
#[cfg(any(feature = "hs-service", feature = "relay"))]
fn parse_incoming_stream_req(msg: UnparsedRelayMsg) -> crate::Result<IncomingStreamRequest> {
    // TODO(relay): support other stream-initiating messages, not just BEGIN
    match msg.decode::<Begin>() {
        Ok(decoded) => Ok(IncomingStreamRequest::Begin(decoded.into_msg())),
        Err(e) => Err(Error::from_bytes_err(e, "Invalid Begin message")),
    }
}
615
/// A stream message to be sent to the backward reactor for delivery.
///
/// Built by `StreamReactor::send_msg_to_bwd`,
/// and sent over the stream reactor's `bwd_tx` MPSC channel.
pub(crate) struct ReadyStreamMsg {
    /// The hop number, or `None` if we are a relay.
    pub(crate) hop: Option<HopNum>,
    /// The message to send.
    pub(crate) msg: AnyRelayMsgOuter,
    /// The cell format used with the hop the message should be sent to.
    pub(crate) relay_cell_format: RelayCellFormat,
    /// The CC object to use.
    ///
    /// This is a handle to the congestion control object
    /// of the hop the message originates from.
    pub(crate) ccontrol: Arc<Mutex<CongestionControl>>,
}
627
/// Stream data received from the other endpoint
/// that needs to be handled by [`StreamReactor`].
///
/// Received by the reactor over its `cell_rx` MPSC channel.
pub(crate) struct StreamMsg {
    /// The ID of the stream this message is for.
    pub(crate) sid: StreamId,
    /// The message.
    ///
    /// It is still unparsed at this point.
    pub(crate) msg: UnparsedRelayMsg,
    /// Whether the cell this message came from counts towards flow-control windows.
    pub(crate) cell_counts_toward_windows: bool,
}