Skip to main content

tor_proto/client/
circuit.rs

1//! Multi-hop paths over the Tor network.
2//!
//! Right now, we only implement "client circuits" -- also sometimes
//! called "origin circuits".  A client circuit is one that is
//! constructed by this Tor instance, and used on its own behalf to
//! send data over the Tor network.
7//!
8//! Each circuit has multiple hops over the Tor network: each hop
9//! knows only the hop before and the hop after.  The client shares a
10//! separate set of keys with each hop.
11//!
12//! To build a circuit, first create a [crate::channel::Channel], then
13//! call its [crate::channel::Channel::new_tunnel] method.  This yields
14//! a [PendingClientTunnel] object that won't become live until you call
15//! one of the methods
16//! (typically [`PendingClientTunnel::create_firsthop`])
17//! that extends it to its first hop.  After you've
18//! done that, you can call [`ClientCirc::extend`] on the tunnel to
19//! build it into a multi-hop tunnel.  Finally, you can use
20//! [ClientTunnel::begin_stream] to get a Stream object that can be used
21//! for anonymized data.
22//!
23//! # Implementation
24//!
25//! Each open circuit has a corresponding Reactor object that runs in
26//! an asynchronous task, and manages incoming cells from the
27//! circuit's upstream channel.  These cells are either RELAY cells or
28//! DESTROY cells.  DESTROY cells are handled immediately.
29//! RELAY cells are either for a particular stream, in which case they
30//! get forwarded to a RawCellStream object, or for no particular stream,
31//! in which case they are considered "meta" cells (like EXTENDED2)
32//! that should only get accepted if something is waiting for them.
33//!
34//! # Limitations
35//!
36//! This is client-only.
37
38pub(crate) mod halfcirc;
39
40#[cfg(feature = "hs-common")]
41pub mod handshake;
42#[cfg(not(feature = "hs-common"))]
43pub(crate) mod handshake;
44
45pub(crate) mod padding;
46
47pub(super) mod path;
48
49use crate::channel::Channel;
50use crate::circuit::circhop::{HopNegotiationType, HopSettings};
51use crate::circuit::{CircuitRxReceiver, celltypes::*};
52#[cfg(feature = "circ-padding-manual")]
53use crate::client::CircuitPadder;
54use crate::client::circuit::padding::{PaddingController, PaddingEventStream};
55use crate::client::reactor::{CircuitHandshake, CtrlCmd, CtrlMsg, Reactor};
56use crate::crypto::cell::HopNum;
57use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
58use crate::memquota::CircuitAccount;
59use crate::util::skew::ClockSkew;
60use crate::{Error, Result};
61use derive_deftly::Deftly;
62use educe::Educe;
63use path::HopDetail;
64use tor_cell::chancell::{
65    CircId,
66    msg::{self as chanmsg},
67};
68use tor_error::{bad_api_usage, internal, into_internal};
69use tor_linkspec::{CircTarget, LinkSpecType, OwnedChanTarget, RelayIdType};
70use tor_protover::named;
71use tor_rtcompat::DynTimeProvider;
72use web_time_compat::Instant;
73
74use crate::circuit::UniqId;
75
76use super::{ClientTunnel, TargetHop};
77
78use futures::channel::mpsc;
79use oneshot_fused_workaround as oneshot;
80
81use futures::FutureExt as _;
82use std::collections::HashMap;
83use std::sync::{Arc, Mutex};
84use tor_memquota::derive_deftly_template_HasMemoryCost;
85
86use crate::crypto::handshake::ntor::NtorPublicKey;
87
88#[cfg(test)]
89use crate::stream::{StreamMpscReceiver, StreamMpscSender};
90
91pub use crate::crypto::binding::CircuitBinding;
92pub use path::{Path, PathEntry};
93
/// Buffer size used for communication between a `ClientCirc` and its reactor.
pub const CIRCUIT_BUFFER_SIZE: usize = 128;
96
97// TODO: export this from the top-level instead (it's not client-specific).
98pub use crate::circuit::CircParameters;
99
100// TODO(relay): reexport this from somewhere else (it's not client-specific)
101pub use crate::util::timeout::TimeoutEstimator;
102
103/// A subclass of ChanMsg that can correctly arrive on a live client
104/// circuit (one where a CREATED* has been received).
105#[derive(Debug, Deftly)]
106#[allow(unreachable_pub)] // Only `pub` with feature `testing`; otherwise, visible in crate
107#[derive_deftly(HasMemoryCost)]
108#[derive_deftly(RestrictedChanMsgSet)]
109#[deftly(usage = "on an open client circuit")]
110pub(super) enum ClientCircChanMsg {
111    /// A relay cell telling us some kind of remote command from some
112    /// party on the circuit.
113    Relay(chanmsg::Relay),
114    /// A cell telling us to destroy the circuit.
115    Destroy(chanmsg::Destroy),
116    // Note: RelayEarly is not valid for clients!
117}
118
119#[derive(Debug)]
120/// A circuit that we have constructed over the Tor network.
121///
122/// # Circuit life cycle
123///
124/// `ClientCirc`s are created in an initially unusable state using [`Channel::new_tunnel`],
125/// which returns a [`PendingClientTunnel`].  To get a real (one-hop) tunnel from
126/// one of these, you invoke one of its `create_firsthop` methods (typically
127/// [`create_firsthop_fast()`](PendingClientTunnel::create_firsthop_fast) or
128/// [`create_firsthop()`](PendingClientTunnel::create_firsthop)).
129/// Then, to add more hops to the circuit, you can call
130/// [`extend()`](ClientCirc::extend) on it.
131///
132/// For higher-level APIs, see the `tor-circmgr` crate: the ones here in
133/// `tor-proto` are probably not what you need.
134///
135/// After a circuit is created, it will persist until it is closed in one of
136/// five ways:
137///    1. A remote error occurs.
138///    2. Some hop on the circuit sends a `DESTROY` message to tear down the
139///       circuit.
140///    3. The circuit's channel is closed.
141///    4. Someone calls [`ClientTunnel::terminate`] on the tunnel owning the circuit.
142///    5. The last reference to the `ClientCirc` is dropped. (Note that every stream
143///       on a `ClientCirc` keeps a reference to it, which will in turn keep the
144///       circuit from closing until all those streams have gone away.)
145///
146/// Note that in cases 1-4 the [`ClientCirc`] object itself will still exist: it
147/// will just be unusable for most purposes.  Most operations on it will fail
148/// with an error.
149//
150// Effectively, this struct contains two Arcs: one for `path` and one for
151// `control` (which surely has something Arc-like in it).  We cannot unify
152// these by putting a single Arc around the whole struct, and passing
153// an Arc strong reference to the `Reactor`, because then `control` would
154// not be dropped when the last user of the circuit goes away.  We could
155// make the reactor have a weak reference but weak references are more
156// expensive to dereference.
157//
158// Because of the above, cloning this struct is always going to involve
159// two atomic refcount changes/checks.  Wrapping it in another Arc would
160// be overkill.
161//
162pub struct ClientCirc {
163    /// Mutable state shared with the `Reactor`.
164    pub(super) mutable: Arc<TunnelMutableState>,
165    /// A unique identifier for this circuit.
166    unique_id: UniqId,
167    /// Channel to send control messages to the reactor.
168    pub(super) control: mpsc::UnboundedSender<CtrlMsg>,
169    /// Channel to send commands to the reactor.
170    pub(super) command: mpsc::UnboundedSender<CtrlCmd>,
171    /// A future that resolves to Cancelled once the reactor is shut down,
172    /// meaning that the circuit is closed.
173    #[cfg_attr(not(feature = "experimental-api"), allow(dead_code))]
174    reactor_closed_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
175    /// For testing purposes: the CircId, for use in peek_circid().
176    #[cfg(test)]
177    circid: CircId,
178    /// Memory quota account
179    pub(super) memquota: CircuitAccount,
180    /// Time provider
181    pub(super) time_provider: DynTimeProvider,
182    /// Indicate if this reactor is a multi path or not. This is flagged at the very first
183    /// LinkCircuit seen and never changed after.
184    ///
185    /// We can't just look at the number of legs because a multi path tunnel could have 1 leg only
186    /// because the other(s) have collapsed.
187    ///
188    /// This is very important because it allows to make a quick efficient safety check by the
189    /// circmgr higher level tunnel type without locking the mutable state or using the command
190    /// channel.
191    pub(super) is_multi_path: bool,
192}
193
194/// The mutable state of a tunnel, shared between [`ClientCirc`] and [`Reactor`].
195///
196/// NOTE(gabi): this mutex-inside-a-mutex might look suspicious,
197/// but it is currently the best option we have for sharing
198/// the circuit state with `ClientCirc` (and soon, with `ClientTunnel`).
199/// In practice, these mutexes won't be accessed very often
200/// (they're accessed for writing when a circuit is extended,
201/// and for reading by the various `ClientCirc` APIs),
202/// so they shouldn't really impact performance.
203///
204/// Alternatively, the circuit state information could be shared
205/// outside the reactor through a channel (passed to the reactor via a `CtrlCmd`),
206/// but in #1840 @opara notes that involves making the `ClientCirc` accessors
207/// (`ClientCirc::path`, `ClientCirc::binding_key`, etc.)
208/// asynchronous, which will significantly complicate their callsites,
209/// which would in turn need to be made async too.
210///
211/// We should revisit this decision at some point, and decide whether an async API
212/// would be preferable.
213#[derive(Debug, Default)]
214pub(super) struct TunnelMutableState(Mutex<HashMap<UniqId, Arc<MutableState>>>);
215
216impl TunnelMutableState {
217    /// Add the [`MutableState`] of a circuit.
218    pub(super) fn insert(&self, unique_id: UniqId, mutable: Arc<MutableState>) {
219        #[allow(unused)] // unused in non-debug builds
220        let state = self
221            .0
222            .lock()
223            .expect("lock poisoned")
224            .insert(unique_id, mutable);
225
226        debug_assert!(state.is_none());
227    }
228
229    /// Remove the [`MutableState`] of a circuit.
230    pub(super) fn remove(&self, unique_id: UniqId) {
231        #[allow(unused)] // unused in non-debug builds
232        let state = self.0.lock().expect("lock poisoned").remove(&unique_id);
233
234        debug_assert!(state.is_some());
235    }
236
237    /// Return a [`Path`] object describing all the circuits in this tunnel.
238    fn all_paths(&self) -> Vec<Arc<Path>> {
239        let lock = self.0.lock().expect("lock poisoned");
240        lock.values().map(|mutable| mutable.path()).collect()
241    }
242
243    /// Return a representation of the Paths for all the circuits in this tunnel,
244    /// as a map from each circuits' UniqId to its path.
245    ///
246    /// This is only exposed for the RPC subsystem, where it is documented that the
247    /// format of `UniqId` is not stable.
248    #[cfg(feature = "rpc")]
249    pub(super) fn tagged_paths(&self) -> HashMap<UniqId, Arc<Path>> {
250        let lock = self.0.lock().expect("lock poisoned");
251        lock.iter()
252            .map(|(id, mutable)| (*id, mutable.path()))
253            .collect()
254    }
255
256    /// Return a list of [`Path`] objects describing the only circuit in this tunnel.
257    ///
258    /// Returns an error if the tunnel has more than one tunnel.
259    //
260    // TODO: replace Itertools::exactly_one() with a stdlib equivalent when there is one.
261    //
262    // See issue #48919 <https://github.com/rust-lang/rust/issues/48919>
263    #[allow(unstable_name_collisions)]
264    fn single_path(&self) -> Result<Arc<Path>> {
265        use itertools::Itertools as _;
266
267        self.all_paths().into_iter().exactly_one().map_err(|_| {
268            bad_api_usage!("requested the single path of a multi-path tunnel?!").into()
269        })
270    }
271
272    /// Return a description of the first hop of this circuit.
273    ///
274    /// Returns an error if a circuit with the specified [`UniqId`] doesn't exist.
275    /// Returns `Ok(None)` if the specified circuit doesn't have any hops.
276    fn first_hop(&self, unique_id: UniqId) -> Result<Option<OwnedChanTarget>> {
277        let lock = self.0.lock().expect("lock poisoned");
278        let mutable = lock
279            .get(&unique_id)
280            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
281
282        let first_hop = mutable.first_hop().map(|first_hop| match first_hop {
283            path::HopDetail::Relay(r) => r,
284            #[cfg(feature = "hs-common")]
285            path::HopDetail::Virtual => {
286                panic!("somehow made a circuit with a virtual first hop.")
287            }
288        });
289
290        Ok(first_hop)
291    }
292
293    /// Return the [`HopNum`] of the last hop of the specified circuit.
294    ///
295    /// Returns an error if a circuit with the specified [`UniqId`] doesn't exist.
296    ///
297    /// See [`MutableState::last_hop_num`].
298    pub(super) fn last_hop_num(&self, unique_id: UniqId) -> Result<Option<HopNum>> {
299        let lock = self.0.lock().expect("lock poisoned");
300        let mutable = lock
301            .get(&unique_id)
302            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
303
304        Ok(mutable.last_hop_num())
305    }
306
307    /// Return the number of hops in the specified circuit.
308    ///
309    /// See [`MutableState::n_hops`].
310    fn n_hops(&self, unique_id: UniqId) -> Result<usize> {
311        let lock = self.0.lock().expect("lock poisoned");
312        let mutable = lock
313            .get(&unique_id)
314            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
315
316        Ok(mutable.n_hops())
317    }
318}
319
320/// The mutable state of a circuit.
321#[derive(Educe, Default)]
322#[educe(Debug)]
323pub(super) struct MutableState(Mutex<CircuitState>);
324
325impl MutableState {
326    /// Add a hop to the path of this circuit.
327    pub(super) fn add_hop(&self, peer_id: HopDetail, binding: Option<CircuitBinding>) {
328        let mut mutable = self.0.lock().expect("poisoned lock");
329        Arc::make_mut(&mut mutable.path).push_hop(peer_id);
330        mutable.binding.push(binding);
331    }
332
333    /// Get a copy of the circuit's current [`path::Path`].
334    pub(super) fn path(&self) -> Arc<path::Path> {
335        let mutable = self.0.lock().expect("poisoned lock");
336        Arc::clone(&mutable.path)
337    }
338
339    /// Return the cryptographic material used to prove knowledge of a shared
340    /// secret with with `hop`.
341    pub(super) fn binding_key(&self, hop: HopNum) -> Option<CircuitBinding> {
342        let mutable = self.0.lock().expect("poisoned lock");
343
344        mutable.binding.get::<usize>(hop.into()).cloned().flatten()
345        // NOTE: I'm not thrilled to have to copy this information, but we use
346        // it very rarely, so it's not _that_ bad IMO.
347    }
348
349    /// Return a description of the first hop of this circuit.
350    fn first_hop(&self) -> Option<HopDetail> {
351        let mutable = self.0.lock().expect("poisoned lock");
352        mutable.path.first_hop()
353    }
354
355    /// Return the [`HopNum`] of the last hop of this circuit.
356    ///
357    /// NOTE: This function will return the [`HopNum`] of the hop
358    /// that is _currently_ the last. If there is an extend operation in progress,
359    /// the currently pending hop may or may not be counted, depending on whether
360    /// the extend operation finishes before this call is done.
361    fn last_hop_num(&self) -> Option<HopNum> {
362        let mutable = self.0.lock().expect("poisoned lock");
363        mutable.path.last_hop_num()
364    }
365
366    /// Return the number of hops in this circuit.
367    ///
368    /// NOTE: This function will currently return only the number of hops
369    /// _currently_ in the circuit. If there is an extend operation in progress,
370    /// the currently pending hop may or may not be counted, depending on whether
371    /// the extend operation finishes before this call is done.
372    fn n_hops(&self) -> usize {
373        let mutable = self.0.lock().expect("poisoned lock");
374        mutable.path.n_hops()
375    }
376}
377
378/// The shared state of a circuit.
379#[derive(Educe, Default)]
380#[educe(Debug)]
381pub(super) struct CircuitState {
382    /// Information about this circuit's path.
383    ///
384    /// This is stored in an Arc so that we can cheaply give a copy of it to
385    /// client code; when we need to add a hop (which is less frequent) we use
386    /// [`Arc::make_mut()`].
387    path: Arc<path::Path>,
388
389    /// Circuit binding keys [q.v.][`CircuitBinding`] information for each hop
390    /// in the circuit's path.
391    ///
392    /// NOTE: Right now, there is a `CircuitBinding` for every hop.  There's a
393    /// fair chance that this will change in the future, and I don't want other
394    /// code to assume that a `CircuitBinding` _must_ exist, so I'm making this
395    /// an `Option`.
396    #[educe(Debug(ignore))]
397    binding: Vec<Option<CircuitBinding>>,
398}
399
400/// A ClientCirc that needs to send a create cell and receive a created* cell.
401///
402/// To use one of these, call `create_firsthop_fast()` or `create_firsthop()`
403/// to negotiate the cryptographic handshake with the first hop.
404pub struct PendingClientTunnel {
405    /// A oneshot receiver on which we'll receive a CREATED* cell,
406    /// or a DESTROY cell.
407    recvcreated: oneshot::Receiver<CreateResponse>,
408    /// The ClientCirc object that we can expose on success.
409    circ: ClientCirc,
410}
411
412impl ClientCirc {
413    /// Convert this `ClientCirc` into a single circuit [`ClientTunnel`].
414    pub fn into_tunnel(self) -> Result<ClientTunnel> {
415        self.try_into()
416    }
417
418    /// Return a description of the first hop of this circuit.
419    ///
420    /// # Panics
421    ///
422    /// Panics if there is no first hop.  (This should be impossible outside of
423    /// the tor-proto crate, but within the crate it's possible to have a
424    /// circuit with no hops.)
425    pub fn first_hop(&self) -> Result<OwnedChanTarget> {
426        Ok(self
427            .mutable
428            .first_hop(self.unique_id)
429            .map_err(|_| Error::CircuitClosed)?
430            .expect("called first_hop on an un-constructed circuit"))
431    }
432
433    /// Return a description of the last hop of the tunnel.
434    ///
435    /// Return None if the last hop is virtual.
436    ///
437    /// # Panics
438    ///
439    /// Panics if there is no last hop.  (This should be impossible outside of
440    /// the tor-proto crate, but within the crate it's possible to have a
441    /// circuit with no hops.)
442    pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
443        let all_paths = self.all_paths();
444        let path = all_paths.first().ok_or_else(|| {
445            tor_error::bad_api_usage!("Called last_hop_info on an un-constructed tunnel")
446        })?;
447        Ok(path
448            .hops()
449            .last()
450            .expect("Called last_hop on an un-constructed circuit")
451            .as_chan_target()
452            .map(OwnedChanTarget::from_chan_target))
453    }
454
455    /// Return the [`HopNum`] of the last hop of this circuit.
456    ///
457    /// Returns an error if there is no last hop.  (This should be impossible outside of the
458    /// tor-proto crate, but within the crate it's possible to have a circuit with no hops.)
459    ///
460    /// NOTE: This function will return the [`HopNum`] of the hop
461    /// that is _currently_ the last. If there is an extend operation in progress,
462    /// the currently pending hop may or may not be counted, depending on whether
463    /// the extend operation finishes before this call is done.
464    pub fn last_hop_num(&self) -> Result<HopNum> {
465        Ok(self
466            .mutable
467            .last_hop_num(self.unique_id)?
468            .ok_or_else(|| internal!("no last hop index"))?)
469    }
470
471    /// Return a [`TargetHop`] representing precisely the last hop of the circuit as in set as a
472    /// HopLocation with its id and hop number.
473    ///
474    /// Return an error if there is no last hop.
475    pub fn last_hop(&self) -> Result<TargetHop> {
476        let hop_num = self
477            .mutable
478            .last_hop_num(self.unique_id)?
479            .ok_or_else(|| bad_api_usage!("no last hop"))?;
480        Ok((self.unique_id, hop_num).into())
481    }
482
483    /// Return a list of [`Path`] objects describing all the circuits in this tunnel.
484    ///
485    /// Note that these `Path`s are not automatically updated if the underlying
486    /// circuits are extended.
487    pub fn all_paths(&self) -> Vec<Arc<Path>> {
488        self.mutable.all_paths()
489    }
490
491    /// Return a list of [`Path`] objects describing the only circuit in this tunnel.
492    ///
493    /// Returns an error if the tunnel has more than one tunnel.
494    pub fn single_path(&self) -> Result<Arc<Path>> {
495        self.mutable.single_path()
496    }
497
498    /// Return the time at which this circuit last had any open streams.
499    ///
500    /// Returns `None` if this circuit has never had any open streams,
501    /// or if it currently has open streams.
502    ///
503    /// NOTE that the Instant returned by this method is not affected by
504    /// any runtime mocking; it is the output of an ordinary call to
505    /// `Instant::get()`.
506    pub async fn disused_since(&self) -> Result<Option<Instant>> {
507        let (tx, rx) = oneshot::channel();
508        self.command
509            .unbounded_send(CtrlCmd::GetTunnelActivity { sender: tx })
510            .map_err(|_| Error::CircuitClosed)?;
511
512        Ok(rx.await.map_err(|_| Error::CircuitClosed)?.disused_since())
513    }
514
515    /// Get the clock skew claimed by the first hop of the circuit.
516    ///
517    /// See [`Channel::clock_skew()`].
518    pub async fn first_hop_clock_skew(&self) -> Result<ClockSkew> {
519        let (tx, rx) = oneshot::channel();
520
521        self.control
522            .unbounded_send(CtrlMsg::FirstHopClockSkew { answer: tx })
523            .map_err(|_| Error::CircuitClosed)?;
524
525        Ok(rx.await.map_err(|_| Error::CircuitClosed)??)
526    }
527
528    /// Return a reference to this circuit's memory quota account
529    pub fn mq_account(&self) -> &CircuitAccount {
530        &self.memquota
531    }
532
533    /// Return the cryptographic material used to prove knowledge of a shared
534    /// secret with with `hop`.
535    ///
536    /// See [`CircuitBinding`] for more information on how this is used.
537    ///
538    /// Return None if we have no circuit binding information for the hop, or if
539    /// the hop does not exist.
540    #[cfg(feature = "hs-service")]
541    pub async fn binding_key(&self, hop: TargetHop) -> Result<Option<CircuitBinding>> {
542        let (sender, receiver) = oneshot::channel();
543        let msg = CtrlCmd::GetBindingKey { hop, done: sender };
544        self.command
545            .unbounded_send(msg)
546            .map_err(|_| Error::CircuitClosed)?;
547
548        receiver.await.map_err(|_| Error::CircuitClosed)?
549    }
550
551    /// Extend the circuit, via the most appropriate circuit extension handshake,
552    /// to the chosen `target` hop.
553    pub async fn extend<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
554    where
555        Tg: CircTarget,
556    {
557        #![allow(deprecated)]
558
559        // For now we use the simplest decision-making mechanism:
560        // we use ntor_v3 whenever it is present; and otherwise we use ntor.
561        //
562        // This behavior is slightly different from C tor, which uses ntor v3
563        // only whenever it want to send any extension in the circuit message.
564        // But thanks to congestion control (named::FLOWCTRL_CC), we'll _always_
565        // want to use an extension if we can, and so it doesn't make too much
566        // sense to detect the case where we have no extensions.
567        //
568        // (As of April 2025, RELAY_NTORV3 is not yet listed as Required for relays
569        // on the tor network, and so we cannot simply assume that everybody has it.)
570        if target
571            .protovers()
572            .supports_named_subver(named::RELAY_NTORV3)
573        {
574            self.extend_ntor_v3(target, params).await
575        } else {
576            self.extend_ntor(target, params).await
577        }
578    }
579
580    /// Extend the circuit via the ntor handshake to a new target last
581    /// hop.
582    #[deprecated(since = "1.6.1", note = "Use extend instead.")]
583    pub async fn extend_ntor<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
584    where
585        Tg: CircTarget,
586    {
587        let key = NtorPublicKey {
588            id: *target
589                .rsa_identity()
590                .ok_or(Error::MissingId(RelayIdType::Rsa))?,
591            pk: *target.ntor_onion_key(),
592        };
593        let mut linkspecs = target
594            .linkspecs()
595            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
596        if !params.extend_by_ed25519_id {
597            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
598        }
599
600        let (tx, rx) = oneshot::channel();
601
602        let peer_id = OwnedChanTarget::from_chan_target(target);
603        let settings = HopSettings::from_params_and_caps(
604            HopNegotiationType::None,
605            &params,
606            target.protovers(),
607        )?;
608        self.control
609            .unbounded_send(CtrlMsg::ExtendNtor {
610                peer_id,
611                public_key: key,
612                linkspecs,
613                settings,
614                done: tx,
615            })
616            .map_err(|_| Error::CircuitClosed)?;
617
618        rx.await.map_err(|_| Error::CircuitClosed)??;
619
620        Ok(())
621    }
622
623    /// Extend the circuit via the ntor handshake to a new target last
624    /// hop.
625    #[deprecated(since = "1.6.1", note = "Use extend instead.")]
626    pub async fn extend_ntor_v3<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
627    where
628        Tg: CircTarget,
629    {
630        let key = NtorV3PublicKey {
631            id: *target
632                .ed_identity()
633                .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
634            pk: *target.ntor_onion_key(),
635        };
636        let mut linkspecs = target
637            .linkspecs()
638            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
639        if !params.extend_by_ed25519_id {
640            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
641        }
642
643        let (tx, rx) = oneshot::channel();
644
645        let peer_id = OwnedChanTarget::from_chan_target(target);
646        let settings = HopSettings::from_params_and_caps(
647            HopNegotiationType::Full,
648            &params,
649            target.protovers(),
650        )?;
651        self.control
652            .unbounded_send(CtrlMsg::ExtendNtorV3 {
653                peer_id,
654                public_key: key,
655                linkspecs,
656                settings,
657                done: tx,
658            })
659            .map_err(|_| Error::CircuitClosed)?;
660
661        rx.await.map_err(|_| Error::CircuitClosed)??;
662
663        Ok(())
664    }
665
666    /// Extend this circuit by a single, "virtual" hop.
667    ///
668    /// A virtual hop is one for which we do not add an actual network connection
669    /// between separate hosts (such as Relays).  We only add a layer of
670    /// cryptography.
671    ///
672    /// This is used to implement onion services: the client and the service
673    /// both build a circuit to a single rendezvous point, and tell the
674    /// rendezvous point to relay traffic between their two circuits.  Having
675    /// completed a [`handshake`] out of band[^1], the parties each extend their
676    /// circuits by a single "virtual" encryption hop that represents their
677    /// shared cryptographic context.
678    ///
679    /// Once a circuit has been extended in this way, it is an error to try to
680    /// extend it in any other way.
681    ///
682    /// [^1]: Technically, the handshake is only _mostly_ out of band: the
683    ///     client sends their half of the handshake in an ` message, and the
684    ///     service's response is inline in its `RENDEZVOUS2` message.
685    //
686    // TODO hs: let's try to enforce the "you can't extend a circuit again once
687    // it has been extended this way" property.  We could do that with internal
688    // state, or some kind of a type state pattern.
689    #[cfg(feature = "hs-common")]
690    pub async fn extend_virtual(
691        &self,
692        protocol: handshake::RelayProtocol,
693        role: handshake::HandshakeRole,
694        seed: impl handshake::KeyGenerator,
695        params: &CircParameters,
696        capabilities: &tor_protover::Protocols,
697    ) -> Result<()> {
698        use self::handshake::BoxedClientLayer;
699
700        // TODO CGO: Possibly refactor this match into a separate method when we revisit this.
701        let negotiation_type = match protocol {
702            handshake::RelayProtocol::HsV3 => HopNegotiationType::HsV3,
703        };
704        let protocol = handshake::RelayCryptLayerProtocol::from(protocol);
705
706        let BoxedClientLayer { fwd, back, binding } =
707            protocol.construct_client_layers(role, seed)?;
708
709        let settings = HopSettings::from_params_and_caps(negotiation_type, params, capabilities)?;
710        let (tx, rx) = oneshot::channel();
711        let message = CtrlCmd::ExtendVirtual {
712            cell_crypto: (fwd, back, binding),
713            settings,
714            done: tx,
715        };
716
717        self.command
718            .unbounded_send(message)
719            .map_err(|_| Error::CircuitClosed)?;
720
721        rx.await.map_err(|_| Error::CircuitClosed)?
722    }
723
724    /// Install a [`CircuitPadder`] at the listed `hop`.
725    ///
726    /// Replaces any previous padder installed at that hop.
727    #[cfg(feature = "circ-padding-manual")]
728    pub async fn start_padding_at_hop(&self, hop: HopNum, padder: CircuitPadder) -> Result<()> {
729        self.set_padder_impl(crate::HopLocation::Hop((self.unique_id, hop)), Some(padder))
730            .await
731    }
732
733    /// Remove any [`CircuitPadder`] at the listed `hop`.
734    ///
735    /// Does nothing if there was not a padder installed there.
736    #[cfg(feature = "circ-padding-manual")]
737    pub async fn stop_padding_at_hop(&self, hop: HopNum) -> Result<()> {
738        self.set_padder_impl(crate::HopLocation::Hop((self.unique_id, hop)), None)
739            .await
740    }
741
742    /// Helper: replace the padder at `hop` with the provided `padder`, or with `None`.
743    #[cfg(feature = "circ-padding-manual")]
744    pub(super) async fn set_padder_impl(
745        &self,
746        hop: crate::HopLocation,
747        padder: Option<CircuitPadder>,
748    ) -> Result<()> {
749        let (tx, rx) = oneshot::channel();
750        let msg = CtrlCmd::SetPadder {
751            hop,
752            padder,
753            sender: tx,
754        };
755        self.command
756            .unbounded_send(msg)
757            .map_err(|_| Error::CircuitClosed)?;
758        rx.await.map_err(|_| Error::CircuitClosed)?
759    }
760
761    /// Return true if this circuit is closed and therefore unusable.
762    pub fn is_closing(&self) -> bool {
763        self.control.is_closed()
764    }
765
766    /// Return a process-unique identifier for this circuit.
767    pub fn unique_id(&self) -> UniqId {
768        self.unique_id
769    }
770
771    /// Return the number of hops in this circuit.
772    ///
773    /// NOTE: This function will currently return only the number of hops
774    /// _currently_ in the circuit. If there is an extend operation in progress,
775    /// the currently pending hop may or may not be counted, depending on whether
776    /// the extend operation finishes before this call is done.
777    pub fn n_hops(&self) -> Result<usize> {
778        self.mutable
779            .n_hops(self.unique_id)
780            .map_err(|_| Error::CircuitClosed)
781    }
782
783    /// Return a future that will resolve once this circuit has closed.
784    ///
785    /// Note that this method does not itself cause the circuit to shut down.
786    ///
787    /// TODO: Perhaps this should return some kind of status indication instead
788    /// of just ()
789    pub fn wait_for_close(
790        &self,
791    ) -> impl futures::Future<Output = ()> + Send + Sync + 'static + use<> {
792        self.reactor_closed_rx.clone().map(|_| ())
793    }
794}
795
796impl PendingClientTunnel {
797    /// Instantiate a new circuit object: used from Channel::new_tunnel().
798    ///
799    /// Does not send a CREATE* cell on its own.
800    #[allow(clippy::too_many_arguments)]
801    pub(crate) fn new(
802        id: CircId,
803        channel: Arc<Channel>,
804        createdreceiver: oneshot::Receiver<CreateResponse>,
805        input: CircuitRxReceiver,
806        unique_id: UniqId,
807        runtime: DynTimeProvider,
808        memquota: CircuitAccount,
809        padding_ctrl: PaddingController,
810        padding_stream: PaddingEventStream,
811        timeouts: Arc<dyn TimeoutEstimator>,
812    ) -> (PendingClientTunnel, crate::client::reactor::Reactor) {
813        let time_provider = channel.time_provider().clone();
814        let (reactor, control_tx, command_tx, reactor_closed_rx, mutable) = Reactor::new(
815            channel,
816            id,
817            unique_id,
818            input,
819            runtime,
820            memquota.clone(),
821            padding_ctrl,
822            padding_stream,
823            timeouts,
824        );
825
826        let circuit = ClientCirc {
827            mutable,
828            unique_id,
829            control: control_tx,
830            command: command_tx,
831            reactor_closed_rx: reactor_closed_rx.shared(),
832            #[cfg(test)]
833            circid: id,
834            memquota,
835            time_provider,
836            is_multi_path: false,
837        };
838
839        let pending = PendingClientTunnel {
840            recvcreated: createdreceiver,
841            circ: circuit,
842        };
843        (pending, reactor)
844    }
845
846    /// Extract the process-unique identifier for this pending circuit.
847    pub fn peek_unique_id(&self) -> UniqId {
848        self.circ.unique_id
849    }
850
851    /// Use the (questionable!) CREATE_FAST handshake to connect to the
852    /// first hop of this circuit.
853    ///
854    /// There's no authentication in CRATE_FAST,
855    /// so we don't need to know whom we're connecting to: we're just
856    /// connecting to whichever relay the channel is for.
857    pub async fn create_firsthop_fast(self, params: CircParameters) -> Result<ClientTunnel> {
858        // We know nothing about this relay, so we assume it supports no protocol capabilities at all.
859        //
860        // TODO: If we had a consensus, we could assume it supported all required-relay-protocols.
861        // TODO prop364: When we implement CreateOneHop, we will want a Protocols argument here.
862        let protocols = tor_protover::Protocols::new();
863        let settings =
864            HopSettings::from_params_and_caps(HopNegotiationType::None, &params, &protocols)?;
865        let (tx, rx) = oneshot::channel();
866        self.circ
867            .control
868            .unbounded_send(CtrlMsg::Create {
869                recv_created: self.recvcreated,
870                handshake: CircuitHandshake::CreateFast,
871                settings,
872                done: tx,
873            })
874            .map_err(|_| Error::CircuitClosed)?;
875
876        rx.await.map_err(|_| Error::CircuitClosed)??;
877
878        self.circ.into_tunnel()
879    }
880
881    /// Use the most appropriate handshake to connect to the first hop of this circuit.
882    ///
883    /// Note that the provided 'target' must match the channel's target,
884    /// or the handshake will fail.
885    pub async fn create_firsthop<Tg>(
886        self,
887        target: &Tg,
888        params: CircParameters,
889    ) -> Result<ClientTunnel>
890    where
891        Tg: tor_linkspec::CircTarget,
892    {
893        #![allow(deprecated)]
894        // (See note in ClientCirc::extend.)
895        if target
896            .protovers()
897            .supports_named_subver(named::RELAY_NTORV3)
898        {
899            self.create_firsthop_ntor_v3(target, params).await
900        } else {
901            self.create_firsthop_ntor(target, params).await
902        }
903    }
904
905    /// Use the ntor handshake to connect to the first hop of this circuit.
906    ///
907    /// Note that the provided 'target' must match the channel's target,
908    /// or the handshake will fail.
909    #[deprecated(since = "1.6.1", note = "Use create_firsthop instead.")]
910    pub async fn create_firsthop_ntor<Tg>(
911        self,
912        target: &Tg,
913        params: CircParameters,
914    ) -> Result<ClientTunnel>
915    where
916        Tg: tor_linkspec::CircTarget,
917    {
918        let (tx, rx) = oneshot::channel();
919        let settings = HopSettings::from_params_and_caps(
920            HopNegotiationType::None,
921            &params,
922            target.protovers(),
923        )?;
924
925        self.circ
926            .control
927            .unbounded_send(CtrlMsg::Create {
928                recv_created: self.recvcreated,
929                handshake: CircuitHandshake::Ntor {
930                    public_key: NtorPublicKey {
931                        id: *target
932                            .rsa_identity()
933                            .ok_or(Error::MissingId(RelayIdType::Rsa))?,
934                        pk: *target.ntor_onion_key(),
935                    },
936                    ed_identity: *target
937                        .ed_identity()
938                        .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
939                },
940                settings,
941                done: tx,
942            })
943            .map_err(|_| Error::CircuitClosed)?;
944
945        rx.await.map_err(|_| Error::CircuitClosed)??;
946
947        self.circ.into_tunnel()
948    }
949
950    /// Use the ntor_v3 handshake to connect to the first hop of this circuit.
951    ///
952    /// Assumes that the target supports ntor_v3. The caller should verify
953    /// this before calling this function, e.g. by validating that the target
954    /// has advertised ["Relay=4"](https://spec.torproject.org/tor-spec/subprotocol-versioning.html#relay).
955    ///
956    /// Note that the provided 'target' must match the channel's target,
957    /// or the handshake will fail.
958    #[deprecated(since = "1.6.1", note = "Use create_firsthop instead.")]
959    pub async fn create_firsthop_ntor_v3<Tg>(
960        self,
961        target: &Tg,
962        params: CircParameters,
963    ) -> Result<ClientTunnel>
964    where
965        Tg: tor_linkspec::CircTarget,
966    {
967        let settings = HopSettings::from_params_and_caps(
968            HopNegotiationType::Full,
969            &params,
970            target.protovers(),
971        )?;
972        let (tx, rx) = oneshot::channel();
973
974        self.circ
975            .control
976            .unbounded_send(CtrlMsg::Create {
977                recv_created: self.recvcreated,
978                handshake: CircuitHandshake::NtorV3 {
979                    public_key: NtorV3PublicKey {
980                        id: *target
981                            .ed_identity()
982                            .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
983                        pk: *target.ntor_onion_key(),
984                    },
985                },
986                settings,
987                done: tx,
988            })
989            .map_err(|_| Error::CircuitClosed)?;
990
991        rx.await.map_err(|_| Error::CircuitClosed)??;
992
993        self.circ.into_tunnel()
994    }
995}
996
997#[cfg(test)]
998pub(crate) mod test {
999    // @@ begin test lint list maintained by maint/add_warning @@
1000    #![allow(clippy::bool_assert_comparison)]
1001    #![allow(clippy::clone_on_copy)]
1002    #![allow(clippy::dbg_macro)]
1003    #![allow(clippy::mixed_attributes_style)]
1004    #![allow(clippy::print_stderr)]
1005    #![allow(clippy::print_stdout)]
1006    #![allow(clippy::single_char_pattern)]
1007    #![allow(clippy::unwrap_used)]
1008    #![allow(clippy::unchecked_time_subtraction)]
1009    #![allow(clippy::useless_vec)]
1010    #![allow(clippy::needless_pass_by_value)]
1011    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1012
1013    use super::*;
1014    use crate::channel::test::{CodecResult, new_reactor};
1015    use crate::circuit::CircuitRxSender;
1016    use crate::circuit::reactor::test::rmsg_to_ccmsg;
1017    use crate::client::circuit::padding::new_padding;
1018    use crate::client::stream::DataStream;
1019    use crate::congestion::params::CongestionControlParams;
1020    use crate::congestion::test_utils::params::build_cc_vegas_params;
1021    use crate::crypto::cell::RelayCellBody;
1022    use crate::crypto::handshake::ntor_v3::NtorV3Server;
1023    use crate::memquota::SpecificAccount as _;
1024    use crate::stream::flow_ctrl::params::FlowCtrlParameters;
1025    use crate::util::DummyTimeoutEstimator;
1026    use assert_matches::assert_matches;
1027    use chanmsg::{AnyChanMsg, Created2, CreatedFast};
1028    use futures::channel::mpsc::{Receiver, Sender};
1029    use futures::io::{AsyncReadExt, AsyncWriteExt};
1030    use futures::sink::SinkExt;
1031    use futures::stream::StreamExt;
1032    use hex_literal::hex;
1033    use std::collections::{HashMap, VecDeque};
1034    use std::fmt::Debug;
1035    use std::time::Duration;
1036    use tor_basic_utils::test_rng::testing_rng;
1037    use tor_cell::chancell::{AnyChanCell, BoxedCellBody, ChanCell, ChanCmd, msg as chanmsg};
1038    use tor_cell::relaycell::extend::{self as extend_ext, CircRequestExt, CircResponseExt};
1039    use tor_cell::relaycell::msg::SendmeTag;
1040    use tor_cell::relaycell::{
1041        AnyRelayMsgOuter, RelayCellFormat, RelayCmd, StreamId, msg as relaymsg, msg::AnyRelayMsg,
1042    };
1043    use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
1044    use tor_linkspec::OwnedCircTarget;
1045    use tor_memquota::HasMemoryCost;
1046    use tor_rtcompat::Runtime;
1047    use tor_rtcompat::SpawnExt;
1048    use tracing::trace;
1049    use tracing_test::traced_test;
1050
1051    #[cfg(feature = "conflux")]
1052    use {
1053        crate::client::reactor::ConfluxHandshakeResult,
1054        crate::util::err::ConfluxHandshakeError,
1055        futures::future::FusedFuture,
1056        futures::lock::Mutex as AsyncMutex,
1057        std::pin::Pin,
1058        std::result::Result as StdResult,
1059        tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
1060        tor_cell::relaycell::msg::ConfluxLink,
1061        tor_rtmock::MockRuntime,
1062    };
1063
1064    #[cfg(feature = "hs-service")]
1065    use crate::circuit::reactor::test::AllowAllStreamsFilter;
1066
1067    impl PendingClientTunnel {
1068        /// Testing only: Extract the circuit ID for this pending circuit.
1069        pub(crate) fn peek_circid(&self) -> CircId {
1070            self.circ.circid
1071        }
1072    }
1073
1074    impl ClientCirc {
1075        /// Testing only: Extract the circuit ID of this circuit.
1076        pub(crate) fn peek_circid(&self) -> CircId {
1077            self.circid
1078        }
1079    }
1080
1081    impl ClientTunnel {
1082        pub(crate) async fn resolve_last_hop(&self) -> TargetHop {
1083            let (sender, receiver) = oneshot::channel();
1084            let _ =
1085                self.as_single_circ()
1086                    .unwrap()
1087                    .command
1088                    .unbounded_send(CtrlCmd::ResolveTargetHop {
1089                        hop: TargetHop::LastHop,
1090                        done: sender,
1091                    });
1092            TargetHop::Hop(receiver.await.unwrap().unwrap())
1093        }
1094    }
1095
1096    // Example relay IDs and keys
1097    const EXAMPLE_SK: [u8; 32] =
1098        hex!("7789d92a89711a7e2874c61ea495452cfd48627b3ca2ea9546aafa5bf7b55803");
1099    const EXAMPLE_PK: [u8; 32] =
1100        hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253");
1101    const EXAMPLE_ED_ID: [u8; 32] = [6; 32];
1102    const EXAMPLE_RSA_ID: [u8; 20] = [10; 20];
1103
1104    /// Make an MPSC queue, of the type we use in Channels, but a fake one for testing
1105    #[cfg(test)]
1106    pub(crate) fn fake_mpsc<T: HasMemoryCost + Debug + Send>(
1107        buffer: usize,
1108    ) -> (StreamMpscSender<T>, StreamMpscReceiver<T>) {
1109        crate::fake_mpsc(buffer)
1110    }
1111
1112    /// return an example OwnedCircTarget that can get used for an ntor handshake.
1113    fn example_target() -> OwnedCircTarget {
1114        let mut builder = OwnedCircTarget::builder();
1115        builder
1116            .chan_target()
1117            .ed_identity(EXAMPLE_ED_ID.into())
1118            .rsa_identity(EXAMPLE_RSA_ID.into());
1119        builder
1120            .ntor_onion_key(EXAMPLE_PK.into())
1121            .protocols("FlowCtrl=1-2".parse().unwrap())
1122            .build()
1123            .unwrap()
1124    }
1125    fn example_ntor_key() -> crate::crypto::handshake::ntor::NtorSecretKey {
1126        crate::crypto::handshake::ntor::NtorSecretKey::new(
1127            EXAMPLE_SK.into(),
1128            EXAMPLE_PK.into(),
1129            EXAMPLE_RSA_ID.into(),
1130        )
1131    }
1132    fn example_ntor_v3_key() -> crate::crypto::handshake::ntor_v3::NtorV3SecretKey {
1133        crate::crypto::handshake::ntor_v3::NtorV3SecretKey::new(
1134            EXAMPLE_SK.into(),
1135            EXAMPLE_PK.into(),
1136            EXAMPLE_ED_ID.into(),
1137        )
1138    }
1139
1140    fn working_fake_channel<R: Runtime>(
1141        rt: &R,
1142    ) -> (Arc<Channel>, Receiver<AnyChanCell>, Sender<CodecResult>) {
1143        let (channel, chan_reactor, rx, tx) = new_reactor(rt.clone());
1144        rt.spawn(async {
1145            let _ignore = chan_reactor.run().await;
1146        })
1147        .unwrap();
1148        (channel, rx, tx)
1149    }
1150
1151    /// Which handshake type to use.
1152    #[derive(Copy, Clone)]
1153    enum HandshakeType {
1154        Fast,
1155        Ntor,
1156        NtorV3,
1157    }
1158
1159    #[allow(deprecated)]
1160    async fn test_create<R: Runtime>(rt: &R, handshake_type: HandshakeType, with_cc: bool) {
1161        // We want to try progressing from a pending circuit to a circuit
1162        // via a crate_fast handshake.
1163
1164        use crate::crypto::handshake::{ServerHandshake, fast::CreateFastServer, ntor::NtorServer};
1165
1166        let (chan, mut rx, _sink) = working_fake_channel(rt);
1167        let circid = CircId::new(128).unwrap();
1168        let (created_send, created_recv) = oneshot::channel();
1169        let (_circmsg_send, circmsg_recv) = fake_mpsc(64);
1170        let unique_id = UniqId::new(23, 17);
1171        let (padding_ctrl, padding_stream) = new_padding(DynTimeProvider::new(rt.clone()));
1172
1173        let (pending, reactor) = PendingClientTunnel::new(
1174            circid,
1175            chan,
1176            created_recv,
1177            circmsg_recv,
1178            unique_id,
1179            DynTimeProvider::new(rt.clone()),
1180            CircuitAccount::new_noop(),
1181            padding_ctrl,
1182            padding_stream,
1183            Arc::new(DummyTimeoutEstimator),
1184        );
1185
1186        rt.spawn(async {
1187            let _ignore = reactor.run().await;
1188        })
1189        .unwrap();
1190
1191        // Future to pretend to be a relay on the other end of the circuit.
1192        let simulate_relay_fut = async move {
1193            let mut rng = testing_rng();
1194            let create_cell = rx.next().await.unwrap();
1195            assert_eq!(create_cell.circid(), Some(circid));
1196            let reply = match handshake_type {
1197                HandshakeType::Fast => {
1198                    let cf = match create_cell.msg() {
1199                        AnyChanMsg::CreateFast(cf) => cf,
1200                        other => panic!("{:?}", other),
1201                    };
1202                    let (_, rep) = CreateFastServer::server(
1203                        &mut rng,
1204                        &mut |_: &()| Some(()),
1205                        &[()],
1206                        cf.handshake(),
1207                    )
1208                    .unwrap();
1209                    CreateResponse::CreatedFast(CreatedFast::new(rep))
1210                }
1211                HandshakeType::Ntor => {
1212                    let c2 = match create_cell.msg() {
1213                        AnyChanMsg::Create2(c2) => c2,
1214                        other => panic!("{:?}", other),
1215                    };
1216                    let (_, rep) = NtorServer::server(
1217                        &mut rng,
1218                        &mut |_: &()| Some(()),
1219                        &[example_ntor_key()],
1220                        c2.body(),
1221                    )
1222                    .unwrap();
1223                    CreateResponse::Created2(Created2::new(rep))
1224                }
1225                HandshakeType::NtorV3 => {
1226                    let c2 = match create_cell.msg() {
1227                        AnyChanMsg::Create2(c2) => c2,
1228                        other => panic!("{:?}", other),
1229                    };
1230                    let mut reply_fn = if with_cc {
1231                        |client_exts: &[CircRequestExt]| {
1232                            let _ = client_exts
1233                                .iter()
1234                                .find(|e| matches!(e, CircRequestExt::CcRequest(_)))
1235                                .expect("Client failed to request CC");
1236                            // This needs to be aligned to test_utils params
1237                            // value due to validation that needs it in range.
1238                            Some(vec![CircResponseExt::CcResponse(
1239                                extend_ext::CcResponse::new(31),
1240                            )])
1241                        }
1242                    } else {
1243                        |_: &_| Some(vec![])
1244                    };
1245                    let (_, rep) = NtorV3Server::server(
1246                        &mut rng,
1247                        &mut reply_fn,
1248                        &[example_ntor_v3_key()],
1249                        c2.body(),
1250                    )
1251                    .unwrap();
1252                    CreateResponse::Created2(Created2::new(rep))
1253                }
1254            };
1255            created_send.send(reply).unwrap();
1256        };
1257        // Future to pretend to be a client.
1258        let client_fut = async move {
1259            let target = example_target();
1260            let params = CircParameters::default();
1261            let ret = match handshake_type {
1262                HandshakeType::Fast => {
1263                    trace!("doing fast create");
1264                    pending.create_firsthop_fast(params).await
1265                }
1266                HandshakeType::Ntor => {
1267                    trace!("doing ntor create");
1268                    pending.create_firsthop_ntor(&target, params).await
1269                }
1270                HandshakeType::NtorV3 => {
1271                    let params = if with_cc {
1272                        // Setup CC vegas parameters.
1273                        CircParameters::new(
1274                            true,
1275                            build_cc_vegas_params(),
1276                            FlowCtrlParameters::defaults_for_tests(),
1277                        )
1278                    } else {
1279                        params
1280                    };
1281                    trace!("doing ntor_v3 create");
1282                    pending.create_firsthop_ntor_v3(&target, params).await
1283                }
1284            };
1285            trace!("create done: result {:?}", ret);
1286            ret
1287        };
1288
1289        let (circ, _) = futures::join!(client_fut, simulate_relay_fut);
1290
1291        let _circ = circ.unwrap();
1292
1293        // pfew!  We've build a circuit!  Let's make sure it has one hop.
1294        assert_eq!(_circ.n_hops().unwrap(), 1);
1295    }
1296
1297    #[traced_test]
1298    #[test]
1299    fn test_create_fast() {
1300        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1301            test_create(&rt, HandshakeType::Fast, false).await;
1302        });
1303    }
1304    #[traced_test]
1305    #[test]
1306    fn test_create_ntor() {
1307        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1308            test_create(&rt, HandshakeType::Ntor, false).await;
1309        });
1310    }
1311    #[traced_test]
1312    #[test]
1313    fn test_create_ntor_v3() {
1314        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1315            test_create(&rt, HandshakeType::NtorV3, false).await;
1316        });
1317    }
1318    #[traced_test]
1319    #[test]
1320    #[cfg(feature = "flowctl-cc")]
1321    fn test_create_ntor_v3_with_cc() {
1322        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1323            test_create(&rt, HandshakeType::NtorV3, true).await;
1324        });
1325    }
1326
1327    // An encryption layer that doesn't do any crypto.   Can be used
1328    // as inbound or outbound, but not both at once.
1329    pub(crate) struct DummyCrypto {
1330        counter_tag: [u8; 20],
1331        counter: u32,
1332        lasthop: bool,
1333    }
1334    impl DummyCrypto {
1335        fn next_tag(&mut self) -> SendmeTag {
1336            #![allow(clippy::identity_op)]
1337            self.counter_tag[0] = ((self.counter >> 0) & 255) as u8;
1338            self.counter_tag[1] = ((self.counter >> 8) & 255) as u8;
1339            self.counter_tag[2] = ((self.counter >> 16) & 255) as u8;
1340            self.counter_tag[3] = ((self.counter >> 24) & 255) as u8;
1341            self.counter += 1;
1342            self.counter_tag.into()
1343        }
1344    }
1345
1346    impl crate::crypto::cell::OutboundClientLayer for DummyCrypto {
1347        fn originate_for(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
1348            self.next_tag()
1349        }
1350        fn encrypt_outbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {}
1351    }
1352    impl crate::crypto::cell::InboundClientLayer for DummyCrypto {
1353        fn decrypt_inbound(
1354            &mut self,
1355            _cmd: ChanCmd,
1356            _cell: &mut RelayCellBody,
1357        ) -> Option<SendmeTag> {
1358            if self.lasthop {
1359                Some(self.next_tag())
1360            } else {
1361                None
1362            }
1363        }
1364    }
1365    impl DummyCrypto {
1366        pub(crate) fn new(lasthop: bool) -> Self {
1367            DummyCrypto {
1368                counter_tag: [0; 20],
1369                counter: 0,
1370                lasthop,
1371            }
1372        }
1373    }
1374
1375    // Helper: set up a 3-hop circuit with no encryption, where the
1376    // next inbound message seems to come from hop next_msg_from
1377    async fn newtunnel_ext<R: Runtime>(
1378        rt: &R,
1379        unique_id: UniqId,
1380        chan: Arc<Channel>,
1381        hops: Vec<path::HopDetail>,
1382        next_msg_from: HopNum,
1383        params: CircParameters,
1384    ) -> (ClientTunnel, CircuitRxSender) {
1385        let circid = CircId::new(128).unwrap();
1386        let (_created_send, created_recv) = oneshot::channel();
1387        let (circmsg_send, circmsg_recv) = fake_mpsc(64);
1388        let (padding_ctrl, padding_stream) = new_padding(DynTimeProvider::new(rt.clone()));
1389
1390        let (pending, reactor) = PendingClientTunnel::new(
1391            circid,
1392            chan,
1393            created_recv,
1394            circmsg_recv,
1395            unique_id,
1396            DynTimeProvider::new(rt.clone()),
1397            CircuitAccount::new_noop(),
1398            padding_ctrl,
1399            padding_stream,
1400            Arc::new(DummyTimeoutEstimator),
1401        );
1402
1403        rt.spawn(async {
1404            let _ignore = reactor.run().await;
1405        })
1406        .unwrap();
1407        let PendingClientTunnel {
1408            circ,
1409            recvcreated: _,
1410        } = pending;
1411
1412        // TODO #1067: Support other formats
1413        let relay_cell_format = RelayCellFormat::V0;
1414
1415        let last_hop_num = u8::try_from(hops.len() - 1).unwrap();
1416        for (idx, peer_id) in hops.into_iter().enumerate() {
1417            let (tx, rx) = oneshot::channel();
1418            let idx = idx as u8;
1419
1420            circ.command
1421                .unbounded_send(CtrlCmd::AddFakeHop {
1422                    relay_cell_format,
1423                    fwd_lasthop: idx == last_hop_num,
1424                    rev_lasthop: idx == u8::from(next_msg_from),
1425                    peer_id,
1426                    params: params.clone(),
1427                    done: tx,
1428                })
1429                .unwrap();
1430            rx.await.unwrap().unwrap();
1431        }
1432        (circ.into_tunnel().unwrap(), circmsg_send)
1433    }
1434
1435    // Helper: set up a 3-hop circuit with no encryption, where the
1436    // next inbound message seems to come from hop next_msg_from
1437    async fn newtunnel<R: Runtime>(
1438        rt: &R,
1439        chan: Arc<Channel>,
1440    ) -> (Arc<ClientTunnel>, CircuitRxSender) {
1441        let hops = std::iter::repeat_with(|| {
1442            let peer_id = tor_linkspec::OwnedChanTarget::builder()
1443                .ed_identity([4; 32].into())
1444                .rsa_identity([5; 20].into())
1445                .build()
1446                .expect("Could not construct fake hop");
1447
1448            path::HopDetail::Relay(peer_id)
1449        })
1450        .take(3)
1451        .collect();
1452
1453        let unique_id = UniqId::new(23, 17);
1454        let (tunnel, circmsg_send) = newtunnel_ext(
1455            rt,
1456            unique_id,
1457            chan,
1458            hops,
1459            2.into(),
1460            CircParameters::default(),
1461        )
1462        .await;
1463
1464        (Arc::new(tunnel), circmsg_send)
1465    }
1466
1467    /// Create `n` distinct [`path::HopDetail`]s,
1468    /// with the specified `start_idx` for the dummy identities.
1469    fn hop_details(n: u8, start_idx: u8) -> Vec<path::HopDetail> {
1470        (0..n)
1471            .map(|idx| {
1472                let peer_id = tor_linkspec::OwnedChanTarget::builder()
1473                    .ed_identity([idx + start_idx; 32].into())
1474                    .rsa_identity([idx + start_idx + 1; 20].into())
1475                    .build()
1476                    .expect("Could not construct fake hop");
1477
1478                path::HopDetail::Relay(peer_id)
1479            })
1480            .collect()
1481    }
1482
1483    #[allow(deprecated)]
1484    async fn test_extend<R: Runtime>(rt: &R, handshake_type: HandshakeType) {
1485        use crate::crypto::handshake::{ServerHandshake, ntor::NtorServer};
1486
1487        let (chan, mut rx, _sink) = working_fake_channel(rt);
1488        let (tunnel, mut sink) = newtunnel(rt, chan).await;
1489        let circ = Arc::new(tunnel.as_single_circ().unwrap());
1490        let circid = circ.peek_circid();
1491        let params = CircParameters::default();
1492
1493        let extend_fut = async move {
1494            let target = example_target();
1495            match handshake_type {
1496                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1497                HandshakeType::Ntor => circ.extend_ntor(&target, params).await.unwrap(),
1498                HandshakeType::NtorV3 => circ.extend_ntor_v3(&target, params).await.unwrap(),
1499            };
1500            circ // gotta keep the circ alive, or the reactor would exit.
1501        };
1502        let reply_fut = async move {
1503            // We've disabled encryption on this circuit, so we can just
1504            // read the extend2 cell.
1505            let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1506            assert_eq!(id, Some(circid));
1507            let rmsg = match chmsg {
1508                AnyChanMsg::RelayEarly(r) => {
1509                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1510                        .unwrap()
1511                }
1512                other => panic!("{:?}", other),
1513            };
1514            let e2 = match rmsg.msg() {
1515                AnyRelayMsg::Extend2(e2) => e2,
1516                other => panic!("{:?}", other),
1517            };
1518            let mut rng = testing_rng();
1519            let reply = match handshake_type {
1520                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1521                HandshakeType::Ntor => {
1522                    let (_keygen, reply) = NtorServer::server(
1523                        &mut rng,
1524                        &mut |_: &()| Some(()),
1525                        &[example_ntor_key()],
1526                        e2.handshake(),
1527                    )
1528                    .unwrap();
1529                    reply
1530                }
1531                HandshakeType::NtorV3 => {
1532                    let (_keygen, reply) = NtorV3Server::server(
1533                        &mut rng,
1534                        &mut |_: &[CircRequestExt]| Some(vec![]),
1535                        &[example_ntor_v3_key()],
1536                        e2.handshake(),
1537                    )
1538                    .unwrap();
1539                    reply
1540                }
1541            };
1542
1543            let extended2 = relaymsg::Extended2::new(reply).into();
1544            sink.send(rmsg_to_ccmsg(None, extended2, false))
1545                .await
1546                .unwrap();
1547            (sink, rx) // gotta keep the sink and receiver alive, or the reactor will exit.
1548        };
1549
1550        let (circ, (_sink, _rx)) = futures::join!(extend_fut, reply_fut);
1551
1552        // Did we really add another hop?
1553        assert_eq!(circ.n_hops().unwrap(), 4);
1554
1555        // Do the path accessors report a reasonable outcome?
1556        {
1557            let path = circ.single_path().unwrap();
1558            let path = path
1559                .all_hops()
1560                .filter_map(|hop| match hop {
1561                    path::HopDetail::Relay(r) => Some(r),
1562                    #[cfg(feature = "hs-common")]
1563                    path::HopDetail::Virtual => None,
1564                })
1565                .collect::<Vec<_>>();
1566
1567            assert_eq!(path.len(), 4);
1568            use tor_linkspec::HasRelayIds;
1569            assert_eq!(path[3].ed_identity(), example_target().ed_identity());
1570            assert_ne!(path[0].ed_identity(), example_target().ed_identity());
1571        }
1572        {
1573            let path = circ.single_path().unwrap();
1574            assert_eq!(path.n_hops(), 4);
1575            use tor_linkspec::HasRelayIds;
1576            assert_eq!(
1577                path.hops()[3].as_chan_target().unwrap().ed_identity(),
1578                example_target().ed_identity()
1579            );
1580            assert_ne!(
1581                path.hops()[0].as_chan_target().unwrap().ed_identity(),
1582                example_target().ed_identity()
1583            );
1584        }
1585    }
1586
1587    #[traced_test]
1588    #[test]
1589    fn test_extend_ntor() {
1590        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1591            test_extend(&rt, HandshakeType::Ntor).await;
1592        });
1593    }
1594
1595    #[traced_test]
1596    #[test]
1597    fn test_extend_ntor_v3() {
1598        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1599            test_extend(&rt, HandshakeType::NtorV3).await;
1600        });
1601    }
1602
1603    #[allow(deprecated)]
1604    async fn bad_extend_test_impl<R: Runtime>(
1605        rt: &R,
1606        reply_hop: HopNum,
1607        bad_reply: AnyChanMsg,
1608    ) -> Error {
1609        let (chan, mut rx, _sink) = working_fake_channel(rt);
1610        let hops = std::iter::repeat_with(|| {
1611            let peer_id = tor_linkspec::OwnedChanTarget::builder()
1612                .ed_identity([4; 32].into())
1613                .rsa_identity([5; 20].into())
1614                .build()
1615                .expect("Could not construct fake hop");
1616
1617            path::HopDetail::Relay(peer_id)
1618        })
1619        .take(3)
1620        .collect();
1621
1622        let unique_id = UniqId::new(23, 17);
1623        let (tunnel, mut sink) = newtunnel_ext(
1624            rt,
1625            unique_id,
1626            chan,
1627            hops,
1628            reply_hop,
1629            CircParameters::default(),
1630        )
1631        .await;
1632        let params = CircParameters::default();
1633
1634        let target = example_target();
1635        let reply_task_handle = rt
1636            .spawn_with_handle(async move {
1637                // Wait for a cell, and make sure it's EXTEND2.
1638                let (_circid, chanmsg) = rx.next().await.unwrap().into_circid_and_msg();
1639                let AnyChanMsg::RelayEarly(relay_early) = chanmsg else {
1640                    panic!("unexpected message {chanmsg:?}");
1641                };
1642                let relaymsg = UnparsedRelayMsg::from_singleton_body(
1643                    RelayCellFormat::V0,
1644                    relay_early.into_relay_body(),
1645                )
1646                .unwrap();
1647                assert_eq!(relaymsg.cmd(), RelayCmd::EXTEND2);
1648
1649                // Send back the "bad_reply."
1650                sink.send(bad_reply).await.unwrap();
1651                sink
1652            })
1653            .unwrap();
1654        let outcome = tunnel
1655            .as_single_circ()
1656            .unwrap()
1657            .extend_ntor(&target, params)
1658            .await;
1659        let _sink = reply_task_handle.await;
1660
1661        assert_eq!(tunnel.n_hops().unwrap(), 3);
1662        assert!(outcome.is_err());
1663        outcome.unwrap_err()
1664    }
1665
1666    #[traced_test]
1667    #[test]
1668    fn bad_extend_wronghop() {
1669        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1670            let extended2 = relaymsg::Extended2::new(vec![]).into();
1671            let cc = rmsg_to_ccmsg(None, extended2, false);
1672
1673            let error = bad_extend_test_impl(&rt, 1.into(), cc).await;
1674            // This case shows up as a CircDestroy, since a message sent
1675            // from the wrong hop won't even be delivered to the extend
1676            // code's meta-handler.  Instead the unexpected message will cause
1677            // the circuit to get torn down.
1678            match error {
1679                Error::CircuitClosed => {}
1680                x => panic!("got other error: {}", x),
1681            }
1682        });
1683    }
1684
1685    #[traced_test]
1686    #[test]
1687    fn bad_extend_wrongtype() {
1688        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1689            let extended = relaymsg::Extended::new(vec![7; 200]).into();
1690            let cc = rmsg_to_ccmsg(None, extended, false);
1691
1692            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1693            match error {
1694                Error::BytesErr {
1695                    err: tor_bytes::Error::InvalidMessage(_),
1696                    object: "extended2 message",
1697                } => {}
1698                other => panic!("{:?}", other),
1699            }
1700        });
1701    }
1702
1703    #[traced_test]
1704    #[test]
1705    fn bad_extend_destroy() {
1706        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1707            let cc = AnyChanMsg::Destroy(chanmsg::Destroy::new(4.into()));
1708            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1709            match error {
1710                Error::CircuitClosed => {}
1711                other => panic!("{:?}", other),
1712            }
1713        });
1714    }
1715
1716    #[traced_test]
1717    #[test]
1718    fn bad_extend_crypto() {
1719        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1720            let extended2 = relaymsg::Extended2::new(vec![99; 256]).into();
1721            let cc = rmsg_to_ccmsg(None, extended2, false);
1722            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1723            assert_matches!(error, Error::BadCircHandshakeAuth);
1724        });
1725    }
1726
1727    #[traced_test]
1728    #[test]
1729    fn begindir() {
1730        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1731            let (chan, mut rx, _sink) = working_fake_channel(&rt);
1732            let (tunnel, mut sink) = newtunnel(&rt, chan).await;
1733            let circ = tunnel.as_single_circ().unwrap();
1734            let circid = circ.peek_circid();
1735
1736            let begin_and_send_fut = async move {
1737                // Here we'll say we've got a circuit, and we want to
1738                // make a simple BEGINDIR request with it.
1739                let mut stream = tunnel.begin_dir_stream().await.unwrap();
1740                stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
1741                stream.flush().await.unwrap();
1742                let mut buf = [0_u8; 1024];
1743                let n = stream.read(&mut buf).await.unwrap();
1744                assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n");
1745                let n = stream.read(&mut buf).await.unwrap();
1746                assert_eq!(n, 0);
1747                stream
1748            };
1749            let reply_fut = async move {
1750                // We've disabled encryption on this circuit, so we can just
1751                // read the begindir cell.
1752                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1753                assert_eq!(id, Some(circid));
1754                let rmsg = match chmsg {
1755                    AnyChanMsg::Relay(r) => {
1756                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1757                            .unwrap()
1758                    }
1759                    other => panic!("{:?}", other),
1760                };
1761                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1762                assert_matches!(rmsg, AnyRelayMsg::BeginDir(_));
1763
1764                // Reply with a Connected cell to indicate success.
1765                let connected = relaymsg::Connected::new_empty().into();
1766                sink.send(rmsg_to_ccmsg(streamid, connected, false))
1767                    .await
1768                    .unwrap();
1769
1770                // Now read a DATA cell...
1771                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1772                assert_eq!(id, Some(circid));
1773                let rmsg = match chmsg {
1774                    AnyChanMsg::Relay(r) => {
1775                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1776                            .unwrap()
1777                    }
1778                    other => panic!("{:?}", other),
1779                };
1780                let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
1781                assert_eq!(streamid_2, streamid);
1782                if let AnyRelayMsg::Data(d) = rmsg {
1783                    assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
1784                } else {
1785                    panic!();
1786                }
1787
1788                // Write another data cell in reply!
1789                let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
1790                    .unwrap()
1791                    .into();
1792                sink.send(rmsg_to_ccmsg(streamid, data, false))
1793                    .await
1794                    .unwrap();
1795
1796                // Send an END cell to say that the conversation is over.
1797                let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
1798                sink.send(rmsg_to_ccmsg(streamid, end, false))
1799                    .await
1800                    .unwrap();
1801
1802                (rx, sink) // gotta keep these alive, or the reactor will exit.
1803            };
1804
1805            let (_stream, (_rx, _sink)) = futures::join!(begin_and_send_fut, reply_fut);
1806        });
1807    }
1808
1809    // Test: close a stream, either by dropping it or by calling AsyncWriteExt::close.
1810    fn close_stream_helper(by_drop: bool) {
1811        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1812            let (chan, mut rx, _sink) = working_fake_channel(&rt);
1813            let (tunnel, mut sink) = newtunnel(&rt, chan).await;
1814
1815            let stream_fut = async move {
1816                let stream = tunnel
1817                    .begin_stream("www.example.com", 80, None)
1818                    .await
1819                    .unwrap();
1820
1821                let (r, mut w) = stream.split();
1822                if by_drop {
1823                    // Drop the writer and the reader, which should close the stream.
1824                    drop(r);
1825                    drop(w);
1826                    (None, tunnel) // make sure to keep the circuit alive
1827                } else {
1828                    // Call close on the writer, while keeping the reader alive.
1829                    w.close().await.unwrap();
1830                    (Some(r), tunnel)
1831                }
1832            };
1833            let handler_fut = async {
1834                // Read the BEGIN message.
1835                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
1836                let rmsg = match msg {
1837                    AnyChanMsg::Relay(r) => {
1838                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1839                            .unwrap()
1840                    }
1841                    other => panic!("{:?}", other),
1842                };
1843                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1844                assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);
1845
1846                // Reply with a CONNECTED.
1847                let connected =
1848                    relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
1849                sink.send(rmsg_to_ccmsg(streamid, connected, false))
1850                    .await
1851                    .unwrap();
1852
1853                // Expect an END.
1854                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
1855                let rmsg = match msg {
1856                    AnyChanMsg::Relay(r) => {
1857                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1858                            .unwrap()
1859                    }
1860                    other => panic!("{:?}", other),
1861                };
1862                let (_, rmsg) = rmsg.into_streamid_and_msg();
1863                assert_eq!(rmsg.cmd(), RelayCmd::END);
1864
1865                (rx, sink) // keep these alive or the reactor will exit.
1866            };
1867
1868            let ((_opt_reader, _circ), (_rx, _sink)) = futures::join!(stream_fut, handler_fut);
1869        });
1870    }
1871
1872    #[traced_test]
1873    #[test]
1874    fn drop_stream() {
1875        close_stream_helper(true);
1876    }
1877
1878    #[traced_test]
1879    #[test]
1880    fn close_stream() {
1881        close_stream_helper(false);
1882    }
1883
1884    #[traced_test]
1885    #[test]
1886    fn expire_halfstreams() {
1887        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
1888            let (chan, mut rx, _sink) = working_fake_channel(&rt);
1889            let (tunnel, mut sink) = newtunnel(&rt, chan).await;
1890
1891            let client_fut = async move {
1892                let stream = tunnel
1893                    .begin_stream("www.example.com", 80, None)
1894                    .await
1895                    .unwrap();
1896
1897                let (r, mut w) = stream.split();
1898                // Close the stream
1899                w.close().await.unwrap();
1900                (Some(r), tunnel)
1901            };
1902            let exit_fut = async {
1903                // Read the BEGIN message.
1904                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
1905                let rmsg = match msg {
1906                    AnyChanMsg::Relay(r) => {
1907                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1908                            .unwrap()
1909                    }
1910                    other => panic!("{:?}", other),
1911                };
1912                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1913                assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);
1914
1915                // Reply with a CONNECTED.
1916                let connected =
1917                    relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
1918                sink.send(rmsg_to_ccmsg(streamid, connected, false))
1919                    .await
1920                    .unwrap();
1921
1922                (rx, streamid, sink) // keep these alive or the reactor will exit.
1923            };
1924
1925            let ((_opt_reader, tunnel), (_rx, streamid, mut sink)) =
1926                futures::join!(client_fut, exit_fut);
1927
1928            // Progress all futures to ensure the reactor has a chance to notice
1929            // we closed the stream.
1930            rt.progress_until_stalled().await;
1931
1932            // The tunnel should remain open
1933            assert!(!tunnel.is_closed());
1934
1935            // Write some more data on the half-stream.
1936            // The half-stream hasn't expired yet, so it will simply be ignored.
1937            let data = relaymsg::Data::new(b"hello").unwrap();
1938            sink.send(rmsg_to_ccmsg(streamid, AnyRelayMsg::Data(data), false))
1939                .await
1940                .unwrap();
1941            rt.progress_until_stalled().await;
1942
1943            // This was not a protocol violation, so the tunnel is still alive.
1944            assert!(!tunnel.is_closed());
1945
1946            // Advance the time to cause the half-streams to get garbage collected.
1947            //
1948            // Advancing it by 2 * CBT ought to be enough, because the RTT estimator
1949            // won't yet have an estimate for the max_rtt.
1950            let stream_timeout = DummyTimeoutEstimator.circuit_build_timeout(3);
1951            rt.advance_by(2 * stream_timeout).await;
1952
1953            // Sending this cell is a protocol violation now
1954            // that the half-stream expired.
1955            let data = relaymsg::Data::new(b"hello").unwrap();
1956            sink.send(rmsg_to_ccmsg(streamid, AnyRelayMsg::Data(data), false))
1957                .await
1958                .unwrap();
1959            rt.progress_until_stalled().await;
1960
1961            // The tunnel shut down because of the proto violation.
1962            assert!(tunnel.is_closed());
1963        });
1964    }
1965
1966    // Set up a circuit and stream that expects some incoming SENDMEs.
1967    async fn setup_incoming_sendme_case<R: Runtime>(
1968        rt: &R,
1969        n_to_send: usize,
1970    ) -> (
1971        Arc<ClientTunnel>,
1972        DataStream,
1973        CircuitRxSender,
1974        Option<StreamId>,
1975        usize,
1976        Receiver<AnyChanCell>,
1977        Sender<CodecResult>,
1978    ) {
1979        let (chan, mut rx, sink2) = working_fake_channel(rt);
1980        let (tunnel, mut sink) = newtunnel(rt, chan).await;
1981        let circid = tunnel.as_single_circ().unwrap().peek_circid();
1982
1983        let begin_and_send_fut = {
1984            let tunnel = tunnel.clone();
1985            async move {
1986                // Take our circuit and make a stream on it.
1987                let mut stream = tunnel
1988                    .begin_stream("www.example.com", 443, None)
1989                    .await
1990                    .unwrap();
1991                let junk = [0_u8; 1024];
1992                let mut remaining = n_to_send;
1993                while remaining > 0 {
1994                    let n = std::cmp::min(remaining, junk.len());
1995                    stream.write_all(&junk[..n]).await.unwrap();
1996                    remaining -= n;
1997                }
1998                stream.flush().await.unwrap();
1999                stream
2000            }
2001        };
2002
2003        let receive_fut = async move {
2004            // Read the begin cell.
2005            let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2006            let rmsg = match chmsg {
2007                AnyChanMsg::Relay(r) => {
2008                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2009                        .unwrap()
2010                }
2011                other => panic!("{:?}", other),
2012            };
2013            let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2014            assert_matches!(rmsg, AnyRelayMsg::Begin(_));
2015            // Reply with a connected cell...
2016            let connected = relaymsg::Connected::new_empty().into();
2017            sink.send(rmsg_to_ccmsg(streamid, connected, false))
2018                .await
2019                .unwrap();
2020            // Now read bytes from the stream until we have them all.
2021            let mut bytes_received = 0_usize;
2022            let mut cells_received = 0_usize;
2023            while bytes_received < n_to_send {
2024                // Read a data cell, and remember how much we got.
2025                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2026                assert_eq!(id, Some(circid));
2027
2028                let rmsg = match chmsg {
2029                    AnyChanMsg::Relay(r) => {
2030                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2031                            .unwrap()
2032                    }
2033                    other => panic!("{:?}", other),
2034                };
2035                let (streamid2, rmsg) = rmsg.into_streamid_and_msg();
2036                assert_eq!(streamid2, streamid);
2037                if let AnyRelayMsg::Data(dat) = rmsg {
2038                    cells_received += 1;
2039                    bytes_received += dat.as_ref().len();
2040                } else {
2041                    panic!();
2042                }
2043            }
2044
2045            (sink, streamid, cells_received, rx)
2046        };
2047
2048        let (stream, (sink, streamid, cells_received, rx)) =
2049            futures::join!(begin_and_send_fut, receive_fut);
2050
2051        (tunnel, stream, sink, streamid, cells_received, rx, sink2)
2052    }
2053
2054    #[traced_test]
2055    #[test]
2056    fn accept_valid_sendme() {
2057        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2058            let (tunnel, _stream, mut sink, streamid, cells_received, _rx, _sink2) =
2059                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2060            let circ = tunnel.as_single_circ().unwrap();
2061
2062            assert_eq!(cells_received, 301);
2063
2064            // Make sure that the circuit is indeed expecting the right sendmes
2065            {
2066                let (tx, rx) = oneshot::channel();
2067                circ.command
2068                    .unbounded_send(CtrlCmd::QuerySendWindow {
2069                        hop: 2.into(),
2070                        leg: tunnel.unique_id(),
2071                        done: tx,
2072                    })
2073                    .unwrap();
2074                let (window, tags) = rx.await.unwrap().unwrap();
2075                assert_eq!(window, 1000 - 301);
2076                assert_eq!(tags.len(), 3);
2077                // 100
2078                assert_eq!(
2079                    tags[0],
2080                    SendmeTag::from(hex!("6400000000000000000000000000000000000000"))
2081                );
2082                // 200
2083                assert_eq!(
2084                    tags[1],
2085                    SendmeTag::from(hex!("c800000000000000000000000000000000000000"))
2086                );
2087                // 300
2088                assert_eq!(
2089                    tags[2],
2090                    SendmeTag::from(hex!("2c01000000000000000000000000000000000000"))
2091                );
2092            }
2093
2094            let reply_with_sendme_fut = async move {
2095                // make and send a circuit-level sendme.
2096                let c_sendme =
2097                    relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000"))
2098                        .into();
2099                sink.send(rmsg_to_ccmsg(None, c_sendme, false))
2100                    .await
2101                    .unwrap();
2102
2103                // Make and send a stream-level sendme.
2104                let s_sendme = relaymsg::Sendme::new_empty().into();
2105                sink.send(rmsg_to_ccmsg(streamid, s_sendme, false))
2106                    .await
2107                    .unwrap();
2108
2109                sink
2110            };
2111
2112            let _sink = reply_with_sendme_fut.await;
2113
2114            rt.advance_until_stalled().await;
2115
2116            // Now make sure that the circuit is still happy, and its
2117            // window is updated.
2118            {
2119                let (tx, rx) = oneshot::channel();
2120                circ.command
2121                    .unbounded_send(CtrlCmd::QuerySendWindow {
2122                        hop: 2.into(),
2123                        leg: tunnel.unique_id(),
2124                        done: tx,
2125                    })
2126                    .unwrap();
2127                let (window, _tags) = rx.await.unwrap().unwrap();
2128                assert_eq!(window, 1000 - 201);
2129            }
2130        });
2131    }
2132
2133    #[traced_test]
2134    #[test]
2135    fn invalid_circ_sendme() {
2136        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2137            // Same setup as accept_valid_sendme() test above but try giving
2138            // a sendme with the wrong tag.
2139
2140            let (tunnel, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) =
2141                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2142
2143            let reply_with_sendme_fut = async move {
2144                // make and send a circuit-level sendme with a bad tag.
2145                let c_sendme =
2146                    relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF"))
2147                        .into();
2148                sink.send(rmsg_to_ccmsg(None, c_sendme, false))
2149                    .await
2150                    .unwrap();
2151                sink
2152            };
2153
2154            let _sink = reply_with_sendme_fut.await;
2155
2156            // Check whether the reactor dies as a result of receiving invalid data.
2157            rt.advance_until_stalled().await;
2158            assert!(tunnel.is_closed());
2159        });
2160    }
2161
2162    #[traced_test]
2163    #[test]
2164    fn test_busy_stream_fairness() {
2165        // Number of streams to use.
2166        const N_STREAMS: usize = 3;
2167        // Number of cells (roughly) for each stream to send.
2168        const N_CELLS: usize = 20;
2169        // Number of bytes that *each* stream will send, and that we'll read
2170        // from the channel.
2171        const N_BYTES: usize = relaymsg::Data::MAXLEN_V0 * N_CELLS;
2172        // Ignoring cell granularity, with perfect fairness we'd expect
2173        // `N_BYTES/N_STREAMS` bytes from each stream.
2174        //
2175        // We currently allow for up to a full cell less than that.  This is
2176        // somewhat arbitrary and can be changed as needed, since we don't
2177        // provide any specific fairness guarantees.
2178        const MIN_EXPECTED_BYTES_PER_STREAM: usize =
2179            N_BYTES / N_STREAMS - relaymsg::Data::MAXLEN_V0;
2180
2181        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2182            let (chan, mut rx, _sink) = working_fake_channel(&rt);
2183            let (tunnel, mut sink) = newtunnel(&rt, chan).await;
2184
2185            // Run clients in a single task, doing our own round-robin
2186            // scheduling of writes to the reactor. Conversely, if we were to
2187            // put each client in its own task, we would be at the mercy of
2188            // how fairly the runtime schedules the client tasks, which is outside
2189            // the scope of this test.
2190            rt.spawn({
2191                // Clone the circuit to keep it alive after writers have
2192                // finished with it.
2193                let tunnel = tunnel.clone();
2194                async move {
2195                    let mut clients = VecDeque::new();
2196                    struct Client {
2197                        stream: DataStream,
2198                        to_write: &'static [u8],
2199                    }
2200                    for _ in 0..N_STREAMS {
2201                        clients.push_back(Client {
2202                            stream: tunnel
2203                                .begin_stream("www.example.com", 80, None)
2204                                .await
2205                                .unwrap(),
2206                            to_write: &[0_u8; N_BYTES][..],
2207                        });
2208                    }
2209                    while let Some(mut client) = clients.pop_front() {
2210                        if client.to_write.is_empty() {
2211                            // Client is done. Don't put back in queue.
2212                            continue;
2213                        }
2214                        let written = client.stream.write(client.to_write).await.unwrap();
2215                        client.to_write = &client.to_write[written..];
2216                        clients.push_back(client);
2217                    }
2218                }
2219            })
2220            .unwrap();
2221
2222            let channel_handler_fut = async {
2223                let mut stream_bytes_received = HashMap::<StreamId, usize>::new();
2224                let mut total_bytes_received = 0;
2225
2226                loop {
2227                    let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2228                    let rmsg = match msg {
2229                        AnyChanMsg::Relay(r) => AnyRelayMsgOuter::decode_singleton(
2230                            RelayCellFormat::V0,
2231                            r.into_relay_body(),
2232                        )
2233                        .unwrap(),
2234                        other => panic!("Unexpected chanmsg: {other:?}"),
2235                    };
2236                    let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2237                    match rmsg.cmd() {
2238                        RelayCmd::BEGIN => {
2239                            // Add an entry for this stream.
2240                            let prev = stream_bytes_received.insert(streamid.unwrap(), 0);
2241                            assert_eq!(prev, None);
2242                            // Reply with a CONNECTED.
2243                            let connected = relaymsg::Connected::new_with_addr(
2244                                "10.0.0.1".parse().unwrap(),
2245                                1234,
2246                            )
2247                            .into();
2248                            sink.send(rmsg_to_ccmsg(streamid, connected, false))
2249                                .await
2250                                .unwrap();
2251                        }
2252                        RelayCmd::DATA => {
2253                            let data_msg = relaymsg::Data::try_from(rmsg).unwrap();
2254                            let nbytes = data_msg.as_ref().len();
2255                            total_bytes_received += nbytes;
2256                            let streamid = streamid.unwrap();
2257                            let stream_bytes = stream_bytes_received.get_mut(&streamid).unwrap();
2258                            *stream_bytes += nbytes;
2259                            if total_bytes_received >= N_BYTES {
2260                                break;
2261                            }
2262                        }
2263                        RelayCmd::END => {
2264                            // Stream is done. If fair scheduling is working as
2265                            // expected we *probably* shouldn't get here, but we
2266                            // can ignore it and save the failure until we
2267                            // actually have the final stats.
2268                            continue;
2269                        }
2270                        other => {
2271                            panic!("Unexpected command {other:?}");
2272                        }
2273                    }
2274                }
2275
2276                // Return our stats, along with the `rx` and `sink` to keep the
2277                // reactor alive (since clients could still be writing).
2278                (total_bytes_received, stream_bytes_received, rx, sink)
2279            };
2280
2281            let (total_bytes_received, stream_bytes_received, _rx, _sink) =
2282                channel_handler_fut.await;
2283            assert_eq!(stream_bytes_received.len(), N_STREAMS);
2284            for (sid, stream_bytes) in stream_bytes_received {
2285                assert!(
2286                    stream_bytes >= MIN_EXPECTED_BYTES_PER_STREAM,
2287                    "Only {stream_bytes} of {total_bytes_received} bytes received from {N_STREAMS} came from {sid:?}; expected at least {MIN_EXPECTED_BYTES_PER_STREAM}"
2288                );
2289            }
2290        });
2291    }
2292
2293    #[test]
2294    fn basic_params() {
2295        use super::CircParameters;
2296        let mut p = CircParameters::default();
2297        assert!(p.extend_by_ed25519_id);
2298
2299        p.extend_by_ed25519_id = false;
2300        assert!(!p.extend_by_ed25519_id);
2301    }
2302
2303    #[traced_test]
2304    #[test]
2305    #[cfg(feature = "hs-service")]
2306    fn allow_stream_requests_twice() {
2307        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2308            let (chan, _rx, _sink) = working_fake_channel(&rt);
2309            let (tunnel, _send) = newtunnel(&rt, chan).await;
2310
2311            let _incoming = tunnel
2312                .allow_stream_requests(
2313                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2314                    tunnel.resolve_last_hop().await,
2315                    AllowAllStreamsFilter,
2316                )
2317                .await
2318                .unwrap();
2319
2320            let incoming = tunnel
2321                .allow_stream_requests(
2322                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2323                    tunnel.resolve_last_hop().await,
2324                    AllowAllStreamsFilter,
2325                )
2326                .await;
2327
2328            // There can only be one IncomingStream at a time on any given circuit.
2329            assert!(incoming.is_err());
2330        });
2331    }
2332
2333    #[traced_test]
2334    #[test]
2335    #[cfg(feature = "hs-service")]
2336    fn allow_stream_requests() {
2337        use tor_cell::relaycell::msg::BeginFlags;
2338
2339        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2340            const TEST_DATA: &[u8] = b"ping";
2341
2342            let (chan, _rx, _sink) = working_fake_channel(&rt);
2343            let (tunnel, mut send) = newtunnel(&rt, chan).await;
2344
2345            let rfmt = RelayCellFormat::V0;
2346
2347            // A helper channel for coordinating the "client"/"service" interaction
2348            let (tx, rx) = oneshot::channel();
2349            let mut incoming = tunnel
2350                .allow_stream_requests(
2351                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2352                    tunnel.resolve_last_hop().await,
2353                    AllowAllStreamsFilter,
2354                )
2355                .await
2356                .unwrap();
2357
2358            let simulate_service = async move {
2359                let stream = incoming.next().await.unwrap();
2360                let mut data_stream = stream
2361                    .accept_data(relaymsg::Connected::new_empty())
2362                    .await
2363                    .unwrap();
2364                // Notify the client task we're ready to accept DATA cells
2365                tx.send(()).unwrap();
2366
2367                // Read the data the client sent us
2368                let mut buf = [0_u8; TEST_DATA.len()];
2369                data_stream.read_exact(&mut buf).await.unwrap();
2370                assert_eq!(&buf, TEST_DATA);
2371
2372                tunnel
2373            };
2374
2375            let simulate_client = async move {
2376                let begin = relaymsg::Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2377                let body: BoxedCellBody =
2378                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2379                        .encode(rfmt, &mut testing_rng())
2380                        .unwrap();
2381                let begin_msg = chanmsg::Relay::from(body);
2382
2383                // Pretend to be a client at the other end of the circuit sending a begin cell
2384                send.send(AnyChanMsg::Relay(begin_msg)).await.unwrap();
2385
2386                // Wait until the service is ready to accept data
2387                // TODO: we shouldn't need to wait! This is needed because the service will reject
2388                // any DATA cells that aren't associated with a known stream. We need to wait until
2389                // the service receives our BEGIN cell (and the reactor updates hop.map with the
2390                // new stream).
2391                rx.await.unwrap();
2392                // Now send some data along the newly established circuit..
2393                let data = relaymsg::Data::new(TEST_DATA).unwrap();
2394                let body: BoxedCellBody =
2395                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
2396                        .encode(rfmt, &mut testing_rng())
2397                        .unwrap();
2398                let data_msg = chanmsg::Relay::from(body);
2399
2400                send.send(AnyChanMsg::Relay(data_msg)).await.unwrap();
2401                send
2402            };
2403
2404            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2405        });
2406    }
2407
2408    #[traced_test]
2409    #[test]
2410    #[cfg(feature = "hs-service")]
2411    fn accept_stream_after_reject() {
2412        use tor_cell::relaycell::msg::AnyRelayMsg;
2413        use tor_cell::relaycell::msg::BeginFlags;
2414        use tor_cell::relaycell::msg::EndReason;
2415
2416        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2417            const TEST_DATA: &[u8] = b"ping";
2418            const STREAM_COUNT: usize = 2;
2419            let rfmt = RelayCellFormat::V0;
2420
2421            let (chan, _rx, _sink) = working_fake_channel(&rt);
2422            let (tunnel, mut send) = newtunnel(&rt, chan).await;
2423
2424            // A helper channel for coordinating the "client"/"service" interaction
2425            let (mut tx, mut rx) = mpsc::channel(STREAM_COUNT);
2426
2427            let mut incoming = tunnel
2428                .allow_stream_requests(
2429                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2430                    tunnel.resolve_last_hop().await,
2431                    AllowAllStreamsFilter,
2432                )
2433                .await
2434                .unwrap();
2435
2436            let simulate_service = async move {
2437                // Process 2 incoming streams
2438                for i in 0..STREAM_COUNT {
2439                    let stream = incoming.next().await.unwrap();
2440
2441                    // Reject the first one
2442                    if i == 0 {
2443                        stream
2444                            .reject(relaymsg::End::new_with_reason(EndReason::INTERNAL))
2445                            .await
2446                            .unwrap();
2447                        // Notify the client
2448                        tx.send(()).await.unwrap();
2449                        continue;
2450                    }
2451
2452                    let mut data_stream = stream
2453                        .accept_data(relaymsg::Connected::new_empty())
2454                        .await
2455                        .unwrap();
2456                    // Notify the client task we're ready to accept DATA cells
2457                    tx.send(()).await.unwrap();
2458
2459                    // Read the data the client sent us
2460                    let mut buf = [0_u8; TEST_DATA.len()];
2461                    data_stream.read_exact(&mut buf).await.unwrap();
2462                    assert_eq!(&buf, TEST_DATA);
2463                }
2464
2465                tunnel
2466            };
2467
2468            let simulate_client = async move {
2469                let begin = relaymsg::Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2470                let body: BoxedCellBody =
2471                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2472                        .encode(rfmt, &mut testing_rng())
2473                        .unwrap();
2474                let begin_msg = chanmsg::Relay::from(body);
2475
2476                // Pretend to be a client at the other end of the circuit sending 2 identical begin
2477                // cells (the first one will be rejected by the test service).
2478                for _ in 0..STREAM_COUNT {
2479                    send.send(AnyChanMsg::Relay(begin_msg.clone()))
2480                        .await
2481                        .unwrap();
2482
2483                    // Wait until the service rejects our request
2484                    rx.next().await.unwrap();
2485                }
2486
2487                // Now send some data along the newly established circuit..
2488                let data = relaymsg::Data::new(TEST_DATA).unwrap();
2489                let body: BoxedCellBody =
2490                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
2491                        .encode(rfmt, &mut testing_rng())
2492                        .unwrap();
2493                let data_msg = chanmsg::Relay::from(body);
2494
2495                send.send(AnyChanMsg::Relay(data_msg)).await.unwrap();
2496                send
2497            };
2498
2499            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2500        });
2501    }
2502
2503    #[traced_test]
2504    #[test]
2505    #[cfg(feature = "hs-service")]
2506    fn incoming_stream_bad_hop() {
2507        use tor_cell::relaycell::msg::BeginFlags;
2508
2509        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2510            /// Expect the originator of the BEGIN cell to be hop 1.
2511            const EXPECTED_HOP: u8 = 1;
2512            let rfmt = RelayCellFormat::V0;
2513
2514            let (chan, _rx, _sink) = working_fake_channel(&rt);
2515            let (tunnel, mut send) = newtunnel(&rt, chan).await;
2516
2517            // Expect to receive incoming streams from hop EXPECTED_HOP
2518            let mut incoming = tunnel
2519                .allow_stream_requests(
2520                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2521                    // Build the precise HopLocation with the underlying circuit.
2522                    (
2523                        tunnel.as_single_circ().unwrap().unique_id(),
2524                        EXPECTED_HOP.into(),
2525                    )
2526                        .into(),
2527                    AllowAllStreamsFilter,
2528                )
2529                .await
2530                .unwrap();
2531
2532            let simulate_service = async move {
2533                // The originator of the cell is actually the last hop on the circuit, not hop 1,
2534                // so we expect the reactor to shut down.
2535                assert!(incoming.next().await.is_none());
2536                tunnel
2537            };
2538
2539            let simulate_client = async move {
2540                let begin = relaymsg::Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2541                let body: BoxedCellBody =
2542                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2543                        .encode(rfmt, &mut testing_rng())
2544                        .unwrap();
2545                let begin_msg = chanmsg::Relay::from(body);
2546
2547                // Pretend to be a client at the other end of the circuit sending a begin cell
2548                send.send(AnyChanMsg::Relay(begin_msg)).await.unwrap();
2549
2550                send
2551            };
2552
2553            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2554        });
2555    }
2556
2557    #[traced_test]
2558    #[test]
2559    #[cfg(feature = "conflux")]
2560    fn multipath_circ_validation() {
2561        use std::error::Error as _;
2562
2563        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2564            let params = CircParameters::default();
2565            let invalid_tunnels = [
2566                setup_bad_conflux_tunnel(&rt).await,
2567                setup_conflux_tunnel(&rt, true, params).await,
2568            ];
2569
2570            for tunnel in invalid_tunnels {
2571                let TestTunnelCtx {
2572                    tunnel: _tunnel,
2573                    circs: _circs,
2574                    conflux_link_rx,
2575                } = tunnel;
2576
2577                let conflux_hs_err = conflux_link_rx.await.unwrap().unwrap_err();
2578                let err_src = conflux_hs_err.source().unwrap();
2579
2580                // The two circuits don't end in the same hop (no join point),
2581                // so the reactor will refuse to link them
2582                assert!(
2583                    err_src
2584                        .to_string()
2585                        .contains("one more conflux circuits are invalid")
2586                );
2587            }
2588        });
2589    }
2590
2591    // TODO: this structure could be reused for the other tests,
2592    // to address nickm's comment:
2593    // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3005#note_3202362
2594    #[derive(Debug)]
2595    #[allow(unused)]
2596    #[cfg(feature = "conflux")]
2597    struct TestCircuitCtx {
2598        chan_rx: Receiver<AnyChanCell>,
2599        chan_tx: Sender<std::result::Result<AnyChanCell, Error>>,
2600        circ_tx: CircuitRxSender,
2601        unique_id: UniqId,
2602    }
2603
2604    #[derive(Debug)]
2605    #[cfg(feature = "conflux")]
2606    struct TestTunnelCtx {
2607        tunnel: Arc<ClientTunnel>,
2608        circs: Vec<TestCircuitCtx>,
2609        conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
2610    }
2611
2612    /// Wait for a LINK cell to arrive on the specified channel and return its payload.
2613    #[cfg(feature = "conflux")]
2614    async fn await_link_payload(rx: &mut Receiver<AnyChanCell>) -> ConfluxLink {
2615        // Wait for the LINK cell...
2616        let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2617        let rmsg = match chmsg {
2618            AnyChanMsg::Relay(r) => {
2619                AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2620                    .unwrap()
2621            }
2622            other => panic!("{:?}", other),
2623        };
2624        let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2625
2626        let link = match rmsg {
2627            AnyRelayMsg::ConfluxLink(link) => link,
2628            _ => panic!("unexpected relay message {rmsg:?}"),
2629        };
2630
2631        assert!(streamid.is_none());
2632
2633        link
2634    }
2635
2636    #[cfg(feature = "conflux")]
2637    async fn setup_conflux_tunnel(
2638        rt: &MockRuntime,
2639        same_hops: bool,
2640        params: CircParameters,
2641    ) -> TestTunnelCtx {
2642        let hops1 = hop_details(3, 0);
2643        let hops2 = if same_hops {
2644            hops1.clone()
2645        } else {
2646            hop_details(3, 10)
2647        };
2648
2649        let (chan1, rx1, chan_sink1) = working_fake_channel(rt);
2650        let (mut tunnel1, sink1) = newtunnel_ext(
2651            rt,
2652            UniqId::new(1, 3),
2653            chan1,
2654            hops1,
2655            2.into(),
2656            params.clone(),
2657        )
2658        .await;
2659
2660        let (chan2, rx2, chan_sink2) = working_fake_channel(rt);
2661
2662        let (tunnel2, sink2) =
2663            newtunnel_ext(rt, UniqId::new(2, 4), chan2, hops2, 2.into(), params).await;
2664
2665        let (answer_tx, answer_rx) = oneshot::channel();
2666        tunnel2
2667            .as_single_circ()
2668            .unwrap()
2669            .command
2670            .unbounded_send(CtrlCmd::ShutdownAndReturnCircuit { answer: answer_tx })
2671            .unwrap();
2672
2673        let circuit = answer_rx.await.unwrap().unwrap();
2674        // The circuit should be shutting down its reactor
2675        rt.advance_until_stalled().await;
2676        assert!(tunnel2.is_closed());
2677
2678        let (conflux_link_tx, conflux_link_rx) = oneshot::channel();
2679        // Tell the first circuit to link with the second and form a multipath tunnel
2680        tunnel1
2681            .as_single_circ()
2682            .unwrap()
2683            .control
2684            .unbounded_send(CtrlMsg::LinkCircuits {
2685                circuits: vec![circuit],
2686                answer: conflux_link_tx,
2687            })
2688            .unwrap();
2689
2690        let circ_ctx1 = TestCircuitCtx {
2691            chan_rx: rx1,
2692            chan_tx: chan_sink1,
2693            circ_tx: sink1,
2694            unique_id: tunnel1.unique_id(),
2695        };
2696
2697        let circ_ctx2 = TestCircuitCtx {
2698            chan_rx: rx2,
2699            chan_tx: chan_sink2,
2700            circ_tx: sink2,
2701            unique_id: tunnel2.unique_id(),
2702        };
2703
2704        // TODO(conflux): nothing currently sets this,
2705        // so we need to manually set it.
2706        //
2707        // Instead of doing this, we should have a ClientCirc
2708        // API that sends CtrlMsg::Link circuits and sets this to true
2709        tunnel1.circ.is_multi_path = true;
2710        TestTunnelCtx {
2711            tunnel: Arc::new(tunnel1),
2712            circs: vec![circ_ctx1, circ_ctx2],
2713            conflux_link_rx,
2714        }
2715    }
2716
2717    #[cfg(feature = "conflux")]
2718    async fn setup_good_conflux_tunnel(
2719        rt: &MockRuntime,
2720        cc_params: CongestionControlParams,
2721    ) -> TestTunnelCtx {
2722        // Our 2 test circuits are identical, so they both have the same guards,
2723        // which technically violates the conflux set rule mentioned in prop354.
2724        // For testing purposes this is fine, but in production we'll need to ensure
2725        // the calling code prevents guard reuse (except in the case where
2726        // one of the guards happens to be Guard + Exit)
2727        let same_hops = true;
2728        let flow_ctrl_params = FlowCtrlParameters::defaults_for_tests();
2729        let params = CircParameters::new(true, cc_params, flow_ctrl_params);
2730        setup_conflux_tunnel(rt, same_hops, params).await
2731    }
2732
2733    #[cfg(feature = "conflux")]
2734    async fn setup_bad_conflux_tunnel(rt: &MockRuntime) -> TestTunnelCtx {
2735        // The two circuits don't share any hops,
2736        // so they won't end in the same hop (no join point),
2737        // causing the reactor to refuse to link them.
2738        let same_hops = false;
2739        let flow_ctrl_params = FlowCtrlParameters::defaults_for_tests();
2740        let params = CircParameters::new(true, build_cc_vegas_params(), flow_ctrl_params);
2741        setup_conflux_tunnel(rt, same_hops, params).await
2742    }
2743
2744    #[traced_test]
2745    #[test]
2746    #[cfg(feature = "conflux")]
2747    fn reject_conflux_linked_before_hs() {
2748        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2749            let (chan, mut _rx, _sink) = working_fake_channel(&rt);
2750            let (tunnel, mut sink) = newtunnel(&rt, chan).await;
2751
2752            let nonce = V1Nonce::new(&mut testing_rng());
2753            let payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
2754            // Send a LINKED cell
2755            let linked = relaymsg::ConfluxLinked::new(payload).into();
2756            sink.send(rmsg_to_ccmsg(None, linked, false)).await.unwrap();
2757
2758            rt.advance_until_stalled().await;
2759            assert!(tunnel.is_closed());
2760        });
2761    }
2762
2763    #[traced_test]
2764    #[test]
2765    #[cfg(feature = "conflux")]
2766    fn conflux_hs_timeout() {
2767        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2768            let TestTunnelCtx {
2769                tunnel: _tunnel,
2770                circs,
2771                conflux_link_rx,
2772            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
2773
2774            let [mut circ1, _circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
2775
2776            // Wait for the LINK cell
2777            let link = await_link_payload(&mut circ1.chan_rx).await;
2778
2779            // Send a LINK cell on the first leg...
2780            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
2781            circ1
2782                .circ_tx
2783                .send(rmsg_to_ccmsg(None, linked, false))
2784                .await
2785                .unwrap();
2786
2787            // Do nothing, and wait for the handshake to timeout on the second leg
2788            rt.advance_by(Duration::from_secs(60)).await;
2789
2790            let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
2791
2792            // Get the handshake results of each circuit
2793            let [res1, res2]: [StdResult<(), ConfluxHandshakeError>; 2] =
2794                conflux_hs_res.try_into().unwrap();
2795
2796            assert!(res1.is_ok());
2797
2798            let err = res2.unwrap_err();
2799            assert_matches!(err, ConfluxHandshakeError::Timeout);
2800        });
2801    }
2802
2803    #[traced_test]
2804    #[test]
2805    #[cfg(feature = "conflux")]
2806    fn conflux_bad_hs() {
2807        use crate::util::err::ConfluxHandshakeError;
2808
2809        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2810            let nonce = V1Nonce::new(&mut testing_rng());
2811            let bad_link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
2812            //let extended2 = relaymsg::Extended2::new(vec![]).into();
2813            let bad_hs_responses = [
2814                (
2815                    rmsg_to_ccmsg(
2816                        None,
2817                        relaymsg::ConfluxLinked::new(bad_link_payload.clone()).into(),
2818                        false,
2819                    ),
2820                    "Received CONFLUX_LINKED cell with mismatched nonce",
2821                ),
2822                (
2823                    rmsg_to_ccmsg(
2824                        None,
2825                        relaymsg::ConfluxLink::new(bad_link_payload).into(),
2826                        false,
2827                    ),
2828                    "Unexpected CONFLUX_LINK cell from hop #3 on client circuit",
2829                ),
2830                (
2831                    rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into(), false),
2832                    "Received CONFLUX_SWITCH on unlinked circuit?!",
2833                ),
2834                // TODO: this currently causes the reactor to shut down immediately,
2835                // without sending a response on the handshake channel
2836                /*
2837                (
2838                    rmsg_to_ccmsg(None, extended2, false),
2839                    "Received CONFLUX_LINKED cell with mismatched nonce",
2840                ),
2841                */
2842            ];
2843
2844            for (bad_cell, expected_err) in bad_hs_responses {
2845                let TestTunnelCtx {
2846                    tunnel,
2847                    circs,
2848                    conflux_link_rx,
2849                } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
2850
2851                let [mut _circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
2852
2853                // Respond with a bogus cell on one of the legs
2854                circ2.circ_tx.send(bad_cell).await.unwrap();
2855
2856                let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
2857                // Get the handshake results (the handshake results are reported early,
2858                // without waiting for the second circuit leg's handshake to timeout,
2859                // because this is a protocol violation causing the entire tunnel to shut down)
2860                let [res2]: [StdResult<(), ConfluxHandshakeError>; 1] =
2861                    conflux_hs_res.try_into().unwrap();
2862
2863                match res2.unwrap_err() {
2864                    ConfluxHandshakeError::Link(Error::CircProto(e)) => {
2865                        assert_eq!(e, expected_err);
2866                    }
2867                    e => panic!("unexpected error: {e:?}"),
2868                }
2869
2870                assert!(tunnel.is_closed());
2871            }
2872        });
2873    }
2874
2875    #[traced_test]
2876    #[test]
2877    #[cfg(feature = "conflux")]
2878    fn unexpected_conflux_cell() {
2879        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2880            let nonce = V1Nonce::new(&mut testing_rng());
2881            let link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
2882            let bad_cells = [
2883                rmsg_to_ccmsg(
2884                    None,
2885                    relaymsg::ConfluxLinked::new(link_payload.clone()).into(),
2886                    false,
2887                ),
2888                rmsg_to_ccmsg(
2889                    None,
2890                    relaymsg::ConfluxLink::new(link_payload.clone()).into(),
2891                    false,
2892                ),
2893                rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into(), false),
2894            ];
2895
2896            for bad_cell in bad_cells {
2897                let (chan, mut _rx, _sink) = working_fake_channel(&rt);
2898                let (tunnel, mut sink) = newtunnel(&rt, chan).await;
2899
2900                sink.send(bad_cell).await.unwrap();
2901                rt.advance_until_stalled().await;
2902
2903                // Note: unfortunately we can't assert the circuit is
2904                // closing for the reason, because the reactor just logs
2905                // the error and then exits.
2906                assert!(tunnel.is_closed());
2907            }
2908        });
2909    }
2910
2911    #[traced_test]
2912    #[test]
2913    #[cfg(feature = "conflux")]
2914    fn conflux_bad_linked() {
2915        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2916            let TestTunnelCtx {
2917                tunnel,
2918                circs,
2919                conflux_link_rx: _,
2920            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
2921
2922            let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
2923
2924            let link = await_link_payload(&mut circ1.chan_rx).await;
2925
2926            // Send a LINK cell on the first leg...
2927            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
2928            circ1
2929                .circ_tx
2930                .send(rmsg_to_ccmsg(None, linked, false))
2931                .await
2932                .unwrap();
2933
2934            // ...and two LINKED cells on the second
2935            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
2936            circ2
2937                .circ_tx
2938                .send(rmsg_to_ccmsg(None, linked, false))
2939                .await
2940                .unwrap();
2941            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
2942            circ2
2943                .circ_tx
2944                .send(rmsg_to_ccmsg(None, linked, false))
2945                .await
2946                .unwrap();
2947
2948            rt.advance_until_stalled().await;
2949
2950            // Receiving a LINKED cell on an already linked leg causes
2951            // the tunnel to be torn down
2952            assert!(tunnel.is_closed());
2953        });
2954    }
2955
2956    #[traced_test]
2957    #[test]
2958    #[cfg(feature = "conflux")]
2959    fn conflux_bad_switch() {
2960        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2961            let cc_vegas_params = build_cc_vegas_params();
2962            let cwnd_init = cc_vegas_params.cwnd_params().cwnd_init();
2963            let bad_switch = [
2964                // SWITCH cells with seqno = 0 are not allowed
2965                relaymsg::ConfluxSwitch::new(0),
2966                // SWITCH cells with seqno > cc_init_cwnd are not allowed
2967                // on tunnels that have not received any data
2968                relaymsg::ConfluxSwitch::new(cwnd_init + 1),
2969            ];
2970
2971            for bad_cell in bad_switch {
2972                let TestTunnelCtx {
2973                    tunnel,
2974                    circs,
2975                    conflux_link_rx,
2976                } = setup_good_conflux_tunnel(&rt, cc_vegas_params.clone()).await;
2977
2978                let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
2979
2980                let link = await_link_payload(&mut circ1.chan_rx).await;
2981
2982                // Send a LINKED cell on both legs
2983                for circ in [&mut circ1, &mut circ2] {
2984                    let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
2985                    circ.circ_tx
2986                        .send(rmsg_to_ccmsg(None, linked, false))
2987                        .await
2988                        .unwrap();
2989                }
2990
2991                let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
2992                assert!(conflux_hs_res.iter().all(|res| res.is_ok()));
2993
2994                // Now send a bad SWITCH cell on the first leg.
2995                // This will cause the tunnel reactor to shut down.
2996                let msg = rmsg_to_ccmsg(None, bad_cell.clone().into(), false);
2997                circ1.circ_tx.send(msg).await.unwrap();
2998
2999                // The tunnel should be shutting down
3000                rt.advance_until_stalled().await;
3001                assert!(tunnel.is_closed());
3002            }
3003        });
3004    }
3005
3006    #[traced_test]
3007    #[test]
3008    #[cfg(feature = "conflux")]
3009    fn conflux_consecutive_switch() {
3010        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3011            let TestTunnelCtx {
3012                tunnel,
3013                circs,
3014                conflux_link_rx,
3015            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
3016
3017            let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3018
3019            let link = await_link_payload(&mut circ1.chan_rx).await;
3020
3021            // Send a LINKED cell on both legs
3022            for circ in [&mut circ1, &mut circ2] {
3023                let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3024                circ.circ_tx
3025                    .send(rmsg_to_ccmsg(None, linked, false))
3026                    .await
3027                    .unwrap();
3028            }
3029
3030            let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
3031            assert!(conflux_hs_res.iter().all(|res| res.is_ok()));
3032
3033            // Send a valid SWITCH cell on the first leg.
3034            let switch1 = relaymsg::ConfluxSwitch::new(10);
3035            let msg = rmsg_to_ccmsg(None, switch1.into(), false);
3036            circ1.circ_tx.send(msg).await.unwrap();
3037
3038            // The tunnel should not be shutting down
3039            rt.advance_until_stalled().await;
3040            assert!(!tunnel.is_closed());
3041
3042            // Send another valid SWITCH cell on the same leg.
3043            let switch2 = relaymsg::ConfluxSwitch::new(12);
3044            let msg = rmsg_to_ccmsg(None, switch2.into(), false);
3045            circ1.circ_tx.send(msg).await.unwrap();
3046
3047            // The tunnel should now be shutting down
3048            // (consecutive switches are not allowed)
3049            rt.advance_until_stalled().await;
3050            assert!(tunnel.is_closed());
3051        });
3052    }
3053
3054    // This test ensures CtrlMsg::ShutdownAndReturnCircuit returns an
3055    // error when called on a multi-path tunnel
3056    #[traced_test]
3057    #[test]
3058    #[cfg(feature = "conflux")]
3059    fn shutdown_and_return_circ_multipath() {
3060        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3061            let TestTunnelCtx {
3062                tunnel,
3063                circs,
3064                conflux_link_rx: _,
3065            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
3066
3067            rt.progress_until_stalled().await;
3068
3069            let (answer_tx, answer_rx) = oneshot::channel();
3070            tunnel
3071                .circ
3072                .command
3073                .unbounded_send(CtrlCmd::ShutdownAndReturnCircuit { answer: answer_tx })
3074                .unwrap();
3075
3076            // map explicitly returns () for clarity
3077            #[allow(clippy::unused_unit, clippy::semicolon_if_nothing_returned)]
3078            let err = answer_rx
3079                .await
3080                .unwrap()
3081                .map(|_| {
3082                    // Map to () so we can call unwrap
3083                    // (Circuit doesn't impl debug)
3084                    ()
3085                })
3086                .unwrap_err();
3087
3088            const MSG: &str = "not a single leg conflux set (got at least 2 elements when exactly one was expected)";
3089            assert!(err.to_string().contains(MSG), "{err}");
3090
3091            // The tunnel reactor should be shutting down,
3092            // regardless of the error
3093            rt.progress_until_stalled().await;
3094            assert!(tunnel.is_closed());
3095
3096            // Keep circs alive, to prevent the reactor
3097            // from shutting down prematurely
3098            drop(circs);
3099        });
3100    }
3101
    /// One of the endpoints taking part in a conflux test.
    ///
    /// The type parameter `I` is the iterator of RTT delays
    /// used by the mock exit (see [`ConfluxExitState`]).
    #[cfg(feature = "conflux")]
    #[derive(Debug)]
    enum ConfluxTestEndpoint<I: Iterator<Item = Option<Duration>>> {
        /// Pretend to be an exit relay.
        Relay(ConfluxExitState<I>),
        /// Client task.
        Client {
            /// Channel for receiving the outcome of the conflux handshakes.
            conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
            /// The tunnel reactor handle
            tunnel: Arc<ClientTunnel>,
            /// Data to send on a stream.
            send_data: Vec<u8>,
            /// Data we expect to receive on a stream.
            recv_data: Vec<u8>,
        },
    }
