tor_proto/stream/flow_ctrl/xon_xoff/state.rs
1//! Circuit reactor's stream XON/XOFF flow control.
2//!
3//! ## Notes on consensus parameters
4//!
5//! ### `cc_xoff_client`
6//!
7//! This is the number of bytes that we buffer within a [`DataStream`]. The actual total number of
8//! bytes buffered can be *much* larger. For example there will be additional buffering:
9//!
//! - Within the arti socks/http proxy: Arti's proxy code needs to read some bytes from the stream,
//!   store them in a temporary buffer, then write the buffer to the socket. If the socket would
//!   block, the data would remain in that temporary buffer. At the time of writing, arti uses only
//!   a small byte buffer (`APP_STREAM_BUF_LEN`) for this, which is hopefully negligible. See
//!   `arti::socks::copy_interactive()`.
14//! - Within the kernel: There are two additional buffers that will store stream data before the
15//! application connected over socks will see the data: Arti's socket send buffer and the
16//! application's socket receive buffer. If the application were to stop reading from its socket,
17//! stream data would accumulate first in the socket's receive buffer. Once full, stream data
18//! would accumulate in arti's socket's send buffer. This can become relatively large, especially
19//! with buffer autotuning enabled. On a Linux 6.15 system with curl downloading a large file and
20//! stopping mid-download, the receive buffer was 6,116,738 bytes and the send buffer was
21//! 2,631,062 bytes. This sums to around 8.7 MB of stream data buffered in the kernel, which is
22//! significantly higher than the current consensus value of `cc_xoff_client`.
23//!
24//! This means that the total number of bytes buffered before an XOFF is sent can be much larger
25//! than `cc_xoff_client`.
26//!
27//! While we should take into account the kernel and arti socks buffering above, we also need to
28//! keep in mind that arti-client is a library that can be used by others. These library users might
29//! not do any kernel or socks buffering, for example if they write a rust program that handles the
//! stream data entirely within their program. We don't want to set `cc_xoff_client` so low that it
//! harms performance for these users, even if such a value would be fine for the arti socks proxy
//! case.
32
33use std::num::Saturating;
34use std::sync::Arc;
35
36use postage::watch;
37use tor_cell::relaycell::flow_ctrl::{FlowCtrlVersion, Xoff, Xon, XonKbpsEwma};
38use tor_cell::relaycell::msg::AnyRelayMsg;
39use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
40use tracing::trace;
41
42use super::reader::DrainRateRequest;
43
44use crate::stream::flow_ctrl::params::{CellCount, FlowCtrlParameters};
45use crate::stream::flow_ctrl::state::{FlowCtrlHooks, StreamRateLimit};
46use crate::util::notify::NotifySender;
47use crate::{Error, Result};
48
49#[cfg(doc)]
50use {crate::client::stream::DataStream, crate::stream::flow_ctrl::state::StreamFlowCtrl};
51
/// State for XON/XOFF flow control.
///
/// This remembers the last XON/XOFF message we sent, publishes rate limits derived from received
/// XON/XOFF messages through `rate_limit_updater`, and requests fresh drain rates through
/// `drain_rate_requester`.
#[derive(Debug)]
pub(crate) struct XonXoffFlowCtrl {
    /// Consensus parameters.
    params: Arc<FlowCtrlParameters>,
    /// How we communicate rate limit updates to the
    /// [`DataWriter`](crate::client::stream::DataWriter).
    rate_limit_updater: watch::Sender<StreamRateLimit>,
    /// How we communicate requests for new drain rate updates to the
    /// [`XonXoffReader`](crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReader).
    drain_rate_requester: NotifySender<DrainRateRequest>,
    /// The last XON or XOFF message we sent,
    /// or `None` if we have not sent either of them yet.
    last_sent_xon_xoff: Option<XonXoffMsg>,
    /// The buffer limit at which we should send an XOFF.
    ///
    /// In prop324 it says that this will be either `cc_xoff_client` or `cc_xoff_exit` depending on
    /// whether we're a client/hs or exit, but we deviate from the spec here (see how it is set
    /// in `new()`).
    xoff_limit: CellCount<{ tor_cell::relaycell::PAYLOAD_MAX_SIZE_ALL as u32 }>,
    /// DropMark sidechannel mitigations.
    ///
    /// This is only enabled if we are a client (including an onion service).
    //
    // We could use a `Box` here so that this only takes up space if sidechannel mitigations are
    // enabled. But `SidechannelMitigation` is (at the time of writing) small: an `Option` of a
    // fieldless enum plus three `u64` counters. We could reconsider in the future if we add more
    // functionality to `SidechannelMitigation`.
    sidechannel_mitigation: Option<SidechannelMitigation>,
}
80
81impl XonXoffFlowCtrl {
82 /// Returns a new xon/xoff-based state.
83 pub(crate) fn new(
84 params: Arc<FlowCtrlParameters>,
85 use_sidechannel_mitigations: bool,
86 rate_limit_updater: watch::Sender<StreamRateLimit>,
87 drain_rate_requester: NotifySender<DrainRateRequest>,
88 ) -> Self {
89 let sidechannel_mitigation =
90 use_sidechannel_mitigations.then_some(SidechannelMitigation::new());
91
92 // We use the same XOFF limit regardless of if we're a client or exit.
93 // See https://gitlab.torproject.org/tpo/core/torspec/-/issues/371#note_3260658
94 let xoff_limit = std::cmp::max(params.cc_xoff_client, params.cc_xoff_exit);
95
96 Self {
97 params,
98 rate_limit_updater,
99 drain_rate_requester,
100 last_sent_xon_xoff: None,
101 xoff_limit,
102 sidechannel_mitigation,
103 }
104 }
105}
106
107impl FlowCtrlHooks for XonXoffFlowCtrl {
108 fn can_send<M: RelayMsg>(&self, _msg: &M) -> bool {
109 // we perform rate-limiting in the `DataWriter`,
110 // so we send any messages that made it past the `DataWriter`
111 true
112 }
113
114 fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()> {
115 // if sidechannel mitigations are enabled and this is a RELAY_DATA message,
116 // notify that we sent a data message
117 if let Some(ref mut sidechannel_mitigation) = self.sidechannel_mitigation {
118 if let AnyRelayMsg::Data(data_msg) = msg {
119 sidechannel_mitigation.sent_stream_data(data_msg.as_ref().len());
120 }
121 }
122
123 Ok(())
124 }
125
126 fn put_for_incoming_sendme(&mut self, _msg: UnparsedRelayMsg) -> Result<()> {
127 let msg = "Stream level SENDME not allowed due to congestion control";
128 Err(Error::CircProto(msg.into()))
129 }
130
131 fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
132 let xon = msg
133 .decode::<Xon>()
134 .map_err(|e| Error::from_bytes_err(e, "failed to decode XON message"))?
135 .into_msg();
136
137 // > Parties SHOULD treat XON or XOFF cells with unrecognized versions as a protocol
138 // > violation.
139 if *xon.version() != 0 {
140 return Err(Error::CircProto("Unrecognized XON version".into()));
141 }
142
143 // if sidechannel mitigations are enabled, notify that an XON was received
144 if let Some(ref mut sidechannel_mitigation) = self.sidechannel_mitigation {
145 sidechannel_mitigation.received_xon(&self.params)?;
146 }
147
148 trace!("Received an XON with rate {}", xon.kbps_ewma());
149
150 let rate = match xon.kbps_ewma() {
151 XonKbpsEwma::Limited(rate_kbps) => {
152 let rate_kbps = u64::from(rate_kbps.get());
153 // convert from kbps to bytes/s
154 StreamRateLimit::new_bytes_per_sec(rate_kbps * 1000 / 8)
155 }
156 XonKbpsEwma::Unlimited => StreamRateLimit::MAX,
157 };
158
159 *self.rate_limit_updater.borrow_mut() = rate;
160 Ok(())
161 }
162
163 fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
164 let xoff = msg
165 .decode::<Xoff>()
166 .map_err(|e| Error::from_bytes_err(e, "failed to decode XOFF message"))?
167 .into_msg();
168
169 // > Parties SHOULD treat XON or XOFF cells with unrecognized versions as a protocol
170 // > violation.
171 if *xoff.version() != 0 {
172 return Err(Error::CircProto("Unrecognized XOFF version".into()));
173 }
174
175 // if sidechannel mitigations are enabled, notify that an XOFF was received
176 if let Some(ref mut sidechannel_mitigation) = self.sidechannel_mitigation {
177 sidechannel_mitigation.received_xoff(&self.params)?;
178 }
179
180 trace!("Received an XOFF");
181
182 // update the rate limit and notify the `DataWriter`
183 *self.rate_limit_updater.borrow_mut() = StreamRateLimit::ZERO;
184
185 Ok(())
186 }
187
188 fn maybe_send_xon(&mut self, rate: XonKbpsEwma, buffer_len: usize) -> Result<Option<Xon>> {
189 if buffer_len as u64 > self.xoff_limit.as_bytes() {
190 // we can't send an XON, and we should have already sent an XOFF when the queue first
191 // exceeded the limit (see `maybe_send_xoff()`)
192 debug_assert!(matches!(self.last_sent_xon_xoff, Some(XonXoffMsg::Xoff)));
193
194 // inform the stream reader that we need a new drain rate
195 self.drain_rate_requester.notify();
196 return Ok(None);
197 }
198
199 self.last_sent_xon_xoff = Some(XonXoffMsg::Xon(rate));
200
201 trace!("Want to send an XON with rate {rate}");
202
203 Ok(Some(Xon::new(FlowCtrlVersion::V0, rate)))
204 }
205
206 fn maybe_send_xoff(&mut self, buffer_len: usize) -> Result<Option<Xoff>> {
207 // if the last XON/XOFF we sent was an XOFF, no need to send another
208 if matches!(self.last_sent_xon_xoff, Some(XonXoffMsg::Xoff)) {
209 return Ok(None);
210 }
211
212 if buffer_len as u64 <= self.xoff_limit.as_bytes() {
213 return Ok(None);
214 }
215
216 // either we have never sent an XOFF or XON, or we last sent an XON
217
218 // remember that we last sent an XOFF
219 self.last_sent_xon_xoff = Some(XonXoffMsg::Xoff);
220
221 // inform the stream reader that we need a new drain rate
222 self.drain_rate_requester.notify();
223
224 trace!("Want to send an XOFF");
225
226 Ok(Some(Xoff::new(FlowCtrlVersion::V0)))
227 }
228}
229
/// An XON or XOFF message with no associated data.
///
/// Used by [`SidechannelMitigation`] to remember which kind of message was received last.
#[derive(Debug, PartialEq, Eq)]
enum XonXoff {
    /// XON message.
    Xon,
    /// XOFF message.
    Xoff,
}
238
/// An XON or XOFF message with associated data.
///
/// Used by [`XonXoffFlowCtrl`] to remember which message was sent last.
#[derive(Debug)]
enum XonXoffMsg {
    /// XON message with a rate.
    // TODO: I'm expecting that we'll want the `XonKbpsEwma` in the future.
    // If that doesn't end up being the case, then we should remove it.
    #[expect(dead_code)]
    Xon(XonKbpsEwma),
    /// XOFF message.
    Xoff,
}
250
/// Sidechannel mitigations for DropMark attacks.
///
/// > In order to mitigate DropMark attacks, both XOFF and advisory XON transmission must be
/// > restricted.
///
/// These restrictions should be implemented for clients (OPs and onion services).
///
/// The counters here record how much stream data we have sent and how many XON/XOFF messages
/// we have received, so that messages arriving too early or too frequently can be rejected.
#[derive(Debug)]
struct SidechannelMitigation {
    /// The last XON or XOFF message we received,
    /// or `None` if we have not received either of them yet.
    last_recvd_xon_xoff: Option<XonXoff>,
    /// Number of sent stream bytes.
    ///
    /// C-tor has some logic to try to fit this into a 32-bit integer,
    /// but let's not do that unless we need to as it will make bugs more likely.
    bytes_sent_total: Saturating<u64>,
    /// The number of advisory XON messages we've received.
    ///
    /// Note: Advisory XONs are XON->XON messages, and not XOFF->XON messages.
    num_advisory_xon_recvd: Saturating<u64>,
    /// The number of XOFF messages we've received.
    num_xoff_recvd: Saturating<u64>,
}
273
274impl SidechannelMitigation {
275 /// A new [`SidechannelMitigation`].
276 fn new() -> Self {
277 Self {
278 last_recvd_xon_xoff: None,
279 bytes_sent_total: Saturating(0),
280 num_advisory_xon_recvd: Saturating(0),
281 num_xoff_recvd: Saturating(0),
282 }
283 }
284
285 /// A (likely underestimated) guess of the XOFF limit that the other endpoint is using.
286 fn peer_xoff_limit_bytes(params: &FlowCtrlParameters) -> u64 {
287 // We need to consider that `xoff_client` and `xoff_exit` may be different, we don't know
288 // here exactly what kind of peer we're connected to, and that we may have a different view
289 // of the consensus than the peer.
290 // We deviate from prop324 here and use a more relaxed threshold.
291 // See https://gitlab.torproject.org/tpo/core/torspec/-/issues/371#note_3260658
292 let min = std::cmp::min(
293 params.cc_xoff_client.as_bytes(),
294 params.cc_xoff_exit.as_bytes(),
295 );
296 min / 2
297 }
298
299 /// A (likely underestimated) guess of the advisory XON limit that the other endpoint is using.
300 fn peer_xon_limit_bytes(params: &FlowCtrlParameters) -> u64 {
301 // We need to consider that we may have a different view of the consensus than the peer.
302 // We deviate from prop324 here and use a more relaxed threshold.
303 // See https://gitlab.torproject.org/tpo/core/torspec/-/issues/371#note_3260658
304 params.cc_xon_rate.as_bytes() / 2
305 }
306
307 /// Notify that we have sent stream data.
308 fn sent_stream_data(&mut self, stream_bytes: usize) {
309 // perform a saturating conversion to u64
310 let stream_bytes: u64 = stream_bytes.try_into().unwrap_or(u64::MAX);
311 self.bytes_sent_total += stream_bytes;
312 }
313
314 /// Notify that we have received an XON message.
315 fn received_xon(&mut self, params: &FlowCtrlParameters) -> Result<()> {
316 // Check to make sure that XON is not sent too early, for dropmark attacks. The main
317 // sidechannel risk is early cells, but we also check to see that we did not get more XONs
318 // than make sense for the number of bytes we sent.
319 //
320 // The ordering is important here. For example we first want to check if we received an
321 // advisory XON that was too early, before we check if we received the advisory XON too
322 // frequently.
323
324 // Ensure that we have sent some bytes. This might be covered by other checks below, but this
325 // is the most important check so we do it explicitly here first.
326 if self.bytes_sent_total.0 == 0 {
327 const MSG: &str = "Received XON before sending any data";
328 return Err(Error::CircProto(MSG.into()));
329 }
330
331 // is this an advisory XON?
332 let is_advisory = match self.last_recvd_xon_xoff {
333 // if we last received an XON, then this is advisory since we are already sending data
334 Some(XonXoff::Xon) => true,
335 // if we last received an XOFF, then this isn't advisory since we're being asked to
336 // resume sending data
337 Some(XonXoff::Xoff) => false,
338 // if we never received an XON nor XOFF, then this is advisory since we are already
339 // sending data
340 None => true,
341 };
342
343 // set this before we possibly return early below, since this must be set regardless of if
344 // it's an advisory XON or not
345 self.last_recvd_xon_xoff = Some(XonXoff::Xon);
346
347 // we only restrict advisory XON messages
348 if !is_advisory {
349 return Ok(());
350 }
351
352 self.num_advisory_xon_recvd += 1;
353
354 // > Clients also SHOULD ensure that advisory XONs do not arrive before the minimum of the
355 // > XOFF limit and 'cc_xon_rate' full cells worth of bytes have been transmitted.
356 //
357 // NOTE: We use a more relaxed threshold for the XON and XOFF limits than in prop324.
358 let advisory_not_expected_before = std::cmp::min(
359 Self::peer_xoff_limit_bytes(params),
360 Self::peer_xon_limit_bytes(params),
361 );
362 if self.bytes_sent_total.0 < advisory_not_expected_before {
363 const MSG: &str = "Received advisory XON too early";
364 return Err(Error::CircProto(MSG.into()));
365 }
366
367 // > Clients SHOULD ensure that advisory XONs do not arrive more frequently than every
368 // > 'cc_xon_rate' cells worth of sent data.
369 //
370 // It should be an error if
371 // XON frequency > 1/peer_xon_limit_bytes
372 // where
373 // XON frequency = num_advisory_xon_recvd/bytes_sent_total
374 //
375 // so
376 // num_advisory_xon_recvd/bytes_sent_total > 1/peer_xon_limit_bytes
377 //
378 // or to better work with integers
379 // num_advisory_xon_recvd > bytes_sent_total/peer_xon_limit_bytes
380 //
381 // NOTE: We use a more relaxed threshold for the XON limit than in prop324.
382 let peer_xon_limit_bytes = Self::peer_xon_limit_bytes(params);
383 if peer_xon_limit_bytes != 0
384 && self.num_advisory_xon_recvd.0 > self.bytes_sent_total.0 / peer_xon_limit_bytes
385 {
386 const MSG: &str = "Received advisory XON too frequently";
387 return Err(Error::CircProto(MSG.into()));
388 }
389
390 Ok(())
391 }
392
393 /// Notify that we have received an XOFF message.
394 fn received_xoff(&mut self, params: &FlowCtrlParameters) -> Result<()> {
395 // Check to make sure that XOFF is not sent too early, for dropmark attacks. The
396 // main sidechannel risk is early cells, but we also check to make sure that we have not
397 // received more XOFFs than could have been generated by the bytes we sent.
398 //
399 // The ordering is important here. For example we first want to disallow consecutive XOFFs,
400 // then check if we received an XOFF that was too early, and finally check if we received
401 // the XOFF too frequently.
402
403 self.num_xoff_recvd += 1;
404
405 // Ensure that we have sent some bytes. This might be covered by other checks below, but this
406 // is the most important check so we do it explicitly here first.
407 if self.bytes_sent_total.0 == 0 {
408 const MSG: &str = "Received XOFF before sending any data";
409 return Err(Error::CircProto(MSG.into()));
410 }
411
412 // disallow consecutive XOFF messages
413 if self.last_recvd_xon_xoff == Some(XonXoff::Xoff) {
414 const MSG: &str = "Received consecutive XOFF messages";
415 return Err(Error::CircProto(MSG.into()));
416 }
417
418 // > clients MUST ensure that an XOFF does not arrive before it has sent the appropriate
419 // > XOFF limit of bytes on a stream ('cc_xoff_exit' for exits, 'cc_xoff_client' for
420 // > onions).
421 //
422 // NOTE: We use a more relaxed threshold for the XOFF limit than in prop324.
423 if self.bytes_sent_total.0 < Self::peer_xoff_limit_bytes(params) {
424 const MSG: &str = "Received XOFF too early";
425 return Err(Error::CircProto(MSG.into()));
426 }
427
428 // > Clients also SHOULD ensure than XOFFs do not arrive more frequently than every XOFF
429 // > limit worth of sent data.
430 //
431 // It should be an error if
432 // XOFF frequency > 1/peer_xoff_limit_bytes
433 // where
434 // XOFF frequency = num_xoff_recvd/bytes_sent_total
435 //
436 // so
437 // num_xoff_recvd/bytes_sent_total > 1/peer_xoff_limit_bytes
438 //
439 // or to better work with integers
440 // num_xoff_recvd > bytes_sent_total/peer_xoff_limit_bytes
441 //
442 // NOTE: We use a more relaxed threshold for the XOFF limit than in prop324.
443 let peer_xoff_limit_bytes = Self::peer_xoff_limit_bytes(params);
444 if peer_xoff_limit_bytes != 0
445 && self.num_xoff_recvd.0 > self.bytes_sent_total.0 / peer_xoff_limit_bytes
446 {
447 return Err(Error::CircProto("Received XOFF too frequently".into()));
448 }
449
450 self.last_recvd_xon_xoff = Some(XonXoff::Xoff);
451
452 Ok(())
453 }
454}
455
#[cfg(test)]
mod test {
    use super::*;

