1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
use std::{
task::{Context, Poll},
use futures::channel::oneshot;
use cuprate_database::{ConcreteEnv, DbResult, Env, RuntimeError};
use cuprate_helper::asynch::InfallibleOneshotReceiver;
//---------------------------------------------------------------------------------------------------- Constants
/// Name of the writer thread.
const WRITER_THREAD_NAME: &str = concat!(module_path!(), "::DatabaseWriter");
//---------------------------------------------------------------------------------------------------- DatabaseWriteHandle
/// Write handle to the database.
/// This is handle that allows `async`hronously writing to the database.
/// Calling [`tower::Service::call`] with a [`DatabaseWriteHandle`]
/// will return an `async`hronous channel that can be `.await`ed upon
/// to receive the corresponding response.
pub struct DatabaseWriteHandle<Req, Res> {
/// Sender channel to the database write thread-pool.
/// We provide the response channel for the thread-pool.
pub(super) sender: crossbeam::channel::Sender<(Req, oneshot::Sender<DbResult<Res>>)>,
impl<Req, Res> Clone for DatabaseWriteHandle<Req, Res> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
impl<Req, Res> DatabaseWriteHandle<Req, Res>
Req: Send + 'static,
Res: Debug + Send + 'static,
/// Initialize the single `DatabaseWriter` thread.
#[inline(never)] // Only called once.
pub fn init(
env: Arc<ConcreteEnv>,
inner_handler: impl Fn(&ConcreteEnv, &Req) -> DbResult<Res> + Send + 'static,
) -> Self {
// Initialize `Request/Response` channels.
let (sender, receiver) = crossbeam::channel::unbounded();
// Spawn the writer.
.spawn(move || database_writer(&env, &receiver, inner_handler))
Self { sender }
impl<Req, Res> tower::Service<Req> for DatabaseWriteHandle<Req, Res> {
type Response = Res;
type Error = RuntimeError;
type Future = InfallibleOneshotReceiver<DbResult<Res>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<DbResult<()>> {
fn call(&mut self, request: Req) -> Self::Future {
// Response channel we `.await` on.
let (response_sender, receiver) = oneshot::channel();
// Send the write request.
self.sender.send((request, response_sender)).unwrap();
//---------------------------------------------------------------------------------------------------- database_writer
/// The main function of the writer thread.
fn database_writer<Req, Res>(
env: &ConcreteEnv,
receiver: &crossbeam::channel::Receiver<(Req, oneshot::Sender<DbResult<Res>>)>,
inner_handler: impl Fn(&ConcreteEnv, &Req) -> DbResult<Res>,
) where
Req: Send + 'static,
Res: Debug + Send + 'static,
// 1. Hang on request channel
// 2. Map request to some database function
// 3. Execute that function, get the result
// 4. Return the result via channel
'main: loop {
let Ok((request, response_sender)) = receiver.recv() else {
// If this receive errors, it means that the channel is empty
// and disconnected, meaning the other side (all senders) have
// been dropped. This means "shutdown", and we return here to
// exit the thread.
// Since the channel is empty, it means we've also processed
// all requests. Since it is disconnected, it means future
// ones cannot come in.
/// How many times should we retry handling the request on resize errors?
/// This is 1 on automatically resizing databases, meaning there is only 1 iteration.
const REQUEST_RETRY_LIMIT: usize = if ConcreteEnv::MANUAL_RESIZE { 3 } else { 1 };
// Map [`Request`]'s to specific database functions.
// Both will:
// 1. Map the request to a function
// 2. Call the function
// 3. (manual resize only) If resize is needed, resize and retry
// 4. (manual resize only) Redo step {1, 2}
// 5. Send the function's `Result` back to the requester
// FIXME: there's probably a more elegant way
// to represent this retry logic with recursive
// functions instead of a loop.
'retry: for retry in 0..REQUEST_RETRY_LIMIT {
// FIXME: will there be more than 1 write request?
// this won't have to be an enum.
let response = inner_handler(env, &request);
// If the database needs to resize, do so.
if ConcreteEnv::MANUAL_RESIZE && matches!(response, Err(RuntimeError::ResizeNeeded)) {
// If this is the last iteration of the outer `for` loop and we
// encounter a resize error _again_, it means something is wrong.
"database resize failed maximum of {REQUEST_RETRY_LIMIT} times"
// Resize the map, and retry the request handling loop.
// We could pass in custom resizes to account for
// batches, i.e., we're about to add ~5GB of data,
// add that much instead of the default 1GB.
// <>
let old = env.current_map_size();
let new = env.resize_map(None);
// TODO: use tracing.
println!("resizing database memory map, old: {old}B, new: {new}B");
// Try handling the request again.
continue 'retry;
// Automatically resizing databases should not be returning a resize error.
if !ConcreteEnv::MANUAL_RESIZE {
!matches!(response, Err(RuntimeError::ResizeNeeded)),
"auto-resizing database returned a ResizeNeeded error"
// Send the response back, whether if it's an `Ok` or `Err`.
if let Err(e) = response_sender.send(response) {
// TODO: use tracing.
println!("database writer failed to send response: {e:?}");
continue 'main;
// Above retry loop should either:
// - continue to the next ['main] loop or...
// - ...retry until panic