cuprate_p2p/
lib.rs

1//! Cuprate's P2P Crate.
2//!
3//! This crate contains a [`NetworkInterface`] which allows interacting with the Monero P2P network on
4//! a certain [`NetworkZone`]
5use std::sync::Arc;
6
7use futures::FutureExt;
8use tokio::{sync::mpsc, task::JoinSet};
9use tower::{buffer::Buffer, util::BoxCloneService, Service, ServiceExt};
10use tracing::{instrument, Instrument, Span};
11
12use cuprate_async_buffer::BufferStream;
13use cuprate_p2p_core::{
14    client::Connector,
15    services::{AddressBookRequest, AddressBookResponse},
16    CoreSyncSvc, NetworkZone, ProtocolRequestHandlerMaker, Transport,
17};
18
19pub mod block_downloader;
20mod broadcast;
21pub mod config;
22pub mod connection_maintainer;
23pub mod constants;
24mod inbound_server;
25mod peer_set;
26
27use block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse};
28pub use broadcast::{BroadcastRequest, BroadcastSvc};
29pub use config::{AddressBookConfig, P2PConfig, TransportConfig};
30use connection_maintainer::MakeConnectionRequest;
31use peer_set::PeerSet;
32pub use peer_set::{ClientDropGuard, PeerSetRequest, PeerSetResponse};
33
34/// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`].
35///
36/// This function starts all the tasks to maintain/accept/make connections.
37///
38/// # Usage
39/// You must provide:
40/// - A protocol request handler, which is given to each connection
41/// - A core sync service, which keeps track of the sync state of our node
42#[instrument(level = "debug", name = "net", skip_all, fields(zone = Z::NAME))]
43pub async fn initialize_network<Z, T, PR, CS>(
44    protocol_request_handler_maker: PR,
45    core_sync_svc: CS,
46    config: P2PConfig<Z>,
47    transport_config: TransportConfig<Z, T>,
48) -> Result<NetworkInterface<Z>, tower::BoxError>
49where
50    Z: NetworkZone,
51    T: Transport<Z>,
52    Z::Addr: borsh::BorshDeserialize + borsh::BorshSerialize,
53    PR: ProtocolRequestHandlerMaker<Z> + Clone,
54    CS: CoreSyncSvc + Clone,
55{
56    let address_book =
57        cuprate_address_book::init_address_book(config.address_book_config.clone()).await?;
58    let address_book = Buffer::new(
59        address_book,
60        config
61            .max_inbound_connections
62            .checked_add(config.outbound_connections)
63            .unwrap(),
64    );
65
66    // Use the default config. Changing the defaults affects tx fluff times, which could affect D++ so for now don't allow changing
67    // this.
68    let (broadcast_svc, outbound_mkr, inbound_mkr) =
69        broadcast::init_broadcast_channels(broadcast::BroadcastConfig::default());
70
71    let mut basic_node_data = config.basic_node_data();
72
73    if !Z::CHECK_NODE_ID {
74        basic_node_data.peer_id = 1;
75    }
76
77    let outbound_handshaker_builder =
78        cuprate_p2p_core::client::HandshakerBuilder::<Z, T, _, _, _, _>::new(
79            basic_node_data,
80            transport_config.client_config,
81        )
82        .with_address_book(address_book.clone())
83        .with_core_sync_svc(core_sync_svc)
84        .with_protocol_request_handler_maker(protocol_request_handler_maker)
85        .with_broadcast_stream_maker(outbound_mkr)
86        .with_connection_parent_span(Span::current());
87
88    let inbound_handshaker = outbound_handshaker_builder
89        .clone()
90        .with_broadcast_stream_maker(inbound_mkr)
91        .build();
92
93    let outbound_handshaker = outbound_handshaker_builder.build();
94
95    let (new_connection_tx, new_connection_rx) = mpsc::channel(
96        config
97            .outbound_connections
98            .checked_add(config.max_inbound_connections)
99            .unwrap(),
100    );
101    let (make_connection_tx, make_connection_rx) = mpsc::channel(3);
102
103    let outbound_connector = Connector::new(outbound_handshaker);
104    let outbound_connection_maintainer = connection_maintainer::OutboundConnectionKeeper::new(
105        config.clone(),
106        new_connection_tx.clone(),
107        make_connection_rx,
108        address_book.clone(),
109        outbound_connector,
110    );
111
112    let peer_set = PeerSet::new(new_connection_rx);
113
114    let mut background_tasks = JoinSet::new();
115
116    background_tasks.spawn(
117        outbound_connection_maintainer
118            .run()
119            .instrument(Span::current()),
120    );
121    background_tasks.spawn(
122        inbound_server::inbound_server(
123            new_connection_tx,
124            inbound_handshaker,
125            address_book.clone(),
126            config,
127            transport_config.server_config,
128        )
129        .map(|res| {
130            if let Err(e) = res {
131                tracing::error!("Error in inbound connection listener: {e}");
132            }
133
134            tracing::info!("Inbound connection listener shutdown");
135        })
136        .instrument(Span::current()),
137    );
138
139    Ok(NetworkInterface {
140        peer_set: Buffer::new(peer_set, 10).boxed_clone(),
141        broadcast_svc,
142        make_connection_tx,
143        address_book: address_book.boxed_clone(),
144        _background_tasks: Arc::new(background_tasks),
145    })
146}
147
148/// The interface to Monero's P2P network on a certain [`NetworkZone`].
149#[derive(Clone)]
150pub struct NetworkInterface<N: NetworkZone> {
151    /// A pool of free connected peers.
152    peer_set: BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError>,
153    /// A [`Service`] that allows broadcasting to all connected peers.
154    broadcast_svc: BroadcastSvc<N>,
155    /// A channel to request extra connections.
156    #[expect(dead_code, reason = "will be used eventually")]
157    make_connection_tx: mpsc::Sender<MakeConnectionRequest>,
158    /// The address book service.
159    address_book: BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError>,
160    /// Background tasks that will be aborted when this interface is dropped.
161    _background_tasks: Arc<JoinSet<()>>,
162}
163
164impl<N: NetworkZone> NetworkInterface<N> {
165    /// Returns a service which allows broadcasting messages to all the connected peers in a specific [`NetworkZone`].
166    pub fn broadcast_svc(&self) -> BroadcastSvc<N> {
167        self.broadcast_svc.clone()
168    }
169
170    /// Starts the block downloader and returns a stream that will yield sequentially downloaded blocks.
171    pub fn block_downloader<C>(
172        &self,
173        our_chain_service: C,
174        config: BlockDownloaderConfig,
175    ) -> BufferStream<BlockBatch>
176    where
177        C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>
178            + Send
179            + 'static,
180        C::Future: Send + 'static,
181    {
182        block_downloader::download_blocks(self.peer_set.clone(), our_chain_service, config)
183    }
184
185    /// Returns the address book service.
186    pub fn address_book(
187        &self,
188    ) -> BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError> {
189        self.address_book.clone()
190    }
191
192    /// Borrows the `PeerSet`, for access to connected peers.
193    pub fn peer_set(
194        &mut self,
195    ) -> &mut BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError> {
196        &mut self.peer_set
197    }
198}