1use crate::GuardMgrInner;
7use crate::pending::{GuardStatus, RequestId};
8
9use futures::{channel::mpsc, stream::StreamExt};
10#[cfg(test)]
11use oneshot_fused_workaround as oneshot;
12use tor_proto::ClockSkew;
13use tracing::instrument;
14
15use std::sync::{Mutex, Weak};
16
17#[derive(Debug)]
19pub(crate) enum Msg {
20 Status(RequestId, GuardStatus, Option<ClockSkew>),
23 #[cfg(test)]
27 Ping(oneshot::Sender<()>),
28}
29
30#[instrument(skip_all, level = "trace")]
39pub(crate) async fn report_status_events(
40 runtime: impl tor_rtcompat::SleepProvider,
41 inner: Weak<Mutex<GuardMgrInner>>,
42 mut events: mpsc::UnboundedReceiver<Msg>,
43) {
44 loop {
45 match events.next().await {
46 Some(Msg::Status(id, status, skew)) => {
47 if let Some(inner) = inner.upgrade() {
49 let mut inner = inner.lock().expect("Poisoned lock");
50 inner.handle_msg(id, status, skew, &runtime);
51 } else {
52 return;
54 }
55 }
56 #[cfg(test)]
57 Some(Msg::Ping(sender)) => {
58 let _ignore = sender.send(());
59 }
60 None => return,
62 }
63 }
65}
66
67#[instrument(skip_all, level = "trace")]
76pub(crate) async fn run_periodic<R: tor_rtcompat::SleepProvider>(
77 runtime: R,
78 inner: Weak<Mutex<GuardMgrInner>>,
79) {
80 loop {
81 let delay = if let Some(inner) = inner.upgrade() {
82 let mut inner = inner.lock().expect("Poisoned lock");
83 let wallclock = runtime.wallclock();
84 let now = runtime.now();
85 inner.run_periodic_events(wallclock, now)
86 } else {
87 return;
89 };
90 runtime.sleep(delay).await;
91 }
92}
93
94#[instrument(skip_all, level = "trace")]
97pub(crate) async fn keep_netdir_updated<RT: tor_rtcompat::Runtime>(
98 runtime: RT,
99 inner: Weak<Mutex<GuardMgrInner>>,
100 netdir_provider: Weak<dyn tor_netdir::NetDirProvider>,
101) {
102 use tor_netdir::DirEvent;
103
104 let mut event_stream = match netdir_provider.upgrade().map(|p| p.events()) {
105 Some(s) => s,
106 None => return,
107 };
108
109 while let Some(event) = event_stream.next().await {
110 match event {
111 DirEvent::NewConsensus | DirEvent::NewDescriptors => {
112 if let Some(inner) = inner.upgrade() {
113 let mut inner = inner.lock().expect("Poisoned lock");
114 inner.update(runtime.wallclock(), runtime.now());
115 } else {
116 return;
117 }
118 }
119 _ => {}
120 }
121 }
122}
123
124#[cfg(feature = "bridge-client")]
127#[instrument(level = "trace", skip_all)]
128pub(crate) async fn keep_bridge_descs_updated<RT: tor_rtcompat::Runtime>(
129 runtime: RT,
130 inner: Weak<Mutex<GuardMgrInner>>,
131 bridge_desc_provider: Weak<dyn crate::bridge::BridgeDescProvider>,
132) {
133 use crate::bridge::BridgeDescEvent as E;
134 let mut event_stream = match bridge_desc_provider.upgrade().map(|p| p.events()) {
135 Some(s) => s,
136 None => return,
137 };
138
139 while let Some(event) = event_stream.next().await {
140 match event {
141 E::SomethingChanged => {
142 if let Some(inner) = inner.upgrade() {
143 let mut inner = inner.lock().expect("Poisoned lock");
144 inner.update(runtime.wallclock(), runtime.now());
145 } else {
146 return;
147 }
148 }
149 }
150 }
151}