1use crate::factory::BootstrapReporter;
4use crate::mgr::state::{ChannelForTarget, PendingChannelHandle};
5use crate::util::defer::Defer;
6use crate::{ChanProvenance, ChannelConfig, ChannelUsage, Dormancy, Error, Result};
7
8use async_trait::async_trait;
9use futures::future::Shared;
10use oneshot_fused_workaround as oneshot;
11use std::result::Result as StdResult;
12use std::sync::Arc;
13use std::time::Duration;
14use tor_error::{error_report, internal};
15use tor_linkspec::{HasChanMethod, HasRelayIds};
16use tor_netdir::params::NetParameters;
17use tor_proto::channel::kist::KistParams;
18use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
19use tor_proto::memquota::{ChannelAccount, SpecificAccount as _, ToplevelAccount};
20use tracing::{instrument, trace};
21
22#[cfg(feature = "relay")]
23use {safelog::Sensitive, std::net::SocketAddr, tor_proto::RelayChannelAuthMaterial};
24
25mod select;
26mod state;
27
/// The operations the channel manager needs from a channel object.
///
/// Every implementor must also expose its relay identities through
/// [`HasRelayIds`].
pub(crate) trait AbstractChannel: HasRelayIds {
    /// Return true if this channel is considered canonical.
    ///
    /// NOTE(review): the exact definition of "canonical" is not visible in
    /// this file; confirm against the concrete implementation.
    fn is_canonical(&self) -> bool;
    /// Return true if this channel is canonical from the peer's side.
    ///
    /// NOTE(review): semantics inferred from the name only; confirm against
    /// the concrete implementation.
    fn is_canonical_to_peer(&self) -> bool;
    /// Return true if this channel can still be used.
    fn is_usable(&self) -> bool;
    /// Return how long this channel has gone unused, if that is known.
    ///
    /// NOTE(review): presumably `None` means the channel is currently in
    /// use; confirm against the concrete implementation.
    fn duration_unused(&self) -> Option<Duration>;

    /// Apply the given padding-instruction `updates` to this channel.
    fn reparameterize(
        &self,
        updates: Arc<ChannelPaddingInstructionsUpdates>,
    ) -> tor_proto::Result<()>;

    /// Apply new KIST parameters to this channel.
    fn reparameterize_kist(&self, kist_params: KistParams) -> tor_proto::Result<()>;

    /// Tell the channel to start its padding activities.
    ///
    /// `AbstractChanMgr::get_or_launch` calls this on any channel it hands
    /// out for `ChannelUsage::UserTraffic`.
    fn engage_padding_activities(&self);
}
68
/// Something that knows how to build the channels a manager hands out.
#[async_trait]
pub(crate) trait AbstractChannelFactory {
    /// The type of channel this factory produces.
    type Channel: AbstractChannel;
    /// What the factory needs to know to build a channel: the target's relay
    /// identities and how to reach it.
    type BuildSpec: HasRelayIds + HasChanMethod;
    /// The type of an already-accepted incoming connection, as consumed by
    /// `build_channel_using_incoming`.
    type Stream;

    /// Build a new channel to `target`.
    ///
    /// `reporter` and `memquota` are handed to the factory unchanged by the
    /// manager; the manager creates a fresh `ChannelAccount` for every call.
    async fn build_channel(
        &self,
        target: &Self::BuildSpec,
        reporter: BootstrapReporter,
        memquota: ChannelAccount,
    ) -> Result<Arc<Self::Channel>>;

