Skip to main content

tor_proto/client/stream/
data.rs

1//! Declare DataStream, a type that wraps RawCellStream so as to be useful
2//! for byte-oriented communication.
3
4use crate::{Error, Result};
5use static_assertions::assert_impl_all;
6use tor_cell::relaycell::msg::EndReason;
7use tor_cell::relaycell::{RelayCellFormat, RelayCmd};
8
9use futures::io::{AsyncRead, AsyncWrite};
10use futures::stream::StreamExt;
11use futures::task::{Context, Poll};
12use futures::{Future, Stream};
13use pin_project::pin_project;
14use postage::watch;
15
16#[cfg(feature = "tokio")]
17use tokio_crate::io::ReadBuf;
18#[cfg(feature = "tokio")]
19use tokio_crate::io::{AsyncRead as TokioAsyncRead, AsyncWrite as TokioAsyncWrite};
20#[cfg(feature = "tokio")]
21use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
22use tor_cell::restricted_msg;
23
24use std::fmt::Debug;
25use std::io::Result as IoResult;
26use std::num::NonZero;
27use std::pin::Pin;
28#[cfg(any(feature = "stream-ctrl", feature = "experimental-api"))]
29use std::sync::Arc;
30#[cfg(feature = "stream-ctrl")]
31use std::sync::{Mutex, Weak};
32
33use educe::Educe;
34
35use crate::client::ClientTunnel;
36use crate::client::stream::StreamReceiver;
37use crate::memquota::StreamAccount;
38use crate::stream::StreamTarget;
39use crate::stream::cmdcheck::{AnyCmdChecker, CmdChecker, StreamStatus};
40use crate::stream::flow_ctrl::state::StreamRateLimit;
41use crate::stream::flow_ctrl::xon_xoff::reader::{BufferIsEmpty, XonXoffReader, XonXoffReaderCtrl};
42use crate::util::token_bucket::dynamic_writer::DynamicRateLimitedWriter;
43use crate::util::token_bucket::writer::{RateLimitedWriter, RateLimitedWriterConfig};
44use tor_basic_utils::skip_fmt;
45use tor_cell::relaycell::msg::Data;
46use tor_error::internal;
47use tor_rtcompat::{CoarseTimeProvider, DynTimeProvider, SleepProvider};
48
/// A stream of [`RateLimitedWriterConfig`] used to update a [`DynamicRateLimitedWriter`].
///
/// Unfortunately we need to store the result of a [`StreamExt::map`] and [`StreamExt::fuse`] in
/// [`DataWriter`], which leaves us with this ugly type.
/// We use a type alias to make `DataWriter` a little nicer.
///
/// (The function-pointer item type here is exactly what `DataWriter::new` casts
/// its conversion function to; the two must stay in sync.)
type RateConfigStream = futures::stream::Map<
    futures::stream::Fuse<watch::Receiver<StreamRateLimit>>,
    fn(StreamRateLimit) -> RateLimitedWriterConfig,
>;
58
59/// An anonymized stream over the Tor network.
60///
61/// For most purposes, you can think of this type as an anonymized
62/// TCP stream: it can read and write data, and get closed when it's done.
63///
64/// [`DataStream`] implements [`futures::io::AsyncRead`] and
65/// [`futures::io::AsyncWrite`], so you can use it anywhere that those
66/// traits are expected.
67///
68/// # Examples
69///
70/// Connecting to an HTTP server and sending a request, using
71/// [`AsyncWriteExt::write_all`](futures::io::AsyncWriteExt::write_all):
72///
73/// ```ignore
74/// let mut stream = tor_client.connect(("icanhazip.com", 80), None).await?;
75///
76/// use futures::io::AsyncWriteExt;
77///
78/// stream
79///     .write_all(b"GET / HTTP/1.1\r\nHost: icanhazip.com\r\nConnection: close\r\n\r\n")
80///     .await?;
81///
82/// // Flushing the stream is important; see below!
83/// stream.flush().await?;
84/// ```
85///
86/// Reading the result, using [`AsyncReadExt::read_to_end`](futures::io::AsyncReadExt::read_to_end):
87///
88/// ```ignore
89/// use futures::io::AsyncReadExt;
90///
91/// let mut buf = Vec::new();
92/// stream.read_to_end(&mut buf).await?;
93///
94/// println!("{}", String::from_utf8_lossy(&buf));
95/// ```
96///
97/// # Usage with Tokio
98///
99/// If the `tokio` crate feature is enabled, this type also implements
100/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) and
101/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
102/// with code that expects those traits.
103///
104/// # Remember to call `flush`!
105///
106/// DataStream buffers data internally, in order to write as few cells
107/// as possible onto the network.  In order to make sure that your
108/// data has actually been sent, you need to make sure that
109/// [`AsyncWrite::poll_flush`] runs to completion: probably via
110/// [`AsyncWriteExt::flush`](futures::io::AsyncWriteExt::flush).
111///
112/// # Splitting the type
113///
114/// This type is internally composed of a [`DataReader`] and a [`DataWriter`]; the
115/// `DataStream::split` method can be used to split it into those two parts, for more
116/// convenient usage with e.g. stream combinators.
117///
118/// # How long does a stream live?
119///
120/// A `DataStream` will live until all references to it are dropped,
121/// or until it is closed explicitly.
122///
123/// If you split the stream into a `DataReader` and a `DataWriter`, it
124/// will survive until _both_ are dropped, or until it is closed
125/// explicitly.
126///
127/// A stream can also close because of a network error,
128/// or because the other side of the stream decided to close it.
129///
130// # Semver note
131//
132// Note that this type is re-exported as a part of the public API of
133// the `arti-client` crate.  Any changes to its API here in
134// `tor-proto` need to be reflected above.
#[derive(Debug)]
pub struct DataStream {
    /// Underlying writer for this stream
    w: DataWriter,
    /// Underlying reader for this stream
    r: DataReader,
    /// A control object that can be used to monitor and control this stream
    /// without needing to own it.
    ///
    /// (The reader and writer halves hold clones of this same `Arc`; see
    /// `new_inner`.)
    #[cfg(feature = "stream-ctrl")]
    ctrl: Arc<ClientDataStreamCtrl>,
}
// Compile-time check: this type is part of `arti-client`'s public API, so it
// must remain usable across threads.
assert_impl_all! { DataStream: Send, Sync }
147
148/// An object used to control and monitor a data stream.
149///
150/// # Notes
151///
152/// This is a separate type from [`DataStream`] because it's useful to have
153/// multiple references to this object, whereas a [`DataReader`] and [`DataWriter`]
154/// need to have a single owner for the `AsyncRead` and `AsyncWrite` APIs to
155/// work correctly.
#[cfg(feature = "stream-ctrl")]
#[cfg_attr(
    feature = "rpc",
    derive(derive_deftly::Deftly),
    derive_deftly(tor_rpcbase::templates::Object)
)]
#[derive(Debug)]
pub struct ClientDataStreamCtrl {
    /// The circuit to which this stream is attached.
    ///
    /// Note that the stream's reader and writer halves each contain a `StreamTarget`,
    /// which in turn has a strong reference to the `ClientCirc`.  So as long as any
    /// one of those is alive, this reference will be present.
    ///
    /// We make this a Weak reference so that once the stream itself is closed,
    /// we can't leak circuits.
    tunnel: Weak<ClientTunnel>,

