cuprated/txpool/
dandelion.rs

1use std::{
2    task::{ready, Poll},
3    time::Duration,
4};
5
6use futures::{future::BoxFuture, FutureExt, TryFutureExt};
7use tower::{Service, ServiceExt};
8
9use tokio::sync::mpsc;
10use tokio_util::sync::PollSender;
11
12use cuprate_dandelion_tower::{
13    pool::DandelionPoolService, traits::StemRequest, DandelionConfig, DandelionRouteReq,
14    DandelionRouter, DandelionRouterError, Graph, State, TxState,
15};
16use cuprate_p2p::NetworkInterface;
17use cuprate_p2p_core::{client::InternalPeerID, ClearNet, NetworkZone, Tor};
18use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
19
20use crate::{
21    p2p::CrossNetworkInternalPeerId,
22    txpool::incoming_tx::{DandelionTx, TxId},
23};
24
25mod anon_net_service;
26mod diffuse_service;
27mod stem_service;
28mod tx_store;
29
30pub use anon_net_service::AnonTxService;
31pub use diffuse_service::DiffuseService;
32
33/// The configuration used for [`cuprate_dandelion_tower`].
34///
35/// TODO: should we expose this to users of cuprated? probably not.
36const DANDELION_CONFIG: DandelionConfig = DandelionConfig {
37    time_between_hop: Duration::from_millis(175),
38    epoch_duration: Duration::from_secs(10 * 60),
39    fluff_probability: 0.12,
40    graph: Graph::FourRegular,
41};
42
43/// A [`DandelionRouter`] with all generic types defined.
44pub(super) type ConcreteDandelionRouter<Z> = DandelionRouter<
45    stem_service::OutboundPeerStream<Z>,
46    DiffuseService<Z>,
47    CrossNetworkInternalPeerId,
48    stem_service::StemPeerService<Z>,
49    DandelionTx,
50>;
51
52/// The dandelion router used to send transactions to the network.
53pub(super) struct MainDandelionRouter {
54    clearnet_router: ConcreteDandelionRouter<ClearNet>,
55    tor_router: Option<AnonTxService<Tor>>,
56}
57
58impl MainDandelionRouter {
59    pub const fn new(
60        clearnet_router: ConcreteDandelionRouter<ClearNet>,
61        tor_router: Option<AnonTxService<Tor>>,
62    ) -> Self {
63        Self {
64            clearnet_router,
65            tor_router,
66        }
67    }
68}
69
70impl Service<DandelionRouteReq<DandelionTx, CrossNetworkInternalPeerId>> for MainDandelionRouter {
71    type Response = State;
72    type Error = DandelionRouterError;
73    type Future = BoxFuture<'static, Result<State, DandelionRouterError>>;
74
75    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
76        if let Some(tor_router) = self.tor_router.as_mut() {
77            ready!(tor_router.poll_ready(cx))?;
78        }
79
80        self.clearnet_router.poll_ready(cx)
81    }
82
83    fn call(
84        &mut self,
85        req: DandelionRouteReq<DandelionTx, CrossNetworkInternalPeerId>,
86    ) -> Self::Future {
87        // TODO: is this the best way to use anonymity networks?
88        if req.state == TxState::Local {
89            if let Some(tor_router) = self.tor_router.as_mut() {
90                if let Some(mut peer) = tor_router.peer.take() {
91                    tracing::debug!("routing tx over Tor");
92                    return peer
93                        .call(StemRequest(req.tx))
94                        .map_ok(|_| State::Stem)
95                        .map_err(DandelionRouterError::PeerError)
96                        .boxed();
97                }
98
99                tracing::warn!(
100                    "failed to route tx over Tor, no connections, falling back to Clearnet"
101                );
102            }
103        }
104
105        self.clearnet_router.call(req)
106    }
107}
108
109/// Starts the dandelion pool manager task and returns a handle to send txs to broadcast.
110pub fn start_dandelion_pool_manager(
111    router: MainDandelionRouter,
112    txpool_read_handle: TxpoolReadHandle,
113    promote_tx: mpsc::UnboundedSender<[u8; 32]>,
114) -> DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId> {
115    cuprate_dandelion_tower::pool::start_dandelion_pool_manager(
116        // TODO: make this constant configurable?
117        32,
118        router,
119        tx_store::TxStoreService {
120            txpool_read_handle,
121            promote_tx,
122        },
123        DANDELION_CONFIG,
124    )
125}
126
127/// Creates a [`DandelionRouter`] from a [`NetworkInterface`].
128pub fn dandelion_router<Z: NetworkZone>(
129    network_interface: NetworkInterface<Z>,
130) -> ConcreteDandelionRouter<Z>
131where
132    InternalPeerID<Z::Addr>: Into<CrossNetworkInternalPeerId>,
133{
134    DandelionRouter::new(
135        DiffuseService {
136            clear_net_broadcast_service: network_interface.broadcast_svc(),
137        },
138        stem_service::OutboundPeerStream::<Z>::new(network_interface),
139        DANDELION_CONFIG,
140    )
141}