cuprate_p2p_core/client/
connector.rs

1//! Connector
2//!
3//! This module handles connecting to peers and giving the sink/stream to the handshaker which will then
4//! perform a handshake and create a [`Client`].
5//!
6//! This is where outbound connections are created.
7use std::{
8    future::Future,
9    pin::Pin,
10    task::{Context, Poll},
11};
12
13use futures::{FutureExt, Stream};
14use tokio::sync::OwnedSemaphorePermit;
15use tower::{Service, ServiceExt};
16
17use crate::{
18    client::{handshaker::HandShaker, Client, DoHandshakeRequest, HandshakeError, InternalPeerID},
19    AddressBook, BroadcastMessage, ConnectionDirection, CoreSyncSvc, NetworkZone,
20    ProtocolRequestHandlerMaker, Transport,
21};
22
23/// A request to connect to a peer.
24pub struct ConnectRequest<Z: NetworkZone> {
25    /// The peer's address.
26    pub addr: Z::Addr,
27    /// A permit which will be held be the connection allowing you to set limits on the number of
28    /// connections.
29    ///
30    /// This doesn't have to be set.
31    pub permit: Option<OwnedSemaphorePermit>,
32}
33
34/// The connector service, this service connects to peer and returns the [`Client`].
35#[derive(Clone)]
36pub struct Connector<Z: NetworkZone, T: Transport<Z>, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr> {
37    handshaker: HandShaker<Z, T, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>,
38}
39
40impl<Z: NetworkZone, T: Transport<Z>, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>
41    Connector<Z, T, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>
42{
43    /// Create a new connector from a handshaker.
44    pub const fn new(
45        handshaker: HandShaker<Z, T, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>,
46    ) -> Self {
47        Self { handshaker }
48    }
49}
50
51impl<Z: NetworkZone, T, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr, BrdcstStrm>
52    Service<ConnectRequest<Z>> for Connector<Z, T, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>
53where
54    T: Transport<Z>,
55    AdrBook: AddressBook<Z> + Clone,
56    CSync: CoreSyncSvc + Clone,
57    ProtoHdlrMkr: ProtocolRequestHandlerMaker<Z> + Clone,
58    BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
59    BrdcstStrmMkr: Fn(InternalPeerID<Z::Addr>) -> BrdcstStrm + Clone + Send + 'static,
60{
61    type Response = Client<Z>;
62    type Error = HandshakeError;
63    type Future =
64        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
65
66    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
67        Poll::Ready(Ok(()))
68    }
69
70    fn call(&mut self, req: ConnectRequest<Z>) -> Self::Future {
71        tracing::debug!("Connecting to peer: {}", req.addr);
72        let mut handshaker = self.handshaker.clone();
73
74        async move {
75            let (peer_stream, peer_sink) =
76                T::connect_to_peer(req.addr, handshaker.transport_config()).await?;
77            let req = DoHandshakeRequest {
78                addr: InternalPeerID::KnownAddr(req.addr),
79                permit: req.permit,
80                peer_stream,
81                peer_sink,
82                direction: ConnectionDirection::Outbound,
83            };
84            handshaker.ready().await?.call(req).await
85        }
86        .boxed()
87    }
88}