Skip to main content

tor_chanmgr/
event.rs

1//! Code for exporting events from the channel manager.
2#![allow(dead_code, unreachable_pub)]
3
4use educe::Educe;
5use futures::{Stream, StreamExt};
6use postage::watch;
7use std::fmt;
8use tor_basic_utils::skip_fmt;
9use web_time_compat::{Duration, Instant, InstantExt};
10
/// A summary of how well we are currently able to reach the Tor network.
///
/// Each field is tri-state: `Some(true)` means "this has worked",
/// `Some(false)` means "this has definitely been failing", and `None` means
/// "no success so far, but it is too soon to call that a failure".
#[derive(Clone, Debug, Default)]
pub struct ConnStatus {
    /// Whether outgoing TCP connections are working.
    ///
    /// `Some(true)` if we've been able to make outgoing connections recently;
    /// `Some(false)` if we've definitely been failing; `None` if we haven't
    /// succeeded yet, but it's too early to say if that's a problem.
    online: Option<bool>,

    /// Whether we have been able to complete TLS handshakes and negotiate
    /// certificates, _not including timeliness checking_.
    ///
    /// `Some(true)` if we've been able to make TLS handshakes and talk to Tor
    /// relays we like recently; `Some(false)` if we've definitely been
    /// failing; `None` if we haven't succeeded yet, but it's too early to say
    /// if that's a problem.
    auth_works: Option<bool>,

