cuprate_blockchain/service/
write.rs

1//! Database writer thread definitions and logic.
2//---------------------------------------------------------------------------------------------------- Import
3use std::sync::Arc;
4
5use cuprate_database::{ConcreteEnv, DbResult, Env, EnvInner, TxRw};
6use cuprate_database_service::DatabaseWriteHandle;
7use cuprate_types::{
8    blockchain::{BlockchainResponse, BlockchainWriteRequest},
9    AltBlockInformation, ChainId, VerifiedBlockInformation,
10};
11
12use crate::{
13    service::types::{BlockchainWriteHandle, ResponseResult},
14    tables::OpenTables,
15};
16
/// Panic message used when aborting a write transaction fails.
///
/// The write handlers in this module call `TxRw::abort` on their open
/// write transaction when an operation fails. If that abort itself returns
/// an error, the handler panics (via `expect()`) with this message rather
/// than continuing, to maintain atomicity.
const TX_RW_ABORT_FAIL: &str =
    "Could not maintain blockchain database atomicity by aborting write transaction";
23
24//---------------------------------------------------------------------------------------------------- init_write_service
25/// Initialize the blockchain write service from a [`ConcreteEnv`].
26pub fn init_write_service(env: Arc<ConcreteEnv>) -> BlockchainWriteHandle {
27    DatabaseWriteHandle::init(env, handle_blockchain_request)
28}
29
//---------------------------------------------------------------------------------------------------- handle_blockchain_request
31/// Handle an incoming [`BlockchainWriteRequest`], returning a [`BlockchainResponse`].
32fn handle_blockchain_request(
33    env: &ConcreteEnv,
34    req: &BlockchainWriteRequest,
35) -> DbResult<BlockchainResponse> {
36    match req {
37        BlockchainWriteRequest::WriteBlock(block) => write_block(env, block),
38        BlockchainWriteRequest::BatchWriteBlocks(blocks) => write_blocks(env, blocks),
39        BlockchainWriteRequest::WriteAltBlock(alt_block) => write_alt_block(env, alt_block),
40        BlockchainWriteRequest::PopBlocks(numb_blocks) => pop_blocks(env, *numb_blocks),
41        BlockchainWriteRequest::FlushAltBlocks => flush_alt_blocks(env),
42    }
43}
44
45//---------------------------------------------------------------------------------------------------- Handler functions
// These are the actual functions that do stuff according to the incoming
// [`BlockchainWriteRequest`].
//
// Each function name maps (from CamelCase -> snake_case) to the enum variant
// name, e.g: `WriteAltBlock` -> `write_alt_block`. The one exception is
// `BatchWriteBlocks`, which is handled by `write_blocks`.
//
// Each function returns the [`BlockchainResponse`] (or error) that
// [`handle_blockchain_request()`] passes back to the caller.
53
54/// [`BlockchainWriteRequest::WriteBlock`].
55#[inline]
56fn write_block(env: &ConcreteEnv, block: &VerifiedBlockInformation) -> ResponseResult {
57    let env_inner = env.env_inner();
58    let tx_rw = env_inner.tx_rw()?;
59
60    let result = {
61        let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
62        crate::ops::block::add_block(block, &mut tables_mut)
63    };
64
65    match result {
66        Ok(()) => {
67            TxRw::commit(tx_rw)?;
68            Ok(BlockchainResponse::Ok)
69        }
70        Err(e) => {
71            TxRw::abort(tx_rw).expect(TX_RW_ABORT_FAIL);
72            Err(e)
73        }
74    }
75}
76
77/// [`BlockchainWriteRequest::BatchWriteBlocks`].
78#[inline]
79fn write_blocks(env: &ConcreteEnv, block: &Vec<VerifiedBlockInformation>) -> ResponseResult {
80    let env_inner = env.env_inner();
81    let tx_rw = env_inner.tx_rw()?;
82
83    let result = {
84        let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
85        for block in block {
86            crate::ops::block::add_block(block, &mut tables_mut)?;
87        }
88
89        Ok(())
90    };
91
92    match result {
93        Ok(()) => {
94            TxRw::commit(tx_rw)?;
95            Ok(BlockchainResponse::Ok)
96        }
97        Err(e) => {
98            TxRw::abort(tx_rw).expect(TX_RW_ABORT_FAIL);
99            Err(e)
100        }
101    }
102}
103
104/// [`BlockchainWriteRequest::WriteAltBlock`].
105#[inline]
106fn write_alt_block(env: &ConcreteEnv, block: &AltBlockInformation) -> ResponseResult {
107    let env_inner = env.env_inner();
108    let tx_rw = env_inner.tx_rw()?;
109
110    let result = {
111        let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
112        crate::ops::alt_block::add_alt_block(block, &mut tables_mut)
113    };
114
115    match result {
116        Ok(()) => {
117            TxRw::commit(tx_rw)?;
118            Ok(BlockchainResponse::Ok)
119        }
120        Err(e) => {
121            TxRw::abort(tx_rw).expect(TX_RW_ABORT_FAIL);
122            Err(e)
123        }
124    }
125}
126
127/// [`BlockchainWriteRequest::PopBlocks`].
128fn pop_blocks(env: &ConcreteEnv, numb_blocks: usize) -> ResponseResult {
129    let env_inner = env.env_inner();
130    let mut tx_rw = env_inner.tx_rw()?;
131
132    // FIXME: turn this function into a try block once stable.
133    let mut result = || {
134        // flush all the current alt blocks as they may reference blocks to be popped.
135        crate::ops::alt_block::flush_alt_blocks(&env_inner, &mut tx_rw)?;
136
137        let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
138        // generate a `ChainId` for the popped blocks.
139        let old_main_chain_id = ChainId(rand::random());
140
141        // pop the blocks
142        for _ in 0..numb_blocks {
143            crate::ops::block::pop_block(Some(old_main_chain_id), &mut tables_mut)?;
144        }
145
146        Ok(old_main_chain_id)
147    };
148
149    match result() {
150        Ok(old_main_chain_id) => {
151            TxRw::commit(tx_rw)?;
152            Ok(BlockchainResponse::PopBlocks(old_main_chain_id))
153        }
154        Err(e) => {
155            TxRw::abort(tx_rw).expect(TX_RW_ABORT_FAIL);
156            Err(e)
157        }
158    }
159}
160
161/// [`BlockchainWriteRequest::FlushAltBlocks`].
162#[inline]
163fn flush_alt_blocks(env: &ConcreteEnv) -> ResponseResult {
164    let env_inner = env.env_inner();
165    let mut tx_rw = env_inner.tx_rw()?;
166
167    let result = crate::ops::alt_block::flush_alt_blocks(&env_inner, &mut tx_rw);
168
169    match result {
170        Ok(()) => {
171            TxRw::commit(tx_rw)?;
172            Ok(BlockchainResponse::Ok)
173        }
174        Err(e) => {
175            TxRw::abort(tx_rw).expect(TX_RW_ABORT_FAIL);
176            Err(e)
177        }
178    }
179}