arti_rpcserver/connection.rs
1//! RPC connection support, mainloop, and protocol implementation.
2
3pub(crate) mod auth;
4mod methods;
5use std::{
6 collections::HashMap,
7 io::Error as IoError,
8 pin::Pin,
9 sync::{Arc, Mutex, RwLock, Weak},
10};
11
12use asynchronous_codec::JsonCodecError;
13use derive_deftly::Deftly;
14use futures::{
15 AsyncWriteExt as _, FutureExt, Sink, SinkExt as _, StreamExt,
16 channel::mpsc,
17 stream::{FusedStream, FuturesUnordered},
18};
19use rpc::dispatch::BoxedUpdateSink;
20use serde_json::error::Category as JsonErrorCategory;
21use tor_async_utils::{SinkExt as _, mpsc_channel_no_memquota};
22
23use crate::{
24 RpcAuthentication,
25 cancel::{self, Cancel, CancelHandle},
26 err::RequestParseError,
27 globalid::{GlobalId, MacKey},
28 msgs::{BoxedResponse, FlexibleRequest, ReqMeta, Request, RequestId, ResponseBody},
29 objmap::{GenIdx, ObjMap},
30};
31
32use tor_rpcbase::templates::*;
33use tor_rpcbase::{self as rpc, RpcError};
34
/// A function we use to construct Session objects in response to authentication.
///
/// It receives a reference to an [`RpcAuthentication`] and returns the new
/// session as a type-erased RPC object.
/// A [`Connection`] stores one of these and consumes it the first time
/// `Connection::create_session` succeeds.
//
// TODO RPC: Perhaps this should return a Result?
type SessionFactory = Box<dyn Fn(&RpcAuthentication) -> Arc<dyn rpc::Object> + Send + Sync>;
39
/// An open connection from an RPC client.
///
/// Tracks information that persists from one request to another.
///
/// The client might not have authenticated;
/// access and permissions control is handled via the capability system.
/// Specifically, the `objects` table in `Inner` holds capabilities
/// that the client will use to do things,
/// including an `RpcSession`.
///
/// # In the Arti RPC System
///
/// A connection to Arti.
///
/// This object is available as soon as you open a connection to Arti RPC,
/// even before you authenticate.  Its ObjectId is always `"connection"`.
///
/// Because this object is available before authentication,
/// it provides only those methods that you need
/// in order to perform authentication
/// and receive an `RpcSession`.
///
/// Note that a connection can only be authenticated once:
/// If you drop the `RpcSession` returned by authenticating,
/// you cannot get another one on the same connection.
#[derive(Deftly)]
#[derive_deftly(Object)]
pub struct Connection {
    /// The mutable state of this connection.
    ///
    /// Every access in this file takes the lock with `lock().expect(..)`,
    /// so a poisoned lock results in a panic.
    inner: Mutex<Inner>,

    /// Lookup table to find the implementations for methods
    /// based on RPC object and method types.
    ///
    /// **NOTE: observe the [Lock hierarchy](crate::mgr::Inner#lock-hierarchy)**
    dispatch_table: Arc<RwLock<rpc::DispatchTable>>,

    /// A unique identifier for this connection.
    ///
    /// This kind of ID is used to refer to the connection from _outside_ of the
    /// context of an RPC connection: it can uniquely identify the connection
    /// from e.g. a SOCKS session so that clients can attach streams to it.
    connection_id: ConnectionId,

    /// A `MacKey` used to create `GlobalIds` for the objects whose identifiers
    /// need to exist outside this connection.
    global_id_mac_key: MacKey,

    /// The authentication type that's required in order to get a session.
    require_auth: tor_rpc_connect::auth::RpcAuth,
}
91
/// The inner, lock-protected part of an RPC connection.
struct Inner {
    /// Map from request ID to handles; used when we need to cancel a request.
    ///
    /// A value of `None` marks a request that is registered
    /// but cannot be cancelled.
    //
    // TODO: We have two options here for handling colliding IDs.  We can either turn
    // this into a multimap, or we can declare that cancelling a request only
    // cancels the most recent request sent with that ID.
    inflight: HashMap<RequestId, Option<CancelHandle>>,

    /// An object map used to look up most objects by ID, and keep track of
    /// which objects are owned by this connection.
    objects: ObjMap,

