cuprated/txpool/
dandelion.rs
1use std::{
2 task::{ready, Poll},
3 time::Duration,
4};
5
6use futures::{future::BoxFuture, FutureExt, TryFutureExt};
7use tower::{Service, ServiceExt};
8
9use tokio::sync::mpsc;
10use tokio_util::sync::PollSender;
11
12use cuprate_dandelion_tower::{
13 pool::DandelionPoolService, traits::StemRequest, DandelionConfig, DandelionRouteReq,
14 DandelionRouter, DandelionRouterError, Graph, State, TxState,
15};
16use cuprate_p2p::NetworkInterface;
17use cuprate_p2p_core::{client::InternalPeerID, ClearNet, NetworkZone, Tor};
18use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
19
20use crate::{
21 p2p::CrossNetworkInternalPeerId,
22 txpool::incoming_tx::{DandelionTx, TxId},
23};
24
25mod anon_net_service;
26mod diffuse_service;
27mod stem_service;
28mod tx_store;
29
30pub use anon_net_service::AnonTxService;
31pub use diffuse_service::DiffuseService;
32
33const DANDELION_CONFIG: DandelionConfig = DandelionConfig {
37 time_between_hop: Duration::from_millis(175),
38 epoch_duration: Duration::from_secs(10 * 60),
39 fluff_probability: 0.12,
40 graph: Graph::FourRegular,
41};
42
/// The [`DandelionRouter`] with every generic parameter filled in for network zone `Z`.
///
/// Only the zone stays generic; the peer stream, diffuse service, peer ID, stem
/// service and transaction type are all fixed to the concrete types used by this module.
pub(super) type ConcreteDandelionRouter<Z> = DandelionRouter<
    stem_service::OutboundPeerStream<Z>,
    DiffuseService<Z>,
    CrossNetworkInternalPeerId,
    stem_service::StemPeerService<Z>,
    DandelionTx,
>;
51
/// The top-level dandelion router: a clearnet router paired with an optional Tor service.
///
/// Its [`Service`] implementation tries to send [`TxState::Local`] transactions to a Tor
/// peer first and hands every other request to the clearnet router.
pub(super) struct MainDandelionRouter {
    /// Router for the [`ClearNet`] zone; also the fallback when Tor cannot be used.
    clearnet_router: ConcreteDandelionRouter<ClearNet>,
    /// Service for sending transactions over [`Tor`]; `None` when no Tor service was supplied.
    tor_router: Option<AnonTxService<Tor>>,
}
57
impl MainDandelionRouter {
    /// Creates a [`MainDandelionRouter`] from a clearnet router and an optional Tor service.
    ///
    /// Passing `None` for `tor_router` makes every request go through `clearnet_router`.
    pub const fn new(
        clearnet_router: ConcreteDandelionRouter<ClearNet>,
        tor_router: Option<AnonTxService<Tor>>,
    ) -> Self {
        Self {
            clearnet_router,
            tor_router,
        }
    }
}
69
70impl Service<DandelionRouteReq<DandelionTx, CrossNetworkInternalPeerId>> for MainDandelionRouter {
71 type Response = State;
72 type Error = DandelionRouterError;
73 type Future = BoxFuture<'static, Result<State, DandelionRouterError>>;
74
75 fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
76 if let Some(tor_router) = self.tor_router.as_mut() {
77 ready!(tor_router.poll_ready(cx))?;
78 }
79
80 self.clearnet_router.poll_ready(cx)
81 }
82
83 fn call(
84 &mut self,
85 req: DandelionRouteReq<DandelionTx, CrossNetworkInternalPeerId>,
86 ) -> Self::Future {
87 if req.state == TxState::Local {
89 if let Some(tor_router) = self.tor_router.as_mut() {
90 if let Some(mut peer) = tor_router.peer.take() {
91 tracing::debug!("routing tx over Tor");
92 return peer
93 .call(StemRequest(req.tx))
94 .map_ok(|_| State::Stem)
95 .map_err(DandelionRouterError::PeerError)
96 .boxed();
97 }
98
99 tracing::warn!(
100 "failed to route tx over Tor, no connections, falling back to Clearnet"
101 );
102 }
103 }
104
105 self.clearnet_router.call(req)
106 }
107}
108
109pub fn start_dandelion_pool_manager(
111 router: MainDandelionRouter,
112 txpool_read_handle: TxpoolReadHandle,
113 promote_tx: mpsc::UnboundedSender<[u8; 32]>,
114) -> DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId> {
115 cuprate_dandelion_tower::pool::start_dandelion_pool_manager(
116 32,
118 router,
119 tx_store::TxStoreService {
120 txpool_read_handle,
121 promote_tx,
122 },
123 DANDELION_CONFIG,
124 )
125}
126
127pub fn dandelion_router<Z: NetworkZone>(
129 network_interface: NetworkInterface<Z>,
130) -> ConcreteDandelionRouter<Z>
131where
132 InternalPeerID<Z::Addr>: Into<CrossNetworkInternalPeerId>,
133{
134 DandelionRouter::new(
135 DiffuseService {
136 clear_net_broadcast_service: network_interface.broadcast_svc(),
137 },
138 stem_service::OutboundPeerStream::<Z>::new(network_interface),
139 DANDELION_CONFIG,
140 )
141}