cuprate_p2p/
inbound_server.rs
1use std::{pin::pin, sync::Arc};
6
7use futures::{SinkExt, StreamExt};
8use tokio::{
9 sync::{mpsc, Semaphore},
10 task::JoinSet,
11 time::{sleep, timeout},
12};
13use tower::{Service, ServiceExt};
14use tracing::{instrument, Instrument, Span};
15
16use cuprate_p2p_core::{
17 client::{Client, DoHandshakeRequest, HandshakeError, InternalPeerID},
18 services::{AddressBookRequest, AddressBookResponse},
19 AddressBook, ConnectionDirection, NetworkZone,
20};
21use cuprate_wire::{
22 admin::{PingResponse, PING_OK_RESPONSE_STATUS_TEXT},
23 AdminRequestMessage, AdminResponseMessage, Message,
24};
25
26use crate::{
27 constants::{
28 HANDSHAKE_TIMEOUT, INBOUND_CONNECTION_COOL_DOWN, PING_REQUEST_CONCURRENCY,
29 PING_REQUEST_TIMEOUT,
30 },
31 P2PConfig,
32};
33
34#[instrument(level = "warn", skip_all)]
37pub async fn inbound_server<N, HS, A>(
38 new_connection_tx: mpsc::Sender<Client<N>>,
39 mut handshaker: HS,
40 mut address_book: A,
41 config: P2PConfig<N>,
42) -> Result<(), tower::BoxError>
43where
44 N: NetworkZone,
45 HS: Service<DoHandshakeRequest<N>, Response = Client<N>, Error = HandshakeError>
46 + Send
47 + 'static,
48 HS::Future: Send + 'static,
49 A: AddressBook<N>,
50{
51 let our_peer_id = config.basic_node_data().peer_id;
53
54 let Some(server_config) = config.server_config else {
56 tracing::warn!("No inbound server config provided, not listening for inbound connections.");
57 return Ok(());
58 };
59
60 tracing::info!("Starting inbound connection server");
61
62 let listener = N::incoming_connection_listener(server_config, config.p2p_port)
63 .await
64 .inspect_err(|e| tracing::warn!("Failed to start inbound server: {e}"))?;
65
66 let mut listener = pin!(listener);
67
68 let semaphore = Arc::new(Semaphore::new(config.max_inbound_connections));
70 let mut ping_join_set = JoinSet::new();
72
73 while let Some(connection) = listener.next().await {
75 let Ok((addr, mut peer_stream, mut peer_sink)) = connection else {
76 continue;
77 };
78
79 if let Some(addr) = &addr {
81 let AddressBookResponse::GetBan { unban_instant } = address_book
82 .ready()
83 .await?
84 .call(AddressBookRequest::GetBan(*addr))
85 .await?
86 else {
87 panic!("Address book returned incorrect response!");
88 };
89
90 if unban_instant.is_some() {
91 continue;
92 }
93 }
94
95 let addr = match addr {
97 Some(addr) => InternalPeerID::KnownAddr(addr),
98 None => InternalPeerID::Unknown(rand::random()),
99 };
100
101 if let Ok(permit) = Arc::clone(&semaphore).try_acquire_owned() {
103 tracing::debug!("Permit free for incoming connection, attempting handshake.");
104
105 let fut = handshaker.ready().await?.call(DoHandshakeRequest {
106 addr,
107 peer_stream,
108 peer_sink,
109 direction: ConnectionDirection::Inbound,
110 permit: Some(permit),
111 });
112
113 let new_connection_tx = new_connection_tx.clone();
114
115 tokio::spawn(
116 async move {
117 let client = timeout(HANDSHAKE_TIMEOUT, fut).await;
118
119 match client {
120 Ok(Ok(peer)) => drop(new_connection_tx.send(peer).await),
121 Err(_) => tracing::debug!("Timed out"),
122 Ok(Err(e)) => tracing::debug!("error: {e:?}"),
123 }
124 }
125 .instrument(Span::current()),
126 );
127 } else {
128 tracing::debug!("No permit free for incoming connection.");
130
131 if ping_join_set.len() < PING_REQUEST_CONCURRENCY {
133 ping_join_set.spawn(
134 async move {
135 let fut = timeout(PING_REQUEST_TIMEOUT, peer_stream.next());
137
138 if matches!(
140 fut.await,
141 Ok(Some(Ok(Message::Request(AdminRequestMessage::Ping))))
142 ) {
143 let response = peer_sink
144 .send(
145 Message::Response(AdminResponseMessage::Ping(PingResponse {
146 status: PING_OK_RESPONSE_STATUS_TEXT,
147 peer_id: our_peer_id,
148 }))
149 .into(),
150 )
151 .await;
152
153 if let Err(err) = response {
154 tracing::debug!(
155 "Unable to respond to ping request from peer ({addr}): {err}"
156 );
157 }
158 }
159 }
160 .instrument(Span::current()),
161 );
162 }
163 }
164
165 sleep(INBOUND_CONNECTION_COOL_DOWN).await;
166 }
167
168 Ok(())
169}