    /// Shared user-visible information about the state of this stream.
    ///
    /// TODO RPC: This will probably want to be a `postage::Watch` or something
    /// similar, if and when it stops moving around.
    //
    // Note: no per-field `#[cfg(feature = "stream-ctrl")]` is needed here; the
    // entire struct is already gated on that feature above.
    status: Arc<Mutex<DataStreamStatus>>,

    /// The memory quota account that should be used for this stream's data
    ///
    /// Exists to keep the account alive
    _memquota: StreamAccount,
}
186
/// The inner writer for [`DataWriter`].
///
/// This type is responsible for taking bytes and packaging them into cells.
/// Rate limiting is implemented in [`DataWriter`] to avoid making this type more complex.
#[derive(Debug)]
struct DataWriterInner {
    /// Internal state for this writer
    ///
    /// This is stored in an Option so that we can mutate it in the
    /// AsyncWrite functions.  It might be possible to do better here,
    /// and we should refactor if so.
    //
    // (`None` only transiently, while a `poll_*` call has taken the state out;
    // see `poll_flush_impl`, which `expect`s it back.)
    state: Option<DataWriterState>,

    /// The memory quota account that should be used for this stream's data
    ///
    /// Exists to keep the account alive
    // If we liked, we could make this conditional; see DataReaderInner.memquota
    _memquota: StreamAccount,

