cuprate_database_service/service/write.rs
1use std::{
2 fmt::Debug,
3 sync::Arc,
4 task::{Context, Poll},
5};
6
7use futures::channel::oneshot;
8
9use cuprate_database::{ConcreteEnv, DbResult, Env, RuntimeError};
10use cuprate_helper::asynch::InfallibleOneshotReceiver;
11
12//---------------------------------------------------------------------------------------------------- Constants
/// Name of the writer thread.
///
/// Built at compile time from this module's path followed by
/// `::DatabaseWriter`; it is the name given to the thread spawned
/// in `DatabaseWriteHandle::init`.
const WRITER_THREAD_NAME: &str = concat!(module_path!(), "::DatabaseWriter");
15
16//---------------------------------------------------------------------------------------------------- DatabaseWriteHandle
/// Write handle to the database.
///
/// This is a handle that allows `async`hronously writing to the database.
///
/// Calling [`tower::Service::call`] with a [`DatabaseWriteHandle`]
/// will return an `async`hronous channel that can be `.await`ed upon
/// to receive the corresponding response.
///
/// The handle only wraps a channel sender; cloning it clones that
/// sender, so every clone submits requests to the same channel.
#[derive(Debug)]
pub struct DatabaseWriteHandle<Req, Res> {
    /// Sender half of the request channel read by the database writer thread.
    ///
    /// Each message pairs a request with the `oneshot` sender on which
    /// the writer thread sends back the `DbResult<Res>` for that request,
    /// i.e. we provide the response channel for the writer.
    pub(super) sender: crossbeam::channel::Sender<(Req, oneshot::Sender<DbResult<Res>>)>,
}
31
32impl<Req, Res> Clone for DatabaseWriteHandle<Req, Res> {
33 fn clone(&self) -> Self {
34 Self {
35 sender: self.sender.clone(),
36 }
37 }
38}
39
40impl<Req, Res> DatabaseWriteHandle<Req, Res>
41where
42 Req: Send + 'static,
43 Res: Debug + Send + 'static,
44{
45 /// Initialize the single `DatabaseWriter` thread.
46 #[cold]
47 #[inline(never)] // Only called once.
48 pub fn init(
49 env: Arc<ConcreteEnv>,
50 inner_handler: impl Fn(&ConcreteEnv, &Req) -> DbResult<Res> + Send + 'static,
51 ) -> Self {
52 // Initialize `Request/Response` channels.
53 let (sender, receiver) = crossbeam::channel::unbounded();
54
55 // Spawn the writer.
56 std::thread::Builder::new()
57 .name(WRITER_THREAD_NAME.into())
58 .spawn(move || database_writer(&env, &receiver, inner_handler))
59 .unwrap();
60
61 Self { sender }
62 }
63}
64
65impl<Req, Res> tower::Service<Req> for DatabaseWriteHandle<Req, Res> {
66 type Response = Res;
67 type Error = RuntimeError;
68 type Future = InfallibleOneshotReceiver<DbResult<Res>>;
69
70 #[inline]
71 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<DbResult<()>> {
72 Poll::Ready(Ok(()))
73 }
74
75 #[inline]
76 fn call(&mut self, request: Req) -> Self::Future {
77 // Response channel we `.await` on.
78 let (response_sender, receiver) = oneshot::channel();
79
80 // Send the write request.
81 self.sender.send((request, response_sender)).unwrap();
82
83 InfallibleOneshotReceiver::from(receiver)
84 }
85}
86
87//---------------------------------------------------------------------------------------------------- database_writer
88/// The main function of the writer thread.
89fn database_writer<Req, Res>(
90 env: &ConcreteEnv,
91 receiver: &crossbeam::channel::Receiver<(Req, oneshot::Sender<DbResult<Res>>)>,
92 inner_handler: impl Fn(&ConcreteEnv, &Req) -> DbResult<Res>,
93) where
94 Req: Send + 'static,
95 Res: Debug + Send + 'static,
96{
97 // 1. Hang on request channel
98 // 2. Map request to some database function
99 // 3. Execute that function, get the result
100 // 4. Return the result via channel
101 'main: loop {
102 let Ok((request, response_sender)) = receiver.recv() else {
103 // If this receive errors, it means that the channel is empty
104 // and disconnected, meaning the other side (all senders) have
105 // been dropped. This means "shutdown", and we return here to
106 // exit the thread.
107 //
108 // Since the channel is empty, it means we've also processed
109 // all requests. Since it is disconnected, it means future
110 // ones cannot come in.
111 return;
112 };
113
114 /// How many times should we retry handling the request on resize errors?
115 ///
116 /// This is 1 on automatically resizing databases, meaning there is only 1 iteration.
117 const REQUEST_RETRY_LIMIT: usize = if ConcreteEnv::MANUAL_RESIZE { 3 } else { 1 };
118
119 // Map [`Request`]'s to specific database functions.
120 //
121 // Both will:
122 // 1. Map the request to a function
123 // 2. Call the function
124 // 3. (manual resize only) If resize is needed, resize and retry
125 // 4. (manual resize only) Redo step {1, 2}
126 // 5. Send the function's `Result` back to the requester
127 //
128 // FIXME: there's probably a more elegant way
129 // to represent this retry logic with recursive
130 // functions instead of a loop.
131 'retry: for retry in 0..REQUEST_RETRY_LIMIT {
132 // FIXME: will there be more than 1 write request?
133 // this won't have to be an enum.
134 let response = inner_handler(env, &request);
135
136 // If the database needs to resize, do so.
137 if ConcreteEnv::MANUAL_RESIZE && matches!(response, Err(RuntimeError::ResizeNeeded)) {
138 // If this is the last iteration of the outer `for` loop and we
139 // encounter a resize error _again_, it means something is wrong.
140 assert_ne!(
141 retry, REQUEST_RETRY_LIMIT,
142 "database resize failed maximum of {REQUEST_RETRY_LIMIT} times"
143 );
144
145 // Resize the map, and retry the request handling loop.
146 //
147 // FIXME:
148 // We could pass in custom resizes to account for
149 // batches, i.e., we're about to add ~5GB of data,
150 // add that much instead of the default 1GB.
151 // <https://github.com/monero-project/monero/blob/059028a30a8ae9752338a7897329fe8012a310d5/src/blockchain_db/lmdb/db_lmdb.cpp#L665-L695>
152 let old = env.current_map_size();
153 let new = env.resize_map(None);
154
155 // TODO: use tracing.
156 println!("resizing database memory map, old: {old}B, new: {new}B");
157
158 // Try handling the request again.
159 continue 'retry;
160 }
161
162 // Automatically resizing databases should not be returning a resize error.
163 #[cfg(debug_assertions)]
164 if !ConcreteEnv::MANUAL_RESIZE {
165 assert!(
166 !matches!(response, Err(RuntimeError::ResizeNeeded)),
167 "auto-resizing database returned a ResizeNeeded error"
168 );
169 }
170
171 // Send the response back, whether if it's an `Ok` or `Err`.
172 if let Err(e) = response_sender.send(response) {
173 // TODO: use tracing.
174 println!("database writer failed to send response: {e:?}");
175 }
176
177 continue 'main;
178 }
179
180 // Above retry loop should either:
181 // - continue to the next ['main] loop or...
182 // - ...retry until panic
183 unreachable!();
184 }
185}