1use std::sync::{Arc, Mutex};
5
6use crate::event::ChanMgrEventSender;
7use async_trait::async_trait;
8use tor_error::{HasKind, HasRetryTime, internal};
9use tor_linkspec::{HasChanMethod, OwnedChanTarget, PtTransportName};
10use tor_proto::channel::Channel;
11use tor_proto::memquota::ChannelAccount;
12use tracing::{debug, instrument};
13
14#[cfg(feature = "relay")]
15use safelog::Sensitive;
16
17#[derive(Clone)]
22pub struct BootstrapReporter(pub(crate) Arc<Mutex<ChanMgrEventSender>>);
23
24impl BootstrapReporter {
25 #[cfg(test)]
26 pub(crate) fn fake() -> Self {
28 let (snd, _rcv) = crate::event::channel();
29 Self(Arc::new(Mutex::new(snd)))
30 }
31}
32
33#[async_trait]
49pub trait ChannelFactory: Send + Sync {
50 async fn connect_via_transport(
60 &self,
61 target: &OwnedChanTarget,
62 reporter: BootstrapReporter,
63 memquota: ChannelAccount,
64 ) -> crate::Result<Arc<Channel>>;
65}
66
67#[async_trait]
72pub trait IncomingChannelFactory: Send + Sync {
73 type Stream: Send + Sync + 'static;
75
76 #[cfg(feature = "relay")]
79 async fn accept_from_transport(
80 &self,
81 peer: Sensitive<std::net::SocketAddr>,
82 stream: Self::Stream,
83 memquota: ChannelAccount,
84 ) -> crate::Result<Arc<Channel>>;
85}
86
87#[async_trait]
88impl<CF> crate::mgr::AbstractChannelFactory for CF
89where
90 CF: ChannelFactory + IncomingChannelFactory + Sync,
91{
92 type Channel = tor_proto::channel::Channel;
93 type BuildSpec = OwnedChanTarget;
94 type Stream = CF::Stream;
95
96 #[instrument(skip_all, level = "trace")]
97 async fn build_channel(
98 &self,
99 target: &Self::BuildSpec,
100 reporter: BootstrapReporter,
101 memquota: ChannelAccount,
102 ) -> crate::Result<Arc<Self::Channel>> {
103 debug!("Attempting to open a new channel to {target}");
104 self.connect_via_transport(target, reporter, memquota).await
105 }
106
107 #[cfg(feature = "relay")]
108 #[instrument(skip_all, level = "trace")]
109 async fn build_channel_using_incoming(
110 &self,
111 peer: Sensitive<std::net::SocketAddr>,
112 stream: Self::Stream,
113 memquota: ChannelAccount,
114 ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
115 debug!("Attempting to open a new channel from {peer}");
116 self.accept_from_transport(peer, stream, memquota).await
117 }
118}
119
120pub trait AbstractPtError:
122 std::error::Error + HasKind + HasRetryTime + Send + Sync + std::fmt::Debug
123{
124}
125
126#[async_trait]
131pub trait AbstractPtMgr: Send + Sync {
132 async fn factory_for_transport(
134 &self,
135 transport: &PtTransportName,
136 ) -> Result<Option<Arc<dyn ChannelFactory + Send + Sync>>, Arc<dyn AbstractPtError>>;
137}
138
139#[async_trait]
140impl<P> AbstractPtMgr for Option<P>
141where
142 P: AbstractPtMgr,
143{
144 async fn factory_for_transport(
145 &self,
146 transport: &PtTransportName,
147 ) -> Result<Option<Arc<dyn ChannelFactory + Send + Sync>>, Arc<dyn AbstractPtError>> {
148 match self {
149 Some(mgr) => mgr.factory_for_transport(transport).await,
150 None => Ok(None),
151 }
152 }
153}
154
/// A channel factory assembled from a default factory plus, when the
/// `pt-client` feature is enabled, an optional pluggable-transport manager.
pub(crate) struct CompoundFactory<CF> {
    /// Manager consulted for targets that use a pluggable transport, if one
    /// has been installed.
    #[cfg(feature = "pt-client")]
    ptmgr: Option<Arc<dyn AbstractPtMgr + 'static>>,
    /// Factory used for direct connections and for incoming streams.
    default_factory: Arc<CF>,
}
164
165impl<CF> Clone for CompoundFactory<CF> {
166 fn clone(&self) -> Self {
167 Self {
168 #[cfg(feature = "pt-client")]
169 ptmgr: self.ptmgr.as_ref().map(Arc::clone),
170 default_factory: Arc::clone(&self.default_factory),
171 }
172 }
173}
174
175#[async_trait]
176impl<CF: ChannelFactory> ChannelFactory for CompoundFactory<CF> {
177 #[instrument(skip_all, level = "trace")]
178 async fn connect_via_transport(
179 &self,
180 target: &OwnedChanTarget,
181 reporter: BootstrapReporter,
182 memquota: ChannelAccount,
183 ) -> crate::Result<Arc<Channel>> {
184 use tor_linkspec::ChannelMethod::*;
185 let factory = match target.chan_method() {
186 Direct(_) => self.default_factory.clone(),
187 #[cfg(feature = "pt-client")]
188 Pluggable(a) => match self.ptmgr.as_ref() {
189 Some(mgr) => mgr
190 .factory_for_transport(a.transport())
191 .await
192 .map_err(crate::Error::Pt)?
193 .ok_or_else(|| crate::Error::NoSuchTransport(a.transport().clone().into()))?,
194 None => return Err(crate::Error::NoSuchTransport(a.transport().clone().into())),
195 },
196 #[allow(unreachable_patterns)]
197 _ => {
198 return Err(crate::Error::Internal(internal!(
199 "No support for channel method"
200 )));
201 }
202 };
203
204 factory
205 .connect_via_transport(target, reporter, memquota)
206 .await
207 }
208}
209
210#[async_trait]
211impl<CF: IncomingChannelFactory> IncomingChannelFactory for CompoundFactory<CF> {
212 type Stream = CF::Stream;
213
214 #[cfg(feature = "relay")]
215 async fn accept_from_transport(
216 &self,
217 peer: Sensitive<std::net::SocketAddr>,
218 stream: Self::Stream,
219 memquota: ChannelAccount,
220 ) -> crate::Result<Arc<Channel>> {
221 self.default_factory
222 .accept_from_transport(peer, stream, memquota)
223 .await
224 }
225}
226
227impl<CF: ChannelFactory + 'static> CompoundFactory<CF> {
228 pub(crate) fn new(
231 default_factory: Arc<CF>,
232 #[cfg(feature = "pt-client")] ptmgr: Option<Arc<dyn AbstractPtMgr + 'static>>,
233 ) -> Self {
234 Self {
235 default_factory,
236 #[cfg(feature = "pt-client")]
237 ptmgr,
238 }
239 }
240
241 #[cfg(feature = "pt-client")]
242 pub(crate) fn replace_ptmgr(&mut self, ptmgr: Arc<dyn AbstractPtMgr + 'static>) {
244 self.ptmgr = Some(ptmgr);
245 }
246
247 #[cfg(feature = "relay")]
249 pub(crate) fn default_factory(&self) -> &CF {
250 &self.default_factory
251 }
252
253 #[cfg(feature = "relay")]
255 pub(crate) fn replace_default_factory(&mut self, factory: Arc<CF>) {
256 self.default_factory = factory;
257 }
258}