    /// A control object that can be used to monitor and control this stream
    /// without needing to own it.
    #[cfg(feature = "stream-ctrl")]
    ctrl: Arc<ClientDataStreamCtrl>,
}
211
212/// The write half of a [`DataStream`], implementing [`futures::io::AsyncWrite`].
213///
214/// See the [`DataStream`] docs for more information. In particular, note
215/// that this writer requires `poll_flush` to complete in order to guarantee that
216/// all data has been written.
217///
218/// # Usage with Tokio
219///
220/// If the `tokio` crate feature is enabled, this type also implements
221/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
222/// with code that expects that trait.
223///
224/// # Drop and close
225///
226/// Note that dropping a `DataWriter` has no special effect on its own:
227/// if the `DataWriter` is dropped, the underlying stream will still remain open
228/// until the `DataReader` is also dropped.
229///
230/// If you want the stream to close earlier, use [`close`](futures::io::AsyncWriteExt::close)
231/// (or [`shutdown`](tokio_crate::io::AsyncWriteExt::shutdown) with `tokio`).
232///
233/// Remember that Tor does not support half-open streams:
234/// If you `close` or `shutdown` a stream,
235/// the other side will not see the stream as half-open,
236/// and so will (probably) not finish sending you any in-progress data.
237/// Do not use `close`/`shutdown` to communicate anything besides
238/// "I am done using this stream."
239///
240// # Semver note
241//
242// Note that this type is re-exported as a part of the public API of
243// the `arti-client` crate.  Any changes to its API here in
244// `tor-proto` need to be reflected above.
#[derive(Debug)]
pub struct DataWriter {
    /// A wrapper around [`DataWriterInner`] that adds rate limiting.
    ///
    /// (The limit is adjusted dynamically from the `StreamRateLimit` watch
    /// channel supplied to `DataWriter::new`.)
    writer: DynamicRateLimitedWriter<DataWriterInner, RateConfigStream, DynTimeProvider>,
}
250
251impl DataWriter {
252    /// Create a new rate-limited [`DataWriter`] from a [`DataWriterInner`].
253    fn new(
254        inner: DataWriterInner,
255        rate_limit_updates: watch::Receiver<StreamRateLimit>,
256        time_provider: DynTimeProvider,
257    ) -> Self {
258        /// Converts a `rate` into a `RateLimitedWriterConfig`.
259        fn rate_to_config(rate: StreamRateLimit) -> RateLimitedWriterConfig {
260            let rate = rate.bytes_per_sec();
261            RateLimitedWriterConfig {
262                rate,        // bytes per second
263                burst: rate, // bytes
264                // This number is chosen arbitrarily, but the idea is that we want to balance
265                // between throughput and latency. Assume the user tries to write a large buffer
266                // (~600 bytes). If we set this too small (for example 1), we'll be waking up
267                // frequently and writing a small number of bytes each time to the
268                // `DataWriterInner`, even if this isn't enough bytes to send a cell. If we set this
269                // too large (for example 510), we'll be waking up infrequently to write a larger
270                // number of bytes each time. So even if the `DataWriterInner` has almost a full
271                // cell's worth of data queued (for example 490) and only needs 509-490=19 more
272                // bytes before a cell can be sent, it will block until the rate limiter allows 510
273                // more bytes.
274                //
275                // TODO(arti#2028): Is there an optimal value here?
276                wake_when_bytes_available: NonZero::new(200).expect("200 != 0"), // bytes
277            }
278        }
279
280        // get the current rate from the `watch::Receiver`, which we'll use as the initial rate
281        let initial_rate: StreamRateLimit = *rate_limit_updates.borrow();
282
283        // map the rate update stream to the type required by `DynamicRateLimitedWriter`
284        let rate_limit_updates = rate_limit_updates.fuse().map(rate_to_config as fn(_) -> _);
285
286        // build the rate limiter
287        let writer = RateLimitedWriter::new(inner, &rate_to_config(initial_rate), time_provider);
288        let writer = DynamicRateLimitedWriter::new(writer, rate_limit_updates);
289
290        Self { writer }
291    }
292
293    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
294    /// interact with this stream without holding the stream itself.
295    #[cfg(feature = "stream-ctrl")]
296    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
297        Some(self.writer.inner().client_stream_ctrl())
298    }
299}
300
301impl AsyncWrite for DataWriter {
302    fn poll_write(
303        mut self: Pin<&mut Self>,
304        cx: &mut Context<'_>,
305        buf: &[u8],
306    ) -> Poll<IoResult<usize>> {
307        AsyncWrite::poll_write(Pin::new(&mut self.writer), cx, buf)
308    }
309
310    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
311        AsyncWrite::poll_flush(Pin::new(&mut self.writer), cx)
312    }
313
314    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
315        AsyncWrite::poll_close(Pin::new(&mut self.writer), cx)
316    }
317}
318
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataWriter {
    // Each method bridges tokio's trait to our futures-based implementation via
    // the `tokio_util` compat adapter (`compat_write` consumes the `Pin<&mut
    // Self>`, which itself implements `futures::io::AsyncWrite`).
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        TokioAsyncWrite::poll_write(Pin::new(&mut self.compat_write()), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_flush(Pin::new(&mut self.compat_write()), cx)
    }

    // tokio calls this `shutdown`; it maps to `poll_close` on the futures side.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut self.compat_write()), cx)
    }
}
333
334/// The read half of a [`DataStream`], implementing [`futures::io::AsyncRead`].
335///
336/// See the [`DataStream`] docs for more information.
337///
338/// # Usage with Tokio
339///
340/// If the `tokio` crate feature is enabled, this type also implements
341/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) for easier integration
342/// with code that expects that trait.
343//
344// # Semver note
345//
346// Note that this type is re-exported as a part of the public API of
347// the `arti-client` crate.  Any changes to its API here in
348// `tor-proto` need to be reflected above.
#[derive(Debug)]
pub struct DataReader {
    /// The [`DataReaderInner`] with a wrapper to support XON/XOFF flow control.
    ///
    /// (The wrapper is constructed in `DataReader::new`.)
    reader: XonXoffReader<DataReaderInner>,
}
354
355impl DataReader {
356    /// Create a new [`DataReader`].
357    fn new(reader: DataReaderInner, xon_xoff_reader_ctrl: XonXoffReaderCtrl) -> Self {
358        Self {
359            reader: XonXoffReader::new(xon_xoff_reader_ctrl, reader),
360        }
361    }
362
363    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
364    /// interact with this stream without holding the stream itself.
365    #[cfg(feature = "stream-ctrl")]
366    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
367        Some(self.reader.inner().client_stream_ctrl())
368    }
369}
370
371impl AsyncRead for DataReader {
372    fn poll_read(
373        mut self: Pin<&mut Self>,
374        cx: &mut Context<'_>,
375        buf: &mut [u8],
376    ) -> Poll<IoResult<usize>> {
377        AsyncRead::poll_read(Pin::new(&mut self.reader), cx, buf)
378    }
379
380    fn poll_read_vectored(
381        mut self: Pin<&mut Self>,
382        cx: &mut Context<'_>,
383        bufs: &mut [std::io::IoSliceMut<'_>],
384    ) -> Poll<IoResult<usize>> {
385        AsyncRead::poll_read_vectored(Pin::new(&mut self.reader), cx, bufs)
386    }
387}
388
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataReader {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        // Bridge tokio's trait to our futures-based implementation via the
        // `tokio_util` compat adapter.
        TokioAsyncRead::poll_read(Pin::new(&mut self.compat()), cx, buf)
    }
}
399
/// The inner reader for [`DataReader`].
///
/// This type is responsible for taking stream messages and extracting the stream data from them.
/// Flow control logic is implemented in [`DataReader`] to avoid making this type more complex.
#[derive(Debug)]
pub(crate) struct DataReaderInner {
    /// Internal state for this reader.
    ///
    /// This is stored in an Option so that we can mutate it in
    /// poll_read().  It might be possible to do better here, and we
    /// should refactor if so.
    //
    // (`None` only transiently, while a caller has taken the state out; see
    // `wait_for_connection`, which takes and restores it.)
    state: Option<DataReaderState>,

    /// The memory quota account that should be used for this stream's data
    ///
    /// Exists to keep the account alive
    // If we liked, we could make this conditional on not(cfg(feature = "stream-ctrl"))
    // since, ClientDataStreamCtrl contains a StreamAccount clone too.  But that seems fragile.
    _memquota: StreamAccount,

