Skip to main content

arti/rpc/
session.rs

1//! Declare the RPC session object as exposed from the RPC server run by the `arti` crate.
2
3use arti_client::TorClient;
4use arti_rpcserver::RpcAuthentication;
5use derive_deftly::Deftly;
6use futures::stream::StreamExt as _;
7use std::sync::Arc;
8use tor_async_utils::{DropNotifyEofSignallable, DropNotifyWatchSender};
9use tor_rpc_connect::SuperuserPermission;
10use tor_rpcbase::{self as rpc};
11use tor_rtcompat::Runtime;
12
13use crate::{
14    proxy::port_info,
15    rpc::{listener::RpcConnInfo, superuser::RpcSuperuser},
16};
17
18use super::proxyinfo::{self, ProxyInfo};
19
20/// A top-level RPC session object.
21///
22/// This is the first object that an RPC user receives upon authenticating;
23/// It is returned by `auth:authenticate`.
24///
25/// Other objects (`TorClient`,`RpcDataStream`, etc)
26/// are available using methods on this object.
27/// (See the list of available methods.)
28///
29/// This type wraps and delegates to [`arti_rpcserver::RpcSession`],
30/// but exposes additional functionality not available at the
31/// level of [`arti_rpcserver`], including information about configured proxies.
32///
33/// This ObjectID for this object can be used as the target of a SOCKS stream.
34#[derive(Deftly)]
35#[derive_deftly(rpc::Object)]
36#[deftly(rpc(
37    delegate_with = "|this: &Self| Some(this.session.clone())",
38    delegate_type = "arti_rpcserver::RpcSession"
39))]
40#[deftly(rpc(expose_outside_of_session))]
41pub(super) struct ArtiRpcSession {
42    /// State about the `arti` server, as seen by the Rpc system.
43    pub(super) arti_state: Arc<RpcVisibleArtiState>,
44    /// The underlying RpcSession object that we delegate to.
45    session: Arc<arti_rpcserver::RpcSession>,
46}
47
48/// Information about the current global top-level Arti state,
49/// as exposed to an Rpc Session.
50//
51// TODO: This type is dangerously close to being a collection of globals.
52// We should refactor it aggressively when we refactor the `arti` crate.
53//
54// TODO: Right now this is constructed in the same form that it's used in
55// ArtiRpcSession.  Later on, we could split it into one type that
56// the rest of this crate constructs, and another type that the
57// ArtiRpcSession actually uses. We should do that if the needs seem to diverge.
58pub(crate) struct RpcVisibleArtiState {
59    /// A `ProxyInfo` that we hand out when asked to list our proxy ports.
60    ///
61    /// Right now it only lists Socks; in the future it may list more.
62    proxy_info: postage::watch::Receiver<ProxyInfoState>,
63}
64
65/// Handle to set RPC state across RPC sessions.  (See `RpcVisibleArtiState`.)
66#[derive(Debug)]
67pub(crate) struct RpcStateSender {
68    /// Sender for setting our list of proxy ports.
69    proxy_info_sender: DropNotifyWatchSender<ProxyInfoState>,
70}
71
72impl ArtiRpcSession {
73    /// Construct a new `ArtiRpcSession`.
74    ///
75    /// Privileges on the session (if any) are derived from `auth`, which describes
76    /// how the user authenticated.
77    ///
78    /// The session receives a new isolated TorClient, based on `client_root`.
79    pub(super) fn new<R: Runtime>(
80        auth: &RpcAuthentication,
81        client_root: &TorClient<R>,
82        arti_state: &Arc<RpcVisibleArtiState>,
83        listener_info: &RpcConnInfo,
84    ) -> Arc<Self> {
85        let _ = auth; // This is currently unused; any authentication gives the same result.
86        let client = client_root.isolated_client();
87        let session = arti_rpcserver::RpcSession::new_with_client(Arc::new(client));
88        if listener_info.allow_superuser == SuperuserPermission::Allowed {
89            session.provide_superuser_permission(Arc::new(RpcSuperuser::new(client_root.clone())) as _);
90        }
91        Arc::new(ArtiRpcSession {
92            session,
93            arti_state: arti_state.clone(),
94        })
95    }
96}
97
98/// Possible state for a watched proxy_info.
99#[derive(Debug, Clone)]
100enum ProxyInfoState {
101    /// We haven't set it yet.
102    Unset,
103    /// We've set it to a given value.
104    Set(Arc<ProxyInfo>),
105    /// The sender has been dropped.
106    Eof,
107}
108
109impl DropNotifyEofSignallable for ProxyInfoState {
110    fn eof() -> Self {
111        Self::Eof
112    }
113}
114
115impl RpcVisibleArtiState {
116    /// Construct a new `RpcVisibleArtiState`.
117    pub(crate) fn new() -> (Arc<Self>, RpcStateSender) {
118        let (proxy_info_sender, proxy_info) = postage::watch::channel_with(ProxyInfoState::Unset);
119        let proxy_info_sender = DropNotifyWatchSender::new(proxy_info_sender);
120        (
121            Arc::new(Self { proxy_info }),
122            RpcStateSender { proxy_info_sender },
123        )
124    }
125
126    /// Return the latest proxy info, waiting until it is set.
127    ///
128    /// Return an error if the sender has been closed.
129    pub(super) async fn get_proxy_info(&self) -> Result<Arc<ProxyInfo>, ()> {
130        let mut proxy_info = self.proxy_info.clone();
131        while let Some(v) = proxy_info.next().await {
132            match v {
133                ProxyInfoState::Unset => {
134                    // Not yet set, try again.
135                }
136                ProxyInfoState::Set(proxyinfo) => return Ok(Arc::clone(&proxyinfo)),
137                ProxyInfoState::Eof => return Err(()),
138            }
139        }
140        Err(())
141    }
142}
143
144impl RpcStateSender {
145    /// Set the list of stream listener addresses on this state.
146    ///
147    /// This method may only be called once per state.
148    pub(crate) fn set_stream_listeners(&mut self, ports: &[port_info::Port]) {
149        let info = ProxyInfo {
150            proxies: ports
151                .iter()
152                .filter_map(|port| {
153                    Some(proxyinfo::Proxy {
154                        listener: proxyinfo::ProxyListener::try_from_portinfo(port)?,
155                    })
156                })
157                .collect(),
158        };
159        *self.proxy_info_sender.borrow_mut() = ProxyInfoState::Set(Arc::new(info));
160    }
161}
162
163#[cfg(test)]
164mod test {
165    // @@ begin test lint list maintained by maint/add_warning @@
166    #![allow(clippy::bool_assert_comparison)]
167    #![allow(clippy::clone_on_copy)]
168    #![allow(clippy::dbg_macro)]
169    #![allow(clippy::mixed_attributes_style)]
170    #![allow(clippy::print_stderr)]
171    #![allow(clippy::print_stdout)]
172    #![allow(clippy::single_char_pattern)]
173    #![allow(clippy::unwrap_used)]
174    #![allow(clippy::unchecked_time_subtraction)]
175    #![allow(clippy::useless_vec)]
176    #![allow(clippy::needless_pass_by_value)]
177    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
178
179    use tor_rtcompat::SpawnExt as _;
180    use tor_rtmock::MockRuntime;
181
182    use super::*;
183
184    #[test]
185    fn set_proxy_info() {
186        MockRuntime::test_with_various(|rt| async move {
187            let (state, mut sender) = RpcVisibleArtiState::new();
188            let _task = rt.clone().spawn_with_handle(async move {
189                sender.set_stream_listeners(&[port_info::Port {
190                    protocol: port_info::SupportedProtocol::Socks,
191                    address: "8.8.8.8:40".parse().unwrap(),
192                }]);
193                sender // keep sender alive
194            });
195
196            let value = state.get_proxy_info().await;
197
198            // At this point, we've returned once, so this will test that we get a fresh answer even
199            // if we already set the inner value.
200            let value_again = state.get_proxy_info().await;
201            assert_eq!(value.unwrap(), value_again.unwrap());
202        });
203    }
204}