cuprated/blockchain/
manager.rsuse std::{collections::HashMap, sync::Arc};
use futures::StreamExt;
use monero_serai::block::Block;
use tokio::sync::{mpsc, oneshot, Notify};
use tower::{Service, ServiceExt};
use tracing::error;
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::{
BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService,
BlockVerifierService, ExtendedConsensusError, TxVerifierService, VerifyBlockRequest,
VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse,
};
use cuprate_consensus_context::RawBlockChainContext;
use cuprate_p2p::{
block_downloader::{BlockBatch, BlockDownloaderConfig},
BroadcastSvc, NetworkInterface,
};
use cuprate_p2p_core::ClearNet;
use cuprate_txpool::service::TxpoolWriteHandle;
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse},
Chain, TransactionVerificationData,
};
use crate::{
blockchain::{
chain_service::ChainService,
interface::COMMAND_TX,
syncer,
types::{ConcreteBlockVerifierService, ConsensusBlockchainReadHandle},
},
constants::PANIC_CRITICAL_SERVICE_ERROR,
};
mod commands;
mod handler;
pub use commands::{BlockchainManagerCommand, IncomingBlockOk};
pub async fn init_blockchain_manager(
clearnet_interface: NetworkInterface<ClearNet>,
blockchain_write_handle: BlockchainWriteHandle,
blockchain_read_handle: BlockchainReadHandle,
txpool_write_handle: TxpoolWriteHandle,
mut blockchain_context_service: BlockChainContextService,
block_verifier_service: ConcreteBlockVerifierService,
block_downloader_config: BlockDownloaderConfig,
) {
let (batch_tx, batch_rx) = mpsc::channel(1);
let stop_current_block_downloader = Arc::new(Notify::new());
let (command_tx, command_rx) = mpsc::channel(3);
COMMAND_TX.set(command_tx).unwrap();
tokio::spawn(syncer::syncer(
blockchain_context_service.clone(),
ChainService(blockchain_read_handle.clone()),
clearnet_interface.clone(),
batch_tx,
Arc::clone(&stop_current_block_downloader),
block_downloader_config,
));
let BlockChainContextResponse::Context(blockchain_context) = blockchain_context_service
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockChainContextRequest::Context)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
unreachable!()
};
let manager = BlockchainManager {
blockchain_write_handle,
blockchain_read_handle,
txpool_write_handle,
blockchain_context_service,
cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(),
block_verifier_service,
stop_current_block_downloader,
broadcast_svc: clearnet_interface.broadcast_svc(),
};
tokio::spawn(manager.run(batch_rx, command_rx));
}
pub struct BlockchainManager {
blockchain_write_handle: BlockchainWriteHandle,
blockchain_read_handle: BlockchainReadHandle,
txpool_write_handle: TxpoolWriteHandle,
blockchain_context_service: BlockChainContextService,
cached_blockchain_context: RawBlockChainContext,
block_verifier_service: ConcreteBlockVerifierService,
stop_current_block_downloader: Arc<Notify>,
broadcast_svc: BroadcastSvc<ClearNet>,
}
impl BlockchainManager {
pub async fn run(
mut self,
mut block_batch_rx: mpsc::Receiver<BlockBatch>,
mut command_rx: mpsc::Receiver<BlockchainManagerCommand>,
) {
loop {
tokio::select! {
Some(batch) = block_batch_rx.recv() => {
self.handle_incoming_block_batch(
batch,
).await;
}
Some(incoming_command) = command_rx.recv() => {
self.handle_command(incoming_command).await;
}
else => {
todo!("TODO: exit the BC manager")
}
}
}
}
}