Skip to main content

tor_proto/client/circuit/padding/maybenot_padding/
backend.rs

1//! Padding backend based on [`maybenot`].
2//!
3//! # Operation
4//!
//! For each circuit hop, we have an optional [`maybenot::Framework`].
//! This framework wraps multiple "padding machines",
//! each of which is a randomized state machine.
//! (Arti is built with a list of pre-configured padding machines.
//! The set of padding machines to use with any given circuit hop
//! is negotiated via `PADDING_NEGOTIATE` messages.)
11//! We interact with the framework via
12//! [`Framework::trigger_events`](maybenot::Framework::trigger_events),
13//! which consumes [`TriggerEvent`]s and gives us [`TriggerAction`]s.
14//! Those `TriggerAction`s tell us to schedule or reschedule different timers,
15//! to block traffic, or to send padding.
16//!
17//! We wrap the `Framework` in [`MaybenotPadder`],
18//! which keeps track of the expiration time for each timer.
19//! From `MaybenotPadder`, we expose a single timer
20//! describing when the next action from the padding machine may be necessary.
21//! This timer is likely to update frequently.
22
23use std::{sync::Arc, task::Waker};
24
25use maybenot::{MachineId, TriggerEvent};
26use smallvec::SmallVec;
27
28use super::{Bypass, Duration, Instant, PerHopPaddingEvent, PerHopPaddingEventVec, Replace};
29
/// The Rng that we construct for our padding machines.
///
/// We use a separate type alias here in case we want to move to a per-Framework
/// ChaCha8Rng or something.
//
// `ThisThreadRng` is the unit struct defined later in this file; each of its
// `RngCore` methods forwards to `rand::rng()`.
type Rng = ThisThreadRng;

/// The particular instantiated padding framework type that we use.
///
/// It is parameterized, in order, by a shared (`Arc`) slice of machines,
/// by our [`Rng`], and by our [`Instant`] type.
type Framework = maybenot::Framework<Arc<[maybenot::Machine]>, Rng, Instant>;

/// A [`maybenot::TriggerAction`] as we construct it for use with our [`Framework`]s.
///
/// (That is, a `TriggerAction` using our [`Instant`] type.)
type TriggerAction = maybenot::TriggerAction<Instant>;

