tor_proto/circuit/reactor/forward.rs
1//! A circuit's view of the forward state of the circuit.
2
3use crate::circuit::UniqId;
4use crate::circuit::reactor::backward::BackwardReactorCmd;
5use crate::circuit::reactor::hop_mgr::HopMgr;
6use crate::circuit::reactor::macros::derive_deftly_template_CircuitReactor;
7use crate::circuit::reactor::stream::StreamMsg;
8use crate::circuit::reactor::{ControlHandler, ReactorResultChannel};
9use crate::congestion::sendme;
10use crate::stream::cmdcheck::AnyCmdChecker;
11use crate::stream::msg_streamid;
12use crate::util::err::ReactorError;
13use crate::{Error, HopNum, Result};
14
15#[cfg(any(feature = "hs-service", feature = "relay"))]
16use crate::stream::incoming::{
17 IncomingStreamRequestFilter, IncomingStreamRequestHandler, StreamReqSender,
18};
19
20// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
21use crate::client::circuit::padding::PaddingController;
22
23use tor_cell::chancell::msg::AnyChanMsg;
24use tor_cell::relaycell::msg::{Sendme, SendmeTag};
25use tor_cell::relaycell::{
26 AnyRelayMsgOuter, RelayCellDecoderResult, RelayCellFormat, RelayCmd, UnparsedRelayMsg,
27};
28use tor_error::internal;
29use tor_linkspec::HasRelayIds;
30use tor_rtcompat::Runtime;
31
32use derive_deftly::Deftly;
33use futures::SinkExt;
34use futures::channel::mpsc;
35use futures::{FutureExt as _, StreamExt, select_biased};
36use tracing::debug;
37
38use std::result::Result as StdResult;
39
40use crate::circuit::CircuitRxReceiver;
41
42/// The forward circuit reactor.
43///
44/// See the [`reactor`](crate::circuit::reactor) module-level docs.
45///
46/// Shuts downs down if an error occurs, or if either the [`Reactor`](super::Reactor)
47/// or the [`BackwardReactor`](super::BackwardReactor) shuts down:
48///
49/// * if the `Reactor` shuts down, we are alerted via the ctrl/command mpsc channels
50/// (their sending ends will close, which causes run_once() to return ReactorError::Shutdown)
51/// * if `BackwardReactor` shuts down, the `Reactor` will notice and will itself shut down,
52/// which, in turn, causes the `ForwardReactor` to shut down as described above
53#[derive(Deftly)]
54#[derive_deftly(CircuitReactor)]
55#[deftly(reactor_name = "forward reactor")]
56#[deftly(run_inner_fn = "Self::run_once")]
57#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
58pub(super) struct ForwardReactor<R: Runtime, F: ForwardHandler> {
59 /// A handle to the runtime.
60 runtime: R,
61 /// An identifier for logging about this reactor's circuit.
62 unique_id: UniqId,
63 /// Implementation-dependent part of the reactor.
64 ///
65 /// This enables us to customize the behavior of the reactor,
66 /// depending on whether we are a client or a relay.
67 inner: F,
68 /// Channel for receiving control commands.
69 command_rx: mpsc::UnboundedReceiver<CtrlCmd<F::CtrlCmd>>,
70 /// Channel for receiving control messages.
71 control_rx: mpsc::UnboundedReceiver<CtrlMsg<F::CtrlMsg>>,
72 /// The reading end of the inbound Tor channel.
73 ///
74 /// Yields cells moving from the client towards the exit, if we are a relay,
75 /// or cells moving towards *us*, if we are a client.
76 inbound_chan_rx: CircuitRxReceiver,
77 /// Sender for sending commands to the BackwardReactor.
78 ///
79 /// Used for sending:
80 ///
81 /// * circuit-level SENDMEs received from the other endpoint
82 /// (`[BackwardReactorCmd::HandleSendme]`)
83 /// * circuit-level SENDMEs that need to be delivered to the other endpoint
84 /// (using `[BackwardReactorCmd::SendRelayMsg]`)
85 ///
86 /// The receiver is in [`BackwardReactor`](super::BackwardReactor), which is responsible for
87 /// sending cell over the inbound channel.
88 backward_reactor_tx: mpsc::Sender<BackwardReactorCmd>,
89 /// Hop manager, storing per-hop state, and handles to the stream reactors.
90 ///
91 /// Contains the `CircHopList`.
92 hop_mgr: HopMgr<R>,
93 /// An implementation-specific event stream.
94 ///
95 /// Polled from the main loop of the reactor.
96 /// Each event is passed to [`ForwardHandler::handle_event`].
97 circ_events: mpsc::Receiver<F::CircEvent>,
98 /// A padding controller to which padding-related events should be reported.
99 padding_ctrl: PaddingController,
100}
101
102/// A control command aimed at the generic forward reactor.
103pub(crate) enum CtrlCmd<C> {
104 /// Begin accepting streams on this circuit.
105 //
106 // TODO(DEDUP): this is very similar to its client-side counterpart,
107 // except the hop is a Option<HopNum> instead of a TargetHop.
108 #[cfg(any(feature = "hs-service", feature = "relay"))]
109 AwaitStreamRequests {
110 /// A channel for sending information about an incoming stream request.
111 incoming_sender: StreamReqSender,
112 /// A `CmdChecker` to keep track of which message types are acceptable.
113 cmd_checker: AnyCmdChecker,
114 /// Oneshot channel to notify on completion.
115 done: ReactorResultChannel<()>,
116 /// The hop that is allowed to create streams.
117 ///
118 /// Set to None if we are a relay wanting to accept stream requests.
119 hop: Option<HopNum>,
120 /// A filter used to check requests before passing them on.
121 filter: Box<dyn IncomingStreamRequestFilter>,
122 },
123 /// An implementation-dependent control command.
124 #[allow(unused)] // TODO(relay)
125 Custom(C),
126}
127
128/// A control message aimed at the generic forward reactor.
129pub(crate) enum CtrlMsg<M> {
130 /// An implementation-dependent control message.
131 #[allow(unused)] // TODO(relay)
132 Custom(M),
133}
134
135/// Trait for customizing the behavior of the forward reactor.
136///
137/// Used for plugging in the implementation-dependent (client vs relay)
138/// parts of the implementation into the generic one.
139pub(crate) trait ForwardHandler: ControlHandler {
140 /// Type that explains how to build an outgoing channel.
141 type BuildSpec: HasRelayIds;
142
143 /// The subclass of ChanMsg that can arrive on this type of circuit.
144 type CircChanMsg: TryFrom<AnyChanMsg, Error = crate::Error>;
145
146 /// An opaque event type.
147 ///
148 /// The [`ForwardReactor`] polls an MPSC stream yielding `CircEvent`s from the main loop.
149 /// Each event is passed to [`Self::handle_event`] for handling.
150 type CircEvent;
151
152 /// Handle a non-SENDME RELAY message on this circuit with stream ID 0.
153 async fn handle_meta_msg<R: Runtime>(
154 &mut self,
155 runtime: &R,
156 early: bool,
157 hopnum: Option<HopNum>,
158 msg: UnparsedRelayMsg,
159 relay_cell_format: RelayCellFormat,
160 ) -> StdResult<(), ReactorError>;
161
162 /// Handle a forward (TODO terminology) cell.
163 ///
164 /// The cell is
165 /// - moving from the client towards the exit, if we're a relay
166 /// - moving from the guard towards us, if we're a client
167 ///
168 /// Returns an error if the cell should cause the reactor to shut down,
169 /// or a [`ForwardCellDisposition`] specifying how it should be handled.
170 ///
171 /// Returns `None` if the cell was handled internally by this handler.
172 async fn handle_forward_cell<R: Runtime>(
173 &mut self,
174 hop_mgr: &mut HopMgr<R>,
175 cell: Self::CircChanMsg,
176 ) -> StdResult<Option<ForwardCellDisposition>, ReactorError>;
177
178 /// Handle an implementation-specific circuit event.
179 ///
180 /// Returns a command for the backward reactor.
181 fn handle_event(
182 &mut self,
183 event: Self::CircEvent,
184 ) -> StdResult<Option<BackwardReactorCmd>, ReactorError>;
185
186 /// Wait until the outbound channel, if there is one, is ready to accept more cells.
187 ///
188 /// Resolves immediately if there is no outbound channel.
189 /// Blocks if there is a pending outbound channel.
190 async fn outbound_chan_ready(&mut self) -> Result<()>;
191}
192
193/// What action to take in response to a cell arriving on our inbound Tor channel.
194pub(crate) enum ForwardCellDisposition {
195 /// Handle a decoded RELAY or RELAY_EARLY cell in the [`ForwardReactor`].
196 HandleRecognizedRelay {
197 /// The decoded cell.
198 cell: RelayCellDecoderResult,
199 /// Whether this was a RELAY_EARLY.
200 early: bool,
201 /// The hop this cell was for.
202 hopnum: Option<HopNum>,
203 /// The SENDME tag.
204 tag: SendmeTag,
205 },
206}
207
208impl<R: Runtime, F: ForwardHandler> ForwardReactor<R, F> {
209 /// Create a new [`ForwardReactor`].
210 #[allow(clippy::too_many_arguments)] // TODO
211 pub(super) fn new(
212 runtime: R,
213 unique_id: UniqId,
214 inner: F,
215 hop_mgr: HopMgr<R>,
216 inbound_chan_rx: CircuitRxReceiver,
217 control_rx: mpsc::UnboundedReceiver<CtrlMsg<F::CtrlMsg>>,
218 command_rx: mpsc::UnboundedReceiver<CtrlCmd<F::CtrlCmd>>,
219 backward_reactor_tx: mpsc::Sender<BackwardReactorCmd>,
220 circ_events: mpsc::Receiver<F::CircEvent>,
221 padding_ctrl: PaddingController,
222 ) -> Self {
223 Self {
224 runtime,
225 unique_id,
226 inbound_chan_rx,
227 control_rx,
228 command_rx,
229 inner,
230 backward_reactor_tx,
231 hop_mgr,
232 circ_events,
233 padding_ctrl,
234 }
235 }
236
237 /// Helper for [`run`](Self::run).
238 async fn run_once(&mut self) -> StdResult<(), ReactorError> {
239 let outbound_chan_ready = self.inner.outbound_chan_ready();
240
241 let inbound_chan_rx_fut = async {
242 // Avoid reading from the inbound_chan_rx Tor Channel if the outgoing sink is blocked
243 outbound_chan_ready.await?;
244 Ok(self.inbound_chan_rx.next().await)
245 };
246
247 select_biased! {
248 res = self.command_rx.next().fuse() => {
249 let cmd = res.ok_or_else(|| ReactorError::Shutdown)?;
250 self.handle_cmd(cmd)
251 }
252 res = self.control_rx.next().fuse() => {
253 let msg = res.ok_or_else(|| ReactorError::Shutdown)?;
254 self.handle_msg(msg)
255 }
256 res = self.circ_events.next().fuse() => {
257 let ev = res.ok_or_else(|| ReactorError::Shutdown)?;
258 if let Some(cmd) = self.inner.handle_event(ev)? {
259 self.send_reactor_cmd(cmd).await?;
260 }
261
262 Ok(())
263 }
264 res = inbound_chan_rx_fut.fuse() => {
265 let cell = res.map_err(ReactorError::Err)?;
266 let Some(cell) = cell else {
267 debug!(
268 circ_id = %self.unique_id,
269 "Backward channel has closed, shutting down forward relay reactor",
270 );
271
272 return Err(ReactorError::Shutdown);
273 };
274
275 let cell: F::CircChanMsg = cell.try_into()?;
276 let Some(disp) = self.inner.handle_forward_cell(&mut self.hop_mgr, cell).await? else {
277 return Ok(());
278 };
279
280 match disp {
281 ForwardCellDisposition::HandleRecognizedRelay { cell, early, hopnum, tag } => {
282 self.handle_relay_cell(cell, early, hopnum, tag).await
283 }
284 }
285 },
286 }
287 }
288
289 /// Handle a control command.
290 fn handle_cmd(&mut self, cmd: CtrlCmd<F::CtrlCmd>) -> StdResult<(), ReactorError> {
291 match cmd {
292 #[cfg(any(feature = "hs-service", feature = "relay"))]
293 CtrlCmd::AwaitStreamRequests {
294 incoming_sender,
295 cmd_checker,
296 done,
297 hop,
298 filter,
299 } => {
300 let handler = IncomingStreamRequestHandler {
301 incoming_sender,
302 cmd_checker,
303 hop_num: hop,
304 filter,
305 };
306
307 // Update the HopMgr with the
308 let ret = self.hop_mgr.set_incoming_handler(handler);
309 let _ = done.send(ret); // don't care if the corresponding receiver goes away.
310 Ok(())
311 }
312 CtrlCmd::Custom(c) => self.inner.handle_cmd(c),
313 }
314 }
315
316 /// Handle a control message.
317 fn handle_msg(&mut self, msg: CtrlMsg<F::CtrlMsg>) -> StdResult<(), ReactorError> {
318 match msg {
319 CtrlMsg::Custom(c) => self.inner.handle_msg(c),
320 }
321 }
322
323 /// Note that we have received a RELAY cell.
324 ///
325 /// Updates the padding and CC state.
326 fn note_relay_cell_received(
327 &self,
328 hopnum: Option<HopNum>,
329 c_t_w: bool,
330 ) -> Result<(RelayCellFormat, bool)> {
331 let mut hops = self.hop_mgr.hops().write().expect("poisoned lock");
332 let hop = hops
333 .get_mut(hopnum)
334 .ok_or_else(|| internal!("msg from non-existent hop???"))?;
335
336 // Check whether we are allowed to receive more data for this circuit hop.
337 hop.inbound.decrement_cell_limit()?;
338
339 // Decrement the circuit sendme windows, and see if we need to
340 // send a sendme cell.
341 let send_circ_sendme = if c_t_w {
342 hop.ccontrol
343 .lock()
344 .expect("poisoned lock")
345 .note_data_received()?
346 } else {
347 false
348 };
349
350 let relay_cell_format = hop.settings.relay_crypt_protocol().relay_cell_format();
351
352 Ok((relay_cell_format, send_circ_sendme))
353 }
354
355 /// Handle a RELAY cell.
356 ///
357 // TODO(DEDUP): very similar to Client::handle_relay_cell()
358 async fn handle_relay_cell(
359 &mut self,
360 decode_res: RelayCellDecoderResult,
361 early: bool,
362 hopnum: Option<HopNum>,
363 tag: SendmeTag,
364 ) -> StdResult<(), ReactorError> {
365 // For padding purposes, if we are a relay, we set the hopnum to 0
366 // TODO(relay): is this right?
367 let hopnum_padding = hopnum.unwrap_or_else(|| HopNum::from(0));
368 if decode_res.is_padding() {
369 self.padding_ctrl.decrypted_padding(hopnum_padding)?;
370 } else {
371 self.padding_ctrl.decrypted_data(hopnum_padding);
372 }
373
374 let c_t_w = decode_res.cmds().any(sendme::cmd_counts_towards_windows);
375 let (relay_cell_format, send_circ_sendme) = self.note_relay_cell_received(hopnum, c_t_w)?;
376
377 // If we do need to send a circuit-level SENDME cell, do so.
378 if send_circ_sendme {
379 // This always sends a V1 (tagged) sendme cell, and thereby assumes
380 // that SendmeEmitMinVersion is no more than 1. If the authorities
381 // every increase that parameter to a higher number, this will
382 // become incorrect. (Higher numbers are not currently defined.)
383 let sendme = Sendme::from(tag);
384 let msg = AnyRelayMsgOuter::new(None, sendme.into());
385 let forward = BackwardReactorCmd::SendRelayMsg { hop: hopnum, msg };
386
387 // NOTE: sending the SENDME to the backward reactor for handling
388 // might seem counterintuitive, given that we have access to
389 // the congestion control object right here (via hop_mgr).
390 //
391 // However, the forward reactor does not have access to the
392 // outbound_chan_tx part of the inbound (towards the client) Tor channel,
393 // and so it cannot handle the SENDME on its own
394 // (because it cannot obtain the congestion signals),
395 // so the SENDME needs to be handled in the backward reactor.
396 //
397 // NOTE: this will block if the backward reactor is not ready
398 // to send any more cells.
399 self.send_reactor_cmd(forward).await?;
400 }
401
402 let (mut msgs, incomplete) = decode_res.into_parts();
403 while let Some(msg) = msgs.next() {
404 match self
405 .handle_relay_msg(early, hopnum, msg, relay_cell_format, c_t_w)
406 .await
407 {
408 Ok(()) => continue,
409 Err(e) => {
410 for m in msgs {
411 debug!(
412 circ_id = %self.unique_id,
413 "Ignoring relay msg received after triggering shutdown: {m:?}",
414 );
415 }
416 if let Some(incomplete) = incomplete {
417 debug!(
418 circ_id = %self.unique_id,
419 "Ignoring partial relay msg received after triggering shutdown: {:?}",
420 incomplete,
421 );
422 }
423
424 return Err(e);
425 }
426 }
427 }
428
429 Ok(())
430 }
431
432 /// Handle a single incoming RELAY message.
433 async fn handle_relay_msg(
434 &mut self,
435 early: bool,
436 hop: Option<HopNum>,
437 msg: UnparsedRelayMsg,
438 relay_cell_format: RelayCellFormat,
439 cell_counts_toward_windows: bool,
440 ) -> StdResult<(), ReactorError> {
441 // If this msg wants/refuses to have a Stream ID, does it
442 // have/not have one?
443 let streamid = msg_streamid(&msg)?;
444
445 // If this doesn't have a StreamId, it's a meta cell,
446 // not meant for a particular stream.
447 let Some(sid) = streamid else {
448 return self
449 .handle_meta_msg(early, hop, msg, relay_cell_format)
450 .await;
451 };
452
453 let msg = StreamMsg {
454 sid,
455 msg,
456 cell_counts_toward_windows,
457 };
458
459 // All messages on streams are handled in the stream reactor
460 // (because that's where the stream map is)
461 //
462 // Internally, this will spawn a StreamReactor for the target hop,
463 // if not already spawned.
464 self.hop_mgr.send(hop, msg).await
465 }
466
467 /// Handle a RELAY or RELAY_EARLY message on this circuit with stream ID 0.
468 async fn handle_meta_msg(
469 &mut self,
470 early: bool,
471 hopnum: Option<HopNum>,
472 msg: UnparsedRelayMsg,
473 relay_cell_format: RelayCellFormat,
474 ) -> StdResult<(), ReactorError> {
475 match msg.cmd() {
476 RelayCmd::SENDME => {
477 let sendme = msg
478 .decode::<Sendme>()
479 .map_err(|e| Error::from_bytes_err(e, "sendme message"))?
480 .into_msg();
481
482 let cmd = BackwardReactorCmd::HandleSendme {
483 hop: hopnum,
484 sendme,
485 };
486
487 self.send_reactor_cmd(cmd).await
488 }
489 _ => {
490 self.inner
491 .handle_meta_msg(&self.runtime, early, hopnum, msg, relay_cell_format)
492 .await
493 }
494 }
495 }
496
497 /// Send a command to the backward reactor.
498 ///
499 /// Blocks if the `backward_reactor_tx` channel is full, i.e. if the backward reactor
500 /// is not ready to send any more cells.
501 ///
502 /// Returns an error if the backward reactor has shut down.
503 async fn send_reactor_cmd(
504 &mut self,
505 forward: BackwardReactorCmd,
506 ) -> StdResult<(), ReactorError> {
507 self.backward_reactor_tx.send(forward).await.map_err(|_| {
508 // The other reactor has shut down
509 ReactorError::Shutdown
510 })
511 }
512}