1mod extend_handler;
4
5use extend_handler::ExtendRequestHandler;
6
7use crate::channel::{Channel, ChannelSender};
8use crate::circuit::CircuitRxReceiver;
9use crate::circuit::UniqId;
10use crate::circuit::reactor::ControlHandler;
11use crate::circuit::reactor::backward::BackwardReactorCmd;
12use crate::circuit::reactor::forward::{ForwardCellDisposition, ForwardHandler};
13use crate::circuit::reactor::hop_mgr::HopMgr;
14use crate::crypto::cell::OutboundRelayLayer;
15use crate::crypto::cell::RelayCellBody;
16use crate::relay::RelayCircChanMsg;
17use crate::util::err::ReactorError;
18use crate::{Error, HopNum, Result};
19
20use crate::client::circuit::padding::QueuedCellPaddingInfo;
22
23use crate::relay::channel_provider::ChannelProvider;
24use crate::relay::reactor::CircuitAccount;
25use tor_cell::chancell::msg::{AnyChanMsg, Destroy, PaddingNegotiate, Relay};
26use tor_cell::chancell::{AnyChanCell, BoxedCellBody, ChanMsg, CircId};
27use tor_cell::relaycell::msg::{Extended2, SendmeTag};
28use tor_cell::relaycell::{RelayCellDecoderResult, RelayCellFormat, RelayCmd, UnparsedRelayMsg};
29use tor_error::internal;
30use tor_linkspec::OwnedChanTarget;
31use tor_rtcompat::Runtime;
32
33use futures::channel::mpsc;
34use futures::{SinkExt as _, future};
35use tracing::trace;
36
37use std::result::Result as StdResult;
38use std::sync::Arc;
39use std::task::Poll;
40
/// Control-message type used by the `ControlHandler` implementation for `Forward`.
///
/// This is the unit type: the handler accepts the message and does nothing with it.
type CtrlMsg = ();

