arti_rpcserver/stream.rs
1//! Objects that can become or wrap a [`arti_client::DataStream`].
2
3use arti_client::rpc::{
4 ClientConnectionResult, ConnectWithPrefs, ResolvePtrWithPrefs, ResolveWithPrefs,
5};
6use derive_deftly::Deftly;
7use std::{
8 net::IpAddr,
9 sync::{Arc, Mutex},
10};
11use tor_error::into_internal;
12use tor_proto::client::stream::ClientDataStreamCtrl;
13use tor_rpcbase::{self as rpc, templates::*};
14
15use crate::RpcSession;
16
/// An RPC object representing a single-use client that captures a data-stream.
///
/// This object is returned by the `arti:new_oneshot_client` method, and starts out with
/// enough information to know how to create a `DataStream`, or to respond
/// to some other SOCKS request.
/// When this object is the target of a SOCKS request,
/// it takes its target address, port, and isolation parameters from the SOCKS handshake,
/// and launches a data stream.
/// It then becomes interchangeable with the stream that was launched.
///
/// This object is single-use: once a SOCKS request has referred to it,
/// it cannot be used for any other SOCKS request.
/// (Otherwise, it could not be usable interchangeably with the `DataStream` it creates.)
///
/// The ObjectID for this object can be used as the target of a SOCKS request.
///
/// The `delegate_with` closure below calls `get_ctrl`, which yields a
/// `ClientDataStreamCtrl` only once this object has reached the `Stream` state;
/// in every other state there is nothing to delegate to.
#[derive(Deftly)]
#[derive_deftly(Object)]
#[deftly(rpc(
    delegate_with = "|this: &Self| this.get_ctrl()",
    delegate_type = "tor_proto::client::stream::ClientDataStreamCtrl",
    expose_outside_of_session
))]
pub(crate) struct OneshotClient {
    /// The inner state of this object.
    ///
    /// Guarded by a mutex so that the state can be inspected and replaced through `&self`.
    inner: Mutex<Inner>,
}
43
/// The inner state of an `OneshotClient`.
///
/// Every `OneshotClient` is created in the `Unused` state.
enum Inner {
    /// Newly constructed: Waiting for a SOCKS command.
    ///
    /// This is the initial state for every OneshotClient.
    /// It holds the connector object that the eventual request will be delegated to.
    ///
    /// It may become `Launching` or `UsedToResolve`.
    Unused(Arc<dyn rpc::Object>),

    /// The actual connection is being made, i.e. we are within `connect_with_prefs`.
    ///
    /// If the state is `Launching`, no one except `connect_with_prefs` may change it.
    ///
    /// From this state, a stream may become `Stream`, or `StreamFailed`.
    Launching,

    /// Stream constructed; may or may not be connected.
    ///
    /// A stream does not exit this state. Even if the stream is closed or fails,
    /// its `ClientDataStreamCtrl` remains until it is dropped.
    Stream(Arc<ClientDataStreamCtrl>),

    /// Stream was used for a resolve or resolve_ptr request; there is no underlying stream.
    ///
    /// A stream does not exit this state, even if the resolve request fails.
    //
    // TODO RPC: We may want to make this state hold more information if someday we
    // make DNS requests into objects that we can inspect while they are running.
    UsedToResolve,

