cuprate_p2p/
broadcast.rs

//! # Broadcast Router
//!
//! This module handles broadcasting messages to multiple peers with the [`BroadcastSvc`].
4use std::{
5    future::{ready, Future, Ready},
6    pin::{pin, Pin},
7    task::{ready, Context, Poll},
8    time::Duration,
9};
10
11use bytes::Bytes;
12use futures::Stream;
13use rand::prelude::*;
14use rand_distr::Exp;
15use tokio::{
16    sync::{
17        broadcast::{self, error::TryRecvError},
18        watch,
19    },
20    time::{sleep_until, Instant, Sleep},
21};
22use tokio_stream::wrappers::WatchStream;
23use tower::Service;
24
25use cuprate_p2p_core::{
26    client::InternalPeerID, BroadcastMessage, ConnectionDirection, NetworkZone,
27};
28use cuprate_types::{BlockCompleteEntry, TransactionBlobs};
29use cuprate_wire::protocol::{NewFluffyBlock, NewTransactions};
30
31use crate::constants::{
32    DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND, DIFFUSION_FLUSH_AVERAGE_SECONDS_OUTBOUND,
33    MAX_TXS_IN_BROADCAST_CHANNEL, SOFT_TX_MESSAGE_SIZE_SIZE_LIMIT,
34};
35
/// Tunable parameters for the [`BroadcastSvc`] and the streams it feeds.
///
/// Each field is the average delay between two transaction diffusion flushes
/// for peers of the given connection direction.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct BroadcastConfig {
    /// Average delay between diffusion flushes on outbound connections.
    pub diffusion_flush_average_seconds_outbound: Duration,
    /// Average delay between diffusion flushes on inbound connections.
    pub diffusion_flush_average_seconds_inbound: Duration,
}
44
45impl Default for BroadcastConfig {
46    fn default() -> Self {
47        Self {
48            diffusion_flush_average_seconds_inbound: DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND,
49            diffusion_flush_average_seconds_outbound: DIFFUSION_FLUSH_AVERAGE_SECONDS_OUTBOUND,
50        }
51    }
52}
53
54/// Initialise the [`BroadcastSvc`] and the functions to produce [`BroadcastMessageStream`]s.
55///
56/// This function will return in order:
57/// - The [`BroadcastSvc`]
58/// - A function that takes in [`InternalPeerID`]s and produces [`BroadcastMessageStream`]s to give to **outbound** peers.
59/// - A function that takes in [`InternalPeerID`]s and produces [`BroadcastMessageStream`]s to give to **inbound** peers.
60#[expect(clippy::type_complexity)]
61pub(crate) fn init_broadcast_channels<N: NetworkZone>(
62    config: BroadcastConfig,
63) -> (
64    BroadcastSvc<N>,
65    impl Fn(InternalPeerID<N::Addr>) -> BroadcastMessageStream<N> + Clone + Send + 'static,
66    impl Fn(InternalPeerID<N::Addr>) -> BroadcastMessageStream<N> + Clone + Send + 'static,
67) {
68    let outbound_dist = Exp::new(
69        1.0 / config
70            .diffusion_flush_average_seconds_outbound
71            .as_secs_f64(),
72    )
73    .unwrap();
74    let inbound_dist =
75        Exp::new(1.0 / config.diffusion_flush_average_seconds_inbound.as_secs_f64()).unwrap();
76
77    // Set a default value for init - the broadcast streams given to the peer tasks will only broadcast from this channel when the value
78    // changes so no peer will get sent this.
79    let (block_watch_sender, block_watch_receiver) = watch::channel(NewBlockInfo {
80        block_bytes: Default::default(),
81        current_blockchain_height: 0,
82    });
83
84    // create the inbound/outbound broadcast channels.
85    let (tx_broadcast_channel_outbound_sender, tx_broadcast_channel_outbound_receiver) =
86        broadcast::channel(MAX_TXS_IN_BROADCAST_CHANNEL);
87    let (tx_broadcast_channel_inbound_sender, tx_broadcast_channel_inbound_receiver) =
88        broadcast::channel(MAX_TXS_IN_BROADCAST_CHANNEL);
89
90    // create the broadcast service.
91    let broadcast_svc = BroadcastSvc {
92        new_block_watch: block_watch_sender,
93        tx_broadcast_channel_outbound: tx_broadcast_channel_outbound_sender,
94        tx_broadcast_channel_inbound: tx_broadcast_channel_inbound_sender,
95    };
96
97    // wrap the tx broadcast channels in a wrapper that impls Clone so the closures later on impl clone.
98    let tx_channel_outbound_receiver_wrapped =
99        CloneableBroadcastReceiver(tx_broadcast_channel_outbound_receiver);
100    let tx_channel_inbound_receiver_wrapped =
101        CloneableBroadcastReceiver(tx_broadcast_channel_inbound_receiver);
102
103    // Create the closures that will be used to start the broadcast streams that the connection task will hold to listen
104    // for messages to broadcast.
105    let block_watch_receiver_cloned = block_watch_receiver.clone();
106    let outbound_stream_maker = move |addr| {
107        BroadcastMessageStream::new(
108            addr,
109            outbound_dist,
110            block_watch_receiver_cloned.clone(),
111            tx_channel_outbound_receiver_wrapped.clone().0,
112        )
113    };
114
115    let inbound_stream_maker = move |addr| {
116        BroadcastMessageStream::new(
117            addr,
118            inbound_dist,
119            block_watch_receiver.clone(),
120            tx_channel_inbound_receiver_wrapped.clone().0,
121        )
122    };
123
124    (broadcast_svc, outbound_stream_maker, inbound_stream_maker)
125}
126
127/// A request to broadcast some data to all connected peers or a sub-set like all inbound or all outbound.
128///
129/// Only certain P2P messages are supported here: [`NewFluffyBlock`] and [`NewTransactions`]. These are the only
130/// P2P messages that make sense to broadcast to multiple peers.
131///
132/// [`NewBlock`](cuprate_wire::protocol::NewBlock) has been excluded as monerod has had fluffy blocks for a while and
133/// Cuprate sets fluffy blocks as a requirement during handshakes.
134pub enum BroadcastRequest<N: NetworkZone> {
135    /// Broadcast a block to the network. The block will be broadcast as a fluffy block to all peers.
136    Block {
137        /// The block.
138        block_bytes: Bytes,
139        /// The current chain height - will be 1 more than the blocks' height.
140        current_blockchain_height: u64,
141    },
142    /// Broadcast transactions to the network. If a [`ConnectionDirection`] is set the transaction
143    /// will only be broadcast to that sub-set of peers, if it is [`None`] then the transaction will
144    /// be broadcast to all peers.
145    Transaction {
146        /// The serialised tx to broadcast.
147        tx_bytes: Bytes,
148        /// The direction of peers to broadcast this tx to, if [`None`] it will be sent to all peers.
149        direction: Option<ConnectionDirection>,
150        /// The peer on this network that told us about the tx.
151        received_from: Option<InternalPeerID<N::Addr>>,
152    },
153}
154
155#[derive(Clone)]
156pub struct BroadcastSvc<N: NetworkZone> {
157    new_block_watch: watch::Sender<NewBlockInfo>,
158    tx_broadcast_channel_outbound: broadcast::Sender<BroadcastTxInfo<N>>,
159    tx_broadcast_channel_inbound: broadcast::Sender<BroadcastTxInfo<N>>,
160}
161
162impl<N: NetworkZone> Service<BroadcastRequest<N>> for BroadcastSvc<N> {
163    type Response = ();
164    type Error = std::convert::Infallible;
165    type Future = Ready<Result<(), std::convert::Infallible>>;
166
167    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
168        Poll::Ready(Ok(()))
169    }
170
171    fn call(&mut self, req: BroadcastRequest<N>) -> Self::Future {
172        match req {
173            BroadcastRequest::Block {
174                block_bytes,
175                current_blockchain_height,
176            } => {
177                tracing::debug!(
178                    "queuing block at chain height {current_blockchain_height} for broadcast"
179                );
180
181                self.new_block_watch.send_replace(NewBlockInfo {
182                    block_bytes,
183                    current_blockchain_height,
184                });
185            }
186            BroadcastRequest::Transaction {
187                tx_bytes,
188                received_from,
189                direction,
190            } => {
191                let nex_tx_info = BroadcastTxInfo {
192                    tx: tx_bytes,
193                    received_from,
194                };
195
196                // An error here means _all_ receivers were dropped which we assume will never happen.
197                drop(match direction {
198                    Some(ConnectionDirection::Inbound) => {
199                        self.tx_broadcast_channel_inbound.send(nex_tx_info)
200                    }
201                    Some(ConnectionDirection::Outbound) => {
202                        self.tx_broadcast_channel_outbound.send(nex_tx_info)
203                    }
204                    None => {
205                        drop(self.tx_broadcast_channel_outbound.send(nex_tx_info.clone()));
206                        self.tx_broadcast_channel_inbound.send(nex_tx_info)
207                    }
208                });
209            }
210        }
211
212        ready(Ok(()))
213    }
214}
215
216/// A wrapper type that impls [`Clone`] for [`broadcast::Receiver`].
217///
218/// The clone impl just calls [`Receiver::resubscribe`](broadcast::Receiver::resubscribe), which isn't _exactly_
219/// a clone but is what we need for our use case.
220struct CloneableBroadcastReceiver<T: Clone>(broadcast::Receiver<T>);
221
222impl<T: Clone> Clone for CloneableBroadcastReceiver<T> {
223    fn clone(&self) -> Self {
224        Self(self.0.resubscribe())
225    }
226}
227
228/// A new block to broadcast.
229#[derive(Clone)]
230struct NewBlockInfo {
231    /// The block.
232    block_bytes: Bytes,
233    /// The current chain height - will be 1 more than the blocks' height.
234    current_blockchain_height: u64,
235}
236
237/// A new transaction to broadcast.
238#[derive(Clone)]
239struct BroadcastTxInfo<N: NetworkZone> {
240    /// The tx.
241    tx: Bytes,
242    /// The peer that sent us this tx (if the peer is on this network).
243    received_from: Option<InternalPeerID<N::Addr>>,
244}
245
246/// A [`Stream`] that returns [`BroadcastMessage`] to broadcast to a peer.
247///
248/// This is given to the connection task to await on for broadcast messages.
249#[pin_project::pin_project]
250pub(crate) struct BroadcastMessageStream<N: NetworkZone> {
251    /// The peer that is holding this stream.
252    addr: InternalPeerID<N::Addr>,
253
254    /// The channel where new blocks are received.
255    #[pin]
256    new_block_watch: WatchStream<NewBlockInfo>,
257    /// The channel where txs to broadcast are received.
258    tx_broadcast_channel: broadcast::Receiver<BroadcastTxInfo<N>>,
259
260    /// The distribution to generate the wait time before the next transaction
261    /// diffusion flush.
262    diffusion_flush_dist: Exp<f64>,
263    /// A [`Sleep`] that will awake when it's time to broadcast txs.
264    #[pin]
265    next_flush: Sleep,
266}
267
268impl<N: NetworkZone> BroadcastMessageStream<N> {
269    /// Creates a new [`BroadcastMessageStream`]
270    fn new(
271        addr: InternalPeerID<N::Addr>,
272        diffusion_flush_dist: Exp<f64>,
273        new_block_watch: watch::Receiver<NewBlockInfo>,
274        tx_broadcast_channel: broadcast::Receiver<BroadcastTxInfo<N>>,
275    ) -> Self {
276        let next_flush = Instant::now()
277            + Duration::from_secs_f64(diffusion_flush_dist.sample(&mut thread_rng()));
278
279        Self {
280            addr,
281            // We don't want to broadcast the message currently in the queue.
282            new_block_watch: WatchStream::from_changes(new_block_watch),
283            tx_broadcast_channel,
284            diffusion_flush_dist,
285            next_flush: sleep_until(next_flush),
286        }
287    }
288}
289
290impl<N: NetworkZone> Stream for BroadcastMessageStream<N> {
291    type Item = BroadcastMessage;
292
293    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
294        let mut this = self.project();
295
296        // Prioritise blocks.
297        if let Poll::Ready(res) = this.new_block_watch.poll_next(cx) {
298            let Some(block) = res else {
299                return Poll::Ready(None);
300            };
301
302            let block_mes = NewFluffyBlock {
303                b: BlockCompleteEntry {
304                    pruned: false,
305                    block: block.block_bytes,
306                    // This is a full fluffy block these values do not need to be set.
307                    block_weight: 0,
308                    txs: TransactionBlobs::None,
309                },
310                current_blockchain_height: block.current_blockchain_height,
311            };
312
313            return Poll::Ready(Some(BroadcastMessage::NewFluffyBlock(block_mes)));
314        }
315
316        ready!(this.next_flush.as_mut().poll(cx));
317
318        let (txs, more_available) = get_txs_to_broadcast::<N>(this.addr, this.tx_broadcast_channel);
319
320        let next_flush = if more_available {
321            // If there are more txs to broadcast then set the next flush for now so we get woken up straight away.
322            Instant::now()
323        } else {
324            Instant::now()
325                + Duration::from_secs_f64(this.diffusion_flush_dist.sample(&mut thread_rng()))
326        };
327
328        let next_flush = sleep_until(next_flush);
329        this.next_flush.set(next_flush);
330
331        if let Some(txs) = txs {
332            tracing::debug!(
333                "Diffusion flush timer expired, diffusing {} txs",
334                txs.txs.len()
335            );
336            // no need to poll next_flush as we are ready now.
337            Poll::Ready(Some(BroadcastMessage::NewTransactions(txs)))
338        } else {
339            tracing::trace!("Diffusion flush timer expired but no txs to diffuse");
340            // poll next_flush now to register the waker with it.
341            // the waker will already be registered with the block broadcast channel.
342            #[expect(clippy::let_underscore_must_use)]
343            let _ = this.next_flush.poll(cx);
344            Poll::Pending
345        }
346    }
347}
348
349/// Returns a list of new transactions to broadcast and a [`bool`] for if there are more txs in the queue
350/// that won't fit in the current batch.
351fn get_txs_to_broadcast<N: NetworkZone>(
352    addr: &InternalPeerID<N::Addr>,
353    broadcast_rx: &mut broadcast::Receiver<BroadcastTxInfo<N>>,
354) -> (Option<NewTransactions>, bool) {
355    let mut new_txs = NewTransactions {
356        txs: vec![],
357        dandelionpp_fluff: true,
358        padding: Bytes::new(),
359    };
360    let mut total_size = 0;
361
362    loop {
363        match broadcast_rx.try_recv() {
364            Ok(txs) => {
365                if txs.received_from.is_some_and(|from| &from == addr) {
366                    // If we are the one that sent this tx don't broadcast it back to us.
367                    continue;
368                }
369
370                total_size += txs.tx.len();
371
372                new_txs.txs.push(txs.tx);
373
374                if total_size > SOFT_TX_MESSAGE_SIZE_SIZE_LIMIT {
375                    return (Some(new_txs), true);
376                }
377            }
378            Err(e) => match e {
379                TryRecvError::Empty | TryRecvError::Closed => {
380                    if new_txs.txs.is_empty() {
381                        return (None, false);
382                    }
383                    return (Some(new_txs), false);
384                }
385                TryRecvError::Lagged(lag) => {
386                    tracing::debug!(
387                        "{lag} transaction broadcast messages were missed, continuing."
388                    );
389                    continue;
390                }
391            },
392        }
393    }
394}
395
#[cfg(test)]
mod tests {
    use std::{pin::pin, time::Duration};