/// A type we use to report events that we must trigger on the basis of triggering other events.
///
/// We've optimized here for the assumption that we _usually_ won't need to trigger more than one
/// event: the vector has inline room for a single [`TriggerEvent`].
type TriggerEventsOutVec = SmallVec<[TriggerEvent; 1]>;
47
/// An action that we should take on a machine's behalf,
/// after a certain interval has elapsed.
///
/// A value of this type is stored alongside the expiration time of a machine's
/// "action timer"; it records _what_ to do once that timer expires.
#[derive(Clone, Debug)]
enum ScheduledAction {
    /// We should send padding if and when the machine's action timer expires.
    SendPadding {
        /// Send padding even if bypassable blocking is in place.
        /// (Blocking can be bypassable or non-bypassable.)
        bypass: bool,
        /// If an existing non-padding cell is queued,
        /// it can replace this padding.
        ///
        /// If `bypass` is true, such a cell can also bypass bypassable blocking.
        replace: bool,
    },
    /// We should block outbound traffic if and when the machine's action timer expires.
    Block {
        /// If true, then the blocking is bypassable.
        bypass: bool,
        /// If true, then we should change the duration of the current blocking unconditionally.
        /// If false, we should use whichever duration is longer.
        replace: bool,
        /// The interval of the blocking that we should apply.
        ///
        /// The interval is measured from the moment the action timer expires,
        /// not from the moment the action was scheduled.
        duration: Duration,
    },
}
74
75/// The state for a _single_ padding Machine within a Framework.
76#[derive(Default, Clone, Debug)]
77struct MachineState {
78    /// The current state for the machine's "internal timer".
79    ///
80    /// Each machine has a single internal timer,
81    /// and manages the timer itself via the `UpdateTimer` and `Cancel`
82    /// [`TriggerAction`] variants.
83    internal_timer_expires: Option<Instant>,
84
85    /// The current state for the machine's "action timer".
86    ///
87    /// Each machine has a single action timer, after which some [`ScheduledAction`]
88    /// should be taken.
89    ///
90    /// (Note that only one action can be scheduled per machine,
91    /// so if we're told to schedule blocking, we should cancel padding;
92    /// and if we're told to schedule padding, we should cancel blocking.)
93    action_timer_expires: Option<(Instant, ScheduledAction)>,
94}
95
96impl MachineState {
97    /// Return the earliest time that either of this machine's timers will expire.
98    fn next_expiration(&self) -> Option<Instant> {
99        match (&self.internal_timer_expires, &self.action_timer_expires) {
100            (None, None) => None,
101            (None, Some((t, _))) => Some(*t),
102            (Some(t), None) => Some(*t),
103            (Some(t1), Some((t2, _))) => Some(*t1.min(t2)),
104        }
105    }
106}
107
108/// Represents the state for all padding machines within a framework.
109///
110/// N should be around the number of padding machines that the framework should support.
111struct PadderState<const N: usize> {
112    /// A list of all the padding machine states for a single framework.
113    ///
114    /// This list is indexed by `MachineId`.
115    //
116    // TODO: Optimize this size even more if appropriate
117    state: SmallVec<[MachineState; N]>,
118}
119
120impl<const N: usize> PadderState<N> {
121    /// Return a mutable reference to the state corresponding to a given [`MachineId`]
122    ///
123    /// # Panics
124    ///
125    /// Panics if `id` is out of range, which can only happen if a MachineId from
126    /// one Framework is given to another Framework.
127    fn state_mut(&mut self, id: MachineId) -> &mut MachineState {
128        &mut self.state[id.into_raw()]
129    }
130
131    /// Execute a single [`TriggerAction`] on this state.
132    ///
133    /// `TriggerActions` are created by `maybenot::Framework` instances
134    /// in response to [`TriggerEvent`]s.
135    ///
136    /// Executing a `TriggerAction` can adjust timers,
137    /// and can schedule a new [`ScheduledAction`] to be taken in the future;
138    /// it does not, however, send any padding or adjust any blocking on its own.
139    ///
140    /// The current time should be provided in `now`.
141    ///
142    /// Executing a `TriggerAction` can cause more events to occur.
143    /// If this happens, they are added to `events_out`.
144    ///
145    /// If this method returns false, no timer has changed.
146    /// If this method returns true, then a timer may have changed.
147    /// (False positives are possible, but not false negatives.)
148    fn trigger_action(
149        &mut self,
150        action: &TriggerAction,
151        now: Instant,
152        events_out: &mut TriggerEventsOutVec,
153    ) -> bool {
154        use maybenot::Timer as T;
155        use maybenot::TriggerAction as A;
156
157        let mut timer_changed = false;
158
159        match action {
160            A::Cancel { machine, timer } => {
161                // "Cancel" means to stop one or both of the timers from this machine.
162                let st = self.state_mut(*machine);
163                match timer {
164                    T::Action => st.action_timer_expires = None,
165                    T::Internal => st.internal_timer_expires = None,
166                    T::All => {
167                        st.action_timer_expires = None;
168                        st.internal_timer_expires = None;
169                    }
170                };
171                timer_changed = true;
172            }
173            A::SendPadding {
174                timeout,
175                bypass,
176                replace,
177                machine,
178            } => {
179                // "SendPadding" means to schedule padding to be sent after a given timeout,
180                // and to replace any previous timed action.
181                let st = self.state_mut(*machine);
182                st.action_timer_expires = Some((
183                    now + *timeout,
184                    ScheduledAction::SendPadding {
185                        bypass: *bypass,
186                        replace: *replace,
187                    },
188                ));
189                timer_changed = true;
190            }
191            A::BlockOutgoing {
192                timeout,
193                duration,
194                bypass,
195                replace,
196                machine,
197            } => {
198                // "BlockOutgoing" means to begin blocking traffic for a given duration,
199                // after a given timeout,
200                // and to replace any previous timed action.
201                let st = self.state_mut(*machine);
202                st.action_timer_expires = Some((
203                    now + *timeout,
204                    ScheduledAction::Block {
205                        bypass: *bypass,
206                        replace: *replace,
207                        duration: *duration,
208                    },
209                ));
210                timer_changed = true;
211            }
212            A::UpdateTimer {
213                duration,
214                replace,
215                machine,
216            } => {
217                // "UpdateTimer" means to set or re-set the internal timer for this machine.
218                let st = self.state_mut(*machine);
219
220                let new_expiry = now + *duration;
221                // The "replace" flag means "update the internal timer unconditionally".
222                // If it is false, and the timer is already set, then we should only update
223                // the internal timer to be _longer_.
224                let update_timer = match (replace, st.internal_timer_expires) {
225                    (_, None) => true,
226                    (true, Some(_)) => true,
227                    (false, Some(cur)) if new_expiry > cur => true,
228                    (false, Some(_)) => false,
229                };
230                if update_timer {
231                    st.internal_timer_expires = Some(new_expiry);
232                    timer_changed = true;
233                }
234                // Note: We are supposed to trigger TimerBegin unconditionally
235                // if the timer changes at all.
236                events_out.push(TriggerEvent::TimerBegin { machine: *machine });
237            }
238        }
239
240        timer_changed
241    }
242
243    /// Return the next instant (if any) at which any of the padding machines' timers will expire.
244    fn next_expiration(&self) -> Option<Instant> {
245        self.state
246            .iter()
247            .filter_map(MachineState::next_expiration)
248            .min()
249    }
250}
251
/// The aggregate timer for a whole Framework.
///
/// (Since there are two possible timers for each Machine,
/// we just keep track of the one that will expire next.)
#[derive(Clone, Debug)]
struct Timer {
    /// The next time at which any of this framework's padding machine timers will expire.
    ///
    /// (None means "no timers are set.")
    next_expiration: Option<Instant>,

