1use std::num::NonZeroUsize;
5use std::ops::Deref;
6use std::{
7 collections::HashMap,
8 sync::{Arc, Weak},
9 time::{Duration, SystemTime},
10};
11
12use crate::DirMgrConfig;
13use crate::DocSource;
14use crate::err::BootstrapAction;
15use crate::state::{DirState, PoisonedState};
16use crate::{
17 DirMgr, DocId, DocQuery, DocumentText, Error, Readiness, Result,
18 docid::{self, ClientRequest},
19 upgrade_weak_ref,
20};
21
22use futures::FutureExt;
23use futures::StreamExt;
24use oneshot_fused_workaround as oneshot;
25use tor_dirclient::DirResponse;
26use tor_error::{info_report, warn_report};
27use tor_rtcompat::Runtime;
28use tor_rtcompat::scheduler::TaskSchedule;
29use tracing::{debug, info, instrument, trace, warn};
30
31use crate::storage::Store;
32#[cfg(test)]
33use std::sync::LazyLock;
34#[cfg(test)]
35use std::sync::Mutex;
36use tor_circmgr::{CircMgr, DirInfo};
37use tor_netdir::{NetDir, NetDirProvider as _};
38use tor_netdoc::doc::netstatus::ConsensusFlavor;
39
/// If the `Result<()>` in `$e` is a fatal error, return it from the
/// enclosing function; nonfatal errors (and `Ok`) are silently dropped,
/// on the assumption that they have already been logged or handled.
///
/// Only usable inside a function that returns `Result<()>` (or a type
/// compatible with `return Err(e)`).
macro_rules! propagate_fatal_errors {
    ( $e:expr ) => {
        let v: Result<()> = $e;
        if let Err(e) = v {
            match e.bootstrap_action() {
                // Nonfatal: keep going with the bootstrap attempt.
                BootstrapAction::Nonfatal => {}
                // Fatal or Reset: abort, letting the caller decide what to do.
                _ => return Err(e),
            }
        }
    };
}
53
/// Identifier for one attempt to bootstrap a directory.
///
/// Each attempt receives a unique, monotonically increasing id (see
/// [`AttemptId::next`]), used to correlate log messages and progress
/// reports belonging to the same attempt.
#[derive(Copy, Clone, Debug, derive_more::Display, Eq, PartialEq, Ord, PartialOrd)]
#[display("{0}", id)]
pub(crate) struct AttemptId {
    /// The sequence number of this attempt; never zero.
    id: NonZeroUsize,
}
66
67impl AttemptId {
68 pub(crate) fn next() -> Self {
75 use std::sync::atomic::{AtomicUsize, Ordering};
76 static NEXT: AtomicUsize = AtomicUsize::new(1);
78 let id = NEXT.fetch_add(1, Ordering::Relaxed);
79 let id = id.try_into().expect("Allocated too many AttemptIds");
80 Self { id }
81 }
82}
83
/// Record in `circmgr` the outcome of a single directory request, so that
/// misbehaving caches can be noted and avoided in the future.
fn note_request_outcome<R: Runtime>(
    circmgr: &CircMgr<R>,
    outcome: &tor_dirclient::Result<tor_dirclient::DirResponse>,
) {
    use tor_dirclient::{Error::RequestFailed, RequestFailedError};
    // Extract the error and the source to blame for it, if any.
    let (err, source) = match outcome {
        Ok(req) => {
            if let (Some(e), Some(source)) = (req.error(), req.source()) {
                // The request "succeeded" overall but carries a partial error
                // attributed to a known source: treat it as a failure.
                (
                    RequestFailed(RequestFailedError {
                        error: e.clone(),
                        source: Some(source.clone()),
                    }),
                    source,
                )
            } else {
                // No error, or no source to blame: nothing to record.
                return;
            }
        }
        Err(
            error @ RequestFailed(RequestFailedError {
                source: Some(source),
                ..
            }),
        ) => (error.clone(), source),
        // Errors without an attributable source are not recorded here.
        _ => return,
    };

    note_cache_error(circmgr, source, &err.into());
}
122
/// Record in `circmgr` that `source` may be a problematic directory cache,
/// but only if `problem` actually indicates a cache-side failure.
fn note_cache_error<R: Runtime>(
    circmgr: &CircMgr<R>,
    source: &tor_dirclient::SourceInfo,
    problem: &Error,
) {
    use tor_circmgr::ExternalActivity;

    // Problems that aren't the cache's fault should not penalize the cache.
    if !problem.indicates_cache_failure() {
        return;
    }

    // If the netdoc error carries its own source information, blame that
    // server rather than the one the caller passed in.
    let real_source = match problem {
        Error::NetDocError {
            source: DocSource::DirServer { source: Some(info) },
            ..
        } => info,
        _ => source,
    };

    info_report!(problem, "Marking {:?} as failed", real_source);
    circmgr.note_external_failure(real_source.cache_id(), ExternalActivity::DirCache);
    // Retire the circuit we used for the original request: even if a
    // different server was ultimately at fault, this circuit gave us trouble.
    circmgr.retire_circ(source.unique_circ_id());
}
152
153fn note_cache_success<R: Runtime>(circmgr: &CircMgr<R>, source: &tor_dirclient::SourceInfo) {
155 use tor_circmgr::ExternalActivity;
156
157 trace!("Marking {:?} as successful", source);
158 circmgr.note_external_success(source.cache_id(), ExternalActivity::DirCache);
159}
160
161fn load_and_apply_documents<R: Runtime>(
163 missing: &[DocId],
164 dirmgr: &Arc<DirMgr<R>>,
165 state: &mut Box<dyn DirState>,
166 changed: &mut bool,
167) -> Result<()> {
168 const CHUNK_SIZE: usize = 256;
173 for chunk in missing.chunks(CHUNK_SIZE) {
174 let documents = {
175 let store = dirmgr.store.lock().expect("store lock poisoned");
176 load_documents_from_store(chunk, &**store)?
177 };
178
179 state.add_from_cache(documents, changed)?;
180 }
181
182 Ok(())
183}
184
185fn load_documents_from_store(
188 missing: &[DocId],
189 store: &dyn Store,
190) -> Result<HashMap<DocId, DocumentText>> {
191 let mut loaded = HashMap::new();
192 for query in docid::partition_by_type(missing.iter().copied()).values() {
193 query.load_from_store_into(&mut loaded, store)?;
194 }
195 Ok(loaded)
196}
197
/// Construct a [`ClientRequest`] for downloading a consensus of flavor
/// `flavor`, based on what (if anything) we already have in `store`.
pub(crate) fn make_consensus_request(
    now: SystemTime,
    flavor: ConsensusFlavor,
    store: &dyn Store,
    config: &DirMgrConfig,
) -> Result<ClientRequest> {
    let mut request = tor_dirclient::request::ConsensusRequest::new(flavor);

    // The oldest valid-after date we'll accept regardless of cache contents.
    let default_cutoff = crate::default_consensus_cutoff(now, &config.tolerance)?;

    match store.latest_consensus_meta(flavor) {
        Ok(Some(meta)) => {
            // We have a cached consensus: only ask for something newer, and
            // tell the server which consensus we hold so it can answer with
            // a diff (or nothing at all).
            let valid_after = meta.lifetime().valid_after();
            request.set_last_consensus_date(std::cmp::max(valid_after, default_cutoff));
            request.push_old_consensus_digest(*meta.sha3_256_of_signed());
        }
        latest => {
            // Either nothing is cached, or the cache lookup failed; in the
            // error case, log it and proceed as if the cache were empty.
            if let Err(e) = latest {
                warn_report!(e, "Error loading directory metadata");
            }
            request.set_last_consensus_date(default_cutoff);
        }
    }

    // Tell the directory how much clock skew (in both directions) we are
    // configured to tolerate.
    request.set_skew_limit(
        config.tolerance.post_valid_tolerance(),
        config.tolerance.pre_valid_tolerance(),
    );

    Ok(ClientRequest::Consensus(request))
}
238
/// Construct a set of directory requests sufficient to download every
/// document listed in `docs`.
pub(crate) fn make_requests_for_documents<R: Runtime>(
    rt: &R,
    docs: &[DocId],
    store: &dyn Store,
    config: &DirMgrConfig,
) -> Result<Vec<ClientRequest>> {
    let mut res = Vec::new();
    // Group the DocIds by type, then split each group into chunks small
    // enough to request in a single download.
    for q in docid::partition_by_type(docs.iter().copied())
        .into_values()
        .flat_map(|x| x.split_for_download().into_iter())
    {
        match q {
            DocQuery::LatestConsensus { flavor, .. } => {
                res.push(make_consensus_request(
                    rt.wallclock(),
                    flavor,
                    store,
                    config,
                )?);
            }
            DocQuery::AuthCert(ids) => {
                res.push(ClientRequest::AuthCert(ids.into_iter().collect()));
            }
            DocQuery::Microdesc(ids) => {
                res.push(ClientRequest::Microdescs(ids.into_iter().collect()));
            }
            #[cfg(feature = "routerdesc")]
            DocQuery::RouterDesc(ids) => {
                res.push(ClientRequest::RouterDescs(ids.into_iter().collect()));
            }
        }
    }
    Ok(res)
}
274
275#[instrument(level = "trace", skip_all)]
277async fn fetch_single<R: Runtime>(
278 rt: &R,
279 request: ClientRequest,
280 current_netdir: Option<&NetDir>,
281 circmgr: Arc<CircMgr<R>>,
282) -> Result<(ClientRequest, DirResponse)> {
283 let dirinfo: DirInfo = match current_netdir {
284 Some(netdir) => netdir.into(),
285 None => tor_circmgr::DirInfo::Nothing,
286 };
287 let outcome =
288 tor_dirclient::get_resource(request.as_requestable(), dirinfo, rt, circmgr.clone()).await;
289
290 note_request_outcome(&circmgr, &outcome);
291
292 let resource = outcome?;
293 Ok((request, resource))
294}
295
/// Testing-only: canned response bodies. When this list is nonempty,
/// `fetch_multiple` returns these instead of performing real downloads.
#[cfg(test)]
static CANNED_RESPONSE: LazyLock<Mutex<Vec<String>>> = LazyLock::new(|| Mutex::new(vec![]));
303
/// Launch download requests for every document in `missing`, running at
/// most `parallelism` requests concurrently, and return each request paired
/// with the (successful) response it received.
#[allow(clippy::cognitive_complexity)]
#[instrument(level = "trace", skip_all)]
async fn fetch_multiple<R: Runtime>(
    dirmgr: Arc<DirMgr<R>>,
    attempt_id: AttemptId,
    missing: &[DocId],
    parallelism: usize,
) -> Result<Vec<(ClientRequest, DirResponse)>> {
    let requests = {
        let store = dirmgr.store.lock().expect("store lock poisoned");
        make_requests_for_documents(&dirmgr.runtime, missing, &**store, &dirmgr.config.get())?
    };

    trace!(attempt=%attempt_id, "Launching {} requests for {} documents",
        requests.len(), missing.len());

    // In tests, short-circuit with canned responses when any are installed.
    #[cfg(test)]
    {
        let m = CANNED_RESPONSE.lock().expect("Poisoned mutex");
        if !m.is_empty() {
            return Ok(requests
                .into_iter()
                .zip(m.iter().map(DirResponse::from_get_body))
                .collect());
        }
    }

    let circmgr = dirmgr.circmgr()?;
    // Use a timely netdir to choose caches, if we have one.
    let netdir = dirmgr.netdir(tor_netdir::Timeliness::Timely).ok();

    // Run all the requests with bounded parallelism, gathering every outcome.
    let responses: Vec<Result<(ClientRequest, DirResponse)>> = futures::stream::iter(requests)
        .map(|query| fetch_single(&dirmgr.runtime, query, netdir.as_deref(), circmgr.clone()))
        .buffer_unordered(parallelism)
        .collect()
        .await;

    // Keep only the HTTP-200 responses; log everything else.
    let mut useful_responses = Vec::new();
    for r in responses {
        match r {
            Ok((request, response)) => {
                if response.status_code() == 200 {
                    useful_responses.push((request, response));
                } else {
                    trace!(
                        "cache declined request; reported status {:?}",
                        response.status_code()
                    );
                }
            }
            Err(e) => warn_report!(e, "error while downloading"),
        }
    }

    trace!(attempt=%attempt_id, "received {} useful responses from our requests.", useful_responses.len());

    Ok(useful_responses)
}
369
370fn load_once<R: Runtime>(
372 dirmgr: &Arc<DirMgr<R>>,
373 state: &mut Box<dyn DirState>,
374 attempt_id: AttemptId,
375 changed_out: &mut bool,
376) -> Result<()> {
377 let missing = state.missing_docs();
378 let mut changed = false;
379 let outcome: Result<()> = if missing.is_empty() {
380 trace!("Found no missing documents; can't advance current state");
381 Ok(())
382 } else {
383 trace!(
384 "Found {} missing documents; trying to load them",
385 missing.len()
386 );
387
388 load_and_apply_documents(&missing, dirmgr, state, &mut changed)
389 };
390
391 if changed {
395 dirmgr.update_progress(attempt_id, state.bootstrap_progress());
396 *changed_out = true;
397 }
398
399 outcome
400}
401
/// Advance `state` as far as possible using only information already in the
/// cache, returning the resulting state.
///
/// Fatal and Reset-level errors are returned to the caller; nonfatal errors
/// are logged and ignored.
#[allow(clippy::cognitive_complexity)]
pub(crate) fn load<R: Runtime>(
    dirmgr: &Arc<DirMgr<R>>,
    mut state: Box<dyn DirState>,
    attempt_id: AttemptId,
) -> Result<Box<dyn DirState>> {
    // Guards against looping forever in a state that keeps "changing"
    // without ever advancing.
    let mut safety_counter = 0_usize;
    loop {
        trace!(attempt=%attempt_id, state=%state.describe(), "Loading from cache");
        let mut changed = false;
        let outcome = load_once(dirmgr, &mut state, attempt_id, &mut changed);
        {
            let mut store = dirmgr.store.lock().expect("store lock poisoned");
            dirmgr.apply_netdir_changes(&mut state, &mut **store)?;
            dirmgr.update_progress(attempt_id, state.bootstrap_progress());
        }
        trace!(attempt=%attempt_id, ?outcome, "Load operation completed.");

        if let Err(e) = outcome {
            match e.bootstrap_action() {
                BootstrapAction::Nonfatal => {
                    debug!("Recoverable error loading from cache: {}", e);
                }
                BootstrapAction::Fatal | BootstrapAction::Reset => {
                    return Err(e);
                }
            }
        }

        if state.can_advance() {
            state = state.advance();
            trace!(attempt=%attempt_id, state=state.describe(), "State has advanced.");
            safety_counter = 0;
        } else {
            if !changed {
                // Nothing new was loaded and we can't advance: the cache has
                // nothing more to offer this state.
                trace!(attempt=%attempt_id, state=state.describe(), "No state advancement after load; nothing more to find in the cache.");
                break;
            }
            safety_counter += 1;
            assert!(
                safety_counter < 100,
                "Spent 100 iterations in the same state: this is a bug"
            );
        }
    }

    Ok(state)
}
456
/// Make a single attempt to download all the documents `state` is missing,
/// and feed every successfully fetched document into the state object.
///
/// Records per-cache successes and failures with the circuit manager, and
/// reports error counts and progress on `dirmgr`.
#[allow(clippy::cognitive_complexity)]
#[instrument(level = "trace", skip_all)]
async fn download_attempt<R: Runtime>(
    dirmgr: &Arc<DirMgr<R>>,
    state: &mut Box<dyn DirState>,
    parallelism: usize,
    attempt_id: AttemptId,
) -> Result<()> {
    let missing = state.missing_docs();
    let fetched = fetch_multiple(Arc::clone(dirmgr), attempt_id, &missing, parallelism).await?;
    let mut n_errors = 0;
    for (client_req, dir_response) in fetched {
        let source = dir_response.source().cloned();
        // Decode the response body as UTF-8, blaming the cache on failure.
        let text = match String::from_utf8(dir_response.into_output_unchecked())
            .map_err(Error::BadUtf8FromDirectory)
        {
            Ok(t) => t,
            Err(e) => {
                if let Some(source) = source {
                    n_errors += 1;
                    note_cache_error(dirmgr.circmgr()?.deref(), &source, &e);
                }
                continue;
            }
        };
        // Expand the response (e.g. apply a consensus diff) before use.
        match dirmgr.expand_response_text(&client_req, text) {
            Ok(text) => {
                let doc_source = DocSource::DirServer {
                    source: source.clone(),
                };
                let mut changed = false;
                let outcome = state.add_from_download(
                    &text,
                    &client_req,
                    doc_source,
                    Some(&dirmgr.store),
                    &mut changed,
                );

                if !changed {
                    // A download that changed nothing should have reported
                    // an error.
                    debug_assert!(outcome.is_err());
                }

                // Record cache success/failure for future circuit selection.
                if let Some(source) = source {
                    if let Err(e) = &outcome {
                        n_errors += 1;
                        note_cache_error(dirmgr.circmgr()?.deref(), &source, e);
                    } else {
                        note_cache_success(dirmgr.circmgr()?.deref(), &source);
                    }
                }

                if let Err(e) = &outcome {
                    dirmgr.note_errors(attempt_id, 1);
                    warn_report!(e, "error while adding directory info");
                }
                propagate_fatal_errors!(outcome);
            }
            Err(e) => {
                warn_report!(e, "Error when expanding directory text");
                if let Some(source) = source {
                    n_errors += 1;
                    note_cache_error(dirmgr.circmgr()?.deref(), &source, &e);
                }
                propagate_fatal_errors!(Err(e));
            }
        }
    }
    if n_errors != 0 {
        dirmgr.note_errors(attempt_id, n_errors);
    }
    dirmgr.update_progress(attempt_id, state.bootstrap_progress());

    Ok(())
}
537
/// Drive `state` toward completion by alternating cache loads and download
/// attempts, following the state's retry schedule.
///
/// Resets the state when the directory being fetched becomes outdated, and
/// advances it whenever possible. Signals `on_usable` (once) as soon as the
/// directory becomes usable. Returns when the directory is complete, when a
/// fatal error occurs, or when all attempts are exhausted.
#[allow(clippy::cognitive_complexity)]
#[instrument(level = "trace", skip_all)]
pub(crate) async fn download<R: Runtime>(
    dirmgr: Weak<DirMgr<R>>,
    state: &mut Box<dyn DirState>,
    schedule: &mut TaskSchedule<R>,
    attempt_id: AttemptId,
    on_usable: &mut Option<oneshot::Sender<()>>,
) -> Result<()> {
    let runtime = upgrade_weak_ref(&dirmgr)?.runtime.clone();

    trace!(attempt=%attempt_id, state=%state.describe(), "Trying to download directory material.");

    'next_state: loop {
        let retry_config = state.dl_config();
        let parallelism = retry_config.parallelism();

        // Before downloading anything, load whatever we can from the cache.
        // (The weak dirmgr reference is upgraded only for the duration of
        // each step, so the DirMgr can be dropped while we sleep.)
        let mut now = {
            let dirmgr = upgrade_weak_ref(&dirmgr)?;
            let mut changed = false;
            trace!(attempt=%attempt_id, state=%state.describe(),"Attempting to load directory information from cache.");
            let load_result = load_once(&dirmgr, state, attempt_id, &mut changed);
            trace!(attempt=%attempt_id, state=%state.describe(), outcome=?load_result, "Load attempt complete.");
            if let Err(e) = &load_result {
                // If a cache is responsible for this error, penalize it.
                if let Some(source) = e.responsible_cache() {
                    dirmgr.note_errors(attempt_id, 1);
                    note_cache_error(dirmgr.circmgr()?.deref(), source, e);
                }
            }
            propagate_fatal_errors!(load_result);
            dirmgr.runtime.wallclock()
        };

        // Apply any netdir changes that resulted from the load.
        {
            let dirmgr = upgrade_weak_ref(&dirmgr)?;
            let mut store = dirmgr.store.lock().expect("store lock poisoned");
            dirmgr.apply_netdir_changes(state, &mut **store)?;
            dirmgr.update_progress(attempt_id, state.bootstrap_progress());
        }
        if state.can_advance() {
            advance(state);
            trace!(attempt=%attempt_id, state=%state.describe(), "State has advanced.");
            continue 'next_state;
        }
        if state.is_ready(Readiness::Complete) {
            trace!(attempt=%attempt_id, state=%state.describe(), "Directory is now Complete.");
            return Ok(());
        }

        // When the state becomes outdated we restart it; never wait more
        // than a week for that to happen.
        let reset_time = no_more_than_a_week_from(runtime.wallclock(), state.reset_time());

        let mut retry = retry_config.schedule();
        let mut delay = None;

        // Make repeated download attempts, with delays between them.
        'next_attempt: for attempt in retry_config.attempts() {
            // Compute the next delay now, but sleep only from the second
            // attempt onward (`delay` is None on the first pass).
            let next_delay = retry.next_delay(&mut rand::rng());
            if let Some(delay) = delay.replace(next_delay) {
                let time_until_reset = {
                    reset_time
                        .duration_since(now)
                        .unwrap_or(Duration::from_secs(0))
                };
                // Never sleep past the reset time.
                let real_delay = delay.min(time_until_reset);
                debug!(attempt=%attempt_id, "Waiting {:?} for next download attempt...", real_delay);
                schedule.sleep(real_delay).await?;

                now = upgrade_weak_ref(&dirmgr)?.runtime.wallclock();
                if now >= reset_time {
                    info!(attempt=%attempt_id, "Directory being fetched is now outdated; resetting download state.");
                    reset(state);
                    continue 'next_state;
                }
            }

            info!(attempt=%attempt_id, "{}: {}", attempt + 1, state.describe());
            // Recompute the reset time: it may have moved since the load.
            let reset_time = no_more_than_a_week_from(now, state.reset_time());

            now = {
                let dirmgr = upgrade_weak_ref(&dirmgr)?;
                // Run the download attempt, racing against the reset time.
                futures::select_biased! {
                    outcome = download_attempt(&dirmgr, state, parallelism.into(), attempt_id).fuse() => {
                        if let Err(e) = outcome {
                            warn_report!(e, attempt=%attempt_id, "Error while downloading.");
                            propagate_fatal_errors!(Err(e));
                            continue 'next_attempt;
                        } else {
                            trace!(attempt=%attempt_id, "Successfully downloaded some information.");
                        }
                    }
                    _ = schedule.sleep_until_wallclock(reset_time).fuse() => {
                        // The directory we were downloading is too old to use.
                        info!(attempt=%attempt_id, "Directory being fetched is now outdated; resetting download state.");
                        reset(state);
                        continue 'next_state;
                    },
                };
                dirmgr.runtime.wallclock()
            };

            // Apply any netdir changes produced by this attempt's downloads.
            {
                let dirmgr = upgrade_weak_ref(&dirmgr)?;
                let mut store = dirmgr.store.lock().expect("store lock poisoned");
                let outcome = dirmgr.apply_netdir_changes(state, &mut **store);
                dirmgr.update_progress(attempt_id, state.bootstrap_progress());
                propagate_fatal_errors!(outcome);
            }

            if state.is_ready(Readiness::Complete) {
                trace!(attempt=%attempt_id, state=%state.describe(), "Directory is now Complete.");
                return Ok(());
            }

            if on_usable.is_some() && state.is_ready(Readiness::Usable) {
                trace!(attempt=%attempt_id, state=%state.describe(), "Directory is now Usable.");
                // Notify whoever is waiting for a usable directory.
                // (unwrap is safe: guarded by the `is_some()` check above.)
                #[allow(clippy::unwrap_used)]
                let _ = on_usable.take().unwrap().send(());
            }

            if state.can_advance() {
                advance(state);
                trace!(attempt=%attempt_id, state=%state.describe(), "State has advanced.");
                continue 'next_state;
            }
        }

        // All attempts exhausted without completing or advancing.
        warn!(n_attempts=retry_config.n_attempts(),
            state=%state.describe(),
            "Unable to advance downloading state");
        return Err(Error::CantAdvanceState);
    }
}
702
703fn reset(state: &mut Box<dyn DirState>) {
705 let cur_state = std::mem::replace(state, Box::new(PoisonedState));
706 *state = cur_state.reset();
707}
708
709fn advance(state: &mut Box<dyn DirState>) {
711 let cur_state = std::mem::replace(state, Box::new(PoisonedState));
712 *state = cur_state.advance();
713}
714
/// Clamp an optional reset time `v` so it is never more than one week after
/// `now`; with no reset time at all, return exactly one week from `now`.
fn no_more_than_a_week_from(now: SystemTime, v: Option<SystemTime>) -> SystemTime {
    let one_week_later = now + Duration::from_secs(86400 * 7);
    v.map_or(one_week_later, |t| t.min(one_week_later))
}
728
#[cfg(test)]
mod test {
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    // Unit tests for the cache-loading and download logic above.
    use super::*;
    use crate::storage::DynStore;
    use crate::test::new_mgr;
    use std::sync::Mutex;
    use tor_dircommon::retry::DownloadSchedule;
    use tor_netdoc::doc::microdesc::MdDigest;
    use tor_rtcompat::SleepProvider;
    use web_time_compat::SystemTimeExt;
    /// Check the clamping behavior of `no_more_than_a_week_from`.
    #[test]
    fn week() {
        let now = SystemTime::get();
        let one_day = Duration::new(86400, 0);

        // With no reset time, we get exactly one week out.
        assert_eq!(no_more_than_a_week_from(now, None), now + one_day * 7);
        // A near-future reset time passes through unchanged...
        assert_eq!(
            no_more_than_a_week_from(now, Some(now + one_day)),
            now + one_day
        );
        // ...and so does one in the past.
        assert_eq!(
            no_more_than_a_week_from(now, Some(now - one_day)),
            now - one_day
        );
        // A distant reset time is clamped to one week.
        assert_eq!(
            no_more_than_a_week_from(now, Some(now + 30 * one_day)),
            now + one_day * 7
        );
    }
772
    /// A fake `DirState` implementation for exercising `load` and
    /// `download` without real directory documents.
    ///
    /// In its first phase it "wants" the microdescriptor digests H1..H2;
    /// after advancing, it wants H3..H5.
    #[derive(Debug, Clone)]
    struct DemoState {
        // True once we have advanced to the second (H3..H5) phase.
        second_time_around: bool,
        // Each wanted digest, mapped to whether we have received it yet.
        got_items: HashMap<MdDigest, bool>,
    }

