1use crate::path::{OwnedPath, TorPath};
4use crate::timeouts::{self, Action};
5use crate::{Error, Result};
6use async_trait::async_trait;
7use futures::Future;
8use oneshot_fused_workaround as oneshot;
9use std::sync::{
10 Arc,
11 atomic::{AtomicU32, Ordering},
12};
13use tor_chanmgr::{ChanMgr, ChanProvenance, ChannelUsage};
14use tor_error::into_internal;
15use tor_guardmgr::GuardStatus;
16use tor_linkspec::{IntoOwnedChanTarget, OwnedChanTarget, OwnedCircTarget};
17use tor_netdir::params::NetParameters;
18use tor_proto::ccparams::{self, AlgorithmType};
19use tor_proto::client::circuit::{CircParameters, PendingClientTunnel};
20use tor_proto::{CellCount, ClientTunnel, FlowCtrlParameters};
21use tor_rtcompat::SpawnExt;
22use tor_rtcompat::{Runtime, SleepProviderExt};
23use tor_units::Percentage;
24use tracing::instrument;
25use web_time_compat::{Duration, Instant};
26
27#[cfg(all(feature = "vanguards", feature = "hs-common"))]
28use tor_guardmgr::vanguards::VanguardMgr;
29
30mod guardstatus;
31
32pub(crate) use guardstatus::GuardStatusHandle;
33
#[async_trait]
/// Abstraction over the kind of object that a `Builder` knows how to construct.
///
/// An implementor describes how to obtain a channel to the first hop, how to
/// create itself over that channel, and how to extend itself by one more hop.
pub(crate) trait Buildable: Sized {
    /// The channel type over which objects of this kind are created.
    type Chan: Send + Sync;

    /// Obtain a channel to `ct` from `chanmgr`, for the given `usage`.
    ///
    /// `guard_status` is a handle through which the implementation may
    /// report information about the first hop.
    async fn open_channel<RT: Runtime>(
        chanmgr: &ChanMgr<RT>,
        ct: &OwnedChanTarget,
        guard_status: &GuardStatusHandle,
        usage: ChannelUsage,
    ) -> Result<Arc<Self::Chan>>;

    /// Create a new object over `chan` whose first hop `ct` is described only
    /// as a channel target.
    ///
    /// `rt` is the runtime to use, `params` are the circuit parameters, and
    /// `timeouts` is the timeout estimator handed to the new object.
    async fn create_chantarget<RT: Runtime>(
        chan: Arc<Self::Chan>,
        rt: &RT,
        ct: &OwnedChanTarget,
        params: CircParameters,
        timeouts: Arc<dyn tor_proto::client::circuit::TimeoutEstimator>,
    ) -> Result<Self>;

    /// Create a new object over `chan` whose first hop `ct` is described as a
    /// full circuit target.
    ///
    /// Takes the same `rt`, `params`, and `timeouts` arguments as
    /// `create_chantarget`.
    async fn create<RT: Runtime>(
        chan: Arc<Self::Chan>,
        rt: &RT,
        ct: &OwnedCircTarget,
        params: CircParameters,
        timeouts: Arc<dyn tor_proto::client::circuit::TimeoutEstimator>,
    ) -> Result<Self>;