    /// A [`Waker`] that we must wake whenever `self.next_expiration` becomes sooner than
    /// our next scheduled wakeup (as passed as an argument to `set_expiration`).
    ///
    /// Until `get_expiration` is first called, this is a no-op waker.
    waker: Waker,
}

impl Timer {
    /// Construct a new Timer with no expiration set and a no-op waker.
    fn new() -> Self {
        let waker = Waker::noop().clone();
        Self {
            next_expiration: None,
            waker,
        }
    }

    /// Register `waker` and return the current expiration time.
    ///
    /// After this call, `waker` is the one alerted whenever the expiration time
    /// becomes earlier than the time at which we've actually decided to sleep
    /// (passed as an argument to `set_expiration()`).
    ///
    /// (There are two separate expiration times at work here because, in higher-level code,
    /// we combine _all_ the timer expirations for all padding machines on a circuit
    /// into a single expiration, and track only that expiration.)
    fn get_expiration(&mut self, waker: &Waker) -> Option<Instant> {
        // TODO: Perhaps this should instead return and/or manipulate a sleep future.
        // TODO: Perhaps there should be a shared AtomicWaker?
        let registered = waker.clone();
        self.waker = registered;
        self.next_expiration
    }

    /// Store `new_expiration` as the expiration time, alerting the registered [`Waker`]
    /// if that time is earlier than `next_scheduled_wakeup`.
    ///
    /// A `next_scheduled_wakeup` of `None` means that no wakeup is scheduled at all,
    /// so any `Some` expiration counts as "earlier".
    /// Clearing the expiration (`new_expiration == None`) never wakes anybody.
    fn set_expiration(
        &mut self,
        new_expiration: Option<Instant>,
        next_scheduled_wakeup: Option<Instant>,
    ) {
        // The waker only needs to hear about expirations that precede
        // the wakeup it already knows about.
        let must_wake = new_expiration
            .is_some_and(|new_exp| next_scheduled_wakeup.is_none_or(|sched| new_exp < sched));

        self.next_expiration = new_expiration;
        if must_wake {
            self.waker.wake_by_ref();
        }
    }
}
310
/// State of a MaybenotPadder that is blocking.
///
/// Here we only need to remember when the blocking expires;
/// we record whether the blocking is bypassable in [`super::PaddingShared`].
#[derive(Debug)]
struct BlockingState {
    /// The time at which this blocking expires.
    ///
    /// Once this time has been reached, the blocking is over.
    expiration: Instant,
}
320
321/// An implementation of circuit padding using [`maybenot`].
322///
323/// Supports up to `N` padding machines without spilling over onto the heap.
324pub(super) struct MaybenotPadder<const N: usize> {
325    /// Our underlying [`maybenot::Framework`].
326    framework: Framework,
327    /// The state of our padding machines.
328    state: PadderState<N>,
329    /// Our current timer information.
330    timer: Timer,
331    /// If we are blocking, information about the blocking.
332    blocking: Option<BlockingState>,
333}
334
335impl<const N: usize> MaybenotPadder<N> {
336    /// Construct a new MaybyenotPadder from a provided `FrameworkRules`.
337    pub(super) fn from_framework_rules(
338        rules: &super::PaddingRules,
339        now: Instant,
340    ) -> Result<Self, maybenot::Error> {
341        let framework = maybenot::Framework::new(
342            rules.machines.clone(),
343            rules.max_outbound_padding_frac,
344            rules.max_outbound_blocking_frac,
345            now,
346            ThisThreadRng,
347        )?;
348        Ok(Self::from_framework(framework))
349    }
350
351    /// Construct a new MaybenotPadder from a given Framework.
352    pub(super) fn from_framework(framework: Framework) -> Self {
353        let n = framework.num_machines();
354        let state = PadderState {
355            state: smallvec::smallvec![MachineState::default(); n],
356        };
357        Self {
358            framework,
359            state,
360            timer: Timer::new(),
361            blocking: None,
362        }
363    }
364
365    /// Return the next expiration time, and schedule `waker` to be alerted whenever
366    /// the expiration time becomes earlier than that.
367    pub(super) fn get_expiration(&mut self, waker: &Waker) -> Option<Instant> {
368        self.timer.get_expiration(waker)
369    }
370
371    /// Tell the padding machines about all of the given `events`,
372    /// report them happening at `now`, and adjust internal state.
373    ///
374    /// If doing this would cause any timer to become earlier than `next_scheduled_wakeup`,
375    /// wake up the registered [`Waker`].
376    pub(super) fn trigger_events_at(
377        &mut self,
378        events: &[TriggerEvent],
379        now: Instant,
380        next_scheduled_wakeup: Option<Instant>,
381    ) {
382        let mut timer_changed = false;
383
384        // A pair of buffers that we'll use to handle events that arise while triggering other
385        // events.  (The BeginTimer event can be triggered by the UpdateTimer action.)
386        let (mut e1, mut e2) = (TriggerEventsOutVec::new(), TriggerEventsOutVec::new());
387        let (mut processing, mut pending) = (&mut e1, &mut e2);
388
389        let mut events = events;
390
391        /// If we go through our loop more than this many times, we stop:
392        /// An infinite loop is in theory possible, but we don't want to allow one.
393        const MAX_LOOPS: usize = 4;
394
395        let finished_normally = 'finished: {
396            for _ in 0..MAX_LOOPS {
397                pending.clear();
398                for action in self.framework.trigger_events(events, now) {
399                    timer_changed |= self.state.trigger_action(action, now, pending);
400                }
401
402                if pending.is_empty() {
403                    // We don't have any additional events to trigger.
404                    break 'finished true;
405                } else {
406                    std::mem::swap(&mut processing, &mut pending);
407                    events = &processing[..];
408                }
409            }
410            // We got to the last iteration of the loop and still had events to trigger.
411            break 'finished false;
412        };
413
414        if !finished_normally {
415            // TODO: Log in this case, but not too many times.
416        }
417
418        if timer_changed {
419            self.timer
420                .set_expiration(self.state.next_expiration(), next_scheduled_wakeup);
421        }
422    }
423
424    /// Take any actions that need to occur at time `now`.
425    ///
426    /// We should call this function as soon as possible after our timer has expired.
427    ///
428    /// Returns zero or more [`PerHopPaddingEvent`]s reflecting the padding that we should send,
429    /// and what we should do with blocking.
430    fn take_actions_at(
431        &mut self,
432        now: Instant,
433        next_scheduled_wakeup: Option<Instant>,
434    ) -> PerHopPaddingEventVec {
435        // Events that we need to trigger based on expired timers.
436        // TODO: We might want a smaller N here.
437        let mut e: SmallVec<[TriggerEvent; N]> = SmallVec::default();
438
439        // A list of events that we can't handle internally, and which we need to report
440        // to a circuit/tunnel reactor.
441        let mut return_events = PerHopPaddingEventVec::default();
442
443        let mut timer_changed = false;
444
445        if let Some(blocking) = &self.blocking {
446            if blocking.expiration <= now {
447                timer_changed = true;
448                self.blocking = None;
449                e.push(TriggerEvent::BlockingEnd);
450                return_events.push(PerHopPaddingEvent::StopBlocking);
451            }
452        }
453
454        for (idx, st) in self.state.state.iter_mut().enumerate() {
455            match st.internal_timer_expires {
456                Some(t) if t <= now => {
457                    // This machine's internal timer has expired; we tell it so.
458                    st.internal_timer_expires = None;
459                    timer_changed = true;
460                    e.push(TriggerEvent::TimerEnd {
461                        machine: MachineId::from_raw(idx),
462                    });
463                }
464                None | Some(_) => {}
465            }
466            match &st.action_timer_expires {
467                Some((t, _)) if *t <= now => {
468                    // This machine's action timer has expired; we now take that action.
469                    use ScheduledAction as SA;
470                    let action = st
471                        .action_timer_expires
472                        .take()
473                        .expect("It was Some a minute ago!")
474                        .1;
475                    timer_changed = true;
476                    match action {
477                        SA::SendPadding { bypass, replace } => {
478                            return_events.push(PerHopPaddingEvent::SendPadding {
479                                machine: MachineId::from_raw(idx),
480                                replace: Replace::from_bool(replace),
481                                bypass: Bypass::from_bool(bypass),
482                            });
483                        }
484                        SA::Block {
485                            bypass,
486                            replace,
487                            duration,
488                        } => {
489                            let new_expiry = now + duration;
490                            if self.blocking.is_none() {
491                                return_events.push(PerHopPaddingEvent::StartBlocking {
492                                    is_bypassable: bypass,
493                                });
494                            }
495                            let replace = match &self.blocking {
496                                None => true,
497                                Some(b) if replace || b.expiration < new_expiry => true,
498                                Some(_) => false,
499                            };
500                            if replace {
501                                self.blocking = Some(BlockingState {
502                                    expiration: new_expiry,
503                                });
504                            }
505
506                            // We trigger this event unconditionally, even if we were already
507                            // blocking.
508                            e.push(TriggerEvent::BlockingBegin {
509                                machine: MachineId::from_raw(idx),
510                            });
511                        }
512                    }
513                }
514                None | Some(_) => {}
515            }
516        }
517
518        if timer_changed {
519            self.timer
520                .set_expiration(self.state.next_expiration(), next_scheduled_wakeup);
521        }
522
523        // Inform the framework of any expired timeouts.
524        self.trigger_events_at(&e[..], now, next_scheduled_wakeup);
525
526        return_events
527    }
528}
529
530/// Helper: An Rng object that calls `rand::rng()` to get the thread Rng.
531///
532/// (We use this since we want our maybenot Framework to use the thread Rng,
533/// but we can't have it _own_ the thread Rng. )
534#[derive(Clone, Debug)]
535pub(super) struct ThisThreadRng;
536impl rand::RngCore for ThisThreadRng {
537    fn next_u32(&mut self) -> u32 {
538        rand::rng().next_u32()
539    }
540
541    fn next_u64(&mut self) -> u64 {
542        rand::rng().next_u64()
543    }
544
545    fn fill_bytes(&mut self, dst: &mut [u8]) {
546        rand::rng().fill_bytes(dst);
547    }
548}
549
/// Helper trait: Used to wrap a single [`MaybenotPadder`].
///
/// (We don't use `MaybenotPadder` directly because we want to keep the freedom
/// to parameterize it differently, or maybe even to replace it with something else.)
//
// TODO circpad: Decide whether this optimization/flexibility makes any sense.
pub(super) trait PaddingBackend: Send + Sync {
    /// Report one or more TriggerEvents to the padder, as happening at `now`.
    ///
    /// Alert any registered `Waker` if these events cause us to need to take action
    /// earlier than `next_scheduled_wakeup`.
    fn report_events_at(
        &mut self,
        events: &[maybenot::TriggerEvent],
        now: Instant,
        next_scheduled_wakeup: Option<Instant>,
    );

