cuprate_p2p/
inbound_server.rs

1//! # Inbound Server
2//!
3//! This module contains the inbound connection server, which listens for inbound connections, gives
4//! them to the handshaker service and then adds them to the client pool.
5use std::{pin::pin, sync::Arc};
6
7use futures::{SinkExt, StreamExt};
8use tokio::{
9    sync::{mpsc, Semaphore},
10    task::JoinSet,
11    time::{sleep, timeout},
12};
13use tower::{Service, ServiceExt};
14use tracing::{instrument, Instrument, Span};
15
16use cuprate_p2p_core::{
17    client::{Client, DoHandshakeRequest, HandshakeError, InternalPeerID},
18    services::{AddressBookRequest, AddressBookResponse},
19    AddressBook, ConnectionDirection, NetworkZone, Transport,
20};
21use cuprate_wire::{
22    admin::{PingResponse, PING_OK_RESPONSE_STATUS_TEXT},
23    AdminRequestMessage, AdminResponseMessage, Message,
24};
25
26use crate::{
27    constants::{
28        HANDSHAKE_TIMEOUT, INBOUND_CONNECTION_COOL_DOWN, PING_REQUEST_CONCURRENCY,
29        PING_REQUEST_TIMEOUT,
30    },
31    P2PConfig,
32};
33
34/// Starts the inbound server. This function will listen to all incoming connections
35/// and initiate handshake if needed, after verifying the address isn't banned.
36#[instrument(level = "warn", skip_all)]
37pub async fn inbound_server<Z, T, HS, A>(
38    new_connection_tx: mpsc::Sender<Client<Z>>,
39    mut handshaker: HS,
40    mut address_book: A,
41    config: P2PConfig<Z>,
42    transport_config: Option<T::ServerConfig>,
43) -> Result<(), tower::BoxError>
44where
45    Z: NetworkZone,
46    T: Transport<Z>,
47    HS: Service<DoHandshakeRequest<Z, T>, Response = Client<Z>, Error = HandshakeError>
48        + Send
49        + 'static,
50    HS::Future: Send + 'static,
51    A: AddressBook<Z>,
52{
53    // Copying the peer_id before borrowing for ping responses (Make us avoid a `clone()`).
54    let our_peer_id = config.basic_node_data().peer_id;
55
56    // Mandatory. Extract server config from P2PConfig
57    let Some(server_config) = transport_config else {
58        tracing::warn!("No inbound server config provided, not listening for inbound connections.");
59        return Ok(());
60    };
61
62    tracing::info!("Starting inbound connection server");
63
64    let listener = T::incoming_connection_listener(server_config)
65        .await
66        .inspect_err(|e| tracing::warn!("Failed to start inbound server: {e}"))?;
67
68    let mut listener = pin!(listener);
69
70    // Create semaphore for limiting to maximum inbound connections.
71    let semaphore = Arc::new(Semaphore::new(config.max_inbound_connections));
72    // Create ping request handling JoinSet
73    let mut ping_join_set = JoinSet::new();
74
75    // Listen to incoming connections and extract necessary information.
76    while let Some(connection) = listener.next().await {
77        let Ok((addr, mut peer_stream, mut peer_sink)) = connection else {
78            continue;
79        };
80
81        // If peer is banned, drop connection
82        if let Some(addr) = &addr {
83            let AddressBookResponse::GetBan { unban_instant } = address_book
84                .ready()
85                .await?
86                .call(AddressBookRequest::GetBan(*addr))
87                .await?
88            else {
89                panic!("Address book returned incorrect response!");
90            };
91
92            if unban_instant.is_some() {
93                continue;
94            }
95        }
96
97        // Create a new internal id for new peers
98        let addr = match addr {
99            Some(addr) => InternalPeerID::KnownAddr(addr),
100            None => InternalPeerID::Unknown(rand::random()),
101        };
102
103        // If we're still behind our maximum limit, Initiate handshake.
104        if let Ok(permit) = Arc::clone(&semaphore).try_acquire_owned() {
105            tracing::debug!("Permit free for incoming connection, attempting handshake.");
106
107            let fut = handshaker.ready().await?.call(DoHandshakeRequest {
108                addr,
109                peer_stream,
110                peer_sink,
111                direction: ConnectionDirection::Inbound,
112                permit: Some(permit),
113            });
114
115            let new_connection_tx = new_connection_tx.clone();
116
117            tokio::spawn(
118                async move {
119                    let client = timeout(HANDSHAKE_TIMEOUT, fut).await;
120
121                    match client {
122                        Ok(Ok(peer)) => drop(new_connection_tx.send(peer).await),
123                        Err(_) => tracing::debug!("Timed out"),
124                        Ok(Err(e)) => tracing::debug!("error: {e:?}"),
125                    }
126                }
127                .instrument(Span::current()),
128            );
129        } else {
130            // Otherwise check if the node is simply pinging us.
131            tracing::debug!("No permit free for incoming connection.");
132
133            // We only handle 2 ping request conccurently. Otherwise we drop the connection immediately.
134            if ping_join_set.len() < PING_REQUEST_CONCURRENCY {
135                ping_join_set.spawn(
136                    async move {
137                        // Await first message from node. If it is a ping request we respond back, otherwise we drop the connection.
138                        let fut = timeout(PING_REQUEST_TIMEOUT, peer_stream.next());
139
140                        // Ok if timeout did not elapsed -> Some if there is a message -> Ok if it has been decoded
141                        if matches!(
142                            fut.await,
143                            Ok(Some(Ok(Message::Request(AdminRequestMessage::Ping))))
144                        ) {
145                            let response = peer_sink
146                                .send(
147                                    Message::Response(AdminResponseMessage::Ping(PingResponse {
148                                        status: PING_OK_RESPONSE_STATUS_TEXT,
149                                        peer_id: our_peer_id,
150                                    }))
151                                    .into(),
152                                )
153                                .await;
154
155                            if let Err(err) = response {
156                                tracing::debug!(
157                                    "Unable to respond to ping request from peer ({addr}): {err}"
158                                );
159                            }
160                        }
161                    }
162                    .instrument(Span::current()),
163                );
164            }
165        }
166
167        sleep(INBOUND_CONNECTION_COOL_DOWN).await;
168    }
169
170    Ok(())
171}