tor_proto/util/stream_poll_set.rs
1//! Provides [`StreamPollSet`]
2
3// So that we can declare these things as if they were in their own crate.
4#![allow(unreachable_pub)]
5
6use std::{
7 collections::{BTreeMap, HashMap, hash_map},
8 future::Future,
9 hash::Hash,
10 pin::Pin,
11 task::{Context, Poll, Waker},
12};
13
14use futures::{FutureExt, StreamExt as _};
15use tor_async_utils::peekable_stream::PeekableStream;
16
17use crate::util::{
18 keyed_futures_unordered::KeyedFuturesUnordered,
19 tunnel_activity::{InTunnelActivity, TunnelActivity},
20};
21
/// Future adapter around a [`PeekableStream`].
///
/// Resolves once the wrapped stream has an item (or end-of-stream) available
/// to peek, handing the stream itself back as the output.
struct PeekableReady<S> {
    /// The wrapped stream; `None` once the future has completed and the
    /// stream has been handed out.
    stream: Option<S>,
}

impl<S> PeekableReady<S> {
    /// Wrap `st` in a new, not-yet-completed [`PeekableReady`].
    fn new(st: S) -> Self {
        PeekableReady { stream: Some(st) }
    }

    /// Borrow the wrapped `S`.
    ///
    /// Returns `None` if the future has already completed.
    fn get_ref(&self) -> Option<&S> {
        match &self.stream {
            Some(inner) => Some(inner),
            None => None,
        }
    }

    /// Mutably borrow the wrapped `S`.
    ///
    /// Returns `None` if the future has already completed.
    fn get_mut(&mut self) -> Option<&mut S> {
        match &mut self.stream {
            Some(inner) => Some(inner),
            None => None,
        }
    }

