tor_proto/client/circuit/padding/maybenot_padding.rs
1//! A `maybenot`-specific backend for padding.
2
3// Some of the circuit padding implementation isn't reachable unless
4// the extra-experimental circ-padding-manual feature is also present.
5//
6// TODO circpad: Remove this once we have circ-padding negotiation implemented.
7#![cfg_attr(
8 all(feature = "circ-padding", not(feature = "circ-padding-manual")),
9 expect(dead_code)
10)]
11
12mod backend;
13
14use std::collections::VecDeque;
15use std::num::NonZeroU16;
16use std::pin::Pin;
17use std::sync::{Arc, Mutex};
18use std::task::{Context, Poll, Waker};
19
20use bitvec::BitArr;
21use maybenot::MachineId;
22use smallvec::SmallVec;
23use tor_memquota::memory_cost_structural_copy;
24use tor_rtcompat::{DynTimeProvider, SleepProvider};
25
26use crate::HopNum;
27use crate::circuit::HOPS;
28use crate::util::err::ExcessPadding;
29use backend::PaddingBackend;
30
/// The type of Instant that we'll use for our padding machines.
///
/// We use a separate type alias here in case we want to move to coarsetime.
type Instant = web_time_compat::Instant;

/// The type of Duration that we'll use for our padding machines.
///
/// We use a separate type alias here in case we want to move to coarsetime.
type Duration = std::time::Duration;

/// A type we use to hold a queue of [`PaddingEvent`].
///
/// This is a separate type so we can tune it and make it into a smallvec if needed.
type PaddingEventQueue = VecDeque<PaddingEvent>;

