1pub(crate) mod circhop;
4pub(super) mod extender;
5
6use crate::channel::Channel;
7use crate::circuit::cell_sender::CircuitCellSender;
8use crate::circuit::celltypes::CreateResponse;
9use crate::circuit::circhop::HopSettings;
10use crate::circuit::create::{Create2Wrap, CreateFastWrap, CreateHandshakeWrap};
11use crate::circuit::padding::CircPaddingDisposition;
12use crate::circuit::{CircuitRxReceiver, UniqId};
13use crate::client::circuit::handshake::{BoxedClientLayer, HandshakeRole};
14use crate::client::circuit::padding::{
15 self, PaddingController, PaddingEventStream, QueuedCellPaddingInfo,
16};
17use crate::client::circuit::{ClientCircChanMsg, MutableState, path};
18use crate::client::reactor::MetaCellDisposition;
19use crate::congestion::CongestionSignals;
20use crate::congestion::sendme;
21use crate::crypto::binding::CircuitBinding;
22use crate::crypto::cell::{
23 HopNum, InboundClientCrypt, InboundClientLayer, OutboundClientCrypt, OutboundClientLayer,
24 RelayCellBody,
25};
26use crate::crypto::handshake::fast::CreateFastClient;
27use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
28use crate::crypto::handshake::ntor_v3::{NtorV3Client, NtorV3PublicKey};
29use crate::crypto::handshake::{ClientHandshake, KeyGenerator};
30use crate::memquota::{CircuitAccount, SpecificAccount as _, StreamAccount};
31use crate::stream::cmdcheck::{AnyCmdChecker, StreamStatus};
32use crate::stream::flow_ctrl::state::StreamRateLimit;
33use crate::stream::flow_ctrl::xon_xoff::reader::DrainRateRequest;
34use crate::stream::queue::{StreamQueueSender, stream_queue};
35use crate::stream::{StreamMpscReceiver, msg_streamid};
36use crate::streammap;
37use crate::tunnel::TunnelScopedCircId;
38use crate::util::err::ReactorError;
39use crate::util::notify::NotifySender;
40use crate::util::timeout::TimeoutEstimator;
41use crate::{ClockSkew, Error, Result};
42
43use tor_async_utils::{SinkTrySend as _, SinkTrySendError as _};
44use tor_cell::chancell::msg::{AnyChanMsg, HandshakeType, Relay};
45use tor_cell::chancell::{AnyChanCell, ChanCmd, CircId};
46use tor_cell::chancell::{BoxedCellBody, ChanMsg};
47use tor_cell::relaycell::msg::{AnyRelayMsg, End, Sendme, SendmeTag, Truncated};
48use tor_cell::relaycell::{
49 AnyRelayMsgOuter, RelayCellDecoderResult, RelayCellFormat, RelayCmd, StreamId, UnparsedRelayMsg,
50};
51use tor_error::{Bug, internal};
52use tor_linkspec::RelayIds;
53use tor_llcrypto::pk;
54use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
55use web_time_compat::{Duration, Instant, SystemTime};
56
57use futures::SinkExt as _;
58use oneshot_fused_workaround as oneshot;
59use postage::watch;
60use tor_rtcompat::{DynTimeProvider, SleepProvider as _};
61use tracing::{debug, instrument, trace, warn};
62
63use super::{
64 CellHandlers, CircuitHandshake, CloseStreamBehavior, ReactorResultChannel, SendRelayCell,
65};
66
67use crate::conflux::msghandler::ConfluxStatus;
68
69use std::borrow::Borrow;
70use std::pin::Pin;
71use std::result::Result as StdResult;
72use std::sync::Arc;
73
74use extender::HandshakeAuxDataHandler;
75
76#[cfg(feature = "hs-service")]
77use {
78 crate::circuit::CircHopSyncView,
79 crate::client::stream::{InboundDataCmdChecker, IncomingStreamRequest},
80 tor_cell::relaycell::msg::Begin,
81};
82
83#[cfg(feature = "conflux")]
84use {
85 crate::conflux::msghandler::{ConfluxAction, ConfluxCmd, ConfluxMsgHandler, OooRelayMsg},
86 crate::tunnel::TunnelId,
87};
88
89#[cfg(not(feature = "flowctl-cc"))]
90use crate::stream::STREAM_READER_BUFFER;
91
92pub(super) use circhop::{CircHop, CircHopList};
93
94pub(crate) struct Circuit {
99 runtime: DynTimeProvider,
101 channel: Arc<Channel>,
103 pub(super) chan_sender: CircuitCellSender,
108 pub(super) input: CircuitRxReceiver,
113 crypto_in: InboundClientCrypt,
117 crypto_out: OutboundClientCrypt,
119 pub(super) hops: CircHopList,
121 mutable: Arc<MutableState>,
124 channel_id: CircId,
126 unique_id: TunnelScopedCircId,
128 #[cfg(feature = "conflux")]
133 conflux_handler: Option<ConfluxMsgHandler>,
134 padding_ctrl: PaddingController,
136 pub(super) padding_event_stream: PaddingEventStream,
144 #[cfg(feature = "circ-padding")]
146 padding_block: Option<padding::StartBlocking>,
147 timeouts: Arc<dyn TimeoutEstimator>,
151 #[allow(dead_code)] memquota: CircuitAccount,
154}
155
156#[derive(Debug, derive_more::From)]
164pub(super) enum CircuitCmd {
165 Send(SendRelayCell),
167 HandleSendMe {
169 hop: HopNum,
171 sendme: Sendme,
173 },
174 CloseStream {
176 hop: HopNum,
178 sid: StreamId,
180 behav: CloseStreamBehavior,
182 reason: streammap::TerminateReason,
184 },
185 #[cfg(feature = "conflux")]
187 Conflux(ConfluxCmd),
188 CleanShutdown,
190 #[cfg(feature = "conflux")]
192 Enqueue(OooRelayMsg),
193}
194
/// Build an `Err(Error::CircProto(..))` describing an unexpected cell.
///
/// * `unsupported_client_cell!(msg)` reports only the command of `msg`.
/// * `unsupported_client_cell!(msg, hopnum)` also names the originating hop.
///
/// The `@` rule is an internal helper shared by the two public forms.
macro_rules! unsupported_client_cell {
    ($cell:expr) => {{
        unsupported_client_cell!(@ $cell, "")
    }};

    ($cell:expr, $from_hop:expr) => {{
        // The annotation forces the second argument to be a `HopNum`.
        let from_hop: HopNum = $from_hop;
        let suffix = format!(" from hop {}", from_hop.display());
        unsupported_client_cell!(@ $cell, suffix)
    }};

    (@ $cell:expr, $suffix:expr) => {
        Err(crate::Error::CircProto(format!(
            "Unexpected {} cell{} on client circuit",
            $cell.cmd(),
            $suffix,
        )))
    };
}
220
221pub(super) use unsupported_client_cell;
222
223impl Circuit {
224 #[allow(clippy::too_many_arguments)]
226 pub(super) fn new(
227 runtime: DynTimeProvider,
228 channel: Arc<Channel>,
229 channel_id: CircId,
230 unique_id: TunnelScopedCircId,
231 input: CircuitRxReceiver,
232 memquota: CircuitAccount,
233 mutable: Arc<MutableState>,
234 padding_ctrl: PaddingController,
235 padding_event_stream: PaddingEventStream,
236 timeouts: Arc<dyn TimeoutEstimator>,
237 ) -> Self {
238 let chan_sender = CircuitCellSender::from_channel_sender(channel.sender());
239
240 let crypto_out = OutboundClientCrypt::new();
241 Circuit {
242 runtime,
243 channel,
244 chan_sender,
245 input,
246 crypto_in: InboundClientCrypt::new(),
247 hops: CircHopList::default(),
248 unique_id,
249 channel_id,
250 crypto_out,
251 mutable,
252 #[cfg(feature = "conflux")]
253 conflux_handler: None,
254 padding_ctrl,
255 padding_event_stream,
256 #[cfg(feature = "circ-padding")]
257 padding_block: None,
258 timeouts,
259 memquota,
260 }
261 }
262
263 pub(super) fn unique_id(&self) -> UniqId {
265 self.unique_id.unique_id()
266 }
267
268 pub(super) fn mutable(&self) -> &Arc<MutableState> {
270 &self.mutable
271 }
272
273 #[cfg(feature = "conflux")]
278 pub(super) fn add_to_conflux_tunnel(
279 &mut self,
280 tunnel_id: TunnelId,
281 conflux_handler: ConfluxMsgHandler,
282 ) {
283 self.unique_id = TunnelScopedCircId::new(tunnel_id, self.unique_id.unique_id());
284 self.conflux_handler = Some(conflux_handler);
285 }
286
287 #[cfg(feature = "conflux")]
292 pub(super) async fn begin_conflux_link(
293 &mut self,
294 hop: HopNum,
295 cell: AnyRelayMsgOuter,
296 runtime: &tor_rtcompat::DynTimeProvider,
297 ) -> Result<()> {
298 use tor_rtcompat::SleepProvider as _;
299
300 if self.conflux_handler.is_none() {
301 return Err(internal!(
302 "tried to send LINK cell before installing a ConfluxMsgHandler?!"
303 )
304 .into());
305 }
306
307 let cell = SendRelayCell {
308 hop: Some(hop),
309 early: false,
310 cell,
311 };
312 self.send_relay_cell(cell).await?;
313
314 let Some(conflux_handler) = self.conflux_handler.as_mut() else {
315 return Err(internal!("ConfluxMsgHandler disappeared?!").into());
316 };
317
318 Ok(conflux_handler.note_link_sent(runtime.wallclock())?)
319 }
320
321 pub(super) fn conflux_hs_timeout(&self) -> Option<SystemTime> {
325 cfg_if::cfg_if! {
326 if #[cfg(feature = "conflux")] {
327 self.conflux_handler.as_ref().map(|handler| handler.handshake_timeout())?
328 } else {
329 None
330 }
331 }
332 }
333
334 #[cfg(test)]
336 pub(super) fn handle_add_fake_hop(
337 &mut self,
338 format: RelayCellFormat,
339 fwd_lasthop: bool,
340 rev_lasthop: bool,
341 dummy_peer_id: path::HopDetail,
342 params: &crate::client::circuit::CircParameters,
346 done: ReactorResultChannel<()>,
347 ) {
348 use tor_protover::{Protocols, named};
349
350 use crate::client::circuit::test::DummyCrypto;
351
352 assert!(matches!(format, RelayCellFormat::V0));
353 let _ = format; let fwd = Box::new(DummyCrypto::new(fwd_lasthop));
356 let rev = Box::new(DummyCrypto::new(rev_lasthop));
357 let binding = None;
358
359 let settings = HopSettings::from_params_and_caps(
360 crate::circuit::circhop::HopNegotiationType::Full,
362 params,
363 &[named::FLOWCTRL_CC].into_iter().collect::<Protocols>(),
364 )
365 .expect("Can't construct HopSettings");
366 self.add_hop(dummy_peer_id, fwd, rev, binding, &settings)
367 .expect("could not add hop to circuit");
368 let _ = done.send(Ok(()));
369 }
370
371 fn encode_relay_cell(
375 crypto_out: &mut OutboundClientCrypt,
376 relay_format: RelayCellFormat,
377 hop: HopNum,
378 early: bool,
379 msg: AnyRelayMsgOuter,
380 ) -> Result<(AnyChanMsg, SendmeTag)> {
381 let mut body: RelayCellBody = msg
382 .encode(relay_format, &mut rand::rng())
383 .map_err(|e| Error::from_cell_enc(e, "relay cell body"))?
384 .into();
385 let cmd = if early {
386 ChanCmd::RELAY_EARLY
387 } else {
388 ChanCmd::RELAY
389 };
390 let tag = crypto_out.encrypt(cmd, &mut body, hop)?;
391 let msg = Relay::from(BoxedCellBody::from(body));
392 let msg = if early {
393 AnyChanMsg::RelayEarly(msg.into())
394 } else {
395 AnyChanMsg::Relay(msg)
396 };
397
398 Ok((msg, tag))
399 }
400
401 #[instrument(level = "trace", skip_all)]
412 pub(super) async fn send_relay_cell(&mut self, msg: SendRelayCell) -> Result<()> {
413 self.send_relay_cell_inner(msg, None).await
414 }
415
416 #[instrument(level = "trace", skip_all)]
422 async fn send_relay_cell_inner(
423 &mut self,
424 msg: SendRelayCell,
425 padding_info: Option<QueuedCellPaddingInfo>,
426 ) -> Result<()> {
427 let SendRelayCell {
428 hop,
429 early,
430 cell: msg,
431 } = msg;
432
433 let is_conflux_link = msg.cmd() == RelayCmd::CONFLUX_LINK;
434 if !is_conflux_link && self.is_conflux_pending() {
435 return Err(internal!("tried to send cell on unlinked circuit").into());
438 }
439
440 trace!(circ_id = %self.unique_id, cell = ?msg, "sending relay cell");
441
442 let runtime = self.runtime.clone();
444 let c_t_w = sendme::cmd_counts_towards_windows(msg.cmd());
445 let stream_id = msg.stream_id();
446 let hop = hop.expect("missing hop in client SendRelayCell?!");
447 let circhop = self.hops.get_mut(hop).ok_or(Error::NoSuchHop)?;
448
449 circhop.decrement_outbound_cell_limit()?;
454
455 if c_t_w {
457 if let Some(stream_id) = stream_id {
458 circhop.about_to_send(stream_id, msg.msg())?;
459 }
460 }
461
462 let relay_cmd = msg.cmd();
466
467 let (msg, tag) = Self::encode_relay_cell(
470 &mut self.crypto_out,
471 circhop.relay_cell_format(),
472 hop,
473 early,
474 msg,
475 )?;
476 if c_t_w {
479 circhop.ccontrol().note_data_sent(&runtime, &tag)?;
480 }
481
482 let padding_info = padding_info.or_else(|| self.padding_ctrl.queued_data(hop));
484
485 self.send_msg(msg, padding_info).await?;
486
487 #[cfg(feature = "conflux")]
488 if let Some(conflux) = self.conflux_handler.as_mut() {
489 conflux.note_cell_sent(relay_cmd);
490 }
491
492 Ok(())
493 }
494
495 pub(super) fn handle_cell(
508 &mut self,
509 handlers: &mut CellHandlers,
510 leg: UniqId,
511 cell: ClientCircChanMsg,
512 ) -> Result<Vec<CircuitCmd>> {
513 trace!(circ_id = %self.unique_id, cell = ?cell, "handling cell");
514 use ClientCircChanMsg::*;
515 match cell {
516 Relay(r) => self.handle_relay_cell(handlers, leg, r),
517 Destroy(d) => {
518 let reason = d.reason();
519 debug!(
520 circ_id = %self.unique_id,
521 "Received DESTROY cell. Reason: {} [{}]",
522 reason.human_str(),
523 reason
524 );
525
526 self.handle_destroy_cell().map(|c| vec![c])
527 }
528 }
529 }
530
531 fn decode_relay_cell(
534 &mut self,
535 cell: Relay,
536 ) -> Result<(HopNum, SendmeTag, RelayCellDecoderResult)> {
537 let cmd = cell.cmd();
539 let mut body = cell.into_relay_body().into();
540
541 let (hopnum, tag) = self.crypto_in.decrypt(cmd, &mut body)?;
544
545 let decode_res = self
547 .hop_mut(hopnum)
548 .ok_or_else(|| {
549 Error::from(internal!(
550 "Trying to decode cell from nonexistent hop {:?}",
551 hopnum
552 ))
553 })?
554 .decode(body.into())?;
555
556 Ok((hopnum, tag, decode_res))
557 }
558
559 fn handle_relay_cell(
561 &mut self,
562 handlers: &mut CellHandlers,
563 leg: UniqId,
564 cell: Relay,
565 ) -> Result<Vec<CircuitCmd>> {
566 let (hopnum, tag, decode_res) = self.decode_relay_cell(cell)?;
567
568 if decode_res.is_padding() {
569 self.padding_ctrl.decrypted_padding(hopnum)?;
570 } else {
571 self.padding_ctrl.decrypted_data(hopnum);
572 }
573
574 self.hop_mut(hopnum)
576 .ok_or_else(|| internal!("nonexistent hop {:?}", hopnum))?
577 .decrement_inbound_cell_limit()?;
578
579 let c_t_w = decode_res.cmds().any(sendme::cmd_counts_towards_windows);
580
581 let send_circ_sendme = if c_t_w {
584 self.hop_mut(hopnum)
585 .ok_or_else(|| Error::CircProto("Sendme from nonexistent hop".into()))?
586 .ccontrol()
587 .note_data_received()?
588 } else {
589 false
590 };
591
592 let mut circ_cmds = vec![];
593 if send_circ_sendme {
595 let sendme = Sendme::from(tag);
600 let cell = AnyRelayMsgOuter::new(None, sendme.into());
601 circ_cmds.push(CircuitCmd::Send(SendRelayCell {
602 hop: Some(hopnum),
603 early: false,
604 cell,
605 }));
606
607 self.hop_mut(hopnum)
609 .ok_or_else(|| {
610 Error::from(internal!(
611 "Trying to send SENDME to nonexistent hop {:?}",
612 hopnum
613 ))
614 })?
615 .ccontrol()
616 .note_sendme_sent()?;
617 }
618
619 let (mut msgs, incomplete) = decode_res.into_parts();
620 while let Some(msg) = msgs.next() {
621 let msg_status = self.handle_relay_msg(handlers, hopnum, leg, c_t_w, msg)?;
622
623 match msg_status {
624 None => continue,
625 Some(msg @ CircuitCmd::CleanShutdown) => {
626 for m in msgs {
627 debug!(
628 "{id}: Ignoring relay msg received after triggering shutdown: {m:?}",
629 id = self.unique_id
630 );
631 }
632 if let Some(incomplete) = incomplete {
633 debug!(
634 "{id}: Ignoring partial relay msg received after triggering shutdown: {:?}",
635 incomplete,
636 id = self.unique_id,
637 );
638 }
639 circ_cmds.push(msg);
640 return Ok(circ_cmds);
641 }
642 Some(msg) => {
643 circ_cmds.push(msg);
644 }
645 }
646 }
647
648 Ok(circ_cmds)
649 }
650
651 fn handle_relay_msg(
653 &mut self,
654 handlers: &mut CellHandlers,
655 hopnum: HopNum,
656 leg: UniqId,
657 cell_counts_toward_windows: bool,
658 msg: UnparsedRelayMsg,
659 ) -> Result<Option<CircuitCmd>> {
660 let streamid = msg_streamid(&msg)?;
663
664 let Some(streamid) = streamid else {
667 return self.handle_meta_cell(handlers, hopnum, msg);
668 };
669
670 #[cfg(feature = "conflux")]
671 let msg = if let Some(conflux) = self.conflux_handler.as_mut() {
672 match conflux.action_for_msg(hopnum, cell_counts_toward_windows, streamid, msg)? {
673 ConfluxAction::Deliver(msg) => {
674 msg
681 }
682 ConfluxAction::Enqueue(msg) => {
683 return Ok(Some(CircuitCmd::Enqueue(msg)));
685 }
686 }
687 } else {
688 msg
691 };
692
693 self.handle_in_order_relay_msg(
694 handlers,
695 hopnum,
696 leg,
697 cell_counts_toward_windows,
698 streamid,
699 msg,
700 )
701 }
702
703 pub(super) fn handle_in_order_relay_msg(
705 &mut self,
706 handlers: &mut CellHandlers,
707 hopnum: HopNum,
708 leg: UniqId,
709 cell_counts_toward_windows: bool,
710 streamid: StreamId,
711 msg: UnparsedRelayMsg,
712 ) -> Result<Option<CircuitCmd>> {
713 let now = self.runtime.now();
714
715 #[cfg(feature = "conflux")]
716 if let Some(conflux) = self.conflux_handler.as_mut() {
717 conflux.inc_last_seq_delivered(&msg);
718 }
719
720 let path = self.mutable.path();
721
722 let nonexistent_hop_err = || Error::CircProto("Cell from nonexistent hop!".into());
723 let hop = self.hop_mut(hopnum).ok_or_else(nonexistent_hop_err)?;
724
725 let hop_detail = path
726 .iter()
727 .nth(usize::from(hopnum))
728 .ok_or_else(nonexistent_hop_err)?;
729
730 let res = hop.handle_msg(hop_detail, cell_counts_toward_windows, streamid, msg, now)?;
733
734 if let Some(msg) = res {
737 cfg_if::cfg_if! {
738 if #[cfg(feature = "hs-service")] {
739 return self.handle_incoming_stream_request(handlers, msg, streamid, hopnum, leg);
740 } else {
741 return Err(internal!("incoming stream not rejected, but hs-service feature is disabled?!").into());
742 }
743 }
744 }
745
746 if let Some(cell) = hop.maybe_send_xoff(streamid)? {
748 let cell = AnyRelayMsgOuter::new(Some(streamid), cell.into());
749 let cell = SendRelayCell {
750 hop: Some(hopnum),
751 early: false,
752 cell,
753 };
754 return Ok(Some(CircuitCmd::Send(cell)));
755 }
756
757 Ok(None)
758 }
759
760 #[cfg(feature = "conflux")]
768 fn handle_conflux_msg(
769 &mut self,
770 hop: HopNum,
771 msg: UnparsedRelayMsg,
772 ) -> Result<Option<ConfluxCmd>> {
773 let Some(conflux_handler) = self.conflux_handler.as_mut() else {
774 return Err(Error::CircProto(format!(
777 "Received {} cell from hop {} on non-conflux client circuit?!",
778 msg.cmd(),
779 hop.display(),
780 )));
781 };
782
783 Ok(conflux_handler.handle_conflux_msg(msg, hop))
784 }
785
786 #[cfg(feature = "conflux")]
790 pub(super) fn last_seq_sent(&self) -> Result<u64> {
791 let handler = self
792 .conflux_handler
793 .as_ref()
794 .ok_or_else(|| internal!("tried to get last_seq_sent of non-conflux circ"))?;
795
796 Ok(handler.last_seq_sent())
797 }
798
799 #[cfg(feature = "conflux")]
803 pub(super) fn set_last_seq_sent(&mut self, n: u64) -> Result<()> {
804 let handler = self
805 .conflux_handler
806 .as_mut()
807 .ok_or_else(|| internal!("tried to get last_seq_sent of non-conflux circ"))?;
808
809 handler.set_last_seq_sent(n);
810 Ok(())
811 }
812
813 #[cfg(feature = "conflux")]
817 pub(super) fn last_seq_recv(&self) -> Result<u64> {
818 let handler = self
819 .conflux_handler
820 .as_ref()
821 .ok_or_else(|| internal!("tried to get last_seq_recv of non-conflux circ"))?;
822
823 Ok(handler.last_seq_recv())
824 }
825
826 #[cfg(feature = "hs-service")]
830 fn handle_incoming_stream_request(
831 &mut self,
832 handlers: &mut CellHandlers,
833 msg: UnparsedRelayMsg,
834 stream_id: StreamId,
835 hop_num: HopNum,
836 leg: UniqId,
837 ) -> Result<Option<CircuitCmd>> {
838 use tor_cell::relaycell::msg::EndReason;
839 use tor_error::into_internal;
840 use tor_log_ratelim::log_ratelim;
841
842 use crate::client::circuit::CIRCUIT_BUFFER_SIZE;
843 use crate::stream::incoming::StreamReqInfo;
844
845 let Some(handler) = handlers.incoming_stream_req_handler.as_mut() else {
848 return Err(Error::CircProto(
849 "Cannot handle BEGIN cells on this circuit".into(),
850 ));
851 };
852
853 let expected_hop_num = handler
855 .hop_num
856 .ok_or_else(|| internal!("Handler HopNum is None in client impl?!"))?;
857
858 if hop_num != expected_hop_num {
859 return Err(Error::CircProto(format!(
860 "Expecting incoming streams from {}, but received {} cell from unexpected hop {}",
861 expected_hop_num.display(),
862 msg.cmd(),
863 hop_num.display()
864 )));
865 }
866
867 let message_closes_stream = handler.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
868
869 let hop = self.hops.get_mut(hop_num).ok_or(Error::CircuitClosed)?;
877
878 if message_closes_stream {
879 hop.ending_msg_received(stream_id)?;
880
881 return Ok(None);
882 }
883
884 let begin = msg
885 .decode::<Begin>()
886 .map_err(|e| Error::from_bytes_err(e, "Invalid Begin message"))?
887 .into_msg();
888
889 let req = IncomingStreamRequest::Begin(begin);
890
891 {
892 use crate::client::stream::IncomingStreamRequestDisposition::*;
893
894 let ctx = crate::client::stream::IncomingStreamRequestContext { request: &req };
895 let view = CircHopSyncView::new(hop.outbound());
901
902 match handler.filter.as_mut().disposition(&ctx, &view)? {
903 Accept => {}
904 CloseCircuit => return Ok(Some(CircuitCmd::CleanShutdown)),
905 RejectRequest(end) => {
906 let end_msg = AnyRelayMsgOuter::new(Some(stream_id), end.into());
907 let cell = SendRelayCell {
908 hop: Some(hop_num),
909 early: false,
910 cell: end_msg,
911 };
912 return Ok(Some(CircuitCmd::Send(cell)));
913 }
914 }
915 }
916
917 let hop = self.hops.get_mut(hop_num).ok_or(Error::CircuitClosed)?;
920 let relay_cell_format = hop.relay_cell_format();
921
922 let memquota = StreamAccount::new(&self.memquota)?;
923
924 let (sender, receiver) = stream_queue(
925 #[cfg(not(feature = "flowctl-cc"))]
926 STREAM_READER_BUFFER,
927 &memquota,
928 self.chan_sender.time_provider(),
929 )?;
930
931 let (msg_tx, msg_rx) = MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(
932 self.chan_sender.time_provider().clone(),
933 memquota.as_raw_account(),
934 )?;
935
936 let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
937
938 let mut drain_rate_request_tx = NotifySender::new_typed();
942 let drain_rate_request_rx = drain_rate_request_tx.subscribe();
943
944 let cmd_checker = InboundDataCmdChecker::new_connected();
945 hop.add_ent_with_id(
946 sender,
947 msg_rx,
948 rate_limit_tx,
949 drain_rate_request_tx,
950 stream_id,
951 cmd_checker,
952 )?;
953
954 let outcome = Pin::new(&mut handler.incoming_sender).try_send(StreamReqInfo {
955 req,
956 stream_id,
957 hop: Some((leg, hop_num).into()),
958 msg_tx,
959 receiver,
960 rate_limit_stream: rate_limit_rx,
961 drain_rate_request_stream: drain_rate_request_rx,
962 memquota,
963 relay_cell_format,
964 });
965
966 log_ratelim!("Delivering message to incoming stream handler"; outcome);
967
968 if let Err(e) = outcome {
969 if e.is_full() {
970 let end_msg = AnyRelayMsgOuter::new(
974 Some(stream_id),
975 End::new_with_reason(EndReason::RESOURCELIMIT).into(),
976 );
977
978 let cell = SendRelayCell {
979 hop: Some(hop_num),
980 early: false,
981 cell: end_msg,
982 };
983 return Ok(Some(CircuitCmd::Send(cell)));
984 } else if e.is_disconnected() {
985 debug!(
997 circ_id = %self.unique_id,
998 "Incoming stream request receiver dropped",
999 );
1000 return Err(Error::CircuitClosed);
1002 } else {
1003 return Err(Error::from((into_internal!(
1007 "try_send failed unexpectedly"
1008 ))(e)));
1009 }
1010 }
1011
1012 Ok(None)
1013 }
1014
1015 #[allow(clippy::unnecessary_wraps)]
1017 fn handle_destroy_cell(&mut self) -> Result<CircuitCmd> {
1018 Ok(CircuitCmd::CleanShutdown)
1020 }
1021
1022 pub(super) async fn handle_create(
1024 &mut self,
1025 recv_created: oneshot::Receiver<CreateResponse>,
1026 handshake: CircuitHandshake,
1027 settings: HopSettings,
1028 done: ReactorResultChannel<()>,
1029 ) -> StdResult<(), ReactorError> {
1030 let ret = match handshake {
1031 CircuitHandshake::CreateFast => self.create_firsthop_fast(recv_created, settings).await,
1032 CircuitHandshake::Ntor {
1033 public_key,
1034 ed_identity,
1035 } => {
1036 self.create_firsthop_ntor(recv_created, ed_identity, public_key, settings)
1037 .await
1038 }
1039 CircuitHandshake::NtorV3 { public_key } => {
1040 self.create_firsthop_ntor_v3(recv_created, public_key, settings)
1041 .await
1042 }
1043 };
1044 let _ = done.send(ret); self.chan_sender.flush().await?;
1049
1050 Ok(())
1051 }
1052
1053 async fn create_impl<H, W, M>(
1059 &mut self,
1060 recvcreated: oneshot::Receiver<CreateResponse>,
1061 wrap: &W,
1062 key: &H::KeyType,
1063 mut settings: HopSettings,
1064 msg: &M,
1065 ) -> Result<()>
1066 where
1067 H: ClientHandshake + HandshakeAuxDataHandler,
1068 W: CreateHandshakeWrap,
1069 H::KeyGen: KeyGenerator,
1070 M: Borrow<H::ClientAuxData>,
1071 {
1072 let (state, msg) = H::client1(&mut rand::rng(), key, msg)?;
1077 let create_cell = wrap.to_chanmsg(msg);
1078 trace!(
1079 circ_id = %self.unique_id,
1080 create = %create_cell.cmd(),
1081 "Extending to hop 1",
1082 );
1083 self.send_msg(create_cell, None).await?;
1084
1085 let reply = recvcreated
1086 .await
1087 .map_err(|_| Error::CircProto("Circuit closed while waiting".into()))?;
1088
1089 let relay_handshake = wrap.decode_chanmsg(reply)?;
1090 let (server_msg, keygen) = H::client2(state, relay_handshake)?;
1091
1092 H::handle_server_aux_data(&mut settings, &server_msg)?;
1093
1094 let BoxedClientLayer { fwd, back, binding } = settings
1095 .relay_crypt_protocol()
1096 .construct_client_layers(HandshakeRole::Initiator, keygen)?;
1097
1098 trace!(circ_id = %self.unique_id, "Handshake complete; circuit created.");
1099
1100 let peer_id = self.channel.target().clone();
1101
1102 self.add_hop(
1103 path::HopDetail::Relay(peer_id),
1104 fwd,
1105 back,
1106 binding,
1107 &settings,
1108 )?;
1109 Ok(())
1110 }
1111
1112 async fn create_firsthop_fast(
1119 &mut self,
1120 recvcreated: oneshot::Receiver<CreateResponse>,
1121 settings: HopSettings,
1122 ) -> Result<()> {
1123 let wrap = CreateFastWrap;
1125 self.create_impl::<CreateFastClient, _, _>(recvcreated, &wrap, &(), settings, &())
1126 .await
1127 }
1128
1129 async fn create_firsthop_ntor(
1134 &mut self,
1135 recvcreated: oneshot::Receiver<CreateResponse>,
1136 ed_identity: pk::ed25519::Ed25519Identity,
1137 pubkey: NtorPublicKey,
1138 settings: HopSettings,
1139 ) -> Result<()> {
1140 let target = RelayIds::builder()
1142 .ed_identity(ed_identity)
1143 .rsa_identity(pubkey.id)
1144 .build()
1145 .expect("Unable to build RelayIds");
1146 self.channel.check_match(&target)?;
1147
1148 let wrap = Create2Wrap {
1149 handshake_type: HandshakeType::NTOR,
1150 };
1151 self.create_impl::<NtorClient, _, _>(recvcreated, &wrap, &pubkey, settings, &())
1152 .await
1153 }
1154
1155 async fn create_firsthop_ntor_v3(
1160 &mut self,
1161 recvcreated: oneshot::Receiver<CreateResponse>,
1162 pubkey: NtorV3PublicKey,
1163 settings: HopSettings,
1164 ) -> Result<()> {
1165 let target = RelayIds::builder()
1167 .ed_identity(pubkey.id)
1168 .build()
1169 .expect("Unable to build RelayIds");
1170 self.channel.check_match(&target)?;
1171
1172 let client_extensions = settings.circuit_request_extensions()?;
1174 let wrap = Create2Wrap {
1175 handshake_type: HandshakeType::NTOR_V3,
1176 };
1177
1178 self.create_impl::<NtorV3Client, _, _>(
1179 recvcreated,
1180 &wrap,
1181 &pubkey,
1182 settings,
1183 &client_extensions,
1184 )
1185 .await
1186 }
1187
1188 pub(super) fn add_hop(
1192 &mut self,
1193 peer_id: path::HopDetail,
1194 fwd: Box<dyn OutboundClientLayer + 'static + Send>,
1195 rev: Box<dyn InboundClientLayer + 'static + Send>,
1196 binding: Option<CircuitBinding>,
1197 settings: &HopSettings,
1198 ) -> StdResult<(), Bug> {
1199 let hop_num = self.hops.len();
1200 debug_assert_eq!(hop_num, usize::from(self.num_hops()));
1201
1202 if hop_num == usize::from(u8::MAX) {
1206 return Err(internal!(
1207 "cannot add more hops to a circuit with `u8::MAX` hops"
1208 ));
1209 }
1210
1211 let hop_num = (hop_num as u8).into();
1212
1213 let hop = CircHop::new(self.unique_id, hop_num, settings);
1214 self.hops.push(hop);
1215 self.crypto_in.add_layer(rev);
1216 self.crypto_out.add_layer(fwd);
1217 self.mutable.add_hop(peer_id, binding);
1218
1219 Ok(())
1220 }
1221
1222 #[allow(clippy::cognitive_complexity)]
1238 fn handle_meta_cell(
1239 &mut self,
1240 handlers: &mut CellHandlers,
1241 hopnum: HopNum,
1242 msg: UnparsedRelayMsg,
1243 ) -> Result<Option<CircuitCmd>> {
1244 if msg.cmd() == RelayCmd::SENDME {
1254 let sendme = msg
1255 .decode::<Sendme>()
1256 .map_err(|e| Error::from_bytes_err(e, "sendme message"))?
1257 .into_msg();
1258
1259 return Ok(Some(CircuitCmd::HandleSendMe {
1260 hop: hopnum,
1261 sendme,
1262 }));
1263 }
1264 if msg.cmd() == RelayCmd::TRUNCATED {
1265 let truncated = msg
1266 .decode::<Truncated>()
1267 .map_err(|e| Error::from_bytes_err(e, "truncated message"))?
1268 .into_msg();
1269 let reason = truncated.reason();
1270 debug!(
1271 circ_id = %self.unique_id,
1272 "Truncated from hop {}. Reason: {} [{}]",
1273 hopnum.display(),
1274 reason.human_str(),
1275 reason
1276 );
1277
1278 return Ok(Some(CircuitCmd::CleanShutdown));
1279 }
1280
1281 if msg.cmd() == RelayCmd::DROP {
1282 cfg_if::cfg_if! {
1283 if #[cfg(feature = "circ-padding")] {
1284 return Ok(None);
1285 } else {
1286 use crate::util::err::ExcessPadding;
1287 return Err(Error::ExcessPadding(ExcessPadding::NoPaddingNegotiated, hopnum));
1288 }
1289 }
1290 }
1291
1292 trace!(circ_id = %self.unique_id, cell = ?msg, "Received meta-cell");
1293
1294 #[cfg(feature = "conflux")]
1295 if matches!(
1296 msg.cmd(),
1297 RelayCmd::CONFLUX_LINK
1298 | RelayCmd::CONFLUX_LINKED
1299 | RelayCmd::CONFLUX_LINKED_ACK
1300 | RelayCmd::CONFLUX_SWITCH
1301 ) {
1302 let cmd = self.handle_conflux_msg(hopnum, msg)?;
1303 return Ok(cmd.map(CircuitCmd::from));
1304 }
1305
1306 if self.is_conflux_pending() {
1307 warn!(
1308 circ_id = %self.unique_id,
1309 "received unexpected cell {msg:?} on unlinked conflux circuit",
1310 );
1311 return Err(Error::CircProto(
1312 "Received unexpected cell on unlinked circuit".into(),
1313 ));
1314 }
1315
1316 if let Some(mut handler) = handlers.meta_handler.take() {
1324 if handler.expected_hop() == (self.unique_id(), hopnum).into() {
1326 let ret = handler.handle_msg(msg, self);
1328 trace!(
1329 circ_id = %self.unique_id,
1330 result = ?ret,
1331 "meta handler completed",
1332 );
1333 match ret {
1334 #[cfg(feature = "send-control-msg")]
1335 Ok(MetaCellDisposition::Consumed) => {
1336 handlers.meta_handler = Some(handler);
1337 Ok(None)
1338 }
1339 Ok(MetaCellDisposition::ConversationFinished) => Ok(None),
1340 #[cfg(feature = "send-control-msg")]
1341 Ok(MetaCellDisposition::CloseCirc) => Ok(Some(CircuitCmd::CleanShutdown)),
1342 Err(e) => Err(e),
1343 }
1344 } else {
1345 handlers.meta_handler = Some(handler);
1348
1349 unsupported_client_cell!(msg, hopnum)
1350 }
1351 } else {
1352 unsupported_client_cell!(msg)
1355 }
1356 }
1357
1358 #[instrument(level = "trace", skip_all)]
1360 pub(super) fn handle_sendme(
1361 &mut self,
1362 hopnum: HopNum,
1363 msg: Sendme,
1364 signals: CongestionSignals,
1365 ) -> Result<Option<CircuitCmd>> {
1366 let runtime = self.runtime.clone();
1368
1369 let hop = self
1372 .hop_mut(hopnum)
1373 .ok_or_else(|| Error::CircProto(format!("Couldn't find hop {}", hopnum.display())))?;
1374
1375 let tag = msg.into_sendme_tag().ok_or_else(||
1376 Error::CircProto("missing tag on circuit sendme".into()))?;
1379 hop.ccontrol()
1381 .note_sendme_received(&runtime, tag, signals)?;
1382 Ok(None)
1383 }
1384
1385 #[instrument(level = "trace", skip_all)]
1400 async fn send_msg(
1401 &mut self,
1402 msg: AnyChanMsg,
1403 info: Option<QueuedCellPaddingInfo>,
1404 ) -> Result<()> {
1405 let cell = AnyChanCell::new(Some(self.channel_id), msg);
1406 Pin::new(&mut self.chan_sender)
1408 .send_unbounded((cell, info))
1409 .await?;
1410 Ok(())
1411 }
1412
1413 pub(super) fn remove_expired_halfstreams(&mut self, now: Instant) {
1415 self.hops.remove_expired_halfstreams(now);
1416 }
1417
1418 pub(super) fn hop(&self, hopnum: HopNum) -> Option<&CircHop> {
1420 self.hops.hop(hopnum)
1421 }
1422
1423 pub(super) fn hop_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop> {
1425 self.hops.get_mut(hopnum)
1426 }
1427
1428 #[allow(clippy::too_many_arguments)]
1431 pub(super) fn begin_stream(
1432 &mut self,
1433 hop_num: HopNum,
1434 message: AnyRelayMsg,
1435 sender: StreamQueueSender,
1436 rx: StreamMpscReceiver<AnyRelayMsg>,
1437 rate_limit_notifier: watch::Sender<StreamRateLimit>,
1438 drain_rate_requester: NotifySender<DrainRateRequest>,
1439 cmd_checker: AnyCmdChecker,
1440 ) -> StdResult<Result<(SendRelayCell, StreamId)>, Bug> {
1441 let Some(hop) = self.hop_mut(hop_num) else {
1442 return Err(internal!(
1443 "{}: Attempting to send a BEGIN cell to an unknown hop {hop_num:?}",
1444 self.unique_id,
1445 ));
1446 };
1447
1448 Ok(hop.begin_stream(
1449 message,
1450 sender,
1451 rx,
1452 rate_limit_notifier,
1453 drain_rate_requester,
1454 cmd_checker,
1455 ))
1456 }
1457
1458 #[instrument(level = "trace", skip_all)]
1460 pub(super) async fn close_stream(
1461 &mut self,
1462 hop_num: HopNum,
1463 sid: StreamId,
1464 behav: CloseStreamBehavior,
1465 reason: streammap::TerminateReason,
1466 expiry: Instant,
1467 ) -> Result<()> {
1468 if let Some(hop) = self.hop_mut(hop_num) {
1469 let res = hop.close_stream(sid, behav, reason, expiry)?;
1470 if let Some(cell) = res {
1471 self.send_relay_cell(cell).await?;
1472 }
1473 }
1474 Ok(())
1475 }
1476
1477 pub(super) fn has_streams(&self) -> bool {
1483 self.hops.has_streams()
1484 }
1485
1486 pub(super) fn num_hops(&self) -> u8 {
1488 self.hops
1493 .len()
1494 .try_into()
1495 .expect("`hops.len()` has more than `u8::MAX` hops")
1496 }
1497
1498 pub(super) fn has_hops(&self) -> bool {
1500 !self.hops.is_empty()
1501 }
1502
1503 pub(super) fn last_hop_num(&self) -> Option<HopNum> {
1507 let num_hops = self.num_hops();
1508 if num_hops == 0 {
1509 return None;
1511 }
1512 Some(HopNum::from(num_hops - 1))
1513 }
1514
1515 pub(super) fn path(&self) -> Arc<path::Path> {
1519 self.mutable.path()
1520 }
1521
1522 pub(super) fn clock_skew(&self) -> ClockSkew {
1525 self.channel.clock_skew()
1526 }
1527
1528 pub(super) fn uses_stream_sendme(&self, hop: HopNum) -> Option<bool> {
1532 let hop = self.hop(hop)?;
1533 Some(hop.ccontrol().uses_stream_sendme())
1534 }
1535
1536 pub(super) fn is_conflux_pending(&self) -> bool {
1538 let Some(status) = self.conflux_status() else {
1539 return false;
1540 };
1541
1542 status != ConfluxStatus::Linked
1543 }
1544
1545 pub(super) fn conflux_status(&self) -> Option<ConfluxStatus> {
1549 cfg_if::cfg_if! {
1550 if #[cfg(feature = "conflux")] {
1551 self.conflux_handler
1552 .as_ref()
1553 .map(|handler| handler.status())
1554 } else {
1555 None
1556 }
1557 }
1558 }
1559
1560 #[cfg(feature = "conflux")]
1562 pub(super) fn init_rtt(&self) -> Option<Duration> {
1563 self.conflux_handler
1564 .as_ref()
1565 .map(|handler| handler.init_rtt())?
1566 }
1567
1568 #[cfg(feature = "circ-padding-manual")]
1574 pub(super) fn set_padding_at_hop(
1575 &self,
1576 hop: HopNum,
1577 padder: Option<padding::CircuitPadder>,
1578 ) -> Result<()> {
1579 if self.hop(hop).is_none() {
1580 return Err(Error::NoSuchHop);
1581 }
1582 self.padding_ctrl.install_padder_padding_at_hop(hop, padder);
1583 Ok(())
1584 }
1585
1586 #[cfg(feature = "circ-padding")]
1595 fn padding_disposition(&self, send_padding: &padding::SendPadding) -> CircPaddingDisposition {
1596 crate::circuit::padding::padding_disposition(
1597 send_padding,
1598 &self.chan_sender,
1599 self.padding_block.as_ref(),
1600 )
1601 }
1602
1603 #[cfg(feature = "circ-padding")]
1605 pub(super) async fn send_padding(&mut self, send_padding: padding::SendPadding) -> Result<()> {
1606 use CircPaddingDisposition::*;
1607
1608 let target_hop = send_padding.hop;
1609
1610 match self.padding_disposition(&send_padding) {
1611 QueuePaddingNormally => {
1612 let queue_info = self.padding_ctrl.queued_padding(target_hop, send_padding);
1613 self.queue_padding_cell_for_hop(target_hop, queue_info)
1614 .await?;
1615 }
1616 QueuePaddingAndBypass => {
1617 let queue_info = self.padding_ctrl.queued_padding(target_hop, send_padding);
1618 self.queue_padding_cell_for_hop(target_hop, queue_info)
1619 .await?;
1620 }
1621 TreatQueuedCellAsPadding => {
1622 self.padding_ctrl
1623 .replaceable_padding_already_queued(target_hop, send_padding);
1624 }
1625 }
1626 Ok(())
1627 }
1628
1629 #[cfg(feature = "circ-padding")]
1633 async fn queue_padding_cell_for_hop(
1634 &mut self,
1635 target_hop: HopNum,
1636 queue_info: Option<QueuedCellPaddingInfo>,
1637 ) -> Result<()> {
1638 use tor_cell::relaycell::msg::Drop as DropMsg;
1639 let msg = SendRelayCell {
1640 hop: Some(target_hop),
1641 early: false,
1643 cell: AnyRelayMsgOuter::new(None, DropMsg::default().into()),
1644 };
1645 self.send_relay_cell_inner(msg, queue_info).await
1646 }
1647
1648 #[cfg(feature = "circ-padding")]
1651 pub(super) fn start_blocking_for_padding(&mut self, block: padding::StartBlocking) {
1652 self.chan_sender.start_blocking();
1653 self.padding_block = Some(block);
1654 }
1655
1656 #[cfg(feature = "circ-padding")]
1658 pub(super) fn stop_blocking_for_padding(&mut self) {
1659 self.chan_sender.stop_blocking();
1660 self.padding_block = None;
1661 }
1662
1663 pub(super) fn estimate_cbt(&self, length: usize) -> Duration {
1665 self.timeouts.circuit_build_timeout(length)
1666 }
1667}
1668
1669impl Drop for Circuit {
1670 fn drop(&mut self) {
1671 let _ = self.channel.close_circuit(self.channel_id);
1672 }
1673}