cuprate_p2p/
connection_maintainer.rs

1//! Outbound Connection Maintainer.
2//!
3//! This module handles maintaining the number of outbound connections defined in the [`P2PConfig`].
4//! It also handles making extra connections when the peer set is under load or when we need data that
5//! no connected peer has.
6use std::sync::Arc;
7
8use rand::{distributions::Bernoulli, prelude::*};
9use tokio::{
10    sync::{mpsc, OwnedSemaphorePermit, Semaphore},
11    task::JoinSet,
12    time::{sleep, timeout},
13};
14use tower::{Service, ServiceExt};
15use tracing::{instrument, Instrument, Span};
16
17use cuprate_p2p_core::{
18    client::{Client, ConnectRequest, HandshakeError},
19    services::{AddressBookRequest, AddressBookResponse},
20    AddressBook, NetworkZone,
21};
22
23use crate::{
24    config::P2PConfig,
25    constants::{HANDSHAKE_TIMEOUT, MAX_SEED_CONNECTIONS, OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT},
26};
27
/// The reasons the outbound connection keeper can fail to make a new outbound connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OutboundConnectorError {
    /// No permit was free and the number of extra connections has already reached the
    /// `extra_outbound_connections` limit from the config.
    MaxConnections,
    /// Not a single handshake with the selected seed nodes completed successfully.
    FailedToConnectToSeeds,
    /// The address book did not hand out a peer to connect to; seed nodes were contacted
    /// to get more peers instead.
    NoAvailablePeers,
}
33
/// A request from the peer set to make an outbound connection.
///
/// This will only be sent when the peer set is under load from the rest of Cuprate or the peer
/// set needs specific data that none of the currently connected peers have.
pub struct MakeConnectionRequest {
    /// The block needed that no connected peers have due to pruning.
    ///
    /// This is forwarded as the `height` of the address book's `TakeRandomPeer` request.
    /// `None` presumably means any peer will do - confirm against the address book.
    block_needed: Option<usize>,
}
42
/// The outbound connection count keeper.
///
/// This handles maintaining a minimum number of connections and making extra connections when
/// needed, up to a maximum.
pub struct OutboundConnectionKeeper<N: NetworkZone, A, C> {
    /// The channel that every peer we successfully handshake with is sent down.
    pub new_peers_tx: mpsc::Sender<Client<N>>,
    /// The channel that tells us to make new _extra_ outbound connections.
    ///
    /// The keeper shuts down when this channel is closed.
    pub make_connection_rx: mpsc::Receiver<MakeConnectionRequest>,
    /// The address book service, asked for the peers we connect to.
    pub address_book_svc: A,
    /// The service to connect to a specific peer.
    pub connector_svc: C,
    /// A semaphore to keep the amount of outbound peers constant.
    ///
    /// It starts with `outbound_connections` permits; a permit is handed to the connector
    /// with every outbound connection attempt.
    pub outbound_semaphore: Arc<Semaphore>,
    /// The amount of peers we connected to because we needed more peers. If the `outbound_semaphore`
    /// is full, and we need to connect to more peers for blocks or because not enough peers are ready
    /// we add a permit to the semaphore and keep track here, up to a value in config.
    ///
    /// While this is above zero, freed permits are forgotten rather than reused.
    pub extra_peers: usize,
    /// The p2p config.
    pub config: P2PConfig<N>,
    /// The [`Bernoulli`] distribution, when sampled will return true if we should connect to a gray peer or
    /// false if we should connect to a white peer.
    ///
    /// This is weighted to the percentage given in `config`.
    pub peer_type_gen: Bernoulli,
}
69
70impl<N, A, C> OutboundConnectionKeeper<N, A, C>
71where
72    N: NetworkZone,
73    A: AddressBook<N>,
74    C: Service<ConnectRequest<N>, Response = Client<N>, Error = HandshakeError>,
75    C::Future: Send + 'static,
76{
77    pub fn new(
78        config: P2PConfig<N>,
79        new_peers_tx: mpsc::Sender<Client<N>>,
80        make_connection_rx: mpsc::Receiver<MakeConnectionRequest>,
81        address_book_svc: A,
82        connector_svc: C,
83    ) -> Self {
84        let peer_type_gen = Bernoulli::new(config.gray_peers_percent)
85            .expect("Gray peer percent is incorrect should be 0..=1");
86
87        Self {
88            new_peers_tx,
89            make_connection_rx,
90            address_book_svc,
91            connector_svc,
92            outbound_semaphore: Arc::new(Semaphore::new(config.outbound_connections)),
93            extra_peers: 0,
94            config,
95            peer_type_gen,
96        }
97    }
98
99    /// Connects to random seeds to get peers and immediately disconnects
100    #[instrument(level = "info", skip(self))]
101    #[expect(
102        clippy::significant_drop_in_scrutinee,
103        clippy::significant_drop_tightening
104    )]
105    async fn connect_to_random_seeds(&mut self) -> Result<(), OutboundConnectorError> {
106        let seeds = self
107            .config
108            .seeds
109            .choose_multiple(&mut thread_rng(), MAX_SEED_CONNECTIONS);
110
111        assert_ne!(seeds.len(), 0, "No seed nodes available to get peers from");
112
113        let mut allowed_errors = seeds.len();
114
115        let mut handshake_futs = JoinSet::new();
116
117        for seed in seeds {
118            tracing::info!("Getting peers from seed node: {}", seed);
119
120            let fut = timeout(
121                HANDSHAKE_TIMEOUT,
122                self.connector_svc
123                    .ready()
124                    .await
125                    .expect("Connector had an error in `poll_ready`")
126                    .call(ConnectRequest {
127                        addr: *seed,
128                        permit: None,
129                    }),
130            );
131            // Spawn the handshake on a separate task with a timeout, so we don't get stuck connecting to a peer.
132            handshake_futs.spawn(fut);
133        }
134
135        while let Some(res) = handshake_futs.join_next().await {
136            if matches!(res, Err(_) | Ok(Err(_) | Ok(Err(_)))) {
137                allowed_errors -= 1;
138            }
139        }
140
141        if allowed_errors == 0 {
142            Err(OutboundConnectorError::FailedToConnectToSeeds)
143        } else {
144            Ok(())
145        }
146    }
147
148    /// Connects to a given outbound peer.
149    #[instrument(level = "info", skip_all)]
150    async fn connect_to_outbound_peer(&mut self, permit: OwnedSemaphorePermit, addr: N::Addr) {
151        let new_peers_tx = self.new_peers_tx.clone();
152        let connection_fut = self
153            .connector_svc
154            .ready()
155            .await
156            .expect("Connector had an error in `poll_ready`")
157            .call(ConnectRequest {
158                addr,
159                permit: Some(permit),
160            });
161
162        tokio::spawn(
163            async move {
164                #[expect(clippy::significant_drop_in_scrutinee)]
165                if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await {
166                    drop(new_peers_tx.send(peer).await);
167                }
168            }
169            .instrument(Span::current()),
170        );
171    }
172
173    /// Handles a request from the peer set for more peers.
174    #[expect(
175        clippy::significant_drop_tightening,
176        reason = "we need to hold onto a permit"
177    )]
178    async fn handle_peer_request(
179        &mut self,
180        req: &MakeConnectionRequest,
181    ) -> Result<(), OutboundConnectorError> {
182        // try to get a permit.
183        let permit = Arc::clone(&self.outbound_semaphore)
184            .try_acquire_owned()
185            .or_else(|_| {
186                // if we can't get a permit add one if we are below the max number of connections.
187                if self.extra_peers >= self.config.extra_outbound_connections {
188                    // If we can't add a permit return an error.
189                    Err(OutboundConnectorError::MaxConnections)
190                } else {
191                    self.outbound_semaphore.add_permits(1);
192                    self.extra_peers += 1;
193                    Ok(Arc::clone(&self.outbound_semaphore)
194                        .try_acquire_owned()
195                        .unwrap())
196                }
197            })?;
198
199        // try to get a random peer on any network zone from the address book.
200        let peer = self
201            .address_book_svc
202            .ready()
203            .await
204            .expect("Error in address book!")
205            .call(AddressBookRequest::TakeRandomPeer {
206                height: req.block_needed,
207            })
208            .await;
209
210        match peer {
211            Err(_) => {
212                // TODO: We should probably send peer requests to our connected peers rather than go to seeds.
213                tracing::warn!("No peers in address book which are available and have the data we need. Getting peers from seed nodes.");
214
215                self.connect_to_random_seeds().await?;
216                Err(OutboundConnectorError::NoAvailablePeers)
217            }
218
219            Ok(AddressBookResponse::Peer(peer)) => {
220                self.connect_to_outbound_peer(permit, peer.adr).await;
221                Ok(())
222            }
223            Ok(_) => panic!("peer list sent incorrect response!"),
224        }
225    }
226
227    /// Handles a free permit, by either connecting to a new peer or by removing a permit if we are above the
228    /// minimum number of outbound connections.
229    #[instrument(level = "debug", skip(self, permit))]
230    async fn handle_free_permit(
231        &mut self,
232        permit: OwnedSemaphorePermit,
233    ) -> Result<(), OutboundConnectorError> {
234        if self.extra_peers > 0 {
235            tracing::debug!(
236                "Permit available but we are over the minimum number of peers, forgetting permit."
237            );
238            permit.forget();
239            self.extra_peers -= 1;
240            return Ok(());
241        }
242
243        tracing::debug!("Permit available, making outbound connection.");
244
245        let req = if self.peer_type_gen.sample(&mut thread_rng()) {
246            AddressBookRequest::TakeRandomGrayPeer { height: None }
247        } else {
248            // This will try white peers first then gray.
249            AddressBookRequest::TakeRandomPeer { height: None }
250        };
251
252        let Ok(AddressBookResponse::Peer(peer)) = self
253            .address_book_svc
254            .ready()
255            .await
256            .expect("Error in address book!")
257            .call(req)
258            .await
259        else {
260            tracing::warn!("No peers in peer list to make connection to.");
261            self.connect_to_random_seeds().await?;
262            return Err(OutboundConnectorError::NoAvailablePeers);
263        };
264
265        self.connect_to_outbound_peer(permit, peer.adr).await;
266        Ok(())
267    }
268
    /// Runs the outbound connection count keeper.
    ///
    /// This loops over two event sources:
    /// - requests for extra connections arriving on `make_connection_rx`, and
    /// - permits becoming free on `outbound_semaphore`.
    ///
    /// The function returns once `make_connection_rx` is closed.
    pub async fn run(mut self) {
        tracing::info!(
            "Starting outbound connection maintainer, target outbound connections: {}",
            self.config.outbound_connections
        );

        loop {
            tokio::select! {
                // Poll the branches in the order they are written, so a pending connection
                // request is always looked at before a free permit.
                biased;
                peer_req = self.make_connection_rx.recv() => {
                    // `None` means every sender has been dropped, nobody can ask us for peers anymore.
                    let Some(peer_req) = peer_req else {
                        tracing::info!("Shutting down outbound connector, make connection channel closed.");
                        return;
                    };
                    #[expect(clippy::let_underscore_must_use, reason = "We can't really do much about errors in this function.")]
                    let _ = self.handle_peer_request(&peer_req).await;
                },
                // This future is not cancellation safe as you will lose your space in the queue but as we are the only place
                // that actually requires permits that should be ok.
                Ok(permit) = Arc::clone(&self.outbound_semaphore).acquire_owned() => {
                    if self.handle_free_permit(permit).await.is_err() {
                        // if we got an error then we still have a permit free so to prevent this from just looping
                        // uncontrollably add a timeout.
                        sleep(OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT).await;
                    }
                }
            }
        }
    }
299}