    // Fake 32-byte microdescriptor digests used as document ids below.
    const H1: MdDigest = *b"satellite's gone up to the skies";
    const H2: MdDigest = *b"things like that drive me out of";
    const H3: MdDigest = *b"my mind i watched it for a littl";
    const H4: MdDigest = *b"while i like to watch things on ";
    const H5: MdDigest = *b"TV Satellite of love Satellite--";
788
    impl DemoState {
        /// Construct the first-phase state, wanting H1 and H2.
        fn new1() -> Self {
            DemoState {
                second_time_around: false,
                got_items: vec![(H1, false), (H2, false)].into_iter().collect(),
            }
        }
        /// Construct the second-phase state, wanting H3, H4, and H5.
        fn new2() -> Self {
            DemoState {
                second_time_around: true,
                got_items: vec![(H3, false), (H4, false), (H5, false)]
                    .into_iter()
                    .collect(),
            }
        }
        /// Return how many of the wanted items have been received so far.
        fn n_ready(&self) -> usize {
            self.got_items.values().filter(|x| **x).count()
        }
    }
808
    impl DirState for DemoState {
        fn describe(&self) -> String {
            format!("{:?}", &self)
        }
        fn bootstrap_progress(&self) -> crate::event::DirProgress {
            crate::event::DirProgress::default()
        }
        fn is_ready(&self, ready: Readiness) -> bool {
            match (ready, self.second_time_around) {
                // Phase one is never "ready": it must advance to phase two.
                (_, false) => false,
                (Readiness::Complete, true) => self.n_ready() == self.got_items.len(),
                // "Usable" tolerates exactly one missing item.
                (Readiness::Usable, true) => self.n_ready() >= self.got_items.len() - 1,
            }
        }
        fn can_advance(&self) -> bool {
            if self.second_time_around {
                false
            } else {
                // Phase one advances once all its items have arrived.
                self.n_ready() == self.got_items.len()
            }
        }
        fn missing_docs(&self) -> Vec<DocId> {
            self.got_items
                .iter()
                .filter_map(|(id, have)| {
                    if *have {
                        None
                    } else {
                        Some(DocId::Microdesc(*id))
                    }
                })
                .collect()
        }
        fn add_from_cache(
            &mut self,
            docs: HashMap<DocId, DocumentText>,
            changed: &mut bool,
        ) -> Result<()> {
            // Mark as received every wanted microdescriptor id in `docs`.
            for id in docs.keys() {
                if let DocId::Microdesc(id) = id {
                    if self.got_items.get(id) == Some(&false) {
                        self.got_items.insert(*id, true);
                        *changed = true;
                    }
                }
            }
            Ok(())
        }
        fn add_from_download(
            &mut self,
            text: &str,
            _request: &ClientRequest,
            _source: DocSource,
            _storage: Option<&Mutex<DynStore>>,
            changed: &mut bool,
        ) -> Result<()> {
            // Treat each whitespace-separated token as a hex-encoded digest,
            // and mark any wanted one as received.
            for token in text.split_ascii_whitespace() {
                if let Ok(v) = hex::decode(token) {
                    if let Ok(id) = v.try_into() {
                        if self.got_items.get(&id) == Some(&false) {
                            self.got_items.insert(id, true);
                            *changed = true;
                        }
                    }
                }
            }
            Ok(())
        }
        fn dl_config(&self) -> DownloadSchedule {
            DownloadSchedule::default()
        }
        fn advance(self: Box<Self>) -> Box<dyn DirState> {
            if self.can_advance() {
                Box::new(Self::new2())
            } else {
                self
            }
        }
        fn reset_time(&self) -> Option<SystemTime> {
            None
        }
        fn reset(self: Box<Self>) -> Box<dyn DirState> {
            Box::new(Self::new1())
        }
    }
894
    /// With every "document" already in the cache, both `load` and
    /// `download` should reach `Readiness::Complete` without any fetches.
    #[test]
    fn all_in_cache() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let (_tempdir, mgr) = new_mgr(rt.clone());
            let (mut schedule, _handle) = TaskSchedule::new(rt);

