Skip to main content

tor_proto/stream/flow_ctrl/xon_xoff/
reader.rs

1//! A wrapper for an [`AsyncRead`] to support XON/XOFF flow control.
2//!
3//! This allows any `AsyncRead` that implements [`BufferIsEmpty`] to be used with XON/XOFF flow
4//! control.
5
6use std::io::Error;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10use futures::{AsyncRead, Stream};
11use pin_project::pin_project;
12use tor_basic_utils::assert_val_impl_trait;
13use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
14
15use crate::stream::StreamTarget;
16use crate::util::notify::NotifyReceiver;
17
18/// A wrapper for an [`AsyncRead`] to support XON/XOFF flow control.
19///
20/// This reader will take care of communicating with the circuit reactor to handle XON/XOFF-related
21/// events.
22#[derive(Debug)]
23#[pin_project]
24pub(crate) struct XonXoffReader<R> {
25    /// How we communicate with the circuit reactor.
26    #[pin]
27    ctrl: XonXoffReaderCtrl,
28    /// The inner reader.
29    #[pin]
30    reader: R,
31    /// Have we received a drain rate request notification from the reactor,
32    /// but haven't yet sent a drain rate update back to the reactor?
33    pending_drain_rate_update: bool,
34}
35
36impl<R> XonXoffReader<R> {
37    /// Create a new [`XonXoffReader`].
38    ///
39    /// The reader must implement [`BufferIsEmpty`], which allows the `XonXoffReader` to check if
40    /// the incoming stream buffer is empty or not.
41    pub(crate) fn new(ctrl: XonXoffReaderCtrl, reader: R) -> Self {
42        Self {
43            ctrl,
44            reader,
45            pending_drain_rate_update: false,
46        }
47    }
48
49    /// Get a reference to the inner [`AsyncRead`].
50    ///
51    /// NOTE: This will bypass the [`XonXoffReader`] and may cause incorrect behaviour depending on
52    /// how you use the returned reader (for example if it uses interior mutability).
53    pub(crate) fn inner(&self) -> &R {
54        &self.reader
55    }
56
57    /// Get a mutable reference to the inner [`AsyncRead`].
58    ///
59    /// NOTE: This will bypass the [`XonXoffReader`] and may cause incorrect behaviour depending on
60    /// how you use the returned reader (for example if you read bytes directly).
61    pub(crate) fn inner_mut(&mut self) -> &mut R {
62        &mut self.reader
63    }
64}
65
66impl<R: AsyncRead + BufferIsEmpty> AsyncRead for XonXoffReader<R> {
67    fn poll_read(
68        self: Pin<&mut Self>,
69        cx: &mut Context<'_>,
70        buf: &mut [u8],
71    ) -> Poll<Result<usize, Error>> {
72        let mut self_ = self.project();
73
74        // ensure that `drain_rate_request_stream` is a `FusedStream`,
75        // which means that we don't need to worry about calling `poll_next()` repeatedly
76        assert_val_impl_trait!(
77            self_.ctrl.drain_rate_request_stream,
78            futures::stream::FusedStream,
79        );
80
81        // check if the circuit reactor has requested a drain rate update
82        if let Poll::Ready(Some(())) = self_
83            .ctrl
84            .as_mut()
85            .project()
86            .drain_rate_request_stream
87            .poll_next(cx)
88        {
89            // a drain rate update was requested, so we need to send a drain rate update once we
90            // have no more bytes buffered
91            *self_.pending_drain_rate_update = true;
92        }
93
94        // try reading from the inner reader
95        let res = self_.reader.as_mut().poll_read(cx, buf);
96
97        // if we need to send a drain rate update and the stream buffer is empty, inform the reactor
98        if *self_.pending_drain_rate_update && self_.reader.is_empty() {
99            // TODO(arti#534): in the future we want to do rate estimation, but for now we'll just
100            // send an "unlimited" drain rate
101            self_
102                .ctrl
103                .stream_target
104                .drain_rate_update(XonKbpsEwma::Unlimited)?;
105            *self_.pending_drain_rate_update = false;
106        }
107
108        res
109    }
110}
111
112/// The control structure for a stream that partakes in XON/XOFF flow control.
113#[derive(Debug)]
114#[pin_project]
115pub(crate) struct XonXoffReaderCtrl {
116    /// Receive notifications when the reactor requests a new drain rate.
117    /// When we do, we should begin waiting for the receive buffer to clear.
118    /// Then when the buffer clears, we should send a new drain rate update to the reactor.
119    #[pin]
120    drain_rate_request_stream: NotifyReceiver<DrainRateRequest>,
121    /// A handle to the reactor for this stream.
122    /// This allows us to send drain rate updates to the circuit reactor.
123    stream_target: StreamTarget,
124}
125
126impl XonXoffReaderCtrl {
127    /// Create a new [`XonXoffReaderCtrl`].
128    pub(crate) fn new(
129        drain_rate_request_stream: NotifyReceiver<DrainRateRequest>,
130        stream_target: StreamTarget,
131    ) -> Self {
132        Self {
133            drain_rate_request_stream,
134            stream_target,
135        }
136    }
137}
138
139/// Used by the [`XonXoffReader`] to decide when to send a drain rate update
140/// (typically resulting in an XON message).
141pub(crate) trait BufferIsEmpty {
142    /// Returns `true` if there are no incoming bytes buffered on this stream.
143    ///
144    /// This takes a `&mut` so that implementers can
145    /// [`unobtrusive_peek()`](tor_async_utils::peekable_stream::UnobtrusivePeekableStream::unobtrusive_peek)
146    /// a stream if necessary.
147    fn is_empty(self: Pin<&mut Self>) -> bool;
148}
149
150/// A marker type for a [`NotifySender`](crate::util::notify::NotifySender)
151/// indicating that notifications are for new drain rate requests.
152#[derive(Debug)]
153pub(crate) struct DrainRateRequest;