cuprate_dandelion_tower/pool/mod.rs

//! # Dandelion++ Pool
//!
//! This module contains [`DandelionPoolManager`], which wraps a backing transaction store
//! and fully implements the Dandelion++ protocol.
//!
//! The [`DandelionPoolManager`] sits between a [preprocessing stage](#preprocessing-stage) and a Dandelion++ router.
//! It handles promoting transactions in the stem state to the fluff state and setting embargo timers on stem state transactions.
//!
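//! For example, embargo timeouts are sampled from an exponential distribution whose mean is the
//! configured average embargo timeout, mirroring how the manager is constructed in this module
//! (a sketch; `config` and `rng` are assumed to be a [`DandelionConfig`] and a `rand` RNG):
//!
//! ```ignore
//! use std::time::Duration;
//!
//! use rand_distr::{Distribution, Exp};
//!
//! let embargo_dist = Exp::new(1.0 / config.average_embargo_timeout().as_secs_f64()).unwrap();
//! let embargo_timeout = Duration::from_secs_f64(embargo_dist.sample(&mut rng));
//! ```
//!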
//! ### Preprocessing stage
//!
//! Before a transaction is given to the [`DandelionPoolManager`], the preprocessing stage (not handled
//! in this crate) should handle the following (sketched below):
//!
//! - verifying the tx.
//! - checking if the tx is already in the pool and giving that information to the [`IncomingTxBuilder`].
//! - storing the tx in the pool, if it isn't there already.
//!
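//! A minimal sketch of that flow; `verify_tx` and `pool` are hypothetical names living outside
//! this crate:
//!
//! ```ignore
//! verify_tx(&tx)?;
//!
//! let tx_in_pool = pool.contains(&tx_id);
//! if !tx_in_pool {
//!     pool.insert(tx_id.clone(), tx.clone());
//! }
//! // `tx_in_pool` is then handed to the `IncomingTxBuilder`.
//! ```
//!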
//! ### Keep Stem Transactions Hidden
//!
//! When using your handle to the backing store, remember to keep transactions in the stem pool hidden:
//! handle any requests to the tx-pool as if the stem side of the pool does not exist.
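//!
//! For example, when serving a hypothetical lookup request against the backing pool (`pool` and
//! `stem_txs` are placeholder names, not part of this crate):
//!
//! ```ignore
//! // Pretend stem-state txs don't exist when answering external requests.
//! let tx = pool.get(&tx_id).filter(|_| !stem_txs.contains(&tx_id));
//! ```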
use std::{
    collections::HashMap,
    hash::Hash,
    marker::PhantomData,
    task::{Context, Poll},
};

use futures::{future::BoxFuture, FutureExt};
use rand_distr::Exp;
use tokio::{
    sync::{mpsc, oneshot},
    task::JoinSet,
};
use tokio_util::{sync::PollSender, time::DelayQueue};
use tower::Service;

use crate::{
    pool::manager::DandelionPoolShutDown,
    traits::{TxStoreRequest, TxStoreResponse},
    DandelionConfig, DandelionRouteReq, DandelionRouterError, State,
};

mod incoming_tx;
mod manager;

pub use incoming_tx::{IncomingTx, IncomingTxBuilder};
pub use manager::DandelionPoolManager;

/// Start the [`DandelionPoolManager`].
///
/// This function spawns the [`DandelionPoolManager`] and returns a [`DandelionPoolService`], which can be used to send
/// requests to the pool.
///
/// ### Args
///
/// - `buffer_size` is the size of the channel's buffer between the [`DandelionPoolService`] and the [`DandelionPoolManager`].
/// - `dandelion_router` is the router service, kept generic instead of [`DandelionRouter`](crate::DandelionRouter) to allow
///   users to customise routing functionality.
/// - `backing_pool` is the backing transaction storage service.
/// - `config` is the [`DandelionConfig`].
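///
/// ### Example
///
/// A minimal sketch of starting the pool manager; `my_router`, `my_tx_store`, `config` and the
/// `MyTx*` types are hypothetical placeholders that must satisfy the bounds on this function:
///
/// ```ignore
/// let pool_service: DandelionPoolService<MyTx, MyTxId, MyPeerId> = start_dandelion_pool_manager(
///     32,          // buffer size of the request channel
///     my_router,   // Service<DandelionRouteReq<MyTx, MyPeerId>>
///     my_tx_store, // Service<TxStoreRequest<MyTxId>>
///     config,      // DandelionConfig
/// );
/// ```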
pub fn start_dandelion_pool_manager<P, R, Tx, TxId, PeerId>(
    buffer_size: usize,
    dandelion_router: R,
    backing_pool: P,
    config: DandelionConfig,
) -> DandelionPoolService<Tx, TxId, PeerId>
where
    Tx: Clone + Send + 'static,
    TxId: Hash + Eq + Clone + Send + 'static,
    PeerId: Hash + Eq + Clone + Send + 'static,
    P: Service<TxStoreRequest<TxId>, Response = TxStoreResponse<Tx>, Error = tower::BoxError>
        + Send
        + 'static,
    P::Future: Send + 'static,
    R: Service<DandelionRouteReq<Tx, PeerId>, Response = State, Error = DandelionRouterError>
        + Send
        + 'static,
    R::Future: Send + 'static,
{
    let (tx, rx) = mpsc::channel(buffer_size);

    let pool = DandelionPoolManager {
        dandelion_router,
        backing_pool,
        routing_set: JoinSet::new(),
        stem_origins: HashMap::new(),
        embargo_timers: DelayQueue::new(),
        embargo_timer_keys: HashMap::new(),
        // An exponential distribution with a mean of `average_embargo_timeout`,
        // used to sample embargo timers for stem-state txs.
        embargo_dist: Exp::new(1.0 / config.average_embargo_timeout().as_secs_f64()).unwrap(),
        config,
        _tx: PhantomData,
    };

    // Run the pool manager on its own task.
    tokio::spawn(pool.run(rx));

    DandelionPoolService {
        tx: PollSender::new(tx),
    }
}

/// The Dandelion++ pool manager service.
///
/// Used to send [`IncomingTx`]s to the [`DandelionPoolManager`].
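///
/// ### Example
///
/// A minimal sketch of sending a transaction to the pool; `incoming_tx` is assumed to have been
/// built by the preprocessing stage with the [`IncomingTxBuilder`]:
///
/// ```ignore
/// use tower::{Service, ServiceExt};
///
/// pool_service.ready().await?.call(incoming_tx).await?;
/// ```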
#[derive(Clone)]
pub struct DandelionPoolService<Tx, TxId, PeerId> {
    /// The channel to the [`DandelionPoolManager`].
    #[expect(clippy::type_complexity)]
    tx: PollSender<(
        (IncomingTx<Tx, TxId, PeerId>, tracing::Span),
        oneshot::Sender<()>,
    )>,
}

impl<Tx, TxId, PeerId> Service<IncomingTx<Tx, TxId, PeerId>>
    for DandelionPoolService<Tx, TxId, PeerId>
where
    Tx: Clone + Send,
    TxId: Hash + Eq + Clone + Send + 'static,
    PeerId: Hash + Eq + Clone + Send + 'static,
{
    type Response = ();
    type Error = DandelionPoolShutDown;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.tx.poll_reserve(cx).map_err(|_| DandelionPoolShutDown)
    }

    fn call(&mut self, req: IncomingTx<Tx, TxId, PeerId>) -> Self::Future {
        // Although the channel doesn't send anything back, we want to wait for the
        // request to be handled before continuing.
        let (tx, rx) = oneshot::channel();

        let res = self
            .tx
            .send_item(((req, tracing::Span::current()), tx))
            .map_err(|_| DandelionPoolShutDown);

        async move {
            res?;
            rx.await.expect("Oneshot dropped before response!");

            Ok(())
        }
        .boxed()
    }
}