1use std::sync::Arc;
6
7use futures::FutureExt;
8use tokio::{
9 sync::mpsc,
10 task::JoinSet,
11 time::{sleep, Duration},
12};
13use tower::{buffer::Buffer, util::BoxCloneService, Service, ServiceExt};
14use tracing::{instrument, Instrument, Span};
15
16use cuprate_async_buffer::BufferStream;
17use cuprate_p2p_core::{
18 client::Connector,
19 services::{AddressBookRequest, AddressBookResponse},
20 CoreSyncSvc, NetworkZone, ProtocolRequestHandlerMaker, Transport,
21};
22
23pub mod block_downloader;
24mod broadcast;
25pub mod config;
26pub mod connection_maintainer;
27pub mod constants;
28mod inbound_server;
29mod peer_set;
30
31use block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse};
32pub use broadcast::{BroadcastRequest, BroadcastSvc};
33pub use config::{AddressBookConfig, P2PConfig, TransportConfig};
34use connection_maintainer::MakeConnectionRequest;
35use peer_set::PeerSet;
36pub use peer_set::{ClientDropGuard, PeerSetRequest, PeerSetResponse};
37
/// How long [`inbound_connection_monitor`] sleeps between two checks: one hour.
const INBOUND_CONNECTION_MONITOR_INTERVAL: Duration = Duration::from_secs(60 * 60);
40
41#[expect(clippy::infinite_loop)]
46async fn inbound_connection_monitor(
47 inbound_semaphore: Arc<tokio::sync::Semaphore>,
48 max_inbound_connections: usize,
49 p2p_port: u16,
50) {
51 if max_inbound_connections == 0 {
53 return;
54 }
55
56 loop {
57 sleep(INBOUND_CONNECTION_MONITOR_INTERVAL).await;
59
60 let available_permits = inbound_semaphore.available_permits();
63 if available_permits == max_inbound_connections {
64 tracing::warn!(
65 "No incoming connections - check firewalls/routers allow port {}",
66 p2p_port
67 );
68 }
69 }
70}
71
72#[instrument(level = "error", name = "net", skip_all, fields(zone = Z::NAME))]
81pub async fn initialize_network<Z, T, PR, CS>(
82 protocol_request_handler_maker: PR,
83 core_sync_svc: CS,
84 config: P2PConfig<Z>,
85 transport_config: TransportConfig<Z, T>,
86) -> Result<NetworkInterface<Z>, tower::BoxError>
87where
88 Z: NetworkZone,
89 T: Transport<Z>,
90 Z::Addr: borsh::BorshDeserialize + borsh::BorshSerialize,
91 PR: ProtocolRequestHandlerMaker<Z> + Clone,
92 CS: CoreSyncSvc + Clone,
93{
94 let address_book =
95 cuprate_address_book::init_address_book(config.address_book_config.clone()).await?;
96 let address_book = Buffer::new(
97 address_book,
98 config
99 .max_inbound_connections
100 .checked_add(config.outbound_connections)
101 .unwrap(),
102 );
103
104 let (broadcast_svc, outbound_mkr, inbound_mkr) =
107 broadcast::init_broadcast_channels(broadcast::BroadcastConfig::default());
108
109 let mut basic_node_data = config.basic_node_data();
110
111 if !Z::CHECK_NODE_ID {
112 basic_node_data.peer_id = 1;
113 }
114
115 let outbound_handshaker_builder =
116 cuprate_p2p_core::client::HandshakerBuilder::<Z, T, _, _, _, _>::new(
117 basic_node_data,
118 transport_config.client_config,
119 )
120 .with_address_book(address_book.clone())
121 .with_core_sync_svc(core_sync_svc)
122 .with_protocol_request_handler_maker(protocol_request_handler_maker)
123 .with_broadcast_stream_maker(outbound_mkr)
124 .with_connection_parent_span(Span::current());
125
126 let inbound_handshaker = outbound_handshaker_builder
127 .clone()
128 .with_broadcast_stream_maker(inbound_mkr)
129 .build();
130
131 let outbound_handshaker = outbound_handshaker_builder.build();
132
133 let (new_connection_tx, new_connection_rx) = mpsc::channel(
134 config
135 .outbound_connections
136 .checked_add(config.max_inbound_connections)
137 .unwrap(),
138 );
139 let (make_connection_tx, make_connection_rx) = mpsc::channel(3);
140
141 let outbound_connector = Connector::new(outbound_handshaker);
142 let outbound_connection_maintainer = connection_maintainer::OutboundConnectionKeeper::new(
143 config.clone(),
144 new_connection_tx.clone(),
145 make_connection_rx,
146 address_book.clone(),
147 outbound_connector,
148 );
149
150 let peer_set = PeerSet::new(new_connection_rx);
151
152 let inbound_semaphore = Arc::new(tokio::sync::Semaphore::new(config.max_inbound_connections));
154
155 let mut background_tasks = JoinSet::new();
156
157 background_tasks.spawn(
158 outbound_connection_maintainer
159 .run()
160 .instrument(Span::current()),
161 );
162
163 background_tasks.spawn(
165 inbound_connection_monitor(
166 Arc::clone(&inbound_semaphore),
167 config.max_inbound_connections,
168 config.p2p_port,
169 )
170 .instrument(tracing::info_span!("inbound_connection_monitor")),
171 );
172
173 background_tasks.spawn(
174 inbound_server::inbound_server(
175 new_connection_tx,
176 inbound_handshaker,
177 address_book.clone(),
178 config,
179 transport_config.server_config,
180 inbound_semaphore,
181 )
182 .map(|res| {
183 if let Err(e) = res {
184 tracing::error!("Error in inbound connection listener: {e}");
185 }
186
187 tracing::info!("Inbound connection listener shutdown");
188 })
189 .instrument(Span::current()),
190 );
191
192 Ok(NetworkInterface {
193 peer_set: Buffer::new(peer_set, 10).boxed_clone(),
194 broadcast_svc,
195 make_connection_tx,
196 address_book: address_book.boxed_clone(),
197 _background_tasks: Arc::new(background_tasks),
198 })
199}
200
201#[derive(Clone)]
203pub struct NetworkInterface<N: NetworkZone> {
204 peer_set: BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError>,
206 broadcast_svc: BroadcastSvc<N>,
208 #[expect(dead_code, reason = "will be used eventually")]
210 make_connection_tx: mpsc::Sender<MakeConnectionRequest>,
211 address_book: BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError>,
213 _background_tasks: Arc<JoinSet<()>>,
215}
216
217impl<N: NetworkZone> NetworkInterface<N> {
218 pub fn broadcast_svc(&self) -> BroadcastSvc<N> {
220 self.broadcast_svc.clone()
221 }
222
223 pub fn block_downloader<C>(
225 &self,
226 our_chain_service: C,
227 config: BlockDownloaderConfig,
228 ) -> BufferStream<BlockBatch>
229 where
230 C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>
231 + Send
232 + 'static,
233 C::Future: Send + 'static,
234 {
235 block_downloader::download_blocks(self.peer_set.clone(), our_chain_service, config)
236 }
237
238 pub fn address_book(
240 &self,
241 ) -> BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError> {
242 self.address_book.clone()
243 }
244
245 pub fn peer_set(
247 &mut self,
248 ) -> &mut BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError> {
249 &mut self.peer_set
250 }
251}