arti/proxy.rs
//! Implement a simple proxy that relays connections over Tor.
//!
//! A proxy is launched with [`bind_proxy()`], which opens listener ports.
//! `StreamProxy::run_proxy` then listens for new
//! connections, performs the proxy handshake that each one requires,
//! and then relays traffic to and from the application.
7
8semipublic_mod! {
9 #[cfg(feature="http-connect")]
10 mod http_connect;
11 mod socks;
12 pub(crate) mod port_info;
13}
14
15use futures::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, Error as IoError};
16use futures::stream::StreamExt;
17use std::net::IpAddr;
18use std::sync::Arc;
19use tor_basic_utils::error_sources::ErrorSources;
20use tor_rtcompat::{NetStreamProvider, SpawnExt};
21use tracing::{debug, error, info, instrument, warn};
22
23#[allow(unused)]
24use arti_client::HasKind;
25use arti_client::TorClient;
26#[cfg(feature = "rpc")]
27use arti_rpcserver::RpcMgr;
28use tor_config::Listen;
29use tor_error::{debug_report, warn_report};
30use tor_rtcompat::{NetStreamListener, Runtime};
31use tor_socksproto::SocksAuth;
32
33use anyhow::{Context, Result, anyhow};
34
/// Uninhabited stand-in for `arti_rpcserver::RpcMgr`, used when the `rpc`
/// feature is disabled at compile time.
///
/// Since this enum has no variants, no value of it can ever exist.
#[cfg(not(feature = "rpc"))]
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
pub(crate) enum RpcMgr {}
39
/// The set of proxy protocols that a listener should accept.
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
#[derive(Copy, Clone, Debug)]
#[non_exhaustive]
pub(crate) enum ListenProtocols {
    /// Accept SOCKS only.
    SocksOnly,
    /// Accept both SOCKS and HTTP CONNECT.
    SocksAndHttpConnect,
}

impl ListenProtocols {
    /// Report whether HTTP CONNECT belongs to this set of protocols.
    fn http_connect_supported(self) -> bool {
        matches!(self, ListenProtocols::SocksAndHttpConnect)
    }
}

