// tor_dirmgr/state.rs
1//! Implementation for the primary directory state machine.
2//!
3//! There are three (active) states that a download can be in: looking
4//! for a consensus ([`GetConsensusState`]), looking for certificates
5//! to validate that consensus ([`GetCertsState`]), and looking for
6//! microdescriptors ([`GetMicrodescsState`]).
7//!
8//! These states have no contact with the network, and are purely
9//! reactive to other code that drives them.  See the
10//! [`bootstrap`](crate::bootstrap) module for functions that actually
11//! load or download directory information.
12
13use std::collections::{HashMap, HashSet};
14use std::fmt::Debug;
15use std::mem;
16use std::sync::{Arc, Mutex};
17use std::time::{Duration, SystemTime};
18use time::OffsetDateTime;
19use tor_basic_utils::RngExt as _;
20use tor_dircommon::retry::DownloadSchedule;
21use tor_error::{internal, warn_report};
22use tor_netdir::{MdReceiver, NetDir, PartialNetDir};
23use tor_netdoc::doc::authcert::UncheckedAuthCert;
24use tor_netdoc::doc::netstatus::{Lifetime, ProtoStatuses};
25use tracing::{debug, warn};
26
27use crate::event::DirProgress;
28
29use crate::storage::DynStore;
30use crate::{
31    CacheUsage, ClientRequest, DirMgrConfig, DocId, DocumentText, Error, Readiness, Result,
32    docmeta::{AuthCertMeta, ConsensusMeta},
33    event,
34};
35use crate::{DocSource, SharedMutArc};
36use tor_checkable::{ExternallySigned, SelfSigned, Timebound};
37#[cfg(feature = "geoip")]
38use tor_geoip::GeoipDb;
39use tor_llcrypto::pk::rsa::RsaIdentity;
40use tor_netdoc::doc::{
41    microdesc::{MdDigest, Microdesc},
42    netstatus::MdConsensus,
43};
44use tor_netdoc::{
45    AllowAnnotations,
46    doc::{
47        authcert::{AuthCert, AuthCertKeyIds},
48        microdesc::MicrodescReader,
49        netstatus::{ConsensusFlavor, UnvalidatedMdConsensus},
50    },
51};
52use tor_rtcompat::Runtime;
53
/// A change to the currently running `NetDir`, returned by the state machines in this module.
#[derive(Debug)]
pub(crate) enum NetDirChange<'a> {
    /// If the provided `NetDir` is suitable for use (i.e. the caller determines it can build
    /// circuits with it), replace the current `NetDir` with it.
    ///
    /// The caller must call `DirState::on_netdir_replaced` if the replace was successful.
    AttemptReplace {
        /// The netdir to replace the current one with, if it's usable.
        ///
        /// The `Option` is always `Some` when returned from the state machine; it's there
        /// so that the caller can call `.take()` to avoid cloning the netdir.
        netdir: &'a mut Option<NetDir>,
        /// The consensus metadata for this netdir.
        consensus_meta: &'a ConsensusMeta,
    },
    /// Add the provided microdescriptors to the current `NetDir`.
    ///
    /// (The caller drains this `Vec`; it is borrowed mutably for that reason.)
    AddMicrodescs(&'a mut Vec<Microdesc>),
    /// Replace the recommended set of subprotocols.
    SetRequiredProtocol {
        /// The time at which the protocol statuses were recommended
        timestamp: SystemTime,
        /// The recommended set of protocols.
        protos: Arc<ProtoStatuses>,
    },
}
80
/// A "state" object used to represent our progress in downloading a
/// directory.
///
/// These state objects are not meant to know about the network, or
/// how to fetch documents at all.  Instead, they keep track of what
/// information they are missing, and what to do when they get that
/// information.
///
/// Every state object has two possible transitions: "resetting", and
/// "advancing".  Advancing happens when a state has no more work to
/// do, and needs to transform into a different kind of object.
/// Resetting happens when this state needs to go back to an initial
/// state in order to start over -- either because of an error or
/// because the information it has downloaded is no longer timely.
pub(crate) trait DirState: Send {
    /// Return a human-readable description of this state.
    fn describe(&self) -> String;
    /// Return a list of the documents we're missing.
    ///
    /// If every document on this list were to be loaded or downloaded, then
    /// the state should either become "ready to advance", or "complete."
    ///
    /// This list should never _grow_ on a given state; only advancing
    /// or resetting the state should add new DocIds that weren't
    /// there before.
    fn missing_docs(&self) -> Vec<DocId>;
    /// Describe whether this state has reached `ready` status.
    fn is_ready(&self, ready: Readiness) -> bool;
    /// If the state object wants to make changes to the currently running `NetDir`,
    /// return the proposed changes.
    ///
    /// The default implementation proposes no changes.
    fn get_netdir_change(&mut self) -> Option<NetDirChange<'_>> {
        None
    }
    /// Return true if this state can advance to another state via its
    /// `advance` method.
    fn can_advance(&self) -> bool;
    /// Add one or more documents from our cache; returns 'true' if there
    /// was any change in this state.
    ///
    /// Set `changed` to true if any semantic changes in this state were made.
    ///
    /// An error return does not necessarily mean that no data was added;
    /// partial successes are possible.
    fn add_from_cache(
        &mut self,
        docs: HashMap<DocId, DocumentText>,
        changed: &mut bool,
    ) -> Result<()>;

    /// Add information that we have just downloaded to this state.
    ///
    /// This method receives a copy of the original request, and should reject
    /// any documents that do not pertain to it.
    ///
    /// If `storage` is provided, then we should write any accepted documents
    /// into `storage` so they can be saved in a cache.
    ///
    /// Set `changed` to true if any semantic changes in this state were made.
    ///
    /// An error return does not necessarily mean that no data was added;
    /// partial successes are possible.
    fn add_from_download(
        &mut self,
        text: &str,
        request: &ClientRequest,
        source: DocSource,
        storage: Option<&Mutex<DynStore>>,
        changed: &mut bool,
    ) -> Result<()>;
    /// Return a summary of this state as a [`DirProgress`].
    fn bootstrap_progress(&self) -> event::DirProgress;
    /// Return a configuration for attempting downloads.
    fn dl_config(&self) -> DownloadSchedule;
    /// If possible, advance to the next state.
    ///
    /// (Takes `Box<Self>` so an unready state can return itself unchanged.)
    fn advance(self: Box<Self>) -> Box<dyn DirState>;
    /// Return a time (if any) when downloaders should stop attempting to
    /// advance this state, and should instead reset it and start over.
    fn reset_time(&self) -> Option<SystemTime>;
    /// Reset this state and start over.
    fn reset(self: Box<Self>) -> Box<dyn DirState>;
}
162
/// An object that can provide a previous netdir for the bootstrapping state machines to use.
///
/// This is a trait (rather than a concrete type) so the state machines are
/// decoupled from whatever structure actually holds the current directory.
pub(crate) trait PreviousNetDir: Send + Sync + 'static + Debug {
    /// Get the previous netdir, if there still is one.
    fn get_netdir(&self) -> Option<Arc<NetDir>>;
}
168
impl PreviousNetDir for SharedMutArc<NetDir> {
    fn get_netdir(&self) -> Option<Arc<NetDir>> {
        // Returns `None` if no directory has been installed yet.
        self.get()
    }
}
174
/// Initial state: fetching or loading a consensus directory.
#[derive(Clone, Debug)]
pub(crate) struct GetConsensusState<R: Runtime> {
    /// How should we get the consensus from the cache, if at all?
    cache_usage: CacheUsage,

    /// If present, a time after which we want our consensus to have
    /// been published.
    ///
    /// (Derived from the `valid_after` time of any previous netdir we hold.)
    //
    // TODO: This is not yet used everywhere it could be.  In the future maybe
    // it should be inserted into the DocId::LatestConsensus  alternative rather
    // than being recalculated in make_consensus_request,
    after: Option<SystemTime>,

    /// If present, our next state.
    ///
    /// (This is present once we have a consensus.)
    next: Option<GetCertsState<R>>,

    /// A list of RsaIdentity for the authorities that we believe in.
    ///
    /// No consensus can be valid unless it purports to be signed by
    /// more than half of these authorities.
    authority_ids: Vec<RsaIdentity>,

    /// A `Runtime` implementation.
    rt: R,
    /// The configuration of the directory manager. Used for download configuration
    /// purposes.
    config: Arc<DirMgrConfig>,
    /// If one exists, the netdir we're trying to update.
    prev_netdir: Option<Arc<dyn PreviousNetDir>>,

