cuprate_p2p_core/client/
request_handler.rs

1use futures::TryFutureExt;
2use tower::ServiceExt;
3
4use cuprate_wire::{
5    admin::{
6        PingResponse, SupportFlagsResponse, TimedSyncRequest, TimedSyncResponse,
7        PING_OK_RESPONSE_STATUS_TEXT,
8    },
9    AdminRequestMessage, AdminResponseMessage, BasicNodeData,
10};
11
12use crate::{
13    client::PeerInformation,
14    constants::MAX_PEERS_IN_PEER_LIST_MESSAGE,
15    services::{
16        AddressBookRequest, AddressBookResponse, CoreSyncDataRequest, CoreSyncDataResponse,
17    },
18    AddressBook, CoreSyncSvc, NetworkZone, PeerRequest, PeerResponse, ProtocolRequestHandler,
19};
20
21#[derive(thiserror::Error, Debug, Copy, Clone, Eq, PartialEq)]
22enum PeerRequestHandlerError {
23    #[error("Received a handshake request during a connection.")]
24    ReceivedHandshakeDuringConnection,
25}
26
27/// The peer request handler, handles incoming [`PeerRequest`]s to our node.
28#[derive(Debug, Clone)]
29pub(crate) struct PeerRequestHandler<Z: NetworkZone, A, CS, PR> {
30    /// The address book service.
31    pub address_book_svc: A,
32    /// Our core sync service.
33    pub our_sync_svc: CS,
34
35    /// The handler for [`ProtocolRequest`](crate::ProtocolRequest)s to our node.
36    pub protocol_request_handler: PR,
37
38    /// The basic node data of our node.
39    pub our_basic_node_data: BasicNodeData,
40
41    /// The information on the connected peer.
42    pub peer_info: PeerInformation<Z::Addr>,
43}
44
45impl<Z, A, CS, PR> PeerRequestHandler<Z, A, CS, PR>
46where
47    Z: NetworkZone,
48    A: AddressBook<Z>,
49    CS: CoreSyncSvc,
50    PR: ProtocolRequestHandler,
51{
52    /// Handles an incoming [`PeerRequest`] to our node.
53    pub(crate) async fn handle_peer_request(
54        &mut self,
55        req: PeerRequest,
56    ) -> Result<PeerResponse, tower::BoxError> {
57        match req {
58            PeerRequest::Admin(admin_req) => match admin_req {
59                AdminRequestMessage::Handshake(_) => {
60                    Err(PeerRequestHandlerError::ReceivedHandshakeDuringConnection.into())
61                }
62                AdminRequestMessage::SupportFlags => {
63                    let support_flags = self.our_basic_node_data.support_flags;
64
65                    Ok(PeerResponse::Admin(AdminResponseMessage::SupportFlags(
66                        SupportFlagsResponse { support_flags },
67                    )))
68                }
69                AdminRequestMessage::Ping => Ok(PeerResponse::Admin(AdminResponseMessage::Ping(
70                    PingResponse {
71                        peer_id: self.our_basic_node_data.peer_id,
72                        status: PING_OK_RESPONSE_STATUS_TEXT,
73                    },
74                ))),
75                AdminRequestMessage::TimedSync(timed_sync_req) => {
76                    let res = self.handle_timed_sync_request(timed_sync_req).await?;
77
78                    Ok(PeerResponse::Admin(AdminResponseMessage::TimedSync(res)))
79                }
80            },
81
82            PeerRequest::Protocol(protocol_req) => {
83                // TODO: add limits here
84
85                self.protocol_request_handler
86                    .ready()
87                    .await?
88                    .call(protocol_req)
89                    .map_ok(PeerResponse::Protocol)
90                    .await
91            }
92        }
93    }
94
95    /// Handles a [`TimedSyncRequest`] to our node.
96    async fn handle_timed_sync_request(
97        &mut self,
98        req: TimedSyncRequest,
99    ) -> Result<TimedSyncResponse, tower::BoxError> {
100        // TODO: add a limit on the amount of these requests in a certain time period.
101
102        *self.peer_info.core_sync_data.lock().unwrap() = req.payload_data;
103
104        let AddressBookResponse::Peers(peers) = self
105            .address_book_svc
106            .ready()
107            .await?
108            .call(AddressBookRequest::GetWhitePeers(
109                MAX_PEERS_IN_PEER_LIST_MESSAGE,
110            ))
111            .await?
112        else {
113            panic!("Address book sent incorrect response!");
114        };
115
116        let CoreSyncDataResponse(core_sync_data) = self
117            .our_sync_svc
118            .ready()
119            .await?
120            .call(CoreSyncDataRequest)
121            .await?;
122
123        Ok(TimedSyncResponse {
124            payload_data: core_sync_data,
125            local_peerlist_new: peers.into_iter().map(Into::into).collect(),
126        })
127    }
128}