tor_circmgr/mgr.rs
//! Abstract code to manage a set of tunnels, each of which has one or more underlying circuits.
//!
//! This module implements the real logic for deciding when and how to
//! launch tunnels, and for which tunnels to hand out in response to
//! which requests.
//!
//! For testing and abstraction purposes, this module _does not_
//! actually know anything about tunnels _per se_.  Instead,
//! everything is handled using a set of traits that are internal to this
//! crate:
//!
//! * [`AbstractTunnel`] is a view of a tunnel.
//! * [`AbstractTunnelBuilder`] knows how to build an [`AbstractTunnel`].
//!
//! Using these traits, the [`AbstractTunnelMgr`] object manages a set of
//! tunnels, launching them as necessary, and keeping track of the
//! restrictions on their use.
18
19// TODO:
20// - Testing
21// - Error from prepare_action()
22// - Error reported by restrict_mut?
23
24use crate::config::CircuitTiming;
25use crate::usage::{SupportedTunnelUsage, TargetTunnelUsage};
26use crate::{DirInfo, Error, PathConfig, Result, timeouts};
27
28use retry_error::RetryError;
29use tor_async_utils::mpsc_channel_no_memquota;
30use tor_basic_utils::retry::RetryDelay;
31use tor_config::MutCfg;
32use tor_error::{AbsRetryTime, HasRetryTime, debug_report, info_report, internal, warn_report};
33#[cfg(feature = "vanguards")]
34use tor_guardmgr::vanguards::VanguardMgr;
35use tor_linkspec::CircTarget;
36use tor_proto::circuit::UniqId;
37use tor_proto::client::circuit::{CircParameters, Path};
38use tor_rtcompat::{Runtime, SleepProviderExt};
39
40use async_trait::async_trait;
41use futures::channel::mpsc;
42use futures::future::{FutureExt, Shared};
43use futures::stream::{FuturesUnordered, StreamExt};
44use oneshot_fused_workaround as oneshot;
45use std::collections::HashMap;
46use std::fmt::Debug;
47use std::hash::Hash;
48use std::panic::AssertUnwindSafe;
49use std::sync::{self, Arc, Weak};
50use tor_rtcompat::SpawnExt;
51use tracing::{debug, instrument, trace, warn};
52use web_time_compat::{Duration, Instant};
53mod streams;
54
/// Alias to force use of `RandomState` as the hasher, regardless of which
/// features are enabled in `weak_tables`.
///
/// See <https://github.com/tov/weak-table-rs/issues/23> for discussion.
///
/// (We could probably get away with a weaker hash function in this case, since
/// the attacker _probably_ doesn't have control over our pointers.)
type PtrWeakHashSet<T> = weak_table::PtrWeakHashSet<T, std::hash::RandomState>;
62
/// Description of how we got a tunnel.
#[non_exhaustive]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum TunnelProvenance {
    /// This tunnel was newly launched, or was in progress and finished while
    /// we were waiting.
    NewlyCreated,
    /// This tunnel already existed when we asked for it.
    Preexisting,
}
73
/// An error returned when we cannot apply a usage restriction to a tunnel.
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum RestrictionFailed {
    /// Tried to restrict a specification, but the tunnel didn't support the
    /// requested usage.
    #[error("Specification did not support desired usage")]
    NotSupported,
}
83
/// Minimal abstract view of a tunnel.
///
/// From this module's point of view, tunnels are simply objects
/// with unique identities, and a possible closed-state.
#[async_trait]
pub(crate) trait AbstractTunnel: Debug {
    /// Type for a unique identifier for tunnels.
    type Id: Clone + Debug + Hash + Eq + Send + Sync;
    /// Return the unique identifier for this tunnel.
    ///
    /// # Requirements
    ///
    /// The values returned by this function are unique for distinct
    /// tunnels.
    fn id(&self) -> Self::Id;

    /// Return true if this tunnel is usable for some purpose.
    ///
    /// Reasons a tunnel might be unusable include being closed.
    fn usable(&self) -> bool;

    /// Return a [`Path`] object describing the only circuit in this tunnel.
    ///
    /// Returns an error if the tunnel has more than one circuit.
    fn single_path(&self) -> tor_proto::Result<Arc<Path>>;

    /// Return the number of hops in this tunnel.
    ///
    /// Returns an error if the circuit is closed.
    ///
    /// NOTE: This function will currently return only the number of hops
    /// _currently_ in the tunnel. If there is an extend operation in progress,
    /// the currently pending hop may or may not be counted, depending on whether
    /// the extend operation finishes before this call is done.
    fn n_hops(&self) -> tor_proto::Result<usize>;

    /// Return true if this tunnel is closed and therefore unusable.
    fn is_closing(&self) -> bool;

    /// Return a process-unique identifier for this tunnel.
    fn unique_id(&self) -> UniqId;

    /// Extend the tunnel via the most appropriate handshake to a new `target` hop.
    async fn extend<T: CircTarget + Sync>(
        &self,
        target: &T,
        params: CircParameters,
    ) -> tor_proto::Result<()>;

    /// Return a time at which this tunnel was last known to be used,
    /// or None if it is in use right now (or has never been used).
    async fn last_known_to_be_used_at(&self) -> tor_proto::Result<Option<Instant>>;
}
137
/// A plan for an `AbstractCircBuilder` that can maybe be mutated by tests.
///
/// You should implement this trait using all default methods for all code that isn't test code.
pub(crate) trait MockablePlan {
    /// Add a reason string that was passed to `SleepProvider::block_advance()` to this object
    /// so that it knows what to pass to `::release_advance()`.
    ///
    /// The default implementation discards the reason; only test code needs to remember it.
    fn add_blocked_advance_reason(&mut self, _reason: String) {}
}
146
/// An object that knows how to build tunnels.
///
/// This creates tunnels in two phases. First, a plan is
/// made for how to build the tunnel. This planning phase should be
/// relatively fast, and must not suspend or block. Its purpose is to
/// get an early estimate of which operations the tunnel will be able
/// to support when it's done.
///
/// Second, the tunnel is actually built, using the plan as input.
#[async_trait]
pub(crate) trait AbstractTunnelBuilder<R: Runtime>: Send + Sync {
    /// The tunnel type that this builder knows how to build.
    type Tunnel: AbstractTunnel + Send + Sync;
    /// An opaque type describing how a given tunnel will be built.
    /// It may represent some or all of a path -- or it may not.
    //
    // TODO: It would be nice to have this parameterized on a lifetime,
    // and have that lifetime depend on the lifetime of the directory.
    // But I don't think that rust can do that.
    //
    // HACK(eta): I don't like the fact that `MockablePlan` is necessary here.
    type Plan: Send + Debug + MockablePlan;

    // TODO: I'd like to have a Dir type here to represent
    // create::DirInfo, but that would need to be parameterized too,
    // and would make everything complicated.

    /// Form a plan for how to build a new tunnel that supports `usage`.
    ///
    /// Return an opaque Plan object, and a new spec describing what
    /// the tunnel will actually support when it's built. (For
    /// example, if the input spec requests a tunnel that connects to
    /// port 80, then "planning" the tunnel might involve picking an
    /// exit that supports port 80, and the resulting spec might be
    /// the exit's complete list of supported ports.)
    ///
    /// # Requirements
    ///
    /// The resulting Spec must support `usage`.
    fn plan_tunnel(
        &self,
        usage: &TargetTunnelUsage,
        dir: DirInfo<'_>,
    ) -> Result<(Self::Plan, SupportedTunnelUsage)>;

    /// Construct a tunnel according to a given plan.
    ///
    /// On success, return a spec describing what the tunnel can be used for,
    /// and the tunnel that was just constructed.
    ///
    /// This function should implement some kind of a timeout for
    /// tunnels that are taking too long.
    ///
    /// # Requirements
    ///
    /// The spec that this function returns _must_ support the usage
    /// that was originally passed to `plan_tunnel`. It _must_ also
    /// contain the spec that was originally returned by
    /// `plan_tunnel`.
    async fn build_tunnel(&self, plan: Self::Plan) -> Result<(SupportedTunnelUsage, Self::Tunnel)>;

    /// Return a "parallelism factor" with which tunnels should be
    /// constructed for a given purpose.
    ///
    /// If this function returns N, then whenever we launch tunnels
    /// for this purpose, we launch N in parallel.
    ///
    /// The default implementation returns 1. The value of 0 is
    /// treated as if it were 1.
    fn launch_parallelism(&self, usage: &TargetTunnelUsage) -> usize {
        let _ = usage; // default implementation ignores this.
        1
    }

    /// Return a "parallelism factor" for which tunnels should be
    /// used for a given purpose.
    ///
    /// If this function returns N, then whenever we select among
    /// open tunnels for this purpose, we choose at random from the
    /// best N.
    ///
    /// The default implementation returns 1. The value of 0 is
    /// treated as if it were 1.
    // TODO: Possibly this doesn't belong in this trait.
    fn select_parallelism(&self, usage: &TargetTunnelUsage) -> usize {
        let _ = usage; // default implementation ignores this.
        1
    }

    /// Return true if we are currently attempting to learn tunnel
    /// timeouts by building testing tunnels.
    fn learning_timeouts(&self) -> bool;

    /// Flush state to the state manager if we own the lock.
    ///
    /// Return `Ok(true)` if we saved, and `Ok(false)` if we didn't hold the lock.
    fn save_state(&self) -> Result<bool>;

    /// Return this builder's [`PathConfig`].
    fn path_config(&self) -> Arc<PathConfig>;

    /// Replace this builder's [`PathConfig`].
    // TODO: This is dead_code because we only call this for the CircuitBuilder specialization of
    // CircMgr, not from the generic version, because this trait doesn't provide guardmgr, which is
    // needed by the [`CircMgr::reconfigure`] function that would be the only caller of this. We
    // should add `guardmgr` to this trait, make [`CircMgr::reconfigure`] generic, and remove this
    // dead_code marking.
    #[allow(dead_code)]
    fn set_path_config(&self, new_config: PathConfig);

    /// Return a reference to this builder's timeout estimator.
    fn estimator(&self) -> &timeouts::Estimator;

    /// Return a reference to this builder's `VanguardMgr`.
    #[cfg(feature = "vanguards")]
    fn vanguardmgr(&self) -> &Arc<VanguardMgr<R>>;

    /// Replace our state with a new owning state, assuming we have
    /// storage permission.
    fn upgrade_to_owned_state(&self) -> Result<()>;

    /// Reload persistent state from disk, if we don't have storage permission.
    fn reload_state(&self) -> Result<()>;

    /// Return a reference to this builder's `GuardMgr`.
    fn guardmgr(&self) -> &tor_guardmgr::GuardMgr<R>;

