// tor_hsservice/publish/reactor.rs
//! The onion service publisher reactor.
//!
//! Generates and publishes hidden service descriptors in response to various events.
//!
//! [`Reactor::run`] is the entry-point of the reactor. It starts the reactor,
//! and runs until [`Reactor::run_once`] returns [`ShutdownStatus::Terminate`]
//! or a fatal error occurs. `ShutdownStatus::Terminate` is returned if
//! any of the channels the reactor is receiving events from is closed
//! (i.e. when the senders are dropped).
//!
//! ## Publisher status
//!
//! The publisher has an internal [`PublishStatus`], distinct from its [`State`],
//! which is used for onion service status reporting.
//!
//! The main loop of the reactor reads the current `PublishStatus` from `publish_status_rx`,
//! and responds by generating and publishing a new descriptor if needed.
//!
//! See [`PublishStatus`] and [`Reactor::publish_status_rx`] for more details.
//!
//! ## When do we publish?
//!
//! We generate and publish a new descriptor if
//!   * the introduction points have changed
//!   * the onion service configuration has changed in a meaningful way (for example,
//!     if the `restricted_discovery` configuration or its [`Anonymity`](crate::Anonymity)
//!     has changed; see [`OnionServiceConfigPublisherView`])
//!   * there is a new consensus
//!   * it is time to republish the descriptor (after we upload a descriptor,
//!     we schedule it for republishing at a random time between 60 minutes and 120 minutes
//!     in the future)
//!
//! ## Onion service status
//!
//! With respect to [`OnionServiceStatus`] reporting,
//! the following state transitions are possible:
//!
//!
//! ```ignore
//!
//!                 update_publish_status(UploadScheduled|AwaitingIpts|RateLimited)
//!                +---------------------------------------+
//!                |                                       |
//!                |                                       v
//!                |                               +---------------+
//!                |                               | Bootstrapping |
//!                |                               +---------------+
//!                |                                       |
//!                |                                       |           uploaded to at least
//!                |  not enough HsDir uploads succeeded   |        some HsDirs from each ring
//!                |         +-----------------------------+-----------------------+
//!                |         |                             |                       |
//!                |         |              all HsDir uploads succeeded            |
//!                |         |                             |                       |
//!                |         v                             v                       v
//!                |  +---------------------+         +---------+        +---------------------+
//!                |  | DegradedUnreachable |         | Running |        |  DegradedReachable  |
//! +----------+   |  +---------------------+         +---------+        +---------------------+
//! | Shutdown |-- |         |                           |                        |
//! +----------+   |         |                           |                        |
//!                |         |                           |                        |
//!                |         |                           |                        |
//!                |         +---------------------------+------------------------+
//!                |                                     |   invalid authorized_clients
//!                |                                     |      after handling config change
//!                |                                     |
//!                |                                     v
//!                |     run_once() returns an error +--------+
//!                +-------------------------------->| Broken |
//!                                                  +--------+
//! ```
//!
//! We can also transition from `Broken`, `DegradedReachable`, or `DegradedUnreachable`
//! back to `Bootstrapping` (those transitions were omitted for brevity).
use tor_circmgr::ServiceOnionServiceDirTunnel;
use tor_config::file_watcher::{
    self, Event as FileEvent, FileEventReceiver, FileEventSender, FileWatcher, FileWatcherBuilder,
};
use tor_config_path::{CfgPath, CfgPathResolver};
use tor_dirclient::SourceInfo;
use tor_netdir::{DirEvent, NetDir};
use tracing::instrument;

use crate::config::OnionServiceConfigPublisherView;
use crate::config::restricted_discovery::{
    DirectoryKeyProviderList, RestrictedDiscoveryConfig, RestrictedDiscoveryKeys,
};
use crate::status::{DescUploadRetryError, Problem};

use super::*;
use derive_more::{From, Into};
// TODO-CLIENT-AUTH: perhaps we should add a separate CONFIG_CHANGE_REPUBLISH_DEBOUNCE_INTERVAL
// for rate-limiting the publish jobs triggered by a change in the config?
//
// Currently the descriptor publish tasks triggered by changes in the config
// are rate-limited via the usual rate limiting mechanism
// (which rate-limits the uploads for 1m).
//
// I think this is OK for now, but we might need to rethink this if it becomes problematic
// (for example, we might want an even longer rate-limit, or to reset any existing rate-limits
// each time the config is modified).
104
/// The upload rate-limiting threshold.
///
/// Before initiating an upload, the reactor checks if the last upload was at least
/// `UPLOAD_RATE_LIM_THRESHOLD` seconds ago. If so, it uploads the descriptor to all HsDirs that
/// need it. If not, it schedules the upload to happen `UPLOAD_RATE_LIM_THRESHOLD` seconds from the
/// current time.
//
// TODO: We may someday need to tune this value; it was chosen more or less arbitrarily.
const UPLOAD_RATE_LIM_THRESHOLD: Duration = Duration::from_secs(60);

/// The maximum number of concurrent upload tasks per time period.
//
// TODO: this value was arbitrarily chosen and may not be optimal.  For now, it
// will have no effect, since the current number of replicas is far less than
// this value.
//
// The uploads for all TPs happen in parallel.  As a result, the actual limit for the maximum
// number of concurrent upload tasks is multiplied by a number which depends on the TP parameters
// (currently 2, which means the concurrency limit will, in fact, be 32).
//
// We should try to decouple this value from the TP parameters.
const MAX_CONCURRENT_UPLOADS: usize = 16;

/// The maximum time allowed for uploading a descriptor to a single HSDir,
/// across all attempts (including retries within a single upload task).
pub(crate) const OVERALL_UPLOAD_TIMEOUT: Duration = Duration::from_secs(5 * 60);
131
/// A reactor for the HsDir [`Publisher`]
///
/// The entrypoint is [`Reactor::run`].
#[must_use = "If you don't call run() on the reactor, it won't publish any descriptors."]
pub(super) struct Reactor<R: Runtime, M: Mockable> {
    /// The immutable, shared inner state.
    imm: Arc<Immutable<R, M>>,
    /// A source for new network directories that we use to determine
    /// our HsDirs.
    dir_provider: Arc<dyn NetDirProvider>,
    /// The mutable inner state.
    inner: Arc<Mutex<Inner>>,
    /// A channel for receiving IPT change notifications.
    ipt_watcher: IptsPublisherView,
    /// A channel for receiving onion service config change notifications.
    config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
    /// A channel for receiving restricted discovery key_dirs change notifications.
    key_dirs_rx: FileEventReceiver,
    /// A channel for sending restricted discovery key_dirs change notifications.
    ///
    /// A copy of this sender is handed out to every `FileWatcher` created.
    key_dirs_tx: FileEventSender,
    /// A channel for receiving updates regarding our [`PublishStatus`].
    ///
    /// The main loop of the reactor watches for updates on this channel.
    ///
    /// When the [`PublishStatus`] changes to [`UploadScheduled`](PublishStatus::UploadScheduled),
    /// we can start publishing descriptors.
    ///
    /// If the [`PublishStatus`] is [`AwaitingIpts`](PublishStatus::AwaitingIpts), publishing is
    /// paused until we receive a notification on `ipt_watcher` telling us the IPT manager has
    /// established some introduction points.
    publish_status_rx: watch::Receiver<PublishStatus>,
    /// A sender for updating our [`PublishStatus`].
    ///
    /// When our [`PublishStatus`] changes to [`UploadScheduled`](PublishStatus::UploadScheduled),
    /// we can start publishing descriptors.
    publish_status_tx: watch::Sender<PublishStatus>,
    /// A channel for receiving upload completion notifications.
    ///
    /// This channel is polled in the main loop of the reactor.
    upload_task_complete_rx: mpsc::Receiver<TimePeriodUploadResult>,
    /// A channel for sending upload completion notifications.
    ///
    /// A copy of this sender is handed to each upload task.
    upload_task_complete_tx: mpsc::Sender<TimePeriodUploadResult>,
    /// A sender for notifying any pending upload tasks that the reactor is shutting down.
    ///
    /// Receivers can use this channel to find out when reactor is dropped.
    ///
    /// This is currently only used in [`upload_for_time_period`](Reactor::upload_for_time_period).
    /// Any future background tasks can also use this channel to detect if the reactor is dropped.
    ///
    /// Closing this channel will cause any pending upload tasks to be dropped.
    shutdown_tx: broadcast::Sender<Void>,
    /// Path resolver for configuration files.
    path_resolver: Arc<CfgPathResolver>,
    /// Queue on which we receive messages from the [`PowManager`] telling us that a seed has
    /// rotated and thus we need to republish the descriptor for a particular time period.
    update_from_pow_manager_rx: mpsc::Receiver<TimePeriod>,
}
193
/// The immutable, shared state of the descriptor publisher reactor.
#[derive(Clone)]
struct Immutable<R: Runtime, M: Mockable> {
    /// The runtime.
    runtime: R,
    /// Mockable state.
    ///
    /// This is used for launching circuits and for obtaining random number generators.
    mockable: M,
    /// The service for which we're publishing descriptors.
    nickname: HsNickname,
    /// The key manager.
    keymgr: Arc<KeyMgr>,
    /// A sender for updating the status of the onion service.
    status_tx: PublisherStatusSender,
    /// Proof-of-work state.
    pow_manager: Arc<PowManager<R>>,
}
212
impl<R: Runtime, M: Mockable> Immutable<R, M> {
    /// Create an [`AesOpeKey`] for generating revision counters for the descriptors associated
    /// with the specified [`TimePeriod`].
    ///
    /// If the onion service is not running in offline mode, the key of the returned `AesOpeKey` is
    /// the private part of the blinded identity key. Otherwise, the key is the private part of the
    /// descriptor signing key.
    ///
    /// Returns an error if the service is running in offline mode and the descriptor signing
    /// keypair of the specified `period` is not available.
    //
    // TODO (#1194): we don't support "offline" mode (yet), so this always returns an AesOpeKey
    // built from the blinded id key
    fn create_ope_key(&self, period: TimePeriod) -> Result<AesOpeKey, FatalError> {
        let ope_key = match read_blind_id_keypair(&self.keymgr, &self.nickname, period)? {
            Some(key) => {
                let key: ed25519::ExpandedKeypair = key.into();
                // Use the first 32 bytes of the expanded secret key as the OPE secret.
                key.to_secret_key_bytes()[0..32]
                    .try_into()
                    .expect("Wrong length on slice")
            }
            None => {
                // TODO (#1194): we don't support externally provisioned keys (yet), so this branch
                // is unreachable (for now).
                let desc_sign_key_spec =
                    DescSigningKeypairSpecifier::new(self.nickname.clone(), period);
                let key: ed25519::Keypair = self
                    .keymgr
                    .get::<HsDescSigningKeypair>(&desc_sign_key_spec)?
                    // TODO (#1194): internal! is not the right type for this error (we need an
                    // error type for the case where a hidden service running in offline mode has
                    // run out of its pre-provisioned keys).
                    //
                    // This will be addressed when we add support for offline hs_id mode
                    .ok_or_else(|| {
                        internal!(
                            "identity keys are offline, but descriptor signing key is unavailable?!"
                        )
                    })?
                    .into();
                key.to_bytes()
            }
        };

        Ok(AesOpeKey::from_secret(&ope_key))
    }