    /// Extend this object by one additional hop, `ct`, using `params`.
    async fn extend<RT: Runtime>(
        &self,
        rt: &RT,
        ct: &OwnedCircTarget,
        params: CircParameters,
    ) -> Result<()>;
}
87
88#[instrument(level = "trace", skip_all)]
94async fn create_common<RT: Runtime>(
95 chan: Arc<tor_proto::channel::Channel>,
96 timeouts: Arc<dyn tor_proto::client::circuit::TimeoutEstimator>,
97 rt: &RT,
98) -> Result<PendingClientTunnel> {
99 let (pending_tunnel, reactor) =
101 chan.new_tunnel(timeouts)
102 .await
103 .map_err(|error| Error::Protocol {
104 error,
105 peer: None, action: "initializing circuit",
107 unique_id: None,
108 })?;
109
110 tracing::debug!("Spawning reactor...");
111
112 rt.spawn(async {
113 let _ = reactor.run().await;
114 })
115 .map_err(|e| Error::from_spawn("circuit reactor task", e))?;
116
117 Ok(pending_tunnel)
118}
119
120#[async_trait]
121impl Buildable for ClientTunnel {
122 type Chan = tor_proto::channel::Channel;
123
124 #[instrument(level = "trace", skip_all)]
125 async fn open_channel<RT: Runtime>(
126 chanmgr: &ChanMgr<RT>,
127 target: &OwnedChanTarget,
128 guard_status: &GuardStatusHandle,
129 usage: ChannelUsage,
130 ) -> Result<Arc<Self::Chan>> {
131 guard_status.pending(GuardStatus::Failure);
133
134 let result = chanmgr.get_or_launch(target, usage).await;
136
137 match result {
139 Ok((chan, ChanProvenance::NewlyCreated)) => {
140 guard_status.skew(chan.clock_skew());
141 Ok(chan)
142 }
143 Ok((chan, _)) => Ok(chan),
144 Err(cause) => {
145 if let Some(skew) = cause.clock_skew() {
146 guard_status.skew(skew);
147 }
148 Err(Error::Channel {
149 peer: target.to_logged(),
150 cause,
151 })
152 }
153 }
154 }
155
156 #[instrument(level = "trace", skip_all)]
157 async fn create_chantarget<RT: Runtime>(
158 chan: Arc<Self::Chan>,
159 rt: &RT,
160 ct: &OwnedChanTarget,
161 params: CircParameters,
162 timeouts: Arc<dyn tor_proto::client::circuit::TimeoutEstimator>,
163 ) -> Result<Self> {
164 let pending_tunnel = create_common(chan, timeouts, rt).await?;
165 let unique_id = Some(pending_tunnel.peek_unique_id());
166 pending_tunnel
167 .create_firsthop_fast(params)
168 .await
169 .map_err(|error| Error::Protocol {
170 peer: Some(ct.to_logged()),
171 error,
172 action: "running CREATE_FAST handshake",
173 unique_id,
174 })
175 }
176 #[instrument(level = "trace", skip_all)]
177 async fn create<RT: Runtime>(
178 chan: Arc<Self::Chan>,
179 rt: &RT,
180 ct: &OwnedCircTarget,
181 params: CircParameters,
182 timeouts: Arc<dyn tor_proto::client::circuit::TimeoutEstimator>,
183 ) -> Result<Self> {
184 let pending_tunnel = create_common(chan, timeouts, rt).await?;
185 let unique_id = Some(pending_tunnel.peek_unique_id());
186
187 let handshake_res = pending_tunnel.create_firsthop(ct, params).await;
188 handshake_res.map_err(|error| Error::Protocol {
189 peer: Some(ct.to_logged()),
190 error,
191 action: "creating first hop",
192 unique_id,
193 })
194 }
195 async fn extend<RT: Runtime>(
196 &self,
197 _rt: &RT,
198 ct: &OwnedCircTarget,
199 params: CircParameters,
200 ) -> Result<()> {
201 let circ = self.as_single_circ().map_err(|error| Error::Protocol {
202 peer: Some(ct.to_logged()),
203 error,
204 action: "extend tunnel",
205 unique_id: Some(self.unique_id()),
206 })?;
207
208 let res = circ.extend(ct, params).await;
209 res.map_err(|error| Error::Protocol {
210 error,
211 peer: None,
215 action: "extending circuit",
216 unique_id: Some(self.unique_id()),
217 })
218 }
219}
220
/// Generic engine that builds `Buildable` objects of type `C` over runtime `R`.
struct Builder<R: Runtime, C: Buildable + Sync + Send + 'static> {
    /// The runtime used to read the clock and to run timeouts.
    runtime: R,
    /// The channel manager used to open a channel to the first hop.
    chanmgr: Arc<ChanMgr<R>>,
    /// The estimator that supplies build timeouts and receives build timings.
    timeouts: Arc<timeouts::Estimator>,
    /// Marker tying this builder to the type `C` it produces; holds no data.
    _phantom: std::marker::PhantomData<C>,
}
239
240impl<R: Runtime, C: Buildable + Sync + Send + 'static> Builder<R, C> {
241 fn new(runtime: R, chanmgr: Arc<ChanMgr<R>>, timeouts: timeouts::Estimator) -> Self {
243 Builder {
244 runtime,
245 chanmgr,
246 timeouts: Arc::new(timeouts),
247 _phantom: std::marker::PhantomData,
248 }
249 }
250
251 #[instrument(level = "trace", skip_all)]
262 async fn build_notimeout(
263 self: Arc<Self>,
264 path: OwnedPath,
265 channel: Arc<C::Chan>,
266 params: CircParameters,
267 start_time: Instant,
268 n_hops_built: Arc<AtomicU32>,
269 guard_status: Arc<GuardStatusHandle>,
270 ) -> Result<C> {
271 match path {
272 OwnedPath::ChannelOnly(target) => {
273 let timeouts = Arc::clone(&self.timeouts);
274 let circ =
275 C::create_chantarget(channel, &self.runtime, &target, params, timeouts).await?;
276 self.timeouts
277 .note_hop_completed(0, self.runtime.now() - start_time, true);
278 n_hops_built.fetch_add(1, Ordering::SeqCst);
279 Ok(circ)
280 }
281 OwnedPath::Normal(p) => {
282 assert!(!p.is_empty());
283 let n_hops = p.len() as u8;
284 let timeouts = Arc::clone(&self.timeouts);
285 let circ =
287 C::create(channel, &self.runtime, &p[0], params.clone(), timeouts).await?;
288 self.timeouts
289 .note_hop_completed(0, self.runtime.now() - start_time, n_hops == 0);
290 guard_status.pending(GuardStatus::Indeterminate);
293 n_hops_built.fetch_add(1, Ordering::SeqCst);
294 for (hop_num, relay) in (1..).zip(p[1..].iter()) {
295 circ.extend(&self.runtime, relay, params.clone()).await?;
297 n_hops_built.fetch_add(1, Ordering::SeqCst);
298 self.timeouts.note_hop_completed(
299 hop_num,
300 self.runtime.now() - start_time,
301 hop_num == (n_hops - 1),
302 );
303 }
304 Ok(circ)
305 }
306 }
307 }
308
309 #[instrument(level = "trace", skip_all)]
311 async fn build_owned(
312 self: &Arc<Self>,
313 path: OwnedPath,
314 params: &CircParameters,
315 guard_status: Arc<GuardStatusHandle>,
316 usage: ChannelUsage,
317 ) -> Result<C> {
318 let action = Action::BuildCircuit { length: path.len() };
319 let (timeout, abandon_timeout) = self.timeouts.timeouts(&action);
320
321 let hops_built = Arc::new(AtomicU32::new(0));
325
326 let self_clone = Arc::clone(self);
327 let params = params.clone();
328
329 let channel = C::open_channel(
334 &self.chanmgr,
335 path.first_hop_as_chantarget(),
336 guard_status.as_ref(),
337 usage,
338 )
339 .await?;
340
341 let start_time = self.runtime.now();
342
343 let circuit_future = self_clone.build_notimeout(
344 path,
345 channel,
346 params,
347 start_time,
348 Arc::clone(&hops_built),
349 guard_status,
350 );
351
352 match double_timeout(&self.runtime, circuit_future, timeout, abandon_timeout).await {
353 Ok(circuit) => Ok(circuit),
354 Err(Error::CircTimeout(unique_id)) => {
355 let n_built = hops_built.load(Ordering::SeqCst);
356 self.timeouts
357 .note_circ_timeout(n_built as u8, self.runtime.now() - start_time);
358 Err(Error::CircTimeout(unique_id))
359 }
360 Err(e) => Err(e),
361 }
362 }
363
364 pub(crate) fn runtime(&self) -> &R {
366 &self.runtime
367 }
368
369 pub(crate) fn estimator(&self) -> &timeouts::Estimator {
371 &self.timeouts
372 }
373}
374
/// Builds `ClientTunnel`s, and holds the configuration and state needed to do so.
pub struct TunnelBuilder<R: Runtime> {
    /// The underlying generic builder, specialized to `ClientTunnel`.
    builder: Arc<Builder<R, ClientTunnel>>,
    /// Replaceable path configuration.
    path_config: tor_config::MutCfg<crate::PathConfig>,
    /// Handle to the storage used to load and save timeout-estimator state.
    storage: crate::TimeoutStateHandle,
    /// The guard manager; its persistent state is stored and reloaded
    /// together with the timeout state.
    guardmgr: tor_guardmgr::GuardMgr<R>,
    /// The vanguard manager (only with the `vanguards` and `hs-common` features).
    #[cfg(all(feature = "vanguards", feature = "hs-common"))]
    vanguardmgr: Arc<VanguardMgr<R>>,
}
396
397impl<R: Runtime> TunnelBuilder<R> {
398 pub(crate) fn new(
402 runtime: R,
403 chanmgr: Arc<ChanMgr<R>>,
404 path_config: crate::PathConfig,
405 storage: crate::TimeoutStateHandle,
406 guardmgr: tor_guardmgr::GuardMgr<R>,
407 #[cfg(all(feature = "vanguards", feature = "hs-common"))] vanguardmgr: VanguardMgr<R>,
408 ) -> Self {
409 let timeouts = timeouts::Estimator::from_storage(&storage);
410
411 TunnelBuilder {
412 builder: Arc::new(Builder::new(runtime, chanmgr, timeouts)),
413 path_config: path_config.into(),
414 storage,
415 guardmgr,
416 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
417 vanguardmgr: Arc::new(vanguardmgr),
418 }
419 }
420
421 pub(crate) fn path_config(&self) -> Arc<crate::PathConfig> {
423 self.path_config.get()
424 }
425
426 pub(crate) fn set_path_config(&self, new_config: crate::PathConfig) {
428 self.path_config.replace(new_config);
429 }
430
431 pub(crate) fn save_state(&self) -> Result<bool> {
435 if !self.storage.can_store() {
436 return Ok(false);
437 }
438 self.builder.timeouts.save_state(&self.storage)?;
441 self.guardmgr.store_persistent_state()?;
442 Ok(true)
443 }
444
445 pub(crate) fn upgrade_to_owned_state(&self) -> Result<()> {
448 self.builder
449 .timeouts
450 .upgrade_to_owning_storage(&self.storage);
451 self.guardmgr.upgrade_to_owned_persistent_state()?;
452 Ok(())
453 }
454
455 #[instrument(level = "trace", skip_all)]
457 pub(crate) fn reload_state(&self) -> Result<()> {
458 if !self.storage.can_store() {
459 self.builder
460 .timeouts
461 .reload_readonly_from_storage(&self.storage);
462 }
463 self.guardmgr.reload_persistent_state()?;
464 Ok(())
465 }
466
467 pub fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
471 self.builder.timeouts.update_params(p);
472 }
473
474 #[instrument(level = "trace", skip_all)]
476 pub(crate) async fn build_owned(
477 &self,
478 path: OwnedPath,
479 params: &CircParameters,
480 guard_status: Arc<GuardStatusHandle>,
481 usage: ChannelUsage,
482 ) -> Result<ClientTunnel> {
483 self.builder
484 .build_owned(path, params, guard_status, usage)
485 .await
486 }
487
488 #[instrument(level = "trace", skip_all)]
495 pub async fn build(
496 &self,
497 path: &TorPath<'_>,
498 params: &CircParameters,
499 usage: ChannelUsage,
500 ) -> Result<ClientTunnel> {
501 let owned = path.try_into()?;
502 self.build_owned(owned, params, Arc::new(None.into()), usage)
503 .await
504 }
505
506 pub(crate) fn learning_timeouts(&self) -> bool {
508 self.builder.timeouts.learning_timeouts()
509 }
510
511 pub(crate) fn guardmgr(&self) -> &tor_guardmgr::GuardMgr<R> {
513 &self.guardmgr
514 }
515
516 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
518 pub(crate) fn vanguardmgr(&self) -> &Arc<VanguardMgr<R>> {
519 &self.vanguardmgr
520 }
521
522 pub(crate) fn runtime(&self) -> &R {
524 self.builder.runtime()
525 }
526
527 pub(crate) fn estimator(&self) -> &timeouts::Estimator {
529 self.builder.estimator()
530 }
531}
532
/// Assemble the Vegas congestion-control algorithm parameters from the
/// network parameters `inp` and the given queue parameters.
///
/// # Panics
///
/// Panics if the Vegas parameter builder rejects the values.
#[cfg(feature = "flowctl-cc")]
fn build_cc_vegas(
    inp: &NetParameters,
    vegas_queue_params: ccparams::VegasQueueParams,
) -> ccparams::Algorithm {
    let full_min_pct = Percentage::new(inp.cc_cwnd_full_minpct.as_percent().get() as u32);

    let vegas = ccparams::VegasParamsBuilder::default()
        .cell_in_queue_params(vegas_queue_params)
        .ss_cwnd_max(inp.cc_ss_max.into())
        .cwnd_full_gap(inp.cc_cwnd_full_gap.into())
        .cwnd_full_min_pct(full_min_pct)
        .cwnd_full_per_cwnd(inp.cc_cwnd_full_per_cwnd.into())
        .build()
        .expect("Unable to build Vegas params from NetParams");

    ccparams::Algorithm::Vegas(vegas)
}
552
553fn build_cc_fixedwindow(inp: &NetParameters) -> ccparams::Algorithm {
555 ccparams::Algorithm::FixedWindow(build_cc_fixedwindow_params(inp))
556}
557
558fn build_cc_fixedwindow_params(inp: &NetParameters) -> ccparams::FixedWindowParams {
561 ccparams::FixedWindowParamsBuilder::default()
562 .circ_window_start(inp.circuit_window.get() as u16)
563 .circ_window_min(inp.circuit_window.lower() as u16)
564 .circ_window_max(inp.circuit_window.upper() as u16)
565 .build()
566 .expect("Unable to build FixedWindow params from NetParams")
567}
568
569fn circparameters_from_netparameters(
571 inp: &NetParameters,
572 alg: ccparams::Algorithm,
573) -> Result<CircParameters> {
574 let cwnd_params = ccparams::CongestionWindowParamsBuilder::default()
575 .cwnd_init(inp.cc_cwnd_init.into())
576 .cwnd_inc_pct_ss(Percentage::new(
577 inp.cc_cwnd_inc_pct_ss.as_percent().get() as u32
578 ))
579 .cwnd_inc(inp.cc_cwnd_inc.into())
580 .cwnd_inc_rate(inp.cc_cwnd_inc_rate.into())
581 .cwnd_min(inp.cc_cwnd_min.into())
582 .cwnd_max(inp.cc_cwnd_max.into())
583 .sendme_inc(inp.cc_sendme_inc.into())
584 .build()
585 .map_err(into_internal!(
586 "Unable to build CongestionWindow params from NetParams"
587 ))?;
588 let rtt_params = ccparams::RoundTripEstimatorParamsBuilder::default()
589 .ewma_cwnd_pct(Percentage::new(
590 inp.cc_ewma_cwnd_pct.as_percent().get() as u32
591 ))
592 .ewma_max(inp.cc_ewma_max.into())
593 .ewma_ss_max(inp.cc_ewma_ss.into())
594 .rtt_reset_pct(Percentage::new(
595 inp.cc_rtt_reset_pct.as_percent().get() as u32
596 ))
597 .build()
598 .map_err(into_internal!("Unable to build RTT params from NetParams"))?;
599 let ccontrol = ccparams::CongestionControlParamsBuilder::default()
600 .alg(alg)
601 .fixed_window_params(build_cc_fixedwindow_params(inp))
602 .cwnd_params(cwnd_params)
603 .rtt_params(rtt_params)
604 .build()
605 .map_err(into_internal!(
606 "Unable to build CongestionControl params from NetParams"
607 ))?;
608 let flow_ctrl_params = FlowCtrlParameters {
609 cc_xoff_client: CellCount::new(inp.cc_xoff_client.get_u32()),
610 cc_xoff_exit: CellCount::new(inp.cc_xoff_exit.get_u32()),
611 cc_xon_rate: CellCount::new(inp.cc_xon_rate.get_u32()),
612 cc_xon_change_pct: inp.cc_xon_change_pct.get_u32(),
613 cc_xon_ewma_cnt: inp.cc_xon_ewma_cnt.get_u32(),
614 };
615 Ok(CircParameters::new(
616 inp.extend_by_ed25519_id.into(),
617 ccontrol,
618 flow_ctrl_params,
619 ))
620}
621
622pub fn exit_circparams_from_netparams(inp: &NetParameters) -> Result<CircParameters> {
625 let alg = match AlgorithmType::from(inp.cc_alg.get()) {
626 #[cfg(feature = "flowctl-cc")]
627 AlgorithmType::VEGAS => build_cc_vegas(
628 inp,
629 (
630 inp.cc_vegas_alpha_exit.into(),
631 inp.cc_vegas_beta_exit.into(),
632 inp.cc_vegas_delta_exit.into(),
633 inp.cc_vegas_gamma_exit.into(),
634 inp.cc_vegas_sscap_exit.into(),
635 )
636 .into(),
637 ),
638 _ => build_cc_fixedwindow(inp),
640 };
641 circparameters_from_netparameters(inp, alg)
642}
643
644pub fn onion_circparams_from_netparams(inp: &NetParameters) -> Result<CircParameters> {
647 let alg = match AlgorithmType::from(inp.cc_alg.get()) {
648 #[cfg(feature = "flowctl-cc")]
649 AlgorithmType::VEGAS => {
650 build_cc_vegas(
654 inp,
655 (
656 inp.cc_vegas_alpha_onion.into(),
657 inp.cc_vegas_beta_onion.into(),
658 inp.cc_vegas_delta_onion.into(),
659 inp.cc_vegas_gamma_onion.into(),
660 inp.cc_vegas_sscap_onion.into(),
661 )
662 .into(),
663 )
664 }
665 _ => build_cc_fixedwindow(inp),
667 };
668 circparameters_from_netparameters(inp, alg)
669}
670
/// Run `fut` in a spawned task under two nested timeouts.
///
/// The future is wrapped in a timeout of `abandon` and spawned on `runtime`;
/// its outcome is sent back over a oneshot channel. This function then waits
/// on the receiving end of that channel for at most `timeout`.
///
/// Because the future runs in its own task, giving up on the receiver after
/// `timeout` does not by itself stop the task: it keeps running until it
/// finishes or until `abandon` elapses.
///
/// # Errors
///
/// Returns `Error::CircTimeout(None)` if either timeout elapses, a spawn error
/// if the task cannot be launched, and otherwise whatever `fut` itself returns.
async fn double_timeout<R, F, T>(
    runtime: &R,
    fut: F,
    timeout: Duration,
    abandon: Duration,
) -> Result<T>
where
    R: Runtime,
    F: Future<Output = Result<T>> + Send + 'static,
    T: Send + 'static,
{
    let (snd, rcv) = oneshot::channel();
    let rt = runtime.clone();
    // Both timeout futures are created here, before anything is spawned or
    // awaited.
    // NOTE(review): presumably so that both deadlines are measured from the
    // same starting instant — confirm against `SleepProviderExt::timeout`.
    let inner_timeout_future = rt.timeout(abandon, fut);
    let outer_timeout_future = rt.timeout(timeout, rcv);

    runtime
        .spawn(async move {
            let result = inner_timeout_future.await;
            // The send fails only if the receiver is gone (e.g. the outer
            // timeout already fired); that is not an error for us.
            let _ignore_cancelled_error = snd.send(result);
        })
        .map_err(|e| Error::from_spawn("circuit construction task", e))?;

    let outcome = outer_timeout_future.await;
    // There are four nested layers of result to collapse, outermost first:
    //   1. the outer timeout (mapped to `CircTimeout`, then `?`);
    //   2. the oneshot receiver's result (second `?`, converted into `Error`);
    //   3. the inner timeout (mapped to `CircTimeout`, then the final `?`);
    //   4. the future's own `Result<T>`, which becomes the return value.
    outcome
        .map_err(|_| Error::CircTimeout(None))??
        .map_err(|_| Error::CircTimeout(None))?
}
719
720#[cfg(test)]
721mod test {
722 #![allow(clippy::bool_assert_comparison)]
724 #![allow(clippy::clone_on_copy)]
725 #![allow(clippy::dbg_macro)]
726 #![allow(clippy::mixed_attributes_style)]
727 #![allow(clippy::print_stderr)]
728 #![allow(clippy::print_stdout)]
729 #![allow(clippy::single_char_pattern)]
730 #![allow(clippy::unwrap_used)]
731 #![allow(clippy::unchecked_time_subtraction)]
732 #![allow(clippy::useless_vec)]
733 #![allow(clippy::needless_pass_by_value)]
734 use super::*;
736 use crate::timeouts::TimeoutEstimator;
737 use futures::FutureExt;
738 use std::sync::Mutex;
739 use tor_chanmgr::ChannelUsage as CU;
740 use tor_linkspec::ChanTarget;
741 use tor_linkspec::{HasRelayIds, RelayIdType, RelayIds};
742 use tor_llcrypto::pk::ed25519::Ed25519Identity;
743 use tor_memquota::ArcMemoryQuotaTrackerExt as _;
744 use tor_proto::memquota::ToplevelAccount;
745 use tor_rtcompat::SleepProvider;
746 use tracing::trace;
747
748 fn gs() -> Arc<GuardStatusHandle> {
750 Arc::new(None.into())
751 }
752
    /// Exercise `double_timeout` in four situations: the future is ready at
    /// once; it finishes before the first timeout; it finishes between the two
    /// timeouts; and it would only finish after the second timeout.
    #[test]
    fn test_double_timeout() {
        let t1 = Duration::from_secs(1);
        let t10 = Duration::from_secs(10);
        /// Return true if `d1` lies in the range `[d2, d2 + 500ms]`.
        fn duration_close_to(d1: Duration, d2: Duration) -> bool {
            d1 >= d2 && d1 <= d2 + Duration::from_millis(500)
        }

        tor_rtmock::MockRuntime::test_with_various(|rto| async move {
            // Case 1: the future completes immediately.
            let x = double_timeout(&rto, async { Ok(3_u32) }, t1, t10).await;
            assert!(x.is_ok());
            assert_eq!(x.unwrap(), 3_u32);

            trace!("acquiesce after test1");
            #[allow(clippy::clone_on_copy)]
            #[allow(deprecated)]
            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());

            // Case 2: the future sleeps 100 ms, well inside the 1 s timeout.
            let rt_clone = rt.clone();
            rt_clone.block_advance("manually controlling advances");
            let x = rt
                .wait_for(double_timeout(
                    &rt,
                    async move {
                        let sl = rt_clone.sleep(Duration::from_millis(100));
                        rt_clone.allow_one_advance(Duration::from_millis(100));
                        sl.await;
                        Ok(4_u32)
                    },
                    t1,
                    t10,
                ))
                .await;
            assert!(x.is_ok());
            assert_eq!(x.unwrap(), 4_u32);

            trace!("acquiesce after test2");
            #[allow(clippy::clone_on_copy)]
            #[allow(deprecated)]
            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());

            // Case 3: the future sleeps 2 s, i.e. past the 1 s timeout but
            // inside the 10 s abandon timeout. The caller sees a timeout, yet
            // the task still runs to completion and fires `snd`.
            let rt_clone = rt.clone();
            let (snd, rcv) = oneshot::channel();
            let start = rt.now();
            rt.block_advance("manually controlling advances");
            let x = rt
                .wait_for(double_timeout(
                    &rt,
                    async move {
                        let sl = rt_clone.sleep(Duration::from_secs(2));
                        rt_clone.allow_one_advance(Duration::from_secs(2));
                        sl.await;
                        snd.send(()).unwrap();
                        Ok(4_u32)
                    },
                    t1,
                    t10,
                ))
                .await;
            assert!(matches!(x, Err(Error::CircTimeout(_))));
            let end = rt.now();
            assert!(duration_close_to(end - start, Duration::from_secs(1)));
            let waited = rt.wait_for(rcv).await;
            assert_eq!(waited, Ok(()));

            trace!("acquiesce after test3");
            #[allow(clippy::clone_on_copy)]
            #[allow(deprecated)]
            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());

            // Case 4: the future sleeps 30 s, past both timeouts. The caller
            // sees a timeout after 1 s; the task is dropped at 10 s, so `snd`
            // never fires and `rcv` resolves to an error.
            let rt_clone = rt.clone();
            rt.block_advance("manually controlling advances");
            let (snd, rcv) = oneshot::channel();
            let start = rt.now();
            rt.allow_one_advance(Duration::from_secs(1));
            let x = rt
                .wait_for(double_timeout(
                    &rt,
                    async move {
                        rt_clone.sleep(Duration::from_secs(30)).await;
                        snd.send(()).unwrap();
                        Ok(4_u32)
                    },
                    t1,
                    t10,
                ))
                .await;
            assert!(matches!(x, Err(Error::CircTimeout(_))));
            let end = rt.now();
            // Allow the remaining 9 s so that the abandon timeout can expire.
            rt.allow_one_advance(Duration::from_secs(9));
            let waited = rt.wait_for(rcv).await;
            assert!(waited.is_err());
            let end2 = rt.now();
            assert!(duration_close_to(end - start, Duration::from_secs(1)));
            assert!(duration_close_to(end2 - start, Duration::from_secs(10)));
        });
    }