    /// Reconfigure this builder using the latest set of network parameters.
    ///
    /// (NOTE: for now, this only affects tunnel timeout estimation.)
    fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters);
}
280
/// Enumeration to track the expiration state of a tunnel.
///
/// A tunnel can either be unused (at which point it should expire if it is
/// _still unused_ by a certain time), dirty (at which point it should
/// expire after a certain duration), or long-lived (at which point it should
/// expire after it has been disused for a certain duration).
///
/// All tunnels start out "unused" and become "dirty" when their spec
/// is first restricted -- that is, when they are first handed out to be
/// used for a request.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ExpirationInfo {
    /// The tunnel has never been used, and has never been restricted for use with a request.
    Unused {
        /// A time when the tunnel was created.
        created: Instant,
    },

    /// The tunnel is not-long-lived; we will expire it by waiting until a certain amount of time
    /// after it was first used.
    Dirty {
        /// The time at which this tunnel's spec was first restricted.
        dirty_since: Instant,
    },

    /// The tunnel is long-lived; we will expire it by waiting until it has passed
    /// a certain amount of time without having any streams attached to it.
    LongLived {
        /// Last time at which the tunnel was checked and found not to have any streams.
        ///
        /// (This is a bit complicated: We have to be vague here, since we need
        /// an async check to find out that a tunnel is used, or when it actually
        /// became disused.)
        last_known_to_be_used_at: Instant,
    },
}
316
317impl ExpirationInfo {
318 /// Return an ExpirationInfo for a newly created tunnel.
319 fn new(now: Instant) -> Self {
320 ExpirationInfo::Unused { created: now }
321 }
322
323 /// Mark this ExpirationInfo as having been in-use at `now`.
324 ///
325 /// If `long_lived` is false, the associated tunnel should expire a certain amount of time
326 /// after it was _first_ used.
327 /// If `long_lived` is true, the associated tunnel should expire a certain amount of time
328 /// after it was _last_ used.
329 fn mark_used(&mut self, now: Instant, long_lived: bool) {
330 if long_lived {
331 *self = ExpirationInfo::LongLived {
332 last_known_to_be_used_at: now,
333 };
334 } else {
335 match self {
336 ExpirationInfo::Unused { .. } => {
337 // This is our first time using this circuit; mark it dirty
338 *self = ExpirationInfo::Dirty { dirty_since: now };
339 }
340 ExpirationInfo::Dirty { .. } => {
341 // no need to update; we're tracking the time when the circuit _first_ became
342 // dirty, so further uses don't matter.
343 }
344 ExpirationInfo::LongLived { .. } => {
345 // shouldn't occur: we shouldn't be able to attach a stream with non-long-lived isolation
346 // to a tunnel marked as long-lived. In this case we leave the timestamp alone.
347 // (If there were a bug here, it would be harmless, since we would
348 // correct the timestamp the next time we tried to expire the circuit.)
349 }
350 }
351 }
352 }
353
354 /// Return an internal error if this ExpirationInfo is not marked as long-lived.
355 fn check_long_lived(&self) -> Result<()> {
356 match self {
357 ExpirationInfo::Unused { .. } | ExpirationInfo::Dirty { .. } => Err(internal!(
358 "Tunnel was not long-lived as expected. (Expiration status: {:?})",
359 self
360 )
361 .into()),
362 ExpirationInfo::LongLived { .. } => Ok(()),
363 }
364 }
365}
366
/// Settings to determine when tunnels are expired.
#[derive(Clone, Debug)]
pub(crate) struct ExpirationParameters {
    /// Any unused tunnel is expired this long after it was created.
    expire_unused_after: Duration,
    /// Any non-long-lived dirty tunnel is expired this long after it first becomes dirty.
    expire_dirty_after: Duration,
    /// Any long-lived tunnel is expired after having been disused for this long.
    expire_disused_after: Duration,
}
377
/// An entry for an open tunnel held by an `AbstractTunnelMgr`.
#[derive(Debug, Clone)]
pub(crate) struct OpenEntry<T> {
    /// The supported usage for this tunnel.
    spec: SupportedTunnelUsage,
    /// The tunnel under management.
    tunnel: Arc<T>,
    /// When does this tunnel expire?
    ///
    /// (Note that expired tunnels are removed from the manager,
    /// which does not actually close them until there are no more
    /// references to them.)
    expiration: ExpirationInfo,
}
392
393impl<T: AbstractTunnel> OpenEntry<T> {
394 /// Make a new OpenEntry for a given tunnel and spec.
395 fn new(spec: SupportedTunnelUsage, tunnel: T, expiration: ExpirationInfo) -> Self {
396 OpenEntry {
397 spec,
398 tunnel: tunnel.into(),
399 expiration,
400 }
401 }
402
403 /// Return true if the underlying tunnel can be used for `usage`.
404 pub(crate) fn supports(&self, usage: &TargetTunnelUsage) -> bool {
405 self.tunnel.usable() && self.spec.supports(usage)
406 }
407
408 /// Change the underlying tunnel's permissible usage, based on its having
409 /// been used for `usage` at time `now`.
410 ///
411 /// Return an error if the tunnel may not be used for `usage`.
412 fn restrict_mut(&mut self, usage: &TargetTunnelUsage, now: Instant) -> Result<()> {
413 self.spec.restrict_mut(usage)?;
414 self.expiration.mark_used(now, self.spec.is_long_lived());
415 Ok(())
416 }
417
418 /// Find the "best" entry from a slice of OpenEntry for supporting
419 /// a given `usage`.
420 ///
421 /// If `parallelism` is some N greater than 1, we pick randomly
422 /// from the best `N` tunnels.
423 ///
424 /// # Requirements
425 ///
426 /// Requires that `ents` is nonempty, and that every element of `ents`
427 /// supports `spec`.
428 fn find_best<'a>(
429 // we do not mutate `ents`, but to return `&mut Self` we must have a mutable borrow
430 ents: &'a mut [&'a mut Self],
431 usage: &TargetTunnelUsage,
432 parallelism: usize,
433 ) -> &'a mut Self {
434 let _ = usage; // not yet used.
435 use rand::seq::IndexedMutRandom as _;
436 let parallelism = parallelism.clamp(1, ents.len());
437 // TODO: Actually look over the whole list to see which is better.
438 let slice = &mut ents[0..parallelism];
439 let mut rng = rand::rng();
440 slice.choose_mut(&mut rng).expect("Input list was empty")
441 }
442
443 /// Return true if this tunnel should be expired given that the current time is `now`,
444 /// and the current settings are `params`.
445 fn should_expire(&self, now: Instant, params: &ExpirationParameters) -> ShouldExpire {
446 match self.expiration {
447 ExpirationInfo::Unused { created } => {
448 ShouldExpire::certain(now, created + params.expire_unused_after)
449 }
450 ExpirationInfo::Dirty { dirty_since } => {
451 ShouldExpire::certain(now, dirty_since + params.expire_dirty_after)
452 }
453 ExpirationInfo::LongLived {
454 last_known_to_be_used_at,
455 } => {
456 ShouldExpire::uncertain(now, last_known_to_be_used_at + params.expire_disused_after)
457 }
458 }
459 }
460}
461
/// When should a tunnel expire?
///
/// Reflects possible uncertainty.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ShouldExpire {
    /// The tunnel should expire now.
    Now,
    /// The tunnel might expire now; we need to check.
    ///
    /// (This is the result we get when we know that this is a tunnel that should expire
    /// if it has gone for some duration D without having any streams on it,
    /// and that it definitely had a stream at time T. It is now at least time T+D,
    /// but we don't know whether the tunnel has had any streams in the intervening time.
    /// We need to call the async fn `last_known_to_be_used_at` to check.)
    PossiblyNow,
    /// The tunnel will not expire before the specified time.
    NotBefore(Instant),
}
480
481impl ShouldExpire {
482 /// Return a ShouldExpire reflecting an expiration that is known to be happening at `expiration`.
483 fn certain(now: Instant, expiration: Instant) -> Self {
484 if now >= expiration {
485 ShouldExpire::Now
486 } else {
487 ShouldExpire::NotBefore(expiration)
488 }
489 }
490
491 /// Return a ShouldExpire reflecting an expiration that is known to be no sooner than `expiration`,
492 /// but possibly later.
493 fn uncertain(now: Instant, expiration: Instant) -> Self {
494 if now >= expiration {
495 ShouldExpire::PossiblyNow
496 } else {
497 ShouldExpire::NotBefore(expiration)
498 }
499 }
500}
501
/// A result type whose "Ok" value is the Id of a tunnel built by builder `B`.
type PendResult<B, R> = Result<<<B as AbstractTunnelBuilder<R>>::Tunnel as AbstractTunnel>::Id>;
504
/// An in-progress tunnel request tracked by an `AbstractTunnelMgr`.
///
/// (In addition to tracking tunnels, `AbstractTunnelMgr` tracks
/// _requests_ for tunnels. The manager uses these entries if it
/// finds that some tunnel created _after_ a request first launched
/// might meet the request's requirements.)
struct PendingRequest<B: AbstractTunnelBuilder<R>, R: Runtime> {
    /// Usage for the operation requested by this request.
    usage: TargetTunnelUsage,
    /// A channel to use for telling this request about tunnels that it
    /// might like.
    notify: mpsc::Sender<PendResult<B, R>>,
}
518
519impl<B: AbstractTunnelBuilder<R>, R: Runtime> PendingRequest<B, R> {
520 /// Return true if this request would be supported by `spec`.
521 fn supported_by(&self, spec: &SupportedTunnelUsage) -> bool {
522 spec.supports(&self.usage)
523 }
524}
525
/// An entry for an under-construction in-progress tunnel tracked by
/// an `AbstractTunnelMgr`.
#[derive(Debug)]
struct PendingEntry<B: AbstractTunnelBuilder<R>, R: Runtime> {
    /// Specification that this tunnel will support, if every pending
    /// request that is waiting for it is attached to it.
    ///
    /// This spec becomes more and more restricted as more pending
    /// requests are waiting for this tunnel.
    ///
    /// This spec is contained by the spec of the tunnel being built, and must
    /// support the usage of every pending request that's waiting for this
    /// tunnel.
    tentative_assignment: sync::Mutex<SupportedTunnelUsage>,
    /// A shared future for requests to use when waiting for
    /// notification of this tunnel's success.
    receiver: Shared<oneshot::Receiver<PendResult<B, R>>>,
}
543
544impl<B: AbstractTunnelBuilder<R>, R: Runtime> PendingEntry<B, R> {
545 /// Make a new PendingEntry that starts out supporting a given
546 /// spec. Return that PendingEntry, along with a Sender to use to
547 /// report the result of building this tunnel.
548 fn new(spec: &SupportedTunnelUsage) -> (Self, oneshot::Sender<PendResult<B, R>>) {
549 let tentative_assignment = sync::Mutex::new(spec.clone());
550 let (sender, receiver) = oneshot::channel();
551 let receiver = receiver.shared();
552 let entry = PendingEntry {
553 tentative_assignment,
554 receiver,
555 };
556 (entry, sender)
557 }
558
559 /// Return true if this tunnel's current tentative assignment
560 /// supports `usage`.
561 fn supports(&self, usage: &TargetTunnelUsage) -> bool {
562 let assignment = self.tentative_assignment.lock().expect("poisoned lock");
563 assignment.supports(usage)
564 }
565
566 /// Try to change the tentative assignment of this tunnel by
567 /// restricting it for use with `usage`.
568 ///
569 /// Return an error if the current tentative assignment didn't
570 /// support `usage` in the first place.
571 fn tentative_restrict_mut(&self, usage: &TargetTunnelUsage) -> Result<()> {
572 if let Ok(mut assignment) = self.tentative_assignment.lock() {
573 assignment.restrict_mut(usage)?;
574 }
575 Ok(())
576 }
577
578 /// Find the best PendingEntry values from a slice for use with
579 /// `usage`.
580 ///
581 /// # Requirements
582 ///
583 /// The `ents` slice must not be empty. Every element of `ents`
584 /// must support the given spec.
585 fn find_best(ents: &[Arc<Self>], usage: &TargetTunnelUsage) -> Vec<Arc<Self>> {
586 // TODO: Actually look over the whole list to see which is better.
587 let _ = usage; // currently unused
588 vec![Arc::clone(&ents[0])]
589 }
590}
591
/// Wrapper type to represent the state between planning to build a
/// tunnel and constructing it.
#[derive(Debug)]
struct TunnelBuildPlan<B: AbstractTunnelBuilder<R>, R: Runtime> {
    /// The Plan object returned by [`AbstractTunnelBuilder::plan_tunnel`].
    plan: B::Plan,
    /// A sender to notify any pending requests when this tunnel is done.
    sender: oneshot::Sender<PendResult<B, R>>,
    /// A strong reference to the PendingEntry for this tunnel build attempt.
    pending: Arc<PendingEntry<B, R>>,
}
603
/// The inner state of an [`AbstractTunnelMgr`].
struct TunnelList<B: AbstractTunnelBuilder<R>, R: Runtime> {
    /// A map from tunnel ID to [`OpenEntry`] values for all managed
    /// open tunnels.
    ///
    /// A tunnel is added here from [`AbstractTunnelMgr::do_launch`] when we find
    /// that it completes successfully, and has not been cancelled.
    /// When we decide that such a tunnel should no longer be handed out for
    /// any new requests, we "retire" the tunnel by removing it from this map.
    #[allow(clippy::type_complexity)]
    open_tunnels: HashMap<<B::Tunnel as AbstractTunnel>::Id, OpenEntry<B::Tunnel>>,
    /// Weak-set of PendingEntry for tunnels that are being built.
    ///
    /// Because this set only holds weak references, and the only strong
    /// reference to the PendingEntry is held by the task building the tunnel,
    /// this set's members are lazily removed after the tunnel is either built
    /// or fails to build.
    ///
    /// This set is used for two purposes:
    ///
    /// 1. When a tunnel request finds that there is no open tunnel for its
    ///    purposes, it checks here to see if there is a pending tunnel that it
    ///    could wait for.
    /// 2. When a pending tunnel finishes building, it checks here to make sure
    ///    that it has not been cancelled. (Removing an entry from this set marks
    ///    it as cancelled.)
    ///
    /// An entry is added here in [`AbstractTunnelMgr::prepare_action`] when we
    /// decide that a tunnel needs to be launched.
    ///
    /// Later, in [`AbstractTunnelMgr::do_launch`], once the tunnel has finished
    /// (or failed), we remove the entry (by pointer identity).
    /// If we cannot find the entry, we conclude that the request has been
    /// _cancelled_, and so we discard any tunnel that was created.
    pending_tunnels: PtrWeakHashSet<Weak<PendingEntry<B, R>>>,
    /// Weak-set of PendingRequest for requests that are waiting for a
    /// tunnel to be built.
    ///
    /// Because this set only holds weak references, and the only
    /// strong reference to the PendingRequest is held by the task
    /// waiting for the tunnel to be built, this set's members are
    /// lazily removed after the request succeeds or fails.
    pending_requests: PtrWeakHashSet<Weak<PendingRequest<B, R>>>,
}
648
649impl<B: AbstractTunnelBuilder<R>, R: Runtime> TunnelList<B, R> {
650 /// Make a new empty `CircList`
651 fn new() -> Self {
652 TunnelList {
653 open_tunnels: HashMap::new(),
654 pending_tunnels: PtrWeakHashSet::new(),
655 pending_requests: PtrWeakHashSet::new(),
656 }
657 }
658
659 /// Add `e` to the list of open tunnels.
660 fn add_open(&mut self, e: OpenEntry<B::Tunnel>) {
661 let id = e.tunnel.id();
662 self.open_tunnels.insert(id, e);
663 }
664
665 /// Find all the usable open tunnels that support `usage`.
666 ///
667 /// Return None if there are no such tunnels.
668 fn find_open(&mut self, usage: &TargetTunnelUsage) -> Option<Vec<&mut OpenEntry<B::Tunnel>>> {
669 let list = self.open_tunnels.values_mut();
670 let v = SupportedTunnelUsage::find_supported(list, usage);
671 if v.is_empty() { None } else { Some(v) }
672 }
673
674 /// Find an open tunnel by ID.
675 ///
676 /// Return None if no such tunnels exists in this list.
677 fn get_open_mut(
678 &mut self,
679 id: &<B::Tunnel as AbstractTunnel>::Id,
680 ) -> Option<&mut OpenEntry<B::Tunnel>> {
681 self.open_tunnels.get_mut(id)
682 }
683
684 /// Extract an open tunnel by ID, removing it from this list.
685 ///
686 /// Return None if no such tunnel exists in this list.
687 fn take_open(
688 &mut self,
689 id: &<B::Tunnel as AbstractTunnel>::Id,
690 ) -> Option<OpenEntry<B::Tunnel>> {
691 self.open_tunnels.remove(id)
692 }
693
694 /// Remove tunnels based on expiration times.
695 ///
696 /// We remove every unused tunnel that is set to expire by
697 /// `unused_cutoff`, and every dirty tunnel that has been dirty
698 /// since before `dirty_cutoff`.
699 ///
700 /// Return the next time at which anything will definitely expire,
701 /// and a list of long-lived tunnels where we need to check their usage status
702 /// before we can be sure if they are expired.
703 #[must_use]
704 fn expire_tunnels(
705 &mut self,
706 now: Instant,
707 params: &ExpirationParameters,
708 ) -> (Option<Instant>, Vec<Weak<B::Tunnel>>) {
709 let mut need_check = Vec::new();
710 let mut earliest_expiration = None;
711 self.open_tunnels
712 .retain(|_k, v| match v.should_expire(now, params) {
713 // Expires now: Do not retain.
714 ShouldExpire::Now => false,
715
716 // Will expire at `when`: keep, but update `earliest_expiration`.
717 ShouldExpire::NotBefore(when) => {
718 earliest_expiration = match earliest_expiration {
719 Some(t) if t < when => Some(t),
720 _ => Some(when),
721 };
722 true
723 }
724
725 // Need to check tunnel to see if/when it is disused.
726 ShouldExpire::PossiblyNow => {
727 need_check.push(Arc::downgrade(&v.tunnel));
728 true
729 }
730 });
731 (earliest_expiration, need_check)
732 }
733
734 /// Return the time when the tunnel with given `id`, should expire.
735 ///
736 /// Return None if no such tunnel exists.
737 fn tunnel_should_expire(
738 &mut self,
739 id: &<B::Tunnel as AbstractTunnel>::Id,
740 now: Instant,
741 params: &ExpirationParameters,
742 ) -> Option<ShouldExpire> {
743 self.open_tunnels
744 .get(id)
745 .map(|v| v.should_expire(now, params))
746 }
747
748 /// Update the "last known to be in use" time of a long-lived tunnel with ID `id`,
749 /// based on learning when it was last used.
750 ///
751 /// Expire the tunnel if appropriate.
752 ///
753 /// If the tunnel is still part of the map, return the next instant at which it might expire.
754 ///
755 /// Returns an error if the tunnel was present but was _not_ already marked as long-lived.
756 fn update_long_lived_tunnel_last_used(
757 &mut self,
758 id: &<B::Tunnel as AbstractTunnel>::Id,
759 now: Instant,
760 params: &ExpirationParameters,
761 disused_since: &tor_proto::Result<Option<Instant>>,
762 ) -> crate::Result<Option<Instant>> {
763 let Ok(disused_since) = disused_since else {
764 // got an error looking up disused time: discard the circuit.
765 let discard = self.take_open(id);
766 if let Some(ent) = discard {
767 ent.expiration.check_long_lived()?;
768 }
769 return Ok(None);
770 };
771 let Some(tun) = self.open_tunnels.get_mut(id) else {
772 // Circuit isn't there. Return.
773 return Ok(None);
774 };
775 tun.expiration.check_long_lived()?;
776 let last_known_in_use_at = disused_since.unwrap_or(now);
777
778 tun.expiration.mark_used(last_known_in_use_at, true);
779 match tun.should_expire(now, params) {
780 ShouldExpire::Now | ShouldExpire::PossiblyNow => {
781 let _discard = self.take_open(id);
782 Ok(None)
783 }
784 ShouldExpire::NotBefore(instant) => Ok(Some(instant)),
785 }
786 }
787
788 /// Add `pending` to the set of in-progress tunnels.
789 fn add_pending_tunnel(&mut self, pending: Arc<PendingEntry<B, R>>) {
790 self.pending_tunnels.insert(pending);
791 }
792
793 /// Find all pending tunnels that support `usage`.
794 ///
795 /// If no such tunnels are currently being built, return None.
796 fn find_pending_tunnels(
797 &self,
798 usage: &TargetTunnelUsage,
799 ) -> Option<Vec<Arc<PendingEntry<B, R>>>> {
800 let result: Vec<_> = self
801 .pending_tunnels
802 .iter()
803 .filter(|p| p.supports(usage))
804 .filter(|p| !matches!(p.receiver.peek(), Some(Err(_))))
805 .collect();
806
807 if result.is_empty() {
808 None
809 } else {
810 Some(result)
811 }
812 }
813
814 /// Return true if `circ` is still pending.
815 ///
816 /// A tunnel will become non-pending when finishes (successfully or not), or when it's
817 /// removed from this list via `clear_all_tunnels()`.
818 fn tunnel_is_pending(&self, circ: &Arc<PendingEntry<B, R>>) -> bool {
819 self.pending_tunnels.contains(circ)
820 }
821
822 /// Construct and add a new entry to the set of request waiting
823 /// for a tunnel.
824 ///
825 /// Return the request, and a new receiver stream that it should
826 /// use for notification of possible tunnels to use.
827 fn add_pending_request(&mut self, pending: &Arc<PendingRequest<B, R>>) {
828 self.pending_requests.insert(Arc::clone(pending));
829 }
830
831 /// Return all pending requests that would be satisfied by a tunnel
832 /// that supports `circ_spec`.
833 fn find_pending_requests(
834 &self,
835 circ_spec: &SupportedTunnelUsage,
836 ) -> Vec<Arc<PendingRequest<B, R>>> {
837 self.pending_requests
838 .iter()
839 .filter(|pend| pend.supported_by(circ_spec))
840 .collect()
841 }
842
    /// Clear all pending and open tunnels.
    ///
    /// Calling `clear_all_tunnels` ensures that any request that is answered _after
    /// this method runs_ will receive a tunnel that was launched _after this
    /// method runs_.
    fn clear_all_tunnels(&mut self) {
        // Note that removing entries from `pending_tunnels` will also cause the
        // tunnel tasks to realize that they are cancelled when they
        // go to tell anybody about their results.
        self.pending_tunnels.clear();
        self.open_tunnels.clear();
    }
855}
856
/// Timing information for tunnels that have been built but never used.
///
/// Currently taken from the network parameters; refreshed whenever
/// `AbstractTunnelMgr::update_network_parameters` is called.
struct UnusedTimings {
    /// Minimum lifetime of a tunnel created while learning
    /// tunnel timeouts.
    learning: Duration,
    /// Minimum lifetime of a tunnel created while not learning
    /// tunnel timeouts.
    not_learning: Duration,
}
868
869// This isn't really fallible, given the definitions of the underlying
870// types.
871#[allow(clippy::fallible_impl_from)]
872impl From<&tor_netdir::params::NetParameters> for UnusedTimings {
873 fn from(v: &tor_netdir::params::NetParameters) -> Self {
874 // These try_into() calls can't fail, so unwrap() can't panic.
875 #[allow(clippy::unwrap_used)]
876 UnusedTimings {
877 learning: v
878 .unused_client_circ_timeout_while_learning_cbt
879 .try_into()
880 .unwrap(),
881 not_learning: v.unused_client_circ_timeout.try_into().unwrap(),
882 }
883 }
884}
885
/// Abstract implementation for tunnel management.
///
/// The algorithm provided here is fairly simple. In its simplest form:
///
/// When somebody asks for a tunnel for a given operation: if we find
/// one open already, we return it. If we find in-progress tunnels
/// that would meet our needs, we wait for one to finish (or for all
/// to fail). And otherwise, we launch one or more tunnels to meet the
/// request's needs.
///
/// If this process fails, then we retry it, up to a timeout or a
/// numerical limit.
///
/// If a tunnel not previously considered for a given request
/// finishes before the request is satisfied, and if the tunnel would
/// satisfy the request, we try to give that tunnel as an answer to
/// that request even if it was not one of the tunnels that request
/// was waiting for.
pub(crate) struct AbstractTunnelMgr<B: AbstractTunnelBuilder<R>, R: Runtime> {
    /// Builder used to construct tunnels.
    builder: B,
    /// An asynchronous runtime to use for launching tasks and
    /// checking timeouts.
    runtime: R,
    /// A [`TunnelList`] to manage our list of open tunnels, pending
    /// requests, and pending tunnels.
    tunnels: sync::Mutex<TunnelList<B, R>>,

