Skip to main content

tor_chanmgr/
factory.rs

1//! Traits and code to define different mechanisms for building Channels to
2//! different kinds of targets.
3
4use std::sync::{Arc, Mutex};
5
6use crate::event::ChanMgrEventSender;
7use async_trait::async_trait;
8use tor_error::{HasKind, HasRetryTime, internal};
9use tor_linkspec::{HasChanMethod, OwnedChanTarget, PtTransportName};
10use tor_proto::channel::Channel;
11use tor_proto::memquota::ChannelAccount;
12use tracing::{debug, instrument};
13
14#[cfg(feature = "relay")]
15use safelog::Sensitive;
16
17/// An opaque type that lets a `ChannelFactory` update the `ChanMgr` about bootstrap progress.
18///
19/// A future release of this crate might make this type less opaque.
20// FIXME(eta): Do that.
21#[derive(Clone)]
22pub struct BootstrapReporter(pub(crate) Arc<Mutex<ChanMgrEventSender>>);
23
24impl BootstrapReporter {
25    #[cfg(test)]
26    /// Create a useless version of this type to satisfy some test.
27    pub(crate) fn fake() -> Self {
28        let (snd, _rcv) = crate::event::channel();
29        Self(Arc::new(Mutex::new(snd)))
30    }
31}
32
33/// An object that knows how to build `Channels` to `ChanTarget`s.
34///
35/// This trait must be object-safe.
36///
37/// Every [`ChanMgr`](crate::ChanMgr) has a `ChannelFactory` that it uses to
38/// construct all of its channels.
39///
40/// A `ChannelFactory` can be implemented in terms of a
41/// [`TransportImplHelper`](crate::transport::TransportImplHelper), by wrapping it in a
42/// `ChanBuilder`.
43///
44// FIXME(eta): Rectify the below situation.
45/// (In fact, as of the time of writing, this is the *only* way to implement this trait
46/// outside of this crate while keeping bootstrap status reporting, since `BootstrapReporter`
47/// is an opaque type.)
48#[async_trait]
49pub trait ChannelFactory: Send + Sync {
50    /// Open an authenticated channel to `target`.
51    ///
52    /// This method does does not necessarily handle retries or timeouts,
53    /// although some of its implementations may.
54    ///
55    /// This method does not necessarily handle every kind of transport. If the
56    /// caller provides a target with an unsupported
57    /// [`TransportId`](tor_linkspec::TransportId), this method should return
58    /// [`Error::NoSuchTransport`](crate::Error::NoSuchTransport).
59    async fn connect_via_transport(
60        &self,
61        target: &OwnedChanTarget,
62        reporter: BootstrapReporter,
63        memquota: ChannelAccount,
64    ) -> crate::Result<Arc<Channel>>;
65}
66
67/// Similar to [`ChannelFactory`], but for building channels from incoming streams.
68// This is a separate trait since for some `ChannelFactory`s like the one returned from
69// `tor_ptmgr::PtMgr::factory_for_transport`, it doesn't make sense to deal with incoming streams
70// (all PT connections are outgoing).
71#[async_trait]
72pub trait IncomingChannelFactory: Send + Sync {
73    /// The type of byte stream that's required to build channels for incoming connections.
74    type Stream: Send + Sync + 'static;
75
76    /// Open a channel from `peer` with the given `stream`. The channel may or may not be
77    /// authenticated.
78    #[cfg(feature = "relay")]
79    async fn accept_from_transport(
80        &self,
81        peer: Sensitive<std::net::SocketAddr>,
82        stream: Self::Stream,
83        memquota: ChannelAccount,
84    ) -> crate::Result<Arc<Channel>>;
85}
86
87#[async_trait]
88impl<CF> crate::mgr::AbstractChannelFactory for CF
89where
90    CF: ChannelFactory + IncomingChannelFactory + Sync,
91{
92    type Channel = tor_proto::channel::Channel;
93    type BuildSpec = OwnedChanTarget;
94    type Stream = CF::Stream;
95
96    #[instrument(skip_all, level = "trace")]
97    async fn build_channel(
98        &self,
99        target: &Self::BuildSpec,
100        reporter: BootstrapReporter,
101        memquota: ChannelAccount,
102    ) -> crate::Result<Arc<Self::Channel>> {
103        debug!("Attempting to open a new channel to {target}");
104        self.connect_via_transport(target, reporter, memquota).await
105    }
106
107    #[cfg(feature = "relay")]
108    #[instrument(skip_all, level = "trace")]
109    async fn build_channel_using_incoming(
110        &self,
111        peer: Sensitive<std::net::SocketAddr>,
112        stream: Self::Stream,
113        memquota: ChannelAccount,
114    ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
115        debug!("Attempting to open a new channel from {peer}");
116        self.accept_from_transport(peer, stream, memquota).await
117    }
118}
119
120/// The error type returned by a pluggable transport manager.
121pub trait AbstractPtError:
122    std::error::Error + HasKind + HasRetryTime + Send + Sync + std::fmt::Debug
123{
124}
125
126/// A pluggable transport manager.
127///
128/// We can't directly reference the `PtMgr` type from `tor-ptmgr`, because of dependency resolution
129/// constraints, so this defines the interface for what one should look like.
130#[async_trait]
131pub trait AbstractPtMgr: Send + Sync {
132    /// Get a `ChannelFactory` for the provided `PtTransportName`.
133    async fn factory_for_transport(
134        &self,
135        transport: &PtTransportName,
136    ) -> Result<Option<Arc<dyn ChannelFactory + Send + Sync>>, Arc<dyn AbstractPtError>>;
137}
138
139#[async_trait]
140impl<P> AbstractPtMgr for Option<P>
141where
142    P: AbstractPtMgr,
143{
144    async fn factory_for_transport(
145        &self,
146        transport: &PtTransportName,
147    ) -> Result<Option<Arc<dyn ChannelFactory + Send + Sync>>, Arc<dyn AbstractPtError>> {
148        match self {
149            Some(mgr) => mgr.factory_for_transport(transport).await,
150            None => Ok(None),
151        }
152    }
153}
154
/// A `ChannelFactory` that combines two sources of channels: an optional PtMgr for
/// pluggable transports, and a default `ChannelFactory` for everything else.
pub(crate) struct CompoundFactory<CF> {
    /// The PtMgr consulted for pluggable transports (only with the `pt-client` feature).
    #[cfg(feature = "pt-client")]
    ptmgr: Option<Arc<dyn AbstractPtMgr + 'static>>,
    /// The factory used for everything that is not a pluggable transport.
    default_factory: Arc<CF>,
}
164
165impl<CF> Clone for CompoundFactory<CF> {
166    fn clone(&self) -> Self {
167        Self {
168            #[cfg(feature = "pt-client")]
169            ptmgr: self.ptmgr.as_ref().map(Arc::clone),
170            default_factory: Arc::clone(&self.default_factory),
171        }
172    }
173}
174
175#[async_trait]
176impl<CF: ChannelFactory> ChannelFactory for CompoundFactory<CF> {
177    #[instrument(skip_all, level = "trace")]
178    async fn connect_via_transport(
179        &self,
180        target: &OwnedChanTarget,
181        reporter: BootstrapReporter,
182        memquota: ChannelAccount,
183    ) -> crate::Result<Arc<Channel>> {
184        use tor_linkspec::ChannelMethod::*;
185        let factory = match target.chan_method() {
186            Direct(_) => self.default_factory.clone(),
187            #[cfg(feature = "pt-client")]
188            Pluggable(a) => match self.ptmgr.as_ref() {
189                Some(mgr) => mgr
190                    .factory_for_transport(a.transport())
191                    .await
192                    .map_err(crate::Error::Pt)?
193                    .ok_or_else(|| crate::Error::NoSuchTransport(a.transport().clone().into()))?,
194                None => return Err(crate::Error::NoSuchTransport(a.transport().clone().into())),
195            },
196            #[allow(unreachable_patterns)]
197            _ => {
198                return Err(crate::Error::Internal(internal!(
199                    "No support for channel method"
200                )));
201            }
202        };
203
204        factory
205            .connect_via_transport(target, reporter, memquota)
206            .await
207    }
208}
209
210#[async_trait]
211impl<CF: IncomingChannelFactory> IncomingChannelFactory for CompoundFactory<CF> {
212    type Stream = CF::Stream;
213
214    #[cfg(feature = "relay")]
215    async fn accept_from_transport(
216        &self,
217        peer: Sensitive<std::net::SocketAddr>,
218        stream: Self::Stream,
219        memquota: ChannelAccount,
220    ) -> crate::Result<Arc<Channel>> {
221        self.default_factory
222            .accept_from_transport(peer, stream, memquota)
223            .await
224    }
225}
226
227impl<CF: ChannelFactory + 'static> CompoundFactory<CF> {
228    /// Create a new `Factory` that will try to use `ptmgr` to handle pluggable
229    /// transports requests, and `default_factory` to handle everything else.
230    pub(crate) fn new(
231        default_factory: Arc<CF>,
232        #[cfg(feature = "pt-client")] ptmgr: Option<Arc<dyn AbstractPtMgr + 'static>>,
233    ) -> Self {
234        Self {
235            default_factory,
236            #[cfg(feature = "pt-client")]
237            ptmgr,
238        }
239    }
240
241    #[cfg(feature = "pt-client")]
242    /// Replace the PtMgr in this object.
243    pub(crate) fn replace_ptmgr(&mut self, ptmgr: Arc<dyn AbstractPtMgr + 'static>) {
244        self.ptmgr = Some(ptmgr);
245    }
246
247    /// Return a reference to the default channel factory.
248    #[cfg(feature = "relay")]
249    pub(crate) fn default_factory(&self) -> &CF {
250        &self.default_factory
251    }
252
253    /// Replace the default channel factory.
254    #[cfg(feature = "relay")]
255    pub(crate) fn replace_default_factory(&mut self, factory: Arc<CF>) {
256        self.default_factory = factory;
257    }
258}