    /// A filter that gets applied to directory objects before we use them.
    #[cfg(feature = "dirfilter")]
    filter: Arc<dyn crate::filter::DirFilter>,
}
212
213impl<R: Runtime> GetConsensusState<R> {
214    /// Create a new `GetConsensusState`, using the cache as per `cache_usage` and downloading as
215    /// per the relevant sections of `config`. If `prev_netdir` is supplied, information from that
216    /// directory may be used to complete the next one.
217    pub(crate) fn new(
218        rt: R,
219        config: Arc<DirMgrConfig>,
220        cache_usage: CacheUsage,
221        prev_netdir: Option<Arc<dyn PreviousNetDir>>,
222        #[cfg(feature = "dirfilter")] filter: Arc<dyn crate::filter::DirFilter>,
223    ) -> Self {
224        let authority_ids = config.authorities().v3idents().clone();
225        let after = prev_netdir
226            .as_ref()
227            .and_then(|x| x.get_netdir())
228            .map(|nd| nd.lifetime().valid_after());
229
230        GetConsensusState {
231            cache_usage,
232            after,
233            next: None,
234            authority_ids,
235            rt,
236            config,
237            prev_netdir,
238            #[cfg(feature = "dirfilter")]
239            filter,
240        }
241    }
242}
243
244impl<R: Runtime> DirState for GetConsensusState<R> {
245    fn describe(&self) -> String {
246        if self.next.is_some() {
247            "About to fetch certificates."
248        } else {
249            match self.cache_usage {
250                CacheUsage::CacheOnly => "Looking for a cached consensus.",
251                CacheUsage::CacheOkay => "Looking for a consensus.",
252                CacheUsage::MustDownload => "Downloading a consensus.",
253            }
254        }
255        .to_string()
256    }
257    fn missing_docs(&self) -> Vec<DocId> {
258        if self.can_advance() {
259            return Vec::new();
260        }
261        let flavor = ConsensusFlavor::Microdesc;
262        vec![DocId::LatestConsensus {
263            flavor,
264            cache_usage: self.cache_usage,
265        }]
266    }
267    fn is_ready(&self, _ready: Readiness) -> bool {
268        false
269    }
270    fn can_advance(&self) -> bool {
271        self.next.is_some()
272    }
273    fn bootstrap_progress(&self) -> DirProgress {
274        if let Some(next) = &self.next {
275            next.bootstrap_progress()
276        } else {
277            DirProgress::NoConsensus { after: self.after }
278        }
279    }
280    fn dl_config(&self) -> DownloadSchedule {
281        self.config.schedule.retry_consensus()
282    }
283    fn add_from_cache(
284        &mut self,
285        docs: HashMap<DocId, DocumentText>,
286        changed: &mut bool,
287    ) -> Result<()> {
288        let text = match docs.into_iter().next() {
289            None => return Ok(()),
290            Some((
291                DocId::LatestConsensus {
292                    flavor: ConsensusFlavor::Microdesc,
293                    ..
294                },
295                text,
296            )) => text,
297            _ => return Err(Error::CacheCorruption("Not an md consensus")),
298        };
299
300        let source = DocSource::LocalCache;
301
302        self.add_consensus_text(
303            source,
304            text.as_str().map_err(Error::BadUtf8InCache)?,
305            None,
306            changed,
307        )?;
308        Ok(())
309    }
310    fn add_from_download(
311        &mut self,
312        text: &str,
313        request: &ClientRequest,
314        source: DocSource,
315        storage: Option<&Mutex<DynStore>>,
316        changed: &mut bool,
317    ) -> Result<()> {
318        let requested_newer_than = match request {
319            ClientRequest::Consensus(r) => r.last_consensus_date(),
320            _ => None,
321        };
322        let meta = self.add_consensus_text(source, text, requested_newer_than, changed)?;
323
324        if let Some(store) = storage {
325            let mut w = store.lock().expect("Directory storage lock poisoned");
326            w.store_consensus(meta, ConsensusFlavor::Microdesc, true, text)?;
327        }
328        Ok(())
329    }
330    fn advance(self: Box<Self>) -> Box<dyn DirState> {
331        match self.next {
332            Some(next) => Box::new(next),
333            None => self,
334        }
335    }
336    fn reset_time(&self) -> Option<SystemTime> {
337        None
338    }
339    fn reset(self: Box<Self>) -> Box<dyn DirState> {
340        self
341    }
342}
343
impl<R: Runtime> GetConsensusState<R> {
    /// Helper: try to set the current consensus text from an input string
    /// `text`.  Refuse it if the authorities could never be correct, or if it
    /// is ill-formed.
    ///
    /// If `cutoff` is provided, treat any consensus older than `cutoff` as
    /// older-than-requested.
    ///
    /// On success, stores the next state in `self.next` and returns the
    /// metadata of the accepted consensus.
    ///
    /// Errors from this method are not fatal to the download process.
    fn add_consensus_text(
        &mut self,
        source: DocSource,
        text: &str,
        cutoff: Option<SystemTime>,
        changed: &mut bool,
    ) -> Result<&ConsensusMeta> {
        // Try to parse it and get its metadata.
        let (consensus_meta, unvalidated) = {
            let (signedval, remainder, parsed) =
                MdConsensus::parse(text).map_err(|e| Error::from_netdoc(source.clone(), e))?;
            #[cfg(feature = "dirfilter")]
            let parsed = self.filter.filter_consensus(parsed)?;
            // Widen the validity interval per our configured tolerances before
            // checking timeliness against the current wallclock.
            let parsed = self.config.tolerance.extend_tolerance(parsed);
            let now = self.rt.wallclock();
            let timely = parsed.check_valid_at(&now)?;
            if let Some(cutoff) = cutoff {
                if timely.peek_lifetime().valid_after() < cutoff {
                    return Err(Error::Unwanted("consensus was older than requested"));
                }
            }
            let meta = ConsensusMeta::from_unvalidated(signedval, remainder, &timely);
            (meta, timely)
        };

        // Check out what authorities we believe in, and see if enough
        // of them are purported to have signed this consensus.
        let unvalidated = unvalidated.set_n_authorities(self.authority_ids.len());

        let id_refs: Vec<_> = self.authority_ids.iter().collect();
        if !unvalidated.authorities_are_correct(&id_refs[..]) {
            return Err(Error::UnrecognizedAuthorities);
        }
        // Yes, we've added the consensus.  That's a change.
        *changed = true;

        // Make a set of all the certificates we want -- the subset of
        // those listed on the consensus that we would indeed accept as
        // authoritative.
        let desired_certs = unvalidated
            .signing_cert_ids()
            .filter(|m| self.recognizes_authority(&m.id_fingerprint))
            .collect();

        self.next = Some(GetCertsState {
            cache_usage: self.cache_usage,
            consensus_source: source,
            consensus: GetCertsConsensus::Unvalidated(unvalidated),
            consensus_meta,
            missing_certs: desired_certs,
            certs: Vec::new(),
            rt: self.rt.clone(),
            config: self.config.clone(),
            prev_netdir: self.prev_netdir.take(),
            protocol_statuses: None,
            #[cfg(feature = "dirfilter")]
            filter: self.filter.clone(),
        });

        // Unwrap should be safe because `next` was just assigned
        #[allow(clippy::unwrap_used)]
        Ok(&self.next.as_ref().unwrap().consensus_meta)
    }

    /// Return true if `id` is an authority identity we recognize
    fn recognizes_authority(&self, id: &RsaIdentity) -> bool {
        self.authority_ids.iter().any(|auth| auth == id)
    }
}
422
/// One of two possible internal states for the consensus in a GetCertsState.
///
/// This inner object is advanced by `try_checking_sigs`.
#[derive(Clone, Debug)]
enum GetCertsConsensus {
    /// We have an unvalidated consensus; we haven't checked its signatures.
    Unvalidated(UnvalidatedMdConsensus),
    /// A validated consensus: the signatures are fine and we can advance.
    Validated(MdConsensus),
    /// We failed to validate the consensus, even after getting enough certificates.
    ///
    /// (Also used transiently as a placeholder while checking signatures.)
    Failed,
}
435
/// Second state: fetching or loading authority certificates.
///
/// TODO: we should probably do what C tor does, and try to use the
/// same directory that gave us the consensus.
///
/// TODO SECURITY: This needs better handling for the DOS attack where
/// we are given a bad consensus signed with fictional certificates
/// that we can never find.
#[derive(Clone, Debug)]
struct GetCertsState<R: Runtime> {
    /// The cache usage we had in mind when we began.  Used to reset.
    cache_usage: CacheUsage,
    /// Where did we get our consensus?
    consensus_source: DocSource,
    /// The consensus that we are trying to validate, or an error if we've given
    /// up on validating it.
    consensus: GetCertsConsensus,
    /// Metadata for the consensus.
    consensus_meta: ConsensusMeta,
    /// A set of the certificate keypairs for the certificates we don't
    /// have yet.
    missing_certs: HashSet<AuthCertKeyIds>,
    /// A list of the certificates we've been able to load or download.
    certs: Vec<AuthCert>,

    /// A `Runtime` implementation.
    rt: R,
    /// The configuration of the directory manager. Used for download configuration
    /// purposes.
    config: Arc<DirMgrConfig>,
    /// If one exists, the netdir we're trying to update.
    prev_netdir: Option<Arc<dyn PreviousNetDir>>,

