Skip to main content

tor_proto/relay/
reactor.rs

1//! Module exposing the relay circuit reactor subsystem.
2//!
3//! See [`reactor`](crate::circuit::reactor) for a description of the overall architecture.
4//!
5//! #### `ForwardReactor`
6//!
7//! It handles
8//!
9//!  * unrecognized RELAY cells, by moving them in the forward direction (towards the exit)
10//!  * recognized RELAY cells, by splitting each cell into messages, and handling
11//!    each message individually as described in the table below
12//!    (Note: since prop340 is not yet implemented, in practice there is only 1 message per cell).
13//!  * RELAY_EARLY cells (**not yet implemented**)
14//!  * DESTROY cells (**not yet implemented**)
15//!  * PADDING_NEGOTIATE cells (**not yet implemented**)
16//!
17//! ```text
18//!
19//! Legend: `F` = "forward reactor", `B` = "backward reactor", `S` = "stream reactor"
20//!
21//! | RELAY cmd         | Received in | Handled in | Description                            |
22//! |-------------------|-------------|------------|----------------------------------------|
23//! | DROP              | F           | F          | Passed to PaddingController for        |
24//! |                   |             |            | validation                             |
25//! |-------------------|-------------|------------|----------------------------------------|
26//! | EXTEND2           | F           |            | Handled by instructing the channel     |
27//! |                   |             |            | provider to launch a new channel, and  |
28//! |                   |             |            | waiting for the new channel on its     |
29//! |                   |             |            | outgoing_chan_rx receiver              |
30//! |                   |             |            | (**not yet implemented**)              |
31//! |-------------------|-------------|------------|----------------------------------------|
32//! | TRUNCATE          | F           | F          | (**not yet implemented**)              |
33//! |                   |             |            |                                        |
34//! |-------------------|-------------|------------|----------------------------------------|
35//! | TODO              |             |            |                                        |
36//! |                   |             |            |                                        |
37//! ```
38
39pub(crate) mod backward;
40pub(crate) mod forward;
41
42use std::sync::Arc;
43use std::time::Duration;
44
45use futures::channel::mpsc;
46
47use tor_cell::chancell::CircId;
48use tor_linkspec::OwnedChanTarget;
49use tor_rtcompat::Runtime;
50
51use crate::channel::Channel;
52use crate::circuit::circhop::{CircHopOutbound, HopSettings};
53use crate::circuit::reactor::Reactor as BaseReactor;
54use crate::circuit::reactor::hop_mgr::HopMgr;
55use crate::circuit::reactor::stream;
56use crate::circuit::{CircuitRxReceiver, UniqId};
57use crate::crypto::cell::{InboundRelayLayer, OutboundRelayLayer};
58use crate::memquota::CircuitAccount;
59use crate::relay::RelayCirc;
60use crate::relay::channel_provider::ChannelProvider;
61use crate::relay::reactor::backward::Backward;
62use crate::relay::reactor::forward::Forward;
63
64// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
65use crate::client::circuit::padding::{PaddingController, PaddingEventStream};
66
67/// Type-alias for the relay base reactor type.
68type RelayBaseReactor<R> = BaseReactor<R, Forward, Backward>;
69
70/// The entry point of the circuit reactor subsystem.
71#[allow(unused)] // TODO(relay)
72#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
73pub(crate) struct Reactor<R: Runtime>(RelayBaseReactor<R>);
74
/// Relay-specific customization of the stream reactor.
///
/// Carries no state; it only provides the relay's
/// [`stream::StreamHandler`] implementation.
struct StreamHandler;
77
78impl stream::StreamHandler for StreamHandler {
79    fn halfstream_expiry(&self, hop: &CircHopOutbound) -> Duration {
80        let ccontrol = hop.ccontrol();
81
82        // Note: if we have no measurements for the RTT, this will be set to 0,
83        // so the stream will be removed from the stream map immediately,
84        // and any subsequent messages arriving on it will trigger
85        // a proto violation causing the circuit to close.
86        //
87        // TODO(relay-tuning): we should make sure that this doesn't cause us to
88        // wrongly close legitimate circuits that still have in-flight stream data
89        ccontrol
90            .lock()
91            .expect("poisoned lock")
92            .rtt()
93            .max_rtt_usec()
94            .map(|rtt| Duration::from_millis(u64::from(rtt)))
95            // TODO(relay): we should fallback to a non-zero default here
96            // if we don't have any RTT measurements yet
97            .unwrap_or_default()
98    }
99}
100
101#[allow(unused)] // TODO(relay)
102impl<R: Runtime> Reactor<R> {
103    /// Create a new circuit reactor.
104    ///
105    /// The reactor will send outbound messages on `channel`, receive incoming
106    /// messages on `input`, and identify this circuit by the channel-local
107    /// [`CircId`] provided.
108    ///
109    /// The internal unique identifier for this circuit will be `unique_id`.
110    #[allow(clippy::too_many_arguments)] // TODO
111    pub(crate) fn new(
112        runtime: R,
113        channel: &Arc<Channel>,
114        circ_id: CircId,
115        unique_id: UniqId,
116        input: CircuitRxReceiver,
117        crypto_in: Box<dyn InboundRelayLayer + Send>,
118        crypto_out: Box<dyn OutboundRelayLayer + Send>,
119        settings: &HopSettings,
120        chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
121        padding_ctrl: PaddingController,
122        padding_event_stream: PaddingEventStream,
123        memquota: &CircuitAccount,
124    ) -> crate::Result<(Self, Arc<RelayCirc>)> {
125        // NOTE: not registering this channel with the memquota subsystem is okay,
126        // because it has no buffering (if ever decide to make the size of this buffer
127        // non-zero for whatever reason, we must remember to register it with memquota
128        // so that it counts towards the total memory usage for the circuit.
129        #[allow(clippy::disallowed_methods)]
130        let (stream_tx, stream_rx) = mpsc::channel(0);
131
132        let mut hop_mgr = HopMgr::new(
133            runtime.clone(),
134            unique_id,
135            StreamHandler,
136            stream_tx,
137            memquota.clone(),
138        );
139
140        // On the relay side, we always have one "hop" (ourselves).
141        //
142        // Clients will need to call this function in response to CtrlMsg::Create
143        // (TODO: for clients, we probably will need to store a bunch more state here)
144        hop_mgr.add_hop(settings.clone())?;
145
146        // TODO(relay): currently we don't need buffering on this channel,
147        // but we might need it if we start using it for more than just EXTENDED2 events
148        #[allow(clippy::disallowed_methods)]
149        let (fwd_ev_tx, fwd_ev_rx) = mpsc::channel(0);
150        let forward = Forward::new(
151            channel,
152            unique_id,
153            crypto_out,
154            chan_provider,
155            fwd_ev_tx,
156            memquota.clone(),
157        );
158        let backward = Backward::new(crypto_in);
159
160        let (inner, handle) = BaseReactor::new(
161            runtime,
162            channel,
163            circ_id,
164            unique_id,
165            input,
166            forward,
167            backward,
168            hop_mgr,
169            padding_ctrl,
170            padding_event_stream,
171            stream_rx,
172            fwd_ev_rx,
173            memquota,
174        );
175
176        let reactor = Self(inner);
177        let handle = Arc::new(RelayCirc(handle));
178
179        Ok((reactor, handle))
180    }
181
182    /// Launch the reactor, and run until the circuit closes or we
183    /// encounter an error.
184    ///
185    /// Once this method returns, the circuit is dead and cannot be
186    /// used again.
187    pub(crate) async fn run(mut self) -> crate::Result<()> {
188        self.0.run().await
189    }
190}
191
192#[cfg(test)]
193pub(crate) mod test {
194    // @@ begin test lint list maintained by maint/add_warning @@
195    #![allow(clippy::bool_assert_comparison)]
196    #![allow(clippy::clone_on_copy)]
197    #![allow(clippy::dbg_macro)]
198    #![allow(clippy::mixed_attributes_style)]
199    #![allow(clippy::print_stderr)]
200    #![allow(clippy::print_stdout)]
201    #![allow(clippy::single_char_pattern)]
202    #![allow(clippy::unwrap_used)]
203    #![allow(clippy::unchecked_time_subtraction)]
204    #![allow(clippy::useless_vec)]
205    #![allow(clippy::needless_pass_by_value)]
206    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
207
208    use super::*;
209    use crate::circuit::reactor::test::{AllowAllStreamsFilter, rmsg_to_ccmsg};
210    use crate::circuit::{CircParameters, CircuitRxSender};
211    use crate::client::circuit::padding::new_padding;
212    use crate::congestion::test_utils::params::build_cc_vegas_params;
213    use crate::crypto::cell::RelayCellBody;
214    use crate::crypto::cell::{InboundRelayLayer, OutboundRelayLayer};
215    use crate::fake_mpsc;
216    use crate::memquota::SpecificAccount as _;
217    use crate::relay::channel::test::{DummyChan, DummyChanProvider, working_dummy_channel};
218    use crate::stream::flow_ctrl::params::FlowCtrlParameters;
219    use crate::stream::incoming::{IncomingStream, IncomingStreamRequestFilter};
220
221    use futures::{AsyncReadExt as _, SinkExt as _, StreamExt as _};
222    use tracing_test::traced_test;
223
224    use tor_cell::chancell::{ChanCell, ChanCmd, msg as chanmsg};
225    use tor_cell::relaycell::{
226        AnyRelayMsgOuter, RelayCellFormat, RelayCmd, StreamId, msg as relaymsg,
227    };
228    use tor_linkspec::{EncodedLinkSpec, HasRelayIds, LinkSpec};
229    use tor_protover::{Protocols, named};
230    use tor_rtcompat::SpawnExt;
231    use tor_rtcompat::{DynTimeProvider, Runtime};
232    use tor_rtmock::MockRuntime;
233
234    use chanmsg::{AnyChanMsg, DestroyReason, HandshakeType};
235    use relaymsg::SendmeTag;
236
237    use std::net::IpAddr;
238    use std::sync::{Arc, Mutex, mpsc};
239
240    // An inbound encryption layer that doesn't do any crypto.
241    struct DummyInboundCrypto {}
242
243    // An outbound encryption layer that doesn't do any crypto.
244    struct DummyOutboundCrypto {
245        /// Channel for controlling whether the current cell is meant for us or not.
246        ///
247        /// Useful for tests that check if recognized/unrecognized
248        /// cells are handled/forwarded correctly.
249        recognized_rx: mpsc::Receiver<Recognized>,
250    }
251
252    const DUMMY_TAG: [u8; 20] = [1; 20];
253
254    impl InboundRelayLayer for DummyInboundCrypto {
255        fn originate(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
256            DUMMY_TAG.into()
257        }
258
259        fn encrypt_inbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {}
260    }
261
262    impl OutboundRelayLayer for DummyOutboundCrypto {
263        fn decrypt_outbound(
264            &mut self,
265            _cmd: ChanCmd,
266            _cell: &mut RelayCellBody,
267        ) -> Option<SendmeTag> {
268            // Note: this should never block.
269            let recognized = self.recognized_rx.recv().unwrap();
270
271            match recognized {
272                Recognized::Yes => Some(DUMMY_TAG.into()),
273                Recognized::No => None,
274            }
275        }
276    }
277
278    struct ReactorTestCtrl {
279        /// The relay circuit handle.
280        relay_circ: Arc<RelayCirc>,
281        /// Mock channel -> circuit reactor MPSC channel.
282        circmsg_send: CircuitRxSender,
283        /// The inbound channel ("towards the client").
284        inbound_chan: DummyChan,
285        /// The outbound channel ("away from the client"), if any.
286        ///
287        /// Shared with the DummyChanProvider, which initializes this
288        /// when the relay reactor launches a channel to the next hop
289        /// via `get_or_launch()`.
290        outbound_chan: Arc<Mutex<Option<DummyChan>>>,
291        /// MPSC channel for telling the DummyOutboundCrypto that the next
292        /// cell we're about to send to the reactor should be "recognized".
293        recognized_tx: mpsc::Sender<Recognized>,
294    }
295
296    /// Whether a forward cell to send should be "recognized"
297    /// or "unrecognized" by the relay under test.
298    enum Recognized {
299        /// Recognized
300        Yes,
301        /// Unrecognized
302        No,
303    }
304
305    impl ReactorTestCtrl {
306        /// Spawn a relay circuit reactor, returning a `ReactorTestCtrl` for
307        /// controlling it.
308        fn spawn_reactor<R: Runtime>(rt: &R) -> Self {
309            let inbound_chan = working_dummy_channel(rt);
310            let circid = CircId::new(1337).unwrap();
311            let unique_id = UniqId::new(8, 17);
312            let (padding_ctrl, padding_stream) = new_padding(DynTimeProvider::new(rt.clone()));
313            let (circmsg_send, circmsg_recv) = fake_mpsc(64);
314            let params = CircParameters::new(
315                true,
316                build_cc_vegas_params(),
317                FlowCtrlParameters::defaults_for_tests(),
318            );
319            let settings = HopSettings::from_params_and_caps(
320                crate::circuit::circhop::HopNegotiationType::Full,
321                &params,
322                &[named::FLOWCTRL_CC].into_iter().collect::<Protocols>(),
323            )
324            .unwrap();
325
326            let outbound_chan = Arc::new(Mutex::new(None));
327            let (recognized_tx, recognized_rx) = mpsc::channel();
328            let chan_provider = Arc::new(DummyChanProvider::new(
329                rt.clone(),
330                Arc::clone(&outbound_chan),
331            ));
332
333            let (reactor, relay_circ) = Reactor::new(
334                rt.clone(),
335                &Arc::clone(&inbound_chan.channel),
336                circid,
337                unique_id,
338                circmsg_recv,
339                Box::new(DummyInboundCrypto {}),
340                Box::new(DummyOutboundCrypto { recognized_rx }),
341                &settings,
342                chan_provider,
343                padding_ctrl,
344                padding_stream,
345                &CircuitAccount::new_noop(),
346            )
347            .unwrap();
348
349            rt.spawn(async {
350                let _ = reactor.run().await;
351            })
352            .unwrap();
353
354            Self {
355                relay_circ,
356                circmsg_send,
357                recognized_tx,
358                inbound_chan,
359                outbound_chan,
360            }
361        }
362
363        /// Simulate the sending of a forward relay message through our relay.
364        async fn send_fwd(
365            &mut self,
366            id: Option<StreamId>,
367            msg: relaymsg::AnyRelayMsg,
368            recognized: Recognized,
369            early: bool,
370        ) {
371            // This a bit janky, but for each forward cell we send to the reactor
372            // we need to send a bit of metadata to the DummyOutboundLayer
373            // specifying whether the cell should be treated as recognized
374            // or unrecognized
375            self.recognized_tx.send(recognized).unwrap();
376            self.circmsg_send
377                .send(rmsg_to_ccmsg(id, msg, early))
378                .await
379                .unwrap();
380        }
381
382        /// Whether the reactor opened an outbound channel
383        /// (i.e. a channel to the next relay in the circuit).
384        fn outbound_chan_launched(&self) -> bool {
385            self.outbound_chan.lock().unwrap().is_some()
386        }
387
388        /// Allow inbound stream requests.
389        ///
390        /// Used for testing leaky pipe and exit functionality.
391        async fn allow_stream_requests<'a, FILT>(
392            &self,
393            allow_commands: &'a [RelayCmd],
394            filter: FILT,
395        ) -> impl futures::Stream<Item = IncomingStream> + use<'a, FILT>
396        where
397            FILT: IncomingStreamRequestFilter,
398        {
399            Arc::clone(&self.relay_circ)
400                .allow_stream_requests(allow_commands, filter)
401                .await
402                .unwrap()
403        }
404
405        /// Perform the CREATE2 handshake.
406        async fn do_create2_handshake(
407            &mut self,
408            rt: &MockRuntime,
409            expected_hs_type: HandshakeType,
410        ) {
411            // First, check that the reactor actually sent a CREATE2 to the next hop...
412            let (circid, msg) = self.read_outbound().into_circid_and_msg();
413            let _create2 = match msg {
414                chanmsg::AnyChanMsg::Create2(c) => {
415                    assert_eq!(c.handshake_type(), expected_hs_type);
416                    c
417                }
418                _ => panic!("unexpected forwarded {msg:?}"),
419            };
420
421            let handshake = vec![];
422            let created2 = chanmsg::Created2::new(handshake);
423            // ...and then finalize the handshake by pretending to be
424            // the responding relay
425            self.write_outbound(circid, chanmsg::AnyChanMsg::Created2(created2));
426            rt.advance_until_stalled().await;
427        }
428
429        /// Whether the circuit is closing (e.g. due to a proto violation).
430        fn is_closing(&self) -> bool {
431            self.relay_circ.is_closing()
432        }
433
434        /// Read a cell from the inbound channel
435        /// (moving towards the client).
436        ///
437        /// Panics if there are no ready cells on the inbound MPSC channel.
438        fn read_inbound(&mut self) -> ChanCell<AnyChanMsg> {
439            #[allow(deprecated)] // TODO(#2386)
440            self.inbound_chan.rx.try_next().unwrap().unwrap()
441        }
442
443        /// Read a cell from the outbound channel
444        /// (moving towards the next hop).
445        ///
446        /// Panics if there are no ready cells on the outbound MPSC channel.
447        fn read_outbound(&mut self) -> ChanCell<AnyChanMsg> {
448            let mut lock = self.outbound_chan.lock().unwrap();
449            let chan = lock.as_mut().unwrap();
450            #[allow(deprecated)] // TODO(#2386)
451            chan.rx.try_next().unwrap().unwrap()
452        }
453
454        /// Write to the sending end of the outbound Tor channel.
455        ///
456        /// Simulates the receipt of a cell from the next hop.
457        ///
458        /// Panics if the outbound chan sender is full.
459        fn write_outbound(&mut self, circid: Option<CircId>, msg: chanmsg::AnyChanMsg) {
460            let mut lock = self.outbound_chan.lock().unwrap();
461            let chan = lock.as_mut().unwrap();
462            let cell = ChanCell::new(circid, msg);
463
464            chan.tx.try_send(Ok(cell)).unwrap();
465        }
466    }
467
468    fn dummy_linkspecs() -> Vec<EncodedLinkSpec> {
469        vec![
470            LinkSpec::Ed25519Id([43; 32].into()).encode().unwrap(),
471            LinkSpec::RsaId([45; 20].into()).encode().unwrap(),
472            LinkSpec::OrPort("127.0.0.1".parse::<IpAddr>().unwrap(), 999)
473                .encode()
474                .unwrap(),
475        ]
476    }
477
478    /// Assert that the relay circuit is shutting down.
479    ///
480    /// Also asserts that the next cell on the inbound channel
481    /// is a DESTROY with the specified `reason`.
482    /// The test is expected to drain the inbound Tor "channel"
483    /// of any non-ending cells it might be expecting before calling this function.
484    fn assert_circuit_destroyed(ctrl: &mut ReactorTestCtrl, reason: DestroyReason) {
485        assert!(ctrl.is_closing());
486
487        let cell = ctrl.read_inbound();
488
489        match cell.msg() {
490            chanmsg::AnyChanMsg::Destroy(d) => {
491                assert_eq!(d.reason(), reason);
492            }
493            _ => panic!("unexpected ending {cell:?}"),
494        }
495    }
496
497    #[traced_test]
498    #[test]
499    fn reject_extend2_relay() {
500        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
501            let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
502            rt.advance_until_stalled().await;
503
504            let linkspecs = dummy_linkspecs();
505            let extend2 = relaymsg::Extend2::new(linkspecs, HandshakeType::NTOR_V3, vec![]).into();
506            ctrl.send_fwd(None, extend2, Recognized::Yes, false).await;
507            rt.advance_until_stalled().await;
508
509            assert!(logs_contain("got EXTEND2 in a RELAY cell?!"));
510            assert!(!ctrl.outbound_chan_launched());
511            assert_circuit_destroyed(&mut ctrl, DestroyReason::NONE);
512        });
513    }
514
515    #[traced_test]
516    #[test]
517    fn reject_extend2_previous_hop() {
518        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
519            let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
520            rt.advance_until_stalled().await;
521
522            // No outbound circuits yet
523            assert!(!ctrl.outbound_chan_launched());
524
525            // Build a linkspec with the identities of the dummy channel
526            let mut linkspecs = ctrl
527                .inbound_chan
528                .channel
529                .target()
530                .identities()
531                .map(|id| LinkSpec::from(id.to_owned()).encode())
532                .collect::<Result<Vec<_>, _>>()
533                .unwrap();
534
535            // Make sure this channel actually has some identities
536            // (i.e. that it's not a client channel or something)
537            assert_eq!(linkspecs.len(), 2);
538
539            // There must be at least one IPv4 OR port address
540            linkspecs.push(
541                LinkSpec::OrPort("127.0.0.1".parse::<IpAddr>().unwrap(), 999)
542                    .encode()
543                    .unwrap(),
544            );
545            let handshake_type = HandshakeType::NTOR_V3;
546            let extend2 = relaymsg::Extend2::new(linkspecs, handshake_type, vec![]).into();
547            ctrl.send_fwd(None, extend2, Recognized::Yes, true).await;
548            rt.advance_until_stalled().await;
549
550            // The reactor handled the EXTEND2 and launched an outbound channel
551            assert!(logs_contain("Cannot extend circuit to previous hop"));
552            assert!(!ctrl.outbound_chan_launched());
553            assert!(ctrl.is_closing());
554        });
555    }
556
557    #[traced_test]
558    #[test]
559    fn extend_and_forward() {
560        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
561            let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
562            rt.advance_until_stalled().await;
563
564            // No outbound circuits yet
565            assert!(!ctrl.outbound_chan_launched());
566
567            let linkspecs = dummy_linkspecs();
568            let handshake_type = HandshakeType::NTOR_V3;
569            let extend2 = relaymsg::Extend2::new(linkspecs, handshake_type, vec![]).into();
570            ctrl.send_fwd(None, extend2, Recognized::Yes, true).await;
571            rt.advance_until_stalled().await;
572
573            // The reactor handled the EXTEND2 and launched an outbound channel
574            assert!(logs_contain(
575                "Launched channel to the next hop circ_id=Circ 8.17"
576            ));
577            assert!(ctrl.outbound_chan_launched());
578            assert!(!ctrl.is_closing());
579
580            ctrl.do_create2_handshake(&rt, handshake_type).await;
581            assert!(logs_contain("Got CREATED2 response from next hop"));
582            assert!(logs_contain("Extended circuit to the next hop"));
583
584            // Time to forward a message to the next hop!
585            let early = false;
586            let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap();
587            ctrl.send_fwd(None, begin.clone().into(), Recognized::No, early)
588                .await;
589            rt.advance_until_stalled().await;
590
591            macro_rules! expect_cell {
592                ($chanmsg:tt, $relaymsg:tt) => {{
593                    let cell = ctrl.read_outbound();
594                    let msg = match cell.msg() {
595                        chanmsg::AnyChanMsg::$chanmsg(m) => {
596                            let body = m.clone().into_relay_body();
597                            AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, body).unwrap()
598                        }
599                        _ => panic!("unexpected forwarded {cell:?}"),
600                    };
601
602                    match msg.msg() {
603                        relaymsg::AnyRelayMsg::$relaymsg(m) => m.clone(),
604                        _ => panic!("unexpected cell {msg:?}"),
605                    }
606                }};
607            }
608
609            // Ensure the other end received the BEGIN cell
610            let recvd_begin = expect_cell!(Relay, Begin);
611            assert_eq!(begin, recvd_begin);
612
613            // Now send the same message again, but this time in a RELAY_EARLY
614            let early = true;
615            let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap();
616            ctrl.send_fwd(None, begin.clone().into(), Recognized::No, early)
617                .await;
618            rt.advance_until_stalled().await;
619            let recvd_begin = expect_cell!(RelayEarly, Begin);
620            assert_eq!(begin, recvd_begin);
621        });
622    }
623
624    #[traced_test]
625    #[test]
626    fn forward_before_extend() {
627        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
628            let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
629            rt.advance_until_stalled().await;
630
631            // Send an arbitrary unrecognized cell. The reactor should flag this as
632            // a protocol violation, because we don't have an outbound channel to forward it on.
633            let extend2 = relaymsg::End::new_misc().into();
634            ctrl.send_fwd(None, extend2, Recognized::No, true).await;
635            rt.advance_until_stalled().await;
636
637            // The reactor handled the EXTEND2 and launched an outbound channel
638            assert!(logs_contain(
639                "Asked to forward cell before the circuit was extended?!"
640            ));
641            assert_circuit_destroyed(&mut ctrl, DestroyReason::NONE);
642        });
643    }
644
645    #[traced_test]
646    #[test]
647    fn reject_invalid_begin() {
648        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
649            let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
650            rt.advance_until_stalled().await;
651
652            let _streams = ctrl
653                .allow_stream_requests(&[RelayCmd::BEGIN], AllowAllStreamsFilter)
654                .await;
655
656            let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap().into();
657
658            // BEGIN cells *must* have a stream ID, so expect the reactor to reject this
659            // and close the circuit
660            ctrl.send_fwd(None, begin, Recognized::Yes, false).await;
661            rt.advance_until_stalled().await;
662
663            assert!(logs_contain(
664                "Invalid stream ID [scrubbed] for relay command BEGIN"
665            ));
666            assert_circuit_destroyed(&mut ctrl, DestroyReason::NONE);
667        });
668    }
669
670    #[traced_test]
671    #[test]
672    #[ignore] // TODO(relay): Sad trombone, this is not yet supported
673    fn data_stream() {
674        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
675            const TO_SEND: &[u8] = b"The bells were musical in the silvery sun";
676
677            let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
678            rt.advance_until_stalled().await;
679
680            let mut incoming_streams = ctrl
681                .allow_stream_requests(&[RelayCmd::BEGIN], AllowAllStreamsFilter)
682                .await;
683
684            let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap().into();
685            ctrl.send_fwd(StreamId::new(1), begin, Recognized::Yes, false)
686                .await;
687            rt.advance_until_stalled().await;
688
689            let data = relaymsg::Data::new(TO_SEND).unwrap().into();
690            ctrl.send_fwd(StreamId::new(1), data, Recognized::Yes, false)
691                .await;
692
693            // We should have a pending incoming stream
694            let pending = incoming_streams.next().await.unwrap();
695
696            // Accept it, and let's see what we have!
697            let mut stream = pending
698                .accept_data(relaymsg::Connected::new_empty())
699                .await
700                .unwrap();
701
702            let mut recv_buf = [0_u8; TO_SEND.len()];
703            stream.read_exact(&mut recv_buf).await.unwrap();
704            assert_eq!(recv_buf, TO_SEND);
705        });
706    }
707}