tor_proto/util/sometimes_unbounded_sink.rs
1//! [`SometimesUnboundedSink`]
2
3use std::collections::VecDeque;
4use std::pin::Pin;
5use std::task::{Context, Poll, Poll::*, ready};
6
7use futures::{Sink, future};
8
9use pin_project::pin_project;
10
/// Wraps a [`Sink`], providing an only-sometimes-used unbounded buffer
///
/// For example, consider `SometimesUnboundedSink<T, mpsc::Sender>`.
/// The `Sender` is not always ready for writing:
/// if the capacity is exceeded, `send` will block.
///
/// `SometimesUnboundedSink`'s `Sink` implementation works the same way.
/// But there are also two methods
/// [`pollish_send_unbounded`](SometimesUnboundedSink::pollish_send_unbounded)
/// and
/// [`send_unbounded`](SometimesUnboundedSink::send_unbounded)
/// which will always complete immediately
/// (they never wait for capacity, although they can still report an error).
/// Items which the underlying sink `S` is not ready to accept are queued,
/// and will be delivered to `S` when possible.
///
/// ### You must poll this type
///
/// For queued items to be delivered,
/// `SometimesUnboundedSink` must be polled,
/// even if you don't have an item to send.
///
/// You can use [`Sink::poll_ready`] for this.
/// Any [`Context`]-taking method is suitable.
///
/// (This is a difference between [`SometimesUnboundedSink`]
/// and [`mpsc::UnboundedSender`](futures::channel::mpsc::UnboundedSender):
/// [`UnboundedSender::unbounded_send`](futures::channel::mpsc::UnboundedSender::unbounded_send)
/// does not require a flush operation.
/// In this way, [`SometimesUnboundedSink::send_unbounded`] behaves more like
/// [`Sink::start_send`], which _does_ require a subsequent flush.)
///
/// ### Error handling
///
/// Errors from the underlying sink may not be reported immediately,
/// due to the buffering in `SometimesUnboundedSink`.
///
/// However, if the sink reports errors from `poll_ready`
/// these will surface in a timely fashion.
///
/// After an error has been reported, there may still be buffered data,
/// which will only be delivered if `SometimesUnboundedSink` is polled again
/// (and the error in the underlying sink was transient).
#[pin_project]
pub(crate) struct SometimesUnboundedSink<T, S> {
    /// Things we couldn't send_unbounded right away
    ///
    /// Invariants:
    ///
    ///  * Everything here must be fed to `inner` before any further user data
    ///    (unbounded user data may be appended).
    ///
    ///  * If this is nonempty, the executor knows to wake this task.
    ///    This is achieved as follows:
    ///    If this is nonempty, `inner.poll_ready()` has been called
    ///    (and returned `Pending`; see the Waker invariant on `inner`).
    buf: VecDeque<T>,