    /// If present a set of protocols to install as our latest recommended set.
    ///
    /// (Set once, from the first validated consensus we see in this state.)
    protocol_statuses: Option<(SystemTime, Arc<ProtoStatuses>)>,

    /// A filter that gets applied to directory objects before we use them.
    #[cfg(feature = "dirfilter")]
    filter: Arc<dyn crate::filter::DirFilter>,
}
476
477impl<R: Runtime> GetCertsState<R> {
478    /// Handle a certificate result returned by `tor_netdoc`: checking it for timeliness
479    /// and well-signedness.
480    ///
481    /// On success return the `AuthCert` and the string that represents it within the string `within`.
482    /// On failure, return an error.
483    fn check_parsed_certificate<'s>(
484        &self,
485        parsed: tor_netdoc::Result<UncheckedAuthCert>,
486        source: &DocSource,
487        within: &'s str,
488    ) -> Result<(AuthCert, &'s str)> {
489        let parsed = parsed.map_err(|e| Error::from_netdoc(source.clone(), e))?;
490        let cert_text = parsed
491            .within(within)
492            .expect("Certificate was not in input as expected");
493        let wellsigned = parsed.check_signature()?;
494        let now = self.rt.wallclock();
495        let timely_cert = self
496            .config
497            .tolerance
498            .extend_tolerance(wellsigned)
499            .check_valid_at(&now)?;
500        Ok((timely_cert, cert_text))
501    }
502
503    /// If we have enough certificates, and we have not yet checked the
504    /// signatures on the consensus, try checking them.
505    ///
506    /// If the consensus is valid, remove the unvalidated consensus from `self`
507    /// and put the validated consensus there instead.
508    ///
509    /// If the consensus is invalid, throw it out set a blocking error.
510    fn try_checking_sigs(&mut self) -> Result<()> {
511        use GetCertsConsensus as C;
512        // Temporary value; we'll replace the consensus field with something
513        // better before the method returns.
514        let mut consensus = C::Failed;
515        std::mem::swap(&mut consensus, &mut self.consensus);
516
517        let unvalidated = match consensus {
518            C::Unvalidated(uv) if uv.key_is_correct(&self.certs[..]).is_ok() => uv,
519            _ => {
520                // nothing to check at this point.  Either we already checked the consensus, or we don't yet have enough certificates.
521                self.consensus = consensus;
522                return Ok(());
523            }
524        };
525
526        let (new_consensus, outcome) = match unvalidated.check_signature(&self.certs[..]) {
527            Ok(validated) => (C::Validated(validated), Ok(())),
528            Err(cause) => (
529                C::Failed,
530                Err(Error::ConsensusInvalid {
531                    source: self.consensus_source.clone(),
532                    cause,
533                }),
534            ),
535        };
536        self.consensus = new_consensus;
537
538        // Update our protocol recommendations if we have a validated consensus,
539        // and if we haven't already updated our recommendations.
540        if let GetCertsConsensus::Validated(v) = &self.consensus {
541            if self.protocol_statuses.is_none() {
542                let protoset: &Arc<ProtoStatuses> = v.protocol_statuses();
543                self.protocol_statuses = Some((
544                    self.consensus_meta.lifetime().valid_after(),
545                    Arc::clone(protoset),
546                ));
547            }
548        }
549
550        outcome
551    }
552}
553
impl<R: Runtime> DirState for GetCertsState<R> {
    fn describe(&self) -> String {
        use GetCertsConsensus as C;
        match &self.consensus {
            C::Unvalidated(_) => {
                let total = self.certs.len() + self.missing_certs.len();
                format!(
                    "Downloading certificates for consensus (we are missing {}/{}).",
                    self.missing_certs.len(),
                    total
                )
            }
            C::Validated(_) => "Validated consensus; about to get microdescriptors".to_string(),
            C::Failed => "Failed to validate consensus".to_string(),
        }
    }
    fn missing_docs(&self) -> Vec<DocId> {
        self.missing_certs
            .iter()
            .map(|id| DocId::AuthCert(*id))
            .collect()
    }
    fn is_ready(&self, _ready: Readiness) -> bool {
        // A directory can never be usable until we reach the microdesc state.
        false
    }
    fn can_advance(&self) -> bool {
        matches!(self.consensus, GetCertsConsensus::Validated(_))
    }
    fn bootstrap_progress(&self) -> DirProgress {
        let n_certs = self.certs.len();
        let n_missing_certs = self.missing_certs.len();
        let total_certs = n_missing_certs + n_certs;
        DirProgress::FetchingCerts {
            lifetime: self.consensus_meta.lifetime().clone(),
            usable_lifetime: self
                .config
                .tolerance
                .extend_lifetime(self.consensus_meta.lifetime()),

            n_certs: (n_certs as u16, total_certs as u16),
        }
    }
    fn dl_config(&self) -> DownloadSchedule {
        self.config.schedule.retry_certs()
    }
    fn add_from_cache(
        &mut self,
        docs: HashMap<DocId, DocumentText>,
        changed: &mut bool,
    ) -> Result<()> {
        // Here we iterate over the documents we want, taking them from
        // our input and remembering them.
        let source = DocSource::LocalCache;
        // We remember only the *first* nonfatal error and keep going, so a
        // single bad certificate can't block the rest.
        let mut nonfatal_error = None;
        for id in &self.missing_docs() {
            if let Some(cert) = docs.get(id) {
                let text = cert.as_str().map_err(Error::BadUtf8InCache)?;
                let parsed = AuthCert::parse(text);
                match self.check_parsed_certificate(parsed, &source, text) {
                    Ok((cert, _text)) => {
                        self.missing_certs.remove(&cert.key_ids());
                        self.certs.push(cert);
                        *changed = true;
                    }
                    Err(e) => {
                        nonfatal_error.get_or_insert(e);
                    }
                }
            }
        }
        // If anything was added, we might now have enough certs to validate
        // the consensus signatures.
        if *changed {
            self.try_checking_sigs()?;
        }
        opt_err_to_result(nonfatal_error)
    }
    fn add_from_download(
        &mut self,
        text: &str,
        request: &ClientRequest,
        source: DocSource,
        storage: Option<&Mutex<DynStore>>,
        changed: &mut bool,
    ) -> Result<()> {
        let asked_for: HashSet<_> = match request {
            ClientRequest::AuthCert(a) => a.keys().collect(),
            _ => return Err(internal!("expected an AuthCert request").into()),
        };

        // As in add_from_cache: remember the first nonfatal error, keep going.
        let mut nonfatal_error = None;
        let mut newcerts = Vec::new();
        for cert in
            AuthCert::parse_multiple(text).map_err(|e| Error::from_netdoc(source.clone(), e))?
        {
            match self.check_parsed_certificate(cert, &source, text) {
                Ok((cert, cert_text)) => {
                    newcerts.push((cert, cert_text));
                }
                Err(e) => {
                    warn_report!(e, "Problem with certificate received from {}", &source);
                    nonfatal_error.get_or_insert(e);
                }
            }
        }

        // Now discard any certs we didn't ask for.
        let len_orig = newcerts.len();
        newcerts.retain(|(cert, _)| asked_for.contains(&cert.key_ids()));
        if newcerts.len() != len_orig {
            warn!(
                "Discarding certificates from {} that we didn't ask for.",
                source
            );
            nonfatal_error.get_or_insert(Error::Unwanted("Certificate we didn't request"));
        }

        // We want to exit early if we aren't saving any certificates.
        if newcerts.is_empty() {
            return opt_err_to_result(nonfatal_error);
        }

        if let Some(store) = storage {
            // Write the certificates to the store.
            let v: Vec<_> = newcerts[..]
                .iter()
                .map(|(cert, s)| (AuthCertMeta::from_authcert(cert), *s))
                .collect();
            let mut w = store.lock().expect("Directory storage lock poisoned");
            w.store_authcerts(&v[..])?;
        }

        // Remember the certificates in this state, and remove them
        // from our list of missing certs.
        for (cert, _) in newcerts {
            let ids = cert.key_ids();
            if self.missing_certs.contains(&ids) {
                self.missing_certs.remove(&ids);
                self.certs.push(cert);
                *changed = true;
            }
        }

        // If anything was added, we might now have enough certs to validate
        // the consensus signatures.
        if *changed {
            self.try_checking_sigs()?;
        }
        opt_err_to_result(nonfatal_error)
    }

    fn advance(self: Box<Self>) -> Box<dyn DirState> {
        use GetCertsConsensus::*;
        match self.consensus {
            Validated(validated) => Box::new(GetMicrodescsState::new(
                self.cache_usage,
                validated,
                self.consensus_meta,
                self.rt,
                self.config,
                self.prev_netdir,
                #[cfg(feature = "dirfilter")]
                self.filter,
            )),
            // Not validated yet (or validation failed): stay in this state.
            _ => self,
        }
    }