    /// Build a channel on top of an incoming `stream` that came from `peer`.
    ///
    /// Only available with the `relay` feature.
    #[cfg(feature = "relay")]
    async fn build_channel_using_incoming(
        &self,
        peer: Sensitive<std::net::SocketAddr>,
        stream: Self::Stream,
        memquota: ChannelAccount,
    ) -> Result<Arc<Self::Channel>>;
}
105
/// Configuration bundle for a channel manager.
///
/// Construct with [`ChanMgrConfig::new`] and refine with the `with_*`
/// builder methods.
#[derive(Default)]
pub struct ChanMgrConfig {
    /// The general channel configuration.
    pub(crate) cfg: ChannelConfig,
    /// Authentication material for relay channels, if any has been provided.
    ///
    /// Only present with the `relay` feature.
    #[cfg(feature = "relay")]
    pub(crate) auth_material: Option<Arc<RelayChannelAuthMaterial>>,
    /// Socket addresses associated with this node.
    ///
    /// NOTE(review): presumably the addresses this relay is reachable at;
    /// confirm against the consumer of this field.
    ///
    /// Only present with the `relay` feature.
    #[cfg(feature = "relay")]
    pub(crate) my_addrs: Vec<SocketAddr>,
}
120
121impl ChanMgrConfig {
122 pub fn new(cfg: ChannelConfig) -> Self {
124 Self {
125 cfg,
126 #[cfg(feature = "relay")]
127 auth_material: None,
128 #[cfg(feature = "relay")]
129 my_addrs: Vec::new(),
130 }
131 }
132
133 #[cfg(feature = "relay")]
135 pub fn with_auth_material(mut self, auth_material: Arc<RelayChannelAuthMaterial>) -> Self {
136 self.auth_material = Some(auth_material);
137 self
138 }
139
140 #[cfg(feature = "relay")]
142 pub fn with_my_addrs(mut self, my_addrs: Vec<SocketAddr>) -> Self {
143 self.my_addrs = my_addrs;
144 self
145 }
146}
147
/// A channel manager that is generic over the factory used to build its
/// channels.
pub(crate) struct AbstractChanMgr<CF: AbstractChannelFactory> {
    /// The shared state: the channel factory plus the map of open and pending
    /// channels.
    pub(crate) channels: state::MgrState<CF>,

    /// Bootstrap reporter; a clone is passed to every `build_channel` call.
    pub(crate) reporter: BootstrapReporter,

    /// Top-level memory-quota account from which a per-channel
    /// `ChannelAccount` is created for each new channel.
    pub(crate) memquota: ToplevelAccount,
}
169
/// The receiving half used to wait for a channel that another task is
/// building.
///
/// It is a `Shared` future, so several waiters can await the same outcome.
/// It carries only success or failure, not the channel itself.
type Pending = Shared<oneshot::Receiver<Result<()>>>;