    /// Generate a revision counter for a descriptor associated with the specified
    /// [`TimePeriod`].
    ///
    /// Returns a revision counter generated according to the [encrypted time in period] scheme.
    ///
    /// [encrypted time in period]: https://spec.torproject.org/rend-spec/revision-counter-mgt.html#encrypted-time
    fn generate_revision_counter(
        &self,
        params: &HsDirParams,
        now: SystemTime,
    ) -> Result<RevisionCounter, FatalError> {
        // TODO: in the future, we might want to compute ope_key once per time period (as opposed
        // to each time we generate a new descriptor), for performance reasons.
        let ope_key = self.create_ope_key(params.time_period())?;

        // TODO: perhaps this should be moved to a new HsDirParams::offset_within_sr() function
        let srv_start = params.start_of_shard_rand_period();
        let offset = params.offset_within_srv_period(now).ok_or_else(|| {
            internal!(
                "current wallclock time not within SRV range?! (now={:?}, SRV_start={:?})",
                now,
                srv_start
            )
        })?;
        let rev = ope_key.encrypt(offset);

        Ok(RevisionCounter::from(rev))
    }
}
289
/// Mockable state for the descriptor publisher reactor.
///
/// This enables us to mock parts of the [`Reactor`] for testing purposes.
#[async_trait]
pub(crate) trait Mockable: Clone + Send + Sync + Sized + 'static {
    /// The type of random number generator.
    type Rng: rand::Rng + rand::CryptoRng;

    /// The type of client circuit.
    type Tunnel: MockableDirTunnel;

    /// Return a random number generator.
    fn thread_rng(&self) -> Self::Rng;

    /// Get or launch a circuit to the HsDir `target`, using relays from `netdir`.
    async fn get_or_launch_hs_dir<T>(
        &self,
        netdir: &NetDir,
        target: T,
    ) -> Result<Self::Tunnel, tor_circmgr::Error>
    where
        T: CircTarget + Send + Sync;

    /// Return an estimate-based value for how long we should allow a single
    /// directory upload operation to complete.
    ///
    /// Includes circuit construction, stream opening, upload, and waiting for a
    /// response.
    fn estimate_upload_timeout(&self) -> Duration;
}
320
/// Mockable client circuit used for uploading descriptors to an HsDir.
#[async_trait]
pub(crate) trait MockableDirTunnel: Send + Sync {
    /// The data stream type.
    type DataStream: AsyncRead + AsyncWrite + Send + Unpin;

    /// Start a new stream to the last relay in the circuit, using
    /// a BEGIN_DIR cell.
    async fn begin_dir_stream(&self) -> Result<Self::DataStream, tor_circmgr::Error>;

    /// Try to get a SourceInfo for this circuit, for using it in a directory request.
    fn source_info(&self) -> tor_proto::Result<Option<SourceInfo>>;
}
334
#[async_trait]
impl MockableDirTunnel for ServiceOnionServiceDirTunnel {
    type DataStream = tor_proto::client::stream::DataStream;

    async fn begin_dir_stream(&self) -> Result<Self::DataStream, tor_circmgr::Error> {
        // This resolves to the *inherent* `begin_dir_stream` (inherent methods take
        // precedence over trait methods), so this is plain delegation, not recursion.
        Self::begin_dir_stream(self).await
    }

