Skip to main content

tor_proto/relay/reactor/
forward.rs

1//! A relay's view of the forward (away from the client, towards the exit) state of a circuit.
2
3mod extend_handler;
4
5use extend_handler::ExtendRequestHandler;
6
7use crate::channel::{Channel, ChannelSender};
8use crate::circuit::CircuitRxReceiver;
9use crate::circuit::UniqId;
10use crate::circuit::reactor::ControlHandler;
11use crate::circuit::reactor::backward::BackwardReactorCmd;
12use crate::circuit::reactor::forward::{ForwardCellDisposition, ForwardHandler};
13use crate::circuit::reactor::hop_mgr::HopMgr;
14use crate::crypto::cell::OutboundRelayLayer;
15use crate::crypto::cell::RelayCellBody;
16use crate::relay::RelayCircChanMsg;
17use crate::util::err::ReactorError;
18use crate::{Error, HopNum, Result};
19
20// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
21use crate::client::circuit::padding::QueuedCellPaddingInfo;
22
23use crate::relay::channel_provider::ChannelProvider;
24use crate::relay::reactor::CircuitAccount;
25use tor_cell::chancell::msg::{AnyChanMsg, Destroy, PaddingNegotiate, Relay};
26use tor_cell::chancell::{AnyChanCell, BoxedCellBody, ChanMsg, CircId};
27use tor_cell::relaycell::msg::{Extended2, SendmeTag};
28use tor_cell::relaycell::{RelayCellDecoderResult, RelayCellFormat, RelayCmd, UnparsedRelayMsg};
29use tor_error::internal;
30use tor_linkspec::OwnedChanTarget;
31use tor_rtcompat::Runtime;
32
33use futures::channel::mpsc;
34use futures::{SinkExt as _, future};
35use tracing::trace;
36
37use std::result::Result as StdResult;
38use std::sync::Arc;
39use std::task::Poll;
40
41/// Placeholder for our custom control message type.
42type CtrlMsg = ();
43
44/// Placeholder for our custom control command type.
45type CtrlCmd = ();
46
47/// The maximum number of RELAY_EARLY cells allowed on a circuit.
48///
49// TODO(relay): should we come up with a consensus parameter for this? (arti#2349)
50const MAX_RELAY_EARLY_CELLS_PER_CIRCUIT: usize = 8;
51
52/// Relay-specific state for the forward reactor.
53pub(crate) struct Forward {
54    /// An identifier for logging about this reactor's circuit.
55    unique_id: UniqId,
56    /// The outbound view of this circuit, if we are not the last hop.
57    ///
58    /// Delivers cells towards the exit.
59    ///
60    /// Only set for middle relays.
61    outbound: Option<Outbound>,
62    /// The cryptographic state for this circuit for inbound cells.
63    crypto_out: Box<dyn OutboundRelayLayer + Send>,
64    /// The number of RELAY_EARLY cells we have seen so far on this circuit.
65    ///
66    /// If we see more than [`MAX_RELAY_EARLY_CELLS_PER_CIRCUIT`] RELAY_EARLY cells, we tear down the circuit.
67    relay_early_count: usize,
68    /// Helper for handling circuit extension requests.
69    ///
70    /// Used for validating EXTEND2 cells.
71    extend_handler: ExtendRequestHandler,
72}
73
74/// A type of event issued by the relay forward reactor.
75pub(crate) enum CircEvent {
76    /// The outcome of an EXTEND2 request.
77    ExtendResult(StdResult<ExtendResult, ReactorError>),
78}
79
80/// A successful circuit extension result.
81pub(crate) struct ExtendResult {
82    /// The EXTENDED2 cell to send back to the client.
83    extended2: Extended2,
84    /// The outbound channel.
85    outbound: Outbound,
86    /// The reading end of the outbound Tor channel, if we are not the last hop.
87    ///
88    /// Yields cells moving from the exit towards the client, if we are a middle relay.
89    outbound_chan_rx: CircuitRxReceiver,
90}
91
92/// The outbound view of a relay circuit.
93struct Outbound {
94    /// The circuit identifier on the outbound Tor channel.
95    circ_id: CircId,
96    /// The outbound Tor channel.
97    channel: Arc<Channel>,
98    /// The sending end of the outbound Tor channel.
99    outbound_chan_tx: ChannelSender,
100}
101
102/// The outcome of `decode_relay_cell`.
103enum CellDecodeResult {
104    /// A decrypted cell.
105    Recognized(SendmeTag, RelayCellDecoderResult),
106    /// A cell we could not decrypt.
107    Unrecognizd(RelayCellBody),
108}
109
110impl Forward {
111    /// Create a new [`Forward`].
112    pub(crate) fn new(
113        inbound_chan: &Arc<Channel>,
114        unique_id: UniqId,
115        crypto_out: Box<dyn OutboundRelayLayer + Send>,
116        chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
117        event_tx: mpsc::Sender<CircEvent>,
118        memquota: CircuitAccount,
119    ) -> Self {
120        let inbound_peer = Arc::clone(inbound_chan.peer_info());
121        let extend_handler =
122            ExtendRequestHandler::new(unique_id, chan_provider, inbound_peer, event_tx, memquota);
123
124        Self {
125            unique_id,
126            // Initially, we are the last hop in the circuit.
127            outbound: None,
128            crypto_out,
129            relay_early_count: 0,
130            extend_handler,
131        }
132    }
133
134    /// Decode `cell`, returning its corresponding hop number, tag and decoded body.
135    fn decode_relay_cell<R: Runtime>(
136        &mut self,
137        hop_mgr: &mut HopMgr<R>,
138        cell: Relay,
139    ) -> Result<(Option<HopNum>, CellDecodeResult)> {
140        // Note: the client reactor will return the actual source hopnum
141        let hopnum = None;
142        let cmd = cell.cmd();
143        let mut body = cell.into_relay_body().into();
144        let Some(tag) = self.crypto_out.decrypt_outbound(cmd, &mut body) else {
145            return Ok((hopnum, CellDecodeResult::Unrecognizd(body)));
146        };
147
148        // The message is addressed to us! Now it's time to handle it...
149        let mut hops = hop_mgr.hops().write().expect("poisoned lock");
150        let decode_res = hops
151            .get_mut(hopnum)
152            .ok_or_else(|| internal!("msg from non-existent hop???"))?
153            .inbound
154            .decode(body.into())?;
155
156        Ok((hopnum, CellDecodeResult::Recognized(tag, decode_res)))
157    }
158
159    /// Handle a DROP message.
160    #[allow(clippy::unnecessary_wraps)] // Returns Err if circ-padding is enabled
161    fn handle_drop(&mut self) -> StdResult<(), ReactorError> {
162        cfg_if::cfg_if! {
163            if #[cfg(feature = "circ-padding")] {
164                Err(internal!("relay circuit padding not yet supported").into())
165            } else {
166                Ok(())
167            }
168        }
169    }
170
171    /// Handle the outcome of handling an EXTEND2.
172    fn handle_extend_result(
173        &mut self,
174        res: StdResult<ExtendResult, ReactorError>,
175    ) -> StdResult<Option<BackwardReactorCmd>, ReactorError> {
176        let ExtendResult {
177            extended2,
178            outbound,
179            outbound_chan_rx,
180        } = res?;
181
182        self.outbound = Some(outbound);
183
184        Ok(Some(BackwardReactorCmd::HandleCircuitExtended {
185            hop: None,
186            extended2,
187            outbound_chan_rx,
188        }))
189    }
190
191    /// Handle a RELAY or RELAY_EARLY cell.
192    fn handle_relay_cell<R: Runtime>(
193        &mut self,
194        hop_mgr: &mut HopMgr<R>,
195        cell: Relay,
196        early: bool,
197    ) -> StdResult<Option<ForwardCellDisposition>, ReactorError> {
198        if early {
199            self.relay_early_count += 1;
200
201            if self.relay_early_count > MAX_RELAY_EARLY_CELLS_PER_CIRCUIT {
202                return Err(
203                    Error::CircProto("Circuit received too many RELAY_EARLY cells".into()).into(),
204                );
205            }
206        }
207
208        let (hopnum, res) = self.decode_relay_cell(hop_mgr, cell)?;
209        let (tag, decode_res) = match res {
210            CellDecodeResult::Unrecognizd(body) => {
211                self.handle_unrecognized_cell(body, None, early)?;
212                return Ok(None);
213            }
214            CellDecodeResult::Recognized(tag, res) => (tag, res),
215        };
216
217        Ok(Some(ForwardCellDisposition::HandleRecognizedRelay {
218            cell: decode_res,
219            early,
220            hopnum,
221            tag,
222        }))
223    }
224
225    /// Handle a forward cell that we could not decrypt.
226    fn handle_unrecognized_cell(
227        &mut self,
228        body: RelayCellBody,
229        info: Option<QueuedCellPaddingInfo>,
230        early: bool,
231    ) -> StdResult<(), ReactorError> {
232        // TODO(relay): remove this log once we add some tests
233        // and confirm relaying cells works as expected
234        // (in practice it will be too noisy to be useful, even at trace level).
235        trace!(
236            circ_id = %self.unique_id,
237            "Forwarding unrecognized cell"
238        );
239
240        let Some(chan) = self.outbound.as_mut() else {
241            // The client shouldn't try to send us any cells before it gets
242            // an EXTENDED2 cell from us
243            return Err(Error::CircProto(
244                "Asked to forward cell before the circuit was extended?!".into(),
245            )
246            .into());
247        };
248
249        let msg = Relay::from(BoxedCellBody::from(body));
250        let relay = if early {
251            AnyChanMsg::RelayEarly(msg.into())
252        } else {
253            AnyChanMsg::Relay(msg)
254        };
255        let cell = AnyChanCell::new(Some(chan.circ_id), relay);
256
257        // Note: this future is always `Ready`, because we checked the sink for readiness
258        // before polling the input channel, so await won't block.
259        chan.outbound_chan_tx.start_send_unpin((cell, info))?;
260
261        Ok(())
262    }
263
264    /// Handle a TRUNCATE cell.
265    #[allow(clippy::unused_async)] // TODO(relay)
266    async fn handle_truncate(&mut self) -> StdResult<(), ReactorError> {
267        // TODO(relay): when we implement this, we should try to do better than C Tor:
268        // if we have some cells queued for the next hop in the circuit,
269        // we should try to flush them *before* tearing it down.
270        //
271        // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3487#note_3296035
272        Err(internal!("TRUNCATE is not implemented").into())
273    }
274
275    /// Handle a DESTROY cell originating from the client.
276    #[allow(clippy::needless_pass_by_value)] // TODO(relay)
277    fn handle_destroy_cell(&mut self, _cell: Destroy) -> StdResult<(), ReactorError> {
278        Err(internal!("DESTROY is not implemented").into())
279    }
280
281    /// Handle a PADDING_NEGOTIATE cell originating from the client.
282    #[allow(clippy::needless_pass_by_value)] // TODO(relay)
283    fn handle_padding_negotiate(&mut self, _cell: PaddingNegotiate) -> StdResult<(), ReactorError> {
284        Err(internal!("PADDING_NEGOTIATE is not implemented").into())
285    }
286}
287
288impl ForwardHandler for Forward {
289    type BuildSpec = OwnedChanTarget;
290    type CircChanMsg = RelayCircChanMsg;
291    type CircEvent = CircEvent;
292
293    async fn handle_meta_msg<R: Runtime>(
294        &mut self,
295        runtime: &R,
296        early: bool,
297        _hopnum: Option<HopNum>,
298        msg: UnparsedRelayMsg,
299        _relay_cell_format: RelayCellFormat,
300    ) -> StdResult<(), ReactorError> {
301        match msg.cmd() {
302            RelayCmd::DROP => self.handle_drop(),
303            RelayCmd::EXTEND2 => self.extend_handler.handle_extend2(runtime, early, msg),
304            RelayCmd::TRUNCATE => self.handle_truncate().await,
305            cmd => Err(internal!("relay cmd {cmd} not supported").into()),
306        }
307    }
308
309    async fn handle_forward_cell<R: Runtime>(
310        &mut self,
311        hop_mgr: &mut HopMgr<R>,
312        cell: RelayCircChanMsg,
313    ) -> StdResult<Option<ForwardCellDisposition>, ReactorError> {
314        use RelayCircChanMsg::*;
315
316        match cell {
317            Relay(r) => self.handle_relay_cell(hop_mgr, r, false),
318            RelayEarly(r) => self.handle_relay_cell(hop_mgr, r.into(), true),
319            Destroy(d) => {
320                self.handle_destroy_cell(d)?;
321                Ok(None)
322            }
323            PaddingNegotiate(p) => {
324                self.handle_padding_negotiate(p)?;
325                Ok(None)
326            }
327        }
328    }
329
330    fn handle_event(
331        &mut self,
332        event: Self::CircEvent,
333    ) -> StdResult<Option<BackwardReactorCmd>, ReactorError> {
334        match event {
335            CircEvent::ExtendResult(res) => self.handle_extend_result(res),
336        }
337    }
338
339    async fn outbound_chan_ready(&mut self) -> Result<()> {
340        future::poll_fn(|cx| match &mut self.outbound {
341            Some(chan) => {
342                let _ = chan.outbound_chan_tx.poll_flush_unpin(cx);
343
344                chan.outbound_chan_tx.poll_ready_unpin(cx)
345            }
346            None => {
347                // Pedantically, if the channel doesn't exist, it can't be ready,
348                // but we have no choice here than to return Ready
349                // (returning Pending would cause the reactor to lock up).
350                //
351                // Returning ready here means the base reactor is allowed to read
352                // from its inbound channel. This is OK, because if we *do*
353                // read a cell from that channel and find ourselves needing to
354                // forward it to the next hop, we simply return a proto violation error,
355                // shutting down the reactor.
356                Poll::Ready(Ok(()))
357            }
358        })
359        .await
360    }
361}
362
363impl ControlHandler for Forward {
364    type CtrlMsg = CtrlMsg;
365    type CtrlCmd = CtrlCmd;
366
367    fn handle_cmd(&mut self, cmd: Self::CtrlCmd) -> StdResult<(), ReactorError> {
368        let () = cmd;
369        Ok(())
370    }
371
372    fn handle_msg(&mut self, msg: Self::CtrlMsg) -> StdResult<(), ReactorError> {
373        let () = msg;
374        Ok(())
375    }
376}
377
378impl Drop for Forward {
379    fn drop(&mut self) {
380        if let Some(outbound) = self.outbound.as_mut() {
381            // This will send a DESTROY down the outbound channel
382            let _ = outbound.channel.close_circuit(outbound.circ_id);
383        }
384    }
385}