Skip to main content

tor_proto/channel/
reactor.rs

1//! Code to handle incoming cells on a channel.
2//!
3//! The role of this code is to run in a separate asynchronous task,
4//! and routes cells to the right circuits.
5//!
6//! TODO: I have zero confidence in the close-and-cleanup behavior here,
7//! or in the error handling behavior.
8
9use super::circmap::{CircEnt, CircMap};
10use crate::circuit::CircuitRxSender;
11use crate::client::circuit::halfcirc::HalfCirc;
12use crate::client::circuit::padding::{
13    PaddingController, PaddingEvent, PaddingEventStream, SendPadding, StartBlocking,
14};
15use crate::util::err::ReactorError;
16use crate::util::oneshot_broadcast;
17use crate::{Error, HopNum, Result};
18use tor_async_utils::SinkPrepareExt as _;
19use tor_cell::chancell::ChanMsg;
20use tor_cell::chancell::msg::{Destroy, DestroyReason, Padding, PaddingNegotiate};
21use tor_cell::chancell::{AnyChanCell, CircId, msg::AnyChanMsg};
22use tor_error::debug_report;
23use tor_rtcompat::{DynTimeProvider, Runtime};
24
25#[cfg_attr(not(target_os = "linux"), allow(unused))]
26use tor_error::error_report;
27#[cfg_attr(not(target_os = "linux"), allow(unused))]
28use tor_rtcompat::StreamOps;
29
30use futures::channel::mpsc;
31use oneshot_fused_workaround as oneshot;
32
33use futures::Sink;
34use futures::StreamExt as _;
35use futures::sink::SinkExt;
36use futures::stream::Stream;
37use futures::{select, select_biased};
38use tor_error::internal;
39
40use std::fmt;
41use std::pin::Pin;
42use std::sync::Arc;
43
44use crate::channel::{ChannelDetails, CloseInfo, kist::KistParams, padding, params::*, unique_id};
45use crate::circuit::celltypes::CreateResponse;
46use tracing::{debug, instrument, trace};
47
48#[cfg(feature = "relay")]
49use {
50    crate::channel::Channel,
51    crate::circuit::celltypes::CreateRequest,
52    crate::relay::channel::create_handler::{CreateRequestHandler, RelayCircComponents},
53    std::sync::Weak,
54    tor_llcrypto::pk::ed25519::Ed25519Identity,
55    tor_llcrypto::pk::rsa::RsaIdentity,
56};
57
58/// A boxed trait object that can provide `ChanCell`s.
59pub(super) type BoxedChannelStream =
60    Box<dyn Stream<Item = std::result::Result<AnyChanCell, Error>> + Send + Unpin + 'static>;
61/// A boxed trait object that can sink `ChanCell`s.
62pub(super) type BoxedChannelSink =
63    Box<dyn Sink<AnyChanCell, Error = Error> + Send + Unpin + 'static>;
64/// A boxed trait object that can provide additional `StreamOps` on a `BoxedChannelStream`.
65pub(super) type BoxedChannelStreamOps = Box<dyn StreamOps + Send + Unpin + 'static>;
66/// The type of a oneshot channel used to inform reactor users of the result of an operation.
67pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
68
69cfg_if::cfg_if! {
70    if #[cfg(feature = "circ-padding")] {
71        use crate::util::sink_blocker::{SinkBlocker, CountingPolicy};
72        /// Type used by a channel reactor to send cells to the network.
73        pub(super) type ChannelOutputSink = SinkBlocker<BoxedChannelSink, CountingPolicy>;
74    } else {
75        /// Type used by a channel reactor to send cells to the network.
76        pub(super) type ChannelOutputSink = BoxedChannelSink;
77    }
78}
79
80/// A message telling the channel reactor to do something.
81#[cfg_attr(docsrs, doc(cfg(feature = "testing")))]
82#[derive(Debug)]
83#[allow(unreachable_pub)] // Only `pub` with feature `testing`; otherwise, visible in crate
84#[allow(clippy::exhaustive_enums, private_interfaces)]
85pub enum CtrlMsg {
86    /// Shut down the reactor.
87    Shutdown,
88    /// Tell the reactor that a given circuit has gone away.
89    CloseCircuit(CircId),
90    /// Allocate a new circuit in this channel's circuit map, generating an ID for it
91    /// and registering senders for messages received for the circuit.
92    AllocateCircuit {
93        /// Channel to send the circuit's `CreateResponse` down.
94        created_sender: oneshot::Sender<CreateResponse>,
95        /// Channel to send other messages from this circuit down.
96        sender: CircuitRxSender,
97        /// Oneshot channel to send the new circuit's identifiers down.
98        tx: ReactorResultChannel<(
99            CircId,
100            crate::circuit::UniqId,
101            PaddingController,
102            PaddingEventStream,
103        )>,
104    },
105    /// Enable/disable/reconfigure channel padding
106    ///
107    /// The sender of these messages is responsible for the optimisation of
108    /// ensuring that "no-change" messages are elided.
109    /// (This is implemented in `ChannelsParamsUpdatesBuilder`.)
110    ///
111    /// These updates are done via a control message to avoid adding additional branches to the
112    /// main reactor `select!`.
113    ConfigUpdate(Arc<ChannelPaddingInstructionsUpdates>),
114    /// Enable/disable/reconfigure KIST.
115    ///
116    /// Like in the case of `ConfigUpdate`,
117    /// the sender of these messages is responsible for the optimisation of
118    /// ensuring that "no-change" messages are elided.
119    KistConfigUpdate(KistParams),
120    /// Change the current padding implementation to the one provided.
121    #[cfg(feature = "circ-padding-manual")]
122    SetChannelPadder {
123        /// The padder to install, or None to remove any existing padder.
124        padder: Option<crate::client::CircuitPadder>,
125        /// A oneshot channel to use in reporting the outcome.
126        sender: oneshot::Sender<Result<()>>,
127    },
128}
129
130/// Object to handle incoming cells and background tasks on a channel.
131///
132/// This type is returned when you finish a channel; you need to spawn a
133/// new task that calls `run()` on it.
134#[must_use = "If you don't call run() on a reactor, the channel won't work."]
135pub struct Reactor<R: Runtime> {
136    /// Underlying runtime we use for generating sleep futures and telling time.
137    pub(super) runtime: R,
138    /// A receiver for control messages from `Channel` objects.
139    pub(super) control: mpsc::UnboundedReceiver<CtrlMsg>,
140    /// A oneshot sender that is used to alert other tasks when this reactor is
141    /// finally dropped.
142    pub(super) reactor_closed_tx: oneshot_broadcast::Sender<Result<CloseInfo>>,
143    /// A receiver for cells to be sent on this reactor's sink.
144    ///
145    /// `Channel` objects have a sender that can send cells here.
146    pub(super) cells: super::CellRx,
147    /// A Stream from which we can read `ChanCell`s.
148    ///
149    /// This should be backed by a TLS connection if you want it to be secure.
150    pub(super) input: futures::stream::Fuse<BoxedChannelStream>,
151    /// A Sink to which we can write `ChanCell`s.
152    ///
153    /// This should also be backed by a TLS connection if you want it to be secure.
154    pub(super) output: ChannelOutputSink,
155    /// A handler for setting stream options on the underlying stream.
156    #[cfg_attr(not(target_os = "linux"), allow(unused))]
157    pub(super) streamops: BoxedChannelStreamOps,
158    /// A handler and associated data for CREATE2/CREATE_FAST messages,
159    /// if this channel should handle them.
160    #[cfg(feature = "relay")]
161    pub(super) create_request_handler: Option<CreateRequestHandlerAndData>,
162    /// Timer tracking when to generate channel padding.
163    ///
164    /// Note that this is _distinct_ from the experimental maybenot-based padding
165    /// implemented with padding_ctrl and padding_stream.
166    /// This is the existing per-channel padding
167    /// in the tor protocol used to resist netflow attacks.
168    pub(super) padding_timer: Pin<Box<padding::Timer<R>>>,
169    /// Outgoing cells introduced at the channel reactor
170    pub(super) special_outgoing: SpecialOutgoing,
171    /// A map from circuit ID to Sinks on which we can deliver cells.
172    pub(super) circs: CircMap,
173    /// A unique identifier for this channel.
174    pub(super) unique_id: super::UniqId,
175    /// Information shared with the frontend
176    pub(super) details: Arc<ChannelDetails>,
177    /// Context for allocating unique circuit log identifiers.
178    pub(super) circ_unique_id_ctx: unique_id::CircUniqIdContext,
179    /// A padding controller to which padding-related events should be reported.
180    ///
181    /// (This is used for experimental maybenot-based padding.)
182    //
183    // TODO: It would be good to use S here instead of DynTimeProvider,
184    // but we still need the latter for the clones of padding_ctrl that we hand out
185    // inside ChannelSender.
186    pub(super) padding_ctrl: PaddingController<DynTimeProvider>,
187    /// An event stream telling us about padding-related events.
188    ///
189    /// (This is used for experimental maybenot-based padding.)
190    pub(super) padding_event_stream: PaddingEventStream<DynTimeProvider>,
191    /// If present, the current rules for blocking the output based on the padding framework.
192    pub(super) padding_blocker: Option<StartBlocking>,
193    /// What link protocol is the channel using?
194    #[allow(dead_code)] // We don't support protocols where this would matter
195    pub(super) link_protocol: u16,
196}
197
198/// Outgoing cells introduced at the channel reactor
199#[derive(Default, Debug, Clone)]
200pub(super) struct SpecialOutgoing {
201    /// If we must send a `PaddingNegotiate`, this is present.
202    padding_negotiate: Option<PaddingNegotiate>,
203    /// A number of pending PADDING cells that we have to send, once there is space.
204    n_padding: u16,
205}
206
207impl SpecialOutgoing {
208    /// Do we have a special cell to send?
209    ///
210    /// Called by the reactor before looking for cells from the reactor's clients.
211    /// The returned message *must* be sent by the caller, not dropped!
212    #[must_use = "SpecialOutgoing::next()'s return value must be actually sent"]
213    fn next(&mut self) -> Option<AnyChanCell> {
214        // If this gets more cases, consider making SpecialOutgoing into a #[repr(C)]
215        // enum, so that we can fast-path the usual case of "no special message to send".
216        if let Some(p) = self.padding_negotiate.take() {
217            return Some(p.into());
218        }
219        if self.n_padding > 0 {
220            self.n_padding -= 1;
221            return Some(Padding::new().into());
222        }
223        None
224    }
225
226    /// Try to queue a padding cell to be sent.
227    fn queue_padding_cell(&mut self) {
228        self.n_padding = self.n_padding.saturating_add(1);
229    }
230}
231
232/// Allows us to just say debug!("{}: Reactor did a thing", &self, ...)
233///
234/// There is no risk of confusion because no-one would try to print a
235/// Reactor for some other reason.
236impl<R: Runtime> fmt::Display for Reactor<R> {
237    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
238        fmt::Debug::fmt(&self.unique_id, f)
239    }
240}
241
242impl<R: Runtime> Reactor<R> {
243    /// Launch the reactor, and run until the channel closes or we
244    /// encounter an error.
245    ///
246    /// Once this function returns, the channel is dead, and can't be
247    /// used again.
248    #[instrument(level = "trace", skip_all)]
249    pub async fn run(mut self) -> Result<()> {
250        trace!(channel_id = %self, "Running reactor");
251        let result: Result<()> = loop {
252            match self.run_once().await {
253                Ok(()) => (),
254                Err(ReactorError::Shutdown) => break Ok(()),
255                Err(ReactorError::Err(e)) => break Err(e),
256            }
257        };
258
259        // Log that the reactor stopped, possibly with the associated error as a report.
260        // May log at a higher level depending on the error kind.
261        const MSG: &str = "Reactor stopped";
262        match &result {
263            Ok(()) => debug!(channel_id = %self, "{MSG}"),
264            Err(e) => debug_report!(e, channel_id = %self, "{MSG}"),
265        }
266
267        // Inform any waiters that the channel has closed.
268        let close_msg = result.as_ref().map_err(Clone::clone).map(|()| CloseInfo);
269        self.reactor_closed_tx.send(close_msg);
270        result
271    }
272
273    /// Helper for run(): handles only one action.
274    #[instrument(level = "trace", skip_all)]
275    async fn run_once(&mut self) -> std::result::Result<(), ReactorError> {
276        select! {
277
278            // See if the output sink can have cells written to it yet.
279            // If so, see if we have to-be-transmitted cells.
280            ret = self.output.prepare_send_from(async {
281                // This runs if we will be able to write, so try to obtain a cell:
282
283                if let Some(l) = self.special_outgoing.next() {
284                    // See reasoning below.
285                    // eprintln!("PADDING - SENDING NEOGIATION: {:?}", &l);
286                    self.padding_timer.as_mut().note_cell_sent();
287                    return Some((l, None));
288                }
289
290                select_biased! {
291                    n = self.cells.next() => {
292                        // Note transmission on *input* to the reactor, not ultimate
293                        // transmission.  Ideally we would tap into the TCP stream at the far
294                        // end of our TLS or perhaps during encoding on entry to the TLS, but
295                        // both of those would involve quite some plumbing.  Doing it here in
296                        // the reactor avoids additional inter-task communication, mutexes,
297                        // etc.  (And there is no real difference between doing it here on
298                        // input, to just below, on enquieing into the `sendable`.)
299                        //
300                        // Padding is sent when the output channel is idle, and the effect of
301                        // buffering is just that we might sent it a little early because we
302                        // measure idleness when we last put something into the output layers.
303                        //
304                        // We can revisit this if measurement shows it to be bad in practice.
305                        //
306                        // (We in any case need padding that we generate when idle to make it
307                        // through to the output promptly, or it will be late and ineffective.)
308                        self.padding_timer.as_mut().note_cell_sent();
309                        n
310                    },
311                    p = self.padding_timer.as_mut().next() => {
312                        // eprintln!("PADDING - SENDING PADDING: {:?}", &p);
313
314                        // Note that we treat padding from the padding_timer as a normal cell,
315                        // since it doesn't have a padding machine.
316                        self.padding_ctrl.queued_data(HopNum::from(0));
317
318                        self.padding_timer.as_mut().note_cell_sent();
319                        Some((p.into(), None))
320                    },
321                }
322            }) => {
323                self.padding_ctrl.flushed_channel_cell();
324                let (queued, sendable) = ret?;
325                let (msg, cell_padding_info) = queued.ok_or(ReactorError::Shutdown)?;
326                // Tell the relevant circuit padder that this cell is getting flushed.
327                // Note that, technically, it won't go onto the network for a while longer:
328                // it has to go through the TLS buffer, and the kernel TCP buffer.
329                // We've got to live with that.
330                // TODO: conceivably we could defer this even longer, but it would take
331                // some tricky hacking!
332                if let (Some(cell_padding_info), Some(circid)) = (cell_padding_info, msg.circid()) {
333                    self.circs.note_cell_flushed(circid, cell_padding_info);
334                }
335                sendable.send(msg)?;
336            }
337
338            ret = self.control.next() => {
339                let ctrl = match ret {
340                    None | Some(CtrlMsg::Shutdown) => return Err(ReactorError::Shutdown),
341                    Some(x) => x,
342                };
343                self.handle_control(ctrl).await?;
344            }
345
346            ret = self.padding_event_stream.next() => {
347                let event = ret.ok_or_else(|| Error::from(internal!("Padding event stream was exhausted")))?;
348                self.handle_padding_event(event).await?;
349            }
350
351            ret = self.input.next() => {
352                let item = ret
353                    .ok_or(ReactorError::Shutdown)??;
354                crate::note_incoming_traffic();
355                self.handle_cell(item).await?;
356            }
357
358        }
359        Ok(()) // Run again.
360    }
361
362    /// Handle a CtrlMsg other than Shutdown.
363    #[instrument(level = "trace", skip(self))] // Intentionally omitting skip_all, msg is useful and not sensitive
364    async fn handle_control(&mut self, msg: CtrlMsg) -> Result<()> {
365        trace!(
366            channel_id = %self,
367            msg = ?msg,
368            "reactor received control message"
369        );
370
371        match msg {
372            CtrlMsg::Shutdown => panic!(), // was handled in reactor loop.
373            CtrlMsg::CloseCircuit(id) => self.outbound_destroy_circ(id).await?,
374            CtrlMsg::AllocateCircuit {
375                created_sender,
376                sender,
377                tx,
378            } => {
379                let mut rng = rand::rng();
380                let my_unique_id = self.unique_id;
381                let circ_unique_id = self.circ_unique_id_ctx.next(my_unique_id);
382                // NOTE: This is a very weird place to be calling new_padding, but:
383                //  - we need to do it here or earlier, so we can add it as part of the CircEnt to
384                //    our map.
385                //  - We need to do it at some point where we have a runtime, which implies in a
386                //    reactor.
387                //
388                // TODO circpad: We might want to lazy-allocate this somehow, or try harder to make
389                // it a no-op when we aren't padding on a particular circuit.
390                let (padding_ctrl, padding_stream) = crate::client::circuit::padding::new_padding(
391                    // TODO: avoid using DynTimeProvider at some point, and re-parameterize for efficiency.
392                    DynTimeProvider::new(self.runtime.clone()),
393                );
394                let ret: Result<_> = self
395                    .circs
396                    .add_origin_ent(&mut rng, created_sender, sender, padding_ctrl.clone())
397                    .map(|id| (id, circ_unique_id, padding_ctrl, padding_stream));
398                let _ = tx.send(ret); // don't care about other side going away
399                self.update_disused_since();
400            }
401            CtrlMsg::ConfigUpdate(updates) => {
402                if self.link_protocol == 4 {
403                    // Link protocol 4 does not permit sending, or negotiating, link padding.
404                    // We test for == 4 so that future updates to handshake.rs LINK_PROTOCOLS
405                    // keep doing padding things.
406                    return Ok(());
407                }
408
409                let ChannelPaddingInstructionsUpdates {
410                    // List all the fields explicitly; that way the compiler will warn us
411                    // if one is added and we fail to handle it here.
412                    padding_enable,
413                    padding_parameters,
414                    padding_negotiate,
415                } = &*updates;
416                if let Some(parameters) = padding_parameters {
417                    self.padding_timer.as_mut().reconfigure(parameters)?;
418                }
419                if let Some(enable) = padding_enable {
420                    if *enable {
421                        self.padding_timer.as_mut().enable();
422                    } else {
423                        self.padding_timer.as_mut().disable();
424                    }
425                }
426                if let Some(padding_negotiate) = padding_negotiate {
427                    // This replaces any previous PADDING_NEGOTIATE cell that we were
428                    // told to send, but which we didn't manage to send yet.
429                    // It doesn't make sense to queue them up.
430                    self.special_outgoing.padding_negotiate = Some(padding_negotiate.clone());
431                }
432            }
433            CtrlMsg::KistConfigUpdate(kist) => self.apply_kist_params(&kist),
434            #[cfg(feature = "circ-padding-manual")]
435            CtrlMsg::SetChannelPadder { padder, sender } => {
436                self.padding_ctrl
437                    .install_padder_padding_at_hop(HopNum::from(0), padder);
438                let _ignore = sender.send(Ok(()));
439            }
440        }
441        Ok(())
442    }
443
444    /// Take the padding action described in `action`.
445    ///
446    /// (With circuit padding disabled, PaddingEvent can't be constructed.)
447    #[cfg(not(feature = "circ-padding"))]
448    #[allow(clippy::unused_async)] // for symmetry with the version below
449    async fn handle_padding_event(&mut self, action: PaddingEvent) -> Result<()> {
450        void::unreachable(action.0)
451    }
452
453    /// Take the padding action described in `action`.
454    #[cfg(feature = "circ-padding")]
455    async fn handle_padding_event(&mut self, action: PaddingEvent) -> Result<()> {
456        use PaddingEvent as PE;
457        match action {
458            PE::SendPadding(send_padding) => {
459                self.handle_send_padding(send_padding).await?;
460            }
461            PE::StartBlocking(start_blocking) => {
462                if self.output.is_unlimited() {
463                    self.output.set_blocked();
464                }
465                self.padding_blocker = Some(start_blocking);
466            }
467            PE::StopBlocking => {
468                self.output.set_unlimited();
469            }
470        }
471        Ok(())
472    }
473
474    /// Send the padding described in `padding`.
475    #[cfg(feature = "circ-padding")]
476    async fn handle_send_padding(&mut self, padding: SendPadding) -> Result<()> {
477        // TODO circpad: This is somewhat duplicative of the logic in `Circuit::send_padding` and
478        // `Circuit::padding_disposition`.  It might be good to unify them at some point.
479        // For now (Oct 2025), though, they have slightly different inputs and behaviors.
480
481        use crate::client::circuit::padding::{Bypass::*, Replace::*};
482        // multihop padding belongs in circuit padders, not here.
483        let hop = HopNum::from(0);
484        assert_eq!(padding.hop, hop);
485
486        // If true, there is blocking, but we are allowed to bypass it.
487        let blocking_bypassed = matches!(
488            (&self.padding_blocker, padding.may_bypass_block()),
489            (
490                Some(StartBlocking {
491                    is_bypassable: true
492                }),
493                BypassBlocking
494            )
495        );
496        // If true, there is blocking, and we can't bypass it.
497        let this_padding_blocked = self.padding_blocker.is_some() && !blocking_bypassed;
498
499        if padding.may_replace_with_data() == Replaceable {
500            if self.output_is_full().await? {
501                // When the output buffer is full,
502                // we _always_ treat it as satisfying our replaceable padding.
503                //
504                // TODO circpad: It would be better to check whether
505                // the output has any bytes at all, but futures_codec doesn't seem to give us a
506                // way to check that.  If we manage to do so in the future, we should change the
507                // logic in this function.
508                self.padding_ctrl
509                    .replaceable_padding_already_queued(hop, padding);
510                return Ok(());
511            } else if self.cells.approx_count() > 0 {
512                // We can replace the padding with outbound cells!
513                if this_padding_blocked {
514                    // In the blocked case, we just declare that the pending data _is_ the queued padding.
515                    self.padding_ctrl
516                        .replaceable_padding_already_queued(hop, padding);
517                } else {
518                    // Otherwise we report that queued data _became_ padding,
519                    // and we allow it to pass any blocking that's present.
520                    self.padding_ctrl.queued_data_as_padding(hop, padding);
521                    if blocking_bypassed {
522                        self.output.allow_n_additional_items(1);
523                    }
524                }
525                return Ok(());
526            } else {
527                // There's nothing to replace this with, so fall through.
528            }
529        }
530
531        // There's no replacement, so we queue unconditionally.
532        self.special_outgoing.queue_padding_cell();
533        self.padding_ctrl.queued_padding(hop, padding);
534        if blocking_bypassed {
535            self.output.allow_n_additional_items(1);
536        }
537
538        Ok(())
539    }
540
541    /// Return true if the output stream is full.
542    ///
543    /// We use this in circuit padding to implement replaceable padding.
544    //
545    // TODO circpad: We'd rather check whether there is any data at all queued in self.output,
546    // but futures_codec doesn't give us a way to do that.
547    #[cfg(feature = "circ-padding")]
548    async fn output_is_full(&mut self) -> Result<bool> {
549        use futures::future::poll_fn;
550        use std::task::Poll;
551        // We use poll_fn to get a cx that we can pass to poll_ready_unpin.
552        poll_fn(|cx| {
553            Poll::Ready(match self.output.poll_ready_unpin(cx) {
554                // If if's ready to send, it isn't full.
555                Poll::Ready(Ok(())) => Ok(false),
556                // If it isn't ready to send, it's full.
557                Poll::Pending => Ok(true),
558                // Propagate errors:
559                Poll::Ready(Err(e)) => Err(e),
560            })
561        })
562        .await
563    }
564
565    /// Helper: process a cell on a channel.  Most cell types get ignored
566    /// or rejected; a few get delivered to circuits.
567    #[instrument(level = "trace", skip_all)]
568    async fn handle_cell(&mut self, cell: AnyChanCell) -> Result<()> {
569        let (circid, msg) = cell.into_circid_and_msg();
570        use AnyChanMsg::*;
571
572        match msg {
573            Relay(_) | Padding(_) | Vpadding(_) => {} // too frequent to log.
574            _ => trace!(
575                channel_id = %self,
576                "received {} for {}",
577                msg.cmd(),
578                CircId::get_or_zero(circid)
579            ),
580        }
581
582        // Report the message to the padding controller.
583        match msg {
584            Padding(_) | Vpadding(_) => {
585                // We always accept channel padding, even if we haven't negotiated any.
586                let _always_acceptable = self.padding_ctrl.decrypted_padding(HopNum::from(0));
587            }
588            _ => self.padding_ctrl.decrypted_data(HopNum::from(0)),
589        }
590
591        match msg {
592            // These are allowed, and need to be handled.
593            Relay(_) => self.deliver_relay(circid, msg).await,
594
595            // The 'if' guard is important as we should not consider this branch if we're not
596            // supposed to handle CREATE* cells (and therefore RELAY_EARLY),
597            // regardless of whether the "relay" feature is set.
598            #[cfg(feature = "relay")]
599            RelayEarly(_) if self.create_request_handler.is_some() => {
600                self.deliver_relay(circid, msg).await
601            }
602
603            Destroy(_) => self.deliver_destroy(circid, msg).await,
604
605            // The 'if' guard is important as we should not consider this branch if we're not
606            // supposed to handle CREATE* cells, regardless of whether the "relay" feature is set.
607            // We should instead fall through to the wildcard pattern.
608            //
609            // Clients that enable the "relay" feature, and outgoing channels for bridges,
610            // will not have a handler set.
611            #[cfg(feature = "relay")]
612            CreateFast(msg) if self.create_request_handler.is_some() => {
613                self.handle_create(circid, CreateRequest::CreateFast(msg))
614                    .await
615            }
616            #[cfg(feature = "relay")]
617            Create2(msg) if self.create_request_handler.is_some() => {
618                self.handle_create(circid, CreateRequest::Create2(msg))
619                    .await
620            }
621
622            CreatedFast(_) | Created2(_) => self.deliver_created(circid, msg),
623
624            // These are always ignored.
625            Padding(_) | Vpadding(_) => Ok(()),
626            _ => Err(Error::ChanProto(format!("Unexpected cell: {msg:?}"))),
627        }
628    }
629
630    /// Give the RELAY (or possibly RELAY_EARLY) cell `msg` to the appropriate circuit.
631    async fn deliver_relay(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
632        let Some(circid) = circid else {
633            return Err(Error::ChanProto("Relay cell without circuit ID".into()));
634        };
635
636        let mut ent = self
637            .circs
638            .get_mut(circid)
639            .ok_or_else(|| Error::ChanProto("Relay cell on nonexistent circuit".into()))?;
640
641        match &mut *ent {
642            CircEnt::OpenOrigin { cell_sender: s, .. } => {
643                // There's an open circuit; we can give it the RELAY cell.
644                if s.send(msg).await.is_err() {
645                    drop(ent);
646                    // The circuit's receiver went away, so we should destroy the circuit.
647                    self.outbound_destroy_circ(circid).await?;
648                }
649                Ok(())
650            }
651            #[cfg(feature = "relay")]
652            CircEnt::OpenRelay { cell_sender: s, .. } => {
653                // There's an open circuit; we can give it the RELAY cell.
654                if s.send(msg).await.is_err() {
655                    drop(ent);
656                    // The circuit's receiver went away, so we should destroy the circuit.
657                    // We send a DESTROY on our own channel, and the circuit reactor should have
658                    // taken care of sending a DESTROY on the other channel.
659                    self.outbound_destroy_circ(circid).await?;
660                }
661                Ok(())
662            }
663            CircEnt::Opening { .. } => Err(Error::ChanProto(
664                "Relay cell on pending circuit before CREATED* received".into(),
665            )),
666            CircEnt::DestroySent(hs) => hs.receive_cell(),
667        }
668    }
669
670    /// Handle a CREATE* cell `msg`.
671    #[cfg(feature = "relay")]
672    async fn handle_create(&mut self, circid: Option<CircId>, msg: CreateRequest) -> Result<()> {
673        let Some(ref create_request_handler) = self.create_request_handler else {
674            // We should have checked this in an 'if' guard in 'handle_cell()'.
675            return Err(internal!("Called 'deliver_relay()', but handler isn't set").into());
676        };
677
678        let Some(circid) = circid else {
679            let err = format!("Received {} cell without circuit ID", msg.cmd());
680            return Err(Error::ChanProto(err));
681        };
682
683        let Some(chan) = create_request_handler.channel.upgrade() else {
684            // This can happen if the last `Arc<Channel>` was dropped before the reactor had a
685            // chance to notice.
686            // We'll just try to reject the new circuit request and let the reactor shut down
687            // normally, rather than return an error.
688            let destroy = Destroy::new(DestroyReason::CHANNEL_CLOSED);
689            let destroy = AnyChanCell::new(Some(circid), destroy.into());
690
691            debug!(
692                "Unable to upgrade weak `Channel` while handling {}; sending {}",
693                msg.cmd(),
694                destroy.msg().cmd(),
695            );
696            return self.send_cell(destroy).await;
697        };
698
699        // Allocate an internal circuit ID, regardless of if the create fails or not.
700        // We expect that this will not overflow since it would require an attacker to send
701        // 500*2^32 bytes (~2 TiB) worth of cells.
702        let circ_uniq_id = self.circ_unique_id_ctx.next(self.unique_id);
703
704        // Build the relay circuit.
705        let create_result = create_request_handler.handler.handle_create(
706            &self.runtime,
707            &chan,
708            &create_request_handler.our_ed25519_id,
709            &create_request_handler.our_rsa_id,
710            circid,
711            &msg,
712            &self.details.memquota,
713            circ_uniq_id,
714        );
715
716        // Add the circuit to the circuit map.
717        let response = match create_result {
718            Ok((response, components)) => {
719                let RelayCircComponents {
720                    circ,
721                    sender,
722                    padding_ctrl,
723                } = components;
724
725                if let Err(reason) = self.circs.add_relay_ent(circid, circ, sender, padding_ctrl) {
726                    debug!("Unable to add circuit map entry for incoming circuit: {reason}");
727                    CreateResponse::Destroy(Destroy::new(reason))
728                } else {
729                    response
730                }
731            }
732            Err(destroy) => CreateResponse::Destroy(destroy),
733        };
734
735        let response = AnyChanCell::new(Some(circid), response.into());
736        self.send_cell(response).await
737    }
738
739    /// Handle a CREATED{_FAST,2} cell by passing it on to the appropriate
740    /// circuit, if that circuit is waiting for one.
741    fn deliver_created(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
742        let Some(circid) = circid else {
743            return Err(Error::ChanProto("'Created' cell without circuit ID".into()));
744        };
745
746        let target = self.circs.advance_from_opening(circid)?;
747        let created = msg.try_into()?;
748        // TODO(nickm) I think that this one actually means the other side
749        // is closed. See arti#269.
750        target.send(created).map_err(|_| {
751            Error::from(internal!(
752                "Circuit queue rejected created message. Is it closing?"
753            ))
754        })
755    }
756
757    /// Handle a DESTROY cell by removing the corresponding circuit
758    /// from the map, and passing the destroy cell onward to the circuit.
759    async fn deliver_destroy(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
760        let Some(circid) = circid else {
761            return Err(Error::ChanProto("'Destroy' cell without circuit ID".into()));
762        };
763
764        /// Helper to send DESTROY cell.
765        async fn send_destroy(mut sender: CircuitRxSender, msg: AnyChanMsg) -> Result<()> {
766            sender
767                .send(msg)
768                .await
769                // TODO(nickm) I think that this one actually means the other side
770                // is closed. See arti#269.
771                .map_err(|_| internal!("open circuit wasn't interested in destroy cell?").into())
772        }
773
774        // Remove the circuit from the map: nothing more can be done with it.
775        let entry = self.circs.remove(circid);
776        self.update_disused_since();
777        match entry {
778            // If the circuit is waiting for CREATED, tell it that it
779            // won't get one.
780            Some(CircEnt::Opening {
781                create_response_sender,
782                ..
783            }) => {
784                trace!(channel_id = %self, "Passing destroy to pending circuit {}", circid);
785                create_response_sender
786                    .send(msg.try_into()?)
787                    // TODO(nickm) I think that this one actually means the other side
788                    // is closed. See arti#269.
789                    .map_err(|_| {
790                        internal!("pending circuit wasn't interested in destroy cell?").into()
791                    })
792            }
793            // It's an open origin circuit: tell it that it got a DESTROY cell.
794            Some(CircEnt::OpenOrigin { cell_sender, .. }) => {
795                trace!(channel_id = %self, "Passing destroy to open origin circuit {}", circid);
796                send_destroy(cell_sender, msg).await
797            }
798            // It's an open relay circuit: tell it that it got a DESTROY cell.
799            #[cfg(feature = "relay")]
800            Some(CircEnt::OpenRelay { cell_sender, .. }) => {
801                trace!(channel_id = %self, "Passing destroy to open relay circuit {}", circid);
802                send_destroy(cell_sender, msg).await
803            }
804            // We've sent a destroy; we can leave this circuit removed.
805            Some(CircEnt::DestroySent(_)) => Ok(()),
806            // Got a DESTROY cell for a circuit we don't have.
807            None => {
808                trace!(channel_id = %self, "Destroy for nonexistent circuit {}", circid);
809                Err(Error::ChanProto("Destroy for nonexistent circuit".into()))
810            }
811        }
812    }
813
814    /// Helper: send a cell on the outbound sink.
815    async fn send_cell(&mut self, cell: AnyChanCell) -> Result<()> {
816        self.output.send(cell).await?;
817        Ok(())
818    }
819
820    /// Called when a circuit goes away: sends a DESTROY cell and removes
821    /// the circuit.
822    async fn outbound_destroy_circ(&mut self, id: CircId) -> Result<()> {
823        trace!(channel_id = %self, "Circuit {} is gone; sending DESTROY", id);
824        // Remove the circuit's entry from the map: nothing more
825        // can be done with it.
826        // TODO: It would be great to have a tighter upper bound for
827        // the number of relay cells we'll receive.
828        self.circs.destroy_sent(id, HalfCirc::new(3000));
829        self.update_disused_since();
830        let destroy = Destroy::new(DestroyReason::NONE).into();
831        let cell = AnyChanCell::new(Some(id), destroy);
832        self.send_cell(cell).await?;
833
834        Ok(())
835    }
836
837    /// Update disused timestamp with current time if this channel is no longer used
838    fn update_disused_since(&self) {
839        if self.circs.open_ent_count() == 0 {
840            // Update disused_since if it still indicates that the channel is in use
841            self.details.unused_since.update_if_none();
842        } else {
843            // Mark this channel as in use
844            self.details.unused_since.clear();
845        }
846    }
847
848    /// Use the new KIST parameters.
849    #[cfg(target_os = "linux")]
850    fn apply_kist_params(&self, params: &KistParams) {
851        use super::kist::KistMode;
852
853        let set_tcp_notsent_lowat = |v: u32| {
854            if let Err(e) = self.streamops.set_tcp_notsent_lowat(v) {
855                // This is bad, but not fatal: not setting the KIST options
856                // comes with a performance penalty, but we don't have to crash.
857                error_report!(e, "Failed to set KIST socket options");
858            }
859        };
860
861        match params.kist_enabled() {
862            KistMode::TcpNotSentLowat => set_tcp_notsent_lowat(params.tcp_notsent_lowat()),
863            KistMode::Disabled => set_tcp_notsent_lowat(u32::MAX),
864        }
865    }
866
867    /// Use the new KIST parameters.
868    #[cfg(not(target_os = "linux"))]
869    fn apply_kist_params(&self, params: &KistParams) {
870        use super::kist::KistMode;
871
872        if params.kist_enabled() != KistMode::Disabled {
873            tracing::warn!("KIST not currently supported on non-linux platforms");
874        }
875    }
876}
877
878/// If the channel is configured to handle CREATE* requests,
879/// this contains anything that is needed solely for this purpose.
880#[cfg(feature = "relay")]
881pub(super) struct CreateRequestHandlerAndData {
882    /// A handler for CREATE2/CREATE_FAST messages.
883    pub(super) handler: Arc<CreateRequestHandler>,
884    /// The [`Channel`] associated with this reactor.
885    ///
886    /// We don't want the channel reactor to access its `Channel` directly
887    /// (shared data should use its [`ChannelDetails`] instead),
888    /// but we need it to pass it to new circuit reactors,
889    /// so we store a copy here.
890    pub(super) channel: Weak<Channel>,
891    /// Our Ed25519 identity for ntor-v3 handshakes.
892    pub(super) our_ed25519_id: Ed25519Identity,
893    /// Our RSA identity for ntor handshakes.
894    pub(super) our_rsa_id: RsaIdentity,
895}
896
897#[cfg(test)]
898pub(crate) mod test {
899    #![allow(clippy::unwrap_used)]
900    use super::*;
901    use crate::channel::{Canonicity, ChannelMode, ClosedUnexpectedly, UniqId};
902    use crate::client::circuit::CircParameters;
903    use crate::client::circuit::padding::new_padding;
904    use crate::fake_mpsc;
905    use crate::peer::{PeerAddr, PeerInfo};
906    use crate::util::{DummyTimeoutEstimator, fake_mq};
907    use futures::sink::SinkExt;
908    use futures::stream::StreamExt;
909    use tor_cell::chancell::msg;
910    use tor_linkspec::{OwnedChanTarget, RelayIdsBuilder};
911    use tor_rtcompat::SpawnExt;
912    use tor_rtcompat::{DynTimeProvider, NoOpStreamOpsHandle, Runtime};
913
914    pub(crate) type CodecResult = std::result::Result<AnyChanCell, Error>;
915
916    pub(crate) fn new_reactor<R: Runtime>(
917        runtime: R,
918    ) -> (
919        Arc<crate::channel::Channel>,
920        Reactor<R>,
921        mpsc::Receiver<AnyChanCell>,
922        mpsc::Sender<CodecResult>,
923    ) {
924        let link_protocol = 4;
925        let (send1, recv1) = mpsc::channel(32);
926        let (send2, recv2) = mpsc::channel(32);
927        let unique_id = UniqId::new();
928        let ed = [6; 32].into();
929        let rsa = [10; 20].into();
930        let dummy_target = OwnedChanTarget::builder()
931            .ed_identity(ed)
932            .rsa_identity(rsa)
933            .build()
934            .unwrap();
935        let mut peer_ids = RelayIdsBuilder::default();
936        peer_ids.ed_identity(ed);
937        peer_ids.rsa_identity(rsa);
938        let peer_info = PeerInfo::new(PeerAddr::UNSPECIFIED, peer_ids.build().unwrap());
939        let send1 = send1.sink_map_err(|e| {
940            trace!("got sink error: {:?}", e);
941            Error::CellDecodeErr {
942                object: "reactor test",
943                err: tor_cell::Error::ChanProto("dummy message".into()),
944            }
945        });
946        let stream_ops = NoOpStreamOpsHandle::default();
947        let (chan, reactor) = crate::channel::Channel::new(
948            ChannelMode::Client,
949            link_protocol,
950            Box::new(send1),
951            Box::new(recv2),
952            Box::new(stream_ops),
953            unique_id,
954            dummy_target,
955            safelog::MaybeSensitive::not_sensitive(peer_info),
956            crate::ClockSkew::None,
957            runtime,
958            fake_mq(),
959            Canonicity::new_canonical(),
960        )
961        .expect("channel create failed");
962        (chan, reactor, recv1, send2)
963    }
964
965    // Try shutdown from inside run_once..
966    #[test]
967    fn shutdown() {
968        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
969            let (chan, mut reactor, _output, _input) = new_reactor(rt);
970
971            chan.terminate();
972            let r = reactor.run_once().await;
973            assert!(matches!(r, Err(ReactorError::Shutdown)));
974        });
975    }
976
977    // Try shutdown while reactor is running.
978    #[test]
979    fn shutdown2() {
980        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
981            // TODO: Ask a rust person if this is how to do this.
982
983            use futures::future::FutureExt;
984            use futures::join;
985
986            let (chan, reactor, _output, _input) = new_reactor(rt);
987            // Let's get the reactor running...
988            let run_reactor = reactor.run().map(|x| x.is_ok()).shared();
989
990            let rr = run_reactor.clone();
991
992            let exit_then_check = async {
993                assert!(rr.peek().is_none());
994                // ... and terminate the channel while that's happening.
995                chan.terminate();
996            };
997
998            let (rr_s, _) = join!(run_reactor, exit_then_check);
999
1000            // Now let's see. The reactor should not _still_ be running.
1001            assert!(rr_s);
1002        });
1003    }
1004
1005    #[test]
1006    fn new_circ_closed() {
1007        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1008            let (chan, mut reactor, mut output, _input) = new_reactor(rt.clone());
1009            assert!(chan.duration_unused().is_some()); // unused yet
1010
1011            let (ret, reac) = futures::join!(
1012                chan.new_tunnel(Arc::new(DummyTimeoutEstimator)),
1013                reactor.run_once()
1014            );
1015            let (pending, circr) = ret.unwrap();
1016            rt.spawn(async {
1017                let _ignore = circr.run().await;
1018            })
1019            .unwrap();
1020            assert!(reac.is_ok());
1021
1022            let id = pending.peek_circid();
1023
1024            let ent = reactor.circs.get_mut(id);
1025            assert!(matches!(*ent.unwrap(), CircEnt::Opening { .. }));
1026            assert!(chan.duration_unused().is_none()); // in use
1027
1028            // Now drop the circuit; this should tell the reactor to remove
1029            // the circuit from the map.
1030            drop(pending);
1031
1032            reactor.run_once().await.unwrap();
1033            let ent = reactor.circs.get_mut(id);
1034            assert!(matches!(*ent.unwrap(), CircEnt::DestroySent(_)));
1035            let cell = output.next().await.unwrap();
1036            assert_eq!(cell.circid(), Some(id));
1037            assert!(matches!(cell.msg(), AnyChanMsg::Destroy(_)));
1038            assert!(chan.duration_unused().is_some()); // unused again
1039        });
1040    }
1041
1042    // Test proper delivery of a created cell that doesn't make a channel
1043    #[test]
1044    #[ignore] // See bug #244: re-enable this test once it passes reliably.
1045    fn new_circ_create_failure() {
1046        use std::time::Duration;
1047        use tor_rtcompat::SleepProvider;
1048
1049        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1050            let (chan, mut reactor, mut output, mut input) = new_reactor(rt.clone());
1051
1052            let (ret, reac) = futures::join!(
1053                chan.new_tunnel(Arc::new(DummyTimeoutEstimator)),
1054                reactor.run_once()
1055            );
1056            let (pending, circr) = ret.unwrap();
1057            rt.spawn(async {
1058                let _ignore = circr.run().await;
1059            })
1060            .unwrap();
1061            assert!(reac.is_ok());
1062
1063            let circparams = CircParameters::default();
1064
1065            let id = pending.peek_circid();
1066
1067            let ent = reactor.circs.get_mut(id);
1068            assert!(matches!(*ent.unwrap(), CircEnt::Opening { .. }));
1069
1070            #[allow(clippy::clone_on_copy)]
1071            let rtc = rt.clone();
1072            let send_response = async {
1073                rtc.sleep(Duration::from_millis(100)).await;
1074                trace!("sending createdfast");
1075                // We'll get a bad handshake result from this createdfast cell.
1076                let created_cell = AnyChanCell::new(Some(id), msg::CreatedFast::new(*b"x").into());
1077                input.send(Ok(created_cell)).await.unwrap();
1078                reactor.run_once().await.unwrap();
1079            };
1080
1081            let (circ, _) = futures::join!(pending.create_firsthop_fast(circparams), send_response);
1082            // Make sure statuses are as expected.
1083            assert!(matches!(circ.err().unwrap(), Error::BadCircHandshakeAuth));
1084
1085            reactor.run_once().await.unwrap();
1086
1087            // Make sure that the createfast cell got sent
1088            let cell_sent = output.next().await.unwrap();
1089            assert!(matches!(cell_sent.msg(), msg::AnyChanMsg::CreateFast(_)));
1090
1091            // But the next run if the reactor will make the circuit get closed.
1092            let ent = reactor.circs.get_mut(id);
1093            assert!(matches!(*ent.unwrap(), CircEnt::DestroySent(_)));
1094        });
1095    }
1096
1097    // Try incoming cells that shouldn't arrive on channels.
1098    #[test]
1099    fn bad_cells() {
1100        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1101            let (_chan, mut reactor, _output, mut input) = new_reactor(rt);
1102
1103            // shouldn't get created2 cells for nonexistent circuits
1104            let created2_cell = msg::Created2::new(*b"hihi").into();
1105            input
1106                .send(Ok(AnyChanCell::new(CircId::new(7), created2_cell)))
1107                .await
1108                .unwrap();
1109
1110            let e = reactor.run_once().await.unwrap_err().unwrap_err();
1111            assert_eq!(
1112                format!("{}", e),
1113                "Channel protocol violation: Unexpected CREATED* cell not on opening circuit"
1114            );
1115
1116            // Can't get a relay cell on a circuit we've never heard of.
1117            let relay_cell = msg::Relay::new(b"abc").into();
1118            input
1119                .send(Ok(AnyChanCell::new(CircId::new(4), relay_cell)))
1120                .await
1121                .unwrap();
1122            let e = reactor.run_once().await.unwrap_err().unwrap_err();
1123            assert_eq!(
1124                format!("{}", e),
1125                "Channel protocol violation: Relay cell on nonexistent circuit"
1126            );
1127
1128            // There used to be tests here for other types, but now that we only
1129            // accept OpenClientChanCell, we know that the codec can't even try
1130            // to give us e.g. VERSIONS or CREATE.
1131        });
1132    }
1133
1134    #[test]
1135    fn deliver_relay() {
1136        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1137            use oneshot_fused_workaround as oneshot;
1138
1139            let (_chan, mut reactor, _output, mut input) = new_reactor(rt.clone());
1140
1141            let (padding_ctrl, _padding_stream) = new_padding(DynTimeProvider::new(rt));
1142
1143            let (_circ_stream_7, mut circ_stream_13) = {
1144                let (snd1, _rcv1) = oneshot::channel();
1145                let (snd2, rcv2) = fake_mpsc(64);
1146                reactor.circs.put_unchecked(
1147                    CircId::new(7).unwrap(),
1148                    CircEnt::Opening {
1149                        create_response_sender: snd1,
1150                        cell_sender: snd2,
1151                        padding_ctrl: padding_ctrl.clone(),
1152                    },
1153                );
1154
1155                let (snd3, rcv3) = fake_mpsc(64);
1156                reactor.circs.put_unchecked(
1157                    CircId::new(13).unwrap(),
1158                    CircEnt::OpenOrigin {
1159                        cell_sender: snd3,
1160                        padding_ctrl,
1161                    },
1162                );
1163
1164                reactor.circs.put_unchecked(
1165                    CircId::new(23).unwrap(),
1166                    CircEnt::DestroySent(HalfCirc::new(25)),
1167                );
1168                (rcv2, rcv3)
1169            };
1170
1171            // If a relay cell is sent on an open channel, the correct circuit
1172            // should get it.
1173            let relaycell: AnyChanMsg = msg::Relay::new(b"do you suppose").into();
1174            input
1175                .send(Ok(AnyChanCell::new(CircId::new(13), relaycell.clone())))
1176                .await
1177                .unwrap();
1178            reactor.run_once().await.unwrap();
1179            let got = circ_stream_13.next().await.unwrap();
1180            assert!(matches!(got, AnyChanMsg::Relay(_)));
1181
1182            // If a relay cell is sent on an opening channel, that's an error.
1183            input
1184                .send(Ok(AnyChanCell::new(CircId::new(7), relaycell.clone())))
1185                .await
1186                .unwrap();
1187            let e = reactor.run_once().await.unwrap_err().unwrap_err();
1188            assert_eq!(
1189                format!("{}", e),
1190                "Channel protocol violation: Relay cell on pending circuit before CREATED* received"
1191            );
1192
1193            // If a relay cell is sent on a non-existent channel, that's an error.
1194            input
1195                .send(Ok(AnyChanCell::new(CircId::new(101), relaycell.clone())))
1196                .await
1197                .unwrap();
1198            let e = reactor.run_once().await.unwrap_err().unwrap_err();
1199            assert_eq!(
1200                format!("{}", e),
1201                "Channel protocol violation: Relay cell on nonexistent circuit"
1202            );
1203
1204            // It's fine to get a relay cell on a DestroySent channel: that happens
1205            // when the other side hasn't noticed the Destroy yet.
1206
1207            // We can do this 25 more times according to our setup:
1208            for _ in 0..25 {
1209                input
1210                    .send(Ok(AnyChanCell::new(CircId::new(23), relaycell.clone())))
1211                    .await
1212                    .unwrap();
1213                reactor.run_once().await.unwrap(); // should be fine.
1214            }
1215
1216            // This one will fail.
1217            input
1218                .send(Ok(AnyChanCell::new(CircId::new(23), relaycell.clone())))
1219                .await
1220                .unwrap();
1221            let e = reactor.run_once().await.unwrap_err().unwrap_err();
1222            assert_eq!(
1223                format!("{}", e),
1224                "Channel protocol violation: Too many cells received on destroyed circuit"
1225            );
1226        });
1227    }
1228
1229    #[test]
1230    fn deliver_destroy() {
1231        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1232            use crate::circuit::celltypes::*;
1233            use oneshot_fused_workaround as oneshot;
1234
1235            let (_chan, mut reactor, _output, mut input) = new_reactor(rt.clone());
1236
1237            let (padding_ctrl, _padding_stream) = new_padding(DynTimeProvider::new(rt));
1238
1239            let (circ_oneshot_7, mut circ_stream_13) = {
1240                let (snd1, rcv1) = oneshot::channel();
1241                let (snd2, _rcv2) = fake_mpsc(64);
1242                reactor.circs.put_unchecked(
1243                    CircId::new(7).unwrap(),
1244                    CircEnt::Opening {
1245                        create_response_sender: snd1,
1246                        cell_sender: snd2,
1247                        padding_ctrl: padding_ctrl.clone(),
1248                    },
1249                );
1250
1251                let (snd3, rcv3) = fake_mpsc(64);
1252                reactor.circs.put_unchecked(
1253                    CircId::new(13).unwrap(),
1254                    CircEnt::OpenOrigin {
1255                        cell_sender: snd3,
1256                        padding_ctrl: padding_ctrl.clone(),
1257                    },
1258                );
1259
1260                reactor.circs.put_unchecked(
1261                    CircId::new(23).unwrap(),
1262                    CircEnt::DestroySent(HalfCirc::new(25)),
1263                );
1264                (rcv1, rcv3)
1265            };
1266
1267            // Destroying an opening circuit is fine.
1268            let destroycell: AnyChanMsg = msg::Destroy::new(0.into()).into();
1269            input
1270                .send(Ok(AnyChanCell::new(CircId::new(7), destroycell.clone())))
1271                .await
1272                .unwrap();
1273            reactor.run_once().await.unwrap();
1274            let msg = circ_oneshot_7.await;
1275            assert!(matches!(msg, Ok(CreateResponse::Destroy(_))));
1276
1277            // Destroying an open circuit is fine.
1278            input
1279                .send(Ok(AnyChanCell::new(CircId::new(13), destroycell.clone())))
1280                .await
1281                .unwrap();
1282            reactor.run_once().await.unwrap();
1283            let msg = circ_stream_13.next().await.unwrap();
1284            assert!(matches!(msg, AnyChanMsg::Destroy(_)));
1285
1286            // Destroying a DestroySent circuit is fine.
1287            input
1288                .send(Ok(AnyChanCell::new(CircId::new(23), destroycell.clone())))
1289                .await
1290                .unwrap();
1291            reactor.run_once().await.unwrap();
1292
1293            // Destroying a nonexistent circuit is an error.
1294            input
1295                .send(Ok(AnyChanCell::new(CircId::new(101), destroycell.clone())))
1296                .await
1297                .unwrap();
1298            let e = reactor.run_once().await.unwrap_err().unwrap_err();
1299            assert_eq!(
1300                format!("{}", e),
1301                "Channel protocol violation: Destroy for nonexistent circuit"
1302            );
1303        });
1304    }
1305
1306    #[test]
1307    fn closing_if_reactor_dropped() {
1308        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1309            let (chan, reactor, _output, _input) = new_reactor(rt);
1310
1311            assert!(!chan.is_closing());
1312            drop(reactor);
1313            assert!(chan.is_closing());
1314
1315            assert!(matches!(
1316                chan.wait_for_close().await,
1317                Err(ClosedUnexpectedly::ReactorDropped),
1318            ));
1319        });
1320    }
1321
1322    #[test]
1323    fn closing_if_reactor_shutdown() {
1324        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1325            let (chan, reactor, _output, _input) = new_reactor(rt);
1326
1327            assert!(!chan.is_closing());
1328            chan.terminate();
1329            assert!(!chan.is_closing());
1330
1331            let r = reactor.run().await;
1332            assert!(r.is_ok());
1333            assert!(chan.is_closing());
1334
1335            assert!(chan.wait_for_close().await.is_ok());
1336        });
1337    }
1338
1339    #[test]
1340    fn reactor_error_wait_for_close() {
1341        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1342            let (chan, reactor, _output, mut input) = new_reactor(rt);
1343
1344            // force an error by sending created2 cell for nonexistent circuit
1345            let created2_cell = msg::Created2::new(*b"hihi").into();
1346            input
1347                .send(Ok(AnyChanCell::new(CircId::new(7), created2_cell)))
1348                .await
1349                .unwrap();
1350
1351            // `reactor.run()` should return an error
1352            let run_error = reactor.run().await.unwrap_err();
1353
1354            // `chan.wait_for_close()` should return the same error
1355            let Err(ClosedUnexpectedly::ReactorError(wait_error)) = chan.wait_for_close().await
1356            else {
1357                panic!("Expected a 'ReactorError'");
1358            };
1359
1360            // `Error` doesn't implement `PartialEq`, so best we can do is to compare the strings
1361            assert_eq!(run_error.to_string(), wait_error.to_string());
1362        });
1363    }
1364}