tor_proto/client/reactor/control.rs
//! Module providing [`CtrlMsg`] and [`CtrlCmd`].
2
3use super::circuit::extender::CircuitExtender;
4use super::{
5 CircuitHandshake, CloseStreamBehavior, MetaCellHandler, Reactor, ReactorResultChannel,
6 RunOnceCmdInner, SendRelayCell,
7};
8use crate::Result;
9use crate::circuit::celltypes::CreateResponse;
10use crate::circuit::circhop::HopSettings;
11#[cfg(feature = "circ-padding-manual")]
12use crate::client::circuit::padding;
13use crate::client::circuit::path;
14use crate::client::reactor::{NoJoinPointError, NtorClient, ReactorError};
15use crate::client::{HopLocation, TargetHop};
16use crate::crypto::binding::CircuitBinding;
17use crate::crypto::cell::{InboundClientLayer, OutboundClientLayer};
18use crate::crypto::handshake::ntor_v3::{NtorV3Client, NtorV3PublicKey};
19use crate::stream::cmdcheck::AnyCmdChecker;
20use crate::stream::flow_ctrl::state::StreamRateLimit;
21use crate::stream::flow_ctrl::xon_xoff::reader::DrainRateRequest;
22use crate::stream::queue::StreamQueueSender;
23use crate::streammap;
24use crate::util::notify::NotifySender;
25use crate::util::skew::ClockSkew;
26use crate::util::tunnel_activity::TunnelActivity;
27#[cfg(test)]
28use crate::{circuit::UniqId, client::circuit::CircParameters, crypto::cell::HopNum};
29use postage::watch;
30use tor_cell::chancell::msg::HandshakeType;
31use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
32use tor_cell::relaycell::msg::{AnyRelayMsg, Sendme};
33use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId};
34use tor_error::{Bug, bad_api_usage, internal, into_bad_api_usage};
35use tracing::{debug, trace};
36#[cfg(feature = "hs-service")]
37use {
38 crate::client::reactor::IncomingStreamRequestHandler,
39 crate::client::stream::IncomingStreamRequestFilter, crate::stream::incoming::StreamReqSender,
40};
41
42#[cfg(test)]
43use tor_cell::relaycell::msg::SendmeTag;
44
45#[cfg(feature = "conflux")]
46use super::{Circuit, ConfluxLinkResultChannel};
47
48use oneshot_fused_workaround as oneshot;
49
50use crate::crypto::handshake::ntor::NtorPublicKey;
51use crate::stream::StreamMpscReceiver;
52use tor_linkspec::{EncodedLinkSpec, OwnedChanTarget};
53
54use std::result::Result as StdResult;
55
56/// A message telling the reactor to do something.
57///
58/// For each `CtrlMsg`, the reactor will send a cell on the underlying channel.
59///
60/// The difference between this and [`CtrlCmd`] is that `CtrlMsg`s
61/// cause the reactor to send cells on the reactor's `chan_sender`,
62/// whereas `CtrlCmd` do not.
63#[derive(educe::Educe)]
64#[educe(Debug)]
65pub(crate) enum CtrlMsg {
66 /// Create the first hop of this circuit.
67 Create {
68 /// A oneshot channel on which we'll receive the creation response.
69 recv_created: oneshot::Receiver<CreateResponse>,
70 /// The handshake type to use for the first hop.
71 handshake: CircuitHandshake,
72 /// Other parameters relevant for circuit creation.
73 settings: HopSettings,
74 /// Oneshot channel to notify on completion.
75 done: ReactorResultChannel<()>,
76 },
77 /// Extend a circuit by one hop, using the ntor handshake.
78 ExtendNtor {
79 /// The peer that we're extending to.
80 ///
81 /// Used to extend our record of the circuit's path.
82 peer_id: OwnedChanTarget,
83 /// The handshake type to use for this hop.
84 public_key: NtorPublicKey,
85 /// Information about how to connect to the relay we're extending to.
86 linkspecs: Vec<EncodedLinkSpec>,
87 /// Other parameters we are negotiating.
88 settings: HopSettings,
89 /// Oneshot channel to notify on completion.
90 done: ReactorResultChannel<()>,
91 },
92 /// Extend a circuit by one hop, using the ntorv3 handshake.
93 ExtendNtorV3 {
94 /// The peer that we're extending to.
95 ///
96 /// Used to extend our record of the circuit's path.
97 peer_id: OwnedChanTarget,
98 /// The handshake type to use for this hop.
99 public_key: NtorV3PublicKey,
100 /// Information about how to connect to the relay we're extending to.
101 linkspecs: Vec<EncodedLinkSpec>,
102 /// Other parameters we are negotiating.
103 settings: HopSettings,
104 /// Oneshot channel to notify on completion.
105 done: ReactorResultChannel<()>,
106 },
107 /// Begin a stream with the provided hop in this circuit.
108 ///
109 /// Allocates a stream ID, and sends the provided message to that hop.
110 BeginStream {
111 /// The hop number to begin the stream with.
112 hop: TargetHop,
113 /// The message to send.
114 message: AnyRelayMsg,
115 /// A channel to send messages on this stream down.
116 ///
117 /// This sender shouldn't ever block, because we use congestion control and only send
118 /// SENDME cells once we've read enough out of the other end. If it *does* block, we
119 /// can assume someone is trying to send us more cells than they should, and abort
120 /// the stream.
121 sender: StreamQueueSender,
122 /// A channel to receive messages to send on this stream from.
123 rx: StreamMpscReceiver<AnyRelayMsg>,
124 /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
125 rate_limit_notifier: watch::Sender<StreamRateLimit>,
126 /// Notifies the stream reader when it should send a new drain rate.
127 drain_rate_requester: NotifySender<DrainRateRequest>,
128 /// Oneshot channel to notify on completion, with the allocated stream ID.
129 done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
130 /// A `CmdChecker` to keep track of which message types are acceptable.
131 cmd_checker: AnyCmdChecker,
132 },
133 /// Close the specified pending incoming stream, sending the provided END message.
134 ///
135 /// A stream is said to be pending if the message for initiating the stream was received but
136 /// not has not been responded to yet.
137 ///
138 /// This should be used by responders for closing pending incoming streams initiated by the
139 /// other party on the circuit.
140 #[cfg(feature = "hs-service")]
141 ClosePendingStream {
142 /// The hop number the stream is on.
143 hop: HopLocation,
144 /// The stream ID to send the END for.
145 stream_id: StreamId,
146 /// The END message to send, if any.
147 message: CloseStreamBehavior,
148 /// Oneshot channel to notify on completion.
149 done: ReactorResultChannel<()>,
150 },
151 /// Send a given control message on this circuit.
152 #[cfg(feature = "send-control-msg")]
153 SendMsg {
154 /// The hop to receive this message.
155 hop: TargetHop,
156 /// The message to send.
157 msg: AnyRelayMsg,
158 /// A sender that we use to tell the caller that the message was sent
159 /// and the handler installed.
160 sender: oneshot::Sender<Result<()>>,
161 },
162 /// Send a given control message on this circuit, and install a control-message handler to
163 /// receive responses.
164 #[cfg(feature = "send-control-msg")]
165 SendMsgAndInstallHandler {
166 /// The message to send, if any
167 msg: Option<AnyRelayMsgOuter>,
168 /// A message handler to install.
169 ///
170 /// If this is `None`, there must already be a message handler installed
171 #[educe(Debug(ignore))]
172 handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
173 /// A sender that we use to tell the caller that the message was sent
174 /// and the handler installed.
175 sender: oneshot::Sender<Result<()>>,
176 },
177 /// Inform the reactor that there's a flow control update for a given stream.
178 ///
179 /// The reactor will decide how to handle this update depending on the type of flow control and
180 /// the current state of the stream.
181 FlowCtrlUpdate {
182 /// The type of flow control update, and any associated metadata.
183 msg: FlowCtrlMsg,
184 /// The stream ID that the update is for.
185 stream_id: StreamId,
186 /// The hop that the stream is on.
187 hop: HopLocation,
188 },
189 /// Get the clock skew claimed by the first hop of the circuit.
190 FirstHopClockSkew {
191 /// Oneshot channel to return the clock skew.
192 answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
193 },
194 /// Link the specified circuits into the current tunnel,
195 /// to form a multi-path tunnel.
196 #[cfg(feature = "conflux")]
197 #[allow(unused)] // TODO(conflux)
198 LinkCircuits {
199 /// The circuits to link into the tunnel,
200 #[educe(Debug(ignore))]
201 circuits: Vec<Circuit>,
202 /// Oneshot channel to notify sender when all the specified circuits have finished linking,
203 /// or have failed to link.
204 ///
205 /// A client circuit is said to be fully linked once the `RELAY_CONFLUX_LINKED_ACK` is sent
206 /// (see [set construction]).
207 ///
208 /// [set construction]: https://spec.torproject.org/proposals/329-traffic-splitting.html#set-construction
209 answer: ConfluxLinkResultChannel,
210 },
211}
212
213/// A message telling the reactor to do something.
214///
215/// The difference between this and [`CtrlMsg`] is that `CtrlCmd`s
216/// never cause cells to sent on the channel,
217/// while `CtrlMsg`s potentially do: `CtrlMsg`s are mapped to [`RunOnceCmdInner`] commands,
218/// some of which instruct the reactor to send cells down the channel.
219#[derive(educe::Educe)]
220#[educe(Debug)]
221pub(crate) enum CtrlCmd {
222 /// Shut down the reactor.
223 Shutdown,
224 /// Extend the circuit by one hop, in response to an out-of-band handshake.
225 ///
226 /// (This is used for onion services, where the negotiation takes place in
227 /// INTRODUCE and RENDEZVOUS messages.)
228 #[cfg(feature = "hs-common")]
229 ExtendVirtual {
230 /// The cryptographic algorithms and keys to use when communicating with
231 /// the newly added hop.
232 #[educe(Debug(ignore))]
233 cell_crypto: (
234 Box<dyn OutboundClientLayer + Send>,
235 Box<dyn InboundClientLayer + Send>,
236 Option<CircuitBinding>,
237 ),
238 /// A set of parameters to negotiate with this hop.
239 settings: HopSettings,
240 /// Oneshot channel to notify on completion.
241 done: ReactorResultChannel<()>,
242 },
243 /// Resolve a given [`TargetHop`] into a precise [`HopLocation`].
244 ResolveTargetHop {
245 /// The target hop to resolve.
246 hop: TargetHop,
247 /// Oneshot channel to notify on completion.
248 done: ReactorResultChannel<HopLocation>,
249 },
250 /// Begin accepting streams on this circuit.
251 #[cfg(feature = "hs-service")]
252 AwaitStreamRequest {
253 /// A channel for sending information about an incoming stream request.
254 incoming_sender: StreamReqSender,
255 /// A `CmdChecker` to keep track of which message types are acceptable.
256 cmd_checker: AnyCmdChecker,
257 /// Oneshot channel to notify on completion.
258 done: ReactorResultChannel<()>,
259 /// The hop that is allowed to create streams.
260 hop: TargetHop,
261 /// A filter used to check requests before passing them on.
262 #[educe(Debug(ignore))]
263 #[cfg(feature = "hs-service")]
264 filter: Box<dyn IncomingStreamRequestFilter>,
265 },
266 /// Request the binding key of a target hop.
267 #[cfg(feature = "hs-service")]
268 GetBindingKey {
269 /// The hop for which we want the key.
270 hop: TargetHop,
271 /// Oneshot channel to notify on completion.
272 done: ReactorResultChannel<Option<CircuitBinding>>,
273 },
274 /// (tests only) Add a hop to the list of hops on this circuit, with dummy cryptography.
275 #[cfg(test)]
276 AddFakeHop {
277 relay_cell_format: RelayCellFormat,
278 fwd_lasthop: bool,
279 rev_lasthop: bool,
280 peer_id: path::HopDetail,
281 params: CircParameters,
282 done: ReactorResultChannel<()>,
283 },
284 /// (tests only) Get the send window and expected tags for a given hop.
285 #[cfg(test)]
286 QuerySendWindow {
287 hop: HopNum,
288 leg: UniqId,
289 done: ReactorResultChannel<(u32, Vec<SendmeTag>)>,
290 },
291 /// Shut down the reactor, and return the underlying [`Circuit`],
292 /// if the tunnel is not multi-path.
293 ///
294 /// Returns an error if called on a multi-path reactor.
295 #[cfg(feature = "conflux")]
296 #[allow(unused)] // TODO(conflux)
297 ShutdownAndReturnCircuit {
298 /// Oneshot channel to return the underlying [`Circuit`],
299 /// or an error if the reactor's tunnel is multi-path.
300 answer: oneshot::Sender<StdResult<Circuit, Bug>>,
301 },
302
303 /// Install or remove a [`padding::CircuitPadder`] for a given hop.
304 ///
305 /// Any existing `CircuitPadder` at that hop is replaced.
306 #[cfg(feature = "circ-padding-manual")]
307 SetPadder {
308 /// The hop to modify.
309 hop: HopLocation,
310 /// The Padder to install, or None to remove any existing padder.
311 padder: Option<padding::CircuitPadder>,
312 /// A sender to alert after we've changed the padding.
313 sender: oneshot::Sender<Result<()>>,
314 },
315
316 /// Yield the most active [`TunnelActivity`] for any hop on any leg of this tunnel.
317 GetTunnelActivity {
318 /// A sender to receive the reply.
319 sender: oneshot::Sender<TunnelActivity>,
320 },
321}
322
323/// A flow control update message.
324#[derive(Debug)]
325pub(crate) enum FlowCtrlMsg {
326 /// Send a SENDME message on this stream.
327 Sendme,
328 /// Send an XON message on this stream with the given rate.
329 Xon(XonKbpsEwma),
330}
331
332/// A control message handler object. Keep a reference to the Reactor tying its lifetime to it.
333///
334/// Its `handle_msg` and `handle_cmd` handlers decide how messages and commands,
335/// respectively, are handled.
336pub(crate) struct ControlHandler<'a> {
337 /// Reference to the reactor of this
338 reactor: &'a mut Reactor,
339}
340
341impl<'a> ControlHandler<'a> {
342 /// Constructor.
343 pub(crate) fn new(reactor: &'a mut Reactor) -> Self {
344 Self { reactor }
345 }
346
347 /// Handle a control message.
348 pub(super) fn handle_msg(&mut self, msg: CtrlMsg) -> Result<Option<RunOnceCmdInner>> {
349 trace!(
350 tunnel_id = %self.reactor.tunnel_id,
351 msg = ?msg,
352 "reactor received control message"
353 );
354
355 match msg {
356 // This is handled earlier, since it requires blocking.
357 CtrlMsg::Create { done, .. } => {
358 if self.reactor.circuits.len() == 1 {
359 // This should've been handled in Reactor::run_once()
360 // (ControlHandler::handle_msg() is never called before wait_for_create()).
361 debug_assert!(self.reactor.circuits.single_leg()?.has_hops());
362 // Don't care if the receiver goes away
363 let _ = done.send(Err(tor_error::bad_api_usage!(
364 "cannot create first hop twice"
365 )
366 .into()));
367 } else {
368 // Don't care if the receiver goes away
369 let _ = done.send(Err(tor_error::bad_api_usage!(
370 "cannot create first hop on multipath tunnel"
371 )
372 .into()));
373 }
374
375 Ok(None)
376 }
377 CtrlMsg::ExtendNtor {
378 peer_id,
379 public_key,
380 linkspecs,
381 settings,
382 done,
383 } => {
384 let Ok(circ) = self.reactor.circuits.single_leg_mut() else {
385 // Don't care if the receiver goes away
386 let _ = done.send(Err(tor_error::bad_api_usage!(
387 "cannot extend multipath tunnel"
388 )
389 .into()));
390
391 return Ok(None);
392 };
393
394 let (extender, cell) = CircuitExtender::<NtorClient>::begin(
395 peer_id,
396 HandshakeType::NTOR,
397 &public_key,
398 linkspecs,
399 settings,
400 &(),
401 circ,
402 done,
403 )?;
404 self.reactor
405 .cell_handlers
406 .set_meta_handler(Box::new(extender))?;
407
408 Ok(Some(RunOnceCmdInner::Send {
409 leg: circ.unique_id(),
410 cell,
411 done: None,
412 }))
413 }
414 CtrlMsg::ExtendNtorV3 {
415 peer_id,
416 public_key,
417 linkspecs,
418 settings,
419 done,
420 } => {
421 let Ok(circ) = self.reactor.circuits.single_leg_mut() else {
422 // Don't care if the receiver goes away
423 let _ = done.send(Err(tor_error::bad_api_usage!(
424 "cannot extend multipath tunnel"
425 )
426 .into()));
427
428 return Ok(None);
429 };
430
431 let client_extensions = settings.circuit_request_extensions()?;
432
433 let (extender, cell) = CircuitExtender::<NtorV3Client>::begin(
434 peer_id,
435 HandshakeType::NTOR_V3,
436 &public_key,
437 linkspecs,
438 settings,
439 &client_extensions,
440 circ,
441 done,
442 )?;
443 self.reactor
444 .cell_handlers
445 .set_meta_handler(Box::new(extender))?;
446
447 Ok(Some(RunOnceCmdInner::Send {
448 leg: circ.unique_id(),
449 cell,
450 done: None,
451 }))
452 }
453 CtrlMsg::BeginStream {
454 hop,
455 message,
456 sender,
457 rx,
458 rate_limit_notifier,
459 drain_rate_requester,
460 done,
461 cmd_checker,
462 } => {
463 // If resolving the hop fails,
464 // we want to report an error back to the initiator and not shut down the reactor.
465 let hop_location = match self.reactor.resolve_target_hop(hop) {
466 Ok(x) => x,
467 Err(e) => {
468 let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
469 // don't care if receiver goes away
470 let _ = done.send(Err(e.into()));
471 return Ok(None);
472 }
473 };
474 let (leg_id, hop_num) = match self.reactor.resolve_hop_location(hop_location) {
475 Ok(x) => x,
476 Err(e) => {
477 let e = into_bad_api_usage!("Could not resolve {hop_location:?}")(e);
478 // don't care if receiver goes away
479 let _ = done.send(Err(e.into()));
480 return Ok(None);
481 }
482 };
483 let circ = match self.reactor.circuits.leg_mut(leg_id) {
484 Some(x) => x,
485 None => {
486 let e = bad_api_usage!("Circuit leg {leg_id:?} does not exist");
487 // don't care if receiver goes away
488 let _ = done.send(Err(e.into()));
489 return Ok(None);
490 }
491 };
492
493 let cell = circ.begin_stream(
494 hop_num,
495 message,
496 sender,
497 rx,
498 rate_limit_notifier,
499 drain_rate_requester,
500 cmd_checker,
501 )?;
502 Ok(Some(RunOnceCmdInner::BeginStream {
503 leg: leg_id,
504 cell,
505 hop: hop_location,
506 done,
507 }))
508 }
509 #[cfg(feature = "hs-service")]
510 CtrlMsg::ClosePendingStream {
511 hop,
512 stream_id,
513 message,
514 done,
515 } => Ok(Some(RunOnceCmdInner::CloseStream {
516 hop,
517 sid: stream_id,
518 behav: message,
519 reason: streammap::TerminateReason::ExplicitEnd,
520 done: Some(done),
521 })),
522 CtrlMsg::FlowCtrlUpdate {
523 msg,
524 stream_id,
525 hop,
526 } => {
527 match msg {
528 FlowCtrlMsg::Sendme => {
529 let (leg_id, hop_num) = match self.reactor.resolve_hop_location(hop) {
530 Ok(x) => x,
531 Err(NoJoinPointError) => {
532 // A stream tried to send a stream-level SENDME message to the join point of
533 // a tunnel that has never had a join point. Currently in arti, only a
534 // `StreamTarget` asks us to send a stream-level SENDME, and this tunnel
535 // originally created the `StreamTarget` to begin with. So this is a
536 // legitimate bug somewhere in the tunnel code.
537 return Err(
538 internal!(
539 "Could not send a stream-level SENDME to a join point on a tunnel without a join point",
540 )
541 .into()
542 );
543 }
544 };
545
546 // Congestion control decides if we can send stream level SENDMEs or not.
547 let sendme_required = match self.reactor.uses_stream_sendme(leg_id, hop_num)
548 {
549 Some(x) => x,
550 None => {
551 // The leg/hop has disappeared. This is fine since the stream may have ended
552 // and been cleaned up while this `CtrlMsg::SendSendme` message was queued.
553 // It is possible that is a bug and this is an incorrect leg/hop number, but
554 // it's not currently possible to differentiate between an incorrect leg/hop
555 // number and a circuit hop that has been closed.
556 debug!(
557 "Could not send a stream-level SENDME on a hop that does not exist. Ignoring."
558 );
559 return Ok(None);
560 }
561 };
562
563 if !sendme_required {
564 // Nothing to do, so discard the SENDME.
565 return Ok(None);
566 }
567
568 let sendme = Sendme::new_empty();
569 let cell = AnyRelayMsgOuter::new(Some(stream_id), sendme.into());
570
571 let cell = SendRelayCell {
572 hop: Some(hop_num),
573 early: false,
574 cell,
575 };
576
577 Ok(Some(RunOnceCmdInner::Send {
578 leg: leg_id,
579 cell,
580 done: None,
581 }))
582 }
583 FlowCtrlMsg::Xon(rate) => Ok(Some(RunOnceCmdInner::MaybeSendXon {
584 rate,
585 hop,
586 stream_id,
587 })),
588 }
589 }
590 // TODO(conflux): this should specify which leg to send the msg on
591 // (currently we send it down the primary leg).
592 //
593 // This will involve updating ClientCIrc::send_raw_msg() to take a
594 // leg id argument (which is a breaking change.
595 #[cfg(feature = "send-control-msg")]
596 CtrlMsg::SendMsg { hop, msg, sender } => {
597 let Some((leg_id, hop_num)) = self.reactor.target_hop_to_hopnum_id(hop) else {
598 // Don't care if receiver goes away
599 let _ = sender.send(Err(bad_api_usage!("Unknown {hop:?}").into()));
600 return Ok(None);
601 };
602
603 let cell = AnyRelayMsgOuter::new(None, msg);
604 let cell = SendRelayCell {
605 hop: Some(hop_num),
606 early: false,
607 cell,
608 };
609
610 Ok(Some(RunOnceCmdInner::Send {
611 leg: leg_id,
612 cell,
613 done: Some(sender),
614 }))
615 }
616 // TODO(conflux): this should specify which leg to send the msg on
617 // (currently we send it down the primary leg)
618 #[cfg(feature = "send-control-msg")]
619 CtrlMsg::SendMsgAndInstallHandler {
620 msg,
621 handler,
622 sender,
623 } => Ok(Some(RunOnceCmdInner::SendMsgAndInstallHandler {
624 msg,
625 handler,
626 done: sender,
627 })),
628 CtrlMsg::FirstHopClockSkew { answer } => {
629 Ok(Some(RunOnceCmdInner::FirstHopClockSkew { answer }))
630 }
631 #[cfg(feature = "conflux")]
632 CtrlMsg::LinkCircuits { circuits, answer } => {
633 Ok(Some(RunOnceCmdInner::Link { circuits, answer }))
634 }
635 }
636 }
637
638 /// Handle a control command.
639 #[allow(clippy::needless_pass_by_value)] // Needed when conflux is enabled
640 pub(super) fn handle_cmd(&mut self, msg: CtrlCmd) -> StdResult<(), ReactorError> {
641 trace!(
642 tunnel_id = %self.reactor.tunnel_id,
643 msg = ?msg,
644 "reactor received control command"
645 );
646
647 match msg {
648 CtrlCmd::Shutdown => self.reactor.handle_shutdown().map(|_| ()),
649 #[cfg(feature = "hs-common")]
650 #[allow(unreachable_code)]
651 CtrlCmd::ExtendVirtual {
652 cell_crypto,
653 settings,
654 done,
655 } => {
656 let (outbound, inbound, binding) = cell_crypto;
657
658 // TODO HS: Perhaps this should describe the onion service, or
659 // describe why the virtual hop was added, or something?
660 let peer_id = path::HopDetail::Virtual;
661
662 let Ok(leg) = self.reactor.circuits.single_leg_mut() else {
663 // Don't care if the receiver goes away
664 let _ = done.send(Err(tor_error::bad_api_usage!(
665 "cannot extend multipath tunnel"
666 )
667 .into()));
668
669 return Ok(());
670 };
671
672 leg.add_hop(peer_id, outbound, inbound, binding, &settings)?;
673 let _ = done.send(Ok(()));
674
675 Ok(())
676 }
677 CtrlCmd::ResolveTargetHop { hop, done } => {
678 let _ = done.send(
679 self.reactor
680 .resolve_target_hop(hop)
681 .map_err(|_| crate::util::err::Error::NoSuchHop),
682 );
683 Ok(())
684 }
685 #[cfg(feature = "hs-service")]
686 CtrlCmd::AwaitStreamRequest {
687 cmd_checker,
688 incoming_sender,
689 hop,
690 done,
691 filter,
692 } => {
693 let Some((_, hop_num)) = self.reactor.target_hop_to_hopnum_id(hop) else {
694 let _ = done.send(Err(crate::Error::NoSuchHop));
695 return Ok(());
696 };
697 // TODO: At some point we might want to add a CtrlCmd for
698 // de-registering the handler. See comments on `allow_stream_requests`.
699 let handler = IncomingStreamRequestHandler {
700 incoming_sender,
701 cmd_checker,
702 hop_num: Some(hop_num),
703 filter,
704 };
705
706 let ret = self
707 .reactor
708 .cell_handlers
709 .set_incoming_stream_req_handler(handler);
710 let _ = done.send(ret); // don't care if the corresponding receiver goes away.
711
712 Ok(())
713 }
714 #[cfg(feature = "hs-service")]
715 CtrlCmd::GetBindingKey { hop, done } => {
716 let Some((leg_id, hop_num)) = self.reactor.target_hop_to_hopnum_id(hop) else {
717 let _ = done.send(Err(tor_error::internal!(
718 "Unknown TargetHop when getting binding key"
719 )
720 .into()));
721 return Ok(());
722 };
723 let Some(circuit) = self.reactor.circuits.leg(leg_id) else {
724 let _ = done.send(Err(tor_error::bad_api_usage!(
725 "Unknown circuit id {leg_id} when getting binding key"
726 )
727 .into()));
728 return Ok(());
729 };
730 // Get the binding key from the mutable state and send it back.
731 let key = circuit.mutable().binding_key(hop_num);
732 let _ = done.send(Ok(key));
733
734 Ok(())
735 }
736 #[cfg(test)]
737 CtrlCmd::AddFakeHop {
738 relay_cell_format,
739 fwd_lasthop,
740 rev_lasthop,
741 peer_id,
742 params,
743 done,
744 } => {
745 let Ok(leg) = self.reactor.circuits.single_leg_mut() else {
746 // Don't care if the receiver goes away
747 let _ = done.send(Err(tor_error::bad_api_usage!(
748 "cannot add fake hop to multipath tunnel"
749 )
750 .into()));
751
752 return Ok(());
753 };
754
755 leg.handle_add_fake_hop(
756 relay_cell_format,
757 fwd_lasthop,
758 rev_lasthop,
759 peer_id,
760 ¶ms,
761 done,
762 );
763
764 Ok(())
765 }
766 #[cfg(test)]
767 CtrlCmd::QuerySendWindow { hop, leg, done } => {
768 // Immediately invoked function means that errors will be sent to the channel.
769 let _ = done.send((|| {
770 let leg = self.reactor.circuits.leg_mut(leg).ok_or_else(|| {
771 bad_api_usage!("cannot query send window of non-existent circuit")
772 })?;
773
774 let hop = leg.hop_mut(hop).ok_or(bad_api_usage!(
775 "received QuerySendWindow for unknown hop {}",
776 hop.display()
777 ))?;
778
779 Ok(hop.send_window_and_expected_tags())
780 })());
781
782 Ok(())
783 }
784 #[cfg(feature = "conflux")]
785 CtrlCmd::ShutdownAndReturnCircuit { answer } => {
786 self.reactor.handle_shutdown_and_return_circuit(answer)
787 }
788 #[cfg(feature = "circ-padding-manual")]
789 CtrlCmd::SetPadder {
790 hop,
791 padder,
792 sender,
793 } => {
794 let result = self.reactor.set_padding_at_hop(hop, padder);
795 let _ = sender.send(result);
796 Ok(())
797 }
798 CtrlCmd::GetTunnelActivity { sender } => {
799 let count = self.reactor.circuits.tunnel_activity();
800 let _ = sender.send(count);
801 Ok(())
802 }
803 }
804 }
805}