    use bytes::Bytes;
    use futures::StreamExt;
    use tokio::time::timeout;
    use tower::{Service, ServiceExt};

    use cuprate_p2p_core::{client::InternalPeerID, BroadcastMessage, ConnectionDirection};
    use cuprate_test_utils::test_netzone::TestNetZone;

    use super::{init_broadcast_channels, BroadcastConfig, BroadcastRequest, BroadcastSvc};

    /// The network zone every test runs on.
    type Zone = TestNetZone<true>;

    /// Short flush intervals so the tests do not wait long for a diffusion flush.
    const TEST_CONFIG: BroadcastConfig = BroadcastConfig {
        diffusion_flush_average_seconds_outbound: Duration::from_millis(100),
        diffusion_flush_average_seconds_inbound: Duration::from_millis(200),
    };

    /// Waits for the service to be ready and submits `request` to it.
    async fn broadcast(svc: &mut BroadcastSvc<Zone>, request: BroadcastRequest<Zone>) {
        svc.ready().await.unwrap().call(request).await.unwrap();
    }

    /// Asserts that `message` is a tx broadcast carrying exactly `expected`.
    fn assert_tx_message(message: BroadcastMessage, expected: &[Bytes]) {
        match message {
            BroadcastMessage::NewTransactions(new_txs) => {
                assert_eq!(new_txs.txs.as_slice(), expected);
            }
            BroadcastMessage::NewFluffyBlock(_) => panic!("Block broadcast?"),
        }
    }

