Skip to main content

tor_proto/circuit/reactor/
forward.rs

1//! A circuit's view of the forward state of the circuit.
2
3use crate::circuit::UniqId;
4use crate::circuit::reactor::backward::BackwardReactorCmd;
5use crate::circuit::reactor::hop_mgr::HopMgr;
6use crate::circuit::reactor::macros::derive_deftly_template_CircuitReactor;
7use crate::circuit::reactor::stream::StreamMsg;
8use crate::circuit::reactor::{ControlHandler, ReactorResultChannel};
9use crate::congestion::sendme;
10use crate::stream::cmdcheck::AnyCmdChecker;
11use crate::stream::msg_streamid;
12use crate::util::err::ReactorError;
13use crate::{Error, HopNum, Result};
14
15#[cfg(any(feature = "hs-service", feature = "relay"))]
16use crate::stream::incoming::{
17    IncomingStreamRequestFilter, IncomingStreamRequestHandler, StreamReqSender,
18};
19
20// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
21use crate::client::circuit::padding::PaddingController;
22
23use tor_cell::chancell::msg::AnyChanMsg;
24use tor_cell::relaycell::msg::{Sendme, SendmeTag};
25use tor_cell::relaycell::{
26    AnyRelayMsgOuter, RelayCellDecoderResult, RelayCellFormat, RelayCmd, UnparsedRelayMsg,
27};
28use tor_error::internal;
29use tor_linkspec::HasRelayIds;
30use tor_rtcompat::Runtime;
31
32use derive_deftly::Deftly;
33use futures::SinkExt;
34use futures::channel::mpsc;
35use futures::{FutureExt as _, StreamExt, select_biased};
36use tracing::debug;
37
38use std::result::Result as StdResult;
39
40use crate::circuit::CircuitRxReceiver;
41
/// The forward circuit reactor.
///
/// See the [`reactor`](crate::circuit::reactor) module-level docs.
///
/// Shuts down if an error occurs, or if either the [`Reactor`](super::Reactor)
/// or the [`BackwardReactor`](super::BackwardReactor) shuts down:
///
///   * if the `Reactor` shuts down, we are alerted via the ctrl/command mpsc channels
///     (their sending ends will close, which causes run_once() to return ReactorError::Shutdown)
///   * if `BackwardReactor` shuts down, the `Reactor` will notice and will itself shut down,
///     which, in turn, causes the `ForwardReactor` to shut down as described above
#[derive(Deftly)]
#[derive_deftly(CircuitReactor)]
#[deftly(reactor_name = "forward reactor")]
#[deftly(run_inner_fn = "Self::run_once")]
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub(super) struct ForwardReactor<R: Runtime, F: ForwardHandler> {
    /// A handle to the runtime.
    ///
    /// Passed by reference to [`ForwardHandler::handle_meta_msg`].
    runtime: R,
    /// An identifier for logging about this reactor's circuit.
    unique_id: UniqId,
    /// Implementation-dependent part of the reactor.
    ///
    /// This enables us to customize the behavior of the reactor,
    /// depending on whether we are a client or a relay.
    inner: F,
    /// Channel for receiving control commands.
    ///
    /// This is the first source polled in each iteration of the main loop.
    /// If it yields `None`, the reactor shuts down.
    command_rx: mpsc::UnboundedReceiver<CtrlCmd<F::CtrlCmd>>,
    /// Channel for receiving control messages.
    ///
    /// Polled right after `command_rx`.
    /// If it yields `None`, the reactor shuts down.
    control_rx: mpsc::UnboundedReceiver<CtrlMsg<F::CtrlMsg>>,
    /// The reading end of the inbound Tor channel.
    ///
    /// Yields cells moving from the client towards the exit, if we are a relay,
    /// or cells moving towards *us*, if we are a client.
    ///
    /// Only read from once [`ForwardHandler::outbound_chan_ready`] has resolved,
    /// so that we do not pull in cells we cannot yet send onwards.
    inbound_chan_rx: CircuitRxReceiver,
    /// Sender for sending commands to the BackwardReactor.
    ///
    /// Used for sending:
    ///
    ///    * circuit-level SENDMEs received from the other endpoint
    ///      ([`BackwardReactorCmd::HandleSendme`])
    ///    * circuit-level SENDMEs that need to be delivered to the other endpoint
    ///      (using [`BackwardReactorCmd::SendRelayMsg`])
    ///    * any command returned by [`ForwardHandler::handle_event`]
    ///
    /// The receiver is in [`BackwardReactor`](super::BackwardReactor), which is responsible for
    /// sending cells over the inbound channel.
    backward_reactor_tx: mpsc::Sender<BackwardReactorCmd>,
    /// Hop manager, storing per-hop state, and handles to the stream reactors.
    ///
    /// Contains the `CircHopList`.
    hop_mgr: HopMgr<R>,
    /// An implementation-specific event stream.
    ///
    /// Polled from the main loop of the reactor.
    /// Each event is passed to [`ForwardHandler::handle_event`].
    ///
    /// If this stream yields `None`, the reactor shuts down.
    circ_events: mpsc::Receiver<F::CircEvent>,
    /// A padding controller to which padding-related events should be reported.
    ///
    /// Notified about every decoded RELAY cell (padding or data) handled by this reactor.
    padding_ctrl: PaddingController,
}
101
/// A control command aimed at the generic forward reactor.
///
/// The type parameter `C` is the implementation-dependent command type
/// carried by [`CtrlCmd::Custom`].
pub(crate) enum CtrlCmd<C> {
    /// Begin accepting streams on this circuit.
    ///
    /// Handled by installing an `IncomingStreamRequestHandler`,
    /// built from the fields below, in the reactor's hop manager.
    /// The outcome of that operation is reported via `done`.
    //
    // TODO(DEDUP): this is very similar to its client-side counterpart,
    // except the hop is an Option<HopNum> instead of a TargetHop.
    #[cfg(any(feature = "hs-service", feature = "relay"))]
    AwaitStreamRequests {
        /// A channel for sending information about an incoming stream request.
        incoming_sender: StreamReqSender,
        /// A `CmdChecker` to keep track of which message types are acceptable.
        cmd_checker: AnyCmdChecker,
        /// Oneshot channel to notify on completion.
        ///
        /// It is not an error for the receiving end to have gone away.
        done: ReactorResultChannel<()>,
        /// The hop that is allowed to create streams.
        ///
        /// Set to None if we are a relay wanting to accept stream requests.
        hop: Option<HopNum>,
        /// A filter used to check requests before passing them on.
        filter: Box<dyn IncomingStreamRequestFilter>,
    },
    /// An implementation-dependent control command.
    ///
    /// Passed, unmodified, to the `handle_cmd` method of the reactor's handler.
    #[allow(unused)] // TODO(relay)
    Custom(C),
}
127
/// A control message aimed at the generic forward reactor.
///
/// The type parameter `M` is the implementation-dependent message type
/// carried by [`CtrlMsg::Custom`].
pub(crate) enum CtrlMsg<M> {
    /// An implementation-dependent control message.
    ///
    /// Passed, unmodified, to the `handle_msg` method of the reactor's handler.
    #[allow(unused)] // TODO(relay)
    Custom(M),
}
134
/// Trait for customizing the behavior of the forward reactor.
///
/// Used for plugging in the implementation-dependent (client vs relay)
/// parts of the implementation into the generic one.
pub(crate) trait ForwardHandler: ControlHandler {
    /// Type that explains how to build an outgoing channel.
    type BuildSpec: HasRelayIds;

