Skip to main content

tor_proto/client/reactor/conflux/
msghandler.rs

1//! Client-side conflux message handling.
2
3use std::sync::Arc;
4use std::sync::atomic::{self, AtomicU64};
5use std::time::{Duration, SystemTime};
6
7use tor_cell::relaycell::conflux::V1Nonce;
8use tor_cell::relaycell::msg::{ConfluxLinked, ConfluxLinkedAck, ConfluxSwitch};
9use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCmd, UnparsedRelayMsg};
10use tor_error::{Bug, internal, warn_report};
11use tor_rtcompat::{DynTimeProvider, SleepProvider as _};
12
13use crate::Error;
14use crate::client::HopNum;
15use crate::client::reactor::circuit::unsupported_client_cell;
16use crate::congestion::params::CongestionWindowParams;
17
18use crate::conflux::msghandler::{AbstractConfluxMsgHandler, ConfluxCmd, ConfluxStatus};
19
/// Client-side implementation of a conflux message handler.
///
/// One handler tracks the handshake state and the sequence-number
/// bookkeeping of a single circuit (leg) of a conflux set.
pub(crate) struct ClientConfluxMsgHandler {
    /// The current state this leg is in.
    state: ConfluxState,
    /// The nonce associated with the circuits from this set.
    nonce: V1Nonce,
    /// The expected conflux join point.
    ///
    /// Conflux cells coming from any other hop fail source-hop validation.
    join_point: HopNum,
    /// The initial round-trip time measurement,
    /// measured during the conflux handshake.
    ///
    /// On the client side, this is the RTT between
    /// `RELAY_CONFLUX_LINK` and `RELAY_CONFLUX_LINKED`.
    init_rtt: Option<Duration>,
    /// The time when the handshake was initiated.
    ///
    /// `None` until the LINK cell has been noted as sent.
    link_sent: Option<SystemTime>,
    /// A handle to the time provider.
    ///
    /// Used to read the wallclock when the LINKED cell arrives.
    runtime: DynTimeProvider,
    /// The sequence number of the last message received on this leg.
    ///
    /// This is a *relative* number.
    ///
    /// Incremented by the [`ConfluxMsgHandler`](super::ConfluxMsgHandler::action_for_msg)
    /// each time a cell that counts towards sequence numbers is received on this leg.
    last_seq_recv: u64,
    /// The sequence number of the last message sent on this leg.
    ///
    /// This is a *relative* number.
    ///
    /// Incremented by the [`ConfluxMsgHandler`](super::ConfluxMsgHandler::note_cell_sent)
    /// each time a cell that counts towards sequence numbers is sent on this leg.
    last_seq_sent: u64,
    /// The absolute sequence number of the last message delivered to a stream.
    ///
    /// This is shared by all the circuits in a conflux set.
    ///
    /// Incremented by the [`ConfluxMsgHandler`](super::ConfluxMsgHandler::inc_last_seq_delivered)
    /// upon delivering the cell to its corresponding stream.
    last_seq_delivered: Arc<AtomicU64>,
    /// Whether we have processed any SWITCH cells on the leg this handler is installed on.
    have_seen_switch: bool,
    /// The number of cells that count towards the conflux seqnos
    /// received on this leg since the last SWITCH.
    cells_since_switch: usize,
    /// The congestion window parameters.
    ///
    /// Used for SWITCH cell validation.
    cwnd_params: CongestionWindowParams,
}
69
/// The state of a client circuit from a conflux set.
///
/// A leg starts out [`Unlinked`](ConfluxState::Unlinked),
/// moves to [`AwaitingLink`](ConfluxState::AwaitingLink) once the LINK cell
/// is noted as sent, and becomes [`Linked`](ConfluxState::Linked)
/// when a LINKED cell carrying the expected nonce is received.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum ConfluxState {
    /// The circuit is not linked.
    Unlinked,
    /// The LINK cell was sent, awaiting response.
    ///
    /// Holds the nonce the LINKED response must carry.
    AwaitingLink(V1Nonce),
    /// The circuit is linked.
    Linked,
}
80
81impl AbstractConfluxMsgHandler for ClientConfluxMsgHandler {
82    fn validate_source_hop(&self, msg: &UnparsedRelayMsg, hop: HopNum) -> crate::Result<()> {
83        if hop != self.join_point {
84            return Err(Error::CircProto(format!(
85                "Received {} cell from unexpected hop {} on client conflux circuit",
86                msg.cmd(),
87                hop.display(),
88            )));
89        }
90
91        Ok(())
92    }
93
94    fn handle_msg(
95        &mut self,
96        msg: UnparsedRelayMsg,
97        hop: HopNum,
98    ) -> crate::Result<Option<ConfluxCmd>> {
99        match msg.cmd() {
100            RelayCmd::CONFLUX_LINK => self.handle_conflux_link(msg, hop),
101            RelayCmd::CONFLUX_LINKED => self.handle_conflux_linked(msg, hop),
102            RelayCmd::CONFLUX_LINKED_ACK => self.handle_conflux_linked_ack(msg, hop),
103            RelayCmd::CONFLUX_SWITCH => self.handle_conflux_switch(msg, hop),
104            _ => Err(internal!("received non-conflux cell in conflux handler?!").into()),
105        }
106    }
107
108    fn status(&self) -> ConfluxStatus {
109        match self.state {
110            ConfluxState::Unlinked => ConfluxStatus::Unlinked,
111            ConfluxState::AwaitingLink(_) => ConfluxStatus::Pending,
112            ConfluxState::Linked => ConfluxStatus::Linked,
113        }
114    }
115
116    fn note_link_sent(&mut self, ts: SystemTime) -> Result<(), Bug> {
117        match self.state {
118            ConfluxState::Unlinked => {
119                self.state = ConfluxState::AwaitingLink(self.nonce);
120            }
121            ConfluxState::AwaitingLink(_) | ConfluxState::Linked => {
122                return Err(internal!("Sent duplicate LINK cell?!"));
123            }
124        }
125
126        self.link_sent = Some(ts);
127        Ok(())
128    }
129
130    /// Get the wallclock time when the handshake on this circuit is supposed to time out.
131    ///
132    /// Returns `None` if the handshake is not currently in progress.
133    fn handshake_timeout(&self) -> Option<SystemTime> {
134        /// The maximum amount of time to wait for the LINKED cell to arrive.
135        //
136        // TODO(conflux-tuning): we should pick a less arbitrary timeout
137        //
138        // "This timeout MUST be no larger than the normal SOCKS/stream timeout in
139        // use for RELAY_BEGIN, but MAY be the Circuit Build Timeout value, instead.
140        // (The C-Tor implementation currently uses Circuit Build Timeout)".
141        const LINK_TIMEOUT: Duration = Duration::from_secs(60);
142
143        if matches!(self.state, ConfluxState::AwaitingLink(_)) {
144            debug_assert!(
145                self.link_sent.is_some(),
146                "awaiting LINKED, but LINK not sent?!"
147            );
148            self.link_sent.map(|link_sent| link_sent + LINK_TIMEOUT)
149        } else {
150            None
151        }
152    }
153
154    /// Returns the initial RTT measured by this handler.
155    fn init_rtt(&self) -> Option<Duration> {
156        self.init_rtt
157    }
158
159    fn last_seq_recv(&self) -> u64 {
160        self.last_seq_recv
161    }
162
163    fn last_seq_sent(&self) -> u64 {
164        self.last_seq_sent
165    }
166
167    fn set_last_seq_sent(&mut self, n: u64) {
168        self.last_seq_sent = n;
169    }
170
171    fn inc_last_seq_recv(&mut self) {
172        self.last_seq_recv += 1;
173        self.cells_since_switch += 1;
174    }
175
176    fn inc_last_seq_sent(&mut self) {
177        self.last_seq_sent += 1;
178    }
179}
180
181impl ClientConfluxMsgHandler {
182    /// Create a new client conflux message handler.
183    pub(crate) fn new(
184        join_point: HopNum,
185        nonce: V1Nonce,
186        last_seq_delivered: Arc<AtomicU64>,
187        cwnd_params: CongestionWindowParams,
188        runtime: DynTimeProvider,
189    ) -> Self {
190        Self {
191            state: ConfluxState::Unlinked,
192            nonce,
193            last_seq_delivered,
194            join_point,
195            link_sent: None,
196            runtime,
197            init_rtt: None,
198            last_seq_recv: 0,
199            last_seq_sent: 0,
200            have_seen_switch: false,
201            cells_since_switch: 0,
202            cwnd_params,
203        }
204    }
205
206    /// Handle a conflux LINK request coming from the specified hop.
207    #[allow(clippy::needless_pass_by_value)]
208    fn handle_conflux_link(
209        &mut self,
210        msg: UnparsedRelayMsg,
211        hop: HopNum,
212    ) -> crate::Result<Option<ConfluxCmd>> {
213        unsupported_client_cell!(msg, hop)
214    }
215
216    /// Handle a conflux LINKED response coming from the specified `hop`.
217    ///
218    /// The caller must validate the `hop` the cell originated from.
219    ///
220    /// To prevent [DROPMARK] attacks, this returns a protocol error
221    /// if any of the following conditions are not met:
222    ///
223    ///   * this is an unlinked circuit that sent a LINK command
224    ///   * that the nonce matches the nonce used in the LINK command
225    ///
226    /// [DROPMARK]: https://www.petsymposium.org/2018/files/papers/issue2/popets-2018-0011.pdf
227    fn handle_conflux_linked(
228        &mut self,
229        msg: UnparsedRelayMsg,
230        hop: HopNum,
231    ) -> crate::Result<Option<ConfluxCmd>> {
232        // See [SIDE_CHANNELS] for rules for when to reject unexpected handshake cells.
233
234        let Some(link_sent) = self.link_sent else {
235            return Err(Error::CircProto(
236                "Received CONFLUX_LINKED cell before sending CONFLUX_LINK?!".into(),
237            ));
238        };
239
240        let expected_nonce = match self.state {
241            ConfluxState::Unlinked => {
242                return Err(Error::CircProto(
243                    "Received CONFLUX_LINKED cell before sending CONFLUX_LINK?!".into(),
244                ));
245            }
246            ConfluxState::AwaitingLink(expected_nonce) => expected_nonce,
247            ConfluxState::Linked => {
248                return Err(Error::CircProto(
249                    "Received CONFLUX_LINKED on already linked circuit".into(),
250                ));
251            }
252        };
253
254        let linked = msg
255            .decode::<ConfluxLinked>()
256            .map_err(|e| Error::from_bytes_err(e, "linked message"))?
257            .into_msg();
258
259        let linked_nonce = *linked.payload().nonce();
260
261        if expected_nonce == linked_nonce {
262            self.state = ConfluxState::Linked;
263        } else {
264            return Err(Error::CircProto(
265                "Received CONFLUX_LINKED cell with mismatched nonce".into(),
266            ));
267        }
268
269        let now = self.runtime.wallclock();
270        // Measure the initial RTT between the time we sent the LINK and received the LINKED
271        self.init_rtt = Some(now.duration_since(link_sent).unwrap_or_else(|e| {
272            warn_report!(e, "failed to calculate initial RTT for conflux circuit",);
273
274            // TODO(conflux): this is terrible, because SystemTime is not monotonic.
275            // Can we somehow use Instant instead of SystemTime?
276            // (DynTimeProvider doesn't have a way of sleeping until an Instant,
277            // it only has sleep_until_wallclock)
278            Duration::from_secs(u64::MAX)
279        }));
280
281        let linked_ack = ConfluxLinkedAck::default();
282        let cell = AnyRelayMsgOuter::new(None, linked_ack.into());
283
284        Ok(Some(ConfluxCmd::HandshakeComplete {
285            hop,
286            early: false,
287            cell,
288        }))
289    }
290
291    /// Handle a conflux LINKED acknowledgment coming from the specified hop.
292    #[allow(clippy::needless_pass_by_value)]
293    fn handle_conflux_linked_ack(
294        &mut self,
295        msg: UnparsedRelayMsg,
296        hop: HopNum,
297    ) -> crate::Result<Option<ConfluxCmd>> {
298        unsupported_client_cell!(msg, hop)
299    }
300
301    /// Handle a conflux SWITCH coming from the specified hop.
302    fn handle_conflux_switch(
303        &mut self,
304        msg: UnparsedRelayMsg,
305        _hop: HopNum,
306    ) -> crate::Result<Option<ConfluxCmd>> {
307        if self.state != ConfluxState::Linked {
308            return Err(Error::CircProto(
309                "Received CONFLUX_SWITCH on unlinked circuit?!".into(),
310            ));
311        }
312
313        if self.have_seen_switch && self.cells_since_switch == 0 {
314            return Err(Error::CircProto(
315                "Received consecutive SWITCH cells on circuit?!".into(),
316            ));
317        }
318
319        let switch = msg
320            .decode::<ConfluxSwitch>()
321            .map_err(|e| Error::from_bytes_err(e, "switch message"))?
322            .into_msg();
323
324        let rel_seqno = switch.seqno();
325
326        self.validate_switch_seqno(rel_seqno)?;
327
328        // Update the absolute sequence number on this leg by the delta.
329        // Since this cell is not multiplexed, we do not count it towards
330        // absolute sequence numbers. We only increment the sequence
331        // numbers for multiplexed cells. Hence there is no +1 here.
332        self.last_seq_recv += u64::from(rel_seqno);
333        // Note that we've received at least one SWITCH on this leg.
334        self.have_seen_switch = true;
335        // Reset our counter for the number of relevant (DATA, etc.) cells
336        // received since the last SWITCH
337        self.cells_since_switch = 0;
338
339        Ok(None)
340    }
341
342    /// Validate the relative sequence number specified in a switch command.
343    ///
344    /// Returns an error if
345    ///
346    ///   * `rel_seqno` is 0 (i.e. the SWITCH cell does not actually increment
347    ///     the `last_seq_recv` seqno on this leg)
348    ///   * the tunnel has not yet received any data and `rel_seqno` is greater
349    ///     than the initial congestion window,
350    fn validate_switch_seqno(&self, rel_seqno: u32) -> crate::Result<()> {
351        // The sequence number from the switch must be non-zero.
352        if rel_seqno == 0 {
353            return Err(Error::CircProto(
354                "Received SWITCH cell with seqno = 0".into(),
355            ));
356        }
357
358        let no_data = self.last_seq_delivered.load(atomic::Ordering::Acquire) == 0;
359        let is_first_switch = !self.have_seen_switch;
360
361        // If we haven't received any DATA cells on this tunnel,
362        // the seqno delta from the first SWITCH can't possibly
363        // exceed the initial congestion window.
364        if no_data && is_first_switch && rel_seqno > self.cwnd_params.cwnd_init() {
365            return Err(Error::CircProto(
366                "SWITCH cell seqno exceeds initial cwnd".into(),
367            ));
368        }
369
370        Ok(())
371    }
372}