Skip to main content

tor_proto/relay/reactor/forward/
extend_handler.rs

1//! Handler for EXTEND2 cells.
2
3use super::{CircEvent, ExtendResult, Outbound};
4
5use crate::Error;
6use crate::circuit::UniqId;
7use crate::circuit::create::{Create2Wrap, CreateHandshakeWrap};
8use crate::peer::PeerInfo;
9use crate::relay::channel_provider::{ChannelProvider, ChannelResult, OutboundChanSender};
10use crate::relay::reactor::CircuitAccount;
11use crate::util::err::ReactorError;
12use tor_cell::chancell::AnyChanCell;
13use tor_cell::relaycell::UnparsedRelayMsg;
14use tor_cell::relaycell::msg::{Extend2, Extended2};
15use tor_error::{internal, into_internal, warn_report};
16use tor_linkspec::decode::Strictness;
17use tor_linkspec::{HasRelayIds, OwnedChanTarget, OwnedChanTargetBuilder};
18use tor_rtcompat::{Runtime, SpawnExt as _};
19
20use futures::channel::mpsc;
21use futures::{SinkExt as _, StreamExt as _};
22use tracing::{debug, trace};
23
24use std::result::Result as StdResult;
25use std::sync::Arc;
26
27/// Helper for handling EXTEND2 cells.
28pub(super) struct ExtendRequestHandler {
29    /// An identifier for logging about this handler.
30    unique_id: UniqId,
31    /// Whether we have received an EXTEND2 on this circuit.
32    ///
33    // TODO(relay): bools can be finicky.
34    // Maybe we should combine this bool and the optional
35    // outbound into a new state machine type
36    // (with states Initial -> Extending -> Extended(Outbound))?
37    // But should not do this if it turns out more convoluted than the bool-based approach.
38    have_seen_extend2: bool,
39    /// A handle to a [`ChannelProvider`], used for initiating outgoing Tor channels.
40    ///
41    /// Note: all circuit reactors of a relay need to be initialized
42    /// with the *same* underlying Tor channel provider (`ChanMgr`),
43    /// to enable the reuse of existing Tor channels where possible.
44    chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
45    /// The identity of the inbound relay (the previous hop).
46    inbound_peer: Arc<PeerInfo>,
47    /// A stream of events to be read from the main loop of the reactor.
48    event_tx: mpsc::Sender<CircEvent>,
49    /// Memory quota account
50    memquota: CircuitAccount,
51}
52
53impl ExtendRequestHandler {
54    /// Create a new [`ExtendRequestHandler`].
55    pub(super) fn new(
56        unique_id: UniqId,
57        chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
58        inbound_peer: Arc<PeerInfo>,
59        event_tx: mpsc::Sender<CircEvent>,
60        memquota: CircuitAccount,
61    ) -> Self {
62        Self {
63            unique_id,
64            have_seen_extend2: false,
65            chan_provider,
66            inbound_peer,
67            event_tx,
68            memquota,
69        }
70    }
71
72    /// Handle an EXTEND2 cell.
73    ///
74    /// This spawns a background task for dealing with the circuit extension,
75    /// which then reports back the result via the [`Self::event_tx`] MPSC stream.
76    /// Note that this MPSC stream is polled from the `ForwardReactor` main loop,
77    /// and each `CircEvent` is passed back to [`Forward`](super::Forward)'s
78    /// [`ForwardHandler::handle_event`](crate::circuit::reactor::forward::ForwardHandler::handle_event)
79    /// implementation for handling.
80    pub(super) fn handle_extend2<R: Runtime>(
81        &mut self,
82        runtime: &R,
83        early: bool,
84        msg: UnparsedRelayMsg,
85    ) -> StdResult<(), ReactorError> {
86        // TODO(relay): this should be allowed if the AllowNonearlyExtend consensus
87        // param is set (arti#2349)
88        if !early {
89            return Err(Error::CircProto("got EXTEND2 in a RELAY cell?!".into()).into());
90        }
91
92        // Check if we're in the right state before parsing the EXTEND2
93        if self.have_seen_extend2 {
94            return Err(Error::CircProto("got 2 EXTEND2 on the same circuit?!".into()).into());
95        }
96
97        self.have_seen_extend2 = true;
98
99        let to_bytes_err = |e| Error::from_bytes_err(e, "EXTEND2 message");
100
101        let extend2 = msg.decode::<Extend2>().map_err(to_bytes_err)?.into_msg();
102
103        let chan_target = OwnedChanTargetBuilder::from_encoded_linkspecs(
104            Strictness::Standard,
105            extend2.linkspecs(),
106        )
107        .map_err(|err| Error::LinkspecDecodeErr {
108            object: "EXTEND2",
109            err,
110        })?
111        .build()
112        .map_err(|_| {
113            // TODO: should we include the error in the circ proto error context?
114            Error::CircProto("Invalid channel target".into())
115        })?;
116
117        if chan_target.has_any_relay_id_from(&*self.inbound_peer) {
118            return Err(Error::CircProto("Cannot extend circuit to previous hop".into()).into());
119        }
120
121        // Note: we don't do any further validation on the EXTEND2 here,
122        // under the assumption it will be handled by the ChannelProvider.
123
124        let (chan_tx, chan_rx) = mpsc::unbounded();
125
126        let chan_tx = OutboundChanSender(chan_tx);
127        Arc::clone(&self.chan_provider).get_or_launch(self.unique_id, chan_target, chan_tx)?;
128
129        let mut result_tx = self.event_tx.clone();
130        let rt = runtime.clone();
131        let unique_id = self.unique_id;
132        let memquota = self.memquota.clone();
133
134        // TODO(relay): because we dispatch this the entire EXTEND2 handling to a background task,
135        // we don't really need the channel provider to send us the outcome via an MPSC channel,
136        // because get_or_launch() could simply be async (it wouldn't block the reactor,
137        // because it runs in another task). Maybe we need to rethink the ChannelProvider API?
138        runtime
139            .spawn(async move {
140                let res = Self::extend_circuit(rt, unique_id, extend2, chan_rx, memquota).await;
141
142                // Discard the error if the reactor shut down before we had
143                // a chance to complete the extend handshake
144                let _ = result_tx.send(CircEvent::ExtendResult(res)).await;
145            })
146            .map_err(into_internal!("failed to spawn extend task?!"))?;
147
148        Ok(())
149    }
150
151    /// Extend this circuit on the channel received on `chan_rx`.
152    ///
153    /// Note: this gets spawned in a background task from
154    /// [`Self::handle_extend2`] so as not to block the reactor main loop.
155    async fn extend_circuit<R: Runtime>(
156        _runtime: R,
157        unique_id: UniqId,
158        extend2: Extend2,
159        mut chan_rx: mpsc::UnboundedReceiver<ChannelResult>,
160        memquota: CircuitAccount,
161    ) -> StdResult<ExtendResult, ReactorError> {
162        // We expect the channel build timeout to be enforced by the ChannelProvider
163        let chan_res = chan_rx
164            .next()
165            .await
166            .ok_or_else(|| internal!("channel provider task exited"))?;
167
168        let channel = match chan_res {
169            Ok(c) => c,
170            Err(e) => {
171                warn_report!(e, "Failed to launch outgoing channel");
172                // Note: retries are handled within
173                // get_or_launch(), so if we receive an
174                // error at this point, we need to bail
175                return Err(ReactorError::Shutdown);
176            }
177        };
178
179        debug!(
180            circ_id = %unique_id,
181            "Launched channel to the next hop"
182        );
183
184        // Now that we finally have a forward Tor channel,
185        // it's time to forward the onion skin and extend the circuit...
186        //
187        // Note: the only reason we need to await here is because internally
188        // new_outbound_circ() sends a control message to the channel reactor handles,
189        // which is handled asynchronously. In practice, we're not actually waiting on
190        // the network here, so in theory we shouldn't need a timeout for this operation.
191        let (circ_id, outbound_chan_rx, createdreceiver) =
192            channel.new_outbound_circ(memquota).await?;
193
194        // We have allocated a circuit in the channel's circmap,
195        // now it's time to send the CREATE2 and wait for the response.
196        let create2_wrap = Create2Wrap {
197            handshake_type: extend2.handshake_type(),
198        };
199        let create2 = create2_wrap.to_chanmsg(extend2.handshake().into());
200
201        // Time to write the CREATE2 to the outbound channel...
202        let mut outbound_chan_tx = channel.sender();
203        let cell = AnyChanCell::new(Some(circ_id), create2);
204
205        trace!(
206            circ_id = %unique_id,
207            "Sending CREATE2 to the next hop"
208        );
209
210        outbound_chan_tx.send((cell, None)).await?;
211
212        // TODO(relay): we need a timeout here, otherwise we might end up waiting forever
213        // for the CREATED2 to arrive.
214        //
215        // There is some complexity here, see
216        // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3648#note_3340125
217        let response = createdreceiver
218            .await
219            .map_err(|_| internal!("channel disappeared?"))?;
220
221        trace!(
222            circ_id = %unique_id,
223            "Got CREATED2 response from next hop"
224        );
225
226        let outbound = Outbound {
227            circ_id,
228            channel: Arc::clone(&channel),
229            outbound_chan_tx,
230        };
231
232        // If we reach this point, it means we have extended
233        // the circuit by one hop, so we need to take the contents
234        // of the CREATE/CREATED2 cell, and package an EXTEND/EXTENDED2
235        // to send back to the client.
236        let created2_body = create2_wrap.decode_chanmsg(response)?;
237        let extended2 = Extended2::new(created2_body);
238
239        Ok(ExtendResult {
240            extended2,
241            outbound,
242            outbound_chan_rx,
243        })
244    }
245}