cuprate_dandelion_tower/
router.rs

//! # Dandelion++ Router
//!
//! This module contains [`DandelionRouter`] which is a [`Service`]. It handles keeping the
//! current dandelion++ [`State`] and deciding where to send transactions based on their [`TxState`].
//!
//! ### What The Router Does Not Do
//!
//! It does not handle anything to do with keeping transactions long term, i.e. embargo timers and handling
//! loops in the stem. It is up to implementers to do this if they decide not to use [`DandelionPoolManager`](crate::pool::DandelionPoolManager).
10use std::{
11    collections::HashMap,
12    hash::Hash,
13    marker::PhantomData,
14    pin::Pin,
15    task::{ready, Context, Poll},
16    time::Instant,
17};
18
19use futures::{future::BoxFuture, FutureExt, TryFutureExt, TryStream};
20use rand::{distributions::Bernoulli, prelude::*, thread_rng};
21use tower::Service;
22
23use crate::{
24    traits::{DiffuseRequest, StemRequest},
25    DandelionConfig,
26};
27
28/// An error returned from the [`DandelionRouter`]
29#[derive(thiserror::Error, Debug)]
30pub enum DandelionRouterError {
31    /// This error is probably recoverable so the request should be retried.
32    #[error("Peer chosen to route stem txs to had an err: {0}.")]
33    PeerError(tower::BoxError),
34    /// The broadcast service returned an error.
35    #[error("Broadcast service returned an err: {0}.")]
36    BroadcastError(tower::BoxError),
37    /// The outbound peer stream returned an error, this is critical.
38    #[error("The outbound peer stream returned an err: {0}.")]
39    OutboundPeerStreamError(tower::BoxError),
40    /// The outbound peer discoverer returned [`None`].
41    #[error("The outbound peer discoverer exited.")]
42    OutboundPeerDiscoverExited,
43}
44
/// The result of asking the outbound peer source for another peer.
pub enum OutboundPeer<Id, T> {
    /// An outbound peer: its `Id` together with the service used to talk to it.
    Peer(Id, T),
    /// The peer store has no further peers to hand out.
    Exhausted,
}
52
/// The dandelion++ state the router is in for the current epoch.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum State {
    /// Stem transactions we receive are diffused (broadcast) to all peers.
    Fluff,
    /// Stem transactions we receive are forwarded to a single outbound peer.
    Stem,
}
61
/// How an individual transaction should be routed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TxState<Id> {
    /// The transaction is in the fluff (diffusion) stage.
    Fluff,
    /// The transaction is in the stem stage.
    Stem {
        /// The Id of the peer that sent us this transaction.
        from: Id,
    },
    /// The transaction was created by our own node.
    Local,
}

