// cuprated/blockchain/syncer.rs
use std::{sync::Arc, time::Duration};
3
4use futures::StreamExt;
5use tokio::{
6 sync::{mpsc, Notify, OwnedSemaphorePermit, Semaphore},
7 time::interval,
8};
9use tower::{Service, ServiceExt};
10use tracing::instrument;
11
12use cuprate_consensus::{BlockChainContextRequest, BlockChainContextResponse, BlockchainContext};
13use cuprate_consensus_context::BlockchainContextService;
14use cuprate_p2p::{
15 block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse},
16 NetworkInterface, PeerSetRequest, PeerSetResponse,
17};
18use cuprate_p2p_core::{ClearNet, NetworkZone};
19
/// How often the [`syncer`] checks whether our connected peers are ahead of us.
const CHECK_SYNC_FREQUENCY: Duration = Duration::from_secs(30);
21
22#[derive(Debug, thiserror::Error)]
24pub enum SyncerError {
25 #[error("Incoming block channel closed.")]
26 IncomingBlockChannelClosed,
27 #[error("One of our services returned an error: {0}.")]
28 ServiceError(#[from] tower::BoxError),
29}
30
31#[instrument(level = "debug", skip_all)]
33#[expect(clippy::significant_drop_tightening)]
34pub async fn syncer<CN>(
35 mut context_svc: BlockchainContextService,
36 our_chain: CN,
37 mut clearnet_interface: NetworkInterface<ClearNet>,
38 incoming_block_batch_tx: mpsc::Sender<(BlockBatch, Arc<OwnedSemaphorePermit>)>,
39 stop_current_block_downloader: Arc<Notify>,
40 block_downloader_config: BlockDownloaderConfig,
41) -> Result<(), SyncerError>
42where
43 CN: Service<
44 ChainSvcRequest<ClearNet>,
45 Response = ChainSvcResponse<ClearNet>,
46 Error = tower::BoxError,
47 > + Clone
48 + Send
49 + 'static,
50 CN::Future: Send + 'static,
51{
52 tracing::info!("Starting blockchain syncer");
53
54 let mut check_sync_interval = interval(CHECK_SYNC_FREQUENCY);
55
56 tracing::debug!("Waiting for new sync info in top sync channel");
57
58 let semaphore = Arc::new(Semaphore::new(1));
59
60 let mut sync_permit = Arc::new(Arc::clone(&semaphore).acquire_owned().await.unwrap());
61 loop {
62 check_sync_interval.tick().await;
63
64 tracing::trace!("Checking connected peers to see if we are behind",);
65
66 let blockchain_context = context_svc.blockchain_context();
67
68 if !check_behind_peers(blockchain_context, &mut clearnet_interface).await? {
69 continue;
70 }
71
72 tracing::debug!(
73 "We are behind peers claimed cumulative difficulty, starting block downloader"
74 );
75 let mut block_batch_stream =
76 clearnet_interface.block_downloader(our_chain.clone(), block_downloader_config);
77
78 loop {
79 tokio::select! {
80 () = stop_current_block_downloader.notified() => {
81 tracing::info!("Received stop signal, stopping block downloader");
82
83 drop(sync_permit);
84 sync_permit = Arc::new(Arc::clone(&semaphore).acquire_owned().await.unwrap());
85
86 break;
87 }
88 batch = block_batch_stream.next() => {
89 let Some(batch) = batch else {
90 drop(sync_permit);
93 sync_permit = Arc::new(Arc::clone(&semaphore).acquire_owned().await.unwrap());
94
95 let blockchain_context = context_svc.blockchain_context();
96
97 if !check_behind_peers(blockchain_context, &mut clearnet_interface).await? {
98 tracing::info!("Synchronised with the network.");
99 }
100
101 break;
102 };
103
104 tracing::debug!("Got batch, len: {}", batch.blocks.len());
105 if incoming_block_batch_tx.send((batch, Arc::clone(&sync_permit))).await.is_err() {
106 return Err(SyncerError::IncomingBlockChannelClosed);
107 }
108 }
109 }
110 }
111 }
112}
113
114async fn check_behind_peers(
116 blockchain_context: &BlockchainContext,
117 mut clearnet_interface: &mut NetworkInterface<ClearNet>,
118) -> Result<bool, tower::BoxError> {
119 let PeerSetResponse::MostPoWSeen {
120 cumulative_difficulty,
121 ..
122 } = clearnet_interface
123 .peer_set()
124 .ready()
125 .await?
126 .call(PeerSetRequest::MostPoWSeen)
127 .await?
128 else {
129 unreachable!();
130 };
131
132 if cumulative_difficulty <= blockchain_context.cumulative_difficulty {
133 return Ok(false);
134 }
135
136 Ok(true)
137}