1use std::{
11 collections::HashMap,
12 hash::Hash,
13 marker::PhantomData,
14 pin::Pin,
15 task::{ready, Context, Poll},
16 time::Instant,
17};
18
19use futures::{future::BoxFuture, FutureExt, TryFutureExt, TryStream};
20use rand::{distributions::Bernoulli, prelude::*, thread_rng};
21use tower::Service;
22
23use crate::{
24 traits::{DiffuseRequest, StemRequest},
25 DandelionConfig,
26};
27
28#[derive(thiserror::Error, Debug)]
30pub enum DandelionRouterError {
31 #[error("Peer chosen to route stem txs to had an err: {0}.")]
33 PeerError(tower::BoxError),
34 #[error("Broadcast service returned an err: {0}.")]
36 BroadcastError(tower::BoxError),
37 #[error("The outbound peer stream returned an err: {0}.")]
39 OutboundPeerStreamError(tower::BoxError),
40 #[error("The outbound peer discoverer exited.")]
42 OutboundPeerDiscoverExited,
43}
44
45pub enum OutboundPeer<Id, T> {
47 Peer(Id, T),
49 Exhausted,
51}
52
53#[derive(Debug, Copy, Clone, Eq, PartialEq)]
55pub enum State {
56 Fluff,
58 Stem,
60}
61
62#[derive(Debug, Clone, Eq, PartialEq)]
64pub enum TxState<Id> {
65 Fluff,
67 Stem {
69 from: Id,
71 },
72 Local,
74}
75
76impl<Id> TxState<Id> {
77 pub const fn is_stem_stage(&self) -> bool {
81 matches!(self, Self::Local | Self::Stem { .. })
82 }
83}
84
85pub struct DandelionRouteReq<Tx, Id> {
87 pub tx: Tx,
89 pub state: TxState<Id>,
91}
92
93pub struct DandelionRouter<P, B, Id, S, Tx> {
95 outbound_peer_discover: Pin<Box<P>>,
98 broadcast_svc: B,
100
101 current_state: State,
103 epoch_start: Instant,
105
106 local_route: Option<Id>,
108 stem_routes: HashMap<Id, Id>,
110 pub(crate) stem_peers: HashMap<Id, S>,
115
116 state_dist: Bernoulli,
118
119 config: DandelionConfig,
121
122 span: tracing::Span,
124
125 _tx: PhantomData<Tx>,
126}
127
128impl<Tx, Id, P, B, S> DandelionRouter<P, B, Id, S, Tx>
129where
130 Id: Hash + Eq + Clone,
131 P: TryStream<Ok = OutboundPeer<Id, S>, Error = tower::BoxError>,
132 B: Service<DiffuseRequest<Tx>, Error = tower::BoxError>,
133 B::Future: Send + 'static,
134 S: Service<StemRequest<Tx>, Error = tower::BoxError>,
135 S::Future: Send + 'static,
136{
137 pub fn new(broadcast_svc: B, outbound_peer_discover: P, config: DandelionConfig) -> Self {
142 let state_dist = Bernoulli::new(config.fluff_probability)
144 .expect("Fluff probability was not between 0 and 1");
145
146 let current_state = if state_dist.sample(&mut thread_rng()) {
147 State::Fluff
148 } else {
149 State::Stem
150 };
151
152 Self {
153 outbound_peer_discover: Box::pin(outbound_peer_discover),
154 broadcast_svc,
155 current_state,
156 epoch_start: Instant::now(),
157 local_route: None,
158 stem_routes: HashMap::new(),
159 stem_peers: HashMap::new(),
160 state_dist,
161 config,
162 span: tracing::debug_span!("dandelion_router", state = ?current_state),
163 _tx: PhantomData,
164 }
165 }
166
167 fn poll_prepare_graph(
169 &mut self,
170 cx: &mut Context<'_>,
171 ) -> Poll<Result<(), DandelionRouterError>> {
172 let peers_needed = match self.current_state {
173 State::Stem => self.config.number_of_stems(),
174 State::Fluff => 1,
176 };
177
178 while self.stem_peers.len() < peers_needed {
179 match ready!(self
180 .outbound_peer_discover
181 .as_mut()
182 .try_poll_next(cx)
183 .map_err(DandelionRouterError::OutboundPeerStreamError))
184 .ok_or(DandelionRouterError::OutboundPeerDiscoverExited)??
185 {
186 OutboundPeer::Peer(key, svc) => {
187 self.stem_peers.insert(key, svc);
188 }
189 OutboundPeer::Exhausted => {
190 tracing::warn!("Failed to retrieve enough outbound peers for optimal dandelion++, privacy may be degraded.");
191 return Poll::Ready(Ok(()));
192 }
193 }
194 }
195
196 Poll::Ready(Ok(()))
197 }
198
199 fn fluff_tx(&mut self, tx: Tx) -> BoxFuture<'static, Result<State, DandelionRouterError>> {
200 self.broadcast_svc
201 .call(DiffuseRequest(tx))
202 .map_ok(|_| State::Fluff)
203 .map_err(DandelionRouterError::BroadcastError)
204 .boxed()
205 }
206
207 fn stem_tx(
208 &mut self,
209 tx: Tx,
210 from: &Id,
211 ) -> BoxFuture<'static, Result<State, DandelionRouterError>> {
212 if self.stem_peers.is_empty() {
213 tracing::debug!("Stem peers are empty, fluffing stem transaction.");
214 return self.fluff_tx(tx);
215 }
216
217 loop {
218 let stem_route = self.stem_routes.entry(from.clone()).or_insert_with(|| {
219 self.stem_peers
220 .iter()
221 .choose(&mut thread_rng())
222 .expect("No peers in `stem_peers` was poll_ready called?")
223 .0
224 .clone()
225 });
226
227 let Some(peer) = self.stem_peers.get_mut(stem_route) else {
228 self.stem_routes.remove(from);
229 continue;
230 };
231
232 return peer
233 .call(StemRequest(tx))
234 .map_ok(|_| State::Stem)
235 .map_err(DandelionRouterError::PeerError)
236 .boxed();
237 }
238 }
239
240 fn stem_local_tx(&mut self, tx: Tx) -> BoxFuture<'static, Result<State, DandelionRouterError>> {
241 if self.stem_peers.is_empty() {
242 tracing::warn!("Stem peers are empty, no outbound connections to stem local tx to, fluffing instead, privacy will be degraded.");
243 return self.fluff_tx(tx);
244 }
245
246 loop {
247 let stem_route = self.local_route.get_or_insert_with(|| {
248 self.stem_peers
249 .iter()
250 .choose(&mut thread_rng())
251 .expect("No peers in `stem_peers` was poll_ready called?")
252 .0
253 .clone()
254 });
255
256 let Some(peer) = self.stem_peers.get_mut(stem_route) else {
257 self.local_route.take();
258 continue;
259 };
260
261 return peer
262 .call(StemRequest(tx))
263 .map_ok(|_| State::Stem)
264 .map_err(DandelionRouterError::PeerError)
265 .boxed();
266 }
267 }
268}
269
270impl<Tx, Id, P, B, S> Service<DandelionRouteReq<Tx, Id>> for DandelionRouter<P, B, Id, S, Tx>
271where
272 Id: Hash + Eq + Clone,
273 P: TryStream<Ok = OutboundPeer<Id, S>, Error = tower::BoxError>,
274 B: Service<DiffuseRequest<Tx>, Error = tower::BoxError>,
275 B::Future: Send + 'static,
276 S: Service<StemRequest<Tx>, Error = tower::BoxError>,
277 S::Future: Send + 'static,
278{
279 type Response = State;
280 type Error = DandelionRouterError;
281 type Future = BoxFuture<'static, Result<State, DandelionRouterError>>;
282
283 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
284 if self.epoch_start.elapsed() > self.config.epoch_duration {
285 self.stem_peers.clear();
287 self.stem_routes.clear();
288 self.local_route.take();
289
290 self.current_state = if self.state_dist.sample(&mut thread_rng()) {
291 State::Fluff
292 } else {
293 State::Stem
294 };
295
296 self.span
297 .record("state", format!("{:?}", self.current_state));
298 tracing::debug!(parent: &self.span, "Starting new d++ epoch",);
299
300 self.epoch_start = Instant::now();
301 }
302
303 let mut peers_pending = false;
304
305 let span = &self.span;
306
307 self.stem_peers
308 .retain(|_, peer_svc| match peer_svc.poll_ready(cx) {
309 Poll::Ready(res) => res
310 .inspect_err(|e| {
311 tracing::debug!(
312 parent: span,
313 "Peer returned an error on `poll_ready`: {e}, removing from router.",
314 );
315 })
316 .is_ok(),
317 Poll::Pending => {
318 peers_pending = true;
320 true
321 }
322 });
323
324 if peers_pending {
325 return Poll::Pending;
326 }
327
328 ready!(self.poll_prepare_graph(cx)?);
330
331 ready!(self
332 .broadcast_svc
333 .poll_ready(cx)
334 .map_err(DandelionRouterError::BroadcastError)?);
335
336 Poll::Ready(Ok(()))
337 }
338
339 fn call(&mut self, req: DandelionRouteReq<Tx, Id>) -> Self::Future {
340 tracing::trace!(parent: &self.span, "Handling route request.");
341
342 match req.state {
343 TxState::Fluff => self.fluff_tx(req.tx),
344 TxState::Stem { from } => match self.current_state {
345 State::Fluff => {
346 tracing::debug!(parent: &self.span, "Fluffing stem tx.");
347
348 self.fluff_tx(req.tx)
349 }
350 State::Stem => {
351 tracing::trace!(parent: &self.span, "Steming transaction");
352
353 self.stem_tx(req.tx, &from)
354 }
355 },
356 TxState::Local => {
357 tracing::debug!(parent: &self.span, "Steming local tx.");
358
359 self.stem_local_tx(req.tx)
360 }
361 }
362 }
363}