cuprated/blockchain/manager/
handler.rs

1//! The blockchain manager handler functions.
2use std::{collections::HashMap, sync::Arc};
3
4use bytes::Bytes;
5use futures::{TryFutureExt, TryStreamExt};
6use monero_serai::{
7    block::Block,
8    transaction::{Input, Transaction},
9};
10use rayon::prelude::*;
11use tower::{Service, ServiceExt};
12use tracing::{info, instrument, Span};
13
14use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
15use cuprate_consensus::{
16    block::{
17        batch_prepare_main_chain_blocks, sanity_check_alt_block, verify_main_chain_block,
18        verify_prepped_main_chain_block, PreparedBlock,
19    },
20    transactions::new_tx_verification_data,
21    BlockChainContextRequest, BlockChainContextResponse, ExtendedConsensusError,
22};
23use cuprate_consensus_context::NewBlockData;
24use cuprate_fast_sync::{block_to_verified_block_information, fast_sync_stop_height};
25use cuprate_helper::cast::usize_to_u64;
26use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN, BroadcastRequest};
27use cuprate_txpool::service::interface::TxpoolWriteRequest;
28use cuprate_types::{
29    blockchain::{BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest},
30    AltBlockInformation, Chain, HardFork, TransactionVerificationData, VerifiedBlockInformation,
31};
32
33use crate::{
34    blockchain::manager::commands::{BlockchainManagerCommand, IncomingBlockOk},
35    constants::PANIC_CRITICAL_SERVICE_ERROR,
36    signals::REORG_LOCK,
37};
38
39impl super::BlockchainManager {
40    /// Handle an incoming command from another part of Cuprate.
41    ///
42    /// # Panics
43    ///
44    /// This function will panic if any internal service returns an unexpected error that we cannot
45    /// recover from.
46    pub async fn handle_command(&mut self, command: BlockchainManagerCommand) {
47        match command {
48            BlockchainManagerCommand::AddBlock {
49                block,
50                prepped_txs,
51                response_tx,
52            } => {
53                let res = self.handle_incoming_block(block, prepped_txs).await;
54
55                drop(response_tx.send(res));
56            }
57        }
58    }
59
60    /// Broadcast a valid block to the network.
61    async fn broadcast_block(&mut self, block_bytes: Bytes, blockchain_height: usize) {
62        self.broadcast_svc
63            .ready()
64            .await
65            .expect("Broadcast service is Infallible.")
66            .call(BroadcastRequest::Block {
67                block_bytes,
68                current_blockchain_height: usize_to_u64(blockchain_height),
69            })
70            .await
71            .expect("Broadcast service is Infallible.");
72    }
73
74    /// Handle an incoming [`Block`].
75    ///
76    /// This function will route to [`Self::handle_incoming_alt_block`] if the block does not follow
77    /// the top of the main chain.
78    ///
79    /// Otherwise, this function will validate and add the block to the main chain.
80    ///
81    /// # Panics
82    ///
83    /// This function will panic if any internal service returns an unexpected error that we cannot
84    /// recover from.
85    pub async fn handle_incoming_block(
86        &mut self,
87        block: Block,
88        prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
89    ) -> Result<IncomingBlockOk, anyhow::Error> {
90        if block.header.previous
91            != self
92                .blockchain_context_service
93                .blockchain_context()
94                .top_hash
95        {
96            self.handle_incoming_alt_block(block, prepared_txs).await?;
97            return Ok(IncomingBlockOk::AddedToAltChain);
98        }
99
100        let verified_block = verify_main_chain_block(
101            block,
102            prepared_txs,
103            &mut self.blockchain_context_service,
104            self.blockchain_read_handle.clone(),
105        )
106        .await?;
107
108        let block_blob = Bytes::copy_from_slice(&verified_block.block_blob);
109        self.add_valid_block_to_main_chain(verified_block).await;
110
111        let chain_height = self
112            .blockchain_context_service
113            .blockchain_context()
114            .chain_height;
115
116        self.broadcast_block(block_blob, chain_height).await;
117
118        Ok(IncomingBlockOk::AddedToMainChain)
119    }
120
121    /// Handle an incoming [`BlockBatch`].
122    ///
123    /// This function will route to [`Self::handle_incoming_block_batch_main_chain`] or [`Self::handle_incoming_block_batch_alt_chain`]
124    /// depending on if the first block in the batch follows from the top of our chain.
125    ///
126    /// # Panics
127    ///
128    /// This function will panic if the batch is empty or if any internal service returns an unexpected
129    /// error that we cannot recover from or if the incoming batch contains no blocks.
130    #[instrument(
131        name = "incoming_block_batch",
132        skip_all,
133        level = "info",
134        fields(
135            start_height = batch.blocks.first().unwrap().0.number().unwrap(),
136            len = batch.blocks.len()
137        )
138    )]
139    pub async fn handle_incoming_block_batch(&mut self, batch: BlockBatch) {
140        let (first_block, _) = batch
141            .blocks
142            .first()
143            .expect("Block batch should not be empty");
144
145        if first_block.header.previous
146            == self
147                .blockchain_context_service
148                .blockchain_context()
149                .top_hash
150        {
151            self.handle_incoming_block_batch_main_chain(batch).await;
152        } else {
153            self.handle_incoming_block_batch_alt_chain(batch).await;
154        }
155    }
156
157    /// Handles an incoming [`BlockBatch`] that follows the main chain.
158    ///
159    /// This function will handle validating the blocks in the batch and adding them to the blockchain
160    /// database and context cache.
161    ///
162    /// This function will also handle banning the peer and canceling the block downloader if the
163    /// block is invalid.
164    ///
165    /// # Panics
166    ///
167    /// This function will panic if any internal service returns an unexpected error that we cannot
168    /// recover from or if the incoming batch contains no blocks.
169    async fn handle_incoming_block_batch_main_chain(&mut self, batch: BlockBatch) {
170        if batch.blocks.last().unwrap().0.number().unwrap() < fast_sync_stop_height() {
171            self.handle_incoming_block_batch_fast_sync(batch).await;
172            return;
173        }
174
175        let Ok((prepped_blocks, mut output_cache)) = batch_prepare_main_chain_blocks(
176            batch.blocks,
177            &mut self.blockchain_context_service,
178            self.blockchain_read_handle.clone(),
179        )
180        .await
181        else {
182            batch.peer_handle.ban_peer(LONG_BAN);
183            self.stop_current_block_downloader.notify_one();
184            return;
185        };
186
187        for (block, txs) in prepped_blocks {
188            let Ok(verified_block) = verify_prepped_main_chain_block(
189                block,
190                txs,
191                &mut self.blockchain_context_service,
192                self.blockchain_read_handle.clone(),
193                Some(&mut output_cache),
194            )
195            .await
196            else {
197                batch.peer_handle.ban_peer(LONG_BAN);
198                self.stop_current_block_downloader.notify_one();
199                return;
200            };
201
202            self.add_valid_block_to_main_chain(verified_block).await;
203        }
204        info!(fast_sync = false, "Successfully added block batch");
205    }
206
207    /// Handles an incoming block batch while we are under the fast sync height.
208    ///
209    /// # Panics
210    ///
211    /// This function will panic if any internal service returns an unexpected error that we cannot
212    /// recover from.
213    async fn handle_incoming_block_batch_fast_sync(&mut self, batch: BlockBatch) {
214        let mut valid_blocks = Vec::with_capacity(batch.blocks.len());
215        for (block, txs) in batch.blocks {
216            let block = block_to_verified_block_information(
217                block,
218                txs,
219                self.blockchain_context_service.blockchain_context(),
220            );
221            self.add_valid_block_to_blockchain_cache(&block).await;
222
223            valid_blocks.push(block);
224        }
225
226        self.batch_add_valid_block_to_blockchain_database(valid_blocks)
227            .await;
228
229        info!(fast_sync = true, "Successfully added block batch");
230    }
231
232    /// Handles an incoming [`BlockBatch`] that does not follow the main-chain.
233    ///
234    /// This function will handle validating the alt-blocks to add them to our cache and reorging the
235    /// chain if the alt-chain has a higher cumulative difficulty.
236    ///
237    /// This function will also handle banning the peer and canceling the block downloader if the
238    /// alt block is invalid or if a reorg fails.
239    ///
240    /// # Panics
241    ///
242    /// This function will panic if any internal service returns an unexpected error that we cannot
243    /// recover from.
244    async fn handle_incoming_block_batch_alt_chain(&mut self, mut batch: BlockBatch) {
245        // TODO: this needs testing (this whole section does but alt-blocks specifically).
246        let mut blocks = batch.blocks.into_iter();
247
248        while let Some((block, txs)) = blocks.next() {
249            // async blocks work as try blocks.
250            let res = async {
251                let txs = txs
252                    .into_par_iter()
253                    .map(|tx| {
254                        let tx = new_tx_verification_data(tx)?;
255                        Ok((tx.tx_hash, tx))
256                    })
257                    .collect::<Result<_, anyhow::Error>>()?;
258
259                let reorged = self.handle_incoming_alt_block(block, txs).await?;
260
261                Ok::<_, anyhow::Error>(reorged)
262            }
263            .await;
264
265            match res {
266                Err(e) => {
267                    batch.peer_handle.ban_peer(LONG_BAN);
268                    self.stop_current_block_downloader.notify_one();
269                    return;
270                }
271                Ok(AddAltBlock::Reorged) => {
272                    // Collect the remaining blocks and add them to the main chain instead.
273                    batch.blocks = blocks.collect();
274                    self.handle_incoming_block_batch_main_chain(batch).await;
275                    return;
276                }
277                // continue adding alt blocks.
278                Ok(AddAltBlock::Cached) => (),
279            }
280        }
281
282        info!(alt_chain = true, "Successfully added block batch");
283    }
284
285    /// Handles an incoming alt [`Block`].
286    ///
287    /// This function will do some pre-validation of the alt block, then if the cumulative difficulty
288    /// of the alt chain is higher than the main chain it will attempt a reorg otherwise it will add
289    /// the alt block to the alt block cache.
290    ///
291    /// # Errors
292    ///
293    /// This will return an [`Err`] if:
294    ///  - The alt block was invalid.
295    ///  - An attempt to reorg the chain failed.
296    ///
297    /// # Panics
298    ///
299    /// This function will panic if any internal service returns an unexpected error that we cannot
300    /// recover from.
301    async fn handle_incoming_alt_block(
302        &mut self,
303        block: Block,
304        prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
305    ) -> Result<AddAltBlock, anyhow::Error> {
306        // Check if a block already exists.
307        let BlockchainResponse::FindBlock(chain) = self
308            .blockchain_read_handle
309            .ready()
310            .await
311            .expect(PANIC_CRITICAL_SERVICE_ERROR)
312            .call(BlockchainReadRequest::FindBlock(block.hash()))
313            .await
314            .expect(PANIC_CRITICAL_SERVICE_ERROR)
315        else {
316            unreachable!();
317        };
318
319        match chain {
320            Some((Chain::Alt(_), _)) => return Ok(AddAltBlock::Cached),
321            Some((Chain::Main, _)) => anyhow::bail!("Alt block already in main chain"),
322            None => (),
323        }
324
325        let alt_block_info =
326            sanity_check_alt_block(block, prepared_txs, self.blockchain_context_service.clone())
327                .await?;
328
329        // If this alt chain has more cumulative difficulty, reorg.
330        if alt_block_info.cumulative_difficulty
331            > self
332                .blockchain_context_service
333                .blockchain_context()
334                .cumulative_difficulty
335        {
336            self.try_do_reorg(alt_block_info).await?;
337            return Ok(AddAltBlock::Reorged);
338        }
339
340        self.blockchain_write_handle
341            .ready()
342            .await
343            .expect(PANIC_CRITICAL_SERVICE_ERROR)
344            .call(BlockchainWriteRequest::WriteAltBlock(alt_block_info))
345            .await?;
346
347        Ok(AddAltBlock::Cached)
348    }
349
350    /// Attempt a re-org with the given top block of the alt-chain.
351    ///
352    /// This function will take a write lock on [`REORG_LOCK`] and then set up the blockchain database
353    /// and context cache to verify the alt-chain. It will then attempt to verify and add each block
354    /// in the alt-chain to the main-chain. Releasing the lock on [`REORG_LOCK`] when finished.
355    ///
356    /// # Errors
357    ///
358    /// This function will return an [`Err`] if the re-org was unsuccessful, if this happens the chain
359    /// will be returned back into its state it was at when then function was called.
360    ///
361    /// # Panics
362    ///
363    /// This function will panic if any internal service returns an unexpected error that we cannot
364    /// recover from.
365    async fn try_do_reorg(
366        &mut self,
367        top_alt_block: AltBlockInformation,
368    ) -> Result<(), anyhow::Error> {
369        let _guard = REORG_LOCK.write().await;
370
371        let BlockchainResponse::AltBlocksInChain(mut alt_blocks) = self
372            .blockchain_read_handle
373            .ready()
374            .await
375            .expect(PANIC_CRITICAL_SERVICE_ERROR)
376            .call(BlockchainReadRequest::AltBlocksInChain(
377                top_alt_block.chain_id,
378            ))
379            .await
380            .map_err(|e| anyhow::anyhow!(e))?
381        else {
382            unreachable!();
383        };
384
385        alt_blocks.push(top_alt_block);
386
387        let split_height = alt_blocks[0].height;
388        let current_main_chain_height = self
389            .blockchain_context_service
390            .blockchain_context()
391            .chain_height;
392
393        let BlockchainResponse::PopBlocks(old_main_chain_id) = self
394            .blockchain_write_handle
395            .ready()
396            .await
397            .expect(PANIC_CRITICAL_SERVICE_ERROR)
398            .call(BlockchainWriteRequest::PopBlocks(
399                current_main_chain_height - split_height,
400            ))
401            .await
402            .expect(PANIC_CRITICAL_SERVICE_ERROR)
403        else {
404            unreachable!();
405        };
406
407        self.blockchain_context_service
408            .ready()
409            .await
410            .expect(PANIC_CRITICAL_SERVICE_ERROR)
411            .call(BlockChainContextRequest::PopBlocks {
412                numb_blocks: current_main_chain_height - split_height,
413            })
414            .await
415            .expect(PANIC_CRITICAL_SERVICE_ERROR);
416
417        let reorg_res = self.verify_add_alt_blocks_to_main_chain(alt_blocks).await;
418
419        match reorg_res {
420            Ok(()) => Ok(()),
421            Err(e) => {
422                todo!("Reverse reorg")
423            }
424        }
425    }
426
427    /// Verify and add a list of [`AltBlockInformation`]s to the main-chain.
428    ///
429    /// This function assumes the first [`AltBlockInformation`] is the next block in the blockchain
430    /// for the blockchain database and the context cache, or in other words that the blockchain database
431    /// and context cache have already had the top blocks popped to where the alt-chain meets the main-chain.
432    ///
433    /// # Errors
434    ///
435    /// This function will return an [`Err`] if the alt-blocks were invalid, in this case the re-org should
436    /// be aborted and the chain should be returned to its previous state.
437    ///
438    /// # Panics
439    ///
440    /// This function will panic if any internal service returns an unexpected error that we cannot
441    /// recover from.
442    async fn verify_add_alt_blocks_to_main_chain(
443        &mut self,
444        alt_blocks: Vec<AltBlockInformation>,
445    ) -> Result<(), anyhow::Error> {
446        for mut alt_block in alt_blocks {
447            let prepped_txs = alt_block
448                .txs
449                .drain(..)
450                .map(|tx| Ok(tx.try_into()?))
451                .collect::<Result<_, anyhow::Error>>()?;
452
453            let prepped_block = PreparedBlock::new_alt_block(alt_block)?;
454
455            let verified_block = verify_prepped_main_chain_block(
456                prepped_block,
457                prepped_txs,
458                &mut self.blockchain_context_service,
459                self.blockchain_read_handle.clone(),
460                None,
461            )
462            .await?;
463
464            self.add_valid_block_to_main_chain(verified_block).await;
465        }
466
467        Ok(())
468    }
469
470    /// Adds a [`VerifiedBlockInformation`] to the main-chain.
471    ///
472    /// This function will update the blockchain database and the context cache.
473    ///
474    /// # Panics
475    ///
476    /// This function will panic if any internal service returns an unexpected error that we cannot
477    /// recover from.
478    pub async fn add_valid_block_to_main_chain(
479        &mut self,
480        verified_block: VerifiedBlockInformation,
481    ) {
482        // FIXME: this is pretty inefficient, we should probably return the KI map created in the consensus crate.
483        let spent_key_images = verified_block
484            .txs
485            .iter()
486            .flat_map(|tx| {
487                tx.tx.prefix().inputs.iter().map(|input| match input {
488                    Input::ToKey { key_image, .. } => key_image.0,
489                    Input::Gen(_) => unreachable!(),
490                })
491            })
492            .collect::<Vec<[u8; 32]>>();
493
494        self.add_valid_block_to_blockchain_cache(&verified_block)
495            .await;
496
497        self.blockchain_write_handle
498            .ready()
499            .await
500            .expect(PANIC_CRITICAL_SERVICE_ERROR)
501            .call(BlockchainWriteRequest::WriteBlock(verified_block))
502            .await
503            .expect(PANIC_CRITICAL_SERVICE_ERROR);
504
505        self.txpool_write_handle
506            .ready()
507            .await
508            .expect(PANIC_CRITICAL_SERVICE_ERROR)
509            .call(TxpoolWriteRequest::NewBlock { spent_key_images })
510            .await
511            .expect(PANIC_CRITICAL_SERVICE_ERROR);
512    }
513
514    /// Adds a [`VerifiedBlockInformation`] to the blockchain context cache.
515    ///
516    /// # Panics
517    ///
518    /// This function will panic if any internal service returns an unexpected error that we cannot
519    /// recover from.
520    async fn add_valid_block_to_blockchain_cache(
521        &mut self,
522        verified_block: &VerifiedBlockInformation,
523    ) {
524        self.blockchain_context_service
525            .ready()
526            .await
527            .expect(PANIC_CRITICAL_SERVICE_ERROR)
528            .call(BlockChainContextRequest::Update(NewBlockData {
529                block_hash: verified_block.block_hash,
530                height: verified_block.height,
531                timestamp: verified_block.block.header.timestamp,
532                weight: verified_block.weight,
533                long_term_weight: verified_block.long_term_weight,
534                generated_coins: verified_block.generated_coins,
535                vote: HardFork::from_vote(verified_block.block.header.hardfork_signal),
536                cumulative_difficulty: verified_block.cumulative_difficulty,
537            }))
538            .await
539            .expect(PANIC_CRITICAL_SERVICE_ERROR);
540    }
541
542    /// Batch writes the [`VerifiedBlockInformation`]s to the database.
543    ///
544    /// The blocks must be sequential.
545    ///
546    /// # Panics
547    ///
548    /// This function will panic if any internal service returns an unexpected error that we cannot
549    /// recover from.
550    async fn batch_add_valid_block_to_blockchain_database(
551        &mut self,
552        blocks: Vec<VerifiedBlockInformation>,
553    ) {
554        self.blockchain_write_handle
555            .ready()
556            .await
557            .expect(PANIC_CRITICAL_SERVICE_ERROR)
558            .call(BlockchainWriteRequest::BatchWriteBlocks(blocks))
559            .await
560            .expect(PANIC_CRITICAL_SERVICE_ERROR);
561    }
562}
563
/// The result from successfully adding an alt-block.
///
/// The derives allow a `Result<AddAltBlock, _>` to be debug-printed, logged and compared.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AddAltBlock {
    /// The alt-block was cached.
    Cached,
    /// The chain was reorged.
    Reorged,
}