Skip to main content

tor_proto/stream/flow_ctrl/xon_xoff/
state.rs

1//! Circuit reactor's stream XON/XOFF flow control.
2//!
3//! ## Notes on consensus parameters
4//!
5//! ### `cc_xoff_client`
6//!
7//! This is the number of bytes that we buffer within a [`DataStream`]. The actual total number of
8//! bytes buffered can be *much* larger. For example there will be additional buffering:
9//!
10//! - Within the arti socks/http proxy: Arti's proxy code needs to read some bytes from the stream, store
11//!   it in a temporary buffer, then write the buffer to the socket. If the socket would block, the
12//!   data would remain in that temporary buffer. In practice arti uses only a small byte buffer (APP_STREAM_BUF_LEN) at
13//!   the time of writing, which is hopefully negligible. See `arti::socks::copy_interactive()`.
14//! - Within the kernel: There are two additional buffers that will store stream data before the
15//!   application connected over socks will see the data: Arti's socket send buffer and the
16//!   application's socket receive buffer. If the application were to stop reading from its socket,
17//!   stream data would accumulate first in the socket's receive buffer. Once full, stream data
18//!   would accumulate in arti's socket's send buffer. This can become relatively large, especially
19//!   with buffer autotuning enabled. On a Linux 6.15 system with curl downloading a large file and
20//!   stopping mid-download, the receive buffer was 6,116,738 bytes and the send buffer was
21//!   2,631,062 bytes. This sums to around 8.7 MB of stream data buffered in the kernel, which is
22//!   significantly higher than the current consensus value of `cc_xoff_client`.
23//!
24//! This means that the total number of bytes buffered before an XOFF is sent can be much larger
25//! than `cc_xoff_client`.
26//!
27//! While we should take into account the kernel and arti socks buffering above, we also need to
28//! keep in mind that arti-client is a library that can be used by others. These library users might
29//! not do any kernel or socks buffering, for example if they write a rust program that handles the
30//! stream data entirely within their program. We don't want to set `cc_xoff_client` too low that it
31//! harms the performance for these users, even if it's fine for the arti socks proxy case.
32
33use std::num::Saturating;
34use std::sync::Arc;
35
36use postage::watch;
37use tor_cell::relaycell::flow_ctrl::{FlowCtrlVersion, Xoff, Xon, XonKbpsEwma};
38use tor_cell::relaycell::msg::AnyRelayMsg;
39use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
40use tracing::trace;
41
42use super::reader::DrainRateRequest;
43
44use crate::stream::flow_ctrl::params::{CellCount, FlowCtrlParameters};
45use crate::stream::flow_ctrl::state::{FlowCtrlHooks, StreamRateLimit};
46use crate::util::notify::NotifySender;
47use crate::{Error, Result};
48
49#[cfg(doc)]
50use {crate::client::stream::DataStream, crate::stream::flow_ctrl::state::StreamFlowCtrl};
51
52/// State for XON/XOFF flow control.
53#[derive(Debug)]
54pub(crate) struct XonXoffFlowCtrl {
55    /// Consensus parameters.
56    params: Arc<FlowCtrlParameters>,
57    /// How we communicate rate limit updates to the
58    /// [`DataWriter`](crate::client::stream::DataWriter).
59    rate_limit_updater: watch::Sender<StreamRateLimit>,
60    /// How we communicate requests for new drain rate updates to the
61    /// [`XonXoffReader`](crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReader).
62    drain_rate_requester: NotifySender<DrainRateRequest>,
63    /// The last rate limit we sent.
64    last_sent_xon_xoff: Option<XonXoffMsg>,
65    /// The buffer limit at which we should send an XOFF.
66    ///
67    /// In prop324 it says that this will be either `cc_xoff_client` or `cc_xoff_exit` depending on
68    /// whether we're a client/hs or exit, but we deviate from the spec here (see how it is set
69    /// below).
70    xoff_limit: CellCount<{ tor_cell::relaycell::PAYLOAD_MAX_SIZE_ALL as u32 }>,
71    /// DropMark sidechannel mitigations.
72    ///
73    /// This is only enabled if we are a client (including an onion service).
74    //
75    // We could use a `Box` here so that this only takes up space if sidechannel mitigations are
76    // enabled. But `SidechannelMitigation` is (at the time of writing) only 16 bytes. We could
77    // reconsider in the future if we add more functionality to `SidechannelMitigation`.
78    sidechannel_mitigation: Option<SidechannelMitigation>,
79}
80
81impl XonXoffFlowCtrl {
82    /// Returns a new xon/xoff-based state.
83    pub(crate) fn new(
84        params: Arc<FlowCtrlParameters>,
85        use_sidechannel_mitigations: bool,
86        rate_limit_updater: watch::Sender<StreamRateLimit>,
87        drain_rate_requester: NotifySender<DrainRateRequest>,
88    ) -> Self {
89        let sidechannel_mitigation =
90            use_sidechannel_mitigations.then_some(SidechannelMitigation::new());
91
92        // We use the same XOFF limit regardless of if we're a client or exit.
93        // See https://gitlab.torproject.org/tpo/core/torspec/-/issues/371#note_3260658
94        let xoff_limit = std::cmp::max(params.cc_xoff_client, params.cc_xoff_exit);
95
96        Self {
97            params,
98            rate_limit_updater,
99            drain_rate_requester,
100            last_sent_xon_xoff: None,
101            xoff_limit,
102            sidechannel_mitigation,
103        }
104    }
105}
106
107impl FlowCtrlHooks for XonXoffFlowCtrl {
108    fn can_send<M: RelayMsg>(&self, _msg: &M) -> bool {
109        // we perform rate-limiting in the `DataWriter`,
110        // so we send any messages that made it past the `DataWriter`
111        true
112    }
113
114    fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()> {
115        // if sidechannel mitigations are enabled and this is a RELAY_DATA message,
116        // notify that we sent a data message
117        if let Some(ref mut sidechannel_mitigation) = self.sidechannel_mitigation {
118            if let AnyRelayMsg::Data(data_msg) = msg {
119                sidechannel_mitigation.sent_stream_data(data_msg.as_ref().len());
120            }
121        }
122
123        Ok(())
124    }
125
126    fn put_for_incoming_sendme(&mut self, _msg: UnparsedRelayMsg) -> Result<()> {
127        let msg = "Stream level SENDME not allowed due to congestion control";
128        Err(Error::CircProto(msg.into()))
129    }
130
131    fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
132        let xon = msg
133            .decode::<Xon>()
134            .map_err(|e| Error::from_bytes_err(e, "failed to decode XON message"))?
135            .into_msg();
136
137        // > Parties SHOULD treat XON or XOFF cells with unrecognized versions as a protocol
138        // > violation.
139        if *xon.version() != 0 {
140            return Err(Error::CircProto("Unrecognized XON version".into()));
141        }
142
143        // if sidechannel mitigations are enabled, notify that an XON was received
144        if let Some(ref mut sidechannel_mitigation) = self.sidechannel_mitigation {
145            sidechannel_mitigation.received_xon(&self.params)?;
146        }
147
148        trace!("Received an XON with rate {}", xon.kbps_ewma());
149
150        let rate = match xon.kbps_ewma() {
151            XonKbpsEwma::Limited(rate_kbps) => {
152                let rate_kbps = u64::from(rate_kbps.get());
153                // convert from kbps to bytes/s
154                StreamRateLimit::new_bytes_per_sec(rate_kbps * 1000 / 8)
155            }
156            XonKbpsEwma::Unlimited => StreamRateLimit::MAX,
157        };
158
159        *self.rate_limit_updater.borrow_mut() = rate;
160        Ok(())
161    }
162
163    fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
164        let xoff = msg
165            .decode::<Xoff>()
166            .map_err(|e| Error::from_bytes_err(e, "failed to decode XOFF message"))?
167            .into_msg();
168
169        // > Parties SHOULD treat XON or XOFF cells with unrecognized versions as a protocol
170        // > violation.
171        if *xoff.version() != 0 {
172            return Err(Error::CircProto("Unrecognized XOFF version".into()));
173        }
174
175        // if sidechannel mitigations are enabled, notify that an XOFF was received
176        if let Some(ref mut sidechannel_mitigation) = self.sidechannel_mitigation {
177            sidechannel_mitigation.received_xoff(&self.params)?;
178        }
179
180        trace!("Received an XOFF");
181
182        // update the rate limit and notify the `DataWriter`
183        *self.rate_limit_updater.borrow_mut() = StreamRateLimit::ZERO;
184
185        Ok(())
186    }
187
188    fn maybe_send_xon(&mut self, rate: XonKbpsEwma, buffer_len: usize) -> Result<Option<Xon>> {
189        if buffer_len as u64 > self.xoff_limit.as_bytes() {
190            // we can't send an XON, and we should have already sent an XOFF when the queue first
191            // exceeded the limit (see `maybe_send_xoff()`)
192            debug_assert!(matches!(self.last_sent_xon_xoff, Some(XonXoffMsg::Xoff)));
193
194            // inform the stream reader that we need a new drain rate
195            self.drain_rate_requester.notify();
196            return Ok(None);
197        }
198
199        self.last_sent_xon_xoff = Some(XonXoffMsg::Xon(rate));
200
201        trace!("Want to send an XON with rate {rate}");
202
203        Ok(Some(Xon::new(FlowCtrlVersion::V0, rate)))
204    }
205
206    fn maybe_send_xoff(&mut self, buffer_len: usize) -> Result<Option<Xoff>> {
207        // if the last XON/XOFF we sent was an XOFF, no need to send another
208        if matches!(self.last_sent_xon_xoff, Some(XonXoffMsg::Xoff)) {
209            return Ok(None);
210        }
211
212        if buffer_len as u64 <= self.xoff_limit.as_bytes() {
213            return Ok(None);
214        }
215
216        // either we have never sent an XOFF or XON, or we last sent an XON
217
218        // remember that we last sent an XOFF
219        self.last_sent_xon_xoff = Some(XonXoffMsg::Xoff);
220
221        // inform the stream reader that we need a new drain rate
222        self.drain_rate_requester.notify();
223
224        trace!("Want to send an XOFF");
225
226        Ok(Some(Xoff::new(FlowCtrlVersion::V0)))
227    }
228}
229
230/// An XON or XOFF message with no associated data.
231#[derive(Debug, PartialEq, Eq)]
232enum XonXoff {
233    /// XON message.
234    Xon,
235    /// XOFF message.
236    Xoff,
237}
238
239/// An XON or XOFF message with associated data.
240#[derive(Debug)]
241enum XonXoffMsg {
242    /// XON message with a rate.
243    // TODO: I'm expecting that we'll want the `XonKbpsEwma` in the future.
244    // If that doesn't end up being the case, then we should remove it.
245    #[expect(dead_code)]
246    Xon(XonKbpsEwma),
247    /// XOFF message.
248    Xoff,
249}
250
251/// Sidechannel mitigations for DropMark attacks.
252///
253/// > In order to mitigate DropMark attacks, both XOFF and advisory XON transmission must be
254/// > restricted.
255///
256/// These restrictions should be implemented for clients (OPs and onion services).
257#[derive(Debug)]
258struct SidechannelMitigation {
259    /// The last rate limit update we received.
260    last_recvd_xon_xoff: Option<XonXoff>,
261    /// Number of sent stream bytes.
262    ///
263    /// C-tor has some logic to try to fit this into a 32-bit integer,
264    /// but lets not do that unless we need to as it will make bugs more likely.
265    bytes_sent_total: Saturating<u64>,
266    /// The number of advisory XON messages we've received.
267    ///
268    /// Note: Advisory XONs are XON->XON messages, and not XOFF->XON messages.
269    num_advisory_xon_recvd: Saturating<u64>,
270    /// The number of XOFF messages we've received.
271    num_xoff_recvd: Saturating<u64>,
272}
273
274impl SidechannelMitigation {
275    /// A new [`SidechannelMitigation`].
276    fn new() -> Self {
277        Self {
278            last_recvd_xon_xoff: None,
279            bytes_sent_total: Saturating(0),
280            num_advisory_xon_recvd: Saturating(0),
281            num_xoff_recvd: Saturating(0),
282        }
283    }
284
285    /// A (likely underestimated) guess of the XOFF limit that the other endpoint is using.
286    fn peer_xoff_limit_bytes(params: &FlowCtrlParameters) -> u64 {
287        // We need to consider that `xoff_client` and `xoff_exit` may be different, we don't know
288        // here exactly what kind of peer we're connected to, and that we may have a different view
289        // of the consensus than the peer.
290        // We deviate from prop324 here and use a more relaxed threshold.
291        // See https://gitlab.torproject.org/tpo/core/torspec/-/issues/371#note_3260658
292        let min = std::cmp::min(
293            params.cc_xoff_client.as_bytes(),
294            params.cc_xoff_exit.as_bytes(),
295        );
296        min / 2
297    }
298
299    /// A (likely underestimated) guess of the advisory XON limit that the other endpoint is using.
300    fn peer_xon_limit_bytes(params: &FlowCtrlParameters) -> u64 {
301        // We need to consider that we may have a different view of the consensus than the peer.
302        // We deviate from prop324 here and use a more relaxed threshold.
303        // See https://gitlab.torproject.org/tpo/core/torspec/-/issues/371#note_3260658
304        params.cc_xon_rate.as_bytes() / 2
305    }
306
307    /// Notify that we have sent stream data.
308    fn sent_stream_data(&mut self, stream_bytes: usize) {
309        // perform a saturating conversion to u64
310        let stream_bytes: u64 = stream_bytes.try_into().unwrap_or(u64::MAX);
311        self.bytes_sent_total += stream_bytes;
312    }
313
314    /// Notify that we have received an XON message.
315    fn received_xon(&mut self, params: &FlowCtrlParameters) -> Result<()> {
316        // Check to make sure that XON is not sent too early, for dropmark attacks. The main
317        // sidechannel risk is early cells, but we also check to see that we did not get more XONs
318        // than make sense for the number of bytes we sent.
319        //
320        // The ordering is important here. For example we first want to check if we received an
321        // advisory XON that was too early, before we check if we received the advisory XON too
322        // frequently.
323
324        // Ensure that we have sent some bytes. This might be covered by other checks below, but this
325        // is the most important check so we do it explicitly here first.
326        if self.bytes_sent_total.0 == 0 {
327            const MSG: &str = "Received XON before sending any data";
328            return Err(Error::CircProto(MSG.into()));
329        }
330
331        // is this an advisory XON?
332        let is_advisory = match self.last_recvd_xon_xoff {
333            // if we last received an XON, then this is advisory since we are already sending data
334            Some(XonXoff::Xon) => true,
335            // if we last received an XOFF, then this isn't advisory since we're being asked to
336            // resume sending data
337            Some(XonXoff::Xoff) => false,
338            // if we never received an XON nor XOFF, then this is advisory since we are already
339            // sending data
340            None => true,
341        };
342
343        // set this before we possibly return early below, since this must be set regardless of if
344        // it's an advisory XON or not
345        self.last_recvd_xon_xoff = Some(XonXoff::Xon);
346
347        // we only restrict advisory XON messages
348        if !is_advisory {
349            return Ok(());
350        }
351
352        self.num_advisory_xon_recvd += 1;
353
354        // > Clients also SHOULD ensure that advisory XONs do not arrive before the minimum of the
355        // > XOFF limit and 'cc_xon_rate' full cells worth of bytes have been transmitted.
356        //
357        // NOTE: We use a more relaxed threshold for the XON and XOFF limits than in prop324.
358        let advisory_not_expected_before = std::cmp::min(
359            Self::peer_xoff_limit_bytes(params),
360            Self::peer_xon_limit_bytes(params),
361        );
362        if self.bytes_sent_total.0 < advisory_not_expected_before {
363            const MSG: &str = "Received advisory XON too early";
364            return Err(Error::CircProto(MSG.into()));
365        }
366
367        // > Clients SHOULD ensure that advisory XONs do not arrive more frequently than every
368        // > 'cc_xon_rate' cells worth of sent data.
369        //
370        // It should be an error if
371        //   XON frequency > 1/peer_xon_limit_bytes
372        // where
373        //   XON frequency = num_advisory_xon_recvd/bytes_sent_total
374        //
375        // so
376        //   num_advisory_xon_recvd/bytes_sent_total > 1/peer_xon_limit_bytes
377        //
378        // or to better work with integers
379        //   num_advisory_xon_recvd > bytes_sent_total/peer_xon_limit_bytes
380        //
381        // NOTE: We use a more relaxed threshold for the XON limit than in prop324.
382        let peer_xon_limit_bytes = Self::peer_xon_limit_bytes(params);
383        if peer_xon_limit_bytes != 0
384            && self.num_advisory_xon_recvd.0 > self.bytes_sent_total.0 / peer_xon_limit_bytes
385        {
386            const MSG: &str = "Received advisory XON too frequently";
387            return Err(Error::CircProto(MSG.into()));
388        }
389
390        Ok(())
391    }
392
393    /// Notify that we have received an XOFF message.
394    fn received_xoff(&mut self, params: &FlowCtrlParameters) -> Result<()> {
395        // Check to make sure that XOFF is not sent too early, for dropmark attacks. The
396        // main sidechannel risk is early cells, but we also check to make sure that we have not
397        // received more XOFFs than could have been generated by the bytes we sent.
398        //
399        // The ordering is important here. For example we first want to disallow consecutive XOFFs,
400        // then check if we received an XOFF that was too early, and finally check if we received
401        // the XOFF too frequently.
402
403        self.num_xoff_recvd += 1;
404
405        // Ensure that we have sent some bytes. This might be covered by other checks below, but this
406        // is the most important check so we do it explicitly here first.
407        if self.bytes_sent_total.0 == 0 {
408            const MSG: &str = "Received XOFF before sending any data";
409            return Err(Error::CircProto(MSG.into()));
410        }
411
412        // disallow consecutive XOFF messages
413        if self.last_recvd_xon_xoff == Some(XonXoff::Xoff) {
414            const MSG: &str = "Received consecutive XOFF messages";
415            return Err(Error::CircProto(MSG.into()));
416        }
417
418        // > clients MUST ensure that an XOFF does not arrive before it has sent the appropriate
419        // > XOFF limit of bytes on a stream ('cc_xoff_exit' for exits, 'cc_xoff_client' for
420        // > onions).
421        //
422        // NOTE: We use a more relaxed threshold for the XOFF limit than in prop324.
423        if self.bytes_sent_total.0 < Self::peer_xoff_limit_bytes(params) {
424            const MSG: &str = "Received XOFF too early";
425            return Err(Error::CircProto(MSG.into()));
426        }
427
428        // > Clients also SHOULD ensure than XOFFs do not arrive more frequently than every XOFF
429        // > limit worth of sent data.
430        //
431        // It should be an error if
432        //   XOFF frequency > 1/peer_xoff_limit_bytes
433        // where
434        //   XOFF frequency = num_xoff_recvd/bytes_sent_total
435        //
436        // so
437        //   num_xoff_recvd/bytes_sent_total > 1/peer_xoff_limit_bytes
438        //
439        // or to better work with integers
440        //   num_xoff_recvd > bytes_sent_total/peer_xoff_limit_bytes
441        //
442        // NOTE: We use a more relaxed threshold for the XOFF limit than in prop324.
443        let peer_xoff_limit_bytes = Self::peer_xoff_limit_bytes(params);
444        if peer_xoff_limit_bytes != 0
445            && self.num_xoff_recvd.0 > self.bytes_sent_total.0 / peer_xoff_limit_bytes
446        {
447            return Err(Error::CircProto("Received XOFF too frequently".into()));
448        }
449
450        self.last_recvd_xon_xoff = Some(XonXoff::Xoff);
451
452        Ok(())
453    }
454}
455
456#[cfg(test)]
457mod test {
458    use super::*;
459
460    use crate::stream::flow_ctrl::params::CellCount;
461
462    #[test]
463    fn sidechannel_mitigation() {
464        let params = [
465            FlowCtrlParameters {
466                cc_xoff_client: CellCount::new(2),
467                cc_xoff_exit: CellCount::new(4),
468                cc_xon_rate: CellCount::new(8),
469                cc_xon_change_pct: 1,
470                cc_xon_ewma_cnt: 1,
471            },
472            FlowCtrlParameters {
473                cc_xoff_client: CellCount::new(8),
474                cc_xoff_exit: CellCount::new(4),
475                cc_xon_rate: CellCount::new(2),
476                cc_xon_change_pct: 1,
477                cc_xon_ewma_cnt: 1,
478            },
479        ];
480
481        for params in params {
482            let xon_limit = SidechannelMitigation::peer_xon_limit_bytes(&params);
483            let xoff_limit = SidechannelMitigation::peer_xoff_limit_bytes(&params);
484
485            let mut x = SidechannelMitigation::new();
486            // cannot receive XON as first message
487            assert!(x.received_xon(&params).is_err());
488
489            let mut x = SidechannelMitigation::new();
490            // cannot receive XOFF as first message
491            assert!(x.received_xoff(&params).is_err());
492
493            let mut x = SidechannelMitigation::new();
494            // cannot receive XOFF after sending fewer than `xoff_limit` bytes
495            x.sent_stream_data(xoff_limit as usize - 1);
496            assert!(x.received_xoff(&params).is_err());
497
498            let mut x = SidechannelMitigation::new();
499            // can receive XOFF after sending `xoff_limit` bytes
500            x.sent_stream_data(xoff_limit as usize);
501            assert!(x.received_xoff(&params).is_ok());
502            // but cannot receive another XOFF immediately after
503            assert!(x.received_xoff(&params).is_err());
504
505            let mut x = SidechannelMitigation::new();
506            // can receive XOFF after sending `xoff_limit` bytes
507            x.sent_stream_data(xoff_limit as usize);
508            assert!(x.received_xoff(&params).is_ok());
509            // but cannot receive another XOFF even after sending another `xoff_limit` bytes
510            x.sent_stream_data(xoff_limit as usize);
511            assert!(x.received_xoff(&params).is_err());
512
513            let mut x = SidechannelMitigation::new();
514            // can receive XOFF after sending `xoff_limit` bytes
515            x.sent_stream_data(xoff_limit as usize);
516            assert!(x.received_xoff(&params).is_ok());
517            // and can immediately receive an XON
518            assert!(x.received_xon(&params).is_ok());
519            // and can receive another XOFF after sending another `xoff_limit` bytes
520            x.sent_stream_data(xoff_limit as usize);
521            assert!(x.received_xoff(&params).is_ok());
522
523            let mut x = SidechannelMitigation::new();
524            // cannot receive XON after sending fewer than `xon_limit` bytes
525            x.sent_stream_data(xon_limit as usize - 1);
526            assert!(x.received_xon(&params).is_err());
527
528            let mut x = SidechannelMitigation::new();
529            // can receive XON after sending a large number of bytes
530            x.sent_stream_data(xon_limit as usize * 3);
531            assert!(x.received_xon(&params).is_ok());
532            // and can immediately receive another XON
533            assert!(x.received_xon(&params).is_ok());
534            // and can immediately receive another XON
535            assert!(x.received_xon(&params).is_ok());
536            // but cannot receive another XON immediately after
537            assert!(x.received_xon(&params).is_err());
538
539            let mut x = SidechannelMitigation::new();
540            // can receive XOFF after sending a large number of bytes
541            x.sent_stream_data(xoff_limit as usize * 3);
542            assert!(x.received_xoff(&params).is_ok());
543            // and can immediately receive an XON
544            assert!(x.received_xon(&params).is_ok());
545            // and can immediately receive an XOFF
546            assert!(x.received_xoff(&params).is_ok());
547            // and can immediately receive an XON
548            assert!(x.received_xon(&params).is_ok());
549            // and can immediately receive an XOFF
550            assert!(x.received_xoff(&params).is_ok());
551            // and can immediately receive an XON
552            assert!(x.received_xon(&params).is_ok());
553            // but cannot immediately receive an XOFF
554            assert!(x.received_xoff(&params).is_err());
555        }
556    }
557}