cuprate_dandelion_tower/
router.rs

1//! # Dandelion++ Router
2//!
//! This module contains [`DandelionRouter`] which is a [`Service`]. It handles keeping the
//! current dandelion++ [`State`] and deciding where to send transactions based on their [`TxState`].
5//!
6//! ### What The Router Does Not Do
7//!
//! It does not handle anything to do with keeping transactions long term, i.e. embargo timers and handling
//! loops in the stem. It is up to implementers to do this if they decide not to use [`DandelionPoolManager`](crate::pool::DandelionPoolManager).
10use std::{
11    collections::HashMap,
12    hash::Hash,
13    marker::PhantomData,
14    pin::Pin,
15    task::{ready, Context, Poll},
16    time::Instant,
17};
18
19use futures::{future::BoxFuture, FutureExt, TryFutureExt, TryStream};
20use rand::{distributions::Bernoulli, prelude::*, thread_rng};
21use tower::Service;
22
23use crate::{
24    traits::{DiffuseRequest, StemRequest},
25    DandelionConfig,
26};
27
28/// An error returned from the [`DandelionRouter`]
29#[derive(thiserror::Error, Debug)]
30pub enum DandelionRouterError {
31    /// This error is probably recoverable so the request should be retried.
32    #[error("Peer chosen to route stem txs to had an err: {0}.")]
33    PeerError(tower::BoxError),
34    /// The broadcast service returned an error.
35    #[error("Broadcast service returned an err: {0}.")]
36    BroadcastError(tower::BoxError),
37    /// The outbound peer stream returned an error, this is critical.
38    #[error("The outbound peer stream returned an err: {0}.")]
39    OutboundPeerStreamError(tower::BoxError),
40    /// The outbound peer discoverer returned [`None`].
41    #[error("The outbound peer discoverer exited.")]
42    OutboundPeerDiscoverExited,
43}
44
/// The outcome of asking the peer source for one more outbound peer.
pub enum OutboundPeer<Id, T> {
    /// A peer, given as its Id together with a value of type `T`.
    ///
    /// The router uses the peer's stem service as `T`.
    Peer(Id, T),
    /// The peer store has run dry and cannot hand out any more peers.
    Exhausted,
}
52
/// The router-wide dandelion++ mode, re-sampled at the start of each epoch.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum State {
    /// Fluff mode: stem transactions are diffused to all peers.
    Fluff,
    /// Stem mode: stem transactions are forwarded to a single outbound peer.
    Stem,
}
61
/// The routing stage a single transaction is in.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TxState<Id> {
    /// The transaction is in the fluff (diffusion) stage.
    Fluff,
    /// The transaction is in the stem stage and was relayed to us by a peer.
    Stem {
        /// Id of the peer that sent us this transaction.
        from: Id,
    },
    /// The transaction was created by our own node.
    Local,
}

