1use crate::FlowCtrlParameters;
4use crate::ccparams::{
5 Algorithm, AlgorithmDiscriminants, CongestionControlParams, CongestionWindowParams,
6 FixedWindowParams, RoundTripEstimatorParams, VegasParams,
7};
8use crate::channel::Channel;
9use crate::circuit::CircuitRxSender;
10use crate::circuit::UniqId;
11use crate::circuit::celltypes::{CreateRequest, CreateResponse};
12use crate::circuit::circhop::{HopNegotiationType, HopSettings};
13use crate::client::circuit::CircParameters;
14use crate::client::circuit::padding::PaddingController;
15use crate::crypto::cell::CryptInit as _;
16use crate::crypto::cell::RelayLayer as _;
17use crate::crypto::cell::{InboundRelayLayer, OutboundRelayLayer, tor1};
18use crate::crypto::handshake::RelayHandshakeError;
19use crate::crypto::handshake::ServerHandshake as _;
20use crate::crypto::handshake::fast::CreateFastServer;
21use crate::memquota::SpecificAccount as _;
22use crate::memquota::{ChannelAccount, CircuitAccount};
23use crate::relay::RelayCirc;
24use crate::relay::channel_provider::ChannelProvider;
25use crate::relay::reactor::Reactor;
26use std::sync::{Arc, RwLock, Weak};
27use tor_cell::chancell::ChanMsg as _;
28use tor_cell::chancell::CircId;
29use tor_cell::chancell::msg::{CreateFast, CreatedFast, Destroy, DestroyReason};
30use tor_error::{Bug, ErrorKind, HasKind, debug_report, internal, into_internal};
31use tor_linkspec::OwnedChanTarget;
32use tor_llcrypto::cipher::aes::Aes128Ctr;
33use tor_llcrypto::d::Sha1;
34use tor_llcrypto::pk::ed25519::Ed25519Identity;
35use tor_llcrypto::pk::rsa::RsaIdentity;
36use tor_memquota::mq_queue::ChannelSpec as _;
37use tor_memquota::mq_queue::MpscSpec;
38use tor_relay_crypto::pk::RelayNtorKeys;
39use tor_rtcompat::SpawnExt as _;
40use tor_rtcompat::{DynTimeProvider, Runtime};
41use tracing::warn;
42
/// State needed to answer circuit create requests ([`CreateRequest`]) on relay channels.
///
/// The handler is driven through [`CreateRequestHandler::handle_create`]. Its
/// parameters and keys sit behind [`RwLock`]s so that they can be replaced
/// through a shared reference (see `update_params` and `update_ntor_keys`).
#[derive(derive_more::Debug)]
pub struct CreateRequestHandler {
    /// Weak handle to the channel provider that is handed to every new circuit reactor.
    ///
    /// It is upgraded each time a circuit is created; if the provider has
    /// already been dropped, circuit creation fails with an internal error.
    chan_provider: Weak<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
    /// Network parameters from which the per-circuit [`CircParameters`] are derived.
    circ_net_params: RwLock<CircNetParameters>,
    /// The relay's ntor keys (left out of the `Debug` output).
    ///
    /// NOTE(review): nothing in this file reads these keys yet; presumably they
    /// are meant for the not-yet-implemented `Create2` handshake -- confirm.
    #[debug(skip)]
    ntor_keys: RwLock<RelayNtorKeys>,
}
54
55impl CreateRequestHandler {
56 pub fn new(
58 chan_provider: Weak<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
59 circ_net_params: CircNetParameters,
60 ntor_keys: RelayNtorKeys,
61 ) -> Self {
62 Self {
63 chan_provider,
64 circ_net_params: RwLock::new(circ_net_params),
65 ntor_keys: RwLock::new(ntor_keys),
66 }
67 }
68
69 pub fn update_params(&self, circ_net_params: CircNetParameters) {
71 *self.circ_net_params.write().expect("rwlock poisoned") = circ_net_params;
72 }
73
74 pub fn update_ntor_keys(&self, ntor_keys: RelayNtorKeys) {
78 *self.ntor_keys.write().expect("rwlock poisoned") = ntor_keys;
79 }
80
81 #[allow(clippy::too_many_arguments)]
89 pub(crate) fn handle_create<R: Runtime>(
90 &self,
91 runtime: &R,
92 channel: &Arc<Channel>,
93 our_ed25519_id: &Ed25519Identity,
94 our_rsa_id: &RsaIdentity,
95 circ_id: CircId,
96 msg: &CreateRequest,
97 memquota: &ChannelAccount,
98 circ_unique_id: UniqId,
99 ) -> Result<(CreateResponse, RelayCircComponents), Destroy> {
100 let result = self.handle_create_inner(
101 runtime,
102 channel,
103 our_ed25519_id,
104 our_rsa_id,
105 circ_id,
106 msg,
107 memquota,
108 circ_unique_id,
109 );
110
111 match result {
112 Ok(x) => Ok(x),
113 Err(e) => {
114 let cmd = msg.cmd();
116 debug_report!(&e, %cmd, "Failed to handle circuit create request");
117 Err(Destroy::new(e.destroy_reason()))
118 }
119 }
120 }
121
122 #[allow(clippy::too_many_arguments)]
124 fn handle_create_inner<R: Runtime>(
125 &self,
126 runtime: &R,
127 channel: &Arc<Channel>,
128 _our_ed25519_id: &Ed25519Identity,
130 _our_rsa_id: &RsaIdentity,
131 circ_id: CircId,
132 msg: &CreateRequest,
133 memquota: &ChannelAccount,
134 circ_unique_id: UniqId,
135 ) -> Result<(CreateResponse, RelayCircComponents), HandleCreateError> {
136 let handshake_components = match msg {
138 CreateRequest::CreateFast(msg) => self.handle_create_fast(msg)?,
139 CreateRequest::Create2(_) => {
140 return Err(internal!("Not implemented").into());
143 }
144 };
145
146 let memquota = CircuitAccount::new(memquota)?;
147
148 let time_provider = DynTimeProvider::new(runtime.clone());
154 let account = memquota.as_raw_account();
155 let (sender, receiver) = MpscSpec::new(10_000_000).new_mq(time_provider, account)?;
156
157 let (padding_ctrl, padding_stream) =
159 crate::client::circuit::padding::new_padding(DynTimeProvider::new(runtime.clone()));
160
161 let Some(chan_provider) = self.chan_provider.upgrade() else {
163 return Err(internal!("Unable to upgrade weak `ChannelProvider`").into());
164 };
165
166 let (reactor, circ) = Reactor::new(
168 runtime.clone(),
169 channel,
170 circ_id,
171 circ_unique_id,
172 receiver,
173 handshake_components.crypto_in,
174 handshake_components.crypto_out,
175 &handshake_components.hop_settings,
176 chan_provider,
177 padding_ctrl.clone(),
178 padding_stream,
179 &memquota,
180 )
181 .map_err(into_internal!("Failed to start circuit reactor"))?;
182
183 let () = runtime.spawn(async {
185 match reactor.run().await {
186 Ok(()) => {}
187 Err(e) => {
188 debug_report!(e, "Relay circuit reactor exited with an error");
189 }
190 }
191 })?;
192
193 Ok((
194 handshake_components.response,
195 RelayCircComponents {
196 circ,
197 sender,
198 padding_ctrl,
199 },
200 ))
201 }
202
203 fn handle_create_fast(
205 &self,
206 msg: &CreateFast,
207 ) -> Result<CompletedHandshakeComponents, HandleCreateError> {
208 let (keygen, handshake_msg) = CreateFastServer::server(
210 &mut rand::rng(),
211 &mut |_: &()| Some(()),
212 &[()],
213 msg.handshake(),
214 )?;
215
216 let crypt = tor1::CryptStatePair::<Aes128Ctr, Sha1>::construct(keygen)
217 .map_err(into_internal!("Circuit crypt state construction failed"))?;
218
219 let circ_params = self
220 .circ_net_params
221 .read()
222 .expect("rwlock poisoned")
223 .as_circ_parameters(AlgorithmDiscriminants::FixedWindow)?;
225
226 let protos = tor_protover::Protocols::default();
228
229 let hop_settings =
232 HopSettings::from_params_and_caps(HopNegotiationType::None, &circ_params, &protos)
233 .map_err(into_internal!("Unable to build `HopSettings`"))?;
234
235 let response = CreatedFast::new(handshake_msg);
236 let response = CreateResponse::CreatedFast(response);
237
238 let (crypto_out, crypto_in, _binding) = crypt.split_relay_layer();
239 let (crypto_out, crypto_in) = (Box::new(crypto_out), Box::new(crypto_in));
240
241 Ok(CompletedHandshakeComponents {
242 response,
243 hop_settings,
244 crypto_out,
245 crypto_in,
246 })
247 }
248}
249
/// An error that occurred while handling a circuit create request.
///
/// Each variant wraps its source error and is built from it through `From`
/// (via `#[from]`), so the `?` operator can be used on the underlying results.
#[derive(Debug, thiserror::Error)]
enum HandleCreateError {
    /// The relay handshake failed.
    #[error("Circuit relay handshake failed")]
    Handshake(#[from] RelayHandshakeError),
    /// The memory-quota subsystem returned an error.
    #[error("Memquota error")]
    Memquota(#[from] tor_memquota::Error),
    /// The runtime could not spawn a task.
    #[error("Runtime task spawn error")]
    Spawn(#[from] futures::task::SpawnError),
    /// An internal error (bug).
    #[error("Internal error")]
    Internal(#[from] tor_error::Bug),
}
269
270impl HandleCreateError {
271 fn destroy_reason(&self) -> DestroyReason {
273 match self {
276 Self::Handshake(e) => e.destroy_reason(),
277 Self::Memquota(_) => DestroyReason::INTERNAL,
278 Self::Spawn(_) => DestroyReason::INTERNAL,
279 Self::Internal(_) => DestroyReason::INTERNAL,
280 }
281 }
282}
283
284impl HasKind for HandleCreateError {
285 fn kind(&self) -> ErrorKind {
286 match self {
287 Self::Handshake(e) => e.kind(),
288 Self::Memquota(e) => e.kind(),
289 Self::Spawn(e) => e.kind(),
290 Self::Internal(_) => ErrorKind::Internal,
291 }
292 }
293}
294
/// Everything produced by a successfully completed circuit handshake.
struct CompletedHandshakeComponents {
    /// The response to send back to the party that requested the circuit.
    response: CreateResponse,
    /// Settings for the hop, passed by reference to the circuit reactor.
    hop_settings: HopSettings,
    /// Crypto layer for the outbound direction, handed to the circuit reactor.
    crypto_out: Box<dyn OutboundRelayLayer + Send>,
    /// Crypto layer for the inbound direction, handed to the circuit reactor.
    crypto_in: Box<dyn InboundRelayLayer + Send>,
}
306
/// The caller-facing pieces of a relay circuit that has just been created.
pub(crate) struct RelayCircComponents {
    /// Handle to the circuit, returned together with its reactor.
    pub(crate) circ: Arc<RelayCirc>,
    /// Sending half of the queue whose receiving half was given to the circuit reactor.
    pub(crate) sender: CircuitRxSender,
    /// The circuit's padding controller; a clone of it was given to the reactor.
    pub(crate) padding_ctrl: PaddingController,
}
316
/// Congestion-control and flow-control parameters used to build circuit parameters.
#[derive(Debug, Clone)]
#[allow(clippy::exhaustive_structs)]
pub struct CongestionControlNetParams {
    /// Parameters for the fixed-window algorithm.
    ///
    /// Always given to the congestion-control builder, and also used as the
    /// algorithm's parameters when fixed-window is selected.
    pub fixed_window: FixedWindowParams,

    /// Parameters used when the Vegas algorithm is selected.
    ///
    /// NOTE(review): the `_exit` suffix suggests these are tuned for exit
    /// circuits -- confirm.
    pub vegas_exit: VegasParams,

    /// Congestion window parameters.
    pub cwnd: CongestionWindowParams,

    /// Round-trip-time estimator parameters.
    pub rtt: RoundTripEstimatorParams,

    /// Flow-control parameters, copied into every `CircParameters` that is built.
    pub flow_ctrl: FlowCtrlParameters,
}
339
340impl CongestionControlNetParams {
341 #[cfg(test)]
342 pub(crate) fn defaults_for_tests() -> Self {
344 Self {
345 fixed_window: FixedWindowParams::defaults_for_tests(),
346 vegas_exit: VegasParams::defaults_for_tests(),
347 cwnd: CongestionWindowParams::defaults_for_tests(),
348 rtt: RoundTripEstimatorParams::defaults_for_tests(),
349 flow_ctrl: FlowCtrlParameters::defaults_for_tests(),
350 }
351 }
352}
353
/// Circuit-related network parameters from which a [`CircParameters`] can be built.
#[derive(Debug, Clone)]
#[allow(clippy::exhaustive_structs)]
pub struct CircNetParameters {
    /// Passed through unchanged as the first argument of `CircParameters::new`.
    ///
    /// NOTE(review): presumably controls whether circuit extension identifies
    /// the next hop by its Ed25519 identity -- confirm against `CircParameters`.
    pub extend_by_ed25519_id: bool,

    /// Congestion-control and flow-control parameters.
    pub cc: CongestionControlNetParams,
}
370
371impl CircNetParameters {
372 #[warn(unused)]
380 fn as_circ_parameters(&self, algorithm: AlgorithmDiscriminants) -> Result<CircParameters, Bug> {
381 let Self {
384 extend_by_ed25519_id,
385 cc:
386 CongestionControlNetParams {
387 fixed_window,
388 vegas_exit,
389 cwnd,
390 rtt,
391 flow_ctrl,
392 },
393 } = self;
394
395 let algorithm = match algorithm {
396 AlgorithmDiscriminants::FixedWindow => Algorithm::FixedWindow(*fixed_window),
397 AlgorithmDiscriminants::Vegas => Algorithm::Vegas(*vegas_exit),
398 };
399
400 let cc = CongestionControlParams::builder()
402 .alg(algorithm)
403 .fixed_window_params(*fixed_window)
404 .cwnd_params(*cwnd)
405 .rtt_params(rtt.clone())
406 .build()
407 .map_err(into_internal!("Could not build `CongestionControlParams`"))?;
408
409 Ok(CircParameters::new(
410 *extend_by_ed25519_id,
411 cc,
412 flow_ctrl.clone(),
413 ))
414 }
415}