cuprate_p2p_core/client/
request_handler.rs1use futures::TryFutureExt;
2use tower::ServiceExt;
3
4use cuprate_wire::{
5 admin::{
6 PingResponse, SupportFlagsResponse, TimedSyncRequest, TimedSyncResponse,
7 PING_OK_RESPONSE_STATUS_TEXT,
8 },
9 AdminRequestMessage, AdminResponseMessage, BasicNodeData,
10};
11
12use crate::{
13 client::PeerInformation,
14 constants::MAX_PEERS_IN_PEER_LIST_MESSAGE,
15 services::{
16 AddressBookRequest, AddressBookResponse, CoreSyncDataRequest, CoreSyncDataResponse,
17 },
18 AddressBook, CoreSyncSvc, NetworkZone, PeerRequest, PeerResponse, ProtocolRequestHandler,
19};
20
21#[derive(thiserror::Error, Debug, Copy, Clone, Eq, PartialEq)]
22enum PeerRequestHandlerError {
23 #[error("Received a handshake request during a connection.")]
24 ReceivedHandshakeDuringConnection,
25}
26
27#[derive(Debug, Clone)]
29pub(crate) struct PeerRequestHandler<Z: NetworkZone, A, CS, PR> {
30 pub address_book_svc: A,
32 pub our_sync_svc: CS,
34
35 pub protocol_request_handler: PR,
37
38 pub our_basic_node_data: BasicNodeData,
40
41 pub peer_info: PeerInformation<Z::Addr>,
43}
44
45impl<Z, A, CS, PR> PeerRequestHandler<Z, A, CS, PR>
46where
47 Z: NetworkZone,
48 A: AddressBook<Z>,
49 CS: CoreSyncSvc,
50 PR: ProtocolRequestHandler,
51{
52 pub(crate) async fn handle_peer_request(
54 &mut self,
55 req: PeerRequest,
56 ) -> Result<PeerResponse, tower::BoxError> {
57 match req {
58 PeerRequest::Admin(admin_req) => match admin_req {
59 AdminRequestMessage::Handshake(_) => {
60 Err(PeerRequestHandlerError::ReceivedHandshakeDuringConnection.into())
61 }
62 AdminRequestMessage::SupportFlags => {
63 let support_flags = self.our_basic_node_data.support_flags;
64
65 Ok(PeerResponse::Admin(AdminResponseMessage::SupportFlags(
66 SupportFlagsResponse { support_flags },
67 )))
68 }
69 AdminRequestMessage::Ping => Ok(PeerResponse::Admin(AdminResponseMessage::Ping(
70 PingResponse {
71 peer_id: self.our_basic_node_data.peer_id,
72 status: PING_OK_RESPONSE_STATUS_TEXT,
73 },
74 ))),
75 AdminRequestMessage::TimedSync(timed_sync_req) => {
76 let res = self.handle_timed_sync_request(timed_sync_req).await?;
77
78 Ok(PeerResponse::Admin(AdminResponseMessage::TimedSync(res)))
79 }
80 },
81
82 PeerRequest::Protocol(protocol_req) => {
83 self.protocol_request_handler
86 .ready()
87 .await?
88 .call(protocol_req)
89 .map_ok(PeerResponse::Protocol)
90 .await
91 }
92 }
93 }
94
95 async fn handle_timed_sync_request(
97 &mut self,
98 req: TimedSyncRequest,
99 ) -> Result<TimedSyncResponse, tower::BoxError> {
100 *self.peer_info.core_sync_data.lock().unwrap() = req.payload_data;
103
104 let AddressBookResponse::Peers(peers) = self
105 .address_book_svc
106 .ready()
107 .await?
108 .call(AddressBookRequest::GetWhitePeers(
109 MAX_PEERS_IN_PEER_LIST_MESSAGE,
110 ))
111 .await?
112 else {
113 panic!("Address book sent incorrect response!");
114 };
115
116 let CoreSyncDataResponse(core_sync_data) = self
117 .our_sync_svc
118 .ready()
119 .await?
120 .call(CoreSyncDataRequest)
121 .await?;
122
123 Ok(TimedSyncResponse {
124 payload_data: core_sync_data,
125 local_peerlist_new: peers.into_iter().map(Into::into).collect(),
126 })
127 }
128}