    fn get_netdir_change(&mut self) -> Option<NetDirChange<'_>> {
        // Propose the recommended-protocols update, if we have one.
        self.protocol_statuses.as_ref().map(|(timestamp, protos)| {
            NetDirChange::SetRequiredProtocol {
                timestamp: *timestamp,
                protos: Arc::clone(protos),
            }
        })
    }

    fn reset_time(&self) -> Option<SystemTime> {
        // Give up on this consensus once it is no longer valid (with tolerance).
        Some(
            self.consensus_meta.lifetime().valid_until()
                + self.config.tolerance.post_valid_tolerance(),
        )
    }
    fn reset(self: Box<Self>) -> Box<dyn DirState> {
        let cache_usage = if self.cache_usage == CacheUsage::CacheOnly {
            // Cache only means we can't ever download.
            CacheUsage::CacheOnly
        } else {
            // If we reset in this state, we should always go to "must
            // download": Either we've failed to get the certs we needed, or we
            // have found that the consensus wasn't valid.  Either case calls
            // for a fresh consensus download attempt.
            CacheUsage::MustDownload
        };

        Box::new(GetConsensusState::new(
            self.rt,
            self.config,
            cache_usage,
            self.prev_netdir,
            #[cfg(feature = "dirfilter")]
            self.filter,
        ))
    }
}
755
/// Final state: we're fetching or loading microdescriptors
#[derive(Debug, Clone)]
struct GetMicrodescsState<R: Runtime> {
    /// How should we get the consensus from the cache, if at all?
    cache_usage: CacheUsage,
    /// Total number of microdescriptors listed in the consensus.
    n_microdescs: usize,
    /// The current status of our netdir.
    partial: PendingNetDir,
    /// Metadata for the current consensus.
    meta: ConsensusMeta,
    /// A pending list of microdescriptor digests whose
    /// "last-listed-at" times we should update.
    newly_listed: Vec<MdDigest>,
    /// A time after which we should try to replace this directory and
    /// find a new one.  Since this is randomized, we only compute it
    /// once.
    reset_time: SystemTime,

    /// A `Runtime` implementation.
    rt: R,
    /// The configuration of the directory manager. Used for download configuration
    /// purposes.
    config: Arc<DirMgrConfig>,
    /// If one exists, the netdir we're trying to update.
    prev_netdir: Option<Arc<dyn PreviousNetDir>>,

