Skip to main content

tor_proto/
client.rs

1//! Client-specific types and implementation.
2
3pub mod channel;
4pub mod circuit;
5pub mod stream;
6
7#[cfg(feature = "rpc")]
8pub mod rpc;
9
10#[cfg(feature = "send-control-msg")]
11pub(crate) mod msghandler;
12pub(crate) mod reactor;
13
14use derive_deftly::Deftly;
15use oneshot_fused_workaround as oneshot;
16use std::net::IpAddr;
17use std::sync::Arc;
18use tracing::instrument;
19
20use crate::circuit::UniqId;
21#[cfg(feature = "circ-padding-manual")]
22pub use crate::client::circuit::padding::{
23    CircuitPadder, CircuitPadderConfig, CircuitPadderConfigError,
24};
25use crate::client::stream::{
26    DataStream, OutboundDataCmdChecker, ResolveCmdChecker, ResolveStream, StreamParameters,
27    StreamReceiver,
28};
29use crate::congestion::sendme::StreamRecvWindow;
30use crate::crypto::cell::HopNum;
31use crate::memquota::{SpecificAccount as _, StreamAccount};
32use crate::stream::STREAM_READER_BUFFER;
33use crate::stream::cmdcheck::AnyCmdChecker;
34use crate::stream::flow_ctrl::state::StreamRateLimit;
35use crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReaderCtrl;
36use crate::stream::queue::stream_queue;
37use crate::stream::{RECV_WINDOW_INIT, StreamComponents, StreamTarget, Tunnel};
38use crate::util::notify::NotifySender;
39use crate::{Error, ResolveError, Result};
40use circuit::{CIRCUIT_BUFFER_SIZE, ClientCirc, Path};
41use reactor::{CtrlCmd, CtrlMsg, FlowCtrlMsg, MetaCellHandler};
42
43use postage::watch;
44use tor_cell::relaycell::StreamId;
45use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
46use tor_cell::relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal};
47use tor_error::bad_api_usage;
48use tor_linkspec::OwnedChanTarget;
49use tor_memquota::derive_deftly_template_HasMemoryCost;
50use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
51
52#[cfg(feature = "hs-service")]
53use crate::stream::incoming::StreamReqInfo;
54
55#[cfg(feature = "hs-service")]
56use crate::client::stream::{IncomingCmdChecker, IncomingStream};
57
58#[cfg(feature = "send-control-msg")]
59use msghandler::{MsgHandler, UserMsgHandler};
60
/// A handle for an in-progress, ad-hoc protocol exchange with a circuit's last hop.
///
/// You get one of these back from [`ClientTunnel::start_conversation`];
/// use it to send further messages to the last hop relay.
///
/// The handle only borrows the tunnel it was created from.
//
// TODO(conflux): this should use ClientTunnel, and it should be moved into
// the tunnel module.
#[cfg(feature = "send-control-msg")]
pub struct Conversation<'r>(&'r ClientTunnel);
70
#[cfg(feature = "send-control-msg")]
impl Conversation<'_> {
    /// Send one more protocol message as part of this ad-hoc exchange.
    ///
    /// Any responses are handled by the `UserMsgHandler` that was set up
    /// when this `Conversation` was created; no new handler is installed here.
    pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
        self.send_internal(Some(msg), None).await
    }

    /// Send a `SendMsgAndInstallHandler` to the reactor and wait for the outcome.
    ///
    /// This is the shared implementation behind `start_conversation` and
    /// [`Conversation::send_message`].
    ///
    /// Both `msg` and `handler` are optional and are passed to the reactor as given.
    ///
    /// Returns [`Error::CircuitClosed`] if the control message cannot be queued,
    /// or if the reactor drops the reply channel without answering;
    /// otherwise returns whatever result the reactor reports.
    pub(crate) async fn send_internal(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<()> {
        // One-shot channel on which the reactor reports the outcome.
        let (sender, receiver) = oneshot::channel();

        // Wrap the message (if any) in its outer form, with no stream ID.
        let msg = msg.map(|inner| tor_cell::relaycell::AnyRelayMsgOuter::new(None, inner));

        let request = CtrlMsg::SendMsgAndInstallHandler {
            msg,
            handler,
            sender,
        };

        let tunnel = self.0;
        if tunnel.circ.control.unbounded_send(request).is_err() {
            return Err(Error::CircuitClosed);
        }

        match receiver.await {
            Ok(outcome) => outcome,
            Err(_) => Err(Error::CircuitClosed),
        }
    }
}
106
107/// A low-level client tunnel API.
108///
109/// This is a communication channel to the tunnel reactor, which manages 1 or more circuits.
110///
111/// Note: the tor-circmgr crates wrap this type in specialized *Tunnel types exposing only the
112/// desired subset of functionality depending on purpose and path size.
113///
114/// Some API calls are for single path and some for multi path. A check with the underlying reactor
115/// is done preventing for instance multi path calls to be used on a single path. Top level types
116/// should prevent this and thus this object should never be used directly.
117#[derive(Debug)]
118#[cfg_attr(
119    feature = "rpc",
120    derive(derive_deftly::Deftly),
121    derive_deftly(tor_rpcbase::templates::Object)
122)]
123#[allow(dead_code)] // TODO(conflux)
124pub struct ClientTunnel {
125    /// The underlying handle to the reactor.
126    circ: ClientCirc,
127}
128
129impl ClientTunnel {
130    /// Return a handle to the `ClientCirc` of this `ClientTunnel`, if the tunnel is a single
131    /// circuit tunnel.
132    ///
133    /// Returns an error if the tunnel has more than one circuit.
134    pub fn as_single_circ(&self) -> Result<&ClientCirc> {
135        if self.circ.is_multi_path {
136            return Err(bad_api_usage!("Single circuit getter on multi path tunnel"))?;
137        }
138        Ok(&self.circ)
139    }
140
141    /// Return the channel target of the first hop.
142    ///
143    /// Can only be used for single path tunnel.
144    pub fn first_hop(&self) -> Result<OwnedChanTarget> {
145        self.as_single_circ()?.first_hop()
146    }
147
148    /// Return true if the circuit reactor is closed meaning the circuit is unusable for both
149    /// receiving or sending.
150    pub fn is_closed(&self) -> bool {
151        self.circ.is_closing()
152    }
153
154    /// Return a [`TargetHop`] representing precisely the last hop of the circuit as in set as a
155    /// HopLocation with its id and hop number.
156    ///
157    /// Return an error if there is no last hop.
158    pub fn last_hop(&self) -> Result<TargetHop> {
159        let uniq_id = self.unique_id();
160        let hop_num = self
161            .circ
162            .mutable
163            .last_hop_num(uniq_id)?
164            .ok_or_else(|| bad_api_usage!("no last hop"))?;
165        Ok((uniq_id, hop_num).into())
166    }
167
168    /// Return a description of the last hop of the tunnel.
169    ///
170    /// Return None if the last hop is virtual; return an error
171    /// if the tunnel has no circuits, or all of its circuits are zero length.
172    ///
173    ///
174    /// # Panics
175    ///
176    /// Panics if there is no last hop.  (This should be impossible outside of
177    /// the tor-proto crate, but within the crate it's possible to have a
178    /// circuit with no hops.)
179    pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
180        self.circ.last_hop_info()
181    }
182
183    /// Return the number of hops this tunnel as. Fail for a multi path.
184    pub fn n_hops(&self) -> Result<usize> {
185        self.as_single_circ()?.n_hops()
186    }
187
188    /// Return the [`Path`] objects describing all the hops
189    /// of all the circuits in this tunnel.
190    pub fn all_paths(&self) -> Vec<Arc<Path>> {
191        self.circ.all_paths()
192    }
193
194    /// Return a representation of the Paths for all the circuits in this tunnel,
195    /// as a map from each circuits' UniqId to its path.
196    ///
197    /// This is only exposed for the RPC subsystem, where it is documented that the
198    /// format of `UniqId` is not stable.
199    #[cfg(feature = "rpc")]
200    pub(crate) fn tagged_paths(&self) -> std::collections::HashMap<UniqId, Arc<Path>> {
201        self.circ.mutable.tagged_paths()
202    }
203
204    /// Return a process-unique identifier for this tunnel.
205    ///
206    /// Returns the reactor unique ID of the main reactor.
207    pub fn unique_id(&self) -> UniqId {
208        self.circ.unique_id()
209    }
210
211    /// Return the time at which this tunnel last had any open streams.
212    ///
213    /// Returns `None` if this tunnel has never had any open streams,
214    /// or if it currently has open streams.
215    ///
216    /// NOTE that the Instant returned by this method is not affected by
217    /// any runtime mocking; it is the output of an ordinary call to
218    /// `Instant::now()`.
219    pub async fn disused_since(&self) -> Result<Option<web_time_compat::Instant>> {
220        self.circ.disused_since().await
221    }
222
223    /// Return a future that will resolve once the underlying circuit reactor has closed.
224    ///
225    /// Note that this method does not itself cause the tunnel to shut down.
226    pub fn wait_for_close(
227        self: &Arc<Self>,
228    ) -> impl futures::Future<Output = ()> + Send + Sync + 'static + use<> {
229        self.circ.wait_for_close()
230    }
231
232    /// Single-path tunnel only. Multi path onion service is not supported yet.
233    ///
234    /// Tell this tunnel to begin allowing the final hop of the tunnel to try
235    /// to create new Tor streams, and to return those pending requests in an
236    /// asynchronous stream.
237    ///
238    /// Ordinarily, these requests are rejected.
239    ///
240    /// There can only be one [`Stream`](futures::Stream) of this type created on a given tunnel.
241    /// If a such a [`Stream`](futures::Stream) already exists, this method will return
242    /// an error.
243    ///
244    /// After this method has been called on a tunnel, the tunnel is expected
245    /// to receive requests of this type indefinitely, until it is finally closed.
246    /// If the `Stream` is dropped, the next request on this tunnel will cause it to close.
247    ///
248    /// Only onion services (and eventually) exit relays should call this
249    /// method.
250    //
251    // TODO: Someday, we might want to allow a stream request handler to be
252    // un-registered.  However, nothing in the Tor protocol requires it.
253    //
254    // Any incoming request handlers installed on the other circuits
255    // (which are shutdown using CtrlCmd::ShutdownAndReturnCircuit)
256    // will be discarded (along with the reactor of that circuit)
257    #[cfg(feature = "hs-service")]
258    #[allow(unreachable_code, unused_variables)] // TODO(conflux)
259    pub async fn allow_stream_requests<'a, FILT>(
260        self: &Arc<Self>,
261        allow_commands: &'a [tor_cell::relaycell::RelayCmd],
262        hop: TargetHop,
263        filter: FILT,
264    ) -> Result<impl futures::Stream<Item = IncomingStream> + use<'a, FILT>>
265    where
266        FILT: crate::client::stream::IncomingStreamRequestFilter + 'a,
267    {
268        use futures::stream::StreamExt;
269
270        /// The size of the channel receiving IncomingStreamRequestContexts.
271        const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;
272
273        // TODO(#2002): support onion service conflux
274        let circ = self.as_single_circ().map_err(tor_error::into_internal!(
275            "Cannot allow stream requests on a multi-path tunnel"
276        ))?;
277
278        let time_prov = circ.time_provider.clone();
279        let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
280        let (incoming_sender, incoming_receiver) = MpscSpec::new(INCOMING_BUFFER)
281            .new_mq(time_prov.clone(), circ.memquota.as_raw_account())?;
282        let (tx, rx) = oneshot::channel();
283
284        circ.command
285            .unbounded_send(CtrlCmd::AwaitStreamRequest {
286                cmd_checker,
287                incoming_sender,
288                hop,
289                done: tx,
290                filter: Box::new(filter),
291            })
292            .map_err(|_| Error::CircuitClosed)?;
293
294        // Check whether the AwaitStreamRequest was processed successfully.
295        rx.await.map_err(|_| Error::CircuitClosed)??;
296
297        let allowed_hop_loc: HopLocation = match hop {
298            TargetHop::Hop(loc) => Some(loc),
299            _ => None,
300        }
301        .ok_or_else(|| bad_api_usage!("Expected TargetHop with HopLocation"))?;
302
303        let tunnel = self.clone();
304        Ok(incoming_receiver.map(move |req_ctx| {
305            let StreamReqInfo {
306                req,
307                stream_id,
308                hop,
309                receiver,
310                msg_tx,
311                rate_limit_stream,
312                drain_rate_request_stream,
313                memquota,
314                relay_cell_format,
315            } = req_ctx;
316
317            // We already enforce this in handle_incoming_stream_request; this
318            // assertion is just here to make sure that we don't ever
319            // accidentally remove or fail to enforce that check, since it is
320            // security-critical.
321            assert_eq!(Some(allowed_hop_loc), hop);
322
323            // TODO(#2002): figure out what this is going to look like
324            // for onion services (perhaps we should forbid this function
325            // from being called on a multipath circuit?)
326            //
327            // See also:
328            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3002#note_3200937
329            let target = StreamTarget {
330                tunnel: Tunnel::Client(Arc::clone(&tunnel)),
331                tx: msg_tx,
332                hop: Some(allowed_hop_loc),
333                stream_id,
334                relay_cell_format,
335                rate_limit_stream,
336            };
337
338            // can be used to build a reader that supports XON/XOFF flow control
339            let xon_xoff_reader_ctrl =
340                XonXoffReaderCtrl::new(drain_rate_request_stream, target.clone());
341
342            let reader = StreamReceiver {
343                target: target.clone(),
344                receiver,
345                recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
346                ended: false,
347            };
348
349            let components = StreamComponents {
350                stream_receiver: reader,
351                target,
352                memquota,
353                xon_xoff_reader_ctrl,
354            };
355
356            IncomingStream::new(time_prov.clone(), req, components)
357        }))
358    }
359
360    /// Single and Multi path helper, used to begin a stream.
361    ///
362    /// This function allocates a stream ID, and sends the message
363    /// (like a BEGIN or RESOLVE), but doesn't wait for a response.
364    ///
365    /// The caller will typically want to see the first cell in response,
366    /// to see whether it is e.g. an END or a CONNECTED.
367    async fn begin_stream_impl(
368        self: &Arc<Self>,
369        begin_msg: AnyRelayMsg,
370        cmd_checker: AnyCmdChecker,
371    ) -> Result<StreamComponents> {
372        // TODO: Possibly this should take a hop, rather than just
373        // assuming it's the last hop.
374        let hop = TargetHop::LastHop;
375
376        let time_prov = self.circ.time_provider.clone();
377
378        let memquota = StreamAccount::new(self.circ.mq_account())?;
379        let (sender, receiver) = stream_queue(
380            #[cfg(not(feature = "flowctl-cc"))]
381            STREAM_READER_BUFFER,
382            &memquota,
383            &time_prov,
384        )?;
385        let (tx, rx) = oneshot::channel();
386        let (msg_tx, msg_rx) =
387            MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(time_prov, memquota.as_raw_account())?;
388
389        let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
390
391        // A channel for the reactor to request a new drain rate from the reader.
392        // Typically this notification will be sent after an XOFF is sent so that the reader can
393        // send us a new drain rate when the stream data queue becomes empty.
394        let mut drain_rate_request_tx = NotifySender::new_typed();
395        let drain_rate_request_rx = drain_rate_request_tx.subscribe();
396
397        self.circ
398            .control
399            .unbounded_send(CtrlMsg::BeginStream {
400                hop,
401                message: begin_msg,
402                sender,
403                rx: msg_rx,
404                rate_limit_notifier: rate_limit_tx,
405                drain_rate_requester: drain_rate_request_tx,
406                done: tx,
407                cmd_checker,
408            })
409            .map_err(|_| Error::CircuitClosed)?;
410
411        let (stream_id, hop, relay_cell_format) = rx.await.map_err(|_| Error::CircuitClosed)??;
412
413        let target = StreamTarget {
414            tunnel: Tunnel::Client(self.clone()),
415            tx: msg_tx,
416            hop: Some(hop),
417            stream_id,
418            relay_cell_format,
419            rate_limit_stream: rate_limit_rx,
420        };
421
422        // can be used to build a reader that supports XON/XOFF flow control
423        let xon_xoff_reader_ctrl = XonXoffReaderCtrl::new(drain_rate_request_rx, target.clone());
424
425        let stream_receiver = StreamReceiver {
426            target: target.clone(),
427            receiver,
428            recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
429            ended: false,
430        };
431
432        let components = StreamComponents {
433            stream_receiver,
434            target,
435            memquota,
436            xon_xoff_reader_ctrl,
437        };
438
439        Ok(components)
440    }
441
442    /// Install a [`CircuitPadder`] at the listed `hop`.
443    ///
444    /// Replaces any previous padder installed at that hop.
445    #[cfg(feature = "circ-padding-manual")]
446    pub async fn start_padding_at_hop(
447        self: &Arc<Self>,
448        hop: HopLocation,
449        padder: CircuitPadder,
450    ) -> Result<()> {
451        self.circ.set_padder_impl(hop, Some(padder)).await
452    }
453
454    /// Remove any [`CircuitPadder`] at the listed `hop`.
455    ///
456    /// Does nothing if there was not a padder installed there.
457    #[cfg(feature = "circ-padding-manual")]
458    pub async fn stop_padding_at_hop(self: &Arc<Self>, hop: HopLocation) -> Result<()> {
459        self.circ.set_padder_impl(hop, None).await
460    }
461
462    /// Start a DataStream (anonymized connection) to the given
463    /// address and port, using a BEGIN cell.
464    async fn begin_data_stream(
465        self: &Arc<Self>,
466        msg: AnyRelayMsg,
467        optimistic: bool,
468    ) -> Result<DataStream> {
469        let components = self
470            .begin_stream_impl(msg, OutboundDataCmdChecker::new_any())
471            .await?;
472
473        let StreamComponents {
474            stream_receiver,
475            target,
476            memquota,
477            xon_xoff_reader_ctrl,
478        } = components;
479
480        let mut stream = DataStream::new(
481            self.circ.time_provider.clone(),
482            stream_receiver,
483            xon_xoff_reader_ctrl,
484            target,
485            memquota,
486        );
487        if !optimistic {
488            stream.wait_for_connection().await?;
489        }
490        Ok(stream)
491    }
492
493    /// Single and multi path helper.
494    ///
495    /// Start a stream to the given address and port, using a BEGIN
496    /// cell.
497    ///
498    /// The use of a string for the address is intentional: you should let
499    /// the remote Tor relay do the hostname lookup for you.
500    #[instrument(level = "trace", skip_all)]
501    pub async fn begin_stream(
502        self: &Arc<Self>,
503        target: &str,
504        port: u16,
505        parameters: Option<StreamParameters>,
506    ) -> Result<DataStream> {
507        let parameters = parameters.unwrap_or_default();
508        let begin_flags = parameters.begin_flags();
509        let optimistic = parameters.is_optimistic();
510        let target = if parameters.suppressing_hostname() {
511            ""
512        } else {
513            target
514        };
515        let beginmsg = Begin::new(target, port, begin_flags)
516            .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
517        self.begin_data_stream(beginmsg.into(), optimistic).await
518    }
519
520    /// Start a new stream to the last relay in the tunnel, using
521    /// a BEGIN_DIR cell.
522    pub async fn begin_dir_stream(self: Arc<Self>) -> Result<DataStream> {
523        // Note that we always open begindir connections optimistically.
524        // Since they are local to a relay that we've already authenticated
525        // with and built a tunnel to, there should be no additional checks
526        // we need to perform to see whether the BEGINDIR will succeed.
527        self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
528            .await
529    }
530
531    /// Perform a DNS lookup, using a RESOLVE cell with the last relay
532    /// in this tunnel.
533    ///
534    /// Note that this function does not check for timeouts; that's
535    /// the caller's responsibility.
536    pub async fn resolve(self: &Arc<Self>, hostname: &str) -> Result<Vec<IpAddr>> {
537        let resolve_msg = Resolve::new(hostname);
538
539        let resolved_msg = self.try_resolve(resolve_msg).await?;
540
541        resolved_msg
542            .into_answers()
543            .into_iter()
544            .filter_map(|(val, _)| match resolvedval_to_result(val) {
545                Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
546                Ok(_) => None,
547                Err(e) => Some(Err(e)),
548            })
549            .collect()
550    }
551
552    /// Perform a reverse DNS lookup, by sending a RESOLVE cell with
553    /// the last relay on this tunnel.
554    ///
555    /// Note that this function does not check for timeouts; that's
556    /// the caller's responsibility.
557    pub async fn resolve_ptr(self: &Arc<Self>, addr: IpAddr) -> Result<Vec<String>> {
558        let resolve_ptr_msg = Resolve::new_reverse(&addr);
559
560        let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
561
562        resolved_msg
563            .into_answers()
564            .into_iter()
565            .filter_map(|(val, _)| match resolvedval_to_result(val) {
566                Ok(ResolvedVal::Hostname(v)) => Some(
567                    String::from_utf8(v)
568                        .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
569                ),
570                Ok(_) => None,
571                Err(e) => Some(Err(e)),
572            })
573            .collect()
574    }
575
576    /// Send an ad-hoc message to a given hop on the circuit, without expecting
577    /// a reply.
578    ///
579    /// (If you want to handle one or more possible replies, see
580    /// [`ClientTunnel::start_conversation`].)
581    // TODO(conflux): Change this to use the ReactorHandle for the control commands.
582    #[cfg(feature = "send-control-msg")]
583    pub async fn send_raw_msg(
584        &self,
585        msg: tor_cell::relaycell::msg::AnyRelayMsg,
586        hop: TargetHop,
587    ) -> Result<()> {
588        let (sender, receiver) = oneshot::channel();
589        let ctrl_msg = CtrlMsg::SendMsg { hop, msg, sender };
590        self.circ
591            .control
592            .unbounded_send(ctrl_msg)
593            .map_err(|_| Error::CircuitClosed)?;
594
595        receiver.await.map_err(|_| Error::CircuitClosed)?
596    }
597
598    /// Start an ad-hoc protocol exchange to the specified hop on this tunnel.
599    ///
600    /// To use this:
601    ///
602    ///  0. Create an inter-task channel you'll use to receive
603    ///     the outcome of your conversation,
604    ///     and bundle it into a [`UserMsgHandler`].
605    ///
606    ///  1. Call `start_conversation`.
607    ///     This will install a your handler, for incoming messages,
608    ///     and send the outgoing message (if you provided one).
609    ///     After that, each message on the circuit
610    ///     that isn't handled by the core machinery
611    ///     is passed to your provided `reply_handler`.
612    ///
613    ///  2. Possibly call `send_msg` on the [`Conversation`],
614    ///     from the call site of `start_conversation`,
615    ///     possibly multiple times, from time to time,
616    ///     to send further desired messages to the peer.
617    ///
618    ///  3. In your [`UserMsgHandler`], process the incoming messages.
619    ///     You may respond by
620    ///     sending additional messages
621    ///     When the protocol exchange is finished,
622    ///     `UserMsgHandler::handle_msg` should return
623    ///     [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
624    ///
625    /// If you don't need the `Conversation` to send followup messages,
626    /// you may simply drop it,
627    /// and rely on the responses you get from your handler,
628    /// on the channel from step 0 above.
629    /// Your handler will remain installed and able to process incoming messages
630    /// until it returns `ConversationFinished`.
631    ///
632    /// (If you don't want to accept any replies at all, it may be
633    /// simpler to use [`ClientTunnel::send_raw_msg`].)
634    ///
635    /// Note that it is quite possible to use this function to violate the tor
636    /// protocol; most users of this API will not need to call it.  It is used
637    /// to implement most of the onion service handshake.
638    ///
639    /// # Limitations
640    ///
641    /// Only one conversation may be active at any one time,
642    /// for any one circuit.
643    /// This generally means that this function should not be called
644    /// on a tunnel which might be shared with anyone else.
645    ///
646    /// Likewise, it is forbidden to try to extend the tunnel,
647    /// while the conversation is in progress.
648    ///
649    /// After the conversation has finished, the tunnel may be extended.
650    /// Or, `start_conversation` may be called again;
651    /// but, in that case there will be a gap between the two conversations,
652    /// during which no `UserMsgHandler` is installed,
653    /// and unexpected incoming messages would close the tunnel.
654    ///
655    /// If these restrictions are violated, the tunnel will be closed with an error.
656    ///
657    /// ## Precise definition of the lifetime of a conversation
658    ///
659    /// A conversation is in progress from entry to `start_conversation`,
660    /// until entry to the body of the [`UserMsgHandler::handle_msg`](MsgHandler::handle_msg)
661    /// call which returns [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
662    /// (*Entry* since `handle_msg` is synchronously embedded
663    /// into the incoming message processing.)
664    /// So you may start a new conversation as soon as you have the final response
665    /// via your inter-task channel from (0) above.
666    ///
667    /// The lifetime relationship of the [`Conversation`],
668    /// vs the handler returning `ConversationFinished`
669    /// is not enforced by the type system.
670    // Doing so without still leaving plenty of scope for runtime errors doesn't seem possible,
671    // at least while allowing sending followup messages from outside the handler.
672    #[cfg(feature = "send-control-msg")]
673    pub async fn start_conversation(
674        &self,
675        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
676        reply_handler: impl MsgHandler + Send + 'static,
677        hop: TargetHop,
678    ) -> Result<Conversation<'_>> {
679        // We need to resolve the TargetHop into a precise HopLocation so our msg handler can match
680        // the right Leg/Hop with inbound cell.
681        let (sender, receiver) = oneshot::channel();
682        self.circ
683            .command
684            .unbounded_send(CtrlCmd::ResolveTargetHop { hop, done: sender })
685            .map_err(|_| Error::CircuitClosed)?;
686        let hop_location = receiver.await.map_err(|_| Error::CircuitClosed)??;
687        let handler = Box::new(UserMsgHandler::new(hop_location, reply_handler));
688        let conversation = Conversation(self);
689        conversation.send_internal(msg, Some(handler)).await?;
690        Ok(conversation)
691    }
692
693    /// Shut down this tunnel, along with all streams that are using it. Happens asynchronously
694    /// (i.e. the tunnel won't necessarily be done shutting down immediately after this function
695    /// returns!).
696    ///
697    /// Note that other references to this tunnel may exist. If they do, they will stop working
698    /// after you call this function.
699    ///
700    /// It's not necessary to call this method if you're just done with a tunnel: the tunnel should
701    /// close on its own once nothing is using it any more.
702    // TODO(conflux): This should use the ReactorHandle instead.
703    pub fn terminate(&self) {
704        let _ = self.circ.command.unbounded_send(CtrlCmd::Shutdown);
705    }
706
707    /// Helper: Send the resolve message, and read resolved message from
708    /// resolve stream.
709    async fn try_resolve(self: &Arc<Self>, msg: Resolve) -> Result<Resolved> {
710        let components = self
711            .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
712            .await?;
713
714        let StreamComponents {
715            stream_receiver,
716            target: _,
717            memquota,
718            xon_xoff_reader_ctrl: _,
719        } = components;
720
721        let mut resolve_stream = ResolveStream::new(stream_receiver, memquota);
722        resolve_stream.read_msg().await
723    }
724
725    // TODO(conflux)
726}
727
728// TODO(conflux): We will likely need to enforce some invariants here, for example that the `circ`
729// has the expected (non-zero) number of hops.
730impl TryFrom<ClientCirc> for ClientTunnel {
731    type Error = Error;
732
733    fn try_from(circ: ClientCirc) -> std::result::Result<Self, Self::Error> {
734        Ok(Self { circ })
735    }
736}
737
738/// Convert a [`ResolvedVal`] into a Result, based on whether or not
739/// it represents an error.
740fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
741    match val {
742        ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
743        ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
744        ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
745        _ => Ok(val),
746    }
747}
748
749/// A precise position in a tunnel.
750#[derive(Debug, Deftly, Copy, Clone, PartialEq, Eq)]
751#[derive_deftly(HasMemoryCost)]
752#[non_exhaustive]
753pub enum HopLocation {
754    /// A specific position in a tunnel.
755    Hop((UniqId, HopNum)),
756    /// The join point of a multi-path tunnel.
757    JoinPoint,
758}
759
760/// A position in a tunnel.
761#[derive(Debug, Copy, Clone, PartialEq, Eq)]
762#[non_exhaustive]
763pub enum TargetHop {
764    /// A specific position in a tunnel.
765    Hop(HopLocation),
766    /// The last hop of a tunnel.
767    ///
768    /// This should be used only when you don't care about what specific hop is used.
769    /// Some tunnels may be extended or truncated,
770    /// which means that the "last hop" may change at any time.
771    LastHop,
772}
773
774impl From<(UniqId, HopNum)> for HopLocation {
775    fn from(v: (UniqId, HopNum)) -> Self {
776        HopLocation::Hop(v)
777    }
778}
779
780impl From<(UniqId, HopNum)> for TargetHop {
781    fn from(v: (UniqId, HopNum)) -> Self {
782        TargetHop::Hop(v.into())
783    }
784}
785
786impl HopLocation {
787    /// Return the hop number if not a JointPoint.
788    pub fn hop_num(&self) -> Option<HopNum> {
789        match self {
790            Self::Hop((_, hop_num)) => Some(*hop_num),
791            Self::JoinPoint => None,
792        }
793    }
794}
795
796impl ClientTunnel {
797    /// Close the pending stream that owns this StreamTarget, delivering the specified
798    /// END message (if any)
799    ///
800    /// See [`StreamTarget::close_pending`].
801    #[cfg(feature = "hs-service")]
802    pub(crate) fn close_pending(
803        &self,
804        stream_id: StreamId,
805        hop: Option<HopLocation>,
806        message: crate::stream::CloseStreamBehavior,
807    ) -> Result<oneshot::Receiver<Result<()>>> {
808        let (tx, rx) = oneshot::channel();
809
810        self.circ
811            .control
812            .unbounded_send(CtrlMsg::ClosePendingStream {
813                stream_id,
814                hop: hop.expect("missing stream hop for client tunnel"),
815                message,
816                done: tx,
817            })
818            .map_err(|_| Error::CircuitClosed)?;
819
820        Ok(rx)
821    }
822
823    /// Request to send a SENDME cell for this stream.
824    ///
825    /// See [`StreamTarget::send_sendme`].
826    pub(crate) fn send_sendme(&self, stream_id: StreamId, hop: Option<HopLocation>) -> Result<()> {
827        self.circ
828            .control
829            .unbounded_send(CtrlMsg::FlowCtrlUpdate {
830                msg: FlowCtrlMsg::Sendme,
831                stream_id,
832                hop: hop.expect("missing stream hop for client tunnel"),
833            })
834            .map_err(|_| Error::CircuitClosed)
835    }
836
837    /// Inform the circuit reactor that there has been a change in the drain rate for this stream.
838    ///
839    /// See [`StreamTarget::drain_rate_update`].
840    pub(crate) fn drain_rate_update(
841        &self,
842        stream_id: StreamId,
843        hop: Option<HopLocation>,
844        rate: XonKbpsEwma,
845    ) -> Result<()> {
846        self.circ
847            .control
848            .unbounded_send(CtrlMsg::FlowCtrlUpdate {
849                msg: FlowCtrlMsg::Xon(rate),
850                stream_id,
851                hop: hop.expect("missing stream hop for client tunnel"),
852            })
853            .map_err(|_| Error::CircuitClosed)
854    }
855}