    /// A reference to this connection itself.
    ///
    /// Used when we're looking up the connection within the RPC system as an object.
    ///
    /// This becomes `None` once the connection object has been released.
    ///
    /// TODO RPC: Maybe there is an easier way to do this while keeping `context` object-safe?
    this_connection: Option<Weak<Connection>>,

    /// A SessionFactory that will be used to create a session if authentication is successful.
    ///
    /// This is None if the connection has already been authenticated.
    session_factory: Option<SessionFactory>,
}
117
/// How many responses can be pending, per connection, before they start to block?
///
/// This is the capacity of the channel that `run_loop` creates;
/// that channel carries both updates and final responses.
const UPDATE_CHAN_SIZE: usize = 128;
120
/// A type-erased [`FusedStream`] yielding decoded requests.
///
/// Each item is either a [`FlexibleRequest`] (which may itself be valid or
/// invalid), or a [`JsonCodecError`] reported by the codec.
//
// (We name this type and [`BoxedResponseSink`] below so as to keep the signature for run_loop
// nice and simple.)
pub(crate) type BoxedRequestStream = Pin<
    Box<dyn FusedStream<Item = Result<FlexibleRequest, asynchronous_codec::JsonCodecError>> + Send>,
>;
128
/// A type-erased [`Sink`] accepting [`BoxedResponse`]s.
///
/// Failures to send are reported as [`JsonCodecError`]s.
pub(crate) type BoxedResponseSink =
    Pin<Box<dyn Sink<BoxedResponse, Error = asynchronous_codec::JsonCodecError> + Send>>;
