cuprated/blockchain/
manager.rs1use std::{collections::HashMap, sync::Arc};
2
3use futures::StreamExt;
4use monero_serai::block::Block;
5use tokio::sync::{mpsc, oneshot, Notify, OwnedSemaphorePermit};
6use tower::{BoxError, Service, ServiceExt};
7use tracing::error;
8
9use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
10use cuprate_consensus::{
11 BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService,
12 ExtendedConsensusError,
13};
14use cuprate_p2p::{
15 block_downloader::{BlockBatch, BlockDownloaderConfig},
16 BroadcastSvc, NetworkInterface,
17};
18use cuprate_p2p_core::ClearNet;
19use cuprate_txpool::service::TxpoolWriteHandle;
20use cuprate_types::{
21 blockchain::{BlockchainReadRequest, BlockchainResponse},
22 Chain, TransactionVerificationData,
23};
24
25use crate::{
26 blockchain::{
27 chain_service::ChainService, interface::COMMAND_TX, syncer,
28 types::ConsensusBlockchainReadHandle,
29 },
30 constants::PANIC_CRITICAL_SERVICE_ERROR,
31 txpool::TxpoolManagerHandle,
32};
33
34mod commands;
35mod handler;
36
37#[cfg(test)]
38mod tests;
39
40pub use commands::{BlockchainManagerCommand, IncomingBlockOk};
41
42pub async fn init_blockchain_manager(
47 clearnet_interface: NetworkInterface<ClearNet>,
48 blockchain_write_handle: BlockchainWriteHandle,
49 blockchain_read_handle: BlockchainReadHandle,
50 txpool_manager_handle: TxpoolManagerHandle,
51 mut blockchain_context_service: BlockchainContextService,
52 block_downloader_config: BlockDownloaderConfig,
53) {
54 let (batch_tx, batch_rx) = mpsc::channel(1);
56 let stop_current_block_downloader = Arc::new(Notify::new());
57 let (command_tx, command_rx) = mpsc::channel(3);
58
59 COMMAND_TX.set(command_tx).unwrap();
60
61 tokio::spawn(syncer::syncer(
62 blockchain_context_service.clone(),
63 ChainService(blockchain_read_handle.clone()),
64 clearnet_interface.clone(),
65 batch_tx,
66 Arc::clone(&stop_current_block_downloader),
67 block_downloader_config,
68 ));
69
70 let manager = BlockchainManager {
71 blockchain_write_handle,
72 blockchain_read_handle: ConsensusBlockchainReadHandle::new(
73 blockchain_read_handle,
74 BoxError::from,
75 ),
76 txpool_manager_handle,
77 blockchain_context_service,
78 stop_current_block_downloader,
79 broadcast_svc: clearnet_interface.broadcast_svc(),
80 };
81
82 tokio::spawn(manager.run(batch_rx, command_rx));
83}
84
85pub struct BlockchainManager {
92 blockchain_write_handle: BlockchainWriteHandle,
95 blockchain_read_handle: ConsensusBlockchainReadHandle,
97
98 txpool_manager_handle: TxpoolManagerHandle,
99 blockchain_context_service: BlockchainContextService,
102 stop_current_block_downloader: Arc<Notify>,
105 broadcast_svc: BroadcastSvc<ClearNet>,
107}
108
109impl BlockchainManager {
110 pub async fn run(
112 mut self,
113 mut block_batch_rx: mpsc::Receiver<(BlockBatch, Arc<OwnedSemaphorePermit>)>,
114 mut command_rx: mpsc::Receiver<BlockchainManagerCommand>,
115 ) {
116 loop {
117 tokio::select! {
118 Some((batch, permit)) = block_batch_rx.recv() => {
119 self.handle_incoming_block_batch(
120 batch,
121 ).await;
122
123 drop(permit);
124 }
125 Some(incoming_command) = command_rx.recv() => {
126 self.handle_command(incoming_command).await;
127 }
128 else => {
129 todo!("TODO: exit the BC manager")
130 }
131 }
132 }
133 }
134}