1use async_trait::async_trait;
4use futures::{AsyncRead, AsyncWrite};
5use std::io;
6use std::sync::{Arc, Mutex};
7use std::time::Duration;
8use tracing::instrument;
9
10use crate::factory::{BootstrapReporter, ChannelFactory, IncomingChannelFactory};
11use crate::transport::TransportImplHelper;
12use crate::{Error, event::ChanMgrEventSender};
13
14use safelog::MaybeSensitive;
15use tor_basic_utils::rand_hostname;
16use tor_error::internal;
17use tor_linkspec::{ChanTarget, HasChanMethod, IntoOwnedChanTarget, OwnedChanTarget};
18use tor_proto::channel::ChannelType;
19use tor_proto::channel::kist::KistParams;
20use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
21use tor_proto::memquota::ChannelAccount;
22use tor_proto::peer::PeerAddr;
23use tor_rtcompat::SpawnExt;
24use tor_rtcompat::{CertifiedConn, Runtime, StreamOps, TlsProvider, tls::TlsConnector};
25
26#[cfg(feature = "relay")]
27use {
28 safelog::Sensitive, std::net::SocketAddr, tor_error::bad_api_usage,
29 tor_proto::RelayChannelAuthMaterial, tor_proto::relay::CreateRequestHandler,
30};
31
/// Builds Tor channels on top of a transport `H`, using the runtime `R` for TLS.
///
/// A builder made with `new_client` only opens outbound client channels.
/// When the `relay` feature is enabled, a builder made with `new_relay` also
/// carries the state needed to open relay channels and to accept inbound
/// connections.
pub struct ChanBuilder<R: Runtime, H: TransportImplHelper>
where
    R: tor_rtcompat::TlsProvider<H::Stream>,
{
    /// Asynchronous runtime; used for timeouts, the wall clock, spawning
    /// channel reactors, and as the TLS provider.
    runtime: R,
    /// Transport helper used to open the underlying stream to a target.
    transport: H,
    /// TLS connector used to negotiate TLS on outbound streams.
    tls_connector: <R as TlsProvider<H::Stream>>::Connector,
    /// TLS acceptor used to negotiate TLS on inbound streams.
    ///
    /// `None` for builders created with `new_client`.
    #[cfg(feature = "relay")]
    tls_acceptor: Option<<R as TlsProvider<H::Stream>>::Acceptor>,
    /// Relay authentication material.
    ///
    /// `None` for builders created with `new_client`; when present, outbound
    /// channels are opened as `ChannelType::RelayInitiator`.
    #[cfg(feature = "relay")]
    auth_material: Option<Arc<RelayChannelAuthMaterial>>,
    /// Addresses this builder treats as its own.
    ///
    /// Outbound targets listing any of these addresses are rejected, and the
    /// list is handed to the relay channel builder.
    #[cfg(feature = "relay")]
    my_addrs: Vec<SocketAddr>,
    /// Handler for CREATE* requests, handed to every relay channel.
    ///
    /// Relay channels cannot be built or accepted while this is `None`.
    #[cfg(feature = "relay")]
    create_request_handler: Option<Arc<CreateRequestHandler>>,
}
69
70impl<R: Runtime, H: TransportImplHelper> ChanBuilder<R, H>
71where
72 R: TlsProvider<H::Stream>,
73{
74 pub fn new_client(runtime: R, transport: H) -> Self {
76 let tls_connector = <R as TlsProvider<H::Stream>>::tls_connector(&runtime);
77 ChanBuilder {
78 runtime,
79 transport,
80 tls_connector,
81 #[cfg(feature = "relay")]
82 tls_acceptor: None,
83 #[cfg(feature = "relay")]
84 auth_material: None,
85 #[cfg(feature = "relay")]
86 my_addrs: Vec::new(),
87 #[cfg(feature = "relay")]
88 create_request_handler: None,
89 }
90 }
91
92 #[cfg(feature = "relay")]
94 pub fn new_relay(
95 runtime: R,
96 transport: H,
97 auth_material: Arc<RelayChannelAuthMaterial>,
98 my_addrs: Vec<SocketAddr>,
99 create_request_handler: Option<Arc<CreateRequestHandler>>,
100 ) -> crate::Result<Self> {
101 use tor_error::into_internal;
102 use tor_rtcompat::tls::TlsAcceptorSettings;
103
104 let tls_settings = TlsAcceptorSettings::new(auth_material.tls_key_and_cert().clone())
106 .map_err(into_internal!("Unable to build TLS acceptor setting"))?;
107 let tls_acceptor = <R as TlsProvider<H::Stream>>::tls_acceptor(&runtime, tls_settings)
108 .map_err(into_internal!("Unable to build TLS acceptor"))?;
109
110 let mut builder = Self::new_client(runtime, transport);
112 builder.auth_material = Some(auth_material);
113 builder.tls_acceptor = Some(tls_acceptor);
114 builder.my_addrs = my_addrs;
115 builder.create_request_handler = create_request_handler;
116
117 Ok(builder)
118 }
119
120 #[cfg(feature = "relay")]
125 pub fn rebuild_with_auth_material(
126 &self,
127 auth_material: Arc<RelayChannelAuthMaterial>,
128 ) -> crate::Result<Self>
129 where
130 H: Clone,
131 {
132 Self::new_relay(
133 self.runtime.clone(),
134 self.transport.clone(),
135 auth_material,
136 self.my_addrs.clone(),
137 self.create_request_handler.as_ref().map(Arc::clone),
138 )
139 }
140
141 #[cfg(feature = "relay")]
147 pub fn rebuild_with_create_request_handler(
148 &self,
149 handler: Arc<CreateRequestHandler>,
150 ) -> crate::Result<Self>
151 where
152 H: Clone,
153 {
154 let auth_material = self.auth_material.clone().ok_or_else(|| {
155 internal!("Trying to set a CREATE* request handler for a non-relay channel builder")
156 })?;
157
158 Self::new_relay(
159 self.runtime.clone(),
160 self.transport.clone(),
161 auth_material,
162 self.my_addrs.clone(),
163 Some(handler),
164 )
165 }
166
167 fn outbound_chan_type(&self) -> ChannelType {
175 #[cfg(feature = "relay")]
176 if self.auth_material.is_some() {
177 return ChannelType::RelayInitiator;
178 }
179 ChannelType::ClientInitiator
180 }
181}
182
183#[async_trait]
184impl<R: Runtime, H: TransportImplHelper> ChannelFactory for ChanBuilder<R, H>
185where
186 R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
187 H: Send + Sync,
188{
189 #[instrument(skip_all, level = "trace")]
190 async fn connect_via_transport(
191 &self,
192 target: &OwnedChanTarget,
193 reporter: BootstrapReporter,
194 memquota: ChannelAccount,
195 ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
196 use tor_rtcompat::SleepProviderExt;
197
198 let delay = if target.chan_method().is_direct() {
200 std::time::Duration::new(5, 0)
201 } else {
202 std::time::Duration::new(10, 0)
203 };
204
205 self.runtime
206 .timeout(delay, self.connect_no_timeout(target, reporter.0, memquota))
207 .await
208 .map_err(|_| Error::ChanTimeout {
209 peer: target.to_logged(),
210 })?
211 }
212}
213
214#[async_trait]
215impl<R: Runtime, H: TransportImplHelper> IncomingChannelFactory for ChanBuilder<R, H>
216where
217 R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
218 H: Send + Sync,
219{
220 type Stream = H::Stream;
221
222 #[cfg(feature = "relay")]
223 async fn accept_from_transport(
224 &self,
225 peer: Sensitive<std::net::SocketAddr>,
226 stream: Self::Stream,
227 memquota: ChannelAccount,
228 ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
229 use tor_linkspec::OwnedChanTargetBuilder;
230 use tor_proto::relay::MaybeVerifiableRelayResponderChannel;
231
232 let target_no_ids = OwnedChanTargetBuilder::default()
237 .addrs(vec![peer.into_inner()])
238 .build()
239 .map_err(|e| internal!("Unable to build chan target from peer sockaddr: {e}"))?;
240 let peer_addr: MaybeSensitive<PeerAddr> =
242 MaybeSensitive::sensitive(peer.into_inner().into());
243
244 let map_ioe = |ioe, action| Error::Io {
246 action,
247 peer: peer_addr.clone(),
248 source: ioe,
249 };
250 let map_proto = |source, target: &OwnedChanTarget, clock_skew| Error::Proto {
251 source,
252 peer: target.to_logged(),
253 clock_skew,
254 };
255
256 let tls = self
257 .tls_acceptor
258 .as_ref()
259 .ok_or(internal!("Accepting connection without TLS acceptor"))?
260 .negotiate_unvalidated(stream, "ignored")
261 .await
262 .map_err(|e| map_ioe(e.into(), "TLS negotiation"))?;
263 let auth_material = self
264 .auth_material
265 .as_ref()
266 .ok_or(internal!(
267 "Unable to build relay channel without auth material"
268 ))?
269 .clone();
270
271 let our_cert = tls
272 .own_certificate()
273 .map_err(|e| map_ioe(e.into(), "TLS Certs"))?
274 .ok_or_else(|| Error::Internal(internal!("TLS connection without our certificate")))?
275 .into_owned();
276
277 let create_request_handler = self.create_request_handler.as_ref().ok_or_else(|| {
278 bad_api_usage!("Can't create a relay channel without a CREATE* request handler")
279 })?;
280
281 let builder = tor_proto::RelayChannelBuilder::new();
282
283 let unverified = builder
284 .accept(
285 Sensitive::new(peer_addr.inner()),
286 self.my_addrs.clone(),
287 tls,
288 self.runtime.clone(),
289 auth_material,
290 memquota,
291 Arc::clone(create_request_handler),
292 )
293 .handshake(|| self.runtime.wallclock())
294 .await
295 .map_err(|e| map_proto(e, &target_no_ids, None))?;
296
297 let (chan, reactor) = match unverified {
298 MaybeVerifiableRelayResponderChannel::Verifiable(c) => {
299 let clock_skew = c.clock_skew();
300 let now = self.runtime.wallclock();
301 c.verify(&target_no_ids, &our_cert, Some(now))
302 .map_err(|e| map_proto(e, &target_no_ids, Some(clock_skew)))?
303 .finish()
304 .await
305 .map_err(|e| map_proto(e, &target_no_ids, Some(clock_skew)))?
306 }
307 MaybeVerifiableRelayResponderChannel::NonVerifiable(c) => {
308 c.finish().map_err(|e| map_proto(e, &target_no_ids, None))?
309 }
310 };
311
312 self.runtime
314 .spawn(async {
315 let _ = reactor.run().await;
316 })
317 .map_err(|e| Error::from_spawn("responder channel reactor", e))?;
318
319 Ok(chan)
320 }
321}
322
323impl<R: Runtime, H: TransportImplHelper> ChanBuilder<R, H>
324where
325 R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
326 H: Send + Sync,
327{
328 #[instrument(skip_all, level = "trace")]
332 async fn connect_no_timeout(
333 &self,
334 target: &OwnedChanTarget,
335 event_sender: Arc<Mutex<ChanMgrEventSender>>,
336 memquota: ChannelAccount,
337 ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
338 use tor_rtcompat::tls::CertifiedConn;
339
340 {
341 event_sender.lock().expect("Lock poisoned").record_attempt();
342 }
343
344 self.validate_target(target)?;
347
348 let (peer_addr, stream) = self.transport.connect(target).await?;
352 let peer_addr = match self.outbound_chan_type() {
355 ChannelType::ClientInitiator => MaybeSensitive::sensitive(peer_addr),
356 ChannelType::RelayInitiator => MaybeSensitive::not_sensitive(peer_addr),
357 _ => return Err(Error::Internal(internal!("Unknown outbound channel type"))),
358 };
359
360 let map_ioe = |action: &'static str| {
361 let peer = peer_addr.clone();
362 move |ioe: io::Error| Error::Io {
363 action,
364 peer,
365 source: ioe.into(),
366 }
367 };
368
369 {
370 event_sender
374 .lock()
375 .expect("Lock poisoned")
376 .record_tcp_success();
377 }
378
379 let hostname = rand_hostname::random_hostname(&mut rand::rng());
382
383 let tls = self
384 .tls_connector
385 .negotiate_unvalidated(stream, hostname.as_str())
386 .await
387 .map_err(map_ioe("TLS negotiation"))?;
388
389 let peer_tls_cert = tls
390 .peer_certificate()
391 .map_err(map_ioe("TLS certs"))?
392 .ok_or_else(|| Error::Internal(internal!("TLS connection with no peer certificate")))?
393 .into_owned();
397
398 {
399 event_sender
400 .lock()
401 .expect("Lock poisoned")
402 .record_tls_finished();
403 }
404
405 let outbound_chan_type = self.outbound_chan_type();
407 let chan = match outbound_chan_type {
408 ChannelType::ClientInitiator => {
409 self.build_client_channel(
410 tls,
411 peer_addr,
412 target,
413 &peer_tls_cert,
414 memquota,
415 event_sender.clone(),
416 )
417 .await?
418 }
419 #[cfg(feature = "relay")]
420 ChannelType::RelayInitiator => {
421 self.build_relay_channel(
422 tls,
423 peer_addr,
424 target,
425 &peer_tls_cert,
426 memquota,
427 event_sender.clone(),
428 )
429 .await?
430 }
431 _ => {
432 return Err(Error::Internal(internal!(
433 "Unusable channel type for outbound: {outbound_chan_type}",
434 )));
435 }
436 };
437
438 event_sender
439 .lock()
440 .expect("Lock poisoned")
441 .record_handshake_done();
442
443 Ok(chan)
444 }
445
446 #[cfg(feature = "relay")]
448 fn validate_target_as_relay<CT>(&self, target: &CT) -> crate::Result<()>
449 where
450 CT: ChanTarget,
451 {
452 use tor_linkspec::HasRelayIds;
453
454 if !target.chan_method().is_direct() {
456 return Err(Error::UnusableTarget(tor_error::bad_api_usage!(
459 "Relays don't support outbound PT channels"
460 )));
461 }
462
463 if !target.all_addrs_allowed_for_outgoing_channels() {
469 return Err(Error::Proto {
470 source: tor_proto::Error::ChanProto(
471 "Target address is not allowed for outgoing channels".into(),
472 ),
473 peer: target.to_owned().into(),
474 clock_skew: None,
475 });
476 }
477
478 let Some(auth_material) = &self.auth_material else {
481 return Err(Error::Internal(tor_error::bad_api_usage!(
482 "Relay initiating a channel without key auth material"
483 )));
484 };
485
486 if auth_material.has_any_relay_id_from(target) {
488 Err(Error::Proto {
489 source: tor_proto::Error::ChanProto("Refusing to build channel to ourself".into()),
490 peer: target.to_owned().into(),
491 clock_skew: None,
492 })
493 } else {
494 Ok(())
495 }
496 }
497
498 fn validate_target<CT>(&self, target: &CT) -> crate::Result<()>
502 where
503 CT: ChanTarget,
504 {
505 if !target.has_all_nonzero_port() {
507 return Err(Error::Proto {
508 source: tor_proto::Error::ChanProto("Target address port is invalid".into()),
509 peer: target.to_owned().into(),
510 clock_skew: None,
511 });
512 }
513
514 #[cfg(feature = "relay")]
518 for addr in target.addrs() {
519 if self.my_addrs.contains(&addr) {
520 return Err(Error::Proto {
521 source: tor_proto::Error::ChanProto("Target address is ours".into()),
522 peer: target.to_owned().into(),
523 clock_skew: None,
524 });
525 }
526 }
527
528 let chan_type = self.outbound_chan_type();
529 match chan_type {
530 ChannelType::ClientInitiator => Ok(()),
532 #[cfg(feature = "relay")]
534 ChannelType::RelayInitiator => self.validate_target_as_relay(target),
535 _ => Err(Error::UnusableTarget(tor_error::bad_api_usage!(
537 "Channel type can't be used as a target: {chan_type}"
538 ))),
539 }
540 }
541
542 async fn build_client_channel<T>(
546 &self,
547 tls: T,
548 peer_addr: MaybeSensitive<PeerAddr>,
549 target: &OwnedChanTarget,
550 peer_tls_cert: &[u8],
551 memquota: ChannelAccount,
552 event_sender: Arc<Mutex<ChanMgrEventSender>>,
553 ) -> crate::Result<Arc<tor_proto::channel::Channel>>
554 where
555 T: AsyncRead + AsyncWrite + CertifiedConn + StreamOps + Send + Unpin + 'static,
556 {
557 let map_proto = |source, target: &OwnedChanTarget, clock_skew| Error::Proto {
563 source,
564 peer: target.to_logged(),
565 clock_skew,
566 };
567
568 let now = self.runtime.wallclock();
569
570 let mut builder = tor_proto::ClientChannelBuilder::new();
572 builder.set_declared_method(target.chan_method());
573
574 let unverified = builder
575 .launch(
576 tls,
577 self.runtime.clone(), memquota,
579 )
580 .connect(|| self.runtime.wallclock())
581 .await
582 .map_err(|e| Error::from_proto_no_skew(e, target))?;
583
584 let clock_skew = unverified.clock_skew();
585 let (chan, reactor) = unverified
586 .verify(target, peer_tls_cert, Some(now))
587 .map_err(|source| match &source {
588 tor_proto::Error::HandshakeCertsExpired { .. } => {
589 event_sender
590 .lock()
591 .expect("Lock poisoned")
592 .record_handshake_done_with_skewed_clock();
593 map_proto(source, target, Some(clock_skew))
594 }
595 _ => Error::from_proto_no_skew(source, target),
596 })?
597 .finish(peer_addr)
598 .await
599 .map_err(|e| map_proto(e, target, Some(clock_skew)))?;
600
601 self.runtime
603 .spawn(async {
604 let _ = reactor.run().await;
605 })
606 .map_err(|e| Error::from_spawn("client channel reactor", e))?;
607 Ok(chan)
608 }
609
610 #[cfg(feature = "relay")]
614 async fn build_relay_channel<T>(
615 &self,
616 tls: T,
617 peer_addr: MaybeSensitive<PeerAddr>,
618 target: &OwnedChanTarget,
619 peer_tls_cert: &[u8],
620 memquota: ChannelAccount,
621 event_sender: Arc<Mutex<ChanMgrEventSender>>,
622 ) -> crate::Result<Arc<tor_proto::channel::Channel>>
623 where
624 T: AsyncRead + AsyncWrite + CertifiedConn + StreamOps + Send + Unpin + 'static,
625 {
626 let builder = tor_proto::RelayChannelBuilder::new();
627 let auth_material = self
628 .auth_material
629 .as_ref()
630 .ok_or(internal!(
631 "Unable to build relay channel without auth material"
632 ))?
633 .clone();
634
635 let create_request_handler = self.create_request_handler.as_ref().ok_or_else(|| {
636 bad_api_usage!("Can't create a relay channel without a CREATE* request handler")
637 })?;
638
639 let unverified = builder
640 .launch(
641 tls,
642 self.runtime.clone(), auth_material,
644 self.my_addrs.clone(),
645 target,
646 memquota,
647 Arc::clone(create_request_handler),
648 )
649 .connect(|| self.runtime.wallclock())
650 .await
651 .map_err(|e| Error::from_proto_no_skew(e, target))?;
652
653 let now = self.runtime.wallclock();
654 let clock_skew = unverified.clock_skew();
655 let (chan, reactor) = unverified
656 .verify(target, peer_tls_cert, Some(now))
657 .map_err(|source| match &source {
658 tor_proto::Error::HandshakeCertsExpired { .. } => {
659 event_sender
660 .lock()
661 .expect("Lock poisoned")
662 .record_handshake_done_with_skewed_clock();
663 Error::Proto {
664 source,
665 peer: target.to_logged(),
666 clock_skew: Some(clock_skew),
667 }
668 }
669 _ => Error::from_proto_no_skew(source, target),
670 })?
671 .finish(peer_addr.inner())
673 .await
674 .map_err(|source| Error::Proto {
675 source,
676 peer: target.to_logged(),
677 clock_skew: Some(clock_skew),
678 })?;
679
680 self.runtime
682 .spawn(async {
683 let _ = reactor.run().await;
684 })
685 .map_err(|e| Error::from_spawn("relay channel reactor", e))?;
686
687 Ok(chan)
688 }
689}
690
691impl crate::mgr::AbstractChannel for tor_proto::channel::Channel {
692 fn is_canonical(&self) -> bool {
693 self.is_canonical()
694 }
695 fn is_canonical_to_peer(&self) -> bool {
696 self.is_canonical_to_peer()
697 }
698 fn is_usable(&self) -> bool {
699 !self.is_closing()
700 }
701 fn duration_unused(&self) -> Option<Duration> {
702 self.duration_unused()
703 }
704 fn reparameterize(
705 &self,
706 updates: Arc<ChannelPaddingInstructionsUpdates>,
707 ) -> tor_proto::Result<()> {
708 tor_proto::channel::Channel::reparameterize(self, updates)
709 }
710 fn reparameterize_kist(&self, kist_params: KistParams) -> tor_proto::Result<()> {
711 tor_proto::channel::Channel::reparameterize_kist(self, kist_params)
712 }
713 fn engage_padding_activities(&self) {
714 tor_proto::channel::Channel::engage_padding_activities(self);
715 }
716}
717
#[cfg(test)]
mod test {
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    use super::*;
    use crate::{
        Result,
        mgr::{AbstractChannel, AbstractChannelFactory},
    };
    use futures::StreamExt as _;
    use std::net::SocketAddr;
    use std::time::{Duration, SystemTime};
    use tor_linkspec::{ChannelMethod, HasRelayIds, RelayIdType};
    use tor_llcrypto::pk::ed25519::Ed25519Identity;
    use tor_llcrypto::pk::rsa::RsaIdentity;
    use tor_proto::channel::Channel;
    use tor_proto::memquota::{ChannelAccount, SpecificAccount as _};
    use tor_rtcompat::{NetStreamListener, test_with_one_runtime};
    use tor_rtmock::{io::LocalStream, net::MockNetwork};