    /// Failed to construct the tor_proto::DataStream object.
    ///
    /// A stream does not exit this state.
    StreamFailed,
}
81
/// Error returned by an operation on a `OneshotClient`.
#[derive(Debug, Clone, thiserror::Error)]
enum OneshotClientError {
    /// Application tried to use a `OneshotClient` (to open a stream or to resolve),
    /// but that `OneshotClient` had already been used previously.
    #[error("Data stream object already used")]
    AlreadyUsed,
}
90
91impl tor_error::HasKind for OneshotClientError {
92 fn kind(&self) -> tor_error::ErrorKind {
93 use OneshotClientError as E;
94 use tor_error::ErrorKind as EK;
95 match self {
96 E::AlreadyUsed => EK::BadApiUsage, // TODO RPC: is this the correct ErrorKind?
97 }
98 }
99}
100
101impl OneshotClient {
102 /// Construct a new unused OneshotClient that will make its connection
103 /// with `connector`.
104 ///
105 /// The `connector` object should implement at least one of ConnectWithPrefs, ResolveWithPrefs,
106 /// or ResolvePtrWithPrefs, or else it won't actually be useful for anything.
107 pub(crate) fn new(connector: Arc<dyn rpc::Object>) -> Self {
108 Self {
109 inner: Mutex::new(Inner::Unused(connector)),
110 }
111 }
112
113 /// If this `OneshotClient` is in state Unused, replace its state with `new_state`
114 /// and return the ClientConnectionTarget. Otherwise, leave its state unchanged
115 /// and return an error.
116 fn take_connector(&self, new_state: Inner) -> Result<Arc<dyn rpc::Object>, OneshotClientError> {
117 let mut inner = self.inner.lock().expect("poisoned lock");
118 let val = std::mem::replace(&mut *inner, new_state);
119 if let Inner::Unused(conn) = val {
120 Ok(conn)
121 } else {
122 *inner = val;
123 Err(OneshotClientError::AlreadyUsed)
124 }
125 }
126
127 /// Return the `ClientDataStreamCtrl` for this stream, if it has one.
128 fn get_ctrl(&self) -> Option<Arc<ClientDataStreamCtrl>> {
129 let inner = self.inner.lock().expect("poisoned lock");
130 if let Inner::Stream(s) = &*inner {
131 Some(s.clone())
132 } else {
133 None
134 }
135 }
136}
137
138/// Invoke ConnectWithPrefs on an OneshotClient.
139///
140/// Unlike the other methods on OneshotClient, this one is somewhat complex, since it must
141/// re-register the resulting datastream once it has one.
142async fn oneshot_client_connect_with_prefs(
143 rpc_data_stream: Arc<OneshotClient>,
144 mut method: Box<ConnectWithPrefs>,
145 ctx: Arc<dyn rpc::Context>,
146) -> ClientConnectionResult<arti_client::DataStream> {
147 // Extract the connector.
148 //
149 // As we do this, we put this OneshotClient into a Launching state.
150 //
151 // (`Launching`` wouldn't need to exist if we `connect_with_prefs` were synchronous,
152 // but it isn't synchronous, so `Launching` is an observable state.)
153 let connector = rpc_data_stream
154 .take_connector(Inner::Launching)
155 .map_err(|e| Box::new(e) as _)?;
156
157 // Internally, we're going to tell tor-proto to make an optimistic stream.
158 // The only effect here is that the DataStream will be returned immediately by
159 // our invoke_special_method call, which would otherwise call `wait_for_connection`
160 // if the stream was _not_ originally optimistic.
161 //
162 // We use `was_optimistic` to remember whether the prefs was _originally_
163 // configured to give an optimistic stream,
164 // so that we know whether _we_ should do the `wait_for_connection``.
165 //
166 // From the POV of the SOCKS proxy code that is calling this function,
167 // it will still receive the requested optimistic or non-optimistic behavior,
168 // since the `wait_for_connection` call will still happen (or not happen)
169 // as requested, causing _this_ function to possibly wait.
170 //
171 // The only observable impact here is that this object
172 // will immediately transition to its new state,
173 // so that other RPC calls will see a `DataStreamCtrl` object.
174 let was_optimistic = method.prefs.is_optimistic();
175 method.prefs.optimistic();
176
177 // Now, launch the connection. Since we marked it as optimistic,
178 // this call should return almost immediately.
179 let stream: Result<arti_client::DataStream, _> =
180 *rpc::invoke_special_method(ctx, connector, method)
181 .await
182 .map_err(|e| Box::new(into_internal!("unable to delegate to connector")(e)) as _)?;
183
184 // Pick the new state for this object, and install it.
185 let new_obj = match &stream {
186 Ok(s) => Inner::Stream(
187 s.client_stream_ctrl()
188 .expect("Created a client stream with no ClientDataStreamCtrl!?")
189 .clone(),
190 ),
191 Err(_) => Inner::StreamFailed, // TODO RPC: Remember some error information here.
192 };
193 {
194 let mut inner = rpc_data_stream.inner.lock().expect("poisoned lock");
195 *inner = new_obj;
196 }
197 // Return early on failure.
198 let mut stream = stream?;
199
200 if !was_optimistic {
201 // Implement non-optimistic behavior, if that is what was originally configured.
202 stream
203 .wait_for_connection()
204 .await
205 .map_err(|e| Box::new(e) as _)?;
206 }
207
208 // Return the stream; the SOCKS layer will take it from here.
209 Ok(stream)
210}
211
212/// Invoke ResolveWithPrefs on an OneshotClient
213async fn oneshot_client_resolve_with_prefs(
214 rpc_data_stream: Arc<OneshotClient>,
215 method: Box<ResolveWithPrefs>,
216 ctx: Arc<dyn rpc::Context>,
217) -> ClientConnectionResult<Vec<IpAddr>> {
218 let connector = rpc_data_stream
219 .take_connector(Inner::UsedToResolve)
220 .map_err(|e| Box::new(e) as _)?;
221
222 let result = rpc::invoke_special_method(ctx, connector, method)
223 .await
224 .map_err(|e| Box::new(into_internal!("unable to delegate to connector")(e)) as _)?;
225
226 *result
227}
228
229/// Invoke ResolvePtrWithPrefs on an OneshotClient
230async fn oneshot_client_resolve_ptr_with_prefs(
231 rpc_data_stream: Arc<OneshotClient>,
232 method: Box<ResolvePtrWithPrefs>,
233 ctx: Arc<dyn rpc::Context>,
234) -> ClientConnectionResult<Vec<String>> {
235 let connector = rpc_data_stream
236 .take_connector(Inner::UsedToResolve)
237 .map_err(|e| Box::new(e) as _)?;
238
239 let result = rpc::invoke_special_method(ctx, connector, method)
240 .await
241 .map_err(|e| Box::new(into_internal!("unable to delegate to connector")(e)) as _)?;
242
243 *result
244}
245
/// Create a new `OneshotClient` to wait for a SOCKS request.
///
/// The resulting ObjectID will be a handle to an `OneshotClient`.
/// It can be used as the target of a single SOCKS request.
///
/// Once used for a SOCKS connect request,
/// the object will become a handle for the underlying DataStream
/// that was created with the request.
///
/// This method takes no parameters.
#[derive(Debug, serde::Deserialize, serde::Serialize, Deftly)]
#[derive_deftly(DynMethod)]
#[deftly(rpc(method_name = "arti:new_oneshot_client"))]
pub(crate) struct NewOneshotClient {}
258
// Declare the reply and update types for the `arti:new_oneshot_client` method.
impl rpc::RpcMethod for NewOneshotClient {
    // The reply carries the ObjectId of the newly registered `OneshotClient`.
    type Output = rpc::SingleIdResponse;
    // No incremental updates are sent for this method at present.
    type Update = rpc::NoUpdates; // TODO actually, updates are quite suitable here.
}
263
264/// Helper: construct and register an OneshotClient.
265fn new_oneshot_client_impl(
266 connector: Arc<dyn rpc::Object>,
267 ctx: &dyn rpc::Context,
268) -> rpc::ObjectId {
269 let rpc_stream = Arc::new(OneshotClient::new(connector));
270 ctx.register_owned(rpc_stream as _)
271}
272
273/// Implement NewOneshotClient for clients.
274pub(crate) async fn new_oneshot_client_on_client<R: tor_rtcompat::Runtime>(
275 client: Arc<arti_client::TorClient<R>>,
276 _method: Box<NewOneshotClient>,
277 ctx: Arc<dyn rpc::Context>,
278) -> Result<rpc::SingleIdResponse, rpc::RpcError> {
279 Ok(new_oneshot_client_impl(client, ctx.as_ref()).into())
280}
281
282/// Implement NewOneshotClient for RpcSession.
283async fn new_oneshot_client_on_session(
284 session: Arc<RpcSession>,
285 _method: Box<NewOneshotClient>,
286 ctx: Arc<dyn rpc::Context>,
287) -> Result<rpc::SingleIdResponse, rpc::RpcError> {
288 Ok(new_oneshot_client_impl(session, ctx.as_ref()).into())
289}
// Register the non-generic invocation functions defined in this file.
//
// NOTE(review): the `@special` entries are presumably the ones reached through
// `rpc::invoke_special_method` rather than by RPC method name — confirm against
// the `tor_rpcbase` documentation.
// NOTE(review): `new_oneshot_client_on_client` is generic over the runtime and is
// not listed here; presumably it is registered elsewhere — confirm.
rpc::static_rpc_invoke_fn! {
    new_oneshot_client_on_session;
    @special oneshot_client_connect_with_prefs;
    @special oneshot_client_resolve_with_prefs;
    @special oneshot_client_resolve_ptr_with_prefs;
}