tor_proto/circuit/reactor.rs
1//! Module exposing the circuit reactor subsystem.
2//!
3//! This module implements the new [multi-reactor circuit subsystem].
4//!
5// Note: this is currently only used for the relay side,
6// but we plan to eventually rewrite client circuit implementation
7// to use these new reactor types as well.
8//!
9//! The entry point of the reactor is [`Reactor::run`], which launches the
10//! reactor background tasks, and begins listening for inbound cells on the provided
11//! inbound Tor channel.
12//!
13//! ### Architecture
14//!
15//! Internally, the circuit reactor consists of multiple reactors,
16//! each running in a separate task:
17//!
18//! * [`StreamReactor`] (one per hop): handles all messages arriving to,
19//! and coming from the streams of a given hop. The ready stream messages
20//! are sent to the [`BackwardReactor`]
21//! * [`ForwardReactor`]: handles incoming cells arriving on the
22//! "inbound" Tor channel (towards the guard, if we are a client, or towards
23//! the client, if we are a relay). If we are a client, it moves stream messages
24//! towards the corresponding [`StreamReactor`]. If we are a relay,
25//! in addition to sending any stream messages to the `StreamReactor`,
26//! this reactor also moves cells in the forward direction
27//! (from the client towards the exit)
28//! * [`BackwardReactor`]: writes cells to the "inbound" Tor channel:
//!   towards the client if we are a relay, or towards the exit
30//! if we are a client.
31//!
32// TODO: the forward/backward terminology no longer makes sense! Come up with better terms...
33//!
34//! If we are an exit relay, the cell flow looks roughly like this:
35//!
36//! ```text
37//! <stream_tx
38//! MPSC (0)>
39//! +--------------> FWD -------------------------+
40//! | | |
41//! | | |
42//! | | |
43//! | | v
44//! relay BackwardReactorCmd StreamReactor
45//! ^ <MPSC (0)> |
46//! | | |
47//! | | |
48//! | | |
49//! | v |
50//! +--------------- BWD <------------------------+
51//! application stream data <stream_rx
52//! MPSC (0)>
//! ```
//!
//! For a middle relay (the `StreamReactor` is omitted for brevity,
//! but middle relays can have one too, if leaky pipe is in use):
//!
//! ```text
//!                       unrecognized cell
58//! +--------------> FWD -------------------------+
59//! | | |
60//! | | |
61//! | | |
62//! | | v
63//! client BackwardReactorCmd relay
64//! or relay <MPSC (0)> |
65//! ^ | |
66//! | | |
67//! | | |
68//! | | |
69//! | v |
70//! +--------------- BWD <------------------------+
71//! ```
72//!
73//! On the client-side the `ForwardReactor` reads cells from the Tor channel to the guard,
74//! and the `BackwardReactor` writes to it.
75//!
76//! ```text
77//! +--------------- FWD <--------------------+
78//! | | |
79//! | | |
80//! | | |
81//! v | |
82//! StreamReactor BackwardReactorCmd guard
83//! | <MPSC (0)> ^
84//! | | |
85//! | | |
86//! | | |
87//! | v |
88//! +--------------> BWD ---------------------+
89//! ```
90//!
91//! Client with leaky pipe (`SR` = `StreamReactor`):
92//!
93//! ```text
94//! +------------------------------+
95//! | +--------------------+ | (1 MPSC TX per SR)
96//! | | | |
97//! | | +----------- FWD <------------------+
98//! | | | | |
99//! | | | | |
100//! | | | | |
101//! v v v | |
102//! SR SR SR BackwardReactorCmd guard
103//! (hop 4) (hop 3) (hop 2) <MPSC (0)> ^
104//! | | | | |
105//! | | | | |
106//! | | | | |
107//! | | | v |
108//! | | | BWD -------------------+
109//! | | | ^
110//! | | | |
111//! | | | | <stream_rx
112//! | | | | MPSC (0)>
113//! +-------+-------+-------------+
114//! ```
115//!
116// TODO(tuning): The inter-reactor MPSC channels have no buffering,
117// which is likely going to be bad for performance,
118// so we will need to tune the sizes of these MPSC buffers.
119//!
120//! The read and write ends of the inbound and outbound Tor channels are "split",
121//! such that each reactor holds an `inbound_chan_rx` stream (for reading)
//! and an `inbound_chan_tx` sink (for writing):
123//!
124//! * `ForwardReactor` holds the reading end of the inbound
125//! (coming from the client, if we are a relay, or coming from the guard, if we are a client)
126//! Tor channel, and the writing end of the outbound (towards the exit, if we are a middle relay)
127//! Tor channel, if there is one
128//! * `BackwardReactor` holds the reading end of the outbound channel, if there is one,
129//! and the writing end of the inbound channel, if there is one
130//!
131//! #### `ForwardReactor`
132//!
133//! It handles forward cells, by delegating to the implementation-dependent
134//! [`ForwardHandler::handle_forward_cell`], which decides
135//! whether the cell needs to be handled in `ForwardReactor`,
136//! or in the `ForwardHandler` itself.
137//!
138//! More concretely:
139//!
140//! ```text
141//!
142//! Legend: `F` = "forward reactor", `H` = "ForwardHandler"
143//!
144//! | Message | Received in | Handled in | Description |
145//! |-------------------|-------------|------------|----------------------------------------|
//! | DESTROY           | F           | H          | Handled internally by ForwardHandler   |
//! |-------------------|-------------|------------|----------------------------------------|
//! | PADDING_NEGOTIATE | F           | H          | Handled internally by ForwardHandler   |
149//! |-------------------|-------------|------------|----------------------------------------|
150//! | *unrecognized* | F | H | Unrecognized relay cell handling is |
151//! | RELAY OR | | | implementation-dependent so these are |
152//! | RELAY_EARLY | | | handled in the ForwardHandler. |
153//! | | | | |
154//! | | | | The relay ForwardHandler will handle |
155//! | | | | these by forwarding them to the next |
156//! | | | | hop, if there is one. |
157//! | | | | |
158//! | | | | Clients don't yet implement |
159//! | | | | ForwardHandler, but when they do, |
160//! | | | | its implementation will simply reject |
161//! | | | | any messages that can't be decrypted |
162//! |-------------------|-------------|------------|----------------------------------------|
163//! | *recognized* | F | see table | Handling depends on the cmd |
164//! | RELAY OR | | below | |
165//! | RELAY_EARLY | | | |
166//! ```
167//!
168//! Recognized relay cells are handled by splitting each cell into individual messages,
169//! and handling each message individually as described in the table below
170//! (Note: since prop340 is not yet implemented, in practice there is only 1 message per cell):
171//!
172//! ```text
173//!
174//! Legend: `F` = "forward reactor", `B` = "backward reactor", `S` = "stream reactor"
175//!
176//! | RELAY cmd | Received in | Handled in | Description |
177//! |-------------------|-------------|------------|----------------------------------------|
178//! | SENDME | F | B | Sent to BackwardReactor for handling |
179//! | | | | (BackwardReactorCmd::HandleSendme) |
180//! | | | | because the forward reactor doesn't |
181//! | | | | have access to the inbound_chan_tx part|
182//! | | | | of the inbound (towards the client) |
183//! | | | | Tor channel, and so cannot obtain the |
184//! | | | | congestion signals needed for SENDME |
185//! | | | | handling |
186//! |-------------------|-------------|------------|----------------------------------------|
187//! | Other | F | F | Passed to impl-dependent handler |
188//! | (StreamId = 0) | | | `ForwardHandler::handle_meta_msg()` |
189//! |-------------------|-------------|------------|----------------------------------------|
190//! | Other | F | S | All messages with a non-zero stream ID |
191//! | (StreamId != 0) | | | are forwarded to the stream reactor |
192//! |-------------------|-------------|------------|----------------------------------------|
193//! ```
194//!
195//! #### `BackwardReactor`
196//!
197//! It handles
198//!
199//! * the packaging and delivery of all cells that need to be written to the "inbound" Tor channel
200//! (it writes them to the towards-the-client Tor channel sink) (**partially implemented**)
201//! * incoming cells coming over the "outbound" Tor channel. This channel only exists
202//! if we are a middle relay. These cells are relayed to the "inbound" Tor channel (**not implemented**).
203//! * the sending of padding cells, according to the PaddingController's instructions
204//!
205//! This multi-reactor architecture should, in theory, have better performance than
206//! a single reactor system, because it enables us to parallelize some of the work:
207//! the forward and backward directions share little state,
208//! because they read from, and write to, different sinks/streams,
209//! so they can be run in parallel (as separate tasks).
210//! With a single reactor architecture, the reactor would need to drive
211//! both the forward and the backward direction, and on each iteration
212//! would need to decide which to prioritize, which might prove tricky
213//! (though prioritizing one of them at random would've probably been good enough).
214//!
215//! The monolithic single reactor alternative would also have been significantly
216//! more convoluted, and so more difficult to maintain in the long run.
217//!
218//
219// NOTE: The FWD and BWD currently share the hop list containing the per-hop state,
220// (including the congestion control object, which is behind a mutex).
221//
222//! [multi-reactor circuit subsystem]: https://gitlab.torproject.org/tpo/core/arti/-/blob/main/doc/dev/notes/relay-conflux.md
223//! [`StreamReactor`]: stream::StreamReactor
224
225// TODO(DEDUP): this will replace CircHopList when we rewrite the client reactor
226// to use the new reactor architecture
227pub(crate) mod circhop;
228
229pub(crate) mod backward;
230pub(crate) mod forward;
231pub(crate) mod hop_mgr;
232pub(crate) mod macros;
233pub(crate) mod stream;
234
235use std::result::Result as StdResult;
236use std::sync::Arc;
237
238use derive_deftly::Deftly;
239use futures::channel::mpsc;
240use futures::{FutureExt as _, StreamExt as _, select_biased};
241use oneshot_fused_workaround as oneshot;
242use tracing::trace;
243
244use tor_cell::chancell::CircId;
245use tor_rtcompat::{DynTimeProvider, Runtime};
246
247use crate::channel::Channel;
248use crate::circuit::reactor::backward::BackwardHandler;
249use crate::circuit::reactor::forward::ForwardHandler;
250use crate::circuit::reactor::hop_mgr::HopMgr;
251use crate::circuit::reactor::stream::ReadyStreamMsg;
252use crate::circuit::{CircuitRxReceiver, UniqId};
253use crate::memquota::CircuitAccount;
254use crate::util::err::ReactorError;
255
256// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
257use crate::client::circuit::padding::{PaddingController, PaddingEventStream};
258
259use backward::BackwardReactor;
260use forward::ForwardReactor;
261use macros::derive_deftly_template_CircuitReactor;
262
/// The type of a oneshot channel used to report the result of an operation.
///
/// This is the *sending* half of the channel; the value sent over it is a
/// `crate::Result<T>`, i.e. either the `T` produced by the operation or the
/// error it failed with.
pub(crate) type ReactorResultChannel<T> = oneshot::Sender<crate::Result<T>>;
265
/// A handle for interacting with a circuit reactor.
///
/// A handle is returned by [`Reactor::new`] together with the reactor itself.
/// It holds the sending ends of the reactor's control-message and
/// control-command channels; the matching receiving ends are stored
/// in the [`Reactor`].
#[derive(derive_more::Debug)]
pub(crate) struct CircReactorHandle<F: ForwardHandler, B: BackwardHandler> {
    /// Sender for reactor control messages (see [`CtrlMsg`]).
    #[debug(skip)]
    pub(crate) control: mpsc::UnboundedSender<CtrlMsg<F::CtrlMsg, B::CtrlMsg>>,
    /// Sender for reactor control commands (see [`CtrlCmd`]).
    #[debug(skip)]
    pub(crate) command: mpsc::UnboundedSender<CtrlCmd<F::CtrlCmd, B::CtrlCmd>>,
    /// The time provider.
    ///
    /// Built from a clone of the runtime the reactor was created with.
    pub(crate) time_provider: DynTimeProvider,
    /// Memory quota account
    ///
    /// A clone of the account passed to [`Reactor::new`].
    pub(crate) memquota: CircuitAccount,
}
280
/// A control command.
///
/// The difference between this and [`CtrlMsg`] is that `CtrlCmd`s
/// never cause cells to be sent on the Tor channel,
/// while `CtrlMsg`s potentially do.
///
/// `F` and `B` are the implementation-specific command types carried by
/// the forward and the backward reactor commands, respectively.
#[allow(unused)] // TODO(relay)
pub(crate) enum CtrlCmd<F, B> {
    /// A control command for the forward reactor.
    Forward(forward::CtrlCmd<F>),
    /// A control command for the backward reactor.
    Backward(backward::CtrlCmd<B>),
    /// Shut down the reactor.
    ///
    /// Handling this command makes the reactor's main loop return
    /// [`ReactorError::Shutdown`].
    Shutdown,
}
295
/// A control message.
///
/// Unlike a [`CtrlCmd`], handling a `CtrlMsg` may cause cells
/// to be sent on the Tor channel.
///
/// `F` and `B` are the implementation-specific message types carried by
/// the forward and the backward reactor messages, respectively.
#[allow(unused)] // TODO(relay)
pub(crate) enum CtrlMsg<F, B> {
    /// A control message for the forward reactor.
    Forward(forward::CtrlMsg<F>),
    /// A control message for the backward reactor.
    Backward(backward::CtrlMsg<B>),
}
304
/// The entry point of the circuit reactor subsystem.
///
/// A `Reactor` owns the [`ForwardReactor`] and the [`BackwardReactor`] of a
/// circuit until it is run. While running, it drives both of them and relays
/// the [`CtrlCmd`]s and [`CtrlMsg`]s it receives from its
/// [`CircReactorHandle`] to the control channels of the reactor they are
/// addressed to.
#[derive(Deftly)]
#[derive_deftly(CircuitReactor)]
#[deftly(reactor_name = "circuit reactor")]
#[deftly(only_run_once)]
#[deftly(run_inner_fn = "Self::run_inner")]
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub(crate) struct Reactor<R: Runtime, F: ForwardHandler, B: BackwardHandler> {
    /// The process-unique identifier of this circuit.
    ///
    /// Used for logging.
    unique_id: UniqId,
    /// The reactor for handling
    ///
    ///   * cells moving in the forward direction (from the client towards exit), if we are a relay
    ///   * incoming cells (coming from the guard), if we are a client
    ///
    /// Optional so we can move it out of self in [`run_inner`](Self::run_inner).
    forward: Option<ForwardReactor<R, F>>,
    /// The reactor for handling
    ///
    ///   * cells moving in the backward direction (from the exit towards client), if we are a relay
    ///   * outgoing cells (moving towards the guard), if we are a client
    ///
    /// Optional so we can move it out of self in [`run_inner`](Self::run_inner).
    backward: Option<BackwardReactor<B>>,
    /// Receiver for control messages for this reactor, sent by reactor handle objects.
    ///
    /// This MPSC channel is polled in [`run_inner`](Self::run_inner),
    /// after `command`.
    control: mpsc::UnboundedReceiver<CtrlMsg<F::CtrlMsg, B::CtrlMsg>>,
    /// Receiver for command messages for this reactor, sent by reactor handle objects.
    ///
    /// This MPSC channel is polled in [`run`](Self::run).
    ///
    /// NOTE: this is a separate channel from `control`, because some messages
    /// have higher priority and need to be handled even if the `inbound_chan_tx` is not
    /// ready (whereas `control` messages are not read until the `inbound_chan_tx` sink
    /// is ready to accept cells).
    command: mpsc::UnboundedReceiver<CtrlCmd<F::CtrlCmd, B::CtrlCmd>>,
    /// Control channels for the [`ForwardReactor`].
    ///
    /// Handles [`CtrlCmd::Forward`] and [`CtrlMsg::Forward`] messages.
    fwd_ctrl: ReactorCtrl<forward::CtrlCmd<F::CtrlCmd>, forward::CtrlMsg<F::CtrlMsg>>,
    /// Control channels for the [`BackwardReactor`].
    ///
    /// Handles [`CtrlCmd::Backward`] and [`CtrlMsg::Backward`] messages.
    bwd_ctrl: ReactorCtrl<backward::CtrlCmd<B::CtrlCmd>, backward::CtrlMsg<B::CtrlMsg>>,
}
351
/// A handle for sending control/command messages to a FWD or BWD.
///
/// `C` is the type of the control commands, and `M` the type of the
/// control messages, accepted by the reactor on the other end.
struct ReactorCtrl<C, M> {
    /// Sender for control commands.
    command_tx: mpsc::UnboundedSender<C>,
    /// Sender for control messages.
    control_tx: mpsc::UnboundedSender<M>,
}
359
360impl<C, M> ReactorCtrl<C, M> {
361 /// Create a new sender handle.
362 fn new(command_tx: mpsc::UnboundedSender<C>, control_tx: mpsc::UnboundedSender<M>) -> Self {
363 Self {
364 command_tx,
365 control_tx,
366 }
367 }
368
369 /// Send a control command.
370 fn send_cmd(&mut self, cmd: C) -> Result<(), ReactorError> {
371 self.command_tx
372 .unbounded_send(cmd)
373 .map_err(|_| ReactorError::Shutdown)
374 }
375
376 /// Send a control message.
377 fn send_msg(&mut self, msg: M) -> Result<(), ReactorError> {
378 self.control_tx
379 .unbounded_send(msg)
380 .map_err(|_| ReactorError::Shutdown)
381 }
382}
383
/// Trait implemented by types that can handle control messages and commands.
///
/// The [`Reactor`] requires this of both its forward and its backward handler,
/// so the associated types below describe whichever handler implements the trait.
pub(crate) trait ControlHandler {
    /// The type of control message accepted by this handler.
    type CtrlMsg;