860
861 fn timeouts_from_key(id: &Ed25519Identity) -> (Duration, Duration) {
870 let mut be = [0; 8];
871 be[..].copy_from_slice(&id.as_bytes()[0..8]);
872 let dur = u64::from_be_bytes(be);
873 be[..].copy_from_slice(&id.as_bytes()[8..16]);
874 let dur2 = u64::from_be_bytes(be);
875 (Duration::from_millis(dur), Duration::from_millis(dur2))
876 }
877 fn key_from_timeouts(d1: Duration, d2: Duration) -> Ed25519Identity {
886 let mut bytes = [0; 32];
887 let dur = (d1.as_millis() as u64).to_be_bytes();
888 bytes[0..8].copy_from_slice(&dur);
889 let dur = (d2.as_millis() as u64).to_be_bytes();
890 bytes[8..16].copy_from_slice(&dur);
891 bytes.into()
892 }
893
894 fn timeouts_from_chantarget<CT: ChanTarget>(ct: &CT) -> (Duration, Duration) {
897 let ed_id = ct
900 .identity(RelayIdType::Ed25519)
901 .expect("No ed25519 key was present for fake ChanTarget‽")
902 .try_into()
903 .expect("ChanTarget provided wrong key type");
904 timeouts_from_key(ed_id)
905 }
906
    /// Stand-in for a real circuit; it only records what it was asked to build.
    #[derive(Debug, Clone)]
    struct FakeCirc {
        /// Identities of the hops, in the order they were added.
        hops: Vec<RelayIds>,
        /// True if this was created through `create_chantarget`.
        onehop: bool,
    }
    #[async_trait]
    /// Fake `Buildable` whose create/extend steps take an amount of (mock)
    /// time that is encoded in the target's Ed25519 identity.
    ///
    /// For each target, the first encoded duration is how long the step
    /// sleeps; the second, if nonzero, is passed to `allow_one_advance` once
    /// the sleep is over.
    impl Buildable for Mutex<FakeCirc> {
        type Chan = ();

        /// Always succeeds immediately with a unit "channel".
        async fn open_channel<RT: Runtime>(
            _chanmgr: &ChanMgr<RT>,
            _ct: &OwnedChanTarget,
            _guard_status: &GuardStatusHandle,
            _usage: ChannelUsage,
        ) -> Result<Arc<Self::Chan>> {
            Ok(Arc::new(()))
        }

        /// Create a fake circuit marked as one-hop, after the encoded delay.
        async fn create_chantarget<RT: Runtime>(
            _: Arc<Self::Chan>,
            rt: &RT,
            ct: &OwnedChanTarget,
            _: CircParameters,
            _timeouts: Arc<dyn tor_proto::client::circuit::TimeoutEstimator>,
        ) -> Result<Self> {
            let (d1, d2) = timeouts_from_chantarget(ct);
            rt.sleep(d1).await;
            if !d2.is_zero() {
                rt.allow_one_advance(d2);
            }

            let c = FakeCirc {
                hops: vec![RelayIds::from_relay_ids(ct)],
                onehop: true,
            };
            Ok(Mutex::new(c))
        }
        /// Create a fake circuit not marked as one-hop, after the encoded delay.
        async fn create<RT: Runtime>(
            _: Arc<Self::Chan>,
            rt: &RT,
            ct: &OwnedCircTarget,
            _: CircParameters,
            _timeouts: Arc<dyn tor_proto::client::circuit::TimeoutEstimator>,
        ) -> Result<Self> {
            let (d1, d2) = timeouts_from_chantarget(ct);
            rt.sleep(d1).await;
            if !d2.is_zero() {
                rt.allow_one_advance(d2);
            }

            let c = FakeCirc {
                hops: vec![RelayIds::from_relay_ids(ct)],
                onehop: false,
            };
            Ok(Mutex::new(c))
        }
        /// Append `ct` to the recorded hops, after the encoded delay.
        async fn extend<RT: Runtime>(
            &self,
            rt: &RT,
            ct: &OwnedCircTarget,
            _: CircParameters,
        ) -> Result<()> {
            let (d1, d2) = timeouts_from_chantarget(ct);
            rt.sleep(d1).await;
            if !d2.is_zero() {
                rt.allow_one_advance(d2);
            }

            {
                // Keep the lock scope tight: the guard is dropped before returning.
                let mut c = self.lock().unwrap();
                c.hops.push(RelayIds::from_relay_ids(ct));
            }
            Ok(())
        }
    }
