cuprate_p2p/
inbound_server.rs

1//! # Inbound Server
2//!
3//! This module contains the inbound connection server, which listens for inbound connections, gives
4//! them to the handshaker service and then adds them to the client pool.
5use std::{pin::pin, sync::Arc};
6
7use futures::{SinkExt, StreamExt};
8use tokio::{
9    sync::{mpsc, Semaphore},
10    task::JoinSet,
11    time::{sleep, timeout},
12};
13use tower::{Service, ServiceExt};
14use tracing::{instrument, Instrument, Span};
15
16use cuprate_p2p_core::{
17    client::{Client, DoHandshakeRequest, HandshakeError, InternalPeerID},
18    services::{AddressBookRequest, AddressBookResponse},
19    AddressBook, ConnectionDirection, NetworkZone, Transport,
20};
21use cuprate_wire::{
22    admin::{PingResponse, PING_OK_RESPONSE_STATUS_TEXT},
23    AdminRequestMessage, AdminResponseMessage, Message,
24};
25
26use crate::{
27    constants::{
28        HANDSHAKE_TIMEOUT, INBOUND_CONNECTION_COOL_DOWN, PING_REQUEST_CONCURRENCY,
29        PING_REQUEST_TIMEOUT,
30    },
31    P2PConfig,
32};
33
34/// Starts the inbound server. This function will listen to all incoming connections
35/// and initiate handshake if needed, after verifying the address isn't banned.
36#[instrument(level = "warn", skip_all)]
37pub(super) async fn inbound_server<Z, T, HS, A>(
38    new_connection_tx: mpsc::Sender<Client<Z>>,
39    mut handshaker: HS,
40    mut address_book: A,
41    config: P2PConfig<Z>,
42    transport_config: Option<T::ServerConfig>,
43    inbound_semaphore: Arc<Semaphore>,
44) -> Result<(), tower::BoxError>
45where
46    Z: NetworkZone,
47    T: Transport<Z>,
48    HS: Service<DoHandshakeRequest<Z, T>, Response = Client<Z>, Error = HandshakeError>
49        + Send
50        + 'static,
51    HS::Future: Send + 'static,
52    A: AddressBook<Z>,
53{
54    // Copying the peer_id before borrowing for ping responses (Make us avoid a `clone()`).
55    let our_peer_id = config.basic_node_data().peer_id;
56
57    // Mandatory. Extract server config from P2PConfig
58    let Some(server_config) = transport_config else {
59        tracing::warn!("No inbound server config provided, not listening for inbound connections.");
60        return Ok(());
61    };
62
63    tracing::info!("Starting inbound connection server");
64
65    let listener = T::incoming_connection_listener(server_config)
66        .await
67        .inspect_err(|e| tracing::warn!("Failed to start inbound server: {e}"))?;
68
69    let mut listener = pin!(listener);
70
71    // Use the provided semaphore for limiting to maximum inbound connections.
72    let semaphore = inbound_semaphore;
73    // Create ping request handling JoinSet
74    let mut ping_join_set = JoinSet::new();
75
76    // Listen to incoming connections and extract necessary information.
77    while let Some(connection) = listener.next().await {
78        let Ok((addr, mut peer_stream, mut peer_sink)) = connection else {
79            continue;
80        };
81
82        // If peer is banned, drop connection
83        if let Some(addr) = &addr {
84            let AddressBookResponse::GetBan { unban_instant } = address_book
85                .ready()
86                .await?
87                .call(AddressBookRequest::GetBan(*addr))
88                .await?
89            else {
90                panic!("Address book returned incorrect response!");
91            };
92
93            if unban_instant.is_some() {
94                continue;
95            }
96        }
97
98        // Create a new internal id for new peers
99        let addr = match addr {
100            Some(addr) => InternalPeerID::KnownAddr(addr),
101            None => InternalPeerID::Unknown(rand::random()),
102        };
103
104        // If we're still behind our maximum limit, Initiate handshake.
105        if let Ok(permit) = Arc::clone(&semaphore).try_acquire_owned() {
106            tracing::debug!("Permit free for incoming connection, attempting handshake.");
107
108            let fut = handshaker.ready().await?.call(DoHandshakeRequest {
109                addr,
110                peer_stream,
111                peer_sink,
112                direction: ConnectionDirection::Inbound,
113                permit: Some(permit),
114            });
115
116            let new_connection_tx = new_connection_tx.clone();
117
118            tokio::spawn(
119                async move {
120                    let client = timeout(HANDSHAKE_TIMEOUT, fut).await;
121
122                    match client {
123                        Ok(Ok(peer)) => drop(new_connection_tx.send(peer).await),
124                        Err(_) => tracing::debug!("Timed out"),
125                        Ok(Err(e)) => tracing::debug!("error: {e:?}"),
126                    }
127                }
128                .instrument(Span::current()),
129            );
130        } else {
131            // Otherwise check if the node is simply pinging us.
132            tracing::debug!("No permit free for incoming connection.");
133
134            // We only handle 2 ping request conccurently. Otherwise we drop the connection immediately.
135            if ping_join_set.len() < PING_REQUEST_CONCURRENCY {
136                ping_join_set.spawn(
137                    async move {
138                        // Await first message from node. If it is a ping request we respond back, otherwise we drop the connection.
139                        let fut = timeout(PING_REQUEST_TIMEOUT, peer_stream.next());
140
141                        // Ok if timeout did not elapsed -> Some if there is a message -> Ok if it has been decoded
142                        if matches!(
143                            fut.await,
144                            Ok(Some(Ok(Message::Request(AdminRequestMessage::Ping))))
145                        ) {
146                            let response = peer_sink
147                                .send(
148                                    Message::Response(AdminResponseMessage::Ping(PingResponse {
149                                        status: PING_OK_RESPONSE_STATUS_TEXT,
150                                        peer_id: our_peer_id,
151                                    }))
152                                    .into(),
153                                )
154                                .await;
155
156                            if let Err(err) = response {
157                                tracing::debug!(
158                                    "Unable to respond to ping request from peer ({addr}): {err}"
159                                );
160                            }
161                        }
162                    }
163                    .instrument(Span::current()),
164                );
165            }
166        }
167
168        sleep(INBOUND_CONNECTION_COOL_DOWN).await;
169    }
170
171    Ok(())
172}