impl<Id> TxState<Id> {
    /// Reports whether the transaction is still in the stem stage.
    ///
    /// [`TxState::Local`] and [`TxState::Stem`] both count as stem stage; only
    /// [`TxState::Fluff`] does not.
    pub const fn is_stem_stage(&self) -> bool {
        !matches!(self, Self::Fluff)
    }
}
84
85/// A request to route a transaction.
86pub struct DandelionRouteReq<Tx, Id> {
87    /// The transaction.
88    pub tx: Tx,
89    /// The transaction state.
90    pub state: TxState<Id>,
91}
92
93/// The dandelion router service.
94pub struct DandelionRouter<P, B, Id, S, Tx> {
95    // pub(crate) is for tests
96    /// A [`Discover`] where we can get outbound peers from.
97    outbound_peer_discover: Pin<Box<P>>,
98    /// A [`Service`] which handle broadcasting (diffusing) transactions.
99    broadcast_svc: B,
100
101    /// The current state.
102    current_state: State,
103    /// The time at which this epoch started.
104    epoch_start: Instant,
105
106    /// The stem our local transactions will be sent to.
107    local_route: Option<Id>,
108    /// A [`HashMap`] linking peer's Ids to Ids in `stem_peers`.
109    stem_routes: HashMap<Id, Id>,
110    /// Peers we are using for stemming.
111    ///
112    /// This will contain peers, even in [`State::Fluff`] to allow us to stem [`TxState::Local`]
113    /// transactions.
114    pub(crate) stem_peers: HashMap<Id, S>,
115
116    /// The distribution to sample to get the [`State`], true is [`State::Fluff`].
117    state_dist: Bernoulli,
118
119    /// The config.
120    config: DandelionConfig,
121
122    /// The routers tracing span.
123    span: tracing::Span,
124
125    _tx: PhantomData<Tx>,
126}
127
128impl<Tx, Id, P, B, S> DandelionRouter<P, B, Id, S, Tx>
129where
130    Id: Hash + Eq + Clone,
131    P: TryStream<Ok = OutboundPeer<Id, S>, Error = tower::BoxError>,
132    B: Service<DiffuseRequest<Tx>, Error = tower::BoxError>,
133    B::Future: Send + 'static,
134    S: Service<StemRequest<Tx>, Error = tower::BoxError>,
135    S::Future: Send + 'static,
136{
137    /// Creates a new [`DandelionRouter`], with the provided services and config.
138    ///
139    /// # Panics
140    /// This function panics if [`DandelionConfig::fluff_probability`] is not `0.0..=1.0`.
141    pub fn new(broadcast_svc: B, outbound_peer_discover: P, config: DandelionConfig) -> Self {
142        // get the current state
143        let state_dist = Bernoulli::new(config.fluff_probability)
144            .expect("Fluff probability was not between 0 and 1");
145
146        let current_state = if state_dist.sample(&mut thread_rng()) {
147            State::Fluff
148        } else {
149            State::Stem
150        };
151
152        Self {
153            outbound_peer_discover: Box::pin(outbound_peer_discover),
154            broadcast_svc,
155            current_state,
156            epoch_start: Instant::now(),
157            local_route: None,
158            stem_routes: HashMap::new(),
159            stem_peers: HashMap::new(),
160            state_dist,
161            config,
162            span: tracing::debug_span!("dandelion_router", state = ?current_state),
163            _tx: PhantomData,
164        }
165    }
166
167    /// This function gets the number of outbound peers from the [`Discover`] required for the selected [`Graph`](crate::Graph).
168    fn poll_prepare_graph(
169        &mut self,
170        cx: &mut Context<'_>,
171    ) -> Poll<Result<(), DandelionRouterError>> {
172        let peers_needed = match self.current_state {
173            State::Stem => self.config.number_of_stems(),
174            // When in the fluff state we only need one peer, the one for our txs.
175            State::Fluff => 1,
176        };
177
178        while self.stem_peers.len() < peers_needed {
179            match ready!(self
180                .outbound_peer_discover
181                .as_mut()
182                .try_poll_next(cx)
183                .map_err(DandelionRouterError::OutboundPeerStreamError))
184            .ok_or(DandelionRouterError::OutboundPeerDiscoverExited)??
185            {
186                OutboundPeer::Peer(key, mut svc) => {
187                    let poll = svc.poll_ready(cx);
188
189                    self.stem_peers.insert(key.clone(), svc);
190
191                    if ready!(poll).is_err() {
192                        self.stem_peers.remove(&key);
193                    }
194                }
195                OutboundPeer::Exhausted => {
196                    tracing::warn!("Failed to retrieve enough outbound peers for optimal dandelion++, privacy may be degraded.");
197                    return Poll::Ready(Ok(()));
198                }
199            }
200        }
201
202        Poll::Ready(Ok(()))
203    }
204
205    fn fluff_tx(&mut self, tx: Tx) -> BoxFuture<'static, Result<State, DandelionRouterError>> {
206        self.broadcast_svc
207            .call(DiffuseRequest(tx))
208            .map_ok(|_| State::Fluff)
209            .map_err(DandelionRouterError::BroadcastError)
210            .boxed()
211    }
212
213    fn stem_tx(
214        &mut self,
215        tx: Tx,
216        from: &Id,
217    ) -> BoxFuture<'static, Result<State, DandelionRouterError>> {
218        if self.stem_peers.is_empty() {
219            tracing::debug!("Stem peers are empty, fluffing stem transaction.");
220            return self.fluff_tx(tx);
221        }
222
223        loop {
224            let stem_route = self.stem_routes.entry(from.clone()).or_insert_with(|| {
225                self.stem_peers
226                    .iter()
227                    .choose(&mut thread_rng())
228                    .expect("No peers in `stem_peers` was poll_ready called?")
229                    .0
230                    .clone()
231            });
232
233            let Some(peer) = self.stem_peers.get_mut(stem_route) else {
234                self.stem_routes.remove(from);
235                continue;
236            };
237
238            return peer
239                .call(StemRequest(tx))
240                .map_ok(|_| State::Stem)
241                .map_err(DandelionRouterError::PeerError)
242                .boxed();
243        }
244    }
245
246    fn stem_local_tx(&mut self, tx: Tx) -> BoxFuture<'static, Result<State, DandelionRouterError>> {
247        if self.stem_peers.is_empty() {
248            tracing::warn!("Stem peers are empty, no outbound connections to stem local tx to, fluffing instead, privacy will be degraded.");
249            return self.fluff_tx(tx);
250        }
251
252        loop {
253            let stem_route = self.local_route.get_or_insert_with(|| {
254                self.stem_peers
255                    .iter()
256                    .choose(&mut thread_rng())
257                    .expect("No peers in `stem_peers` was poll_ready called?")
258                    .0
259                    .clone()
260            });
261
262            let Some(peer) = self.stem_peers.get_mut(stem_route) else {
263                self.local_route.take();
264                continue;
265            };
266
267            return peer
268                .call(StemRequest(tx))
269                .map_ok(|_| State::Stem)
270                .map_err(DandelionRouterError::PeerError)
271                .boxed();
272        }
273    }
274}
275
276impl<Tx, Id, P, B, S> Service<DandelionRouteReq<Tx, Id>> for DandelionRouter<P, B, Id, S, Tx>
277where
278    Id: Hash + Eq + Clone,
279    P: TryStream<Ok = OutboundPeer<Id, S>, Error = tower::BoxError>,
280    B: Service<DiffuseRequest<Tx>, Error = tower::BoxError>,
281    B::Future: Send + 'static,
282    S: Service<StemRequest<Tx>, Error = tower::BoxError>,
283    S::Future: Send + 'static,
284{
285    type Response = State;
286    type Error = DandelionRouterError;
287    type Future = BoxFuture<'static, Result<State, DandelionRouterError>>;
288
289    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
290        if self.epoch_start.elapsed() > self.config.epoch_duration {
291            // clear all the stem routing data.
292            self.stem_peers.clear();
293            self.stem_routes.clear();
294            self.local_route.take();
295
296            self.current_state = if self.state_dist.sample(&mut thread_rng()) {
297                State::Fluff
298            } else {
299                State::Stem
300            };
301
302            self.span = tracing::debug_span!("dandelion_router", state = ?self.current_state);
303            tracing::debug!(parent: &self.span, "Starting new d++ epoch",);
304
305            self.epoch_start = Instant::now();
306        }
307
308        let mut peers_pending = false;
309
310        let span = &self.span;
311
312        self.stem_peers
313            .retain(|_, peer_svc| match peer_svc.poll_ready(cx) {
314                Poll::Ready(res) => res
315                    .inspect_err(|e| {
316                        tracing::debug!(
317                            parent: span,
318                            "Peer returned an error on `poll_ready`: {e}, removing from router.",
319                        );
320                    })
321                    .is_ok(),
322                Poll::Pending => {
323                    // Pending peers should be kept - they have not errored yet.
324                    peers_pending = true;
325                    true
326                }
327            });
328
329        if peers_pending {
330            return Poll::Pending;
331        }
332
333        // now we have removed the failed peers check if we still have enough for the graph chosen.
334        ready!(self.poll_prepare_graph(cx)?);
335
336        ready!(self
337            .broadcast_svc
338            .poll_ready(cx)
339            .map_err(DandelionRouterError::BroadcastError)?);
340
341        Poll::Ready(Ok(()))
342    }
343
344    fn call(&mut self, req: DandelionRouteReq<Tx, Id>) -> Self::Future {
345        tracing::trace!(parent: &self.span,  "Handling route request.");
346
347        match req.state {
348            TxState::Fluff => self.fluff_tx(req.tx),
349            TxState::Stem { from } => match self.current_state {
350                State::Fluff => {
351                    tracing::debug!(parent: &self.span, "Fluffing stem tx.");
352
353                    self.fluff_tx(req.tx)
354                }
355                State::Stem => {
356                    tracing::trace!(parent: &self.span, "Stemming transaction");
357
358                    self.stem_tx(req.tx, &from)
359                }
360            },
361            TxState::Local => {
362                tracing::debug!(parent: &self.span, "Stemming local tx.");
363
364                self.stem_local_tx(req.tx)
365            }
366        }
367    }
368}