1pub(crate) mod halfcirc;
39
40#[cfg(feature = "hs-common")]
41pub mod handshake;
42#[cfg(not(feature = "hs-common"))]
43pub(crate) mod handshake;
44
45pub(crate) mod padding;
46
47pub(super) mod path;
48
49use crate::channel::Channel;
50use crate::circuit::circhop::{HopNegotiationType, HopSettings};
51use crate::circuit::{CircuitRxReceiver, celltypes::*};
52#[cfg(feature = "circ-padding-manual")]
53use crate::client::CircuitPadder;
54use crate::client::circuit::padding::{PaddingController, PaddingEventStream};
55use crate::client::reactor::{CircuitHandshake, CtrlCmd, CtrlMsg, Reactor};
56use crate::crypto::cell::HopNum;
57use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
58use crate::memquota::CircuitAccount;
59use crate::util::skew::ClockSkew;
60use crate::{Error, Result};
61use derive_deftly::Deftly;
62use educe::Educe;
63use path::HopDetail;
64use tor_cell::chancell::{
65 CircId,
66 msg::{self as chanmsg},
67};
68use tor_error::{bad_api_usage, internal, into_internal};
69use tor_linkspec::{CircTarget, LinkSpecType, OwnedChanTarget, RelayIdType};
70use tor_protover::named;
71use tor_rtcompat::DynTimeProvider;
72use web_time_compat::Instant;
73
74use crate::circuit::UniqId;
75
76use super::{ClientTunnel, TargetHop};
77
78use futures::channel::mpsc;
79use oneshot_fused_workaround as oneshot;
80
81use futures::FutureExt as _;
82use std::collections::HashMap;
83use std::sync::{Arc, Mutex};
84use tor_memquota::derive_deftly_template_HasMemoryCost;
85
86use crate::crypto::handshake::ntor::NtorPublicKey;
87
88#[cfg(test)]
89use crate::stream::{StreamMpscReceiver, StreamMpscSender};
90
91pub use crate::crypto::binding::CircuitBinding;
92pub use path::{Path, PathEntry};
93
/// A buffer size, in number of entries, exported by this module.
//
// NOTE(review): nothing in the visible part of this file reads this constant;
// presumably it sizes a circuit-related queue -- confirm against its users.
pub const CIRCUIT_BUFFER_SIZE: usize = 128;
96
97pub use crate::circuit::CircParameters;
99
100pub use crate::util::timeout::TimeoutEstimator;
102
/// A channel message that may arrive on an open client circuit.
///
/// The `RestrictedChanMsgSet` template is applied to this enum, with the
/// usage string "on an open client circuit".
#[derive(Debug, Deftly)]
// NOTE(review): presumably needed because derived code is more visible than
// this `pub(super)` type -- confirm.
#[allow(unreachable_pub)]
#[derive_deftly(HasMemoryCost)]
#[derive_deftly(RestrictedChanMsgSet)]
#[deftly(usage = "on an open client circuit")]
pub(super) enum ClientCircChanMsg {
    /// A `chanmsg::Relay` message.
    Relay(chanmsg::Relay),
    /// A `chanmsg::Destroy` message.
    Destroy(chanmsg::Destroy),
}
118
/// Client-side handle to a circuit.
///
/// The handle does not do the circuit's work itself: its methods send
/// `CtrlMsg`/`CtrlCmd` requests over the `control` and `command` channels
/// and wait for the answer.
#[derive(Debug)]
pub struct ClientCirc {
    /// Shared mutable state, keyed by circuit unique ID.
    pub(super) mutable: Arc<TunnelMutableState>,
    /// Unique identifier of this circuit.
    unique_id: UniqId,
    /// Sender for `CtrlMsg` requests; `is_closing` reports whether it is closed.
    pub(super) control: mpsc::UnboundedSender<CtrlMsg>,
    /// Sender for `CtrlCmd` requests.
    pub(super) command: mpsc::UnboundedSender<CtrlCmd>,
    /// Shareable future that `wait_for_close` clones and waits on.
    #[cfg_attr(not(feature = "experimental-api"), allow(dead_code))]
    reactor_closed_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
    /// Circuit ID this circuit was created with; only stored in test builds.
    #[cfg(test)]
    circid: CircId,
    /// Memory-quota account of this circuit (see `mq_account`).
    pub(super) memquota: CircuitAccount,
    /// Time provider; `PendingClientTunnel::new` clones it from the channel.
    pub(super) time_provider: DynTimeProvider,
    /// Multi-path flag; `PendingClientTunnel::new` initializes it to `false`.
    pub(super) is_multi_path: bool,
}
193
/// Mutable state of every circuit in a tunnel.
///
/// A mutex-protected map from a circuit's `UniqId` to the shared
/// `MutableState` of that circuit.
#[derive(Debug, Default)]
pub(super) struct TunnelMutableState(Mutex<HashMap<UniqId, Arc<MutableState>>>);
215
216impl TunnelMutableState {
217 pub(super) fn insert(&self, unique_id: UniqId, mutable: Arc<MutableState>) {
219 #[allow(unused)] let state = self
221 .0
222 .lock()
223 .expect("lock poisoned")
224 .insert(unique_id, mutable);
225
226 debug_assert!(state.is_none());
227 }
228
229 pub(super) fn remove(&self, unique_id: UniqId) {
231 #[allow(unused)] let state = self.0.lock().expect("lock poisoned").remove(&unique_id);
233
234 debug_assert!(state.is_some());
235 }
236
237 fn all_paths(&self) -> Vec<Arc<Path>> {
239 let lock = self.0.lock().expect("lock poisoned");
240 lock.values().map(|mutable| mutable.path()).collect()
241 }
242
243 #[cfg(feature = "rpc")]
249 pub(super) fn tagged_paths(&self) -> HashMap<UniqId, Arc<Path>> {
250 let lock = self.0.lock().expect("lock poisoned");
251 lock.iter()
252 .map(|(id, mutable)| (*id, mutable.path()))
253 .collect()
254 }
255
256 #[allow(unstable_name_collisions)]
264 fn single_path(&self) -> Result<Arc<Path>> {
265 use itertools::Itertools as _;
266
267 self.all_paths().into_iter().exactly_one().map_err(|_| {
268 bad_api_usage!("requested the single path of a multi-path tunnel?!").into()
269 })
270 }
271
272 fn first_hop(&self, unique_id: UniqId) -> Result<Option<OwnedChanTarget>> {
277 let lock = self.0.lock().expect("lock poisoned");
278 let mutable = lock
279 .get(&unique_id)
280 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
281
282 let first_hop = mutable.first_hop().map(|first_hop| match first_hop {
283 path::HopDetail::Relay(r) => r,
284 #[cfg(feature = "hs-common")]
285 path::HopDetail::Virtual => {
286 panic!("somehow made a circuit with a virtual first hop.")
287 }
288 });
289
290 Ok(first_hop)
291 }
292
293 pub(super) fn last_hop_num(&self, unique_id: UniqId) -> Result<Option<HopNum>> {
299 let lock = self.0.lock().expect("lock poisoned");
300 let mutable = lock
301 .get(&unique_id)
302 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
303
304 Ok(mutable.last_hop_num())
305 }
306
307 fn n_hops(&self, unique_id: UniqId) -> Result<usize> {
311 let lock = self.0.lock().expect("lock poisoned");
312 let mutable = lock
313 .get(&unique_id)
314 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
315
316 Ok(mutable.n_hops())
317 }
318}
319
/// The mutable state of a single circuit: a `CircuitState` behind a mutex.
#[derive(Educe, Default)]
#[educe(Debug)]
pub(super) struct MutableState(Mutex<CircuitState>);
324
325impl MutableState {
326 pub(super) fn add_hop(&self, peer_id: HopDetail, binding: Option<CircuitBinding>) {
328 let mut mutable = self.0.lock().expect("poisoned lock");
329 Arc::make_mut(&mut mutable.path).push_hop(peer_id);
330 mutable.binding.push(binding);
331 }
332
333 pub(super) fn path(&self) -> Arc<path::Path> {
335 let mutable = self.0.lock().expect("poisoned lock");
336 Arc::clone(&mutable.path)
337 }
338
339 pub(super) fn binding_key(&self, hop: HopNum) -> Option<CircuitBinding> {
342 let mutable = self.0.lock().expect("poisoned lock");
343
344 mutable.binding.get::<usize>(hop.into()).cloned().flatten()
345 }
348
349 fn first_hop(&self) -> Option<HopDetail> {
351 let mutable = self.0.lock().expect("poisoned lock");
352 mutable.path.first_hop()
353 }
354
355 fn last_hop_num(&self) -> Option<HopNum> {
362 let mutable = self.0.lock().expect("poisoned lock");
363 mutable.path.last_hop_num()
364 }
365
366 fn n_hops(&self) -> usize {
373 let mutable = self.0.lock().expect("poisoned lock");
374 mutable.path.n_hops()
375 }
376}
377
/// The lock-protected contents of a `MutableState`.
#[derive(Educe, Default)]
#[educe(Debug)]
pub(super) struct CircuitState {
    /// The circuit's path so far.
    ///
    /// Kept in an `Arc` so that `MutableState::path` can hand out snapshots;
    /// `MutableState::add_hop` modifies it through `Arc::make_mut`.
    path: Arc<path::Path>,

    /// Circuit bindings, pushed one per hop by `MutableState::add_hop`
    /// (`None` when no binding was supplied for that hop).
    ///
    /// Left out of the `Debug` output.
    #[educe(Debug(ignore))]
    binding: Vec<Option<CircuitBinding>>,
}
399
/// A circuit that has been set up locally but whose first hop has not been
/// created yet.
///
/// The `create_firsthop*` methods consume this value and, on success, turn
/// it into a `ClientTunnel`.
pub struct PendingClientTunnel {
    /// Receiver for the `CreateResponse`; the `create_firsthop*` methods
    /// pass it on in `CtrlMsg::Create`.
    recvcreated: oneshot::Receiver<CreateResponse>,
    /// The circuit handle that becomes the tunnel once creation succeeds.
    circ: ClientCirc,
}
411
412impl ClientCirc {
413 pub fn into_tunnel(self) -> Result<ClientTunnel> {
415 self.try_into()
416 }
417
418 pub fn first_hop(&self) -> Result<OwnedChanTarget> {
426 Ok(self
427 .mutable
428 .first_hop(self.unique_id)
429 .map_err(|_| Error::CircuitClosed)?
430 .expect("called first_hop on an un-constructed circuit"))
431 }
432
433 pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
443 let all_paths = self.all_paths();
444 let path = all_paths.first().ok_or_else(|| {
445 tor_error::bad_api_usage!("Called last_hop_info on an un-constructed tunnel")
446 })?;
447 Ok(path
448 .hops()
449 .last()
450 .expect("Called last_hop on an un-constructed circuit")
451 .as_chan_target()
452 .map(OwnedChanTarget::from_chan_target))
453 }
454
455 pub fn last_hop_num(&self) -> Result<HopNum> {
465 Ok(self
466 .mutable
467 .last_hop_num(self.unique_id)?
468 .ok_or_else(|| internal!("no last hop index"))?)
469 }
470
471 pub fn last_hop(&self) -> Result<TargetHop> {
476 let hop_num = self
477 .mutable
478 .last_hop_num(self.unique_id)?
479 .ok_or_else(|| bad_api_usage!("no last hop"))?;
480 Ok((self.unique_id, hop_num).into())
481 }
482
483 pub fn all_paths(&self) -> Vec<Arc<Path>> {
488 self.mutable.all_paths()
489 }
490
491 pub fn single_path(&self) -> Result<Arc<Path>> {
495 self.mutable.single_path()
496 }
497
498 pub async fn disused_since(&self) -> Result<Option<Instant>> {
507 let (tx, rx) = oneshot::channel();
508 self.command
509 .unbounded_send(CtrlCmd::GetTunnelActivity { sender: tx })
510 .map_err(|_| Error::CircuitClosed)?;
511
512 Ok(rx.await.map_err(|_| Error::CircuitClosed)?.disused_since())
513 }
514
515 pub async fn first_hop_clock_skew(&self) -> Result<ClockSkew> {
519 let (tx, rx) = oneshot::channel();
520
521 self.control
522 .unbounded_send(CtrlMsg::FirstHopClockSkew { answer: tx })
523 .map_err(|_| Error::CircuitClosed)?;
524
525 Ok(rx.await.map_err(|_| Error::CircuitClosed)??)
526 }
527
    /// Return a reference to this circuit's memory-quota account.
    pub fn mq_account(&self) -> &CircuitAccount {
        &self.memquota
    }
532
533 #[cfg(feature = "hs-service")]
541 pub async fn binding_key(&self, hop: TargetHop) -> Result<Option<CircuitBinding>> {
542 let (sender, receiver) = oneshot::channel();
543 let msg = CtrlCmd::GetBindingKey { hop, done: sender };
544 self.command
545 .unbounded_send(msg)
546 .map_err(|_| Error::CircuitClosed)?;
547
548 receiver.await.map_err(|_| Error::CircuitClosed)?
549 }
550
551 pub async fn extend<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
554 where
555 Tg: CircTarget,
556 {
557 #![allow(deprecated)]
558
559 if target
571 .protovers()
572 .supports_named_subver(named::RELAY_NTORV3)
573 {
574 self.extend_ntor_v3(target, params).await
575 } else {
576 self.extend_ntor(target, params).await
577 }
578 }
579
580 #[deprecated(since = "1.6.1", note = "Use extend instead.")]
583 pub async fn extend_ntor<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
584 where
585 Tg: CircTarget,
586 {
587 let key = NtorPublicKey {
588 id: *target
589 .rsa_identity()
590 .ok_or(Error::MissingId(RelayIdType::Rsa))?,
591 pk: *target.ntor_onion_key(),
592 };
593 let mut linkspecs = target
594 .linkspecs()
595 .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
596 if !params.extend_by_ed25519_id {
597 linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
598 }
599
600 let (tx, rx) = oneshot::channel();
601
602 let peer_id = OwnedChanTarget::from_chan_target(target);
603 let settings = HopSettings::from_params_and_caps(
604 HopNegotiationType::None,
605 ¶ms,
606 target.protovers(),
607 )?;
608 self.control
609 .unbounded_send(CtrlMsg::ExtendNtor {
610 peer_id,
611 public_key: key,
612 linkspecs,
613 settings,
614 done: tx,
615 })
616 .map_err(|_| Error::CircuitClosed)?;
617
618 rx.await.map_err(|_| Error::CircuitClosed)??;
619
620 Ok(())
621 }
622
623 #[deprecated(since = "1.6.1", note = "Use extend instead.")]
626 pub async fn extend_ntor_v3<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
627 where
628 Tg: CircTarget,
629 {
630 let key = NtorV3PublicKey {
631 id: *target
632 .ed_identity()
633 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
634 pk: *target.ntor_onion_key(),
635 };
636 let mut linkspecs = target
637 .linkspecs()
638 .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
639 if !params.extend_by_ed25519_id {
640 linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
641 }
642
643 let (tx, rx) = oneshot::channel();
644
645 let peer_id = OwnedChanTarget::from_chan_target(target);
646 let settings = HopSettings::from_params_and_caps(
647 HopNegotiationType::Full,
648 ¶ms,
649 target.protovers(),
650 )?;
651 self.control
652 .unbounded_send(CtrlMsg::ExtendNtorV3 {
653 peer_id,
654 public_key: key,
655 linkspecs,
656 settings,
657 done: tx,
658 })
659 .map_err(|_| Error::CircuitClosed)?;
660
661 rx.await.map_err(|_| Error::CircuitClosed)??;
662
663 Ok(())
664 }
665
666 #[cfg(feature = "hs-common")]
690 pub async fn extend_virtual(
691 &self,
692 protocol: handshake::RelayProtocol,
693 role: handshake::HandshakeRole,
694 seed: impl handshake::KeyGenerator,
695 params: &CircParameters,
696 capabilities: &tor_protover::Protocols,
697 ) -> Result<()> {
698 use self::handshake::BoxedClientLayer;
699
700 let negotiation_type = match protocol {
702 handshake::RelayProtocol::HsV3 => HopNegotiationType::HsV3,
703 };
704 let protocol = handshake::RelayCryptLayerProtocol::from(protocol);
705
706 let BoxedClientLayer { fwd, back, binding } =
707 protocol.construct_client_layers(role, seed)?;
708
709 let settings = HopSettings::from_params_and_caps(negotiation_type, params, capabilities)?;
710 let (tx, rx) = oneshot::channel();
711 let message = CtrlCmd::ExtendVirtual {
712 cell_crypto: (fwd, back, binding),
713 settings,
714 done: tx,
715 };
716
717 self.command
718 .unbounded_send(message)
719 .map_err(|_| Error::CircuitClosed)?;
720
721 rx.await.map_err(|_| Error::CircuitClosed)?
722 }
723
724 #[cfg(feature = "circ-padding-manual")]
728 pub async fn start_padding_at_hop(&self, hop: HopNum, padder: CircuitPadder) -> Result<()> {
729 self.set_padder_impl(crate::HopLocation::Hop((self.unique_id, hop)), Some(padder))
730 .await
731 }
732
733 #[cfg(feature = "circ-padding-manual")]
737 pub async fn stop_padding_at_hop(&self, hop: HopNum) -> Result<()> {
738 self.set_padder_impl(crate::HopLocation::Hop((self.unique_id, hop)), None)
739 .await
740 }
741
742 #[cfg(feature = "circ-padding-manual")]
744 pub(super) async fn set_padder_impl(
745 &self,
746 hop: crate::HopLocation,
747 padder: Option<CircuitPadder>,
748 ) -> Result<()> {
749 let (tx, rx) = oneshot::channel();
750 let msg = CtrlCmd::SetPadder {
751 hop,
752 padder,
753 sender: tx,
754 };
755 self.command
756 .unbounded_send(msg)
757 .map_err(|_| Error::CircuitClosed)?;
758 rx.await.map_err(|_| Error::CircuitClosed)?
759 }
760
    /// Return true if the control channel to the reactor is closed.
    pub fn is_closing(&self) -> bool {
        self.control.is_closed()
    }
