cuprate_consensus/block/
batch_prepare.rs1use std::{collections::HashMap, sync::Arc};
2
3use monero_serai::{block::Block, transaction::Transaction};
4use rayon::prelude::*;
5use tower::{Service, ServiceExt};
6use tracing::instrument;
7
8use cuprate_consensus_context::{rx_vms::RandomXVm, BlockchainContextService};
9use cuprate_consensus_rules::{
10 blocks::{check_block_pow, is_randomx_seed_height, randomx_seed_height, BlockError},
11 hard_forks::HardForkError,
12 miner_tx::MinerTxError,
13 ConsensusError, HardFork,
14};
15use cuprate_helper::asynch::rayon_spawn_async;
16use cuprate_types::{output_cache::OutputCache, TransactionVerificationData};
17
18use crate::{
19 batch_verifier::MultiThreadedBatchVerifier,
20 block::{free::order_transactions, PreparedBlock, PreparedBlockExPow},
21 transactions::{check_kis_unique, contextual_data::get_output_cache, start_tx_verification},
22 BlockChainContextRequest, BlockChainContextResponse, ExtendedConsensusError,
23 __private::Database,
24};
25
26pub struct BatchPrepareCache {
31 pub(crate) output_cache: OutputCache,
32 pub(crate) key_images_spent_checked: bool,
35}
36
37#[instrument(level = "debug", name = "batch_prep_blocks", skip_all, fields(amt = blocks.len()))]
39#[expect(clippy::type_complexity)]
40pub async fn batch_prepare_main_chain_blocks<D: Database>(
41 blocks: Vec<(Block, Vec<Transaction>)>,
42 context_svc: &mut BlockchainContextService,
43 mut database: D,
44) -> Result<
45 (
46 Vec<(PreparedBlock, Vec<TransactionVerificationData>)>,
47 BatchPrepareCache,
48 ),
49 ExtendedConsensusError,
50> {
51 let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip();
52
53 tracing::debug!("Calculating block hashes.");
54 let blocks: Vec<PreparedBlockExPow> = rayon_spawn_async(|| {
55 blocks
56 .into_iter()
57 .map(PreparedBlockExPow::new)
58 .collect::<Result<Vec<_>, _>>()
59 })
60 .await?;
61
62 let Some(last_block) = blocks.last() else {
63 return Err(ExtendedConsensusError::NoBlocksToVerify);
64 };
65
66 let top_hf_in_batch = last_block.hf_version;
69
70 let mut timestamps_hfs = Vec::with_capacity(blocks.len());
72 let mut new_rx_vm = None;
73
74 tracing::debug!("Checking blocks follow each other.");
75
76 for window in blocks.windows(2) {
78 let block_0 = &window[0];
79 let block_1 = &window[1];
80
81 if block_0.hf_version > top_hf_in_batch {
83 return Err(ConsensusError::Block(BlockError::HardForkError(
84 HardForkError::VersionIncorrect,
85 ))
86 .into());
87 }
88
89 if block_0.block_hash != block_1.block.header.previous
90 || block_0.height != block_1.height - 1
91 {
92 tracing::debug!("Blocks do not follow each other, verification failed.");
93 return Err(ConsensusError::Block(BlockError::PreviousIDIncorrect).into());
94 }
95
96 if is_randomx_seed_height(block_0.height) && top_hf_in_batch >= HardFork::V12 {
98 new_rx_vm = Some((block_0.height, block_0.block_hash));
99 }
100
101 timestamps_hfs.push((block_0.block.header.timestamp, block_0.hf_version));
102 }
103
104 let BlockChainContextResponse::BatchDifficulties(difficulties) = context_svc
106 .ready()
107 .await?
108 .call(BlockChainContextRequest::BatchGetDifficulties(
109 timestamps_hfs,
110 ))
111 .await?
112 else {
113 panic!("Context service returned wrong response!");
114 };
115
116 let context = context_svc.blockchain_context();
118
119 if context.chain_height != blocks[0].height {
122 tracing::debug!("Blocks do not follow main chain, verification failed.");
123
124 return Err(ConsensusError::Block(BlockError::MinerTxError(
125 MinerTxError::InputsHeightIncorrect,
126 ))
127 .into());
128 }
129
130 if context.top_hash != blocks[0].block.header.previous {
131 tracing::debug!("Blocks do not follow main chain, verification failed.");
132
133 return Err(ConsensusError::Block(BlockError::PreviousIDIncorrect).into());
134 }
135
136 let mut rx_vms = if top_hf_in_batch < HardFork::V12 {
137 HashMap::new()
138 } else {
139 let BlockChainContextResponse::RxVms(rx_vms) = context_svc
140 .ready()
141 .await?
142 .call(BlockChainContextRequest::CurrentRxVms)
143 .await?
144 else {
145 panic!("Blockchain context service returned wrong response!");
146 };
147
148 rx_vms
149 };
150
151 if let Some((new_vm_height, new_vm_seed)) = new_rx_vm {
153 tracing::debug!("New randomX seed in batch, initialising VM");
154
155 let new_vm = rayon_spawn_async(move || {
156 Arc::new(RandomXVm::new(&new_vm_seed).expect("RandomX VM gave an error on set up!"))
157 })
158 .await;
159
160 context_svc
162 .oneshot(BlockChainContextRequest::NewRXVM((
163 new_vm_seed,
164 Arc::clone(&new_vm),
165 )))
166 .await?;
167
168 rx_vms.insert(new_vm_height, new_vm);
169 }
170
171 tracing::debug!("Calculating PoW and prepping transaction");
172
173 let blocks = rayon_spawn_async(move || {
174 let batch_verifier = MultiThreadedBatchVerifier::new(rayon::current_num_threads());
175
176 let res = blocks
177 .into_par_iter()
178 .zip(difficulties)
179 .zip(txs)
180 .map(|((block, difficultly), txs)| {
181 let height = block.height;
183 let block = PreparedBlock::new_prepped(
184 block,
185 rx_vms.get(&randomx_seed_height(height)).map(AsRef::as_ref),
186 )?;
187
188 check_block_pow(&block.pow_hash, difficultly).map_err(ConsensusError::Block)?;
190
191 let mut txs = start_tx_verification()
192 .append_txs(txs)
193 .prepare()?
194 .only_semantic(block.hf_version)
195 .queue(&batch_verifier)?;
196
197 order_transactions(&block.block, &mut txs)?;
199
200 Ok((block, txs))
201 })
202 .collect::<Result<Vec<_>, ExtendedConsensusError>>()?;
203
204 if !batch_verifier.verify() {
205 return Err(ExtendedConsensusError::OneOrMoreBatchVerificationStatementsInvalid);
206 }
207
208 Ok(res)
209 })
210 .await?;
211
212 check_kis_unique(blocks.iter().flat_map(|(_, txs)| txs.iter()), &mut database).await?;
213
214 let output_cache =
215 get_output_cache(blocks.iter().flat_map(|(_, txs)| txs.iter()), database).await?;
216
217 Ok((
218 blocks,
219 BatchPrepareCache {
220 output_cache,
221 key_images_spent_checked: true,
222 },
223 ))
224}