Skip to main content

tor_circmgr/
build.rs

1//! Facilities to build circuits directly, instead of via a circuit manager.
2
3use crate::path::{OwnedPath, TorPath};
4use crate::timeouts::{self, Action};
5use crate::{Error, Result};
6use async_trait::async_trait;
7use futures::Future;
8use oneshot_fused_workaround as oneshot;
9use std::sync::{
10    Arc,
11    atomic::{AtomicU32, Ordering},
12};
13use tor_chanmgr::{ChanMgr, ChanProvenance, ChannelUsage};
14use tor_error::into_internal;
15use tor_guardmgr::GuardStatus;
16use tor_linkspec::{IntoOwnedChanTarget, OwnedChanTarget, OwnedCircTarget};
17use tor_netdir::params::NetParameters;
18use tor_proto::ccparams::{self, AlgorithmType};
19use tor_proto::client::circuit::{CircParameters, PendingClientTunnel};
20use tor_proto::{CellCount, ClientTunnel, FlowCtrlParameters};
21use tor_rtcompat::SpawnExt;
22use tor_rtcompat::{Runtime, SleepProviderExt};
23use tor_units::Percentage;
24use tracing::instrument;
25use web_time_compat::{Duration, Instant};
26
27#[cfg(all(feature = "vanguards", feature = "hs-common"))]
28use tor_guardmgr::vanguards::VanguardMgr;
29
30mod guardstatus;
31
32pub(crate) use guardstatus::GuardStatusHandle;
33
34/// Represents an objects that can be constructed in a circuit-like way.
35///
36/// This is only a separate trait for testing purposes, so that we can swap
37/// our some other type when we're testing Builder.
38///
39/// TODO: I'd like to have a simpler testing strategy here; this one
40/// complicates things a bit.
41#[async_trait]
42pub(crate) trait Buildable: Sized {
43    /// Our equivalent to a tor_proto::Channel.
44    type Chan: Send + Sync;
45
46    /// Use a channel manager to open a new channel (or find an existing channel)
47    /// to a provided [`OwnedChanTarget`].
48    async fn open_channel<RT: Runtime>(
49        chanmgr: &ChanMgr<RT>,
50        ct: &OwnedChanTarget,
51        guard_status: &GuardStatusHandle,
52        usage: ChannelUsage,
53    ) -> Result<Arc<Self::Chan>>;
54
55    /// Launch a new one-hop circuit to a given relay, given only a
56    /// channel target `ct` specifying that relay.
57    ///
58    /// (Since we don't have a CircTarget here, we can't extend the circuit
59    /// to be multihop later on.)
60    async fn create_chantarget<RT: Runtime>(
61        chan: Arc<Self::Chan>,
62        rt: &RT,
63        ct: &OwnedChanTarget,
64        params: CircParameters,
65        timeouts: Arc<dyn tor_proto::client::circuit::TimeoutEstimator>,
66    ) -> Result<Self>;
67
68    /// Launch a new circuit through a given relay, given a circuit target
69    /// `ct` specifying that relay.
70    async fn create<RT: Runtime>(
71        chan: Arc<Self::Chan>,
72        rt: &RT,
73        ct: &OwnedCircTarget,
74        params: CircParameters,
75        timeouts: Arc<dyn tor_proto::client::circuit::TimeoutEstimator>,
76    ) -> Result<Self>;
77
78    /// Extend this circuit-like object by one hop, to the location described
79    /// in `ct`.
80    async fn extend<RT: Runtime>(
81        &self,
82        rt: &RT,
83        ct: &OwnedCircTarget,
84        params: CircParameters,
85    ) -> Result<()>;
86}
87
88/// Try to make a [`PendingClientTunnel`] to a given relay, and start its
89/// reactor.
90///
91/// This is common code, shared by all the first-hop functions in the
92/// implementation of `Buildable` for `ClientTunnel`.
93#[instrument(level = "trace", skip_all)]
94async fn create_common<RT: Runtime>(
95    chan: Arc<tor_proto::channel::Channel>,
96    timeouts: Arc<dyn tor_proto::client::circuit::TimeoutEstimator>,
97    rt: &RT,
98) -> Result<PendingClientTunnel> {
99    // Construct the (zero-hop) circuit.
100    let (pending_tunnel, reactor) =
101        chan.new_tunnel(timeouts)
102            .await
103            .map_err(|error| Error::Protocol {
104                error,
105                peer: None, // we don't blame the peer, because new_tunnel() does no networking.
106                action: "initializing circuit",
107                unique_id: None,
108            })?;
109
110    tracing::debug!("Spawning reactor...");
111
112    rt.spawn(async {
113        let _ = reactor.run().await;
114    })
115    .map_err(|e| Error::from_spawn("circuit reactor task", e))?;
116
117    Ok(pending_tunnel)
118}
119
120#[async_trait]
121impl Buildable for ClientTunnel {
122    type Chan = tor_proto::channel::Channel;
123
124    #[instrument(level = "trace", skip_all)]
125    async fn open_channel<RT: Runtime>(
126        chanmgr: &ChanMgr<RT>,
127        target: &OwnedChanTarget,
128        guard_status: &GuardStatusHandle,
129        usage: ChannelUsage,
130    ) -> Result<Arc<Self::Chan>> {
131        // If we fail now, it's the guard's fault.
132        guard_status.pending(GuardStatus::Failure);
133
134        // Get or construct the channel.
135        let result = chanmgr.get_or_launch(target, usage).await;
136
137        // Report the clock skew if appropriate, and exit if there has been an error.
138        match result {
139            Ok((chan, ChanProvenance::NewlyCreated)) => {
140                guard_status.skew(chan.clock_skew());
141                Ok(chan)
142            }
143            Ok((chan, _)) => Ok(chan),
144            Err(cause) => {
145                if let Some(skew) = cause.clock_skew() {
146                    guard_status.skew(skew);
147                }
148                Err(Error::Channel {
149                    peer: target.to_logged(),
150                    cause,
151                })
152            }
153        }
154    }
155
156    #[instrument(level = "trace", skip_all)]
157    async fn create_chantarget<RT: Runtime>(
158        chan: Arc<Self::Chan>,
159        rt: &RT,
160        ct: &OwnedChanTarget,
161        params: CircParameters,
162        timeouts: Arc<dyn tor_proto::client::circuit::TimeoutEstimator>,
163    ) -> Result<Self> {
164        let pending_tunnel = create_common(chan, timeouts, rt).await?;
165        let unique_id = Some(pending_tunnel.peek_unique_id());
166        pending_tunnel
167            .create_firsthop_fast(params)
168            .await
169            .map_err(|error| Error::Protocol {
170                peer: Some(ct.to_logged()),
171                error,
172                action: "running CREATE_FAST handshake",
173                unique_id,
174            })
175    }
176    #[instrument(level = "trace", skip_all)]
177    async fn create<RT: Runtime>(
178        chan: Arc<Self::Chan>,
179        rt: &RT,
180        ct: &OwnedCircTarget,
181        params: CircParameters,
182        timeouts: Arc<dyn tor_proto::client::circuit::TimeoutEstimator>,
183    ) -> Result<Self> {
184        let pending_tunnel = create_common(chan, timeouts, rt).await?;
185        let unique_id = Some(pending_tunnel.peek_unique_id());
186
187        let handshake_res = pending_tunnel.create_firsthop(ct, params).await;
188        handshake_res.map_err(|error| Error::Protocol {
189            peer: Some(ct.to_logged()),
190            error,
191            action: "creating first hop",
192            unique_id,
193        })
194    }
195    async fn extend<RT: Runtime>(
196        &self,
197        _rt: &RT,
198        ct: &OwnedCircTarget,
199        params: CircParameters,
200    ) -> Result<()> {
201        let circ = self.as_single_circ().map_err(|error| Error::Protocol {
202            peer: Some(ct.to_logged()),
203            error,
204            action: "extend tunnel",
205            unique_id: Some(self.unique_id()),
206        })?;
207
208        let res = circ.extend(ct, params).await;
209        res.map_err(|error| Error::Protocol {
210            error,
211            // We can't know who caused the error, since it may have been
212            // the hop we were extending from, or the hop we were extending
213            // to.
214            peer: None,
215            action: "extending circuit",
216            unique_id: Some(self.unique_id()),
217        })
218    }
219}
220
221/// An implementation type for [`TunnelBuilder`].
222///
223/// A `TunnelBuilder` holds references to all the objects that are needed
224/// to build circuits correctly.
225///
226/// In general, you should not need to construct or use this object yourself,
227/// unless you are choosing your own paths.
228struct Builder<R: Runtime, C: Buildable + Sync + Send + 'static> {
229    /// The runtime used by this circuit builder.
230    runtime: R,
231    /// A channel manager that this circuit builder uses to make channels.
232    chanmgr: Arc<ChanMgr<R>>,
233    /// An estimator to determine the correct timeouts for circuit building.
234    timeouts: Arc<timeouts::Estimator>,
235    /// We don't actually hold any clientcircs, so we need to put this
236    /// type here so the compiler won't freak out.
237    _phantom: std::marker::PhantomData<C>,
238}
239
240impl<R: Runtime, C: Buildable + Sync + Send + 'static> Builder<R, C> {
241    /// Construct a new [`Builder`].
242    fn new(runtime: R, chanmgr: Arc<ChanMgr<R>>, timeouts: timeouts::Estimator) -> Self {
243        Builder {
244            runtime,
245            chanmgr,
246            timeouts: Arc::new(timeouts),
247            _phantom: std::marker::PhantomData,
248        }
249    }
250
251    /// Build a circuit, without performing any timeout operations.
252    ///
253    /// After each hop is built, increments n_hops_built.  Make sure that
254    /// `guard_status` has its pending status set correctly to correspond
255    /// to a circuit failure at any given stage.
256    ///
257    /// Requires that `channel` is a channel to the first hop of `path`.
258    ///
259    /// (TODO: Find
260    /// a better design there.)
261    #[instrument(level = "trace", skip_all)]
262    async fn build_notimeout(
263        self: Arc<Self>,
264        path: OwnedPath,
265        channel: Arc<C::Chan>,
266        params: CircParameters,
267        start_time: Instant,
268        n_hops_built: Arc<AtomicU32>,
269        guard_status: Arc<GuardStatusHandle>,
270    ) -> Result<C> {
271        match path {
272            OwnedPath::ChannelOnly(target) => {
273                let timeouts = Arc::clone(&self.timeouts);
274                let circ =
275                    C::create_chantarget(channel, &self.runtime, &target, params, timeouts).await?;
276                self.timeouts
277                    .note_hop_completed(0, self.runtime.now() - start_time, true);
278                n_hops_built.fetch_add(1, Ordering::SeqCst);
279                Ok(circ)
280            }
281            OwnedPath::Normal(p) => {
282                assert!(!p.is_empty());
283                let n_hops = p.len() as u8;
284                let timeouts = Arc::clone(&self.timeouts);
285                // Each hop has its own circ parameters. This is for the first hop (CREATE).
286                let circ =
287                    C::create(channel, &self.runtime, &p[0], params.clone(), timeouts).await?;
288                self.timeouts
289                    .note_hop_completed(0, self.runtime.now() - start_time, n_hops == 0);
290                // If we fail after this point, we can't tell whether it's
291                // the fault of the guard or some later relay.
292                guard_status.pending(GuardStatus::Indeterminate);
293                n_hops_built.fetch_add(1, Ordering::SeqCst);
294                for (hop_num, relay) in (1..).zip(p[1..].iter()) {
295                    // Get the params per subsequent hop (EXTEND).
296                    circ.extend(&self.runtime, relay, params.clone()).await?;
297                    n_hops_built.fetch_add(1, Ordering::SeqCst);
298                    self.timeouts.note_hop_completed(
299                        hop_num,
300                        self.runtime.now() - start_time,
301                        hop_num == (n_hops - 1),
302                    );
303                }
304                Ok(circ)
305            }
306        }
307    }
308
309    /// Build a circuit from an [`OwnedPath`].
310    #[instrument(level = "trace", skip_all)]
311    async fn build_owned(
312        self: &Arc<Self>,
313        path: OwnedPath,
314        params: &CircParameters,
315        guard_status: Arc<GuardStatusHandle>,
316        usage: ChannelUsage,
317    ) -> Result<C> {
318        let action = Action::BuildCircuit { length: path.len() };
319        let (timeout, abandon_timeout) = self.timeouts.timeouts(&action);
320
321        // TODO: This is probably not the best way for build_notimeout to
322        // tell us how many hops it managed to build, but at least it is
323        // isolated here.
324        let hops_built = Arc::new(AtomicU32::new(0));
325
326        let self_clone = Arc::clone(self);
327        let params = params.clone();
328
329        // We open the channel separately from the rest of the circuit, since we don't want to count
330        // it towards the circuit timeout.
331        //
332        // We don't need a separate timeout here, since ChanMgr already implements its own timeouts.
333        let channel = C::open_channel(
334            &self.chanmgr,
335            path.first_hop_as_chantarget(),
336            guard_status.as_ref(),
337            usage,
338        )
339        .await?;
340
341        let start_time = self.runtime.now();
342
343        let circuit_future = self_clone.build_notimeout(
344            path,
345            channel,
346            params,
347            start_time,
348            Arc::clone(&hops_built),
349            guard_status,
350        );
351
352        match double_timeout(&self.runtime, circuit_future, timeout, abandon_timeout).await {
353            Ok(circuit) => Ok(circuit),
354            Err(Error::CircTimeout(unique_id)) => {
355                let n_built = hops_built.load(Ordering::SeqCst);
356                self.timeouts
357                    .note_circ_timeout(n_built as u8, self.runtime.now() - start_time);
358                Err(Error::CircTimeout(unique_id))
359            }
360            Err(e) => Err(e),
361        }
362    }
363
364    /// Return a reference to this Builder runtime.
365    pub(crate) fn runtime(&self) -> &R {
366        &self.runtime
367    }
368
369    /// Return a reference to this Builder's timeout estimator.
370    pub(crate) fn estimator(&self) -> &timeouts::Estimator {
371        &self.timeouts
372    }
373}
374
375/// A factory object to build circuits.
376///
377/// A `TunnelBuilder` holds references to all the objects that are needed
378/// to build circuits correctly.
379///
380/// In general, you should not need to construct or use this object yourself,
381/// unless you are choosing your own paths.
382pub struct TunnelBuilder<R: Runtime> {
383    /// The underlying [`Builder`] object
384    builder: Arc<Builder<R, ClientTunnel>>,
385    /// Configuration for how to choose paths for circuits.
386    path_config: tor_config::MutCfg<crate::PathConfig>,
387    /// State-manager object to use in storing current state.
388    storage: crate::TimeoutStateHandle,
389    /// Guard manager to tell us which guards nodes to use for the circuits
390    /// we build.
391    guardmgr: tor_guardmgr::GuardMgr<R>,
392    /// The vanguard manager object used for HS circuits.
393    #[cfg(all(feature = "vanguards", feature = "hs-common"))]
394    vanguardmgr: Arc<VanguardMgr<R>>,
395}
396
397impl<R: Runtime> TunnelBuilder<R> {
398    /// Construct a new [`TunnelBuilder`].
399    // TODO: eventually I'd like to make this a public function, but
400    // TimeoutStateHandle is private.
401    pub(crate) fn new(
402        runtime: R,
403        chanmgr: Arc<ChanMgr<R>>,
404        path_config: crate::PathConfig,
405        storage: crate::TimeoutStateHandle,
406        guardmgr: tor_guardmgr::GuardMgr<R>,
407        #[cfg(all(feature = "vanguards", feature = "hs-common"))] vanguardmgr: VanguardMgr<R>,
408    ) -> Self {
409        let timeouts = timeouts::Estimator::from_storage(&storage);
410
411        TunnelBuilder {
412            builder: Arc::new(Builder::new(runtime, chanmgr, timeouts)),
413            path_config: path_config.into(),
414            storage,
415            guardmgr,
416            #[cfg(all(feature = "vanguards", feature = "hs-common"))]
417            vanguardmgr: Arc::new(vanguardmgr),
418        }
419    }
420
421    /// Return this builder's [`PathConfig`](crate::PathConfig).
422    pub(crate) fn path_config(&self) -> Arc<crate::PathConfig> {
423        self.path_config.get()
424    }
425
426    /// Replace this builder's [`PathConfig`](crate::PathConfig).
427    pub(crate) fn set_path_config(&self, new_config: crate::PathConfig) {
428        self.path_config.replace(new_config);
429    }
430
431    /// Flush state to the state manager if we own the lock.
432    ///
433    /// Return `Ok(true)` if we saved, and `Ok(false)` if we didn't hold the lock.
434    pub(crate) fn save_state(&self) -> Result<bool> {
435        if !self.storage.can_store() {
436            return Ok(false);
437        }
438        // TODO: someday we'll want to only do this if there is something
439        // changed.
440        self.builder.timeouts.save_state(&self.storage)?;
441        self.guardmgr.store_persistent_state()?;
442        Ok(true)
443    }
444
445    /// Replace our state with a new owning state, assuming we have
446    /// storage permission.
447    pub(crate) fn upgrade_to_owned_state(&self) -> Result<()> {
448        self.builder
449            .timeouts
450            .upgrade_to_owning_storage(&self.storage);
451        self.guardmgr.upgrade_to_owned_persistent_state()?;
452        Ok(())
453    }
454
455    /// Reload persistent state from disk, if we don't have storage permission.
456    #[instrument(level = "trace", skip_all)]
457    pub(crate) fn reload_state(&self) -> Result<()> {
458        if !self.storage.can_store() {
459            self.builder
460                .timeouts
461                .reload_readonly_from_storage(&self.storage);
462        }
463        self.guardmgr.reload_persistent_state()?;
464        Ok(())
465    }
466
467    /// Reconfigure this builder using the latest set of network parameters.
468    ///
469    /// (NOTE: for now, this only affects circuit timeout estimation.)
470    pub fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
471        self.builder.timeouts.update_params(p);
472    }
473
474    /// Like `build`, but construct a new circuit from an [`OwnedPath`].
475    #[instrument(level = "trace", skip_all)]
476    pub(crate) async fn build_owned(
477        &self,
478        path: OwnedPath,
479        params: &CircParameters,
480        guard_status: Arc<GuardStatusHandle>,
481        usage: ChannelUsage,
482    ) -> Result<ClientTunnel> {
483        self.builder
484            .build_owned(path, params, guard_status, usage)
485            .await
486    }
487
488    /// Try to construct a new circuit from a given path, using appropriate
489    /// timeouts.
490    ///
491    /// This circuit is _not_ automatically registered with any
492    /// circuit manager; if you don't hang on it it, it will
493    /// automatically go away when the last reference is dropped.
494    #[instrument(level = "trace", skip_all)]
495    pub async fn build(
496        &self,
497        path: &TorPath<'_>,
498        params: &CircParameters,
499        usage: ChannelUsage,
500    ) -> Result<ClientTunnel> {
501        let owned = path.try_into()?;
502        self.build_owned(owned, params, Arc::new(None.into()), usage)
503            .await
504    }
505
506    /// Return true if this builder is currently learning timeout info.
507    pub(crate) fn learning_timeouts(&self) -> bool {
508        self.builder.timeouts.learning_timeouts()
509    }
510
511    /// Return a reference to this builder's `GuardMgr`.
512    pub(crate) fn guardmgr(&self) -> &tor_guardmgr::GuardMgr<R> {
513        &self.guardmgr
514    }
515
516    /// Return a reference to this builder's `VanguardMgr`.
517    #[cfg(all(feature = "vanguards", feature = "hs-common"))]
518    pub(crate) fn vanguardmgr(&self) -> &Arc<VanguardMgr<R>> {
519        &self.vanguardmgr
520    }
521
522    /// Return a reference to this builder's runtime
523    pub(crate) fn runtime(&self) -> &R {
524        self.builder.runtime()
525    }
526
527    /// Return a reference to this builder's timeout estimator.
528    pub(crate) fn estimator(&self) -> &timeouts::Estimator {
529        self.builder.estimator()
530    }
531}
532
/// Return the congestion control Vegas algorithm using the given network parameters.
///
/// `vegas_queue_params` supplies the queue parameters; every other Vegas
/// setting is read from `inp`.
///
/// # Panics
///
/// Panics if the Vegas parameters cannot be built.
#[cfg(feature = "flowctl-cc")]
fn build_cc_vegas(
    inp: &NetParameters,
    vegas_queue_params: ccparams::VegasQueueParams,
) -> ccparams::Algorithm {
    let cwnd_full_min_pct = Percentage::new(inp.cc_cwnd_full_minpct.as_percent().get() as u32);

    let vegas_params = ccparams::VegasParamsBuilder::default()
        .cell_in_queue_params(vegas_queue_params)
        .ss_cwnd_max(inp.cc_ss_max.into())
        .cwnd_full_gap(inp.cc_cwnd_full_gap.into())
        .cwnd_full_min_pct(cwnd_full_min_pct)
        .cwnd_full_per_cwnd(inp.cc_cwnd_full_per_cwnd.into())
        .build()
        .expect("Unable to build Vegas params from NetParams");

    ccparams::Algorithm::Vegas(vegas_params)
}
552
553/// Return the congestion control FixedWindow algorithm using the given network parameters.
554fn build_cc_fixedwindow(inp: &NetParameters) -> ccparams::Algorithm {
555    ccparams::Algorithm::FixedWindow(build_cc_fixedwindow_params(inp))
556}
557
558/// Return the parameters for the congestion control FixedWindow algorithm
559/// using the given network parameters.
560fn build_cc_fixedwindow_params(inp: &NetParameters) -> ccparams::FixedWindowParams {
561    ccparams::FixedWindowParamsBuilder::default()
562        .circ_window_start(inp.circuit_window.get() as u16)
563        .circ_window_min(inp.circuit_window.lower() as u16)
564        .circ_window_max(inp.circuit_window.upper() as u16)
565        .build()
566        .expect("Unable to build FixedWindow params from NetParams")
567}
568
569/// Return a new circuit parameter struct using the given network parameters and algorithm to use.
570fn circparameters_from_netparameters(
571    inp: &NetParameters,
572    alg: ccparams::Algorithm,
573) -> Result<CircParameters> {
574    let cwnd_params = ccparams::CongestionWindowParamsBuilder::default()
575        .cwnd_init(inp.cc_cwnd_init.into())
576        .cwnd_inc_pct_ss(Percentage::new(
577            inp.cc_cwnd_inc_pct_ss.as_percent().get() as u32
578        ))
579        .cwnd_inc(inp.cc_cwnd_inc.into())
580        .cwnd_inc_rate(inp.cc_cwnd_inc_rate.into())
581        .cwnd_min(inp.cc_cwnd_min.into())
582        .cwnd_max(inp.cc_cwnd_max.into())
583        .sendme_inc(inp.cc_sendme_inc.into())
584        .build()
585        .map_err(into_internal!(
586            "Unable to build CongestionWindow params from NetParams"
587        ))?;
588    let rtt_params = ccparams::RoundTripEstimatorParamsBuilder::default()
589        .ewma_cwnd_pct(Percentage::new(
590            inp.cc_ewma_cwnd_pct.as_percent().get() as u32
591        ))
592        .ewma_max(inp.cc_ewma_max.into())
593        .ewma_ss_max(inp.cc_ewma_ss.into())
594        .rtt_reset_pct(Percentage::new(
595            inp.cc_rtt_reset_pct.as_percent().get() as u32
596        ))
597        .build()
598        .map_err(into_internal!("Unable to build RTT params from NetParams"))?;
599    let ccontrol = ccparams::CongestionControlParamsBuilder::default()
600        .alg(alg)
601        .fixed_window_params(build_cc_fixedwindow_params(inp))
602        .cwnd_params(cwnd_params)
603        .rtt_params(rtt_params)
604        .build()
605        .map_err(into_internal!(
606            "Unable to build CongestionControl params from NetParams"
607        ))?;
608    let flow_ctrl_params = FlowCtrlParameters {
609        cc_xoff_client: CellCount::new(inp.cc_xoff_client.get_u32()),
610        cc_xoff_exit: CellCount::new(inp.cc_xoff_exit.get_u32()),
611        cc_xon_rate: CellCount::new(inp.cc_xon_rate.get_u32()),
612        cc_xon_change_pct: inp.cc_xon_change_pct.get_u32(),
613        cc_xon_ewma_cnt: inp.cc_xon_ewma_cnt.get_u32(),
614    };
615    Ok(CircParameters::new(
616        inp.extend_by_ed25519_id.into(),
617        ccontrol,
618        flow_ctrl_params,
619    ))
620}
621
622/// Extract a [`CircParameters`] from the [`NetParameters`] from a consensus for an exit circuit or
623/// single onion service (when implemented).
624pub fn exit_circparams_from_netparams(inp: &NetParameters) -> Result<CircParameters> {
625    let alg = match AlgorithmType::from(inp.cc_alg.get()) {
626        #[cfg(feature = "flowctl-cc")]
627        AlgorithmType::VEGAS => build_cc_vegas(
628            inp,
629            (
630                inp.cc_vegas_alpha_exit.into(),
631                inp.cc_vegas_beta_exit.into(),
632                inp.cc_vegas_delta_exit.into(),
633                inp.cc_vegas_gamma_exit.into(),
634                inp.cc_vegas_sscap_exit.into(),
635            )
636                .into(),
637        ),
638        // Unrecognized, fallback to fixed window as in SENDME v0.
639        _ => build_cc_fixedwindow(inp),
640    };
641    circparameters_from_netparameters(inp, alg)
642}
643
644/// Extract a [`CircParameters`] from the [`NetParameters`] from a consensus for an onion circuit
645/// which also includes an onion service with Vanguard.
646pub fn onion_circparams_from_netparams(inp: &NetParameters) -> Result<CircParameters> {
647    let alg = match AlgorithmType::from(inp.cc_alg.get()) {
648        #[cfg(feature = "flowctl-cc")]
649        AlgorithmType::VEGAS => {
650            // NOTE: At the time of writing, we don't yet support cc negotiation for onion services.
651            // See `HopSettings::onion_circparams_from_netparams()` where we use a fallback
652            // algorithm for HsV3 circuits instead, and see arti#2037.
653            build_cc_vegas(
654                inp,
655                (
656                    inp.cc_vegas_alpha_onion.into(),
657                    inp.cc_vegas_beta_onion.into(),
658                    inp.cc_vegas_delta_onion.into(),
659                    inp.cc_vegas_gamma_onion.into(),
660                    inp.cc_vegas_sscap_onion.into(),
661                )
662                    .into(),
663            )
664        }
665        // Unrecognized, fallback to fixed window as in SENDME v0.
666        _ => build_cc_fixedwindow(inp),
667    };
668    circparameters_from_netparameters(inp, alg)
669}
670
671/// Helper function: spawn a future as a background task, and run it with
672/// two separate timeouts.
673///
674/// If the future does not complete by `timeout`, then return a
675/// timeout error immediately, but keep running the future in the
676/// background.
677///
678/// If the future does not complete by `abandon`, then abandon the
679/// future completely.
680async fn double_timeout<R, F, T>(
681    runtime: &R,
682    fut: F,
683    timeout: Duration,
684    abandon: Duration,
685) -> Result<T>
686where
687    R: Runtime,
688    F: Future<Output = Result<T>> + Send + 'static,
689    T: Send + 'static,
690{
691    let (snd, rcv) = oneshot::channel();
692    let rt = runtime.clone();
693    // We create these futures now, since we want them to look at the current
694    // time when they decide when to expire.
695    let inner_timeout_future = rt.timeout(abandon, fut);
696    let outer_timeout_future = rt.timeout(timeout, rcv);
697
698    runtime
699        .spawn(async move {
700            let result = inner_timeout_future.await;
701            let _ignore_cancelled_error = snd.send(result);
702        })
703        .map_err(|e| Error::from_spawn("circuit construction task", e))?;
704
705    let outcome = outer_timeout_future.await;
706    // 4 layers of error to collapse:
707    //     One from the receiver being cancelled.
708    //     One from the outer timeout.
709    //     One from the inner timeout.
710    //     One from the actual future's result.
711    //
712    // (Technically, we could refrain from unwrapping the future's result,
713    // but doing it this way helps make it more certain that we really are
714    // collapsing all the layers into one.)
715    outcome
716        .map_err(|_| Error::CircTimeout(None))??
717        .map_err(|_| Error::CircTimeout(None))?
718}
719
720#[cfg(test)]
721mod test {
722    // @@ begin test lint list maintained by maint/add_warning @@
723    #![allow(clippy::bool_assert_comparison)]
724    #![allow(clippy::clone_on_copy)]
725    #![allow(clippy::dbg_macro)]
726    #![allow(clippy::mixed_attributes_style)]
727    #![allow(clippy::print_stderr)]
728    #![allow(clippy::print_stdout)]
729    #![allow(clippy::single_char_pattern)]
730    #![allow(clippy::unwrap_used)]
731    #![allow(clippy::unchecked_time_subtraction)]
732    #![allow(clippy::useless_vec)]
733    #![allow(clippy::needless_pass_by_value)]
734    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
735    use super::*;
736    use crate::timeouts::TimeoutEstimator;
737    use futures::FutureExt;
738    use std::sync::Mutex;
739    use tor_chanmgr::ChannelUsage as CU;
740    use tor_linkspec::ChanTarget;
741    use tor_linkspec::{HasRelayIds, RelayIdType, RelayIds};
742    use tor_llcrypto::pk::ed25519::Ed25519Identity;
743    use tor_memquota::ArcMemoryQuotaTrackerExt as _;
744    use tor_proto::memquota::ToplevelAccount;
745    use tor_rtcompat::SleepProvider;
746    use tracing::trace;
747
748    /// Make a new nonfunctional `Arc<GuardStatusHandle>`
749    fn gs() -> Arc<GuardStatusHandle> {
750        Arc::new(None.into())
751    }
752
753    #[test]
754    // Re-enabled after work from eta, discussed in arti#149
755    fn test_double_timeout() {
756        let t1 = Duration::from_secs(1);
757        let t10 = Duration::from_secs(10);
758        /// Return true if d1 is in range [d2...d2 + 0.5sec]
759        fn duration_close_to(d1: Duration, d2: Duration) -> bool {
760            d1 >= d2 && d1 <= d2 + Duration::from_millis(500)
761        }
762
763        tor_rtmock::MockRuntime::test_with_various(|rto| async move {
764            // Try a future that's ready immediately.
765            let x = double_timeout(&rto, async { Ok(3_u32) }, t1, t10).await;
766            assert!(x.is_ok());
767            assert_eq!(x.unwrap(), 3_u32);
768
769            trace!("acquiesce after test1");
770            #[allow(clippy::clone_on_copy)]
771            #[allow(deprecated)] // TODO #1885
772            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());
773
774            // Try a future that's ready after a short delay.
775            let rt_clone = rt.clone();
776            // (We only want the short delay to fire, not any of the other timeouts.)
777            rt_clone.block_advance("manually controlling advances");
778            let x = rt
779                .wait_for(double_timeout(
780                    &rt,
781                    async move {
782                        let sl = rt_clone.sleep(Duration::from_millis(100));
783                        rt_clone.allow_one_advance(Duration::from_millis(100));
784                        sl.await;
785                        Ok(4_u32)
786                    },
787                    t1,
788                    t10,
789                ))
790                .await;
791            assert!(x.is_ok());
792            assert_eq!(x.unwrap(), 4_u32);
793
794            trace!("acquiesce after test2");
795            #[allow(clippy::clone_on_copy)]
796            #[allow(deprecated)] // TODO #1885
797            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());
798
799            // Try a future that passes the first timeout, and make sure that
800            // it keeps running after it times out.
801            let rt_clone = rt.clone();
802            let (snd, rcv) = oneshot::channel();
803            let start = rt.now();
804            rt.block_advance("manually controlling advances");
805            let x = rt
806                .wait_for(double_timeout(
807                    &rt,
808                    async move {
809                        let sl = rt_clone.sleep(Duration::from_secs(2));
810                        rt_clone.allow_one_advance(Duration::from_secs(2));
811                        sl.await;
812                        snd.send(()).unwrap();
813                        Ok(4_u32)
814                    },
815                    t1,
816                    t10,
817                ))
818                .await;
819            assert!(matches!(x, Err(Error::CircTimeout(_))));
820            let end = rt.now();
821            assert!(duration_close_to(end - start, Duration::from_secs(1)));
822            let waited = rt.wait_for(rcv).await;
823            assert_eq!(waited, Ok(()));
824
825            trace!("acquiesce after test3");
826            #[allow(clippy::clone_on_copy)]
827            #[allow(deprecated)] // TODO #1885
828            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());
829
830            // Try a future that times out and gets abandoned.
831            let rt_clone = rt.clone();
832            rt.block_advance("manually controlling advances");
833            let (snd, rcv) = oneshot::channel();
834            let start = rt.now();
835            // Let it hit the first timeout...
836            rt.allow_one_advance(Duration::from_secs(1));
837            let x = rt
838                .wait_for(double_timeout(
839                    &rt,
840                    async move {
841                        rt_clone.sleep(Duration::from_secs(30)).await;
842                        snd.send(()).unwrap();
843                        Ok(4_u32)
844                    },
845                    t1,
846                    t10,
847                ))
848                .await;
849            assert!(matches!(x, Err(Error::CircTimeout(_))));
850            let end = rt.now();
851            // ...and let it hit the second, too.
852            rt.allow_one_advance(Duration::from_secs(9));
853            let waited = rt.wait_for(rcv).await;
854            assert!(waited.is_err());
855            let end2 = rt.now();
856            assert!(duration_close_to(end - start, Duration::from_secs(1)));
857            assert!(duration_close_to(end2 - start, Duration::from_secs(10)));
858        });
859    }
860
861    /// Get a pair of timeouts that we've encoded as an Ed25519 identity.
862    ///
863    /// In our FakeCircuit code below, the first timeout is the amount of
864    /// time that we should sleep while building a hop to this key,
865    /// and the second timeout is the length of time-advance we should allow
866    /// after the hop is built.
867    ///
868    /// (This is pretty silly, but it's good enough for testing.)
869    fn timeouts_from_key(id: &Ed25519Identity) -> (Duration, Duration) {
870        let mut be = [0; 8];
871        be[..].copy_from_slice(&id.as_bytes()[0..8]);
872        let dur = u64::from_be_bytes(be);
873        be[..].copy_from_slice(&id.as_bytes()[8..16]);
874        let dur2 = u64::from_be_bytes(be);
875        (Duration::from_millis(dur), Duration::from_millis(dur2))
876    }
877    /// Encode a pair of timeouts as an Ed25519 identity.
878    ///
879    /// In our FakeCircuit code below, the first timeout is the amount of
880    /// time that we should sleep while building a hop to this key,
881    /// and the second timeout is the length of time-advance we should allow
882    /// after the hop is built.
883    ///
884    /// (This is pretty silly but it's good enough for testing.)
885    fn key_from_timeouts(d1: Duration, d2: Duration) -> Ed25519Identity {
886        let mut bytes = [0; 32];
887        let dur = (d1.as_millis() as u64).to_be_bytes();
888        bytes[0..8].copy_from_slice(&dur);
889        let dur = (d2.as_millis() as u64).to_be_bytes();
890        bytes[8..16].copy_from_slice(&dur);
891        bytes.into()
892    }
893
894    /// As [`timeouts_from_key`], but first extract the relevant key from the
895    /// OwnedChanTarget.
896    fn timeouts_from_chantarget<CT: ChanTarget>(ct: &CT) -> (Duration, Duration) {
897        // Extracting the Ed25519 identity should always succeed in this case:
898        // we put it there ourselves!
899        let ed_id = ct
900            .identity(RelayIdType::Ed25519)
901            .expect("No ed25519 key was present for fake ChanTarget‽")
902            .try_into()
903            .expect("ChanTarget provided wrong key type");
904        timeouts_from_key(ed_id)
905    }
906
    /// Replacement type for circuit, to implement buildable.
    ///
    /// Rather than doing any networking, it only remembers which relays it
    /// was asked to build through, so that tests can inspect the result.
    #[derive(Debug, Clone)]
    struct FakeCirc {
        /// Identities of the hops of this circuit, in the order they were added.
        hops: Vec<RelayIds>,
        /// True if this circuit was made with `create_chantarget`;
        /// false if it was made with `create`.
        onehop: bool,
    }