/// Control-command type used by the `ControlHandler` implementation for `Forward`.
///
/// This is the unit type: the handler accepts the command and does nothing with it.
type CtrlCmd = ();
46
/// Upper bound on the number of RELAY_EARLY cells accepted on one circuit.
///
/// `Forward::handle_relay_cell` counts every RELAY_EARLY cell it is given and
/// fails with a `CircProto` error as soon as the count exceeds this value.
const MAX_RELAY_EARLY_CELLS_PER_CIRCUIT: usize = 8;
51
52pub(crate) struct Forward {
54 unique_id: UniqId,
56 outbound: Option<Outbound>,
62 crypto_out: Box<dyn OutboundRelayLayer + Send>,
64 relay_early_count: usize,
68 extend_handler: ExtendRequestHandler,
72}
73
74pub(crate) enum CircEvent {
76 ExtendResult(StdResult<ExtendResult, ReactorError>),
78}
79
80pub(crate) struct ExtendResult {
82 extended2: Extended2,
84 outbound: Outbound,
86 outbound_chan_rx: CircuitRxReceiver,
90}
91
92struct Outbound {
94 circ_id: CircId,
96 channel: Arc<Channel>,
98 outbound_chan_tx: ChannelSender,
100}
101
102enum CellDecodeResult {
104 Recognized(SendmeTag, RelayCellDecoderResult),
106 Unrecognizd(RelayCellBody),
108}
109
110impl Forward {
111 pub(crate) fn new(
113 inbound_chan: &Arc<Channel>,
114 unique_id: UniqId,
115 crypto_out: Box<dyn OutboundRelayLayer + Send>,
116 chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
117 event_tx: mpsc::Sender<CircEvent>,
118 memquota: CircuitAccount,
119 ) -> Self {
120 let inbound_peer = Arc::clone(inbound_chan.peer_info());
121 let extend_handler =
122 ExtendRequestHandler::new(unique_id, chan_provider, inbound_peer, event_tx, memquota);
123
124 Self {
125 unique_id,
126 outbound: None,
128 crypto_out,
129 relay_early_count: 0,
130 extend_handler,
131 }
132 }
133
134 fn decode_relay_cell<R: Runtime>(
136 &mut self,
137 hop_mgr: &mut HopMgr<R>,
138 cell: Relay,
139 ) -> Result<(Option<HopNum>, CellDecodeResult)> {
140 let hopnum = None;
142 let cmd = cell.cmd();
143 let mut body = cell.into_relay_body().into();
144 let Some(tag) = self.crypto_out.decrypt_outbound(cmd, &mut body) else {
145 return Ok((hopnum, CellDecodeResult::Unrecognizd(body)));
146 };
147
148 let mut hops = hop_mgr.hops().write().expect("poisoned lock");
150 let decode_res = hops
151 .get_mut(hopnum)
152 .ok_or_else(|| internal!("msg from non-existent hop???"))?
153 .inbound
154 .decode(body.into())?;
155
156 Ok((hopnum, CellDecodeResult::Recognized(tag, decode_res)))
157 }
158
159 #[allow(clippy::unnecessary_wraps)] fn handle_drop(&mut self) -> StdResult<(), ReactorError> {
162 cfg_if::cfg_if! {
163 if #[cfg(feature = "circ-padding")] {
164 Err(internal!("relay circuit padding not yet supported").into())
165 } else {
166 Ok(())
167 }
168 }
169 }
170
171 fn handle_extend_result(
173 &mut self,
174 res: StdResult<ExtendResult, ReactorError>,
175 ) -> StdResult<Option<BackwardReactorCmd>, ReactorError> {
176 let ExtendResult {
177 extended2,
178 outbound,
179 outbound_chan_rx,
180 } = res?;
181
182 self.outbound = Some(outbound);
183
184 Ok(Some(BackwardReactorCmd::HandleCircuitExtended {
185 hop: None,
186 extended2,
187 outbound_chan_rx,
188 }))
189 }
190
191 fn handle_relay_cell<R: Runtime>(
193 &mut self,
194 hop_mgr: &mut HopMgr<R>,
195 cell: Relay,
196 early: bool,
197 ) -> StdResult<Option<ForwardCellDisposition>, ReactorError> {
198 if early {
199 self.relay_early_count += 1;
200
201 if self.relay_early_count > MAX_RELAY_EARLY_CELLS_PER_CIRCUIT {
202 return Err(
203 Error::CircProto("Circuit received too many RELAY_EARLY cells".into()).into(),
204 );
205 }
206 }
207
208 let (hopnum, res) = self.decode_relay_cell(hop_mgr, cell)?;
209 let (tag, decode_res) = match res {
210 CellDecodeResult::Unrecognizd(body) => {
211 self.handle_unrecognized_cell(body, None, early)?;
212 return Ok(None);
213 }
214 CellDecodeResult::Recognized(tag, res) => (tag, res),
215 };
216
217 Ok(Some(ForwardCellDisposition::HandleRecognizedRelay {
218 cell: decode_res,
219 early,
220 hopnum,
221 tag,
222 }))
223 }
224
225 fn handle_unrecognized_cell(
227 &mut self,
228 body: RelayCellBody,
229 info: Option<QueuedCellPaddingInfo>,
230 early: bool,
231 ) -> StdResult<(), ReactorError> {
232 trace!(
236 circ_id = %self.unique_id,
237 "Forwarding unrecognized cell"
238 );
239
240 let Some(chan) = self.outbound.as_mut() else {
241 return Err(Error::CircProto(
244 "Asked to forward cell before the circuit was extended?!".into(),
245 )
246 .into());
247 };
248
249 let msg = Relay::from(BoxedCellBody::from(body));
250 let relay = if early {
251 AnyChanMsg::RelayEarly(msg.into())
252 } else {
253 AnyChanMsg::Relay(msg)
254 };
255 let cell = AnyChanCell::new(Some(chan.circ_id), relay);
256
257 chan.outbound_chan_tx.start_send_unpin((cell, info))?;
260
261 Ok(())
262 }
263
264 #[allow(clippy::unused_async)] async fn handle_truncate(&mut self) -> StdResult<(), ReactorError> {
267 Err(internal!("TRUNCATE is not implemented").into())
273 }
274
275 #[allow(clippy::needless_pass_by_value)] fn handle_destroy_cell(&mut self, _cell: Destroy) -> StdResult<(), ReactorError> {
278 Err(internal!("DESTROY is not implemented").into())
279 }
280
281 #[allow(clippy::needless_pass_by_value)] fn handle_padding_negotiate(&mut self, _cell: PaddingNegotiate) -> StdResult<(), ReactorError> {
284 Err(internal!("PADDING_NEGOTIATE is not implemented").into())
285 }
286}
287
288impl ForwardHandler for Forward {
289 type BuildSpec = OwnedChanTarget;
290 type CircChanMsg = RelayCircChanMsg;
291 type CircEvent = CircEvent;
292
293 async fn handle_meta_msg<R: Runtime>(
294 &mut self,
295 runtime: &R,
296 early: bool,
297 _hopnum: Option<HopNum>,
298 msg: UnparsedRelayMsg,
299 _relay_cell_format: RelayCellFormat,
300 ) -> StdResult<(), ReactorError> {
301 match msg.cmd() {
302 RelayCmd::DROP => self.handle_drop(),
303 RelayCmd::EXTEND2 => self.extend_handler.handle_extend2(runtime, early, msg),
304 RelayCmd::TRUNCATE => self.handle_truncate().await,
305 cmd => Err(internal!("relay cmd {cmd} not supported").into()),
306 }
307 }
308
309 async fn handle_forward_cell<R: Runtime>(
310 &mut self,
311 hop_mgr: &mut HopMgr<R>,
312 cell: RelayCircChanMsg,
313 ) -> StdResult<Option<ForwardCellDisposition>, ReactorError> {
314 use RelayCircChanMsg::*;
315
316 match cell {
317 Relay(r) => self.handle_relay_cell(hop_mgr, r, false),
318 RelayEarly(r) => self.handle_relay_cell(hop_mgr, r.into(), true),
319 Destroy(d) => {
320 self.handle_destroy_cell(d)?;
321 Ok(None)
322 }
323 PaddingNegotiate(p) => {
324 self.handle_padding_negotiate(p)?;
325 Ok(None)
326 }
327 }
328 }
329
330 fn handle_event(
331 &mut self,
332 event: Self::CircEvent,
333 ) -> StdResult<Option<BackwardReactorCmd>, ReactorError> {
334 match event {
335 CircEvent::ExtendResult(res) => self.handle_extend_result(res),
336 }
337 }
338
339 async fn outbound_chan_ready(&mut self) -> Result<()> {
340 future::poll_fn(|cx| match &mut self.outbound {
341 Some(chan) => {
342 let _ = chan.outbound_chan_tx.poll_flush_unpin(cx);
343
344 chan.outbound_chan_tx.poll_ready_unpin(cx)
345 }
346 None => {
347 Poll::Ready(Ok(()))
357 }
358 })
359 .await
360 }
361}
362
363impl ControlHandler for Forward {
364 type CtrlMsg = CtrlMsg;
365 type CtrlCmd = CtrlCmd;
366
367 fn handle_cmd(&mut self, cmd: Self::CtrlCmd) -> StdResult<(), ReactorError> {
368 let () = cmd;
369 Ok(())
370 }
371
372 fn handle_msg(&mut self, msg: Self::CtrlMsg) -> StdResult<(), ReactorError> {
373 let () = msg;
374 Ok(())
375 }
376}
377
378impl Drop for Forward {
379 fn drop(&mut self) {
380 if let Some(outbound) = self.outbound.as_mut() {
381 let _ = outbound.channel.close_circuit(outbound.circ_id);
383 }
384 }
385}