cuprated/txpool/dandelion/
anon_net_service.rs1use std::{
2 pin::Pin,
3 task::{ready, Context, Poll},
4};
5
6use futures::{Stream, StreamExt, TryStream};
7use tower::Service;
8
9use cuprate_dandelion_tower::{DandelionRouterError, OutboundPeer};
10use cuprate_p2p::NetworkInterface;
11use cuprate_p2p_core::{client::InternalPeerID, NetworkZone};
12
13use crate::{
14 p2p::CrossNetworkInternalPeerId,
15 txpool::dandelion::stem_service::{OutboundPeerStream, StemPeerService},
16};
17
18pub struct AnonTxService<Z: NetworkZone> {
20 outbound_peer_discover: Pin<Box<OutboundPeerStream<Z>>>,
21 pub peer: Option<StemPeerService<Z>>,
22}
23
24impl<Z: NetworkZone> AnonTxService<Z>
25where
26 InternalPeerID<Z::Addr>: Into<CrossNetworkInternalPeerId>,
27{
28 pub fn new(network_interface: NetworkInterface<Z>) -> Self {
29 Self {
30 outbound_peer_discover: Box::pin(OutboundPeerStream::new(network_interface)),
31 peer: None,
32 }
33 }
34
35 pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), DandelionRouterError>> {
36 loop {
37 if let Some(peer) = &mut self.peer {
38 if ready!(peer.poll_ready(cx)).is_err() {
39 self.peer = None;
40
41 continue;
42 }
43
44 return Poll::Ready(Ok(()));
45 }
46
47 let ret = ready!(self
48 .outbound_peer_discover
49 .as_mut()
50 .try_poll_next(cx)
51 .map_err(DandelionRouterError::OutboundPeerStreamError))
52 .ok_or(DandelionRouterError::OutboundPeerDiscoverExited)??;
53
54 match ret {
55 OutboundPeer::Peer(_, mut svc) => {
56 let poll = svc.poll_ready(cx);
57 self.peer = Some(svc);
58 if ready!(poll).is_err() {
59 self.peer = None;
60 }
61 }
62 OutboundPeer::Exhausted => return Poll::Ready(Ok(())),
63 }
64 }
65
66 Poll::Ready(Ok(()))
67 }
68}