cuprated/blockchain/
interface.rs

1//! The blockchain manager interface.
2//!
3//! This module contains all the functions to mutate the blockchain's state in any way, through the
4//! blockchain manager.
5use std::{
6    collections::{HashMap, HashSet},
7    sync::{LazyLock, Mutex, OnceLock},
8};
9
10use monero_serai::{block::Block, transaction::Transaction};
11use tokio::sync::{mpsc, oneshot};
12use tower::{Service, ServiceExt};
13
14use cuprate_blockchain::service::BlockchainReadHandle;
15use cuprate_consensus::transactions::new_tx_verification_data;
16use cuprate_txpool::service::{
17    interface::{TxpoolReadRequest, TxpoolReadResponse},
18    TxpoolReadHandle,
19};
20use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
21
22use crate::{
23    blockchain::manager::{BlockchainManagerCommand, IncomingBlockOk},
24    constants::PANIC_CRITICAL_SERVICE_ERROR,
25};
26
27/// The channel used to send [`BlockchainManagerCommand`]s to the blockchain manager.
28///
29/// This channel is initialized in [`init_blockchain_manager`](super::manager::init_blockchain_manager), the functions
30/// in this file document what happens if this is not initialized when they are called.
31pub(super) static COMMAND_TX: OnceLock<mpsc::Sender<BlockchainManagerCommand>> = OnceLock::new();
32
33/// An error that can be returned from [`handle_incoming_block`].
34#[derive(Debug, thiserror::Error)]
35pub enum IncomingBlockError {
36    /// Some transactions in the block were unknown.
37    ///
38    /// The inner values are the block hash and the indexes of the missing txs in the block.
39    #[error("Unknown transactions in block.")]
40    UnknownTransactions([u8; 32], Vec<usize>),
41    /// We are missing the block's parent.
42    #[error("The block has an unknown parent.")]
43    Orphan,
44    /// The block was invalid.
45    #[error(transparent)]
46    InvalidBlock(anyhow::Error),
47}
48
49/// Try to add a new block to the blockchain.
50///
51/// On success returns [`IncomingBlockOk`].
52///
53/// # Errors
54///
55/// This function will return an error if:
56///  - the block was invalid
57///  - we are missing transactions
58///  - the block's parent is unknown
59pub async fn handle_incoming_block(
60    block: Block,
61    mut given_txs: HashMap<[u8; 32], Transaction>,
62    blockchain_read_handle: &mut BlockchainReadHandle,
63    txpool_read_handle: &mut TxpoolReadHandle,
64) -> Result<IncomingBlockOk, IncomingBlockError> {
65    /// A [`HashSet`] of block hashes that the blockchain manager is currently handling.
66    ///
67    /// This lock prevents sending the same block to the blockchain manager from multiple connections
68    /// before one of them actually gets added to the chain, allowing peers to do other things.
69    ///
70    /// This is used over something like a dashmap as we expect a lot of collisions in a short amount of
71    /// time for new blocks, so we would lose the benefit of sharded locks. A dashmap is made up of `RwLocks`
72    /// which are also more expensive than `Mutex`s.
73    static BLOCKS_BEING_HANDLED: LazyLock<Mutex<HashSet<[u8; 32]>>> =
74        LazyLock::new(|| Mutex::new(HashSet::new()));
75
76    if given_txs.len() > block.transactions.len() {
77        return Err(IncomingBlockError::InvalidBlock(anyhow::anyhow!(
78            "Too many transactions given for block"
79        )));
80    }
81
82    if !block_exists(block.header.previous, blockchain_read_handle)
83        .await
84        .expect(PANIC_CRITICAL_SERVICE_ERROR)
85    {
86        return Err(IncomingBlockError::Orphan);
87    }
88
89    let block_hash = block.hash();
90
91    if block_exists(block_hash, blockchain_read_handle)
92        .await
93        .expect(PANIC_CRITICAL_SERVICE_ERROR)
94    {
95        return Ok(IncomingBlockOk::AlreadyHave);
96    }
97
98    let TxpoolReadResponse::TxsForBlock { mut txs, missing } = txpool_read_handle
99        .ready()
100        .await
101        .expect(PANIC_CRITICAL_SERVICE_ERROR)
102        .call(TxpoolReadRequest::TxsForBlock(block.transactions.clone()))
103        .await
104        .expect(PANIC_CRITICAL_SERVICE_ERROR)
105    else {
106        unreachable!()
107    };
108
109    if !missing.is_empty() {
110        let needed_hashes = missing.iter().map(|index| block.transactions[*index]);
111
112        for needed_hash in needed_hashes {
113            let Some(tx) = given_txs.remove(&needed_hash) else {
114                // We return back the indexes of all txs missing from our pool, not taking into account the txs
115                // that were given with the block, as these txs will be dropped. It is not worth it to try to add
116                // these txs to the pool as this will only happen with a misbehaving peer or if the txpool reaches
117                // the size limit.
118                return Err(IncomingBlockError::UnknownTransactions(block_hash, missing));
119            };
120
121            txs.insert(
122                needed_hash,
123                new_tx_verification_data(tx)
124                    .map_err(|e| IncomingBlockError::InvalidBlock(e.into()))?,
125            );
126        }
127    }
128
129    let Some(incoming_block_tx) = COMMAND_TX.get() else {
130        // We could still be starting up the blockchain manager.
131        return Ok(IncomingBlockOk::NotReady);
132    };
133
134    // Add the blocks hash to the blocks being handled.
135    if !BLOCKS_BEING_HANDLED.lock().unwrap().insert(block_hash) {
136        // If another place is already adding this block then we can stop.
137        return Ok(IncomingBlockOk::AlreadyHave);
138    }
139
140    // We must remove the block hash from `BLOCKS_BEING_HANDLED`.
141    let _guard = {
142        struct RemoveFromBlocksBeingHandled {
143            block_hash: [u8; 32],
144        }
145        impl Drop for RemoveFromBlocksBeingHandled {
146            fn drop(&mut self) {
147                BLOCKS_BEING_HANDLED
148                    .lock()
149                    .unwrap()
150                    .remove(&self.block_hash);
151            }
152        }
153        RemoveFromBlocksBeingHandled { block_hash }
154    };
155
156    let (response_tx, response_rx) = oneshot::channel();
157
158    incoming_block_tx
159        .send(BlockchainManagerCommand::AddBlock {
160            block,
161            prepped_txs: txs,
162            response_tx,
163        })
164        .await
165        .expect("TODO: don't actually panic here, an err means we are shutting down");
166
167    response_rx
168        .await
169        .expect("The blockchain manager will always respond")
170        .map_err(IncomingBlockError::InvalidBlock)
171}
172
173/// Check if we have a block with the given hash.
174async fn block_exists(
175    block_hash: [u8; 32],
176    blockchain_read_handle: &mut BlockchainReadHandle,
177) -> Result<bool, anyhow::Error> {
178    let BlockchainResponse::FindBlock(chain) = blockchain_read_handle
179        .ready()
180        .await?
181        .call(BlockchainReadRequest::FindBlock(block_hash))
182        .await?
183    else {
184        unreachable!();
185    };
186
187    Ok(chain.is_some())
188}