Skip to main content

tor_proto/client/reactor/
control.rs

1//! Module providing [`CtrlMsg`].
2
3use super::circuit::extender::CircuitExtender;
4use super::{
5    CircuitHandshake, CloseStreamBehavior, MetaCellHandler, Reactor, ReactorResultChannel,
6    RunOnceCmdInner, SendRelayCell,
7};
8use crate::Result;
9use crate::circuit::celltypes::CreateResponse;
10use crate::circuit::circhop::HopSettings;
11#[cfg(feature = "circ-padding-manual")]
12use crate::client::circuit::padding;
13use crate::client::circuit::path;
14use crate::client::reactor::{NoJoinPointError, NtorClient, ReactorError};
15use crate::client::{HopLocation, TargetHop};
16use crate::crypto::binding::CircuitBinding;
17use crate::crypto::cell::{InboundClientLayer, OutboundClientLayer};
18use crate::crypto::handshake::ntor_v3::{NtorV3Client, NtorV3PublicKey};
19use crate::stream::cmdcheck::AnyCmdChecker;
20use crate::stream::flow_ctrl::state::StreamRateLimit;
21use crate::stream::flow_ctrl::xon_xoff::reader::DrainRateRequest;
22use crate::stream::queue::StreamQueueSender;
23use crate::streammap;
24use crate::util::notify::NotifySender;
25use crate::util::skew::ClockSkew;
26use crate::util::tunnel_activity::TunnelActivity;
27#[cfg(test)]
28use crate::{circuit::UniqId, client::circuit::CircParameters, crypto::cell::HopNum};
29use postage::watch;
30use tor_cell::chancell::msg::HandshakeType;
31use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
32use tor_cell::relaycell::msg::{AnyRelayMsg, Sendme};
33use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId};
34use tor_error::{Bug, bad_api_usage, internal, into_bad_api_usage};
35use tracing::{debug, trace};
36#[cfg(feature = "hs-service")]
37use {
38    crate::client::reactor::IncomingStreamRequestHandler,
39    crate::client::stream::IncomingStreamRequestFilter, crate::stream::incoming::StreamReqSender,
40};
41
42#[cfg(test)]
43use tor_cell::relaycell::msg::SendmeTag;
44
45#[cfg(feature = "conflux")]
46use super::{Circuit, ConfluxLinkResultChannel};
47
48use oneshot_fused_workaround as oneshot;
49
50use crate::crypto::handshake::ntor::NtorPublicKey;
51use crate::stream::StreamMpscReceiver;
52use tor_linkspec::{EncodedLinkSpec, OwnedChanTarget};
53
54use std::result::Result as StdResult;
55
56/// A message telling the reactor to do something.
57///
58/// Handling a `CtrlMsg` may cause the reactor to send a cell on the underlying channel.
59///
60/// The difference between this and [`CtrlCmd`] is that `CtrlMsg`s
61/// can cause the reactor to send cells on the reactor's `chan_sender`,
62/// whereas `CtrlCmd`s do not.
63#[derive(educe::Educe)]
64#[educe(Debug)]
65pub(crate) enum CtrlMsg {
66    /// Create the first hop of this circuit.
67    Create {
68        /// A oneshot channel on which we'll receive the creation response.
69        recv_created: oneshot::Receiver<CreateResponse>,
70        /// The handshake type to use for the first hop.
71        handshake: CircuitHandshake,
72        /// Other parameters relevant for circuit creation.
73        settings: HopSettings,
74        /// Oneshot channel to notify on completion.
75        done: ReactorResultChannel<()>,
76    },
77    /// Extend a circuit by one hop, using the ntor handshake.
78    ExtendNtor {
79        /// The peer that we're extending to.
80        ///
81        /// Used to extend our record of the circuit's path.
82        peer_id: OwnedChanTarget,
83        /// The ntor public key material used for the handshake with this hop.
84        public_key: NtorPublicKey,
85        /// Information about how to connect to the relay we're extending to.
86        linkspecs: Vec<EncodedLinkSpec>,
87        /// Other parameters we are negotiating.
88        settings: HopSettings,
89        /// Oneshot channel to notify on completion.
90        done: ReactorResultChannel<()>,
91    },
92    /// Extend a circuit by one hop, using the ntorv3 handshake.
93    ExtendNtorV3 {
94        /// The peer that we're extending to.
95        ///
96        /// Used to extend our record of the circuit's path.
97        peer_id: OwnedChanTarget,
98        /// The ntorv3 public key material used for the handshake with this hop.
99        public_key: NtorV3PublicKey,
100        /// Information about how to connect to the relay we're extending to.
101        linkspecs: Vec<EncodedLinkSpec>,
102        /// Other parameters we are negotiating.
103        settings: HopSettings,
104        /// Oneshot channel to notify on completion.
105        done: ReactorResultChannel<()>,
106    },
107    /// Begin a stream with the provided hop in this circuit.
108    ///
109    /// Allocates a stream ID, and sends the provided message to that hop.
110    BeginStream {
111        /// The hop to begin the stream with.
112        hop: TargetHop,
113        /// The message to send.
114        message: AnyRelayMsg,
115        /// A channel to send messages on this stream down.
116        ///
117        /// This sender shouldn't ever block, because we use congestion control and only send
118        /// SENDME cells once we've read enough out of the other end. If it *does* block, we
119        /// can assume someone is trying to send us more cells than they should, and abort
120        /// the stream.
121        sender: StreamQueueSender,
122        /// A channel to receive messages to send on this stream from.
123        rx: StreamMpscReceiver<AnyRelayMsg>,
124        /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
125        rate_limit_notifier: watch::Sender<StreamRateLimit>,
126        /// Notifies the stream reader when it should send a new drain rate.
127        drain_rate_requester: NotifySender<DrainRateRequest>,
128        /// Oneshot channel to notify on completion, with the allocated stream ID.
129        done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
130        /// A `CmdChecker` to keep track of which message types are acceptable.
131        cmd_checker: AnyCmdChecker,
132    },
133    /// Close the specified pending incoming stream, sending the provided END message.
134    ///
135    /// A stream is said to be pending if the message for initiating the stream was received but
136    /// has not been responded to yet.
137    ///
138    /// This should be used by responders for closing pending incoming streams initiated by the
139    /// other party on the circuit.
140    #[cfg(feature = "hs-service")]
141    ClosePendingStream {
142        /// The hop the stream is on.
143        hop: HopLocation,
144        /// The stream ID to send the END for.
145        stream_id: StreamId,
146        /// The END message to send, if any.
147        message: CloseStreamBehavior,
148        /// Oneshot channel to notify on completion.
149        done: ReactorResultChannel<()>,
150    },
151    /// Send a given control message on this circuit.
152    #[cfg(feature = "send-control-msg")]
153    SendMsg {
154        /// The hop to receive this message.
155        hop: TargetHop,
156        /// The message to send.
157        msg: AnyRelayMsg,
158        /// A sender that we use to report the result of sending the message
159        /// back to the caller.
160        sender: oneshot::Sender<Result<()>>,
161    },
162    /// Send a given control message on this circuit, and install a control-message handler to
163    /// receive responses.
164    #[cfg(feature = "send-control-msg")]
165    SendMsgAndInstallHandler {
166        /// The message to send, if any.
167        msg: Option<AnyRelayMsgOuter>,
168        /// A message handler to install.
169        ///
170        /// If this is `None`, there must already be a message handler installed.
171        #[educe(Debug(ignore))]
172        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
173        /// A sender that we use to tell the caller that the message was sent
174        /// and the handler installed.
175        sender: oneshot::Sender<Result<()>>,
176    },
177    /// Inform the reactor that there's a flow control update for a given stream.
178    ///
179    /// The reactor will decide how to handle this update depending on the type of flow control and
180    /// the current state of the stream.
181    FlowCtrlUpdate {
182        /// The type of flow control update, and any associated metadata.
183        msg: FlowCtrlMsg,
184        /// The stream ID that the update is for.
185        stream_id: StreamId,
186        /// The hop that the stream is on.
187        hop: HopLocation,
188    },
189    /// Get the clock skew claimed by the first hop of the circuit.
190    FirstHopClockSkew {
191        /// Oneshot channel to return the clock skew.
192        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
193    },
194    /// Link the specified circuits into the current tunnel,
195    /// to form a multi-path tunnel.
196    #[cfg(feature = "conflux")]
197    #[allow(unused)] // TODO(conflux)
198    LinkCircuits {
199        /// The circuits to link into the tunnel.
200        #[educe(Debug(ignore))]
201        circuits: Vec<Circuit>,
202        /// Oneshot channel to notify sender when all the specified circuits have finished linking,
203        /// or have failed to link.
204        ///
205        /// A client circuit is said to be fully linked once the `RELAY_CONFLUX_LINKED_ACK` is sent
206        /// (see [set construction]).
207        ///
208        /// [set construction]: https://spec.torproject.org/proposals/329-traffic-splitting.html#set-construction
209        answer: ConfluxLinkResultChannel,
210    },
211}
212
213/// A command telling the reactor to do something.
214///
215/// The difference between this and [`CtrlMsg`] is that `CtrlCmd`s
216/// never cause cells to be sent on the channel,
217/// while `CtrlMsg`s potentially do: `CtrlMsg`s are mapped to [`RunOnceCmdInner`] commands,
218/// some of which instruct the reactor to send cells down the channel.
219#[derive(educe::Educe)]
220#[educe(Debug)]
221pub(crate) enum CtrlCmd {
222    /// Shut down the reactor.
223    Shutdown,
224    /// Extend the circuit by one hop, in response to an out-of-band handshake.
225    ///
226    /// (This is used for onion services, where the negotiation takes place in
227    /// INTRODUCE and RENDEZVOUS messages.)
228    #[cfg(feature = "hs-common")]
229    ExtendVirtual {
230        /// The cryptographic algorithms and keys to use when communicating with
231        /// the newly added hop.
232        #[educe(Debug(ignore))]
233        cell_crypto: (
234            Box<dyn OutboundClientLayer + Send>,
235            Box<dyn InboundClientLayer + Send>,
236            Option<CircuitBinding>,
237        ),
238        /// A set of parameters to negotiate with this hop.
239        settings: HopSettings,
240        /// Oneshot channel to notify on completion.
241        done: ReactorResultChannel<()>,
242    },
243    /// Resolve a given [`TargetHop`] into a precise [`HopLocation`].
244    ResolveTargetHop {
245        /// The target hop to resolve.
246        hop: TargetHop,
247        /// Oneshot channel to notify on completion.
248        done: ReactorResultChannel<HopLocation>,
249    },
250    /// Begin accepting streams on this circuit.
251    #[cfg(feature = "hs-service")]
252    AwaitStreamRequest {
253        /// A channel for sending information about an incoming stream request.
254        incoming_sender: StreamReqSender,
255        /// A `CmdChecker` to keep track of which message types are acceptable.
256        cmd_checker: AnyCmdChecker,
257        /// Oneshot channel to notify on completion.
258        done: ReactorResultChannel<()>,
259        /// The hop that is allowed to create streams.
260        hop: TargetHop,
261        /// A filter used to check requests before passing them on.
262        #[educe(Debug(ignore))]
263        #[cfg(feature = "hs-service")]
264        filter: Box<dyn IncomingStreamRequestFilter>,
265    },
266    /// Request the binding key of a target hop.
267    #[cfg(feature = "hs-service")]
268    GetBindingKey {
269        /// The hop for which we want the key.
270        hop: TargetHop,
271        /// Oneshot channel to notify on completion.
272        done: ReactorResultChannel<Option<CircuitBinding>>,
273    },
274    /// (tests only) Add a hop to the list of hops on this circuit, with dummy cryptography.
275    #[cfg(test)]
276    AddFakeHop {
277        relay_cell_format: RelayCellFormat,
278        fwd_lasthop: bool,
279        rev_lasthop: bool,
280        peer_id: path::HopDetail,
281        params: CircParameters,
282        done: ReactorResultChannel<()>,
283    },
284    /// (tests only) Get the send window and expected tags for a given hop.
285    #[cfg(test)]
286    QuerySendWindow {
287        hop: HopNum,
288        leg: UniqId,
289        done: ReactorResultChannel<(u32, Vec<SendmeTag>)>,
290    },
291    /// Shut down the reactor, and return the underlying [`Circuit`],
292    /// if the tunnel is not multi-path.
293    ///
294    /// Returns an error if called on a multi-path reactor.
295    #[cfg(feature = "conflux")]
296    #[allow(unused)] // TODO(conflux)
297    ShutdownAndReturnCircuit {
298        /// Oneshot channel to return the underlying [`Circuit`],
299        /// or an error if the reactor's tunnel is multi-path.
300        answer: oneshot::Sender<StdResult<Circuit, Bug>>,
301    },
302
303    /// Install or remove a [`padding::CircuitPadder`] for a given hop.
304    ///
305    /// Any existing `CircuitPadder` at that hop is replaced.
306    #[cfg(feature = "circ-padding-manual")]
307    SetPadder {
308        /// The hop to modify.
309        hop: HopLocation,
310        /// The padder to install, or `None` to remove any existing padder.
311        padder: Option<padding::CircuitPadder>,
312        /// A sender to alert after we've changed the padding.
313        sender: oneshot::Sender<Result<()>>,
314    },
315
316    /// Yield the most active [`TunnelActivity`] for any hop on any leg of this tunnel.
317    GetTunnelActivity {
318        /// A sender on which the reply is delivered.
319        sender: oneshot::Sender<TunnelActivity>,
320    },
321}
322
323/// A flow control update message, carried by [`CtrlMsg::FlowCtrlUpdate`].
324#[derive(Debug)]
325pub(crate) enum FlowCtrlMsg {
326    /// Ask the reactor to send a stream-level SENDME message on this stream.
327    Sendme,
328    /// Ask the reactor to send an XON message on this stream with the given rate.
329    Xon(XonKbpsEwma),
330}
331
332/// A control message handler object, holding a mutable borrow of the [`Reactor`] for its lifetime.
333///
334/// Its `handle_msg` and `handle_cmd` handlers decide how messages and commands,
335/// respectively, are handled.
336pub(crate) struct ControlHandler<'a> {
337    /// The reactor on whose behalf control messages and commands are handled.
338    reactor: &'a mut Reactor,
339}
340
341impl<'a> ControlHandler<'a> {
342    /// Constructor.
343    pub(crate) fn new(reactor: &'a mut Reactor) -> Self {
344        Self { reactor }
345    }
346
347    /// Handle a control message.
348    pub(super) fn handle_msg(&mut self, msg: CtrlMsg) -> Result<Option<RunOnceCmdInner>> {
349        trace!(
350            tunnel_id = %self.reactor.tunnel_id,
351            msg = ?msg,
352            "reactor received control message"
353        );
354
355        match msg {
356            // This is handled earlier, since it requires blocking.
357            CtrlMsg::Create { done, .. } => {
358                if self.reactor.circuits.len() == 1 {
359                    // This should've been handled in Reactor::run_once()
360                    // (ControlHandler::handle_msg() is never called before wait_for_create()).
361                    debug_assert!(self.reactor.circuits.single_leg()?.has_hops());
362                    // Don't care if the receiver goes away
363                    let _ = done.send(Err(tor_error::bad_api_usage!(
364                        "cannot create first hop twice"
365                    )
366                    .into()));
367                } else {
368                    // Don't care if the receiver goes away
369                    let _ = done.send(Err(tor_error::bad_api_usage!(
370                        "cannot create first hop on multipath tunnel"
371                    )
372                    .into()));
373                }
374
375                Ok(None)
376            }
377            CtrlMsg::ExtendNtor {
378                peer_id,
379                public_key,
380                linkspecs,
381                settings,
382                done,
383            } => {
384                let Ok(circ) = self.reactor.circuits.single_leg_mut() else {
385                    // Don't care if the receiver goes away
386                    let _ = done.send(Err(tor_error::bad_api_usage!(
387                        "cannot extend multipath tunnel"
388                    )
389                    .into()));
390
391                    return Ok(None);
392                };
393
394                let (extender, cell) = CircuitExtender::<NtorClient>::begin(
395                    peer_id,
396                    HandshakeType::NTOR,
397                    &public_key,
398                    linkspecs,
399                    settings,
400                    &(),
401                    circ,
402                    done,
403                )?;
404                self.reactor
405                    .cell_handlers
406                    .set_meta_handler(Box::new(extender))?;
407
408                Ok(Some(RunOnceCmdInner::Send {
409                    leg: circ.unique_id(),
410                    cell,
411                    done: None,
412                }))
413            }
414            CtrlMsg::ExtendNtorV3 {
415                peer_id,
416                public_key,
417                linkspecs,
418                settings,
419                done,
420            } => {
421                let Ok(circ) = self.reactor.circuits.single_leg_mut() else {
422                    // Don't care if the receiver goes away
423                    let _ = done.send(Err(tor_error::bad_api_usage!(
424                        "cannot extend multipath tunnel"
425                    )
426                    .into()));
427
428                    return Ok(None);
429                };
430
431                let client_extensions = settings.circuit_request_extensions()?;
432
433                let (extender, cell) = CircuitExtender::<NtorV3Client>::begin(
434                    peer_id,
435                    HandshakeType::NTOR_V3,
436                    &public_key,
437                    linkspecs,
438                    settings,
439                    &client_extensions,
440                    circ,
441                    done,
442                )?;
443                self.reactor
444                    .cell_handlers
445                    .set_meta_handler(Box::new(extender))?;
446
447                Ok(Some(RunOnceCmdInner::Send {
448                    leg: circ.unique_id(),
449                    cell,
450                    done: None,
451                }))
452            }
453            CtrlMsg::BeginStream {
454                hop,
455                message,
456                sender,
457                rx,
458                rate_limit_notifier,
459                drain_rate_requester,
460                done,
461                cmd_checker,
462            } => {
463                // If resolving the hop fails,
464                // we want to report an error back to the initiator and not shut down the reactor.
465                let hop_location = match self.reactor.resolve_target_hop(hop) {
466                    Ok(x) => x,
467                    Err(e) => {
468                        let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
469                        // don't care if receiver goes away
470                        let _ = done.send(Err(e.into()));
471                        return Ok(None);
472                    }
473                };
474                let (leg_id, hop_num) = match self.reactor.resolve_hop_location(hop_location) {
475                    Ok(x) => x,
476                    Err(e) => {
477                        let e = into_bad_api_usage!("Could not resolve {hop_location:?}")(e);
478                        // don't care if receiver goes away
479                        let _ = done.send(Err(e.into()));
480                        return Ok(None);
481                    }
482                };
483                let circ = match self.reactor.circuits.leg_mut(leg_id) {
484                    Some(x) => x,
485                    None => {
486                        let e = bad_api_usage!("Circuit leg {leg_id:?} does not exist");
487                        // don't care if receiver goes away
488                        let _ = done.send(Err(e.into()));
489                        return Ok(None);
490                    }
491                };
492
493                let cell = circ.begin_stream(
494                    hop_num,
495                    message,
496                    sender,
497                    rx,
498                    rate_limit_notifier,
499                    drain_rate_requester,
500                    cmd_checker,
501                )?;
502                Ok(Some(RunOnceCmdInner::BeginStream {
503                    leg: leg_id,
504                    cell,
505                    hop: hop_location,
506                    done,
507                }))
508            }
509            #[cfg(feature = "hs-service")]
510            CtrlMsg::ClosePendingStream {
511                hop,
512                stream_id,
513                message,
514                done,
515            } => Ok(Some(RunOnceCmdInner::CloseStream {
516                hop,
517                sid: stream_id,
518                behav: message,
519                reason: streammap::TerminateReason::ExplicitEnd,
520                done: Some(done),
521            })),
522            CtrlMsg::FlowCtrlUpdate {
523                msg,
524                stream_id,
525                hop,
526            } => {
527                match msg {
528                    FlowCtrlMsg::Sendme => {
529                        let (leg_id, hop_num) = match self.reactor.resolve_hop_location(hop) {
530                            Ok(x) => x,
531                            Err(NoJoinPointError) => {
532                                // A stream tried to send a stream-level SENDME message to the join point of
533                                // a tunnel that has never had a join point. Currently in arti, only a
534                                // `StreamTarget` asks us to send a stream-level SENDME, and this tunnel
535                                // originally created the `StreamTarget` to begin with. So this is a
536                                // legitimate bug somewhere in the tunnel code.
537                                return Err(
538                                    internal!(
539                                        "Could not send a stream-level SENDME to a join point on a tunnel without a join point",
540                                    )
541                                    .into()
542                                );
543                            }
544                        };
545
546                        // Congestion control decides if we can send stream level SENDMEs or not.
547                        let sendme_required = match self.reactor.uses_stream_sendme(leg_id, hop_num)
548                        {
549                            Some(x) => x,
550                            None => {
551                                // The leg/hop has disappeared. This is fine since the stream may have ended
552                                // and been cleaned up while this `CtrlMsg::SendSendme` message was queued.
553                                // It is possible that is a bug and this is an incorrect leg/hop number, but
554                                // it's not currently possible to differentiate between an incorrect leg/hop
555                                // number and a circuit hop that has been closed.
556                                debug!(
557                                    "Could not send a stream-level SENDME on a hop that does not exist. Ignoring."
558                                );
559                                return Ok(None);
560                            }
561                        };
562
563                        if !sendme_required {
564                            // Nothing to do, so discard the SENDME.
565                            return Ok(None);
566                        }
567
568                        let sendme = Sendme::new_empty();
569                        let cell = AnyRelayMsgOuter::new(Some(stream_id), sendme.into());
570
571                        let cell = SendRelayCell {
572                            hop: Some(hop_num),
573                            early: false,
574                            cell,
575                        };
576
577                        Ok(Some(RunOnceCmdInner::Send {
578                            leg: leg_id,
579                            cell,
580                            done: None,
581                        }))
582                    }
583                    FlowCtrlMsg::Xon(rate) => Ok(Some(RunOnceCmdInner::MaybeSendXon {
584                        rate,
585                        hop,
586                        stream_id,
587                    })),
588                }
589            }
590            // TODO(conflux): this should specify which leg to send the msg on
591            // (currently we send it down the primary leg).
592            //
593            // This will involve updating ClientCIrc::send_raw_msg() to take a
594            // leg id argument (which is a breaking change.
595            #[cfg(feature = "send-control-msg")]
596            CtrlMsg::SendMsg { hop, msg, sender } => {
597                let Some((leg_id, hop_num)) = self.reactor.target_hop_to_hopnum_id(hop) else {
598                    // Don't care if receiver goes away
599                    let _ = sender.send(Err(bad_api_usage!("Unknown {hop:?}").into()));
600                    return Ok(None);
601                };
602
603                let cell = AnyRelayMsgOuter::new(None, msg);
604                let cell = SendRelayCell {
605                    hop: Some(hop_num),
606                    early: false,
607                    cell,
608                };
609
610                Ok(Some(RunOnceCmdInner::Send {
611                    leg: leg_id,
612                    cell,
613                    done: Some(sender),
614                }))
615            }
616            // TODO(conflux): this should specify which leg to send the msg on
617            // (currently we send it down the primary leg)
618            #[cfg(feature = "send-control-msg")]
619            CtrlMsg::SendMsgAndInstallHandler {
620                msg,
621                handler,
622                sender,
623            } => Ok(Some(RunOnceCmdInner::SendMsgAndInstallHandler {
624                msg,
625                handler,
626                done: sender,
627            })),
628            CtrlMsg::FirstHopClockSkew { answer } => {
629                Ok(Some(RunOnceCmdInner::FirstHopClockSkew { answer }))
630            }
631            #[cfg(feature = "conflux")]
632            CtrlMsg::LinkCircuits { circuits, answer } => {
633                Ok(Some(RunOnceCmdInner::Link { circuits, answer }))
634            }
635        }
636    }
637
638    /// Handle a control command.
639    #[allow(clippy::needless_pass_by_value)] // Needed when conflux is enabled
640    pub(super) fn handle_cmd(&mut self, msg: CtrlCmd) -> StdResult<(), ReactorError> {
641        trace!(
642            tunnel_id = %self.reactor.tunnel_id,
643            msg = ?msg,
644            "reactor received control command"
645        );
646
647        match msg {
648            CtrlCmd::Shutdown => self.reactor.handle_shutdown().map(|_| ()),
649            #[cfg(feature = "hs-common")]
650            #[allow(unreachable_code)]
651            CtrlCmd::ExtendVirtual {
652                cell_crypto,
653                settings,
654                done,
655            } => {
656                let (outbound, inbound, binding) = cell_crypto;
657
658                // TODO HS: Perhaps this should describe the onion service, or
659                // describe why the virtual hop was added, or something?
660                let peer_id = path::HopDetail::Virtual;
661
662                let Ok(leg) = self.reactor.circuits.single_leg_mut() else {
663                    // Don't care if the receiver goes away
664                    let _ = done.send(Err(tor_error::bad_api_usage!(
665                        "cannot extend multipath tunnel"
666                    )
667                    .into()));
668
669                    return Ok(());
670                };
671
672                leg.add_hop(peer_id, outbound, inbound, binding, &settings)?;
673                let _ = done.send(Ok(()));
674
675                Ok(())
676            }
677            CtrlCmd::ResolveTargetHop { hop, done } => {
678                let _ = done.send(
679                    self.reactor
680                        .resolve_target_hop(hop)
681                        .map_err(|_| crate::util::err::Error::NoSuchHop),
682                );
683                Ok(())
684            }
685            #[cfg(feature = "hs-service")]
686            CtrlCmd::AwaitStreamRequest {
687                cmd_checker,
688                incoming_sender,
689                hop,
690                done,
691                filter,
692            } => {
693                let Some((_, hop_num)) = self.reactor.target_hop_to_hopnum_id(hop) else {
694                    let _ = done.send(Err(crate::Error::NoSuchHop));
695                    return Ok(());
696                };
697                // TODO: At some point we might want to add a CtrlCmd for
698                // de-registering the handler.  See comments on `allow_stream_requests`.
699                let handler = IncomingStreamRequestHandler {
700                    incoming_sender,
701                    cmd_checker,
702                    hop_num: Some(hop_num),
703                    filter,
704                };
705
706                let ret = self
707                    .reactor
708                    .cell_handlers
709                    .set_incoming_stream_req_handler(handler);
710                let _ = done.send(ret); // don't care if the corresponding receiver goes away.
711
712                Ok(())
713            }
714            #[cfg(feature = "hs-service")]
715            CtrlCmd::GetBindingKey { hop, done } => {
716                let Some((leg_id, hop_num)) = self.reactor.target_hop_to_hopnum_id(hop) else {
717                    let _ = done.send(Err(tor_error::internal!(
718                        "Unknown TargetHop when getting binding key"
719                    )
720                    .into()));
721                    return Ok(());
722                };
723                let Some(circuit) = self.reactor.circuits.leg(leg_id) else {
724                    let _ = done.send(Err(tor_error::bad_api_usage!(
725                        "Unknown circuit id {leg_id} when getting binding key"
726                    )
727                    .into()));
728                    return Ok(());
729                };
730                // Get the binding key from the mutable state and send it back.
731                let key = circuit.mutable().binding_key(hop_num);
732                let _ = done.send(Ok(key));
733
734                Ok(())
735            }
736            #[cfg(test)]
737            CtrlCmd::AddFakeHop {
738                relay_cell_format,
739                fwd_lasthop,
740                rev_lasthop,
741                peer_id,
742                params,
743                done,
744            } => {
745                let Ok(leg) = self.reactor.circuits.single_leg_mut() else {
746                    // Don't care if the receiver goes away
747                    let _ = done.send(Err(tor_error::bad_api_usage!(
748                        "cannot add fake hop to multipath tunnel"
749                    )
750                    .into()));
751
752                    return Ok(());
753                };
754
755                leg.handle_add_fake_hop(
756                    relay_cell_format,
757                    fwd_lasthop,
758                    rev_lasthop,
759                    peer_id,
760                    &params,
761                    done,
762                );
763
764                Ok(())
765            }
766            #[cfg(test)]
767            CtrlCmd::QuerySendWindow { hop, leg, done } => {
768                // Immediately invoked function means that errors will be sent to the channel.
769                let _ = done.send((|| {
770                    let leg = self.reactor.circuits.leg_mut(leg).ok_or_else(|| {
771                        bad_api_usage!("cannot query send window of non-existent circuit")
772                    })?;
773
774                    let hop = leg.hop_mut(hop).ok_or(bad_api_usage!(
775                        "received QuerySendWindow for unknown hop {}",
776                        hop.display()
777                    ))?;
778
779                    Ok(hop.send_window_and_expected_tags())
780                })());
781
782                Ok(())
783            }
784            #[cfg(feature = "conflux")]
785            CtrlCmd::ShutdownAndReturnCircuit { answer } => {
786                self.reactor.handle_shutdown_and_return_circuit(answer)
787            }
788            #[cfg(feature = "circ-padding-manual")]
789            CtrlCmd::SetPadder {
790                hop,
791                padder,
792                sender,
793            } => {
794                let result = self.reactor.set_padding_at_hop(hop, padder);
795                let _ = sender.send(result);
796                Ok(())
797            }
798            CtrlCmd::GetTunnelActivity { sender } => {
799                let count = self.reactor.circuits.tunnel_activity();
800                let _ = sender.send(count);
801                Ok(())
802            }
803        }
804    }
805}