    /// Trigger any padding actions that should be taken `now`.
    ///
    /// If _we_ should perform any actions (blocking, unblocking, or sending padding),
    /// return them in a [`PerHopPaddingEventVec`].
    ///
    /// Alert any registered `Waker` if these events cause us to need to take action
    /// earlier than `next_scheduled_wakeup`.
    fn take_padding_events_at(
        &mut self,
        now: Instant,
        next_scheduled_wakeup: Option<Instant>,
    ) -> PerHopPaddingEventVec;

    /// This method should be called when we have no actions to perform,
    /// with a [`Waker`] that will activate the corresponding [`PaddingEventStream`](super::PaddingEventStream).
    ///
    /// It will return a time at which `take_padding_events_at()` should next be called
    /// (or `None` if no such time is currently known),
    /// and will wake up the Waker if it turns out that we need to call
    /// `take_padding_events_at()` any earlier than that.
    fn next_wakeup(&mut self, waker: &Waker) -> Option<Instant>;
}
589
590impl<const N: usize> PaddingBackend for MaybenotPadder<N> {
591    fn report_events_at(
592        &mut self,
593        events: &[maybenot::TriggerEvent],
594        now: Instant,
595        next_scheduled_wakeup: Option<Instant>,
596    ) {
597        self.trigger_events_at(events, now, next_scheduled_wakeup);
598    }
599
600    fn take_padding_events_at(
601        &mut self,
602        now: Instant,
603        next_scheduled_wakeup: Option<Instant>,
604    ) -> PerHopPaddingEventVec {
605        self.take_actions_at(now, next_scheduled_wakeup)
606    }
607
608    fn next_wakeup(&mut self, waker: &Waker) -> Option<Instant> {
609        self.get_expiration(waker)
610    }
611}