cuprated/blockchain/manager/
handler.rs1use std::{collections::HashMap, sync::Arc};
3
4use bytes::Bytes;
5use futures::{TryFutureExt, TryStreamExt};
6use monero_serai::{
7 block::Block,
8 transaction::{Input, Transaction},
9};
10use rayon::prelude::*;
11use tower::{Service, ServiceExt};
12use tracing::{info, instrument, Span};
13
14use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
15use cuprate_consensus::{
16 block::{
17 batch_prepare_main_chain_blocks, sanity_check_alt_block, verify_main_chain_block,
18 verify_prepped_main_chain_block, PreparedBlock,
19 },
20 transactions::new_tx_verification_data,
21 BlockChainContextRequest, BlockChainContextResponse, ExtendedConsensusError,
22};
23use cuprate_consensus_context::NewBlockData;
24use cuprate_fast_sync::{block_to_verified_block_information, fast_sync_stop_height};
25use cuprate_helper::cast::usize_to_u64;
26use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN, BroadcastRequest};
27use cuprate_txpool::service::interface::TxpoolWriteRequest;
28use cuprate_types::{
29 blockchain::{BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest},
30 AltBlockInformation, Chain, HardFork, TransactionVerificationData, VerifiedBlockInformation,
31};
32
33use crate::{
34 blockchain::manager::commands::{BlockchainManagerCommand, IncomingBlockOk},
35 constants::PANIC_CRITICAL_SERVICE_ERROR,
36 signals::REORG_LOCK,
37};
38
39impl super::BlockchainManager {
40 pub async fn handle_command(&mut self, command: BlockchainManagerCommand) {
47 match command {
48 BlockchainManagerCommand::AddBlock {
49 block,
50 prepped_txs,
51 response_tx,
52 } => {
53 let res = self.handle_incoming_block(block, prepped_txs).await;
54
55 drop(response_tx.send(res));
56 }
57 }
58 }
59
60 async fn broadcast_block(&mut self, block_bytes: Bytes, blockchain_height: usize) {
62 self.broadcast_svc
63 .ready()
64 .await
65 .expect("Broadcast service is Infallible.")
66 .call(BroadcastRequest::Block {
67 block_bytes,
68 current_blockchain_height: usize_to_u64(blockchain_height),
69 })
70 .await
71 .expect("Broadcast service is Infallible.");
72 }
73
74 pub async fn handle_incoming_block(
86 &mut self,
87 block: Block,
88 prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
89 ) -> Result<IncomingBlockOk, anyhow::Error> {
90 if block.header.previous
91 != self
92 .blockchain_context_service
93 .blockchain_context()
94 .top_hash
95 {
96 self.handle_incoming_alt_block(block, prepared_txs).await?;
97 return Ok(IncomingBlockOk::AddedToAltChain);
98 }
99
100 let verified_block = verify_main_chain_block(
101 block,
102 prepared_txs,
103 &mut self.blockchain_context_service,
104 self.blockchain_read_handle.clone(),
105 )
106 .await?;
107
108 let block_blob = Bytes::copy_from_slice(&verified_block.block_blob);
109 self.add_valid_block_to_main_chain(verified_block).await;
110
111 let chain_height = self
112 .blockchain_context_service
113 .blockchain_context()
114 .chain_height;
115
116 self.broadcast_block(block_blob, chain_height).await;
117
118 Ok(IncomingBlockOk::AddedToMainChain)
119 }
120
121 #[instrument(
131 name = "incoming_block_batch",
132 skip_all,
133 level = "info",
134 fields(
135 start_height = batch.blocks.first().unwrap().0.number().unwrap(),
136 len = batch.blocks.len()
137 )
138 )]
139 pub async fn handle_incoming_block_batch(&mut self, batch: BlockBatch) {
140 let (first_block, _) = batch
141 .blocks
142 .first()
143 .expect("Block batch should not be empty");
144
145 if first_block.header.previous
146 == self
147 .blockchain_context_service
148 .blockchain_context()
149 .top_hash
150 {
151 self.handle_incoming_block_batch_main_chain(batch).await;
152 } else {
153 self.handle_incoming_block_batch_alt_chain(batch).await;
154 }
155 }
156
157 async fn handle_incoming_block_batch_main_chain(&mut self, batch: BlockBatch) {
170 if batch.blocks.last().unwrap().0.number().unwrap() < fast_sync_stop_height() {
171 self.handle_incoming_block_batch_fast_sync(batch).await;
172 return;
173 }
174
175 let Ok((prepped_blocks, mut output_cache)) = batch_prepare_main_chain_blocks(
176 batch.blocks,
177 &mut self.blockchain_context_service,
178 self.blockchain_read_handle.clone(),
179 )
180 .await
181 else {
182 batch.peer_handle.ban_peer(LONG_BAN);
183 self.stop_current_block_downloader.notify_one();
184 return;
185 };
186
187 for (block, txs) in prepped_blocks {
188 let Ok(verified_block) = verify_prepped_main_chain_block(
189 block,
190 txs,
191 &mut self.blockchain_context_service,
192 self.blockchain_read_handle.clone(),
193 Some(&mut output_cache),
194 )
195 .await
196 else {
197 batch.peer_handle.ban_peer(LONG_BAN);
198 self.stop_current_block_downloader.notify_one();
199 return;
200 };
201
202 self.add_valid_block_to_main_chain(verified_block).await;
203 }
204 info!(fast_sync = false, "Successfully added block batch");
205 }
206
207 async fn handle_incoming_block_batch_fast_sync(&mut self, batch: BlockBatch) {
214 let mut valid_blocks = Vec::with_capacity(batch.blocks.len());
215 for (block, txs) in batch.blocks {
216 let block = block_to_verified_block_information(
217 block,
218 txs,
219 self.blockchain_context_service.blockchain_context(),
220 );
221 self.add_valid_block_to_blockchain_cache(&block).await;
222
223 valid_blocks.push(block);
224 }
225
226 self.batch_add_valid_block_to_blockchain_database(valid_blocks)
227 .await;
228
229 info!(fast_sync = true, "Successfully added block batch");
230 }
231
232 async fn handle_incoming_block_batch_alt_chain(&mut self, mut batch: BlockBatch) {
245 let mut blocks = batch.blocks.into_iter();
247
248 while let Some((block, txs)) = blocks.next() {
249 let res = async {
251 let txs = txs
252 .into_par_iter()
253 .map(|tx| {
254 let tx = new_tx_verification_data(tx)?;
255 Ok((tx.tx_hash, tx))
256 })
257 .collect::<Result<_, anyhow::Error>>()?;
258
259 let reorged = self.handle_incoming_alt_block(block, txs).await?;
260
261 Ok::<_, anyhow::Error>(reorged)
262 }
263 .await;
264
265 match res {
266 Err(e) => {
267 batch.peer_handle.ban_peer(LONG_BAN);
268 self.stop_current_block_downloader.notify_one();
269 return;
270 }
271 Ok(AddAltBlock::Reorged) => {
272 batch.blocks = blocks.collect();
274 self.handle_incoming_block_batch_main_chain(batch).await;
275 return;
276 }
277 Ok(AddAltBlock::Cached) => (),
279 }
280 }
281
282 info!(alt_chain = true, "Successfully added block batch");
283 }
284
285 async fn handle_incoming_alt_block(
302 &mut self,
303 block: Block,
304 prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
305 ) -> Result<AddAltBlock, anyhow::Error> {
306 let BlockchainResponse::FindBlock(chain) = self
308 .blockchain_read_handle
309 .ready()
310 .await
311 .expect(PANIC_CRITICAL_SERVICE_ERROR)
312 .call(BlockchainReadRequest::FindBlock(block.hash()))
313 .await
314 .expect(PANIC_CRITICAL_SERVICE_ERROR)
315 else {
316 unreachable!();
317 };
318
319 match chain {
320 Some((Chain::Alt(_), _)) => return Ok(AddAltBlock::Cached),
321 Some((Chain::Main, _)) => anyhow::bail!("Alt block already in main chain"),
322 None => (),
323 }
324
325 let alt_block_info =
326 sanity_check_alt_block(block, prepared_txs, self.blockchain_context_service.clone())
327 .await?;
328
329 if alt_block_info.cumulative_difficulty
331 > self
332 .blockchain_context_service
333 .blockchain_context()
334 .cumulative_difficulty
335 {
336 self.try_do_reorg(alt_block_info).await?;
337 return Ok(AddAltBlock::Reorged);
338 }
339
340 self.blockchain_write_handle
341 .ready()
342 .await
343 .expect(PANIC_CRITICAL_SERVICE_ERROR)
344 .call(BlockchainWriteRequest::WriteAltBlock(alt_block_info))
345 .await?;
346
347 Ok(AddAltBlock::Cached)
348 }
349
350 async fn try_do_reorg(
366 &mut self,
367 top_alt_block: AltBlockInformation,
368 ) -> Result<(), anyhow::Error> {
369 let _guard = REORG_LOCK.write().await;
370
371 let BlockchainResponse::AltBlocksInChain(mut alt_blocks) = self
372 .blockchain_read_handle
373 .ready()
374 .await
375 .expect(PANIC_CRITICAL_SERVICE_ERROR)
376 .call(BlockchainReadRequest::AltBlocksInChain(
377 top_alt_block.chain_id,
378 ))
379 .await
380 .map_err(|e| anyhow::anyhow!(e))?
381 else {
382 unreachable!();
383 };
384
385 alt_blocks.push(top_alt_block);
386
387 let split_height = alt_blocks[0].height;
388 let current_main_chain_height = self
389 .blockchain_context_service
390 .blockchain_context()
391 .chain_height;
392
393 let BlockchainResponse::PopBlocks(old_main_chain_id) = self
394 .blockchain_write_handle
395 .ready()
396 .await
397 .expect(PANIC_CRITICAL_SERVICE_ERROR)
398 .call(BlockchainWriteRequest::PopBlocks(
399 current_main_chain_height - split_height,
400 ))
401 .await
402 .expect(PANIC_CRITICAL_SERVICE_ERROR)
403 else {
404 unreachable!();
405 };
406
407 self.blockchain_context_service
408 .ready()
409 .await
410 .expect(PANIC_CRITICAL_SERVICE_ERROR)
411 .call(BlockChainContextRequest::PopBlocks {
412 numb_blocks: current_main_chain_height - split_height,
413 })
414 .await
415 .expect(PANIC_CRITICAL_SERVICE_ERROR);
416
417 let reorg_res = self.verify_add_alt_blocks_to_main_chain(alt_blocks).await;
418
419 match reorg_res {
420 Ok(()) => Ok(()),
421 Err(e) => {
422 todo!("Reverse reorg")
423 }
424 }
425 }
426
427 async fn verify_add_alt_blocks_to_main_chain(
443 &mut self,
444 alt_blocks: Vec<AltBlockInformation>,
445 ) -> Result<(), anyhow::Error> {
446 for mut alt_block in alt_blocks {
447 let prepped_txs = alt_block
448 .txs
449 .drain(..)
450 .map(|tx| Ok(tx.try_into()?))
451 .collect::<Result<_, anyhow::Error>>()?;
452
453 let prepped_block = PreparedBlock::new_alt_block(alt_block)?;
454
455 let verified_block = verify_prepped_main_chain_block(
456 prepped_block,
457 prepped_txs,
458 &mut self.blockchain_context_service,
459 self.blockchain_read_handle.clone(),
460 None,
461 )
462 .await?;
463
464 self.add_valid_block_to_main_chain(verified_block).await;
465 }
466
467 Ok(())
468 }
469
470 pub async fn add_valid_block_to_main_chain(
479 &mut self,
480 verified_block: VerifiedBlockInformation,
481 ) {
482 let spent_key_images = verified_block
484 .txs
485 .iter()
486 .flat_map(|tx| {
487 tx.tx.prefix().inputs.iter().map(|input| match input {
488 Input::ToKey { key_image, .. } => key_image.0,
489 Input::Gen(_) => unreachable!(),
490 })
491 })
492 .collect::<Vec<[u8; 32]>>();
493
494 self.add_valid_block_to_blockchain_cache(&verified_block)
495 .await;
496
497 self.blockchain_write_handle
498 .ready()
499 .await
500 .expect(PANIC_CRITICAL_SERVICE_ERROR)
501 .call(BlockchainWriteRequest::WriteBlock(verified_block))
502 .await
503 .expect(PANIC_CRITICAL_SERVICE_ERROR);
504
505 self.txpool_write_handle
506 .ready()
507 .await
508 .expect(PANIC_CRITICAL_SERVICE_ERROR)
509 .call(TxpoolWriteRequest::NewBlock { spent_key_images })
510 .await
511 .expect(PANIC_CRITICAL_SERVICE_ERROR);
512 }
513
514 async fn add_valid_block_to_blockchain_cache(
521 &mut self,
522 verified_block: &VerifiedBlockInformation,
523 ) {
524 self.blockchain_context_service
525 .ready()
526 .await
527 .expect(PANIC_CRITICAL_SERVICE_ERROR)
528 .call(BlockChainContextRequest::Update(NewBlockData {
529 block_hash: verified_block.block_hash,
530 height: verified_block.height,
531 timestamp: verified_block.block.header.timestamp,
532 weight: verified_block.weight,
533 long_term_weight: verified_block.long_term_weight,
534 generated_coins: verified_block.generated_coins,
535 vote: HardFork::from_vote(verified_block.block.header.hardfork_signal),
536 cumulative_difficulty: verified_block.cumulative_difficulty,
537 }))
538 .await
539 .expect(PANIC_CRITICAL_SERVICE_ERROR);
540 }
541
542 async fn batch_add_valid_block_to_blockchain_database(
551 &mut self,
552 blocks: Vec<VerifiedBlockInformation>,
553 ) {
554 self.blockchain_write_handle
555 .ready()
556 .await
557 .expect(PANIC_CRITICAL_SERVICE_ERROR)
558 .call(BlockchainWriteRequest::BatchWriteBlocks(blocks))
559 .await
560 .expect(PANIC_CRITICAL_SERVICE_ERROR);
561 }
562}
563
/// The result of successfully handling an alt block.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AddAltBlock {
    /// The alt block was stored in (or was already present in) the alt-block cache.
    Cached,
    /// Handling the alt block caused the chain to be reorged onto its alt chain.
    Reorged,
}