    /// The type of control command accepted by this handler.
    type CtrlCmd;

    // TODO(DEDUP): do these APIs make sense?
    // What should we return here, maybe some instructions for the base reactor
    // to do something?

    /// Handle a control command.
    ///
    /// Returns a [`ReactorError`] if the command could not be handled.
    fn handle_cmd(&mut self, cmd: Self::CtrlCmd) -> StdResult<(), ReactorError>;

    /// Handle a control message.
    ///
    /// Returns a [`ReactorError`] if the message could not be handled.
    fn handle_msg(&mut self, msg: Self::CtrlMsg) -> StdResult<(), ReactorError>;
}
402
403#[allow(unused)] // TODO(relay)
404impl<R: Runtime, F: ForwardHandler + ControlHandler, B: BackwardHandler + ControlHandler>
405 Reactor<R, F, B>
406{
407 /// Create a new circuit reactor.
408 ///
409 /// The reactor will send outbound messages on `channel`, receive incoming
410 /// messages on `inbound_chan_rx`, and identify this circuit by the channel-local
411 /// [`CircId`] provided.
412 ///
413 /// The internal unique identifier for this circuit will be `unique_id`.
414 #[allow(clippy::too_many_arguments)] // TODO
415 pub(crate) fn new(
416 runtime: R,
417 channel: &Arc<Channel>,
418 circ_id: CircId,
419 unique_id: UniqId,
420 inbound_chan_rx: CircuitRxReceiver,
421 forward_impl: F,
422 backward_impl: B,
423 hop_mgr: HopMgr<R>,
424 padding_ctrl: PaddingController,
425 padding_event_stream: PaddingEventStream,
426 // The sending end of this channel should be in HopMgr
427 bwd_rx: mpsc::Receiver<ReadyStreamMsg>,
428 fwd_events: mpsc::Receiver<F::CircEvent>,
429 memquota: &CircuitAccount,
430 ) -> (Self, CircReactorHandle<F, B>) {
431 // NOTE: not registering this channel with the memquota subsystem is okay,
432 // because it has no buffering (if ever decide to make the size of this buffer
433 // non-zero for whatever reason, we must remember to register it with memquota
434 // so that it counts towards the total memory usage for the circuit.
435 #[allow(clippy::disallowed_methods)]
436 let (backward_reactor_tx, forward_reactor_rx) = mpsc::channel(0);
437
438 // TODO: channels galore
439 let (control_tx, control_rx) = mpsc::unbounded();
440 let (command_tx, command_rx) = mpsc::unbounded();
441
442 let (fwd_control_tx, fwd_control_rx) = mpsc::unbounded();
443 let (fwd_command_tx, fwd_command_rx) = mpsc::unbounded();
444 let (bwd_control_tx, bwd_control_rx) = mpsc::unbounded();
445 let (bwd_command_tx, bwd_command_rx) = mpsc::unbounded();
446
447 let fwd_ctrl = ReactorCtrl::new(fwd_command_tx, fwd_control_tx);
448 let bwd_ctrl = ReactorCtrl::new(bwd_command_tx, bwd_control_tx);
449
450 let handle = CircReactorHandle {
451 control: control_tx,
452 command: command_tx,
453 time_provider: DynTimeProvider::new(runtime.clone()),
454 memquota: memquota.clone(),
455 };
456
457 /// Grab a handle to the hop list (it's needed by the BWD)
458 let hops = Arc::clone(hop_mgr.hops());
459 let forward = ForwardReactor::new(
460 runtime.clone(),
461 unique_id,
462 forward_impl,
463 hop_mgr,
464 inbound_chan_rx,
465 fwd_control_rx,
466 fwd_command_rx,
467 backward_reactor_tx,
468 fwd_events,
469 padding_ctrl.clone(),
470 );
471
472 let backward = BackwardReactor::new(
473 runtime,
474 channel,
475 circ_id,
476 unique_id,
477 backward_impl,
478 hops,
479 forward_reactor_rx,
480 bwd_control_rx,
481 bwd_command_rx,
482 padding_ctrl,
483 padding_event_stream,
484 bwd_rx,
485 );
486
487 let reactor = Reactor {
488 unique_id,
489 forward: Some(forward),
490 backward: Some(backward),
491 control: control_rx,
492 command: command_rx,
493 fwd_ctrl,
494 bwd_ctrl,
495 };
496
497 (reactor, handle)
498 }
499
500 /// Helper for [`run`](Self::run).
501 pub(crate) async fn run_inner(&mut self) -> StdResult<(), ReactorError> {
502 let (forward, backward) = (|| Some((self.forward.take()?, self.backward.take()?)))()
503 .expect("relay reactor spawned twice?!");
504
505 let mut forward = Box::pin(forward.run()).fuse();
506 let mut backward = Box::pin(backward.run()).fuse();
507 loop {
508 // If either of these completes, this function returns,
509 // dropping fwd_ctrl/bwd_ctrl channels, which will, in turn,
510 // cause the remaining reactor, if there is one, to shut down too
511 select_biased! {
512 res = self.command.next() => {
513 let Some(cmd) = res else {
514 trace!(
515 circ_id = %self.unique_id,
516 reason = "command channel drop",
517 "reactor shutdown",
518 );
519
520 return Err(ReactorError::Shutdown);
521 };
522
523 self.handle_command(cmd)?;
524 },
525 res = self.control.next() => {
526 let Some(msg) = res else {
527 trace!(
528 circ_id = %self.unique_id,
529 reason = "control channel drop",
530 "reactor shutdown",
531 );
532
533 return Err(ReactorError::Shutdown);
534 };
535
536 self.handle_control(msg)?;
537 },
538 // No need to log the error here, because it was already logged
539 // by the reactor that shut down
540 res = forward => return Ok(res?),
541 res = backward => return Ok(res?),
542 }
543 }
544 }
545
546 /// Handle a shutdown request.
547 fn handle_shutdown(&self) -> StdResult<(), ReactorError> {
548 trace!(
549 tunnel_id = %self.unique_id,
550 "reactor shutdown due to explicit request",
551 );
552
553 Err(ReactorError::Shutdown)
554 }
555
556 /// Handle a [`CtrlCmd`].
557 fn handle_command(
558 &mut self,
559 cmd: CtrlCmd<F::CtrlCmd, B::CtrlCmd>,
560 ) -> StdResult<(), ReactorError> {
561 match cmd {
562 CtrlCmd::Forward(c) => self.fwd_ctrl.send_cmd(c),
563 CtrlCmd::Backward(c) => self.bwd_ctrl.send_cmd(c),
564 CtrlCmd::Shutdown => self.handle_shutdown(),
565 }
566 }
567
568 /// Handle a [`CtrlMsg`].
569 fn handle_control(
570 &mut self,
571 cmd: CtrlMsg<F::CtrlMsg, B::CtrlMsg>,
572 ) -> StdResult<(), ReactorError> {
573 match cmd {
574 CtrlMsg::Forward(c) => self.fwd_ctrl.send_msg(c),
575 CtrlMsg::Backward(c) => self.bwd_ctrl.send_msg(c),
576 }
577 }
578}
579
#[cfg(test)]
pub(crate) mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use tor_basic_utils::test_rng::testing_rng;
    use tor_cell::chancell::{BoxedCellBody, msg as chanmsg};
    use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, msg as relaymsg};

    use chanmsg::AnyChanMsg;

    // This import must be enabled under exactly the same conditions as the
    // `IncomingStreamRequestFilter` impl below: otherwise a build with "relay"
    // but without "hs-service" fails to resolve the trait name.
    #[cfg(any(feature = "hs-service", feature = "relay"))]
    use crate::client::stream::IncomingStreamRequestFilter;

    /// Wrap the relay message `msg` in a channel message, for use in tests.
    ///
    /// The message is addressed to the stream `id` (if any), encoded as a
    /// [`RelayCellFormat::V0`] cell body, and returned as a RELAY_EARLY
    /// channel message if `early` is true, or as a RELAY one otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the message cannot be encoded.
    pub(crate) fn rmsg_to_ccmsg(
        id: Option<StreamId>,
        msg: relaymsg::AnyRelayMsg,
        early: bool,
    ) -> AnyChanMsg {
        // TODO #1947: test other formats.
        let rfmt = RelayCellFormat::V0;
        let body: BoxedCellBody = AnyRelayMsgOuter::new(id, msg)
            .encode(rfmt, &mut testing_rng())
            .unwrap();
        // NOTE: deliberately not named `chanmsg`, to avoid shadowing the
        // `chanmsg` module alias used on the lines below.
        let relay = chanmsg::Relay::from(body);

        if early {
            AnyChanMsg::RelayEarly(chanmsg::RelayEarly::from(relay))
        } else {
            AnyChanMsg::Relay(relay)
        }
    }

    /// A stream request filter that accepts every incoming stream request.
    #[cfg(any(feature = "hs-service", feature = "relay"))]
    pub(crate) struct AllowAllStreamsFilter;

    #[cfg(any(feature = "hs-service", feature = "relay"))]
    impl IncomingStreamRequestFilter for AllowAllStreamsFilter {
        /// Unconditionally accept the request, ignoring its context and the circuit state.
        fn disposition(
            &mut self,
            _ctx: &crate::client::stream::IncomingStreamRequestContext<'_>,
            _circ: &crate::circuit::CircHopSyncView<'_>,
        ) -> crate::Result<crate::client::stream::IncomingStreamRequestDisposition> {
            Ok(crate::client::stream::IncomingStreamRequestDisposition::Accept)
        }
    }
}