    /// Configured information about when to expire tunnels and requests.
    circuit_timing: MutCfg<CircuitTiming>,

    /// Minimum lifetime of an unused tunnel.
    ///
    /// Derived from the network parameters.
    unused_timing: sync::Mutex<UnusedTimings>,
}
922
/// An action to take in order to satisfy a request for a tunnel.
///
/// Computed by `AbstractTunnelMgr::prepare_action` and executed by
/// `AbstractTunnelMgr::take_action`.
enum Action<B: AbstractTunnelBuilder<R>, R: Runtime> {
    /// We found an open tunnel: return immediately.
    Open(Arc<B::Tunnel>),
    /// We found one or more pending tunnels: wait until one succeeds,
    /// or all fail.
    Wait(FuturesUnordered<Shared<oneshot::Receiver<PendResult<B, R>>>>),
    /// We should launch tunnels: here are the instructions for how
    /// to do so.
    Build(Vec<TunnelBuildPlan<B, R>>),
}
934
935impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> AbstractTunnelMgr<B, R> {
936 /// Construct a new AbstractTunnelMgr.
937 pub(crate) fn new(builder: B, runtime: R, circuit_timing: CircuitTiming) -> Self {
938 let circs = sync::Mutex::new(TunnelList::new());
939 let dflt_params = tor_netdir::params::NetParameters::default();
940 let unused_timing = (&dflt_params).into();
941 AbstractTunnelMgr {
942 builder,
943 runtime,
944 tunnels: circs,
945 circuit_timing: circuit_timing.into(),
946 unused_timing: sync::Mutex::new(unused_timing),
947 }
948 }
949
950 /// Reconfigure this manager using the latest set of network parameters.
951 pub(crate) fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
952 let mut u = self
953 .unused_timing
954 .lock()
955 .expect("Poisoned lock for unused_timing");
956 *u = p.into();
957 }
958
    /// Return a snapshot of this manager's current [`CircuitTiming`].
    ///
    /// (A later call to `set_circuit_timing` does not affect
    /// previously returned snapshots.)
    pub(crate) fn circuit_timing(&self) -> Arc<CircuitTiming> {
        self.circuit_timing.get()
    }
