1use super::circmap::{CircEnt, CircMap};
10use crate::circuit::CircuitRxSender;
11use crate::client::circuit::halfcirc::HalfCirc;
12use crate::client::circuit::padding::{
13 PaddingController, PaddingEvent, PaddingEventStream, SendPadding, StartBlocking,
14};
15use crate::util::err::ReactorError;
16use crate::util::oneshot_broadcast;
17use crate::{Error, HopNum, Result};
18use tor_async_utils::SinkPrepareExt as _;
19use tor_cell::chancell::ChanMsg;
20use tor_cell::chancell::msg::{Destroy, DestroyReason, Padding, PaddingNegotiate};
21use tor_cell::chancell::{AnyChanCell, CircId, msg::AnyChanMsg};
22use tor_error::debug_report;
23use tor_rtcompat::{DynTimeProvider, Runtime};
24
25#[cfg_attr(not(target_os = "linux"), allow(unused))]
26use tor_error::error_report;
27#[cfg_attr(not(target_os = "linux"), allow(unused))]
28use tor_rtcompat::StreamOps;
29
30use futures::channel::mpsc;
31use oneshot_fused_workaround as oneshot;
32
33use futures::Sink;
34use futures::StreamExt as _;
35use futures::sink::SinkExt;
36use futures::stream::Stream;
37use futures::{select, select_biased};
38use tor_error::internal;
39
40use std::fmt;
41use std::pin::Pin;
42use std::sync::Arc;
43
44use crate::channel::{ChannelDetails, CloseInfo, kist::KistParams, padding, params::*, unique_id};
45use crate::circuit::celltypes::CreateResponse;
46use tracing::{debug, instrument, trace};
47
48#[cfg(feature = "relay")]
49use {
50 crate::channel::Channel,
51 crate::circuit::celltypes::CreateRequest,
52 crate::relay::channel::create_handler::{CreateRequestHandler, RelayCircComponents},
53 std::sync::Weak,
54 tor_llcrypto::pk::ed25519::Ed25519Identity,
55 tor_llcrypto::pk::rsa::RsaIdentity,
56};
57
58pub(super) type BoxedChannelStream =
60 Box<dyn Stream<Item = std::result::Result<AnyChanCell, Error>> + Send + Unpin + 'static>;
61pub(super) type BoxedChannelSink =
63 Box<dyn Sink<AnyChanCell, Error = Error> + Send + Unpin + 'static>;
64pub(super) type BoxedChannelStreamOps = Box<dyn StreamOps + Send + Unpin + 'static>;
66pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
68
69cfg_if::cfg_if! {
70 if #[cfg(feature = "circ-padding")] {
71 use crate::util::sink_blocker::{SinkBlocker, CountingPolicy};
72 pub(super) type ChannelOutputSink = SinkBlocker<BoxedChannelSink, CountingPolicy>;
74 } else {
75 pub(super) type ChannelOutputSink = BoxedChannelSink;
77 }
78}
79
80#[cfg_attr(docsrs, doc(cfg(feature = "testing")))]
82#[derive(Debug)]
83#[allow(unreachable_pub)] #[allow(clippy::exhaustive_enums, private_interfaces)]
85pub enum CtrlMsg {
86 Shutdown,
88 CloseCircuit(CircId),
90 AllocateCircuit {
93 created_sender: oneshot::Sender<CreateResponse>,
95 sender: CircuitRxSender,
97 tx: ReactorResultChannel<(
99 CircId,
100 crate::circuit::UniqId,
101 PaddingController,
102 PaddingEventStream,
103 )>,
104 },
105 ConfigUpdate(Arc<ChannelPaddingInstructionsUpdates>),
114 KistConfigUpdate(KistParams),
120 #[cfg(feature = "circ-padding-manual")]
122 SetChannelPadder {
123 padder: Option<crate::client::CircuitPadder>,
125 sender: oneshot::Sender<Result<()>>,
127 },
128}
129
130#[must_use = "If you don't call run() on a reactor, the channel won't work."]
135pub struct Reactor<R: Runtime> {
136 pub(super) runtime: R,
138 pub(super) control: mpsc::UnboundedReceiver<CtrlMsg>,
140 pub(super) reactor_closed_tx: oneshot_broadcast::Sender<Result<CloseInfo>>,
143 pub(super) cells: super::CellRx,
147 pub(super) input: futures::stream::Fuse<BoxedChannelStream>,
151 pub(super) output: ChannelOutputSink,
155 #[cfg_attr(not(target_os = "linux"), allow(unused))]
157 pub(super) streamops: BoxedChannelStreamOps,
158 #[cfg(feature = "relay")]
161 pub(super) create_request_handler: Option<CreateRequestHandlerAndData>,
162 pub(super) padding_timer: Pin<Box<padding::Timer<R>>>,
169 pub(super) special_outgoing: SpecialOutgoing,
171 pub(super) circs: CircMap,
173 pub(super) unique_id: super::UniqId,
175 pub(super) details: Arc<ChannelDetails>,
177 pub(super) circ_unique_id_ctx: unique_id::CircUniqIdContext,
179 pub(super) padding_ctrl: PaddingController<DynTimeProvider>,
187 pub(super) padding_event_stream: PaddingEventStream<DynTimeProvider>,
191 pub(super) padding_blocker: Option<StartBlocking>,
193 #[allow(dead_code)] pub(super) link_protocol: u16,
196}
197
198#[derive(Default, Debug, Clone)]
200pub(super) struct SpecialOutgoing {
201 padding_negotiate: Option<PaddingNegotiate>,
203 n_padding: u16,
205}
206
207impl SpecialOutgoing {
208 #[must_use = "SpecialOutgoing::next()'s return value must be actually sent"]
213 fn next(&mut self) -> Option<AnyChanCell> {
214 if let Some(p) = self.padding_negotiate.take() {
217 return Some(p.into());
218 }
219 if self.n_padding > 0 {
220 self.n_padding -= 1;
221 return Some(Padding::new().into());
222 }
223 None
224 }
225
226 fn queue_padding_cell(&mut self) {
228 self.n_padding = self.n_padding.saturating_add(1);
229 }
230}
231
232impl<R: Runtime> fmt::Display for Reactor<R> {
237 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
238 fmt::Debug::fmt(&self.unique_id, f)
239 }
240}
241
242impl<R: Runtime> Reactor<R> {
243 #[instrument(level = "trace", skip_all)]
249 pub async fn run(mut self) -> Result<()> {
250 trace!(channel_id = %self, "Running reactor");
251 let result: Result<()> = loop {
252 match self.run_once().await {
253 Ok(()) => (),
254 Err(ReactorError::Shutdown) => break Ok(()),
255 Err(ReactorError::Err(e)) => break Err(e),
256 }
257 };
258
259 const MSG: &str = "Reactor stopped";
262 match &result {
263 Ok(()) => debug!(channel_id = %self, "{MSG}"),
264 Err(e) => debug_report!(e, channel_id = %self, "{MSG}"),
265 }
266
267 let close_msg = result.as_ref().map_err(Clone::clone).map(|()| CloseInfo);
269 self.reactor_closed_tx.send(close_msg);
270 result
271 }
272
273 #[instrument(level = "trace", skip_all)]
275 async fn run_once(&mut self) -> std::result::Result<(), ReactorError> {
276 select! {
277
278 ret = self.output.prepare_send_from(async {
281 if let Some(l) = self.special_outgoing.next() {
284 self.padding_timer.as_mut().note_cell_sent();
287 return Some((l, None));
288 }
289
290 select_biased! {
291 n = self.cells.next() => {
292 self.padding_timer.as_mut().note_cell_sent();
309 n
310 },
311 p = self.padding_timer.as_mut().next() => {
312 self.padding_ctrl.queued_data(HopNum::from(0));
317
318 self.padding_timer.as_mut().note_cell_sent();
319 Some((p.into(), None))
320 },
321 }
322 }) => {
323 self.padding_ctrl.flushed_channel_cell();
324 let (queued, sendable) = ret?;
325 let (msg, cell_padding_info) = queued.ok_or(ReactorError::Shutdown)?;
326 if let (Some(cell_padding_info), Some(circid)) = (cell_padding_info, msg.circid()) {
333 self.circs.note_cell_flushed(circid, cell_padding_info);
334 }
335 sendable.send(msg)?;
336 }
337
338 ret = self.control.next() => {
339 let ctrl = match ret {
340 None | Some(CtrlMsg::Shutdown) => return Err(ReactorError::Shutdown),
341 Some(x) => x,
342 };
343 self.handle_control(ctrl).await?;
344 }
345
346 ret = self.padding_event_stream.next() => {
347 let event = ret.ok_or_else(|| Error::from(internal!("Padding event stream was exhausted")))?;
348 self.handle_padding_event(event).await?;
349 }
350
351 ret = self.input.next() => {
352 let item = ret
353 .ok_or(ReactorError::Shutdown)??;
354 crate::note_incoming_traffic();
355 self.handle_cell(item).await?;
356 }
357
358 }
359 Ok(()) }
361
362 #[instrument(level = "trace", skip(self))] async fn handle_control(&mut self, msg: CtrlMsg) -> Result<()> {
365 trace!(
366 channel_id = %self,
367 msg = ?msg,
368 "reactor received control message"
369 );
370
371 match msg {
372 CtrlMsg::Shutdown => panic!(), CtrlMsg::CloseCircuit(id) => self.outbound_destroy_circ(id).await?,
374 CtrlMsg::AllocateCircuit {
375 created_sender,
376 sender,
377 tx,
378 } => {
379 let mut rng = rand::rng();
380 let my_unique_id = self.unique_id;
381 let circ_unique_id = self.circ_unique_id_ctx.next(my_unique_id);
382 let (padding_ctrl, padding_stream) = crate::client::circuit::padding::new_padding(
391 DynTimeProvider::new(self.runtime.clone()),
393 );
394 let ret: Result<_> = self
395 .circs
396 .add_origin_ent(&mut rng, created_sender, sender, padding_ctrl.clone())
397 .map(|id| (id, circ_unique_id, padding_ctrl, padding_stream));
398 let _ = tx.send(ret); self.update_disused_since();
400 }
401 CtrlMsg::ConfigUpdate(updates) => {
402 if self.link_protocol == 4 {
403 return Ok(());
407 }
408
409 let ChannelPaddingInstructionsUpdates {
410 padding_enable,
413 padding_parameters,
414 padding_negotiate,
415 } = &*updates;
416 if let Some(parameters) = padding_parameters {
417 self.padding_timer.as_mut().reconfigure(parameters)?;
418 }
419 if let Some(enable) = padding_enable {
420 if *enable {
421 self.padding_timer.as_mut().enable();
422 } else {
423 self.padding_timer.as_mut().disable();
424 }
425 }
426 if let Some(padding_negotiate) = padding_negotiate {
427 self.special_outgoing.padding_negotiate = Some(padding_negotiate.clone());
431 }
432 }
433 CtrlMsg::KistConfigUpdate(kist) => self.apply_kist_params(&kist),
434 #[cfg(feature = "circ-padding-manual")]
435 CtrlMsg::SetChannelPadder { padder, sender } => {
436 self.padding_ctrl
437 .install_padder_padding_at_hop(HopNum::from(0), padder);
438 let _ignore = sender.send(Ok(()));
439 }
440 }
441 Ok(())
442 }
443
444 #[cfg(not(feature = "circ-padding"))]
448 #[allow(clippy::unused_async)] async fn handle_padding_event(&mut self, action: PaddingEvent) -> Result<()> {
450 void::unreachable(action.0)
451 }
452
453 #[cfg(feature = "circ-padding")]
455 async fn handle_padding_event(&mut self, action: PaddingEvent) -> Result<()> {
456 use PaddingEvent as PE;
457 match action {
458 PE::SendPadding(send_padding) => {
459 self.handle_send_padding(send_padding).await?;
460 }
461 PE::StartBlocking(start_blocking) => {
462 if self.output.is_unlimited() {
463 self.output.set_blocked();
464 }
465 self.padding_blocker = Some(start_blocking);
466 }
467 PE::StopBlocking => {
468 self.output.set_unlimited();
469 }
470 }
471 Ok(())
472 }
473
474 #[cfg(feature = "circ-padding")]
476 async fn handle_send_padding(&mut self, padding: SendPadding) -> Result<()> {
477 use crate::client::circuit::padding::{Bypass::*, Replace::*};
482 let hop = HopNum::from(0);
484 assert_eq!(padding.hop, hop);
485
486 let blocking_bypassed = matches!(
488 (&self.padding_blocker, padding.may_bypass_block()),
489 (
490 Some(StartBlocking {
491 is_bypassable: true
492 }),
493 BypassBlocking
494 )
495 );
496 let this_padding_blocked = self.padding_blocker.is_some() && !blocking_bypassed;
498
499 if padding.may_replace_with_data() == Replaceable {
500 if self.output_is_full().await? {
501 self.padding_ctrl
509 .replaceable_padding_already_queued(hop, padding);
510 return Ok(());
511 } else if self.cells.approx_count() > 0 {
512 if this_padding_blocked {
514 self.padding_ctrl
516 .replaceable_padding_already_queued(hop, padding);
517 } else {
518 self.padding_ctrl.queued_data_as_padding(hop, padding);
521 if blocking_bypassed {
522 self.output.allow_n_additional_items(1);
523 }
524 }
525 return Ok(());
526 } else {
527 }
529 }
530
531 self.special_outgoing.queue_padding_cell();
533 self.padding_ctrl.queued_padding(hop, padding);
534 if blocking_bypassed {
535 self.output.allow_n_additional_items(1);
536 }
537
538 Ok(())
539 }
540
541 #[cfg(feature = "circ-padding")]
548 async fn output_is_full(&mut self) -> Result<bool> {
549 use futures::future::poll_fn;
550 use std::task::Poll;
551 poll_fn(|cx| {
553 Poll::Ready(match self.output.poll_ready_unpin(cx) {
554 Poll::Ready(Ok(())) => Ok(false),
556 Poll::Pending => Ok(true),
558 Poll::Ready(Err(e)) => Err(e),
560 })
561 })
562 .await
563 }
564
565 #[instrument(level = "trace", skip_all)]
568 async fn handle_cell(&mut self, cell: AnyChanCell) -> Result<()> {
569 let (circid, msg) = cell.into_circid_and_msg();
570 use AnyChanMsg::*;
571
572 match msg {
573 Relay(_) | Padding(_) | Vpadding(_) => {} _ => trace!(
575 channel_id = %self,
576 "received {} for {}",
577 msg.cmd(),
578 CircId::get_or_zero(circid)
579 ),
580 }
581
582 match msg {
584 Padding(_) | Vpadding(_) => {
585 let _always_acceptable = self.padding_ctrl.decrypted_padding(HopNum::from(0));
587 }
588 _ => self.padding_ctrl.decrypted_data(HopNum::from(0)),
589 }
590
591 match msg {
592 Relay(_) => self.deliver_relay(circid, msg).await,
594
595 #[cfg(feature = "relay")]
599 RelayEarly(_) if self.create_request_handler.is_some() => {
600 self.deliver_relay(circid, msg).await
601 }
602
603 Destroy(_) => self.deliver_destroy(circid, msg).await,
604
605 #[cfg(feature = "relay")]
612 CreateFast(msg) if self.create_request_handler.is_some() => {
613 self.handle_create(circid, CreateRequest::CreateFast(msg))
614 .await
615 }
616 #[cfg(feature = "relay")]
617 Create2(msg) if self.create_request_handler.is_some() => {
618 self.handle_create(circid, CreateRequest::Create2(msg))
619 .await
620 }
621
622 CreatedFast(_) | Created2(_) => self.deliver_created(circid, msg),
623
624 Padding(_) | Vpadding(_) => Ok(()),
626 _ => Err(Error::ChanProto(format!("Unexpected cell: {msg:?}"))),
627 }
628 }
629
630 async fn deliver_relay(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
632 let Some(circid) = circid else {
633 return Err(Error::ChanProto("Relay cell without circuit ID".into()));
634 };
635
636 let mut ent = self
637 .circs
638 .get_mut(circid)
639 .ok_or_else(|| Error::ChanProto("Relay cell on nonexistent circuit".into()))?;
640
641 match &mut *ent {
642 CircEnt::OpenOrigin { cell_sender: s, .. } => {
643 if s.send(msg).await.is_err() {
645 drop(ent);
646 self.outbound_destroy_circ(circid).await?;
648 }
649 Ok(())
650 }
651 #[cfg(feature = "relay")]
652 CircEnt::OpenRelay { cell_sender: s, .. } => {
653 if s.send(msg).await.is_err() {
655 drop(ent);
656 self.outbound_destroy_circ(circid).await?;
660 }
661 Ok(())
662 }
663 CircEnt::Opening { .. } => Err(Error::ChanProto(
664 "Relay cell on pending circuit before CREATED* received".into(),
665 )),
666 CircEnt::DestroySent(hs) => hs.receive_cell(),
667 }
668 }
669
670 #[cfg(feature = "relay")]
672 async fn handle_create(&mut self, circid: Option<CircId>, msg: CreateRequest) -> Result<()> {
673 let Some(ref create_request_handler) = self.create_request_handler else {
674 return Err(internal!("Called 'deliver_relay()', but handler isn't set").into());
676 };
677
678 let Some(circid) = circid else {
679 let err = format!("Received {} cell without circuit ID", msg.cmd());
680 return Err(Error::ChanProto(err));
681 };
682
683 let Some(chan) = create_request_handler.channel.upgrade() else {
684 let destroy = Destroy::new(DestroyReason::CHANNEL_CLOSED);
689 let destroy = AnyChanCell::new(Some(circid), destroy.into());
690
691 debug!(
692 "Unable to upgrade weak `Channel` while handling {}; sending {}",
693 msg.cmd(),
694 destroy.msg().cmd(),
695 );
696 return self.send_cell(destroy).await;
697 };
698
699 let circ_uniq_id = self.circ_unique_id_ctx.next(self.unique_id);
703
704 let create_result = create_request_handler.handler.handle_create(
706 &self.runtime,
707 &chan,
708 &create_request_handler.our_ed25519_id,
709 &create_request_handler.our_rsa_id,
710 circid,
711 &msg,
712 &self.details.memquota,
713 circ_uniq_id,
714 );
715
716 let response = match create_result {
718 Ok((response, components)) => {
719 let RelayCircComponents {
720 circ,
721 sender,
722 padding_ctrl,
723 } = components;
724
725 if let Err(reason) = self.circs.add_relay_ent(circid, circ, sender, padding_ctrl) {
726 debug!("Unable to add circuit map entry for incoming circuit: {reason}");
727 CreateResponse::Destroy(Destroy::new(reason))
728 } else {
729 response
730 }
731 }
732 Err(destroy) => CreateResponse::Destroy(destroy),
733 };
734
735 let response = AnyChanCell::new(Some(circid), response.into());
736 self.send_cell(response).await
737 }
738
739 fn deliver_created(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
742 let Some(circid) = circid else {
743 return Err(Error::ChanProto("'Created' cell without circuit ID".into()));
744 };
745
746 let target = self.circs.advance_from_opening(circid)?;
747 let created = msg.try_into()?;
748 target.send(created).map_err(|_| {
751 Error::from(internal!(
752 "Circuit queue rejected created message. Is it closing?"
753 ))
754 })
755 }
756
757 async fn deliver_destroy(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
760 let Some(circid) = circid else {
761 return Err(Error::ChanProto("'Destroy' cell without circuit ID".into()));
762 };
763
764 async fn send_destroy(mut sender: CircuitRxSender, msg: AnyChanMsg) -> Result<()> {
766 sender
767 .send(msg)
768 .await
769 .map_err(|_| internal!("open circuit wasn't interested in destroy cell?").into())
772 }
773
774 let entry = self.circs.remove(circid);
776 self.update_disused_since();
777 match entry {
778 Some(CircEnt::Opening {
781 create_response_sender,
782 ..
783 }) => {
784 trace!(channel_id = %self, "Passing destroy to pending circuit {}", circid);
785 create_response_sender
786 .send(msg.try_into()?)
787 .map_err(|_| {
790 internal!("pending circuit wasn't interested in destroy cell?").into()
791 })
792 }
793 Some(CircEnt::OpenOrigin { cell_sender, .. }) => {
795 trace!(channel_id = %self, "Passing destroy to open origin circuit {}", circid);
796 send_destroy(cell_sender, msg).await
797 }
798 #[cfg(feature = "relay")]
800 Some(CircEnt::OpenRelay { cell_sender, .. }) => {
801 trace!(channel_id = %self, "Passing destroy to open relay circuit {}", circid);
802 send_destroy(cell_sender, msg).await
803 }
804 Some(CircEnt::DestroySent(_)) => Ok(()),
806 None => {
808 trace!(channel_id = %self, "Destroy for nonexistent circuit {}", circid);
809 Err(Error::ChanProto("Destroy for nonexistent circuit".into()))
810 }
811 }
812 }
813
814 async fn send_cell(&mut self, cell: AnyChanCell) -> Result<()> {
816 self.output.send(cell).await?;
817 Ok(())
818 }
819
820 async fn outbound_destroy_circ(&mut self, id: CircId) -> Result<()> {
823 trace!(channel_id = %self, "Circuit {} is gone; sending DESTROY", id);
824 self.circs.destroy_sent(id, HalfCirc::new(3000));
829 self.update_disused_since();
830 let destroy = Destroy::new(DestroyReason::NONE).into();
831 let cell = AnyChanCell::new(Some(id), destroy);
832 self.send_cell(cell).await?;
833
834 Ok(())
835 }
836
837 fn update_disused_since(&self) {
839 if self.circs.open_ent_count() == 0 {
840 self.details.unused_since.update_if_none();
842 } else {
843 self.details.unused_since.clear();
845 }
846 }
847
848 #[cfg(target_os = "linux")]
850 fn apply_kist_params(&self, params: &KistParams) {
851 use super::kist::KistMode;
852
853 let set_tcp_notsent_lowat = |v: u32| {
854 if let Err(e) = self.streamops.set_tcp_notsent_lowat(v) {
855 error_report!(e, "Failed to set KIST socket options");
858 }
859 };
860
861 match params.kist_enabled() {
862 KistMode::TcpNotSentLowat => set_tcp_notsent_lowat(params.tcp_notsent_lowat()),
863 KistMode::Disabled => set_tcp_notsent_lowat(u32::MAX),
864 }
865 }
866
867 #[cfg(not(target_os = "linux"))]
869 fn apply_kist_params(&self, params: &KistParams) {
870 use super::kist::KistMode;
871
872 if params.kist_enabled() != KistMode::Disabled {
873 tracing::warn!("KIST not currently supported on non-linux platforms");
874 }
875 }
876}
877
878#[cfg(feature = "relay")]
881pub(super) struct CreateRequestHandlerAndData {
882 pub(super) handler: Arc<CreateRequestHandler>,
884 pub(super) channel: Weak<Channel>,
891 pub(super) our_ed25519_id: Ed25519Identity,
893 pub(super) our_rsa_id: RsaIdentity,
895}
896
897#[cfg(test)]
898pub(crate) mod test {
899 #![allow(clippy::unwrap_used)]
900 use super::*;
901 use crate::channel::{Canonicity, ChannelMode, ClosedUnexpectedly, UniqId};
902 use crate::client::circuit::CircParameters;
903 use crate::client::circuit::padding::new_padding;
904 use crate::fake_mpsc;
905 use crate::peer::{PeerAddr, PeerInfo};
906 use crate::util::{DummyTimeoutEstimator, fake_mq};
907 use futures::sink::SinkExt;
908 use futures::stream::StreamExt;
909 use tor_cell::chancell::msg;
910 use tor_linkspec::{OwnedChanTarget, RelayIdsBuilder};
911 use tor_rtcompat::SpawnExt;
912 use tor_rtcompat::{DynTimeProvider, NoOpStreamOpsHandle, Runtime};
913
914 pub(crate) type CodecResult = std::result::Result<AnyChanCell, Error>;
915
916 pub(crate) fn new_reactor<R: Runtime>(
917 runtime: R,
918 ) -> (
919 Arc<crate::channel::Channel>,
920 Reactor<R>,
921 mpsc::Receiver<AnyChanCell>,
922 mpsc::Sender<CodecResult>,
923 ) {
924 let link_protocol = 4;
925 let (send1, recv1) = mpsc::channel(32);
926 let (send2, recv2) = mpsc::channel(32);
927 let unique_id = UniqId::new();
928 let ed = [6; 32].into();
929 let rsa = [10; 20].into();
930 let dummy_target = OwnedChanTarget::builder()
931 .ed_identity(ed)
932 .rsa_identity(rsa)
933 .build()
934 .unwrap();
935 let mut peer_ids = RelayIdsBuilder::default();
936 peer_ids.ed_identity(ed);
937 peer_ids.rsa_identity(rsa);
938 let peer_info = PeerInfo::new(PeerAddr::UNSPECIFIED, peer_ids.build().unwrap());
939 let send1 = send1.sink_map_err(|e| {
940 trace!("got sink error: {:?}", e);
941 Error::CellDecodeErr {
942 object: "reactor test",
943 err: tor_cell::Error::ChanProto("dummy message".into()),
944 }
945 });
946 let stream_ops = NoOpStreamOpsHandle::default();
947 let (chan, reactor) = crate::channel::Channel::new(
948 ChannelMode::Client,
949 link_protocol,
950 Box::new(send1),
951 Box::new(recv2),
952 Box::new(stream_ops),
953 unique_id,
954 dummy_target,
955 safelog::MaybeSensitive::not_sensitive(peer_info),
956 crate::ClockSkew::None,
957 runtime,
958 fake_mq(),
959 Canonicity::new_canonical(),
960 )
961 .expect("channel create failed");
962 (chan, reactor, recv1, send2)
963 }
964
965 #[test]
967 fn shutdown() {
968 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
969 let (chan, mut reactor, _output, _input) = new_reactor(rt);
970
971 chan.terminate();
972 let r = reactor.run_once().await;
973 assert!(matches!(r, Err(ReactorError::Shutdown)));
974 });
975 }
976
977 #[test]
979 fn shutdown2() {
980 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
981 use futures::future::FutureExt;
984 use futures::join;
985
986 let (chan, reactor, _output, _input) = new_reactor(rt);
987 let run_reactor = reactor.run().map(|x| x.is_ok()).shared();
989
990 let rr = run_reactor.clone();
991
992 let exit_then_check = async {
993 assert!(rr.peek().is_none());
994 chan.terminate();
996 };
997
998 let (rr_s, _) = join!(run_reactor, exit_then_check);
999
1000 assert!(rr_s);
1002 });
1003 }
1004
1005 #[test]
1006 fn new_circ_closed() {
1007 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1008 let (chan, mut reactor, mut output, _input) = new_reactor(rt.clone());
1009 assert!(chan.duration_unused().is_some()); let (ret, reac) = futures::join!(
1012 chan.new_tunnel(Arc::new(DummyTimeoutEstimator)),
1013 reactor.run_once()
1014 );
1015 let (pending, circr) = ret.unwrap();
1016 rt.spawn(async {
1017 let _ignore = circr.run().await;
1018 })
1019 .unwrap();
1020 assert!(reac.is_ok());
1021
1022 let id = pending.peek_circid();
1023
1024 let ent = reactor.circs.get_mut(id);
1025 assert!(matches!(*ent.unwrap(), CircEnt::Opening { .. }));
1026 assert!(chan.duration_unused().is_none()); drop(pending);
1031
1032 reactor.run_once().await.unwrap();
1033 let ent = reactor.circs.get_mut(id);
1034 assert!(matches!(*ent.unwrap(), CircEnt::DestroySent(_)));
1035 let cell = output.next().await.unwrap();
1036 assert_eq!(cell.circid(), Some(id));
1037 assert!(matches!(cell.msg(), AnyChanMsg::Destroy(_)));
1038 assert!(chan.duration_unused().is_some()); });
1040 }
1041
1042 #[test]
1044 #[ignore] fn new_circ_create_failure() {
1046 use std::time::Duration;
1047 use tor_rtcompat::SleepProvider;
1048
1049 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1050 let (chan, mut reactor, mut output, mut input) = new_reactor(rt.clone());
1051
1052 let (ret, reac) = futures::join!(
1053 chan.new_tunnel(Arc::new(DummyTimeoutEstimator)),
1054 reactor.run_once()
1055 );
1056 let (pending, circr) = ret.unwrap();
1057 rt.spawn(async {
1058 let _ignore = circr.run().await;
1059 })
1060 .unwrap();
1061 assert!(reac.is_ok());
1062
1063 let circparams = CircParameters::default();
1064
1065 let id = pending.peek_circid();
1066
1067 let ent = reactor.circs.get_mut(id);
1068 assert!(matches!(*ent.unwrap(), CircEnt::Opening { .. }));
1069
1070 #[allow(clippy::clone_on_copy)]
1071 let rtc = rt.clone();
1072 let send_response = async {
1073 rtc.sleep(Duration::from_millis(100)).await;
1074 trace!("sending createdfast");
1075 let created_cell = AnyChanCell::new(Some(id), msg::CreatedFast::new(*b"x").into());
1077 input.send(Ok(created_cell)).await.unwrap();
1078 reactor.run_once().await.unwrap();
1079 };
1080
1081 let (circ, _) = futures::join!(pending.create_firsthop_fast(circparams), send_response);
1082 assert!(matches!(circ.err().unwrap(), Error::BadCircHandshakeAuth));
1084
1085 reactor.run_once().await.unwrap();
1086
1087 let cell_sent = output.next().await.unwrap();
1089 assert!(matches!(cell_sent.msg(), msg::AnyChanMsg::CreateFast(_)));
1090
1091 let ent = reactor.circs.get_mut(id);
1093 assert!(matches!(*ent.unwrap(), CircEnt::DestroySent(_)));
1094 });
1095 }
1096
1097 #[test]
1099 fn bad_cells() {
1100 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1101 let (_chan, mut reactor, _output, mut input) = new_reactor(rt);
1102
1103 let created2_cell = msg::Created2::new(*b"hihi").into();
1105 input
1106 .send(Ok(AnyChanCell::new(CircId::new(7), created2_cell)))
1107 .await
1108 .unwrap();
1109
1110 let e = reactor.run_once().await.unwrap_err().unwrap_err();
1111 assert_eq!(
1112 format!("{}", e),
1113 "Channel protocol violation: Unexpected CREATED* cell not on opening circuit"
1114 );
1115
1116 let relay_cell = msg::Relay::new(b"abc").into();
1118 input
1119 .send(Ok(AnyChanCell::new(CircId::new(4), relay_cell)))
1120 .await
1121 .unwrap();
1122 let e = reactor.run_once().await.unwrap_err().unwrap_err();
1123 assert_eq!(
1124 format!("{}", e),
1125 "Channel protocol violation: Relay cell on nonexistent circuit"
1126 );
1127
1128 });
1132 }
1133
1134 #[test]
1135 fn deliver_relay() {
1136 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1137 use oneshot_fused_workaround as oneshot;
1138
1139 let (_chan, mut reactor, _output, mut input) = new_reactor(rt.clone());
1140
1141 let (padding_ctrl, _padding_stream) = new_padding(DynTimeProvider::new(rt));
1142
1143 let (_circ_stream_7, mut circ_stream_13) = {
1144 let (snd1, _rcv1) = oneshot::channel();
1145 let (snd2, rcv2) = fake_mpsc(64);
1146 reactor.circs.put_unchecked(
1147 CircId::new(7).unwrap(),
1148 CircEnt::Opening {
1149 create_response_sender: snd1,
1150 cell_sender: snd2,
1151 padding_ctrl: padding_ctrl.clone(),
1152 },
1153 );
1154
1155 let (snd3, rcv3) = fake_mpsc(64);
1156 reactor.circs.put_unchecked(
1157 CircId::new(13).unwrap(),
1158 CircEnt::OpenOrigin {
1159 cell_sender: snd3,
1160 padding_ctrl,
1161 },
1162 );
1163
1164 reactor.circs.put_unchecked(
1165 CircId::new(23).unwrap(),
1166 CircEnt::DestroySent(HalfCirc::new(25)),
1167 );
1168 (rcv2, rcv3)
1169 };
1170
1171 let relaycell: AnyChanMsg = msg::Relay::new(b"do you suppose").into();
1174 input
1175 .send(Ok(AnyChanCell::new(CircId::new(13), relaycell.clone())))
1176 .await
1177 .unwrap();
1178 reactor.run_once().await.unwrap();
1179 let got = circ_stream_13.next().await.unwrap();
1180 assert!(matches!(got, AnyChanMsg::Relay(_)));
1181
1182 input
1184 .send(Ok(AnyChanCell::new(CircId::new(7), relaycell.clone())))
1185 .await
1186 .unwrap();
1187 let e = reactor.run_once().await.unwrap_err().unwrap_err();
1188 assert_eq!(
1189 format!("{}", e),
1190 "Channel protocol violation: Relay cell on pending circuit before CREATED* received"
1191 );
1192
1193 input
1195 .send(Ok(AnyChanCell::new(CircId::new(101), relaycell.clone())))
1196 .await
1197 .unwrap();
1198 let e = reactor.run_once().await.unwrap_err().unwrap_err();
1199 assert_eq!(
1200 format!("{}", e),
1201 "Channel protocol violation: Relay cell on nonexistent circuit"
1202 );
1203
1204 for _ in 0..25 {
1209 input
1210 .send(Ok(AnyChanCell::new(CircId::new(23), relaycell.clone())))
1211 .await
1212 .unwrap();
1213 reactor.run_once().await.unwrap(); }
1215
1216 input
1218 .send(Ok(AnyChanCell::new(CircId::new(23), relaycell.clone())))
1219 .await
1220 .unwrap();
1221 let e = reactor.run_once().await.unwrap_err().unwrap_err();
1222 assert_eq!(
1223 format!("{}", e),
1224 "Channel protocol violation: Too many cells received on destroyed circuit"
1225 );
1226 });
1227 }
1228
1229 #[test]
1230 fn deliver_destroy() {
1231 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1232 use crate::circuit::celltypes::*;
1233 use oneshot_fused_workaround as oneshot;
1234
1235 let (_chan, mut reactor, _output, mut input) = new_reactor(rt.clone());
1236
1237 let (padding_ctrl, _padding_stream) = new_padding(DynTimeProvider::new(rt));
1238
1239 let (circ_oneshot_7, mut circ_stream_13) = {
1240 let (snd1, rcv1) = oneshot::channel();
1241 let (snd2, _rcv2) = fake_mpsc(64);
1242 reactor.circs.put_unchecked(
1243 CircId::new(7).unwrap(),
1244 CircEnt::Opening {
1245 create_response_sender: snd1,
1246 cell_sender: snd2,
1247 padding_ctrl: padding_ctrl.clone(),
1248 },
1249 );
1250
1251 let (snd3, rcv3) = fake_mpsc(64);
1252 reactor.circs.put_unchecked(
1253 CircId::new(13).unwrap(),
1254 CircEnt::OpenOrigin {
1255 cell_sender: snd3,
1256 padding_ctrl: padding_ctrl.clone(),
1257 },
1258 );
1259
1260 reactor.circs.put_unchecked(
1261 CircId::new(23).unwrap(),
1262 CircEnt::DestroySent(HalfCirc::new(25)),
1263 );
1264 (rcv1, rcv3)
1265 };
1266
1267 let destroycell: AnyChanMsg = msg::Destroy::new(0.into()).into();
1269 input
1270 .send(Ok(AnyChanCell::new(CircId::new(7), destroycell.clone())))
1271 .await
1272 .unwrap();
1273 reactor.run_once().await.unwrap();
1274 let msg = circ_oneshot_7.await;
1275 assert!(matches!(msg, Ok(CreateResponse::Destroy(_))));
1276
1277 input
1279 .send(Ok(AnyChanCell::new(CircId::new(13), destroycell.clone())))
1280 .await
1281 .unwrap();
1282 reactor.run_once().await.unwrap();
1283 let msg = circ_stream_13.next().await.unwrap();
1284 assert!(matches!(msg, AnyChanMsg::Destroy(_)));
1285
1286 input
1288 .send(Ok(AnyChanCell::new(CircId::new(23), destroycell.clone())))
1289 .await
1290 .unwrap();
1291 reactor.run_once().await.unwrap();
1292
1293 input
1295 .send(Ok(AnyChanCell::new(CircId::new(101), destroycell.clone())))
1296 .await
1297 .unwrap();
1298 let e = reactor.run_once().await.unwrap_err().unwrap_err();
1299 assert_eq!(
1300 format!("{}", e),
1301 "Channel protocol violation: Destroy for nonexistent circuit"
1302 );
1303 });
1304 }
1305
1306 #[test]
1307 fn closing_if_reactor_dropped() {
1308 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1309 let (chan, reactor, _output, _input) = new_reactor(rt);
1310
1311 assert!(!chan.is_closing());
1312 drop(reactor);
1313 assert!(chan.is_closing());
1314
1315 assert!(matches!(
1316 chan.wait_for_close().await,
1317 Err(ClosedUnexpectedly::ReactorDropped),
1318 ));
1319 });
1320 }
1321
1322 #[test]
1323 fn closing_if_reactor_shutdown() {
1324 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1325 let (chan, reactor, _output, _input) = new_reactor(rt);
1326
1327 assert!(!chan.is_closing());
1328 chan.terminate();
1329 assert!(!chan.is_closing());
1330
1331 let r = reactor.run().await;
1332 assert!(r.is_ok());
1333 assert!(chan.is_closing());
1334
1335 assert!(chan.wait_for_close().await.is_ok());
1336 });
1337 }
1338
1339 #[test]
1340 fn reactor_error_wait_for_close() {
1341 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1342 let (chan, reactor, _output, mut input) = new_reactor(rt);
1343
1344 let created2_cell = msg::Created2::new(*b"hihi").into();
1346 input
1347 .send(Ok(AnyChanCell::new(CircId::new(7), created2_cell)))
1348 .await
1349 .unwrap();
1350
1351 let run_error = reactor.run().await.unwrap_err();
1353
1354 let Err(ClosedUnexpectedly::ReactorError(wait_error)) = chan.wait_for_close().await
1356 else {
1357 panic!("Expected a 'ReactorError'");
1358 };
1359
1360 assert_eq!(run_error.to_string(), wait_error.to_string());
1362 });
1363 }
1364}