cuprated/
p2p.rs

1//! P2P
2//!
3//! Will handle initiating the P2P and contains a protocol request handler.
4
5use std::convert::From;
6
7use futures::{FutureExt, TryFutureExt};
8use tokio::sync::oneshot::{self, Sender};
9use tower::{Service, ServiceExt};
10
11use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
12use cuprate_consensus::BlockchainContextService;
13use cuprate_p2p::{config::TransportConfig, NetworkInterface, P2PConfig};
14use cuprate_p2p_core::{client::InternalPeerID, transports::Tcp, ClearNet, NetworkZone, Transport};
15use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
16use cuprate_types::blockchain::BlockchainWriteRequest;
17
18use crate::{
19    blockchain,
20    config::Config,
21    constants::PANIC_CRITICAL_SERVICE_ERROR,
22    txpool::{self, IncomingTxHandler},
23};
24
25mod core_sync_service;
26mod network_address;
27pub mod request_handler;
28
29pub use network_address::CrossNetworkInternalPeerId;
30
31/// This struct collect all supported and optional network zone interfaces.
32pub struct NetworkInterfaces {
33    /// Mandatory clearnet network interface
34    pub clearnet_network_interface: NetworkInterface<ClearNet>,
35    // ...one can dream for more!
36}
37
38impl NetworkInterfaces {
39    pub const fn new(clearnet_network_interface: NetworkInterface<ClearNet>) -> Self {
40        Self {
41            clearnet_network_interface,
42        }
43    }
44}
45
46/// Initialize all P2P network zones. Returning a [`NetworkInterfaces`] collection and
47/// a [`Vec<Sender<IncomingTxHandler>>`] for propagating the tx handler.
48pub async fn initialize_zones_p2p(
49    config: &Config,
50    context_svc: BlockchainContextService,
51    mut blockchain_write_handle: BlockchainWriteHandle,
52    mut blockchain_read_handle: BlockchainReadHandle,
53    txpool_write_handle: TxpoolWriteHandle,
54    txpool_read_handle: TxpoolReadHandle,
55) -> (NetworkInterfaces, Vec<Sender<IncomingTxHandler>>) {
56    // Start TCP clearnet P2P.
57    let (clearnet, incoming_tx_handler_tx) = start_zone_p2p::<ClearNet, Tcp>(
58        blockchain_read_handle.clone(),
59        context_svc.clone(),
60        txpool_read_handle.clone(),
61        config.clearnet_p2p_config(),
62        (&config.p2p.clear_net).into(),
63    )
64    .await
65    .unwrap();
66
67    // Create network interface collection
68    let network_interfaces = NetworkInterfaces::new(clearnet);
69    let tx_handler_subscribers = vec![incoming_tx_handler_tx];
70
71    (network_interfaces, tx_handler_subscribers)
72}
73
74/// Starts the P2P network zone, returning a [`NetworkInterface`] to interact with it.
75///
76/// A [`oneshot::Sender`] is also returned to provide the [`IncomingTxHandler`], until this is provided network
77/// handshakes can not be completed.
78pub async fn start_zone_p2p<N, T>(
79    blockchain_read_handle: BlockchainReadHandle,
80    blockchain_context_service: BlockchainContextService,
81    txpool_read_handle: TxpoolReadHandle,
82    config: P2PConfig<N>,
83    transport_config: TransportConfig<N, T>,
84) -> Result<(NetworkInterface<N>, Sender<IncomingTxHandler>), tower::BoxError>
85where
86    N: NetworkZone,
87    T: Transport<N>,
88    N::Addr: borsh::BorshDeserialize + borsh::BorshSerialize,
89    CrossNetworkInternalPeerId: From<InternalPeerID<<N as NetworkZone>::Addr>>,
90{
91    let (incoming_tx_handler_tx, incoming_tx_handler_rx) = oneshot::channel();
92
93    let request_handler_maker = request_handler::P2pProtocolRequestHandlerMaker {
94        blockchain_read_handle,
95        blockchain_context_service: blockchain_context_service.clone(),
96        txpool_read_handle,
97        incoming_tx_handler: None,
98        incoming_tx_handler_fut: incoming_tx_handler_rx.shared(),
99    };
100
101    Ok((
102        cuprate_p2p::initialize_network::<N, T, _, _>(
103            request_handler_maker.map_response(|s| s.map_err(Into::into)),
104            core_sync_service::CoreSyncService(blockchain_context_service),
105            config,
106            transport_config,
107        )
108        .await?,
109        incoming_tx_handler_tx,
110    ))
111}