cuprated/blockchain/
manager.rs1use std::{collections::HashMap, sync::Arc};
2
3use futures::StreamExt;
4use monero_serai::block::Block;
5use tokio::sync::{mpsc, oneshot, Notify, OwnedSemaphorePermit};
6use tower::{BoxError, Service, ServiceExt};
7use tracing::error;
8
9use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
10use cuprate_consensus::{
11 BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService,
12 ExtendedConsensusError,
13};
14use cuprate_p2p::{
15 block_downloader::{BlockBatch, BlockDownloaderConfig},
16 BroadcastSvc, NetworkInterface,
17};
18use cuprate_p2p_core::ClearNet;
19use cuprate_txpool::service::TxpoolWriteHandle;
20use cuprate_types::{
21 blockchain::{BlockchainReadRequest, BlockchainResponse},
22 Chain, TransactionVerificationData,
23};
24
25use crate::{
26 blockchain::{
27 chain_service::ChainService, interface::COMMAND_TX, syncer,
28 types::ConsensusBlockchainReadHandle,
29 },
30 constants::PANIC_CRITICAL_SERVICE_ERROR,
31};
32
33mod commands;
34mod handler;
35
36#[cfg(test)]
37mod tests;
38
39pub use commands::{BlockchainManagerCommand, IncomingBlockOk};
40
41pub async fn init_blockchain_manager(
46 clearnet_interface: NetworkInterface<ClearNet>,
47 blockchain_write_handle: BlockchainWriteHandle,
48 blockchain_read_handle: BlockchainReadHandle,
49 txpool_write_handle: TxpoolWriteHandle,
50 mut blockchain_context_service: BlockchainContextService,
51 block_downloader_config: BlockDownloaderConfig,
52) {
53 let (batch_tx, batch_rx) = mpsc::channel(1);
55 let stop_current_block_downloader = Arc::new(Notify::new());
56 let (command_tx, command_rx) = mpsc::channel(3);
57
58 COMMAND_TX.set(command_tx).unwrap();
59
60 tokio::spawn(syncer::syncer(
61 blockchain_context_service.clone(),
62 ChainService(blockchain_read_handle.clone()),
63 clearnet_interface.clone(),
64 batch_tx,
65 Arc::clone(&stop_current_block_downloader),
66 block_downloader_config,
67 ));
68
69 let manager = BlockchainManager {
70 blockchain_write_handle,
71 blockchain_read_handle: ConsensusBlockchainReadHandle::new(
72 blockchain_read_handle,
73 BoxError::from,
74 ),
75 txpool_write_handle,
76 blockchain_context_service,
77 stop_current_block_downloader,
78 broadcast_svc: clearnet_interface.broadcast_svc(),
79 };
80
81 tokio::spawn(manager.run(batch_rx, command_rx));
82}
83
84pub struct BlockchainManager {
91 blockchain_write_handle: BlockchainWriteHandle,
94 blockchain_read_handle: ConsensusBlockchainReadHandle,
96 txpool_write_handle: TxpoolWriteHandle,
98 blockchain_context_service: BlockchainContextService,
101 stop_current_block_downloader: Arc<Notify>,
104 broadcast_svc: BroadcastSvc<ClearNet>,
106}
107
108impl BlockchainManager {
109 pub async fn run(
111 mut self,
112 mut block_batch_rx: mpsc::Receiver<(BlockBatch, Arc<OwnedSemaphorePermit>)>,
113 mut command_rx: mpsc::Receiver<BlockchainManagerCommand>,
114 ) {
115 loop {
116 tokio::select! {
117 Some((batch, permit)) = block_batch_rx.recv() => {
118 self.handle_incoming_block_batch(
119 batch,
120 ).await;
121
122 drop(permit);
123 }
124 Some(incoming_command) = command_rx.recv() => {
125 self.handle_command(incoming_command).await;
126 }
127 else => {
128 todo!("TODO: exit the BC manager")
129 }
130 }
131 }
132 }
133}