cuprated/txpool/dandelion/
stem_service.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    task::{ready, Context, Poll},
5};
6
7use bytes::Bytes;
8use futures::{future::BoxFuture, FutureExt, Stream};
9use tower::Service;
10
11use cuprate_dandelion_tower::{traits::StemRequest, OutboundPeer};
12use cuprate_p2p::{ClientDropGuard, NetworkInterface, PeerSetRequest, PeerSetResponse};
13use cuprate_p2p_core::{
14    client::{Client, InternalPeerID},
15    BroadcastMessage, ClearNet, NetworkZone, PeerRequest, ProtocolRequest,
16};
17use cuprate_wire::protocol::NewTransactions;
18
19use crate::{p2p::CrossNetworkInternalPeerId, txpool::dandelion::DandelionTx};
20
21/// The dandelion outbound peer stream.
22pub struct OutboundPeerStream {
23    clear_net: NetworkInterface<ClearNet>,
24    state: OutboundPeerStreamState,
25}
26
27impl OutboundPeerStream {
28    pub const fn new(clear_net: NetworkInterface<ClearNet>) -> Self {
29        Self {
30            clear_net,
31            state: OutboundPeerStreamState::Standby,
32        }
33    }
34}
35
36impl Stream for OutboundPeerStream {
37    type Item = Result<
38        OutboundPeer<CrossNetworkInternalPeerId, StemPeerService<ClearNet>>,
39        tower::BoxError,
40    >;
41
42    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43        loop {
44            match &mut self.state {
45                OutboundPeerStreamState::Standby => {
46                    let peer_set = self.clear_net.peer_set();
47                    let res = ready!(peer_set.poll_ready(cx));
48
49                    self.state = OutboundPeerStreamState::AwaitingPeer(
50                        peer_set.call(PeerSetRequest::StemPeer).boxed(),
51                    );
52                }
53                OutboundPeerStreamState::AwaitingPeer(fut) => {
54                    let res = ready!(fut.poll_unpin(cx));
55
56                    self.state = OutboundPeerStreamState::Standby;
57
58                    return Poll::Ready(Some(res.map(|res| {
59                        let PeerSetResponse::StemPeer(stem_peer) = res else {
60                            unreachable!()
61                        };
62
63                        match stem_peer {
64                            Some(peer) => OutboundPeer::Peer(
65                                CrossNetworkInternalPeerId::ClearNet(peer.info.id),
66                                StemPeerService(peer),
67                            ),
68                            None => OutboundPeer::Exhausted,
69                        }
70                    })));
71                }
72            }
73        }
74    }
75}
76
77/// The state of the [`OutboundPeerStream`].
78enum OutboundPeerStreamState {
79    /// Standby state.
80    Standby,
81    /// Awaiting a response from the peer-set.
82    AwaitingPeer(BoxFuture<'static, Result<PeerSetResponse<ClearNet>, tower::BoxError>>),
83}
84
85/// The stem service, used to send stem txs.
86pub struct StemPeerService<N: NetworkZone>(ClientDropGuard<N>);
87
88impl<N: NetworkZone> Service<StemRequest<DandelionTx>> for StemPeerService<N> {
89    type Response = <Client<N> as Service<PeerRequest>>::Response;
90    type Error = tower::BoxError;
91    type Future = <Client<N> as Service<PeerRequest>>::Future;
92
93    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
94        self.0.broadcast_client().poll_ready(cx)
95    }
96
97    fn call(&mut self, req: StemRequest<DandelionTx>) -> Self::Future {
98        self.0
99            .broadcast_client()
100            .call(BroadcastMessage::NewTransactions(NewTransactions {
101                txs: vec![req.0 .0],
102                dandelionpp_fluff: false,
103                padding: Bytes::new(),
104            }))
105    }
106}