3120
3121    /// Structure for returning the sinks, channels, etc. that must stay
3122    /// alive until the test is complete.
3123    #[allow(unused, clippy::large_enum_variant)]
3124    #[derive(Debug)]
3125    #[cfg(feature = "conflux")]
3126    enum ConfluxEndpointResult {
3127        Circuit {
3128            tunnel: Arc<ClientTunnel>,
3129            stream: DataStream,
3130        },
3131        Relay {
3132            circ: TestCircuitCtx,
3133        },
3134    }
3135
3136    /// Stream data, shared by all the mock exit endpoints.
3137    #[derive(Debug)]
3138    #[cfg(feature = "conflux")]
3139    struct ConfluxStreamState {
3140        /// The data received so far on this stream (at the exit).
3141        data_recvd: Vec<u8>,
3142        /// The total amount of data we expect to receive on this stream.
3143        expected_data_len: usize,
3144        /// Whether we have seen a BEGIN cell yet.
3145        begin_recvd: bool,
3146        /// Whether we have seen an END cell yet.
3147        end_recvd: bool,
3148        /// Whether we have sent an END cell yet.
3149        end_sent: bool,
3150    }
3151
3152    #[cfg(feature = "conflux")]
3153    impl ConfluxStreamState {
3154        fn new(expected_data_len: usize) -> Self {
3155            Self {
3156                data_recvd: vec![],
3157                expected_data_len,
3158                begin_recvd: false,
3159                end_recvd: false,
3160                end_sent: false,
3161            }
3162        }
3163    }
3164
3165    /// An object describing a SWITCH cell that we expect to receive
3166    /// in the mock exit
3167    #[derive(Debug)]
3168    #[cfg(feature = "conflux")]
3169    struct ExpectedSwitch {
3170        /// The number of cells we've seen on this leg so far,
3171        /// up to and including the SWITCH.
3172        cells_so_far: usize,
3173        /// The expected seqno in SWITCH cell,
3174        seqno: u32,
3175    }
3176
    /// Object dispatching cells for delivery on the appropriate
    /// leg in a multipath tunnel.
    ///
    /// Used to send out-of-order cells from the mock exit
    /// to the client under test.
    #[cfg(feature = "conflux")]
    struct CellDispatcher {
        /// The channels on which to send the [`CellToSend`] commands,
        /// keyed by the unique ID of the leg they belong to.
        leg_tx: HashMap<UniqId, mpsc::Sender<CellToSend>>,
        /// The list of cells to send, each paired with the unique ID
        /// of the leg it should be sent on.
        cells_to_send: Vec<(UniqId, AnyRelayMsg)>,
    }
