1use std::{
5 future::{ready, Future, Ready},
6 pin::{pin, Pin},
7 task::{ready, Context, Poll},
8 time::Duration,
9};
10
11use bytes::Bytes;
12use futures::Stream;
13use rand::prelude::*;
14use rand_distr::Exp;
15use tokio::{
16 sync::{
17 broadcast::{self, error::TryRecvError},
18 watch,
19 },
20 time::{sleep_until, Instant, Sleep},
21};
22use tokio_stream::wrappers::WatchStream;
23use tower::Service;
24
25use cuprate_p2p_core::{
26 client::InternalPeerID, BroadcastMessage, ConnectionDirection, NetworkZone,
27};
28use cuprate_types::{BlockCompleteEntry, TransactionBlobs};
29use cuprate_wire::protocol::{NewFluffyBlock, NewTransactions};
30
31use crate::constants::{
32 DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND, DIFFUSION_FLUSH_AVERAGE_SECONDS_OUTBOUND,
33 MAX_TXS_IN_BROADCAST_CHANNEL, SOFT_TX_MESSAGE_SIZE_SIZE_LIMIT,
34};
35
36#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
38pub(crate) struct BroadcastConfig {
39 pub diffusion_flush_average_seconds_outbound: Duration,
41 pub diffusion_flush_average_seconds_inbound: Duration,
43}
44
45impl Default for BroadcastConfig {
46 fn default() -> Self {
47 Self {
48 diffusion_flush_average_seconds_inbound: DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND,
49 diffusion_flush_average_seconds_outbound: DIFFUSION_FLUSH_AVERAGE_SECONDS_OUTBOUND,
50 }
51 }
52}
53
54#[expect(clippy::type_complexity)]
61pub(crate) fn init_broadcast_channels<N: NetworkZone>(
62 config: BroadcastConfig,
63) -> (
64 BroadcastSvc<N>,
65 impl Fn(InternalPeerID<N::Addr>) -> BroadcastMessageStream<N> + Clone + Send + 'static,
66 impl Fn(InternalPeerID<N::Addr>) -> BroadcastMessageStream<N> + Clone + Send + 'static,
67) {
68 let outbound_dist = Exp::new(
69 1.0 / config
70 .diffusion_flush_average_seconds_outbound
71 .as_secs_f64(),
72 )
73 .unwrap();
74 let inbound_dist =
75 Exp::new(1.0 / config.diffusion_flush_average_seconds_inbound.as_secs_f64()).unwrap();
76
77 let (block_watch_sender, block_watch_receiver) = watch::channel(NewBlockInfo {
80 block_bytes: Default::default(),
81 current_blockchain_height: 0,
82 });
83
84 let (tx_broadcast_channel_outbound_sender, tx_broadcast_channel_outbound_receiver) =
86 broadcast::channel(MAX_TXS_IN_BROADCAST_CHANNEL);
87 let (tx_broadcast_channel_inbound_sender, tx_broadcast_channel_inbound_receiver) =
88 broadcast::channel(MAX_TXS_IN_BROADCAST_CHANNEL);
89
90 let broadcast_svc = BroadcastSvc {
92 new_block_watch: block_watch_sender,
93 tx_broadcast_channel_outbound: tx_broadcast_channel_outbound_sender,
94 tx_broadcast_channel_inbound: tx_broadcast_channel_inbound_sender,
95 };
96
97 let tx_channel_outbound_receiver_wrapped =
99 CloneableBroadcastReceiver(tx_broadcast_channel_outbound_receiver);
100 let tx_channel_inbound_receiver_wrapped =
101 CloneableBroadcastReceiver(tx_broadcast_channel_inbound_receiver);
102
103 let block_watch_receiver_cloned = block_watch_receiver.clone();
106 let outbound_stream_maker = move |addr| {
107 BroadcastMessageStream::new(
108 addr,
109 outbound_dist,
110 block_watch_receiver_cloned.clone(),
111 tx_channel_outbound_receiver_wrapped.clone().0,
112 )
113 };
114
115 let inbound_stream_maker = move |addr| {
116 BroadcastMessageStream::new(
117 addr,
118 inbound_dist,
119 block_watch_receiver.clone(),
120 tx_channel_inbound_receiver_wrapped.clone().0,
121 )
122 };
123
124 (broadcast_svc, outbound_stream_maker, inbound_stream_maker)
125}
126
127pub enum BroadcastRequest<N: NetworkZone> {
135 Block {
137 block_bytes: Bytes,
139 current_blockchain_height: u64,
141 },
142 Transaction {
146 tx_bytes: Bytes,
148 direction: Option<ConnectionDirection>,
150 received_from: Option<InternalPeerID<N::Addr>>,
152 },
153}
154
155#[derive(Clone)]
156pub struct BroadcastSvc<N: NetworkZone> {
157 new_block_watch: watch::Sender<NewBlockInfo>,
158 tx_broadcast_channel_outbound: broadcast::Sender<BroadcastTxInfo<N>>,
159 tx_broadcast_channel_inbound: broadcast::Sender<BroadcastTxInfo<N>>,
160}
161
162impl<N: NetworkZone> Service<BroadcastRequest<N>> for BroadcastSvc<N> {
163 type Response = ();
164 type Error = std::convert::Infallible;
165 type Future = Ready<Result<(), std::convert::Infallible>>;
166
167 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
168 Poll::Ready(Ok(()))
169 }
170
171 fn call(&mut self, req: BroadcastRequest<N>) -> Self::Future {
172 match req {
173 BroadcastRequest::Block {
174 block_bytes,
175 current_blockchain_height,
176 } => {
177 tracing::debug!(
178 "queuing block at chain height {current_blockchain_height} for broadcast"
179 );
180
181 self.new_block_watch.send_replace(NewBlockInfo {
182 block_bytes,
183 current_blockchain_height,
184 });
185 }
186 BroadcastRequest::Transaction {
187 tx_bytes,
188 received_from,
189 direction,
190 } => {
191 let nex_tx_info = BroadcastTxInfo {
192 tx: tx_bytes,
193 received_from,
194 };
195
196 drop(match direction {
198 Some(ConnectionDirection::Inbound) => {
199 self.tx_broadcast_channel_inbound.send(nex_tx_info)
200 }
201 Some(ConnectionDirection::Outbound) => {
202 self.tx_broadcast_channel_outbound.send(nex_tx_info)
203 }
204 None => {
205 drop(self.tx_broadcast_channel_outbound.send(nex_tx_info.clone()));
206 self.tx_broadcast_channel_inbound.send(nex_tx_info)
207 }
208 });
209 }
210 }
211
212 ready(Ok(()))
213 }
214}
215
216struct CloneableBroadcastReceiver<T: Clone>(broadcast::Receiver<T>);
221
222impl<T: Clone> Clone for CloneableBroadcastReceiver<T> {
223 fn clone(&self) -> Self {
224 Self(self.0.resubscribe())
225 }
226}
227
228#[derive(Clone)]
230struct NewBlockInfo {
231 block_bytes: Bytes,
233 current_blockchain_height: u64,
235}
236
237#[derive(Clone)]
239struct BroadcastTxInfo<N: NetworkZone> {
240 tx: Bytes,
242 received_from: Option<InternalPeerID<N::Addr>>,
244}
245
246#[pin_project::pin_project]
250pub(crate) struct BroadcastMessageStream<N: NetworkZone> {
251 addr: InternalPeerID<N::Addr>,
253
254 #[pin]
256 new_block_watch: WatchStream<NewBlockInfo>,
257 tx_broadcast_channel: broadcast::Receiver<BroadcastTxInfo<N>>,
259
260 diffusion_flush_dist: Exp<f64>,
263 #[pin]
265 next_flush: Sleep,
266}
267
268impl<N: NetworkZone> BroadcastMessageStream<N> {
269 fn new(
271 addr: InternalPeerID<N::Addr>,
272 diffusion_flush_dist: Exp<f64>,
273 new_block_watch: watch::Receiver<NewBlockInfo>,
274 tx_broadcast_channel: broadcast::Receiver<BroadcastTxInfo<N>>,
275 ) -> Self {
276 let next_flush = Instant::now()
277 + Duration::from_secs_f64(diffusion_flush_dist.sample(&mut thread_rng()));
278
279 Self {
280 addr,
281 new_block_watch: WatchStream::from_changes(new_block_watch),
283 tx_broadcast_channel,
284 diffusion_flush_dist,
285 next_flush: sleep_until(next_flush),
286 }
287 }
288}
289
290impl<N: NetworkZone> Stream for BroadcastMessageStream<N> {
291 type Item = BroadcastMessage;
292
293 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
294 let mut this = self.project();
295
296 if let Poll::Ready(res) = this.new_block_watch.poll_next(cx) {
298 let Some(block) = res else {
299 return Poll::Ready(None);
300 };
301
302 let block_mes = NewFluffyBlock {
303 b: BlockCompleteEntry {
304 pruned: false,
305 block: block.block_bytes,
306 block_weight: 0,
308 txs: TransactionBlobs::None,
309 },
310 current_blockchain_height: block.current_blockchain_height,
311 };
312
313 return Poll::Ready(Some(BroadcastMessage::NewFluffyBlock(block_mes)));
314 }
315
316 ready!(this.next_flush.as_mut().poll(cx));
317
318 let (txs, more_available) = get_txs_to_broadcast::<N>(this.addr, this.tx_broadcast_channel);
319
320 let next_flush = if more_available {
321 Instant::now()
323 } else {
324 Instant::now()
325 + Duration::from_secs_f64(this.diffusion_flush_dist.sample(&mut thread_rng()))
326 };
327
328 let next_flush = sleep_until(next_flush);
329 this.next_flush.set(next_flush);
330
331 if let Some(txs) = txs {
332 tracing::debug!(
333 "Diffusion flush timer expired, diffusing {} txs",
334 txs.txs.len()
335 );
336 Poll::Ready(Some(BroadcastMessage::NewTransactions(txs)))
338 } else {
339 tracing::trace!("Diffusion flush timer expired but no txs to diffuse");
340 #[expect(clippy::let_underscore_must_use)]
343 let _ = this.next_flush.poll(cx);
344 Poll::Pending
345 }
346 }
347}
348
349fn get_txs_to_broadcast<N: NetworkZone>(
352 addr: &InternalPeerID<N::Addr>,
353 broadcast_rx: &mut broadcast::Receiver<BroadcastTxInfo<N>>,
354) -> (Option<NewTransactions>, bool) {
355 let mut new_txs = NewTransactions {
356 txs: vec![],
357 dandelionpp_fluff: true,
358 padding: Bytes::new(),
359 };
360 let mut total_size = 0;
361
362 loop {
363 match broadcast_rx.try_recv() {
364 Ok(txs) => {
365 if txs.received_from.is_some_and(|from| &from == addr) {
366 continue;
368 }
369
370 total_size += txs.tx.len();
371
372 new_txs.txs.push(txs.tx);
373
374 if total_size > SOFT_TX_MESSAGE_SIZE_SIZE_LIMIT {
375 return (Some(new_txs), true);
376 }
377 }
378 Err(e) => match e {
379 TryRecvError::Empty | TryRecvError::Closed => {
380 if new_txs.txs.is_empty() {
381 return (None, false);
382 }
383 return (Some(new_txs), false);
384 }
385 TryRecvError::Lagged(lag) => {
386 tracing::debug!(
387 "{lag} transaction broadcast messages were missed, continuing."
388 );
389 continue;
390 }
391 },
392 }
393 }
394}
395
396#[cfg(test)]
397mod tests {
398 use std::{pin::pin, time::Duration};
399
400 use bytes::Bytes;
401 use futures::StreamExt;
402 use tokio::time::timeout;
403 use tower::{Service, ServiceExt};
404
405 use cuprate_p2p_core::{client::InternalPeerID, BroadcastMessage, ConnectionDirection};
406 use cuprate_test_utils::test_netzone::TestNetZone;
407
408 use super::{init_broadcast_channels, BroadcastConfig, BroadcastRequest};
409
410 const TEST_CONFIG: BroadcastConfig = BroadcastConfig {
411 diffusion_flush_average_seconds_outbound: Duration::from_millis(100),
412 diffusion_flush_average_seconds_inbound: Duration::from_millis(200),
413 };
414
415 #[tokio::test]
416 async fn tx_broadcast_direction_correct() {
417 let (mut brcst, outbound_mkr, inbound_mkr) =
418 init_broadcast_channels::<TestNetZone<true>>(TEST_CONFIG);
419
420 let mut outbound_stream = pin!(outbound_mkr(InternalPeerID::Unknown(1)));
421 let mut inbound_stream = pin!(inbound_mkr(InternalPeerID::Unknown(1)));
422
423 brcst
426 .ready()
427 .await
428 .unwrap()
429 .call(BroadcastRequest::Transaction {
430 tx_bytes: Bytes::from_static(&[1]),
431 direction: Some(ConnectionDirection::Outbound),
432 received_from: None,
433 })
434 .await
435 .unwrap();
436
437 brcst
438 .ready()
439 .await
440 .unwrap()
441 .call(BroadcastRequest::Transaction {
442 tx_bytes: Bytes::from_static(&[2]),
443 direction: Some(ConnectionDirection::Inbound),
444 received_from: None,
445 })
446 .await
447 .unwrap();
448
449 brcst
450 .ready()
451 .await
452 .unwrap()
453 .call(BroadcastRequest::Transaction {
454 tx_bytes: Bytes::from_static(&[3]),
455 direction: None,
456 received_from: None,
457 })
458 .await
459 .unwrap();
460
461 let match_tx = |mes, txs| match mes {
462 BroadcastMessage::NewTransactions(tx) => assert_eq!(tx.txs.as_slice(), txs),
463 BroadcastMessage::NewFluffyBlock(_) => panic!("Block broadcast?"),
464 };
465
466 let next = outbound_stream.next().await.unwrap();
467 let txs = [Bytes::from_static(&[1]), Bytes::from_static(&[3])];
468 match_tx(next, &txs);
469
470 let next = inbound_stream.next().await.unwrap();
471 match_tx(next, &[Bytes::from_static(&[2]), Bytes::from_static(&[3])]);
472 }
473
474 #[tokio::test]
475 async fn block_broadcast_sent_to_all() {
476 let (mut brcst, outbound_mkr, inbound_mkr) =
477 init_broadcast_channels::<TestNetZone<true>>(TEST_CONFIG);
478
479 let mut outbound_stream = pin!(outbound_mkr(InternalPeerID::Unknown(1)));
480 let mut inbound_stream = pin!(inbound_mkr(InternalPeerID::Unknown(1)));
481
482 brcst
483 .ready()
484 .await
485 .unwrap()
486 .call(BroadcastRequest::Block {
487 block_bytes: Default::default(),
488 current_blockchain_height: 0,
489 })
490 .await
491 .unwrap();
492
493 let next = outbound_stream.next().await.unwrap();
494 assert!(matches!(next, BroadcastMessage::NewFluffyBlock(_)));
495
496 let next = inbound_stream.next().await.unwrap();
497 assert!(matches!(next, BroadcastMessage::NewFluffyBlock(_)));
498 }
499
500 #[tokio::test]
501 async fn tx_broadcast_skipped_for_received_from_peer() {
502 let (mut brcst, outbound_mkr, inbound_mkr) =
503 init_broadcast_channels::<TestNetZone<true>>(TEST_CONFIG);
504
505 let mut outbound_stream = pin!(outbound_mkr(InternalPeerID::Unknown(1)));
506 let mut outbound_stream_from = pin!(outbound_mkr(InternalPeerID::Unknown(0)));
507
508 let mut inbound_stream = pin!(inbound_mkr(InternalPeerID::Unknown(1)));
509 let mut inbound_stream_from = pin!(inbound_mkr(InternalPeerID::Unknown(0)));
510
511 brcst
512 .ready()
513 .await
514 .unwrap()
515 .call(BroadcastRequest::Transaction {
516 tx_bytes: Bytes::from_static(&[1]),
517 direction: None,
518 received_from: Some(InternalPeerID::Unknown(0)),
519 })
520 .await
521 .unwrap();
522
523 let match_tx = |mes, txs| match mes {
524 BroadcastMessage::NewTransactions(tx) => assert_eq!(tx.txs.as_slice(), txs),
525 BroadcastMessage::NewFluffyBlock(_) => panic!("Block broadcast?"),
526 };
527
528 let next = outbound_stream.next().await.unwrap();
529 let txs = [Bytes::from_static(&[1])];
530 match_tx(next, &txs);
531
532 let next = inbound_stream.next().await.unwrap();
533 match_tx(next, &[Bytes::from_static(&[1])]);
534
535 assert!(timeout(
537 Duration::from_secs(2),
538 futures::future::select(inbound_stream_from.next(), outbound_stream_from.next())
539 )
540 .await
541 .is_err());
542 }
543}