Skip to main content

tor_chanmgr/mgr/
state.rs

1//! Simple implementation for the internal map state of a ChanMgr.
2
3use std::time::Duration;
4
5use super::AbstractChannelFactory;
6use super::{AbstractChannel, Pending, Sending, select};
7use crate::{ChannelConfig, Dormancy, Error, Result};
8
9use futures::FutureExt;
10use std::result::Result as StdResult;
11use std::sync::Arc;
12use std::sync::atomic::{AtomicU64, Ordering};
13use tor_async_utils::oneshot;
14use tor_basic_utils::RngExt as _;
15use tor_cell::chancell::msg::PaddingNegotiate;
16use tor_config::PaddingLevel;
17use tor_error::{error_report, internal, into_internal};
18use tor_linkspec::{HasRelayIds, ListByRelayIds, RelayIds};
19use tor_netdir::{params::CHANNEL_PADDING_TIMEOUT_UPPER_BOUND, params::NetParameters};
20use tor_proto::ChannelPaddingInstructions;
21use tor_proto::channel::ChannelPaddingInstructionsUpdates;
22use tor_proto::channel::kist::{KistMode, KistParams};
23use tor_proto::channel::padding::Parameters as PaddingParameters;
24use tor_proto::channel::padding::ParametersBuilder as PaddingParametersBuilder;
25use tor_units::{BoundedInt32, IntegerMilliseconds};
26use tracing::{info, instrument};
27use void::{ResultVoidExt as _, Void};
28
29#[cfg(test)]
30mod padding_test;
31
32/// All mutable state held by an `AbstractChannelMgr`.
33///
34/// One reason that this is an isolated type is that we want to
35/// to limit the amount of code that can see and
36/// lock the Mutex here.  (We're using a blocking mutex close to async
37/// code, so we need to be careful.)
38pub(crate) struct MgrState<C: AbstractChannelFactory> {
39    /// The data, within a lock
40    ///
41    /// (Danger: this uses a blocking mutex close to async code.  This mutex
42    /// must never be held while an await is happening.)
43    inner: std::sync::Mutex<Inner<C>>,
44}
45
46/// Parameters for channels that we create, and that all existing channels are using
47struct ChannelParams {
48    /// Channel padding instructions
49    padding: ChannelPaddingInstructions,
50
51    /// KIST parameters
52    kist: KistParams,
53}
54
55/// A map from channel id to channel state, plus necessary auxiliary state - inside lock
56struct Inner<C: AbstractChannelFactory> {
57    /// The channel factory type that we store.
58    ///
59    /// In this module we never use this _as_ an AbstractChannelFactory: we just
60    /// hand out clones of it when asked.
61    builder: C,
62
63    /// A map from identity to channels, or to pending channel statuses.
64    channels: ListByRelayIds<ChannelState<C::Channel>>,
65
66    /// A list of unauthenticated channels meaning they are client/bridge -> relay. We populate
67    /// this list as a relay responder accepting incoming connections.
68    ///
69    /// Notice here that [`ChannelState`] is NOT used because we don't need a pending state of
70    /// unauthenticated channels (inbound client/bridge) because multiple channel from the same
71    /// peer can coexist. Furthermore, these channels are never looked up for normal relay
72    /// operations and so a pending state is not needed.
73    #[cfg(feature = "relay")]
74    unauth_channels: Vec<Arc<C::Channel>>,
75
76    /// Parameters for channels that we create, and that all existing channels are using
77    ///
78    /// Will be updated by a background task, which also notifies all existing
79    /// `Open` channels via `channels`.
80    ///
81    /// (Must be protected by the same lock as `channels`, or a channel might be
82    /// created using being-replaced parameters, but not get an update.)
83    channels_params: ChannelParams,
84
85    /// The configuration (from the config file or API caller)
86    config: ChannelConfig,
87
88    /// Dormancy
89    ///
90    /// The last dormancy information we have been told about and passed on to our channels.
91    /// Updated via `MgrState::set_dormancy` and hence `MgrState::reconfigure_general`,
92    /// which then uses it to calculate how to reconfigure the channels.
93    dormancy: Dormancy,
94}
95
96/// The state of a channel (or channel build attempt) within a map.
97///
98/// A ChannelState can be Open (representing a fully negotiated channel) or
99/// Building (representing a pending attempt to build a channel). Both states
100/// have a set of RelayIds, but these RelayIds represent slightly different
101/// things:
102///  * On a Building channel, the set of RelayIds is all the identities that we
103///    require the peer to have. (The peer may turn out to have _more_
104///    identities than this.)
105///  * On an Open channel, the set of RelayIds is all the identities that
106///    we were able to successfully authenticate for the peer.
107pub(crate) enum ChannelState<C> {
108    /// An open channel.
109    ///
110    /// This channel might not be usable: it might be closing or
111    /// broken.  We need to check its is_usable() method before
112    /// yielding it to the user.
113    Open(OpenEntry<C>),
114    /// A channel that's getting built.
115    Building(PendingEntry),
116}
117
/// The map entry for a channel that is open.
#[derive(Clone)]
pub(crate) struct OpenEntry<C> {
    /// The open channel itself.
    pub(crate) channel: Arc<C>,
    /// How long this channel may stay unused, at most.
    pub(crate) max_unused_duration: Duration,
}
126
/// An identifier that is unique to one pending ([`PendingEntry`]) channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct UniqPendingChanId(u64);

impl UniqPendingChanId {
    /// Allocate a fresh `UniqPendingChanId`.
    ///
    /// # Panics
    ///
    /// Panics if the counter has reached `u64::MAX`.
    pub(crate) fn new() -> Self {
        /// Process-wide counter holding the next ID to hand out.
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        // We only need the values to be distinct; how this allocation is
        // ordered relative to other channels is irrelevant, so Relaxed
        // is sufficient.
        let allocated = COUNTER.fetch_add(1, Ordering::Relaxed);
        assert!(
            allocated != u64::MAX,
            "Exhausted the pending channel ID namespace"
        );
        Self(allocated)
    }
}