    /// A control object that can be used to monitor and control this stream
    /// without needing to own it.
    #[cfg(feature = "stream-ctrl")]
    ctrl: Arc<ClientDataStreamCtrl>,
}
425
426impl BufferIsEmpty for DataReaderInner {
427    /// The result will become stale,
428    /// so is most accurate immediately after a [`poll_read`](AsyncRead::poll_read).
429    fn is_empty(mut self: Pin<&mut Self>) -> bool {
430        match self
431            .state
432            .as_mut()
433            .expect("forgot to put `DataReaderState` back")
434        {
435            DataReaderState::Open(imp) => {
436                // check if the partial cell in `pending` is empty,
437                // and if the message stream is empty
438                imp.pending[imp.offset..].is_empty() && imp.s.is_empty()
439            }
440            // closed, so any data should have been discarded
441            DataReaderState::Closed => true,
442        }
443    }
444}
445
/// Shared status flags for tracking the status of a `DataStream`.
///
/// We expect to refactor this a bit, so it's not exposed at all.
//
// TODO RPC: Possibly instead of manipulating the fields of DataStreamStatus
// from various points in this module, we should instead construct
// DataStreamStatus as needed from information available elsewhere.  In any
// case, we should really  eliminate as much duplicate state here as we can.
// (See discussions at !1198 for some challenges with this.)
#[cfg(feature = "stream-ctrl")]
#[derive(Clone, Debug, Default)]
struct DataStreamStatus {
    /// True if we've received a CONNECTED message.
    //
    // TODO: This is redundant with `connected` in DataReaderImpl.
    received_connected: bool,
    /// True if we have decided to send an END message.
    //
    // TODO RPC: There is not an easy way to set this from this module!  Really,
    // the decision to send an "end" is made when the StreamTarget object is
    // dropped, but we don't currently have any way to see when that happens.
    // Perhaps we need a different shared StreamStatus object that the
    // StreamTarget holds?
    sent_end: bool,
    /// True if we have received an END message telling us to close the stream.
    received_end: bool,
    /// True if we have received an error.
    ///
    /// (This is not a subset or superset of received_end; some errors are END
    /// messages but some aren't; some END messages are errors but some aren't.)
    received_err: bool,
}
478
#[cfg(feature = "stream-ctrl")]
impl DataStreamStatus {
    /// Remember that we've received a connected message.
    fn record_connected(&mut self) {
        self.received_connected = true;
    }

    /// Remember that we've received an error of some kind.
    fn record_error(&mut self, e: &Error) {
        // TODO: Probably we should remember the actual error in a box or
        // something.  But that means making a redundant copy of the error
        // even if nobody will want it.  Do we care?

        // Any END message — clean or otherwise — means the stream has ended.
        if matches!(e, Error::EndReceived(_)) {
            self.received_end = true;
        }
        // Everything except a clean END (reason DONE) counts as an error.
        if !matches!(e, Error::EndReceived(EndReason::DONE)) {
            self.received_err = true;
        }
    }
}
501
restricted_msg! {
    /// An allowable incoming message on a client data stream.
    //
    // (Other relay commands arriving on a data stream are presumably rejected
    // by the stream's command checker — TODO confirm against the checker impl.)
    enum ClientDataStreamMsg:RelayMsg {
        // SENDME is handled by the reactor.
        Data, End, Connected,
    }
}
509
510// TODO RPC: Should we also implement this trait for everything that holds a
511// ClientDataStreamCtrl?
#[cfg(feature = "stream-ctrl")]
impl super::ctrl::ClientStreamCtrl for ClientDataStreamCtrl {
    /// Try to obtain a strong reference to the tunnel; yields `None` once
    /// every strong holder of the tunnel has gone away.
    fn tunnel(&self) -> Option<Arc<ClientTunnel>> {
        Weak::upgrade(&self.tunnel)
    }
}
518
#[cfg(feature = "stream-ctrl")]
impl ClientDataStreamCtrl {
    /// Return true if the underlying stream is connected. (That is, if it has
    /// received a `CONNECTED` message, and has not been closed.)
    pub fn is_connected(&self) -> bool {
        let status = self.status.lock().expect("poisoned lock");
        // Any of these flags means the stream is over (or failing), so it no
        // longer counts as connected.
        let finished = status.sent_end || status.received_end || status.received_err;
        status.received_connected && !finished
    }

    // TODO RPC: Add more functions once we have the desired API more nailed
    // down.
}
531
impl DataStream {
    /// Wrap raw stream receiver and target parts as a DataStream.
    ///
    /// For a non-optimistic stream, `wait_for_connection` must be called
    /// afterwards to make sure a CONNECTED message is received.
    pub(crate) fn new<P: SleepProvider + CoarseTimeProvider>(
        time_provider: P,
        receiver: StreamReceiver,
        xon_xoff_reader_ctrl: XonXoffReaderCtrl,
        target: StreamTarget,
        memquota: StreamAccount,
    ) -> Self {
        Self::new_inner(
            time_provider,
            receiver,
            xon_xoff_reader_ctrl,
            target,
            false, // not yet connected: a CONNECTED message is still expected
            memquota,
        )
    }

    /// Wrap raw stream receiver and target parts as a connected DataStream.
    ///
    /// Unlike [`DataStream::new`], this creates a `DataStream` that does not expect to receive a
    /// CONNECTED cell.
    ///
    /// This is used by hidden services, exit relays, and directory servers to accept streams.
    #[cfg(any(feature = "hs-service", feature = "relay"))]
    pub(crate) fn new_connected<P: SleepProvider + CoarseTimeProvider>(
        time_provider: P,
        receiver: StreamReceiver,
        xon_xoff_reader_ctrl: XonXoffReaderCtrl,
        target: StreamTarget,
        memquota: StreamAccount,
    ) -> Self {
        Self::new_inner(
            time_provider,
            receiver,
            xon_xoff_reader_ctrl,
            target,
            true, // already connected: no CONNECTED message is expected
            memquota,
        )
    }

