cuprate_p2p/
lib.rs

1//! Cuprate's P2P Crate.
2//!
//! This crate contains a [`NetworkInterface`] which allows interacting with the Monero P2P network on
//! a certain [`NetworkZone`].
5use std::sync::Arc;
6
7use futures::FutureExt;
8use tokio::{
9    sync::mpsc,
10    task::JoinSet,
11    time::{sleep, Duration},
12};
13use tower::{buffer::Buffer, util::BoxCloneService, Service, ServiceExt};
14use tracing::{instrument, Instrument, Span};
15
16use cuprate_async_buffer::BufferStream;
17use cuprate_p2p_core::{
18    client::Connector,
19    services::{AddressBookRequest, AddressBookResponse},
20    CoreSyncSvc, NetworkZone, ProtocolRequestHandlerMaker, Transport,
21};
22
23pub mod block_downloader;
24mod broadcast;
25pub mod config;
26pub mod connection_maintainer;
27pub mod constants;
28mod inbound_server;
29mod peer_set;
30
31use block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse};
32pub use broadcast::{BroadcastRequest, BroadcastSvc};
33pub use config::{AddressBookConfig, P2PConfig, TransportConfig};
34use connection_maintainer::MakeConnectionRequest;
35use peer_set::PeerSet;
36pub use peer_set::{ClientDropGuard, PeerSetRequest, PeerSetResponse};
37
/// How long the inbound connection monitor waits between two checks: one hour.
const INBOUND_CONNECTION_MONITOR_INTERVAL: Duration = Duration::from_secs(60 * 60);
40
41/// Monitors for inbound connections and logs a warning if none are detected.
42///
43/// This task runs every hour to check if there are inbound connections available.
44/// If `max_inbound_connections` is 0, the task will exit without logging.
45#[expect(clippy::infinite_loop)]
46async fn inbound_connection_monitor(
47    inbound_semaphore: Arc<tokio::sync::Semaphore>,
48    max_inbound_connections: usize,
49    p2p_port: u16,
50) {
51    // Skip monitoring if inbound connections are disabled
52    if max_inbound_connections == 0 {
53        return;
54    }
55
56    loop {
57        // Wait for the monitoring interval
58        sleep(INBOUND_CONNECTION_MONITOR_INTERVAL).await;
59
60        // Check if we have any inbound connections
61        // If available permits equals max_inbound_connections, no peers are connected
62        let available_permits = inbound_semaphore.available_permits();
63        if available_permits == max_inbound_connections {
64            tracing::warn!(
65                "No incoming connections - check firewalls/routers allow port {}",
66                p2p_port
67            );
68        }
69    }
70}
71
72/// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`].
73///
74/// This function starts all the tasks to maintain/accept/make connections.
75///
76/// # Usage
77/// You must provide:
78/// - A protocol request handler, which is given to each connection
79/// - A core sync service, which keeps track of the sync state of our node
80#[instrument(level = "error", name = "net", skip_all, fields(zone = Z::NAME))]
81pub async fn initialize_network<Z, T, PR, CS>(
82    protocol_request_handler_maker: PR,
83    core_sync_svc: CS,
84    config: P2PConfig<Z>,
85    transport_config: TransportConfig<Z, T>,
86) -> Result<NetworkInterface<Z>, tower::BoxError>
87where
88    Z: NetworkZone,
89    T: Transport<Z>,
90    Z::Addr: borsh::BorshDeserialize + borsh::BorshSerialize,
91    PR: ProtocolRequestHandlerMaker<Z> + Clone,
92    CS: CoreSyncSvc + Clone,
93{
94    let address_book =
95        cuprate_address_book::init_address_book(config.address_book_config.clone()).await?;
96    let address_book = Buffer::new(
97        address_book,
98        config
99            .max_inbound_connections
100            .checked_add(config.outbound_connections)
101            .unwrap(),
102    );
103
104    // Use the default config. Changing the defaults affects tx fluff times, which could affect D++ so for now don't allow changing
105    // this.
106    let (broadcast_svc, outbound_mkr, inbound_mkr) =
107        broadcast::init_broadcast_channels(broadcast::BroadcastConfig::default());
108
109    let mut basic_node_data = config.basic_node_data();
110
111    if !Z::CHECK_NODE_ID {
112        basic_node_data.peer_id = 1;
113    }
114
115    let outbound_handshaker_builder =
116        cuprate_p2p_core::client::HandshakerBuilder::<Z, T, _, _, _, _>::new(
117            basic_node_data,
118            transport_config.client_config,
119        )
120        .with_address_book(address_book.clone())
121        .with_core_sync_svc(core_sync_svc)
122        .with_protocol_request_handler_maker(protocol_request_handler_maker)
123        .with_broadcast_stream_maker(outbound_mkr)
124        .with_connection_parent_span(Span::current());
125
126    let inbound_handshaker = outbound_handshaker_builder
127        .clone()
128        .with_broadcast_stream_maker(inbound_mkr)
129        .build();
130
131    let outbound_handshaker = outbound_handshaker_builder.build();
132
133    let (new_connection_tx, new_connection_rx) = mpsc::channel(
134        config
135            .outbound_connections
136            .checked_add(config.max_inbound_connections)
137            .unwrap(),
138    );
139    let (make_connection_tx, make_connection_rx) = mpsc::channel(3);
140
141    let outbound_connector = Connector::new(outbound_handshaker);
142    let outbound_connection_maintainer = connection_maintainer::OutboundConnectionKeeper::new(
143        config.clone(),
144        new_connection_tx.clone(),
145        make_connection_rx,
146        address_book.clone(),
147        outbound_connector,
148    );
149
150    let peer_set = PeerSet::new(new_connection_rx);
151
152    // Create semaphore for limiting inbound connections and monitoring
153    let inbound_semaphore = Arc::new(tokio::sync::Semaphore::new(config.max_inbound_connections));
154
155    let mut background_tasks = JoinSet::new();
156
157    background_tasks.spawn(
158        outbound_connection_maintainer
159            .run()
160            .instrument(Span::current()),
161    );
162
163    // Spawn inbound connection monitor task
164    background_tasks.spawn(
165        inbound_connection_monitor(
166            Arc::clone(&inbound_semaphore),
167            config.max_inbound_connections,
168            config.p2p_port,
169        )
170        .instrument(tracing::info_span!("inbound_connection_monitor")),
171    );
172
173    background_tasks.spawn(
174        inbound_server::inbound_server(
175            new_connection_tx,
176            inbound_handshaker,
177            address_book.clone(),
178            config,
179            transport_config.server_config,
180            inbound_semaphore,
181        )
182        .map(|res| {
183            if let Err(e) = res {
184                tracing::error!("Error in inbound connection listener: {e}");
185            }
186
187            tracing::info!("Inbound connection listener shutdown");
188        })
189        .instrument(Span::current()),
190    );
191
192    Ok(NetworkInterface {
193        peer_set: Buffer::new(peer_set, 10).boxed_clone(),
194        broadcast_svc,
195        make_connection_tx,
196        address_book: address_book.boxed_clone(),
197        _background_tasks: Arc::new(background_tasks),
198    })
199}
200
/// The interface to Monero's P2P network on a certain [`NetworkZone`].
///
/// Cloning is supported: every field is either a cloneable service/sender or an [`Arc`], so
/// clones share the same background tasks.
#[derive(Clone)]
pub struct NetworkInterface<N: NetworkZone> {
    /// A pool of free connected peers.
    peer_set: BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError>,
    /// A [`Service`] that allows broadcasting to all connected peers.
    broadcast_svc: BroadcastSvc<N>,
    /// A channel to request extra connections.
    #[expect(dead_code, reason = "will be used eventually")]
    make_connection_tx: mpsc::Sender<MakeConnectionRequest>,
    /// The address book service.
    address_book: BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError>,
    /// Background tasks, shared between all clones of this interface.
    ///
    /// The [`JoinSet`] sits behind an [`Arc`], so it is dropped (aborting its tasks) only when
    /// the last clone of this interface is dropped.
    _background_tasks: Arc<JoinSet<()>>,
}
216
217impl<N: NetworkZone> NetworkInterface<N> {
218    /// Returns a service which allows broadcasting messages to all the connected peers in a specific [`NetworkZone`].
219    pub fn broadcast_svc(&self) -> BroadcastSvc<N> {
220        self.broadcast_svc.clone()
221    }
222
223    /// Starts the block downloader and returns a stream that will yield sequentially downloaded blocks.
224    pub fn block_downloader<C>(
225        &self,
226        our_chain_service: C,
227        config: BlockDownloaderConfig,
228    ) -> BufferStream<BlockBatch>
229    where
230        C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>
231            + Send
232            + 'static,
233        C::Future: Send + 'static,
234    {
235        block_downloader::download_blocks(self.peer_set.clone(), our_chain_service, config)
236    }
237
238    /// Returns the address book service.
239    pub fn address_book(
240        &self,
241    ) -> BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError> {
242        self.address_book.clone()
243    }
244
245    /// Borrows the `PeerSet`, for access to connected peers.
246    pub fn peer_set(
247        &mut self,
248    ) -> &mut BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError> {
249        &mut self.peer_set
250    }
251}