Skip to main content

arti_rpcserver/
stream.rs

1//! Objects that can become or wrap a [`arti_client::DataStream`].
2
3use arti_client::rpc::{
4    ClientConnectionResult, ConnectWithPrefs, ResolvePtrWithPrefs, ResolveWithPrefs,
5};
6use derive_deftly::Deftly;
7use std::{
8    net::IpAddr,
9    sync::{Arc, Mutex},
10};
11use tor_error::into_internal;
12use tor_proto::client::stream::ClientDataStreamCtrl;
13use tor_rpcbase::{self as rpc, templates::*};
14
15use crate::RpcSession;
16
17/// An RPC object representing a single-use client that captures a data-stream.
18///
19/// This object is returned by the `arti:new_oneshot_client` method, and starts out with
20/// enough information to know how to create a `DataStream`, or to respond
21/// to some other SOCKS request.
22/// When this object is the target of a SOCKS request,
23/// it takes its target address, port, and isolation parameters from the SOCKS handshake,
24/// and launches a data stream.
25/// It then becomes interchangeable with the stream that was launched.
26///
27/// This object is single-use: once a SOCKS request has referred to it,
28/// it cannot be used for any other SOCKS request.
29/// (Otherwise, it could not be usable interchangeably with the `DataStream` it creates.)
30///
31/// The ObjectID for this object can be used as the target of a SOCKS request.
32#[derive(Deftly)]
33#[derive_deftly(Object)]
34#[deftly(rpc(
35    delegate_with = "|this: &Self| this.get_ctrl()",
36    delegate_type = "tor_proto::client::stream::ClientDataStreamCtrl",
37    expose_outside_of_session
38))]
39pub(crate) struct OneshotClient {
40    /// The inner state of this object.
41    inner: Mutex<Inner>,
42}
43
44/// The inner state of an `OneshotClient`.
45///
46/// A stream is created in the "Unused" state.
47enum Inner {
48    /// Newly constructed: Waiting for a SOCKS command.
49    ///
50    /// This is the initial state for every OneshotClient.
51    ///
52    /// It may become `Launching` or `UsedToResolve`.
53    Unused(Arc<dyn rpc::Object>),
54
55    /// The actual connection is being made, ie we are within `connect_with_prefs`
56    ///
57    /// If the state is `Launching`, no one except `connect_with_prefs` may change it.
58    ///
59    /// From this state, a stream may become `Stream`, or `StreamFailed`.
60    Launching,
61
62    /// Stream constructed; may or may not be connected.
63    ///
64    /// A stream does not exit this state.  Even if the stream is closed or fails,
65    /// its `ClientDataStreamCtrl` remains until it is dropped.
66    Stream(Arc<ClientDataStreamCtrl>),
67
68    /// Stream was used for a resolve or resolve_ptr request; there is no underlying stream.
69    ///
70    /// A stream does not exit this state, even if resolve request fails.
71    //
72    // TODO RPC: We may want to make this state hold more information if someday we
73    // make DNS requests into objects that we can inspect while they are running.
74    UsedToResolve,
75
76    /// Failed to construct the tor_proto::DataStream object.
77    ///
78    /// A stream does not exit this state.
79    StreamFailed,
80}
81
82/// Error returned by an operations from OneshotClient.
83#[derive(Debug, Clone, thiserror::Error)]
84enum OneshotClientError {
85    /// Application tried to open a stream using a OneshotClient,
86    /// but that OneshotClient had already been used previously.
87    #[error("Data stream object already used")]
88    AlreadyUsed,
89}
90
91impl tor_error::HasKind for OneshotClientError {
92    fn kind(&self) -> tor_error::ErrorKind {
93        use OneshotClientError as E;
94        use tor_error::ErrorKind as EK;
95        match self {
96            E::AlreadyUsed => EK::BadApiUsage, // TODO RPC: is this the correct ErrorKind?
97        }
98    }
99}
100
101impl OneshotClient {
102    /// Construct a new unused OneshotClient that will make its connection
103    /// with `connector`.
104    ///
105    /// The `connector` object should implement at least one of ConnectWithPrefs, ResolveWithPrefs,
106    /// or ResolvePtrWithPrefs, or else it won't actually be useful for anything.
107    pub(crate) fn new(connector: Arc<dyn rpc::Object>) -> Self {
108        Self {
109            inner: Mutex::new(Inner::Unused(connector)),
110        }
111    }
112
113    /// If this `OneshotClient` is in state Unused, replace its state with `new_state`
114    /// and return the ClientConnectionTarget.  Otherwise, leave its state unchanged
115    /// and return an error.
116    fn take_connector(&self, new_state: Inner) -> Result<Arc<dyn rpc::Object>, OneshotClientError> {
117        let mut inner = self.inner.lock().expect("poisoned lock");
118        let val = std::mem::replace(&mut *inner, new_state);
119        if let Inner::Unused(conn) = val {
120            Ok(conn)
121        } else {
122            *inner = val;
123            Err(OneshotClientError::AlreadyUsed)
124        }
125    }
126
127    /// Return the `ClientDataStreamCtrl` for this stream, if it has one.
128    fn get_ctrl(&self) -> Option<Arc<ClientDataStreamCtrl>> {
129        let inner = self.inner.lock().expect("poisoned lock");
130        if let Inner::Stream(s) = &*inner {
131            Some(s.clone())
132        } else {
133            None
134        }
135    }
136}
137
138/// Invoke ConnectWithPrefs on an OneshotClient.
139///
140/// Unlike the other methods on OneshotClient, this one is somewhat complex, since it must
141/// re-register the resulting datastream once it has one.
142async fn oneshot_client_connect_with_prefs(
143    rpc_data_stream: Arc<OneshotClient>,
144    mut method: Box<ConnectWithPrefs>,
145    ctx: Arc<dyn rpc::Context>,
146) -> ClientConnectionResult<arti_client::DataStream> {
147    // Extract the connector.
148    //
149    // As we do this, we put this OneshotClient into a Launching state.
150    //
151    // (`Launching`` wouldn't need to exist if we `connect_with_prefs` were synchronous,
152    // but it isn't synchronous, so `Launching` is an observable state.)
153    let connector = rpc_data_stream
154        .take_connector(Inner::Launching)
155        .map_err(|e| Box::new(e) as _)?;
156
157    // Internally, we're going to tell tor-proto to make an optimistic stream.
158    // The only effect here is that the DataStream will be returned immediately by
159    // our invoke_special_method call, which would otherwise call `wait_for_connection`
160    // if the stream was _not_ originally optimistic.
161    //
162    // We use `was_optimistic` to remember whether the prefs was _originally_
163    // configured to give an optimistic stream,
164    // so that we know whether _we_ should do the `wait_for_connection``.
165    //
166    // From the POV of the SOCKS proxy code that is calling this function,
167    // it will still receive the requested optimistic or non-optimistic behavior,
168    // since the `wait_for_connection` call will still happen (or not happen)
169    // as requested, causing _this_ function to possibly wait.
170    //
171    // The only observable impact here is that this object
172    // will immediately transition to its new state,
173    // so that other RPC calls will see a `DataStreamCtrl` object.
174    let was_optimistic = method.prefs.is_optimistic();
175    method.prefs.optimistic();
176
177    // Now, launch the connection.  Since we marked it as optimistic,
178    // this call should return almost immediately.
179    let stream: Result<arti_client::DataStream, _> =
180        *rpc::invoke_special_method(ctx, connector, method)
181            .await
182            .map_err(|e| Box::new(into_internal!("unable to delegate to connector")(e)) as _)?;
183
184    // Pick the new state for this object, and install it.
185    let new_obj = match &stream {
186        Ok(s) => Inner::Stream(
187            s.client_stream_ctrl()
188                .expect("Created a client stream with no ClientDataStreamCtrl!?")
189                .clone(),
190        ),
191        Err(_) => Inner::StreamFailed, // TODO RPC: Remember some error information here.
192    };
193    {
194        let mut inner = rpc_data_stream.inner.lock().expect("poisoned lock");
195        *inner = new_obj;
196    }
197    // Return early on failure.
198    let mut stream = stream?;
199
200    if !was_optimistic {
201        // Implement non-optimistic behavior, if that is what was originally configured.
202        stream
203            .wait_for_connection()
204            .await
205            .map_err(|e| Box::new(e) as _)?;
206    }
207
208    // Return the stream; the SOCKS layer will take it from here.
209    Ok(stream)
210}
211
212/// Invoke ResolveWithPrefs on an OneshotClient
213async fn oneshot_client_resolve_with_prefs(
214    rpc_data_stream: Arc<OneshotClient>,
215    method: Box<ResolveWithPrefs>,
216    ctx: Arc<dyn rpc::Context>,
217) -> ClientConnectionResult<Vec<IpAddr>> {
218    let connector = rpc_data_stream
219        .take_connector(Inner::UsedToResolve)
220        .map_err(|e| Box::new(e) as _)?;
221
222    let result = rpc::invoke_special_method(ctx, connector, method)
223        .await
224        .map_err(|e| Box::new(into_internal!("unable to delegate to connector")(e)) as _)?;
225
226    *result
227}
228
229/// Invoke ResolvePtrWithPrefs on an OneshotClient
230async fn oneshot_client_resolve_ptr_with_prefs(
231    rpc_data_stream: Arc<OneshotClient>,
232    method: Box<ResolvePtrWithPrefs>,
233    ctx: Arc<dyn rpc::Context>,
234) -> ClientConnectionResult<Vec<String>> {
235    let connector = rpc_data_stream
236        .take_connector(Inner::UsedToResolve)
237        .map_err(|e| Box::new(e) as _)?;
238
239    let result = rpc::invoke_special_method(ctx, connector, method)
240        .await
241        .map_err(|e| Box::new(into_internal!("unable to delegate to connector")(e)) as _)?;
242
243    *result
244}
245
246/// Create a new `RpcOneshotClient` to wait for a SOCKS request.
247///
248/// The resulting ObjectID will be a handle to an `RpcOneshotClient`.
249/// It can be used as the target of a single SOCKS request.
250///
251/// Once used for a SOCKS connect request,
252/// the object will become a handle for the underlying DataStream
253/// that was created with the request.
254#[derive(Debug, serde::Deserialize, serde::Serialize, Deftly)]
255#[derive_deftly(DynMethod)]
256#[deftly(rpc(method_name = "arti:new_oneshot_client"))]
257pub(crate) struct NewOneshotClient {}
258
259impl rpc::RpcMethod for NewOneshotClient {
260    type Output = rpc::SingleIdResponse;
261    type Update = rpc::NoUpdates; // TODO actually, updates are quite suitable here.
262}
263
264/// Helper: construct and register an OneshotClient.
265fn new_oneshot_client_impl(
266    connector: Arc<dyn rpc::Object>,
267    ctx: &dyn rpc::Context,
268) -> rpc::ObjectId {
269    let rpc_stream = Arc::new(OneshotClient::new(connector));
270    ctx.register_owned(rpc_stream as _)
271}
272
273/// Implement NewOneshotClient for clients.
274pub(crate) async fn new_oneshot_client_on_client<R: tor_rtcompat::Runtime>(
275    client: Arc<arti_client::TorClient<R>>,
276    _method: Box<NewOneshotClient>,
277    ctx: Arc<dyn rpc::Context>,
278) -> Result<rpc::SingleIdResponse, rpc::RpcError> {
279    Ok(new_oneshot_client_impl(client, ctx.as_ref()).into())
280}
281
282/// Implement NewOneshotClient for RpcSession.
283async fn new_oneshot_client_on_session(
284    session: Arc<RpcSession>,
285    _method: Box<NewOneshotClient>,
286    ctx: Arc<dyn rpc::Context>,
287) -> Result<rpc::SingleIdResponse, rpc::RpcError> {
288    Ok(new_oneshot_client_impl(session, ctx.as_ref()).into())
289}
290rpc::static_rpc_invoke_fn! {
291    new_oneshot_client_on_session;
292    @special oneshot_client_connect_with_prefs;
293    @special oneshot_client_resolve_with_prefs;
294    @special oneshot_client_resolve_ptr_with_prefs;
295}