Skip to main content

tor_proto/client/reactor/
circuit.rs

1//! Module exposing types for representing circuits in the tunnel reactor.
2
3pub(crate) mod circhop;
4pub(super) mod extender;
5
6use crate::channel::Channel;
7use crate::circuit::cell_sender::CircuitCellSender;
8use crate::circuit::celltypes::CreateResponse;
9use crate::circuit::circhop::HopSettings;
10use crate::circuit::create::{Create2Wrap, CreateFastWrap, CreateHandshakeWrap};
11use crate::circuit::padding::CircPaddingDisposition;
12use crate::circuit::{CircuitRxReceiver, UniqId};
13use crate::client::circuit::handshake::{BoxedClientLayer, HandshakeRole};
14use crate::client::circuit::padding::{
15    self, PaddingController, PaddingEventStream, QueuedCellPaddingInfo,
16};
17use crate::client::circuit::{ClientCircChanMsg, MutableState, path};
18use crate::client::reactor::MetaCellDisposition;
19use crate::congestion::CongestionSignals;
20use crate::congestion::sendme;
21use crate::crypto::binding::CircuitBinding;
22use crate::crypto::cell::{
23    HopNum, InboundClientCrypt, InboundClientLayer, OutboundClientCrypt, OutboundClientLayer,
24    RelayCellBody,
25};
26use crate::crypto::handshake::fast::CreateFastClient;
27use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
28use crate::crypto::handshake::ntor_v3::{NtorV3Client, NtorV3PublicKey};
29use crate::crypto::handshake::{ClientHandshake, KeyGenerator};
30use crate::memquota::{CircuitAccount, SpecificAccount as _, StreamAccount};
31use crate::stream::cmdcheck::{AnyCmdChecker, StreamStatus};
32use crate::stream::flow_ctrl::state::StreamRateLimit;
33use crate::stream::flow_ctrl::xon_xoff::reader::DrainRateRequest;
34use crate::stream::queue::{StreamQueueSender, stream_queue};
35use crate::stream::{StreamMpscReceiver, msg_streamid};
36use crate::streammap;
37use crate::tunnel::TunnelScopedCircId;
38use crate::util::err::ReactorError;
39use crate::util::notify::NotifySender;
40use crate::util::timeout::TimeoutEstimator;
41use crate::{ClockSkew, Error, Result};
42
43use tor_async_utils::{SinkTrySend as _, SinkTrySendError as _};
44use tor_cell::chancell::msg::{AnyChanMsg, HandshakeType, Relay};
45use tor_cell::chancell::{AnyChanCell, ChanCmd, CircId};
46use tor_cell::chancell::{BoxedCellBody, ChanMsg};
47use tor_cell::relaycell::msg::{AnyRelayMsg, End, Sendme, SendmeTag, Truncated};
48use tor_cell::relaycell::{
49    AnyRelayMsgOuter, RelayCellDecoderResult, RelayCellFormat, RelayCmd, StreamId, UnparsedRelayMsg,
50};
51use tor_error::{Bug, internal};
52use tor_linkspec::RelayIds;
53use tor_llcrypto::pk;
54use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
55use web_time_compat::{Duration, Instant, SystemTime};
56
57use futures::SinkExt as _;
58use oneshot_fused_workaround as oneshot;
59use postage::watch;
60use tor_rtcompat::{DynTimeProvider, SleepProvider as _};
61use tracing::{debug, instrument, trace, warn};
62
63use super::{
64    CellHandlers, CircuitHandshake, CloseStreamBehavior, ReactorResultChannel, SendRelayCell,
65};
66
67use crate::conflux::msghandler::ConfluxStatus;
68
69use std::borrow::Borrow;
70use std::pin::Pin;
71use std::result::Result as StdResult;
72use std::sync::Arc;
73
74use extender::HandshakeAuxDataHandler;
75
76#[cfg(feature = "hs-service")]
77use {
78    crate::circuit::CircHopSyncView,
79    crate::client::stream::{InboundDataCmdChecker, IncomingStreamRequest},
80    tor_cell::relaycell::msg::Begin,
81};
82
83#[cfg(feature = "conflux")]
84use {
85    crate::conflux::msghandler::{ConfluxAction, ConfluxCmd, ConfluxMsgHandler, OooRelayMsg},
86    crate::tunnel::TunnelId,
87};
88
89#[cfg(not(feature = "flowctl-cc"))]
90use crate::stream::STREAM_READER_BUFFER;
91
92pub(super) use circhop::{CircHop, CircHopList};
93
94/// A circuit "leg" from a tunnel.
95///
96/// Regular (non-multipath) circuits have a single leg.
97/// Conflux (multipath) circuits have `N` (usually, `N = 2`).
98pub(crate) struct Circuit {
99    /// The time provider.
100    runtime: DynTimeProvider,
101    /// The channel this circuit is attached to.
102    channel: Arc<Channel>,
103    /// Sender object used to actually send cells.
104    ///
105    /// NOTE: Control messages could potentially add unboundedly to this, although that's
106    ///       not likely to happen (and isn't triggereable from the network, either).
107    pub(super) chan_sender: CircuitCellSender,
108    /// Input stream, on which we receive ChanMsg objects from this circuit's
109    /// channel.
110    ///
111    // TODO: could use a SPSC channel here instead.
112    pub(super) input: CircuitRxReceiver,
113    /// The cryptographic state for this circuit for inbound cells.
114    /// This object is divided into multiple layers, each of which is
115    /// shared with one hop of the circuit.
116    crypto_in: InboundClientCrypt,
117    /// The cryptographic state for this circuit for outbound cells.
118    crypto_out: OutboundClientCrypt,
119    /// List of hops state objects used by the reactor
120    pub(super) hops: CircHopList,
121    /// Mutable information about this circuit,
122    /// shared with the reactor's `ConfluxSet`.
123    mutable: Arc<MutableState>,
124    /// This circuit's identifier on the upstream channel.
125    channel_id: CircId,
126    /// An identifier for logging about this reactor's circuit.
127    unique_id: TunnelScopedCircId,
128    /// A handler for conflux cells.
129    ///
130    /// Set once the conflux handshake is initiated by the reactor
131    /// using [`Reactor::handle_link_circuits`](super::Reactor::handle_link_circuits).
132    #[cfg(feature = "conflux")]
133    conflux_handler: Option<ConfluxMsgHandler>,
134    /// A padding controller to which padding-related events should be reported.
135    padding_ctrl: PaddingController,
136    /// An event stream telling us about padding-related events.
137    //
138    // TODO: it would be nice to have all of these streams wrapped in a single
139    // SelectAll, but we can't really do that, since we need the ability to move them
140    // from one conflux set to another, and a SelectAll doesn't let you actually
141    // remove one of its constituent streams.  This issue might get solved along
142    // with the rest of the next reactor refactoring.
143    pub(super) padding_event_stream: PaddingEventStream,
144    /// Current rules for blocking traffic, according to the padding controller.
145    #[cfg(feature = "circ-padding")]
146    padding_block: Option<padding::StartBlocking>,
147    /// The circuit timeout estimator.
148    ///
149    /// Used for computing half-stream expiration.
150    timeouts: Arc<dyn TimeoutEstimator>,
151    /// Memory quota account
152    #[allow(dead_code)] // Partly here to keep it alive as long as the circuit
153    memquota: CircuitAccount,
154}
155
156/// A command to run in response to a circuit event.
157///
158/// Unlike `RunOnceCmdInner`, doesn't know anything about `UniqId`s.
159/// The user of the `CircuitCmd`s is supposed to know the `UniqId`
160/// of the circuit the `CircuitCmd` came from.
161///
162/// This type gets mapped to a `RunOnceCmdInner` in the circuit reactor.
163#[derive(Debug, derive_more::From)]
164pub(super) enum CircuitCmd {
165    /// Send a RELAY cell on the circuit leg this command originates from.
166    Send(SendRelayCell),
167    /// Handle a SENDME message received on the circuit leg this command originates from.
168    HandleSendMe {
169        /// The hop number.
170        hop: HopNum,
171        /// The SENDME message to handle.
172        sendme: Sendme,
173    },
174    /// Close the specified stream on the circuit leg this command originates from.
175    CloseStream {
176        /// The hop number.
177        hop: HopNum,
178        /// The ID of the stream to close.
179        sid: StreamId,
180        /// The stream-closing behavior.
181        behav: CloseStreamBehavior,
182        /// The reason for closing the stream.
183        reason: streammap::TerminateReason,
184    },
185    /// Perform an action resulting from handling a conflux cell.
186    #[cfg(feature = "conflux")]
187    Conflux(ConfluxCmd),
188    /// Perform a clean shutdown on this circuit.
189    CleanShutdown,
190    /// Enqueue an out-of-order cell in the reactor.
191    #[cfg(feature = "conflux")]
192    Enqueue(OooRelayMsg),
193}
194
195/// Return a `CircProto` error for the specified unsupported cell.
196///
197/// This error will shut down the reactor.
198///
199/// Note: this is a macro to simplify usage (this way the caller doesn't
200/// need to .map() the result to the appropriate type)
201macro_rules! unsupported_client_cell {
202    ($msg:expr) => {{
203        unsupported_client_cell!(@ $msg, "")
204    }};
205
206    ($msg:expr, $hopnum:expr) => {{
207        let hop: HopNum = $hopnum;
208        let hop_display = format!(" from hop {}", hop.display());
209        unsupported_client_cell!(@ $msg, hop_display)
210    }};
211
212    (@ $msg:expr, $hopnum_display:expr) => {
213        Err(crate::Error::CircProto(format!(
214            "Unexpected {} cell{} on client circuit",
215            $msg.cmd(),
216            $hopnum_display,
217        )))
218    };
219}
220
221pub(super) use unsupported_client_cell;
222
223impl Circuit {
224    /// Create a new non-multipath circuit.
225    #[allow(clippy::too_many_arguments)]
226    pub(super) fn new(
227        runtime: DynTimeProvider,
228        channel: Arc<Channel>,
229        channel_id: CircId,
230        unique_id: TunnelScopedCircId,
231        input: CircuitRxReceiver,
232        memquota: CircuitAccount,
233        mutable: Arc<MutableState>,
234        padding_ctrl: PaddingController,
235        padding_event_stream: PaddingEventStream,
236        timeouts: Arc<dyn TimeoutEstimator>,
237    ) -> Self {
238        let chan_sender = CircuitCellSender::from_channel_sender(channel.sender());
239
240        let crypto_out = OutboundClientCrypt::new();
241        Circuit {
242            runtime,
243            channel,
244            chan_sender,
245            input,
246            crypto_in: InboundClientCrypt::new(),
247            hops: CircHopList::default(),
248            unique_id,
249            channel_id,
250            crypto_out,
251            mutable,
252            #[cfg(feature = "conflux")]
253            conflux_handler: None,
254            padding_ctrl,
255            padding_event_stream,
256            #[cfg(feature = "circ-padding")]
257            padding_block: None,
258            timeouts,
259            memquota,
260        }
261    }
262
263    /// Return the process-unique identifier of this circuit.
264    pub(super) fn unique_id(&self) -> UniqId {
265        self.unique_id.unique_id()
266    }
267
268    /// Return the shared mutable state of this circuit.
269    pub(super) fn mutable(&self) -> &Arc<MutableState> {
270        &self.mutable
271    }
272
273    /// Add this circuit to a multipath tunnel, by associating it with a new [`TunnelId`],
274    /// and installing a [`ConfluxMsgHandler`] on this circuit.
275    ///
276    /// Once this is called, the circuit will be able to handle conflux cells.
277    #[cfg(feature = "conflux")]
278    pub(super) fn add_to_conflux_tunnel(
279        &mut self,
280        tunnel_id: TunnelId,
281        conflux_handler: ConfluxMsgHandler,
282    ) {
283        self.unique_id = TunnelScopedCircId::new(tunnel_id, self.unique_id.unique_id());
284        self.conflux_handler = Some(conflux_handler);
285    }
286
287    /// Send a LINK cell to the specified hop.
288    ///
289    /// This must be called *after* a [`ConfluxMsgHandler`] is installed
290    /// on the circuit with [`add_to_conflux_tunnel`](Self::add_to_conflux_tunnel).
291    #[cfg(feature = "conflux")]
292    pub(super) async fn begin_conflux_link(
293        &mut self,
294        hop: HopNum,
295        cell: AnyRelayMsgOuter,
296        runtime: &tor_rtcompat::DynTimeProvider,
297    ) -> Result<()> {
298        use tor_rtcompat::SleepProvider as _;
299
300        if self.conflux_handler.is_none() {
301            return Err(internal!(
302                "tried to send LINK cell before installing a ConfluxMsgHandler?!"
303            )
304            .into());
305        }
306
307        let cell = SendRelayCell {
308            hop: Some(hop),
309            early: false,
310            cell,
311        };
312        self.send_relay_cell(cell).await?;
313
314        let Some(conflux_handler) = self.conflux_handler.as_mut() else {
315            return Err(internal!("ConfluxMsgHandler disappeared?!").into());
316        };
317
318        Ok(conflux_handler.note_link_sent(runtime.wallclock())?)
319    }
320
321    /// Get the wallclock time when the handshake on this circuit is supposed to time out.
322    ///
323    /// Returns `None` if the handshake is not currently in progress.
324    pub(super) fn conflux_hs_timeout(&self) -> Option<SystemTime> {
325        cfg_if::cfg_if! {
326            if #[cfg(feature = "conflux")] {
327                self.conflux_handler.as_ref().map(|handler| handler.handshake_timeout())?
328            } else {
329                None
330            }
331        }
332    }
333
334    /// Handle a [`CtrlMsg::AddFakeHop`](super::CtrlMsg::AddFakeHop) message.
335    #[cfg(test)]
336    pub(super) fn handle_add_fake_hop(
337        &mut self,
338        format: RelayCellFormat,
339        fwd_lasthop: bool,
340        rev_lasthop: bool,
341        dummy_peer_id: path::HopDetail,
342        // TODO-CGO: Take HopSettings instead of CircParams.
343        // (Do this after we've got the virtual-hop refactorings done for
344        // virtual extending.)
345        params: &crate::client::circuit::CircParameters,
346        done: ReactorResultChannel<()>,
347    ) {
348        use tor_protover::{Protocols, named};
349
350        use crate::client::circuit::test::DummyCrypto;
351
352        assert!(matches!(format, RelayCellFormat::V0));
353        let _ = format; // TODO-CGO: remove this once we have CGO+hs implemented.
354
355        let fwd = Box::new(DummyCrypto::new(fwd_lasthop));
356        let rev = Box::new(DummyCrypto::new(rev_lasthop));
357        let binding = None;
358
359        let settings = HopSettings::from_params_and_caps(
360            // This is for testing only, so we'll assume full negotiation took place.
361            crate::circuit::circhop::HopNegotiationType::Full,
362            params,
363            &[named::FLOWCTRL_CC].into_iter().collect::<Protocols>(),
364        )
365        .expect("Can't construct HopSettings");
366        self.add_hop(dummy_peer_id, fwd, rev, binding, &settings)
367            .expect("could not add hop to circuit");
368        let _ = done.send(Ok(()));
369    }
370
371    /// Encode `msg` and encrypt it, returning the resulting cell
372    /// and tag that should be expected for an authenticated SENDME sent
373    /// in response to that cell.
374    fn encode_relay_cell(
375        crypto_out: &mut OutboundClientCrypt,
376        relay_format: RelayCellFormat,
377        hop: HopNum,
378        early: bool,
379        msg: AnyRelayMsgOuter,
380    ) -> Result<(AnyChanMsg, SendmeTag)> {
381        let mut body: RelayCellBody = msg
382            .encode(relay_format, &mut rand::rng())
383            .map_err(|e| Error::from_cell_enc(e, "relay cell body"))?
384            .into();
385        let cmd = if early {
386            ChanCmd::RELAY_EARLY
387        } else {
388            ChanCmd::RELAY
389        };
390        let tag = crypto_out.encrypt(cmd, &mut body, hop)?;
391        let msg = Relay::from(BoxedCellBody::from(body));
392        let msg = if early {
393            AnyChanMsg::RelayEarly(msg.into())
394        } else {
395            AnyChanMsg::Relay(msg)
396        };
397
398        Ok((msg, tag))
399    }
400
401    /// Encode `msg`, encrypt it, and send it to the 'hop'th hop.
402    ///
403    /// If there is insufficient outgoing *circuit-level* or *stream-level*
404    /// SENDME window, an error is returned instead.
405    ///
406    /// Does not check whether the cell is well-formed or reasonable.
407    ///
408    /// NOTE: the reactor should not call this function directly, only via
409    /// [`ConfluxSet::send_relay_cell_on_leg`](super::conflux::ConfluxSet::send_relay_cell_on_leg),
410    /// which will reroute the message, if necessary to the primary leg.
411    #[instrument(level = "trace", skip_all)]
412    pub(super) async fn send_relay_cell(&mut self, msg: SendRelayCell) -> Result<()> {
413        self.send_relay_cell_inner(msg, None).await
414    }
415
416    /// As [`send_relay_cell`](Self::send_relay_cell), but takes an optional
417    /// [`QueuedCellPaddingInfo`] in `padding_info`.
418    ///
419    /// If `padding_info` is None, `msg` must be non-padding: we report it as such to the
420    /// padding controller.
421    #[instrument(level = "trace", skip_all)]
422    async fn send_relay_cell_inner(
423        &mut self,
424        msg: SendRelayCell,
425        padding_info: Option<QueuedCellPaddingInfo>,
426    ) -> Result<()> {
427        let SendRelayCell {
428            hop,
429            early,
430            cell: msg,
431        } = msg;
432
433        let is_conflux_link = msg.cmd() == RelayCmd::CONFLUX_LINK;
434        if !is_conflux_link && self.is_conflux_pending() {
435            // Note: it is the responsibility of the reactor user to wait until
436            // at least one of the legs completes the handshake.
437            return Err(internal!("tried to send cell on unlinked circuit").into());
438        }
439
440        trace!(circ_id = %self.unique_id, cell = ?msg, "sending relay cell");
441
442        // Cloned, because we borrow mutably from self when we get the circhop.
443        let runtime = self.runtime.clone();
444        let c_t_w = sendme::cmd_counts_towards_windows(msg.cmd());
445        let stream_id = msg.stream_id();
446        let hop = hop.expect("missing hop in client SendRelayCell?!");
447        let circhop = self.hops.get_mut(hop).ok_or(Error::NoSuchHop)?;
448
449        // We might be out of capacity entirely; see if we are about to hit a limit.
450        //
451        // TODO: If we ever add a notion of _recoverable_ errors below, we'll
452        // need a way to restore this limit, and similarly for about_to_send().
453        circhop.decrement_outbound_cell_limit()?;
454
455        // We need to apply stream-level flow control *before* encoding the message.
456        if c_t_w {
457            if let Some(stream_id) = stream_id {
458                circhop.about_to_send(stream_id, msg.msg())?;
459            }
460        }
461
462        // Save the RelayCmd of the message before it gets consumed below.
463        // We need this to tell our ConfluxMsgHandler about the cell we've just sent,
464        // so that it can update its counters.
465        let relay_cmd = msg.cmd();
466
467        // NOTE(eta): Now that we've encrypted the cell, we *must* either send it or abort
468        //            the whole circuit (e.g. by returning an error).
469        let (msg, tag) = Self::encode_relay_cell(
470            &mut self.crypto_out,
471            circhop.relay_cell_format(),
472            hop,
473            early,
474            msg,
475        )?;
476        // The cell counted for congestion control, inform our algorithm of such and pass down the
477        // tag for authenticated SENDMEs.
478        if c_t_w {
479            circhop.ccontrol().note_data_sent(&runtime, &tag)?;
480        }
481
482        // Remember that we've enqueued this cell.
483        let padding_info = padding_info.or_else(|| self.padding_ctrl.queued_data(hop));
484
485        self.send_msg(msg, padding_info).await?;
486
487        #[cfg(feature = "conflux")]
488        if let Some(conflux) = self.conflux_handler.as_mut() {
489            conflux.note_cell_sent(relay_cmd);
490        }
491
492        Ok(())
493    }
494
495    /// Helper: process a cell on a channel.  Most cells get ignored
496    /// or rejected; a few get delivered to circuits.
497    ///
498    /// Return `CellStatus::CleanShutdown` if we should exit.
499    ///
500    // TODO: returning `Vec<CircuitCmd>` means we're unnecessarily
501    // allocating a `Vec` here. Generally, the number of commands is going to be small
502    // (usually 1, but > 1 when we start supporting packed cells).
503    //
504    // We should consider using smallvec instead. It might also be a good idea to have a
505    // separate higher-level type splitting this out into Single(CircuitCmd),
506    // and Multiple(SmallVec<[CircuitCmd; <capacity>]>).
507    pub(super) fn handle_cell(
508        &mut self,
509        handlers: &mut CellHandlers,
510        leg: UniqId,
511        cell: ClientCircChanMsg,
512    ) -> Result<Vec<CircuitCmd>> {
513        trace!(circ_id = %self.unique_id, cell = ?cell, "handling cell");
514        use ClientCircChanMsg::*;
515        match cell {
516            Relay(r) => self.handle_relay_cell(handlers, leg, r),
517            Destroy(d) => {
518                let reason = d.reason();
519                debug!(
520                    circ_id = %self.unique_id,
521                    "Received DESTROY cell. Reason: {} [{}]",
522                    reason.human_str(),
523                    reason
524                );
525
526                self.handle_destroy_cell().map(|c| vec![c])
527            }
528        }
529    }
530
531    /// Decode `cell`, returning its corresponding hop number, tag,
532    /// and decoded body.
533    fn decode_relay_cell(
534        &mut self,
535        cell: Relay,
536    ) -> Result<(HopNum, SendmeTag, RelayCellDecoderResult)> {
537        // This is always RELAY, not RELAY_EARLY, so long as this code is client-only.
538        let cmd = cell.cmd();
539        let mut body = cell.into_relay_body().into();
540
541        // Decrypt the cell. If it's recognized, then find the
542        // corresponding hop.
543        let (hopnum, tag) = self.crypto_in.decrypt(cmd, &mut body)?;
544
545        // Decode the cell.
546        let decode_res = self
547            .hop_mut(hopnum)
548            .ok_or_else(|| {
549                Error::from(internal!(
550                    "Trying to decode cell from nonexistent hop {:?}",
551                    hopnum
552                ))
553            })?
554            .decode(body.into())?;
555
556        Ok((hopnum, tag, decode_res))
557    }
558
559    /// React to a Relay or RelayEarly cell.
560    fn handle_relay_cell(
561        &mut self,
562        handlers: &mut CellHandlers,
563        leg: UniqId,
564        cell: Relay,
565    ) -> Result<Vec<CircuitCmd>> {
566        let (hopnum, tag, decode_res) = self.decode_relay_cell(cell)?;
567
568        if decode_res.is_padding() {
569            self.padding_ctrl.decrypted_padding(hopnum)?;
570        } else {
571            self.padding_ctrl.decrypted_data(hopnum);
572        }
573
574        // Check whether we are allowed to receive more data for this circuit hop.
575        self.hop_mut(hopnum)
576            .ok_or_else(|| internal!("nonexistent hop {:?}", hopnum))?
577            .decrement_inbound_cell_limit()?;
578
579        let c_t_w = decode_res.cmds().any(sendme::cmd_counts_towards_windows);
580
581        // Decrement the circuit sendme windows, and see if we need to
582        // send a sendme cell.
583        let send_circ_sendme = if c_t_w {
584            self.hop_mut(hopnum)
585                .ok_or_else(|| Error::CircProto("Sendme from nonexistent hop".into()))?
586                .ccontrol()
587                .note_data_received()?
588        } else {
589            false
590        };
591
592        let mut circ_cmds = vec![];
593        // If we do need to send a circuit-level SENDME cell, do so.
594        if send_circ_sendme {
595            // This always sends a V1 (tagged) sendme cell, and thereby assumes
596            // that SendmeEmitMinVersion is no more than 1.  If the authorities
597            // every increase that parameter to a higher number, this will
598            // become incorrect.  (Higher numbers are not currently defined.)
599            let sendme = Sendme::from(tag);
600            let cell = AnyRelayMsgOuter::new(None, sendme.into());
601            circ_cmds.push(CircuitCmd::Send(SendRelayCell {
602                hop: Some(hopnum),
603                early: false,
604                cell,
605            }));
606
607            // Inform congestion control of the SENDME we are sending. This is a circuit level one.
608            self.hop_mut(hopnum)
609                .ok_or_else(|| {
610                    Error::from(internal!(
611                        "Trying to send SENDME to nonexistent hop {:?}",
612                        hopnum
613                    ))
614                })?
615                .ccontrol()
616                .note_sendme_sent()?;
617        }
618
619        let (mut msgs, incomplete) = decode_res.into_parts();
620        while let Some(msg) = msgs.next() {
621            let msg_status = self.handle_relay_msg(handlers, hopnum, leg, c_t_w, msg)?;
622
623            match msg_status {
624                None => continue,
625                Some(msg @ CircuitCmd::CleanShutdown) => {
626                    for m in msgs {
627                        debug!(
628                            "{id}: Ignoring relay msg received after triggering shutdown: {m:?}",
629                            id = self.unique_id
630                        );
631                    }
632                    if let Some(incomplete) = incomplete {
633                        debug!(
634                            "{id}: Ignoring partial relay msg received after triggering shutdown: {:?}",
635                            incomplete,
636                            id = self.unique_id,
637                        );
638                    }
639                    circ_cmds.push(msg);
640                    return Ok(circ_cmds);
641                }
642                Some(msg) => {
643                    circ_cmds.push(msg);
644                }
645            }
646        }
647
648        Ok(circ_cmds)
649    }
650
651    /// Handle a single incoming relay message.
652    fn handle_relay_msg(
653        &mut self,
654        handlers: &mut CellHandlers,
655        hopnum: HopNum,
656        leg: UniqId,
657        cell_counts_toward_windows: bool,
658        msg: UnparsedRelayMsg,
659    ) -> Result<Option<CircuitCmd>> {
660        // If this msg wants/refuses to have a Stream ID, does it
661        // have/not have one?
662        let streamid = msg_streamid(&msg)?;
663
664        // If this doesn't have a StreamId, it's a meta cell,
665        // not meant for a particular stream.
666        let Some(streamid) = streamid else {
667            return self.handle_meta_cell(handlers, hopnum, msg);
668        };
669
670        #[cfg(feature = "conflux")]
671        let msg = if let Some(conflux) = self.conflux_handler.as_mut() {
672            match conflux.action_for_msg(hopnum, cell_counts_toward_windows, streamid, msg)? {
673                ConfluxAction::Deliver(msg) => {
674                    // The message either doesn't count towards the sequence numbers
675                    // or is already well-ordered, so we're ready to handle it.
676
677                    // It's possible that some of our buffered messages are now ready to be
678                    // handled. We don't check that here, however, because that's handled
679                    // by the reactor main loop.
680                    msg
681                }
682                ConfluxAction::Enqueue(msg) => {
683                    // Tell the reactor to enqueue this msg
684                    return Ok(Some(CircuitCmd::Enqueue(msg)));
685                }
686            }
687        } else {
688            // If we don't have a conflux_handler, it means this circuit is not part of
689            // a conflux tunnel, so we can just process the message.
690            msg
691        };
692
693        self.handle_in_order_relay_msg(
694            handlers,
695            hopnum,
696            leg,
697            cell_counts_toward_windows,
698            streamid,
699            msg,
700        )
701    }
702
703    /// Handle a single incoming relay message that is known to be in order.
704    pub(super) fn handle_in_order_relay_msg(
705        &mut self,
706        handlers: &mut CellHandlers,
707        hopnum: HopNum,
708        leg: UniqId,
709        cell_counts_toward_windows: bool,
710        streamid: StreamId,
711        msg: UnparsedRelayMsg,
712    ) -> Result<Option<CircuitCmd>> {
713        let now = self.runtime.now();
714
715        #[cfg(feature = "conflux")]
716        if let Some(conflux) = self.conflux_handler.as_mut() {
717            conflux.inc_last_seq_delivered(&msg);
718        }
719
720        let path = self.mutable.path();
721
722        let nonexistent_hop_err = || Error::CircProto("Cell from nonexistent hop!".into());
723        let hop = self.hop_mut(hopnum).ok_or_else(nonexistent_hop_err)?;
724
725        let hop_detail = path
726            .iter()
727            .nth(usize::from(hopnum))
728            .ok_or_else(nonexistent_hop_err)?;
729
730        // Returns the original message if it's an incoming stream request
731        // that we need to handle.
732        let res = hop.handle_msg(hop_detail, cell_counts_toward_windows, streamid, msg, now)?;
733
734        // If it was an incoming stream request, we don't need to worry about
735        // sending an XOFF as there's no stream data within this message.
736        if let Some(msg) = res {
737            cfg_if::cfg_if! {
738                if #[cfg(feature = "hs-service")] {
739                    return self.handle_incoming_stream_request(handlers, msg, streamid, hopnum, leg);
740                } else {
741                    return Err(internal!("incoming stream not rejected, but hs-service feature is disabled?!").into());
742                }
743            }
744        }
745
746        // We may want to send an XOFF if the incoming buffer is too large.
747        if let Some(cell) = hop.maybe_send_xoff(streamid)? {
748            let cell = AnyRelayMsgOuter::new(Some(streamid), cell.into());
749            let cell = SendRelayCell {
750                hop: Some(hopnum),
751                early: false,
752                cell,
753            };
754            return Ok(Some(CircuitCmd::Send(cell)));
755        }
756
757        Ok(None)
758    }
759
760    /// Handle a conflux message coming from the specified hop.
761    ///
762    /// Returns an error if
763    ///
764    ///   * this is not a conflux circuit (i.e. it doesn't have a [`ConfluxMsgHandler`])
765    ///   * this is a client circuit and the conflux message originated an unexpected hop
766    ///   * the cell was sent in violation of the handshake protocol
767    #[cfg(feature = "conflux")]
768    fn handle_conflux_msg(
769        &mut self,
770        hop: HopNum,
771        msg: UnparsedRelayMsg,
772    ) -> Result<Option<ConfluxCmd>> {
773        let Some(conflux_handler) = self.conflux_handler.as_mut() else {
774            // If conflux is not enabled, tear down the circuit
775            // (see 4.2.1. Cell Injection Side Channel Mitigations in prop329)
776            return Err(Error::CircProto(format!(
777                "Received {} cell from hop {} on non-conflux client circuit?!",
778                msg.cmd(),
779                hop.display(),
780            )));
781        };
782
783        Ok(conflux_handler.handle_conflux_msg(msg, hop))
784    }
785
786    /// For conflux: return the sequence number of the last cell sent on this leg.
787    ///
788    /// Returns an error if this circuit is not part of a conflux set.
789    #[cfg(feature = "conflux")]
790    pub(super) fn last_seq_sent(&self) -> Result<u64> {
791        let handler = self
792            .conflux_handler
793            .as_ref()
794            .ok_or_else(|| internal!("tried to get last_seq_sent of non-conflux circ"))?;
795
796        Ok(handler.last_seq_sent())
797    }
798
799    /// For conflux: set the sequence number of the last cell sent on this leg.
800    ///
801    /// Returns an error if this circuit is not part of a conflux set.
802    #[cfg(feature = "conflux")]
803    pub(super) fn set_last_seq_sent(&mut self, n: u64) -> Result<()> {
804        let handler = self
805            .conflux_handler
806            .as_mut()
807            .ok_or_else(|| internal!("tried to get last_seq_sent of non-conflux circ"))?;
808
809        handler.set_last_seq_sent(n);
810        Ok(())
811    }
812
813    /// For conflux: return the sequence number of the last cell received on this leg.
814    ///
815    /// Returns an error if this circuit is not part of a conflux set.
816    #[cfg(feature = "conflux")]
817    pub(super) fn last_seq_recv(&self) -> Result<u64> {
818        let handler = self
819            .conflux_handler
820            .as_ref()
821            .ok_or_else(|| internal!("tried to get last_seq_recv of non-conflux circ"))?;
822
823        Ok(handler.last_seq_recv())
824    }
825
826    /// A helper for handling incoming stream requests.
827    ///
828    // TODO: can we make this a method on CircHop to avoid the double HopNum lookup?
829    #[cfg(feature = "hs-service")]
830    fn handle_incoming_stream_request(
831        &mut self,
832        handlers: &mut CellHandlers,
833        msg: UnparsedRelayMsg,
834        stream_id: StreamId,
835        hop_num: HopNum,
836        leg: UniqId,
837    ) -> Result<Option<CircuitCmd>> {
838        use tor_cell::relaycell::msg::EndReason;
839        use tor_error::into_internal;
840        use tor_log_ratelim::log_ratelim;
841
842        use crate::client::circuit::CIRCUIT_BUFFER_SIZE;
843        use crate::stream::incoming::StreamReqInfo;
844
845        // We need to construct this early so that we don't double-borrow &mut self
846
847        let Some(handler) = handlers.incoming_stream_req_handler.as_mut() else {
848            return Err(Error::CircProto(
849                "Cannot handle BEGIN cells on this circuit".into(),
850            ));
851        };
852
853        // The handler's hop_num is only ever set to None for relays.
854        let expected_hop_num = handler
855            .hop_num
856            .ok_or_else(|| internal!("Handler HopNum is None in client impl?!"))?;
857
858        if hop_num != expected_hop_num {
859            return Err(Error::CircProto(format!(
860                "Expecting incoming streams from {}, but received {} cell from unexpected hop {}",
861                expected_hop_num.display(),
862                msg.cmd(),
863                hop_num.display()
864            )));
865        }
866
867        let message_closes_stream = handler.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
868
869        // TODO: we've already looked up the `hop` in handle_relay_cell, so we shouldn't
870        // have to look it up again! However, we can't pass the `&mut hop` reference from
871        // `handle_relay_cell` to this function, because that makes Rust angry (we'd be
872        // borrowing self as mutable more than once).
873        //
874        // TODO: we _could_ use self.hops.get_mut(..) instead self.hop_mut(..) inside
875        // handle_relay_cell to work around the problem described above
876        let hop = self.hops.get_mut(hop_num).ok_or(Error::CircuitClosed)?;
877
878        if message_closes_stream {
879            hop.ending_msg_received(stream_id)?;
880
881            return Ok(None);
882        }
883
884        let begin = msg
885            .decode::<Begin>()
886            .map_err(|e| Error::from_bytes_err(e, "Invalid Begin message"))?
887            .into_msg();
888
889        let req = IncomingStreamRequest::Begin(begin);
890
891        {
892            use crate::client::stream::IncomingStreamRequestDisposition::*;
893
894            let ctx = crate::client::stream::IncomingStreamRequestContext { request: &req };
895            // IMPORTANT: super::syncview::CircHopSyncView::n_open_streams() (called via disposition() below)
896            // accesses the stream map mutexes!
897            //
898            // This means it's very important not to call this function while any of the hop's
899            // stream map mutex is held.
900            let view = CircHopSyncView::new(hop.outbound());
901
902            match handler.filter.as_mut().disposition(&ctx, &view)? {
903                Accept => {}
904                CloseCircuit => return Ok(Some(CircuitCmd::CleanShutdown)),
905                RejectRequest(end) => {
906                    let end_msg = AnyRelayMsgOuter::new(Some(stream_id), end.into());
907                    let cell = SendRelayCell {
908                        hop: Some(hop_num),
909                        early: false,
910                        cell: end_msg,
911                    };
912                    return Ok(Some(CircuitCmd::Send(cell)));
913                }
914            }
915        }
916
917        // TODO: Sadly, we need to look up `&mut hop` yet again,
918        // since we needed to pass `&self.hops` by reference to our filter above. :(
919        let hop = self.hops.get_mut(hop_num).ok_or(Error::CircuitClosed)?;
920        let relay_cell_format = hop.relay_cell_format();
921
922        let memquota = StreamAccount::new(&self.memquota)?;
923
924        let (sender, receiver) = stream_queue(
925            #[cfg(not(feature = "flowctl-cc"))]
926            STREAM_READER_BUFFER,
927            &memquota,
928            self.chan_sender.time_provider(),
929        )?;
930
931        let (msg_tx, msg_rx) = MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(
932            self.chan_sender.time_provider().clone(),
933            memquota.as_raw_account(),
934        )?;
935
936        let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
937
938        // A channel for the reactor to request a new drain rate from the reader.
939        // Typically this notification will be sent after an XOFF is sent so that the reader can
940        // send us a new drain rate when the stream data queue becomes empty.
941        let mut drain_rate_request_tx = NotifySender::new_typed();
942        let drain_rate_request_rx = drain_rate_request_tx.subscribe();
943
944        let cmd_checker = InboundDataCmdChecker::new_connected();
945        hop.add_ent_with_id(
946            sender,
947            msg_rx,
948            rate_limit_tx,
949            drain_rate_request_tx,
950            stream_id,
951            cmd_checker,
952        )?;
953
954        let outcome = Pin::new(&mut handler.incoming_sender).try_send(StreamReqInfo {
955            req,
956            stream_id,
957            hop: Some((leg, hop_num).into()),
958            msg_tx,
959            receiver,
960            rate_limit_stream: rate_limit_rx,
961            drain_rate_request_stream: drain_rate_request_rx,
962            memquota,
963            relay_cell_format,
964        });
965
966        log_ratelim!("Delivering message to incoming stream handler"; outcome);
967
968        if let Err(e) = outcome {
969            if e.is_full() {
970                // The IncomingStreamRequestHandler's stream is full; it isn't
971                // handling requests fast enough. So instead, we reply with an
972                // END cell.
973                let end_msg = AnyRelayMsgOuter::new(
974                    Some(stream_id),
975                    End::new_with_reason(EndReason::RESOURCELIMIT).into(),
976                );
977
978                let cell = SendRelayCell {
979                    hop: Some(hop_num),
980                    early: false,
981                    cell: end_msg,
982                };
983                return Ok(Some(CircuitCmd::Send(cell)));
984            } else if e.is_disconnected() {
985                // The IncomingStreamRequestHandler's stream has been dropped.
986                // In the Tor protocol as it stands, this always means that the
987                // circuit itself is out-of-use and should be closed. (See notes
988                // on `allow_stream_requests.`)
989                //
990                // Note that we will _not_ reach this point immediately after
991                // the IncomingStreamRequestHandler is dropped; we won't hit it
992                // until we next get an incoming request.  Thus, if we do later
993                // want to add early detection for a dropped
994                // IncomingStreamRequestHandler, we need to do it elsewhere, in
995                // a different way.
996                debug!(
997                    circ_id = %self.unique_id,
998                    "Incoming stream request receiver dropped",
999                );
1000                // This will _cause_ the circuit to get closed.
1001                return Err(Error::CircuitClosed);
1002            } else {
1003                // There are no errors like this with the current design of
1004                // futures::mpsc, but we shouldn't just ignore the possibility
1005                // that they'll be added later.
1006                return Err(Error::from((into_internal!(
1007                    "try_send failed unexpectedly"
1008                ))(e)));
1009            }
1010        }
1011
1012        Ok(None)
1013    }
1014
1015    /// Helper: process a destroy cell.
1016    #[allow(clippy::unnecessary_wraps)]
1017    fn handle_destroy_cell(&mut self) -> Result<CircuitCmd> {
1018        // I think there is nothing more to do here.
1019        Ok(CircuitCmd::CleanShutdown)
1020    }
1021
1022    /// Handle a [`CtrlMsg::Create`](super::CtrlMsg::Create) message.
1023    pub(super) async fn handle_create(
1024        &mut self,
1025        recv_created: oneshot::Receiver<CreateResponse>,
1026        handshake: CircuitHandshake,
1027        settings: HopSettings,
1028        done: ReactorResultChannel<()>,
1029    ) -> StdResult<(), ReactorError> {
1030        let ret = match handshake {
1031            CircuitHandshake::CreateFast => self.create_firsthop_fast(recv_created, settings).await,
1032            CircuitHandshake::Ntor {
1033                public_key,
1034                ed_identity,
1035            } => {
1036                self.create_firsthop_ntor(recv_created, ed_identity, public_key, settings)
1037                    .await
1038            }
1039            CircuitHandshake::NtorV3 { public_key } => {
1040                self.create_firsthop_ntor_v3(recv_created, public_key, settings)
1041                    .await
1042            }
1043        };
1044        let _ = done.send(ret); // don't care if sender goes away
1045
1046        // TODO: maybe we don't need to flush here?
1047        // (we could let run_once() handle all the flushing)
1048        self.chan_sender.flush().await?;
1049
1050        Ok(())
1051    }
1052
1053    /// Helper: create the first hop of a circuit.
1054    ///
1055    /// This is parameterized not just on the RNG, but a wrapper object to
1056    /// build the right kind of create cell, and a handshake object to perform
1057    /// the cryptographic handshake.
1058    async fn create_impl<H, W, M>(
1059        &mut self,
1060        recvcreated: oneshot::Receiver<CreateResponse>,
1061        wrap: &W,
1062        key: &H::KeyType,
1063        mut settings: HopSettings,
1064        msg: &M,
1065    ) -> Result<()>
1066    where
1067        H: ClientHandshake + HandshakeAuxDataHandler,
1068        W: CreateHandshakeWrap,
1069        H::KeyGen: KeyGenerator,
1070        M: Borrow<H::ClientAuxData>,
1071    {
1072        // We don't need to shut down the circuit on failure here, since this
1073        // function consumes the PendingClientCirc and only returns
1074        // a ClientCirc on success.
1075
1076        let (state, msg) = H::client1(&mut rand::rng(), key, msg)?;
1077        let create_cell = wrap.to_chanmsg(msg);
1078        trace!(
1079            circ_id = %self.unique_id,
1080            create = %create_cell.cmd(),
1081            "Extending to hop 1",
1082        );
1083        self.send_msg(create_cell, None).await?;
1084
1085        let reply = recvcreated
1086            .await
1087            .map_err(|_| Error::CircProto("Circuit closed while waiting".into()))?;
1088
1089        let relay_handshake = wrap.decode_chanmsg(reply)?;
1090        let (server_msg, keygen) = H::client2(state, relay_handshake)?;
1091
1092        H::handle_server_aux_data(&mut settings, &server_msg)?;
1093
1094        let BoxedClientLayer { fwd, back, binding } = settings
1095            .relay_crypt_protocol()
1096            .construct_client_layers(HandshakeRole::Initiator, keygen)?;
1097
1098        trace!(circ_id = %self.unique_id, "Handshake complete; circuit created.");
1099
1100        let peer_id = self.channel.target().clone();
1101
1102        self.add_hop(
1103            path::HopDetail::Relay(peer_id),
1104            fwd,
1105            back,
1106            binding,
1107            &settings,
1108        )?;
1109        Ok(())
1110    }
1111
1112    /// Use the (questionable!) CREATE_FAST handshake to connect to the
1113    /// first hop of this circuit.
1114    ///
1115    /// There's no authentication in CREATE_FAST,
1116    /// so we don't need to know whom we're connecting to: we're just
1117    /// connecting to whichever relay the channel is for.
1118    async fn create_firsthop_fast(
1119        &mut self,
1120        recvcreated: oneshot::Receiver<CreateResponse>,
1121        settings: HopSettings,
1122    ) -> Result<()> {
1123        // In a CREATE_FAST handshake, we can't negotiate a format other than this.
1124        let wrap = CreateFastWrap;
1125        self.create_impl::<CreateFastClient, _, _>(recvcreated, &wrap, &(), settings, &())
1126            .await
1127    }
1128
1129    /// Use the ntor handshake to connect to the first hop of this circuit.
1130    ///
1131    /// Note that the provided keys must match the channel's target,
1132    /// or the handshake will fail.
1133    async fn create_firsthop_ntor(
1134        &mut self,
1135        recvcreated: oneshot::Receiver<CreateResponse>,
1136        ed_identity: pk::ed25519::Ed25519Identity,
1137        pubkey: NtorPublicKey,
1138        settings: HopSettings,
1139    ) -> Result<()> {
1140        // Exit now if we have an Ed25519 or RSA identity mismatch.
1141        let target = RelayIds::builder()
1142            .ed_identity(ed_identity)
1143            .rsa_identity(pubkey.id)
1144            .build()
1145            .expect("Unable to build RelayIds");
1146        self.channel.check_match(&target)?;
1147
1148        let wrap = Create2Wrap {
1149            handshake_type: HandshakeType::NTOR,
1150        };
1151        self.create_impl::<NtorClient, _, _>(recvcreated, &wrap, &pubkey, settings, &())
1152            .await
1153    }
1154
1155    /// Use the ntor-v3 handshake to connect to the first hop of this circuit.
1156    ///
1157    /// Note that the provided key must match the channel's target,
1158    /// or the handshake will fail.
1159    async fn create_firsthop_ntor_v3(
1160        &mut self,
1161        recvcreated: oneshot::Receiver<CreateResponse>,
1162        pubkey: NtorV3PublicKey,
1163        settings: HopSettings,
1164    ) -> Result<()> {
1165        // Exit now if we have a mismatched key.
1166        let target = RelayIds::builder()
1167            .ed_identity(pubkey.id)
1168            .build()
1169            .expect("Unable to build RelayIds");
1170        self.channel.check_match(&target)?;
1171
1172        // Set the client extensions.
1173        let client_extensions = settings.circuit_request_extensions()?;
1174        let wrap = Create2Wrap {
1175            handshake_type: HandshakeType::NTOR_V3,
1176        };
1177
1178        self.create_impl::<NtorV3Client, _, _>(
1179            recvcreated,
1180            &wrap,
1181            &pubkey,
1182            settings,
1183            &client_extensions,
1184        )
1185        .await
1186    }
1187
1188    /// Add a hop to the end of this circuit.
1189    ///
1190    /// Will return an error if the circuit already has [`u8::MAX`] hops.
1191    pub(super) fn add_hop(
1192        &mut self,
1193        peer_id: path::HopDetail,
1194        fwd: Box<dyn OutboundClientLayer + 'static + Send>,
1195        rev: Box<dyn InboundClientLayer + 'static + Send>,
1196        binding: Option<CircuitBinding>,
1197        settings: &HopSettings,
1198    ) -> StdResult<(), Bug> {
1199        let hop_num = self.hops.len();
1200        debug_assert_eq!(hop_num, usize::from(self.num_hops()));
1201
1202        // There are several places in the code that assume that a `usize` hop number
1203        // can be cast or converted to a `u8` hop number,
1204        // so this check is important to prevent panics or incorrect behaviour.
1205        if hop_num == usize::from(u8::MAX) {
1206            return Err(internal!(
1207                "cannot add more hops to a circuit with `u8::MAX` hops"
1208            ));
1209        }
1210
1211        let hop_num = (hop_num as u8).into();
1212
1213        let hop = CircHop::new(self.unique_id, hop_num, settings);
1214        self.hops.push(hop);
1215        self.crypto_in.add_layer(rev);
1216        self.crypto_out.add_layer(fwd);
1217        self.mutable.add_hop(peer_id, binding);
1218
1219        Ok(())
1220    }
1221
1222    /// Handle a RELAY cell on this circuit with stream ID 0.
1223    ///
1224    /// NOTE(prop349): this is part of Arti's "Base Circuit Hop Handler".
1225    /// This function returns a `CircProto` error if `msg` is an unsupported,
1226    /// unexpected, or otherwise invalid message:
1227    ///
1228    ///   * unexpected messages are rejected by returning an error using
1229    ///     [`unsupported_client_cell`]
1230    ///   * SENDME/TRUNCATED messages are rejected if they don't parse
1231    ///   * SENDME authentication tags are validated inside [`Circuit::handle_sendme`]
1232    ///   * conflux cells are handled in the client [`ConfluxMsgHandler`]
1233    ///
1234    /// The error is propagated all the way up to [`Circuit::handle_cell`],
1235    /// and eventually ends up being returned from the reactor's `run_once` function,
1236    /// causing it to shut down.
1237    #[allow(clippy::cognitive_complexity)]
1238    fn handle_meta_cell(
1239        &mut self,
1240        handlers: &mut CellHandlers,
1241        hopnum: HopNum,
1242        msg: UnparsedRelayMsg,
1243    ) -> Result<Option<CircuitCmd>> {
1244        // SENDME cells and TRUNCATED get handled internally by the circuit.
1245
1246        // TODO: This pattern (Check command, try to decode, map error) occurs
1247        // several times, and would be good to extract simplify. Such
1248        // simplification is obstructed by a couple of factors: First, that
1249        // there is not currently a good way to get the RelayCmd from _type_ of
1250        // a RelayMsg.  Second, that decode() [correctly] consumes the
1251        // UnparsedRelayMsg.  I tried a macro-based approach, and didn't care
1252        // for it. -nickm
1253        if msg.cmd() == RelayCmd::SENDME {
1254            let sendme = msg
1255                .decode::<Sendme>()
1256                .map_err(|e| Error::from_bytes_err(e, "sendme message"))?
1257                .into_msg();
1258
1259            return Ok(Some(CircuitCmd::HandleSendMe {
1260                hop: hopnum,
1261                sendme,
1262            }));
1263        }
1264        if msg.cmd() == RelayCmd::TRUNCATED {
1265            let truncated = msg
1266                .decode::<Truncated>()
1267                .map_err(|e| Error::from_bytes_err(e, "truncated message"))?
1268                .into_msg();
1269            let reason = truncated.reason();
1270            debug!(
1271                circ_id = %self.unique_id,
1272                "Truncated from hop {}. Reason: {} [{}]",
1273                hopnum.display(),
1274                reason.human_str(),
1275                reason
1276            );
1277
1278            return Ok(Some(CircuitCmd::CleanShutdown));
1279        }
1280
1281        if msg.cmd() == RelayCmd::DROP {
1282            cfg_if::cfg_if! {
1283                if #[cfg(feature = "circ-padding")] {
1284                    return Ok(None);
1285                } else {
1286                    use crate::util::err::ExcessPadding;
1287                    return Err(Error::ExcessPadding(ExcessPadding::NoPaddingNegotiated, hopnum));
1288                }
1289            }
1290        }
1291
1292        trace!(circ_id = %self.unique_id, cell = ?msg, "Received meta-cell");
1293
1294        #[cfg(feature = "conflux")]
1295        if matches!(
1296            msg.cmd(),
1297            RelayCmd::CONFLUX_LINK
1298                | RelayCmd::CONFLUX_LINKED
1299                | RelayCmd::CONFLUX_LINKED_ACK
1300                | RelayCmd::CONFLUX_SWITCH
1301        ) {
1302            let cmd = self.handle_conflux_msg(hopnum, msg)?;
1303            return Ok(cmd.map(CircuitCmd::from));
1304        }
1305
1306        if self.is_conflux_pending() {
1307            warn!(
1308                circ_id = %self.unique_id,
1309                "received unexpected cell {msg:?} on unlinked conflux circuit",
1310            );
1311            return Err(Error::CircProto(
1312                "Received unexpected cell on unlinked circuit".into(),
1313            ));
1314        }
1315
1316        // For all other command types, we'll only get them in response
1317        // to another command, which should have registered a responder.
1318        //
1319        // TODO: should the conflux state machine be a meta cell handler?
1320        // We'd need to add support for multiple meta handlers, and change the
1321        // MetaCellHandler API to support returning Option<RunOnceCmdInner>
1322        // (because some cells will require sending a response)
1323        if let Some(mut handler) = handlers.meta_handler.take() {
1324            // The handler has a TargetHop so we do a quick convert for equality check.
1325            if handler.expected_hop() == (self.unique_id(), hopnum).into() {
1326                // Somebody was waiting for a message -- maybe this message
1327                let ret = handler.handle_msg(msg, self);
1328                trace!(
1329                    circ_id = %self.unique_id,
1330                    result = ?ret,
1331                    "meta handler completed",
1332                );
1333                match ret {
1334                    #[cfg(feature = "send-control-msg")]
1335                    Ok(MetaCellDisposition::Consumed) => {
1336                        handlers.meta_handler = Some(handler);
1337                        Ok(None)
1338                    }
1339                    Ok(MetaCellDisposition::ConversationFinished) => Ok(None),
1340                    #[cfg(feature = "send-control-msg")]
1341                    Ok(MetaCellDisposition::CloseCirc) => Ok(Some(CircuitCmd::CleanShutdown)),
1342                    Err(e) => Err(e),
1343                }
1344            } else {
1345                // Somebody wanted a message from a different hop!  Put this
1346                // one back.
1347                handlers.meta_handler = Some(handler);
1348
1349                unsupported_client_cell!(msg, hopnum)
1350            }
1351        } else {
1352            // No need to call shutdown here, since this error will
1353            // propagate to the reactor shut it down.
1354            unsupported_client_cell!(msg)
1355        }
1356    }
1357
1358    /// Handle a RELAY_SENDME cell on this circuit with stream ID 0.
1359    #[instrument(level = "trace", skip_all)]
1360    pub(super) fn handle_sendme(
1361        &mut self,
1362        hopnum: HopNum,
1363        msg: Sendme,
1364        signals: CongestionSignals,
1365    ) -> Result<Option<CircuitCmd>> {
1366        // Cloned, because we borrow mutably from self when we get the circhop.
1367        let runtime = self.runtime.clone();
1368
1369        // No need to call "shutdown" on errors in this function;
1370        // it's called from the reactor task and errors will propagate there.
1371        let hop = self
1372            .hop_mut(hopnum)
1373            .ok_or_else(|| Error::CircProto(format!("Couldn't find hop {}", hopnum.display())))?;
1374
1375        let tag = msg.into_sendme_tag().ok_or_else(||
1376                // Versions of Tor <=0.3.5 would omit a SENDME tag in this case;
1377                // but we don't support those any longer.
1378                 Error::CircProto("missing tag on circuit sendme".into()))?;
1379        // Update the CC object that we received a SENDME along with possible congestion signals.
1380        hop.ccontrol()
1381            .note_sendme_received(&runtime, tag, signals)?;
1382        Ok(None)
1383    }
1384
1385    /// Send a message onto the circuit's channel.
1386    ///
1387    /// If the channel is ready to accept messages, it will be sent immediately. If not, the message
1388    /// will be enqueued for sending at a later iteration of the reactor loop.
1389    ///
1390    /// `info` is the status returned from the padding controller when we told it we were queueing
1391    /// this data.  It should be provided whenever possible.
1392    ///
1393    /// # Note
1394    ///
1395    /// Making use of the enqueuing capabilities of this function is discouraged! You should first
1396    /// check whether the channel is ready to receive messages (`self.channel.poll_ready`), and
1397    /// ideally use this to implement backpressure (such that you do not read from other sources
1398    /// that would send here while you know you're unable to forward the messages on).
1399    #[instrument(level = "trace", skip_all)]
1400    async fn send_msg(
1401        &mut self,
1402        msg: AnyChanMsg,
1403        info: Option<QueuedCellPaddingInfo>,
1404    ) -> Result<()> {
1405        let cell = AnyChanCell::new(Some(self.channel_id), msg);
1406        // Note: this future is always `Ready`, so await won't block.
1407        Pin::new(&mut self.chan_sender)
1408            .send_unbounded((cell, info))
1409            .await?;
1410        Ok(())
1411    }
1412
1413    /// Remove all halfstreams that are expired at `now`.
1414    pub(super) fn remove_expired_halfstreams(&mut self, now: Instant) {
1415        self.hops.remove_expired_halfstreams(now);
1416    }
1417
1418    /// Return a reference to the hop corresponding to `hopnum`, if there is one.
1419    pub(super) fn hop(&self, hopnum: HopNum) -> Option<&CircHop> {
1420        self.hops.hop(hopnum)
1421    }
1422
1423    /// Return a mutable reference to the hop corresponding to `hopnum`, if there is one.
1424    pub(super) fn hop_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop> {
1425        self.hops.get_mut(hopnum)
1426    }
1427
1428    /// Begin a stream with the provided hop in this circuit.
1429    // TODO: see if there's a way that we can clean this up
1430    #[allow(clippy::too_many_arguments)]
1431    pub(super) fn begin_stream(
1432        &mut self,
1433        hop_num: HopNum,
1434        message: AnyRelayMsg,
1435        sender: StreamQueueSender,
1436        rx: StreamMpscReceiver<AnyRelayMsg>,
1437        rate_limit_notifier: watch::Sender<StreamRateLimit>,
1438        drain_rate_requester: NotifySender<DrainRateRequest>,
1439        cmd_checker: AnyCmdChecker,
1440    ) -> StdResult<Result<(SendRelayCell, StreamId)>, Bug> {
1441        let Some(hop) = self.hop_mut(hop_num) else {
1442            return Err(internal!(
1443                "{}: Attempting to send a BEGIN cell to an unknown hop {hop_num:?}",
1444                self.unique_id,
1445            ));
1446        };
1447
1448        Ok(hop.begin_stream(
1449            message,
1450            sender,
1451            rx,
1452            rate_limit_notifier,
1453            drain_rate_requester,
1454            cmd_checker,
1455        ))
1456    }
1457
1458    /// Close the specified stream
1459    #[instrument(level = "trace", skip_all)]
1460    pub(super) async fn close_stream(
1461        &mut self,
1462        hop_num: HopNum,
1463        sid: StreamId,
1464        behav: CloseStreamBehavior,
1465        reason: streammap::TerminateReason,
1466        expiry: Instant,
1467    ) -> Result<()> {
1468        if let Some(hop) = self.hop_mut(hop_num) {
1469            let res = hop.close_stream(sid, behav, reason, expiry)?;
1470            if let Some(cell) = res {
1471                self.send_relay_cell(cell).await?;
1472            }
1473        }
1474        Ok(())
1475    }
1476
1477    /// Returns true if there are any streams on this circuit
1478    ///
1479    /// Important: this function locks the stream map of its each of the [`CircHop`]s
1480    /// in this circuit, so it must **not** be called from any function where the
1481    /// stream map lock is held.
1482    pub(super) fn has_streams(&self) -> bool {
1483        self.hops.has_streams()
1484    }
1485
1486    /// The number of hops in this circuit.
1487    pub(super) fn num_hops(&self) -> u8 {
1488        // `Circuit::add_hop` checks to make sure that we never have more than `u8::MAX` hops,
1489        // so `self.hops.len()` should be safe to cast to a `u8`.
1490        // If that assumption is violated,
1491        // we choose to panic rather than silently use the wrong hop due to an `as` cast.
1492        self.hops
1493            .len()
1494            .try_into()
1495            .expect("`hops.len()` has more than `u8::MAX` hops")
1496    }
1497
1498    /// Check whether this circuit has any hops.
1499    pub(super) fn has_hops(&self) -> bool {
1500        !self.hops.is_empty()
1501    }
1502
1503    /// Get the `HopNum` of the last hop, if this circuit is non-empty.
1504    ///
1505    /// Returns `None` if the circuit has no hops.
1506    pub(super) fn last_hop_num(&self) -> Option<HopNum> {
1507        let num_hops = self.num_hops();
1508        if num_hops == 0 {
1509            // asked for the last hop, but there are no hops
1510            return None;
1511        }
1512        Some(HopNum::from(num_hops - 1))
1513    }
1514
1515    /// Get the path of the circuit.
1516    ///
1517    /// **Warning:** Do not call while already holding the [`Self::mutable`] lock.
1518    pub(super) fn path(&self) -> Arc<path::Path> {
1519        self.mutable.path()
1520    }
1521
1522    /// Return a ClockSkew declaring how much clock skew the other side of this channel
1523    /// claimed that we had when we negotiated the connection.
1524    pub(super) fn clock_skew(&self) -> ClockSkew {
1525        self.channel.clock_skew()
1526    }
1527
1528    /// Does congestion control use stream SENDMEs for the given `hop`?
1529    ///
1530    /// Returns `None` if `hop` doesn't exist.
1531    pub(super) fn uses_stream_sendme(&self, hop: HopNum) -> Option<bool> {
1532        let hop = self.hop(hop)?;
1533        Some(hop.ccontrol().uses_stream_sendme())
1534    }
1535
1536    /// Returns whether this is a conflux circuit that is not linked yet.
1537    pub(super) fn is_conflux_pending(&self) -> bool {
1538        let Some(status) = self.conflux_status() else {
1539            return false;
1540        };
1541
1542        status != ConfluxStatus::Linked
1543    }
1544
1545    /// Returns the conflux status of this circuit.
1546    ///
1547    /// Returns `None` if this is not a conflux circuit.
1548    pub(super) fn conflux_status(&self) -> Option<ConfluxStatus> {
1549        cfg_if::cfg_if! {
1550            if #[cfg(feature = "conflux")] {
1551                self.conflux_handler
1552                    .as_ref()
1553                    .map(|handler| handler.status())
1554            } else {
1555                None
1556            }
1557        }
1558    }
1559
1560    /// Returns initial RTT on this leg, measured in the conflux handshake.
1561    #[cfg(feature = "conflux")]
1562    pub(super) fn init_rtt(&self) -> Option<Duration> {
1563        self.conflux_handler
1564            .as_ref()
1565            .map(|handler| handler.init_rtt())?
1566    }
1567
1568    /// Start or stop padding at the given hop.
1569    ///
1570    /// Replaces any previous padder at that hop.
1571    ///
1572    /// Return an error if that hop doesn't exist.
1573    #[cfg(feature = "circ-padding-manual")]
1574    pub(super) fn set_padding_at_hop(
1575        &self,
1576        hop: HopNum,
1577        padder: Option<padding::CircuitPadder>,
1578    ) -> Result<()> {
1579        if self.hop(hop).is_none() {
1580            return Err(Error::NoSuchHop);
1581        }
1582        self.padding_ctrl.install_padder_padding_at_hop(hop, padder);
1583        Ok(())
1584    }
1585
1586    /// Determine how exactly to handle a request to handle padding.
1587    ///
1588    /// This is fairly complicated; see the maybenot documentation for more information.
1589    ///
1590    /// ## Limitations
1591    ///
1592    /// In our current padding implementation, a circuit is either blocked or not blocked:
1593    /// we do not keep track of which hop is actually doing the blocking.
1594    #[cfg(feature = "circ-padding")]
1595    fn padding_disposition(&self, send_padding: &padding::SendPadding) -> CircPaddingDisposition {
1596        crate::circuit::padding::padding_disposition(
1597            send_padding,
1598            &self.chan_sender,
1599            self.padding_block.as_ref(),
1600        )
1601    }
1602
1603    /// Handle a request from our padding subsystem to send a padding packet.
1604    #[cfg(feature = "circ-padding")]
1605    pub(super) async fn send_padding(&mut self, send_padding: padding::SendPadding) -> Result<()> {
1606        use CircPaddingDisposition::*;
1607
1608        let target_hop = send_padding.hop;
1609
1610        match self.padding_disposition(&send_padding) {
1611            QueuePaddingNormally => {
1612                let queue_info = self.padding_ctrl.queued_padding(target_hop, send_padding);
1613                self.queue_padding_cell_for_hop(target_hop, queue_info)
1614                    .await?;
1615            }
1616            QueuePaddingAndBypass => {
1617                let queue_info = self.padding_ctrl.queued_padding(target_hop, send_padding);
1618                self.queue_padding_cell_for_hop(target_hop, queue_info)
1619                    .await?;
1620            }
1621            TreatQueuedCellAsPadding => {
1622                self.padding_ctrl
1623                    .replaceable_padding_already_queued(target_hop, send_padding);
1624            }
1625        }
1626        Ok(())
1627    }
1628
1629    /// Generate and encrypt a padding cell, and send it to a targeted hop.
1630    ///
1631    /// Ignores any padding-based blocking.
1632    #[cfg(feature = "circ-padding")]
1633    async fn queue_padding_cell_for_hop(
1634        &mut self,
1635        target_hop: HopNum,
1636        queue_info: Option<QueuedCellPaddingInfo>,
1637    ) -> Result<()> {
1638        use tor_cell::relaycell::msg::Drop as DropMsg;
1639        let msg = SendRelayCell {
1640            hop: Some(target_hop),
1641            // TODO circpad: we will probably want padding machines that can send EARLY cells.
1642            early: false,
1643            cell: AnyRelayMsgOuter::new(None, DropMsg::default().into()),
1644        };
1645        self.send_relay_cell_inner(msg, queue_info).await
1646    }
1647
1648    /// Enable padding-based blocking,
1649    /// or change the rule for padding-based blocking to the one in `block`.
1650    #[cfg(feature = "circ-padding")]
1651    pub(super) fn start_blocking_for_padding(&mut self, block: padding::StartBlocking) {
1652        self.chan_sender.start_blocking();
1653        self.padding_block = Some(block);
1654    }
1655
1656    /// Disable padding-based blocking.
1657    #[cfg(feature = "circ-padding")]
1658    pub(super) fn stop_blocking_for_padding(&mut self) {
1659        self.chan_sender.stop_blocking();
1660        self.padding_block = None;
1661    }
1662
1663    /// The estimated circuit build timeout for a circuit of the specified length.
1664    pub(super) fn estimate_cbt(&self, length: usize) -> Duration {
1665        self.timeouts.circuit_build_timeout(length)
1666    }
1667}
1668
1669impl Drop for Circuit {
1670    fn drop(&mut self) {
1671        let _ = self.channel.close_circuit(self.channel_id);
1672    }
1673}