impl<Id> TxState<Id> {
    /// Returns `true` when the transaction is in the stem stage.
    ///
    /// Both [`TxState::Stem`] and [`TxState::Local`] count as stem stage; only
    /// [`TxState::Fluff`] does not.
    pub const fn is_stem_stage(&self) -> bool {
        !matches!(self, Self::Fluff)
    }
}
84
85/// A request to route a transaction.
86pub struct DandelionRouteReq<Tx, Id> {
87    /// The transaction.
88    pub tx: Tx,
89    /// The transaction state.
90    pub state: TxState<Id>,
91}
92
93/// The dandelion router service.
94pub struct DandelionRouter<P, B, Id, S, Tx> {
95    // pub(crate) is for tests
96    /// A [`Discover`] where we can get outbound peers from.
97    outbound_peer_discover: Pin<Box<P>>,
98    /// A [`Service`] which handle broadcasting (diffusing) transactions.
99    broadcast_svc: B,
100
101    /// The current state.
102    current_state: State,
103    /// The time at which this epoch started.
104    epoch_start: Instant,
105
106    /// The stem our local transactions will be sent to.
107    local_route: Option<Id>,
108    /// A [`HashMap`] linking peer's Ids to Ids in `stem_peers`.
109    stem_routes: HashMap<Id, Id>,
110    /// Peers we are using for stemming.
111    ///
112    /// This will contain peers, even in [`State::Fluff`] to allow us to stem [`TxState::Local`]
113    /// transactions.
114    pub(crate) stem_peers: HashMap<Id, S>,
115
116    /// The distribution to sample to get the [`State`], true is [`State::Fluff`].
117    state_dist: Bernoulli,
118
119    /// The config.
120    config: DandelionConfig,
121
122    /// The routers tracing span.
123    span: tracing::Span,
124
125    _tx: PhantomData<Tx>,
126}
127
128impl<Tx, Id, P, B, S> DandelionRouter<P, B, Id, S, Tx>
129where
130    Id: Hash + Eq + Clone,
131    P: TryStream<Ok = OutboundPeer<Id, S>, Error = tower::BoxError>,
132    B: Service<DiffuseRequest<Tx>, Error = tower::BoxError>,
133    B::Future: Send + 'static,
134    S: Service<StemRequest<Tx>, Error = tower::BoxError>,
135    S::Future: Send + 'static,
136{
137    /// Creates a new [`DandelionRouter`], with the provided services and config.
138    ///
139    /// # Panics
140    /// This function panics if [`DandelionConfig::fluff_probability`] is not `0.0..=1.0`.
141    pub fn new(broadcast_svc: B, outbound_peer_discover: P, config: DandelionConfig) -> Self {
142        // get the current state
143        let state_dist = Bernoulli::new(config.fluff_probability)
144            .expect("Fluff probability was not between 0 and 1");
145
146        let current_state = if state_dist.sample(&mut thread_rng()) {
147            State::Fluff
148        } else {
149            State::Stem
150        };
151
152        Self {
153            outbound_peer_discover: Box::pin(outbound_peer_discover),
154            broadcast_svc,
155            current_state,
156            epoch_start: Instant::now(),
157            local_route: None,
158            stem_routes: HashMap::new(),
159            stem_peers: HashMap::new(),
160            state_dist,
161            config,
162            span: tracing::debug_span!("dandelion_router", state = ?current_state),
163            _tx: PhantomData,
164        }
165    }
166
167    /// This function gets the number of outbound peers from the [`Discover`] required for the selected [`Graph`](crate::Graph).
168    fn poll_prepare_graph(
169        &mut self,
170        cx: &mut Context<'_>,
171    ) -> Poll<Result<(), DandelionRouterError>> {
172        let peers_needed = match self.current_state {
173            State::Stem => self.config.number_of_stems(),
174            // When in the fluff state we only need one peer, the one for our txs.
175            State::Fluff => 1,
176        };
177
178        while self.stem_peers.len() < peers_needed {
179            match ready!(self
180                .outbound_peer_discover
181                .as_mut()
182                .try_poll_next(cx)
183                .map_err(DandelionRouterError::OutboundPeerStreamError))
184            .ok_or(DandelionRouterError::OutboundPeerDiscoverExited)??
185            {
186                OutboundPeer::Peer(key, svc) => {
187                    self.stem_peers.insert(key, svc);
188                }
189                OutboundPeer::Exhausted => {
190                    tracing::warn!("Failed to retrieve enough outbound peers for optimal dandelion++, privacy may be degraded.");
191                    return Poll::Ready(Ok(()));
192                }
193            }
194        }
195
196        Poll::Ready(Ok(()))
197    }
198
199    fn fluff_tx(&mut self, tx: Tx) -> BoxFuture<'static, Result<State, DandelionRouterError>> {
200        self.broadcast_svc
201            .call(DiffuseRequest(tx))
202            .map_ok(|_| State::Fluff)
203            .map_err(DandelionRouterError::BroadcastError)
204            .boxed()
205    }
206
207    fn stem_tx(
208        &mut self,
209        tx: Tx,
210        from: &Id,
211    ) -> BoxFuture<'static, Result<State, DandelionRouterError>> {
212        if self.stem_peers.is_empty() {
213            tracing::debug!("Stem peers are empty, fluffing stem transaction.");
214            return self.fluff_tx(tx);
215        }
216
217        loop {
218            let stem_route = self.stem_routes.entry(from.clone()).or_insert_with(|| {
219                self.stem_peers
220                    .iter()
221                    .choose(&mut thread_rng())
222                    .expect("No peers in `stem_peers` was poll_ready called?")
223                    .0
224                    .clone()
225            });
226
227            let Some(peer) = self.stem_peers.get_mut(stem_route) else {
228                self.stem_routes.remove(from);
229                continue;
230            };
231
232            return peer
233                .call(StemRequest(tx))
234                .map_ok(|_| State::Stem)
235                .map_err(DandelionRouterError::PeerError)
236                .boxed();
237        }
238    }
239
240    fn stem_local_tx(&mut self, tx: Tx) -> BoxFuture<'static, Result<State, DandelionRouterError>> {
241        if self.stem_peers.is_empty() {
242            tracing::warn!("Stem peers are empty, no outbound connections to stem local tx to, fluffing instead, privacy will be degraded.");
243            return self.fluff_tx(tx);
244        }
245
246        loop {
247            let stem_route = self.local_route.get_or_insert_with(|| {
248                self.stem_peers
249                    .iter()
250                    .choose(&mut thread_rng())
251                    .expect("No peers in `stem_peers` was poll_ready called?")
252                    .0
253                    .clone()
254            });
255
256            let Some(peer) = self.stem_peers.get_mut(stem_route) else {
257                self.local_route.take();
258                continue;
259            };
260
261            return peer
262                .call(StemRequest(tx))
263                .map_ok(|_| State::Stem)
264                .map_err(DandelionRouterError::PeerError)
265                .boxed();
266        }
267    }
268}
269
270impl<Tx, Id, P, B, S> Service<DandelionRouteReq<Tx, Id>> for DandelionRouter<P, B, Id, S, Tx>
271where
272    Id: Hash + Eq + Clone,
273    P: TryStream<Ok = OutboundPeer<Id, S>, Error = tower::BoxError>,
274    B: Service<DiffuseRequest<Tx>, Error = tower::BoxError>,
275    B::Future: Send + 'static,
276    S: Service<StemRequest<Tx>, Error = tower::BoxError>,
277    S::Future: Send + 'static,
278{
279    type Response = State;
280    type Error = DandelionRouterError;
281    type Future = BoxFuture<'static, Result<State, DandelionRouterError>>;
282
283    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
284        if self.epoch_start.elapsed() > self.config.epoch_duration {
285            // clear all the stem routing data.
286            self.stem_peers.clear();
287            self.stem_routes.clear();
288            self.local_route.take();
289
290            self.current_state = if self.state_dist.sample(&mut thread_rng()) {
291                State::Fluff
292            } else {
293                State::Stem
294            };
295
296            self.span
297                .record("state", format!("{:?}", self.current_state));
298            tracing::debug!(parent: &self.span, "Starting new d++ epoch",);
299
300            self.epoch_start = Instant::now();
301        }
302
303        let mut peers_pending = false;
304
305        let span = &self.span;
306
307        self.stem_peers
308            .retain(|_, peer_svc| match peer_svc.poll_ready(cx) {
309                Poll::Ready(res) => res
310                    .inspect_err(|e| {
311                        tracing::debug!(
312                            parent: span,
313                            "Peer returned an error on `poll_ready`: {e}, removing from router.",
314                        );
315                    })
316                    .is_ok(),
317                Poll::Pending => {
318                    // Pending peers should be kept - they have not errored yet.
319                    peers_pending = true;
320                    true
321                }
322            });
323
324        if peers_pending {
325            return Poll::Pending;
326        }
327
328        // now we have removed the failed peers check if we still have enough for the graph chosen.
329        ready!(self.poll_prepare_graph(cx)?);
330
331        ready!(self
332            .broadcast_svc
333            .poll_ready(cx)
334            .map_err(DandelionRouterError::BroadcastError)?);
335
336        Poll::Ready(Ok(()))
337    }
338
339    fn call(&mut self, req: DandelionRouteReq<Tx, Id>) -> Self::Future {
340        tracing::trace!(parent: &self.span,  "Handling route request.");
341
342        match req.state {
343            TxState::Fluff => self.fluff_tx(req.tx),
344            TxState::Stem { from } => match self.current_state {
345                State::Fluff => {
346                    tracing::debug!(parent: &self.span, "Fluffing stem tx.");
347
348                    self.fluff_tx(req.tx)
349                }
350                State::Stem => {
351                    tracing::trace!(parent: &self.span, "Steming transaction");
352
353                    self.stem_tx(req.tx, &from)
354                }
355            },
356            TxState::Local => {
357                tracing::debug!(parent: &self.span, "Steming local tx.");
358
359                self.stem_local_tx(req.tx)
360            }
361        }
362    }
363}