#![expect(
unreachable_code,
unused_variables,
clippy::unnecessary_wraps,
clippy::needless_pass_by_value,
reason = "TODO: finish implementing the signatures from <https://github.com/Cuprate/cuprate/pull/297>"
)]
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use rayon::{
iter::{IntoParallelIterator, ParallelIterator},
prelude::*,
ThreadPool,
};
use thread_local::ThreadLocal;
use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner, RuntimeError};
use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
use cuprate_helper::map::combine_low_high_bits_to_u128;
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse},
Chain, ChainId, ExtendedBlockHeader, OutputHistogramInput, OutputOnChain,
};
use crate::{
ops::{
alt_block::{
get_alt_block, get_alt_block_extended_header_from_height, get_alt_block_hash,
get_alt_chain_history_ranges,
},
block::{
block_exists, get_block_extended_header_from_height, get_block_height, get_block_info,
},
blockchain::{cumulative_generated_coins, top_block_height},
key_image::key_image_exists,
output::id_to_output_on_chain,
},
service::{
free::{compact_history_genesis_not_included, compact_history_index_to_height_offset},
types::{BlockchainReadHandle, ResponseResult},
},
tables::{AltBlockHeights, BlockHeights, BlockInfos, OpenTables, Tables},
types::{
AltBlockHeight, Amount, AmountIndex, BlockHash, BlockHeight, KeyImage, PreRctOutputId,
},
};
/// Initialize the blockchain database read service with a newly created thread-pool.
///
/// A pool is built from `threads` via [`init_thread_pool`] and then handed,
/// together with `env`, to [`init_read_service_with_pool`].
///
/// Returns the [`BlockchainReadHandle`] produced by that function.
// NOTE(review): `#[cold]` + `#[inline(never)]` suggest this is start-up-only
// code — confirm against the callers.
#[cold]
#[inline(never)]
pub fn init_read_service(env: Arc<ConcreteEnv>, threads: ReaderThreads) -> BlockchainReadHandle {
    let pool = init_thread_pool(threads);
    init_read_service_with_pool(env, pool)
}
/// Initialize the blockchain database read service on top of an existing thread-pool.
///
/// Constructs a [`DatabaseReadService`] from `env`, `pool` and this file's
/// [`map_request`] function, which is registered as the request handler.
#[cold]
#[inline(never)]
pub fn init_read_service_with_pool(
    env: Arc<ConcreteEnv>,
    pool: Arc<ThreadPool>,
) -> BlockchainReadHandle {
    // Every incoming request is routed through `map_request`.
    let request_handler = map_request;
    DatabaseReadService::new(env, pool, request_handler)
}
fn map_request(
env: &ConcreteEnv, request: BlockchainReadRequest, ) -> ResponseResult {
use BlockchainReadRequest as R;
match request {
R::BlockExtendedHeader(block) => block_extended_header(env, block),
R::BlockHash(block, chain) => block_hash(env, block, chain),
R::FindBlock(block_hash) => find_block(env, block_hash),
R::FilterUnknownHashes(hashes) => filter_unknown_hashes(env, hashes),
R::BlockExtendedHeaderInRange(range, chain) => {
block_extended_header_in_range(env, range, chain)
}
R::ChainHeight => chain_height(env),
R::GeneratedCoins(height) => generated_coins(env, height),
R::Outputs(map) => outputs(env, map),
R::NumberOutputsWithAmount(vec) => number_outputs_with_amount(env, vec),
R::KeyImagesSpent(set) => key_images_spent(env, set),
R::CompactChainHistory => compact_chain_history(env),
R::FindFirstUnknown(block_ids) => find_first_unknown(env, &block_ids),
R::AltBlocksInChain(chain_id) => alt_blocks_in_chain(env, chain_id),
R::Block { height } => block(env, height),
R::BlockByHash(hash) => block_by_hash(env, hash),
R::TotalTxCount => total_tx_count(env),
R::DatabaseSize => database_size(env),
R::OutputHistogram(input) => output_histogram(env, input),
R::CoinbaseTxSum { height, count } => coinbase_tx_sum(env, height, count),
R::AltChains => alt_chains(env),
R::AltChainCount => alt_chain_count(env),
}
}
/// Create an empty [`ThreadLocal`] sized for the database's reader threads.
///
/// The initial capacity is the `reader_threads` count stored in `env`'s
/// configuration. The handlers in this file use the result to hold one
/// read transaction (or one set of opened tables) per worker thread.
#[inline]
fn thread_local<T: Send>(env: &impl Env) -> ThreadLocal<T> {
    let reader_thread_count = env.config().reader_threads.get();
    ThreadLocal::with_capacity(reader_thread_count)
}
/// Fetch the calling thread's opened tables from `$tables`, opening them on first use.
///
/// Expands to `$tables.get_or_try(..)`, where the initializer opens all tables
/// through `$env_inner.open_tables($tx_ro)` and wraps them in
/// `UnsafeSendable`. The expansion evaluates to a `Result` holding a reference
/// to the wrapped tables; callers in this file follow it with `?.as_ref()`.
///
/// - `$env_inner`: the value `open_tables` is called on
/// - `$tx_ro`: the read transaction passed to `open_tables`
/// - `$tables`: the per-thread cache the opened tables are stored in
macro_rules! get_tables {
    ($env_inner:ident, $tx_ro:ident, $tables:ident) => {{
        $tables.get_or_try(|| {
            $env_inner
                .open_tables($tx_ro)
                // SAFETY: NOTE(review) — the soundness argument is not visible
                // here. Every call site visible in this file passes a
                // `ThreadLocal` as `$tables`, so presumably each wrapped value
                // is only ever touched by the thread that opened it; confirm
                // against the `UnsafeSendable` documentation.
                .map(|opened| unsafe { crate::unsafe_sendable::UnsafeSendable::new(opened) })
        })
    }};
}
/// [`BlockchainReadRequest::BlockExtendedHeader`].
///
/// Looks up the extended header of the block at `block_height`.
///
/// # Errors
/// Any [`RuntimeError`] from starting the transaction, opening the tables
/// or the header lookup is returned as-is.
#[inline]
fn block_extended_header(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult {
    // A single lookup: one transaction on the calling thread, no `ThreadLocal`.
    let env_inner = env.env_inner();
    let tx_ro = env_inner.tx_ro()?;
    let tables = env_inner.open_tables(&tx_ro)?;

    let header = get_block_extended_header_from_height(&block_height, &tables)?;
    Ok(BlockchainResponse::BlockExtendedHeader(header))
}
/// [`BlockchainReadRequest::BlockHash`].
///
/// Returns the hash of the block at `block_height` on `chain`:
/// - [`Chain::Main`]: read from the `BlockInfos` table
/// - [`Chain::Alt`]: resolved by [`get_alt_block_hash`] over all tables
///
/// # Errors
/// Any [`RuntimeError`] from starting the transaction, opening the table(s)
/// or the lookup itself is returned as-is.
#[inline]
fn block_hash(env: &ConcreteEnv, block_height: BlockHeight, chain: Chain) -> ResponseResult {
    // A single lookup: one transaction on the calling thread, no `ThreadLocal`.
    let env_inner = env.env_inner();
    let tx_ro = env_inner.tx_ro()?;

    let block_hash = match chain {
        Chain::Main => {
            // Only this arm reads `BlockInfos`, so the table is opened here
            // rather than unconditionally: the alt-chain path neither pays for
            // nor can fail on a table handle it never uses.
            let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
            let block_info = get_block_info(&block_height, &table_block_infos)?;
            block_info.block_hash
        }
        Chain::Alt(chain_id) => {
            get_alt_block_hash(&block_height, chain_id, &env_inner.open_tables(&tx_ro)?)?
        }
    };

    Ok(BlockchainResponse::BlockHash(block_hash))
}
/// [`BlockchainReadRequest::FindBlock`].
///
/// Locates `block_hash`, checking the main-chain `BlockHeights` table first
/// and the `AltBlockHeights` table second.
///
/// The response carries `Some((chain, height))` when the hash is found in
/// either table and `None` when it is in neither.
///
/// # Errors
/// Any [`RuntimeError`] other than [`RuntimeError::KeyNotFound`] is returned
/// as-is; `KeyNotFound` only means "try the next table / not found".
fn find_block(env: &ConcreteEnv, block_hash: BlockHash) -> ResponseResult {
    let env_inner = env.env_inner();
    let tx_ro = env_inner.tx_ro()?;

    let main_chain_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
    let main_chain_lookup = main_chain_heights.get(&block_hash);

    let location = match main_chain_lookup {
        Ok(height) => Some((Chain::Main, height)),
        // Not on the main chain: fall back to the alt-block table, which is
        // only opened once it is actually needed.
        Err(RuntimeError::KeyNotFound) => {
            let alt_chain_heights = env_inner.open_db_ro::<AltBlockHeights>(&tx_ro)?;
            let alt_chain_lookup = alt_chain_heights.get(&block_hash);

            match alt_chain_lookup {
                Ok(alt) => Some((Chain::Alt(alt.chain_id.into()), alt.height)),
                Err(RuntimeError::KeyNotFound) => None,
                Err(e) => return Err(e),
            }
        }
        Err(e) => return Err(e),
    };

    Ok(BlockchainResponse::FindBlock(location))
}
/// [`BlockchainReadRequest::FilterUnknownHashes`].
///
/// Removes from `hashes` every hash for which [`block_exists`] reports
/// `false` against the `BlockHeights` table, and returns what is left.
///
/// # Errors
/// If any lookup fails, the first [`RuntimeError`] encountered is returned
/// instead of the filtered set.
#[inline]
fn filter_unknown_hashes(env: &ConcreteEnv, mut hashes: HashSet<BlockHash>) -> ResponseResult {
    let env_inner = env.env_inner();
    let tx_ro = env_inner.tx_ro()?;
    let heights_table = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;

    // `retain` cannot stop early, so the first database error is parked here
    // and reported once filtering is done.
    let mut first_error = None;

    hashes.retain(|hash| {
        block_exists(hash, &heights_table).unwrap_or_else(|e| {
            first_error.get_or_insert(e);
            // The set is discarded on error anyway; the value is irrelevant.
            false
        })
    });

    match first_error {
        Some(e) => Err(e),
        None => Ok(BlockchainResponse::FilterUnknownHashes(hashes)),
    }
}
/// [`BlockchainReadRequest::BlockExtendedHeaderInRange`].
///
/// Collects the extended headers for the heights in `range` on `chain`,
/// looking them up in parallel with `rayon`.
///
/// # Errors
/// The first [`RuntimeError`] produced by any transaction, table open or
/// header lookup aborts the collection and is returned.
#[inline]
fn block_extended_header_in_range(
env: &ConcreteEnv,
range: std::ops::Range<BlockHeight>,
chain: Chain,
) -> ResponseResult {
// Per-thread caches: each worker thread lazily creates its own read
// transaction and its own opened tables the first time it needs them.
let env_inner = env.env_inner();
let tx_ro = thread_local(env);
let tables = thread_local(env);
let vec = match chain {
// Main chain: every height in `range` maps directly to one header lookup.
Chain::Main => range
.into_par_iter()
.map(|block_height| {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
get_block_extended_header_from_height(&block_height, tables)
})
.collect::<Result<Vec<ExtendedBlockHeader>, RuntimeError>>()?,
Chain::Alt(chain_id) => {
// Split `range` into `(chain, height range)` pieces describing which
// chain each part of the requested range lives on.
let ranges = {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
let alt_chains = tables.alt_chain_infos();
get_alt_chain_history_ranges(range, chain_id, alt_chains)?
};
ranges
.par_iter()
// NOTE(review): presumably the history is returned newest-first and
// `.rev()` restores ascending height order — confirm against
// `get_alt_chain_history_ranges`.
.rev()
.flat_map(|(chain, range)| {
range.clone().into_par_iter().map(|height| {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
// Each piece is served from the chain it belongs to.
match *chain {
Chain::Main => get_block_extended_header_from_height(&height, tables),
Chain::Alt(chain_id) => get_alt_block_extended_header_from_height(
&AltBlockHeight {
chain_id: chain_id.into(),
height,
},
tables,
),
}
})
})
.collect::<Result<Vec<_>, _>>()?
}
};
Ok(BlockchainResponse::BlockExtendedHeaderInRange(vec))
}
/// [`BlockchainReadRequest::ChainHeight`].
///
/// Returns the chain height together with the hash of the block at
/// `height - 1`.
///
/// # Errors
/// Any [`RuntimeError`] from the transaction, the table opens or the lookups
/// is returned as-is.
#[inline]
fn chain_height(env: &ConcreteEnv) -> ResponseResult {
    // Two cheap lookups: one transaction on the calling thread, no `ThreadLocal`.
    let env_inner = env.env_inner();
    let tx_ro = env_inner.tx_ro()?;
    let heights_table = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
    let infos_table = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;

    let height = crate::ops::blockchain::chain_height(&heights_table)?;

    // `saturating_sub` keeps the index at 0 instead of underflowing when the
    // reported height is 0.
    let top_block_index = height.saturating_sub(1);
    let top_block_info = get_block_info(&top_block_index, &infos_table)?;

    Ok(BlockchainResponse::ChainHeight(
        height,
        top_block_info.block_hash,
    ))
}
/// [`BlockchainReadRequest::GeneratedCoins`].
///
/// Returns the result of [`cumulative_generated_coins`] for `height`, read
/// from the `BlockInfos` table.
///
/// # Errors
/// Any [`RuntimeError`] from the transaction, the table open or the lookup
/// is returned as-is.
#[inline]
fn generated_coins(env: &ConcreteEnv, height: usize) -> ResponseResult {
    // A single lookup: one transaction on the calling thread, no `ThreadLocal`.
    let env_inner = env.env_inner();
    let tx_ro = env_inner.tx_ro()?;
    let infos_table = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;

    let total = cumulative_generated_coins(&height, &infos_table)?;
    Ok(BlockchainResponse::GeneratedCoins(total))
}
/// [`BlockchainReadRequest::Outputs`].
///
/// For every `(amount, amount index)` pair in `outputs`, resolves the
/// corresponding [`OutputOnChain`] and returns them in a map with the same
/// `amount -> amount index` nesting as the input.
///
/// Both levels of the input are processed in parallel with `rayon`.
///
/// # Errors
/// The first [`RuntimeError`] from any lookup aborts the collection and is
/// returned.
#[inline]
fn outputs(env: &ConcreteEnv, outputs: HashMap<Amount, HashSet<AmountIndex>>) -> ResponseResult {
    // Per-thread caches: each worker thread lazily creates its own read
    // transaction and its own opened tables.
    let env_inner = env.env_inner();
    let tx_ro_per_thread = thread_local(env);
    let tables_per_thread = thread_local(env);

    // Resolve one output, keeping its index so the caller can rebuild the map.
    let lookup = |amount, amount_index| -> Result<(AmountIndex, OutputOnChain), RuntimeError> {
        let tx_ro = tx_ro_per_thread.get_or_try(|| env_inner.tx_ro())?;
        let tables = get_tables!(env_inner, tx_ro, tables_per_thread)?.as_ref();

        let output_id = PreRctOutputId {
            amount,
            amount_index,
        };
        let output = id_to_output_on_chain(&output_id, tables)?;

        Ok((amount_index, output))
    };

    let outputs_by_amount = outputs
        .into_par_iter()
        .map(|(amount, amount_indices)| {
            amount_indices
                .into_par_iter()
                .map(|amount_index| lookup(amount, amount_index))
                .collect::<Result<HashMap<AmountIndex, OutputOnChain>, RuntimeError>>()
                .map(|outputs_for_amount| (amount, outputs_for_amount))
        })
        .collect::<Result<HashMap<Amount, HashMap<AmountIndex, OutputOnChain>>, RuntimeError>>()?;

    Ok(BlockchainResponse::Outputs(outputs_by_amount))
}
/// [`BlockchainReadRequest::NumberOutputsWithAmount`].
///
/// Returns, for each entry of `amounts`, how many outputs exist with that
/// amount:
/// - amount `0` is answered with the length of the `rct_outputs` table
/// - any other amount is read from the `num_outputs` table, with a missing
///   key counted as `0`
///
/// # Errors
/// Any [`RuntimeError`] other than a missing `num_outputs` key is returned.
#[inline]
fn number_outputs_with_amount(env: &ConcreteEnv, amounts: Vec<Amount>) -> ResponseResult {
    // Per-thread caches: each worker thread lazily creates its own read
    // transaction and its own opened tables.
    let env_inner = env.env_inner();
    let tx_ro_per_thread = thread_local(env);
    let tables_per_thread = thread_local(env);

    // The answer for amount `0` is the same for every entry, so it is
    // computed once up front in a transaction of its own.
    #[expect(
        clippy::cast_possible_truncation,
        reason = "INVARIANT: #[cfg] @ lib.rs asserts `usize == u64`"
    )]
    let num_rct_outputs = {
        let tx_ro = env_inner.tx_ro()?;
        let tables = env_inner.open_tables(&tx_ro)?;
        tables.rct_outputs().len()? as usize
    };

    let counts = amounts
        .into_par_iter()
        .map(|amount| {
            let tx_ro = tx_ro_per_thread.get_or_try(|| env_inner.tx_ro())?;
            let tables = get_tables!(env_inner, tx_ro, tables_per_thread)?.as_ref();

            if amount == 0 {
                return Ok((amount, num_rct_outputs));
            }

            match tables.num_outputs().get(&amount) {
                #[expect(
                    clippy::cast_possible_truncation,
                    reason = "INVARIANT: #[cfg] @ lib.rs asserts `usize == u64`"
                )]
                Ok(count) => Ok((amount, count as usize)),
                // No entry for this amount means no such outputs exist.
                Err(RuntimeError::KeyNotFound) => Ok((amount, 0)),
                Err(e) => Err(e),
            }
        })
        .collect::<Result<HashMap<Amount, usize>, RuntimeError>>()?;

    Ok(BlockchainResponse::NumberOutputsWithAmount(counts))
}
/// [`BlockchainReadRequest::KeyImagesSpent`].
///
/// Answers `true` if at least one of `key_images` exists in the key-image
/// table and `false` if none do (including when `key_images` is empty).
///
/// The images are checked in parallel with `rayon`; the search stops at the
/// first image that exists or the first lookup that fails.
///
/// # Errors
/// A [`RuntimeError`] from a lookup is returned if it is the result the
/// parallel search settles on.
#[inline]
fn key_images_spent(env: &ConcreteEnv, key_images: HashSet<KeyImage>) -> ResponseResult {
    // Per-thread caches: each worker thread lazily creates its own read
    // transaction and its own opened tables.
    let env_inner = env.env_inner();
    let tx_ro_per_thread = thread_local(env);
    let tables_per_thread = thread_local(env);

    // Check a single key image against the key-image table.
    let is_spent = |key_image| {
        let tx_ro = tx_ro_per_thread.get_or_try(|| env_inner.tx_ro())?;
        let tables = get_tables!(env_inner, tx_ro, tables_per_thread)?.as_ref();
        key_image_exists(&key_image, tables.key_images())
    };

    // `Ok(false)` lets the search continue; anything else (`Ok(true)` or an
    // error) is decisive and ends it.
    let first_decisive = key_images
        .into_par_iter()
        .map(is_spent)
        .find_any(|result| !matches!(result, Ok(false)));

    match first_decisive {
        Some(Err(e)) => Err(e),
        Some(Ok(spent)) => Ok(BlockchainResponse::KeyImagesSpent(spent)),
        None => Ok(BlockchainResponse::KeyImagesSpent(false)),
    }
}
fn compact_chain_history(env: &ConcreteEnv) -> ResponseResult {
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
let top_block_height = top_block_height(&table_block_heights)?;
let top_block_info = get_block_info(&top_block_height, &table_block_infos)?;
let cumulative_difficulty = combine_low_high_bits_to_u128(
top_block_info.cumulative_difficulty_low,
top_block_info.cumulative_difficulty_high,
);
const INITIAL_BLOCKS: usize = 11;
let mut block_ids = (0..)
.map(compact_history_index_to_height_offset::<INITIAL_BLOCKS>)
.map_while(|i| top_block_height.checked_sub(i))
.map(|height| Ok(get_block_info(&height, &table_block_infos)?.block_hash))
.collect::<Result<Vec<_>, RuntimeError>>()?;
if compact_history_genesis_not_included::<INITIAL_BLOCKS>(top_block_height) {
block_ids.push(get_block_info(&0, &table_block_infos)?.block_hash);
}
Ok(BlockchainResponse::CompactChainHistory {
cumulative_difficulty,
block_ids,
})
}
/// [`BlockchainReadRequest::FindFirstUnknown`].
///
/// Binary-searches `block_ids` for the first hash that [`block_exists`]
/// reports as missing from the `BlockHeights` table.
///
/// The response carries:
/// - `None` if every hash is known (or `block_ids` is empty)
/// - `Some((0, 0))` if the very first hash is unknown
/// - `Some((index, height))` otherwise, where `index` is the position of the
///   first unknown hash and `height` is one above the last known block's height
///
/// NOTE(review): `partition_point` assumes all known hashes precede all
/// unknown ones in `block_ids` — confirm callers uphold this ordering.
///
/// # Errors
/// The first [`RuntimeError`] hit during the search, or from the final
/// height lookup, is returned.
fn find_first_unknown(env: &ConcreteEnv, block_ids: &[BlockHash]) -> ResponseResult {
    let env_inner = env.env_inner();
    let tx_ro = env_inner.tx_ro()?;
    let heights_table = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;

    // The predicate must return a `bool`, so a database error is parked here
    // and checked once the search is over.
    let mut lookup_error = None;

    let first_unknown_idx = block_ids.partition_point(|block_id| {
        block_exists(block_id, &heights_table).unwrap_or_else(|e| {
            lookup_error.get_or_insert(e);
            // The search result is discarded on error; the value is irrelevant.
            false
        })
    });

    if let Some(e) = lookup_error {
        return Err(e);
    }

    // Checked before the `== 0` case so an empty slice reports `None`.
    if first_unknown_idx == block_ids.len() {
        return Ok(BlockchainResponse::FindFirstUnknown(None));
    }

    if first_unknown_idx == 0 {
        return Ok(BlockchainResponse::FindFirstUnknown(Some((0, 0))));
    }

    let last_known_height = get_block_height(&block_ids[first_unknown_idx - 1], &heights_table)?;

    Ok(BlockchainResponse::FindFirstUnknown(Some((
        first_unknown_idx,
        last_known_height + 1,
    ))))
}
/// [`BlockchainReadRequest::AltBlocksInChain`].
///
/// Collects the alt blocks that make up the alt chain `chain_id`, fetching
/// them in parallel with `rayon`.
///
/// # Panics
/// Panics if, after skipping the first element of the reversed history, a
/// remaining range still belongs to [`Chain::Main`].
///
/// # Errors
/// The first [`RuntimeError`] from any transaction, table open or block
/// lookup aborts the collection and is returned.
fn alt_blocks_in_chain(env: &ConcreteEnv, chain_id: ChainId) -> ResponseResult {
// Per-thread caches: each worker thread lazily creates its own read
// transaction and its own opened tables the first time it needs them.
let env_inner = env.env_inner();
let tx_ro = thread_local(env);
let tables = thread_local(env);
// Full history of this alt chain as `(chain, height range)` pieces,
// requested over the whole possible height range.
let history = {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
get_alt_chain_history_ranges(0..usize::MAX, chain_id, tables.alt_chain_infos())?
};
let blocks = history
.par_iter()
.rev()
// Drop the first piece of the reversed history. Per the panic message
// below this is the "last range", i.e. the one that may belong to the
// main chain.
.skip(1)
.flat_map(|(chain_id, range)| {
let Chain::Alt(chain_id) = chain_id else {
panic!("Should not have main chain blocks here we skipped last range");
};
range.clone().into_par_iter().map(|height| {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
get_alt_block(
&AltBlockHeight {
chain_id: (*chain_id).into(),
height,
},
tables,
)
})
})
.collect::<Result<_, _>>()?;
Ok(BlockchainResponse::AltBlocksInChain(blocks))
}
/// [`BlockchainReadRequest::Block`].
///
/// Not implemented yet: calling this panics via `todo!()`.
fn block(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult {
Ok(BlockchainResponse::Block(todo!()))
}
/// [`BlockchainReadRequest::BlockByHash`].
///
/// Not implemented yet: calling this panics via `todo!()`.
fn block_by_hash(env: &ConcreteEnv, block_hash: BlockHash) -> ResponseResult {
Ok(BlockchainResponse::Block(todo!()))
}
/// [`BlockchainReadRequest::TotalTxCount`].
///
/// Not implemented yet: calling this panics via `todo!()`.
fn total_tx_count(env: &ConcreteEnv) -> ResponseResult {
Ok(BlockchainResponse::TotalTxCount(todo!()))
}
/// [`BlockchainReadRequest::DatabaseSize`].
///
/// Not implemented yet: calling this panics via `todo!()`.
fn database_size(env: &ConcreteEnv) -> ResponseResult {
Ok(BlockchainResponse::DatabaseSize {
database_size: todo!(),
free_space: todo!(),
})
}
/// [`BlockchainReadRequest::OutputHistogram`].
///
/// Not implemented yet: calling this panics via `todo!()`.
fn output_histogram(env: &ConcreteEnv, input: OutputHistogramInput) -> ResponseResult {
Ok(BlockchainResponse::OutputHistogram(todo!()))
}
/// [`BlockchainReadRequest::CoinbaseTxSum`].
///
/// Not implemented yet: calling this panics via `todo!()`.
fn coinbase_tx_sum(env: &ConcreteEnv, height: usize, count: u64) -> ResponseResult {
Ok(BlockchainResponse::CoinbaseTxSum(todo!()))
}
/// [`BlockchainReadRequest::AltChains`].
///
/// Not implemented yet: calling this panics via `todo!()`.
fn alt_chains(env: &ConcreteEnv) -> ResponseResult {
Ok(BlockchainResponse::AltChains(todo!()))
}
/// [`BlockchainReadRequest::AltChainCount`].
///
/// Not implemented yet: calling this panics via `todo!()`.
fn alt_chain_count(env: &ConcreteEnv) -> ResponseResult {
Ok(BlockchainResponse::AltChainCount(todo!()))
}