cuprate_dandelion_tower/pool/
mod.rs1use std::{
23 collections::HashMap,
24 hash::Hash,
25 marker::PhantomData,
26 task::{Context, Poll},
27};
28
29use futures::{future::BoxFuture, FutureExt};
30use rand_distr::Exp;
31use tokio::{
32 sync::{mpsc, oneshot},
33 task::JoinSet,
34};
35use tokio_util::{sync::PollSender, time::DelayQueue};
36use tower::Service;
37use tracing::Instrument;
38
39use crate::{
40 pool::manager::DandelionPoolShutDown,
41 traits::{TxStoreRequest, TxStoreResponse},
42 DandelionConfig, DandelionRouteReq, DandelionRouterError, State,
43};
44
45mod incoming_tx;
46mod manager;
47
48pub use incoming_tx::{IncomingTx, IncomingTxBuilder};
49pub use manager::DandelionPoolManager;
50
51pub fn start_dandelion_pool_manager<P, R, Tx, TxId, PeerId>(
64 buffer_size: usize,
65 dandelion_router: R,
66 backing_pool: P,
67 config: DandelionConfig,
68) -> DandelionPoolService<Tx, TxId, PeerId>
69where
70 Tx: Clone + Send + 'static,
71 TxId: Hash + Eq + Clone + Send + 'static,
72 PeerId: Hash + Eq + Clone + Send + 'static,
73 P: Service<TxStoreRequest<TxId>, Response = TxStoreResponse<Tx>, Error = tower::BoxError>
74 + Send
75 + 'static,
76 P::Future: Send + 'static,
77 R: Service<DandelionRouteReq<Tx, PeerId>, Response = State, Error = DandelionRouterError>
78 + Send
79 + 'static,
80 R::Future: Send + 'static,
81{
82 let (tx, rx) = mpsc::channel(buffer_size);
83
84 let pool = DandelionPoolManager {
85 dandelion_router,
86 backing_pool,
87 routing_set: JoinSet::new(),
88 stem_origins: HashMap::new(),
89 embargo_timers: DelayQueue::new(),
90 embargo_dist: Exp::new(1.0 / config.average_embargo_timeout().as_secs_f64()).unwrap(),
91 config,
92 _tx: PhantomData,
93 };
94
95 let span = tracing::debug_span!("dandelion_pool");
96
97 tokio::spawn(pool.run(rx).instrument(span));
98
99 DandelionPoolService {
100 tx: PollSender::new(tx),
101 }
102}
103
/// A cloneable handle for submitting [`IncomingTx`]s to the dandelion pool manager task.
///
/// Values of this type are returned by [`start_dandelion_pool_manager`], which spawns the
/// manager task holding the receiving end of the channel wrapped here.
#[derive(Clone)]
pub struct DandelionPoolService<Tx, TxId, PeerId> {
    /// Sending half of the channel to the manager task. Each message pairs the incoming
    /// transaction with a oneshot sender whose receiving half is awaited by the caller.
    tx: PollSender<(IncomingTx<Tx, TxId, PeerId>, oneshot::Sender<()>)>,
}
112
113impl<Tx, TxId, PeerId> Service<IncomingTx<Tx, TxId, PeerId>>
114 for DandelionPoolService<Tx, TxId, PeerId>
115where
116 Tx: Clone + Send,
117 TxId: Hash + Eq + Clone + Send + 'static,
118 PeerId: Hash + Eq + Clone + Send + 'static,
119{
120 type Response = ();
121 type Error = DandelionPoolShutDown;
122 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
123
124 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
125 self.tx.poll_reserve(cx).map_err(|_| DandelionPoolShutDown)
126 }
127
128 fn call(&mut self, req: IncomingTx<Tx, TxId, PeerId>) -> Self::Future {
129 let (tx, rx) = oneshot::channel();
131
132 let res = self
133 .tx
134 .send_item((req, tx))
135 .map_err(|_| DandelionPoolShutDown);
136
137 async move {
138 res?;
139 rx.await.expect("Oneshot dropped before response!");
140
141 Ok(())
142 }
143 .boxed()
144 }
145}