arti/subcommands/
proxy.rs1use std::sync::Arc;
4
5use anyhow::{Context, Result};
6use cfg_if::cfg_if;
7use clap::ArgMatches;
8#[allow(unused)]
9use tor_config_path::CfgPathResolver;
10use tracing::{info, instrument, warn};
11
12use arti_client::TorClientConfig;
13use tor_config::{ConfigurationSources, Listen};
14use tor_rtcompat::ToplevelRuntime;
15
16#[cfg(feature = "dns-proxy")]
17use crate::dns;
18use crate::{
19 ArtiConfig, TorClient, exit, process,
20 proxy::{self, ListenProtocols, port_info},
21 reload_cfg,
22};
23
24#[cfg(feature = "rpc")]
25use crate::rpc;
26
27#[cfg(feature = "onion-service-service")]
28use crate::onion_proxy;
29
30type PinnedFuture<T> = std::pin::Pin<Box<dyn futures::Future<Output = T>>>;
32
33#[instrument(skip_all, level = "trace")]
35#[allow(clippy::cognitive_complexity)]
36pub(crate) fn run<R: ToplevelRuntime>(
37 runtime: R,
38 proxy_matches: &ArgMatches,
39 cfg_sources: ConfigurationSources,
40 config: ArtiConfig,
41 client_config: TorClientConfig,
42) -> Result<()> {
43 let socks_listen = match proxy_matches.get_one::<u16>("socks-port") {
48 Some(p) => Listen::new_localhost(*p),
49 None => config.proxy().socks_listen.clone(),
50 };
51
52 let dns_listen = match proxy_matches.get_one::<u16>("dns-port") {
54 Some(p) => Listen::new_localhost(*p),
55 None => config.proxy().dns_listen.clone(),
56 };
57
58 if !socks_listen.is_empty() {
59 info!(
60 "Starting Arti {} in proxy mode on {} ...",
61 env!("CARGO_PKG_VERSION"),
62 socks_listen
63 );
64 }
65
66 if let Some(listen) = {
67 config
69 .metrics
70 .prometheus
71 .listen
72 .single_address_legacy()
73 .context("can only listen on a single address for Prometheus metrics")?
74 } {
75 cfg_if! {
76 if #[cfg(feature = "metrics")] {
77 metrics_exporter_prometheus::PrometheusBuilder::new()
78 .with_http_listener(listen)
79 .install()
80 .with_context(|| format!(
81 "set up Prometheus metrics exporter on {listen}"
82 ))?;
83 info!("Arti Prometheus metrics export scraper endpoint http://{listen}");
84 } else {
85 return Err(anyhow::anyhow!(
86 "`metrics.prometheus.listen` config set but `metrics` cargo feature compiled out in `arti` crate"
87 ));
88 }
89 }
90 }
91
92 #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
93 process::use_max_file_limit(&config);
94
95 let rt_copy = runtime.clone();
96 rt_copy.block_on(run_proxy(
97 runtime,
98 socks_listen,
99 dns_listen,
100 config.proxy().protocols(),
101 cfg_sources,
102 config,
103 client_config,
104 ))?;
105
106 Ok(())
107}
108
109#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
115#[cfg_attr(docsrs, doc(cfg(feature = "experimental-api")))]
116#[instrument(skip_all, level = "trace")]
117async fn run_proxy<R: ToplevelRuntime>(
118 runtime: R,
119 socks_listen: Listen,
120 dns_listen: Listen,
121 protocols: ListenProtocols,
122 config_sources: ConfigurationSources,
123 arti_config: ArtiConfig,
124 client_config: TorClientConfig,
125) -> Result<()> {
126 use arti_client::BootstrapBehavior::OnDemand;
129 use futures::FutureExt;
130
131 let fs_mistrust = client_config.fs_mistrust().clone();
133 let path_resolver: CfgPathResolver = AsRef::<CfgPathResolver>::as_ref(&client_config).clone();
134
135 let client_builder = TorClient::with_runtime(runtime.clone())
136 .config(client_config)
137 .bootstrap_behavior(OnDemand);
138 let client = client_builder.create_unbootstrapped_async().await?;
139
140 #[allow(unused_mut)]
141 let mut reconfigurable_modules: Vec<Arc<dyn reload_cfg::ReconfigurableModule>> = vec![
142 Arc::new(client.clone()),
143 Arc::new(reload_cfg::Application::new(arti_config.clone())),
144 ];
145
146 cfg_if::cfg_if! {
147 if #[cfg(feature = "onion-service-service")] {
148 let onion_services =
149 onion_proxy::ProxySet::launch_new(&client, arti_config.onion_services.clone())?;
150 let launched_onion_svc = !onion_services.is_empty();
151 reconfigurable_modules.push(Arc::new(onion_services));
152 } else {
153 let launched_onion_svc = false;
154 }
155 };
156
157 let weak_modules = reconfigurable_modules.iter().map(Arc::downgrade).collect();
163 reload_cfg::watch_for_config_changes(
164 client.runtime(),
165 config_sources,
166 &arti_config,
167 weak_modules,
168 )?;
169
170 cfg_if::cfg_if! {
171 if #[cfg(feature = "rpc")] {
172 let rpc_data = rpc::launch_rpc_mgr(
173 &runtime,
174 &arti_config.rpc,
175 &path_resolver,
176 &fs_mistrust,
177 client.clone(),
178 )
179 .await?;
180 let (rpc_mgr, mut rpc_state_sender) = rpc_data
181 .map(|d| (d.rpc_mgr, d.rpc_state_sender))
182 .unzip();
183 } else {
184 let rpc_mgr = None;
185 }
186 }
187
188 let mut proxy: Vec<PinnedFuture<Result<()>>> = Vec::new();
189 let mut ports = Vec::new();
190 if !socks_listen.is_empty() {
191 let runtime = runtime.clone();
192 let client = client.isolated_client();
193 let socks_listen = socks_listen.clone();
194 let listener_type = protocols.to_string();
195
196 let stream_proxy = proxy::bind_proxy(runtime, client, socks_listen, protocols, rpc_mgr)
197 .await
198 .with_context(|| format!("Unable to launch {listener_type} proxy"))?;
199 let port_info = stream_proxy.port_info()?;
200
201 ports.extend(port_info);
202
203 let failure_message = format!("{listener_type} proxy died unexpectedly");
204 let proxy_future = stream_proxy
205 .run_proxy()
206 .map(|future_result| future_result.context(failure_message));
207 proxy.push(Box::pin(proxy_future));
208 }
209
210 #[cfg(feature = "dns-proxy")]
211 if !dns_listen.is_empty() {
212 let runtime = runtime.clone();
213 let client = client.isolated_client();
214 let dns_proxy = dns::bind_dns_resolver(runtime, client, dns_listen)
215 .await
216 .context("Unable to launch DNS proxy")?;
217 ports.extend(dns_proxy.port_info().context("Unable to find DNS ports")?);
218 let proxy_future = dns_proxy
219 .run_dns_proxy()
220 .map(|future_result| future_result.context("DNS proxy died unexpectedly"));
221 proxy.push(Box::pin(proxy_future));
222 }
223
224 #[cfg(not(feature = "dns-proxy"))]
225 if !dns_listen.is_empty() {
226 warn!(
227 "Tried to specify a DNS proxy address, but Arti was built without dns-proxy support."
228 );
229 return Ok(());
230 }
231
232 if proxy.is_empty() {
233 if !launched_onion_svc {
234 warn!(
236 "No proxy address set; \
237 specify -p PORT (to override `socks_listen`) \
238 or -d PORT (to override `dns_listen`). \
239 Alternatively, use the `socks_listen` or `dns_listen` configuration options."
240 );
241 return Ok(());
242 } else {
243 proxy.push(Box::pin(futures::future::pending()));
246 }
247 }
248
249 cfg_if::cfg_if! {
250 if #[cfg(feature="rpc")] {
251 if let Some(rpc_state_sender) = &mut rpc_state_sender {
252 rpc_state_sender.set_stream_listeners(&ports[..]);
253 }
254 }
255 }
256
257 {
258 let port_info = port_info::PortInfo { ports };
259 let port_info_file = arti_config
260 .storage()
261 .port_info_file
262 .path(&path_resolver)
263 .context("Can't find path for port_info_file")?;
264 if port_info_file.to_str() != Some("") {
265 port_info.write_to_file(&fs_mistrust, &port_info_file)?;
266 }
267 }
268
269 let proxy = futures::future::select_all(proxy).map(|(finished, _index, _others)| finished);
270 futures::select!(
271 r = exit::wait_for_ctrl_c().fuse()
272 => r.context("waiting for termination signal"),
273 r = proxy.fuse()
274 => r,
275 r = async {
276 client.bootstrap().await?;
277 if !socks_listen.is_empty() {
278 info!("Sufficiently bootstrapped; proxy now functional.");
279 } else {
280 info!("Sufficiently bootstrapped.");
281 }
282 futures::future::pending::<Result<()>>().await
283 }.fuse()
284 => r.context("bootstrap"),
285 )?;
286
287 drop(reconfigurable_modules);
289
290 Ok(())
291}