Skip to main content

tor_proto/conflux/
msghandler.rs

1//! A conflux-aware message handler.
2
3use std::cmp::Ordering;
4use std::sync::Arc;
5use std::sync::atomic::{self, AtomicU64};
6use std::time::{Duration, SystemTime};
7
8use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCmd, StreamId, UnparsedRelayMsg};
9use tor_error::{Bug, internal};
10
11use crate::Error;
12use crate::crypto::cell::HopNum;
13
14/// Cell handler for conflux cells.
15///
16/// One per Circuit.
17//
18// Note: this is not a `MetaCellHandler` because we need a slightly different API here.
19// Perhaps we should redesign `MetaCellHandler` API to make it work for this too?
20pub(crate) struct ConfluxMsgHandler {
21    /// Inner message handler
22    ///
23    /// Customizes the cell handling logic,
24    /// because clients and exits behave differently.
25    ///
26    /// TODO: can/should we avoid dynamic dispatch here?
27    handler: Box<dyn AbstractConfluxMsgHandler + Send + Sync>,
28    /// The absolute sequence number of the last message delivered to a stream.
29    ///
30    /// This is shared by all the circuits in a conflux set.
31    last_seq_delivered: Arc<AtomicU64>,
32}
33
34impl ConfluxMsgHandler {
35    /// Create a new message handler using the specified [`AbstractConfluxMsgHandler`].
36    ///
37    /// Clients and relays both use this function.
38    ///
39    // TODO(relay): exits will need to implement their own AbstractConfluxMsgHandler
40    pub(crate) fn new(
41        handler: Box<dyn AbstractConfluxMsgHandler + Send + Sync>,
42        last_seq_delivered: Arc<AtomicU64>,
43    ) -> Self {
44        Self {
45            handler,
46            last_seq_delivered,
47        }
48    }
49
50    /// Validate the specified source hop of a conflux cell.
51    ///
52    /// The client impl of this function returns an error if the hop is not the last hop.
53    ///
54    /// The exit impl of this function returns an error if the source hop is not the last hop,
55    /// or if there are further hops after the source hop.
56    fn validate_source_hop(&self, msg: &UnparsedRelayMsg, hop: HopNum) -> crate::Result<()> {
57        self.handler.validate_source_hop(msg, hop)
58    }
59
60    /// Handle the specified conflux `msg`.
61    pub(crate) fn handle_conflux_msg(
62        &mut self,
63        msg: UnparsedRelayMsg,
64        hop: HopNum,
65    ) -> Option<ConfluxCmd> {
66        let res = (|| {
67            // Ensure the conflux cell came from the expected hop
68            // (see 4.2.1. Cell Injection Side Channel Mitigations in prop329).
69            let () = self.validate_source_hop(&msg, hop)?;
70            self.handler.handle_msg(msg, hop)
71        })();
72
73        // Returning an error here would cause the reactor to shut down,
74        // so we return a ConfluxCmd::RemoveLeg command instead.
75        //
76        // After removing the leg, the reactor will decide whether it needs
77        // to shut down or not.
78        match res {
79            Ok(cmd) => cmd,
80            Err(e) => {
81                // Tell the reactor to remove this leg from the conflux set,
82                // and to notify the handshake initiator of the error
83                Some(ConfluxCmd::RemoveLeg(RemoveLegReason::ConfluxHandshakeErr(
84                    e,
85                )))
86            }
87        }
88    }
89
90    /// Client-only: note that the LINK cell was sent.
91    ///
92    /// Used for the initial RTT measurement.
93    pub(crate) fn note_link_sent(&mut self, ts: SystemTime) -> Result<(), Bug> {
94        self.handler.note_link_sent(ts)
95    }
96
97    /// Get the wallclock time when the handshake on this circuit is supposed to time out.
98    ///
99    /// Returns `None` if the handshake is not currently in progress.
100    pub(crate) fn handshake_timeout(&self) -> Option<SystemTime> {
101        self.handler.handshake_timeout()
102    }
103
104    /// Returns the conflux status of this handler.
105    pub(crate) fn status(&self) -> ConfluxStatus {
106        self.handler.status()
107    }
108
109    /// Check our sequence numbers to see if the current msg is in order.
110    ///
111    /// Returns an internal error if the relative seqno is lower than the absolute seqno.
112    fn is_msg_in_order(&self) -> Result<bool, Bug> {
113        let last_seq_delivered = self.last_seq_delivered.load(atomic::Ordering::Acquire);
114        match self.handler.last_seq_recv().cmp(&(last_seq_delivered + 1)) {
115            Ordering::Less => {
116                // Our internal accounting is busted!
117                Err(internal!(
118                    "Got a conflux cell with a sequence number less than the last delivered"
119                ))
120            }
121            Ordering::Equal => Ok(true),
122            Ordering::Greater => Ok(false),
123        }
124    }
125
126    /// Return a [`OooRelayMsg`] for the reactor to buffer.
127    fn prepare_ooo_entry(
128        &self,
129        hopnum: HopNum,
130        cell_counts_towards_windows: bool,
131        streamid: StreamId,
132        msg: UnparsedRelayMsg,
133    ) -> OooRelayMsg {
134        OooRelayMsg {
135            seqno: self.handler.last_seq_recv(),
136            hopnum,
137            cell_counts_towards_windows,
138            streamid,
139            msg,
140        }
141    }
142
143    /// Check the sequence number of the specified `msg`,
144    /// and decide whether it should be delivered or buffered.
145    #[cfg(feature = "conflux")]
146    pub(crate) fn action_for_msg(
147        &mut self,
148        hopnum: HopNum,
149        cell_counts_towards_windows: bool,
150        streamid: StreamId,
151        msg: UnparsedRelayMsg,
152    ) -> Result<ConfluxAction, Bug> {
153        if !super::cmd_counts_towards_seqno(msg.cmd()) {
154            return Ok(ConfluxAction::Deliver(msg));
155        }
156
157        // Increment the relative seqno on this leg.
158        self.handler.inc_last_seq_recv();
159
160        let action = if self.is_msg_in_order()? {
161            ConfluxAction::Deliver(msg)
162        } else {
163            ConfluxAction::Enqueue(self.prepare_ooo_entry(
164                hopnum,
165                cell_counts_towards_windows,
166                streamid,
167                msg,
168            ))
169        };
170
171        Ok(action)
172    }
173
174    /// Increment the absolute "delivered" seqno for this conflux set
175    /// if the specified message counts towards sequence numbers.
176    pub(crate) fn inc_last_seq_delivered(&self, msg: &UnparsedRelayMsg) {
177        if super::cmd_counts_towards_seqno(msg.cmd()) {
178            self.last_seq_delivered
179                .fetch_add(1, atomic::Ordering::AcqRel);
180        }
181    }
182
183    /// Returns the initial RTT measured by this handler.
184    pub(crate) fn init_rtt(&self) -> Option<Duration> {
185        self.handler.init_rtt()
186    }
187
188    /// Return the sequence number of the last message sent on this leg.
189    pub(crate) fn last_seq_sent(&self) -> u64 {
190        self.handler.last_seq_sent()
191    }
192
193    /// Set the sequence number of the last message sent on this leg.
194    pub(crate) fn set_last_seq_sent(&mut self, n: u64) {
195        self.handler.set_last_seq_sent(n);
196    }
197
198    /// Return the sequence number of the last message received on this leg.
199    pub(crate) fn last_seq_recv(&self) -> u64 {
200        self.handler.last_seq_recv()
201    }
202
203    /// Note that a cell has been sent.
204    ///
205    /// Updates the internal sequence numbers.
206    pub(crate) fn note_cell_sent(&mut self, cmd: RelayCmd) {
207        if super::cmd_counts_towards_seqno(cmd) {
208            self.handler.inc_last_seq_sent();
209        }
210    }
211}
212
213/// An action to take after processing a potentially out of order message.
214#[derive(Debug)]
215#[cfg(feature = "conflux")]
216pub(crate) enum ConfluxAction {
217    /// Deliver the message to its corresponding stream
218    Deliver(UnparsedRelayMsg),
219    /// Enqueue the specified entry in the out-of-order queue.
220    Enqueue(OooRelayMsg),
221}
222
223/// An object that can process conflux relay messages
224/// and manage the conflux state of a circuit.
225///
226/// This is indirectly used by the circuit reactor (via `ConfluxSet`)
227/// for conflux handling.
228pub(crate) trait AbstractConfluxMsgHandler {
229    /// Validate the specified source hop of a conflux cell.
230    fn validate_source_hop(&self, msg: &UnparsedRelayMsg, hop: HopNum) -> crate::Result<()>;
231
232    /// Handle the specified conflux `msg`.
233    fn handle_msg(
234        &mut self,
235        msg: UnparsedRelayMsg,
236        hop: HopNum,
237    ) -> crate::Result<Option<ConfluxCmd>>;
238
239    /// Returns the conflux status of this handler.
240    fn status(&self) -> ConfluxStatus;
241
242    /// Client-only: note that the LINK cell was sent.
243    fn note_link_sent(&mut self, ts: SystemTime) -> Result<(), Bug>;
244
245    /// Get the wallclock time when the handshake on this circuit is supposed to time out.
246    ///
247    /// Returns `None` if the handshake is not currently in progress.
248    fn handshake_timeout(&self) -> Option<SystemTime>;
249
250    /// Returns the initial RTT measured by this handler.
251    fn init_rtt(&self) -> Option<Duration>;
252
253    /// Return the sequence number of the last message received on this leg.
254    fn last_seq_recv(&self) -> u64;
255
256    /// Return the sequence number of the last message sent on this leg.
257    fn last_seq_sent(&self) -> u64;
258
259    /// Set the sequence number of the last message sent on this leg.
260    fn set_last_seq_sent(&mut self, n: u64);
261
262    /// Increment the sequence number of the last message received on this leg.
263    fn inc_last_seq_recv(&mut self);
264
265    /// Increment the sequence number of the last message sent on this leg.
266    fn inc_last_seq_sent(&mut self);
267}
268
269/// An out-of-order message.
270#[derive(Debug)]
271pub(crate) struct OooRelayMsg {
272    /// The sequence number of the message.
273    pub(crate) seqno: u64,
274    /// The hop this message originated from.
275    pub(crate) hopnum: HopNum,
276    /// Whether the cell this message originated from counts towards
277    /// the stream-level SENDME window.
278    ///
279    /// See "SENDME window accounting" in prop340.
280    pub(crate) cell_counts_towards_windows: bool,
281    /// The stream ID of this message.
282    pub(crate) streamid: StreamId,
283    /// The actual message.
284    pub(crate) msg: UnparsedRelayMsg,
285}
286
287impl Ord for OooRelayMsg {
288    fn cmp(&self, other: &Self) -> Ordering {
289        self.seqno.cmp(&other.seqno).reverse()
290    }
291}
292
293impl PartialOrd for OooRelayMsg {
294    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
295        Some(self.cmp(other))
296    }
297}
298
299impl PartialEq for OooRelayMsg {
300    fn eq(&self, other: &Self) -> bool {
301        self.seqno == other.seqno
302    }
303}
304
305impl Eq for OooRelayMsg {}
306
307/// The outcome of [`AbstractConfluxMsgHandler::handle_msg`].
308#[derive(Debug)]
309pub(crate) enum ConfluxCmd {
310    /// Remove this circuit from the conflux set.
311    ///
312    /// Returned by `ConfluxMsgHandler::handle_conflux_msg` for invalid messages
313    /// (originating from wrong hop), and for messages that are rejected
314    /// by its inner `AbstractMsgHandler`.
315    RemoveLeg(RemoveLegReason),
316
317    /// This circuit has completed the conflux handshake,
318    /// and wants to send the specified cell.
319    ///
320    /// Returned by an `AbstractMsgHandler` to signal to the reactor that
321    /// the conflux handshake is complete.
322    HandshakeComplete {
323        /// The hop number.
324        hop: HopNum,
325        /// Whether to use a RELAY_EARLY cell.
326        early: bool,
327        /// The cell to send.
328        cell: AnyRelayMsgOuter,
329    },
330}
331
332/// The reason for removing a circuit leg from the conflux set.
333#[derive(Debug, derive_more::Display)]
334pub(crate) enum RemoveLegReason {
335    /// The conflux handshake timed out.
336    ///
337    /// On the client-side, this means we didn't receive
338    /// the CONFLUX_LINKED response in time.
339    #[display("conflux handshake timed out")]
340    ConfluxHandshakeTimeout,
341    /// An error occurred during conflux handshake.
342    #[display("{}", _0)]
343    ConfluxHandshakeErr(Error),
344    /// The channel was closed.
345    #[display("channel closed")]
346    ChannelClosed,
347}
348
349/// The conflux status of a conflux circuit.
350#[derive(Copy, Clone, Debug, PartialEq, Eq)]
351pub(crate) enum ConfluxStatus {
352    /// Circuit has not begun the conflux handshake yet.
353    Unlinked,
354    /// Conflux handshake is in progress.
355    Pending,
356    /// A linked conflux circuit.
357    Linked,
358}