cuprated/blockchain/
manager.rs

1use std::{collections::HashMap, sync::Arc};
2
3use futures::StreamExt;
4use monero_serai::block::Block;
5use tokio::sync::{mpsc, oneshot, Notify, OwnedSemaphorePermit};
6use tower::{BoxError, Service, ServiceExt};
7use tracing::error;
8
9use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
10use cuprate_consensus::{
11    BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService,
12    ExtendedConsensusError,
13};
14use cuprate_p2p::{
15    block_downloader::{BlockBatch, BlockDownloaderConfig},
16    BroadcastSvc, NetworkInterface,
17};
18use cuprate_p2p_core::ClearNet;
19use cuprate_txpool::service::TxpoolWriteHandle;
20use cuprate_types::{
21    blockchain::{BlockchainReadRequest, BlockchainResponse},
22    Chain, TransactionVerificationData,
23};
24
25use crate::{
26    blockchain::{
27        chain_service::ChainService, interface::COMMAND_TX, syncer,
28        types::ConsensusBlockchainReadHandle,
29    },
30    constants::PANIC_CRITICAL_SERVICE_ERROR,
31    txpool::TxpoolManagerHandle,
32};
33
34mod commands;
35mod handler;
36
37#[cfg(test)]
38mod tests;
39
40pub use commands::{BlockchainManagerCommand, IncomingBlockOk};
41
42/// Initialize the blockchain manager.
43///
44/// This function sets up the [`BlockchainManager`] and the [`syncer`] so that the functions in [`interface`](super::interface)
45/// can be called.
46pub async fn init_blockchain_manager(
47    clearnet_interface: NetworkInterface<ClearNet>,
48    blockchain_write_handle: BlockchainWriteHandle,
49    blockchain_read_handle: BlockchainReadHandle,
50    txpool_manager_handle: TxpoolManagerHandle,
51    mut blockchain_context_service: BlockchainContextService,
52    block_downloader_config: BlockDownloaderConfig,
53) {
54    // TODO: find good values for these size limits
55    let (batch_tx, batch_rx) = mpsc::channel(1);
56    let stop_current_block_downloader = Arc::new(Notify::new());
57    let (command_tx, command_rx) = mpsc::channel(3);
58
59    COMMAND_TX.set(command_tx).unwrap();
60
61    tokio::spawn(syncer::syncer(
62        blockchain_context_service.clone(),
63        ChainService(blockchain_read_handle.clone()),
64        clearnet_interface.clone(),
65        batch_tx,
66        Arc::clone(&stop_current_block_downloader),
67        block_downloader_config,
68    ));
69
70    let manager = BlockchainManager {
71        blockchain_write_handle,
72        blockchain_read_handle: ConsensusBlockchainReadHandle::new(
73            blockchain_read_handle,
74            BoxError::from,
75        ),
76        txpool_manager_handle,
77        blockchain_context_service,
78        stop_current_block_downloader,
79        broadcast_svc: clearnet_interface.broadcast_svc(),
80    };
81
82    tokio::spawn(manager.run(batch_rx, command_rx));
83}
84
85/// The blockchain manager.
86///
87/// This handles all mutation of the blockchain, anything that changes the state of the blockchain must
88/// go through this.
89///
90/// Other parts of Cuprate can interface with this by using the functions in [`interface`](super::interface).
91pub struct BlockchainManager {
92    /// The [`BlockchainWriteHandle`], this is the _only_ part of Cuprate where a [`BlockchainWriteHandle`]
93    /// is held.
94    blockchain_write_handle: BlockchainWriteHandle,
95    /// A [`BlockchainReadHandle`].
96    blockchain_read_handle: ConsensusBlockchainReadHandle,
97
98    txpool_manager_handle: TxpoolManagerHandle,
99    /// The blockchain context cache, this caches the current state of the blockchain to quickly calculate/retrieve
100    /// values without needing to go to a [`BlockchainReadHandle`].
101    blockchain_context_service: BlockchainContextService,
102    /// A [`Notify`] to tell the [syncer](syncer::syncer) that we want to cancel this current download
103    /// attempt.
104    stop_current_block_downloader: Arc<Notify>,
105    /// The broadcast service, to broadcast new blocks.
106    broadcast_svc: BroadcastSvc<ClearNet>,
107}
108
109impl BlockchainManager {
110    /// The [`BlockchainManager`] task.
111    pub async fn run(
112        mut self,
113        mut block_batch_rx: mpsc::Receiver<(BlockBatch, Arc<OwnedSemaphorePermit>)>,
114        mut command_rx: mpsc::Receiver<BlockchainManagerCommand>,
115    ) {
116        loop {
117            tokio::select! {
118                Some((batch, permit)) = block_batch_rx.recv() => {
119                    self.handle_incoming_block_batch(
120                        batch,
121                    ).await;
122
123                    drop(permit);
124                }
125                Some(incoming_command) = command_rx.recv() => {
126                    self.handle_command(incoming_command).await;
127                }
128                else => {
129                    todo!("TODO: exit the BC manager")
130                }
131            }
132        }
133    }
134}