tor_proto/stream/raw.rs
1//! Declare the lowest level of stream: a stream that operates on raw
2//! cells.
3
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use futures::Stream;
8use pin_project::pin_project;
9use tor_async_utils::peekable_stream::UnobtrusivePeekableStream;
10use tor_cell::relaycell::{RelayCmd, UnparsedRelayMsg};
11use tracing::debug;
12
13use crate::congestion::sendme;
14use crate::stream::StreamTarget;
15use crate::stream::queue::StreamQueueReceiver;
16use crate::{Error, Result};
17
18/// The read part of a stream on a particular circuit.
19///
20/// This [`Stream`](Stream) will return incoming messages for this Tor stream, excluding flow control
21/// related messages like SENDME, XON, and XOFF.
22///
23/// To avoid ambiguity, the following uses "stream" to refer to the `futures::Stream`, not the Tor
24/// stream.
25///
26/// If the stream ends unexpectedly (before an END message), the stream will return an error.
27/// After the stream returns an END message or an error, this stream will be "terminated" and future
28/// [`poll_next`](Stream::poll_next) calls will return `None`.
29// I think it would be better to *not* return an error if the stream ends before an END message is
30// received, and just return `None`. The caller will know if it received an END message or not, so
31// returning an error isn't very useful and is maybe unexpected.
32#[derive(Debug)]
33#[pin_project]
34pub struct StreamReceiver {
35 /// The underlying `StreamTarget` for this stream.
36 ///
37 /// A reader has this target in order to:
38 /// * Make the reactor send SENDME messages.
39 /// * Tell the reactor when there is a protocol error.
40 /// * Keep the stream alive at least until the StreamReceiver
41 /// is dropped.
42 pub(crate) target: StreamTarget,
43 /// Channel to receive stream messages from the reactor.
44 #[pin]
45 pub(crate) receiver: StreamQueueReceiver,
46 /// Congestion control receive window for this stream.
47 ///
48 /// Having this here means we're only going to update it when the end consumer of this stream
49 /// actually reads things, meaning we don't ask for more data until it's actually needed (as
50 /// opposed to having the reactor assume we're always reading, and potentially overwhelm itself
51 /// with having to buffer data).
52 pub(crate) recv_window: sendme::StreamRecvWindow,
53 /// Whether or not this stream has ended.
54 pub(crate) ended: bool,
55}
56
57impl StreamReceiver {
58 /// Try to read the next relay message from this stream.
59 fn poll_next_inner(
60 mut self: Pin<&mut Self>,
61 cx: &mut Context<'_>,
62 ) -> Result<Poll<UnparsedRelayMsg>> {
63 let msg = match self.as_mut().project().receiver.poll_next(cx) {
64 Poll::Ready(Some(msg)) => msg,
65 Poll::Ready(None) => {
66 // The channel is indicating that it has terminated, likely from a dropped sender.
67 // But if we're here, it means we never received an END cell.
68 //
69 // We don't know here if the sender was dropped because either:
70 // - The stream map dropped its open entry (see `StreamMap::terminate()`).
71 // For example if `DataStream::close()` was called.
72 // - The circuit closed and the entire stream map was dropped.
73 //
74 // We only know conclusively that the stream is closed.
75 return Err(Error::NotConnected);
76 }
77 Poll::Pending => return Ok(Poll::Pending),
78 };
79
80 if sendme::cell_counts_towards_windows(&msg) && self.recv_window.take()? {
81 if let Err(e) = self.target.send_sendme() {
82 if matches!(e, Error::CircuitClosed) {
83 // If the tunnel has closed, sending a message to the tunnel reactor may fail.
84 // But this is okay. We still want the user to be able to continue reading the
85 // remaining queued data for this stream, and if the tunnel has closed it
86 // wouldn't make sense to send a SENDME message anyways.
87 debug!("Failed to send stream-level SENDME. Ignoring: {e}");
88 } else {
89 // This error is unexpected. Let's return it to the user.
90 return Err(e);
91 }
92 }
93 self.recv_window.put();
94 }
95
96 Ok(Poll::Ready(msg))
97 }
98
99 /// Shut down this stream.
100 pub fn protocol_error(&mut self) {
101 self.target.protocol_error();
102 }
103
104 /// Is the stream currently empty?
105 ///
106 /// This method is inherently subject to race conditions. More data may arrive even before this
107 /// method returns, so a result of `true` might have been outdated before the method even
108 /// returned.
109 ///
110 /// This takes a `&mut` so that we can peek the stream.
111 ///
112 /// We provide an `is_empty` method rather than implementing [`UnobtrusivePeekableStream`]
113 /// directly since `UnobtrusivePeekableStream` allows you to mutate the peeked item, which could
114 /// break any accounting we do here in `StreamReceiver` (like stream sendme accounting). Also
115 /// the stream types are incompatible (the inner receiver returns items of `UnparsedRelayMsg`,
116 /// but this [`StreamReceiver`] returns items of `Result<UnparsedRelayMsg>`).
117 pub(crate) fn is_empty(&mut self) -> bool {
118 // The `StreamQueueReceiver` gives us two ways of checking if the queue is empty:
119 // `unobtrusive_peek().is_none()` and `approx_stream_bytes() == 0`. The peek seems like a
120 // better approach, so we do that here.
121 // TODO(arti#534): Should reconsider using `unobtrusive_peek()`. What we really want to know
122 // is if there is more stream data in the queue. But peeking only tells us if there are more
123 // messages. There could be more messages, but none of them data messages.
124 let peek_is_none = Pin::new(&mut self.receiver).unobtrusive_peek().is_none();
125
126 // if the peek says that the stream is empty, assert that `approx_stream_bytes()` shows 0
127 // bytes
128 #[cfg(debug_assertions)]
129 if peek_is_none {
130 assert_eq!(self.receiver.approx_stream_bytes(), 0);
131 } else {
132 // if the peek is not empty it doesn't mean that approx_stream_bytes() != 0,
133 // since there may be messages that contain no stream data
134 }
135
136 peek_is_none
137 }
138}
139
140impl Stream for StreamReceiver {
141 type Item = Result<UnparsedRelayMsg>;
142
143 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
144 if self.ended {
145 // Prevent reading more messages from streams after they've ended. `None` indicates that
146 // the stream is complete/terminated.
147 return Poll::Ready(None);
148 }
149
150 match self.as_mut().poll_next_inner(cx) {
151 Ok(Poll::Pending) => Poll::Pending,
152 Ok(Poll::Ready(msg)) => {
153 if msg.cmd() == RelayCmd::END {
154 // We return the END cell, and future polls will return `None`.
155 self.ended = true;
156 }
157 Poll::Ready(Some(Ok(msg)))
158 }
159 Err(e) => {
160 // We return the error, and future polls will return `None`.
161 self.ended = true;
162 Poll::Ready(Some(Err(e)))
163 }
164 }
165 }
166}