Skip to main content

tor_proto/stream/flow_ctrl/
state.rs

1//! Code for implementing flow control (stream-level).
2
3use std::sync::Arc;
4
5use postage::watch;
6use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
7use tor_cell::relaycell::msg::AnyRelayMsg;
8use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
9
10use super::params::FlowCtrlParameters;
11use super::window::state::WindowFlowCtrl;
12use super::xon_xoff::reader::DrainRateRequest;
13#[cfg(feature = "flowctl-cc")]
14use super::xon_xoff::state::XonXoffFlowCtrl;
15
16use crate::Result;
17use crate::congestion::sendme;
18use crate::util::notify::NotifySender;
19
20/// Private internals of [`StreamFlowCtrl`].
21#[enum_dispatch::enum_dispatch]
22#[derive(Debug)]
23enum StreamFlowCtrlEnum {
24    /// "legacy" sendme-window-based flow control.
25    WindowBased(WindowFlowCtrl),
26    /// XON/XOFF flow control.
27    #[cfg(feature = "flowctl-cc")]
28    XonXoffBased(XonXoffFlowCtrl),
29}
30
31/// Manages flow control for a stream.
32#[derive(Debug)]
33pub(crate) struct StreamFlowCtrl {
34    /// Private internal enum.
35    e: StreamFlowCtrlEnum,
36}
37
38impl StreamFlowCtrl {
39    /// Returns a new sendme-window-based [`StreamFlowCtrl`].
40    pub(crate) fn new_window(window: sendme::StreamSendWindow) -> Self {
41        Self {
42            e: StreamFlowCtrlEnum::WindowBased(WindowFlowCtrl::new(window)),
43        }
44    }
45
46    /// Returns a new xon/xoff-based [`StreamFlowCtrl`].
47    #[cfg(feature = "flowctl-cc")]
48    pub(crate) fn new_xon_xoff(
49        params: Arc<FlowCtrlParameters>,
50        use_sidechannel_mitigations: bool,
51        rate_limit_updater: watch::Sender<StreamRateLimit>,
52        drain_rate_requester: NotifySender<DrainRateRequest>,
53    ) -> Self {
54        Self {
55            e: StreamFlowCtrlEnum::XonXoffBased(XonXoffFlowCtrl::new(
56                params,
57                use_sidechannel_mitigations,
58                rate_limit_updater,
59                drain_rate_requester,
60            )),
61        }
62    }
63}
64
65// forward all trait methods to the inner enum
66impl FlowCtrlHooks for StreamFlowCtrl {
67    fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
68        self.e.can_send(msg)
69    }
70
71    fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()> {
72        self.e.about_to_send(msg)
73    }
74
75    fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
76        self.e.put_for_incoming_sendme(msg)
77    }
78
79    fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
80        self.e.handle_incoming_xon(msg)
81    }
82
83    fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
84        self.e.handle_incoming_xoff(msg)
85    }
86
87    fn maybe_send_xon(&mut self, rate: XonKbpsEwma, buffer_len: usize) -> Result<Option<Xon>> {
88        self.e.maybe_send_xon(rate, buffer_len)
89    }
90
91    fn maybe_send_xoff(&mut self, buffer_len: usize) -> Result<Option<Xoff>> {
92        self.e.maybe_send_xoff(buffer_len)
93    }
94}
95
96/// Methods that can be called on a [`StreamFlowCtrl`].
97///
98/// We use a trait so that we can use `enum_dispatch` on the inner [`StreamFlowCtrlEnum`] enum.
99#[enum_dispatch::enum_dispatch(StreamFlowCtrlEnum)]
100pub(crate) trait FlowCtrlHooks {
101    /// Whether this stream is ready to send `msg`.
102    fn can_send<M: RelayMsg>(&self, msg: &M) -> bool;
103
104    /// Inform the flow control code that we're about to send `msg`.
105    /// Returns an error if the message should not be sent,
106    /// and the circuit should be closed.
107    // TODO: Consider having this method wrap the message in a type that
108    // "proves" we've applied flow control. This would make it easier to apply
109    // flow control earlier, e.g. in `OpenStreamEntStream`, without introducing
110    // ambiguity in the sending function as to whether flow control has already
111    // been applied or not.
112    fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()>;
113
114    /// Handle an incoming sendme.
115    ///
116    /// On success, return the number of cells left in the window.
117    ///
118    /// On failure, return an error: the caller should close the stream or
119    /// circuit with a protocol error.
120    ///
121    /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
122    /// correct type of flow control.
123    fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()>;
124
125    /// Handle an incoming XON message.
126    ///
127    /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
128    /// correct type of flow control.
129    fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()>;
130
131    /// Handle an incoming XOFF message.
132    ///
133    /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
134    /// correct type of flow control.
135    fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()>;
136
137    /// Check if we should send an XON message.
138    ///
139    /// If we should, then returns the XON message that should be sent.
140    /// Returns an error if XON/XOFF messages aren't supported for this type of flow control.
141    fn maybe_send_xon(&mut self, rate: XonKbpsEwma, buffer_len: usize) -> Result<Option<Xon>>;
142
143    /// Check if we should send an XOFF message.
144    ///
145    /// If we should, then returns the XOFF message that should be sent.
146    /// Returns an error if XON/XOFF messages aren't supported for this type of flow control.
147    fn maybe_send_xoff(&mut self, buffer_len: usize) -> Result<Option<Xoff>>;
148}
149
150/// A newtype wrapper for a tor stream rate limit that makes the units explicit.
151#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
152pub(crate) struct StreamRateLimit {
153    /// The rate in bytes/s.
154    rate: u64,
155}
156
157impl StreamRateLimit {
158    /// A maximum rate limit.
159    pub(crate) const MAX: Self = Self::new_bytes_per_sec(u64::MAX);
160
161    /// A rate limit of 0.
162    pub(crate) const ZERO: Self = Self::new_bytes_per_sec(0);
163
164    /// A new [`StreamRateLimit`] with `rate` bytes/s.
165    pub(crate) const fn new_bytes_per_sec(rate: u64) -> Self {
166        Self { rate }
167    }
168
169    /// The rate in bytes/s.
170    pub(crate) const fn bytes_per_sec(&self) -> u64 {
171        self.rate
172    }
173}
174
175impl std::fmt::Display for StreamRateLimit {
176    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177        write!(f, "{} bytes/s", self.rate)
178    }
179}