/// A type we use to hold a set of [`PerHopPaddingEvent`].
///
/// This is a separate type so we can tune it and make it into a smallvec if needed.
type PerHopPaddingEventVec = Vec<PerHopPaddingEvent>;
50
/// Specifications for a set of maybenot padding machines as used in Arti: used to construct a `maybenot::Framework`.
///
/// The builder generated for this type is named `CircuitPadderConfig`;
/// its `build` function is private and runs `CircuitPadderConfig::validate` first.
#[derive(Clone, Debug, derive_builder::Builder)]
#[builder(build_fn(
    validate = "Self::validate",
    private,
    error = "CircuitPadderConfigError"
))]
#[builder(name = "CircuitPadderConfig")]
#[cfg_attr(not(feature = "circ-padding-manual"), builder(private))]
#[cfg_attr(feature = "circ-padding-manual", builder(public))]
pub(crate) struct PaddingRules {
    /// List of padding machines to use for shaping traffic.
    ///
    /// Note that this list may be empty, if we only want to receive padding,
    /// and never send it.
    machines: Arc<[maybenot::Machine]>,
    /// Maximum allowable outbound blocking fraction.
    ///
    /// Passed directly to maybenot; not enforced in Arti.
    /// See [`maybenot::Framework::new`] for details.
    ///
    /// Must be between 0.0 and 1.0.
    #[builder(default = "1.0")]
    max_outbound_blocking_frac: f64,
    /// Maximum allowable outbound padding fraction.
    ///
    /// Passed directly to maybenot; not enforced in Arti.
    /// See [`maybenot::Framework::new`] for details.
    ///
    /// Must be between 0.0 and 1.0.
    #[builder(default = "1.0")]
    max_outbound_padding_frac: f64,
    /// Maximum allowable fraction of inbound padding.
    ///
    /// This becomes the `max_padding_frac` of the hop's initial `PaddingStats`.
    ///
    /// Must be between 0.0 and 1.0.
    #[builder(default = "1.0")]
    max_inbound_padding_frac: f64,
    /// Number of cells before which we should not enforce max_inbound_padding_frac.
    ///
    /// A value of 0 is treated as 1 when the initial `PaddingStats` is built.
    #[builder(default = "1")]
    enforce_inbound_padding_after_cells: u16,
}
90
/// An error returned from validating a [`CircuitPadderConfig`],
/// or from building a [`CircuitPadder`] out of one.
#[derive(Clone, Debug, thiserror::Error)]
#[cfg_attr(feature = "circ-padding-manual", visibility::make(pub))]
#[non_exhaustive]
pub(crate) enum CircuitPadderConfigError {
    /// A field needed to be given, but wasn't.
    ///
    /// The payload is the name of the missing field.
    #[error("No value was given for {0}")]
    UninitializedField(&'static str),
    /// A field needed to be a proper fraction, but wasn't.
    ///
    /// The payload is the name of the offending field.
    #[error("Value was out of range for {0}. (Must be between 0 and 1)")]
    FractionOutOfRange(&'static str),
    /// Maybenot gave us an error when initializing the framework.
    #[error("Maybenot could not initialize framework for rules")]
    MaybenotError(#[from] maybenot::Error),
}
106
107impl From<derive_builder::UninitializedFieldError> for CircuitPadderConfigError {
108 fn from(value: derive_builder::UninitializedFieldError) -> Self {
109 Self::UninitializedField(value.field_name())
110 }
111}
112
113impl CircuitPadderConfig {
114 /// Helper: Return an error if this is not a valid Builder.
115 fn validate(&self) -> Result<(), CircuitPadderConfigError> {
116 macro_rules! enforce_frac {
117 { $field:ident } =>
118 {
119 if self.$field.is_some_and(|v| ! (0.0 .. 1.0).contains(&v)) {
120 return Err(CircuitPadderConfigError::FractionOutOfRange(stringify!($field)));
121 }
122 }
123 }
124 enforce_frac!(max_outbound_blocking_frac);
125 enforce_frac!(max_outbound_padding_frac);
126 enforce_frac!(max_inbound_padding_frac);
127
128 Ok(())
129 }
130
131 /// Construct a [`CircuitPadder`] based on this [`CircuitPadderConfig`].
132 ///
133 /// A [`CircuitPadderConfig`] is created its accessors, and used with this method to build a [`CircuitPadder`].
134 ///
135 /// That [`CircuitPadder`] can then be installed on a circuit using [`ClientCirc::start_padding_at_hop`](crate::client::circuit::ClientCirc::start_padding_at_hop).
136 #[cfg_attr(feature = "circ-padding-manual", visibility::make(pub))]
137 pub(crate) fn create_padder(
138 &self,
139 now: Instant,
140 ) -> Result<CircuitPadder, CircuitPadderConfigError> {
141 let rules = self.build()?;
142 let backend = rules.create_padding_backend(now)?;
143 let initial_stats = rules.initialize_stats();
144 Ok(CircuitPadder {
145 initial_stats,
146 backend,
147 })
148 }
149}
150
151impl PaddingRules {
152 /// Create a [`PaddingBackend`] for this [`PaddingRules`], so we can install it in a
153 /// [`PaddingShared`].
154 fn create_padding_backend(
155 &self,
156 now: Instant,
157 ) -> Result<Box<dyn PaddingBackend>, maybenot::Error> {
158 // TODO circpad: specialize this for particular values of n_machines,
159 // when we finally go to implement padding.
160 const OPTIMIZE_FOR_N_MACHINES: usize = 4;
161
162 let backend =
163 backend::MaybenotPadder::<OPTIMIZE_FOR_N_MACHINES>::from_framework_rules(self, now)?;
164 Ok(Box::new(backend))
165 }
166
167 /// Create a new `PaddingStats` to reflect the rules for inbound padding of this PaddingRules
168 fn initialize_stats(&self) -> PaddingStats {
169 PaddingStats {
170 n_padding: 0,
171 n_normal: 0,
172 max_padding_frac: self.max_inbound_padding_frac as f32,
173 // We just convert 0 to 1, since that's necessarily what was meant.
174 enforce_max_after: self
175 .enforce_inbound_padding_after_cells
176 .try_into()
177 .unwrap_or(1.try_into().expect("1 was not nonzero!?")),
178 }
179 }
180}
181
/// An opaque handle to a padding implementation for a single hop.
///
/// This type is constructed with [`CircuitPadderConfig::create_padder`].
#[derive(derive_more::Debug)]
#[cfg_attr(feature = "circ-padding-manual", visibility::make(pub))]
pub(crate) struct CircuitPadder {
    /// The initial padding stats and restrictions for inbound padding.
    initial_stats: PaddingStats,
    /// The underlying backend to use.
    ///
    /// (Omitted from the `Debug` output.)
    #[debug(skip)]
    backend: Box<dyn PaddingBackend>,
}
194
/// An instruction from the padding machine to the circuit.
///
/// These are returned from the [`PaddingEventStream`].
///
/// When the `circ-padding` feature is disabled, these won't actually be constructed.
#[derive(Clone, Copy, Debug)]
pub(crate) enum PaddingEvent {
    /// An instruction to send padding.
    SendPadding(SendPadding),
    /// An instruction to start blocking outbound traffic,
    /// or to change the rules under which traffic is blocked.
    StartBlocking(StartBlocking),
    /// An instruction to stop all blocking.
    StopBlocking,
}
210
/// An instruction from a single padding hop.
///
/// This will be turned into a [`PaddingEvent`] before it's given
/// to the circuit reactor.
#[derive(Clone, Copy, Debug)]
enum PerHopPaddingEvent {
    /// An instruction to send padding.
    SendPadding {
        /// The machine that told us to send the padding.
        ///
        /// (We need to use this when we report that we sent the padding.)
        machine: MachineId,
        /// Whether the padding can be replaced with regular data.
        replace: Replace,
        /// Whether the padding can bypass a bypassable block.
        bypass: Bypass,
    },
    /// An instruction to start blocking traffic.
    StartBlocking {
        /// Whether this blocking instance may be bypassed by padding with
        /// [`Bypass::BypassBlocking`].
        ///
        /// (Note that this is _not_ a `Bypass`, since that enum notes whether
        /// or not a _padding_ cell can bypass blocking)
        is_bypassable: bool,
    },
    /// An instruction to stop blocking.
    StopBlocking,
}
240
/// Whether a given piece of padding may be swapped out for queued data.
///
/// This is an enum, rather than a bool, so that it can't be mixed up with `Bypass`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum Replace {
    /// The padding can be replaced
    /// either by packaging data in a regular data cell,
    /// or with data currently queued but not yet sent.
    Replaceable,
    /// The padding must be queued as padding; data can't stand in for it.
    NotReplaceable,
}

impl Replace {
    /// Build a [`Replace`] from a bool: `true` means [`Replace::Replaceable`].
    fn from_bool(replace: bool) -> Self {
        if replace {
            Self::Replaceable
        } else {
            Self::NotReplaceable
        }
    }
}
263
/// Whether a piece of padding can bypass a bypassable case of blocking.
///
/// This is an enum to avoid confusing it with `Replace`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum Bypass {
    /// This padding may bypass the block, if the block is bypassable.
    ///
    /// Note that this case has complicated interactions with `Replace`; see the
    /// `maybenot` documentation.
    BypassBlocking,
    /// The padding may not bypass the block.
    DoNotBypass,
}
277
/// Information about a queued cell that we need to feed back into the padding
/// subsystem.
///
/// The `queued_*` methods of [`PaddingController`] return this;
/// it is consumed by [`PaddingController::flushed_relay_cell`].
#[derive(Clone, Copy, Debug)]
pub(crate) struct QueuedCellPaddingInfo {
    /// The hop that will receive this cell.
    pub(crate) target_hop: HopNum,
}
memory_cost_structural_copy!(QueuedCellPaddingInfo);
286
287impl Bypass {
288 /// Construct a [`Bypass`] from a bool.
289 fn from_bool(replace: bool) -> Self {
290 match replace {
291 true => Bypass::BypassBlocking,
292 false => Bypass::DoNotBypass,
293 }
294 }
295}
296
/// An indication that we should send a padding cell.
///
/// Don't drop this: instead, once the cell is queued,
/// pass this `SendPadding` object to the relevant [`PaddingController`]
/// method (for example [`PaddingController::queued_padding`])
/// to report that the particular piece of padding has been queued.
#[derive(Clone, Debug, Copy)]
pub(crate) struct SendPadding {
    /// The machine within a framework that told us to send the padding.
    ///
    /// We store this so we can tell the framework which machine's padding we sent.
    machine: maybenot::MachineId,

