tor_proto/client/reactor/circuit/
circhop.rs1use super::{CircuitCmd, CloseStreamBehavior};
4use crate::circuit::circhop::{CircHopInbound, CircHopOutbound, HopSettings, SendRelayCell};
5use crate::client::reactor::circuit::path::PathEntry;
6use crate::congestion::CongestionControl;
7use crate::crypto::cell::HopNum;
8use crate::stream::StreamMpscReceiver;
9use crate::stream::cmdcheck::AnyCmdChecker;
10use crate::stream::flow_ctrl::state::StreamRateLimit;
11use crate::stream::flow_ctrl::xon_xoff::reader::DrainRateRequest;
12use crate::stream::queue::StreamQueueSender;
13use crate::streammap::{self, StreamEntMut, StreamMap};
14use crate::tunnel::TunnelScopedCircId;
15use crate::util::notify::NotifySender;
16use crate::util::tunnel_activity::TunnelActivity;
17use crate::{Error, Result};
18
19use futures::Stream;
20use futures::stream::FuturesUnordered;
21use postage::watch;
22use smallvec::SmallVec;
23use tor_cell::chancell::BoxedCellBody;
24use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
25use tor_cell::relaycell::msg::AnyRelayMsg;
26use tor_cell::relaycell::{
27 AnyRelayMsgOuter, RelayCellDecoder, RelayCellDecoderResult, RelayCellFormat, StreamId,
28 UnparsedRelayMsg,
29};
30use web_time_compat::Instant;
31
32use safelog::sensitive as sv;
33use tor_error::Bug;
34use tracing::instrument;
35
36use std::result::Result as StdResult;
37use std::sync::{Arc, Mutex, MutexGuard};
38use std::task::Poll;
39
40#[cfg(test)]
41use tor_cell::relaycell::msg::SendmeTag;
42
43const NUM_HOPS: usize = 3;
47
48#[derive(Default)]
50pub(crate) struct CircHopList {
51 hops: SmallVec<[CircHop; NUM_HOPS]>,
53}
54
55impl CircHopList {
56 pub(super) fn hop(&self, hopnum: HopNum) -> Option<&CircHop> {
58 self.hops.get(Into::<usize>::into(hopnum))
59 }
60
61 pub(super) fn get_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop> {
63 self.hops.get_mut(Into::<usize>::into(hopnum))
64 }
65
66 pub(crate) fn push(&mut self, hop: CircHop) {
68 self.hops.push(hop);
69 }
70
71 pub(crate) fn is_empty(&self) -> bool {
73 self.hops.is_empty()
74 }
75
76 pub(crate) fn len(&self) -> usize {
78 self.hops.len()
79 }
80
81 pub(in crate::client::reactor) fn ready_streams_iterator(
94 &self,
95 exclude: Option<HopNum>,
96 ) -> impl Stream<Item = CircuitCmd> + use<> {
97 self.hops
98 .iter()
99 .enumerate()
100 .filter_map(|(i, hop)| {
101 let hop_num = HopNum::from(i as u8);
102
103 if exclude == Some(hop_num) {
104 return None;
106 }
107
108 if !hop.ccontrol().can_send() {
109 return None;
125 }
126
127 let hop_map = Arc::clone(self.hops[i].stream_map());
128 Some(futures::future::poll_fn(move |cx| {
129 let mut hop_map = hop_map.lock().expect("lock poisoned");
136 let Some((sid, msg)) = hop_map.poll_ready_streams_iter(cx).next() else {
137 return Poll::Pending;
139 };
140
141 if msg.is_none() {
142 return Poll::Ready(CircuitCmd::CloseStream {
143 hop: hop_num,
144 sid,
145 behav: CloseStreamBehavior::default(),
146 reason: streammap::TerminateReason::StreamTargetClosed,
147 });
148 };
149 let msg = hop_map.take_ready_msg(sid).expect("msg disappeared");
150
151 #[allow(unused)] let Some(StreamEntMut::Open(s)) = hop_map.get_mut(sid) else {
153 panic!("Stream {sid} disappeared");
154 };
155
156 debug_assert!(
157 s.can_send(&msg),
158 "Stream {sid} produced a message it can't send: {msg:?}"
159 );
160
161 let cell = SendRelayCell {
162 hop: Some(hop_num),
163 early: false,
164 cell: AnyRelayMsgOuter::new(Some(sid), msg),
165 };
166 Poll::Ready(CircuitCmd::Send(cell))
167 }))
168 })
169 .collect::<FuturesUnordered<_>>()
170 }
171
172 pub(super) fn remove_expired_halfstreams(&mut self, now: Instant) {
174 for hop in self.hops.iter_mut() {
175 hop.stream_map()
176 .lock()
177 .expect("lock poisoned")
178 .remove_expired_halfstreams(now);
179 }
180 }
181
182 pub(super) fn has_streams(&self) -> bool {
188 self.hops.iter().any(|hop| {
189 hop.stream_map()
190 .lock()
191 .expect("lock poisoned")
192 .n_open_streams()
193 > 0
194 })
195 }
196
197 pub(crate) fn tunnel_activity(&self) -> TunnelActivity {
199 self.hops
200 .iter()
201 .map(|hop| {
202 hop.stream_map()
203 .lock()
204 .expect("Poisoned lock")
205 .tunnel_activity()
206 })
207 .max()
208 .unwrap_or_else(TunnelActivity::never_used)
209 }
210}
211
212pub(crate) struct CircHop {
214 unique_id: TunnelScopedCircId,
216 hop_num: HopNum,
218 inbound: CircHopInbound,
222 outbound: CircHopOutbound,
226}
227
228impl CircHop {
229 pub(crate) fn new(
231 unique_id: TunnelScopedCircId,
232 hop_num: HopNum,
233 settings: &HopSettings,
234 ) -> Self {
235 let relay_format = settings.relay_crypt_protocol().relay_cell_format();
236
237 let ccontrol = Arc::new(Mutex::new(CongestionControl::new(&settings.ccontrol)));
238 let inbound = CircHopInbound::new(RelayCellDecoder::new(relay_format), settings);
239
240 let outbound = CircHopOutbound::new(
241 ccontrol,
242 relay_format,
243 Arc::new(settings.flow_ctrl_params.clone()),
244 settings,
245 );
246
247 CircHop {
248 unique_id,
249 hop_num,
250 inbound,
251 outbound,
252 }
253 }
254
255 pub(crate) fn begin_stream(
258 &mut self,
259 message: AnyRelayMsg,
260 sender: StreamQueueSender,
261 rx: StreamMpscReceiver<AnyRelayMsg>,
262 rate_limit_updater: watch::Sender<StreamRateLimit>,
263 drain_rate_requester: NotifySender<DrainRateRequest>,
264 cmd_checker: AnyCmdChecker,
265 ) -> Result<(SendRelayCell, StreamId)> {
266 self.outbound.begin_stream(
267 Some(self.hop_num),
268 message,
269 sender,
270 rx,
271 rate_limit_updater,
272 drain_rate_requester,
273 cmd_checker,
274 )
275 }
276
277 pub(crate) fn close_stream(
282 &mut self,
283 id: StreamId,
284 message: CloseStreamBehavior,
285 why: streammap::TerminateReason,
286 expiry: Instant,
287 ) -> Result<Option<SendRelayCell>> {
288 self.outbound
289 .close_stream(self.unique_id, id, Some(self.hop_num), message, why, expiry)
290 }
291
292 #[instrument(level = "trace", skip_all)]
296 pub(crate) fn maybe_send_xon(
297 &mut self,
298 rate: XonKbpsEwma,
299 id: StreamId,
300 ) -> Result<Option<Xon>> {
301 self.outbound.maybe_send_xon(rate, id)
302 }
303
304 pub(crate) fn maybe_send_xoff(&mut self, id: StreamId) -> Result<Option<Xoff>> {
308 self.outbound.maybe_send_xoff(id)
309 }
310
311 pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
316 self.outbound.relay_cell_format()
317 }
318
319 #[cfg(test)]
321 pub(crate) fn send_window_and_expected_tags(&self) -> (u32, Vec<SendmeTag>) {
322 self.outbound.send_window_and_expected_tags()
323 }
324
325 pub(crate) fn ccontrol(&self) -> MutexGuard<'_, CongestionControl> {
327 self.outbound.ccontrol().lock().expect("poisoned lock")
328 }
329
330 pub(crate) fn outbound(&self) -> &CircHopOutbound {
332 &self.outbound
333 }
334
335 pub(crate) fn about_to_send(&mut self, stream_id: StreamId, msg: &AnyRelayMsg) -> Result<()> {
341 self.outbound.about_to_send(self.unique_id, stream_id, msg)
342 }
343
344 #[cfg(feature = "hs-service")]
346 pub(crate) fn add_ent_with_id(
347 &self,
348 sink: StreamQueueSender,
349 rx: StreamMpscReceiver<AnyRelayMsg>,
350 rate_limit_updater: watch::Sender<StreamRateLimit>,
351 drain_rate_requester: NotifySender<DrainRateRequest>,
352 stream_id: StreamId,
353 cmd_checker: AnyCmdChecker,
354 ) -> Result<()> {
355 self.outbound.add_ent_with_id(
356 sink,
357 rx,
358 rate_limit_updater,
359 drain_rate_requester,
360 stream_id,
361 cmd_checker,
362 )
363 }
364
365 #[cfg(feature = "hs-service")]
370 pub(crate) fn ending_msg_received(&self, stream_id: StreamId) -> Result<()> {
371 self.outbound.ending_msg_received(stream_id)
372 }
373
374 pub(crate) fn decode(&mut self, cell: BoxedCellBody) -> Result<RelayCellDecoderResult> {
379 self.inbound.decode(cell)
380 }
381
382 pub(super) fn handle_msg(
390 &self,
391 hop_detail: &PathEntry,
392 cell_counts_toward_windows: bool,
393 streamid: StreamId,
394 msg: UnparsedRelayMsg,
395 now: Instant,
396 ) -> Result<Option<UnparsedRelayMsg>> {
397 let possible_proto_violation_err = |streamid: StreamId| Error::UnknownStream {
398 src: sv(hop_detail.clone()),
399 streamid,
400 };
401
402 self.outbound.handle_msg(
403 possible_proto_violation_err,
404 cell_counts_toward_windows,
405 streamid,
406 msg,
407 now,
408 )
409 }
410
411 pub(crate) fn stream_map(&self) -> &Arc<Mutex<StreamMap>> {
413 self.outbound.stream_map()
414 }
415
416 pub(crate) fn set_stream_map(&mut self, map: Arc<Mutex<StreamMap>>) -> StdResult<(), Bug> {
420 self.outbound.set_stream_map(map)
421 }
422
423 pub(crate) fn decrement_outbound_cell_limit(&mut self) -> Result<()> {
426 self.outbound.decrement_cell_limit()
427 }
428
429 pub(crate) fn decrement_inbound_cell_limit(&mut self) -> Result<()> {
432 self.inbound.decrement_cell_limit()
433 }
434}