tor_proto/client.rs
1//! Client-specific types and implementation.
2
3pub mod channel;
4pub mod circuit;
5pub mod stream;
6
7#[cfg(feature = "rpc")]
8pub mod rpc;
9
10#[cfg(feature = "send-control-msg")]
11pub(crate) mod msghandler;
12pub(crate) mod reactor;
13
14use derive_deftly::Deftly;
15use oneshot_fused_workaround as oneshot;
16use std::net::IpAddr;
17use std::sync::Arc;
18use tracing::instrument;
19
20use crate::circuit::UniqId;
21#[cfg(feature = "circ-padding-manual")]
22pub use crate::client::circuit::padding::{
23 CircuitPadder, CircuitPadderConfig, CircuitPadderConfigError,
24};
25use crate::client::stream::{
26 DataStream, OutboundDataCmdChecker, ResolveCmdChecker, ResolveStream, StreamParameters,
27 StreamReceiver,
28};
29use crate::congestion::sendme::StreamRecvWindow;
30use crate::crypto::cell::HopNum;
31use crate::memquota::{SpecificAccount as _, StreamAccount};
32use crate::stream::STREAM_READER_BUFFER;
33use crate::stream::cmdcheck::AnyCmdChecker;
34use crate::stream::flow_ctrl::state::StreamRateLimit;
35use crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReaderCtrl;
36use crate::stream::queue::stream_queue;
37use crate::stream::{RECV_WINDOW_INIT, StreamComponents, StreamTarget, Tunnel};
38use crate::util::notify::NotifySender;
39use crate::{Error, ResolveError, Result};
40use circuit::{CIRCUIT_BUFFER_SIZE, ClientCirc, Path};
41use reactor::{CtrlCmd, CtrlMsg, FlowCtrlMsg, MetaCellHandler};
42
43use postage::watch;
44use tor_cell::relaycell::StreamId;
45use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
46use tor_cell::relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal};
47use tor_error::bad_api_usage;
48use tor_linkspec::OwnedChanTarget;
49use tor_memquota::derive_deftly_template_HasMemoryCost;
50use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
51
52#[cfg(feature = "hs-service")]
53use crate::stream::incoming::StreamReqInfo;
54
55#[cfg(feature = "hs-service")]
56use crate::client::stream::{IncomingCmdChecker, IncomingStream};
57
58#[cfg(feature = "send-control-msg")]
59use msghandler::{MsgHandler, UserMsgHandler};
60
61/// Handle to use during an ongoing protocol exchange with a circuit's last hop
62///
63/// This is obtained from [`ClientTunnel::start_conversation`],
64/// and used to send messages to the last hop relay.
65//
66// TODO(conflux): this should use ClientTunnel, and it should be moved into
67// the tunnel module.
68#[cfg(feature = "send-control-msg")]
69pub struct Conversation<'r>(&'r ClientTunnel);
70
71#[cfg(feature = "send-control-msg")]
72impl Conversation<'_> {
73 /// Send a protocol message as part of an ad-hoc exchange
74 ///
75 /// Responses are handled by the `UserMsgHandler` set up
76 /// when the `Conversation` was created.
77 pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
78 self.send_internal(Some(msg), None).await
79 }
80
81 /// Send a `SendMsgAndInstallHandler` to the reactor and wait for the outcome
82 ///
83 /// The guts of `start_conversation` and `Conversation::send_msg`
84 pub(crate) async fn send_internal(
85 &self,
86 msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
87 handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
88 ) -> Result<()> {
89 let msg = msg.map(|msg| tor_cell::relaycell::AnyRelayMsgOuter::new(None, msg));
90 let (sender, receiver) = oneshot::channel();
91
92 let ctrl_msg = CtrlMsg::SendMsgAndInstallHandler {
93 msg,
94 handler,
95 sender,
96 };
97 self.0
98 .circ
99 .control
100 .unbounded_send(ctrl_msg)
101 .map_err(|_| Error::CircuitClosed)?;
102
103 receiver.await.map_err(|_| Error::CircuitClosed)?
104 }
105}
106
107/// A low-level client tunnel API.
108///
109/// This is a communication channel to the tunnel reactor, which manages 1 or more circuits.
110///
111/// Note: the tor-circmgr crates wrap this type in specialized *Tunnel types exposing only the
112/// desired subset of functionality depending on purpose and path size.
113///
114/// Some API calls are for single path and some for multi path. A check with the underlying reactor
115/// is done preventing for instance multi path calls to be used on a single path. Top level types
116/// should prevent this and thus this object should never be used directly.
117#[derive(Debug)]
118#[cfg_attr(
119 feature = "rpc",
120 derive(derive_deftly::Deftly),
121 derive_deftly(tor_rpcbase::templates::Object)
122)]
123#[allow(dead_code)] // TODO(conflux)
124pub struct ClientTunnel {
125 /// The underlying handle to the reactor.
126 circ: ClientCirc,
127}
128
129impl ClientTunnel {
130 /// Return a handle to the `ClientCirc` of this `ClientTunnel`, if the tunnel is a single
131 /// circuit tunnel.
132 ///
133 /// Returns an error if the tunnel has more than one circuit.
134 pub fn as_single_circ(&self) -> Result<&ClientCirc> {
135 if self.circ.is_multi_path {
136 return Err(bad_api_usage!("Single circuit getter on multi path tunnel"))?;
137 }
138 Ok(&self.circ)
139 }
140
141 /// Return the channel target of the first hop.
142 ///
143 /// Can only be used for single path tunnel.
144 pub fn first_hop(&self) -> Result<OwnedChanTarget> {
145 self.as_single_circ()?.first_hop()
146 }
147
148 /// Return true if the circuit reactor is closed meaning the circuit is unusable for both
149 /// receiving or sending.
150 pub fn is_closed(&self) -> bool {
151 self.circ.is_closing()
152 }
153
154 /// Return a [`TargetHop`] representing precisely the last hop of the circuit as in set as a
155 /// HopLocation with its id and hop number.
156 ///
157 /// Return an error if there is no last hop.
158 pub fn last_hop(&self) -> Result<TargetHop> {
159 let uniq_id = self.unique_id();
160 let hop_num = self
161 .circ
162 .mutable
163 .last_hop_num(uniq_id)?
164 .ok_or_else(|| bad_api_usage!("no last hop"))?;
165 Ok((uniq_id, hop_num).into())
166 }
167
168 /// Return a description of the last hop of the tunnel.
169 ///
170 /// Return None if the last hop is virtual; return an error
171 /// if the tunnel has no circuits, or all of its circuits are zero length.
172 ///
173 ///
174 /// # Panics
175 ///
176 /// Panics if there is no last hop. (This should be impossible outside of
177 /// the tor-proto crate, but within the crate it's possible to have a
178 /// circuit with no hops.)
179 pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
180 self.circ.last_hop_info()
181 }
182
183 /// Return the number of hops this tunnel as. Fail for a multi path.
184 pub fn n_hops(&self) -> Result<usize> {
185 self.as_single_circ()?.n_hops()
186 }
187
188 /// Return the [`Path`] objects describing all the hops
189 /// of all the circuits in this tunnel.
190 pub fn all_paths(&self) -> Vec<Arc<Path>> {
191 self.circ.all_paths()
192 }
193
194 /// Return a representation of the Paths for all the circuits in this tunnel,
195 /// as a map from each circuits' UniqId to its path.
196 ///
197 /// This is only exposed for the RPC subsystem, where it is documented that the
198 /// format of `UniqId` is not stable.
199 #[cfg(feature = "rpc")]
200 pub(crate) fn tagged_paths(&self) -> std::collections::HashMap<UniqId, Arc<Path>> {
201 self.circ.mutable.tagged_paths()
202 }
203
204 /// Return a process-unique identifier for this tunnel.
205 ///
206 /// Returns the reactor unique ID of the main reactor.
207 pub fn unique_id(&self) -> UniqId {
208 self.circ.unique_id()
209 }
210
211 /// Return the time at which this tunnel last had any open streams.
212 ///
213 /// Returns `None` if this tunnel has never had any open streams,
214 /// or if it currently has open streams.
215 ///
216 /// NOTE that the Instant returned by this method is not affected by
217 /// any runtime mocking; it is the output of an ordinary call to
218 /// `Instant::now()`.
219 pub async fn disused_since(&self) -> Result<Option<web_time_compat::Instant>> {
220 self.circ.disused_since().await
221 }
222
223 /// Return a future that will resolve once the underlying circuit reactor has closed.
224 ///
225 /// Note that this method does not itself cause the tunnel to shut down.
226 pub fn wait_for_close(
227 self: &Arc<Self>,
228 ) -> impl futures::Future<Output = ()> + Send + Sync + 'static + use<> {
229 self.circ.wait_for_close()
230 }
231
232 /// Single-path tunnel only. Multi path onion service is not supported yet.
233 ///
234 /// Tell this tunnel to begin allowing the final hop of the tunnel to try
235 /// to create new Tor streams, and to return those pending requests in an
236 /// asynchronous stream.
237 ///
238 /// Ordinarily, these requests are rejected.
239 ///
240 /// There can only be one [`Stream`](futures::Stream) of this type created on a given tunnel.
241 /// If a such a [`Stream`](futures::Stream) already exists, this method will return
242 /// an error.
243 ///
244 /// After this method has been called on a tunnel, the tunnel is expected
245 /// to receive requests of this type indefinitely, until it is finally closed.
246 /// If the `Stream` is dropped, the next request on this tunnel will cause it to close.
247 ///
248 /// Only onion services (and eventually) exit relays should call this
249 /// method.
250 //
251 // TODO: Someday, we might want to allow a stream request handler to be
252 // un-registered. However, nothing in the Tor protocol requires it.
253 //
254 // Any incoming request handlers installed on the other circuits
255 // (which are shutdown using CtrlCmd::ShutdownAndReturnCircuit)
256 // will be discarded (along with the reactor of that circuit)
257 #[cfg(feature = "hs-service")]
258 #[allow(unreachable_code, unused_variables)] // TODO(conflux)
259 pub async fn allow_stream_requests<'a, FILT>(
260 self: &Arc<Self>,
261 allow_commands: &'a [tor_cell::relaycell::RelayCmd],
262 hop: TargetHop,
263 filter: FILT,
264 ) -> Result<impl futures::Stream<Item = IncomingStream> + use<'a, FILT>>
265 where
266 FILT: crate::client::stream::IncomingStreamRequestFilter + 'a,
267 {
268 use futures::stream::StreamExt;
269
270 /// The size of the channel receiving IncomingStreamRequestContexts.
271 const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;
272
273 // TODO(#2002): support onion service conflux
274 let circ = self.as_single_circ().map_err(tor_error::into_internal!(
275 "Cannot allow stream requests on a multi-path tunnel"
276 ))?;
277
278 let time_prov = circ.time_provider.clone();
279 let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
280 let (incoming_sender, incoming_receiver) = MpscSpec::new(INCOMING_BUFFER)
281 .new_mq(time_prov.clone(), circ.memquota.as_raw_account())?;
282 let (tx, rx) = oneshot::channel();
283
284 circ.command
285 .unbounded_send(CtrlCmd::AwaitStreamRequest {
286 cmd_checker,
287 incoming_sender,
288 hop,
289 done: tx,
290 filter: Box::new(filter),
291 })
292 .map_err(|_| Error::CircuitClosed)?;
293
294 // Check whether the AwaitStreamRequest was processed successfully.
295 rx.await.map_err(|_| Error::CircuitClosed)??;
296
297 let allowed_hop_loc: HopLocation = match hop {
298 TargetHop::Hop(loc) => Some(loc),
299 _ => None,
300 }
301 .ok_or_else(|| bad_api_usage!("Expected TargetHop with HopLocation"))?;
302
303 let tunnel = self.clone();
304 Ok(incoming_receiver.map(move |req_ctx| {
305 let StreamReqInfo {
306 req,
307 stream_id,
308 hop,
309 receiver,
310 msg_tx,
311 rate_limit_stream,
312 drain_rate_request_stream,
313 memquota,
314 relay_cell_format,
315 } = req_ctx;
316
317 // We already enforce this in handle_incoming_stream_request; this
318 // assertion is just here to make sure that we don't ever
319 // accidentally remove or fail to enforce that check, since it is
320 // security-critical.
321 assert_eq!(Some(allowed_hop_loc), hop);
322
323 // TODO(#2002): figure out what this is going to look like
324 // for onion services (perhaps we should forbid this function
325 // from being called on a multipath circuit?)
326 //
327 // See also:
328 // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3002#note_3200937
329 let target = StreamTarget {
330 tunnel: Tunnel::Client(Arc::clone(&tunnel)),
331 tx: msg_tx,
332 hop: Some(allowed_hop_loc),
333 stream_id,
334 relay_cell_format,
335 rate_limit_stream,
336 };
337
338 // can be used to build a reader that supports XON/XOFF flow control
339 let xon_xoff_reader_ctrl =
340 XonXoffReaderCtrl::new(drain_rate_request_stream, target.clone());
341
342 let reader = StreamReceiver {
343 target: target.clone(),
344 receiver,
345 recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
346 ended: false,
347 };
348
349 let components = StreamComponents {
350 stream_receiver: reader,
351 target,
352 memquota,
353 xon_xoff_reader_ctrl,
354 };
355
356 IncomingStream::new(time_prov.clone(), req, components)
357 }))
358 }
359
360 /// Single and Multi path helper, used to begin a stream.
361 ///
362 /// This function allocates a stream ID, and sends the message
363 /// (like a BEGIN or RESOLVE), but doesn't wait for a response.
364 ///
365 /// The caller will typically want to see the first cell in response,
366 /// to see whether it is e.g. an END or a CONNECTED.
367 async fn begin_stream_impl(
368 self: &Arc<Self>,
369 begin_msg: AnyRelayMsg,
370 cmd_checker: AnyCmdChecker,
371 ) -> Result<StreamComponents> {
372 // TODO: Possibly this should take a hop, rather than just
373 // assuming it's the last hop.
374 let hop = TargetHop::LastHop;
375
376 let time_prov = self.circ.time_provider.clone();
377
378 let memquota = StreamAccount::new(self.circ.mq_account())?;
379 let (sender, receiver) = stream_queue(
380 #[cfg(not(feature = "flowctl-cc"))]
381 STREAM_READER_BUFFER,
382 &memquota,
383 &time_prov,
384 )?;
385 let (tx, rx) = oneshot::channel();
386 let (msg_tx, msg_rx) =
387 MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(time_prov, memquota.as_raw_account())?;
388
389 let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
390
391 // A channel for the reactor to request a new drain rate from the reader.
392 // Typically this notification will be sent after an XOFF is sent so that the reader can
393 // send us a new drain rate when the stream data queue becomes empty.
394 let mut drain_rate_request_tx = NotifySender::new_typed();
395 let drain_rate_request_rx = drain_rate_request_tx.subscribe();
396
397 self.circ
398 .control
399 .unbounded_send(CtrlMsg::BeginStream {
400 hop,
401 message: begin_msg,
402 sender,
403 rx: msg_rx,
404 rate_limit_notifier: rate_limit_tx,
405 drain_rate_requester: drain_rate_request_tx,
406 done: tx,
407 cmd_checker,
408 })
409 .map_err(|_| Error::CircuitClosed)?;
410
411 let (stream_id, hop, relay_cell_format) = rx.await.map_err(|_| Error::CircuitClosed)??;
412
413 let target = StreamTarget {
414 tunnel: Tunnel::Client(self.clone()),
415 tx: msg_tx,
416 hop: Some(hop),
417 stream_id,
418 relay_cell_format,
419 rate_limit_stream: rate_limit_rx,
420 };
421
422 // can be used to build a reader that supports XON/XOFF flow control
423 let xon_xoff_reader_ctrl = XonXoffReaderCtrl::new(drain_rate_request_rx, target.clone());
424
425 let stream_receiver = StreamReceiver {
426 target: target.clone(),
427 receiver,
428 recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
429 ended: false,
430 };
431
432 let components = StreamComponents {
433 stream_receiver,
434 target,
435 memquota,
436 xon_xoff_reader_ctrl,
437 };
438
439 Ok(components)
440 }
441
442 /// Install a [`CircuitPadder`] at the listed `hop`.
443 ///
444 /// Replaces any previous padder installed at that hop.
445 #[cfg(feature = "circ-padding-manual")]
446 pub async fn start_padding_at_hop(
447 self: &Arc<Self>,
448 hop: HopLocation,
449 padder: CircuitPadder,
450 ) -> Result<()> {
451 self.circ.set_padder_impl(hop, Some(padder)).await
452 }
453
454 /// Remove any [`CircuitPadder`] at the listed `hop`.
455 ///
456 /// Does nothing if there was not a padder installed there.
457 #[cfg(feature = "circ-padding-manual")]
458 pub async fn stop_padding_at_hop(self: &Arc<Self>, hop: HopLocation) -> Result<()> {
459 self.circ.set_padder_impl(hop, None).await
460 }
461
462 /// Start a DataStream (anonymized connection) to the given
463 /// address and port, using a BEGIN cell.
464 async fn begin_data_stream(
465 self: &Arc<Self>,
466 msg: AnyRelayMsg,
467 optimistic: bool,
468 ) -> Result<DataStream> {
469 let components = self
470 .begin_stream_impl(msg, OutboundDataCmdChecker::new_any())
471 .await?;
472
473 let StreamComponents {
474 stream_receiver,
475 target,
476 memquota,
477 xon_xoff_reader_ctrl,
478 } = components;
479
480 let mut stream = DataStream::new(
481 self.circ.time_provider.clone(),
482 stream_receiver,
483 xon_xoff_reader_ctrl,
484 target,
485 memquota,
486 );
487 if !optimistic {
488 stream.wait_for_connection().await?;
489 }
490 Ok(stream)
491 }
492
493 /// Single and multi path helper.
494 ///
495 /// Start a stream to the given address and port, using a BEGIN
496 /// cell.
497 ///
498 /// The use of a string for the address is intentional: you should let
499 /// the remote Tor relay do the hostname lookup for you.
500 #[instrument(level = "trace", skip_all)]
501 pub async fn begin_stream(
502 self: &Arc<Self>,
503 target: &str,
504 port: u16,
505 parameters: Option<StreamParameters>,
506 ) -> Result<DataStream> {
507 let parameters = parameters.unwrap_or_default();
508 let begin_flags = parameters.begin_flags();
509 let optimistic = parameters.is_optimistic();
510 let target = if parameters.suppressing_hostname() {
511 ""
512 } else {
513 target
514 };
515 let beginmsg = Begin::new(target, port, begin_flags)
516 .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
517 self.begin_data_stream(beginmsg.into(), optimistic).await
518 }
519
520 /// Start a new stream to the last relay in the tunnel, using
521 /// a BEGIN_DIR cell.
522 pub async fn begin_dir_stream(self: Arc<Self>) -> Result<DataStream> {
523 // Note that we always open begindir connections optimistically.
524 // Since they are local to a relay that we've already authenticated
525 // with and built a tunnel to, there should be no additional checks
526 // we need to perform to see whether the BEGINDIR will succeed.
527 self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
528 .await
529 }
530
531 /// Perform a DNS lookup, using a RESOLVE cell with the last relay
532 /// in this tunnel.
533 ///
534 /// Note that this function does not check for timeouts; that's
535 /// the caller's responsibility.
536 pub async fn resolve(self: &Arc<Self>, hostname: &str) -> Result<Vec<IpAddr>> {
537 let resolve_msg = Resolve::new(hostname);
538
539 let resolved_msg = self.try_resolve(resolve_msg).await?;
540
541 resolved_msg
542 .into_answers()
543 .into_iter()
544 .filter_map(|(val, _)| match resolvedval_to_result(val) {
545 Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
546 Ok(_) => None,
547 Err(e) => Some(Err(e)),
548 })
549 .collect()
550 }
551
552 /// Perform a reverse DNS lookup, by sending a RESOLVE cell with
553 /// the last relay on this tunnel.
554 ///
555 /// Note that this function does not check for timeouts; that's
556 /// the caller's responsibility.
557 pub async fn resolve_ptr(self: &Arc<Self>, addr: IpAddr) -> Result<Vec<String>> {
558 let resolve_ptr_msg = Resolve::new_reverse(&addr);
559
560 let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
561
562 resolved_msg
563 .into_answers()
564 .into_iter()
565 .filter_map(|(val, _)| match resolvedval_to_result(val) {
566 Ok(ResolvedVal::Hostname(v)) => Some(
567 String::from_utf8(v)
568 .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
569 ),
570 Ok(_) => None,
571 Err(e) => Some(Err(e)),
572 })
573 .collect()
574 }
575
576 /// Send an ad-hoc message to a given hop on the circuit, without expecting
577 /// a reply.
578 ///
579 /// (If you want to handle one or more possible replies, see
580 /// [`ClientTunnel::start_conversation`].)
581 // TODO(conflux): Change this to use the ReactorHandle for the control commands.
582 #[cfg(feature = "send-control-msg")]
583 pub async fn send_raw_msg(
584 &self,
585 msg: tor_cell::relaycell::msg::AnyRelayMsg,
586 hop: TargetHop,
587 ) -> Result<()> {
588 let (sender, receiver) = oneshot::channel();
589 let ctrl_msg = CtrlMsg::SendMsg { hop, msg, sender };
590 self.circ
591 .control
592 .unbounded_send(ctrl_msg)
593 .map_err(|_| Error::CircuitClosed)?;
594
595 receiver.await.map_err(|_| Error::CircuitClosed)?
596 }
597
598 /// Start an ad-hoc protocol exchange to the specified hop on this tunnel.
599 ///
600 /// To use this:
601 ///
602 /// 0. Create an inter-task channel you'll use to receive
603 /// the outcome of your conversation,
604 /// and bundle it into a [`UserMsgHandler`].
605 ///
606 /// 1. Call `start_conversation`.
607 /// This will install a your handler, for incoming messages,
608 /// and send the outgoing message (if you provided one).
609 /// After that, each message on the circuit
610 /// that isn't handled by the core machinery
611 /// is passed to your provided `reply_handler`.
612 ///
613 /// 2. Possibly call `send_msg` on the [`Conversation`],
614 /// from the call site of `start_conversation`,
615 /// possibly multiple times, from time to time,
616 /// to send further desired messages to the peer.
617 ///
618 /// 3. In your [`UserMsgHandler`], process the incoming messages.
619 /// You may respond by
620 /// sending additional messages
621 /// When the protocol exchange is finished,
622 /// `UserMsgHandler::handle_msg` should return
623 /// [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
624 ///
625 /// If you don't need the `Conversation` to send followup messages,
626 /// you may simply drop it,
627 /// and rely on the responses you get from your handler,
628 /// on the channel from step 0 above.
629 /// Your handler will remain installed and able to process incoming messages
630 /// until it returns `ConversationFinished`.
631 ///
632 /// (If you don't want to accept any replies at all, it may be
633 /// simpler to use [`ClientTunnel::send_raw_msg`].)
634 ///
635 /// Note that it is quite possible to use this function to violate the tor
636 /// protocol; most users of this API will not need to call it. It is used
637 /// to implement most of the onion service handshake.
638 ///
639 /// # Limitations
640 ///
641 /// Only one conversation may be active at any one time,
642 /// for any one circuit.
643 /// This generally means that this function should not be called
644 /// on a tunnel which might be shared with anyone else.
645 ///
646 /// Likewise, it is forbidden to try to extend the tunnel,
647 /// while the conversation is in progress.
648 ///
649 /// After the conversation has finished, the tunnel may be extended.
650 /// Or, `start_conversation` may be called again;
651 /// but, in that case there will be a gap between the two conversations,
652 /// during which no `UserMsgHandler` is installed,
653 /// and unexpected incoming messages would close the tunnel.
654 ///
655 /// If these restrictions are violated, the tunnel will be closed with an error.
656 ///
657 /// ## Precise definition of the lifetime of a conversation
658 ///
659 /// A conversation is in progress from entry to `start_conversation`,
660 /// until entry to the body of the [`UserMsgHandler::handle_msg`](MsgHandler::handle_msg)
661 /// call which returns [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
662 /// (*Entry* since `handle_msg` is synchronously embedded
663 /// into the incoming message processing.)
664 /// So you may start a new conversation as soon as you have the final response
665 /// via your inter-task channel from (0) above.
666 ///
667 /// The lifetime relationship of the [`Conversation`],
668 /// vs the handler returning `ConversationFinished`
669 /// is not enforced by the type system.
670 // Doing so without still leaving plenty of scope for runtime errors doesn't seem possible,
671 // at least while allowing sending followup messages from outside the handler.
672 #[cfg(feature = "send-control-msg")]
673 pub async fn start_conversation(
674 &self,
675 msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
676 reply_handler: impl MsgHandler + Send + 'static,
677 hop: TargetHop,
678 ) -> Result<Conversation<'_>> {
679 // We need to resolve the TargetHop into a precise HopLocation so our msg handler can match
680 // the right Leg/Hop with inbound cell.
681 let (sender, receiver) = oneshot::channel();
682 self.circ
683 .command
684 .unbounded_send(CtrlCmd::ResolveTargetHop { hop, done: sender })
685 .map_err(|_| Error::CircuitClosed)?;
686 let hop_location = receiver.await.map_err(|_| Error::CircuitClosed)??;
687 let handler = Box::new(UserMsgHandler::new(hop_location, reply_handler));
688 let conversation = Conversation(self);
689 conversation.send_internal(msg, Some(handler)).await?;
690 Ok(conversation)
691 }
692
693 /// Shut down this tunnel, along with all streams that are using it. Happens asynchronously
694 /// (i.e. the tunnel won't necessarily be done shutting down immediately after this function
695 /// returns!).
696 ///
697 /// Note that other references to this tunnel may exist. If they do, they will stop working
698 /// after you call this function.
699 ///
700 /// It's not necessary to call this method if you're just done with a tunnel: the tunnel should
701 /// close on its own once nothing is using it any more.
702 // TODO(conflux): This should use the ReactorHandle instead.
703 pub fn terminate(&self) {
704 let _ = self.circ.command.unbounded_send(CtrlCmd::Shutdown);
705 }
706
707 /// Helper: Send the resolve message, and read resolved message from
708 /// resolve stream.
709 async fn try_resolve(self: &Arc<Self>, msg: Resolve) -> Result<Resolved> {
710 let components = self
711 .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
712 .await?;
713
714 let StreamComponents {
715 stream_receiver,
716 target: _,
717 memquota,
718 xon_xoff_reader_ctrl: _,
719 } = components;
720
721 let mut resolve_stream = ResolveStream::new(stream_receiver, memquota);
722 resolve_stream.read_msg().await
723 }
724
725 // TODO(conflux)
726}
727
728// TODO(conflux): We will likely need to enforce some invariants here, for example that the `circ`
729// has the expected (non-zero) number of hops.
730impl TryFrom<ClientCirc> for ClientTunnel {
731 type Error = Error;
732
733 fn try_from(circ: ClientCirc) -> std::result::Result<Self, Self::Error> {
734 Ok(Self { circ })
735 }
736}
737
738/// Convert a [`ResolvedVal`] into a Result, based on whether or not
739/// it represents an error.
740fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
741 match val {
742 ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
743 ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
744 ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
745 _ => Ok(val),
746 }
747}
748
749/// A precise position in a tunnel.
750#[derive(Debug, Deftly, Copy, Clone, PartialEq, Eq)]
751#[derive_deftly(HasMemoryCost)]
752#[non_exhaustive]
753pub enum HopLocation {
754 /// A specific position in a tunnel.
755 Hop((UniqId, HopNum)),
756 /// The join point of a multi-path tunnel.
757 JoinPoint,
758}
759
760/// A position in a tunnel.
761#[derive(Debug, Copy, Clone, PartialEq, Eq)]
762#[non_exhaustive]
763pub enum TargetHop {
764 /// A specific position in a tunnel.
765 Hop(HopLocation),
766 /// The last hop of a tunnel.
767 ///
768 /// This should be used only when you don't care about what specific hop is used.
769 /// Some tunnels may be extended or truncated,
770 /// which means that the "last hop" may change at any time.
771 LastHop,
772}
773
774impl From<(UniqId, HopNum)> for HopLocation {
775 fn from(v: (UniqId, HopNum)) -> Self {
776 HopLocation::Hop(v)
777 }
778}
779
780impl From<(UniqId, HopNum)> for TargetHop {
781 fn from(v: (UniqId, HopNum)) -> Self {
782 TargetHop::Hop(v.into())
783 }
784}
785
786impl HopLocation {
787 /// Return the hop number if not a JointPoint.
788 pub fn hop_num(&self) -> Option<HopNum> {
789 match self {
790 Self::Hop((_, hop_num)) => Some(*hop_num),
791 Self::JoinPoint => None,
792 }
793 }
794}
795
796impl ClientTunnel {
797 /// Close the pending stream that owns this StreamTarget, delivering the specified
798 /// END message (if any)
799 ///
800 /// See [`StreamTarget::close_pending`].
801 #[cfg(feature = "hs-service")]
802 pub(crate) fn close_pending(
803 &self,
804 stream_id: StreamId,
805 hop: Option<HopLocation>,
806 message: crate::stream::CloseStreamBehavior,
807 ) -> Result<oneshot::Receiver<Result<()>>> {
808 let (tx, rx) = oneshot::channel();
809
810 self.circ
811 .control
812 .unbounded_send(CtrlMsg::ClosePendingStream {
813 stream_id,
814 hop: hop.expect("missing stream hop for client tunnel"),
815 message,
816 done: tx,
817 })
818 .map_err(|_| Error::CircuitClosed)?;
819
820 Ok(rx)
821 }
822
823 /// Request to send a SENDME cell for this stream.
824 ///
825 /// See [`StreamTarget::send_sendme`].
826 pub(crate) fn send_sendme(&self, stream_id: StreamId, hop: Option<HopLocation>) -> Result<()> {
827 self.circ
828 .control
829 .unbounded_send(CtrlMsg::FlowCtrlUpdate {
830 msg: FlowCtrlMsg::Sendme,
831 stream_id,
832 hop: hop.expect("missing stream hop for client tunnel"),
833 })
834 .map_err(|_| Error::CircuitClosed)
835 }
836
837 /// Inform the circuit reactor that there has been a change in the drain rate for this stream.
838 ///
839 /// See [`StreamTarget::drain_rate_update`].
840 pub(crate) fn drain_rate_update(
841 &self,
842 stream_id: StreamId,
843 hop: Option<HopLocation>,
844 rate: XonKbpsEwma,
845 ) -> Result<()> {
846 self.circ
847 .control
848 .unbounded_send(CtrlMsg::FlowCtrlUpdate {
849 msg: FlowCtrlMsg::Xon(rate),
850 stream_id,
851 hop: hop.expect("missing stream hop for client tunnel"),
852 })
853 .map_err(|_| Error::CircuitClosed)
854 }
855}