cuprate_database_service/service/write.rs

use std::{
    fmt::Debug,
    sync::Arc,
    task::{Context, Poll},
};

use futures::channel::oneshot;
use tracing::{info, warn};

use cuprate_database::{ConcreteEnv, DbResult, Env, RuntimeError};
use cuprate_helper::asynch::InfallibleOneshotReceiver;

//---------------------------------------------------------------------------------------------------- Constants
/// Name of the writer thread.
const WRITER_THREAD_NAME: &str = concat!(module_path!(), "::DatabaseWriter");

//---------------------------------------------------------------------------------------------------- DatabaseWriteHandle
/// Write handle to the database.
///
/// This is a handle that allows `async`hronously writing to the database.
///
/// Calling [`tower::Service::call`] with a [`DatabaseWriteHandle`]
/// will return an `async`hronous channel that can be `.await`ed upon
/// to receive the corresponding response.
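///
/// The sketch below shows the intended call pattern; it is illustrative
/// only (`MyReq`/`MyRes` are hypothetical placeholder types and the
/// snippet is not a compiled doctest):
///
/// ```rust,ignore
/// use tower::{Service, ServiceExt};
///
/// async fn write(
///     handle: &mut DatabaseWriteHandle<MyReq, MyRes>,
///     request: MyReq,
/// ) -> DbResult<MyRes> {
///     // `call` returns a oneshot receiver immediately; `.await`ing it
///     // yields the `DbResult` produced by the writer thread.
///     handle.ready().await?.call(request).await
/// }
/// ```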
#[derive(Debug)]
pub struct DatabaseWriteHandle<Req, Res> {
    /// Sender channel to the database writer thread.
    ///
    /// We provide the response channel for the writer thread.
    pub(super) sender: crossbeam::channel::Sender<(Req, oneshot::Sender<DbResult<Res>>)>,
}

impl<Req, Res> Clone for DatabaseWriteHandle<Req, Res> {
    fn clone(&self) -> Self {
        Self {
            sender: self.sender.clone(),
        }
    }
}

impl<Req, Res> DatabaseWriteHandle<Req, Res>
where
    Req: Send + 'static,
    Res: Debug + Send + 'static,
{
    /// Initialize the single `DatabaseWriter` thread.
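    ///
    /// A minimal usage sketch, assuming an already-opened environment and
    /// hypothetical `MyReq`/`MyRes` types (illustrative only, not a
    /// compiled doctest):
    ///
    /// ```rust,ignore
    /// let env: Arc<ConcreteEnv> = todo!("open the database environment");
    ///
    /// let handle = DatabaseWriteHandle::<MyReq, MyRes>::init(env, |env, request| {
    ///     // Runs on the writer thread for every incoming request;
    ///     // map `request` to the appropriate database write here.
    ///     todo!()
    /// });
    /// ```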
    #[cold]
    #[inline(never)] // Only called once.
    pub fn init(
        env: Arc<ConcreteEnv>,
        inner_handler: impl Fn(&ConcreteEnv, &Req) -> DbResult<Res> + Send + 'static,
    ) -> Self {
        // Initialize `Request/Response` channels.
        let (sender, receiver) = crossbeam::channel::unbounded();

        // Spawn the writer.
        std::thread::Builder::new()
            .name(WRITER_THREAD_NAME.into())
            .spawn(move || database_writer(&env, &receiver, inner_handler))
            .unwrap();

        Self { sender }
    }
}

impl<Req, Res> tower::Service<Req> for DatabaseWriteHandle<Req, Res> {
    type Response = Res;
    type Error = RuntimeError;
    type Future = InfallibleOneshotReceiver<DbResult<Res>>;

    #[inline]
    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<DbResult<()>> {
        Poll::Ready(Ok(()))
    }

    #[inline]
    fn call(&mut self, request: Req) -> Self::Future {
        // Response channel we `.await` on.
        let (response_sender, receiver) = oneshot::channel();

        // Send the write request.
        self.sender.send((request, response_sender)).unwrap();

        InfallibleOneshotReceiver::from(receiver)
    }
}

//---------------------------------------------------------------------------------------------------- database_writer
/// The main function of the writer thread.
fn database_writer<Req, Res>(
    env: &ConcreteEnv,
    receiver: &crossbeam::channel::Receiver<(Req, oneshot::Sender<DbResult<Res>>)>,
    inner_handler: impl Fn(&ConcreteEnv, &Req) -> DbResult<Res>,
) where
    Req: Send + 'static,
    Res: Debug + Send + 'static,
{
    // 1. Hang on request channel
    // 2. Map request to some database function
    // 3. Execute that function, get the result
    // 4. Return the result via channel
    'main: loop {
        let Ok((request, response_sender)) = receiver.recv() else {
            // If this receive errors, it means that the channel is empty
            // and disconnected, meaning the other side (all senders) have
            // been dropped. This means "shutdown", and we return here to
            // exit the thread.
            //
            // Since the channel is empty, it means we've also processed
            // all requests. Since it is disconnected, it means future
            // ones cannot come in.
            return;
        };

        /// How many times should we retry handling the request on resize errors?
        ///
        /// This is 1 on automatically resizing databases, meaning there is only 1 iteration.
        const REQUEST_RETRY_LIMIT: usize = if ConcreteEnv::MANUAL_RESIZE { 3 } else { 1 };

        // Map [`Request`]s to specific database functions.
        //
        // Both will:
        // 1. Map the request to a function
        // 2. Call the function
        // 3. (manual resize only) If resize is needed, resize and retry
        // 4. (manual resize only) Redo step {1, 2}
        // 5. Send the function's `Result` back to the requester
        //
        // FIXME: there's probably a more elegant way
        // to represent this retry logic with recursive
        // functions instead of a loop.
        'retry: for retry in 0..REQUEST_RETRY_LIMIT {
            // FIXME: will there be more than 1 write request?
            // If not, this won't have to be an enum.
            let response = inner_handler(env, &request);

            // If the database needs to resize, do so.
            if ConcreteEnv::MANUAL_RESIZE && matches!(response, Err(RuntimeError::ResizeNeeded)) {
                // If this is the last iteration of the outer `for` loop and we
                // encounter a resize error _again_, it means something is wrong.
                //
                // `retry` is 0-indexed, so the final iteration is `REQUEST_RETRY_LIMIT - 1`.
                assert_ne!(
                    retry, REQUEST_RETRY_LIMIT - 1,
                    "database resize failed maximum of {REQUEST_RETRY_LIMIT} times"
                );

                // Resize the map, and retry the request handling loop.
                //
                // FIXME:
                // We could pass in custom resizes to account for
                // batches, i.e., we're about to add ~5GB of data,
                // add that much instead of the default 1GB.
                // <https://github.com/monero-project/monero/blob/059028a30a8ae9752338a7897329fe8012a310d5/src/blockchain_db/lmdb/db_lmdb.cpp#L665-L695>
                let old = env.current_map_size();
                let new = env.resize_map(None).get();

                const fn bytes_to_megabytes(bytes: usize) -> usize {
                    bytes / 1_000_000
                }

                let old_mb = bytes_to_megabytes(old);
                let new_mb = bytes_to_megabytes(new);

                info!("Resizing database memory map, old: {old_mb}MB, new: {new_mb}MB");

                // Try handling the request again.
                continue 'retry;
            }

            // Automatically resizing databases should not be returning a resize error.
            #[cfg(debug_assertions)]
            if !ConcreteEnv::MANUAL_RESIZE {
                assert!(
                    !matches!(response, Err(RuntimeError::ResizeNeeded)),
                    "auto-resizing database returned a ResizeNeeded error"
                );
            }

            // Send the response back, whether it's an `Ok` or `Err`.
            if let Err(e) = response_sender.send(response) {
                warn!("Database writer failed to send response: {e:?}");
            }

            continue 'main;
        }

        // The above retry loop should either:
        // - continue to the next `'main` loop or...
        // - ...retry until panic
        unreachable!();
    }
}