Skip to main content

tor_proto/stream/flow_ctrl/window/
state.rs

1//! Circuit reactor's stream window flow control.
2
3use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
4use tor_cell::relaycell::msg::{AnyRelayMsg, Sendme};
5use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
6
7use crate::congestion::sendme::{self, StreamSendWindow};
8use crate::stream::flow_ctrl::state::FlowCtrlHooks;
9use crate::{Error, Result};
10
11#[cfg(doc)]
12use crate::stream::flow_ctrl::state::StreamFlowCtrl;
13
14/// State for window-based flow control.
15#[derive(Debug)]
16pub(crate) struct WindowFlowCtrl {
17    /// Send window.
18    window: StreamSendWindow,
19}
20
21impl WindowFlowCtrl {
22    /// Returns a new sendme-window-based state.
23    // TODO: Maybe take the raw u16 and create StreamSendWindow ourselves?
24    // Unclear whether we need or want to support creating this object from a
25    // preexisting StreamSendWindow.
26    pub(crate) fn new(window: StreamSendWindow) -> Self {
27        Self { window }
28    }
29}
30
31impl FlowCtrlHooks for WindowFlowCtrl {
32    fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
33        !sendme::cmd_counts_towards_windows(msg.cmd()) || self.window.window() > 0
34    }
35
36    fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()> {
37        if sendme::cmd_counts_towards_windows(msg.cmd()) {
38            self.window.take().map(|_| ())
39        } else {
40            // TODO: Maybe make this an error?
41            // Ideally caller would have checked this already.
42            Ok(())
43        }
44    }
45
46    fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
47        let _sendme = msg
48            .decode::<Sendme>()
49            .map_err(|e| Error::from_bytes_err(e, "failed to decode stream sendme message"))?
50            .into_msg();
51
52        self.window.put()
53    }
54
55    fn handle_incoming_xon(&mut self, _msg: UnparsedRelayMsg) -> Result<()> {
56        let msg = "XON messages not allowed with window flow control";
57        Err(Error::CircProto(msg.into()))
58    }
59
60    fn handle_incoming_xoff(&mut self, _msg: UnparsedRelayMsg) -> Result<()> {
61        let msg = "XOFF messages not allowed with window flow control";
62        Err(Error::CircProto(msg.into()))
63    }
64
65    fn maybe_send_xon(&mut self, _rate: XonKbpsEwma, _buffer_len: usize) -> Result<Option<Xon>> {
66        let msg = "XON messages cannot be sent with window flow control";
67        Err(Error::CircProto(msg.into()))
68    }
69
70    fn maybe_send_xoff(&mut self, _buffer_len: usize) -> Result<Option<Xoff>> {
71        let msg = "XOFF messages cannot be sent with window flow control";
72        Err(Error::CircProto(msg.into()))
73    }
74}