tor_proto/client/circuit/padding/maybenot_padding/backend.rs
1//! Padding backend based on [`maybenot`].
2//!
3//! # Operation
4//!
5//! For each each circuit hop, we have an optional [`maybenot::Framework`].
6//! This framework wraps multiple "padding machines",
7//! each of which is a randomized state machine.
8//! (Arti is built with a list of pre-configured of padding machines.
9//! The set of padding machines to use with any given circuit hop
10//! are negotiated via `PADDING_NEGOTIATE` messages.)
11//! We interact with the framework via
12//! [`Framework::trigger_events`](maybenot::Framework::trigger_events),
13//! which consumes [`TriggerEvent`]s and gives us [`TriggerAction`]s.
14//! Those `TriggerAction`s tell us to schedule or reschedule different timers,
15//! to block traffic, or to send padding.
16//!
17//! We wrap the `Framework` in [`MaybenotPadder`],
18//! which keeps track of the expiration time for each timer.
19//! From `MaybenotPadder`, we expose a single timer
20//! describing when the next action from the padding machine may be necessary.
21//! This timer is likely to update frequently.
22
23use std::{sync::Arc, task::Waker};
24
25use maybenot::{MachineId, TriggerEvent};
26use smallvec::SmallVec;
27
28use super::{Bypass, Duration, Instant, PerHopPaddingEvent, PerHopPaddingEventVec, Replace};
29
30/// The Rng that we construct for our padding machines.
31///
32/// We use a separate type alias here in case we want to move to a per-Framework
33/// ChaCha8Rng or something.
34type Rng = ThisThreadRng;
35
36/// The particular instantiated padding framework type that we use.
37type Framework = maybenot::Framework<Arc<[maybenot::Machine]>, Rng, Instant>;
38
39/// A [`maybenot::TriggerAction`] as we construct it for use with our [`Framework`]s.
40type TriggerAction = maybenot::TriggerAction<Instant>;
41
42/// A type we use to report events that we must trigger on the basis of triggering other events.
43///
44/// We've optimized here for the assumption that we _usually_ won't need to trigger more than one
45/// event.
46type TriggerEventsOutVec = SmallVec<[TriggerEvent; 1]>;
47
48/// An action that we should take on a machine's behalf,
49/// after a certain interval has elapsed.
50#[derive(Clone, Debug)]
51enum ScheduledAction {
52 /// We should send padding if and when the machine's action timer expires.
53 SendPadding {
54 /// Send padding even if bypassable blocking is in place.
55 /// (Blocking can be bypassable or non-bypassable.)
56 bypass: bool,
57 /// If an existing non-padding cell is queued,
58 /// it can replace this padding.
59 //
60 /// If `bypass` is true, such a cell can also bypass bypassable blocking.
61 replace: bool,
62 },
63 /// We should block outbound traffic if and when the machine's action timer expires.
64 Block {
65 /// If true, then the blocking is bypassable.
66 bypass: bool,
67 /// If true, then we should change the duration of the current blocking unconditionally.
68 /// If false, we should use whichever duration is longer.
69 replace: bool,
70 /// The interval of the blocking that we should apply.
71 duration: Duration,
72 },
73}
74
75/// The state for a _single_ padding Machine within a Framework.
76#[derive(Default, Clone, Debug)]
77struct MachineState {
78 /// The current state for the machine's "internal timer".
79 ///
80 /// Each machine has a single internal timer,
81 /// and manages the timer itself via the `UpdateTimer` and `Cancel`
82 /// [`TriggerAction`] variants.
83 internal_timer_expires: Option<Instant>,
84
85 /// The current state for the machine's "action timer".
86 ///
87 /// Each machine has a single action timer, after which some [`ScheduledAction`]
88 /// should be taken.
89 ///
90 /// (Note that only one action can be scheduled per machine,
91 /// so if we're told to schedule blocking, we should cancel padding;
92 /// and if we're told to schedule padding, we should cancel blocking.)
93 action_timer_expires: Option<(Instant, ScheduledAction)>,
94}
95
96impl MachineState {
97 /// Return the earliest time that either of this machine's timers will expire.
98 fn next_expiration(&self) -> Option<Instant> {
99 match (&self.internal_timer_expires, &self.action_timer_expires) {
100 (None, None) => None,
101 (None, Some((t, _))) => Some(*t),
102 (Some(t), None) => Some(*t),
103 (Some(t1), Some((t2, _))) => Some(*t1.min(t2)),
104 }
105 }
106}
107
108/// Represents the state for all padding machines within a framework.
109///
110/// N should be around the number of padding machines that the framework should support.
111struct PadderState<const N: usize> {
112 /// A list of all the padding machine states for a single framework.
113 ///
114 /// This list is indexed by `MachineId`.
115 //
116 // TODO: Optimize this size even more if appropriate
117 state: SmallVec<[MachineState; N]>,
118}
119
120impl<const N: usize> PadderState<N> {
121 /// Return a mutable reference to the state corresponding to a given [`MachineId`]
122 ///
123 /// # Panics
124 ///
125 /// Panics if `id` is out of range, which can only happen if a MachineId from
126 /// one Framework is given to another Framework.
127 fn state_mut(&mut self, id: MachineId) -> &mut MachineState {
128 &mut self.state[id.into_raw()]
129 }
130
131 /// Execute a single [`TriggerAction`] on this state.
132 ///
133 /// `TriggerActions` are created by `maybenot::Framework` instances
134 /// in response to [`TriggerEvent`]s.
135 ///
136 /// Executing a `TriggerAction` can adjust timers,
137 /// and can schedule a new [`ScheduledAction`] to be taken in the future;
138 /// it does not, however, send any padding or adjust any blocking on its own.
139 ///
140 /// The current time should be provided in `now`.
141 ///
142 /// Executing a `TriggerAction` can cause more events to occur.
143 /// If this happens, they are added to `events_out`.
144 ///
145 /// If this method returns false, no timer has changed.
146 /// If this method returns true, then a timer may have changed.
147 /// (False positives are possible, but not false negatives.)
148 fn trigger_action(
149 &mut self,
150 action: &TriggerAction,
151 now: Instant,
152 events_out: &mut TriggerEventsOutVec,
153 ) -> bool {
154 use maybenot::Timer as T;
155 use maybenot::TriggerAction as A;
156
157 let mut timer_changed = false;
158
159 match action {
160 A::Cancel { machine, timer } => {
161 // "Cancel" means to stop one or both of the timers from this machine.
162 let st = self.state_mut(*machine);
163 match timer {
164 T::Action => st.action_timer_expires = None,
165 T::Internal => st.internal_timer_expires = None,
166 T::All => {
167 st.action_timer_expires = None;
168 st.internal_timer_expires = None;
169 }
170 };
171 timer_changed = true;
172 }
173 A::SendPadding {
174 timeout,
175 bypass,
176 replace,
177 machine,
178 } => {
179 // "SendPadding" means to schedule padding to be sent after a given timeout,
180 // and to replace any previous timed action.
181 let st = self.state_mut(*machine);
182 st.action_timer_expires = Some((
183 now + *timeout,
184 ScheduledAction::SendPadding {
185 bypass: *bypass,
186 replace: *replace,
187 },
188 ));
189 timer_changed = true;
190 }
191 A::BlockOutgoing {
192 timeout,
193 duration,
194 bypass,
195 replace,
196 machine,
197 } => {
198 // "BlockOutgoing" means to begin blocking traffic for a given duration,
199 // after a given timeout,
200 // and to replace any previous timed action.
201 let st = self.state_mut(*machine);
202 st.action_timer_expires = Some((
203 now + *timeout,
204 ScheduledAction::Block {
205 bypass: *bypass,
206 replace: *replace,
207 duration: *duration,
208 },
209 ));
210 timer_changed = true;
211 }
212 A::UpdateTimer {
213 duration,
214 replace,
215 machine,
216 } => {
217 // "UpdateTimer" means to set or re-set the internal timer for this machine.
218 let st = self.state_mut(*machine);
219
220 let new_expiry = now + *duration;
221 // The "replace" flag means "update the internal timer unconditionally".
222 // If it is false, and the timer is already set, then we should only update
223 // the internal timer to be _longer_.
224 let update_timer = match (replace, st.internal_timer_expires) {
225 (_, None) => true,
226 (true, Some(_)) => true,
227 (false, Some(cur)) if new_expiry > cur => true,
228 (false, Some(_)) => false,
229 };
230 if update_timer {
231 st.internal_timer_expires = Some(new_expiry);
232 timer_changed = true;
233 }
234 // Note: We are supposed to trigger TimerBegin unconditionally
235 // if the timer changes at all.
236 events_out.push(TriggerEvent::TimerBegin { machine: *machine });
237 }
238 }
239
240 timer_changed
241 }
242
243 /// Return the next instant (if any) at which any of the padding machines' timers will expire.
244 fn next_expiration(&self) -> Option<Instant> {
245 self.state
246 .iter()
247 .filter_map(MachineState::next_expiration)
248 .min()
249 }
250}
251
252/// Possible state of a Framework's aggregate timer.
253///
254/// (Since there are two possible timers for each Machine,
255/// we just keep track of the one that will expire next.)
256#[derive(Clone, Debug)]
257struct Timer {
258 /// The next time at which any of this padding machines' timer will expire.
259 ///
260 /// (None means "no timers are set.")
261 next_expiration: Option<Instant>,
262
263 /// A [`Waker`] that we must wake whenever `self.next_expiration` becomes sooner than
264 /// our next scheduled wakeup (as passed as an argument to `set_expiration`).
265 waker: Waker,
266}
267
268impl Timer {
269 /// Construct a new Timer.
270 fn new() -> Self {
271 Self {
272 next_expiration: None,
273 waker: Waker::noop().clone(),
274 }
275 }
276
277 /// Return the next expiration time, and schedule `waker` to be alerted whenever
278 /// the expiration time becomes earlier than the time at which we've actually decided to sleep
279 /// (passed as an argument to `set_expiration()`).
280 ///
281 /// (There are two separate expiration times at work here because, in higher-level code,
282 /// we combine _all_ the timer expirations for all padding machines on a circuit
283 /// into a single expiration, and track only that expiration.)
284 fn get_expiration(&mut self, waker: &Waker) -> Option<Instant> {
285 // TODO: Perhaps this should instead return and/or manipulate a sleep future.
286 // TODO: Perhaps there should be a shared AtomicWaker?
287 self.waker = waker.clone();
288 self.next_expiration
289 }
290
291 /// Change the expiration time to `new_expiration`, alerting the [`Waker`] if that time
292 /// is earlier than `next_scheduled_wakeup`.
293 fn set_expiration(
294 &mut self,
295 new_expiration: Option<Instant>,
296 next_scheduled_wakeup: Option<Instant>,
297 ) {
298 // we need to invoke the waker if the new expiration is earlier than the one the waker has.
299 let wake = match (next_scheduled_wakeup, new_expiration) {
300 (_, None) => false,
301 (None, Some(_)) => true,
302 (Some(w_exp), Some(new_exp)) => new_exp < w_exp,
303 };
304 self.next_expiration = new_expiration;
305 if wake {
306 self.waker.wake_by_ref();
307 }
308 }
309}
310
311/// State of a MaybenotPadder that is blocking.
312///
313/// Here we only need to remember when the blocking expires;
314/// we record the bypassable status of the padding in [`super::PaddingShared`].
315#[derive(Debug)]
316struct BlockingState {
317 /// The time at which this blocking expires.
318 expiration: Instant,
319}
320
321/// An implementation of circuit padding using [`maybenot`].
322///
323/// Supports up to `N` padding machines without spilling over onto the heap.
324pub(super) struct MaybenotPadder<const N: usize> {
325 /// Our underlying [`maybenot::Framework`].
326 framework: Framework,
327 /// The state of our padding machines.
328 state: PadderState<N>,
329 /// Our current timer information.
330 timer: Timer,
331 /// If we are blocking, information about the blocking.
332 blocking: Option<BlockingState>,
333}
334
335impl<const N: usize> MaybenotPadder<N> {
336 /// Construct a new MaybyenotPadder from a provided `FrameworkRules`.
337 pub(super) fn from_framework_rules(
338 rules: &super::PaddingRules,
339 now: Instant,
340 ) -> Result<Self, maybenot::Error> {
341 let framework = maybenot::Framework::new(
342 rules.machines.clone(),
343 rules.max_outbound_padding_frac,
344 rules.max_outbound_blocking_frac,
345 now,
346 ThisThreadRng,
347 )?;
348 Ok(Self::from_framework(framework))
349 }
350
351 /// Construct a new MaybenotPadder from a given Framework.
352 pub(super) fn from_framework(framework: Framework) -> Self {
353 let n = framework.num_machines();
354 let state = PadderState {
355 state: smallvec::smallvec![MachineState::default(); n],
356 };
357 Self {
358 framework,
359 state,
360 timer: Timer::new(),
361 blocking: None,
362 }
363 }
364
365 /// Return the next expiration time, and schedule `waker` to be alerted whenever
366 /// the expiration time becomes earlier than that.
367 pub(super) fn get_expiration(&mut self, waker: &Waker) -> Option<Instant> {
368 self.timer.get_expiration(waker)
369 }
370
371 /// Tell the padding machines about all of the given `events`,
372 /// report them happening at `now`, and adjust internal state.
373 ///
374 /// If doing this would cause any timer to become earlier than `next_scheduled_wakeup`,
375 /// wake up the registered [`Waker`].
376 pub(super) fn trigger_events_at(
377 &mut self,
378 events: &[TriggerEvent],
379 now: Instant,
380 next_scheduled_wakeup: Option<Instant>,
381 ) {
382 let mut timer_changed = false;
383
384 // A pair of buffers that we'll use to handle events that arise while triggering other
385 // events. (The BeginTimer event can be triggered by the UpdateTimer action.)
386 let (mut e1, mut e2) = (TriggerEventsOutVec::new(), TriggerEventsOutVec::new());
387 let (mut processing, mut pending) = (&mut e1, &mut e2);
388
389 let mut events = events;
390
391 /// If we go through our loop more than this many times, we stop:
392 /// An infinite loop is in theory possible, but we don't want to allow one.
393 const MAX_LOOPS: usize = 4;
394
395 let finished_normally = 'finished: {
396 for _ in 0..MAX_LOOPS {
397 pending.clear();
398 for action in self.framework.trigger_events(events, now) {
399 timer_changed |= self.state.trigger_action(action, now, pending);
400 }
401
402 if pending.is_empty() {
403 // We don't have any additional events to trigger.
404 break 'finished true;
405 } else {
406 std::mem::swap(&mut processing, &mut pending);
407 events = &processing[..];
408 }
409 }
410 // We got to the last iteration of the loop and still had events to trigger.
411 break 'finished false;
412 };
413
414 if !finished_normally {
415 // TODO: Log in this case, but not too many times.
416 }
417
418 if timer_changed {
419 self.timer
420 .set_expiration(self.state.next_expiration(), next_scheduled_wakeup);
421 }
422 }
423
424 /// Take any actions that need to occur at time `now`.
425 ///
426 /// We should call this function as soon as possible after our timer has expired.
427 ///
428 /// Returns zero or more [`PerHopPaddingEvent`]s reflecting the padding that we should send,
429 /// and what we should do with blocking.
430 fn take_actions_at(
431 &mut self,
432 now: Instant,
433 next_scheduled_wakeup: Option<Instant>,
434 ) -> PerHopPaddingEventVec {
435 // Events that we need to trigger based on expired timers.
436 // TODO: We might want a smaller N here.
437 let mut e: SmallVec<[TriggerEvent; N]> = SmallVec::default();
438
439 // A list of events that we can't handle internally, and which we need to report
440 // to a circuit/tunnel reactor.
441 let mut return_events = PerHopPaddingEventVec::default();
442
443 let mut timer_changed = false;
444
445 if let Some(blocking) = &self.blocking {
446 if blocking.expiration <= now {
447 timer_changed = true;
448 self.blocking = None;
449 e.push(TriggerEvent::BlockingEnd);
450 return_events.push(PerHopPaddingEvent::StopBlocking);
451 }
452 }
453
454 for (idx, st) in self.state.state.iter_mut().enumerate() {
455 match st.internal_timer_expires {
456 Some(t) if t <= now => {
457 // This machine's internal timer has expired; we tell it so.
458 st.internal_timer_expires = None;
459 timer_changed = true;
460 e.push(TriggerEvent::TimerEnd {
461 machine: MachineId::from_raw(idx),
462 });
463 }
464 None | Some(_) => {}
465 }
466 match &st.action_timer_expires {
467 Some((t, _)) if *t <= now => {
468 // This machine's action timer has expired; we now take that action.
469 use ScheduledAction as SA;
470 let action = st
471 .action_timer_expires
472 .take()
473 .expect("It was Some a minute ago!")
474 .1;
475 timer_changed = true;
476 match action {
477 SA::SendPadding { bypass, replace } => {
478 return_events.push(PerHopPaddingEvent::SendPadding {
479 machine: MachineId::from_raw(idx),
480 replace: Replace::from_bool(replace),
481 bypass: Bypass::from_bool(bypass),
482 });
483 }
484 SA::Block {
485 bypass,
486 replace,
487 duration,
488 } => {
489 let new_expiry = now + duration;
490 if self.blocking.is_none() {
491 return_events.push(PerHopPaddingEvent::StartBlocking {
492 is_bypassable: bypass,
493 });
494 }
495 let replace = match &self.blocking {
496 None => true,
497 Some(b) if replace || b.expiration < new_expiry => true,
498 Some(_) => false,
499 };
500 if replace {
501 self.blocking = Some(BlockingState {
502 expiration: new_expiry,
503 });
504 }
505
506 // We trigger this event unconditionally, even if we were already
507 // blocking.
508 e.push(TriggerEvent::BlockingBegin {
509 machine: MachineId::from_raw(idx),
510 });
511 }
512 }
513 }
514 None | Some(_) => {}
515 }
516 }
517
518 if timer_changed {
519 self.timer
520 .set_expiration(self.state.next_expiration(), next_scheduled_wakeup);
521 }
522
523 // Inform the framework of any expired timeouts.
524 self.trigger_events_at(&e[..], now, next_scheduled_wakeup);
525
526 return_events
527 }
528}
529
530/// Helper: An Rng object that calls `rand::rng()` to get the thread Rng.
531///
532/// (We use this since we want our maybenot Framework to use the thread Rng,
533/// but we can't have it _own_ the thread Rng. )
534#[derive(Clone, Debug)]
535pub(super) struct ThisThreadRng;
536impl rand::RngCore for ThisThreadRng {
537 fn next_u32(&mut self) -> u32 {
538 rand::rng().next_u32()
539 }
540
541 fn next_u64(&mut self) -> u64 {
542 rand::rng().next_u64()
543 }
544
545 fn fill_bytes(&mut self, dst: &mut [u8]) {
546 rand::rng().fill_bytes(dst);
547 }
548}
549
550/// Helper trait: Used to wrap a single [`MaybenotPadder`].
551///
552/// (We don't use `MaybenotPadder` directly because we want to keep the freedom
553/// to parameterize it differently, or maybe even to replace it with something else.)
554//
555// TODO circpad: Decide whether this optimization/flexibility makes any sense.
556pub(super) trait PaddingBackend: Send + Sync {
557 /// Report one or more TriggerEvents to the padder.
558 ///
559 /// Alert any registered `Waker` if these events cause us to need to take action
560 /// earlier than `next_scheduled_wakeup`.
561 fn report_events_at(
562 &mut self,
563 events: &[maybenot::TriggerEvent],
564 now: Instant,
565 next_scheduled_wakeup: Option<Instant>,
566 );
567
568 /// Trigger any padding actions that should be taken `now`.
569 ///
570 /// If _we_ should perform any actions (blocking, unblocking, or sending padding),
571 /// return them in a [`PerHopPaddingEventVec`].
572 ///
573 /// Alert any registered `Waker` if these events cause us to need to take action
574 /// earlier than `next_scheduled_wakeup`.
575 fn take_padding_events_at(
576 &mut self,
577 now: Instant,
578 next_scheduled_wakeup: Option<Instant>,
579 ) -> PerHopPaddingEventVec;
580
581 /// This method should be called when we have no actions to perform,
582 /// with a [`Waker`] that will activate the corresponding [`PaddingEventStream`](super::PaddingEventStream).
583 ///
584 /// It will return a time at which pending_events_at() should next be called,
585 /// and will wake up the Waker if it turns out that we need to call `pending_events_at()`
586 /// any earlier than that.
587 fn next_wakeup(&mut self, waker: &Waker) -> Option<Instant>;
588}
589
590impl<const N: usize> PaddingBackend for MaybenotPadder<N> {
591 fn report_events_at(
592 &mut self,
593 events: &[maybenot::TriggerEvent],
594 now: Instant,
595 next_scheduled_wakeup: Option<Instant>,
596 ) {
597 self.trigger_events_at(events, now, next_scheduled_wakeup);
598 }
599
600 fn take_padding_events_at(
601 &mut self,
602 now: Instant,
603 next_scheduled_wakeup: Option<Instant>,
604 ) -> PerHopPaddingEventVec {
605 self.take_actions_at(now, next_scheduled_wakeup)
606 }
607
608 fn next_wakeup(&mut self, waker: &Waker) -> Option<Instant> {
609 self.get_expiration(waker)
610 }
611}