1use std::{
2 cmp::min,
3 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
4};
5
6use bytes::Bytes;
7use futures::StreamExt;
8use indexmap::IndexMap;
9use rand::Rng;
10use tokio::sync::{mpsc, oneshot};
11use tokio_util::{time::delay_queue, time::DelayQueue};
12use tower::{Service, ServiceExt};
13use tracing::{instrument, Instrument, Span};
14
15use cuprate_dandelion_tower::{
16 pool::{DandelionPoolService, IncomingTx, IncomingTxBuilder},
17 traits::DiffuseRequest,
18 TxState,
19};
20use cuprate_helper::time::current_unix_timestamp;
21use cuprate_p2p_core::ClearNet;
22use cuprate_txpool::service::{
23 interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse},
24 TxpoolReadHandle, TxpoolWriteHandle,
25};
26use cuprate_types::TransactionVerificationData;
27
28use crate::{
29 config::TxpoolConfig,
30 constants::PANIC_CRITICAL_SERVICE_ERROR,
31 p2p::{CrossNetworkInternalPeerId, NetworkInterfaces},
32 txpool::{
33 dandelion::DiffuseService,
34 incoming_tx::{DandelionTx, TxId},
35 },
36};
37
/// Capacity of the bounded channel that carries incoming txs to the txpool manager task
/// (see [`start_txpool_manager`]).
const INCOMING_TX_QUEUE_SIZE: usize = 100;
39
40pub async fn start_txpool_manager(
46 mut txpool_write_handle: TxpoolWriteHandle,
47 mut txpool_read_handle: TxpoolReadHandle,
48 promote_tx_channel: mpsc::UnboundedReceiver<[u8; 32]>,
49 diffuse_service: DiffuseService<ClearNet>,
50 dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
51 config: TxpoolConfig,
52) -> TxpoolManagerHandle {
53 let TxpoolReadResponse::Backlog(backlog) = txpool_read_handle
54 .ready()
55 .await
56 .expect(PANIC_CRITICAL_SERVICE_ERROR)
57 .call(TxpoolReadRequest::Backlog)
58 .await
59 .expect(PANIC_CRITICAL_SERVICE_ERROR)
60 else {
61 unreachable!()
62 };
63
64 tracing::info!(txs_in_pool = backlog.len(), "starting txpool manager");
65
66 let mut stem_txs = Vec::new();
67
68 let mut tx_timeouts = DelayQueue::with_capacity(backlog.len());
69 let current_txs = backlog
70 .into_iter()
71 .map(|tx| {
72 let timeout_key = if tx.private {
73 stem_txs.push(tx.id);
74 None
75 } else {
76 let next_timeout = calculate_next_timeout(tx.received_at, config.maximum_age_secs);
77 Some(tx_timeouts.insert(tx.id, Duration::from_secs(next_timeout)))
78 };
79
80 (
81 tx.id,
82 TxInfo {
83 weight: tx.weight,
84 fee: tx.fee,
85 received_at: tx.received_at,
86 private: tx.private,
87 timeout_key,
88 },
89 )
90 })
91 .collect();
92
93 let mut manager = TxpoolManager {
94 current_txs,
95 tx_timeouts,
96 txpool_write_handle,
97 txpool_read_handle,
98 dandelion_pool_manager,
99 promote_tx_channel,
100 diffuse_service,
101 config,
102 };
103
104 tracing::info!(stem_txs = stem_txs.len(), "promoting stem txs");
105
106 for tx in stem_txs {
107 manager.promote_tx(tx).await;
108 }
109
110 let (tx_tx, tx_rx) = mpsc::channel(INCOMING_TX_QUEUE_SIZE);
111 let (spent_kis_tx, spent_kis_rx) = mpsc::channel(1);
112
113 tokio::spawn(manager.run(tx_rx, spent_kis_rx));
114
115 TxpoolManagerHandle {
116 tx_tx,
117 spent_kis_tx,
118 }
119}
120
/// A cloneable handle used to send work to the txpool manager task.
#[derive(Clone)]
pub struct TxpoolManagerHandle {
    /// Sender for incoming txs, each paired with its routing state.
    pub tx_tx: mpsc::Sender<(
        TransactionVerificationData,
        TxState<CrossNetworkInternalPeerId>,
    )>,

