// tor-proto — streammap.rs

1//! Types and code for mapping StreamIDs to streams on a circuit.
2
3mod halfstream;
4
5use crate::congestion::sendme;
6use crate::stream::RECV_WINDOW_INIT;
7use crate::stream::StreamMpscReceiver;
8use crate::stream::cmdcheck::AnyCmdChecker;
9use crate::stream::flow_ctrl::state::{FlowCtrlHooks, StreamFlowCtrl};
10use crate::stream::queue::StreamQueueSender;
11use crate::util::stream_poll_set::{KeyAlreadyInsertedError, StreamPollSet};
12use crate::{Error, Result};
13use pin_project::pin_project;
14use tor_async_utils::peekable_stream::{PeekableStream, UnobtrusivePeekableStream};
15use tor_async_utils::stream_peek::StreamUnobtrusivePeeker;
16use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
17use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
18use tor_cell::relaycell::{StreamId, msg::AnyRelayMsg};
19
20use std::collections::HashMap;
21use std::collections::hash_map;
22use std::num::NonZeroU16;
23use std::pin::Pin;
24use std::task::{Poll, Waker};
25use tor_error::{bad_api_usage, internal};
26use web_time_compat::Instant;
27
28use rand::Rng;
29
30use tracing::debug;
31
32use halfstream::HalfStream;
33
/// Entry for an open stream
///
/// (For the purposes of this module, an open stream is one where we have not
/// sent or received any message indicating that the stream is ended.)
#[derive(Debug)]
#[pin_project]
pub(super) struct OpenStreamEnt {
    /// Sink to send relay cells tagged for this stream into.
    pub(super) sink: StreamQueueSender,
    /// Number of cells dropped due to the stream disappearing before we can
    /// transform this into an `EndSent`.
    ///
    /// (Used to adjust the fresh receive window when this entry is turned
    /// into a half-stream; see `StreamMap::terminate`.)
    pub(super) dropped: u16,
    /// A `CmdChecker` used to tell whether cells on this stream are valid.
    pub(super) cmd_checker: AnyCmdChecker,
    /// Flow control for this stream.
    // Non-pub because we need to proxy `put_for_incoming_sendme` to ensure
    // `flow_ctrl_waker` is woken.
    flow_ctrl: StreamFlowCtrl,
    /// Stream for cells that should be sent down this stream.
    // Not directly exposed. This should only be polled via
    // `OpenStreamEntStream`s implementation of `Stream`, which in turn should
    // only be used through `StreamPollSet`.
    #[pin]
    rx: StreamUnobtrusivePeeker<StreamMpscReceiver<AnyRelayMsg>>,
    /// Waker to be woken when more sending capacity becomes available (e.g.
    /// receiving a SENDME).
    ///
    /// Set while a poll is blocked on flow control; taken and woken in
    /// `put_for_incoming_sendme`.
    flow_ctrl_waker: Option<Waker>,
}
62
63impl OpenStreamEnt {
64    /// Whether this stream is ready to send `msg`.
65    pub(crate) fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
66        self.flow_ctrl.can_send(msg)
67    }
68
69    /// Handle an incoming sendme.
70    ///
71    /// On failure, return an error: the caller should close the stream or
72    /// circuit with a protocol error.
73    pub(crate) fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
74        self.flow_ctrl.put_for_incoming_sendme(msg)?;
75        // Wake the stream if it was blocked on flow control.
76        if let Some(waker) = self.flow_ctrl_waker.take() {
77            waker.wake();
78        }
79        Ok(())
80    }
81
82    /// The approximate number of stream inbound data bytes buffered.
83    fn approx_stream_bytes_buffered(&self) -> usize {
84        // NOTE: Here we want to know the total number of buffered incoming stream data bytes. We
85        // have access to the `StreamQueueSender` and can get how many bytes are buffered in that
86        // queue.
87        // But this isn't always the total number of buffered bytes since some bytes might be
88        // buffered outside of this queue.
89        // For example `DataReaderImpl` stores some stream bytes in its `pending` buffer, and we
90        // have no way to access that from here in the reactor. So it's impossible to know the total
91        // number of incoming stream data bytes that are buffered.
92        //
93        // This isn't really an issue in practice since *most* of the bytes will be queued in the
94        // `StreamQueueSender`, the XOFF threshold is very large, and we don't need to be exact.
95        self.sink.approx_stream_bytes()
96    }
97
98    /// Check if we should send an XON message.
99    ///
100    /// If we should, then returns the XON message that should be sent.
101    /// Returns an error if XON/XOFF messages aren't supported for this type of flow control.
102    pub(crate) fn maybe_send_xon(&mut self, rate: XonKbpsEwma) -> Result<Option<Xon>> {
103        self.flow_ctrl
104            .maybe_send_xon(rate, self.approx_stream_bytes_buffered())
105    }
106
107    /// Check if we should send an XOFF message.
108    ///
109    /// If we should, then returns the XOFF message that should be sent.
110    /// Returns an error if XON/XOFF messages aren't supported for this type of flow control.
111    pub(super) fn maybe_send_xoff(&mut self) -> Result<Option<Xoff>> {
112        self.flow_ctrl
113            .maybe_send_xoff(self.approx_stream_bytes_buffered())
114    }
115
116    /// Handle an incoming XON message.
117    ///
118    /// On failure, return an error: the caller should close the stream or
119    /// circuit with a protocol error.
120    pub(crate) fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
121        self.flow_ctrl.handle_incoming_xon(msg)
122    }
123
124    /// Handle an incoming XOFF message.
125    ///
126    /// On failure, return an error: the caller should close the stream or
127    /// circuit with a protocol error.
128    pub(crate) fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
129        self.flow_ctrl.handle_incoming_xoff(msg)
130    }
131
132    /// Inform the flow control code that we're about to send `msg`.
133    /// Should be called at the point we've fully committed to sending the message.
134    /// Returns an error if we can't send `msg` and should close the circuit.
135    //
136    // TODO: Consider not exposing this, and instead calling in
137    // `StreamMap::take_ready_msg`.
138    pub(crate) fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()> {
139        self.flow_ctrl.about_to_send(msg)
140    }
141}
142
/// Private wrapper over `OpenStreamEnt`. We implement `futures::Stream` for
/// this wrapper, and not directly for `OpenStreamEnt`, so that client code
/// can't directly access the stream.
///
/// (The `Stream` impl below only yields a message once flow control permits
/// sending it; exposing the raw receiver would bypass that gating.)
#[derive(Debug)]
#[pin_project]
struct OpenStreamEntStream {
    /// Inner value.
    #[pin]
    inner: OpenStreamEnt,
}
153
154impl futures::Stream for OpenStreamEntStream {
155    type Item = AnyRelayMsg;
156
157    fn poll_next(
158        mut self: std::pin::Pin<&mut Self>,
159        cx: &mut std::task::Context<'_>,
160    ) -> Poll<Option<Self::Item>> {
161        if !self.as_mut().poll_peek_mut(cx).is_ready() {
162            return Poll::Pending;
163        };
164        let res = self.project().inner.project().rx.poll_next(cx);
165        debug_assert!(res.is_ready());
166        // TODO: consider calling `inner.flow_ctrl.about_to_send` here;
167        // particularly if we change it to return a wrapper type that proves
168        // we've taken the capacity. Otherwise it'd make it tricky in the reactor
169        // to be sure we've correctly taken the capacity, since messages can originate
170        // in other parts of the code (currently none of those should be of types that
171        // count towards flow control, but that may change).
172        res
173    }
174}
175
176impl PeekableStream for OpenStreamEntStream {
177    fn poll_peek_mut(
178        self: Pin<&mut Self>,
179        cx: &mut std::task::Context<'_>,
180    ) -> Poll<Option<&mut <Self as futures::Stream>::Item>> {
181        let s = self.project();
182        let inner = s.inner.project();
183        let m = match inner.rx.poll_peek_mut(cx) {
184            Poll::Ready(Some(m)) => m,
185            Poll::Ready(None) => return Poll::Ready(None),
186            Poll::Pending => return Poll::Pending,
187        };
188        if !inner.flow_ctrl.can_send(m) {
189            inner.flow_ctrl_waker.replace(cx.waker().clone());
190            return Poll::Pending;
191        }
192        Poll::Ready(Some(m))
193    }
194}
195
196impl UnobtrusivePeekableStream for OpenStreamEntStream {
197    fn unobtrusive_peek_mut(
198        self: std::pin::Pin<&mut Self>,
199    ) -> Option<&mut <Self as futures::Stream>::Item> {
200        let s = self.project();
201        let inner = s.inner.project();
202        let m = inner.rx.unobtrusive_peek_mut()?;
203        if inner.flow_ctrl.can_send(m) {
204            Some(m)
205        } else {
206            None
207        }
208    }
209}
210
/// Entry for a stream where we have sent an END, or other message
/// indicating that the stream is terminated.
#[derive(Debug)]
pub(super) struct EndSentStreamEnt {
    /// A "half-stream" that we use to check the validity of incoming
    /// messages on this stream.
    pub(super) half_stream: HalfStream,
    /// True if the sender on this stream has been explicitly dropped;
    /// false if we got an explicit close from `close_pending`
    explicitly_dropped: bool,
    /// When this entry should be removed from the stream map.
    ///
    /// This is the instant after which we stop waiting for an END ack and
    /// drop the half-stream from the map
    /// (see `StreamMap::remove_expired_halfstreams`).
    pub(super) expiry: Instant,
}
227
/// The entry for a stream that is closed (on at least one side).
#[derive(Debug)]
enum ClosedStreamEnt {
    /// A stream for which we have received an END cell, but not yet
    /// had the stream object get dropped.
    EndReceived,
    /// A stream for which we have sent an END cell but not yet received an END
    /// cell.
    ///
    /// TODO(arti#264) Can we ever throw this out? Do we really get END cells for
    /// these?
    EndSent(EndSentStreamEnt),
}
241
/// Mutable reference to a stream entry.
///
/// Unifies open and closed entries into a single view, as returned by
/// `StreamMap::get_mut`.
pub(super) enum StreamEntMut<'a> {
    /// An open stream.
    Open(&'a mut OpenStreamEnt),
    /// A stream for which we have received an END cell, but not yet
    /// had the stream object get dropped.
    EndReceived,
    /// A stream for which we have sent an END cell but not yet received an END
    /// cell.
    EndSent(&'a mut EndSentStreamEnt),
}
253
254impl<'a> From<&'a mut ClosedStreamEnt> for StreamEntMut<'a> {
255    fn from(value: &'a mut ClosedStreamEnt) -> Self {
256        match value {
257            ClosedStreamEnt::EndReceived => Self::EndReceived,
258            ClosedStreamEnt::EndSent(e) => Self::EndSent(e),
259        }
260    }
261}
262
263impl<'a> From<&'a mut OpenStreamEntStream> for StreamEntMut<'a> {
264    fn from(value: &'a mut OpenStreamEntStream) -> Self {
265        Self::Open(&mut value.inner)
266    }
267}
268
/// Return value to indicate whether or not we send an END cell upon
/// terminating a given stream.
///
/// Returned by [`StreamMap::terminate`].
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(super) enum ShouldSendEnd {
    /// An END cell should be sent.
    Send,
    /// An END cell should not be sent.
    DontSend,
}
278
/// A priority for use with [`StreamPollSet`].
///
/// `StreamMap` hands these out in strictly increasing order to implement
/// round-robin scheduling; presumably lower values are served first — see
/// the `next_priority` field of `StreamMap`.
#[derive(Debug, Copy, Clone, Eq, PartialEq, PartialOrd, Ord)]
struct Priority(u64);
282
/// A map from stream IDs to stream entries. Each circuit has one for each
/// hop.
pub(crate) struct StreamMap {
    /// Open streams.
    // Invariants:
    // * Keys are disjoint with `closed_streams`.
    open_streams: StreamPollSet<StreamId, Priority, OpenStreamEntStream>,
    /// Closed streams.
    // Invariants:
    // * Keys are disjoint with `open_streams`.
    closed_streams: HashMap<StreamId, ClosedStreamEnt>,
    /// The next StreamId that we should use for a newly allocated
    /// circuit.
    ///
    /// (Initialized to a random nonzero value in `new`, then incremented
    /// with wraparound as IDs are handed out.)
    next_stream_id: StreamId,
    /// Next priority to use in `open_streams`. We implement round-robin scheduling of
    /// handling outgoing messages from streams by assigning a stream the next
    /// priority whenever an outgoing message is processed from that stream,
    /// putting it last in line.
    next_priority: Priority,
}
303
impl StreamMap {
    /// Make a new empty StreamMap.
    ///
    /// The first stream ID to hand out is chosen at random (any nonzero
    /// 16-bit value) rather than always starting at 1.
    pub(crate) fn new() -> Self {
        let mut rng = rand::rng();
        // `NonZeroU16` guarantees we never pick the reserved ID 0.
        let next_stream_id: NonZeroU16 = rng.random();
        StreamMap {
            open_streams: StreamPollSet::new(),
            closed_streams: HashMap::new(),
            next_stream_id: next_stream_id.into(),
            next_priority: Priority(0),
        }
    }

