Skip to main content

tor_proto/circuit/
circhop.rs

1//! Module exposing structures relating to a reactor's view of a circuit hop.
2
3// TODO(relay): don't import from the client module
4use crate::client::circuit::handshake::RelayCryptLayerProtocol;
5
6use crate::ccparams::CongestionControlParams;
7use crate::circuit::CircParameters;
8use crate::congestion::{CongestionControl, sendme};
9use crate::stream::CloseStreamBehavior;
10use crate::stream::SEND_WINDOW_INIT;
11use crate::stream::StreamMpscReceiver;
12use crate::stream::cmdcheck::{AnyCmdChecker, StreamStatus};
13use crate::stream::flow_ctrl::params::FlowCtrlParameters;
14use crate::stream::flow_ctrl::state::{StreamFlowCtrl, StreamRateLimit};
15use crate::stream::flow_ctrl::xon_xoff::reader::DrainRateRequest;
16use crate::stream::queue::StreamQueueSender;
17use crate::streammap::{
18    self, EndSentStreamEnt, OpenStreamEnt, ShouldSendEnd, StreamEntMut, StreamMap,
19};
20use crate::util::notify::NotifySender;
21use crate::{Error, HopNum, Result};
22
23use postage::watch;
24use safelog::sensitive as sv;
25use tracing::{debug, trace};
26
27use tor_cell::chancell::BoxedCellBody;
28use tor_cell::relaycell::extend::{CcRequest, CircRequestExt};
29use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
30use tor_cell::relaycell::msg::AnyRelayMsg;
31use tor_cell::relaycell::{
32    AnyRelayMsgOuter, RelayCellDecoder, RelayCellDecoderResult, RelayCellFormat, RelayCmd,
33    StreamId, UnparsedRelayMsg,
34};
35use tor_error::{Bug, internal};
36use tor_protover::named;
37
38use std::num::NonZeroU32;
39use std::pin::Pin;
40use std::result::Result as StdResult;
41use std::sync::{Arc, Mutex};
42use web_time_compat::Instant;
43
44#[cfg(test)]
45use tor_cell::relaycell::msg::SendmeTag;
46
47use cfg_if::cfg_if;
48
/// The kind of extension negotiation that is available while establishing a hop.
///
/// This decides which extensions may be sent and received during the
/// handshake, and therefore which hop settings can be negotiated at all.
//
// TODO-CGO: Expect this to be reworked once support for HsV3+CGO is added.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum HopNegotiationType {
    /// The handshake offers no extension-based negotiation.
    None,
    /// The HsV3-ntor handshake: the client may send extensions,
    /// but the server may not.
    ///
    /// As a special case, the default relay encryption protocol is the hsv3
    /// variant of Tor1.
    //
    // We would call this "HalfDuplex" or something, but we do not expect to add
    // any more handshakes of this type.
    HsV3,
    /// A handshake in which both the client and the relay may send extensions.
    Full,
}
72
73/// The settings we use for single hop of a circuit.
74///
75/// Unlike [`CircParameters`], this type is crate-internal.
76/// We construct it based on our settings from the circuit,
77/// and from the hop's actual capabilities.
78/// Then, we negotiate with the hop as part of circuit
79/// creation/extension to determine the actual settings that will be in use.
80/// Finally, we use those settings to construct the negotiated circuit hop.
81//
82// TODO: Relays should probably derive an instance of this type too, as
83// part of the circuit creation handshake.
84#[derive(Clone, Debug)]
85pub(crate) struct HopSettings {
86    /// The negotiated congestion control settings for this hop .
87    pub(crate) ccontrol: CongestionControlParams,
88
89    /// Flow control parameters that will be used for streams on this hop.
90    pub(crate) flow_ctrl_params: FlowCtrlParameters,
91
92    /// Maximum number of permitted incoming relay cells for this hop.
93    pub(crate) n_incoming_cells_permitted: Option<u32>,
94
95    /// Maximum number of permitted outgoing relay cells for this hop.
96    pub(crate) n_outgoing_cells_permitted: Option<u32>,
97
98    /// The relay cell encryption algorithm and cell format for this hop.
99    relay_crypt_protocol: RelayCryptLayerProtocol,
100}
101
102impl HopSettings {
103    /// Construct a new `HopSettings` based on `params` (a set of circuit parameters)
104    /// and `caps` (a set of protocol capabilities for a circuit target).
105    ///
106    /// The resulting settings will represent what the client would prefer to negotiate
107    /// (determined by `params`),
108    /// as modified by what the target relay is believed to support (represented by `caps`).
109    ///
110    /// This represents the `HopSettings` in a pre-negotiation state:
111    /// the circuit negotiation process will modify it.
112    #[allow(clippy::unnecessary_wraps)] // likely to become fallible in the future.
113    pub(crate) fn from_params_and_caps(
114        hoptype: HopNegotiationType,
115        params: &CircParameters,
116        caps: &tor_protover::Protocols,
117    ) -> Result<Self> {
118        let mut ccontrol = params.ccontrol.clone();
119        match ccontrol.alg() {
120            crate::ccparams::Algorithm::FixedWindow(_) => {}
121            crate::ccparams::Algorithm::Vegas(_) => {
122                // If the target doesn't support FLOWCTRL_CC, we can't use Vegas.
123                if !caps.supports_named_subver(named::FLOWCTRL_CC) {
124                    ccontrol.use_fallback_alg();
125                }
126            }
127        };
128        if hoptype == HopNegotiationType::None {
129            ccontrol.use_fallback_alg();
130        } else if hoptype == HopNegotiationType::HsV3 {
131            // TODO #2037, TODO-CGO: We need a way to send congestion control extensions
132            // in this case too.  But since we aren't sending them, we
133            // should use the fallback algorithm.
134            ccontrol.use_fallback_alg();
135        }
136        let ccontrol = ccontrol; // drop mut
137
138        // Negotiate CGO if it is supported, if CC is also supported,
139        // and if CGO is available on this relay.
140        let relay_crypt_protocol = match hoptype {
141            HopNegotiationType::None => RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0),
142            HopNegotiationType::HsV3 => {
143                // TODO-CGO: Support CGO when available.
144                cfg_if! {
145                    if #[cfg(feature = "hs-common")] {
146                        RelayCryptLayerProtocol::HsV3(RelayCellFormat::V0)
147                    } else {
148                        return Err(
149                            tor_error::internal!("Unexpectedly tried to negotiate HsV3 without support!").into(),
150                        );
151                    }
152                }
153            }
154            HopNegotiationType::Full => {
155                cfg_if! {
156                    if #[cfg(all(feature = "flowctl-cc", feature = "counter-galois-onion"))] {
157                        #[allow(clippy::overly_complex_bool_expr)]
158                        if  ccontrol.alg().compatible_with_cgo()
159                            && caps.supports_named_subver(named::RELAY_NEGOTIATE_SUBPROTO)
160                            && caps.supports_named_subver(named::RELAY_CRYPT_CGO)
161                        {
162                            RelayCryptLayerProtocol::Cgo
163                        } else {
164                            RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0)
165                        }
166                    } else {
167                        RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0)
168                    }
169                }
170            }
171        };
172
173        Ok(Self {
174            ccontrol,
175            flow_ctrl_params: params.flow_ctrl.clone(),
176            relay_crypt_protocol,
177            n_incoming_cells_permitted: params.n_incoming_cells_permitted,
178            n_outgoing_cells_permitted: params.n_outgoing_cells_permitted,
179        })
180    }
181
182    /// Return the negotiated relay crypto protocol.
183    pub(crate) fn relay_crypt_protocol(&self) -> RelayCryptLayerProtocol {
184        self.relay_crypt_protocol
185    }
186
187    /// Return the client circuit-creation extensions that we should use in order to negotiate
188    /// these circuit hop parameters.
189    #[allow(clippy::unnecessary_wraps)]
190    pub(crate) fn circuit_request_extensions(&self) -> Result<Vec<CircRequestExt>> {
191        // allow 'unused_mut' because of the combinations of `cfg` conditions below
192        #[allow(unused_mut)]
193        let mut client_extensions = Vec::new();
194
195        #[allow(unused, unused_mut)]
196        let mut cc_extension_set = false;
197
198        if self.ccontrol.is_enabled() {
199            cfg_if::cfg_if! {
200                if #[cfg(feature = "flowctl-cc")] {
201                    client_extensions.push(CircRequestExt::CcRequest(CcRequest::default()));
202                    cc_extension_set = true;
203                } else {
204                    return Err(
205                        tor_error::internal!(
206                            "Congestion control is enabled on this circuit, but 'flowctl-cc' feature is not enabled"
207                        )
208                        .into()
209                    );
210                }
211            }
212        }
213
214        // See whether we need to send a list of required protocol capabilities.
215        // These aren't "negotiated" per se; they're simply demanded.
216        // The relay will refuse the circuit if it doesn't support all of them,
217        // and if any of them isn't supported in the SubprotocolRequest extension.
218        //
219        // (In other words, don't add capabilities here just because you want the
220        // relay to have them! They must be explicitly listed as supported for use
221        // with this extension. For the current list, see
222        // https://spec.torproject.org/tor-spec/create-created-cells.html#subproto-request)
223        //
224        #[allow(unused_mut)]
225        let mut required_protocol_capabilities: Vec<tor_protover::NamedSubver> = Vec::new();
226
227        #[cfg(feature = "counter-galois-onion")]
228        if matches!(self.relay_crypt_protocol(), RelayCryptLayerProtocol::Cgo) {
229            if !cc_extension_set {
230                return Err(tor_error::internal!("Tried to negotiate CGO without CC.").into());
231            }
232            required_protocol_capabilities.push(tor_protover::named::RELAY_CRYPT_CGO);
233        }
234
235        if !required_protocol_capabilities.is_empty() {
236            client_extensions.push(CircRequestExt::SubprotocolRequest(
237                required_protocol_capabilities.into_iter().collect(),
238            ));
239        }
240
241        Ok(client_extensions)
242    }
243}
244
#[cfg(test)]
impl std::default::Default for CircParameters {
    /// Test-only defaults: extend by ed25519 identity, the fixed test
    /// congestion control parameters, the test flow control defaults,
    /// and no limit on incoming or outgoing cells.
    fn default() -> Self {
        // `CircParameters::new` leaves both cell limits as `None`.
        Self::new(
            true,
            crate::congestion::test_utils::params::build_cc_fixed_params(),
            FlowCtrlParameters::defaults_for_tests(),
        )
    }
}
257
258impl CircParameters {
259    /// Constructor
260    pub fn new(
261        extend_by_ed25519_id: bool,
262        ccontrol: CongestionControlParams,
263        flow_ctrl: FlowCtrlParameters,
264    ) -> Self {
265        Self {
266            extend_by_ed25519_id,
267            ccontrol,
268            flow_ctrl,
269            n_incoming_cells_permitted: None,
270            n_outgoing_cells_permitted: None,
271        }
272    }
273}
274
275/// Instructions for sending a RELAY cell.
276///
277/// This instructs a circuit reactor to send a RELAY cell to a given target
278/// (a hop, if we are a client, or the client, if we are a relay).
279#[derive(educe::Educe)]
280#[educe(Debug)]
281pub(crate) struct SendRelayCell {
282    /// The hop number, or `None` if we are a relay.
283    pub(crate) hop: Option<HopNum>,
284    /// Whether to use a RELAY_EARLY cell.
285    pub(crate) early: bool,
286    /// The cell to send.
287    pub(crate) cell: AnyRelayMsgOuter,
288}
289
290/// The inbound state of a hop.
291pub(crate) struct CircHopInbound {
292    /// Decodes relay cells received from this hop.
293    decoder: RelayCellDecoder,
294    /// Remaining permitted incoming relay cells from this hop, plus 1.
295    ///
296    /// (In other words, `None` represents no limit,
297    /// `Some(1)` represents an exhausted limit,
298    /// and `Some(n)` means that n-1 more cells may be received.)
299    ///
300    /// If this ever decrements from Some(1), then the circuit must be torn down with an error.
301    n_incoming_cells_permitted: Option<NonZeroU32>,
302}
303
304/// The outbound state of a hop.
305pub(crate) struct CircHopOutbound {
306    /// Congestion control object.
307    ///
308    /// This object is also in charge of handling circuit level SENDME logic for this hop.
309    ccontrol: Arc<Mutex<CongestionControl>>,
310    /// Map from stream IDs to streams.
311    ///
312    /// We store this with the reactor instead of the circuit, since the
313    /// reactor needs it for every incoming cell on a stream, whereas
314    /// the circuit only needs it when allocating new streams.
315    ///
316    /// NOTE: this is behind a mutex because the client reactor polls the `StreamMap`s
317    /// of all hops concurrently, in a `FuturesUnordered`. Without the mutex,
318    /// this wouldn't be possible, because it would mean holding multiple
319    /// mutable references to `self` (the reactor). Note, however,
320    /// that there should never be any contention on this mutex:
321    /// we never create more than one
322    /// `CircHopList::ready_streams_iterator()` stream
323    /// at a time, and we never clone/lock the hop's `StreamMap` outside of it.
324    ///
325    /// Additionally, the stream map of the last hop (join point) of a conflux tunnel
326    /// is shared with all the circuits in the tunnel.
327    map: Arc<Mutex<StreamMap>>,
328    /// Format to use for relay cells.
329    //
330    // When we have packed/fragmented cells, this may be replaced by a RelayCellEncoder.
331    relay_format: RelayCellFormat,
332    /// Flow control parameters for new streams.
333    flow_ctrl_params: Arc<FlowCtrlParameters>,
334    /// Remaining permitted outgoing relay cells from this hop, plus 1.
335    ///
336    /// If this ever decrements from Some(1), then the circuit must be torn down with an error.
337    n_outgoing_cells_permitted: Option<NonZeroU32>,
338}
339
340impl CircHopInbound {
341    /// Create a new [`CircHopInbound`].
342    pub(crate) fn new(decoder: RelayCellDecoder, settings: &HopSettings) -> Self {
343        Self {
344            decoder,
345            n_incoming_cells_permitted: settings.n_incoming_cells_permitted.map(cvt),
346        }
347    }
348
349    /// Parse a RELAY or RELAY_EARLY cell body.
350    ///
351    /// Requires that the cryptographic checks on the message have already been
352    /// performed
353    pub(crate) fn decode(&mut self, cell: BoxedCellBody) -> Result<RelayCellDecoderResult> {
354        self.decoder
355            .decode(cell)
356            .map_err(|e| Error::from_bytes_err(e, "relay cell"))
357    }
358
359    /// Decrement the limit of inbound cells that may be received from this hop; give
360    /// an error if it would reach zero.
361    pub(crate) fn decrement_cell_limit(&mut self) -> Result<()> {
362        try_decrement_cell_limit(&mut self.n_incoming_cells_permitted)
363            .map_err(|_| Error::ExcessInboundCells)
364    }
365}
366
367impl CircHopOutbound {
368    /// Create a new [`CircHopOutbound`].
369    pub(crate) fn new(
370        ccontrol: Arc<Mutex<CongestionControl>>,
371        relay_format: RelayCellFormat,
372        flow_ctrl_params: Arc<FlowCtrlParameters>,
373        settings: &HopSettings,
374    ) -> Self {
375        Self {
376            ccontrol,
377            map: Arc::new(Mutex::new(StreamMap::new())),
378            relay_format,
379            flow_ctrl_params,
380            n_outgoing_cells_permitted: settings.n_outgoing_cells_permitted.map(cvt),
381        }
382    }
383
384    /// Start a stream. Creates an entry in the stream map with the given channels, and sends the
385    /// `message` to the provided hop.
386    #[allow(clippy::too_many_arguments)]
387    pub(crate) fn begin_stream(
388        &mut self,
389        hop: Option<HopNum>,
390        message: AnyRelayMsg,
391        sender: StreamQueueSender,
392        rx: StreamMpscReceiver<AnyRelayMsg>,
393        rate_limit_updater: watch::Sender<StreamRateLimit>,
394        drain_rate_requester: NotifySender<DrainRateRequest>,
395        cmd_checker: AnyCmdChecker,
396    ) -> Result<(SendRelayCell, StreamId)> {
397        let flow_ctrl = self.build_flow_ctrl(
398            Arc::clone(&self.flow_ctrl_params),
399            rate_limit_updater,
400            drain_rate_requester,
401        )?;
402        let r =
403            self.map
404                .lock()
405                .expect("lock poisoned")
406                .add_ent(sender, rx, flow_ctrl, cmd_checker)?;
407        let cell = AnyRelayMsgOuter::new(Some(r), message);
408        Ok((
409            SendRelayCell {
410                hop,
411                early: false,
412                cell,
413            },
414            r,
415        ))
416    }
417
418    /// Close the stream associated with `id` because the stream was dropped.
419    ///
420    /// If we have not already received an END cell on this stream, send one.
421    /// If no END cell is specified, an END cell with the reason byte set to
422    /// REASON_MISC will be sent.
423    ///
424    // Note(relay): `circ_id` is an opaque displayable type
425    // because relays use a different circuit ID type
426    // than clients. Eventually, we should probably make
427    // them both use the same ID type, or have a nicer approach here
428    pub(crate) fn close_stream(
429        &mut self,
430        circ_id: impl std::fmt::Display,
431        id: StreamId,
432        hop: Option<HopNum>,
433        message: CloseStreamBehavior,
434        why: streammap::TerminateReason,
435        expiry: Instant,
436    ) -> Result<Option<SendRelayCell>> {
437        let should_send_end = self
438            .map
439            .lock()
440            .expect("lock poisoned")
441            .terminate(id, why, expiry)?;
442        trace!(
443            circ_id = %circ_id,
444            stream_id = %id,
445            should_send_end = ?should_send_end,
446            "Ending stream",
447        );
448        // TODO: I am about 80% sure that we only send an END cell if
449        // we didn't already get an END cell.  But I should double-check!
450        if let (ShouldSendEnd::Send, CloseStreamBehavior::SendEnd(end_message)) =
451            (should_send_end, message)
452        {
453            let end_cell = AnyRelayMsgOuter::new(Some(id), end_message.into());
454            let cell = SendRelayCell {
455                hop,
456                early: false,
457                cell: end_cell,
458            };
459
460            return Ok(Some(cell));
461        }
462        Ok(None)
463    }
464
465    /// Check if we should send an XON message.
466    ///
467    /// If we should, then returns the XON message that should be sent.
468    pub(crate) fn maybe_send_xon(
469        &mut self,
470        rate: XonKbpsEwma,
471        id: StreamId,
472    ) -> Result<Option<Xon>> {
473        // the call below will return an error if XON/XOFF aren't supported,
474        // so we check for support here
475        if !self
476            .ccontrol()
477            .lock()
478            .expect("poisoned lock")
479            .uses_xon_xoff()
480        {
481            return Ok(None);
482        }
483
484        let mut map = self.map.lock().expect("lock poisoned");
485        let Some(StreamEntMut::Open(ent)) = map.get_mut(id) else {
486            // stream went away
487            return Ok(None);
488        };
489
490        ent.maybe_send_xon(rate)
491    }
492
493    /// Check if we should send an XOFF message.
494    ///
495    /// If we should, then returns the XOFF message that should be sent.
496    pub(crate) fn maybe_send_xoff(&mut self, id: StreamId) -> Result<Option<Xoff>> {
497        // the call below will return an error if XON/XOFF aren't supported,
498        // so we check for support here
499        if !self
500            .ccontrol()
501            .lock()
502            .expect("poisoned lock")
503            .uses_xon_xoff()
504        {
505            return Ok(None);
506        }
507
508        let mut map = self.map.lock().expect("lock poisoned");
509        let Some(StreamEntMut::Open(ent)) = map.get_mut(id) else {
510            // stream went away
511            return Ok(None);
512        };
513
514        ent.maybe_send_xoff()
515    }
516
517    /// Return the format that is used for relay cells sent to this hop.
518    ///
519    /// For the most part, this format isn't necessary to interact with a CircHop;
520    /// it becomes relevant when we are deciding _what_ we can encode for the hop.
521    pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
522        self.relay_format
523    }
524
525    /// Delegate to CongestionControl, for testing purposes
526    #[cfg(test)]
527    pub(crate) fn send_window_and_expected_tags(&self) -> (u32, Vec<SendmeTag>) {
528        self.ccontrol()
529            .lock()
530            .expect("poisoned lock")
531            .send_window_and_expected_tags()
532    }
533
534    /// Return the number of open streams on this hop.
535    ///
536    /// WARNING: because this locks the stream map mutex,
537    /// it should never be called from a context where that mutex is already locked.
538    pub(crate) fn n_open_streams(&self) -> usize {
539        self.map.lock().expect("lock poisoned").n_open_streams()
540    }
541
542    /// Return a reference to our CongestionControl object.
543    pub(crate) fn ccontrol(&self) -> &Arc<Mutex<CongestionControl>> {
544        &self.ccontrol
545    }
546
547    /// We're about to send `msg`.
548    ///
549    /// See [`OpenStreamEnt::about_to_send`](crate::streammap::OpenStreamEnt::about_to_send).
550    //
551    // TODO prop340: This should take a cell or similar, not a message.
552    //
553    // Note(relay): `circ_id` is an opaque displayable type
554    // because relays use a different circuit ID type
555    // than clients. Eventually, we should probably make
556    // them both use the same ID type, or have a nicer approach here
557    pub(crate) fn about_to_send(
558        &mut self,
559        circ_id: impl std::fmt::Display,
560        stream_id: StreamId,
561        msg: &AnyRelayMsg,
562    ) -> Result<()> {
563        let mut hop_map = self.map.lock().expect("lock poisoned");
564        let Some(StreamEntMut::Open(ent)) = hop_map.get_mut(stream_id) else {
565            // This can happen when we have outgoing data queued when we received an END.
566            // We shouldn't return an error here since it would close the circuit along with all
567            // other streams, and instead we just let the caller send this message anyways.
568            // Also the caller only calls `about_to_send()` for DATA cells,
569            // which means that other non-DATA cells don't hit this code path and are always sent,
570            // and so we should handle all cell types consistently.
571            // TODO: We should drop the message and not send it,
572            // but the caller of `about_to_send()` isn't designed to handle fallible sends
573            // so it would need some refactoring to handle this.
574            debug!(
575                circ_id = %circ_id,
576                stream_id = %stream_id,
577                "sending a relay cell for non-existent or non-open stream!",
578            );
579            return Ok(());
580        };
581
582        ent.about_to_send(msg)
583    }
584
585    /// Add an entry to this map using the specified StreamId.
586    #[cfg(any(feature = "hs-service", feature = "relay"))]
587    pub(crate) fn add_ent_with_id(
588        &self,
589        sink: StreamQueueSender,
590        rx: StreamMpscReceiver<AnyRelayMsg>,
591        rate_limit_updater: watch::Sender<StreamRateLimit>,
592        drain_rate_requester: NotifySender<DrainRateRequest>,
593        stream_id: StreamId,
594        cmd_checker: AnyCmdChecker,
595    ) -> Result<()> {
596        let mut hop_map = self.map.lock().expect("lock poisoned");
597        hop_map.add_ent_with_id(
598            sink,
599            rx,
600            self.build_flow_ctrl(
601                Arc::clone(&self.flow_ctrl_params),
602                rate_limit_updater,
603                drain_rate_requester,
604            )?,
605            stream_id,
606            cmd_checker,
607        )?;
608
609        Ok(())
610    }
611
612    /// Builds the reactor's flow control handler for a new stream.
613    // TODO: remove the `Result` once we remove the "flowctl-cc" feature
614    #[cfg_attr(feature = "flowctl-cc", expect(clippy::unnecessary_wraps))]
615    fn build_flow_ctrl(
616        &self,
617        params: Arc<FlowCtrlParameters>,
618        rate_limit_updater: watch::Sender<StreamRateLimit>,
619        drain_rate_requester: NotifySender<DrainRateRequest>,
620    ) -> Result<StreamFlowCtrl> {
621        if self
622            .ccontrol()
623            .lock()
624            .expect("poisoned lock")
625            .uses_stream_sendme()
626        {
627            let window = sendme::StreamSendWindow::new(SEND_WINDOW_INIT);
628            Ok(StreamFlowCtrl::new_window(window))
629        } else {
630            cfg_if::cfg_if! {
631                if #[cfg(feature = "flowctl-cc")] {
632                    // TODO: Currently arti only supports clients, and we don't support connecting
633                    // to onion services while using congestion control, so we hardcode this. In the
634                    // future we will need to somehow tell the `CircHop` this so that we can set it
635                    // correctly, since we don't want to enable this at exits.
636                    let use_sidechannel_mitigations = true;
637
638                    Ok(StreamFlowCtrl::new_xon_xoff(
639                        params,
640                        use_sidechannel_mitigations,
641                        rate_limit_updater,
642                        drain_rate_requester,
643                    ))
644                } else {
645                    drop(params);
646                    drop(rate_limit_updater);
647                    drop(drain_rate_requester);
648                    Err(internal!(
649                        "`CongestionControl` doesn't use sendmes, but 'flowctl-cc' feature not enabled",
650                    ).into())
651                }
652            }
653        }
654    }
655
656    /// Deliver `msg` to the specified open stream entry `ent`.
657    fn deliver_msg_to_stream(
658        streamid: StreamId,
659        ent: &mut OpenStreamEnt,
660        cell_counts_toward_windows: bool,
661        msg: UnparsedRelayMsg,
662    ) -> Result<bool> {
663        use tor_async_utils::SinkTrySend as _;
664        use tor_async_utils::SinkTrySendError as _;
665
666        // The stream for this message exists, and is open.
667
668        // We need to handle SENDME/XON/XOFF messages here, not in the stream's recv() method, or
669        // else we'd never notice them if the stream isn't reading.
670        //
671        // TODO: this logic is the same as `HalfStream::handle_msg`; we should refactor this if
672        // possible
673        match msg.cmd() {
674            RelayCmd::SENDME => {
675                ent.put_for_incoming_sendme(msg)?;
676                return Ok(false);
677            }
678            RelayCmd::XON => {
679                ent.handle_incoming_xon(msg)?;
680                return Ok(false);
681            }
682            RelayCmd::XOFF => {
683                ent.handle_incoming_xoff(msg)?;
684                return Ok(false);
685            }
686            _ => {}
687        }
688
689        let message_closes_stream = ent.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
690
691        if let Err(e) = Pin::new(&mut ent.sink).try_send(msg) {
692            if e.is_full() {
693                cfg_if::cfg_if! {
694                    if #[cfg(not(feature = "flowctl-cc"))] {
695                        // If we get here, we either have a logic bug (!), or an attacker
696                        // is sending us more cells than we asked for via congestion control.
697                        return Err(Error::CircProto(format!(
698                            "Stream sink would block; received too many cells on stream ID {}",
699                            sv(streamid),
700                        )));
701                    } else {
702                        return Err(internal!(
703                            "Stream (ID {}) uses an unbounded queue, but apparently it's full?",
704                            sv(streamid),
705                        )
706                        .into());
707                    }
708                }
709            }
710            if e.is_disconnected() && cell_counts_toward_windows {
711                // the other side of the stream has gone away; remember
712                // that we received a cell that we couldn't queue for it.
713                //
714                // Later this value will be recorded in a half-stream.
715                ent.dropped += 1;
716            }
717        }
718
719        Ok(message_closes_stream)
720    }
721
722    /// Note that we received an END message (or other message indicating the end of
723    /// the stream) on the stream with `id`.
724    ///
725    /// See [`StreamMap::ending_msg_received`](crate::streammap::StreamMap::ending_msg_received).
726    #[cfg(feature = "hs-service")]
727    pub(crate) fn ending_msg_received(&self, stream_id: StreamId) -> Result<()> {
728        let mut hop_map = self.map.lock().expect("lock poisoned");
729
730        hop_map.ending_msg_received(stream_id)?;
731
732        Ok(())
733    }
734
735    /// Handle `msg`, delivering it to the stream with the specified `streamid` if appropriate.
736    ///
737    /// Returns back the provided `msg`, if the message is an incoming stream request
738    /// that needs to be handled by the calling code.
739    ///
740    // TODO: the above is a bit of a code smell -- we should try to avoid passing the msg
741    // back and forth like this.
742    pub(crate) fn handle_msg<F>(
743        &self,
744        possible_proto_violation_err: F,
745        cell_counts_toward_windows: bool,
746        streamid: StreamId,
747        msg: UnparsedRelayMsg,
748        now: Instant,
749    ) -> Result<Option<UnparsedRelayMsg>>
750    where
751        F: FnOnce(StreamId) -> Error,
752    {
753        let mut hop_map = self.map.lock().expect("lock poisoned");
754
755        match hop_map.get_mut(streamid) {
756            Some(StreamEntMut::Open(ent)) => {
757                // Can't have a stream level SENDME when congestion control is enabled.
758                let message_closes_stream =
759                    Self::deliver_msg_to_stream(streamid, ent, cell_counts_toward_windows, msg)?;
760
761                if message_closes_stream {
762                    hop_map.ending_msg_received(streamid)?;
763                }
764            }
765            Some(StreamEntMut::EndSent(EndSentStreamEnt { expiry, .. })) if now >= *expiry => {
766                return Err(possible_proto_violation_err(streamid));
767            }
768            #[cfg(feature = "hs-service")]
769            Some(StreamEntMut::EndSent(_))
770                if matches!(
771                    msg.cmd(),
772                    RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
773                ) =>
774            {
775                // If the other side is sending us a BEGIN but hasn't yet acknowledged our END
776                // message, just remove the old stream from the map and stop waiting for a
777                // response
778                hop_map.ending_msg_received(streamid)?;
779                return Ok(Some(msg));
780            }
781            Some(StreamEntMut::EndSent(EndSentStreamEnt { half_stream, .. })) => {
782                // We sent an end but maybe the other side hasn't heard.
783
784                match half_stream.handle_msg(msg)? {
785                    StreamStatus::Open => {}
786                    StreamStatus::Closed => {
787                        hop_map.ending_msg_received(streamid)?;
788                    }
789                }
790            }
791            #[cfg(feature = "hs-service")]
792            None if matches!(
793                msg.cmd(),
794                RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
795            ) =>
796            {
797                return Ok(Some(msg));
798            }
799            _ => {
800                // No stream wants this message, or ever did.
801                return Err(possible_proto_violation_err(streamid));
802            }
803        }
804
805        Ok(None)
806    }
807
808    /// Get the stream map of this hop.
809    pub(crate) fn stream_map(&self) -> &Arc<Mutex<StreamMap>> {
810        &self.map
811    }
812
813    /// Set the stream map of this hop to `map`.
814    ///
815    /// Returns an error if the existing stream map of the hop has any open stream.
816    pub(crate) fn set_stream_map(&mut self, map: Arc<Mutex<StreamMap>>) -> StdResult<(), Bug> {
817        if self.n_open_streams() != 0 {
818            return Err(internal!("Tried to discard existing open streams?!"));
819        }
820
821        self.map = map;
822
823        Ok(())
824    }
825
826    /// Decrement the limit of outbound cells that may be sent to this hop; give
827    /// an error if it would reach zero.
828    pub(crate) fn decrement_cell_limit(&mut self) -> Result<()> {
829        try_decrement_cell_limit(&mut self.n_outgoing_cells_permitted)
830            .map_err(|_| Error::ExcessOutboundCells)
831    }
832}
833
/// Use up one unit of a cell limit stored in "remaining plus one" form.
///
/// * `None` (no limit): nothing changes, returns `Ok(())`.
/// * `Some(1)` (limit exhausted): nothing changes, returns `Err(())`.
/// * `Some(n)` with `n > 1`: `val` becomes `Some(n - 1)`, returns `Ok(())`.
#[inline]
fn try_decrement_cell_limit(val: &mut Option<NonZeroU32>) -> StdResult<(), ()> {
    let Some(remaining) = val else {
        // No limit is in force.
        return Ok(());
    };

    // `NonZeroU32::new` yields `None` exactly when the stored value was 1,
    // that is, when the limit is already exhausted. The subtraction cannot
    // underflow, since a `NonZeroU32` is at least 1.
    match NonZeroU32::new(remaining.get() - 1) {
        Some(decremented) => {
            *remaining = decremented;
            Ok(())
        }
        None => Err(()),
    }
}
852
/// Convert a cell limit from the form used in a `HopSettings` (a plain count)
/// to the more compact "count plus one" form used by the hop state types.
///
/// The addition saturates, so a limit of `u32::MAX` maps to `u32::MAX`.
fn cvt(limit: u32) -> NonZeroU32 {
    // See "known limitations" comment on n_incoming_cells_permitted.
    let plus_one = limit.saturating_add(1);
    NonZeroU32::new(plus_one).expect("Adding one left it as zero?")
}