tor_proto/circuit/circhop.rs
1//! Module exposing structures relating to a reactor's view of a circuit hop.
2
3// TODO(relay): don't import from the client module
4use crate::client::circuit::handshake::RelayCryptLayerProtocol;
5
6use crate::ccparams::CongestionControlParams;
7use crate::circuit::CircParameters;
8use crate::congestion::{CongestionControl, sendme};
9use crate::stream::CloseStreamBehavior;
10use crate::stream::SEND_WINDOW_INIT;
11use crate::stream::StreamMpscReceiver;
12use crate::stream::cmdcheck::{AnyCmdChecker, StreamStatus};
13use crate::stream::flow_ctrl::params::FlowCtrlParameters;
14use crate::stream::flow_ctrl::state::{StreamFlowCtrl, StreamRateLimit};
15use crate::stream::flow_ctrl::xon_xoff::reader::DrainRateRequest;
16use crate::stream::queue::StreamQueueSender;
17use crate::streammap::{
18 self, EndSentStreamEnt, OpenStreamEnt, ShouldSendEnd, StreamEntMut, StreamMap,
19};
20use crate::util::notify::NotifySender;
21use crate::{Error, HopNum, Result};
22
23use postage::watch;
24use safelog::sensitive as sv;
25use tracing::{debug, trace};
26
27use tor_cell::chancell::BoxedCellBody;
28use tor_cell::relaycell::extend::{CcRequest, CircRequestExt};
29use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
30use tor_cell::relaycell::msg::AnyRelayMsg;
31use tor_cell::relaycell::{
32 AnyRelayMsgOuter, RelayCellDecoder, RelayCellDecoderResult, RelayCellFormat, RelayCmd,
33 StreamId, UnparsedRelayMsg,
34};
35use tor_error::{Bug, internal};
36use tor_protover::named;
37
38use std::num::NonZeroU32;
39use std::pin::Pin;
40use std::result::Result as StdResult;
41use std::sync::{Arc, Mutex};
42use web_time_compat::Instant;
43
44#[cfg(test)]
45use tor_cell::relaycell::msg::SendmeTag;
46
47use cfg_if::cfg_if;
48
/// Type of negotiation that we'll be performing as we establish a hop.
///
/// Determines what flavor of extensions we can send and receive, which in turn
/// limits the hop settings we can negotiate.
///
// TODO-CGO: This is likely to be refactored when we finally add support for
// HsV3+CGO.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum HopNegotiationType {
    /// We're using a handshake in which extension-based negotiation cannot occur.
    None,
    /// We're using the HsV3-ntor handshake, in which the client can send extensions,
    /// but the server cannot.
    ///
    /// As a special case, the default relay encryption protocol is the hsv3
    /// variant of Tor1.
    //
    // We would call this "HalfDuplex" or something, but we do not expect to add
    // any more handshakes of this type.
    HsV3,
    /// We're using a handshake in which both client and relay can send extensions.
    Full,
}
72
/// The settings we use for a single hop of a circuit.
///
/// Unlike [`CircParameters`], this type is crate-internal.
/// We construct it based on our settings from the circuit,
/// and from the hop's actual capabilities.
/// Then, we negotiate with the hop as part of circuit
/// creation/extension to determine the actual settings that will be in use.
/// Finally, we use those settings to construct the negotiated circuit hop.
//
// TODO: Relays should probably derive an instance of this type too, as
// part of the circuit creation handshake.
#[derive(Clone, Debug)]
pub(crate) struct HopSettings {
    /// The negotiated congestion control settings for this hop.
    pub(crate) ccontrol: CongestionControlParams,

    /// Flow control parameters that will be used for streams on this hop.
    pub(crate) flow_ctrl_params: FlowCtrlParameters,

    /// Maximum number of permitted incoming relay cells for this hop.
    ///
    /// `None` means that no limit is enforced.
    pub(crate) n_incoming_cells_permitted: Option<u32>,

    /// Maximum number of permitted outgoing relay cells for this hop.
    ///
    /// `None` means that no limit is enforced.
    pub(crate) n_outgoing_cells_permitted: Option<u32>,