963
    /// Replace this manager's [`CircuitTiming`] configuration with `new_config`.
    pub(crate) fn set_circuit_timing(&self, new_config: CircuitTiming) {
        self.circuit_timing.replace(new_config);
    }
    /// Return a circuit suitable for use with a given `usage`,
    /// creating that circuit if necessary, and restricting it
    /// under the assumption that it will be used for that spec.
    ///
    /// On success, returns the tunnel along with a [`TunnelProvenance`]
    /// saying whether it was already open or newly created.  On failure,
    /// returns [`Error::RequestFailed`] wrapping every error accumulated
    /// over the retry attempts.
    ///
    /// This is the primary entry point for AbstractTunnelMgr.
    #[allow(clippy::cognitive_complexity)] // TODO #2010: Refactor?
    #[instrument(level = "trace", skip_all)]
    pub(crate) async fn get_or_launch(
        self: &Arc<Self>,
        usage: &TargetTunnelUsage,
        dir: DirInfo<'_>,
    ) -> Result<(Arc<B::Tunnel>, TunnelProvenance)> {
        /// Largest number of "resets" that we will accept in this attempt.
        ///
        /// A "reset" is an internally generated error that does not represent a
        /// real problem; only a "whoops, got to try again" kind of a situation.
        /// For example, if we reconfigure in the middle of an attempt and need
        /// to re-launch the circuit, that counts as a "reset", since there was
        /// nothing actually _wrong_ with the circuit we were building.
        ///
        /// We accept more resets than we do real failures. However,
        /// we don't accept an unlimited number: we don't want to inadvertently
        /// permit infinite loops here. If we ever bump against this limit, we
        /// should not automatically increase it: we should instead figure out
        /// why it is happening and try to make it not happen.
        const MAX_RESETS: usize = 8;

        let circuit_timing = self.circuit_timing();
        let timeout_at = self.runtime.now() + circuit_timing.request_timeout;
        let max_tries = circuit_timing.request_max_retries;
        // We compute the maximum number of failures by dividing the maximum
        // number of circuits to attempt by the number that will be launched in
        // parallel for each iteration.
        let max_failures = usize::div_ceil(
            max_tries as usize,
            std::cmp::max(1, self.builder.launch_parallelism(usage)),
        );

        let mut retry_schedule = RetryDelay::from_msec(100);
        let mut retry_err = RetryError::<Box<Error>>::in_attempt_to("find or build a tunnel");

        let mut n_failures = 0;
        let mut n_resets = 0;

        // Retry loop: each iteration is one attempt to find or build a
        // tunnel, bounded by the overall request timeout and by the
        // per-kind (reset vs. failure) limits above.
        for attempt_num in 1.. {
            // How much time is remaining?
            let remaining = match timeout_at.checked_duration_since(self.runtime.now()) {
                None => {
                    retry_err.push_timed(
                        Error::RequestTimeout,
                        self.runtime.now(),
                        Some(self.runtime.wallclock()),
                    );
                    break;
                }
                Some(t) => t,
            };

            let error = match self.prepare_action(usage, dir, true) {
                Ok(action) => {
                    // We successfully found an action: Take that action.
                    let outcome = self
                        .runtime
                        .timeout(remaining, Arc::clone(self).take_action(action, usage))
                        .await;

                    match outcome {
                        Ok(Ok(circ)) => return Ok(circ),
                        Ok(Err(e)) => {
                            debug!("Circuit attempt {} failed.", attempt_num);
                            Error::RequestFailed(e)
                        }
                        Err(_) => {
                            // We ran out of "remaining" time; there is nothing
                            // more to be done.
                            warn!("All tunnel attempts failed due to timeout");
                            retry_err.push_timed(
                                Error::RequestTimeout,
                                self.runtime.now(),
                                Some(self.runtime.wallclock()),
                            );
                            break;
                        }
                    }
                }
                Err(e) => {
                    // We couldn't pick the action!
                    debug_report!(
                        &e,
                        "Couldn't pick action for tunnel attempt {}",
                        attempt_num,
                    );
                    e
                }
            };

            // There's been an error. See how long we wait before we retry.
            let now = self.runtime.now();
            let retry_time =
                error.abs_retry_time(now, || retry_schedule.next_delay(&mut rand::rng()));

            // Internal resets and real failures are counted separately,
            // each against its own limit.
            let (count, count_limit) = if error.is_internal_reset() {
                (&mut n_resets, MAX_RESETS)
            } else {
                (&mut n_failures, max_failures)
            };
            // Record the error, flattening it if needed.
            match error {
                // Flatten nested RetryError, using mockable time for each error
                Error::RequestFailed(e) => {
                    retry_err.extend_from_retry_error(e);
                }
                e => retry_err.push_timed(e, now, Some(self.runtime.wallclock())),
            }

            *count += 1;
            // If we have reached our limit of this kind of problem, we're done.
            if *count >= count_limit {
                warn!("Reached circuit build retry limit, exiting...");
                break;
            }

            // Wait, or not, as appropriate.
            match retry_time {
                AbsRetryTime::Immediate => {}
                AbsRetryTime::Never => break,
                AbsRetryTime::At(t) => {
                    let remaining = timeout_at.saturating_duration_since(now);
                    let delay = t.saturating_duration_since(now);
                    trace!(?delay, "Waiting to retry...");
                    self.runtime.sleep(std::cmp::min(delay, remaining)).await;
                }
            }
        }

        warn!("Request failed");
        Err(Error::RequestFailed(retry_err))
    }
1106
1107 /// Make sure a circuit exists, without actually asking for it.
1108 ///
1109 /// Make sure that there is a circuit (built or in-progress) that could be
1110 /// used for `usage`, and launch one or more circuits in a background task
1111 /// if there is not.
1112 // TODO: This should probably take some kind of parallelism parameter.
1113 #[cfg(test)]
1114 pub(crate) fn ensure_tunnel(
1115 self: &Arc<Self>,
1116 usage: &TargetTunnelUsage,
1117 dir: DirInfo<'_>,
1118 ) -> Result<()> {
1119 let action = self.prepare_action(usage, dir, false)?;
1120 if let Action::Build(plans) = action {
1121 for plan in plans {
1122 let self_clone = Arc::clone(self);
1123 let _ignore_receiver = self_clone.spawn_launch(usage, plan);
1124 }
1125 }
1126
1127 Ok(())
1128 }
1129
    /// Choose which action we should take in order to provide a tunnel
    /// for a given `usage`.
    ///
    /// If `restrict_circ` is true, we restrict the spec of any
    /// circ we decide to use to mark that it _is_ being used for
    /// `usage`.
    ///
    /// Checks, in order: open tunnels, pending tunnels, and finally
    /// plans for building new ones.  The tunnel-list lock is held for
    /// the whole decision.
    #[instrument(level = "trace", skip_all)]
    fn prepare_action(
        &self,
        usage: &TargetTunnelUsage,
        dir: DirInfo<'_>,
        restrict_circ: bool,
    ) -> Result<Action<B, R>> {
        let mut list = self.tunnels.lock().expect("poisoned lock");

        if let Some(mut open) = list.find_open(usage) {
            // We have open tunnels that meet the spec: return the best one.
            let parallelism = self.builder.select_parallelism(usage);
            let best = OpenEntry::find_best(&mut open, usage, parallelism);
            if restrict_circ {
                let now = self.runtime.now();
                best.restrict_mut(usage, now)?;
            }
            // TODO: If we have fewer tunnels here than our select
            // parallelism, perhaps we should launch more?

            return Ok(Action::Open(best.tunnel.clone()));
        }

        if let Some(pending) = list.find_pending_tunnels(usage) {
            // There are pending tunnels that could meet the spec.
            // Restrict them under the assumption that they could all
            // be used for this, and then wait until one is ready (or
            // all have failed)
            let best = PendingEntry::find_best(&pending, usage);
            if restrict_circ {
                for item in &best {
                    // TODO: Do we want to tentatively restrict _all_ of these?
                    // not clear to me.
                    item.tentative_restrict_mut(usage)?;
                }
            }
            let stream = best.iter().map(|item| item.receiver.clone()).collect();
            // TODO: if we have fewer tunnels here than our launch
            // parallelism, we might want to launch more.

            return Ok(Action::Wait(stream));
        }

        // Okay, we need to launch tunnels here.
        let parallelism = std::cmp::max(1, self.builder.launch_parallelism(usage));
        let mut plans = Vec::new();
        let mut last_err = None;
        for _ in 0..parallelism {
            match self.plan_by_usage(dir, usage) {
                Ok((pending, plan)) => {
                    list.add_pending_tunnel(pending);
                    plans.push(plan);
                }
                Err(e) => {
                    debug!("Unable to make a plan for {:?}: {}", usage, e);
                    last_err = Some(e);
                }
            }
        }
        if !plans.is_empty() {
            Ok(Action::Build(plans))
        } else if let Some(last_err) = last_err {
            Err(last_err)
        } else {
            // we didn't even try to plan anything!
            Err(internal!("no plans were built, but no errors were found").into())
        }
    }
