cuprated/txpool/dandelion/
stem_service.rs1use std::{
2 future::Future,
3 pin::Pin,
4 task::{ready, Context, Poll},
5};
6
7use bytes::Bytes;
8use futures::{future::BoxFuture, FutureExt, Stream};
9use tower::Service;
10
11use cuprate_dandelion_tower::{traits::StemRequest, OutboundPeer};
12use cuprate_p2p::{ClientDropGuard, NetworkInterface, PeerSetRequest, PeerSetResponse};
13use cuprate_p2p_core::{
14 client::{Client, InternalPeerID},
15 BroadcastMessage, ClearNet, NetworkZone, PeerRequest, ProtocolRequest,
16};
17use cuprate_wire::protocol::NewTransactions;
18
19use crate::{p2p::CrossNetworkInternalPeerId, txpool::dandelion::DandelionTx};
20
21pub struct OutboundPeerStream {
23 clear_net: NetworkInterface<ClearNet>,
24 state: OutboundPeerStreamState,
25}
26
27impl OutboundPeerStream {
28 pub const fn new(clear_net: NetworkInterface<ClearNet>) -> Self {
29 Self {
30 clear_net,
31 state: OutboundPeerStreamState::Standby,
32 }
33 }
34}
35
36impl Stream for OutboundPeerStream {
37 type Item = Result<
38 OutboundPeer<CrossNetworkInternalPeerId, StemPeerService<ClearNet>>,
39 tower::BoxError,
40 >;
41
42 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43 loop {
44 match &mut self.state {
45 OutboundPeerStreamState::Standby => {
46 let peer_set = self.clear_net.peer_set();
47 let res = ready!(peer_set.poll_ready(cx));
48
49 self.state = OutboundPeerStreamState::AwaitingPeer(
50 peer_set.call(PeerSetRequest::StemPeer).boxed(),
51 );
52 }
53 OutboundPeerStreamState::AwaitingPeer(fut) => {
54 let res = ready!(fut.poll_unpin(cx));
55
56 self.state = OutboundPeerStreamState::Standby;
57
58 return Poll::Ready(Some(res.map(|res| {
59 let PeerSetResponse::StemPeer(stem_peer) = res else {
60 unreachable!()
61 };
62
63 match stem_peer {
64 Some(peer) => OutboundPeer::Peer(
65 CrossNetworkInternalPeerId::ClearNet(peer.info.id),
66 StemPeerService(peer),
67 ),
68 None => OutboundPeer::Exhausted,
69 }
70 })));
71 }
72 }
73 }
74 }
75}
76
77enum OutboundPeerStreamState {
79 Standby,
81 AwaitingPeer(BoxFuture<'static, Result<PeerSetResponse<ClearNet>, tower::BoxError>>),
83}
84
85pub struct StemPeerService<N: NetworkZone>(ClientDropGuard<N>);
87
88impl<N: NetworkZone> Service<StemRequest<DandelionTx>> for StemPeerService<N> {
89 type Response = <Client<N> as Service<PeerRequest>>::Response;
90 type Error = tower::BoxError;
91 type Future = <Client<N> as Service<PeerRequest>>::Future;
92
93 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
94 self.0.broadcast_client().poll_ready(cx)
95 }
96
97 fn call(&mut self, req: StemRequest<DandelionTx>) -> Self::Future {
98 self.0
99 .broadcast_client()
100 .call(BroadcastMessage::NewTransactions(NewTransactions {
101 txs: vec![req.0 .0],
102 dandelionpp_fluff: false,
103 padding: Bytes::new(),
104 }))
105 }
106}