    /// Return the number of open streams in this map.
    pub(super) fn n_open_streams(&self) -> usize {
        self.open_streams.len()
    }

    /// Return a [`TunnelActivity`](crate::util::tunnel_activity::TunnelActivity) for this hop.
    pub(super) fn tunnel_activity(&self) -> crate::util::tunnel_activity::TunnelActivity {
        self.open_streams.tunnel_activity()
    }

    /// Return the next available priority.
    ///
    /// Priorities are handed out in strictly increasing order, so a stream
    /// that receives a fresh priority goes to the back of the scheduling
    /// queue.
    fn take_next_priority(&mut self) -> Priority {
        let rv = self.next_priority;
        self.next_priority = Priority(rv.0 + 1);
        rv
    }

    /// Add an entry to this map; return the newly allocated StreamId.
    ///
    /// Returns `Error::IdRangeFull` if no candidate ID is free.
    pub(super) fn add_ent(
        &mut self,
        sink: StreamQueueSender,
        rx: StreamMpscReceiver<AnyRelayMsg>,
        flow_ctrl: StreamFlowCtrl,
        cmd_checker: AnyCmdChecker,
    ) -> Result<StreamId> {
        let mut stream_ent = OpenStreamEntStream {
            inner: OpenStreamEnt {
                sink,
                flow_ctrl,
                dropped: 0,
                cmd_checker,
                rx: StreamUnobtrusivePeeker::new(rx),
                flow_ctrl_waker: None,
            },
        };
        let priority = self.take_next_priority();
        // This "65536" seems too aggressive, but it's what tor does.
        //
        // Also, going around in a loop here is (sadly) needed in order
        // to look like Tor clients.
        //
        // NOTE(review): collisions are checked only against `open_streams`;
        // an ID still present in `closed_streams` could seemingly be handed
        // out again — confirm whether that can violate the disjoint-keys
        // invariant documented on `StreamMap`.
        for _ in 1..=65536 {
            let id: StreamId = self.next_stream_id;
            self.next_stream_id = wrapping_next_stream_id(self.next_stream_id);
            // On collision, `try_insert` hands the entry back so we can retry
            // with the next candidate ID without rebuilding it.
            stream_ent = match self.open_streams.try_insert(id, priority, stream_ent) {
                Ok(_) => return Ok(id),
                Err(KeyAlreadyInsertedError {
                    key: _,
                    priority: _,
                    stream,
                }) => stream,
            };
        }

        // Every candidate ID was taken.
        Err(Error::IdRangeFull)
    }

