cuprate_database_service/service/write.rs
1use std::{
2 fmt::Debug,
3 sync::Arc,
4 task::{Context, Poll},
5};
6
7use futures::channel::oneshot;
8use tracing::{info, warn};
9
10use cuprate_database::{ConcreteEnv, DbResult, Env, RuntimeError};
11use cuprate_helper::asynch::InfallibleOneshotReceiver;
12
13//---------------------------------------------------------------------------------------------------- Constants
14/// Name of the writer thread.
15const WRITER_THREAD_NAME: &str = concat!(module_path!(), "::DatabaseWriter");
16
17//---------------------------------------------------------------------------------------------------- DatabaseWriteHandle
18/// Write handle to the database.
19///
20/// This is handle that allows `async`hronously writing to the database.
21///
22/// Calling [`tower::Service::call`] with a [`DatabaseWriteHandle`]
23/// will return an `async`hronous channel that can be `.await`ed upon
24/// to receive the corresponding response.
25#[derive(Debug)]
26pub struct DatabaseWriteHandle<Req, Res> {
27 /// Sender channel to the database write thread-pool.
28 ///
29 /// We provide the response channel for the thread-pool.
30 pub(super) sender: crossbeam::channel::Sender<(Req, oneshot::Sender<DbResult<Res>>)>,
31}
32
33impl<Req, Res> Clone for DatabaseWriteHandle<Req, Res> {
34 fn clone(&self) -> Self {
35 Self {
36 sender: self.sender.clone(),
37 }
38 }
39}
40
41impl<Req, Res> DatabaseWriteHandle<Req, Res>
42where
43 Req: Send + 'static,
44 Res: Debug + Send + 'static,
45{
46 /// Initialize the single `DatabaseWriter` thread.
47 #[cold]
48 #[inline(never)] // Only called once.
49 pub fn init(
50 env: Arc<ConcreteEnv>,
51 inner_handler: impl Fn(&ConcreteEnv, &Req) -> DbResult<Res> + Send + 'static,
52 ) -> Self {
53 // Initialize `Request/Response` channels.
54 let (sender, receiver) = crossbeam::channel::unbounded();
55
56 // Spawn the writer.
57 std::thread::Builder::new()
58 .name(WRITER_THREAD_NAME.into())
59 .spawn(move || database_writer(&env, &receiver, inner_handler))
60 .unwrap();
61
62 Self { sender }
63 }
64}
65
66impl<Req, Res> tower::Service<Req> for DatabaseWriteHandle<Req, Res> {
67 type Response = Res;
68 type Error = RuntimeError;
69 type Future = InfallibleOneshotReceiver<DbResult<Res>>;
70
71 #[inline]
72 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<DbResult<()>> {
73 Poll::Ready(Ok(()))
74 }
75
76 #[inline]
77 fn call(&mut self, request: Req) -> Self::Future {
78 // Response channel we `.await` on.
79 let (response_sender, receiver) = oneshot::channel();
80
81 // Send the write request.
82 self.sender.send((request, response_sender)).unwrap();
83
84 InfallibleOneshotReceiver::from(receiver)
85 }
86}
87
88//---------------------------------------------------------------------------------------------------- database_writer
89/// The main function of the writer thread.
90fn database_writer<Req, Res>(
91 env: &ConcreteEnv,
92 receiver: &crossbeam::channel::Receiver<(Req, oneshot::Sender<DbResult<Res>>)>,
93 inner_handler: impl Fn(&ConcreteEnv, &Req) -> DbResult<Res>,
94) where
95 Req: Send + 'static,
96 Res: Debug + Send + 'static,
97{
98 // 1. Hang on request channel
99 // 2. Map request to some database function
100 // 3. Execute that function, get the result
101 // 4. Return the result via channel
102 'main: loop {
103 let Ok((request, response_sender)) = receiver.recv() else {
104 // If this receive errors, it means that the channel is empty
105 // and disconnected, meaning the other side (all senders) have
106 // been dropped. This means "shutdown", and we return here to
107 // exit the thread.
108 //
109 // Since the channel is empty, it means we've also processed
110 // all requests. Since it is disconnected, it means future
111 // ones cannot come in.
112 return;
113 };
114
115 /// How many times should we retry handling the request on resize errors?
116 ///
117 /// This is 1 on automatically resizing databases, meaning there is only 1 iteration.
118 const REQUEST_RETRY_LIMIT: usize = if ConcreteEnv::MANUAL_RESIZE { 3 } else { 1 };
119
120 // Map [`Request`]'s to specific database functions.
121 //
122 // Both will:
123 // 1. Map the request to a function
124 // 2. Call the function
125 // 3. (manual resize only) If resize is needed, resize and retry
126 // 4. (manual resize only) Redo step {1, 2}
127 // 5. Send the function's `Result` back to the requester
128 //
129 // FIXME: there's probably a more elegant way
130 // to represent this retry logic with recursive
131 // functions instead of a loop.
132 'retry: for retry in 0..REQUEST_RETRY_LIMIT {
133 // FIXME: will there be more than 1 write request?
134 // this won't have to be an enum.
135 let response = inner_handler(env, &request);
136
137 // If the database needs to resize, do so.
138 if ConcreteEnv::MANUAL_RESIZE && matches!(response, Err(RuntimeError::ResizeNeeded)) {
139 // If this is the last iteration of the outer `for` loop and we
140 // encounter a resize error _again_, it means something is wrong.
141 assert_ne!(
142 retry, REQUEST_RETRY_LIMIT,
143 "database resize failed maximum of {REQUEST_RETRY_LIMIT} times"
144 );
145
146 // Resize the map, and retry the request handling loop.
147 //
148 // FIXME:
149 // We could pass in custom resizes to account for
150 // batches, i.e., we're about to add ~5GB of data,
151 // add that much instead of the default 1GB.
152 // <https://github.com/monero-project/monero/blob/059028a30a8ae9752338a7897329fe8012a310d5/src/blockchain_db/lmdb/db_lmdb.cpp#L665-L695>
153 let old = env.current_map_size();
154 let new = env.resize_map(None).get();
155
156 const fn bytes_to_megabytes(bytes: usize) -> usize {
157 bytes / 1_000_000
158 }
159
160 let old_mb = bytes_to_megabytes(old);
161 let new_mb = bytes_to_megabytes(new);
162
163 info!("Resizing database memory map, old: {old_mb}MB, new: {new_mb}MB");
164
165 // Try handling the request again.
166 continue 'retry;
167 }
168
169 // Automatically resizing databases should not be returning a resize error.
170 #[cfg(debug_assertions)]
171 if !ConcreteEnv::MANUAL_RESIZE {
172 assert!(
173 !matches!(response, Err(RuntimeError::ResizeNeeded)),
174 "auto-resizing database returned a ResizeNeeded error"
175 );
176 }
177
178 // Send the response back, whether if it's an `Ok` or `Err`.
179 if let Err(e) = response_sender.send(response) {
180 warn!("Database writer failed to send response: {e:?}");
181 }
182
183 continue 'main;
184 }
185
186 // Above retry loop should either:
187 // - continue to the next ['main] loop or...
188 // - ...retry until panic
189 unreachable!();
190 }
191}