Skip to main content

tor_proto/client/circuit/padding/
maybenot_padding.rs

1//! A `maybenot`-specific backend for padding.
2
3// Some of the circuit padding implementation isn't reachable unless
4// the extra-experimental circ-padding-manual feature is also present.
5//
6// TODO circpad: Remove this once we have circ-padding negotiation implemented.
7#![cfg_attr(
8    all(feature = "circ-padding", not(feature = "circ-padding-manual")),
9    expect(dead_code)
10)]
11
12mod backend;
13
14use std::collections::VecDeque;
15use std::num::NonZeroU16;
16use std::pin::Pin;
17use std::sync::{Arc, Mutex};
18use std::task::{Context, Poll, Waker};
19
20use bitvec::BitArr;
21use maybenot::MachineId;
22use smallvec::SmallVec;
23use tor_memquota::memory_cost_structural_copy;
24use tor_rtcompat::{DynTimeProvider, SleepProvider};
25
26use crate::HopNum;
27use crate::circuit::HOPS;
28use crate::util::err::ExcessPadding;
29use backend::PaddingBackend;
30
31/// The type of Instant that we'll use for our padding machines.
32///
33/// We use a separate type alias here in case we want to move to coarsetime.
34type Instant = web_time_compat::Instant;
35
36/// The type of Duration that we'll use for our padding machines.
37///
38/// We use a separate type alias here in case we want to move to coarsetime.
39type Duration = std::time::Duration;
40
41/// A type we use to generate a set of [`PaddingEvent`].
42///
43/// This is a separate type so we can tune it and make it into a smallvec if needed.
44type PaddingEventQueue = VecDeque<PaddingEvent>;
45
46/// A type we use to generate a set of [`PaddingEvent`].
47///
48/// This is a separate type so we can tune it and make it into a smallvec if needed.
49type PerHopPaddingEventVec = Vec<PerHopPaddingEvent>;
50
51/// Specifications for a set of maybenot padding machines as used in Arti: used to construct a `maybenot::Framework`.
52#[derive(Clone, Debug, derive_builder::Builder)]
53#[builder(build_fn(
54    validate = "Self::validate",
55    private,
56    error = "CircuitPadderConfigError"
57))]
58#[builder(name = "CircuitPadderConfig")]
59#[cfg_attr(not(feature = "circ-padding-manual"), builder(private))]
60#[cfg_attr(feature = "circ-padding-manual", builder(public))]
61pub(crate) struct PaddingRules {
62    /// List of padding machines to use for shaping traffic.
63    ///
64    /// Note that this list may be empty, if we only want to receive padding,
65    /// and never send it.
66    machines: Arc<[maybenot::Machine]>,
67    /// Maximum allowable outbound padding fraction.
68    ///
69    /// Passed directly to maybenot; not enforced in Arti.
70    /// See [`maybenot::Framework::new`] for details.
71    ///
72    /// Must be between 0.0 and 1.0
73    #[builder(default = "1.0")]
74    max_outbound_blocking_frac: f64,
75    /// Maximum allowable outbound blocking fraction.
76    ///
77    /// Passed directly to maybenot; not enforced in Arti.
78    /// See [`maybenot::Framework::new`] for details.
79    ///
80    /// Must be between 0.0 and 1.0.
81    #[builder(default = "1.0")]
82    max_outbound_padding_frac: f64,
83    /// Maximum allowable fraction of inbound padding
84    #[builder(default = "1.0")]
85    max_inbound_padding_frac: f64,
86    /// Number of cells before which we should not enforce max_inbound_padding_frac.
87    #[builder(default = "1")]
88    enforce_inbound_padding_after_cells: u16,
89}
90
91/// An error returned from validating a [`CircuitPadderConfig`].
92#[derive(Clone, Debug, thiserror::Error)]
93#[cfg_attr(feature = "circ-padding-manual", visibility::make(pub))]
94#[non_exhaustive]
95pub(crate) enum CircuitPadderConfigError {
96    /// A field needed to be given, but wasn't.
97    #[error("No value was given for {0}")]
98    UninitializedField(&'static str),
99    /// A field needed to be a proper fraction, but wasn't.
100    #[error("Value was out of range for {0}. (Must be between 0 and 1)")]
101    FractionOutOfRange(&'static str),
102    /// Maybenot gave us an error when initializing the framework.
103    #[error("Maybenot could not initialize framework for rules")]
104    MaybenotError(#[from] maybenot::Error),
105}
106
107impl From<derive_builder::UninitializedFieldError> for CircuitPadderConfigError {
108    fn from(value: derive_builder::UninitializedFieldError) -> Self {
109        Self::UninitializedField(value.field_name())
110    }
111}
112
113impl CircuitPadderConfig {
114    /// Helper: Return an error if this is not a valid Builder.
115    fn validate(&self) -> Result<(), CircuitPadderConfigError> {
116        macro_rules! enforce_frac {
117            { $field:ident } =>
118            {
119                if self.$field.is_some_and(|v| ! (0.0 .. 1.0).contains(&v)) {
120                    return Err(CircuitPadderConfigError::FractionOutOfRange(stringify!($field)));
121                }
122            }
123        }
124        enforce_frac!(max_outbound_blocking_frac);
125        enforce_frac!(max_outbound_padding_frac);
126        enforce_frac!(max_inbound_padding_frac);
127
128        Ok(())
129    }
130
131    /// Construct a [`CircuitPadder`] based on this [`CircuitPadderConfig`].
132    ///
133    /// A [`CircuitPadderConfig`] is created its accessors, and used with this method to build a [`CircuitPadder`].
134    ///
135    /// That [`CircuitPadder`] can then be installed on a circuit using [`ClientCirc::start_padding_at_hop`](crate::client::circuit::ClientCirc::start_padding_at_hop).
136    #[cfg_attr(feature = "circ-padding-manual", visibility::make(pub))]
137    pub(crate) fn create_padder(
138        &self,
139        now: Instant,
140    ) -> Result<CircuitPadder, CircuitPadderConfigError> {
141        let rules = self.build()?;
142        let backend = rules.create_padding_backend(now)?;
143        let initial_stats = rules.initialize_stats();
144        Ok(CircuitPadder {
145            initial_stats,
146            backend,
147        })
148    }
149}
150
151impl PaddingRules {
152    /// Create a [`PaddingBackend`] for this [`PaddingRules`], so we can install it in a
153    /// [`PaddingShared`].
154    fn create_padding_backend(
155        &self,
156        now: Instant,
157    ) -> Result<Box<dyn PaddingBackend>, maybenot::Error> {
158        // TODO circpad: specialize this for particular values of n_machines,
159        // when we finally go to implement padding.
160        const OPTIMIZE_FOR_N_MACHINES: usize = 4;
161
162        let backend =
163            backend::MaybenotPadder::<OPTIMIZE_FOR_N_MACHINES>::from_framework_rules(self, now)?;
164        Ok(Box::new(backend))
165    }
166
167    /// Create a new `PaddingStats` to reflect the rules for inbound padding of this  PaddingRules
168    fn initialize_stats(&self) -> PaddingStats {
169        PaddingStats {
170            n_padding: 0,
171            n_normal: 0,
172            max_padding_frac: self.max_inbound_padding_frac as f32,
173            // We just convert 0 to 1, since that's necessarily what was meant.
174            enforce_max_after: self
175                .enforce_inbound_padding_after_cells
176                .try_into()
177                .unwrap_or(1.try_into().expect("1 was not nonzero!?")),
178        }
179    }
180}
181
182/// A opaque handle to a padding implementation for a single hop.
183///
184/// This type is constructed with [`CircuitPadderConfig::create_padder`].
185#[derive(derive_more::Debug)]
186#[cfg_attr(feature = "circ-padding-manual", visibility::make(pub))]
187pub(crate) struct CircuitPadder {
188    /// The initial padding stats and restrictions for inbound padding.
189    initial_stats: PaddingStats,
190    /// The underlying backend to use.
191    #[debug(skip)]
192    backend: Box<dyn PaddingBackend>,
193}
194
195/// An instruction from the padding machine to the circuit.
196///
197/// These are returned from the [`PaddingEventStream`].
198///
199/// When the `circ-padding` feature is disabled, these won't actually be constructed.
200#[derive(Clone, Copy, Debug)]
201pub(crate) enum PaddingEvent {
202    /// An instruction to send padding.
203    SendPadding(SendPadding),
204    /// An instruction to start blocking outbound traffic,
205    /// or change the hop at which traffic is blocked.
206    StartBlocking(StartBlocking),
207    /// An instruction to stop all blocking.
208    StopBlocking,
209}
210
211/// An instruction from a single padding hop.
212///
213/// This will be turned into a [`PaddingEvent`] before it's given
214/// to the circuit reactor.
215#[derive(Clone, Copy, Debug)]
216enum PerHopPaddingEvent {
217    /// An instruction to send padding.
218    SendPadding {
219        /// The machine that told us to send the padding.
220        ///
221        /// (We need to use this when we report that we sent the padding.)
222        machine: MachineId,
223        /// Whether the padding can be replaced with regular data.
224        replace: Replace,
225        /// Whether the padding can bypass a bypassable block.
226        bypass: Bypass,
227    },
228    /// An instruction to start blocking traffic..
229    StartBlocking {
230        /// Whether this blocking instance may be bypassed by padding with
231        /// [`Bypass::BypassBlocking`].
232        ///
233        /// (Note that this is _not_ a `Bypass`, since that enum notes whether
234        /// or not a _padding_ cell can bypass blocking)
235        is_bypassable: bool,
236    },
237    /// An instruction to stop blocking.
238    StopBlocking,
239}
240
241/// Whether a given piece of padding can be replaced with queued data.
242///
243/// This is an enum to avoid confusing it with `Bypass`.
244#[derive(Clone, Copy, Debug, PartialEq, Eq)]
245pub(crate) enum Replace {
246    /// The padding can be replaced
247    /// either by packaging data in a regular data cell,
248    /// or with data currently queued but not yet sent.
249    Replaceable,
250    /// The padding must be queued; it can't be replaced with data.
251    NotReplaceable,
252}
253
254impl Replace {
255    /// Construct a [`Replace`] from a bool.
256    fn from_bool(replace: bool) -> Self {
257        match replace {
258            true => Replace::Replaceable,
259            false => Replace::NotReplaceable,
260        }
261    }
262}
263
264/// Whether a piece of padding can bypass a bypassable case of blocking.
265///
266/// This is an enum to avoid confusing it with `Release`.
267#[derive(Clone, Copy, Debug, PartialEq, Eq)]
268pub(crate) enum Bypass {
269    /// This padding may bypass the block, if the block is bypassable.
270    ///
271    /// Note that this case has complicated interactions with `Replace`; see the
272    /// `maybenot` documentation.
273    BypassBlocking,
274    /// The padding may not bypass the block.
275    DoNotBypass,
276}
277
278/// Information about a queued cell that we need to feed back into the padding
279/// subsystem.
280#[derive(Clone, Copy, Debug)]
281pub(crate) struct QueuedCellPaddingInfo {
282    /// The hop that will receive this cell.
283    pub(crate) target_hop: HopNum,
284}
285memory_cost_structural_copy!(QueuedCellPaddingInfo);
286
287impl Bypass {
288    /// Construct a [`Bypass`] from a bool.
289    fn from_bool(replace: bool) -> Self {
290        match replace {
291            true => Bypass::BypassBlocking,
292            false => Bypass::DoNotBypass,
293        }
294    }
295}
296
297/// An indication that we should send a padding cell.
298///
299/// Don't drop this: instead, once the cell is queued,
300/// pass this `SendPadding` object to the relevant [`PaddingController`]
301/// to report that the particular piece of padding has been queued.
302#[derive(Clone, Debug, Copy)]
303pub(crate) struct SendPadding {
304    /// The machine within a framework that told us to send the padding.
305    ///
306    /// We store this so we can tell the framework which machine's padding we sent.
307    machine: maybenot::MachineId,
308
309    /// The hop to which we need to send the padding.
310    pub(crate) hop: HopNum,
311
312    /// Whether this padding can be replaced by regular data.
313    pub(crate) replace: Replace,
314
315    /// Whether this padding cell should bypass any current blocking.
316    pub(crate) bypass: Bypass,
317}
318
319impl SendPadding {
320    /// Convert this SendPadding into a TriggerEvent for Maybenot,
321    /// to indicate that the padding was sent.
322    fn into_sent_event(self) -> maybenot::TriggerEvent {
323        maybenot::TriggerEvent::PaddingSent {
324            machine: self.machine,
325        }
326    }
327
328    /// If true, we are allowed to replace this padding cell
329    /// with a normal non-padding cell.
330    ///
331    /// (If we do, we should call [`PaddingController::queued_data_as_padding`])
332    pub(crate) fn may_replace_with_data(&self) -> Replace {
333        self.replace
334    }
335
336    /// Return whether this padding cell is allowed to bypass any current blocking.
337    pub(crate) fn may_bypass_block(&self) -> Bypass {
338        self.bypass
339    }
340}
341
342/// An instruction to start blocking traffic
343/// or to change the rules for blocking traffic.
344#[derive(Clone, Copy, Debug)]
345pub(crate) struct StartBlocking {
346    /// If true, then padding traffic _to the blocking hop_
347    /// can bypass this block, if it has [`Bypass::BypassBlocking`].
348    ///
349    /// (All traffic can be sent to earlier hops as normal.
350    /// No traffic may be sent to later hops.)
351    pub(crate) is_bypassable: bool,
352}
353
354/// Absolute upper bound for number of hops.
355const MAX_HOPS: usize = 64;
356
357/// A handle to the padding state of a single circuit.
358///
359/// Used to tell the padders about events that they need to react to.
360#[derive(Clone, derive_more::Debug)]
361pub(crate) struct PaddingController<S = DynTimeProvider>
362where
363    S: SleepProvider,
364{
365    /// The underlying shared state.
366    #[debug(skip)]
367    shared: Arc<Mutex<PaddingShared<S>>>,
368}
369
370/// The shared state for a single circuit's padding.
371///
372/// Used by both PaddingController and PaddingEventStream.
373struct PaddingShared<S: SleepProvider> {
374    /// A sleep provider for telling the time and creating sleep futures.
375    runtime: S,
376    /// Per-hop state for each hop that we have enabled padding with.
377    ///
378    /// INVARIANT: the length of this vector is no greater than `MAX_HOPS`.
379    hops: SmallVec<[Option<Box<dyn PaddingBackend>>; HOPS]>,
380    /// Records about how much padding and normal traffic we have received from each hop,
381    /// and how much padding is allowed.
382    stats: SmallVec<[Option<PaddingStats>; HOPS]>,
383    /// Which hops are currently blocking, and whether that blocking is bypassable.
384    blocking: BlockingState,
385    /// When will the currently pending sleep future next expire?
386    ///
387    /// We keep track of this so that we know when we need to reset the sleep future.
388    /// It gets updated by `PaddingStream::schedule_next_wakeup`,
389    /// which we call in `<PaddingStream as Stream>::poll_next` immediately
390    /// before we create a timer.
391    next_scheduled_wakeup: Option<Instant>,
392
393    /// A deque of `PaddingEvent` that we want to yield from our [`PaddingEventStream`].
394    ///
395    /// NOTE: If you put new items in this list from anywhere other than inside
396    /// `PaddingEventStream::poll_next`, you need to alert the `waker`.
397    pending_events: PaddingEventQueue,
398
399    /// A waker to alert if we've added any events to padding_events,
400    /// or if we need the stream to re-poll.
401    //
402    // TODO circpad: This waker is redundant with the one stored in every backend's `Timer`.
403    // When we revisit this code we may want to consider combining them somehow.
404    waker: Waker,
405}
406
407/// The number of padding and non-padding cells we have received from each hop,
408/// and the rules for how many are allowed.
409#[derive(Clone, Debug)]
410struct PaddingStats {
411    /// The number of padding cells we've received from this hop.
412    n_padding: u64,
413    /// The number of non-padding cells we've received from this hop.
414    n_normal: u64,
415    /// The maximum allowable fraction of padding cells.
416    max_padding_frac: f32,
417    /// A lower limit, below which we will not enforce `max_padding_frac`.
418    //
419    // This is a NonZero for several reasons:
420    // - It doesn't make sense to enforce a ratio when no cells have been received.
421    // - If we only check when the total is at above zero, we can avoid a division-by-zero check.
422    // - Having an impossible value here ensures that the niche optimization
423    //   will work on PaddingStats.
424    enforce_max_after: NonZeroU16,
425}
426
427impl PaddingStats {
428    /// Return an error if this PaddingStats has exceeded its maximum.
429    fn validate(&self) -> Result<(), ExcessPadding> {
430        // Total number of cells.
431        // (It is impossible to get so many cells that this addition will overflow a u64.)
432        let total = self.n_padding + self.n_normal;
433
434        if total >= u16::from(self.enforce_max_after).into() {
435            // TODO: is there a way we can avoid a floating-point op here?
436            // Or can we limit the number of times that we need to check?
437            // (Tobias suggests randomization; I'm worried.)
438            //
439            // On the one hand, this may never appear on our profiles.
440            // But on the other hand, if it _does_ matter for performance,
441            // it is likely to be on some marginal platform with bad FP performance,
442            // where we are unlikely to be doing much testing.
443            //
444            // One promising possibility is to calculate a minimum amount of padding
445            // that we _know_ will be valid, given the current total,
446            // and then not check again until we at all until we reach that amount.
447            if self.n_padding as f32 > (total as f32 * self.max_padding_frac) {
448                return Err(ExcessPadding::PaddingExceedsLimit);
449            }
450        }
451        Ok(())
452    }
453}
454
455/// Current padding-related blocking status for a circuit.
456///
457/// We have to keep track of whether each hop is blocked or not,
458/// and whether its blocking is bypassable.
459/// But all we actually need to tell the reactor code
460/// is whether to block the _entire_ circuit or not.
461//
462// TODO circpad: It might beneficial
463// to block only the first blocking hop and its successors,
464// but that creates tricky starvation problems
465// in the case where we have queued traffic for a later, blocking, hop
466// that prevents us from flushing any messages to earlier hops.
467// We could solve this with tricky out-of-order designs,
468// but for now we're just treating "blocked" as a boolean.
469#[derive(Default)]
470struct BlockingState {
471    /// Whether each hop is currently blocked.
472    hop_blocked: BitArr![for MAX_HOPS],
473    /// Whether each hop's blocking is currently **not** bypassable.
474    blocking_non_bypassable: BitArr![for MAX_HOPS],
475}
476
477impl BlockingState {
478    /// Set the hop at position `idx` to blocked.
479    fn set_blocked(&mut self, idx: usize, is_bypassable: bool) {
480        self.hop_blocked.set(idx, true);
481        self.blocking_non_bypassable.set(idx, !is_bypassable);
482    }
483    /// Set the hop at position `idx` to unblocked.
484    fn set_unblocked(&mut self, idx: usize) {
485        self.hop_blocked.set(idx, false);
486        self.blocking_non_bypassable.set(idx, false);
487    }
488    /// Return a [`PaddingEvent`]
489    fn blocking_update_paddingevent(&self) -> PaddingEvent {
490        if self.blocking_non_bypassable.any() {
491            // At least one hop has non-bypassable blocking, so our blocking is non-bypassable.
492            PaddingEvent::StartBlocking(StartBlocking {
493                is_bypassable: false,
494            })
495        } else if self.hop_blocked.any() {
496            // At least one hop is blocking, but no hop has non-bypassable padding, so this padding
497            // is bypassable.
498            PaddingEvent::StartBlocking(StartBlocking {
499                is_bypassable: true,
500            })
501        } else {
502            // Nobody is blocking right now; it's time to unblock.
503            PaddingEvent::StopBlocking
504        }
505    }
506}
507
508#[allow(clippy::unnecessary_wraps)]
509impl<S: SleepProvider> PaddingController<S> {
510    /// Report that we've enqueued a non-padding cell for a given hop.
511    ///
512    /// Return a QueuedCellPaddingInfo if we need to alert the padding subsystem
513    /// when this cell is flushed.
514    pub(crate) fn queued_data(&self, hop: HopNum) -> Option<QueuedCellPaddingInfo> {
515        let mut shared = self.shared.lock().expect("Lock poisoned");
516        // Every hop up to and including the target hop will see this as normal data.
517        shared.trigger_events(hop, &[maybenot::TriggerEvent::NormalSent]);
518        shared.info_for_hop(hop)
519    }
520
521    /// Install the given [`CircuitPadder`] to start padding traffic to the listed `hop`.
522    ///
523    /// Stops padding if the provided padder is `None`.
524    ///
525    /// Replaces any previous [`CircuitPadder`].
526    pub(crate) fn install_padder_padding_at_hop(&self, hop: HopNum, padder: Option<CircuitPadder>) {
527        self.shared
528            .lock()
529            .expect("lock poisoned")
530            .set_hop_backend(hop, padder);
531    }
532
533    /// Report that we have enqueued a non-padding cell
534    /// in place of a replaceable padding cell
535    /// for a given hop.
536    ///
537    /// Return a QueuedCellPaddingInfo if we need to alert the padding subsystem
538    /// when this cell is flushed.
539    pub(crate) fn queued_data_as_padding(
540        &self,
541        hop: HopNum,
542        sendpadding: SendPadding,
543    ) -> Option<QueuedCellPaddingInfo> {
544        assert_eq!(hop, sendpadding.hop);
545        assert_eq!(Replace::Replaceable, sendpadding.replace);
546        let mut shared = self.shared.lock().expect("Lock poisoned");
547        shared.trigger_events_mixed(
548            hop,
549            // Each intermediate hop sees this as normal data.
550            &[maybenot::TriggerEvent::NormalSent],
551            // For the target hop, we treat this both as normal, _and_ as padding.
552            &[
553                maybenot::TriggerEvent::NormalSent,
554                sendpadding.into_sent_event(),
555            ],
556        );
557        shared.info_for_hop(hop)
558    }
559
560    /// Report that we have enqueued a padding cell to a given hop.
561    ///
562    /// Return a QueuedCellPaddingInfo if we need to alert the padding subsystem
563    /// when this cell is flushed.
564    pub(crate) fn queued_padding(
565        &self,
566        hop: HopNum,
567        sendpadding: SendPadding,
568    ) -> Option<QueuedCellPaddingInfo> {
569        assert_eq!(hop, sendpadding.hop);
570        let mut shared = self.shared.lock().expect("Lock poisoned");
571        shared.trigger_events_mixed(
572            hop,
573            // Each intermediate hop sees this as normal data.
574            &[maybenot::TriggerEvent::NormalSent],
575            // The target hop sees this as padding.
576            &[sendpadding.into_sent_event()],
577        );
578        shared.info_for_hop(hop)
579    }
580
581    /// Report that we are using an already-queued cell
582    /// as a substitute for sending padding to a given hop.
583    pub(crate) fn replaceable_padding_already_queued(&self, hop: HopNum, sendpadding: SendPadding) {
584        assert_eq!(hop, sendpadding.hop);
585        let mut shared = self.shared.lock().expect("Lock poisoned");
586        shared.trigger_events_mixed(
587            hop,
588            // No additional data will be seen for any intermediate hops.
589            &[],
590            // The target hop's machine sees this as padding.
591            &[sendpadding.into_sent_event()],
592        );
593    }
594
595    /// Report that we've flushed a cell from the queue for the given hop.
596    pub(crate) fn flushed_relay_cell(&self, info: QueuedCellPaddingInfo) {
597        // Every hop up to the last
598        let mut shared = self.shared.lock().expect("Lock poisoned");
599        shared.trigger_events(info.target_hop, &[maybenot::TriggerEvent::TunnelSent]);
600    }
601
602    /// Report that we've flushed a cell from the per-channel queue.
603    pub(crate) fn flushed_channel_cell(&self) {
604        let mut shared = self.shared.lock().expect("Lock poisoned");
605        shared.trigger_events(HopNum::from(0), &[maybenot::TriggerEvent::TunnelSent]);
606    }
607
608    /// Report that we have decrypted a non-padding cell from our queue
609    /// from a given hop.
610    ///
611    // Note that in theory, it would be better to trigger TunnelRecv as soon as
612    // possible after we receive and enqueue the data cell, and NormalRecv only
613    // once we've decrypted it and found it to be data.  But we can't do that,
614    // since we won't know which hop actually originated the cell until we
615    // decrypt it.
616    pub(crate) fn decrypted_data(&self, hop: HopNum) {
617        let mut shared = self.shared.lock().expect("Lock poisoned");
618        shared.inc_normal_received(hop);
619        shared.trigger_events(
620            hop,
621            // We treat this as normal data from every hop.
622            &[
623                maybenot::TriggerEvent::TunnelRecv,
624                maybenot::TriggerEvent::NormalRecv,
625            ],
626        );
627    }
628    /// Report that we have decrypted a padding cell from our queue.
629    ///
630    /// Return an error if this padding cell is not acceptable
631    /// (because we have received too much padding from this hop,
632    /// or because we have not enabled padding with this hop.)
633    //
634    // See note above.
635    pub(crate) fn decrypted_padding(&self, hop: HopNum) -> Result<(), crate::Error> {
636        let mut shared = self.shared.lock().expect("Lock poisoned");
637        shared
638            .inc_padding_received(hop)
639            .map_err(|e| crate::Error::ExcessPadding(e, hop))?;
640        shared.trigger_events_mixed(
641            hop,
642            // We treat this as normal data from the intermediate hops.
643            &[
644                maybenot::TriggerEvent::TunnelRecv,
645                maybenot::TriggerEvent::NormalRecv,
646            ],
647            // But from the target hop, it counts as padding.
648            &[
649                maybenot::TriggerEvent::TunnelRecv,
650                maybenot::TriggerEvent::PaddingRecv,
651            ],
652        );
653        Ok(())
654    }
655}
656
657impl<S: SleepProvider> PaddingShared<S> {
658    /// Trigger a list of maybenot events on every hop up to and including `hop`.
659    fn trigger_events(&mut self, hop: HopNum, events: &[maybenot::TriggerEvent]) {
660        let final_idx = usize::from(hop);
661        let now = self.runtime.now();
662        let next_scheduled_wakeup = self.next_scheduled_wakeup;
663        for hop_controller in self.hops.iter_mut().take(final_idx + 1) {
664            let Some(hop_controller) = hop_controller else {
665                continue;
666            };
667            hop_controller.report_events_at(events, now, next_scheduled_wakeup);
668        }
669    }
670
671    /// Trigger `intermediate_hop_events` on every hop up to but _not_ including `hop`.
672    ///
673    /// Trigger `final_hop_events` on `hop`.
674    ///
675    /// (Don't trigger anything on any hops _after_ `hop`.)
676    fn trigger_events_mixed(
677        &mut self,
678        hop: HopNum,
679        intermediate_hop_events: &[maybenot::TriggerEvent],
680        final_hop_events: &[maybenot::TriggerEvent],
681    ) {
682        use itertools::Itertools as _;
683        use itertools::Position as P;
684        let final_idx = usize::from(hop);
685        let now = self.runtime.now();
686        let next_scheduled_wakeup = self.next_scheduled_wakeup;
687        for (position, hop_controller) in self.hops.iter_mut().take(final_idx + 1).with_position() {
688            let Some(hop_controller) = hop_controller else {
689                continue;
690            };
691            let events = match position {
692                P::First | P::Middle => intermediate_hop_events,
693                P::Last | P::Only => final_hop_events,
694            };
695            hop_controller.report_events_at(events, now, next_scheduled_wakeup);
696        }
697    }
698
699    /// Increment the normal cell count from every hop up to and including `hop`.
700    fn inc_normal_received(&mut self, hop: HopNum) {
701        let final_idx = usize::from(hop);
702        for stats in self.stats.iter_mut().take(final_idx + 1).flatten() {
703            stats.n_normal += 1;
704        }
705    }
706
707    /// Increment the padding count from `hop`, and the normal cell count from all earlier hops.
708    ///
709    /// Return an error if a padding cell from `hop` would not be acceptable.
710    fn inc_padding_received(&mut self, hop: HopNum) -> Result<(), ExcessPadding> {
711        use itertools::Itertools as _;
712        use itertools::Position as P;
713        let final_idx = usize::from(hop);
714        for (position, stats) in self.stats.iter_mut().take(final_idx + 1).with_position() {
715            match (position, stats) {
716                (P::First | P::Middle, Some(stats)) => stats.n_normal += 1,
717                (P::First | P::Middle, None) => {}
718                (P::Last | P::Only, Some(stats)) => {
719                    stats.n_padding += 1;
720                    stats.validate()?;
721                }
722                (P::Last | P::Only, None) => {
723                    return Err(ExcessPadding::NoPaddingNegotiated);
724                }
725            }
726        }
727        Ok(())
728    }
729
730    /// Return the `QueuedCellPaddingInfo` to use when sending messages to `target_hop`
731    #[allow(clippy::unnecessary_wraps)]
732    fn info_for_hop(&self, target_hop: HopNum) -> Option<QueuedCellPaddingInfo> {
733        // TODO circpad optimization: This is always Some for now, but we
734        // could someday avoid creating this object
735        // when padding is not enabled on the circuit,
736        // or if padding is not enabled on any hop of the circuit <= target_hop.
737        Some(QueuedCellPaddingInfo { target_hop })
738    }
739}
740
741impl<S: SleepProvider> PaddingShared<S> {
742    /// Install or remove a [`CircuitPadder`] for a single hop.
743    fn set_hop_backend(&mut self, hop: HopNum, backend: Option<CircuitPadder>) {
744        let hop_idx: usize = hop.into();
745        assert!(hop_idx < MAX_HOPS);
746        let n_needed = hop_idx + 1;
747        // Make sure there are enough spaces in self.hops.
748        // We can't use "resize" or "extend", since Box<dyn<PaddingBackend>>
749        // doesn't implement Clone, which SmallVec requires.
750        while self.hops.len() < n_needed {
751            self.hops.push(None);
752        }
753        while self.stats.len() < n_needed {
754            self.stats.push(None);
755        }
756        // project through option...
757        let (hop_backend, stats) = if let Some(padder) = backend {
758            (Some(padder.backend), Some(padder.initial_stats))
759        } else {
760            (None, None)
761        };
762        self.hops[hop_idx] = hop_backend;
763        self.stats[hop_idx] = stats;
764
765        let was_blocked = self.blocking.hop_blocked[hop_idx];
766        self.blocking.set_unblocked(hop_idx);
767        if was_blocked {
768            self.pending_events
769                .push_back(self.blocking.blocking_update_paddingevent());
770        }
771
772        // We need to alert the stream, in case we added an event above, and so that it will poll
773        // the new padder at least once.
774        self.waker.wake_by_ref();
775    }
776
777    /// Transform a [`PerHopPaddingEvent`] for a single hop with index `idx` into a [`PaddingEvent`],
778    /// updating our state as appropriate.
779    fn process_per_hop_event(
780        blocking: &mut BlockingState,
781        hop_idx: usize,
782        event: PerHopPaddingEvent,
783    ) -> PaddingEvent {
784        use PaddingEvent as PE;
785        use PerHopPaddingEvent as PHPE;
786
787        match event {
788            PHPE::SendPadding {
789                machine,
790                replace,
791                bypass,
792            } => PE::SendPadding(SendPadding {
793                machine,
794                hop: hopnum_from_hop_idx(hop_idx),
795                replace,
796                bypass,
797            }),
798            PHPE::StartBlocking { is_bypassable } => {
799                // NOTE that we remember is_bypassable for every hop, but the blocking is only
800                // bypassable if _every_ hop is unblocked, or has bypassable blocking.
801                blocking.set_blocked(hop_idx, is_bypassable);
802                blocking.blocking_update_paddingevent()
803            }
804            PHPE::StopBlocking => {
805                blocking.set_unblocked(hop_idx);
806                blocking.blocking_update_paddingevent()
807            }
808        }
809    }
810
811    /// Extract every PaddingEvent that is ready to be reported to the circuit at time `now`.
812    ///
813    /// May trigger other events, or wake up the stream, in the course of running.
814    fn take_padding_events_at(&mut self, now: Instant) -> PaddingEventQueue {
815        let mut output = PaddingEventQueue::default();
816        for (hop_idx, backend) in self.hops.iter_mut().enumerate() {
817            let Some(backend) = backend else {
818                continue;
819            };
820
821            let hop_events = backend.take_padding_events_at(now, self.next_scheduled_wakeup);
822
823            output.extend(
824                hop_events
825                    .into_iter()
826                    .map(|ev| Self::process_per_hop_event(&mut self.blocking, hop_idx, ev)),
827            );
828        }
829        output
830    }
831
832    /// Find the next time at which we should wake up the stream, and register it as our
833    /// "next scheduled wakeup".
834    fn schedule_next_wakeup(&mut self, waker: &Waker) -> Option<Instant> {
835        // Find the earliest time at which any hop has a scheduled event.
836        let next_expiration = self
837            .hops
838            .iter_mut()
839            .flatten()
840            .filter_map(|hop| hop.next_wakeup(waker))
841            .min();
842        self.next_scheduled_wakeup = next_expiration;
843        self.waker = waker.clone();
844        next_expiration
845    }
846}
847
848/// A stream of [`PaddingEvent`] to tell a circuit when (if at all) it should send
849/// padding and block traffic.
850//
851// TODO circpad: Optimize this even more for the no-padding case?
852// We could make it smaller or faster.
853pub(crate) struct PaddingEventStream<S = DynTimeProvider>
854where
855    S: SleepProvider,
856{
857    /// An underlying list of PaddingBackend.
858    shared: Arc<Mutex<PaddingShared<S>>>,
859
860    /// A future defining a time at which we must next call `padder.padding_events_at`.
861    ///
862    /// (We also arrange for the backend to wake us up if we need to change this time,
863    /// or call `padder.padding_events_at`.)
864    ///
865    /// Note that this timer is allowed to be _earlier_ than our true wakeup time,
866    /// but not later.
867    sleep_future: S::SleepFuture,
868}
869
870impl futures::Stream for PaddingEventStream {
871    type Item = PaddingEvent;
872
873    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
874        loop {
875            let (now, next_wakeup, runtime) = {
876                // We destructure like this to avoid simultaneous mutable/immutable borrows.
877                let Self { shared, .. } = &mut *self;
878
879                let mut shared = shared.lock().expect("Poisoned lock");
880
881                // Do we have any events that are waiting to be yielded?
882                if let Some(val) = shared.pending_events.pop_front() {
883                    return Poll::Ready(Some(val));
884                }
885
886                // Does the padder have any events that have become ready to be yielded?
887                let now = shared.runtime.now();
888                shared.pending_events = shared.take_padding_events_at(now);
889
890                if let Some(val) = shared.pending_events.pop_front() {
891                    return Poll::Ready(Some(val));
892                }
893
894                // If we reach this point, there are no events to trigger right now.
895                //
896                // We'll ask all the padders for the time at which they next might need to take
897                // action, and register our Waker with them, to be alerted if we need to take any action
898                // before that.
899                (
900                    now,
901                    shared.schedule_next_wakeup(cx.waker()),
902                    shared.runtime.clone(),
903                )
904                // Here we drop the lock on the shared state.
905            };
906
907            match next_wakeup {
908                None => {
909                    return Poll::Pending;
910                }
911                Some(t) => {
912                    // TODO circpad: Avoid rebuilding sleep future needlessly.  May require new APIs in
913                    // tor-rtcompat.
914                    self.sleep_future = runtime.sleep(t.saturating_duration_since(now));
915                    match self.sleep_future.as_mut().poll(cx) {
916                        Poll::Ready(()) => {
917                            // Okay, The timer expired already. Continue through the loop.
918                            continue;
919                        }
920                        Poll::Pending => return Poll::Pending,
921                    }
922                }
923            }
924        }
925    }
926}
927
928impl futures::stream::FusedStream for PaddingEventStream {
929    fn is_terminated(&self) -> bool {
930        // This stream is _never_ terminated: even if it has no padding machines now,
931        // we might add some in the future.
932        false
933    }
934}
935
936/// Construct a HopNum from an index into the `hops` field of a [`PaddingShared`].
937///
938/// # Panics
939///
940/// Panics if `hop_idx` is greater than u8::MAX, which should be impossible.
941fn hopnum_from_hop_idx(hop_idx: usize) -> HopNum {
942    // (Static assertion: makes sure we can represent every index of hops as a HopNum.)
943    const _: () = assert!(MAX_HOPS < u8::MAX as usize);
944    HopNum::from(u8::try_from(hop_idx).expect("hop_idx out of range!"))
945}
946
947/// Create a new, empty padding instance for a new circuit.
948pub(crate) fn new_padding<S>(runtime: S) -> (PaddingController<S>, PaddingEventStream<S>)
949where
950    S: SleepProvider,
951{
952    // Start with an arbitrary sleep future.  We won't actually use this until
953    // the first time that we have an event to schedule, so the timeout doesn't matter.
954    let sleep_future = runtime.sleep(Duration::new(86400, 0));
955
956    let shared = PaddingShared {
957        runtime,
958        hops: Default::default(),
959        stats: Default::default(),
960        blocking: Default::default(),
961        next_scheduled_wakeup: None,
962        pending_events: PaddingEventQueue::default(),
963        waker: Waker::noop().clone(),
964    };
965    let shared = Arc::new(Mutex::new(shared));
966    let controller = PaddingController {
967        shared: shared.clone(),
968    };
969    let stream = PaddingEventStream {
970        shared,
971        sleep_future,
972    };
973
974    (controller, stream)
975}