    /// Sender for new-block notifications: the block's spent key images plus a oneshot
    /// sender that the receiving side fires once the block has been handled.
    spent_kis_tx: mpsc::Sender<(Vec<[u8; 32]>, oneshot::Sender<()>)>,
}
133
134impl TxpoolManagerHandle {
135 #[expect(clippy::let_underscore_must_use)]
139 pub fn mock() -> Self {
140 let (spent_kis_tx, mut spent_kis_rx) = mpsc::channel(1);
141 let (tx_tx, mut tx_rx) = mpsc::channel(100);
142
143 tokio::spawn(async move {
144 loop {
145 let Some(rec): Option<(_, oneshot::Sender<()>)> = spent_kis_rx.recv().await else {
146 return;
147 };
148
149 let _ = rec.1.send(());
150 }
151 });
152
153 tokio::spawn(async move {
154 loop {
155 if tx_rx.recv().await.is_none() {
156 return;
157 }
158 }
159 });
160
161 Self {
162 tx_tx,
163 spent_kis_tx,
164 }
165 }
166
167 pub async fn new_block(&mut self, spent_key_images: Vec<[u8; 32]>) -> anyhow::Result<()> {
169 let (tx, rx) = oneshot::channel();
170
171 drop(self.spent_kis_tx.send((spent_key_images, tx)).await);
172
173 rx.await
174 .map_err(|_| anyhow::anyhow!("txpool manager stopped"))
175 }
176}
177
/// Bookkeeping the txpool manager keeps for each tx it tracks.
struct TxInfo {
    /// The weight of the tx.
    weight: usize,
    /// The fee the tx pays.
    fee: u64,
    /// UNIX timestamp at which the tx was received; reset to "now" when the tx is promoted.
    received_at: u64,
    /// Whether the tx is private, i.e. it was in the stem stage when it was tracked.
    private: bool,

    /// Key of this tx's entry in the timeout queue; `None` for txs with no timeout scheduled
    /// (private txs are tracked without one).
    timeout_key: Option<delay_queue::Key>,
}
194
/// State owned by the txpool manager task.
struct TxpoolManager {
    /// Every tx currently tracked, keyed by tx hash.
    current_txs: IndexMap<[u8; 32], TxInfo>,

    /// Pending timeouts; when one fires the tx is either re-relayed or removed for being
    /// too old.
    tx_timeouts: DelayQueue<[u8; 32]>,

    /// Handle for write requests to the txpool database.
    txpool_write_handle: TxpoolWriteHandle,
    /// Handle for read requests to the txpool database.
    txpool_read_handle: TxpoolReadHandle,

    /// Service that newly accepted txs are forwarded to for routing.
    dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
    /// Receives the hashes of txs that should be promoted.
    promote_tx_channel: mpsc::UnboundedReceiver<[u8; 32]>,
    /// Service used to re-relay txs whose timeout fired.
    diffuse_service: DiffuseService<ClearNet>,