    /// The relay cell encryption algorithm and cell format for this hop.
    ///
    /// Private: read it through [`HopSettings::relay_crypt_protocol`].
    relay_crypt_protocol: RelayCryptLayerProtocol,
}
101
102impl HopSettings {
103 /// Construct a new `HopSettings` based on `params` (a set of circuit parameters)
104 /// and `caps` (a set of protocol capabilities for a circuit target).
105 ///
106 /// The resulting settings will represent what the client would prefer to negotiate
107 /// (determined by `params`),
108 /// as modified by what the target relay is believed to support (represented by `caps`).
109 ///
110 /// This represents the `HopSettings` in a pre-negotiation state:
111 /// the circuit negotiation process will modify it.
112 #[allow(clippy::unnecessary_wraps)] // likely to become fallible in the future.
113 pub(crate) fn from_params_and_caps(
114 hoptype: HopNegotiationType,
115 params: &CircParameters,
116 caps: &tor_protover::Protocols,
117 ) -> Result<Self> {
118 let mut ccontrol = params.ccontrol.clone();
119 match ccontrol.alg() {
120 crate::ccparams::Algorithm::FixedWindow(_) => {}
121 crate::ccparams::Algorithm::Vegas(_) => {
122 // If the target doesn't support FLOWCTRL_CC, we can't use Vegas.
123 if !caps.supports_named_subver(named::FLOWCTRL_CC) {
124 ccontrol.use_fallback_alg();
125 }
126 }
127 };
128 if hoptype == HopNegotiationType::None {
129 ccontrol.use_fallback_alg();
130 } else if hoptype == HopNegotiationType::HsV3 {
131 // TODO #2037, TODO-CGO: We need a way to send congestion control extensions
132 // in this case too. But since we aren't sending them, we
133 // should use the fallback algorithm.
134 ccontrol.use_fallback_alg();
135 }
136 let ccontrol = ccontrol; // drop mut
137
138 // Negotiate CGO if it is supported, if CC is also supported,
139 // and if CGO is available on this relay.
140 let relay_crypt_protocol = match hoptype {
141 HopNegotiationType::None => RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0),
142 HopNegotiationType::HsV3 => {
143 // TODO-CGO: Support CGO when available.
144 cfg_if! {
145 if #[cfg(feature = "hs-common")] {
146 RelayCryptLayerProtocol::HsV3(RelayCellFormat::V0)
147 } else {
148 return Err(
149 tor_error::internal!("Unexpectedly tried to negotiate HsV3 without support!").into(),
150 );
151 }
152 }
153 }
154 HopNegotiationType::Full => {
155 cfg_if! {
156 if #[cfg(all(feature = "flowctl-cc", feature = "counter-galois-onion"))] {
157 #[allow(clippy::overly_complex_bool_expr)]
158 if ccontrol.alg().compatible_with_cgo()
159 && caps.supports_named_subver(named::RELAY_NEGOTIATE_SUBPROTO)
160 && caps.supports_named_subver(named::RELAY_CRYPT_CGO)
161 {
162 RelayCryptLayerProtocol::Cgo
163 } else {
164 RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0)
165 }
166 } else {
167 RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0)
168 }
169 }
170 }
171 };
172
173 Ok(Self {
174 ccontrol,
175 flow_ctrl_params: params.flow_ctrl.clone(),
176 relay_crypt_protocol,
177 n_incoming_cells_permitted: params.n_incoming_cells_permitted,
178 n_outgoing_cells_permitted: params.n_outgoing_cells_permitted,
179 })
180 }
181
/// Return the negotiated relay crypto protocol.
///
/// This is the value chosen by [`HopSettings::from_params_and_caps`];
/// it is returned by value.
pub(crate) fn relay_crypt_protocol(&self) -> RelayCryptLayerProtocol {
    self.relay_crypt_protocol
}
186
187 /// Return the client circuit-creation extensions that we should use in order to negotiate
188 /// these circuit hop parameters.
189 #[allow(clippy::unnecessary_wraps)]
190 pub(crate) fn circuit_request_extensions(&self) -> Result<Vec<CircRequestExt>> {
191 // allow 'unused_mut' because of the combinations of `cfg` conditions below
192 #[allow(unused_mut)]
193 let mut client_extensions = Vec::new();
194
195 #[allow(unused, unused_mut)]
196 let mut cc_extension_set = false;
197
198 if self.ccontrol.is_enabled() {
199 cfg_if::cfg_if! {
200 if #[cfg(feature = "flowctl-cc")] {
201 client_extensions.push(CircRequestExt::CcRequest(CcRequest::default()));
202 cc_extension_set = true;
203 } else {
204 return Err(
205 tor_error::internal!(
206 "Congestion control is enabled on this circuit, but 'flowctl-cc' feature is not enabled"
207 )
208 .into()
209 );
210 }
211 }
212 }
213
214 // See whether we need to send a list of required protocol capabilities.
215 // These aren't "negotiated" per se; they're simply demanded.
216 // The relay will refuse the circuit if it doesn't support all of them,
217 // and if any of them isn't supported in the SubprotocolRequest extension.
218 //
219 // (In other words, don't add capabilities here just because you want the
220 // relay to have them! They must be explicitly listed as supported for use
221 // with this extension. For the current list, see
222 // https://spec.torproject.org/tor-spec/create-created-cells.html#subproto-request)
223 //
224 #[allow(unused_mut)]
225 let mut required_protocol_capabilities: Vec<tor_protover::NamedSubver> = Vec::new();
226
227 #[cfg(feature = "counter-galois-onion")]
228 if matches!(self.relay_crypt_protocol(), RelayCryptLayerProtocol::Cgo) {
229 if !cc_extension_set {
230 return Err(tor_error::internal!("Tried to negotiate CGO without CC.").into());
231 }
232 required_protocol_capabilities.push(tor_protover::named::RELAY_CRYPT_CGO);
233 }
234
235 if !required_protocol_capabilities.is_empty() {
236 client_extensions.push(CircRequestExt::SubprotocolRequest(
237 required_protocol_capabilities.into_iter().collect(),
238 ));
239 }
240
241 Ok(client_extensions)
242 }
243}
244
#[cfg(test)]
impl std::default::Default for CircParameters {
    /// Test-only defaults: extend by Ed25519 identity, fixed-window congestion
    /// control test parameters, test flow-control parameters, and no limits on
    /// incoming or outgoing cells.
    fn default() -> Self {
        // `CircParameters::new` leaves both cell limits unset (`None`).
        Self::new(
            true,
            crate::congestion::test_utils::params::build_cc_fixed_params(),
            FlowCtrlParameters::defaults_for_tests(),
        )
    }
}
257
258impl CircParameters {
259 /// Constructor
260 pub fn new(
261 extend_by_ed25519_id: bool,
262 ccontrol: CongestionControlParams,
263 flow_ctrl: FlowCtrlParameters,
264 ) -> Self {
265 Self {
266 extend_by_ed25519_id,
267 ccontrol,
268 flow_ctrl,
269 n_incoming_cells_permitted: None,
270 n_outgoing_cells_permitted: None,
271 }
272 }
273}
274
/// Instructions for sending a RELAY cell.
///
/// This instructs a circuit reactor to send a RELAY cell to a given target
/// (a hop, if we are a client, or the client, if we are a relay).
#[derive(educe::Educe)]
#[educe(Debug)]
pub(crate) struct SendRelayCell {
    /// The hop number, or `None` if we are a relay.
    pub(crate) hop: Option<HopNum>,
    /// Whether to use a RELAY_EARLY cell.
    pub(crate) early: bool,
    /// The cell to send.
    pub(crate) cell: AnyRelayMsgOuter,
}
289
/// The inbound state of a hop.
///
/// Holds the relay cell decoder for the hop and the optional budget of
/// relay cells that may still be received from it.
pub(crate) struct CircHopInbound {
    /// Decodes relay cells received from this hop.
    decoder: RelayCellDecoder,
    /// Remaining permitted incoming relay cells from this hop, plus 1.
    ///
    /// (In other words, `None` represents no limit,
    /// `Some(1)` represents an exhausted limit,
    /// and `Some(n)` means that n-1 more cells may be received.)
    ///
    /// If this ever decrements from Some(1), then the circuit must be torn down with an error.
    n_incoming_cells_permitted: Option<NonZeroU32>,
}
303
/// The outbound state of a hop.
pub(crate) struct CircHopOutbound {
    /// Congestion control object.
    ///
    /// This object is also in charge of handling circuit level SENDME logic for this hop.
    ccontrol: Arc<Mutex<CongestionControl>>,
    /// Map from stream IDs to streams.
    ///
    /// We store this with the reactor instead of the circuit, since the
    /// reactor needs it for every incoming cell on a stream, whereas
    /// the circuit only needs it when allocating new streams.
    ///
    /// NOTE: this is behind a mutex because the client reactor polls the `StreamMap`s
    /// of all hops concurrently, in a `FuturesUnordered`. Without the mutex,
    /// this wouldn't be possible, because it would mean holding multiple
    /// mutable references to `self` (the reactor). Note, however,
    /// that there should never be any contention on this mutex:
    /// we never create more than one
    /// `CircHopList::ready_streams_iterator()` stream
    /// at a time, and we never clone/lock the hop's `StreamMap` outside of it.
    ///
    /// Additionally, the stream map of the last hop (join point) of a conflux tunnel
    /// is shared with all the circuits in the tunnel.
    map: Arc<Mutex<StreamMap>>,
    /// Format to use for relay cells.
    //
    // When we have packed/fragmented cells, this may be replaced by a RelayCellEncoder.
    relay_format: RelayCellFormat,
    /// Flow control parameters for new streams.
    flow_ctrl_params: Arc<FlowCtrlParameters>,
    /// Remaining permitted outgoing relay cells to this hop, plus 1.
    ///
    /// (In other words, `None` represents no limit,
    /// `Some(1)` represents an exhausted limit,
    /// and `Some(n)` means that n-1 more cells may be sent.)
    ///
    /// If this ever decrements from Some(1), then the circuit must be torn down with an error.
    n_outgoing_cells_permitted: Option<NonZeroU32>,
}
339
340impl CircHopInbound {
341 /// Create a new [`CircHopInbound`].
342 pub(crate) fn new(decoder: RelayCellDecoder, settings: &HopSettings) -> Self {
343 Self {
344 decoder,
345 n_incoming_cells_permitted: settings.n_incoming_cells_permitted.map(cvt),
346 }
347 }
348
349 /// Parse a RELAY or RELAY_EARLY cell body.
350 ///
351 /// Requires that the cryptographic checks on the message have already been
352 /// performed
353 pub(crate) fn decode(&mut self, cell: BoxedCellBody) -> Result<RelayCellDecoderResult> {
354 self.decoder
355 .decode(cell)
356 .map_err(|e| Error::from_bytes_err(e, "relay cell"))
357 }
358
359 /// Decrement the limit of inbound cells that may be received from this hop; give
360 /// an error if it would reach zero.
361 pub(crate) fn decrement_cell_limit(&mut self) -> Result<()> {
362 try_decrement_cell_limit(&mut self.n_incoming_cells_permitted)
363 .map_err(|_| Error::ExcessInboundCells)
364 }
365}
366
367impl CircHopOutbound {
368 /// Create a new [`CircHopOutbound`].
369 pub(crate) fn new(
370 ccontrol: Arc<Mutex<CongestionControl>>,
371 relay_format: RelayCellFormat,
372 flow_ctrl_params: Arc<FlowCtrlParameters>,
373 settings: &HopSettings,
374 ) -> Self {
375 Self {
376 ccontrol,
377 map: Arc::new(Mutex::new(StreamMap::new())),
378 relay_format,
379 flow_ctrl_params,
380 n_outgoing_cells_permitted: settings.n_outgoing_cells_permitted.map(cvt),
381 }
382 }
383
384 /// Start a stream. Creates an entry in the stream map with the given channels, and sends the
385 /// `message` to the provided hop.
386 #[allow(clippy::too_many_arguments)]
387 pub(crate) fn begin_stream(
388 &mut self,
389 hop: Option<HopNum>,
390 message: AnyRelayMsg,
391 sender: StreamQueueSender,
392 rx: StreamMpscReceiver<AnyRelayMsg>,
393 rate_limit_updater: watch::Sender<StreamRateLimit>,
394 drain_rate_requester: NotifySender<DrainRateRequest>,
395 cmd_checker: AnyCmdChecker,
396 ) -> Result<(SendRelayCell, StreamId)> {
397 let flow_ctrl = self.build_flow_ctrl(
398 Arc::clone(&self.flow_ctrl_params),
399 rate_limit_updater,
400 drain_rate_requester,
401 )?;
402 let r =
403 self.map
404 .lock()
405 .expect("lock poisoned")
406 .add_ent(sender, rx, flow_ctrl, cmd_checker)?;
407 let cell = AnyRelayMsgOuter::new(Some(r), message);
408 Ok((
409 SendRelayCell {
410 hop,
411 early: false,
412 cell,
413 },
414 r,
415 ))
416 }
417
418 /// Close the stream associated with `id` because the stream was dropped.
419 ///
420 /// If we have not already received an END cell on this stream, send one.
421 /// If no END cell is specified, an END cell with the reason byte set to
422 /// REASON_MISC will be sent.
423 ///
424 // Note(relay): `circ_id` is an opaque displayable type
425 // because relays use a different circuit ID type
426 // than clients. Eventually, we should probably make
427 // them both use the same ID type, or have a nicer approach here
428 pub(crate) fn close_stream(
429 &mut self,
430 circ_id: impl std::fmt::Display,
431 id: StreamId,
432 hop: Option<HopNum>,
433 message: CloseStreamBehavior,
434 why: streammap::TerminateReason,
435 expiry: Instant,
436 ) -> Result<Option<SendRelayCell>> {
437 let should_send_end = self
438 .map
439 .lock()
440 .expect("lock poisoned")
441 .terminate(id, why, expiry)?;
442 trace!(
443 circ_id = %circ_id,
444 stream_id = %id,
445 should_send_end = ?should_send_end,
446 "Ending stream",
447 );
448 // TODO: I am about 80% sure that we only send an END cell if
449 // we didn't already get an END cell. But I should double-check!
450 if let (ShouldSendEnd::Send, CloseStreamBehavior::SendEnd(end_message)) =
451 (should_send_end, message)
452 {
453 let end_cell = AnyRelayMsgOuter::new(Some(id), end_message.into());
454 let cell = SendRelayCell {
455 hop,
456 early: false,
457 cell: end_cell,
458 };
459
460 return Ok(Some(cell));
461 }
462 Ok(None)
463 }
464
465 /// Check if we should send an XON message.
466 ///
467 /// If we should, then returns the XON message that should be sent.
468 pub(crate) fn maybe_send_xon(
469 &mut self,
470 rate: XonKbpsEwma,
471 id: StreamId,
472 ) -> Result<Option<Xon>> {
473 // the call below will return an error if XON/XOFF aren't supported,
474 // so we check for support here
475 if !self
476 .ccontrol()
477 .lock()
478 .expect("poisoned lock")
479 .uses_xon_xoff()
480 {
481 return Ok(None);
482 }
483
484 let mut map = self.map.lock().expect("lock poisoned");
485 let Some(StreamEntMut::Open(ent)) = map.get_mut(id) else {
486 // stream went away
487 return Ok(None);
488 };
489
490 ent.maybe_send_xon(rate)
491 }
492
493 /// Check if we should send an XOFF message.
494 ///
495 /// If we should, then returns the XOFF message that should be sent.
496 pub(crate) fn maybe_send_xoff(&mut self, id: StreamId) -> Result<Option<Xoff>> {
497 // the call below will return an error if XON/XOFF aren't supported,
498 // so we check for support here
499 if !self
500 .ccontrol()
501 .lock()
502 .expect("poisoned lock")
503 .uses_xon_xoff()
504 {
505 return Ok(None);
506 }
507
508 let mut map = self.map.lock().expect("lock poisoned");
509 let Some(StreamEntMut::Open(ent)) = map.get_mut(id) else {
510 // stream went away
511 return Ok(None);
512 };
513
514 ent.maybe_send_xoff()
515 }
516
/// Return the format that is used for relay cells sent to this hop.
///
/// For the most part, this format isn't necessary to interact with a CircHop;
/// it becomes relevant when we are deciding _what_ we can encode for the hop.
///
/// The value is the `relay_format` that was passed to [`CircHopOutbound::new`].
pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
    self.relay_format
}
524
/// Test helper: forward to `CongestionControl::send_window_and_expected_tags`
/// while holding the congestion control lock.
#[cfg(test)]
pub(crate) fn send_window_and_expected_tags(&self) -> (u32, Vec<SendmeTag>) {
    let cc = self.ccontrol().lock().expect("poisoned lock");
    cc.send_window_and_expected_tags()
}
533
534 /// Return the number of open streams on this hop.
535 ///
536 /// WARNING: because this locks the stream map mutex,
537 /// it should never be called from a context where that mutex is already locked.
538 pub(crate) fn n_open_streams(&self) -> usize {
539 self.map.lock().expect("lock poisoned").n_open_streams()
540 }
541
/// Return a reference to our CongestionControl object.
///
/// The object is shared behind an `Arc<Mutex<_>>`; callers lock it themselves.
pub(crate) fn ccontrol(&self) -> &Arc<Mutex<CongestionControl>> {
    &self.ccontrol
}
546
547 /// We're about to send `msg`.
548 ///
549 /// See [`OpenStreamEnt::about_to_send`](crate::streammap::OpenStreamEnt::about_to_send).
550 //
551 // TODO prop340: This should take a cell or similar, not a message.
552 //
553 // Note(relay): `circ_id` is an opaque displayable type
554 // because relays use a different circuit ID type
555 // than clients. Eventually, we should probably make
556 // them both use the same ID type, or have a nicer approach here
557 pub(crate) fn about_to_send(
558 &mut self,
559 circ_id: impl std::fmt::Display,
560 stream_id: StreamId,
561 msg: &AnyRelayMsg,
562 ) -> Result<()> {
563 let mut hop_map = self.map.lock().expect("lock poisoned");
564 let Some(StreamEntMut::Open(ent)) = hop_map.get_mut(stream_id) else {
565 // This can happen when we have outgoing data queued when we received an END.
566 // We shouldn't return an error here since it would close the circuit along with all
567 // other streams, and instead we just let the caller send this message anyways.
568 // Also the caller only calls `about_to_send()` for DATA cells,
569 // which means that other non-DATA cells don't hit this code path and are always sent,
570 // and so we should handle all cell types consistently.
571 // TODO: We should drop the message and not send it,
572 // but the caller of `about_to_send()` isn't designed to handle fallible sends
573 // so it would need some refactoring to handle this.
574 debug!(
575 circ_id = %circ_id,
576 stream_id = %stream_id,
577 "sending a relay cell for non-existent or non-open stream!",
578 );
579 return Ok(());
580 };
581
582 ent.about_to_send(msg)
583 }
584
/// Add an entry to this hop's stream map using the specified `stream_id`.
///
/// The flow control state for the new stream is built from this hop's flow
/// control parameters together with `rate_limit_updater` and
/// `drain_rate_requester`.
///
/// # Errors
///
/// Fails if the flow control state cannot be built, or if the stream map
/// rejects the entry.
#[cfg(any(feature = "hs-service", feature = "relay"))]
pub(crate) fn add_ent_with_id(
    &self,
    sink: StreamQueueSender,
    rx: StreamMpscReceiver<AnyRelayMsg>,
    rate_limit_updater: watch::Sender<StreamRateLimit>,
    drain_rate_requester: NotifySender<DrainRateRequest>,
    stream_id: StreamId,
    cmd_checker: AnyCmdChecker,
) -> Result<()> {
    // Build the flow control state *before* taking the stream map lock, as
    // `begin_stream` does: `build_flow_ctrl` locks the congestion control
    // mutex, and we don't want to acquire that while holding the map mutex.
    let flow_ctrl = self.build_flow_ctrl(
        Arc::clone(&self.flow_ctrl_params),
        rate_limit_updater,
        drain_rate_requester,
    )?;

    let mut hop_map = self.map.lock().expect("lock poisoned");
    hop_map.add_ent_with_id(sink, rx, flow_ctrl, stream_id, cmd_checker)?;

    Ok(())
}
611
612 /// Builds the reactor's flow control handler for a new stream.
613 // TODO: remove the `Result` once we remove the "flowctl-cc" feature
614 #[cfg_attr(feature = "flowctl-cc", expect(clippy::unnecessary_wraps))]
615 fn build_flow_ctrl(
616 &self,
617 params: Arc<FlowCtrlParameters>,
618 rate_limit_updater: watch::Sender<StreamRateLimit>,
619 drain_rate_requester: NotifySender<DrainRateRequest>,
620 ) -> Result<StreamFlowCtrl> {
621 if self
622 .ccontrol()
623 .lock()
624 .expect("poisoned lock")
625 .uses_stream_sendme()
626 {
627 let window = sendme::StreamSendWindow::new(SEND_WINDOW_INIT);
628 Ok(StreamFlowCtrl::new_window(window))
629 } else {
630 cfg_if::cfg_if! {
631 if #[cfg(feature = "flowctl-cc")] {
632 // TODO: Currently arti only supports clients, and we don't support connecting
633 // to onion services while using congestion control, so we hardcode this. In the
634 // future we will need to somehow tell the `CircHop` this so that we can set it
635 // correctly, since we don't want to enable this at exits.
636 let use_sidechannel_mitigations = true;
637
638 Ok(StreamFlowCtrl::new_xon_xoff(
639 params,
640 use_sidechannel_mitigations,
641 rate_limit_updater,
642 drain_rate_requester,
643 ))
644 } else {
645 drop(params);
646 drop(rate_limit_updater);
647 drop(drain_rate_requester);
648 Err(internal!(
649 "`CongestionControl` doesn't use sendmes, but 'flowctl-cc' feature not enabled",
650 ).into())
651 }
652 }
653 }
654 }
655
656 /// Deliver `msg` to the specified open stream entry `ent`.
657 fn deliver_msg_to_stream(
658 streamid: StreamId,
659 ent: &mut OpenStreamEnt,
660 cell_counts_toward_windows: bool,
661 msg: UnparsedRelayMsg,
662 ) -> Result<bool> {
663 use tor_async_utils::SinkTrySend as _;
664 use tor_async_utils::SinkTrySendError as _;
665
666 // The stream for this message exists, and is open.
667
668 // We need to handle SENDME/XON/XOFF messages here, not in the stream's recv() method, or
669 // else we'd never notice them if the stream isn't reading.
670 //
671 // TODO: this logic is the same as `HalfStream::handle_msg`; we should refactor this if
672 // possible
673 match msg.cmd() {
674 RelayCmd::SENDME => {
675 ent.put_for_incoming_sendme(msg)?;
676 return Ok(false);
677 }
678 RelayCmd::XON => {
679 ent.handle_incoming_xon(msg)?;
680 return Ok(false);
681 }
682 RelayCmd::XOFF => {
683 ent.handle_incoming_xoff(msg)?;
684 return Ok(false);
685 }
686 _ => {}
687 }
688
689 let message_closes_stream = ent.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
690
691 if let Err(e) = Pin::new(&mut ent.sink).try_send(msg) {
692 if e.is_full() {
693 cfg_if::cfg_if! {
694 if #[cfg(not(feature = "flowctl-cc"))] {
695 // If we get here, we either have a logic bug (!), or an attacker
696 // is sending us more cells than we asked for via congestion control.
697 return Err(Error::CircProto(format!(
698 "Stream sink would block; received too many cells on stream ID {}",
699 sv(streamid),
700 )));
701 } else {
702 return Err(internal!(
703 "Stream (ID {}) uses an unbounded queue, but apparently it's full?",
704 sv(streamid),
705 )
706 .into());
707 }
708 }
709 }
710 if e.is_disconnected() && cell_counts_toward_windows {
711 // the other side of the stream has gone away; remember
712 // that we received a cell that we couldn't queue for it.
713 //
714 // Later this value will be recorded in a half-stream.
715 ent.dropped += 1;
716 }
717 }
718
719 Ok(message_closes_stream)
720 }
721
/// Record that an END message (or any other message that ends the stream)
/// has arrived on the stream `stream_id`.
///
/// This locks the stream map and forwards to
/// [`StreamMap::ending_msg_received`](crate::streammap::StreamMap::ending_msg_received).
#[cfg(feature = "hs-service")]
pub(crate) fn ending_msg_received(&self, stream_id: StreamId) -> Result<()> {
    self.map
        .lock()
        .expect("lock poisoned")
        .ending_msg_received(stream_id)?;

    Ok(())
}
734
/// Handle `msg`, delivering it to the stream with the specified `streamid` if appropriate.
///
/// Returns back the provided `msg`, if the message is an incoming stream request
/// that needs to be handled by the calling code.
///
/// The outcome depends on the state of `streamid` in this hop's stream map:
///
///  * open stream: the message is delivered, and the stream is marked as
///    ended if the message closes it;
///  * stream on which we sent an END, with `now` at or past its expiry:
///    the error built by `possible_proto_violation_err` is returned;
///  * stream on which we sent an END, not yet expired: the message is
///    given to the half-closed stream state
///    (with the `hs-service` feature, a BEGIN, BEGIN_DIR or RESOLVE instead
///    removes the old entry and is handed back to the caller);
///  * no such stream: with the `hs-service` feature, a BEGIN, BEGIN_DIR or
///    RESOLVE is handed back to the caller; anything else yields the error
///    built by `possible_proto_violation_err`.
///
/// `cell_counts_toward_windows` is passed through to the delivery step for
/// open streams.
///
// TODO: the above is a bit of a code smell -- we should try to avoid passing the msg
// back and forth like this.
pub(crate) fn handle_msg<F>(
    &self,
    possible_proto_violation_err: F,
    cell_counts_toward_windows: bool,
    streamid: StreamId,
    msg: UnparsedRelayMsg,
    now: Instant,
) -> Result<Option<UnparsedRelayMsg>>
where
    F: FnOnce(StreamId) -> Error,
{
    // The map stays locked for the whole function.
    let mut hop_map = self.map.lock().expect("lock poisoned");

    match hop_map.get_mut(streamid) {
        Some(StreamEntMut::Open(ent)) => {
            // Can't have a stream level SENDME when congestion control is enabled.
            let message_closes_stream =
                Self::deliver_msg_to_stream(streamid, ent, cell_counts_toward_windows, msg)?;

            if message_closes_stream {
                hop_map.ending_msg_received(streamid)?;
            }
        }
        // This guard must come before the other `EndSent` arms: once the
        // entry has expired, nothing further is accepted on it.
        Some(StreamEntMut::EndSent(EndSentStreamEnt { expiry, .. })) if now >= *expiry => {
            return Err(possible_proto_violation_err(streamid));
        }
        #[cfg(feature = "hs-service")]
        Some(StreamEntMut::EndSent(_))
            if matches!(
                msg.cmd(),
                RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
            ) =>
        {
            // If the other side is sending us a BEGIN but hasn't yet acknowledged our END
            // message, just remove the old stream from the map and stop waiting for a
            // response
            hop_map.ending_msg_received(streamid)?;
            return Ok(Some(msg));
        }
        Some(StreamEntMut::EndSent(EndSentStreamEnt { half_stream, .. })) => {
            // We sent an end but maybe the other side hasn't heard.

            match half_stream.handle_msg(msg)? {
                StreamStatus::Open => {}
                StreamStatus::Closed => {
                    hop_map.ending_msg_received(streamid)?;
                }
            }
        }
        #[cfg(feature = "hs-service")]
        None if matches!(
            msg.cmd(),
            RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
        ) =>
        {
            // An incoming stream request on an unused stream ID:
            // let the caller decide what to do with it.
            return Ok(Some(msg));
        }
        _ => {
            // No stream wants this message, or ever did.
            return Err(possible_proto_violation_err(streamid));
        }
    }

    // The message was consumed here; nothing is handed back.
    Ok(None)
}
807
/// Get the stream map of this hop.
///
/// The map is shared behind an `Arc<Mutex<_>>`; callers lock it themselves.
pub(crate) fn stream_map(&self) -> &Arc<Mutex<StreamMap>> {
    &self.map
}
812
813 /// Set the stream map of this hop to `map`.
814 ///
815 /// Returns an error if the existing stream map of the hop has any open stream.
816 pub(crate) fn set_stream_map(&mut self, map: Arc<Mutex<StreamMap>>) -> StdResult<(), Bug> {
817 if self.n_open_streams() != 0 {
818 return Err(internal!("Tried to discard existing open streams?!"));
819 }
820
821 self.map = map;
822
823 Ok(())
824 }
825
826 /// Decrement the limit of outbound cells that may be sent to this hop; give
827 /// an error if it would reach zero.
828 pub(crate) fn decrement_cell_limit(&mut self) -> Result<()> {
829 try_decrement_cell_limit(&mut self.n_outgoing_cells_permitted)
830 .map_err(|_| Error::ExcessOutboundCells)
831 }
832}
833
/// Consume one unit from a "remaining cells, plus one" counter.
///
/// * `None` (no limit): nothing changes and `Ok(())` is returned.
/// * `Some(1)` (limit exhausted): nothing changes and `Err(())` is returned.
/// * `Some(n)` with `n > 1`: the counter becomes `Some(n - 1)` and `Ok(())`
///   is returned.
#[inline]
fn try_decrement_cell_limit(val: &mut Option<NonZeroU32>) -> StdResult<(), ()> {
    // No limit configured: there is nothing to count down.
    let Some(remaining) = val else {
        return Ok(());
    };

    let current = remaining.get();
    if current == 1 {
        return Err(());
    }

    // `current` is at least 2 here, so the new value is never zero.
    *remaining = NonZeroU32::new(current - 1).expect("NonZeroU32 was zero?!");
    Ok(())
}
852
/// Convert a cell limit from the form used in a `HopSettings` ("cells
/// permitted") to the more compact form used here ("cells permitted, plus one").
///
/// The addition saturates, so a limit of `u32::MAX` maps to `u32::MAX`.
fn cvt(limit: u32) -> NonZeroU32 {
    // See "known limitations" comment on n_incoming_cells_permitted.
    let plus_one = limit.saturating_add(1);
    NonZeroU32::new(plus_one).expect("Adding one left it as zero?")
}