cuprated/blockchain/manager/
handler.rs

1//! The blockchain manager handler functions.
2use std::{collections::HashMap, sync::Arc};
3
4use bytes::Bytes;
5use futures::{TryFutureExt, TryStreamExt};
6use monero_serai::{
7    block::Block,
8    transaction::{Input, Transaction},
9};
10use rayon::prelude::*;
11use tower::{Service, ServiceExt};
12use tracing::{info, instrument, warn, Span};
13
14use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
15use cuprate_consensus::{
16    block::{
17        batch_prepare_main_chain_blocks, sanity_check_alt_block, verify_main_chain_block,
18        verify_prepped_main_chain_block, PreparedBlock,
19    },
20    transactions::new_tx_verification_data,
21    BlockChainContextRequest, BlockChainContextResponse, ExtendedConsensusError,
22};
23use cuprate_consensus_context::{BlockchainContext, NewBlockData};
24use cuprate_fast_sync::{block_to_verified_block_information, fast_sync_stop_height};
25use cuprate_helper::cast::usize_to_u64;
26use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN, BroadcastRequest};
27use cuprate_txpool::service::interface::TxpoolWriteRequest;
28use cuprate_types::{
29    blockchain::{BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest},
30    AltBlockInformation, Chain, ChainId, HardFork, TransactionVerificationData,
31    VerifiedBlockInformation, VerifiedTransactionInformation,
32};
33
34use crate::{
35    blockchain::manager::commands::{BlockchainManagerCommand, IncomingBlockOk},
36    constants::PANIC_CRITICAL_SERVICE_ERROR,
37    signals::REORG_LOCK,
38};
39
40impl super::BlockchainManager {
41    /// Handle an incoming command from another part of Cuprate.
42    ///
43    /// # Panics
44    ///
45    /// This function will panic if any internal service returns an unexpected error that we cannot
46    /// recover from.
47    pub async fn handle_command(&mut self, command: BlockchainManagerCommand) {
48        match command {
49            BlockchainManagerCommand::AddBlock {
50                block,
51                prepped_txs,
52                response_tx,
53            } => {
54                let res = self.handle_incoming_block(block, prepped_txs).await;
55
56                drop(response_tx.send(res));
57            }
58        }
59    }
60
61    /// Broadcast a valid block to the network.
62    async fn broadcast_block(&mut self, block_bytes: Bytes, blockchain_height: usize) {
63        self.broadcast_svc
64            .ready()
65            .await
66            .expect("Broadcast service is Infallible.")
67            .call(BroadcastRequest::Block {
68                block_bytes,
69                current_blockchain_height: usize_to_u64(blockchain_height),
70            })
71            .await
72            .expect("Broadcast service is Infallible.");
73    }
74
75    /// Handle an incoming [`Block`].
76    ///
77    /// This function will route to [`Self::handle_incoming_alt_block`] if the block does not follow
78    /// the top of the main chain.
79    ///
80    /// Otherwise, this function will validate and add the block to the main chain.
81    ///
82    /// # Panics
83    ///
84    /// This function will panic if any internal service returns an unexpected error that we cannot
85    /// recover from.
86    #[instrument(
87        name = "incoming_block",
88        skip_all,
89        level = "info",
90        fields(
91            height = block.number().unwrap(),
92            txs = block.transactions.len(),
93        )
94    )]
95    pub async fn handle_incoming_block(
96        &mut self,
97        block: Block,
98        prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
99    ) -> Result<IncomingBlockOk, anyhow::Error> {
100        if block.header.previous
101            != self
102                .blockchain_context_service
103                .blockchain_context()
104                .top_hash
105        {
106            let block_hash = block.hash();
107            let res = self.handle_incoming_alt_block(block, prepared_txs).await?;
108
109            if matches!(res, AddAltBlock::Cached(true)) {
110                info!(
111                    alt_block = true,
112                    hash = hex::encode(block_hash),
113                    "Successfully added block"
114                );
115            }
116
117            return Ok(IncomingBlockOk::AddedToAltChain);
118        }
119
120        let verified_block = verify_main_chain_block(
121            block,
122            prepared_txs,
123            &mut self.blockchain_context_service,
124            self.blockchain_read_handle.clone(),
125        )
126        .await?;
127
128        let block_blob = Bytes::copy_from_slice(&verified_block.block_blob);
129        self.add_valid_block_to_main_chain(verified_block).await;
130
131        let chain_height = self
132            .blockchain_context_service
133            .blockchain_context()
134            .chain_height;
135
136        self.broadcast_block(block_blob, chain_height).await;
137
138        info!(
139            hash = hex::encode(
140                self.blockchain_context_service
141                    .blockchain_context()
142                    .top_hash
143            ),
144            "Successfully added block"
145        );
146
147        Ok(IncomingBlockOk::AddedToMainChain)
148    }
149
150    /// Handle an incoming [`BlockBatch`].
151    ///
152    /// This function will route to [`Self::handle_incoming_block_batch_main_chain`] or [`Self::handle_incoming_block_batch_alt_chain`]
153    /// depending on if the first block in the batch follows from the top of our chain.
154    ///
155    /// # Panics
156    ///
157    /// This function will panic if the batch is empty or if any internal service returns an unexpected
158    /// error that we cannot recover from or if the incoming batch contains no blocks.
159    #[instrument(
160        name = "incoming_block_batch",
161        skip_all,
162        level = "info",
163        fields(
164            start_height = batch.blocks.first().unwrap().0.number().unwrap(),
165            len = batch.blocks.len()
166        )
167    )]
168    pub async fn handle_incoming_block_batch(&mut self, batch: BlockBatch) {
169        let (first_block, _) = batch
170            .blocks
171            .first()
172            .expect("Block batch should not be empty");
173
174        if first_block.header.previous
175            == self
176                .blockchain_context_service
177                .blockchain_context()
178                .top_hash
179        {
180            self.handle_incoming_block_batch_main_chain(batch).await;
181        } else {
182            self.handle_incoming_block_batch_alt_chain(batch).await;
183        }
184    }
185
186    /// Handles an incoming [`BlockBatch`] that follows the main chain.
187    ///
188    /// This function will handle validating the blocks in the batch and adding them to the blockchain
189    /// database and context cache.
190    ///
191    /// This function will also handle banning the peer and canceling the block downloader if the
192    /// block is invalid.
193    ///
194    /// # Panics
195    ///
196    /// This function will panic if any internal service returns an unexpected error that we cannot
197    /// recover from or if the incoming batch contains no blocks.
198    async fn handle_incoming_block_batch_main_chain(&mut self, batch: BlockBatch) {
199        if batch.blocks.last().unwrap().0.number().unwrap() < fast_sync_stop_height() {
200            self.handle_incoming_block_batch_fast_sync(batch).await;
201            return;
202        }
203
204        let Ok((prepped_blocks, mut output_cache)) = batch_prepare_main_chain_blocks(
205            batch.blocks,
206            &mut self.blockchain_context_service,
207            self.blockchain_read_handle.clone(),
208        )
209        .await
210        else {
211            batch.peer_handle.ban_peer(LONG_BAN);
212            self.stop_current_block_downloader.notify_one();
213            return;
214        };
215
216        for (block, txs) in prepped_blocks {
217            let Ok(verified_block) = verify_prepped_main_chain_block(
218                block,
219                txs,
220                &mut self.blockchain_context_service,
221                self.blockchain_read_handle.clone(),
222                Some(&mut output_cache),
223            )
224            .await
225            else {
226                batch.peer_handle.ban_peer(LONG_BAN);
227                self.stop_current_block_downloader.notify_one();
228                return;
229            };
230
231            self.add_valid_block_to_main_chain(verified_block).await;
232        }
233        info!(fast_sync = false, "Successfully added block batch");
234    }
235
236    /// Handles an incoming block batch while we are under the fast sync height.
237    ///
238    /// # Panics
239    ///
240    /// This function will panic if any internal service returns an unexpected error that we cannot
241    /// recover from.
242    async fn handle_incoming_block_batch_fast_sync(&mut self, batch: BlockBatch) {
243        let mut valid_blocks = Vec::with_capacity(batch.blocks.len());
244        for (block, txs) in batch.blocks {
245            let block = block_to_verified_block_information(
246                block,
247                txs,
248                self.blockchain_context_service.blockchain_context(),
249            );
250            self.add_valid_block_to_blockchain_cache(&block).await;
251
252            valid_blocks.push(block);
253        }
254
255        self.batch_add_valid_block_to_blockchain_database(valid_blocks)
256            .await;
257
258        info!(fast_sync = true, "Successfully added block batch");
259    }
260
261    /// Handles an incoming [`BlockBatch`] that does not follow the main-chain.
262    ///
263    /// This function will handle validating the alt-blocks to add them to our cache and reorging the
264    /// chain if the alt-chain has a higher cumulative difficulty.
265    ///
266    /// This function will also handle banning the peer and canceling the block downloader if the
267    /// alt block is invalid or if a reorg fails.
268    ///
269    /// # Panics
270    ///
271    /// This function will panic if any internal service returns an unexpected error that we cannot
272    /// recover from.
273    async fn handle_incoming_block_batch_alt_chain(&mut self, mut batch: BlockBatch) {
274        let mut blocks = batch.blocks.into_iter();
275
276        while let Some((block, txs)) = blocks.next() {
277            // async blocks work as try blocks.
278            let res = async {
279                let txs = txs
280                    .into_par_iter()
281                    .map(|tx| {
282                        let tx = new_tx_verification_data(tx)?;
283                        Ok((tx.tx_hash, tx))
284                    })
285                    .collect::<Result<_, anyhow::Error>>()?;
286
287                let reorged = self.handle_incoming_alt_block(block, txs).await?;
288
289                Ok::<_, anyhow::Error>(reorged)
290            }
291            .await;
292
293            match res {
294                Err(e) => {
295                    batch.peer_handle.ban_peer(LONG_BAN);
296                    self.stop_current_block_downloader.notify_one();
297                    return;
298                }
299                Ok(AddAltBlock::Reorged) => {
300                    // Collect the remaining blocks and add them to the main chain instead.
301                    batch.blocks = blocks.collect();
302
303                    if batch.blocks.is_empty() {
304                        return;
305                    }
306
307                    self.handle_incoming_block_batch_main_chain(batch).await;
308                    return;
309                }
310                // continue adding alt blocks.
311                Ok(AddAltBlock::Cached(_)) => (),
312            }
313        }
314
315        info!(alt_chain = true, "Successfully added block batch");
316    }
317
318    /// Handles an incoming alt [`Block`].
319    ///
320    /// This function will do some pre-validation of the alt block, then if the cumulative difficulty
321    /// of the alt chain is higher than the main chain it will attempt a reorg otherwise it will add
322    /// the alt block to the alt block cache.
323    ///
324    /// # Errors
325    ///
326    /// This will return an [`Err`] if:
327    ///  - The alt block was invalid.
328    ///  - An attempt to reorg the chain failed.
329    ///
330    /// # Panics
331    ///
332    /// This function will panic if any internal service returns an unexpected error that we cannot
333    /// recover from.
334    async fn handle_incoming_alt_block(
335        &mut self,
336        block: Block,
337        prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
338    ) -> Result<AddAltBlock, anyhow::Error> {
339        // Check if a block already exists.
340        let BlockchainResponse::FindBlock(chain) = self
341            .blockchain_read_handle
342            .ready()
343            .await
344            .expect(PANIC_CRITICAL_SERVICE_ERROR)
345            .call(BlockchainReadRequest::FindBlock(block.hash()))
346            .await
347            .expect(PANIC_CRITICAL_SERVICE_ERROR)
348        else {
349            unreachable!();
350        };
351
352        match chain {
353            Some((Chain::Alt(_), _)) => return Ok(AddAltBlock::Cached(false)),
354            Some((Chain::Main, _)) => anyhow::bail!("Alt block already in main chain"),
355            None => (),
356        }
357
358        let alt_block_info =
359            sanity_check_alt_block(block, prepared_txs, self.blockchain_context_service.clone())
360                .await?;
361
362        // If this alt chain has more cumulative difficulty, reorg.
363        if alt_block_info.cumulative_difficulty
364            > self
365                .blockchain_context_service
366                .blockchain_context()
367                .cumulative_difficulty
368        {
369            self.try_do_reorg(alt_block_info).await?;
370            return Ok(AddAltBlock::Reorged);
371        }
372
373        self.blockchain_write_handle
374            .ready()
375            .await
376            .expect(PANIC_CRITICAL_SERVICE_ERROR)
377            .call(BlockchainWriteRequest::WriteAltBlock(alt_block_info))
378            .await?;
379
380        Ok(AddAltBlock::Cached(true))
381    }
382
383    /// Attempt a re-org with the given top block of the alt-chain.
384    ///
385    /// This function will take a write lock on [`REORG_LOCK`] and then set up the blockchain database
386    /// and context cache to verify the alt-chain. It will then attempt to verify and add each block
387    /// in the alt-chain to the main-chain. Releasing the lock on [`REORG_LOCK`] when finished.
388    ///
389    /// # Errors
390    ///
391    /// This function will return an [`Err`] if the re-org was unsuccessful, if this happens the chain
392    /// will be returned back into its state it was at when then function was called.
393    ///
394    /// # Panics
395    ///
396    /// This function will panic if any internal service returns an unexpected error that we cannot
397    /// recover from.
398    #[instrument(name = "try_do_reorg", skip_all, level = "info")]
399    async fn try_do_reorg(
400        &mut self,
401        top_alt_block: AltBlockInformation,
402    ) -> Result<(), anyhow::Error> {
403        let _guard = REORG_LOCK.write().await;
404
405        let BlockchainResponse::AltBlocksInChain(mut alt_blocks) = self
406            .blockchain_read_handle
407            .ready()
408            .await
409            .expect(PANIC_CRITICAL_SERVICE_ERROR)
410            .call(BlockchainReadRequest::AltBlocksInChain(
411                top_alt_block.chain_id,
412            ))
413            .await
414            .map_err(|e| anyhow::anyhow!(e))?
415        else {
416            unreachable!();
417        };
418
419        alt_blocks.push(top_alt_block);
420
421        let split_height = alt_blocks[0].height;
422        let current_main_chain_height = self
423            .blockchain_context_service
424            .blockchain_context()
425            .chain_height;
426
427        info!(split_height, "Attempting blockchain reorg");
428
429        let old_main_chain_id = self
430            .pop_blocks(current_main_chain_height - split_height)
431            .await;
432
433        let reorg_res = self.verify_add_alt_blocks_to_main_chain(alt_blocks).await;
434
435        match reorg_res {
436            Ok(()) => {
437                info!(
438                    top_hash = hex::encode(
439                        self.blockchain_context_service
440                            .blockchain_context()
441                            .top_hash
442                    ),
443                    "Successfully reorged"
444                );
445                Ok(())
446            }
447            Err(e) => {
448                self.reverse_reorg(old_main_chain_id).await;
449                Err(e)
450            }
451        }
452    }
453
454    /// Reverse a reorg that failed.
455    ///
456    /// This function takes the old chain's [`ChainId`] and reverts the chain state to back to before
457    /// the reorg was attempted.
458    ///
459    /// # Panics
460    ///
461    /// This function will panic if any internal service returns an unexpected error that we cannot
462    /// recover from.
463    #[instrument(name = "reverse_reorg", skip_all, level = "info")]
464    async fn reverse_reorg(&mut self, old_main_chain_id: ChainId) {
465        warn!("Reorg failed, reverting to old chain.");
466
467        let BlockchainResponse::AltBlocksInChain(mut blocks) = self
468            .blockchain_read_handle
469            .ready()
470            .await
471            .expect(PANIC_CRITICAL_SERVICE_ERROR)
472            .call(BlockchainReadRequest::AltBlocksInChain(old_main_chain_id))
473            .await
474            .expect(PANIC_CRITICAL_SERVICE_ERROR)
475        else {
476            unreachable!();
477        };
478
479        let split_height = blocks[0].height;
480        let current_main_chain_height = self
481            .blockchain_context_service
482            .blockchain_context()
483            .chain_height;
484
485        let numb_blocks = current_main_chain_height - split_height;
486
487        if numb_blocks > 0 {
488            self.pop_blocks(current_main_chain_height - split_height)
489                .await;
490        }
491
492        for block in blocks {
493            let verified_block = alt_block_to_verified_block_information(
494                block,
495                self.blockchain_context_service.blockchain_context(),
496            );
497            self.add_valid_block_to_main_chain(verified_block).await;
498        }
499
500        self.blockchain_write_handle
501            .ready()
502            .await
503            .expect(PANIC_CRITICAL_SERVICE_ERROR)
504            .call(BlockchainWriteRequest::FlushAltBlocks)
505            .await
506            .expect(PANIC_CRITICAL_SERVICE_ERROR);
507
508        info!("Successfully reversed reorg");
509    }
510
511    /// Pop blocks from the main chain, moving them to alt-blocks. This function will flush all other alt-blocks.
512    ///
513    /// This returns the [`ChainId`] of the blocks that were popped.
514    ///
515    /// # Panics
516    ///
517    /// This function will panic if any internal service returns an unexpected error that we cannot
518    /// recover from.
519    #[instrument(name = "pop_blocks", skip(self), level = "info")]
520    async fn pop_blocks(&mut self, numb_blocks: usize) -> ChainId {
521        let BlockchainResponse::PopBlocks(old_main_chain_id) = self
522            .blockchain_write_handle
523            .ready()
524            .await
525            .expect(PANIC_CRITICAL_SERVICE_ERROR)
526            .call(BlockchainWriteRequest::PopBlocks(numb_blocks))
527            .await
528            .expect(PANIC_CRITICAL_SERVICE_ERROR)
529        else {
530            unreachable!();
531        };
532
533        self.blockchain_context_service
534            .ready()
535            .await
536            .expect(PANIC_CRITICAL_SERVICE_ERROR)
537            .call(BlockChainContextRequest::PopBlocks { numb_blocks })
538            .await
539            .expect(PANIC_CRITICAL_SERVICE_ERROR);
540
541        old_main_chain_id
542    }
543
544    /// Verify and add a list of [`AltBlockInformation`]s to the main-chain.
545    ///
546    /// This function assumes the first [`AltBlockInformation`] is the next block in the blockchain
547    /// for the blockchain database and the context cache, or in other words that the blockchain database
548    /// and context cache have already had the top blocks popped to where the alt-chain meets the main-chain.
549    ///
550    /// # Errors
551    ///
552    /// This function will return an [`Err`] if the alt-blocks were invalid, in this case the re-org should
553    /// be aborted and the chain should be returned to its previous state.
554    ///
555    /// # Panics
556    ///
557    /// This function will panic if any internal service returns an unexpected error that we cannot
558    /// recover from.
559    async fn verify_add_alt_blocks_to_main_chain(
560        &mut self,
561        alt_blocks: Vec<AltBlockInformation>,
562    ) -> Result<(), anyhow::Error> {
563        for mut alt_block in alt_blocks {
564            let prepped_txs = alt_block
565                .txs
566                .drain(..)
567                .map(|tx| Ok(tx.try_into()?))
568                .collect::<Result<_, anyhow::Error>>()?;
569
570            let prepped_block = PreparedBlock::new_alt_block(alt_block)?;
571
572            let verified_block = verify_prepped_main_chain_block(
573                prepped_block,
574                prepped_txs,
575                &mut self.blockchain_context_service,
576                self.blockchain_read_handle.clone(),
577                None,
578            )
579            .await?;
580
581            self.add_valid_block_to_main_chain(verified_block).await;
582        }
583
584        Ok(())
585    }
586
587    /// Adds a [`VerifiedBlockInformation`] to the main-chain.
588    ///
589    /// This function will update the blockchain database and the context cache.
590    ///
591    /// # Panics
592    ///
593    /// This function will panic if any internal service returns an unexpected error that we cannot
594    /// recover from.
595    pub async fn add_valid_block_to_main_chain(
596        &mut self,
597        verified_block: VerifiedBlockInformation,
598    ) {
599        // FIXME: this is pretty inefficient, we should probably return the KI map created in the consensus crate.
600        let spent_key_images = verified_block
601            .txs
602            .iter()
603            .flat_map(|tx| {
604                tx.tx.prefix().inputs.iter().map(|input| match input {
605                    Input::ToKey { key_image, .. } => key_image.0,
606                    Input::Gen(_) => unreachable!(),
607                })
608            })
609            .collect::<Vec<[u8; 32]>>();
610
611        self.add_valid_block_to_blockchain_cache(&verified_block)
612            .await;
613
614        self.blockchain_write_handle
615            .ready()
616            .await
617            .expect(PANIC_CRITICAL_SERVICE_ERROR)
618            .call(BlockchainWriteRequest::WriteBlock(verified_block))
619            .await
620            .expect(PANIC_CRITICAL_SERVICE_ERROR);
621
622        self.txpool_write_handle
623            .ready()
624            .await
625            .expect(PANIC_CRITICAL_SERVICE_ERROR)
626            .call(TxpoolWriteRequest::NewBlock { spent_key_images })
627            .await
628            .expect(PANIC_CRITICAL_SERVICE_ERROR);
629    }
630
631    /// Adds a [`VerifiedBlockInformation`] to the blockchain context cache.
632    ///
633    /// # Panics
634    ///
635    /// This function will panic if any internal service returns an unexpected error that we cannot
636    /// recover from.
637    async fn add_valid_block_to_blockchain_cache(
638        &mut self,
639        verified_block: &VerifiedBlockInformation,
640    ) {
641        self.blockchain_context_service
642            .ready()
643            .await
644            .expect(PANIC_CRITICAL_SERVICE_ERROR)
645            .call(BlockChainContextRequest::Update(NewBlockData {
646                block_hash: verified_block.block_hash,
647                height: verified_block.height,
648                timestamp: verified_block.block.header.timestamp,
649                weight: verified_block.weight,
650                long_term_weight: verified_block.long_term_weight,
651                generated_coins: verified_block.generated_coins,
652                vote: HardFork::from_vote(verified_block.block.header.hardfork_signal),
653                cumulative_difficulty: verified_block.cumulative_difficulty,
654            }))
655            .await
656            .expect(PANIC_CRITICAL_SERVICE_ERROR);
657    }
658
659    /// Batch writes the [`VerifiedBlockInformation`]s to the database.
660    ///
661    /// The blocks must be sequential.
662    ///
663    /// # Panics
664    ///
665    /// This function will panic if any internal service returns an unexpected error that we cannot
666    /// recover from.
667    async fn batch_add_valid_block_to_blockchain_database(
668        &mut self,
669        blocks: Vec<VerifiedBlockInformation>,
670    ) {
671        self.blockchain_write_handle
672            .ready()
673            .await
674            .expect(PANIC_CRITICAL_SERVICE_ERROR)
675            .call(BlockchainWriteRequest::BatchWriteBlocks(blocks))
676            .await
677            .expect(PANIC_CRITICAL_SERVICE_ERROR);
678    }
679}
680
681/// The result from successfully adding an alt-block.
682enum AddAltBlock {
683    /// The alt-block was cached.
684    ///
685    /// The inner `bool` is for if the block was cached before [`false`] or was cached during the call [`true`].
686    Cached(bool),
687    /// The chain was reorged.
688    Reorged,
689}
690
691/// Creates a [`VerifiedBlockInformation`] from an alt-block known to be valid.
692///
693/// # Panics
694///
695/// This may panic if used on an invalid block.
696pub fn alt_block_to_verified_block_information(
697    block: AltBlockInformation,
698    blockchain_ctx: &BlockchainContext,
699) -> VerifiedBlockInformation {
700    assert_eq!(
701        block.height, blockchain_ctx.chain_height,
702        "alt-block invalid"
703    );
704
705    let total_fees = block.txs.iter().map(|tx| tx.fee).sum::<u64>();
706    let total_outputs = block
707        .block
708        .miner_transaction
709        .prefix()
710        .outputs
711        .iter()
712        .map(|output| output.amount.unwrap_or(0))
713        .sum::<u64>();
714
715    let generated_coins = total_outputs - total_fees;
716
717    VerifiedBlockInformation {
718        block_blob: block.block_blob,
719        txs: block.txs,
720        block_hash: block.block_hash,
721        pow_hash: [u8::MAX; 32],
722        height: block.height,
723        generated_coins,
724        weight: block.weight,
725        long_term_weight: blockchain_ctx.next_block_long_term_weight(block.weight),
726        cumulative_difficulty: blockchain_ctx.cumulative_difficulty
727            + blockchain_ctx.next_difficulty,
728        block: block.block,
729    }
730}