/// The sending half held by the task that is building a channel; used to
/// report the outcome of the build to every [`Pending`] waiter.
type Sending = oneshot::Sender<Result<()>>;
177
178impl<CF: AbstractChannelFactory + Clone> AbstractChanMgr<CF> {
179 pub(crate) fn new(
181 connector: CF,
182 config: ChannelConfig,
183 dormancy: Dormancy,
184 netparams: &NetParameters,
185 reporter: BootstrapReporter,
186 memquota: ToplevelAccount,
187 ) -> Self {
188 AbstractChanMgr {
189 channels: state::MgrState::new(connector, config, dormancy, netparams),
190 reporter,
191 memquota,
192 }
193 }
194
    /// Run `func` with mutable access to the underlying channel factory.
    ///
    /// This simply forwards to `MgrState::with_mut_builder`.
    // NOTE(review): `allow(unused)` is presumably here because this is only
    // called in some build configurations; confirm before removing.
    #[allow(unused)]
    pub(crate) fn with_mut_builder<F>(&self, func: F)
    where
        F: FnOnce(&mut CF),
    {
        self.channels.with_mut_builder(func);
    }
203
    /// Test-only helper: ask the channel state to drop its unusable entries.
    ///
    /// Forwards to `MgrState::remove_unusable` and returns its result.
    #[cfg(test)]
    pub(crate) fn remove_unusable_entries(&self) -> Result<()> {
        self.channels.remove_unusable()
    }
209
210 #[cfg(feature = "relay")]
213 pub(crate) async fn handle_incoming(
214 &self,
215 src: Sensitive<std::net::SocketAddr>,
216 stream: CF::Stream,
217 ) -> Result<Arc<CF::Channel>> {
218 let chan_builder = self.channels.builder();
219 let memquota = ChannelAccount::new(&self.memquota)?;
220 let channel = chan_builder
221 .build_channel_using_incoming(src, stream, memquota)
222 .await?;
223 self.channels.add_open(channel.clone())?;
225 Ok(channel)
226 }
227
228 #[instrument(skip_all, level = "trace")]
238 pub(crate) async fn get_or_launch(
239 &self,
240 target: CF::BuildSpec,
241 usage: ChannelUsage,
242 ) -> Result<(Arc<CF::Channel>, ChanProvenance)> {
243 use ChannelUsage as CU;
244
245 let chan = self.get_or_launch_internal(target).await?;
246
247 match usage {
248 CU::Dir | CU::UselessCircuit => {}
249 CU::UserTraffic => chan.0.engage_padding_activities(),
250 }
251
252 Ok(chan)
253 }
254
    /// Get a channel to `target`, launching one if necessary, without
    /// applying any usage-specific behavior.
    ///
    /// Each pass through the loop asks `choose_action` what to do given the
    /// current channel state, then does it: return an existing channel (or an
    /// error), wait for a build that is already in progress, or launch a
    /// build ourselves.
    ///
    /// At most `N_ATTEMPTS` regular passes are made.  After successfully
    /// waiting for someone else's build, one extra "final" pass is made that
    /// only looks for an open channel and never waits or launches.
    ///
    /// If no pass yields a channel, the most recent error is returned.
    #[allow(clippy::cognitive_complexity)]
    #[instrument(skip_all, level = "trace")]
    async fn get_or_launch_internal(
        &self,
        target: CF::BuildSpec,
    ) -> Result<(Arc<CF::Channel>, ChanProvenance)> {
        // Maximum number of regular (non-final) passes.
        const N_ATTEMPTS: usize = 2;
        let mut attempts_so_far = 0;
        // Set once a build we were waiting on reports success; the next pass
        // then only re-checks the channel state.
        let mut final_attempt = false;
        // What to report if we end up returning a channel via `Action::Return`.
        let mut provenance = ChanProvenance::Preexisting;

        // The error to return if we run out of attempts.
        let mut last_err = None;

        while attempts_so_far < N_ATTEMPTS || final_attempt {
            attempts_so_far += 1;

            // First inspect the channel state and decide what to do...
            let action = self.choose_action(&target, final_attempt)?;

            // ...then carry out that decision.
            match action {
                // `choose_action` yields `None` when the final pass found
                // nothing it could return.  Outside the final pass that is
                // unexpected, so it is reported as an internal error.
                None => {
                    if !final_attempt {
                        return Err(Error::Internal(internal!(
                            "No action returned while not on final attempt"
                        )));
                    }
                    break;
                }
                // We have a definite answer (a channel or an error).
                Some(Action::Return(v)) => {
                    trace!("Returning existing channel");
                    return v.map(|chan| (chan, provenance));
                }
                // Somebody else is already building a channel: wait for them.
                Some(Action::Wait(pend)) => {
                    trace!("Waiting for in-progress channel");
                    match pend.await {
                        Ok(Ok(())) => {
                            // The builder reported success.  Make one final
                            // pass to look the open channel up; whatever that
                            // pass returns is reported as newly created.
                            final_attempt = true;
                            provenance = ChanProvenance::NewlyCreated;
                            // If the final pass finds nothing, the loop breaks
                            // and `last_err` is returned, so make sure it is
                            // set, without clobbering an earlier error.
                            last_err.get_or_insert(Error::RequestCancelled);
                        }
                        // The builder reported a failure.
                        Ok(Err(e)) => {
                            last_err = Some(e);
                        }
                        // The sending half went away without reporting.
                        Err(_) => {
                            last_err =
                                Some(Error::Internal(internal!("channel build task disappeared")));
                        }
                    }
                }
                // Nobody is building this channel yet: it is our job.
                Some(Action::Launch((handle, send))) => {
                    trace!("Launching channel");
                    // Guard that removes our pending entry from the channel
                    // state unless it is cancelled first.
                    // NOTE(review): presumably `Defer` runs its closure on
                    // drop, which would also cover the `?` return below and
                    // this future being dropped mid-build; confirm in
                    // `util::defer`.
                    let defer_remove_pending = Defer::new(handle, |handle| {
                        if let Err(e) = self.channels.remove_pending_channel(handle) {
                            #[allow(clippy::missing_docs_in_private_items)]
                            const MSG: &str = "Unable to remove the pending channel";
                            error_report!(internal!("{e}"), "{}", MSG);
                        }
                    });

                    let connector = self.channels.builder();
                    let memquota = ChannelAccount::new(&self.memquota)?;

                    let outcome = connector
                        .build_channel(&target, self.reporter.clone(), memquota)
                        .await;

                    match outcome {
                        Ok(ref chan) => {
                            // Take the handle back out of the guard and use it
                            // to turn the pending entry into an open one.
                            let handle = defer_remove_pending.cancel();
                            self.channels
                                .upgrade_pending_channel_to_open(handle, Arc::clone(chan))?;
                        }
                        Err(_) => {
                            // The build failed: let the guard remove the
                            // pending entry now.
                            drop(defer_remove_pending);
                        }
                    }

                    // Tell any waiters how the build went.  A send failure is
                    // deliberately ignored.
                    // NOTE(review): presumably it only fails when nobody is
                    // waiting any more; confirm.
                    let _ignore_err = send.send(outcome.clone().map(|_| ()));

                    match outcome {
                        Ok(chan) => {
                            return Ok((chan, ChanProvenance::NewlyCreated));
                        }
                        Err(e) => last_err = Some(e),
                    }
                }
            }

            // This attempt did not produce a channel; go around again if we
            // are allowed to.
        }

        Err(last_err.unwrap_or_else(|| Error::Internal(internal!("no error was set!?"))))
    }
383
384 #[instrument(skip_all, level = "trace")]
393 fn choose_action(
394 &self,
395 target: &CF::BuildSpec,
396 final_attempt: bool,
397 ) -> Result<Option<Action<CF::Channel>>> {
398 let response = self.channels.request_channel(
400 target,
401 !final_attempt,
402 );
403
404 match response {
405 Ok(Some(ChannelForTarget::Open(channel))) => Ok(Some(Action::Return(Ok(channel)))),
406 Ok(Some(ChannelForTarget::Pending(pending))) => {
407 if !final_attempt {
408 Ok(Some(Action::Wait(pending)))
409 } else {
410 Ok(None)
412 }
413 }
414 Ok(Some(ChannelForTarget::NewEntry((handle, send)))) => {
415 Ok(Some(Action::Launch((handle, send))))
417 }
418 Ok(None) => Ok(None),
419 Err(e @ Error::IdentityConflict) => Ok(Some(Action::Return(Err(e)))),
420 Err(e) => Err(e),
421 }
422 }
423
    /// Apply new network parameters, leaving the channel configuration and
    /// dormancy unchanged.
    ///
    /// Forwards to `MgrState::reconfigure_general` with no config and no
    /// dormancy change.
    pub(crate) fn update_netparams(
        &self,
        netparams: Arc<dyn AsRef<NetParameters>>,
    ) -> StdResult<(), tor_error::Bug> {
        self.channels.reconfigure_general(None, None, netparams)
    }
431
    /// Change the dormancy setting, leaving the channel configuration
    /// unchanged.
    ///
    /// Forwards to `MgrState::reconfigure_general` with the new `dormancy`
    /// and the supplied `netparams`.
    pub(crate) fn set_dormancy(
        &self,
        dormancy: Dormancy,
        netparams: Arc<dyn AsRef<NetParameters>>,
    ) -> StdResult<(), tor_error::Bug> {
        self.channels
            .reconfigure_general(None, Some(dormancy), netparams)
    }
441
    /// Apply a new channel configuration, leaving dormancy unchanged.
    ///
    /// Forwards to `MgrState::reconfigure_general` with the new `config` and
    /// the supplied `netparams`.
    pub(crate) fn reconfigure(
        &self,
        config: &ChannelConfig,
        netparams: Arc<dyn AsRef<NetParameters>>,
    ) -> StdResult<(), tor_error::Bug> {
        self.channels
            .reconfigure_general(Some(config), None, netparams)
    }
451
    /// Ask the channel state to expire channels, returning whatever
    /// `MgrState::expire_channels` returns.
    ///
    /// NOTE(review): the returned `Duration` is presumably how long the
    /// caller should wait before calling this again; confirm in `state`.
    pub(crate) fn expire_channels(&self) -> Duration {
        self.channels.expire_channels()
    }
463
464 #[cfg(test)]
466 pub(crate) fn get_nowait<'a, T>(&self, ident: T) -> Vec<Arc<CF::Channel>>
467 where
468 T: Into<tor_linkspec::RelayIdRef<'a>>,
469 {
470 use state::ChannelState::*;
471 self.channels
472 .with_channels(|channel_map| {
473 channel_map
474 .by_id(ident)
475 .filter_map(|entry| match entry {
476 Open(ent) if ent.channel.is_usable() => Some(Arc::clone(&ent.channel)),
477 _ => None,
478 })
479 .collect()
480 })
481 .expect("Poisoned lock")
482 }
483}
484
/// What `get_or_launch_internal` should do next, as decided by
/// `choose_action`.
// NOTE(review): the size lint is presumably silenced because the variants
// differ a lot in size and boxing was judged not worth it; confirm.
#[allow(clippy::large_enum_variant)]
enum Action<C: AbstractChannel> {
    /// Build a new channel ourselves.
    ///
    /// Carries the handle of our pending entry in the channel state and the
    /// sender used to tell waiters how the build went.
    Launch((PendingChannelHandle, Sending)),
    /// Another task is already building a channel: wait for its outcome.
    Wait(Pending),
    /// Hand this result (an open channel or an error) straight back to the
    /// caller.
    Return(Result<Arc<C>>),
}
497
498#[cfg(test)]
499mod test {
500 #![allow(clippy::bool_assert_comparison)]
502 #![allow(clippy::clone_on_copy)]
503 #![allow(clippy::dbg_macro)]
504 #![allow(clippy::mixed_attributes_style)]
505 #![allow(clippy::print_stderr)]
506 #![allow(clippy::print_stdout)]
507 #![allow(clippy::single_char_pattern)]
508 #![allow(clippy::unwrap_used)]
509 #![allow(clippy::unchecked_time_subtraction)]
510 #![allow(clippy::useless_vec)]
511 #![allow(clippy::needless_pass_by_value)]
512 use super::*;
514 use crate::Error;
515
516 use futures::join;
517 use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
518 use std::sync::Arc;
519 use std::sync::atomic::{AtomicBool, Ordering};
520 use std::time::Duration;
521 use tor_error::bad_api_usage;
522 use tor_linkspec::ChannelMethod;
523 use tor_llcrypto::pk::ed25519::Ed25519Identity;
524 use tor_memquota::ArcMemoryQuotaTrackerExt as _;
525
526 use crate::ChannelUsage as CU;
527 use tor_rtcompat::{Runtime, task::yield_now, test_with_one_runtime};
528
    /// First fake target address (1.1.1.1:443) used by the tests.
    const ADDR_A: SocketAddr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(1, 1, 1, 1), 443));
    /// Second fake target address (2.2.2.2:443), used where a test needs an
    /// address that differs from `ADDR_A`.
    const ADDR_B: SocketAddr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(2, 2, 2, 2), 443));
