1use std::{collections::HashMap, sync::Arc};
3
4use bytes::Bytes;
5use futures::{TryFutureExt, TryStreamExt};
6use monero_serai::{
7 block::Block,
8 transaction::{Input, Transaction},
9};
10use rayon::prelude::*;
11use tower::{Service, ServiceExt};
12use tracing::{info, instrument, warn, Span};
13
14use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
15use cuprate_consensus::{
16 block::{
17 batch_prepare_main_chain_blocks, sanity_check_alt_block, verify_main_chain_block,
18 verify_prepped_main_chain_block, PreparedBlock,
19 },
20 transactions::new_tx_verification_data,
21 BlockChainContextRequest, BlockChainContextResponse, ExtendedConsensusError,
22};
23use cuprate_consensus_context::{BlockchainContext, NewBlockData};
24use cuprate_fast_sync::{block_to_verified_block_information, fast_sync_stop_height};
25use cuprate_helper::cast::usize_to_u64;
26use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN, BroadcastRequest};
27use cuprate_txpool::service::interface::TxpoolWriteRequest;
28use cuprate_types::{
29 blockchain::{BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest},
30 AltBlockInformation, Chain, ChainId, HardFork, TransactionVerificationData,
31 VerifiedBlockInformation, VerifiedTransactionInformation,
32};
33
34use crate::{
35 blockchain::manager::commands::{BlockchainManagerCommand, IncomingBlockOk},
36 constants::PANIC_CRITICAL_SERVICE_ERROR,
37 signals::REORG_LOCK,
38};
39
40impl super::BlockchainManager {
41 pub async fn handle_command(&mut self, command: BlockchainManagerCommand) {
48 match command {
49 BlockchainManagerCommand::AddBlock {
50 block,
51 prepped_txs,
52 response_tx,
53 } => {
54 let res = self.handle_incoming_block(block, prepped_txs).await;
55
56 drop(response_tx.send(res));
57 }
58 }
59 }
60
61 async fn broadcast_block(&mut self, block_bytes: Bytes, blockchain_height: usize) {
63 self.broadcast_svc
64 .ready()
65 .await
66 .expect("Broadcast service is Infallible.")
67 .call(BroadcastRequest::Block {
68 block_bytes,
69 current_blockchain_height: usize_to_u64(blockchain_height),
70 })
71 .await
72 .expect("Broadcast service is Infallible.");
73 }
74
75 #[instrument(
87 name = "incoming_block",
88 skip_all,
89 level = "info",
90 fields(
91 height = block.number().unwrap(),
92 txs = block.transactions.len(),
93 )
94 )]
95 pub async fn handle_incoming_block(
96 &mut self,
97 block: Block,
98 prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
99 ) -> Result<IncomingBlockOk, anyhow::Error> {
100 if block.header.previous
101 != self
102 .blockchain_context_service
103 .blockchain_context()
104 .top_hash
105 {
106 let block_hash = block.hash();
107 let res = self.handle_incoming_alt_block(block, prepared_txs).await?;
108
109 if matches!(res, AddAltBlock::Cached(true)) {
110 info!(
111 alt_block = true,
112 hash = hex::encode(block_hash),
113 "Successfully added block"
114 );
115 }
116
117 return Ok(IncomingBlockOk::AddedToAltChain);
118 }
119
120 let verified_block = verify_main_chain_block(
121 block,
122 prepared_txs,
123 &mut self.blockchain_context_service,
124 self.blockchain_read_handle.clone(),
125 )
126 .await?;
127
128 let block_blob = Bytes::copy_from_slice(&verified_block.block_blob);
129 self.add_valid_block_to_main_chain(verified_block).await;
130
131 let chain_height = self
132 .blockchain_context_service
133 .blockchain_context()
134 .chain_height;
135
136 self.broadcast_block(block_blob, chain_height).await;
137
138 info!(
139 hash = hex::encode(
140 self.blockchain_context_service
141 .blockchain_context()
142 .top_hash
143 ),
144 "Successfully added block"
145 );
146
147 Ok(IncomingBlockOk::AddedToMainChain)
148 }
149
150 #[instrument(
160 name = "incoming_block_batch",
161 skip_all,
162 level = "info",
163 fields(
164 start_height = batch.blocks.first().unwrap().0.number().unwrap(),
165 len = batch.blocks.len()
166 )
167 )]
168 pub async fn handle_incoming_block_batch(&mut self, batch: BlockBatch) {
169 let (first_block, _) = batch
170 .blocks
171 .first()
172 .expect("Block batch should not be empty");
173
174 if first_block.header.previous
175 == self
176 .blockchain_context_service
177 .blockchain_context()
178 .top_hash
179 {
180 self.handle_incoming_block_batch_main_chain(batch).await;
181 } else {
182 self.handle_incoming_block_batch_alt_chain(batch).await;
183 }
184 }
185
186 async fn handle_incoming_block_batch_main_chain(&mut self, batch: BlockBatch) {
199 if batch.blocks.last().unwrap().0.number().unwrap() < fast_sync_stop_height() {
200 self.handle_incoming_block_batch_fast_sync(batch).await;
201 return;
202 }
203
204 let Ok((prepped_blocks, mut output_cache)) = batch_prepare_main_chain_blocks(
205 batch.blocks,
206 &mut self.blockchain_context_service,
207 self.blockchain_read_handle.clone(),
208 )
209 .await
210 else {
211 batch.peer_handle.ban_peer(LONG_BAN);
212 self.stop_current_block_downloader.notify_one();
213 return;
214 };
215
216 for (block, txs) in prepped_blocks {
217 let Ok(verified_block) = verify_prepped_main_chain_block(
218 block,
219 txs,
220 &mut self.blockchain_context_service,
221 self.blockchain_read_handle.clone(),
222 Some(&mut output_cache),
223 )
224 .await
225 else {
226 batch.peer_handle.ban_peer(LONG_BAN);
227 self.stop_current_block_downloader.notify_one();
228 return;
229 };
230
231 self.add_valid_block_to_main_chain(verified_block).await;
232 }
233 info!(fast_sync = false, "Successfully added block batch");
234 }
235
236 async fn handle_incoming_block_batch_fast_sync(&mut self, batch: BlockBatch) {
243 let mut valid_blocks = Vec::with_capacity(batch.blocks.len());
244 for (block, txs) in batch.blocks {
245 let block = block_to_verified_block_information(
246 block,
247 txs,
248 self.blockchain_context_service.blockchain_context(),
249 );
250 self.add_valid_block_to_blockchain_cache(&block).await;
251
252 valid_blocks.push(block);
253 }
254
255 self.batch_add_valid_block_to_blockchain_database(valid_blocks)
256 .await;
257
258 info!(fast_sync = true, "Successfully added block batch");
259 }
260
261 async fn handle_incoming_block_batch_alt_chain(&mut self, mut batch: BlockBatch) {
274 let mut blocks = batch.blocks.into_iter();
275
276 while let Some((block, txs)) = blocks.next() {
277 let res = async {
279 let txs = txs
280 .into_par_iter()
281 .map(|tx| {
282 let tx = new_tx_verification_data(tx)?;
283 Ok((tx.tx_hash, tx))
284 })
285 .collect::<Result<_, anyhow::Error>>()?;
286
287 let reorged = self.handle_incoming_alt_block(block, txs).await?;
288
289 Ok::<_, anyhow::Error>(reorged)
290 }
291 .await;
292
293 match res {
294 Err(e) => {
295 batch.peer_handle.ban_peer(LONG_BAN);
296 self.stop_current_block_downloader.notify_one();
297 return;
298 }
299 Ok(AddAltBlock::Reorged) => {
300 batch.blocks = blocks.collect();
302
303 if batch.blocks.is_empty() {
304 return;
305 }
306
307 self.handle_incoming_block_batch_main_chain(batch).await;
308 return;
309 }
310 Ok(AddAltBlock::Cached(_)) => (),
312 }
313 }
314
315 info!(alt_chain = true, "Successfully added block batch");
316 }
317
318 async fn handle_incoming_alt_block(
335 &mut self,
336 block: Block,
337 prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
338 ) -> Result<AddAltBlock, anyhow::Error> {
339 let BlockchainResponse::FindBlock(chain) = self
341 .blockchain_read_handle
342 .ready()
343 .await
344 .expect(PANIC_CRITICAL_SERVICE_ERROR)
345 .call(BlockchainReadRequest::FindBlock(block.hash()))
346 .await
347 .expect(PANIC_CRITICAL_SERVICE_ERROR)
348 else {
349 unreachable!();
350 };
351
352 match chain {
353 Some((Chain::Alt(_), _)) => return Ok(AddAltBlock::Cached(false)),
354 Some((Chain::Main, _)) => anyhow::bail!("Alt block already in main chain"),
355 None => (),
356 }
357
358 let alt_block_info =
359 sanity_check_alt_block(block, prepared_txs, self.blockchain_context_service.clone())
360 .await?;
361
362 if alt_block_info.cumulative_difficulty
364 > self
365 .blockchain_context_service
366 .blockchain_context()
367 .cumulative_difficulty
368 {
369 self.try_do_reorg(alt_block_info).await?;
370 return Ok(AddAltBlock::Reorged);
371 }
372
373 self.blockchain_write_handle
374 .ready()
375 .await
376 .expect(PANIC_CRITICAL_SERVICE_ERROR)
377 .call(BlockchainWriteRequest::WriteAltBlock(alt_block_info))
378 .await?;
379
380 Ok(AddAltBlock::Cached(true))
381 }
382
383 #[instrument(name = "try_do_reorg", skip_all, level = "info")]
399 async fn try_do_reorg(
400 &mut self,
401 top_alt_block: AltBlockInformation,
402 ) -> Result<(), anyhow::Error> {
403 let _guard = REORG_LOCK.write().await;
404
405 let BlockchainResponse::AltBlocksInChain(mut alt_blocks) = self
406 .blockchain_read_handle
407 .ready()
408 .await
409 .expect(PANIC_CRITICAL_SERVICE_ERROR)
410 .call(BlockchainReadRequest::AltBlocksInChain(
411 top_alt_block.chain_id,
412 ))
413 .await
414 .map_err(|e| anyhow::anyhow!(e))?
415 else {
416 unreachable!();
417 };
418
419 alt_blocks.push(top_alt_block);
420
421 let split_height = alt_blocks[0].height;
422 let current_main_chain_height = self
423 .blockchain_context_service
424 .blockchain_context()
425 .chain_height;
426
427 info!(split_height, "Attempting blockchain reorg");
428
429 let old_main_chain_id = self
430 .pop_blocks(current_main_chain_height - split_height)
431 .await;
432
433 let reorg_res = self.verify_add_alt_blocks_to_main_chain(alt_blocks).await;
434
435 match reorg_res {
436 Ok(()) => {
437 info!(
438 top_hash = hex::encode(
439 self.blockchain_context_service
440 .blockchain_context()
441 .top_hash
442 ),
443 "Successfully reorged"
444 );
445 Ok(())
446 }
447 Err(e) => {
448 self.reverse_reorg(old_main_chain_id).await;
449 Err(e)
450 }
451 }
452 }
453
454 #[instrument(name = "reverse_reorg", skip_all, level = "info")]
464 async fn reverse_reorg(&mut self, old_main_chain_id: ChainId) {
465 warn!("Reorg failed, reverting to old chain.");
466
467 let BlockchainResponse::AltBlocksInChain(mut blocks) = self
468 .blockchain_read_handle
469 .ready()
470 .await
471 .expect(PANIC_CRITICAL_SERVICE_ERROR)
472 .call(BlockchainReadRequest::AltBlocksInChain(old_main_chain_id))
473 .await
474 .expect(PANIC_CRITICAL_SERVICE_ERROR)
475 else {
476 unreachable!();
477 };
478
479 let split_height = blocks[0].height;
480 let current_main_chain_height = self
481 .blockchain_context_service
482 .blockchain_context()
483 .chain_height;
484
485 let numb_blocks = current_main_chain_height - split_height;
486
487 if numb_blocks > 0 {
488 self.pop_blocks(current_main_chain_height - split_height)
489 .await;
490 }
491
492 for block in blocks {
493 let verified_block = alt_block_to_verified_block_information(
494 block,
495 self.blockchain_context_service.blockchain_context(),
496 );
497 self.add_valid_block_to_main_chain(verified_block).await;
498 }
499
500 self.blockchain_write_handle
501 .ready()
502 .await
503 .expect(PANIC_CRITICAL_SERVICE_ERROR)
504 .call(BlockchainWriteRequest::FlushAltBlocks)
505 .await
506 .expect(PANIC_CRITICAL_SERVICE_ERROR);
507
508 info!("Successfully reversed reorg");
509 }
510
511 #[instrument(name = "pop_blocks", skip(self), level = "info")]
520 async fn pop_blocks(&mut self, numb_blocks: usize) -> ChainId {
521 let BlockchainResponse::PopBlocks(old_main_chain_id) = self
522 .blockchain_write_handle
523 .ready()
524 .await
525 .expect(PANIC_CRITICAL_SERVICE_ERROR)
526 .call(BlockchainWriteRequest::PopBlocks(numb_blocks))
527 .await
528 .expect(PANIC_CRITICAL_SERVICE_ERROR)
529 else {
530 unreachable!();
531 };
532
533 self.blockchain_context_service
534 .ready()
535 .await
536 .expect(PANIC_CRITICAL_SERVICE_ERROR)
537 .call(BlockChainContextRequest::PopBlocks { numb_blocks })
538 .await
539 .expect(PANIC_CRITICAL_SERVICE_ERROR);
540
541 old_main_chain_id
542 }
543
544 async fn verify_add_alt_blocks_to_main_chain(
560 &mut self,
561 alt_blocks: Vec<AltBlockInformation>,
562 ) -> Result<(), anyhow::Error> {
563 for mut alt_block in alt_blocks {
564 let prepped_txs = alt_block
565 .txs
566 .drain(..)
567 .map(|tx| Ok(tx.try_into()?))
568 .collect::<Result<_, anyhow::Error>>()?;
569
570 let prepped_block = PreparedBlock::new_alt_block(alt_block)?;
571
572 let verified_block = verify_prepped_main_chain_block(
573 prepped_block,
574 prepped_txs,
575 &mut self.blockchain_context_service,
576 self.blockchain_read_handle.clone(),
577 None,
578 )
579 .await?;
580
581 self.add_valid_block_to_main_chain(verified_block).await;
582 }
583
584 Ok(())
585 }
586
587 pub async fn add_valid_block_to_main_chain(
596 &mut self,
597 verified_block: VerifiedBlockInformation,
598 ) {
599 let spent_key_images = verified_block
601 .txs
602 .iter()
603 .flat_map(|tx| {
604 tx.tx.prefix().inputs.iter().map(|input| match input {
605 Input::ToKey { key_image, .. } => key_image.0,
606 Input::Gen(_) => unreachable!(),
607 })
608 })
609 .collect::<Vec<[u8; 32]>>();
610
611 self.add_valid_block_to_blockchain_cache(&verified_block)
612 .await;
613
614 self.blockchain_write_handle
615 .ready()
616 .await
617 .expect(PANIC_CRITICAL_SERVICE_ERROR)
618 .call(BlockchainWriteRequest::WriteBlock(verified_block))
619 .await
620 .expect(PANIC_CRITICAL_SERVICE_ERROR);
621
622 self.txpool_write_handle
623 .ready()
624 .await
625 .expect(PANIC_CRITICAL_SERVICE_ERROR)
626 .call(TxpoolWriteRequest::NewBlock { spent_key_images })
627 .await
628 .expect(PANIC_CRITICAL_SERVICE_ERROR);
629 }
630
631 async fn add_valid_block_to_blockchain_cache(
638 &mut self,
639 verified_block: &VerifiedBlockInformation,
640 ) {
641 self.blockchain_context_service
642 .ready()
643 .await
644 .expect(PANIC_CRITICAL_SERVICE_ERROR)
645 .call(BlockChainContextRequest::Update(NewBlockData {
646 block_hash: verified_block.block_hash,
647 height: verified_block.height,
648 timestamp: verified_block.block.header.timestamp,
649 weight: verified_block.weight,
650 long_term_weight: verified_block.long_term_weight,
651 generated_coins: verified_block.generated_coins,
652 vote: HardFork::from_vote(verified_block.block.header.hardfork_signal),
653 cumulative_difficulty: verified_block.cumulative_difficulty,
654 }))
655 .await
656 .expect(PANIC_CRITICAL_SERVICE_ERROR);
657 }
658
659 async fn batch_add_valid_block_to_blockchain_database(
668 &mut self,
669 blocks: Vec<VerifiedBlockInformation>,
670 ) {
671 self.blockchain_write_handle
672 .ready()
673 .await
674 .expect(PANIC_CRITICAL_SERVICE_ERROR)
675 .call(BlockchainWriteRequest::BatchWriteBlocks(blocks))
676 .await
677 .expect(PANIC_CRITICAL_SERVICE_ERROR);
678 }
679}
680
/// The outcome of successfully handling an alt block.
enum AddAltBlock {
    /// The alt block is stored on an alt chain.
    ///
    /// The flag is `true` when this call wrote the block to the database and `false` when the
    /// block was already present on an alt chain.
    Cached(bool),
    /// The alt block caused a reorg onto its chain.
    Reorged,
}
690
691pub fn alt_block_to_verified_block_information(
697 block: AltBlockInformation,
698 blockchain_ctx: &BlockchainContext,
699) -> VerifiedBlockInformation {
700 assert_eq!(
701 block.height, blockchain_ctx.chain_height,
702 "alt-block invalid"
703 );
704
705 let total_fees = block.txs.iter().map(|tx| tx.fee).sum::<u64>();
706 let total_outputs = block
707 .block
708 .miner_transaction
709 .prefix()
710 .outputs
711 .iter()
712 .map(|output| output.amount.unwrap_or(0))
713 .sum::<u64>();
714
715 let generated_coins = total_outputs - total_fees;
716
717 VerifiedBlockInformation {
718 block_blob: block.block_blob,
719 txs: block.txs,
720 block_hash: block.block_hash,
721 pow_hash: [u8::MAX; 32],
722 height: block.height,
723 generated_coins,
724 weight: block.weight,
725 long_term_weight: blockchain_ctx.next_block_long_term_weight(block.weight),
726 cumulative_difficulty: blockchain_ctx.cumulative_difficulty
727 + blockchain_ctx.next_difficulty,
728 block: block.block,
729 }
730}