1use std::{
11 collections::HashMap,
12 hash::Hash,
13 marker::PhantomData,
14 pin::Pin,
15 task::{ready, Context, Poll},
16 time::Instant,
17};
18
19use futures::{future::BoxFuture, FutureExt, TryFutureExt, TryStream};
20use rand::{distributions::Bernoulli, prelude::*, thread_rng};
21use tower::Service;
22
23use crate::{
24 traits::{DiffuseRequest, StemRequest},
25 DandelionConfig,
26};
27
28#[derive(thiserror::Error, Debug)]
30pub enum DandelionRouterError {
31 #[error("Peer chosen to route stem txs to had an err: {0}.")]
33 PeerError(tower::BoxError),
34 #[error("Broadcast service returned an err: {0}.")]
36 BroadcastError(tower::BoxError),
37 #[error("The outbound peer stream returned an err: {0}.")]
39 OutboundPeerStreamError(tower::BoxError),
40 #[error("The outbound peer discoverer exited.")]
42 OutboundPeerDiscoverExited,
43}
44
45pub enum OutboundPeer<Id, T> {
47 Peer(Id, T),
49 Exhausted,
51}
52
53#[derive(Debug, Copy, Clone, Eq, PartialEq)]
55pub enum State {
56 Fluff,
58 Stem,
60}
61
62#[derive(Debug, Clone, Eq, PartialEq)]
64pub enum TxState<Id> {
65 Fluff,
67 Stem {
69 from: Id,
71 },
72 Local,
74}
75
76impl<Id> TxState<Id> {
77 pub const fn is_stem_stage(&self) -> bool {
81 matches!(self, Self::Local | Self::Stem { .. })
82 }
83}
84
85pub struct DandelionRouteReq<Tx, Id> {
87 pub tx: Tx,
89 pub state: TxState<Id>,
91}
92
93pub struct DandelionRouter<P, B, Id, S, Tx> {
95 outbound_peer_discover: Pin<Box<P>>,
98 broadcast_svc: B,
100
101 current_state: State,
103 epoch_start: Instant,
105
106 local_route: Option<Id>,
108 stem_routes: HashMap<Id, Id>,
110 pub(crate) stem_peers: HashMap<Id, S>,
115
116 state_dist: Bernoulli,
118
119 config: DandelionConfig,
121
122 span: tracing::Span,
124
125 _tx: PhantomData<Tx>,
126}
127
128impl<Tx, Id, P, B, S> DandelionRouter<P, B, Id, S, Tx>
129where
130 Id: Hash + Eq + Clone,
131 P: TryStream<Ok = OutboundPeer<Id, S>, Error = tower::BoxError>,
132 B: Service<DiffuseRequest<Tx>, Error = tower::BoxError>,
133 B::Future: Send + 'static,
134 S: Service<StemRequest<Tx>, Error = tower::BoxError>,
135 S::Future: Send + 'static,
136{
137 pub fn new(broadcast_svc: B, outbound_peer_discover: P, config: DandelionConfig) -> Self {
142 let state_dist = Bernoulli::new(config.fluff_probability)
144 .expect("Fluff probability was not between 0 and 1");
145
146 let current_state = if state_dist.sample(&mut thread_rng()) {
147 State::Fluff
148 } else {
149 State::Stem
150 };
151
152 Self {
153 outbound_peer_discover: Box::pin(outbound_peer_discover),
154 broadcast_svc,
155 current_state,
156 epoch_start: Instant::now(),
157 local_route: None,
158 stem_routes: HashMap::new(),
159 stem_peers: HashMap::new(),
160 state_dist,
161 config,
162 span: tracing::debug_span!("dandelion_router", state = ?current_state),
163 _tx: PhantomData,
164 }
165 }
166
167 fn poll_prepare_graph(
169 &mut self,
170 cx: &mut Context<'_>,
171 ) -> Poll<Result<(), DandelionRouterError>> {
172 let peers_needed = match self.current_state {
173 State::Stem => self.config.number_of_stems(),
174 State::Fluff => 1,
176 };
177
178 while self.stem_peers.len() < peers_needed {
179 match ready!(self
180 .outbound_peer_discover
181 .as_mut()
182 .try_poll_next(cx)
183 .map_err(DandelionRouterError::OutboundPeerStreamError))
184 .ok_or(DandelionRouterError::OutboundPeerDiscoverExited)??
185 {
186 OutboundPeer::Peer(key, mut svc) => {
187 let poll = svc.poll_ready(cx);
188
189 self.stem_peers.insert(key.clone(), svc);
190
191 if ready!(poll).is_err() {
192 self.stem_peers.remove(&key);
193 }
194 }
195 OutboundPeer::Exhausted => {
196 tracing::warn!("Failed to retrieve enough outbound peers for optimal dandelion++, privacy may be degraded.");
197 return Poll::Ready(Ok(()));
198 }
199 }
200 }
201
202 Poll::Ready(Ok(()))
203 }
204
205 fn fluff_tx(&mut self, tx: Tx) -> BoxFuture<'static, Result<State, DandelionRouterError>> {
206 self.broadcast_svc
207 .call(DiffuseRequest(tx))
208 .map_ok(|_| State::Fluff)
209 .map_err(DandelionRouterError::BroadcastError)
210 .boxed()
211 }
212
213 fn stem_tx(
214 &mut self,
215 tx: Tx,
216 from: &Id,
217 ) -> BoxFuture<'static, Result<State, DandelionRouterError>> {
218 if self.stem_peers.is_empty() {
219 tracing::debug!("Stem peers are empty, fluffing stem transaction.");
220 return self.fluff_tx(tx);
221 }
222
223 loop {
224 let stem_route = self.stem_routes.entry(from.clone()).or_insert_with(|| {
225 self.stem_peers
226 .iter()
227 .choose(&mut thread_rng())
228 .expect("No peers in `stem_peers` was poll_ready called?")
229 .0
230 .clone()
231 });
232
233 let Some(peer) = self.stem_peers.get_mut(stem_route) else {
234 self.stem_routes.remove(from);
235 continue;
236 };
237
238 return peer
239 .call(StemRequest(tx))
240 .map_ok(|_| State::Stem)
241 .map_err(DandelionRouterError::PeerError)
242 .boxed();
243 }
244 }
245
246 fn stem_local_tx(&mut self, tx: Tx) -> BoxFuture<'static, Result<State, DandelionRouterError>> {
247 if self.stem_peers.is_empty() {
248 tracing::warn!("Stem peers are empty, no outbound connections to stem local tx to, fluffing instead, privacy will be degraded.");
249 return self.fluff_tx(tx);
250 }
251
252 loop {
253 let stem_route = self.local_route.get_or_insert_with(|| {
254 self.stem_peers
255 .iter()
256 .choose(&mut thread_rng())
257 .expect("No peers in `stem_peers` was poll_ready called?")
258 .0
259 .clone()
260 });
261
262 let Some(peer) = self.stem_peers.get_mut(stem_route) else {
263 self.local_route.take();
264 continue;
265 };
266
267 return peer
268 .call(StemRequest(tx))
269 .map_ok(|_| State::Stem)
270 .map_err(DandelionRouterError::PeerError)
271 .boxed();
272 }
273 }
274}
275
276impl<Tx, Id, P, B, S> Service<DandelionRouteReq<Tx, Id>> for DandelionRouter<P, B, Id, S, Tx>
277where
278 Id: Hash + Eq + Clone,
279 P: TryStream<Ok = OutboundPeer<Id, S>, Error = tower::BoxError>,
280 B: Service<DiffuseRequest<Tx>, Error = tower::BoxError>,
281 B::Future: Send + 'static,
282 S: Service<StemRequest<Tx>, Error = tower::BoxError>,
283 S::Future: Send + 'static,
284{
285 type Response = State;
286 type Error = DandelionRouterError;
287 type Future = BoxFuture<'static, Result<State, DandelionRouterError>>;
288
289 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
290 if self.epoch_start.elapsed() > self.config.epoch_duration {
291 self.stem_peers.clear();
293 self.stem_routes.clear();
294 self.local_route.take();
295
296 self.current_state = if self.state_dist.sample(&mut thread_rng()) {
297 State::Fluff
298 } else {
299 State::Stem
300 };
301
302 self.span = tracing::debug_span!("dandelion_router", state = ?self.current_state);
303 tracing::debug!(parent: &self.span, "Starting new d++ epoch",);
304
305 self.epoch_start = Instant::now();
306 }
307
308 let mut peers_pending = false;
309
310 let span = &self.span;
311
312 self.stem_peers
313 .retain(|_, peer_svc| match peer_svc.poll_ready(cx) {
314 Poll::Ready(res) => res
315 .inspect_err(|e| {
316 tracing::debug!(
317 parent: span,
318 "Peer returned an error on `poll_ready`: {e}, removing from router.",
319 );
320 })
321 .is_ok(),
322 Poll::Pending => {
323 peers_pending = true;
325 true
326 }
327 });
328
329 if peers_pending {
330 return Poll::Pending;
331 }
332
333 ready!(self.poll_prepare_graph(cx)?);
335
336 ready!(self
337 .broadcast_svc
338 .poll_ready(cx)
339 .map_err(DandelionRouterError::BroadcastError)?);
340
341 Poll::Ready(Ok(()))
342 }
343
344 fn call(&mut self, req: DandelionRouteReq<Tx, Id>) -> Self::Future {
345 tracing::trace!(parent: &self.span, "Handling route request.");
346
347 match req.state {
348 TxState::Fluff => self.fluff_tx(req.tx),
349 TxState::Stem { from } => match self.current_state {
350 State::Fluff => {
351 tracing::debug!(parent: &self.span, "Fluffing stem tx.");
352
353 self.fluff_tx(req.tx)
354 }
355 State::Stem => {
356 tracing::trace!(parent: &self.span, "Stemming transaction");
357
358 self.stem_tx(req.tx, &from)
359 }
360 },
361 TxState::Local => {
362 tracing::debug!(parent: &self.span, "Stemming local tx.");
363
364 self.stem_local_tx(req.tx)
365 }
366 }
367 }
368}