Skip to main content

arti/
proxy.rs

1//! Implement a simple proxy that relays connections over Tor.
2//!
3//! A proxy is launched with [`bind_proxy()`], which opens listener ports.
4//! `StreamProxy::run_proxy` then listens for new
5//! connections, handles an appropriate handshake,
6//! and then relays traffic as appropriate.
7
8semipublic_mod! {
9    #[cfg(feature="http-connect")]
10    mod http_connect;
11    mod socks;
12    pub(crate) mod port_info;
13}
14
15use futures::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, Error as IoError};
16use futures::stream::StreamExt;
17use std::net::IpAddr;
18use std::sync::Arc;
19use tor_basic_utils::error_sources::ErrorSources;
20use tor_rtcompat::{NetStreamProvider, SpawnExt};
21use tracing::{debug, error, info, instrument, warn};
22
23#[allow(unused)]
24use arti_client::HasKind;
25use arti_client::TorClient;
26#[cfg(feature = "rpc")]
27use arti_rpcserver::RpcMgr;
28use tor_config::Listen;
29use tor_error::{debug_report, warn_report};
30use tor_rtcompat::{NetStreamListener, Runtime};
31use tor_socksproto::SocksAuth;
32
33use anyhow::{Context, Result, anyhow};
34
/// Placeholder type when RPC is disabled at compile time.
///
/// This enum has no variants, so no value of it can ever exist.
/// With RPC disabled, every `Option<Arc<RpcMgr>>` in this module is therefore `None`.
#[cfg(not(feature = "rpc"))]
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
pub(crate) enum RpcMgr {}
39
/// A set of proxy protocols to support on a listener.
///
/// SOCKS is always part of the set. HTTP CONNECT support is only honored when
/// this crate is built with the `http-connect` feature; without it,
/// `SocksAndHttpConnect` behaves like `SocksOnly` when handling connections.
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
#[derive(Copy, Clone, Debug)]
#[non_exhaustive]
pub(crate) enum ListenProtocols {
    /// Only the socks protocol.
    SocksOnly,
    /// Socks _and_ HTTP CONNECT.
    SocksAndHttpConnect,
}
50
51impl ListenProtocols {
52    /// Return true if http connect is included in this set of protocols.
53    fn http_connect_supported(self) -> bool {
54        matches!(self, Self::SocksAndHttpConnect)
55    }
56}
57
58impl std::fmt::Display for ListenProtocols {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        match self {
61            ListenProtocols::SocksOnly => write!(f, "SOCKS"),
62            ListenProtocols::SocksAndHttpConnect => write!(f, "SOCKS+HTTP"),
63        }
64    }
65}
66
67/// A Key used to isolate connections.
68///
69/// Composed of an usize (representing which listener socket accepted
70/// the connection, the source IpAddr of the client, and the
71/// authentication string provided by the client).
72#[derive(Debug, Clone, PartialEq, Eq)]
73struct StreamIsolationKey(ListenerIsolation, ProvidedIsolation);
74
/// Isolation information that the client provided through the proxy handshake.
///
/// Together with the [`ListenerIsolation`] of the connection, this forms a
/// `StreamIsolationKey`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ProvidedIsolation {
    /// The plain SOCKS authentication the client sent, used directly as isolation.
    ///
    /// This may be a SOCKS4 user ID, a SOCKS5 username/password pair, or some
    /// other form (such as no authentication at all).
    LegacySocks(SocksAuth),
    /// A bytestring provided as isolation with the extended Socks5 username/password protocol.
    ExtendedSocks {
        /// Which format was negotiated?
        ///
        /// (At present, different format codes can't share a circuit.)
        format_code: u8,
        /// What's the isolation string?
        isolation: Box<[u8]>,
    },
    #[cfg(feature = "http-connect")]
    /// An HTTP token, taken from headers.
    Http(http_connect::Isolation),
}
93
94impl arti_client::isolation::IsolationHelper for StreamIsolationKey {
95    fn compatible_same_type(&self, other: &Self) -> bool {
96        self == other
97    }
98
99    fn join_same_type(&self, other: &Self) -> Option<Self> {
100        if self == other {
101            Some(self.clone())
102        } else {
103            None
104        }
105    }
106
107    fn enables_long_lived_circuits(&self) -> bool {
108        use ProvidedIsolation as PI;
109        use SocksAuth as SA;
110        match &self.1 {
111            PI::LegacySocks(SA::Socks4(auth)) => !auth.is_empty(),
112            PI::LegacySocks(SA::Username(uname, pass)) => !(uname.is_empty() && pass.is_empty()),
113            PI::LegacySocks(_) => false,
114            PI::ExtendedSocks { isolation, .. } => !isolation.is_empty(),
115            #[cfg(feature = "http-connect")]
116            PI::Http(isolation) => !isolation.is_empty(),
117        }
118    }
119}
120
/// Capacity of the read buffer wrapped around application data streams
/// and Tor data streams while copying between them.
//
// The exact figure is a more-or-less arbitrary tradeoff: a bigger buffer means
// fewer reads from the application, at the price of more memory per stream.
//
// (For comparison, BufReader defaults to 8k as of this writing.)
const APP_STREAM_BUF_LEN: usize = 4 * 1024;
130
131const _: () = {
132    assert!(APP_STREAM_BUF_LEN >= tor_socksproto::SOCKS_BUF_LEN);
133};
134
135/// NOTE: The following documentation belongs in a spec.
136/// But for now, it's our best attempt to document the design and protocol
137/// implemented here
138/// for integrating proxies with our RPC system. --nickm
139///
140/// Roughly speaking:
141///
142/// ## Key concepts
143///
144/// A data stream is "RPC-visible" if, when it is created via a proxy connection,
145/// the RPC system is told about it.
146///
147/// Every RPC-visible stream is associated with a given RPC object when it is created.
148/// (Since the RPC object is being specified in the proxy protocol,
149/// it must be one with an externally visible Object ID.
150/// Such Object IDs are cryptographically unguessable and unforgeable,
151/// and are qualified with a unique identifier for their associated RPC session.)
152/// Call this RPC Object the "target" object for now.
153/// This target RPC object must implement
154/// the [`ConnectWithPrefs`](arti_client::rpc::ConnectWithPrefs) special method.
155///
156/// Right now, there are two general kinds of objects that implement this method:
157/// client-like objects, and one-shot clients.
158///
159/// A client-like object is either a `TorClient` or an RPC `Session`.
160/// It knows about and it is capable of opening multiple data streams.
161/// Using it as the target object for a proxy connection tells Arti
162/// that the resulting data stream (if any)
163/// should be built by it, and associated with its RPC session.
164///
165/// An application gets a TorClient by asking the session for one,
/// or by asking an existing TorClient for a new variant clone of itself.
167///
168/// A one-shot client is an `arti_rpcserver::stream::OneshotClient`.
169/// It is created from a client-like object, but can only be used for a single data stream.
/// When created, it is not yet connected, nor trying to connect anywhere:
171/// the act of using it as the target Object for a proxy connection causes
172/// it to begin connecting.
173///
/// An application gets a `OneshotClient` by calling `arti:new_oneshot_client`
175/// on any client-like object.
176///
177/// ## The Proxy protocol
178///
179/// See the specification for
180/// [SOCKS extended authentication](https://spec.torproject.org/socks-extensions.html#extended-auth)
181/// for full details on integrating RPC with SOCKS.
182/// For HTTP integration, see
183/// [the relevant section of prop365](https://spec.torproject.org/proposals/365-http-connect-ext.html#x-tor-rpc-target-arti-rpc-support).
184///
185/// ### Further restrictions on Object IDs and isolation
186///
187/// In some cases,
188/// the RPC Object ID may denote an object
189/// that already includes information about its intended stream isolation.
190/// In such cases, the stream isolation MUST be blank.
191/// Implementations MUST reject non-blank stream isolation in such cases.
192///
193/// In some cases, the RPC object ID may denote an object
194/// that already includes information
195/// about its intended destination address and port.
196/// In such cases, the destination address MUST be `0.0.0.0` or `::`
197/// (encoded either as an IPv4 address, an IPv6 address, or a hostname)
198/// and the destination port MUST be 0.
199/// Implementations MUST reject other addresses in such cases.
200///
201/// ### Another proposed change
202///
203/// We could add a new method to clients, with a name like
204/// "open_stream" or "connect_stream".
205/// This method would include all target and isolation information in its parameters.
206/// It would actually create a DataStream immediately, tell it to begin connecting,
207/// and return an externally visible object ID.
208/// The RPC protocol could be used to watch the DataStream object,
209/// to see when it was connected.
210///
211/// The resulting DataStream object could also be used as the target of a proxy connection.
212/// We would require in such a case that no isolation be provided in the proxy handshake,
213/// and that the target address was (e.g.) INADDR_ANY.
214///
215/// ## Intended use cases (examples)
216///
217/// (These examples assume that the application
218/// already knows the proxy port it should use.
219/// I'm leaving out the isolation strings as orthogonal.)
220///
221/// These are **NOT** the only possible use cases;
222/// they're just the two that help understand this system best (I hope).
223///
224/// ### Case 1: Using a client-like object directly.
225///
226/// Here the application has authenticated to RPC
227/// and gotten the session ID `SESSION-1`.
228/// (In reality, this would be a longer ID, and full of crypto).
229///
230/// The application wants to open a new stream to www.example.com.
231/// They don't particularly care about isolation,
232/// but they do want their stream to use their RPC session.
233/// They don't want an Object ID for the stream.
234///
235/// To do this, they make a SOCKS connection to arti,
236/// with target address www.example.com.
237/// They set the username to `<torS0X>0SESSION-1`,
238/// and the password to the empty string.
239///
240/// (Alternatively, it could use HTTP CONNECT, setting
241/// Tor-Rpc-Target to SESSION-1.)
242///
243/// Arti looks up the Session object via the `SESSION-1` object ID
244/// and tells it (via the ConnectWithPrefs special method)
245/// to connect to www.example.com.
246/// The session creates a new DataStream using its internal TorClient,
247/// but does not register the stream with an RPC Object ID.
248/// Arti proxies the application's connection through this DataStream.
249///
250///
251/// ### Case 2: Creating an identifiable stream.
252///
253/// Here the application wants to be able to refer to its DataStream
254/// after the stream is created.
255/// As before, we assume that it's on an RPC session
256/// where the Session ID is `SESSION-1`.
257///
258/// The application sends an RPC request of the form:
259/// `{"id": 123, "obj": "SESSION-1", "method": "arti:new_oneshot_client", "params": {}}`
260///
261/// It receives a reply like:
262/// `{"id": 123, "result": {"id": "STREAM-1"} }`
263///
264/// (In reality, `STREAM-1` would also be longer and full of crypto.)
265///
266/// Now the application has an object called `STREAM-1` that is not yet a connected
267/// stream, but which may become one.
268///
269/// This time, it wants to set its isolation string to "xyzzy".
270///
271/// The application opens a socks connection as before.
272/// For the username it sends `<torS0X>0STREAM-1`,
273/// and for the password it sends `xyzzy`.
274///
275/// (Alternatively, it could use HTTP CONNECT, setting Tor-Isolation to xyzzy,
276/// and Tor-Rpc-Target to STREAM-1.)
277///
278/// Now Arti looks up the `RpcDataStream` object via `STREAM-1`,
279/// and tells it (via the ConnectWithPrefs special method)
280/// to connect to www.example.com.
281/// This causes the `RpcDataStream` internally to create a new `DataStream`,
282/// and to store that `DataStream` in itself.
283/// The `RpcDataStream` with Object ID `STREAM-1`
284/// is now an alias for the newly created `DataStream`.
285/// Arti proxies the application's connection through that `DataStream`.
286///
// This module is intentionally empty: it holds no code, and exists only so that
// the design notes in its doc comment have an item to attach to.
#[cfg(feature = "rpc")]
#[allow(dead_code)]
mod socks_and_rpc {}
290
291/// Information used to implement a proxy listener.
292struct ProxyContext<R: Runtime> {
293    /// A TorClient to use (by default) to anonymize requests.
294    tor_client: TorClient<R>,
295    /// If present, an RpcMgr to use when for attaching requests to RPC
296    /// sessions.
297    #[cfg(feature = "rpc")]
298    rpc_mgr: Option<Arc<arti_rpcserver::RpcMgr>>,
299    /// The protocols that we support.
300    protocols: ListenProtocols,
301}
302
/// Isolation information that is known for a proxy connection as soon as it
/// is accepted, _before_ any negotiation with the client takes place.
///
/// The first element is the index of the listener that accepted the
/// connection. The second is the IP address of the client that connected.
type ListenerIsolation = (usize, std::net::IpAddr);
309
310/// write_all the data to the writer & flush the writer if write_all is successful.
311async fn write_all_and_flush<W>(writer: &mut W, buf: &[u8]) -> Result<()>
312where
313    W: AsyncWrite + Unpin,
314{
315    writer
316        .write_all(buf)
317        .await
318        .context("Error while writing proxy reply")?;
319    writer
320        .flush()
321        .await
322        .context("Error while flushing proxy stream")
323}
324
325/// write_all the data to the writer & close the writer if write_all is successful.
326async fn write_all_and_close<W>(writer: &mut W, buf: &[u8]) -> Result<()>
327where
328    W: AsyncWrite + Unpin,
329{
330    writer
331        .write_all(buf)
332        .await
333        .context("Error while writing proxy reply")?;
334    writer
335        .close()
336        .await
337        .context("Error while closing proxy stream")
338}
339
340/// Return true if a given IoError, when received from accept, is a fatal
341/// error.
342fn accept_err_is_fatal(err: &IoError) -> bool {
343    #![allow(clippy::match_like_matches_macro)]
344
345    /// Re-declaration of WSAEMFILE with the right type to match
346    /// `raw_os_error()`.
347    #[cfg(windows)]
348    const WSAEMFILE: i32 = winapi::shared::winerror::WSAEMFILE as i32;
349
350    // Currently, EMFILE and ENFILE aren't distinguished by ErrorKind;
351    // we need to use OS-specific errors. :P
352    match err.raw_os_error() {
353        #[cfg(unix)]
354        Some(libc::EMFILE) | Some(libc::ENFILE) => false,
355        #[cfg(windows)]
356        Some(WSAEMFILE) => false,
357        _ => true,
358    }
359}
360
/// A stream proxy listening on one or more local ports, ready to relay traffic.
///
/// Built by [`bind_proxy`]. Nothing is relayed until `run_proxy` is called,
/// which is why the type is `#[must_use]`.
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
#[must_use]
pub(crate) struct StreamProxy<R: Runtime> {
    /// A tor client to use when relaying traffic.
    tor_client: TorClient<R>,
    /// The listeners that we've actually bound to.
    ///
    /// (Never empty when this proxy was built by `bind_proxy`.)
    listeners: Vec<<R as NetStreamProvider>::Listener>,
    /// The protocols we respond to.
    protocols: ListenProtocols,
    /// An RPC manager to use when incoming requests are tied to streams.
    rpc_mgr: Option<Arc<RpcMgr>>,
}
374
375/// Launch a proxy to listen on a given set of ports.
376///
377/// Requires a `runtime` to use for launching tasks and handling
378/// timeouts, and a `tor_client` to use in connecting over the Tor
379/// network.
380///
381/// Returns the proxy, and a list of the ports that we have
382/// bound to.
383#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
384#[instrument(skip_all, level = "trace")]
385pub(crate) async fn bind_proxy<R: Runtime>(
386    runtime: R,
387    tor_client: TorClient<R>,
388    listen: Listen,
389    protocols: ListenProtocols,
390    rpc_mgr: Option<Arc<RpcMgr>>,
391) -> Result<StreamProxy<R>> {
392    if !listen.is_loopback_only() {
393        warn!(
394            "Configured to listen for proxy connections on non-local addresses. \
395            This is usually insecure! We recommend listening on localhost only."
396        );
397    }
398
399    let mut listeners = Vec::new();
400
401    // Try to bind to the listener ports.
402    match listen.ip_addrs() {
403        Ok(addrgroups) => {
404            for addrgroup in addrgroups {
405                for addr in addrgroup {
406                    match runtime.listen(&addr).await {
407                        Ok(listener) => {
408                            let bound_addr = listener.local_addr()?;
409                            info!("Listening on {:?}", bound_addr);
410                            listeners.push(listener);
411                        }
412                        #[cfg(unix)]
413                        Err(ref e) if e.raw_os_error() == Some(libc::EAFNOSUPPORT) => {
414                            warn_report!(e, "Address family not supported {}", addr);
415                        }
416                        Err(ref e) => {
417                            return Err(anyhow!("Can't listen on {}: {e}", addr));
418                        }
419                    }
420                }
421                // TODO: We are supposed to fail if every address in the group failed!
422            }
423        }
424        Err(e) => warn_report!(e, "Invalid listen spec"),
425    }
426
427    // We weren't able to bind any ports: There's nothing to do.
428    if listeners.is_empty() {
429        error!("Couldn't open any listeners.");
430        return Err(anyhow!("Couldn't open listeners"));
431    }
432
433    Ok(StreamProxy {
434        tor_client,
435        listeners,
436        protocols,
437        rpc_mgr,
438    })
439}
440
441impl<R: Runtime> StreamProxy<R> {
442    /// Run indefinitely, processing incoming connections and relaying traffic.
443    pub(crate) async fn run_proxy(self) -> Result<()> {
444        let StreamProxy {
445            tor_client,
446            listeners,
447            protocols,
448            rpc_mgr,
449        } = self;
450        run_proxy_with_listeners(tor_client, listeners, protocols, rpc_mgr).await
451    }
452
453    /// Return a list of the ports that we've bound to.
454    pub(crate) fn port_info(&self) -> Result<Vec<port_info::Port>> {
455        let mut ports = Vec::new();
456        for listener in &self.listeners {
457            let address = listener.local_addr()?;
458            ports.push(port_info::Port {
459                protocol: port_info::SupportedProtocol::Socks,
460                address: address.into(),
461            });
462            #[cfg(feature = "http-connect")]
463            if self.protocols.http_connect_supported() {
464                ports.push(port_info::Port {
465                    protocol: port_info::SupportedProtocol::Http,
466                    address: address.into(),
467                });
468            }
469        }
470
471        Ok(ports)
472    }
473}
474
475/// Launch a proxy from a given set of already bound listeners.
476#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
477#[instrument(skip_all, level = "trace")]
478pub(crate) async fn run_proxy_with_listeners<R: Runtime>(
479    tor_client: TorClient<R>,
480    listeners: Vec<<R as tor_rtcompat::NetStreamProvider>::Listener>,
481    protocols: ListenProtocols,
482    rpc_mgr: Option<Arc<RpcMgr>>,
483) -> Result<()> {
484    // Create a stream of (incoming socket, listener_id) pairs, selected
485    // across all the listeners.
486    let mut incoming = futures::stream::select_all(
487        listeners
488            .into_iter()
489            .map(NetStreamListener::incoming)
490            .enumerate()
491            .map(|(listener_id, incoming_conns)| {
492                incoming_conns.map(move |socket| (socket, listener_id))
493            }),
494    );
495
496    // Loop over all incoming connections.  For each one, call
497    // handle_proxy_conn() in a new task.
498    while let Some((stream, sock_id)) = incoming.next().await {
499        let (stream, addr) = match stream {
500            Ok((s, a)) => (s, a),
501            Err(err) => {
502                if accept_err_is_fatal(&err) {
503                    return Err(err).context("Failed to receive incoming stream on proxy port");
504                } else {
505                    warn_report!(err, "Incoming stream failed");
506                    continue;
507                }
508            }
509        };
510        let proxy_context = ProxyContext {
511            tor_client: tor_client.clone(),
512            #[cfg(feature = "rpc")]
513            rpc_mgr: rpc_mgr.clone(),
514            protocols,
515        };
516        tor_client.runtime().spawn(async move {
517            let res = handle_proxy_conn(proxy_context, stream, (sock_id, addr.ip())).await;
518            if let Err(ref e) = res {
519                report_proxy_error(e);
520            }
521        })?;
522    }
523
524    Ok(())
525}
526
/// A (possibly) supported proxy protocol.
///
/// This is only a guess, made from the first byte the client sends;
/// see `classify_protocol_from_first_byte`.
enum ProxyProtocols {
    /// Some HTTP/1 command or other.
    ///
    /// (We only support CONNECT and OPTIONS, but we reject other commands in [`http_connect`].)
    Http1,
    /// SOCKS4 or SOCKS5.
    Socks,
}
536
537/// Look at the first byte of a proxy connection, and guess what protocol
538/// what protocol it is trying to speak.
539fn classify_protocol_from_first_byte(byte: u8) -> Option<ProxyProtocols> {
540    match byte {
541        b'a'..=b'z' | b'A'..=b'Z' => Some(ProxyProtocols::Http1),
542        4 | 5 => Some(ProxyProtocols::Socks),
543        _ => None,
544    }
545}
546
547/// Handle a single connection `stream` from an application.
548///
549/// Depending on what protocol the application is speaking
550/// (and what protocols we support!), negotiate an appropriate set of options,
551/// and relay traffic to and from the application.
552async fn handle_proxy_conn<R, S>(
553    context: ProxyContext<R>,
554    stream: S,
555    isolation_info: ListenerIsolation,
556) -> Result<()>
557where
558    R: Runtime,
559    S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
560{
561    let mut stream = BufReader::with_capacity(APP_STREAM_BUF_LEN, stream);
562    use futures::AsyncBufReadExt as _;
563
564    let buf: &[u8] = stream.fill_buf().await?;
565    if buf.is_empty() {
566        // connection closed
567        return Ok(());
568    }
569    match classify_protocol_from_first_byte(buf[0]) {
570        Some(ProxyProtocols::Http1) => {
571            #[cfg(feature = "http-connect")]
572            if context.protocols.http_connect_supported() {
573                return http_connect::handle_http_conn(context, stream, isolation_info).await;
574            }
575
576            write_all_and_close(&mut stream, socks::WRONG_PROTOCOL_PAYLOAD).await?;
577            Ok(())
578        }
579        Some(ProxyProtocols::Socks) => {
580            socks::handle_socks_conn(context, stream, isolation_info).await
581        }
582        None => {
583            // We have no idea what protocol the client expects,
584            // so we have no idea how to tell it so.
585            warn!(
586                "Unrecognized protocol on proxy listener (first byte {:x})",
587                buf[0]
588            );
589            Ok(())
590        }
591    }
592}
593
594/// If any source of the provided `error` is a [`tor_proto::Error`], return a reference to that
595/// [`tor_proto::Error`].
596fn extract_proto_err<'a>(
597    error: &'a (dyn std::error::Error + 'static),
598) -> Option<&'a tor_proto::Error> {
599    for error in ErrorSources::new(error) {
600        if let Some(downcast) = error.downcast_ref::<tor_proto::Error>() {
601            return Some(downcast);
602        }
603    }
604
605    None
606}
607
/// Report an error that occurred within a single proxy task.
///
/// If the error's source chain contains a `tor_proto::Error::CircuitClosed` or
/// `tor_proto::Error::NotConnected`, the connection is logged at debug level.
/// Any other error, including one with no `tor_proto::Error` in its chain, is
/// logged as a warning.
#[expect(clippy::cognitive_complexity)]
fn report_proxy_error(e: &anyhow::Error) {
    use tor_proto::Error as PE;
    // TODO: In the long run it might be a good idea to use an ErrorKind here if we can get one.
    // This is a bit of a kludge based on the fact that we're using anyhow.
    //
    // TODO: It might be handy to have a way to collapse CircuitClosed into EOF earlier.
    // But that loses information, so it should be optional.
    //
    // TODO: Maybe we should look at io::ErrorKind as well, if it's there.  That's another reason
    // to discard or restrict our anyhow usage.
    match extract_proto_err(e.as_ref()) {
        Some(e @ PE::CircuitClosed) => debug_report!(e, "Connection exited"),
        // We can't use `debug_report!` here because `NotConnected`'s error kind is `BadApiUsage`,
        // which upgrades this to a warning.
        // https://gitlab.torproject.org/tpo/core/arti/-/issues/2439
        Some(e @ PE::NotConnected) => debug!(error = (e as &dyn std::error::Error), "Connection exited"),
        // Here `e` is the original anyhow error, not a tor_proto::Error.
        _ => warn_report!(*e, "Connection exited"),
    }
}