    /// The shared implementation of the `new*()` functions.
    ///
    /// Builds the reader and writer halves (plus, with `stream-ctrl`, the
    /// shared status/control objects) and assembles them into a `DataStream`.
    fn new_inner<P: SleepProvider + CoarseTimeProvider>(
        time_provider: P,
        receiver: StreamReceiver,
        xon_xoff_reader_ctrl: XonXoffReaderCtrl,
        target: StreamTarget,
        connected: bool,
        memquota: StreamAccount,
    ) -> Self {
        // The cell format determines how large a single DATA body can be;
        // that sizes the writer's buffer below.
        let relay_cell_format = target.relay_cell_format();
        let out_buf_len = Data::max_body_len(relay_cell_format);
        let rate_limit_stream = target.rate_limit_stream().clone();

        // Shared status object, seeded as "connected" when no CONNECTED
        // message is expected.
        #[cfg(feature = "stream-ctrl")]
        let status = {
            let mut data_stream_status = DataStreamStatus::default();
            if connected {
                data_stream_status.record_connected();
            }
            Arc::new(Mutex::new(data_stream_status))
        };

        #[cfg(feature = "stream-ctrl")]
        let ctrl = {
            // Only a weak reference is kept here; see the field docs on
            // `ClientDataStreamCtrl::tunnel`.
            let tunnel = match target.tunnel() {
                crate::stream::Tunnel::Client(t) => Arc::downgrade(t),
                #[cfg(feature = "relay")]
                crate::stream::Tunnel::Relay(_) => panic!("created a relay tunnel in the client?!"),
            };

            Arc::new(ClientDataStreamCtrl {
                tunnel,
                status: status.clone(),
                _memquota: memquota.clone(),
            })
        };
        let r = DataReaderInner {
            state: Some(DataReaderState::Open(DataReaderImpl {
                s: receiver,
                pending: Vec::new(),
                offset: 0,
                connected,
                #[cfg(feature = "stream-ctrl")]
                status: status.clone(),
            })),
            _memquota: memquota.clone(),
            #[cfg(feature = "stream-ctrl")]
            ctrl: ctrl.clone(),
        };
        let w = DataWriterInner {
            state: Some(DataWriterState::Ready(DataWriterImpl {
                s: target,
                // Buffer exactly one cell body's worth of data; see the notes
                // on `DataWriterImpl::buf`.
                buf: vec![0; out_buf_len].into_boxed_slice(),
                n_pending: 0,
                #[cfg(feature = "stream-ctrl")]
                status,
                relay_cell_format,
            })),
            _memquota: memquota,
            #[cfg(feature = "stream-ctrl")]
            ctrl: ctrl.clone(),
        };

        let time_provider = DynTimeProvider::new(time_provider);

        DataStream {
            w: DataWriter::new(w, rate_limit_stream, time_provider),
            r: DataReader::new(r, xon_xoff_reader_ctrl),
            #[cfg(feature = "stream-ctrl")]
            ctrl,
        }
    }

    /// Divide this DataStream into its constituent parts.
    pub fn split(self) -> (DataReader, DataWriter) {
        (self.r, self.w)
    }

    /// Wait until a CONNECTED cell is received, or some other cell
    /// is received to indicate an error.
    ///
    /// Does nothing if this stream is already connected.
    pub async fn wait_for_connection(&mut self) -> Result<()> {
        // We must put state back before returning
        let state = self
            .r
            .reader
            .inner_mut()
            .state
            .take()
            .expect("Missing state in DataReaderInner");

        if let DataReaderState::Open(mut imp) = state {
            let result = if imp.connected {
                Ok(())
            } else {
                // This succeeds if the cell is CONNECTED, and fails otherwise.
                std::future::poll_fn(|cx| Pin::new(&mut imp).read_cell(cx)).await
            };
            // On error the reader transitions to Closed; on success we restore
            // the (now connected) Open state.
            self.r.reader.inner_mut().state = Some(match result {
                Err(_) => DataReaderState::Closed,
                Ok(_) => DataReaderState::Open(imp),
            });
            result
        } else {
            Err(Error::from(internal!(
                "Expected ready state, got {:?}",
                state
            )))
        }
    }

    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
    /// interact with this stream without holding the stream itself.
    #[cfg(feature = "stream-ctrl")]
    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
        Some(&self.ctrl)
    }
}
697
698impl AsyncRead for DataStream {
699    fn poll_read(
700        mut self: Pin<&mut Self>,
701        cx: &mut Context<'_>,
702        buf: &mut [u8],
703    ) -> Poll<IoResult<usize>> {
704        AsyncRead::poll_read(Pin::new(&mut self.r), cx, buf)
705    }
706}
707
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataStream {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        // Bridge tokio's trait to our futures-based implementation via the
        // `tokio_util` compat adapter.
        TokioAsyncRead::poll_read(Pin::new(&mut self.compat()), cx, buf)
    }
}
718
719impl AsyncWrite for DataStream {
720    fn poll_write(
721        mut self: Pin<&mut Self>,
722        cx: &mut Context<'_>,
723        buf: &[u8],
724    ) -> Poll<IoResult<usize>> {
725        AsyncWrite::poll_write(Pin::new(&mut self.w), cx, buf)
726    }
727    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
728        AsyncWrite::poll_flush(Pin::new(&mut self.w), cx)
729    }
730    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
731        AsyncWrite::poll_close(Pin::new(&mut self.w), cx)
732    }
733}
734
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataStream {
    // Each method bridges tokio's trait to our futures-based implementation via
    // the `tokio_util` compat adapter.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        TokioAsyncWrite::poll_write(Pin::new(&mut self.compat()), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_flush(Pin::new(&mut self.compat()), cx)
    }

    // tokio calls this `shutdown`; it maps to `poll_close` on the futures side.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut self.compat()), cx)
    }
}
749
/// Helper type: Like BoxFuture, but also requires that the future be Sync.
///
/// (Presumably needed so `DataWriterState::Flushing`, which stores one of
/// these, doesn't prevent the writer types from being `Sync`; note the
/// `assert_impl_all!` on `DataStream`.)
type BoxSyncFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + 'a>>;
752
/// An enumeration for the state of a DataWriter.
///
/// We have to use an enum here because, for as long as we're waiting
/// for a flush operation to complete, the future returned by
/// `flush_cell()` owns the DataWriterImpl.
#[derive(Educe)]
#[educe(Debug)]
enum DataWriterState {
    /// The writer has closed or gotten an error: nothing more to do.
    Closed,
    /// The writer is not currently flushing; more data can get queued
    /// immediately.
    Ready(DataWriterImpl),
    /// The writer is flushing a cell.
    ///
    /// The future resolves to the `DataWriterImpl` (returning ownership) plus
    /// the outcome of the flush.
    Flushing(
        // Futures have no useful Debug output; skip them when formatting.
        #[educe(Debug(method = "skip_fmt"))] //
        BoxSyncFuture<'static, (DataWriterImpl, Result<()>)>,
    ),
}
772
/// Internal: the write part of a DataStream
#[derive(Educe)]
#[educe(Debug)]
struct DataWriterImpl {
    /// The underlying StreamTarget object.
    s: StreamTarget,