532
    /// A channel factory for tests: it produces `FakeChannel`s without doing
    /// any networking.
    #[derive(Clone)]
    struct FakeChannelFactory<RT> {
        /// Runtime used to sleep when a build is asked to be slow.
        runtime: RT,
    }
537
    /// A stand-in channel produced by `FakeChannelFactory`.
    #[derive(Clone, Debug)]
    struct FakeChannel {
        /// The Ed25519 identity this channel claims to be connected to.
        ed_ident: Ed25519Identity,
        /// The "mood" character copied from the build spec that created this
        /// channel; tests use it to tell which request built a channel.
        mood: char,
        /// Set by `start_closing`; once true, `is_usable` returns false.
        closing: Arc<AtomicBool>,
        /// A per-build allocation whose pointer identity drives `PartialEq`:
        /// two `FakeChannel`s compare equal only if they share it.
        detect_reuse: Arc<char>,
    }
546
    /// Equality is identity: two values are equal only when they come from
    /// the same build (or are clones of it), regardless of field contents.
    impl PartialEq for FakeChannel {
        fn eq(&self, other: &Self) -> bool {
            // Compare the allocation, not the `char` stored inside it.
            Arc::ptr_eq(&self.detect_reuse, &other.detect_reuse)
        }
    }
552
    /// Minimal `AbstractChannel` implementation: only usability is modelled;
    /// reparameterization and padding are no-ops.
    impl AbstractChannel for FakeChannel {
        /// Not needed by these tests; panics if called.
        fn is_canonical(&self) -> bool {
            unimplemented!()
        }
        /// Not needed by these tests; panics if called.
        fn is_canonical_to_peer(&self) -> bool {
            unimplemented!()
        }
        /// Usable until `start_closing` has been called.
        fn is_usable(&self) -> bool {
            !self.closing.load(Ordering::SeqCst)
        }
        /// Always reports `None`.
        fn duration_unused(&self) -> Option<Duration> {
            None
        }
        /// Accepts and ignores the update.
        fn reparameterize(
            &self,
            _updates: Arc<ChannelPaddingInstructionsUpdates>,
        ) -> tor_proto::Result<()> {
            Ok(())
        }
        /// Accepts and ignores the parameters.
        fn reparameterize_kist(&self, _kist_params: KistParams) -> tor_proto::Result<()> {
            Ok(())
        }
        /// Does nothing.
        fn engage_padding_activities(&self) {}
    }
