Skip to main content

tor_proto/client/reactor/circuit/
circhop.rs

1//! Module exposing structures relating to the reactor's view of a circuit's hops.
2
3use super::{CircuitCmd, CloseStreamBehavior};
4use crate::circuit::circhop::{CircHopInbound, CircHopOutbound, HopSettings, SendRelayCell};
5use crate::client::reactor::circuit::path::PathEntry;
6use crate::congestion::CongestionControl;
7use crate::crypto::cell::HopNum;
8use crate::stream::StreamMpscReceiver;
9use crate::stream::cmdcheck::AnyCmdChecker;
10use crate::stream::flow_ctrl::state::StreamRateLimit;
11use crate::stream::flow_ctrl::xon_xoff::reader::DrainRateRequest;
12use crate::stream::queue::StreamQueueSender;
13use crate::streammap::{self, StreamEntMut, StreamMap};
14use crate::tunnel::TunnelScopedCircId;
15use crate::util::notify::NotifySender;
16use crate::util::tunnel_activity::TunnelActivity;
17use crate::{Error, Result};
18
19use futures::Stream;
20use futures::stream::FuturesUnordered;
21use postage::watch;
22use smallvec::SmallVec;
23use tor_cell::chancell::BoxedCellBody;
24use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
25use tor_cell::relaycell::msg::AnyRelayMsg;
26use tor_cell::relaycell::{
27    AnyRelayMsgOuter, RelayCellDecoder, RelayCellDecoderResult, RelayCellFormat, StreamId,
28    UnparsedRelayMsg,
29};
30use web_time_compat::Instant;
31
32use safelog::sensitive as sv;
33use tor_error::Bug;
34use tracing::instrument;
35
36use std::result::Result as StdResult;
37use std::sync::{Arc, Mutex, MutexGuard};
38use std::task::Poll;
39
40#[cfg(test)]
41use tor_cell::relaycell::msg::SendmeTag;
42
43/// The "usual" number of hops in a [`CircHopList`].
44///
45/// This saves us a heap allocation when the number of hops is less than or equal to this value.
46const NUM_HOPS: usize = 3;
47
48/// Represents the reactor's view of a circuit's hop.
49#[derive(Default)]
50pub(crate) struct CircHopList {
51    /// The list of hops.
52    hops: SmallVec<[CircHop; NUM_HOPS]>,
53}
54
55impl CircHopList {
56    /// Return a reference to the hop corresponding to `hopnum`, if there is one.
57    pub(super) fn hop(&self, hopnum: HopNum) -> Option<&CircHop> {
58        self.hops.get(Into::<usize>::into(hopnum))
59    }
60
61    /// Return a mutable reference to the hop corresponding to `hopnum`, if there is one.
62    pub(super) fn get_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop> {
63        self.hops.get_mut(Into::<usize>::into(hopnum))
64    }
65
66    /// Append the specified hop.
67    pub(crate) fn push(&mut self, hop: CircHop) {
68        self.hops.push(hop);
69    }
70
71    /// Returns `true` if the list contains no [`CircHop`]s.
72    pub(crate) fn is_empty(&self) -> bool {
73        self.hops.is_empty()
74    }
75
76    /// Returns the number of hops in the list.
77    pub(crate) fn len(&self) -> usize {
78        self.hops.len()
79    }
80
81    /// Returns a [`Stream`] of [`CircuitCmd`] to poll from the main loop.
82    ///
83    /// The iterator contains at most one [`CircuitCmd`] for each hop,
84    /// representing the instructions for handling the ready-item, if any,
85    /// of its highest priority stream.
86    ///
87    /// IMPORTANT: this stream locks the stream map mutexes of each `CircHop`!
88    /// To avoid contention, never create more than one
89    /// [`ready_streams_iterator`](Self::ready_streams_iterator)
90    /// stream at a time!
91    ///
92    /// This is cancellation-safe.
93    pub(in crate::client::reactor) fn ready_streams_iterator(
94        &self,
95        exclude: Option<HopNum>,
96    ) -> impl Stream<Item = CircuitCmd> + use<> {
97        self.hops
98            .iter()
99            .enumerate()
100            .filter_map(|(i, hop)| {
101                let hop_num = HopNum::from(i as u8);
102
103                if exclude == Some(hop_num) {
104                    // We must skip polling this hop
105                    return None;
106                }
107
108                if !hop.ccontrol().can_send() {
109                    // We can't send anything on this hop that counts towards SENDME windows.
110                    //
111                    // In theory we could send messages that don't count towards
112                    // windows (like `RESOLVE`), and process end-of-stream
113                    // events (to send an `END`), but it's probably not worth
114                    // doing an O(N) iteration over flow-control-ready streams
115                    // to see if that's the case.
116                    //
117                    // This *doesn't* block outgoing flow-control messages (e.g.
118                    // SENDME), which are initiated via the control-message
119                    // channel, handled above.
120                    //
121                    // TODO: Consider revisiting. OTOH some extra throttling when circuit-level
122                    // congestion control has "bottomed out" might not be so bad, and the
123                    // alternatives have complexity and/or performance costs.
124                    return None;
125                }
126
127                let hop_map = Arc::clone(self.hops[i].stream_map());
128                Some(futures::future::poll_fn(move |cx| {
129                    // Process an outbound message from the first ready stream on
130                    // this hop. The stream map implements round robin scheduling to
131                    // ensure fairness across streams.
132                    // TODO: Consider looping here to process multiple ready
133                    // streams. Need to be careful though to balance that with
134                    // continuing to service incoming and control messages.
135                    let mut hop_map = hop_map.lock().expect("lock poisoned");
136                    let Some((sid, msg)) = hop_map.poll_ready_streams_iter(cx).next() else {
137                        // No ready streams for this hop.
138                        return Poll::Pending;
139                    };
140
141                    if msg.is_none() {
142                        return Poll::Ready(CircuitCmd::CloseStream {
143                            hop: hop_num,
144                            sid,
145                            behav: CloseStreamBehavior::default(),
146                            reason: streammap::TerminateReason::StreamTargetClosed,
147                        });
148                    };
149                    let msg = hop_map.take_ready_msg(sid).expect("msg disappeared");
150
151                    #[allow(unused)] // unused in non-debug builds
152                    let Some(StreamEntMut::Open(s)) = hop_map.get_mut(sid) else {
153                        panic!("Stream {sid} disappeared");
154                    };
155
156                    debug_assert!(
157                        s.can_send(&msg),
158                        "Stream {sid} produced a message it can't send: {msg:?}"
159                    );
160
161                    let cell = SendRelayCell {
162                        hop: Some(hop_num),
163                        early: false,
164                        cell: AnyRelayMsgOuter::new(Some(sid), msg),
165                    };
166                    Poll::Ready(CircuitCmd::Send(cell))
167                }))
168            })
169            .collect::<FuturesUnordered<_>>()
170    }
171
172    /// Remove all halfstreams that are expired at `now`.
173    pub(super) fn remove_expired_halfstreams(&mut self, now: Instant) {
174        for hop in self.hops.iter_mut() {
175            hop.stream_map()
176                .lock()
177                .expect("lock poisoned")
178                .remove_expired_halfstreams(now);
179        }
180    }
181
182    /// Returns true if there are any streams on this circuit
183    ///
184    /// Important: this function locks the stream map of its each of the [`CircHop`]s
185    /// in this circuit, so it must **not** be called from any function where the
186    /// stream map lock is held (such as [`ready_streams_iterator`](Self::ready_streams_iterator).
187    pub(super) fn has_streams(&self) -> bool {
188        self.hops.iter().any(|hop| {
189            hop.stream_map()
190                .lock()
191                .expect("lock poisoned")
192                .n_open_streams()
193                > 0
194        })
195    }
196
197    /// Return the most active [`TunnelActivity`] for any hop on this `CircHopList`.
198    pub(crate) fn tunnel_activity(&self) -> TunnelActivity {
199        self.hops
200            .iter()
201            .map(|hop| {
202                hop.stream_map()
203                    .lock()
204                    .expect("Poisoned lock")
205                    .tunnel_activity()
206            })
207            .max()
208            .unwrap_or_else(TunnelActivity::never_used)
209    }
210}
211
212/// Represents the reactor's view of a single hop.
213pub(crate) struct CircHop {
214    /// The unique ID of the circuit. Used for logging.
215    unique_id: TunnelScopedCircId,
216    /// Hop number in the path.
217    hop_num: HopNum,
218    /// The inbound state of the hop.
219    ///
220    /// Used for processing cells received from this hop.
221    inbound: CircHopInbound,
222    /// The outbound state of the hop.
223    ///
224    /// Used for preparing cells to send to this hop.
225    outbound: CircHopOutbound,
226}
227
228impl CircHop {
229    /// Create a new hop.
230    pub(crate) fn new(
231        unique_id: TunnelScopedCircId,
232        hop_num: HopNum,
233        settings: &HopSettings,
234    ) -> Self {
235        let relay_format = settings.relay_crypt_protocol().relay_cell_format();
236
237        let ccontrol = Arc::new(Mutex::new(CongestionControl::new(&settings.ccontrol)));
238        let inbound = CircHopInbound::new(RelayCellDecoder::new(relay_format), settings);
239
240        let outbound = CircHopOutbound::new(
241            ccontrol,
242            relay_format,
243            Arc::new(settings.flow_ctrl_params.clone()),
244            settings,
245        );
246
247        CircHop {
248            unique_id,
249            hop_num,
250            inbound,
251            outbound,
252        }
253    }
254
255    /// Start a stream. Creates an entry in the stream map with the given channels, and sends the
256    /// `message` to the provided hop.
257    pub(crate) fn begin_stream(
258        &mut self,
259        message: AnyRelayMsg,
260        sender: StreamQueueSender,
261        rx: StreamMpscReceiver<AnyRelayMsg>,
262        rate_limit_updater: watch::Sender<StreamRateLimit>,
263        drain_rate_requester: NotifySender<DrainRateRequest>,
264        cmd_checker: AnyCmdChecker,
265    ) -> Result<(SendRelayCell, StreamId)> {
266        self.outbound.begin_stream(
267            Some(self.hop_num),
268            message,
269            sender,
270            rx,
271            rate_limit_updater,
272            drain_rate_requester,
273            cmd_checker,
274        )
275    }
276
277    /// Close the stream associated with `id` because the stream was
278    /// dropped.
279    ///
280    /// See [`CircHopOutbound::close_stream`].
281    pub(crate) fn close_stream(
282        &mut self,
283        id: StreamId,
284        message: CloseStreamBehavior,
285        why: streammap::TerminateReason,
286        expiry: Instant,
287    ) -> Result<Option<SendRelayCell>> {
288        self.outbound
289            .close_stream(self.unique_id, id, Some(self.hop_num), message, why, expiry)
290    }
291
292    /// Check if we should send an XON message.
293    ///
294    /// If we should, then returns the XON message that should be sent.
295    #[instrument(level = "trace", skip_all)]
296    pub(crate) fn maybe_send_xon(
297        &mut self,
298        rate: XonKbpsEwma,
299        id: StreamId,
300    ) -> Result<Option<Xon>> {
301        self.outbound.maybe_send_xon(rate, id)
302    }
303
304    /// Check if we should send an XOFF message.
305    ///
306    /// If we should, then returns the XOFF message that should be sent.
307    pub(crate) fn maybe_send_xoff(&mut self, id: StreamId) -> Result<Option<Xoff>> {
308        self.outbound.maybe_send_xoff(id)
309    }
310
311    /// Return the format that is used for relay cells sent to this hop.
312    ///
313    /// For the most part, this format isn't necessary to interact with a CircHop;
314    /// it becomes relevant when we are deciding _what_ we can encode for the hop.
315    pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
316        self.outbound.relay_cell_format()
317    }
318
319    /// Delegate to CongestionControl, for testing purposes
320    #[cfg(test)]
321    pub(crate) fn send_window_and_expected_tags(&self) -> (u32, Vec<SendmeTag>) {
322        self.outbound.send_window_and_expected_tags()
323    }
324
325    /// Return a mutable reference to our CongestionControl object.
326    pub(crate) fn ccontrol(&self) -> MutexGuard<'_, CongestionControl> {
327        self.outbound.ccontrol().lock().expect("poisoned lock")
328    }
329
330    /// Return a reference to our CircHopOutbound object.
331    pub(crate) fn outbound(&self) -> &CircHopOutbound {
332        &self.outbound
333    }
334
335    /// We're about to send `msg`.
336    ///
337    /// See [`OpenStreamEnt::about_to_send`](crate::streammap::OpenStreamEnt::about_to_send).
338    //
339    // TODO prop340: This should take a cell or similar, not a message.
340    pub(crate) fn about_to_send(&mut self, stream_id: StreamId, msg: &AnyRelayMsg) -> Result<()> {
341        self.outbound.about_to_send(self.unique_id, stream_id, msg)
342    }
343
344    /// Add an entry to this map using the specified StreamId.
345    #[cfg(feature = "hs-service")]
346    pub(crate) fn add_ent_with_id(
347        &self,
348        sink: StreamQueueSender,
349        rx: StreamMpscReceiver<AnyRelayMsg>,
350        rate_limit_updater: watch::Sender<StreamRateLimit>,
351        drain_rate_requester: NotifySender<DrainRateRequest>,
352        stream_id: StreamId,
353        cmd_checker: AnyCmdChecker,
354    ) -> Result<()> {
355        self.outbound.add_ent_with_id(
356            sink,
357            rx,
358            rate_limit_updater,
359            drain_rate_requester,
360            stream_id,
361            cmd_checker,
362        )
363    }
364
365    /// Note that we received an END message (or other message indicating the end of
366    /// the stream) on the stream with `id`.
367    ///
368    /// See [`StreamMap::ending_msg_received`](crate::streammap::StreamMap::ending_msg_received).
369    #[cfg(feature = "hs-service")]
370    pub(crate) fn ending_msg_received(&self, stream_id: StreamId) -> Result<()> {
371        self.outbound.ending_msg_received(stream_id)
372    }
373
374    /// Parse a RELAY or RELAY_EARLY cell body.
375    ///
376    /// Requires that the cryptographic checks on the message have already been
377    /// performed
378    pub(crate) fn decode(&mut self, cell: BoxedCellBody) -> Result<RelayCellDecoderResult> {
379        self.inbound.decode(cell)
380    }
381
382    /// Handle `msg`, delivering it to the stream with the specified `streamid` if appropriate.
383    ///
384    /// Returns back the provided `msg`, if the message is an incoming stream request
385    /// that needs to be handled by the calling code.
386    ///
387    // TODO: the above is a bit of a code smell -- we should try to avoid passing the msg
388    // back and forth like this.
389    pub(super) fn handle_msg(
390        &self,
391        hop_detail: &PathEntry,
392        cell_counts_toward_windows: bool,
393        streamid: StreamId,
394        msg: UnparsedRelayMsg,
395        now: Instant,
396    ) -> Result<Option<UnparsedRelayMsg>> {
397        let possible_proto_violation_err = |streamid: StreamId| Error::UnknownStream {
398            src: sv(hop_detail.clone()),
399            streamid,
400        };
401
402        self.outbound.handle_msg(
403            possible_proto_violation_err,
404            cell_counts_toward_windows,
405            streamid,
406            msg,
407            now,
408        )
409    }
410
411    /// Get the stream map of this hop.
412    pub(crate) fn stream_map(&self) -> &Arc<Mutex<StreamMap>> {
413        self.outbound.stream_map()
414    }
415
416    /// Set the stream map of this hop to `map`.
417    ///
418    /// Returns an error if the existing stream map of the hop has any open stream.
419    pub(crate) fn set_stream_map(&mut self, map: Arc<Mutex<StreamMap>>) -> StdResult<(), Bug> {
420        self.outbound.set_stream_map(map)
421    }
422
423    /// Decrement the limit of outbound cells that may be sent to this hop; give
424    /// an error if it would reach zero.
425    pub(crate) fn decrement_outbound_cell_limit(&mut self) -> Result<()> {
426        self.outbound.decrement_cell_limit()
427    }
428
429    /// Decrement the limit of inbound cells that may be received from this hop; give
430    /// an error if it would reach zero.
431    pub(crate) fn decrement_inbound_cell_limit(&mut self) -> Result<()> {
432        self.inbound.decrement_cell_limit()
433    }
434}