1use std::sync::Arc;
7
8use rand::{distributions::Bernoulli, prelude::*};
9use tokio::{
10 sync::{mpsc, OwnedSemaphorePermit, Semaphore},
11 task::JoinSet,
12 time::{sleep, timeout},
13};
14use tower::{Service, ServiceExt};
15use tracing::{instrument, Instrument, Span};
16
17use cuprate_p2p_core::{
18 client::{Client, ConnectRequest, HandshakeError},
19 services::{AddressBookRequest, AddressBookResponse},
20 AddressBook, NetworkZone,
21};
22
23use crate::{
24 config::P2PConfig,
25 constants::{HANDSHAKE_TIMEOUT, MAX_SEED_CONNECTIONS, OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT},
26};
27
/// Reasons an attempt to make an outbound connection did not go ahead.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OutboundConnectorError {
    /// No semaphore permit was free and the number of extra peers had already reached
    /// `config.extra_outbound_connections`.
    MaxConnections,
    /// Every handshake attempted against the selected seed nodes failed, timed out or
    /// could not be joined.
    FailedToConnectToSeeds,
    /// The address book did not hand back a peer to connect to.
    NoAvailablePeers,
}

impl std::fmt::Display for OutboundConnectorError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::MaxConnections => "maximum number of outbound connections reached",
            Self::FailedToConnectToSeeds => "failed to connect to any seed node",
            Self::NoAvailablePeers => "no available peers to connect to",
        })
    }
}