    /// The hop to which we need to send the padding.
    pub(crate) hop: HopNum,

    /// Whether this padding can be replaced by regular data.
    pub(crate) replace: Replace,

    /// Whether this padding cell should bypass any current blocking.
    pub(crate) bypass: Bypass,
}
318
319impl SendPadding {
320 /// Convert this SendPadding into a TriggerEvent for Maybenot,
321 /// to indicate that the padding was sent.
322 fn into_sent_event(self) -> maybenot::TriggerEvent {
323 maybenot::TriggerEvent::PaddingSent {
324 machine: self.machine,
325 }
326 }
327
328 /// If true, we are allowed to replace this padding cell
329 /// with a normal non-padding cell.
330 ///
331 /// (If we do, we should call [`PaddingController::queued_data_as_padding`])
332 pub(crate) fn may_replace_with_data(&self) -> Replace {
333 self.replace
334 }
335
336 /// Return whether this padding cell is allowed to bypass any current blocking.
337 pub(crate) fn may_bypass_block(&self) -> Bypass {
338 self.bypass
339 }
340}
341
/// An instruction to start blocking traffic
/// or to change the rules for blocking traffic.
///
/// Carried by [`PaddingEvent::StartBlocking`].
#[derive(Clone, Copy, Debug)]
pub(crate) struct StartBlocking {
    /// If true, then padding traffic _to the blocking hop_
    /// can bypass this block, if it has [`Bypass::BypassBlocking`].
    ///
    /// (All traffic can be sent to earlier hops as normal.
    /// No traffic may be sent to later hops.)
    pub(crate) is_bypassable: bool,
}
353
/// Absolute upper bound for number of hops.
///
/// This sizes the per-hop bit arrays in `BlockingState`,
/// and bounds the hop indices accepted by `PaddingShared::set_hop_backend`.
/// It must stay below `u8::MAX` (checked statically in `hopnum_from_hop_idx`).
const MAX_HOPS: usize = 64;
356
/// A handle to the padding state of a single circuit.
///
/// Used to tell the padders about events that they need to react to.
///
/// Cloning this handle yields another handle to the same shared state.
#[derive(Clone, derive_more::Debug)]
pub(crate) struct PaddingController<S = DynTimeProvider>
where
    S: SleepProvider,
{
    /// The underlying shared state.
    ///
    /// (Omitted from the `Debug` output.)
    #[debug(skip)]
    shared: Arc<Mutex<PaddingShared<S>>>,
}
369
/// The shared state for a single circuit's padding.
///
/// Used by both PaddingController and PaddingEventStream.
struct PaddingShared<S: SleepProvider> {
    /// A sleep provider for telling the time and creating sleep futures.
    runtime: S,
    /// Per-hop state for each hop that we have enabled padding with.
    ///
    /// Indexed by hop number; an entry is `None` if no padder is installed for that hop.
    ///
    /// INVARIANT: the length of this vector is no greater than `MAX_HOPS`.
    hops: SmallVec<[Option<Box<dyn PaddingBackend>>; HOPS]>,
    /// Records about how much padding and normal traffic we have received from each hop,
    /// and how much padding is allowed.
    ///
    /// Indexed by hop number, like `hops`.
    stats: SmallVec<[Option<PaddingStats>; HOPS]>,
    /// Which hops are currently blocking, and whether that blocking is bypassable.
    blocking: BlockingState,
    /// When will the currently pending sleep future next expire?
    ///
    /// We keep track of this so that we know when we need to reset the sleep future.
    /// It gets updated by `PaddingShared::schedule_next_wakeup`,
    /// which we call in `<PaddingEventStream as Stream>::poll_next` immediately
    /// before we create a timer.
    next_scheduled_wakeup: Option<Instant>,