983
    /// Fake timeout estimator that records what it is told.
    struct TimeoutRecorder<R> {
        /// Runtime on which `allow_one_advance` is called after an event.
        runtime: R,
        /// Recorded events: (true for success / false for timeout, hop, delay).
        hist: Vec<(bool, u8, Duration)>,
        /// How far to allow the clock to advance after a timeout is noted
        /// (zero means no advance).
        on_timeout: Duration,
        /// How far to allow the clock to advance after a success is noted
        /// (zero means no advance).
        on_success: Duration,

        /// Sender fired when the last hop of a build is reported as completed.
        snd_success: Option<oneshot::Sender<()>>,
        /// Receiver paired with `snd_success`; taken out by the test driver.
        rcv_success: Option<oneshot::Receiver<()>>,
    }
996
997 impl<R> TimeoutRecorder<R> {
998 fn new(runtime: R) -> Self {
999 Self::with_delays(runtime, Duration::from_secs(0), Duration::from_secs(0))
1000 }
1001
1002 fn with_delays(runtime: R, on_timeout: Duration, on_success: Duration) -> Self {
1003 let (snd_success, rcv_success) = oneshot::channel();
1004 Self {
1005 runtime,
1006 hist: Vec::new(),
1007 on_timeout,
1008 on_success,
1009 rcv_success: Some(rcv_success),
1010 snd_success: Some(snd_success),
1011 }
1012 }
1013 }
    impl<R: Runtime> TimeoutEstimator for Arc<Mutex<TimeoutRecorder<R>>> {
        /// Record a success, but only for the last hop of a build.
        ///
        /// Also fires the success sender; this panics if it is reached a
        /// second time, since the sender has already been taken.
        fn note_hop_completed(&mut self, hop: u8, delay: Duration, is_last: bool) {
            if !is_last {
                return;
            }
            // Copy out what we need so the lock is released before we touch
            // the runtime.
            let (rt, advance) = {
                let mut this = self.lock().unwrap();
                this.hist.push((true, hop, delay));
                let _ = this.snd_success.take().unwrap().send(());
                (this.runtime.clone(), this.on_success)
            };
            if !advance.is_zero() {
                rt.allow_one_advance(advance);
            }
        }
        /// Record a timeout, then allow the configured clock advance, if any.
        fn note_circ_timeout(&mut self, hop: u8, delay: Duration) {
            // Copy out what we need so the lock is released before we touch
            // the runtime.
            let (rt, advance) = {
                let mut this = self.lock().unwrap();
                this.hist.push((false, hop, delay));
                (this.runtime.clone(), this.on_timeout)
            };
            if !advance.is_zero() {
                rt.allow_one_advance(advance);
            }
        }
        /// Always report a 3 second timeout and a 100 second abandon timeout.
        fn timeouts(&mut self, _action: &Action) -> (Duration, Duration) {
            (Duration::from_secs(3), Duration::from_secs(100))
        }
        /// This fake never reports that it is learning timeouts.
        fn learning_timeouts(&self) -> bool {
            false
        }
        /// Network parameters are ignored.
        fn update_params(&mut self, _params: &tor_netdir::params::NetParameters) {}

        /// This fake has no state to persist.
        fn build_state(&mut self) -> Option<crate::timeouts::pareto::ParetoTimeoutState> {
            None
        }
    }