    fn source_info(&self) -> tor_proto::Result<Option<SourceInfo>> {
        SourceInfo::from_tunnel(self)
    }
}
347
/// The real version of the mockable state of the reactor.
//
// Wraps a shared `HsCircPool`, which is used for building circuits to the HsDirs.
#[derive(Clone, From, Into)]
pub(crate) struct Real<R: Runtime>(Arc<HsCircPool<R>>);
351
352#[async_trait]
353impl<R: Runtime> Mockable for Real<R> {
354    type Rng = rand::rngs::ThreadRng;
355    type Tunnel = ServiceOnionServiceDirTunnel;
356
357    fn thread_rng(&self) -> Self::Rng {
358        rand::rng()
359    }
360
361    #[instrument(level = "trace", skip_all)]
362    async fn get_or_launch_hs_dir<T>(
363        &self,
364        netdir: &NetDir,
365        target: T,
366    ) -> Result<Self::Tunnel, tor_circmgr::Error>
367    where
368        T: CircTarget + Send + Sync,
369    {
370        self.0.get_or_launch_svc_dir(netdir, target).await
371    }
372
373    fn estimate_upload_timeout(&self) -> Duration {
374        use tor_circmgr::timeouts::Action;
375        let est_build = self.0.estimate_timeout(&Action::BuildCircuit { length: 4 });
376        let est_roundtrip = self.0.estimate_timeout(&Action::RoundTrip { length: 4 });
377        // We assume that in the worst case we'll have to wait for an entire
378        // circuit construction and two round-trips to the hsdir.
379        let est_total = est_build + est_roundtrip * 2;
380        // We always allow _at least_ this much time, in case our estimate is
381        // ridiculously low.
382        let min_timeout = Duration::from_secs(30);
383        max(est_total, min_timeout)
384    }
385}
386
/// The mutable state of a [`Reactor`].
struct Inner {
    /// The onion service config.
    config: Arc<OnionServiceConfigPublisherView>,
    /// Watcher for key_dirs.
    ///
    /// Set to `None` if the reactor is not running, or if `watch_configuration` is false.
    ///
    /// The watcher is recreated whenever the `restricted_discovery.key_dirs` change.
    file_watcher: Option<FileWatcher>,
    /// The relevant time periods.
    ///
    /// This includes the current time period, as well as any other time periods we need to be
    /// publishing descriptors for.
    ///
    /// This is empty until we fetch our first netdir in [`Reactor::run`].
    time_periods: Vec<TimePeriodContext>,
    /// Our most up to date netdir.
    ///
    /// This is initialized in [`Reactor::run`].
    netdir: Option<Arc<NetDir>>,
    /// The timestamp of our last upload.
    ///
    /// This is the time when the last update was _initiated_ (rather than completed), to prevent
    /// the publisher from spawning multiple upload tasks at once in response to multiple external
    /// events happening in quick succession, such as the IPT manager sending multiple IPT change
    /// notifications in a short time frame (#1142), or an IPT change notification that's
    /// immediately followed by a consensus change. Starting two upload tasks at once is not only
    /// inefficient, but it also causes the publisher to generate two different descriptors with
    /// the same revision counter (the revision counter is derived from the current timestamp),
    /// which ultimately causes the slower upload task to fail (see #1142).
    ///
    /// Note: This is only used for deciding when to reschedule a rate-limited upload. It is _not_
    /// used for retrying failed uploads (these are handled internally by
    /// [`Reactor::upload_descriptor_with_retries`]).
    last_uploaded: Option<Instant>,
    /// A max-heap containing the time periods for which we need to reupload the descriptor.
    // TODO: we are currently reuploading more than necessary.
    // Ideally, this shouldn't contain duplicate TimePeriods,
    // because we only need to retain the latest reupload time for each time period.
    //
    // Currently, if, for some reason, we upload the descriptor multiple times for the same TP,
    // we will end up with multiple ReuploadTimer entries for that TP,
    // each of which will (eventually) result in a reupload.
    //
    // TODO: maybe this should just be a HashMap<TimePeriod, Instant>
    //
    // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1971#note_2994950
    reupload_timers: BinaryHeap<ReuploadTimer>,
    /// The restricted discovery authorized clients.
    ///
    /// `None`, unless the service is running in restricted discovery mode.
    authorized_clients: Option<Arc<RestrictedDiscoveryKeys>>,
}
441
/// The part of the reactor state that changes with every time period.
struct TimePeriodContext {
    /// The HsDir params.
    params: HsDirParams,
    /// The HsDirs to use in this time period.
    ///
    // We keep a list of `RelayIds` because we can't store a `Relay<'_>` inside the reactor
    // (the lifetime of a relay is tied to the lifetime of its corresponding `NetDir`. To
    // store `Relay<'_>`s in the reactor, we'd need a way of atomically swapping out both the
    // `NetDir` and the cached relays, and to convince Rust what we're doing is sound)
    hs_dirs: Vec<(RelayIds, DescriptorStatus)>,
    /// The revision counter of the last successful upload, if any.
    last_successful: Option<RevisionCounter>,
    /// The outcome of the last upload, if any.
    upload_results: Vec<HsDirUploadStatus>,
}
458
impl TimePeriodContext {
    /// Create a new `TimePeriodContext`.
    ///
    /// Any of the specified `old_hsdirs` also present in the new list of HsDirs
    /// (returned by `NetDir::hs_dirs_upload`) will have their `DescriptorStatus` preserved.
    fn new<'r>(
        params: HsDirParams,
        blind_id: HsBlindId,
        netdir: &Arc<NetDir>,
        old_hsdirs: impl Iterator<Item = &'r (RelayIds, DescriptorStatus)>,
        old_upload_results: Vec<HsDirUploadStatus>,
    ) -> Result<Self, FatalError> {
        let period = params.time_period();
        let hs_dirs = Self::compute_hsdirs(period, blind_id, netdir, old_hsdirs)?;
        // Only keep old upload results whose HsDir is still in the new HsDir list.
        let upload_results = old_upload_results
            .into_iter()
            .filter(|res|
                // Check if the HsDir of this result still exists
                hs_dirs
                    .iter()
                    .any(|(relay_ids, _status)| relay_ids == &res.relay_ids))
            .collect();

        Ok(Self {
            params,
            hs_dirs,
            last_successful: None,
            upload_results,
        })
    }

    /// Recompute the HsDirs for this time period.
    fn compute_hsdirs<'r>(
        period: TimePeriod,
        blind_id: HsBlindId,
        netdir: &Arc<NetDir>,
        mut old_hsdirs: impl Iterator<Item = &'r (RelayIds, DescriptorStatus)>,
    ) -> Result<Vec<(RelayIds, DescriptorStatus)>, FatalError> {
        let hs_dirs = netdir.hs_dirs_upload(blind_id, period)?;

        Ok(hs_dirs
            .map(|hs_dir| {
                let mut builder = RelayIds::builder();
                if let Some(ed_id) = hs_dir.ed_identity() {
                    builder.ed_identity(*ed_id);
                }

                if let Some(rsa_id) = hs_dir.rsa_identity() {
                    builder.rsa_identity(*rsa_id);
                }

                let relay_id = builder.build().unwrap_or_else(|_| RelayIds::empty());

                // Have we uploaded the descriptor to this relay before? If so, we don't need to
                // reupload it unless it was already dirty and due for a reupload.
                //
                // NOTE(review): `find` advances the shared `old_hsdirs` iterator, so entries
                // preceding a match are not considered for later lookups; this appears to assume
                // the old and new HsDir lists come in a compatible order — TODO confirm.
                let status = match old_hsdirs.find(|(id, _)| *id == relay_id) {
                    Some((_, status)) => *status,
                    None => DescriptorStatus::Dirty,
                };

                (relay_id, status)
            })
            .collect::<Vec<_>>())
    }

    /// Mark the descriptor dirty for all HSDirs of this time period.
    fn mark_all_dirty(&mut self) {
        self.hs_dirs
            .iter_mut()
            .for_each(|(_relay_id, status)| *status = DescriptorStatus::Dirty);
    }

    /// Update the upload result for this time period.
    fn set_upload_results(&mut self, upload_results: Vec<HsDirUploadStatus>) {
        self.upload_results = upload_results;
    }
}
536
/// An error that occurs while trying to upload a descriptor.
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum UploadError {
    /// An error that has occurred after we have contacted a directory cache and made a circuit to it.
    #[error("descriptor upload request failed: {}", _0.error)]
    Request(#[from] RequestFailedError),

    /// Failed to establish circuit to hidden service directory
    #[error("could not build circuit to HsDir")]
    Circuit(#[from] tor_circmgr::Error),

    /// Failed to establish stream to hidden service directory
    #[error("failed to establish directory stream to HsDir")]
    Stream(#[source] tor_circmgr::Error),

    /// An internal error.
    #[error("Internal error")]
    Bug(#[from] tor_error::Bug),
}
define_asref_dyn_std_error!(UploadError);
558
559impl UploadError {
560    /// Return true if this error is one that we should report as a suspicious event,
561    /// along with the dirserver, and description of the relevant document.
562    pub(crate) fn should_report_as_suspicious(&self) -> bool {
563        match self {
564            UploadError::Request(e) => e.error.should_report_as_suspicious_if_anon(),
565            UploadError::Circuit(_) => false, // TODO prop360
566            UploadError::Stream(_) => false,  // TODO prop360
567            UploadError::Bug(_) => false,
568        }
569    }
570}
571
572impl<R: Runtime, M: Mockable> Reactor<R, M> {
573    /// Create a new `Reactor`.
574    #[allow(clippy::too_many_arguments)]
575    pub(super) fn new(
576        runtime: R,
577        nickname: HsNickname,
578        dir_provider: Arc<dyn NetDirProvider>,
579        mockable: M,
580        config: &OnionServiceConfig,
581        ipt_watcher: IptsPublisherView,
582        config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
583        status_tx: PublisherStatusSender,
584        keymgr: Arc<KeyMgr>,
585        path_resolver: Arc<CfgPathResolver>,
586        pow_manager: Arc<PowManager<R>>,
587        update_from_pow_manager_rx: mpsc::Receiver<TimePeriod>,
588    ) -> Self {
589        /// The maximum size of the upload completion notifier channel.
590        ///
591        /// The channel we use this for is a futures::mpsc channel, which has a capacity of
592        /// `UPLOAD_CHAN_BUF_SIZE + num-senders`. We don't need the buffer size to be non-zero, as
593        /// each sender will send exactly one message.
594        const UPLOAD_CHAN_BUF_SIZE: usize = 0;
595
596        // Internally-generated instructions, no need for mq.
597        let (upload_task_complete_tx, upload_task_complete_rx) =
598            mpsc_channel_no_memquota(UPLOAD_CHAN_BUF_SIZE);
599
600        let (publish_status_tx, publish_status_rx) = watch::channel();
601        // Setting the buffer size to zero here is OK,
602        // since we never actually send anything on this channel.
603        let (shutdown_tx, _shutdown_rx) = broadcast::channel(0);
604
605        let authorized_clients =
606            Self::read_authorized_clients(&config.restricted_discovery, &path_resolver);
607
608        // Create a channel for watching for changes in the configured
609        // restricted_discovery.key_dirs.
610        let (key_dirs_tx, key_dirs_rx) = file_watcher::channel();
611
612        let imm = Immutable {
613            runtime,
614            mockable,
615            nickname,
616            keymgr,
617            status_tx,
618            pow_manager,
619        };
620
621        let inner = Inner {
622            time_periods: vec![],
623            config: Arc::new(config.into()),
624            file_watcher: None,
625            netdir: None,
626            last_uploaded: None,
627            reupload_timers: Default::default(),
628            authorized_clients,
629        };
630
631        Self {
632            imm: Arc::new(imm),
633            inner: Arc::new(Mutex::new(inner)),
634            dir_provider,
635            ipt_watcher,
636            config_rx,
637            key_dirs_rx,
638            key_dirs_tx,
639            publish_status_rx,
640            publish_status_tx,
641            upload_task_complete_rx,
642            upload_task_complete_tx,
643            shutdown_tx,
644            path_resolver,
645            update_from_pow_manager_rx,
646        }
647    }
648
649    /// Start the reactor.
650    ///
651    /// Under normal circumstances, this function runs indefinitely.
652    ///
653    /// Note: this also spawns the "reminder task" that we use to reschedule uploads whenever an
654    /// upload fails or is rate-limited.
655    pub(super) async fn run(mut self) -> Result<(), FatalError> {
656        debug!(nickname=%self.imm.nickname, "starting descriptor publisher reactor");
657
658        {
659            let netdir = self
660                .dir_provider
661                .wait_for_netdir(Timeliness::Timely)
662                .await?;
663            let time_periods = self.compute_time_periods(&netdir, &[])?;
664
665            let mut inner = self.inner.lock().expect("poisoned lock");
666
667            inner.netdir = Some(netdir);
668            inner.time_periods = time_periods;
669        }
670
671        // Create the initial key_dirs watcher.
672        self.update_file_watcher();
673
674        loop {
675            match self.run_once().await {
676                Ok(ShutdownStatus::Continue) => continue,
677                Ok(ShutdownStatus::Terminate) => {
678                    debug!(nickname=%self.imm.nickname, "descriptor publisher is shutting down!");
679
680                    self.imm.status_tx.send_shutdown();
681                    return Ok(());
682                }
683                Err(e) => {
684                    error_report!(
685                        e,
686                        "HS service {}: descriptor publisher crashed!",
687                        self.imm.nickname
688                    );
689
690                    self.imm.status_tx.send_broken(e.clone());
691
692                    return Err(e);
693                }
694            }
695        }
696    }
697
    /// Run one iteration of the reactor loop.
    ///
    /// Returns `Ok(ShutdownStatus::Terminate)` when any of the event channels
    /// we read from has been closed (i.e. its senders were dropped),
    /// and `Ok(ShutdownStatus::Continue)` otherwise.
    #[allow(clippy::cognitive_complexity)] // TODO: Refactor
    async fn run_once(&mut self) -> Result<ShutdownStatus, FatalError> {
        let mut netdir_events = self.dir_provider.events();

        // Note: TrackingNow tracks the values it is compared with.
        // Waiting on it (via wait_for_earliest, below) is therefore equivalent
        // to sleeping for (until - now) units of time.
        let upload_rate_lim: TrackingNow = TrackingNow::now(&self.imm.runtime);
        if let PublishStatus::RateLimited(until) = self.status() {
            if upload_rate_lim > until {
                // We are no longer rate-limited
                self.expire_rate_limit().await?;
            }
        }

        let reupload_tracking = TrackingNow::now(&self.imm.runtime);
        let mut reupload_periods = vec![];
        {
            // Scope the lock guard: it must be released before we await anything below.
            let mut inner = self.inner.lock().expect("poisoned lock");
            let inner = &mut *inner;
            while let Some(reupload) = inner.reupload_timers.peek().copied() {
                // First, extract all the timeouts that already elapsed.
                if reupload.when <= reupload_tracking {
                    inner.reupload_timers.pop();
                    reupload_periods.push(reupload.period);
                } else {
                    // We are not ready to schedule any more reuploads.
                    //
                    // How much we need to sleep is implicitly
                    // tracked in reupload_tracking (through
                    // the TrackingNow implementation)
                    break;
                }
            }
        }

        // Check if it's time to schedule any reuploads.
        for period in reupload_periods {
            if self.mark_dirty(&period) {
                debug!(
                    time_period=?period,
                    "descriptor reupload timer elapsed; scheduling reupload",
                );
                self.update_publish_status_unless_rate_lim(PublishStatus::UploadScheduled)
                    .await?;
            }
        }

        select_biased! {
            res = self.upload_task_complete_rx.next().fuse() => {
                let Some(upload_res) = res else {
                    return Ok(ShutdownStatus::Terminate);
                };

                self.handle_upload_results(upload_res);
                self.upload_result_to_svc_status()?;
            },
            () = upload_rate_lim.wait_for_earliest(&self.imm.runtime).fuse() => {
                self.expire_rate_limit().await?;
            },
            () = reupload_tracking.wait_for_earliest(&self.imm.runtime).fuse() => {
                // Run another iteration, executing run_once again. This time, we will remove the
                // expired reupload from self.reupload_timers, mark the descriptor dirty for all
                // relevant HsDirs, and schedule the upload by setting our status to
                // UploadScheduled.
                return Ok(ShutdownStatus::Continue);
            },
            netdir_event = netdir_events.next().fuse() => {
                let Some(netdir_event) = netdir_event else {
                    debug!("netdir event stream ended");
                    return Ok(ShutdownStatus::Terminate);
                };

                // Only a new consensus warrants recomputing the HsDirs.
                if !matches!(netdir_event, DirEvent::NewConsensus) {
                    return Ok(ShutdownStatus::Continue);
                };

                // The consensus changed. Grab a new NetDir.
                let netdir = match self.dir_provider.netdir(Timeliness::Timely) {
                    Ok(y) => y,
                    Err(e) => {
                        error_report!(e, "HS service {}: netdir unavailable. Retrying...", self.imm.nickname);
                        // Hopefully a netdir will appear in the future.
                        // in the meantime, suspend operations.
                        //
                        // TODO (#1218): there is a bug here: we stop reading on our inputs
                        // including eg publish_status_rx, but it is our job to log some of
                        // these things.  While we are waiting for a netdir, all those messages
                        // are "stuck"; they'll appear later, with misleading timestamps.
                        //
                        // Probably this should be fixed by moving the logging
                        // out of the reactor, where it won't be blocked.
                        self.dir_provider.wait_for_netdir(Timeliness::Timely)
                            .await?
                    }
                };
                // Snapshot the relevant periods before handing the NetDir off,
                // so we know which publisher keys are still in use.
                let relevant_periods = netdir.hs_all_time_periods();
                self.handle_consensus_change(netdir).await?;
                expire_publisher_keys(
                    &self.imm.keymgr,
                    &self.imm.nickname,
                    &relevant_periods,
                ).unwrap_or_else(|e| {
                    // Best-effort cleanup: failing to expire keys is not fatal.
                    error_report!(e, "failed to remove expired keys");
                });
            }
            update = self.ipt_watcher.await_update().fuse() => {
                if self.handle_ipt_change(update).await? == ShutdownStatus::Terminate {
                    return Ok(ShutdownStatus::Terminate);
                }
            },
            config = self.config_rx.next().fuse() => {
                let Some(config) = config else {
                    return Ok(ShutdownStatus::Terminate);
                };

                self.handle_svc_config_change(&config).await?;
            },
            res = self.key_dirs_rx.next().fuse() => {
                let Some(event) = res else {
                    return Ok(ShutdownStatus::Terminate);
                };

                // Drain any queued key_dirs events before handling this one.
                while let Some(_ignore) = self.key_dirs_rx.try_recv() {
                    // Discard other events, so that we only reload once.
                }

                self.handle_key_dirs_change(event).await?;
            }
            should_upload = self.publish_status_rx.next().fuse() => {
                let Some(should_upload) = should_upload else {
                    return Ok(ShutdownStatus::Terminate);
                };

                // Our PublishStatus changed -- are we ready to publish?
                if should_upload == PublishStatus::UploadScheduled {
                    self.update_publish_status_unless_waiting(PublishStatus::Idle).await?;
                    self.upload_all().await?;
                }
            }
            update_tp_pow_seed = self.update_from_pow_manager_rx.next().fuse() => {
                debug!("Update PoW seed for TP!");
                let Some(time_period) = update_tp_pow_seed else {
                    return Ok(ShutdownStatus::Terminate);
                };
                // NOTE(review): presumably the PoW seed for this time period was rotated,
                // so we republish the descriptor for it — confirm against the PoW manager.
                self.mark_dirty(&time_period);
                self.upload_all().await?;
            }
        }

        Ok(ShutdownStatus::Continue)
    }
850
851    /// Returns the current status of the publisher
852    fn status(&self) -> PublishStatus {
853        *self.publish_status_rx.borrow()
854    }
855
    /// Handle a batch of upload outcomes,
    /// possibly updating the status of the descriptor for the corresponding HSDirs.
    ///
    /// Also schedules the next reupload of the descriptor for this time period,
    /// at a random time between 60 and 120 minutes from now.
    ///
    /// Results for time periods or HsDirs we no longer track are silently discarded.
    fn handle_upload_results(&self, results: TimePeriodUploadResult) {
        let mut inner = self.inner.lock().expect("poisoned lock");
        let inner = &mut *inner;

        // Check which time period these uploads pertain to.
        let period = inner
            .time_periods
            .iter_mut()
            .find(|ctx| ctx.params.time_period() == results.time_period);

        let Some(period) = period else {
            // The uploads were for a time period that is no longer relevant, so we
            // can ignore the result.
            return;
        };

        // We will need to reupload this descriptor at some point, so we pick
        // a random time between 60 minutes and 120 minutes in the future.
        //
        // See https://spec.torproject.org/rend-spec/deriving-keys.html#WHEN-HSDESC
        let mut rng = self.imm.mockable.thread_rng();
        // TODO SPEC: Control republish period using a consensus parameter?
        let minutes = rng.gen_range_checked(60..=120).expect("low > high?!");
        let duration = Duration::from_secs(minutes * 60);
        let reupload_when = self.imm.runtime.now() + duration;
        let time_period = period.params.time_period();

        info!(
            time_period=?time_period,
            "reuploading descriptor in {}",
            humantime::format_duration(duration),
        );

        inner.reupload_timers.push(ReuploadTimer {
            period: time_period,
            when: reupload_when,
        });

        let mut upload_results = vec![];
        for upload_res in results.hsdir_result {
            // Find the status entry of the HsDir this result pertains to.
            let relay = period
                .hs_dirs
                .iter_mut()
                .find(|(relay_ids, _status)| relay_ids == &upload_res.relay_ids);

            let Some((_relay, status)): Option<&mut (RelayIds, _)> = relay else {
                // This HSDir went away, so the result doesn't matter.
                // Continue processing the rest of the results
                continue;
            };

            if upload_res.upload_res.is_ok() {
                // Only record this success if it is at least as recent as the last
                // one we know about; a stale result must not clobber a newer one.
                let update_last_successful = match period.last_successful {
                    None => true,
                    Some(counter) => counter <= upload_res.revision_counter,
                };

                if update_last_successful {
                    period.last_successful = Some(upload_res.revision_counter);
                    // TODO (#1098): Is it possible that this won't update the statuses promptly
                    // enough. For example, it's possible for the reactor to see a Dirty descriptor
                    // and start an upload task for a descriptor has already been uploaded (or is
                    // being uploaded) in another task, but whose upload results have not yet been
                    // processed.
                    //
                    // This is probably made worse by the fact that the statuses are updated in
                    // batches (grouped by time period), rather than one by one as the upload tasks
                    // complete (updating the status involves locking the inner mutex, and I wanted
                    // to minimize the locking/unlocking overheads). I'm not sure handling the
                    // updates in batches was the correct decision here.
                    *status = DescriptorStatus::Clean;
                }
            }

            upload_results.push(upload_res);
        }

        period.set_upload_results(upload_results);
    }
937
    /// Maybe update our list of HsDirs.
    ///
    /// Installs `netdir` as our current network directory, recomputes the HsDirs
    /// for all relevant time periods, schedules an upload (unless we are still
    /// waiting for introduction points), and refreshes the service status.
    async fn handle_consensus_change(&mut self, netdir: Arc<NetDir>) -> Result<(), FatalError> {
        trace!("the consensus has changed; recomputing HSDirs");

        // Swap in the new NetDir; the previous one (if any) is simply dropped.
        let _old: Option<Arc<NetDir>> = self.replace_netdir(netdir);

        self.recompute_hs_dirs()?;
        self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
            .await?;

        // If the time period has changed, some of our upload results may now be irrelevant,
        // so we might need to update our status (for example, if our uploads are
        // for a no-longer-relevant time period, it means we might be able to update
        // our status from "degraded" to "running")
        self.upload_result_to_svc_status()?;

        Ok(())
    }
956
957    /// Recompute the HsDirs for all relevant time periods.
958    fn recompute_hs_dirs(&self) -> Result<(), FatalError> {
959        let mut inner = self.inner.lock().expect("poisoned lock");
960        let inner = &mut *inner;
961
962        let netdir = Arc::clone(
963            inner
964                .netdir
965                .as_ref()
966                .ok_or_else(|| internal!("started upload task without a netdir"))?,
967        );
968
969        // Update our list of relevant time periods.
970        let new_time_periods = self.compute_time_periods(&netdir, &inner.time_periods)?;
971        inner.time_periods = new_time_periods;
972
973        Ok(())
974    }
975
976    /// Compute the [`TimePeriodContext`]s for the time periods from the specified [`NetDir`].
977    ///
978    /// The specified `time_periods` are used to preserve the `DescriptorStatus` of the
979    /// HsDirs where possible.
980    fn compute_time_periods(
981        &self,
982        netdir: &Arc<NetDir>,
983        time_periods: &[TimePeriodContext],
984    ) -> Result<Vec<TimePeriodContext>, FatalError> {
985        netdir
986            .hs_all_time_periods()
987            .iter()
988            .map(|params| {
989                let period = params.time_period();
990                let blind_id_kp =
991                    read_blind_id_keypair(&self.imm.keymgr, &self.imm.nickname, period)?
992                        // Note: for now, read_blind_id_keypair cannot return Ok(None).
993                        // It's supposed to return Ok(None) if we're in offline hsid mode,
994                        // but that might change when we do #1194
995                        .ok_or_else(|| internal!("offline hsid mode not supported"))?;
996
997                let blind_id: HsBlindIdKey = (&blind_id_kp).into();
998
999                // If our previous `TimePeriodContext`s also had an entry for `period`, we need to
1000                // preserve the `DescriptorStatus` of its HsDirs. This helps prevent unnecessarily
1001                // publishing the descriptor to the HsDirs that already have it (the ones that are
1002                // marked with DescriptorStatus::Clean).
1003                //
1004                // In other words, we only want to publish to those HsDirs that
1005                //   * are part of a new time period (which we have never published the descriptor
1006                //   for), or
1007                //   * have just been added to the ring of a time period we already knew about
1008                if let Some(ctx) = time_periods
1009                    .iter()
1010                    .find(|ctx| ctx.params.time_period() == period)
1011                {
1012                    TimePeriodContext::new(
1013                        params.clone(),
1014                        blind_id.into(),
1015                        netdir,
1016                        ctx.hs_dirs.iter(),
1017                        ctx.upload_results.clone(),
1018                    )
1019                } else {
1020                    // Passing an empty iterator here means all HsDirs in this TimePeriodContext
1021                    // will be marked as dirty, meaning we will need to upload our descriptor to them.
1022                    TimePeriodContext::new(
1023                        params.clone(),
1024                        blind_id.into(),
1025                        netdir,
1026                        iter::empty(),
1027                        vec![],
1028                    )
1029                }
1030            })
1031            .collect::<Result<Vec<TimePeriodContext>, FatalError>>()
1032    }
1033
1034    /// Replace the old netdir with the new, returning the old.
1035    fn replace_netdir(&self, new_netdir: Arc<NetDir>) -> Option<Arc<NetDir>> {
1036        self.inner
1037            .lock()
1038            .expect("poisoned lock")
1039            .netdir
1040            .replace(new_netdir)
1041    }
1042
1043    /// Replace our view of the service config with `new_config` if `new_config` contains changes
1044    /// that would cause us to generate a new descriptor.
1045    fn replace_config_if_changed(&self, new_config: Arc<OnionServiceConfigPublisherView>) -> bool {
1046        let mut inner = self.inner.lock().expect("poisoned lock");
1047        let old_config = &mut inner.config;
1048
1049        // The fields we're interested in haven't changed, so there's no need to update
1050        // `inner.config`.
1051        if *old_config == new_config {
1052            return false;
1053        }
1054
1055        let log_change = match (
1056            old_config.restricted_discovery.enabled,
1057            new_config.restricted_discovery.enabled,
1058        ) {
1059            (true, false) => Some("Disabling restricted discovery mode"),
1060            (false, true) => Some("Enabling restricted discovery mode"),
1061            _ => None,
1062        };
1063
1064        if let Some(msg) = log_change {
1065            info!(nickname=%self.imm.nickname, "{}", msg);
1066        }
1067
1068        let _old: Arc<OnionServiceConfigPublisherView> = std::mem::replace(old_config, new_config);
1069
1070        true
1071    }
1072
1073    /// Recreate the FileWatcher for watching the restricted discovery key_dirs.
1074    fn update_file_watcher(&self) {
1075        let mut inner = self.inner.lock().expect("poisoned lock");
1076        if inner.config.restricted_discovery.watch_configuration() {
1077            debug!("The restricted_discovery.key_dirs have changed, updating file watcher");
1078            let mut watcher = FileWatcher::builder(self.imm.runtime.clone());
1079
1080            let dirs = inner.config.restricted_discovery.key_dirs().clone();
1081
1082            watch_dirs(&mut watcher, &dirs, &self.path_resolver);
1083
1084            let watcher = watcher
1085                .start_watching(self.key_dirs_tx.clone())
1086                .map_err(|e| {
1087                    // TODO: update the publish status (see also the module-level TODO about this).
1088                    error_report!(e, "Cannot set file watcher");
1089                })
1090                .ok();
1091            inner.file_watcher = watcher;
1092        } else {
1093            if inner.file_watcher.is_some() {
1094                debug!("removing key_dirs watcher");
1095            }
1096            inner.file_watcher = None;
1097        }
1098    }
1099
1100    /// Read the intro points from `ipt_watcher`, and decide whether we're ready to start
1101    /// uploading.
1102    fn note_ipt_change(&self) -> PublishStatus {
1103        let mut ipts = self.ipt_watcher.borrow_for_publish();
1104        match ipts.ipts.as_mut() {
1105            Some(_ipts) => PublishStatus::UploadScheduled,
1106            None => PublishStatus::AwaitingIpts,
1107        }
1108    }
1109
1110    /// Update our list of introduction points.
1111    async fn handle_ipt_change(
1112        &mut self,
1113        update: Option<Result<(), crate::FatalError>>,
1114    ) -> Result<ShutdownStatus, FatalError> {
1115        trace!(nickname=%self.imm.nickname, "received IPT change notification from IPT manager");
1116        match update {
1117            Some(Ok(())) => {
1118                let should_upload = self.note_ipt_change();
1119                debug!(nickname=%self.imm.nickname, "the introduction points have changed");
1120
1121                self.mark_all_dirty();
1122                self.update_publish_status_unless_rate_lim(should_upload)
1123                    .await?;
1124                Ok(ShutdownStatus::Continue)
1125            }
1126            Some(Err(e)) => Err(e),
1127            None => {
1128                debug!(nickname=%self.imm.nickname, "received shut down signal from IPT manager");
1129                Ok(ShutdownStatus::Terminate)
1130            }
1131        }
1132    }
1133
1134    /// Update the `PublishStatus` of the reactor with `new_state`,
1135    /// unless the current state is `AwaitingIpts`.
1136    async fn update_publish_status_unless_waiting(
1137        &mut self,
1138        new_state: PublishStatus,
1139    ) -> Result<(), FatalError> {
1140        // Only update the state if we're not waiting for intro points.
1141        if self.status() != PublishStatus::AwaitingIpts {
1142            self.update_publish_status(new_state).await?;
1143        }
1144
1145        Ok(())
1146    }
1147
1148    /// Update the `PublishStatus` of the reactor with `new_state`,
1149    /// unless the current state is `RateLimited`.
1150    async fn update_publish_status_unless_rate_lim(
1151        &mut self,
1152        new_state: PublishStatus,
1153    ) -> Result<(), FatalError> {
1154        // We can't exit this state until the rate-limit expires.
1155        if !matches!(self.status(), PublishStatus::RateLimited(_)) {
1156            self.update_publish_status(new_state).await?;
1157        }
1158
1159        Ok(())
1160    }
1161
1162    /// Unconditionally update the `PublishStatus` of the reactor with `new_state`.
1163    async fn update_publish_status(&mut self, new_state: PublishStatus) -> Result<(), Bug> {
1164        let onion_status = match new_state {
1165            PublishStatus::Idle => None,
1166            PublishStatus::UploadScheduled
1167            | PublishStatus::AwaitingIpts
1168            | PublishStatus::RateLimited(_) => Some(State::Bootstrapping),
1169        };
1170
1171        if let Some(onion_status) = onion_status {
1172            self.imm.status_tx.send(onion_status, None);
1173        }
1174
1175        trace!(
1176            "publisher reactor status change: {:?} -> {:?}",
1177            self.status(),
1178            new_state
1179        );
1180
1181        self.publish_status_tx.send(new_state).await.map_err(
1182            |_: postage::sink::SendError<_>| internal!("failed to send upload notification?!"),
1183        )?;
1184
1185        Ok(())
1186    }
1187
1188    /// Update the onion svc status based on the results of the last descriptor uploads.
1189    fn upload_result_to_svc_status(&self) -> Result<(), FatalError> {
1190        let inner = self.inner.lock().expect("poisoned lock");
1191        let netdir = inner
1192            .netdir
1193            .as_ref()
1194            .ok_or_else(|| internal!("handling upload results without netdir?!"))?;
1195
1196        let (state, err) = upload_result_state(netdir, &inner.time_periods);
1197        self.imm.status_tx.send(state, err);
1198
1199        Ok(())
1200    }
1201
1202    /// Update the descriptors based on the config change.
1203    async fn handle_svc_config_change(
1204        &mut self,
1205        config: &OnionServiceConfig,
1206    ) -> Result<(), FatalError> {
1207        let new_config = Arc::new(config.into());
1208        if self.replace_config_if_changed(Arc::clone(&new_config)) {
1209            self.update_file_watcher();
1210            self.update_authorized_clients_if_changed();
1211
1212            info!(nickname=%self.imm.nickname, "Config has changed, generating a new descriptor");
1213            self.mark_all_dirty();
1214
1215            // Schedule an upload, unless we're still waiting for IPTs.
1216            self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
1217                .await?;
1218        }
1219
1220        Ok(())
1221    }
1222
1223    /// Update the descriptors based on a restricted discovery key_dirs change.
1224    ///
1225    /// If the authorized clients from the [`RestrictedDiscoveryConfig`] have changed,
1226    /// this marks the descriptor as dirty for all time periods,
1227    /// and schedules a reupload.
1228    async fn handle_key_dirs_change(&mut self, event: FileEvent) -> Result<(), FatalError> {
1229        debug!("The configured key_dirs have changed");
1230        match event {
1231            FileEvent::Rescan | FileEvent::FileChanged => {
1232                // These events are handled in the same way, by re-reading the keys from disk
1233                // and republishing the descriptor if necessary
1234            }
1235            _ => return Err(internal!("file watcher event {event:?}").into()),
1236        };
1237
1238        // Update the file watcher, in case the change was triggered by a key_dir move.
1239        self.update_file_watcher();
1240
1241        if self.update_authorized_clients_if_changed() {
1242            self.mark_all_dirty();
1243
1244            // Schedule an upload, unless we're still waiting for IPTs.
1245            self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
1246                .await?;
1247        }
1248
1249        Ok(())
1250    }
1251
1252    /// Recreate the authorized_clients based on the current config.
1253    ///
1254    /// Returns `true` if the authorized clients have changed.
1255    fn update_authorized_clients_if_changed(&mut self) -> bool {
1256        let mut inner = self.inner.lock().expect("poisoned lock");
1257        let authorized_clients =
1258            Self::read_authorized_clients(&inner.config.restricted_discovery, &self.path_resolver);
1259
1260        let clients = &mut inner.authorized_clients;
1261        let changed = clients.as_ref() != authorized_clients.as_ref();
1262
1263        if changed {
1264            info!("The restricted discovery mode authorized clients have changed");
1265            *clients = authorized_clients;
1266        }
1267
1268        changed
1269    }
1270
1271    /// Read the authorized `RestrictedDiscoveryKeys` from `config`.
1272    fn read_authorized_clients(
1273        config: &RestrictedDiscoveryConfig,
1274        path_resolver: &CfgPathResolver,
1275    ) -> Option<Arc<RestrictedDiscoveryKeys>> {
1276        let authorized_clients = config.read_keys(path_resolver);
1277
1278        if matches!(authorized_clients.as_ref(), Some(c) if c.is_empty()) {
1279            warn!(
1280                "Running in restricted discovery mode, but we have no authorized clients. Service will be unreachable"
1281            );
1282        }
1283
1284        authorized_clients.map(Arc::new)
1285    }
1286
1287    /// Mark the descriptor dirty for all time periods.
1288    fn mark_all_dirty(&self) {
1289        trace!("marking the descriptor dirty for all time periods");
1290
1291        self.inner
1292            .lock()
1293            .expect("poisoned lock")
1294            .time_periods
1295            .iter_mut()
1296            .for_each(|tp| tp.mark_all_dirty());
1297    }
1298
1299    /// Mark the descriptor dirty for the specified time period.
1300    ///
1301    /// Returns `true` if the specified period is still relevant, and `false` otherwise.
1302    fn mark_dirty(&self, period: &TimePeriod) -> bool {
1303        let mut inner = self.inner.lock().expect("poisoned lock");
1304        let period_ctx = inner
1305            .time_periods
1306            .iter_mut()
1307            .find(|tp| tp.params.time_period() == *period);
1308
1309        match period_ctx {
1310            Some(ctx) => {
1311                trace!(time_period=?period, "marking the descriptor dirty");
1312                ctx.mark_all_dirty();
1313                true
1314            }
1315            None => false,
1316        }
1317    }
1318
1319    /// Try to upload our descriptor to the HsDirs that need it.
1320    ///
1321    /// If we've recently uploaded some descriptors, we return immediately and schedule the upload
1322    /// to happen after [`UPLOAD_RATE_LIM_THRESHOLD`].
1323    ///
1324    /// Failed uploads are retried
1325    /// (see [`upload_descriptor_with_retries`](Reactor::upload_descriptor_with_retries)).
1326    ///
1327    /// If restricted discovery mode is enabled and there are no authorized clients,
1328    /// we abort the upload and set our status to [`State::Broken`].
1329    //
1330    // Note: a broken restricted discovery config won't prevent future uploads from being scheduled
1331    // (for example if the IPTs change),
1332    // which can can cause the publisher's status to oscillate between `Bootstrapping` and `Broken`.
1333    // TODO: we might wish to refactor the publisher to be more sophisticated about this.
1334    //
1335    /// For each current time period, we spawn a task that uploads the descriptor to
1336    /// all the HsDirs on the HsDir ring of that time period.
1337    /// Each task shuts down on completion, or when the reactor is dropped.
1338    ///
1339    /// Each task reports its upload results (`TimePeriodUploadResult`)
1340    /// via the `upload_task_complete_tx` channel.
1341    /// The results are received and processed in the main loop of the reactor.
1342    ///
1343    /// Returns an error if it fails to spawn a task, or if an internal error occurs.
1344    #[allow(clippy::cognitive_complexity)] // TODO #2010: Refactor
1345    async fn upload_all(&mut self) -> Result<(), FatalError> {
1346        trace!("starting descriptor upload task...");
1347
1348        // Abort the upload entirely if we have an empty list of authorized clients
1349        let authorized_clients = match self.authorized_clients() {
1350            Ok(authorized_clients) => authorized_clients,
1351            Err(e) => {
1352                error_report!(e, "aborting upload");
1353                self.imm.status_tx.send_broken(e.clone());
1354
1355                // Returning an error would shut down the reactor, so we have to return Ok here.
1356                return Ok(());
1357            }
1358        };
1359
1360        let last_uploaded = self.inner.lock().expect("poisoned lock").last_uploaded;
1361        let now = self.imm.runtime.now();
1362        // Check if we should rate-limit this upload.
1363        if let Some(ts) = last_uploaded {
1364            let duration_since_upload = now.duration_since(ts);
1365
1366            if duration_since_upload < UPLOAD_RATE_LIM_THRESHOLD {
1367                return Ok(self.start_rate_limit(UPLOAD_RATE_LIM_THRESHOLD).await?);
1368            }
1369        }
1370
1371        let mut inner = self.inner.lock().expect("poisoned lock");
1372        let inner = &mut *inner;
1373
1374        let _ = inner.last_uploaded.insert(now);
1375
1376        for period_ctx in inner.time_periods.iter_mut() {
1377            let upload_task_complete_tx = self.upload_task_complete_tx.clone();
1378
1379            // Figure out which HsDirs we need to upload the descriptor to (some of them might already
1380            // have our latest descriptor, so we filter them out).
1381            let hs_dirs = period_ctx
1382                .hs_dirs
1383                .iter()
1384                .filter_map(|(relay_id, status)| {
1385                    if *status == DescriptorStatus::Dirty {
1386                        Some(relay_id.clone())
1387                    } else {
1388                        None
1389                    }
1390                })
1391                .collect::<Vec<_>>();
1392
1393            if hs_dirs.is_empty() {
1394                trace!("the descriptor is clean for all HSDirs. Nothing to do");
1395                return Ok(());
1396            }
1397
1398            let time_period = period_ctx.params.time_period();
1399            // This scope exists because rng is not Send, so it needs to fall out of scope before we
1400            // await anything.
1401            let netdir = Arc::clone(
1402                inner
1403                    .netdir
1404                    .as_ref()
1405                    .ok_or_else(|| internal!("started upload task without a netdir"))?,
1406            );
1407
1408            let imm = Arc::clone(&self.imm);
1409            let ipt_upload_view = self.ipt_watcher.upload_view();
1410            let config = Arc::clone(&inner.config);
1411            let authorized_clients = authorized_clients.clone();
1412
1413            trace!(nickname=%self.imm.nickname, time_period=?time_period,
1414                "spawning upload task"
1415            );
1416
1417            let params = period_ctx.params.clone();
1418            let shutdown_rx = self.shutdown_tx.subscribe();
1419
1420            // Spawn a task to upload the descriptor to all HsDirs of this time period.
1421            //
1422            // This task will shut down when the reactor is dropped (i.e. when shutdown_rx is
1423            // dropped).
1424            let _handle: () = self
1425                .imm
1426                .runtime
1427                .spawn(async move {
1428                    if let Err(e) = Self::upload_for_time_period(
1429                        hs_dirs,
1430                        &netdir,
1431                        config,
1432                        params,
1433                        Arc::clone(&imm),
1434                        ipt_upload_view.clone(),
1435                        authorized_clients.clone(),
1436                        upload_task_complete_tx,
1437                        shutdown_rx,
1438                    )
1439                    .await
1440                    {
1441                        error_report!(
1442                            e,
1443                            "descriptor upload failed for HS service {} and time period {:?}",
1444                            imm.nickname,
1445                            time_period
1446                        );
1447                    }
1448                })
1449                .map_err(|e| FatalError::from_spawn("upload_for_time_period task", e))?;
1450        }
1451
1452        Ok(())
1453    }
1454
1455    /// Upload the descriptor for the time period specified in `params`.
1456    ///
1457    /// Failed uploads are retried
1458    /// (see [`upload_descriptor_with_retries`](Reactor::upload_descriptor_with_retries)).
1459    #[allow(clippy::too_many_arguments)] // TODO: refactor
1460    #[allow(clippy::cognitive_complexity)] // TODO: Refactor
1461    async fn upload_for_time_period(
1462        hs_dirs: Vec<RelayIds>,
1463        netdir: &Arc<NetDir>,
1464        config: Arc<OnionServiceConfigPublisherView>,
1465        params: HsDirParams,
1466        imm: Arc<Immutable<R, M>>,
1467        ipt_upload_view: IptsPublisherUploadView,
1468        authorized_clients: Option<Arc<RestrictedDiscoveryKeys>>,
1469        mut upload_task_complete_tx: mpsc::Sender<TimePeriodUploadResult>,
1470        shutdown_rx: broadcast::Receiver<Void>,
1471    ) -> Result<(), FatalError> {
1472        let time_period = params.time_period();
1473        trace!(time_period=?time_period, "uploading descriptor to all HSDirs for this time period");
1474
1475        let hsdir_count = hs_dirs.len();
1476
1477        /// An error returned from an upload future.
1478        //
1479        // Exhaustive, because this is a private type.
1480        #[derive(Clone, Debug, thiserror::Error)]
1481        enum PublishError {
1482            /// The upload was aborted because there are no IPTs.
1483            ///
1484            /// This happens because of an inevitable TOCTOU race, where after being notified by
1485            /// the IPT manager that the IPTs have changed (via `self.ipt_watcher.await_update`),
1486            /// we find out there actually are no IPTs, so we can't build the descriptor.
1487            ///
1488            /// This is a special kind of error that interrupts the current upload task, and is
1489            /// logged at `debug!` level rather than `warn!` or `error!`.
1490            ///
1491            /// Ideally, this shouldn't happen very often (if at all).
1492            #[error("No IPTs")]
1493            NoIpts,
1494
1495            /// The reactor has shut down
1496            #[error("The reactor has shut down")]
1497            Shutdown,
1498
1499            /// An fatal error.
1500            #[error("{0}")]
1501            Fatal(#[from] FatalError),
1502        }
1503
1504        let max_hsdesc_len: usize = netdir
1505            .params()
1506            .hsdir_max_desc_size
1507            .try_into()
1508            .expect("Unable to convert positive int32 to usize!?");
1509
1510        let upload_results = futures::stream::iter(hs_dirs)
1511            .map(|relay_ids| {
1512                let netdir = netdir.clone();
1513                let config = Arc::clone(&config);
1514                let imm = Arc::clone(&imm);
1515                let ipt_upload_view = ipt_upload_view.clone();
1516                let authorized_clients = authorized_clients.clone();
1517                let params = params.clone();
1518                let mut shutdown_rx = shutdown_rx.clone();
1519
1520                let ed_id = relay_ids
1521                    .rsa_identity()
1522                    .map(|id| id.to_string())
1523                    .unwrap_or_else(|| "unknown".into());
1524                let rsa_id = relay_ids
1525                    .rsa_identity()
1526                    .map(|id| id.to_string())
1527                    .unwrap_or_else(|| "unknown".into());
1528
1529                async move {
1530                    let run_upload = |desc| async {
1531                        let Some(hsdir) = netdir.by_ids(&relay_ids) else {
1532                            // This should never happen (all of our relay_ids are from the stored
1533                            // netdir).
1534                            let err =
1535                                "tried to upload descriptor to relay not found in consensus?!";
1536                            warn!(
1537                                nickname=%imm.nickname, hsdir_id=%ed_id, hsdir_rsa_id=%rsa_id,
1538                                "{err}"
1539                            );
1540                            return Err(internal!("{err}").into());
1541                        };
1542
1543                        Self::upload_descriptor_with_retries(
1544                            desc,
1545                            &netdir,
1546                            &hsdir,
1547                            &ed_id,
1548                            &rsa_id,
1549                            Arc::clone(&imm),
1550                        )
1551                        .await
1552                    };
1553
1554                    // How long until we're supposed to time out?
1555                    let worst_case_end = imm.runtime.now() + OVERALL_UPLOAD_TIMEOUT;
1556                    // We generate a new descriptor before _each_ HsDir upload. This means each
1557                    // HsDir could, in theory, receive a different descriptor (not just in terms of
1558                    // revision-counters, but also with a different set of IPTs). It may seem like
1559                    // this could lead to some HsDirs being left with an outdated descriptor, but
1560                    // that's not the case: after the upload completes, the publisher will be
1561                    // notified by the ipt_watcher of the IPT change event (if there was one to
1562                    // begin with), which will trigger another upload job.
1563                    let hsdesc = {
1564                        // This scope is needed because the ipt_set MutexGuard is not Send, so it
1565                        // needs to fall out of scope before the await point below
1566                        let mut ipt_set = ipt_upload_view.borrow_for_publish();
1567
1568                        // If there are no IPTs, we abort the upload. At this point, we might have
1569                        // uploaded the descriptor to some, but not all, HSDirs from the specified
1570                        // time period.
1571                        //
1572                        // Returning an error here means the upload completion task is never
1573                        // notified of the outcome of any of these uploads (which means the
1574                        // descriptor is not marked clean). This is OK, because if we suddenly find
1575                        // out we have no IPTs, it means our built `hsdesc` has an outdated set of
1576                        // IPTs, so we need to go back to the main loop to wait for IPT changes,
1577                        // and generate a fresh descriptor anyway.
1578                        //
1579                        // Ideally, this shouldn't happen very often (if at all).
1580                        let Some(ipts) = ipt_set.ipts.as_mut() else {
1581                            return Err(PublishError::NoIpts);
1582                        };
1583
1584                        let hsdesc = {
1585                            trace!(
1586                                nickname=%imm.nickname, time_period=?time_period,
1587                                "building descriptor"
1588                            );
1589                            let mut rng = imm.mockable.thread_rng();
1590                            let mut key_rng = tor_llcrypto::rng::CautiousRng;
1591
1592                            // We're about to generate a new version of the descriptor,
1593                            // so let's generate a new revision counter.
1594                            let now = imm.runtime.wallclock();
1595                            let revision_counter = imm.generate_revision_counter(&params, now)?;
1596
1597                            build_sign(
1598                                &imm.keymgr,
1599                                &imm.pow_manager,
1600                                &config,
1601                                authorized_clients.as_deref(),
1602                                ipts,
1603                                time_period,
1604                                revision_counter,
1605                                &mut rng,
1606                                &mut key_rng,
1607                                imm.runtime.wallclock(),
1608                                max_hsdesc_len,
1609                            )?
1610                        };
1611
1612                        if let Err(e) =
1613                            ipt_set.note_publication_attempt(&imm.runtime, worst_case_end)
1614                        {
1615                            let wait = e.log_retry_max(&imm.nickname)?;
1616                            // TODO (#1226): retry instead of this
1617                            return Err(FatalError::Bug(internal!(
1618                                "ought to retry after {wait:?}, crashing instead"
1619                            ))
1620                            .into());
1621                        }
1622
1623                        hsdesc
1624                    };
1625
1626                    let VersionedDescriptor {
1627                        desc,
1628                        revision_counter,
1629                    } = hsdesc;
1630
1631                    trace!(
1632                        nickname=%imm.nickname, time_period=?time_period,
1633                        revision_counter=?revision_counter,
1634                        "generated new descriptor for time period",
1635                    );
1636
1637                    // (Actually launch the upload attempt. No timeout is needed
1638                    // here, since the backoff::Runner code will handle that for us.)
1639                    let upload_res: UploadResult = select_biased! {
1640                        shutdown = shutdown_rx.next().fuse() => {
1641                            // This will always be None, since Void is uninhabited.
1642                            let _: Option<Void> = shutdown;
1643
1644                            // It looks like the reactor has shut down,
1645                            // so there is no point in uploading the descriptor anymore.
1646                            //
1647                            // Let's shut down the upload task too.
1648                            trace!(
1649                                nickname=%imm.nickname, time_period=?time_period,
1650                                "upload task received shutdown signal"
1651                            );
1652
1653                            return Err(PublishError::Shutdown);
1654                        },
1655                        res = run_upload(desc.clone()).fuse() => res,
1656                    };
1657
1658                    // Note: UploadResult::Failure is only returned when
1659                    // upload_descriptor_with_retries fails, i.e. if all our retry
1660                    // attempts have failed
1661                    Ok(HsDirUploadStatus {
1662                        relay_ids,
1663                        upload_res,
1664                        revision_counter,
1665                    })
1666                }
1667            })
1668            // This fails to compile unless the stream is boxed. See https://github.com/rust-lang/rust/issues/104382
1669            .boxed()
1670            .buffer_unordered(MAX_CONCURRENT_UPLOADS)
1671            .try_collect::<Vec<_>>()
1672            .await;
1673
1674        let upload_results = match upload_results {
1675            Ok(v) => v,
1676            Err(PublishError::Fatal(e)) => return Err(e),
1677            Err(PublishError::NoIpts) => {
1678                debug!(
1679                    nickname=%imm.nickname, time_period=?time_period,
1680                     "no introduction points; skipping upload"
1681                );
1682
1683                return Ok(());
1684            }
1685            Err(PublishError::Shutdown) => {
1686                debug!(
1687                    nickname=%imm.nickname, time_period=?time_period,
1688                     "the reactor has shut down; aborting upload"
1689                );
1690
1691                return Ok(());
1692            }
1693        };
1694
1695        let (succeeded, _failed): (Vec<_>, Vec<_>) = upload_results
1696            .iter()
1697            .partition(|res| res.upload_res.is_ok());
1698
1699        debug!(
1700            nickname=%imm.nickname, time_period=?time_period,
1701            "descriptor uploaded successfully to {}/{} HSDirs",
1702            succeeded.len(), hsdir_count
1703        );
1704
1705        if upload_task_complete_tx
1706            .send(TimePeriodUploadResult {
1707                time_period,
1708                hsdir_result: upload_results,
1709            })
1710            .await
1711            .is_err()
1712        {
1713            return Err(internal!(
1714                "failed to notify reactor of upload completion (reactor shut down)"
1715            )
1716            .into());
1717        }
1718
1719        Ok(())
1720    }
1721
1722    /// Upload a descriptor to the specified HSDir.
1723    ///
1724    /// If an upload fails, this returns an `Err`. This function does not handle retries. It is up
1725    /// to the caller to retry on failure.
1726    ///
1727    /// This function does not handle timeouts.
1728    async fn upload_descriptor(
1729        hsdesc: String,
1730        netdir: &Arc<NetDir>,
1731        hsdir: &Relay<'_>,
1732        imm: Arc<Immutable<R, M>>,
1733    ) -> Result<(), UploadError> {
1734        let request = HsDescUploadRequest::new(hsdesc);
1735
1736        trace!(nickname=%imm.nickname, hsdir_id=%hsdir.id(), hsdir_rsa_id=%hsdir.rsa_id(),
1737            "starting descriptor upload",
1738        );
1739
1740        let tunnel = imm
1741            .mockable
1742            .get_or_launch_hs_dir(netdir, OwnedCircTarget::from_circ_target(hsdir))
1743            .await?;
1744        let source: Option<SourceInfo> = tunnel
1745            .source_info()
1746            .map_err(into_internal!("Couldn't get SourceInfo for circuit"))?;
1747
1748        let mut stream = tunnel
1749            .begin_dir_stream()
1750            .await
1751            .map_err(UploadError::Stream)?;
1752
1753        let _response: String = send_request(&imm.runtime, &request, &mut stream, source)
1754            .await
1755            .map_err(|dir_error| -> UploadError {
1756                match dir_error {
1757                    DirClientError::RequestFailed(e) => e.into(),
1758                    DirClientError::CircMgr(e) => into_internal!(
1759                        "tor-dirclient complains about circmgr going wrong but we gave it a stream"
1760                    )(e)
1761                    .into(),
1762                    e => into_internal!("unexpected error")(e).into(),
1763                }
1764            })?
1765            .into_output_string()?; // This returns an error if we received an error response
1766
1767        Ok(())
1768    }
1769
1770    /// Upload a descriptor to the specified HSDir, retrying if appropriate.
1771    ///
1772    /// Any failed uploads are retried according to a [`PublisherBackoffSchedule`].
1773    /// Each failed upload is retried until it succeeds, or until the overall timeout specified
1774    /// by [`BackoffSchedule::overall_timeout`] elapses. Individual attempts are timed out
1775    /// according to the [`BackoffSchedule::single_attempt_timeout`].
1776    /// This function gives up after the overall timeout elapses,
1777    /// declaring the upload a failure, and never retrying it again.
1778    ///
1779    /// See also [`BackoffSchedule`].
1780    async fn upload_descriptor_with_retries(
1781        hsdesc: String,
1782        netdir: &Arc<NetDir>,
1783        hsdir: &Relay<'_>,
1784        ed_id: &str,
1785        rsa_id: &str,
1786        imm: Arc<Immutable<R, M>>,
1787    ) -> UploadResult {
1788        /// The base delay to use for the backoff schedule.
1789        const BASE_DELAY_MSEC: u32 = 1000;
1790        let schedule = PublisherBackoffSchedule {
1791            retry_delay: RetryDelay::from_msec(BASE_DELAY_MSEC),
1792            mockable: imm.mockable.clone(),
1793        };
1794
1795        let runner = Runner::new(
1796            "upload a hidden service descriptor".into(),
1797            schedule.clone(),
1798            imm.runtime.clone(),
1799        );
1800
1801        let fallible_op = || async {
1802            let r = Self::upload_descriptor(hsdesc.clone(), netdir, hsdir, Arc::clone(&imm)).await;
1803
1804            if let Err(e) = &r {
1805                if e.should_report_as_suspicious() {
1806                    // Note that not every protocol violation is suspicious:
1807                    // we only warn on the protocol violations that look like attempts
1808                    // to do a traffic tagging attack via hsdir inflation.
1809                    // (See proposal 360.)
1810                    warn_report!(
1811                        e,
1812                        "Suspicious error while uploading descriptor to {}/{}",
1813                        ed_id,
1814                        rsa_id
1815                    );
1816                }
1817            }
1818            r
1819        };
1820
1821        let outcome: Result<(), BackoffError<UploadError>> = runner.run(fallible_op).await;
1822        match outcome {
1823            Ok(()) => {
1824                debug!(
1825                    nickname=%imm.nickname, hsdir_id=%ed_id, hsdir_rsa_id=%rsa_id,
1826                    "successfully uploaded descriptor to HSDir",
1827                );
1828
1829                Ok(())
1830            }
1831            Err(e) => {
1832                warn_report!(
1833                    e,
1834                    "failed to upload descriptor for service {} (hsdir_id={}, hsdir_rsa_id={})",
1835                    imm.nickname,
1836                    ed_id,
1837                    rsa_id
1838                );
1839
1840                Err(e.into())
1841            }
1842        }
1843    }
1844
1845    /// Stop publishing descriptors until the specified delay elapses.
1846    async fn start_rate_limit(&mut self, delay: Duration) -> Result<(), Bug> {
1847        if !matches!(self.status(), PublishStatus::RateLimited(_)) {
1848            debug!(
1849                "We are rate-limited for {}; pausing descriptor publication",
1850                humantime::format_duration(delay)
1851            );
1852            let until = self.imm.runtime.now() + delay;
1853            self.update_publish_status(PublishStatus::RateLimited(until))
1854                .await?;
1855        }
1856
1857        Ok(())
1858    }
1859
1860    /// Handle the upload rate-limit being lifted.
1861    async fn expire_rate_limit(&mut self) -> Result<(), Bug> {
1862        debug!("We are no longer rate-limited; resuming descriptor publication");
1863        self.update_publish_status(PublishStatus::UploadScheduled)
1864            .await?;
1865        Ok(())
1866    }
1867
1868    /// Return the authorized clients, if restricted mode is enabled.
1869    ///
1870    /// Returns `Ok(None)` if restricted discovery mode is disabled.
1871    ///
1872    /// Returns an error if restricted discovery mode is enabled, but the client list is empty.
1873    #[cfg_attr(
1874        not(feature = "restricted-discovery"),
1875        allow(clippy::unnecessary_wraps)
1876    )]
1877    fn authorized_clients(&self) -> Result<Option<Arc<RestrictedDiscoveryKeys>>, FatalError> {
1878        cfg_if::cfg_if! {
1879            if #[cfg(feature = "restricted-discovery")] {
1880                let authorized_clients = self
1881                    .inner
1882                    .lock()
1883                    .expect("poisoned lock")
1884                    .authorized_clients
1885                    .clone();
1886
1887                if authorized_clients.as_ref().as_ref().map(|v| v.is_empty()).unwrap_or_default() {
1888                    return Err(FatalError::RestrictedDiscoveryNoClients);
1889                }
1890
1891                Ok(authorized_clients)
1892            } else {
1893                Ok(None)
1894            }
1895        }
1896    }
1897}
1898
1899/// Try to expand a path, logging a warning on failure.
1900fn maybe_expand_path(p: &CfgPath, r: &CfgPathResolver) -> Option<PathBuf> {
1901    // map_err returns unit for clarity
1902    #[allow(clippy::unused_unit, clippy::semicolon_if_nothing_returned)]
1903    p.path(r)
1904        .map_err(|e| {
1905            tor_error::warn_report!(e, "invalid path");
1906            ()
1907        })
1908        .ok()
1909}
1910
/// Add `path` to the specified `watcher`.
///
/// Expands to `$watcher.$watch_fn(&$path, $($watch_fn_args),*)`, logging a
/// warning on failure and a debug message on success.
macro_rules! watch_path {
    ($watcher:expr, $path:expr, $watch_fn:ident, $($watch_fn_args:expr,)*) => {{
        // Note: the expansion must repeat the arguments *with* their separating
        // commas (`$($watch_fn_args,)*`). Repeating them without a separator
        // would expand to invalid syntax (`arg1 arg2`) for any invocation that
        // passes more than one extra argument.
        if let Err(e) = $watcher.$watch_fn(&$path, $($watch_fn_args,)*) {
            warn_report!(e, "failed to watch path {:?}", $path);
        } else {
            debug!("watching path {:?}", $path);
        }
    }}
}
1921
1922/// Add the specified directories to the watcher.
1923#[allow(clippy::cognitive_complexity)]
1924fn watch_dirs<R: Runtime>(
1925    watcher: &mut FileWatcherBuilder<R>,
1926    dirs: &DirectoryKeyProviderList,
1927    path_resolver: &CfgPathResolver,
1928) {
1929    for path in dirs {
1930        let path = path.path();
1931        let Some(path) = maybe_expand_path(path, path_resolver) else {
1932            warn!("failed to expand key_dir path {:?}", path);
1933            continue;
1934        };
1935
1936        // If the path doesn't exist, the notify watcher will return an error if we attempt to watch it,
1937        // so we skip over paths that don't exist at this time
1938        // (this obviously suffers from a TOCTOU race, but most of the time,
1939        // it is good enough at preventing the watcher from failing to watch.
1940        // If the race *does* happen it is not disastrous, i.e. the reactor won't crash,
1941        // but it will fail to set the watcher).
1942        if matches!(path.try_exists(), Ok(true)) {
1943            watch_path!(watcher, &path, watch_dir, "auth",);
1944        }
1945        // FileWatcher::watch_path causes the parent dir of the path to be watched.
1946        if matches!(path.parent().map(|p| p.try_exists()), Some(Ok(true))) {
1947            watch_path!(watcher, &path, watch_path,);
1948        }
1949    }
1950}
1951
1952/// Try to read the blinded identity key for a given `TimePeriod`.
1953///
1954/// Returns `None` if the service is running in "offline" mode.
1955///
1956// TODO (#1194): we don't currently have support for "offline" mode so this can never return
1957// `Ok(None)`.
1958pub(super) fn read_blind_id_keypair(
1959    keymgr: &Arc<KeyMgr>,
1960    nickname: &HsNickname,
1961    period: TimePeriod,
1962) -> Result<Option<HsBlindIdKeypair>, FatalError> {
1963    let svc_key_spec = HsIdKeypairSpecifier::new(nickname.clone());
1964    let hsid_kp = keymgr
1965        .get::<HsIdKeypair>(&svc_key_spec)?
1966        .ok_or_else(|| FatalError::MissingHsIdKeypair(nickname.clone()))?;
1967
1968    let blind_id_key_spec = BlindIdKeypairSpecifier::new(nickname.clone(), period);
1969
1970    // TODO: make the keystore selector configurable
1971    let keystore_selector = Default::default();
1972    match keymgr.get::<HsBlindIdKeypair>(&blind_id_key_spec)? {
1973        Some(kp) => Ok(Some(kp)),
1974        None => {
1975            let (_hs_blind_id_key, hs_blind_id_kp, _subcredential) = hsid_kp
1976                .compute_blinded_key(period)
1977                .map_err(|_| internal!("failed to compute blinded key"))?;
1978
1979            // Note: we can't use KeyMgr::generate because this key is derived from the HsId
1980            // (KeyMgr::generate uses the tor_keymgr::Keygen trait under the hood,
1981            // which assumes keys are randomly generated, rather than derived from existing keys).
1982
1983            keymgr.insert(hs_blind_id_kp, &blind_id_key_spec, keystore_selector, true)?;
1984
1985            let arti_path = |spec: &dyn KeySpecifier| {
1986                spec.arti_path()
1987                    .map_err(into_internal!("invalid key specifier?!"))
1988            };
1989
1990            Ok(Some(
1991                keymgr.get::<HsBlindIdKeypair>(&blind_id_key_spec)?.ok_or(
1992                    FatalError::KeystoreRace {
1993                        action: "read",
1994                        path: arti_path(&blind_id_key_spec)?,
1995                    },
1996                )?,
1997            ))
1998        }
1999    }
2000}
2001
2002/// Determine the [`State`] of the publisher based on the upload results
2003/// from the current `time_periods`.
2004fn upload_result_state(
2005    netdir: &NetDir,
2006    time_periods: &[TimePeriodContext],
2007) -> (State, Option<Problem>) {
2008    let current_period = netdir.hs_time_period();
2009    let current_period_res = time_periods
2010        .iter()
2011        .find(|ctx| ctx.params.time_period() == current_period);
2012
2013    let succeeded_current_tp = current_period_res
2014        .iter()
2015        .flat_map(|res| &res.upload_results)
2016        .filter(|res| res.upload_res.is_ok())
2017        .collect_vec();
2018
2019    let secondary_tp_res = time_periods
2020        .iter()
2021        .filter(|ctx| ctx.params.time_period() != current_period)
2022        .collect_vec();
2023
2024    let succeeded_secondary_tp = secondary_tp_res
2025        .iter()
2026        .flat_map(|res| &res.upload_results)
2027        .filter(|res| res.upload_res.is_ok())
2028        .collect_vec();
2029
2030    // All of the failed uploads (for all TPs)
2031    let failed = time_periods
2032        .iter()
2033        .flat_map(|res| &res.upload_results)
2034        .filter(|res| res.upload_res.is_err())
2035        .collect_vec();
2036    let problems: Vec<DescUploadRetryError> = failed
2037        .iter()
2038        .flat_map(|e| e.upload_res.as_ref().map_err(|e| e.clone()).err())
2039        .collect();
2040
2041    let err = match problems.as_slice() {
2042        [_, ..] => Some(problems.into()),
2043        [] => None,
2044    };
2045
2046    if time_periods.len() < 2 {
2047        // We need at least TP contexts (one for the primary TP,
2048        // and another for the secondary one).
2049        //
2050        // If either is missing, we are unreachable for some or all clients.
2051        return (State::DegradedUnreachable, err);
2052    }
2053
2054    let state = match (
2055        succeeded_current_tp.as_slice(),
2056        succeeded_secondary_tp.as_slice(),
2057    ) {
2058        (&[], &[..]) | (&[..], &[]) if failed.is_empty() => {
2059            // We don't have any upload results for one or both TPs.
2060            // We are still bootstrapping.
2061            State::Bootstrapping
2062        }
2063        (&[_, ..], &[_, ..]) if failed.is_empty() => {
2064            // We have uploaded the descriptor to one or more HsDirs from both
2065            // HsDir rings (primary and secondary), and none of the uploads failed.
2066            // We are fully reachable.
2067            State::Running
2068        }
2069        (&[_, ..], &[_, ..]) => {
2070            // We have uploaded the descriptor to one or more HsDirs from both
2071            // HsDir rings (primary and secondary), but some of the uploads failed.
2072            // We are reachable, but we failed to upload the descriptor to all the HsDirs
2073            // that were supposed to have it.
2074            State::DegradedReachable
2075        }
2076        (&[..], &[]) | (&[], &[..]) => {
2077            // We have either
2078            //   * uploaded the descriptor to some of the HsDirs from one of the rings,
2079            //   but haven't managed to upload it to any of the HsDirs on the other ring, or
2080            //   * all of the uploads failed
2081            //
2082            // Either way, we are definitely not reachable by all clients.
2083            State::DegradedUnreachable
2084        }
2085    };
2086
2087    (state, err)
2088}
2089
/// Whether the reactor should initiate an upload.
#[derive(Copy, Clone, Debug, Default, PartialEq)]
enum PublishStatus {
    /// We need to call upload_all.
    UploadScheduled,
    /// We are rate-limited until the specified [`Instant`].
    ///
    /// We have tried to schedule multiple uploads in a short time span,
    /// and we are rate-limited. We are waiting for a signal from the schedule_upload_tx
    /// channel to unblock us.
    RateLimited(Instant),
    /// We are idle and waiting for external events.
    ///
    /// We have enough information to build the descriptor, but since we have already called
    /// upload_all to upload it to all relevant HSDirs, there is nothing for us to do right now.
    Idle,
    /// We are waiting for the IPT manager to establish some introduction points.
    ///
    /// No descriptors will be published until the `PublishStatus` of the reactor is changed to
    /// `UploadScheduled`.
    #[default]
    AwaitingIpts,
}
2113
/// The backoff schedule for the task that publishes descriptors.
#[derive(Clone, Debug)]
struct PublisherBackoffSchedule<M: Mockable> {
    /// The delay strategy deciding how long to wait between retries.
    retry_delay: RetryDelay,
    /// The mockable reactor state, needed for obtaining an rng.
    mockable: M,
}
2122
impl<M: Mockable> BackoffSchedule for PublisherBackoffSchedule<M> {
    /// No cap on the retry count; retries are bounded by `overall_timeout` instead.
    fn max_retries(&self) -> Option<usize> {
        None
    }

    /// Give up retrying once `OVERALL_UPLOAD_TIMEOUT` has elapsed.
    fn overall_timeout(&self) -> Option<Duration> {
        Some(OVERALL_UPLOAD_TIMEOUT)
    }

    /// Time out each individual attempt using the mockable state's upload-timeout estimate.
    fn single_attempt_timeout(&self) -> Option<Duration> {
        Some(self.mockable.estimate_upload_timeout())
    }

    /// Pick the next delay before retrying; the error itself is not consulted.
    fn next_delay<E: RetriableError>(&mut self, _error: &E) -> Option<Duration> {
        Some(self.retry_delay.next_delay(&mut self.mockable.thread_rng()))
    }
}
2140
2141impl RetriableError for UploadError {
2142    fn should_retry(&self) -> bool {
2143        match self {
2144            UploadError::Request(_) | UploadError::Circuit(_) | UploadError::Stream(_) => true,
2145            UploadError::Bug(_) => false,
2146        }
2147    }
2148}
2149
/// The outcome of uploading a descriptor to the HSDirs from a particular time period.
#[derive(Debug, Clone)]
struct TimePeriodUploadResult {
    /// The time period the descriptor was uploaded for.
    time_period: TimePeriod,
    /// The upload results, one entry per HsDir we attempted to upload to.
    hsdir_result: Vec<HsDirUploadStatus>,
}
2158
/// The outcome of uploading a descriptor to a particular HsDir.
#[derive(Clone, Debug)]
struct HsDirUploadStatus {
    /// The identity of the HsDir we attempted to upload the descriptor to.
    relay_ids: RelayIds,
    /// The outcome of this attempt (`Ok` if the upload eventually succeeded).
    upload_res: UploadResult,
    /// The revision counter of the descriptor we tried to upload.
    revision_counter: RevisionCounter,
}
2169
/// The outcome of uploading a descriptor.
///
/// `Ok(())` on success, or a [`DescUploadRetryError`] if the upload
/// (including any retries) did not succeed.
type UploadResult = Result<(), DescUploadRetryError>;
2172
2173impl From<BackoffError<UploadError>> for DescUploadRetryError {
2174    fn from(e: BackoffError<UploadError>) -> Self {
2175        use BackoffError as BE;
2176        use DescUploadRetryError as DURE;
2177
2178        match e {
2179            BE::FatalError(e) => DURE::FatalError(e),
2180            BE::MaxRetryCountExceeded(e) => DURE::MaxRetryCountExceeded(e),
2181            BE::Timeout(e) => DURE::Timeout(e),
2182            BE::ExplicitStop(_) => {
2183                DURE::Bug(internal!("explicit stop in publisher backoff schedule?!"))
2184            }
2185        }
2186    }
2187}
2188
// NOTE: the rest of the publisher tests live in publish.rs
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use tor_netdir::testnet;

    /// Create a `TimePeriodContext` from the specified upload results.
    fn create_time_period_ctx(
        params: &HsDirParams,
        upload_results: Vec<HsDirUploadStatus>,
    ) -> TimePeriodContext {
        TimePeriodContext {
            params: params.clone(),
            hs_dirs: vec![],
            last_successful: None,
            upload_results,
        }
    }

    /// Create a single `HsDirUploadStatus` with the specified outcome.
    fn create_upload_status(upload_res: UploadResult) -> HsDirUploadStatus {
        HsDirUploadStatus {
            relay_ids: RelayIds::empty(),
            upload_res,
            revision_counter: RevisionCounter::from(13),
        }
    }

    /// Create a bunch of results, all with the specified `upload_res`.
    fn create_upload_results(upload_res: UploadResult) -> Vec<HsDirUploadStatus> {
        std::iter::repeat_with(|| create_upload_status(upload_res.clone()))
            .take(10)
            .collect()
    }

    /// Build a test `NetDir` with fixed shared random values.
    fn construct_netdir() -> NetDir {
        const SRV1: [u8; 32] = *b"The door refused to open.       ";
        const SRV2: [u8; 32] = *b"It said, 'Five cents, please.'  ";

        let dir = testnet::construct_custom_netdir(|_, _, bld| {
            bld.shared_rand_prev(7, SRV1.into(), None)
                .shared_rand_prev(7, SRV2.into(), None);
        })
        .unwrap();

        dir.unwrap_if_sufficient().unwrap()
    }

    /// If either the primary or secondary TP has no successful uploads yet,
    /// the service counts as still bootstrapping.
    #[test]
    fn upload_result_status_bootstrapping() {
        let netdir = construct_netdir();
        let all_params = netdir.hs_all_time_periods();
        let current_period = netdir.hs_time_period();
        let primary_params = all_params
            .iter()
            .find(|param| param.time_period() == current_period)
            .unwrap();
        // The secondary params don't change between iterations, so look them up once.
        let secondary_params = all_params
            .iter()
            .find(|param| param.time_period() != current_period)
            .unwrap();
        let results = [
            (vec![], vec![]),
            (vec![], create_upload_results(Ok(()))),
            (create_upload_results(Ok(())), vec![]),
        ];

        for (primary_result, secondary_result) in results {
            let primary_ctx = create_time_period_ctx(primary_params, primary_result);
            let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result);

            let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
            assert_eq!(status, State::Bootstrapping);
            assert!(err.is_none());
        }
    }

    /// Successful uploads for both the primary and secondary TPs mean the
    /// service is fully running.
    #[test]
    fn upload_result_status_running() {
        let netdir = construct_netdir();
        let all_params = netdir.hs_all_time_periods();
        let current_period = netdir.hs_time_period();
        let primary_params = all_params
            .iter()
            .find(|param| param.time_period() == current_period)
            .unwrap();

        let secondary_result = create_upload_results(Ok(()));
        let secondary_params = all_params
            .iter()
            .find(|param| param.time_period() != current_period)
            .unwrap();
        let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result);

        let primary_result = create_upload_results(Ok(()));
        let primary_ctx = create_time_period_ctx(primary_params, primary_result);
        let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
        assert_eq!(status, State::Running);
        assert!(err.is_none());
    }

    /// Some failed secondary uploads leave the service degraded but reachable.
    #[test]
    fn upload_result_status_reachable() {
        let netdir = construct_netdir();
        let all_params = netdir.hs_all_time_periods();
        let current_period = netdir.hs_time_period();
        let primary_params = all_params
            .iter()
            .find(|param| param.time_period() == current_period)
            .unwrap();

        let primary_result = create_upload_results(Ok(()));
        let primary_ctx = create_time_period_ctx(primary_params, primary_result);
        let failed_res = create_upload_results(Err(DescUploadRetryError::Bug(internal!("test"))));
        let secondary_result = create_upload_results(Ok(()))
            .into_iter()
            .chain(failed_res.iter().cloned())
            .collect();
        let secondary_params = all_params
            .iter()
            .find(|param| param.time_period() != current_period)
            .unwrap();
        let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result);
        let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);

        // Degraded but reachable (because some of the secondary HsDir uploads failed).
        assert_eq!(status, State::DegradedReachable);
        assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
    }

    /// With no successful uploads for the primary TP (or no secondary TP at
    /// all), the service is degraded and unreachable.
    #[test]
    fn upload_result_status_unreachable() {
        let netdir = construct_netdir();
        let all_params = netdir.hs_all_time_periods();
        let current_period = netdir.hs_time_period();
        let primary_params = all_params
            .iter()
            .find(|param| param.time_period() == current_period)
            .unwrap();
        let mut primary_result =
            create_upload_results(Err(DescUploadRetryError::Bug(internal!("test"))));
        let primary_ctx = create_time_period_ctx(primary_params, primary_result.clone());
        // No secondary TP (we are unreachable).
        let (status, err) = upload_result_state(&netdir, &[primary_ctx]);
        assert_eq!(status, State::DegradedUnreachable);
        assert!(matches!(err, Some(Problem::DescriptorUpload(_))));

        // Add a successful result
        primary_result.push(create_upload_status(Ok(())));
        let primary_ctx = create_time_period_ctx(primary_params, primary_result.clone());
        let (status, err) = upload_result_state(&netdir, &[primary_ctx]);
        // Still degraded, and unreachable (because we don't have a TimePeriodContext
        // for the secondary TP)
        assert_eq!(status, State::DegradedUnreachable);
        assert!(matches!(err, Some(Problem::DescriptorUpload(_))));

        // If we add another time period where none of the uploads were successful,
        // we're *still* unreachable
        let secondary_result =
            create_upload_results(Err(DescUploadRetryError::Bug(internal!("test"))));
        let secondary_params = all_params
            .iter()
            .find(|param| param.time_period() != current_period)
            .unwrap();
        let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result);
        let primary_ctx = create_time_period_ctx(primary_params, primary_result);
        let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
        assert_eq!(status, State::DegradedUnreachable);
        assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
    }
}
2373}