tor_proto/circuit/cell_sender.rs
1//! Implements an outbound Sink type for cells being sent from a circuit onto a
2//! [channel](crate::channel).
3
4use std::{
5 pin::{Pin, pin},
6 task::{Context, Poll},
7};
8
9use cfg_if::cfg_if;
10use futures::Sink;
11use pin_project::pin_project;
12use tor_rtcompat::DynTimeProvider;
13use tracing::instrument;
14
15use crate::{
16 HopNum,
17 channel::{ChanCellQueueEntry, ChannelSender},
18 congestion::CongestionSignals,
19 util::{SinkExt, sometimes_unbounded_sink::SometimesUnboundedSink},
20};
21
22cfg_if! {
23 if #[cfg(feature="circ-padding")] {
24 use crate::util::sink_blocker::{BooleanPolicy, SinkBlocker};
25 /// Inner type used to implement a [`CircuitCellSender`].
26 ///
27 /// When `circ-padding` feature is enabled, this is a multi-level wrapper around
28 /// a ChanSender:
29 /// - On the outermost layer, there is a [`SinkBlocker`] that we use
30 /// to make this sink behave as if it were full
31 /// when our [circuit padding](crate::client::circuit::padding) code
32 /// tells us to block outbound traffic.
33 /// - Then there is a [`SometimesUnboundedSink`] that we use to queue control messages
34 /// when the target `ChanSender` is full,
35 /// or when we traffic is blocked.
36 /// - Finally, there is the [`ChannelSender`] itself.
37 ///
38 /// NOTE: We once had a second `SinkBlocker` to keep messages from the
39 /// SometimesUnboundedSink from reaching the ChanSender
40 /// when we were blocked on padding.
41 /// We no longer use this SinkBlocker, since we decided in
42 /// our [padding design] that non-data messages
43 /// would never wait for a padding-based block.
44 /// We can reinstate it if we change our mind.
45 ///
46 /// TODO: Ideally, this type would participate in the memory quota system.
47 ///
48 /// TODO: At some point in the future, we might want to add
49 /// an additional _bounded_ [`futures::sink::Buffer`]
50 /// to queue cells before they are put onto the channel,
51 /// or to queue data from loud streams.
52 ///
53 /// [padding design]: https://gitlab.torproject.org/tpo/core/arti/-/blob/main/doc/dev/notes/circuit-padding.md
54 type InnerSink = SinkBlocker<
55 SometimesUnbounded, BooleanPolicy,
56 >;
57 /// The type of our `SometimesUnboundedSink`, as instantiated.
58 ///
59 /// We use this to queue control cells.
60 type SometimesUnbounded = SometimesUnboundedSink<
61 ChanCellQueueEntry,
62 // This is what we would reinstate
63 // in order to have control messages blocked by padding frameworks:
64 // SinkBlocker<ChannelSender, CountingPolicy>
65 ChannelSender
66 >;
67 } else {
68 /// Inner type used to implement a [`CircuitCellSender`].
69 ///
70 /// When the `circ-padding` is disabled, this only adds a [`SometimesUnboundedSink`].
71 ///
72 /// TODO: Ideally, this type would participate in the memory quota system.
73 /// TODO: At some point, we might want to add
74 /// an additional _bounded_ [`futures::sink::Buffer`]
75 /// to queue cells before they are put onto the channel.)
76 type InnerSink = SometimesUnboundedSink<ChanCellQueueEntry, ChannelSender>;
77 /// The type of our `SometimesUnboundedSink`, as instantiated.
78 ///
79 /// We use this to queue control cells.
80 type SometimesUnbounded = InnerSink;
81 }
82}
83
84/// A sink that a circuit uses to send cells onto a Channel.
85///
86/// (This is a separate type so we can more easily control access to its internals.)
87///
88/// ### You must poll this type
89///
90/// This type is based on [`SometimesUnboundedSink`].
91/// For queued items to be delivered,
92/// [`SometimesUnboundedSink`] must be polled,
93/// even if you don't have an item to send.
94/// The same rule applies here.
95///
96/// Currently [`Sink::poll_flush`], [`Sink::poll_close`], and [`Sink::poll_ready`]
97/// will all work for this purpose.
98#[pin_project]
99pub(crate) struct CircuitCellSender {
100 /// The actual inner sink on which we'll be sending cells.
101 ///
102 /// See type alias documentation for full details.
103 #[pin]
104 sink: InnerSink,
105}
106
107impl CircuitCellSender {
108 /// Construct a new `CircuitCellSender` to deliver cells onto `inner`.
109 pub(crate) fn from_channel_sender(inner: ChannelSender) -> Self {
110 cfg_if! {
111 if #[cfg(feature="circ-padding")] {
112 let sink = SinkBlocker::new(
113 SometimesUnboundedSink::new(
114 inner
115 ),
116 BooleanPolicy::Unblocked
117 );
118 } else {
119 let sink = SometimesUnboundedSink::new(inner);
120 }
121 }
122
123 Self { sink }
124 }
125
126 /// Return the number of cells queued in this Sender
127 /// that have not yet been flushed onto the channel.
128 pub(crate) fn n_queued(&self) -> usize {
129 self.sometimes_unbounded().n_queued()
130 }
131
132 /// Return true if we have a queued cell for the specified hop or later.
133 #[cfg(feature = "circ-padding")]
134 pub(crate) fn have_queued_cell_for_hop_or_later(&self, hop: HopNum) -> bool {
135 if hop.is_first_hop() && self.chan_sender().approx_count() > 0 {
136 // There's a cell on the outbound channel queue:
137 // That will function perfectly well as padding to the first hop of this circuit,
138 // whether it is actually for this circuit or not.
139 return true;
140 }
141
142 // Now look at our own sometimes_unbounded queue.
143 //
144 // TODO circpad: in theory we could also look at the members of the per-channel queue to find this out!
145 // But that's nontrivial, since the per-channel queue is implemented with an futures mpsc
146 // channel, which doesn't have any functionality to let you inspect its queue.
147 self.sometimes_unbounded()
148 .iter_queue()
149 .any(|(_, info)| info.is_some_and(|inf| inf.target_hop >= hop))
150 }
151
152 /// Send a cell on this sender,
153 /// even if the underlying channel queues are all full.
154 ///
155 /// You must `.await` this, but it will never block.
156 /// (Its future is always `Ready`.)
157 ///
158 /// See note on [`CircuitCellSender`] type about polling:
159 /// If you don't poll this sink, then queued items might never flush.
160 #[instrument(level = "trace", skip_all)]
161 pub(crate) async fn send_unbounded(&mut self, entry: ChanCellQueueEntry) -> crate::Result<()> {
162 Pin::new(self.sometimes_unbounded_mut())
163 .send_unbounded(entry)
164 .await?;
165 self.chan_sender().note_cell_queued();
166 Ok(())
167 }
168
169 /// Return the time provider used by the underlying channel sender
170 /// for memory quota purposes.
171 pub(crate) fn time_provider(&self) -> &DynTimeProvider {
172 self.chan_sender().time_provider()
173 }
174
175 /// Circpadding only: Put this sink into a blocked state.
176 ///
177 /// When we are blocked, attempts to `send()` to this sink will fail.
178 /// You can still queue items with `send_unbounded()`,
179 /// and they will be sent immediately.
180 //
181 // (Previously we would block those items too,
182 // and only allow them to be flushed one by one,
183 // but we changed that behavior so that non-DATA cells can _always_ be sent.)
184 #[cfg(feature = "circ-padding")]
185 pub(crate) fn start_blocking(&mut self) {
186 self.pre_queue_blocker_mut().set_blocked();
187 }
188
189 /// Circpadding only: Put this sink into an unblocked state.
190 #[cfg(feature = "circ-padding")]
191 pub(crate) fn stop_blocking(&mut self) {
192 self.pre_queue_blocker_mut().set_unblocked();
193 }
194
195 /// Note: This is only async because we need a Context to check the underlying sink for readiness.
196 /// This will register a new waker (or overwrite any existing waker).
197 #[instrument(level = "trace", skip_all)]
198 pub(crate) async fn congestion_signals(&mut self) -> CongestionSignals {
199 futures::future::poll_fn(|cx| -> Poll<CongestionSignals> {
200 // We're looking at the ChanSender's in order to deliberately ignore the blocked/unblocked
201 // status of this sink.
202 //
203 // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3225#note_3252061
204 // for a deeper discussion.
205 let channel_ready = self
206 .chan_sender_mut()
207 .poll_ready_unpin_bool(cx)
208 .unwrap_or(false);
209 Poll::Ready(CongestionSignals::new(
210 /* channel_blocked= */ !channel_ready,
211 self.n_queued(),
212 ))
213 })
214 .await
215 }
216
217 /// Helper: return a reference to the internal [`SometimesUnboundedSink`]
218 /// that this `CircuitCellSender` is based on.
219 fn sometimes_unbounded(&self) -> &SometimesUnbounded {
220 cfg_if! {
221 if #[cfg(feature="circ-padding")] {
222 self.sink.as_inner()
223 } else {
224 &self.sink
225 }
226 }
227 }
228
229 /// Helper: return a mutable reference to the internal [`SometimesUnboundedSink`]
230 /// that this `CircuitCellSender` is based on.
231 fn sometimes_unbounded_mut(&mut self) -> &mut SometimesUnbounded {
232 cfg_if! {
233 if #[cfg(feature="circ-padding")] {
234 self.sink.as_inner_mut()
235 } else {
236 &mut self.sink
237 }
238 }
239 }
240
241 /// Helper: Return a reference to the internal [`ChannelSender`]
242 /// that this `CircuitCellSender` is based on.
243 fn chan_sender(&self) -> &ChannelSender {
244 cfg_if! {
245 if #[cfg(feature="circ-padding")] {
246 self.sink.as_inner().as_inner()
247 } else {
248 self.sink.as_inner()
249 }
250 }
251 }
252
253 /// Helper: Return a mutable reference to the internal [`ChannelSender`]
254 /// that this `CircuitCellSender` is based on.
255 fn chan_sender_mut(&mut self) -> &mut ChannelSender {
256 cfg_if! {
257 if #[cfg(feature="circ-padding")] {
258 self.sink.as_inner_mut().as_inner_mut()
259 } else {
260 self.sink.as_inner_mut()
261 }
262 }
263 }
264
265 /// Helper: Return a mutable reference to our outer [`SinkBlocker`]
266 #[cfg(feature = "circ-padding")]
267 fn pre_queue_blocker_mut(&mut self) -> &mut InnerSink {
268 &mut self.sink
269 }
270}
271
272impl Sink<ChanCellQueueEntry> for CircuitCellSender {
273 type Error = <ChannelSender as Sink<ChanCellQueueEntry>>::Error;
274
275 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
276 cfg_if! {
277 if #[cfg(feature = "circ-padding")] {
278 // In this case, our sink is _not_ the same as our SometimesUnboundedSink.
279 // But we need to ensure that SometimesUnboundedMut gets polled
280 // unconditionally, so that it can actually flush its members.
281 //
282 // We don't actually _care_ if it's ready;
283 // we just need to make sure that it gets polled.
284 // See the "You must poll this type" comment on SometimesUnboundedSink.
285 let _ignore = pin!(self.sometimes_unbounded_mut()).poll_ready(cx);
286 }
287 }
288 self.project().sink.poll_ready(cx)
289 }
290
291 fn start_send(mut self: Pin<&mut Self>, item: ChanCellQueueEntry) -> Result<(), Self::Error> {
292 self.as_mut().project().sink.start_send(item)?;
293 self.chan_sender().note_cell_queued();
294 Ok(())
295 }
296
297 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
298 self.project().sink.poll_flush(cx)
299 }
300
301 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
302 self.project().sink.poll_close(cx)
303 }
304}