tor_proto/client/reactor/conflux/
msghandler.rs1use std::sync::Arc;
4use std::sync::atomic::{self, AtomicU64};
5use std::time::{Duration, SystemTime};
6
7use tor_cell::relaycell::conflux::V1Nonce;
8use tor_cell::relaycell::msg::{ConfluxLinked, ConfluxLinkedAck, ConfluxSwitch};
9use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCmd, UnparsedRelayMsg};
10use tor_error::{Bug, internal, warn_report};
11use tor_rtcompat::{DynTimeProvider, SleepProvider as _};
12
13use crate::Error;
14use crate::client::HopNum;
15use crate::client::reactor::circuit::unsupported_client_cell;
16use crate::congestion::params::CongestionWindowParams;
17
18use crate::conflux::msghandler::{AbstractConfluxMsgHandler, ConfluxCmd, ConfluxStatus};
19
20pub(crate) struct ClientConfluxMsgHandler {
22 state: ConfluxState,
24 nonce: V1Nonce,
26 join_point: HopNum,
28 init_rtt: Option<Duration>,
34 link_sent: Option<SystemTime>,
36 runtime: DynTimeProvider,
38 last_seq_recv: u64,
45 last_seq_sent: u64,
52 last_seq_delivered: Arc<AtomicU64>,
59 have_seen_switch: bool,
61 cells_since_switch: usize,
64 cwnd_params: CongestionWindowParams,
68}
69
70#[derive(Copy, Clone, Debug, PartialEq, Eq)]
72enum ConfluxState {
73 Unlinked,
75 AwaitingLink(V1Nonce),
77 Linked,
79}
80
81impl AbstractConfluxMsgHandler for ClientConfluxMsgHandler {
82 fn validate_source_hop(&self, msg: &UnparsedRelayMsg, hop: HopNum) -> crate::Result<()> {
83 if hop != self.join_point {
84 return Err(Error::CircProto(format!(
85 "Received {} cell from unexpected hop {} on client conflux circuit",
86 msg.cmd(),
87 hop.display(),
88 )));
89 }
90
91 Ok(())
92 }
93
94 fn handle_msg(
95 &mut self,
96 msg: UnparsedRelayMsg,
97 hop: HopNum,
98 ) -> crate::Result<Option<ConfluxCmd>> {
99 match msg.cmd() {
100 RelayCmd::CONFLUX_LINK => self.handle_conflux_link(msg, hop),
101 RelayCmd::CONFLUX_LINKED => self.handle_conflux_linked(msg, hop),
102 RelayCmd::CONFLUX_LINKED_ACK => self.handle_conflux_linked_ack(msg, hop),
103 RelayCmd::CONFLUX_SWITCH => self.handle_conflux_switch(msg, hop),
104 _ => Err(internal!("received non-conflux cell in conflux handler?!").into()),
105 }
106 }
107
108 fn status(&self) -> ConfluxStatus {
109 match self.state {
110 ConfluxState::Unlinked => ConfluxStatus::Unlinked,
111 ConfluxState::AwaitingLink(_) => ConfluxStatus::Pending,
112 ConfluxState::Linked => ConfluxStatus::Linked,
113 }
114 }
115
116 fn note_link_sent(&mut self, ts: SystemTime) -> Result<(), Bug> {
117 match self.state {
118 ConfluxState::Unlinked => {
119 self.state = ConfluxState::AwaitingLink(self.nonce);
120 }
121 ConfluxState::AwaitingLink(_) | ConfluxState::Linked => {
122 return Err(internal!("Sent duplicate LINK cell?!"));
123 }
124 }
125
126 self.link_sent = Some(ts);
127 Ok(())
128 }
129
130 fn handshake_timeout(&self) -> Option<SystemTime> {
134 const LINK_TIMEOUT: Duration = Duration::from_secs(60);
142
143 if matches!(self.state, ConfluxState::AwaitingLink(_)) {
144 debug_assert!(
145 self.link_sent.is_some(),
146 "awaiting LINKED, but LINK not sent?!"
147 );
148 self.link_sent.map(|link_sent| link_sent + LINK_TIMEOUT)
149 } else {
150 None
151 }
152 }
153
154 fn init_rtt(&self) -> Option<Duration> {
156 self.init_rtt
157 }
158
159 fn last_seq_recv(&self) -> u64 {
160 self.last_seq_recv
161 }
162
163 fn last_seq_sent(&self) -> u64 {
164 self.last_seq_sent
165 }
166
167 fn set_last_seq_sent(&mut self, n: u64) {
168 self.last_seq_sent = n;
169 }
170
171 fn inc_last_seq_recv(&mut self) {
172 self.last_seq_recv += 1;
173 self.cells_since_switch += 1;
174 }
175
176 fn inc_last_seq_sent(&mut self) {
177 self.last_seq_sent += 1;
178 }
179}
180
181impl ClientConfluxMsgHandler {
182 pub(crate) fn new(
184 join_point: HopNum,
185 nonce: V1Nonce,
186 last_seq_delivered: Arc<AtomicU64>,
187 cwnd_params: CongestionWindowParams,
188 runtime: DynTimeProvider,
189 ) -> Self {
190 Self {
191 state: ConfluxState::Unlinked,
192 nonce,
193 last_seq_delivered,
194 join_point,
195 link_sent: None,
196 runtime,
197 init_rtt: None,
198 last_seq_recv: 0,
199 last_seq_sent: 0,
200 have_seen_switch: false,
201 cells_since_switch: 0,
202 cwnd_params,
203 }
204 }
205
206 #[allow(clippy::needless_pass_by_value)]
208 fn handle_conflux_link(
209 &mut self,
210 msg: UnparsedRelayMsg,
211 hop: HopNum,
212 ) -> crate::Result<Option<ConfluxCmd>> {
213 unsupported_client_cell!(msg, hop)
214 }
215
216 fn handle_conflux_linked(
228 &mut self,
229 msg: UnparsedRelayMsg,
230 hop: HopNum,
231 ) -> crate::Result<Option<ConfluxCmd>> {
232 let Some(link_sent) = self.link_sent else {
235 return Err(Error::CircProto(
236 "Received CONFLUX_LINKED cell before sending CONFLUX_LINK?!".into(),
237 ));
238 };
239
240 let expected_nonce = match self.state {
241 ConfluxState::Unlinked => {
242 return Err(Error::CircProto(
243 "Received CONFLUX_LINKED cell before sending CONFLUX_LINK?!".into(),
244 ));
245 }
246 ConfluxState::AwaitingLink(expected_nonce) => expected_nonce,
247 ConfluxState::Linked => {
248 return Err(Error::CircProto(
249 "Received CONFLUX_LINKED on already linked circuit".into(),
250 ));
251 }
252 };
253
254 let linked = msg
255 .decode::<ConfluxLinked>()
256 .map_err(|e| Error::from_bytes_err(e, "linked message"))?
257 .into_msg();
258
259 let linked_nonce = *linked.payload().nonce();
260
261 if expected_nonce == linked_nonce {
262 self.state = ConfluxState::Linked;
263 } else {
264 return Err(Error::CircProto(
265 "Received CONFLUX_LINKED cell with mismatched nonce".into(),
266 ));
267 }
268
269 let now = self.runtime.wallclock();
270 self.init_rtt = Some(now.duration_since(link_sent).unwrap_or_else(|e| {
272 warn_report!(e, "failed to calculate initial RTT for conflux circuit",);
273
274 Duration::from_secs(u64::MAX)
279 }));
280
281 let linked_ack = ConfluxLinkedAck::default();
282 let cell = AnyRelayMsgOuter::new(None, linked_ack.into());
283
284 Ok(Some(ConfluxCmd::HandshakeComplete {
285 hop,
286 early: false,
287 cell,
288 }))
289 }
290
291 #[allow(clippy::needless_pass_by_value)]
293 fn handle_conflux_linked_ack(
294 &mut self,
295 msg: UnparsedRelayMsg,
296 hop: HopNum,
297 ) -> crate::Result<Option<ConfluxCmd>> {
298 unsupported_client_cell!(msg, hop)
299 }
300
301 fn handle_conflux_switch(
303 &mut self,
304 msg: UnparsedRelayMsg,
305 _hop: HopNum,
306 ) -> crate::Result<Option<ConfluxCmd>> {
307 if self.state != ConfluxState::Linked {
308 return Err(Error::CircProto(
309 "Received CONFLUX_SWITCH on unlinked circuit?!".into(),
310 ));
311 }
312
313 if self.have_seen_switch && self.cells_since_switch == 0 {
314 return Err(Error::CircProto(
315 "Received consecutive SWITCH cells on circuit?!".into(),
316 ));
317 }
318
319 let switch = msg
320 .decode::<ConfluxSwitch>()
321 .map_err(|e| Error::from_bytes_err(e, "switch message"))?
322 .into_msg();
323
324 let rel_seqno = switch.seqno();
325
326 self.validate_switch_seqno(rel_seqno)?;
327
328 self.last_seq_recv += u64::from(rel_seqno);
333 self.have_seen_switch = true;
335 self.cells_since_switch = 0;
338
339 Ok(None)
340 }
341
342 fn validate_switch_seqno(&self, rel_seqno: u32) -> crate::Result<()> {
351 if rel_seqno == 0 {
353 return Err(Error::CircProto(
354 "Received SWITCH cell with seqno = 0".into(),
355 ));
356 }
357
358 let no_data = self.last_seq_delivered.load(atomic::Ordering::Acquire) == 0;
359 let is_first_switch = !self.have_seen_switch;
360
361 if no_data && is_first_switch && rel_seqno > self.cwnd_params.cwnd_init() {
365 return Err(Error::CircProto(
366 "SWITCH cell seqno exceeds initial cwnd".into(),
367 ));
368 }
369
370 Ok(())
371 }
372}