1use arti_client::TorClient;
4use arti_rpcserver::RpcAuthentication;
5use derive_deftly::Deftly;
6use futures::stream::StreamExt as _;
7use std::sync::Arc;
8use tor_async_utils::{DropNotifyEofSignallable, DropNotifyWatchSender};
9use tor_rpc_connect::SuperuserPermission;
10use tor_rpcbase::{self as rpc};
11use tor_rtcompat::Runtime;
12
13use crate::{
14 proxy::port_info,
15 rpc::{listener::RpcConnInfo, superuser::RpcSuperuser},
16};
17
18use super::proxyinfo::{self, ProxyInfo};
19
/// RPC session object used by this crate.
///
/// Pairs an `arti_rpcserver::RpcSession` with the Arti-level state that is
/// visible to RPC.  The `delegate_with` / `delegate_type` attributes below
/// name the inner `session` as this object's delegation target.
#[derive(Deftly)]
#[derive_deftly(rpc::Object)]
#[deftly(rpc(
    delegate_with = "|this: &Self| Some(this.session.clone())",
    delegate_type = "arti_rpcserver::RpcSession"
))]
// NOTE(review): presumably lets this object be referred to from outside the
// session it belongs to -- confirm against the `tor_rpcbase` documentation.
#[deftly(rpc(expose_outside_of_session))]
pub(super) struct ArtiRpcSession {
    /// Shared Arti state exposed to RPC (see `RpcVisibleArtiState`).
    pub(super) arti_state: Arc<RpcVisibleArtiState>,
    /// The wrapped session object; this is what `delegate_with` hands out.
    session: Arc<arti_rpcserver::RpcSession>,
}
47
/// Arti-level state that RPC sessions are able to read.
///
/// This is the read side of a watch channel; the matching write side is
/// `RpcStateSender`, and both are created together by `RpcVisibleArtiState::new`.
pub(crate) struct RpcVisibleArtiState {
    /// Watch-channel receiver carrying the current `ProxyInfoState`.
    proxy_info: postage::watch::Receiver<ProxyInfoState>,
}
64
/// Write-side handle used to publish state that `RpcVisibleArtiState` exposes.
#[derive(Debug)]
pub(crate) struct RpcStateSender {
    /// Watch-channel sender for the proxy information.
    ///
    /// NOTE(review): wrapped in `DropNotifyWatchSender`, which presumably
    /// publishes `ProxyInfoState::eof()` when this sender is dropped -- confirm
    /// against `tor_async_utils`.
    proxy_info_sender: DropNotifyWatchSender<ProxyInfoState>,
}
71
72impl ArtiRpcSession {
73 pub(super) fn new<R: Runtime>(
80 auth: &RpcAuthentication,
81 client_root: &TorClient<R>,
82 arti_state: &Arc<RpcVisibleArtiState>,
83 listener_info: &RpcConnInfo,
84 ) -> Arc<Self> {
85 let _ = auth; let client = client_root.isolated_client();
87 let session = arti_rpcserver::RpcSession::new_with_client(Arc::new(client));
88 if listener_info.allow_superuser == SuperuserPermission::Allowed {
89 session.provide_superuser_permission(Arc::new(RpcSuperuser::new(client_root.clone())) as _);
90 }
91 Arc::new(ArtiRpcSession {
92 session,
93 arti_state: arti_state.clone(),
94 })
95 }
96}
97
/// The value carried on the proxy-info watch channel.
#[derive(Debug, Clone)]
enum ProxyInfoState {
    /// Nothing has been published yet; this is the channel's initial value.
    Unset,
    /// Proxy information has been published.
    Set(Arc<ProxyInfo>),
    /// End-of-stream marker (the value returned by
    /// `DropNotifyEofSignallable::eof`); readers treat it as an error.
    Eof,
}
108
109impl DropNotifyEofSignallable for ProxyInfoState {
110 fn eof() -> Self {
111 Self::Eof
112 }
113}
114
115impl RpcVisibleArtiState {
116 pub(crate) fn new() -> (Arc<Self>, RpcStateSender) {
118 let (proxy_info_sender, proxy_info) = postage::watch::channel_with(ProxyInfoState::Unset);
119 let proxy_info_sender = DropNotifyWatchSender::new(proxy_info_sender);
120 (
121 Arc::new(Self { proxy_info }),
122 RpcStateSender { proxy_info_sender },
123 )
124 }
125
126 pub(super) async fn get_proxy_info(&self) -> Result<Arc<ProxyInfo>, ()> {
130 let mut proxy_info = self.proxy_info.clone();
131 while let Some(v) = proxy_info.next().await {
132 match v {
133 ProxyInfoState::Unset => {
134 }
136 ProxyInfoState::Set(proxyinfo) => return Ok(Arc::clone(&proxyinfo)),
137 ProxyInfoState::Eof => return Err(()),
138 }
139 }
140 Err(())
141 }
142}
143
144impl RpcStateSender {
145 pub(crate) fn set_stream_listeners(&mut self, ports: &[port_info::Port]) {
149 let info = ProxyInfo {
150 proxies: ports
151 .iter()
152 .filter_map(|port| {
153 Some(proxyinfo::Proxy {
154 listener: proxyinfo::ProxyListener::try_from_portinfo(port)?,
155 })
156 })
157 .collect(),
158 };
159 *self.proxy_info_sender.borrow_mut() = ProxyInfoState::Set(Arc::new(info));
160 }
161}
162
#[cfg(test)]
mod test {
    // Lints that are relaxed inside this test module.
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    use tor_rtcompat::SpawnExt as _;
    use tor_rtmock::MockRuntime;

    use super::*;

    /// Proxy info published through the sender can be read back, and a second
    /// read yields an equal value.
    #[test]
    fn set_proxy_info() {
        MockRuntime::test_with_various(|rt| async move {
            let (state, mut sender) = RpcVisibleArtiState::new();
            // Publish from a separate task; the spawn result is kept in
            // `_task` until the end of this scope.
            let _task = rt.clone().spawn_with_handle(async move {
                sender.set_stream_listeners(&[port_info::Port {
                    protocol: port_info::SupportedProtocol::Socks,
                    address: "8.8.8.8:40".parse().unwrap(),
                }]);
                // Hand the sender back instead of dropping it inside the task --
                // presumably so that it stays alive (and no `Eof` is published)
                // while the reads below run; TODO confirm.
                sender
            });

            let value = state.get_proxy_info().await;

            // The value has already been delivered once; a second call must
            // still obtain it.
            let value_again = state.get_proxy_info().await;
            assert_eq!(value.unwrap(), value_again.unwrap());
        });
    }
}