cuprated/
p2p.rs

1//! P2P
2//!
3//! Will handle initiating the P2P and contains a protocol request handler.
4
5use std::convert::From;
6
7use arti_client::TorClient;
8use futures::{FutureExt, TryFutureExt};
9use serde::{Deserialize, Serialize};
10use tokio::sync::oneshot::{self, Sender};
11use tor_rtcompat::PreferredRuntime;
12use tower::{Service, ServiceExt};
13
14use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
15use cuprate_consensus::BlockchainContextService;
16use cuprate_p2p::{config::TransportConfig, NetworkInterface, P2PConfig};
17use cuprate_p2p_core::{
18    client::InternalPeerID, transports::Tcp, ClearNet, NetworkZone, Tor, Transport,
19};
20use cuprate_p2p_transport::{Arti, ArtiClientConfig, Daemon};
21use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
22use cuprate_types::blockchain::BlockchainWriteRequest;
23
24use crate::{
25    blockchain,
26    config::Config,
27    constants::PANIC_CRITICAL_SERVICE_ERROR,
28    tor::{
29        transport_arti_config, transport_clearnet_arti_config, transport_daemon_config, TorContext,
30        TorMode,
31    },
32    txpool::{self, IncomingTxHandler},
33};
34
35mod core_sync_service;
36mod network_address;
37pub mod request_handler;
38
39pub use network_address::CrossNetworkInternalPeerId;
40
41/// A simple parsing enum for the `p2p.clear_net.proxy` field
42#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
43pub enum ProxySettings {
44    Tor,
45    #[serde(untagged)]
46    Socks(String),
47}
48
49/// This struct collect all supported and optional network zone interfaces.
50pub struct NetworkInterfaces {
51    /// Mandatory clearnet network interface
52    pub clearnet_network_interface: NetworkInterface<ClearNet>,
53    /// Optional tor network interface
54    pub tor_network_interface: Option<NetworkInterface<Tor>>,
55    // ...one can dream for more!
56}
57
58impl NetworkInterfaces {
59    pub const fn new(clearnet_network_interface: NetworkInterface<ClearNet>) -> Self {
60        Self {
61            clearnet_network_interface,
62            tor_network_interface: None,
63        }
64    }
65}
66
67/// Initialize all P2P network zones. Returning a [`NetworkInterfaces`] collection and
68/// a [`Vec<Sender<IncomingTxHandler>>`] for propagating the tx handler.
69pub async fn initialize_zones_p2p(
70    config: &Config,
71    context_svc: BlockchainContextService,
72    mut blockchain_read_handle: BlockchainReadHandle,
73    txpool_read_handle: TxpoolReadHandle,
74    tor_ctx: TorContext,
75) -> (NetworkInterfaces, Vec<Sender<IncomingTxHandler>>) {
76    // Start clearnet P2P.
77    let (clearnet, incoming_tx_handler_tx) = {
78        // If proxy is set
79        match config.p2p.clear_net.proxy {
80            ProxySettings::Tor => match tor_ctx.mode {
81                TorMode::Arti => {
82                    tracing::info!("Anonymizing clearnet connections through Arti.");
83                    start_zone_p2p::<ClearNet, Arti>(
84                        blockchain_read_handle.clone(),
85                        context_svc.clone(),
86                        txpool_read_handle.clone(),
87                        config.clearnet_p2p_config(),
88                        transport_clearnet_arti_config(&tor_ctx),
89                    )
90                    .await
91                    .unwrap()
92                }
93                TorMode::Daemon => {
94                    tracing::error!("Anonymizing clearnet connections through the Tor daemon is not yet supported.");
95                    std::process::exit(0);
96                }
97                TorMode::Off => {
98                    tracing::error!("Clearnet proxy set to \"tor\" but Tor is actually off. Please be sure to set a mode in the configuration or command line");
99                    std::process::exit(0);
100                }
101            },
102            ProxySettings::Socks(ref s) => {
103                if !s.is_empty() {
104                    tracing::error!("Socks proxy is not yet supported.");
105                    std::process::exit(0);
106                }
107
108                start_zone_p2p::<ClearNet, Tcp>(
109                    blockchain_read_handle.clone(),
110                    context_svc.clone(),
111                    txpool_read_handle.clone(),
112                    config.clearnet_p2p_config(),
113                    (&config.p2p.clear_net).into(),
114                )
115                .await
116                .unwrap()
117            }
118        }
119    };
120
121    // Create network interface collection
122    let mut network_interfaces = NetworkInterfaces::new(clearnet);
123    let mut tx_handler_subscribers = vec![incoming_tx_handler_tx];
124
125    // Start Tor P2P (if enabled)
126    let tor = if config.p2p.tor_net.enabled {
127        match tor_ctx.mode {
128            TorMode::Off => None,
129            TorMode::Daemon => Some(
130                start_zone_p2p::<Tor, Daemon>(
131                    blockchain_read_handle.clone(),
132                    context_svc.clone(),
133                    txpool_read_handle.clone(),
134                    config.tor_p2p_config(&tor_ctx),
135                    transport_daemon_config(config),
136                )
137                .await
138                .unwrap(),
139            ),
140            TorMode::Arti => Some(
141                start_zone_p2p::<Tor, Arti>(
142                    blockchain_read_handle.clone(),
143                    context_svc.clone(),
144                    txpool_read_handle.clone(),
145                    config.tor_p2p_config(&tor_ctx),
146                    transport_arti_config(config, tor_ctx),
147                )
148                .await
149                .unwrap(),
150            ),
151        }
152    } else {
153        None
154    };
155    if let Some((tor, incoming_tx_handler_tx)) = tor {
156        network_interfaces.tor_network_interface = Some(tor);
157        tx_handler_subscribers.push(incoming_tx_handler_tx);
158    }
159
160    (network_interfaces, tx_handler_subscribers)
161}
162
163/// Starts the P2P network zone, returning a [`NetworkInterface`] to interact with it.
164///
165/// A [`oneshot::Sender`] is also returned to provide the [`IncomingTxHandler`], until this is provided network
166/// handshakes can not be completed.
167pub async fn start_zone_p2p<N, T>(
168    blockchain_read_handle: BlockchainReadHandle,
169    blockchain_context_service: BlockchainContextService,
170    txpool_read_handle: TxpoolReadHandle,
171    config: P2PConfig<N>,
172    transport_config: TransportConfig<N, T>,
173) -> Result<(NetworkInterface<N>, Sender<IncomingTxHandler>), tower::BoxError>
174where
175    N: NetworkZone,
176    T: Transport<N>,
177    N::Addr: borsh::BorshDeserialize + borsh::BorshSerialize,
178    CrossNetworkInternalPeerId: From<InternalPeerID<<N as NetworkZone>::Addr>>,
179{
180    let (incoming_tx_handler_tx, incoming_tx_handler_rx) = oneshot::channel();
181
182    let request_handler_maker = request_handler::P2pProtocolRequestHandlerMaker {
183        blockchain_read_handle,
184        blockchain_context_service: blockchain_context_service.clone(),
185        txpool_read_handle,
186        incoming_tx_handler: None,
187        incoming_tx_handler_fut: incoming_tx_handler_rx.shared(),
188    };
189
190    Ok((
191        cuprate_p2p::initialize_network::<N, T, _, _>(
192            request_handler_maker.map_response(|s| s.map_err(Into::into)),
193            core_sync_service::CoreSyncService(blockchain_context_service),
194            config,
195            transport_config,
196        )
197        .await?,
198        incoming_tx_handler_tx,
199    ))
200}