tor_proto/relay/reactor/forward/extend_handler.rs
1//! Handler for EXTEND2 cells.
2
3use super::{CircEvent, ExtendResult, Outbound};
4
5use crate::Error;
6use crate::circuit::UniqId;
7use crate::circuit::create::{Create2Wrap, CreateHandshakeWrap};
8use crate::peer::PeerInfo;
9use crate::relay::channel_provider::{ChannelProvider, ChannelResult, OutboundChanSender};
10use crate::relay::reactor::CircuitAccount;
11use crate::util::err::ReactorError;
12use tor_cell::chancell::AnyChanCell;
13use tor_cell::relaycell::UnparsedRelayMsg;
14use tor_cell::relaycell::msg::{Extend2, Extended2};
15use tor_error::{internal, into_internal, warn_report};
16use tor_linkspec::decode::Strictness;
17use tor_linkspec::{HasRelayIds, OwnedChanTarget, OwnedChanTargetBuilder};
18use tor_rtcompat::{Runtime, SpawnExt as _};
19
20use futures::channel::mpsc;
21use futures::{SinkExt as _, StreamExt as _};
22use tracing::{debug, trace};
23
24use std::result::Result as StdResult;
25use std::sync::Arc;
26
27/// Helper for handling EXTEND2 cells.
28pub(super) struct ExtendRequestHandler {
29 /// An identifier for logging about this handler.
30 unique_id: UniqId,
31 /// Whether we have received an EXTEND2 on this circuit.
32 ///
33 // TODO(relay): bools can be finicky.
34 // Maybe we should combine this bool and the optional
35 // outbound into a new state machine type
36 // (with states Initial -> Extending -> Extended(Outbound))?
37 // But should not do this if it turns out more convoluted than the bool-based approach.
38 have_seen_extend2: bool,
39 /// A handle to a [`ChannelProvider`], used for initiating outgoing Tor channels.
40 ///
41 /// Note: all circuit reactors of a relay need to be initialized
42 /// with the *same* underlying Tor channel provider (`ChanMgr`),
43 /// to enable the reuse of existing Tor channels where possible.
44 chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
45 /// The identity of the inbound relay (the previous hop).
46 inbound_peer: Arc<PeerInfo>,
47 /// A stream of events to be read from the main loop of the reactor.
48 event_tx: mpsc::Sender<CircEvent>,
49 /// Memory quota account
50 memquota: CircuitAccount,
51}
52
53impl ExtendRequestHandler {
54 /// Create a new [`ExtendRequestHandler`].
55 pub(super) fn new(
56 unique_id: UniqId,
57 chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
58 inbound_peer: Arc<PeerInfo>,
59 event_tx: mpsc::Sender<CircEvent>,
60 memquota: CircuitAccount,
61 ) -> Self {
62 Self {
63 unique_id,
64 have_seen_extend2: false,
65 chan_provider,
66 inbound_peer,
67 event_tx,
68 memquota,
69 }
70 }
71
72 /// Handle an EXTEND2 cell.
73 ///
74 /// This spawns a background task for dealing with the circuit extension,
75 /// which then reports back the result via the [`Self::event_tx`] MPSC stream.
76 /// Note that this MPSC stream is polled from the `ForwardReactor` main loop,
77 /// and each `CircEvent` is passed back to [`Forward`](super::Forward)'s
78 /// [`ForwardHandler::handle_event`](crate::circuit::reactor::forward::ForwardHandler::handle_event)
79 /// implementation for handling.
80 pub(super) fn handle_extend2<R: Runtime>(
81 &mut self,
82 runtime: &R,
83 early: bool,
84 msg: UnparsedRelayMsg,
85 ) -> StdResult<(), ReactorError> {
86 // TODO(relay): this should be allowed if the AllowNonearlyExtend consensus
87 // param is set (arti#2349)
88 if !early {
89 return Err(Error::CircProto("got EXTEND2 in a RELAY cell?!".into()).into());
90 }
91
92 // Check if we're in the right state before parsing the EXTEND2
93 if self.have_seen_extend2 {
94 return Err(Error::CircProto("got 2 EXTEND2 on the same circuit?!".into()).into());
95 }
96
97 self.have_seen_extend2 = true;
98
99 let to_bytes_err = |e| Error::from_bytes_err(e, "EXTEND2 message");
100
101 let extend2 = msg.decode::<Extend2>().map_err(to_bytes_err)?.into_msg();
102
103 let chan_target = OwnedChanTargetBuilder::from_encoded_linkspecs(
104 Strictness::Standard,
105 extend2.linkspecs(),
106 )
107 .map_err(|err| Error::LinkspecDecodeErr {
108 object: "EXTEND2",
109 err,
110 })?
111 .build()
112 .map_err(|_| {
113 // TODO: should we include the error in the circ proto error context?
114 Error::CircProto("Invalid channel target".into())
115 })?;
116
117 if chan_target.has_any_relay_id_from(&*self.inbound_peer) {
118 return Err(Error::CircProto("Cannot extend circuit to previous hop".into()).into());
119 }
120
121 // Note: we don't do any further validation on the EXTEND2 here,
122 // under the assumption it will be handled by the ChannelProvider.
123
124 let (chan_tx, chan_rx) = mpsc::unbounded();
125
126 let chan_tx = OutboundChanSender(chan_tx);
127 Arc::clone(&self.chan_provider).get_or_launch(self.unique_id, chan_target, chan_tx)?;
128
129 let mut result_tx = self.event_tx.clone();
130 let rt = runtime.clone();
131 let unique_id = self.unique_id;
132 let memquota = self.memquota.clone();
133
134 // TODO(relay): because we dispatch this the entire EXTEND2 handling to a background task,
135 // we don't really need the channel provider to send us the outcome via an MPSC channel,
136 // because get_or_launch() could simply be async (it wouldn't block the reactor,
137 // because it runs in another task). Maybe we need to rethink the ChannelProvider API?
138 runtime
139 .spawn(async move {
140 let res = Self::extend_circuit(rt, unique_id, extend2, chan_rx, memquota).await;
141
142 // Discard the error if the reactor shut down before we had
143 // a chance to complete the extend handshake
144 let _ = result_tx.send(CircEvent::ExtendResult(res)).await;
145 })
146 .map_err(into_internal!("failed to spawn extend task?!"))?;
147
148 Ok(())
149 }
150
151 /// Extend this circuit on the channel received on `chan_rx`.
152 ///
153 /// Note: this gets spawned in a background task from
154 /// [`Self::handle_extend2`] so as not to block the reactor main loop.
155 async fn extend_circuit<R: Runtime>(
156 _runtime: R,
157 unique_id: UniqId,
158 extend2: Extend2,
159 mut chan_rx: mpsc::UnboundedReceiver<ChannelResult>,
160 memquota: CircuitAccount,
161 ) -> StdResult<ExtendResult, ReactorError> {
162 // We expect the channel build timeout to be enforced by the ChannelProvider
163 let chan_res = chan_rx
164 .next()
165 .await
166 .ok_or_else(|| internal!("channel provider task exited"))?;
167
168 let channel = match chan_res {
169 Ok(c) => c,
170 Err(e) => {
171 warn_report!(e, "Failed to launch outgoing channel");
172 // Note: retries are handled within
173 // get_or_launch(), so if we receive an
174 // error at this point, we need to bail
175 return Err(ReactorError::Shutdown);
176 }
177 };
178
179 debug!(
180 circ_id = %unique_id,
181 "Launched channel to the next hop"
182 );
183
184 // Now that we finally have a forward Tor channel,
185 // it's time to forward the onion skin and extend the circuit...
186 //
187 // Note: the only reason we need to await here is because internally
188 // new_outbound_circ() sends a control message to the channel reactor handles,
189 // which is handled asynchronously. In practice, we're not actually waiting on
190 // the network here, so in theory we shouldn't need a timeout for this operation.
191 let (circ_id, outbound_chan_rx, createdreceiver) =
192 channel.new_outbound_circ(memquota).await?;
193
194 // We have allocated a circuit in the channel's circmap,
195 // now it's time to send the CREATE2 and wait for the response.
196 let create2_wrap = Create2Wrap {
197 handshake_type: extend2.handshake_type(),
198 };
199 let create2 = create2_wrap.to_chanmsg(extend2.handshake().into());
200
201 // Time to write the CREATE2 to the outbound channel...
202 let mut outbound_chan_tx = channel.sender();
203 let cell = AnyChanCell::new(Some(circ_id), create2);
204
205 trace!(
206 circ_id = %unique_id,
207 "Sending CREATE2 to the next hop"
208 );
209
210 outbound_chan_tx.send((cell, None)).await?;
211
212 // TODO(relay): we need a timeout here, otherwise we might end up waiting forever
213 // for the CREATED2 to arrive.
214 //
215 // There is some complexity here, see
216 // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3648#note_3340125
217 let response = createdreceiver
218 .await
219 .map_err(|_| internal!("channel disappeared?"))?;
220
221 trace!(
222 circ_id = %unique_id,
223 "Got CREATED2 response from next hop"
224 );
225
226 let outbound = Outbound {
227 circ_id,
228 channel: Arc::clone(&channel),
229 outbound_chan_tx,
230 };
231
232 // If we reach this point, it means we have extended
233 // the circuit by one hop, so we need to take the contents
234 // of the CREATE/CREATED2 cell, and package an EXTEND/EXTENDED2
235 // to send back to the client.
236 let created2_body = create2_wrap.decode_chanmsg(response)?;
237 let extended2 = Extended2::new(created2_body);
238
239 Ok(ExtendResult {
240 extended2,
241 outbound,
242 outbound_chan_rx,
243 })
244 }
245}