    /// Whether we have been able to negotiate full Tor handshakes.
    ///
    /// `Some(true)` if we've been able to make Tor handshakes recently;
    /// `Some(false)` if we've definitely been failing; `None` if we haven't
    /// succeeded yet, but it's too early to say if that's a problem.
    handshake_works: Option<bool>,
}
38
39/// A problem detected while connecting to the Tor network.
40#[derive(Debug, Clone, Eq, PartialEq, derive_more::Display)]
41#[non_exhaustive]
42pub enum ConnBlockage {
43    #[display("unable to connect to the internet")]
44    /// We haven't been able to make successful TCP connections.
45    NoTcp,
46    /// We've made TCP connections, but our TLS connections either failed, or
47    /// got hit by an attempted man-in-the-middle attack.
48    #[display("our internet connection seems to be filtered")]
49    NoHandshake,
50    /// We've made TCP connections, and our TLS connections mostly succeeded,
51    /// but we encountered failures that are well explained by clock skew,
52    /// or expired certificates.
53    #[display("relays all seem to be using expired certificates")]
54    CertsExpired,
55}
56
57impl ConnStatus {
58    /// Return true if this status is equal to `other`.
59    ///
60    /// Note:(This would just be a PartialEq implementation, but I'm not sure I
61    /// want to expose that PartialEq for this struct.)
62    fn eq(&self, other: &ConnStatus) -> bool {
63        self.online == other.online && self.handshake_works == other.handshake_works
64    }
65
66    /// Return true if this status indicates that we can successfully open Tor channels.
67    pub fn usable(&self) -> bool {
68        self.online == Some(true) && self.handshake_works == Some(true)
69    }
70
71    /// Return a float representing "how bootstrapped" we are with respect to
72    /// connecting to the Tor network, where 0 is "not at all" and 1 is
73    /// "successful".
74    ///
75    /// Callers _should not_ depend on the specific meaning of any particular
76    /// fraction; we may change these fractions in the future.
77    pub fn frac(&self) -> f32 {
78        match self {
79            Self {
80                online: Some(true),
81                auth_works: Some(true),
82                handshake_works: Some(true),
83            } => 1.0,
84            Self {
85                online: Some(true), ..
86            } => 0.5,
87            _ => 0.0,
88        }
89    }
90
91    /// Return the cause of why we aren't able to connect to the Tor network,
92    /// if we think we're stuck.
93    pub fn blockage(&self) -> Option<ConnBlockage> {
94        match self {
95            Self {
96                online: Some(false),
97                ..
98            } => Some(ConnBlockage::NoTcp),
99            Self {
100                auth_works: Some(false),
101                ..
102            } => Some(ConnBlockage::NoHandshake),
103            Self {
104                handshake_works: Some(false),
105                ..
106            } => Some(ConnBlockage::CertsExpired),
107            _ => None,
108        }
109    }
110}
111
112impl fmt::Display for ConnStatus {
113    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114        match self {
115            ConnStatus { online: None, .. } => write!(f, "connecting to the internet"),
116            ConnStatus {
117                online: Some(false),
118                ..
119            } => write!(f, "unable to connect to the internet"),
120            ConnStatus {
121                handshake_works: None,
122                ..
123            } => write!(f, "handshaking with Tor relays"),
124            ConnStatus {
125                auth_works: Some(true),
126                handshake_works: Some(false),
127                ..
128            } => write!(
129                f,
130                "unable to handshake with Tor relays, possibly due to clock skew"
131            ),
132            ConnStatus {
133                handshake_works: Some(false),
134                ..
135            } => write!(f, "unable to handshake with Tor relays"),
136            ConnStatus {
137                online: Some(true),
138                handshake_works: Some(true),
139                ..
140            } => write!(f, "connecting successfully"),
141        }
142    }
143}
144
145/// A stream of [`ConnStatus`] events describing changes in our connected-ness.
146///
147/// This stream is lossy; a reader might not see some events on the stream, if
148/// they are produced faster than the reader can consume.  In that case, the
149/// reader will see more recent updates, and miss older ones.
150///
151/// Note that the bootstrap status is not monotonic: we might become less
152/// bootstrapped than we were before.  (For example, the internet could go
153/// down.)
154#[derive(Clone, Educe)]
155#[educe(Debug)]
156pub struct ConnStatusEvents {
157    /// The receiver that implements this stream.
158    ///
159    /// (We wrap it in a new type here so that we can replace the implementation
160    /// later on if we need to.)
161    #[educe(Debug(method = "skip_fmt"))]
162    inner: watch::Receiver<ConnStatus>,
163}
164
165impl Stream for ConnStatusEvents {
166    type Item = ConnStatus;
167    fn poll_next(
168        mut self: std::pin::Pin<&mut Self>,
169        cx: &mut std::task::Context<'_>,
170    ) -> std::task::Poll<Option<Self::Item>> {
171        self.inner.poll_next_unpin(cx)
172    }
173}
174
/// The crate-internal record behind "how connected are we to the internet?"
///
/// This holds more detail than [`ConnStatus`] and is costlier to keep, so it
/// stays private; only the summarized `ConnStatus` is exposed, over a
/// `postage::watch`.  Later, we might want to expose more of this information.
//
// TODO: Eventually we should add some ability to reset our bootstrap status, if
// our connections start failing.
#[derive(Clone, Debug)]
struct ChanMgrStatus {
    /// The time at which this status was first initialized.
    startup: Instant,

    /// The number of channels we have tried to build since startup.
    n_attempts: usize,

    /// The most recent time (if any) that we made a TCP connection to (what
    /// we hoped was) a Tor relay.
    ///
    /// If we never reach this point, we're probably not on the internet.
    ///
    /// If we get no further than this, we're probably having our TCP
    /// connections captured or replaced.
    last_tcp_success: Option<Instant>,

    /// The most recent time (if any) that we finished a TLS handshake to
    /// (what we hoped was) a Tor relay.
    ///
    /// If we get no further than this, we might be facing a TLS MITM attack.
    //
    // TODO: We don't actually use this information yet: our output doesn't
    // distinguish filtering where TLS succeeds but gets MITM'd from filtering
    // where TLS fails.
    last_tls_success: Option<Instant>,

    /// The most recent time (if any) that we finished the inner Tor handshake
    /// with a relay, up to the point where we check for certificate
    /// timeliness.
    last_chan_auth_success: Option<Instant>,

