1use std::{collections::HashMap, sync::Arc};
3
4use bytes::Bytes;
5use futures::{TryFutureExt, TryStreamExt};
6use monero_oxide::{
7 block::Block,
8 transaction::{Input, Transaction},
9};
10use rayon::prelude::*;
11use tower::{Service, ServiceExt};
12use tracing::{info, instrument, warn, Span};
13
14use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
15use cuprate_consensus::{
16 block::{
17 batch_prepare_main_chain_blocks, sanity_check_alt_block, verify_main_chain_block,
18 verify_prepped_main_chain_block, PreparedBlock,
19 },
20 transactions::new_tx_verification_data,
21 BlockChainContextRequest, BlockChainContextResponse, ExtendedConsensusError,
22};
23use cuprate_consensus_context::{BlockchainContext, NewBlockData};
24use cuprate_fast_sync::{block_to_verified_block_information, fast_sync_stop_height};
25use cuprate_helper::cast::usize_to_u64;
26use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN, BroadcastRequest};
27use cuprate_txpool::service::interface::TxpoolWriteRequest;
28use cuprate_types::{
29 blockchain::{BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest},
30 AltBlockInformation, Chain, ChainId, HardFork, TransactionVerificationData,
31 VerifiedBlockInformation, VerifiedTransactionInformation,
32};
33
34use crate::{
35 blockchain::manager::commands::{BlockchainManagerCommand, IncomingBlockOk},
36 constants::PANIC_CRITICAL_SERVICE_ERROR,
37 signals::REORG_LOCK,
38};
39
40impl super::BlockchainManager {
41 pub async fn handle_command(&mut self, command: BlockchainManagerCommand) {
48 match command {
49 BlockchainManagerCommand::AddBlock {
50 block,
51 prepped_txs,
52 response_tx,
53 } => {
54 let res = self.handle_incoming_block(block, prepped_txs).await;
55
56 drop(response_tx.send(res));
57 }
58 BlockchainManagerCommand::PopBlocks {
59 numb_blocks,
60 response_tx,
61 } => {
62 let _guard = REORG_LOCK.write().await;
63 self.pop_blocks(numb_blocks).await;
64 self.blockchain_write_handle
65 .ready()
66 .await
67 .expect(PANIC_CRITICAL_SERVICE_ERROR)
68 .call(BlockchainWriteRequest::FlushAltBlocks)
69 .await
70 .expect(PANIC_CRITICAL_SERVICE_ERROR);
71 #[expect(clippy::let_underscore_must_use)]
72 let _ = response_tx.send(());
73 }
74 }
75 }
76
77 async fn broadcast_block(&mut self, block_bytes: Bytes, blockchain_height: usize) {
79 self.broadcast_svc
80 .ready()
81 .await
82 .expect("Broadcast service is Infallible.")
83 .call(BroadcastRequest::Block {
84 block_bytes,
85 current_blockchain_height: usize_to_u64(blockchain_height),
86 })
87 .await
88 .expect("Broadcast service is Infallible.");
89 }
90
91 #[instrument(
103 name = "incoming_block",
104 skip_all,
105 level = "info",
106 fields(
107 height = block.number().unwrap(),
108 txs = block.transactions.len(),
109 )
110 )]
111 pub async fn handle_incoming_block(
112 &mut self,
113 block: Block,
114 prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
115 ) -> Result<IncomingBlockOk, anyhow::Error> {
116 if block.header.previous
117 != self
118 .blockchain_context_service
119 .blockchain_context()
120 .top_hash
121 {
122 let block_hash = block.hash();
123 let res = self.handle_incoming_alt_block(block, prepared_txs).await?;
124
125 if matches!(res, AddAltBlock::Cached(true)) {
126 info!(
127 alt_block = true,
128 hash = hex::encode(block_hash),
129 "Successfully added block"
130 );
131 }
132
133 return Ok(IncomingBlockOk::AddedToAltChain);
134 }
135
136 let verified_block = verify_main_chain_block(
137 block,
138 prepared_txs,
139 &mut self.blockchain_context_service,
140 self.blockchain_read_handle.clone(),
141 )
142 .await?;
143
144 let block_blob = Bytes::copy_from_slice(&verified_block.block_blob);
145 self.add_valid_block_to_main_chain(verified_block).await;
146
147 let chain_height = self
148 .blockchain_context_service
149 .blockchain_context()
150 .chain_height;
151
152 self.broadcast_block(block_blob, chain_height).await;
153
154 info!(
155 hash = hex::encode(
156 self.blockchain_context_service
157 .blockchain_context()
158 .top_hash
159 ),
160 "Successfully added block"
161 );
162
163 Ok(IncomingBlockOk::AddedToMainChain)
164 }
165
166 #[instrument(
176 name = "incoming_block_batch",
177 skip_all,
178 level = "info",
179 fields(
180 start_height = batch.blocks.first().unwrap().0.number().unwrap(),
181 len = batch.blocks.len()
182 )
183 )]
184 pub async fn handle_incoming_block_batch(&mut self, batch: BlockBatch) {
185 let (first_block, _) = batch
186 .blocks
187 .first()
188 .expect("Block batch should not be empty");
189
190 if first_block.header.previous
191 == self
192 .blockchain_context_service
193 .blockchain_context()
194 .top_hash
195 {
196 self.handle_incoming_block_batch_main_chain(batch).await;
197 } else {
198 self.handle_incoming_block_batch_alt_chain(batch).await;
199 }
200 }
201
202 async fn handle_incoming_block_batch_main_chain(&mut self, batch: BlockBatch) {
215 if batch.blocks.last().unwrap().0.number().unwrap() < fast_sync_stop_height() {
216 self.handle_incoming_block_batch_fast_sync(batch).await;
217 return;
218 }
219
220 let Ok((prepped_blocks, mut output_cache)) = batch_prepare_main_chain_blocks(
221 batch.blocks,
222 &mut self.blockchain_context_service,
223 self.blockchain_read_handle.clone(),
224 )
225 .await
226 else {
227 batch.peer_handle.ban_peer(LONG_BAN);
228 self.stop_current_block_downloader.notify_one();
229 return;
230 };
231
232 for (block, txs) in prepped_blocks {
233 let Ok(verified_block) = verify_prepped_main_chain_block(
234 block,
235 txs,
236 &mut self.blockchain_context_service,
237 self.blockchain_read_handle.clone(),
238 Some(&mut output_cache),
239 )
240 .await
241 else {
242 batch.peer_handle.ban_peer(LONG_BAN);
243 self.stop_current_block_downloader.notify_one();
244 return;
245 };
246
247 self.add_valid_block_to_main_chain(verified_block).await;
248 }
249 info!(fast_sync = false, "Successfully added block batch");
250 }
251
252 async fn handle_incoming_block_batch_fast_sync(&mut self, batch: BlockBatch) {
259 let mut valid_blocks = Vec::with_capacity(batch.blocks.len());
260 for (block, txs) in batch.blocks {
261 let block = block_to_verified_block_information(
262 block,
263 txs,
264 self.blockchain_context_service.blockchain_context(),
265 );
266 self.add_valid_block_to_blockchain_cache(&block).await;
267
268 valid_blocks.push(block);
269 }
270
271 self.batch_add_valid_block_to_blockchain_database(valid_blocks)
272 .await;
273
274 info!(fast_sync = true, "Successfully added block batch");
275 }
276
277 async fn handle_incoming_block_batch_alt_chain(&mut self, mut batch: BlockBatch) {
290 let mut blocks = batch.blocks.into_iter();
291
292 while let Some((block, txs)) = blocks.next() {
293 let res = async {
295 let txs = txs
296 .into_par_iter()
297 .map(|tx| {
298 let tx = new_tx_verification_data(tx)?;
299 Ok((tx.tx_hash, tx))
300 })
301 .collect::<Result<_, anyhow::Error>>()?;
302
303 let reorged = self.handle_incoming_alt_block(block, txs).await?;
304
305 Ok::<_, anyhow::Error>(reorged)
306 }
307 .await;
308
309 match res {
310 Err(e) => {
311 batch.peer_handle.ban_peer(LONG_BAN);
312 self.stop_current_block_downloader.notify_one();
313 return;
314 }
315 Ok(AddAltBlock::Reorged) => {
316 batch.blocks = blocks.collect();
318
319 if batch.blocks.is_empty() {
320 return;
321 }
322
323 self.handle_incoming_block_batch_main_chain(batch).await;
324 return;
325 }
326 Ok(AddAltBlock::Cached(_)) => (),
328 }
329 }
330
331 info!(alt_chain = true, "Successfully added block batch");
332 }
333
334 async fn handle_incoming_alt_block(
351 &mut self,
352 block: Block,
353 prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
354 ) -> Result<AddAltBlock, anyhow::Error> {
355 let BlockchainResponse::FindBlock(chain) = self
357 .blockchain_read_handle
358 .ready()
359 .await
360 .expect(PANIC_CRITICAL_SERVICE_ERROR)
361 .call(BlockchainReadRequest::FindBlock(block.hash()))
362 .await
363 .expect(PANIC_CRITICAL_SERVICE_ERROR)
364 else {
365 unreachable!();
366 };
367
368 match chain {
369 Some((Chain::Alt(_), _)) => return Ok(AddAltBlock::Cached(false)),
370 Some((Chain::Main, _)) => anyhow::bail!("Alt block already in main chain"),
371 None => (),
372 }
373
374 let alt_block_info =
375 sanity_check_alt_block(block, prepared_txs, self.blockchain_context_service.clone())
376 .await?;
377
378 if alt_block_info.cumulative_difficulty
380 > self
381 .blockchain_context_service
382 .blockchain_context()
383 .cumulative_difficulty
384 {
385 self.try_do_reorg(alt_block_info).await?;
386 return Ok(AddAltBlock::Reorged);
387 }
388
389 self.blockchain_write_handle
390 .ready()
391 .await
392 .expect(PANIC_CRITICAL_SERVICE_ERROR)
393 .call(BlockchainWriteRequest::WriteAltBlock(alt_block_info))
394 .await?;
395
396 Ok(AddAltBlock::Cached(true))
397 }
398
399 #[instrument(name = "try_do_reorg", skip_all, level = "info")]
415 async fn try_do_reorg(
416 &mut self,
417 top_alt_block: AltBlockInformation,
418 ) -> Result<(), anyhow::Error> {
419 let _guard = REORG_LOCK.write().await;
420
421 let BlockchainResponse::AltBlocksInChain(mut alt_blocks) = self
422 .blockchain_read_handle
423 .ready()
424 .await
425 .expect(PANIC_CRITICAL_SERVICE_ERROR)
426 .call(BlockchainReadRequest::AltBlocksInChain(
427 top_alt_block.chain_id,
428 ))
429 .await
430 .map_err(|e| anyhow::anyhow!(e))?
431 else {
432 unreachable!();
433 };
434
435 alt_blocks.push(top_alt_block);
436
437 let split_height = alt_blocks[0].height;
438 let current_main_chain_height = self
439 .blockchain_context_service
440 .blockchain_context()
441 .chain_height;
442
443 info!(split_height, "Attempting blockchain reorg");
444
445 let old_main_chain_id = self
446 .pop_blocks(current_main_chain_height - split_height)
447 .await;
448
449 let reorg_res = self.verify_add_alt_blocks_to_main_chain(alt_blocks).await;
450
451 match reorg_res {
452 Ok(()) => {
453 info!(
454 top_hash = hex::encode(
455 self.blockchain_context_service
456 .blockchain_context()
457 .top_hash
458 ),
459 "Successfully reorged"
460 );
461 Ok(())
462 }
463 Err(e) => {
464 self.reverse_reorg(old_main_chain_id).await;
465 Err(e)
466 }
467 }
468 }
469
470 #[instrument(name = "reverse_reorg", skip_all, level = "info")]
480 async fn reverse_reorg(&mut self, old_main_chain_id: ChainId) {
481 warn!("Reorg failed, reverting to old chain.");
482
483 let BlockchainResponse::AltBlocksInChain(mut blocks) = self
484 .blockchain_read_handle
485 .ready()
486 .await
487 .expect(PANIC_CRITICAL_SERVICE_ERROR)
488 .call(BlockchainReadRequest::AltBlocksInChain(old_main_chain_id))
489 .await
490 .expect(PANIC_CRITICAL_SERVICE_ERROR)
491 else {
492 unreachable!();
493 };
494
495 let split_height = blocks[0].height;
496 let current_main_chain_height = self
497 .blockchain_context_service
498 .blockchain_context()
499 .chain_height;
500
501 let numb_blocks = current_main_chain_height - split_height;
502
503 if numb_blocks > 0 {
504 self.pop_blocks(current_main_chain_height - split_height)
505 .await;
506 }
507
508 for block in blocks {
509 let verified_block = alt_block_to_verified_block_information(
510 block,
511 self.blockchain_context_service.blockchain_context(),
512 );
513 self.add_valid_block_to_main_chain(verified_block).await;
514 }
515
516 self.blockchain_write_handle
517 .ready()
518 .await
519 .expect(PANIC_CRITICAL_SERVICE_ERROR)
520 .call(BlockchainWriteRequest::FlushAltBlocks)
521 .await
522 .expect(PANIC_CRITICAL_SERVICE_ERROR);
523
524 info!("Successfully reversed reorg");
525 }
526
527 #[instrument(name = "pop_blocks", skip(self), level = "info")]
536 async fn pop_blocks(&mut self, numb_blocks: usize) -> ChainId {
537 let BlockchainResponse::PopBlocks(old_main_chain_id) = self
538 .blockchain_write_handle
539 .ready()
540 .await
541 .expect(PANIC_CRITICAL_SERVICE_ERROR)
542 .call(BlockchainWriteRequest::PopBlocks(numb_blocks))
543 .await
544 .expect(PANIC_CRITICAL_SERVICE_ERROR)
545 else {
546 unreachable!();
547 };
548
549 self.blockchain_context_service
550 .ready()
551 .await
552 .expect(PANIC_CRITICAL_SERVICE_ERROR)
553 .call(BlockChainContextRequest::PopBlocks { numb_blocks })
554 .await
555 .expect(PANIC_CRITICAL_SERVICE_ERROR);
556
557 old_main_chain_id
558 }
559
560 async fn verify_add_alt_blocks_to_main_chain(
576 &mut self,
577 alt_blocks: Vec<AltBlockInformation>,
578 ) -> Result<(), anyhow::Error> {
579 for mut alt_block in alt_blocks {
580 let prepped_txs = alt_block
581 .txs
582 .drain(..)
583 .map(|tx| Ok(tx.try_into()?))
584 .collect::<Result<_, anyhow::Error>>()?;
585
586 let prepped_block = PreparedBlock::new_alt_block(alt_block)?;
587
588 let verified_block = verify_prepped_main_chain_block(
589 prepped_block,
590 prepped_txs,
591 &mut self.blockchain_context_service,
592 self.blockchain_read_handle.clone(),
593 None,
594 )
595 .await?;
596
597 self.add_valid_block_to_main_chain(verified_block).await;
598 }
599
600 Ok(())
601 }
602
603 pub async fn add_valid_block_to_main_chain(
612 &mut self,
613 verified_block: VerifiedBlockInformation,
614 ) {
615 let spent_key_images = verified_block
617 .txs
618 .iter()
619 .flat_map(|tx| {
620 tx.tx.prefix().inputs.iter().map(|input| match input {
621 Input::ToKey { key_image, .. } => key_image.0,
622 Input::Gen(_) => unreachable!(),
623 })
624 })
625 .collect::<Vec<[u8; 32]>>();
626
627 self.add_valid_block_to_blockchain_cache(&verified_block)
628 .await;
629
630 self.blockchain_write_handle
631 .ready()
632 .await
633 .expect(PANIC_CRITICAL_SERVICE_ERROR)
634 .call(BlockchainWriteRequest::WriteBlock(verified_block))
635 .await
636 .expect(PANIC_CRITICAL_SERVICE_ERROR);
637
638 self.txpool_manager_handle
639 .new_block(spent_key_images)
640 .await
641 .expect(PANIC_CRITICAL_SERVICE_ERROR);
642 }
643
644 async fn add_valid_block_to_blockchain_cache(
651 &mut self,
652 verified_block: &VerifiedBlockInformation,
653 ) {
654 self.blockchain_context_service
655 .ready()
656 .await
657 .expect(PANIC_CRITICAL_SERVICE_ERROR)
658 .call(BlockChainContextRequest::Update(NewBlockData {
659 block_hash: verified_block.block_hash,
660 height: verified_block.height,
661 timestamp: verified_block.block.header.timestamp,
662 weight: verified_block.weight,
663 long_term_weight: verified_block.long_term_weight,
664 generated_coins: verified_block.generated_coins,
665 vote: HardFork::from_vote(verified_block.block.header.hardfork_signal),
666 cumulative_difficulty: verified_block.cumulative_difficulty,
667 }))
668 .await
669 .expect(PANIC_CRITICAL_SERVICE_ERROR);
670 }
671
672 async fn batch_add_valid_block_to_blockchain_database(
681 &mut self,
682 blocks: Vec<VerifiedBlockInformation>,
683 ) {
684 self.blockchain_write_handle
685 .ready()
686 .await
687 .expect(PANIC_CRITICAL_SERVICE_ERROR)
688 .call(BlockchainWriteRequest::BatchWriteBlocks(blocks))
689 .await
690 .expect(PANIC_CRITICAL_SERVICE_ERROR);
691 }
692}
693
/// Outcome of successfully handling an alt block.
enum AddAltBlock {
    /// The block is held on an alt chain and no reorg took place.
    ///
    /// The flag is `true` when this call wrote the block, and `false` when the block was
    /// already found on an alt chain and nothing was written.
    Cached(bool),
    /// Handling the block caused a reorg onto its chain.
    Reorged,
}
703
704pub fn alt_block_to_verified_block_information(
710 block: AltBlockInformation,
711 blockchain_ctx: &BlockchainContext,
712) -> VerifiedBlockInformation {
713 assert_eq!(
714 block.height, blockchain_ctx.chain_height,
715 "alt-block invalid"
716 );
717
718 let total_fees = block.txs.iter().map(|tx| tx.fee).sum::<u64>();
719 let total_outputs = block
720 .block
721 .miner_transaction
722 .prefix()
723 .outputs
724 .iter()
725 .map(|output| output.amount.unwrap_or(0))
726 .sum::<u64>();
727
728 let generated_coins = total_outputs - total_fees;
729
730 VerifiedBlockInformation {
731 block_blob: block.block_blob,
732 txs: block.txs,
733 block_hash: block.block_hash,
734 pow_hash: [u8::MAX; 32],
735 height: block.height,
736 generated_coins,
737 weight: block.weight,
738 long_term_weight: blockchain_ctx.next_block_long_term_weight(block.weight),
739 cumulative_difficulty: blockchain_ctx.cumulative_difficulty
740 + blockchain_ctx.next_difficulty,
741 block: block.block,
742 }
743}