578
579 impl HasRelayIds for FakeChannel {
580 fn identity(
581 &self,
582 key_type: tor_linkspec::RelayIdType,
583 ) -> Option<tor_linkspec::RelayIdRef<'_>> {
584 match key_type {
585 tor_linkspec::RelayIdType::Ed25519 => Some((&self.ed_ident).into()),
586 _ => None,
587 }
588 }
589 }
590
    impl FakeChannel {
        /// Mark this channel as closing, so that `is_usable` returns false
        /// from now on (for this value and every clone sharing the flag).
        fn start_closing(&self) {
            self.closing.store(true, Ordering::SeqCst);
        }
    }
596
    impl<RT: Runtime> FakeChannelFactory<RT> {
        /// Create a factory that uses `runtime` for its sleeps.
        fn new(runtime: RT) -> Self {
            FakeChannelFactory { runtime }
        }
    }
602
603 fn new_test_abstract_chanmgr<R: Runtime>(runtime: R) -> AbstractChanMgr<FakeChannelFactory<R>> {
604 let cf = FakeChannelFactory::new(runtime);
605 AbstractChanMgr::new(
606 cf,
607 Default::default(),
608 Default::default(),
609 &Default::default(),
610 BootstrapReporter::fake(),
611 ToplevelAccount::new_noop(),
612 )
613 }
614
    /// Build instructions understood by `FakeChannelFactory`.
    ///
    /// The fields are, in order:
    ///  * a numeric id, which must map to the identity below via `u32_to_ed`
    ///    (`build_channel` asserts this);
    ///  * a "mood" character: '❌' and '🔥' make the build fail, '💤' makes it
    ///    sleep first, anything else succeeds immediately;
    ///  * the Ed25519 identity of the target;
    ///  * the address of the target.
    #[derive(Clone, Debug)]
    struct FakeBuildSpec(u32, char, Ed25519Identity, SocketAddr);
