cuprate_p2p_core/client/
timeout_monitor.rs

//! Timeout Monitor
//!
//! This module holds the task that sends periodic [`TimedSync`](PeerRequest::TimedSync) requests to a peer to make
//! sure the connection is still active.
5use std::sync::Arc;
6
7use futures::channel::oneshot;
8use tokio::{
9    sync::{mpsc, Semaphore},
10    time::{interval, MissedTickBehavior},
11};
12use tower::ServiceExt;
13use tracing::instrument;
14
15use cuprate_wire::{admin::TimedSyncRequest, AdminRequestMessage, AdminResponseMessage};
16
17use crate::{
18    client::{connection::ConnectionTaskRequest, PeerInformation},
19    constants::{MAX_PEERS_IN_PEER_LIST_MESSAGE, TIMEOUT_INTERVAL},
20    services::{AddressBookRequest, CoreSyncDataRequest, CoreSyncDataResponse},
21    AddressBook, CoreSyncSvc, NetworkZone, PeerRequest, PeerResponse,
22};
23
24/// The timeout monitor task, this task will send periodic timed sync requests to the peer to make sure it is still active.
25#[instrument(
26    name = "timeout_monitor",
27    level = "debug",
28    fields(addr = %peer_information.id),
29    skip_all,
30)]
31pub async fn connection_timeout_monitor_task<N: NetworkZone, AdrBook, CSync>(
32    peer_information: PeerInformation<N::Addr>,
33
34    connection_tx: mpsc::Sender<ConnectionTaskRequest>,
35    semaphore: Arc<Semaphore>,
36
37    mut address_book_svc: AdrBook,
38    mut core_sync_svc: CSync,
39) -> Result<(), tower::BoxError>
40where
41    AdrBook: AddressBook<N>,
42    CSync: CoreSyncSvc,
43{
44    let connection_tx_weak = connection_tx.downgrade();
45    drop(connection_tx);
46
47    // Instead of tracking the time from last message from the peer and sending a timed sync if this value is too high,
48    // we just send a timed sync every [TIMEOUT_INTERVAL] seconds.
49    let mut interval = interval(TIMEOUT_INTERVAL);
50
51    interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
52
53    // The first tick ticks instantly.
54    interval.tick().await;
55
56    loop {
57        tokio::select! {
58            () = peer_information.handle.closed() => {
59                tracing::debug!("Closing timeout monitor, connection disconnected.");
60                return Ok(());
61            }
62            _ = interval.tick() => ()
63        }
64
65        tracing::trace!("timeout monitor tick.");
66
67        let Some(connection_tx) = connection_tx_weak.upgrade() else {
68            tracing::debug!("Closing timeout monitor, connection disconnected.");
69            return Ok(());
70        };
71
72        let Ok(permit) = Arc::clone(&semaphore).try_acquire_owned() else {
73            // If we can't get a permit the connection is currently waiting for a response, so no need to
74            // do a timed sync.
75            continue;
76        };
77
78        let ping_span = tracing::debug_span!("timed_sync");
79
80        // get our core sync data
81        tracing::trace!(parent: &ping_span, "Attempting to get our core sync data");
82        let CoreSyncDataResponse(core_sync_data) = core_sync_svc
83            .ready()
84            .await?
85            .call(CoreSyncDataRequest)
86            .await?;
87
88        let (tx, rx) = oneshot::channel();
89
90        // TODO: Instead of always sending timed syncs, send pings if we have a full peer list.
91
92        tracing::debug!(parent: &ping_span, "Sending timed sync to peer");
93        connection_tx
94            .send(ConnectionTaskRequest {
95                request: PeerRequest::Admin(AdminRequestMessage::TimedSync(TimedSyncRequest {
96                    payload_data: core_sync_data,
97                })),
98                response_channel: tx,
99                permit: Some(permit),
100            })
101            .await?;
102
103        let PeerResponse::Admin(AdminResponseMessage::TimedSync(timed_sync)) = rx.await?? else {
104            panic!("Connection task returned wrong response!");
105        };
106
107        tracing::debug!(
108            parent: &ping_span,
109            "Received timed sync response, incoming peer list len: {}",
110            timed_sync.local_peerlist_new.len()
111        );
112
113        if timed_sync.local_peerlist_new.len() > MAX_PEERS_IN_PEER_LIST_MESSAGE {
114            return Err("Peer sent too many peers in peer list".into());
115        }
116
117        // Tell our address book about the new peers.
118        address_book_svc
119            .ready()
120            .await?
121            .call(AddressBookRequest::IncomingPeerList(
122                timed_sync
123                    .local_peerlist_new
124                    .into_iter()
125                    .map(TryInto::try_into)
126                    .collect::<Result<_, _>>()?,
127            ))
128            .await?;
129
130        *peer_information.core_sync_data.lock().unwrap() = timed_sync.payload_data;
131    }
132}