132
/// A random value used to identify a connection.
///
/// This is a thin wrapper around a 16-byte array; the derived
/// `From`, `Into`, and `AsRef` implementations convert to and from that array.
#[derive(
    Copy,
    Clone,
    Debug,
    Eq,
    PartialEq,
    Hash,
    derive_more::From,
    derive_more::Into,
    derive_more::AsRef,
)]
pub(crate) struct ConnectionId([u8; 16]);
146
impl ConnectionId {
    /// The length of a ConnectionId, in bytes.
    ///
    /// This matches the size of the wrapped `[u8; 16]` array.
    pub(crate) const LEN: usize = 16;
}
151
152impl Connection {
/// A special object ID that indicates the connection itself.
///
/// On a fresh connection, this is the only ObjectId that exists.
///
/// `lookup_object` and `release` compare incoming IDs against this string
/// before they try to decode the ID in any other way.
//
// TODO: We might want to move responsibility for tracking this ID and its value into ObjMap.
const CONNECTION_OBJ_ID: &'static str = "connection";
159
160 /// Create a new connection.
161 pub(crate) fn new(
162 connection_id: ConnectionId,
163 dispatch_table: Arc<RwLock<rpc::DispatchTable>>,
164 global_id_mac_key: MacKey,
165 require_auth: tor_rpc_connect::auth::RpcAuth,
166 session_factory: SessionFactory,
167 ) -> Arc<Self> {
168 Arc::new_cyclic(|this_connection| Self {
169 inner: Mutex::new(Inner {
170 inflight: HashMap::new(),
171 objects: ObjMap::new(),
172 this_connection: Some(Weak::clone(this_connection)),
173 session_factory: Some(session_factory),
174 }),
175 dispatch_table,
176 connection_id,
177 global_id_mac_key,
178 require_auth,
179 })
180 }
181
182 /// Construct a new object to serve as the `session` for a connection.
183 pub(crate) fn create_session(
184 &self,
185 auth: &RpcAuthentication,
186 ) -> Result<Arc<dyn rpc::Object>, RpcError> {
187 let mut inner = self.inner.lock().expect("lock poisoned");
188 let session_factory = inner.session_factory.take().ok_or_else(|| {
189 RpcError::new(
190 "Cannot authenticate the same connection twice".into(),
191 rpc::RpcErrorKind::RequestError,
192 )
193 })?;
194 Ok((session_factory)(auth))
195 }
196
197 /// If possible, convert an `ObjectId` into a `GenIdx` that can be used in
198 /// this connection's ObjMap.
199 fn id_into_local_idx(&self, id: &rpc::ObjectId) -> Result<GenIdx, rpc::LookupError> {
200 // Design note: It's not really necessary from a security POV to
201 // check the MAC here; any possible GenIdx we return will either
202 // refer to some object we're allowed to name in this session, or to
203 // no object at all. Still, we check anyway, since it shouldn't
204 // hurt to do so.
205 if let Some(global_id) = GlobalId::try_decode(&self.global_id_mac_key, id)? {
206 // We have a GlobalId with a valid MAC. Let's make sure it applies
207 // to this connection's ObjMap. (We do not support referring to
208 // anyone else's objects.)
209 //
210 // Design note: As above, this check is a protection against
211 // accidental misuse, not a security feature: even if we removed
212 // this check, we would still only allow objects that this session
213 // is allowed to name.
214 if global_id.connection == self.connection_id {
215 Ok(global_id.local_id)
216 } else {
217 Err(rpc::LookupError::NoObject(id.clone()))
218 }
219 } else {
220 // It's not a GlobalId; let's see if we can make sense of it as an
221 // ObjMap index.
222 Ok(GenIdx::try_decode(id)?)
223 }
224 }
225
226 /// Look up a given object by its object ID relative to this connection.
227 pub(crate) fn lookup_object(
228 &self,
229 id: &rpc::ObjectId,
230 ) -> Result<Arc<dyn rpc::Object>, rpc::LookupError> {
231 if id.as_ref() == Self::CONNECTION_OBJ_ID {
232 let this = self
233 .inner
234 .lock()
235 .expect("lock poisoned")
236 .this_connection
237 .as_ref()
238 .ok_or_else(|| rpc::LookupError::NoObject(id.clone()))?
239 .upgrade()
240 .ok_or_else(|| rpc::LookupError::NoObject(id.clone()))?;
241 Ok(this as Arc<_>)
242 } else {
243 let local_id = self.id_into_local_idx(id)?;
244
245 self.lookup_by_idx(local_id)
246 .map_err(|e| e.to_rpc_lookup_error(id.clone()))
247 }
248 }
249
250 /// As `lookup_object`, but expect a `GenIdx`.
251 pub(crate) fn lookup_by_idx(
252 &self,
253 idx: crate::objmap::GenIdx,
254 ) -> Result<Arc<dyn rpc::Object>, crate::objmap::LookupError> {
255 let inner = self.inner.lock().expect("lock poisoned");
256 inner.objects.lookup(idx)
257 }
258
259 /// Un-register the request `id` and stop tracking its information.
260 fn remove_request(&self, id: &RequestId) {
261 let mut inner = self.inner.lock().expect("lock poisoned");
262 inner.inflight.remove(id);
263 }
264
265 /// Register the request `id` as a cancellable request.
266 ///
267 /// If `handle` is none, register it as an uncancellable request.
268 fn register_request(&self, id: RequestId, handle: Option<CancelHandle>) {
269 let mut inner = self.inner.lock().expect("lock poisoned");
270 inner.inflight.insert(id, handle);
271 }
272
273 /// Try to cancel the request `id`.
274 ///
275 /// Return an error when `id` cannot be found, or cannot be cancelled.
276 /// (These cases are indistinguishable.)
277 fn cancel_request(&self, id: &RequestId) -> Result<(), CancelError> {
278 let mut inner = self.inner.lock().expect("lock poisoned");
279 match inner.inflight.remove(id) {
280 Some(Some(handle)) => {
281 drop(inner);
282 handle.cancel()?;
283 Ok(())
284 }
285 Some(None) => {
286 // Put it back in case somebody tries again.
287 inner.inflight.insert(id.clone(), None);
288 Err(CancelError::CannotCancelRequest)
289 }
290 None => Err(CancelError::RequestNotFound),
291 }
292 }
293
294 /// Run in a loop, decoding JSON requests from `input` and
295 /// writing JSON responses onto `output`.
296 pub async fn run<IN, OUT>(
297 self: Arc<Self>,
298 input: IN,
299 mut output: OUT,
300 ) -> Result<(), ConnectionError>
301 where
302 IN: futures::AsyncRead + Send + Sync + Unpin + 'static,
303 OUT: futures::AsyncWrite + Send + Sync + Unpin + 'static,
304 {
305 /// Banner line to send, indicating that Arti is ready to receive requests.
306 ///
307 /// The key in this json object is mandatory; the value can be anything.
308 const BANNER: &[u8] = b"{\"arti_rpc\":{}}\n";
309
310 output
311 .write_all(BANNER)
312 .await
313 .map_err(|e| ConnectionError::WriteFailed(Arc::new(e)))?;
314
315 let write = Box::pin(asynchronous_codec::FramedWrite::new(
316 output,
317 crate::codecs::JsonLinesEncoder::<BoxedResponse>::default(),
318 ));
319
320 let read = Box::pin(
321 asynchronous_codec::FramedRead::new(
322 input,
323 asynchronous_codec::JsonCodec::<(), FlexibleRequest>::new(),
324 )
325 .fuse(),
326 );
327
328 self.run_loop(read, write).await
329 }
330
/// Run in a loop, handling requests from `request_stream` and writing
/// responses onto `response_sink`.
///
/// Each valid request is launched as its own future; the responses and
/// updates that those futures produce are forwarded to `response_sink`.
///
/// After this returns, even if it returns `Ok(())`, the connection must no longer be used.
///
/// # Errors
///
/// Returns a [`ConnectionError`] if writing a response fails,
/// if the request stream reports an error,
/// or if an invalid request arrives without a request ID.
/// Errors for which `is_connection_close()` is true
/// are reported as `Ok(())` instead.
pub(crate) async fn run_loop(
    self: Arc<Self>,
    mut request_stream: BoxedRequestStream,
    mut response_sink: BoxedResponseSink,
) -> Result<(), ConnectionError> {
    // This function will multiplex on three streams:
    // * `request_stream` -- a stream of incoming requests from the client.
    // * `finished_requests` -- a stream of requests that are done.
    // * `rx_response` -- a stream of updates and final responses sent from
    //   in-progress tasks.  (We put updates and final responses onto the
    //   same channel to ensure that they stay in-order for each method
    //   invocation.)
    //
    // Note that the blocking behavior here is deliberate: We want _all_ of
    // these reads to start blocking when response_sink.send is blocked.

    // TODO RPC should this queue participate in memquota?
    let (tx_response, mut rx_response) =
        mpsc_channel_no_memquota::<BoxedResponse>(UPDATE_CHAN_SIZE);

    // Seed the set with a future that never completes, so that
    // `finished_requests.next()` always has something to wait on.
    // (See the `expect` on its result below.)
    let mut finished_requests = FuturesUnordered::new();
    finished_requests.push(futures::future::pending().boxed());

    /// Helper: enforce an explicit "continue".
    struct Continue;

    // We create a separate async block here and immediately await it,
    // so that any internal `returns` and `?`s do not escape the function.
    let outcome = async {
        loop {
            let _: Continue = futures::select! {
                r = finished_requests.next() => {
                    // A task is done, so we can forget about it.
                    let () = r.expect("Somehow, future::pending() terminated.");
                    Continue
                }

                r = rx_response.next() => {
                    // The future for some request has sent a response (success,
                    // failure, or update), so we can inform the client.
                    let update = r.expect("Somehow, tx_update got closed.");
                    // Calling `await` here (and below) is deliberate: we _want_
                    // to stop reading the client's requests if the client is
                    // not reading their responses (or is not reading them fast
                    // enough).
                    response_sink.send(update).await.map_err(ConnectionError::writing)?;
                    Continue
                }

                req = request_stream.next() => {
                    match req {
                        None => {
                            // We've reached the end of the stream of requests;
                            // time to close.
                            return Ok(());
                        }
                        Some(Err(e)) => {
                            // We got a non-recoverable error from the JSON codec.
                            return Err(ConnectionError::from_read_error(e));

                        }
                        Some(Ok(FlexibleRequest::Invalid(bad_req))) => {
                            // We decoded the request as Json, but not as a `Valid` request.
                            // Send back a response indicating what was wrong with it.
                            let response = BoxedResponse::from_error(
                                bad_req.id().cloned(), bad_req.error()
                            );
                            response_sink
                                .send(response)
                                .await
                                .map_err( ConnectionError::writing)?;
                            if bad_req.id().is_none() {
                                // The spec says we must close the connection in this case.
                                return Err(bad_req.error().into());
                            }
                            Continue

                        }
                        Some(Ok(FlexibleRequest::Valid(req))) => {
                            // We have a request. Time to launch it!
                            let tx = tx_response.clone();
                            let fut = self.run_method_and_deliver_response(tx, req);
                            finished_requests.push(fut.boxed());
                            Continue
                        }
                    }
                }
            };
        }
    }
    .await;

    // An error that is (or might be) due to the peer closing the connection
    // is not treated as a failure.
    match outcome {
        Err(e) if e.is_connection_close() => Ok(()),
        other => other,
    }
}
432
/// Invoke `request` and send all of its responses to `tx_response`.
///
/// The request is registered in the in-flight table while it runs,
/// and is unregistered after its final response has been sent.
///
/// Updates are forwarded to `tx_response` only if the request's metadata
/// asks for them; otherwise they are discarded.
/// Exactly one final response (success, error, or cancellation)
/// is sent for the request.
async fn run_method_and_deliver_response(
    self: &Arc<Self>,
    mut tx_response: mpsc::Sender<BoxedResponse>,
    request: Request,
) {
    let Request {
        id,
        obj,
        meta,
        method,
    } = request;

    let update_sender: BoxedUpdateSink = if meta.updates {
        // The client wants updates: wrap each one in a response
        // that carries this request's ID.
        let id_clone = id.clone();
        let sink =
            tx_response
                .clone()
                .with_fn(move |obj: Box<dyn erased_serde::Serialize + Send>| {
                    Result::<BoxedResponse, _>::Ok(BoxedResponse {
                        id: Some(id_clone.clone()),
                        body: ResponseBody::Update(obj),
                    })
                });
        Box::pin(sink)
    } else {
        // The client did not ask for updates: discard any that are produced.
        let sink = futures::sink::drain().sink_err_into();
        Box::pin(sink)
    };

    let is_cancellable = method.is_cancellable();

    // Create the `run_method_lowlevel` future.  (It is wrapped for
    // cancellation below, if the method allows that.)
    let fut = self.run_method_lowlevel(update_sender, obj, method, meta);

    // Optionally register the future as cancellable.  Then run it to completion.
    //
    // Note that the request is registered _before_ we start awaiting it.
    let outcome = if is_cancellable {
        let (handle, fut) = Cancel::new(fut);
        self.register_request(id.clone(), Some(handle));
        fut.await
    } else {
        self.register_request(id.clone(), None);
        Ok(fut.await)
    };

    // Figure out how to respond.
    let body = match outcome {
        Ok(Ok(value)) => ResponseBody::Success(value),
        // TODO: If we're going to box this, let's do so earlier.
        Ok(Err(err)) => {
            if err.is_internal() {
                tracing::warn!(
                    "Reporting an internal error on an RPC connection: {:?}",
                    err
                );
            }
            ResponseBody::Error(Box::new(err))
        }
        Err(_cancelled) => ResponseBody::Error(Box::new(rpc::RpcError::from(RequestCancelled))),
    };

    // Send the response.
    //
    // (It's okay to ignore the error here, since it can only mean that the
    // RPC connection has closed.)
    let _ignore_err = tx_response
        .send(BoxedResponse {
            id: Some(id.clone()),
            body,
        })
        .await;

    // Unregister the request.
    //
    // TODO: This may unregister a different request if the user sent
    // in another request with the same ID.
    self.remove_request(&id);
}
511
512 /// Run a single method, and return its final response.
513 ///
514 /// If `tx_updates` is provided, and this method generates updates, it
515 /// should send those updates on `tx_updates`
516 ///
517 /// Note that this function is able to send responses with IDs that do not
518 /// match the original. It should enforce correct IDs on whatever response
519 /// it generates.
520 async fn run_method_lowlevel(
521 self: &Arc<Self>,
522 tx_updates: rpc::dispatch::BoxedUpdateSink,
523 obj_id: rpc::ObjectId,
524 method: Box<dyn rpc::DeserMethod>,
525 meta: ReqMeta,
526 ) -> Result<Box<dyn erased_serde::Serialize + Send + 'static>, rpc::RpcError> {
527 if !meta.require.is_empty() {
528 // TODO RPC: Eventually, we will need a way to tell which "features" are actually
529 // available. But for now, we have no features, so if the require list is nonempty,
530 // we can safely reject the request.
531 return Err(MissingFeaturesError(meta.require).into());
532 }
533
534 let context: Arc<dyn rpc::Context> = self.clone() as Arc<_>;
535
536 let invoke_future =
537 rpc::invoke_rpc_method(context, &obj_id, method.upcast_box(), tx_updates)?;
538
539 // Note that we drop the read lock before we await this future!
540 invoke_future.await
541 }
542
543 /// Helper: Implementation for register_weak and register_strong.
544 fn register_impl(
545 &self,
546 insert_into: impl FnOnce(&mut ObjMap) -> GenIdx,
547 use_global_id: bool,
548 ) -> rpc::ObjectId {
549 let local_id = insert_into(&mut self.inner.lock().expect("Lock poisoned").objects);
550
551 // Design note: It is a deliberate decision to _always_ use GlobalId for
552 // objects whose IDs are _ever_ exported for use in SOCKS requests. Some
553 // alternatives would be to use GlobalId conditionally, or to have a
554 // separate Method to create a new GlobalId given an existing LocalId.
555 if use_global_id {
556 GlobalId::new(self.connection_id, local_id).encode(&self.global_id_mac_key)
557 } else {
558 local_id.encode()
559 }
560 }
561}
562
/// An error returned when an RPC request lists some feature as required,
/// but we don't have every such feature.
///
/// When converted into an [`RpcError`], the feature list is attached
/// under the `rpc:unsupported_features` key.
#[derive(Clone, Debug, thiserror::Error)]
#[error("Required features not available")]
struct MissingFeaturesError(
    /// A list of the features that were requested but not available.
    Vec<String>,
);
571
572impl From<MissingFeaturesError> for RpcError {
573 fn from(err: MissingFeaturesError) -> Self {
574 let mut e = RpcError::new(
575 err.to_string(),
576 tor_rpcbase::RpcErrorKind::FeatureNotPresent,
577 );
578 e.set_datum("rpc:unsupported_features".to_string(), err.0)
579 .expect("invalid keyword");
580 e
581 }
582}
583
/// A failure that results in closing a [`Connection`].
///
/// The I/O and JSON errors are wrapped in `Arc` so that this type can be `Clone`.
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ConnectionError {
    /// Unable to write to our connection.
    #[error("Could not write to connection")]
    WriteFailed(#[source] Arc<IoError>),
    /// Read error from connection.
    #[error("Problem reading from connection")]
    ReadFailed(#[source] Arc<IoError>),
    /// Read something that we could not decode.
    #[error("Unable to decode request from connection")]
    DecodeFailed(#[source] Arc<serde_json::Error>),
    /// Unable to write our response as json.
    #[error("Unable to encode response onto connection")]
    EncodeFailed(#[source] Arc<serde_json::Error>),
    /// We encountered a problem when parsing a request that was (in our judgment)
    /// too severe to recover from.
    #[error("Unrecoverable problem from parsed request")]
    RequestParseFailed(#[from] RequestParseError),
}
605
606impl ConnectionError {
607 /// Construct a new `ConnectionError` from a `JsonCodecError` that has occurred while writing.
608 fn writing(error: JsonCodecError) -> Self {
609 match error {
610 JsonCodecError::Io(e) => Self::WriteFailed(Arc::new(e)),
611 JsonCodecError::Json(e) => Self::EncodeFailed(Arc::new(e)),
612 }
613 }
614
615 /// Return true if this error is (or might be) due to the peer closing the connection.
616 ///
617 /// Such errors should be tolerated without much complaint;
618 /// other errors should at least be logged somewhere.
619 fn is_connection_close(&self) -> bool {
620 use JsonErrorCategory as JK;
621 use std::io::ErrorKind as IK;
622 #[allow(clippy::match_like_matches_macro)]
623 match self {
624 Self::ReadFailed(e) | Self::WriteFailed(e) => match e.kind() {
625 IK::UnexpectedEof | IK::ConnectionAborted | IK::BrokenPipe => true,
626 _ => false,
627 },
628 Self::DecodeFailed(e) => match e.classify() {
629 JK::Eof => true,
630 _ => false,
631 },
632 _ => false,
633 }
634 }
635
636 /// Construct a `ConnectionError` from a JsonCodecError that occurred while reading.
637 fn from_read_error(error: JsonCodecError) -> Self {
638 match error {
639 JsonCodecError::Io(e) => Self::ReadFailed(Arc::new(e)),
640 JsonCodecError::Json(e) => Self::DecodeFailed(Arc::new(e)),
641 }
642 }
643}
644
645impl rpc::Context for Connection {
646 fn lookup_object(&self, id: &rpc::ObjectId) -> Result<Arc<dyn rpc::Object>, rpc::LookupError> {
647 Connection::lookup_object(self, id)
648 }
649
650 fn register_owned(&self, object: Arc<dyn rpc::Object>) -> rpc::ObjectId {
651 let expose = object.expose_outside_of_session();
652 self.register_impl(|map| map.insert_strong(object), expose)
653 }
654
655 fn register_weak(&self, object: &Arc<dyn tor_rpcbase::Object>) -> tor_rpcbase::ObjectId {
656 let expose = object.expose_outside_of_session();
657 self.register_impl(|map| map.insert_weak(object), expose)
658 }
659
660 fn release(&self, id: &rpc::ObjectId) -> Result<(), rpc::LookupError> {
661 let removed_some = if id.as_ref() == Self::CONNECTION_OBJ_ID {
662 self.inner
663 .lock()
664 .expect("Lock poisoned")
665 .this_connection
666 .take()
667 .is_some()
668 } else {
669 let idx = self.id_into_local_idx(id)?;
670
671 self.inner
672 .lock()
673 .expect("Lock poisoned")
674 .objects
675 .remove(idx)
676 };
677
678 if removed_some {
679 Ok(())
680 } else {
681 Err(rpc::LookupError::NoObject(id.clone()))
682 }
683 }
684
685 fn dispatch_table(&self) -> &Arc<std::sync::RwLock<rpc::DispatchTable>> {
686 &self.dispatch_table
687 }
688}
689
/// An error given when an RPC request is cancelled.
///
/// This is a separate type from [`crate::cancel::Cancelled`] since eventually
/// we want to move that type into a general-purpose location, and make it not
/// RPC-specific.
///
/// It converts into an [`RpcError`] of kind `RequestCancelled`.
#[derive(thiserror::Error, Clone, Debug, serde::Serialize)]
#[error("RPC request was cancelled")]
pub(crate) struct RequestCancelled;
698
699impl From<RequestCancelled> for RpcError {
700 fn from(_: RequestCancelled) -> Self {
701 RpcError::new(
702 "Request cancelled".into(),
703 rpc::RpcErrorKind::RequestCancelled,
704 )
705 }
706}
707
/// An error given when we attempt to cancel an RPC request, but cannot.
///
/// When converted into an [`RpcError`], the first two variants are reported
/// as request errors, and `AlreadyCancelled` as an internal error.
#[derive(thiserror::Error, Clone, Debug, serde::Serialize)]
pub(crate) enum CancelError {
    /// We didn't find any request with the provided ID.
    ///
    /// Since we don't keep track of requests after they finish or are cancelled,
    /// we cannot distinguish the cases where a request has finished,
    /// where the request has been cancelled,
    /// or where the request never existed.
    /// Therefore we collapse them into a single error type.
    #[error("RPC request not found")]
    RequestNotFound,

    /// This kind of request cannot be cancelled.
    #[error("Uncancellable request")]
    CannotCancelRequest,

    /// We tried to cancel a request but found out it was already cancelled.
    ///
    /// This error should be impossible.
    #[error("Request somehow cancelled twice!")]
    AlreadyCancelled,
}
732
733impl From<cancel::CannotCancel> for CancelError {
734 fn from(value: cancel::CannotCancel) -> Self {
735 use CancelError as CE;
736 use cancel::CannotCancel as CC;
737 match value {
738 CC::Cancelled => CE::AlreadyCancelled,
739 // We map "finished" to RequestNotFound since it is not in the general case
740 // distinguishable from it; see documentation on RequestNotFound.
741 CC::Finished => CE::RequestNotFound,
742 }
743 }
744}
745
746impl From<CancelError> for RpcError {
747 fn from(err: CancelError) -> Self {
748 use CancelError as CE;
749 use rpc::RpcErrorKind as REK;
750 let code = match err {
751 CE::RequestNotFound => REK::RequestError,
752 CE::CannotCancelRequest => REK::RequestError,
753 CE::AlreadyCancelled => REK::InternalError,
754 };
755 RpcError::new(err.to_string(), code)
756 }
757}