1//! The blockchain manager interface.
2//!
3//! This module contains all the functions to mutate the blockchain's state in any way, through the
4//! blockchain manager.
5use std::{
6 collections::{HashMap, HashSet},
7 sync::{LazyLock, Mutex, OnceLock},
8};
910use monero_serai::{block::Block, transaction::Transaction};
11use tokio::sync::{mpsc, oneshot};
12use tower::{Service, ServiceExt};
1314use cuprate_blockchain::service::BlockchainReadHandle;
15use cuprate_consensus::transactions::new_tx_verification_data;
16use cuprate_txpool::service::{
17 interface::{TxpoolReadRequest, TxpoolReadResponse},
18 TxpoolReadHandle,
19};
20use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
2122use crate::{
23 blockchain::manager::{BlockchainManagerCommand, IncomingBlockOk},
24 constants::PANIC_CRITICAL_SERVICE_ERROR,
25};
2627/// The channel used to send [`BlockchainManagerCommand`]s to the blockchain manager.
28///
29/// This channel is initialized in [`init_blockchain_manager`](super::manager::init_blockchain_manager), the functions
30/// in this file document what happens if this is not initialized when they are called.
31pub(super) static COMMAND_TX: OnceLock<mpsc::Sender<BlockchainManagerCommand>> = OnceLock::new();
3233/// An error that can be returned from [`handle_incoming_block`].
34#[derive(Debug, thiserror::Error)]
35pub enum IncomingBlockError {
36/// Some transactions in the block were unknown.
37 ///
38 /// The inner values are the block hash and the indexes of the missing txs in the block.
39#[error("Unknown transactions in block.")]
40UnknownTransactions([u8; 32], Vec<usize>),
41/// We are missing the block's parent.
42#[error("The block has an unknown parent.")]
43Orphan,
44/// The block was invalid.
45#[error(transparent)]
46InvalidBlock(anyhow::Error),
47}
4849/// Try to add a new block to the blockchain.
50///
51/// On success returns [`IncomingBlockOk`].
52///
53/// # Errors
54///
55/// This function will return an error if:
56/// - the block was invalid
57/// - we are missing transactions
58/// - the block's parent is unknown
59pub async fn handle_incoming_block(
60 block: Block,
61mut given_txs: HashMap<[u8; 32], Transaction>,
62 blockchain_read_handle: &mut BlockchainReadHandle,
63 txpool_read_handle: &mut TxpoolReadHandle,
64) -> Result<IncomingBlockOk, IncomingBlockError> {
65/// A [`HashSet`] of block hashes that the blockchain manager is currently handling.
66 ///
67 /// This lock prevents sending the same block to the blockchain manager from multiple connections
68 /// before one of them actually gets added to the chain, allowing peers to do other things.
69 ///
70 /// This is used over something like a dashmap as we expect a lot of collisions in a short amount of
71 /// time for new blocks, so we would lose the benefit of sharded locks. A dashmap is made up of `RwLocks`
72 /// which are also more expensive than `Mutex`s.
73static BLOCKS_BEING_HANDLED: LazyLock<Mutex<HashSet<[u8; 32]>>> =
74 LazyLock::new(|| Mutex::new(HashSet::new()));
7576if given_txs.len() > block.transactions.len() {
77return Err(IncomingBlockError::InvalidBlock(anyhow::anyhow!(
78"Too many transactions given for block"
79)));
80 }
8182if !block_exists(block.header.previous, blockchain_read_handle)
83 .await
84.expect(PANIC_CRITICAL_SERVICE_ERROR)
85 {
86return Err(IncomingBlockError::Orphan);
87 }
8889let block_hash = block.hash();
9091if block_exists(block_hash, blockchain_read_handle)
92 .await
93.expect(PANIC_CRITICAL_SERVICE_ERROR)
94 {
95return Ok(IncomingBlockOk::AlreadyHave);
96 }
9798let TxpoolReadResponse::TxsForBlock { mut txs, missing } = txpool_read_handle
99 .ready()
100 .await
101.expect(PANIC_CRITICAL_SERVICE_ERROR)
102 .call(TxpoolReadRequest::TxsForBlock(block.transactions.clone()))
103 .await
104.expect(PANIC_CRITICAL_SERVICE_ERROR)
105else {
106unreachable!()
107 };
108109if !missing.is_empty() {
110let needed_hashes = missing.iter().map(|index| block.transactions[*index]);
111112for needed_hash in needed_hashes {
113let Some(tx) = given_txs.remove(&needed_hash) else {
114// We return back the indexes of all txs missing from our pool, not taking into account the txs
115 // that were given with the block, as these txs will be dropped. It is not worth it to try to add
116 // these txs to the pool as this will only happen with a misbehaving peer or if the txpool reaches
117 // the size limit.
118return Err(IncomingBlockError::UnknownTransactions(block_hash, missing));
119 };
120121 txs.insert(
122 needed_hash,
123 new_tx_verification_data(tx)
124 .map_err(|e| IncomingBlockError::InvalidBlock(e.into()))?,
125 );
126 }
127 }
128129let Some(incoming_block_tx) = COMMAND_TX.get() else {
130// We could still be starting up the blockchain manager.
131return Ok(IncomingBlockOk::NotReady);
132 };
133134// Add the blocks hash to the blocks being handled.
135if !BLOCKS_BEING_HANDLED.lock().unwrap().insert(block_hash) {
136// If another place is already adding this block then we can stop.
137return Ok(IncomingBlockOk::AlreadyHave);
138 }
139140// We must remove the block hash from `BLOCKS_BEING_HANDLED`.
141let _guard = {
142struct RemoveFromBlocksBeingHandled {
143 block_hash: [u8; 32],
144 }
145impl Drop for RemoveFromBlocksBeingHandled {
146fn drop(&mut self) {
147 BLOCKS_BEING_HANDLED
148 .lock()
149 .unwrap()
150 .remove(&self.block_hash);
151 }
152 }
153 RemoveFromBlocksBeingHandled { block_hash }
154 };
155156let (response_tx, response_rx) = oneshot::channel();
157158 incoming_block_tx
159 .send(BlockchainManagerCommand::AddBlock {
160 block,
161 prepped_txs: txs,
162 response_tx,
163 })
164 .await
165.expect("TODO: don't actually panic here, an err means we are shutting down");
166167 response_rx
168 .await
169.expect("The blockchain manager will always respond")
170 .map_err(IncomingBlockError::InvalidBlock)
171}
172173/// Check if we have a block with the given hash.
174async fn block_exists(
175 block_hash: [u8; 32],
176 blockchain_read_handle: &mut BlockchainReadHandle,
177) -> Result<bool, anyhow::Error> {
178let BlockchainResponse::FindBlock(chain) = blockchain_read_handle
179 .ready()
180 .await?
181.call(BlockchainReadRequest::FindBlock(block_hash))
182 .await?
183else {
184unreachable!();
185 };
186187Ok(chain.is_some())
188}