    /// A filter that gets applied to directory objects before we use them.
    #[cfg(feature = "dirfilter")]
    filter: Arc<dyn crate::filter::DirFilter>,
}
787
/// Information about a network directory that might not be ready to become _the_ current network
/// directory.
#[derive(Debug, Clone)]
enum PendingNetDir {
    /// A NetDir for which we have a consensus, but not enough microdescriptors.
    Partial(PartialNetDir),
    /// A NetDir we're either trying to get our caller to replace, or that the caller
    /// has already taken from us.
    ///
    /// After the netdir gets taken, the `collected_microdescs` and `missing_microdescs`
    /// fields get used. Before then, we just do operations on the netdir.
    Yielding {
        /// The actual netdir. This starts out as `Some`, but our caller can `take()` it
        /// from us.
        netdir: Option<NetDir>,
        /// Microdescs we have collected in order to yield to our caller.
        collected_microdescs: Vec<Microdesc>,
        /// Which microdescs we need for the netdir that either is or used to be in `netdir`.
        ///
        /// NOTE(eta): This MUST always match the netdir's own idea of which microdescs we need.
        ///            We do this by copying the netdir's missing microdescs into here when we
        ///            instantiate it.
        ///            (This code assumes that it doesn't add more needed microdescriptors later!)
        missing_microdescs: HashSet<MdDigest>,
        /// The time at which we should renew this netdir, assuming we have
        /// driven it to a "usable" state.
        replace_dir_time: SystemTime,
    },
    /// A dummy value, so we can use `mem::replace`.
    ///
    /// (Never observed outside a transient swap; matching on it is a bug.)
    Dummy,
}
819
820impl MdReceiver for PendingNetDir {
821    fn missing_microdescs(&self) -> Box<dyn Iterator<Item = &MdDigest> + '_> {
822        match self {
823            PendingNetDir::Partial(partial) => partial.missing_microdescs(),
824            PendingNetDir::Yielding {
825                netdir,
826                missing_microdescs,
827                ..
828            } => {
829                if let Some(nd) = netdir.as_ref() {
830                    nd.missing_microdescs()
831                } else {
832                    Box::new(missing_microdescs.iter())
833                }
834            }
835            PendingNetDir::Dummy => unreachable!(),
836        }
837    }
838
839    fn add_microdesc(&mut self, md: Microdesc) -> bool {
840        match self {
841            PendingNetDir::Partial(partial) => partial.add_microdesc(md),
842            PendingNetDir::Yielding {
843                netdir,
844                missing_microdescs,
845                collected_microdescs,
846                ..
847            } => {
848                let wanted = missing_microdescs.remove(md.digest());
849                if let Some(nd) = netdir.as_mut() {
850                    let nd_wanted = nd.add_microdesc(md);
851                    // This shouldn't ever happen; if it does, our invariants are violated.
852                    debug_assert_eq!(wanted, nd_wanted);
853                    nd_wanted
854                } else {
855                    collected_microdescs.push(md);
856                    wanted
857                }
858            }
859            PendingNetDir::Dummy => unreachable!(),
860        }
861    }
862
863    fn n_missing(&self) -> usize {
864        match self {
865            PendingNetDir::Partial(partial) => partial.n_missing(),
866            PendingNetDir::Yielding {
867                netdir,
868                missing_microdescs,
869                ..
870            } => {
871                if let Some(nd) = netdir.as_ref() {
872                    // This shouldn't ever happen; if it does, our invariants are violated.
873                    debug_assert_eq!(nd.n_missing(), missing_microdescs.len());
874                    nd.n_missing()
875                } else {
876                    missing_microdescs.len()
877                }
878            }
879            PendingNetDir::Dummy => unreachable!(),
880        }
881    }
882}
883
impl PendingNetDir {
    /// If this `PendingNetDir` is [`PendingNetDir::Partial`] and now has
    /// enough information to be usable, upgrade it to
    /// [`PendingNetDir::Yielding`].  Otherwise, leave it unchanged.
    ///
    /// On exit, `self` is never `Dummy`.
    fn upgrade_if_necessary(&mut self) {
        if matches!(self, PendingNetDir::Partial(..)) {
            // Temporarily swap in `Dummy` so we can consume the partial netdir
            // by value.
            match mem::replace(self, PendingNetDir::Dummy) {
                PendingNetDir::Partial(p) => match p.unwrap_if_sufficient() {
                    Ok(nd) => {
                        // Mirror the netdir's missing-microdesc set; see the
                        // invariant note on `PendingNetDir::Yielding`.
                        let missing: HashSet<_> = nd.missing_microdescs().copied().collect();
                        let replace_dir_time = pick_download_time(nd.lifetime());
                        debug!(
                            "Consensus now usable, with {} microdescriptors missing. \
                                The current consensus is fresh until {}, and valid until {}. \
                                I've picked {} as the earliest time to replace it.",
                            missing.len(),
                            OffsetDateTime::from(nd.lifetime().fresh_until()),
                            OffsetDateTime::from(nd.lifetime().valid_until()),
                            OffsetDateTime::from(replace_dir_time)
                        );
                        *self = PendingNetDir::Yielding {
                            netdir: Some(nd),
                            collected_microdescs: vec![],
                            missing_microdescs: missing,
                            replace_dir_time,
                        };
                    }
                    Err(p) => {
                        // Not enough microdescriptors yet: put the partial
                        // netdir back unchanged.
                        *self = PendingNetDir::Partial(p);
                    }
                },
                _ => unreachable!(),
            }
        }
        assert!(!matches!(self, PendingNetDir::Dummy));
    }
}
919
920impl<R: Runtime> GetMicrodescsState<R> {
921    /// Create a new [`GetMicrodescsState`] from a provided
922    /// microdescriptor consensus.
923    fn new(
924        cache_usage: CacheUsage,
925        consensus: MdConsensus,
926        meta: ConsensusMeta,
927        rt: R,
928        config: Arc<DirMgrConfig>,
929        prev_netdir: Option<Arc<dyn PreviousNetDir>>,
930        #[cfg(feature = "dirfilter")] filter: Arc<dyn crate::filter::DirFilter>,
931    ) -> Self {
932        let reset_time =
933            consensus.lifetime().valid_until() + config.tolerance.post_valid_tolerance();
934        let n_microdescs = consensus.relays().len();
935
936        let params = &config.override_net_params;
937        #[cfg(not(feature = "geoip"))]
938        let mut partial_dir = PartialNetDir::new(consensus, Some(params));
939        // TODO(eta): Make this embedded database configurable using the `DirMgrConfig`.
940        #[cfg(feature = "geoip")]
941        let mut partial_dir =
942            PartialNetDir::new_with_geoip(consensus, Some(params), &GeoipDb::new_embedded());
943
944        if let Some(old_dir) = prev_netdir.as_ref().and_then(|x| x.get_netdir()) {
945            partial_dir.fill_from_previous_netdir(old_dir);
946        }
947
948        // Always upgrade at least once: otherwise, we won't notice we're ready unless we
949        // add a microdescriptor.
950        let mut partial = PendingNetDir::Partial(partial_dir);
951        partial.upgrade_if_necessary();
952
953        GetMicrodescsState {
954            cache_usage,
955            n_microdescs,
956            partial,
957            meta,
958            newly_listed: Vec::new(),
959            reset_time,
960            rt,
961            config,
962            prev_netdir,
963
964            #[cfg(feature = "dirfilter")]
965            filter,
966        }
967    }
968
969    /// Add a bunch of microdescriptors to the in-progress netdir.
970    fn register_microdescs<I>(&mut self, mds: I, _source: &DocSource, changed: &mut bool)
971    where
972        I: IntoIterator<Item = Microdesc>,
973    {
974        #[cfg(feature = "dirfilter")]
975        let mds: Vec<Microdesc> = mds
976            .into_iter()
977            .filter_map(|m| self.filter.filter_md(m).ok())
978            .collect();
979        let is_partial = matches!(self.partial, PendingNetDir::Partial(..));
980        for md in mds {
981            if is_partial {
982                self.newly_listed.push(*md.digest());
983            }
984            self.partial.add_microdesc(md);
985            *changed = true;
986        }
987        self.partial.upgrade_if_necessary();
988    }
989}
990
991impl<R: Runtime> DirState for GetMicrodescsState<R> {
992    fn describe(&self) -> String {
993        format!(
994            "Downloading microdescriptors (we are missing {}).",
995            self.partial.n_missing()
996        )
997    }
998    fn missing_docs(&self) -> Vec<DocId> {
999        self.partial
1000            .missing_microdescs()
1001            .map(|d| DocId::Microdesc(*d))
1002            .collect()
1003    }
1004    fn get_netdir_change(&mut self) -> Option<NetDirChange<'_>> {
1005        match self.partial {
1006            PendingNetDir::Yielding {
1007                ref mut netdir,
1008                ref mut collected_microdescs,
1009                ..
1010            } => {
1011                if netdir.is_some() {
1012                    Some(NetDirChange::AttemptReplace {
1013                        netdir,
1014                        consensus_meta: &self.meta,
1015                    })
1016                } else {
1017                    collected_microdescs
1018                        .is_empty()
1019                        .then_some(NetDirChange::AddMicrodescs(collected_microdescs))
1020                }
1021            }
1022            _ => None,
1023        }
1024    }
1025    fn is_ready(&self, ready: Readiness) -> bool {
1026        match ready {
1027            Readiness::Complete => self.partial.n_missing() == 0,
1028            Readiness::Usable => {
1029                // We're "usable" if the calling code thought our netdir was usable enough to
1030                // steal it.
1031                matches!(self.partial, PendingNetDir::Yielding { ref netdir, .. } if netdir.is_none())
1032            }
1033        }
1034    }
1035    fn can_advance(&self) -> bool {
1036        false
1037    }
1038    fn bootstrap_progress(&self) -> DirProgress {
1039        let n_present = self.n_microdescs - self.partial.n_missing();
1040        DirProgress::Validated {
1041            lifetime: self.meta.lifetime().clone(),
1042            usable_lifetime: self.config.tolerance.extend_lifetime(self.meta.lifetime()),
1043            n_mds: (n_present as u32, self.n_microdescs as u32),
1044            usable: self.is_ready(Readiness::Usable),
1045        }
1046    }
1047    fn dl_config(&self) -> DownloadSchedule {
1048        self.config.schedule.retry_microdescs()
1049    }
1050    fn add_from_cache(
1051        &mut self,
1052        docs: HashMap<DocId, DocumentText>,
1053        changed: &mut bool,
1054    ) -> Result<()> {
1055        let mut microdescs = Vec::new();
1056        for (id, text) in docs {
1057            if let DocId::Microdesc(digest) = id {
1058                if let Ok(md) = Microdesc::parse(text.as_str().map_err(Error::BadUtf8InCache)?) {
1059                    if md.digest() == &digest {
1060                        microdescs.push(md);
1061                        continue;
1062                    }
1063                }
1064                warn!("Found a mismatched microdescriptor in cache; ignoring");
1065            }
1066        }
1067
1068        self.register_microdescs(microdescs, &DocSource::LocalCache, changed);
1069        Ok(())
1070    }
1071
1072    fn add_from_download(
1073        &mut self,
1074        text: &str,
1075        request: &ClientRequest,
1076        source: DocSource,
1077        storage: Option<&Mutex<DynStore>>,
1078        changed: &mut bool,
1079    ) -> Result<()> {
1080        let requested: HashSet<_> = if let ClientRequest::Microdescs(req) = request {
1081            req.digests().collect()
1082        } else {
1083            return Err(internal!("expected a microdesc request").into());
1084        };
1085        let mut new_mds = Vec::new();
1086        let mut nonfatal_err = None;
1087
1088        for anno in MicrodescReader::new(text, &AllowAnnotations::AnnotationsNotAllowed)
1089            .map_err(|e| Error::from_netdoc(source.clone(), e))?
1090        {
1091            let anno = match anno {
1092                Err(e) => {
1093                    nonfatal_err.get_or_insert_with(|| Error::from_netdoc(source.clone(), e));
1094                    continue;
1095                }
1096                Ok(a) => a,
1097            };
1098            let txt = anno
1099                .within(text)
1100                .expect("microdesc not from within text as expected");
1101            let md = anno.into_microdesc();
1102            if !requested.contains(md.digest()) {
1103                warn!(
1104                    "Received microdescriptor from {} we did not ask for: {:?}",
1105                    source,
1106                    md.digest()
1107                );
1108                nonfatal_err.get_or_insert(Error::Unwanted("un-requested microdescriptor"));
1109                continue;
1110            }
1111            new_mds.push((txt, md));
1112        }
1113
1114        let mark_listed = self.meta.lifetime().valid_after();
1115        if let Some(store) = storage {
1116            let mut s = store
1117                .lock()
1118                //.get_mut()
1119                .expect("Directory storage lock poisoned");
1120            if !self.newly_listed.is_empty() {
1121                s.update_microdescs_listed(&self.newly_listed, mark_listed)?;
1122                self.newly_listed.clear();
1123            }
1124            if !new_mds.is_empty() {
1125                s.store_microdescs(
1126                    &new_mds
1127                        .iter()
1128                        .map(|(text, md)| (*text, md.digest()))
1129                        .collect::<Vec<_>>(),
1130                    mark_listed,
1131                )?;
1132            }
1133        }
1134
1135        self.register_microdescs(new_mds.into_iter().map(|(_, md)| md), &source, changed);
1136
1137        opt_err_to_result(nonfatal_err)
1138    }
1139    fn advance(self: Box<Self>) -> Box<dyn DirState> {
1140        self
1141    }
1142    fn reset_time(&self) -> Option<SystemTime> {
1143        // TODO(nickm): The reset logic is a little wonky here: we don't truly
1144        // want to _reset_ this state at `replace_dir_time`.  In fact, we ought
1145        // to be able to have multiple states running in parallel: one filling
1146        // in the mds for an old consensus, and one trying to fetch a better
1147        // one.  That's likely to require some amount of refactoring of the
1148        // bootstrap code.
1149
1150        Some(match self.partial {
1151            // If the client has taken a completed netdir, the netdir is now
1152            // usable: We can reset our download attempt when we choose to try
1153            // to replace this directory.
1154            PendingNetDir::Yielding {
1155                replace_dir_time,
1156                netdir: None,
1157                ..
1158            } => replace_dir_time,
1159            // We don't have a completed netdir: Keep trying to fill this one in
1160            // until it is _definitely_ unusable.  (Our clock might be skewed;
1161            // there might be no up-to-date consensus.)
1162            _ => self.reset_time,
1163        })
1164    }
1165    fn reset(self: Box<Self>) -> Box<dyn DirState> {
1166        let cache_usage = if self.cache_usage == CacheUsage::CacheOnly {
1167            // Cache only means we can't ever download.
1168            CacheUsage::CacheOnly
1169        } else if self.is_ready(Readiness::Usable) {
1170            // If we managed to bootstrap a usable consensus, then we won't
1171            // accept our next consensus from the cache.
1172            CacheUsage::MustDownload
1173        } else {
1174            // If we didn't manage to bootstrap a usable consensus, then we can
1175            // indeed try again with the one in the cache.
1176            // TODO(nickm) is this right?
1177            CacheUsage::CacheOkay
1178        };
1179        Box::new(GetConsensusState::new(
1180            self.rt,
1181            self.config,
1182            cache_usage,
1183            self.prev_netdir,
1184            #[cfg(feature = "dirfilter")]
1185            self.filter,
1186        ))
1187    }
1188}
1189
1190/// Choose a random download time to replace a consensus whose lifetime
1191/// is `lifetime`.
1192fn pick_download_time(lifetime: &Lifetime) -> SystemTime {
1193    let (lowbound, uncertainty) = client_download_range(lifetime);
1194    lowbound + rand::rng().gen_range_infallible(..=uncertainty)
1195}
1196
1197/// Based on the lifetime for a consensus, return the time range during which
1198/// clients should fetch the next one.
1199fn client_download_range(lt: &Lifetime) -> (SystemTime, Duration) {
1200    let valid_after = lt.valid_after();
1201    let valid_until = lt.valid_until();
1202    let voting_interval = lt.voting_period();
1203    let whole_lifetime = valid_until
1204        .duration_since(valid_after)
1205        .expect("valid-after must precede valid-until");
1206
1207    // From dir-spec:
1208    // "This time is chosen uniformly at random from the interval
1209    // between the time 3/4 into the first interval after the
1210    // consensus is no longer fresh, and 7/8 of the time remaining
1211    // after that before the consensus is invalid."
1212    let lowbound = voting_interval + (voting_interval * 3) / 4;
1213    let remainder = whole_lifetime
1214        .checked_sub(lowbound)
1215        .expect("Arithmetic did not work as expected");
1216    let uncertainty = (remainder * 7) / 8;
1217
1218    (valid_after + lowbound, uncertainty)
1219}
1220
1221/// If `err` is some, return `Err(err)`.  Otherwise return Ok(()).
1222fn opt_err_to_result(e: Option<Error>) -> Result<()> {
1223    match e {
1224        Some(e) => Err(e),
1225        None => Ok(()),
1226    }
1227}
1228
/// A dummy state implementation, used when we need to temporarily write a
/// placeholder into a box.
///
/// Calling any method on this state will panic.
///
/// (Code that swaps this in must always swap a real state back before anyone
/// else can observe the box.)
#[derive(Clone, Debug)]
pub(crate) struct PoisonedState;
1235
// Every method panics: reaching any of these calls means the placeholder
// escaped, which is a bug in the code that swapped it in.
impl DirState for PoisonedState {
    fn describe(&self) -> String {
        unimplemented!()
    }
    fn missing_docs(&self) -> Vec<DocId> {
        unimplemented!()
    }
    fn is_ready(&self, _ready: Readiness) -> bool {
        unimplemented!()
    }
    fn can_advance(&self) -> bool {
        unimplemented!()
    }
    fn add_from_cache(
        &mut self,
        _docs: HashMap<DocId, DocumentText>,
        _changed: &mut bool,
    ) -> Result<()> {
        unimplemented!()
    }
    fn add_from_download(
        &mut self,
        _text: &str,
        _request: &ClientRequest,
        _source: DocSource,
        _storage: Option<&Mutex<DynStore>>,
        _changed: &mut bool,
    ) -> Result<()> {
        unimplemented!()
    }
    fn bootstrap_progress(&self) -> event::DirProgress {
        unimplemented!()
    }
    fn dl_config(&self) -> DownloadSchedule {
        unimplemented!()
    }
    fn advance(self: Box<Self>) -> Box<dyn DirState> {
        unimplemented!()
    }
    fn reset_time(&self) -> Option<SystemTime> {
        unimplemented!()
    }
    fn reset(self: Box<Self>) -> Box<dyn DirState> {
        unimplemented!()
    }
}
1282
1283#[cfg(test)]
1284mod test {
1285    // @@ begin test lint list maintained by maint/add_warning @@
1286    #![allow(clippy::bool_assert_comparison)]
1287    #![allow(clippy::clone_on_copy)]
1288    #![allow(clippy::dbg_macro)]
1289    #![allow(clippy::mixed_attributes_style)]
1290    #![allow(clippy::print_stderr)]
1291    #![allow(clippy::print_stdout)]
1292    #![allow(clippy::single_char_pattern)]
1293    #![allow(clippy::unwrap_used)]
1294    #![allow(clippy::unchecked_time_subtraction)]
1295    #![allow(clippy::useless_vec)]
1296    #![allow(clippy::needless_pass_by_value)]
1297    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1298    #![allow(clippy::cognitive_complexity)]
1299    use super::*;
1300    use std::convert::TryInto;
1301    use std::sync::Arc;
1302    use tempfile::TempDir;
1303    use time::macros::datetime;
1304    use tor_dircommon::{
1305        authority::{AuthorityContacts, AuthorityContactsBuilder},
1306        config::{DownloadScheduleConfig, NetworkConfig},
1307    };
1308    use tor_netdoc::doc::authcert::AuthCertKeyIds;
1309    use tor_rtcompat::RuntimeSubstExt as _;
1310    use tor_rtmock::simple_time::SimpleMockTimeProvider;
1311
1312    #[test]
1313    fn download_schedule() {
1314        let va = datetime!(2008-08-02 20:00 UTC).into();
1315        let fu = datetime!(2008-08-02 21:00 UTC).into();
1316        let vu = datetime!(2008-08-02 23:00 UTC).into();
1317        let lifetime = Lifetime::new(va, fu, vu).unwrap();
1318
1319        let expected_start: SystemTime = datetime!(2008-08-02 21:45 UTC).into();
1320        let expected_range = Duration::from_millis((75 * 60 * 1000) * 7 / 8);
1321
1322        let (start, range) = client_download_range(&lifetime);
1323        assert_eq!(start, expected_start);
1324        assert_eq!(range, expected_range);
1325
1326        for _ in 0..100 {
1327            let when = pick_download_time(&lifetime);
1328            assert!(when > va);
1329            assert!(when >= expected_start);
1330            assert!(when < vu);
1331            assert!(when <= expected_start + range);
1332        }
1333    }
1334
1335    /// Makes a memory-backed storage.
1336    fn temp_store() -> (TempDir, Mutex<DynStore>) {
1337        let tempdir = TempDir::new().unwrap();
1338
1339        let store = crate::storage::SqliteStore::from_path_and_mistrust(
1340            tempdir.path(),
1341            &fs_mistrust::Mistrust::new_dangerously_trust_everyone(),
1342            false,
1343        )
1344        .unwrap();
1345
1346        (tempdir, Mutex::new(Box::new(store)))
1347    }
1348
1349    fn make_time_shifted_runtime(now: SystemTime, rt: impl Runtime) -> impl Runtime {
1350        let msp = SimpleMockTimeProvider::from_wallclock(now);
1351        rt.with_sleep_provider(msp.clone())
1352            .with_coarse_time_provider(msp)
1353    }
1354
1355    fn make_dirmgr_config(authorities: Option<AuthorityContactsBuilder>) -> Arc<DirMgrConfig> {
1356        let mut netcfg = NetworkConfig::builder();
1357        netcfg.set_fallback_caches(vec![]);
1358        if let Some(a) = authorities {
1359            *netcfg.authorities() = a;
1360        }
1361        let cfg = DirMgrConfig {
1362            cache_dir: "/we_will_never_use_this/".into(),
1363            network: netcfg.build().unwrap(),
1364            ..Default::default()
1365        };
1366        Arc::new(cfg)
1367    }
1368
    // Test data: two microdesc consensuses, plus the authority certificates
    // needed to validate them.
    const CONSENSUS: &str = include_str!("../testdata/mdconsensus1.txt");
    const CONSENSUS2: &str = include_str!("../testdata/mdconsensus2.txt");
    const AUTHCERT_5696: &str = include_str!("../testdata/cert-5696.txt");
    const AUTHCERT_5A23: &str = include_str!("../testdata/cert-5A23.txt");
    #[allow(unused)]
    const AUTHCERT_7C47: &str = include_str!("../testdata/cert-7C47.txt");
1376    fn test_time() -> SystemTime {
1377        datetime!(2020-08-07 12:42:45 UTC).into()
1378    }
    /// Parse a hex string into an `RsaIdentity`; panics on malformed input.
    fn rsa(s: &str) -> RsaIdentity {
        RsaIdentity::from_hex(s).unwrap()
    }
1382    fn test_authorities() -> AuthorityContactsBuilder {
1383        let mut builder = AuthorityContacts::builder();
1384        builder
1385            .v3idents()
1386            .push(rsa("5696AB38CB3852AFA476A5C07B2D4788963D5567"));
1387        builder
1388            .v3idents()
1389            .push(rsa("5A23BA701776C9C1AB1C06E734E92AB3D5350D64"));
1390
1391        builder
1392    }
1393    fn authcert_id_5696() -> AuthCertKeyIds {
1394        AuthCertKeyIds {
1395            id_fingerprint: rsa("5696ab38cb3852afa476a5c07b2d4788963d5567"),
1396            sk_fingerprint: rsa("f6ed4aa64d83caede34e19693a7fcf331aae8a6a"),
1397        }
1398    }
1399    fn authcert_id_5a23() -> AuthCertKeyIds {
1400        AuthCertKeyIds {
1401            id_fingerprint: rsa("5a23ba701776c9c1ab1c06e734e92ab3d5350d64"),
1402            sk_fingerprint: rsa("d08e965cc6dcb6cb6ed776db43e616e93af61177"),
1403        }
1404    }
1405    // remember, we're saying that we don't recognize this one as an authority.
1406    fn authcert_id_7c47() -> AuthCertKeyIds {
1407        AuthCertKeyIds {
1408            id_fingerprint: rsa("7C47DCB4A90E2C2B7C7AD27BD641D038CF5D7EBE"),
1409            sk_fingerprint: rsa("D3C013E0E6C82E246090D1C0798B75FCB7ACF120"),
1410        }
1411    }
1412    fn microdescs() -> HashMap<MdDigest, String> {
1413        const MICRODESCS: &str = include_str!("../testdata/microdescs.txt");
1414        let text = MICRODESCS;
1415        MicrodescReader::new(text, &AllowAnnotations::AnnotationsNotAllowed)
1416            .unwrap()
1417            .map(|res| {
1418                let anno = res.unwrap();
1419                let text = anno.within(text).unwrap();
1420                let md = anno.into_microdesc();
1421                (*md.digest(), text.to_owned())
1422            })
1423            .collect()
1424    }
1425
    /// Walk a `GetConsensusState` through a junk download, a consensus from
    /// unrecognized authorities, a successful download, and finally a load
    /// from the cache.
    #[test]
    fn get_consensus_state() {
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            let rt = make_time_shifted_runtime(test_time(), rt);
            let cfg = make_dirmgr_config(None);

            let (_tempdir, store) = temp_store();

            let mut state = GetConsensusState::new(
                rt.clone(),
                cfg,
                CacheUsage::CacheOkay,
                None,
                #[cfg(feature = "dirfilter")]
                Arc::new(crate::filter::NilFilter),
            );

            // Is description okay?
            assert_eq!(&state.describe(), "Looking for a consensus.");

            // Basic properties: without a consensus it is not ready to advance.
            assert!(!state.can_advance());
            assert!(!state.is_ready(Readiness::Complete));
            assert!(!state.is_ready(Readiness::Usable));

            // Basic properties: it doesn't want to reset.
            assert!(state.reset_time().is_none());

            // Its starting DirStatus is "fetching a consensus".
            assert_eq!(
                state.bootstrap_progress().to_string(),
                "fetching a consensus"
            );

            // Download configuration is simple: only 1 request can be done in
            // parallel.  It uses a consensus retry schedule.
            let retry = state.dl_config();
            assert_eq!(retry, DownloadScheduleConfig::default().retry_consensus());

            // Do we know what we want?
            let docs = state.missing_docs();
            assert_eq!(docs.len(), 1);
            let docid = docs[0];

            assert!(matches!(
                docid,
                DocId::LatestConsensus {
                    flavor: ConsensusFlavor::Microdesc,
                    cache_usage: CacheUsage::CacheOkay,
                }
            ));
            // All downloads below claim to come from an unnamed dir server.
            let source = DocSource::DirServer { source: None };

            // Now suppose that we get some complete junk from a download.
            let req = tor_dirclient::request::ConsensusRequest::new(ConsensusFlavor::Microdesc);
            let req = crate::docid::ClientRequest::Consensus(req);
            let mut changed = false;
            let outcome = state.add_from_download(
                "this isn't a consensus",
                &req,
                source.clone(),
                Some(&store),
                &mut changed,
            );
            assert!(matches!(outcome, Err(Error::NetDocError { .. })));
            assert!(!changed);
            // make sure it wasn't stored...
            assert!(
                store
                    .lock()
                    .unwrap()
                    .latest_consensus(ConsensusFlavor::Microdesc, None)
                    .unwrap()
                    .is_none()
            );

            // Now try again, with a real consensus... but the wrong authorities.
            let mut changed = false;
            let outcome = state.add_from_download(
                CONSENSUS,
                &req,
                source.clone(),
                Some(&store),
                &mut changed,
            );
            assert!(matches!(outcome, Err(Error::UnrecognizedAuthorities)));
            assert!(!changed);
            // ...and that it wasn't stored either.
            assert!(
                store
                    .lock()
                    .unwrap()
                    .latest_consensus(ConsensusFlavor::Microdesc, None)
                    .unwrap()
                    .is_none()
            );

            // Great. Change the receiver to use a configuration where these test
            // authorities are recognized.
            let cfg = make_dirmgr_config(Some(test_authorities()));

            let mut state = GetConsensusState::new(
                rt.clone(),
                cfg,
                CacheUsage::CacheOkay,
                None,
                #[cfg(feature = "dirfilter")]
                Arc::new(crate::filter::NilFilter),
            );
            let mut changed = false;
            let outcome =
                state.add_from_download(CONSENSUS, &req, source, Some(&store), &mut changed);
            assert!(outcome.is_ok());
            assert!(changed);
            // This time the consensus should have been written to storage.
            assert!(
                store
                    .lock()
                    .unwrap()
                    .latest_consensus(ConsensusFlavor::Microdesc, None)
                    .unwrap()
                    .is_some()
            );

            // And with that, we should be asking for certificates
            assert!(state.can_advance());
            assert_eq!(&state.describe(), "About to fetch certificates.");
            assert_eq!(state.missing_docs(), Vec::new());
            let next = Box::new(state).advance();
            assert_eq!(
                &next.describe(),
                "Downloading certificates for consensus (we are missing 2/2)."
            );

            // Try again, but this time get the state from the cache.
            let cfg = make_dirmgr_config(Some(test_authorities()));
            let mut state = GetConsensusState::new(
                rt,
                cfg,
                CacheUsage::CacheOkay,
                None,
                #[cfg(feature = "dirfilter")]
                Arc::new(crate::filter::NilFilter),
            );
            // Build a cache-document map holding our consensus under `docid`.
            let text: crate::storage::InputString = CONSENSUS.to_owned().into();
            let map = vec![(docid, text.into())].into_iter().collect();
            let mut changed = false;
            let outcome = state.add_from_cache(map, &mut changed);
            assert!(outcome.is_ok());
            assert!(changed);
            assert!(state.can_advance());
        });
    }
