tor_proto/stream/flow_ctrl/
state.rs1use std::sync::Arc;
4
5use postage::watch;
6use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
7use tor_cell::relaycell::msg::AnyRelayMsg;
8use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
9
10use super::params::FlowCtrlParameters;
11use super::window::state::WindowFlowCtrl;
12use super::xon_xoff::reader::DrainRateRequest;
13#[cfg(feature = "flowctl-cc")]
14use super::xon_xoff::state::XonXoffFlowCtrl;
15
16use crate::Result;
17use crate::congestion::sendme;
18use crate::util::notify::NotifySender;
19
20#[enum_dispatch::enum_dispatch]
22#[derive(Debug)]
23enum StreamFlowCtrlEnum {
24 WindowBased(WindowFlowCtrl),
26 #[cfg(feature = "flowctl-cc")]
28 XonXoffBased(XonXoffFlowCtrl),
29}
30
31#[derive(Debug)]
33pub(crate) struct StreamFlowCtrl {
34 e: StreamFlowCtrlEnum,
36}
37
38impl StreamFlowCtrl {
39 pub(crate) fn new_window(window: sendme::StreamSendWindow) -> Self {
41 Self {
42 e: StreamFlowCtrlEnum::WindowBased(WindowFlowCtrl::new(window)),
43 }
44 }
45
46 #[cfg(feature = "flowctl-cc")]
48 pub(crate) fn new_xon_xoff(
49 params: Arc<FlowCtrlParameters>,
50 use_sidechannel_mitigations: bool,
51 rate_limit_updater: watch::Sender<StreamRateLimit>,
52 drain_rate_requester: NotifySender<DrainRateRequest>,
53 ) -> Self {
54 Self {
55 e: StreamFlowCtrlEnum::XonXoffBased(XonXoffFlowCtrl::new(
56 params,
57 use_sidechannel_mitigations,
58 rate_limit_updater,
59 drain_rate_requester,
60 )),
61 }
62 }
63}
64
65impl FlowCtrlHooks for StreamFlowCtrl {
67 fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
68 self.e.can_send(msg)
69 }
70
71 fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()> {
72 self.e.about_to_send(msg)
73 }
74
75 fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
76 self.e.put_for_incoming_sendme(msg)
77 }
78
79 fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
80 self.e.handle_incoming_xon(msg)
81 }
82
83 fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
84 self.e.handle_incoming_xoff(msg)
85 }
86
87 fn maybe_send_xon(&mut self, rate: XonKbpsEwma, buffer_len: usize) -> Result<Option<Xon>> {
88 self.e.maybe_send_xon(rate, buffer_len)
89 }
90
91 fn maybe_send_xoff(&mut self, buffer_len: usize) -> Result<Option<Xoff>> {
92 self.e.maybe_send_xoff(buffer_len)
93 }
94}
95
96#[enum_dispatch::enum_dispatch(StreamFlowCtrlEnum)]
100pub(crate) trait FlowCtrlHooks {
101 fn can_send<M: RelayMsg>(&self, msg: &M) -> bool;
103
104 fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()>;
113
114 fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()>;
124
125 fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()>;
130
131 fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()>;
136
137 fn maybe_send_xon(&mut self, rate: XonKbpsEwma, buffer_len: usize) -> Result<Option<Xon>>;
142
143 fn maybe_send_xoff(&mut self, buffer_len: usize) -> Result<Option<Xoff>>;
148}
149
/// A rate limit for a stream, held as a whole number of bytes per second.
///
/// Wrapping the raw `u64` keeps the unit explicit at every use site. Equality and ordering
/// are derived, so two limits compare exactly as their underlying rates do.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct StreamRateLimit {
    /// The limit, in bytes per second.
    rate: u64,
}

impl StreamRateLimit {
    /// The largest representable limit: `u64::MAX` bytes per second.
    pub(crate) const MAX: Self = Self::new_bytes_per_sec(u64::MAX);

    /// A limit of zero bytes per second.
    pub(crate) const ZERO: Self = Self::new_bytes_per_sec(0);

    /// Create a limit of `rate` bytes per second.
    pub(crate) const fn new_bytes_per_sec(rate: u64) -> Self {
        Self { rate }
    }

    /// Return this limit in bytes per second.
    pub(crate) const fn bytes_per_sec(&self) -> u64 {
        self.rate
    }
}

/// Formats the limit as `"<rate> bytes/s"`.
impl std::fmt::Display for StreamRateLimit {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{} bytes/s", self.rate)
    }
}