617
618 impl HasRelayIds for FakeBuildSpec {
619 fn identity(
620 &self,
621 key_type: tor_linkspec::RelayIdType,
622 ) -> Option<tor_linkspec::RelayIdRef<'_>> {
623 match key_type {
624 tor_linkspec::RelayIdType::Ed25519 => Some((&self.2).into()),
625 _ => None,
626 }
627 }
628 }
629
630 impl HasChanMethod for FakeBuildSpec {
631 fn chan_method(&self) -> ChannelMethod {
632 ChannelMethod::Direct(vec![self.3.clone()])
633 }
634 }
635
636 fn u32_to_ed(n: u32) -> Ed25519Identity {
638 let mut bytes = [0; 32];
639 bytes[0..4].copy_from_slice(&n.to_be_bytes());
640 bytes.into()
641 }
642
643 #[async_trait]
644 impl<RT: Runtime> AbstractChannelFactory for FakeChannelFactory<RT> {
645 type Channel = FakeChannel;
646 type BuildSpec = FakeBuildSpec;
647 type Stream = ();
648
649 async fn build_channel(
650 &self,
651 target: &Self::BuildSpec,
652 _reporter: BootstrapReporter,
653 _memquota: ChannelAccount,
654 ) -> Result<Arc<FakeChannel>> {
655 yield_now().await;
656 let FakeBuildSpec(ident, mood, id, _addr) = *target;
657 let ed_ident = u32_to_ed(ident);
658 assert_eq!(ed_ident, id);
659 match mood {
660 '❌' | '🔥' => return Err(Error::UnusableTarget(bad_api_usage!("emoji"))),
662 '💤' => {
664 self.runtime.sleep(Duration::new(15, 0)).await;
665 }
666 _ => {}
667 }
668 Ok(Arc::new(FakeChannel {
669 ed_ident,
670 mood,
671 closing: Arc::new(AtomicBool::new(false)),
672 detect_reuse: Default::default(),
673 }))
675 }
676
677 #[cfg(feature = "relay")]
678 async fn build_channel_using_incoming(
679 &self,
680 _peer: Sensitive<std::net::SocketAddr>,
681 _stream: Self::Stream,
682 _memquota: ChannelAccount,
683 ) -> Result<Arc<Self::Channel>> {
684 unimplemented!()
685 }
686 }
687
    /// Two sequential requests for the same target must yield the same
    /// channel, and that channel must be the only usable one registered for
    /// the identity.
    #[test]
    fn connect_one_ok() {
        test_with_one_runtime!(|runtime| async {
            let mgr = new_test_abstract_chanmgr(runtime);
            let target = FakeBuildSpec(413, '!', u32_to_ed(413), ADDR_A);
            let chan1 = mgr
                .get_or_launch(target.clone(), CU::UserTraffic)
                .await
                .unwrap()
                .0;
            // The second request should find the channel built by the first.
            let chan2 = mgr.get_or_launch(target, CU::UserTraffic).await.unwrap().0;

            assert_eq!(chan1, chan2);
            assert_eq!(mgr.get_nowait(&u32_to_ed(413)), vec![chan1]);
        });
    }
