cuprate_database_service/service/
write.rs

1use std::{
2    fmt::Debug,
3    sync::Arc,
4    task::{Context, Poll},
5};
6
7use futures::channel::oneshot;
8
9use cuprate_database::{ConcreteEnv, DbResult, Env, RuntimeError};
10use cuprate_helper::asynch::InfallibleOneshotReceiver;
11
12//---------------------------------------------------------------------------------------------------- Constants
13/// Name of the writer thread.
14const WRITER_THREAD_NAME: &str = concat!(module_path!(), "::DatabaseWriter");
15
16//---------------------------------------------------------------------------------------------------- DatabaseWriteHandle
17/// Write handle to the database.
18///
19/// This is handle that allows `async`hronously writing to the database.
20///
21/// Calling [`tower::Service::call`] with a [`DatabaseWriteHandle`]
22/// will return an `async`hronous channel that can be `.await`ed upon
23/// to receive the corresponding response.
24#[derive(Debug)]
25pub struct DatabaseWriteHandle<Req, Res> {
26    /// Sender channel to the database write thread-pool.
27    ///
28    /// We provide the response channel for the thread-pool.
29    pub(super) sender: crossbeam::channel::Sender<(Req, oneshot::Sender<DbResult<Res>>)>,
30}
31
32impl<Req, Res> Clone for DatabaseWriteHandle<Req, Res> {
33    fn clone(&self) -> Self {
34        Self {
35            sender: self.sender.clone(),
36        }
37    }
38}
39
40impl<Req, Res> DatabaseWriteHandle<Req, Res>
41where
42    Req: Send + 'static,
43    Res: Debug + Send + 'static,
44{
45    /// Initialize the single `DatabaseWriter` thread.
46    #[cold]
47    #[inline(never)] // Only called once.
48    pub fn init(
49        env: Arc<ConcreteEnv>,
50        inner_handler: impl Fn(&ConcreteEnv, &Req) -> DbResult<Res> + Send + 'static,
51    ) -> Self {
52        // Initialize `Request/Response` channels.
53        let (sender, receiver) = crossbeam::channel::unbounded();
54
55        // Spawn the writer.
56        std::thread::Builder::new()
57            .name(WRITER_THREAD_NAME.into())
58            .spawn(move || database_writer(&env, &receiver, inner_handler))
59            .unwrap();
60
61        Self { sender }
62    }
63}
64
65impl<Req, Res> tower::Service<Req> for DatabaseWriteHandle<Req, Res> {
66    type Response = Res;
67    type Error = RuntimeError;
68    type Future = InfallibleOneshotReceiver<DbResult<Res>>;
69
70    #[inline]
71    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<DbResult<()>> {
72        Poll::Ready(Ok(()))
73    }
74
75    #[inline]
76    fn call(&mut self, request: Req) -> Self::Future {
77        // Response channel we `.await` on.
78        let (response_sender, receiver) = oneshot::channel();
79
80        // Send the write request.
81        self.sender.send((request, response_sender)).unwrap();
82
83        InfallibleOneshotReceiver::from(receiver)
84    }
85}
86
87//---------------------------------------------------------------------------------------------------- database_writer
88/// The main function of the writer thread.
89fn database_writer<Req, Res>(
90    env: &ConcreteEnv,
91    receiver: &crossbeam::channel::Receiver<(Req, oneshot::Sender<DbResult<Res>>)>,
92    inner_handler: impl Fn(&ConcreteEnv, &Req) -> DbResult<Res>,
93) where
94    Req: Send + 'static,
95    Res: Debug + Send + 'static,
96{
97    // 1. Hang on request channel
98    // 2. Map request to some database function
99    // 3. Execute that function, get the result
100    // 4. Return the result via channel
101    'main: loop {
102        let Ok((request, response_sender)) = receiver.recv() else {
103            // If this receive errors, it means that the channel is empty
104            // and disconnected, meaning the other side (all senders) have
105            // been dropped. This means "shutdown", and we return here to
106            // exit the thread.
107            //
108            // Since the channel is empty, it means we've also processed
109            // all requests. Since it is disconnected, it means future
110            // ones cannot come in.
111            return;
112        };
113
114        /// How many times should we retry handling the request on resize errors?
115        ///
116        /// This is 1 on automatically resizing databases, meaning there is only 1 iteration.
117        const REQUEST_RETRY_LIMIT: usize = if ConcreteEnv::MANUAL_RESIZE { 3 } else { 1 };
118
119        // Map [`Request`]'s to specific database functions.
120        //
121        // Both will:
122        // 1. Map the request to a function
123        // 2. Call the function
124        // 3. (manual resize only) If resize is needed, resize and retry
125        // 4. (manual resize only) Redo step {1, 2}
126        // 5. Send the function's `Result` back to the requester
127        //
128        // FIXME: there's probably a more elegant way
129        // to represent this retry logic with recursive
130        // functions instead of a loop.
131        'retry: for retry in 0..REQUEST_RETRY_LIMIT {
132            // FIXME: will there be more than 1 write request?
133            // this won't have to be an enum.
134            let response = inner_handler(env, &request);
135
136            // If the database needs to resize, do so.
137            if ConcreteEnv::MANUAL_RESIZE && matches!(response, Err(RuntimeError::ResizeNeeded)) {
138                // If this is the last iteration of the outer `for` loop and we
139                // encounter a resize error _again_, it means something is wrong.
140                assert_ne!(
141                    retry, REQUEST_RETRY_LIMIT,
142                    "database resize failed maximum of {REQUEST_RETRY_LIMIT} times"
143                );
144
145                // Resize the map, and retry the request handling loop.
146                //
147                // FIXME:
148                // We could pass in custom resizes to account for
149                // batches, i.e., we're about to add ~5GB of data,
150                // add that much instead of the default 1GB.
151                // <https://github.com/monero-project/monero/blob/059028a30a8ae9752338a7897329fe8012a310d5/src/blockchain_db/lmdb/db_lmdb.cpp#L665-L695>
152                let old = env.current_map_size();
153                let new = env.resize_map(None);
154
155                // TODO: use tracing.
156                println!("resizing database memory map, old: {old}B, new: {new}B");
157
158                // Try handling the request again.
159                continue 'retry;
160            }
161
162            // Automatically resizing databases should not be returning a resize error.
163            #[cfg(debug_assertions)]
164            if !ConcreteEnv::MANUAL_RESIZE {
165                assert!(
166                    !matches!(response, Err(RuntimeError::ResizeNeeded)),
167                    "auto-resizing database returned a ResizeNeeded error"
168                );
169            }
170
171            // Send the response back, whether if it's an `Ok` or `Err`.
172            if let Err(e) = response_sender.send(response) {
173                // TODO: use tracing.
174                println!("database writer failed to send response: {e:?}");
175            }
176
177            continue 'main;
178        }
179
180        // Above retry loop should either:
181        // - continue to the next ['main] loop or...
182        // - ...retry until panic
183        unreachable!();
184    }
185}