cuprate_database_service/service/
read.rs

1use std::{
2    sync::Arc,
3    task::{Context, Poll},
4};
5
6use futures::channel::oneshot;
7use rayon::ThreadPool;
8use tower::Service;
9
10use cuprate_database::{ConcreteEnv, DbResult, RuntimeError};
11use cuprate_helper::asynch::InfallibleOneshotReceiver;
12
13/// The [`rayon::ThreadPool`] service.
14///
15/// Uses an inner request handler and a rayon thread-pool to asynchronously handle requests.
16///
17/// - `Req` is the request type
18/// - `Res` is the response type
19pub struct DatabaseReadService<Req, Res> {
20    /// Handle to the custom `rayon` DB reader thread-pool.
21    ///
22    /// Requests are [`rayon::ThreadPool::spawn`]ed in this thread-pool,
23    /// and responses are returned via a channel we (the caller) provide.
24    pool: Arc<ThreadPool>,
25
26    /// The function used to handle request.
27    inner_handler: Arc<dyn Fn(Req) -> DbResult<Res> + Send + Sync + 'static>,
28}
29
30// Deriving [`Clone`] means `Req` & `Res` need to be `Clone`, even if they aren't.
31impl<Req, Res> Clone for DatabaseReadService<Req, Res> {
32    fn clone(&self) -> Self {
33        Self {
34            pool: Arc::clone(&self.pool),
35            inner_handler: Arc::clone(&self.inner_handler),
36        }
37    }
38}
39
40impl<Req, Res> DatabaseReadService<Req, Res>
41where
42    Req: Send + 'static,
43    Res: Send + 'static,
44{
45    /// Creates the [`DatabaseReadService`] with the provided backing thread-pool.
46    ///
47    /// Should be called _once_ per actual database, although nothing bad will happen, cloning the [`DatabaseReadService`]
48    /// is the correct way to get multiple handles to the database.
49    #[cold]
50    #[inline(never)] // Only called once.
51    pub fn new(
52        env: Arc<ConcreteEnv>,
53        pool: Arc<ThreadPool>,
54        req_handler: impl Fn(&ConcreteEnv, Req) -> DbResult<Res> + Send + Sync + 'static,
55    ) -> Self {
56        let inner_handler = Arc::new(move |req| req_handler(&env, req));
57
58        Self {
59            pool,
60            inner_handler,
61        }
62    }
63}
64
65impl<Req, Res> Service<Req> for DatabaseReadService<Req, Res>
66where
67    Req: Send + 'static,
68    Res: Send + 'static,
69{
70    type Response = Res;
71    type Error = RuntimeError;
72    type Future = InfallibleOneshotReceiver<DbResult<Self::Response>>;
73
74    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<DbResult<()>> {
75        Poll::Ready(Ok(()))
76    }
77
78    fn call(&mut self, req: Req) -> Self::Future {
79        // Response channel we `.await` on.
80        let (response_sender, receiver) = oneshot::channel();
81
82        let handler = Arc::clone(&self.inner_handler);
83
84        // Spawn the request in the rayon DB thread-pool.
85        //
86        // Note that this uses `self.pool` instead of `rayon::spawn`
87        // such that any `rayon` parallel code that runs within
88        // the passed closure uses the same `rayon` threadpool.
89        self.pool.spawn(move || {
90            drop(response_sender.send(handler(req)));
91        });
92
93        InfallibleOneshotReceiver::from(receiver)
94    }
95}