    /// A deque of `PaddingEvent` that we want to yield from our [`PaddingEventStream`].
    ///
    /// NOTE: If you put new items in this list from anywhere other than inside
    /// `PaddingEventStream::poll_next`, you need to alert the `waker`.
    pending_events: PaddingEventQueue,

    /// A waker to alert if we've added any events to `pending_events`,
    /// or if we need the stream to re-poll.
    //
    // TODO circpad: This waker is redundant with the one stored in every backend's `Timer`.
    // When we revisit this code we may want to consider combining them somehow.
    waker: Waker,
}
406
/// The number of padding and non-padding cells we have received from each hop,
/// and the rules for how many are allowed.
#[derive(Clone, Debug)]
struct PaddingStats {
    /// The number of padding cells we've received from this hop.
    n_padding: u64,
    /// The number of non-padding cells we've received from this hop.
    n_normal: u64,
    /// The maximum allowable fraction of padding cells.
    max_padding_frac: f32,
    /// A lower limit on the total number of cells received,
    /// below which we will not enforce `max_padding_frac`.
    //
    // This is a NonZero for several reasons:
    //  - It doesn't make sense to enforce a ratio when no cells have been received.
    //  - If we only check when the total is above zero, we can avoid a division-by-zero check.
    //  - Having an impossible value here ensures that the niche optimization
    //    will work on PaddingStats.
    enforce_max_after: NonZeroU16,
}
426
427impl PaddingStats {
428 /// Return an error if this PaddingStats has exceeded its maximum.
429 fn validate(&self) -> Result<(), ExcessPadding> {
430 // Total number of cells.
431 // (It is impossible to get so many cells that this addition will overflow a u64.)
432 let total = self.n_padding + self.n_normal;
433
434 if total >= u16::from(self.enforce_max_after).into() {
435 // TODO: is there a way we can avoid a floating-point op here?
436 // Or can we limit the number of times that we need to check?
437 // (Tobias suggests randomization; I'm worried.)
438 //
439 // On the one hand, this may never appear on our profiles.
440 // But on the other hand, if it _does_ matter for performance,
441 // it is likely to be on some marginal platform with bad FP performance,
442 // where we are unlikely to be doing much testing.
443 //
444 // One promising possibility is to calculate a minimum amount of padding
445 // that we _know_ will be valid, given the current total,
446 // and then not check again until we at all until we reach that amount.
447 if self.n_padding as f32 > (total as f32 * self.max_padding_frac) {
448 return Err(ExcessPadding::PaddingExceedsLimit);
449 }
450 }
451 Ok(())
452 }
453}
454
/// Current padding-related blocking status for a circuit.
///
/// We have to keep track of whether each hop is blocked or not,
/// and whether its blocking is bypassable.
/// But all we actually need to tell the reactor code
/// is whether to block the _entire_ circuit or not.
//
// TODO circpad: It might be beneficial
// to block only the first blocking hop and its successors,
// but that creates tricky starvation problems
// in the case where we have queued traffic for a later, blocking, hop
// that prevents us from flushing any messages to earlier hops.
// We could solve this with tricky out-of-order designs,
// but for now we're just treating "blocked" as a boolean.
#[derive(Default)]
struct BlockingState {
    /// Whether each hop is currently blocked.
    ///
    /// Indexed by hop position.
    hop_blocked: BitArr![for MAX_HOPS],
    /// Whether each hop's blocking is currently **not** bypassable.
    ///
    /// Indexed by hop position.
    blocking_non_bypassable: BitArr![for MAX_HOPS],
}
476
477impl BlockingState {
478 /// Set the hop at position `idx` to blocked.
479 fn set_blocked(&mut self, idx: usize, is_bypassable: bool) {
480 self.hop_blocked.set(idx, true);
481 self.blocking_non_bypassable.set(idx, !is_bypassable);
482 }
483 /// Set the hop at position `idx` to unblocked.
484 fn set_unblocked(&mut self, idx: usize) {
485 self.hop_blocked.set(idx, false);
486 self.blocking_non_bypassable.set(idx, false);
487 }
488 /// Return a [`PaddingEvent`]
489 fn blocking_update_paddingevent(&self) -> PaddingEvent {
490 if self.blocking_non_bypassable.any() {
491 // At least one hop has non-bypassable blocking, so our blocking is non-bypassable.
492 PaddingEvent::StartBlocking(StartBlocking {
493 is_bypassable: false,
494 })
495 } else if self.hop_blocked.any() {
496 // At least one hop is blocking, but no hop has non-bypassable padding, so this padding
497 // is bypassable.
498 PaddingEvent::StartBlocking(StartBlocking {
499 is_bypassable: true,
500 })
501 } else {
502 // Nobody is blocking right now; it's time to unblock.
503 PaddingEvent::StopBlocking
504 }
505 }
506}
507
508#[allow(clippy::unnecessary_wraps)]
509impl<S: SleepProvider> PaddingController<S> {
510 /// Report that we've enqueued a non-padding cell for a given hop.
511 ///
512 /// Return a QueuedCellPaddingInfo if we need to alert the padding subsystem
513 /// when this cell is flushed.
514 pub(crate) fn queued_data(&self, hop: HopNum) -> Option<QueuedCellPaddingInfo> {
515 let mut shared = self.shared.lock().expect("Lock poisoned");
516 // Every hop up to and including the target hop will see this as normal data.
517 shared.trigger_events(hop, &[maybenot::TriggerEvent::NormalSent]);
518 shared.info_for_hop(hop)
519 }
520
521 /// Install the given [`CircuitPadder`] to start padding traffic to the listed `hop`.
522 ///
523 /// Stops padding if the provided padder is `None`.
524 ///
525 /// Replaces any previous [`CircuitPadder`].
526 pub(crate) fn install_padder_padding_at_hop(&self, hop: HopNum, padder: Option<CircuitPadder>) {
527 self.shared
528 .lock()
529 .expect("lock poisoned")
530 .set_hop_backend(hop, padder);
531 }
532
533 /// Report that we have enqueued a non-padding cell
534 /// in place of a replaceable padding cell
535 /// for a given hop.
536 ///
537 /// Return a QueuedCellPaddingInfo if we need to alert the padding subsystem
538 /// when this cell is flushed.
539 pub(crate) fn queued_data_as_padding(
540 &self,
541 hop: HopNum,
542 sendpadding: SendPadding,
543 ) -> Option<QueuedCellPaddingInfo> {
544 assert_eq!(hop, sendpadding.hop);
545 assert_eq!(Replace::Replaceable, sendpadding.replace);
546 let mut shared = self.shared.lock().expect("Lock poisoned");
547 shared.trigger_events_mixed(
548 hop,
549 // Each intermediate hop sees this as normal data.
550 &[maybenot::TriggerEvent::NormalSent],
551 // For the target hop, we treat this both as normal, _and_ as padding.
552 &[
553 maybenot::TriggerEvent::NormalSent,
554 sendpadding.into_sent_event(),
555 ],
556 );
557 shared.info_for_hop(hop)
558 }
559
560 /// Report that we have enqueued a padding cell to a given hop.
561 ///
562 /// Return a QueuedCellPaddingInfo if we need to alert the padding subsystem
563 /// when this cell is flushed.
564 pub(crate) fn queued_padding(
565 &self,
566 hop: HopNum,
567 sendpadding: SendPadding,
568 ) -> Option<QueuedCellPaddingInfo> {
569 assert_eq!(hop, sendpadding.hop);
570 let mut shared = self.shared.lock().expect("Lock poisoned");
571 shared.trigger_events_mixed(
572 hop,
573 // Each intermediate hop sees this as normal data.
574 &[maybenot::TriggerEvent::NormalSent],
575 // The target hop sees this as padding.
576 &[sendpadding.into_sent_event()],
577 );
578 shared.info_for_hop(hop)
579 }
580
581 /// Report that we are using an already-queued cell
582 /// as a substitute for sending padding to a given hop.
583 pub(crate) fn replaceable_padding_already_queued(&self, hop: HopNum, sendpadding: SendPadding) {
584 assert_eq!(hop, sendpadding.hop);
585 let mut shared = self.shared.lock().expect("Lock poisoned");
586 shared.trigger_events_mixed(
587 hop,
588 // No additional data will be seen for any intermediate hops.
589 &[],
590 // The target hop's machine sees this as padding.
591 &[sendpadding.into_sent_event()],
592 );
593 }
594
595 /// Report that we've flushed a cell from the queue for the given hop.
596 pub(crate) fn flushed_relay_cell(&self, info: QueuedCellPaddingInfo) {
597 // Every hop up to the last
598 let mut shared = self.shared.lock().expect("Lock poisoned");
599 shared.trigger_events(info.target_hop, &[maybenot::TriggerEvent::TunnelSent]);
600 }
601
602 /// Report that we've flushed a cell from the per-channel queue.
603 pub(crate) fn flushed_channel_cell(&self) {
604 let mut shared = self.shared.lock().expect("Lock poisoned");
605 shared.trigger_events(HopNum::from(0), &[maybenot::TriggerEvent::TunnelSent]);
606 }
607
608 /// Report that we have decrypted a non-padding cell from our queue
609 /// from a given hop.
610 ///
611 // Note that in theory, it would be better to trigger TunnelRecv as soon as
612 // possible after we receive and enqueue the data cell, and NormalRecv only
613 // once we've decrypted it and found it to be data. But we can't do that,
614 // since we won't know which hop actually originated the cell until we
615 // decrypt it.
616 pub(crate) fn decrypted_data(&self, hop: HopNum) {
617 let mut shared = self.shared.lock().expect("Lock poisoned");
618 shared.inc_normal_received(hop);
619 shared.trigger_events(
620 hop,
621 // We treat this as normal data from every hop.
622 &[
623 maybenot::TriggerEvent::TunnelRecv,
624 maybenot::TriggerEvent::NormalRecv,
625 ],
626 );
627 }
628 /// Report that we have decrypted a padding cell from our queue.
629 ///
630 /// Return an error if this padding cell is not acceptable
631 /// (because we have received too much padding from this hop,
632 /// or because we have not enabled padding with this hop.)
633 //
634 // See note above.
635 pub(crate) fn decrypted_padding(&self, hop: HopNum) -> Result<(), crate::Error> {
636 let mut shared = self.shared.lock().expect("Lock poisoned");
637 shared
638 .inc_padding_received(hop)
639 .map_err(|e| crate::Error::ExcessPadding(e, hop))?;
640 shared.trigger_events_mixed(
641 hop,
642 // We treat this as normal data from the intermediate hops.
643 &[
644 maybenot::TriggerEvent::TunnelRecv,
645 maybenot::TriggerEvent::NormalRecv,
646 ],
647 // But from the target hop, it counts as padding.
648 &[
649 maybenot::TriggerEvent::TunnelRecv,
650 maybenot::TriggerEvent::PaddingRecv,
651 ],
652 );
653 Ok(())
654 }
655}
656
657impl<S: SleepProvider> PaddingShared<S> {
658 /// Trigger a list of maybenot events on every hop up to and including `hop`.
659 fn trigger_events(&mut self, hop: HopNum, events: &[maybenot::TriggerEvent]) {
660 let final_idx = usize::from(hop);
661 let now = self.runtime.now();
662 let next_scheduled_wakeup = self.next_scheduled_wakeup;
663 for hop_controller in self.hops.iter_mut().take(final_idx + 1) {
664 let Some(hop_controller) = hop_controller else {
665 continue;
666 };
667 hop_controller.report_events_at(events, now, next_scheduled_wakeup);
668 }
669 }
670
671 /// Trigger `intermediate_hop_events` on every hop up to but _not_ including `hop`.
672 ///
673 /// Trigger `final_hop_events` on `hop`.
674 ///
675 /// (Don't trigger anything on any hops _after_ `hop`.)
676 fn trigger_events_mixed(
677 &mut self,
678 hop: HopNum,
679 intermediate_hop_events: &[maybenot::TriggerEvent],
680 final_hop_events: &[maybenot::TriggerEvent],
681 ) {
682 use itertools::Itertools as _;
683 use itertools::Position as P;
684 let final_idx = usize::from(hop);
685 let now = self.runtime.now();
686 let next_scheduled_wakeup = self.next_scheduled_wakeup;
687 for (position, hop_controller) in self.hops.iter_mut().take(final_idx + 1).with_position() {
688 let Some(hop_controller) = hop_controller else {
689 continue;
690 };
691 let events = match position {
692 P::First | P::Middle => intermediate_hop_events,
693 P::Last | P::Only => final_hop_events,
694 };
695 hop_controller.report_events_at(events, now, next_scheduled_wakeup);
696 }
697 }
698
699 /// Increment the normal cell count from every hop up to and including `hop`.
700 fn inc_normal_received(&mut self, hop: HopNum) {
701 let final_idx = usize::from(hop);
702 for stats in self.stats.iter_mut().take(final_idx + 1).flatten() {
703 stats.n_normal += 1;
704 }
705 }
706
707 /// Increment the padding count from `hop`, and the normal cell count from all earlier hops.
708 ///
709 /// Return an error if a padding cell from `hop` would not be acceptable.
710 fn inc_padding_received(&mut self, hop: HopNum) -> Result<(), ExcessPadding> {
711 use itertools::Itertools as _;
712 use itertools::Position as P;
713 let final_idx = usize::from(hop);
714 for (position, stats) in self.stats.iter_mut().take(final_idx + 1).with_position() {
715 match (position, stats) {
716 (P::First | P::Middle, Some(stats)) => stats.n_normal += 1,
717 (P::First | P::Middle, None) => {}
718 (P::Last | P::Only, Some(stats)) => {
719 stats.n_padding += 1;
720 stats.validate()?;
721 }
722 (P::Last | P::Only, None) => {
723 return Err(ExcessPadding::NoPaddingNegotiated);
724 }
725 }
726 }
727 Ok(())
728 }
729
730 /// Return the `QueuedCellPaddingInfo` to use when sending messages to `target_hop`
731 #[allow(clippy::unnecessary_wraps)]
732 fn info_for_hop(&self, target_hop: HopNum) -> Option<QueuedCellPaddingInfo> {
733 // TODO circpad optimization: This is always Some for now, but we
734 // could someday avoid creating this object
735 // when padding is not enabled on the circuit,
736 // or if padding is not enabled on any hop of the circuit <= target_hop.
737 Some(QueuedCellPaddingInfo { target_hop })
738 }
739}
740
741impl<S: SleepProvider> PaddingShared<S> {
742 /// Install or remove a [`CircuitPadder`] for a single hop.
743 fn set_hop_backend(&mut self, hop: HopNum, backend: Option<CircuitPadder>) {
744 let hop_idx: usize = hop.into();
745 assert!(hop_idx < MAX_HOPS);
746 let n_needed = hop_idx + 1;
747 // Make sure there are enough spaces in self.hops.
748 // We can't use "resize" or "extend", since Box<dyn<PaddingBackend>>
749 // doesn't implement Clone, which SmallVec requires.
750 while self.hops.len() < n_needed {
751 self.hops.push(None);
752 }
753 while self.stats.len() < n_needed {
754 self.stats.push(None);
755 }
756 // project through option...
757 let (hop_backend, stats) = if let Some(padder) = backend {
758 (Some(padder.backend), Some(padder.initial_stats))
759 } else {
760 (None, None)
761 };
762 self.hops[hop_idx] = hop_backend;
763 self.stats[hop_idx] = stats;
764
765 let was_blocked = self.blocking.hop_blocked[hop_idx];
766 self.blocking.set_unblocked(hop_idx);
767 if was_blocked {
768 self.pending_events
769 .push_back(self.blocking.blocking_update_paddingevent());
770 }
771
772 // We need to alert the stream, in case we added an event above, and so that it will poll
773 // the new padder at least once.
774 self.waker.wake_by_ref();
775 }
776
777 /// Transform a [`PerHopPaddingEvent`] for a single hop with index `idx` into a [`PaddingEvent`],
778 /// updating our state as appropriate.
779 fn process_per_hop_event(
780 blocking: &mut BlockingState,
781 hop_idx: usize,
782 event: PerHopPaddingEvent,
783 ) -> PaddingEvent {
784 use PaddingEvent as PE;
785 use PerHopPaddingEvent as PHPE;
786
787 match event {
788 PHPE::SendPadding {
789 machine,
790 replace,
791 bypass,
792 } => PE::SendPadding(SendPadding {
793 machine,
794 hop: hopnum_from_hop_idx(hop_idx),
795 replace,
796 bypass,
797 }),
798 PHPE::StartBlocking { is_bypassable } => {
799 // NOTE that we remember is_bypassable for every hop, but the blocking is only
800 // bypassable if _every_ hop is unblocked, or has bypassable blocking.
801 blocking.set_blocked(hop_idx, is_bypassable);
802 blocking.blocking_update_paddingevent()
803 }
804 PHPE::StopBlocking => {
805 blocking.set_unblocked(hop_idx);
806 blocking.blocking_update_paddingevent()
807 }
808 }
809 }
810
811 /// Extract every PaddingEvent that is ready to be reported to the circuit at time `now`.
812 ///
813 /// May trigger other events, or wake up the stream, in the course of running.
814 fn take_padding_events_at(&mut self, now: Instant) -> PaddingEventQueue {
815 let mut output = PaddingEventQueue::default();
816 for (hop_idx, backend) in self.hops.iter_mut().enumerate() {
817 let Some(backend) = backend else {
818 continue;
819 };
820
821 let hop_events = backend.take_padding_events_at(now, self.next_scheduled_wakeup);
822
823 output.extend(
824 hop_events
825 .into_iter()
826 .map(|ev| Self::process_per_hop_event(&mut self.blocking, hop_idx, ev)),
827 );
828 }
829 output
830 }
831
832 /// Find the next time at which we should wake up the stream, and register it as our
833 /// "next scheduled wakeup".
834 fn schedule_next_wakeup(&mut self, waker: &Waker) -> Option<Instant> {
835 // Find the earliest time at which any hop has a scheduled event.
836 let next_expiration = self
837 .hops
838 .iter_mut()
839 .flatten()
840 .filter_map(|hop| hop.next_wakeup(waker))
841 .min();
842 self.next_scheduled_wakeup = next_expiration;
843 self.waker = waker.clone();
844 next_expiration
845 }
846}
847
/// A stream of [`PaddingEvent`] to tell a circuit when (if at all) it should send
/// padding and block traffic.
//
// TODO circpad: Optimize this even more for the no-padding case?
// We could make it smaller or faster.
pub(crate) struct PaddingEventStream<S = DynTimeProvider>
where
    S: SleepProvider,
{
    /// The shared padding state, including the per-hop list of PaddingBackend.
    shared: Arc<Mutex<PaddingShared<S>>>,

    /// A future defining a time at which we must next call
    /// `PaddingShared::take_padding_events_at`.
    ///
    /// (We also arrange for the backend to wake us up if we need to change this time,
    /// or call `PaddingShared::take_padding_events_at`.)
    ///
    /// Note that this timer is allowed to be _earlier_ than our true wakeup time,
    /// but not later.
    sleep_future: S::SleepFuture,
}
869
impl futures::Stream for PaddingEventStream {
    type Item = PaddingEvent;

