cuprated/blockchain/
interface.rs1use std::{
6 collections::{HashMap, HashSet},
7 sync::{LazyLock, Mutex, OnceLock},
8};
9
10use monero_serai::{block::Block, transaction::Transaction};
11use tokio::sync::{mpsc, oneshot};
12use tower::{Service, ServiceExt};
13
14use cuprate_blockchain::service::BlockchainReadHandle;
15use cuprate_consensus::transactions::new_tx_verification_data;
16use cuprate_txpool::service::{
17 interface::{TxpoolReadRequest, TxpoolReadResponse},
18 TxpoolReadHandle,
19};
20use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
21
22use crate::{
23 blockchain::manager::{BlockchainManagerCommand, IncomingBlockOk},
24 constants::PANIC_CRITICAL_SERVICE_ERROR,
25};
26
27pub(super) static COMMAND_TX: OnceLock<mpsc::Sender<BlockchainManagerCommand>> = OnceLock::new();
32
33#[derive(Debug, thiserror::Error)]
35pub enum IncomingBlockError {
36 #[error("Unknown transactions in block.")]
40 UnknownTransactions([u8; 32], Vec<usize>),
41 #[error("The block has an unknown parent.")]
43 Orphan,
44 #[error(transparent)]
46 InvalidBlock(anyhow::Error),
47}
48
49pub async fn handle_incoming_block(
60 block: Block,
61 mut given_txs: HashMap<[u8; 32], Transaction>,
62 blockchain_read_handle: &mut BlockchainReadHandle,
63 txpool_read_handle: &mut TxpoolReadHandle,
64) -> Result<IncomingBlockOk, IncomingBlockError> {
65 static BLOCKS_BEING_HANDLED: LazyLock<Mutex<HashSet<[u8; 32]>>> =
74 LazyLock::new(|| Mutex::new(HashSet::new()));
75
76 if given_txs.len() > block.transactions.len() {
77 return Err(IncomingBlockError::InvalidBlock(anyhow::anyhow!(
78 "Too many transactions given for block"
79 )));
80 }
81
82 if !block_exists(block.header.previous, blockchain_read_handle)
83 .await
84 .expect(PANIC_CRITICAL_SERVICE_ERROR)
85 {
86 return Err(IncomingBlockError::Orphan);
87 }
88
89 let block_hash = block.hash();
90
91 if block_exists(block_hash, blockchain_read_handle)
92 .await
93 .expect(PANIC_CRITICAL_SERVICE_ERROR)
94 {
95 return Ok(IncomingBlockOk::AlreadyHave);
96 }
97
98 let TxpoolReadResponse::TxsForBlock { mut txs, missing } = txpool_read_handle
99 .ready()
100 .await
101 .expect(PANIC_CRITICAL_SERVICE_ERROR)
102 .call(TxpoolReadRequest::TxsForBlock(block.transactions.clone()))
103 .await
104 .expect(PANIC_CRITICAL_SERVICE_ERROR)
105 else {
106 unreachable!()
107 };
108
109 if !missing.is_empty() {
110 let needed_hashes = missing.iter().map(|index| block.transactions[*index]);
111
112 for needed_hash in needed_hashes {
113 let Some(tx) = given_txs.remove(&needed_hash) else {
114 return Err(IncomingBlockError::UnknownTransactions(block_hash, missing));
119 };
120
121 txs.insert(
122 needed_hash,
123 new_tx_verification_data(tx)
124 .map_err(|e| IncomingBlockError::InvalidBlock(e.into()))?,
125 );
126 }
127 }
128
129 let Some(incoming_block_tx) = COMMAND_TX.get() else {
130 return Ok(IncomingBlockOk::NotReady);
132 };
133
134 if !BLOCKS_BEING_HANDLED.lock().unwrap().insert(block_hash) {
136 return Ok(IncomingBlockOk::AlreadyHave);
138 }
139
140 let _guard = {
142 struct RemoveFromBlocksBeingHandled {
143 block_hash: [u8; 32],
144 }
145 impl Drop for RemoveFromBlocksBeingHandled {
146 fn drop(&mut self) {
147 BLOCKS_BEING_HANDLED
148 .lock()
149 .unwrap()
150 .remove(&self.block_hash);
151 }
152 }
153 RemoveFromBlocksBeingHandled { block_hash }
154 };
155
156 let (response_tx, response_rx) = oneshot::channel();
157
158 incoming_block_tx
159 .send(BlockchainManagerCommand::AddBlock {
160 block,
161 prepped_txs: txs,
162 response_tx,
163 })
164 .await
165 .expect("TODO: don't actually panic here, an err means we are shutting down");
166
167 response_rx
168 .await
169 .expect("The blockchain manager will always respond")
170 .map_err(IncomingBlockError::InvalidBlock)
171}
172
173async fn block_exists(
175 block_hash: [u8; 32],
176 blockchain_read_handle: &mut BlockchainReadHandle,
177) -> Result<bool, anyhow::Error> {
178 let BlockchainResponse::FindBlock(chain) = blockchain_read_handle
179 .ready()
180 .await?
181 .call(BlockchainReadRequest::FindBlock(block_hash))
182 .await?
183 else {
184 unreachable!();
185 };
186
187 Ok(chain.is_some())
188}