impl std::error::Error for OutboundConnectorError {}
33
/// A request, received over `make_connection_rx`, asking the connection keeper to open one
/// more outbound connection.
///
/// `Default` yields a request with no block requirement.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct MakeConnectionRequest {
    /// Forwarded unchanged as the `height` of the address book's `TakeRandomPeer` request.
    block_needed: Option<usize>,
}
42
43pub struct OutboundConnectionKeeper<Z: NetworkZone, A, C> {
47 pub new_peers_tx: mpsc::Sender<Client<Z>>,
49 pub make_connection_rx: mpsc::Receiver<MakeConnectionRequest>,
51 pub address_book_svc: A,
53 pub connector_svc: C,
55 pub outbound_semaphore: Arc<Semaphore>,
57 pub extra_peers: usize,
61 pub config: P2PConfig<Z>,
63 pub peer_type_gen: Bernoulli,
68}
69
70impl<Z, A, C> OutboundConnectionKeeper<Z, A, C>
71where
72 Z: NetworkZone,
73 A: AddressBook<Z>,
74 C: Service<ConnectRequest<Z>, Response = Client<Z>, Error = HandshakeError>,
75 C::Future: Send + 'static,
76{
77 pub fn new(
78 config: P2PConfig<Z>,
79 new_peers_tx: mpsc::Sender<Client<Z>>,
80 make_connection_rx: mpsc::Receiver<MakeConnectionRequest>,
81 address_book_svc: A,
82 connector_svc: C,
83 ) -> Self {
84 let peer_type_gen = Bernoulli::new(config.gray_peers_percent)
85 .expect("Gray peer percent is incorrect should be 0..=1");
86
87 Self {
88 new_peers_tx,
89 make_connection_rx,
90 address_book_svc,
91 connector_svc,
92 outbound_semaphore: Arc::new(Semaphore::new(config.outbound_connections)),
93 extra_peers: 0,
94 config,
95 peer_type_gen,
96 }
97 }
98
99 #[instrument(level = "info", skip(self))]
101 #[expect(clippy::significant_drop_tightening)]
102 async fn connect_to_random_seeds(&mut self) -> Result<(), OutboundConnectorError> {
103 let seeds = self
104 .config
105 .seeds
106 .choose_multiple(&mut thread_rng(), MAX_SEED_CONNECTIONS);
107
108 assert_ne!(seeds.len(), 0, "No seed nodes available to get peers from");
109
110 let mut allowed_errors = seeds.len();
111
112 let mut handshake_futs = JoinSet::new();
113
114 for seed in seeds {
115 tracing::info!("Getting peers from seed node: {}", seed);
116
117 let fut = timeout(
118 HANDSHAKE_TIMEOUT,
119 self.connector_svc
120 .ready()
121 .await
122 .expect("Connector had an error in `poll_ready`")
123 .call(ConnectRequest {
124 addr: *seed,
125 permit: None,
126 }),
127 );
128 handshake_futs.spawn(fut);
130 }
131
132 while let Some(res) = handshake_futs.join_next().await {
133 if matches!(res, Err(_) | Ok(Err(_) | Ok(Err(_)))) {
134 allowed_errors -= 1;
135 }
136 }
137
138 if allowed_errors == 0 {
139 Err(OutboundConnectorError::FailedToConnectToSeeds)
140 } else {
141 Ok(())
142 }
143 }
144
145 #[instrument(level = "info", skip_all)]
147 async fn connect_to_outbound_peer(&mut self, permit: OwnedSemaphorePermit, addr: Z::Addr) {
148 let new_peers_tx = self.new_peers_tx.clone();
149 let connection_fut = self
150 .connector_svc
151 .ready()
152 .await
153 .expect("Connector had an error in `poll_ready`")
154 .call(ConnectRequest {
155 addr,
156 permit: Some(permit),
157 });
158
159 tokio::spawn(
160 async move {
161 if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await {
162 drop(new_peers_tx.send(peer).await);
163 }
164 }
165 .instrument(Span::current()),
166 );
167 }
168
169 #[expect(
171 clippy::significant_drop_tightening,
172 reason = "we need to hold onto a permit"
173 )]
174 async fn handle_peer_request(
175 &mut self,
176 req: &MakeConnectionRequest,
177 ) -> Result<(), OutboundConnectorError> {
178 let permit = Arc::clone(&self.outbound_semaphore)
180 .try_acquire_owned()
181 .or_else(|_| {
182 if self.extra_peers >= self.config.extra_outbound_connections {
184 Err(OutboundConnectorError::MaxConnections)
186 } else {
187 self.outbound_semaphore.add_permits(1);
188 self.extra_peers += 1;
189 Ok(Arc::clone(&self.outbound_semaphore)
190 .try_acquire_owned()
191 .unwrap())
192 }
193 })?;
194
195 let peer = self
197 .address_book_svc
198 .ready()
199 .await
200 .expect("Error in address book!")
201 .call(AddressBookRequest::TakeRandomPeer {
202 height: req.block_needed,
203 })
204 .await;
205
206 match peer {
207 Err(_) => {
208 tracing::warn!("No peers in address book which are available and have the data we need. Getting peers from seed nodes.");
210
211 self.connect_to_random_seeds().await?;
212 Err(OutboundConnectorError::NoAvailablePeers)
213 }
214
215 Ok(AddressBookResponse::Peer(peer)) => {
216 self.connect_to_outbound_peer(permit, peer.adr).await;
217 Ok(())
218 }
219 Ok(_) => panic!("peer list sent incorrect response!"),
220 }
221 }
222
223 #[instrument(level = "debug", skip(self, permit))]
226 async fn handle_free_permit(
227 &mut self,
228 permit: OwnedSemaphorePermit,
229 ) -> Result<(), OutboundConnectorError> {
230 if self.extra_peers > 0 {
231 tracing::debug!(
232 "Permit available but we are over the minimum number of peers, forgetting permit."
233 );
234 permit.forget();
235 self.extra_peers -= 1;
236 return Ok(());
237 }
238
239 tracing::debug!("Permit available, making outbound connection.");
240
241 let req = if self.peer_type_gen.sample(&mut thread_rng()) {
242 AddressBookRequest::TakeRandomGrayPeer { height: None }
243 } else {
244 AddressBookRequest::TakeRandomPeer { height: None }
246 };
247
248 let Ok(AddressBookResponse::Peer(peer)) = self
249 .address_book_svc
250 .ready()
251 .await
252 .expect("Error in address book!")
253 .call(req)
254 .await
255 else {
256 tracing::warn!("No peers in peer list to make connection to.");
257 self.connect_to_random_seeds().await?;
258 return Err(OutboundConnectorError::NoAvailablePeers);
259 };
260
261 self.connect_to_outbound_peer(permit, peer.adr).await;
262 Ok(())
263 }
264
265 pub async fn run(mut self) {
267 tracing::info!(
268 "Starting outbound connection maintainer, target outbound connections: {}",
269 self.config.outbound_connections
270 );
271
272 loop {
273 tokio::select! {
274 biased;
275 peer_req = self.make_connection_rx.recv() => {
276 let Some(peer_req) = peer_req else {
277 tracing::info!("Shutting down outbound connector, make connection channel closed.");
278 return;
279 };
280 #[expect(clippy::let_underscore_must_use, reason = "We can't really do much about errors in this function.")]
281 let _ = self.handle_peer_request(&peer_req).await;
282 },
283 Ok(permit) = Arc::clone(&self.outbound_semaphore).acquire_owned() => {
286 if self.handle_free_permit(permit).await.is_err() {
287 sleep(OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT).await;
290 }
291 }
292 }
293 }
294 }
295}