913    #[async_trait]
914    impl Buildable for Mutex<FakeCirc> {
915        type Chan = ();
916
917        async fn open_channel<RT: Runtime>(
918            _chanmgr: &ChanMgr<RT>,
919            _ct: &OwnedChanTarget,
920            _guard_status: &GuardStatusHandle,
921            _usage: ChannelUsage,
922        ) -> Result<Arc<Self::Chan>> {
923            Ok(Arc::new(()))
924        }
925
926        async fn create_chantarget<RT: Runtime>(
927            _: Arc<Self::Chan>,
928            rt: &RT,
929            ct: &OwnedChanTarget,
930            _: CircParameters,
931            _timeouts: Arc<dyn tor_proto::client::circuit::TimeoutEstimator>,
932        ) -> Result<Self> {
933            let (d1, d2) = timeouts_from_chantarget(ct);
934            rt.sleep(d1).await;
935            if !d2.is_zero() {
936                rt.allow_one_advance(d2);
937            }
938
939            let c = FakeCirc {
940                hops: vec![RelayIds::from_relay_ids(ct)],
941                onehop: true,
942            };
943            Ok(Mutex::new(c))
944        }
945        async fn create<RT: Runtime>(
946            _: Arc<Self::Chan>,
947            rt: &RT,
948            ct: &OwnedCircTarget,
949            _: CircParameters,
950            _timeouts: Arc<dyn tor_proto::client::circuit::TimeoutEstimator>,
951        ) -> Result<Self> {
952            let (d1, d2) = timeouts_from_chantarget(ct);
953            rt.sleep(d1).await;
954            if !d2.is_zero() {
955                rt.allow_one_advance(d2);
956            }
957
958            let c = FakeCirc {
959                hops: vec![RelayIds::from_relay_ids(ct)],
960                onehop: false,
961            };
962            Ok(Mutex::new(c))
963        }
964        async fn extend<RT: Runtime>(
965            &self,
966            rt: &RT,
967            ct: &OwnedCircTarget,
968            _: CircParameters,
969        ) -> Result<()> {
970            let (d1, d2) = timeouts_from_chantarget(ct);
971            rt.sleep(d1).await;
972            if !d2.is_zero() {
973                rt.allow_one_advance(d2);
974            }
975
976            {
977                let mut c = self.lock().unwrap();
978                c.hops.push(RelayIds::from_relay_ids(ct));
979            }
980            Ok(())
981        }
982    }
983
    /// Fake implementation of TimeoutEstimator that just records its inputs.
    struct TimeoutRecorder<R> {
        /// Runtime on which we permit time-advances after recording an event.
        runtime: R,
        /// Everything we have been told so far, in order.
        ///
        /// Each entry is `(is_success, hop, delay)`: `is_success` is true for
        /// a completed final hop and false for a circuit timeout.
        hist: Vec<(bool, u8, Duration)>,
        /// How much advance to permit after being told of a timeout?
        /// (Zero means "none".)
        on_timeout: Duration,
        /// How much advance to permit after being told of a success?
        /// (Zero means "none".)
        on_success: Duration,

        /// Sender that is taken and fired when a final hop is reported as
        /// completed.
        snd_success: Option<oneshot::Sender<()>>,
        /// Receiver paired with `snd_success`; a test takes it in order to
        /// wait for that success.
        rcv_success: Option<oneshot::Receiver<()>>,
    }