    #[tokio::test]
    async fn tx_broadcast_direction_correct() {
        let (mut svc, outbound_mkr, inbound_mkr) = init_broadcast_channels::<Zone>(TEST_CONFIG);

        let mut outbound_stream = pin!(outbound_mkr(InternalPeerID::Unknown(1)));
        let mut inbound_stream = pin!(inbound_mkr(InternalPeerID::Unknown(1)));

        // Outbound should get 1 and 3, inbound should get 2 and 3.

        broadcast(
            &mut svc,
            BroadcastRequest::Transaction {
                tx_bytes: Bytes::from_static(&[1]),
                direction: Some(ConnectionDirection::Outbound),
                received_from: None,
            },
        )
        .await;

        broadcast(
            &mut svc,
            BroadcastRequest::Transaction {
                tx_bytes: Bytes::from_static(&[2]),
                direction: Some(ConnectionDirection::Inbound),
                received_from: None,
            },
        )
        .await;

        broadcast(
            &mut svc,
            BroadcastRequest::Transaction {
                tx_bytes: Bytes::from_static(&[3]),
                direction: None,
                received_from: None,
            },
        )
        .await;

        let outbound_message = outbound_stream.next().await.unwrap();
        assert_tx_message(
            outbound_message,
            &[Bytes::from_static(&[1]), Bytes::from_static(&[3])],
        );

        let inbound_message = inbound_stream.next().await.unwrap();
        assert_tx_message(
            inbound_message,
            &[Bytes::from_static(&[2]), Bytes::from_static(&[3])],
        );
    }