1204
    /// Execute an action returned by [`prepare_action`](Self::prepare_action),
    /// and return the resulting tunnel or error.
    #[allow(clippy::cognitive_complexity, clippy::type_complexity)] // TODO #2010: Refactor
    #[instrument(level = "trace", skip_all)]
    async fn take_action(
        self: Arc<Self>,
        act: Action<B, R>,
        usage: &TargetTunnelUsage,
    ) -> std::result::Result<(Arc<B::Tunnel>, TunnelProvenance), RetryError<Box<Error>>> {
        /// Store the error `err` into `retry_err`, as appropriate.
        fn record_error<R: Runtime>(
            retry_err: &mut RetryError<Box<Error>>,
            source: streams::Source,
            building: bool,
            mut err: Error,
            runtime: &R,
        ) {
            if source == streams::Source::Right {
                // We don't care about this error, since it is from neither a tunnel we launched
                // nor one that we're waiting on.
                return;
            }
            if !building {
                // We aren't building our own tunnels, so our errors are
                // secondary reports of other tunnels' failures.
                err = Error::PendingFailed(Box::new(err));
            }
            retry_err.push_timed(err, runtime.now(), Some(runtime.wallclock()));
        }
        /// Return a string describing what it means, within the context of this
        /// function, to have gotten an answer from `source`.
        fn describe_source(building: bool, source: streams::Source) -> &'static str {
            match (building, source) {
                (_, streams::Source::Right) => "optimistic advice",
                (true, streams::Source::Left) => "tunnel we're building",
                (false, streams::Source::Left) => "pending tunnel",
            }
        }

        // Get or make a stream of futures to wait on.
        let (building, wait_on_stream) = match act {
            Action::Open(c) => {
                // There's already a perfectly good open tunnel; we can return
                // it now.
                trace!("Returning existing tunnel.");
                return Ok((c, TunnelProvenance::Preexisting));
            }
            Action::Wait(f) => {
                // There is one or more pending tunnel that we're waiting for.
                // If any succeeds, we try to use it. If they all fail, we
                // fail.
                trace!("Waiting for tunnel.");
                (false, f)
            }
            Action::Build(plans) => {
                // We're going to launch one or more tunnels in parallel. We
                // report success if any succeeds, and failure if they all fail.
                trace!("Building new tunnel.");
                let futures = FuturesUnordered::new();
                for plan in plans {
                    let self_clone = Arc::clone(&self);
                    // (This is where we actually launch tunnels.)
                    futures.push(self_clone.spawn_launch(usage, plan));
                }
                (true, futures)
            }
        };

        // Insert ourselves into the list of pending requests, and make a
        // stream for us to listen on for notification from pending tunnels
        // other than those we are pending on.
        let (pending_request, additional_stream) = {
            // We don't want this queue to participate in memory quota tracking.
            // There isn't any tunnel yet, so there wouldn't be anything to account it to.
            // If this queue has the oldest data, probably the whole system is badly broken.
            // Tearing down the whole tunnel manager won't help.
            let (send, recv) = mpsc_channel_no_memquota(8);
            let pending = Arc::new(PendingRequest {
                usage: usage.clone(),
                notify: send,
            });

            let mut list = self.tunnels.lock().expect("poisoned lock");
            list.add_pending_request(&pending);

            (pending, recv)
        };

        // We use our "select_biased" stream combiner here to ensure that:
        // 1) Circuits from wait_on_stream (the ones we're pending on) are
        //    preferred.
        // 2) We exit this function when those tunnels are exhausted.
        // 3) We still get notified about other tunnels that might meet our
        //    interests.
        //
        // The events from the Left stream are the ones that we explicitly asked
        // for, so we'll treat errors there as real problems. The events from the
        // Right stream are ones that we got opportunistically told about; it's
        // not a big deal if those fail.
        let mut incoming = streams::select_biased(wait_on_stream, additional_stream.map(Ok));

        let mut retry_error = RetryError::in_attempt_to("wait for tunnels");

        while let Some((src, id)) = incoming.next().await {
            match id {
                Ok(Ok(ref id)) => {
                    // Great, we have a tunnel. See if we can use it!
                    let mut list = self.tunnels.lock().expect("poisoned lock");
                    if let Some(ent) = list.get_open_mut(id) {
                        let now = self.runtime.now();
                        match ent.restrict_mut(usage, now) {
                            Ok(()) => {
                                // Great, this will work. We drop the
                                // pending request now explicitly to remove
                                // it from the list.
                                drop(pending_request);
                                if matches!(ent.expiration, ExpirationInfo::Unused { .. }) {
                                    let try_to_expire_after = if ent.spec.is_long_lived() {
                                        self.circuit_timing().disused_circuit_timeout
                                    } else {
                                        self.circuit_timing().max_dirtiness
                                    };
                                    // Since this tunnel hasn't been used yet, schedule expiration
                                    // task after `try_to_expire_after` from now.
                                    spawn_expiration_task(
                                        &self.runtime,
                                        Arc::downgrade(&self),
                                        ent.tunnel.id(),
                                        now + try_to_expire_after,
                                    );
                                }
                                return Ok((ent.tunnel.clone(), TunnelProvenance::NewlyCreated));
                            }
                            Err(e) => {
                                // In this case, a `UsageMismatched` error just means that we lost the race
                                // to restrict this tunnel.
                                let e = match e {
                                    Error::UsageMismatched(e) => Error::LostUsabilityRace(e),
                                    x => x,
                                };
                                if src == streams::Source::Left {
                                    info_report!(
                                        &e,
                                        "{} suggested we use {:?}, but restrictions failed",
                                        describe_source(building, src),
                                        id,
                                    );
                                } else {
                                    debug_report!(
                                        &e,
                                        "{} suggested we use {:?}, but restrictions failed",
                                        describe_source(building, src),
                                        id,
                                    );
                                }
                                record_error(&mut retry_error, src, building, e, &self.runtime);
                                continue;
                            }
                        }
                    }
                }
                Ok(Err(ref e)) => {
                    debug!("{} sent error {:?}", describe_source(building, src), e);
                    record_error(&mut retry_error, src, building, e.clone(), &self.runtime);
                }
                Err(oneshot::Canceled) => {
                    debug!(
                        "{} went away (Canceled), quitting take_action right away",
                        describe_source(building, src)
                    );
                    record_error(
                        &mut retry_error,
                        src,
                        building,
                        Error::PendingCanceled,
                        &self.runtime,
                    );
                    return Err(retry_error);
                }
            }

            debug!(
                "While waiting on tunnel: {:?} from {}",
                id,
                describe_source(building, src)
            );
        }

        // Nothing worked. We drop the pending request now explicitly
        // to remove it from the list. (We could just let it get dropped
        // implicitly, but that's a bit confusing.)
        drop(pending_request);

        Err(retry_error)
    }
1400
1401 /// Given a directory and usage, compute the necessary objects to
1402 /// build a tunnel: A [`PendingEntry`] to keep track of the in-process
1403 /// tunnel, and a [`TunnelBuildPlan`] that we'll give to the thread
1404 /// that will build the tunnel.
1405 ///
1406 /// The caller should probably add the resulting `PendingEntry` to
1407 /// `self.circs`.
1408 ///
1409 /// This is an internal function that we call when we're pretty sure
1410 /// we want to build a tunnel.
1411 #[allow(clippy::type_complexity)]
1412 fn plan_by_usage(
1413 &self,
1414 dir: DirInfo<'_>,
1415 usage: &TargetTunnelUsage,
1416 ) -> Result<(Arc<PendingEntry<B, R>>, TunnelBuildPlan<B, R>)> {
1417 let (plan, bspec) = self.builder.plan_tunnel(usage, dir)?;
1418 let (pending, sender) = PendingEntry::new(&bspec);
1419 let pending = Arc::new(pending);
1420
1421 let plan = TunnelBuildPlan {
1422 plan,
1423 sender,
1424 pending: Arc::clone(&pending),
1425 };
1426
1427 Ok((pending, plan))
1428 }
1429
1430 /// Launch a managed tunnel for a target usage, without checking
1431 /// whether one already exists or is pending.
1432 ///
1433 /// Return a listener that will be informed when the tunnel is done.
1434 #[instrument(level = "trace", skip_all)]
1435 pub(crate) fn launch_by_usage(
1436 self: &Arc<Self>,
1437 usage: &TargetTunnelUsage,
1438 dir: DirInfo<'_>,
1439 ) -> Result<Shared<oneshot::Receiver<PendResult<B, R>>>> {
1440 let (pending, plan) = self.plan_by_usage(dir, usage)?;
1441
1442 self.tunnels
1443 .lock()
1444 .expect("Poisoned lock for tunnel list")
1445 .add_pending_tunnel(pending);
1446
1447 Ok(Arc::clone(self).spawn_launch(usage, plan))
1448 }
1449
    /// Spawn a background task to launch a tunnel, and report its status.
    ///
    /// The `usage` argument is the usage from the original request that made
    /// us build this tunnel.
    ///
    /// Returns a shared receiver that resolves once the build attempt
    /// completes (successfully or not).
    #[instrument(level = "trace", skip_all)]
    fn spawn_launch(
        self: Arc<Self>,
        usage: &TargetTunnelUsage,
        plan: TunnelBuildPlan<B, R>,
    ) -> Shared<oneshot::Receiver<PendResult<B, R>>> {
        let _ = usage; // Currently unused.
        let TunnelBuildPlan {
            mut plan,
            sender,
            pending,
        } = plan;
        let request_loyalty = self.circuit_timing().request_loyalty;

        let wait_on_future = pending.receiver.clone();
        let runtime = self.runtime.clone();
        let runtime_copy = self.runtime.clone();

        // Random id used only to name the block_advance reason below.
        let tid = rand::random::<u64>();
        // We release this block when the tunnel builder task terminates.
        let reason = format!("tunnel builder task {}", tid);
        runtime.block_advance(reason.clone());
        // During tests, the `FakeBuilder` will need to release the block in order to fake a timeout
        // correctly.
        plan.add_blocked_advance_reason(reason);

        runtime
            .spawn(async move {
                let self_clone = Arc::clone(&self);
                // catch_unwind so that a panic in the builder still gets
                // reported to whoever is waiting on `sender`.
                let future = AssertUnwindSafe(self_clone.do_launch(plan, pending)).catch_unwind();
                let (new_spec, reply) = match future.await {
                    Ok(x) => x, // Success or regular failure
                    Err(e) => {
                        // Okay, this is a panic. We have to tell the calling
                        // thread about it, then exit this tunnel builder task.
                        let _ = sender.send(Err(internal!("tunnel build task panicked").into()));
                        std::panic::panic_any(e);
                    }
                };

                // Tell anybody who was listening that this
                // tunnel is now usable or failed.
                //
                // (We ignore any errors from `send`: That just means that nobody
                // was waiting for this tunnel.)
                let _ = sender.send(reply.clone());

                if let Some(new_spec) = new_spec {
                    // Wait briefly before we notify opportunistically. This
                    // delay will give the tunnels that were originally
                    // specifically intended for a request a little more time
                    // to finish, before we offer it this tunnel instead.
                    let sl = runtime_copy.sleep(request_loyalty);
                    runtime_copy.allow_one_advance(request_loyalty);
                    sl.await;

                    let pending = {
                        let list = self.tunnels.lock().expect("poisoned lock");
                        list.find_pending_requests(&new_spec)
                    };
                    for pending_request in pending {
                        let _ = pending_request.notify.clone().try_send(reply.clone());
                    }
                }
                runtime_copy.release_advance(format!("tunnel builder task {}", tid));
            })
            .expect("Couldn't spawn tunnel-building task");

        wait_on_future
    }
1524
    /// Run in the background to launch a tunnel. Return a 2-tuple of the new
    /// tunnel spec and the outcome that should be sent to the initiator.
    ///
    /// (The spec is `None` when the tunnel failed or was cancelled, so
    /// that `spawn_launch` does not notify other pending requests.)
    #[instrument(level = "trace", skip_all)]
    async fn do_launch(
        self: Arc<Self>,
        plan: <B as AbstractTunnelBuilder<R>>::Plan,
        pending: Arc<PendingEntry<B, R>>,
    ) -> (Option<SupportedTunnelUsage>, PendResult<B, R>) {
        let outcome = self.builder.build_tunnel(plan).await;

        match outcome {
            Err(e) => (None, Err(e)),
            Ok((new_spec, tunnel)) => {
                let id = tunnel.id();

                // Schedule a task that will expire this tunnel if it
                // remains unused for too long.
                let use_duration = self.pick_use_duration();
                let now = self.runtime.now();
                let exp_inst = now + use_duration;
                let runtime_copy = self.runtime.clone();
                spawn_expiration_task(&runtime_copy, Arc::downgrade(&self), tunnel.id(), exp_inst);
                // I used to call restrict_mut here, but now I'm not so
                // sure. Doing restrict_mut makes sure that this
                // tunnel will be suitable for the request that asked
                // for us in the first place, but that should be
                // ensured anyway by our tracking its tentative
                // assignment.
                //
                // new_spec.restrict_mut(&usage_copy).unwrap();
                let use_before = ExpirationInfo::new(now);
                let open_ent = OpenEntry::new(new_spec.clone(), tunnel, use_before);
                {
                    let mut list = self.tunnels.lock().expect("poisoned lock");
                    // Finally, before we return this tunnel, we need to make
                    // sure that this pending tunnel is still pending. (If it
                    // is not pending, then it was cancelled through a call to
                    // `retire_all_tunnels`, and the configuration that we used
                    // to launch it is now sufficiently outdated that we should
                    // no longer give this tunnel to a client.)
                    if list.tunnel_is_pending(&pending) {
                        list.add_open(open_ent);
                        // We drop our reference to 'pending' here:
                        // this should make all the weak references to
                        // the `PendingEntry` become dangling.
                        drop(pending);
                        (Some(new_spec), Ok(id))
                    } else {
                        // This tunnel is no longer pending! It must have been cancelled, probably
                        // by a call to retire_all_tunnels()
                        drop(pending); // ibid
                        (None, Err(Error::CircCanceled))
                    }
                }
            }
        }
    }
