Skip to main content

tor_proto/circuit/reactor/
backward.rs

1//! A circuit's view of the backward state of the circuit.
2
3use crate::channel::Channel;
4use crate::circuit::UniqId;
5use crate::circuit::cell_sender::CircuitCellSender;
6use crate::circuit::reactor::ControlHandler;
7use crate::circuit::reactor::circhop::CircHopList;
8use crate::circuit::reactor::macros::derive_deftly_template_CircuitReactor;
9use crate::circuit::reactor::stream::ReadyStreamMsg;
10use crate::congestion::{CongestionControl, sendme};
11use crate::crypto::cell::RelayCellBody;
12use crate::util::err::ReactorError;
13use crate::util::poll_all::PollAll;
14use crate::{Error, HopNum, Result};
15
16// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
17use crate::client::circuit::padding::{
18    self, PaddingController, PaddingEvent, PaddingEventStream, QueuedCellPaddingInfo,
19};
20
21use tor_cell::chancell::msg::{AnyChanMsg, Relay};
22use tor_cell::chancell::{AnyChanCell, BoxedCellBody, ChanCmd, CircId};
23use tor_cell::relaycell::msg::{Sendme, SendmeTag};
24use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, RelayCmd};
25use tor_error::internal;
26use tor_rtcompat::{DynTimeProvider, Runtime};
27
28use derive_deftly::Deftly;
29use futures::SinkExt;
30use futures::channel::mpsc;
31use futures::{FutureExt as _, StreamExt, future, select_biased};
32use tracing::{debug, trace};
33
34use std::pin::Pin;
35use std::result::Result as StdResult;
36use std::sync::{Arc, Mutex, RwLock};
37
38use crate::circuit::CircuitRxReceiver;
39
40#[cfg(feature = "circ-padding")]
41use crate::circuit::padding::{CircPaddingDisposition, padding_disposition};
42
43#[cfg(feature = "relay")]
44use tor_cell::relaycell::msg::Extended2;
45
/// The "backward" circuit reactor of a relay.
///
/// See the [`reactor`](crate::circuit::reactor) module-level docs.
///
/// Shuts down if an error occurs, or if the [`Reactor`](super::Reactor),
/// the [`ForwardReactor`](super::ForwardReactor), or one of the
/// [`StreamReactor`](super::stream::StreamReactor)s of this circuit shuts down:
///
///   * if the `Reactor` shuts down, we are alerted via the ctrl/command mpsc channels
///     (their sending ends will close, which causes run_once() to return ReactorError::Shutdown)
///   * if the `ForwardReactor` shuts down, the `Reactor` will notice and will itself shut down,
///     which, in turn, causes the `BackwardReactor` to shut down as described above
///   * if one of the `StreamReactor`s shuts down, the `ForwardReactor` will
///     notice when it next tries to deliver a stream message to it, and shut down,
///     causing the `BackwardReactor` and top-level `Reactor` to follow suit
#[derive(Deftly)]
#[derive_deftly(CircuitReactor)]
#[deftly(reactor_name = "backward reactor")]
#[deftly(run_inner_fn = "Self::run_once")]
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub(super) struct BackwardReactor<B: BackwardHandler> {
    /// The time provider.
    time_provider: DynTimeProvider,
    /// An identifier for logging about this reactor's circuit.
    unique_id: UniqId,
    /// The circuit identifier on the backward Tor channel.
    circ_id: CircId,
    /// The inbound Tor channel.
    channel: Arc<Channel>,
    /// Implementation-dependent part of the reactor.
    ///
    /// This enables us to customize the behavior of the reactor,
    /// depending on whether we are a client or a relay.
    inner: B,
    /// The reading end of the outbound Tor channel, if we are not the last hop.
    ///
    /// Yields cells moving from the exit towards the client, if we are a middle relay.
    outbound_chan_rx: Option<CircuitRxReceiver>,
    /// The per-hop state, shared with the forward reactor.
    ///
    /// The backward reactor acquires a read lock to this whenever it needs to
    ///
    ///   * send a circuit-level SENDME
    ///   * handle a circuit-level SENDME
    ///   * send a padding cell
    ///
    // Note: For the sending/handling of SENDMEs, we lock the hop list
    // to extract the relay cell format and CC state of the hop.
    // Technically, for the SENDME cases, we could've avoided locking
    // the hop list from the BWD, by having the FWD share the relay cell format
    // and CC state in the BackwardReactorCmd::{Send,Handle}Sendme command.
    // But for the padding case, we *need* the hop list, because we need
    // to work out what relay cell format to use when sending the padding cell.
    // But for the sake of simplicity, I made the BWD consult the CircHopList in all cases.
    //
    // TODO: the backward reactor only ever reads from this.
    // Conceptually, it is the forward reactor's HopMgr that owns this list:
    // only HopMgr can add hops to the list.
    //
    // Perhaps we need a specialized abstraction that only allows reading here.
    // This could be a wrapper over RwLock, providing a read-only API.
    hops: Arc<RwLock<CircHopList>>,
    /// The sending end of the backward Tor channel.
    ///
    /// Delivers cells towards the other endpoint: towards the client, if we are a relay,
    /// or towards the exit, if we are a client.
    inbound_chan_tx: CircuitCellSender,
    /// Channel for receiving control commands.
    command_rx: mpsc::UnboundedReceiver<CtrlCmd<B::CtrlCmd>>,
    /// Channel for receiving control messages.
    control_rx: mpsc::UnboundedReceiver<CtrlMsg<B::CtrlMsg>>,
    /// Receiver for [`BackwardReactorCmd`]s coming from the forward reactor.
    ///
    /// The sender is in [`ForwardReactor`](super::ForwardReactor), which will forward all cells
    /// carrying Tor stream data to us.
    ///
    /// This serves a dual purpose:
    ///
    ///   * it enables the `ForwardReactor` to deliver Tor stream data received
    ///     from the other endpoint
    ///   * it lets the `BackwardReactor` know if the `ForwardReactor` has shut down:
    ///     we select! on this MPSC channel in the main loop, so if the `ForwardReactor`
    ///     shuts down, we will get EOS upon calling `.next()`
    forward_reactor_rx: mpsc::Receiver<BackwardReactorCmd>,
    /// A channel for receiving endpoint-bound stream messages from the StreamReactor(s)
    /// (the stream messages are client-bound if we are a relay, or exit-bound if we are a client).
    stream_rx: mpsc::Receiver<ReadyStreamMsg>,
    /// A padding controller to which padding-related events should be reported.
    padding_ctrl: PaddingController,
    /// An event stream telling us about padding-related events.
    padding_event_stream: PaddingEventStream,
    /// Current rules for blocking traffic, according to the padding controller.
    #[cfg(feature = "circ-padding")]
    padding_block: Option<padding::StartBlocking>,
}
141
/// A control message aimed at the generic backward reactor.
pub(crate) enum CtrlMsg<M> {
    /// An implementation-dependent control message.
    #[allow(unused)] // TODO(relay)
    Custom(M),
}
148
/// A control command aimed at the generic backward reactor.
pub(crate) enum CtrlCmd<C> {
    /// An implementation-dependent control command.
    #[allow(unused)] // TODO(relay)
    Custom(C),
}
155
/// Trait for customizing the behavior of the backward reactor.
///
/// Used for plugging in the implementation-dependent (client vs relay)
/// parts of the implementation into the generic one.
pub(crate) trait BackwardHandler: ControlHandler {
    /// The subclass of ChanMsg that can arrive on this type of circuit.
    type CircChanMsg: TryFrom<AnyChanMsg, Error = crate::Error> + Send;

