cuprated/txpool/dandelion/
anon_net_service.rs

1use std::{
2    pin::Pin,
3    task::{ready, Context, Poll},
4};
5
6use futures::{Stream, StreamExt, TryStream};
7use tower::Service;
8
9use cuprate_dandelion_tower::{DandelionRouterError, OutboundPeer};
10use cuprate_p2p::NetworkInterface;
11use cuprate_p2p_core::{client::InternalPeerID, NetworkZone};
12
13use crate::{
14    p2p::CrossNetworkInternalPeerId,
15    txpool::dandelion::stem_service::{OutboundPeerStream, StemPeerService},
16};
17
18/// The service to prepare peers on anonymous network zones for sending transactions.
19pub struct AnonTxService<Z: NetworkZone> {
20    outbound_peer_discover: Pin<Box<OutboundPeerStream<Z>>>,
21    pub peer: Option<StemPeerService<Z>>,
22}
23
24impl<Z: NetworkZone> AnonTxService<Z>
25where
26    InternalPeerID<Z::Addr>: Into<CrossNetworkInternalPeerId>,
27{
28    pub fn new(network_interface: NetworkInterface<Z>) -> Self {
29        Self {
30            outbound_peer_discover: Box::pin(OutboundPeerStream::new(network_interface)),
31            peer: None,
32        }
33    }
34
35    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), DandelionRouterError>> {
36        loop {
37            if let Some(peer) = &mut self.peer {
38                if ready!(peer.poll_ready(cx)).is_err() {
39                    self.peer = None;
40
41                    continue;
42                }
43
44                return Poll::Ready(Ok(()));
45            }
46
47            let ret = ready!(self
48                .outbound_peer_discover
49                .as_mut()
50                .try_poll_next(cx)
51                .map_err(DandelionRouterError::OutboundPeerStreamError))
52            .ok_or(DandelionRouterError::OutboundPeerDiscoverExited)??;
53
54            match ret {
55                OutboundPeer::Peer(_, mut svc) => {
56                    let poll = svc.poll_ready(cx);
57                    self.peer = Some(svc);
58                    if ready!(poll).is_err() {
59                        self.peer = None;
60                    }
61                }
62                OutboundPeer::Exhausted => return Poll::Ready(Ok(())),
63            }
64        }
65
66        Poll::Ready(Ok(()))
67    }
68}