704
    /// A failed build must surface the factory's error and leave no usable
    /// channel behind for that identity.
    #[test]
    fn connect_one_fail() {
        test_with_one_runtime!(|runtime| async {
            let mgr = new_test_abstract_chanmgr(runtime);

            // The '❌' mood makes the fake factory fail the build.
            let target = FakeBuildSpec(999, '❌', u32_to_ed(999), ADDR_A);
            let res1 = mgr.get_or_launch(target, CU::UserTraffic).await;
            assert!(matches!(res1, Err(Error::UnusableTarget(_))));

            assert!(mgr.get_nowait(&u32_to_ed(999)).is_empty());
        });
    }
718
    /// A request for an identity we already have a channel to must reuse
    /// that channel, even if the request names a different address.
    #[test]
    fn connect_different_address() {
        test_with_one_runtime!(|runtime| async {
            let mgr = new_test_abstract_chanmgr(runtime);

            let target1 = FakeBuildSpec(413, '!', u32_to_ed(413), ADDR_A);
            // Same identity as `target1`, but at another address.
            let mut target2 = target1.clone();
            target2.3 = ADDR_B;

            let chan1 = mgr.get_or_launch(target1, CU::UserTraffic).await.unwrap().0;
            let chan2 = mgr.get_or_launch(target2, CU::UserTraffic).await.unwrap().0;

            // The address difference alone does not produce a second channel.
            assert_eq!(chan1, chan2);
            assert_eq!(mgr.get_nowait(&u32_to_ed(413)), vec![chan1]);
        });
    }
