1use std::sync::Arc;
7
8use rand::{distributions::Bernoulli, prelude::*};
9use tokio::{
10 sync::{mpsc, OwnedSemaphorePermit, Semaphore},
11 task::JoinSet,
12 time::{sleep, timeout},
13};
14use tower::{Service, ServiceExt};
15use tracing::{instrument, Instrument, Span};
16
17use cuprate_p2p_core::{
18 client::{Client, ConnectRequest, HandshakeError},
19 services::{AddressBookRequest, AddressBookResponse},
20 AddressBook, NetworkZone,
21};
22
23use crate::{
24 config::P2PConfig,
25 constants::{HANDSHAKE_TIMEOUT, MAX_SEED_CONNECTIONS, OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT},
26};
27
/// Reasons an attempt to make an outbound connection can fail.
///
/// The derives let the error be logged with `{:?}`, `unwrap`ped/`expect`ed and
/// compared in tests; the variants carry no data so they are all free.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OutboundConnectorError {
    /// No permit was available and the configured number of extra outbound
    /// connections has already been handed out.
    MaxConnections,
    /// Every seed node that was tried failed (task error, timeout or handshake error).
    FailedToConnectToSeeds,
    /// The address book did not return a peer to connect to.
    NoAvailablePeers,
}
33
/// A request, received over the keeper's `make_connection_rx` channel, asking
/// for one more outbound connection to be made.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MakeConnectionRequest {
    /// Forwarded as the `height` of the `TakeRandomPeer` address book request.
    ///
    /// NOTE(review): presumably the height of a block the requester needs a
    /// peer to have; `None` would then mean "any peer" — confirm against the
    /// address book.
    block_needed: Option<usize>,
}
42
43pub struct OutboundConnectionKeeper<N: NetworkZone, A, C> {
47 pub new_peers_tx: mpsc::Sender<Client<N>>,
49 pub make_connection_rx: mpsc::Receiver<MakeConnectionRequest>,
51 pub address_book_svc: A,
53 pub connector_svc: C,
55 pub outbound_semaphore: Arc<Semaphore>,
57 pub extra_peers: usize,
61 pub config: P2PConfig<N>,
63 pub peer_type_gen: Bernoulli,
68}
69
70impl<N, A, C> OutboundConnectionKeeper<N, A, C>
71where
72 N: NetworkZone,
73 A: AddressBook<N>,
74 C: Service<ConnectRequest<N>, Response = Client<N>, Error = HandshakeError>,
75 C::Future: Send + 'static,
76{
77 pub fn new(
78 config: P2PConfig<N>,
79 new_peers_tx: mpsc::Sender<Client<N>>,
80 make_connection_rx: mpsc::Receiver<MakeConnectionRequest>,
81 address_book_svc: A,
82 connector_svc: C,
83 ) -> Self {
84 let peer_type_gen = Bernoulli::new(config.gray_peers_percent)
85 .expect("Gray peer percent is incorrect should be 0..=1");
86
87 Self {
88 new_peers_tx,
89 make_connection_rx,
90 address_book_svc,
91 connector_svc,
92 outbound_semaphore: Arc::new(Semaphore::new(config.outbound_connections)),
93 extra_peers: 0,
94 config,
95 peer_type_gen,
96 }
97 }
98
99 #[instrument(level = "info", skip(self))]
101 #[expect(
102 clippy::significant_drop_in_scrutinee,
103 clippy::significant_drop_tightening
104 )]
105 async fn connect_to_random_seeds(&mut self) -> Result<(), OutboundConnectorError> {
106 let seeds = self
107 .config
108 .seeds
109 .choose_multiple(&mut thread_rng(), MAX_SEED_CONNECTIONS);
110
111 assert_ne!(seeds.len(), 0, "No seed nodes available to get peers from");
112
113 let mut allowed_errors = seeds.len();
114
115 let mut handshake_futs = JoinSet::new();
116
117 for seed in seeds {
118 tracing::info!("Getting peers from seed node: {}", seed);
119
120 let fut = timeout(
121 HANDSHAKE_TIMEOUT,
122 self.connector_svc
123 .ready()
124 .await
125 .expect("Connector had an error in `poll_ready`")
126 .call(ConnectRequest {
127 addr: *seed,
128 permit: None,
129 }),
130 );
131 handshake_futs.spawn(fut);
133 }
134
135 while let Some(res) = handshake_futs.join_next().await {
136 if matches!(res, Err(_) | Ok(Err(_) | Ok(Err(_)))) {
137 allowed_errors -= 1;
138 }
139 }
140
141 if allowed_errors == 0 {
142 Err(OutboundConnectorError::FailedToConnectToSeeds)
143 } else {
144 Ok(())
145 }
146 }
147
148 #[instrument(level = "info", skip_all)]
150 async fn connect_to_outbound_peer(&mut self, permit: OwnedSemaphorePermit, addr: N::Addr) {
151 let new_peers_tx = self.new_peers_tx.clone();
152 let connection_fut = self
153 .connector_svc
154 .ready()
155 .await
156 .expect("Connector had an error in `poll_ready`")
157 .call(ConnectRequest {
158 addr,
159 permit: Some(permit),
160 });
161
162 tokio::spawn(
163 async move {
164 #[expect(clippy::significant_drop_in_scrutinee)]
165 if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await {
166 drop(new_peers_tx.send(peer).await);
167 }
168 }
169 .instrument(Span::current()),
170 );
171 }
172
173 #[expect(
175 clippy::significant_drop_tightening,
176 reason = "we need to hold onto a permit"
177 )]
178 async fn handle_peer_request(
179 &mut self,
180 req: &MakeConnectionRequest,
181 ) -> Result<(), OutboundConnectorError> {
182 let permit = Arc::clone(&self.outbound_semaphore)
184 .try_acquire_owned()
185 .or_else(|_| {
186 if self.extra_peers >= self.config.extra_outbound_connections {
188 Err(OutboundConnectorError::MaxConnections)
190 } else {
191 self.outbound_semaphore.add_permits(1);
192 self.extra_peers += 1;
193 Ok(Arc::clone(&self.outbound_semaphore)
194 .try_acquire_owned()
195 .unwrap())
196 }
197 })?;
198
199 let peer = self
201 .address_book_svc
202 .ready()
203 .await
204 .expect("Error in address book!")
205 .call(AddressBookRequest::TakeRandomPeer {
206 height: req.block_needed,
207 })
208 .await;
209
210 match peer {
211 Err(_) => {
212 tracing::warn!("No peers in address book which are available and have the data we need. Getting peers from seed nodes.");
214
215 self.connect_to_random_seeds().await?;
216 Err(OutboundConnectorError::NoAvailablePeers)
217 }
218
219 Ok(AddressBookResponse::Peer(peer)) => {
220 self.connect_to_outbound_peer(permit, peer.adr).await;
221 Ok(())
222 }
223 Ok(_) => panic!("peer list sent incorrect response!"),
224 }
225 }
226
227 #[instrument(level = "debug", skip(self, permit))]
230 async fn handle_free_permit(
231 &mut self,
232 permit: OwnedSemaphorePermit,
233 ) -> Result<(), OutboundConnectorError> {
234 if self.extra_peers > 0 {
235 tracing::debug!(
236 "Permit available but we are over the minimum number of peers, forgetting permit."
237 );
238 permit.forget();
239 self.extra_peers -= 1;
240 return Ok(());
241 }
242
243 tracing::debug!("Permit available, making outbound connection.");
244
245 let req = if self.peer_type_gen.sample(&mut thread_rng()) {
246 AddressBookRequest::TakeRandomGrayPeer { height: None }
247 } else {
248 AddressBookRequest::TakeRandomPeer { height: None }
250 };
251
252 let Ok(AddressBookResponse::Peer(peer)) = self
253 .address_book_svc
254 .ready()
255 .await
256 .expect("Error in address book!")
257 .call(req)
258 .await
259 else {
260 tracing::warn!("No peers in peer list to make connection to.");
261 self.connect_to_random_seeds().await?;
262 return Err(OutboundConnectorError::NoAvailablePeers);
263 };
264
265 self.connect_to_outbound_peer(permit, peer.adr).await;
266 Ok(())
267 }
268
269 pub async fn run(mut self) {
271 tracing::info!(
272 "Starting outbound connection maintainer, target outbound connections: {}",
273 self.config.outbound_connections
274 );
275
276 loop {
277 tokio::select! {
278 biased;
279 peer_req = self.make_connection_rx.recv() => {
280 let Some(peer_req) = peer_req else {
281 tracing::info!("Shutting down outbound connector, make connection channel closed.");
282 return;
283 };
284 #[expect(clippy::let_underscore_must_use, reason = "We can't really do much about errors in this function.")]
285 let _ = self.handle_peer_request(&peer_req).await;
286 },
287 Ok(permit) = Arc::clone(&self.outbound_semaphore).acquire_owned() => {
290 if self.handle_free_permit(permit).await.is_err() {
291 sleep(OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT).await;
294 }
295 }
296 }
297 }
298 }
299}