996
997    impl<R> TimeoutRecorder<R> {
998        fn new(runtime: R) -> Self {
999            Self::with_delays(runtime, Duration::from_secs(0), Duration::from_secs(0))
1000        }
1001
1002        fn with_delays(runtime: R, on_timeout: Duration, on_success: Duration) -> Self {
1003            let (snd_success, rcv_success) = oneshot::channel();
1004            Self {
1005                runtime,
1006                hist: Vec::new(),
1007                on_timeout,
1008                on_success,
1009                rcv_success: Some(rcv_success),
1010                snd_success: Some(snd_success),
1011            }
1012        }
1013    }
1014    impl<R: Runtime> TimeoutEstimator for Arc<Mutex<TimeoutRecorder<R>>> {
1015        fn note_hop_completed(&mut self, hop: u8, delay: Duration, is_last: bool) {
1016            if !is_last {
1017                return;
1018            }
1019            let (rt, advance) = {
1020                let mut this = self.lock().unwrap();
1021                this.hist.push((true, hop, delay));
1022                let _ = this.snd_success.take().unwrap().send(());
1023                (this.runtime.clone(), this.on_success)
1024            };
1025            if !advance.is_zero() {
1026                rt.allow_one_advance(advance);
1027            }
1028        }
1029        fn note_circ_timeout(&mut self, hop: u8, delay: Duration) {
1030            let (rt, advance) = {
1031                let mut this = self.lock().unwrap();
1032                this.hist.push((false, hop, delay));
1033                (this.runtime.clone(), this.on_timeout)
1034            };
1035            if !advance.is_zero() {
1036                rt.allow_one_advance(advance);
1037            }
1038        }
1039        fn timeouts(&mut self, _action: &Action) -> (Duration, Duration) {
1040            (Duration::from_secs(3), Duration::from_secs(100))
1041        }
1042        fn learning_timeouts(&self) -> bool {
1043            false
1044        }
1045        fn update_params(&mut self, _params: &tor_netdir::params::NetParameters) {}
1046
1047        fn build_state(&mut self) -> Option<crate::timeouts::pareto::ParetoTimeoutState> {
1048            None
1049        }
1050    }
1051
1052    /// Testing only: create a bogus circuit target
1053    fn circ_t(id: Ed25519Identity) -> OwnedCircTarget {
1054        let mut builder = OwnedCircTarget::builder();
1055        builder
1056            .chan_target()
1057            .ed_identity(id)
1058            .rsa_identity([0x20; 20].into());
1059        builder
1060            .ntor_onion_key([0x33; 32].into())
1061            .protocols("".parse().unwrap())
1062            .build()
1063            .unwrap()
1064    }
1065    /// Testing only: create a bogus channel target
1066    fn chan_t(id: Ed25519Identity) -> OwnedChanTarget {
1067        OwnedChanTarget::builder()
1068            .ed_identity(id)
1069            .rsa_identity([0x20; 20].into())
1070            .build()
1071            .unwrap()
1072    }
1073
1074    async fn run_builder_test(
1075        rt: tor_rtmock::MockRuntime,
1076        advance_initial: Duration,
1077        path: OwnedPath,
1078        advance_on_timeout: Option<(Duration, Duration)>,
1079        usage: ChannelUsage,
1080    ) -> (Result<FakeCirc>, Vec<(bool, u8, Duration)>) {
1081        let chanmgr = Arc::new(
1082            ChanMgr::new(
1083                rt.clone(),
1084                Default::default(),
1085                Default::default(),
1086                &Default::default(),
1087                ToplevelAccount::new_noop(),
1088            )
1089            .unwrap(),
1090        );
1091        // always has 3 second timeout, 100 second abandon.
1092        let timeouts = match advance_on_timeout {
1093            Some((d1, d2)) => TimeoutRecorder::with_delays(rt.clone(), d1, d2),
1094            None => TimeoutRecorder::new(rt.clone()),
1095        };
1096        let timeouts = Arc::new(Mutex::new(timeouts));
1097        let builder: Builder<_, Mutex<FakeCirc>> = Builder::new(
1098            rt.clone(),
1099            chanmgr,
1100            timeouts::Estimator::new(Arc::clone(&timeouts)),
1101        );
1102
1103        rt.block_advance("manually controlling advances");
1104        rt.allow_one_advance(advance_initial);
1105        let outcome = rt.spawn_join("build-owned", async move {
1106            let arcbuilder = Arc::new(builder);
1107            let params = exit_circparams_from_netparams(&NetParameters::default())?;
1108            arcbuilder.build_owned(path, &params, gs(), usage).await
1109        });
1110
1111        // Now we wait for a success to finally, finally be reported.
1112        if advance_on_timeout.is_some() {
1113            let receiver = { timeouts.lock().unwrap().rcv_success.take().unwrap() };
1114            rt.spawn_identified("receiver", async move {
1115                receiver.await.unwrap();
1116            });
1117        }
1118        rt.advance_until_stalled().await;
1119
1120        let circ = outcome.map(|m| Ok(m?.lock().unwrap().clone())).await;
1121        let timeouts = timeouts.lock().unwrap().hist.clone();
1122
1123        (circ, timeouts)
1124    }
1125
1126    #[test]
1127    fn build_onehop() {
1128        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
1129            let id_100ms = key_from_timeouts(Duration::from_millis(100), Duration::from_millis(0));
1130            let path = OwnedPath::ChannelOnly(chan_t(id_100ms));
1131
1132            let (outcome, timeouts) =
1133                run_builder_test(rt, Duration::from_millis(100), path, None, CU::UserTraffic).await;
1134            let circ = outcome.unwrap();
1135            assert!(circ.onehop);
1136            assert_eq!(circ.hops.len(), 1);
1137            assert!(circ.hops[0].same_relay_ids(&chan_t(id_100ms)));
1138
1139            assert_eq!(timeouts.len(), 1);
1140            assert!(timeouts[0].0); // success
1141            assert_eq!(timeouts[0].1, 0); // one-hop
1142            assert_eq!(timeouts[0].2, Duration::from_millis(100));
1143        });
1144    }
1145
1146    #[test]
1147    fn build_threehop() {
1148        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
1149            let id_100ms =
1150                key_from_timeouts(Duration::from_millis(100), Duration::from_millis(200));
1151            let id_200ms =
1152                key_from_timeouts(Duration::from_millis(200), Duration::from_millis(300));
1153            let id_300ms = key_from_timeouts(Duration::from_millis(300), Duration::from_millis(0));
1154            let path =
1155                OwnedPath::Normal(vec![circ_t(id_100ms), circ_t(id_200ms), circ_t(id_300ms)]);
1156
1157            let (outcome, timeouts) =
1158                run_builder_test(rt, Duration::from_millis(100), path, None, CU::UserTraffic).await;
1159            let circ = outcome.unwrap();
1160            assert!(!circ.onehop);
1161            assert_eq!(circ.hops.len(), 3);
1162            assert!(circ.hops[0].same_relay_ids(&chan_t(id_100ms)));
1163            assert!(circ.hops[1].same_relay_ids(&chan_t(id_200ms)));
1164            assert!(circ.hops[2].same_relay_ids(&chan_t(id_300ms)));
1165
1166            assert_eq!(timeouts.len(), 1);
1167            assert!(timeouts[0].0); // success
1168            assert_eq!(timeouts[0].1, 2); // three-hop
1169            assert_eq!(timeouts[0].2, Duration::from_millis(600));
1170        });
1171    }
1172
1173    #[test]
1174    fn build_huge_timeout() {
1175        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
1176            let id_100ms =
1177                key_from_timeouts(Duration::from_millis(100), Duration::from_millis(200));
1178            let id_200ms =
1179                key_from_timeouts(Duration::from_millis(200), Duration::from_millis(2700));
1180            let id_hour = key_from_timeouts(Duration::from_secs(3600), Duration::from_secs(0));
1181
1182            let path = OwnedPath::Normal(vec![circ_t(id_100ms), circ_t(id_200ms), circ_t(id_hour)]);
1183
1184            let (outcome, timeouts) =
1185                run_builder_test(rt, Duration::from_millis(100), path, None, CU::UserTraffic).await;
1186            assert!(matches!(outcome, Err(Error::CircTimeout(_))));
1187
1188            assert_eq!(timeouts.len(), 1);
1189            assert!(!timeouts[0].0); // timeout
1190
1191            // BUG: Sometimes this is 1 and sometimes this is 2.
1192            // assert_eq!(timeouts[0].1, 2); // at third hop.
1193            assert_eq!(timeouts[0].2, Duration::from_millis(3000));
1194        });
1195    }
1196
1197    #[test]
1198    fn build_modest_timeout() {
1199        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
1200            let id_100ms =
1201                key_from_timeouts(Duration::from_millis(100), Duration::from_millis(200));
1202            let id_200ms =
1203                key_from_timeouts(Duration::from_millis(200), Duration::from_millis(2700));
1204            let id_3sec = key_from_timeouts(Duration::from_millis(3000), Duration::from_millis(0));
1205
1206            let timeout_advance = (Duration::from_millis(4000), Duration::from_secs(0));
1207
1208            let path = OwnedPath::Normal(vec![circ_t(id_100ms), circ_t(id_200ms), circ_t(id_3sec)]);
1209
1210            let (outcome, timeouts) = run_builder_test(
1211                rt.clone(),
1212                Duration::from_millis(100),
1213                path,
1214                Some(timeout_advance),
1215                CU::UserTraffic,
1216            )
1217            .await;
1218            assert!(matches!(outcome, Err(Error::CircTimeout(_))));
1219
1220            assert_eq!(timeouts.len(), 2);
1221            assert!(!timeouts[0].0); // timeout
1222
1223            // BUG: Sometimes this is 1 and sometimes this is 2.
1224            //assert_eq!(timeouts[0].1, 2); // at third hop.
1225            assert_eq!(timeouts[0].2, Duration::from_millis(3000));
1226
1227            assert!(timeouts[1].0); // success
1228            assert_eq!(timeouts[1].1, 2); // three-hop
1229            // BUG: This timer is not always reliable, due to races.
1230            //assert_eq!(timeouts[1].2, Duration::from_millis(3300));
1231        });
1232    }
1233}