1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
//! # Dandelion++ Pool
//!
//! This module contains [`DandelionPoolManager`], which wraps a backing transaction store and
//! fully implements the dandelion++ protocol.
//!
//! The [`DandelionPoolManager`] is a middle man between a [preprocessing stage](#preprocessing-stage) and a dandelion router.
//! It handles promoting transactions in the stem state to the fluff state and setting embargo timers on stem state transactions.
//!
//! ### Preprocessing stage
//!
//! The preprocessing stage (not handled in this crate) before giving the transaction to the [`DandelionPoolManager`]
//! should handle:
//!
//! - verifying the tx.
//! - checking if we have the tx in the pool already and giving that information to the [`IncomingTxBuilder`].
//! - storing the tx in the pool, if it isn't there already.
//!
//! ### Keep Stem Transactions Hidden
//!
//! When using your handle to the backing store, remember to keep transactions in the stem pool hidden:
//! handle any requests to the tx-pool as if the stem side of the pool does not exist.
use std::{
    collections::HashMap,
    hash::Hash,
    marker::PhantomData,
    task::{Context, Poll},
};

use futures::{future::BoxFuture, FutureExt};
use rand_distr::Exp;
use tokio::{
    sync::{mpsc, oneshot},
    task::JoinSet,
};
use tokio_util::{sync::PollSender, time::DelayQueue};
use tower::Service;
use tracing::Instrument;

use crate::{
    pool::manager::DandelionPoolShutDown,
    traits::{TxStoreRequest, TxStoreResponse},
    DandelionConfig, DandelionRouteReq, DandelionRouterError, State,
};

mod incoming_tx;
mod manager;

pub use incoming_tx::{IncomingTx, IncomingTxBuilder};
pub use manager::DandelionPoolManager;

/// Start the [`DandelionPoolManager`].
///
/// This function spawns the [`DandelionPoolManager`] as a background task and returns a
/// [`DandelionPoolService`] which can be used to send requests to the pool.
///
/// ### Args
///
/// - `buffer_size` is the size of the channel's buffer between the [`DandelionPoolService`] and [`DandelionPoolManager`].
/// - `dandelion_router` is the router service, kept generic instead of [`DandelionRouter`](crate::DandelionRouter) to allow
///   users to customise routing functionality.
/// - `backing_pool` is the backing transaction storage service.
/// - `config` is [`DandelionConfig`].
///
/// ### Panics
///
/// - If `buffer_size` is `0` (`tokio`'s bounded channel requires a non-zero capacity).
/// - If called outside of a `tokio` runtime, as the manager is started with [`tokio::spawn`].
/// - If `1 / config.average_embargo_timeout()` (in seconds) is rejected by [`Exp::new`] as a rate.
pub fn start_dandelion_pool_manager<P, R, Tx, TxId, PeerId>(
    buffer_size: usize,
    dandelion_router: R,
    backing_pool: P,
    config: DandelionConfig,
) -> DandelionPoolService<Tx, TxId, PeerId>
where
    Tx: Clone + Send + 'static,
    TxId: Hash + Eq + Clone + Send + 'static,
    PeerId: Hash + Eq + Clone + Send + 'static,
    P: Service<TxStoreRequest<TxId>, Response = TxStoreResponse<Tx>, Error = tower::BoxError>
        + Send
        + 'static,
    P::Future: Send + 'static,
    R: Service<DandelionRouteReq<Tx, PeerId>, Response = State, Error = DandelionRouterError>
        + Send
        + 'static,
    R::Future: Send + 'static,
{
    let (tx, rx) = mpsc::channel(buffer_size);

    // An exponential distribution with rate `1 / average_timeout` has a mean equal to the
    // configured average embargo timeout (in seconds).
    let embargo_rate = 1.0 / config.average_embargo_timeout().as_secs_f64();
    let embargo_dist = Exp::new(embargo_rate)
        .expect("1 / average embargo timeout must be a valid rate for the exponential distribution");

    let pool = DandelionPoolManager {
        dandelion_router,
        backing_pool,
        routing_set: JoinSet::new(),
        stem_origins: HashMap::new(),
        embargo_timers: DelayQueue::new(),
        embargo_dist,
        config,
        _tx: PhantomData,
    };

    let span = tracing::debug_span!("dandelion_pool");

    // The manager owns the receiving half of the channel and runs until `run` completes;
    // the returned service only holds the sending half.
    tokio::spawn(pool.run(rx).instrument(span));

    DandelionPoolService {
        tx: PollSender::new(tx),
    }
}

/// The dandelion pool manager service.
///
/// A cloneable handle, returned by [`start_dandelion_pool_manager`], used to send
/// [`IncomingTx`]s to the [`DandelionPoolManager`] through its [`Service`] implementation.
#[derive(Clone)]
pub struct DandelionPoolService<Tx, TxId, PeerId> {
    /// The channel to [`DandelionPoolManager`].
    ///
    /// Each message pairs the [`IncomingTx`] with a [`oneshot::Sender`]; the [`Service`]
    /// implementation awaits the matching receiver before its future completes.
    tx: PollSender<(IncomingTx<Tx, TxId, PeerId>, oneshot::Sender<()>)>,
}

impl<Tx, TxId, PeerId> Service<IncomingTx<Tx, TxId, PeerId>>
    for DandelionPoolService<Tx, TxId, PeerId>
where
    Tx: Clone + Send,
    TxId: Hash + Eq + Clone + Send + 'static,
    PeerId: Hash + Eq + Clone + Send + 'static,
{
    type Response = ();
    type Error = DandelionPoolShutDown;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    /// Reserves a slot in the channel to the [`DandelionPoolManager`].
    ///
    /// Any error from reserving the slot is reported as [`DandelionPoolShutDown`].
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.tx.poll_reserve(cx).map_err(|_| DandelionPoolShutDown)
    }

    /// Sends `req` to the [`DandelionPoolManager`] and resolves once it signals the request was handled.
    ///
    /// The request is queued immediately (using the slot reserved by `poll_ready`); the returned
    /// future only waits for the acknowledgement.
    ///
    /// Returns [`DandelionPoolShutDown`] if the request could not be queued, or if the manager
    /// dropped the acknowledgement channel without responding.
    fn call(&mut self, req: IncomingTx<Tx, TxId, PeerId>) -> Self::Future {
        // The oneshot carries no data: it only signals that the manager has handled the request,
        // so callers can wait for that before continuing.
        let (tx, rx) = oneshot::channel();

        let res = self
            .tx
            .send_item((req, tx))
            .map_err(|_| DandelionPoolShutDown);

        async move {
            res?;

            // A dropped sender means the manager went away before acknowledging the request.
            // Surface that as a shutdown error rather than panicking in the caller's task.
            rx.await.map_err(|_| DandelionPoolShutDown)
        }
        .boxed()
    }
}