cuprate_p2p/
inbound_server.rs

1//! # Inbound Server
2//!
3//! This module contains the inbound connection server, which listens for inbound connections, gives
4//! them to the handshaker service and then adds them to the client pool.
5use std::{pin::pin, sync::Arc};
6
7use futures::{SinkExt, StreamExt};
8use tokio::{
9    sync::{mpsc, Semaphore},
10    task::JoinSet,
11    time::{sleep, timeout},
12};
13use tower::{Service, ServiceExt};
14use tracing::{instrument, Instrument, Span};
15
16use cuprate_p2p_core::{
17    client::{Client, DoHandshakeRequest, HandshakeError, InternalPeerID},
18    services::{AddressBookRequest, AddressBookResponse},
19    AddressBook, ConnectionDirection, NetworkZone,
20};
21use cuprate_wire::{
22    admin::{PingResponse, PING_OK_RESPONSE_STATUS_TEXT},
23    AdminRequestMessage, AdminResponseMessage, Message,
24};
25
26use crate::{
27    constants::{
28        HANDSHAKE_TIMEOUT, INBOUND_CONNECTION_COOL_DOWN, PING_REQUEST_CONCURRENCY,
29        PING_REQUEST_TIMEOUT,
30    },
31    P2PConfig,
32};
33
34/// Starts the inbound server. This function will listen to all incoming connections
35/// and initiate handshake if needed, after verifying the address isn't banned.
36#[instrument(level = "warn", skip_all)]
37pub async fn inbound_server<N, HS, A>(
38    new_connection_tx: mpsc::Sender<Client<N>>,
39    mut handshaker: HS,
40    mut address_book: A,
41    config: P2PConfig<N>,
42) -> Result<(), tower::BoxError>
43where
44    N: NetworkZone,
45    HS: Service<DoHandshakeRequest<N>, Response = Client<N>, Error = HandshakeError>
46        + Send
47        + 'static,
48    HS::Future: Send + 'static,
49    A: AddressBook<N>,
50{
51    // Copying the peer_id before borrowing for ping responses (Make us avoid a `clone()`).
52    let our_peer_id = config.basic_node_data().peer_id;
53
54    // Mandatory. Extract server config from P2PConfig
55    let Some(server_config) = config.server_config else {
56        tracing::warn!("No inbound server config provided, not listening for inbound connections.");
57        return Ok(());
58    };
59
60    tracing::info!("Starting inbound connection server");
61
62    let listener = N::incoming_connection_listener(server_config, config.p2p_port)
63        .await
64        .inspect_err(|e| tracing::warn!("Failed to start inbound server: {e}"))?;
65
66    let mut listener = pin!(listener);
67
68    // Create semaphore for limiting to maximum inbound connections.
69    let semaphore = Arc::new(Semaphore::new(config.max_inbound_connections));
70    // Create ping request handling JoinSet
71    let mut ping_join_set = JoinSet::new();
72
73    // Listen to incoming connections and extract necessary information.
74    while let Some(connection) = listener.next().await {
75        let Ok((addr, mut peer_stream, mut peer_sink)) = connection else {
76            continue;
77        };
78
79        // If peer is banned, drop connection
80        if let Some(addr) = &addr {
81            let AddressBookResponse::GetBan { unban_instant } = address_book
82                .ready()
83                .await?
84                .call(AddressBookRequest::GetBan(*addr))
85                .await?
86            else {
87                panic!("Address book returned incorrect response!");
88            };
89
90            if unban_instant.is_some() {
91                continue;
92            }
93        }
94
95        // Create a new internal id for new peers
96        let addr = match addr {
97            Some(addr) => InternalPeerID::KnownAddr(addr),
98            None => InternalPeerID::Unknown(rand::random()),
99        };
100
101        // If we're still behind our maximum limit, Initiate handshake.
102        if let Ok(permit) = Arc::clone(&semaphore).try_acquire_owned() {
103            tracing::debug!("Permit free for incoming connection, attempting handshake.");
104
105            let fut = handshaker.ready().await?.call(DoHandshakeRequest {
106                addr,
107                peer_stream,
108                peer_sink,
109                direction: ConnectionDirection::Inbound,
110                permit: Some(permit),
111            });
112
113            let new_connection_tx = new_connection_tx.clone();
114
115            tokio::spawn(
116                async move {
117                    let client = timeout(HANDSHAKE_TIMEOUT, fut).await;
118
119                    match client {
120                        Ok(Ok(peer)) => drop(new_connection_tx.send(peer).await),
121                        Err(_) => tracing::debug!("Timed out"),
122                        Ok(Err(e)) => tracing::debug!("error: {e:?}"),
123                    }
124                }
125                .instrument(Span::current()),
126            );
127        } else {
128            // Otherwise check if the node is simply pinging us.
129            tracing::debug!("No permit free for incoming connection.");
130
131            // We only handle 2 ping request conccurently. Otherwise we drop the connection immediately.
132            if ping_join_set.len() < PING_REQUEST_CONCURRENCY {
133                ping_join_set.spawn(
134                    async move {
135                        // Await first message from node. If it is a ping request we respond back, otherwise we drop the connection.
136                        let fut = timeout(PING_REQUEST_TIMEOUT, peer_stream.next());
137
138                        // Ok if timeout did not elapsed -> Some if there is a message -> Ok if it has been decoded
139                        if matches!(
140                            fut.await,
141                            Ok(Some(Ok(Message::Request(AdminRequestMessage::Ping))))
142                        ) {
143                            let response = peer_sink
144                                .send(
145                                    Message::Response(AdminResponseMessage::Ping(PingResponse {
146                                        status: PING_OK_RESPONSE_STATUS_TEXT,
147                                        peer_id: our_peer_id,
148                                    }))
149                                    .into(),
150                                )
151                                .await;
152
153                            if let Err(err) = response {
154                                tracing::debug!(
155                                    "Unable to respond to ping request from peer ({addr}): {err}"
156                                );
157                            }
158                        }
159                    }
160                    .instrument(Span::current()),
161                );
162            }
163        }
164
165        sleep(INBOUND_CONNECTION_COOL_DOWN).await;
166    }
167
168    Ok(())
169}