1580
1581 /// Return the currently configured expiration parameters.
1582 fn expiration_params(&self) -> ExpirationParameters {
1583 let expire_unused_after = self.pick_use_duration();
1584 let expire_dirty_after = self.circuit_timing().max_dirtiness;
1585 let expire_disused_after = self.circuit_timing().disused_circuit_timeout;
1586
1587 ExpirationParameters {
1588 expire_unused_after,
1589 expire_dirty_after,
1590 expire_disused_after,
1591 }
1592 }
1593
1594 /// Plan and launch a new tunnel to a given target, bypassing our managed
1595 /// pool of tunnels.
1596 ///
1597 /// This method will always return a new tunnel, and never return a tunnel
1598 /// that this CircMgr gives out for anything else.
1599 ///
1600 /// The new tunnel will participate in the guard and timeout apparatus as
1601 /// appropriate, no retry attempt will be made if the tunnel fails.
1602 #[cfg(feature = "hs-common")]
1603 #[instrument(level = "trace", skip_all)]
1604 pub(crate) async fn launch_unmanaged(
1605 &self,
1606 usage: &TargetTunnelUsage,
1607 dir: DirInfo<'_>,
1608 ) -> Result<(SupportedTunnelUsage, B::Tunnel)> {
1609 let (_, plan) = self.plan_by_usage(dir, usage)?;
1610 self.builder.build_tunnel(plan.plan).await
1611 }
1612
1613 /// Remove the tunnel with a given `id` from this manager.
1614 ///
1615 /// After this function is called, that tunnel will no longer be handed
1616 /// out to any future requests.
1617 ///
1618 /// Return None if we have no tunnel with the given ID.
1619 pub(crate) fn take_tunnel(
1620 &self,
1621 id: &<B::Tunnel as AbstractTunnel>::Id,
1622 ) -> Option<Arc<B::Tunnel>> {
1623 let mut list = self.tunnels.lock().expect("poisoned lock");
1624 list.take_open(id).map(|e| e.tunnel)
1625 }
1626
1627 /// Remove all open and pending tunnels and from this manager, to ensure
1628 /// they can't be given out for any more requests.
1629 ///
1630 /// Calling `retire_all_tunnels` ensures that any tunnel request that gets
1631 /// an answer _after this method runs_ will receive a tunnel that was
1632 /// launched _after this method runs_.
1633 ///
1634 /// We call this method this when our configuration changes in such a way
1635 /// that we want to make sure that any new (or pending) requests will
1636 /// receive tunnels that are built using the new configuration.
1637 //
1638 // For more information, see documentation on [`CircuitList::open_circs`],
1639 // [`CircuitList::pending_circs`], and comments in `do_launch`.
1640 pub(crate) fn retire_all_tunnels(&self) {
1641 let mut list = self.tunnels.lock().expect("poisoned lock");
1642 list.clear_all_tunnels();
1643 }
1644
    /// Expire tunnels according to the rules in `config` and the
    /// current time `now`.
    ///
    /// Expired tunnels will not be automatically closed, but they will
    /// no longer be given out for new tunnels.
    ///
    /// Return the earliest time at which any current tunnel will expire.
    pub(crate) async fn expire_tunnels(&self, now: Instant) -> Option<Instant> {
        let expiration_params = self.expiration_params();

        // While holding the lock, we call TunnelList::expire_tunnels.
        // That function will expire what it can, and return a list of the tunnels for which
        // we need to call `disused_since`.
        let (mut earliest_expiration, need_to_check) = {
            let mut list = self.tunnels.lock().expect("poisoned lock");
            list.expire_tunnels(now, &expiration_params)
        };

        // Now we've dropped the lock, and can do async checks.
        let mut last_known_usage = Vec::new();
        for tunnel in need_to_check {
            let Some(tunnel) = Weak::upgrade(&tunnel) else {
                continue; // The tunnel is already gone.
            };
            last_known_usage.push((tunnel.id(), tunnel.last_known_to_be_used_at().await));
        }

        // Now get the lock again, and tell the list what we learned.
        //
        // Note that if this function is called twice simultaneously, in some corner cases, we might
        // decide to expire something twice. That's okay.
        {
            let mut list = self.tunnels.lock().expect("poisoned lock");
            for (id, disused_since) in last_known_usage {
                match list.update_long_lived_tunnel_last_used(
                    &id,
                    now,
                    &expiration_params,
                    &disused_since,
                ) {
                    Ok(Some(may_expire)) => {
                        // Keep the minimum of the candidate expiration times.
                        earliest_expiration = match earliest_expiration {
                            Some(exp) if exp < may_expire => Some(exp),
                            _ => Some(may_expire),
                        };
                    }
                    Ok(None) => {}
                    Err(e) => warn_report!(e, "Error while updating status on tunnel {:?}", id),
                }
            }
        }

        earliest_expiration
    }
1699
    /// Consider expiring the tunnel with given tunnel `id`,
    /// according to the rules in `config` and the current time `now`.
    ///
    /// Returns None if the circuit is expired; otherwise returns the next time at which the circuit may expire.
    pub(crate) async fn consider_expiring_tunnel(
        &self,
        tun_id: &<B::Tunnel as AbstractTunnel>::Id,
        now: Instant,
    ) -> Result<Option<Instant>> {
        let expiration_params = self.expiration_params();

        // With the lock, call TunnelList::tunnel_should_expire, and expire it (or don't)
        // if the decision is obvious.
        let tunnel = {
            let mut list: sync::MutexGuard<'_, TunnelList<B, R>> =
                self.tunnels.lock().expect("poisoned lock");
            let Some(should_expire) = list.tunnel_should_expire(tun_id, now, &expiration_params)
            else {
                return Ok(None);
            };
            match should_expire {
                ShouldExpire::Now => {
                    let _discard = list.take_open(tun_id);
                    return Ok(None);
                }
                ShouldExpire::NotBefore(t) => return Ok(Some(t)),
                ShouldExpire::PossiblyNow => {
                    let Some(tunnel_ent) = list.get_open_mut(tun_id) else {
                        return Ok(None);
                    };
                    Arc::clone(&tunnel_ent.tunnel)
                }
            }
        };

        // If we get here, then we have a long-lived tunnel for which we need to check `disused_since`.
        // (This await must happen without holding the tunnel-list lock.)
        let last_known_in_use_at = tunnel.last_known_to_be_used_at().await;

        // Now we tell the TunnelList what we learned.
        {
            let mut list: sync::MutexGuard<'_, TunnelList<B, R>> =
                self.tunnels.lock().expect("poisoned lock");
            list.update_long_lived_tunnel_last_used(
                tun_id,
                now,
                &expiration_params,
                &last_known_in_use_at,
            )
        }
    }
1750
1751 /// Return the number of open tunnels held by this tunnel manager.
1752 pub(crate) fn n_tunnels(&self) -> usize {
1753 let list = self.tunnels.lock().expect("poisoned lock");
1754 list.open_tunnels.len()
1755 }
1756
1757 /// Return the number of pending tunnels tracked by this tunnel manager.
1758 #[cfg(test)]
1759 pub(crate) fn n_pending_tunnels(&self) -> usize {
1760 let list = self.tunnels.lock().expect("poisoned lock");
1761 list.pending_tunnels.len()
1762 }
1763
    /// Get a reference to this manager's runtime.
    pub(crate) fn peek_runtime(&self) -> &R {
        // Plain borrow of the stored runtime; callers clone it if they need ownership.
        &self.runtime
    }
1768
    /// Get a reference to this manager's builder.
    pub(crate) fn peek_builder(&self) -> &B {
        // Plain borrow of the stored builder; used (among other things) by tests.
        &self.builder
    }
1773
1774 /// Pick a duration by when a new tunnel should expire from now
1775 /// if it has not yet been used
1776 fn pick_use_duration(&self) -> Duration {
1777 let timings = self
1778 .unused_timing
1779 .lock()
1780 .expect("Poisoned lock for unused_timing");
1781
1782 if self.builder.learning_timeouts() {
1783 timings.learning
1784 } else {
1785 // TODO: In Tor, this calculation also depends on
1786 // stuff related to predicted ports and channel
1787 // padding.
1788 use tor_basic_utils::RngExt as _;
1789 let mut rng = rand::rng();
1790 rng.gen_range_checked(timings.not_learning..=timings.not_learning * 2)
1791 .expect("T .. 2x T turned out to be an empty duration range?!")
1792 }
1793 }
1794}
1795
1796/// Spawn an expiration task that expires a tunnel at given instant.
1797///
1798/// When the timeout occurs, if the tunnel manager is still present,
1799/// the task will ask the manager to expire the tunnel, if the tunnel
1800/// is ready to expire.
1801//
1802// TODO: It would be good to do away with this function entirely, and have a smarter expiration
1803// function. This one only exists because there is not an "expire some circuits" background task.
1804fn spawn_expiration_task<B, R>(
1805 runtime: &R,
1806 circmgr: Weak<AbstractTunnelMgr<B, R>>,
1807 circ_id: <<B as AbstractTunnelBuilder<R>>::Tunnel as AbstractTunnel>::Id,
1808 exp_inst: Instant,
1809) where
1810 R: Runtime,
1811 B: 'static + AbstractTunnelBuilder<R>,
1812{
1813 let now = runtime.now();
1814 let rt_copy = runtime.clone();
1815 let mut duration = exp_inst.saturating_duration_since(now);
1816
1817 // NOTE: Once there was an optimization here that ran the expiration immediately if
1818 // `duration` was zero.
1819 // I discarded that optimization when I made `consider_expiring_tunnel` async,
1820 // since we really want this function _not_ to be async,
1821 // because we run it in contexts where we hold a Mutex on the tunnel list.
1822
1823 // Spawn a timer expiration task with given expiration instant.
1824 if let Err(e) = runtime.spawn(async move {
1825 loop {
1826 rt_copy.sleep(duration).await;
1827 let cm = if let Some(cm) = Weak::upgrade(&circmgr) {
1828 cm
1829 } else {
1830 return;
1831 };
1832 match cm.consider_expiring_tunnel(&circ_id, exp_inst).await {
1833 Ok(None) => return,
1834 Ok(Some(when)) => {
1835 duration = when.saturating_duration_since(rt_copy.now());
1836 }
1837 Err(e) => {
1838 warn_report!(
1839 e,
1840 "Error while considering expiration for tunnel {:?}",
1841 circ_id
1842 );
1843 return;
1844 }
1845 }
1846 }
1847 }) {
1848 warn_report!(e, "Unable to launch expiration task");
1849 }
1850}
1851
1852#[cfg(test)]
1853mod test {
1854 // @@ begin test lint list maintained by maint/add_warning @@
1855 #![allow(clippy::bool_assert_comparison)]
1856 #![allow(clippy::clone_on_copy)]
1857 #![allow(clippy::dbg_macro)]
1858 #![allow(clippy::mixed_attributes_style)]
1859 #![allow(clippy::print_stderr)]
1860 #![allow(clippy::print_stdout)]
1861 #![allow(clippy::single_char_pattern)]
1862 #![allow(clippy::unwrap_used)]
1863 #![allow(clippy::unchecked_time_subtraction)]
1864 #![allow(clippy::useless_vec)]
1865 #![allow(clippy::needless_pass_by_value)]
1866 //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1867 use super::*;
1868 use crate::isolation::test::{IsolationTokenEq, assert_isoleq};
1869 use crate::mocks::{FakeBuilder, FakeCirc, FakeId, FakeOp};
1870 use crate::usage::{ExitPolicy, SupportedTunnelUsage};
1871 use crate::{
1872 Error, IsolationToken, StreamIsolation, TargetPort, TargetPorts, TargetTunnelUsage,
1873 };
1874 use std::sync::LazyLock;
1875 use tor_dircommon::fallback::FallbackList;
1876 use tor_guardmgr::TestConfig;
1877 use tor_llcrypto::pk::ed25519::Ed25519Identity;
1878 use tor_netdir::testnet;
1879 use tor_persist::TestingStateMgr;
1880 use tor_rtcompat::SleepProvider;
1881 use tor_rtmock::MockRuntime;
1882 use web_time_compat::InstantExt;
1883
1884 #[allow(deprecated)] // TODO #1885
1885 use tor_rtmock::MockSleepRuntime;
1886
    /// An empty fallback list, shared by every test that needs a `DirInfo`.
    static FALLBACKS_EMPTY: LazyLock<FallbackList> = LazyLock::new(|| [].into());

    /// Return a `DirInfo` backed by the (empty) shared fallback list above.
    fn di() -> DirInfo<'static> {
        (&*FALLBACKS_EMPTY).into()
    }
1892
    /// Convert a `TargetTunnelUsage` into the corresponding
    /// `SupportedTunnelUsage` that a tunnel built for it would advertise.
    ///
    /// Only the `Exit` variant is handled; any other variant panics via
    /// `unimplemented!`.
    fn target_to_spec(target: &TargetTunnelUsage) -> SupportedTunnelUsage {
        match target {
            TargetTunnelUsage::Exit {
                ports,
                isolation,
                country_code,
                require_stability,
            } => SupportedTunnelUsage::Exit {
                policy: ExitPolicy::from_target_ports(&TargetPorts::from(&ports[..])),
                isolation: Some(isolation.clone()),
                country_code: country_code.clone(),
                all_relays_stable: *require_stability,
            },
            _ => unimplemented!(),
        }
    }
