Skip to main content

tor_guardmgr/
daemon.rs

1//! Implement background tasks used by guard managers.
2//!
3//! These background tasks keep a weak reference to the [`GuardMgrInner`]
4//! and use that to notice when they should shut down.
5
6use crate::GuardMgrInner;
7use crate::pending::{GuardStatus, RequestId};
8
9use futures::{channel::mpsc, stream::StreamExt};
10#[cfg(test)]
11use oneshot_fused_workaround as oneshot;
12use tor_proto::ClockSkew;
13use tracing::instrument;
14
15use std::sync::{Mutex, Weak};
16
17/// A message sent by to the [`report_status_events()`] task.
18#[derive(Debug)]
19pub(crate) enum Msg {
20    /// A message sent by a [`GuardMonitor`](crate::GuardMonitor) to
21    /// report the status of an attempt to use a guard.
22    Status(RequestId, GuardStatus, Option<ClockSkew>),
23    /// Tells the task to reply on the provided oneshot::Sender once
24    /// it has seen this message.  Used to indicate that the message
25    /// queue is flushed.
26    #[cfg(test)]
27    Ping(oneshot::Sender<()>),
28}
29
30/// Background task: wait for messages about guard statuses, and
31/// tell a guard manager about them.  Runs indefinitely.
32///
33/// Takes the [`GuardMgrInner`] by weak reference; if the guard
34/// manager goes away, then this task exits.
35///
36/// Requires a `mpsc::Receiver` that is used to tell the task about
37/// new status events to wait for.
38#[instrument(skip_all, level = "trace")]
39pub(crate) async fn report_status_events(
40    runtime: impl tor_rtcompat::SleepProvider,
41    inner: Weak<Mutex<GuardMgrInner>>,
42    mut events: mpsc::UnboundedReceiver<Msg>,
43) {
44    loop {
45        match events.next().await {
46            Some(Msg::Status(id, status, skew)) => {
47                // We've got a report about a guard status.
48                if let Some(inner) = inner.upgrade() {
49                    let mut inner = inner.lock().expect("Poisoned lock");
50                    inner.handle_msg(id, status, skew, &runtime);
51                } else {
52                    // The guard manager has gone away.
53                    return;
54                }
55            }
56            #[cfg(test)]
57            Some(Msg::Ping(sender)) => {
58                let _ignore = sender.send(());
59            }
60            // The streams have all closed.  (I think this is impossible?)
61            None => return,
62        }
63        // TODO: Is this task guaranteed to exit?
64    }
65}
66
67/// Background task to run periodic events on the guard manager.
68///
69/// The only role of this task is to invoke
70/// [`GuardMgrInner::run_periodic_events`] from time to time, so that
71/// it can perform housekeeping tasks.
72///
73/// Takes the [`GuardMgrInner`] by weak reference; if the guard
74/// manager goes away, then this task exits.
75#[instrument(skip_all, level = "trace")]
76pub(crate) async fn run_periodic<R: tor_rtcompat::SleepProvider>(
77    runtime: R,
78    inner: Weak<Mutex<GuardMgrInner>>,
79) {
80    loop {
81        let delay = if let Some(inner) = inner.upgrade() {
82            let mut inner = inner.lock().expect("Poisoned lock");
83            let wallclock = runtime.wallclock();
84            let now = runtime.now();
85            inner.run_periodic_events(wallclock, now)
86        } else {
87            // The guard manager has gone away.
88            return;
89        };
90        runtime.sleep(delay).await;
91    }
92}
93
94/// Background task to keep a guard manager up-to-date with a given network
95/// directory provider.
96#[instrument(skip_all, level = "trace")]
97pub(crate) async fn keep_netdir_updated<RT: tor_rtcompat::Runtime>(
98    runtime: RT,
99    inner: Weak<Mutex<GuardMgrInner>>,
100    netdir_provider: Weak<dyn tor_netdir::NetDirProvider>,
101) {
102    use tor_netdir::DirEvent;
103
104    let mut event_stream = match netdir_provider.upgrade().map(|p| p.events()) {
105        Some(s) => s,
106        None => return,
107    };
108
109    while let Some(event) = event_stream.next().await {
110        match event {
111            DirEvent::NewConsensus | DirEvent::NewDescriptors => {
112                if let Some(inner) = inner.upgrade() {
113                    let mut inner = inner.lock().expect("Poisoned lock");
114                    inner.update(runtime.wallclock(), runtime.now());
115                } else {
116                    return;
117                }
118            }
119            _ => {}
120        }
121    }
122}
123
124/// Background task to keep a guard manager up-to-date with a given bridge
125/// descriptor provider.
126#[cfg(feature = "bridge-client")]
127#[instrument(level = "trace", skip_all)]
128pub(crate) async fn keep_bridge_descs_updated<RT: tor_rtcompat::Runtime>(
129    runtime: RT,
130    inner: Weak<Mutex<GuardMgrInner>>,
131    bridge_desc_provider: Weak<dyn crate::bridge::BridgeDescProvider>,
132) {
133    use crate::bridge::BridgeDescEvent as E;
134    let mut event_stream = match bridge_desc_provider.upgrade().map(|p| p.events()) {
135        Some(s) => s,
136        None => return,
137    };
138
139    while let Some(event) = event_stream.next().await {
140        match event {
141            E::SomethingChanged => {
142                if let Some(inner) = inner.upgrade() {
143                    let mut inner = inner.lock().expect("Poisoned lock");
144                    inner.update(runtime.wallclock(), runtime.now());
145                } else {
146                    return;
147                }
148            }
149        }
150    }
151}