    use crate::stream::flow_ctrl::params::CellCount;

    /// Checks the ordering and frequency rules that [`SidechannelMitigation`] enforces on
    /// received XON and XOFF messages.
    ///
    /// Two parameter sets are used so that both orderings of the limits are covered: one where
    /// `cc_xoff_client < cc_xoff_exit < cc_xon_rate`, and one where
    /// `cc_xon_rate < cc_xoff_exit < cc_xoff_client`.
    #[test]
    fn sidechannel_mitigation() {
        let params = [
            FlowCtrlParameters {
                cc_xoff_client: CellCount::new(2),
                cc_xoff_exit: CellCount::new(4),
                cc_xon_rate: CellCount::new(8),
                cc_xon_change_pct: 1,
                cc_xon_ewma_cnt: 1,
            },
            FlowCtrlParameters {
                cc_xoff_client: CellCount::new(8),
                cc_xoff_exit: CellCount::new(4),
                cc_xon_rate: CellCount::new(2),
                cc_xon_change_pct: 1,
                cc_xon_ewma_cnt: 1,
            },
        ];

        for params in params {
            let xon_limit = SidechannelMitigation::peer_xon_limit_bytes(&params);
            let xoff_limit = SidechannelMitigation::peer_xoff_limit_bytes(&params);

            let mut x = SidechannelMitigation::new();
            // cannot receive XON as first message
            assert!(x.received_xon(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // cannot receive XOFF as first message
            assert!(x.received_xoff(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // cannot receive XOFF after sending fewer than `xoff_limit` bytes
            x.sent_stream_data(xoff_limit as usize - 1);
            assert!(x.received_xoff(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // can receive XOFF after sending `xoff_limit` bytes
            x.sent_stream_data(xoff_limit as usize);
            assert!(x.received_xoff(&params).is_ok());
            // but cannot receive another XOFF immediately after
            assert!(x.received_xoff(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // can receive XOFF after sending `xoff_limit` bytes
            x.sent_stream_data(xoff_limit as usize);
            assert!(x.received_xoff(&params).is_ok());
            // but cannot receive another XOFF even after sending another `xoff_limit` bytes
            x.sent_stream_data(xoff_limit as usize);
            assert!(x.received_xoff(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // can receive XOFF after sending `xoff_limit` bytes
            x.sent_stream_data(xoff_limit as usize);
            assert!(x.received_xoff(&params).is_ok());
            // and can immediately receive an XON
            assert!(x.received_xon(&params).is_ok());
            // and can receive another XOFF after sending another `xoff_limit` bytes
            x.sent_stream_data(xoff_limit as usize);
            assert!(x.received_xoff(&params).is_ok());

            let mut x = SidechannelMitigation::new();
            // cannot receive XON after sending fewer than `xon_limit` bytes
            x.sent_stream_data(xon_limit as usize - 1);
            assert!(x.received_xon(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // can receive XON after sending a large number of bytes
            x.sent_stream_data(xon_limit as usize * 3);
            assert!(x.received_xon(&params).is_ok());
            // and can immediately receive another XON
            assert!(x.received_xon(&params).is_ok());
            // and can immediately receive another XON
            assert!(x.received_xon(&params).is_ok());
            // but cannot receive another XON immediately after
            assert!(x.received_xon(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // can receive XOFF after sending a large number of bytes
            x.sent_stream_data(xoff_limit as usize * 3);
            assert!(x.received_xoff(&params).is_ok());
            // and can immediately receive an XON
            assert!(x.received_xon(&params).is_ok());
            // and can immediately receive an XOFF
            assert!(x.received_xoff(&params).is_ok());
            // and can immediately receive an XON
            assert!(x.received_xon(&params).is_ok());
            // and can immediately receive an XOFF
            assert!(x.received_xoff(&params).is_ok());
            // and can immediately receive an XON
            assert!(x.received_xon(&params).is_ok());
            // but cannot immediately receive an XOFF
            assert!(x.received_xoff(&params).is_err());
        }
    }
}