cuprate_p2p/
inbound_server.rs
1use std::{pin::pin, sync::Arc};
6
7use futures::{SinkExt, StreamExt};
8use tokio::{
9 sync::{mpsc, Semaphore},
10 task::JoinSet,
11 time::{sleep, timeout},
12};
13use tower::{Service, ServiceExt};
14use tracing::{instrument, Instrument, Span};
15
16use cuprate_p2p_core::{
17 client::{Client, DoHandshakeRequest, HandshakeError, InternalPeerID},
18 services::{AddressBookRequest, AddressBookResponse},
19 AddressBook, ConnectionDirection, NetworkZone, Transport,
20};
21use cuprate_wire::{
22 admin::{PingResponse, PING_OK_RESPONSE_STATUS_TEXT},
23 AdminRequestMessage, AdminResponseMessage, Message,
24};
25
26use crate::{
27 constants::{
28 HANDSHAKE_TIMEOUT, INBOUND_CONNECTION_COOL_DOWN, PING_REQUEST_CONCURRENCY,
29 PING_REQUEST_TIMEOUT,
30 },
31 P2PConfig,
32};
33
34#[instrument(level = "warn", skip_all)]
37pub async fn inbound_server<Z, T, HS, A>(
38 new_connection_tx: mpsc::Sender<Client<Z>>,
39 mut handshaker: HS,
40 mut address_book: A,
41 config: P2PConfig<Z>,
42 transport_config: Option<T::ServerConfig>,
43) -> Result<(), tower::BoxError>
44where
45 Z: NetworkZone,
46 T: Transport<Z>,
47 HS: Service<DoHandshakeRequest<Z, T>, Response = Client<Z>, Error = HandshakeError>
48 + Send
49 + 'static,
50 HS::Future: Send + 'static,
51 A: AddressBook<Z>,
52{
53 let our_peer_id = config.basic_node_data().peer_id;
55
56 let Some(server_config) = transport_config else {
58 tracing::warn!("No inbound server config provided, not listening for inbound connections.");
59 return Ok(());
60 };
61
62 tracing::info!("Starting inbound connection server");
63
64 let listener = T::incoming_connection_listener(server_config)
65 .await
66 .inspect_err(|e| tracing::warn!("Failed to start inbound server: {e}"))?;
67
68 let mut listener = pin!(listener);
69
70 let semaphore = Arc::new(Semaphore::new(config.max_inbound_connections));
72 let mut ping_join_set = JoinSet::new();
74
75 while let Some(connection) = listener.next().await {
77 let Ok((addr, mut peer_stream, mut peer_sink)) = connection else {
78 continue;
79 };
80
81 if let Some(addr) = &addr {
83 let AddressBookResponse::GetBan { unban_instant } = address_book
84 .ready()
85 .await?
86 .call(AddressBookRequest::GetBan(*addr))
87 .await?
88 else {
89 panic!("Address book returned incorrect response!");
90 };
91
92 if unban_instant.is_some() {
93 continue;
94 }
95 }
96
97 let addr = match addr {
99 Some(addr) => InternalPeerID::KnownAddr(addr),
100 None => InternalPeerID::Unknown(rand::random()),
101 };
102
103 if let Ok(permit) = Arc::clone(&semaphore).try_acquire_owned() {
105 tracing::debug!("Permit free for incoming connection, attempting handshake.");
106
107 let fut = handshaker.ready().await?.call(DoHandshakeRequest {
108 addr,
109 peer_stream,
110 peer_sink,
111 direction: ConnectionDirection::Inbound,
112 permit: Some(permit),
113 });
114
115 let new_connection_tx = new_connection_tx.clone();
116
117 tokio::spawn(
118 async move {
119 let client = timeout(HANDSHAKE_TIMEOUT, fut).await;
120
121 match client {
122 Ok(Ok(peer)) => drop(new_connection_tx.send(peer).await),
123 Err(_) => tracing::debug!("Timed out"),
124 Ok(Err(e)) => tracing::debug!("error: {e:?}"),
125 }
126 }
127 .instrument(Span::current()),
128 );
129 } else {
130 tracing::debug!("No permit free for incoming connection.");
132
133 if ping_join_set.len() < PING_REQUEST_CONCURRENCY {
135 ping_join_set.spawn(
136 async move {
137 let fut = timeout(PING_REQUEST_TIMEOUT, peer_stream.next());
139
140 if matches!(
142 fut.await,
143 Ok(Some(Ok(Message::Request(AdminRequestMessage::Ping))))
144 ) {
145 let response = peer_sink
146 .send(
147 Message::Response(AdminResponseMessage::Ping(PingResponse {
148 status: PING_OK_RESPONSE_STATUS_TEXT,
149 peer_id: our_peer_id,
150 }))
151 .into(),
152 )
153 .await;
154
155 if let Err(err) = response {
156 tracing::debug!(
157 "Unable to respond to ping request from peer ({addr}): {err}"
158 );
159 }
160 }
161 }
162 .instrument(Span::current()),
163 );
164 }
165 }
166
167 sleep(INBOUND_CONNECTION_COOL_DOWN).await;
168 }
169
170 Ok(())
171}