    /// Yield the next [`PaddingEvent`], if one is ready.
    ///
    /// Events already waiting in `pending_events` are yielded first; after that
    /// we ask each hop's backend for the events that are due now.  If there is
    /// still nothing to yield, we register `cx`'s waker with the backends and
    /// (if any hop has a wakeup time) arm `sleep_future` for the earliest one.
    ///
    /// This function never returns `Poll::Ready(None)`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            let (now, next_wakeup, runtime) = {
                // We destructure like this to avoid simultaneous mutable/immutable borrows.
                let Self { shared, .. } = &mut *self;

                let mut shared = shared.lock().expect("Poisoned lock");

                // Do we have any events that are waiting to be yielded?
                if let Some(val) = shared.pending_events.pop_front() {
                    return Poll::Ready(Some(val));
                }

                // Does the padder have any events that have become ready to be yielded?
                //
                // (`pending_events` is empty at this point, so replacing it loses nothing.)
                let now = shared.runtime.now();
                shared.pending_events = shared.take_padding_events_at(now);

                if let Some(val) = shared.pending_events.pop_front() {
                    return Poll::Ready(Some(val));
                }

                // If we reach this point, there are no events to trigger right now.
                //
                // We'll ask all the padders for the time at which they next might need to take
                // action, and register our Waker with them, to be alerted if we need to take any action
                // before that.
                (
                    now,
                    shared.schedule_next_wakeup(cx.waker()),
                    shared.runtime.clone(),
                )
                // Here we drop the lock on the shared state.
            };