1577
    #[test]
    fn get_certs_state() {
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            /// Construct a `GetCertsState` from our test data: feed the test
            /// consensus into a fresh `GetConsensusState`, then advance the
            /// state machine so it is waiting on authority certificates.
            fn new_getcerts_state(rt: impl Runtime) -> Box<dyn DirState> {
                let rt = make_time_shifted_runtime(test_time(), rt);
                let cfg = make_dirmgr_config(Some(test_authorities()));
                let mut state = GetConsensusState::new(
                    rt,
                    cfg,
                    CacheUsage::CacheOkay,
                    None,
                    #[cfg(feature = "dirfilter")]
                    Arc::new(crate::filter::NilFilter),
                );
                let source = DocSource::DirServer { source: None };
                let req = tor_dirclient::request::ConsensusRequest::new(ConsensusFlavor::Microdesc);
                let req = crate::docid::ClientRequest::Consensus(req);
                let mut changed = false;
                let outcome = state.add_from_download(CONSENSUS, &req, source, None, &mut changed);
                assert!(outcome.is_ok());
                Box::new(state).advance()
            }

            let (_tempdir, store) = temp_store();
            let mut state = new_getcerts_state(rt.clone());
            // Basic properties: description, status, reset time.
            assert_eq!(
                &state.describe(),
                "Downloading certificates for consensus (we are missing 2/2)."
            );
            assert!(!state.can_advance());
            assert!(!state.is_ready(Readiness::Complete));
            assert!(!state.is_ready(Readiness::Usable));
            // The reset time should be the consensus's expiry, stretched by the
            // default post-validity tolerance.
            let consensus_expires: SystemTime = datetime!(2020-08-07 12:43:20 UTC).into();
            let post_valid_tolerance = crate::DirTolerance::default().post_valid_tolerance();
            assert_eq!(
                state.reset_time(),
                Some(consensus_expires + post_valid_tolerance)
            );
            // In this state, downloads should use the certificate retry schedule.
            let retry = state.dl_config();
            assert_eq!(retry, DownloadScheduleConfig::default().retry_certs());

            // Bootstrap status okay?
            assert_eq!(
                state.bootstrap_progress().to_string(),
                "fetching authority certificates (0/2)"
            );

            // Check that we get the right list of missing docs.
            let missing = state.missing_docs();
            assert_eq!(missing.len(), 2); // We are missing two certificates.
            assert!(missing.contains(&DocId::AuthCert(authcert_id_5696())));
            assert!(missing.contains(&DocId::AuthCert(authcert_id_5a23())));
            // we don't ask for this one because we don't recognize its authority
            assert!(!missing.contains(&DocId::AuthCert(authcert_id_7c47())));

            // Add one from the cache; make sure the list is still right
            let text1: crate::storage::InputString = AUTHCERT_5696.to_owned().into();
            let docs = vec![(DocId::AuthCert(authcert_id_5696()), text1.into())]
                .into_iter()
                .collect();
            let mut changed = false;
            let outcome = state.add_from_cache(docs, &mut changed);
            assert!(changed);
            assert!(outcome.is_ok()); // no error, and something changed.
            assert!(!state.can_advance()); // But we aren't done yet.
            let missing = state.missing_docs();
            assert_eq!(missing.len(), 1); // Now we're only missing one!
            assert!(missing.contains(&DocId::AuthCert(authcert_id_5a23())));
            assert_eq!(
                state.bootstrap_progress().to_string(),
                "fetching authority certificates (1/2)"
            );

            // Now try to add the other from a download ... but fail
            // because we didn't ask for it: the request lists a different
            // certificate id than the document we hand back.
            let source = DocSource::DirServer { source: None };
            let mut req = tor_dirclient::request::AuthCertRequest::new();
            req.push(authcert_id_5696()); // it's the wrong id.
            let req = ClientRequest::AuthCert(req);
            let mut changed = false;
            let outcome = state.add_from_download(
                AUTHCERT_5A23,
                &req,
                source.clone(),
                Some(&store),
                &mut changed,
            );
            // The unrequested certificate must be rejected as Unwanted, leave
            // the state untouched, and must NOT be persisted to the store.
            assert!(matches!(outcome, Err(Error::Unwanted(_))));
            assert!(!changed);
            let missing2 = state.missing_docs();
            assert_eq!(missing, missing2); // No change.
            assert!(
                store
                    .lock()
                    .unwrap()
                    .authcerts(&[authcert_id_5a23()])
                    .unwrap()
                    .is_empty()
            );

            // Now try to add the other from a download ... for real!
            let mut req = tor_dirclient::request::AuthCertRequest::new();
            req.push(authcert_id_5a23()); // Right idea this time!
            let req = ClientRequest::AuthCert(req);
            let mut changed = false;
            let outcome =
                state.add_from_download(AUTHCERT_5A23, &req, source, Some(&store), &mut changed);
            assert!(outcome.is_ok()); // No error, _and_ something changed!
            assert!(changed);
            // With both certificates in hand we can advance, and this time the
            // certificate should have been written through to the store.
            let missing3 = state.missing_docs();
            assert!(missing3.is_empty());
            assert!(state.can_advance());
            assert!(
                !store
                    .lock()
                    .unwrap()
                    .authcerts(&[authcert_id_5a23()])
                    .unwrap()
                    .is_empty()
            );

            // Advancing should land us in the microdescriptor-download state.
            let next = state.advance();
            assert_eq!(
                &next.describe(),
                "Downloading microdescriptors (we are missing 6)."
            );

            // If we start from scratch and reset, we're back in GetConsensus.
            let state = new_getcerts_state(rt);
            let state = state.reset();
            assert_eq!(&state.describe(), "Downloading a consensus.");

            // TODO: I'd like even more tests to make sure that we never
            // accept a certificate for an authority we don't believe in.
        });
    }