    /// Consume `self` and return the wrapped `S`.
    ///
    /// Returns `None` if the future has already completed.
    fn into_inner(self) -> Option<S> {
        let Self { stream } = self;
        stream
    }
}
56
57impl<S> Future for PeekableReady<S>
58where
59 S: PeekableStream + Unpin,
60{
61 type Output = S;
62
63 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
64 let Some(stream) = &mut self.stream else {
65 panic!("Polled completed future");
66 };
67 match Pin::new(stream).poll_peek(cx) {
68 Poll::Ready(_) => Poll::Ready(self.stream.take().expect("Stream disappeared")),
69 Poll::Pending => Poll::Pending,
70 }
71 }
72}
73
74/// Manages a dynamic set of [`futures::Stream`] with associated keys and
75/// priorities.
76///
77/// Notable features:
78///
79/// * Prioritization: streams have an associated priority, and ready-streams are
80/// iterated over in ascending priority order.
81/// * Efficient polling: an unready stream won't be polled again until it's
82/// ready or exhausted (e.g. a corresponding [`futures::Sink`] is written-to or
83/// dropped). A ready stream won't be polled again until the ready item has been
84/// removed.
85pub struct StreamPollSet<K, P, S>
86where
87 S: PeekableStream + Unpin,
88{
89 /// Priority for each stream in the set, and associated InTunnelActivity token.
90 // We keep the priority for each stream here instead of bundling it together
91 // with the stream, so that the priority can easily be changed even while a
92 // future waiting on the stream is still pending (e.g. to support rescaling
93 // priorities for EWMA).
94 // Invariants:
95 // * Every key is also present in exactly one of `ready_values` or `pending_streams`.
96 priorities: HashMap<K, (P, InTunnelActivity)>,
97 /// Streams that have a result ready, in ascending order by priority.
98 // Invariants:
99 // * Keys are a (non-strict) subset of those in `priorities`.
100 ready_streams: BTreeMap<(P, K), S>,
101 /// Streams for which we're still waiting for the next result.
102 // Invariants:
103 // * Keys are a (non-strict) subset of those in `priorities`.
104 pending_streams: KeyedFuturesUnordered<K, PeekableReady<S>>,
105
106 /// Information about how active this particular hop has been,
107 /// with respect to tracking overall tunnel activity.
108 tunnel_activity: TunnelActivity,
109}
110
111impl<K, P, S> StreamPollSet<K, P, S>
112where
113 K: Ord + Hash + Clone + Send + Sync + 'static,
114 S: PeekableStream + Unpin,
115 P: Ord + Clone,
116{
117 /// Create a new, empty, `StreamPollSet`.
118 pub fn new() -> Self {
119 Self {
120 priorities: Default::default(),
121 ready_streams: Default::default(),
122 pending_streams: KeyedFuturesUnordered::new(),
123 tunnel_activity: TunnelActivity::never_used(),
124 }
125 }
126
127 /// Insert a `stream`, with an associated `key` and `priority`.
128 ///
129 /// If the `key` is already in use, the parameters are returned without altering `self`.
130 // To *replace* an existing key, we'd need to cancel any pending future and
131 // ensure that the cancellation is processed before inserting the new key, to
132 // ensure we don't assign a value from the previous key to the new key's
133 // stream.
134 pub fn try_insert(
135 &mut self,
136 key: K,
137 priority: P,
138 stream: S,
139 ) -> Result<(), KeyAlreadyInsertedError<K, P, S>> {
140 let hash_map::Entry::Vacant(v) = self.priorities.entry(key.clone()) else {
141 // We already have an entry for this key.
142 return Err(KeyAlreadyInsertedError {
143 key,
144 priority,
145 stream,
146 });
147 };
148 self.pending_streams
149 .try_insert(key, PeekableReady::new(stream))
150 // By `pending_streams` invariant that keys are a subset of those in
151 // `priorities`.
152 .unwrap_or_else(|_| panic!("Unexpected duplicate key"));
153 let token = self.tunnel_activity.inc_streams();
154 v.insert((priority, token));
155 Ok(())
156 }
157
158 /// Remove the entry for `key`, if any. This is the key, priority, buffered
159 /// poll_next result, and stream.
160 pub fn remove(&mut self, key: &K) -> Option<(K, P, S)> {
161 let (priority, token) = self.priorities.remove(key)?;
162 self.tunnel_activity.dec_streams(token);
163 if let Some((key, fut)) = self.pending_streams.remove(key) {
164 // Validate `priorities` invariant that keys are also present in exactly one of
165 // `pending_streams` and `ready_values`.
166 debug_assert!(
167 !self
168 .ready_streams
169 .contains_key(&(priority.clone(), key.clone()))
170 );
171 let stream = fut
172 .into_inner()
173 // We know the future hasn't completed, so the stream should be present.
174 .expect("Missing stream");
175 Some((key, priority, stream))
176 } else {
177 let ((_priority, key), stream) = self
178 .ready_streams
179 .remove_entry(&(priority.clone(), key.clone()))
180 // By
181 // * `pending_streams` invariant that keys are also present in
182 // exactly one of `pending_streams` and `ready_values`.
183 // * validated above that the key was in `pending_streams`, and
184 // not in `ready_values`.
185 .expect("Unexpectedly no value for key");
186 Some((key, priority, stream))
187 }
188 }
189
190 /// Polls streams that are ready to be polled, and returns an iterator over all streams
191 /// for which we have a buffered `Poll::Ready` result, in ascending priority order.
192 ///
193 /// Registers the provided [`Context`] to be woken when
194 /// any of the internal streams that weren't ready in the previous call to
195 /// this method (and therefore wouldn't have appeared in the iterator
196 /// results) become potentially ready (based on when the inner stream wakes
197 /// the `Context` provided to its own `poll_next`).
198 ///
199 /// The same restrictions apply as for [`Self::stream_mut`]. e.g. do not
200 /// directly call [`PeekableStream::poll_peek`] to see what item is
201 /// available on the stream; instead use [`Self::peek_mut`]. (Or
202 /// [`tor_async_utils::peekable_stream::UnobtrusivePeekableStream`] if
203 /// implemented for the stream).
204 ///
205 /// This method does *not* drain ready items. `Some` values can be removed
206 /// with [`Self::take_ready_value_and_reprioritize`]. `None` values can only
207 /// be removed by removing the whole stream with [`Self::remove`].
208 ///
209 /// This API is meant to allow callers to find the first stream (in priority
210 /// order) that is ready, and that the caller is able to process now. i.e.
211 /// it's specifically to support the use-case where external factors may
212 /// prevent the processing of some streams but not others.
213 ///
214 /// Example:
215 ///
216 /// ```nocompile
217 /// # // We need the `nocompile` since `StreamPollSet` is non-pub.
218 /// # // TODO: take away the nocompile if we make this pub or implement some
219 /// # // workaround to expose it to doc-tests.
220 /// # type Key=u64;
221 /// # type Value=u64;
222 /// # type Priority=u64;
223 /// # type MyStream=Box<dyn futures::Stream<Item=Value> + Unpin>;
224 /// # fn can_process(key: &Key, val: &Value) -> bool { true }
225 /// # fn process(val: Value) { }
226 /// # fn new_priority(priority: &Priority) -> Priority { *priority }
227 /// fn process_a_ready_stream(sps: &mut StreamPollSet<Key, Value, Priority, MyStream>, cx: &mut std::task::Context) -> std::task::Poll<()> {
228 /// let mut iter = sps.poll_ready_iter(cx);
229 /// while let Some((key, priority, stream)) = iter.next() {
230 /// let Some(value) = stream.unobtrusive_peek(Pin::new(stream)) else {
231 /// // Stream exhausted. Remove the stream. We have to drop the iterator
232 /// // first, though, so that we can mutate.
233 /// let key = *key;
234 /// drop(iter);
235 /// sps.remove(&key).unwrap();
236 /// return std::task::Poll::Ready(());
237 /// };
238 /// if can_process(key, value) {
239 /// let key = *key;
240 /// let priority = new_priority(priority);
241 /// drop(iter);
242 /// let (_old_priority, value) = sps.take_ready_value_and_reprioritize(&key, priority).unwrap();
243 /// process(value);
244 /// return std::task::Poll::Ready(());
245 /// }
246 /// }
247 /// return std::task::Poll::Pending;
248 /// }
249 /// ```
250 // In the current implementation we *could* actually permit the caller to
251 // `poll_peek` a stream that we know is ready. But this may change as the
252 // impl evolves further, and it's probably better to blanket disallow it
253 // than to have complex rules for the caller about when it's ok.
254 //
255 // TODO: It would be nice if the returned iterator supported additional
256 // actions, e.g. allowing the user to consume the iterator and take and
257 // reprioritize the inner value, but this is tricky.
258 //
259 // I've sketched out a working "cursor" that holds the current position (K, P)
260 // and a &mut StreamPollSet. This can't implement the Iterator interface though
261 // since it needs to borrow from self. I was able to implement an Iterator-*like* interface
262 // that does borrow from self, but this doesn't compose well. e.g. in StreamMap
263 // we can't use the same technique again since the object would need a mut reference to the
264 // StreamMap *and* to this inner cursor object, which is illegal.
265 pub fn poll_ready_iter_mut<'a>(
266 &'a mut self,
267 cx: &mut Context,
268 ) -> impl Iterator<Item = (&'a K, &'a P, &'a mut S)> + 'a + use<'a, K, P, S> {
269 // First poll for ready streams
270 while let Poll::Ready(Some((key, stream))) = self.pending_streams.poll_next_unpin(cx) {
271 let (priority, _) = self
272 .priorities
273 .get(&key)
274 // By `pending_streams` invariant that all keys are also in `priorities`.
275 .expect("Missing priority");
276 let prev = self.ready_streams.insert((priority.clone(), key), stream);
277 assert!(prev.is_none());
278 }
279 self.ready_streams.iter_mut().map(|((p, k), s)| (k, p, s))
280 }
281
282 /// If the stream for `key` has `Some(value)` ready, take that value and set the
283 /// priority for it to `new_priority`.
284 ///
285 /// This method doesn't register a waker with the polled stream. Use
286 /// `poll_ready_iter` to ensure streams make progress.
287 ///
288 /// If the key doesn't exist, the stream isn't ready, or the stream's value
289 /// is `None` (indicating the end of the stream), this function returns
290 /// `None` without mutating anything.
291 ///
292 /// Ended streams should be removed using [`Self::remove`].
293 pub fn take_ready_value_and_reprioritize(
294 &mut self,
295 key: &K,
296 new_priority: P,
297 ) -> Option<(P, S::Item)> {
298 // Get the priority entry, but don't replace until the lookup in ready_streams is confirmed.
299 let hash_map::Entry::Occupied(mut priority_entry) = self.priorities.entry(key.clone())
300 else {
301 // Key isn't present at all.
302 return None;
303 };
304 let (priority_mut, _) = priority_entry.get_mut();
305 let Some(((_p, key), mut stream)) = self
306 .ready_streams
307 .remove_entry(&(priority_mut.clone(), key.clone()))
308 else {
309 // This stream isn't in the ready list.
310 return None;
311 };
312 match Pin::new(&mut stream).poll_peek(&mut Context::from_waker(Waker::noop())) {
313 Poll::Ready(Some(_val)) => (), // Stream is ready, and has an item. Proceed.
314 Poll::Ready(None) => {
315 // Stream is ready, but is terminated.
316 // Leave in place and return `None`.
317 return None;
318 }
319 Poll::Pending => {
320 // Stream wasn't actually ready, despite being on the ready
321 // list. This should be impossible by the stability guarantees
322 // of `PeekableStream` and our own internal logic, but we can
323 // recover.
324 tracing::error!("Bug: Stream unexpectedly unready");
325 self.pending_streams
326 .try_insert(key.clone(), PeekableReady::new(stream))
327 // By invariant on `priorities` that keys are in exactly one of the ready or pending lists.
328 .unwrap_or_else(|_| {
329 unreachable!("Key unexpectedly in both ready and unready list")
330 });
331 return None;
332 }
333 }
334 let Some(Some(val)) = stream.next().now_or_never() else {
335 panic!("Polling stream returned a different result than peeking");
336 };
337 let prev_priority = std::mem::replace(priority_mut, new_priority);
338 self.pending_streams
339 .try_insert(key, PeekableReady::new(stream))
340 // We verified above that the key wasn't present in `priorities`,
341 // and `pending_streams` has the invariant that its keys are a
342 // subset of those in `priorities`.
343 .unwrap_or_else(|_| panic!("Unexpected pending stream entry"));
344 Some((prev_priority, val))
345 }
346
347 /// Get a mut reference to a ready value for key `key`, if one exists.
348 ///
349 /// This method doesn't poll the internal streams. Use `poll_ready_iter` to
350 /// ensure streams make progress.
351 // This will be used for packing and fragmentation, to take part of a DATA message.
352 #[allow(unused)]
353 pub fn peek_mut<'a>(&'a mut self, key: &K) -> Option<Poll<Option<&'a mut S::Item>>> {
354 let (priority, _) = self.priorities.get(key)?;
355 let Some(peekable) = self.ready_streams.get_mut(&(priority.clone(), key.clone())) else {
356 return Some(Poll::Pending);
357 };
358 // We don't have a waker registered here, so we can just use the noop waker.
359 // TODO: Create a mut future for `PeekableStream`.
360 Some(Pin::new(peekable).poll_peek_mut(&mut Context::from_waker(Waker::noop())))
361 }
362
363 /// Get a reference to the stream for `key`.
364 ///
365 /// The same restrictions apply as for [`Self::stream_mut`] (e.g. using
366 /// interior mutability).
367 #[allow(dead_code)]
368 pub fn stream(&self, key: &K) -> Option<&S> {
369 if let Some(s) = self.pending_streams.get(key) {
370 let s = s.get_ref();
371 // Stream must be present since it's still pending.
372 debug_assert!(s.is_some(), "Unexpected missing pending stream");
373 return s;
374 }
375 let (priority, _) = self.priorities.get(key)?;
376 self.ready_streams.get(&(priority.clone(), key.clone()))
377 }
378
379 /// Get a mut reference to the stream for `key`.
380 ///
381 /// Polling the stream through this reference, or otherwise causing its
382 /// registered `Waker` to be removed without waking it, will result in
383 /// unspecified (but not unsound) behavior.
384 ///
385 /// This is mostly intended for accessing non-`Stream` functionality of the stream
386 /// object, though it *is* permitted to mutate it in a way that the stream becomes
387 /// ready (potentially removing and waking its registered Waker(s)).
388 //
389 // In particular:
390 // * Polling a stream in the pending list and getting a Pending result
391 // will overwrite our Waker, resulting in us not polling it again.
392 // * Doing so with a stream on the pending list and getting a Ready result
393 // might be ok if it had already woken our waker. Otoh it could potentially
394 // result in our waker never getting woken, and hence us not polling it again.
395 // * Doing so with a stream on the ready list should actually be ok, since
396 // we don't have a registered waker, and don't do our own buffering.
397 pub fn stream_mut(&mut self, key: &K) -> Option<&mut S> {
398 if let Some(s) = self.pending_streams.get_mut(key) {
399 let s = s.get_mut();
400 // Stream must be present since it's still pending.
401 debug_assert!(s.is_some(), "Unexpected missing pending stream");
402 return s;
403 }
404 let (priority, _) = self.priorities.get(key)?;
405 self.ready_streams.get_mut(&(priority.clone(), key.clone()))
406 }
407
408 /// Number of streams managed by this object.
409 pub fn len(&self) -> usize {
410 self.priorities.len()
411 }
412
413 /// Return a `TunnelActivity` for this hop.
414 pub fn tunnel_activity(&self) -> TunnelActivity {
415 assert_eq!(self.len(), self.tunnel_activity.n_open_streams());
416 self.tunnel_activity
417 }
418}
419
420impl<K, P, S> Drop for StreamPollSet<K, P, S>
421where
422 S: PeekableStream + Unpin,
423{
424 fn drop(&mut self) {
425 self.priorities
426 .drain()
427 .for_each(|(_key, (_prio, token))| token.consume_and_forget());
428 }
429}
430
431/// Error returned by [`StreamPollSet::try_insert`].
432#[derive(Debug, thiserror::Error)]
433#[allow(clippy::exhaustive_structs)]
434pub struct KeyAlreadyInsertedError<K, P, S> {
435 /// Key that caller tried to insert.
436 #[allow(dead_code)]
437 pub key: K,
438 /// Priority that caller tried to insert.
439 #[allow(dead_code)]
440 pub priority: P,
441 /// Stream that caller tried to insert.
442 #[allow(dead_code)]
443 pub stream: S,
444}
445
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use std::{
        collections::VecDeque,
        sync::{Arc, Mutex},
        task::Poll,
    };

    use futures::{SinkExt as _, stream::Peekable};
    use pin_project::pin_project;
    use tor_rtmock::MockRuntime;

    use super::*;

    /// Stream identifier used in these tests.
    #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
    struct Key(u64);

    /// Stream priority used in these tests.
    #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
    struct Priority(u64);

    /// Stream item used in these tests.
    #[derive(Copy, Clone, Debug, Eq, PartialEq)]
    struct Value(u64);

    /// Test stream that we can directly manipulate and examine.
    #[derive(Debug)]
    #[pin_project]
    struct VecDequeStream<T> {
        // Items that are ready to be yielded, oldest first.
        vec: VecDeque<T>,
        // When true, no further items will ever be pushed.
        closed: bool,
        // Waker stored by the last poll that returned `Pending`, if any.
        waker: Option<std::task::Waker>,
    }

    impl<T> VecDequeStream<T> {
        /// Shared constructor: a stream holding `values`, open or closed.
        fn with_state<I: IntoIterator<Item = T>>(values: I, closed: bool) -> Self {
            Self {
                vec: values.into_iter().collect(),
                closed,
                waker: None,
            }
        }

        /// A stream pre-loaded with `values` that may still receive more.
        fn new_open<I: IntoIterator<Item = T>>(values: I) -> Self {
            Self::with_state(values, false)
        }

        /// A stream pre-loaded with `values` that ends once they are consumed.
        fn new_closed<I: IntoIterator<Item = T>>(values: I) -> Self {
            Self::with_state(values, true)
        }

        /// Append `value` and wake whoever was waiting on this stream.
        fn push(&mut self, value: T) {
            assert!(!self.closed);
            self.vec.push_back(value);
            if let Some(waiting) = self.waker.take() {
                waiting.wake();
            }
        }
    }

    impl<T> futures::Stream for VecDequeStream<T> {
        type Item = T;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let this = self.project();
            match this.vec.pop_front() {
                Some(item) => Poll::Ready(Some(item)),
                // Drained and closed: no more items coming.
                None if *this.closed => Poll::Ready(None),
                None => {
                    *this.waker = Some(cx.waker().clone());
                    Poll::Pending
                }
            }
        }
    }

    impl<T> PeekableStream for VecDequeStream<T> {
        fn poll_peek_mut(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<&mut <Self as futures::Stream>::Item>> {
            let this = self.project();
            match this.vec.front_mut() {
                Some(head) => Poll::Ready(Some(head)),
                // Drained and closed: no more items coming.
                None if *this.closed => Poll::Ready(None),
                None => {
                    *this.waker = Some(cx.waker().clone());
                    Poll::Pending
                }
            }
        }
    }

    impl<T> std::cmp::PartialEq for VecDequeStream<T>
    where
        T: std::cmp::PartialEq,
    {
        fn eq(&self, other: &Self) -> bool {
            // The waker isn't comparable, so it takes no part in equality.
            (&self.vec, self.closed) == (&other.vec, other.closed)
        }
    }

    impl<T> std::cmp::Eq for VecDequeStream<T> where T: std::cmp::Eq {}

    type TestStream = VecDequeStream<Value>;

    /// The poll set flavour used by all the synchronous tests.
    type TestPollSet = StreamPollSet<Key, Priority, TestStream>;

    /// Run `body` once with a real task `Context`, as the tests need one to
    /// call `poll_ready_iter_mut`.
    fn run_with_context(mut body: impl FnMut(&mut Context<'_>)) {
        futures::executor::block_on(futures::future::poll_fn(|ctx| {
            body(ctx);
            Poll::Ready(())
        }));
    }

    /// Poll `pollset` and assert that the ready streams are exactly
    /// `expected`, in that order.
    #[track_caller]
    fn assert_ready(
        pollset: &mut TestPollSet,
        ctx: &mut Context<'_>,
        expected: &[(Key, Priority, TestStream)],
    ) {
        let actual: Vec<(Key, Priority, &TestStream)> = pollset
            .poll_ready_iter_mut(ctx)
            .map(|(key, priority, stream)| (*key, *priority, &*stream))
            .collect();
        let expected: Vec<(Key, Priority, &TestStream)> = expected
            .iter()
            .map(|(key, priority, stream)| (*key, *priority, stream))
            .collect();
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_empty() {
        run_with_context(|ctx| {
            let mut pollset = TestPollSet::new();
            assert_ready(&mut pollset, ctx, &[]);
        });
    }

    #[test]
    fn test_one_pending() {
        run_with_context(|ctx| {
            let mut pollset = TestPollSet::new();
            pollset
                .try_insert(Key(0), Priority(0), TestStream::new_open([]))
                .unwrap();
            assert_ready(&mut pollset, ctx, &[]);
        });
    }

    #[test]
    fn test_one_ready() {
        run_with_context(|ctx| {
            let mut pollset = TestPollSet::new();
            pollset
                .try_insert(
                    Key(0),
                    Priority(0),
                    TestStream::new_closed([Value(1), Value(2)]),
                )
                .unwrap();

            // The single ready stream is reported, still holding both values.
            let untouched = [(
                Key(0),
                Priority(0),
                TestStream::new_closed([Value(1), Value(2)]),
            )];
            assert_ready(&mut pollset, ctx, &untouched);

            // Polling again changes nothing: the same value is still at the head.
            assert_ready(&mut pollset, ctx, &untouched);

            // Take the head of the stream.
            assert_eq!(
                pollset.take_ready_value_and_reprioritize(&Key(0), Priority(1)),
                Some((Priority(0), Value(1)))
            );

            // The next value is visible, under the new priority.
            assert_ready(
                &mut pollset,
                ctx,
                &[(Key(0), Priority(1), TestStream::new_closed([Value(2)]))],
            );

            // Take again.
            assert_eq!(
                pollset.take_ready_value_and_reprioritize(&Key(0), Priority(2)),
                Some((Priority(1), Value(2)))
            );

            // Now the stream reports end-of-stream.
            assert_ready(
                &mut pollset,
                ctx,
                &[(Key(0), Priority(2), TestStream::new_closed([]))],
            );

            // Remove the now-ended stream.
            assert_eq!(
                pollset.remove(&Key(0)),
                Some((Key(0), Priority(2), TestStream::new_closed([])))
            );

            // Nothing is left.
            assert_ready(&mut pollset, ctx, &[]);
        });
    }

    #[test]
    fn test_round_robin() {
        run_with_context(|ctx| {
            let mut pollset = TestPollSet::new();
            pollset
                .try_insert(
                    Key(0),
                    Priority(0),
                    TestStream::new_closed([Value(1), Value(2)]),
                )
                .unwrap();
            pollset
                .try_insert(
                    Key(1),
                    Priority(1),
                    TestStream::new_closed([Value(3), Value(4)]),
                )
                .unwrap();

            // Both streams are ready, reported in priority order.
            assert_ready(
                &mut pollset,
                ctx,
                &[
                    (
                        Key(0),
                        Priority(0),
                        TestStream::new_closed([Value(1), Value(2)]),
                    ),
                    (
                        Key(1),
                        Priority(1),
                        TestStream::new_closed([Value(3), Value(4)]),
                    ),
                ],
            );

            // Take from the first stream and push it to the back by priority.
            assert_eq!(
                pollset.take_ready_value_and_reprioritize(&Key(0), Priority(2)),
                Some((Priority(0), Value(1)))
            );

            // Both streams are still ready, now in the new priority order.
            assert_ready(
                &mut pollset,
                ctx,
                &[
                    (
                        Key(1),
                        Priority(1),
                        TestStream::new_closed([Value(3), Value(4)]),
                    ),
                    (Key(0), Priority(2), TestStream::new_closed([Value(2)])),
                ],
            );

            // Keep alternating until both streams are drained.
            assert_eq!(
                pollset.take_ready_value_and_reprioritize(&Key(1), Priority(3)),
                Some((Priority(1), Value(3)))
            );
            assert_ready(
                &mut pollset,
                ctx,
                &[
                    (Key(0), Priority(2), TestStream::new_closed([Value(2)])),
                    (Key(1), Priority(3), TestStream::new_closed([Value(4)])),
                ],
            );
            assert_eq!(
                pollset.take_ready_value_and_reprioritize(&Key(0), Priority(4)),
                Some((Priority(2), Value(2)))
            );
            assert_ready(
                &mut pollset,
                ctx,
                &[
                    (Key(1), Priority(3), TestStream::new_closed([Value(4)])),
                    (Key(0), Priority(4), TestStream::new_closed([])),
                ],
            );
            assert_eq!(
                pollset.take_ready_value_and_reprioritize(&Key(1), Priority(5)),
                Some((Priority(3), Value(4)))
            );
            assert_ready(
                &mut pollset,
                ctx,
                &[
                    (Key(0), Priority(4), TestStream::new_closed([])),
                    (Key(1), Priority(5), TestStream::new_closed([])),
                ],
            );
        });
    }

    #[test]
    fn test_remove_and_reuse_key() {
        run_with_context(|ctx| {
            let mut pollset = TestPollSet::new();
            pollset
                .try_insert(
                    Key(0),
                    Priority(0),
                    TestStream::new_closed([Value(1), Value(2)]),
                )
                .unwrap();
            assert_ready(
                &mut pollset,
                ctx,
                &[(
                    Key(0),
                    Priority(0),
                    TestStream::new_closed([Value(1), Value(2)]),
                )],
            );
            assert_eq!(
                pollset.remove(&Key(0)),
                Some((
                    Key(0),
                    Priority(0),
                    TestStream::new_closed([Value(1), Value(2)])
                ))
            );
            pollset
                .try_insert(
                    Key(0),
                    Priority(1),
                    TestStream::new_closed([Value(3), Value(4)]),
                )
                .unwrap();
            // Only the new stream's ready value must be visible, and *not*
            // anything left over from the previous stream at that key.
            assert_ready(
                &mut pollset,
                ctx,
                &[(
                    Key(0),
                    Priority(1),
                    TestStream::new_closed([Value(3), Value(4)]),
                )],
            );
        });
    }

    #[test]
    fn get_ready_stream() {
        run_with_context(|_ctx| {
            let mut pollset = TestPollSet::new();
            pollset
                .try_insert(Key(0), Priority(0), VecDequeStream::new_open([Value(1)]))
                .unwrap();
            let stream = pollset.stream(&Key(0)).unwrap();
            assert_eq!(stream.vec[0], Value(1));
        });
    }

    #[test]
    fn get_pending_stream() {
        run_with_context(|_ctx| {
            let mut pollset = TestPollSet::new();
            pollset
                .try_insert(Key(0), Priority(0), VecDequeStream::new_open([]))
                .unwrap();
            let stream = pollset.stream(&Key(0)).unwrap();
            assert!(stream.vec.is_empty());
        });
    }

    #[test]
    fn mutate_pending_stream() {
        run_with_context(|ctx| {
            let mut pollset = TestPollSet::new();
            pollset
                .try_insert(Key(0), Priority(0), VecDequeStream::new_open([]))
                .unwrap();
            assert_ready(&mut pollset, ctx, &[]);

            // Pushing through `stream_mut` should make the stream ready.
            pollset.stream_mut(&Key(0)).unwrap().push(Value(0));

            assert_ready(
                &mut pollset,
                ctx,
                &[(Key(0), Priority(0), VecDequeStream::new_open([Value(0)]))],
            );
        });
    }

    #[test]
    fn mutate_ready_stream() {
        run_with_context(|ctx| {
            let mut pollset = TestPollSet::new();
            pollset
                .try_insert(Key(0), Priority(0), VecDequeStream::new_open([Value(0)]))
                .unwrap();
            assert_ready(
                &mut pollset,
                ctx,
                &[(Key(0), Priority(0), VecDequeStream::new_open([Value(0)]))],
            );

            pollset.stream_mut(&Key(0)).unwrap().push(Value(1));

            assert_ready(
                &mut pollset,
                ctx,
                &[(
                    Key(0),
                    Priority(0),
                    VecDequeStream::new_open([Value(0), Value(1)]),
                )],
            );

            // Consume the value that was there originally.
            assert_eq!(
                pollset.take_ready_value_and_reprioritize(&Key(0), Priority(0)),
                Some((Priority(0), Value(0)))
            );

            // The value we pushed afterwards is now at the head.
            assert_ready(
                &mut pollset,
                ctx,
                &[(Key(0), Priority(0), VecDequeStream::new_open([Value(1)]))],
            );
        });
    }

    #[test]
    fn test_async() {
        MockRuntime::test_with_various(|rt| async move {
            let mut pollset = StreamPollSet::<
                Key,
                Priority,
                Peekable<futures::channel::mpsc::Receiver<Value>>,
            >::new();

            // Two bounded mpsc channels (so that back-pressure is exercised),
            // standing in for Tor streams, each fed by its own task.
            for streami in 1..=2 {
                let (mut send, recv) = futures::channel::mpsc::channel::<Value>(2);
                pollset
                    .try_insert(Key(streami), Priority(streami), recv.peekable())
                    .unwrap();
                rt.spawn_identified(format!("stream{streami}"), async move {
                    for val in 0..10 {
                        send.send(Value(val * streami)).await.unwrap();
                    }
                });
            }

            let output = Arc::new(Mutex::new(Vec::new()));

            rt.spawn_identified("mux", {
                let output = output.clone();
                async move {
                    loop {
                        let (key, priority, head) = futures::future::poll_fn(|ctx| {
                            let Some((key, priority, stream)) =
                                pollset.poll_ready_iter_mut(ctx).next()
                            else {
                                // No streams ready, but there could be more items coming.
                                // The current `ctx` should be registered to wake us
                                // if and when there are.
                                return Poll::Pending;
                            };
                            let Poll::Ready(head) = Pin::new(stream).poll_peek(ctx) else {
                                panic!("poll_ready_iter_mut returned non-ready stream")
                            };
                            Poll::Ready((*key, *priority, head.copied()))
                        })
                        .await;
                        match head {
                            Some(value) => {
                                // Take the value, and haphazardly bump the priority
                                // to push this stream "back".
                                pollset
                                    .take_ready_value_and_reprioritize(
                                        &key,
                                        Priority(priority.0 + 10),
                                    )
                                    .unwrap();
                                output.lock().unwrap().push((key, value));
                            }
                            None => {
                                // Stream ended. Remove it.
                                let _ = pollset.remove(&key).unwrap();
                            }
                        }
                    }
                }
            });

            rt.advance_until_stalled().await;

            let output = output.lock().unwrap();

            // The interleaving between streams is unpredictable, but every
            // item of each stream must arrive, in order within that stream.
            for streami in 1..=2 {
                let expected = (0..10).map(|val| Value(val * streami)).collect::<Vec<_>>();
                let actual = output
                    .iter()
                    .filter_map(|(k, v)| (k == &Key(streami)).then_some(*v))
                    .collect::<Vec<_>>();
                assert_eq!(actual, expected);
            }
        });
    }
}