// tor_dirmgr/bootstrap.rs
1//! Functions to download or load directory objects, using the
2//! state machines in the `states` module.
3
4use std::num::NonZeroUsize;
5use std::ops::Deref;
6use std::{
7    collections::HashMap,
8    sync::{Arc, Weak},
9    time::{Duration, SystemTime},
10};
11
12use crate::DirMgrConfig;
13use crate::DocSource;
14use crate::err::BootstrapAction;
15use crate::state::{DirState, PoisonedState};
16use crate::{
17    DirMgr, DocId, DocQuery, DocumentText, Error, Readiness, Result,
18    docid::{self, ClientRequest},
19    upgrade_weak_ref,
20};
21
22use futures::FutureExt;
23use futures::StreamExt;
24use oneshot_fused_workaround as oneshot;
25use tor_dirclient::DirResponse;
26use tor_error::{info_report, warn_report};
27use tor_rtcompat::Runtime;
28use tor_rtcompat::scheduler::TaskSchedule;
29use tracing::{debug, info, instrument, trace, warn};
30
31use crate::storage::Store;
32#[cfg(test)]
33use std::sync::LazyLock;
34#[cfg(test)]
35use std::sync::Mutex;
36use tor_circmgr::{CircMgr, DirInfo};
37use tor_netdir::{NetDir, NetDirProvider as _};
38use tor_netdoc::doc::netstatus::ConsensusFlavor;
39
/// Given a `Result<()>`, exit the current function if it is anything other
/// than `Ok(())` or a nonfatal error.
///
/// In other words: errors whose `bootstrap_action()` is
/// [`BootstrapAction::Nonfatal`] are swallowed; every other error causes a
/// `return Err(e)` from the *enclosing* function.
macro_rules! propagate_fatal_errors {
    ( $e:expr ) => {
        // Bind with an explicit type so that `$e` must really be a Result<()>.
        let v: Result<()> = $e;
        if let Err(e) = v {
            match e.bootstrap_action() {
                // Nonfatal errors are deliberately ignored here; callers
                // typically log them before invoking this macro.
                BootstrapAction::Nonfatal => {}
                // Anything else (Fatal, Reset, ...) aborts the caller.
                _ => return Err(e),
            }
        }
    };
}
53
/// Identifier for an attempt to bootstrap a directory.
///
/// Every time that we decide to download a new directory, _despite already
/// having one_, counts as a new attempt.
///
/// These are used to track the progress of each attempt independently.
#[derive(Copy, Clone, Debug, derive_more::Display, Eq, PartialEq, Ord, PartialOrd)]
#[display("{0}", id)]
pub(crate) struct AttemptId {
    /// Which attempt at downloading a directory is this?
    ///
    /// (Ids are allocated starting from 1 by [`AttemptId::next`], so this is
    /// always nonzero.)
    id: NonZeroUsize,
}
66
67impl AttemptId {
68    /// Return a new unused AtomicUsize that will be greater than any previous
69    /// one.
70    ///
71    /// # Panics
72    ///
73    /// Panics if we have exhausted the possible space of AtomicIds.
74    pub(crate) fn next() -> Self {
75        use std::sync::atomic::{AtomicUsize, Ordering};
76        /// atomic used to generate the next attempt.
77        static NEXT: AtomicUsize = AtomicUsize::new(1);
78        let id = NEXT.fetch_add(1, Ordering::Relaxed);
79        let id = id.try_into().expect("Allocated too many AttemptIds");
80        Self { id }
81    }
82}
83
/// If there were errors from a peer in `outcome`, record those errors by
/// marking the circuit (if any) as needing retirement, and noting the peer
/// (if any) as having failed.
///
/// Does nothing when `outcome` is a clean success, or a failure that cannot
/// be attributed to any particular source.
fn note_request_outcome<R: Runtime>(
    circmgr: &CircMgr<R>,
    outcome: &tor_dirclient::Result<tor_dirclient::DirResponse>,
) {
    use tor_dirclient::{Error::RequestFailed, RequestFailedError};
    // Extract an error and a source from this outcome, if there is one.
    //
    // This is complicated because DirResponse can encapsulate the notion of
    // a response that failed part way through a download: in that case, it
    // has some data, and also an error.
    let (err, source) = match outcome {
        Ok(req) => {
            // A "successful" response may still carry a partial-failure
            // error; we only act on it when we also know which cache it
            // came from.
            if let (Some(e), Some(source)) = (req.error(), req.source()) {
                (
                    RequestFailed(RequestFailedError {
                        error: e.clone(),
                        source: Some(source.clone()),
                    }),
                    source,
                )
            } else {
                return;
            }
        }
        // An outright failure is attributable only when it names a source.
        Err(
            error @ RequestFailed(RequestFailedError {
                source: Some(source),
                ..
            }),
        ) => (error.clone(), source),
        _ => return,
    };

    note_cache_error(circmgr, source, &err.into());
}
122
123/// Record that a problem has occurred because of a failure in an answer from `source`.
124fn note_cache_error<R: Runtime>(
125    circmgr: &CircMgr<R>,
126    source: &tor_dirclient::SourceInfo,
127    problem: &Error,
128) {
129    use tor_circmgr::ExternalActivity;
130
131    if !problem.indicates_cache_failure() {
132        return;
133    }
134
135    // Does the error here tell us whom to really blame?  If so, blame them
136    // instead.
137    //
138    // (This can happen if we notice a problem while downloading a certificate,
139    // but the real problem is that the consensus was no good.)
140    let real_source = match problem {
141        Error::NetDocError {
142            source: DocSource::DirServer { source: Some(info) },
143            ..
144        } => info,
145        _ => source,
146    };
147
148    info_report!(problem, "Marking {:?} as failed", real_source);
149    circmgr.note_external_failure(real_source.cache_id(), ExternalActivity::DirCache);
150    circmgr.retire_circ(source.unique_circ_id());
151}
152
153/// Record that `source` has successfully given us some directory info.
154fn note_cache_success<R: Runtime>(circmgr: &CircMgr<R>, source: &tor_dirclient::SourceInfo) {
155    use tor_circmgr::ExternalActivity;
156
157    trace!("Marking {:?} as successful", source);
158    circmgr.note_external_success(source.cache_id(), ExternalActivity::DirCache);
159}
160
161/// Load every document in `missing` and try to apply it to `state`.
162fn load_and_apply_documents<R: Runtime>(
163    missing: &[DocId],
164    dirmgr: &Arc<DirMgr<R>>,
165    state: &mut Box<dyn DirState>,
166    changed: &mut bool,
167) -> Result<()> {
168    /// How many documents will we try to load at once?  We try to keep this from being too large,
169    /// to avoid excessive RAM usage.
170    ///
171    /// TODO: we may well want to tune this.
172    const CHUNK_SIZE: usize = 256;
173    for chunk in missing.chunks(CHUNK_SIZE) {
174        let documents = {
175            let store = dirmgr.store.lock().expect("store lock poisoned");
176            load_documents_from_store(chunk, &**store)?
177        };
178
179        state.add_from_cache(documents, changed)?;
180    }
181
182    Ok(())
183}
184
185/// Load a set of documents from a `Store`, returning all documents found in the store.
186/// Note that this may be less than the number of documents in `missing`.
187fn load_documents_from_store(
188    missing: &[DocId],
189    store: &dyn Store,
190) -> Result<HashMap<DocId, DocumentText>> {
191    let mut loaded = HashMap::new();
192    for query in docid::partition_by_type(missing.iter().copied()).values() {
193        query.load_from_store_into(&mut loaded, store)?;
194    }
195    Ok(loaded)
196}
197
/// Construct an appropriate ClientRequest to download a consensus
/// of the given flavor.
///
/// Consults any cached consensus metadata in `store` so that we only ask
/// for a consensus newer than the one we already have.
pub(crate) fn make_consensus_request(
    now: SystemTime,
    flavor: ConsensusFlavor,
    store: &dyn Store,
    config: &DirMgrConfig,
) -> Result<ClientRequest> {
    let mut request = tor_dirclient::request::ConsensusRequest::new(flavor);

    // Cutoff date derived from `now` and our configured tolerance.
    let default_cutoff = crate::default_consensus_cutoff(now, &config.tolerance)?;

    match store.latest_consensus_meta(flavor) {
        Ok(Some(meta)) => {
            // We have a cached consensus: ask only for something newer than
            // it (but never older than the default cutoff), and tell the
            // cache which consensus we already have, by digest.
            let valid_after = meta.lifetime().valid_after();
            request.set_last_consensus_date(std::cmp::max(valid_after, default_cutoff));
            request.push_old_consensus_digest(*meta.sha3_256_of_signed());
        }
        latest => {
            // Here `latest` is either Ok(None) or Err(_); only the error
            // case is worth reporting.
            if let Err(e) = latest {
                warn_report!(e, "Error loading directory metadata");
            }
            // If we don't have a consensus, then request one that's
            // "reasonably new".  That way, if our clock is set far in the
            // future, we won't download stuff we can't use.
            request.set_last_consensus_date(default_cutoff);
        }
    }

    request.set_skew_limit(
        // If we are _fast_ by at least this much, then any valid directory will
        // seem to be at least this far in the past.
        config.tolerance.post_valid_tolerance(),
        // If we are _slow_ by this much, then any valid directory will seem to
        // be at least this far in the future.
        config.tolerance.pre_valid_tolerance(),
    );

    Ok(ClientRequest::Consensus(request))
}
238
239/// Construct a set of `ClientRequest`s in order to fetch the documents in `docs`.
240pub(crate) fn make_requests_for_documents<R: Runtime>(
241    rt: &R,
242    docs: &[DocId],
243    store: &dyn Store,
244    config: &DirMgrConfig,
245) -> Result<Vec<ClientRequest>> {
246    let mut res = Vec::new();
247    for q in docid::partition_by_type(docs.iter().copied())
248        .into_values()
249        .flat_map(|x| x.split_for_download().into_iter())
250    {
251        match q {
252            DocQuery::LatestConsensus { flavor, .. } => {
253                res.push(make_consensus_request(
254                    rt.wallclock(),
255                    flavor,
256                    store,
257                    config,
258                )?);
259            }
260            DocQuery::AuthCert(ids) => {
261                res.push(ClientRequest::AuthCert(ids.into_iter().collect()));
262            }
263            DocQuery::Microdesc(ids) => {
264                res.push(ClientRequest::Microdescs(ids.into_iter().collect()));
265            }
266            #[cfg(feature = "routerdesc")]
267            DocQuery::RouterDesc(ids) => {
268                res.push(ClientRequest::RouterDescs(ids.into_iter().collect()));
269            }
270        }
271    }
272    Ok(res)
273}
274
275/// Launch a single client request and get an associated response.
276#[instrument(level = "trace", skip_all)]
277async fn fetch_single<R: Runtime>(
278    rt: &R,
279    request: ClientRequest,
280    current_netdir: Option<&NetDir>,
281    circmgr: Arc<CircMgr<R>>,
282) -> Result<(ClientRequest, DirResponse)> {
283    let dirinfo: DirInfo = match current_netdir {
284        Some(netdir) => netdir.into(),
285        None => tor_circmgr::DirInfo::Nothing,
286    };
287    let outcome =
288        tor_dirclient::get_resource(request.as_requestable(), dirinfo, rt, circmgr.clone()).await;
289
290    note_request_outcome(&circmgr, &outcome);
291
292    let resource = outcome?;
293    Ok((request, resource))
294}
295
/// Testing helper: if this is nonempty, then we return its contents in place
/// of any real responses in `fetch_multiple`.
///
/// Note that only one test uses this: otherwise there would be a race
/// condition. :p
#[cfg(test)]
static CANNED_RESPONSE: LazyLock<Mutex<Vec<String>>> = LazyLock::new(|| Mutex::new(vec![]));
303
/// Launch a set of download requests for a set of missing objects in
/// `missing`, and return each request along with the response it received.
///
/// Don't launch more than `parallelism` requests at once.
///
/// Responses whose HTTP status is not 200, and requests that failed
/// outright, are logged and dropped rather than returned.
#[allow(clippy::cognitive_complexity)] // TODO: maybe refactor?
#[instrument(level = "trace", skip_all)]
async fn fetch_multiple<R: Runtime>(
    dirmgr: Arc<DirMgr<R>>,
    attempt_id: AttemptId,
    missing: &[DocId],
    parallelism: usize,
) -> Result<Vec<(ClientRequest, DirResponse)>> {
    // Build the requests while holding the store lock, then release it
    // before doing any network activity.
    let requests = {
        let store = dirmgr.store.lock().expect("store lock poisoned");
        make_requests_for_documents(&dirmgr.runtime, missing, &**store, &dirmgr.config.get())?
    };

    trace!(attempt=%attempt_id, "Launching {} requests for {} documents",
           requests.len(), missing.len());

    // Test-only shortcut: a canned response replaces real fetching.
    #[cfg(test)]
    {
        let m = CANNED_RESPONSE.lock().expect("Poisoned mutex");
        if !m.is_empty() {
            return Ok(requests
                .into_iter()
                .zip(m.iter().map(DirResponse::from_get_body))
                .collect());
        }
    }

    let circmgr = dirmgr.circmgr()?;
    // Only use timely directories for bootstrapping directories; otherwise, we'll try fallbacks.
    let netdir = dirmgr.netdir(tor_netdir::Timeliness::Timely).ok();

    // TODO: instead of waiting for all the queries to finish, we
    // could stream the responses back or something.
    let responses: Vec<Result<(ClientRequest, DirResponse)>> = futures::stream::iter(requests)
        .map(|query| fetch_single(&dirmgr.runtime, query, netdir.as_deref(), circmgr.clone()))
        .buffer_unordered(parallelism)
        .collect()
        .await;

    // Keep only the usable (status 200) responses; log and drop the rest.
    let mut useful_responses = Vec::new();
    for r in responses {
        // TODO: on some error cases we might want to stop using this source.
        match r {
            Ok((request, response)) => {
                if response.status_code() == 200 {
                    useful_responses.push((request, response));
                } else {
                    trace!(
                        "cache declined request; reported status {:?}",
                        response.status_code()
                    );
                }
            }
            Err(e) => warn_report!(e, "error while downloading"),
        }
    }

    trace!(attempt=%attempt_id, "received {} useful responses from our requests.", useful_responses.len());

    Ok(useful_responses)
}
369
370/// Try to update `state` by loading cached information from `dirmgr`.
371fn load_once<R: Runtime>(
372    dirmgr: &Arc<DirMgr<R>>,
373    state: &mut Box<dyn DirState>,
374    attempt_id: AttemptId,
375    changed_out: &mut bool,
376) -> Result<()> {
377    let missing = state.missing_docs();
378    let mut changed = false;
379    let outcome: Result<()> = if missing.is_empty() {
380        trace!("Found no missing documents; can't advance current state");
381        Ok(())
382    } else {
383        trace!(
384            "Found {} missing documents; trying to load them",
385            missing.len()
386        );
387
388        load_and_apply_documents(&missing, dirmgr, state, &mut changed)
389    };
390
391    // We have to update the status here regardless of the outcome, if we got
392    // any information: even if there was an error, we might have received
393    // partial information that changed our status.
394    if changed {
395        dirmgr.update_progress(attempt_id, state.bootstrap_progress());
396        *changed_out = true;
397    }
398
399    outcome
400}
401
/// Try to load as much state as possible for a provided `state` from the
/// cache in `dirmgr`, advancing the state to the extent possible.
///
/// No downloads are performed; the provided state will not be reset.
#[allow(clippy::cognitive_complexity)] // TODO: Refactor? Somewhat due to tracing.
pub(crate) fn load<R: Runtime>(
    dirmgr: &Arc<DirMgr<R>>,
    mut state: Box<dyn DirState>,
    attempt_id: AttemptId,
) -> Result<Box<dyn DirState>> {
    // Guards against looping forever without advancing; see the assert below.
    let mut safety_counter = 0_usize;
    loop {
        trace!(attempt=%attempt_id, state=%state.describe(), "Loading from cache");
        let mut changed = false;
        let outcome = load_once(dirmgr, &mut state, attempt_id, &mut changed);
        {
            let mut store = dirmgr.store.lock().expect("store lock poisoned");
            dirmgr.apply_netdir_changes(&mut state, &mut **store)?;
            dirmgr.update_progress(attempt_id, state.bootstrap_progress());
        }
        trace!(attempt=%attempt_id, ?outcome, "Load operation completed.");

        if let Err(e) = outcome {
            match e.bootstrap_action() {
                BootstrapAction::Nonfatal => {
                    // A nonfatal error doesn't stop us from using whatever
                    // we did manage to load.
                    debug!("Recoverable error loading from cache: {}", e);
                }
                BootstrapAction::Fatal | BootstrapAction::Reset => {
                    return Err(e);
                }
            }
        }

        if state.can_advance() {
            state = state.advance();
            trace!(attempt=%attempt_id, state=state.describe(), "State has advanced.");
            safety_counter = 0;
        } else {
            if !changed {
                // TODO: Are there more nonfatal errors that mean we should
                // break?
                trace!(attempt=%attempt_id, state=state.describe(), "No state advancement after load; nothing more to find in the cache.");
                break;
            }
            // Something changed but we still can't advance: fine a few
            // times, but not forever.
            safety_counter += 1;
            assert!(
                safety_counter < 100,
                "Spent 100 iterations in the same state: this is a bug"
            );
        }
    }

    Ok(state)
}
456
/// Helper: Make a set of download attempts for the current directory state,
/// and on success feed their results into the state object.
///
/// This can launch one or more download requests, but will not launch more
/// than `parallelism` requests at a time.
///
/// Per-response failures are counted and reported to `dirmgr` in a single
/// batch at the end; fatal errors abort the whole attempt.
#[allow(clippy::cognitive_complexity)] // TODO: Refactor?
#[instrument(level = "trace", skip_all)]
async fn download_attempt<R: Runtime>(
    dirmgr: &Arc<DirMgr<R>>,
    state: &mut Box<dyn DirState>,
    parallelism: usize,
    attempt_id: AttemptId,
) -> Result<()> {
    let missing = state.missing_docs();
    let fetched = fetch_multiple(Arc::clone(dirmgr), attempt_id, &missing, parallelism).await?;
    // How many responses failed in a way attributable to a cache.
    let mut n_errors = 0;
    for (client_req, dir_response) in fetched {
        let source = dir_response.source().cloned();
        // The body must be valid UTF-8; a bad body counts against the cache
        // that sent it (if we know which one that was).
        let text = match String::from_utf8(dir_response.into_output_unchecked())
            .map_err(Error::BadUtf8FromDirectory)
        {
            Ok(t) => t,
            Err(e) => {
                if let Some(source) = source {
                    n_errors += 1;
                    note_cache_error(dirmgr.circmgr()?.deref(), &source, &e);
                }
                continue;
            }
        };
        match dirmgr.expand_response_text(&client_req, text) {
            Ok(text) => {
                let doc_source = DocSource::DirServer {
                    source: source.clone(),
                };
                let mut changed = false;
                let outcome = state.add_from_download(
                    &text,
                    &client_req,
                    doc_source,
                    Some(&dirmgr.store),
                    &mut changed,
                );

                // If nothing changed, the download should have failed.
                if !changed {
                    debug_assert!(outcome.is_err());
                }

                // Give the circuit manager feedback about this cache.
                if let Some(source) = source {
                    if let Err(e) = &outcome {
                        n_errors += 1;
                        note_cache_error(dirmgr.circmgr()?.deref(), &source, e);
                    } else {
                        note_cache_success(dirmgr.circmgr()?.deref(), &source);
                    }
                }

                if let Err(e) = &outcome {
                    dirmgr.note_errors(attempt_id, 1);
                    warn_report!(e, "error while adding directory info");
                }
                propagate_fatal_errors!(outcome);
            }
            Err(e) => {
                warn_report!(e, "Error when expanding directory text");
                if let Some(source) = source {
                    n_errors += 1;
                    note_cache_error(dirmgr.circmgr()?.deref(), &source, &e);
                }
                propagate_fatal_errors!(Err(e));
            }
        }
    }
    // Report all the attributable failures in one batch.
    if n_errors != 0 {
        dirmgr.note_errors(attempt_id, n_errors);
    }
    dirmgr.update_progress(attempt_id, state.bootstrap_progress());

    Ok(())
}
537
/// Download information into a DirState state machine until it is
/// ["complete"](Readiness::Complete), or until we hit a non-recoverable error.
///
/// Use `dirmgr` to load from the cache or to launch downloads.
///
/// Keep resetting the state as needed.
///
/// The first time that the state becomes ["usable"](Readiness::Usable), notify
/// the sender in `on_usable`.
#[allow(clippy::cognitive_complexity)] // TODO: Refactor!
#[instrument(level = "trace", skip_all)]
pub(crate) async fn download<R: Runtime>(
    dirmgr: Weak<DirMgr<R>>,
    state: &mut Box<dyn DirState>,
    schedule: &mut TaskSchedule<R>,
    attempt_id: AttemptId,
    on_usable: &mut Option<oneshot::Sender<()>>,
) -> Result<()> {
    // We hold only a Weak reference to the DirMgr, upgrading it each time we
    // need it, so that this task stops if the DirMgr goes away.
    let runtime = upgrade_weak_ref(&dirmgr)?.runtime.clone();

    trace!(attempt=%attempt_id, state=%state.describe(), "Trying to download directory material.");

    'next_state: loop {
        let retry_config = state.dl_config();
        let parallelism = retry_config.parallelism();

        // In theory this could be inside the loop below maybe?  If we
        // want to drop the restriction that the missing() members of a
        // state must never grow, then we'll need to move it inside.
        let mut now = {
            let dirmgr = upgrade_weak_ref(&dirmgr)?;
            let mut changed = false;
            trace!(attempt=%attempt_id, state=%state.describe(),"Attempting to load directory information from cache.");
            let load_result = load_once(&dirmgr, state, attempt_id, &mut changed);
            trace!(attempt=%attempt_id, state=%state.describe(), outcome=?load_result, "Load attempt complete.");
            if let Err(e) = &load_result {
                // If the load failed but the error can be blamed on a directory
                // cache, do so.
                if let Some(source) = e.responsible_cache() {
                    dirmgr.note_errors(attempt_id, 1);
                    note_cache_error(dirmgr.circmgr()?.deref(), source, e);
                }
            }
            propagate_fatal_errors!(load_result);
            dirmgr.runtime.wallclock()
        };

        // Apply any netdir changes that the state gives us.
        // TODO(eta): Consider deprecating state.is_ready().
        {
            let dirmgr = upgrade_weak_ref(&dirmgr)?;
            let mut store = dirmgr.store.lock().expect("store lock poisoned");
            dirmgr.apply_netdir_changes(state, &mut **store)?;
            dirmgr.update_progress(attempt_id, state.bootstrap_progress());
        }
        // Skip the downloads if we can...
        if state.can_advance() {
            advance(state);
            trace!(attempt=%attempt_id, state=%state.describe(), "State has advanced.");
            continue 'next_state;
        }
        if state.is_ready(Readiness::Complete) {
            trace!(attempt=%attempt_id, state=%state.describe(), "Directory is now Complete.");
            return Ok(());
        }

        // Deadline after which we must reset this state rather than retry it.
        let reset_time = no_more_than_a_week_from(runtime.wallclock(), state.reset_time());

        let mut retry = retry_config.schedule();
        // `delay` is None only before the first attempt; see the comment in
        // the loop below.
        let mut delay = None;

        // Make several attempts to fetch whatever we're missing,
        // until either we can advance, or we've got a complete
        // document, or we run out of tries, or we run out of time.
        'next_attempt: for attempt in retry_config.attempts() {
            // We wait at the start of this loop, on all attempts but the first.
            // This ensures that we always wait between attempts, but not after
            // the final attempt.
            let next_delay = retry.next_delay(&mut rand::rng());
            if let Some(delay) = delay.replace(next_delay) {
                // Never sleep past the reset deadline.
                let time_until_reset = {
                    reset_time
                        .duration_since(now)
                        .unwrap_or(Duration::from_secs(0))
                };
                let real_delay = delay.min(time_until_reset);
                debug!(attempt=%attempt_id, "Waiting {:?} for next download attempt...", real_delay);
                schedule.sleep(real_delay).await?;

                now = upgrade_weak_ref(&dirmgr)?.runtime.wallclock();
                if now >= reset_time {
                    info!(attempt=%attempt_id, "Directory being fetched is now outdated; resetting download state.");
                    reset(state);
                    continue 'next_state;
                }
            }

            info!(attempt=%attempt_id, "{}: {}", attempt + 1, state.describe());
            // Recompute the deadline: the state's reset time may have moved.
            let reset_time = no_more_than_a_week_from(now, state.reset_time());

            now = {
                let dirmgr = upgrade_weak_ref(&dirmgr)?;
                // Race the download attempt against the reset deadline.
                futures::select_biased! {
                    outcome = download_attempt(&dirmgr, state, parallelism.into(), attempt_id).fuse() => {
                        if let Err(e) = outcome {
                            warn_report!(e, attempt=%attempt_id, "Error while downloading.");
                            propagate_fatal_errors!(Err(e));
                            continue 'next_attempt;
                        } else {
                            trace!(attempt=%attempt_id, "Successfully downloaded some information.");
                        }
                    }
                    _ = schedule.sleep_until_wallclock(reset_time).fuse() => {
                        // We need to reset. This can happen if (for
                        // example) we're downloading the last few
                        // microdescriptors on a consensus that now
                        // we're ready to replace.
                        info!(attempt=%attempt_id, "Directory being fetched is now outdated; resetting download state.");
                        reset(state);
                        continue 'next_state;
                    },
                };
                dirmgr.runtime.wallclock()
            };

            // Apply any netdir changes that the state gives us.
            // TODO(eta): Consider deprecating state.is_ready().
            {
                let dirmgr = upgrade_weak_ref(&dirmgr)?;
                let mut store = dirmgr.store.lock().expect("store lock poisoned");
                let outcome = dirmgr.apply_netdir_changes(state, &mut **store);
                dirmgr.update_progress(attempt_id, state.bootstrap_progress());
                propagate_fatal_errors!(outcome);
            }

            // Exit if there is nothing more to download.
            if state.is_ready(Readiness::Complete) {
                trace!(attempt=%attempt_id, state=%state.describe(), "Directory is now Complete.");
                return Ok(());
            }

            // Report usable-ness if appropriate.
            if on_usable.is_some() && state.is_ready(Readiness::Usable) {
                trace!(attempt=%attempt_id, state=%state.describe(), "Directory is now Usable.");
                // Unwrap should be safe due to parent `.is_some()` check
                #[allow(clippy::unwrap_used)]
                let _ = on_usable.take().unwrap().send(());
            }

            if state.can_advance() {
                // We have enough info to advance to another state.
                advance(state);
                trace!(attempt=%attempt_id, state=%state.describe(), "State has advanced.");
                continue 'next_state;
            }
        }

        // We didn't advance the state, after all the retries.
        warn!(n_attempts=retry_config.n_attempts(),
              state=%state.describe(),
              "Unable to advance downloading state");
        return Err(Error::CantAdvanceState);
    }
}
702
703/// Replace `state` with `state.reset()`.
704fn reset(state: &mut Box<dyn DirState>) {
705    let cur_state = std::mem::replace(state, Box::new(PoisonedState));
706    *state = cur_state.reset();
707}
708
709/// Replace `state` with `state.advance()`.
710fn advance(state: &mut Box<dyn DirState>) {
711    let cur_state = std::mem::replace(state, Box::new(PoisonedState));
712    *state = cur_state.advance();
713}
714
/// Helper: Clamp `v` so that it is no more than one week from `now`.
///
/// If `v` is absent, return the time that's one week from now.
///
/// We use this to determine a reset time when no reset time is
/// available, or when it is too far in the future.
fn no_more_than_a_week_from(now: SystemTime, v: Option<SystemTime>) -> SystemTime {
    let limit = now + Duration::from_secs(7 * 86400);
    v.map_or(limit, |t| t.min(limit))
}
728
729#[cfg(test)]
730mod test {
731    // @@ begin test lint list maintained by maint/add_warning @@
732    #![allow(clippy::bool_assert_comparison)]
733    #![allow(clippy::clone_on_copy)]
734    #![allow(clippy::dbg_macro)]
735    #![allow(clippy::mixed_attributes_style)]
736    #![allow(clippy::print_stderr)]
737    #![allow(clippy::print_stdout)]
738    #![allow(clippy::single_char_pattern)]
739    #![allow(clippy::unwrap_used)]
740    #![allow(clippy::unchecked_time_subtraction)]
741    #![allow(clippy::useless_vec)]
742    #![allow(clippy::needless_pass_by_value)]
743    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
744    use super::*;
745    use crate::storage::DynStore;
746    use crate::test::new_mgr;
747    use std::sync::Mutex;
748    use tor_dircommon::retry::DownloadSchedule;
749    use tor_netdoc::doc::microdesc::MdDigest;
750    use tor_rtcompat::SleepProvider;
751    use web_time_compat::SystemTimeExt;
752
753    #[test]
754    fn week() {
755        let now = SystemTime::get();
756        let one_day = Duration::new(86400, 0);
757
758        assert_eq!(no_more_than_a_week_from(now, None), now + one_day * 7);
759        assert_eq!(
760            no_more_than_a_week_from(now, Some(now + one_day)),
761            now + one_day
762        );
763        assert_eq!(
764            no_more_than_a_week_from(now, Some(now - one_day)),
765            now - one_day
766        );
767        assert_eq!(
768            no_more_than_a_week_from(now, Some(now + 30 * one_day)),
769            now + one_day * 7
770        );
771    }
772
    /// A fake implementation of DirState that just wants a fixed set
    /// of microdescriptors.  It doesn't care if it gets them: it just
    /// wants to be told that the IDs exist.
    #[derive(Debug, Clone)]
    struct DemoState {
        // True for the second stage of the demo (see `new2`); affects
        // readiness reporting and whether the state can advance.
        second_time_around: bool,
        // Map from each wanted microdescriptor digest to whether we have
        // been told about it yet.
        got_items: HashMap<MdDigest, bool>,
    }
781
    // Constants from Lou Reed: arbitrary byte strings used as fake
    // microdescriptor digests for the DemoState tests.
    const H1: MdDigest = *b"satellite's gone up to the skies";
    const H2: MdDigest = *b"things like that drive me out of";
    const H3: MdDigest = *b"my mind i watched it for a littl";
    const H4: MdDigest = *b"while i like to watch things on ";
    const H5: MdDigest = *b"TV Satellite of love Satellite--";
788
789    impl DemoState {
790        fn new1() -> Self {
791            DemoState {
792                second_time_around: false,
793                got_items: vec![(H1, false), (H2, false)].into_iter().collect(),
794            }
795        }
796        fn new2() -> Self {
797            DemoState {
798                second_time_around: true,
799                got_items: vec![(H3, false), (H4, false), (H5, false)]
800                    .into_iter()
801                    .collect(),
802            }
803        }
804        fn n_ready(&self) -> usize {
805            self.got_items.values().filter(|x| **x).count()
806        }
807    }
808
809    impl DirState for DemoState {
810        fn describe(&self) -> String {
811            format!("{:?}", &self)
812        }
813        fn bootstrap_progress(&self) -> crate::event::DirProgress {
814            crate::event::DirProgress::default()
815        }
816        fn is_ready(&self, ready: Readiness) -> bool {
817            match (ready, self.second_time_around) {
818                (_, false) => false,
819                (Readiness::Complete, true) => self.n_ready() == self.got_items.len(),
820                (Readiness::Usable, true) => self.n_ready() >= self.got_items.len() - 1,
821            }
822        }
823        fn can_advance(&self) -> bool {
824            if self.second_time_around {
825                false
826            } else {
827                self.n_ready() == self.got_items.len()
828            }
829        }
830        fn missing_docs(&self) -> Vec<DocId> {
831            self.got_items
832                .iter()
833                .filter_map(|(id, have)| {
834                    if *have {
835                        None
836                    } else {
837                        Some(DocId::Microdesc(*id))
838                    }
839                })
840                .collect()
841        }
842        fn add_from_cache(
843            &mut self,
844            docs: HashMap<DocId, DocumentText>,
845            changed: &mut bool,
846        ) -> Result<()> {
847            for id in docs.keys() {
848                if let DocId::Microdesc(id) = id {
849                    if self.got_items.get(id) == Some(&false) {
850                        self.got_items.insert(*id, true);
851                        *changed = true;
852                    }
853                }
854            }
855            Ok(())
856        }
857        fn add_from_download(
858            &mut self,
859            text: &str,
860            _request: &ClientRequest,
861            _source: DocSource,
862            _storage: Option<&Mutex<DynStore>>,
863            changed: &mut bool,
864        ) -> Result<()> {
865            for token in text.split_ascii_whitespace() {
866                if let Ok(v) = hex::decode(token) {
867                    if let Ok(id) = v.try_into() {
868                        if self.got_items.get(&id) == Some(&false) {
869                            self.got_items.insert(id, true);
870                            *changed = true;
871                        }
872                    }
873                }
874            }
875            Ok(())
876        }
877        fn dl_config(&self) -> DownloadSchedule {
878            DownloadSchedule::default()
879        }
880        fn advance(self: Box<Self>) -> Box<dyn DirState> {
881            if self.can_advance() {
882                Box::new(Self::new2())
883            } else {
884                self
885            }
886        }
887        fn reset_time(&self) -> Option<SystemTime> {
888            None
889        }
890        fn reset(self: Box<Self>) -> Box<dyn DirState> {
891            Box::new(Self::new1())
892        }
893    }
894
895    #[test]
896    fn all_in_cache() {
897        // Let's try bootstrapping when everything is in the cache.
898        tor_rtcompat::test_with_one_runtime!(|rt| async {
899            let now = rt.wallclock();
900            let (_tempdir, mgr) = new_mgr(rt.clone());
901            let (mut schedule, _handle) = TaskSchedule::new(rt);
902
903            {
904                let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
905                for h in [H1, H2, H3, H4, H5] {
906                    store.store_microdescs(&[("ignore", &h)], now).unwrap();
907                }
908            }
909            let mgr = Arc::new(mgr);
910            let attempt_id = AttemptId::next();
911
912            // Try just a load.
913            let state = Box::new(DemoState::new1());
914            let result = super::load(&mgr, state, attempt_id).unwrap();
915            assert!(result.is_ready(Readiness::Complete));
916
917            // Try a bootstrap that could (but won't!) download.
918            let mut state: Box<dyn DirState> = Box::new(DemoState::new1());
919
920            let mut on_usable = None;
921            super::download(
922                Arc::downgrade(&mgr),
923                &mut state,
924                &mut schedule,
925                attempt_id,
926                &mut on_usable,
927            )
928            .await
929            .unwrap();
930            assert!(state.is_ready(Readiness::Complete));
931        });
932    }
933
934    #[test]
935    fn partly_in_cache() {
936        // Let's try bootstrapping with all of phase1 and part of
937        // phase 2 in cache.
938        tor_rtcompat::test_with_one_runtime!(|rt| async {
939            let now = rt.wallclock();
940            let (_tempdir, mgr) = new_mgr(rt.clone());
941            let (mut schedule, _handle) = TaskSchedule::new(rt);
942
943            {
944                let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
945                for h in [H1, H2, H3] {
946                    store.store_microdescs(&[("ignore", &h)], now).unwrap();
947                }
948            }
949            {
950                let mut resp = CANNED_RESPONSE.lock().unwrap();
951                // H4 and H5.
952                *resp = vec![
953                    "7768696c652069206c696b6520746f207761746368207468696e6773206f6e20
954                     545620536174656c6c697465206f66206c6f766520536174656c6c6974652d2d"
955                        .to_owned(),
956                ];
957            }
958            let mgr = Arc::new(mgr);
959            let mut on_usable = None;
960            let attempt_id = AttemptId::next();
961
962            let mut state: Box<dyn DirState> = Box::new(DemoState::new1());
963            super::download(
964                Arc::downgrade(&mgr),
965                &mut state,
966                &mut schedule,
967                attempt_id,
968                &mut on_usable,
969            )
970            .await
971            .unwrap();
972            assert!(state.is_ready(Readiness::Complete));
973        });
974    }
975}