use std::sync::Arc;
use futures::FutureExt;
use tokio::{
sync::{mpsc, watch},
task::JoinSet,
};
use tokio_stream::wrappers::WatchStream;
use tower::{buffer::Buffer, util::BoxCloneService, Service, ServiceExt};
use tracing::{instrument, Instrument, Span};
use cuprate_async_buffer::BufferStream;
use cuprate_p2p_core::{
client::Connector,
client::InternalPeerID,
services::{AddressBookRequest, AddressBookResponse, PeerSyncRequest},
CoreSyncSvc, NetworkZone, ProtocolRequestHandler,
};
mod block_downloader;
mod broadcast;
mod client_pool;
pub mod config;
pub mod connection_maintainer;
mod constants;
mod inbound_server;
mod sync_states;
use block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse};
pub use broadcast::{BroadcastRequest, BroadcastSvc};
use client_pool::ClientPoolDropGuard;
pub use config::P2PConfig;
use connection_maintainer::MakeConnectionRequest;
#[instrument(level = "debug", name = "net", skip_all, fields(zone = N::NAME))]
pub async fn initialize_network<N, PR, CS>(
protocol_request_handler: PR,
core_sync_svc: CS,
config: P2PConfig<N>,
) -> Result<NetworkInterface<N>, tower::BoxError>
where
N: NetworkZone,
N::Addr: borsh::BorshDeserialize + borsh::BorshSerialize,
PR: ProtocolRequestHandler + Clone,
CS: CoreSyncSvc + Clone,
{
let address_book =
cuprate_address_book::init_address_book(config.address_book_config.clone()).await?;
let address_book = Buffer::new(
address_book,
config.max_inbound_connections + config.outbound_connections,
);
let (sync_states_svc, top_block_watch) = sync_states::PeerSyncSvc::new();
let sync_states_svc = Buffer::new(
sync_states_svc,
config.max_inbound_connections + config.outbound_connections,
);
let (broadcast_svc, outbound_mkr, inbound_mkr) =
broadcast::init_broadcast_channels(broadcast::BroadcastConfig::default());
let mut basic_node_data = config.basic_node_data();
if !N::CHECK_NODE_ID {
basic_node_data.peer_id = 1;
}
let outbound_handshaker_builder =
cuprate_p2p_core::client::HandshakerBuilder::new(basic_node_data)
.with_address_book(address_book.clone())
.with_peer_sync_svc(sync_states_svc.clone())
.with_core_sync_svc(core_sync_svc)
.with_protocol_request_handler(protocol_request_handler)
.with_broadcast_stream_maker(outbound_mkr)
.with_connection_parent_span(Span::current());
let inbound_handshaker = outbound_handshaker_builder
.clone()
.with_broadcast_stream_maker(inbound_mkr)
.build();
let outbound_handshaker = outbound_handshaker_builder.build();
let client_pool = client_pool::ClientPool::new();
let (make_connection_tx, make_connection_rx) = mpsc::channel(3);
let outbound_connector = Connector::new(outbound_handshaker);
let outbound_connection_maintainer = connection_maintainer::OutboundConnectionKeeper::new(
config.clone(),
client_pool.clone(),
make_connection_rx,
address_book.clone(),
outbound_connector,
);
let mut background_tasks = JoinSet::new();
background_tasks.spawn(
outbound_connection_maintainer
.run()
.instrument(Span::current()),
);
background_tasks.spawn(
inbound_server::inbound_server(
client_pool.clone(),
inbound_handshaker,
address_book.clone(),
config,
)
.map(|res| {
if let Err(e) = res {
tracing::error!("Error in inbound connection listener: {e}")
}
tracing::info!("Inbound connection listener shutdown")
})
.instrument(Span::current()),
);
Ok(NetworkInterface {
pool: client_pool,
broadcast_svc,
top_block_watch,
make_connection_tx,
sync_states_svc,
address_book: address_book.boxed_clone(),
_background_tasks: Arc::new(background_tasks),
})
}
/// Handle to the P2P network of zone `N`, as returned by [`initialize_network`].
///
/// Every field is either cloneable or behind an [`Arc`], and the type derives [`Clone`], so
/// the handle can be duplicated and handed to several owners.
#[derive(Clone)]
pub struct NetworkInterface<N: NetworkZone> {
/// Shared pool of peer clients; passed to the block downloader and used by
/// [`NetworkInterface::borrow_client`].
pool: Arc<client_pool::ClientPool<N>>,
/// Broadcast service handle created by `broadcast::init_broadcast_channels`.
broadcast_svc: BroadcastSvc<N>,
/// Watch receiver for [`sync_states::NewSyncInfo`], created alongside the peer sync service.
top_block_watch: watch::Receiver<sync_states::NewSyncInfo>,
/// Sending half of the channel whose receiver is given to the outbound connection keeper.
// `dead_code` is allowed because no method visible in this impl sends on it.
#[allow(dead_code)] make_connection_tx: mpsc::Sender<MakeConnectionRequest>,
/// Type-erased, cloneable handle to the buffered address book service.
address_book: BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError>,
/// Buffered peer sync-state service; passed to the block downloader.
sync_states_svc: Buffer<sync_states::PeerSyncSvc<N>, PeerSyncRequest<N>>,
/// The spawned outbound connection keeper and inbound server tasks, shared between clones.
///
/// Per tokio's documentation, dropping a `JoinSet` aborts its tasks, so they live until the
/// last clone of this handle is dropped.
_background_tasks: Arc<JoinSet<()>>,
}
impl<N: NetworkZone> NetworkInterface<N> {
pub fn broadcast_svc(&self) -> BroadcastSvc<N> {
self.broadcast_svc.clone()
}
pub fn block_downloader<C>(
&self,
our_chain_service: C,
config: BlockDownloaderConfig,
) -> BufferStream<BlockBatch>
where
C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>
+ Send
+ 'static,
C::Future: Send + 'static,
{
block_downloader::download_blocks(
self.pool.clone(),
self.sync_states_svc.clone(),
our_chain_service,
config,
)
}
pub fn top_sync_stream(&self) -> WatchStream<sync_states::NewSyncInfo> {
WatchStream::from_changes(self.top_block_watch.clone())
}
pub fn address_book(
&self,
) -> BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError> {
self.address_book.clone()
}
pub fn borrow_client(&self, peer: &InternalPeerID<N::Addr>) -> Option<ClientPoolDropGuard<N>> {
self.pool.borrow_client(peer)
}
}