tor_proto/stream/flow_ctrl/xon_xoff/reader.rs
1//! A wrapper for an [`AsyncRead`] to support XON/XOFF flow control.
2//!
3//! This allows any `AsyncRead` that implements [`BufferIsEmpty`] to be used with XON/XOFF flow
4//! control.
5
6use std::io::Error;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10use futures::{AsyncRead, Stream};
11use pin_project::pin_project;
12use tor_basic_utils::assert_val_impl_trait;
13use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
14
15use crate::stream::StreamTarget;
16use crate::util::notify::NotifyReceiver;
17
18/// A wrapper for an [`AsyncRead`] to support XON/XOFF flow control.
19///
20/// This reader will take care of communicating with the circuit reactor to handle XON/XOFF-related
21/// events.
22#[derive(Debug)]
23#[pin_project]
24pub(crate) struct XonXoffReader<R> {
25 /// How we communicate with the circuit reactor.
26 #[pin]
27 ctrl: XonXoffReaderCtrl,
28 /// The inner reader.
29 #[pin]
30 reader: R,
31 /// Have we received a drain rate request notification from the reactor,
32 /// but haven't yet sent a drain rate update back to the reactor?
33 pending_drain_rate_update: bool,
34}
35
36impl<R> XonXoffReader<R> {
37 /// Create a new [`XonXoffReader`].
38 ///
39 /// The reader must implement [`BufferIsEmpty`], which allows the `XonXoffReader` to check if
40 /// the incoming stream buffer is empty or not.
41 pub(crate) fn new(ctrl: XonXoffReaderCtrl, reader: R) -> Self {
42 Self {
43 ctrl,
44 reader,
45 pending_drain_rate_update: false,
46 }
47 }
48
49 /// Get a reference to the inner [`AsyncRead`].
50 ///
51 /// NOTE: This will bypass the [`XonXoffReader`] and may cause incorrect behaviour depending on
52 /// how you use the returned reader (for example if it uses interior mutability).
53 pub(crate) fn inner(&self) -> &R {
54 &self.reader
55 }
56
57 /// Get a mutable reference to the inner [`AsyncRead`].
58 ///
59 /// NOTE: This will bypass the [`XonXoffReader`] and may cause incorrect behaviour depending on
60 /// how you use the returned reader (for example if you read bytes directly).
61 pub(crate) fn inner_mut(&mut self) -> &mut R {
62 &mut self.reader
63 }
64}
65
66impl<R: AsyncRead + BufferIsEmpty> AsyncRead for XonXoffReader<R> {
67 fn poll_read(
68 self: Pin<&mut Self>,
69 cx: &mut Context<'_>,
70 buf: &mut [u8],
71 ) -> Poll<Result<usize, Error>> {
72 let mut self_ = self.project();
73
74 // ensure that `drain_rate_request_stream` is a `FusedStream`,
75 // which means that we don't need to worry about calling `poll_next()` repeatedly
76 assert_val_impl_trait!(
77 self_.ctrl.drain_rate_request_stream,
78 futures::stream::FusedStream,
79 );
80
81 // check if the circuit reactor has requested a drain rate update
82 if let Poll::Ready(Some(())) = self_
83 .ctrl
84 .as_mut()
85 .project()
86 .drain_rate_request_stream
87 .poll_next(cx)
88 {
89 // a drain rate update was requested, so we need to send a drain rate update once we
90 // have no more bytes buffered
91 *self_.pending_drain_rate_update = true;
92 }
93
94 // try reading from the inner reader
95 let res = self_.reader.as_mut().poll_read(cx, buf);
96
97 // if we need to send a drain rate update and the stream buffer is empty, inform the reactor
98 if *self_.pending_drain_rate_update && self_.reader.is_empty() {
99 // TODO(arti#534): in the future we want to do rate estimation, but for now we'll just
100 // send an "unlimited" drain rate
101 self_
102 .ctrl
103 .stream_target
104 .drain_rate_update(XonKbpsEwma::Unlimited)?;
105 *self_.pending_drain_rate_update = false;
106 }
107
108 res
109 }
110}
111
112/// The control structure for a stream that partakes in XON/XOFF flow control.
113#[derive(Debug)]
114#[pin_project]
115pub(crate) struct XonXoffReaderCtrl {
116 /// Receive notifications when the reactor requests a new drain rate.
117 /// When we do, we should begin waiting for the receive buffer to clear.
118 /// Then when the buffer clears, we should send a new drain rate update to the reactor.
119 #[pin]
120 drain_rate_request_stream: NotifyReceiver<DrainRateRequest>,
121 /// A handle to the reactor for this stream.
122 /// This allows us to send drain rate updates to the circuit reactor.
123 stream_target: StreamTarget,
124}
125
126impl XonXoffReaderCtrl {
127 /// Create a new [`XonXoffReaderCtrl`].
128 pub(crate) fn new(
129 drain_rate_request_stream: NotifyReceiver<DrainRateRequest>,
130 stream_target: StreamTarget,
131 ) -> Self {
132 Self {
133 drain_rate_request_stream,
134 stream_target,
135 }
136 }
137}
138
/// Lets the [`XonXoffReader`] find out when it should send a drain rate update
/// (which typically results in an XON message).
pub(crate) trait BufferIsEmpty {
    /// Report whether this stream currently has no incoming bytes buffered.
    ///
    /// The receiver is a pinned mutable reference so that implementers can
    /// [`unobtrusive_peek()`](tor_async_utils::peekable_stream::UnobtrusivePeekableStream::unobtrusive_peek)
    /// a stream if they need to.
    fn is_empty(self: Pin<&mut Self>) -> bool;
}
149
/// Marker type used with a [`NotifySender`](crate::util::notify::NotifySender)
/// to indicate that its notifications are requests for a new drain rate.
#[derive(Debug)]
pub(crate) struct DrainRateRequest;