765
    /// Return the unique identifier of this circuit.
    pub fn unique_id(&self) -> UniqId {
        self.unique_id
    }
770
771 pub fn n_hops(&self) -> Result<usize> {
778 self.mutable
779 .n_hops(self.unique_id)
780 .map_err(|_| Error::CircuitClosed)
781 }
782
783 pub fn wait_for_close(
790 &self,
791 ) -> impl futures::Future<Output = ()> + Send + Sync + 'static + use<> {
792 self.reactor_closed_rx.clone().map(|_| ())
793 }
794}
795
796impl PendingClientTunnel {
797 #[allow(clippy::too_many_arguments)]
801 pub(crate) fn new(
802 id: CircId,
803 channel: Arc<Channel>,
804 createdreceiver: oneshot::Receiver<CreateResponse>,
805 input: CircuitRxReceiver,
806 unique_id: UniqId,
807 runtime: DynTimeProvider,
808 memquota: CircuitAccount,
809 padding_ctrl: PaddingController,
810 padding_stream: PaddingEventStream,
811 timeouts: Arc<dyn TimeoutEstimator>,
812 ) -> (PendingClientTunnel, crate::client::reactor::Reactor) {
813 let time_provider = channel.time_provider().clone();
814 let (reactor, control_tx, command_tx, reactor_closed_rx, mutable) = Reactor::new(
815 channel,
816 id,
817 unique_id,
818 input,
819 runtime,
820 memquota.clone(),
821 padding_ctrl,
822 padding_stream,
823 timeouts,
824 );
825
826 let circuit = ClientCirc {
827 mutable,
828 unique_id,
829 control: control_tx,
830 command: command_tx,
831 reactor_closed_rx: reactor_closed_rx.shared(),
832 #[cfg(test)]
833 circid: id,
834 memquota,
835 time_provider,
836 is_multi_path: false,
837 };
838
839 let pending = PendingClientTunnel {
840 recvcreated: createdreceiver,
841 circ: circuit,
842 };
843 (pending, reactor)
844 }
845
    /// Return the unique identifier of the circuit being created.
    pub fn peek_unique_id(&self) -> UniqId {
        self.circ.unique_id
    }
850
851 pub async fn create_firsthop_fast(self, params: CircParameters) -> Result<ClientTunnel> {
858 let protocols = tor_protover::Protocols::new();
863 let settings =
864 HopSettings::from_params_and_caps(HopNegotiationType::None, ¶ms, &protocols)?;
865 let (tx, rx) = oneshot::channel();
866 self.circ
867 .control
868 .unbounded_send(CtrlMsg::Create {
869 recv_created: self.recvcreated,
870 handshake: CircuitHandshake::CreateFast,
871 settings,
872 done: tx,
873 })
874 .map_err(|_| Error::CircuitClosed)?;
875
876 rx.await.map_err(|_| Error::CircuitClosed)??;
877
878 self.circ.into_tunnel()
879 }
880
881 pub async fn create_firsthop<Tg>(
886 self,
887 target: &Tg,
888 params: CircParameters,
889 ) -> Result<ClientTunnel>
890 where
891 Tg: tor_linkspec::CircTarget,
892 {
893 #![allow(deprecated)]
894 if target
896 .protovers()
897 .supports_named_subver(named::RELAY_NTORV3)
898 {
899 self.create_firsthop_ntor_v3(target, params).await
900 } else {
901 self.create_firsthop_ntor(target, params).await
902 }
903 }
904
905 #[deprecated(since = "1.6.1", note = "Use create_firsthop instead.")]
910 pub async fn create_firsthop_ntor<Tg>(
911 self,
912 target: &Tg,
913 params: CircParameters,
914 ) -> Result<ClientTunnel>
915 where
916 Tg: tor_linkspec::CircTarget,
917 {
918 let (tx, rx) = oneshot::channel();
919 let settings = HopSettings::from_params_and_caps(
920 HopNegotiationType::None,
921 ¶ms,
922 target.protovers(),
923 )?;
924
925 self.circ
926 .control
927 .unbounded_send(CtrlMsg::Create {
928 recv_created: self.recvcreated,
929 handshake: CircuitHandshake::Ntor {
930 public_key: NtorPublicKey {
931 id: *target
932 .rsa_identity()
933 .ok_or(Error::MissingId(RelayIdType::Rsa))?,
934 pk: *target.ntor_onion_key(),
935 },
936 ed_identity: *target
937 .ed_identity()
938 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
939 },
940 settings,
941 done: tx,
942 })
943 .map_err(|_| Error::CircuitClosed)?;
944
945 rx.await.map_err(|_| Error::CircuitClosed)??;
946
947 self.circ.into_tunnel()
948 }
949
950 #[deprecated(since = "1.6.1", note = "Use create_firsthop instead.")]
959 pub async fn create_firsthop_ntor_v3<Tg>(
960 self,
961 target: &Tg,
962 params: CircParameters,
963 ) -> Result<ClientTunnel>
964 where
965 Tg: tor_linkspec::CircTarget,
966 {
967 let settings = HopSettings::from_params_and_caps(
968 HopNegotiationType::Full,
969 ¶ms,
970 target.protovers(),
971 )?;
972 let (tx, rx) = oneshot::channel();
973
974 self.circ
975 .control
976 .unbounded_send(CtrlMsg::Create {
977 recv_created: self.recvcreated,
978 handshake: CircuitHandshake::NtorV3 {
979 public_key: NtorV3PublicKey {
980 id: *target
981 .ed_identity()
982 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
983 pk: *target.ntor_onion_key(),
984 },
985 },
986 settings,
987 done: tx,
988 })
989 .map_err(|_| Error::CircuitClosed)?;
990
991 rx.await.map_err(|_| Error::CircuitClosed)??;
992
993 self.circ.into_tunnel()
994 }
995}
996
997#[cfg(test)]
998pub(crate) mod test {
999 #![allow(clippy::bool_assert_comparison)]
1001 #![allow(clippy::clone_on_copy)]
1002 #![allow(clippy::dbg_macro)]
1003 #![allow(clippy::mixed_attributes_style)]
1004 #![allow(clippy::print_stderr)]
1005 #![allow(clippy::print_stdout)]
1006 #![allow(clippy::single_char_pattern)]
1007 #![allow(clippy::unwrap_used)]
1008 #![allow(clippy::unchecked_time_subtraction)]
1009 #![allow(clippy::useless_vec)]
1010 #![allow(clippy::needless_pass_by_value)]
1011 use super::*;
1014 use crate::channel::test::{CodecResult, new_reactor};
1015 use crate::circuit::CircuitRxSender;
1016 use crate::circuit::reactor::test::rmsg_to_ccmsg;
1017 use crate::client::circuit::padding::new_padding;
1018 use crate::client::stream::DataStream;
1019 use crate::congestion::params::CongestionControlParams;
1020 use crate::congestion::test_utils::params::build_cc_vegas_params;
1021 use crate::crypto::cell::RelayCellBody;
1022 use crate::crypto::handshake::ntor_v3::NtorV3Server;
1023 use crate::memquota::SpecificAccount as _;
1024 use crate::stream::flow_ctrl::params::FlowCtrlParameters;
1025 use crate::util::DummyTimeoutEstimator;
1026 use assert_matches::assert_matches;
1027 use chanmsg::{AnyChanMsg, Created2, CreatedFast};
1028 use futures::channel::mpsc::{Receiver, Sender};
1029 use futures::io::{AsyncReadExt, AsyncWriteExt};
1030 use futures::sink::SinkExt;
1031 use futures::stream::StreamExt;
1032 use hex_literal::hex;
1033 use std::collections::{HashMap, VecDeque};
1034 use std::fmt::Debug;
1035 use std::time::Duration;
1036 use tor_basic_utils::test_rng::testing_rng;
1037 use tor_cell::chancell::{AnyChanCell, BoxedCellBody, ChanCell, ChanCmd, msg as chanmsg};
1038 use tor_cell::relaycell::extend::{self as extend_ext, CircRequestExt, CircResponseExt};
1039 use tor_cell::relaycell::msg::SendmeTag;
1040 use tor_cell::relaycell::{
1041 AnyRelayMsgOuter, RelayCellFormat, RelayCmd, StreamId, msg as relaymsg, msg::AnyRelayMsg,
1042 };
1043 use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
1044 use tor_linkspec::OwnedCircTarget;
1045 use tor_memquota::HasMemoryCost;
1046 use tor_rtcompat::Runtime;
1047 use tor_rtcompat::SpawnExt;
1048 use tracing::trace;
1049 use tracing_test::traced_test;
1050
1051 #[cfg(feature = "conflux")]
1052 use {
1053 crate::client::reactor::ConfluxHandshakeResult,
1054 crate::util::err::ConfluxHandshakeError,
1055 futures::future::FusedFuture,
1056 futures::lock::Mutex as AsyncMutex,
1057 std::pin::Pin,
1058 std::result::Result as StdResult,
1059 tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
1060 tor_cell::relaycell::msg::ConfluxLink,
1061 tor_rtmock::MockRuntime,
1062 };
1063
1064 #[cfg(feature = "hs-service")]
1065 use crate::circuit::reactor::test::AllowAllStreamsFilter;
1066
    impl PendingClientTunnel {
        /// Testing only: return the circuit ID stored in the pending circuit.
        pub(crate) fn peek_circid(&self) -> CircId {
            self.circ.circid
        }
    }
1073
    impl ClientCirc {
        /// Testing only: return the circuit ID stored in this circuit.
        pub(crate) fn peek_circid(&self) -> CircId {
            self.circid
        }
    }
1080
1081 impl ClientTunnel {
1082 pub(crate) async fn resolve_last_hop(&self) -> TargetHop {
1083 let (sender, receiver) = oneshot::channel();
1084 let _ =
1085 self.as_single_circ()
1086 .unwrap()
1087 .command
1088 .unbounded_send(CtrlCmd::ResolveTargetHop {
1089 hop: TargetHop::LastHop,
1090 done: sender,
1091 });
1092 TargetHop::Hop(receiver.await.unwrap().unwrap())
1093 }
1094 }
1095
    /// Example secret key bytes, used by `example_ntor_key` and
    /// `example_ntor_v3_key`.
    const EXAMPLE_SK: [u8; 32] =
        hex!("7789d92a89711a7e2874c61ea495452cfd48627b3ca2ea9546aafa5bf7b55803");
    /// Example public key bytes; also the ntor onion key of `example_target`.
    // NOTE(review): presumably the public key matching EXAMPLE_SK -- confirm.
    const EXAMPLE_PK: [u8; 32] =
        hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253");
    /// Example Ed25519 identity bytes for the fake relay.
    const EXAMPLE_ED_ID: [u8; 32] = [6; 32];
    /// Example RSA identity bytes for the fake relay.
    const EXAMPLE_RSA_ID: [u8; 20] = [10; 20];
1103
    /// Create a stream MPSC channel pair for tests.
    ///
    /// Thin wrapper that forwards `buffer` to `crate::fake_mpsc`.
    #[cfg(test)]
    pub(crate) fn fake_mpsc<T: HasMemoryCost + Debug + Send>(
        buffer: usize,
    ) -> (StreamMpscSender<T>, StreamMpscReceiver<T>) {
        crate::fake_mpsc(buffer)
    }
1111
1112 fn example_target() -> OwnedCircTarget {
1114 let mut builder = OwnedCircTarget::builder();
1115 builder
1116 .chan_target()
1117 .ed_identity(EXAMPLE_ED_ID.into())
1118 .rsa_identity(EXAMPLE_RSA_ID.into());
1119 builder
1120 .ntor_onion_key(EXAMPLE_PK.into())
1121 .protocols("FlowCtrl=1-2".parse().unwrap())
1122 .build()
1123 .unwrap()
1124 }
1125 fn example_ntor_key() -> crate::crypto::handshake::ntor::NtorSecretKey {
1126 crate::crypto::handshake::ntor::NtorSecretKey::new(
1127 EXAMPLE_SK.into(),
1128 EXAMPLE_PK.into(),
1129 EXAMPLE_RSA_ID.into(),
1130 )
1131 }
1132 fn example_ntor_v3_key() -> crate::crypto::handshake::ntor_v3::NtorV3SecretKey {
1133 crate::crypto::handshake::ntor_v3::NtorV3SecretKey::new(
1134 EXAMPLE_SK.into(),
1135 EXAMPLE_PK.into(),
1136 EXAMPLE_ED_ID.into(),
1137 )
1138 }
1139
1140 fn working_fake_channel<R: Runtime>(
1141 rt: &R,
1142 ) -> (Arc<Channel>, Receiver<AnyChanCell>, Sender<CodecResult>) {
1143 let (channel, chan_reactor, rx, tx) = new_reactor(rt.clone());
1144 rt.spawn(async {
1145 let _ignore = chan_reactor.run().await;
1146 })
1147 .unwrap();
1148 (channel, rx, tx)
1149 }
1150
    /// Which handshake a test should exercise.
    #[derive(Copy, Clone)]
    enum HandshakeType {
        /// The CREATE_FAST handshake (`create_firsthop_fast`).
        Fast,
        /// The ntor handshake.
        Ntor,
        /// The ntor v3 handshake.
        NtorV3,
    }
1158
    /// Create a one-hop circuit over a fake channel with the given handshake,
    /// playing the relay's side of the handshake locally.
    ///
    /// `with_cc` is only consulted for `HandshakeType::NtorV3`: when set, the
    /// client uses congestion-control parameters and the fake relay insists
    /// on seeing a congestion-control request.
    #[allow(deprecated)]
    async fn test_create<R: Runtime>(rt: &R, handshake_type: HandshakeType, with_cc: bool) {
        use crate::crypto::handshake::{ServerHandshake, fast::CreateFastServer, ntor::NtorServer};

        let (chan, mut rx, _sink) = working_fake_channel(rt);
        let circid = CircId::new(128).unwrap();
        let (created_send, created_recv) = oneshot::channel();
        let (_circmsg_send, circmsg_recv) = fake_mpsc(64);
        let unique_id = UniqId::new(23, 17);
        let (padding_ctrl, padding_stream) = new_padding(DynTimeProvider::new(rt.clone()));

        let (pending, reactor) = PendingClientTunnel::new(
            circid,
            chan,
            created_recv,
            circmsg_recv,
            unique_id,
            DynTimeProvider::new(rt.clone()),
            CircuitAccount::new_noop(),
            padding_ctrl,
            padding_stream,
            Arc::new(DummyTimeoutEstimator),
        );

        // Run the circuit reactor in the background.
        rt.spawn(async {
            let _ignore = reactor.run().await;
        })
        .unwrap();

        // Future to pretend to be a relay on the other end of the circuit.
        let simulate_relay_fut = async move {
            let mut rng = testing_rng();
            // Wait for the client's create cell and check its circuit ID.
            let create_cell = rx.next().await.unwrap();
            assert_eq!(create_cell.circid(), Some(circid));
            let reply = match handshake_type {
                HandshakeType::Fast => {
                    let cf = match create_cell.msg() {
                        AnyChanMsg::CreateFast(cf) => cf,
                        other => panic!("{:?}", other),
                    };
                    let (_, rep) = CreateFastServer::server(
                        &mut rng,
                        &mut |_: &()| Some(()),
                        &[()],
                        cf.handshake(),
                    )
                    .unwrap();
                    CreateResponse::CreatedFast(CreatedFast::new(rep))
                }
                HandshakeType::Ntor => {
                    let c2 = match create_cell.msg() {
                        AnyChanMsg::Create2(c2) => c2,
                        other => panic!("{:?}", other),
                    };
                    let (_, rep) = NtorServer::server(
                        &mut rng,
                        &mut |_: &()| Some(()),
                        &[example_ntor_key()],
                        c2.body(),
                    )
                    .unwrap();
                    CreateResponse::Created2(Created2::new(rep))
                }
                HandshakeType::NtorV3 => {
                    let c2 = match create_cell.msg() {
                        AnyChanMsg::Create2(c2) => c2,
                        other => panic!("{:?}", other),
                    };
                    // Neither closure captures anything, which lets the two
                    // branches of the `if` produce a value of a single type.
                    let mut reply_fn = if with_cc {
                        |client_exts: &[CircRequestExt]| {
                            // The client must have asked for congestion control.
                            let _ = client_exts
                                .iter()
                                .find(|e| matches!(e, CircRequestExt::CcRequest(_)))
                                .expect("Client failed to request CC");
                            Some(vec![CircResponseExt::CcResponse(
                                extend_ext::CcResponse::new(31),
                            )])
                        }
                    } else {
                        |_: &_| Some(vec![])
                    };
                    let (_, rep) = NtorV3Server::server(
                        &mut rng,
                        &mut reply_fn,
                        &[example_ntor_v3_key()],
                        c2.body(),
                    )
                    .unwrap();
                    CreateResponse::Created2(Created2::new(rep))
                }
            };
            // Hand the reply to the circuit through the "created" channel.
            created_send.send(reply).unwrap();
        };
        // Future to pretend to be a client.
        let client_fut = async move {
            let target = example_target();
            let params = CircParameters::default();
            let ret = match handshake_type {
                HandshakeType::Fast => {
                    trace!("doing fast create");
                    pending.create_firsthop_fast(params).await
                }
                HandshakeType::Ntor => {
                    trace!("doing ntor create");
                    pending.create_firsthop_ntor(&target, params).await
                }
                HandshakeType::NtorV3 => {
                    let params = if with_cc {
                        CircParameters::new(
                            true,
                            build_cc_vegas_params(),
                            FlowCtrlParameters::defaults_for_tests(),
                        )
                    } else {
                        params
                    };
                    trace!("doing ntor_v3 create");
                    pending.create_firsthop_ntor_v3(&target, params).await
                }
            };
            trace!("create done: result {:?}", ret);
            ret
        };

        // Drive both sides to completion together.
        let (circ, _) = futures::join!(client_fut, simulate_relay_fut);

        let _circ = circ.unwrap();

        // The new circuit must have exactly one hop.
        assert_eq!(_circ.n_hops().unwrap(), 1);
    }
1296
    /// Run `test_create` with `HandshakeType::Fast` on all runtimes.
    #[traced_test]
    #[test]
    fn test_create_fast() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::Fast, false).await;
        });
    }
    /// Run `test_create` with `HandshakeType::Ntor` on all runtimes.
    #[traced_test]
    #[test]
    fn test_create_ntor() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::Ntor, false).await;
        });
    }
    /// Run `test_create` with `HandshakeType::NtorV3` (no congestion control)
    /// on all runtimes.
    #[traced_test]
    #[test]
    fn test_create_ntor_v3() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::NtorV3, false).await;
        });
    }
    /// Run `test_create` with `HandshakeType::NtorV3` and congestion control
    /// on all runtimes; only built with the "flowctl-cc" feature.
    #[traced_test]
    #[test]
    #[cfg(feature = "flowctl-cc")]
    fn test_create_ntor_v3_with_cc() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::NtorV3, true).await;
        });
    }