    #[tokio::test]
    async fn block_broadcast_sent_to_all() {
        let (mut svc, outbound_mkr, inbound_mkr) = init_broadcast_channels::<Zone>(TEST_CONFIG);

        let mut outbound_stream = pin!(outbound_mkr(InternalPeerID::Unknown(1)));
        let mut inbound_stream = pin!(inbound_mkr(InternalPeerID::Unknown(1)));

        broadcast(
            &mut svc,
            BroadcastRequest::Block {
                block_bytes: Default::default(),
                current_blockchain_height: 0,
            },
        )
        .await;

        let outbound_message = outbound_stream.next().await.unwrap();
        assert!(matches!(
            outbound_message,
            BroadcastMessage::NewFluffyBlock(_)
        ));

        let inbound_message = inbound_stream.next().await.unwrap();
        assert!(matches!(
            inbound_message,
            BroadcastMessage::NewFluffyBlock(_)
        ));
    }

    #[tokio::test]
    async fn tx_broadcast_skipped_for_received_from_peer() {
        let (mut svc, outbound_mkr, inbound_mkr) = init_broadcast_channels::<Zone>(TEST_CONFIG);

        let mut outbound_stream = pin!(outbound_mkr(InternalPeerID::Unknown(1)));
        let mut outbound_stream_from = pin!(outbound_mkr(InternalPeerID::Unknown(0)));

        let mut inbound_stream = pin!(inbound_mkr(InternalPeerID::Unknown(1)));
        let mut inbound_stream_from = pin!(inbound_mkr(InternalPeerID::Unknown(0)));

        broadcast(
            &mut svc,
            BroadcastRequest::Transaction {
                tx_bytes: Bytes::from_static(&[1]),
                direction: None,
                received_from: Some(InternalPeerID::Unknown(0)),
            },
        )
        .await;

        let outbound_message = outbound_stream.next().await.unwrap();
        assert_tx_message(outbound_message, &[Bytes::from_static(&[1])]);

        let inbound_message = inbound_stream.next().await.unwrap();
        assert_tx_message(inbound_message, &[Bytes::from_static(&[1])]);

        // Make sure the streams with the same id as the one we said sent the tx do not get the tx to broadcast.
        let either_sender_stream =
            futures::future::select(inbound_stream_from.next(), outbound_stream_from.next());
        assert!(timeout(Duration::from_secs(2), either_sender_stream)
            .await
            .is_err());
    }
}