Skip to main content

tor_circmgr/
mgr.rs

1//! Abstract code to manage a set of tunnels which has underlying circuit(s).
2//!
3//! This module implements the real logic for deciding when and how to
4//! launch tunnels, and for which tunnels to hand out in response to
5//! which requests.
6//!
7//! For testing and abstraction purposes, this module _does not_
8//! actually know anything about tunnels _per se_.  Instead,
9//! everything is handled using a set of traits that are internal to this
10//! crate:
11//!
12//!  * [`AbstractTunnel`] is a view of a tunnel.
13//!  * [`AbstractTunnelBuilder`] knows how to build an `AbstractCirc`.
14//!
15//! Using these traits, the [`AbstractTunnelMgr`] object manages a set of
16//! tunnels , launching them as necessary, and keeping track of the
17//! restrictions on their use.
18
19// TODO:
20// - Testing
21//    - Error from prepare_action()
22//    - Error reported by restrict_mut?
23
24use crate::config::CircuitTiming;
25use crate::usage::{SupportedTunnelUsage, TargetTunnelUsage};
26use crate::{DirInfo, Error, PathConfig, Result, timeouts};
27
28use retry_error::RetryError;
29use tor_async_utils::mpsc_channel_no_memquota;
30use tor_basic_utils::retry::RetryDelay;
31use tor_config::MutCfg;
32use tor_error::{AbsRetryTime, HasRetryTime, debug_report, info_report, internal, warn_report};
33#[cfg(feature = "vanguards")]
34use tor_guardmgr::vanguards::VanguardMgr;
35use tor_linkspec::CircTarget;
36use tor_proto::circuit::UniqId;
37use tor_proto::client::circuit::{CircParameters, Path};
38use tor_rtcompat::{Runtime, SleepProviderExt};
39
40use async_trait::async_trait;
41use futures::channel::mpsc;
42use futures::future::{FutureExt, Shared};
43use futures::stream::{FuturesUnordered, StreamExt};
44use oneshot_fused_workaround as oneshot;
45use std::collections::HashMap;
46use std::fmt::Debug;
47use std::hash::Hash;
48use std::panic::AssertUnwindSafe;
49use std::sync::{self, Arc, Weak};
50use tor_rtcompat::SpawnExt;
51use tracing::{debug, instrument, trace, warn};
52use web_time_compat::{Duration, Instant};
53mod streams;
54
55/// Alias to force use of RandomState, regardless of features enabled in `weak_tables`.
56///
57/// See <https://github.com/tov/weak-table-rs/issues/23> for discussion.
58///
59/// (We could probably get away with a weaker hash function in this case, since
60/// the attacker _probably_ doesn't have control over our pointers.)
61type PtrWeakHashSet<T> = weak_table::PtrWeakHashSet<T, std::hash::RandomState>;
62
63/// Description of how we got a tunnel.
64#[non_exhaustive]
65#[derive(Debug, Copy, Clone, Eq, PartialEq)]
66pub(crate) enum TunnelProvenance {
67    /// This channel was newly launched, or was in progress and finished while
68    /// we were waiting.
69    NewlyCreated,
70    /// This channel already existed when we asked for it.
71    Preexisting,
72}
73
74/// An error returned when we cannot apply circuit restriction.
75#[derive(Clone, Debug, thiserror::Error)]
76#[non_exhaustive]
77pub enum RestrictionFailed {
78    /// Tried to restrict a specification, but the tunnel didn't support the
79    /// requested usage.
80    #[error("Specification did not support desired usage")]
81    NotSupported,
82}
83
84/// Minimal abstract view of a tunnel.
85///
86/// From this module's point of view, tunnels are simply objects
87/// with unique identities, and a possible closed-state.
88#[async_trait]
89pub(crate) trait AbstractTunnel: Debug {
90    /// Type for a unique identifier for tunnels.
91    type Id: Clone + Debug + Hash + Eq + Send + Sync;
92    /// Return the unique identifier for this tunnel.
93    ///
94    /// # Requirements
95    ///
96    /// The values returned by this function are unique for distinct
97    /// tunnels.
98    fn id(&self) -> Self::Id;
99
100    /// Return true if this tunnel is usable for some purpose.
101    ///
102    /// Reasons a tunnel might be unusable include being closed.
103    fn usable(&self) -> bool;
104
105    /// Return a list of [`Path`] objects describing the only circuit in this tunnel.
106    ///
107    /// Returns an error if the tunnel has more than one tunnel.
108    fn single_path(&self) -> tor_proto::Result<Arc<Path>>;
109
110    /// Return the number of hops in this tunnel.
111    ///
112    /// Returns an error if the circuit is closed.
113    ///
114    /// NOTE: This function will currently return only the number of hops
115    /// _currently_ in the tunnel. If there is an extend operation in progress,
116    /// the currently pending hop may or may not be counted, depending on whether
117    /// the extend operation finishes before this call is done.
118    fn n_hops(&self) -> tor_proto::Result<usize>;
119
120    /// Return true if this tunnel is closed and therefore unusable.
121    fn is_closing(&self) -> bool;
122
123    /// Return a process-unique identifier for this tunnel.
124    fn unique_id(&self) -> UniqId;
125
126    /// Extend the tunnel via the most appropriate handshake to a new `target` hop.
127    async fn extend<T: CircTarget + Sync>(
128        &self,
129        target: &T,
130        params: CircParameters,
131    ) -> tor_proto::Result<()>;
132
133    /// Return a time at which this tunnel is last known to be used,
134    /// or None if it is in use right now (or has never been used).
135    async fn last_known_to_be_used_at(&self) -> tor_proto::Result<Option<Instant>>;
136}
137
138/// A plan for an `AbstractCircBuilder` that can maybe be mutated by tests.
139///
140/// You should implement this trait using all default methods for all code that isn't test code.
141pub(crate) trait MockablePlan {
142    /// Add a reason string that was passed to `SleepProvider::block_advance()` to this object
143    /// so that it knows what to pass to `::release_advance()`.
144    fn add_blocked_advance_reason(&mut self, _reason: String) {}
145}
146
147/// An object that knows how to build tunnels.
148///
149/// This creates tunnels in two phases. First, a plan is
150/// made for how to build the tunnel. This planning phase should be
151/// relatively fast, and must not suspend or block.  Its purpose is to
152/// get an early estimate of which operations the tunnel will be able
153/// to support when it's done.
154///
155/// Second, the tunnel is actually built, using the plan as input.
156
157#[async_trait]
158pub(crate) trait AbstractTunnelBuilder<R: Runtime>: Send + Sync {
159    /// The tunnel type that this builder knows how to build.
160    type Tunnel: AbstractTunnel + Send + Sync;
161    /// An opaque type describing how a given tunnel will be built.
162    /// It may represent some or all of a path-or it may not.
163    //
164    // TODO: It would be nice to have this parameterized on a lifetime,
165    // and have that lifetime depend on the lifetime of the directory.
166    // But I don't think that rust can do that.
167    //
168    // HACK(eta): I don't like the fact that `MockablePlan` is necessary here.
169    type Plan: Send + Debug + MockablePlan;
170
171    // TODO: I'd like to have a Dir type here to represent
172    // create::DirInfo, but that would need to be parameterized too,
173    // and would make everything complicated.
174
175    /// Form a plan for how to build a new tunnel that supports `usage`.
176    ///
177    /// Return an opaque Plan object, and a new spec describing what
178    /// the tunnel will actually support when it's built.  (For
179    /// example, if the input spec requests a tunnel that connect to
180    /// port 80, then "planning" the tunnel might involve picking an
181    /// exit that supports port 80, and the resulting spec might be
182    /// the exit's complete list of supported ports.)
183    ///
184    /// # Requirements
185    ///
186    /// The resulting Spec must support `usage`.
187    fn plan_tunnel(
188        &self,
189        usage: &TargetTunnelUsage,
190        dir: DirInfo<'_>,
191    ) -> Result<(Self::Plan, SupportedTunnelUsage)>;
192
193    /// Construct a tunnel according to a given plan.
194    ///
195    /// On success, return a spec describing what the tunnel can be used for,
196    /// and the tunnel that was just constructed.
197    ///
198    /// This function should implement some kind of a timeout for
199    /// tunnel that are taking too long.
200    ///
201    /// # Requirements
202    ///
203    /// The spec that this function returns _must_ support the usage
204    /// that was originally passed to `plan_tunnel`.  It _must_ also
205    /// contain the spec that was originally returned by
206    /// `plan_tunnel`.
207    async fn build_tunnel(&self, plan: Self::Plan) -> Result<(SupportedTunnelUsage, Self::Tunnel)>;
208
209    /// Return a "parallelism factor" with which tunnels should be
210    /// constructed for a given purpose.
211    ///
212    /// If this function returns N, then whenever we launch tunnels
213    /// for this purpose, then we launch N in parallel.
214    ///
215    /// The default implementation returns 1.  The value of 0 is
216    /// treated as if it were 1.
217    fn launch_parallelism(&self, usage: &TargetTunnelUsage) -> usize {
218        let _ = usage; // default implementation ignores this.
219        1
220    }
221
222    /// Return a "parallelism factor" for which tunnels should be
223    /// used for a given purpose.
224    ///
225    /// If this function returns N, then whenever we select among
226    /// open tunnels for this purpose, we choose at random from the
227    /// best N.
228    ///
229    /// The default implementation returns 1.  The value of 0 is
230    /// treated as if it were 1.
231    // TODO: Possibly this doesn't belong in this trait.
232    fn select_parallelism(&self, usage: &TargetTunnelUsage) -> usize {
233        let _ = usage; // default implementation ignores this.
234        1
235    }
236
237    /// Return true if we are currently attempting to learn tunnel
238    /// timeouts by building testing tunnels.
239    fn learning_timeouts(&self) -> bool;
240
241    /// Flush state to the state manager if we own the lock.
242    ///
243    /// Return `Ok(true)` if we saved, and `Ok(false)` if we didn't hold the lock.
244    fn save_state(&self) -> Result<bool>;
245
246    /// Return this builder's [`PathConfig`].
247    fn path_config(&self) -> Arc<PathConfig>;
248
249    /// Replace this builder's [`PathConfig`].
250    // TODO: This is dead_code because we only call this for the CircuitBuilder specialization of
251    // CircMgr, not from the generic version, because this trait doesn't provide guardmgr, which is
252    // needed by the [`CircMgr::reconfigure`] function that would be the only caller of this. We
253    // should add `guardmgr` to this trait, make [`CircMgr::reconfigure`] generic, and remove this
254    // dead_code marking.
255    #[allow(dead_code)]
256    fn set_path_config(&self, new_config: PathConfig);
257
258    /// Return a reference to this builder's timeout estimator.
259    fn estimator(&self) -> &timeouts::Estimator;
260
261    /// Return a reference to this builder's `VanguardMgr`.
262    #[cfg(feature = "vanguards")]
263    fn vanguardmgr(&self) -> &Arc<VanguardMgr<R>>;
264
265    /// Replace our state with a new owning state, assuming we have
266    /// storage permission.
267    fn upgrade_to_owned_state(&self) -> Result<()>;
268
269    /// Reload persistent state from disk, if we don't have storage permission.
270    fn reload_state(&self) -> Result<()>;
271
272    /// Return a reference to this builder's `GuardMgr`.
273    fn guardmgr(&self) -> &tor_guardmgr::GuardMgr<R>;
274
275    /// Reconfigure this builder using the latest set of network parameters.
276    ///
277    /// (NOTE: for now, this only affects tunnel timeout estimation.)
278    fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters);
279}
280
281/// Enumeration to track the expiration state of a tunnel.
282///
283/// A tunnel an either be unused (at which point it should expire if it is
284/// _still unused_ by a certain time, or dirty (at which point it should
285/// expire after a certain duration).
286///
287/// All tunnels start out "unused" and become "dirty" when their spec
288/// is first restricted -- that is, when they are first handed out to be
289/// used for a request.
290#[derive(Debug, Clone, PartialEq, Eq)]
291enum ExpirationInfo {
292    /// The tunnel has never been used, and has never been restricted for use with a request.
293    Unused {
294        /// A time when the tunnel was created.
295        created: Instant,
296    },
297
298    /// The tunnel is not-long-lived; we will expire by waiting until a certain amount of time
299    /// after it was first used.
300    Dirty {
301        /// The time at which this tunnel's spec was first restricted.
302        dirty_since: Instant,
303    },
304
305    /// The tunnel is long-lived; we will expire by waiting until it has passed
306    /// a certain amount of time without having any streams attached to it.
307    LongLived {
308        /// Last time at which the tunnel was checked and found not to have any streams.
309        ///
310        /// (This is a bit complicated: We have to be vague here, since we need
311        /// an async check to find out that a tunnel is used, or when it actually
312        /// became disused.)
313        last_known_to_be_used_at: Instant,
314    },
315}
316
317impl ExpirationInfo {
318    /// Return an ExpirationInfo for a newly created tunnel.
319    fn new(now: Instant) -> Self {
320        ExpirationInfo::Unused { created: now }
321    }
322
323    /// Mark this ExpirationInfo as having been in-use at `now`.
324    ///
325    /// If `long_lived` is false, the associated tunnel should expire a certain amount of time
326    /// after it was _first_ used.
327    /// If `long_lived` is true, the associated tunnel should expire a certain amount of time
328    /// after it was _last_ used.
329    fn mark_used(&mut self, now: Instant, long_lived: bool) {
330        if long_lived {
331            *self = ExpirationInfo::LongLived {
332                last_known_to_be_used_at: now,
333            };
334        } else {
335            match self {
336                ExpirationInfo::Unused { .. } => {
337                    // This is our first time using this circuit; mark it dirty
338                    *self = ExpirationInfo::Dirty { dirty_since: now };
339                }
340                ExpirationInfo::Dirty { .. } => {
341                    // no need to update; we're tracking the time when the circuit _first_ became
342                    // dirty, so further uses don't matter.
343                }
344                ExpirationInfo::LongLived { .. } => {
345                    // shouldn't occur: we shouldn't be able to attach a stream with non-long-lived isolation
346                    // to a tunnel marked as long-lived.  In this case we leave the timestamp alone.
347                    // (If there were a bug here, it would be harmless, since we would
348                    // correct the timestamp the next time we tried to expire the circuit.)
349                }
350            }
351        }
352    }
353
354    /// Return an internal error if this ExpirationInfo is not marked as long-lived.
355    fn check_long_lived(&self) -> Result<()> {
356        match self {
357            ExpirationInfo::Unused { .. } | ExpirationInfo::Dirty { .. } => Err(internal!(
358                "Tunnel was not long-lived as expected. (Expiration status: {:?})",
359                self
360            )
361            .into()),
362            ExpirationInfo::LongLived { .. } => Ok(()),
363        }
364    }
365}
366
367/// Settings to determine when circuits are expired.
368#[derive(Clone, Debug)]
369pub(crate) struct ExpirationParameters {
370    /// Any unused circuit is expired this long after it was created.
371    expire_unused_after: Duration,
372    /// Any non long-lived dirty circuit is expired this long after it first becomes dirty.
373    expire_dirty_after: Duration,
374    /// Any long-lived circuit is expired after having been disused for this long.
375    expire_disused_after: Duration,
376}
377
378/// An entry for an open tunnel held by an `AbstractTunnelMgr`.
379#[derive(Debug, Clone)]
380pub(crate) struct OpenEntry<T> {
381    /// The supported usage for this tunnel.
382    spec: SupportedTunnelUsage,
383    /// The tunnel under management.
384    tunnel: Arc<T>,
385    /// When does this tunnel expire?
386    ///
387    /// (Note that expired tunnels are removed from the manager,
388    /// which does not actually close them until there are no more
389    /// references to them.)
390    expiration: ExpirationInfo,
391}
392
393impl<T: AbstractTunnel> OpenEntry<T> {
394    /// Make a new OpenEntry for a given tunnel and spec.
395    fn new(spec: SupportedTunnelUsage, tunnel: T, expiration: ExpirationInfo) -> Self {
396        OpenEntry {
397            spec,
398            tunnel: tunnel.into(),
399            expiration,
400        }
401    }
402
403    /// Return true if the underlying tunnel can be used for `usage`.
404    pub(crate) fn supports(&self, usage: &TargetTunnelUsage) -> bool {
405        self.tunnel.usable() && self.spec.supports(usage)
406    }
407
408    /// Change the underlying tunnel's permissible usage, based on its having
409    /// been used for `usage` at time `now`.
410    ///
411    /// Return an error if the tunnel may not be used for `usage`.
412    fn restrict_mut(&mut self, usage: &TargetTunnelUsage, now: Instant) -> Result<()> {
413        self.spec.restrict_mut(usage)?;
414        self.expiration.mark_used(now, self.spec.is_long_lived());
415        Ok(())
416    }
417
418    /// Find the "best" entry from a slice of OpenEntry for supporting
419    /// a given `usage`.
420    ///
421    /// If `parallelism` is some N greater than 1, we pick randomly
422    /// from the best `N` tunnels.
423    ///
424    /// # Requirements
425    ///
426    /// Requires that `ents` is nonempty, and that every element of `ents`
427    /// supports `spec`.
428    fn find_best<'a>(
429        // we do not mutate `ents`, but to return `&mut Self` we must have a mutable borrow
430        ents: &'a mut [&'a mut Self],
431        usage: &TargetTunnelUsage,
432        parallelism: usize,
433    ) -> &'a mut Self {
434        let _ = usage; // not yet used.
435        use rand::seq::IndexedMutRandom as _;
436        let parallelism = parallelism.clamp(1, ents.len());
437        // TODO: Actually look over the whole list to see which is better.
438        let slice = &mut ents[0..parallelism];
439        let mut rng = rand::rng();
440        slice.choose_mut(&mut rng).expect("Input list was empty")
441    }
442
443    /// Return true if this tunnel should be expired given that the current time is `now`,
444    /// and the current settings are `params`.
445    fn should_expire(&self, now: Instant, params: &ExpirationParameters) -> ShouldExpire {
446        match self.expiration {
447            ExpirationInfo::Unused { created } => {
448                ShouldExpire::certain(now, created + params.expire_unused_after)
449            }
450            ExpirationInfo::Dirty { dirty_since } => {
451                ShouldExpire::certain(now, dirty_since + params.expire_dirty_after)
452            }
453            ExpirationInfo::LongLived {
454                last_known_to_be_used_at,
455            } => {
456                ShouldExpire::uncertain(now, last_known_to_be_used_at + params.expire_disused_after)
457            }
458        }
459    }
460}
461
462/// When should a tunnel expire?
463///
464/// Reflects possible uncertainty.
465#[derive(Clone, Copy, Debug, Eq, PartialEq)]
466enum ShouldExpire {
467    /// The tunnel should expire now.
468    Now,
469    /// The circuit might expire now; we need to check.
470    ///
471    /// (This is the result we get when we know that this is a tunnel that should expire
472    /// if it has gone for some duration D without having any streams on it,
473    /// and that it definitely had a stream at time T.  It is now at least time T+D,
474    /// but we don't know whether the tunnel has any streams in the intervening time.
475    /// We need to call the async fn `last_known_to_be_used_at` to check.)
476    PossiblyNow,
477    /// The tunnel will not expire before the specified time.
478    NotBefore(Instant),
479}
480
481impl ShouldExpire {
482    /// Return a ShouldExpire reflecting an expiration that is known to be happening at `expiration`.
483    fn certain(now: Instant, expiration: Instant) -> Self {
484        if now >= expiration {
485            ShouldExpire::Now
486        } else {
487            ShouldExpire::NotBefore(expiration)
488        }
489    }
490
491    /// Return a ShouldExpire reflecting an expiration that is known to be no sooner than `expiration`,
492    /// but possibly later.
493    fn uncertain(now: Instant, expiration: Instant) -> Self {
494        if now >= expiration {
495            ShouldExpire::PossiblyNow
496        } else {
497            ShouldExpire::NotBefore(expiration)
498        }
499    }
500}
501
502/// A result type whose "Ok" value is the Id for a tunnel from B.
503type PendResult<B, R> = Result<<<B as AbstractTunnelBuilder<R>>::Tunnel as AbstractTunnel>::Id>;
504
505/// An in-progress tunnel request tracked by an `AbstractTunnelMgr`.
506///
507/// (In addition to tracking tunnels, `AbstractTunnelMgr` tracks
508/// _requests_ for tunnels.  The manager uses these entries if it
509/// finds that some tunnel created _after_ a request first launched
510/// might meet the request's requirements.)
511struct PendingRequest<B: AbstractTunnelBuilder<R>, R: Runtime> {
512    /// Usage for the operation requested by this request
513    usage: TargetTunnelUsage,
514    /// A channel to use for telling this request about tunnels that it
515    /// might like.
516    notify: mpsc::Sender<PendResult<B, R>>,
517}
518
519impl<B: AbstractTunnelBuilder<R>, R: Runtime> PendingRequest<B, R> {
520    /// Return true if this request would be supported by `spec`.
521    fn supported_by(&self, spec: &SupportedTunnelUsage) -> bool {
522        spec.supports(&self.usage)
523    }
524}
525
526/// An entry for an under-construction in-progress tunnel tracked by
527/// an `AbstractTunnelMgr`.
528#[derive(Debug)]
529struct PendingEntry<B: AbstractTunnelBuilder<R>, R: Runtime> {
530    /// Specification that this tunnel will support, if every pending
531    /// request that is waiting for it is attached to it.
532    ///
533    /// This spec becomes more and more restricted as more pending
534    /// requests are waiting for this tunnel.
535    ///
536    /// This spec is contained by circ_spec, and must support the usage
537    /// of every pending request that's waiting for this tunnel.
538    tentative_assignment: sync::Mutex<SupportedTunnelUsage>,
539    /// A shared future for requests to use when waiting for
540    /// notification of this tunnel's success.
541    receiver: Shared<oneshot::Receiver<PendResult<B, R>>>,
542}
543
544impl<B: AbstractTunnelBuilder<R>, R: Runtime> PendingEntry<B, R> {
545    /// Make a new PendingEntry that starts out supporting a given
546    /// spec.  Return that PendingEntry, along with a Sender to use to
547    /// report the result of building this tunnel.
548    fn new(spec: &SupportedTunnelUsage) -> (Self, oneshot::Sender<PendResult<B, R>>) {
549        let tentative_assignment = sync::Mutex::new(spec.clone());
550        let (sender, receiver) = oneshot::channel();
551        let receiver = receiver.shared();
552        let entry = PendingEntry {
553            tentative_assignment,
554            receiver,
555        };
556        (entry, sender)
557    }
558
559    /// Return true if this tunnel's current tentative assignment
560    /// supports `usage`.
561    fn supports(&self, usage: &TargetTunnelUsage) -> bool {
562        let assignment = self.tentative_assignment.lock().expect("poisoned lock");
563        assignment.supports(usage)
564    }
565
566    /// Try to change the tentative assignment of this tunnel by
567    /// restricting it for use with `usage`.
568    ///
569    /// Return an error if the current tentative assignment didn't
570    /// support `usage` in the first place.
571    fn tentative_restrict_mut(&self, usage: &TargetTunnelUsage) -> Result<()> {
572        if let Ok(mut assignment) = self.tentative_assignment.lock() {
573            assignment.restrict_mut(usage)?;
574        }
575        Ok(())
576    }
577
578    /// Find the best PendingEntry values from a slice for use with
579    /// `usage`.
580    ///
581    /// # Requirements
582    ///
583    /// The `ents` slice must not be empty.  Every element of `ents`
584    /// must support the given spec.
585    fn find_best(ents: &[Arc<Self>], usage: &TargetTunnelUsage) -> Vec<Arc<Self>> {
586        // TODO: Actually look over the whole list to see which is better.
587        let _ = usage; // currently unused
588        vec![Arc::clone(&ents[0])]
589    }
590}
591
592/// Wrapper type to represent the state between planning to build a
593/// tunnel and constructing it.
594#[derive(Debug)]
595struct TunnelBuildPlan<B: AbstractTunnelBuilder<R>, R: Runtime> {
596    /// The Plan object returned by [`AbstractTunnelBuilder::plan_tunnel`].
597    plan: B::Plan,
598    /// A sender to notify any pending requests when this tunnel is done.
599    sender: oneshot::Sender<PendResult<B, R>>,
600    /// A strong entry to the PendingEntry for this tunnel build attempt.
601    pending: Arc<PendingEntry<B, R>>,
602}
603
604/// The inner state of an [`AbstractTunnelMgr`].
605struct TunnelList<B: AbstractTunnelBuilder<R>, R: Runtime> {
606    /// A map from tunnel ID to [`OpenEntry`] values for all managed
607    /// open tunnels.
608    ///
609    /// A tunnel is added here from [`AbstractTunnelMgr::do_launch`] when we find
610    /// that it completes successfully, and has not been cancelled.
611    /// When we decide that such a tunnel should no longer be handed out for
612    /// any new requests, we "retire" the tunnel by removing it from this map.
613    #[allow(clippy::type_complexity)]
614    open_tunnels: HashMap<<B::Tunnel as AbstractTunnel>::Id, OpenEntry<B::Tunnel>>,
615    /// Weak-set of PendingEntry for tunnels that are being built.
616    ///
617    /// Because this set only holds weak references, and the only strong
618    /// reference to the PendingEntry is held by the task building the tunnel,
619    /// this set's members are lazily removed after the tunnel is either built
620    /// or fails to build.
621    ///
622    /// This set is used for two purposes:
623    ///
624    /// 1. When a tunnel request finds that there is no open tunnel for its
625    ///    purposes, it checks here to see if there is a pending tunnel that it
626    ///    could wait for.
627    /// 2. When a pending tunnel finishes building, it checks here to make sure
628    ///    that it has not been cancelled. (Removing an entry from this set marks
629    ///    it as cancelled.)
630    ///
631    /// An entry is added here in [`AbstractTunnelMgr::prepare_action`] when we
632    /// decide that a tunnel needs to be launched.
633    ///
634    /// Later, in [`AbstractTunnelMgr::do_launch`], once the tunnel has finished
635    /// (or failed), we remove the entry (by pointer identity).
636    /// If we cannot find the entry, we conclude that the request has been
637    /// _cancelled_, and so we discard any tunnel that was created.
638    pending_tunnels: PtrWeakHashSet<Weak<PendingEntry<B, R>>>,
639    /// Weak-set of PendingRequest for requests that are waiting for a
640    /// tunnel to be built.
641    ///
642    /// Because this set only holds weak references, and the only
643    /// strong reference to the PendingRequest is held by the task
644    /// waiting for the tunnel to be built, this set's members are
645    /// lazily removed after the request succeeds or fails.
646    pending_requests: PtrWeakHashSet<Weak<PendingRequest<B, R>>>,
647}
648
649impl<B: AbstractTunnelBuilder<R>, R: Runtime> TunnelList<B, R> {
650    /// Make a new empty `CircList`
651    fn new() -> Self {
652        TunnelList {
653            open_tunnels: HashMap::new(),
654            pending_tunnels: PtrWeakHashSet::new(),
655            pending_requests: PtrWeakHashSet::new(),
656        }
657    }
658
659    /// Add `e` to the list of open tunnels.
660    fn add_open(&mut self, e: OpenEntry<B::Tunnel>) {
661        let id = e.tunnel.id();
662        self.open_tunnels.insert(id, e);
663    }
664
665    /// Find all the usable open tunnels that support `usage`.
666    ///
667    /// Return None if there are no such tunnels.
668    fn find_open(&mut self, usage: &TargetTunnelUsage) -> Option<Vec<&mut OpenEntry<B::Tunnel>>> {
669        let list = self.open_tunnels.values_mut();
670        let v = SupportedTunnelUsage::find_supported(list, usage);
671        if v.is_empty() { None } else { Some(v) }
672    }
673
674    /// Find an open tunnel by ID.
675    ///
676    /// Return None if no such tunnels exists in this list.
677    fn get_open_mut(
678        &mut self,
679        id: &<B::Tunnel as AbstractTunnel>::Id,
680    ) -> Option<&mut OpenEntry<B::Tunnel>> {
681        self.open_tunnels.get_mut(id)
682    }
683
684    /// Extract an open tunnel by ID, removing it from this list.
685    ///
686    /// Return None if no such tunnel exists in this list.
687    fn take_open(
688        &mut self,
689        id: &<B::Tunnel as AbstractTunnel>::Id,
690    ) -> Option<OpenEntry<B::Tunnel>> {
691        self.open_tunnels.remove(id)
692    }
693
694    /// Remove tunnels based on expiration times.
695    ///
696    /// We remove every unused tunnel that is set to expire by
697    /// `unused_cutoff`, and every dirty tunnel that has been dirty
698    /// since before `dirty_cutoff`.
699    ///
700    /// Return the next time at which anything will definitely expire,
701    /// and a list of long-lived tunnels where we need to check their usage status
702    /// before we can be sure if they are expired.
703    #[must_use]
704    fn expire_tunnels(
705        &mut self,
706        now: Instant,
707        params: &ExpirationParameters,
708    ) -> (Option<Instant>, Vec<Weak<B::Tunnel>>) {
709        let mut need_check = Vec::new();
710        let mut earliest_expiration = None;
711        self.open_tunnels
712            .retain(|_k, v| match v.should_expire(now, params) {
713                // Expires now: Do not retain.
714                ShouldExpire::Now => false,
715
716                // Will expire at `when`: keep, but update `earliest_expiration`.
717                ShouldExpire::NotBefore(when) => {
718                    earliest_expiration = match earliest_expiration {
719                        Some(t) if t < when => Some(t),
720                        _ => Some(when),
721                    };
722                    true
723                }
724
725                // Need to check tunnel to see if/when it is disused.
726                ShouldExpire::PossiblyNow => {
727                    need_check.push(Arc::downgrade(&v.tunnel));
728                    true
729                }
730            });
731        (earliest_expiration, need_check)
732    }
733
734    /// Return the time when the tunnel with given `id`, should expire.
735    ///
736    /// Return None if no such tunnel exists.
737    fn tunnel_should_expire(
738        &mut self,
739        id: &<B::Tunnel as AbstractTunnel>::Id,
740        now: Instant,
741        params: &ExpirationParameters,
742    ) -> Option<ShouldExpire> {
743        self.open_tunnels
744            .get(id)
745            .map(|v| v.should_expire(now, params))
746    }
747
748    /// Update the "last known to be in use" time of a long-lived tunnel with ID `id`,
749    /// based on learning when it was last used.
750    ///
751    /// Expire the tunnel if appropriate.
752    ///
753    /// If the tunnel is still part of the map, return the next instant at which it might expire.
754    ///
755    /// Returns an error if the tunnel was present but was _not_ already marked as long-lived.
756    fn update_long_lived_tunnel_last_used(
757        &mut self,
758        id: &<B::Tunnel as AbstractTunnel>::Id,
759        now: Instant,
760        params: &ExpirationParameters,
761        disused_since: &tor_proto::Result<Option<Instant>>,
762    ) -> crate::Result<Option<Instant>> {
763        let Ok(disused_since) = disused_since else {
764            // got an error looking up disused time: discard the circuit.
765            let discard = self.take_open(id);
766            if let Some(ent) = discard {
767                ent.expiration.check_long_lived()?;
768            }
769            return Ok(None);
770        };
771        let Some(tun) = self.open_tunnels.get_mut(id) else {
772            // Circuit isn't there. Return.
773            return Ok(None);
774        };
775        tun.expiration.check_long_lived()?;
776        let last_known_in_use_at = disused_since.unwrap_or(now);
777
778        tun.expiration.mark_used(last_known_in_use_at, true);
779        match tun.should_expire(now, params) {
780            ShouldExpire::Now | ShouldExpire::PossiblyNow => {
781                let _discard = self.take_open(id);
782                Ok(None)
783            }
784            ShouldExpire::NotBefore(instant) => Ok(Some(instant)),
785        }
786    }
787
788    /// Add `pending` to the set of in-progress tunnels.
789    fn add_pending_tunnel(&mut self, pending: Arc<PendingEntry<B, R>>) {
790        self.pending_tunnels.insert(pending);
791    }
792
793    /// Find all pending tunnels that support `usage`.
794    ///
795    /// If no such tunnels are currently being built, return None.
796    fn find_pending_tunnels(
797        &self,
798        usage: &TargetTunnelUsage,
799    ) -> Option<Vec<Arc<PendingEntry<B, R>>>> {
800        let result: Vec<_> = self
801            .pending_tunnels
802            .iter()
803            .filter(|p| p.supports(usage))
804            .filter(|p| !matches!(p.receiver.peek(), Some(Err(_))))
805            .collect();
806
807        if result.is_empty() {
808            None
809        } else {
810            Some(result)
811        }
812    }
813
814    /// Return true if `circ` is still pending.
815    ///
816    /// A tunnel will become non-pending when finishes (successfully or not), or when it's
817    /// removed from this list via `clear_all_tunnels()`.
818    fn tunnel_is_pending(&self, circ: &Arc<PendingEntry<B, R>>) -> bool {
819        self.pending_tunnels.contains(circ)
820    }
821
822    /// Construct and add a new entry to the set of request waiting
823    /// for a tunnel.
824    ///
825    /// Return the request, and a new receiver stream that it should
826    /// use for notification of possible tunnels to use.
827    fn add_pending_request(&mut self, pending: &Arc<PendingRequest<B, R>>) {
828        self.pending_requests.insert(Arc::clone(pending));
829    }
830
831    /// Return all pending requests that would be satisfied by a tunnel
832    /// that supports `circ_spec`.
833    fn find_pending_requests(
834        &self,
835        circ_spec: &SupportedTunnelUsage,
836    ) -> Vec<Arc<PendingRequest<B, R>>> {
837        self.pending_requests
838            .iter()
839            .filter(|pend| pend.supported_by(circ_spec))
840            .collect()
841    }
842
843    /// Clear all pending and open tunnels.
844    ///
845    /// Calling `clear_all_tunnels` ensures that any request that is answered _after
846    /// this method runs_ will receive a tunnels that was launched _after this
847    /// method runs_.
848    fn clear_all_tunnels(&mut self) {
849        // Note that removing entries from pending_circs will also cause the
850        // tunnel tasks to realize that they are cancelled when they
851        // go to tell anybody about their results.
852        self.pending_tunnels.clear();
853        self.open_tunnels.clear();
854    }
855}
856
857/// Timing information for tunnels that have been built but never used.
858///
859/// Currently taken from the network parameters.
860struct UnusedTimings {
861    /// Minimum lifetime of a tunnel created while learning
862    /// tunnel timeouts.
863    learning: Duration,
864    /// Minimum lifetime of a tunnel created while not learning
865    /// tunnel timeouts.
866    not_learning: Duration,
867}
868
869// This isn't really fallible, given the definitions of the underlying
870// types.
871#[allow(clippy::fallible_impl_from)]
872impl From<&tor_netdir::params::NetParameters> for UnusedTimings {
873    fn from(v: &tor_netdir::params::NetParameters) -> Self {
874        // These try_into() calls can't fail, so unwrap() can't panic.
875        #[allow(clippy::unwrap_used)]
876        UnusedTimings {
877            learning: v
878                .unused_client_circ_timeout_while_learning_cbt
879                .try_into()
880                .unwrap(),
881            not_learning: v.unused_client_circ_timeout.try_into().unwrap(),
882        }
883    }
884}
885
886/// Abstract implementation for tunnel management.
887///
888/// The algorithm provided here is fairly simple. In its simplest form:
889///
890/// When somebody asks for a tunnel for a given operation: if we find
891/// one open already, we return it.  If we find in-progress tunnels
892/// that would meet our needs, we wait for one to finish (or for all
893/// to fail).  And otherwise, we launch one or more tunnels to meet the
894/// request's needs.
895///
896/// If this process fails, then we retry it, up to a timeout or a
897/// numerical limit.
898///
899/// If a tunnel not previously considered for a given request
900/// finishes before the request is satisfied, and if the tunnel would
901/// satisfy the request, we try to give that tunnel as an answer to
902/// that request even if it was not one of the tunnels that request
903/// was waiting for.
904pub(crate) struct AbstractTunnelMgr<B: AbstractTunnelBuilder<R>, R: Runtime> {
905    /// Builder used to construct tunnels.
906    builder: B,
907    /// An asynchronous runtime to use for launching tasks and
908    /// checking timeouts.
909    runtime: R,
910    /// A CircList to manage our list of tunnels, requests, and
911    /// pending tunnels.
912    tunnels: sync::Mutex<TunnelList<B, R>>,
913
914    /// Configured information about when to expire tunnels and requests.
915    circuit_timing: MutCfg<CircuitTiming>,
916
917    /// Minimum lifetime of an unused tunnel.
918    ///
919    /// Derived from the network parameters.
920    unused_timing: sync::Mutex<UnusedTimings>,
921}
922
923/// An action to take in order to satisfy a request for a tunnel.
924enum Action<B: AbstractTunnelBuilder<R>, R: Runtime> {
925    /// We found an open tunnel: return immediately.
926    Open(Arc<B::Tunnel>),
927    /// We found one or more pending tunnels: wait until one succeeds,
928    /// or all fail.
929    Wait(FuturesUnordered<Shared<oneshot::Receiver<PendResult<B, R>>>>),
930    /// We should launch tunnels: here are the instructions for how
931    /// to do so.
932    Build(Vec<TunnelBuildPlan<B, R>>),
933}
934
935impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> AbstractTunnelMgr<B, R> {
936    /// Construct a new AbstractTunnelMgr.
937    pub(crate) fn new(builder: B, runtime: R, circuit_timing: CircuitTiming) -> Self {
938        let circs = sync::Mutex::new(TunnelList::new());
939        let dflt_params = tor_netdir::params::NetParameters::default();
940        let unused_timing = (&dflt_params).into();
941        AbstractTunnelMgr {
942            builder,
943            runtime,
944            tunnels: circs,
945            circuit_timing: circuit_timing.into(),
946            unused_timing: sync::Mutex::new(unused_timing),
947        }
948    }
949
950    /// Reconfigure this manager using the latest set of network parameters.
951    pub(crate) fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
952        let mut u = self
953            .unused_timing
954            .lock()
955            .expect("Poisoned lock for unused_timing");
956        *u = p.into();
957    }
958
959    /// Return this manager's [`CircuitTiming`].
960    pub(crate) fn circuit_timing(&self) -> Arc<CircuitTiming> {
961        self.circuit_timing.get()
962    }
963
964    /// Return this manager's [`CircuitTiming`].
965    pub(crate) fn set_circuit_timing(&self, new_config: CircuitTiming) {
966        self.circuit_timing.replace(new_config);
967    }
968    /// Return a circuit suitable for use with a given `usage`,
969    /// creating that circuit if necessary, and restricting it
970    /// under the assumption that it will be used for that spec.
971    ///
972    /// This is the primary entry point for AbstractTunnelMgr.
973    #[allow(clippy::cognitive_complexity)] // TODO #2010: Refactor?
974    #[instrument(level = "trace", skip_all)]
975    pub(crate) async fn get_or_launch(
976        self: &Arc<Self>,
977        usage: &TargetTunnelUsage,
978        dir: DirInfo<'_>,
979    ) -> Result<(Arc<B::Tunnel>, TunnelProvenance)> {
980        /// Largest number of "resets" that we will accept in this attempt.
981        ///
982        /// A "reset" is an internally generated error that does not represent a
983        /// real problem; only a "whoops, got to try again" kind of a situation.
984        /// For example, if we reconfigure in the middle of an attempt and need
985        /// to re-launch the circuit, that counts as a "reset", since there was
986        /// nothing actually _wrong_ with the circuit we were building.
987        ///
988        /// We accept more resets than we do real failures. However,
989        /// we don't accept an unlimited number: we don't want to inadvertently
990        /// permit infinite loops here. If we ever bump against this limit, we
991        /// should not automatically increase it: we should instead figure out
992        /// why it is happening and try to make it not happen.
993        const MAX_RESETS: usize = 8;
994
995        let circuit_timing = self.circuit_timing();
996        let timeout_at = self.runtime.now() + circuit_timing.request_timeout;
997        let max_tries = circuit_timing.request_max_retries;
998        // We compute the maximum number of failures by dividing the maximum
999        // number of circuits to attempt by the number that will be launched in
1000        // parallel for each iteration.
1001        let max_failures = usize::div_ceil(
1002            max_tries as usize,
1003            std::cmp::max(1, self.builder.launch_parallelism(usage)),
1004        );
1005
1006        let mut retry_schedule = RetryDelay::from_msec(100);
1007        let mut retry_err = RetryError::<Box<Error>>::in_attempt_to("find or build a tunnel");
1008
1009        let mut n_failures = 0;
1010        let mut n_resets = 0;
1011
1012        for attempt_num in 1.. {
1013            // How much time is remaining?
1014            let remaining = match timeout_at.checked_duration_since(self.runtime.now()) {
1015                None => {
1016                    retry_err.push_timed(
1017                        Error::RequestTimeout,
1018                        self.runtime.now(),
1019                        Some(self.runtime.wallclock()),
1020                    );
1021                    break;
1022                }
1023                Some(t) => t,
1024            };
1025
1026            let error = match self.prepare_action(usage, dir, true) {
1027                Ok(action) => {
1028                    // We successfully found an action: Take that action.
1029                    let outcome = self
1030                        .runtime
1031                        .timeout(remaining, Arc::clone(self).take_action(action, usage))
1032                        .await;
1033
1034                    match outcome {
1035                        Ok(Ok(circ)) => return Ok(circ),
1036                        Ok(Err(e)) => {
1037                            debug!("Circuit attempt {} failed.", attempt_num);
1038                            Error::RequestFailed(e)
1039                        }
1040                        Err(_) => {
1041                            // We ran out of "remaining" time; there is nothing
1042                            // more to be done.
1043                            warn!("All tunnel attempts failed due to timeout");
1044                            retry_err.push_timed(
1045                                Error::RequestTimeout,
1046                                self.runtime.now(),
1047                                Some(self.runtime.wallclock()),
1048                            );
1049                            break;
1050                        }
1051                    }
1052                }
1053                Err(e) => {
1054                    // We couldn't pick the action!
1055                    debug_report!(
1056                        &e,
1057                        "Couldn't pick action for tunnel attempt {}",
1058                        attempt_num,
1059                    );
1060                    e
1061                }
1062            };
1063
1064            // There's been an error.  See how long we wait before we retry.
1065            let now = self.runtime.now();
1066            let retry_time =
1067                error.abs_retry_time(now, || retry_schedule.next_delay(&mut rand::rng()));
1068
1069            let (count, count_limit) = if error.is_internal_reset() {
1070                (&mut n_resets, MAX_RESETS)
1071            } else {
1072                (&mut n_failures, max_failures)
1073            };
1074            // Record the error, flattening it if needed.
1075            match error {
1076                // Flatten nested RetryError, using mockable time for each error
1077                Error::RequestFailed(e) => {
1078                    retry_err.extend_from_retry_error(e);
1079                }
1080                e => retry_err.push_timed(e, now, Some(self.runtime.wallclock())),
1081            }
1082
1083            *count += 1;
1084            // If we have reached our limit of this kind of problem, we're done.
1085            if *count >= count_limit {
1086                warn!("Reached circuit build retry limit, exiting...");
1087                break;
1088            }
1089
1090            // Wait, or not, as appropriate.
1091            match retry_time {
1092                AbsRetryTime::Immediate => {}
1093                AbsRetryTime::Never => break,
1094                AbsRetryTime::At(t) => {
1095                    let remaining = timeout_at.saturating_duration_since(now);
1096                    let delay = t.saturating_duration_since(now);
1097                    trace!(?delay, "Waiting to retry...");
1098                    self.runtime.sleep(std::cmp::min(delay, remaining)).await;
1099                }
1100            }
1101        }
1102
1103        warn!("Request failed");
1104        Err(Error::RequestFailed(retry_err))
1105    }
1106
1107    /// Make sure a circuit exists, without actually asking for it.
1108    ///
1109    /// Make sure that there is a circuit (built or in-progress) that could be
1110    /// used for `usage`, and launch one or more circuits in a background task
1111    /// if there is not.
1112    // TODO: This should probably take some kind of parallelism parameter.
1113    #[cfg(test)]
1114    pub(crate) fn ensure_tunnel(
1115        self: &Arc<Self>,
1116        usage: &TargetTunnelUsage,
1117        dir: DirInfo<'_>,
1118    ) -> Result<()> {
1119        let action = self.prepare_action(usage, dir, false)?;
1120        if let Action::Build(plans) = action {
1121            for plan in plans {
1122                let self_clone = Arc::clone(self);
1123                let _ignore_receiver = self_clone.spawn_launch(usage, plan);
1124            }
1125        }
1126
1127        Ok(())
1128    }
1129
1130    /// Choose which action we should take in order to provide a tunnel
1131    /// for a given `usage`.
1132    ///
1133    /// If `restrict_circ` is true, we restrict the spec of any
1134    /// circ we decide to use to mark that it _is_ being used for
1135    /// `usage`.
1136    #[instrument(level = "trace", skip_all)]
1137    fn prepare_action(
1138        &self,
1139        usage: &TargetTunnelUsage,
1140        dir: DirInfo<'_>,
1141        restrict_circ: bool,
1142    ) -> Result<Action<B, R>> {
1143        let mut list = self.tunnels.lock().expect("poisoned lock");
1144
1145        if let Some(mut open) = list.find_open(usage) {
1146            // We have open tunnels that meet the spec: return the best one.
1147            let parallelism = self.builder.select_parallelism(usage);
1148            let best = OpenEntry::find_best(&mut open, usage, parallelism);
1149            if restrict_circ {
1150                let now = self.runtime.now();
1151                best.restrict_mut(usage, now)?;
1152            }
1153            // TODO: If we have fewer tunnels here than our select
1154            // parallelism, perhaps we should launch more?
1155
1156            return Ok(Action::Open(best.tunnel.clone()));
1157        }
1158
1159        if let Some(pending) = list.find_pending_tunnels(usage) {
1160            // There are pending tunnels that could meet the spec.
1161            // Restrict them under the assumption that they could all
1162            // be used for this, and then wait until one is ready (or
1163            // all have failed)
1164            let best = PendingEntry::find_best(&pending, usage);
1165            if restrict_circ {
1166                for item in &best {
1167                    // TODO: Do we want to tentatively restrict _all_ of these?
1168                    // not clear to me.
1169                    item.tentative_restrict_mut(usage)?;
1170                }
1171            }
1172            let stream = best.iter().map(|item| item.receiver.clone()).collect();
1173            // TODO: if we have fewer tunnels here than our launch
1174            // parallelism, we might want to launch more.
1175
1176            return Ok(Action::Wait(stream));
1177        }
1178
1179        // Okay, we need to launch tunnels here.
1180        let parallelism = std::cmp::max(1, self.builder.launch_parallelism(usage));
1181        let mut plans = Vec::new();
1182        let mut last_err = None;
1183        for _ in 0..parallelism {
1184            match self.plan_by_usage(dir, usage) {
1185                Ok((pending, plan)) => {
1186                    list.add_pending_tunnel(pending);
1187                    plans.push(plan);
1188                }
1189                Err(e) => {
1190                    debug!("Unable to make a plan for {:?}: {}", usage, e);
1191                    last_err = Some(e);
1192                }
1193            }
1194        }
1195        if !plans.is_empty() {
1196            Ok(Action::Build(plans))
1197        } else if let Some(last_err) = last_err {
1198            Err(last_err)
1199        } else {
1200            // we didn't even try to plan anything!
1201            Err(internal!("no plans were built, but no errors were found").into())
1202        }
1203    }
1204
1205    /// Execute an action returned by pick-action, and return the
1206    /// resulting tunnel or error.
1207    #[allow(clippy::cognitive_complexity, clippy::type_complexity)] // TODO #2010: Refactor
1208    #[instrument(level = "trace", skip_all)]
1209    async fn take_action(
1210        self: Arc<Self>,
1211        act: Action<B, R>,
1212        usage: &TargetTunnelUsage,
1213    ) -> std::result::Result<(Arc<B::Tunnel>, TunnelProvenance), RetryError<Box<Error>>> {
1214        /// Store the error `err` into `retry_err`, as appropriate.
1215        fn record_error<R: Runtime>(
1216            retry_err: &mut RetryError<Box<Error>>,
1217            source: streams::Source,
1218            building: bool,
1219            mut err: Error,
1220            runtime: &R,
1221        ) {
1222            if source == streams::Source::Right {
1223                // We don't care about this error, since it is from neither a tunnel we launched
1224                // nor one that we're waiting on.
1225                return;
1226            }
1227            if !building {
1228                // We aren't building our own tunnels, so our errors are
1229                // secondary reports of other tunnels' failures.
1230                err = Error::PendingFailed(Box::new(err));
1231            }
1232            retry_err.push_timed(err, runtime.now(), Some(runtime.wallclock()));
1233        }
1234        /// Return a string describing what it means, within the context of this
1235        /// function, to have gotten an answer from `source`.
1236        fn describe_source(building: bool, source: streams::Source) -> &'static str {
1237            match (building, source) {
1238                (_, streams::Source::Right) => "optimistic advice",
1239                (true, streams::Source::Left) => "tunnel we're building",
1240                (false, streams::Source::Left) => "pending tunnel",
1241            }
1242        }
1243
1244        // Get or make a stream of futures to wait on.
1245        let (building, wait_on_stream) = match act {
1246            Action::Open(c) => {
1247                // There's already a perfectly good open tunnel; we can return
1248                // it now.
1249                trace!("Returning existing tunnel.");
1250                return Ok((c, TunnelProvenance::Preexisting));
1251            }
1252            Action::Wait(f) => {
1253                // There is one or more pending tunnel that we're waiting for.
1254                // If any succeeds, we try to use it.  If they all fail, we
1255                // fail.
1256                trace!("Waiting for tunnel.");
1257                (false, f)
1258            }
1259            Action::Build(plans) => {
1260                // We're going to launch one or more tunnels in parallel.  We
1261                // report success if any succeeds, and failure of they all fail.
1262                trace!("Building new tunnel.");
1263                let futures = FuturesUnordered::new();
1264                for plan in plans {
1265                    let self_clone = Arc::clone(&self);
1266                    // (This is where we actually launch tunnels.)
1267                    futures.push(self_clone.spawn_launch(usage, plan));
1268                }
1269                (true, futures)
1270            }
1271        };
1272
1273        // Insert ourself into the list of pending requests, and make a
1274        // stream for us to listen on for notification from pending tunnels
1275        // other than those we are pending on.
1276        let (pending_request, additional_stream) = {
1277            // We don't want this queue to participate in memory quota tracking.
1278            // There isn't any tunnel yet, so there wouldn't be anything to account it to.
1279            // If this queue has the oldest data, probably the whole system is badly broken.
1280            // Tearing down the whole tunnel manager won't help.
1281            let (send, recv) = mpsc_channel_no_memquota(8);
1282            let pending = Arc::new(PendingRequest {
1283                usage: usage.clone(),
1284                notify: send,
1285            });
1286
1287            let mut list = self.tunnels.lock().expect("poisoned lock");
1288            list.add_pending_request(&pending);
1289
1290            (pending, recv)
1291        };
1292
1293        // We use our "select_biased" stream combiner here to ensure that:
1294        //   1) Circuits from wait_on_stream (the ones we're pending on) are
1295        //      preferred.
1296        //   2) We exit this function when those tunnels are exhausted.
1297        //   3) We still get notified about other tunnels that might meet our
1298        //      interests.
1299        //
1300        // The events from Left stream are the oes that we explicitly asked for,
1301        // so we'll treat errors there as real problems.  The events from the
1302        // Right stream are ones that we got opportunistically told about; it's
1303        // not a big deal if those fail.
1304        let mut incoming = streams::select_biased(wait_on_stream, additional_stream.map(Ok));
1305
1306        let mut retry_error = RetryError::in_attempt_to("wait for tunnels");
1307
1308        while let Some((src, id)) = incoming.next().await {
1309            match id {
1310                Ok(Ok(ref id)) => {
1311                    // Great, we have a tunnel . See if we can use it!
1312                    let mut list = self.tunnels.lock().expect("poisoned lock");
1313                    if let Some(ent) = list.get_open_mut(id) {
1314                        let now = self.runtime.now();
1315                        match ent.restrict_mut(usage, now) {
1316                            Ok(()) => {
1317                                // Great, this will work.  We drop the
1318                                // pending request now explicitly to remove
1319                                // it from the list.
1320                                drop(pending_request);
1321                                if matches!(ent.expiration, ExpirationInfo::Unused { .. }) {
1322                                    let try_to_expire_after = if ent.spec.is_long_lived() {
1323                                        self.circuit_timing().disused_circuit_timeout
1324                                    } else {
1325                                        self.circuit_timing().max_dirtiness
1326                                    };
1327                                    // Since this tunnel hasn't been used yet, schedule expiration
1328                                    // task after `max_dirtiness` from now.
1329                                    spawn_expiration_task(
1330                                        &self.runtime,
1331                                        Arc::downgrade(&self),
1332                                        ent.tunnel.id(),
1333                                        now + try_to_expire_after,
1334                                    );
1335                                }
1336                                return Ok((ent.tunnel.clone(), TunnelProvenance::NewlyCreated));
1337                            }
1338                            Err(e) => {
1339                                // In this case, a `UsageMismatched` error just means that we lost the race
1340                                // to restrict this tunnel.
1341                                let e = match e {
1342                                    Error::UsageMismatched(e) => Error::LostUsabilityRace(e),
1343                                    x => x,
1344                                };
1345                                if src == streams::Source::Left {
1346                                    info_report!(
1347                                        &e,
1348                                        "{} suggested we use {:?}, but restrictions failed",
1349                                        describe_source(building, src),
1350                                        id,
1351                                    );
1352                                } else {
1353                                    debug_report!(
1354                                        &e,
1355                                        "{} suggested we use {:?}, but restrictions failed",
1356                                        describe_source(building, src),
1357                                        id,
1358                                    );
1359                                }
1360                                record_error(&mut retry_error, src, building, e, &self.runtime);
1361                                continue;
1362                            }
1363                        }
1364                    }
1365                }
1366                Ok(Err(ref e)) => {
1367                    debug!("{} sent error {:?}", describe_source(building, src), e);
1368                    record_error(&mut retry_error, src, building, e.clone(), &self.runtime);
1369                }
1370                Err(oneshot::Canceled) => {
1371                    debug!(
1372                        "{} went away (Canceled), quitting take_action right away",
1373                        describe_source(building, src)
1374                    );
1375                    record_error(
1376                        &mut retry_error,
1377                        src,
1378                        building,
1379                        Error::PendingCanceled,
1380                        &self.runtime,
1381                    );
1382                    return Err(retry_error);
1383                }
1384            }
1385
1386            debug!(
1387                "While waiting on tunnel: {:?} from {}",
1388                id,
1389                describe_source(building, src)
1390            );
1391        }
1392
1393        // Nothing worked.  We drop the pending request now explicitly
1394        // to remove it from the list.  (We could just let it get dropped
1395        // implicitly, but that's a bit confusing.)
1396        drop(pending_request);
1397
1398        Err(retry_error)
1399    }
1400
1401    /// Given a directory and usage, compute the necessary objects to
1402    /// build a tunnel: A [`PendingEntry`] to keep track of the in-process
1403    /// tunnel, and a [`TunnelBuildPlan`] that we'll give to the thread
1404    /// that will build the tunnel.
1405    ///
1406    /// The caller should probably add the resulting `PendingEntry` to
1407    /// `self.circs`.
1408    ///
1409    /// This is an internal function that we call when we're pretty sure
1410    /// we want to build a tunnel.
1411    #[allow(clippy::type_complexity)]
1412    fn plan_by_usage(
1413        &self,
1414        dir: DirInfo<'_>,
1415        usage: &TargetTunnelUsage,
1416    ) -> Result<(Arc<PendingEntry<B, R>>, TunnelBuildPlan<B, R>)> {
1417        let (plan, bspec) = self.builder.plan_tunnel(usage, dir)?;
1418        let (pending, sender) = PendingEntry::new(&bspec);
1419        let pending = Arc::new(pending);
1420
1421        let plan = TunnelBuildPlan {
1422            plan,
1423            sender,
1424            pending: Arc::clone(&pending),
1425        };
1426
1427        Ok((pending, plan))
1428    }
1429
1430    /// Launch a managed tunnel for a target usage, without checking
1431    /// whether one already exists or is pending.
1432    ///
1433    /// Return a listener that will be informed when the tunnel is done.
1434    #[instrument(level = "trace", skip_all)]
1435    pub(crate) fn launch_by_usage(
1436        self: &Arc<Self>,
1437        usage: &TargetTunnelUsage,
1438        dir: DirInfo<'_>,
1439    ) -> Result<Shared<oneshot::Receiver<PendResult<B, R>>>> {
1440        let (pending, plan) = self.plan_by_usage(dir, usage)?;
1441
1442        self.tunnels
1443            .lock()
1444            .expect("Poisoned lock for tunnel list")
1445            .add_pending_tunnel(pending);
1446
1447        Ok(Arc::clone(self).spawn_launch(usage, plan))
1448    }
1449
1450    /// Spawn a background task to launch a tunnel, and report its status.
1451    ///
1452    /// The `usage` argument is the usage from the original request that made
1453    /// us build this tunnel.
1454    #[instrument(level = "trace", skip_all)]
1455    fn spawn_launch(
1456        self: Arc<Self>,
1457        usage: &TargetTunnelUsage,
1458        plan: TunnelBuildPlan<B, R>,
1459    ) -> Shared<oneshot::Receiver<PendResult<B, R>>> {
1460        let _ = usage; // Currently unused.
1461        let TunnelBuildPlan {
1462            mut plan,
1463            sender,
1464            pending,
1465        } = plan;
1466        let request_loyalty = self.circuit_timing().request_loyalty;
1467
1468        let wait_on_future = pending.receiver.clone();
1469        let runtime = self.runtime.clone();
1470        let runtime_copy = self.runtime.clone();
1471
1472        let tid = rand::random::<u64>();
1473        // We release this block when the tunnel builder task terminates.
1474        let reason = format!("tunnel builder task {}", tid);
1475        runtime.block_advance(reason.clone());
1476        // During tests, the `FakeBuilder` will need to release the block in order to fake a timeout
1477        // correctly.
1478        plan.add_blocked_advance_reason(reason);
1479
1480        runtime
1481            .spawn(async move {
1482                let self_clone = Arc::clone(&self);
1483                let future = AssertUnwindSafe(self_clone.do_launch(plan, pending)).catch_unwind();
1484                let (new_spec, reply) = match future.await {
1485                    Ok(x) => x, // Success or regular failure
1486                    Err(e) => {
1487                        // Okay, this is a panic.  We have to tell the calling
1488                        // thread about it, then exit this tunnel builder task.
1489                        let _ = sender.send(Err(internal!("tunnel build task panicked").into()));
1490                        std::panic::panic_any(e);
1491                    }
1492                };
1493
1494                // Tell anybody who was listening about it that this
1495                // tunnel is now usable or failed.
1496                //
1497                // (We ignore any errors from `send`: That just means that nobody
1498                // was waiting for this tunnel.)
1499                let _ = sender.send(reply.clone());
1500
1501                if let Some(new_spec) = new_spec {
1502                    // Wait briefly before we notify opportunistically.  This
1503                    // delay will give the tunnels that were originally
1504                    // specifically intended for a request a little more time
1505                    // to finish, before we offer it this tunnel instead.
1506                    let sl = runtime_copy.sleep(request_loyalty);
1507                    runtime_copy.allow_one_advance(request_loyalty);
1508                    sl.await;
1509
1510                    let pending = {
1511                        let list = self.tunnels.lock().expect("poisoned lock");
1512                        list.find_pending_requests(&new_spec)
1513                    };
1514                    for pending_request in pending {
1515                        let _ = pending_request.notify.clone().try_send(reply.clone());
1516                    }
1517                }
1518                runtime_copy.release_advance(format!("tunnel builder task {}", tid));
1519            })
1520            .expect("Couldn't spawn tunnel-building task");
1521
1522        wait_on_future
1523    }
1524
1525    /// Run in the background to launch a tunnel. Return a 2-tuple of the new
1526    /// tunnel spec and the outcome that should be sent to the initiator.
1527    #[instrument(level = "trace", skip_all)]
1528    async fn do_launch(
1529        self: Arc<Self>,
1530        plan: <B as AbstractTunnelBuilder<R>>::Plan,
1531        pending: Arc<PendingEntry<B, R>>,
1532    ) -> (Option<SupportedTunnelUsage>, PendResult<B, R>) {
1533        let outcome = self.builder.build_tunnel(plan).await;
1534
1535        match outcome {
1536            Err(e) => (None, Err(e)),
1537            Ok((new_spec, tunnel)) => {
1538                let id = tunnel.id();
1539
1540                let use_duration = self.pick_use_duration();
1541                let now = self.runtime.now();
1542                let exp_inst = now + use_duration;
1543                let runtime_copy = self.runtime.clone();
1544                spawn_expiration_task(&runtime_copy, Arc::downgrade(&self), tunnel.id(), exp_inst);
1545                // I used to call restrict_mut here, but now I'm not so
1546                // sure. Doing restrict_mut makes sure that this
1547                // tunnel will be suitable for the request that asked
1548                // for us in the first place, but that should be
1549                // ensured anyway by our tracking its tentative
1550                // assignment.
1551                //
1552                // new_spec.restrict_mut(&usage_copy).unwrap();
1553                let use_before = ExpirationInfo::new(now);
1554                let open_ent = OpenEntry::new(new_spec.clone(), tunnel, use_before);
1555                {
1556                    let mut list = self.tunnels.lock().expect("poisoned lock");
1557                    // Finally, before we return this tunnel, we need to make
1558                    // sure that this pending tunnel is still pending.  (If it
1559                    // is not pending, then it was cancelled through a call to
1560                    // `retire_all_tunnels`, and the configuration that we used
1561                    // to launch it is now sufficiently outdated that we should
1562                    // no longer give this tunnel to a client.)
1563                    if list.tunnel_is_pending(&pending) {
1564                        list.add_open(open_ent);
1565                        // We drop our reference to 'pending' here:
1566                        // this should make all the weak references to
1567                        // the `PendingEntry` become dangling.
1568                        drop(pending);
1569                        (Some(new_spec), Ok(id))
1570                    } else {
1571                        // This tunnel is no longer pending! It must have been cancelled, probably
1572                        // by a call to retire_all_tunnels()
1573                        drop(pending); // ibid
1574                        (None, Err(Error::CircCanceled))
1575                    }
1576                }
1577            }
1578        }
1579    }
1580
1581    /// Return the currently configured expiration parameters.
1582    fn expiration_params(&self) -> ExpirationParameters {
1583        let expire_unused_after = self.pick_use_duration();
1584        let expire_dirty_after = self.circuit_timing().max_dirtiness;
1585        let expire_disused_after = self.circuit_timing().disused_circuit_timeout;
1586
1587        ExpirationParameters {
1588            expire_unused_after,
1589            expire_dirty_after,
1590            expire_disused_after,
1591        }
1592    }
1593
1594    /// Plan and launch a new tunnel to a given target, bypassing our managed
1595    /// pool of tunnels.
1596    ///
1597    /// This method will always return a new tunnel, and never return a tunnel
1598    /// that this CircMgr gives out for anything else.
1599    ///
1600    /// The new tunnel will participate in the guard and timeout apparatus as
1601    /// appropriate, no retry attempt will be made if the tunnel fails.
1602    #[cfg(feature = "hs-common")]
1603    #[instrument(level = "trace", skip_all)]
1604    pub(crate) async fn launch_unmanaged(
1605        &self,
1606        usage: &TargetTunnelUsage,
1607        dir: DirInfo<'_>,
1608    ) -> Result<(SupportedTunnelUsage, B::Tunnel)> {
1609        let (_, plan) = self.plan_by_usage(dir, usage)?;
1610        self.builder.build_tunnel(plan.plan).await
1611    }
1612
1613    /// Remove the tunnel with a given `id` from this manager.
1614    ///
1615    /// After this function is called, that tunnel will no longer be handed
1616    /// out to any future requests.
1617    ///
1618    /// Return None if we have no tunnel with the given ID.
1619    pub(crate) fn take_tunnel(
1620        &self,
1621        id: &<B::Tunnel as AbstractTunnel>::Id,
1622    ) -> Option<Arc<B::Tunnel>> {
1623        let mut list = self.tunnels.lock().expect("poisoned lock");
1624        list.take_open(id).map(|e| e.tunnel)
1625    }
1626
1627    /// Remove all open and pending tunnels and from this manager, to ensure
1628    /// they can't be given out for any more requests.
1629    ///
1630    /// Calling `retire_all_tunnels` ensures that any tunnel request that gets
1631    /// an  answer _after this method runs_ will receive a tunnel that was
1632    /// launched _after this method runs_.
1633    ///
1634    /// We call this method this when our configuration changes in such a way
1635    /// that we want to make sure that any new (or pending) requests will
1636    /// receive tunnels that are built using the new configuration.
1637    //
1638    // For more information, see documentation on [`CircuitList::open_circs`],
1639    // [`CircuitList::pending_circs`], and comments in `do_launch`.
1640    pub(crate) fn retire_all_tunnels(&self) {
1641        let mut list = self.tunnels.lock().expect("poisoned lock");
1642        list.clear_all_tunnels();
1643    }
1644
1645    /// Expire tunnels according to the rules in `config` and the
1646    /// current time `now`.
1647    ///
1648    /// Expired tunnels will not be automatically closed, but they will
1649    /// no longer be given out for new tunnels.
1650    ///
1651    /// Return the earliest time at which any current tunnel will expire.
1652    pub(crate) async fn expire_tunnels(&self, now: Instant) -> Option<Instant> {
1653        let expiration_params = self.expiration_params();
1654
1655        // While holding the lock, we call TunnelList::expire_tunnels.
1656        // That function will expire what it can, and return a list of the tunnels for which
1657        // we need to call `disused_since`.
1658        let (mut earliest_expiration, need_to_check) = {
1659            let mut list = self.tunnels.lock().expect("poisoned lock");
1660            list.expire_tunnels(now, &expiration_params)
1661        };
1662
1663        // Now we've dropped the lock, and can do async checks.
1664        let mut last_known_usage = Vec::new();
1665        for tunnel in need_to_check {
1666            let Some(tunnel) = Weak::upgrade(&tunnel) else {
1667                continue; // The tunnel is already gone.
1668            };
1669            last_known_usage.push((tunnel.id(), tunnel.last_known_to_be_used_at().await));
1670        }
1671
1672        // Now get the lock again, and tell the list what we learned.
1673        //
1674        // Note that if this function is called twice simultaneously, in some corner cases, we might
1675        // decide to expire something twice.  That's okay.
1676        {
1677            let mut list = self.tunnels.lock().expect("poisoned lock");
1678            for (id, disused_since) in last_known_usage {
1679                match list.update_long_lived_tunnel_last_used(
1680                    &id,
1681                    now,
1682                    &expiration_params,
1683                    &disused_since,
1684                ) {
1685                    Ok(Some(may_expire)) => {
1686                        earliest_expiration = match earliest_expiration {
1687                            Some(exp) if exp < may_expire => Some(exp),
1688                            _ => Some(may_expire),
1689                        };
1690                    }
1691                    Ok(None) => {}
1692                    Err(e) => warn_report!(e, "Error while updating status on tunnel {:?}", id),
1693                }
1694            }
1695        }
1696
1697        earliest_expiration
1698    }
1699
1700    /// Consider expiring the tunnel with given tunnel `id`,
1701    /// according to the rules in `config` and the current time `now`.
1702    ///
1703    /// Returns None if the circuit is expired; otherwise returns the next time at which the circuit may expire.
1704    pub(crate) async fn consider_expiring_tunnel(
1705        &self,
1706        tun_id: &<B::Tunnel as AbstractTunnel>::Id,
1707        now: Instant,
1708    ) -> Result<Option<Instant>> {
1709        let expiration_params = self.expiration_params();
1710
1711        // With the lock, call TunneList::tunnel_should_expire, and expire it (or don't)
1712        // if the decision is obvious.
1713        let tunnel = {
1714            let mut list: sync::MutexGuard<'_, TunnelList<B, R>> =
1715                self.tunnels.lock().expect("poisoned lock");
1716            let Some(should_expire) = list.tunnel_should_expire(tun_id, now, &expiration_params)
1717            else {
1718                return Ok(None);
1719            };
1720            match should_expire {
1721                ShouldExpire::Now => {
1722                    let _discard = list.take_open(tun_id);
1723                    return Ok(None);
1724                }
1725                ShouldExpire::NotBefore(t) => return Ok(Some(t)),
1726                ShouldExpire::PossiblyNow => {
1727                    let Some(tunnel_ent) = list.get_open_mut(tun_id) else {
1728                        return Ok(None);
1729                    };
1730                    Arc::clone(&tunnel_ent.tunnel)
1731                }
1732            }
1733        };
1734
1735        // If we get here, then we have a long-lived tunnel for which we need to check `disused_since`
1736        let last_known_in_use_at = tunnel.last_known_to_be_used_at().await;
1737
1738        // Now we tell the TunnelList what we learned.
1739        {
1740            let mut list: sync::MutexGuard<'_, TunnelList<B, R>> =
1741                self.tunnels.lock().expect("poisoned lock");
1742            list.update_long_lived_tunnel_last_used(
1743                tun_id,
1744                now,
1745                &expiration_params,
1746                &last_known_in_use_at,
1747            )
1748        }
1749    }
1750
1751    /// Return the number of open tunnels held by this tunnel manager.
1752    pub(crate) fn n_tunnels(&self) -> usize {
1753        let list = self.tunnels.lock().expect("poisoned lock");
1754        list.open_tunnels.len()
1755    }
1756
1757    /// Return the number of pending tunnels tracked by this tunnel manager.
1758    #[cfg(test)]
1759    pub(crate) fn n_pending_tunnels(&self) -> usize {
1760        let list = self.tunnels.lock().expect("poisoned lock");
1761        list.pending_tunnels.len()
1762    }
1763
1764    /// Get a reference to this manager's runtime.
1765    pub(crate) fn peek_runtime(&self) -> &R {
1766        &self.runtime
1767    }
1768
1769    /// Get a reference to this manager's builder.
1770    pub(crate) fn peek_builder(&self) -> &B {
1771        &self.builder
1772    }
1773
1774    /// Pick a duration by when a new tunnel should expire from now
1775    /// if it has not yet been used
1776    fn pick_use_duration(&self) -> Duration {
1777        let timings = self
1778            .unused_timing
1779            .lock()
1780            .expect("Poisoned lock for unused_timing");
1781
1782        if self.builder.learning_timeouts() {
1783            timings.learning
1784        } else {
1785            // TODO: In Tor, this calculation also depends on
1786            // stuff related to predicted ports and channel
1787            // padding.
1788            use tor_basic_utils::RngExt as _;
1789            let mut rng = rand::rng();
1790            rng.gen_range_checked(timings.not_learning..=timings.not_learning * 2)
1791                .expect("T .. 2x T turned out to be an empty duration range?!")
1792        }
1793    }
1794}
1795
1796/// Spawn an expiration task that expires a tunnel at given instant.
1797///
1798/// When the timeout occurs, if the tunnel manager is still present,
1799/// the task will ask the manager to expire the tunnel, if the tunnel
1800/// is ready to expire.
1801//
1802// TODO: It would be good to do away with this function entirely, and have a smarter expiration
1803// function.  This one only exists because there is not an "expire some circuits" background task.
1804fn spawn_expiration_task<B, R>(
1805    runtime: &R,
1806    circmgr: Weak<AbstractTunnelMgr<B, R>>,
1807    circ_id: <<B as AbstractTunnelBuilder<R>>::Tunnel as AbstractTunnel>::Id,
1808    exp_inst: Instant,
1809) where
1810    R: Runtime,
1811    B: 'static + AbstractTunnelBuilder<R>,
1812{
1813    let now = runtime.now();
1814    let rt_copy = runtime.clone();
1815    let mut duration = exp_inst.saturating_duration_since(now);
1816
1817    // NOTE: Once there was an optimization here that ran the expiration immediately if
1818    // `duration` was zero.
1819    // I discarded that optimization when I made `consider_expiring_tunnel` async,
1820    // since we really want this function _not_ to be async,
1821    // because we run it in contexts where we hold a Mutex on the tunnel list.
1822
1823    // Spawn a timer expiration task with given expiration instant.
1824    if let Err(e) = runtime.spawn(async move {
1825        loop {
1826            rt_copy.sleep(duration).await;
1827            let cm = if let Some(cm) = Weak::upgrade(&circmgr) {
1828                cm
1829            } else {
1830                return;
1831            };
1832            match cm.consider_expiring_tunnel(&circ_id, exp_inst).await {
1833                Ok(None) => return,
1834                Ok(Some(when)) => {
1835                    duration = when.saturating_duration_since(rt_copy.now());
1836                }
1837                Err(e) => {
1838                    warn_report!(
1839                        e,
1840                        "Error while considering expiration for tunnel {:?}",
1841                        circ_id
1842                    );
1843                    return;
1844                }
1845            }
1846        }
1847    }) {
1848        warn_report!(e, "Unable to launch expiration task");
1849    }
1850}
1851
1852#[cfg(test)]
1853mod test {
1854    // @@ begin test lint list maintained by maint/add_warning @@
1855    #![allow(clippy::bool_assert_comparison)]
1856    #![allow(clippy::clone_on_copy)]
1857    #![allow(clippy::dbg_macro)]
1858    #![allow(clippy::mixed_attributes_style)]
1859    #![allow(clippy::print_stderr)]
1860    #![allow(clippy::print_stdout)]
1861    #![allow(clippy::single_char_pattern)]
1862    #![allow(clippy::unwrap_used)]
1863    #![allow(clippy::unchecked_time_subtraction)]
1864    #![allow(clippy::useless_vec)]
1865    #![allow(clippy::needless_pass_by_value)]
1866    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1867    use super::*;
1868    use crate::isolation::test::{IsolationTokenEq, assert_isoleq};
1869    use crate::mocks::{FakeBuilder, FakeCirc, FakeId, FakeOp};
1870    use crate::usage::{ExitPolicy, SupportedTunnelUsage};
1871    use crate::{
1872        Error, IsolationToken, StreamIsolation, TargetPort, TargetPorts, TargetTunnelUsage,
1873    };
1874    use std::sync::LazyLock;
1875    use tor_dircommon::fallback::FallbackList;
1876    use tor_guardmgr::TestConfig;
1877    use tor_llcrypto::pk::ed25519::Ed25519Identity;
1878    use tor_netdir::testnet;
1879    use tor_persist::TestingStateMgr;
1880    use tor_rtcompat::SleepProvider;
1881    use tor_rtmock::MockRuntime;
1882    use web_time_compat::InstantExt;
1883
1884    #[allow(deprecated)] // TODO #1885
1885    use tor_rtmock::MockSleepRuntime;
1886
1887    static FALLBACKS_EMPTY: LazyLock<FallbackList> = LazyLock::new(|| [].into());
1888
1889    fn di() -> DirInfo<'static> {
1890        (&*FALLBACKS_EMPTY).into()
1891    }
1892
1893    fn target_to_spec(target: &TargetTunnelUsage) -> SupportedTunnelUsage {
1894        match target {
1895            TargetTunnelUsage::Exit {
1896                ports,
1897                isolation,
1898                country_code,
1899                require_stability,
1900            } => SupportedTunnelUsage::Exit {
1901                policy: ExitPolicy::from_target_ports(&TargetPorts::from(&ports[..])),
1902                isolation: Some(isolation.clone()),
1903                country_code: country_code.clone(),
1904                all_relays_stable: *require_stability,
1905            },
1906            _ => unimplemented!(),
1907        }
1908    }
1909
1910    impl<U: PartialEq> IsolationTokenEq for OpenEntry<U> {
1911        fn isol_eq(&self, other: &Self) -> bool {
1912            self.spec.isol_eq(&other.spec)
1913                && self.tunnel == other.tunnel
1914                && self.expiration == other.expiration
1915        }
1916    }
1917
1918    impl<U: PartialEq> IsolationTokenEq for &mut OpenEntry<U> {
1919        fn isol_eq(&self, other: &Self) -> bool {
1920            self.spec.isol_eq(&other.spec)
1921                && self.tunnel == other.tunnel
1922                && self.expiration == other.expiration
1923        }
1924    }
1925
1926    fn make_builder<R: Runtime>(runtime: &R) -> FakeBuilder<R> {
1927        let state_mgr = TestingStateMgr::new();
1928        let guard_config = TestConfig::default();
1929        FakeBuilder::new(runtime, state_mgr, &guard_config)
1930    }
1931
1932    #[test]
1933    fn basic_tests() {
1934        MockRuntime::test_with_various(|rt| async move {
1935            #[allow(deprecated)] // TODO #1885
1936            let rt = MockSleepRuntime::new(rt);
1937
1938            let builder = make_builder(&rt);
1939
1940            let mgr = Arc::new(AbstractTunnelMgr::new(
1941                builder,
1942                rt.clone(),
1943                CircuitTiming::default(),
1944            ));
1945
1946            let webports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
1947
1948            // Check initialization.
1949            assert_eq!(mgr.n_tunnels(), 0);
1950            assert!(mgr.peek_builder().script.lock().unwrap().is_empty());
1951
1952            // Launch a tunnel ; make sure we get it.
1953            let c1 = rt.wait_for(mgr.get_or_launch(&webports, di())).await;
1954            let c1 = c1.unwrap().0;
1955            assert_eq!(mgr.n_tunnels(), 1);
1956
1957            // Make sure we get the one we already made if we ask for it.
1958            let port80 = TargetTunnelUsage::new_from_ipv4_ports(&[80]);
1959            let c2 = mgr.get_or_launch(&port80, di()).await;
1960
1961            let c2 = c2.unwrap().0;
1962            assert!(FakeCirc::eq(&c1, &c2));
1963            assert_eq!(mgr.n_tunnels(), 1);
1964
1965            // Now try launching two tunnels "at once" to make sure that our
1966            // pending-tunnel code works.
1967
1968            let dnsport = TargetTunnelUsage::new_from_ipv4_ports(&[53]);
1969            let dnsport_restrict = TargetTunnelUsage::Exit {
1970                ports: vec![TargetPort::ipv4(53)],
1971                isolation: StreamIsolation::builder().build().unwrap(),
1972                country_code: None,
1973                require_stability: false,
1974            };
1975
1976            let (c3, c4) = rt
1977                .wait_for(futures::future::join(
1978                    mgr.get_or_launch(&dnsport, di()),
1979                    mgr.get_or_launch(&dnsport_restrict, di()),
1980                ))
1981                .await;
1982
1983            let c3 = c3.unwrap().0;
1984            let c4 = c4.unwrap().0;
1985            assert!(!FakeCirc::eq(&c1, &c3));
1986            assert!(FakeCirc::eq(&c3, &c4));
1987            assert_eq!(c3.id(), c4.id());
1988            assert_eq!(mgr.n_tunnels(), 2);
1989
1990            // Now we're going to remove c3 from consideration.  It's the
1991            // same as c4, so removing c4 will give us None.
1992            let c3_taken = mgr.take_tunnel(&c3.id()).unwrap();
1993            let now_its_gone = mgr.take_tunnel(&c4.id());
1994            assert!(FakeCirc::eq(&c3_taken, &c3));
1995            assert!(now_its_gone.is_none());
1996            assert_eq!(mgr.n_tunnels(), 1);
1997
1998            // Having removed them, let's launch another dnsport and make
1999            // sure we get a different tunnel.
2000            let c5 = rt.wait_for(mgr.get_or_launch(&dnsport, di())).await;
2001            let c5 = c5.unwrap().0;
2002            assert!(!FakeCirc::eq(&c3, &c5));
2003            assert!(!FakeCirc::eq(&c4, &c5));
2004            assert_eq!(mgr.n_tunnels(), 2);
2005
2006            // Now try launch_by_usage.
2007            let prev = mgr.n_pending_tunnels();
2008            assert!(mgr.launch_by_usage(&dnsport, di()).is_ok());
2009            assert_eq!(mgr.n_pending_tunnels(), prev + 1);
2010            // TODO: Actually make sure that launch_by_usage launched
2011            // the right thing.
2012        });
2013    }
2014
2015    #[test]
2016    fn request_timeout() {
2017        MockRuntime::test_with_various(|rt| async move {
2018            #[allow(deprecated)] // TODO #1885
2019            let rt = MockSleepRuntime::new(rt);
2020
2021            let ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
2022
2023            // This will fail once, and then completely time out.  The
2024            // result will be a failure.
2025            let builder = make_builder(&rt);
2026            builder.set(&ports, vec![FakeOp::Fail, FakeOp::Timeout]);
2027
2028            let mgr = Arc::new(AbstractTunnelMgr::new(
2029                builder,
2030                rt.clone(),
2031                CircuitTiming::default(),
2032            ));
2033            let c1 = mgr
2034                .peek_runtime()
2035                .wait_for(mgr.get_or_launch(&ports, di()))
2036                .await;
2037
2038            assert!(matches!(c1, Err(Error::RequestFailed(_))));
2039        });
2040    }
2041
2042    #[test]
2043    fn request_timeout2() {
2044        MockRuntime::test_with_various(|rt| async move {
2045            #[allow(deprecated)] // TODO #1885
2046            let rt = MockSleepRuntime::new(rt);
2047
2048            // Now try a more complicated case: we'll try to get things so
2049            // that we wait for a little over our predicted time because
2050            // of our wait-for-next-action logic.
2051            let ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
2052            let builder = make_builder(&rt);
2053            builder.set(
2054                &ports,
2055                vec![
2056                    FakeOp::Delay(Duration::from_millis(60_000 - 25)),
2057                    FakeOp::NoPlan,
2058                ],
2059            );
2060
2061            let mgr = Arc::new(AbstractTunnelMgr::new(
2062                builder,
2063                rt.clone(),
2064                CircuitTiming::default(),
2065            ));
2066            let c1 = mgr
2067                .peek_runtime()
2068                .wait_for(mgr.get_or_launch(&ports, di()))
2069                .await;
2070
2071            assert!(matches!(c1, Err(Error::RequestFailed(_))));
2072        });
2073    }
2074
2075    #[test]
2076    fn request_unplannable() {
2077        MockRuntime::test_with_various(|rt| async move {
2078            #[allow(deprecated)] // TODO #1885
2079            let rt = MockSleepRuntime::new(rt);
2080
2081            let ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
2082
2083            // This will fail a the planning stages, a lot.
2084            let builder = make_builder(&rt);
2085            builder.set(&ports, vec![FakeOp::NoPlan; 2000]);
2086
2087            let mgr = Arc::new(AbstractTunnelMgr::new(
2088                builder,
2089                rt.clone(),
2090                CircuitTiming::default(),
2091            ));
2092            let c1 = rt.wait_for(mgr.get_or_launch(&ports, di())).await;
2093
2094            assert!(matches!(c1, Err(Error::RequestFailed(_))));
2095        });
2096    }
2097
2098    #[test]
2099    fn request_fails_too_much() {
2100        MockRuntime::test_with_various(|rt| async move {
2101            #[allow(deprecated)] // TODO #1885
2102            let rt = MockSleepRuntime::new(rt);
2103            let ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
2104
2105            // This will fail 1000 times, which is above the retry limit.
2106            let builder = make_builder(&rt);
2107            builder.set(&ports, vec![FakeOp::Fail; 1000]);
2108
2109            let mgr = Arc::new(AbstractTunnelMgr::new(
2110                builder,
2111                rt.clone(),
2112                CircuitTiming::default(),
2113            ));
2114            let c1 = rt.wait_for(mgr.get_or_launch(&ports, di())).await;
2115
2116            assert!(matches!(c1, Err(Error::RequestFailed(_))));
2117        });
2118    }
2119
2120    #[test]
2121    fn request_wrong_spec() {
2122        MockRuntime::test_with_various(|rt| async move {
2123            #[allow(deprecated)] // TODO #1885
2124            let rt = MockSleepRuntime::new(rt);
2125            let ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
2126
2127            // The first time this is called, it will build a tunnel
2128            // with the wrong spec.  (A tunnel builder should never
2129            // actually _do_ that, but it's something we code for.)
2130            let builder = make_builder(&rt);
2131            builder.set(
2132                &ports,
2133                vec![FakeOp::WrongSpec(target_to_spec(
2134                    &TargetTunnelUsage::new_from_ipv4_ports(&[22]),
2135                ))],
2136            );
2137
2138            let mgr = Arc::new(AbstractTunnelMgr::new(
2139                builder,
2140                rt.clone(),
2141                CircuitTiming::default(),
2142            ));
2143            let c1 = rt.wait_for(mgr.get_or_launch(&ports, di())).await;
2144
2145            assert!(c1.is_ok());
2146        });
2147    }
2148
2149    #[test]
2150    fn request_retried() {
2151        MockRuntime::test_with_various(|rt| async move {
2152            #[allow(deprecated)] // TODO #1885
2153            let rt = MockSleepRuntime::new(rt);
2154            let ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
2155
2156            // This will fail twice, and then succeed. The result will be
2157            // a success.
2158            let builder = make_builder(&rt);
2159            builder.set(&ports, vec![FakeOp::Fail, FakeOp::Fail]);
2160
2161            let mgr = Arc::new(AbstractTunnelMgr::new(
2162                builder,
2163                rt.clone(),
2164                CircuitTiming::default(),
2165            ));
2166
2167            // This test doesn't exercise any timeout behaviour.
2168            rt.block_advance("test doesn't require advancing");
2169
2170            let (c1, c2) = rt
2171                .wait_for(futures::future::join(
2172                    mgr.get_or_launch(&ports, di()),
2173                    mgr.get_or_launch(&ports, di()),
2174                ))
2175                .await;
2176
2177            let c1 = c1.unwrap().0;
2178            let c2 = c2.unwrap().0;
2179
2180            assert!(FakeCirc::eq(&c1, &c2));
2181        });
2182    }
2183
2184    #[test]
2185    fn isolated() {
2186        MockRuntime::test_with_various(|rt| async move {
2187            #[allow(deprecated)] // TODO #1885
2188            let rt = MockSleepRuntime::new(rt);
2189            let builder = make_builder(&rt);
2190            let mgr = Arc::new(AbstractTunnelMgr::new(
2191                builder,
2192                rt.clone(),
2193                CircuitTiming::default(),
2194            ));
2195
2196            // Set our isolation so that iso1 and iso2 can't share a tunnel,
2197            // but no_iso can share a tunnel with either.
2198            let iso1 = TargetTunnelUsage::Exit {
2199                ports: vec![TargetPort::ipv4(443)],
2200                isolation: StreamIsolation::builder()
2201                    .owner_token(IsolationToken::new())
2202                    .build()
2203                    .unwrap(),
2204                country_code: None,
2205                require_stability: false,
2206            };
2207            let iso2 = TargetTunnelUsage::Exit {
2208                ports: vec![TargetPort::ipv4(443)],
2209                isolation: StreamIsolation::builder()
2210                    .owner_token(IsolationToken::new())
2211                    .build()
2212                    .unwrap(),
2213                country_code: None,
2214                require_stability: false,
2215            };
2216            let no_iso1 = TargetTunnelUsage::new_from_ipv4_ports(&[443]);
2217            let no_iso2 = no_iso1.clone();
2218
2219            // We're going to try launching these tunnels in 24 different
2220            // orders, to make sure that the outcome is correct each time.
2221            use itertools::Itertools;
2222            let timeouts: Vec<_> = [0_u64, 2, 4, 6]
2223                .iter()
2224                .map(|d| Duration::from_millis(*d))
2225                .collect();
2226
2227            for delays in timeouts.iter().permutations(4) {
2228                let d1 = delays[0];
2229                let d2 = delays[1];
2230                let d3 = delays[2];
2231                let d4 = delays[2];
2232                let (c_iso1, c_iso2, c_no_iso1, c_no_iso2) = rt
2233                    .wait_for(futures::future::join4(
2234                        async {
2235                            rt.sleep(*d1).await;
2236                            mgr.get_or_launch(&iso1, di()).await
2237                        },
2238                        async {
2239                            rt.sleep(*d2).await;
2240                            mgr.get_or_launch(&iso2, di()).await
2241                        },
2242                        async {
2243                            rt.sleep(*d3).await;
2244                            mgr.get_or_launch(&no_iso1, di()).await
2245                        },
2246                        async {
2247                            rt.sleep(*d4).await;
2248                            mgr.get_or_launch(&no_iso2, di()).await
2249                        },
2250                    ))
2251                    .await;
2252
2253                let c_iso1 = c_iso1.unwrap().0;
2254                let c_iso2 = c_iso2.unwrap().0;
2255                let c_no_iso1 = c_no_iso1.unwrap().0;
2256                let c_no_iso2 = c_no_iso2.unwrap().0;
2257
2258                assert!(!FakeCirc::eq(&c_iso1, &c_iso2));
2259                assert!(!FakeCirc::eq(&c_iso1, &c_no_iso1));
2260                assert!(!FakeCirc::eq(&c_iso1, &c_no_iso2));
2261                assert!(!FakeCirc::eq(&c_iso2, &c_no_iso1));
2262                assert!(!FakeCirc::eq(&c_iso2, &c_no_iso2));
2263                assert!(FakeCirc::eq(&c_no_iso1, &c_no_iso2));
2264            }
2265        });
2266    }
2267
2268    #[test]
2269    fn opportunistic() {
2270        MockRuntime::test_with_various(|rt| async move {
2271            #[allow(deprecated)] // TODO #1885
2272            let rt = MockSleepRuntime::new(rt);
2273
2274            // The first request will time out completely, but we're
2275            // making a second request after we launch it.  That
2276            // request should succeed, and notify the first request.
2277
2278            let ports1 = TargetTunnelUsage::new_from_ipv4_ports(&[80]);
2279            let ports2 = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
2280
2281            let builder = make_builder(&rt);
2282            builder.set(&ports1, vec![FakeOp::Timeout]);
2283
2284            let mgr = Arc::new(AbstractTunnelMgr::new(
2285                builder,
2286                rt.clone(),
2287                CircuitTiming::default(),
2288            ));
2289            // Note that ports2 will be wider than ports1, so the second
2290            // request will have to launch a new tunnel.
2291
2292            let (c1, c2) = rt
2293                .wait_for(futures::future::join(
2294                    mgr.get_or_launch(&ports1, di()),
2295                    async {
2296                        rt.sleep(Duration::from_millis(100)).await;
2297                        mgr.get_or_launch(&ports2, di()).await
2298                    },
2299                ))
2300                .await;
2301
2302            if let (Ok((c1, _)), Ok((c2, _))) = (c1, c2) {
2303                assert!(FakeCirc::eq(&c1, &c2));
2304            } else {
2305                panic!();
2306            };
2307        });
2308    }
2309
2310    #[test]
2311    fn prebuild() {
2312        MockRuntime::test_with_various(|rt| async move {
2313            // This time we're going to use ensure_tunnel() to make
2314            // sure that a tunnel gets built, and then launch two
2315            // other tunnels that will use it.
2316            #[allow(deprecated)] // TODO #1885
2317            let rt = MockSleepRuntime::new(rt);
2318            let builder = make_builder(&rt);
2319            let mgr = Arc::new(AbstractTunnelMgr::new(
2320                builder,
2321                rt.clone(),
2322                CircuitTiming::default(),
2323            ));
2324
2325            let ports1 = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
2326            let ports2 = TargetTunnelUsage::new_from_ipv4_ports(&[80]);
2327            let ports3 = TargetTunnelUsage::new_from_ipv4_ports(&[443]);
2328
2329            let ok = mgr.ensure_tunnel(&ports1, di());
2330            let (c1, c2) = rt
2331                .wait_for(futures::future::join(
2332                    async {
2333                        rt.sleep(Duration::from_millis(10)).await;
2334                        mgr.get_or_launch(&ports2, di()).await
2335                    },
2336                    async {
2337                        rt.sleep(Duration::from_millis(50)).await;
2338                        mgr.get_or_launch(&ports3, di()).await
2339                    },
2340                ))
2341                .await;
2342
2343            assert!(ok.is_ok());
2344
2345            let c1 = c1.unwrap().0;
2346            let c2 = c2.unwrap().0;
2347
2348            // If we had launched these separately, they wouldn't share
2349            // a tunnel.
2350            assert!(FakeCirc::eq(&c1, &c2));
2351        });
2352    }
2353
2354    #[test]
2355    fn expiration() {
2356        MockRuntime::test_with_various(|rt| async move {
2357            use crate::config::CircuitTimingBuilder;
2358            // Now let's make some tunnels -- one dirty, one clean, and
2359            // make sure that one expires and one doesn't.
2360            #[allow(deprecated)] // TODO #1885
2361            let rt = MockSleepRuntime::new(rt);
2362            let builder = make_builder(&rt);
2363
2364            let circuit_timing = CircuitTimingBuilder::default()
2365                .max_dirtiness(Duration::from_secs(15))
2366                .build()
2367                .unwrap();
2368
2369            let mgr = Arc::new(AbstractTunnelMgr::new(builder, rt.clone(), circuit_timing));
2370
2371            let imap = TargetTunnelUsage::new_from_ipv4_ports(&[993]);
2372            let pop = TargetTunnelUsage::new_from_ipv4_ports(&[995]);
2373
2374            let ok = mgr.ensure_tunnel(&imap, di());
2375            let pop1 = rt.wait_for(mgr.get_or_launch(&pop, di())).await;
2376
2377            assert!(ok.is_ok());
2378            let pop1 = pop1.unwrap().0;
2379
2380            rt.advance(Duration::from_secs(30)).await;
2381            rt.advance(Duration::from_secs(15)).await;
2382            let imap1 = rt.wait_for(mgr.get_or_launch(&imap, di())).await.unwrap().0;
2383
2384            // This should expire the pop tunnel, since it came from
2385            // get_or_launch() [which marks the tunnel as being
2386            // used].  It should not expire the imap tunnel, since
2387            // it was not dirty until 15 seconds after the cutoff.
2388            let now = rt.now();
2389
2390            mgr.expire_tunnels(now).await;
2391
2392            let (pop2, imap2) = rt
2393                .wait_for(futures::future::join(
2394                    mgr.get_or_launch(&pop, di()),
2395                    mgr.get_or_launch(&imap, di()),
2396                ))
2397                .await;
2398
2399            let pop2 = pop2.unwrap().0;
2400            let imap2 = imap2.unwrap().0;
2401
2402            assert!(!FakeCirc::eq(&pop2, &pop1));
2403            assert!(FakeCirc::eq(&imap2, &imap1));
2404        });
2405    }
2406
2407    /// Returns three exit policies; one that permits nothing, one that permits ports 80
2408    /// and 443 only, and one that permits all ports.
2409    fn get_exit_policies() -> (ExitPolicy, ExitPolicy, ExitPolicy) {
2410        // FIXME(eta): the below is copypasta; would be nice to have a better way of
2411        //             constructing ExitPolicy objects for testing maybe
2412        let network = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
2413
2414        // Nodes with ID 0x0a through 0x13 and 0x1e through 0x27 are
2415        // exits.  Odd-numbered ones allow only ports 80 and 443;
2416        // even-numbered ones allow all ports.
2417        let id_noexit: Ed25519Identity = [0x05; 32].into();
2418        let id_webexit: Ed25519Identity = [0x11; 32].into();
2419        let id_fullexit: Ed25519Identity = [0x20; 32].into();
2420
2421        let not_exit = network.by_id(&id_noexit).unwrap();
2422        let web_exit = network.by_id(&id_webexit).unwrap();
2423        let full_exit = network.by_id(&id_fullexit).unwrap();
2424
2425        let ep_none = ExitPolicy::from_relay(&not_exit);
2426        let ep_web = ExitPolicy::from_relay(&web_exit);
2427        let ep_full = ExitPolicy::from_relay(&full_exit);
2428        (ep_none, ep_web, ep_full)
2429    }
2430
2431    #[test]
2432    fn test_find_supported() {
2433        let (ep_none, ep_web, ep_full) = get_exit_policies();
2434        let fake_circ = FakeCirc { id: FakeId::next() };
2435        let expiration = ExpirationInfo::Unused {
2436            created: Instant::get(),
2437        };
2438
2439        let mut entry_none = OpenEntry::new(
2440            SupportedTunnelUsage::Exit {
2441                policy: ep_none,
2442                isolation: None,
2443                country_code: None,
2444                all_relays_stable: true,
2445            },
2446            fake_circ.clone(),
2447            expiration.clone(),
2448        );
2449        let mut entry_none_c = entry_none.clone();
2450        let mut entry_web = OpenEntry::new(
2451            SupportedTunnelUsage::Exit {
2452                policy: ep_web,
2453                isolation: None,
2454                country_code: None,
2455                all_relays_stable: true,
2456            },
2457            fake_circ.clone(),
2458            expiration.clone(),
2459        );
2460        let mut entry_web_c = entry_web.clone();
2461        let mut entry_full = OpenEntry::new(
2462            SupportedTunnelUsage::Exit {
2463                policy: ep_full,
2464                isolation: None,
2465                country_code: None,
2466                all_relays_stable: true,
2467            },
2468            fake_circ,
2469            expiration,
2470        );
2471        let mut entry_full_c = entry_full.clone();
2472
2473        let usage_web = TargetTunnelUsage::new_from_ipv4_ports(&[80]);
2474        let empty: Vec<&mut OpenEntry<FakeCirc>> = vec![];
2475
2476        assert_isoleq!(
2477            SupportedTunnelUsage::find_supported(vec![&mut entry_none].into_iter(), &usage_web),
2478            empty
2479        );
2480
2481        // HACK(eta): We have to faff around with clones and such because
2482        //            `abstract_spec_find_supported` has a silly signature that involves `&mut`
2483        //            refs, which we can't have more than one of.
2484
2485        assert_isoleq!(
2486            SupportedTunnelUsage::find_supported(
2487                vec![&mut entry_none, &mut entry_web].into_iter(),
2488                &usage_web,
2489            ),
2490            vec![&mut entry_web_c]
2491        );
2492
2493        assert_isoleq!(
2494            SupportedTunnelUsage::find_supported(
2495                vec![&mut entry_none, &mut entry_web, &mut entry_full].into_iter(),
2496                &usage_web,
2497            ),
2498            vec![&mut entry_web_c, &mut entry_full_c]
2499        );
2500
2501        // Test preemptive tunnel usage:
2502
2503        let usage_preemptive_web = TargetTunnelUsage::Preemptive {
2504            port: Some(TargetPort::ipv4(80)),
2505            circs: 2,
2506            require_stability: false,
2507        };
2508        let usage_preemptive_dns = TargetTunnelUsage::Preemptive {
2509            port: None,
2510            circs: 2,
2511            require_stability: false,
2512        };
2513
2514        // shouldn't return anything unless there are >=2 tunnels
2515
2516        assert_isoleq!(
2517            SupportedTunnelUsage::find_supported(
2518                vec![&mut entry_none].into_iter(),
2519                &usage_preemptive_web
2520            ),
2521            empty
2522        );
2523
2524        assert_isoleq!(
2525            SupportedTunnelUsage::find_supported(
2526                vec![&mut entry_none].into_iter(),
2527                &usage_preemptive_dns
2528            ),
2529            empty
2530        );
2531
2532        assert_isoleq!(
2533            SupportedTunnelUsage::find_supported(
2534                vec![&mut entry_none, &mut entry_web].into_iter(),
2535                &usage_preemptive_web
2536            ),
2537            empty
2538        );
2539
2540        assert_isoleq!(
2541            SupportedTunnelUsage::find_supported(
2542                vec![&mut entry_none, &mut entry_web].into_iter(),
2543                &usage_preemptive_dns
2544            ),
2545            vec![&mut entry_none_c, &mut entry_web_c]
2546        );
2547
2548        assert_isoleq!(
2549            SupportedTunnelUsage::find_supported(
2550                vec![&mut entry_none, &mut entry_web, &mut entry_full].into_iter(),
2551                &usage_preemptive_web
2552            ),
2553            vec![&mut entry_web_c, &mut entry_full_c]
2554        );
2555    }
2556
2557    #[test]
2558    fn test_circlist_preemptive_target_circs() {
2559        MockRuntime::test_with_various(|rt| async move {
2560            #[allow(deprecated)] // TODO #1885
2561            let rt = MockSleepRuntime::new(rt);
2562            let netdir = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
2563            let dirinfo = DirInfo::Directory(&netdir);
2564
2565            let builder = make_builder(&rt);
2566
2567            for circs in [2, 8].iter() {
2568                let mut circlist = TunnelList::<FakeBuilder<MockRuntime>, MockRuntime>::new();
2569
2570                let preemptive_target = TargetTunnelUsage::Preemptive {
2571                    port: Some(TargetPort::ipv4(80)),
2572                    circs: *circs,
2573                    require_stability: false,
2574                };
2575
2576                for _ in 0..*circs {
2577                    assert!(circlist.find_open(&preemptive_target).is_none());
2578
2579                    let usage = TargetTunnelUsage::new_from_ipv4_ports(&[80]);
2580                    let (plan, _) = builder.plan_tunnel(&usage, dirinfo).unwrap();
2581                    let (spec, circ) = rt.wait_for(builder.build_tunnel(plan)).await.unwrap();
2582                    let entry = OpenEntry::new(
2583                        spec,
2584                        circ,
2585                        ExpirationInfo::new(rt.now() + Duration::from_secs(60)),
2586                    );
2587                    circlist.add_open(entry);
2588                }
2589
2590                assert!(circlist.find_open(&preemptive_target).is_some());
2591            }
2592        });
2593    }
2594}