1909
    /// Token-aware equality for `OpenEntry`, used by `assert_isoleq!` in these tests.
    impl<U: PartialEq> IsolationTokenEq for OpenEntry<U> {
        fn isol_eq(&self, other: &Self) -> bool {
            // Equal iff the specs agree under isolation-token equality and the
            // tunnel and expiration fields compare equal exactly.
            self.spec.isol_eq(&other.spec)
                && self.tunnel == other.tunnel
                && self.expiration == other.expiration
        }
    }
1917
    /// Token-aware equality for `&mut OpenEntry`, mirroring the by-value impl
    /// so lists of mutable references can be compared with `assert_isoleq!`.
    impl<U: PartialEq> IsolationTokenEq for &mut OpenEntry<U> {
        fn isol_eq(&self, other: &Self) -> bool {
            self.spec.isol_eq(&other.spec)
                && self.tunnel == other.tunnel
                && self.expiration == other.expiration
        }
    }
1925
1926 fn make_builder<R: Runtime>(runtime: &R) -> FakeBuilder<R> {
1927 let state_mgr = TestingStateMgr::new();
1928 let guard_config = TestConfig::default();
1929 FakeBuilder::new(runtime, state_mgr, &guard_config)
1930 }
1931
    /// End-to-end smoke test: launching, sharing, taking, and relaunching
    /// tunnels, plus `launch_by_usage`.
    #[test]
    fn basic_tests() {
        MockRuntime::test_with_various(|rt| async move {
            #[allow(deprecated)] // TODO #1885
            let rt = MockSleepRuntime::new(rt);

            let builder = make_builder(&rt);

            let mgr = Arc::new(AbstractTunnelMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));

            let webports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);

            // Check initialization.
            assert_eq!(mgr.n_tunnels(), 0);
            assert!(mgr.peek_builder().script.lock().unwrap().is_empty());

            // Launch a tunnel; make sure we get it.
            let c1 = rt.wait_for(mgr.get_or_launch(&webports, di())).await;
            let c1 = c1.unwrap().0;
            assert_eq!(mgr.n_tunnels(), 1);

            // Make sure we get the one we already made if we ask for it.
            let port80 = TargetTunnelUsage::new_from_ipv4_ports(&[80]);
            let c2 = mgr.get_or_launch(&port80, di()).await;

            let c2 = c2.unwrap().0;
            assert!(FakeCirc::eq(&c1, &c2));
            assert_eq!(mgr.n_tunnels(), 1);

            // Now try launching two tunnels "at once" to make sure that our
            // pending-tunnel code works.

            let dnsport = TargetTunnelUsage::new_from_ipv4_ports(&[53]);
            let dnsport_restrict = TargetTunnelUsage::Exit {
                ports: vec![TargetPort::ipv4(53)],
                isolation: StreamIsolation::builder().build().unwrap(),
                country_code: None,
                require_stability: false,
            };

            let (c3, c4) = rt
                .wait_for(futures::future::join(
                    mgr.get_or_launch(&dnsport, di()),
                    mgr.get_or_launch(&dnsport_restrict, di()),
                ))
                .await;

            let c3 = c3.unwrap().0;
            let c4 = c4.unwrap().0;
            assert!(!FakeCirc::eq(&c1, &c3));
            assert!(FakeCirc::eq(&c3, &c4));
            assert_eq!(c3.id(), c4.id());
            assert_eq!(mgr.n_tunnels(), 2);

            // Now we're going to remove c3 from consideration. It's the
            // same as c4, so removing c4 will give us None.
            let c3_taken = mgr.take_tunnel(&c3.id()).unwrap();
            let now_its_gone = mgr.take_tunnel(&c4.id());
            assert!(FakeCirc::eq(&c3_taken, &c3));
            assert!(now_its_gone.is_none());
            assert_eq!(mgr.n_tunnels(), 1);

            // Having removed them, let's launch another dnsport and make
            // sure we get a different tunnel.
            let c5 = rt.wait_for(mgr.get_or_launch(&dnsport, di())).await;
            let c5 = c5.unwrap().0;
            assert!(!FakeCirc::eq(&c3, &c5));
            assert!(!FakeCirc::eq(&c4, &c5));
            assert_eq!(mgr.n_tunnels(), 2);

            // Now try launch_by_usage.
            let prev = mgr.n_pending_tunnels();
            assert!(mgr.launch_by_usage(&dnsport, di()).is_ok());
            assert_eq!(mgr.n_pending_tunnels(), prev + 1);
            // TODO: Actually make sure that launch_by_usage launched
            // the right thing.
        });
    }
2014
    /// A build that fails once and then times out should surface as
    /// `Error::RequestFailed`.
    #[test]
    fn request_timeout() {
        MockRuntime::test_with_various(|rt| async move {
            #[allow(deprecated)] // TODO #1885
            let rt = MockSleepRuntime::new(rt);

            let ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);

            // This will fail once, and then completely time out. The
            // result will be a failure.
            let builder = make_builder(&rt);
            builder.set(&ports, vec![FakeOp::Fail, FakeOp::Timeout]);

            let mgr = Arc::new(AbstractTunnelMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));
            let c1 = mgr
                .peek_runtime()
                .wait_for(mgr.get_or_launch(&ports, di()))
                .await;

            assert!(matches!(c1, Err(Error::RequestFailed(_))));
        });
    }
2041
    /// A build that takes just under the timeout and then has no plan should
    /// still end up as `Error::RequestFailed`.
    #[test]
    fn request_timeout2() {
        MockRuntime::test_with_various(|rt| async move {
            #[allow(deprecated)] // TODO #1885
            let rt = MockSleepRuntime::new(rt);

            // Now try a more complicated case: we'll try to get things so
            // that we wait for a little over our predicted time because
            // of our wait-for-next-action logic.
            let ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
            let builder = make_builder(&rt);
            builder.set(
                &ports,
                vec![
                    FakeOp::Delay(Duration::from_millis(60_000 - 25)),
                    FakeOp::NoPlan,
                ],
            );

            let mgr = Arc::new(AbstractTunnelMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));
            let c1 = mgr
                .peek_runtime()
                .wait_for(mgr.get_or_launch(&ports, di()))
                .await;

            assert!(matches!(c1, Err(Error::RequestFailed(_))));
        });
    }
2074
    /// Repeated planning failures should surface as `Error::RequestFailed`.
    #[test]
    fn request_unplannable() {
        MockRuntime::test_with_various(|rt| async move {
            #[allow(deprecated)] // TODO #1885
            let rt = MockSleepRuntime::new(rt);

            let ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);

            // This will fail at the planning stage, a lot.
            let builder = make_builder(&rt);
            builder.set(&ports, vec![FakeOp::NoPlan; 2000]);

            let mgr = Arc::new(AbstractTunnelMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));
            let c1 = rt.wait_for(mgr.get_or_launch(&ports, di())).await;

            assert!(matches!(c1, Err(Error::RequestFailed(_))));
        });
    }
2097
    /// More build failures than the retry limit allows should surface as
    /// `Error::RequestFailed`.
    #[test]
    fn request_fails_too_much() {
        MockRuntime::test_with_various(|rt| async move {
            #[allow(deprecated)] // TODO #1885
            let rt = MockSleepRuntime::new(rt);
            let ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);

            // This will fail 1000 times, which is above the retry limit.
            let builder = make_builder(&rt);
            builder.set(&ports, vec![FakeOp::Fail; 1000]);

            let mgr = Arc::new(AbstractTunnelMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));
            let c1 = rt.wait_for(mgr.get_or_launch(&ports, di())).await;

            assert!(matches!(c1, Err(Error::RequestFailed(_))));
        });
    }
2119
    /// A builder that returns a tunnel with the wrong spec should be retried,
    /// and the request should eventually succeed.
    #[test]
    fn request_wrong_spec() {
        MockRuntime::test_with_various(|rt| async move {
            #[allow(deprecated)] // TODO #1885
            let rt = MockSleepRuntime::new(rt);
            let ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);

            // The first time this is called, it will build a tunnel
            // with the wrong spec. (A tunnel builder should never
            // actually _do_ that, but it's something we code for.)
            let builder = make_builder(&rt);
            builder.set(
                &ports,
                vec![FakeOp::WrongSpec(target_to_spec(
                    &TargetTunnelUsage::new_from_ipv4_ports(&[22]),
                ))],
            );

            let mgr = Arc::new(AbstractTunnelMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));
            let c1 = rt.wait_for(mgr.get_or_launch(&ports, di())).await;

            assert!(c1.is_ok());
        });
    }
2148
    /// Two failures followed by a success: both concurrent requests should end
    /// up sharing the successfully-built tunnel.
    #[test]
    fn request_retried() {
        MockRuntime::test_with_various(|rt| async move {
            #[allow(deprecated)] // TODO #1885
            let rt = MockSleepRuntime::new(rt);
            let ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);

            // This will fail twice, and then succeed. The result will be
            // a success.
            let builder = make_builder(&rt);
            builder.set(&ports, vec![FakeOp::Fail, FakeOp::Fail]);

            let mgr = Arc::new(AbstractTunnelMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));

            // This test doesn't exercise any timeout behaviour.
            rt.block_advance("test doesn't require advancing");

            let (c1, c2) = rt
                .wait_for(futures::future::join(
                    mgr.get_or_launch(&ports, di()),
                    mgr.get_or_launch(&ports, di()),
                ))
                .await;

            let c1 = c1.unwrap().0;
            let c2 = c2.unwrap().0;

            assert!(FakeCirc::eq(&c1, &c2));
        });
    }
2183
2184 #[test]
2185 fn isolated() {
2186 MockRuntime::test_with_various(|rt| async move {
2187 #[allow(deprecated)] // TODO #1885
2188 let rt = MockSleepRuntime::new(rt);
2189 let builder = make_builder(&rt);
2190 let mgr = Arc::new(AbstractTunnelMgr::new(
2191 builder,
2192 rt.clone(),
2193 CircuitTiming::default(),
2194 ));
2195
2196 // Set our isolation so that iso1 and iso2 can't share a tunnel,
2197 // but no_iso can share a tunnel with either.
2198 let iso1 = TargetTunnelUsage::Exit {
2199 ports: vec![TargetPort::ipv4(443)],
2200 isolation: StreamIsolation::builder()
2201 .owner_token(IsolationToken::new())
2202 .build()
2203 .unwrap(),
2204 country_code: None,
2205 require_stability: false,
2206 };
2207 let iso2 = TargetTunnelUsage::Exit {
2208 ports: vec![TargetPort::ipv4(443)],
2209 isolation: StreamIsolation::builder()
2210 .owner_token(IsolationToken::new())
2211 .build()
2212 .unwrap(),
2213 country_code: None,
2214 require_stability: false,
2215 };
2216 let no_iso1 = TargetTunnelUsage::new_from_ipv4_ports(&[443]);
2217 let no_iso2 = no_iso1.clone();
2218
2219 // We're going to try launching these tunnels in 24 different
2220 // orders, to make sure that the outcome is correct each time.
2221 use itertools::Itertools;
2222 let timeouts: Vec<_> = [0_u64, 2, 4, 6]
2223 .iter()
2224 .map(|d| Duration::from_millis(*d))
2225 .collect();
2226
2227 for delays in timeouts.iter().permutations(4) {
2228 let d1 = delays[0];
2229 let d2 = delays[1];
2230 let d3 = delays[2];
2231 let d4 = delays[2];
2232 let (c_iso1, c_iso2, c_no_iso1, c_no_iso2) = rt
2233 .wait_for(futures::future::join4(
2234 async {
2235 rt.sleep(*d1).await;
2236 mgr.get_or_launch(&iso1, di()).await
2237 },
2238 async {
2239 rt.sleep(*d2).await;
2240 mgr.get_or_launch(&iso2, di()).await
2241 },
2242 async {
2243 rt.sleep(*d3).await;
2244 mgr.get_or_launch(&no_iso1, di()).await
2245 },
2246 async {
2247 rt.sleep(*d4).await;
2248 mgr.get_or_launch(&no_iso2, di()).await
2249 },
2250 ))
2251 .await;
2252
2253 let c_iso1 = c_iso1.unwrap().0;
2254 let c_iso2 = c_iso2.unwrap().0;
2255 let c_no_iso1 = c_no_iso1.unwrap().0;
2256 let c_no_iso2 = c_no_iso2.unwrap().0;
2257
2258 assert!(!FakeCirc::eq(&c_iso1, &c_iso2));
2259 assert!(!FakeCirc::eq(&c_iso1, &c_no_iso1));
2260 assert!(!FakeCirc::eq(&c_iso1, &c_no_iso2));
2261 assert!(!FakeCirc::eq(&c_iso2, &c_no_iso1));
2262 assert!(!FakeCirc::eq(&c_iso2, &c_no_iso2));
2263 assert!(FakeCirc::eq(&c_no_iso1, &c_no_iso2));
2264 }
2265 });
2266 }
2267
    /// A request whose own build times out should still be satisfied by a
    /// wider tunnel launched for a later request.
    #[test]
    fn opportunistic() {
        MockRuntime::test_with_various(|rt| async move {
            #[allow(deprecated)] // TODO #1885
            let rt = MockSleepRuntime::new(rt);

            // The first request will time out completely, but we're
            // making a second request after we launch it. That
            // request should succeed, and notify the first request.

            let ports1 = TargetTunnelUsage::new_from_ipv4_ports(&[80]);
            let ports2 = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);

            let builder = make_builder(&rt);
            builder.set(&ports1, vec![FakeOp::Timeout]);

            let mgr = Arc::new(AbstractTunnelMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));
            // Note that ports2 will be wider than ports1, so the second
            // request will have to launch a new tunnel.

            let (c1, c2) = rt
                .wait_for(futures::future::join(
                    mgr.get_or_launch(&ports1, di()),
                    async {
                        rt.sleep(Duration::from_millis(100)).await;
                        mgr.get_or_launch(&ports2, di()).await
                    },
                ))
                .await;

            if let (Ok((c1, _)), Ok((c2, _))) = (c1, c2) {
                assert!(FakeCirc::eq(&c1, &c2));
            } else {
                panic!();
            };
        });
    }
