tor_hsservice/ipt_mgr.rs
1//! IPT Manager
2//!
3//! Maintains introduction points and publishes descriptors.
4//! Provides a stream of rendezvous requests.
5//!
6//! See [`IptManager::run_once`] for discussion of the implementation approach.
7
8use crate::{internal_prelude::*, replay::OpenReplayLogError};
9
10use IptStatusStatus as ISS;
11use TrackedStatus as TS;
12use tor_relay_selection::{RelayExclusion, RelaySelector, RelayUsage};
13
14mod persist;
15pub(crate) use persist::IptStorageHandle;
16
17pub use crate::ipt_establish::IptError;
18
19/// Expiry time to put on an interim descriptor (IPT publication set Uncertain)
20///
21/// (Note that we use the same value in both cases, since it doesn't actually do
22/// much good to have a short expiration time. This expiration time only affects
23/// caches, and we can supersede an old descriptor just by publishing it. Thus,
24/// we pick a uniform publication time as done by the C tor implementation.)
25const IPT_PUBLISH_UNCERTAIN: Duration = Duration::from_secs(3 * 60 * 60); // 3 hours
/// Expiry time to put on a final descriptor (IPT publication set Certain)
27const IPT_PUBLISH_CERTAIN: Duration = IPT_PUBLISH_UNCERTAIN;
28
29//========== data structures ==========
30
31/// IPT Manager (for one hidden service)
32#[derive(Educe)]
33#[educe(Debug(bound))]
34pub(crate) struct IptManager<R, M> {
35 /// Immutable contents
36 imm: Immutable<R>,
37
38 /// Mutable state
39 state: State<R, M>,
40}
41
42/// Immutable contents of an IPT Manager
43///
44/// Contains things inherent to our identity, and
45/// handles to services that we'll be using.
46#[derive(Educe)]
47#[educe(Debug(bound))]
48pub(crate) struct Immutable<R> {
49 /// Runtime
50 #[educe(Debug(ignore))]
51 runtime: R,
52
53 /// Netdir provider
54 #[educe(Debug(ignore))]
55 dirprovider: Arc<dyn NetDirProvider>,
56
57 /// Nickname
58 nick: HsNickname,
59
60 /// Output MPSC for rendezvous requests
61 ///
62 /// Passed to IPT Establishers we create
63 output_rend_reqs: mpsc::Sender<RendRequest>,
64
65 /// Internal channel for updates from IPT Establishers (sender)
66 ///
    /// When we make a new `IptEstablisher` we use this to arrange for
68 /// its status updates to arrive, appropriately tagged, via `status_recv`
69 status_send: mpsc::Sender<(IptLocalId, IptStatus)>,
70
71 /// The key manager.
72 #[educe(Debug(ignore))]
73 keymgr: Arc<KeyMgr>,
74
75 /// Replay log directory
76 ///
77 /// Files are named after the (bare) IptLocalId
78 #[educe(Debug(ignore))]
79 replay_log_dir: tor_persist::state_dir::InstanceRawSubdir,
80
81 /// A sender for updating the status of the onion service.
82 #[educe(Debug(ignore))]
83 status_tx: IptMgrStatusSender,
84}
85
86/// State of an IPT Manager
87#[derive(Educe)]
88#[educe(Debug(bound))]
89pub(crate) struct State<R, M> {
90 /// Source of configuration updates
91 //
92 // TODO #1209 reject reconfigurations we can't cope with
93 // for example, state dir changes will go quite wrong
94 new_configs: watch::Receiver<Arc<OnionServiceConfig>>,
95
96 /// Last configuration update we received
97 ///
98 /// This is the snapshot of the config we are currently using.
99 /// (Doing it this way avoids running our algorithms
100 /// with a mixture of old and new config.)
101 current_config: Arc<OnionServiceConfig>,
102
103 /// Channel for updates from IPT Establishers (receiver)
104 ///
105 /// We arrange for all the updates to be multiplexed,
106 /// as that makes handling them easy in our event loop.
107 status_recv: mpsc::Receiver<(IptLocalId, IptStatus)>,
108
109 /// State: selected relays
110 ///
111 /// We append to this, and call `retain` on it,
112 /// so these are in chronological order of selection.
113 irelays: Vec<IptRelay>,
114
115 /// Did we fail to select a relay last time?
116 ///
117 /// This can only be caused (or triggered) by a busted netdir or config.
118 last_irelay_selection_outcome: Result<(), ()>,
119
120 /// Have we removed any IPTs but not yet cleaned up keys and logfiles?
121 #[educe(Debug(ignore))]
122 ipt_removal_cleanup_needed: bool,
123
124 /// Signal for us to shut down
125 shutdown: broadcast::Receiver<Void>,
126
127 /// The on-disk state storage handle.
128 #[educe(Debug(ignore))]
129 storage: IptStorageHandle,
130
131 /// Mockable state, normally [`Real`]
132 ///
133 /// This is in `State` so it can be passed mutably to tests,
134 /// even though the main code doesn't need `mut`
135 /// since `HsCircPool` is a service with interior mutability.
136 mockable: M,
137
138 /// Runtime (to placate compiler)
139 runtime: PhantomData<R>,
140}
141
/// One selected relay, at which we are establishing (or have relevantly advertised) IPTs
143struct IptRelay {
144 /// The actual relay
145 relay: RelayIds,
146
147 /// The retirement time we selected for this relay
148 planned_retirement: Instant,
149
150 /// IPTs at this relay
151 ///
152 /// At most one will have [`IsCurrent`].
153 ///
154 /// We append to this, and call `retain` on it,
155 /// so these are in chronological order of selection.
156 ipts: Vec<Ipt>,
157}
158
159/// One introduction point, representation in memory
160#[derive(Debug)]
161struct Ipt {
162 /// Local persistent identifier
163 lid: IptLocalId,
164
165 /// Handle for the establisher; we keep this here just for its `Drop` action
166 establisher: Box<ErasedIptEstablisher>,
167
168 /// `KS_hs_ipt_sid`, `KP_hs_ipt_sid`
169 ///
170 /// This is an `Arc` because:
171 /// * The manager needs a copy so that it can save it to disk.
172 /// * The establisher needs a copy to actually use.
173 /// * The underlying secret key type is not `Clone`.
174 k_sid: Arc<HsIntroPtSessionIdKeypair>,
175
176 /// `KS_hss_ntor`, `KP_hss_ntor`
177 k_hss_ntor: Arc<HsSvcNtorKeypair>,
178
179 /// Last information about how it's doing including timing info
180 status_last: TrackedStatus,
181
182 /// Until when ought we to try to maintain it
183 ///
184 /// For introduction points we are publishing,
185 /// this is a copy of the value set by the publisher
    /// in the `IptSet` we share with the publisher.
187 ///
188 /// (`None` means the IPT has not been advertised at all yet.)
189 ///
190 /// We must duplicate the information because:
191 ///
192 /// * We can't have it just live in the shared `IptSet`
193 /// because we need to retain it for no-longer-being published IPTs.
194 ///
195 /// * We can't have it just live here because the publisher needs to update it.
196 ///
197 /// (An alternative would be to more seriously entangle the manager and publisher.)
198 last_descriptor_expiry_including_slop: Option<Instant>,
199
    /// Is this IPT current - should we include it in descriptors?
201 ///
202 /// `None` might mean:
203 /// * WantsToRetire
204 /// * We have >N IPTs and we have been using this IPT so long we want to rotate it out
205 /// (the [`IptRelay`] has reached its `planned_retirement` time)
206 /// * The IPT has wrong parameters of some kind, and needs to be replaced
207 /// (Eg, we set it up with the wrong DOS_PARAMS extension)
208 is_current: Option<IsCurrent>,
209}
210
211/// Last information from establisher about an IPT, with timing info added by us
212#[derive(Debug)]
213enum TrackedStatus {
214 /// Corresponds to [`IptStatusStatus::Faulty`]
215 Faulty {
216 /// When we were first told this started to establish, if we know it
217 ///
218 /// This might be an early estimate, which would give an overestimate
219 /// of the establishment time, which is fine.
220 /// Or it might be `Err` meaning we don't know.
221 started: Result<Instant, ()>,
222
223 /// The error, if any.
224 error: Option<IptError>,
225 },
226
227 /// Corresponds to [`IptStatusStatus::Establishing`]
228 Establishing {
229 /// When we were told we started to establish, for calculating `time_to_establish`
230 started: Instant,
231 },
232
233 /// Corresponds to [`IptStatusStatus::Good`]
234 Good {
235 /// How long it took to establish (if we could determine that information)
236 ///
237 /// Can only be `Err` in strange situations.
238 time_to_establish: Result<Duration, ()>,
239
240 /// Details, from the Establisher
241 details: ipt_establish::GoodIptDetails,
242 },
243}
244
245/// Token indicating that this introduction point is current (not Retiring)
246#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
247struct IsCurrent;
248
249//---------- related to mockability ----------
250
251/// Type-erased version of `Box<IptEstablisher>`
252///
253/// The real type is `M::IptEstablisher`.
254/// We use `Box<dyn Any>` to avoid propagating the `M` type parameter to `Ipt` etc.
255type ErasedIptEstablisher = dyn Any + Send + Sync + 'static;
256
257/// Mockable state in an IPT Manager - real version
258#[derive(Educe)]
259#[educe(Debug)]
260pub(crate) struct Real<R: Runtime> {
261 /// Circuit pool for circuits we need to make
262 ///
    /// Passed to each new Establisher
264 #[educe(Debug(ignore))]
265 pub(crate) circ_pool: Arc<HsCircPool<R>>,
266}
267
268//---------- errors ----------
269
270/// An error that happened while trying to select a relay
271///
272/// Used only within the IPT manager.
273/// Can only be caused by bad netdir or maybe bad config.
274#[derive(Debug, Error)]
275enum ChooseIptError {
276 /// Bad or insufficient netdir
277 #[error("bad or insufficient netdir")]
278 NetDir(#[from] tor_netdir::Error),
279 /// Too few suitable relays
280 #[error("too few suitable relays")]
281 TooFewUsableRelays,
282 /// Time overflow
283 #[error("time overflow (system clock set wrong?)")]
284 TimeOverflow,
285 /// Internal error
286 #[error("internal error")]
287 Bug(#[from] Bug),
288}
289
/// An error that happened while trying to create an IPT (at a selected relay)
291///
292/// Used only within the IPT manager.
293#[derive(Clone, Debug, Error)]
294pub(crate) enum CreateIptError {
295 /// Fatal error
296 #[error("fatal error")]
297 Fatal(#[from] FatalError),
298
299 /// Error accessing keystore
300 #[error("problems with keystores")]
301 Keystore(#[from] tor_keymgr::Error),
302
303 /// Error opening the intro request replay log
304 #[error(transparent)]
305 OpenReplayLog(#[from] OpenReplayLogError),
306}
307
308//========== Relays we've chosen, and IPTs ==========
309
310impl IptRelay {
311 /// Get a reference to this IPT relay's current intro point state (if any)
312 ///
313 /// `None` means this IPT has no current introduction points.
314 /// That might be, briefly, because a new intro point needs to be created;
315 /// or it might be because we are retiring the relay.
316 fn current_ipt(&self) -> Option<&Ipt> {
317 self.ipts
318 .iter()
319 .find(|ipt| ipt.is_current == Some(IsCurrent))
320 }
321
322 /// Get a mutable reference to this IPT relay's current intro point state (if any)
323 fn current_ipt_mut(&mut self) -> Option<&mut Ipt> {
324 self.ipts
325 .iter_mut()
326 .find(|ipt| ipt.is_current == Some(IsCurrent))
327 }
328
    /// Should this IPT Relay be retired?
330 ///
331 /// This is determined by our IPT relay rotation time.
332 fn should_retire(&self, now: &TrackingNow) -> bool {
333 now > &self.planned_retirement
334 }
335
336 /// Make a new introduction point at this relay
337 ///
338 /// It becomes the current IPT.
339 fn make_new_ipt<R: Runtime, M: Mockable<R>>(
340 &mut self,
341 imm: &Immutable<R>,
342 new_configs: &watch::Receiver<Arc<OnionServiceConfig>>,
343 mockable: &mut M,
344 ) -> Result<(), CreateIptError> {
345 let lid: IptLocalId = mockable.thread_rng().random();
346
347 let ipt = Ipt::start_establisher(
348 imm,
349 new_configs,
350 mockable,
351 &self.relay,
352 lid,
353 Some(IsCurrent),
354 None::<IptExpectExistingKeys>,
355 // None is precisely right: the descriptor hasn't been published.
356 PromiseLastDescriptorExpiryNoneIsGood {},
357 )?;
358
359 self.ipts.push(ipt);
360
361 Ok(())
362 }
363}
364
365/// Token, representing promise by caller of `start_establisher`
366///
367/// Caller who makes one of these structs promises that it is OK for `start_establisher`
368/// to set `last_descriptor_expiry_including_slop` to `None`.
369struct PromiseLastDescriptorExpiryNoneIsGood {}
370
371/// Token telling [`Ipt::start_establisher`] to expect existing keys in the keystore
372#[derive(Debug, Clone, Copy)]
373struct IptExpectExistingKeys;
374
375impl Ipt {
376 /// Start a new IPT establisher, and create and return an `Ipt`
377 #[allow(clippy::too_many_arguments)] // There's only two call sites
378 fn start_establisher<R: Runtime, M: Mockable<R>>(
379 imm: &Immutable<R>,
380 new_configs: &watch::Receiver<Arc<OnionServiceConfig>>,
381 mockable: &mut M,
382 relay: &RelayIds,
383 lid: IptLocalId,
384 is_current: Option<IsCurrent>,
385 expect_existing_keys: Option<IptExpectExistingKeys>,
386 _: PromiseLastDescriptorExpiryNoneIsGood,
387 ) -> Result<Ipt, CreateIptError> {
388 let mut rng = tor_llcrypto::rng::CautiousRng;
389
390 /// Load (from disk) or generate an IPT key with role IptKeyRole::$role
391 ///
392 /// Ideally this would be a closure, but it has to be generic over the
393 /// returned key type. So it's a macro. (A proper function would have
394 /// many type parameters and arguments and be quite annoying.)
395 macro_rules! get_or_gen_key { { $Keypair:ty, $role:ident } => { (||{
396 let spec = IptKeySpecifier {
397 nick: imm.nick.clone(),
398 role: IptKeyRole::$role,
399 lid,
400 };
401 // Our desired behaviour:
402 // expect_existing_keys == None
403 // The keys shouldn't exist. Generate and insert.
404 // If they do exist then things are badly messed up
            // (we're creating a new IPT with a fresh lid).
406 // So, then, crash.
407 // expect_existing_keys == Some(IptExpectExistingKeys)
            // The keys are supposed to exist. Load them.
409 // We ought to have stored them before storing in our on-disk records that
410 // this IPT exists. But this could happen due to file deletion or something.
411 // And we could recover by creating fresh keys, although maybe some clients
412 // would find the previous keys in old descriptors.
413 // So if the keys are missing, make and store new ones, logging an error msg.
414 let k: Option<$Keypair> = imm.keymgr.get(&spec)?;
415 let arti_path = || {
416 spec
417 .arti_path()
418 .map_err(|e| {
419 CreateIptError::Fatal(
420 into_internal!("bad ArtiPath from IPT key spec")(e).into()
421 )
422 })
423 };
424 match (expect_existing_keys, k) {
425 (None, None) => { }
426 (Some(_), Some(k)) => return Ok(Arc::new(k)),
427 (None, Some(_)) => {
428 return Err(FatalError::IptKeysFoundUnexpectedly(arti_path()?).into())
429 },
430 (Some(_), None) => {
431 error!("bug: HS service {} missing previous key {:?}. Regenerating.",
432 &imm.nick, arti_path()?);
433 }
434 }
435
436 let res = imm.keymgr.generate::<$Keypair>(
437 &spec,
438 tor_keymgr::KeystoreSelector::Primary,
439 &mut rng,
440 false, /* overwrite */
441 );
442
443 match res {
444 Ok(k) => Ok::<_, CreateIptError>(Arc::new(k)),
445 Err(tor_keymgr::Error::KeyAlreadyExists) => {
446 Err(FatalError::KeystoreRace { action: "generate", path: arti_path()? }.into() )
447 },
448 Err(e) => Err(e.into()),
449 }
450 })() } }
451
452 let k_hss_ntor = get_or_gen_key!(HsSvcNtorKeypair, KHssNtor)?;
453 let k_sid = get_or_gen_key!(HsIntroPtSessionIdKeypair, KSid)?;
454
455 // we'll treat it as Establishing until we find otherwise
456 let status_last = TS::Establishing {
457 started: imm.runtime.now(),
458 };
459
460 // TODO #1186 Support ephemeral services (without persistent replay log)
461 let replay_log = IptReplayLog::new_logged(&imm.replay_log_dir, &lid)?;
462
463 let params = IptParameters {
464 replay_log,
465 config_rx: new_configs.clone(),
466 netdir_provider: imm.dirprovider.clone(),
467 introduce_tx: imm.output_rend_reqs.clone(),
468 lid,
469 target: relay.clone(),
470 k_sid: k_sid.clone(),
471 k_ntor: Arc::clone(&k_hss_ntor),
472 accepting_requests: ipt_establish::RequestDisposition::NotAdvertised,
473 };
474 let (establisher, mut watch_rx) = mockable.make_new_ipt(imm, params)?;
475
476 // This task will shut down when self.establisher is dropped, causing
477 // watch_tx to close.
478 imm.runtime
479 .spawn({
480 let mut status_send = imm.status_send.clone();
481 async move {
482 loop {
483 let Some(status) = watch_rx.next().await else {
484 trace!("HS service IPT status task: establisher went away");
485 break;
486 };
487 match status_send.send((lid, status)).await {
488 Ok(()) => {}
489 Err::<_, mpsc::SendError>(e) => {
490 // Not using trace_report because SendError isn't HasKind
491 trace!("HS service IPT status task: manager went away: {e}");
492 break;
493 }
494 }
495 }
496 }
497 })
498 .map_err(|cause| FatalError::Spawn {
499 spawning: "IPT establisher watch status task",
500 cause: cause.into(),
501 })?;
502
503 let ipt = Ipt {
504 lid,
505 establisher: Box::new(establisher),
506 k_hss_ntor,
507 k_sid,
508 status_last,
509 is_current,
510 last_descriptor_expiry_including_slop: None,
511 };
512
513 debug!(
514 "Hs service {}: {lid:?} establishing {} IPT at relay {}",
515 &imm.nick,
516 match expect_existing_keys {
517 None => "new",
518 Some(_) => "previous",
519 },
520 &relay,
521 );
522
523 Ok(ipt)
524 }
525
526 /// Returns `true` if this IPT has status Good (and should perhaps be published)
527 fn is_good(&self) -> bool {
528 match self.status_last {
529 TS::Good { .. } => true,
530 TS::Establishing { .. } | TS::Faulty { .. } => false,
531 }
532 }
533
534 /// Returns the error, if any, we are currently encountering at this IPT.
535 fn error(&self) -> Option<&IptError> {
536 match &self.status_last {
537 TS::Good { .. } | TS::Establishing { .. } => None,
538 TS::Faulty { error, .. } => error.as_ref(),
539 }
540 }
541
542 /// Construct the information needed by the publisher for this intro point
543 fn for_publish(&self, details: &ipt_establish::GoodIptDetails) -> Result<ipt_set::Ipt, Bug> {
544 let k_sid: &ed25519::Keypair = (*self.k_sid).as_ref();
545 tor_netdoc::doc::hsdesc::IntroPointDesc::builder()
546 .link_specifiers(details.link_specifiers.clone())
547 .ipt_kp_ntor(details.ipt_kp_ntor)
548 .kp_hs_ipt_sid(k_sid.verifying_key().into())
549 .kp_hss_ntor(self.k_hss_ntor.public().clone())
550 .build()
551 .map_err(into_internal!("failed to construct IntroPointDesc"))
552 }
553}
554
555impl HasKind for ChooseIptError {
556 fn kind(&self) -> ErrorKind {
557 use ChooseIptError as E;
558 use ErrorKind as EK;
559 match self {
560 E::NetDir(e) => e.kind(),
561 E::TooFewUsableRelays => EK::TorDirectoryUnusable,
562 E::TimeOverflow => EK::ClockSkew,
563 E::Bug(e) => e.kind(),
564 }
565 }
566}
567
568// This is somewhat abbreviated but it is legible and enough for most purposes.
569impl Debug for IptRelay {
570 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
571 writeln!(f, "IptRelay {}", self.relay)?;
572 write!(
573 f,
574 " planned_retirement: {:?}",
575 self.planned_retirement
576 )?;
577 for ipt in &self.ipts {
578 write!(
579 f,
580 "\n ipt {} {} {:?} ldeis={:?}",
581 match ipt.is_current {
582 Some(IsCurrent) => "cur",
583 None => "old",
584 },
585 &ipt.lid,
586 &ipt.status_last,
587 &ipt.last_descriptor_expiry_including_slop,
588 )?;
589 }
590 Ok(())
591 }
592}
593
594//========== impls on IptManager and State ==========
595
596impl<R: Runtime, M: Mockable<R>> IptManager<R, M> {
597 //
598 //---------- constructor and setup ----------
599
600 /// Create a new IptManager
601 #[allow(clippy::too_many_arguments)] // this is an internal function with 1 call site
602 pub(crate) fn new(
603 runtime: R,
604 dirprovider: Arc<dyn NetDirProvider>,
605 nick: HsNickname,
606 config: watch::Receiver<Arc<OnionServiceConfig>>,
607 output_rend_reqs: mpsc::Sender<RendRequest>,
608 shutdown: broadcast::Receiver<Void>,
609 state_handle: &tor_persist::state_dir::InstanceStateHandle,
610 mockable: M,
611 keymgr: Arc<KeyMgr>,
612 status_tx: IptMgrStatusSender,
613 ) -> Result<Self, StartupError> {
614 let irelays = vec![]; // See TODO near persist::load call, in launch_background_tasks
615
        // We don't need buffering, since this is written to by dedicated tasks which
617 // are reading watches.
618 //
619 // Internally-generated status updates (hopefully rate limited?), no need for mq.
620 let (status_send, status_recv) = mpsc_channel_no_memquota(0);
621
622 let storage = state_handle
623 .storage_handle("ipts")
624 .map_err(StartupError::StateDirectoryInaccessible)?;
625
626 let replay_log_dir = state_handle
627 .raw_subdir("iptreplay")
628 .map_err(StartupError::StateDirectoryInaccessible)?;
629
630 let imm = Immutable {
631 runtime,
632 dirprovider,
633 nick,
634 status_send,
635 output_rend_reqs,
636 keymgr,
637 replay_log_dir,
638 status_tx,
639 };
640 let current_config = config.borrow().clone();
641
642 let state = State {
643 current_config,
644 new_configs: config,
645 status_recv,
646 storage,
647 mockable,
648 shutdown,
649 irelays,
650 last_irelay_selection_outcome: Ok(()),
651 ipt_removal_cleanup_needed: false,
652 runtime: PhantomData,
653 };
654 let mgr = IptManager { imm, state };
655
656 Ok(mgr)
657 }
658
659 /// Send the IPT manager off to run and establish intro points
660 pub(crate) fn launch_background_tasks(
661 mut self,
662 mut publisher: IptsManagerView,
663 ) -> Result<(), StartupError> {
        // TODO maybe this should be done in new(), so we don't need this dummy irelays
665 // but then new() would need the IptsManagerView
666 assert!(self.state.irelays.is_empty());
667 self.state.irelays = persist::load(
668 &self.imm,
669 &self.state.storage,
670 &self.state.new_configs,
671 &mut self.state.mockable,
672 &publisher.borrow_for_read(),
673 )?;
674
675 // Now that we've populated `irelays` and its `ipts` from the on-disk state,
        // we should check for any leftover disk files from previous runs. Make a note.
677 self.state.ipt_removal_cleanup_needed = true;
678
679 let runtime = self.imm.runtime.clone();
680
681 self.imm.status_tx.send(IptMgrState::Bootstrapping, None);
682
683 // This task will shut down when the RunningOnionService is dropped, causing
684 // self.state.shutdown to become ready.
685 runtime
686 .spawn(self.main_loop_task(publisher))
687 .map_err(|cause| StartupError::Spawn {
688 spawning: "ipt manager",
689 cause: cause.into(),
690 })?;
691 Ok(())
692 }
693
694 //---------- internal utility and helper methods ----------
695
696 /// Iterate over *all* the IPTs we know about
697 ///
    /// May yield each `IptRelay` more than once (once for each of its IPTs).
699 fn all_ipts(&self) -> impl Iterator<Item = (&IptRelay, &Ipt)> {
700 self.state
701 .irelays
702 .iter()
703 .flat_map(|ir| ir.ipts.iter().map(move |ipt| (ir, ipt)))
704 }
705
706 /// Iterate over the *current* IPTs
707 ///
708 /// Yields each `IptRelay` at most once.
709 fn current_ipts(&self) -> impl Iterator<Item = (&IptRelay, &Ipt)> {
710 self.state
711 .irelays
712 .iter()
713 .filter_map(|ir| Some((ir, ir.current_ipt()?)))
714 }
715
716 /// Iterate over the *current* IPTs in `Good` state
717 fn good_ipts(&self) -> impl Iterator<Item = (&IptRelay, &Ipt)> {
718 self.current_ipts().filter(|(_ir, ipt)| ipt.is_good())
719 }
720
721 /// Iterate over the current IPT errors.
722 ///
723 /// Used when reporting our state as [`Recovering`](crate::status::State::Recovering).
724 fn ipt_errors(&self) -> impl Iterator<Item = &IptError> {
725 self.all_ipts().filter_map(|(_ir, ipt)| ipt.error())
726 }
727
728 /// Target number of intro points
729 pub(crate) fn target_n_intro_points(&self) -> usize {
730 self.state.current_config.num_intro_points.into()
731 }
732
733 /// Maximum number of concurrent intro point relays
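    ///
    /// This is the target number of intro points plus the network parameter
    /// `hs_intro_num_extra_intropoints`.
    /// For example (hypothetical values): with a target of 3 intro points and
    /// 2 extra intro point relays allowed, this returns 5.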
734 pub(crate) fn max_n_intro_relays(&self) -> usize {
735 let params = self.imm.dirprovider.params();
736 let num_extra = (*params).as_ref().hs_intro_num_extra_intropoints.get() as usize;
737 self.target_n_intro_points() + num_extra
738 }
739
740 //---------- main implementation logic ----------
741
742 /// Make some progress, if possible, and say when to wake up again
743 ///
744 /// Examines the current state and attempts to improve it.
745 ///
746 /// If `idempotently_progress_things_now` makes any changes,
747 /// it will return `None`.
748 /// It should then be called again immediately.
749 ///
750 /// Otherwise, it returns the time in the future when further work ought to be done:
751 /// i.e., the time of the earliest timeout or planned future state change -
752 /// as a [`TrackingNow`].
753 ///
754 /// In that case, the caller must call `compute_iptsetstatus_publish`,
755 /// since the IPT set etc. may have changed.
756 ///
757 /// ### Goals and algorithms
758 ///
759 /// We attempt to maintain a pool of N established and verified IPTs,
760 /// at N IPT Relays.
761 ///
762 /// When we have fewer than N IPT Relays
763 /// that have `Establishing` or `Good` IPTs (see below)
764 /// and fewer than k*N IPT Relays overall,
765 /// we choose a new IPT Relay at random from the consensus
766 /// and try to establish an IPT on it.
767 ///
768 /// (Rationale for the k*N limit:
769 /// we do want to try to replace faulty IPTs, but
770 /// we don't want an attacker to be able to provoke us into
771 /// rapidly churning through IPT candidates.)
772 ///
773 /// When we select a new IPT Relay, we randomly choose a planned replacement time,
774 /// after which it becomes `Retiring`.
775 ///
776 /// Additionally, any IPT becomes `Retiring`
777 /// after it has been used for a certain number of introductions
778 /// (c.f. C Tor `#define INTRO_POINT_MIN_LIFETIME_INTRODUCTIONS 16384`.)
779 /// When this happens we retain the IPT Relay,
780 /// and make new parameters to make a new IPT at the same Relay.
781 ///
782 /// An IPT is removed from our records, and we give up on it,
783 /// when it is no longer `Good` or `Establishing`
784 /// and all descriptors that mentioned it have expired.
785 ///
786 /// (Until all published descriptors mentioning an IPT expire,
787 /// we consider ourselves bound by those previously-published descriptors,
788 /// and try to maintain the IPT.
789 /// TODO: Allegedly this is unnecessary, but I don't see how it could be.)
790 ///
791 /// ### Performance
792 ///
793 /// This function is at worst O(N) where N is the number of IPTs.
794 /// When handling state changes relating to a particular IPT (or IPT relay)
795 /// it needs at most O(1) calls to progress that one IPT to its proper new state.
796 ///
797 /// See the performance note on [`run_once()`](Self::run_once).
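    ///
    /// ### Sketch of the relay selection condition
    ///
    /// As a rough, illustrative sketch (not the precise code, but using the
    /// same names as the body below), the "choose a new IPT Relay" rule
    /// described above amounts to:
    ///
    /// ```ignore
    /// // n_good_ish_relays counts relays whose current IPT is Good or Establishing
    /// if n_good_ish_relays < self.target_n_intro_points()          // fewer than N good-ish
    ///     && self.state.irelays.len() < self.max_n_intro_relays()  // fewer than k*N overall
    ///     && self.state.last_irelay_selection_outcome.is_ok()      // last attempt didn't fail
    /// {
    ///     // pick a new relay at random from the consensus and set up an IPT there
    ///     self.state.choose_new_ipt_relay(&self.imm, now)?;
    /// }
    /// ```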
798 #[allow(clippy::redundant_closure_call)]
799 fn idempotently_progress_things_now(&mut self) -> Result<Option<TrackingNow>, FatalError> {
800 /// Return value which means "we changed something, please run me again"
801 ///
802 /// In each case, if we make any changes which indicate we might
        /// want to restart, we `return CONTINUE`, and
804 /// our caller will just call us again.
805 ///
806 /// This approach simplifies the logic: everything here is idempotent.
807 /// (It does mean the algorithm can be quadratic in the number of intro points,
808 /// but that number is reasonably small for a modern computer and the constant
809 /// factor is small too.)
810 const CONTINUE: Result<Option<TrackingNow>, FatalError> = Ok(None);
811
812 // This tracks everything we compare it to, using interior mutability,
813 // so that if there is no work to do and no timeouts have expired,
814 // we know when we will want to wake up.
815 let now = TrackingNow::now(&self.imm.runtime);
816
817 // ---------- collect garbage ----------
818
        // Rotate out old IPT(s)
820 for ir in &mut self.state.irelays {
821 if ir.should_retire(&now) {
822 if let Some(ipt) = ir.current_ipt_mut() {
823 ipt.is_current = None;
824 return CONTINUE;
825 }
826 }
827 }
828
829 // Forget old IPTs (after the last descriptor mentioning them has expired)
830 for ir in &mut self.state.irelays {
831 // When we drop the Ipt we drop the IptEstablisher, withdrawing the intro point
832 ir.ipts.retain(|ipt| {
833 let keep = ipt.is_current.is_some()
834 || match ipt.last_descriptor_expiry_including_slop {
835 None => false,
836 Some(last) => now < last,
837 };
838 // This is the only place in the manager where an IPT is dropped,
839 // other than when the whole service is dropped.
840 self.state.ipt_removal_cleanup_needed |= !keep;
841 keep
842 });
843 // No need to return CONTINUE, since there is no other future work implied
844 // by discarding a non-current IPT.
845 }
846
847 // Forget retired IPT relays (all their IPTs are gone)
848 self.state
849 .irelays
850 .retain(|ir| !(ir.should_retire(&now) && ir.ipts.is_empty()));
851 // If we deleted relays, we might want to select new ones. That happens below.
852
853 // ---------- make progress ----------
854 //
855 // Consider selecting new relays and setting up new IPTs.
856
857 // Create new IPTs at already-chosen relays
858 for ir in &mut self.state.irelays {
859 if !ir.should_retire(&now) && ir.current_ipt_mut().is_none() {
860 // We don't have a current IPT at this relay, but we should.
861 match ir.make_new_ipt(&self.imm, &self.state.new_configs, &mut self.state.mockable)
862 {
863 Ok(()) => return CONTINUE,
864 Err(CreateIptError::Fatal(fatal)) => return Err(fatal),
865 Err(
866 e @ (CreateIptError::Keystore(_) | CreateIptError::OpenReplayLog { .. }),
867 ) => {
868 error_report!(e, "HS {}: failed to prepare new IPT", &self.imm.nick);
869 // Let's not try any more of this.
870 // We'll run the rest of our "make progress" algorithms,
871 // presenting them with possibly-suboptimal state. That's fine.
872 // At some point we'll be poked to run again and then we'll retry.
873 /// Retry no later than this:
874 const STORAGE_RETRY: Duration = Duration::from_secs(60);
875 now.update(STORAGE_RETRY);
876 break;
877 }
878 }
879 }
880 }
881
882 // Consider choosing a new IPT relay
883 {
884 // block {} prevents use of `n_good_ish_relays` for other (wrong) purposes
885
886 // We optimistically count an Establishing IPT as good-ish;
887 // specifically, for the purposes of deciding whether to select a new
888 // relay because we don't have enough good-looking ones.
889 let n_good_ish_relays = self
890 .current_ipts()
891 .filter(|(_ir, ipt)| match ipt.status_last {
892 TS::Good { .. } | TS::Establishing { .. } => true,
893 TS::Faulty { .. } => false,
894 })
895 .count();
896
897 #[allow(clippy::unused_unit, clippy::semicolon_if_nothing_returned)] // in map_err
898 if n_good_ish_relays < self.target_n_intro_points()
899 && self.state.irelays.len() < self.max_n_intro_relays()
900 && self.state.last_irelay_selection_outcome.is_ok()
901 {
902 self.state.last_irelay_selection_outcome = self
903 .state
904 .choose_new_ipt_relay(&self.imm, now.instant().get_now_untracked())
905 .map_err(|error| {
906 /// Call $report! with the message.
907 // The macros are annoying and want a cost argument.
908 macro_rules! report { { $report:ident } => {
909 $report!(
910 error,
911 "HS service {} failed to select IPT relay",
912 &self.imm.nick,
913 )
914 }}
915 use ChooseIptError as E;
916 match &error {
917 E::NetDir(_) => report!(info_report),
918 _ => report!(error_report),
919 };
920 ()
921 });
922 return CONTINUE;
923 }
924 }
925
926 //---------- caller (run_once) will update publisher, and wait ----------
927
928 Ok(Some(now))
929 }
930
931 /// Import publisher's updates to latest descriptor expiry times
932 ///
933 /// Copies the `last_descriptor_expiry_including_slop` field
934 /// from each ipt in `publish_set` to the corresponding ipt in `self`.
935 ///
936 /// ### Performance
937 ///
938 /// This function is at worst O(N) where N is the number of IPTs.
939 /// See the performance note on [`run_once()`](Self::run_once).
940 fn import_new_expiry_times(irelays: &mut [IptRelay], publish_set: &PublishIptSet) {
941 // Every entry in the PublishIptSet ought to correspond to an ipt in self.
942 //
943 // If there are IPTs in publish_set.last_descriptor_expiry_including_slop
944 // that aren't in self, those are IPTs that we know were published,
        // but can't re-establish, since we have forgotten their details.
946 //
947 // We are not supposed to allow that to happen:
948 // we save IPTs to disk before we allow them to be published.
949 //
950 // (This invariant is across two data structures:
951 // `ipt_mgr::State` (specifically, `Ipt`) which is modified only here,
952 // and `ipt_set::PublishIptSet` which is shared with the publisher.
953 // See the comments in PublishIptSet.)
954
955 let all_ours = irelays.iter_mut().flat_map(|ir| ir.ipts.iter_mut());
956
957 for ours in all_ours {
958 if let Some(theirs) = publish_set
959 .last_descriptor_expiry_including_slop
960 .get(&ours.lid)
961 {
962 ours.last_descriptor_expiry_including_slop = Some(*theirs);
963 }
964 }
965 }
966
967 /// Expire old entries in publish_set.last_descriptor_expiry_including_slop
968 ///
969 /// Deletes entries where `now` > `last_descriptor_expiry_including_slop`,
    /// i.e., entries where the publication's validity time has expired,
971 /// meaning we don't need to maintain that IPT any more,
972 /// at least, not just because we've published it.
973 ///
974 /// We may expire even entries for IPTs that we, the manager, still want to maintain.
975 /// That's fine: this is (just) the information about what we have previously published.
976 ///
977 /// ### Performance
978 ///
979 /// This function is at worst O(N) where N is the number of IPTs.
980 /// See the performance note on [`run_once()`](Self::run_once).
981 fn expire_old_expiry_times(&self, publish_set: &mut PublishIptSet, now: &TrackingNow) {
982 // We don't want to bother waking up just to expire things,
983 // so use an untracked comparison.
984 let now = now.instant().get_now_untracked();
985
986 publish_set
987 .last_descriptor_expiry_including_slop
            .retain(|_lid, expiry| *expiry >= now);
989 }
990
991 /// Compute the IPT set to publish, and update the data shared with the publisher
992 ///
993 /// `now` is current time and also the earliest wakeup,
994 /// which we are in the process of planning.
995 /// The noted earliest wakeup can be updated by this function,
996 /// for example, with a future time at which the IPT set ought to be published
997 /// (eg, the status goes from Unknown to Uncertain).
998 ///
999 /// ## IPT sets and lifetimes
1000 ///
1001 /// We remember every IPT we have published that is still valid.
1002 ///
1003 /// At each point in time we have an idea of set of IPTs we want to publish.
1004 /// The possibilities are:
1005 ///
1006 /// * `Certain`:
1007 /// We are sure of which IPTs we want to publish.
1008 /// We try to do so, talking to hsdirs as necessary,
1009 /// updating any existing information.
1010 /// (We also republish to an hsdir if its descriptor will expire soon,
1011 /// or we haven't published there since Arti was restarted.)
1012 ///
1013 /// * `Unknown`:
1014 /// We have no idea which IPTs to publish.
1015 /// We leave whatever is on the hsdirs as-is.
1016 ///
1017 /// * `Uncertain`:
1018 /// We have some IPTs we could publish,
1019 /// but we're not confident about them.
1020 /// We publish these to a particular hsdir if:
1021 /// - our last-published descriptor has expired
1022 /// - or it will expire soon
1023 /// - or if we haven't published since Arti was restarted.
1024 ///
1025 /// The idea of what to publish is calculated as follows:
1026 ///
1027 /// * If we have at least N `Good` IPTs: `Certain`.
1028 /// (We publish the "best" N IPTs for some definition of "best".
1029 /// TODO: should we use the fault count? recency?)
1030 ///
1031 /// * Unless we have at least one `Good` IPT: `Unknown`.
1032 ///
1033 /// * Otherwise: if there are IPTs in `Establishing`,
1034 /// and they have been in `Establishing` only a short time \[1\]:
1035 /// `Unknown`; otherwise `Uncertain`.
1036 ///
1037 /// The effect is that we delay publishing an initial descriptor
1038 /// by at most 1x the fastest IPT setup time,
1039 /// at most doubling the initial setup time.
1040 ///
1041 /// Each update to the IPT set that isn't `Unknown` comes with a
1042 /// proposed descriptor expiry time,
1043 /// which is used if the descriptor is to be actually published.
1044 /// The proposed descriptor lifetime for `Uncertain`
1045 /// is the minimum (30 minutes).
1046 /// Otherwise, we double the lifetime each time,
1047 /// unless any IPT in the previous descriptor was declared `Faulty`,
1048 /// in which case we reset it back to the minimum.
1049 /// TODO: Perhaps we should just pick fixed short and long lifetimes instead,
1050 /// to limit distinguishability.
1051 ///
1052 /// (Rationale: if IPTs are regularly misbehaving,
1053 /// we should be cautious and limit our exposure to the damage.)
1054 ///
1055 /// \[1\] NOTE: We wait a "short time" between establishing our first IPT,
1056 /// and publishing an incomplete (<N) descriptor -
1057 /// this is a compromise between
1058 /// availability (publishing as soon as we have any working IPT)
1059 /// and
1060 /// exposure and hsdir load
1061 /// (which would suggest publishing only when our IPT set is stable).
1062 /// One possible strategy is to wait as long again
1063 /// as the time it took to establish our first IPT.
1064 /// Another is to somehow use our circuit timing estimator.
1065 ///
1066 /// ### Performance
1067 ///
1068 /// This function is at worst O(N) where N is the number of IPTs.
1069 /// See the performance note on [`run_once()`](Self::run_once).
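    ///
    /// ### Sketch of the decision
    ///
    /// Schematically (a simplified sketch, not the actual code;
    /// `started_establishing_very_recently` is the "short time" check
    /// described above):
    ///
    /// ```ignore
    /// let publish_lifetime = if n_good_ipts >= self.target_n_intro_points() {
    ///     Some(IPT_PUBLISH_CERTAIN)      // "Certain": publish the chosen N IPTs
    /// } else if n_good_ipts == 0 {
    ///     None                           // "Unknown": leave the hsdirs alone
    /// } else if started_establishing_very_recently() {
    ///     None                           // "Unknown": hold off briefly, things may improve
    /// } else {
    ///     Some(IPT_PUBLISH_UNCERTAIN)    // "Uncertain": publish what we have
    /// };
    /// ```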
1070 #[allow(clippy::unnecessary_wraps)] // for regularity
1071 #[allow(clippy::cognitive_complexity)] // this function is in fact largely linear
1072 fn compute_iptsetstatus_publish(
1073 &mut self,
1074 now: &TrackingNow,
1075 publish_set: &mut PublishIptSet,
1076 ) -> Result<(), IptStoreError> {
1077 //---------- tell the publisher what to announce ----------
1078
1079 let very_recently: Option<(TrackingInstantOffsetNow, Duration)> = (|| {
1080 // on time overflow, don't treat any as started establishing very recently
1081
1082 let fastest_good_establish_time = self
1083 .current_ipts()
1084 .filter_map(|(_ir, ipt)| match ipt.status_last {
1085 TS::Good {
1086 time_to_establish, ..
1087 } => Some(time_to_establish.ok()?),
1088 TS::Establishing { .. } | TS::Faulty { .. } => None,
1089 })
1090 .min()?;
1091
1092 // Rationale:
1093 // we could use circuit timings etc., but arguably the actual time to establish
1094 // our fastest IPT is a better estimator here (and we want an optimistic,
1095 // rather than pessimistic estimate).
1096 //
1097 // This algorithm has potential to publish too early and frequently,
1098 // but our overall rate-limiting should keep it from getting out of hand.
1099 //
1100 // TODO: We might want to make this "1" tuneable, and/or tune the
1101 // algorithm as a whole based on experience.
1102 let wait_more = fastest_good_establish_time * 1;
1103 let very_recently = fastest_good_establish_time.checked_add(wait_more)?;
1104
1105 let very_recently = now.checked_sub(very_recently)?;
1106 Some((very_recently, wait_more))
1107 })();
1108
1109 let started_establishing_very_recently = || {
1110 let (very_recently, wait_more) = very_recently?;
1111 let lid = self
1112 .current_ipts()
1113 .filter_map(|(_ir, ipt)| {
1114 let started = match ipt.status_last {
1115 TS::Establishing { started } => Some(started),
1116 TS::Good { .. } | TS::Faulty { .. } => None,
1117 }?;
1118
1119 (started > very_recently).then_some(ipt.lid)
1120 })
1121 .next()?;
1122 Some((lid, wait_more))
1123 };
1124
1125 let n_good_ipts = self.good_ipts().count();
1126 let publish_lifetime = if n_good_ipts >= self.target_n_intro_points() {
1127 // "Certain" - we are sure of which IPTs we want to publish
1128 debug!(
1129 "HS service {}: {} good IPTs, >= target {}, publishing",
1130 &self.imm.nick,
1131 n_good_ipts,
1132 self.target_n_intro_points()
1133 );
1134
1135 self.imm.status_tx.send(IptMgrState::Running, None);
1136
1137 Some(IPT_PUBLISH_CERTAIN)
1138 } else if self.good_ipts().next().is_none()
1139 /* !... .is_empty() */
1140 {
1141 // "Unknown" - we have no idea which IPTs to publish.
1142 debug!("HS service {}: no good IPTs", &self.imm.nick);
1143
1144 self.imm
1145 .status_tx
1146 .send_recovering(self.ipt_errors().cloned().collect_vec());
1147
1148 None
1149 } else if let Some((wait_for, wait_more)) = started_establishing_very_recently() {
1150 // "Unknown" - we say have no idea which IPTs to publish:
1151 // although we have *some* idea, we hold off a bit to see if things improve.
1152 // The wait_more period started counting when the fastest IPT became ready,
1153 // so the printed value isn't an offset from the message timestamp.
1154 debug!(
1155 "HS service {}: {} good IPTs, < target {}, waiting up to {}ms for {:?}",
1156 &self.imm.nick,
1157 n_good_ipts,
1158 self.target_n_intro_points(),
1159 wait_more.as_millis(),
1160 wait_for
1161 );
1162
1163 self.imm
1164 .status_tx
1165 .send_recovering(self.ipt_errors().cloned().collect_vec());
1166
1167 None
1168 } else {
1169 // "Uncertain" - we have some IPTs we could publish, but we're not confident
1170 debug!(
1171 "HS service {}: {} good IPTs, < target {}, publishing what we have",
1172 &self.imm.nick,
1173 n_good_ipts,
1174 self.target_n_intro_points()
1175 );
1176
1177 // We are close to being Running -- we just need more IPTs!
1178 let errors = self.ipt_errors().cloned().collect_vec();
1179 let errors = if errors.is_empty() {
1180 None
1181 } else {
1182 Some(errors)
1183 };
1184
1185 self.imm
1186 .status_tx
1187 .send(IptMgrState::DegradedReachable, errors.map(|e| e.into()));
1188
1189 Some(IPT_PUBLISH_UNCERTAIN)
1190 };
1191
1192 publish_set.ipts = if let Some(lifetime) = publish_lifetime {
1193 let selected = self.publish_set_select();
1194 for ipt in &selected {
1195 self.state.mockable.start_accepting(&*ipt.establisher);
1196 }
1197 Some(Self::make_publish_set(selected, lifetime)?)
1198 } else {
1199 None
1200 };
1201
1202 //---------- store persistent state ----------
1203
1204 persist::store(&self.imm, &mut self.state)?;
1205
1206 Ok(())
1207 }
1208
1209 /// Select IPTs to publish, given that we have decided to publish *something*
1210 ///
1211 /// Calculates set of ipts to publish, selecting up to the target `N`
1212 /// from the available good current IPTs.
1213 /// (Old, non-current IPTs, that we are trying to retire, are never published.)
1214 ///
1215 /// The returned list is in the same order as our data structure:
1216 /// firstly, by the ordering in `State.irelays`, and then within each relay,
1217 /// by the ordering in `IptRelay.ipts`. Both of these are stable.
1218 ///
1219 /// ### Performance
1220 ///
1221 /// This function is at worst O(N) where N is the number of IPTs.
1222 /// See the performance note on [`run_once()`](Self::run_once).
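    ///
    /// For example (illustrative values only): with a target of `N = 3` and
    /// good current IPTs `[A, B, C, D]` in selection order, the oldest
    /// candidate `A` is dropped from the front and `[B, C, D]` is published.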
1223 fn publish_set_select(&self) -> VecDeque<&Ipt> {
1224 /// Good candidate introduction point for publication
1225 type Candidate<'i> = &'i Ipt;
1226
1227 let target_n = self.target_n_intro_points();
1228
1229 let mut candidates: VecDeque<_> = self
1230 .state
1231 .irelays
1232 .iter()
1233 .filter_map(|ir: &_| -> Option<Candidate<'_>> {
1234 let current_ipt = ir.current_ipt()?;
1235 if !current_ipt.is_good() {
1236 return None;
1237 }
1238 Some(current_ipt)
1239 })
1240 .collect();
1241
1242 // Take the last N good IPT relays
1243 //
1244 // The way we manage irelays means that this is always
1245 // the ones we selected most recently.
1246 //
1247 // TODO SPEC Publication strategy when we have more than >N IPTs
1248 //
1249 // We could have a number of strategies here. We could take some timing
1250 // measurements, or use the establishment time, or something; but we don't
1251 // want to add distinguishability.
1252 //
1253 // Another concern is manipulability, but
1254 // We can't be forced to churn because we don't remove relays
1255 // from our list of relays to try to use, other than on our own schedule.
1256 // But we probably won't want to be too reactive to the network environment.
1257 //
1258 // Since we only choose new relays when old ones are to retire, or are faulty,
1259 // choosing the most recently selected, rather than the least recently,
1260 // has the effect of preferring relays we don't know to be faulty,
        // to ones we have considered faulty at least once.
1262 //
1263 // That's better than the opposite. Also, choosing more recently selected relays
1264 // for publication may slightly bring forward the time at which all descriptors
1265 // mentioning that relay have expired, and then we can forget about it.
1266 while candidates.len() > target_n {
1267 // WTB: VecDeque::truncate_front
1268 let _: Candidate = candidates.pop_front().expect("empty?!");
1269 }
1270
1271 candidates
1272 }
1273
1274 /// Produce a `publish::IptSet`, from a list of IPT selected for publication
1275 ///
1276 /// Updates each chosen `Ipt`'s `last_descriptor_expiry_including_slop`
1277 ///
1278 /// The returned `IptSet` set is in the same order as `selected`.
1279 ///
1280 /// ### Performance
1281 ///
1282 /// This function is at worst O(N) where N is the number of IPTs.
1283 /// See the performance note on [`run_once()`](Self::run_once).
1284 fn make_publish_set<'i>(
1285 selected: impl IntoIterator<Item = &'i Ipt>,
1286 lifetime: Duration,
1287 ) -> Result<ipt_set::IptSet, FatalError> {
1288 let ipts = selected
1289 .into_iter()
1290 .map(|current_ipt| {
                let TS::Good { details, .. } = &current_ipt.status_last else {
1292 return Err(internal!("was good but now isn't?!").into());
1293 };
1294
1295 let publish = current_ipt.for_publish(details)?;
1296
1297 // last_descriptor_expiry_including_slop was earlier merged in from
1298 // the previous IptSet, and here we copy it back
1299 let publish = ipt_set::IptInSet {
1300 ipt: publish,
1301 lid: current_ipt.lid,
1302 };
1303
1304 Ok::<_, FatalError>(publish)
1305 })
1306 .collect::<Result<_, _>>()?;
1307
1308 Ok(ipt_set::IptSet { ipts, lifetime })
1309 }
1310
1311 /// Delete persistent on-disk data (including keys) for old IPTs
1312 ///
1313 /// More precisely, scan places where per-IPT data files live,
1314 /// and delete anything that doesn't correspond to
1315 /// one of the IPTs in our main in-memory data structure.
1316 ///
1317 /// Does *not* deal with deletion of data handled via storage handles
1318 /// (`state_dir::StorageHandle`), `ipt_mgr/persist.rs` etc.;
1319 /// those are one file for each service, so old data is removed as we rewrite them.
1320 ///
1321 /// Does *not* deal with deletion of entire old hidden services.
1322 ///
1323 /// (This function works on the basis of the invariant that every IPT
1324 /// in [`ipt_set::PublishIptSet`] is also an [`Ipt`] in [`ipt_mgr::State`](State).
1325 /// See the comment in [`IptManager::import_new_expiry_times`].
1326 /// If that invariant is violated, we would delete on-disk files for the affected IPTs.
1327 /// That's fine since we couldn't re-establish them anyway.)
1328 #[allow(clippy::cognitive_complexity)] // Splitting this up would make it worse
1329 fn expire_old_ipts_external_persistent_state(&self) -> Result<(), StateExpiryError> {
1330 self.state
1331 .mockable
1332 .expire_old_ipts_external_persistent_state_hook();
1333
1334 let all_ipts: HashSet<_> = self.all_ipts().map(|(_, ipt)| &ipt.lid).collect();
1335
1336 // Keys
1337
1338 let pat = IptKeySpecifierPattern {
1339 nick: Some(self.imm.nick.clone()),
1340 role: None,
1341 lid: None,
1342 }
1343 .arti_pattern()?;
1344
1345 let found = self.imm.keymgr.list_matching(&pat)?;
1346
1347 for entry in found {
1348 let path = entry.key_path();
1349 // Try to identify this key (including its IptLocalId)
1350 match IptKeySpecifier::try_from(path) {
1351 Ok(spec) if all_ipts.contains(&spec.lid) => continue,
1352 Ok(_) => trace!("deleting key for old IPT: {path}"),
1353 Err(bad) => info!("deleting unrecognised IPT key: {path} ({})", bad.report()),
1354 };
1355 // Not known, remove it
1356 self.imm.keymgr.remove_entry(&entry)?;
1357 }
1358
1359 // IPT replay logs
1360
1361 let handle_rl_err = |operation, path: &Path| {
1362 let path = path.to_owned();
1363 move |source| StateExpiryError::ReplayLog {
1364 operation,
1365 path,
1366 source: Arc::new(source),
1367 }
1368 };
1369
1370 // fs-mistrust doesn't offer CheckedDir::read_this_directory.
1371 // But, we probably don't mind that we're not doing many checks here.
1372 let replay_logs = self.imm.replay_log_dir.as_path();
1373 let replay_logs_dir =
1374 fs::read_dir(replay_logs).map_err(handle_rl_err("open dir", replay_logs))?;
1375
1376 for ent in replay_logs_dir {
1377 let ent = ent.map_err(handle_rl_err("read dir", replay_logs))?;
1378 let leaf = ent.file_name();
1379 // Try to identify this replay logfile (including its IptLocalId)
1380 match IptReplayLog::parse_log_leafname(&leaf) {
1381 Ok(lid) if all_ipts.contains(&lid) => continue,
1382 Ok(_) => trace!(
1383 leaf = leaf.to_string_lossy().as_ref(),
1384 "deleting replay log for old IPT"
1385 ),
1386 Err(bad) => info!(
1387 "deleting garbage in IPT replay log dir: {} ({})",
1388 leaf.to_string_lossy(),
1389 bad
1390 ),
1391 }
1392 // Not known, remove it
1393 let path = ent.path();
1394 fs::remove_file(&path).map_err(handle_rl_err("remove", &path))?;
1395 }
1396
1397 Ok(())
1398 }
1399
1400 /// Run one iteration of the loop
1401 ///
1402 /// Either do some work, making changes to our state,
1403 /// or, if there's nothing to be done, wait until there *is* something to do.
1404 ///
1405 /// ### Implementation approach
1406 ///
1407 /// Every time we wake up we idempotently make progress
1408 /// by searching our whole state machine, looking for something to do.
1409 /// If we find something to do, we do that one thing, and search again.
1410 /// When we're done, we unconditionally recalculate the IPTs to publish, and sleep.
1411 ///
1412 /// This approach avoids the need for complicated reasoning about
1413 /// which state updates need to trigger other state updates,
1414 /// and thereby avoids several classes of potential bugs.
1415 /// However, it has some performance implications:
1416 ///
1417 /// ### Performance
1418 ///
1419 /// Events relating to an IPT occur, at worst,
1420 /// at a rate proportional to the current number of IPTs,
1421 /// times the maximum flap rate of any one IPT.
1422 ///
1423 /// [`idempotently_progress_things_now`](Self::idempotently_progress_things_now)
1424 /// can be called more than once for each such event,
1425 /// but only a finite number of times per IPT.
1426 ///
1427 /// Therefore, overall, our work rate is O(N^2) where N is the number of IPTs.
1428 /// We think this is tolerable,
1429 /// but it does mean that the principal functions should be written
1430 /// with an eye to avoiding "accidentally quadratic" algorithms,
1431 /// because that would make the whole manager cubic.
1432 /// Ideally we would avoid O(N.log(N)) algorithms.
1433 ///
1434 /// (Note that the number of IPTs can be significantly larger than
1435 /// the maximum target of 20, if the service is very busy so the intro points
1436 /// are cycling rapidly due to the need to replace the replay database.)
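    ///
    /// ### Sketch of one iteration
    ///
    /// A much-simplified sketch of the control flow (not the actual code,
    /// which also handles errors, state cleanup, and config updates):
    ///
    /// ```ignore
    /// // make idempotent progress until there is nothing left to do right now
    /// let wakeup = loop {
    ///     if let Some(wakeup) = self.idempotently_progress_things_now()? {
    ///         break wakeup;
    ///     }
    /// };
    /// // recompute what the publisher should announce, and persist our state
    /// self.compute_iptsetstatus_publish(&wakeup, &mut publish_set)?;
    /// drop(publish_set); // release the lock, notifying the publisher of changes
    /// // then sleep until `wakeup`, or until a status / config / shutdown event arrives
    /// ```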
1437 #[allow(clippy::cognitive_complexity)] // TODO: Refactor?
1438 async fn run_once(
1439 &mut self,
1440 // This is a separate argument for borrowck reasons
1441 publisher: &mut IptsManagerView,
1442 ) -> Result<ShutdownStatus, FatalError> {
1443 let now = {
1444 // Block to persuade borrow checker that publish_set isn't
1445 // held over an await point.
1446
1447 let mut publish_set = publisher.borrow_for_update(self.imm.runtime.clone());
1448
1449 Self::import_new_expiry_times(&mut self.state.irelays, &publish_set);
1450
1451 let mut loop_limit = 0..(
1452 // Work we do might be O(number of intro points),
1453 // but we might also have cycled the intro points due to many requests.
1454 // 10K is a guess at a stupid upper bound on the number of times we
1455 // might cycle ipts during a descriptor lifetime.
                // We don't need a tight bound; if we're going to crash, we can spin a bit first.
1457 (self.target_n_intro_points() + 1) * 10_000
1458 );
1459 let now = loop {
1460 let _: usize = loop_limit.next().expect("IPT manager is looping");
1461
1462 if let Some(now) = self.idempotently_progress_things_now()? {
1463 break now;
1464 }
1465 };
1466
1467 // TODO #1214 Maybe something at level Error or Info, for example
            // Log an error if everything is terrible:
            //  - we have >= N Faulty IPTs?
            //  - we have only Faulty IPTs and can't select another due to the 2N limit?
1471 // Log at info if and when we publish? Maybe the publisher should do that?
1472
1473 if let Err(operr) = self.compute_iptsetstatus_publish(&now, &mut publish_set) {
1474 // This is not good, is it.
1475 publish_set.ipts = None;
1476 let wait = operr.log_retry_max(&self.imm.nick)?;
1477 now.update(wait);
1478 };
1479
1480 self.expire_old_expiry_times(&mut publish_set, &now);
1481
1482 drop(publish_set); // release lock, and notify publisher of any changes
1483
1484 if self.state.ipt_removal_cleanup_needed {
1485 let outcome = self.expire_old_ipts_external_persistent_state();
1486 log_ratelim!("removing state for old IPT(s)"; outcome);
1487 match outcome {
1488 Ok(()) => self.state.ipt_removal_cleanup_needed = false,
1489 Err(_already_logged) => {}
1490 }
1491 }
1492
1493 now
1494 };
1495
1496 assert_ne!(
1497 now.clone().shortest(),
1498 Some(Duration::ZERO),
1499 "IPT manager zero timeout, would loop"
1500 );
1501
1502 let mut new_configs = self.state.new_configs.next().fuse();
1503
1504 select_biased! {
1505 () = now.wait_for_earliest(&self.imm.runtime).fuse() => {},
1506 shutdown = self.state.shutdown.next().fuse() => {
1507 info!("HS service {}: terminating due to shutdown signal", &self.imm.nick);
                // We shouldn't be receiving anything on this channel.
1509 assert!(shutdown.is_none());
1510 return Ok(ShutdownStatus::Terminate)
1511 },
1512
1513 update = self.state.status_recv.next() => {
1514 let (lid, update) = update.ok_or_else(|| internal!("update mpsc ended!"))?;
1515 self.state.handle_ipt_status_update(&self.imm, lid, update);
1516 }
1517
1518 _dir_event = async {
1519 match self.state.last_irelay_selection_outcome {
1520 Ok(()) => future::pending().await,
1521 // This boxes needlessly but it shouldn't really happen
1522 Err(()) => self.imm.dirprovider.events().next().await,
1523 }
1524 }.fuse() => {
1525 self.state.last_irelay_selection_outcome = Ok(());
1526 }
1527
1528 new_config = new_configs => {
1529 let Some(new_config) = new_config else {
1530 trace!("HS service {}: terminating due to EOF on config updates stream",
1531 &self.imm.nick);
1532 return Ok(ShutdownStatus::Terminate);
1533 };
1534 if let Err(why) = (|| {
1535 let dos = |config: &OnionServiceConfig| config.dos_extension()
1536 .map_err(|e| e.report().to_string());
1537 if dos(&self.state.current_config)? != dos(&new_config)? {
1538 return Err("DOS parameters (rate limit) changed".to_string());
1539 }
1540 Ok(())
1541 })() {
1542 // We need new IPTs with the new parameters. (The previously-published
1543 // IPTs will automatically be retained so long as needed, by the
1544 // rest of our algorithm.)
1545 info!("HS service {}: replacing IPTs: {}", &self.imm.nick, &why);
1546 for ir in &mut self.state.irelays {
1547 for ipt in &mut ir.ipts {
1548 ipt.is_current = None;
1549 }
1550 }
1551 }
1552 self.state.current_config = new_config;
1553 self.state.last_irelay_selection_outcome = Ok(());
1554 }
1555 }
1556
1557 Ok(ShutdownStatus::Continue)
1558 }
1559
1560 /// IPT Manager main loop, runs as a task
1561 ///
1562 /// Contains the error handling, including catching panics.
1563 async fn main_loop_task(mut self, mut publisher: IptsManagerView) {
1564 loop {
1565 match async {
1566 AssertUnwindSafe(self.run_once(&mut publisher))
1567 .catch_unwind()
1568 .await
1569 .map_err(|_: Box<dyn Any + Send>| internal!("IPT manager crashed"))?
1570 }
1571 .await
1572 {
1573 Err(crash) => {
1574 error!("bug: HS service {} crashed! {}", &self.imm.nick, crash);
1575
1576 self.imm.status_tx.send_broken(crash);
1577 break;
1578 }
1579 Ok(ShutdownStatus::Continue) => continue,
1580 Ok(ShutdownStatus::Terminate) => {
1581 self.imm.status_tx.send_shutdown();
1582
1583 break;
1584 }
1585 }
1586 }
1587 }
1588}
1589
1590impl<R: Runtime, M: Mockable<R>> State<R, M> {
1591 /// Find the `Ipt` with persistent local id `lid`
1592 fn ipt_by_lid_mut(&mut self, needle: IptLocalId) -> Option<&mut Ipt> {
1593 self.irelays
1594 .iter_mut()
1595 .find_map(|ir| ir.ipts.iter_mut().find(|ipt| ipt.lid == needle))
1596 }
1597
1598 /// Choose a new relay to use for IPTs
1599 fn choose_new_ipt_relay(
1600 &mut self,
1601 imm: &Immutable<R>,
1602 now: Instant,
1603 ) -> Result<(), ChooseIptError> {
1604 let netdir = imm.dirprovider.timely_netdir()?;
1605
1606 let mut rng = self.mockable.thread_rng();
1607
1608 let relay = {
1609 let exclude_ids = self
1610 .irelays
1611 .iter()
1612 .flat_map(|e| e.relay.identities())
1613 .map(|id| id.to_owned())
1614 .collect();
1615 let selector = RelaySelector::new(
1616 RelayUsage::new_intro_point(),
1617 RelayExclusion::exclude_identities(exclude_ids),
1618 );
1619 selector
1620 .select_relay(&mut rng, &netdir)
1621 .0 // TODO: Someday we might want to report why we rejected everything on failure.
1622 .ok_or(ChooseIptError::TooFewUsableRelays)?
1623 };
1624
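        // Plan the new relay's retirement for a random time within the
        // consensus-supplied [min, max] intro point lifetime range.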
1625 let lifetime_low = netdir
1626 .params()
1627 .hs_intro_min_lifetime
1628 .try_into()
1629 .expect("Could not convert param to duration.");
1630 let lifetime_high = netdir
1631 .params()
1632 .hs_intro_max_lifetime
1633 .try_into()
1634 .expect("Could not convert param to duration.");
1635 let lifetime_range: std::ops::RangeInclusive<Duration> = lifetime_low..=lifetime_high;
1636 let retirement = rng
1637 .gen_range_checked(lifetime_range)
1638 // If the range from the consensus is invalid, just pick the high-bound.
1639 .unwrap_or(lifetime_high);
1640 let retirement = now
1641 .checked_add(retirement)
1642 .ok_or(ChooseIptError::TimeOverflow)?;
1643
1644 let new_irelay = IptRelay {
1645 relay: RelayIds::from_relay_ids(&relay),
1646 planned_retirement: retirement,
1647 ipts: vec![],
1648 };
1649 self.irelays.push(new_irelay);
1650
1651 debug!(
1652 "HS service {}: choosing new IPT relay {}",
1653 &imm.nick,
1654 relay.display_relay_ids()
1655 );
1656
1657 Ok(())
1658 }
1659
1660 /// Update `self`'s status tracking for one introduction point
1661 fn handle_ipt_status_update(&mut self, imm: &Immutable<R>, lid: IptLocalId, update: IptStatus) {
1662 let Some(ipt) = self.ipt_by_lid_mut(lid) else {
1663            // Status update from a now-withdrawn IPT; ignore it.
            // (This can happen because the establisher runs as a separate task.)
1664 return;
1665 };
1666
1667 debug!("HS service {}: {lid:?} status update {update:?}", &imm.nick);
1668
1669 let IptStatus {
1670 status: update,
1671 wants_to_retire,
1672 ..
1673 } = update;
1674
1675 #[allow(clippy::single_match)] // want to be explicit about the Ok type
1676 match wants_to_retire {
1677 Err(IptWantsToRetire) => ipt.is_current = None,
1678 Ok(()) => {}
1679 }
1680
1681 let now = || imm.runtime.now();
1682
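        // Work out when the current establishment attempt started, if known:
        // carry the start time over from Establishing or Faulty; a previously
        // Good IPT has no relevant start time.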
1683 let started = match &ipt.status_last {
1684 TS::Establishing { started, .. } => Ok(*started),
1685 TS::Faulty { started, .. } => *started,
1686 TS::Good { .. } => Err(()),
1687 };
1688
1689 ipt.status_last = match update {
1690 ISS::Establishing => TS::Establishing {
1691 started: started.unwrap_or_else(|()| now()),
1692 },
1693 ISS::Good(details) => {
1694 let time_to_establish = started.and_then(|started| {
1695 // return () at end of ok_or_else closure, for clarity
1696 #[allow(clippy::unused_unit, clippy::semicolon_if_nothing_returned)]
1697 now().checked_duration_since(started).ok_or_else(|| {
1698 warn!("monotonic clock went backwards! (HS IPT)");
1699 ()
1700 })
1701 });
1702 TS::Good {
1703 time_to_establish,
1704 details,
1705 }
1706 }
1707 ISS::Faulty(error) => TS::Faulty { started, error },
1708 };
1709 }
1710}
1711
1712//========== mockability ==========
1713
1714/// Mockable state for the IPT Manager
1715///
1716/// This allows us to use a fake IPT Establisher and IPT Publisher,
1717/// so that we can unit test the Manager.
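///
/// (The `test` module below provides `Mocks`, an implementation of this trait
/// used by the unit tests.)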
1718pub(crate) trait Mockable<R>: Debug + Send + Sync + Sized + 'static {
1719 /// IPT establisher type
1720 type IptEstablisher: Send + Sync + 'static;
1721
1722 /// A random number generator
1723 type Rng<'m>: rand::Rng + rand::CryptoRng + 'm;
1724
1725 /// Return a random number generator
1726 fn thread_rng(&mut self) -> Self::Rng<'_>;
1727
1728 /// Call `IptEstablisher::new`
1729 fn make_new_ipt(
1730 &mut self,
1731 imm: &Immutable<R>,
1732 params: IptParameters,
1733 ) -> Result<(Self::IptEstablisher, watch::Receiver<IptStatus>), FatalError>;
1734
1735 /// Call `IptEstablisher::start_accepting`
1736 fn start_accepting(&self, establisher: &ErasedIptEstablisher);
1737
1738 /// Allow tests to see when [`IptManager::expire_old_ipts_external_persistent_state`]
1739 /// is called.
1740 ///
1741 /// This lets tests see that it gets called at the right times,
1742 /// and not the wrong ones.
1743 fn expire_old_ipts_external_persistent_state_hook(&self);
1744}
1745
1746impl<R: Runtime> Mockable<R> for Real<R> {
1747 type IptEstablisher = IptEstablisher;
1748
1749 /// A random number generator
1750 type Rng<'m> = rand::rngs::ThreadRng;
1751
1752 /// Return a random number generator
1753 fn thread_rng(&mut self) -> Self::Rng<'_> {
1754 rand::rng()
1755 }
1756
1757 fn make_new_ipt(
1758 &mut self,
1759 imm: &Immutable<R>,
1760 params: IptParameters,
1761 ) -> Result<(Self::IptEstablisher, watch::Receiver<IptStatus>), FatalError> {
1762 IptEstablisher::launch(&imm.runtime, params, self.circ_pool.clone(), &imm.keymgr)
1763 }
1764
1765 fn start_accepting(&self, establisher: &ErasedIptEstablisher) {
1766 let establisher: &IptEstablisher = <dyn Any>::downcast_ref(establisher)
1767            .expect("downcast failure, ErasedIptEstablisher is not IptEstablisher!");
1768 establisher.start_accepting();
1769 }
1770
1771 fn expire_old_ipts_external_persistent_state_hook(&self) {}
1772}
1773
1774// TODO #1213 add more unit tests for IptManager
1775// Especially, we want to exercise all code paths in idempotently_progress_things_now
1776
1777#[cfg(test)]
1778mod test {
1779 // @@ begin test lint list maintained by maint/add_warning @@
1780 #![allow(clippy::bool_assert_comparison)]
1781 #![allow(clippy::clone_on_copy)]
1782 #![allow(clippy::dbg_macro)]
1783 #![allow(clippy::mixed_attributes_style)]
1784 #![allow(clippy::print_stderr)]
1785 #![allow(clippy::print_stdout)]
1786 #![allow(clippy::single_char_pattern)]
1787 #![allow(clippy::unwrap_used)]
1788 #![allow(clippy::unchecked_time_subtraction)]
1789 #![allow(clippy::useless_vec)]
1790 #![allow(clippy::needless_pass_by_value)]
1791 //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1792 #![allow(clippy::match_single_binding)] // false positives, need the lifetime extension
1793 use super::*;
1794
1795 use crate::config::OnionServiceConfigBuilder;
1796 use crate::ipt_establish::GoodIptDetails;
1797 use crate::status::{OnionServiceStatus, StatusSender};
1798 use crate::test::{create_keymgr, create_storage_handles_from_state_dir};
1799 use rand::SeedableRng as _;
1800 use slotmap_careful::DenseSlotMap;
1801 use std::collections::BTreeMap;
1802 use std::sync::Mutex;
1803 use test_temp_dir::{TestTempDir, test_temp_dir};
1804 use tor_basic_utils::test_rng::TestingRng;
1805 use tor_netdir::testprovider::TestNetDirProvider;
1806 use tor_rtmock::MockRuntime;
1807 use tracing_test::traced_test;
1808 use walkdir::WalkDir;
1809
1810 slotmap_careful::new_key_type! {
1811 struct MockEstabId;
1812 }
1813
1814 type MockEstabs = Arc<Mutex<DenseSlotMap<MockEstabId, MockEstabState>>>;
1815
1816 fn ms(ms: u64) -> Duration {
1817 Duration::from_millis(ms)
1818 }
1819
1820 #[derive(Debug)]
1821 struct Mocks {
1822 rng: TestingRng,
1823 estabs: MockEstabs,
1824 expect_expire_ipts_calls: Arc<Mutex<usize>>,
1825 }
1826
1827 #[derive(Debug)]
1828 struct MockEstabState {
1829 st_tx: watch::Sender<IptStatus>,
1830 params: IptParameters,
1831 }
1832
1833 #[derive(Debug)]
1834 struct MockEstab {
1835 esid: MockEstabId,
1836 estabs: MockEstabs,
1837 }
1838
1839 impl Mockable<MockRuntime> for Mocks {
1840 type IptEstablisher = MockEstab;
1841 type Rng<'m> = &'m mut TestingRng;
1842
1843 fn thread_rng(&mut self) -> Self::Rng<'_> {
1844 &mut self.rng
1845 }
1846
1847 fn make_new_ipt(
1848 &mut self,
1849 _imm: &Immutable<MockRuntime>,
1850 params: IptParameters,
1851 ) -> Result<(Self::IptEstablisher, watch::Receiver<IptStatus>), FatalError> {
1852 let (st_tx, st_rx) = watch::channel();
1853 let estab = MockEstabState { st_tx, params };
1854 let esid = self.estabs.lock().unwrap().insert(estab);
1855 let estab = MockEstab {
1856 esid,
1857 estabs: self.estabs.clone(),
1858 };
1859 Ok((estab, st_rx))
1860 }
1861
1862 fn start_accepting(&self, _establisher: &ErasedIptEstablisher) {}
1863
1864 fn expire_old_ipts_external_persistent_state_hook(&self) {
1865 let mut expect = self.expect_expire_ipts_calls.lock().unwrap();
1866 eprintln!("expire_old_ipts_external_persistent_state_hook, expect={expect}");
1867 *expect = expect.checked_sub(1).expect("unexpected expiry");
1868 }
1869 }
1870
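    // When the manager discards an IPT it drops the establisher handle;
    // the mock must then remove its entry from the shared table.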
1871 impl Drop for MockEstab {
1872 fn drop(&mut self) {
1873 let mut estabs = self.estabs.lock().unwrap();
1874 let _: MockEstabState = estabs
1875 .remove(self.esid)
1876 .expect("dropping non-recorded MockEstab");
1877 }
1878 }
1879
1880 struct MockedIptManager<'d> {
1881 estabs: MockEstabs,
1882 pub_view: ipt_set::IptsPublisherView,
1883 shut_tx: broadcast::Sender<Void>,
1884 #[allow(dead_code)]
1885 cfg_tx: watch::Sender<Arc<OnionServiceConfig>>,
1886 #[allow(dead_code)] // ensures temp dir lifetime; paths stored in self
1887 temp_dir: &'d TestTempDir,
1888        expect_expire_ipts_calls: Arc<Mutex<usize>>, // set to usize::MAX if the test doesn't care
1889 }
1890
1891 impl<'d> MockedIptManager<'d> {
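        /// Start an `IptManager` (and its background tasks) with mocked
        /// establishers, a test netdir, and persistent state under `temp_dir`.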
1892 fn startup(
1893 runtime: MockRuntime,
1894 temp_dir: &'d TestTempDir,
1895 seed: u64,
1896 expect_expire_ipts_calls: usize,
1897 ) -> Self {
1898 let dir: TestNetDirProvider = tor_netdir::testnet::construct_netdir()
1899 .unwrap_if_sufficient()
1900 .unwrap()
1901 .into();
1902
1903 let nick: HsNickname = "nick".to_string().try_into().unwrap();
1904
1905 let cfg = OnionServiceConfigBuilder::default()
1906 .nickname(nick.clone())
1907 .build()
1908 .unwrap();
1909
1910 let (cfg_tx, cfg_rx) = watch::channel_with(Arc::new(cfg));
1911
1912 let (rend_tx, _rend_rx) = mpsc::channel(10);
1913 let (shut_tx, shut_rx) = broadcast::channel::<Void>(0);
1914
1915 let estabs: MockEstabs = Default::default();
1916 let expect_expire_ipts_calls = Arc::new(Mutex::new(expect_expire_ipts_calls));
1917
1918 let mocks = Mocks {
1919 rng: TestingRng::seed_from_u64(seed),
1920 estabs: estabs.clone(),
1921 expect_expire_ipts_calls: expect_expire_ipts_calls.clone(),
1922 };
1923
1924 // Don't provide a subdir; the ipt_mgr is supposed to add any needed subdirs
1925 let state_dir = temp_dir
1926 // untracked is OK because our return value captures 'd
1927 .subdir_untracked("state_dir");
1928
1929 let (state_handle, iptpub_state_handle) =
1930 create_storage_handles_from_state_dir(&state_dir, &nick);
1931
1932 let (mgr_view, pub_view) =
1933 ipt_set::ipts_channel(&runtime, iptpub_state_handle).unwrap();
1934
1935 let keymgr = create_keymgr(temp_dir);
1936 let keymgr = keymgr.into_untracked(); // OK because our return value captures 'd
1937 let status_tx = StatusSender::new(OnionServiceStatus::new_shutdown()).into();
1938 let mgr = IptManager::new(
1939 runtime.clone(),
1940 Arc::new(dir),
1941 nick,
1942 cfg_rx,
1943 rend_tx,
1944 shut_rx,
1945 &state_handle,
1946 mocks,
1947 keymgr,
1948 status_tx,
1949 )
1950 .unwrap();
1951
1952 mgr.launch_background_tasks(mgr_view).unwrap();
1953
1954 MockedIptManager {
1955 estabs,
1956 pub_view,
1957 shut_tx,
1958 cfg_tx,
1959 temp_dir,
1960 expect_expire_ipts_calls,
1961 }
1962 }
1963
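        /// Signal shutdown by dropping the shutdown sender, then check that all
        /// of the manager's background tasks have terminated.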
1964 async fn shutdown_check_no_tasks(self, runtime: &MockRuntime) {
1965 drop(self.shut_tx);
1966 runtime.progress_until_stalled().await;
1967 assert_eq!(runtime.mock_task().n_tasks(), 1); // just us
1968 }
1969
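        /// Snapshot the current establishers, keyed by IPT local id, in a form
        /// we can compare across a manager restart.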
1970 fn estabs_inventory(&self) -> impl Eq + Debug + 'static + use<> {
1971 let estabs = self.estabs.lock().unwrap();
1972 estabs
1973 .values()
1974 .map(|MockEstabState { params: p, .. }| {
1975 (
1976 p.lid,
1977 (
1978 p.target.clone(),
1979 // We want to check the key values, but they're very hard to get at
1980 // in a way we can compare. Especially the private keys, for which
1981                        // we can't get a clone or copy of the private key material out of the Arc.
1982                        // They're keypairs, so we can use the debug representation, which shows the public half.
1983 // That will have to do.
1984 format!("{:?}", p.k_sid),
1985 format!("{:?}", p.k_ntor),
1986 ),
1987 )
1988 })
1989 .collect::<BTreeMap<_, _>>()
1990 }
1991 }
1992
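    /// Exercise the manager lifecycle end-to-end: establish IPTs, publish,
    /// restart from persistent state, and rotate to new IPT relays once the
    /// intro point lifetime has elapsed.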
1993 #[test]
1994 #[traced_test]
1995 fn test_mgr_lifecycle() {
1996 MockRuntime::test_with_various(|runtime| async move {
1997 let temp_dir = test_temp_dir!();
1998
1999 let m = MockedIptManager::startup(runtime.clone(), &temp_dir, 0, 1);
2000 runtime.progress_until_stalled().await;
2001
2002 assert_eq!(*m.expect_expire_ipts_calls.lock().unwrap(), 0);
2003
2004 // We expect it to try to establish 3 IPTs
2005 const EXPECT_N_IPTS: usize = 3;
2006 const EXPECT_MAX_IPTS: usize = EXPECT_N_IPTS + 2 /* num_extra */;
2007 assert_eq!(m.estabs.lock().unwrap().len(), EXPECT_N_IPTS);
2008 assert!(m.pub_view.borrow_for_publish().ipts.is_none());
2009
2010            // Advance time a bit; it still shouldn't publish anything
2011 runtime.advance_by(ms(500)).await;
2012 runtime.progress_until_stalled().await;
2013 assert!(m.pub_view.borrow_for_publish().ipts.is_none());
2014
2015 let good = GoodIptDetails {
2016 link_specifiers: vec![],
2017 ipt_kp_ntor: [0x55; 32].into(),
2018 };
2019
2020 // Imagine that one of our IPTs becomes good
2021 m.estabs
2022 .lock()
2023 .unwrap()
2024 .values_mut()
2025 .next()
2026 .unwrap()
2027 .st_tx
2028 .borrow_mut()
2029 .status = IptStatusStatus::Good(good.clone());
2030
2031 // TODO #1213 test that we haven't called start_accepting
2032
2033            // It won't publish until a further "fastest establishment time" has elapsed,
2034            // i.e. until another 500ms has passed (1000ms since startup)
2035 runtime.progress_until_stalled().await;
2036 assert!(m.pub_view.borrow_for_publish().ipts.is_none());
2037 runtime.advance_by(ms(499)).await;
2038 assert!(m.pub_view.borrow_for_publish().ipts.is_none());
2039 runtime.advance_by(ms(1)).await;
2040 match m.pub_view.borrow_for_publish().ipts.as_mut().unwrap() {
2041 pub_view => {
2042 assert_eq!(pub_view.ipts.len(), 1);
2043 assert_eq!(pub_view.lifetime, IPT_PUBLISH_UNCERTAIN);
2044 }
2045 };
2046
2047 // TODO #1213 test that we have called start_accepting on the right IPTs
2048
2049 // Set the other IPTs to be Good too
2050 for e in m.estabs.lock().unwrap().values_mut().skip(1) {
2051 e.st_tx.borrow_mut().status = IptStatusStatus::Good(good.clone());
2052 }
2053 runtime.progress_until_stalled().await;
2054 match m.pub_view.borrow_for_publish().ipts.as_mut().unwrap() {
2055 pub_view => {
2056 assert_eq!(pub_view.ipts.len(), EXPECT_N_IPTS);
2057 assert_eq!(pub_view.lifetime, IPT_PUBLISH_CERTAIN);
2058 }
2059 };
2060
2061 // TODO #1213 test that we have called start_accepting on the right IPTs
2062
2063 let estabs_inventory = m.estabs_inventory();
2064
2065 // Shut down
2066 m.shutdown_check_no_tasks(&runtime).await;
2067
2068 // ---------- restart! ----------
2069 info!("*** Restarting ***");
2070
2071 let m = MockedIptManager::startup(runtime.clone(), &temp_dir, 1, 1);
2072 runtime.progress_until_stalled().await;
2073 assert_eq!(*m.expect_expire_ipts_calls.lock().unwrap(), 0);
2074
2075 assert_eq!(estabs_inventory, m.estabs_inventory());
2076
2077 // TODO #1213 test that we have called start_accepting on all the old IPTs
2078
2079 // ---------- New IPT relay selection ----------
2080
2081 let old_lids: Vec<String> = m
2082 .estabs
2083 .lock()
2084 .unwrap()
2085 .values()
2086 .map(|ess| ess.params.lid.to_string())
2087 .collect();
2088 eprintln!("IPTs to rotate out: {old_lids:?}");
2089
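            // List every file under the test state directory whose path mentions
            // one of the old IPT local ids (for example, their replay logs).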
2090 let old_lid_files = || {
2091 WalkDir::new(temp_dir.as_path_untracked())
2092 .into_iter()
2093 .map(|ent| {
2094 ent.unwrap()
2095 .into_path()
2096 .into_os_string()
2097 .into_string()
2098 .unwrap()
2099 })
2100 .filter(|path| old_lids.iter().any(|lid| path.contains(lid)))
2101 .collect_vec()
2102 };
2103
2104 let no_files: [String; 0] = [];
2105
2106 assert_ne!(old_lid_files(), no_files);
2107
2108 // It might call the expiry function once, or once per IPT.
2109 // The latter is quadratic but this is quite rare, so that's fine.
2110 *m.expect_expire_ipts_calls.lock().unwrap() = EXPECT_MAX_IPTS;
2111
2112 // wait 2 days, > hs_intro_max_lifetime
2113 runtime.advance_by(ms(48 * 60 * 60 * 1_000)).await;
2114 runtime.progress_until_stalled().await;
2115
2116 // It must have called it at least once.
2117 assert_ne!(*m.expect_expire_ipts_calls.lock().unwrap(), EXPECT_MAX_IPTS);
2118
2119            // There should now be no files named after the old IptLocalIds.
2120 assert_eq!(old_lid_files(), no_files);
2121
2122 // Shut down
2123 m.shutdown_check_no_tasks(&runtime).await;
2124 });
2125 }
2126}