1326
    /// A crypto layer for tests that leaves cell bodies untouched and only
    /// produces counter-based tags.
    pub(crate) struct DummyCrypto {
        /// Tag buffer; its first four bytes are overwritten with the counter
        /// each time a tag is produced.
        counter_tag: [u8; 20],
        /// Number of tags produced so far.
        counter: u32,
        /// Whether inbound decryption should report a tag.
        lasthop: bool,
    }
1334 impl DummyCrypto {
1335 fn next_tag(&mut self) -> SendmeTag {
1336 #![allow(clippy::identity_op)]
1337 self.counter_tag[0] = ((self.counter >> 0) & 255) as u8;
1338 self.counter_tag[1] = ((self.counter >> 8) & 255) as u8;
1339 self.counter_tag[2] = ((self.counter >> 16) & 255) as u8;
1340 self.counter_tag[3] = ((self.counter >> 24) & 255) as u8;
1341 self.counter += 1;
1342 self.counter_tag.into()
1343 }
1344 }
1345
    impl crate::crypto::cell::OutboundClientLayer for DummyCrypto {
        /// Leave the cell untouched and return the next counter-based tag.
        fn originate_for(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
            self.next_tag()
        }
        /// Do nothing: this layer does not encrypt.
        fn encrypt_outbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {}
    }