impl std::fmt::Display for ListenProtocols {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Short human-readable label for each protocol set.
        let label = match self {
            Self::SocksOnly => "SOCKS",
            Self::SocksAndHttpConnect => "SOCKS+HTTP",
        };
        f.write_str(label)
    }
}
66
67/// A Key used to isolate connections.
68///
69/// Composed of an usize (representing which listener socket accepted
70/// the connection, the source IpAddr of the client, and the
71/// authentication string provided by the client).
72#[derive(Debug, Clone, PartialEq, Eq)]
73struct StreamIsolationKey(ListenerIsolation, ProvidedIsolation);
74
75/// Isolation information provided through the proxy connection
76#[derive(Debug, Clone, PartialEq, Eq)]
77enum ProvidedIsolation {
78 /// The socks isolation itself.
79 LegacySocks(SocksAuth),
80 /// A bytestring provided as isolation with the extended Socks5 username/password protocol.
81 ExtendedSocks {
82 /// Which format was negotiated?
83 ///
84 /// (At present, different format codes can't share a circuit.)
85 format_code: u8,
86 /// What's the isolation string?
87 isolation: Box<[u8]>,
88 },
89 #[cfg(feature = "http-connect")]
90 /// An HTTP token, taken from headers.
91 Http(http_connect::Isolation),
92}
93
94impl arti_client::isolation::IsolationHelper for StreamIsolationKey {
95 fn compatible_same_type(&self, other: &Self) -> bool {
96 self == other
97 }
98
99 fn join_same_type(&self, other: &Self) -> Option<Self> {
100 if self == other {
101 Some(self.clone())
102 } else {
103 None
104 }
105 }
106
107 fn enables_long_lived_circuits(&self) -> bool {
108 use ProvidedIsolation as PI;
109 use SocksAuth as SA;
110 match &self.1 {
111 PI::LegacySocks(SA::Socks4(auth)) => !auth.is_empty(),
112 PI::LegacySocks(SA::Username(uname, pass)) => !(uname.is_empty() && pass.is_empty()),
113 PI::LegacySocks(_) => false,
114 PI::ExtendedSocks { isolation, .. } => !isolation.is_empty(),
115 #[cfg(feature = "http-connect")]
116 PI::Http(isolation) => !isolation.is_empty(),
117 }
118 }
119}
120
/// Capacity of the read buffer placed on application data streams
/// and Tor data streams when copying between them.
//
// The precise value is fairly arbitrary: a bigger buffer means fewer reads
// from the application, at the price of more memory per stream.
//
// (For comparison, BufReader's default capacity is 8k as of this writing.)
const APP_STREAM_BUF_LEN: usize = 4 * 1024;
130
131const _: () = {
132 assert!(APP_STREAM_BUF_LEN >= tor_socksproto::SOCKS_BUF_LEN);
133};
134
/// NOTE: The following documentation belongs in a spec.
/// But for now, it's our best attempt to document the design and protocol
/// implemented here
/// for integrating proxies with our RPC system. --nickm
///
/// Roughly speaking:
///
/// ## Key concepts
///
/// A data stream is "RPC-visible" if, when it is created via a proxy connection,
/// the RPC system is told about it.
///
/// Every RPC-visible stream is associated with a given RPC object when it is created.
/// (Since the RPC object is being specified in the proxy protocol,
/// it must be one with an externally visible Object ID.
/// Such Object IDs are cryptographically unguessable and unforgeable,
/// and are qualified with a unique identifier for their associated RPC session.)
/// Call this RPC Object the "target" object for now.
/// This target RPC object must implement
/// the [`ConnectWithPrefs`](arti_client::rpc::ConnectWithPrefs) special method.
///
/// Right now, there are two general kinds of objects that implement this method:
/// client-like objects, and one-shot clients.
///
/// A client-like object is either a `TorClient` or an RPC `Session`.
/// It knows about, and is capable of opening, multiple data streams.
/// Using it as the target object for a proxy connection tells Arti
/// that the resulting data stream (if any)
/// should be built by it, and associated with its RPC session.
///
/// An application gets a `TorClient` by asking the session for one,
/// or by asking an existing `TorClient` for a new variant clone of itself.
///
/// A one-shot client is an `arti_rpcserver::stream::OneshotClient`.
/// It is created from a client-like object, but can only be used for a single data stream.
/// When created, it is not yet connected or trying to connect to anywhere:
/// the act of using it as the target Object for a proxy connection causes
/// it to begin connecting.
///
/// An application gets a `OneshotClient` by calling `arti:new_oneshot_client`
/// on any client-like object.
///
/// ## The Proxy protocol
///
/// See the specification for
/// [SOCKS extended authentication](https://spec.torproject.org/socks-extensions.html#extended-auth)
/// for full details on integrating RPC with SOCKS.
/// For HTTP integration, see
/// [the relevant section of prop365](https://spec.torproject.org/proposals/365-http-connect-ext.html#x-tor-rpc-target-arti-rpc-support).
///
/// ### Further restrictions on Object IDs and isolation
///
/// In some cases,
/// the RPC Object ID may denote an object
/// that already includes information about its intended stream isolation.
/// In such cases, the stream isolation MUST be blank.
/// Implementations MUST reject non-blank stream isolation in such cases.
///
/// In some cases, the RPC object ID may denote an object
/// that already includes information
/// about its intended destination address and port.
/// In such cases, the destination address MUST be `0.0.0.0` or `::`
/// (encoded either as an IPv4 address, an IPv6 address, or a hostname)
/// and the destination port MUST be 0.
/// Implementations MUST reject other addresses in such cases.
///
/// ### A proposed extension
///
/// We could add a new method to clients, with a name like
/// "open_stream" or "connect_stream".
/// This method would include all target and isolation information in its parameters.
/// It would actually create a DataStream immediately, tell it to begin connecting,
/// and return an externally visible object ID.
/// The RPC protocol could be used to watch the DataStream object,
/// to see when it was connected.
///
/// The resulting DataStream object could also be used as the target of a proxy connection.
/// We would require in such a case that no isolation be provided in the proxy handshake,
/// and that the target address was (e.g.) INADDR_ANY.
///
/// ## Intended use cases (examples)
///
/// (These examples assume that the application
/// already knows the proxy port it should use.
/// I'm mostly leaving out the isolation strings as orthogonal.)
///
/// These are **NOT** the only possible use cases;
/// they're just the two that (I hope) best illustrate how this system works.
///
/// ### Case 1: Using a client-like object directly.
///
/// Here the application has authenticated to RPC
/// and gotten the session ID `SESSION-1`.
/// (In reality, this would be a longer ID, full of crypto.)
///
/// The application wants to open a new stream to www.example.com.
/// They don't particularly care about isolation,
/// but they do want their stream to use their RPC session.
/// They don't want an Object ID for the stream.
///
/// To do this, they make a SOCKS connection to Arti,
/// with target address www.example.com.
/// They set the username to `<torS0X>0SESSION-1`,
/// and the password to the empty string.
///
/// (Alternatively, they could use HTTP CONNECT, setting the
/// `Tor-Rpc-Target` header to `SESSION-1`.)
///
/// Arti looks up the Session object via the `SESSION-1` object ID
/// and tells it (via the ConnectWithPrefs special method)
/// to connect to www.example.com.
/// The session creates a new DataStream using its internal TorClient,
/// but does not register the stream with an RPC Object ID.
/// Arti proxies the application's connection through this DataStream.
///
/// ### Case 2: Creating an identifiable stream.
///
/// Here the application wants to be able to refer to its DataStream
/// after the stream is created.
/// As before, we assume that it's on an RPC session
/// where the Session ID is `SESSION-1`.
///
/// The application sends an RPC request of the form:
/// `{"id": 123, "obj": "SESSION-1", "method": "arti:new_oneshot_client", "params": {}}`
///
/// It receives a reply like:
/// `{"id": 123, "result": {"id": "STREAM-1"} }`
///
/// (In reality, `STREAM-1` would also be longer and full of crypto.)
///
/// Now the application has an object called `STREAM-1` that is not yet a connected
/// stream, but which may become one.
///
/// This time, it wants to set its isolation string to "xyzzy".
///
/// The application opens a SOCKS connection to www.example.com as before.
/// For the username it sends `<torS0X>0STREAM-1`,
/// and for the password it sends `xyzzy`.
///
/// (Alternatively, it could use HTTP CONNECT, setting the `Tor-Isolation`
/// header to `xyzzy` and the `Tor-Rpc-Target` header to `STREAM-1`.)
///
/// Now Arti looks up the `RpcDataStream` object via `STREAM-1`,
/// and tells it (via the ConnectWithPrefs special method)
/// to connect to www.example.com.
/// This causes the `RpcDataStream` internally to create a new `DataStream`,
/// and to store that `DataStream` in itself.
/// The `RpcDataStream` with Object ID `STREAM-1`
/// is now an alias for the newly created `DataStream`.
/// Arti proxies the application's connection through that `DataStream`.
///
#[cfg(feature = "rpc")]
#[allow(dead_code)]
mod socks_and_rpc {}
290
291/// Information used to implement a proxy listener.
292struct ProxyContext<R: Runtime> {
293 /// A TorClient to use (by default) to anonymize requests.
294 tor_client: TorClient<R>,
295 /// If present, an RpcMgr to use when for attaching requests to RPC
296 /// sessions.
297 #[cfg(feature = "rpc")]
298 rpc_mgr: Option<Arc<arti_rpcserver::RpcMgr>>,
299 /// The protocols that we support.
300 protocols: ListenProtocols,
301}
302
/// Isolation information known about a proxy connection as soon as it is
/// accepted, _before_ any protocol negotiation has happened.
///
/// The first element is the index of the listener that accepted the
/// connection; the second is the IP address of the client that connected.
type ListenerIsolation = (usize, IpAddr);
309
310/// write_all the data to the writer & flush the writer if write_all is successful.
311async fn write_all_and_flush<W>(writer: &mut W, buf: &[u8]) -> Result<()>
312where
313 W: AsyncWrite + Unpin,
314{
315 writer
316 .write_all(buf)
317 .await
318 .context("Error while writing proxy reply")?;
319 writer
320 .flush()
321 .await
322 .context("Error while flushing proxy stream")
323}
324
325/// write_all the data to the writer & close the writer if write_all is successful.
326async fn write_all_and_close<W>(writer: &mut W, buf: &[u8]) -> Result<()>
327where
328 W: AsyncWrite + Unpin,
329{
330 writer
331 .write_all(buf)
332 .await
333 .context("Error while writing proxy reply")?;
334 writer
335 .close()
336 .await
337 .context("Error while closing proxy stream")
338}
339
340/// Return true if a given IoError, when received from accept, is a fatal
341/// error.
342fn accept_err_is_fatal(err: &IoError) -> bool {
343 #![allow(clippy::match_like_matches_macro)]
344
345 /// Re-declaration of WSAEMFILE with the right type to match
346 /// `raw_os_error()`.
347 #[cfg(windows)]
348 const WSAEMFILE: i32 = winapi::shared::winerror::WSAEMFILE as i32;
349
350 // Currently, EMFILE and ENFILE aren't distinguished by ErrorKind;
351 // we need to use OS-specific errors. :P
352 match err.raw_os_error() {
353 #[cfg(unix)]
354 Some(libc::EMFILE) | Some(libc::ENFILE) => false,
355 #[cfg(windows)]
356 Some(WSAEMFILE) => false,
357 _ => true,
358 }
359}
360
361/// A stream proxy listening on one or more local ports, ready to relay traffic.
362#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
363#[must_use]
364pub(crate) struct StreamProxy<R: Runtime> {
365 /// A tor client to use when relaying traffic.
366 tor_client: TorClient<R>,
367 /// The listeners that we've actually bound to.
368 listeners: Vec<<R as NetStreamProvider>::Listener>,
369 /// The protocols we respond to.
370 protocols: ListenProtocols,
371 /// An RPC manager to use when incoming requests are tied to streams.
372 rpc_mgr: Option<Arc<RpcMgr>>,
373}
374
375/// Launch a proxy to listen on a given set of ports.
376///
377/// Requires a `runtime` to use for launching tasks and handling
378/// timeouts, and a `tor_client` to use in connecting over the Tor
379/// network.
380///
381/// Returns the proxy, and a list of the ports that we have
382/// bound to.
383#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
384#[instrument(skip_all, level = "trace")]
385pub(crate) async fn bind_proxy<R: Runtime>(
386 runtime: R,
387 tor_client: TorClient<R>,
388 listen: Listen,
389 protocols: ListenProtocols,
390 rpc_mgr: Option<Arc<RpcMgr>>,
391) -> Result<StreamProxy<R>> {
392 if !listen.is_loopback_only() {
393 warn!(
394 "Configured to listen for proxy connections on non-local addresses. \
395 This is usually insecure! We recommend listening on localhost only."
396 );
397 }
398
399 let mut listeners = Vec::new();
400
401 // Try to bind to the listener ports.
402 match listen.ip_addrs() {
403 Ok(addrgroups) => {
404 for addrgroup in addrgroups {
405 for addr in addrgroup {
406 match runtime.listen(&addr).await {
407 Ok(listener) => {
408 let bound_addr = listener.local_addr()?;
409 info!("Listening on {:?}", bound_addr);
410 listeners.push(listener);
411 }
412 #[cfg(unix)]
413 Err(ref e) if e.raw_os_error() == Some(libc::EAFNOSUPPORT) => {
414 warn_report!(e, "Address family not supported {}", addr);
415 }
416 Err(ref e) => {
417 return Err(anyhow!("Can't listen on {}: {e}", addr));
418 }
419 }
420 }
421 // TODO: We are supposed to fail if every address in the group failed!
422 }
423 }
424 Err(e) => warn_report!(e, "Invalid listen spec"),
425 }
426
427 // We weren't able to bind any ports: There's nothing to do.
428 if listeners.is_empty() {
429 error!("Couldn't open any listeners.");
430 return Err(anyhow!("Couldn't open listeners"));
431 }
432
433 Ok(StreamProxy {
434 tor_client,
435 listeners,
436 protocols,
437 rpc_mgr,
438 })
439}
440
441impl<R: Runtime> StreamProxy<R> {
442 /// Run indefinitely, processing incoming connections and relaying traffic.
443 pub(crate) async fn run_proxy(self) -> Result<()> {
444 let StreamProxy {
445 tor_client,
446 listeners,
447 protocols,
448 rpc_mgr,
449 } = self;
450 run_proxy_with_listeners(tor_client, listeners, protocols, rpc_mgr).await
451 }
452
453 /// Return a list of the ports that we've bound to.
454 pub(crate) fn port_info(&self) -> Result<Vec<port_info::Port>> {
455 let mut ports = Vec::new();
456 for listener in &self.listeners {
457 let address = listener.local_addr()?;
458 ports.push(port_info::Port {
459 protocol: port_info::SupportedProtocol::Socks,
460 address: address.into(),
461 });
462 #[cfg(feature = "http-connect")]
463 if self.protocols.http_connect_supported() {
464 ports.push(port_info::Port {
465 protocol: port_info::SupportedProtocol::Http,
466 address: address.into(),
467 });
468 }
469 }
470
471 Ok(ports)
472 }
473}
474
475/// Launch a proxy from a given set of already bound listeners.
476#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
477#[instrument(skip_all, level = "trace")]
478pub(crate) async fn run_proxy_with_listeners<R: Runtime>(
479 tor_client: TorClient<R>,
480 listeners: Vec<<R as tor_rtcompat::NetStreamProvider>::Listener>,
481 protocols: ListenProtocols,
482 rpc_mgr: Option<Arc<RpcMgr>>,
483) -> Result<()> {
484 // Create a stream of (incoming socket, listener_id) pairs, selected
485 // across all the listeners.
486 let mut incoming = futures::stream::select_all(
487 listeners
488 .into_iter()
489 .map(NetStreamListener::incoming)
490 .enumerate()
491 .map(|(listener_id, incoming_conns)| {
492 incoming_conns.map(move |socket| (socket, listener_id))
493 }),
494 );
495
496 // Loop over all incoming connections. For each one, call
497 // handle_proxy_conn() in a new task.
498 while let Some((stream, sock_id)) = incoming.next().await {
499 let (stream, addr) = match stream {
500 Ok((s, a)) => (s, a),
501 Err(err) => {
502 if accept_err_is_fatal(&err) {
503 return Err(err).context("Failed to receive incoming stream on proxy port");
504 } else {
505 warn_report!(err, "Incoming stream failed");
506 continue;
507 }
508 }
509 };
510 let proxy_context = ProxyContext {
511 tor_client: tor_client.clone(),
512 #[cfg(feature = "rpc")]
513 rpc_mgr: rpc_mgr.clone(),
514 protocols,
515 };
516 tor_client.runtime().spawn(async move {
517 let res = handle_proxy_conn(proxy_context, stream, (sock_id, addr.ip())).await;
518 if let Err(ref e) = res {
519 report_proxy_error(e);
520 }
521 })?;
522 }
523
524 Ok(())
525}
526
/// A (possibly) supported proxy protocol.
enum ProxyProtocols {
    /// Some HTTP/1 command or other.
    ///
    /// (We only support CONNECT and OPTIONS, but other commands are rejected
    /// later, by the `http_connect` module.)
    Http1,
    /// SOCKS4 or SOCKS5.
    Socks,
}