    /// Encrypt a RelayCellBody that is moving in the backward direction.
    ///
    /// Returns the tag that should be expected for an authenticated SENDME
    /// sent in response to this cell.
    ///
    /// If `hop` is None, the implementation chooses the hop to encrypt for
    // NOTE(review): the meaning of a `None` hop is implementation-defined;
    // confirm against the client/relay implementations.
    fn encrypt_relay_cell(
        &mut self,
        cmd: ChanCmd,
        body: &mut RelayCellBody,
        hop: Option<HopNum>,
    ) -> SendmeTag;

    /// Handle a cell that was read from the Tor outbound channel.
    ///
    /// Returns an error if the cell should cause the reactor to shut down,
    /// or a [`BackwardCellDisposition`] specifying how it should be handled.
    fn handle_backward_cell(
        &mut self,
        circ_id: UniqId,
        cell: Self::CircChanMsg,
    ) -> StdResult<BackwardCellDisposition, ReactorError>;
}
182
/// What action to take in response to a cell arriving on our outbound Tor channel.
pub(crate) enum BackwardCellDisposition {
    /// Forward the cell, writing it to the inbound Tor channel.
    Forward(AnyChanMsg),
}
188
189#[allow(unused)] // TODO(relay)
190impl<B: BackwardHandler> BackwardReactor<B> {
191    /// Create a new [`BackwardReactor`].
192    #[allow(clippy::too_many_arguments)] // TODO
193    pub(super) fn new<R: Runtime>(
194        runtime: R,
195        channel: &Arc<Channel>,
196        circ_id: CircId,
197        unique_id: UniqId,
198        inner: B,
199        hops: Arc<RwLock<CircHopList>>,
200        forward_reactor_rx: mpsc::Receiver<BackwardReactorCmd>,
201        control_rx: mpsc::UnboundedReceiver<CtrlMsg<B::CtrlMsg>>,
202        command_rx: mpsc::UnboundedReceiver<CtrlCmd<B::CtrlCmd>>,
203        padding_ctrl: PaddingController,
204        padding_event_stream: PaddingEventStream,
205        stream_rx: mpsc::Receiver<ReadyStreamMsg>,
206    ) -> Self {
207        let channel = Arc::clone(channel);
208        let inbound_chan_tx = CircuitCellSender::from_channel_sender(channel.sender());
209
210        Self {
211            time_provider: DynTimeProvider::new(runtime),
212            outbound_chan_rx: None,
213            channel,
214            inner,
215            hops,
216            inbound_chan_tx,
217            unique_id,
218            circ_id,
219            forward_reactor_rx,
220            control_rx,
221            command_rx,
222            stream_rx,
223            padding_ctrl,
224            padding_event_stream,
225            #[cfg(feature = "circ-padding")]
226            padding_block: None,
227        }
228    }
229
    /// Helper for [`run`](Self::run).
    ///
    /// Handles cells arriving on the outbound Tor channel,
    /// and writes cells to the inbound Tor channel.
    ///
    /// Because the Tor application streams, the `forward_reactor_rx` MPSC streams,
    /// and the outbound Tor channel MPSC stream are driven concurrently using [`PollAll`],
    /// this function can send up to 3 cells per call over the inbound Tor channel:
    ///
    ///    * a cell carrying Tor stream data
    ///    * a cell received from the outbound Tor channel, if we are a relay
    ///      (moving from the exit towards the client)
    ///    * a circuit-level SENDME
    ///
    /// However, in practice, leaky pipe is not really used,
    /// and so relays that have application streams (i.e. the exits),
    /// are not going to have an outbound Tor channel,
    /// and so this will only really drive Tor stream data,
    /// delivering at most 2 cells per call.
    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
        use postage::prelude::{Sink as _, Stream as _};

        /// The maximum number of events we expect to handle per reactor loop.
        ///
        /// This is bounded by the number of futures we push into the PollAll.
        const PER_LOOP_EVENT_COUNT: usize = 3;

        // A collection of futures we plan to drive concurrently.
        let mut poll_all =
            PollAll::<PER_LOOP_EVENT_COUNT, Option<CircuitEvent<B::CircChanMsg>>>::new();

        // Flush the backward Tor channel sink, and check it for readiness
        //
        // TODO(flushing): here and everywhere else we need to flush:
        //
        // Currently, we try to flush every time we want to write to the sink,
        // but may be suboptimal.
        //
        // However, we don't actually *wait* for the flush to complete
        // (we just make a bit of progress by calling poll_flush),
        // so it's possible that this is actually tolerable.
        // We should run some tests, and if this turns out to be a performance bottleneck,
        // we'll have to rethink our flushing approach.
        let backward_chan_ready = future::poll_fn(|cx| {
            // The flush outcome doesn't matter,
            // so we simply move on to the readiness check.
            // The reason we don't wait on the flush is because we don't
            // want to flush on *every* reactor loop, but we do want to make
            // a bit of progress each time.
            //
            // (TODO: do we want to handle errors here?)
            let _ = self.inbound_chan_tx.poll_flush_unpin(cx);

            self.inbound_chan_tx.poll_ready_unpin(cx)
        });

        // Concurrently, drive :
        //  1. a future that reads from the StreamReactor, to see if there are
        //  any application streams that have a message to send
        //  (this resolves to a message that needs to be delivered to the peer)
        poll_all.push(async {
            // Internally, each stream reactor checks if we're allowed to send anything
            // that counts towards SENDME windows (and ceases to send us stream data if not)
            //
            // The reason we don't check that here is because stream_rx multiplexes stream data
            // from all hops, and we have no way of knowing which hop will want to send us stream
            // data next, and therefore we can't know which hop's CC object to use
            self.stream_rx.next().await.map(CircuitEvent::Send)
        });

        //  2. the stream of commands coming from the ForwardReactor
        //  (this resolves to a BackwardReactorCmd)
        poll_all.push(async {
            let event = match self.forward_reactor_rx.next().await {
                Some(cmd) => CircuitEvent::Forwarded(cmd),
                None => {
                    // The forward reactor has crashed, so we have to shut down.
                    CircuitEvent::ForwardShutdown
                }
            };

            Some(event)
        });

        // 3. Messages moving from the outbound channel towards the inbound Tor channel,
        // if we have an outbound Tor channel.
        //
        // NOTE: in practice, clients and exits won't have an outbound Tor channel,
        // so for them this will be a no-op.
        poll_all.push(async {
            let event = if let Some(outbound_chan_rx) = self.outbound_chan_rx.as_mut() {
                // Forward channel unexpectedly closed, we should close too
                match outbound_chan_rx.next().await {
                    Some(msg) => match msg.try_into() {
                        Err(e) => CircuitEvent::ProtoViolation(e),
                        Ok(cell) => CircuitEvent::Cell(cell),
                    },
                    None => {
                        // The forward reactor has crashed, so we have to shut down.
                        CircuitEvent::ForwardShutdown
                    }
                }
            } else {
                // No outbound Tor channel: this future never resolves.
                future::pending().await
            };

            Some(event)
        });

        let poll_all = async move {
            // Avoid polling **any** of the futures if the outgoing sink is blocked.
            //
            // This implements backpressure: we avoid reading from our input sources
            // if we know we're unable to write to the inbound Tor channel sink.
            //
            // More specifically, if our inbound Tor channel sink is full and can no longer
            // accept cells, we stop reading:
            //
            //   1. From the application streams (received from StreamReactor), if there are any.
            //
            //   2. From the forward_reactor_rx channel, used by the forward reactor to send us
            //
            //     - a circuit-level SENDME that we have received, or
            //     - a circuit-level SENDME that we need to deliver to the client
            //
            //     Not reading from the forward_reactor_rx channel, in turn, causes the forward reactor
            //     to block and therefore stop reading from **its** input sources,
            //     propagating backpressure all the way to the other endpoint of the circuit.
            //
            //   3. From the outbound Tor channel, if there is one.
            //
            // This will delay any SENDMEs the client or exit might have sent along
            // the way, and therefore count as a congestion signal.
            //
            // TODO: memquota setup to make sure this doesn't turn into a memory DOS vector
            let _ = backward_chan_ready.await;

            // TODO: it's important to not block reading from the forward_reactor_rx channel on the chan
            // sender readiness (for instance, we should not block the sending of SENDMEs
            // if the channel is blocked on a padding-induced block).
            //
            // This means we will need to move the forward_reactor_rx handling out of the PollAll
            // to the select_biased! below.
            poll_all.await
        };

        // Priority order (select_biased! polls its arms in the order listed):
        // control commands first, then control messages, then padding events,
        // and circuit traffic last.
        let events = select_biased! {
            res = self.command_rx.next().fuse() => {
                // A None here means the sender was dropped: the top-level
                // Reactor has shut down, so we must too.
                let cmd = res.ok_or_else(|| ReactorError::Shutdown)?;
                self.handle_cmd(cmd)?;
                return Ok(());
            }
            res = self.control_rx.next().fuse() => {
                let msg = res.ok_or_else(|| ReactorError::Shutdown)?;
                self.handle_msg(msg)?;
                return Ok(());
            }
            res = self.padding_event_stream.next().fuse() => {
                // If there's a padding event, we need to handle it immediately,
                // because it might tell us to start blocking the inbound_chan_tx sink,
                // which, in turn, means we need to stop trying to read from
                // the application streams.
                let event = res.ok_or_else(|| ReactorError::Shutdown)?;

                cfg_if::cfg_if! {
                    if #[cfg(feature = "circ-padding")] {
                        self.run_padding_event(event).await?;
                    } else {
                        // If padding isn't enabled, we never generate a padding event,
                        // so we can be sure this case will never be called.
                        void::unreachable(event.0);
                    }
                }
                return Ok(())
            }
            res = poll_all.fuse() => res,
        };

        // Note: there shouldn't be more than N < PER_LOOP_EVENT_COUNT events to handle
        // per reactor loop. We need to be careful here, because we must avoid blocking
        // the reactor.
        //
        // If handling more than one event per loop turns out to be a problem, we may
        // need to dispatch this to a background task instead.
        //
        // TODO(relay): this loop is actually a problem.
        // As mentioned in the run_once() docs, this will attempt to send up
        // to 3 cells on the inbound tor Channel (or 2 cells, assuming no leaky pipe).
        //
        // The problem is that the readiness check above (see backward_chan_ready)
        // only checks that the queue has enough room for 1 cell, not *2 cells*.
        // Trying to send more than 2 cell when there is only room for one
        // will cause the reactor to block (and because there is nothing
        // driving the flushing of this channel, this will be a hard block).
        //
        // We need to rethink the strategy here (e.g. by flushing in parallel
        // with handle_event())
        for event in events.into_iter().flatten() {
            self.handle_event(event).await?;
        }

        Ok(())
    }
