Skip to main content

tor_proto/util/
stream_poll_set.rs

1//! Provides [`StreamPollSet`]
2
3// So that we can declare these things as if they were in their own crate.
4#![allow(unreachable_pub)]
5
6use std::{
7    collections::{BTreeMap, HashMap, hash_map},
8    future::Future,
9    hash::Hash,
10    pin::Pin,
11    task::{Context, Poll, Waker},
12};
13
14use futures::{FutureExt, StreamExt as _};
15use tor_async_utils::peekable_stream::PeekableStream;
16
17use crate::util::{
18    keyed_futures_unordered::KeyedFuturesUnordered,
19    tunnel_activity::{InTunnelActivity, TunnelActivity},
20};
21
/// Future adapter around a [`PeekableStream`].
///
/// Resolves to the wrapped stream itself as soon as peeking it no longer
/// returns `Pending`.
struct PeekableReady<S> {
    /// The wrapped stream.
    ///
    /// `None` once the future has completed and handed the stream back.
    stream: Option<S>,
}
28
29impl<S> PeekableReady<S> {
30    /// Create a new [`PeekableReady`].
31    fn new(st: S) -> Self {
32        Self { stream: Some(st) }
33    }
34
35    /// Get a reference to the inner `S`.
36    ///
37    /// None if the future has already completed.
38    fn get_ref(&self) -> Option<&S> {
39        self.stream.as_ref()
40    }
41
42    /// Get a mut reference to the inner `S`.
43    ///
44    /// None if the future has already completed.
45    fn get_mut(&mut self) -> Option<&mut S> {
46        self.stream.as_mut()
47    }
48
49    /// Unwrap inner `S`.
50    ///
51    /// None if the future has already completed.
52    fn into_inner(self) -> Option<S> {
53        self.stream
54    }
55}
56
57impl<S> Future for PeekableReady<S>
58where
59    S: PeekableStream + Unpin,
60{
61    type Output = S;
62
63    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
64        let Some(stream) = &mut self.stream else {
65            panic!("Polled completed future");
66        };
67        match Pin::new(stream).poll_peek(cx) {
68            Poll::Ready(_) => Poll::Ready(self.stream.take().expect("Stream disappeared")),
69            Poll::Pending => Poll::Pending,
70        }
71    }
72}
73
74/// Manages a dynamic set of [`futures::Stream`] with associated keys and
75/// priorities.
76///
77/// Notable features:
78///
79/// * Prioritization: streams have an associated priority, and ready-streams are
80///   iterated over in ascending priority order.
81/// * Efficient polling: an unready stream won't be polled again until it's
82///   ready or exhausted (e.g. a corresponding [`futures::Sink`] is written-to or
83///   dropped). A ready stream won't be polled again until the ready item has been
84///   removed.
85pub struct StreamPollSet<K, P, S>
86where
87    S: PeekableStream + Unpin,
88{
89    /// Priority for each stream in the set, and associated InTunnelActivity token.
90    // We keep the priority for each stream here instead of bundling it together
91    // with the stream, so that the priority can easily be changed even while a
92    // future waiting on the stream is still pending (e.g. to support rescaling
93    // priorities for EWMA).
94    // Invariants:
95    // * Every key is also present in exactly one of `ready_values` or `pending_streams`.
96    priorities: HashMap<K, (P, InTunnelActivity)>,
97    /// Streams that have a result ready, in ascending order by priority.
98    // Invariants:
99    // * Keys are a (non-strict) subset of those in `priorities`.
100    ready_streams: BTreeMap<(P, K), S>,
101    /// Streams for which we're still waiting for the next result.
102    // Invariants:
103    // * Keys are a (non-strict) subset of those in `priorities`.
104    pending_streams: KeyedFuturesUnordered<K, PeekableReady<S>>,
105
106    /// Information about how active this particular hop has been,
107    /// with respect to tracking overall tunnel activity.
108    tunnel_activity: TunnelActivity,
109}
110
111impl<K, P, S> StreamPollSet<K, P, S>
112where
113    K: Ord + Hash + Clone + Send + Sync + 'static,
114    S: PeekableStream + Unpin,
115    P: Ord + Clone,
116{
117    /// Create a new, empty, `StreamPollSet`.
118    pub fn new() -> Self {
119        Self {
120            priorities: Default::default(),
121            ready_streams: Default::default(),
122            pending_streams: KeyedFuturesUnordered::new(),
123            tunnel_activity: TunnelActivity::never_used(),
124        }
125    }
126
127    /// Insert a `stream`, with an associated `key` and `priority`.
128    ///
129    /// If the `key` is already in use, the parameters are returned without altering `self`.
130    // To *replace* an existing key, we'd need to cancel any pending future and
131    // ensure that the cancellation is processed before inserting the new key, to
132    // ensure we don't assign a value from the previous key to the new key's
133    // stream.
134    pub fn try_insert(
135        &mut self,
136        key: K,
137        priority: P,
138        stream: S,
139    ) -> Result<(), KeyAlreadyInsertedError<K, P, S>> {
140        let hash_map::Entry::Vacant(v) = self.priorities.entry(key.clone()) else {
141            // We already have an entry for this key.
142            return Err(KeyAlreadyInsertedError {
143                key,
144                priority,
145                stream,
146            });
147        };
148        self.pending_streams
149            .try_insert(key, PeekableReady::new(stream))
150            // By `pending_streams` invariant that keys are a subset of those in
151            // `priorities`.
152            .unwrap_or_else(|_| panic!("Unexpected duplicate key"));
153        let token = self.tunnel_activity.inc_streams();
154        v.insert((priority, token));
155        Ok(())
156    }
157
158    /// Remove the entry for `key`, if any. This is the key, priority, buffered
159    /// poll_next result, and stream.
160    pub fn remove(&mut self, key: &K) -> Option<(K, P, S)> {
161        let (priority, token) = self.priorities.remove(key)?;
162        self.tunnel_activity.dec_streams(token);
163        if let Some((key, fut)) = self.pending_streams.remove(key) {
164            // Validate `priorities` invariant that keys are also present in exactly one of
165            // `pending_streams` and `ready_values`.
166            debug_assert!(
167                !self
168                    .ready_streams
169                    .contains_key(&(priority.clone(), key.clone()))
170            );
171            let stream = fut
172                .into_inner()
173                // We know the future hasn't completed, so the stream should be present.
174                .expect("Missing stream");
175            Some((key, priority, stream))
176        } else {
177            let ((_priority, key), stream) = self
178                .ready_streams
179                .remove_entry(&(priority.clone(), key.clone()))
180                // By
181                // * `pending_streams` invariant that keys are also present in
182                // exactly one of `pending_streams` and `ready_values`.
183                // * validated above that the key was in `pending_streams`, and
184                // not in `ready_values`.
185                .expect("Unexpectedly no value for key");
186            Some((key, priority, stream))
187        }
188    }
189
190    /// Polls streams that are ready to be polled, and returns an iterator over all streams
191    /// for which we have a buffered `Poll::Ready` result, in ascending priority order.
192    ///
193    /// Registers the provided [`Context`] to be woken when
194    /// any of the internal streams that weren't ready in the previous call to
195    /// this method (and therefore wouldn't have appeared in the iterator
196    /// results) become potentially ready (based on when the inner stream wakes
197    /// the `Context` provided to its own `poll_next`).
198    ///
199    /// The same restrictions apply as for [`Self::stream_mut`].  e.g. do not
200    /// directly call [`PeekableStream::poll_peek`] to see what item is
201    /// available on the stream; instead use [`Self::peek_mut`]. (Or
202    /// [`tor_async_utils::peekable_stream::UnobtrusivePeekableStream`] if
203    /// implemented for the stream).
204    ///
205    /// This method does *not* drain ready items. `Some` values can be removed
206    /// with [`Self::take_ready_value_and_reprioritize`]. `None` values can only
207    /// be removed by removing the whole stream with [`Self::remove`].
208    ///
209    /// This API is meant to allow callers to find the first stream (in priority
210    /// order) that is ready, and that the caller is able to process now. i.e.
211    /// it's specifically to support the use-case where external factors may
212    /// prevent the processing of some streams but not others.
213    ///
214    /// Example:
215    ///
216    /// ```nocompile
217    /// # // We need the `nocompile` since `StreamPollSet` is non-pub.
218    /// # // TODO: take away the nocompile if we make this pub or implement some
219    /// # // workaround to expose it to doc-tests.
220    /// # type Key=u64;
221    /// # type Value=u64;
222    /// # type Priority=u64;
223    /// # type MyStream=Box<dyn futures::Stream<Item=Value> + Unpin>;
224    /// # fn can_process(key: &Key, val: &Value) -> bool { true }
225    /// # fn process(val: Value) { }
226    /// # fn new_priority(priority: &Priority) -> Priority { *priority }
227    /// fn process_a_ready_stream(sps: &mut StreamPollSet<Key, Value, Priority, MyStream>, cx: &mut std::task::Context) -> std::task::Poll<()> {
228    ///   let mut iter = sps.poll_ready_iter(cx);
229    ///   while let Some((key, priority, stream)) = iter.next() {
230    ///     let Some(value) = stream.unobtrusive_peek(Pin::new(stream)) else {
231    ///        // Stream exhausted. Remove the stream. We have to drop the iterator
232    ///        // first, though, so that we can mutate.
233    ///        let key = *key;
234    ///        drop(iter);
235    ///        sps.remove(&key).unwrap();
236    ///        return std::task::Poll::Ready(());
237    ///     };
238    ///     if can_process(key, value) {
239    ///        let key = *key;
240    ///        let priority = new_priority(priority);
241    ///        drop(iter);
242    ///        let (_old_priority, value) = sps.take_ready_value_and_reprioritize(&key, priority).unwrap();
243    ///        process(value);
244    ///        return std::task::Poll::Ready(());
245    ///     }
246    ///   }
247    ///   return std::task::Poll::Pending;
248    /// }
249    /// ```
250    // In the current implementation we *could* actually permit the caller to
251    // `poll_peek` a stream that we know is ready. But this may change as the
252    // impl evolves further, and it's probably better to blanket disallow it
253    // than to have complex rules for the caller about when it's ok.
254    //
255    // TODO: It would be nice if the returned iterator supported additional
256    // actions, e.g. allowing the user to consume the iterator and take and
257    // reprioritize the inner value, but this is tricky.
258    //
259    // I've sketched out a working "cursor" that holds the current position (K, P)
260    // and a &mut StreamPollSet. This can't implement the Iterator interface though
261    // since it needs to borrow from self. I was able to implement an Iterator-*like* interface
262    // that does borrow from self, but this doesn't compose well. e.g. in StreamMap
263    // we can't use the same technique again since the object would need a mut reference to the
264    // StreamMap *and* to this inner cursor object, which is illegal.
265    pub fn poll_ready_iter_mut<'a>(
266        &'a mut self,
267        cx: &mut Context,
268    ) -> impl Iterator<Item = (&'a K, &'a P, &'a mut S)> + 'a + use<'a, K, P, S> {
269        // First poll for ready streams
270        while let Poll::Ready(Some((key, stream))) = self.pending_streams.poll_next_unpin(cx) {
271            let (priority, _) = self
272                .priorities
273                .get(&key)
274                // By `pending_streams` invariant that all keys are also in `priorities`.
275                .expect("Missing priority");
276            let prev = self.ready_streams.insert((priority.clone(), key), stream);
277            assert!(prev.is_none());
278        }
279        self.ready_streams.iter_mut().map(|((p, k), s)| (k, p, s))
280    }
281
282    /// If the stream for `key` has `Some(value)` ready, take that value and set the
283    /// priority for it to `new_priority`.
284    ///
285    /// This method doesn't register a waker with the polled stream. Use
286    /// `poll_ready_iter` to ensure streams make progress.
287    ///
288    /// If the key doesn't exist, the stream isn't ready, or the stream's value
289    /// is `None` (indicating the end of the stream), this function returns
290    /// `None` without mutating anything.
291    ///
292    /// Ended streams should be removed using [`Self::remove`].
293    pub fn take_ready_value_and_reprioritize(
294        &mut self,
295        key: &K,
296        new_priority: P,
297    ) -> Option<(P, S::Item)> {
298        // Get the priority entry, but don't replace until the lookup in ready_streams is confirmed.
299        let hash_map::Entry::Occupied(mut priority_entry) = self.priorities.entry(key.clone())
300        else {
301            // Key isn't present at all.
302            return None;
303        };
304        let (priority_mut, _) = priority_entry.get_mut();
305        let Some(((_p, key), mut stream)) = self
306            .ready_streams
307            .remove_entry(&(priority_mut.clone(), key.clone()))
308        else {
309            // This stream isn't in the ready list.
310            return None;
311        };
312        match Pin::new(&mut stream).poll_peek(&mut Context::from_waker(Waker::noop())) {
313            Poll::Ready(Some(_val)) => (), // Stream is ready, and has an item. Proceed.
314            Poll::Ready(None) => {
315                // Stream is ready, but is terminated.
316                // Leave in place and return `None`.
317                return None;
318            }
319            Poll::Pending => {
320                // Stream wasn't actually ready, despite being on the ready
321                // list. This should be impossible by the stability guarantees
322                // of `PeekableStream` and our own internal logic, but we can
323                // recover.
324                tracing::error!("Bug: Stream unexpectedly unready");
325                self.pending_streams
326                    .try_insert(key.clone(), PeekableReady::new(stream))
327                    // By invariant on `priorities` that keys are in exactly one of the ready or pending lists.
328                    .unwrap_or_else(|_| {
329                        unreachable!("Key unexpectedly in both ready and unready list")
330                    });
331                return None;
332            }
333        }
334        let Some(Some(val)) = stream.next().now_or_never() else {
335            panic!("Polling stream returned a different result than peeking");
336        };
337        let prev_priority = std::mem::replace(priority_mut, new_priority);
338        self.pending_streams
339            .try_insert(key, PeekableReady::new(stream))
340            // We verified above that the key wasn't present in `priorities`,
341            // and `pending_streams` has the invariant that its keys are a
342            // subset of those in `priorities`.
343            .unwrap_or_else(|_| panic!("Unexpected pending stream entry"));
344        Some((prev_priority, val))
345    }
346
347    /// Get a mut reference to a ready value for key `key`, if one exists.
348    ///
349    /// This method doesn't poll the internal streams. Use `poll_ready_iter` to
350    /// ensure streams make progress.
351    // This will be used for packing and fragmentation, to take part of a DATA message.
352    #[allow(unused)]
353    pub fn peek_mut<'a>(&'a mut self, key: &K) -> Option<Poll<Option<&'a mut S::Item>>> {
354        let (priority, _) = self.priorities.get(key)?;
355        let Some(peekable) = self.ready_streams.get_mut(&(priority.clone(), key.clone())) else {
356            return Some(Poll::Pending);
357        };
358        // We don't have a waker registered here, so we can just use the noop waker.
359        // TODO: Create a mut future for `PeekableStream`.
360        Some(Pin::new(peekable).poll_peek_mut(&mut Context::from_waker(Waker::noop())))
361    }
362
363    /// Get a reference to the stream for `key`.
364    ///
365    /// The same restrictions apply as for [`Self::stream_mut`] (e.g. using
366    /// interior mutability).
367    #[allow(dead_code)]
368    pub fn stream(&self, key: &K) -> Option<&S> {
369        if let Some(s) = self.pending_streams.get(key) {
370            let s = s.get_ref();
371            // Stream must be present since it's still pending.
372            debug_assert!(s.is_some(), "Unexpected missing pending stream");
373            return s;
374        }
375        let (priority, _) = self.priorities.get(key)?;
376        self.ready_streams.get(&(priority.clone(), key.clone()))
377    }
378
379    /// Get a mut reference to the stream for `key`.
380    ///
381    /// Polling the stream through this reference, or otherwise causing its
382    /// registered `Waker` to be removed without waking it, will result in
383    /// unspecified (but not unsound) behavior.
384    ///
385    /// This is mostly intended for accessing non-`Stream` functionality of the stream
386    /// object, though it *is* permitted to mutate it in a way that the stream becomes
387    /// ready (potentially removing and waking its registered Waker(s)).
388    //
389    // In particular:
390    // * Polling a stream in the pending list and getting a Pending result
391    //   will overwrite our Waker, resulting in us not polling it again.
392    // * Doing so with a stream on the pending list and getting a Ready result
393    //   might be ok if it had already woken our waker. Otoh it could potentially
394    //   result in our waker never getting woken, and hence us not polling it again.
395    // * Doing so with a stream on the ready list should actually be ok, since
396    //   we don't have a registered waker, and don't do our own buffering.
397    pub fn stream_mut(&mut self, key: &K) -> Option<&mut S> {
398        if let Some(s) = self.pending_streams.get_mut(key) {
399            let s = s.get_mut();
400            // Stream must be present since it's still pending.
401            debug_assert!(s.is_some(), "Unexpected missing pending stream");
402            return s;
403        }
404        let (priority, _) = self.priorities.get(key)?;
405        self.ready_streams.get_mut(&(priority.clone(), key.clone()))
406    }
407
408    /// Number of streams managed by this object.
409    pub fn len(&self) -> usize {
410        self.priorities.len()
411    }
412
413    /// Return a `TunnelActivity` for this hop.
414    pub fn tunnel_activity(&self) -> TunnelActivity {
415        assert_eq!(self.len(), self.tunnel_activity.n_open_streams());
416        self.tunnel_activity
417    }
418}
419
420impl<K, P, S> Drop for StreamPollSet<K, P, S>
421where
422    S: PeekableStream + Unpin,
423{
424    fn drop(&mut self) {
425        self.priorities
426            .drain()
427            .for_each(|(_key, (_prio, token))| token.consume_and_forget());
428    }
429}
430
431/// Error returned by [`StreamPollSet::try_insert`].
432#[derive(Debug, thiserror::Error)]
433#[allow(clippy::exhaustive_structs)]
434pub struct KeyAlreadyInsertedError<K, P, S> {
435    /// Key that caller tried to insert.
436    #[allow(dead_code)]
437    pub key: K,
438    /// Priority that caller tried to insert.
439    #[allow(dead_code)]
440    pub priority: P,
441    /// Stream that caller tried to insert.
442    #[allow(dead_code)]
443    pub stream: S,
444}
445
446#[cfg(test)]
447mod test {
448    // @@ begin test lint list maintained by maint/add_warning @@
449    #![allow(clippy::bool_assert_comparison)]
450    #![allow(clippy::clone_on_copy)]
451    #![allow(clippy::dbg_macro)]
452    #![allow(clippy::mixed_attributes_style)]
453    #![allow(clippy::print_stderr)]
454    #![allow(clippy::print_stdout)]
455    #![allow(clippy::single_char_pattern)]
456    #![allow(clippy::unwrap_used)]
457    #![allow(clippy::unchecked_time_subtraction)]
458    #![allow(clippy::useless_vec)]
459    #![allow(clippy::needless_pass_by_value)]
460    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
461
462    use std::{
463        collections::VecDeque,
464        sync::{Arc, Mutex},
465        task::Poll,
466    };
467
468    use futures::{SinkExt as _, stream::Peekable};
469    use pin_project::pin_project;
470    use tor_rtmock::MockRuntime;
471
472    use super::*;
473
/// Stream key used by the tests.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct Key(u64);

/// Stream priority used by the tests; lower values sort first.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
struct Priority(u64);

/// Item type carried by the test streams.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
struct Value(u64);
482
483    /// Test stream that we can directly manipulate and examine.
484    #[derive(Debug)]
485    #[pin_project]
486    struct VecDequeStream<T> {
487        // Ready items.
488        vec: VecDeque<T>,
489        // Whether any more items will be written.
490        closed: bool,
491        // Registered waker.
492        waker: Option<std::task::Waker>,
493    }
494    impl<T> VecDequeStream<T> {
495        fn new_open<I: IntoIterator<Item = T>>(values: I) -> Self {
496            Self {
497                vec: VecDeque::from_iter(values),
498                waker: None,
499                closed: false,
500            }
501        }
502        fn new_closed<I: IntoIterator<Item = T>>(values: I) -> Self {
503            Self {
504                vec: VecDeque::from_iter(values),
505                waker: None,
506                closed: true,
507            }
508        }
509        fn push(&mut self, value: T) {
510            assert!(!self.closed);
511            self.vec.push_back(value);
512            if let Some(waker) = self.waker.take() {
513                waker.wake();
514            }
515        }
516    }
517    impl<T> futures::Stream for VecDequeStream<T> {
518        type Item = T;
519
520        fn poll_next(
521            mut self: std::pin::Pin<&mut Self>,
522            cx: &mut std::task::Context<'_>,
523        ) -> Poll<Option<Self::Item>> {
524            if let Some(val) = self.as_mut().vec.pop_front() {
525                Poll::Ready(Some(val))
526            } else if self.as_mut().closed {
527                // No more items coming.
528                Poll::Ready(None)
529            } else {
530                self.as_mut().waker.replace(cx.waker().clone());
531                Poll::Pending
532            }
533        }
534    }
535    impl<T> PeekableStream for VecDequeStream<T> {
536        fn poll_peek_mut(
537            self: Pin<&mut Self>,
538            cx: &mut Context<'_>,
539        ) -> Poll<Option<&mut <Self as futures::Stream>::Item>> {
540            let s = self.project();
541            if let Some(val) = s.vec.front_mut() {
542                Poll::Ready(Some(val))
543            } else if *s.closed {
544                // No more items coming.
545                Poll::Ready(None)
546            } else {
547                s.waker.replace(cx.waker().clone());
548                Poll::Pending
549            }
550        }
551    }
552    impl<T> std::cmp::PartialEq for VecDequeStream<T>
553    where
554        T: std::cmp::PartialEq,
555    {
556        fn eq(&self, other: &Self) -> bool {
557            // Ignore waker, which isn't comparable
558            self.vec == other.vec && self.closed == other.closed
559        }
560    }
561    impl<T> std::cmp::Eq for VecDequeStream<T> where T: std::cmp::Eq {}
562
563    type TestStream = VecDequeStream<Value>;
564
/// An empty set reports no ready streams.
#[test]
fn test_empty() {
    futures::executor::block_on(futures::future::poll_fn(|ctx| {
        let mut pollset: StreamPollSet<Key, Priority, TestStream> = StreamPollSet::new();
        let ready: Vec<_> = pollset.poll_ready_iter_mut(ctx).collect();
        assert_eq!(ready, vec![]);
        Poll::Ready(())
    }));
}
573
/// A set whose only stream is open and empty reports no ready streams.
#[test]
fn test_one_pending() {
    futures::executor::block_on(futures::future::poll_fn(|ctx| {
        let mut pollset: StreamPollSet<Key, Priority, TestStream> = StreamPollSet::new();
        let open_and_empty = TestStream::new_open([]);
        pollset
            .try_insert(Key(0), Priority(0), open_and_empty)
            .unwrap();
        let ready: Vec<_> = pollset.poll_ready_iter_mut(ctx).collect();
        assert_eq!(ready, vec![]);
        Poll::Ready(())
    }));
}
585
/// Walk a single closed stream through peek, take, end-of-stream and removal.
#[test]
fn test_one_ready() {
    futures::executor::block_on(futures::future::poll_fn(|ctx| {
        let mut pollset: StreamPollSet<Key, Priority, TestStream> = StreamPollSet::new();

        // Assert that polling reports exactly the listed
        // `(key, priority, [remaining values])` closed streams, in order.
        macro_rules! assert_ready {
            ($(($key:expr, $prio:expr, [$($val:expr),*])),* $(,)?) => {
                assert_eq!(
                    pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
                    vec![$((
                        &Key($key),
                        &Priority($prio),
                        &mut TestStream::new_closed([$(Value($val)),*])
                    )),*]
                )
            };
        }

        let two_values = TestStream::new_closed([Value(1), Value(2)]);
        pollset.try_insert(Key(0), Priority(0), two_values).unwrap();

        // The one ready stream shows up, with both values still queued.
        assert_ready!((0, 0, [1, 2]));

        // Polling again changes nothing: the same value is still at the head.
        assert_ready!((0, 0, [1, 2]));

        // Take the head of the stream.
        let taken = pollset.take_ready_value_and_reprioritize(&Key(0), Priority(1));
        assert_eq!(taken, Some((Priority(0), Value(1))));

        // The next value is visible, under the new priority.
        assert_ready!((0, 1, [2]));

        // Take again.
        let taken = pollset.take_ready_value_and_reprioritize(&Key(0), Priority(2));
        assert_eq!(taken, Some((Priority(1), Value(2))));

        // Now the stream is at end-of-stream.
        assert_ready!((0, 2, []));

        // Remove the now-ended stream.
        let removed = pollset.remove(&Key(0));
        assert_eq!(
            removed,
            Some((Key(0), Priority(2), TestStream::new_closed([])))
        );

        // Nothing is left.
        assert_ready!();

        Poll::Ready(())
    }));
}
658
/// Alternate between two streams by re-prioritizing each one to the back
/// after taking a value from it.
#[test]
fn test_round_robin() {
    futures::executor::block_on(futures::future::poll_fn(|ctx| {
        let mut pollset: StreamPollSet<Key, Priority, TestStream> = StreamPollSet::new();

        // Assert that polling reports exactly the listed
        // `(key, priority, [remaining values])` closed streams, in order.
        macro_rules! assert_ready {
            ($(($key:expr, $prio:expr, [$($val:expr),*])),* $(,)?) => {
                assert_eq!(
                    pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
                    vec![$((
                        &Key($key),
                        &Priority($prio),
                        &mut TestStream::new_closed([$(Value($val)),*])
                    )),*]
                )
            };
        }

        // Stream `n` gets key `n` and priority `n`.
        for (id, values) in [(0, [Value(1), Value(2)]), (1, [Value(3), Value(4)])] {
            pollset
                .try_insert(Key(id), Priority(id), TestStream::new_closed(values))
                .unwrap();
        }

        // Both streams are ready, in priority order.
        assert_ready!((0, 0, [1, 2]), (1, 1, [3, 4]));

        // Take from the first stream and send it to the back via priority assignment.
        let taken = pollset.take_ready_value_and_reprioritize(&Key(0), Priority(2));
        assert_eq!(taken, Some((Priority(0), Value(1))));

        // Both streams are still ready, in the new priority order.
        assert_ready!((1, 1, [3, 4]), (0, 2, [2]));

        // Keep going ...
        let taken = pollset.take_ready_value_and_reprioritize(&Key(1), Priority(3));
        assert_eq!(taken, Some((Priority(1), Value(3))));
        assert_ready!((0, 2, [2]), (1, 3, [4]));

        let taken = pollset.take_ready_value_and_reprioritize(&Key(0), Priority(4));
        assert_eq!(taken, Some((Priority(2), Value(2))));
        assert_ready!((1, 3, [4]), (0, 4, []));

        let taken = pollset.take_ready_value_and_reprioritize(&Key(1), Priority(5));
        assert_eq!(taken, Some((Priority(3), Value(4))));
        assert_ready!((0, 4, []), (1, 5, []));

        Poll::Ready(())
    }));
}
768
/// After removing a stream, its key can be reused for a different stream.
#[test]
fn test_remove_and_reuse_key() {
    futures::executor::block_on(futures::future::poll_fn(|ctx| {
        let mut pollset: StreamPollSet<Key, Priority, TestStream> = StreamPollSet::new();

        // Assert that polling reports exactly the listed
        // `(key, priority, [remaining values])` closed streams, in order.
        macro_rules! assert_ready {
            ($(($key:expr, $prio:expr, [$($val:expr),*])),* $(,)?) => {
                assert_eq!(
                    pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
                    vec![$((
                        &Key($key),
                        &Priority($prio),
                        &mut TestStream::new_closed([$(Value($val)),*])
                    )),*]
                )
            };
        }

        let first = TestStream::new_closed([Value(1), Value(2)]);
        pollset.try_insert(Key(0), Priority(0), first).unwrap();
        assert_ready!((0, 0, [1, 2]));

        let removed = pollset.remove(&Key(0));
        assert_eq!(
            removed,
            Some((
                Key(0),
                Priority(0),
                TestStream::new_closed([Value(1), Value(2)])
            ))
        );

        let second = TestStream::new_closed([Value(3), Value(4)]);
        pollset.try_insert(Key(0), Priority(1), second).unwrap();
        // Ensure we see the ready value in the new stream, and *not* anything from the previous stream at that key.
        assert_ready!((0, 1, [3, 4]));

        Poll::Ready(())
    }));
}
815
816    #[test]
817    fn get_ready_stream() {
818        futures::executor::block_on(futures::future::poll_fn(|_ctx| {
819            let mut pollset = StreamPollSet::<Key, Priority, VecDequeStream<Value>>::new();
820            pollset
821                .try_insert(Key(0), Priority(0), VecDequeStream::new_open([Value(1)]))
822                .unwrap();
823            assert_eq!(pollset.stream(&Key(0)).unwrap().vec[0], Value(1));
824            Poll::Ready(())
825        }));
826    }
827
828    #[test]
829    fn get_pending_stream() {
830        futures::executor::block_on(futures::future::poll_fn(|_ctx| {
831            let mut pollset = StreamPollSet::<Key, Priority, VecDequeStream<Value>>::new();
832            pollset
833                .try_insert(Key(0), Priority(0), VecDequeStream::new_open([]))
834                .unwrap();
835            assert!(pollset.stream(&Key(0)).unwrap().vec.is_empty());
836            Poll::Ready(())
837        }));
838    }
839
840    #[test]
841    fn mutate_pending_stream() {
842        futures::executor::block_on(futures::future::poll_fn(|ctx| {
843            let mut pollset = StreamPollSet::<Key, Priority, VecDequeStream<Value>>::new();
844            pollset
845                .try_insert(Key(0), Priority(0), VecDequeStream::new_open([]))
846                .unwrap();
847            assert_eq!(pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(), vec![]);
848
849            // This should cause the stream to become ready.
850            pollset.stream_mut(&Key(0)).unwrap().push(Value(0));
851
852            assert_eq!(
853                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
854                vec![(
855                    &Key(0),
856                    &Priority(0),
857                    &mut VecDequeStream::new_open([Value(0)])
858                ),]
859            );
860
861            Poll::Ready(())
862        }));
863    }
864
865    #[test]
866    fn mutate_ready_stream() {
867        futures::executor::block_on(futures::future::poll_fn(|ctx| {
868            let mut pollset = StreamPollSet::<Key, Priority, VecDequeStream<Value>>::new();
869            pollset
870                .try_insert(Key(0), Priority(0), VecDequeStream::new_open([Value(0)]))
871                .unwrap();
872            assert_eq!(
873                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
874                vec![(
875                    &Key(0),
876                    &Priority(0),
877                    &mut VecDequeStream::new_open([Value(0)])
878                ),]
879            );
880
881            pollset.stream_mut(&Key(0)).unwrap().push(Value(1));
882
883            assert_eq!(
884                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
885                vec![(
886                    &Key(0),
887                    &Priority(0),
888                    &mut VecDequeStream::new_open([Value(0), Value(1)])
889                ),]
890            );
891
892            // Consume the value that was there.
893            assert_eq!(
894                pollset.take_ready_value_and_reprioritize(&Key(0), Priority(0)),
895                Some((Priority(0), Value(0)))
896            );
897
898            // We should now see the value we added.
899            assert_eq!(
900                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
901                vec![(
902                    &Key(0),
903                    &Priority(0),
904                    &mut VecDequeStream::new_open([Value(1)])
905                ),]
906            );
907
908            Poll::Ready(())
909        }));
910    }
911
912    #[test]
913    fn test_async() {
914        MockRuntime::test_with_various(|rt| async move {
915            let mut pollset = StreamPollSet::<
916                Key,
917                Priority,
918                Peekable<futures::channel::mpsc::Receiver<Value>>,
919            >::new();
920
921            // Create 2 mpsc channels, bounded so that we can exercise back-pressure.
922            // These are analogous to Tor streams.
923            for streami in 1..=2 {
924                let (mut send, recv) = futures::channel::mpsc::channel::<Value>(2);
925                pollset
926                    .try_insert(Key(streami), Priority(streami), recv.peekable())
927                    .unwrap();
928                rt.spawn_identified(format!("stream{streami}"), async move {
929                    for val in 0..10 {
930                        send.send(Value(val * streami)).await.unwrap();
931                    }
932                });
933            }
934
935            let output = Arc::new(Mutex::new(Vec::new()));
936
937            rt.spawn_identified("mux", {
938                let output = output.clone();
939                async move {
940                    loop {
941                        let (key, priority, value) = futures::future::poll_fn(|ctx| {
942                            match pollset.poll_ready_iter_mut(ctx).next() {
943                                Some((key, priority, stream)) => {
944                                    let Poll::Ready(value) = Pin::new(stream).poll_peek(ctx) else {
945                                        panic!("poll_ready_iter_mut returned non-ready stream")
946                                    };
947                                    Poll::Ready((*key, *priority, value.copied()))
948                                }
949                                // No streams ready, but there could be more items coming.
950                                // The current `ctx` should be registered to wake us
951                                // if and when there are.
952                                None => Poll::Pending,
953                            }
954                        })
955                        .await;
956                        if let Some(value) = value {
957                            // Take the value, and haphazardly set priority to push this stream "back".
958                            pollset
959                                .take_ready_value_and_reprioritize(&key, Priority(priority.0 + 10))
960                                .unwrap();
961                            output.lock().unwrap().push((key, value));
962                        } else {
963                            // Stream ended. Remove it.
964                            let _ = pollset.remove(&key).unwrap();
965                        }
966                    }
967                }
968            });
969
970            rt.advance_until_stalled().await;
971
972            let output = output.lock().unwrap();
973
974            // We can't predict exactly how the stream values will be
975            // interleaved, but we should get all items from each stream, with
976            // correct order within each stream.
977            for streami in 1..=2 {
978                let expected = (0..10).map(|val| Value(val * streami)).collect::<Vec<_>>();
979                let actual = output
980                    .iter()
981                    .filter_map(|(k, v)| (k == &Key(streami)).then_some(*v))
982                    .collect::<Vec<_>>();
983                assert_eq!(actual, expected);
984            }
985        });
986    }
987}