737
    /// Concurrent requests for the same identity must share one channel
    /// (or, if the build fails, must all see the failure), while requests
    /// for different identities get different channels.
    #[test]
    fn test_concurrent() {
        test_with_one_runtime!(|runtime| async {
            let mgr = new_test_abstract_chanmgr(runtime);

            let usage = CU::UserTraffic;

            // Requests come in pairs per identity: one of each pair launches
            // the build, and the other has to wait for it.
            let (ch3a, ch3b, ch44a, ch44b, ch50a, ch50b, ch86a, ch86b) = join!(
                mgr.get_or_launch(FakeBuildSpec(3, 'a', u32_to_ed(3), ADDR_A), usage),
                mgr.get_or_launch(FakeBuildSpec(3, 'b', u32_to_ed(3), ADDR_A), usage),
                mgr.get_or_launch(FakeBuildSpec(44, 'a', u32_to_ed(44), ADDR_A), usage),
                mgr.get_or_launch(FakeBuildSpec(44, 'b', u32_to_ed(44), ADDR_A), usage),
                mgr.get_or_launch(FakeBuildSpec(50, 'a', u32_to_ed(50), ADDR_A), usage),
                mgr.get_or_launch(FakeBuildSpec(50, 'b', u32_to_ed(50), ADDR_B), usage),
                mgr.get_or_launch(FakeBuildSpec(86, '❌', u32_to_ed(86), ADDR_A), usage),
                mgr.get_or_launch(FakeBuildSpec(86, '🔥', u32_to_ed(86), ADDR_A), usage),
            );
            let ch3a = ch3a.unwrap();
            let ch3b = ch3b.unwrap();
            let ch44a = ch44a.unwrap();
            let ch44b = ch44b.unwrap();
            let ch50a = ch50a.unwrap();
            let ch50b = ch50b.unwrap();
            let err_a = ch86a.unwrap_err();
            let err_b = ch86b.unwrap_err();

            // Same identity: same channel and same provenance.
            assert_eq!(ch3a, ch3b);
            assert_eq!(ch44a, ch44b);
            assert_eq!(ch50a, ch50b);
            // Different identity: different channel.
            assert_ne!(ch44a, ch3a);

            // Both requests for the failing identity report the failure.
            assert!(matches!(err_a, Error::UnusableTarget(_)));
            assert!(matches!(err_b, Error::UnusableTarget(_)));
        });
    }
776
    /// A channel that has started closing must be replaced on the next
    /// request, and `remove_unusable_entries` must leave only usable
    /// channels visible.
    #[test]
    fn unusable_entries() {
        test_with_one_runtime!(|runtime| async {
            let mgr = new_test_abstract_chanmgr(runtime);

            let (ch3, ch4, ch5) = join!(
                mgr.get_or_launch(FakeBuildSpec(3, 'a', u32_to_ed(3), ADDR_A), CU::UserTraffic),
                mgr.get_or_launch(FakeBuildSpec(4, 'a', u32_to_ed(4), ADDR_A), CU::UserTraffic),
                mgr.get_or_launch(FakeBuildSpec(5, 'a', u32_to_ed(5), ADDR_A), CU::UserTraffic),
            );

            let ch3 = ch3.unwrap().0;
            let _ch4 = ch4.unwrap();
            let ch5 = ch5.unwrap().0;

            // Make channels 3 and 5 unusable; channel 4 stays healthy.
            ch3.start_closing();
            ch5.start_closing();

            // Asking for identity 3 again must build a replacement (mood 'b')
            // rather than hand back the closing channel.
            let ch3_new = mgr
                .get_or_launch(FakeBuildSpec(3, 'b', u32_to_ed(3), ADDR_A), CU::UserTraffic)
                .await
                .unwrap()
                .0;
            assert_ne!(ch3, ch3_new);
            assert_eq!(ch3_new.mood, 'b');

            mgr.remove_unusable_entries().unwrap();

            // 3 has its replacement, 4 was never closed, and 5 was closed
            // without being replaced.
            assert!(!mgr.get_nowait(&u32_to_ed(3)).is_empty());
            assert!(!mgr.get_nowait(&u32_to_ed(4)).is_empty());
            assert!(mgr.get_nowait(&u32_to_ed(5)).is_empty());
        });
    }
810}