cuprate_p2p_core/client/
request_handler.rs

1use futures::TryFutureExt;
2use rand::{thread_rng, Rng};
3use tower::ServiceExt;
4
5use cuprate_pruning::PruningSeed;
6use cuprate_wire::{
7    admin::{
8        PingResponse, SupportFlagsResponse, TimedSyncRequest, TimedSyncResponse,
9        PING_OK_RESPONSE_STATUS_TEXT,
10    },
11    AdminRequestMessage, AdminResponseMessage, BasicNodeData,
12};
13
14use crate::{
15    client::PeerInformation,
16    constants::MAX_PEERS_IN_PEER_LIST_MESSAGE,
17    services::{
18        AddressBookRequest, AddressBookResponse, CoreSyncDataRequest, CoreSyncDataResponse,
19        ZoneSpecificPeerListEntryBase,
20    },
21    AddressBook, CoreSyncSvc, NetworkZone, PeerRequest, PeerResponse, ProtocolRequestHandler,
22};
23
24#[derive(thiserror::Error, Debug, Copy, Clone, Eq, PartialEq)]
25enum PeerRequestHandlerError {
26    #[error("Received a handshake request during a connection.")]
27    ReceivedHandshakeDuringConnection,
28}
29
30/// The peer request handler, handles incoming [`PeerRequest`]s to our node.
31#[derive(Debug, Clone)]
32pub(crate) struct PeerRequestHandler<Z: NetworkZone, A, CS, PR> {
33    /// The address book service.
34    pub address_book_svc: A,
35    /// Our core sync service.
36    pub our_sync_svc: CS,
37
38    /// The handler for [`ProtocolRequest`](crate::ProtocolRequest)s to our node.
39    pub protocol_request_handler: PR,
40
41    /// The basic node data of our node.
42    pub our_basic_node_data: BasicNodeData,
43
44    /// The information on the connected peer.
45    pub peer_info: PeerInformation<Z::Addr>,
46}
47
48impl<Z, A, CS, PR> PeerRequestHandler<Z, A, CS, PR>
49where
50    Z: NetworkZone,
51    A: AddressBook<Z>,
52    CS: CoreSyncSvc,
53    PR: ProtocolRequestHandler,
54{
55    /// Handles an incoming [`PeerRequest`] to our node.
56    pub(crate) async fn handle_peer_request(
57        &mut self,
58        req: PeerRequest,
59    ) -> Result<PeerResponse, tower::BoxError> {
60        match req {
61            PeerRequest::Admin(admin_req) => match admin_req {
62                AdminRequestMessage::Handshake(_) => {
63                    Err(PeerRequestHandlerError::ReceivedHandshakeDuringConnection.into())
64                }
65                AdminRequestMessage::SupportFlags => {
66                    let support_flags = self.our_basic_node_data.support_flags;
67
68                    Ok(PeerResponse::Admin(AdminResponseMessage::SupportFlags(
69                        SupportFlagsResponse { support_flags },
70                    )))
71                }
72                AdminRequestMessage::Ping => Ok(PeerResponse::Admin(AdminResponseMessage::Ping(
73                    PingResponse {
74                        peer_id: self.our_basic_node_data.peer_id,
75                        status: PING_OK_RESPONSE_STATUS_TEXT,
76                    },
77                ))),
78                AdminRequestMessage::TimedSync(timed_sync_req) => {
79                    let res = self.handle_timed_sync_request(timed_sync_req).await?;
80
81                    Ok(PeerResponse::Admin(AdminResponseMessage::TimedSync(res)))
82                }
83            },
84
85            PeerRequest::Protocol(protocol_req) => {
86                // TODO: add limits here
87
88                self.protocol_request_handler
89                    .ready()
90                    .await?
91                    .call(protocol_req)
92                    .map_ok(PeerResponse::Protocol)
93                    .await
94            }
95        }
96    }
97
98    /// Handles a [`TimedSyncRequest`] to our node.
99    async fn handle_timed_sync_request(
100        &mut self,
101        req: TimedSyncRequest,
102    ) -> Result<TimedSyncResponse, tower::BoxError> {
103        // TODO: add a limit on the amount of these requests in a certain time period.
104
105        *self.peer_info.core_sync_data.lock().unwrap() = req.payload_data;
106
107        // Fetch core sync data.
108        let CoreSyncDataResponse(core_sync_data) = self
109            .our_sync_svc
110            .ready()
111            .await?
112            .call(CoreSyncDataRequest)
113            .await?;
114
115        // Attempt to fetch our own address if supported by this network zone.
116        let own_addr = if Z::BROADCAST_OWN_ADDR {
117            let AddressBookResponse::OwnAddress(own_addr) = self
118                .address_book_svc
119                .ready()
120                .await?
121                .call(AddressBookRequest::OwnAddress)
122                .await?
123            else {
124                panic!("Address book sent incorrect response!");
125            };
126
127            own_addr
128        } else {
129            None
130        };
131
132        let mut peer_list_req_size = MAX_PEERS_IN_PEER_LIST_MESSAGE;
133        if own_addr.is_some() {
134            peer_list_req_size -= 1;
135        }
136
137        // Fetch a peerlist to send
138        let AddressBookResponse::Peers(mut peers) = self
139            .address_book_svc
140            .ready()
141            .await?
142            .call(AddressBookRequest::GetWhitePeers(peer_list_req_size))
143            .await?
144        else {
145            panic!("Address book sent incorrect response!");
146        };
147
148        if let Some(own_addr) = own_addr {
149            // Append our address to the final peer list
150            peers.insert(
151                thread_rng().gen_range(0..=peers.len()),
152                ZoneSpecificPeerListEntryBase {
153                    adr: own_addr,
154                    id: self.our_basic_node_data.peer_id,
155                    last_seen: 0,
156                    pruning_seed: PruningSeed::NotPruned,
157                    rpc_port: self.our_basic_node_data.rpc_port,
158                    rpc_credits_per_hash: self.our_basic_node_data.rpc_credits_per_hash,
159                },
160            );
161        }
162
163        Ok(TimedSyncResponse {
164            payload_data: core_sync_data,
165            local_peerlist_new: peers.into_iter().map(Into::into).collect(),
166        })
167    }
168}