1mod halfstream;
4
5use crate::congestion::sendme;
6use crate::stream::RECV_WINDOW_INIT;
7use crate::stream::StreamMpscReceiver;
8use crate::stream::cmdcheck::AnyCmdChecker;
9use crate::stream::flow_ctrl::state::{FlowCtrlHooks, StreamFlowCtrl};
10use crate::stream::queue::StreamQueueSender;
11use crate::util::stream_poll_set::{KeyAlreadyInsertedError, StreamPollSet};
12use crate::{Error, Result};
13use pin_project::pin_project;
14use tor_async_utils::peekable_stream::{PeekableStream, UnobtrusivePeekableStream};
15use tor_async_utils::stream_peek::StreamUnobtrusivePeeker;
16use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
17use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
18use tor_cell::relaycell::{StreamId, msg::AnyRelayMsg};
19
20use std::collections::HashMap;
21use std::collections::hash_map;
22use std::num::NonZeroU16;
23use std::pin::Pin;
24use std::task::{Poll, Waker};
25use tor_error::{bad_api_usage, internal};
26use web_time_compat::Instant;
27
28use rand::Rng;
29
30use tracing::debug;
31
32use halfstream::HalfStream;
33
34#[derive(Debug)]
39#[pin_project]
40pub(super) struct OpenStreamEnt {
41 pub(super) sink: StreamQueueSender,
43 pub(super) dropped: u16,
46 pub(super) cmd_checker: AnyCmdChecker,
48 flow_ctrl: StreamFlowCtrl,
52 #[pin]
57 rx: StreamUnobtrusivePeeker<StreamMpscReceiver<AnyRelayMsg>>,
58 flow_ctrl_waker: Option<Waker>,
61}
62
63impl OpenStreamEnt {
64 pub(crate) fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
66 self.flow_ctrl.can_send(msg)
67 }
68
69 pub(crate) fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
74 self.flow_ctrl.put_for_incoming_sendme(msg)?;
75 if let Some(waker) = self.flow_ctrl_waker.take() {
77 waker.wake();
78 }
79 Ok(())
80 }
81
82 fn approx_stream_bytes_buffered(&self) -> usize {
84 self.sink.approx_stream_bytes()
96 }
97
98 pub(crate) fn maybe_send_xon(&mut self, rate: XonKbpsEwma) -> Result<Option<Xon>> {
103 self.flow_ctrl
104 .maybe_send_xon(rate, self.approx_stream_bytes_buffered())
105 }
106
107 pub(super) fn maybe_send_xoff(&mut self) -> Result<Option<Xoff>> {
112 self.flow_ctrl
113 .maybe_send_xoff(self.approx_stream_bytes_buffered())
114 }
115
116 pub(crate) fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
121 self.flow_ctrl.handle_incoming_xon(msg)
122 }
123
124 pub(crate) fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
129 self.flow_ctrl.handle_incoming_xoff(msg)
130 }
131
132 pub(crate) fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()> {
139 self.flow_ctrl.about_to_send(msg)
140 }
141}
142
143#[derive(Debug)]
147#[pin_project]
148struct OpenStreamEntStream {
149 #[pin]
151 inner: OpenStreamEnt,
152}
153
154impl futures::Stream for OpenStreamEntStream {
155 type Item = AnyRelayMsg;
156
157 fn poll_next(
158 mut self: std::pin::Pin<&mut Self>,
159 cx: &mut std::task::Context<'_>,
160 ) -> Poll<Option<Self::Item>> {
161 if !self.as_mut().poll_peek_mut(cx).is_ready() {
162 return Poll::Pending;
163 };
164 let res = self.project().inner.project().rx.poll_next(cx);
165 debug_assert!(res.is_ready());
166 res
173 }
174}
175
176impl PeekableStream for OpenStreamEntStream {
177 fn poll_peek_mut(
178 self: Pin<&mut Self>,
179 cx: &mut std::task::Context<'_>,
180 ) -> Poll<Option<&mut <Self as futures::Stream>::Item>> {
181 let s = self.project();
182 let inner = s.inner.project();
183 let m = match inner.rx.poll_peek_mut(cx) {
184 Poll::Ready(Some(m)) => m,
185 Poll::Ready(None) => return Poll::Ready(None),
186 Poll::Pending => return Poll::Pending,
187 };
188 if !inner.flow_ctrl.can_send(m) {
189 inner.flow_ctrl_waker.replace(cx.waker().clone());
190 return Poll::Pending;
191 }
192 Poll::Ready(Some(m))
193 }
194}
195
196impl UnobtrusivePeekableStream for OpenStreamEntStream {
197 fn unobtrusive_peek_mut(
198 self: std::pin::Pin<&mut Self>,
199 ) -> Option<&mut <Self as futures::Stream>::Item> {
200 let s = self.project();
201 let inner = s.inner.project();
202 let m = inner.rx.unobtrusive_peek_mut()?;
203 if inner.flow_ctrl.can_send(m) {
204 Some(m)
205 } else {
206 None
207 }
208 }
209}
210
211#[derive(Debug)]
214pub(super) struct EndSentStreamEnt {
215 pub(super) half_stream: HalfStream,
218 explicitly_dropped: bool,
221 pub(super) expiry: Instant,
226}
227
228#[derive(Debug)]
230enum ClosedStreamEnt {
231 EndReceived,
234 EndSent(EndSentStreamEnt),
240}
241
242pub(super) enum StreamEntMut<'a> {
244 Open(&'a mut OpenStreamEnt),
246 EndReceived,
249 EndSent(&'a mut EndSentStreamEnt),
252}
253
254impl<'a> From<&'a mut ClosedStreamEnt> for StreamEntMut<'a> {
255 fn from(value: &'a mut ClosedStreamEnt) -> Self {
256 match value {
257 ClosedStreamEnt::EndReceived => Self::EndReceived,
258 ClosedStreamEnt::EndSent(e) => Self::EndSent(e),
259 }
260 }
261}
262
263impl<'a> From<&'a mut OpenStreamEntStream> for StreamEntMut<'a> {
264 fn from(value: &'a mut OpenStreamEntStream) -> Self {
265 Self::Open(&mut value.inner)
266 }
267}
268
269#[derive(Debug, Copy, Clone, Eq, PartialEq)]
272pub(super) enum ShouldSendEnd {
273 Send,
275 DontSend,
277}
278
279#[derive(Debug, Copy, Clone, Eq, PartialEq, PartialOrd, Ord)]
281struct Priority(u64);
282
283pub(crate) struct StreamMap {
286 open_streams: StreamPollSet<StreamId, Priority, OpenStreamEntStream>,
290 closed_streams: HashMap<StreamId, ClosedStreamEnt>,
294 next_stream_id: StreamId,
297 next_priority: Priority,
302}
303
304impl StreamMap {
305 pub(crate) fn new() -> Self {
307 let mut rng = rand::rng();
308 let next_stream_id: NonZeroU16 = rng.random();
309 StreamMap {
310 open_streams: StreamPollSet::new(),
311 closed_streams: HashMap::new(),
312 next_stream_id: next_stream_id.into(),
313 next_priority: Priority(0),
314 }
315 }
316
317 pub(super) fn n_open_streams(&self) -> usize {
319 self.open_streams.len()
320 }
321
322 pub(super) fn tunnel_activity(&self) -> crate::util::tunnel_activity::TunnelActivity {
324 self.open_streams.tunnel_activity()
325 }
326
327 fn take_next_priority(&mut self) -> Priority {
329 let rv = self.next_priority;
330 self.next_priority = Priority(rv.0 + 1);
331 rv
332 }
333
334 pub(super) fn add_ent(
336 &mut self,
337 sink: StreamQueueSender,
338 rx: StreamMpscReceiver<AnyRelayMsg>,
339 flow_ctrl: StreamFlowCtrl,
340 cmd_checker: AnyCmdChecker,
341 ) -> Result<StreamId> {
342 let mut stream_ent = OpenStreamEntStream {
343 inner: OpenStreamEnt {
344 sink,
345 flow_ctrl,
346 dropped: 0,
347 cmd_checker,
348 rx: StreamUnobtrusivePeeker::new(rx),
349 flow_ctrl_waker: None,
350 },
351 };
352 let priority = self.take_next_priority();
353 for _ in 1..=65536 {
358 let id: StreamId = self.next_stream_id;
359 self.next_stream_id = wrapping_next_stream_id(self.next_stream_id);
360 stream_ent = match self.open_streams.try_insert(id, priority, stream_ent) {
361 Ok(_) => return Ok(id),
362 Err(KeyAlreadyInsertedError {
363 key: _,
364 priority: _,
365 stream,
366 }) => stream,
367 };
368 }
369
370 Err(Error::IdRangeFull)
371 }
372
373 #[cfg(any(feature = "hs-service", feature = "relay"))]
375 pub(super) fn add_ent_with_id(
376 &mut self,
377 sink: StreamQueueSender,
378 rx: StreamMpscReceiver<AnyRelayMsg>,
379 flow_ctrl: StreamFlowCtrl,
380 id: StreamId,
381 cmd_checker: AnyCmdChecker,
382 ) -> Result<()> {
383 let stream_ent = OpenStreamEntStream {
384 inner: OpenStreamEnt {
385 sink,
386 flow_ctrl,
387 dropped: 0,
388 cmd_checker,
389 rx: StreamUnobtrusivePeeker::new(rx),
390 flow_ctrl_waker: None,
391 },
392 };
393 let priority = self.take_next_priority();
394 self.open_streams
395 .try_insert(id, priority, stream_ent)
396 .map_err(|_| Error::IdUnavailable(id))
397 }
398
399 pub(super) fn get_mut(&mut self, id: StreamId) -> Option<StreamEntMut<'_>> {
401 if let Some(e) = self.open_streams.stream_mut(&id) {
402 return Some(e.into());
403 }
404 if let Some(e) = self.closed_streams.get_mut(&id) {
405 return Some(e.into());
406 }
407 None
408 }
409
410 pub(super) fn ending_msg_received(&mut self, id: StreamId) -> Result<()> {
415 if self.open_streams.remove(&id).is_some() {
416 let prev = self.closed_streams.insert(id, ClosedStreamEnt::EndReceived);
417 debug_assert!(prev.is_none(), "Unexpected duplicate entry for {id}");
418 return Ok(());
419 }
420 let hash_map::Entry::Occupied(closed_entry) = self.closed_streams.entry(id) else {
421 return Err(Error::CircProto(
422 "Received END cell on nonexistent stream".into(),
423 ));
424 };
425 match closed_entry.get() {
427 ClosedStreamEnt::EndReceived => Err(Error::CircProto(
428 "Received two END cells on same stream".into(),
429 )),
430 ClosedStreamEnt::EndSent { .. } => {
431 debug!("Actually got an end cell on a half-closed stream!");
432 closed_entry.remove_entry();
435 Ok(())
436 }
437 }
438 }
439
440 pub(super) fn terminate(
444 &mut self,
445 id: StreamId,
446 why: TerminateReason,
447 expiry: Instant,
448 ) -> Result<ShouldSendEnd> {
449 use TerminateReason as TR;
450
451 if let Some((_id, _priority, ent)) = self.open_streams.remove(&id) {
452 let OpenStreamEntStream {
453 inner:
454 OpenStreamEnt {
455 flow_ctrl,
456 dropped,
457 cmd_checker,
458 ..
461 },
462 } = ent;
463 let mut recv_window = sendme::StreamRecvWindow::new(RECV_WINDOW_INIT);
467 recv_window.decrement_n(dropped)?;
468 let half_stream = HalfStream::new(flow_ctrl, recv_window, cmd_checker);
470 let explicitly_dropped = why == TR::StreamTargetClosed;
471
472 let prev = self.closed_streams.insert(
473 id,
474 ClosedStreamEnt::EndSent(EndSentStreamEnt {
475 half_stream,
476 explicitly_dropped,
477 expiry,
478 }),
479 );
480 debug_assert!(prev.is_none(), "Unexpected duplicate entry for {id}");
481 return Ok(ShouldSendEnd::Send);
482 }
483
484 match self
486 .closed_streams
487 .remove(&id)
488 .ok_or_else(|| Error::from(internal!("Somehow we terminated a nonexistent stream?")))?
489 {
490 ClosedStreamEnt::EndReceived => Ok(ShouldSendEnd::DontSend),
491 ClosedStreamEnt::EndSent(EndSentStreamEnt {
492 ref mut explicitly_dropped,
493 ..
494 }) => match (*explicitly_dropped, why) {
495 (false, TR::StreamTargetClosed) => {
496 *explicitly_dropped = true;
497 Ok(ShouldSendEnd::DontSend)
498 }
499 (true, TR::StreamTargetClosed) => {
500 Err(bad_api_usage!("Tried to close an already closed stream.").into())
501 }
502 (_, TR::ExplicitEnd) => Err(bad_api_usage!(
503 "Tried to end an already closed stream. (explicitly_dropped={:?})",
504 *explicitly_dropped
505 )
506 .into()),
507 },
508 }
509 }
510
511 pub(super) fn poll_ready_streams_iter<'a>(
521 &'a mut self,
522 cx: &mut std::task::Context,
523 ) -> impl Iterator<Item = (StreamId, Option<&'a AnyRelayMsg>)> + 'a + use<'a> {
524 self.open_streams
525 .poll_ready_iter_mut(cx)
526 .map(|(sid, _priority, ent)| {
527 let ent = Pin::new(ent);
528 let msg = ent.unobtrusive_peek();
529 (*sid, msg)
530 })
531 }
532
533 pub(super) fn take_ready_msg(&mut self, sid: StreamId) -> Option<AnyRelayMsg> {
537 let new_priority = self.take_next_priority();
538 let (_prev_priority, val) = self
539 .open_streams
540 .take_ready_value_and_reprioritize(&sid, new_priority)?;
541 Some(val)
542 }
543
544 pub(super) fn remove_expired_halfstreams(&mut self, now: Instant) {
546 self.closed_streams.retain(|_sid, entry| match entry {
547 ClosedStreamEnt::EndReceived => true,
548 ClosedStreamEnt::EndSent(ent) => ent.expiry > now,
549 });
550 }
551}
552
553#[derive(Copy, Clone, Debug, PartialEq, Eq)]
557pub(super) enum TerminateReason {
558 StreamTargetClosed,
561 ExplicitEnd,
564}
565
566fn wrapping_next_stream_id(id: StreamId) -> StreamId {
568 let next_val = NonZeroU16::from(id)
569 .checked_add(1)
570 .unwrap_or_else(|| NonZeroU16::new(1).expect("Impossibly got 0 value"));
571 next_val.into()
572}
573
574#[cfg(test)]
575mod test {
576 #![allow(clippy::bool_assert_comparison)]
578 #![allow(clippy::clone_on_copy)]
579 #![allow(clippy::dbg_macro)]
580 #![allow(clippy::mixed_attributes_style)]
581 #![allow(clippy::print_stderr)]
582 #![allow(clippy::print_stdout)]
583 #![allow(clippy::single_char_pattern)]
584 #![allow(clippy::unwrap_used)]
585 #![allow(clippy::unchecked_time_subtraction)]
586 #![allow(clippy::useless_vec)]
587 #![allow(clippy::needless_pass_by_value)]
588 use super::*;
590 use crate::client::circuit::test::fake_mpsc;
591 use crate::stream::queue::fake_stream_queue;
592 use crate::{client::stream::OutboundDataCmdChecker, congestion::sendme::StreamSendWindow};
593 use web_time_compat::InstantExt;
594
595 #[test]
596 fn test_wrapping_next_stream_id() {
597 let one = StreamId::new(1).unwrap();
598 let two = StreamId::new(2).unwrap();
599 let max = StreamId::new(0xffff).unwrap();
600 assert_eq!(wrapping_next_stream_id(one), two);
601 assert_eq!(wrapping_next_stream_id(max), one);
602 }
603
604 #[test]
605 #[allow(clippy::cognitive_complexity)]
606 fn streammap_basics() -> Result<()> {
607 let mut map = StreamMap::new();
608 let mut next_id = map.next_stream_id;
609 let mut ids = Vec::new();
610
611 assert_eq!(map.n_open_streams(), 0);
612
613 for n in 1..=128 {
615 let (sink, _) = fake_stream_queue(
616 #[cfg(not(feature = "flowctl-cc"))]
617 128,
618 );
619 let (_, rx) = fake_mpsc(2);
620 let id = map.add_ent(
621 sink,
622 rx,
623 StreamFlowCtrl::new_window(StreamSendWindow::new(500)),
624 OutboundDataCmdChecker::new_any(),
625 )?;
626 let expect_id: StreamId = next_id;
627 assert_eq!(expect_id, id);
628 next_id = wrapping_next_stream_id(next_id);
629 ids.push(id);
630 assert_eq!(map.n_open_streams(), n);
631 }
632
633 let nonesuch_id = next_id;
635 assert!(matches!(
636 map.get_mut(ids[0]),
637 Some(StreamEntMut::Open { .. })
638 ));
639 assert!(map.get_mut(nonesuch_id).is_none());
640
641 assert!(map.ending_msg_received(nonesuch_id).is_err());
643 assert_eq!(map.n_open_streams(), 128);
644 assert!(map.ending_msg_received(ids[1]).is_ok());
645 assert_eq!(map.n_open_streams(), 127);
646 assert!(matches!(
647 map.get_mut(ids[1]),
648 Some(StreamEntMut::EndReceived)
649 ));
650 assert!(map.ending_msg_received(ids[1]).is_err());
651
652 use TerminateReason as TR;
654 let expiry = Instant::get(); assert!(map.terminate(nonesuch_id, TR::ExplicitEnd, expiry).is_err());
656 assert_eq!(map.n_open_streams(), 127);
657 assert_eq!(
658 map.terminate(ids[2], TR::ExplicitEnd, expiry).unwrap(),
659 ShouldSendEnd::Send
660 );
661 assert_eq!(map.n_open_streams(), 126);
662 assert!(matches!(
663 map.get_mut(ids[2]),
664 Some(StreamEntMut::EndSent { .. })
665 ));
666 assert_eq!(
667 map.terminate(ids[1], TR::ExplicitEnd, expiry).unwrap(),
668 ShouldSendEnd::DontSend
669 );
670 assert_eq!(map.n_open_streams(), 126);
673 assert!(map.get_mut(ids[1]).is_none());
674
675 assert!(map.ending_msg_received(ids[2]).is_ok());
677 assert!(map.get_mut(ids[2]).is_none());
678 assert_eq!(map.n_open_streams(), 126);
679
680 Ok(())
681 }
682}