#![expect(
unreachable_code,
unused_variables,
clippy::unnecessary_wraps,
clippy::needless_pass_by_value,
reason = "TODO: finish implementing the signatures from <https://github.com/Cuprate/cuprate/pull/297>"
)]
use std::{
cmp::min,
collections::{HashMap, HashSet},
sync::Arc,
};
use rayon::{
iter::{Either, IntoParallelIterator, ParallelIterator},
prelude::*,
ThreadPool,
};
use thread_local::ThreadLocal;
use cuprate_database::{ConcreteEnv, DatabaseRo, DbResult, Env, EnvInner, RuntimeError};
use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
use cuprate_helper::map::combine_low_high_bits_to_u128;
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse},
Chain, ChainId, ExtendedBlockHeader, OutputHistogramInput, OutputOnChain, TxsInBlock,
};
use crate::{
ops::{
alt_block::{
get_alt_block, get_alt_block_extended_header_from_height, get_alt_block_hash,
get_alt_chain_history_ranges,
},
block::{
block_exists, get_block_blob_with_tx_indexes, get_block_complete_entry,
get_block_extended_header_from_height, get_block_height, get_block_info,
},
blockchain::{cumulative_generated_coins, find_split_point, top_block_height},
key_image::key_image_exists,
output::id_to_output_on_chain,
},
service::{
free::{compact_history_genesis_not_included, compact_history_index_to_height_offset},
types::{BlockchainReadHandle, ResponseResult},
},
tables::{AltBlockHeights, BlockHeights, BlockInfos, OpenTables, Tables, TablesIter},
types::{
AltBlockHeight, Amount, AmountIndex, BlockHash, BlockHeight, KeyImage, PreRctOutputId,
},
};
#[cold]
#[inline(never)] pub fn init_read_service(env: Arc<ConcreteEnv>, threads: ReaderThreads) -> BlockchainReadHandle {
init_read_service_with_pool(env, init_thread_pool(threads))
}
#[cold]
#[inline(never)] pub fn init_read_service_with_pool(
env: Arc<ConcreteEnv>,
pool: Arc<ThreadPool>,
) -> BlockchainReadHandle {
DatabaseReadService::new(env, pool, map_request)
}
fn map_request(
env: &ConcreteEnv, request: BlockchainReadRequest, ) -> ResponseResult {
use BlockchainReadRequest as R;
match request {
R::BlockCompleteEntries(block_hashes) => block_complete_entries(env, block_hashes),
R::BlockExtendedHeader(block) => block_extended_header(env, block),
R::BlockHash(block, chain) => block_hash(env, block, chain),
R::FindBlock(block_hash) => find_block(env, block_hash),
R::FilterUnknownHashes(hashes) => filter_unknown_hashes(env, hashes),
R::BlockExtendedHeaderInRange(range, chain) => {
block_extended_header_in_range(env, range, chain)
}
R::ChainHeight => chain_height(env),
R::GeneratedCoins(height) => generated_coins(env, height),
R::Outputs(map) => outputs(env, map),
R::NumberOutputsWithAmount(vec) => number_outputs_with_amount(env, vec),
R::KeyImagesSpent(set) => key_images_spent(env, set),
R::CompactChainHistory => compact_chain_history(env),
R::NextChainEntry(block_hashes, amount) => next_chain_entry(env, &block_hashes, amount),
R::FindFirstUnknown(block_ids) => find_first_unknown(env, &block_ids),
R::TxsInBlock {
block_hash,
tx_indexes,
} => txs_in_block(env, block_hash, tx_indexes),
R::AltBlocksInChain(chain_id) => alt_blocks_in_chain(env, chain_id),
R::Block { height } => block(env, height),
R::BlockByHash(hash) => block_by_hash(env, hash),
R::TotalTxCount => total_tx_count(env),
R::DatabaseSize => database_size(env),
R::OutputHistogram(input) => output_histogram(env, input),
R::CoinbaseTxSum { height, count } => coinbase_tx_sum(env, height, count),
R::AltChains => alt_chains(env),
R::AltChainCount => alt_chain_count(env),
}
}
#[inline]
fn thread_local<T: Send>(env: &impl Env) -> ThreadLocal<T> {
ThreadLocal::with_capacity(env.config().reader_threads.get())
}
macro_rules! get_tables {
($env_inner:ident, $tx_ro:ident, $tables:ident) => {{
$tables.get_or_try(|| {
match $env_inner.open_tables($tx_ro) {
Ok(tables) => Ok(unsafe { crate::unsafe_sendable::UnsafeSendable::new(tables) }),
Err(e) => Err(e),
}
})
}};
}
fn block_complete_entries(env: &ConcreteEnv, block_hashes: Vec<BlockHash>) -> ResponseResult {
let env_inner = env.env_inner();
let tx_ro = thread_local(env);
let tables = thread_local(env);
let (missing_hashes, blocks) = block_hashes
.into_par_iter()
.map(|block_hash| {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
match get_block_complete_entry(&block_hash, tables) {
Err(RuntimeError::KeyNotFound) => Ok(Either::Left(block_hash)),
res => res.map(Either::Right),
}
})
.collect::<DbResult<_>>()?;
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
let blockchain_height = crate::ops::blockchain::chain_height(tables.block_heights())?;
Ok(BlockchainResponse::BlockCompleteEntries {
blocks,
missing_hashes,
blockchain_height,
})
}
#[inline]
fn block_extended_header(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult {
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let tables = env_inner.open_tables(&tx_ro)?;
Ok(BlockchainResponse::BlockExtendedHeader(
get_block_extended_header_from_height(&block_height, &tables)?,
))
}
#[inline]
fn block_hash(env: &ConcreteEnv, block_height: BlockHeight, chain: Chain) -> ResponseResult {
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
let block_hash = match chain {
Chain::Main => get_block_info(&block_height, &table_block_infos)?.block_hash,
Chain::Alt(chain) => {
get_alt_block_hash(&block_height, chain, &env_inner.open_tables(&tx_ro)?)?
}
};
Ok(BlockchainResponse::BlockHash(block_hash))
}
fn find_block(env: &ConcreteEnv, block_hash: BlockHash) -> ResponseResult {
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
match table_block_heights.get(&block_hash) {
Ok(height) => return Ok(BlockchainResponse::FindBlock(Some((Chain::Main, height)))),
Err(RuntimeError::KeyNotFound) => (),
Err(e) => return Err(e),
}
let table_alt_block_heights = env_inner.open_db_ro::<AltBlockHeights>(&tx_ro)?;
match table_alt_block_heights.get(&block_hash) {
Ok(height) => Ok(BlockchainResponse::FindBlock(Some((
Chain::Alt(height.chain_id.into()),
height.height,
)))),
Err(RuntimeError::KeyNotFound) => Ok(BlockchainResponse::FindBlock(None)),
Err(e) => Err(e),
}
}
#[inline]
fn filter_unknown_hashes(env: &ConcreteEnv, mut hashes: HashSet<BlockHash>) -> ResponseResult {
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
let mut err = None;
hashes.retain(
|block_hash| match block_exists(block_hash, &table_block_heights) {
Ok(exists) => exists,
Err(e) => {
err.get_or_insert(e);
false
}
},
);
if let Some(e) = err {
Err(e)
} else {
Ok(BlockchainResponse::FilterUnknownHashes(hashes))
}
}
#[inline]
fn block_extended_header_in_range(
env: &ConcreteEnv,
range: std::ops::Range<BlockHeight>,
chain: Chain,
) -> ResponseResult {
let env_inner = env.env_inner();
let tx_ro = thread_local(env);
let tables = thread_local(env);
let vec = match chain {
Chain::Main => range
.into_par_iter()
.map(|block_height| {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
get_block_extended_header_from_height(&block_height, tables)
})
.collect::<DbResult<Vec<ExtendedBlockHeader>>>()?,
Chain::Alt(chain_id) => {
let ranges = {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
let alt_chains = tables.alt_chain_infos();
get_alt_chain_history_ranges(range, chain_id, alt_chains)?
};
ranges
.par_iter()
.rev()
.flat_map(|(chain, range)| {
range.clone().into_par_iter().map(|height| {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
match *chain {
Chain::Main => get_block_extended_header_from_height(&height, tables),
Chain::Alt(chain_id) => get_alt_block_extended_header_from_height(
&AltBlockHeight {
chain_id: chain_id.into(),
height,
},
tables,
),
}
})
})
.collect::<DbResult<Vec<_>>>()?
}
};
Ok(BlockchainResponse::BlockExtendedHeaderInRange(vec))
}
#[inline]
fn chain_height(env: &ConcreteEnv) -> ResponseResult {
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
let chain_height = crate::ops::blockchain::chain_height(&table_block_heights)?;
let block_hash =
get_block_info(&chain_height.saturating_sub(1), &table_block_infos)?.block_hash;
Ok(BlockchainResponse::ChainHeight(chain_height, block_hash))
}
#[inline]
fn generated_coins(env: &ConcreteEnv, height: usize) -> ResponseResult {
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
Ok(BlockchainResponse::GeneratedCoins(
cumulative_generated_coins(&height, &table_block_infos)?,
))
}
#[inline]
fn outputs(env: &ConcreteEnv, outputs: HashMap<Amount, HashSet<AmountIndex>>) -> ResponseResult {
let env_inner = env.env_inner();
let tx_ro = thread_local(env);
let tables = thread_local(env);
let inner_map = |amount, amount_index| -> DbResult<(AmountIndex, OutputOnChain)> {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
let id = PreRctOutputId {
amount,
amount_index,
};
let output_on_chain = id_to_output_on_chain(&id, tables)?;
Ok((amount_index, output_on_chain))
};
let map = outputs
.into_par_iter()
.map(|(amount, amount_index_set)| {
Ok((
amount,
amount_index_set
.into_par_iter()
.map(|amount_index| inner_map(amount, amount_index))
.collect::<DbResult<HashMap<AmountIndex, OutputOnChain>>>()?,
))
})
.collect::<DbResult<HashMap<Amount, HashMap<AmountIndex, OutputOnChain>>>>()?;
Ok(BlockchainResponse::Outputs(map))
}
#[inline]
fn number_outputs_with_amount(env: &ConcreteEnv, amounts: Vec<Amount>) -> ResponseResult {
let env_inner = env.env_inner();
let tx_ro = thread_local(env);
let tables = thread_local(env);
#[expect(
clippy::cast_possible_truncation,
reason = "INVARIANT: #[cfg] @ lib.rs asserts `usize == u64`"
)]
let num_rct_outputs = {
let tx_ro = env_inner.tx_ro()?;
let tables = env_inner.open_tables(&tx_ro)?;
tables.rct_outputs().len()? as usize
};
let map = amounts
.into_par_iter()
.map(|amount| {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
if amount == 0 {
Ok((amount, num_rct_outputs))
} else {
match tables.num_outputs().get(&amount) {
#[expect(
clippy::cast_possible_truncation,
reason = "INVARIANT: #[cfg] @ lib.rs asserts `usize == u64`"
)]
Ok(count) => Ok((amount, count as usize)),
Err(RuntimeError::KeyNotFound) => Ok((amount, 0)),
Err(e) => Err(e),
}
}
})
.collect::<DbResult<HashMap<Amount, usize>>>()?;
Ok(BlockchainResponse::NumberOutputsWithAmount(map))
}
#[inline]
fn key_images_spent(env: &ConcreteEnv, key_images: HashSet<KeyImage>) -> ResponseResult {
let env_inner = env.env_inner();
let tx_ro = thread_local(env);
let tables = thread_local(env);
let key_image_exists = |key_image| {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
key_image_exists(&key_image, tables.key_images())
};
match key_images
.into_par_iter()
.map(key_image_exists)
.find_any(|result| !matches!(result, Ok(false)))
{
None | Some(Ok(false)) => Ok(BlockchainResponse::KeyImagesSpent(false)), Some(Ok(true)) => Ok(BlockchainResponse::KeyImagesSpent(true)), Some(Err(e)) => Err(e), }
}
fn compact_chain_history(env: &ConcreteEnv) -> ResponseResult {
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
let top_block_height = top_block_height(&table_block_heights)?;
let top_block_info = get_block_info(&top_block_height, &table_block_infos)?;
let cumulative_difficulty = combine_low_high_bits_to_u128(
top_block_info.cumulative_difficulty_low,
top_block_info.cumulative_difficulty_high,
);
const INITIAL_BLOCKS: usize = 11;
let mut block_ids = (0..)
.map(compact_history_index_to_height_offset::<INITIAL_BLOCKS>)
.map_while(|i| top_block_height.checked_sub(i))
.map(|height| Ok(get_block_info(&height, &table_block_infos)?.block_hash))
.collect::<DbResult<Vec<_>>>()?;
if compact_history_genesis_not_included::<INITIAL_BLOCKS>(top_block_height) {
block_ids.push(get_block_info(&0, &table_block_infos)?.block_hash);
}
Ok(BlockchainResponse::CompactChainHistory {
cumulative_difficulty,
block_ids,
})
}
fn next_chain_entry(
env: &ConcreteEnv,
block_ids: &[BlockHash],
next_entry_size: usize,
) -> ResponseResult {
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let tables = env_inner.open_tables(&tx_ro)?;
let table_block_heights = tables.block_heights();
let table_block_infos = tables.block_infos_iter();
let idx = find_split_point(block_ids, false, table_block_heights)?;
if idx == block_ids.len() {
return Ok(BlockchainResponse::NextChainEntry {
start_height: None,
chain_height: 0,
block_ids: vec![],
block_weights: vec![],
cumulative_difficulty: 0,
first_block_blob: None,
});
}
let first_known_block_hash = block_ids[idx];
let first_known_height = table_block_heights.get(&first_known_block_hash)?;
let chain_height = crate::ops::blockchain::chain_height(table_block_heights)?;
let last_height_in_chain_entry = min(first_known_height + next_entry_size, chain_height);
let (block_ids, block_weights) = (first_known_height..last_height_in_chain_entry)
.map(|height| {
let block_info = table_block_infos.get(&height)?;
Ok((block_info.block_hash, block_info.weight))
})
.collect::<DbResult<(Vec<_>, Vec<_>)>>()?;
let top_block_info = table_block_infos.get(&(chain_height - 1))?;
let first_block_blob = if block_ids.len() >= 2 {
Some(get_block_blob_with_tx_indexes(&(first_known_height + 1), &tables)?.0)
} else {
None
};
Ok(BlockchainResponse::NextChainEntry {
start_height: Some(first_known_height),
chain_height,
block_ids,
block_weights,
cumulative_difficulty: combine_low_high_bits_to_u128(
top_block_info.cumulative_difficulty_low,
top_block_info.cumulative_difficulty_high,
),
first_block_blob,
})
}
fn find_first_unknown(env: &ConcreteEnv, block_ids: &[BlockHash]) -> ResponseResult {
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
let idx = find_split_point(block_ids, true, &table_block_heights)?;
Ok(if idx == block_ids.len() {
BlockchainResponse::FindFirstUnknown(None)
} else if idx == 0 {
BlockchainResponse::FindFirstUnknown(Some((0, 0)))
} else {
let last_known_height = get_block_height(&block_ids[idx - 1], &table_block_heights)?;
BlockchainResponse::FindFirstUnknown(Some((idx, last_known_height + 1)))
})
}
fn txs_in_block(env: &ConcreteEnv, block_hash: [u8; 32], missing_txs: Vec<u64>) -> ResponseResult {
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let tables = env_inner.open_tables(&tx_ro)?;
let block_height = tables.block_heights().get(&block_hash)?;
let (block, miner_tx_index, numb_txs) = get_block_blob_with_tx_indexes(&block_height, &tables)?;
let first_tx_index = miner_tx_index + 1;
if numb_txs < missing_txs.len() {
return Ok(BlockchainResponse::TxsInBlock(None));
}
let txs = missing_txs
.into_iter()
.map(|index_offset| Ok(tables.tx_blobs().get(&(first_tx_index + index_offset))?.0))
.collect::<DbResult<_>>()?;
Ok(BlockchainResponse::TxsInBlock(Some(TxsInBlock {
block,
txs,
})))
}
fn alt_blocks_in_chain(env: &ConcreteEnv, chain_id: ChainId) -> ResponseResult {
let env_inner = env.env_inner();
let tx_ro = thread_local(env);
let tables = thread_local(env);
let history = {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
get_alt_chain_history_ranges(0..usize::MAX, chain_id, tables.alt_chain_infos())?
};
let blocks = history
.par_iter()
.rev()
.skip(1)
.flat_map(|(chain_id, range)| {
let Chain::Alt(chain_id) = chain_id else {
panic!("Should not have main chain blocks here we skipped last range");
};
range.clone().into_par_iter().map(|height| {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
get_alt_block(
&AltBlockHeight {
chain_id: (*chain_id).into(),
height,
},
tables,
)
})
})
.collect::<DbResult<_>>()?;
Ok(BlockchainResponse::AltBlocksInChain(blocks))
}
fn block(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult {
Ok(BlockchainResponse::Block(todo!()))
}
fn block_by_hash(env: &ConcreteEnv, block_hash: BlockHash) -> ResponseResult {
Ok(BlockchainResponse::Block(todo!()))
}
fn total_tx_count(env: &ConcreteEnv) -> ResponseResult {
Ok(BlockchainResponse::TotalTxCount(todo!()))
}
fn database_size(env: &ConcreteEnv) -> ResponseResult {
Ok(BlockchainResponse::DatabaseSize {
database_size: todo!(),
free_space: todo!(),
})
}
fn output_histogram(env: &ConcreteEnv, input: OutputHistogramInput) -> ResponseResult {
Ok(BlockchainResponse::OutputHistogram(todo!()))
}
fn coinbase_tx_sum(env: &ConcreteEnv, height: usize, count: u64) -> ResponseResult {
Ok(BlockchainResponse::CoinbaseTxSum(todo!()))
}
fn alt_chains(env: &ConcreteEnv) -> ResponseResult {
Ok(BlockchainResponse::AltChains(todo!()))
}
fn alt_chain_count(env: &ConcreteEnv) -> ResponseResult {
Ok(BlockchainResponse::AltChainCount(todo!()))
}