    #[allow(deprecated)]
    use tor_rtmock::MockSleepRuntime;

    /// Build a client channel over a mock network while a fake relay answers
    /// the handshake, then check the resulting channel.
    #[test]
    fn build_ok() -> Result<()> {
        use crate::testing::msgs;

        // Fixed identities, addresses and time taken from the canned
        // handshake messages.
        let relay_addr: SocketAddr = msgs::ADDR.parse().unwrap();
        let ed_id: Ed25519Identity = msgs::ED_ID.into();
        let rsa_id: RsaIdentity = msgs::RSA_ID.into();
        let client_addr = "192.0.2.17".parse().unwrap();
        let tls_cert = msgs::X509_CERT.into();
        let chan_target = OwnedChanTarget::builder()
            .addrs(vec![relay_addr])
            .method(ChannelMethod::Direct(vec![relay_addr]))
            .ed_identity(ed_id)
            .rsa_identity(rsa_id)
            .build()
            .unwrap();
        let start_time = SystemTime::UNIX_EPOCH + Duration::from_secs(msgs::NOW);

        test_with_one_runtime!(|rt| async move {
            let network = MockNetwork::new();

            // Client side: mock network plus a controllable clock.
            let client_rt = network
                .builder()
                .add_address(client_addr)
                .runtime(rt.clone());
            #[allow(deprecated)]
            let client_rt = MockSleepRuntime::new(client_rt);

            // Relay side: listens for TLS connections on the relay address.
            let relay_rt = network
                .builder()
                .add_address(relay_addr.ip())
                .runtime(rt.clone());

            let listener = relay_rt
                .mock_net()
                .listen_tls(&relay_addr, tls_cert)
                .unwrap();

            // Move the client clock to the time of the canned messages.
            client_rt.jump_to(start_time);

            let transport = crate::transport::DefaultTransport::new(client_rt.clone(), None);
            let builder = ChanBuilder::new_client(client_rt, transport);

            // Run the client connection and the fake relay concurrently.
            let (client_outcome, relay_outcome): (Result<Arc<Channel>>, Result<LocalStream>) =
                futures::join!(
                    async {
                        builder
                            .build_channel(
                                &chan_target,
                                BootstrapReporter::fake(),
                                ChannelAccount::new_noop(),
                            )
                            .await
                    },
                    async {
                        let (mut conn, remote) = listener
                            .incoming()
                            .next()
                            .await
                            .expect("Closed?")
                            .expect("accept failed");
                        assert_eq!(client_addr, remote.ip());
                        crate::testing::answer_channel_req(&mut conn)
                            .await
                            .expect("answer failed");
                        Ok(conn)
                    }
                );

            let chan = client_outcome.unwrap();
            assert_eq!(
                chan.identity(RelayIdType::Ed25519),
                Some((&ed_id).into())
            );
            assert!(chan.is_usable());

            // The unused duration must not go backwards, whether it is read
            // through the inherent method or through the trait.
            let first = Channel::duration_unused(&chan).unwrap();
            let second = AbstractChannel::duration_unused(chan.as_ref()).unwrap();
            let third = Channel::duration_unused(&chan).unwrap();
            assert!(first <= second);
            assert!(second <= third);

            relay_outcome.unwrap();
            Ok(())
        })
    }
}