    /// The txpool configuration; `maximum_age_secs` bounds how long a tx is kept.
    config: TxpoolConfig,
}
217
218impl TxpoolManager {
219 #[instrument(level = "debug", skip_all, fields(tx_id = hex::encode(tx)))]
225 async fn remove_tx_from_pool(&mut self, tx: [u8; 32], remove_from_db: bool) {
226 tracing::debug!("removing tx from pool");
227
228 let tx_info = self.current_txs.swap_remove(&tx).unwrap();
229
230 tx_info
231 .timeout_key
232 .and_then(|key| self.tx_timeouts.try_remove(&key));
233
234 if remove_from_db {
235 self.txpool_write_handle
236 .ready()
237 .await
238 .expect(PANIC_CRITICAL_SERVICE_ERROR)
239 .call(TxpoolWriteRequest::RemoveTransaction(tx))
240 .await
241 .expect(PANIC_CRITICAL_SERVICE_ERROR);
242 }
243 }
244
245 #[instrument(level = "debug", skip_all, fields(tx_id = hex::encode(tx)))]
251 async fn rerelay_tx(&mut self, tx: [u8; 32]) {
252 tracing::debug!("re-relaying tx to network");
253
254 let TxpoolReadResponse::TxBlob {
255 tx_blob,
256 state_stem: _,
257 } = self
258 .txpool_read_handle
259 .ready()
260 .await
261 .expect(PANIC_CRITICAL_SERVICE_ERROR)
262 .call(TxpoolReadRequest::TxBlob(tx))
263 .await
264 .expect(PANIC_CRITICAL_SERVICE_ERROR)
265 else {
266 unreachable!()
267 };
268
269 self.diffuse_service
270 .call(DiffuseRequest(DandelionTx(Bytes::from(tx_blob))))
271 .await
272 .expect(PANIC_CRITICAL_SERVICE_ERROR);
273 }
274
275 #[instrument(level = "debug", skip_all, fields(tx_id = hex::encode(tx)))]
278 async fn handle_tx_timeout(&mut self, tx: [u8; 32]) {
279 let Some(tx_info) = self.current_txs.get(&tx) else {
280 tracing::warn!("tx timed out, but tx not in pool");
281 return;
282 };
283
284 let time_in_pool = current_unix_timestamp() - tx_info.received_at;
285
286 if time_in_pool + 10 > self.config.maximum_age_secs {
289 tracing::warn!("tx has been in pool too long, removing from pool");
290 self.remove_tx_from_pool(tx, true).await;
291 return;
292 }
293
294 let received_at = tx_info.received_at;
295
296 tracing::debug!(time_in_pool, "tx timed out, resending to network");
297
298 self.rerelay_tx(tx).await;
299
300 let tx_info = self.current_txs.get_mut(&tx).unwrap();
301
302 let next_timeout = calculate_next_timeout(received_at, self.config.maximum_age_secs);
303 tracing::trace!(in_secs = next_timeout, "setting next tx timeout");
304
305 tx_info.timeout_key = Some(
306 self.tx_timeouts
307 .insert(tx, Duration::from_secs(next_timeout)),
308 );
309 }
310
311 #[instrument(level = "trace", skip_all, fields(tx_id = hex::encode(tx)))]
313 fn track_tx(&mut self, tx: [u8; 32], weight: usize, fee: u64, private: bool) {
314 let now = current_unix_timestamp();
315
316 let timeout_key = if private {
317 None
319 } else {
320 let timeout = calculate_next_timeout(now, self.config.maximum_age_secs);
321
322 tracing::trace!(in_secs = timeout, "setting next tx timeout");
323
324 Some(self.tx_timeouts.insert(tx, Duration::from_secs(timeout)))
325 };
326
327 self.current_txs.insert(
328 tx,
329 TxInfo {
330 weight,
331 fee,
332 received_at: now,
333 private,
334 timeout_key,
335 },
336 );
337 }
338
339 #[instrument(level = "debug", skip_all, fields(tx_id = hex::encode(tx.tx_hash), state))]
341 async fn handle_incoming_tx(
342 &mut self,
343 tx: TransactionVerificationData,
344 state: TxState<CrossNetworkInternalPeerId>,
345 ) {
346 tracing::debug!("handling new tx");
347
348 let incoming_tx =
349 IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx.tx_blob)), tx.tx_hash);
350
351 let (tx_hash, tx_weight, tx_fee) = (tx.tx_hash, tx.tx_weight, tx.fee);
352
353 let TxpoolWriteResponse::AddTransaction(double_spend) = self
354 .txpool_write_handle
355 .ready()
356 .await
357 .expect(PANIC_CRITICAL_SERVICE_ERROR)
358 .call(TxpoolWriteRequest::AddTransaction {
359 tx: Box::new(tx),
360 state_stem: state.is_stem_stage(),
361 })
362 .await
363 .expect(PANIC_CRITICAL_SERVICE_ERROR)
364 else {
365 unreachable!()
366 };
367
368 if let Some(tx_hash) = double_spend {
369 tracing::debug!(
370 double_spent = hex::encode(tx_hash),
371 "transaction is a double spend, ignoring"
372 );
373 return;
374 }
375
376 self.track_tx(tx_hash, tx_weight, tx_fee, state.is_stem_stage());
377
378 let incoming_tx = incoming_tx
379 .with_routing_state(state)
380 .with_state_in_db(None)
381 .build()
382 .unwrap();
383
384 self.dandelion_pool_manager
385 .ready()
386 .await
387 .expect(PANIC_CRITICAL_SERVICE_ERROR)
388 .call(incoming_tx)
389 .await
390 .expect(PANIC_CRITICAL_SERVICE_ERROR);
391 }
392
393 #[instrument(level = "debug", skip_all, fields(tx_id = hex::encode(tx)))]
395 async fn promote_tx(&mut self, tx: [u8; 32]) {
396 let Some(tx_info) = self.current_txs.get_mut(&tx) else {
397 tracing::debug!("not promoting tx, tx not in pool");
398 return;
399 };
400
401 if !tx_info.private {
402 tracing::trace!("not promoting tx, tx is already public");
403 return;
404 }
405
406 tracing::debug!("promoting tx");
407
408 tx_info.received_at = current_unix_timestamp();
410
411 let next_timeout =
412 calculate_next_timeout(tx_info.received_at, self.config.maximum_age_secs);
413 tracing::trace!(in_secs = next_timeout, "setting next tx timeout");
414 tx_info.timeout_key = Some(
415 self.tx_timeouts
416 .insert(tx, Duration::from_secs(next_timeout)),
417 );
418
419 self.txpool_write_handle
420 .ready()
421 .await
422 .expect(PANIC_CRITICAL_SERVICE_ERROR)
423 .call(TxpoolWriteRequest::Promote(tx))
424 .await
425 .expect(PANIC_CRITICAL_SERVICE_ERROR);
426 }
427
428 #[instrument(level = "debug", skip_all)]
430 async fn new_block(&mut self, spent_key_images: Vec<[u8; 32]>) {
431 tracing::debug!("handling new block");
432
433 let TxpoolWriteResponse::NewBlock(removed_txs) = self
434 .txpool_write_handle
435 .ready()
436 .await
437 .expect(PANIC_CRITICAL_SERVICE_ERROR)
438 .call(TxpoolWriteRequest::NewBlock { spent_key_images })
439 .await
440 .expect(PANIC_CRITICAL_SERVICE_ERROR)
441 else {
442 unreachable!()
443 };
444
445 for tx in removed_txs {
446 self.remove_tx_from_pool(tx, false).await;
447 }
448 }
449
450 #[expect(clippy::let_underscore_must_use)]
451 async fn run(
452 mut self,
453 mut tx_rx: mpsc::Receiver<(
454 TransactionVerificationData,
455 TxState<CrossNetworkInternalPeerId>,
456 )>,
457 mut block_rx: mpsc::Receiver<(Vec<[u8; 32]>, oneshot::Sender<()>)>,
458 ) {
459 loop {
460 tokio::select! {
461 Some(tx) = self.tx_timeouts.next() => {
462 self.handle_tx_timeout(tx.into_inner()).await;
463 }
464 Some((tx, state)) = tx_rx.recv() => {
465 self.handle_incoming_tx(tx, state).await;
466 }
467 Some(tx) = self.promote_tx_channel.recv() => {
468 self.promote_tx(tx).await;
469 }
470 Some((spent_kis, tx)) = block_rx.recv() => {
471 self.new_block(spent_kis).await;
472 let _ = tx.send(());
473 }
474 }
475 }
476 }
477}
478
479fn calculate_next_timeout(received_at: u64, max_time_in_pool: u64) -> u64 {
481 const TX_RERELAY_TIME: u64 = 300;
483
484 let now = current_unix_timestamp();
489
490 let time_in_pool = now - received_at;
491
492 let time_till_max_timeout = max_time_in_pool.saturating_sub(time_in_pool);
493
494 let timeouts = time_in_pool / TX_RERELAY_TIME;
495
496 min((timeouts + 1) * TX_RERELAY_TIME, time_till_max_timeout)
497}