    /// The actual sink
    ///
    /// This also has the relevant `Waker`.
    ///
    /// # Waker invariant
    ///
    /// Whenever either
    ///
    ///  * The last call to any of our public methods returned `Pending`, or
    ///  * `buf` is nonempty,
    ///
    /// the last method call to `inner` *also* returned `Pending`.
    /// (Or, we have reported an error.)
    ///
    /// So, in those situations, this task has been recorded for wakeup
    /// by `inner` (specifically, its other end, if it's a channel)
    /// when `inner` can make progress again
    /// (for example, when it becomes ready to accept more items).
    ///
    /// Therefore this task will be woken up, and, if the caller actually
    /// polls us again (as is usual and is required by our docs),
    /// we'll drain any queued data.
    #[pin]
    inner: S,
}
91
92impl<T, S: Sink<T>> SometimesUnboundedSink<T, S> {
93 /// Wrap an inner `Sink` with a `SometimesUnboundedSink`
94 //
95 // There is no method for unwrapping. If we make this type more public,
96 // there should be, but that method will need `where S: Unpin`.
97 pub(crate) fn new(inner: S) -> Self {
98 SometimesUnboundedSink {
99 buf: VecDeque::new(),
100 inner,
101 }
102 }
103
104 /// Return the number of T queued in this sink.
105 pub(crate) fn n_queued(&self) -> usize {
106 self.buf.len()
107 }
108
109 /// Return an iterator over the items queued in this sink.
110 ///
111 /// (Used by circuit padding to see whether we have a cell queued for a given hop.)
112 #[cfg(feature = "circ-padding")]
113 pub(crate) fn iter_queue(&self) -> impl Iterator<Item = &T> + '_ {
114 self.buf.iter()
115 }
116
117 /// Hand `item` to the inner Sink if possible, or queue it otherwise
118 ///
119 /// Like a `poll_...` method in that it takes a `Context`.
120 /// That's needed to make sure we get polled again
121 /// when the underlying sink can accept items.
122 ///
123 /// But unlike a `poll_...` method in that it doesn't return `Poll`,
124 /// since completion is always immediate.
125 pub(crate) fn pollish_send_unbounded(
126 mut self: Pin<&mut Self>,
127 cx: &mut Context<'_>,
128 item: T,
129 ) -> Result<(), S::Error> {
130 match self.as_mut().poll_ready(cx) {
131 // Waker invariant: poll_ready only returns Ready(Ok(())) if `buf` is empty
132 Ready(Ok(())) => self.as_mut().start_send(item),
133 // Waker invariant: if we report an error, we're then allowed to expect polling again
134 Ready(Err(e)) => Err(e),
135 Pending => {
136 // Waker invariant: poll_ready() returned Pending,
137 // so the task has indeed already been recorded.
138 self.as_mut().project().buf.push_back(item);
139 Ok(())
140 }
141 }
142 }
143
144 /// Hand `item` to the inner Sink if possible, or queue it otherwise (async fn)
145 ///
146 /// You must `.await` this, but it will never block.
147 /// (Its future is always `Ready`.)
148 pub(crate) async fn send_unbounded(mut self: Pin<&mut Self>, item: T) -> Result<(), S::Error> {
149 // Waker invariant: this is just a wrapper around `pollish_send_unbounded`
150 let mut item = Some(item);
151 future::poll_fn(move |cx| {
152 let item = item.take().expect("polled after Ready");
153 Ready(self.as_mut().pollish_send_unbounded(cx, item))
154 })
155 .await
156 }
157
158 /// Flush the buffer. On a `Ready(())` return, it's empty.
159 ///
160 /// This satisfies the Waker invariant as if it were a public method.
161 fn flush_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
162 let mut self_ = self.as_mut().project();
163 while !self_.buf.is_empty() {
164 // Waker invariant:
165 // if inner gave Pending, we give Pending too: ok
166 // if inner gave Err, we're allowed to want polling again
167 ready!(self_.inner.as_mut().poll_ready(cx))?;
168 let item = self_.buf.pop_front().expect("suddenly empty!");
169 // Waker invariant: returning Err
170 self_.inner.as_mut().start_send(item)?;
171 }
172 // Waker invariant: buffer is empty, and we're not about to return Pending
173 Ready(Ok(()))
174 }
175
176 /// Obtain a reference to the inner `Sink`, `S`
177 ///
178 /// This method should be used with a little care, since it bypasses the wrapper.
179 /// For example, if `S` has interior mutability, and this method is used to
180 /// modify it, the `SometimesUnboundedSink` may malfunction.
181 pub(crate) fn as_inner(&self) -> &S {
182 &self.inner
183 }
184
185 /// Obtain a mutable reference to the inner `Sink`, `S`
186 ///
187 /// This method should be used with extra care, since it bypasses the wrapper.
188 /// Before you call this method,
189 /// make sure you understand the internal invariants for `SometimesUnboundedSink`,
190 /// and make sure that you are not violating them.
191 /// In particular, do not queue anything onto the resulting `Sink` directly.
192 pub(crate) fn as_inner_mut(&mut self) -> &mut S {
193 &mut self.inner
194 }
195}
196
197// Waker invariant for all these impls:
198// returning Err or Pending from flush_buf: OK, flush_buf ensures the condition holds
199// returning from the inner method: trivially OK
200impl<T, S: Sink<T>> Sink<T> for SometimesUnboundedSink<T, S> {
201 type Error = S::Error;
202
203 // Only returns `Ready(Ok(()))` if `buf` is empty
204 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
205 ready!(self.as_mut().flush_buf(cx))?;
206 self.project().inner.poll_ready(cx)
207 }
208
209 fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), S::Error> {
210 assert!(self.buf.is_empty(), "start_send without poll_ready");
211 self.project().inner.start_send(item)
212 }
213
214 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
215 ready!(self.as_mut().flush_buf(cx))?;
216 self.project().inner.poll_flush(cx)
217 }
218
219 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
220 ready!(self.as_mut().flush_buf(cx))?;
221 self.project().inner.poll_close(cx)
222 }
223}
224
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use futures::channel::mpsc;
    use futures::{SinkExt as _, StreamExt as _};
    use std::pin::pin;
    use tor_rtmock::MockRuntime;

    /// Send a numbered sequence through a capacity-1 channel, mixing bounded
    /// and unbounded sends, and check it arrives complete and in order.
    #[test]
    fn cases() {
        // `test_with_various` exercises both LIFO and FIFO scheduling,
        // so the sender and receiver tasks get interleaved in different ways,
        // which should hit the corner cases we care about.
        MockRuntime::test_with_various(|runtime| async move {
            let (raw_tx, rx) = mpsc::channel(1);
            let tx = SometimesUnboundedSink::new(raw_tx);

            runtime.spawn_identified("sender", async move {
                let mut sink = pin!(tx);
                let mut counter = 0..;
                let mut next = move || counter.next().unwrap();

                // An unbounded send while the channel can take it immediately
                sink.as_mut().send_unbounded(next()).await.unwrap();
                sink.as_mut().send(next()).await.unwrap();
                sink.as_mut().send(next()).await.unwrap();
                sink.as_mut().send(next()).await.unwrap();
                // Unbounded sends which may not fit, and so may be queued
                sink.as_mut().send_unbounded(next()).await.unwrap();
                sink.as_mut().send_unbounded(next()).await.unwrap();
                sink.as_mut().send_unbounded(next()).await.unwrap();
                // Alternate the two kinds of send
                sink.as_mut().send(next()).await.unwrap();
                sink.as_mut().send_unbounded(next()).await.unwrap();
                // Flush whatever is still queued
                sink.as_mut().flush().await.unwrap();
                // And finally close the sink
                sink.as_mut().close().await.unwrap();
            });

            runtime.spawn_identified("receiver", async move {
                let mut stream = pin!(rx);
                let mut expected = 0..;

                // Every received value must be the next one in sequence
                while let Some(got) = stream.next().await {
                    assert_eq!(got, expected.next().unwrap());
                }
                // The sender sent nine values (0 through 8) in total
                assert_eq!(expected.next().unwrap(), 9);
            });

            runtime.progress_until_stalled().await;
        });
    }
}