cuprate_p2p/
connection_maintainer.rs

//! Outbound Connection Maintainer.
//!
//! This module handles maintaining the number of outbound connections defined in the [`P2PConfig`].
//! It also handles making extra connections when the peer set is under load or when we need data that
//! no connected peer has.
6use std::sync::Arc;
7
8use rand::{distributions::Bernoulli, prelude::*};
9use tokio::{
10    sync::{mpsc, OwnedSemaphorePermit, Semaphore},
11    task::JoinSet,
12    time::{sleep, timeout},
13};
14use tower::{Service, ServiceExt};
15use tracing::{instrument, Instrument, Span};
16
17use cuprate_p2p_core::{
18    client::{Client, ConnectRequest, HandshakeError},
19    services::{AddressBookRequest, AddressBookResponse},
20    AddressBook, NetworkZone,
21};
22
23use crate::{
24    config::P2PConfig,
25    constants::{HANDSHAKE_TIMEOUT, MAX_SEED_CONNECTIONS, OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT},
26};
27
/// Errors that can occur while the outbound connection maintainer tries to make a connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OutboundConnectorError {
    /// No outbound permit was free and the limit on extra outbound connections has been reached.
    MaxConnections,
    /// None of the attempted seed node connections succeeded.
    FailedToConnectToSeeds,
    /// The address book had no suitable peer to connect to.
    NoAvailablePeers,
}
33
/// A request from the peer set to make an outbound connection.
///
/// This will only be sent when the peer set is under load from the rest of Cuprate or the peer
/// set needs specific data that none of the currently connected peers have.
#[derive(Debug)]
pub struct MakeConnectionRequest {
    /// The height of a block that no connected peer has (e.g. due to pruning), if any.
    ///
    /// This is forwarded to the address book as the `height` the new peer should have.
    block_needed: Option<usize>,
}
42
43/// The outbound connection count keeper.
44///
45/// This handles maintaining a minimum number of connections and making extra connections when needed, upto a maximum.
46pub struct OutboundConnectionKeeper<Z: NetworkZone, A, C> {
47    /// The pool of currently connected peers.
48    pub new_peers_tx: mpsc::Sender<Client<Z>>,
49    /// The channel that tells us to make new _extra_ outbound connections.
50    pub make_connection_rx: mpsc::Receiver<MakeConnectionRequest>,
51    /// The address book service
52    pub address_book_svc: A,
53    /// The service to connect to a specific peer.
54    pub connector_svc: C,
55    /// A semaphore to keep the amount of outbound peers constant.
56    pub outbound_semaphore: Arc<Semaphore>,
57    /// The amount of peers we connected to because we needed more peers. If the `outbound_semaphore`
58    /// is full, and we need to connect to more peers for blocks or because not enough peers are ready
59    /// we add a permit to the semaphore and keep track here, upto a value in config.
60    pub extra_peers: usize,
61    /// The p2p config.
62    pub config: P2PConfig<Z>,
63    /// The [`Bernoulli`] distribution, when sampled will return true if we should connect to a gray peer or
64    /// false if we should connect to a white peer.
65    ///
66    /// This is weighted to the percentage given in `config`.
67    pub peer_type_gen: Bernoulli,
68}
69
70impl<Z, A, C> OutboundConnectionKeeper<Z, A, C>
71where
72    Z: NetworkZone,
73    A: AddressBook<Z>,
74    C: Service<ConnectRequest<Z>, Response = Client<Z>, Error = HandshakeError>,
75    C::Future: Send + 'static,
76{
77    pub fn new(
78        config: P2PConfig<Z>,
79        new_peers_tx: mpsc::Sender<Client<Z>>,
80        make_connection_rx: mpsc::Receiver<MakeConnectionRequest>,
81        address_book_svc: A,
82        connector_svc: C,
83    ) -> Self {
84        let peer_type_gen = Bernoulli::new(config.gray_peers_percent)
85            .expect("Gray peer percent is incorrect should be 0..=1");
86
87        Self {
88            new_peers_tx,
89            make_connection_rx,
90            address_book_svc,
91            connector_svc,
92            outbound_semaphore: Arc::new(Semaphore::new(config.outbound_connections)),
93            extra_peers: 0,
94            config,
95            peer_type_gen,
96        }
97    }
98
99    /// Connects to random seeds to get peers and immediately disconnects
100    #[instrument(level = "info", skip(self))]
101    #[expect(clippy::significant_drop_tightening)]
102    async fn connect_to_random_seeds(&mut self) -> Result<(), OutboundConnectorError> {
103        let seeds = self
104            .config
105            .seeds
106            .choose_multiple(&mut thread_rng(), MAX_SEED_CONNECTIONS);
107
108        assert_ne!(seeds.len(), 0, "No seed nodes available to get peers from");
109
110        let mut allowed_errors = seeds.len();
111
112        let mut handshake_futs = JoinSet::new();
113
114        for seed in seeds {
115            tracing::info!("Getting peers from seed node: {}", seed);
116
117            let fut = timeout(
118                HANDSHAKE_TIMEOUT,
119                self.connector_svc
120                    .ready()
121                    .await
122                    .expect("Connector had an error in `poll_ready`")
123                    .call(ConnectRequest {
124                        addr: *seed,
125                        permit: None,
126                    }),
127            );
128            // Spawn the handshake on a separate task with a timeout, so we don't get stuck connecting to a peer.
129            handshake_futs.spawn(fut);
130        }
131
132        while let Some(res) = handshake_futs.join_next().await {
133            if matches!(res, Err(_) | Ok(Err(_) | Ok(Err(_)))) {
134                allowed_errors -= 1;
135            }
136        }
137
138        if allowed_errors == 0 {
139            Err(OutboundConnectorError::FailedToConnectToSeeds)
140        } else {
141            Ok(())
142        }
143    }
144
145    /// Connects to a given outbound peer.
146    #[instrument(level = "info", skip_all)]
147    async fn connect_to_outbound_peer(&mut self, permit: OwnedSemaphorePermit, addr: Z::Addr) {
148        let new_peers_tx = self.new_peers_tx.clone();
149        let connection_fut = self
150            .connector_svc
151            .ready()
152            .await
153            .expect("Connector had an error in `poll_ready`")
154            .call(ConnectRequest {
155                addr,
156                permit: Some(permit),
157            });
158
159        tokio::spawn(
160            async move {
161                if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await {
162                    drop(new_peers_tx.send(peer).await);
163                }
164            }
165            .instrument(Span::current()),
166        );
167    }
168
169    /// Handles a request from the peer set for more peers.
170    #[expect(
171        clippy::significant_drop_tightening,
172        reason = "we need to hold onto a permit"
173    )]
174    async fn handle_peer_request(
175        &mut self,
176        req: &MakeConnectionRequest,
177    ) -> Result<(), OutboundConnectorError> {
178        // try to get a permit.
179        let permit = Arc::clone(&self.outbound_semaphore)
180            .try_acquire_owned()
181            .or_else(|_| {
182                // if we can't get a permit add one if we are below the max number of connections.
183                if self.extra_peers >= self.config.extra_outbound_connections {
184                    // If we can't add a permit return an error.
185                    Err(OutboundConnectorError::MaxConnections)
186                } else {
187                    self.outbound_semaphore.add_permits(1);
188                    self.extra_peers += 1;
189                    Ok(Arc::clone(&self.outbound_semaphore)
190                        .try_acquire_owned()
191                        .unwrap())
192                }
193            })?;
194
195        // try to get a random peer on any network zone from the address book.
196        let peer = self
197            .address_book_svc
198            .ready()
199            .await
200            .expect("Error in address book!")
201            .call(AddressBookRequest::TakeRandomPeer {
202                height: req.block_needed,
203            })
204            .await;
205
206        match peer {
207            Err(_) => {
208                // TODO: We should probably send peer requests to our connected peers rather than go to seeds.
209                tracing::warn!("No peers in address book which are available and have the data we need. Getting peers from seed nodes.");
210
211                self.connect_to_random_seeds().await?;
212                Err(OutboundConnectorError::NoAvailablePeers)
213            }
214
215            Ok(AddressBookResponse::Peer(peer)) => {
216                self.connect_to_outbound_peer(permit, peer.adr).await;
217                Ok(())
218            }
219            Ok(_) => panic!("peer list sent incorrect response!"),
220        }
221    }
222
223    /// Handles a free permit, by either connecting to a new peer or by removing a permit if we are above the
224    /// minimum number of outbound connections.
225    #[instrument(level = "debug", skip(self, permit))]
226    async fn handle_free_permit(
227        &mut self,
228        permit: OwnedSemaphorePermit,
229    ) -> Result<(), OutboundConnectorError> {
230        if self.extra_peers > 0 {
231            tracing::debug!(
232                "Permit available but we are over the minimum number of peers, forgetting permit."
233            );
234            permit.forget();
235            self.extra_peers -= 1;
236            return Ok(());
237        }
238
239        tracing::debug!("Permit available, making outbound connection.");
240
241        let req = if self.peer_type_gen.sample(&mut thread_rng()) {
242            AddressBookRequest::TakeRandomGrayPeer { height: None }
243        } else {
244            // This will try white peers first then gray.
245            AddressBookRequest::TakeRandomPeer { height: None }
246        };
247
248        let Ok(AddressBookResponse::Peer(peer)) = self
249            .address_book_svc
250            .ready()
251            .await
252            .expect("Error in address book!")
253            .call(req)
254            .await
255        else {
256            tracing::warn!("No peers in peer list to make connection to.");
257            self.connect_to_random_seeds().await?;
258            return Err(OutboundConnectorError::NoAvailablePeers);
259        };
260
261        self.connect_to_outbound_peer(permit, peer.adr).await;
262        Ok(())
263    }
264
265    /// Runs the outbound connection count keeper.
266    pub async fn run(mut self) {
267        tracing::info!(
268            "Starting outbound connection maintainer, target outbound connections: {}",
269            self.config.outbound_connections
270        );
271
272        loop {
273            tokio::select! {
274                biased;
275                peer_req = self.make_connection_rx.recv() => {
276                    let Some(peer_req) = peer_req else {
277                        tracing::info!("Shutting down outbound connector, make connection channel closed.");
278                        return;
279                    };
280                    #[expect(clippy::let_underscore_must_use, reason = "We can't really do much about errors in this function.")]
281                    let _ = self.handle_peer_request(&peer_req).await;
282                },
283                // This future is not cancellation safe as you will lose your space in the queue but as we are the only place
284                // that actually requires permits that should be ok.
285                Ok(permit) = Arc::clone(&self.outbound_semaphore).acquire_owned() => {
286                    if self.handle_free_permit(permit).await.is_err() {
287                        // if we got an error then we still have a permit free so to prevent this from just looping
288                        // uncontrollably add a timeout.
289                        sleep(OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT).await;
290                    }
291                }
292            }
293        }
294    }
295}