1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![allow(renamed_and_removed_lints)] #![allow(unknown_lints)] #![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_time_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] #![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)] #![allow(clippy::needless_raw_string_hashes)] #![allow(clippy::needless_lifetimes)] #![allow(mismatched_lifetime_syntaxes)] #![allow(clippy::collapsible_if)] #![deny(clippy::unused_async)]
47#![cfg_attr(not(all(feature = "full", feature = "experimental")), allow(unused))]
51
52use build::TunnelBuilder;
53use mgr::{AbstractTunnel, AbstractTunnelBuilder};
54use tor_basic_utils::retry::RetryDelay;
55use tor_chanmgr::ChanMgr;
56use tor_dircommon::fallback::FallbackList;
57use tor_error::{error_report, warn_report};
58use tor_guardmgr::RetireCircuits;
59use tor_linkspec::ChanTarget;
60use tor_netdir::{DirEvent, NetDir, NetDirProvider, Timeliness};
61use tor_proto::circuit::UniqId;
62use tor_proto::client::circuit::CircParameters;
63use tor_rtcompat::Runtime;
64
65#[cfg(any(feature = "specific-relay", feature = "hs-common"))]
66use tor_linkspec::IntoOwnedChanTarget;
67
68use futures::StreamExt;
69use std::sync::{Arc, Mutex, Weak};
70use std::time::Duration;
71use tor_rtcompat::SpawnExt;
72use tracing::{debug, info, instrument, trace, warn};
73
74#[cfg(feature = "testing")]
75pub use config::test_config::TestConfig;
76
77pub mod build;
78mod config;
79mod err;
80#[cfg(feature = "hs-common")]
81pub mod hspool;
82mod impls;
83pub mod isolation;
84mod mgr;
85#[cfg(test)]
86mod mocks;
87mod preemptive;
88pub mod timeouts;
89mod tunnel;
90mod usage;
91
92cfg_if::cfg_if! {
94 if #[cfg(feature = "experimental-api")] {
95 pub mod path;
96 } else {
97 pub(crate) mod path;
98 }
99}
100
101pub use err::Error;
102pub use isolation::IsolationToken;
103pub use tor_guardmgr::{ClockSkewEvents, GuardMgrConfig, SkewEstimate};
104pub use tunnel::{
105 ClientDataTunnel, ClientDirTunnel, ClientOnionServiceDataTunnel, ClientOnionServiceDirTunnel,
106 ClientOnionServiceIntroTunnel, ServiceOnionServiceDataTunnel, ServiceOnionServiceDirTunnel,
107 ServiceOnionServiceIntroTunnel,
108};
109#[cfg(feature = "conflux")]
110pub use tunnel::{
111 ClientMultiPathDataTunnel, ClientMultiPathOnionServiceDataTunnel,
112 ServiceMultiPathOnionServiceDataTunnel,
113};
114pub use usage::{TargetPort, TargetPorts};
115
116pub use config::{
117 CircMgrConfig, CircuitTiming, CircuitTimingBuilder, PathConfig, PathConfigBuilder,
118 PreemptiveCircuitConfig, PreemptiveCircuitConfigBuilder,
119};
120
121use crate::isolation::StreamIsolation;
122use crate::mgr::TunnelProvenance;
123use crate::preemptive::PreemptiveCircuitPredictor;
124use usage::TargetTunnelUsage;
125
126use safelog::sensitive as sv;
127#[cfg(feature = "geoip")]
128use tor_geoip::CountryCode;
129pub use tor_guardmgr::{ExternalActivity, FirstHopId};
130use tor_persist::StateMgr;
131use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
132
133#[cfg(feature = "hs-common")]
134use crate::hspool::{HsCircKind, HsCircStemKind};
135#[cfg(all(feature = "vanguards", feature = "hs-common"))]
136use tor_guardmgr::vanguards::VanguardMgr;
137
138pub type Result<T> = std::result::Result<T, Error>;
140
141type TimeoutStateHandle = tor_persist::DynStorageHandle<timeouts::pareto::ParetoTimeoutState>;
143
144const PARETO_TIMEOUT_DATA_KEY: &str = "circuit_timeouts";
146
147#[derive(Debug, Copy, Clone)]
155#[non_exhaustive]
156pub enum DirInfo<'a> {
157 Fallbacks(&'a FallbackList),
159 Directory(&'a NetDir),
161 Nothing,
164}
165
166impl<'a> From<&'a FallbackList> for DirInfo<'a> {
167 fn from(v: &'a FallbackList) -> DirInfo<'a> {
168 DirInfo::Fallbacks(v)
169 }
170}
171impl<'a> From<&'a NetDir> for DirInfo<'a> {
172 fn from(v: &'a NetDir) -> DirInfo<'a> {
173 DirInfo::Directory(v)
174 }
175}
176impl<'a> DirInfo<'a> {
177 fn circ_params(&self, usage: &TargetTunnelUsage) -> Result<CircParameters> {
179 use tor_netdir::params::NetParameters;
180 let defaults = NetParameters::default();
183 let net_params = match self {
184 DirInfo::Directory(d) => d.params(),
185 _ => &defaults,
186 };
187 match usage {
188 #[cfg(feature = "hs-common")]
189 TargetTunnelUsage::HsCircBase { .. } => {
190 build::onion_circparams_from_netparams(net_params)
191 }
192 _ => build::exit_circparams_from_netparams(net_params),
193 }
194 }
195}
196
197pub struct CircMgr<R: Runtime>(Arc<CircMgrInner<build::TunnelBuilder<R>, R>>);
207
208impl<R: Runtime> CircMgr<R> {
209 pub fn new<SM, CFG: CircMgrConfig>(
215 config: &CFG,
216 storage: SM,
217 runtime: &R,
218 chanmgr: Arc<ChanMgr<R>>,
219 guardmgr: &tor_guardmgr::GuardMgr<R>,
220 ) -> Result<Self>
221 where
222 SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
223 {
224 Ok(Self(Arc::new(CircMgrInner::new(
225 config, storage, runtime, chanmgr, guardmgr,
226 )?)))
227 }
228
229 #[instrument(level = "trace", skip_all)]
232 pub async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<ClientDirTunnel> {
233 let tunnel = self.0.get_or_launch_dir(netdir).await?;
234 Ok(tunnel.into())
235 }
236
237 #[instrument(level = "trace", skip_all)]
243 pub async fn get_or_launch_exit(
244 &self,
245 netdir: DirInfo<'_>, ports: &[TargetPort],
247 isolation: StreamIsolation,
248 #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
251 ) -> Result<ClientDataTunnel> {
252 let tunnel = self
253 .0
254 .get_or_launch_exit(
255 netdir,
256 ports,
257 isolation,
258 #[cfg(feature = "geoip")]
259 country_code,
260 )
261 .await?;
262 Ok(tunnel.into())
263 }
264
265 #[cfg(feature = "specific-relay")]
270 #[instrument(level = "trace", skip_all)]
271 pub async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
272 &self,
273 target: T,
274 ) -> Result<ClientDirTunnel> {
275 let tunnel = self.0.get_or_launch_dir_specific(target).await?;
276 Ok(tunnel.into())
277 }
278
279 #[instrument(level = "trace", skip_all)]
285 pub fn launch_background_tasks<D, S>(
286 self: &Arc<Self>,
287 runtime: &R,
288 dir_provider: &Arc<D>,
289 state_mgr: S,
290 ) -> Result<Vec<TaskHandle>>
291 where
292 D: NetDirProvider + 'static + ?Sized,
293 S: StateMgr + std::marker::Send + 'static,
294 {
295 CircMgrInner::launch_background_tasks(&self.0.clone(), runtime, dir_provider, state_mgr)
296 }
297
298 pub fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
304 self.0.netdir_is_sufficient(netdir)
305 }
306
307 pub fn retire_circ(&self, circ_id: &UniqId) {
310 self.0.retire_circ(circ_id);
311 }
312
313 pub fn note_external_failure(
317 &self,
318 target: &impl ChanTarget,
319 external_failure: ExternalActivity,
320 ) {
321 self.0.note_external_failure(target, external_failure);
322 }
323
324 pub fn note_external_success(
327 &self,
328 target: &impl ChanTarget,
329 external_activity: ExternalActivity,
330 ) {
331 self.0.note_external_success(target, external_activity);
332 }
333
334 pub fn skew_events(&self) -> ClockSkewEvents {
342 self.0.skew_events()
343 }
344
345 #[instrument(level = "trace", skip_all)]
351 pub fn reconfigure<CFG: CircMgrConfig>(
352 &self,
353 new_config: &CFG,
354 how: tor_config::Reconfigure,
355 ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
356 self.0.reconfigure(new_config, how)
357 }
358
359 pub fn estimate_timeout(&self, timeout_action: &timeouts::Action) -> std::time::Duration {
385 self.0.estimate_timeout(timeout_action)
386 }
387
388 #[cfg(feature = "experimental-api")]
391 pub fn builder(&self) -> &TunnelBuilder<R> {
392 CircMgrInner::builder(&self.0)
393 }
394}
395
396#[derive(Clone)]
398pub(crate) struct CircMgrInner<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> {
399 mgr: Arc<mgr::AbstractTunnelMgr<B, R>>,
401 predictor: Arc<Mutex<PreemptiveCircuitPredictor>>,
403}
404
405impl<R: Runtime> CircMgrInner<TunnelBuilder<R>, R> {
406 #[allow(clippy::unnecessary_wraps)]
412 pub(crate) fn new<SM, CFG: CircMgrConfig>(
413 config: &CFG,
414 storage: SM,
415 runtime: &R,
416 chanmgr: Arc<ChanMgr<R>>,
417 guardmgr: &tor_guardmgr::GuardMgr<R>,
418 ) -> Result<Self>
419 where
420 SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
421 {
422 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
423 let vanguardmgr = {
424 let has_onion_svc = false;
429 VanguardMgr::new(
430 config.vanguard_config(),
431 runtime.clone(),
432 storage.clone(),
433 has_onion_svc,
434 )?
435 };
436
437 let storage_handle = storage.create_handle(PARETO_TIMEOUT_DATA_KEY);
438
439 let builder = build::TunnelBuilder::new(
440 runtime.clone(),
441 chanmgr,
442 config.path_rules().clone(),
443 storage_handle,
444 guardmgr.clone(),
445 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
446 vanguardmgr,
447 );
448
449 Ok(Self::new_generic(config, runtime, guardmgr, builder))
450 }
451}
452
453impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> CircMgrInner<B, R> {
454 pub(crate) fn new_generic<CFG: CircMgrConfig>(
456 config: &CFG,
457 runtime: &R,
458 guardmgr: &tor_guardmgr::GuardMgr<R>,
459 builder: B,
460 ) -> Self {
461 let preemptive = Arc::new(Mutex::new(PreemptiveCircuitPredictor::new(
462 config.preemptive_circuits().clone(),
463 )));
464
465 guardmgr.set_filter(config.path_rules().build_guard_filter());
466
467 let mgr =
468 mgr::AbstractTunnelMgr::new(builder, runtime.clone(), config.circuit_timing().clone());
469
470 CircMgrInner {
471 mgr: Arc::new(mgr),
472 predictor: preemptive,
473 }
474 }
475
476 #[instrument(level = "trace", skip_all)]
482 pub(crate) fn launch_background_tasks<D, S>(
483 self: &Arc<Self>,
484 runtime: &R,
485 dir_provider: &Arc<D>,
486 state_mgr: S,
487 ) -> Result<Vec<TaskHandle>>
488 where
489 D: NetDirProvider + 'static + ?Sized,
490 S: StateMgr + std::marker::Send + 'static,
491 {
492 let mut ret = vec![];
493
494 runtime
495 .spawn(Self::keep_circmgr_params_updated(
496 dir_provider.events(),
497 Arc::downgrade(self),
498 Arc::downgrade(dir_provider),
499 ))
500 .map_err(|e| Error::from_spawn("circmgr parameter updater", e))?;
501
502 let (sched, handle) = TaskSchedule::new(runtime.clone());
503 ret.push(handle);
504
505 runtime
506 .spawn(Self::update_persistent_state(
507 sched,
508 Arc::downgrade(self),
509 state_mgr,
510 ))
511 .map_err(|e| Error::from_spawn("persistent state updater", e))?;
512
513 let (sched, handle) = TaskSchedule::new(runtime.clone());
514 ret.push(handle);
515
516 runtime
517 .spawn(Self::continually_launch_timeout_testing_circuits(
518 sched,
519 Arc::downgrade(self),
520 Arc::downgrade(dir_provider),
521 ))
522 .map_err(|e| Error::from_spawn("timeout-probe circuit launcher", e))?;
523
524 let (sched, handle) = TaskSchedule::new(runtime.clone());
525 ret.push(handle);
526
527 runtime
528 .spawn(Self::continually_preemptively_build_circuits(
529 sched,
530 Arc::downgrade(self),
531 Arc::downgrade(dir_provider),
532 ))
533 .map_err(|e| Error::from_spawn("preemptive circuit launcher", e))?;
534
535 self.mgr
536 .peek_builder()
537 .guardmgr()
538 .install_netdir_provider(&dir_provider.clone().upcast_arc())?;
539
540 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
541 {
542 let () = self
543 .mgr
544 .peek_builder()
545 .vanguardmgr()
546 .launch_background_tasks(&dir_provider.clone().upcast_arc())?;
547 }
548
549 Ok(ret)
550 }
551
552 #[instrument(level = "trace", skip_all)]
555 pub(crate) async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<Arc<B::Tunnel>> {
556 self.expire_circuits().await;
557 let usage = TargetTunnelUsage::Dir;
558 self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
559 }
560
561 #[instrument(level = "trace", skip_all)]
567 pub(crate) async fn get_or_launch_exit(
568 &self,
569 netdir: DirInfo<'_>, ports: &[TargetPort],
571 isolation: StreamIsolation,
572 #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
575 ) -> Result<Arc<B::Tunnel>> {
576 self.expire_circuits().await;
577 let time = self.mgr.peek_runtime().now();
578 {
579 let mut predictive = self.predictor.lock().expect("preemptive lock poisoned");
580 if ports.is_empty() {
581 predictive.note_usage(None, time);
582 } else {
583 for port in ports.iter() {
584 predictive.note_usage(Some(*port), time);
585 }
586 }
587 }
588 let require_stability = ports.iter().any(|p| {
589 self.mgr
590 .peek_builder()
591 .path_config()
592 .long_lived_ports
593 .contains(&p.port)
594 });
595 let ports = ports.iter().map(Clone::clone).collect();
596 #[cfg(not(feature = "geoip"))]
597 let country_code = None;
598 let usage = TargetTunnelUsage::Exit {
599 ports,
600 isolation,
601 country_code,
602 require_stability,
603 };
604 self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
605 }
606
607 #[cfg(feature = "specific-relay")]
612 #[instrument(level = "trace", skip_all)]
613 pub(crate) async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
614 &self,
615 target: T,
616 ) -> Result<Arc<B::Tunnel>> {
617 self.expire_circuits().await;
618 let usage = TargetTunnelUsage::DirSpecificTarget(target.to_owned());
619 self.mgr
620 .get_or_launch(&usage, DirInfo::Nothing)
621 .await
622 .map(|(c, _)| c)
623 }
624
625 #[instrument(level = "trace", skip_all)]
631 pub(crate) fn reconfigure<CFG: CircMgrConfig>(
632 &self,
633 new_config: &CFG,
634 how: tor_config::Reconfigure,
635 ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
636 let old_path_rules = self.mgr.peek_builder().path_config();
637 let predictor = self.predictor.lock().expect("poisoned lock");
638 let preemptive_circuits = predictor.config();
639 if preemptive_circuits.initial_predicted_ports
640 != new_config.preemptive_circuits().initial_predicted_ports
641 {
642 how.cannot_change("preemptive_circuits.initial_predicted_ports")?;
644 }
645
646 if how == tor_config::Reconfigure::CheckAllOrNothing {
647 return Ok(RetireCircuits::None);
648 }
649
650 let retire_because_of_guardmgr =
651 self.mgr.peek_builder().guardmgr().reconfigure(new_config)?;
652
653 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
654 let retire_because_of_vanguardmgr = self
655 .mgr
656 .peek_builder()
657 .vanguardmgr()
658 .reconfigure(new_config.vanguard_config())?;
659
660 let new_reachable = &new_config.path_rules().reachable_addrs;
661 if new_reachable != &old_path_rules.reachable_addrs {
662 let filter = new_config.path_rules().build_guard_filter();
663 self.mgr.peek_builder().guardmgr().set_filter(filter);
664 }
665
666 let discard_all_circuits = !new_config
667 .path_rules()
668 .at_least_as_permissive_as(&old_path_rules)
669 || retire_because_of_guardmgr != tor_guardmgr::RetireCircuits::None;
670
671 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
672 let discard_all_circuits = discard_all_circuits
673 || retire_because_of_vanguardmgr != tor_guardmgr::RetireCircuits::None;
674
675 self.mgr
676 .peek_builder()
677 .set_path_config(new_config.path_rules().clone());
678 self.mgr
679 .set_circuit_timing(new_config.circuit_timing().clone());
680 predictor.set_config(new_config.preemptive_circuits().clone());
681
682 if discard_all_circuits {
683 info!("Path configuration has become more restrictive: retiring existing circuits.");
687 self.retire_all_circuits();
688 return Ok(RetireCircuits::All);
689 }
690 Ok(RetireCircuits::None)
691 }
692
693 #[instrument(level = "trace", skip_all)]
701 async fn keep_circmgr_params_updated<D>(
702 mut events: impl futures::Stream<Item = DirEvent> + Unpin,
703 circmgr: Weak<Self>,
704 dirmgr: Weak<D>,
705 ) where
706 D: NetDirProvider + 'static + ?Sized,
707 {
708 use DirEvent::*;
709 while let Some(event) = events.next().await {
710 if matches!(event, NewConsensus) {
711 if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
712 if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
713 cm.update_network_parameters(netdir.params());
714 }
715 } else {
716 debug!("Circmgr or dirmgr has disappeared; task exiting.");
717 break;
718 }
719 }
720 }
721 }
722
723 fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
726 self.mgr.update_network_parameters(p);
727 self.mgr.peek_builder().update_network_parameters(p);
728 }
729
730 #[instrument(level = "trace", skip_all)]
737 async fn continually_launch_timeout_testing_circuits<D>(
738 mut sched: TaskSchedule<R>,
739 circmgr: Weak<Self>,
740 dirmgr: Weak<D>,
741 ) where
742 D: NetDirProvider + 'static + ?Sized,
743 {
744 while sched.next().await.is_some() {
745 if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
746 if let Ok(netdir) = dm.netdir(Timeliness::Unchecked) {
747 if let Err(e) = cm
748 .launch_timeout_testing_circuit_if_appropriate(&netdir)
749 .await
750 {
751 warn_report!(e, "Problem launching a timeout testing circuit");
752 }
753 let delay = netdir
754 .params()
755 .cbt_testing_delay
756 .try_into()
757 .expect("Out-of-bounds value from BoundedInt32");
758
759 drop((cm, dm));
760 sched.fire_in(delay);
761 } else {
762 let _ = dm.events().next().await;
765 sched.fire();
766 }
767 } else {
768 return;
769 }
770 }
771 }
772
773 async fn launch_timeout_testing_circuit_if_appropriate(&self, netdir: &NetDir) -> Result<()> {
781 if !self.mgr.peek_builder().learning_timeouts() {
782 return Ok(());
783 }
784 self.expire_circuits().await;
787 let max_circs: u64 = netdir
788 .params()
789 .cbt_max_open_circuits_for_testing
790 .try_into()
791 .expect("Out-of-bounds result from BoundedInt32");
792 if (self.mgr.n_tunnels() as u64) < max_circs {
793 let usage = TargetTunnelUsage::TimeoutTesting;
795 let dirinfo = netdir.into();
796 let mgr = Arc::clone(&self.mgr);
797 debug!("Launching a circuit to test build times.");
798 let receiver = mgr.launch_by_usage(&usage, dirinfo)?;
799 drop(receiver);
802 }
803
804 Ok(())
805 }
806
807 #[allow(clippy::cognitive_complexity)] #[instrument(level = "trace", skip_all)]
815 async fn update_persistent_state<S>(
816 mut sched: TaskSchedule<R>,
817 circmgr: Weak<Self>,
818 statemgr: S,
819 ) where
820 S: StateMgr + std::marker::Send,
821 {
822 while sched.next().await.is_some() {
823 if let Some(circmgr) = Weak::upgrade(&circmgr) {
824 use tor_persist::LockStatus::*;
825
826 match statemgr.try_lock() {
827 Err(e) => {
828 error_report!(e, "Problem with state lock file");
829 break;
830 }
831 Ok(NewlyAcquired) => {
832 info!("We now own the lock on our state files.");
833 if let Err(e) = circmgr.upgrade_to_owned_persistent_state() {
834 error_report!(e, "Unable to upgrade to owned state files");
835 break;
836 }
837 }
838 Ok(AlreadyHeld) => {
839 if let Err(e) = circmgr.store_persistent_state() {
840 error_report!(e, "Unable to flush circmgr state");
841 break;
842 }
843 }
844 Ok(NoLock) => {
845 if let Err(e) = circmgr.reload_persistent_state() {
846 error_report!(e, "Unable to reload circmgr state");
847 break;
848 }
849 }
850 }
851 } else {
852 debug!("Circmgr has disappeared; task exiting.");
853 return;
854 }
855 sched.fire_in(Duration::from_secs(60));
862 }
863
864 debug!("State update task exiting (potentially due to handle drop).");
865 }
866
867 pub(crate) fn upgrade_to_owned_persistent_state(&self) -> Result<()> {
871 self.mgr.peek_builder().upgrade_to_owned_state()?;
872 Ok(())
873 }
874
875 pub(crate) fn reload_persistent_state(&self) -> Result<()> {
880 self.mgr.peek_builder().reload_state()?;
881 Ok(())
882 }
883
884 #[instrument(level = "trace", skip_all)]
896 async fn continually_preemptively_build_circuits<D>(
897 mut sched: TaskSchedule<R>,
898 circmgr: Weak<Self>,
899 dirmgr: Weak<D>,
900 ) where
901 D: NetDirProvider + 'static + ?Sized,
902 {
903 let base_delay = Duration::from_secs(10);
904 let mut retry = RetryDelay::from_duration(base_delay);
905
906 while sched.next().await.is_some() {
907 if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
908 if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
909 let result = cm
910 .launch_circuits_preemptively(DirInfo::Directory(&netdir))
911 .await;
912
913 let delay = match result {
914 Ok(()) => {
915 retry.reset();
916 base_delay
917 }
918 Err(_) => retry.next_delay(&mut rand::rng()),
919 };
920
921 sched.fire_in(delay);
922 } else {
923 let _ = dm.events().next().await;
926 sched.fire();
927 }
928 } else {
929 return;
930 }
931 }
932 }
933
934 #[allow(clippy::cognitive_complexity)]
942 #[instrument(level = "trace", skip_all)]
943 async fn launch_circuits_preemptively(
944 &self,
945 netdir: DirInfo<'_>,
946 ) -> std::result::Result<(), err::PreemptiveCircError> {
947 trace!("Checking preemptive circuit predictions.");
948 let (circs, threshold) = {
949 let path_config = self.mgr.peek_builder().path_config();
950 let preemptive = self.predictor.lock().expect("preemptive lock poisoned");
951 let threshold = preemptive.config().disable_at_threshold;
952 (preemptive.predict(&path_config), threshold)
953 };
954
955 if self.mgr.n_tunnels() >= threshold {
956 return Ok(());
957 }
958 let mut n_created = 0_usize;
959 let mut n_errors = 0_usize;
960
961 let futures = circs
962 .iter()
963 .map(|usage| self.mgr.get_or_launch(usage, netdir));
964 let results = futures::future::join_all(futures).await;
965 for (i, result) in results.into_iter().enumerate() {
966 match result {
967 Ok((t, TunnelProvenance::NewlyCreated)) => {
968 debug!(
969 tunnel_id=%t.unique_id(),
970 "Preemptive circuit was created for {:?}", circs[i]);
971 n_created += 1;
972 }
973 Ok((t, TunnelProvenance::Preexisting)) => {
974 trace!(
975 tunnel_id=%t.unique_id(),
976 "Circuit already existed created for {:?}", circs[i]);
977 }
978 Err(e) => {
979 warn_report!(e, "Failed to build preemptive circuit {:?}", sv(&circs[i]));
980 n_errors += 1;
981 }
982 }
983 }
984
985 if n_created > 0 || n_errors == 0 {
986 Ok(())
990 } else {
991 Err(err::PreemptiveCircError)
994 }
995 }
996
997 #[cfg(feature = "hs-common")]
1014 #[instrument(level = "trace", skip_all)]
1015 pub(crate) async fn launch_hs_unmanaged<T>(
1016 &self,
1017 planned_target: Option<T>,
1018 dir: &NetDir,
1019 stem_kind: HsCircStemKind,
1020 circ_kind: Option<HsCircKind>,
1021 ) -> Result<B::Tunnel>
1022 where
1023 T: IntoOwnedChanTarget,
1024 {
1025 let usage = TargetTunnelUsage::HsCircBase {
1026 compatible_with_target: planned_target.map(IntoOwnedChanTarget::to_owned),
1027 stem_kind,
1028 circ_kind,
1029 };
1030 let (_, client_circ) = self.mgr.launch_unmanaged(&usage, dir.into()).await?;
1031 Ok(client_circ)
1032 }
1033
1034 pub(crate) fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
1040 self.mgr
1041 .peek_builder()
1042 .guardmgr()
1043 .netdir_is_sufficient(netdir)
1044 }
1045
1046 pub(crate) fn estimate_timeout(
1048 &self,
1049 timeout_action: &timeouts::Action,
1050 ) -> std::time::Duration {
1051 let (timeout, _abandon) = self.mgr.peek_builder().estimator().timeouts(timeout_action);
1052 timeout
1053 }
1054
1055 pub(crate) fn builder(&self) -> &B {
1057 self.mgr.peek_builder()
1058 }
1059
1060 pub(crate) fn store_persistent_state(&self) -> Result<bool> {
1065 self.mgr.peek_builder().save_state()
1066 }
1067
1068 async fn expire_circuits(&self) {
1073 let now = self.mgr.peek_runtime().now();
1079
1080 let _next_expiration = self.mgr.expire_tunnels(now).await;
1082 }
1083
1084 pub(crate) fn retire_all_circuits(&self) {
1092 self.mgr.retire_all_tunnels();
1093 }
1094
1095 pub(crate) fn retire_circ(&self, circ_id: &<B::Tunnel as AbstractTunnel>::Id) {
1098 let _ = self.mgr.take_tunnel(circ_id);
1099 }
1100
1101 pub(crate) fn skew_events(&self) -> ClockSkewEvents {
1109 self.mgr.peek_builder().guardmgr().skew_events()
1110 }
1111
1112 pub(crate) fn note_external_failure(
1116 &self,
1117 target: &impl ChanTarget,
1118 external_failure: ExternalActivity,
1119 ) {
1120 self.mgr
1121 .peek_builder()
1122 .guardmgr()
1123 .note_external_failure(target, external_failure);
1124 }
1125
1126 pub(crate) fn note_external_success(
1129 &self,
1130 target: &impl ChanTarget,
1131 external_activity: ExternalActivity,
1132 ) {
1133 self.mgr
1134 .peek_builder()
1135 .guardmgr()
1136 .note_external_success(target, external_activity);
1137 }
1138}
1139
1140impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> Drop for CircMgrInner<B, R> {
1141 fn drop(&mut self) {
1142 match self.store_persistent_state() {
1143 Ok(true) => info!("Flushed persistent state at exit."),
1144 Ok(false) => debug!("Lock not held; no state to flush."),
1145 Err(e) => error_report!(e, "Unable to flush state on circuit manager drop"),
1146 }
1147 }
1148}
1149
1150#[cfg(test)]
1151mod test {
1152 #![allow(clippy::bool_assert_comparison)]
1154 #![allow(clippy::clone_on_copy)]
1155 #![allow(clippy::dbg_macro)]
1156 #![allow(clippy::mixed_attributes_style)]
1157 #![allow(clippy::print_stderr)]
1158 #![allow(clippy::print_stdout)]
1159 #![allow(clippy::single_char_pattern)]
1160 #![allow(clippy::unwrap_used)]
1161 #![allow(clippy::unchecked_time_subtraction)]
1162 #![allow(clippy::useless_vec)]
1163 #![allow(clippy::needless_pass_by_value)]
1164 use mocks::FakeBuilder;
1166 use tor_guardmgr::GuardMgr;
1167 use tor_linkspec::OwnedChanTarget;
1168 use tor_netdir::testprovider::TestNetDirProvider;
1169 use tor_persist::TestingStateMgr;
1170
1171 use super::*;
1172
1173 #[test]
1174 fn get_params() {
1175 use tor_netdir::{MdReceiver, PartialNetDir};
1176 use tor_netdoc::doc::netstatus::NetParams;
1177 let fb = FallbackList::from([]);
1179 let di: DirInfo<'_> = (&fb).into();
1180
1181 let p1 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
1182 assert!(!p1.extend_by_ed25519_id);
1183
1184 let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
1186 let mut params = NetParams::default();
1187 params.set("circwindow".into(), 100);
1188 params.set("ExtendByEd25519ID".into(), 1);
1189 let mut dir = PartialNetDir::new(consensus, Some(¶ms));
1190 for m in microdescs {
1191 dir.add_microdesc(m);
1192 }
1193 let netdir = dir.unwrap_if_sufficient().unwrap();
1194 let di: DirInfo<'_> = (&netdir).into();
1195 let p2 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
1196 assert!(p2.extend_by_ed25519_id);
1197
1198 let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
1200 let mut params = NetParams::default();
1201 params.set("circwindow".into(), 100_000);
1202 params.set("ExtendByEd25519ID".into(), 1);
1203 let mut dir = PartialNetDir::new(consensus, Some(¶ms));
1204 for m in microdescs {
1205 dir.add_microdesc(m);
1206 }
1207 let netdir = dir.unwrap_if_sufficient().unwrap();
1208 let di: DirInfo<'_> = (&netdir).into();
1209 let p2 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
1210 assert!(p2.extend_by_ed25519_id);
1211 }
1212
1213 fn make_circmgr<R: Runtime>(runtime: R) -> Arc<CircMgrInner<FakeBuilder<R>, R>> {
1214 let config = crate::config::test_config::TestConfig::default();
1215 let statemgr = TestingStateMgr::new();
1216 let guardmgr =
1217 GuardMgr::new(runtime.clone(), statemgr.clone(), &config).expect("Create GuardMgr");
1218 let builder = FakeBuilder::new(
1219 &runtime,
1220 statemgr.clone(),
1221 &tor_guardmgr::TestConfig::default(),
1222 );
1223 let circmgr = Arc::new(CircMgrInner::new_generic(
1224 &config, &runtime, &guardmgr, builder,
1225 ));
1226 let netdir = Arc::new(TestNetDirProvider::new());
1227 CircMgrInner::launch_background_tasks(&circmgr, &runtime, &netdir, statemgr)
1228 .expect("launch CircMgrInner background tasks");
1229 circmgr
1230 }
1231
1232 #[test]
1233 #[cfg(feature = "hs-common")]
1234 fn test_launch_hs_unmanaged() {
1235 tor_rtmock::MockRuntime::test_with_various(|runtime| async move {
1236 let circmgr = make_circmgr(runtime.clone());
1237 let netdir = tor_netdir::testnet::construct_netdir()
1238 .unwrap_if_sufficient()
1239 .unwrap();
1240
1241 let (ret_tx, ret_rx) = tor_async_utils::oneshot::channel();
1242 runtime.spawn_identified("launch_hs_unamanged", async move {
1243 ret_tx
1244 .send(
1245 circmgr
1246 .launch_hs_unmanaged::<OwnedChanTarget>(
1247 None,
1248 &netdir,
1249 HsCircStemKind::Naive,
1250 None,
1251 )
1252 .await,
1253 )
1254 .unwrap();
1255 });
1256 runtime.advance_by(Duration::from_millis(60)).await;
1257 ret_rx.await.unwrap().unwrap();
1258 });
1259 }
1260}