    /// Buffered data to send over the connection.
    ///
    /// This buffer is currently allocated using a number of bytes
    /// equal to the maximum that we can package at a time.
    //
    // TODO: this buffer is probably smaller than we want, but it's good
    // enough for now.  If we _do_ make it bigger, we'll have to change
    // our use of Data::split_from to handle the case where we can't fit
    // all the data.
    #[educe(Debug(method = "skip_fmt"))]
    buf: Box<[u8]>,

    /// Number of unflushed bytes in buf.
    //
    // (Invariant: `n_pending <= buf.len()` — TODO confirm against the
    // queueing code.)
    n_pending: usize,

    /// Relay cell format in use
    ///
    /// (Determines the maximum DATA body size, and hence `buf`'s length; see
    /// `DataStream::new_inner`.)
    relay_cell_format: RelayCellFormat,

    /// Shared user-visible information about the state of this stream.
    #[cfg(feature = "stream-ctrl")]
    status: Arc<Mutex<DataStreamStatus>>,
}
802
803impl DataWriterInner {
    /// See [`DataWriter::client_stream_ctrl`].
    ///
    /// (Unlike that public method, this returns the `&Arc` directly, since the
    /// control object is always present on the inner writer.)
    #[cfg(feature = "stream-ctrl")]
    fn client_stream_ctrl(&self) -> &Arc<ClientDataStreamCtrl> {
        &self.ctrl
    }
809
810    /// Helper for poll_flush() and poll_close(): Performs a flush, then
811    /// closes the stream if should_close is true.
812    fn poll_flush_impl(
813        mut self: Pin<&mut Self>,
814        cx: &mut Context<'_>,
815        should_close: bool,
816    ) -> Poll<IoResult<()>> {
817        let state = self.state.take().expect("Missing state in DataWriter");
818
819        // TODO: this whole function is a bit copy-pasted.
820        let mut future: BoxSyncFuture<_> = match state {
821            DataWriterState::Ready(imp) => {
822                if imp.n_pending == 0 {
823                    // Nothing to flush!
824                    if should_close {
825                        // We need to actually continue with this function to do the closing.
826                        // Thus, make a future that does nothing and is ready immediately.
827                        Box::pin(futures::future::ready((imp, Ok(()))))
828                    } else {
829                        // There's nothing more to do; we can return.
830                        self.state = Some(DataWriterState::Ready(imp));
831                        return Poll::Ready(Ok(()));
832                    }
833                } else {
834                    // We need to flush the buffer's contents; Make a future for that.
835                    Box::pin(imp.flush_buf())
836                }
837            }
838            DataWriterState::Flushing(fut) => fut,
839            DataWriterState::Closed => {
840                self.state = Some(DataWriterState::Closed);
841                return Poll::Ready(Err(Error::NotConnected.into()));
842            }
843        };
844
845        match future.as_mut().poll(cx) {
846            Poll::Ready((_imp, Err(e))) => {
847                self.state = Some(DataWriterState::Closed);
848                Poll::Ready(Err(e.into()))
849            }
850            Poll::Ready((mut imp, Ok(()))) => {
851                if should_close {
852                    // Tell the StreamTarget to close, so that the reactor
853                    // realizes that we are done sending. (Dropping `imp.s` does not
854                    // suffice, since there may be other clones of it.  In particular,
855                    // the StreamReceiver has one, which it uses to keep the stream
856                    // open, among other things.)
857                    imp.s.close();
858
859                    #[cfg(feature = "stream-ctrl")]
860                    {
861                        // TODO RPC:  This is not sufficient to track every case
862                        // where we might have sent an End.  See note on the
863                        // `sent_end` field.
864                        imp.status.lock().expect("lock poisoned").sent_end = true;
865                    }
866                    self.state = Some(DataWriterState::Closed);
867                } else {
868                    self.state = Some(DataWriterState::Ready(imp));
869                }
870                Poll::Ready(Ok(()))
871            }
872            Poll::Pending => {
873                self.state = Some(DataWriterState::Flushing(future));
874                Poll::Pending
875            }
876        }
877    }
878}
879
impl AsyncWrite for DataWriterInner {
    /// Queue as many bytes of `buf` as fit in the current cell buffer;
    /// when the buffer is full, flush it as a DATA cell first.
    ///
    /// Invariant: `self.state` is taken at entry and restored on every
    /// return path.
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<IoResult<usize>> {
        if buf.is_empty() {
            // Never report progress for an empty write request.
            return Poll::Ready(Ok(0));
        }

        let state = self.state.take().expect("Missing state in DataWriter");

        let mut future = match state {
            DataWriterState::Ready(mut imp) => {
                let n_queued = imp.queue_bytes(buf);
                if n_queued != 0 {
                    // We made progress without flushing: report it now.
                    self.state = Some(DataWriterState::Ready(imp));
                    return Poll::Ready(Ok(n_queued));
                }
                // we couldn't queue anything, so the current cell must be full.
                Box::pin(imp.flush_buf())
            }
            DataWriterState::Flushing(fut) => fut,
            DataWriterState::Closed => {
                self.state = Some(DataWriterState::Closed);
                return Poll::Ready(Err(Error::NotConnected.into()));
            }
        };

        match future.as_mut().poll(cx) {
            Poll::Ready((_imp, Err(e))) => {
                #[cfg(feature = "stream-ctrl")]
                {
                    _imp.status.lock().expect("lock poisoned").record_error(&e);
                }
                self.state = Some(DataWriterState::Closed);
                Poll::Ready(Err(e.into()))
            }
            Poll::Ready((mut imp, Ok(()))) => {
                // Great!  We're done flushing.  Queue as much as we can of this
                // cell.
                let n_queued = imp.queue_bytes(buf);
                self.state = Some(DataWriterState::Ready(imp));
                Poll::Ready(Ok(n_queued))
            }
            Poll::Pending => {
                // Flush still in progress; resume it on the next poll.
                self.state = Some(DataWriterState::Flushing(future));
                Poll::Pending
            }
        }
    }

