cuprate_database_service/service/
read.rs1use std::{
2 sync::Arc,
3 task::{Context, Poll},
4};
5
6use futures::channel::oneshot;
7use rayon::ThreadPool;
8use tower::Service;
9
10use cuprate_database::{ConcreteEnv, DbResult, RuntimeError};
11use cuprate_helper::asynch::InfallibleOneshotReceiver;
12
13pub struct DatabaseReadService<Req, Res> {
20 pool: Arc<ThreadPool>,
25
26 inner_handler: Arc<dyn Fn(Req) -> DbResult<Res> + Send + Sync + 'static>,
28}
29
30impl<Req, Res> Clone for DatabaseReadService<Req, Res> {
32 fn clone(&self) -> Self {
33 Self {
34 pool: Arc::clone(&self.pool),
35 inner_handler: Arc::clone(&self.inner_handler),
36 }
37 }
38}
39
40impl<Req, Res> DatabaseReadService<Req, Res>
41where
42 Req: Send + 'static,
43 Res: Send + 'static,
44{
45 #[cold]
50 #[inline(never)] pub fn new(
52 env: Arc<ConcreteEnv>,
53 pool: Arc<ThreadPool>,
54 req_handler: impl Fn(&ConcreteEnv, Req) -> DbResult<Res> + Send + Sync + 'static,
55 ) -> Self {
56 let inner_handler = Arc::new(move |req| req_handler(&env, req));
57
58 Self {
59 pool,
60 inner_handler,
61 }
62 }
63}
64
65impl<Req, Res> Service<Req> for DatabaseReadService<Req, Res>
66where
67 Req: Send + 'static,
68 Res: Send + 'static,
69{
70 type Response = Res;
71 type Error = RuntimeError;
72 type Future = InfallibleOneshotReceiver<DbResult<Self::Response>>;
73
74 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<DbResult<()>> {
75 Poll::Ready(Ok(()))
76 }
77
78 fn call(&mut self, req: Req) -> Self::Future {
79 let (response_sender, receiver) = oneshot::channel();
81
82 let handler = Arc::clone(&self.inner_handler);
83
84 self.pool.spawn(move || {
90 drop(response_sender.send(handler(req)));
91 });
92
93 InfallibleOneshotReceiver::from(receiver)
94 }
95}