1use std::sync::Arc;
6
7use futures::FutureExt;
8use tokio::{sync::mpsc, task::JoinSet};
9use tower::{buffer::Buffer, util::BoxCloneService, Service, ServiceExt};
10use tracing::{instrument, Instrument, Span};
11
12use cuprate_async_buffer::BufferStream;
13use cuprate_p2p_core::{
14 client::Connector,
15 services::{AddressBookRequest, AddressBookResponse},
16 CoreSyncSvc, NetworkZone, ProtocolRequestHandlerMaker, Transport,
17};
18
19pub mod block_downloader;
20mod broadcast;
21pub mod config;
22pub mod connection_maintainer;
23pub mod constants;
24mod inbound_server;
25mod peer_set;
26
27use block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse};
28pub use broadcast::{BroadcastRequest, BroadcastSvc};
29pub use config::{AddressBookConfig, P2PConfig, TransportConfig};
30use connection_maintainer::MakeConnectionRequest;
31use peer_set::PeerSet;
32pub use peer_set::{ClientDropGuard, PeerSetRequest, PeerSetResponse};
33
34#[instrument(level = "debug", name = "net", skip_all, fields(zone = Z::NAME))]
43pub async fn initialize_network<Z, T, PR, CS>(
44 protocol_request_handler_maker: PR,
45 core_sync_svc: CS,
46 config: P2PConfig<Z>,
47 transport_config: TransportConfig<Z, T>,
48) -> Result<NetworkInterface<Z>, tower::BoxError>
49where
50 Z: NetworkZone,
51 T: Transport<Z>,
52 Z::Addr: borsh::BorshDeserialize + borsh::BorshSerialize,
53 PR: ProtocolRequestHandlerMaker<Z> + Clone,
54 CS: CoreSyncSvc + Clone,
55{
56 let address_book =
57 cuprate_address_book::init_address_book(config.address_book_config.clone()).await?;
58 let address_book = Buffer::new(
59 address_book,
60 config
61 .max_inbound_connections
62 .checked_add(config.outbound_connections)
63 .unwrap(),
64 );
65
66 let (broadcast_svc, outbound_mkr, inbound_mkr) =
69 broadcast::init_broadcast_channels(broadcast::BroadcastConfig::default());
70
71 let mut basic_node_data = config.basic_node_data();
72
73 if !Z::CHECK_NODE_ID {
74 basic_node_data.peer_id = 1;
75 }
76
77 let outbound_handshaker_builder =
78 cuprate_p2p_core::client::HandshakerBuilder::<Z, T, _, _, _, _>::new(
79 basic_node_data,
80 transport_config.client_config,
81 )
82 .with_address_book(address_book.clone())
83 .with_core_sync_svc(core_sync_svc)
84 .with_protocol_request_handler_maker(protocol_request_handler_maker)
85 .with_broadcast_stream_maker(outbound_mkr)
86 .with_connection_parent_span(Span::current());
87
88 let inbound_handshaker = outbound_handshaker_builder
89 .clone()
90 .with_broadcast_stream_maker(inbound_mkr)
91 .build();
92
93 let outbound_handshaker = outbound_handshaker_builder.build();
94
95 let (new_connection_tx, new_connection_rx) = mpsc::channel(
96 config
97 .outbound_connections
98 .checked_add(config.max_inbound_connections)
99 .unwrap(),
100 );
101 let (make_connection_tx, make_connection_rx) = mpsc::channel(3);
102
103 let outbound_connector = Connector::new(outbound_handshaker);
104 let outbound_connection_maintainer = connection_maintainer::OutboundConnectionKeeper::new(
105 config.clone(),
106 new_connection_tx.clone(),
107 make_connection_rx,
108 address_book.clone(),
109 outbound_connector,
110 );
111
112 let peer_set = PeerSet::new(new_connection_rx);
113
114 let mut background_tasks = JoinSet::new();
115
116 background_tasks.spawn(
117 outbound_connection_maintainer
118 .run()
119 .instrument(Span::current()),
120 );
121 background_tasks.spawn(
122 inbound_server::inbound_server(
123 new_connection_tx,
124 inbound_handshaker,
125 address_book.clone(),
126 config,
127 transport_config.server_config,
128 )
129 .map(|res| {
130 if let Err(e) = res {
131 tracing::error!("Error in inbound connection listener: {e}");
132 }
133
134 tracing::info!("Inbound connection listener shutdown");
135 })
136 .instrument(Span::current()),
137 );
138
139 Ok(NetworkInterface {
140 peer_set: Buffer::new(peer_set, 10).boxed_clone(),
141 broadcast_svc,
142 make_connection_tx,
143 address_book: address_book.boxed_clone(),
144 _background_tasks: Arc::new(background_tasks),
145 })
146}
147
148#[derive(Clone)]
150pub struct NetworkInterface<N: NetworkZone> {
151 peer_set: BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError>,
153 broadcast_svc: BroadcastSvc<N>,
155 #[expect(dead_code, reason = "will be used eventually")]
157 make_connection_tx: mpsc::Sender<MakeConnectionRequest>,
158 address_book: BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError>,
160 _background_tasks: Arc<JoinSet<()>>,
162}
163
164impl<N: NetworkZone> NetworkInterface<N> {
165 pub fn broadcast_svc(&self) -> BroadcastSvc<N> {
167 self.broadcast_svc.clone()
168 }
169
170 pub fn block_downloader<C>(
172 &self,
173 our_chain_service: C,
174 config: BlockDownloaderConfig,
175 ) -> BufferStream<BlockBatch>
176 where
177 C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>
178 + Send
179 + 'static,
180 C::Future: Send + 'static,
181 {
182 block_downloader::download_blocks(self.peer_set.clone(), our_chain_service, config)
183 }
184
185 pub fn address_book(
187 &self,
188 ) -> BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError> {
189 self.address_book.clone()
190 }
191
192 pub fn peer_set(
194 &mut self,
195 ) -> &mut BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError> {
196 &mut self.peer_set
197 }
198}