impl std::fmt::Display for UniqPendingChanId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(id) = self;
        write!(f, "PendingChan {}", id)
    }
}
149
150/// An entry for a not-yet-build channel
151#[derive(Clone)]
152pub(crate) struct PendingEntry {
153    /// The keys of the relay to which we're trying to open a channel.
154    pub(crate) ids: RelayIds,
155
156    /// A future we can clone and listen on to learn when this channel attempt
157    /// is successful or failed.
158    ///
159    /// This entry will be removed from the map (and possibly replaced with an
160    /// `OpenEntry`) _before_ this future becomes ready.
161    pub(crate) pending: Pending,
162
163    /// A unique ID that allows us to find this exact pending entry later.
164    pub(crate) unique_id: UniqPendingChanId,
165}
166
167impl<C> HasRelayIds for ChannelState<C>
168where
169    C: HasRelayIds,
170{
171    fn identity(
172        &self,
173        key_type: tor_linkspec::RelayIdType,
174    ) -> Option<tor_linkspec::RelayIdRef<'_>> {
175        match self {
176            ChannelState::Open(OpenEntry { channel, .. }) => channel.identity(key_type),
177            ChannelState::Building(PendingEntry { ids, .. }) => ids.identity(key_type),
178        }
179    }
180}
181
182impl<C: Clone> ChannelState<C> {
183    /// For testing: either give the Open channel inside this state,
184    /// or panic if there is none.
185    #[cfg(test)]
186    fn unwrap_open(&self) -> &C {
187        match self {
188            ChannelState::Open(ent) => &ent.channel,
189            _ => panic!("Not an open channel"),
190        }
191    }
192}
193
194/// Type of the `nf_ito_*` netdir parameters, convenience alias
195type NfIto = IntegerMilliseconds<BoundedInt32<0, CHANNEL_PADDING_TIMEOUT_UPPER_BOUND>>;
196
197/// Extract from a `NetParameters` which we need, conveniently organized for our processing
198///
199/// This type serves two functions at once:
200///
201///  1. Being a subset of the parameters, we can copy it out of
202///     the netdir, before we do more complex processing - and, in particular,
203///     before we obtain the lock on `inner` (which we need to actually handle the update,
204///     because we need to combine information from the config with that from the netdir).
205///
206///  2. Rather than four separate named fields for the padding options,
207///     it has arrays, so that it is easy to
208///     select the values without error-prone recapitulation of field names.
209#[derive(Debug, Clone)]
210struct NetParamsExtract {
211    /// `nf_ito_*`, the padding timeout parameters from the netdir consensus
212    ///
213    /// `nf_ito[ 0=normal, 1=reduced ][ 0=low, 1=high ]`
214    /// are `nf_ito_{low,high}{,_reduced` from `NetParameters`.
215    // TODO we could use some enum or IndexVec or something to make this less `0` and `1`
216    nf_ito: [[NfIto; 2]; 2],
217
218    /// The KIST parameters.
219    kist: KistParams,
220}
221
222impl From<&NetParameters> for NetParamsExtract {
223    fn from(p: &NetParameters) -> Self {
224        let kist_enabled = kist_mode_from_net_parameter(p.kist_enabled);
225        // NOTE: in theory, this cast shouldn't be needed
226        // (kist_tcp_notsent_lowat is supposed to be a u32, not an i32).
227        // In practice, however, the type conversion is needed
228        // because consensus params are i32s.
229        //
230        // See the `NetParameters::kist_tcp_notsent_lowat` docs for more details.
231        let tcp_notsent_lowat = u32::from(p.kist_tcp_notsent_lowat);
232        let kist = KistParams::new(kist_enabled, tcp_notsent_lowat);
233
234        NetParamsExtract {
235            nf_ito: [
236                [p.nf_ito_low, p.nf_ito_high],
237                [p.nf_ito_low_reduced, p.nf_ito_high_reduced],
238            ],
239            kist,
240        }
241    }
242}
243
244/// Build a `KistMode` from [`NetParameters`].
245///
246/// Used for converting [`kist_enabled`](NetParameters::kist_enabled)
247/// to a corresponding `KistMode`.
248fn kist_mode_from_net_parameter(val: BoundedInt32<0, 1>) -> KistMode {
249    caret::caret_int! {
250        /// KIST flavor, defined by a numerical value read from the consensus.
251        struct KistType(i32) {
252            /// KIST disabled
253            DISABLED = 0,
254            /// KIST using TCP_NOTSENT_LOWAT.
255            TCP_NOTSENT_LOWAT = 1,
256        }
257    }
258
259    match val.get().into() {
260        KistType::DISABLED => KistMode::Disabled,
261        KistType::TCP_NOTSENT_LOWAT => KistMode::TcpNotSentLowat,
262        _ => unreachable!("BoundedInt32 was not bounded?!"),
263    }
264}
265
266impl NetParamsExtract {
267    /// Return the padding timer parameter low end, for reduced-ness `reduced`, as a `u32`
268    fn pad_low(&self, reduced: bool) -> IntegerMilliseconds<u32> {
269        self.pad_get(reduced, 0)
270    }
271    /// Return the padding timer parameter high end, for reduced-ness `reduced`, as a `u32`
272    fn pad_high(&self, reduced: bool) -> IntegerMilliseconds<u32> {
273        self.pad_get(reduced, 1)
274    }
275
276    /// Return and converts one padding parameter timer
277    ///
278    /// Internal function.
279    fn pad_get(&self, reduced: bool, low_or_high: usize) -> IntegerMilliseconds<u32> {
280        self.nf_ito[usize::from(reduced)][low_or_high]
281            .try_map(|v| Ok::<_, Void>(v.into()))
282            .void_unwrap()
283    }
284}
285
286impl<C: AbstractChannel> ChannelState<C> {
287    /// Return true if a channel is ready to expire.
288    /// Update `expire_after` if a smaller duration than
289    /// the given value is required to expire this channel.
290    fn ready_to_expire(&self, expire_after: &mut Duration) -> bool {
291        let ChannelState::Open(ent) = self else {
292            return false;
293        };
294        let Some(unused_duration) = ent.channel.duration_unused() else {
295            // still in use
296            return false;
297        };
298        let max_unused_duration = ent.max_unused_duration;
299        let Some(remaining) = max_unused_duration.checked_sub(unused_duration) else {
300            // no time remaining; drop now.
301            return true;
302        };
303        if remaining.is_zero() {
304            // Ignoring this edge case would result in a fairly benign race
305            // condition outside of Shadow, but deadlock in Shadow.
306            return true;
307        }
308        *expire_after = std::cmp::min(*expire_after, remaining);
309        false
310    }
311}
312
313impl<C: AbstractChannelFactory> MgrState<C> {
314    /// Create a new empty `MgrState`.
315    pub(crate) fn new(
316        builder: C,
317        config: ChannelConfig,
318        dormancy: Dormancy,
319        netparams: &NetParameters,
320    ) -> Self {
321        let mut padding_params = ChannelPaddingInstructions::default();
322        let netparams = NetParamsExtract::from(netparams);
323        let kist_params = netparams.kist;
324        let update = parameterize(&mut padding_params, &config, dormancy, &netparams)
325            .unwrap_or_else(|e: tor_error::Bug| panic!("bug detected on startup: {:?}", e));
326        let _: Option<_> = update; // there are no channels yet, that would need to be told
327
328        let channels_params = ChannelParams {
329            padding: padding_params,
330            kist: kist_params,
331        };
332
333        MgrState {
334            inner: std::sync::Mutex::new(Inner {
335                builder,
336                channels: ListByRelayIds::new(),
337                #[cfg(feature = "relay")]
338                unauth_channels: Vec::new(),
339                config,
340                channels_params,
341                dormancy,
342            }),
343        }
344    }
345
346    /// Run a function on the [`ListByRelayIds`] that implements the map in this `MgrState`.
347    ///
348    /// This function grabs a mutex: do not provide a slow function.
349    ///
350    /// We provide this function rather than exposing the channels set directly,
351    /// to make sure that the calling code doesn't await while holding the lock.
352    ///
353    /// This is only `cfg(test)` since it can deadlock.
354    ///
355    /// # Deadlock
356    ///
357    /// Calling a method on [`MgrState`] from within `func` may cause a deadlock.
358    #[cfg(test)]
359    pub(crate) fn with_channels<F, T>(&self, func: F) -> Result<T>
360    where
361        F: FnOnce(&mut ListByRelayIds<ChannelState<C::Channel>>) -> T,
362    {
363        let mut inner = self.inner.lock()?;
364        Ok(func(&mut inner.channels))
365    }
366
367    /// Return a copy of the builder stored in this state.
368    pub(crate) fn builder(&self) -> C
369    where
370        C: Clone,
371    {
372        let inner = self.inner.lock().expect("lock poisoned");
373        inner.builder.clone()
374    }
375
376    /// Run a function to modify the builder stored in this state.
377    ///
378    /// # Deadlock
379    ///
380    /// Calling a method on [`MgrState`] from within `func` may cause a deadlock.
381    #[allow(unused)]
382    pub(crate) fn with_mut_builder<F>(&self, func: F)
383    where
384        F: FnOnce(&mut C),
385    {
386        let mut inner = self.inner.lock().expect("lock poisoned");
387        func(&mut inner.builder);
388    }
389
390    /// Add an open channel into our list.
391    #[cfg(feature = "relay")]
392    pub(crate) fn add_open(&self, channel: Arc<C::Channel>) -> Result<()> {
393        let mut inner = self.inner.lock()?;
394        // Make sure this channel has verified relay identities. Else, put it in the
395        // unauthenticated channel list (ex: client channel as a relay responder).
396        if channel.has_any_identity() {
397            inner.channels.insert(ChannelState::Open(OpenEntry {
398                channel,
399                // TODO(relay): Relay need a different unused duration (if any). We can't use the
400                // client timeout value. Need to be figured out before production.
401                max_unused_duration: Self::random_max_unused_duration(),
402            }));
403        } else {
404            inner.unauth_channels.push(channel);
405        }
406        Ok(())
407    }
408
409    /// Remove every unusable state from the map in this state.
410    #[cfg(test)]
411    pub(crate) fn remove_unusable(&self) -> Result<()> {
412        let mut inner = self.inner.lock()?;
413        inner.channels.retain(|state| match state {
414            ChannelState::Open(ent) => ent.channel.is_usable(),
415            ChannelState::Building(_) => true,
416        });
417        Ok(())
418    }
419
420    /// Request an open or pending channel to `target`. If `add_new_entry_if_not_found` is true and
421    /// an open or pending channel isn't found, a new pending entry will be added and
422    /// [`ChannelForTarget::NewEntry`] will be returned. This is all done as part of the same method
423    /// so that all operations are performed under the same lock acquisition.
424    pub(crate) fn request_channel(
425        &self,
426        target: &C::BuildSpec,
427        add_new_entry_if_not_found: bool,
428    ) -> Result<Option<ChannelForTarget<C>>> {
429        use ChannelState::*;
430
431        let mut inner = self.inner.lock()?;
432
433        // The idea here is to choose the channel in two steps:
434        //
435        // - Eligibility: Get channels from the channel map and filter them down to only channels
436        //   which are eligible to be returned.
437        // - Ranking: From the eligible channels, choose the best channel.
438        //
439        // Another way to choose the channel could be something like: first try all canonical open
440        // channels, then all non-canonical open channels, then all pending channels with all
441        // matching relay ids, then remaining pending channels, etc. But this ends up being hard to
442        // follow and inflexible (what if you want to prioritize pending channels over non-canonical
443        // open channels?).
444
445        // Open channels which are allowed for requests to `target`.
446        let open_channels = inner
447            .channels
448            // channels with all target relay identifiers
449            .by_all_ids(target)
450            .filter(|entry| match entry {
451                Open(x) => select::open_channel_is_allowed(x, target),
452                Building(_) => false,
453            });
454
455        // Pending channels which will *probably* be allowed for requests to `target` once they
456        // complete.
457        let pending_channels = inner
458            .channels
459            // channels that have a subset of the relay ids of `target`
460            .all_subset(target)
461            .into_iter()
462            .filter(|entry| match entry {
463                Open(_) => false,
464                Building(x) => select::pending_channel_maybe_allowed(x, target),
465            });
466
467        match select::choose_best_channel(open_channels.chain(pending_channels), target) {
468            Some(Open(OpenEntry { channel, .. })) => {
469                // This entry is a perfect match for the target keys: we'll return the open
470                // entry.
471                return Ok(Some(ChannelForTarget::Open(Arc::clone(channel))));
472            }
473            Some(Building(PendingEntry { pending, .. })) => {
474                // This entry is potentially a match for the target identities: we'll return the
475                // pending entry. (We don't know for sure if it will match once it completes,
476                // since we might discover additional keys beyond those listed for this pending
477                // entry.)
478                return Ok(Some(ChannelForTarget::Pending(pending.clone())));
479            }
480            None => {}
481        }
482
483        // It's possible we know ahead of time that building a channel would be unsuccessful.
484        if inner
485            .channels
486            // channels with at least one id in common with `target`
487            .all_overlapping(target)
488            .into_iter()
489            // but not channels which completely satisfy the id requirements of `target`
490            .filter(|entry| !entry.has_all_relay_ids_from(target))
491            .any(|entry| matches!(entry, Open(OpenEntry{ channel, ..}) if channel.is_usable()))
492        {
493            // At least one *open, usable* channel has been negotiated that overlaps only
494            // partially with our target: it has proven itself to have _one_ of our target
495            // identities, but not all.
496            //
497            // Because this channel exists, we know that our target cannot succeed, since relays
498            // are not allowed to share _any_ identities.
499            //return Ok(Some(Action::Return(Err(Error::IdentityConflict))));
500            return Err(Error::IdentityConflict);
501        }
502
503        if !add_new_entry_if_not_found {
504            return Ok(None);
505        }
506
507        // Great, nothing interfered at all.
508        let any_relay_id = target
509            .identities()
510            .next()
511            .ok_or(internal!("relay target had no id"))?
512            .to_owned();
513        let (new_state, send, unique_id) = setup_launch(RelayIds::from_relay_ids(target));
514        inner
515            .channels
516            .try_insert(ChannelState::Building(new_state))?;
517        let handle = PendingChannelHandle::new(any_relay_id, unique_id);
518        Ok(Some(ChannelForTarget::NewEntry((handle, send))))
519    }
520
521    /// Remove the pending channel identified by its `handle`.
522    pub(crate) fn remove_pending_channel(&self, handle: PendingChannelHandle) -> Result<()> {
523        let mut inner = self.inner.lock()?;
524        remove_pending(&mut inner.channels, handle);
525        Ok(())
526    }
527
528    /// Upgrade the pending channel identified by its `handle` by replacing it with a new open
529    /// `channel`.
530    #[instrument(skip_all, level = "trace")]
531    pub(crate) fn upgrade_pending_channel_to_open(
532        &self,
533        handle: PendingChannelHandle,
534        channel: Arc<C::Channel>,
535    ) -> Result<()> {
536        // Do all operations under the same lock acquisition.
537        let mut inner = self.inner.lock()?;
538
539        remove_pending(&mut inner.channels, handle);
540
541        // This isn't great.  We context switch to the newly-created
542        // channel just to tell it how and whether to do padding.  Ideally
543        // we would pass the params at some suitable point during
544        // building.  However, that would involve the channel taking a
545        // copy of the params, and that must happen in the same channel
546        // manager lock acquisition span as the one where we insert the
547        // channel into the table so it will receive updates.  I.e.,
548        // here.
549        let update = inner.channels_params.padding.initial_update();
550        if let Some(update) = update {
551            channel
552                .reparameterize(update.into())
553                .map_err(|_| internal!("failure on new channel"))?;
554        }
555        let new_entry = ChannelState::Open(OpenEntry {
556            channel,
557            max_unused_duration: Self::random_max_unused_duration(),
558        });
559        inner.channels.insert(new_entry);
560
561        Ok(())
562    }
563
564    /// Reconfigure all channels as necessary
565    ///
566    /// (By reparameterizing channels as needed)
567    /// This function will handle
568    ///   - netdir update
569    ///   - a reconfiguration
570    ///   - dormancy
571    ///
572    /// For `new_config` and `new_dormancy`, `None` means "no change to previous info".
573    ///
574    /// **Note:** As described in `ChanMgr::set_create_request_handler()`,
575    /// this doesn't update the parameters for any `CreateRequestHandler` if one exists.
576    pub(super) fn reconfigure_general(
577        &self,
578        new_config: Option<&ChannelConfig>,
579        new_dormancy: Option<Dormancy>,
580        netparams: Arc<dyn AsRef<NetParameters>>,
581    ) -> StdResult<(), tor_error::Bug> {
582        use ChannelState as CS;
583
584        // TODO when we support operation as a relay, inter-relay channels ought
585        // not to get padding.
586        let netdir = {
587            let extract = NetParamsExtract::from((*netparams).as_ref());
588            drop(netparams);
589            extract
590        };
591
592        let mut inner = self
593            .inner
594            .lock()
595            .map_err(|_| internal!("poisoned channel manager"))?;
596        let inner = &mut *inner;
597
598        if let Some(new_config) = new_config {
599            inner.config = new_config.clone();
600        }
601        if let Some(new_dormancy) = new_dormancy {
602            inner.dormancy = new_dormancy;
603        }
604
605        let update = parameterize(
606            &mut inner.channels_params.padding,
607            &inner.config,
608            inner.dormancy,
609            &netdir,
610        )?;
611
612        let update = update.map(Arc::new);
613
614        let new_kist_params = netdir.kist;
615        let kist_params = if new_kist_params != inner.channels_params.kist {
616            // The KIST params have changed: remember their value,
617            // and reparameterize_kist()
618            inner.channels_params.kist = new_kist_params;
619            Some(new_kist_params)
620        } else {
621            // If the new KIST params are identical to the previous ones,
622            // we don't need to call reparameterize_kist()
623            None
624        };
625
626        if update.is_none() && kist_params.is_none() {
627            // Return early, nothing to reconfigure
628            return Ok(());
629        }
630
631        let channels = inner.channels.values().filter_map(|chan| match chan {
632            CS::Open(OpenEntry { channel, .. }) => Some(channel),
633            CS::Building(_) => None,
634        });
635        #[cfg(feature = "relay")]
636        let channels = channels.chain(inner.unauth_channels.iter());
637
638        for channel in channels {
639            if let Some(ref update) = update {
640                // Ignore error (which simply means the channel is closed or gone)
641                let _ = channel.reparameterize(Arc::clone(update));
642            }
643
644            if let Some(kist) = kist_params {
645                // Ignore error (which simply means the channel is closed or gone)
646                let _ = channel.reparameterize_kist(kist);
647            }
648        }
649
650        Ok(())
651    }
652
653    /// Expire all channels that have been unused for too long.
654    ///
655    /// Return a Duration until the next time at which
656    /// a channel _could_ expire.
657    pub(crate) fn expire_channels(&self) -> Duration {
658        // TODO(relay): First, I don't think dropping the ChannelState<> from the channel list
659        // actually closes a channel reactor. Because it holds a Arc<Channel>, the ref counter is
660        // simply decremented but the reactor still goes on. We can't have guarantees on an object
661        // in an `Arc<>` to be cleaned up when we drop it as it defeats the purpose of using an Arc
662        // as we don't know if other reference are held elsewhere.
663        //
664        // Second, Unauthenticated channels are relay only as in client/bridge connecting inbound.
665        // We need to expire any unused as well.
666        //
667        // Taking both points above, maybe it could be better to move the expiry logic into the
668        // channel reactor itself and make this function simply retain any channel that are
669        // `is_usable()`. This would remove the convoluted needs for ChannelDetails shared between
670        // a `Channel` (reactor handle) and the channel `Reactor`.
671        //
672        // We'll address all this in https://gitlab.torproject.org/tpo/core/arti/-/work_items/1600
673
674        let mut ret = Duration::from_secs(180);
675        self.inner
676            .lock()
677            .expect("Poisoned lock")
678            .channels
679            .retain(|chan| !chan.ready_to_expire(&mut ret));
680        ret
681    }
682
683    /// Helper: Return the default max unused duration for a channel.
684    fn random_max_unused_duration() -> Duration {
685        Duration::from_secs(
686            rand::rng()
687                .gen_range_checked(180..270)
688                .expect("not 180 < 270 !"),
689        )
690    }
691}
692
693/// A channel for a given target relay.
694pub(crate) enum ChannelForTarget<CF: AbstractChannelFactory> {
695    /// A channel that is open.
696    Open(Arc<CF::Channel>),
697    /// A channel that is building.
698    Pending(Pending),
699    /// Information about a new pending channel entry.
700    NewEntry((PendingChannelHandle, Sending)),
701}
702
703/// A handle for a pending channel.
704///
705/// WARNING: This handle should never be dropped, and should always be passed back into
706/// [`MgrState::remove_pending_channel`] or [`MgrState::upgrade_pending_channel_to_open`], otherwise
707/// the pending channel may be left in the channel map forever.
708///
709/// This handle must only be used with the `MgrState` from which it was given.
710pub(crate) struct PendingChannelHandle {
711    /// Any relay ID for this pending channel.
712    relay_id: tor_linkspec::RelayId,
713    /// The unique ID for this pending channel.
714    unique_id: UniqPendingChanId,
715    /// The pending channel has been removed from the channel map.
716    chan_has_been_removed: bool,
717}
718
719impl PendingChannelHandle {
720    /// Create a new [`PendingChannelHandle`].
721    fn new(relay_id: tor_linkspec::RelayId, unique_id: UniqPendingChanId) -> Self {
722        Self {
723            relay_id,
724            unique_id,
725            chan_has_been_removed: false,
726        }
727    }
728
729    /// This should be called when the pending channel has been removed from the pending channel
730    /// map. Not calling this will result in an error log message (and panic in debug builds) when
731    /// this handle is dropped.
732    fn chan_has_been_removed(mut self) {
733        self.chan_has_been_removed = true;
734    }
735}
736
737impl std::ops::Drop for PendingChannelHandle {
738    fn drop(&mut self) {
739        if !self.chan_has_been_removed {
740            #[allow(clippy::missing_docs_in_private_items)]
741            const MSG: &str = "Dropped the 'PendingChannelHandle' without removing the channel";
742            error_report!(
743                internal!("{MSG}"),
744                "'PendingChannelHandle' dropped unexpectedly",
745            );
746        }
747    }
748}
749
750/// Helper: return the objects used to inform pending tasks about a newly open or failed channel.
751fn setup_launch(ids: RelayIds) -> (PendingEntry, Sending, UniqPendingChanId) {
752    let (snd, rcv) = oneshot::channel();
753    let pending = rcv.shared();
754    let unique_id = UniqPendingChanId::new();
755    let entry = PendingEntry {
756        ids,
757        pending,
758        unique_id,
759    };
760
761    (entry, snd, unique_id)
762}
763
764/// Helper: remove the pending channel identified by `handle` from `channel_map`.
765fn remove_pending<C: AbstractChannel>(
766    channel_map: &mut tor_linkspec::ListByRelayIds<ChannelState<C>>,
767    handle: PendingChannelHandle,
768) {
769    // we need only one relay id to locate it, even if it has multiple relay ids
770    let removed = channel_map.remove_by_id(&handle.relay_id, |c| {
771        let ChannelState::Building(c) = c else {
772            return false;
773        };
774        c.unique_id == handle.unique_id
775    });
776    debug_assert_eq!(removed.len(), 1, "expected to remove exactly one channel");
777
778    handle.chan_has_been_removed();
779}
780
781/// Converts config, dormancy, and netdir, into parameter updates
782///
783/// Calculates new parameters, updating `channels_params` as appropriate.
784/// If anything changed, the corresponding update instruction is returned.
785///
786/// `channels_params` is updated with the new parameters,
787/// and the update message, if one is needed, is returned.
788///
789/// This is called in two places:
790///
791///  1. During chanmgr creation, it is called once to analyze the initial state
792///     and construct a corresponding ChannelPaddingInstructions.
793///
794///  2. During reconfiguration.
795fn parameterize(
796    channels_params: &mut ChannelPaddingInstructions,
797    config: &ChannelConfig,
798    dormancy: Dormancy,
799    netdir: &NetParamsExtract,
800) -> StdResult<Option<ChannelPaddingInstructionsUpdates>, tor_error::Bug> {
801    // Everything in this calculation applies to *all* channels, disregarding
802    // channel usage.  Usage is handled downstream, in the channel frontend.
803    // See the module doc in `crates/tor-proto/src/channel/padding.rs`.
804
805    let padding_of_level = |level| padding_parameters(level, netdir);
806    let send_padding = padding_of_level(config.padding)?;
807    let padding_default = padding_of_level(PaddingLevel::default())?;
808
809    let send_padding = match dormancy {
810        Dormancy::Active => send_padding,
811        Dormancy::Dormant => None,
812    };
813
814    let recv_padding = match config.padding {
815        PaddingLevel::Reduced => None,
816        PaddingLevel::Normal => send_padding,
817        PaddingLevel::None => None,
818    };
819
820    // Whether the inbound padding approach we are to use, is the same as the default
821    // derived from the netdir (disregarding our config and dormancy).
822    //
823    // Ie, whether the parameters we want are precisely those that a peer would
824    // use by default (assuming they have the same view of the netdir as us).
825    let recv_equals_default = recv_padding == padding_default;
826
827    let padding_negotiate = if recv_equals_default {
828        // Our padding approach is the same as peers' defaults.  So the PADDING_NEGOTIATE
829        // message we need to send is the START(0,0).  (The channel frontend elides an
830        // initial message of this form, - see crates/tor-proto/src/channel.rs::note_usage.)
831        //
832        // If the netdir default is no padding, and we previously negotiated
833        // padding being enabled, and now want to disable it, we would send
834        // START(0,0) rather than STOP.  That is OK (even, arguably, right).
835        PaddingNegotiate::start_default()
836    } else {
837        match recv_padding {
838            None => PaddingNegotiate::stop(),
839            Some(params) => params.padding_negotiate_cell()?,
840        }
841    };
842
843    let mut update = channels_params
844        .start_update()
845        .padding_enable(send_padding.is_some())
846        .padding_negotiate(padding_negotiate);
847    if let Some(params) = send_padding {
848        update = update.padding_parameters(params);
849    }
850    let update = update.finish();
851
852    Ok(update)
853}
854
855/// Given a `NetDirExtract` and whether we're reducing padding, return a `PaddingParameters`
856///
857/// With `PaddingLevel::None`, or the consensus specifies no padding, will return `None`;
858/// but does not account for other reasons why padding might be enabled/disabled.
859fn padding_parameters(
860    config: PaddingLevel,
861    netdir: &NetParamsExtract,
862) -> StdResult<Option<PaddingParameters>, tor_error::Bug> {
863    let reduced = match config {
864        PaddingLevel::Reduced => true,
865        PaddingLevel::Normal => false,
866        PaddingLevel::None => return Ok(None),
867    };
868
869    padding_parameters_builder(reduced, netdir)
870        .unwrap_or_else(|e: &str| {
871            info!(
872                "consensus channel padding parameters wrong, using defaults: {}",
873                &e,
874            );
875            Some(PaddingParametersBuilder::default())
876        })
877        .map(|p| {
878            p.build()
879                .map_err(into_internal!("failed to build padding parameters"))
880        })
881        .transpose()
882}
883
884/// Given a `NetDirExtract` and whether we're reducing padding,
885/// return a `PaddingParametersBuilder`
886///
887/// If the consensus specifies no padding, will return `None`;
888/// but does not account for other reasons why padding might be enabled/disabled.
889///
890/// If `Err`, the string is a description of what is wrong with the parameters;
891/// the caller should use `PaddingParameters::Default`.
892fn padding_parameters_builder(
893    reduced: bool,
894    netdir: &NetParamsExtract,
895) -> StdResult<Option<PaddingParametersBuilder>, &'static str> {
896    let mut p = PaddingParametersBuilder::default();
897
898    let low = netdir.pad_low(reduced);
899    let high = netdir.pad_high(reduced);
900    if low > high {
901        return Err("low > high");
902    }
903    if low.as_millis() == 0 && high.as_millis() == 0 {
904        // Zeroes for both channel padding consensus parameters means "don't send padding".
905        // padding-spec.txt s2.6, see description of `nf_ito_high`.
906        return Ok(None);
907    }
908    p.low(low);
909    p.high(high);
910    Ok::<_, &'static str>(Some(p))
911}
912
913#[cfg(test)]
914mod test {
915    // @@ begin test lint list maintained by maint/add_warning @@
916    #![allow(clippy::bool_assert_comparison)]
917    #![allow(clippy::clone_on_copy)]
918    #![allow(clippy::dbg_macro)]
919    #![allow(clippy::mixed_attributes_style)]
920    #![allow(clippy::print_stderr)]
921    #![allow(clippy::print_stdout)]
922    #![allow(clippy::single_char_pattern)]
923    #![allow(clippy::unwrap_used)]
924    #![allow(clippy::unchecked_time_subtraction)]
925    #![allow(clippy::useless_vec)]
926    #![allow(clippy::needless_pass_by_value)]
927    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
928
929    use super::*;
930    use crate::factory::BootstrapReporter;
931    use async_trait::async_trait;
932    #[cfg(feature = "relay")]
933    use safelog::Sensitive;
934    use std::sync::{Arc, Mutex};
935    use tor_llcrypto::pk::ed25519::Ed25519Identity;
936    use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
937    use tor_proto::memquota::ChannelAccount;
938
939    fn new_test_state() -> MgrState<FakeChannelFactory> {
940        MgrState::new(
941            FakeChannelFactory::default(),
942            ChannelConfig::default(),
943            Default::default(),
944            &Default::default(),
945        )
946    }
947
948    #[derive(Clone, Debug, Default)]
949    struct FakeChannelFactory {}
950
951    #[allow(clippy::diverging_sub_expression)] // for unimplemented!() + async_trait
952    #[async_trait]
953    impl AbstractChannelFactory for FakeChannelFactory {
954        type Channel = FakeChannel;
955
956        type BuildSpec = tor_linkspec::OwnedChanTarget;
957
958        type Stream = ();
959
960        async fn build_channel(
961            &self,
962            _target: &Self::BuildSpec,
963            _reporter: BootstrapReporter,
964            _memquota: ChannelAccount,
965        ) -> Result<Arc<FakeChannel>> {
966            unimplemented!()
967        }
968
969        #[cfg(feature = "relay")]
970        async fn build_channel_using_incoming(
971            &self,
972            _peer: Sensitive<std::net::SocketAddr>,
973            _stream: Self::Stream,
974            _memquota: ChannelAccount,
975        ) -> Result<Arc<Self::Channel>> {
976            unimplemented!()
977        }
978    }
979
980    #[derive(Clone, Debug)]
981    struct FakeChannel {
982        ed_ident: Ed25519Identity,
983        usable: bool,
984        unused_duration: Option<u64>,
985        params_update: Arc<Mutex<Option<Arc<ChannelPaddingInstructionsUpdates>>>>,
986    }
987    impl AbstractChannel for FakeChannel {
988        fn is_canonical(&self) -> bool {
989            unimplemented!()
990        }
991        fn is_canonical_to_peer(&self) -> bool {
992            unimplemented!()
993        }
994        fn is_usable(&self) -> bool {
995            self.usable
996        }
997        fn duration_unused(&self) -> Option<Duration> {
998            self.unused_duration.map(Duration::from_secs)
999        }
1000        fn reparameterize(
1001            &self,
1002            update: Arc<ChannelPaddingInstructionsUpdates>,
1003        ) -> tor_proto::Result<()> {
1004            *self.params_update.lock().unwrap() = Some(update);
1005            Ok(())
1006        }
1007        fn reparameterize_kist(&self, _kist_params: KistParams) -> tor_proto::Result<()> {
1008            Ok(())
1009        }
1010        fn engage_padding_activities(&self) {}
1011    }
1012    impl tor_linkspec::HasRelayIds for FakeChannel {
1013        fn identity(
1014            &self,
1015            key_type: tor_linkspec::RelayIdType,
1016        ) -> Option<tor_linkspec::RelayIdRef<'_>> {
1017            match key_type {
1018                tor_linkspec::RelayIdType::Ed25519 => Some((&self.ed_ident).into()),
1019                _ => None,
1020            }
1021        }
1022    }
1023    /// Get a fake ed25519 identity from the first byte of a string.
1024    fn str_to_ed(s: &str) -> Ed25519Identity {
1025        let byte = s.as_bytes()[0];
1026        [byte; 32].into()
1027    }
1028    fn ch(ident: &'static str) -> ChannelState<FakeChannel> {
1029        let channel = FakeChannel {
1030            ed_ident: str_to_ed(ident),
1031            usable: true,
1032            unused_duration: None,
1033            params_update: Arc::new(Mutex::new(None)),
1034        };
1035        ChannelState::Open(OpenEntry {
1036            channel: Arc::new(channel),
1037            max_unused_duration: Duration::from_secs(180),
1038        })
1039    }
1040    fn ch_with_details(
1041        ident: &'static str,
1042        max_unused_duration: Duration,
1043        unused_duration: Option<u64>,
1044    ) -> ChannelState<FakeChannel> {
1045        let channel = FakeChannel {
1046            ed_ident: str_to_ed(ident),
1047            usable: true,
1048            unused_duration,
1049            params_update: Arc::new(Mutex::new(None)),
1050        };
1051        ChannelState::Open(OpenEntry {
1052            channel: Arc::new(channel),
1053            max_unused_duration,
1054        })
1055    }
1056    fn closed(ident: &'static str) -> ChannelState<FakeChannel> {
1057        let channel = FakeChannel {
1058            ed_ident: str_to_ed(ident),
1059            usable: false,
1060            unused_duration: None,
1061            params_update: Arc::new(Mutex::new(None)),
1062        };
1063        ChannelState::Open(OpenEntry {
1064            channel: Arc::new(channel),
1065            max_unused_duration: Duration::from_secs(180),
1066        })
1067    }
1068
1069    #[test]
1070    fn rmv_unusable() -> Result<()> {
1071        let map = new_test_state();
1072
1073        map.with_channels(|map| {
1074            map.insert(closed("machen"));
1075            map.insert(closed("wir"));
1076            map.insert(ch("wir"));
1077            map.insert(ch("feinen"));
1078            map.insert(ch("Fug"));
1079            map.insert(ch("Fug"));
1080        })?;
1081
1082        map.remove_unusable().unwrap();
1083
1084        map.with_channels(|map| {
1085            assert_eq!(map.by_id(&str_to_ed("m")).len(), 0);
1086            assert_eq!(map.by_id(&str_to_ed("w")).len(), 1);
1087            assert_eq!(map.by_id(&str_to_ed("f")).len(), 1);
1088            assert_eq!(map.by_id(&str_to_ed("F")).len(), 2);
1089        })?;
1090
1091        Ok(())
1092    }
1093
1094    #[test]
1095    fn reparameterize_via_netdir() -> Result<()> {
1096        let map = new_test_state();
1097
1098        // Set some non-default parameters so that we can tell when an update happens
1099        let _ = map
1100            .inner
1101            .lock()
1102            .unwrap()
1103            .channels_params
1104            .padding
1105            .start_update()
1106            .padding_parameters(
1107                PaddingParametersBuilder::default()
1108                    .low(1234.into())
1109                    .build()
1110                    .unwrap(),
1111            )
1112            .finish();
1113
1114        map.with_channels(|map| {
1115            map.insert(ch("track"));
1116        })?;
1117
1118        let netdir = tor_netdir::testnet::construct_netdir()
1119            .unwrap_if_sufficient()
1120            .unwrap();
1121        let netdir = Arc::new(netdir);
1122
1123        let with_ch = |f: &dyn Fn(&FakeChannel)| {
1124            let inner = map.inner.lock().unwrap();
1125            let mut ch = inner.channels.by_ed25519(&str_to_ed("t"));
1126            let ch = ch.next().unwrap().unwrap_open();
1127            f(ch);
1128        };
1129
1130        eprintln!("-- process a default netdir, which should send an update --");
1131        map.reconfigure_general(None, None, netdir.clone()).unwrap();
1132        with_ch(&|ch| {
1133            assert_eq!(
1134                format!("{:?}", ch.params_update.lock().unwrap().take().unwrap()),
1135                // evade field visibility by (ab)using Debug impl
1136                "ChannelPaddingInstructionsUpdates { padding_enable: None, \
1137                    padding_parameters: Some(Parameters { \
1138                        low: IntegerMilliseconds { value: 1500 }, \
1139                        high: IntegerMilliseconds { value: 9500 } }), \
1140                    padding_negotiate: None }"
1141            );
1142        });
1143        eprintln!();
1144
1145        eprintln!("-- process a default netdir again, which should *not* send an update --");
1146        map.reconfigure_general(None, None, netdir).unwrap();
1147        with_ch(&|ch| assert!(ch.params_update.lock().unwrap().is_none()));
1148
1149        Ok(())
1150    }
1151
1152    #[test]
1153    fn expire_channels() -> Result<()> {
1154        let map = new_test_state();
1155
1156        // Channel that has been unused beyond max duration allowed is expired
1157        map.with_channels(|map| {
1158            map.insert(ch_with_details(
1159                "wello",
1160                Duration::from_secs(180),
1161                Some(181),
1162            ));
1163        })?;
1164
1165        // Minimum value of max unused duration is 180 seconds
1166        assert_eq!(180, map.expire_channels().as_secs());
1167        map.with_channels(|map| {
1168            assert_eq!(map.by_ed25519(&str_to_ed("w")).len(), 0);
1169        })?;
1170
1171        let map = new_test_state();
1172
1173        // Channel that has been unused for shorter than max unused duration
1174        map.with_channels(|map| {
1175            map.insert(ch_with_details(
1176                "wello",
1177                Duration::from_secs(180),
1178                Some(120),
1179            ));
1180
1181            map.insert(ch_with_details(
1182                "yello",
1183                Duration::from_secs(180),
1184                Some(170),
1185            ));
1186
1187            // Channel that has been unused beyond max duration allowed is expired
1188            map.insert(ch_with_details(
1189                "gello",
1190                Duration::from_secs(180),
1191                Some(181),
1192            ));
1193
1194            // Closed channel should be retained
1195            map.insert(closed("hello"));
1196        })?;
1197
1198        // Return duration until next channel expires
1199        assert_eq!(10, map.expire_channels().as_secs());
1200        map.with_channels(|map| {
1201            assert_eq!(map.by_ed25519(&str_to_ed("w")).len(), 1);
1202            assert_eq!(map.by_ed25519(&str_to_ed("y")).len(), 1);
1203            assert_eq!(map.by_ed25519(&str_to_ed("h")).len(), 1);
1204            assert_eq!(map.by_ed25519(&str_to_ed("g")).len(), 0);
1205        })?;
1206        Ok(())
1207    }
1208}