Skip to main content

arti/proxy/
http_connect.rs

//! Implement an HTTP1 CONNECT proxy using `hyper`.
//!
//! Note that Tor defines several extensions to HTTP CONNECT;
//! see [the spec](https://spec.torproject.org/http-connect.html)
//! for more information.
6
7use super::{ListenerIsolation, ProxyContext};
8use anyhow::{Context as _, anyhow};
9use arti_client::{StreamPrefs, TorAddr};
10use futures::{AsyncRead, AsyncWrite, io::BufReader};
11use http::{Method, StatusCode, response::Builder as ResponseBuilder};
12use hyper::{Response, server::conn::http1::Builder as ServerBuilder, service::service_fn};
13use safelog::{Sensitive as Sv, sensitive as sv};
14use tor_error::{ErrorKind, ErrorReport as _, HasKind, into_internal, warn_report};
15use tor_rtcompat::Runtime;
16use tor_rtcompat::SpawnExt as _;
17use tracing::{instrument, warn};
18
19use hyper_futures_io::FuturesIoCompat;
20
21#[cfg(feature = "rpc")]
22use {crate::rpc::conntarget::ConnTarget, tor_rpcbase as rpc};
23
24cfg_if::cfg_if! {
25    if #[cfg(feature="rpc")] {
26        /// Error type returned from a failed connect_with_prefs.
27        type ClientError = Box<dyn arti_client::rpc::ClientConnectionError>;
28    } else {
29        /// Error type returned from a failed connect_with_prefs.
30        type ClientError = arti_client::Error;
31    }
32}
33
34/// Request type that we receive from Hyper.
35type Request = hyper::Request<hyper::body::Incoming>;
36
37/// We use "String" as our body type, since we only return a body on error,
38/// in which case it already starts life as a formatted string.
39///
40/// (We could use () or `Empty` for our (200 OK) replies,
41/// but empty strings are cheap enough that it isn't worth it.)
42type Body = String;
43
44/// A value used to isolate streams received via HTTP CONNECT.
45#[derive(Clone, Debug, Eq, PartialEq)]
46pub(super) struct Isolation {
47    /// The value of the Proxy-Authorization header.
48    proxy_auth: Option<ProxyAuthorization>,
49    /// The legacy X-Tor-Isolation token.
50    x_tor_isolation: Option<String>,
51    /// The up-to-date Tor-Isolation token.
52    tor_isolation: Option<String>,
53}
54
55impl Isolation {
56    /// Return true if no isolation field in this object is set.
57    pub(super) fn is_empty(&self) -> bool {
58        let Isolation {
59            proxy_auth,
60            x_tor_isolation,
61            tor_isolation,
62        } = self;
63        proxy_auth.as_ref().is_none_or(ProxyAuthorization::is_empty)
64            && x_tor_isolation.as_ref().is_none_or(String::is_empty)
65            && tor_isolation.as_ref().is_none_or(String::is_empty)
66    }
67}
68
69/// Constants and code for the HTTP headers we use.
70mod hdr {
71    pub(super) use http::header::{CONTENT_TYPE, HOST, PROXY_AUTHORIZATION, SERVER, VIA};
72
73    /// Client-to-proxy: Which IP family should we use?
74    pub(super) const TOR_FAMILY_PREFERENCE: &str = "Tor-Family-Preference";
75
76    /// Client-To-Proxy: The ID of an RPC object to receive our request.
77    pub(super) const TOR_RPC_TARGET: &str = "Tor-RPC-Target";
78
79    /// Client-To-Proxy: An isolation token to use with our stream.
80    /// (Legacy name.)
81    pub(super) const X_TOR_STREAM_ISOLATION: &str = "X-Tor-Stream-Isolation";
82
83    /// Client-To-Proxy: An isolation token to use with our stream.
84    pub(super) const TOR_STREAM_ISOLATION: &str = "Tor-Stream-Isolation";
85
86    /// Proxy-to-client: A list of the capabilities that this proxy provides.
87    pub(super) const TOR_CAPABILITIES: &str = "Tor-Capabilities";
88
89    /// Proxy-to-client: A machine-readable list of failure reasons.
90    pub(super) const TOR_REQUEST_FAILED: &str = "Tor-Request-Failed";
91
92    /// A list of all the headers that we support from client-to-proxy.
93    ///
94    /// Does not include headers that we check for HTTP conformance,
95    /// but not for any other purpose.
96    pub(super) const ALL_REQUEST_HEADERS: &[&str] = &[
97        TOR_FAMILY_PREFERENCE,
98        TOR_RPC_TARGET,
99        X_TOR_STREAM_ISOLATION,
100        TOR_STREAM_ISOLATION,
101        // Can't use 'PROXY_AUTHORIZATION', since it isn't a str, and its as_str() isn't const.
102        "Proxy-Authorization",
103    ];
104
105    /// Return the unique string-valued value of the header `name`;
106    /// or None if the header doesn't exist,
107    /// or an error if the header is duplicated or not UTF-8.
108    pub(super) fn uniq_utf8(
109        map: &http::HeaderMap,
110        name: impl http::header::AsHeaderName,
111    ) -> Result<Option<&str>, super::HttpConnectError> {
112        let mut iter = map.get_all(name).iter();
113        let val = match iter.next() {
114            Some(v) => v,
115            None => return Ok(None),
116        };
117        match iter.next() {
118            Some(_) => Err(super::HttpConnectError::DuplicateHeader),
119            None => val
120                .to_str()
121                .map(Some)
122                .map_err(|_| super::HttpConnectError::HeaderNotUtf8),
123        }
124    }
125}
126
127/// Given a just-received TCP connection `S` on a HTTP proxy port, handle the
128/// HTTP handshake and relay the connection over the Tor network.
129///
130/// Uses `isolation_info` to decide which circuits this connection
131/// may use.  Requires that `isolation_info` is a pair listing the listener
132/// id and the source address for the HTTP request.
133#[instrument(skip_all, level = "trace")]
134pub(super) async fn handle_http_conn<R, S>(
135    context: super::ProxyContext<R>,
136    stream: BufReader<S>,
137    isolation_info: ListenerIsolation,
138) -> crate::Result<()>
139where
140    R: Runtime,
141    S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
142{
143    // NOTES:
144    // * We _could_ use a timeout, but we trust that the client is not trying to DOS us.
145    ServerBuilder::new()
146        .half_close(false)
147        .keep_alive(true)
148        .max_headers(256)
149        .max_buf_size(16 * 1024)
150        .title_case_headers(true)
151        .auto_date_header(false) // We omit the date header out of general principle.
152        .serve_connection(
153            FuturesIoCompat(stream),
154            service_fn(|request| handle_http_request::<R, S>(request, &context, isolation_info)),
155        )
156        .with_upgrades()
157        .await?;
158
159    Ok(())
160}
161
162/// Handle a single HTTP request.
163///
164/// This function is invoked by hyper.
165async fn handle_http_request<R, S>(
166    request: Request,
167    context: &ProxyContext<R>,
168    listener_isolation: ListenerIsolation,
169) -> Result<Response<Body>, anyhow::Error>
170where
171    R: Runtime,
172    S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
173{
174    // Avoid cross-site attacks based on DNS forgery by validating that the Host
175    // header is in fact localhost.  In these cases, we don't want to reply at all,
176    // _even with an error message_, since our headers could be used to tell a hostile
177    // webpage information about the local arti process.
178    //
179    // We don't do this for CONNECT requests, since those are forbidden by
180    // XHR and JS fetch(), and since Host _will_ be non-localhost for those.
181    if request.method() != Method::CONNECT {
182        match hdr::uniq_utf8(request.headers(), hdr::HOST) {
183            Err(e) => return Err(e).context("Host header invalid. Rejecting request."),
184            Ok(Some(host)) if !host_is_localhost(host) => {
185                return Err(anyhow!(
186                    "Host header {host:?} was not localhost. Rejecting request."
187                ));
188            }
189            Ok(_) => {}
190        }
191    }
192
193    match *request.method() {
194        Method::OPTIONS => handle_options_request(&request),
195        Method::CONNECT => {
196            handle_connect_request::<R, S>(request, context, listener_isolation).await
197        }
198        _ => Ok(ResponseBuilder::new()
199            .status(StatusCode::NOT_IMPLEMENTED)
200            .err(
201                request.method(),
202                format!("{} is not supported", request.method()),
203            )?),
204    }
205}
206
207/// Return an appropriate reply to the given OPTIONS request.
208fn handle_options_request(request: &Request) -> Result<Response<Body>, anyhow::Error> {
209    use hyper::body::Body as _;
210
211    let target = request.uri().to_string();
212    match target.as_str() {
213        "*" => {}
214        s if TorAddr::from(s).is_ok() => {}
215        _ => {
216            return Ok(ResponseBuilder::new()
217                .status(StatusCode::BAD_REQUEST)
218                .err(&Method::OPTIONS, "Target was not a valid address")?);
219        }
220    }
221    if request.headers().contains_key(hdr::CONTENT_TYPE) {
222        // RFC 9110 says that if a client wants to include a body with its OPTIONS request (!),
223        // it must include a Content-Type header.  Therefore, we reject such requests.
224        return Ok(ResponseBuilder::new()
225            .status(StatusCode::BAD_REQUEST)
226            .err(&Method::OPTIONS, "Unexpected Content-Type on OPTIONS")?);
227
228        // TODO: It would be cool to detect nonempty bodies in other ways, though in practice
229        // it should never come up.
230    }
231    if !request.body().is_end_stream() {
232        return Ok(ResponseBuilder::new()
233            .status(StatusCode::BAD_REQUEST)
234            .err(&Method::OPTIONS, "Unexpected body on OPTIONS request")?);
235    }
236
237    Ok(ResponseBuilder::new()
238        .header("Allow", "OPTIONS, CONNECT")
239        .status(StatusCode::OK)
240        .ok(&Method::OPTIONS)?)
241}
242
243/// Return an appropriate reply to the given CONNECT request.
244async fn handle_connect_request<R, S>(
245    request: Request,
246    context: &ProxyContext<R>,
247    listener_isolation: ListenerIsolation,
248) -> anyhow::Result<Response<Body>>
249where
250    R: Runtime,
251    S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
252{
253    match handle_connect_request_impl::<R, S>(request, context, listener_isolation).await {
254        Ok(response) => Ok(response),
255        Err(e) => Ok(e.try_into_response()?),
256    }
257}
258
259/// Helper for handle_connect_request:
260/// return an error type that can be converted into an HTTP message.
261///
262/// (This is a separate function to make error handling simpler.)
263async fn handle_connect_request_impl<R, S>(
264    request: Request,
265    context: &ProxyContext<R>,
266    listener_isolation: ListenerIsolation,
267) -> Result<Response<Body>, HttpConnectError>
268where
269    R: Runtime,
270    S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
271{
272    let target = request.uri().to_string();
273    let tor_addr =
274        TorAddr::from(&target).map_err(|e| HttpConnectError::InvalidStreamTarget(sv(target), e))?;
275
276    let mut stream_prefs = StreamPrefs::default();
277    set_family_preference(&mut stream_prefs, &tor_addr, request.headers())?;
278
279    set_isolation(&mut stream_prefs, request.headers(), listener_isolation)?;
280
281    let client = find_conn_target(
282        context,
283        hdr::uniq_utf8(request.headers(), hdr::TOR_RPC_TARGET)?,
284    )?;
285
286    // If we reach this point, the request looks okay, so we'll try to connect.
287    let tor_stream = client
288        .connect_with_prefs(&tor_addr, &stream_prefs)
289        .await
290        .map_err(|e| HttpConnectError::ConnectFailed(sv(tor_addr), e))?;
291
292    // We have connected.  We need to launch a separate task to actually be the proxy, though,
293    // since IIUC hyper::upgrade::on won't return an answer
294    // until after the response is given to the client.
295    context
296        .tor_client
297        .runtime()
298        .spawn(async move {
299            match transfer::<S>(request, tor_stream).await {
300                Ok(()) => {}
301                Err(e) => {
302                    warn_report!(e, "Error while launching transfer");
303                }
304            }
305        })
306        .map_err(into_internal!("Unable to spawn transfer task"))?;
307
308    ResponseBuilder::new()
309        .status(StatusCode::OK)
310        .ok(&Method::CONNECT)
311}
312
313/// Set the IP family preference in `prefs`.
314fn set_family_preference(
315    prefs: &mut StreamPrefs,
316    addr: &TorAddr,
317    headers: &http::HeaderMap,
318) -> Result<(), HttpConnectError> {
319    if let Some(val) = hdr::uniq_utf8(headers, hdr::TOR_FAMILY_PREFERENCE)? {
320        match val.trim() {
321            "ipv4-preferred" => prefs.ipv4_preferred(),
322            "ipv6-preferred" => prefs.ipv6_preferred(),
323            "ipv4-only" => prefs.ipv4_only(),
324            "ipv6-only" => prefs.ipv6_only(),
325            _ => return Err(HttpConnectError::InvalidFamilyPreference),
326        };
327    } else if let Some(ip) = addr.as_ip_address() {
328        // TODO: Perhaps we should check unconditionally whether the IP address is consistent with header,
329        // if one was given?  On the other hand, if the application tells us to make an IPV6-only
330        // connection to an IPv4 address, it probably deserves what it gets.
331        if ip.is_ipv4() {
332            prefs.ipv4_only();
333        } else {
334            prefs.ipv6_only();
335        }
336    }
337
338    Ok(())
339}
340
341/// Configure the stream isolation from the provided headers.
342fn set_isolation(
343    prefs: &mut StreamPrefs,
344    headers: &http::HeaderMap,
345    listener_isolation: ListenerIsolation,
346) -> Result<(), HttpConnectError> {
347    let proxy_auth =
348        hdr::uniq_utf8(headers, hdr::PROXY_AUTHORIZATION)?.map(ProxyAuthorization::from_header);
349    let x_tor_isolation = hdr::uniq_utf8(headers, hdr::X_TOR_STREAM_ISOLATION)?.map(str::to_owned);
350    let tor_isolation = hdr::uniq_utf8(headers, hdr::TOR_STREAM_ISOLATION)?.map(str::to_owned);
351
352    let isolation = super::ProvidedIsolation::Http(Isolation {
353        proxy_auth,
354        x_tor_isolation,
355        tor_isolation,
356    });
357
358    let isolation = super::StreamIsolationKey(listener_isolation, isolation);
359    prefs.set_isolation(isolation);
360
361    Ok(())
362}
363
364/// An isolation value based on the Proxy-Authorization header.
365#[derive(Debug, Clone, Eq, PartialEq)]
366pub(super) enum ProxyAuthorization {
367    /// The entire contents of the Proxy-Authorization header.
368    Legacy(String),
369    /// The decoded value of the basic authorization, with the user set to "tor-iso".
370    Modern(Vec<u8>),
371}
372
373impl ProxyAuthorization {
374    /// Return a ProxyAuthorization based on the value of the Proxy-Authorization header.
375    ///
376    /// Give a warning if the header is in the legacy (obsolete) format.
377    fn from_header(value: &str) -> Self {
378        if let Some(result) = Self::modern_from_header(value) {
379            result
380        } else {
381            warn!(
382                "{} header in obsolete format. If you want isolation, use {}, \
383                 or {} with Basic authentication and username 'tor-iso'",
384                hdr::PROXY_AUTHORIZATION,
385                hdr::X_TOR_STREAM_ISOLATION,
386                hdr::PROXY_AUTHORIZATION
387            );
388            Self::Legacy(value.to_owned())
389        }
390    }
391
392    /// Helper: Try to return a Modern authorization value, if this is one.
393    fn modern_from_header(value: &str) -> Option<Self> {
394        use base64ct::Encoding as _;
395        let value = value.trim_ascii();
396        let (kind, value) = value.split_once(' ')?;
397        if kind != "Basic" {
398            return None;
399        }
400        let value = value.trim_ascii();
401        // TODO: Is this the right format, or should we allow missing padding?
402        let decoded = base64ct::Base64::decode_vec(value).ok()?;
403        if decoded.starts_with(b"tor-iso:") {
404            Some(ProxyAuthorization::Modern(decoded))
405        } else {
406            None
407        }
408    }
409
410    /// Return true if this ProxyAuthorization has no authorization information.
411    fn is_empty(&self) -> bool {
412        match self {
413            ProxyAuthorization::Legacy(s) => s.is_empty(),
414            ProxyAuthorization::Modern(v) => v.is_empty(),
415        }
416    }
417}
418
419/// Look up the connection target given the value of an Tor-RPC-Target header.
420#[cfg(feature = "rpc")]
421fn find_conn_target<R: Runtime>(
422    context: &ProxyContext<R>,
423    rpc_target: Option<&str>,
424) -> Result<ConnTarget<R>, HttpConnectError> {
425    let Some(target_id) = rpc_target else {
426        return Ok(ConnTarget::Client(Box::new(context.tor_client.clone())));
427    };
428
429    let Some(rpc_mgr) = &context.rpc_mgr else {
430        return Err(HttpConnectError::NoRpcSupport);
431    };
432
433    let (context, object) = rpc_mgr
434        .lookup_object(&rpc::ObjectId::from(target_id))
435        .map_err(|_| HttpConnectError::RpcObjectNotFound)?;
436
437    Ok(ConnTarget::Rpc { object, context })
438}
439
440/// Look up the connection target given the value of an Tor-RPC-Target header
441//
442// (This is the implementation when we have no RPC support.)
443#[cfg(not(feature = "rpc"))]
444fn find_conn_target<R: Runtime>(
445    context: &ProxyContext<R>,
446    rpc_target: Option<&str>,
447) -> Result<arti_client::TorClient<R>, HttpConnectError> {
448    if rpc_target.is_some() {
449        Err(HttpConnectError::NoRpcSupport)
450    } else {
451        Ok(context.tor_client.clone())
452    }
453}
454
455/// Extension trait on ResponseBuilder
456trait RespBldExt {
457    /// Return a response for a successful builder.
458    fn ok(self, method: &Method) -> anyhow::Result<Response<Body>, HttpConnectError>;
459
460    /// Return a response for an error message.
461    fn err(
462        self,
463        method: &Method,
464        message: impl Into<String>,
465    ) -> Result<Response<Body>, HttpConnectError>;
466}
467
468impl RespBldExt for ResponseBuilder {
469    fn ok(self, method: &Method) -> Result<Response<Body>, HttpConnectError> {
470        let bld = add_common_headers(self, method);
471        Ok(bld
472            .body("".into())
473            .map_err(into_internal!("Formatting HTTP response"))?)
474    }
475
476    fn err(
477        self,
478        method: &Method,
479        message: impl Into<String>,
480    ) -> Result<Response<Body>, HttpConnectError> {
481        let bld = add_common_headers(self, method).header(hdr::CONTENT_TYPE, "text/plain");
482        Ok(bld
483            .body(message.into())
484            .map_err(into_internal!("Formatting HTTP response"))?)
485    }
486}
487
488/// Return a string representing our capabilities.
489fn capabilities() -> &'static str {
490    use std::sync::LazyLock;
491    static CAPS: LazyLock<String> = LazyLock::new(|| {
492        let mut caps = hdr::ALL_REQUEST_HEADERS.to_vec();
493        caps.sort();
494        caps.join(" ")
495    });
496
497    CAPS.as_str()
498}
499
500/// Add all common headers to the builder `bld`, and return a new builder.
501fn add_common_headers(mut bld: ResponseBuilder, method: &Method) -> ResponseBuilder {
502    bld = bld.header(hdr::TOR_CAPABILITIES, capabilities());
503    if let (Some(software), Some(version)) = (
504        option_env!("CARGO_PKG_NAME"),
505        option_env!("CARGO_PKG_VERSION"),
506    ) {
507        if method == Method::CONNECT {
508            bld = bld.header(
509                hdr::VIA,
510                format!("tor/1.0 tor-network ({software} {version})"),
511            );
512        } else {
513            bld = bld.header(hdr::SERVER, format!("tor/1.0 ({software} {version})"));
514        }
515    }
516    bld
517}
518
519/// An error that occurs during an HTTP CONNECT attempt, which can (usually)
520/// be reported to the client.
521#[derive(Clone, Debug, thiserror::Error)]
522enum HttpConnectError {
523    /// Tried to connect to an invalid stream target.
524    #[error("Invalid target address {0:?}")]
525    InvalidStreamTarget(Sv<String>, #[source] arti_client::TorAddrError),
526
527    /// We found a duplicate HTTP header that we do not allow.
528    ///
529    /// (We only enforce this for the headers that we look at ourselves.)
530    #[error("Duplicate HTTP header found.")]
531    DuplicateHeader,
532
533    /// We tried to found an HTTP header whose value wasn't encode as UTF-8.
534    ///
535    /// (We only enforce this for the headers that we look at ourselves.)
536    #[error("HTTP header value was not in UTF-8")]
537    HeaderNotUtf8,
538
539    /// The Tor-Family-Preference header wasn't as expected.
540    #[error("Unrecognized value for {}", hdr::TOR_FAMILY_PREFERENCE)]
541    InvalidFamilyPreference,
542
543    /// The user asked to use an RPC object, but we don't support RPC.
544    #[error(
545        "Found {} header, but we are running without RPC support",
546        hdr::TOR_RPC_TARGET
547    )]
548    NoRpcSupport,
549
550    /// The user asked to use an RPC object, but we didn't find the one they wanted.
551    #[error("RPC target object not found")]
552    RpcObjectNotFound,
553
554    /// arti_client was unable to connect to a stream target.
555    #[error("Unable to connect to {0}")]
556    ConnectFailed(Sv<TorAddr>, #[source] ClientError),
557
558    /// We encountered an internal error.
559    #[error("Internal error while handling request")]
560    Internal(#[from] tor_error::Bug),
561}
562
563impl HasKind for HttpConnectError {
564    fn kind(&self) -> ErrorKind {
565        use ErrorKind as EK;
566        use HttpConnectError as HCE;
567        match self {
568            HCE::InvalidStreamTarget(_, _)
569            | HCE::DuplicateHeader
570            | HCE::HeaderNotUtf8
571            | HCE::InvalidFamilyPreference
572            | HCE::RpcObjectNotFound => EK::LocalProtocolViolation,
573            HCE::NoRpcSupport => EK::FeatureDisabled,
574            HCE::ConnectFailed(_, e) => e.kind(),
575            HCE::Internal(e) => e.kind(),
576        }
577    }
578}
579
580impl HttpConnectError {
581    /// Return an appropriate HTTP status code for this error.
582    fn status_code(&self) -> StatusCode {
583        use HttpConnectError as HCE; // Not a Joyce reference
584        use StatusCode as SC;
585        if let Some(end_reason) = self.remote_end_reason() {
586            return end_reason_to_http_status(end_reason);
587        }
588        match self {
589            HCE::InvalidStreamTarget(_, _)
590            | HCE::DuplicateHeader
591            | HCE::HeaderNotUtf8
592            | HCE::InvalidFamilyPreference
593            | HCE::RpcObjectNotFound
594            | HCE::NoRpcSupport => SC::BAD_REQUEST,
595            HCE::ConnectFailed(_, e) => e.kind().http_status_code(),
596            HCE::Internal(e) => e.kind().http_status_code(),
597        }
598    }
599
600    /// If possible, return a response that we should give to this error.
601    fn try_into_response(self) -> Result<Response<Body>, HttpConnectError> {
602        let error_kind = self.kind();
603        let end_reason = self.remote_end_reason();
604        let status_code = self.status_code();
605        let mut request_failed = format!("arti/{error_kind:?}");
606        if let Some(end_reason) = end_reason {
607            request_failed.push_str(&format!(" end/{end_reason}"));
608        }
609
610        ResponseBuilder::new()
611            .status(status_code)
612            .header(hdr::TOR_REQUEST_FAILED, request_failed)
613            .err(&Method::CONNECT, self.report().to_string())
614    }
615
616    /// Return the end reason for this error, if this error does in fact represent an END message
617    /// from the remote side of a stream.
618    //
619    // TODO: This function is a bit fragile; it forces us to use APIs from tor-proto and
620    // tor-cell that are not re-exported from arti-client.  It also relies on the fact that
621    // there is a single error type way down in `tor-proto` representing a received END message.
622    fn remote_end_reason(&self) -> Option<tor_cell::relaycell::msg::EndReason> {
623        use tor_proto::Error::EndReceived;
624        if let Some(EndReceived(reason)) = super::extract_proto_err(self) {
625            Some(*reason)
626        } else {
627            None
628        }
629    }
630}
631
632/// Return the appropriate HTTP status code for a remote END reason.
633///
634/// Return `None` if the END reason is unrecognized and we should use the `ErrorKind`
635///
636/// (We  _could_ use the ErrorKind unconditionally,
637/// but the mapping from END reason to ErrorKind is [given in the spec][spec],
638/// so we try to obey it.)
639///
640/// [spec]: https://spec.torproject.org/http-connect.html#error-codes
641fn end_reason_to_http_status(end_reason: tor_cell::relaycell::msg::EndReason) -> StatusCode {
642    use StatusCode as S;
643    use tor_cell::relaycell::msg::EndReason as R;
644    match end_reason {
645        //
646        R::CONNECTREFUSED => S::FORBIDDEN, // 403
647        // 500: Internal server error.
648        R::MISC | R::NOTDIRECTORY => S::INTERNAL_SERVER_ERROR,
649
650        // 502: Bad Gateway.
651        R::DESTROY | R::DONE | R::HIBERNATING | R::INTERNAL | R::RESOURCELIMIT | R::TORPROTOCOL => {
652            S::BAD_GATEWAY
653        }
654        // 503: Service unavailable
655        R::CONNRESET | R::EXITPOLICY | R::NOROUTE | R::RESOLVEFAILED => S::SERVICE_UNAVAILABLE,
656
657        // 504: Gateway timeout.
658        R::TIMEOUT => S::GATEWAY_TIMEOUT,
659
660        // This is possible if the other side sent an unrecognized error code.
661        _ => S::INTERNAL_SERVER_ERROR, // 500
662    }
663}
664
665/// Recover the original stream from a [`hyper::upgrade::Upgraded`].
666fn deconstruct_upgrade<S>(upgraded: hyper::upgrade::Upgraded) -> Result<BufReader<S>, anyhow::Error>
667where
668    S: AsyncRead + AsyncWrite + Unpin + 'static,
669{
670    let parts: hyper::upgrade::Parts<FuturesIoCompat<BufReader<S>>> = upgraded
671        .downcast()
672        .map_err(|_| anyhow!("downcast failed!"))?;
673    let hyper::upgrade::Parts { io, read_buf, .. } = parts;
674    if !read_buf.is_empty() {
675        // TODO Figure out whether this can happen, due to possible race conditions if the client
676        // gets the OK before we check this?.
677        return Err(anyhow!(
678            "Extraneous data on hyper buffer after upgrade to proxy mode"
679        ));
680    }
681    let io: BufReader<S> = io.0;
682    Ok(io)
683}
684
685/// Recover the application stream from `request`, and launch tasks to transfer data between the application and
686/// the `tor_stream`.
687async fn transfer<S>(request: Request, tor_stream: arti_client::DataStream) -> anyhow::Result<()>
688where
689    S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
690{
691    let upgraded = hyper::upgrade::on(request)
692        .await
693        .context("Unable to upgrade connection")?;
694    let app_stream: BufReader<S> = deconstruct_upgrade(upgraded)?;
695    let tor_stream = BufReader::with_capacity(super::APP_STREAM_BUF_LEN, tor_stream);
696
697    // Finally. relay traffic between
698    // the application stream and the tor stream, forever.
699    let _ = futures_copy::copy_buf_bidirectional(
700        app_stream,
701        tor_stream,
702        futures_copy::eof::Close,
703        futures_copy::eof::Close,
704    )
705    .await?;
706
707    Ok(())
708}
709
/// Return true if `host`, the value of a Host header, addresses localhost.
///
/// Accepted forms are a loopback socket address (`127.0.0.1:80`, `[::1]:80`),
/// a bare loopback IP (`127.0.0.1`, `::1`), and `localhost`, optionally with a
/// nonzero port. The name is matched case-insensitively.
fn host_is_localhost(host: &str) -> bool {
    if let Ok(sock) = host.parse::<std::net::SocketAddr>() {
        return sock.ip().is_loopback();
    }
    if let Ok(ip) = host.parse::<std::net::IpAddr>() {
        return ip.is_loopback();
    }
    match host.split_once(':') {
        Some((name, port)) => {
            name.eq_ignore_ascii_case("localhost")
                && port.parse::<std::num::NonZeroU16>().is_ok()
        }
        None => host.eq_ignore_ascii_case("localhost"),
    }
}
722
723/// Helper module: Make `futures` types usable by `hyper`.
724//
725// TODO: We may want to expose this as a separate crate, or move it into tor-async-utils,
726// if we turn out to need it elsewhere.
727mod hyper_futures_io {
728    use pin_project::pin_project;
729    use std::{
730        io,
731        pin::Pin,
732        task::{Context, Poll, ready},
733    };
734
735    use hyper::rt::ReadBufCursor;
736
737    /// A wrapper around an AsyncBufRead + AsyncWrite to implement traits required by hyper.
738    #[derive(Debug)]
739    #[pin_project]
740    pub(super) struct FuturesIoCompat<T>(#[pin] pub(super) T);
741
742    impl<T> hyper::rt::Read for FuturesIoCompat<T>
743    where
744        // We require AsyncBufRead here it is a good match for ReadBufCursor::put_slice.
745        T: futures::io::AsyncBufRead,
746    {
747        fn poll_read(
748            self: Pin<&mut Self>,
749            cx: &mut Context<'_>,
750            mut buf: ReadBufCursor<'_>,
751        ) -> Poll<Result<(), io::Error>> {
752            let mut this = self.project();
753
754            let available: &[u8] = ready!(this.0.as_mut().poll_fill_buf(cx))?;
755            let n_available = available.len();
756
757            if !available.is_empty() {
758                buf.put_slice(available);
759                this.0.consume(n_available);
760            }
761
762            // This means either "data arrived" or "EOF" depending on whether we added new bytes.
763            Poll::Ready(Ok(()))
764        }
765    }
766
767    impl<T> hyper::rt::Write for FuturesIoCompat<T>
768    where
769        T: futures::io::AsyncWrite,
770    {
771        fn poll_write(
772            self: Pin<&mut Self>,
773            cx: &mut Context<'_>,
774            buf: &[u8],
775        ) -> Poll<Result<usize, std::io::Error>> {
776            self.project().0.poll_write(cx, buf)
777        }
778
779        fn poll_flush(
780            self: Pin<&mut Self>,
781            cx: &mut Context<'_>,
782        ) -> Poll<Result<(), std::io::Error>> {
783            self.project().0.poll_flush(cx)
784        }
785
786        fn poll_shutdown(
787            self: Pin<&mut Self>,
788            cx: &mut Context<'_>,
789        ) -> Poll<Result<(), std::io::Error>> {
790            self.project().0.poll_close(cx)
791        }
792    }
793}
794
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use arti_client::{BootstrapBehavior, TorClient, config::TorClientConfigBuilder};
    use futures::{AsyncReadExt as _, AsyncWriteExt as _};
    use tor_rtmock::{MockRuntime, io::stream_pair};

    use super::*;

    /// Make sure that `HeaderMap` lookups are case-insensitive, as its documentation implies.
    #[test]
    fn headermap_casei() {
        use http::header::{HeaderMap, HeaderValue};
        let mut hm = HeaderMap::new();
        hm.append(
            "my-head-is-a-house-for",
            HeaderValue::from_str("a-secret").unwrap(),
        );
        assert_eq!(
            hm.get("My-Head-Is-A-House-For").unwrap().as_bytes(),
            b"a-secret"
        );
        assert_eq!(
            hm.get("MY-HEAD-IS-A-HOUSE-FOR").unwrap().as_bytes(),
            b"a-secret"
        );
    }

    /// Check which `Host` header values `host_is_localhost` accepts and rejects.
    ///
    /// Each value is checked individually so that a failure names the offending host.
    #[test]
    fn host_header_localhost() {
        let accepted = [
            "localhost",
            "localhost:9999",
            "localHOSt:9999",
            "127.0.0.1:9999",
            "[::1]:9999",
            "127.1.2.3:1234",
            "127.0.0.1",
            "::1",
        ];
        for host in accepted {
            assert!(host_is_localhost(host), "{host:?} should be treated as localhost");
        }

        let rejected = [
            "[::1]", // not in the right format!
            "www.torproject.org",
            "www.torproject.org:1234",
            "localhost:0",
            "localhost:999999",
            "plocalhost:1234",
            "[::0]:1234",
            "192.0.2.55:1234",
            "3fff::1",
            "[3fff::1]:1234",
        ];
        for host in rejected {
            assert!(
                !host_is_localhost(host),
                "{host:?} should not be treated as localhost"
            );
        }
    }

    /// Spawn `handle_http_conn` on one end of an in-memory stream pair.
    ///
    /// The handler runs against an unbootstrapped `TorClient` whose state and
    /// cache live in a fresh temporary directory.
    ///
    /// Returns, in order:
    /// - the client end of the stream, for the test to write requests to;
    /// - a future yielding the spawned handler's result;
    /// - the temporary directory, which the caller must keep alive (bind it to a
    ///   named variable, not `_`) for as long as the client is in use.
    ///
    /// # Errors
    ///
    /// Fails if the temporary directory cannot be created, the client
    /// configuration cannot be built, or the client cannot be created.
    fn interactive_test_setup(
        rt: &MockRuntime,
    ) -> anyhow::Result<(
        tor_rtmock::io::LocalStream,
        impl Future<Output = anyhow::Result<()>>,
        tempfile::TempDir,
    )> {
        let (server_end, client_end) = stream_pair();
        let server_end: BufReader<_> = BufReader::new(server_end);

        // Constant literal: parsing cannot fail.
        let iso: ListenerIsolation = (7, "127.0.0.1".parse().unwrap());
        let dir = tempfile::TempDir::new()?;
        let cfg = TorClientConfigBuilder::from_directories(
            dir.as_ref().join("state"),
            dir.as_ref().join("cache"),
        )
        .build()?;
        let tor_client = TorClient::with_runtime(rt.clone())
            .config(cfg)
            .bootstrap_behavior(BootstrapBehavior::Manual)
            .create_unbootstrapped()?;
        let context: ProxyContext<_> = ProxyContext {
            tor_client,
            #[cfg(feature = "rpc")]
            rpc_mgr: None,
            protocols: crate::proxy::ListenProtocols::SocksAndHttpConnect,
        };
        let handle = rt.spawn_join(
            "HTTP Handler",
            handle_http_conn(context, server_end, iso),
        );
        Ok((client_end, handle, dir))
    }

    /// Try an OPTIONS request and make sure we get a plausible-looking answer.
    #[test]
    fn successful_options_test() -> anyhow::Result<()> {
        // (This test is mostly here to make sure that invalid_host_test() isn't failing because
        // of anything besides the Host header.)
        MockRuntime::try_test_with_various(async |rt| -> anyhow::Result<()> {
            let (mut s, join, _dir) = interactive_test_setup(&rt)?;

            s.write_all(b"OPTIONS * HTTP/1.0\r\nHost: localhost\r\n\r\n")
                .await?;
            let mut buf = Vec::new();
            let _n_read = s.read_to_end(&mut buf).await?;
            let () = join.await?;

            let reply = std::str::from_utf8(&buf)?;
            assert!(
                reply.starts_with("HTTP/1.0 200 OK\r\n"),
                "unexpected reply: {reply:?}"
            );

            Ok(())
        })
    }

    /// Try a hostname that looks like a CSRF attempt and make sure that we discard it without
    /// any reply.
    #[test]
    fn invalid_host_test() -> anyhow::Result<()> {
        MockRuntime::try_test_with_various(async |rt| -> anyhow::Result<()> {
            let (mut s, join, _dir) = interactive_test_setup(&rt)?;

            s.write_all(b"OPTIONS * HTTP/1.0\r\nHost: csrf.example.com\r\n\r\n")
                .await?;
            let mut buf = Vec::new();
            let n_read = s.read_to_end(&mut buf).await?;
            let http_outcome = join.await;

            // The connection must be closed without a single byte of reply...
            assert_eq!(n_read, 0);
            assert!(buf.is_empty());

            // ...and the handler must fail, citing the rejected Host header.
            let err = http_outcome.expect_err("handler accepted a non-localhost Host header");
            let error_msg = err.source().unwrap().to_string();
            assert_eq!(
                error_msg,
                r#"Host header "csrf.example.com" was not localhost. Rejecting request."#
            );

            Ok(())
        })
    }
}