    /// Add an entry to this map using the specified StreamId.
    ///
    /// Returns `Error::IdUnavailable` if `id` is already in use by an open
    /// stream.
    #[cfg(any(feature = "hs-service", feature = "relay"))]
    pub(super) fn add_ent_with_id(
        &mut self,
        sink: StreamQueueSender,
        rx: StreamMpscReceiver<AnyRelayMsg>,
        flow_ctrl: StreamFlowCtrl,
        id: StreamId,
        cmd_checker: AnyCmdChecker,
    ) -> Result<()> {
        let stream_ent = OpenStreamEntStream {
            inner: OpenStreamEnt {
                sink,
                flow_ctrl,
                dropped: 0,
                cmd_checker,
                rx: StreamUnobtrusivePeeker::new(rx),
                flow_ctrl_waker: None,
            },
        };
        let priority = self.take_next_priority();
        self.open_streams
            .try_insert(id, priority, stream_ent)
            .map_err(|_| Error::IdUnavailable(id))
    }

    /// Return the entry for `id` in this map, if any.
    ///
    /// Checks open streams first, then closed ones (the two key sets are
    /// disjoint by invariant).
    pub(super) fn get_mut(&mut self, id: StreamId) -> Option<StreamEntMut<'_>> {
        if let Some(e) = self.open_streams.stream_mut(&id) {
            return Some(e.into());
        }
        if let Some(e) = self.closed_streams.get_mut(&id) {
            return Some(e.into());
        }
        None
    }

    /// Note that we received an END message (or other message indicating the end of
    /// the stream) on the stream with `id`.
    ///
    /// Returns an error if there was no such stream, or if we had already
    /// received an END on it.
    pub(super) fn ending_msg_received(&mut self, id: StreamId) -> Result<()> {
        // Case 1: the stream was open. Move it to the closed map as
        // `EndReceived` (dropping its channels and flow-control state).
        if self.open_streams.remove(&id).is_some() {
            let prev = self.closed_streams.insert(id, ClosedStreamEnt::EndReceived);
            debug_assert!(prev.is_none(), "Unexpected duplicate entry for {id}");
            return Ok(());
        }
        // Case 2: not open and not closed — protocol violation by the peer.
        let hash_map::Entry::Occupied(closed_entry) = self.closed_streams.entry(id) else {
            return Err(Error::CircProto(
                "Received END cell on nonexistent stream".into(),
            ));
        };
        // Progress the stream's state machine accordingly
        match closed_entry.get() {
            // Two ENDs on the same stream is a protocol violation.
            ClosedStreamEnt::EndReceived => Err(Error::CircProto(
                "Received two END cells on same stream".into(),
            )),
            ClosedStreamEnt::EndSent { .. } => {
                debug!("Actually got an end cell on a half-closed stream!");
                // We got an END, and we already sent an END. Great!
                // we can forget about this stream.
                closed_entry.remove_entry();
                Ok(())
            }
        }
    }

    /// Handle a termination of the stream with `id` from this side of
    /// the circuit. Return [`ShouldSendEnd::Send`] if the stream was open
    /// and an END ought to be sent.
    pub(super) fn terminate(
        &mut self,
        id: StreamId,
        why: TerminateReason,
        expiry: Instant,
    ) -> Result<ShouldSendEnd> {
        use TerminateReason as TR;

        // Case 1: the stream was open. Convert it into an `EndSent`
        // half-stream and tell the caller to send an END.
        if let Some((_id, _priority, ent)) = self.open_streams.remove(&id) {
            let OpenStreamEntStream {
                inner:
                    OpenStreamEnt {
                        flow_ctrl,
                        dropped,
                        cmd_checker,
                        // notably absent: the channels for sink and stream, which will get dropped and
                        // closed (meaning reads/writes from/to this stream will now fail)
                        ..
                    },
            } = ent;
            // FIXME(eta): we don't copy the receive window, instead just creating a new one,
            //             so a malicious peer can send us slightly more data than they should
            //             be able to; see arti#230.
            let mut recv_window = sendme::StreamRecvWindow::new(RECV_WINDOW_INIT);
            // Account for cells we already dropped on the floor, so the peer
            // doesn't get extra window credit for them.
            recv_window.decrement_n(dropped)?;
            // TODO: would be nice to avoid new_ref.
            let half_stream = HalfStream::new(flow_ctrl, recv_window, cmd_checker);
            let explicitly_dropped = why == TR::StreamTargetClosed;

            let prev = self.closed_streams.insert(
                id,
                ClosedStreamEnt::EndSent(EndSentStreamEnt {
                    half_stream,
                    explicitly_dropped,
                    expiry,
                }),
            );
            debug_assert!(prev.is_none(), "Unexpected duplicate entry for {id}");
            return Ok(ShouldSendEnd::Send);
        }

        // Case 2: the stream is already closed (or never existed).
        // Progress the stream's state machine accordingly
        match self
            .closed_streams
            .remove(&id)
            .ok_or_else(|| Error::from(internal!("Somehow we terminated a nonexistent stream?")))?
        {
            ClosedStreamEnt::EndReceived => Ok(ShouldSendEnd::DontSend),
            ClosedStreamEnt::EndSent(EndSentStreamEnt {
                ref mut explicitly_dropped,
                ..
            }) => match (*explicitly_dropped, why) {
                (false, TR::StreamTargetClosed) => {
                    // NOTE(review): the entry was `remove`d above, so this
                    // flag update mutates a value that is dropped when the
                    // match ends, and the half-stream no longer remains in
                    // the map — confirm this is intended.
                    *explicitly_dropped = true;
                    Ok(ShouldSendEnd::DontSend)
                }
                (true, TR::StreamTargetClosed) => {
                    Err(bad_api_usage!("Tried to close an already closed stream.").into())
                }
                (_, TR::ExplicitEnd) => Err(bad_api_usage!(
                    "Tried to end an already closed stream. (explicitly_dropped={:?})",
                    *explicitly_dropped
                )
                .into()),
            },
        }
    }

    /// Get an up-to-date iterator of streams with ready items. `Option<AnyRelayMsg>::None`
    /// indicates that the local sender has been dropped.
    ///
    /// Conceptually all streams are in a queue; new streams are added to the
    /// back of the queue, and a stream is sent to the back of the queue
    /// whenever a ready message is taken from it (via
    /// [`Self::take_ready_msg`]). The returned iterator is an ordered view of
    /// this queue, showing the subset of streams that have a message ready to
    /// send, or whose sender has been dropped.
    pub(super) fn poll_ready_streams_iter<'a>(
        &'a mut self,
        cx: &mut std::task::Context,
    ) -> impl Iterator<Item = (StreamId, Option<&'a AnyRelayMsg>)> + 'a + use<'a> {
        self.open_streams
            .poll_ready_iter_mut(cx)
            .map(|(sid, _priority, ent)| {
                let ent = Pin::new(ent);
                // Peek without registering wakers; see the method doc for
                // what `None` means here.
                let msg = ent.unobtrusive_peek();
                (*sid, msg)
            })
    }

    /// If the stream `sid` has a message ready, take it, and reprioritize `sid`
    /// to the "back of the line" with respect to
    /// [`Self::poll_ready_streams_iter`].
    pub(super) fn take_ready_msg(&mut self, sid: StreamId) -> Option<AnyRelayMsg> {
        let new_priority = self.take_next_priority();
        let (_prev_priority, val) = self
            .open_streams
            .take_ready_value_and_reprioritize(&sid, new_priority)?;
        Some(val)
    }

    /// Remove all halfstreams that are expired at `now`.
    pub(super) fn remove_expired_halfstreams(&mut self, now: Instant) {
        self.closed_streams.retain(|_sid, entry| match entry {
            // `EndReceived` entries have no expiry; they are removed in
            // `ending_msg_received`/`terminate` instead.
            ClosedStreamEnt::EndReceived => true,
            ClosedStreamEnt::EndSent(ent) => ent.expiry > now,
        });
    }
}
552
/// A reason for terminating a stream.
///
/// We use this type in order to ensure that we obey the API restrictions of [`StreamMap::terminate`]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(super) enum TerminateReason {
    /// Closing a stream because the receiver got `Ok(None)`, indicating that the
    /// corresponding senders were all dropped.
    StreamTargetClosed,
    /// Closing a stream because we were explicitly told to end it via
    /// [`StreamTarget::close_pending`](crate::stream::StreamTarget::close_pending).
    ExplicitEnd,
}
565
566/// Convenience function for doing a wrapping increment of a `StreamId`.
567fn wrapping_next_stream_id(id: StreamId) -> StreamId {
568    let next_val = NonZeroU16::from(id)
569        .checked_add(1)
570        .unwrap_or_else(|| NonZeroU16::new(1).expect("Impossibly got 0 value"));
571    next_val.into()
572}
573
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::client::circuit::test::fake_mpsc;
    use crate::stream::queue::fake_stream_queue;
    use crate::{client::stream::OutboundDataCmdChecker, congestion::sendme::StreamSendWindow};
    use web_time_compat::InstantExt;

    /// Check that stream-ID increment wraps from 0xffff back to 1 (never 0).
    #[test]
    fn test_wrapping_next_stream_id() {
        let one = StreamId::new(1).unwrap();
        let two = StreamId::new(2).unwrap();
        let max = StreamId::new(0xffff).unwrap();
        assert_eq!(wrapping_next_stream_id(one), two);
        assert_eq!(wrapping_next_stream_id(max), one);
    }

    /// Exercise the basic stream-map lifecycle: allocation, lookup,
    /// END-received handling, and termination.
    #[test]
    #[allow(clippy::cognitive_complexity)]
    fn streammap_basics() -> Result<()> {
        let mut map = StreamMap::new();
        let mut next_id = map.next_stream_id;
        let mut ids = Vec::new();

        assert_eq!(map.n_open_streams(), 0);

        // Try add_ent
        for n in 1..=128 {
            let (sink, _) = fake_stream_queue(
                #[cfg(not(feature = "flowctl-cc"))]
                128,
            );
            let (_, rx) = fake_mpsc(2);
            let id = map.add_ent(
                sink,
                rx,
                StreamFlowCtrl::new_window(StreamSendWindow::new(500)),
                OutboundDataCmdChecker::new_any(),
            )?;
            // IDs are allocated sequentially from the random starting point.
            let expect_id: StreamId = next_id;
            assert_eq!(expect_id, id);
            next_id = wrapping_next_stream_id(next_id);
            ids.push(id);
            assert_eq!(map.n_open_streams(), n);
        }

        // Test get_mut.
        let nonesuch_id = next_id;
        assert!(matches!(
            map.get_mut(ids[0]),
            Some(StreamEntMut::Open { .. })
        ));
        assert!(map.get_mut(nonesuch_id).is_none());

        // Test end_received
        assert!(map.ending_msg_received(nonesuch_id).is_err());
        assert_eq!(map.n_open_streams(), 128);
        assert!(map.ending_msg_received(ids[1]).is_ok());
        assert_eq!(map.n_open_streams(), 127);
        assert!(matches!(
            map.get_mut(ids[1]),
            Some(StreamEntMut::EndReceived)
        ));
        // A second END on the same stream is a protocol violation.
        assert!(map.ending_msg_received(ids[1]).is_err());

        // Test terminate
        use TerminateReason as TR;
        let expiry = Instant::get(); // dummy value, unused outside of the reactor
        assert!(map.terminate(nonesuch_id, TR::ExplicitEnd, expiry).is_err());
        assert_eq!(map.n_open_streams(), 127);
        assert_eq!(
            map.terminate(ids[2], TR::ExplicitEnd, expiry).unwrap(),
            ShouldSendEnd::Send
        );
        assert_eq!(map.n_open_streams(), 126);
        assert!(matches!(
            map.get_mut(ids[2]),
            Some(StreamEntMut::EndSent { .. })
        ));
        assert_eq!(
            map.terminate(ids[1], TR::ExplicitEnd, expiry).unwrap(),
            ShouldSendEnd::DontSend
        );
        // This stream was already closed when we called `ending_msg_received`
        // above.
        assert_eq!(map.n_open_streams(), 126);
        assert!(map.get_mut(ids[1]).is_none());

        // Try receiving an end after a terminate.
        assert!(map.ending_msg_received(ids[2]).is_ok());
        assert!(map.get_mut(ids[2]).is_none());
        assert_eq!(map.n_open_streams(), 126);

        Ok(())
    }
}