cuprate_blockchain/service/
write.rs

1//! Database writer thread definitions and logic.
2//---------------------------------------------------------------------------------------------------- Import
3use std::sync::Arc;
4
5use cuprate_database::{ConcreteEnv, DatabaseRo, DbResult, Env, EnvInner, TxRw};
6use cuprate_database_service::DatabaseWriteHandle;
7use cuprate_types::{
8    blockchain::{BlockchainResponse, BlockchainWriteRequest},
9    AltBlockInformation, Chain, ChainId, VerifiedBlockInformation,
10};
11
12use crate::{
13    service::{
14        free::map_valid_alt_block_to_verified_block,
15        types::{BlockchainWriteHandle, ResponseResult},
16    },
17    tables::{OpenTables, Tables},
18    types::AltBlockHeight,
19};
20
21/// Write functions within this module abort if the write transaction
22/// could not be aborted successfully to maintain atomicity.
23///
24/// This is the panic message if the `abort()` fails.
25const TX_RW_ABORT_FAIL: &str =
26    "Could not maintain blockchain database atomicity by aborting write transaction";
27
28//---------------------------------------------------------------------------------------------------- init_write_service
29/// Initialize the blockchain write service from a [`ConcreteEnv`].
30pub fn init_write_service(env: Arc<ConcreteEnv>) -> BlockchainWriteHandle {
31    DatabaseWriteHandle::init(env, handle_blockchain_request)
32}
33
34//---------------------------------------------------------------------------------------------------- handle_bc_request
35/// Handle an incoming [`BlockchainWriteRequest`], returning a [`BlockchainResponse`].
36fn handle_blockchain_request(
37    env: &ConcreteEnv,
38    req: &BlockchainWriteRequest,
39) -> DbResult<BlockchainResponse> {
40    match req {
41        BlockchainWriteRequest::WriteBlock(block) => write_block(env, block),
42        BlockchainWriteRequest::BatchWriteBlocks(blocks) => write_blocks(env, blocks),
43        BlockchainWriteRequest::WriteAltBlock(alt_block) => write_alt_block(env, alt_block),
44        BlockchainWriteRequest::PopBlocks(numb_blocks) => pop_blocks(env, *numb_blocks),
45        BlockchainWriteRequest::ReverseReorg(old_main_chain_id) => {
46            reverse_reorg(env, *old_main_chain_id)
47        }
48        BlockchainWriteRequest::FlushAltBlocks => flush_alt_blocks(env),
49    }
50}
51
52//---------------------------------------------------------------------------------------------------- Handler functions
53// These are the actual functions that do stuff according to the incoming [`Request`].
54//
55// Each function name is a 1-1 mapping (from CamelCase -> snake_case) to
56// the enum variant name, e.g: `BlockExtendedHeader` -> `block_extended_header`.
57//
58// Each function will return the [`Response`] that we
59// should send back to the caller in [`map_request()`].
60
61/// [`BlockchainWriteRequest::WriteBlock`].
62#[inline]
63fn write_block(env: &ConcreteEnv, block: &VerifiedBlockInformation) -> ResponseResult {
64    let env_inner = env.env_inner();
65    let tx_rw = env_inner.tx_rw()?;
66
67    let result = {
68        let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
69        crate::ops::block::add_block(block, &mut tables_mut)
70    };
71
72    match result {
73        Ok(()) => {
74            TxRw::commit(tx_rw)?;
75            Ok(BlockchainResponse::Ok)
76        }
77        Err(e) => {
78            TxRw::abort(tx_rw).expect(TX_RW_ABORT_FAIL);
79            Err(e)
80        }
81    }
82}
83
84/// [`BlockchainWriteRequest::BatchWriteBlocks`].
85#[inline]
86fn write_blocks(env: &ConcreteEnv, block: &Vec<VerifiedBlockInformation>) -> ResponseResult {
87    let env_inner = env.env_inner();
88    let tx_rw = env_inner.tx_rw()?;
89
90    let result = {
91        let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
92        for block in block {
93            crate::ops::block::add_block(block, &mut tables_mut)?;
94        }
95
96        Ok(())
97    };
98
99    match result {
100        Ok(()) => {
101            TxRw::commit(tx_rw)?;
102            Ok(BlockchainResponse::Ok)
103        }
104        Err(e) => {
105            TxRw::abort(tx_rw).expect(TX_RW_ABORT_FAIL);
106            Err(e)
107        }
108    }
109}
110
111/// [`BlockchainWriteRequest::WriteAltBlock`].
112#[inline]
113fn write_alt_block(env: &ConcreteEnv, block: &AltBlockInformation) -> ResponseResult {
114    let env_inner = env.env_inner();
115    let tx_rw = env_inner.tx_rw()?;
116
117    let result = {
118        let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
119        crate::ops::alt_block::add_alt_block(block, &mut tables_mut)
120    };
121
122    match result {
123        Ok(()) => {
124            TxRw::commit(tx_rw)?;
125            Ok(BlockchainResponse::Ok)
126        }
127        Err(e) => {
128            TxRw::abort(tx_rw).expect(TX_RW_ABORT_FAIL);
129            Err(e)
130        }
131    }
132}
133
134/// [`BlockchainWriteRequest::PopBlocks`].
135fn pop_blocks(env: &ConcreteEnv, numb_blocks: usize) -> ResponseResult {
136    let env_inner = env.env_inner();
137    let mut tx_rw = env_inner.tx_rw()?;
138
139    // FIXME: turn this function into a try block once stable.
140    let mut result = || {
141        // flush all the current alt blocks as they may reference blocks to be popped.
142        crate::ops::alt_block::flush_alt_blocks(&env_inner, &mut tx_rw)?;
143
144        let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
145        // generate a `ChainId` for the popped blocks.
146        let old_main_chain_id = ChainId(rand::random());
147
148        // pop the blocks
149        for _ in 0..numb_blocks {
150            crate::ops::block::pop_block(Some(old_main_chain_id), &mut tables_mut)?;
151        }
152
153        Ok(old_main_chain_id)
154    };
155
156    match result() {
157        Ok(old_main_chain_id) => {
158            TxRw::commit(tx_rw)?;
159            Ok(BlockchainResponse::PopBlocks(old_main_chain_id))
160        }
161        Err(e) => {
162            TxRw::abort(tx_rw).expect(TX_RW_ABORT_FAIL);
163            Err(e)
164        }
165    }
166}
167
168/// [`BlockchainWriteRequest::ReverseReorg`].
169fn reverse_reorg(env: &ConcreteEnv, chain_id: ChainId) -> ResponseResult {
170    let env_inner = env.env_inner();
171    let mut tx_rw = env_inner.tx_rw()?;
172
173    // FIXME: turn this function into a try block once stable.
174    let mut result = || {
175        let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
176
177        let chain_info = tables_mut.alt_chain_infos().get(&chain_id.into())?;
178        // Although this doesn't guarantee the chain was popped from the main-chain, it's an easy
179        // thing for us to check.
180        assert_eq!(Chain::from(chain_info.parent_chain), Chain::Main);
181
182        let top_block_height =
183            crate::ops::blockchain::top_block_height(tables_mut.block_heights())?;
184
185        // pop any blocks that were added as part of a re-org.
186        for _ in chain_info.common_ancestor_height..top_block_height {
187            crate::ops::block::pop_block(None, &mut tables_mut)?;
188        }
189
190        // Add the old main chain blocks back to the main chain.
191        for height in (chain_info.common_ancestor_height + 1)..chain_info.chain_height {
192            let alt_block = crate::ops::alt_block::get_alt_block(
193                &AltBlockHeight {
194                    chain_id: chain_id.into(),
195                    height,
196                },
197                &tables_mut,
198            )?;
199            let verified_block = map_valid_alt_block_to_verified_block(alt_block);
200            crate::ops::block::add_block(&verified_block, &mut tables_mut)?;
201        }
202
203        drop(tables_mut);
204        crate::ops::alt_block::flush_alt_blocks(&env_inner, &mut tx_rw)?;
205
206        Ok(())
207    };
208
209    match result() {
210        Ok(()) => {
211            TxRw::commit(tx_rw)?;
212            Ok(BlockchainResponse::Ok)
213        }
214        Err(e) => {
215            TxRw::abort(tx_rw).expect(TX_RW_ABORT_FAIL);
216            Err(e)
217        }
218    }
219}
220
221/// [`BlockchainWriteRequest::FlushAltBlocks`].
222#[inline]
223fn flush_alt_blocks(env: &ConcreteEnv) -> ResponseResult {
224    let env_inner = env.env_inner();
225    let mut tx_rw = env_inner.tx_rw()?;
226
227    let result = crate::ops::alt_block::flush_alt_blocks(&env_inner, &mut tx_rw);
228
229    match result {
230        Ok(()) => {
231            TxRw::commit(tx_rw)?;
232            Ok(BlockchainResponse::Ok)
233        }
234        Err(e) => {
235            TxRw::abort(tx_rw).expect(TX_RW_ABORT_FAIL);
236            Err(e)
237        }
238    }
239}