    /// Flush any buffered bytes without closing the stream.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        self.poll_flush_impl(cx, false)
    }

    /// Flush any buffered bytes, then close the stream.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        self.poll_flush_impl(cx, true)
    }
}
940
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataWriterInner {
    // Each method delegates to the futures-io implementation through the
    // tokio-util compat adapter (`FuturesAsyncWriteCompatExt::compat_write`).
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        TokioAsyncWrite::poll_write(Pin::new(&mut self.compat_write()), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_flush(Pin::new(&mut self.compat_write()), cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut self.compat_write()), cx)
    }
}
955
956impl DataWriterImpl {
957    /// Try to flush the current buffer contents as a data cell.
958    async fn flush_buf(mut self) -> (Self, Result<()>) {
959        let result = if let Some((cell, remainder)) =
960            Data::try_split_from(self.relay_cell_format, &self.buf[..self.n_pending])
961        {
962            // TODO: Eventually we may want a larger buffer; if we do,
963            // this invariant will become false.
964            assert!(remainder.is_empty());
965            self.n_pending = 0;
966            self.s.send(cell.into()).await
967        } else {
968            Ok(())
969        };
970
971        (self, result)
972    }
973
974    /// Add as many bytes as possible from `b` to our internal buffer;
975    /// return the number we were able to add.
976    fn queue_bytes(&mut self, b: &[u8]) -> usize {
977        let empty_space = &mut self.buf[self.n_pending..];
978        if empty_space.is_empty() {
979            // that is, len == 0
980            return 0;
981        }
982
983        let n_to_copy = std::cmp::min(b.len(), empty_space.len());
984        empty_space[..n_to_copy].copy_from_slice(&b[..n_to_copy]);
985        self.n_pending += n_to_copy;
986        n_to_copy
987    }
988}
989
impl DataReaderInner {
    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
    /// interact with this stream without holding the stream itself.
    ///
    /// This only borrows the shared handle; callers that need ownership can
    /// clone the returned `Arc`.
    #[cfg(feature = "stream-ctrl")]
    pub(crate) fn client_stream_ctrl(&self) -> &Arc<ClientDataStreamCtrl> {
        &self.ctrl
    }
}
998
/// An enumeration for the state of a [`DataReaderInner`].
// TODO: We don't need to implement the state in this way anymore now that we've removed the saved
// future. There are a few ways we could simplify this. See:
// https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3076#note_3218210
#[derive(Educe)]
#[educe(Debug)]
// We allow this since it's expected that streams will spend most of their time in the `Open` state,
// and will be cleaned up shortly after closing.
#[allow(clippy::large_enum_variant)]
enum DataReaderState {
    /// In this state we have received an end cell or an error.
    Closed,
    /// In this state the reader is open.
    ///
    /// Holds the reader implementation, including any buffered-but-unread
    /// bytes.
    Open(DataReaderImpl),
}
1014
/// Wrapper for the read part of a [`DataStream`].
#[derive(Educe)]
#[educe(Debug)]
#[pin_project]
struct DataReaderImpl {
    /// The underlying StreamReceiver object.
    #[educe(Debug(method = "skip_fmt"))]
    #[pin]
    s: StreamReceiver,

    /// If present, data that we received on this stream but have not
    /// been able to send to the caller yet.
    // TODO: This data structure is probably not what we want, but
    // it's good enough for now.
    #[educe(Debug(method = "skip_fmt"))]
    pending: Vec<u8>,

    /// Index into pending to show what we've already read.
    ///
    /// Invariant: `offset <= pending.len()` (the buffer is empty exactly
    /// when they are equal; see `buf_is_empty`).
    offset: usize,

    /// If true, we have received a CONNECTED cell on this stream.
    connected: bool,

    /// Shared user-visible information about the state of this stream.
    #[cfg(feature = "stream-ctrl")]
    status: Arc<Mutex<DataStreamStatus>>,
}
1042
impl AsyncRead for DataReaderInner {
    /// Copy already-buffered bytes into `buf`, or try to pull in another
    /// data cell when the buffer is empty.
    ///
    /// Loops: each successfully read cell refills the buffer, after which
    /// we retry `extract_bytes` before reporting to the caller.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<IoResult<usize>> {
        // We're pulling the state object out of the reader.  We MUST
        // put it back before this function returns.
        let mut state = self.state.take().expect("Missing state in DataReaderInner");