2309
    /// `ensure_tunnel()` should pre-build a tunnel that later, narrower
    /// requests can share.
    #[test]
    fn prebuild() {
        MockRuntime::test_with_various(|rt| async move {
            // This time we're going to use ensure_tunnel() to make
            // sure that a tunnel gets built, and then launch two
            // other tunnels that will use it.
            #[allow(deprecated)] // TODO #1885
            let rt = MockSleepRuntime::new(rt);
            let builder = make_builder(&rt);
            let mgr = Arc::new(AbstractTunnelMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));

            let ports1 = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
            let ports2 = TargetTunnelUsage::new_from_ipv4_ports(&[80]);
            let ports3 = TargetTunnelUsage::new_from_ipv4_ports(&[443]);

            let ok = mgr.ensure_tunnel(&ports1, di());
            let (c1, c2) = rt
                .wait_for(futures::future::join(
                    async {
                        rt.sleep(Duration::from_millis(10)).await;
                        mgr.get_or_launch(&ports2, di()).await
                    },
                    async {
                        rt.sleep(Duration::from_millis(50)).await;
                        mgr.get_or_launch(&ports3, di()).await
                    },
                ))
                .await;

            assert!(ok.is_ok());

            let c1 = c1.unwrap().0;
            let c2 = c2.unwrap().0;

            // If we had launched these separately, they wouldn't share
            // a tunnel.
            assert!(FakeCirc::eq(&c1, &c2));
        });
    }
2353
    /// A dirty (used) tunnel should expire after `max_dirtiness`, while a
    /// clean one should survive `expire_tunnels()`.
    #[test]
    fn expiration() {
        MockRuntime::test_with_various(|rt| async move {
            use crate::config::CircuitTimingBuilder;
            // Now let's make some tunnels -- one dirty, one clean, and
            // make sure that one expires and one doesn't.
            #[allow(deprecated)] // TODO #1885
            let rt = MockSleepRuntime::new(rt);
            let builder = make_builder(&rt);

            let circuit_timing = CircuitTimingBuilder::default()
                .max_dirtiness(Duration::from_secs(15))
                .build()
                .unwrap();

            let mgr = Arc::new(AbstractTunnelMgr::new(builder, rt.clone(), circuit_timing));

            let imap = TargetTunnelUsage::new_from_ipv4_ports(&[993]);
            let pop = TargetTunnelUsage::new_from_ipv4_ports(&[995]);

            let ok = mgr.ensure_tunnel(&imap, di());
            let pop1 = rt.wait_for(mgr.get_or_launch(&pop, di())).await;

            assert!(ok.is_ok());
            let pop1 = pop1.unwrap().0;

            rt.advance(Duration::from_secs(30)).await;
            rt.advance(Duration::from_secs(15)).await;
            let imap1 = rt.wait_for(mgr.get_or_launch(&imap, di())).await.unwrap().0;

            // This should expire the pop tunnel, since it came from
            // get_or_launch() [which marks the tunnel as being
            // used]. It should not expire the imap tunnel, since
            // it was not dirty until 15 seconds after the cutoff.
            let now = rt.now();

            mgr.expire_tunnels(now).await;

            let (pop2, imap2) = rt
                .wait_for(futures::future::join(
                    mgr.get_or_launch(&pop, di()),
                    mgr.get_or_launch(&imap, di()),
                ))
                .await;

            let pop2 = pop2.unwrap().0;
            let imap2 = imap2.unwrap().0;

            assert!(!FakeCirc::eq(&pop2, &pop1));
            assert!(FakeCirc::eq(&imap2, &imap1));
        });
    }
2406
2407 /// Returns three exit policies; one that permits nothing, one that permits ports 80
2408 /// and 443 only, and one that permits all ports.
2409 fn get_exit_policies() -> (ExitPolicy, ExitPolicy, ExitPolicy) {
2410 // FIXME(eta): the below is copypasta; would be nice to have a better way of
2411 // constructing ExitPolicy objects for testing maybe
2412 let network = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
2413
2414 // Nodes with ID 0x0a through 0x13 and 0x1e through 0x27 are
2415 // exits. Odd-numbered ones allow only ports 80 and 443;
2416 // even-numbered ones allow all ports.
2417 let id_noexit: Ed25519Identity = [0x05; 32].into();
2418 let id_webexit: Ed25519Identity = [0x11; 32].into();
2419 let id_fullexit: Ed25519Identity = [0x20; 32].into();
2420
2421 let not_exit = network.by_id(&id_noexit).unwrap();
2422 let web_exit = network.by_id(&id_webexit).unwrap();
2423 let full_exit = network.by_id(&id_fullexit).unwrap();
2424
2425 let ep_none = ExitPolicy::from_relay(¬_exit);
2426 let ep_web = ExitPolicy::from_relay(&web_exit);
2427 let ep_full = ExitPolicy::from_relay(&full_exit);
2428 (ep_none, ep_web, ep_full)
2429 }
2430
    /// Exercise `SupportedTunnelUsage::find_supported` against exit and
    /// preemptive usages over entries with none/web/full exit policies.
    #[test]
    fn test_find_supported() {
        let (ep_none, ep_web, ep_full) = get_exit_policies();
        let fake_circ = FakeCirc { id: FakeId::next() };
        let expiration = ExpirationInfo::Unused {
            created: Instant::get(),
        };

        // Three open entries sharing one fake circuit, differing only in policy.
        let mut entry_none = OpenEntry::new(
            SupportedTunnelUsage::Exit {
                policy: ep_none,
                isolation: None,
                country_code: None,
                all_relays_stable: true,
            },
            fake_circ.clone(),
            expiration.clone(),
        );
        let mut entry_none_c = entry_none.clone();
        let mut entry_web = OpenEntry::new(
            SupportedTunnelUsage::Exit {
                policy: ep_web,
                isolation: None,
                country_code: None,
                all_relays_stable: true,
            },
            fake_circ.clone(),
            expiration.clone(),
        );
        let mut entry_web_c = entry_web.clone();
        let mut entry_full = OpenEntry::new(
            SupportedTunnelUsage::Exit {
                policy: ep_full,
                isolation: None,
                country_code: None,
                all_relays_stable: true,
            },
            fake_circ,
            expiration,
        );
        let mut entry_full_c = entry_full.clone();

        let usage_web = TargetTunnelUsage::new_from_ipv4_ports(&[80]);
        let empty: Vec<&mut OpenEntry<FakeCirc>> = vec![];

        // A no-exit entry can never support a web usage.
        assert_isoleq!(
            SupportedTunnelUsage::find_supported(vec![&mut entry_none].into_iter(), &usage_web),
            empty
        );

        // HACK(eta): We have to faff around with clones and such because
        // `abstract_spec_find_supported` has a silly signature that involves `&mut`
        // refs, which we can't have more than one of.

        assert_isoleq!(
            SupportedTunnelUsage::find_supported(
                vec![&mut entry_none, &mut entry_web].into_iter(),
                &usage_web,
            ),
            vec![&mut entry_web_c]
        );

        assert_isoleq!(
            SupportedTunnelUsage::find_supported(
                vec![&mut entry_none, &mut entry_web, &mut entry_full].into_iter(),
                &usage_web,
            ),
            vec![&mut entry_web_c, &mut entry_full_c]
        );

        // Test preemptive tunnel usage:

        let usage_preemptive_web = TargetTunnelUsage::Preemptive {
            port: Some(TargetPort::ipv4(80)),
            circs: 2,
            require_stability: false,
        };
        let usage_preemptive_dns = TargetTunnelUsage::Preemptive {
            port: None,
            circs: 2,
            require_stability: false,
        };

        // shouldn't return anything unless there are >=2 tunnels

        assert_isoleq!(
            SupportedTunnelUsage::find_supported(
                vec![&mut entry_none].into_iter(),
                &usage_preemptive_web
            ),
            empty
        );

        assert_isoleq!(
            SupportedTunnelUsage::find_supported(
                vec![&mut entry_none].into_iter(),
                &usage_preemptive_dns
            ),
            empty
        );

        assert_isoleq!(
            SupportedTunnelUsage::find_supported(
                vec![&mut entry_none, &mut entry_web].into_iter(),
                &usage_preemptive_web
            ),
            empty
        );

        assert_isoleq!(
            SupportedTunnelUsage::find_supported(
                vec![&mut entry_none, &mut entry_web].into_iter(),
                &usage_preemptive_dns
            ),
            vec![&mut entry_none_c, &mut entry_web_c]
        );

        assert_isoleq!(
            SupportedTunnelUsage::find_supported(
                vec![&mut entry_none, &mut entry_web, &mut entry_full].into_iter(),
                &usage_preemptive_web
            ),
            vec![&mut entry_web_c, &mut entry_full_c]
        );
    }
2556
    /// A preemptive target asking for `circs` tunnels should only be satisfied
    /// once that many matching tunnels are in the list.
    #[test]
    fn test_circlist_preemptive_target_circs() {
        MockRuntime::test_with_various(|rt| async move {
            #[allow(deprecated)] // TODO #1885
            let rt = MockSleepRuntime::new(rt);
            let netdir = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
            let dirinfo = DirInfo::Directory(&netdir);

            let builder = make_builder(&rt);

            for circs in [2, 8].iter() {
                let mut circlist = TunnelList::<FakeBuilder<MockRuntime>, MockRuntime>::new();

                let preemptive_target = TargetTunnelUsage::Preemptive {
                    port: Some(TargetPort::ipv4(80)),
                    circs: *circs,
                    require_stability: false,
                };

                // Build tunnels one at a time; the preemptive target must stay
                // unsatisfied until all `circs` of them are present.
                for _ in 0..*circs {
                    assert!(circlist.find_open(&preemptive_target).is_none());

                    let usage = TargetTunnelUsage::new_from_ipv4_ports(&[80]);
                    let (plan, _) = builder.plan_tunnel(&usage, dirinfo).unwrap();
                    let (spec, circ) = rt.wait_for(builder.build_tunnel(plan)).await.unwrap();
                    let entry = OpenEntry::new(
                        spec,
                        circ,
                        ExpirationInfo::new(rt.now() + Duration::from_secs(60)),
                    );
                    circlist.add_open(entry);
                }

                assert!(circlist.find_open(&preemptive_target).is_some());
            }
        });
    }