    /// The subclass of ChanMsg that can arrive on this type of circuit.
    ///
    /// Every message read from the inbound channel is converted to this type
    /// before being passed to [`Self::handle_forward_cell`].
    /// A failed conversion is propagated as an error from the reactor's main loop.
    type CircChanMsg: TryFrom<AnyChanMsg, Error = crate::Error>;

    /// An opaque event type.
    ///
    /// The [`ForwardReactor`] polls an MPSC stream yielding `CircEvent`s from the main loop.
    /// Each event is passed to [`Self::handle_event`] for handling.
    type CircEvent;

    /// Handle a non-SENDME RELAY message on this circuit with stream ID 0.
    ///
    /// Circuit-level SENDMEs never reach this function:
    /// the [`ForwardReactor`] hands those over to the backward reactor itself.
    ///
    /// The arguments are:
    ///
    ///   * `runtime`: the reactor's runtime handle
    ///   * `early`: whether the message arrived in a RELAY_EARLY cell
    ///   * `hopnum`: the hop the message was for
    ///   * `msg`: the not-yet-decoded message
    ///   * `relay_cell_format`: the relay cell format in use on that hop
    async fn handle_meta_msg<R: Runtime>(
        &mut self,
        runtime: &R,
        early: bool,
        hopnum: Option<HopNum>,
        msg: UnparsedRelayMsg,
        relay_cell_format: RelayCellFormat,
    ) -> StdResult<(), ReactorError>;

