tor_proto/circuit/reactor/backward.rs
1//! A circuit's view of the backward state of the circuit.
2
3use crate::channel::Channel;
4use crate::circuit::UniqId;
5use crate::circuit::cell_sender::CircuitCellSender;
6use crate::circuit::reactor::ControlHandler;
7use crate::circuit::reactor::circhop::CircHopList;
8use crate::circuit::reactor::macros::derive_deftly_template_CircuitReactor;
9use crate::circuit::reactor::stream::ReadyStreamMsg;
10use crate::congestion::{CongestionControl, sendme};
11use crate::crypto::cell::RelayCellBody;
12use crate::util::err::ReactorError;
13use crate::util::poll_all::PollAll;
14use crate::{Error, HopNum, Result};
15
16// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
17use crate::client::circuit::padding::{
18 self, PaddingController, PaddingEvent, PaddingEventStream, QueuedCellPaddingInfo,
19};
20
21use tor_cell::chancell::msg::{AnyChanMsg, Relay};
22use tor_cell::chancell::{AnyChanCell, BoxedCellBody, ChanCmd, CircId};
23use tor_cell::relaycell::msg::{Sendme, SendmeTag};
24use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, RelayCmd};
25use tor_error::internal;
26use tor_rtcompat::{DynTimeProvider, Runtime};
27
28use derive_deftly::Deftly;
29use futures::SinkExt;
30use futures::channel::mpsc;
31use futures::{FutureExt as _, StreamExt, future, select_biased};
32use tracing::{debug, trace};
33
34use std::pin::Pin;
35use std::result::Result as StdResult;
36use std::sync::{Arc, Mutex, RwLock};
37
38use crate::circuit::CircuitRxReceiver;
39
40#[cfg(feature = "circ-padding")]
41use crate::circuit::padding::{CircPaddingDisposition, padding_disposition};
42
43#[cfg(feature = "relay")]
44use tor_cell::relaycell::msg::Extended2;
45
46/// The "backward" circuit reactor of a relay.
47///
48/// See the [`reactor`](crate::circuit::reactor) module-level docs.
49///
50/// Shuts downs down if an error occurs, or if the [`Reactor`](super::Reactor),
51/// [`ForwardReactor`](super::ForwardReactor), or if one of the
52/// [`StreamReactor`](super::stream::StreamReactor)s of this circuit shuts down:
53///
54/// * if the `Reactor` shuts down, we are alerted via the ctrl/command mpsc channels
55/// (their sending ends will close, which causes run_once() to return ReactorError::Shutdown)
56/// * if `ForwardReactor` shuts down, the `Reactor` will notice and will itself shut down,
57/// which, in turn, causes the `BackwardReactor` to shut down as described above
58/// * if one of the `StreamReactor`s shuts down, the `ForwardReactor` will
59/// notice when it next tries to deliver a stream message to it, and shut down,
60/// causing the `BackwardReactor` and top-level `Reactor` to follow suit
61#[derive(Deftly)]
62#[derive_deftly(CircuitReactor)]
63#[deftly(reactor_name = "backward reactor")]
64#[deftly(run_inner_fn = "Self::run_once")]
65#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
66pub(super) struct BackwardReactor<B: BackwardHandler> {
67 /// The time provider.
68 time_provider: DynTimeProvider,
69 /// An identifier for logging about this reactor's circuit.
70 unique_id: UniqId,
71 /// The circuit identifier on the backward Tor channel.
72 circ_id: CircId,
73 /// The inbound Tor channel.
74 channel: Arc<Channel>,
75 /// Implementation-dependent part of the reactor.
76 ///
77 /// This enables us to customize the behavior of the reactor,
78 /// depending on whether we are a client or a relay.
79 inner: B,
80 /// The reading end of the outbound Tor channel, if we are not the last hop.
81 ///
82 /// Yields cells moving from the exit towards the client, if we are a middle relay.
83 outbound_chan_rx: Option<CircuitRxReceiver>,
84 /// The per-hop state, shared with the forward reactor.
85 ///
86 /// The backward reactor acquires a read lock to this whenever it needs to
87 ///
88 /// * send a circuit-level SENDME
89 /// * handle a circuit-level SENDME
90 /// * send a padding cell
91 ///
92 // Note: For the sending/handling of SENDMEs, we lock the hop list
93 // to extract the relay cell format and CC state of the hop.
94 // Technically, for the SENDME cases, we could've avoided locking
95 // the hop list from the BWD, by having the FWD share the relay cell format
96 // and CC state in the BackwardReactorCmd::{Send,Handle}Sendme command.
97 // But for the padding case, we *need* the hop list, because we need
98 // to work out what relay cell format to use when sending the padding cell.
99 // But for the sake of simplicity, I made the BWD consult the CircHopList in all cases.
100 //
101 // TODO: the backward reactor only ever reads from this.
102 // Conceptually, it is the forward reactor's HopMgr that owns this list:
103 // only HopMgr can add hops to the list.
104 //
105 // Perhaps we need a specialized abstraction that only allows reading here.
106 // This could be a wrapper over RwLock, providing a read-only API.
107 hops: Arc<RwLock<CircHopList>>,
108 /// The sending end of the backward Tor channel.
109 ///
110 /// Delivers cells towards the other endpoint: towards the client, if we are a relay,
111 /// or towards the exit, if we are a client.
112 inbound_chan_tx: CircuitCellSender,
113 /// Channel for receiving control commands.
114 command_rx: mpsc::UnboundedReceiver<CtrlCmd<B::CtrlCmd>>,
115 /// Channel for receiving control messages.
116 control_rx: mpsc::UnboundedReceiver<CtrlMsg<B::CtrlMsg>>,
117 /// Receiver for [`BackwardReactorCmd`]s coming from the forward reactor.
118 ///
119 /// The sender is in [`ForwardReactor`](super::ForwardReactor), which will forward all cells
120 /// carrying Tor stream data to us.
121 ///
122 /// This serves a dual purpose:
123 ///
124 /// * it enables the `ForwardReactor` to deliver Tor stream data received
125 /// from the other endpoint
126 /// * it lets the `BackwardReactor` know if the `ForwardReactor` has shut down:
127 /// we select! on this MPSC channel in the main loop, so if the `ForwardReactor`
128 /// shuts down, we will get EOS upon calling `.next()`)
129 forward_reactor_rx: mpsc::Receiver<BackwardReactorCmd>,
130 /// A channel for receiving endpoint-bound stream messages from the StreamReactor(s)
131 /// (the stream messages are client-bound if we are a relay, or exit-bound if we are a client).
132 stream_rx: mpsc::Receiver<ReadyStreamMsg>,
133 /// A padding controller to which padding-related events should be reported.
134 padding_ctrl: PaddingController,
135 /// An event stream telling us about padding-related events.
136 padding_event_stream: PaddingEventStream,
137 /// Current rules for blocking traffic, according to the padding controller.
138 #[cfg(feature = "circ-padding")]
139 padding_block: Option<padding::StartBlocking>,
140}
141
142/// A control message aimed at the generic forward reactor.
143pub(crate) enum CtrlMsg<M> {
144 /// An implementation-dependent control message.
145 #[allow(unused)] // TODO(relay)
146 Custom(M),
147}
148
149/// A control command aimed at the generic forward reactor.
150pub(crate) enum CtrlCmd<C> {
151 /// An implementation-dependent control command.
152 #[allow(unused)] // TODO(relay)
153 Custom(C),
154}
155
156/// Trait for customizing the behavior of the backward reactor.
157///
158/// Used for plugging in the implementation-dependent (client vs relay)
159/// parts of the implementation into the generic one.
160pub(crate) trait BackwardHandler: ControlHandler {
161 /// The subclass of ChanMsg that can arrive on this type of circuit.
162 type CircChanMsg: TryFrom<AnyChanMsg, Error = crate::Error> + Send;
163
164 /// Encrypt a RelayCellBody that is moving in the backward direction.
165 fn encrypt_relay_cell(
166 &mut self,
167 cmd: ChanCmd,
168 body: &mut RelayCellBody,
169 hop: Option<HopNum>,
170 ) -> SendmeTag;
171
172 /// Handle a cell that was read from the Tor outbound channel.
173 ///
174 /// Returns an error if the cell should cause the reactor to shut down,
175 /// or a [`BackwardCellDisposition`] specifying how it should be handled.
176 fn handle_backward_cell(
177 &mut self,
178 circ_id: UniqId,
179 cell: Self::CircChanMsg,
180 ) -> StdResult<BackwardCellDisposition, ReactorError>;
181}
182
183/// What action to take in response to a cell arriving on our outbound Tor channel.
184pub(crate) enum BackwardCellDisposition {
185 /// Forward the cell, writing it to the inbound Tor channel.
186 Forward(AnyChanMsg),
187}
188
189#[allow(unused)] // TODO(relay)
190impl<B: BackwardHandler> BackwardReactor<B> {
191 /// Create a new [`BackwardReactor`].
192 #[allow(clippy::too_many_arguments)] // TODO
193 pub(super) fn new<R: Runtime>(
194 runtime: R,
195 channel: &Arc<Channel>,
196 circ_id: CircId,
197 unique_id: UniqId,
198 inner: B,
199 hops: Arc<RwLock<CircHopList>>,
200 forward_reactor_rx: mpsc::Receiver<BackwardReactorCmd>,
201 control_rx: mpsc::UnboundedReceiver<CtrlMsg<B::CtrlMsg>>,
202 command_rx: mpsc::UnboundedReceiver<CtrlCmd<B::CtrlCmd>>,
203 padding_ctrl: PaddingController,
204 padding_event_stream: PaddingEventStream,
205 stream_rx: mpsc::Receiver<ReadyStreamMsg>,
206 ) -> Self {
207 let channel = Arc::clone(channel);
208 let inbound_chan_tx = CircuitCellSender::from_channel_sender(channel.sender());
209
210 Self {
211 time_provider: DynTimeProvider::new(runtime),
212 outbound_chan_rx: None,
213 channel,
214 inner,
215 hops,
216 inbound_chan_tx,
217 unique_id,
218 circ_id,
219 forward_reactor_rx,
220 control_rx,
221 command_rx,
222 stream_rx,
223 padding_ctrl,
224 padding_event_stream,
225 #[cfg(feature = "circ-padding")]
226 padding_block: None,
227 }
228 }
229
230 /// Helper for [`run`](Self::run).
231 ///
232 /// Handles cells arriving on the outbound Tor channel,
233 /// and writes cells to the inbound Tor channel.
234 ///
235 /// Because the Tor application streams, the `forward_reactor_rx` MPSC streams,
236 /// and the outbound Tor channel MPSC stream are driven concurrently using [`PollAll`],
237 /// this function can send up to 3 cells per call over the inbound Tor channel:
238 ///
239 /// * a cell carrying Tor stream data
240 /// * a cell received from the outbound Tor channel, if we are a relay
241 /// (moving from the exit towards the client)
242 /// * a circuit-level SENDME
243 ///
244 /// However, in practice, leaky pipe is not really used,
245 /// and so relays that have application streams (i.e. the exits),
246 /// are not going to have an outbound Tor channel,
247 /// and so this will only really drive Tor stream data,
248 /// delivering at most 2 cells per call.
249 async fn run_once(&mut self) -> StdResult<(), ReactorError> {
250 use postage::prelude::{Sink as _, Stream as _};
251
252 /// The maximum number of events we expect to handle per reactor loop.
253 ///
254 /// This is bounded by the number of futures we push into the PollAll.
255 const PER_LOOP_EVENT_COUNT: usize = 3;
256
257 // A collection of futures we plan to drive concurrently.
258 let mut poll_all =
259 PollAll::<PER_LOOP_EVENT_COUNT, Option<CircuitEvent<B::CircChanMsg>>>::new();
260
261 // Flush the backward Tor channel sink, and check it for readiness
262 //
263 // TODO(flushing): here and everywhere else we need to flush:
264 //
265 // Currently, we try to flush every time we want to write to the sink,
266 // but may be suboptimal.
267 //
268 // However, we don't actually *wait* for the flush to complete
269 // (we just make a bit of progress by calling poll_flush),
270 // so it's possible that this is actually tolerable.
271 // We should run some tests, and if this turns out to be a performance bottleneck,
272 // we'll have to rethink our flushing approach.
273 let backward_chan_ready = future::poll_fn(|cx| {
274 // The flush outcome doesn't matter,
275 // so we simply move on to the readiness check.
276 // The reason we don't wait on the flush is because we don't
277 // want to flush on *every* reactor loop, but we do want to make
278 // a bit of progress each time.
279 //
280 // (TODO: do we want to handle errors here?)
281 let _ = self.inbound_chan_tx.poll_flush_unpin(cx);
282
283 self.inbound_chan_tx.poll_ready_unpin(cx)
284 });
285
286 // Concurrently, drive :
287 // 1. a future that reads from the StreamReactor, to see if there are
288 // any application streams that have a message to send
289 // (this resolves to a message that needs to be delivered to the peer)
290 poll_all.push(async {
291 // Internally, each stream reactor checks if we're allowed to send anything
292 // that counts towards SENDME windows (and ceases to send us stream data if not)
293 //
294 // The reason we don't check that here is because stream_rx multiplexes stream data
295 // from all hops, and we have no way of knowing which hop will want to send us stream
296 // data next, and therefore we can't know which hop's CC object to use
297 self.stream_rx.next().await.map(CircuitEvent::Send)
298 });
299
300 // 2. the stream of commands coming from the ForwardReactor
301 // (this resolves to a BackwardReactorCmd)
302 poll_all.push(async {
303 let event = match self.forward_reactor_rx.next().await {
304 Some(cmd) => CircuitEvent::Forwarded(cmd),
305 None => {
306 // The forward reactor has crashed, so we have to shut down.
307 CircuitEvent::ForwardShutdown
308 }
309 };
310
311 Some(event)
312 });
313
314 // 3. Messages moving from the outbound channel towards the inbound Tor channel,
315 // if we have an outbound Tor channel.
316 //
317 // NOTE: in practice, clients and exits won't have an outbound Tor channel,
318 // so for them this will be a no-op.
319 poll_all.push(async {
320 let event = if let Some(outbound_chan_rx) = self.outbound_chan_rx.as_mut() {
321 // Forward channel unexpectedly closed, we should close too
322 match outbound_chan_rx.next().await {
323 Some(msg) => match msg.try_into() {
324 Err(e) => CircuitEvent::ProtoViolation(e),
325 Ok(cell) => CircuitEvent::Cell(cell),
326 },
327 None => {
328 // The forward reactor has crashed, so we have to shut down.
329 CircuitEvent::ForwardShutdown
330 }
331 }
332 } else {
333 future::pending().await
334 };
335
336 Some(event)
337 });
338
339 let poll_all = async move {
340 // Avoid polling **any** of the futures if the outgoing sink is blocked.
341 //
342 // This implements backpressure: we avoid reading from our input sources
343 // if we know we're unable to write to the inbound Tor channel sink.
344 //
345 // More specifically, if our inbound Tor channel sink is full and can no longer
346 // accept cells, we stop reading:
347 //
348 // 1. From the application streams (received from StreamReactor), if there are any.
349 //
350 // 2. From the forward_reactor_rx channel, used by the forward reactor to send us
351 //
352 // - a circuit-level SENDME that we have received, or
353 // - a circuit-level SENDME that we need to deliver to the client
354 //
355 // Not reading from the forward_reactor_rx channel, in turn, causes the forward reactor
356 // to block and therefore stop reading from **its** input sources,
357 // propagating backpressure all the way to the other endpoint of the circuit.
358 //
359 // 3. From the outbound Tor channel, if there is one.
360 //
361 // This will delay any SENDMEs the client or exit might have sent along
362 // the way, and therefore count as a congestion signal.
363 //
364 // TODO: memquota setup to make sure this doesn't turn into a memory DOS vector
365 let _ = backward_chan_ready.await;
366
367 // TODO: it's important to not block reading from the forward_reactor_rx channel on the chan
368 // sender readiness (for instance, we should not block the sending of SENDMEs
369 // if the channel is blocked on a padding-induced block).
370 //
371 // This means we will need to move the forward_reactor_rx handling out of the PollAll
372 // to the select_biased! below.
373 poll_all.await
374 };
375
376 let events = select_biased! {
377 res = self.command_rx.next().fuse() => {
378 let cmd = res.ok_or_else(|| ReactorError::Shutdown)?;
379 self.handle_cmd(cmd)?;
380 return Ok(());
381 }
382 res = self.control_rx.next().fuse() => {
383 let msg = res.ok_or_else(|| ReactorError::Shutdown)?;
384 self.handle_msg(msg)?;
385 return Ok(());
386 }
387 res = self.padding_event_stream.next().fuse() => {
388 // If there's a padding event, we need to handle it immediately,
389 // because it might tell us to start blocking the inbound_chan_tx sink,
390 // which, in turn, means we need to stop trying to read from
391 // the application streams.
392 let event = res.ok_or_else(|| ReactorError::Shutdown)?;
393
394 cfg_if::cfg_if! {
395 if #[cfg(feature = "circ-padding")] {
396 self.run_padding_event(event).await?;
397 } else {
398 // If padding isn't enabled, we never generate a padding event,
399 // so we can be sure this case will never be called.
400 void::unreachable(event.0);
401 }
402 }
403 return Ok(())
404 }
405 res = poll_all.fuse() => res,
406 };
407
408 // Note: there shouldn't be more than N < PER_LOOP_EVENT_COUNT events to handle
409 // per reactor loop. We need to be careful here, because we must avoid blocking
410 // the reactor.
411 //
412 // If handling more than one event per loop turns out to be a problem, we may
413 // need to dispatch this to a background task instead.
414 //
415 // TODO(relay): this loop is actually a problem.
416 // As mentioned in the run_once() docs, this will attempt to send up
417 // to 3 cells on the inbound tor Channel (or 2 cells, assuming no leaky pipe).
418 //
419 // The problem is that the readiness check above (see backward_chan_ready)
420 // only checks that the queue has enough room for 1 cell, not *2 cells*.
421 // Trying to send more than 2 cell when there is only room for one
422 // will cause the reactor to block (and because there is nothing
423 // driving the flushing of this channel, this will be a hard block).
424 //
425 // We need to rethink the strategy here (e.g. by flushing in parallel
426 // with handle_event())
427 for event in events.into_iter().flatten() {
428 self.handle_event(event).await?;
429 }
430
431 Ok(())
432 }
433
434 /// Handle a control command.
435 fn handle_cmd(&mut self, cmd: CtrlCmd<B::CtrlCmd>) -> StdResult<(), ReactorError> {
436 match cmd {
437 CtrlCmd::Custom(c) => self.inner.handle_cmd(c),
438 }
439 }
440
441 /// Handle a control message.
442 fn handle_msg(&mut self, msg: CtrlMsg<B::CtrlMsg>) -> StdResult<(), ReactorError> {
443 match msg {
444 CtrlMsg::Custom(c) => self.inner.handle_msg(c),
445 }
446 }
447
448 /// Perform some circuit-padding-based event on the specified circuit.
449 //
450 // TODO(DEDUP): this is almost identical to the client-side Conflux::run_padding_event()
451 #[cfg(feature = "circ-padding")]
452 async fn run_padding_event(
453 &mut self,
454 padding_event: PaddingEvent,
455 ) -> StdResult<(), ReactorError> {
456 use PaddingEvent as E;
457
458 match padding_event {
459 E::SendPadding(send_padding) => {
460 self.send_padding(send_padding).await?;
461 }
462 E::StartBlocking(start_blocking) => {
463 self.start_blocking_for_padding(start_blocking);
464 }
465 E::StopBlocking => {
466 self.stop_blocking_for_padding();
467 }
468 }
469 Ok(())
470 }
471
472 /// Handle a request from our padding subsystem to send a padding packet.
473 //
474 // TODO(DEDUP): this is almost identical to the client-side Client::send_padding()
475 #[cfg(feature = "circ-padding")]
476 async fn send_padding(&mut self, send_padding: padding::SendPadding) -> Result<()> {
477 use CircPaddingDisposition::*;
478
479 let target_hop = send_padding.hop;
480
481 match padding_disposition(
482 &send_padding,
483 &self.inbound_chan_tx,
484 self.padding_block.as_ref(),
485 ) {
486 QueuePaddingNormally => {
487 let queue_info = self.padding_ctrl.queued_padding(target_hop, send_padding);
488 self.queue_padding_cell_for_hop(target_hop, queue_info)
489 .await?;
490 }
491 QueuePaddingAndBypass => {
492 let queue_info = self.padding_ctrl.queued_padding(target_hop, send_padding);
493 self.queue_padding_cell_for_hop(target_hop, queue_info)
494 .await?;
495 }
496 TreatQueuedCellAsPadding => {
497 self.padding_ctrl
498 .replaceable_padding_already_queued(target_hop, send_padding);
499 }
500 }
501 Ok(())
502 }
503
504 /// Enable padding-based blocking,
505 /// or change the rule for padding-based blocking to the one in `block`.
506 //
507 // TODO(DEDUP): copy of Client::start_blocking_for_padding()
508 #[cfg(feature = "circ-padding")]
509 pub(super) fn start_blocking_for_padding(&mut self, block: padding::StartBlocking) {
510 self.inbound_chan_tx.start_blocking();
511 self.padding_block = Some(block);
512 }
513
514 /// Disable padding-based blocking.
515 ///
516 // TODO(DEDUP): copy of Client::stop_blocking_for_padding()
517 #[cfg(feature = "circ-padding")]
518 pub(super) fn stop_blocking_for_padding(&mut self) {
519 self.inbound_chan_tx.stop_blocking();
520 self.padding_block = None;
521 }
522
523 /// Generate and encrypt a padding cell, and send it to a targeted hop.
524 ///
525 /// Ignores any padding-based blocking.
526 ///
527 // TODO(DEDUP): copy of Client::queue_padding_cell_for_hop()
528 #[cfg(feature = "circ-padding")]
529 async fn queue_padding_cell_for_hop(
530 &mut self,
531 target_hop: HopNum,
532 queue_info: Option<QueuedCellPaddingInfo>,
533 ) -> Result<()> {
534 use tor_cell::relaycell::msg::Drop as DropMsg;
535
536 let msg = AnyRelayMsgOuter::new(None, DropMsg::default().into());
537 let hopnum = Some(target_hop);
538
539 // TODO: the ccontrol state isn't actually needed here, because
540 // DROP cells don't count towards SENDME windows.
541 // Technically, we could avoid unnecessarily Arc::clone()ing the CC state
542 // here, and just extract the relay cell format.
543 // But for that we would need a specialized send_relay_cell_inner()-like function
544 // that doesn't take a CC object, or to make the CC object optional in
545 // send_relay_cell_inner().
546 let (relay_cell_format, ccontrol) = self.hop_info(hopnum)?;
547
548 self.send_relay_cell_inner(hopnum, relay_cell_format, msg, false, &ccontrol, queue_info)
549 .await
550 }
551
552 /// Determine how exactly to handle a request to handle padding.
553 #[cfg(feature = "circ-padding")]
554 fn padding_disposition(&self, send_padding: &padding::SendPadding) -> CircPaddingDisposition {
555 crate::circuit::padding::padding_disposition(
556 send_padding,
557 &self.inbound_chan_tx,
558 self.padding_block.as_ref(),
559 )
560 }
561
562 /// Handle a circuit event.
563 async fn handle_event(
564 &mut self,
565 event: CircuitEvent<B::CircChanMsg>,
566 ) -> StdResult<(), ReactorError> {
567 use CircuitEvent::*;
568
569 match event {
570 Cell(cell) => self.handle_backward_cell(cell).await,
571 Send(msg) => {
572 let ReadyStreamMsg {
573 hop,
574 relay_cell_format,
575 msg,
576 ccontrol,
577 } = msg;
578
579 self.send_relay_cell(hop, relay_cell_format, msg, false, &ccontrol)
580 .await?;
581
582 Ok(())
583 }
584 Forwarded(cmd) => self.handle_reactor_cmd(cmd).await,
585 ForwardShutdown => {
586 // The forward reactor has crashed, so we have to shut down.
587 trace!(
588 circ_id = %self.unique_id,
589 "Backward relay reactor shutdown (forward reactor has closed)",
590 );
591
592 Err(ReactorError::Shutdown)
593 }
594 ProtoViolation(err) => Err(err.into()),
595 }
596 }
597
598 /// Return the RelayCellFormat and CC state of a given hop.
599 fn hop_info(
600 &self,
601 hopnum: Option<HopNum>,
602 ) -> Result<(RelayCellFormat, Arc<Mutex<CongestionControl>>)> {
603 let hops = self.hops.read().expect("poisoned lock");
604 let hop = hops
605 .get(hopnum)
606 .ok_or_else(|| internal!("tried to send padding to non-existent hop?!"))?;
607 let relay_cell_format = hop.settings.relay_crypt_protocol().relay_cell_format();
608 let ccontrol = Arc::clone(&hop.ccontrol);
609
610 Ok((relay_cell_format, ccontrol))
611 }
612
613 /// Handle a command sent to us by the forward reactor.
614 async fn handle_reactor_cmd(&mut self, msg: BackwardReactorCmd) -> StdResult<(), ReactorError> {
615 use BackwardReactorCmd::*;
616
617 match msg {
618 SendRelayMsg { hop, msg } => {
619 self.send_relay_msg(hop, msg).await?;
620 }
621 HandleSendme { hop, sendme } => {
622 self.handle_sendme(hop, sendme).await?;
623 return Ok(());
624 }
625 #[cfg(feature = "relay")]
626 HandleCircuitExtended {
627 hop,
628 extended2,
629 outbound_chan_rx,
630 } => {
631 self.outbound_chan_rx = Some(outbound_chan_rx);
632 let msg = AnyRelayMsgOuter::new(None, extended2.into());
633 self.send_relay_msg(hop, msg).await?;
634
635 debug!(
636 circ_id = %self.unique_id,
637 "Extended circuit to the next hop"
638 );
639 }
640 }
641
642 Ok(())
643 }
644
645 /// Send a relay message to the specified hop.
646 async fn send_relay_msg(
647 &mut self,
648 hopnum: Option<HopNum>,
649 msg: AnyRelayMsgOuter,
650 ) -> StdResult<(), ReactorError> {
651 let (relay_cell_format, ccontrol) = self.hop_info(hopnum)?;
652 let cmd = msg.cmd();
653
654 // TODO(relay): remove this log once we add some tests
655 // and confirm relaying cells works as expected
656 // (in practice it will be too noisy to be useful, even at trace level).
657 trace!(
658 circ_id = %self.unique_id,
659 hopnum=?hopnum,
660 cmd = %cmd,
661 "Sending backward cell"
662 );
663
664 self.send_relay_cell(hopnum, relay_cell_format, msg, false, &ccontrol)
665 .await?;
666
667 if cmd == RelayCmd::SENDME {
668 ccontrol.lock().expect("poisoned lock").note_sendme_sent();
669 }
670
671 Ok(())
672 }
673
674 /// Handle a circuit-level SENDME (stream ID = 0).
675 ///
676 /// Returns an error if the SENDME does not have an authentication tag
677 /// (versions of Tor <=0.3.5 omit the SENDME tag, but we don't support
678 /// those any longer).
679 ///
680 /// Any error returned from this function will shut down the reactor.
681 ///
682 // TODO(DEDUP): duplicates the logic from the client-side Circuit::handle_sendme()
683 async fn handle_sendme(
684 &mut self,
685 hopnum: Option<HopNum>,
686 sendme: Sendme,
687 ) -> StdResult<(), ReactorError> {
688 let tag = sendme
689 .into_sendme_tag()
690 .ok_or_else(|| Error::CircProto("missing tag on circuit sendme".into()))?;
691
692 // NOTE: it's okay to await. We are only awaiting on the congestion_signals
693 // future which *should* resolve immediately
694 let signals = self.inbound_chan_tx.congestion_signals().await;
695
696 let hops = self.hops.read().expect("poisoned lock");
697 let hop = hops
698 .get(hopnum)
699 .ok_or_else(|| internal!("tried to send padding to non-existent hop?!"))?;
700
701 // Update the CC object that we received a SENDME along
702 // with possible congestion signals.
703 hop.ccontrol
704 .lock()
705 .expect("poisoned lock")
706 .note_sendme_received(&self.time_provider, tag, signals)?;
707
708 Ok(())
709 }
710
711 /// Encode `msg` and encrypt it, returning the resulting cell
712 /// and tag that should be expected for an authenticated SENDME sent
713 /// in response to that cell.
714 ///
715 // TODO(DEDUP): duplicates the logic from the client-side Circuit::encode_relay_cell()
716 fn encode_relay_cell(
717 &mut self,
718 relay_format: RelayCellFormat,
719 hop: Option<HopNum>,
720 early: bool,
721 msg: AnyRelayMsgOuter,
722 ) -> Result<(AnyChanMsg, SendmeTag)> {
723 let mut body: RelayCellBody = msg
724 .encode(relay_format, &mut rand::rng())
725 .map_err(|e| Error::from_cell_enc(e, "relay cell body"))?
726 .into();
727 let cmd = if early {
728 ChanCmd::RELAY_EARLY
729 } else {
730 ChanCmd::RELAY
731 };
732
733 // Use the implementation-dependent encryption logic
734 let tag = self.inner.encrypt_relay_cell(cmd, &mut body, hop);
735 let msg = Relay::from(BoxedCellBody::from(body));
736 let msg = if early {
737 AnyChanMsg::RelayEarly(msg.into())
738 } else {
739 AnyChanMsg::Relay(msg)
740 };
741
742 Ok((msg, tag))
743 }
744
745 /// Encode `msg`, encrypt it, and send it to the 'hop'th hop.
746 ///
747 /// If there is insufficient outgoing *circuit-level* or *stream-level*
748 /// SENDME window, an error is returned instead.
749 ///
750 /// Does not check whether the cell is well-formed or reasonable.
751 async fn send_relay_cell(
752 &mut self,
753 hop: Option<HopNum>,
754 relay_cell_format: RelayCellFormat,
755 msg: AnyRelayMsgOuter,
756 early: bool,
757 ccontrol: &Arc<Mutex<CongestionControl>>,
758 ) -> Result<()> {
759 self.send_relay_cell_inner(hop, relay_cell_format, msg, early, ccontrol, None)
760 .await
761 }
762
763 /// As [`send_relay_cell`](Self::send_relay_cell), but takes an optional
764 /// [`QueuedCellPaddingInfo`] in `padding_info`.
765 ///
766 /// If `padding_info` is None, `msg` must be non-padding: we report it as such to the
767 /// padding controller.
768 ///
769 // TODO(DEDUP): this contains parts of Circuit::send_relay_cell_inner()
770 async fn send_relay_cell_inner(
771 &mut self,
772 hop: Option<HopNum>,
773 relay_cell_format: RelayCellFormat,
774 msg: AnyRelayMsgOuter,
775 early: bool,
776 ccontrol: &Arc<Mutex<CongestionControl>>,
777 padding_info: Option<QueuedCellPaddingInfo>,
778 ) -> Result<()> {
779 let c_t_w = sendme::cmd_counts_towards_windows(msg.cmd());
780 let (msg, tag) = self.encode_relay_cell(relay_cell_format, hop, early, msg)?;
781 let cell = AnyChanCell::new(Some(self.circ_id), msg);
782
783 // TODO: we use HopNum(0) if we're a relay (i.e. if the hop is None).
784 // Is that ok?
785 let hop = hop.unwrap_or_else(|| HopNum::from(0));
786 // Remember that we've enqueued this cell.
787 let padding_info = padding_info.or_else(|| self.padding_ctrl.queued_data(hop));
788
789 // Note: this future is always `Ready`, because we checked the sink for readiness
790 // before polling the async streams, so await won't block.
791 Pin::new(&mut self.inbound_chan_tx)
792 .send_unbounded((cell, padding_info))
793 .await?;
794
795 if c_t_w {
796 ccontrol
797 .lock()
798 .expect("poisoned lock")
799 .note_data_sent(&self.time_provider, &tag)?;
800 }
801
802 Ok(())
803 }
804
805 /// Handle a backward cell (moving from the exit towards the client).
806 async fn handle_backward_cell(&mut self, cell: B::CircChanMsg) -> StdResult<(), ReactorError> {
807 match self.inner.handle_backward_cell(self.unique_id, cell)? {
808 BackwardCellDisposition::Forward(cell) => {
809 let cell = AnyChanCell::new(Some(self.circ_id), cell);
810 self.inbound_chan_tx
811 .send((cell, None))
812 .await
813 .map_err(ReactorError::Err)
814 }
815 }
816 }
817}
818
819impl<B: BackwardHandler> Drop for BackwardReactor<B> {
820 fn drop(&mut self) {
821 // This will send a DESTROY down the inbound Tor channel
822 let _ = self.channel.close_circuit(self.circ_id);
823 }
824}
825
826/// A circuit event that must be handled by the [`BackwardReactor`].
827enum CircuitEvent<M> {
828 /// We received a cell that needs to be handled.
829 ///
830 /// The cell is client-bound if we are a relay, or exit-bound if we are a client).
831 Cell(M),
832 /// We received a RELAY cell from the stream reactor that needs
833 /// to be packaged and written to our Tor channel.
834 ///
835 /// The message is client-bound if we are a relay, or exit-bound if we are a client).
836 Send(ReadyStreamMsg),
837 /// We received a cell from the ForwardReactor that we need to handle.
838 ///
839 /// This might be
840 ///
841 /// * a circuit-level SENDME that we have received, or
842 /// * a circuit-level SENDME that we need to deliver to the client
843 Forwarded(BackwardReactorCmd),
844 /// The forward reactor has shut down.
845 ///
846 /// We need to shut down too.
847 ForwardShutdown,
848 /// Protocol violation.
849 ///
850 /// This can happen if we receive a channel message that is not supported on the channel.
851 ProtoViolation(Error),
852}
853
854/// Instructions from the forward reactor.
855pub(crate) enum BackwardReactorCmd {
856 /// A circuit SENDME we received from the other endpoint.
857 HandleSendme {
858 /// The hop the SENDME came on.
859 hop: Option<HopNum>,
860 /// The SENDME.
861 sendme: Sendme,
862 },
863 /// A message we need to send back to the other endpoint.
864 SendRelayMsg {
865 /// The hop to encode the message for.
866 hop: Option<HopNum>,
867 /// The message to send.
868 msg: AnyRelayMsgOuter,
869 },
870 /// This relay circuit was extended by another hop.
871 ///
872 /// This causes the reactor send the `extended2` message on its inbound channel,
873 /// and start reading from `outbound_chan_rx` in the main loop.
874 //
875 ///
876 // TODO: I wish we didn't need to expose this relay-specific variant
877 // in the generic reactor but we have no choice: abstracting it away
878 // means either introducing a mutex between the relay-side forward/backward
879 // handlers, or yet another mpsc between them.
880 #[cfg(feature = "relay")]
881 HandleCircuitExtended {
882 /// The hop to encode the message for.
883 ///
884 /// In practice, this is always None, because only relays use this.
885 hop: Option<HopNum>,
886 /// The cell to send to the specified hop,
887 extended2: Extended2,
888 /// The reading end of the outbound Tor channel, if we are not the last hop.
889 ///
890 /// Yields cells moving from the exit towards the client, if we are a middle relay.
891 outbound_chan_rx: CircuitRxReceiver,
892 },
893}