cuprate_p2p/
inbound_server.rs1use std::{pin::pin, sync::Arc};
6
7use futures::{SinkExt, StreamExt};
8use tokio::{
9 sync::{mpsc, Semaphore},
10 task::JoinSet,
11 time::{sleep, timeout},
12};
13use tower::{Service, ServiceExt};
14use tracing::{instrument, Instrument, Span};
15
16use cuprate_p2p_core::{
17 client::{Client, DoHandshakeRequest, HandshakeError, InternalPeerID},
18 services::{AddressBookRequest, AddressBookResponse},
19 AddressBook, ConnectionDirection, NetworkZone, Transport,
20};
21use cuprate_wire::{
22 admin::{PingResponse, PING_OK_RESPONSE_STATUS_TEXT},
23 AdminRequestMessage, AdminResponseMessage, Message,
24};
25
26use crate::{
27 constants::{
28 HANDSHAKE_TIMEOUT, INBOUND_CONNECTION_COOL_DOWN, PING_REQUEST_CONCURRENCY,
29 PING_REQUEST_TIMEOUT,
30 },
31 P2PConfig,
32};
33
34#[instrument(level = "warn", skip_all)]
37pub(super) async fn inbound_server<Z, T, HS, A>(
38 new_connection_tx: mpsc::Sender<Client<Z>>,
39 mut handshaker: HS,
40 mut address_book: A,
41 config: P2PConfig<Z>,
42 transport_config: Option<T::ServerConfig>,
43 inbound_semaphore: Arc<Semaphore>,
44) -> Result<(), tower::BoxError>
45where
46 Z: NetworkZone,
47 T: Transport<Z>,
48 HS: Service<DoHandshakeRequest<Z, T>, Response = Client<Z>, Error = HandshakeError>
49 + Send
50 + 'static,
51 HS::Future: Send + 'static,
52 A: AddressBook<Z>,
53{
54 let our_peer_id = config.basic_node_data().peer_id;
56
57 let Some(server_config) = transport_config else {
59 tracing::warn!("No inbound server config provided, not listening for inbound connections.");
60 return Ok(());
61 };
62
63 tracing::info!("Starting inbound connection server");
64
65 let listener = T::incoming_connection_listener(server_config)
66 .await
67 .inspect_err(|e| tracing::warn!("Failed to start inbound server: {e}"))?;
68
69 let mut listener = pin!(listener);
70
71 let semaphore = inbound_semaphore;
73 let mut ping_join_set = JoinSet::new();
75
76 while let Some(connection) = listener.next().await {
78 let Ok((addr, mut peer_stream, mut peer_sink)) = connection else {
79 continue;
80 };
81
82 if let Some(addr) = &addr {
84 let AddressBookResponse::GetBan { unban_instant } = address_book
85 .ready()
86 .await?
87 .call(AddressBookRequest::GetBan(*addr))
88 .await?
89 else {
90 panic!("Address book returned incorrect response!");
91 };
92
93 if unban_instant.is_some() {
94 continue;
95 }
96 }
97
98 let addr = match addr {
100 Some(addr) => InternalPeerID::KnownAddr(addr),
101 None => InternalPeerID::Unknown(rand::random()),
102 };
103
104 if let Ok(permit) = Arc::clone(&semaphore).try_acquire_owned() {
106 tracing::debug!("Permit free for incoming connection, attempting handshake.");
107
108 let fut = handshaker.ready().await?.call(DoHandshakeRequest {
109 addr,
110 peer_stream,
111 peer_sink,
112 direction: ConnectionDirection::Inbound,
113 permit: Some(permit),
114 });
115
116 let new_connection_tx = new_connection_tx.clone();
117
118 tokio::spawn(
119 async move {
120 let client = timeout(HANDSHAKE_TIMEOUT, fut).await;
121
122 match client {
123 Ok(Ok(peer)) => drop(new_connection_tx.send(peer).await),
124 Err(_) => tracing::debug!("Timed out"),
125 Ok(Err(e)) => tracing::debug!("error: {e:?}"),
126 }
127 }
128 .instrument(Span::current()),
129 );
130 } else {
131 tracing::debug!("No permit free for incoming connection.");
133
134 if ping_join_set.len() < PING_REQUEST_CONCURRENCY {
136 ping_join_set.spawn(
137 async move {
138 let fut = timeout(PING_REQUEST_TIMEOUT, peer_stream.next());
140
141 if matches!(
143 fut.await,
144 Ok(Some(Ok(Message::Request(AdminRequestMessage::Ping))))
145 ) {
146 let response = peer_sink
147 .send(
148 Message::Response(AdminResponseMessage::Ping(PingResponse {
149 status: PING_OK_RESPONSE_STATUS_TEXT,
150 peer_id: our_peer_id,
151 }))
152 .into(),
153 )
154 .await;
155
156 if let Err(err) = response {
157 tracing::debug!(
158 "Unable to respond to ping request from peer ({addr}): {err}"
159 );
160 }
161 }
162 }
163 .instrument(Span::current()),
164 );
165 }
166 }
167
168 sleep(INBOUND_CONNECTION_COOL_DOWN).await;
169 }
170
171 Ok(())
172}