1pub(crate) mod backward;
40pub(crate) mod forward;
41
42use std::sync::Arc;
43use std::time::Duration;
44
45use futures::channel::mpsc;
46
47use tor_cell::chancell::CircId;
48use tor_linkspec::OwnedChanTarget;
49use tor_rtcompat::Runtime;
50
51use crate::channel::Channel;
52use crate::circuit::circhop::{CircHopOutbound, HopSettings};
53use crate::circuit::reactor::Reactor as BaseReactor;
54use crate::circuit::reactor::hop_mgr::HopMgr;
55use crate::circuit::reactor::stream;
56use crate::circuit::{CircuitRxReceiver, UniqId};
57use crate::crypto::cell::{InboundRelayLayer, OutboundRelayLayer};
58use crate::memquota::CircuitAccount;
59use crate::relay::RelayCirc;
60use crate::relay::channel_provider::ChannelProvider;
61use crate::relay::reactor::backward::Backward;
62use crate::relay::reactor::forward::Forward;
63
64use crate::client::circuit::padding::{PaddingController, PaddingEventStream};
66
67type RelayBaseReactor<R> = BaseReactor<R, Forward, Backward>;
69
70#[allow(unused)] #[must_use = "If you don't call run() on a reactor, the circuit won't work."]
73pub(crate) struct Reactor<R: Runtime>(RelayBaseReactor<R>);
74
/// Field-less type that carries the relay implementation of
/// `stream::StreamHandler`.
struct StreamHandler;
77
78impl stream::StreamHandler for StreamHandler {
79 fn halfstream_expiry(&self, hop: &CircHopOutbound) -> Duration {
80 let ccontrol = hop.ccontrol();
81
82 ccontrol
90 .lock()
91 .expect("poisoned lock")
92 .rtt()
93 .max_rtt_usec()
94 .map(|rtt| Duration::from_millis(u64::from(rtt)))
95 .unwrap_or_default()
98 }
99}
100
101#[allow(unused)] impl<R: Runtime> Reactor<R> {
103 #[allow(clippy::too_many_arguments)] pub(crate) fn new(
112 runtime: R,
113 channel: &Arc<Channel>,
114 circ_id: CircId,
115 unique_id: UniqId,
116 input: CircuitRxReceiver,
117 crypto_in: Box<dyn InboundRelayLayer + Send>,
118 crypto_out: Box<dyn OutboundRelayLayer + Send>,
119 settings: &HopSettings,
120 chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
121 padding_ctrl: PaddingController,
122 padding_event_stream: PaddingEventStream,
123 memquota: &CircuitAccount,
124 ) -> crate::Result<(Self, Arc<RelayCirc>)> {
125 #[allow(clippy::disallowed_methods)]
130 let (stream_tx, stream_rx) = mpsc::channel(0);
131
132 let mut hop_mgr = HopMgr::new(
133 runtime.clone(),
134 unique_id,
135 StreamHandler,
136 stream_tx,
137 memquota.clone(),
138 );
139
140 hop_mgr.add_hop(settings.clone())?;
145
146 #[allow(clippy::disallowed_methods)]
149 let (fwd_ev_tx, fwd_ev_rx) = mpsc::channel(0);
150 let forward = Forward::new(
151 channel,
152 unique_id,
153 crypto_out,
154 chan_provider,
155 fwd_ev_tx,
156 memquota.clone(),
157 );
158 let backward = Backward::new(crypto_in);
159
160 let (inner, handle) = BaseReactor::new(
161 runtime,
162 channel,
163 circ_id,
164 unique_id,
165 input,
166 forward,
167 backward,
168 hop_mgr,
169 padding_ctrl,
170 padding_event_stream,
171 stream_rx,
172 fwd_ev_rx,
173 memquota,
174 );
175
176 let reactor = Self(inner);
177 let handle = Arc::new(RelayCirc(handle));
178
179 Ok((reactor, handle))
180 }
181
182 pub(crate) async fn run(mut self) -> crate::Result<()> {
188 self.0.run().await
189 }
190}
191
192#[cfg(test)]
193pub(crate) mod test {
194 #![allow(clippy::bool_assert_comparison)]
196 #![allow(clippy::clone_on_copy)]
197 #![allow(clippy::dbg_macro)]
198 #![allow(clippy::mixed_attributes_style)]
199 #![allow(clippy::print_stderr)]
200 #![allow(clippy::print_stdout)]
201 #![allow(clippy::single_char_pattern)]
202 #![allow(clippy::unwrap_used)]
203 #![allow(clippy::unchecked_time_subtraction)]
204 #![allow(clippy::useless_vec)]
205 #![allow(clippy::needless_pass_by_value)]
206 use super::*;
209 use crate::circuit::reactor::test::{AllowAllStreamsFilter, rmsg_to_ccmsg};
210 use crate::circuit::{CircParameters, CircuitRxSender};
211 use crate::client::circuit::padding::new_padding;
212 use crate::congestion::test_utils::params::build_cc_vegas_params;
213 use crate::crypto::cell::RelayCellBody;
214 use crate::crypto::cell::{InboundRelayLayer, OutboundRelayLayer};
215 use crate::fake_mpsc;
216 use crate::memquota::SpecificAccount as _;
217 use crate::relay::channel::test::{DummyChan, DummyChanProvider, working_dummy_channel};
218 use crate::stream::flow_ctrl::params::FlowCtrlParameters;
219 use crate::stream::incoming::{IncomingStream, IncomingStreamRequestFilter};
220
221 use futures::{AsyncReadExt as _, SinkExt as _, StreamExt as _};
222 use tracing_test::traced_test;
223
224 use tor_cell::chancell::{ChanCell, ChanCmd, msg as chanmsg};
225 use tor_cell::relaycell::{
226 AnyRelayMsgOuter, RelayCellFormat, RelayCmd, StreamId, msg as relaymsg,
227 };
228 use tor_linkspec::{EncodedLinkSpec, HasRelayIds, LinkSpec};
229 use tor_protover::{Protocols, named};
230 use tor_rtcompat::SpawnExt;
231 use tor_rtcompat::{DynTimeProvider, Runtime};
232 use tor_rtmock::MockRuntime;
233
234 use chanmsg::{AnyChanMsg, DestroyReason, HandshakeType};
235 use relaymsg::SendmeTag;
236
237 use std::net::IpAddr;
238 use std::sync::{Arc, Mutex, mpsc};
239
/// Inbound crypto layer used by the tests in this module.
///
/// Its `InboundRelayLayer` impl leaves cell bodies untouched.
struct DummyInboundCrypto {}
242
243 struct DummyOutboundCrypto {
245 recognized_rx: mpsc::Receiver<Recognized>,
250 }
251
/// Fixed 20-byte tag (every byte is `1`) returned by the dummy crypto layers.
const DUMMY_TAG: [u8; 20] = [1_u8; 20];
253
254 impl InboundRelayLayer for DummyInboundCrypto {
255 fn originate(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
256 DUMMY_TAG.into()
257 }
258
259 fn encrypt_inbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {}
260 }
261
262 impl OutboundRelayLayer for DummyOutboundCrypto {
263 fn decrypt_outbound(
264 &mut self,
265 _cmd: ChanCmd,
266 _cell: &mut RelayCellBody,
267 ) -> Option<SendmeTag> {
268 let recognized = self.recognized_rx.recv().unwrap();
270
271 match recognized {
272 Recognized::Yes => Some(DUMMY_TAG.into()),
273 Recognized::No => None,
274 }
275 }
276 }
277
278 struct ReactorTestCtrl {
279 relay_circ: Arc<RelayCirc>,
281 circmsg_send: CircuitRxSender,
283 inbound_chan: DummyChan,
285 outbound_chan: Arc<Mutex<Option<DummyChan>>>,
291 recognized_tx: mpsc::Sender<Recognized>,
294 }
295
/// Verdict that `DummyOutboundCrypto` should report for a cell.
enum Recognized {
    /// Report the cell as recognized (decryption yields a tag).
    Yes,
    /// Report the cell as not recognized (decryption yields `None`).
    No,
}
304
305 impl ReactorTestCtrl {
306 fn spawn_reactor<R: Runtime>(rt: &R) -> Self {
309 let inbound_chan = working_dummy_channel(rt);
310 let circid = CircId::new(1337).unwrap();
311 let unique_id = UniqId::new(8, 17);
312 let (padding_ctrl, padding_stream) = new_padding(DynTimeProvider::new(rt.clone()));
313 let (circmsg_send, circmsg_recv) = fake_mpsc(64);
314 let params = CircParameters::new(
315 true,
316 build_cc_vegas_params(),
317 FlowCtrlParameters::defaults_for_tests(),
318 );
319 let settings = HopSettings::from_params_and_caps(
320 crate::circuit::circhop::HopNegotiationType::Full,
321 ¶ms,
322 &[named::FLOWCTRL_CC].into_iter().collect::<Protocols>(),
323 )
324 .unwrap();
325
326 let outbound_chan = Arc::new(Mutex::new(None));
327 let (recognized_tx, recognized_rx) = mpsc::channel();
328 let chan_provider = Arc::new(DummyChanProvider::new(
329 rt.clone(),
330 Arc::clone(&outbound_chan),
331 ));
332
333 let (reactor, relay_circ) = Reactor::new(
334 rt.clone(),
335 &Arc::clone(&inbound_chan.channel),
336 circid,
337 unique_id,
338 circmsg_recv,
339 Box::new(DummyInboundCrypto {}),
340 Box::new(DummyOutboundCrypto { recognized_rx }),
341 &settings,
342 chan_provider,
343 padding_ctrl,
344 padding_stream,
345 &CircuitAccount::new_noop(),
346 )
347 .unwrap();
348
349 rt.spawn(async {
350 let _ = reactor.run().await;
351 })
352 .unwrap();
353
354 Self {
355 relay_circ,
356 circmsg_send,
357 recognized_tx,
358 inbound_chan,
359 outbound_chan,
360 }
361 }
362
363 async fn send_fwd(
365 &mut self,
366 id: Option<StreamId>,
367 msg: relaymsg::AnyRelayMsg,
368 recognized: Recognized,
369 early: bool,
370 ) {
371 self.recognized_tx.send(recognized).unwrap();
376 self.circmsg_send
377 .send(rmsg_to_ccmsg(id, msg, early))
378 .await
379 .unwrap();
380 }
381
382 fn outbound_chan_launched(&self) -> bool {
385 self.outbound_chan.lock().unwrap().is_some()
386 }
387
388 async fn allow_stream_requests<'a, FILT>(
392 &self,
393 allow_commands: &'a [RelayCmd],
394 filter: FILT,
395 ) -> impl futures::Stream<Item = IncomingStream> + use<'a, FILT>
396 where
397 FILT: IncomingStreamRequestFilter,
398 {
399 Arc::clone(&self.relay_circ)
400 .allow_stream_requests(allow_commands, filter)
401 .await
402 .unwrap()
403 }
404
405 async fn do_create2_handshake(
407 &mut self,
408 rt: &MockRuntime,
409 expected_hs_type: HandshakeType,
410 ) {
411 let (circid, msg) = self.read_outbound().into_circid_and_msg();
413 let _create2 = match msg {
414 chanmsg::AnyChanMsg::Create2(c) => {
415 assert_eq!(c.handshake_type(), expected_hs_type);
416 c
417 }
418 _ => panic!("unexpected forwarded {msg:?}"),
419 };
420
421 let handshake = vec![];
422 let created2 = chanmsg::Created2::new(handshake);
423 self.write_outbound(circid, chanmsg::AnyChanMsg::Created2(created2));
426 rt.advance_until_stalled().await;
427 }
428
429 fn is_closing(&self) -> bool {
431 self.relay_circ.is_closing()
432 }
433
434 fn read_inbound(&mut self) -> ChanCell<AnyChanMsg> {
439 #[allow(deprecated)] self.inbound_chan.rx.try_next().unwrap().unwrap()
441 }
442
443 fn read_outbound(&mut self) -> ChanCell<AnyChanMsg> {
448 let mut lock = self.outbound_chan.lock().unwrap();
449 let chan = lock.as_mut().unwrap();
450 #[allow(deprecated)] chan.rx.try_next().unwrap().unwrap()
452 }
453
454 fn write_outbound(&mut self, circid: Option<CircId>, msg: chanmsg::AnyChanMsg) {
460 let mut lock = self.outbound_chan.lock().unwrap();
461 let chan = lock.as_mut().unwrap();
462 let cell = ChanCell::new(circid, msg);
463
464 chan.tx.try_send(Ok(cell)).unwrap();
465 }
466 }
467
468 fn dummy_linkspecs() -> Vec<EncodedLinkSpec> {
469 vec![
470 LinkSpec::Ed25519Id([43; 32].into()).encode().unwrap(),
471 LinkSpec::RsaId([45; 20].into()).encode().unwrap(),
472 LinkSpec::OrPort("127.0.0.1".parse::<IpAddr>().unwrap(), 999)
473 .encode()
474 .unwrap(),
475 ]
476 }
477
478 fn assert_circuit_destroyed(ctrl: &mut ReactorTestCtrl, reason: DestroyReason) {
485 assert!(ctrl.is_closing());
486
487 let cell = ctrl.read_inbound();
488
489 match cell.msg() {
490 chanmsg::AnyChanMsg::Destroy(d) => {
491 assert_eq!(d.reason(), reason);
492 }
493 _ => panic!("unexpected ending {cell:?}"),
494 }
495 }
496
497 #[traced_test]
498 #[test]
499 fn reject_extend2_relay() {
500 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
501 let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
502 rt.advance_until_stalled().await;
503
504 let linkspecs = dummy_linkspecs();
505 let extend2 = relaymsg::Extend2::new(linkspecs, HandshakeType::NTOR_V3, vec![]).into();
506 ctrl.send_fwd(None, extend2, Recognized::Yes, false).await;
507 rt.advance_until_stalled().await;
508
509 assert!(logs_contain("got EXTEND2 in a RELAY cell?!"));
510 assert!(!ctrl.outbound_chan_launched());
511 assert_circuit_destroyed(&mut ctrl, DestroyReason::NONE);
512 });
513 }
514
515 #[traced_test]
516 #[test]
517 fn reject_extend2_previous_hop() {
518 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
519 let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
520 rt.advance_until_stalled().await;
521
522 assert!(!ctrl.outbound_chan_launched());
524
525 let mut linkspecs = ctrl
527 .inbound_chan
528 .channel
529 .target()
530 .identities()
531 .map(|id| LinkSpec::from(id.to_owned()).encode())
532 .collect::<Result<Vec<_>, _>>()
533 .unwrap();
534
535 assert_eq!(linkspecs.len(), 2);
538
539 linkspecs.push(
541 LinkSpec::OrPort("127.0.0.1".parse::<IpAddr>().unwrap(), 999)
542 .encode()
543 .unwrap(),
544 );
545 let handshake_type = HandshakeType::NTOR_V3;
546 let extend2 = relaymsg::Extend2::new(linkspecs, handshake_type, vec![]).into();
547 ctrl.send_fwd(None, extend2, Recognized::Yes, true).await;
548 rt.advance_until_stalled().await;
549
550 assert!(logs_contain("Cannot extend circuit to previous hop"));
552 assert!(!ctrl.outbound_chan_launched());
553 assert!(ctrl.is_closing());
554 });
555 }
556
557 #[traced_test]
558 #[test]
559 fn extend_and_forward() {
560 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
561 let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
562 rt.advance_until_stalled().await;
563
564 assert!(!ctrl.outbound_chan_launched());
566
567 let linkspecs = dummy_linkspecs();
568 let handshake_type = HandshakeType::NTOR_V3;
569 let extend2 = relaymsg::Extend2::new(linkspecs, handshake_type, vec![]).into();
570 ctrl.send_fwd(None, extend2, Recognized::Yes, true).await;
571 rt.advance_until_stalled().await;
572
573 assert!(logs_contain(
575 "Launched channel to the next hop circ_id=Circ 8.17"
576 ));
577 assert!(ctrl.outbound_chan_launched());
578 assert!(!ctrl.is_closing());
579
580 ctrl.do_create2_handshake(&rt, handshake_type).await;
581 assert!(logs_contain("Got CREATED2 response from next hop"));
582 assert!(logs_contain("Extended circuit to the next hop"));
583
584 let early = false;
586 let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap();
587 ctrl.send_fwd(None, begin.clone().into(), Recognized::No, early)
588 .await;
589 rt.advance_until_stalled().await;
590
591 macro_rules! expect_cell {
592 ($chanmsg:tt, $relaymsg:tt) => {{
593 let cell = ctrl.read_outbound();
594 let msg = match cell.msg() {
595 chanmsg::AnyChanMsg::$chanmsg(m) => {
596 let body = m.clone().into_relay_body();
597 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, body).unwrap()
598 }
599 _ => panic!("unexpected forwarded {cell:?}"),
600 };
601
602 match msg.msg() {
603 relaymsg::AnyRelayMsg::$relaymsg(m) => m.clone(),
604 _ => panic!("unexpected cell {msg:?}"),
605 }
606 }};
607 }
608
609 let recvd_begin = expect_cell!(Relay, Begin);
611 assert_eq!(begin, recvd_begin);
612
613 let early = true;
615 let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap();
616 ctrl.send_fwd(None, begin.clone().into(), Recognized::No, early)
617 .await;
618 rt.advance_until_stalled().await;
619 let recvd_begin = expect_cell!(RelayEarly, Begin);
620 assert_eq!(begin, recvd_begin);
621 });
622 }
623
624 #[traced_test]
625 #[test]
626 fn forward_before_extend() {
627 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
628 let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
629 rt.advance_until_stalled().await;
630
631 let extend2 = relaymsg::End::new_misc().into();
634 ctrl.send_fwd(None, extend2, Recognized::No, true).await;
635 rt.advance_until_stalled().await;
636
637 assert!(logs_contain(
639 "Asked to forward cell before the circuit was extended?!"
640 ));
641 assert_circuit_destroyed(&mut ctrl, DestroyReason::NONE);
642 });
643 }
644
645 #[traced_test]
646 #[test]
647 fn reject_invalid_begin() {
648 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
649 let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
650 rt.advance_until_stalled().await;
651
652 let _streams = ctrl
653 .allow_stream_requests(&[RelayCmd::BEGIN], AllowAllStreamsFilter)
654 .await;
655
656 let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap().into();
657
658 ctrl.send_fwd(None, begin, Recognized::Yes, false).await;
661 rt.advance_until_stalled().await;
662
663 assert!(logs_contain(
664 "Invalid stream ID [scrubbed] for relay command BEGIN"
665 ));
666 assert_circuit_destroyed(&mut ctrl, DestroyReason::NONE);
667 });
668 }
669
670 #[traced_test]
671 #[test]
672 #[ignore] fn data_stream() {
674 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
675 const TO_SEND: &[u8] = b"The bells were musical in the silvery sun";
676
677 let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
678 rt.advance_until_stalled().await;
679
680 let mut incoming_streams = ctrl
681 .allow_stream_requests(&[RelayCmd::BEGIN], AllowAllStreamsFilter)
682 .await;
683
684 let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap().into();
685 ctrl.send_fwd(StreamId::new(1), begin, Recognized::Yes, false)
686 .await;
687 rt.advance_until_stalled().await;
688
689 let data = relaymsg::Data::new(TO_SEND).unwrap().into();
690 ctrl.send_fwd(StreamId::new(1), data, Recognized::Yes, false)
691 .await;
692
693 let pending = incoming_streams.next().await.unwrap();
695
696 let mut stream = pending
698 .accept_data(relaymsg::Connected::new_empty())
699 .await
700 .unwrap();
701
702 let mut recv_buf = [0_u8; TO_SEND.len()];
703 stream.read_exact(&mut recv_buf).await.unwrap();
704 assert_eq!(recv_buf, TO_SEND);
705 });
706 }
707}