tor_proto/relay.rs
1//! This module contains a WIP relay tunnel reactor.
2//!
3//! The initial version will duplicate some of the logic from
4//! the client tunnel reactor.
5//!
6//! TODO(relay): refactor the relay tunnel
7//! to share the same base tunnel implementation
8//! as the client tunnel (to reduce code duplication).
9//!
10//! See the design notes at doc/dev/notes/relay-reactor.md
11
12pub(crate) mod channel;
13#[allow(unreachable_pub)] // TODO(relay): use in tor-chanmgr(?)
14pub mod channel_provider;
15pub(crate) mod reactor;
16
17pub use channel::MaybeVerifiableRelayResponderChannel;
18pub use channel::create_handler::{
19 CircNetParameters, CongestionControlNetParams, CreateRequestHandler,
20};
21
22use derive_deftly::Deftly;
23use futures::StreamExt as _;
24use oneshot_fused_workaround as oneshot;
25
26use tor_cell::chancell::msg::{self as chanmsg};
27use tor_cell::relaycell::StreamId;
28use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
29use tor_memquota::derive_deftly_template_HasMemoryCost;
30use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
31
32use crate::Error;
33use crate::circuit::celltypes::derive_deftly_template_RestrictedChanMsgSet;
34use crate::circuit::reactor::CircReactorHandle;
35use crate::circuit::reactor::{CtrlCmd, forward};
36use crate::congestion::sendme::StreamRecvWindow;
37use crate::memquota::SpecificAccount;
38use crate::relay::reactor::backward::Backward;
39use crate::relay::reactor::forward::Forward;
40use crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReaderCtrl;
41use crate::stream::incoming::{
42 IncomingCmdChecker, IncomingStream, IncomingStreamRequestFilter, StreamReqInfo,
43};
44use crate::stream::raw::StreamReceiver;
45use crate::stream::{RECV_WINDOW_INIT, StreamComponents, StreamTarget, Tunnel};
46
47use std::sync::Arc;
48
49/// A subclass of ChanMsg that can correctly arrive on a live relay
50/// circuit (one where a CREATE* has been received).
51#[derive(Debug, Deftly)]
52#[derive_deftly(HasMemoryCost)]
53#[derive_deftly(RestrictedChanMsgSet)]
54#[deftly(usage = "on an open relay circuit")]
55#[cfg(feature = "relay")]
56#[cfg_attr(not(test), allow(unused))] // TODO(relay)
57pub(crate) enum RelayCircChanMsg {
58 /// A relay cell telling us some kind of remote command from some
59 /// party on the circuit.
60 Relay(chanmsg::Relay),
61 /// A relay early cell that is allowed to contain a CREATE message.
62 RelayEarly(chanmsg::RelayEarly),
63 /// A cell telling us to destroy the circuit.
64 Destroy(chanmsg::Destroy),
65 /// A cell telling us to enable/disable channel padding.
66 PaddingNegotiate(chanmsg::PaddingNegotiate),
67}
68
69/// A handle for interacting with a relay circuit.
70#[allow(unused)] // TODO(relay)
71#[derive(Debug)]
72pub struct RelayCirc(pub(crate) CircReactorHandle<Forward, Backward>);
73
74impl RelayCirc {
75 /// Shut down this circuit, along with all streams that are using it.
76 /// Happens asynchronously (i.e. the tunnel won't necessarily be done shutting down
77 /// immediately after this function returns!).
78 ///
79 /// Note that other references to this tunnel may exist.
80 /// If they do, they will stop working after you call this function.
81 ///
82 /// It's not necessary to call this method if you're just done with a circuit:
83 /// the circuit should close on its own once nothing is using it any more.
84 pub fn terminate(&self) {
85 let _ = self.0.command.unbounded_send(CtrlCmd::Shutdown);
86 }
87
88 /// Return true if this circuit is closed and therefore unusable.
89 pub fn is_closing(&self) -> bool {
90 self.0.control.is_closed()
91 }
92
93 /// Inform the circuit reactor that there has been a change in the drain rate for this stream.
94 ///
95 /// Typically the circuit reactor would send this new rate in an XON message to the other end of
96 /// the stream.
97 /// But it may decide not to, and may discard this update.
98 /// For example the stream may have a large amount of buffered data, and the reactor may not
99 /// want to send an XON while the buffer is large.
100 ///
101 /// This sends a message to inform the circuit reactor of the new drain rate,
102 /// but it does not block or wait for a response from the reactor.
103 /// An error is only returned if we are unable to send the update.
104 //
105 // TODO(relay): this duplicates the ClientTunnel API and docs. Do we care?
106 pub(crate) fn drain_rate_update(
107 &self,
108 _stream_id: StreamId,
109 _rate: XonKbpsEwma,
110 ) -> crate::Result<()> {
111 todo!()
112 }
113
114 /// Request to send a SENDME cell for this stream.
115 ///
116 /// This sends a request to the circuit reactor to send a stream-level SENDME, but it does not
117 /// block or wait for a response from the circuit reactor.
118 /// An error is only returned if we are unable to send the request.
119 /// This means that if the circuit reactor is unable to send the SENDME, we are not notified of
120 /// this here and an error will not be returned.
121 //
122 // TODO(relay): this duplicates the ClientTunnel API and docs. Do we care?
123 pub(crate) fn send_sendme(&self, _stream_id: StreamId) -> crate::Result<()> {
124 todo!()
125 }
126
127 /// Close the pending stream that owns this StreamTarget, delivering the specified
128 /// END message (if any)
129 ///
130 /// The stream is closed by sending a control message (`ClosePendingStream`)
131 /// to the reactor.
132 ///
133 /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
134 ///
135 /// The StreamTarget will set the correct stream ID and pick the
136 /// right hop, but will not validate that the message is well-formed
137 /// or meaningful in context.
138 ///
139 /// Note that in many cases, the actual contents of an END message can leak unwanted
140 /// information. Please consider carefully before sending anything but an
141 /// [`End::new_misc()`](tor_cell::relaycell::msg::End::new_misc) message over a `ClientTunnel`.
142 /// (For onion services, we send [`DONE`](tor_cell::relaycell::msg::EndReason::DONE) )
143 ///
144 /// In addition to sending the END message, this function also ensures
145 /// the state of the stream map entry of this stream is updated
146 /// accordingly.
147 ///
148 /// Normally, you shouldn't need to call this function, as streams are implicitly closed by the
149 /// reactor when their corresponding `StreamTarget` is dropped. The only valid use of this
150 /// function is for closing pending incoming streams (a stream is said to be pending if we have
151 /// received the message initiating the stream but have not responded to it yet).
152 ///
153 /// **NOTE**: This function should be called at most once per request.
154 /// Calling it twice is an error.
155 //
156 // TODO(relay): this duplicates the ClientTunnel API and docs. Do we care?
157 pub(crate) fn close_pending(
158 &self,
159 _stream_id: StreamId,
160 _message: crate::stream::CloseStreamBehavior,
161 ) -> crate::Result<oneshot::Receiver<crate::Result<()>>> {
162 todo!()
163 }
164
165 /// Tell this reactor to begin allowing incoming stream requests,
166 /// and to return those pending requests in an asynchronous stream.
167 ///
168 /// Ordinarily, these requests are rejected.
169 ///
170 /// Needed for exits. Middle relays should reject every incoming stream,
171 /// either through the `filter` provided in `filter`,
172 /// or by explicitly calling .reject() on each received stream.
173 ///
174 // TODO(relay): I think we will prefer using the .reject() approach
175 // for this, because the filter is only meant for inexpensive quick
176 // checks that are done immediately in the reactor (any blocking
177 // in the filter will block the relay reactor main loop!).
178 ///
179 /// The user of the reactor **must** handle this stream
180 /// (either by .accept()ing and opening and proxying the corresponding
181 /// streams as appropriate, or by .reject()ing).
182 ///
183 // TODO: declare a type-alias for the return type when support for
184 // impl in type aliases gets stabilized.
185 //
186 // See issue #63063 <https://github.com/rust-lang/rust/issues/63063>
187 //
188 /// There can only be one [`Stream`](futures::Stream) of this type created on a given reactor.
189 /// If a such a [`Stream`](futures::Stream) already exists, this method will return
190 /// an error.
191 ///
192 /// After this method has been called on a reactor, the reactor is expected
193 /// to receive requests of this type indefinitely, until it is finally closed.
194 /// If the `Stream` is dropped, the next request on this reactor will cause it to close.
195 ///
196 // TODO: Someday, we might want to allow a stream request handler to be
197 // un-registered. However, nothing in the Tor protocol requires it.
198 //
199 // TODO(DEDUP): *very* similar to ServiceOnionServiceDataTunnel::allow_stream_requests
200 #[allow(unused)] // TODO(relay): call this from the task that creates the circ
201 pub(crate) async fn allow_stream_requests<'a, FILT>(
202 self: Arc<Self>,
203 allow_commands: &'a [tor_cell::relaycell::RelayCmd],
204 filter: FILT,
205 ) -> crate::Result<impl futures::Stream<Item = IncomingStream> + use<'a, FILT>>
206 where
207 FILT: IncomingStreamRequestFilter,
208 {
209 let tunnel = Arc::clone(&self);
210 /// The size of the channel receiving IncomingStreamRequestContexts.
211 ///
212 // TODO(relay-tuning): buffer size
213 const INCOMING_BUFFER: usize = crate::stream::STREAM_READER_BUFFER;
214
215 let (incoming_sender, incoming_receiver) = MpscSpec::new(INCOMING_BUFFER).new_mq(
216 self.0.time_provider.clone(),
217 tunnel.0.memquota.as_raw_account(),
218 )?;
219
220 let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
221 let (tx, rx) = oneshot::channel();
222 let cmd = forward::CtrlCmd::AwaitStreamRequests {
223 incoming_sender,
224 cmd_checker,
225 hop: None,
226 filter: Box::new(filter),
227 done: tx,
228 };
229
230 tunnel
231 .0
232 .command
233 .unbounded_send(CtrlCmd::Forward(cmd))
234 .map_err(|_| Error::CircuitClosed)?;
235
236 // Check whether the AwaitStreamRequest was processed successfully.
237 rx.await.map_err(|_| Error::CircuitClosed)??;
238
239 // TODO(relay): this is more or less copy-pasta from client code
240 let stream = incoming_receiver.map(move |req_ctx| {
241 let StreamReqInfo {
242 req,
243 stream_id,
244 hop,
245 receiver,
246 msg_tx,
247 rate_limit_stream,
248 drain_rate_request_stream,
249 memquota,
250 relay_cell_format,
251 } = req_ctx;
252
253 // There is no originating hop if we're a relay
254 debug_assert!(hop.is_none());
255
256 let target = StreamTarget {
257 tunnel: Tunnel::Relay(Arc::clone(&tunnel)),
258 tx: msg_tx,
259 hop: None,
260 stream_id,
261 relay_cell_format,
262 rate_limit_stream,
263 };
264
265 // can be used to build a reader that supports XON/XOFF flow control
266 let xon_xoff_reader_ctrl =
267 XonXoffReaderCtrl::new(drain_rate_request_stream, target.clone());
268
269 let reader = StreamReceiver {
270 target: target.clone(),
271 receiver,
272 recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
273 ended: false,
274 };
275
276 let components = StreamComponents {
277 stream_receiver: reader,
278 target,
279 memquota,
280 xon_xoff_reader_ctrl,
281 };
282
283 IncomingStream::new(self.0.time_provider.clone(), req, components)
284 });
285
286 Ok(stream)
287 }
288}
289
290#[cfg(test)]
291mod test {
292 // @@ begin test lint list maintained by maint/add_warning @@
293 #![allow(clippy::bool_assert_comparison)]
294 #![allow(clippy::clone_on_copy)]
295 #![allow(clippy::dbg_macro)]
296 #![allow(clippy::mixed_attributes_style)]
297 #![allow(clippy::print_stderr)]
298 #![allow(clippy::print_stdout)]
299 #![allow(clippy::single_char_pattern)]
300 #![allow(clippy::unwrap_used)]
301 #![allow(clippy::unchecked_time_subtraction)]
302 #![allow(clippy::useless_vec)]
303 #![allow(clippy::needless_pass_by_value)]
304 //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
305
306 #[test]
307 fn relay_circ_chan_msg() {
308 use tor_cell::chancell::msg::{self, AnyChanMsg};
309 fn good(m: AnyChanMsg) {
310 use crate::relay::RelayCircChanMsg;
311 assert!(RelayCircChanMsg::try_from(m).is_ok());
312 }
313 fn bad(m: AnyChanMsg) {
314 use crate::relay::RelayCircChanMsg;
315 assert!(RelayCircChanMsg::try_from(m).is_err());
316 }
317
318 good(msg::Destroy::new(2.into()).into());
319 bad(msg::CreatedFast::new(&b"The great globular mass"[..]).into());
320 bad(msg::Created2::new(&b"of protoplasmic slush"[..]).into());
321 good(msg::Relay::new(&b"undulated slightly,"[..]).into());
322 good(msg::AnyChanMsg::RelayEarly(
323 msg::Relay::new(&b"as if aware of him"[..]).into(),
324 ));
325 bad(msg::Versions::new([1, 2, 3]).unwrap().into());
326 good(msg::PaddingNegotiate::start_default().into());
327 good(msg::RelayEarly::from(msg::Relay::new(b"snail-like unipedular organism")).into());
328 }
329}