Skip to main content

tor_proto/circuit/
reactor.rs

1//! Module exposing the circuit reactor subsystem.
2//!
3//! This module implements the new [multi-reactor circuit subsystem].
4//!
5// Note: this is currently only used for the relay side,
6// but we plan to eventually rewrite client circuit implementation
7// to use these new reactor types as well.
8//!
9//! The entry point of the reactor is [`Reactor::run`], which launches the
10//! reactor background tasks, and begins listening for inbound cells on the provided
11//! inbound Tor channel.
12//!
13//! ### Architecture
14//!
15//! Internally, the circuit reactor consists of multiple reactors,
16//! each running in a separate task:
17//!
18//!   * [`StreamReactor`] (one per hop): handles all messages arriving to,
19//!     and coming from the streams of a given hop. The ready stream messages
20//!     are sent to the [`BackwardReactor`]
21//!   * [`ForwardReactor`]: handles incoming cells arriving on the
22//!     "inbound" Tor channel (towards the guard, if we are a client, or towards
23//!     the client, if we are a relay). If we are a client, it moves stream messages
24//!     towards the corresponding [`StreamReactor`]. If we are a relay,
25//!     in addition to sending any stream messages to the `StreamReactor`,
26//!     this reactor also moves cells in the forward direction
27//!     (from the client towards the exit)
28//!   * [`BackwardReactor`]: writes cells to the "inbound" Tor channel:
29//!     towards the client if we are a relay, or the towards the exit
30//!     if we are a client.
31//!
32// TODO: the forward/backward terminology no longer makes sense! Come up with better terms...
33//!
34//! If we are an exit relay, the cell flow looks roughly like this:
35//!
36//! ```text
37//!                             <stream_tx
38//!                              MPSC (0)>
39//!   +--------------> FWD -------------------------+
40//!   |                 |                           |
41//!   |                 |                           |
42//!   |                 |                           |
43//!   |                 |                           v
44//! relay      BackwardReactorCmd            StreamReactor
45//!   ^             <MPSC (0)>                      |
46//!   |                 |                           |
47//!   |                 |                           |
48//!   |                 |                           |
49//!   |                 v                           |
50//!   +--------------- BWD <------------------------+
51//!     application stream data    <stream_rx
52//!                                 MPSC (0)>
53//!
54//! For a middle relay (the `StreamReactor` is omitted for brevity,
55//! but middle relays can have one too, if leaky pipe is in use):
56//!
57//! ```text                   unrecognized cell
58//!   +--------------> FWD -------------------------+
59//!   |                 |                           |
60//!   |                 |                           |
61//!   |                 |                           |
62//!   |                 |                           v
63//! client      BackwardReactorCmd                relay
64//! or relay        <MPSC (0)>                      |
65//!   ^                 |                           |
66//!   |                 |                           |
67//!   |                 |                           |
68//!   |                 |                           |
69//!   |                 v                           |
70//!   +--------------- BWD <------------------------+
71//! ```
72//!
73//! On the client-side the `ForwardReactor` reads cells from the Tor channel to the guard,
74//! and the `BackwardReactor` writes to it.
75//!
76//! ```text
77//!   +--------------- FWD <--------------------+
78//!   |                 |                       |
79//!   |                 |                       |
80//!   |                 |                       |
81//!   v                 |                       |
82//! StreamReactor  BackwardReactorCmd         guard
83//!   |               <MPSC (0)>                ^
84//!   |                 |                       |
85//!   |                 |                       |
86//!   |                 |                       |
87//!   |                 v                       |
88//!   +--------------> BWD ---------------------+
89//! ```
90//!
91//! Client with leaky pipe (`SR` = `StreamReactor`):
92//!
93//! ```text
94//!   +------------------------------+
95//!   |       +--------------------+ | (1 MPSC TX per SR)
96//!   |       |                    | |
97//!   |       |       +----------- FWD <------------------+
98//!   |       |       |             |                     |
99//!   |       |       |             |                     |
100//!   |       |       |             |                     |
101//!   v       v       v             |                     |
102//!  SR      SR      SR           BackwardReactorCmd    guard
103//! (hop 4) (hop 3)  (hop 2)      <MPSC (0)>              ^
104//!   |       |       |             |                     |
105//!   |       |       |             |                     |
106//!   |       |       |             |                     |
107//!   |       |       |             v                     |
108//!   |       |       |            BWD -------------------+
109//!   |       |       |             ^
110//!   |       |       |             |
111//!   |       |       |             | <stream_rx
112//!   |       |       |             |  MPSC (0)>
113//!   +-------+-------+-------------+
114//! ```
115//!
116// TODO(tuning): The inter-reactor MPSC channels have no buffering,
117// which is likely going to be bad for performance,
118// so we will need to tune the sizes of these MPSC buffers.
119//!
120//! The read and write ends of the inbound and outbound Tor channels are "split",
121//! such that each reactor holds an `inbound_chan_rx` stream (for reading)
122//! and a `inbound_chan_tx` sink (for writing):
123//!
124//!  * `ForwardReactor` holds the reading end of the inbound
125//!    (coming from the client, if we are a relay, or coming from the guard, if we are a client)
126//!    Tor channel, and the writing end of the outbound (towards the exit, if we are a middle relay)
127//!    Tor channel, if there is one
128//!  * `BackwardReactor` holds the reading end of the outbound channel, if there is one,
129//!    and the writing end of the inbound channel, if there is one
130//!
131//! #### `ForwardReactor`
132//!
133//! It handles forward cells, by delegating to the implementation-dependent
134//! [`ForwardHandler::handle_forward_cell`], which decides
135//! whether the cell needs to be handled in `ForwardReactor`,
136//! or in the `ForwardHandler` itself.
137//!
138//! More concretely:
139//!
140//! ```text
141//!
142//! Legend: `F` = "forward reactor", `H` = "ForwardHandler"
143//!
144//! | Message           | Received in | Handled in | Description                            |
145//! |-------------------|-------------|------------|----------------------------------------|
146//! | DESTROY           | F           | H          | Handled internally by the FowardHandler|
147//! |-------------------|-------------|------------|----------------------------------------|
148//! | PADDING_NEGOTIATE | F           | H          | Handled internally by the FowardHandler|
149//! |-------------------|-------------|------------|----------------------------------------|
150//! | *unrecognized*    | F           | H          | Unrecognized relay cell handling is    |
151//! | RELAY OR          |             |            | implementation-dependent so these are  |
152//! | RELAY_EARLY       |             |            | handled in the ForwardHandler.         |
153//! |                   |             |            |                                        |
154//! |                   |             |            | The relay ForwardHandler will handle   |
155//! |                   |             |            | these by forwarding them to the next   |
156//! |                   |             |            | hop, if there is one.                  |
157//! |                   |             |            |                                        |
158//! |                   |             |            | Clients don't yet implement            |
159//! |                   |             |            | ForwardHandler, but when they do,      |
160//! |                   |             |            | its implementation will simply reject  |
161//! |                   |             |            | any messages that can't be decrypted   |
162//! |-------------------|-------------|------------|----------------------------------------|
163//! | *recognized*      | F           | see table  | Handling depends on the cmd            |
164//! | RELAY OR          |             | below      |                                        |
165//! | RELAY_EARLY       |             |            |                                        |
166//! ```
167//!
168//! Recognized relay cells are handled by splitting each cell into individual messages,
169//! and handling each message individually as described in the table below
170//! (Note: since prop340 is not yet implemented, in practice there is only 1 message per cell):
171//!
172//! ```text
173//!
174//! Legend: `F` = "forward reactor", `B` = "backward reactor", `S` = "stream reactor"
175//!
176//! | RELAY cmd         | Received in | Handled in | Description                            |
177//! |-------------------|-------------|------------|----------------------------------------|
178//! | SENDME            | F           | B          | Sent to BackwardReactor for handling   |
179//! |                   |             |            | (BackwardReactorCmd::HandleSendme)     |
180//! |                   |             |            | because the forward reactor doesn't    |
181//! |                   |             |            | have access to the inbound_chan_tx part|
182//! |                   |             |            | of the inbound (towards the client)    |
183//! |                   |             |            | Tor channel, and so cannot obtain the  |
184//! |                   |             |            | congestion signals needed for SENDME   |
185//! |                   |             |            | handling                               |
186//! |-------------------|-------------|------------|----------------------------------------|
187//! | Other             | F           | F          | Passed to impl-dependent handler       |
188//! | (StreamId = 0)    |             |            |  `ForwardHandler::handle_meta_msg()`   |
189//! |-------------------|-------------|------------|----------------------------------------|
190//! | Other             | F           | S          | All messages with a non-zero stream ID |
191//! | (StreamId != 0)   |             |            | are forwarded to the stream reactor    |
192//! |-------------------|-------------|------------|----------------------------------------|
193//! ```
194//!
195//! #### `BackwardReactor`
196//!
197//! It handles
198//!
199//!  * the packaging and delivery of all cells that need to be written to the "inbound" Tor channel
200//!    (it writes them to the towards-the-client Tor channel sink) (**partially implemented**)
201//!  * incoming cells coming over the "outbound" Tor channel. This channel only exists
202//!    if we are a middle relay. These cells are relayed to the "inbound" Tor channel (**not implemented**).
203//!  * the sending of padding cells, according to the PaddingController's instructions
204//!
205//! This multi-reactor architecture should, in theory, have better performance than
206//! a single reactor system, because it enables us to parallelize some of the work:
207//! the forward and backward directions share little state,
208//! because they read from, and write to, different sinks/streams,
209//! so they can be run in parallel (as separate tasks).
210//! With a single reactor architecture, the reactor would need to drive
211//! both the forward and the backward direction, and on each iteration
212//! would need to decide which to prioritize, which might prove tricky
213//! (though prioritizing one of them at random would've probably been good enough).
214//!
215//! The monolithic single reactor alternative would also have been significantly
216//! more convoluted, and so more difficult to maintain in the long run.
217//!
218//
219// NOTE: The FWD and BWD currently share the hop list containing the per-hop state,
220// (including the congestion control object, which is behind a mutex).
221//
222//! [multi-reactor circuit subsystem]: https://gitlab.torproject.org/tpo/core/arti/-/blob/main/doc/dev/notes/relay-conflux.md
223//! [`StreamReactor`]: stream::StreamReactor
224
225// TODO(DEDUP): this will replace CircHopList when we rewrite the client reactor
226// to use the new reactor architecture
227pub(crate) mod circhop;
228
229pub(crate) mod backward;
230pub(crate) mod forward;
231pub(crate) mod hop_mgr;
232pub(crate) mod macros;
233pub(crate) mod stream;
234
235use std::result::Result as StdResult;
236use std::sync::Arc;
237
238use derive_deftly::Deftly;
239use futures::channel::mpsc;
240use futures::{FutureExt as _, StreamExt as _, select_biased};
241use oneshot_fused_workaround as oneshot;
242use tracing::trace;
243
244use tor_cell::chancell::CircId;
245use tor_rtcompat::{DynTimeProvider, Runtime};
246
247use crate::channel::Channel;
248use crate::circuit::reactor::backward::BackwardHandler;
249use crate::circuit::reactor::forward::ForwardHandler;
250use crate::circuit::reactor::hop_mgr::HopMgr;
251use crate::circuit::reactor::stream::ReadyStreamMsg;
252use crate::circuit::{CircuitRxReceiver, UniqId};
253use crate::memquota::CircuitAccount;
254use crate::util::err::ReactorError;
255
256// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
257use crate::client::circuit::padding::{PaddingController, PaddingEventStream};
258
259use backward::BackwardReactor;
260use forward::ForwardReactor;
261use macros::derive_deftly_template_CircuitReactor;
262
263/// The type of a oneshot channel used to inform reactor of the result of an operation.
264pub(crate) type ReactorResultChannel<T> = oneshot::Sender<crate::Result<T>>;
265
266/// A handle for interacting with a circuit reactor.
267#[derive(derive_more::Debug)]
268pub(crate) struct CircReactorHandle<F: ForwardHandler, B: BackwardHandler> {
269    /// Sender for reactor control messages.
270    #[debug(skip)]
271    pub(crate) control: mpsc::UnboundedSender<CtrlMsg<F::CtrlMsg, B::CtrlMsg>>,
272    /// Sender for reactor control commands.
273    #[debug(skip)]
274    pub(crate) command: mpsc::UnboundedSender<CtrlCmd<F::CtrlCmd, B::CtrlCmd>>,
275    /// The time provider.
276    pub(crate) time_provider: DynTimeProvider,
277    /// Memory quota account
278    pub(crate) memquota: CircuitAccount,
279}
280
281/// A control command.
282///
283/// The difference between this and [`CtrlMsg`] is that `CtrlCmd`s
284/// never cause cells to sent on the Tor channel,
285/// while `CtrlMsg`s potentially do.
286#[allow(unused)] // TODO(relay)
287pub(crate) enum CtrlCmd<F, B> {
288    /// A control command for the forward reactor.
289    Forward(forward::CtrlCmd<F>),
290    /// A control command for the backward reactor.
291    Backward(backward::CtrlCmd<B>),
292    /// Shut down the reactor.
293    Shutdown,
294}
295
296/// A control message.
297#[allow(unused)] // TODO(relay)
298pub(crate) enum CtrlMsg<F, B> {
299    /// A control message for the forward reactor.
300    Forward(forward::CtrlMsg<F>),
301    /// A control message for the backward reactor.
302    Backward(backward::CtrlMsg<B>),
303}
304
305/// The entry point of the circuit reactor subsystem.
306#[derive(Deftly)]
307#[derive_deftly(CircuitReactor)]
308#[deftly(reactor_name = "circuit reactor")]
309#[deftly(only_run_once)]
310#[deftly(run_inner_fn = "Self::run_inner")]
311#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
312pub(crate) struct Reactor<R: Runtime, F: ForwardHandler, B: BackwardHandler> {
313    /// The process-unique identifier of this circuit.
314    ///
315    /// Used for logging.
316    unique_id: UniqId,
317    /// The reactor for handling
318    ///
319    ///   * cells moving in the forward direction (from the client towards exit), if we are a relay
320    ///   * incoming cells (coming from the guard), if we are a client
321    ///
322    /// Optional so we can move it out of self in run().
323    forward: Option<ForwardReactor<R, F>>,
324    /// The reactor for handling
325    ///
326    ///   * cells moving in the backward direction (from the exit towards client), if we are a relay
327    ///   * outgoing cells (moving towards the guard), if we are a client
328    ///
329    /// Optional so we can move it out of self in run().
330    backward: Option<BackwardReactor<B>>,
331    /// Receiver for control messages for this reactor, sent by reactor handle objects.
332    control: mpsc::UnboundedReceiver<CtrlMsg<F::CtrlMsg, B::CtrlMsg>>,
333    /// Receiver for command messages for this reactor, sent by reactor handle objects.
334    ///
335    /// This MPSC channel is polled in [`run`](Self::run).
336    ///
337    /// NOTE: this is a separate channel from `control`, because some messages
338    /// have higher priority and need to be handled even if the `inbound_chan_tx` is not
339    /// ready (whereas `control` messages are not read until the `inbound_chan_tx` sink
340    /// is ready to accept cells).
341    command: mpsc::UnboundedReceiver<CtrlCmd<F::CtrlCmd, B::CtrlCmd>>,
342    /// Control channels for the [`ForwardReactor`].
343    ///
344    /// Handles [`CtrlCmd::Forward`] and [`CtrlMsg::Forward`] messages.
345    fwd_ctrl: ReactorCtrl<forward::CtrlCmd<F::CtrlCmd>, forward::CtrlMsg<F::CtrlMsg>>,
346    /// Control channels for the [`BackwardReactor`].
347    ///
348    /// Handles [`CtrlCmd::Backward`] and [`CtrlMsg::Backward`] messages.
349    bwd_ctrl: ReactorCtrl<backward::CtrlCmd<B::CtrlCmd>, backward::CtrlMsg<B::CtrlMsg>>,
350}
351
352/// A handle for sending control/command messages to a FWD or BWD.
353struct ReactorCtrl<C, M> {
354    /// Sender for control commands.
355    command_tx: mpsc::UnboundedSender<C>,
356    /// Sender for control messages.
357    control_tx: mpsc::UnboundedSender<M>,
358}
359
360impl<C, M> ReactorCtrl<C, M> {
361    /// Create a new sender handle.
362    fn new(command_tx: mpsc::UnboundedSender<C>, control_tx: mpsc::UnboundedSender<M>) -> Self {
363        Self {
364            command_tx,
365            control_tx,
366        }
367    }
368
369    /// Send a control command.
370    fn send_cmd(&mut self, cmd: C) -> Result<(), ReactorError> {
371        self.command_tx
372            .unbounded_send(cmd)
373            .map_err(|_| ReactorError::Shutdown)
374    }
375
376    /// Send a control message.
377    fn send_msg(&mut self, msg: M) -> Result<(), ReactorError> {
378        self.control_tx
379            .unbounded_send(msg)
380            .map_err(|_| ReactorError::Shutdown)
381    }
382}
383
384/// Trait implemented by types that can handle control messages and commands.
385pub(crate) trait ControlHandler {
386    /// The type of control message expected by the forward reactor.
387    type CtrlMsg;
388
389    /// The type of control command expected by the forward reactor.
390    type CtrlCmd;
391
392    // TODO(DEDUP): do these APIs make sense?
393    // What should we return here, maybe some instructions for the base reactor
394    // to do something?
395
396    /// Handle a control command.
397    fn handle_cmd(&mut self, cmd: Self::CtrlCmd) -> StdResult<(), ReactorError>;
398
399    /// Handle a control message.
400    fn handle_msg(&mut self, msg: Self::CtrlMsg) -> StdResult<(), ReactorError>;
401}
402
403#[allow(unused)] // TODO(relay)
404impl<R: Runtime, F: ForwardHandler + ControlHandler, B: BackwardHandler + ControlHandler>
405    Reactor<R, F, B>
406{
407    /// Create a new circuit reactor.
408    ///
409    /// The reactor will send outbound messages on `channel`, receive incoming
410    /// messages on `inbound_chan_rx`, and identify this circuit by the channel-local
411    /// [`CircId`] provided.
412    ///
413    /// The internal unique identifier for this circuit will be `unique_id`.
414    #[allow(clippy::too_many_arguments)] // TODO
415    pub(crate) fn new(
416        runtime: R,
417        channel: &Arc<Channel>,
418        circ_id: CircId,
419        unique_id: UniqId,
420        inbound_chan_rx: CircuitRxReceiver,
421        forward_impl: F,
422        backward_impl: B,
423        hop_mgr: HopMgr<R>,
424        padding_ctrl: PaddingController,
425        padding_event_stream: PaddingEventStream,
426        // The sending end of this channel should be in HopMgr
427        bwd_rx: mpsc::Receiver<ReadyStreamMsg>,
428        fwd_events: mpsc::Receiver<F::CircEvent>,
429        memquota: &CircuitAccount,
430    ) -> (Self, CircReactorHandle<F, B>) {
431        // NOTE: not registering this channel with the memquota subsystem is okay,
432        // because it has no buffering (if ever decide to make the size of this buffer
433        // non-zero for whatever reason, we must remember to register it with memquota
434        // so that it counts towards the total memory usage for the circuit.
435        #[allow(clippy::disallowed_methods)]
436        let (backward_reactor_tx, forward_reactor_rx) = mpsc::channel(0);
437
438        // TODO: channels galore
439        let (control_tx, control_rx) = mpsc::unbounded();
440        let (command_tx, command_rx) = mpsc::unbounded();
441
442        let (fwd_control_tx, fwd_control_rx) = mpsc::unbounded();
443        let (fwd_command_tx, fwd_command_rx) = mpsc::unbounded();
444        let (bwd_control_tx, bwd_control_rx) = mpsc::unbounded();
445        let (bwd_command_tx, bwd_command_rx) = mpsc::unbounded();
446
447        let fwd_ctrl = ReactorCtrl::new(fwd_command_tx, fwd_control_tx);
448        let bwd_ctrl = ReactorCtrl::new(bwd_command_tx, bwd_control_tx);
449
450        let handle = CircReactorHandle {
451            control: control_tx,
452            command: command_tx,
453            time_provider: DynTimeProvider::new(runtime.clone()),
454            memquota: memquota.clone(),
455        };
456
457        /// Grab a handle to the hop list (it's needed by the BWD)
458        let hops = Arc::clone(hop_mgr.hops());
459        let forward = ForwardReactor::new(
460            runtime.clone(),
461            unique_id,
462            forward_impl,
463            hop_mgr,
464            inbound_chan_rx,
465            fwd_control_rx,
466            fwd_command_rx,
467            backward_reactor_tx,
468            fwd_events,
469            padding_ctrl.clone(),
470        );
471
472        let backward = BackwardReactor::new(
473            runtime,
474            channel,
475            circ_id,
476            unique_id,
477            backward_impl,
478            hops,
479            forward_reactor_rx,
480            bwd_control_rx,
481            bwd_command_rx,
482            padding_ctrl,
483            padding_event_stream,
484            bwd_rx,
485        );
486
487        let reactor = Reactor {
488            unique_id,
489            forward: Some(forward),
490            backward: Some(backward),
491            control: control_rx,
492            command: command_rx,
493            fwd_ctrl,
494            bwd_ctrl,
495        };
496
497        (reactor, handle)
498    }
499
500    /// Helper for [`run`](Self::run).
501    pub(crate) async fn run_inner(&mut self) -> StdResult<(), ReactorError> {
502        let (forward, backward) = (|| Some((self.forward.take()?, self.backward.take()?)))()
503            .expect("relay reactor spawned twice?!");
504
505        let mut forward = Box::pin(forward.run()).fuse();
506        let mut backward = Box::pin(backward.run()).fuse();
507        loop {
508            // If either of these completes, this function returns,
509            // dropping fwd_ctrl/bwd_ctrl channels, which will, in turn,
510            // cause the remaining reactor, if there is one, to shut down too
511            select_biased! {
512                res = self.command.next() => {
513                    let Some(cmd) = res else {
514                        trace!(
515                            circ_id = %self.unique_id,
516                            reason = "command channel drop",
517                            "reactor shutdown",
518                        );
519
520                        return Err(ReactorError::Shutdown);
521                    };
522
523                    self.handle_command(cmd)?;
524                },
525                res = self.control.next() => {
526                    let Some(msg) = res else {
527                        trace!(
528                            circ_id = %self.unique_id,
529                            reason = "control channel drop",
530                            "reactor shutdown",
531                        );
532
533                        return Err(ReactorError::Shutdown);
534                    };
535
536                    self.handle_control(msg)?;
537                },
538                // No need to log the error here, because it was already logged
539                // by the reactor that shut down
540                res = forward => return Ok(res?),
541                res = backward => return Ok(res?),
542            }
543        }
544    }
545
546    /// Handle a shutdown request.
547    fn handle_shutdown(&self) -> StdResult<(), ReactorError> {
548        trace!(
549            tunnel_id = %self.unique_id,
550            "reactor shutdown due to explicit request",
551        );
552
553        Err(ReactorError::Shutdown)
554    }
555
556    /// Handle a [`CtrlCmd`].
557    fn handle_command(
558        &mut self,
559        cmd: CtrlCmd<F::CtrlCmd, B::CtrlCmd>,
560    ) -> StdResult<(), ReactorError> {
561        match cmd {
562            CtrlCmd::Forward(c) => self.fwd_ctrl.send_cmd(c),
563            CtrlCmd::Backward(c) => self.bwd_ctrl.send_cmd(c),
564            CtrlCmd::Shutdown => self.handle_shutdown(),
565        }
566    }
567
568    /// Handle a [`CtrlMsg`].
569    fn handle_control(
570        &mut self,
571        cmd: CtrlMsg<F::CtrlMsg, B::CtrlMsg>,
572    ) -> StdResult<(), ReactorError> {
573        match cmd {
574            CtrlMsg::Forward(c) => self.fwd_ctrl.send_msg(c),
575            CtrlMsg::Backward(c) => self.bwd_ctrl.send_msg(c),
576        }
577    }
578}
579
580#[cfg(test)]
581pub(crate) mod test {
582    // @@ begin test lint list maintained by maint/add_warning @@
583    #![allow(clippy::bool_assert_comparison)]
584    #![allow(clippy::clone_on_copy)]
585    #![allow(clippy::dbg_macro)]
586    #![allow(clippy::mixed_attributes_style)]
587    #![allow(clippy::print_stderr)]
588    #![allow(clippy::print_stdout)]
589    #![allow(clippy::single_char_pattern)]
590    #![allow(clippy::unwrap_used)]
591    #![allow(clippy::unchecked_time_subtraction)]
592    #![allow(clippy::useless_vec)]
593    #![allow(clippy::needless_pass_by_value)]
594    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
595
596    use tor_basic_utils::test_rng::testing_rng;
597    use tor_cell::chancell::{BoxedCellBody, msg as chanmsg};
598    use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, msg as relaymsg};
599
600    use chanmsg::AnyChanMsg;
601
602    #[cfg(feature = "hs-service")]
603    use crate::client::stream::IncomingStreamRequestFilter;
604
605    pub(crate) fn rmsg_to_ccmsg(
606        id: Option<StreamId>,
607        msg: relaymsg::AnyRelayMsg,
608        early: bool,
609    ) -> AnyChanMsg {
610        // TODO #1947: test other formats.
611        let rfmt = RelayCellFormat::V0;
612        let body: BoxedCellBody = AnyRelayMsgOuter::new(id, msg)
613            .encode(rfmt, &mut testing_rng())
614            .unwrap();
615        let chanmsg = chanmsg::Relay::from(body);
616
617        if early {
618            let chanmsg = chanmsg::RelayEarly::from(chanmsg);
619            AnyChanMsg::RelayEarly(chanmsg)
620        } else {
621            AnyChanMsg::Relay(chanmsg)
622        }
623    }
624
625    #[cfg(any(feature = "hs-service", feature = "relay"))]
626    pub(crate) struct AllowAllStreamsFilter;
627    #[cfg(any(feature = "hs-service", feature = "relay"))]
628    impl IncomingStreamRequestFilter for AllowAllStreamsFilter {
629        fn disposition(
630            &mut self,
631            _ctx: &crate::client::stream::IncomingStreamRequestContext<'_>,
632            _circ: &crate::circuit::CircHopSyncView<'_>,
633        ) -> crate::Result<crate::client::stream::IncomingStreamRequestDisposition> {
634            Ok(crate::client::stream::IncomingStreamRequestDisposition::Accept)
635        }
636    }
637}