1051
1052 fn circ_t(id: Ed25519Identity) -> OwnedCircTarget {
1054 let mut builder = OwnedCircTarget::builder();
1055 builder
1056 .chan_target()
1057 .ed_identity(id)
1058 .rsa_identity([0x20; 20].into());
1059 builder
1060 .ntor_onion_key([0x33; 32].into())
1061 .protocols("".parse().unwrap())
1062 .build()
1063 .unwrap()
1064 }
1065 fn chan_t(id: Ed25519Identity) -> OwnedChanTarget {
1067 OwnedChanTarget::builder()
1068 .ed_identity(id)
1069 .rsa_identity([0x20; 20].into())
1070 .build()
1071 .unwrap()
1072 }
1073
1074 async fn run_builder_test(
1075 rt: tor_rtmock::MockRuntime,
1076 advance_initial: Duration,
1077 path: OwnedPath,
1078 advance_on_timeout: Option<(Duration, Duration)>,
1079 usage: ChannelUsage,
1080 ) -> (Result<FakeCirc>, Vec<(bool, u8, Duration)>) {
1081 let chanmgr = Arc::new(
1082 ChanMgr::new(
1083 rt.clone(),
1084 Default::default(),
1085 Default::default(),
1086 &Default::default(),
1087 ToplevelAccount::new_noop(),
1088 )
1089 .unwrap(),
1090 );
1091 let timeouts = match advance_on_timeout {
1093 Some((d1, d2)) => TimeoutRecorder::with_delays(rt.clone(), d1, d2),
1094 None => TimeoutRecorder::new(rt.clone()),
1095 };
1096 let timeouts = Arc::new(Mutex::new(timeouts));
1097 let builder: Builder<_, Mutex<FakeCirc>> = Builder::new(
1098 rt.clone(),
1099 chanmgr,
1100 timeouts::Estimator::new(Arc::clone(&timeouts)),
1101 );
1102
1103 rt.block_advance("manually controlling advances");
1104 rt.allow_one_advance(advance_initial);
1105 let outcome = rt.spawn_join("build-owned", async move {
1106 let arcbuilder = Arc::new(builder);
1107 let params = exit_circparams_from_netparams(&NetParameters::default())?;
1108 arcbuilder.build_owned(path, ¶ms, gs(), usage).await
1109 });
1110
1111 if advance_on_timeout.is_some() {
1113 let receiver = { timeouts.lock().unwrap().rcv_success.take().unwrap() };
1114 rt.spawn_identified("receiver", async move {
1115 receiver.await.unwrap();
1116 });
1117 }
1118 rt.advance_until_stalled().await;
1119
1120 let circ = outcome.map(|m| Ok(m?.lock().unwrap().clone())).await;
1121 let timeouts = timeouts.lock().unwrap().hist.clone();
1122
1123 (circ, timeouts)
1124 }
1125
    /// A channel-only path yields a one-hop fake circuit and a single success
    /// record for hop 0 after 100 ms.
    #[test]
    fn build_onehop() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            // The target sleeps 100 ms and asks for no further advance.
            let id_100ms = key_from_timeouts(Duration::from_millis(100), Duration::from_millis(0));
            let path = OwnedPath::ChannelOnly(chan_t(id_100ms));

            let (outcome, timeouts) =
                run_builder_test(rt, Duration::from_millis(100), path, None, CU::UserTraffic).await;
            let circ = outcome.unwrap();
            assert!(circ.onehop);
            assert_eq!(circ.hops.len(), 1);
            assert!(circ.hops[0].same_relay_ids(&chan_t(id_100ms)));

            assert_eq!(timeouts.len(), 1);
            // The single record is a success ...
            assert!(timeouts[0].0);
            // ... for hop 0 ...
            assert_eq!(timeouts[0].1, 0);
            // ... completed after the 100 ms sleep.
            assert_eq!(timeouts[0].2, Duration::from_millis(100));
        });
    }
