tor_hsservice/publish/reactor.rs
//! The onion service publisher reactor.
//!
//! Generates and publishes hidden service descriptors in response to various events.
//!
//! [`Reactor::run`] is the entry-point of the reactor. It starts the reactor,
//! and runs until [`Reactor::run_once`] returns [`ShutdownStatus::Terminate`]
//! or a fatal error occurs. `ShutdownStatus::Terminate` is returned if
//! any of the channels the reactor is receiving events from is closed
//! (i.e. when the senders are dropped).
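//!
//! In sketch form, `run` is a loop over [`Reactor::run_once`]
//! (this mirrors the body of [`Reactor::run`] below):
//!
//! ```ignore
//! loop {
//!     match self.run_once().await {
//!         Ok(ShutdownStatus::Continue) => continue,
//!         Ok(ShutdownStatus::Terminate) => return Ok(()),
//!         Err(e) => return Err(e),
//!     }
//! }
//! ```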
//!
//! ## Publisher status
//!
//! The publisher has an internal [`PublishStatus`], distinct from its [`State`],
//! which is used for onion service status reporting.
//!
//! The main loop of the reactor reads the current `PublishStatus` from `publish_status_rx`,
//! and responds by generating and publishing a new descriptor if needed.
//!
//! See [`PublishStatus`] and [`Reactor::publish_status_rx`] for more details.
//!
//! ## When do we publish?
//!
//! We generate and publish a new descriptor if
//! * the introduction points have changed
//! * the onion service configuration has changed in a meaningful way (for example,
//!   if the `restricted_discovery` configuration or its [`Anonymity`](crate::Anonymity)
//!   has changed. See [`OnionServiceConfigPublisherView`]).
//! * there is a new consensus
//! * it is time to republish the descriptor (after we upload a descriptor,
//!   we schedule it for republishing at a random time between 60 minutes and 120 minutes
//!   in the future)
//!
//! ## Onion service status
//!
//! With respect to [`OnionServiceStatus`] reporting,
//! the following state transitions are possible:
//!
//!
//! ```ignore
//!
//!              update_publish_status(UploadScheduled|AwaitingIpts|RateLimited)
//!                 +---------------------------------------+
//!                 |                                       |
//!                 |                                       v
//!                 |                               +---------------+
//!                 |                               | Bootstrapping |
//!                 |                               +---------------+
//!                 |                                       |
//!                 |                                       |          uploaded to at least
//!                 |  not enough HsDir uploads succeeded   |          some HsDirs from each ring
//!                 |             +-------------------------+---------------------+
//!                 |             |                         |                     |
//!                 |             |            all HsDir uploads succeeded        |
//!                 |             |                         |                     |
//!                 |             v                         v                     v
//!                 |  +---------------------+         +---------+     +---------------------+
//!                 |  | DegradedUnreachable |         | Running |     |  DegradedReachable  |
//!  +----------+   |  +---------------------+         +---------+     +---------------------+
//!  | Shutdown |-- |             |                         |                     |
//!  +----------+   |             |                         |                     |
//!                 |             |                         |                     |
//!                 |             |                         |                     |
//!                 |             +-------------------------+---------------------+
//!                 |                                       |  invalid authorized_clients
//!                 |                                       |  after handling config change
//!                 |                                       |
//!                 |                                       v
//!                 |     run_once() returns an error  +--------+
//!                 +--------------------------------->| Broken |
//!                                                    +--------+
//! ```
//!
//! We can also transition from `Broken`, `DegradedReachable`, or `DegradedUnreachable`
//! back to `Bootstrapping` (those transitions were omitted for brevity).

use tor_circmgr::ServiceOnionServiceDirTunnel;
use tor_config::file_watcher::{
    self, Event as FileEvent, FileEventReceiver, FileEventSender, FileWatcher, FileWatcherBuilder,
};
use tor_config_path::{CfgPath, CfgPathResolver};
use tor_dirclient::SourceInfo;
use tor_netdir::{DirEvent, NetDir};
use tracing::instrument;

use crate::config::OnionServiceConfigPublisherView;
use crate::config::restricted_discovery::{
    DirectoryKeyProviderList, RestrictedDiscoveryConfig, RestrictedDiscoveryKeys,
};
use crate::status::{DescUploadRetryError, Problem};

use super::*;
use derive_more::{From, Into};

// TODO-CLIENT-AUTH: perhaps we should add a separate CONFIG_CHANGE_REPUBLISH_DEBOUNCE_INTERVAL
// for rate-limiting the publish jobs triggered by a change in the config?
//
// Currently the descriptor publish tasks triggered by changes in the config
// are rate-limited via the usual rate limiting mechanism
// (which rate-limits the uploads for 1m).
//
// I think this is OK for now, but we might need to rethink this if it becomes problematic
// (for example, we might want an even longer rate-limit, or to reset any existing rate-limits
// each time the config is modified).

/// The upload rate-limiting threshold.
///
/// Before initiating an upload, the reactor checks if the last upload was at least
/// `UPLOAD_RATE_LIM_THRESHOLD` seconds ago. If so, it uploads the descriptor to all HsDirs that
/// need it. If not, it schedules the upload to happen `UPLOAD_RATE_LIM_THRESHOLD` seconds from the
/// current time.
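///
/// A minimal sketch of this check (hypothetical helper names; the real logic
/// lives in [`Reactor::upload_all`]):
///
/// ```ignore
/// let now = runtime.now();
/// match last_uploaded {
///     // Too soon since the last upload: defer until the threshold expires.
///     Some(ts) if now.duration_since(ts) < UPLOAD_RATE_LIM_THRESHOLD => {
///         schedule_upload_at(now + UPLOAD_RATE_LIM_THRESHOLD);
///     }
///     // Otherwise, upload to every HsDir that still needs the descriptor.
///     _ => upload_to_dirty_hsdirs(),
/// }
/// ```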
//
// TODO: We may someday need to tune this value; it was chosen more or less arbitrarily.
const UPLOAD_RATE_LIM_THRESHOLD: Duration = Duration::from_secs(60);

/// The maximum number of concurrent upload tasks per time period.
//
// TODO: this value was arbitrarily chosen and may not be optimal. For now, it
// will have no effect, since the current number of replicas is far less than
// this value.
//
// The uploads for all TPs happen in parallel. As a result, the actual limit for the maximum
// number of concurrent upload tasks is multiplied by a number which depends on the TP parameters
// (currently 2, which means the concurrency limit will, in fact, be 32).
//
// We should try to decouple this value from the TP parameters.
const MAX_CONCURRENT_UPLOADS: usize = 16;

/// The maximum time allowed for uploading a descriptor to a single HSDir,
/// across all attempts.
pub(crate) const OVERALL_UPLOAD_TIMEOUT: Duration = Duration::from_secs(5 * 60);

/// A reactor for the HsDir [`Publisher`]
///
/// The entrypoint is [`Reactor::run`].
#[must_use = "If you don't call run() on the reactor, it won't publish any descriptors."]
pub(super) struct Reactor<R: Runtime, M: Mockable> {
    /// The immutable, shared inner state.
    imm: Arc<Immutable<R, M>>,
    /// A source for new network directories that we use to determine
    /// our HsDirs.
    dir_provider: Arc<dyn NetDirProvider>,
    /// The mutable inner state.
    inner: Arc<Mutex<Inner>>,
    /// A channel for receiving IPT change notifications.
    ipt_watcher: IptsPublisherView,
    /// A channel for receiving onion service config change notifications.
    config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
    /// A channel for receiving restricted discovery key_dirs change notifications.
    key_dirs_rx: FileEventReceiver,
    /// A channel for sending restricted discovery key_dirs change notifications.
    ///
    /// A copy of this sender is handed out to every `FileWatcher` created.
    key_dirs_tx: FileEventSender,
    /// A channel for receiving updates regarding our [`PublishStatus`].
    ///
    /// The main loop of the reactor watches for updates on this channel.
    ///
    /// When the [`PublishStatus`] changes to [`UploadScheduled`](PublishStatus::UploadScheduled),
    /// we can start publishing descriptors.
    ///
    /// If the [`PublishStatus`] is [`AwaitingIpts`](PublishStatus::AwaitingIpts), publishing is
    /// paused until we receive a notification on `ipt_watcher` telling us the IPT manager has
    /// established some introduction points.
    publish_status_rx: watch::Receiver<PublishStatus>,
    /// A sender for updating our [`PublishStatus`].
    ///
    /// When our [`PublishStatus`] changes to [`UploadScheduled`](PublishStatus::UploadScheduled),
    /// we can start publishing descriptors.
    publish_status_tx: watch::Sender<PublishStatus>,
    /// A channel for sending upload completion notifications.
    ///
    /// This channel is polled in the main loop of the reactor.
    upload_task_complete_rx: mpsc::Receiver<TimePeriodUploadResult>,
    /// A channel for receiving upload completion notifications.
    ///
    /// A copy of this sender is handed to each upload task.
    upload_task_complete_tx: mpsc::Sender<TimePeriodUploadResult>,
    /// A sender for notifying any pending upload tasks that the reactor is shutting down.
    ///
    /// Receivers can use this channel to find out when the reactor is dropped.
    ///
    /// This is currently only used in [`upload_for_time_period`](Reactor::upload_for_time_period).
    /// Any future background tasks can also use this channel to detect if the reactor is dropped.
    ///
    /// Closing this channel will cause any pending upload tasks to be dropped.
    shutdown_tx: broadcast::Sender<Void>,
    /// Path resolver for configuration files.
    path_resolver: Arc<CfgPathResolver>,
    /// Queue on which we receive messages from the [`PowManager`] telling us that a seed has
    /// rotated and thus we need to republish the descriptor for a particular time period.
    update_from_pow_manager_rx: mpsc::Receiver<TimePeriod>,
}

/// The immutable, shared state of the descriptor publisher reactor.
#[derive(Clone)]
struct Immutable<R: Runtime, M: Mockable> {
    /// The runtime.
    runtime: R,
    /// Mockable state.
    ///
    /// This is used for launching circuits and for obtaining random number generators.
    mockable: M,
    /// The service for which we're publishing descriptors.
    nickname: HsNickname,
    /// The key manager.
    keymgr: Arc<KeyMgr>,
    /// A sender for updating the status of the onion service.
    status_tx: PublisherStatusSender,
    /// Proof-of-work state.
    pow_manager: Arc<PowManager<R>>,
}

impl<R: Runtime, M: Mockable> Immutable<R, M> {
    /// Create an [`AesOpeKey`] for generating revision counters for the descriptors associated
    /// with the specified [`TimePeriod`].
    ///
    /// If the onion service is not running in offline mode, the key of the returned `AesOpeKey` is
    /// the private part of the blinded identity key. Otherwise, the key is the private part of the
    /// descriptor signing key.
    ///
    /// Returns an error if the service is running in offline mode and the descriptor signing
    /// keypair of the specified `period` is not available.
    //
    // TODO (#1194): we don't support "offline" mode (yet), so this always returns an AesOpeKey
    // built from the blinded id key
    fn create_ope_key(&self, period: TimePeriod) -> Result<AesOpeKey, FatalError> {
        let ope_key = match read_blind_id_keypair(&self.keymgr, &self.nickname, period)? {
            Some(key) => {
                let key: ed25519::ExpandedKeypair = key.into();
                key.to_secret_key_bytes()[0..32]
                    .try_into()
                    .expect("Wrong length on slice")
            }
            None => {
                // TODO (#1194): we don't support externally provisioned keys (yet), so this branch
                // is unreachable (for now).
                let desc_sign_key_spec =
                    DescSigningKeypairSpecifier::new(self.nickname.clone(), period);
                let key: ed25519::Keypair = self
                    .keymgr
                    .get::<HsDescSigningKeypair>(&desc_sign_key_spec)?
                    // TODO (#1194): internal! is not the right type for this error (we need an
                    // error type for the case where a hidden service running in offline mode has
                    // run out of its pre-provisioned keys).
                    //
                    // This will be addressed when we add support for offline hs_id mode
                    .ok_or_else(|| {
                        internal!(
                            "identity keys are offline, but descriptor signing key is unavailable?!"
                        )
                    })?
                    .into();
                key.to_bytes()
            }
        };

        Ok(AesOpeKey::from_secret(&ope_key))
    }

    /// Generate a revision counter for a descriptor associated with the specified
    /// [`TimePeriod`].
    ///
    /// Returns a revision counter generated according to the [encrypted time in period] scheme.
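    ///
    /// In sketch form (mirroring the body of this function):
    ///
    /// ```ignore
    /// // `offset` is the time elapsed since the start of the current shared-random-value
    /// // (SRV) protocol run; encrypting it with an order-preserving encryption (OPE) key
    /// // yields a counter that increases monotonically within the period without
    /// // leaking the wallclock time at which the descriptor was generated.
    /// let offset = params.offset_within_srv_period(now)?;
    /// let rev = ope_key.encrypt(offset);
    /// RevisionCounter::from(rev)
    /// ```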
    ///
    /// [encrypted time in period]: https://spec.torproject.org/rend-spec/revision-counter-mgt.html#encrypted-time
    fn generate_revision_counter(
        &self,
        params: &HsDirParams,
        now: SystemTime,
    ) -> Result<RevisionCounter, FatalError> {
        // TODO: in the future, we might want to compute ope_key once per time period (as opposed
        // to each time we generate a new descriptor), for performance reasons.
        let ope_key = self.create_ope_key(params.time_period())?;

        // TODO: perhaps this should be moved to a new HsDirParams::offset_within_sr() function
        let srv_start = params.start_of_shard_rand_period();
        let offset = params.offset_within_srv_period(now).ok_or_else(|| {
            internal!(
                "current wallclock time not within SRV range?! (now={:?}, SRV_start={:?})",
                now,
                srv_start
            )
        })?;
        let rev = ope_key.encrypt(offset);

        Ok(RevisionCounter::from(rev))
    }
}

/// Mockable state for the descriptor publisher reactor.
///
/// This enables us to mock parts of the [`Reactor`] for testing purposes.
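///
/// A test double might look like this (a hedged sketch: `FakeCircPool` and
/// `FakeTunnel` are hypothetical test types, not part of this module):
///
/// ```ignore
/// #[derive(Clone)]
/// struct MockReactorState(Arc<FakeCircPool>);
///
/// #[async_trait]
/// impl Mockable for MockReactorState {
///     type Rng = rand::rngs::ThreadRng;
///     type Tunnel = FakeTunnel;
///
///     fn thread_rng(&self) -> Self::Rng {
///         rand::rng()
///     }
///
///     async fn get_or_launch_hs_dir<T>(
///         &self,
///         _netdir: &NetDir,
///         _target: T,
///     ) -> Result<Self::Tunnel, tor_circmgr::Error>
///     where
///         T: CircTarget + Send + Sync,
///     {
///         // Hand out a canned tunnel instead of building a real circuit.
///         Ok(self.0.fake_tunnel())
///     }
///
///     fn estimate_upload_timeout(&self) -> Duration {
///         Duration::from_secs(30)
///     }
/// }
/// ```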
#[async_trait]
pub(crate) trait Mockable: Clone + Send + Sync + Sized + 'static {
    /// The type of random number generator.
    type Rng: rand::Rng + rand::CryptoRng;

    /// The type of client circuit.
    type Tunnel: MockableDirTunnel;

    /// Return a random number generator.
    fn thread_rng(&self) -> Self::Rng;

    /// Create or reuse a circuit to the HsDir `target`.
    async fn get_or_launch_hs_dir<T>(
        &self,
        netdir: &NetDir,
        target: T,
    ) -> Result<Self::Tunnel, tor_circmgr::Error>
    where
        T: CircTarget + Send + Sync;

    /// Return an estimate-based value for how long we should allow a single
    /// directory upload operation to complete.
    ///
    /// Includes circuit construction, stream opening, upload, and waiting for a
    /// response.
    fn estimate_upload_timeout(&self) -> Duration;
}

/// Mockable client circuit
#[async_trait]
pub(crate) trait MockableDirTunnel: Send + Sync {
    /// The data stream type.
    type DataStream: AsyncRead + AsyncWrite + Send + Unpin;

    /// Start a new stream to the last relay in the circuit, using
    /// a BEGIN_DIR cell.
    async fn begin_dir_stream(&self) -> Result<Self::DataStream, tor_circmgr::Error>;

    /// Try to get a SourceInfo for this circuit, for using it in a directory request.
    fn source_info(&self) -> tor_proto::Result<Option<SourceInfo>>;
}

#[async_trait]
impl MockableDirTunnel for ServiceOnionServiceDirTunnel {
    type DataStream = tor_proto::client::stream::DataStream;

    async fn begin_dir_stream(&self) -> Result<Self::DataStream, tor_circmgr::Error> {
        Self::begin_dir_stream(self).await
    }

    fn source_info(&self) -> tor_proto::Result<Option<SourceInfo>> {
        SourceInfo::from_tunnel(self)
    }
}

/// The real version of the mockable state of the reactor.
#[derive(Clone, From, Into)]
pub(crate) struct Real<R: Runtime>(Arc<HsCircPool<R>>);

#[async_trait]
impl<R: Runtime> Mockable for Real<R> {
    type Rng = rand::rngs::ThreadRng;
    type Tunnel = ServiceOnionServiceDirTunnel;

    fn thread_rng(&self) -> Self::Rng {
        rand::rng()
    }

    #[instrument(level = "trace", skip_all)]
    async fn get_or_launch_hs_dir<T>(
        &self,
        netdir: &NetDir,
        target: T,
    ) -> Result<Self::Tunnel, tor_circmgr::Error>
    where
        T: CircTarget + Send + Sync,
    {
        self.0.get_or_launch_svc_dir(netdir, target).await
    }

    fn estimate_upload_timeout(&self) -> Duration {
        use tor_circmgr::timeouts::Action;
        let est_build = self.0.estimate_timeout(&Action::BuildCircuit { length: 4 });
        let est_roundtrip = self.0.estimate_timeout(&Action::RoundTrip { length: 4 });
        // We assume that in the worst case we'll have to wait for an entire
        // circuit construction and two round-trips to the hsdir.
        let est_total = est_build + est_roundtrip * 2;
        // We always allow _at least_ this much time, in case our estimate is
        // ridiculously low.
        let min_timeout = Duration::from_secs(30);
        max(est_total, min_timeout)
    }
}

/// The mutable state of a [`Reactor`].
struct Inner {
    /// The onion service config.
    config: Arc<OnionServiceConfigPublisherView>,
    /// Watcher for key_dirs.
    ///
    /// Set to `None` if the reactor is not running, or if `watch_configuration` is false.
    ///
    /// The watcher is recreated whenever the `restricted_discovery.key_dirs` change.
    file_watcher: Option<FileWatcher>,
    /// The relevant time periods.
    ///
    /// This includes the current time period, as well as any other time periods we need to be
    /// publishing descriptors for.
    ///
    /// This is empty until we fetch our first netdir in [`Reactor::run`].
    time_periods: Vec<TimePeriodContext>,
    /// Our most up to date netdir.
    ///
    /// This is initialized in [`Reactor::run`].
    netdir: Option<Arc<NetDir>>,
    /// The timestamp of our last upload.
    ///
    /// This is the time when the last update was _initiated_ (rather than completed), to prevent
    /// the publisher from spawning multiple upload tasks at once in response to multiple external
    /// events happening in quick succession, such as the IPT manager sending multiple IPT change
    /// notifications in a short time frame (#1142), or an IPT change notification that's
    /// immediately followed by a consensus change. Starting two upload tasks at once is not only
    /// inefficient, but it also causes the publisher to generate two different descriptors with
    /// the same revision counter (the revision counter is derived from the current timestamp),
    /// which ultimately causes the slower upload task to fail (see #1142).
    ///
    /// Note: This is only used for deciding when to reschedule a rate-limited upload. It is _not_
    /// used for retrying failed uploads (these are handled internally by
    /// [`Reactor::upload_descriptor_with_retries`]).
    last_uploaded: Option<Instant>,
    /// A max-heap containing the time periods for which we need to reupload the descriptor.
    // TODO: we are currently reuploading more than necessary.
    // Ideally, this shouldn't contain duplicate TimePeriods,
    // because we only need to retain the latest reupload time for each time period.
    //
    // Currently, if, for some reason, we upload the descriptor multiple times for the same TP,
    // we will end up with multiple ReuploadTimer entries for that TP,
    // each of which will (eventually) result in a reupload.
    //
    // TODO: maybe this should just be a HashMap<TimePeriod, Instant>
    //
    // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1971#note_2994950
    reupload_timers: BinaryHeap<ReuploadTimer>,
    /// The restricted discovery authorized clients.
    ///
    /// `None`, unless the service is running in restricted discovery mode.
    authorized_clients: Option<Arc<RestrictedDiscoveryKeys>>,
}

/// The part of the reactor state that changes with every time period.
struct TimePeriodContext {
    /// The HsDir params.
    params: HsDirParams,
    /// The HsDirs to use in this time period.
    ///
    // We keep a list of `RelayIds` because we can't store a `Relay<'_>` inside the reactor
    // (the lifetime of a relay is tied to the lifetime of its corresponding `NetDir`. To
    // store `Relay<'_>`s in the reactor, we'd need a way of atomically swapping out both the
    // `NetDir` and the cached relays, and to convince Rust what we're doing is sound)
    hs_dirs: Vec<(RelayIds, DescriptorStatus)>,
    /// The revision counter of the last successful upload, if any.
    last_successful: Option<RevisionCounter>,
    /// The outcome of the last upload, if any.
    upload_results: Vec<HsDirUploadStatus>,
}

impl TimePeriodContext {
    /// Create a new `TimePeriodContext`.
    ///
    /// Any of the specified `old_hsdirs` also present in the new list of HsDirs
    /// (returned by `NetDir::hs_dirs_upload`) will have their `DescriptorStatus` preserved.
    fn new<'r>(
        params: HsDirParams,
        blind_id: HsBlindId,
        netdir: &Arc<NetDir>,
        old_hsdirs: impl Iterator<Item = &'r (RelayIds, DescriptorStatus)>,
        old_upload_results: Vec<HsDirUploadStatus>,
    ) -> Result<Self, FatalError> {
        let period = params.time_period();
        let hs_dirs = Self::compute_hsdirs(period, blind_id, netdir, old_hsdirs)?;
        let upload_results = old_upload_results
            .into_iter()
            .filter(|res|
                // Check if the HsDir of this result still exists
                hs_dirs
                    .iter()
                    .any(|(relay_ids, _status)| relay_ids == &res.relay_ids))
            .collect();

        Ok(Self {
            params,
            hs_dirs,
            last_successful: None,
            upload_results,
        })
    }

    /// Recompute the HsDirs for this time period.
    fn compute_hsdirs<'r>(
        period: TimePeriod,
        blind_id: HsBlindId,
        netdir: &Arc<NetDir>,
        mut old_hsdirs: impl Iterator<Item = &'r (RelayIds, DescriptorStatus)>,
    ) -> Result<Vec<(RelayIds, DescriptorStatus)>, FatalError> {
        let hs_dirs = netdir.hs_dirs_upload(blind_id, period)?;

        Ok(hs_dirs
            .map(|hs_dir| {
                let mut builder = RelayIds::builder();
                if let Some(ed_id) = hs_dir.ed_identity() {
                    builder.ed_identity(*ed_id);
                }

                if let Some(rsa_id) = hs_dir.rsa_identity() {
                    builder.rsa_identity(*rsa_id);
                }

                let relay_id = builder.build().unwrap_or_else(|_| RelayIds::empty());

                // Have we uploaded the descriptor to this relay before? If so, we don't need to
                // reupload it unless it was already dirty and due for a reupload.
                let status = match old_hsdirs.find(|(id, _)| *id == relay_id) {
                    Some((_, status)) => *status,
                    None => DescriptorStatus::Dirty,
                };

                (relay_id, status)
            })
            .collect::<Vec<_>>())
    }

    /// Mark the descriptor dirty for all HSDirs of this time period.
    fn mark_all_dirty(&mut self) {
        self.hs_dirs
            .iter_mut()
            .for_each(|(_relay_id, status)| *status = DescriptorStatus::Dirty);
    }

    /// Update the upload result for this time period.
    fn set_upload_results(&mut self, upload_results: Vec<HsDirUploadStatus>) {
        self.upload_results = upload_results;
    }
}

/// An error that occurs while trying to upload a descriptor.
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum UploadError {
    /// An error that has occurred after we have contacted a directory cache and made a circuit to it.
    #[error("descriptor upload request failed: {}", _0.error)]
    Request(#[from] RequestFailedError),

    /// Failed to establish circuit to hidden service directory
    #[error("could not build circuit to HsDir")]
    Circuit(#[from] tor_circmgr::Error),

    /// Failed to establish stream to hidden service directory
    #[error("failed to establish directory stream to HsDir")]
    Stream(#[source] tor_circmgr::Error),

    /// An internal error.
    #[error("Internal error")]
    Bug(#[from] tor_error::Bug),
}
define_asref_dyn_std_error!(UploadError);

impl UploadError {
    /// Return true if this error is one that we should report as a suspicious event,
    /// along with the dirserver, and description of the relevant document.
    pub(crate) fn should_report_as_suspicious(&self) -> bool {
        match self {
            UploadError::Request(e) => e.error.should_report_as_suspicious_if_anon(),
            UploadError::Circuit(_) => false, // TODO prop360
            UploadError::Stream(_) => false,  // TODO prop360
            UploadError::Bug(_) => false,
        }
    }
}

impl<R: Runtime, M: Mockable> Reactor<R, M> {
    /// Create a new `Reactor`.
    #[allow(clippy::too_many_arguments)]
    pub(super) fn new(
        runtime: R,
        nickname: HsNickname,
        dir_provider: Arc<dyn NetDirProvider>,
        mockable: M,
        config: &OnionServiceConfig,
        ipt_watcher: IptsPublisherView,
        config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
        status_tx: PublisherStatusSender,
        keymgr: Arc<KeyMgr>,
        path_resolver: Arc<CfgPathResolver>,
        pow_manager: Arc<PowManager<R>>,
        update_from_pow_manager_rx: mpsc::Receiver<TimePeriod>,
    ) -> Self {
        /// The maximum size of the upload completion notifier channel.
        ///
        /// The channel we use this for is a futures::mpsc channel, which has a capacity of
        /// `UPLOAD_CHAN_BUF_SIZE + num-senders`. We don't need the buffer size to be non-zero, as
        /// each sender will send exactly one message.
        const UPLOAD_CHAN_BUF_SIZE: usize = 0;

        // Internally-generated instructions, no need for mq.
        let (upload_task_complete_tx, upload_task_complete_rx) =
            mpsc_channel_no_memquota(UPLOAD_CHAN_BUF_SIZE);

        let (publish_status_tx, publish_status_rx) = watch::channel();
        // Setting the buffer size to zero here is OK,
        // since we never actually send anything on this channel.
        let (shutdown_tx, _shutdown_rx) = broadcast::channel(0);

        let authorized_clients =
            Self::read_authorized_clients(&config.restricted_discovery, &path_resolver);

        // Create a channel for watching for changes in the configured
        // restricted_discovery.key_dirs.
        let (key_dirs_tx, key_dirs_rx) = file_watcher::channel();

        let imm = Immutable {
            runtime,
            mockable,
            nickname,
            keymgr,
            status_tx,
            pow_manager,
        };

        let inner = Inner {
            time_periods: vec![],
            config: Arc::new(config.into()),
            file_watcher: None,
            netdir: None,
            last_uploaded: None,
            reupload_timers: Default::default(),
            authorized_clients,
        };

        Self {
            imm: Arc::new(imm),
            inner: Arc::new(Mutex::new(inner)),
            dir_provider,
            ipt_watcher,
            config_rx,
            key_dirs_rx,
            key_dirs_tx,
            publish_status_rx,
            publish_status_tx,
            upload_task_complete_rx,
            upload_task_complete_tx,
            shutdown_tx,
            path_resolver,
            update_from_pow_manager_rx,
        }
    }

    /// Start the reactor.
    ///
    /// Under normal circumstances, this function runs indefinitely.
    ///
    /// Note: this also spawns the "reminder task" that we use to reschedule uploads whenever an
    /// upload fails or is rate-limited.
    pub(super) async fn run(mut self) -> Result<(), FatalError> {
        debug!(nickname=%self.imm.nickname, "starting descriptor publisher reactor");

        {
            let netdir = self
                .dir_provider
                .wait_for_netdir(Timeliness::Timely)
                .await?;
            let time_periods = self.compute_time_periods(&netdir, &[])?;

            let mut inner = self.inner.lock().expect("poisoned lock");

            inner.netdir = Some(netdir);
            inner.time_periods = time_periods;
        }

        // Create the initial key_dirs watcher.
        self.update_file_watcher();

        loop {
            match self.run_once().await {
                Ok(ShutdownStatus::Continue) => continue,
                Ok(ShutdownStatus::Terminate) => {
                    debug!(nickname=%self.imm.nickname, "descriptor publisher is shutting down!");

                    self.imm.status_tx.send_shutdown();
                    return Ok(());
                }
                Err(e) => {
                    error_report!(
                        e,
                        "HS service {}: descriptor publisher crashed!",
                        self.imm.nickname
                    );

                    self.imm.status_tx.send_broken(e.clone());

                    return Err(e);
                }
            }
        }
    }

    /// Run one iteration of the reactor loop.
    #[allow(clippy::cognitive_complexity)] // TODO: Refactor
    async fn run_once(&mut self) -> Result<ShutdownStatus, FatalError> {
        let mut netdir_events = self.dir_provider.events();

        // Note: TrackingNow tracks the values it is compared with.
        // This is equivalent to sleeping for (until - now) units of time.
        let upload_rate_lim: TrackingNow = TrackingNow::now(&self.imm.runtime);
        if let PublishStatus::RateLimited(until) = self.status() {
            if upload_rate_lim > until {
                // We are no longer rate-limited
                self.expire_rate_limit().await?;
            }
        }

        let reupload_tracking = TrackingNow::now(&self.imm.runtime);
        let mut reupload_periods = vec![];
        {
            let mut inner = self.inner.lock().expect("poisoned lock");
            let inner = &mut *inner;
            while let Some(reupload) = inner.reupload_timers.peek().copied() {
                // First, extract all the timeouts that already elapsed.
                if reupload.when <= reupload_tracking {
                    inner.reupload_timers.pop();
                    reupload_periods.push(reupload.period);
                } else {
                    // We are not ready to schedule any more reuploads.
                    //
                    // How much we need to sleep is implicitly
                    // tracked in reupload_tracking (through
                    // the TrackingNow implementation)
                    break;
                }
            }
        }

        // Check if it's time to schedule any reuploads.
        for period in reupload_periods {
            if self.mark_dirty(&period) {
                debug!(
                    time_period=?period,
                    "descriptor reupload timer elapsed; scheduling reupload",
                );
                self.update_publish_status_unless_rate_lim(PublishStatus::UploadScheduled)
                    .await?;
            }
        }

        select_biased! {
            res = self.upload_task_complete_rx.next().fuse() => {
                let Some(upload_res) = res else {
                    return Ok(ShutdownStatus::Terminate);
                };

                self.handle_upload_results(upload_res);
                self.upload_result_to_svc_status()?;
            },
            () = upload_rate_lim.wait_for_earliest(&self.imm.runtime).fuse() => {
                self.expire_rate_limit().await?;
            },
            () = reupload_tracking.wait_for_earliest(&self.imm.runtime).fuse() => {
                // Run another iteration, executing run_once again. This time, we will remove the
                // expired reupload from self.reupload_timers, mark the descriptor dirty for all
                // relevant HsDirs, and schedule the upload by setting our status to
                // UploadScheduled.
                return Ok(ShutdownStatus::Continue);
            },
            netdir_event = netdir_events.next().fuse() => {
                let Some(netdir_event) = netdir_event else {
                    debug!("netdir event stream ended");
                    return Ok(ShutdownStatus::Terminate);
                };

                if !matches!(netdir_event, DirEvent::NewConsensus) {
                    return Ok(ShutdownStatus::Continue);
                };

                // The consensus changed. Grab a new NetDir.
                let netdir = match self.dir_provider.netdir(Timeliness::Timely) {
                    Ok(y) => y,
                    Err(e) => {
                        error_report!(e, "HS service {}: netdir unavailable. Retrying...", self.imm.nickname);
                        // Hopefully a netdir will appear in the future.
                        // in the meantime, suspend operations.
                        //
                        // TODO (#1218): there is a bug here: we stop reading on our inputs
                        // including eg publish_status_rx, but it is our job to log some of
                        // these things. While we are waiting for a netdir, all those messages
                        // are "stuck"; they'll appear later, with misleading timestamps.
                        //
                        // Probably this should be fixed by moving the logging
                        // out of the reactor, where it won't be blocked.
                        self.dir_provider.wait_for_netdir(Timeliness::Timely)
                            .await?
                    }
                };
                let relevant_periods = netdir.hs_all_time_periods();
                self.handle_consensus_change(netdir).await?;
                expire_publisher_keys(
                    &self.imm.keymgr,
                    &self.imm.nickname,
                    &relevant_periods,
                ).unwrap_or_else(|e| {
                    error_report!(e, "failed to remove expired keys");
                });
            }
            update = self.ipt_watcher.await_update().fuse() => {
                if self.handle_ipt_change(update).await? == ShutdownStatus::Terminate {
                    return Ok(ShutdownStatus::Terminate);
                }
            },
            config = self.config_rx.next().fuse() => {
                let Some(config) = config else {
                    return Ok(ShutdownStatus::Terminate);
                };

                self.handle_svc_config_change(&config).await?;
            },
            res = self.key_dirs_rx.next().fuse() => {
                let Some(event) = res else {
                    return Ok(ShutdownStatus::Terminate);
                };

                while let Some(_ignore) = self.key_dirs_rx.try_recv() {
                    // Discard other events, so that we only reload once.
                }

                self.handle_key_dirs_change(event).await?;
            }
            should_upload = self.publish_status_rx.next().fuse() => {
                let Some(should_upload) = should_upload else {
                    return Ok(ShutdownStatus::Terminate);
                };

                // Our PublishStatus changed -- are we ready to publish?
                if should_upload == PublishStatus::UploadScheduled {
                    self.update_publish_status_unless_waiting(PublishStatus::Idle).await?;
                    self.upload_all().await?;
                }
            }
            update_tp_pow_seed = self.update_from_pow_manager_rx.next().fuse() => {
                debug!("Update PoW seed for TP!");
                let Some(time_period) = update_tp_pow_seed else {
                    return Ok(ShutdownStatus::Terminate);
                };
                self.mark_dirty(&time_period);
                self.upload_all().await?;
            }
        }

        Ok(ShutdownStatus::Continue)
    }

    /// Returns the current status of the publisher
    fn status(&self) -> PublishStatus {
        *self.publish_status_rx.borrow()
    }

    /// Handle a batch of upload outcomes,
    /// possibly updating the status of the descriptor for the corresponding HSDirs.
    fn handle_upload_results(&self, results: TimePeriodUploadResult) {
        let mut inner = self.inner.lock().expect("poisoned lock");
        let inner = &mut *inner;

        // Check which time period these uploads pertain to.
        let period = inner
            .time_periods
            .iter_mut()
            .find(|ctx| ctx.params.time_period() == results.time_period);

        let Some(period) = period else {
            // The uploads were for a time period that is no longer relevant, so we
            // can ignore the result.
            return;
        };

        // We will need to reupload this descriptor at some point, so we pick
        // a random time between 60 minutes and 120 minutes in the future.
        //
        // See https://spec.torproject.org/rend-spec/deriving-keys.html#WHEN-HSDESC
        let mut rng = self.imm.mockable.thread_rng();
        // TODO SPEC: Control republish period using a consensus parameter?
        let minutes = rng.gen_range_checked(60..=120).expect("low > high?!");
        let duration = Duration::from_secs(minutes * 60);
        let reupload_when = self.imm.runtime.now() + duration;
        let time_period = period.params.time_period();

        info!(
            time_period=?time_period,
            "reuploading descriptor in {}",
            humantime::format_duration(duration),
        );

        inner.reupload_timers.push(ReuploadTimer {
            period: time_period,
            when: reupload_when,
        });

        let mut upload_results = vec![];
        for upload_res in results.hsdir_result {
            let relay = period
                .hs_dirs
                .iter_mut()
                .find(|(relay_ids, _status)| relay_ids == &upload_res.relay_ids);

            let Some((_relay, status)): Option<&mut (RelayIds, _)> = relay else {
                // This HSDir went away, so the result doesn't matter.
                // Continue processing the rest of the results
                continue;
            };

            if upload_res.upload_res.is_ok() {
                let update_last_successful = match period.last_successful {
                    None => true,
                    Some(counter) => counter <= upload_res.revision_counter,
                };

                if update_last_successful {
                    period.last_successful = Some(upload_res.revision_counter);
                    // TODO (#1098): Is it possible that this won't update the statuses promptly
                    // enough? For example, it's possible for the reactor to see a Dirty descriptor
                    // and start an upload task for a descriptor that has already been uploaded (or is
                    // being uploaded) in another task, but whose upload results have not yet been
                    // processed.
                    //
                    // This is probably made worse by the fact that the statuses are updated in
                    // batches (grouped by time period), rather than one by one as the upload tasks
                    // complete (updating the status involves locking the inner mutex, and I wanted
                    // to minimize the locking/unlocking overheads). I'm not sure handling the
                    // updates in batches was the correct decision here.
                    *status = DescriptorStatus::Clean;
                }
            }

            upload_results.push(upload_res);
        }

        period.set_upload_results(upload_results);
    }

    /// Maybe update our list of HsDirs.
    async fn handle_consensus_change(&mut self, netdir: Arc<NetDir>) -> Result<(), FatalError> {
        trace!("the consensus has changed; recomputing HSDirs");

        let _old: Option<Arc<NetDir>> = self.replace_netdir(netdir);

        self.recompute_hs_dirs()?;
        self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
            .await?;

        // If the time period has changed, some of our upload results may now be irrelevant,
        // so we might need to update our status (for example, if our uploads are
        // for a no-longer-relevant time period, it means we might be able to update
        // our status from "degraded" to "running").
        self.upload_result_to_svc_status()?;

        Ok(())
    }

    /// Recompute the HsDirs for all relevant time periods.
    fn recompute_hs_dirs(&self) -> Result<(), FatalError> {
        let mut inner = self.inner.lock().expect("poisoned lock");
        let inner = &mut *inner;

        let netdir = Arc::clone(
            inner
                .netdir
                .as_ref()
                .ok_or_else(|| internal!("started upload task without a netdir"))?,
        );

        // Update our list of relevant time periods.
        let new_time_periods = self.compute_time_periods(&netdir, &inner.time_periods)?;
        inner.time_periods = new_time_periods;

        Ok(())
    }

    /// Compute the [`TimePeriodContext`]s for the time periods from the specified [`NetDir`].
    ///
    /// The specified `time_periods` are used to preserve the `DescriptorStatus` of the
    /// HsDirs where possible.
    fn compute_time_periods(
        &self,
        netdir: &Arc<NetDir>,
        time_periods: &[TimePeriodContext],
    ) -> Result<Vec<TimePeriodContext>, FatalError> {
        netdir
            .hs_all_time_periods()
            .iter()
            .map(|params| {
                let period = params.time_period();
                let blind_id_kp =
                    read_blind_id_keypair(&self.imm.keymgr, &self.imm.nickname, period)?
                        // Note: for now, read_blind_id_keypair cannot return Ok(None).
                        // It's supposed to return Ok(None) if we're in offline hsid mode,
                        // but that might change when we do #1194
                        .ok_or_else(|| internal!("offline hsid mode not supported"))?;

                let blind_id: HsBlindIdKey = (&blind_id_kp).into();

                // If our previous `TimePeriodContext`s also had an entry for `period`, we need to
                // preserve the `DescriptorStatus` of its HsDirs. This helps prevent unnecessarily
                // publishing the descriptor to the HsDirs that already have it (the ones that are
                // marked with DescriptorStatus::Clean).
                //
                // In other words, we only want to publish to those HsDirs that
                //   * are part of a new time period (which we have never published the descriptor
                //     for), or
                //   * have just been added to the ring of a time period we already knew about
                if let Some(ctx) = time_periods
                    .iter()
                    .find(|ctx| ctx.params.time_period() == period)
                {
                    TimePeriodContext::new(
                        params.clone(),
                        blind_id.into(),
                        netdir,
                        ctx.hs_dirs.iter(),
                        ctx.upload_results.clone(),
                    )
                } else {
                    // Passing an empty iterator here means all HsDirs in this TimePeriodContext
                    // will be marked as dirty, meaning we will need to upload our descriptor to them.
                    TimePeriodContext::new(
                        params.clone(),
                        blind_id.into(),
                        netdir,
                        iter::empty(),
                        vec![],
                    )
                }
            })
            .collect::<Result<Vec<TimePeriodContext>, FatalError>>()
    }

    /// Replace the old netdir with the new, returning the old.
    fn replace_netdir(&self, new_netdir: Arc<NetDir>) -> Option<Arc<NetDir>> {
        self.inner
            .lock()
            .expect("poisoned lock")
            .netdir
            .replace(new_netdir)
    }

    /// Replace our view of the service config with `new_config` if `new_config` contains changes
    /// that would cause us to generate a new descriptor.
    fn replace_config_if_changed(&self, new_config: Arc<OnionServiceConfigPublisherView>) -> bool {
        let mut inner = self.inner.lock().expect("poisoned lock");
        let old_config = &mut inner.config;

        // The fields we're interested in haven't changed, so there's no need to update
        // `inner.config`.
        if *old_config == new_config {
            return false;
        }

        let log_change = match (
            old_config.restricted_discovery.enabled,
            new_config.restricted_discovery.enabled,
        ) {
            (true, false) => Some("Disabling restricted discovery mode"),
            (false, true) => Some("Enabling restricted discovery mode"),
            _ => None,
        };

        if let Some(msg) = log_change {
            info!(nickname=%self.imm.nickname, "{}", msg);
        }

        let _old: Arc<OnionServiceConfigPublisherView> = std::mem::replace(old_config, new_config);

        true
    }

    /// Recreate the FileWatcher for watching the restricted discovery key_dirs.
    fn update_file_watcher(&self) {
        let mut inner = self.inner.lock().expect("poisoned lock");
        if inner.config.restricted_discovery.watch_configuration() {
            debug!("The restricted_discovery.key_dirs have changed, updating file watcher");
            let mut watcher = FileWatcher::builder(self.imm.runtime.clone());

            let dirs = inner.config.restricted_discovery.key_dirs().clone();

            watch_dirs(&mut watcher, &dirs, &self.path_resolver);

            let watcher = watcher
                .start_watching(self.key_dirs_tx.clone())
                .map_err(|e| {
                    // TODO: update the publish status (see also the module-level TODO about this).
                    error_report!(e, "Cannot set file watcher");
                })
                .ok();
            inner.file_watcher = watcher;
        } else {
            if inner.file_watcher.is_some() {
                debug!("removing key_dirs watcher");
            }
            inner.file_watcher = None;
        }
    }

    /// Read the intro points from `ipt_watcher`, and decide whether we're ready to start
    /// uploading.
    fn note_ipt_change(&self) -> PublishStatus {
        let mut ipts = self.ipt_watcher.borrow_for_publish();
        match ipts.ipts.as_mut() {
            Some(_ipts) => PublishStatus::UploadScheduled,
            None => PublishStatus::AwaitingIpts,
        }
    }

    /// Update our list of introduction points.
    async fn handle_ipt_change(
        &mut self,
        update: Option<Result<(), crate::FatalError>>,
    ) -> Result<ShutdownStatus, FatalError> {
        trace!(nickname=%self.imm.nickname, "received IPT change notification from IPT manager");
        match update {
            Some(Ok(())) => {
                let should_upload = self.note_ipt_change();
                debug!(nickname=%self.imm.nickname, "the introduction points have changed");

                self.mark_all_dirty();
                self.update_publish_status_unless_rate_lim(should_upload)
                    .await?;
                Ok(ShutdownStatus::Continue)
            }
            Some(Err(e)) => Err(e),
            None => {
                debug!(nickname=%self.imm.nickname, "received shut down signal from IPT manager");
                Ok(ShutdownStatus::Terminate)
            }
        }
    }

    /// Update the `PublishStatus` of the reactor with `new_state`,
    /// unless the current state is `AwaitingIpts`.
    async fn update_publish_status_unless_waiting(
        &mut self,
        new_state: PublishStatus,
    ) -> Result<(), FatalError> {
        // Only update the state if we're not waiting for intro points.
        if self.status() != PublishStatus::AwaitingIpts {
            self.update_publish_status(new_state).await?;
        }

        Ok(())
    }

    /// Update the `PublishStatus` of the reactor with `new_state`,
    /// unless the current state is `RateLimited`.
    async fn update_publish_status_unless_rate_lim(
        &mut self,
        new_state: PublishStatus,
    ) -> Result<(), FatalError> {
        // We can't exit this state until the rate-limit expires.
        if !matches!(self.status(), PublishStatus::RateLimited(_)) {
            self.update_publish_status(new_state).await?;
        }

        Ok(())
    }

    /// Unconditionally update the `PublishStatus` of the reactor with `new_state`.
    async fn update_publish_status(&mut self, new_state: PublishStatus) -> Result<(), Bug> {
        let onion_status = match new_state {
            PublishStatus::Idle => None,
            PublishStatus::UploadScheduled
            | PublishStatus::AwaitingIpts
            | PublishStatus::RateLimited(_) => Some(State::Bootstrapping),
        };

        if let Some(onion_status) = onion_status {
            self.imm.status_tx.send(onion_status, None);
        }

        trace!(
            "publisher reactor status change: {:?} -> {:?}",
            self.status(),
            new_state
        );

        self.publish_status_tx.send(new_state).await.map_err(
            |_: postage::sink::SendError<_>| internal!("failed to send upload notification?!"),
        )?;

        Ok(())
    }

    /// Update the onion svc status based on the results of the last descriptor uploads.
    fn upload_result_to_svc_status(&self) -> Result<(), FatalError> {
        let inner = self.inner.lock().expect("poisoned lock");
        let netdir = inner
            .netdir
            .as_ref()
            .ok_or_else(|| internal!("handling upload results without netdir?!"))?;

        let (state, err) = upload_result_state(netdir, &inner.time_periods);
        self.imm.status_tx.send(state, err);

        Ok(())
    }

    /// Update the descriptors based on the config change.
    async fn handle_svc_config_change(
        &mut self,
        config: &OnionServiceConfig,
    ) -> Result<(), FatalError> {
        let new_config = Arc::new(config.into());
        if self.replace_config_if_changed(Arc::clone(&new_config)) {
            self.update_file_watcher();
            self.update_authorized_clients_if_changed();

            info!(nickname=%self.imm.nickname, "Config has changed, generating a new descriptor");
            self.mark_all_dirty();

            // Schedule an upload, unless we're still waiting for IPTs.
            self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
                .await?;
        }

        Ok(())
    }

    /// Update the descriptors based on a restricted discovery key_dirs change.
    ///
    /// If the authorized clients from the [`RestrictedDiscoveryConfig`] have changed,
    /// this marks the descriptor as dirty for all time periods,
    /// and schedules a reupload.
    async fn handle_key_dirs_change(&mut self, event: FileEvent) -> Result<(), FatalError> {
        debug!("The configured key_dirs have changed");
        match event {
            FileEvent::Rescan | FileEvent::FileChanged => {
                // These events are handled in the same way, by re-reading the keys from disk
                // and republishing the descriptor if necessary
            }
            _ => return Err(internal!("file watcher event {event:?}").into()),
        };

        // Update the file watcher, in case the change was triggered by a key_dir move.
        self.update_file_watcher();

        if self.update_authorized_clients_if_changed() {
            self.mark_all_dirty();

            // Schedule an upload, unless we're still waiting for IPTs.
            self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
                .await?;
        }

        Ok(())
    }

    /// Recreate the authorized_clients based on the current config.
    ///
    /// Returns `true` if the authorized clients have changed.
    fn update_authorized_clients_if_changed(&mut self) -> bool {
        let mut inner = self.inner.lock().expect("poisoned lock");
        let authorized_clients =
            Self::read_authorized_clients(&inner.config.restricted_discovery, &self.path_resolver);

        let clients = &mut inner.authorized_clients;
        let changed = clients.as_ref() != authorized_clients.as_ref();

        if changed {
            info!("The restricted discovery mode authorized clients have changed");
            *clients = authorized_clients;
        }

        changed
    }

    /// Read the authorized `RestrictedDiscoveryKeys` from `config`.
    fn read_authorized_clients(
        config: &RestrictedDiscoveryConfig,
        path_resolver: &CfgPathResolver,
    ) -> Option<Arc<RestrictedDiscoveryKeys>> {
        let authorized_clients = config.read_keys(path_resolver);

        if matches!(authorized_clients.as_ref(), Some(c) if c.is_empty()) {
            warn!(
                "Running in restricted discovery mode, but we have no authorized clients. Service will be unreachable"
            );
        }

        authorized_clients.map(Arc::new)
    }

    /// Mark the descriptor dirty for all time periods.
    fn mark_all_dirty(&self) {
        trace!("marking the descriptor dirty for all time periods");

        self.inner
            .lock()
            .expect("poisoned lock")
            .time_periods
            .iter_mut()
            .for_each(|tp| tp.mark_all_dirty());
    }

    /// Mark the descriptor dirty for the specified time period.
    ///
    /// Returns `true` if the specified period is still relevant, and `false` otherwise.
    fn mark_dirty(&self, period: &TimePeriod) -> bool {
        let mut inner = self.inner.lock().expect("poisoned lock");
        let period_ctx = inner
            .time_periods
            .iter_mut()
            .find(|tp| tp.params.time_period() == *period);

        match period_ctx {
            Some(ctx) => {
                trace!(time_period=?period, "marking the descriptor dirty");
                ctx.mark_all_dirty();
                true
            }
            None => false,
        }
    }

    /// Try to upload our descriptor to the HsDirs that need it.
    ///
    /// If we've recently uploaded some descriptors, we return immediately and schedule the upload
    /// to happen after [`UPLOAD_RATE_LIM_THRESHOLD`].
    ///
    /// Failed uploads are retried
    /// (see [`upload_descriptor_with_retries`](Reactor::upload_descriptor_with_retries)).
    ///
    /// If restricted discovery mode is enabled and there are no authorized clients,
    /// we abort the upload and set our status to [`State::Broken`].
    //
    // Note: a broken restricted discovery config won't prevent future uploads from being scheduled
    // (for example if the IPTs change),
    // which can cause the publisher's status to oscillate between `Bootstrapping` and `Broken`.
    // TODO: we might wish to refactor the publisher to be more sophisticated about this.
    //
    /// For each current time period, we spawn a task that uploads the descriptor to
    /// all the HsDirs on the HsDir ring of that time period.
    /// Each task shuts down on completion, or when the reactor is dropped.
    ///
    /// Each task reports its upload results (`TimePeriodUploadResult`)
    /// via the `upload_task_complete_tx` channel.
    /// The results are received and processed in the main loop of the reactor.
    ///
    /// Returns an error if it fails to spawn a task, or if an internal error occurs.
    #[allow(clippy::cognitive_complexity)] // TODO #2010: Refactor
    async fn upload_all(&mut self) -> Result<(), FatalError> {
        trace!("starting descriptor upload task...");

        // Abort the upload entirely if we have an empty list of authorized clients
        let authorized_clients = match self.authorized_clients() {
            Ok(authorized_clients) => authorized_clients,
            Err(e) => {
                error_report!(e, "aborting upload");
                self.imm.status_tx.send_broken(e.clone());

                // Returning an error would shut down the reactor, so we have to return Ok here.
                return Ok(());
            }
        };

        let last_uploaded = self.inner.lock().expect("poisoned lock").last_uploaded;
        let now = self.imm.runtime.now();
        // Check if we should rate-limit this upload.
        if let Some(ts) = last_uploaded {
            let duration_since_upload = now.duration_since(ts);

            if duration_since_upload < UPLOAD_RATE_LIM_THRESHOLD {
                return Ok(self.start_rate_limit(UPLOAD_RATE_LIM_THRESHOLD).await?);
            }
        }

        let mut inner = self.inner.lock().expect("poisoned lock");
        let inner = &mut *inner;

        let _ = inner.last_uploaded.insert(now);

        for period_ctx in inner.time_periods.iter_mut() {
            let upload_task_complete_tx = self.upload_task_complete_tx.clone();

            // Figure out which HsDirs we need to upload the descriptor to (some of them might already
            // have our latest descriptor, so we filter them out).
            let hs_dirs = period_ctx
                .hs_dirs
                .iter()
                .filter_map(|(relay_id, status)| {
                    if *status == DescriptorStatus::Dirty {
                        Some(relay_id.clone())
                    } else {
                        None
                    }
                })
                .collect::<Vec<_>>();

            if hs_dirs.is_empty() {
                trace!("the descriptor is clean for all HSDirs. Nothing to do");
                return Ok(());
            }

            let time_period = period_ctx.params.time_period();
            // This scope exists because rng is not Send, so it needs to fall out of scope before we
            // await anything.
            let netdir = Arc::clone(
                inner
                    .netdir
                    .as_ref()
                    .ok_or_else(|| internal!("started upload task without a netdir"))?,
            );

            let imm = Arc::clone(&self.imm);
            let ipt_upload_view = self.ipt_watcher.upload_view();
            let config = Arc::clone(&inner.config);
            let authorized_clients = authorized_clients.clone();

            trace!(nickname=%self.imm.nickname, time_period=?time_period,
                "spawning upload task"
            );

            let params = period_ctx.params.clone();
            let shutdown_rx = self.shutdown_tx.subscribe();

            // Spawn a task to upload the descriptor to all HsDirs of this time period.
            //
            // This task will shut down when the reactor is dropped (i.e. when shutdown_rx is
            // dropped).
            let _handle: () = self
                .imm
                .runtime
                .spawn(async move {
                    if let Err(e) = Self::upload_for_time_period(
                        hs_dirs,
                        &netdir,
                        config,
                        params,
                        Arc::clone(&imm),
                        ipt_upload_view.clone(),
                        authorized_clients.clone(),
                        upload_task_complete_tx,
                        shutdown_rx,
                    )
                    .await
                    {
                        error_report!(
                            e,
                            "descriptor upload failed for HS service {} and time period {:?}",
                            imm.nickname,
                            time_period
                        );
                    }
                })
                .map_err(|e| FatalError::from_spawn("upload_for_time_period task", e))?;
        }

        Ok(())
    }

    /// Upload the descriptor for the time period specified in `params`.
    ///
    /// Failed uploads are retried
    /// (see [`upload_descriptor_with_retries`](Reactor::upload_descriptor_with_retries)).
    #[allow(clippy::too_many_arguments)] // TODO: refactor
    #[allow(clippy::cognitive_complexity)] // TODO: Refactor
    async fn upload_for_time_period(
        hs_dirs: Vec<RelayIds>,
        netdir: &Arc<NetDir>,
        config: Arc<OnionServiceConfigPublisherView>,
        params: HsDirParams,
        imm: Arc<Immutable<R, M>>,
        ipt_upload_view: IptsPublisherUploadView,
        authorized_clients: Option<Arc<RestrictedDiscoveryKeys>>,
        mut upload_task_complete_tx: mpsc::Sender<TimePeriodUploadResult>,
        shutdown_rx: broadcast::Receiver<Void>,
    ) -> Result<(), FatalError> {
        let time_period = params.time_period();
        trace!(time_period=?time_period, "uploading descriptor to all HSDirs for this time period");

        let hsdir_count = hs_dirs.len();

        /// An error returned from an upload future.
        //
        // Exhaustive, because this is a private type.
        #[derive(Clone, Debug, thiserror::Error)]
        enum PublishError {
            /// The upload was aborted because there are no IPTs.
            ///
            /// This happens because of an inevitable TOCTOU race, where after being notified by
            /// the IPT manager that the IPTs have changed (via `self.ipt_watcher.await_update`),
            /// we find out there actually are no IPTs, so we can't build the descriptor.
            ///
            /// This is a special kind of error that interrupts the current upload task, and is
            /// logged at `debug!` level rather than `warn!` or `error!`.
            ///
            /// Ideally, this shouldn't happen very often (if at all).
            #[error("No IPTs")]
            NoIpts,

            /// The reactor has shut down
            #[error("The reactor has shut down")]
            Shutdown,

            /// A fatal error.
            #[error("{0}")]
            Fatal(#[from] FatalError),
        }

        let max_hsdesc_len: usize = netdir
            .params()
            .hsdir_max_desc_size
            .try_into()
            .expect("Unable to convert positive int32 to usize!?");

        let upload_results = futures::stream::iter(hs_dirs)
            .map(|relay_ids| {
                let netdir = netdir.clone();
                let config = Arc::clone(&config);
                let imm = Arc::clone(&imm);
                let ipt_upload_view = ipt_upload_view.clone();
                let authorized_clients = authorized_clients.clone();
                let params = params.clone();
                let mut shutdown_rx = shutdown_rx.clone();

                let ed_id = relay_ids
                    .ed_identity()
                    .map(|id| id.to_string())
                    .unwrap_or_else(|| "unknown".into());
                let rsa_id = relay_ids
                    .rsa_identity()
                    .map(|id| id.to_string())
                    .unwrap_or_else(|| "unknown".into());

                async move {
                    let run_upload = |desc| async {
                        let Some(hsdir) = netdir.by_ids(&relay_ids) else {
                            // This should never happen (all of our relay_ids are from the stored
                            // netdir).
                            let err =
                                "tried to upload descriptor to relay not found in consensus?!";
                            warn!(
                                nickname=%imm.nickname, hsdir_id=%ed_id, hsdir_rsa_id=%rsa_id,
                                "{err}"
                            );
                            return Err(internal!("{err}").into());
                        };

                        Self::upload_descriptor_with_retries(
                            desc,
                            &netdir,
                            &hsdir,
                            &ed_id,
                            &rsa_id,
                            Arc::clone(&imm),
                        )
                        .await
                    };

                    // How long until we're supposed to time out?
                    let worst_case_end = imm.runtime.now() + OVERALL_UPLOAD_TIMEOUT;
                    // We generate a new descriptor before _each_ HsDir upload. This means each
                    // HsDir could, in theory, receive a different descriptor (not just in terms of
                    // revision-counters, but also with a different set of IPTs). It may seem like
                    // this could lead to some HsDirs being left with an outdated descriptor, but
                    // that's not the case: after the upload completes, the publisher will be
                    // notified by the ipt_watcher of the IPT change event (if there was one to
                    // begin with), which will trigger another upload job.
                    let hsdesc = {
                        // This scope is needed because the ipt_set MutexGuard is not Send, so it
                        // needs to fall out of scope before the await point below
                        let mut ipt_set = ipt_upload_view.borrow_for_publish();

                        // If there are no IPTs, we abort the upload. At this point, we might have
                        // uploaded the descriptor to some, but not all, HSDirs from the specified
                        // time period.
                        //
                        // Returning an error here means the upload completion task is never
                        // notified of the outcome of any of these uploads (which means the
                        // descriptor is not marked clean). This is OK, because if we suddenly find
                        // out we have no IPTs, it means our built `hsdesc` has an outdated set of
                        // IPTs, so we need to go back to the main loop to wait for IPT changes,
                        // and generate a fresh descriptor anyway.
                        //
                        // Ideally, this shouldn't happen very often (if at all).
                        let Some(ipts) = ipt_set.ipts.as_mut() else {
                            return Err(PublishError::NoIpts);
                        };

                        let hsdesc = {
                            trace!(
                                nickname=%imm.nickname, time_period=?time_period,
                                "building descriptor"
                            );
                            let mut rng = imm.mockable.thread_rng();
                            let mut key_rng = tor_llcrypto::rng::CautiousRng;

                            // We're about to generate a new version of the descriptor,
                            // so let's generate a new revision counter.
                            let now = imm.runtime.wallclock();
                            let revision_counter = imm.generate_revision_counter(&params, now)?;

                            build_sign(
                                &imm.keymgr,
                                &imm.pow_manager,
                                &config,
                                authorized_clients.as_deref(),
                                ipts,
                                time_period,
                                revision_counter,
                                &mut rng,
                                &mut key_rng,
                                imm.runtime.wallclock(),
                                max_hsdesc_len,
                            )?
                        };

                        if let Err(e) =
                            ipt_set.note_publication_attempt(&imm.runtime, worst_case_end)
                        {
                            let wait = e.log_retry_max(&imm.nickname)?;
                            // TODO (#1226): retry instead of this
                            return Err(FatalError::Bug(internal!(
                                "ought to retry after {wait:?}, crashing instead"
                            ))
                            .into());
                        }

                        hsdesc
                    };

                    let VersionedDescriptor {
                        desc,
                        revision_counter,
                    } = hsdesc;

                    trace!(
                        nickname=%imm.nickname, time_period=?time_period,
                        revision_counter=?revision_counter,
                        "generated new descriptor for time period",
                    );

                    // (Actually launch the upload attempt. No timeout is needed
                    // here, since the backoff::Runner code will handle that for us.)
                    let upload_res: UploadResult = select_biased! {
                        shutdown = shutdown_rx.next().fuse() => {
                            // This will always be None, since Void is uninhabited.
                            let _: Option<Void> = shutdown;

                            // It looks like the reactor has shut down,
                            // so there is no point in uploading the descriptor anymore.
                            //
                            // Let's shut down the upload task too.
                            trace!(
                                nickname=%imm.nickname, time_period=?time_period,
1650 "upload task received shutdown signal"
1651 );
1652
1653 return Err(PublishError::Shutdown);
1654 },
1655 res = run_upload(desc.clone()).fuse() => res,
1656 };
1657
                    // Note: a failed (`Err`) UploadResult is only returned when
                    // upload_descriptor_with_retries fails, i.e. if all of our retry
                    // attempts have failed.
1661 Ok(HsDirUploadStatus {
1662 relay_ids,
1663 upload_res,
1664 revision_counter,
1665 })
1666 }
1667 })
1668 // This fails to compile unless the stream is boxed. See https://github.com/rust-lang/rust/issues/104382
1669 .boxed()
1670 .buffer_unordered(MAX_CONCURRENT_UPLOADS)
1671 .try_collect::<Vec<_>>()
1672 .await;
1673
1674 let upload_results = match upload_results {
1675 Ok(v) => v,
1676 Err(PublishError::Fatal(e)) => return Err(e),
1677 Err(PublishError::NoIpts) => {
1678 debug!(
1679 nickname=%imm.nickname, time_period=?time_period,
1680 "no introduction points; skipping upload"
1681 );
1682
1683 return Ok(());
1684 }
1685 Err(PublishError::Shutdown) => {
1686 debug!(
1687 nickname=%imm.nickname, time_period=?time_period,
1688 "the reactor has shut down; aborting upload"
1689 );
1690
1691 return Ok(());
1692 }
1693 };
1694
1695 let (succeeded, _failed): (Vec<_>, Vec<_>) = upload_results
1696 .iter()
1697 .partition(|res| res.upload_res.is_ok());
1698
1699 debug!(
1700 nickname=%imm.nickname, time_period=?time_period,
1701 "descriptor uploaded successfully to {}/{} HSDirs",
1702 succeeded.len(), hsdir_count
1703 );
1704
1705 if upload_task_complete_tx
1706 .send(TimePeriodUploadResult {
1707 time_period,
1708 hsdir_result: upload_results,
1709 })
1710 .await
1711 .is_err()
1712 {
1713 return Err(internal!(
1714 "failed to notify reactor of upload completion (reactor shut down)"
1715 )
1716 .into());
1717 }
1718
1719 Ok(())
1720 }
1721
1722 /// Upload a descriptor to the specified HSDir.
1723 ///
1724 /// If an upload fails, this returns an `Err`. This function does not handle retries. It is up
1725 /// to the caller to retry on failure.
1726 ///
1727 /// This function does not handle timeouts.
1728 async fn upload_descriptor(
1729 hsdesc: String,
1730 netdir: &Arc<NetDir>,
1731 hsdir: &Relay<'_>,
1732 imm: Arc<Immutable<R, M>>,
1733 ) -> Result<(), UploadError> {
1734 let request = HsDescUploadRequest::new(hsdesc);
1735
1736 trace!(nickname=%imm.nickname, hsdir_id=%hsdir.id(), hsdir_rsa_id=%hsdir.rsa_id(),
1737 "starting descriptor upload",
1738 );
1739
1740 let tunnel = imm
1741 .mockable
1742 .get_or_launch_hs_dir(netdir, OwnedCircTarget::from_circ_target(hsdir))
1743 .await?;
1744 let source: Option<SourceInfo> = tunnel
1745 .source_info()
1746 .map_err(into_internal!("Couldn't get SourceInfo for circuit"))?;
1747
1748 let mut stream = tunnel
1749 .begin_dir_stream()
1750 .await
1751 .map_err(UploadError::Stream)?;
1752
1753 let _response: String = send_request(&imm.runtime, &request, &mut stream, source)
1754 .await
1755 .map_err(|dir_error| -> UploadError {
1756 match dir_error {
1757 DirClientError::RequestFailed(e) => e.into(),
1758 DirClientError::CircMgr(e) => into_internal!(
1759 "tor-dirclient complains about circmgr going wrong but we gave it a stream"
1760 )(e)
1761 .into(),
1762 e => into_internal!("unexpected error")(e).into(),
1763 }
1764 })?
1765 .into_output_string()?; // This returns an error if we received an error response
1766
1767 Ok(())
1768 }
1769
1770 /// Upload a descriptor to the specified HSDir, retrying if appropriate.
1771 ///
1772 /// Any failed uploads are retried according to a [`PublisherBackoffSchedule`].
1773 /// Each failed upload is retried until it succeeds, or until the overall timeout specified
1774 /// by [`BackoffSchedule::overall_timeout`] elapses. Individual attempts are timed out
    /// according to [`BackoffSchedule::single_attempt_timeout`].
1776 /// This function gives up after the overall timeout elapses,
1777 /// declaring the upload a failure, and never retrying it again.
1778 ///
1779 /// See also [`BackoffSchedule`].
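    ///
    /// A call sketch (illustrative; it mirrors the call site in
    /// `upload_for_time_period` above, and assumes all of the arguments are in scope):
    ///
    /// ```ignore
    /// let res: UploadResult = Self::upload_descriptor_with_retries(
    ///     desc, &netdir, &hsdir, &ed_id, &rsa_id, Arc::clone(&imm),
    /// ).await;
    /// ```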
1780 async fn upload_descriptor_with_retries(
1781 hsdesc: String,
1782 netdir: &Arc<NetDir>,
1783 hsdir: &Relay<'_>,
1784 ed_id: &str,
1785 rsa_id: &str,
1786 imm: Arc<Immutable<R, M>>,
1787 ) -> UploadResult {
1788 /// The base delay to use for the backoff schedule.
1789 const BASE_DELAY_MSEC: u32 = 1000;
1790 let schedule = PublisherBackoffSchedule {
1791 retry_delay: RetryDelay::from_msec(BASE_DELAY_MSEC),
1792 mockable: imm.mockable.clone(),
1793 };
1794
1795 let runner = Runner::new(
1796 "upload a hidden service descriptor".into(),
1797 schedule.clone(),
1798 imm.runtime.clone(),
1799 );
1800
1801 let fallible_op = || async {
1802 let r = Self::upload_descriptor(hsdesc.clone(), netdir, hsdir, Arc::clone(&imm)).await;
1803
1804 if let Err(e) = &r {
1805 if e.should_report_as_suspicious() {
1806 // Note that not every protocol violation is suspicious:
1807 // we only warn on the protocol violations that look like attempts
1808 // to do a traffic tagging attack via hsdir inflation.
1809 // (See proposal 360.)
1810 warn_report!(
1811 e,
1812 "Suspicious error while uploading descriptor to {}/{}",
1813 ed_id,
1814 rsa_id
1815 );
1816 }
1817 }
1818 r
1819 };
1820
1821 let outcome: Result<(), BackoffError<UploadError>> = runner.run(fallible_op).await;
1822 match outcome {
1823 Ok(()) => {
1824 debug!(
1825 nickname=%imm.nickname, hsdir_id=%ed_id, hsdir_rsa_id=%rsa_id,
1826 "successfully uploaded descriptor to HSDir",
1827 );
1828
1829 Ok(())
1830 }
1831 Err(e) => {
1832 warn_report!(
1833 e,
1834 "failed to upload descriptor for service {} (hsdir_id={}, hsdir_rsa_id={})",
1835 imm.nickname,
1836 ed_id,
1837 rsa_id
1838 );
1839
1840 Err(e.into())
1841 }
1842 }
1843 }
1844
1845 /// Stop publishing descriptors until the specified delay elapses.
1846 async fn start_rate_limit(&mut self, delay: Duration) -> Result<(), Bug> {
1847 if !matches!(self.status(), PublishStatus::RateLimited(_)) {
1848 debug!(
1849 "We are rate-limited for {}; pausing descriptor publication",
1850 humantime::format_duration(delay)
1851 );
1852 let until = self.imm.runtime.now() + delay;
1853 self.update_publish_status(PublishStatus::RateLimited(until))
1854 .await?;
1855 }
1856
1857 Ok(())
1858 }
1859
1860 /// Handle the upload rate-limit being lifted.
1861 async fn expire_rate_limit(&mut self) -> Result<(), Bug> {
1862 debug!("We are no longer rate-limited; resuming descriptor publication");
1863 self.update_publish_status(PublishStatus::UploadScheduled)
1864 .await?;
1865 Ok(())
1866 }
1867
1868 /// Return the authorized clients, if restricted mode is enabled.
1869 ///
1870 /// Returns `Ok(None)` if restricted discovery mode is disabled.
1871 ///
1872 /// Returns an error if restricted discovery mode is enabled, but the client list is empty.
1873 #[cfg_attr(
1874 not(feature = "restricted-discovery"),
1875 allow(clippy::unnecessary_wraps)
1876 )]
1877 fn authorized_clients(&self) -> Result<Option<Arc<RestrictedDiscoveryKeys>>, FatalError> {
1878 cfg_if::cfg_if! {
1879 if #[cfg(feature = "restricted-discovery")] {
1880 let authorized_clients = self
1881 .inner
1882 .lock()
1883 .expect("poisoned lock")
1884 .authorized_clients
1885 .clone();
1886
                if authorized_clients.as_ref().map(|v| v.is_empty()).unwrap_or_default() {
1888 return Err(FatalError::RestrictedDiscoveryNoClients);
1889 }
1890
1891 Ok(authorized_clients)
1892 } else {
1893 Ok(None)
1894 }
1895 }
1896 }
1897}
1898
1899/// Try to expand a path, logging a warning on failure.
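///
/// A minimal usage sketch (illustrative; assumes a `CfgPath` and a `CfgPathResolver`
/// are in scope):
///
/// ```ignore
/// let Some(expanded) = maybe_expand_path(&path, &resolver) else {
///     // The failure has already been logged; skip this path.
///     return;
/// };
/// ```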
1900fn maybe_expand_path(p: &CfgPath, r: &CfgPathResolver) -> Option<PathBuf> {
1901 // map_err returns unit for clarity
1902 #[allow(clippy::unused_unit, clippy::semicolon_if_nothing_returned)]
1903 p.path(r)
1904 .map_err(|e| {
1905 tor_error::warn_report!(e, "invalid path");
1906 ()
1907 })
1908 .ok()
1909}
1910
1911/// Add `path` to the specified `watcher`.
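///
/// For example (mirroring the calls in `watch_dirs` below):
///
/// ```ignore
/// // Watch a directory; the extra argument is forwarded to `watch_dir`.
/// watch_path!(watcher, &path, watch_dir, "auth",);
/// // Watch a single path; no extra arguments.
/// watch_path!(watcher, &path, watch_path,);
/// ```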
1912macro_rules! watch_path {
1913 ($watcher:expr, $path:expr, $watch_fn:ident, $($watch_fn_args:expr,)*) => {{
        if let Err(e) = $watcher.$watch_fn(&$path, $($watch_fn_args,)*) {
1915 warn_report!(e, "failed to watch path {:?}", $path);
1916 } else {
1917 debug!("watching path {:?}", $path);
1918 }
1919 }}
1920}
1921
1922/// Add the specified directories to the watcher.
1923#[allow(clippy::cognitive_complexity)]
1924fn watch_dirs<R: Runtime>(
1925 watcher: &mut FileWatcherBuilder<R>,
1926 dirs: &DirectoryKeyProviderList,
1927 path_resolver: &CfgPathResolver,
1928) {
1929 for path in dirs {
1930 let path = path.path();
1931 let Some(path) = maybe_expand_path(path, path_resolver) else {
1932 warn!("failed to expand key_dir path {:?}", path);
1933 continue;
1934 };
1935
        // If the path doesn't exist, the notify watcher will return an error when we attempt
        // to watch it, so we skip over paths that don't exist at this time.
        // (This check obviously suffers from a TOCTOU race, but most of the time
        // it is enough to prevent the watcher from erroring out.
        // If the race *does* happen, it is not disastrous: the reactor won't crash,
        // but it will fail to set up the watcher.)
1942 if matches!(path.try_exists(), Ok(true)) {
1943 watch_path!(watcher, &path, watch_dir, "auth",);
1944 }
1945 // FileWatcher::watch_path causes the parent dir of the path to be watched.
1946 if matches!(path.parent().map(|p| p.try_exists()), Some(Ok(true))) {
1947 watch_path!(watcher, &path, watch_path,);
1948 }
1949 }
1950}
1951
1952/// Try to read the blinded identity key for a given `TimePeriod`.
1953///
1954/// Returns `None` if the service is running in "offline" mode.
1955///
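/// A usage sketch (illustrative; assumes a `KeyMgr`, an `HsNickname`, and a `NetDir`
/// are in scope):
///
/// ```ignore
/// let period = netdir.hs_time_period();
/// if let Some(blind_kp) = read_blind_id_keypair(&keymgr, &nickname, period)? {
///     // Use the blinded keypair to sign the descriptor for this time period.
/// }
/// ```
///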
1956// TODO (#1194): we don't currently have support for "offline" mode so this can never return
1957// `Ok(None)`.
1958pub(super) fn read_blind_id_keypair(
1959 keymgr: &Arc<KeyMgr>,
1960 nickname: &HsNickname,
1961 period: TimePeriod,
1962) -> Result<Option<HsBlindIdKeypair>, FatalError> {
1963 let svc_key_spec = HsIdKeypairSpecifier::new(nickname.clone());
1964 let hsid_kp = keymgr
1965 .get::<HsIdKeypair>(&svc_key_spec)?
1966 .ok_or_else(|| FatalError::MissingHsIdKeypair(nickname.clone()))?;
1967
1968 let blind_id_key_spec = BlindIdKeypairSpecifier::new(nickname.clone(), period);
1969
1970 // TODO: make the keystore selector configurable
1971 let keystore_selector = Default::default();
1972 match keymgr.get::<HsBlindIdKeypair>(&blind_id_key_spec)? {
1973 Some(kp) => Ok(Some(kp)),
1974 None => {
1975 let (_hs_blind_id_key, hs_blind_id_kp, _subcredential) = hsid_kp
1976 .compute_blinded_key(period)
1977 .map_err(|_| internal!("failed to compute blinded key"))?;
1978
1979 // Note: we can't use KeyMgr::generate because this key is derived from the HsId
1980 // (KeyMgr::generate uses the tor_keymgr::Keygen trait under the hood,
1981 // which assumes keys are randomly generated, rather than derived from existing keys).
1982
1983 keymgr.insert(hs_blind_id_kp, &blind_id_key_spec, keystore_selector, true)?;
1984
1985 let arti_path = |spec: &dyn KeySpecifier| {
1986 spec.arti_path()
1987 .map_err(into_internal!("invalid key specifier?!"))
1988 };
1989
1990 Ok(Some(
1991 keymgr.get::<HsBlindIdKeypair>(&blind_id_key_spec)?.ok_or(
1992 FatalError::KeystoreRace {
1993 action: "read",
1994 path: arti_path(&blind_id_key_spec)?,
1995 },
1996 )?,
1997 ))
1998 }
1999 }
2000}
2001
2002/// Determine the [`State`] of the publisher based on the upload results
2003/// from the current `time_periods`.
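///
/// A sketch of how the result might be consumed (illustrative; see also the tests below):
///
/// ```ignore
/// let (state, problem) = upload_result_state(&netdir, &time_periods);
/// if state == State::DegradedUnreachable {
///     // At least one ring has no successful uploads,
///     // so some or all clients cannot reach us.
/// }
/// ```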
2004fn upload_result_state(
2005 netdir: &NetDir,
2006 time_periods: &[TimePeriodContext],
2007) -> (State, Option<Problem>) {
2008 let current_period = netdir.hs_time_period();
2009 let current_period_res = time_periods
2010 .iter()
2011 .find(|ctx| ctx.params.time_period() == current_period);
2012
2013 let succeeded_current_tp = current_period_res
2014 .iter()
2015 .flat_map(|res| &res.upload_results)
2016 .filter(|res| res.upload_res.is_ok())
2017 .collect_vec();
2018
2019 let secondary_tp_res = time_periods
2020 .iter()
2021 .filter(|ctx| ctx.params.time_period() != current_period)
2022 .collect_vec();
2023
2024 let succeeded_secondary_tp = secondary_tp_res
2025 .iter()
2026 .flat_map(|res| &res.upload_results)
2027 .filter(|res| res.upload_res.is_ok())
2028 .collect_vec();
2029
2030 // All of the failed uploads (for all TPs)
2031 let failed = time_periods
2032 .iter()
2033 .flat_map(|res| &res.upload_results)
2034 .filter(|res| res.upload_res.is_err())
2035 .collect_vec();
2036 let problems: Vec<DescUploadRetryError> = failed
2037 .iter()
2038 .flat_map(|e| e.upload_res.as_ref().map_err(|e| e.clone()).err())
2039 .collect();
2040
2041 let err = match problems.as_slice() {
2042 [_, ..] => Some(problems.into()),
2043 [] => None,
2044 };
2045
2046 if time_periods.len() < 2 {
        // We need at least two TP contexts (one for the primary TP,
2048 // and another for the secondary one).
2049 //
2050 // If either is missing, we are unreachable for some or all clients.
2051 return (State::DegradedUnreachable, err);
2052 }
2053
2054 let state = match (
2055 succeeded_current_tp.as_slice(),
2056 succeeded_secondary_tp.as_slice(),
2057 ) {
2058 (&[], &[..]) | (&[..], &[]) if failed.is_empty() => {
2059 // We don't have any upload results for one or both TPs.
2060 // We are still bootstrapping.
2061 State::Bootstrapping
2062 }
2063 (&[_, ..], &[_, ..]) if failed.is_empty() => {
2064 // We have uploaded the descriptor to one or more HsDirs from both
2065 // HsDir rings (primary and secondary), and none of the uploads failed.
2066 // We are fully reachable.
2067 State::Running
2068 }
2069 (&[_, ..], &[_, ..]) => {
2070 // We have uploaded the descriptor to one or more HsDirs from both
2071 // HsDir rings (primary and secondary), but some of the uploads failed.
2072 // We are reachable, but we failed to upload the descriptor to all the HsDirs
2073 // that were supposed to have it.
2074 State::DegradedReachable
2075 }
2076 (&[..], &[]) | (&[], &[..]) => {
2077 // We have either
2078 // * uploaded the descriptor to some of the HsDirs from one of the rings,
2079 // but haven't managed to upload it to any of the HsDirs on the other ring, or
2080 // * all of the uploads failed
2081 //
2082 // Either way, we are definitely not reachable by all clients.
2083 State::DegradedUnreachable
2084 }
2085 };
2086
2087 (state, err)
2088}
2089
2090/// Whether the reactor should initiate an upload.
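///
/// A sketch of how the reactor's main loop might dispatch on this status
/// (illustrative only; `upload_all` is the method named in the variant docs below,
/// while `schedule_pending_upload` is a hypothetical helper):
///
/// ```ignore
/// match status {
///     // Generate and publish a new descriptor.
///     PublishStatus::UploadScheduled => self.upload_all().await?,
///     // Defer publication until the rate-limit expires.
///     PublishStatus::RateLimited(until) => self.schedule_pending_upload(until)?,
///     // Nothing to do until an external event arrives.
///     PublishStatus::Idle | PublishStatus::AwaitingIpts => (),
/// }
/// ```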
2091#[derive(Copy, Clone, Debug, Default, PartialEq)]
2092enum PublishStatus {
2093 /// We need to call upload_all.
2094 UploadScheduled,
2095 /// We are rate-limited until the specified [`Instant`].
2096 ///
2097 /// We have tried to schedule multiple uploads in a short time span,
2098 /// and we are rate-limited. We are waiting for a signal from the schedule_upload_tx
2099 /// channel to unblock us.
2100 RateLimited(Instant),
2101 /// We are idle and waiting for external events.
2102 ///
2103 /// We have enough information to build the descriptor, but since we have already called
    /// upload_all to upload it to all relevant HSDirs, there is nothing for us to do right now.
2105 Idle,
2106 /// We are waiting for the IPT manager to establish some introduction points.
2107 ///
2108 /// No descriptors will be published until the `PublishStatus` of the reactor is changed to
2109 /// `UploadScheduled`.
2110 #[default]
2111 AwaitingIpts,
2112}
2113
2114/// The backoff schedule for the task that publishes descriptors.
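///
/// Constructed by `upload_descriptor_with_retries`; for example (a sketch matching
/// that function's usage):
///
/// ```ignore
/// let schedule = PublisherBackoffSchedule {
///     retry_delay: RetryDelay::from_msec(BASE_DELAY_MSEC),
///     mockable: imm.mockable.clone(),
/// };
/// let runner = Runner::new(
///     "upload a hidden service descriptor".into(),
///     schedule.clone(),
///     imm.runtime.clone(),
/// );
/// ```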
2115#[derive(Clone, Debug)]
2116struct PublisherBackoffSchedule<M: Mockable> {
    /// The delays to use between retry attempts.
2118 retry_delay: RetryDelay,
2119 /// The mockable reactor state, needed for obtaining an rng.
2120 mockable: M,
2121}
2122
2123impl<M: Mockable> BackoffSchedule for PublisherBackoffSchedule<M> {
2124 fn max_retries(&self) -> Option<usize> {
2125 None
2126 }
2127
2128 fn overall_timeout(&self) -> Option<Duration> {
2129 Some(OVERALL_UPLOAD_TIMEOUT)
2130 }
2131
2132 fn single_attempt_timeout(&self) -> Option<Duration> {
2133 Some(self.mockable.estimate_upload_timeout())
2134 }
2135
2136 fn next_delay<E: RetriableError>(&mut self, _error: &E) -> Option<Duration> {
2137 Some(self.retry_delay.next_delay(&mut self.mockable.thread_rng()))
2138 }
2139}
2140
2141impl RetriableError for UploadError {
2142 fn should_retry(&self) -> bool {
2143 match self {
2144 UploadError::Request(_) | UploadError::Circuit(_) | UploadError::Stream(_) => true,
2145 UploadError::Bug(_) => false,
2146 }
2147 }
2148}
2149
2150/// The outcome of uploading a descriptor to the HSDirs from a particular time period.
2151#[derive(Debug, Clone)]
2152struct TimePeriodUploadResult {
2153 /// The time period.
2154 time_period: TimePeriod,
2155 /// The upload results.
2156 hsdir_result: Vec<HsDirUploadStatus>,
2157}
2158
2159/// The outcome of uploading a descriptor to a particular HsDir.
2160#[derive(Clone, Debug)]
2161struct HsDirUploadStatus {
2162 /// The identity of the HsDir we attempted to upload the descriptor to.
2163 relay_ids: RelayIds,
2164 /// The outcome of this attempt.
2165 upload_res: UploadResult,
2166 /// The revision counter of the descriptor we tried to upload.
2167 revision_counter: RevisionCounter,
2168}
2169
2170/// The outcome of uploading a descriptor.
2171type UploadResult = Result<(), DescUploadRetryError>;
2172
2173impl From<BackoffError<UploadError>> for DescUploadRetryError {
2174 fn from(e: BackoffError<UploadError>) -> Self {
2175 use BackoffError as BE;
2176 use DescUploadRetryError as DURE;
2177
2178 match e {
2179 BE::FatalError(e) => DURE::FatalError(e),
2180 BE::MaxRetryCountExceeded(e) => DURE::MaxRetryCountExceeded(e),
2181 BE::Timeout(e) => DURE::Timeout(e),
2182 BE::ExplicitStop(_) => {
2183 DURE::Bug(internal!("explicit stop in publisher backoff schedule?!"))
2184 }
2185 }
2186 }
2187}
2188
2189// NOTE: the rest of the publisher tests live in publish.rs
2190#[cfg(test)]
2191mod test {
2192 // @@ begin test lint list maintained by maint/add_warning @@
2193 #![allow(clippy::bool_assert_comparison)]
2194 #![allow(clippy::clone_on_copy)]
2195 #![allow(clippy::dbg_macro)]
2196 #![allow(clippy::mixed_attributes_style)]
2197 #![allow(clippy::print_stderr)]
2198 #![allow(clippy::print_stdout)]
2199 #![allow(clippy::single_char_pattern)]
2200 #![allow(clippy::unwrap_used)]
2201 #![allow(clippy::unchecked_time_subtraction)]
2202 #![allow(clippy::useless_vec)]
2203 #![allow(clippy::needless_pass_by_value)]
2204 //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
2205 use super::*;
2206 use tor_netdir::testnet;
2207
2208 /// Create a `TimePeriodContext` from the specified upload results.
2209 fn create_time_period_ctx(
2210 params: &HsDirParams,
2211 upload_results: Vec<HsDirUploadStatus>,
2212 ) -> TimePeriodContext {
2213 TimePeriodContext {
2214 params: params.clone(),
2215 hs_dirs: vec![],
2216 last_successful: None,
2217 upload_results,
2218 }
2219 }
2220
2221 /// Create a single `HsDirUploadStatus`
2222 fn create_upload_status(upload_res: UploadResult) -> HsDirUploadStatus {
2223 HsDirUploadStatus {
2224 relay_ids: RelayIds::empty(),
2225 upload_res,
2226 revision_counter: RevisionCounter::from(13),
2227 }
2228 }
2229
2230 /// Create a bunch of results, all with the specified `upload_res`.
2231 fn create_upload_results(upload_res: UploadResult) -> Vec<HsDirUploadStatus> {
2232 std::iter::repeat_with(|| create_upload_status(upload_res.clone()))
2233 .take(10)
2234 .collect()
2235 }
2236
2237 fn construct_netdir() -> NetDir {
2238 const SRV1: [u8; 32] = *b"The door refused to open. ";
2239 const SRV2: [u8; 32] = *b"It said, 'Five cents, please.' ";
2240
2241 let dir = testnet::construct_custom_netdir(|_, _, bld| {
2242 bld.shared_rand_prev(7, SRV1.into(), None)
2243 .shared_rand_prev(7, SRV2.into(), None);
2244 })
2245 .unwrap();
2246
2247 dir.unwrap_if_sufficient().unwrap()
2248 }
2249
2250 #[test]
2251 fn upload_result_status_bootstrapping() {
2252 let netdir = construct_netdir();
2253 let all_params = netdir.hs_all_time_periods();
2254 let current_period = netdir.hs_time_period();
2255 let primary_params = all_params
2256 .iter()
2257 .find(|param| param.time_period() == current_period)
2258 .unwrap();
2259 let results = [
2260 (vec![], vec![]),
2261 (vec![], create_upload_results(Ok(()))),
2262 (create_upload_results(Ok(())), vec![]),
2263 ];
2264
2265 for (primary_result, secondary_result) in results {
2266 let primary_ctx = create_time_period_ctx(primary_params, primary_result);
2267
2268 let secondary_params = all_params
2269 .iter()
2270 .find(|param| param.time_period() != current_period)
2271 .unwrap();
2272 let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result.clone());
2273
2274 let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
2275 assert_eq!(status, State::Bootstrapping);
2276 assert!(err.is_none());
2277 }
2278 }
2279
2280 #[test]
2281 fn upload_result_status_running() {
2282 let netdir = construct_netdir();
2283 let all_params = netdir.hs_all_time_periods();
2284 let current_period = netdir.hs_time_period();
2285 let primary_params = all_params
2286 .iter()
2287 .find(|param| param.time_period() == current_period)
2288 .unwrap();
2289
2290 let secondary_result = create_upload_results(Ok(()));
2291 let secondary_params = all_params
2292 .iter()
2293 .find(|param| param.time_period() != current_period)
2294 .unwrap();
2295 let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result.clone());
2296
2297 let primary_result = create_upload_results(Ok(()));
2298 let primary_ctx = create_time_period_ctx(primary_params, primary_result);
2299 let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
2300 assert_eq!(status, State::Running);
2301 assert!(err.is_none());
2302 }
2303
2304 #[test]
2305 fn upload_result_status_reachable() {
2306 let netdir = construct_netdir();
2307 let all_params = netdir.hs_all_time_periods();
2308 let current_period = netdir.hs_time_period();
2309 let primary_params = all_params
2310 .iter()
2311 .find(|param| param.time_period() == current_period)
2312 .unwrap();
2313
2314 let primary_result = create_upload_results(Ok(()));
2315 let primary_ctx = create_time_period_ctx(primary_params, primary_result.clone());
2316 let failed_res = create_upload_results(Err(DescUploadRetryError::Bug(internal!("test"))));
2317 let secondary_result = create_upload_results(Ok(()))
2318 .into_iter()
2319 .chain(failed_res.iter().cloned())
2320 .collect();
2321 let secondary_params = all_params
2322 .iter()
2323 .find(|param| param.time_period() != current_period)
2324 .unwrap();
2325 let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result);
2326 let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
2327
2328 // Degraded but reachable (because some of the secondary HsDir uploads failed).
2329 assert_eq!(status, State::DegradedReachable);
2330 assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
2331 }
2332
2333 #[test]
2334 fn upload_result_status_unreachable() {
2335 let netdir = construct_netdir();
2336 let all_params = netdir.hs_all_time_periods();
2337 let current_period = netdir.hs_time_period();
2338 let primary_params = all_params
2339 .iter()
2340 .find(|param| param.time_period() == current_period)
2341 .unwrap();
2342 let mut primary_result =
2343 create_upload_results(Err(DescUploadRetryError::Bug(internal!("test"))));
2344 let primary_ctx = create_time_period_ctx(primary_params, primary_result.clone());
2345 // No secondary TP (we are unreachable).
2346 let (status, err) = upload_result_state(&netdir, &[primary_ctx]);
2347 assert_eq!(status, State::DegradedUnreachable);
2348 assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
2349
2350 // Add a successful result
2351 primary_result.push(create_upload_status(Ok(())));
2352 let primary_ctx = create_time_period_ctx(primary_params, primary_result.clone());
2353 let (status, err) = upload_result_state(&netdir, &[primary_ctx]);
2354 // Still degraded, and unreachable (because we don't have a TimePeriodContext
2355 // for the secondary TP)
2356 assert_eq!(status, State::DegradedUnreachable);
2357 assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
2358
2359 // If we add another time period where none of the uploads were successful,
2360 // we're *still* unreachable
2361 let secondary_result =
2362 create_upload_results(Err(DescUploadRetryError::Bug(internal!("test"))));
2363 let secondary_params = all_params
2364 .iter()
2365 .find(|param| param.time_period() != current_period)
2366 .unwrap();
2367 let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result.clone());
2368 let primary_ctx = create_time_period_ctx(primary_params, primary_result.clone());
2369 let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
2370 assert_eq!(status, State::DegradedUnreachable);
2371 assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
2372 }
2373}