cuprated/blockchain/interface.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
//! The blockchain manager interface.
//!
//! This module contains all the functions to mutate the blockchain's state in any way, through the
//! blockchain manager.
use std::{
collections::{HashMap, HashSet},
sync::{LazyLock, Mutex, OnceLock},
};
use monero_serai::{block::Block, transaction::Transaction};
use tokio::sync::{mpsc, oneshot};
use tower::{Service, ServiceExt};
use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::transactions::new_tx_verification_data;
use cuprate_txpool::service::{
interface::{TxpoolReadRequest, TxpoolReadResponse},
TxpoolReadHandle,
};
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
use crate::{
blockchain::manager::{BlockchainManagerCommand, IncomingBlockOk},
constants::PANIC_CRITICAL_SERVICE_ERROR,
};
/// The channel used to send [`BlockchainManagerCommand`]s to the blockchain manager.
///
/// This channel is initialized in [`init_blockchain_manager`](super::manager::init_blockchain_manager).
/// The functions in this file document what happens if it has not been initialized when they are
/// called; for example [`handle_incoming_block`] returns [`IncomingBlockOk::NotReady`] in that case.
pub(super) static COMMAND_TX: OnceLock<mpsc::Sender<BlockchainManagerCommand>> = OnceLock::new();
/// An error that can be returned from [`handle_incoming_block`].
#[derive(Debug, thiserror::Error)]
pub enum IncomingBlockError {
/// Some transactions in the block were unknown.
///
/// The inner values are the block hash and the indexes of the missing txs in the block, i.e.
/// positions in the block's transaction list of the txs that were not found in our txpool.
#[error("Unknown transactions in block.")]
UnknownTransactions([u8; 32], Vec<usize>),
/// We are missing the block's parent.
///
/// Returned when the block referenced by the header's `previous` hash is not known to us.
#[error("The block has an unknown parent.")]
Orphan,
/// The block was invalid.
///
/// The wrapped [`anyhow::Error`] carries the reason, e.g. too many transactions were supplied
/// with the block, a supplied transaction could not be prepared for verification, or the
/// blockchain manager rejected the block.
#[error(transparent)]
InvalidBlock(anyhow::Error),
}
/// Try to add a new block to the blockchain.
///
/// The block is first checked against our database and txpool, then handed to the blockchain
/// manager through [`COMMAND_TX`], and finally the manager's verdict is awaited.
///
/// On success returns [`IncomingBlockOk`]:
/// - [`IncomingBlockOk::AlreadyHave`] if the block is already in our database, or if another
///   caller is currently handing the very same block to the manager,
/// - [`IncomingBlockOk::NotReady`] if the blockchain manager has not been initialized yet,
/// - otherwise whatever the blockchain manager answers.
///
/// # Errors
///
/// This function will return an error if:
/// - the block was invalid
/// - we are missing transactions
/// - the block's parent is unknown
///
/// # Panics
///
/// This function panics if the blockchain or txpool read services fail, if the command could
/// not be sent to the blockchain manager, or if the manager drops the response channel without
/// answering.
pub async fn handle_incoming_block(
    block: Block,
    mut given_txs: HashMap<[u8; 32], Transaction>,
    blockchain_read_handle: &mut BlockchainReadHandle,
    txpool_read_handle: &mut TxpoolReadHandle,
) -> Result<IncomingBlockOk, IncomingBlockError> {
    /// A [`HashSet`] of block hashes that the blockchain manager is currently handling.
    ///
    /// It stops the same block from being sent to the blockchain manager by several connections
    /// before one of them actually gets it added to the chain, which frees peers to do other things.
    ///
    /// A plain `Mutex` is used rather than something like a dashmap: new blocks are expected to
    /// collide a lot within a short time span, so sharded locks would bring no benefit, and the
    /// `RwLock`s a dashmap is built from are more expensive than a `Mutex`.
    static BLOCKS_BEING_HANDLED: LazyLock<Mutex<HashSet<[u8; 32]>>> =
        LazyLock::new(|| Mutex::new(HashSet::new()));

    /// Drop guard that takes its block hash back out of [`BLOCKS_BEING_HANDLED`], so the entry
    /// is cleared on every exit path once the hash has been registered.
    struct InFlightBlock([u8; 32]);

    impl Drop for InFlightBlock {
        fn drop(&mut self) {
            BLOCKS_BEING_HANDLED.lock().unwrap().remove(&self.0);
        }
    }

    // A peer can never legitimately supply more txs than the block references.
    if given_txs.len() > block.transactions.len() {
        return Err(IncomingBlockError::InvalidBlock(anyhow::anyhow!(
            "Too many transactions given for block"
        )));
    }

    // Without the parent there is nothing to attach this block to.
    let parent_known = block_exists(block.header.previous, blockchain_read_handle)
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR);
    if !parent_known {
        return Err(IncomingBlockError::Orphan);
    }

    let block_hash = block.hash();

    let already_stored = block_exists(block_hash, blockchain_read_handle)
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR);
    if already_stored {
        return Ok(IncomingBlockOk::AlreadyHave);
    }

    // Ask the txpool for every tx of the block; it also tells us which ones it does not hold.
    let txpool_response = txpool_read_handle
        .ready()
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
        .call(TxpoolReadRequest::TxsForBlock(block.transactions.clone()))
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR);
    let TxpoolReadResponse::TxsForBlock { mut txs, missing } = txpool_response else {
        unreachable!()
    };

    // Fill the gaps left by the txpool with the txs that came along with the block.
    for tx_hash in missing.iter().map(|&tx_index| block.transactions[tx_index]) {
        let tx = match given_txs.remove(&tx_hash) {
            Some(tx) => tx,
            None => {
                // The indexes of all txs missing from our pool are handed back, ignoring the txs
                // that were given with the block, as those txs will be dropped. Trying to add them
                // to the pool is not worth it: this only happens with a misbehaving peer or when
                // the txpool reaches the size limit.
                return Err(IncomingBlockError::UnknownTransactions(block_hash, missing));
            }
        };

        let verification_data = new_tx_verification_data(tx)
            .map_err(|e| IncomingBlockError::InvalidBlock(e.into()))?;
        txs.insert(tx_hash, verification_data);
    }

    let Some(command_tx) = COMMAND_TX.get() else {
        // The blockchain manager could still be starting up.
        return Ok(IncomingBlockOk::NotReady);
    };

    // Register the block's hash as being handled.
    let newly_registered = BLOCKS_BEING_HANDLED.lock().unwrap().insert(block_hash);
    if !newly_registered {
        // Somewhere else this block is already being added, so we can stop here.
        return Ok(IncomingBlockOk::AlreadyHave);
    }

    // From here on the hash must be removed from `BLOCKS_BEING_HANDLED` when we leave.
    let _in_flight = InFlightBlock(block_hash);

    let (reply_tx, reply_rx) = oneshot::channel();

    let command = BlockchainManagerCommand::AddBlock {
        block,
        prepped_txs: txs,
        response_tx: reply_tx,
    };

    command_tx
        .send(command)
        .await
        .expect("TODO: don't actually panic here, an err means we are shutting down");

    let manager_verdict = reply_rx
        .await
        .expect("The blockchain manager will always respond");

    manager_verdict.map_err(IncomingBlockError::InvalidBlock)
}
/// Check if we have a block with the given hash.
///
/// Sends a [`BlockchainReadRequest::FindBlock`] request through `blockchain_read_handle` and
/// returns `true` when the response contains a location for the block, `false` otherwise.
///
/// # Errors
///
/// Any error from waiting for the read service to become ready, or from the request itself, is
/// passed on to the caller.
async fn block_exists(
    block_hash: [u8; 32],
    blockchain_read_handle: &mut BlockchainReadHandle,
) -> Result<bool, anyhow::Error> {
    let service = blockchain_read_handle.ready().await?;

    let response = service
        .call(BlockchainReadRequest::FindBlock(block_hash))
        .await?;

    match response {
        BlockchainResponse::FindBlock(location) => Ok(location.is_some()),
        // A `FindBlock` request is always answered with a `FindBlock` response.
        _ => unreachable!(),
    }
}