cuprate_dandelion_tower/pool/
mod.rs1use std::{
23 collections::HashMap,
24 hash::Hash,
25 marker::PhantomData,
26 task::{Context, Poll},
27};
28
29use futures::{future::BoxFuture, FutureExt};
30use rand_distr::Exp;
31use tokio::{
32 sync::{mpsc, oneshot},
33 task::JoinSet,
34};
35use tokio_util::{sync::PollSender, time::DelayQueue};
36use tower::Service;
37
38use crate::{
39 pool::manager::DandelionPoolShutDown,
40 traits::{TxStoreRequest, TxStoreResponse},
41 DandelionConfig, DandelionRouteReq, DandelionRouterError, State,
42};
43
44mod incoming_tx;
45mod manager;
46
47pub use incoming_tx::{IncomingTx, IncomingTxBuilder};
48pub use manager::DandelionPoolManager;
49
50pub fn start_dandelion_pool_manager<P, R, Tx, TxId, PeerId>(
63 buffer_size: usize,
64 dandelion_router: R,
65 backing_pool: P,
66 config: DandelionConfig,
67) -> DandelionPoolService<Tx, TxId, PeerId>
68where
69 Tx: Clone + Send + 'static,
70 TxId: Hash + Eq + Clone + Send + 'static,
71 PeerId: Hash + Eq + Clone + Send + 'static,
72 P: Service<TxStoreRequest<TxId>, Response = TxStoreResponse<Tx>, Error = tower::BoxError>
73 + Send
74 + 'static,
75 P::Future: Send + 'static,
76 R: Service<DandelionRouteReq<Tx, PeerId>, Response = State, Error = DandelionRouterError>
77 + Send
78 + 'static,
79 R::Future: Send + 'static,
80{
81 let (tx, rx) = mpsc::channel(buffer_size);
82
83 let pool = DandelionPoolManager {
84 dandelion_router,
85 backing_pool,
86 routing_set: JoinSet::new(),
87 stem_origins: HashMap::new(),
88 embargo_timers: DelayQueue::new(),
89 embargo_timer_keys: HashMap::new(),
90 embargo_dist: Exp::new(1.0 / config.average_embargo_timeout().as_secs_f64()).unwrap(),
91 config,
92 _tx: PhantomData,
93 };
94
95 tokio::spawn(pool.run(rx));
96
97 DandelionPoolService {
98 tx: PollSender::new(tx),
99 }
100}
101
102#[derive(Clone)]
106pub struct DandelionPoolService<Tx, TxId, PeerId> {
107 #[expect(clippy::type_complexity)]
108 tx: PollSender<(
110 (IncomingTx<Tx, TxId, PeerId>, tracing::Span),
111 oneshot::Sender<()>,
112 )>,
113}
114
115impl<Tx, TxId, PeerId> Service<IncomingTx<Tx, TxId, PeerId>>
116 for DandelionPoolService<Tx, TxId, PeerId>
117where
118 Tx: Clone + Send,
119 TxId: Hash + Eq + Clone + Send + 'static,
120 PeerId: Hash + Eq + Clone + Send + 'static,
121{
122 type Response = ();
123 type Error = DandelionPoolShutDown;
124 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
125
126 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
127 self.tx.poll_reserve(cx).map_err(|_| DandelionPoolShutDown)
128 }
129
130 fn call(&mut self, req: IncomingTx<Tx, TxId, PeerId>) -> Self::Future {
131 let (tx, rx) = oneshot::channel();
133
134 let res = self
135 .tx
136 .send_item(((req, tracing::Span::current()), tx))
137 .map_err(|_| DandelionPoolShutDown);
138
139 async move {
140 res?;
141 rx.await.expect("Oneshot dropped before response!");
142
143 Ok(())
144 }
145 .boxed()
146 }
147}