cuprated/blockchain/
syncer.rs

1// FIXME: This whole module is not great and should be rewritten when the PeerSet is made.
2use std::{sync::Arc, time::Duration};
3
4use futures::StreamExt;
5use tokio::{
6    sync::{mpsc, Notify, OwnedSemaphorePermit, Semaphore},
7    time::interval,
8};
9use tower::{Service, ServiceExt};
10use tracing::instrument;
11
12use cuprate_consensus::{BlockChainContextRequest, BlockChainContextResponse, BlockchainContext};
13use cuprate_consensus_context::BlockchainContextService;
14use cuprate_p2p::{
15    block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse},
16    NetworkInterface, PeerSetRequest, PeerSetResponse,
17};
18use cuprate_p2p_core::{ClearNet, NetworkZone};
19
/// How often the [`syncer`] polls the peer set to check whether we have fallen behind
/// our connected peers.
const CHECK_SYNC_FREQUENCY: Duration = Duration::from_secs(30);
21
22/// An error returned from the [`syncer`].
23#[derive(Debug, thiserror::Error)]
24pub enum SyncerError {
25    #[error("Incoming block channel closed.")]
26    IncomingBlockChannelClosed,
27    #[error("One of our services returned an error: {0}.")]
28    ServiceError(#[from] tower::BoxError),
29}
30
31/// The syncer tasks that makes sure we are fully synchronised with our connected peers.
32#[instrument(level = "debug", skip_all)]
33#[expect(clippy::significant_drop_tightening)]
34pub async fn syncer<CN>(
35    mut context_svc: BlockchainContextService,
36    our_chain: CN,
37    mut clearnet_interface: NetworkInterface<ClearNet>,
38    incoming_block_batch_tx: mpsc::Sender<(BlockBatch, Arc<OwnedSemaphorePermit>)>,
39    stop_current_block_downloader: Arc<Notify>,
40    block_downloader_config: BlockDownloaderConfig,
41) -> Result<(), SyncerError>
42where
43    CN: Service<
44            ChainSvcRequest<ClearNet>,
45            Response = ChainSvcResponse<ClearNet>,
46            Error = tower::BoxError,
47        > + Clone
48        + Send
49        + 'static,
50    CN::Future: Send + 'static,
51{
52    tracing::info!("Starting blockchain syncer");
53
54    let mut check_sync_interval = interval(CHECK_SYNC_FREQUENCY);
55
56    tracing::debug!("Waiting for new sync info in top sync channel");
57
58    let semaphore = Arc::new(Semaphore::new(1));
59
60    let mut sync_permit = Arc::new(Arc::clone(&semaphore).acquire_owned().await.unwrap());
61    loop {
62        check_sync_interval.tick().await;
63
64        tracing::trace!("Checking connected peers to see if we are behind",);
65
66        let blockchain_context = context_svc.blockchain_context();
67
68        if !check_behind_peers(blockchain_context, &mut clearnet_interface).await? {
69            continue;
70        }
71
72        tracing::debug!(
73            "We are behind peers claimed cumulative difficulty, starting block downloader"
74        );
75        let mut block_batch_stream =
76            clearnet_interface.block_downloader(our_chain.clone(), block_downloader_config);
77
78        loop {
79            tokio::select! {
80                () = stop_current_block_downloader.notified() => {
81                    tracing::info!("Received stop signal, stopping block downloader");
82
83                    drop(sync_permit);
84                    sync_permit = Arc::new(Arc::clone(&semaphore).acquire_owned().await.unwrap());
85
86                    break;
87                }
88                batch = block_batch_stream.next() => {
89                    let Some(batch) = batch else {
90                        // Wait for all references to the permit have been dropped (which means all blocks in the queue
91                        // have been handled before checking if we are synced.
92                        drop(sync_permit);
93                        sync_permit = Arc::new(Arc::clone(&semaphore).acquire_owned().await.unwrap());
94
95                        let blockchain_context = context_svc.blockchain_context();
96
97                        if !check_behind_peers(blockchain_context, &mut clearnet_interface).await? {
98                            tracing::info!("Synchronised with the network.");
99                        }
100
101                        break;
102                    };
103
104                    tracing::debug!("Got batch, len: {}", batch.blocks.len());
105                    if incoming_block_batch_tx.send((batch, Arc::clone(&sync_permit))).await.is_err() {
106                        return Err(SyncerError::IncomingBlockChannelClosed);
107                    }
108                }
109            }
110        }
111    }
112}
113
114/// Returns `true` if we are behind the current connected network peers.
115async fn check_behind_peers(
116    blockchain_context: &BlockchainContext,
117    mut clearnet_interface: &mut NetworkInterface<ClearNet>,
118) -> Result<bool, tower::BoxError> {
119    let PeerSetResponse::MostPoWSeen {
120        cumulative_difficulty,
121        ..
122    } = clearnet_interface
123        .peer_set()
124        .ready()
125        .await?
126        .call(PeerSetRequest::MostPoWSeen)
127        .await?
128    else {
129        unreachable!();
130    };
131
132    if cumulative_difficulty <= blockchain_context.cumulative_difficulty {
133        return Ok(false);
134    }
135
136    Ok(true)
137}