cuprate_p2p_core/client/
timeout_monitor.rs
1use std::sync::Arc;
6
7use futures::channel::oneshot;
8use tokio::{
9 sync::{mpsc, Semaphore},
10 time::{interval, MissedTickBehavior},
11};
12use tower::ServiceExt;
13use tracing::instrument;
14
15use cuprate_wire::{admin::TimedSyncRequest, AdminRequestMessage, AdminResponseMessage};
16
17use crate::{
18 client::{connection::ConnectionTaskRequest, PeerInformation},
19 constants::{MAX_PEERS_IN_PEER_LIST_MESSAGE, TIMEOUT_INTERVAL},
20 services::{AddressBookRequest, CoreSyncDataRequest, CoreSyncDataResponse},
21 AddressBook, CoreSyncSvc, NetworkZone, PeerRequest, PeerResponse,
22};
23
24#[instrument(
26 name = "timeout_monitor",
27 level = "debug",
28 fields(addr = %peer_information.id),
29 skip_all,
30)]
31pub async fn connection_timeout_monitor_task<N: NetworkZone, AdrBook, CSync>(
32 peer_information: PeerInformation<N::Addr>,
33
34 connection_tx: mpsc::Sender<ConnectionTaskRequest>,
35 semaphore: Arc<Semaphore>,
36
37 mut address_book_svc: AdrBook,
38 mut core_sync_svc: CSync,
39) -> Result<(), tower::BoxError>
40where
41 AdrBook: AddressBook<N>,
42 CSync: CoreSyncSvc,
43{
44 let connection_tx_weak = connection_tx.downgrade();
45 drop(connection_tx);
46
47 let mut interval = interval(TIMEOUT_INTERVAL);
50
51 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
52
53 interval.tick().await;
55
56 loop {
57 tokio::select! {
58 () = peer_information.handle.closed() => {
59 tracing::debug!("Closing timeout monitor, connection disconnected.");
60 return Ok(());
61 }
62 _ = interval.tick() => ()
63 }
64
65 tracing::trace!("timeout monitor tick.");
66
67 let Some(connection_tx) = connection_tx_weak.upgrade() else {
68 tracing::debug!("Closing timeout monitor, connection disconnected.");
69 return Ok(());
70 };
71
72 let Ok(permit) = Arc::clone(&semaphore).try_acquire_owned() else {
73 continue;
76 };
77
78 let ping_span = tracing::debug_span!("timed_sync");
79
80 tracing::trace!(parent: &ping_span, "Attempting to get our core sync data");
82 let CoreSyncDataResponse(core_sync_data) = core_sync_svc
83 .ready()
84 .await?
85 .call(CoreSyncDataRequest)
86 .await?;
87
88 let (tx, rx) = oneshot::channel();
89
90 tracing::debug!(parent: &ping_span, "Sending timed sync to peer");
93 connection_tx
94 .send(ConnectionTaskRequest {
95 request: PeerRequest::Admin(AdminRequestMessage::TimedSync(TimedSyncRequest {
96 payload_data: core_sync_data,
97 })),
98 response_channel: tx,
99 permit: Some(permit),
100 })
101 .await?;
102
103 let PeerResponse::Admin(AdminResponseMessage::TimedSync(timed_sync)) = rx.await?? else {
104 panic!("Connection task returned wrong response!");
105 };
106
107 tracing::debug!(
108 parent: &ping_span,
109 "Received timed sync response, incoming peer list len: {}",
110 timed_sync.local_peerlist_new.len()
111 );
112
113 if timed_sync.local_peerlist_new.len() > MAX_PEERS_IN_PEER_LIST_MESSAGE {
114 return Err("Peer sent too many peers in peer list".into());
115 }
116
117 address_book_svc
119 .ready()
120 .await?
121 .call(AddressBookRequest::IncomingPeerList(
122 timed_sync
123 .local_peerlist_new
124 .into_iter()
125 .map(TryInto::try_into)
126 .collect::<Result<_, _>>()?,
127 ))
128 .await?;
129
130 *peer_information.core_sync_data.lock().unwrap() = timed_sync.payload_data;
131 }
132}