Skip to main content

arti/subcommands/
proxy.rs

1//! The `proxy` subcommand.
2
3use std::sync::Arc;
4
5use anyhow::{Context, Result};
6use cfg_if::cfg_if;
7use clap::ArgMatches;
8#[allow(unused)]
9use tor_config_path::CfgPathResolver;
10use tracing::{info, instrument, warn};
11
12use arti_client::TorClientConfig;
13use tor_config::{ConfigurationSources, Listen};
14use tor_rtcompat::ToplevelRuntime;
15
16#[cfg(feature = "dns-proxy")]
17use crate::dns;
18use crate::{
19    ArtiConfig, TorClient, exit, process,
20    proxy::{self, ListenProtocols, port_info},
21    reload_cfg,
22};
23
24#[cfg(feature = "rpc")]
25use crate::rpc;
26
27#[cfg(feature = "onion-service-service")]
28use crate::onion_proxy;
29
/// Shorthand for a boxed and pinned Future.
///
/// This is a heap-allocated, type-erased future yielding `T`.  It lets
/// futures of different concrete types (for example the individual proxy
/// listeners collected in `run_proxy`) be stored together in one `Vec`.
///
/// Note that the trait object carries no `Send` bound.
type PinnedFuture<T> = std::pin::Pin<Box<dyn futures::Future<Output = T>>>;
32
33/// Run the `proxy` subcommand.
34#[instrument(skip_all, level = "trace")]
35#[allow(clippy::cognitive_complexity)]
36pub(crate) fn run<R: ToplevelRuntime>(
37    runtime: R,
38    proxy_matches: &ArgMatches,
39    cfg_sources: ConfigurationSources,
40    config: ArtiConfig,
41    client_config: TorClientConfig,
42) -> Result<()> {
43    // Override configured listen addresses from the command line.
44    // This implies listening on localhost ports.
45
46    // TODO: Parse a string rather than calling new_localhost.
47    let socks_listen = match proxy_matches.get_one::<u16>("socks-port") {
48        Some(p) => Listen::new_localhost(*p),
49        None => config.proxy().socks_listen.clone(),
50    };
51
52    // TODO: Parse a string rather than calling new_localhost.
53    let dns_listen = match proxy_matches.get_one::<u16>("dns-port") {
54        Some(p) => Listen::new_localhost(*p),
55        None => config.proxy().dns_listen.clone(),
56    };
57
58    if !socks_listen.is_empty() {
59        info!(
60            "Starting Arti {} in proxy mode on {} ...",
61            env!("CARGO_PKG_VERSION"),
62            socks_listen
63        );
64    }
65
66    if let Some(listen) = {
67        // https://github.com/metrics-rs/metrics/issues/567
68        config
69            .metrics
70            .prometheus
71            .listen
72            .single_address_legacy()
73            .context("can only listen on a single address for Prometheus metrics")?
74    } {
75        cfg_if! {
76            if #[cfg(feature = "metrics")] {
77                metrics_exporter_prometheus::PrometheusBuilder::new()
78                    .with_http_listener(listen)
79                    .install()
80                    .with_context(|| format!(
81                        "set up Prometheus metrics exporter on {listen}"
82                    ))?;
83                info!("Arti Prometheus metrics export scraper endpoint http://{listen}");
84            } else {
85                return Err(anyhow::anyhow!(
86        "`metrics.prometheus.listen` config set but `metrics` cargo feature compiled out in `arti` crate"
87                ));
88            }
89        }
90    }
91
92    #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
93    process::use_max_file_limit(&config);
94
95    let rt_copy = runtime.clone();
96    rt_copy.block_on(run_proxy(
97        runtime,
98        socks_listen,
99        dns_listen,
100        config.proxy().protocols(),
101        cfg_sources,
102        config,
103        client_config,
104    ))?;
105
106    Ok(())
107}
108
/// Run the main loop of the proxy.
///
/// In order, this function:
///
/// 1. creates an unbootstrapped `TorClient` that bootstraps on demand;
/// 2. launches the configured onion services (only with the
///    `onion-service-service` feature);
/// 3. starts watching the configuration sources for changes;
/// 4. launches the RPC manager (only with the `rpc` feature);
/// 5. binds the stream proxy on `socks_listen` and, with the `dns-proxy`
///    feature, the DNS resolver on `dns_listen`;
/// 6. writes the port-info file, unless its configured path is empty;
/// 7. waits until `exit::wait_for_ctrl_c` completes, one of the listeners
///    finishes, or bootstrapping fails.
///
/// Returns `Ok(())` early, after logging a warning, if a DNS listen address
/// was given but the `dns-proxy` feature is compiled out, or if no listener
/// was started and no onion service was launched.
///
/// # Errors
///
/// Returns an error if any of the setup steps above fails, if a listener
/// exits with an error, or if bootstrapping fails.
///
/// # Panics
///
/// Currently, might panic if things go badly enough wrong
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
#[cfg_attr(docsrs, doc(cfg(feature = "experimental-api")))]
#[instrument(skip_all, level = "trace")]
async fn run_proxy<R: ToplevelRuntime>(
    runtime: R,
    socks_listen: Listen,
    dns_listen: Listen,
    protocols: ListenProtocols,
    config_sources: ConfigurationSources,
    arti_config: ArtiConfig,
    client_config: TorClientConfig,
) -> Result<()> {
    // Using OnDemand arranges that, while we are bootstrapping, incoming connections wait
    // for bootstrap to complete, rather than getting errors.
    use arti_client::BootstrapBehavior::OnDemand;
    use futures::FutureExt;

    // These must be copied out now: `client_config` is moved into the client builder below.
    //
    // TODO: We may instead want to provide a way to get these items out of TorClient.
    let fs_mistrust = client_config.fs_mistrust().clone();
    let path_resolver: CfgPathResolver = AsRef::<CfgPathResolver>::as_ref(&client_config).clone();

    let client_builder = TorClient::with_runtime(runtime.clone())
        .config(client_config)
        .bootstrap_behavior(OnDemand);
    let client = client_builder.create_unbootstrapped_async().await?;

    // Everything that should be notified when the configuration is reloaded.
    // (Only mutated when the `onion-service-service` feature is enabled.)
    #[allow(unused_mut)]
    let mut reconfigurable_modules: Vec<Arc<dyn reload_cfg::ReconfigurableModule>> = vec![
        Arc::new(client.clone()),
        Arc::new(reload_cfg::Application::new(arti_config.clone())),
    ];

    // `launched_onion_svc` records whether at least one onion service is running;
    // it decides below whether it makes sense to keep running without any proxy listener.
    cfg_if::cfg_if! {
        if #[cfg(feature = "onion-service-service")] {
            let onion_services =
                onion_proxy::ProxySet::launch_new(&client, arti_config.onion_services.clone())?;
            let launched_onion_svc = !onion_services.is_empty();
            reconfigurable_modules.push(Arc::new(onion_services));
        } else {
            let launched_onion_svc = false;
        }
    };

    // We use weak references here to prevent the thread spawned by watch_for_config_changes
    // from keeping these modules alive after this function exits.
    //
    // NOTE: reconfigurable_modules stores the only strong references to these modules,
    // so we must keep the variable alive until the end of the function
    let weak_modules = reconfigurable_modules.iter().map(Arc::downgrade).collect();
    reload_cfg::watch_for_config_changes(
        client.runtime(),
        config_sources,
        &arti_config,
        weak_modules,
    )?;

    // `rpc_mgr` is an `Option` in both configurations (always `None` without the `rpc`
    // feature); `rpc_state_sender` only exists when the `rpc` feature is enabled.
    cfg_if::cfg_if! {
        if #[cfg(feature = "rpc")] {
            let rpc_data = rpc::launch_rpc_mgr(
                &runtime,
                &arti_config.rpc,
                &path_resolver,
                &fs_mistrust,
                client.clone(),
            )
            .await?;
            // Split the optional RPC data into two independent `Option`s.
            let (rpc_mgr, mut rpc_state_sender) = rpc_data
                .map(|d| (d.rpc_mgr, d.rpc_state_sender))
                .unzip();
        } else {
            let rpc_mgr = None;
        }
    }

    // One future per running listener, plus the ports those listeners are bound to.
    let mut proxy: Vec<PinnedFuture<Result<()>>> = Vec::new();
    let mut ports = Vec::new();
    if !socks_listen.is_empty() {
        let runtime = runtime.clone();
        let client = client.isolated_client();
        // Cloned because `socks_listen` is consulted again in the final `select!`.
        let socks_listen = socks_listen.clone();
        // Rendered before `protocols` is moved into `bind_proxy`, for use in error messages.
        let listener_type = protocols.to_string();

        let stream_proxy = proxy::bind_proxy(runtime, client, socks_listen, protocols, rpc_mgr)
            .await
            .with_context(|| format!("Unable to launch {listener_type} proxy"))?;
        let port_info = stream_proxy.port_info()?;

        ports.extend(port_info);

        let failure_message = format!("{listener_type} proxy died unexpectedly");
        let proxy_future = stream_proxy
            .run_proxy()
            .map(|future_result| future_result.context(failure_message));
        proxy.push(Box::pin(proxy_future));
    }

    #[cfg(feature = "dns-proxy")]
    if !dns_listen.is_empty() {
        let runtime = runtime.clone();
        let client = client.isolated_client();
        let dns_proxy = dns::bind_dns_resolver(runtime, client, dns_listen)
            .await
            .context("Unable to launch DNS proxy")?;
        ports.extend(dns_proxy.port_info().context("Unable to find DNS ports")?);
        let proxy_future = dns_proxy
            .run_dns_proxy()
            .map(|future_result| future_result.context("DNS proxy died unexpectedly"));
        proxy.push(Box::pin(proxy_future));
    }

    // NOTE(review): this returns `Ok(())` even when a stream proxy was bound above,
    // so that proxy is never run — confirm this is intended.
    #[cfg(not(feature = "dns-proxy"))]
    if !dns_listen.is_empty() {
        warn!(
            "Tried to specify a DNS proxy address, but Arti was built without dns-proxy support."
        );
        return Ok(());
    }

    if proxy.is_empty() {
        if !launched_onion_svc {
            // Nothing to do at all: no listener and no onion service.
            //
            // TODO: rename "socks_listen" to "proxy_listen", preserving compat, once http-connect is stable.
            warn!(
                "No proxy address set; \
                specify -p PORT (to override `socks_listen`) \
                or -d PORT (to override `dns_listen`). \
                Alternatively, use the `socks_listen` or `dns_listen` configuration options."
            );
            return Ok(());
        } else {
            // Push a dummy future to appease future::select_all,
            // which expects a non-empty list
            proxy.push(Box::pin(futures::future::pending()));
        }
    }

    // Tell the RPC subsystem (if it is running) which ports the listeners ended up on.
    cfg_if::cfg_if! {
        if #[cfg(feature="rpc")] {
            if let Some(rpc_state_sender) = &mut rpc_state_sender {
                rpc_state_sender.set_stream_listeners(&ports[..]);
            }
        }
    }

    // Record the bound ports on disk, unless the configured path is the empty string.
    {
        let port_info = port_info::PortInfo { ports };
        let port_info_file = arti_config
            .storage()
            .port_info_file
            .path(&path_resolver)
            .context("Can't find path for port_info_file")?;
        if port_info_file.to_str() != Some("") {
            port_info.write_to_file(&fs_mistrust, &port_info_file)?;
        }
    }

    // Completes as soon as any one listener future finishes, yielding that future's result.
    let proxy = futures::future::select_all(proxy).map(|(finished, _index, _others)| finished);
    futures::select!(
        r = exit::wait_for_ctrl_c().fuse()
            => r.context("waiting for termination signal"),
        r = proxy.fuse()
            => r,
        r = async {
            client.bootstrap().await?;
            if !socks_listen.is_empty() {
                info!("Sufficiently bootstrapped; proxy now functional.");
            } else {
                info!("Sufficiently bootstrapped.");
            }
            // Never resolve after a successful bootstrap: this arm can only end the
            // `select!` by propagating a bootstrap error.
            futures::future::pending::<Result<()>>().await
        }.fuse()
            => r.context("bootstrap"),
    )?;

    // The modules can be dropped now, because we are exiting.
    drop(reconfigurable_modules);

    Ok(())
}