433
434    /// Handle a control command.
435    fn handle_cmd(&mut self, cmd: CtrlCmd<B::CtrlCmd>) -> StdResult<(), ReactorError> {
436        match cmd {
437            CtrlCmd::Custom(c) => self.inner.handle_cmd(c),
438        }
439    }
440
441    /// Handle a control message.
442    fn handle_msg(&mut self, msg: CtrlMsg<B::CtrlMsg>) -> StdResult<(), ReactorError> {
443        match msg {
444            CtrlMsg::Custom(c) => self.inner.handle_msg(c),
445        }
446    }
447
448    /// Perform some circuit-padding-based event on the specified circuit.
449    //
450    // TODO(DEDUP): this is almost identical to the client-side Conflux::run_padding_event()
451    #[cfg(feature = "circ-padding")]
452    async fn run_padding_event(
453        &mut self,
454        padding_event: PaddingEvent,
455    ) -> StdResult<(), ReactorError> {
456        use PaddingEvent as E;
457
458        match padding_event {
459            E::SendPadding(send_padding) => {
460                self.send_padding(send_padding).await?;
461            }
462            E::StartBlocking(start_blocking) => {
463                self.start_blocking_for_padding(start_blocking);
464            }
465            E::StopBlocking => {
466                self.stop_blocking_for_padding();
467            }
468        }
469        Ok(())
470    }
471
472    /// Handle a request from our padding subsystem to send a padding packet.
473    //
474    // TODO(DEDUP): this is almost identical to the client-side Client::send_padding()
475    #[cfg(feature = "circ-padding")]
476    async fn send_padding(&mut self, send_padding: padding::SendPadding) -> Result<()> {
477        use CircPaddingDisposition::*;
478
479        let target_hop = send_padding.hop;
480
481        match padding_disposition(
482            &send_padding,
483            &self.inbound_chan_tx,
484            self.padding_block.as_ref(),
485        ) {
486            QueuePaddingNormally => {
487                let queue_info = self.padding_ctrl.queued_padding(target_hop, send_padding);
488                self.queue_padding_cell_for_hop(target_hop, queue_info)
489                    .await?;
490            }
491            QueuePaddingAndBypass => {
492                let queue_info = self.padding_ctrl.queued_padding(target_hop, send_padding);
493                self.queue_padding_cell_for_hop(target_hop, queue_info)
494                    .await?;
495            }
496            TreatQueuedCellAsPadding => {
497                self.padding_ctrl
498                    .replaceable_padding_already_queued(target_hop, send_padding);
499            }
500        }
501        Ok(())
502    }
503
504    /// Enable padding-based blocking,
505    /// or change the rule for padding-based blocking to the one in `block`.
506    //
507    // TODO(DEDUP): copy of Client::start_blocking_for_padding()
508    #[cfg(feature = "circ-padding")]
509    pub(super) fn start_blocking_for_padding(&mut self, block: padding::StartBlocking) {
510        self.inbound_chan_tx.start_blocking();
511        self.padding_block = Some(block);
512    }
513
514    /// Disable padding-based blocking.
515    ///
516    // TODO(DEDUP): copy of Client::stop_blocking_for_padding()
517    #[cfg(feature = "circ-padding")]
518    pub(super) fn stop_blocking_for_padding(&mut self) {
519        self.inbound_chan_tx.stop_blocking();
520        self.padding_block = None;
521    }
522
523    /// Generate and encrypt a padding cell, and send it to a targeted hop.
524    ///
525    /// Ignores any padding-based blocking.
526    ///
527    // TODO(DEDUP): copy of Client::queue_padding_cell_for_hop()
528    #[cfg(feature = "circ-padding")]
529    async fn queue_padding_cell_for_hop(
530        &mut self,
531        target_hop: HopNum,
532        queue_info: Option<QueuedCellPaddingInfo>,
533    ) -> Result<()> {
534        use tor_cell::relaycell::msg::Drop as DropMsg;
535
536        let msg = AnyRelayMsgOuter::new(None, DropMsg::default().into());
537        let hopnum = Some(target_hop);
538
539        // TODO: the ccontrol state isn't actually needed here, because
540        // DROP cells don't count towards SENDME windows.
541        // Technically, we could avoid unnecessarily Arc::clone()ing the CC state
542        // here, and just extract the relay cell format.
543        // But for that we would need a specialized send_relay_cell_inner()-like function
544        // that doesn't take a CC object, or to make the CC object optional in
545        // send_relay_cell_inner().
546        let (relay_cell_format, ccontrol) = self.hop_info(hopnum)?;
547
548        self.send_relay_cell_inner(hopnum, relay_cell_format, msg, false, &ccontrol, queue_info)
549            .await
550    }
551
552    /// Determine how exactly to handle a request to handle padding.
553    #[cfg(feature = "circ-padding")]
554    fn padding_disposition(&self, send_padding: &padding::SendPadding) -> CircPaddingDisposition {
555        crate::circuit::padding::padding_disposition(
556            send_padding,
557            &self.inbound_chan_tx,
558            self.padding_block.as_ref(),
559        )
560    }
561
562    /// Handle a circuit event.
563    async fn handle_event(
564        &mut self,
565        event: CircuitEvent<B::CircChanMsg>,
566    ) -> StdResult<(), ReactorError> {
567        use CircuitEvent::*;
568
569        match event {
570            Cell(cell) => self.handle_backward_cell(cell).await,
571            Send(msg) => {
572                let ReadyStreamMsg {
573                    hop,
574                    relay_cell_format,
575                    msg,
576                    ccontrol,
577                } = msg;
578
579                self.send_relay_cell(hop, relay_cell_format, msg, false, &ccontrol)
580                    .await?;
581
582                Ok(())
583            }
584            Forwarded(cmd) => self.handle_reactor_cmd(cmd).await,
585            ForwardShutdown => {
586                // The forward reactor has crashed, so we have to shut down.
587                trace!(
588                    circ_id = %self.unique_id,
589                    "Backward relay reactor shutdown (forward reactor has closed)",
590                );
591
592                Err(ReactorError::Shutdown)
593            }
594            ProtoViolation(err) => Err(err.into()),
595        }
596    }
597
598    /// Return the RelayCellFormat and CC state of a given hop.
599    fn hop_info(
600        &self,
601        hopnum: Option<HopNum>,
602    ) -> Result<(RelayCellFormat, Arc<Mutex<CongestionControl>>)> {
603        let hops = self.hops.read().expect("poisoned lock");
604        let hop = hops
605            .get(hopnum)
606            .ok_or_else(|| internal!("tried to send padding to non-existent hop?!"))?;
607        let relay_cell_format = hop.settings.relay_crypt_protocol().relay_cell_format();
608        let ccontrol = Arc::clone(&hop.ccontrol);
609
610        Ok((relay_cell_format, ccontrol))
611    }
612
613    /// Handle a command sent to us by the forward reactor.
614    async fn handle_reactor_cmd(&mut self, msg: BackwardReactorCmd) -> StdResult<(), ReactorError> {
615        use BackwardReactorCmd::*;
616
617        match msg {
618            SendRelayMsg { hop, msg } => {
619                self.send_relay_msg(hop, msg).await?;
620            }
621            HandleSendme { hop, sendme } => {
622                self.handle_sendme(hop, sendme).await?;
623                return Ok(());
624            }
625            #[cfg(feature = "relay")]
626            HandleCircuitExtended {
627                hop,
628                extended2,
629                outbound_chan_rx,
630            } => {
631                self.outbound_chan_rx = Some(outbound_chan_rx);
632                let msg = AnyRelayMsgOuter::new(None, extended2.into());
633                self.send_relay_msg(hop, msg).await?;
634
635                debug!(
636                    circ_id = %self.unique_id,
637                    "Extended circuit to the next hop"
638                );
639            }
640        }
641
642        Ok(())
643    }
644
645    /// Send a relay message to the specified hop.
646    async fn send_relay_msg(
647        &mut self,
648        hopnum: Option<HopNum>,
649        msg: AnyRelayMsgOuter,
650    ) -> StdResult<(), ReactorError> {
651        let (relay_cell_format, ccontrol) = self.hop_info(hopnum)?;
652        let cmd = msg.cmd();
653
654        // TODO(relay): remove this log once we add some tests
655        // and confirm relaying cells works as expected
656        // (in practice it will be too noisy to be useful, even at trace level).
657        trace!(
658            circ_id = %self.unique_id,
659            hopnum=?hopnum,
660            cmd = %cmd,
661            "Sending backward cell"
662        );
663
664        self.send_relay_cell(hopnum, relay_cell_format, msg, false, &ccontrol)
665            .await?;
666
667        if cmd == RelayCmd::SENDME {
668            ccontrol.lock().expect("poisoned lock").note_sendme_sent();
669        }
670
671        Ok(())
672    }
673
674    /// Handle a circuit-level SENDME (stream ID = 0).
675    ///
676    /// Returns an error if the SENDME does not have an authentication tag
677    /// (versions of Tor <=0.3.5 omit the SENDME tag, but we don't support
678    /// those any longer).
679    ///
680    /// Any error returned from this function will shut down the reactor.
681    ///
682    // TODO(DEDUP): duplicates the logic from the client-side Circuit::handle_sendme()
683    async fn handle_sendme(
684        &mut self,
685        hopnum: Option<HopNum>,
686        sendme: Sendme,
687    ) -> StdResult<(), ReactorError> {
688        let tag = sendme
689            .into_sendme_tag()
690            .ok_or_else(|| Error::CircProto("missing tag on circuit sendme".into()))?;
691
692        // NOTE: it's okay to await. We are only awaiting on the congestion_signals
693        // future which *should* resolve immediately
694        let signals = self.inbound_chan_tx.congestion_signals().await;
695
696        let hops = self.hops.read().expect("poisoned lock");
697        let hop = hops
698            .get(hopnum)
699            .ok_or_else(|| internal!("tried to send padding to non-existent hop?!"))?;
700
701        // Update the CC object that we received a SENDME along
702        // with possible congestion signals.
703        hop.ccontrol
704            .lock()
705            .expect("poisoned lock")
706            .note_sendme_received(&self.time_provider, tag, signals)?;
707
708        Ok(())
709    }
710
711    /// Encode `msg` and encrypt it, returning the resulting cell
712    /// and tag that should be expected for an authenticated SENDME sent
713    /// in response to that cell.
714    ///
715    // TODO(DEDUP): duplicates the logic from the client-side Circuit::encode_relay_cell()
716    fn encode_relay_cell(
717        &mut self,
718        relay_format: RelayCellFormat,
719        hop: Option<HopNum>,
720        early: bool,
721        msg: AnyRelayMsgOuter,
722    ) -> Result<(AnyChanMsg, SendmeTag)> {
723        let mut body: RelayCellBody = msg
724            .encode(relay_format, &mut rand::rng())
725            .map_err(|e| Error::from_cell_enc(e, "relay cell body"))?
726            .into();
727        let cmd = if early {
728            ChanCmd::RELAY_EARLY
729        } else {
730            ChanCmd::RELAY
731        };
732
733        // Use the implementation-dependent encryption logic
734        let tag = self.inner.encrypt_relay_cell(cmd, &mut body, hop);
735        let msg = Relay::from(BoxedCellBody::from(body));
736        let msg = if early {
737            AnyChanMsg::RelayEarly(msg.into())
738        } else {
739            AnyChanMsg::Relay(msg)
740        };
741
742        Ok((msg, tag))
743    }
744
745    /// Encode `msg`, encrypt it, and send it to the 'hop'th hop.
746    ///
747    /// If there is insufficient outgoing *circuit-level* or *stream-level*
748    /// SENDME window, an error is returned instead.
749    ///
750    /// Does not check whether the cell is well-formed or reasonable.
751    async fn send_relay_cell(
752        &mut self,
753        hop: Option<HopNum>,
754        relay_cell_format: RelayCellFormat,
755        msg: AnyRelayMsgOuter,
756        early: bool,
757        ccontrol: &Arc<Mutex<CongestionControl>>,
758    ) -> Result<()> {
759        self.send_relay_cell_inner(hop, relay_cell_format, msg, early, ccontrol, None)
760            .await
761    }
762
763    /// As [`send_relay_cell`](Self::send_relay_cell), but takes an optional
764    /// [`QueuedCellPaddingInfo`] in `padding_info`.
765    ///
766    /// If `padding_info` is None, `msg` must be non-padding: we report it as such to the
767    /// padding controller.
768    ///
769    // TODO(DEDUP): this contains parts of Circuit::send_relay_cell_inner()
770    async fn send_relay_cell_inner(
771        &mut self,
772        hop: Option<HopNum>,
773        relay_cell_format: RelayCellFormat,
774        msg: AnyRelayMsgOuter,
775        early: bool,
776        ccontrol: &Arc<Mutex<CongestionControl>>,
777        padding_info: Option<QueuedCellPaddingInfo>,
778    ) -> Result<()> {
779        let c_t_w = sendme::cmd_counts_towards_windows(msg.cmd());
780        let (msg, tag) = self.encode_relay_cell(relay_cell_format, hop, early, msg)?;
781        let cell = AnyChanCell::new(Some(self.circ_id), msg);
782
783        // TODO: we use HopNum(0) if we're a relay (i.e. if the hop is None).
784        // Is that ok?
785        let hop = hop.unwrap_or_else(|| HopNum::from(0));
786        // Remember that we've enqueued this cell.
787        let padding_info = padding_info.or_else(|| self.padding_ctrl.queued_data(hop));
788
789        // Note: this future is always `Ready`, because we checked the sink for readiness
790        // before polling the async streams, so await won't block.
791        Pin::new(&mut self.inbound_chan_tx)
792            .send_unbounded((cell, padding_info))
793            .await?;
794
795        if c_t_w {
796            ccontrol
797                .lock()
798                .expect("poisoned lock")
799                .note_data_sent(&self.time_provider, &tag)?;
800        }
801
802        Ok(())
803    }
804
805    /// Handle a backward cell (moving from the exit towards the client).
806    async fn handle_backward_cell(&mut self, cell: B::CircChanMsg) -> StdResult<(), ReactorError> {
807        match self.inner.handle_backward_cell(self.unique_id, cell)? {
808            BackwardCellDisposition::Forward(cell) => {
809                let cell = AnyChanCell::new(Some(self.circ_id), cell);
810                self.inbound_chan_tx
811                    .send((cell, None))
812                    .await
813                    .map_err(ReactorError::Err)
814            }
815        }
816    }
817}
818
impl<B: BackwardHandler> Drop for BackwardReactor<B> {
    fn drop(&mut self) {
        // This will send a DESTROY down the inbound Tor channel.
        //
        // The result is deliberately discarded: the reactor is shutting down,
        // and there is nothing useful we can do about a failure here.
        let _ = self.channel.close_circuit(self.circ_id);
    }
}
825
/// A circuit event that must be handled by the [`BackwardReactor`].
enum CircuitEvent<M> {
    /// We received a cell that needs to be handled.
    ///
    /// The cell is client-bound if we are a relay, or exit-bound if we are a client.
    Cell(M),
    /// We received a RELAY cell from the stream reactor that needs
    /// to be packaged and written to our Tor channel.
    ///
    /// The message is client-bound if we are a relay, or exit-bound if we are a client.
    Send(ReadyStreamMsg),
    /// We received a cell from the ForwardReactor that we need to handle.
    ///
    /// This might be
    ///
    ///   * a circuit-level SENDME that we have received, or
    ///   * a circuit-level SENDME that we need to deliver to the client
    Forwarded(BackwardReactorCmd),
    /// The forward reactor has shut down.
    ///
    /// We need to shut down too.
    ForwardShutdown,
    /// Protocol violation.
    ///
    /// This can happen if we receive a channel message that is not supported on the channel.
    ProtoViolation(Error),
}
853
/// Instructions from the forward reactor.
pub(crate) enum BackwardReactorCmd {
    /// A circuit SENDME we received from the other endpoint.
    HandleSendme {
        /// The hop the SENDME came on.
        hop: Option<HopNum>,
        /// The SENDME.
        sendme: Sendme,
    },
    /// A message we need to send back to the other endpoint.
    SendRelayMsg {
        /// The hop to encode the message for.
        hop: Option<HopNum>,
        /// The message to send.
        msg: AnyRelayMsgOuter,
    },
    /// This relay circuit was extended by another hop.
    ///
    /// This causes the reactor to send the `extended2` message on its inbound channel,
    /// and start reading from `outbound_chan_rx` in the main loop.
    //
    // TODO: I wish we didn't need to expose this relay-specific variant
    // in the generic reactor but we have no choice: abstracting it away
    // means either introducing a mutex between the relay-side forward/backward
    // handlers, or yet another mpsc between them.
    #[cfg(feature = "relay")]
    HandleCircuitExtended {
        /// The hop to encode the message for.
        ///
        /// In practice, this is always None, because only relays use this.
        hop: Option<HopNum>,
        /// The cell to send to the specified hop.
        extended2: Extended2,
        /// The reading end of the outbound Tor channel, if we are not the last hop.
        ///
        /// Yields cells moving from the exit towards the client, if we are a middle relay.
        outbound_chan_rx: CircuitRxReceiver,
    },
}