cuprate_p2p_core/client/
request_handler.rs
1use futures::TryFutureExt;
2use rand::{thread_rng, Rng};
3use tower::ServiceExt;
4
5use cuprate_pruning::PruningSeed;
6use cuprate_wire::{
7 admin::{
8 PingResponse, SupportFlagsResponse, TimedSyncRequest, TimedSyncResponse,
9 PING_OK_RESPONSE_STATUS_TEXT,
10 },
11 AdminRequestMessage, AdminResponseMessage, BasicNodeData,
12};
13
14use crate::{
15 client::PeerInformation,
16 constants::MAX_PEERS_IN_PEER_LIST_MESSAGE,
17 services::{
18 AddressBookRequest, AddressBookResponse, CoreSyncDataRequest, CoreSyncDataResponse,
19 ZoneSpecificPeerListEntryBase,
20 },
21 AddressBook, CoreSyncSvc, NetworkZone, PeerRequest, PeerResponse, ProtocolRequestHandler,
22};
23
24#[derive(thiserror::Error, Debug, Copy, Clone, Eq, PartialEq)]
25enum PeerRequestHandlerError {
26 #[error("Received a handshake request during a connection.")]
27 ReceivedHandshakeDuringConnection,
28}
29
30#[derive(Debug, Clone)]
32pub(crate) struct PeerRequestHandler<Z: NetworkZone, A, CS, PR> {
33 pub address_book_svc: A,
35 pub our_sync_svc: CS,
37
38 pub protocol_request_handler: PR,
40
41 pub our_basic_node_data: BasicNodeData,
43
44 pub peer_info: PeerInformation<Z::Addr>,
46}
47
48impl<Z, A, CS, PR> PeerRequestHandler<Z, A, CS, PR>
49where
50 Z: NetworkZone,
51 A: AddressBook<Z>,
52 CS: CoreSyncSvc,
53 PR: ProtocolRequestHandler,
54{
55 pub(crate) async fn handle_peer_request(
57 &mut self,
58 req: PeerRequest,
59 ) -> Result<PeerResponse, tower::BoxError> {
60 match req {
61 PeerRequest::Admin(admin_req) => match admin_req {
62 AdminRequestMessage::Handshake(_) => {
63 Err(PeerRequestHandlerError::ReceivedHandshakeDuringConnection.into())
64 }
65 AdminRequestMessage::SupportFlags => {
66 let support_flags = self.our_basic_node_data.support_flags;
67
68 Ok(PeerResponse::Admin(AdminResponseMessage::SupportFlags(
69 SupportFlagsResponse { support_flags },
70 )))
71 }
72 AdminRequestMessage::Ping => Ok(PeerResponse::Admin(AdminResponseMessage::Ping(
73 PingResponse {
74 peer_id: self.our_basic_node_data.peer_id,
75 status: PING_OK_RESPONSE_STATUS_TEXT,
76 },
77 ))),
78 AdminRequestMessage::TimedSync(timed_sync_req) => {
79 let res = self.handle_timed_sync_request(timed_sync_req).await?;
80
81 Ok(PeerResponse::Admin(AdminResponseMessage::TimedSync(res)))
82 }
83 },
84
85 PeerRequest::Protocol(protocol_req) => {
86 self.protocol_request_handler
89 .ready()
90 .await?
91 .call(protocol_req)
92 .map_ok(PeerResponse::Protocol)
93 .await
94 }
95 }
96 }
97
98 async fn handle_timed_sync_request(
100 &mut self,
101 req: TimedSyncRequest,
102 ) -> Result<TimedSyncResponse, tower::BoxError> {
103 *self.peer_info.core_sync_data.lock().unwrap() = req.payload_data;
106
107 let CoreSyncDataResponse(core_sync_data) = self
109 .our_sync_svc
110 .ready()
111 .await?
112 .call(CoreSyncDataRequest)
113 .await?;
114
115 let own_addr = if Z::BROADCAST_OWN_ADDR {
117 let AddressBookResponse::OwnAddress(own_addr) = self
118 .address_book_svc
119 .ready()
120 .await?
121 .call(AddressBookRequest::OwnAddress)
122 .await?
123 else {
124 panic!("Address book sent incorrect response!");
125 };
126
127 own_addr
128 } else {
129 None
130 };
131
132 let mut peer_list_req_size = MAX_PEERS_IN_PEER_LIST_MESSAGE;
133 if own_addr.is_some() {
134 peer_list_req_size -= 1;
135 }
136
137 let AddressBookResponse::Peers(mut peers) = self
139 .address_book_svc
140 .ready()
141 .await?
142 .call(AddressBookRequest::GetWhitePeers(peer_list_req_size))
143 .await?
144 else {
145 panic!("Address book sent incorrect response!");
146 };
147
148 if let Some(own_addr) = own_addr {
149 peers.insert(
151 thread_rng().gen_range(0..=peers.len()),
152 ZoneSpecificPeerListEntryBase {
153 adr: own_addr,
154 id: self.our_basic_node_data.peer_id,
155 last_seen: 0,
156 pruning_seed: PruningSeed::NotPruned,
157 rpc_port: self.our_basic_node_data.rpc_port,
158 rpc_credits_per_hash: self.our_basic_node_data.rpc_credits_per_hash,
159 },
160 );
161 }
162
163 Ok(TimedSyncResponse {
164 payload_data: core_sync_data,
165 local_peerlist_new: peers.into_iter().map(Into::into).collect(),
166 })
167 }
168}