Skip to main content

tor_proto/relay/channel/
create_handler.rs

1//! Handler for CREATE* cells.
2
3use crate::FlowCtrlParameters;
4use crate::ccparams::{
5    Algorithm, AlgorithmDiscriminants, CongestionControlParams, CongestionWindowParams,
6    FixedWindowParams, RoundTripEstimatorParams, VegasParams,
7};
8use crate::channel::Channel;
9use crate::circuit::CircuitRxSender;
10use crate::circuit::UniqId;
11use crate::circuit::celltypes::{CreateRequest, CreateResponse};
12use crate::circuit::circhop::{HopNegotiationType, HopSettings};
13use crate::client::circuit::CircParameters;
14use crate::client::circuit::padding::PaddingController;
15use crate::crypto::cell::CryptInit as _;
16use crate::crypto::cell::RelayLayer as _;
17use crate::crypto::cell::{InboundRelayLayer, OutboundRelayLayer, tor1};
18use crate::crypto::handshake::RelayHandshakeError;
19use crate::crypto::handshake::ServerHandshake as _;
20use crate::crypto::handshake::fast::CreateFastServer;
21use crate::memquota::SpecificAccount as _;
22use crate::memquota::{ChannelAccount, CircuitAccount};
23use crate::relay::RelayCirc;
24use crate::relay::channel_provider::ChannelProvider;
25use crate::relay::reactor::Reactor;
26use std::sync::{Arc, RwLock, Weak};
27use tor_cell::chancell::ChanMsg as _;
28use tor_cell::chancell::CircId;
29use tor_cell::chancell::msg::{CreateFast, CreatedFast, Destroy, DestroyReason};
30use tor_error::{Bug, ErrorKind, HasKind, debug_report, internal, into_internal};
31use tor_linkspec::OwnedChanTarget;
32use tor_llcrypto::cipher::aes::Aes128Ctr;
33use tor_llcrypto::d::Sha1;
34use tor_llcrypto::pk::ed25519::Ed25519Identity;
35use tor_llcrypto::pk::rsa::RsaIdentity;
36use tor_memquota::mq_queue::ChannelSpec as _;
37use tor_memquota::mq_queue::MpscSpec;
38use tor_relay_crypto::pk::RelayNtorKeys;
39use tor_rtcompat::SpawnExt as _;
40use tor_rtcompat::{DynTimeProvider, Runtime};
41use tracing::warn;
42
43/// Everything needed to handle CREATE* messages on channels.
44#[derive(derive_more::Debug)]
45pub struct CreateRequestHandler {
46    /// Something that can launch channels. Typically the `ChanMgr`.
47    chan_provider: Weak<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
48    /// Circuit-related network parameters.
49    circ_net_params: RwLock<CircNetParameters>,
50    /// The circuit extension keys.
51    #[debug(skip)]
52    ntor_keys: RwLock<RelayNtorKeys>,
53}
54
55impl CreateRequestHandler {
56    /// Build a new [`CreateRequestHandler`].
57    pub fn new(
58        chan_provider: Weak<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
59        circ_net_params: CircNetParameters,
60        ntor_keys: RelayNtorKeys,
61    ) -> Self {
62        Self {
63            chan_provider,
64            circ_net_params: RwLock::new(circ_net_params),
65            ntor_keys: RwLock::new(ntor_keys),
66        }
67    }
68
69    /// Update the circuit parameters from a network consensus.
70    pub fn update_params(&self, circ_net_params: CircNetParameters) {
71        *self.circ_net_params.write().expect("rwlock poisoned") = circ_net_params;
72    }
73
74    /// Update the handler with a new set of circuit extension keys.
75    ///
76    /// This is called periodically by the relay key rotation task.
77    pub fn update_ntor_keys(&self, ntor_keys: RelayNtorKeys) {
78        *self.ntor_keys.write().expect("rwlock poisoned") = ntor_keys;
79    }
80
81    /// Handle a CREATE* cell.
82    ///
83    /// This intentionally does not return a [`crate::Error`] so that we don't accidentally shut
84    /// down the channel reactor when we really should be returning a DESTROY. Shutting down a
85    /// channel may cause us to leak information about paths of circuits travelling through this
86    /// relay. This is especially important here since we're handling data that is controllable from
87    /// the other end of the circuit.
88    #[allow(clippy::too_many_arguments)]
89    pub(crate) fn handle_create<R: Runtime>(
90        &self,
91        runtime: &R,
92        channel: &Arc<Channel>,
93        our_ed25519_id: &Ed25519Identity,
94        our_rsa_id: &RsaIdentity,
95        circ_id: CircId,
96        msg: &CreateRequest,
97        memquota: &ChannelAccount,
98        circ_unique_id: UniqId,
99    ) -> Result<(CreateResponse, RelayCircComponents), Destroy> {
100        let result = self.handle_create_inner(
101            runtime,
102            channel,
103            our_ed25519_id,
104            our_rsa_id,
105            circ_id,
106            msg,
107            memquota,
108            circ_unique_id,
109        );
110
111        match result {
112            Ok(x) => Ok(x),
113            Err(e) => {
114                // TODO(relay): The log messages throughout could be very noisy, so should have rate limiting.
115                let cmd = msg.cmd();
116                debug_report!(&e, %cmd, "Failed to handle circuit create request");
117                Err(Destroy::new(e.destroy_reason()))
118            }
119        }
120    }
121
122    /// See [`Self::handle_create`].
123    #[allow(clippy::too_many_arguments)]
124    fn handle_create_inner<R: Runtime>(
125        &self,
126        runtime: &R,
127        channel: &Arc<Channel>,
128        // TODO(relay): Use these for ntor handshakes.
129        _our_ed25519_id: &Ed25519Identity,
130        _our_rsa_id: &RsaIdentity,
131        circ_id: CircId,
132        msg: &CreateRequest,
133        memquota: &ChannelAccount,
134        circ_unique_id: UniqId,
135    ) -> Result<(CreateResponse, RelayCircComponents), HandleCreateError> {
136        // Perform the handshake crypto and build the response.
137        let handshake_components = match msg {
138            CreateRequest::CreateFast(msg) => self.handle_create_fast(msg)?,
139            CreateRequest::Create2(_) => {
140                // TODO(relay): We might want to offload this to a CPU worker in the future.
141                // TODO(relay): Implement this.
142                return Err(internal!("Not implemented").into());
143            }
144        };
145
146        let memquota = CircuitAccount::new(memquota)?;
147
148        // We use a large mpsc queue here since a circuit should never block the channel,
149        // and we hope that memquota will help us if an attacker intentionally fills this buffer.
150        // We use `10_000_000` since `usize::MAX` causes `futures::channel::mpsc` to panic.
151        // TODO(relay): We should switch to an unbounded queue, but the circuit reactor is expecting
152        // a bounded queue.
153        let time_provider = DynTimeProvider::new(runtime.clone());
154        let account = memquota.as_raw_account();
155        let (sender, receiver) = MpscSpec::new(10_000_000).new_mq(time_provider, account)?;
156
157        // TODO(relay): Do we really want a client padding machine here?
158        let (padding_ctrl, padding_stream) =
159            crate::client::circuit::padding::new_padding(DynTimeProvider::new(runtime.clone()));
160
161        // Upgrade the channel provider, which in practice is the `ChanMgr` so this should not fail.
162        let Some(chan_provider) = self.chan_provider.upgrade() else {
163            return Err(internal!("Unable to upgrade weak `ChannelProvider`").into());
164        };
165
166        // Build the relay circuit reactor.
167        let (reactor, circ) = Reactor::new(
168            runtime.clone(),
169            channel,
170            circ_id,
171            circ_unique_id,
172            receiver,
173            handshake_components.crypto_in,
174            handshake_components.crypto_out,
175            &handshake_components.hop_settings,
176            chan_provider,
177            padding_ctrl.clone(),
178            padding_stream,
179            &memquota,
180        )
181        .map_err(into_internal!("Failed to start circuit reactor"))?;
182
183        // Start the reactor in a task.
184        let () = runtime.spawn(async {
185            match reactor.run().await {
186                Ok(()) => {}
187                Err(e) => {
188                    debug_report!(e, "Relay circuit reactor exited with an error");
189                }
190            }
191        })?;
192
193        Ok((
194            handshake_components.response,
195            RelayCircComponents {
196                circ,
197                sender,
198                padding_ctrl,
199            },
200        ))
201    }
202
203    /// The handshake code for a CREATE_FAST request.
204    fn handle_create_fast(
205        &self,
206        msg: &CreateFast,
207    ) -> Result<CompletedHandshakeComponents, HandleCreateError> {
208        // TODO(relay): We might want to offload this to a CPU worker in the future.
209        let (keygen, handshake_msg) = CreateFastServer::server(
210            &mut rand::rng(),
211            &mut |_: &()| Some(()),
212            &[()],
213            msg.handshake(),
214        )?;
215
216        let crypt = tor1::CryptStatePair::<Aes128Ctr, Sha1>::construct(keygen)
217            .map_err(into_internal!("Circuit crypt state construction failed"))?;
218
219        let circ_params = self
220            .circ_net_params
221            .read()
222            .expect("rwlock poisoned")
223            // CREATE_FAST always uses fixed-window flow control.
224            .as_circ_parameters(AlgorithmDiscriminants::FixedWindow)?;
225
226        // TODO(relay): I think we might want to get these from the consensus instead?
227        let protos = tor_protover::Protocols::default();
228
229        // TODO(relay): I'm not sure if this is the right way to do this. It works for
230        // CREATE_FAST, but we might want to rethink it for CREATE2.
231        let hop_settings =
232            HopSettings::from_params_and_caps(HopNegotiationType::None, &circ_params, &protos)
233                .map_err(into_internal!("Unable to build `HopSettings`"))?;
234
235        let response = CreatedFast::new(handshake_msg);
236        let response = CreateResponse::CreatedFast(response);
237
238        let (crypto_out, crypto_in, _binding) = crypt.split_relay_layer();
239        let (crypto_out, crypto_in) = (Box::new(crypto_out), Box::new(crypto_in));
240
241        Ok(CompletedHandshakeComponents {
242            response,
243            hop_settings,
244            crypto_out,
245            crypto_in,
246        })
247    }
248}
249
250/// An error that occurred while handling a CREATE* request.
251#[derive(Debug, thiserror::Error)]
252enum HandleCreateError {
253    /// Circuit relay handshake failed.
254    #[error("Circuit relay handshake failed")]
255    Handshake(#[from] RelayHandshakeError),
256    /// A memquota error.
257    #[error("Memquota error")]
258    Memquota(#[from] tor_memquota::Error),
259    /// Error when spawning a task.
260    #[error("Runtime task spawn error")]
261    Spawn(#[from] futures::task::SpawnError),
262    /// An internal error.
263    ///
264    /// Note that other variants (such as `Handshake` containing a [`RelayHandshakeError`])
265    /// may themselves contain internal errors.
266    #[error("Internal error")]
267    Internal(#[from] tor_error::Bug),
268}
269
270impl HandleCreateError {
271    /// The reason to use in a DESTROY message for this failure.
272    fn destroy_reason(&self) -> DestroyReason {
273        // Note that this may return an INTERNAL destroy reason even when
274        // the inner error is not `ErrorKind::Internal`.
275        match self {
276            Self::Handshake(e) => e.destroy_reason(),
277            Self::Memquota(_) => DestroyReason::INTERNAL,
278            Self::Spawn(_) => DestroyReason::INTERNAL,
279            Self::Internal(_) => DestroyReason::INTERNAL,
280        }
281    }
282}
283
284impl HasKind for HandleCreateError {
285    fn kind(&self) -> ErrorKind {
286        match self {
287            Self::Handshake(e) => e.kind(),
288            Self::Memquota(e) => e.kind(),
289            Self::Spawn(e) => e.kind(),
290            Self::Internal(_) => ErrorKind::Internal,
291        }
292    }
293}
294
295/// The components of a completed CREATE* handshake.
296struct CompletedHandshakeComponents {
297    /// The message to send in response.
298    response: CreateResponse,
299    /// The negotiated hop settings.
300    hop_settings: HopSettings,
301    /// Outbound onion crypto.
302    crypto_out: Box<dyn OutboundRelayLayer + Send>,
303    /// Inbound onion crypto.
304    crypto_in: Box<dyn InboundRelayLayer + Send>,
305}
306
307/// A collection of objects built for a new relay circuit.
308pub(crate) struct RelayCircComponents {
309    /// The relay circuit handle.
310    pub(crate) circ: Arc<RelayCirc>,
311    /// Used to send data from the channel to the circuit reactor.
312    pub(crate) sender: CircuitRxSender,
313    /// The circuit's padding controller.
314    pub(crate) padding_ctrl: PaddingController,
315}
316
317/// Congestion control network parameters.
318#[derive(Debug, Clone)]
319#[allow(clippy::exhaustive_structs)]
320pub struct CongestionControlNetParams {
321    /// Fixed-window algorithm parameters.
322    pub fixed_window: FixedWindowParams,
323
324    /// Vegas algorithm parameters for exit circuits.
325    // NOTE: In this module we are handling CREATE* cells,
326    // which only happens for non-hs circuits.
327    // So we don't need to store the vegas hs parameters here.
328    pub vegas_exit: VegasParams,
329
330    /// Congestion window parameters.
331    pub cwnd: CongestionWindowParams,
332
333    /// RTT calculation parameters.
334    pub rtt: RoundTripEstimatorParams,
335
336    /// Flow control parameters to use for all streams on this circuit.
337    pub flow_ctrl: FlowCtrlParameters,
338}
339
340impl CongestionControlNetParams {
341    #[cfg(test)]
342    // These have been copied from C-tor.
343    pub(crate) fn defaults_for_tests() -> Self {
344        Self {
345            fixed_window: FixedWindowParams::defaults_for_tests(),
346            vegas_exit: VegasParams::defaults_for_tests(),
347            cwnd: CongestionWindowParams::defaults_for_tests(),
348            rtt: RoundTripEstimatorParams::defaults_for_tests(),
349            flow_ctrl: FlowCtrlParameters::defaults_for_tests(),
350        }
351    }
352}
353
354/// Network consensus parameters for handling incoming circuits.
355///
356/// Unlike `CircParameters`,
357/// this is unopinionated and contains all relevant consensus parameters,
358/// which is needed when handling an incoming CREATE* request where the
359/// circuit origin chooses the type/settings
360/// (for example congestion control type) of the circuit.
361#[derive(Debug, Clone)]
362#[allow(clippy::exhaustive_structs)]
363pub struct CircNetParameters {
364    /// Whether we should include ed25519 identities when we send EXTEND2 cells.
365    pub extend_by_ed25519_id: bool,
366
367    /// Congestion control network parameters.
368    pub cc: CongestionControlNetParams,
369}
370
371impl CircNetParameters {
372    /// Convert the [`CircNetParameters`] into a [`CircParameters`].
373    ///
374    /// We expect the circuit creation handshake to know what congestion control algorithm was
375    /// negotiated, and provide that as `algorithm`.
376    //
377    // We disable `unused` warnings at the root of tor-proto,
378    // but it's nice to have here so we re-enable it.
379    #[warn(unused)]
380    fn as_circ_parameters(&self, algorithm: AlgorithmDiscriminants) -> Result<CircParameters, Bug> {
381        // Unpack everything to make sure that we aren't missing anything
382        // (otherwise clippy would warn).
383        let Self {
384            extend_by_ed25519_id,
385            cc:
386                CongestionControlNetParams {
387                    fixed_window,
388                    vegas_exit,
389                    cwnd,
390                    rtt,
391                    flow_ctrl,
392                },
393        } = self;
394
395        let algorithm = match algorithm {
396            AlgorithmDiscriminants::FixedWindow => Algorithm::FixedWindow(*fixed_window),
397            AlgorithmDiscriminants::Vegas => Algorithm::Vegas(*vegas_exit),
398        };
399
400        // TODO(arti#2442): The builder pattern here seems like a footgun.
401        let cc = CongestionControlParams::builder()
402            .alg(algorithm)
403            .fixed_window_params(*fixed_window)
404            .cwnd_params(*cwnd)
405            .rtt_params(rtt.clone())
406            .build()
407            .map_err(into_internal!("Could not build `CongestionControlParams`"))?;
408
409        Ok(CircParameters::new(
410            *extend_by_ed25519_id,
411            cc,
412            flow_ctrl.clone(),
413        ))
414    }
415}