3189
3190    #[cfg(feature = "conflux")]
3191    impl CellDispatcher {
3192        async fn run(mut self) {
3193            while !self.cells_to_send.is_empty() {
3194                let (circ_id, cell) = self.cells_to_send.remove(0);
3195                let cell_tx = self.leg_tx.get_mut(&circ_id).unwrap();
3196                let (done_tx, done_rx) = oneshot::channel();
3197                cell_tx.send(CellToSend { done_tx, cell }).await.unwrap();
3198                // Wait for the cell to be sent before sending the next one.
3199                let () = done_rx.await.unwrap();
3200            }
3201        }
3202    }
3203
    /// A cell for the mock exit to send on one of its legs.
    ///
    /// These are created by [`CellDispatcher::run`].
    #[cfg(feature = "conflux")]
    #[derive(Debug)]
    struct CellToSend {
        /// Channel for notifying the control task that the cell was sent.
        done_tx: oneshot::Sender<()>,
        /// The cell to send.
        cell: AnyRelayMsg,
    }
3213
3214    /// The state of a mock exit.
3215    #[derive(Debug)]
3216    #[cfg(feature = "conflux")]
3217    struct ConfluxExitState<I: Iterator<Item = Option<Duration>>> {
3218        /// The runtime, shared by the test client and mock exit tasks.
3219        ///
3220        /// The mutex prevents the client and mock exit tasks from calling
3221        /// functions like [`MockRuntime::advance_until_stalled`]
3222        /// or [`MockRuntime::progress_until_stalled]` concurrently,
3223        /// as this is not supported by the mock runtime.
3224        runtime: Arc<AsyncMutex<MockRuntime>>,
3225        /// The client view of the tunnel.
3226        tunnel: Arc<ClientTunnel>,
3227        /// The circuit test context.
3228        circ: TestCircuitCtx,
3229        /// The RTT delay to introduce just before each SENDME.
3230        ///
3231        /// Used to trigger the client to send a SWITCH.
3232        rtt_delays: I,
3233        /// State of the (only) expected stream on this tunnel,
3234        /// shared by all the mock exit endpoints.
3235        stream_state: Arc<Mutex<ConfluxStreamState>>,
3236        /// The number of cells after which to expect a SWITCH
3237        /// cell from the client.
3238        expect_switch: Vec<ExpectedSwitch>,
3239        /// Channel for receiving notifications from the other leg.
3240        event_rx: mpsc::Receiver<MockExitEvent>,
3241        /// Channel for sending notifications to the other leg.
3242        event_tx: mpsc::Sender<MockExitEvent>,
3243        /// Whether this circuit leg should act as the primary (sending) leg.
3244        is_sending_leg: bool,
3245        /// A channel for receiving cells to send on this stream.
3246        cells_rx: mpsc::Receiver<CellToSend>,
3247    }
3248
3249    #[cfg(feature = "conflux")]
3250    async fn good_exit_handshake(
3251        runtime: &Arc<AsyncMutex<MockRuntime>>,
3252        init_rtt_delay: Option<Duration>,
3253        rx: &mut Receiver<ChanCell<AnyChanMsg>>,
3254        sink: &mut CircuitRxSender,
3255    ) {
3256        // Wait for the LINK cell
3257        let link = await_link_payload(rx).await;
3258
3259        // Introduce an artificial delay, to make one circ have a better initial RTT
3260        // than the other
3261        if let Some(init_rtt_delay) = init_rtt_delay {
3262            runtime.lock().await.advance_by(init_rtt_delay).await;
3263        }
3264
3265        // Reply with a LINKED cell...
3266        let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3267        sink.send(rmsg_to_ccmsg(None, linked, false)).await.unwrap();
3268
3269        // Wait for the client to respond with LINKED_ACK...
3270        let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
3271        let rmsg = match chmsg {
3272            AnyChanMsg::Relay(r) => {
3273                AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
3274                    .unwrap()
3275            }
3276            other => panic!("{other:?}"),
3277        };
3278        let (_streamid, rmsg) = rmsg.into_streamid_and_msg();
3279
3280        assert_matches!(rmsg, AnyRelayMsg::ConfluxLinkedAck(_));
3281    }
3282
3283    /// An event sent by one mock conflux leg to another.
3284    #[derive(Copy, Clone, Debug)]
3285    enum MockExitEvent {
3286        /// Inform the other leg we are done.
3287        Done,
3288        /// Inform the other leg a stream was opened.
3289        BeginRecvd(StreamId),
3290    }
3291
3292    #[cfg(feature = "conflux")]
3293    async fn run_mock_conflux_exit<I: Iterator<Item = Option<Duration>>>(
3294        state: ConfluxExitState<I>,
3295    ) -> ConfluxEndpointResult {
3296        let ConfluxExitState {
3297            runtime,
3298            tunnel,
3299            mut circ,
3300            rtt_delays,
3301            stream_state,
3302            mut expect_switch,
3303            mut event_tx,
3304            mut event_rx,
3305            is_sending_leg,
3306            mut cells_rx,
3307        } = state;
3308
3309        let mut rtt_delays = rtt_delays.into_iter();
3310
3311        // Expect the client to open a stream, and de-multiplex the received stream data
3312        let stream_len = stream_state.lock().unwrap().expected_data_len;
3313        let mut data_cells_received = 0_usize;
3314        let mut cell_count = 0_usize;
3315        let mut tags = vec![];
3316        let mut streamid = None;
3317        let mut done_writing = false;
3318
3319        loop {
3320            let should_exit = {
3321                let stream_state = stream_state.lock().unwrap();
3322                let done_reading = stream_state.data_recvd.len() >= stream_len;
3323
3324                (stream_state.begin_recvd || stream_state.end_recvd) && done_reading && done_writing
3325            };
3326
3327            if should_exit {
3328                break;
3329            }
3330
3331            use futures::select;
3332
3333            // Only start reading from the dispatcher channel after the stream is open
3334            // and we're ready to start sending cells.
3335            let mut next_cell = if streamid.is_some() && !done_writing {
3336                Box::pin(cells_rx.next().fuse())
3337                    as Pin<Box<dyn FusedFuture<Output = Option<CellToSend>> + Send>>
3338            } else {
3339                Box::pin(std::future::pending().fuse())
3340            };
3341
3342            // Wait for the BEGIN cell to arrive, or for the transfer to complete
3343            // (we need to bail if the other leg already completed);
3344            let res = select! {
3345                res = circ.chan_rx.next() => {
3346                    res.unwrap()
3347                },
3348                res = event_rx.next() => {
3349                    let Some(event) = res else {
3350                        break;
3351                    };
3352
3353                    match event {
3354                        MockExitEvent::Done => {
3355                            break;
3356                        },
3357                        MockExitEvent::BeginRecvd(id) => {
3358                            // The stream is now open (the other leg received the BEGIN),
3359                            // so we're reading to start reading cells from the cell dispatcher.
3360                            streamid = Some(id);
3361                            continue;
3362                        },
3363                    }
3364                }
3365                res = next_cell => {
3366                    if let Some(cell_to_send) = res {
3367                        let CellToSend { cell, done_tx } = cell_to_send;
3368
3369                        // SWITCH cells don't have a stream ID
3370                        let streamid = if matches!(cell, AnyRelayMsg::ConfluxSwitch(_)) {
3371                            None
3372                        } else {
3373                            streamid
3374                        };
3375
3376                        circ.circ_tx
3377                            .send(rmsg_to_ccmsg(streamid, cell, false))
3378                            .await
3379                            .unwrap();
3380
3381                        runtime.lock().await.advance_until_stalled().await;
3382                        done_tx.send(()).unwrap();
3383                    } else {
3384                        done_writing = true;
3385                    }
3386
3387                    continue;
3388                }
3389            };
3390
3391            let (_id, chmsg) = res.into_circid_and_msg();
3392            cell_count += 1;
3393            let rmsg = match chmsg {
3394                AnyChanMsg::Relay(r) => {
3395                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
3396                        .unwrap()
3397                }
3398                other => panic!("{:?}", other),
3399            };
3400            let (new_streamid, rmsg) = rmsg.into_streamid_and_msg();
3401            if streamid.is_none() {
3402                streamid = new_streamid;
3403            }
3404
3405            let begin_recvd = stream_state.lock().unwrap().begin_recvd;
3406            let end_recvd = stream_state.lock().unwrap().end_recvd;
3407            match rmsg {
3408                AnyRelayMsg::Begin(_) if begin_recvd => {
3409                    panic!("client tried to open two streams?!");
3410                }
3411                AnyRelayMsg::Begin(_) if !begin_recvd => {
3412                    stream_state.lock().unwrap().begin_recvd = true;
3413                    // Reply with a connected cell...
3414                    let connected = relaymsg::Connected::new_empty().into();
3415                    circ.circ_tx
3416                        .send(rmsg_to_ccmsg(streamid, connected, false))
3417                        .await
3418                        .unwrap();
3419                    // Tell the other leg we received a BEGIN cell
3420                    event_tx
3421                        .send(MockExitEvent::BeginRecvd(streamid.unwrap()))
3422                        .await
3423                        .unwrap();
3424                }
3425                AnyRelayMsg::End(_) if !end_recvd => {
3426                    stream_state.lock().unwrap().end_recvd = true;
3427                    break;
3428                }
3429                AnyRelayMsg::End(_) if end_recvd => {
3430                    panic!("received two END cells for the same stream?!");
3431                }
3432                AnyRelayMsg::ConfluxSwitch(cell) => {
3433                    // Ensure we got the SWITCH after the expected number of cells
3434                    let expected = expect_switch.remove(0);
3435
3436                    assert_eq!(expected.cells_so_far, cell_count);
3437                    assert_eq!(expected.seqno, cell.seqno());
3438
3439                    // To keep the tests simple, we don't handle out of order cells,
3440                    // and simply sort the received data at the end.
3441                    // This ensures all the data was actually received,
3442                    // but it doesn't actually test that the SWITCH cells
3443                    // contain the appropriate seqnos.
3444                    continue;
3445                }
3446                AnyRelayMsg::Data(dat) => {
3447                    data_cells_received += 1;
3448                    stream_state
3449                        .lock()
3450                        .unwrap()
3451                        .data_recvd
3452                        .extend_from_slice(dat.as_ref());
3453
3454                    let is_next_cell_sendme = data_cells_received.is_multiple_of(31);
3455                    if is_next_cell_sendme {
3456                        if tags.is_empty() {
3457                            // Important: we need to make sure all the SENDMEs
3458                            // we sent so far have been processed by the reactor
3459                            // (otherwise the next QuerySendWindow call
3460                            // might return an outdated list of tags!)
3461                            runtime.lock().await.advance_until_stalled().await;
3462                            let (tx, rx) = oneshot::channel();
3463                            tunnel
3464                                .circ
3465                                .command
3466                                .unbounded_send(CtrlCmd::QuerySendWindow {
3467                                    hop: 2.into(),
3468                                    leg: circ.unique_id,
3469                                    done: tx,
3470                                })
3471                                .unwrap();
3472
3473                            // Get a fresh batch of tags.
3474                            let (_window, new_tags) = rx.await.unwrap().unwrap();
3475                            tags = new_tags;
3476                        }
3477
3478                        let tag = tags.remove(0);
3479
3480                        // Introduce an artificial delay, to make one circ have worse RTT
3481                        // than the other, and thus trigger a SWITCH
3482                        if let Some(rtt_delay) = rtt_delays.next().flatten() {
3483                            runtime.lock().await.advance_by(rtt_delay).await;
3484                        }
3485                        // Make and send a circuit-level SENDME
3486                        let sendme = relaymsg::Sendme::from(tag).into();
3487
3488                        circ.circ_tx
3489                            .send(rmsg_to_ccmsg(None, sendme, false))
3490                            .await
3491                            .unwrap();
3492                    }
3493                }
3494                _ => panic!("unexpected message {rmsg:?} on leg {}", circ.unique_id),
3495            }
3496        }
3497
3498        let end_recvd = stream_state.lock().unwrap().end_recvd;
3499
3500        // Close the stream if the other endpoint hasn't already done so
3501        if is_sending_leg && !end_recvd {
3502            let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
3503            circ.circ_tx
3504                .send(rmsg_to_ccmsg(streamid, end, false))
3505                .await
3506                .unwrap();
3507            stream_state.lock().unwrap().end_sent = true;
3508        }
3509
3510        // This is allowed to fail, because the other leg might have exited first.
3511        let _ = event_tx.send(MockExitEvent::Done).await;
3512
3513        // Ensure we received all the switch cells we were expecting
3514        assert!(
3515            expect_switch.is_empty(),
3516            "expect_switch = {expect_switch:?}"
3517        );
3518
3519        ConfluxEndpointResult::Relay { circ }
3520    }
3521
3522    #[cfg(feature = "conflux")]
3523    async fn run_conflux_client(
3524        tunnel: Arc<ClientTunnel>,
3525        conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
3526        send_data: Vec<u8>,
3527        recv_data: Vec<u8>,
3528    ) -> ConfluxEndpointResult {
3529        let res = conflux_link_rx.await;
3530
3531        let res = res.unwrap().unwrap();
3532        assert_eq!(res.len(), 2);
3533
3534        // All circuit legs have completed the conflux handshake,
3535        // so we now have a multipath tunnel
3536
3537        // Now we're ready to open a stream
3538        let mut stream = tunnel
3539            .begin_stream("www.example.com", 443, None)
3540            .await
3541            .unwrap();
3542
3543        stream.write_all(&send_data).await.unwrap();
3544        stream.flush().await.unwrap();
3545
3546        let mut recv: Vec<u8> = Vec::new();
3547        let recv_len = stream.read_to_end(&mut recv).await.unwrap();
3548        assert_eq!(recv_len, recv_data.len());
3549        assert_eq!(recv_data, recv);
3550
3551        ConfluxEndpointResult::Circuit { tunnel, stream }
3552    }
3553
3554    #[cfg(feature = "conflux")]
3555    async fn run_conflux_endpoint<I: Iterator<Item = Option<Duration>>>(
3556        endpoint: ConfluxTestEndpoint<I>,
3557    ) -> ConfluxEndpointResult {
3558        match endpoint {
3559            ConfluxTestEndpoint::Relay(state) => run_mock_conflux_exit(state).await,
3560            ConfluxTestEndpoint::Client {
3561                tunnel,
3562                conflux_link_rx,
3563                send_data,
3564                recv_data,
3565            } => run_conflux_client(tunnel, conflux_link_rx, send_data, recv_data).await,
3566        }
3567    }
3568
3569    // In this test, a `ConfluxTestEndpoint::Client` task creates a multipath tunnel
3570    // with 2 legs, opens a stream and sends 300 DATA cells on it.
3571    //
3572    // The test spawns two `ConfluxTestEndpoint::Relay` tasks (one for each leg),
3573    // which mock the behavior of an exit. The two relay tasks introduce
3574    // artificial delays before each SENDME sent to the client,
3575    // in order to trigger it to switch its sending leg predictably.
3576    //
3577    // The mock exit does not send any data on the stream.
3578    //
3579    // This test checks that the client sends SWITCH cells at the right time,
3580    // and that all the data it sent over the stream arrived at the exit.
3581    //
3582    // Note, however, that it doesn't check that the client sends the data in
3583    // the right order. For simplicity, the test concatenates the data received
3584    // on both legs, sorts it, and then compares it against the of the data sent
3585    // by the client (TODO: improve this)
3586    #[traced_test]
3587    #[test]
3588    #[cfg(feature = "conflux")]
3589    fn multipath_client_to_exit() {
3590        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3591            /// The number of data cells to send.
3592            const NUM_CELLS: usize = 300;
3593            /// 498 bytes per DATA cell.
3594            const CELL_SIZE: usize = 498;
3595
3596            let TestTunnelCtx {
3597                tunnel,
3598                circs,
3599                conflux_link_rx,
3600            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
3601            let [circ1, circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3602
3603            // The stream data we're going to send over the conflux tunnel
3604            let mut send_data = (0..255_u8)
3605                .cycle()
3606                .take(NUM_CELLS * CELL_SIZE)
3607                .collect::<Vec<_>>();
3608            let stream_state = Arc::new(Mutex::new(ConfluxStreamState::new(send_data.len())));
3609
3610            let mut tasks = vec![];
3611
3612            // Channels used by the mock relays to notify each other
3613            // of various events.
3614            let (tx1, rx1) = mpsc::channel(1);
3615            let (tx2, rx2) = mpsc::channel(1);
3616
3617            // The 9 RTT delays to insert before each of the 9 SENDMEs
3618            // the exit will end up sending.
3619            //
3620            // Note: the first delay is the init_rtt delay (measured during the conflux HS).
3621            let circ1_rtt_delays = [
3622                // Initially, circ1 has better RTT, so we will start on this leg.
3623                Some(Duration::from_millis(100)),
3624                // But then its RTT takes a turn for the worse,
3625                // triggering a switch after the first SENDME is processed
3626                // (this happens after sending 123 DATA cells).
3627                Some(Duration::from_millis(500)),
3628                Some(Duration::from_millis(700)),
3629                Some(Duration::from_millis(900)),
3630                Some(Duration::from_millis(1100)),
3631                Some(Duration::from_millis(1300)),
3632                Some(Duration::from_millis(1500)),
3633                Some(Duration::from_millis(1700)),
3634                Some(Duration::from_millis(1900)),
3635                Some(Duration::from_millis(2100)),
3636            ]
3637            .into_iter();
3638
3639            let circ2_rtt_delays = [
3640                Some(Duration::from_millis(200)),
3641                Some(Duration::from_millis(400)),
3642                Some(Duration::from_millis(600)),
3643                Some(Duration::from_millis(800)),
3644                Some(Duration::from_millis(1000)),
3645                Some(Duration::from_millis(1200)),
3646                Some(Duration::from_millis(1400)),
3647                Some(Duration::from_millis(1600)),
3648                Some(Duration::from_millis(1800)),
3649                Some(Duration::from_millis(2000)),
3650            ]
3651            .into_iter();
3652
3653            let expected_switches1 = vec![ExpectedSwitch {
3654                // We start on this leg, and receive a BEGIN cell,
3655                // followed by (4 * 31 - 1) = 123 DATA cells.
3656                // Then it becomes blocked on CC, then finally the reactor
3657                // realizes it has some SENDMEs to process, and
3658                // then as a result of the new RTT measurement, we switch to circ1,
3659                // and then finally we switch back here, and get another SWITCH
3660                // as the 126th cell.
3661                cells_so_far: 126,
3662                // Leg 2 switches back to this leg after the 249th cell
3663                // (just before sending the 250th one):
3664                // seqno = 125 carried over from leg 1 (see the seqno of the
3665                // SWITCH expected on leg 2 below), plus 1 SWITCH, plus
3666                // 4 * 31 = 124 DATA cells after which the RTT of the first leg
3667                // is deemed favorable again.
3668                //
3669                // 249 - 125 (last_seq_sent of leg 1) = 124
3670                seqno: 124,
3671            }];
3672
3673            let expected_switches2 = vec![ExpectedSwitch {
3674                // The SWITCH is the first cell we received after the conflux HS
3675                // on this leg.
3676                cells_so_far: 1,
3677                // See explanation on the ExpectedSwitch from circ1 above.
3678                seqno: 125,
3679            }];
3680
3681            let relay_runtime = Arc::new(AsyncMutex::new(rt.clone()));
3682
3683            // Drop the senders and close the channels,
3684            // we have nothing to send in this test.
3685            let (_, cells_rx1) = mpsc::channel(1);
3686            let (_, cells_rx2) = mpsc::channel(1);
3687
3688            let relay1 = ConfluxExitState {
3689                runtime: Arc::clone(&relay_runtime),
3690                tunnel: Arc::clone(&tunnel),
3691                circ: circ1,
3692                rtt_delays: circ1_rtt_delays,
3693                stream_state: Arc::clone(&stream_state),
3694                expect_switch: expected_switches1,
3695                event_tx: tx1,
3696                event_rx: rx2,
3697                is_sending_leg: true,
3698                cells_rx: cells_rx1,
3699            };
3700
3701            let relay2 = ConfluxExitState {
3702                runtime: Arc::clone(&relay_runtime),
3703                tunnel: Arc::clone(&tunnel),
3704                circ: circ2,
3705                rtt_delays: circ2_rtt_delays,
3706                stream_state: Arc::clone(&stream_state),
3707                expect_switch: expected_switches2,
3708                event_tx: tx2,
3709                event_rx: rx1,
3710                is_sending_leg: false,
3711                cells_rx: cells_rx2,
3712            };
3713
3714            for mut mock_relay in [relay1, relay2] {
3715                let leg = mock_relay.circ.unique_id;
3716
3717                // Do the conflux handshake
3718                //
3719                // We do this outside of run_conflux_endpoint,
3720                // toa void running both handshakes at concurrently
3721                // (this gives more predictable RTT delays:
3722                // if both handshake tasks run at once, they race
3723                // to advance the mock runtime's clock)
3724                good_exit_handshake(
3725                    &relay_runtime,
3726                    mock_relay.rtt_delays.next().flatten(),
3727                    &mut mock_relay.circ.chan_rx,
3728                    &mut mock_relay.circ.circ_tx,
3729                )
3730                .await;
3731
3732                let relay = ConfluxTestEndpoint::Relay(mock_relay);
3733
3734                tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(relay)));
3735            }
3736
3737            tasks.push(rt.spawn_join(
3738                "client task".to_string(),
3739                run_conflux_endpoint(ConfluxTestEndpoint::Client {
3740                    tunnel,
3741                    conflux_link_rx,
3742                    send_data: send_data.clone(),
3743                    recv_data: vec![],
3744                }),
3745            ));
3746            let _sinks = futures::future::join_all(tasks).await;
3747            let mut stream_state = stream_state.lock().unwrap();
3748            assert!(stream_state.begin_recvd);
3749
3750            stream_state.data_recvd.sort();
3751            send_data.sort();
3752            assert_eq!(stream_state.data_recvd, send_data);
3753        });
3754    }
3755
// Run one exit-to-client conflux scenario on a tunnel with 2 legs.
//
// A `ConfluxTestEndpoint::Client` task uses the multipath tunnel,
// opens a stream and reads from the stream until the stream is closed.
//
// Two `ConfluxTestEndpoint::Relay` tasks (one for each leg) mock the behavior
// of an exit. They send DATA and SWITCH cells on the two circuit "legs"
// such that some cells arrive out of order. This forces the client to buffer
// some cells, and then reorder them when the missing cells finally arrive.
//
// The client does not send any data on the stream.
//
// Arguments:
//   * `rt`: the mock runtime on which the dispatcher, relay and client tasks are spawned
//   * `tunnel`: the tunnel under test; it must have exactly 2 circuits
//     (otherwise the conversion to a 2-element array panics)
//   * `cells_to_send`: the script handed to the `CellDispatcher`,
//     as (leg id, message) pairs
//   * `send_data`: handed to the client endpoint; its length is also used
//     to initialize the shared `ConfluxStreamState`
//   * `recv_data`: handed to the client endpoint
//     (the caller passes the bytes it expects the client to read)
//
// Returns the stream state shared with the mock relays,
// once all the relay and client tasks have completed.
#[cfg(feature = "conflux")]
async fn run_multipath_exit_to_client_test(
    rt: MockRuntime,
    tunnel: TestTunnelCtx,
    cells_to_send: Vec<(UniqId, AnyRelayMsg)>,
    send_data: Vec<u8>,
    recv_data: Vec<u8>,
) -> Arc<Mutex<ConfluxStreamState>> {
    let TestTunnelCtx {
        tunnel,
        circs,
        conflux_link_rx,
    } = tunnel;
    let [circ1, circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

    let stream_state = Arc::new(Mutex::new(ConfluxStreamState::new(send_data.len())));

    let mut tasks = vec![];
    let relay_runtime = Arc::new(AsyncMutex::new(rt.clone()));
    let (cells_tx1, cells_rx1) = mpsc::channel(1);
    let (cells_tx2, cells_rx2) = mpsc::channel(1);

    // Maps each leg to the channel on which its relay task
    // is told what to write.
    let dispatcher = CellDispatcher {
        leg_tx: [(circ1.unique_id, cells_tx1), (circ2.unique_id, cells_tx2)]
            .into_iter()
            .collect(),
        cells_to_send,
    };

    // Channels used by the mock relays to notify each other
    // of various events.
    let (tx1, rx1) = mpsc::channel(1);
    let (tx2, rx2) = mpsc::channel(1);

    let relay1 = ConfluxExitState {
        runtime: Arc::clone(&relay_runtime),
        tunnel: Arc::clone(&tunnel),
        circ: circ1,
        rtt_delays: [].into_iter(),
        stream_state: Arc::clone(&stream_state),
        // Expect no SWITCH cells from the client
        expect_switch: vec![],
        // Cross-wired with relay2: each relay sends on its own channel
        // and receives on the other relay's.
        event_tx: tx1,
        event_rx: rx2,
        is_sending_leg: false,
        cells_rx: cells_rx1,
    };

    let relay2 = ConfluxExitState {
        runtime: Arc::clone(&relay_runtime),
        tunnel: Arc::clone(&tunnel),
        circ: circ2,
        rtt_delays: [].into_iter(),
        stream_state: Arc::clone(&stream_state),
        // Expect no SWITCH cells from the client
        expect_switch: vec![],
        event_tx: tx2,
        event_rx: rx1,
        is_sending_leg: true,
        cells_rx: cells_rx2,
    };

    // Run the cell dispatcher, which tells each exit leg task
    // what cells to write.
    //
    // This enables us to write out-of-order cells deterministically.
    rt.spawn(dispatcher.run()).unwrap();

    for mut mock_relay in [relay1, relay2] {
        let leg = mock_relay.circ.unique_id;

        // The handshake on this leg is completed before its relay task is spawned.
        good_exit_handshake(
            &relay_runtime,
            mock_relay.rtt_delays.next().flatten(),
            &mut mock_relay.circ.chan_rx,
            &mut mock_relay.circ.circ_tx,
        )
        .await;

        let relay = ConfluxTestEndpoint::Relay(mock_relay);

        tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(relay)));
    }

    tasks.push(rt.spawn_join(
        "client task".to_string(),
        run_conflux_endpoint(ConfluxTestEndpoint::Client {
            tunnel,
            conflux_link_rx,
            // `send_data` is not needed afterwards, so it is moved rather than cloned.
            send_data,
            recv_data,
        }),
    ));

    // Wait for all the tasks to complete
    let _sinks = futures::future::join_all(tasks).await;

    stream_state
}
3865
// Exit-to-client conflux scenarios on a tunnel with 2 legs.
//
// Each scenario is a script of DATA and SWITCH cells that the mock exit
// writes on the two legs, with some cells arriving out of order.
// For every scenario, a fresh tunnel is set up and the script is run through
// `run_multipath_exit_to_client_test`. Afterwards we check that `begin_recvd`
// is set in the shared stream state, and that the client didn't send any data.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn multipath_exit_to_client() {
    // The data we expect the client to read from the stream
    const TO_SEND: &[u8] =
        b"But something about Buster Friendly irritated John Isidore, one specific thing";

    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        // The indices of the tunnel legs.
        const CIRC1: usize = 0;
        const CIRC2: usize = 1;

        // The client receives the following cells, in the order indicated
        // by the t0-t8 "timestamps" (where C = CONNECTED, D = DATA, E = END,
        // S = SWITCH):
        //
        //  Leg 1 (CIRC1):   -----------D--------------------- D -- D -- C
        //                              |                      |    |    | \
        //                              |                      |    |    |  v
        //                              |                      |    |    | client
        //                              |                      |    |    |  ^
        //                              |                      |    |    |/
        //  Leg 2 (CIRC2): E - D -- D --\--- D* -- S (seqno=4)-/----/----/
        //                 |   |    |   |    |       |         |    |    |
        //                 |   |    |   |    |       |         |    |    |
        //                 |   |    |   |    |       |         |    |    |
        //  Time:          t8  t7   t6  t5   t4      t3        t2   t1  t0
        //
        //
        //  The cells marked with * are out of order.
        //
        // Note: t0 is the time when the client receives the first cell,
        // and t8 is the time when it receives the last one.
        // In other words, this test simulates a mock exit that "sent" the cells
        // in the order t0, t1, t2, t5, t4, t6, t7, t8
        let simple_switch = vec![
            (CIRC1, relaymsg::Data::new(&TO_SEND[0..5]).unwrap().into()),
            (CIRC1, relaymsg::Data::new(&TO_SEND[5..10]).unwrap().into()),
            // Switch to sending on the second leg
            (CIRC2, relaymsg::ConfluxSwitch::new(4).into()),
            // An out of order cell!
            (CIRC2, relaymsg::Data::new(&TO_SEND[20..30]).unwrap().into()),
            // The missing cell (as indicated by seqno = 4 from the switch cell above)
            // is finally arriving on leg1
            (CIRC1, relaymsg::Data::new(&TO_SEND[10..20]).unwrap().into()),
            (CIRC2, relaymsg::Data::new(&TO_SEND[30..40]).unwrap().into()),
            (CIRC2, relaymsg::Data::new(&TO_SEND[40..]).unwrap().into()),
        ];

        //  Leg 1 (CIRC1): ---------------- D  ------D* --- S(seqno = 3) -- D - D ---------------------------- C
        //                                  |        |          |           |   |                              | \
        //                                  |        |          |           |   |                              |  v
        //                                  |        |          |           |   |                              |  client
        //                                  |        |          |           |   |                              |  ^
        //                                  |        |          |           |   |                              | /
        //  Leg 2 (CIRC2): E - S(seqno = 2) \ -- D --\----------\---------- \ --\--- D* -- D* - S(seqno = 3) --/
        //                 |        |       |    |   |          |           |   |    |     |         |         |
        //                 |        |       |    |   |          |           |   |    |     |         |         |
        //                 |        |       |    |   |          |           |   |    |     |         |         |
        //  Time:          t11      t10     t9   t8  t7         t6          t5  t4   t3    t2        t1        t0
        //  =====================================================================================================
        //  Leg 1 LSR:      8        8      8 7  7   7          6           3   2    1      1        1         1
        //  Leg 2 LSR:      9        8      6 6  6   5          5           5   5    5      4        3         0
        //  LSD:            9        8      8 7  6   5          5       5   3   2    1      1        1         1
        //                                    ^ OOO cell is delivered   ^ the OOO cells are delivered to the stream
        //
        //
        //  (LSR = last seq received, LSD = last seq delivered, both from the client's POV)
        //
        //
        // The client keeps track of the `last_seqno_received` (LSR) on each leg.
        // This is incremented for each cell that counts towards the seqnos (BEGIN, DATA, etc.)
        // that is received on the leg. The client also tracks the `last_seqno_delivered` (LSD),
        // which is the seqno of the last cell delivered to a stream
        // (this is global for the whole tunnel, whereas the LSR is different for each leg).
        //
        // When switching to leg `N`, the seqno in the switch is, from the POV of the sender,
        // the delta between the absolute seqno (i.e. the total number of cells[^1] sent)
        // and the value of this absolute seqno when leg `N` was last used.
        //
        // At the time of the first SWITCH from `t1`, the exit "sent" 3 cells:
        // a `CONNECTED` cell, which was received by the client at `t0`, and 2 `DATA` cells that
        // haven't been received yet. At this point, the exit decides to switch to leg 2,
        // on which it hasn't sent any cells yet, so the seqno is set to `3 - 0 = 3`.
        //
        // At `t6`, when the exit sends the second switch (leg 2 -> leg 1), it has "sent" 6 cells
        // (`C` plus the data cells that are received at `t1 - 5` and `t8`).
        // The seqno is `6 - 3 = 3`, because when it last sent on leg 1,
        // the absolute seqno was `3`.
        //
        // At `t10`, the absolute seqno is 8 (8 qualifying cells have been sent so far).
        // When the exit last sent on leg 2 (which we are switching to),
        // the absolute seqno was `6`, so the `SWITCH` cell will have `8 - 6 = 2` as the seqno.
        //
        // [^1]: only counting the cells that count towards sequence numbers
        let multiple_switches = vec![
            // Immediately switch to sending on the second leg
            // (indicating that we've already sent 3 cells, including the CONNECTED)
            (CIRC2, relaymsg::ConfluxSwitch::new(3).into()),
            // Two out of order cells!
            (CIRC2, relaymsg::Data::new(&TO_SEND[15..20]).unwrap().into()),
            (CIRC2, relaymsg::Data::new(&TO_SEND[20..30]).unwrap().into()),
            // The missing cells finally arrive on the first leg
            (CIRC1, relaymsg::Data::new(&TO_SEND[0..10]).unwrap().into()),
            (CIRC1, relaymsg::Data::new(&TO_SEND[10..15]).unwrap().into()),
            // Switch back to the first leg
            (CIRC1, relaymsg::ConfluxSwitch::new(3).into()),
            // OOO cell
            (CIRC1, relaymsg::Data::new(&TO_SEND[31..40]).unwrap().into()),
            // Missing cell is received
            (CIRC2, relaymsg::Data::new(&TO_SEND[30..31]).unwrap().into()),
            // The remaining cells are in-order
            (CIRC1, relaymsg::Data::new(&TO_SEND[40..]).unwrap().into()),
            // Switch right after we've sent all the data we had to send
            (CIRC2, relaymsg::ConfluxSwitch::new(2).into()),
        ];

        // TODO: give these tests the ability to control when END cells are sent
        // (currently we have to ensure that is_sending_leg is set to true
        // on the leg that ends up sending the last data cell).
        //
        // TODO: test the edge cases
        let tests = [simple_switch, multiple_switches];

        for cells_to_send in tests {
            let tunnel = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
            assert_eq!(tunnel.circs.len(), 2);
            let circ_ids = [tunnel.circs[0].unique_id, tunnel.circs[1].unique_id];
            // Replace the leg indices used in the scripts above
            // with the unique ids of this tunnel's circuits.
            let cells_to_send = cells_to_send
                .into_iter()
                .map(|(i, cell)| (circ_ids[i], cell))
                .collect();

            let stream_state = run_multipath_exit_to_client_test(
                rt.clone(),
                tunnel,
                cells_to_send,
                // The client won't be sending any DATA cells on this stream
                Vec::new(),
                TO_SEND.into(),
            )
            .await;
            let stream_state = stream_state.lock().unwrap();
            assert!(stream_state.begin_recvd);
            // We don't expect the client to have sent anything
            assert!(stream_state.data_recvd.is_empty());
        }
    });
}
4017
// Incoming stream requests are not supported on a multi-path tunnel:
// once the conflux handshake has succeeded on both legs,
// `allow_stream_requests` is expected to return an error saying so.
#[traced_test]
#[test]
#[cfg(all(feature = "conflux", feature = "hs-service"))]
fn conflux_incoming_stream() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        use std::error::Error as _;

        const EXPECTED_HOP: u8 = 1;

        let ctx = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
        let TestTunnelCtx {
            tunnel,
            circs,
            conflux_link_rx,
        } = ctx;
        let [mut leg1, mut leg2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

        // Answer on both legs with a LINKED message that carries the payload
        // of the link obtained from the first leg's channel.
        let link = await_link_payload(&mut leg1.chan_rx).await;
        for leg in [&mut leg1, &mut leg2] {
            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
            let reply = rmsg_to_ccmsg(None, linked, false);
            leg.circ_tx.send(reply).await.unwrap();
        }

        // The handshake must have succeeded on every leg.
        let handshake_results = conflux_link_rx.await.unwrap().unwrap();
        assert!(handshake_results.iter().all(|outcome| outcome.is_ok()));

        // TODO(#2002): we don't currently support conflux for onion services
        let err = tunnel
            .allow_stream_requests(
                &[tor_cell::relaycell::RelayCmd::BEGIN],
                (tunnel.circ.unique_id(), EXPECTED_HOP.into()).into(),
                AllowAllStreamsFilter,
            )
            .await
            // `unwrap_err` needs a `Debug` success type, which IncomingStream
            // doesn't provide, so the success value is replaced with `()`
            .map(|_stream| ())
            .unwrap_err();

        // The explanation is in the error's source, not in the top-level error.
        let cause = err.source().unwrap().to_string();
        assert!(
            cause.contains("Cannot allow stream requests on a multi-path tunnel"),
            "{cause}"
        );
    });
}
4066
// Checks which channel messages convert into a `ClientCircChanMsg`:
// of the messages tried here, only DESTROY and RELAY are accepted.
#[test]
fn client_circ_chan_msg() {
    use tor_cell::chancell::msg::{self, AnyChanMsg};

    // Whether `m` can be converted into a `ClientCircChanMsg`.
    fn accepted(m: AnyChanMsg) -> bool {
        ClientCircChanMsg::try_from(m).is_ok()
    }

    assert!(accepted(msg::Destroy::new(2.into()).into()));
    assert!(!accepted(
        msg::CreatedFast::new(&b"guaranteed in this world"[..]).into()
    ));
    assert!(!accepted(msg::Created2::new(&b"and the next"[..]).into()));
    assert!(accepted(
        msg::Relay::new(&b"guaranteed guaranteed"[..]).into()
    ));
    // The same kind of body is rejected when it is wrapped as RELAY_EARLY.
    assert!(!accepted(AnyChanMsg::RelayEarly(
        msg::Relay::new(&b"for the world and its mother"[..]).into(),
    )));
    assert!(!accepted(msg::Versions::new([1, 2, 3]).unwrap().into()));
}
4086}