    /// Handle a forward (TODO terminology) cell.
    ///
    /// The cell is
    ///   - moving from the client towards the exit, if we're a relay
    ///   - moving from the guard towards us, if we're a client
    ///
    /// Returns an error if the cell should cause the reactor to shut down,
    /// or a [`ForwardCellDisposition`] specifying how it should be handled.
    ///
    /// Returns `None` if the cell was handled internally by this handler.
    /// In that case, the reactor takes no further action for this cell.
    async fn handle_forward_cell<R: Runtime>(
        &mut self,
        hop_mgr: &mut HopMgr<R>,
        cell: Self::CircChanMsg,
    ) -> StdResult<Option<ForwardCellDisposition>, ReactorError>;

    /// Handle an implementation-specific circuit event.
    ///
    /// Returns a command for the backward reactor.
    ///
    /// If the returned command is `Some`, the [`ForwardReactor`] sends it
    /// to the backward reactor; if it is `None`, nothing is sent.
    fn handle_event(
        &mut self,
        event: Self::CircEvent,
    ) -> StdResult<Option<BackwardReactorCmd>, ReactorError>;

    /// Wait until the outbound channel, if there is one, is ready to accept more cells.
    ///
    /// Resolves immediately if there is no outbound channel.
    /// Blocks if there is a pending outbound channel.
    ///
    /// The [`ForwardReactor`] awaits this before reading the next cell
    /// from its inbound channel; an error is propagated out of the main loop.
    async fn outbound_chan_ready(&mut self) -> Result<()>;
}
192
/// What action to take in response to a cell arriving on our inbound Tor channel.
///
/// Returned by [`ForwardHandler::handle_forward_cell`].
pub(crate) enum ForwardCellDisposition {
    /// Handle a decoded RELAY or RELAY_EARLY cell in the [`ForwardReactor`].
    HandleRecognizedRelay {
        /// The decoded cell.
        cell: RelayCellDecoderResult,
        /// Whether this was a RELAY_EARLY.
        early: bool,
        /// The hop this cell was for.
        ///
        /// If this is `None`, hop 0 is used when reporting the cell
        /// to the padding controller.
        hopnum: Option<HopNum>,
        /// The SENDME tag.
        ///
        /// Used for building the circuit-level SENDME, if one needs to be sent
        /// in response to this cell.
        tag: SendmeTag,
    },
}
207
208impl<R: Runtime, F: ForwardHandler> ForwardReactor<R, F> {
209    /// Create a new [`ForwardReactor`].
210    #[allow(clippy::too_many_arguments)] // TODO
211    pub(super) fn new(
212        runtime: R,
213        unique_id: UniqId,
214        inner: F,
215        hop_mgr: HopMgr<R>,
216        inbound_chan_rx: CircuitRxReceiver,
217        control_rx: mpsc::UnboundedReceiver<CtrlMsg<F::CtrlMsg>>,
218        command_rx: mpsc::UnboundedReceiver<CtrlCmd<F::CtrlCmd>>,
219        backward_reactor_tx: mpsc::Sender<BackwardReactorCmd>,
220        circ_events: mpsc::Receiver<F::CircEvent>,
221        padding_ctrl: PaddingController,
222    ) -> Self {
223        Self {
224            runtime,
225            unique_id,
226            inbound_chan_rx,
227            control_rx,
228            command_rx,
229            inner,
230            backward_reactor_tx,
231            hop_mgr,
232            circ_events,
233            padding_ctrl,
234        }
235    }
236
237    /// Helper for [`run`](Self::run).
238    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
239        let outbound_chan_ready = self.inner.outbound_chan_ready();
240
241        let inbound_chan_rx_fut = async {
242            // Avoid reading from the inbound_chan_rx Tor Channel if the outgoing sink is blocked
243            outbound_chan_ready.await?;
244            Ok(self.inbound_chan_rx.next().await)
245        };
246
247        select_biased! {
248            res = self.command_rx.next().fuse() => {
249                let cmd = res.ok_or_else(|| ReactorError::Shutdown)?;
250                self.handle_cmd(cmd)
251            }
252            res = self.control_rx.next().fuse() => {
253                let msg = res.ok_or_else(|| ReactorError::Shutdown)?;
254                self.handle_msg(msg)
255            }
256            res = self.circ_events.next().fuse() => {
257                let ev = res.ok_or_else(|| ReactorError::Shutdown)?;
258                if let Some(cmd) = self.inner.handle_event(ev)? {
259                    self.send_reactor_cmd(cmd).await?;
260                }
261
262                Ok(())
263            }
264            res = inbound_chan_rx_fut.fuse() => {
265                let cell = res.map_err(ReactorError::Err)?;
266                let Some(cell) = cell else {
267                    debug!(
268                        circ_id = %self.unique_id,
269                        "Backward channel has closed, shutting down forward relay reactor",
270                    );
271
272                    return Err(ReactorError::Shutdown);
273                };
274
275                let cell: F::CircChanMsg = cell.try_into()?;
276                let Some(disp) = self.inner.handle_forward_cell(&mut self.hop_mgr, cell).await? else {
277                    return Ok(());
278                };
279
280                match disp {
281                    ForwardCellDisposition::HandleRecognizedRelay { cell, early, hopnum, tag } => {
282                        self.handle_relay_cell(cell, early, hopnum, tag).await
283                    }
284                }
285            },
286        }
287    }
288
289    /// Handle a control command.
290    fn handle_cmd(&mut self, cmd: CtrlCmd<F::CtrlCmd>) -> StdResult<(), ReactorError> {
291        match cmd {
292            #[cfg(any(feature = "hs-service", feature = "relay"))]
293            CtrlCmd::AwaitStreamRequests {
294                incoming_sender,
295                cmd_checker,
296                done,
297                hop,
298                filter,
299            } => {
300                let handler = IncomingStreamRequestHandler {
301                    incoming_sender,
302                    cmd_checker,
303                    hop_num: hop,
304                    filter,
305                };
306
307                // Update the HopMgr with the
308                let ret = self.hop_mgr.set_incoming_handler(handler);
309                let _ = done.send(ret); // don't care if the corresponding receiver goes away.
310                Ok(())
311            }
312            CtrlCmd::Custom(c) => self.inner.handle_cmd(c),
313        }
314    }
315
316    /// Handle a control message.
317    fn handle_msg(&mut self, msg: CtrlMsg<F::CtrlMsg>) -> StdResult<(), ReactorError> {
318        match msg {
319            CtrlMsg::Custom(c) => self.inner.handle_msg(c),
320        }
321    }
322
323    /// Note that we have received a RELAY cell.
324    ///
325    /// Updates the padding and CC state.
326    fn note_relay_cell_received(
327        &self,
328        hopnum: Option<HopNum>,
329        c_t_w: bool,
330    ) -> Result<(RelayCellFormat, bool)> {
331        let mut hops = self.hop_mgr.hops().write().expect("poisoned lock");
332        let hop = hops
333            .get_mut(hopnum)
334            .ok_or_else(|| internal!("msg from non-existent hop???"))?;
335
336        // Check whether we are allowed to receive more data for this circuit hop.
337        hop.inbound.decrement_cell_limit()?;
338
339        // Decrement the circuit sendme windows, and see if we need to
340        // send a sendme cell.
341        let send_circ_sendme = if c_t_w {
342            hop.ccontrol
343                .lock()
344                .expect("poisoned lock")
345                .note_data_received()?
346        } else {
347            false
348        };
349
350        let relay_cell_format = hop.settings.relay_crypt_protocol().relay_cell_format();
351
352        Ok((relay_cell_format, send_circ_sendme))
353    }
354
355    /// Handle a RELAY cell.
356    ///
357    // TODO(DEDUP): very similar to Client::handle_relay_cell()
358    async fn handle_relay_cell(
359        &mut self,
360        decode_res: RelayCellDecoderResult,
361        early: bool,
362        hopnum: Option<HopNum>,
363        tag: SendmeTag,
364    ) -> StdResult<(), ReactorError> {
365        // For padding purposes, if we are a relay, we set the hopnum to 0
366        // TODO(relay): is this right?
367        let hopnum_padding = hopnum.unwrap_or_else(|| HopNum::from(0));
368        if decode_res.is_padding() {
369            self.padding_ctrl.decrypted_padding(hopnum_padding)?;
370        } else {
371            self.padding_ctrl.decrypted_data(hopnum_padding);
372        }
373
374        let c_t_w = decode_res.cmds().any(sendme::cmd_counts_towards_windows);
375        let (relay_cell_format, send_circ_sendme) = self.note_relay_cell_received(hopnum, c_t_w)?;
376
377        // If we do need to send a circuit-level SENDME cell, do so.
378        if send_circ_sendme {
379            // This always sends a V1 (tagged) sendme cell, and thereby assumes
380            // that SendmeEmitMinVersion is no more than 1.  If the authorities
381            // every increase that parameter to a higher number, this will
382            // become incorrect.  (Higher numbers are not currently defined.)
383            let sendme = Sendme::from(tag);
384            let msg = AnyRelayMsgOuter::new(None, sendme.into());
385            let forward = BackwardReactorCmd::SendRelayMsg { hop: hopnum, msg };
386
387            // NOTE: sending the SENDME to the backward reactor for handling
388            // might seem counterintuitive, given that we have access to
389            // the congestion control object right here (via hop_mgr).
390            //
391            // However, the forward reactor does not have access to the
392            // outbound_chan_tx part of the inbound (towards the client) Tor channel,
393            // and so it cannot handle the SENDME on its own
394            // (because it cannot obtain the congestion signals),
395            // so the SENDME needs to be handled in the backward reactor.
396            //
397            // NOTE: this will block if the backward reactor is not ready
398            // to send any more cells.
399            self.send_reactor_cmd(forward).await?;
400        }
401
402        let (mut msgs, incomplete) = decode_res.into_parts();
403        while let Some(msg) = msgs.next() {
404            match self
405                .handle_relay_msg(early, hopnum, msg, relay_cell_format, c_t_w)
406                .await
407            {
408                Ok(()) => continue,
409                Err(e) => {
410                    for m in msgs {
411                        debug!(
412                            circ_id = %self.unique_id,
413                            "Ignoring relay msg received after triggering shutdown: {m:?}",
414                        );
415                    }
416                    if let Some(incomplete) = incomplete {
417                        debug!(
418                            circ_id = %self.unique_id,
419                            "Ignoring partial relay msg received after triggering shutdown: {:?}",
420                            incomplete,
421                        );
422                    }
423
424                    return Err(e);
425                }
426            }
427        }
428
429        Ok(())
430    }
431
432    /// Handle a single incoming RELAY message.
433    async fn handle_relay_msg(
434        &mut self,
435        early: bool,
436        hop: Option<HopNum>,
437        msg: UnparsedRelayMsg,
438        relay_cell_format: RelayCellFormat,
439        cell_counts_toward_windows: bool,
440    ) -> StdResult<(), ReactorError> {
441        // If this msg wants/refuses to have a Stream ID, does it
442        // have/not have one?
443        let streamid = msg_streamid(&msg)?;
444
445        // If this doesn't have a StreamId, it's a meta cell,
446        // not meant for a particular stream.
447        let Some(sid) = streamid else {
448            return self
449                .handle_meta_msg(early, hop, msg, relay_cell_format)
450                .await;
451        };
452
453        let msg = StreamMsg {
454            sid,
455            msg,
456            cell_counts_toward_windows,
457        };
458
459        // All messages on streams are handled in the stream reactor
460        // (because that's where the stream map is)
461        //
462        // Internally, this will spawn a StreamReactor for the target hop,
463        // if not already spawned.
464        self.hop_mgr.send(hop, msg).await
465    }
466
467    /// Handle a RELAY or RELAY_EARLY message on this circuit with stream ID 0.
468    async fn handle_meta_msg(
469        &mut self,
470        early: bool,
471        hopnum: Option<HopNum>,
472        msg: UnparsedRelayMsg,
473        relay_cell_format: RelayCellFormat,
474    ) -> StdResult<(), ReactorError> {
475        match msg.cmd() {
476            RelayCmd::SENDME => {
477                let sendme = msg
478                    .decode::<Sendme>()
479                    .map_err(|e| Error::from_bytes_err(e, "sendme message"))?
480                    .into_msg();
481
482                let cmd = BackwardReactorCmd::HandleSendme {
483                    hop: hopnum,
484                    sendme,
485                };
486
487                self.send_reactor_cmd(cmd).await
488            }
489            _ => {
490                self.inner
491                    .handle_meta_msg(&self.runtime, early, hopnum, msg, relay_cell_format)
492                    .await
493            }
494        }
495    }
496
497    /// Send a command to the backward reactor.
498    ///
499    /// Blocks if the `backward_reactor_tx` channel is full, i.e. if the backward reactor
500    /// is not ready to send any more cells.
501    ///
502    /// Returns an error if the backward reactor has shut down.
503    async fn send_reactor_cmd(
504        &mut self,
505        forward: BackwardReactorCmd,
506    ) -> StdResult<(), ReactorError> {
507        self.backward_reactor_tx.send(forward).await.map_err(|_| {
508            // The other reactor has shut down
509            ReactorError::Shutdown
510        })
511    }
512}