1717
    #[test]
    fn get_microdescs_state() {
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            /// Construct a `GetMicrodescsState` from our test data, using a
            /// consensus we dangerously assume to be timely and well-signed.
            fn new_getmicrodescs_state(rt: impl Runtime) -> GetMicrodescsState<impl Runtime> {
                let rt = make_time_shifted_runtime(test_time(), rt);
                let cfg = make_dirmgr_config(Some(test_authorities()));
                let (signed, rest, consensus) = MdConsensus::parse(CONSENSUS2).unwrap();
                let consensus = consensus
                    .dangerously_assume_timely()
                    .dangerously_assume_wellsigned();
                let meta = ConsensusMeta::from_consensus(signed, rest, &consensus);
                GetMicrodescsState::new(
                    CacheUsage::CacheOkay,
                    consensus,
                    meta,
                    rt,
                    cfg,
                    None,
                    #[cfg(feature = "dirfilter")]
                    Arc::new(crate::filter::NilFilter),
                )
            }
            /// Decode an unpadded base64 string into a microdescriptor digest.
            fn d64(s: &str) -> MdDigest {
                use base64ct::{Base64Unpadded, Encoding as _};
                Base64Unpadded::decode_vec(s).unwrap().try_into().unwrap()
            }

            // If we start from scratch and reset, we're back in GetConsensus.
            let state = new_getmicrodescs_state(rt.clone());
            let state = Box::new(state).reset();
            assert_eq!(&state.describe(), "Looking for a consensus.");

            // Check the basics.
            let mut state = new_getmicrodescs_state(rt.clone());
            assert_eq!(
                &state.describe(),
                "Downloading microdescriptors (we are missing 4)."
            );
            assert!(!state.can_advance());
            assert!(!state.is_ready(Readiness::Complete));
            assert!(!state.is_ready(Readiness::Usable));
            {
                // The reset time must fall between the consensus's fresh-until
                // and its valid-until stretched by the post-valid tolerance.
                let reset_time = state.reset_time().unwrap();
                let fresh_until: SystemTime = datetime!(2021-10-27 21:27:00 UTC).into();
                let valid_until: SystemTime = datetime!(2021-10-27 21:27:20 UTC).into();
                assert!(reset_time >= fresh_until);
                assert!(reset_time <= valid_until + state.config.tolerance.post_valid_tolerance());
            }
            // In this state, downloads should use the microdescriptor retry schedule.
            let retry = state.dl_config();
            assert_eq!(retry, DownloadScheduleConfig::default().retry_microdescs());
            assert_eq!(
                state.bootstrap_progress().to_string(),
                "fetching microdescriptors (0/4)"
            );

            // Now check whether we're missing all the right microdescs.
            let missing = state.missing_docs();
            let md_text = microdescs();
            assert_eq!(missing.len(), 4);
            assert_eq!(md_text.len(), 4);
            let md1 = d64("LOXRj8YZP0kwpEAsYOvBZWZWGoWv5b/Bp2Mz2Us8d8g");
            let md2 = d64("iOhVp33NyZxMRDMHsVNq575rkpRViIJ9LN9yn++nPG0");
            let md3 = d64("/Cd07b3Bl0K0jX2/1cAvsYXJJMi5d8UBU+oWKaLxoGo");
            let md4 = d64("z+oOlR7Ga6cg9OoC/A3D3Ey9Rtc4OldhKlpQblMfQKo");
            for md_digest in [md1, md2, md3, md4] {
                assert!(missing.contains(&DocId::Microdesc(md_digest)));
                assert!(md_text.contains_key(&md_digest));
            }

            // Try adding a microdesc from the cache.
            let (_tempdir, store) = temp_store();
            let doc1: crate::storage::InputString = md_text.get(&md1).unwrap().clone().into();
            let docs = vec![(DocId::Microdesc(md1), doc1.into())]
                .into_iter()
                .collect();
            let mut changed = false;
            let outcome = state.add_from_cache(docs, &mut changed);
            assert!(outcome.is_ok()); // successfully loaded one MD.
            assert!(changed);
            // One microdesc is not enough to advance or be usable.
            assert!(!state.can_advance());
            assert!(!state.is_ready(Readiness::Complete));
            assert!(!state.is_ready(Readiness::Usable));

            // Now we should be missing 3.
            let missing = state.missing_docs();
            assert_eq!(missing.len(), 3);
            assert!(!missing.contains(&DocId::Microdesc(md1)));
            assert_eq!(
                state.bootstrap_progress().to_string(),
                "fetching microdescriptors (1/4)"
            );

            // Try adding the rest as if from a download: concatenate their
            // text into a single response, and request all three digests.
            let mut req = tor_dirclient::request::MicrodescRequest::new();
            let mut response = "".to_owned();
            for md_digest in [md2, md3, md4] {
                response.push_str(md_text.get(&md_digest).unwrap());
                req.push(md_digest);
            }
            let req = ClientRequest::Microdescs(req);
            let source = DocSource::DirServer { source: None };
            let mut changed = false;
            let outcome = state.add_from_download(
                response.as_str(),
                &req,
                source,
                Some(&store),
                &mut changed,
            );
            assert!(outcome.is_ok()); // successfully loaded MDs
            assert!(changed);
            // With all microdescs present, the state should offer a complete
            // NetDir as a replacement for whatever is currently running.
            match state.get_netdir_change().unwrap() {
                NetDirChange::AttemptReplace { netdir, .. } => {
                    assert!(netdir.take().is_some());
                }
                x => panic!("wrong netdir change: {:?}", x),
            }
            assert!(state.is_ready(Readiness::Complete));
            assert!(state.is_ready(Readiness::Usable));
            // The downloaded microdescs should have been persisted to the store.
            assert_eq!(
                store
                    .lock()
                    .unwrap()
                    .microdescs(&[md2, md3, md4])
                    .unwrap()
                    .len(),
                3
            );

            let missing = state.missing_docs();
            assert!(missing.is_empty());
        });
    }
1852}