Skip to main content

tor_proto/circuit/
cell_sender.rs

1//! Implements an outbound Sink type for cells being sent from a circuit onto a
2//! [channel](crate::channel).
3
4use std::{
5    pin::{Pin, pin},
6    task::{Context, Poll},
7};
8
9use cfg_if::cfg_if;
10use futures::Sink;
11use pin_project::pin_project;
12use tor_rtcompat::DynTimeProvider;
13use tracing::instrument;
14
15use crate::{
16    HopNum,
17    channel::{ChanCellQueueEntry, ChannelSender},
18    congestion::CongestionSignals,
19    util::{SinkExt, sometimes_unbounded_sink::SometimesUnboundedSink},
20};
21
cfg_if! {
    if #[cfg(feature="circ-padding")] {
        use crate::util::sink_blocker::{BooleanPolicy, SinkBlocker};
        /// Inner type used to implement a [`CircuitCellSender`].
        ///
        /// When the `circ-padding` feature is enabled, this is a multi-level wrapper around
        /// a ChanSender:
        /// - On the outermost layer, there is a [`SinkBlocker`] that we use
        ///   to make this sink behave as if it were full
        ///   when our [circuit padding](crate::client::circuit::padding) code
        ///   tells us to block outbound traffic.
        /// - Then there is a [`SometimesUnboundedSink`] that we use to queue control messages
        ///   when the target `ChanSender` is full,
        ///   or when traffic is blocked.
        /// - Finally, there is the [`ChannelSender`] itself.
        ///
        /// NOTE: We once had a second `SinkBlocker` to keep messages from the
        /// SometimesUnboundedSink from reaching the ChanSender
        /// when we were blocked on padding.
        /// We no longer use this SinkBlocker, since we decided in
        /// our [padding design] that non-data messages
        /// would never wait for a padding-based block.
        /// We can reinstate it if we change our mind.
        ///
        /// TODO: Ideally, this type would participate in the memory quota system.
        ///
        /// TODO: At some point in the future, we might want to add
        /// an additional _bounded_ [`futures::sink::Buffer`]
        /// to queue cells before they are put onto the channel,
        /// or to queue data from loud streams.
        ///
        /// [padding design]: https://gitlab.torproject.org/tpo/core/arti/-/blob/main/doc/dev/notes/circuit-padding.md
        type InnerSink = SinkBlocker<
            SometimesUnbounded, BooleanPolicy,
        >;
        /// The type of our `SometimesUnboundedSink`, as instantiated.
        ///
        /// We use this to queue control cells.
        type SometimesUnbounded = SometimesUnboundedSink<
            ChanCellQueueEntry,
            // This is what we would reinstate
            // in order to have control messages blocked by padding frameworks:
            //      SinkBlocker<ChannelSender, CountingPolicy>
            ChannelSender
        >;
    } else {
        /// Inner type used to implement a [`CircuitCellSender`].
        ///
        /// When the `circ-padding` feature is disabled,
        /// this only adds a [`SometimesUnboundedSink`] around the [`ChannelSender`].
        ///
        /// TODO: Ideally, this type would participate in the memory quota system.
        ///
        /// TODO: At some point, we might want to add
        /// an additional _bounded_ [`futures::sink::Buffer`]
        /// to queue cells before they are put onto the channel.
        type InnerSink = SometimesUnboundedSink<ChanCellQueueEntry, ChannelSender>;
        /// The type of our `SometimesUnboundedSink`, as instantiated.
        ///
        /// We use this to queue control cells.
        /// Without `circ-padding`, this is the same type as [`InnerSink`].
        type SometimesUnbounded = InnerSink;
    }
}
83
/// A sink that a circuit uses to send cells onto a Channel.
///
/// (This is a separate type so we can more easily control access to its internals.)
///
/// ### You must poll this type
///
/// This type is based on [`SometimesUnboundedSink`].
/// For queued items to be delivered,
/// [`SometimesUnboundedSink`] must be polled,
/// even if you don't have an item to send.
/// The same rule applies here:
/// if nothing polls this sink, queued cells might never be flushed.
///
/// Currently [`Sink::poll_flush`], [`Sink::poll_close`], and [`Sink::poll_ready`]
/// will all work for this purpose.
#[pin_project]
pub(crate) struct CircuitCellSender {
    /// The actual inner sink on which we'll be sending cells.
    ///
    /// Its concrete type depends on whether the `circ-padding` feature is enabled;
    /// see the `InnerSink` type alias documentation for full details.
    #[pin]
    sink: InnerSink,
}
106
107impl CircuitCellSender {
108    /// Construct a new `CircuitCellSender` to deliver cells onto `inner`.
109    pub(crate) fn from_channel_sender(inner: ChannelSender) -> Self {
110        cfg_if! {
111            if #[cfg(feature="circ-padding")] {
112                let sink = SinkBlocker::new(
113                    SometimesUnboundedSink::new(
114                        inner
115                    ),
116                    BooleanPolicy::Unblocked
117                );
118            } else {
119                let sink = SometimesUnboundedSink::new(inner);
120            }
121        }
122
123        Self { sink }
124    }
125
126    /// Return the number of cells queued in this Sender
127    /// that have not yet been flushed onto the channel.
128    pub(crate) fn n_queued(&self) -> usize {
129        self.sometimes_unbounded().n_queued()
130    }
131
132    /// Return true if we have a queued cell for the specified hop or later.
133    #[cfg(feature = "circ-padding")]
134    pub(crate) fn have_queued_cell_for_hop_or_later(&self, hop: HopNum) -> bool {
135        if hop.is_first_hop() && self.chan_sender().approx_count() > 0 {
136            // There's a cell on the outbound channel queue:
137            // That will function perfectly well as padding to the first hop of this circuit,
138            // whether it is actually for this circuit or not.
139            return true;
140        }
141
142        // Now look at our own sometimes_unbounded queue.
143        //
144        // TODO circpad: in theory we could also look at the members of the per-channel queue to find this out!
145        // But that's nontrivial, since the per-channel queue is implemented with an futures mpsc
146        // channel, which doesn't have any functionality to let you inspect its queue.
147        self.sometimes_unbounded()
148            .iter_queue()
149            .any(|(_, info)| info.is_some_and(|inf| inf.target_hop >= hop))
150    }
151
152    /// Send a cell on this sender,
153    /// even if the  underlying channel queues are all full.
154    ///
155    /// You must `.await` this, but it will never block.
156    /// (Its future is always `Ready`.)
157    ///
158    /// See note on [`CircuitCellSender`] type about polling:
159    /// If you don't poll this sink, then queued items might never flush.
160    #[instrument(level = "trace", skip_all)]
161    pub(crate) async fn send_unbounded(&mut self, entry: ChanCellQueueEntry) -> crate::Result<()> {
162        Pin::new(self.sometimes_unbounded_mut())
163            .send_unbounded(entry)
164            .await?;
165        self.chan_sender().note_cell_queued();
166        Ok(())
167    }
168
169    /// Return the time provider used by the underlying channel sender
170    /// for memory quota purposes.
171    pub(crate) fn time_provider(&self) -> &DynTimeProvider {
172        self.chan_sender().time_provider()
173    }
174
175    /// Circpadding only: Put this sink into a blocked state.
176    ///
177    /// When we are blocked, attempts to `send()` to this sink will fail.
178    /// You can still queue items with `send_unbounded()`,
179    /// and they will be sent immediately.
180    //
181    // (Previously we would block those items too,
182    // and only allow them to be flushed one by one,
183    // but we changed that behavior so that non-DATA cells can _always_ be sent.)
184    #[cfg(feature = "circ-padding")]
185    pub(crate) fn start_blocking(&mut self) {
186        self.pre_queue_blocker_mut().set_blocked();
187    }
188
189    /// Circpadding only: Put this sink into an unblocked state.
190    #[cfg(feature = "circ-padding")]
191    pub(crate) fn stop_blocking(&mut self) {
192        self.pre_queue_blocker_mut().set_unblocked();
193    }
194
195    /// Note: This is only async because we need a Context to check the underlying sink for readiness.
196    /// This will register a new waker (or overwrite any existing waker).
197    #[instrument(level = "trace", skip_all)]
198    pub(crate) async fn congestion_signals(&mut self) -> CongestionSignals {
199        futures::future::poll_fn(|cx| -> Poll<CongestionSignals> {
200            // We're looking at the ChanSender's in order to deliberately ignore the blocked/unblocked
201            // status of this sink.
202            //
203            // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3225#note_3252061
204            // for a deeper discussion.
205            let channel_ready = self
206                .chan_sender_mut()
207                .poll_ready_unpin_bool(cx)
208                .unwrap_or(false);
209            Poll::Ready(CongestionSignals::new(
210                /* channel_blocked= */ !channel_ready,
211                self.n_queued(),
212            ))
213        })
214        .await
215    }
216
217    /// Helper: return a reference to the internal [`SometimesUnboundedSink`]
218    /// that this `CircuitCellSender` is based on.
219    fn sometimes_unbounded(&self) -> &SometimesUnbounded {
220        cfg_if! {
221            if #[cfg(feature="circ-padding")] {
222                self.sink.as_inner()
223            } else {
224                &self.sink
225            }
226        }
227    }
228
229    /// Helper: return a mutable reference to the internal [`SometimesUnboundedSink`]
230    /// that this `CircuitCellSender` is based on.
231    fn sometimes_unbounded_mut(&mut self) -> &mut SometimesUnbounded {
232        cfg_if! {
233            if #[cfg(feature="circ-padding")] {
234                self.sink.as_inner_mut()
235            } else {
236                &mut self.sink
237            }
238        }
239    }
240
241    /// Helper: Return a reference to the internal [`ChannelSender`]
242    /// that this `CircuitCellSender` is based on.
243    fn chan_sender(&self) -> &ChannelSender {
244        cfg_if! {
245            if #[cfg(feature="circ-padding")] {
246                self.sink.as_inner().as_inner()
247            } else {
248                self.sink.as_inner()
249            }
250        }
251    }
252
253    /// Helper: Return a mutable reference to the internal [`ChannelSender`]
254    /// that this `CircuitCellSender` is based on.
255    fn chan_sender_mut(&mut self) -> &mut ChannelSender {
256        cfg_if! {
257            if #[cfg(feature="circ-padding")] {
258                self.sink.as_inner_mut().as_inner_mut()
259            } else {
260                self.sink.as_inner_mut()
261            }
262        }
263    }
264
265    /// Helper: Return a mutable reference to our outer [`SinkBlocker`]
266    #[cfg(feature = "circ-padding")]
267    fn pre_queue_blocker_mut(&mut self) -> &mut InnerSink {
268        &mut self.sink
269    }
270}
271
272impl Sink<ChanCellQueueEntry> for CircuitCellSender {
273    type Error = <ChannelSender as Sink<ChanCellQueueEntry>>::Error;
274
275    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
276        cfg_if! {
277            if #[cfg(feature = "circ-padding")] {
278                // In this case, our sink is _not_ the same as our SometimesUnboundedSink.
279                // But we need to ensure that SometimesUnboundedMut gets polled
280                // unconditionally, so that it can actually flush its members.
281                //
282                // We don't actually _care_ if it's ready;
283                // we just need to make sure that it gets polled.
284                // See the "You must poll this type" comment on SometimesUnboundedSink.
285                let _ignore = pin!(self.sometimes_unbounded_mut()).poll_ready(cx);
286            }
287        }
288        self.project().sink.poll_ready(cx)
289    }
290
291    fn start_send(mut self: Pin<&mut Self>, item: ChanCellQueueEntry) -> Result<(), Self::Error> {
292        self.as_mut().project().sink.start_send(item)?;
293        self.chan_sender().note_cell_queued();
294        Ok(())
295    }
296
297    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
298        self.project().sink.poll_flush(cx)
299    }
300
301    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
302        self.project().sink.poll_close(cx)
303    }
304}