1use std::{
2 sync::Arc,
3 task::{Context, Poll},
4};
56use futures::channel::oneshot;
7use rayon::ThreadPool;
8use tower::Service;
910use cuprate_database::{ConcreteEnv, DbResult, RuntimeError};
11use cuprate_helper::asynch::InfallibleOneshotReceiver;
1213/// The [`rayon::ThreadPool`] service.
14///
15/// Uses an inner request handler and a rayon thread-pool to asynchronously handle requests.
16///
17/// - `Req` is the request type
18/// - `Res` is the response type
19pub struct DatabaseReadService<Req, Res> {
20/// Handle to the custom `rayon` DB reader thread-pool.
21 ///
22 /// Requests are [`rayon::ThreadPool::spawn`]ed in this thread-pool,
23 /// and responses are returned via a channel we (the caller) provide.
24pool: Arc<ThreadPool>,
2526/// The function used to handle request.
27inner_handler: Arc<dyn Fn(Req) -> DbResult<Res> + Send + Sync + 'static>,
28}
2930// Deriving [`Clone`] means `Req` & `Res` need to be `Clone`, even if they aren't.
31impl<Req, Res> Clone for DatabaseReadService<Req, Res> {
32fn clone(&self) -> Self {
33Self {
34 pool: Arc::clone(&self.pool),
35 inner_handler: Arc::clone(&self.inner_handler),
36 }
37 }
38}
3940impl<Req, Res> DatabaseReadService<Req, Res>
41where
42Req: Send + 'static,
43 Res: Send + 'static,
44{
45/// Creates the [`DatabaseReadService`] with the provided backing thread-pool.
46 ///
47 /// Should be called _once_ per actual database, although nothing bad will happen, cloning the [`DatabaseReadService`]
48 /// is the correct way to get multiple handles to the database.
49#[cold]
50 #[inline(never)] // Only called once.
51pub fn new(
52 env: Arc<ConcreteEnv>,
53 pool: Arc<ThreadPool>,
54 req_handler: impl Fn(&ConcreteEnv, Req) -> DbResult<Res> + Send + Sync + 'static,
55 ) -> Self {
56let inner_handler = Arc::new(move |req| req_handler(&env, req));
5758Self {
59 pool,
60 inner_handler,
61 }
62 }
63}
6465impl<Req, Res> Service<Req> for DatabaseReadService<Req, Res>
66where
67Req: Send + 'static,
68 Res: Send + 'static,
69{
70type Response = Res;
71type Error = RuntimeError;
72type Future = InfallibleOneshotReceiver<DbResult<Self::Response>>;
7374fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<DbResult<()>> {
75 Poll::Ready(Ok(()))
76 }
7778fn call(&mut self, req: Req) -> Self::Future {
79// Response channel we `.await` on.
80let (response_sender, receiver) = oneshot::channel();
8182let handler = Arc::clone(&self.inner_handler);
8384// Spawn the request in the rayon DB thread-pool.
85 //
86 // Note that this uses `self.pool` instead of `rayon::spawn`
87 // such that any `rayon` parallel code that runs within
88 // the passed closure uses the same `rayon` threadpool.
89self.pool.spawn(move || {
90 drop(response_sender.send(handler(req)));
91 });
9293 InfallibleOneshotReceiver::from(receiver)
94 }
95}