            {
                // Preload all five demo documents into the store.
                let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
                for h in [H1, H2, H3, H4, H5] {
                    store.store_microdescs(&[("ignore", &h)], now).unwrap();
                }
            }
            let mgr = Arc::new(mgr);
            let attempt_id = AttemptId::next();

            // Loading alone should carry us through both phases.
            let state = Box::new(DemoState::new1());
            let result = super::load(&mgr, state, attempt_id).unwrap();
            assert!(result.is_ready(Readiness::Complete));

            // Downloading should also complete, satisfied purely from cache.
            let mut state: Box<dyn DirState> = Box::new(DemoState::new1());

            let mut on_usable = None;
            super::download(
                Arc::downgrade(&mgr),
                &mut state,
                &mut schedule,
                attempt_id,
                &mut on_usable,
            )
            .await
            .unwrap();
            assert!(state.is_ready(Readiness::Complete));
        });
    }
933
    /// With only some documents cached, `download` should obtain the rest
    /// (here via a canned response) and reach `Readiness::Complete`.
    #[test]
    fn partly_in_cache() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let (_tempdir, mgr) = new_mgr(rt.clone());
            let (mut schedule, _handle) = TaskSchedule::new(rt);

            {
                // Preload only H1..H3 into the store.
                let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
                for h in [H1, H2, H3] {
                    store.store_microdescs(&[("ignore", &h)], now).unwrap();
                }
            }
            {
                // Serve H4 and H5 (hex-encoded) as the canned "download".
                let mut resp = CANNED_RESPONSE.lock().unwrap();
                *resp = vec![
                    "7768696c652069206c696b6520746f207761746368207468696e6773206f6e20
 545620536174656c6c697465206f66206c6f766520536174656c6c6974652d2d"
                        .to_owned(),
                ];
            }
            let mgr = Arc::new(mgr);
            let mut on_usable = None;
            let attempt_id = AttemptId::next();

            let mut state: Box<dyn DirState> = Box::new(DemoState::new1());
            super::download(
                Arc::downgrade(&mgr),
                &mut state,
                &mut schedule,
                attempt_id,
                &mut on_usable,
            )
            .await
            .unwrap();
            assert!(state.is_ready(Readiness::Complete));
        });
    }
975}