    /// The most recent time (if any) that we successfully finished the inner
    /// Tor handshake with a relay.
    ///
    /// If we get to this point, we can successfully talk to something that
    /// holds the private key that it's supposed to.
    last_chan_success: Option<Instant>,
}
221
222impl ChanMgrStatus {
223    /// Construct a new ChanMgr status.
224    ///
225    /// It will be built as having been initialized at the time `now`.
226    fn new_at(now: Instant) -> ChanMgrStatus {
227        ChanMgrStatus {
228            startup: now,
229            n_attempts: 0,
230            last_tcp_success: None,
231            last_tls_success: None,
232            last_chan_auth_success: None,
233            last_chan_success: None,
234        }
235    }
236
237    /// Return a [`ConnStatus`] for the current state, at time `now`.
238    ///
239    /// (The time is necessary because a lack of success doesn't indicate a
240    /// problem until enough time has passed.)
241    fn conn_status_at(&self, now: Instant) -> ConnStatus {
242        /// How long do we need to be online before we'll acknowledge failure?
243        const MIN_DURATION: Duration = Duration::from_secs(60);
244        /// How many attempts do we need to launch before we'll acknowledge failure?
245        const MIN_ATTEMPTS: usize = 6;
246
247        // If set, it's too early to determine failure.
248        let early = now < self.startup + MIN_DURATION || self.n_attempts < MIN_ATTEMPTS;
249
250        let online = match (self.last_tcp_success.is_some(), early) {
251            (true, _) => Some(true),
252            (_, true) => None,
253            (false, false) => Some(false),
254        };
255
256        let auth_works = match (self.last_chan_auth_success.is_some(), early) {
257            (true, _) => Some(true),
258            (_, true) => None,
259            (false, false) => Some(false),
260        };
261
262        let handshake_works = match (self.last_chan_success.is_some(), early) {
263            (true, _) => Some(true),
264            (_, true) => None,
265            (false, false) => Some(false),
266        };
267
268        ConnStatus {
269            online,
270            auth_works,
271            handshake_works,
272        }
273    }
274
275    /// Note that an attempt to connect has been started.
276    fn record_attempt(&mut self) {
277        self.n_attempts += 1;
278    }
279
280    /// Note that we've successfully done a TCP handshake with an alleged relay.
281    fn record_tcp_success(&mut self, now: Instant) {
282        self.last_tcp_success = Some(now);
283    }
284
285    /// Note that we've completed a TLS handshake with an alleged relay.
286    ///
287    /// (Its identity won't be verified till the next step.)
288    fn record_tls_finished(&mut self, now: Instant) {
289        self.last_tls_success = Some(now);
290    }
291
292    /// Note that we've completed a Tor handshake with a relay, _but failed to
293    /// verify the certificates in a way that could indicate clock skew_.
294    fn record_handshake_done_with_skewed_clock(&mut self, now: Instant) {
295        self.last_chan_auth_success = Some(now);
296    }
297
298    /// Note that we've completed a Tor handshake with a relay.
299    ///
300    /// (This includes performing the TLS handshake, and verifying that the
301    /// relay was indeed the one that we wanted to reach.)
302    fn record_handshake_done(&mut self, now: Instant) {
303        self.last_chan_auth_success = Some(now);
304        self.last_chan_success = Some(now);
305    }
306}
307
308/// Object that manages information about a `ChanMgr`'s status, and sends
309/// information about connectivity changes over an asynchronous channel
310pub(crate) struct ChanMgrEventSender {
311    /// The last ConnStatus that we sent over the channel.
312    last_conn_status: ConnStatus,
313    /// The unsummarized status information from the ChanMgr.
314    mgr_status: ChanMgrStatus,
315    /// The channel that we use for sending ConnStatus information.
316    sender: watch::Sender<ConnStatus>,
317}
318
319impl ChanMgrEventSender {
320    /// If the status has changed as of `now`, tell any listeners.
321    ///
322    /// (This takes a time because we need to know how much time has elapsed
323    /// without successful attempts.)
324    ///
325    /// # Limitations
326    ///
327    /// We are dependent on calls to `record_attempt()` and similar methods to
328    /// actually invoke this function; if they were never called, we'd never
329    /// notice that we had gone too long without building connections.  That's
330    /// okay for now, though, since any Tor client will immediately start
331    /// building circuits, which will launch connection attempts until one
332    /// succeeds or the client gives up entirely.  
333    fn push_at(&mut self, now: Instant) {
334        let status = self.mgr_status.conn_status_at(now);
335        if !status.eq(&self.last_conn_status) {
336            self.last_conn_status = status.clone();
337            let mut b = self.sender.borrow_mut();
338            *b = status;
339        }
340    }
341
342    /// Note that an attempt to connect has been started.
343    pub(crate) fn record_attempt(&mut self) {
344        self.mgr_status.record_attempt();
345        self.push_at(Instant::get());
346    }
347
348    /// Note that we've successfully done a TCP handshake with an alleged relay.
349    pub(crate) fn record_tcp_success(&mut self) {
350        let now = Instant::get();
351        self.mgr_status.record_tcp_success(now);
352        self.push_at(now);
353    }
354
355    /// Note that we've completed a TLS handshake with an alleged relay.
356    ///
357    /// (Its identity won't be verified till the next step.)
358    pub(crate) fn record_tls_finished(&mut self) {
359        let now = Instant::get();
360        self.mgr_status.record_tls_finished(now);
361        self.push_at(now);
362    }
363
364    /// Record that a handshake has succeeded _except for the certificate
365    /// timeliness check, which may indicate a skewed clock.
366    pub(crate) fn record_handshake_done_with_skewed_clock(&mut self) {
367        let now = Instant::get();
368        self.mgr_status.record_handshake_done_with_skewed_clock(now);
369        self.push_at(now);
370    }
371
372    /// Note that we've completed a Tor handshake with a relay.
373    ///
374    /// (This includes performing the TLS handshake, and verifying that the
375    /// relay was indeed the one that we wanted to reach.)
376    pub(crate) fn record_handshake_done(&mut self) {
377        let now = Instant::get();
378        self.mgr_status.record_handshake_done(now);
379        self.push_at(now);
380    }
381}
382
383/// Create a new channel for sending connectivity status events to other crates.
384pub(crate) fn channel() -> (ChanMgrEventSender, ConnStatusEvents) {
385    let (sender, receiver) = watch::channel();
386    let receiver = ConnStatusEvents { inner: receiver };
387    let sender = ChanMgrEventSender {
388        last_conn_status: ConnStatus::default(),
389        mgr_status: ChanMgrStatus::new_at(Instant::get()),
390        sender,
391    };
392    (sender, receiver)
393}
394
#[cfg(test)]
#[allow(clippy::cognitive_complexity)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use float_eq::assert_float_eq;

    /// Tolerance for float comparison.
    const TOL: f32 = 0.00001;

    /// Check the accessors and `Display` output of hand-built statuses.
    #[test]
    fn status_basics() {
        // Nothing is known yet.
        let unknown = ConnStatus::default();
        assert_eq!(unknown.to_string(), "connecting to the internet");
        assert_float_eq!(unknown.frac(), 0.0, abs <= TOL);
        assert!(unknown.eq(&unknown));
        assert!(unknown.blockage().is_none());
        assert!(!unknown.usable());

        // TCP is definitely failing.
        let offline = ConnStatus {
            online: Some(false),
            auth_works: None,
            handshake_works: None,
        };
        assert_eq!(offline.to_string(), "unable to connect to the internet");
        assert_float_eq!(offline.frac(), 0.0, abs <= TOL);
        assert!(offline.eq(&offline));
        assert!(!offline.eq(&unknown));
        assert_eq!(offline.blockage(), Some(ConnBlockage::NoTcp));
        assert_eq!(
            offline.blockage().unwrap().to_string(),
            "unable to connect to the internet"
        );
        assert!(!offline.usable());

        // TCP works; the handshake outcome is not known yet.
        let tcp_only = ConnStatus {
            online: Some(true),
            auth_works: None,
            handshake_works: None,
        };
        assert_eq!(tcp_only.to_string(), "handshaking with Tor relays");
        assert_float_eq!(tcp_only.frac(), 0.5, abs <= TOL);
        assert_eq!(tcp_only.blockage(), None);
        assert!(!tcp_only.eq(&unknown));
        assert!(!tcp_only.usable());

        // TCP works, but authentication and the handshake are failing.
        let filtered = ConnStatus {
            online: Some(true),
            auth_works: Some(false),
            handshake_works: Some(false),
        };
        assert_eq!(filtered.to_string(), "unable to handshake with Tor relays");
        assert_float_eq!(filtered.frac(), 0.5, abs <= TOL);
        assert_eq!(filtered.blockage(), Some(ConnBlockage::NoHandshake));
        assert_eq!(
            filtered.blockage().unwrap().to_string(),
            "our internet connection seems to be filtered"
        );
        assert!(!filtered.eq(&unknown));
        assert!(!filtered.eq(&offline));
        assert!(!filtered.eq(&tcp_only));
        assert!(filtered.eq(&filtered));
        assert!(!filtered.usable());

        // Everything works.
        let working = ConnStatus {
            online: Some(true),
            auth_works: Some(true),
            handshake_works: Some(true),
        };
        assert_eq!(working.to_string(), "connecting successfully");
        assert_float_eq!(working.frac(), 1.0, abs <= TOL);
        assert!(working.blockage().is_none());
        assert!(working.eq(&working));
        assert!(!working.eq(&filtered));
        assert!(working.usable());
    }

    /// Check how `ChanMgrStatus` summarizes itself as events are recorded.
    #[test]
    fn derive_status() {
        let start = Instant::get();
        let sec = Duration::from_secs(1);
        let hour = Duration::from_secs(3600);

        let mut tracker = ChanMgrStatus::new_at(start);

        // At startup, no conclusions can be drawn.
        let initial = tracker.conn_status_at(start);
        assert!(initial.online.is_none());
        assert!(initial.handshake_works.is_none());

        // The passage of time alone is not enough: there must also have been
        // attempts.
        let snapshot = tracker.conn_status_at(start + hour);
        assert!(snapshot.eq(&initial));

        // With attempts _and_ elapsed time, failure is noticed.
        for _ in 0..10 {
            tracker.record_attempt();
        }
        // (Not immediately...)
        let snapshot = tracker.conn_status_at(start);
        assert!(snapshot.eq(&initial));
        // (... but after a while.)
        let snapshot = tracker.conn_status_at(start + hour);
        assert_eq!(snapshot.online, Some(false));
        assert_eq!(snapshot.handshake_works, Some(false));

        // A TCP success should be noticed.
        tracker.record_tcp_success(start + sec);
        let snapshot = tracker.conn_status_at(start + sec * 2);
        assert_eq!(snapshot.online, Some(true));
        assert!(snapshot.handshake_works.is_none());
        let snapshot = tracker.conn_status_at(start + hour);
        assert_eq!(snapshot.online, Some(true));
        assert_eq!(snapshot.handshake_works, Some(false));

        // A handshake success should be noticed too.
        tracker.record_handshake_done(start + sec * 2);
        let snapshot = tracker.conn_status_at(start + sec * 3);
        assert_eq!(snapshot.online, Some(true));
        assert_eq!(snapshot.handshake_works, Some(true));
    }

    /// Check that events recorded on the sender reach the receiver.
    #[test]
    fn sender() {
        let (mut tx, events) = channel();

        // Before any event is recorded, the receiver holds the default status.
        let before = events.inner.borrow().clone();
        assert_float_eq!(before.frac(), 0.0, abs <= TOL);

        tx.record_attempt();
        tx.record_tcp_success();
        tx.record_tls_finished();
        tx.record_handshake_done();

        // After a full handshake, the receiver sees a fully working status.
        let after = events.inner.borrow().clone();
        assert_float_eq!(after.frac(), 1.0, abs <= TOL);
    }
}