Skip to main content

tor_proto/circuit/reactor/
stream.rs

1//! The stream reactor.
2
3use crate::circuit::circhop::CircHopOutbound;
4use crate::circuit::reactor::macros::derive_deftly_template_CircuitReactor;
5use crate::circuit::{CircHopSyncView, UniqId};
6use crate::congestion::{CongestionControl, sendme};
7use crate::memquota::{CircuitAccount, SpecificAccount as _, StreamAccount};
8use crate::stream::CloseStreamBehavior;
9use crate::stream::cmdcheck::StreamStatus;
10use crate::stream::flow_ctrl::state::StreamRateLimit;
11use crate::stream::queue::stream_queue;
12use crate::streammap;
13use crate::util::err::ReactorError;
14use crate::util::notify::NotifySender;
15use crate::{Error, HopNum};
16
17#[cfg(any(feature = "hs-service", feature = "relay"))]
18use crate::stream::incoming::{
19    InboundDataCmdChecker, IncomingStreamRequest, IncomingStreamRequestContext,
20    IncomingStreamRequestDisposition, IncomingStreamRequestHandler, StreamReqInfo,
21};
22
23use tor_async_utils::{SinkTrySend as _, SinkTrySendError as _};
24use tor_cell::relaycell::msg::{AnyRelayMsg, Begin, End, EndReason};
25use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
26use tor_error::into_internal;
27use tor_log_ratelim::log_ratelim;
28use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
29use tor_rtcompat::{DynTimeProvider, Runtime, SleepProvider as _};
30
31use derive_deftly::Deftly;
32use futures::SinkExt;
33use futures::channel::mpsc;
34use futures::{FutureExt as _, StreamExt as _, future, select_biased};
35use postage::watch;
36use tracing::debug;
37
38use std::pin::Pin;
39use std::result::Result as StdResult;
40use std::sync::{Arc, Mutex};
41use std::task::Poll;
42use std::time::Duration;
43
44/// Size of the buffer for communication between a StreamTarget and the reactor.
45/// Passed to `MpscSpec::new` when the channel for an accepted incoming stream is created.
46// TODO(tuning): figure out if this is a good size for this buffer
47const CIRCUIT_BUFFER_SIZE: usize = 128;
48
49/// Trait for customizing the behavior of the stream reactor.
50///
51/// The generic [`StreamReactor`] delegates its implementation-dependent
52/// (client vs relay) decisions to an implementation of this trait.
53pub(crate) trait StreamHandler: Send + Sync + 'static {
54    /// Return the amount of time a newly closed stream on `hop`
55    /// should be kept in the stream map for.
56    ///
57    /// This is the amount of time we are willing to wait for
58    /// an END ack before removing the half-stream from the map.
59    fn halfstream_expiry(&self, hop: &CircHopOutbound) -> Duration;
60}
61
62/// The stream reactor for a given hop.
63///
64/// Drives the application streams.
65///
66/// This reactor accepts [`StreamMsg`]s from the forward reactor over its [`Self::cell_rx`]
67/// MPSC channel, and delivers them to the corresponding stream entries in the stream map.
68///
69/// The local streams are polled from the main loop, and any ready messages are sent
70/// to the backward reactor over the `bwd_tx` MPSC channel for packaging and delivery.
71///
72/// Shuts down if an error occurs, or if the sending end
73/// of the `cell_rx` MPSC channel, i.e. the forward reactor, closes.
74#[derive(Deftly)]
75#[derive_deftly(CircuitReactor)]
76#[deftly(reactor_name = "stream reactor")]
77#[deftly(run_inner_fn = "Self::run_once")]
78#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
79pub(crate) struct StreamReactor {
80    /// The hop this stream reactor is for.
81    ///
82    /// This is `None` for relays.
83    hopnum: Option<HopNum>,
84    /// The state of this circuit hop.
85    hop: CircHopOutbound,
86    /// The time provider.
87    time_provider: DynTimeProvider,
88    /// An identifier for logging about this reactor's circuit.
89    unique_id: UniqId,
90    /// Receiver for Tor stream data that need to be delivered to a Tor stream.
91    ///
92    /// The sender is in [`ForwardReactor`](super::ForwardReactor), which will forward all cells
93    /// carrying Tor stream data to us.
94    ///
95    /// This serves a dual purpose:
96    ///
97    ///   * it enables the `ForwardReactor` to deliver Tor stream data received from the client
98    ///   * it lets the `StreamReactor` know if the `ForwardReactor` has shut down:
99    ///     we `select!` on this MPSC channel in the main loop, so if the `ForwardReactor`
100    ///     shuts down, we will get EOS upon calling `.next()`
101    cell_rx: mpsc::Receiver<StreamMsg>,
102    /// Sender for sending Tor stream data to [`BackwardReactor`](super::BackwardReactor).
103    bwd_tx: mpsc::Sender<ReadyStreamMsg>,
104    /// A handler for incoming streams.
105    ///
106    /// Set to `None` if incoming streams are not allowed on this circuit.
107    ///
108    /// This handler is shared with the [`HopMgr`](super::hop_mgr::HopMgr) of this reactor,
109    /// which can install a new handler at runtime (for example, in response to a CtrlMsg).
110    /// The ability to update the handler after the reactor is launched is needed
111    /// for onion services, where the incoming stream request handler only gets installed
112    /// after the virtual hop is created.
113    #[cfg(any(feature = "hs-service", feature = "relay"))]
114    incoming: Arc<Mutex<Option<IncomingStreamRequestHandler>>>,
115    /// A [`StreamHandler`] for customizing the stream reactor behavior.
116    inner: Arc<dyn StreamHandler>,
117    /// Circuit memory quota account, from which each incoming stream's `StreamAccount` is created.
118    memquota: CircuitAccount,
119}
120
121#[allow(unused)] // TODO(relay)
122impl StreamReactor {
123    /// Create a new [`StreamReactor`].
124    #[allow(clippy::too_many_arguments)] // TODO
125    pub(crate) fn new<R: Runtime>(
126        runtime: R,
127        hopnum: Option<HopNum>,
128        hop: CircHopOutbound,
129        unique_id: UniqId,
130        cell_rx: mpsc::Receiver<StreamMsg>,
131        bwd_tx: mpsc::Sender<ReadyStreamMsg>,
132        inner: Arc<dyn StreamHandler>,
133        #[cfg(any(feature = "hs-service", feature = "relay"))] //
134        incoming: Arc<Mutex<Option<IncomingStreamRequestHandler>>>,
135        memquota: CircuitAccount,
136    ) -> Self {
137        Self {
138            hopnum,
139            hop,
140            time_provider: DynTimeProvider::new(runtime),
141            unique_id,
142            #[cfg(any(feature = "hs-service", feature = "relay"))]
143            incoming,
144            cell_rx,
145            bwd_tx,
146            inner,
147            memquota,
148        }
149    }
150
151    /// Helper for [`run`](Self::run).
152    ///
153    /// Polls the stream map for messages
154    /// that need to be delivered to the other endpoint,
155    /// and the `cells_rx` MPSC stream for stream messages received
156    /// from the `ForwardReactor` that need to be delivered to the application streams.
157    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
158        use postage::prelude::{Sink as _, Stream as _};
159
160        // Garbage-collect all halfstreams that have expired.
161        //
162        // Note: this will iterate over the closed streams of this hop.
163        // If we think this will cause perf issues, one idea would be to make
164        // StreamMap::closed_streams into a min-heap, and add a branch to the
165        // select_biased! below to sleep until the first expiry is due
166        // (but my gut feeling is that iterating is cheaper)
167        self.hop
168            .stream_map()
169            .lock()
170            .expect("poisoned lock")
171            .remove_expired_halfstreams(self.time_provider.now());
172
173        let mut streams = Arc::clone(self.hop.stream_map());
174        let can_send = self
175            .hop
176            .ccontrol()
177            .lock()
178            .expect("poisoned lock")
179            .can_send();
180        let mut ready_streams_fut = future::poll_fn(move |cx| {
181            if !can_send {
182                // We can't send anything on this hop that counts towards SENDME windows.
183                //
184                // Note: this does not block outgoing flow-control messages:
185                //
186                //   * circuit SENDMEs are initiated by the forward reactor,
187                //     by sending a BackwardReactorCmd::SendRelayMsg to BWD,
188                //   * stream SENDMEs will be initiated by StreamTarget::send_sendme(),
189                //     by sending a control message to the reactor
190                //     (TODO(relay): not yet implemented)
191                //   * XOFFs are sent in response to messages on streams
192                //     (i.e. RELAY messages with non-zero stream IDs).
193                //     These messages are delivered to us by the forward reactor
194                //     inside BackwardReactorCmd::HandleMsg
195                //   * XON will be initiated by StreamTarget::drain_rate_update(),
196                //     by sending a control message to the reactor
197                //     (TODO(relay): not yet implemented)\
198                return Poll::Pending;
199            }
200
201            let mut streams = streams.lock().expect("lock poisoned");
202            let Some((sid, msg)) = streams.poll_ready_streams_iter(cx).next() else {
203                // No ready streams
204                //
205                // TODO(flushing): if there are no ready Tor streams, we might want to defer
206                // flushing until stream data becomes available (or until a timeout elapses).
207                // The deferred flushing approach should enable us to send
208                // more than one message at a time to the channel reactor.
209                return Poll::Pending;
210            };
211
212            if msg.is_none() {
213                // This means the local sender has been dropped,
214                // which presumably can only happen if an error occurs,
215                // or if the Tor stream ends. In both cases, we're going to
216                // want to send an END to the client to let them know,
217                // and to remove the stream from the stream map.
218                //
219                // TODO(relay): the local sender part is not implemented yet
220                return Poll::Ready(StreamEvent::Closed {
221                    sid,
222                    behav: CloseStreamBehavior::default(),
223                    reason: streammap::TerminateReason::StreamTargetClosed,
224                });
225            };
226
227            let msg = streams.take_ready_msg(sid).expect("msg disappeared");
228
229            Poll::Ready(StreamEvent::ReadyMsg { sid, msg })
230        });
231
232        select_biased! {
233            res = self.cell_rx.next().fuse() => {
234                let Some(cmd) = res else {
235                    // The forward reactor has shut down
236                    return Err(ReactorError::Shutdown);
237                };
238
239                self.handle_reactor_cmd(cmd).await?;
240            }
241            event = ready_streams_fut.fuse() => {
242                self.handle_stream_event(event).await?;
243            }
244        }
245
246        Ok(())
247    }
248
249    /// Handle a stream message sent to us by the forward reactor.
250    ///
251    /// Delivers the message to its corresponding application stream.
252    async fn handle_reactor_cmd(&mut self, msg: StreamMsg) -> StdResult<(), ReactorError> {
253        let StreamMsg {
254            sid,
255            msg,
256            cell_counts_toward_windows,
257        } = msg;
258
259        // We need to apply stream-level flow control *before* encoding the message.
260        // May optionally return a message that needs to be sent back to the client.
261        let bwd_msg = self.handle_msg(sid, msg, cell_counts_toward_windows)?;
262
263        // TODO(DEDUP): this contains parts of Circuit::send_relay_cell_inner()
264        if let Some(bwd_msg) = bwd_msg {
265            // We might be out of capacity entirely; see if we are about to hit a limit.
266            //
267            // TODO: If we ever add a notion of _recoverable_ errors below, we'll
268            // need a way to restore this limit, and similarly for about_to_send().
269            self.hop.decrement_cell_limit()?;
270
271            let c_t_w = sendme::cmd_counts_towards_windows(bwd_msg.cmd());
272
273            // We need to apply stream-level flow control *before* encoding the message
274            // (the BWD handles the encoding)
275            if c_t_w {
276                if let Some(stream_id) = bwd_msg.stream_id() {
277                    self.hop
278                        .about_to_send(self.unique_id, stream_id, bwd_msg.msg())?;
279                }
280            }
281
282            // NOTE: on the client side, we call note_data_sent()
283            // just before writing the cell to the channel.
284            // We can't do that here, because we're not the ones
285            // encoding the cell, so we don't have the SENDME tag
286            // which is needed for note_data_sent().
287            //
288            // Instead, we notify the CC algorithm in the BWD,
289            // right after we've finished sending the cell.
290
291            self.send_msg_to_bwd(bwd_msg).await?;
292        }
293
294        Ok(())
295    }
296
297    /// Handle a RELAY message that has a non-zero stream ID.
298    ///
299    /// A returned message is one that we need to send back to the client.
300    //
301    // TODO(relay): this is very similar to the client impl from
302    // Circuit::handle_in_order_relay_msg()
303    fn handle_msg(
304        &mut self,
305        streamid: StreamId,
306        msg: UnparsedRelayMsg,
307        cell_counts_toward_windows: bool,
308    ) -> StdResult<Option<AnyRelayMsgOuter>, ReactorError> {
309        let cmd = msg.cmd();
310        let possible_proto_violation_err = move |streamid: StreamId| {
311            Error::StreamProto(format!(
312                "Unexpected {cmd:?} message on unknown stream {streamid}"
313            ))
314        };
315        let now = self.time_provider.now();
316
317        // Check if any of our already-open streams want this message
318        let res = self.hop.handle_msg(
319            possible_proto_violation_err,
320            cell_counts_toward_windows,
321            streamid,
322            msg,
323            now,
324        )?;
325
326        // If it was an incoming stream request, we don't need to worry about
327        // sending an XOFF as there's no stream data within this message.
328        if let Some(msg) = res {
329            cfg_if::cfg_if! {
330                if #[cfg(any(feature = "hs-service", feature = "relay"))] {
331                    return self.handle_incoming_stream_request(streamid, msg);
332                } else {
333                    return Err(
334                        tor_error::internal!(
335                            "incoming stream not rejected, but relay and hs-service features are disabled?!"
336                            ).into()
337                    );
338                }
339            }
340        }
341
342        // We may want to send an XOFF if the incoming buffer is too large.
343        if let Some(cell) = self.hop.maybe_send_xoff(streamid)? {
344            let cell = AnyRelayMsgOuter::new(Some(streamid), cell.into());
345            return Ok(Some(cell));
346        }
347
348        Ok(None)
349    }
350
351    /// A helper for handling incoming stream requests.
352    ///
353    /// Accepts the specified incoming stream request,
354    /// by adding a new entry to our stream map.
355    ///
356    /// Returns the cell we need to send back to the client,
357    /// if an error occurred and the stream cannot be opened.
358    ///
359    /// Returns None if everything went well
360    /// (the CONNECTED response only comes if the external
361    /// consumer of our [Stream](futures::Stream) of incoming Tor streams
362    /// is able to actually establish the connection to the address
363    /// specified in the BEGIN).
364    ///
365    /// Any error returned from this function will shut down the reactor.
366    #[cfg(any(feature = "hs-service", feature = "relay"))]
367    fn handle_incoming_stream_request(
368        &mut self,
369        sid: StreamId,
370        msg: UnparsedRelayMsg,
371    ) -> StdResult<Option<AnyRelayMsgOuter>, ReactorError> {
372        let mut lock = self.incoming.lock().expect("poisoned lock");
373        let Some(handler) = lock.as_mut() else {
374            return Err(
375                Error::CircProto("Cannot handle BEGIN cells on this circuit".into()).into(),
376            );
377        };
378
379        if self.hopnum != handler.hop_num {
380            let expected_hopnum = match handler.hop_num {
381                Some(hopnum) => hopnum.display().to_string(),
382                None => "client".to_string(),
383            };
384
385            let actual_hopnum = match self.hopnum {
386                Some(hopnum) => hopnum.display().to_string(),
387                None => "None".to_string(),
388            };
389
390            return Err(Error::CircProto(format!(
391                "Expecting incoming streams from {}, but received {} cell from unexpected hop {}",
392                expected_hopnum,
393                msg.cmd(),
394                actual_hopnum,
395            ))
396            .into());
397        }
398
399        let message_closes_stream = handler.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
400
401        if message_closes_stream {
402            self.hop
403                .stream_map()
404                .lock()
405                .expect("poisoned lock")
406                .ending_msg_received(sid)?;
407
408            return Ok(None);
409        }
410
411        let req = parse_incoming_stream_req(msg)?;
412        let view = CircHopSyncView::new(&self.hop);
413
414        if let Some(reject) = Self::should_reject_incoming(handler, sid, &req, &view)? {
415            // We can't honor this request, so we bail by sending an END.
416            return Ok(Some(reject));
417        };
418
419        let memquota =
420            StreamAccount::new(&self.memquota).map_err(|e| ReactorError::Err(e.into()))?;
421
422        let (sender, receiver) = stream_queue(
423            #[cfg(not(feature = "flowctl-cc"))]
424            crate::stream::STREAM_READER_BUFFER,
425            &memquota,
426            &self.time_provider,
427        )
428        .map_err(|e| ReactorError::Err(e.into()))?;
429
430        let (msg_tx, msg_rx) = MpscSpec::new(CIRCUIT_BUFFER_SIZE)
431            .new_mq(self.time_provider.clone(), memquota.as_raw_account())
432            .map_err(|e| ReactorError::Err(e.into()))?;
433
434        let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
435
436        // A channel for the reactor to request a new drain rate from the reader.
437        // Typically this notification will be sent after an XOFF is sent so that the reader can
438        // send us a new drain rate when the stream data queue becomes empty.
439        let mut drain_rate_request_tx = NotifySender::new_typed();
440        let drain_rate_request_rx = drain_rate_request_tx.subscribe();
441
442        let cmd_checker = InboundDataCmdChecker::new_connected();
443        self.hop.add_ent_with_id(
444            sender,
445            msg_rx,
446            rate_limit_tx,
447            drain_rate_request_tx,
448            sid,
449            cmd_checker,
450        )?;
451
452        let outcome = Pin::new(&mut handler.incoming_sender).try_send(StreamReqInfo {
453            req,
454            stream_id: sid,
455            hop: None,
456            msg_tx,
457            receiver,
458            rate_limit_stream: rate_limit_rx,
459            drain_rate_request_stream: drain_rate_request_rx,
460            memquota,
461            relay_cell_format: self.hop.relay_cell_format(),
462        });
463
464        log_ratelim!("Delivering message to incoming stream handler"; outcome);
465
466        if let Err(e) = outcome {
467            if e.is_full() {
468                // The IncomingStreamRequestHandler's stream is full; it isn't
469                // handling requests fast enough. So instead, we reply with an
470                // END cell.
471                let end_msg = AnyRelayMsgOuter::new(
472                    Some(sid),
473                    End::new_with_reason(EndReason::RESOURCELIMIT).into(),
474                );
475
476                return Ok(Some(end_msg));
477            } else if e.is_disconnected() {
478                // The IncomingStreamRequestHandler's stream has been dropped.
479                // In the Tor protocol as it stands, this always means that the
480                // circuit itself is out-of-use and should be closed.
481                //
482                // Note that we will _not_ reach this point immediately after
483                // the IncomingStreamRequestHandler is dropped; we won't hit it
484                // until we next get an incoming request.  Thus, if we later
485                // want to add early detection for a dropped
486                // IncomingStreamRequestHandler, we need to do it elsewhere, in
487                // a different way.
488                debug!(
489                    circ_id = %self.unique_id,
490                    "Incoming stream request receiver dropped",
491                );
492                // This will _cause_ the circuit to get closed.
493                return Err(ReactorError::Err(Error::CircuitClosed));
494            } else {
495                // There are no errors like this with the current design of
496                // futures::mpsc, but we shouldn't just ignore the possibility
497                // that they'll be added later.
498                return Err(
499                    Error::from((into_internal!("try_send failed unexpectedly"))(e)).into(),
500                );
501            }
502        }
503
504        Ok(None)
505    }
506
507    /// Check if we should reject this incoming stream request or not.
508    ///
509    /// Returns a cell we need to send back to the client if we must reject the request,
510    /// or `None` if we are allowed to accept it.
511    ///`
512    /// Any error returned from this function will shut down the reactor.
513    #[cfg(any(feature = "hs-service", feature = "relay"))]
514    fn should_reject_incoming<'a>(
515        handler: &mut IncomingStreamRequestHandler,
516        sid: StreamId,
517        request: &IncomingStreamRequest,
518        view: &CircHopSyncView<'a>,
519    ) -> StdResult<Option<AnyRelayMsgOuter>, ReactorError> {
520        use IncomingStreamRequestDisposition::*;
521
522        let ctx = IncomingStreamRequestContext { request };
523
524        // Run the externally provided filter to check if we should
525        // open the stream or not.
526        match handler.filter.as_mut().disposition(&ctx, view)? {
527            Accept => {
528                // All is well, we can accept the stream request
529                Ok(None)
530            }
531            CloseCircuit => Err(ReactorError::Shutdown),
532            RejectRequest(end) => {
533                let end_msg = AnyRelayMsgOuter::new(Some(sid), end.into());
534
535                Ok(Some(end_msg))
536            }
537        }
538    }
539
540    /// Handle a [`StreamEvent`].
541    async fn handle_stream_event(&mut self, event: StreamEvent) -> StdResult<(), ReactorError> {
542        match event {
543            StreamEvent::Closed { sid, behav, reason } => {
544                let timeout = self.inner.halfstream_expiry(&self.hop);
545                let expire_at = self.time_provider.now() + timeout;
546                let res =
547                    self.hop
548                        .close_stream(self.unique_id, sid, None, behav, reason, expire_at)?;
549                let Some(msg) = res else {
550                    // We may not need to send anything at all...
551                    return Ok(());
552                };
553
554                self.send_msg_to_bwd(msg.cell).await
555            }
556            StreamEvent::ReadyMsg { sid, msg } => {
557                self.send_msg_to_bwd(AnyRelayMsgOuter::new(Some(sid), msg))
558                    .await
559            }
560        }
561    }
562
563    /// Wrap `msg` in [`ReadyStreamMsg`], and send it to the backward reactor.
564    async fn send_msg_to_bwd(&mut self, msg: AnyRelayMsgOuter) -> StdResult<(), ReactorError> {
565        let msg = ReadyStreamMsg {
566            hop: self.hopnum,
567            relay_cell_format: self.hop.relay_cell_format(),
568            ccontrol: Arc::clone(self.hop.ccontrol()),
569            msg,
570        };
571
572        self.bwd_tx
573            .send(msg)
574            .await
575            .map_err(|_| ReactorError::Shutdown)?;
576
577        Ok(())
578    }
579}
580
581/// A Tor stream-related event, produced by polling the stream map.
582enum StreamEvent {
583    /// A stream was closed.
584    ///
585    /// It needs to be removed from the reactor's stream map.
586    Closed {
587        /// The ID of the stream to close.
588        sid: StreamId,
589        /// The stream-closing behavior.
590        behav: CloseStreamBehavior,
591        /// The reason for closing the stream.
592        reason: streammap::TerminateReason,
593    },
594    /// A stream has a message ready to be sent to the backward reactor.
595    ReadyMsg {
596        /// The ID of the stream the message belongs to.
597        sid: StreamId,
598        /// The message.
599        msg: AnyRelayMsg,
600    },
601}
602
/// Decode a stream-initiating message into an [`IncomingStreamRequest`].
///
/// Only BEGIN is currently understood: `msg` is decoded as a [`Begin`] message,
/// and an error is returned if that decoding fails.
#[cfg(any(feature = "hs-service", feature = "relay"))]
fn parse_incoming_stream_req(msg: UnparsedRelayMsg) -> crate::Result<IncomingStreamRequest> {
    // TODO(relay): support other stream-initiating messages, not just BEGIN
    match msg.decode::<Begin>() {
        Ok(decoded) => Ok(IncomingStreamRequest::Begin(decoded.into_msg())),
        Err(e) => Err(Error::from_bytes_err(e, "Invalid Begin message")),
    }
}
615
616/// A stream message to be sent to the backward reactor for delivery.
617pub(crate) struct ReadyStreamMsg {
618    /// The hop number of the sending stream reactor, or `None` if we are a relay.
619    pub(crate) hop: Option<HopNum>,
620    /// The message to send.
621    pub(crate) msg: AnyRelayMsgOuter,
622    /// The cell format used with the hop the message should be sent to.
623    pub(crate) relay_cell_format: RelayCellFormat,
624    /// The congestion control object of the hop, shared with the stream reactor.
625    pub(crate) ccontrol: Arc<Mutex<CongestionControl>>,
626}
627
628/// Stream data received from the other endpoint
629/// that needs to be handled by [`StreamReactor`].
630pub(crate) struct StreamMsg {
631    /// The ID of the stream this message is for.
632    pub(crate) sid: StreamId,
633    /// The message, not yet decoded.
634    pub(crate) msg: UnparsedRelayMsg,
635    /// Whether the cell this message came from counts towards flow-control windows.
636    pub(crate) cell_counts_toward_windows: bool,
637}