tor_proto/client/stream/
resolve.rs1use crate::client::stream::StreamReceiver;
4use crate::memquota::StreamAccount;
5use crate::stream::cmdcheck::{AnyCmdChecker, CmdChecker, StreamStatus};
6use crate::{Error, Result};
7
8use futures::StreamExt;
9use tor_cell::relaycell::RelayCmd;
10use tor_cell::relaycell::msg::Resolved;
11use tor_cell::restricted_msg;
12
13pub struct ResolveStream {
16 s: StreamReceiver,
18
19 _memquota: StreamAccount,
23}
24
25restricted_msg! {
26 enum ResolveResponseMsg : RelayMsg {
28 End,
29 Resolved,
30 }
31}
32
33impl ResolveStream {
34 pub(crate) fn new(s: StreamReceiver, memquota: StreamAccount) -> Self {
38 ResolveStream {
39 s,
40 _memquota: memquota,
41 }
42 }
43
44 pub async fn read_msg(&mut self) -> Result<Resolved> {
47 use ResolveResponseMsg::*;
48 let cell = match self.s.next().await {
49 Some(cell) => cell?,
50 None => return Err(Error::NotConnected),
51 };
52 let msg = match cell.decode::<ResolveResponseMsg>() {
53 Ok(cell) => cell.into_msg(),
54 Err(e) => {
55 self.s.protocol_error();
56 return Err(Error::from_bytes_err(e, "response on a resolve stream"));
57 }
58 };
59 match msg {
60 End(e) => Err(Error::EndReceived(e.reason())),
61 Resolved(r) => Ok(r),
62 }
63 }
64}
65
66#[derive(Debug, Default)]
83pub(crate) struct ResolveCmdChecker {}
84
85impl CmdChecker for ResolveCmdChecker {
86 fn check_msg(&mut self, msg: &tor_cell::relaycell::UnparsedRelayMsg) -> Result<StreamStatus> {
87 use StreamStatus::Closed;
88 match msg.cmd() {
89 RelayCmd::RESOLVED => Ok(Closed),
90 RelayCmd::END => Ok(Closed),
91 _ => Err(Error::StreamProto(format!(
92 "Unexpected {} on resolve stream",
93 msg.cmd()
94 ))),
95 }
96 }
97
98 fn consume_checked_msg(&mut self, msg: tor_cell::relaycell::UnparsedRelayMsg) -> Result<()> {
99 let _ = msg
100 .decode::<ResolveResponseMsg>()
101 .map_err(|err| Error::from_bytes_err(err, "message on resolve stream."))?;
102 Ok(())
103 }
104}
105
106impl ResolveCmdChecker {
107 pub(crate) fn new_any() -> AnyCmdChecker {
110 Box::<Self>::default()
111 }
112}