1352 impl crate::crypto::cell::InboundClientLayer for DummyCrypto {
1353 fn decrypt_inbound(
1354 &mut self,
1355 _cmd: ChanCmd,
1356 _cell: &mut RelayCellBody,
1357 ) -> Option<SendmeTag> {
1358 if self.lasthop {
1359 Some(self.next_tag())
1360 } else {
1361 None
1362 }
1363 }
1364 }
1365 impl DummyCrypto {
1366 pub(crate) fn new(lasthop: bool) -> Self {
1367 DummyCrypto {
1368 counter_tag: [0; 20],
1369 counter: 0,
1370 lasthop,
1371 }
1372 }
1373 }
1374
1375 async fn newtunnel_ext<R: Runtime>(
1378 rt: &R,
1379 unique_id: UniqId,
1380 chan: Arc<Channel>,
1381 hops: Vec<path::HopDetail>,
1382 next_msg_from: HopNum,
1383 params: CircParameters,
1384 ) -> (ClientTunnel, CircuitRxSender) {
1385 let circid = CircId::new(128).unwrap();
1386 let (_created_send, created_recv) = oneshot::channel();
1387 let (circmsg_send, circmsg_recv) = fake_mpsc(64);
1388 let (padding_ctrl, padding_stream) = new_padding(DynTimeProvider::new(rt.clone()));
1389
1390 let (pending, reactor) = PendingClientTunnel::new(
1391 circid,
1392 chan,
1393 created_recv,
1394 circmsg_recv,
1395 unique_id,
1396 DynTimeProvider::new(rt.clone()),
1397 CircuitAccount::new_noop(),
1398 padding_ctrl,
1399 padding_stream,
1400 Arc::new(DummyTimeoutEstimator),
1401 );
1402
1403 rt.spawn(async {
1404 let _ignore = reactor.run().await;
1405 })
1406 .unwrap();
1407 let PendingClientTunnel {
1408 circ,
1409 recvcreated: _,
1410 } = pending;
1411
1412 let relay_cell_format = RelayCellFormat::V0;
1414
1415 let last_hop_num = u8::try_from(hops.len() - 1).unwrap();
1416 for (idx, peer_id) in hops.into_iter().enumerate() {
1417 let (tx, rx) = oneshot::channel();
1418 let idx = idx as u8;
1419
1420 circ.command
1421 .unbounded_send(CtrlCmd::AddFakeHop {
1422 relay_cell_format,
1423 fwd_lasthop: idx == last_hop_num,
1424 rev_lasthop: idx == u8::from(next_msg_from),
1425 peer_id,
1426 params: params.clone(),
1427 done: tx,
1428 })
1429 .unwrap();
1430 rx.await.unwrap().unwrap();
1431 }
1432 (circ.into_tunnel().unwrap(), circmsg_send)
1433 }
1434
1435 async fn newtunnel<R: Runtime>(
1438 rt: &R,
1439 chan: Arc<Channel>,
1440 ) -> (Arc<ClientTunnel>, CircuitRxSender) {
1441 let hops = std::iter::repeat_with(|| {
1442 let peer_id = tor_linkspec::OwnedChanTarget::builder()
1443 .ed_identity([4; 32].into())
1444 .rsa_identity([5; 20].into())
1445 .build()
1446 .expect("Could not construct fake hop");
1447
1448 path::HopDetail::Relay(peer_id)
1449 })
1450 .take(3)
1451 .collect();
1452
1453 let unique_id = UniqId::new(23, 17);
1454 let (tunnel, circmsg_send) = newtunnel_ext(
1455 rt,
1456 unique_id,
1457 chan,
1458 hops,
1459 2.into(),
1460 CircParameters::default(),
1461 )
1462 .await;
1463
1464 (Arc::new(tunnel), circmsg_send)
1465 }
1466
1467 fn hop_details(n: u8, start_idx: u8) -> Vec<path::HopDetail> {
1470 (0..n)
1471 .map(|idx| {
1472 let peer_id = tor_linkspec::OwnedChanTarget::builder()
1473 .ed_identity([idx + start_idx; 32].into())
1474 .rsa_identity([idx + start_idx + 1; 20].into())
1475 .build()
1476 .expect("Could not construct fake hop");
1477
1478 path::HopDetail::Relay(peer_id)
1479 })
1480 .collect()
1481 }
1482
1483 #[allow(deprecated)]
1484 async fn test_extend<R: Runtime>(rt: &R, handshake_type: HandshakeType) {
1485 use crate::crypto::handshake::{ServerHandshake, ntor::NtorServer};
1486
1487 let (chan, mut rx, _sink) = working_fake_channel(rt);
1488 let (tunnel, mut sink) = newtunnel(rt, chan).await;
1489 let circ = Arc::new(tunnel.as_single_circ().unwrap());
1490 let circid = circ.peek_circid();
1491 let params = CircParameters::default();
1492
1493 let extend_fut = async move {
1494 let target = example_target();
1495 match handshake_type {
1496 HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1497 HandshakeType::Ntor => circ.extend_ntor(&target, params).await.unwrap(),
1498 HandshakeType::NtorV3 => circ.extend_ntor_v3(&target, params).await.unwrap(),
1499 };
1500 circ };
1502 let reply_fut = async move {
1503 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1506 assert_eq!(id, Some(circid));
1507 let rmsg = match chmsg {
1508 AnyChanMsg::RelayEarly(r) => {
1509 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1510 .unwrap()
1511 }
1512 other => panic!("{:?}", other),
1513 };
1514 let e2 = match rmsg.msg() {
1515 AnyRelayMsg::Extend2(e2) => e2,
1516 other => panic!("{:?}", other),
1517 };
1518 let mut rng = testing_rng();
1519 let reply = match handshake_type {
1520 HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1521 HandshakeType::Ntor => {
1522 let (_keygen, reply) = NtorServer::server(
1523 &mut rng,
1524 &mut |_: &()| Some(()),
1525 &[example_ntor_key()],
1526 e2.handshake(),
1527 )
1528 .unwrap();
1529 reply
1530 }
1531 HandshakeType::NtorV3 => {
1532 let (_keygen, reply) = NtorV3Server::server(
1533 &mut rng,
1534 &mut |_: &[CircRequestExt]| Some(vec![]),
1535 &[example_ntor_v3_key()],
1536 e2.handshake(),
1537 )
1538 .unwrap();
1539 reply
1540 }
1541 };
1542
1543 let extended2 = relaymsg::Extended2::new(reply).into();
1544 sink.send(rmsg_to_ccmsg(None, extended2, false))
1545 .await
1546 .unwrap();
1547 (sink, rx) };
1549
1550 let (circ, (_sink, _rx)) = futures::join!(extend_fut, reply_fut);
1551
1552 assert_eq!(circ.n_hops().unwrap(), 4);
1554
1555 {
1557 let path = circ.single_path().unwrap();
1558 let path = path
1559 .all_hops()
1560 .filter_map(|hop| match hop {
1561 path::HopDetail::Relay(r) => Some(r),
1562 #[cfg(feature = "hs-common")]
1563 path::HopDetail::Virtual => None,
1564 })
1565 .collect::<Vec<_>>();
1566
1567 assert_eq!(path.len(), 4);
1568 use tor_linkspec::HasRelayIds;
1569 assert_eq!(path[3].ed_identity(), example_target().ed_identity());
1570 assert_ne!(path[0].ed_identity(), example_target().ed_identity());
1571 }
1572 {
1573 let path = circ.single_path().unwrap();
1574 assert_eq!(path.n_hops(), 4);
1575 use tor_linkspec::HasRelayIds;
1576 assert_eq!(
1577 path.hops()[3].as_chan_target().unwrap().ed_identity(),
1578 example_target().ed_identity()
1579 );
1580 assert_ne!(
1581 path.hops()[0].as_chan_target().unwrap().ed_identity(),
1582 example_target().ed_identity()
1583 );
1584 }
1585 }
1586
1587 #[traced_test]
1588 #[test]
1589 fn test_extend_ntor() {
1590 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1591 test_extend(&rt, HandshakeType::Ntor).await;
1592 });
1593 }
1594
1595 #[traced_test]
1596 #[test]
1597 fn test_extend_ntor_v3() {
1598 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1599 test_extend(&rt, HandshakeType::NtorV3).await;
1600 });
1601 }
1602
1603 #[allow(deprecated)]
1604 async fn bad_extend_test_impl<R: Runtime>(
1605 rt: &R,
1606 reply_hop: HopNum,
1607 bad_reply: AnyChanMsg,
1608 ) -> Error {
1609 let (chan, mut rx, _sink) = working_fake_channel(rt);
1610 let hops = std::iter::repeat_with(|| {
1611 let peer_id = tor_linkspec::OwnedChanTarget::builder()
1612 .ed_identity([4; 32].into())
1613 .rsa_identity([5; 20].into())
1614 .build()
1615 .expect("Could not construct fake hop");
1616
1617 path::HopDetail::Relay(peer_id)
1618 })
1619 .take(3)
1620 .collect();
1621
1622 let unique_id = UniqId::new(23, 17);
1623 let (tunnel, mut sink) = newtunnel_ext(
1624 rt,
1625 unique_id,
1626 chan,
1627 hops,
1628 reply_hop,
1629 CircParameters::default(),
1630 )
1631 .await;
1632 let params = CircParameters::default();
1633
1634 let target = example_target();
1635 let reply_task_handle = rt
1636 .spawn_with_handle(async move {
1637 let (_circid, chanmsg) = rx.next().await.unwrap().into_circid_and_msg();
1639 let AnyChanMsg::RelayEarly(relay_early) = chanmsg else {
1640 panic!("unexpected message {chanmsg:?}");
1641 };
1642 let relaymsg = UnparsedRelayMsg::from_singleton_body(
1643 RelayCellFormat::V0,
1644 relay_early.into_relay_body(),
1645 )
1646 .unwrap();
1647 assert_eq!(relaymsg.cmd(), RelayCmd::EXTEND2);
1648
1649 sink.send(bad_reply).await.unwrap();
1651 sink
1652 })
1653 .unwrap();
1654 let outcome = tunnel
1655 .as_single_circ()
1656 .unwrap()
1657 .extend_ntor(&target, params)
1658 .await;
1659 let _sink = reply_task_handle.await;
1660
1661 assert_eq!(tunnel.n_hops().unwrap(), 3);
1662 assert!(outcome.is_err());
1663 outcome.unwrap_err()
1664 }
1665
1666 #[traced_test]
1667 #[test]
1668 fn bad_extend_wronghop() {
1669 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1670 let extended2 = relaymsg::Extended2::new(vec![]).into();
1671 let cc = rmsg_to_ccmsg(None, extended2, false);
1672
1673 let error = bad_extend_test_impl(&rt, 1.into(), cc).await;
1674 match error {
1679 Error::CircuitClosed => {}
1680 x => panic!("got other error: {}", x),
1681 }
1682 });
1683 }
1684
1685 #[traced_test]
1686 #[test]
1687 fn bad_extend_wrongtype() {
1688 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1689 let extended = relaymsg::Extended::new(vec![7; 200]).into();
1690 let cc = rmsg_to_ccmsg(None, extended, false);
1691
1692 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1693 match error {
1694 Error::BytesErr {
1695 err: tor_bytes::Error::InvalidMessage(_),
1696 object: "extended2 message",
1697 } => {}
1698 other => panic!("{:?}", other),
1699 }
1700 });
1701 }
1702
1703 #[traced_test]
1704 #[test]
1705 fn bad_extend_destroy() {
1706 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1707 let cc = AnyChanMsg::Destroy(chanmsg::Destroy::new(4.into()));
1708 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1709 match error {
1710 Error::CircuitClosed => {}
1711 other => panic!("{:?}", other),
1712 }
1713 });
1714 }
1715
1716 #[traced_test]
1717 #[test]
1718 fn bad_extend_crypto() {
1719 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1720 let extended2 = relaymsg::Extended2::new(vec![99; 256]).into();
1721 let cc = rmsg_to_ccmsg(None, extended2, false);
1722 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1723 assert_matches!(error, Error::BadCircHandshakeAuth);
1724 });
1725 }
1726
1727 #[traced_test]
1728 #[test]
1729 fn begindir() {
1730 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1731 let (chan, mut rx, _sink) = working_fake_channel(&rt);
1732 let (tunnel, mut sink) = newtunnel(&rt, chan).await;
1733 let circ = tunnel.as_single_circ().unwrap();
1734 let circid = circ.peek_circid();
1735
1736 let begin_and_send_fut = async move {
1737 let mut stream = tunnel.begin_dir_stream().await.unwrap();
1740 stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
1741 stream.flush().await.unwrap();
1742 let mut buf = [0_u8; 1024];
1743 let n = stream.read(&mut buf).await.unwrap();
1744 assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n");
1745 let n = stream.read(&mut buf).await.unwrap();
1746 assert_eq!(n, 0);
1747 stream
1748 };
1749 let reply_fut = async move {
1750 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1753 assert_eq!(id, Some(circid));
1754 let rmsg = match chmsg {
1755 AnyChanMsg::Relay(r) => {
1756 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1757 .unwrap()
1758 }
1759 other => panic!("{:?}", other),
1760 };
1761 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1762 assert_matches!(rmsg, AnyRelayMsg::BeginDir(_));
1763
1764 let connected = relaymsg::Connected::new_empty().into();
1766 sink.send(rmsg_to_ccmsg(streamid, connected, false))
1767 .await
1768 .unwrap();
1769
1770 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1772 assert_eq!(id, Some(circid));
1773 let rmsg = match chmsg {
1774 AnyChanMsg::Relay(r) => {
1775 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1776 .unwrap()
1777 }
1778 other => panic!("{:?}", other),
1779 };
1780 let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
1781 assert_eq!(streamid_2, streamid);
1782 if let AnyRelayMsg::Data(d) = rmsg {
1783 assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
1784 } else {
1785 panic!();
1786 }
1787
1788 let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
1790 .unwrap()
1791 .into();
1792 sink.send(rmsg_to_ccmsg(streamid, data, false))
1793 .await
1794 .unwrap();
1795
1796 let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
1798 sink.send(rmsg_to_ccmsg(streamid, end, false))
1799 .await
1800 .unwrap();
1801
1802 (rx, sink) };
1804
1805 let (_stream, (_rx, _sink)) = futures::join!(begin_and_send_fut, reply_fut);
1806 });
1807 }
1808
1809 fn close_stream_helper(by_drop: bool) {
1811 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1812 let (chan, mut rx, _sink) = working_fake_channel(&rt);
1813 let (tunnel, mut sink) = newtunnel(&rt, chan).await;
1814
1815 let stream_fut = async move {
1816 let stream = tunnel
1817 .begin_stream("www.example.com", 80, None)
1818 .await
1819 .unwrap();
1820
1821 let (r, mut w) = stream.split();
1822 if by_drop {
1823 drop(r);
1825 drop(w);
1826 (None, tunnel) } else {
1828 w.close().await.unwrap();
1830 (Some(r), tunnel)
1831 }
1832 };
1833 let handler_fut = async {
1834 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
1836 let rmsg = match msg {
1837 AnyChanMsg::Relay(r) => {
1838 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1839 .unwrap()
1840 }
1841 other => panic!("{:?}", other),
1842 };
1843 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1844 assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);
1845
1846 let connected =
1848 relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
1849 sink.send(rmsg_to_ccmsg(streamid, connected, false))
1850 .await
1851 .unwrap();
1852
1853 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
1855 let rmsg = match msg {
1856 AnyChanMsg::Relay(r) => {
1857 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1858 .unwrap()
1859 }
1860 other => panic!("{:?}", other),
1861 };
1862 let (_, rmsg) = rmsg.into_streamid_and_msg();
1863 assert_eq!(rmsg.cmd(), RelayCmd::END);
1864
1865 (rx, sink) };
1867
1868 let ((_opt_reader, _circ), (_rx, _sink)) = futures::join!(stream_fut, handler_fut);
1869 });
1870 }
1871
1872 #[traced_test]
1873 #[test]
1874 fn drop_stream() {
1875 close_stream_helper(true);
1876 }
1877
1878 #[traced_test]
1879 #[test]
1880 fn close_stream() {
1881 close_stream_helper(false);
1882 }
1883
1884 #[traced_test]
1885 #[test]
1886 fn expire_halfstreams() {
1887 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
1888 let (chan, mut rx, _sink) = working_fake_channel(&rt);
1889 let (tunnel, mut sink) = newtunnel(&rt, chan).await;
1890
1891 let client_fut = async move {
1892 let stream = tunnel
1893 .begin_stream("www.example.com", 80, None)
1894 .await
1895 .unwrap();
1896
1897 let (r, mut w) = stream.split();
1898 w.close().await.unwrap();
1900 (Some(r), tunnel)
1901 };
1902 let exit_fut = async {
1903 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
1905 let rmsg = match msg {
1906 AnyChanMsg::Relay(r) => {
1907 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1908 .unwrap()
1909 }
1910 other => panic!("{:?}", other),
1911 };
1912 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1913 assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);
1914
1915 let connected =
1917 relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
1918 sink.send(rmsg_to_ccmsg(streamid, connected, false))
1919 .await
1920 .unwrap();
1921
1922 (rx, streamid, sink) };
1924
1925 let ((_opt_reader, tunnel), (_rx, streamid, mut sink)) =
1926 futures::join!(client_fut, exit_fut);
1927
1928 rt.progress_until_stalled().await;
1931
1932 assert!(!tunnel.is_closed());
1934
1935 let data = relaymsg::Data::new(b"hello").unwrap();
1938 sink.send(rmsg_to_ccmsg(streamid, AnyRelayMsg::Data(data), false))
1939 .await
1940 .unwrap();
1941 rt.progress_until_stalled().await;
1942
1943 assert!(!tunnel.is_closed());
1945
1946 let stream_timeout = DummyTimeoutEstimator.circuit_build_timeout(3);
1951 rt.advance_by(2 * stream_timeout).await;
1952
1953 let data = relaymsg::Data::new(b"hello").unwrap();
1956 sink.send(rmsg_to_ccmsg(streamid, AnyRelayMsg::Data(data), false))
1957 .await
1958 .unwrap();
1959 rt.progress_until_stalled().await;
1960
1961 assert!(tunnel.is_closed());
1963 });
1964 }
1965
1966 async fn setup_incoming_sendme_case<R: Runtime>(
1968 rt: &R,
1969 n_to_send: usize,
1970 ) -> (
1971 Arc<ClientTunnel>,
1972 DataStream,
1973 CircuitRxSender,
1974 Option<StreamId>,
1975 usize,
1976 Receiver<AnyChanCell>,
1977 Sender<CodecResult>,
1978 ) {
1979 let (chan, mut rx, sink2) = working_fake_channel(rt);
1980 let (tunnel, mut sink) = newtunnel(rt, chan).await;
1981 let circid = tunnel.as_single_circ().unwrap().peek_circid();
1982
1983 let begin_and_send_fut = {
1984 let tunnel = tunnel.clone();
1985 async move {
1986 let mut stream = tunnel
1988 .begin_stream("www.example.com", 443, None)
1989 .await
1990 .unwrap();
1991 let junk = [0_u8; 1024];
1992 let mut remaining = n_to_send;
1993 while remaining > 0 {
1994 let n = std::cmp::min(remaining, junk.len());
1995 stream.write_all(&junk[..n]).await.unwrap();
1996 remaining -= n;
1997 }
1998 stream.flush().await.unwrap();
1999 stream
2000 }
2001 };
2002
2003 let receive_fut = async move {
2004 let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2006 let rmsg = match chmsg {
2007 AnyChanMsg::Relay(r) => {
2008 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2009 .unwrap()
2010 }
2011 other => panic!("{:?}", other),
2012 };
2013 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2014 assert_matches!(rmsg, AnyRelayMsg::Begin(_));
2015 let connected = relaymsg::Connected::new_empty().into();
2017 sink.send(rmsg_to_ccmsg(streamid, connected, false))
2018 .await
2019 .unwrap();
2020 let mut bytes_received = 0_usize;
2022 let mut cells_received = 0_usize;
2023 while bytes_received < n_to_send {
2024 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2026 assert_eq!(id, Some(circid));
2027
2028 let rmsg = match chmsg {
2029 AnyChanMsg::Relay(r) => {
2030 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2031 .unwrap()
2032 }
2033 other => panic!("{:?}", other),
2034 };
2035 let (streamid2, rmsg) = rmsg.into_streamid_and_msg();
2036 assert_eq!(streamid2, streamid);
2037 if let AnyRelayMsg::Data(dat) = rmsg {
2038 cells_received += 1;
2039 bytes_received += dat.as_ref().len();
2040 } else {
2041 panic!();
2042 }
2043 }
2044
2045 (sink, streamid, cells_received, rx)
2046 };
2047
2048 let (stream, (sink, streamid, cells_received, rx)) =
2049 futures::join!(begin_and_send_fut, receive_fut);
2050
2051 (tunnel, stream, sink, streamid, cells_received, rx, sink2)
2052 }
2053
2054 #[traced_test]
2055 #[test]
2056 fn accept_valid_sendme() {
2057 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2058 let (tunnel, _stream, mut sink, streamid, cells_received, _rx, _sink2) =
2059 setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2060 let circ = tunnel.as_single_circ().unwrap();
2061
2062 assert_eq!(cells_received, 301);
2063
2064 {
2066 let (tx, rx) = oneshot::channel();
2067 circ.command
2068 .unbounded_send(CtrlCmd::QuerySendWindow {
2069 hop: 2.into(),
2070 leg: tunnel.unique_id(),
2071 done: tx,
2072 })
2073 .unwrap();
2074 let (window, tags) = rx.await.unwrap().unwrap();
2075 assert_eq!(window, 1000 - 301);
2076 assert_eq!(tags.len(), 3);
2077 assert_eq!(
2079 tags[0],
2080 SendmeTag::from(hex!("6400000000000000000000000000000000000000"))
2081 );
2082 assert_eq!(
2084 tags[1],
2085 SendmeTag::from(hex!("c800000000000000000000000000000000000000"))
2086 );
2087 assert_eq!(
2089 tags[2],
2090 SendmeTag::from(hex!("2c01000000000000000000000000000000000000"))
2091 );
2092 }
2093
2094 let reply_with_sendme_fut = async move {
2095 let c_sendme =
2097 relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000"))
2098 .into();
2099 sink.send(rmsg_to_ccmsg(None, c_sendme, false))
2100 .await
2101 .unwrap();
2102
2103 let s_sendme = relaymsg::Sendme::new_empty().into();
2105 sink.send(rmsg_to_ccmsg(streamid, s_sendme, false))
2106 .await
2107 .unwrap();
2108
2109 sink
2110 };
2111
2112 let _sink = reply_with_sendme_fut.await;
2113
2114 rt.advance_until_stalled().await;
2115
2116 {
2119 let (tx, rx) = oneshot::channel();
2120 circ.command
2121 .unbounded_send(CtrlCmd::QuerySendWindow {
2122 hop: 2.into(),
2123 leg: tunnel.unique_id(),
2124 done: tx,
2125 })
2126 .unwrap();
2127 let (window, _tags) = rx.await.unwrap().unwrap();
2128 assert_eq!(window, 1000 - 201);
2129 }
2130 });
2131 }
2132
2133 #[traced_test]
2134 #[test]
2135 fn invalid_circ_sendme() {
2136 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2137 let (tunnel, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) =
2141 setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2142
2143 let reply_with_sendme_fut = async move {
2144 let c_sendme =
2146 relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF"))
2147 .into();
2148 sink.send(rmsg_to_ccmsg(None, c_sendme, false))
2149 .await
2150 .unwrap();
2151 sink
2152 };
2153
2154 let _sink = reply_with_sendme_fut.await;
2155
2156 rt.advance_until_stalled().await;
2158 assert!(tunnel.is_closed());
2159 });
2160 }
2161
2162 #[traced_test]
2163 #[test]
2164 fn test_busy_stream_fairness() {
2165 const N_STREAMS: usize = 3;
2167 const N_CELLS: usize = 20;
2169 const N_BYTES: usize = relaymsg::Data::MAXLEN_V0 * N_CELLS;
2172 const MIN_EXPECTED_BYTES_PER_STREAM: usize =
2179 N_BYTES / N_STREAMS - relaymsg::Data::MAXLEN_V0;
2180
2181 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2182 let (chan, mut rx, _sink) = working_fake_channel(&rt);
2183 let (tunnel, mut sink) = newtunnel(&rt, chan).await;
2184
2185 rt.spawn({
2191 let tunnel = tunnel.clone();
2194 async move {
2195 let mut clients = VecDeque::new();
2196 struct Client {
2197 stream: DataStream,
2198 to_write: &'static [u8],
2199 }
2200 for _ in 0..N_STREAMS {
2201 clients.push_back(Client {
2202 stream: tunnel
2203 .begin_stream("www.example.com", 80, None)
2204 .await
2205 .unwrap(),
2206 to_write: &[0_u8; N_BYTES][..],
2207 });
2208 }
2209 while let Some(mut client) = clients.pop_front() {
2210 if client.to_write.is_empty() {
2211 continue;
2213 }
2214 let written = client.stream.write(client.to_write).await.unwrap();
2215 client.to_write = &client.to_write[written..];
2216 clients.push_back(client);
2217 }
2218 }
2219 })
2220 .unwrap();
2221
2222 let channel_handler_fut = async {
2223 let mut stream_bytes_received = HashMap::<StreamId, usize>::new();
2224 let mut total_bytes_received = 0;
2225
2226 loop {
2227 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2228 let rmsg = match msg {
2229 AnyChanMsg::Relay(r) => AnyRelayMsgOuter::decode_singleton(
2230 RelayCellFormat::V0,
2231 r.into_relay_body(),
2232 )
2233 .unwrap(),
2234 other => panic!("Unexpected chanmsg: {other:?}"),
2235 };
2236 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2237 match rmsg.cmd() {
2238 RelayCmd::BEGIN => {
2239 let prev = stream_bytes_received.insert(streamid.unwrap(), 0);
2241 assert_eq!(prev, None);
2242 let connected = relaymsg::Connected::new_with_addr(
2244 "10.0.0.1".parse().unwrap(),
2245 1234,
2246 )
2247 .into();
2248 sink.send(rmsg_to_ccmsg(streamid, connected, false))
2249 .await
2250 .unwrap();
2251 }
2252 RelayCmd::DATA => {
2253 let data_msg = relaymsg::Data::try_from(rmsg).unwrap();
2254 let nbytes = data_msg.as_ref().len();
2255 total_bytes_received += nbytes;
2256 let streamid = streamid.unwrap();
2257 let stream_bytes = stream_bytes_received.get_mut(&streamid).unwrap();
2258 *stream_bytes += nbytes;
2259 if total_bytes_received >= N_BYTES {
2260 break;
2261 }
2262 }
2263 RelayCmd::END => {
2264 continue;
2269 }
2270 other => {
2271 panic!("Unexpected command {other:?}");
2272 }
2273 }
2274 }
2275
2276 (total_bytes_received, stream_bytes_received, rx, sink)
2279 };
2280
2281 let (total_bytes_received, stream_bytes_received, _rx, _sink) =
2282 channel_handler_fut.await;
2283 assert_eq!(stream_bytes_received.len(), N_STREAMS);
2284 for (sid, stream_bytes) in stream_bytes_received {
2285 assert!(
2286 stream_bytes >= MIN_EXPECTED_BYTES_PER_STREAM,
2287 "Only {stream_bytes} of {total_bytes_received} bytes received from {N_STREAMS} came from {sid:?}; expected at least {MIN_EXPECTED_BYTES_PER_STREAM}"
2288 );
2289 }
2290 });
2291 }
2292
/// `extend_by_ed25519_id` is set by default and can be cleared.
#[test]
fn basic_params() {
    use super::CircParameters;

    let mut params = CircParameters::default();
    assert!(params.extend_by_ed25519_id);

    params.extend_by_ed25519_id = false;
    assert!(!params.extend_by_ed25519_id);
}
2302
/// A second `allow_stream_requests` call on the same tunnel must fail.
#[traced_test]
#[test]
#[cfg(feature = "hs-service")]
fn allow_stream_requests_twice() {
    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        let (chan, _rx, _sink) = working_fake_channel(&rt);
        let (tunnel, _send) = newtunnel(&rt, chan).await;

        // The first registration succeeds; keep its result alive.
        let first_hop = tunnel.resolve_last_hop().await;
        let _incoming = tunnel
            .allow_stream_requests(
                &[tor_cell::relaycell::RelayCmd::BEGIN],
                first_hop,
                AllowAllStreamsFilter,
            )
            .await
            .unwrap();

        // The second one must be refused.
        let second_hop = tunnel.resolve_last_hop().await;
        let second_attempt = tunnel
            .allow_stream_requests(
                &[tor_cell::relaycell::RelayCmd::BEGIN],
                second_hop,
                AllowAllStreamsFilter,
            )
            .await;

        assert!(second_attempt.is_err());
    });
}
2332
/// Accept one incoming stream request and read `TEST_DATA` from the accepted stream.
///
/// The simulated client sends a BEGIN on stream 12, waits until the service signals
/// that it has accepted the stream, and then sends a DATA message carrying `TEST_DATA`.
#[traced_test]
#[test]
#[cfg(feature = "hs-service")]
fn allow_stream_requests() {
    use tor_cell::relaycell::msg::BeginFlags;

    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        const TEST_DATA: &[u8] = b"ping";

        let (chan, _rx, _sink) = working_fake_channel(&rt);
        let (tunnel, mut send) = newtunnel(&rt, chan).await;

        let rfmt = RelayCellFormat::V0;

        // The service uses this to tell the client that the stream has been accepted.
        let (accepted_tx, accepted_rx) = oneshot::channel();
        let last_hop = tunnel.resolve_last_hop().await;
        let mut incoming = tunnel
            .allow_stream_requests(
                &[tor_cell::relaycell::RelayCmd::BEGIN],
                last_hop,
                AllowAllStreamsFilter,
            )
            .await
            .unwrap();

        let simulate_service = async move {
            let request = incoming.next().await.unwrap();
            let mut data_stream = request
                .accept_data(relaymsg::Connected::new_empty())
                .await
                .unwrap();
            accepted_tx.send(()).unwrap();

            let mut buf = [0_u8; TEST_DATA.len()];
            data_stream.read_exact(&mut buf).await.unwrap();
            assert_eq!(&buf, TEST_DATA);

            tunnel
        };

        let simulate_client = async move {
            // Encode a relay message for stream 12 as a RELAY cell.
            let relay_cell = |msg: AnyRelayMsg| {
                let body: BoxedCellBody = AnyRelayMsgOuter::new(StreamId::new(12), msg)
                    .encode(rfmt, &mut testing_rng())
                    .unwrap();
                chanmsg::Relay::from(body)
            };

            let begin = relaymsg::Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
            let begin_msg = relay_cell(AnyRelayMsg::Begin(begin));
            send.send(AnyChanMsg::Relay(begin_msg)).await.unwrap();

            // Do not send any data before the service has accepted the stream.
            accepted_rx.await.unwrap();

            let data = relaymsg::Data::new(TEST_DATA).unwrap();
            let data_msg = relay_cell(AnyRelayMsg::Data(data));
            send.send(AnyChanMsg::Relay(data_msg)).await.unwrap();
            send
        };

        let (_circ, _send) = futures::join!(simulate_service, simulate_client);
    });
}
2407
/// Reject a first incoming stream request, then accept a second one that uses the same
/// stream ID and read `TEST_DATA` from it.
#[traced_test]
#[test]
#[cfg(feature = "hs-service")]
fn accept_stream_after_reject() {
    use tor_cell::relaycell::msg::AnyRelayMsg;
    use tor_cell::relaycell::msg::BeginFlags;
    use tor_cell::relaycell::msg::EndReason;

    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        const TEST_DATA: &[u8] = b"ping";
        const STREAM_COUNT: usize = 2;
        let rfmt = RelayCellFormat::V0;

        let (chan, _rx, _sink) = working_fake_channel(&rt);
        let (tunnel, mut send) = newtunnel(&rt, chan).await;

        // The service reports on this channel every time it has handled a request.
        let (mut handled_tx, mut handled_rx) = mpsc::channel(STREAM_COUNT);

        let last_hop = tunnel.resolve_last_hop().await;
        let mut incoming = tunnel
            .allow_stream_requests(
                &[tor_cell::relaycell::RelayCmd::BEGIN],
                last_hop,
                AllowAllStreamsFilter,
            )
            .await
            .unwrap();

        let simulate_service = async move {
            for i in 0..STREAM_COUNT {
                let request = incoming.next().await.unwrap();

                if i == 0 {
                    // Turn the first request down.
                    request
                        .reject(relaymsg::End::new_with_reason(EndReason::INTERNAL))
                        .await
                        .unwrap();
                    handled_tx.send(()).await.unwrap();
                } else {
                    // Accept the later one and read the payload from it.
                    let mut data_stream = request
                        .accept_data(relaymsg::Connected::new_empty())
                        .await
                        .unwrap();
                    handled_tx.send(()).await.unwrap();

                    let mut buf = [0_u8; TEST_DATA.len()];
                    data_stream.read_exact(&mut buf).await.unwrap();
                    assert_eq!(&buf, TEST_DATA);
                }
            }

            tunnel
        };

        let simulate_client = async move {
            // Encode a relay message for stream 12 as a RELAY cell.
            let relay_cell = |msg: AnyRelayMsg| {
                let body: BoxedCellBody = AnyRelayMsgOuter::new(StreamId::new(12), msg)
                    .encode(rfmt, &mut testing_rng())
                    .unwrap();
                chanmsg::Relay::from(body)
            };

            let begin = relaymsg::Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
            let begin_msg = relay_cell(AnyRelayMsg::Begin(begin));

            // Send the same BEGIN once per expected request, waiting in between for the
            // service to deal with the previous one.
            for _ in 0..STREAM_COUNT {
                send.send(AnyChanMsg::Relay(begin_msg.clone()))
                    .await
                    .unwrap();

                handled_rx.next().await.unwrap();
            }

            let data = relaymsg::Data::new(TEST_DATA).unwrap();
            let data_msg = relay_cell(AnyRelayMsg::Data(data));
            send.send(AnyChanMsg::Relay(data_msg)).await.unwrap();
            send
        };

        let (_circ, _send) = futures::join!(simulate_service, simulate_client);
    });
}
2502
/// Allow stream requests from hop `EXPECTED_HOP` only, then deliver a BEGIN through the
/// sender returned by `newtunnel`; the stream of incoming requests must end without
/// yielding anything.
#[traced_test]
#[test]
#[cfg(feature = "hs-service")]
fn incoming_stream_bad_hop() {
    use tor_cell::relaycell::msg::BeginFlags;

    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        const EXPECTED_HOP: u8 = 1;
        let rfmt = RelayCellFormat::V0;

        let (chan, _rx, _sink) = working_fake_channel(&rt);
        let (tunnel, mut send) = newtunnel(&rt, chan).await;

        let circ_id = tunnel.as_single_circ().unwrap().unique_id();
        let mut incoming = tunnel
            .allow_stream_requests(
                &[tor_cell::relaycell::RelayCmd::BEGIN],
                (circ_id, EXPECTED_HOP.into()).into(),
                AllowAllStreamsFilter,
            )
            .await
            .unwrap();

        let simulate_service = async move {
            // No request may ever be delivered.
            let next_request = incoming.next().await;
            assert!(next_request.is_none());
            tunnel
        };

        let simulate_client = async move {
            let begin = relaymsg::Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
            let outer = AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin));
            let body: BoxedCellBody = outer.encode(rfmt, &mut testing_rng()).unwrap();
            let begin_msg = chanmsg::Relay::from(body);

            send.send(AnyChanMsg::Relay(begin_msg)).await.unwrap();

            send
        };

        let (_circ, _send) = futures::join!(simulate_service, simulate_client);
    });
}
2556
/// Linking circuits must fail validation for both tunnels built here: the one from
/// `setup_bad_conflux_tunnel`, and one built with shared hops but default parameters.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn multipath_circ_validation() {
    use std::error::Error as _;

    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let params = CircParameters::default();
        let invalid_tunnels = [
            setup_bad_conflux_tunnel(&rt).await,
            setup_conflux_tunnel(&rt, true, params).await,
        ];

        // `_tunnel` and `_circs` are bound only to keep them alive during the check.
        for TestTunnelCtx {
            tunnel: _tunnel,
            circs: _circs,
            conflux_link_rx,
        } in invalid_tunnels
        {
            let conflux_hs_err = conflux_link_rx.await.unwrap().unwrap_err();
            let source_text = conflux_hs_err.source().unwrap().to_string();

            assert!(source_text.contains("one more conflux circuits are invalid"));
        }
    });
}
2590
/// The channel and circuit handles for one circuit of a test tunnel.
///
/// `setup_conflux_tunnel` builds one of these for each circuit it creates.
#[derive(Debug)]
#[allow(unused)]
#[cfg(feature = "conflux")]
struct TestCircuitCtx {
    /// Receiving end of the fake channel (the second value returned by
    /// `working_fake_channel`); `await_link_payload` reads cells from a receiver of
    /// this type.
    chan_rx: Receiver<AnyChanCell>,
    /// Sending end of the fake channel (the third value returned by
    /// `working_fake_channel`).
    chan_tx: Sender<std::result::Result<AnyChanCell, Error>>,
    /// The sender returned by `newtunnel_ext`; tests send messages built with
    /// `rmsg_to_ccmsg` through it.
    circ_tx: CircuitRxSender,
    /// The `unique_id()` of the tunnel that this circuit was created as.
    unique_id: UniqId,
}
2603
/// Everything a conflux test needs in order to drive a tunnel built by
/// `setup_conflux_tunnel`.
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct TestTunnelCtx {
    /// The tunnel under test.
    tunnel: Arc<ClientTunnel>,
    /// Per-circuit handles, in the order the circuits were created.
    circs: Vec<TestCircuitCtx>,
    /// Receives the answer to the `CtrlMsg::LinkCircuits` request sent during setup.
    conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
}
2611
/// Wait for the next cell on `rx` and return the CONFLUX_LINK message that it carries.
///
/// Panics if the cell is not a RELAY cell, if the relay message is not a CONFLUX_LINK,
/// or if the message has a stream ID.
#[cfg(feature = "conflux")]
async fn await_link_payload(rx: &mut Receiver<AnyChanCell>) -> ConfluxLink {
    let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
    let AnyChanMsg::Relay(relay) = chmsg else {
        panic!("{:?}", chmsg)
    };
    let decoded =
        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, relay.into_relay_body()).unwrap();
    let (streamid, rmsg) = decoded.into_streamid_and_msg();

    let link = match rmsg {
        AnyRelayMsg::ConfluxLink(link) => link,
        _ => panic!("unexpected relay message {rmsg:?}"),
    };

    // The LINK message must not be tied to a stream.
    assert!(streamid.is_none());

    link
}
2635
/// Build two three-hop test circuits and ask the first one to link the second.
///
/// With `same_hops` set, both circuits use identical hops; otherwise the second circuit
/// uses `hop_details(3, 10)`. Both circuits are built with `params`.
///
/// The second tunnel is shut down through `CtrlCmd::ShutdownAndReturnCircuit`, and the
/// circuit obtained that way is passed to the first tunnel in a `CtrlMsg::LinkCircuits`
/// message. The answer to that message arrives on the returned `conflux_link_rx`.
#[cfg(feature = "conflux")]
async fn setup_conflux_tunnel(
    rt: &MockRuntime,
    same_hops: bool,
    params: CircParameters,
) -> TestTunnelCtx {
    let first_hops = hop_details(3, 0);
    let second_hops = if same_hops {
        first_hops.clone()
    } else {
        hop_details(3, 10)
    };

    let (first_chan, first_chan_rx, first_chan_tx) = working_fake_channel(rt);
    let (mut first_tunnel, first_circ_tx) = newtunnel_ext(
        rt,
        UniqId::new(1, 3),
        first_chan,
        first_hops,
        2.into(),
        params.clone(),
    )
    .await;

    let (second_chan, second_chan_rx, second_chan_tx) = working_fake_channel(rt);
    let (second_tunnel, second_circ_tx) = newtunnel_ext(
        rt,
        UniqId::new(2, 4),
        second_chan,
        second_hops,
        2.into(),
        params,
    )
    .await;

    // Shut the second tunnel down and take the circuit it hands back.
    let (answer_tx, answer_rx) = oneshot::channel();
    let shutdown = CtrlCmd::ShutdownAndReturnCircuit { answer: answer_tx };
    second_tunnel
        .as_single_circ()
        .unwrap()
        .command
        .unbounded_send(shutdown)
        .unwrap();

    let circuit = answer_rx.await.unwrap().unwrap();
    rt.advance_until_stalled().await;
    assert!(second_tunnel.is_closed());

    // Ask the first tunnel to link that circuit.
    let (conflux_link_tx, conflux_link_rx) = oneshot::channel();
    let link_request = CtrlMsg::LinkCircuits {
        circuits: vec![circuit],
        answer: conflux_link_tx,
    };
    first_tunnel
        .as_single_circ()
        .unwrap()
        .control
        .unbounded_send(link_request)
        .unwrap();

    let first_ctx = TestCircuitCtx {
        chan_rx: first_chan_rx,
        chan_tx: first_chan_tx,
        circ_tx: first_circ_tx,
        unique_id: first_tunnel.unique_id(),
    };

    let second_ctx = TestCircuitCtx {
        chan_rx: second_chan_rx,
        chan_tx: second_chan_tx,
        circ_tx: second_circ_tx,
        unique_id: second_tunnel.unique_id(),
    };

    // Mark the first tunnel as multi-path by hand before wrapping it.
    first_tunnel.circ.is_multi_path = true;
    TestTunnelCtx {
        tunnel: Arc::new(first_tunnel),
        circs: vec![first_ctx, second_ctx],
        conflux_link_rx,
    }
}
2716
/// Build a conflux test tunnel whose two circuits share the same hops, using
/// `cc_params` for congestion control and the test defaults for flow control.
#[cfg(feature = "conflux")]
async fn setup_good_conflux_tunnel(
    rt: &MockRuntime,
    cc_params: CongestionControlParams,
) -> TestTunnelCtx {
    let circ_params =
        CircParameters::new(true, cc_params, FlowCtrlParameters::defaults_for_tests());
    // `true`: both circuits use identical hops.
    setup_conflux_tunnel(rt, true, circ_params).await
}
2732
/// Build a conflux test tunnel whose two circuits use different hops, using
/// `build_cc_vegas_params()` for congestion control and the test defaults for flow
/// control.
#[cfg(feature = "conflux")]
async fn setup_bad_conflux_tunnel(rt: &MockRuntime) -> TestTunnelCtx {
    let circ_params = CircParameters::new(
        true,
        build_cc_vegas_params(),
        FlowCtrlParameters::defaults_for_tests(),
    );
    // `false`: the second circuit gets its own set of hops.
    setup_conflux_tunnel(rt, false, circ_params).await
}
2743
// A CONFLUX_LINKED cell sent on a plain tunnel (one on which this test never
// started a conflux handshake) must leave the tunnel closed.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn reject_conflux_linked_before_hs() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let (chan, mut _rx, _sink) = working_fake_channel(&rt);
        let (tunnel, mut sink) = newtunnel(&rt, chan).await;

        // Build a LINKED cell around a freshly generated nonce.
        let payload = V1LinkPayload::new(
            V1Nonce::new(&mut testing_rng()),
            V1DesiredUx::NO_OPINION,
        );
        let unsolicited_linked =
            rmsg_to_ccmsg(None, relaymsg::ConfluxLinked::new(payload).into(), false);
        sink.send(unsolicited_linked).await.unwrap();

        // Let the mock runtime make all the progress it can, then check the outcome.
        rt.advance_until_stalled().await;
        assert!(tunnel.is_closed());
    });
}
2762
// If only the first leg receives a LINKED reply, the handshake result for the
// second leg is `ConfluxHandshakeError::Timeout` once the mock clock has been
// advanced by 60 seconds.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn conflux_hs_timeout() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let TestTunnelCtx {
            tunnel: _tunnel,
            circs: legs,
            conflux_link_rx: hs_outcome_rx,
        } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;

        let [mut answered_leg, _silent_leg]: [TestCircuitCtx; 2] = legs.try_into().unwrap();

        // Wait for the link payload to show up on the first leg's channel.
        let link = await_link_payload(&mut answered_leg.chan_rx).await;

        // Reply with a LINKED cell echoing that payload, on the first leg only.
        let linked_reply = rmsg_to_ccmsg(
            None,
            relaymsg::ConfluxLinked::new(link.payload().clone()).into(),
            false,
        );
        answered_leg.circ_tx.send(linked_reply).await.unwrap();

        // Never answer on the second leg; just move the mock clock forward.
        rt.advance_by(Duration::from_secs(60)).await;

        // One handshake result per leg.
        let per_leg: [StdResult<(), ConfluxHandshakeError>; 2] =
            hs_outcome_rx.await.unwrap().unwrap().try_into().unwrap();
        let [first, second] = per_leg;

        assert!(first.is_ok());
        assert_matches!(second.unwrap_err(), ConfluxHandshakeError::Timeout);
    });
}
2802
// For each bogus cell sent on the second leg during the conflux handshake,
// the handshake reports a single `Link(CircProto(..))` error carrying the
// expected message, and the tunnel ends up closed.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn conflux_bad_hs() {
    use crate::util::err::ConfluxHandshakeError;

    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        // A link payload built around a freshly generated nonce.
        let bogus_payload = V1LinkPayload::new(
            V1Nonce::new(&mut testing_rng()),
            V1DesiredUx::NO_OPINION,
        );

        let linked_with_bogus_nonce = rmsg_to_ccmsg(
            None,
            relaymsg::ConfluxLinked::new(bogus_payload.clone()).into(),
            false,
        );
        let link_sent_to_client = rmsg_to_ccmsg(
            None,
            relaymsg::ConfluxLink::new(bogus_payload).into(),
            false,
        );
        let switch_before_linked =
            rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into(), false);

        // Each bogus reply, paired with the protocol error text it must produce.
        let cases = [
            (
                linked_with_bogus_nonce,
                "Received CONFLUX_LINKED cell with mismatched nonce",
            ),
            (
                link_sent_to_client,
                "Unexpected CONFLUX_LINK cell from hop #3 on client circuit",
            ),
            (
                switch_before_linked,
                "Received CONFLUX_SWITCH on unlinked circuit?!",
            ),
        ];

        for (bogus_reply, want_msg) in cases {
            // A fresh tunnel for every case.
            let TestTunnelCtx {
                tunnel,
                circs,
                conflux_link_rx,
            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;

            let [mut _untouched_leg, mut target_leg]: [TestCircuitCtx; 2] =
                circs.try_into().unwrap();

            target_leg.circ_tx.send(bogus_reply).await.unwrap();

            // Exactly one result is expected here (the array pattern has length 1).
            let outcome = conflux_link_rx.await.unwrap().unwrap();
            let [leg_result]: [StdResult<(), ConfluxHandshakeError>; 1] =
                outcome.try_into().unwrap();

            match leg_result.unwrap_err() {
                ConfluxHandshakeError::Link(Error::CircProto(e)) => {
                    assert_eq!(e, want_msg);
                }
                e => panic!("unexpected error: {e:?}"),
            }

            assert!(tunnel.is_closed());
        }
    });
}
2874
// Any conflux cell (LINKED, LINK or SWITCH) sent on a plain tunnel created
// with `newtunnel` must leave that tunnel closed.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn unexpected_conflux_cell() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let nonce = V1Nonce::new(&mut testing_rng());
        let link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
        let bad_cells = [
            rmsg_to_ccmsg(
                None,
                relaymsg::ConfluxLinked::new(link_payload.clone()).into(),
                false,
            ),
            // Last use of the payload: move it instead of cloning it again.
            rmsg_to_ccmsg(
                None,
                relaymsg::ConfluxLink::new(link_payload).into(),
                false,
            ),
            rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into(), false),
        ];

        for bad_cell in bad_cells {
            // A fresh channel and tunnel for every cell.
            let (chan, mut _rx, _sink) = working_fake_channel(&rt);
            let (tunnel, mut sink) = newtunnel(&rt, chan).await;

            sink.send(bad_cell).await.unwrap();
            rt.advance_until_stalled().await;

            // Only the closed state is checked; the reason for closing is not.
            assert!(tunnel.is_closed());
        }
    });
}
2910
// Sending one LINKED cell on the first leg and two LINKED cells on the
// second leg must leave the tunnel closed.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn conflux_bad_linked() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let TestTunnelCtx {
            tunnel,
            circs,
            conflux_link_rx: _,
        } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;

        let [mut first_leg, mut second_leg]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

        let link = await_link_payload(&mut first_leg.chan_rx).await;

        // One LINKED reply on the first leg...
        let reply = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
        first_leg
            .circ_tx
            .send(rmsg_to_ccmsg(None, reply, false))
            .await
            .unwrap();

        // ...and the same LINKED reply twice in a row on the second leg.
        for _ in 0..2 {
            let reply = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
            second_leg
                .circ_tx
                .send(rmsg_to_ccmsg(None, reply, false))
                .await
                .unwrap();
        }

        rt.advance_until_stalled().await;

        assert!(tunnel.is_closed());
    });
}
2955
// After a successful conflux handshake on both legs, each of the SWITCH
// cells below, sent on the first leg, must leave the tunnel closed.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn conflux_bad_switch() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let cc_vegas_params = build_cc_vegas_params();
        let cwnd_init = cc_vegas_params.cwnd_params().cwnd_init();
        let bad_switch = [
            // A SWITCH with a seqno of zero.
            relaymsg::ConfluxSwitch::new(0),
            // A SWITCH whose seqno is one past the initial congestion window.
            relaymsg::ConfluxSwitch::new(cwnd_init + 1),
        ];

        for bad_cell in bad_switch {
            // A fresh tunnel for every bad cell.
            let TestTunnelCtx {
                tunnel,
                circs,
                conflux_link_rx,
            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params_clone(&cc_vegas_params)).await;

            let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

            let link = await_link_payload(&mut circ1.chan_rx).await;

            // Complete the handshake by replying LINKED on both legs.
            for circ in [&mut circ1, &mut circ2] {
                let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
                circ.circ_tx
                    .send(rmsg_to_ccmsg(None, linked, false))
                    .await
                    .unwrap();
            }

            let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
            assert!(conflux_hs_res.iter().all(|res| res.is_ok()));

            // `bad_cell` is owned by this iteration and not used again,
            // so it is moved into the message rather than cloned.
            let msg = rmsg_to_ccmsg(None, bad_cell.into(), false);
            circ1.circ_tx.send(msg).await.unwrap();

            rt.advance_until_stalled().await;
            assert!(tunnel.is_closed());
        }

        /// Clone the shared congestion control parameters for one iteration.
        fn build_cc_vegas_params_clone(
            params: &CongestionControlParams,
        ) -> CongestionControlParams {
            params.clone()
        }
    });
}
3005
// After a successful handshake, a first SWITCH cell (seqno 10) on the first
// leg leaves the tunnel open; a second SWITCH cell (seqno 12) sent right
// after it on the same leg leaves the tunnel closed.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn conflux_consecutive_switch() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let TestTunnelCtx {
            tunnel,
            circs,
            conflux_link_rx,
        } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;

        let [mut first_leg, mut second_leg]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

        let link = await_link_payload(&mut first_leg.chan_rx).await;

        // Complete the handshake by replying LINKED on both legs.
        for leg in [&mut first_leg, &mut second_leg] {
            let reply = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
            leg.circ_tx
                .send(rmsg_to_ccmsg(None, reply, false))
                .await
                .unwrap();
        }

        let hs_results = conflux_link_rx.await.unwrap().unwrap();
        assert!(hs_results.iter().all(|res| res.is_ok()));

        // First SWITCH: the tunnel is expected to survive it.
        let first_switch =
            rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(10).into(), false);
        first_leg.circ_tx.send(first_switch).await.unwrap();

        rt.advance_until_stalled().await;
        assert!(!tunnel.is_closed());

        // Second SWITCH, with no other cell in between: the tunnel is expected to close.
        let second_switch =
            rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(12).into(), false);
        first_leg.circ_tx.send(second_switch).await.unwrap();

        rt.advance_until_stalled().await;
        assert!(tunnel.is_closed());
    });
}
3053
// `CtrlCmd::ShutdownAndReturnCircuit` sent to a two-leg tunnel must answer
// with an error saying the conflux set is not single-legged, and the tunnel
// must end up closed anyway.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn shutdown_and_return_circ_multipath() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let TestTunnelCtx {
            tunnel,
            circs,
            conflux_link_rx: _,
        } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;

        rt.progress_until_stalled().await;

        let (answer_tx, answer_rx) = oneshot::channel();
        tunnel
            .circ
            .command
            .unbounded_send(CtrlCmd::ShutdownAndReturnCircuit { answer: answer_tx })
            .unwrap();

        // Discard the `Ok` payload before calling `unwrap_err`, so that the
        // payload type is not required to implement `Debug`.
        let err = answer_rx.await.unwrap().map(|_| ()).unwrap_err();

        const MSG: &str = "not a single leg conflux set (got at least 2 elements when exactly one was expected)";
        assert!(err.to_string().contains(MSG), "{err}");

        // The tunnel is expected to be closed even though the command failed.
        rt.progress_until_stalled().await;
        assert!(tunnel.is_closed());

        // `circs` is deliberately kept alive until this point.
        drop(circs);
    });
}
3101
#[cfg(feature = "conflux")]
/// One participant in a conflux data-transfer test, as consumed by
/// `run_conflux_endpoint`.
#[derive(Debug)]
enum ConfluxTestEndpoint<I: Iterator<Item = Option<Duration>>> {
    /// A mock exit leg; run with `run_mock_conflux_exit`.
    Relay(ConfluxExitState<I>),
    /// The client side; run with `run_conflux_client`.
    Client {
        /// Channel on which the conflux handshake results are delivered.
        conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
        /// The tunnel the client opens its stream on.
        tunnel: Arc<ClientTunnel>,
        /// The bytes the client writes to the stream.
        send_data: Vec<u8>,
        /// The bytes the client expects to read from the stream.
        recv_data: Vec<u8>,
    },
}
3120
#[allow(unused, clippy::large_enum_variant)]
/// The value an endpoint task returns when it finishes.
///
/// Returning these values hands the tunnel, stream and circuit context
/// back to whoever joins the task.
#[derive(Debug)]
#[cfg(feature = "conflux")]
enum ConfluxEndpointResult {
    /// Returned by `run_conflux_client`.
    Circuit {
        /// The tunnel the client used.
        tunnel: Arc<ClientTunnel>,
        /// The stream the client opened on the tunnel.
        stream: DataStream,
    },
    /// Returned by `run_mock_conflux_exit`.
    Relay {
        /// The circuit context of the leg the mock exit was serving.
        circ: TestCircuitCtx,
    },
}
3135
#[derive(Debug)]
/// Stream bookkeeping shared (behind an `Arc<Mutex<_>>`) by the mock exit
/// tasks serving the two legs of a conflux tunnel.
#[cfg(feature = "conflux")]
struct ConfluxStreamState {
    /// The payload bytes of every DATA cell received so far, in arrival order.
    data_recvd: Vec<u8>,
    /// How many bytes must have been received for a leg to consider reading done.
    expected_data_len: usize,
    /// Set once a BEGIN cell has been received.
    begin_recvd: bool,
    /// Set once an END cell has been received.
    end_recvd: bool,
    /// Set once a mock exit leg has sent an END cell.
    end_sent: bool,
}
3151
#[cfg(feature = "conflux")]
impl ConfluxStreamState {
    /// Create the initial state for a stream on which `expected_data_len`
    /// bytes are expected: nothing received yet, and every flag cleared.
    fn new(expected_data_len: usize) -> Self {
        let data_recvd = Vec::new();
        let (begin_recvd, end_recvd, end_sent) = (false, false, false);

        Self {
            data_recvd,
            expected_data_len,
            begin_recvd,
            end_recvd,
            end_sent,
        }
    }
}
3164
#[derive(Debug)]
/// A CONFLUX_SWITCH cell a mock exit leg expects to receive.
///
/// `run_mock_conflux_exit` pops these in order, one per SWITCH cell received.
#[cfg(feature = "conflux")]
struct ExpectedSwitch {
    /// The number of cells the leg must have received when the SWITCH
    /// arrives, counting the SWITCH cell itself.
    cells_so_far: usize,
    /// The seqno the SWITCH cell must carry.
    seqno: u32,
}
3176
#[cfg(feature = "conflux")]
/// Feeds a fixed sequence of cells to the mock exit legs, one cell at a time.
///
/// See `CellDispatcher::run`.
struct CellDispatcher {
    /// For each leg (keyed by its unique ID), the channel on which cells
    /// destined for that leg are handed to its mock exit task.
    leg_tx: HashMap<UniqId, mpsc::Sender<CellToSend>>,
    /// The cells to hand out, in order, each tagged with the leg that must send it.
    cells_to_send: Vec<(UniqId, AnyRelayMsg)>,
}
3189
#[cfg(feature = "conflux")]
impl CellDispatcher {
    /// Hand out every cell in `cells_to_send`, in order, to the leg it is tagged with.
    ///
    /// Each cell is sent together with a oneshot sender, and the next cell
    /// is only dispatched once the receiving leg has signalled on it.
    ///
    /// # Panics
    ///
    /// Panics if a cell names a leg that is not in `leg_tx`, if a leg's
    /// channel is closed, or if a leg drops the oneshot sender without signalling.
    async fn run(self) {
        let Self {
            mut leg_tx,
            cells_to_send,
        } = self;

        // Consume the vector front to back; this avoids the repeated
        // `remove(0)` shifting of the remaining elements.
        for (circ_id, cell) in cells_to_send {
            let cell_tx = leg_tx.get_mut(&circ_id).unwrap();
            let (done_tx, done_rx) = oneshot::channel();
            cell_tx.send(CellToSend { done_tx, cell }).await.unwrap();
            // Wait for the leg to confirm before moving on to the next cell.
            let () = done_rx.await.unwrap();
        }
        // `leg_tx` is dropped here, which closes the per-leg channels.
    }
}
3203
#[cfg(feature = "conflux")]
/// A cell handed by the `CellDispatcher` to a mock exit leg for sending.
#[derive(Debug)]
struct CellToSend {
    /// Signalled by the mock exit leg once it has sent `cell`
    /// and advanced the mock runtime until stalled.
    done_tx: oneshot::Sender<()>,
    /// The relay message to send to the client.
    cell: AnyRelayMsg,
}
3213
#[derive(Debug)]
/// Everything `run_mock_conflux_exit` needs to serve one leg of a conflux tunnel.
#[cfg(feature = "conflux")]
struct ConfluxExitState<I: Iterator<Item = Option<Duration>>> {
    /// The mock runtime, shared between the legs behind an async mutex.
    runtime: Arc<AsyncMutex<MockRuntime>>,
    /// The client tunnel under test; queried for its send window and SENDME tags.
    tunnel: Arc<ClientTunnel>,
    /// The channels and unique ID of the leg this state is for.
    circ: TestCircuitCtx,
    /// Optional delays by which the mock clock is advanced:
    /// one item is consumed before each SENDME the leg sends
    /// (the tests also consume the first item for the handshake).
    rtt_delays: I,
    /// Stream bookkeeping shared with the other leg.
    stream_state: Arc<Mutex<ConfluxStreamState>>,
    /// The SWITCH cells this leg expects to receive, in order.
    expect_switch: Vec<ExpectedSwitch>,
    /// Events coming from the other leg.
    event_rx: mpsc::Receiver<MockExitEvent>,
    /// Events going to the other leg.
    event_tx: mpsc::Sender<MockExitEvent>,
    /// Whether this leg sends the END cell when its loop finishes
    /// without an END having been received.
    is_sending_leg: bool,
    /// Cells this leg is asked to send to the client.
    cells_rx: mpsc::Receiver<CellToSend>,
}
3248
/// Play the exit's part of a successful conflux handshake on one leg.
///
/// Waits for the link payload on `rx`, optionally advances the mock clock by
/// `init_rtt_delay`, replies on `sink` with a LINKED cell echoing the payload,
/// and finally asserts that the next cell read from `rx` is a relay cell
/// containing a CONFLUX_LINKED_ACK.
///
/// # Panics
///
/// Panics if the next cell on `rx` is not a relay cell, cannot be decoded,
/// or is not a CONFLUX_LINKED_ACK.
#[cfg(feature = "conflux")]
async fn good_exit_handshake(
    runtime: &Arc<AsyncMutex<MockRuntime>>,
    init_rtt_delay: Option<Duration>,
    rx: &mut Receiver<ChanCell<AnyChanMsg>>,
    sink: &mut CircuitRxSender,
) {
    let link = await_link_payload(rx).await;

    // Optionally let mock time pass before answering.
    if let Some(delay) = init_rtt_delay {
        runtime.lock().await.advance_by(delay).await;
    }

    let linked_reply = rmsg_to_ccmsg(
        None,
        relaymsg::ConfluxLinked::new(link.payload().clone()).into(),
        false,
    );
    sink.send(linked_reply).await.unwrap();

    // The next cell on the channel must decode to a CONFLUX_LINKED_ACK.
    let (_id, next_msg) = rx.next().await.unwrap().into_circid_and_msg();
    let decoded = match next_msg {
        AnyChanMsg::Relay(relay) => {
            AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, relay.into_relay_body())
                .unwrap()
        }
        other => panic!("{other:?}"),
    };
    let (_streamid, ack) = decoded.into_streamid_and_msg();

    assert_matches!(ack, AnyRelayMsg::ConfluxLinkedAck(_));
}
3282
/// An event one mock exit leg sends to the task serving the other leg.
#[derive(Copy, Clone, Debug)]
#[cfg(feature = "conflux")]
enum MockExitEvent {
    /// The sending leg has finished; the receiving leg stops its loop.
    Done,
    /// The sending leg received a BEGIN cell; carries the ID of the stream.
    BeginRecvd(StreamId),
}
3291
/// Serve one leg of a conflux tunnel as a mock exit.
///
/// The loop multiplexes three sources: cells arriving from the client on
/// this leg, events from the other leg, and (once a stream ID is known)
/// cells the `CellDispatcher` asks this leg to send.
///
/// Cells from the client are handled as follows:
///  * BEGIN: reply with CONNECTED and notify the other leg;
///  * DATA: append the payload to the shared stream state, and
///    send a circuit-level SENDME after every 31st DATA cell;
///  * CONFLUX_SWITCH: check it against the next `ExpectedSwitch`;
///  * END: record it and stop.
///
/// When the loop ends, the sending leg sends an END cell if none was
/// received, the other leg is told we are done, and the function asserts
/// that every expected SWITCH was seen.
///
/// # Panics
///
/// Panics on any unexpected cell, on a second BEGIN or END, on a SWITCH that
/// does not match the expectation, and if expected SWITCH cells are left over.
#[cfg(feature = "conflux")]
async fn run_mock_conflux_exit<I: Iterator<Item = Option<Duration>>>(
    state: ConfluxExitState<I>,
) -> ConfluxEndpointResult {
    let ConfluxExitState {
        runtime,
        tunnel,
        mut circ,
        rtt_delays,
        stream_state,
        mut expect_switch,
        mut event_tx,
        mut event_rx,
        is_sending_leg,
        mut cells_rx,
    } = state;

    let mut rtt_delays = rtt_delays.into_iter();

    // The number of bytes we expect to receive from the client.
    let stream_len = stream_state.lock().unwrap().expected_data_len;
    // DATA cells received on this leg (drives the SENDME cadence).
    let mut data_cells_received = 0_usize;
    // All cells received on this leg (checked against `ExpectedSwitch::cells_so_far`).
    let mut cell_count = 0_usize;
    // SENDME tags obtained from the tunnel and not yet used.
    let mut tags = vec![];
    let mut streamid = None;
    // Set once the dispatcher channel has been closed.
    let mut done_writing = false;

    loop {
        // We are finished once a BEGIN or END was seen, all the expected
        // data has arrived, and there is nothing left to send.
        let should_exit = {
            let stream_state = stream_state.lock().unwrap();
            let done_reading = stream_state.data_recvd.len() >= stream_len;

            (stream_state.begin_recvd || stream_state.end_recvd) && done_reading && done_writing
        };

        if should_exit {
            break;
        }

        use futures::select;

        // Only poll the dispatcher channel once a stream ID is known and the
        // channel is still open; otherwise use a future that never completes.
        let mut next_cell = if streamid.is_some() && !done_writing {
            Box::pin(cells_rx.next().fuse())
                as Pin<Box<dyn FusedFuture<Output = Option<CellToSend>> + Send>>
        } else {
            Box::pin(std::future::pending().fuse())
        };

        // Only the first arm yields a cell to process below;
        // the other two arms either `continue` or `break`.
        let res = select! {
            res = circ.chan_rx.next() => {
                res.unwrap()
            },
            res = event_rx.next() => {
                // The event channel was closed.
                let Some(event) = res else {
                    break;
                };

                match event {
                    MockExitEvent::Done => {
                        break;
                    },
                    MockExitEvent::BeginRecvd(id) => {
                        // The other leg received the BEGIN: remember the stream ID,
                        // which also enables reading from the dispatcher channel.
                        streamid = Some(id);
                        continue;
                    },
                }
            }
            res = next_cell => {
                if let Some(cell_to_send) = res {
                    let CellToSend { cell, done_tx } = cell_to_send;

                    // SWITCH cells are sent without a stream ID.
                    let streamid = if matches!(cell, AnyRelayMsg::ConfluxSwitch(_)) {
                        None
                    } else {
                        streamid
                    };

                    circ.circ_tx
                        .send(rmsg_to_ccmsg(streamid, cell, false))
                        .await
                        .unwrap();

                    // Let the mock runtime settle before telling
                    // the dispatcher it may hand out the next cell.
                    runtime.lock().await.advance_until_stalled().await;
                    done_tx.send(()).unwrap();
                } else {
                    // The dispatcher channel is closed: nothing more to send.
                    done_writing = true;
                }

                continue;
            }
        };

        let (_id, chmsg) = res.into_circid_and_msg();
        cell_count += 1;
        let rmsg = match chmsg {
            AnyChanMsg::Relay(r) => {
                AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                    .unwrap()
            }
            other => panic!("{:?}", other),
        };
        let (new_streamid, rmsg) = rmsg.into_streamid_and_msg();
        // Adopt the stream ID of the first cell if we do not have one yet.
        if streamid.is_none() {
            streamid = new_streamid;
        }

        let begin_recvd = stream_state.lock().unwrap().begin_recvd;
        let end_recvd = stream_state.lock().unwrap().end_recvd;
        match rmsg {
            AnyRelayMsg::Begin(_) if begin_recvd => {
                panic!("client tried to open two streams?!");
            }
            AnyRelayMsg::Begin(_) if !begin_recvd => {
                stream_state.lock().unwrap().begin_recvd = true;
                // Accept the stream...
                let connected = relaymsg::Connected::new_empty().into();
                circ.circ_tx
                    .send(rmsg_to_ccmsg(streamid, connected, false))
                    .await
                    .unwrap();
                // ...and let the other leg know its ID.
                event_tx
                    .send(MockExitEvent::BeginRecvd(streamid.unwrap()))
                    .await
                    .unwrap();
            }
            AnyRelayMsg::End(_) if !end_recvd => {
                stream_state.lock().unwrap().end_recvd = true;
                break;
            }
            AnyRelayMsg::End(_) if end_recvd => {
                panic!("received two END cells for the same stream?!");
            }
            AnyRelayMsg::ConfluxSwitch(cell) => {
                // Panics if no (more) SWITCH cells were expected on this leg.
                let expected = expect_switch.remove(0);

                assert_eq!(expected.cells_so_far, cell_count);
                assert_eq!(expected.seqno, cell.seqno());

                // The SWITCH is only checked, not acted upon:
                // received data is not reordered here.
                continue;
            }
            AnyRelayMsg::Data(dat) => {
                data_cells_received += 1;
                stream_state
                    .lock()
                    .unwrap()
                    .data_recvd
                    .extend_from_slice(dat.as_ref());

                // A SENDME is due after every 31st DATA cell.
                let is_next_cell_sendme = data_cells_received.is_multiple_of(31);
                if is_next_cell_sendme {
                    if tags.is_empty() {
                        // Let the mock runtime settle before querying the tunnel,
                        // then fetch a new batch of SENDME tags for this leg.
                        runtime.lock().await.advance_until_stalled().await;
                        let (tx, rx) = oneshot::channel();
                        tunnel
                            .circ
                            .command
                            .unbounded_send(CtrlCmd::QuerySendWindow {
                                hop: 2.into(),
                                leg: circ.unique_id,
                                done: tx,
                            })
                            .unwrap();

                        let (_window, new_tags) = rx.await.unwrap().unwrap();
                        tags = new_tags;
                    }

                    // Tags are used in the order they were returned.
                    let tag = tags.remove(0);

                    // Optionally let mock time pass before sending the SENDME.
                    if let Some(rtt_delay) = rtt_delays.next().flatten() {
                        runtime.lock().await.advance_by(rtt_delay).await;
                    }
                    let sendme = relaymsg::Sendme::from(tag).into();

                    // The SENDME is sent without a stream ID.
                    circ.circ_tx
                        .send(rmsg_to_ccmsg(None, sendme, false))
                        .await
                        .unwrap();
                }
            }
            _ => panic!("unexpected message {rmsg:?} on leg {}", circ.unique_id),
        }
    }

    let end_recvd = stream_state.lock().unwrap().end_recvd;

    // The sending leg closes the stream, unless an END was already received.
    if is_sending_leg && !end_recvd {
        let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
        circ.circ_tx
            .send(rmsg_to_ccmsg(streamid, end, false))
            .await
            .unwrap();
        stream_state.lock().unwrap().end_sent = true;
    }

    // A failure to notify the other leg is deliberately ignored.
    let _ = event_tx.send(MockExitEvent::Done).await;

    // Every expected SWITCH cell must have been received.
    assert!(
        expect_switch.is_empty(),
        "expect_switch = {expect_switch:?}"
    );

    ConfluxEndpointResult::Relay { circ }
}
3521
/// Play the client's part of a conflux data-transfer test.
///
/// Waits for the conflux handshake to yield two results, opens a stream to
/// `www.example.com:443` on `tunnel`, writes and flushes `send_data`, then
/// reads the stream to its end and asserts the bytes read equal `recv_data`.
///
/// Returns the tunnel and the stream, wrapped in `ConfluxEndpointResult::Circuit`.
#[cfg(feature = "conflux")]
async fn run_conflux_client(
    tunnel: Arc<ClientTunnel>,
    conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
    send_data: Vec<u8>,
    recv_data: Vec<u8>,
) -> ConfluxEndpointResult {
    // The handshake must have produced exactly two results.
    let hs_results = conflux_link_rx.await.unwrap().unwrap();
    assert_eq!(hs_results.len(), 2);

    let mut stream = tunnel
        .begin_stream("www.example.com", 443, None)
        .await
        .unwrap();

    // Upload everything we were asked to send.
    stream.write_all(&send_data).await.unwrap();
    stream.flush().await.unwrap();

    // Download until the stream ends, and compare with what we expected.
    let mut received: Vec<u8> = Vec::new();
    let received_len = stream.read_to_end(&mut received).await.unwrap();
    assert_eq!(received_len, recv_data.len());
    assert_eq!(recv_data, received);

    ConfluxEndpointResult::Circuit { tunnel, stream }
}
3553
/// Run one endpoint of a conflux test to completion.
///
/// A `Relay` endpoint is served by `run_mock_conflux_exit`,
/// a `Client` endpoint by `run_conflux_client`.
#[cfg(feature = "conflux")]
async fn run_conflux_endpoint<I: Iterator<Item = Option<Duration>>>(
    endpoint: ConfluxTestEndpoint<I>,
) -> ConfluxEndpointResult {
    let (tunnel, conflux_link_rx, send_data, recv_data) = match endpoint {
        // The relay side needs nothing else: run it and return its result.
        ConfluxTestEndpoint::Relay(exit_state) => {
            return run_mock_conflux_exit(exit_state).await;
        }
        ConfluxTestEndpoint::Client {
            tunnel,
            conflux_link_rx,
            send_data,
            recv_data,
        } => (tunnel, conflux_link_rx, send_data, recv_data),
    };

    run_conflux_client(tunnel, conflux_link_rx, send_data, recv_data).await
}
3568
// The client uploads NUM_CELLS * CELL_SIZE bytes over a two-leg conflux
// tunnel to two mock exit legs.
//
// Each leg delays its SENDMEs according to its own RTT table, and expects
// exactly one SWITCH cell. At the end, the bytes received by the two legs
// must be the same multiset of bytes as the ones the client sent.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn multipath_client_to_exit() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        /// The number of cells' worth of data to send.
        const NUM_CELLS: usize = 300;
        /// The number of data bytes counted per cell.
        const CELL_SIZE: usize = 498;

        let TestTunnelCtx {
            tunnel,
            circs,
            conflux_link_rx,
        } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
        let [circ1, circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

        // The stream data the client is going to send.
        let mut send_data = (0..255_u8)
            .cycle()
            .take(NUM_CELLS * CELL_SIZE)
            .collect::<Vec<_>>();
        let stream_state = Arc::new(Mutex::new(ConfluxStreamState::new(send_data.len())));

        let mut tasks = vec![];

        // Channels the two mock exit legs use to notify each other of events.
        let (tx1, rx1) = mpsc::channel(1);
        let (tx2, rx2) = mpsc::channel(1);

        // RTT delays, in milliseconds. The first entry of each table is
        // consumed by the handshake below; every following entry is consumed
        // just before the leg sends a SENDME.
        let circ1_rtt_delays = [100, 500, 700, 900, 1100, 1300, 1500, 1700, 1900, 2100]
            .map(|ms| Some(Duration::from_millis(ms)))
            .into_iter();

        let circ2_rtt_delays = [200, 400, 600, 800, 1000, 1200, 1400, 1600, 1800, 2000]
            .map(|ms| Some(Duration::from_millis(ms)))
            .into_iter();

        // The first leg expects its SWITCH as the 126th cell it receives...
        let expected_switches1 = vec![ExpectedSwitch {
            cells_so_far: 126,
            seqno: 124,
        }];

        // ...whereas the second leg expects a SWITCH as the very first cell.
        let expected_switches2 = vec![ExpectedSwitch {
            cells_so_far: 1,
            seqno: 125,
        }];

        let relay_runtime = Arc::new(AsyncMutex::new(rt.clone()));

        // The exits have nothing to send in this test: the senders are
        // dropped right away, which closes the channels.
        let (_, cells_rx1) = mpsc::channel(1);
        let (_, cells_rx2) = mpsc::channel(1);

        let relay1 = ConfluxExitState {
            runtime: Arc::clone(&relay_runtime),
            tunnel: Arc::clone(&tunnel),
            circ: circ1,
            rtt_delays: circ1_rtt_delays,
            stream_state: Arc::clone(&stream_state),
            expect_switch: expected_switches1,
            event_tx: tx1,
            event_rx: rx2,
            is_sending_leg: true,
            cells_rx: cells_rx1,
        };

        let relay2 = ConfluxExitState {
            runtime: Arc::clone(&relay_runtime),
            tunnel: Arc::clone(&tunnel),
            circ: circ2,
            rtt_delays: circ2_rtt_delays,
            stream_state: Arc::clone(&stream_state),
            expect_switch: expected_switches2,
            event_tx: tx2,
            event_rx: rx1,
            is_sending_leg: false,
            cells_rx: cells_rx2,
        };

        for mut mock_relay in [relay1, relay2] {
            let leg = mock_relay.circ.unique_id;

            // The handshakes are awaited here, one leg after the other,
            // before the leg's task is spawned.
            good_exit_handshake(
                &relay_runtime,
                mock_relay.rtt_delays.next().flatten(),
                &mut mock_relay.circ.chan_rx,
                &mut mock_relay.circ.circ_tx,
            )
            .await;

            let relay = ConfluxTestEndpoint::Relay(mock_relay);

            tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(relay)));
        }

        tasks.push(rt.spawn_join(
            "client task".to_string(),
            run_conflux_endpoint(ConfluxTestEndpoint::Client {
                tunnel,
                conflux_link_rx,
                send_data: send_data.clone(),
                recv_data: vec![],
            }),
        ));
        let _sinks = futures::future::join_all(tasks).await;
        let mut stream_state = stream_state.lock().unwrap();
        assert!(stream_state.begin_recvd);

        // The mock exits do not reorder what they receive, so compare the
        // sorted bytes. Stability is meaningless for bytes, so the
        // non-allocating unstable sort gives the same result.
        stream_state.data_recvd.sort_unstable();
        send_data.sort_unstable();
        assert_eq!(stream_state.data_recvd, send_data);
    });
}
3755
/// Run an exit-to-client transfer over the two-leg conflux tunnel in `tunnel`.
///
/// Two mock exit legs are set up (the second one being the sending leg) and
/// a `CellDispatcher` is spawned to feed them `cells_to_send` in order.
/// The conflux handshake is performed on each leg in turn, then the two exit
/// tasks and the client task are spawned and joined.
///
/// The client writes `send_data` and expects to read `recv_data`.
///
/// Returns the stream state shared by the two mock exit legs.
///
/// # Panics
///
/// Panics if `tunnel` does not have exactly two circuits,
/// or if the dispatcher task cannot be spawned.
#[cfg(feature = "conflux")]
async fn run_multipath_exit_to_client_test(
    rt: MockRuntime,
    tunnel: TestTunnelCtx,
    cells_to_send: Vec<(UniqId, AnyRelayMsg)>,
    send_data: Vec<u8>,
    recv_data: Vec<u8>,
) -> Arc<Mutex<ConfluxStreamState>> {
    let TestTunnelCtx {
        tunnel,
        circs,
        conflux_link_rx,
    } = tunnel;
    let [circ1, circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

    let stream_state = Arc::new(Mutex::new(ConfluxStreamState::new(send_data.len())));

    let mut tasks = vec![];
    let relay_runtime = Arc::new(AsyncMutex::new(rt.clone()));
    // Channels on which the dispatcher hands cells to each leg.
    let (cells_tx1, cells_rx1) = mpsc::channel(1);
    let (cells_tx2, cells_rx2) = mpsc::channel(1);

    let dispatcher = CellDispatcher {
        leg_tx: [(circ1.unique_id, cells_tx1), (circ2.unique_id, cells_tx2)]
            .into_iter()
            .collect(),
        cells_to_send,
    };

    // Channels the two mock exit legs use to notify each other of events.
    let (tx1, rx1) = mpsc::channel(1);
    let (tx2, rx2) = mpsc::channel(1);

    let relay1 = ConfluxExitState {
        runtime: Arc::clone(&relay_runtime),
        tunnel: Arc::clone(&tunnel),
        circ: circ1,
        rtt_delays: [].into_iter(),
        stream_state: Arc::clone(&stream_state),
        // No SWITCH cells are expected from the client.
        expect_switch: vec![],
        event_tx: tx1,
        event_rx: rx2,
        is_sending_leg: false,
        cells_rx: cells_rx1,
    };

    let relay2 = ConfluxExitState {
        runtime: Arc::clone(&relay_runtime),
        tunnel: Arc::clone(&tunnel),
        circ: circ2,
        rtt_delays: [].into_iter(),
        stream_state: Arc::clone(&stream_state),
        // No SWITCH cells are expected from the client.
        expect_switch: vec![],
        event_tx: tx2,
        event_rx: rx1,
        is_sending_leg: true,
        cells_rx: cells_rx2,
    };

    // The dispatcher runs in the background; it is not joined below.
    rt.spawn(dispatcher.run()).unwrap();

    for mut mock_relay in [relay1, relay2] {
        let leg = mock_relay.circ.unique_id;

        // The handshakes are awaited here, one leg after the other,
        // before the leg's task is spawned.
        good_exit_handshake(
            &relay_runtime,
            mock_relay.rtt_delays.next().flatten(),
            &mut mock_relay.circ.chan_rx,
            &mut mock_relay.circ.circ_tx,
        )
        .await;

        let relay = ConfluxTestEndpoint::Relay(mock_relay);

        tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(relay)));
    }

    tasks.push(rt.spawn_join(
        "client task".to_string(),
        run_conflux_endpoint(ConfluxTestEndpoint::Client {
            tunnel,
            conflux_link_rx,
            // `send_data` is owned and not needed afterwards: move it.
            send_data,
            recv_data,
        }),
    ));

    let _sinks = futures::future::join_all(tasks).await;

    stream_state
}
3865
// The mock exit sends TO_SEND to the client in several DATA cells spread over
// both legs, with SWITCH cells in between and some DATA cells dispatched out
// of order. The client must read exactly TO_SEND (checked by the client task),
// and must not have sent any data itself.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn multipath_exit_to_client() {
    /// The data the exit sends to the client.
    const TO_SEND: &[u8] =
        b"But something about Buster Friendly irritated John Isidore, one specific thing";

    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        /// Index of the first circuit of the tunnel.
        const CIRC1: usize = 0;
        /// Index of the second circuit of the tunnel.
        const CIRC2: usize = 1;

        // Shorthands for building the two kinds of cells used below.
        let data = |bytes: &[u8]| -> AnyRelayMsg { relaymsg::Data::new(bytes).unwrap().into() };
        let switch = |seqno: u32| -> AnyRelayMsg { relaymsg::ConfluxSwitch::new(seqno).into() };

        // Each entry is (index of the leg that sends the cell, cell).
        // The cells are dispatched in the order listed.
        let simple_switch = vec![
            (CIRC1, data(&TO_SEND[0..5])),
            (CIRC1, data(&TO_SEND[5..10])),
            (CIRC2, switch(4)),
            // Dispatched before the [10..20] chunk that precedes it in TO_SEND.
            (CIRC2, data(&TO_SEND[20..30])),
            (CIRC1, data(&TO_SEND[10..20])),
            (CIRC2, data(&TO_SEND[30..40])),
            (CIRC2, data(&TO_SEND[40..])),
        ];

        let multiple_switches = vec![
            (CIRC2, switch(3)),
            // Dispatched before the two chunks that precede them in TO_SEND.
            (CIRC2, data(&TO_SEND[15..20])),
            (CIRC2, data(&TO_SEND[20..30])),
            (CIRC1, data(&TO_SEND[0..10])),
            (CIRC1, data(&TO_SEND[10..15])),
            (CIRC1, switch(3)),
            // Dispatched before the [30..31] chunk that precedes it in TO_SEND.
            (CIRC1, data(&TO_SEND[31..40])),
            (CIRC2, data(&TO_SEND[30..31])),
            (CIRC1, data(&TO_SEND[40..])),
            (CIRC2, switch(2)),
        ];

        let scenarios = [simple_switch, multiple_switches];

        for scenario in scenarios {
            // A fresh tunnel for every scenario.
            let tunnel = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
            assert_eq!(tunnel.circs.len(), 2);

            // Translate the leg indices into the unique IDs of the actual circuits.
            let circ_ids = [tunnel.circs[0].unique_id, tunnel.circs[1].unique_id];
            let cells_to_send = scenario
                .into_iter()
                .map(|(leg_idx, cell)| (circ_ids[leg_idx], cell))
                .collect();

            // The client has nothing to send in this test.
            let send_data: Vec<u8> = Vec::new();
            let stream_state = run_multipath_exit_to_client_test(
                rt.clone(),
                tunnel,
                cells_to_send,
                send_data.clone(),
                TO_SEND.into(),
            )
            .await;

            let stream_state = stream_state.lock().unwrap();
            assert!(stream_state.begin_recvd);
            // The exit must not have received any data from the client.
            assert!(stream_state.data_recvd.is_empty());
        }
    });
}
4017
// Calling `allow_stream_requests` on a linked two-leg tunnel must fail, with
// an error whose source says that stream requests cannot be allowed on a
// multi-path tunnel.
#[traced_test]
#[test]
#[cfg(all(feature = "conflux", feature = "hs-service"))]
fn conflux_incoming_stream() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        use std::error::Error as _;

        /// The hop named in the `allow_stream_requests` call.
        const EXPECTED_HOP: u8 = 1;

        let TestTunnelCtx {
            tunnel,
            circs,
            conflux_link_rx,
        } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;

        let [mut first_leg, mut second_leg]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

        // Complete the handshake by replying LINKED on both legs.
        let link = await_link_payload(&mut first_leg.chan_rx).await;
        for leg in [&mut first_leg, &mut second_leg] {
            let reply = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
            leg.circ_tx
                .send(rmsg_to_ccmsg(None, reply, false))
                .await
                .unwrap();
        }

        let hs_results = conflux_link_rx.await.unwrap().unwrap();
        assert!(hs_results.iter().all(|res| res.is_ok()));

        let accepted_cmds = [tor_cell::relaycell::RelayCmd::BEGIN];
        let err = tunnel
            .allow_stream_requests(
                &accepted_cmds,
                (tunnel.circ.unique_id(), EXPECTED_HOP.into()).into(),
                AllowAllStreamsFilter,
            )
            .await
            // Discard the `Ok` payload, so `unwrap_err` does not need it to be `Debug`.
            .map(|_| ())
            .unwrap_err();

        let source_msg = err.source().unwrap().to_string();
        assert!(
            source_msg.contains("Cannot allow stream requests on a multi-path tunnel"),
            "{source_msg}"
        );
    });
}
4066
// Check which channel messages can be converted into a `ClientCircChanMsg`:
// DESTROY and RELAY are accepted; CREATED_FAST, CREATED2, RELAY_EARLY
// and VERSIONS are rejected.
#[test]
fn client_circ_chan_msg() {
    use tor_cell::chancell::msg::{self, AnyChanMsg};

    /// Assert that converting `m` succeeds (if `accepted`) or fails (otherwise).
    fn check(m: AnyChanMsg, accepted: bool) {
        let converted = ClientCircChanMsg::try_from(m);
        if accepted {
            assert!(converted.is_ok());
        } else {
            assert!(converted.is_err());
        }
    }

    check(msg::Destroy::new(2.into()).into(), true);
    check(
        msg::CreatedFast::new(&b"guaranteed in this world"[..]).into(),
        false,
    );
    check(msg::Created2::new(&b"and the next"[..]).into(), false);
    check(msg::Relay::new(&b"guaranteed guaranteed"[..]).into(), true);
    check(
        msg::AnyChanMsg::RelayEarly(
            msg::Relay::new(&b"for the world and its mother"[..]).into(),
        ),
        false,
    );
    check(msg::Versions::new([1, 2, 3]).unwrap().into(), false);
}
4086}