tor_chanmgr/event.rs
1//! Code for exporting events from the channel manager.
2#![allow(dead_code, unreachable_pub)]
3
4use educe::Educe;
5use futures::{Stream, StreamExt};
6use postage::watch;
7use std::fmt;
8use tor_basic_utils::skip_fmt;
9use web_time_compat::{Duration, Instant, InstantExt};
10
11/// The status of our connection to the internet.
12#[derive(Default, Debug, Clone)]
13pub struct ConnStatus {
14 /// Have we been able to make TCP connections?
15 ///
16 /// True if we've been able to make outgoing connections recently.
17 /// False if we've definitely been failing.
18 /// None if we haven't succeeded yet, but it's too early to say if
19 /// that's a problem.
20 online: Option<bool>,
21
22 /// Have we ever been able to make TLS handshakes and negotiate
23 /// certificates, _not including timeliness checking_?
24 ///
25 /// True if we've been able to make TLS handshakes and talk to Tor relays we
26 /// like recently. False if we've definitely been failing. None if we
27 /// haven't succeeded yet, but it's too early to say if that's a problem.
28 auth_works: Option<bool>,
29
30 /// Have we been able to successfully negotiate full Tor handshakes?
31 ///
32 /// True if we've been able to make Tor handshakes recently.
33 /// False if we've definitely been failing.
34 /// None if we haven't succeeded yet, but it's too early to say if
35 /// that's a problem.
36 handshake_works: Option<bool>,
37}
38
39/// A problem detected while connecting to the Tor network.
40#[derive(Debug, Clone, Eq, PartialEq, derive_more::Display)]
41#[non_exhaustive]
42pub enum ConnBlockage {
43 #[display("unable to connect to the internet")]
44 /// We haven't been able to make successful TCP connections.
45 NoTcp,
46 /// We've made TCP connections, but our TLS connections either failed, or
47 /// got hit by an attempted man-in-the-middle attack.
48 #[display("our internet connection seems to be filtered")]
49 NoHandshake,
50 /// We've made TCP connections, and our TLS connections mostly succeeded,
51 /// but we encountered failures that are well explained by clock skew,
52 /// or expired certificates.
53 #[display("relays all seem to be using expired certificates")]
54 CertsExpired,
55}
56
57impl ConnStatus {
58 /// Return true if this status is equal to `other`.
59 ///
60 /// Note:(This would just be a PartialEq implementation, but I'm not sure I
61 /// want to expose that PartialEq for this struct.)
62 fn eq(&self, other: &ConnStatus) -> bool {
63 self.online == other.online && self.handshake_works == other.handshake_works
64 }
65
66 /// Return true if this status indicates that we can successfully open Tor channels.
67 pub fn usable(&self) -> bool {
68 self.online == Some(true) && self.handshake_works == Some(true)
69 }
70
71 /// Return a float representing "how bootstrapped" we are with respect to
72 /// connecting to the Tor network, where 0 is "not at all" and 1 is
73 /// "successful".
74 ///
75 /// Callers _should not_ depend on the specific meaning of any particular
76 /// fraction; we may change these fractions in the future.
77 pub fn frac(&self) -> f32 {
78 match self {
79 Self {
80 online: Some(true),
81 auth_works: Some(true),
82 handshake_works: Some(true),
83 } => 1.0,
84 Self {
85 online: Some(true), ..
86 } => 0.5,
87 _ => 0.0,
88 }
89 }
90
91 /// Return the cause of why we aren't able to connect to the Tor network,
92 /// if we think we're stuck.
93 pub fn blockage(&self) -> Option<ConnBlockage> {
94 match self {
95 Self {
96 online: Some(false),
97 ..
98 } => Some(ConnBlockage::NoTcp),
99 Self {
100 auth_works: Some(false),
101 ..
102 } => Some(ConnBlockage::NoHandshake),
103 Self {
104 handshake_works: Some(false),
105 ..
106 } => Some(ConnBlockage::CertsExpired),
107 _ => None,
108 }
109 }
110}
111
112impl fmt::Display for ConnStatus {
113 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114 match self {
115 ConnStatus { online: None, .. } => write!(f, "connecting to the internet"),
116 ConnStatus {
117 online: Some(false),
118 ..
119 } => write!(f, "unable to connect to the internet"),
120 ConnStatus {
121 handshake_works: None,
122 ..
123 } => write!(f, "handshaking with Tor relays"),
124 ConnStatus {
125 auth_works: Some(true),
126 handshake_works: Some(false),
127 ..
128 } => write!(
129 f,
130 "unable to handshake with Tor relays, possibly due to clock skew"
131 ),
132 ConnStatus {
133 handshake_works: Some(false),
134 ..
135 } => write!(f, "unable to handshake with Tor relays"),
136 ConnStatus {
137 online: Some(true),
138 handshake_works: Some(true),
139 ..
140 } => write!(f, "connecting successfully"),
141 }
142 }
143}
144
145/// A stream of [`ConnStatus`] events describing changes in our connected-ness.
146///
147/// This stream is lossy; a reader might not see some events on the stream, if
148/// they are produced faster than the reader can consume. In that case, the
149/// reader will see more recent updates, and miss older ones.
150///
151/// Note that the bootstrap status is not monotonic: we might become less
152/// bootstrapped than we were before. (For example, the internet could go
153/// down.)
154#[derive(Clone, Educe)]
155#[educe(Debug)]
156pub struct ConnStatusEvents {
157 /// The receiver that implements this stream.
158 ///
159 /// (We wrap it in a new type here so that we can replace the implementation
160 /// later on if we need to.)
161 #[educe(Debug(method = "skip_fmt"))]
162 inner: watch::Receiver<ConnStatus>,
163}
164
165impl Stream for ConnStatusEvents {
166 type Item = ConnStatus;
167 fn poll_next(
168 mut self: std::pin::Pin<&mut Self>,
169 cx: &mut std::task::Context<'_>,
170 ) -> std::task::Poll<Option<Self::Item>> {
171 self.inner.poll_next_unpin(cx)
172 }
173}
174
175/// Crate-internal view of "how connected are we to the internet?"
176///
177/// This is a more complex and costly structure than ConnStatus, so we track
178/// this here, and only expose the minimum via ConnStatus over a
179/// `postage::watch`. Later, we might want to expose more of this information.
180//
181// TODO: Eventually we should add some ability to reset our bootstrap status, if
182// our connections start failing.
183#[derive(Debug, Clone)]
184struct ChanMgrStatus {
185 /// When did we first get initialized?
186 startup: Instant,
187
188 /// Since we started, how many channels have we tried to build?
189 n_attempts: usize,
190
191 /// When (if ever) have we made a TCP connection to (what we hoped was) a
192 /// Tor relay?
193 ///
194 /// If we don't reach this point, we're probably not on the internet.
195 ///
196 /// If we get no further than this, we're probably having our TCP
197 /// connections captured or replaced.
198 last_tcp_success: Option<Instant>,
199
200 /// When (if ever) have we successfully finished a TLS handshake to (what we
201 /// hoped was) a Tor relay?
202 ///
203 /// If we get no further than this, we might be facing a TLS MITM attack.
204 //
205 // TODO: We don't actually use this information yet: our output doesn't
206 // distinguish filtering where TLS succeeds but gets MITM'd from filtering
207 // where TLS fails.
208 last_tls_success: Option<Instant>,
209
210 /// When (if ever) have we ever finished the inner Tor handshake with a relay,
211 /// up to the point where we check for certificate timeliness?
212 last_chan_auth_success: Option<Instant>,
213
214 /// When (if ever) have we successfully finished the inner Tor handshake
215 /// with a relay?
216 ///
217 /// If we get to this point, we can successfully talk to something that
218 /// holds the private key that it's supposed to.
219 last_chan_success: Option<Instant>,
220}
221
222impl ChanMgrStatus {
223 /// Construct a new ChanMgr status.
224 ///
225 /// It will be built as having been initialized at the time `now`.
226 fn new_at(now: Instant) -> ChanMgrStatus {
227 ChanMgrStatus {
228 startup: now,
229 n_attempts: 0,
230 last_tcp_success: None,
231 last_tls_success: None,
232 last_chan_auth_success: None,
233 last_chan_success: None,
234 }
235 }
236
237 /// Return a [`ConnStatus`] for the current state, at time `now`.
238 ///
239 /// (The time is necessary because a lack of success doesn't indicate a
240 /// problem until enough time has passed.)
241 fn conn_status_at(&self, now: Instant) -> ConnStatus {
242 /// How long do we need to be online before we'll acknowledge failure?
243 const MIN_DURATION: Duration = Duration::from_secs(60);
244 /// How many attempts do we need to launch before we'll acknowledge failure?
245 const MIN_ATTEMPTS: usize = 6;
246
247 // If set, it's too early to determine failure.
248 let early = now < self.startup + MIN_DURATION || self.n_attempts < MIN_ATTEMPTS;
249
250 let online = match (self.last_tcp_success.is_some(), early) {
251 (true, _) => Some(true),
252 (_, true) => None,
253 (false, false) => Some(false),
254 };
255
256 let auth_works = match (self.last_chan_auth_success.is_some(), early) {
257 (true, _) => Some(true),
258 (_, true) => None,
259 (false, false) => Some(false),
260 };
261
262 let handshake_works = match (self.last_chan_success.is_some(), early) {
263 (true, _) => Some(true),
264 (_, true) => None,
265 (false, false) => Some(false),
266 };
267
268 ConnStatus {
269 online,
270 auth_works,
271 handshake_works,
272 }
273 }
274
275 /// Note that an attempt to connect has been started.
276 fn record_attempt(&mut self) {
277 self.n_attempts += 1;
278 }
279
280 /// Note that we've successfully done a TCP handshake with an alleged relay.
281 fn record_tcp_success(&mut self, now: Instant) {
282 self.last_tcp_success = Some(now);
283 }
284
285 /// Note that we've completed a TLS handshake with an alleged relay.
286 ///
287 /// (Its identity won't be verified till the next step.)
288 fn record_tls_finished(&mut self, now: Instant) {
289 self.last_tls_success = Some(now);
290 }
291
292 /// Note that we've completed a Tor handshake with a relay, _but failed to
293 /// verify the certificates in a way that could indicate clock skew_.
294 fn record_handshake_done_with_skewed_clock(&mut self, now: Instant) {
295 self.last_chan_auth_success = Some(now);
296 }
297
298 /// Note that we've completed a Tor handshake with a relay.
299 ///
300 /// (This includes performing the TLS handshake, and verifying that the
301 /// relay was indeed the one that we wanted to reach.)
302 fn record_handshake_done(&mut self, now: Instant) {
303 self.last_chan_auth_success = Some(now);
304 self.last_chan_success = Some(now);
305 }
306}
307
308/// Object that manages information about a `ChanMgr`'s status, and sends
309/// information about connectivity changes over an asynchronous channel
310pub(crate) struct ChanMgrEventSender {
311 /// The last ConnStatus that we sent over the channel.
312 last_conn_status: ConnStatus,
313 /// The unsummarized status information from the ChanMgr.
314 mgr_status: ChanMgrStatus,
315 /// The channel that we use for sending ConnStatus information.
316 sender: watch::Sender<ConnStatus>,
317}
318
319impl ChanMgrEventSender {
320 /// If the status has changed as of `now`, tell any listeners.
321 ///
322 /// (This takes a time because we need to know how much time has elapsed
323 /// without successful attempts.)
324 ///
325 /// # Limitations
326 ///
327 /// We are dependent on calls to `record_attempt()` and similar methods to
328 /// actually invoke this function; if they were never called, we'd never
329 /// notice that we had gone too long without building connections. That's
330 /// okay for now, though, since any Tor client will immediately start
331 /// building circuits, which will launch connection attempts until one
332 /// succeeds or the client gives up entirely.
333 fn push_at(&mut self, now: Instant) {
334 let status = self.mgr_status.conn_status_at(now);
335 if !status.eq(&self.last_conn_status) {
336 self.last_conn_status = status.clone();
337 let mut b = self.sender.borrow_mut();
338 *b = status;
339 }
340 }
341
342 /// Note that an attempt to connect has been started.
343 pub(crate) fn record_attempt(&mut self) {
344 self.mgr_status.record_attempt();
345 self.push_at(Instant::get());
346 }
347
348 /// Note that we've successfully done a TCP handshake with an alleged relay.
349 pub(crate) fn record_tcp_success(&mut self) {
350 let now = Instant::get();
351 self.mgr_status.record_tcp_success(now);
352 self.push_at(now);
353 }
354
355 /// Note that we've completed a TLS handshake with an alleged relay.
356 ///
357 /// (Its identity won't be verified till the next step.)
358 pub(crate) fn record_tls_finished(&mut self) {
359 let now = Instant::get();
360 self.mgr_status.record_tls_finished(now);
361 self.push_at(now);
362 }
363
364 /// Record that a handshake has succeeded _except for the certificate
365 /// timeliness check, which may indicate a skewed clock.
366 pub(crate) fn record_handshake_done_with_skewed_clock(&mut self) {
367 let now = Instant::get();
368 self.mgr_status.record_handshake_done_with_skewed_clock(now);
369 self.push_at(now);
370 }
371
372 /// Note that we've completed a Tor handshake with a relay.
373 ///
374 /// (This includes performing the TLS handshake, and verifying that the
375 /// relay was indeed the one that we wanted to reach.)
376 pub(crate) fn record_handshake_done(&mut self) {
377 let now = Instant::get();
378 self.mgr_status.record_handshake_done(now);
379 self.push_at(now);
380 }
381}
382
383/// Create a new channel for sending connectivity status events to other crates.
384pub(crate) fn channel() -> (ChanMgrEventSender, ConnStatusEvents) {
385 let (sender, receiver) = watch::channel();
386 let receiver = ConnStatusEvents { inner: receiver };
387 let sender = ChanMgrEventSender {
388 last_conn_status: ConnStatus::default(),
389 mgr_status: ChanMgrStatus::new_at(Instant::get()),
390 sender,
391 };
392 (sender, receiver)
393}
394
395#[cfg(test)]
396#[allow(clippy::cognitive_complexity)]
397mod test {
398 // @@ begin test lint list maintained by maint/add_warning @@
399 #![allow(clippy::bool_assert_comparison)]
400 #![allow(clippy::clone_on_copy)]
401 #![allow(clippy::dbg_macro)]
402 #![allow(clippy::mixed_attributes_style)]
403 #![allow(clippy::print_stderr)]
404 #![allow(clippy::print_stdout)]
405 #![allow(clippy::single_char_pattern)]
406 #![allow(clippy::unwrap_used)]
407 #![allow(clippy::unchecked_time_subtraction)]
408 #![allow(clippy::useless_vec)]
409 #![allow(clippy::needless_pass_by_value)]
410 //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
411 use super::*;
412 use float_eq::assert_float_eq;
413
414 /// Tolerance for float comparison.
415 const TOL: f32 = 0.00001;
416
417 #[test]
418 fn status_basics() {
419 let s1 = ConnStatus::default();
420 assert_eq!(s1.to_string(), "connecting to the internet");
421 assert_float_eq!(s1.frac(), 0.0, abs <= TOL);
422 assert!(s1.eq(&s1));
423 assert!(s1.blockage().is_none());
424 assert!(!s1.usable());
425
426 let s2 = ConnStatus {
427 online: Some(false),
428 auth_works: None,
429 handshake_works: None,
430 };
431 assert_eq!(s2.to_string(), "unable to connect to the internet");
432 assert_float_eq!(s2.frac(), 0.0, abs <= TOL);
433 assert!(s2.eq(&s2));
434 assert!(!s2.eq(&s1));
435 assert_eq!(s2.blockage(), Some(ConnBlockage::NoTcp));
436 assert_eq!(
437 s2.blockage().unwrap().to_string(),
438 "unable to connect to the internet"
439 );
440 assert!(!s2.usable());
441
442 let s3 = ConnStatus {
443 online: Some(true),
444 auth_works: None,
445 handshake_works: None,
446 };
447 assert_eq!(s3.to_string(), "handshaking with Tor relays");
448 assert_float_eq!(s3.frac(), 0.5, abs <= TOL);
449 assert_eq!(s3.blockage(), None);
450 assert!(!s3.eq(&s1));
451 assert!(!s3.usable());
452
453 let s4 = ConnStatus {
454 online: Some(true),
455 auth_works: Some(false),
456 handshake_works: Some(false),
457 };
458 assert_eq!(s4.to_string(), "unable to handshake with Tor relays");
459 assert_float_eq!(s4.frac(), 0.5, abs <= TOL);
460 assert_eq!(s4.blockage(), Some(ConnBlockage::NoHandshake));
461 assert_eq!(
462 s4.blockage().unwrap().to_string(),
463 "our internet connection seems to be filtered"
464 );
465 assert!(!s4.eq(&s1));
466 assert!(!s4.eq(&s2));
467 assert!(!s4.eq(&s3));
468 assert!(s4.eq(&s4));
469 assert!(!s4.usable());
470
471 let s5 = ConnStatus {
472 online: Some(true),
473 auth_works: Some(true),
474 handshake_works: Some(true),
475 };
476 assert_eq!(s5.to_string(), "connecting successfully");
477 assert_float_eq!(s5.frac(), 1.0, abs <= TOL);
478 assert!(s5.blockage().is_none());
479 assert!(s5.eq(&s5));
480 assert!(!s5.eq(&s4));
481 assert!(s5.usable());
482 }
483
484 #[test]
485 fn derive_status() {
486 let start = Instant::get();
487 let sec = Duration::from_secs(1);
488 let hour = Duration::from_secs(3600);
489
490 let mut ms = ChanMgrStatus::new_at(start);
491
492 // when we start, we're unable to reach any conclusions.
493 let s0 = ms.conn_status_at(start);
494 assert!(s0.online.is_none());
495 assert!(s0.handshake_works.is_none());
496
497 // Time won't let us make conclusions either, unless there have been
498 // attempts.
499 let s = ms.conn_status_at(start + hour);
500 assert!(s.eq(&s0));
501
502 // But if there have been attempts, _and_ time has passed, we notice
503 // failure.
504 for _ in 0..10 {
505 ms.record_attempt();
506 }
507 // (Not immediately...)
508 let s = ms.conn_status_at(start);
509 assert!(s.eq(&s0));
510 // (... but after a while.)
511 let s = ms.conn_status_at(start + hour);
512 assert_eq!(s.online, Some(false));
513 assert_eq!(s.handshake_works, Some(false));
514
515 // If TCP has succeeded, we should notice that.
516 ms.record_tcp_success(start + sec);
517 let s = ms.conn_status_at(start + sec * 2);
518 assert_eq!(s.online, Some(true));
519 assert!(s.handshake_works.is_none());
520 let s = ms.conn_status_at(start + hour);
521 assert_eq!(s.online, Some(true));
522 assert_eq!(s.handshake_works, Some(false));
523
524 // If the handshake succeeded, we can notice that too.
525 ms.record_handshake_done(start + sec * 2);
526 let s = ms.conn_status_at(start + sec * 3);
527 assert_eq!(s.online, Some(true));
528 assert_eq!(s.handshake_works, Some(true));
529 }
530
531 #[test]
532 fn sender() {
533 let (mut snd, rcv) = channel();
534
535 {
536 let s = rcv.inner.borrow().clone();
537 assert_float_eq!(s.frac(), 0.0, abs <= TOL);
538 }
539
540 snd.record_attempt();
541 snd.record_tcp_success();
542 snd.record_tls_finished();
543 snd.record_handshake_done();
544
545 {
546 let s = rcv.inner.borrow().clone();
547 assert_float_eq!(s.frac(), 1.0, abs <= TOL);
548 }
549 }
550}