1145
    /// A three-hop path yields a three-hop fake circuit and a single success
    /// record for hop 2 after 100 + 200 + 300 = 600 ms.
    #[test]
    fn build_threehop() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            // Each key encodes (how long this hop sleeps, how far to allow the
            // clock to advance afterwards), so each hop unblocks the next one.
            let id_100ms =
                key_from_timeouts(Duration::from_millis(100), Duration::from_millis(200));
            let id_200ms =
                key_from_timeouts(Duration::from_millis(200), Duration::from_millis(300));
            let id_300ms = key_from_timeouts(Duration::from_millis(300), Duration::from_millis(0));
            let path =
                OwnedPath::Normal(vec![circ_t(id_100ms), circ_t(id_200ms), circ_t(id_300ms)]);

            let (outcome, timeouts) =
                run_builder_test(rt, Duration::from_millis(100), path, None, CU::UserTraffic).await;
            let circ = outcome.unwrap();
            assert!(!circ.onehop);
            assert_eq!(circ.hops.len(), 3);
            assert!(circ.hops[0].same_relay_ids(&chan_t(id_100ms)));
            assert!(circ.hops[1].same_relay_ids(&chan_t(id_200ms)));
            assert!(circ.hops[2].same_relay_ids(&chan_t(id_300ms)));

            assert_eq!(timeouts.len(), 1);
            // The single record is a success ...
            assert!(timeouts[0].0);
            // ... for the last hop (index 2) ...
            assert_eq!(timeouts[0].1, 2);
            // ... after the three sleeps combined.
            assert_eq!(timeouts[0].2, Duration::from_millis(600));
        });
    }
