Skip to main content

tor_proto/circuit/reactor/
hop_mgr.rs

//! The hop manager: per-hop state, plus the channel for sending messages to each hop's [`StreamReactor`].
2
3use crate::circuit::UniqId;
4use crate::circuit::circhop::{CircHopOutbound, HopSettings};
5use crate::circuit::reactor::circhop::CircHopList;
6use crate::circuit::reactor::stream::{ReadyStreamMsg, StreamHandler, StreamMsg, StreamReactor};
7use crate::congestion::CongestionControl;
8use crate::memquota::CircuitAccount;
9use crate::util::err::ReactorError;
10use crate::{Error, HopNum, Result};
11
12#[cfg(any(feature = "hs-service", feature = "relay"))]
13use crate::stream::incoming::IncomingStreamRequestHandler;
14
15use tor_error::internal;
16use tor_rtcompat::Runtime;
17
18use futures::SinkExt;
19use futures::channel::mpsc;
20
21use std::result::Result as StdResult;
22use std::sync::{Arc, Mutex, RwLock};
23
24/// The hop manager of a reactor.
25///
26/// This contains the per-hop state (e.g. congestion control information),
27/// and a handle to the stream reactor of the hop.
28///
29/// The stream reactor of the hop is launched lazily,
30/// when the first [`StreamMsg`] is sent via [`HopMgr::send`].
31pub(crate) struct HopMgr<R: Runtime> {
32    /// A handle to the runtime.
33    runtime: R,
34    /// Context used when spawning a stream reactor.
35    ctx: StreamReactorContext,
36    /// Sender for sending messages to BWD.
37    ///
38    /// The receiver is in BWD.
39    ///
40    /// A clone of this is passed to each spawned StreamReactor
41    bwd_tx: mpsc::Sender<ReadyStreamMsg>,
42    /// The underlying senders, indexed by [`HopNum`].
43    ///
44    /// Relays have at most one stream reactor per circuit.
45    /// Clients have at most one stream reactor per circuit hop.
46    ///
47    /// This is shared with the backward reactor.
48    /// The backward reactor only ever *reads* from this
49    /// (it never mutates the list).
50    ///
51    // TODO: the backward reactor only ever reads from this.
52    // Conceptually, it is the HopMgr that owns this list,
53    // because only HopMgr can add hops to the list.
54    //
55    // Perhaps we need a specialized abstraction that only allows reading here.
56    // This could be a wrapper over RwLock, providing a read-only API for the BWD.
57    hops: Arc<RwLock<CircHopList>>,
58    /// Memory quota account
59    memquota: CircuitAccount,
60}
61
62/// State needed to build a stream reactor.
63///
64/// Used when spawning the stream reactor of a hop.
65struct StreamReactorContext {
66    /// An identifier for logging about this reactor's circuit.
67    unique_id: UniqId,
68    /// The incoming stream handler.
69    ///
70    /// This is shared with every StreamReactor.
71    #[cfg(any(feature = "hs-service", feature = "relay"))]
72    incoming: Arc<Mutex<Option<IncomingStreamRequestHandler>>>,
73    /// A handler for customizing the stream reactor behavior.
74    handler: Arc<dyn StreamHandler>,
75}
76
77impl<R: Runtime> HopMgr<R> {
78    /// Create a new [`HopMgr`] with an empty hop list.
79    ///
80    /// Hops are added with [`HopMgr::add_hop`].
81    pub(crate) fn new<S: StreamHandler>(
82        runtime: R,
83        unique_id: UniqId,
84        handler: S,
85        bwd_tx: mpsc::Sender<ReadyStreamMsg>,
86        memquota: CircuitAccount,
87    ) -> Self {
88        // We don't spawn any stream reactors ahead of time.
89        // Instead we spawn them lazily, when opening streams.
90        let hops = Arc::new(RwLock::new(Default::default()));
91        let ctx = StreamReactorContext {
92            unique_id,
93            #[cfg(any(feature = "hs-service", feature = "relay"))]
94            incoming: Arc::new(Mutex::new(None)),
95            handler: Arc::new(handler),
96        };
97
98        Self {
99            runtime,
100            hops,
101            ctx,
102            bwd_tx,
103            memquota,
104        }
105    }
106
107    /// Return a reference to our hop list.
108    pub(crate) fn hops(&self) -> &Arc<RwLock<CircHopList>> {
109        &self.hops
110    }
111
112    /// Set the incoming stream handler for this reactor.
113    ///
114    /// There can only be one incoming stream handler per reactor,
115    /// and each stream handler only pertains to a single hop (see expected_hop())
116    //
117    // TODO: eventually, we might want a different design here,
118    // for example we might want to allow multiple stream handlers per reactor (one per hop).
119    // However, for now, the implementation is intentionally kept similar to that
120    // in the client reactor (to make it easier to migrate it to the new reactor design).
121    //
122    /// Returns an error if the hop manager already has a stream handler.
123    ///
124    /// Since the handler is shared with every hop's stream reactor,
125    /// this function will update the handler for all of them.
126    ///
127    // TODO(DEDUP): almost identical to the client-side
128    // CellHandlers::set_incoming_stream_req_handler()
129    #[cfg(any(feature = "hs-service", feature = "relay"))]
130    pub(crate) fn set_incoming_handler(&self, handler: IncomingStreamRequestHandler) -> Result<()> {
131        let mut lock = self.ctx.incoming.lock().expect("poisoned lock");
132
133        if lock.is_none() {
134            *lock = Some(handler);
135            Ok(())
136        } else {
137            Err(Error::from(internal!(
138                "Tried to install a BEGIN cell handler before the old one was gone."
139            )))
140        }
141    }
142
143    /// Push a new hop to our hop list.
144    ///
145    /// Prepares a cc object for the hop, but does not spawn a stream reactor.
146    ///
147    /// Will return an error if the circuit already has [`u8::MAX`] hops.
148    pub(crate) fn add_hop(&mut self, settings: HopSettings) -> Result<()> {
149        let mut hops = self.hops.write().expect("poisoned lock");
150        hops.add_hop(settings)
151    }
152
153    /// Send a message to the stream reactor of the specified `hop`,
154    /// spawning it if necessary.
155    pub(crate) async fn send(
156        &mut self,
157        hopnum: Option<HopNum>,
158        msg: StreamMsg,
159    ) -> StdResult<(), ReactorError> {
160        let mut tx = self.get_or_spawn_stream_reactor(hopnum)?;
161
162        tx.send(msg).await.map_err(|_| {
163            // The stream reactor has shut down
164            ReactorError::Shutdown
165        })
166    }
167
168    /// Get a handle to the stream reactor, spawning it if necessary
169    fn get_or_spawn_stream_reactor(
170        &self,
171        hopnum: Option<HopNum>,
172    ) -> StdResult<mpsc::Sender<StreamMsg>, ReactorError> {
173        let mut hops = self.hops.write().expect("poisoned lock");
174        let hop = hops
175            .get_mut(hopnum)
176            .ok_or_else(|| internal!("tried to send cell to nonexistent hop?!"))?;
177
178        let tx = match &hop.tx {
179            Some(tx) => tx.clone(),
180            None => {
181                // If we don't have a handle to the stream reactor,
182                // it means it hasn't been spawned yet, so we have to spawn it now.
183                let tx =
184                    self.spawn_stream_reactor(hopnum, &hop.settings, Arc::clone(&hop.ccontrol))?;
185
186                hop.tx = Some(tx.clone());
187
188                // Return a copy of this sender (can't borrow because the hop
189                // is behind a Mutex, and we can't keep it locked across the send()
190                // await point)
191                tx
192            }
193        };
194
195        Ok(tx)
196    }
197
198    /// Spawn a [`StreamReactor`] for the specified hop.
199    fn spawn_stream_reactor(
200        &self,
201        hopnum: Option<HopNum>,
202        settings: &HopSettings,
203        ccontrol: Arc<Mutex<CongestionControl>>,
204    ) -> StdResult<mpsc::Sender<StreamMsg>, ReactorError> {
205        use tor_rtcompat::SpawnExt as _;
206
207        // NOTE: not registering this channel with the memquota subsystem is okay,
208        // because it has no buffering (if ever decide to make the size of this buffer
209        // non-zero for whatever reason, we must remember to register it with memquota
210        // so that it counts towards the total memory usage for the circuit.
211        //
212        // TODO(tuning): having zero buffering here is very likely suboptimal.
213        // We should do *some* buffering here, and then figure out if we should it
214        // up to memquota or not.
215        #[allow(clippy::disallowed_methods)]
216        let (fwd_stream_tx, fwd_stream_rx) = mpsc::channel(0);
217
218        let flow_ctrl_params = Arc::new(settings.flow_ctrl_params.clone());
219        let relay_format = settings.relay_crypt_protocol().relay_cell_format();
220        let outbound = CircHopOutbound::new(ccontrol, relay_format, flow_ctrl_params, settings);
221
222        let stream_reactor = StreamReactor::new(
223            self.runtime.clone(),
224            hopnum,
225            outbound,
226            self.ctx.unique_id,
227            fwd_stream_rx,
228            self.bwd_tx.clone(),
229            Arc::clone(&self.ctx.handler),
230            #[cfg(any(feature = "hs-service", feature = "relay"))]
231            Arc::clone(&self.ctx.incoming),
232            self.memquota.clone(),
233        );
234
235        self.runtime
236            .spawn(async {
237                let _ = stream_reactor.run().await;
238            })
239            .map_err(|_| ReactorError::Shutdown)?;
240
241        Ok(fwd_stream_tx)
242    }
243}