cuprated/txpool/dandelion/
stem_service.rs
1use std::{
2 future::Future,
3 pin::Pin,
4 task::{ready, Context, Poll},
5};
6
7use bytes::Bytes;
8use futures::{future::BoxFuture, FutureExt, Stream};
9use tower::Service;
10
11use cuprate_dandelion_tower::{traits::StemRequest, OutboundPeer};
12use cuprate_p2p::{ClientDropGuard, NetworkInterface, PeerSetRequest, PeerSetResponse};
13use cuprate_p2p_core::{
14 client::{Client, InternalPeerID},
15 BroadcastMessage, ClearNet, NetworkZone, PeerRequest, ProtocolRequest, Tor,
16};
17use cuprate_wire::protocol::NewTransactions;
18
19use crate::{p2p::CrossNetworkInternalPeerId, txpool::dandelion::DandelionTx};
20
21pub struct OutboundPeerStream<Z: NetworkZone> {
23 network_interface: NetworkInterface<Z>,
24 state: OutboundPeerStreamState<Z>,
25}
26
27impl<Z: NetworkZone> OutboundPeerStream<Z> {
28 pub const fn new(network_interface: NetworkInterface<Z>) -> Self {
29 Self {
30 network_interface,
31 state: OutboundPeerStreamState::Standby,
32 }
33 }
34}
35
36impl<Z: NetworkZone> Stream for OutboundPeerStream<Z>
37where
38 InternalPeerID<Z::Addr>: Into<CrossNetworkInternalPeerId>,
39{
40 type Item =
41 Result<OutboundPeer<CrossNetworkInternalPeerId, StemPeerService<Z>>, tower::BoxError>;
42
43 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
44 loop {
45 match &mut self.state {
46 OutboundPeerStreamState::Standby => {
47 let peer_set = self.network_interface.peer_set();
48 let res = ready!(peer_set.poll_ready(cx));
49
50 self.state = OutboundPeerStreamState::AwaitingPeer(
51 peer_set.call(PeerSetRequest::StemPeer).boxed(),
52 );
53 }
54 OutboundPeerStreamState::AwaitingPeer(fut) => {
55 let res = ready!(fut.poll_unpin(cx));
56
57 self.state = OutboundPeerStreamState::Standby;
58
59 return Poll::Ready(Some(res.map(|res| {
60 let PeerSetResponse::StemPeer(stem_peer) = res else {
61 unreachable!()
62 };
63
64 match stem_peer {
65 Some(peer) => {
66 OutboundPeer::Peer(peer.info.id.into(), StemPeerService(peer))
67 }
68 None => OutboundPeer::Exhausted,
69 }
70 })));
71 }
72 }
73 }
74 }
75}
76
77enum OutboundPeerStreamState<Z: NetworkZone> {
79 Standby,
81 AwaitingPeer(BoxFuture<'static, Result<PeerSetResponse<Z>, tower::BoxError>>),
83}
84
85pub struct StemPeerService<N: NetworkZone>(ClientDropGuard<N>);
87
88impl<N: NetworkZone> Service<StemRequest<DandelionTx>> for StemPeerService<N> {
89 type Response = <Client<N> as Service<PeerRequest>>::Response;
90 type Error = tower::BoxError;
91 type Future = <Client<N> as Service<PeerRequest>>::Future;
92
93 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
94 self.0.broadcast_client().poll_ready(cx)
95 }
96
97 fn call(&mut self, req: StemRequest<DandelionTx>) -> Self::Future {
98 self.0
99 .broadcast_client()
100 .call(BroadcastMessage::NewTransactions(NewTransactions {
101 txs: vec![req.0 .0],
102 dandelionpp_fluff: false,
103 padding: Bytes::new(),
104 }))
105 }
106}