1172
    /// When the third hop would take an hour, the build times out at the
    /// recorder's 3 second timeout and exactly one timeout is recorded.
    #[test]
    fn build_huge_timeout() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let id_100ms =
                key_from_timeouts(Duration::from_millis(100), Duration::from_millis(200));
            // After its own 200 ms, allow 2700 ms more: 100 + 200 + 2700 lands
            // exactly on the 3000 ms timeout.
            let id_200ms =
                key_from_timeouts(Duration::from_millis(200), Duration::from_millis(2700));
            let id_hour = key_from_timeouts(Duration::from_secs(3600), Duration::from_secs(0));

            let path = OwnedPath::Normal(vec![circ_t(id_100ms), circ_t(id_200ms), circ_t(id_hour)]);

            let (outcome, timeouts) =
                run_builder_test(rt, Duration::from_millis(100), path, None, CU::UserTraffic).await;
            assert!(matches!(outcome, Err(Error::CircTimeout(_))));

            assert_eq!(timeouts.len(), 1);
            // The single record is a timeout (the hop count is not checked) ...
            assert!(!timeouts[0].0);
            // ... noted at the 3000 ms mark.
            assert_eq!(timeouts[0].2, Duration::from_millis(3000));
        });
    }
1196
    /// When the third hop takes 3 seconds, the caller sees a timeout, but the
    /// build still completes in the background: the recorder sees a timeout
    /// followed by a success.
    #[test]
    fn build_modest_timeout() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let id_100ms =
                key_from_timeouts(Duration::from_millis(100), Duration::from_millis(200));
            let id_200ms =
                key_from_timeouts(Duration::from_millis(200), Duration::from_millis(2700));
            let id_3sec = key_from_timeouts(Duration::from_millis(3000), Duration::from_millis(0));

            // (on_timeout, on_success): after the timeout is noted, allow the
            // clock to advance 4 s more so the last hop can finish.
            let timeout_advance = (Duration::from_millis(4000), Duration::from_secs(0));

            let path = OwnedPath::Normal(vec![circ_t(id_100ms), circ_t(id_200ms), circ_t(id_3sec)]);

            let (outcome, timeouts) = run_builder_test(
                rt.clone(),
                Duration::from_millis(100),
                path,
                Some(timeout_advance),
                CU::UserTraffic,
            )
            .await;
            assert!(matches!(outcome, Err(Error::CircTimeout(_))));

            assert_eq!(timeouts.len(), 2);
            // First record: a timeout at the 3000 ms mark (hop count not checked).
            assert!(!timeouts[0].0);
            assert_eq!(timeouts[0].2, Duration::from_millis(3000));

            // Second record: success for the last hop (index 2); its measured
            // duration is not checked.
            assert!(timeouts[1].0);
            assert_eq!(timeouts[1].1, 2);
        });
    }
1233}