        loop {
            let mut imp = match state {
                DataReaderState::Open(mut imp) => {
                    // There may be data to read already.
                    let n_copied = imp.extract_bytes(buf);
                    if n_copied != 0 || buf.is_empty() {
                        // We read data into the buffer, or the buffer was 0-len to begin with.
                        // Tell the caller.
                        self.state = Some(DataReaderState::Open(imp));
                        return Poll::Ready(Ok(n_copied));
                    }

                    // No data available!  We have to try reading.
                    imp
                }
                DataReaderState::Closed => {
                    // TODO: Why are we returning an error rather than continuing to return EOF?
                    self.state = Some(DataReaderState::Closed);
                    return Poll::Ready(Err(Error::NotConnected.into()));
                }
            };

            // See if a cell is ready.
            match Pin::new(&mut imp).read_cell(cx) {
                Poll::Ready(Err(e)) => {
                    // There aren't any survivable errors in the current
                    // design.
                    self.state = Some(DataReaderState::Closed);
                    #[cfg(feature = "stream-ctrl")]
                    {
                        imp.status.lock().expect("lock poisoned").record_error(&e);
                    }
                    // An orderly END (reason DONE) is reported as EOF (0 bytes),
                    // not as an I/O error.
                    let result = if matches!(e, Error::EndReceived(EndReason::DONE)) {
                        Ok(0)
                    } else {
                        Err(e.into())
                    };
                    return Poll::Ready(result);
                }
                Poll::Ready(Ok(())) => {
                    // It read a cell!  Continue the loop.
                    state = DataReaderState::Open(imp);
                }
                Poll::Pending => {
                    // No cells ready, so tell the
                    // caller to get back to us later.
                    self.state = Some(DataReaderState::Open(imp));
                    return Poll::Pending;
                }
            }
        }
    }
}
1106
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataReaderInner {
    // Delegates to the futures-io implementation through the tokio-util
    // compat adapter (`FuturesAsyncReadCompatExt::compat`).
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        TokioAsyncRead::poll_read(Pin::new(&mut self.compat()), cx, buf)
    }
}
1117
1118impl DataReaderImpl {
1119    /// Pull as many bytes as we can off of self.pending, and return that
1120    /// number of bytes.
1121    fn extract_bytes(&mut self, buf: &mut [u8]) -> usize {
1122        let remainder = &self.pending[self.offset..];
1123        let n_to_copy = std::cmp::min(buf.len(), remainder.len());
1124        buf[..n_to_copy].copy_from_slice(&remainder[..n_to_copy]);
1125        self.offset += n_to_copy;
1126
1127        n_to_copy
1128    }
1129
1130    /// Return true iff there are no buffered bytes here to yield
1131    fn buf_is_empty(&self) -> bool {
1132        self.pending.len() == self.offset
1133    }
1134
1135    /// Load self.pending with the contents of a new data cell.
1136    fn read_cell(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1137        use ClientDataStreamMsg::*;
1138        let msg = match self.as_mut().project().s.poll_next(cx) {
1139            Poll::Pending => return Poll::Pending,
1140            Poll::Ready(Some(Ok(unparsed))) => match unparsed.decode::<ClientDataStreamMsg>() {
1141                Ok(cell) => cell.into_msg(),
1142                Err(e) => {
1143                    self.s.protocol_error();
1144                    return Poll::Ready(Err(Error::from_bytes_err(e, "message on a data stream")));
1145                }
1146            },
1147            Poll::Ready(Some(Err(e))) => return Poll::Ready(Err(e)),
1148            // TODO: This doesn't seem right to me, but seems to be the behaviour of the code before
1149            // the refactoring, so I've kept the same behaviour. I think if the cell stream is
1150            // terminated, we should be returning `None` here and not considering it as an error.
1151            // The `StreamReceiver` will have already returned an error if the cell stream was
1152            // terminated without an END message.
1153            Poll::Ready(None) => return Poll::Ready(Err(Error::NotConnected)),
1154        };
1155
1156        let result = match msg {
1157            Connected(_) if !self.connected => {
1158                self.connected = true;
1159                #[cfg(feature = "stream-ctrl")]
1160                {
1161                    self.status
1162                        .lock()
1163                        .expect("poisoned lock")
1164                        .record_connected();
1165                }
1166                Ok(())
1167            }
1168            Connected(_) => {
1169                self.s.protocol_error();
1170                Err(Error::StreamProto(
1171                    "Received a second connect cell on a data stream".to_string(),
1172                ))
1173            }
1174            Data(d) if self.connected => {
1175                self.add_data(d.into());
1176                Ok(())
1177            }
1178            Data(_) => {
1179                self.s.protocol_error();
1180                Err(Error::StreamProto(
1181                    "Received a data cell an unconnected stream".to_string(),
1182                ))
1183            }
1184            End(e) => Err(Error::EndReceived(e.reason())),
1185        };
1186
1187        Poll::Ready(result)
1188    }
1189
1190    /// Add the data from `d` to the end of our pending bytes.
1191    fn add_data(&mut self, mut d: Vec<u8>) {
1192        if self.buf_is_empty() {
1193            // No data pending?  Just take d as the new pending.
1194            self.pending = d;
1195            self.offset = 0;
1196        } else {
1197            // TODO(nickm) This has potential to grow `pending` without bound.
1198            // Fortunately, we don't currently read cells or call this
1199            // `add_data` method when pending is nonempty—but if we do in the
1200            // future, we'll have to be careful here.
1201            self.pending.append(&mut d);
1202        }
1203    }
1204}
1205
/// A `CmdChecker` that enforces invariants for outbound data streams.
#[derive(Debug)]
pub(crate) struct OutboundDataCmdChecker {
    /// True if we are expecting to receive a CONNECTED message on this stream.
    ///
    /// Starts out `true` (see the `Default` impl) and is cleared once the
    /// first CONNECTED is checked.
    expecting_connected: bool,
}
1212
1213impl Default for OutboundDataCmdChecker {
1214    fn default() -> Self {
1215        Self {
1216            expecting_connected: true,
1217        }
1218    }
1219}
1220
1221impl CmdChecker for OutboundDataCmdChecker {
1222    fn check_msg(&mut self, msg: &tor_cell::relaycell::UnparsedRelayMsg) -> Result<StreamStatus> {
1223        use StreamStatus::*;
1224        match msg.cmd() {
1225            RelayCmd::CONNECTED => {
1226                if !self.expecting_connected {
1227                    Err(Error::StreamProto(
1228                        "Received CONNECTED twice on a stream.".into(),
1229                    ))
1230                } else {
1231                    self.expecting_connected = false;
1232                    Ok(Open)
1233                }
1234            }
1235            RelayCmd::DATA => {
1236                if !self.expecting_connected {
1237                    Ok(Open)
1238                } else {
1239                    Err(Error::StreamProto(
1240                        "Received DATA before CONNECTED on a stream".into(),
1241                    ))
1242                }
1243            }
1244            RelayCmd::END => Ok(Closed),
1245            _ => Err(Error::StreamProto(format!(
1246                "Unexpected {} on a data stream!",
1247                msg.cmd()
1248            ))),
1249        }
1250    }
1251
1252    fn consume_checked_msg(&mut self, msg: tor_cell::relaycell::UnparsedRelayMsg) -> Result<()> {
1253        let _ = msg
1254            .decode::<ClientDataStreamMsg>()
1255            .map_err(|err| Error::from_bytes_err(err, "cell on half-closed stream"))?;
1256        Ok(())
1257    }
1258}
1259
1260impl OutboundDataCmdChecker {
1261    /// Return a new boxed `DataCmdChecker` in a state suitable for a newly
1262    /// constructed connection.
1263    pub(crate) fn new_any() -> AnyCmdChecker {
1264        Box::<Self>::default()
1265    }
1266}