1use crate::config::{ManagedTransportOptions, TransportOptions};
4use crate::err;
5use crate::err::PtError;
6use crate::ipc::{
7 PluggableClientTransport, PluggableTransport, PtClientParameters, PtCommonParameters,
8 sealed::PluggableTransportPrivate,
9};
10use crate::{PtClientMethod, PtSharedState};
11use futures::channel::mpsc::UnboundedReceiver;
12use futures::stream::FuturesUnordered;
13use futures::{FutureExt, StreamExt, select};
14use oneshot_fused_workaround as oneshot;
15use std::collections::{HashMap, HashSet};
16use std::future::Future;
17use std::path::{Path, PathBuf};
18use std::pin::Pin;
19use std::sync::{Arc, RwLock};
20use tor_config_path::CfgPathResolver;
21use tor_error::internal;
22use tor_linkspec::PtTransportName;
23use tor_rtcompat::Runtime;
24use tracing::{debug, warn};
25
26pub(crate) enum PtReactorMessage {
28 Reconfigured,
30 Spawn {
32 pt: PtTransportName,
34 result: oneshot::Sender<err::Result<PtClientMethod>>,
36 },
37}
38
39type SpawnResult = (Vec<PtTransportName>, err::Result<PluggableClientTransport>);
41
42pub(crate) struct PtReactor<R> {
44 rt: R,
46 running: Vec<PluggableClientTransport>,
48 requests: HashMap<PtTransportName, Vec<oneshot::Sender<err::Result<PtClientMethod>>>>,
53 spawning: FuturesUnordered<Pin<Box<dyn Future<Output = SpawnResult> + Send>>>,
57 state: Arc<RwLock<PtSharedState>>,
59 rx: UnboundedReceiver<PtReactorMessage>,
63 state_dir: PathBuf,
65 path_resolver: Arc<CfgPathResolver>,
67}
68
69impl<R: Runtime> PtReactor<R> {
70 pub(crate) fn new(
72 rt: R,
73 state: Arc<RwLock<PtSharedState>>,
74 rx: UnboundedReceiver<PtReactorMessage>,
75 state_dir: PathBuf,
76 path_resolver: Arc<CfgPathResolver>,
77 ) -> Self {
78 let spawning = FuturesUnordered::new();
79 spawning.push(Box::pin(futures::future::pending::<SpawnResult>())
80 as Pin<Box<dyn Future<Output = _> + Send>>);
81 Self {
82 rt,
83 running: vec![],
84 requests: Default::default(),
85 spawning,
86 state,
87 rx,
88 state_dir,
89 path_resolver,
90 }
91 }
92
93 #[allow(clippy::needless_pass_by_value)]
95 fn handle_spawned(
96 &mut self,
97 covers: Vec<PtTransportName>,
98 result: err::Result<PluggableClientTransport>,
99 ) {
100 match result {
101 Err(e) => {
102 warn!("Spawning PT for {:?} failed: {}", covers, e);
103 let senders = covers
105 .iter()
106 .flat_map(|x| self.requests.remove(x))
107 .flatten();
108 for sender in senders {
109 let _ = sender.send(Err(e.clone()));
111 }
112 }
113 Ok(pt) => {
114 let mut state = self.state.write().expect("ptmgr state poisoned");
115 for (transport, method) in pt.transport_methods() {
116 state
117 .managed_cmethods
118 .insert(transport.clone(), method.clone());
119 for sender in self.requests.remove(transport).into_iter().flatten() {
120 let _ = sender.send(Ok(method.clone()));
121 }
122 }
123
124 let requested: HashSet<_> = covers.iter().collect();
125 let found: HashSet<_> = pt.transport_methods().keys().collect();
126 if requested != found {
127 warn!(
128 "Bug: PT {} succeeded, but did not give the same transports we asked for. ({:?} vs {:?})",
129 pt.identifier(),
130 found,
131 requested
132 );
133 }
134 self.running.push(pt);
135 }
136 }
137 }
138
139 fn remove_pt(&self, pt: PluggableClientTransport) {
141 let mut state = self.state.write().expect("ptmgr state poisoned");
142 for transport in pt.transport_methods().keys() {
143 state.managed_cmethods.remove(transport);
144 }
145 drop(pt);
148 }
149
150 pub(crate) async fn run_one_step(&mut self) -> err::Result<bool> {
152 use futures::future::Either;
153
154 let mut all_next_messages = self
157 .running
158 .iter_mut()
159 .map(|pt| Box::pin(pt.next_message()))
162 .collect::<Vec<_>>();
163
164 let mut next_message = if all_next_messages.is_empty() {
166 Either::Left(futures::future::pending())
167 } else {
168 Either::Right(futures::future::select_all(all_next_messages.iter_mut()).fuse())
169 };
170
171 select! {
172 (result, idx, _) = next_message => {
173 drop(all_next_messages); match result {
176 Ok(m) => {
177 debug!("PT {} message: {:?}", self.running[idx].identifier(), m);
179 },
180 Err(e) => {
181 warn!("PT {} quit: {:?}", self.running[idx].identifier(), e);
182 let pt = self.running.remove(idx);
183 self.remove_pt(pt);
184 }
185 }
186 },
187 spawn_result = self.spawning.next() => {
188 drop(all_next_messages);
189 let (covers, result) = spawn_result.expect("self.spawning should never dry up");
191 self.handle_spawned(covers, result);
192 }
193 internal = self.rx.next() => {
194 drop(all_next_messages);
195
196 match internal {
197 Some(PtReactorMessage::Reconfigured) => {},
198 Some(PtReactorMessage::Spawn { pt, result }) => {
199 if let Some(requests) = self.requests.get_mut(&pt) {
201 requests.push(result);
202 return Ok(false);
203 }
204 for rpt in self.running.iter() {
206 if let Some(cmethod) = rpt.transport_methods().get(&pt) {
207 let _ = result.send(Ok(cmethod.clone()));
208 return Ok(false);
209 }
210 }
211 let config = {
213 let state = self.state.read().expect("ptmgr state poisoned");
214 state.configured.get(&pt).cloned()
215 };
216
217 let Some(config) = config else {
218 let _ = result.send(Err(PtError::UnconfiguredTransportDueToConcurrentReconfiguration));
219 return Ok(false);
220 };
221
222 let TransportOptions::Managed(config) = config else {
223 let _ = result.send(Err(internal!("Tried to spawn an unmanaged transport").into()));
224 return Ok(false);
225 };
226
227 self.requests.entry(pt).or_default().push(result);
230 for proto in config.protocols.iter() {
231 self.requests.entry(proto.clone()).or_default();
232 }
233
234 let spawn_fut = Box::pin(
236 spawn_from_config(
237 self.rt.clone(),
238 self.state_dir.clone(),
239 config.clone(),
240 Arc::clone(&self.path_resolver)
241 )
242 .map(|result| (config.protocols, result))
243 );
244 self.spawning.push(spawn_fut);
245 },
246 None => return Ok(true)
247 }
248 }
249 }
250 Ok(false)
251 }
252}
253
254async fn spawn_from_config<R: Runtime>(
256 rt: R,
257 state_dir: PathBuf,
258 cfg: ManagedTransportOptions,
259 path_resolver: Arc<CfgPathResolver>,
260) -> Result<PluggableClientTransport, PtError> {
261 let cfg_path = cfg.path;
264
265 let binary_path = cfg_path
266 .path(&path_resolver)
267 .map_err(|e| PtError::PathExpansionFailed {
268 path: cfg_path.clone(),
269 error: e,
270 })?;
271
272 let filename = pt_identifier_as_path(&binary_path)?;
273
274 let new_state_dir = state_dir.join(filename);
277 std::fs::create_dir_all(&new_state_dir).map_err(|e| PtError::StatedirCreateFailed {
278 path: new_state_dir.clone(),
279 error: Arc::new(e),
280 })?;
281
282 let pt_common_params = PtCommonParameters::builder()
284 .state_location(new_state_dir)
285 .build()
286 .expect("PtCommonParameters constructed incorrectly");
287
288 let pt_client_params = PtClientParameters::builder()
289 .transports(cfg.protocols)
290 .build()
291 .expect("PtClientParameters constructed incorrectly");
292
293 let mut pt = PluggableClientTransport::new(
294 binary_path,
295 cfg.arguments,
296 pt_common_params,
297 pt_client_params,
298 );
299 pt.launch(rt).await?;
300 Ok(pt)
301}
302
303fn pt_identifier_as_path(binary_path: impl AsRef<Path>) -> Result<PathBuf, PtError> {
306 let mut filename =
308 PathBuf::from(
309 binary_path
310 .as_ref()
311 .file_name()
312 .ok_or_else(|| PtError::NotAFile {
313 path: binary_path.as_ref().to_path_buf(),
314 })?,
315 );
316
317 if let Some(ext) = filename.extension() {
319 if ext.eq_ignore_ascii_case(std::env::consts::EXE_EXTENSION) {
320 filename.set_extension("");
321 }
322 }
323
324 Ok(filename)
325}
326
327pub(crate) fn pt_identifier(binary_path: impl AsRef<Path>) -> Result<String, PtError> {
330 Ok(pt_identifier_as_path(binary_path)?
331 .to_string_lossy()
332 .to_string())
333}