cuprated/txpool/dandelion/
stem_service.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
use std::{
    future::Future,
    pin::Pin,
    task::{ready, Context, Poll},
};

use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt, Stream};
use tower::Service;

use cuprate_dandelion_tower::{traits::StemRequest, OutboundPeer};
use cuprate_p2p::{ClientDropGuard, NetworkInterface, PeerSetRequest, PeerSetResponse};
use cuprate_p2p_core::{
    client::{Client, InternalPeerID},
    ClearNet, NetworkZone, PeerRequest, ProtocolRequest,
};
use cuprate_wire::protocol::NewTransactions;

use crate::{p2p::CrossNetworkInternalPeerId, txpool::dandelion::DandelionTx};

/// The dandelion outbound peer stream.
pub struct OutboundPeerStream {
    clear_net: NetworkInterface<ClearNet>,
    state: OutboundPeerStreamState,
}

impl OutboundPeerStream {
    pub const fn new(clear_net: NetworkInterface<ClearNet>) -> Self {
        Self {
            clear_net,
            state: OutboundPeerStreamState::Standby,
        }
    }
}

impl Stream for OutboundPeerStream {
    type Item = Result<
        OutboundPeer<CrossNetworkInternalPeerId, StemPeerService<ClearNet>>,
        tower::BoxError,
    >;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match &mut self.state {
                OutboundPeerStreamState::Standby => {
                    let peer_set = self.clear_net.peer_set();
                    let res = ready!(peer_set.poll_ready(cx));

                    self.state = OutboundPeerStreamState::AwaitingPeer(
                        peer_set.call(PeerSetRequest::StemPeer).boxed(),
                    );
                }
                OutboundPeerStreamState::AwaitingPeer(fut) => {
                    let res = ready!(fut.poll_unpin(cx));

                    return Poll::Ready(Some(res.map(|res| {
                        let PeerSetResponse::StemPeer(stem_peer) = res else {
                            unreachable!()
                        };

                        match stem_peer {
                            Some(peer) => OutboundPeer::Peer(
                                CrossNetworkInternalPeerId::ClearNet(peer.info.id),
                                StemPeerService(peer),
                            ),
                            None => OutboundPeer::Exhausted,
                        }
                    })));
                }
            }
        }
    }
}

/// The state of the [`OutboundPeerStream`].
enum OutboundPeerStreamState {
    /// Standby state.
    Standby,
    /// Awaiting a response from the peer-set.
    AwaitingPeer(BoxFuture<'static, Result<PeerSetResponse<ClearNet>, tower::BoxError>>),
}

/// The stem service, used to send stem txs.
pub struct StemPeerService<N: NetworkZone>(ClientDropGuard<N>);

impl<N: NetworkZone> Service<StemRequest<DandelionTx>> for StemPeerService<N> {
    type Response = <Client<N> as Service<PeerRequest>>::Response;
    type Error = tower::BoxError;
    type Future = <Client<N> as Service<PeerRequest>>::Future;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.0.poll_ready(cx)
    }

    fn call(&mut self, req: StemRequest<DandelionTx>) -> Self::Future {
        self.0
            .call(PeerRequest::Protocol(ProtocolRequest::NewTransactions(
                NewTransactions {
                    txs: vec![req.0 .0],
                    dandelionpp_fluff: false,
                    padding: Bytes::new(),
                },
            )))
    }
}