cuprate_consensus/block/batch_prepare.rs

use std::{collections::HashMap, sync::Arc};

use monero_serai::{block::Block, transaction::Transaction};
use rayon::prelude::*;
use tower::{Service, ServiceExt};
use tracing::instrument;

use cuprate_consensus_context::{rx_vms::RandomXVm, BlockchainContextService};
use cuprate_consensus_rules::{
    blocks::{check_block_pow, is_randomx_seed_height, randomx_seed_height, BlockError},
    hard_forks::HardForkError,
    miner_tx::MinerTxError,
    ConsensusError, HardFork,
};
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_types::{output_cache::OutputCache, TransactionVerificationData};

use crate::{
    batch_verifier::MultiThreadedBatchVerifier,
    block::{free::order_transactions, PreparedBlock, PreparedBlockExPow},
    transactions::{check_kis_unique, contextual_data::get_output_cache, start_tx_verification},
    BlockChainContextRequest, BlockChainContextResponse, ExtendedConsensusError,
    __private::Database,
};

/// Cached state created when batch preparing a group of blocks.
///
/// This cache is only valid for the set of blocks it was created with; it should not be used for
/// other blocks.
pub struct BatchPrepareCache {
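    /// A cache of the outputs needed to verify the transactions in the batch.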
    pub(crate) output_cache: OutputCache,
    /// [`true`] if all the key images in the batch have been checked for double spends within the
    /// batch and against the whole chain.
    pub(crate) key_images_spent_checked: bool,
}

/// Batch prepares a list of blocks for verification.
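///
/// On success, the prepared blocks are returned in the order they were passed in, each paired with
/// its prepared transactions, alongside a [`BatchPrepareCache`] shared by the whole batch.
///
/// A minimal usage sketch (not compiled as a doctest); it assumes `blocks`, `context_svc` and
/// `database` have already been set up elsewhere and that the blocks are consecutive:
///
/// ```ignore
/// let (prepped, cache) =
///     batch_prepare_main_chain_blocks(blocks, &mut context_svc, database).await?;
/// ```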
#[instrument(level = "debug", name = "batch_prep_blocks", skip_all, fields(amt = blocks.len()))]
#[expect(clippy::type_complexity)]
pub async fn batch_prepare_main_chain_blocks<D: Database>(
    blocks: Vec<(Block, Vec<Transaction>)>,
    context_svc: &mut BlockchainContextService,
    mut database: D,
) -> Result<
    (
        Vec<(PreparedBlock, Vec<TransactionVerificationData>)>,
        BatchPrepareCache,
    ),
    ExtendedConsensusError,
> {
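    // Split the blocks from their transactions so each part can be prepared separately.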
    let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip();

    tracing::debug!("Calculating block hashes.");
    let blocks: Vec<PreparedBlockExPow> = rayon_spawn_async(|| {
        blocks
            .into_iter()
            .map(PreparedBlockExPow::new)
            .collect::<Result<Vec<_>, _>>()
    })
    .await?;

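    // An empty batch is an error: there is nothing to verify and no last block to read the
    // top hard fork from.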
    let Some(last_block) = blocks.last() else {
        return Err(ExtendedConsensusError::NoBlocksToVerify);
    };

    // Hard forks cannot be reversed, so the last block will contain the highest hard fork (provided the
    // batch is valid).
    let top_hf_in_batch = last_block.hf_version;

    // A Vec of (timestamp, HF) for each block, used to calculate each block's expected difficulty.
    let mut timestamps_hfs = Vec::with_capacity(blocks.len());
    let mut new_rx_vm = None;

    tracing::debug!("Checking blocks follow each other.");

    // For every block, make sure it has the correct height and previous ID.
    for window in blocks.windows(2) {
        let block_0 = &window[0];
        let block_1 = &window[1];

        // Make sure no blocks in the batch have a higher hard fork than the last block.
        if block_0.hf_version > top_hf_in_batch {
            return Err(ConsensusError::Block(BlockError::HardForkError(
                HardForkError::VersionIncorrect,
            ))
            .into());
        }

        if block_0.block_hash != block_1.block.header.previous
            || block_0.height != block_1.height - 1
        {
            tracing::debug!("Blocks do not follow each other, verification failed.");
            return Err(ConsensusError::Block(BlockError::PreviousIDIncorrect).into());
        }

        // Cache any potential RX VM seeds as we may need them for future blocks in the batch.
        if is_randomx_seed_height(block_0.height) && top_hf_in_batch >= HardFork::V12 {
            new_rx_vm = Some((block_0.height, block_0.block_hash));
        }

        timestamps_hfs.push((block_0.block.header.timestamp, block_0.hf_version));
    }

    // Calculate the expected difficulties for each block in the batch.
    let BlockChainContextResponse::BatchDifficulties(difficulties) = context_svc
        .ready()
        .await?
        .call(BlockChainContextRequest::BatchGetDifficulties(
            timestamps_hfs,
        ))
        .await?
    else {
        panic!("Context service returned wrong response!");
    };

    // Get the current blockchain context.
    let context = context_svc.blockchain_context();

    // Make sure the blocks follow the main chain.

    if context.chain_height != blocks[0].height {
        tracing::debug!("Blocks do not follow main chain, verification failed.");

        return Err(ConsensusError::Block(BlockError::MinerTxError(
            MinerTxError::InputsHeightIncorrect,
        ))
        .into());
    }

    if context.top_hash != blocks[0].block.header.previous {
        tracing::debug!("Blocks do not follow main chain, verification failed.");

        return Err(ConsensusError::Block(BlockError::PreviousIDIncorrect).into());
    }

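    // RandomX is only used for proof-of-work from hard fork 12 onwards, so earlier batches need no VMs.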
    let mut rx_vms = if top_hf_in_batch < HardFork::V12 {
        HashMap::new()
    } else {
        let BlockChainContextResponse::RxVms(rx_vms) = context_svc
            .ready()
            .await?
            .call(BlockChainContextRequest::CurrentRxVms)
            .await?
        else {
            panic!("Blockchain context service returned wrong response!");
        };

        rx_vms
    };

    // If we have an RX seed in the batch, initialise a new VM from it.
    if let Some((new_vm_height, new_vm_seed)) = new_rx_vm {
        tracing::debug!("New RandomX seed in batch, initialising VM");

        let new_vm = rayon_spawn_async(move || {
            Arc::new(RandomXVm::new(&new_vm_seed).expect("RandomX VM gave an error on set up!"))
        })
        .await;

        // Give the new VM to the context service, so it can cache it.
        context_svc
            .oneshot(BlockChainContextRequest::NewRXVM((
                new_vm_seed,
                Arc::clone(&new_vm),
            )))
            .await?;

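        // Also keep the new VM in the local map so later blocks in this batch can use it for PoW.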
        rx_vms.insert(new_vm_height, new_vm);
    }

    tracing::debug!("Calculating PoW and prepping transactions");

    let blocks = rayon_spawn_async(move || {
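        // Collects statements from every transaction's semantic checks so they can all be
        // verified together once at the end.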
        let batch_verifier = MultiThreadedBatchVerifier::new(rayon::current_num_threads());

        let res = blocks
            .into_par_iter()
            .zip(difficulties)
            .zip(txs)
            .map(|((block, difficulty), txs)| {
                // Calculate the PoW for the block.
                let height = block.height;
                let block = PreparedBlock::new_prepped(
                    block,
                    rx_vms.get(&randomx_seed_height(height)).map(AsRef::as_ref),
                )?;

                // Check the PoW.
                check_block_pow(&block.pow_hash, difficulty).map_err(ConsensusError::Block)?;

                let mut txs = start_tx_verification()
                    .append_txs(txs)
                    .prepare()?
                    .only_semantic(block.hf_version)
                    .queue(&batch_verifier)?;

                // Order the txs correctly.
                order_transactions(&block.block, &mut txs)?;

                Ok((block, txs))
            })
            .collect::<Result<Vec<_>, ExtendedConsensusError>>()?;

        if !batch_verifier.verify() {
            return Err(ExtendedConsensusError::OneOrMoreBatchVerificationStatementsInvalid);
        }

        Ok(res)
    })
    .await?;

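    // Check that no key image is spent twice, either within the batch itself or already on-chain.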
    check_kis_unique(blocks.iter().flat_map(|(_, txs)| txs.iter()), &mut database).await?;

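    // Fetch the outputs referenced by the batch's transactions up front and cache them for the
    // later verification steps.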
    let output_cache =
        get_output_cache(blocks.iter().flat_map(|(_, txs)| txs.iter()), database).await?;

    Ok((
        blocks,
        BatchPrepareCache {
            output_cache,
            key_images_spent_checked: true,
        },
    ))
}