            match next_wakeup {
                None => {
                    // No hop has a wakeup time to report; the waker that we registered
                    // above is what will get us polled again.
                    return Poll::Pending;
                }
                Some(t) => {
                    // TODO circpad: Avoid rebuilding sleep future needlessly.  May require new APIs in
                    // tor-rtcompat.
                    //
                    // (A wakeup time that has already passed saturates to a zero-length sleep.)
                    self.sleep_future = runtime.sleep(t.saturating_duration_since(now));
                    match self.sleep_future.as_mut().poll(cx) {
                        Poll::Ready(()) => {
                            // Okay, The timer expired already. Continue through the loop.
                            continue;
                        }
                        Poll::Pending => return Poll::Pending,
                    }
                }
            }
        }
    }
}
927
impl futures::stream::FusedStream for PaddingEventStream {
    /// Report whether this stream has terminated.  (It never does.)
    fn is_terminated(&self) -> bool {
        // This stream is _never_ terminated: even if it has no padding machines now,
        // we might add some in the future.
        false
    }
}
935
936/// Construct a HopNum from an index into the `hops` field of a [`PaddingShared`].
937///
938/// # Panics
939///
940/// Panics if `hop_idx` is greater than u8::MAX, which should be impossible.
941fn hopnum_from_hop_idx(hop_idx: usize) -> HopNum {
942 // (Static assertion: makes sure we can represent every index of hops as a HopNum.)
943 const _: () = assert!(MAX_HOPS < u8::MAX as usize);
944 HopNum::from(u8::try_from(hop_idx).expect("hop_idx out of range!"))
945}
946
947/// Create a new, empty padding instance for a new circuit.
948pub(crate) fn new_padding<S>(runtime: S) -> (PaddingController<S>, PaddingEventStream<S>)
949where
950 S: SleepProvider,
951{
952 // Start with an arbitrary sleep future. We won't actually use this until
953 // the first time that we have an event to schedule, so the timeout doesn't matter.
954 let sleep_future = runtime.sleep(Duration::new(86400, 0));
955
956 let shared = PaddingShared {
957 runtime,
958 hops: Default::default(),
959 stats: Default::default(),
960 blocking: Default::default(),
961 next_scheduled_wakeup: None,
962 pending_events: PaddingEventQueue::default(),
963 waker: Waker::noop().clone(),
964 };
965 let shared = Arc::new(Mutex::new(shared));
966 let controller = PaddingController {
967 shared: shared.clone(),
968 };
969 let stream = PaddingEventStream {
970 shared,
971 sleep_future,
972 };
973
974 (controller, stream)
975}