/// Look at the first byte of a proxy connection, and guess what protocol
/// it is trying to speak.
///
/// An ASCII letter is taken to be the start of an HTTP/1 method name, and a
/// byte of 4 or 5 is taken to be a SOCKS version number. Any other byte
/// yields `None`.
fn classify_protocol_from_first_byte(byte: u8) -> Option<ProxyProtocols> {
    match byte {
        b'a'..=b'z' | b'A'..=b'Z' => Some(ProxyProtocols::Http1),
        4 | 5 => Some(ProxyProtocols::Socks),
        _ => None,
    }
}
546
547/// Handle a single connection `stream` from an application.
548///
549/// Depending on what protocol the application is speaking
550/// (and what protocols we support!), negotiate an appropriate set of options,
551/// and relay traffic to and from the application.
552async fn handle_proxy_conn<R, S>(
553 context: ProxyContext<R>,
554 stream: S,
555 isolation_info: ListenerIsolation,
556) -> Result<()>
557where
558 R: Runtime,
559 S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
560{
561 let mut stream = BufReader::with_capacity(APP_STREAM_BUF_LEN, stream);
562 use futures::AsyncBufReadExt as _;
563
564 let buf: &[u8] = stream.fill_buf().await?;
565 if buf.is_empty() {
566 // connection closed
567 return Ok(());
568 }
569 match classify_protocol_from_first_byte(buf[0]) {
570 Some(ProxyProtocols::Http1) => {
571 #[cfg(feature = "http-connect")]
572 if context.protocols.http_connect_supported() {
573 return http_connect::handle_http_conn(context, stream, isolation_info).await;
574 }
575
576 write_all_and_close(&mut stream, socks::WRONG_PROTOCOL_PAYLOAD).await?;
577 Ok(())
578 }
579 Some(ProxyProtocols::Socks) => {
580 socks::handle_socks_conn(context, stream, isolation_info).await
581 }
582 None => {
583 // We have no idea what protocol the client expects,
584 // so we have no idea how to tell it so.
585 warn!(
586 "Unrecognized protocol on proxy listener (first byte {:x})",
587 buf[0]
588 );
589 Ok(())
590 }
591 }
592}
593
594/// If any source of the provided `error` is a [`tor_proto::Error`], return a reference to that
595/// [`tor_proto::Error`].
596fn extract_proto_err<'a>(
597 error: &'a (dyn std::error::Error + 'static),
598) -> Option<&'a tor_proto::Error> {
599 for error in ErrorSources::new(error) {
600 if let Some(downcast) = error.downcast_ref::<tor_proto::Error>() {
601 return Some(downcast);
602 }
603 }
604
605 None
606}
607
608/// Report an error that occurred within a single proxy task.
609#[expect(clippy::cognitive_complexity)]
610fn report_proxy_error(e: &anyhow::Error) {
611 use tor_proto::Error as PE;
612 // TODO: In the long run it might be a good idea to use an ErrorKind here if we can get one.
613 // This is a bit of a kludge based on the fact that we're using anyhow.
614 //
615 // TODO: It might be handy to have a way to collapse CircuitClosed into EOF earlier.
616 // But that loses information, so it should be optional.
617 //
618 // TODO: Maybe we should look at io::ErrorKind as well, if it's there. That's another reason
619 // to discard or restrict our anyhow usage.
620 match extract_proto_err(e.as_ref()) {
621 Some(e @ PE::CircuitClosed) => debug_report!(e, "Connection exited"),
622 // We can't use `debug_report!` here because `NotConnected`s error kind is `BadApiUsage`,
623 // which upgrades this to a warning.
624 // https://gitlab.torproject.org/tpo/core/arti/-/issues/2439
625 Some(e @ PE::NotConnected) => debug!(error = (e as &dyn std::error::Error), "Connection exited"),
626 _ => warn_report!(*e, "Connection exited"),
627 }
628}