tor_proto/conflux/msghandler.rs
1//! A conflux-aware message handler.
2
3use std::cmp::Ordering;
4use std::sync::Arc;
5use std::sync::atomic::{self, AtomicU64};
6use std::time::{Duration, SystemTime};
7
8use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCmd, StreamId, UnparsedRelayMsg};
9use tor_error::{Bug, internal};
10
11use crate::Error;
12use crate::crypto::cell::HopNum;
13
14/// Cell handler for conflux cells.
15///
16/// One per Circuit.
17//
18// Note: this is not a `MetaCellHandler` because we need a slightly different API here.
19// Perhaps we should redesign `MetaCellHandler` API to make it work for this too?
20pub(crate) struct ConfluxMsgHandler {
21 /// Inner message handler
22 ///
23 /// Customizes the cell handling logic,
24 /// because clients and exits behave differently.
25 ///
26 /// TODO: can/should we avoid dynamic dispatch here?
27 handler: Box<dyn AbstractConfluxMsgHandler + Send + Sync>,
28 /// The absolute sequence number of the last message delivered to a stream.
29 ///
30 /// This is shared by all the circuits in a conflux set.
31 last_seq_delivered: Arc<AtomicU64>,
32}
33
34impl ConfluxMsgHandler {
35 /// Create a new message handler using the specified [`AbstractConfluxMsgHandler`].
36 ///
37 /// Clients and relays both use this function.
38 ///
39 // TODO(relay): exits will need to implement their own AbstractConfluxMsgHandler
40 pub(crate) fn new(
41 handler: Box<dyn AbstractConfluxMsgHandler + Send + Sync>,
42 last_seq_delivered: Arc<AtomicU64>,
43 ) -> Self {
44 Self {
45 handler,
46 last_seq_delivered,
47 }
48 }
49
50 /// Validate the specified source hop of a conflux cell.
51 ///
52 /// The client impl of this function returns an error if the hop is not the last hop.
53 ///
54 /// The exit impl of this function returns an error if the source hop is not the last hop,
55 /// or if there are further hops after the source hop.
56 fn validate_source_hop(&self, msg: &UnparsedRelayMsg, hop: HopNum) -> crate::Result<()> {
57 self.handler.validate_source_hop(msg, hop)
58 }
59
60 /// Handle the specified conflux `msg`.
61 pub(crate) fn handle_conflux_msg(
62 &mut self,
63 msg: UnparsedRelayMsg,
64 hop: HopNum,
65 ) -> Option<ConfluxCmd> {
66 let res = (|| {
67 // Ensure the conflux cell came from the expected hop
68 // (see 4.2.1. Cell Injection Side Channel Mitigations in prop329).
69 let () = self.validate_source_hop(&msg, hop)?;
70 self.handler.handle_msg(msg, hop)
71 })();
72
73 // Returning an error here would cause the reactor to shut down,
74 // so we return a ConfluxCmd::RemoveLeg command instead.
75 //
76 // After removing the leg, the reactor will decide whether it needs
77 // to shut down or not.
78 match res {
79 Ok(cmd) => cmd,
80 Err(e) => {
81 // Tell the reactor to remove this leg from the conflux set,
82 // and to notify the handshake initiator of the error
83 Some(ConfluxCmd::RemoveLeg(RemoveLegReason::ConfluxHandshakeErr(
84 e,
85 )))
86 }
87 }
88 }
89
90 /// Client-only: note that the LINK cell was sent.
91 ///
92 /// Used for the initial RTT measurement.
93 pub(crate) fn note_link_sent(&mut self, ts: SystemTime) -> Result<(), Bug> {
94 self.handler.note_link_sent(ts)
95 }
96
97 /// Get the wallclock time when the handshake on this circuit is supposed to time out.
98 ///
99 /// Returns `None` if the handshake is not currently in progress.
100 pub(crate) fn handshake_timeout(&self) -> Option<SystemTime> {
101 self.handler.handshake_timeout()
102 }
103
104 /// Returns the conflux status of this handler.
105 pub(crate) fn status(&self) -> ConfluxStatus {
106 self.handler.status()
107 }
108
109 /// Check our sequence numbers to see if the current msg is in order.
110 ///
111 /// Returns an internal error if the relative seqno is lower than the absolute seqno.
112 fn is_msg_in_order(&self) -> Result<bool, Bug> {
113 let last_seq_delivered = self.last_seq_delivered.load(atomic::Ordering::Acquire);
114 match self.handler.last_seq_recv().cmp(&(last_seq_delivered + 1)) {
115 Ordering::Less => {
116 // Our internal accounting is busted!
117 Err(internal!(
118 "Got a conflux cell with a sequence number less than the last delivered"
119 ))
120 }
121 Ordering::Equal => Ok(true),
122 Ordering::Greater => Ok(false),
123 }
124 }
125
126 /// Return a [`OooRelayMsg`] for the reactor to buffer.
127 fn prepare_ooo_entry(
128 &self,
129 hopnum: HopNum,
130 cell_counts_towards_windows: bool,
131 streamid: StreamId,
132 msg: UnparsedRelayMsg,
133 ) -> OooRelayMsg {
134 OooRelayMsg {
135 seqno: self.handler.last_seq_recv(),
136 hopnum,
137 cell_counts_towards_windows,
138 streamid,
139 msg,
140 }
141 }
142
143 /// Check the sequence number of the specified `msg`,
144 /// and decide whether it should be delivered or buffered.
145 #[cfg(feature = "conflux")]
146 pub(crate) fn action_for_msg(
147 &mut self,
148 hopnum: HopNum,
149 cell_counts_towards_windows: bool,
150 streamid: StreamId,
151 msg: UnparsedRelayMsg,
152 ) -> Result<ConfluxAction, Bug> {
153 if !super::cmd_counts_towards_seqno(msg.cmd()) {
154 return Ok(ConfluxAction::Deliver(msg));
155 }
156
157 // Increment the relative seqno on this leg.
158 self.handler.inc_last_seq_recv();
159
160 let action = if self.is_msg_in_order()? {
161 ConfluxAction::Deliver(msg)
162 } else {
163 ConfluxAction::Enqueue(self.prepare_ooo_entry(
164 hopnum,
165 cell_counts_towards_windows,
166 streamid,
167 msg,
168 ))
169 };
170
171 Ok(action)
172 }
173
174 /// Increment the absolute "delivered" seqno for this conflux set
175 /// if the specified message counts towards sequence numbers.
176 pub(crate) fn inc_last_seq_delivered(&self, msg: &UnparsedRelayMsg) {
177 if super::cmd_counts_towards_seqno(msg.cmd()) {
178 self.last_seq_delivered
179 .fetch_add(1, atomic::Ordering::AcqRel);
180 }
181 }
182
183 /// Returns the initial RTT measured by this handler.
184 pub(crate) fn init_rtt(&self) -> Option<Duration> {
185 self.handler.init_rtt()
186 }
187
188 /// Return the sequence number of the last message sent on this leg.
189 pub(crate) fn last_seq_sent(&self) -> u64 {
190 self.handler.last_seq_sent()
191 }
192
193 /// Set the sequence number of the last message sent on this leg.
194 pub(crate) fn set_last_seq_sent(&mut self, n: u64) {
195 self.handler.set_last_seq_sent(n);
196 }
197
198 /// Return the sequence number of the last message received on this leg.
199 pub(crate) fn last_seq_recv(&self) -> u64 {
200 self.handler.last_seq_recv()
201 }
202
203 /// Note that a cell has been sent.
204 ///
205 /// Updates the internal sequence numbers.
206 pub(crate) fn note_cell_sent(&mut self, cmd: RelayCmd) {
207 if super::cmd_counts_towards_seqno(cmd) {
208 self.handler.inc_last_seq_sent();
209 }
210 }
211}
212
213/// An action to take after processing a potentially out of order message.
214#[derive(Debug)]
215#[cfg(feature = "conflux")]
216pub(crate) enum ConfluxAction {
217 /// Deliver the message to its corresponding stream
218 Deliver(UnparsedRelayMsg),
219 /// Enqueue the specified entry in the out-of-order queue.
220 Enqueue(OooRelayMsg),
221}
222
223/// An object that can process conflux relay messages
224/// and manage the conflux state of a circuit.
225///
226/// This is indirectly used by the circuit reactor (via `ConfluxSet`)
227/// for conflux handling.
228pub(crate) trait AbstractConfluxMsgHandler {
229 /// Validate the specified source hop of a conflux cell.
230 fn validate_source_hop(&self, msg: &UnparsedRelayMsg, hop: HopNum) -> crate::Result<()>;
231
232 /// Handle the specified conflux `msg`.
233 fn handle_msg(
234 &mut self,
235 msg: UnparsedRelayMsg,
236 hop: HopNum,
237 ) -> crate::Result<Option<ConfluxCmd>>;
238
239 /// Returns the conflux status of this handler.
240 fn status(&self) -> ConfluxStatus;
241
242 /// Client-only: note that the LINK cell was sent.
243 fn note_link_sent(&mut self, ts: SystemTime) -> Result<(), Bug>;
244
245 /// Get the wallclock time when the handshake on this circuit is supposed to time out.
246 ///
247 /// Returns `None` if the handshake is not currently in progress.
248 fn handshake_timeout(&self) -> Option<SystemTime>;
249
250 /// Returns the initial RTT measured by this handler.
251 fn init_rtt(&self) -> Option<Duration>;
252
253 /// Return the sequence number of the last message received on this leg.
254 fn last_seq_recv(&self) -> u64;
255
256 /// Return the sequence number of the last message sent on this leg.
257 fn last_seq_sent(&self) -> u64;
258
259 /// Set the sequence number of the last message sent on this leg.
260 fn set_last_seq_sent(&mut self, n: u64);
261
262 /// Increment the sequence number of the last message received on this leg.
263 fn inc_last_seq_recv(&mut self);
264
265 /// Increment the sequence number of the last message sent on this leg.
266 fn inc_last_seq_sent(&mut self);
267}
268
269/// An out-of-order message.
270#[derive(Debug)]
271pub(crate) struct OooRelayMsg {
272 /// The sequence number of the message.
273 pub(crate) seqno: u64,
274 /// The hop this message originated from.
275 pub(crate) hopnum: HopNum,
276 /// Whether the cell this message originated from counts towards
277 /// the stream-level SENDME window.
278 ///
279 /// See "SENDME window accounting" in prop340.
280 pub(crate) cell_counts_towards_windows: bool,
281 /// The stream ID of this message.
282 pub(crate) streamid: StreamId,
283 /// The actual message.
284 pub(crate) msg: UnparsedRelayMsg,
285}
286
287impl Ord for OooRelayMsg {
288 fn cmp(&self, other: &Self) -> Ordering {
289 self.seqno.cmp(&other.seqno).reverse()
290 }
291}
292
293impl PartialOrd for OooRelayMsg {
294 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
295 Some(self.cmp(other))
296 }
297}
298
299impl PartialEq for OooRelayMsg {
300 fn eq(&self, other: &Self) -> bool {
301 self.seqno == other.seqno
302 }
303}
304
305impl Eq for OooRelayMsg {}
306
307/// The outcome of [`AbstractConfluxMsgHandler::handle_msg`].
308#[derive(Debug)]
309pub(crate) enum ConfluxCmd {
310 /// Remove this circuit from the conflux set.
311 ///
312 /// Returned by `ConfluxMsgHandler::handle_conflux_msg` for invalid messages
313 /// (originating from wrong hop), and for messages that are rejected
314 /// by its inner `AbstractMsgHandler`.
315 RemoveLeg(RemoveLegReason),
316
317 /// This circuit has completed the conflux handshake,
318 /// and wants to send the specified cell.
319 ///
320 /// Returned by an `AbstractMsgHandler` to signal to the reactor that
321 /// the conflux handshake is complete.
322 HandshakeComplete {
323 /// The hop number.
324 hop: HopNum,
325 /// Whether to use a RELAY_EARLY cell.
326 early: bool,
327 /// The cell to send.
328 cell: AnyRelayMsgOuter,
329 },
330}
331
332/// The reason for removing a circuit leg from the conflux set.
333#[derive(Debug, derive_more::Display)]
334pub(crate) enum RemoveLegReason {
335 /// The conflux handshake timed out.
336 ///
337 /// On the client-side, this means we didn't receive
338 /// the CONFLUX_LINKED response in time.
339 #[display("conflux handshake timed out")]
340 ConfluxHandshakeTimeout,
341 /// An error occurred during conflux handshake.
342 #[display("{}", _0)]
343 ConfluxHandshakeErr(Error),
344 /// The channel was closed.
345 #[display("channel closed")]
346 ChannelClosed,
347}
348
349/// The conflux status of a conflux circuit.
350#[derive(Copy, Clone, Debug, PartialEq, Eq)]
351pub(crate) enum ConfluxStatus {
352 /// Circuit has not begun the conflux handshake yet.
353 Unlinked,
354 /// Conflux handshake is in progress.
355 Pending,
356 /// A linked conflux circuit.
357 Linked,
358}