cuprated/blockchain/
manager.rs

1use std::{collections::HashMap, sync::Arc};
2
3use futures::StreamExt;
4use monero_serai::block::Block;
5use tokio::sync::{mpsc, oneshot, Notify, OwnedSemaphorePermit};
6use tower::{BoxError, Service, ServiceExt};
7use tracing::error;
8
9use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
10use cuprate_consensus::{
11    BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService,
12    ExtendedConsensusError,
13};
14use cuprate_p2p::{
15    block_downloader::{BlockBatch, BlockDownloaderConfig},
16    BroadcastSvc, NetworkInterface,
17};
18use cuprate_p2p_core::ClearNet;
19use cuprate_txpool::service::TxpoolWriteHandle;
20use cuprate_types::{
21    blockchain::{BlockchainReadRequest, BlockchainResponse},
22    Chain, TransactionVerificationData,
23};
24
25use crate::{
26    blockchain::{
27        chain_service::ChainService, interface::COMMAND_TX, syncer,
28        types::ConsensusBlockchainReadHandle,
29    },
30    constants::PANIC_CRITICAL_SERVICE_ERROR,
31};
32
33mod commands;
34mod handler;
35
36#[cfg(test)]
37mod tests;
38
39pub use commands::{BlockchainManagerCommand, IncomingBlockOk};
40
41/// Initialize the blockchain manager.
42///
43/// This function sets up the [`BlockchainManager`] and the [`syncer`] so that the functions in [`interface`](super::interface)
44/// can be called.
45pub async fn init_blockchain_manager(
46    clearnet_interface: NetworkInterface<ClearNet>,
47    blockchain_write_handle: BlockchainWriteHandle,
48    blockchain_read_handle: BlockchainReadHandle,
49    txpool_write_handle: TxpoolWriteHandle,
50    mut blockchain_context_service: BlockchainContextService,
51    block_downloader_config: BlockDownloaderConfig,
52) {
53    // TODO: find good values for these size limits
54    let (batch_tx, batch_rx) = mpsc::channel(1);
55    let stop_current_block_downloader = Arc::new(Notify::new());
56    let (command_tx, command_rx) = mpsc::channel(3);
57
58    COMMAND_TX.set(command_tx).unwrap();
59
60    tokio::spawn(syncer::syncer(
61        blockchain_context_service.clone(),
62        ChainService(blockchain_read_handle.clone()),
63        clearnet_interface.clone(),
64        batch_tx,
65        Arc::clone(&stop_current_block_downloader),
66        block_downloader_config,
67    ));
68
69    let manager = BlockchainManager {
70        blockchain_write_handle,
71        blockchain_read_handle: ConsensusBlockchainReadHandle::new(
72            blockchain_read_handle,
73            BoxError::from,
74        ),
75        txpool_write_handle,
76        blockchain_context_service,
77        stop_current_block_downloader,
78        broadcast_svc: clearnet_interface.broadcast_svc(),
79    };
80
81    tokio::spawn(manager.run(batch_rx, command_rx));
82}
83
84/// The blockchain manager.
85///
86/// This handles all mutation of the blockchain, anything that changes the state of the blockchain must
87/// go through this.
88///
89/// Other parts of Cuprate can interface with this by using the functions in [`interface`](super::interface).
90pub struct BlockchainManager {
91    /// The [`BlockchainWriteHandle`], this is the _only_ part of Cuprate where a [`BlockchainWriteHandle`]
92    /// is held.
93    blockchain_write_handle: BlockchainWriteHandle,
94    /// A [`BlockchainReadHandle`].
95    blockchain_read_handle: ConsensusBlockchainReadHandle,
96    /// A [`TxpoolWriteHandle`].
97    txpool_write_handle: TxpoolWriteHandle,
98    /// The blockchain context cache, this caches the current state of the blockchain to quickly calculate/retrieve
99    /// values without needing to go to a [`BlockchainReadHandle`].
100    blockchain_context_service: BlockchainContextService,
101    /// A [`Notify`] to tell the [syncer](syncer::syncer) that we want to cancel this current download
102    /// attempt.
103    stop_current_block_downloader: Arc<Notify>,
104    /// The broadcast service, to broadcast new blocks.
105    broadcast_svc: BroadcastSvc<ClearNet>,
106}
107
108impl BlockchainManager {
109    /// The [`BlockchainManager`] task.
110    pub async fn run(
111        mut self,
112        mut block_batch_rx: mpsc::Receiver<(BlockBatch, Arc<OwnedSemaphorePermit>)>,
113        mut command_rx: mpsc::Receiver<BlockchainManagerCommand>,
114    ) {
115        loop {
116            tokio::select! {
117                Some((batch, permit)) = block_batch_rx.recv() => {
118                    self.handle_incoming_block_batch(
119                        batch,
120                    ).await;
121
122                    drop(permit);
123                }
124                Some(incoming_command) = command_rx.recv() => {
125                    self.handle_command(incoming_command).await;
126                }
127                else => {
128                    todo!("TODO: exit the BC manager")
129                }
130            }
131        }
132    }
133}