cuprate_p2p_core/client/
connector.rs1use std::{
8 future::Future,
9 pin::Pin,
10 task::{Context, Poll},
11};
12
13use futures::{FutureExt, Stream};
14use tokio::sync::OwnedSemaphorePermit;
15use tower::{Service, ServiceExt};
16
17use crate::{
18 client::{handshaker::HandShaker, Client, DoHandshakeRequest, HandshakeError, InternalPeerID},
19 AddressBook, BroadcastMessage, ConnectionDirection, CoreSyncSvc, NetworkZone,
20 ProtocolRequestHandlerMaker, Transport,
21};
22
23pub struct ConnectRequest<Z: NetworkZone> {
25 pub addr: Z::Addr,
27 pub permit: Option<OwnedSemaphorePermit>,
32}
33
34#[derive(Clone)]
36pub struct Connector<Z: NetworkZone, T: Transport<Z>, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr> {
37 handshaker: HandShaker<Z, T, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>,
38}
39
40impl<Z: NetworkZone, T: Transport<Z>, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>
41 Connector<Z, T, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>
42{
43 pub const fn new(
45 handshaker: HandShaker<Z, T, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>,
46 ) -> Self {
47 Self { handshaker }
48 }
49}
50
51impl<Z: NetworkZone, T, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr, BrdcstStrm>
52 Service<ConnectRequest<Z>> for Connector<Z, T, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>
53where
54 T: Transport<Z>,
55 AdrBook: AddressBook<Z> + Clone,
56 CSync: CoreSyncSvc + Clone,
57 ProtoHdlrMkr: ProtocolRequestHandlerMaker<Z> + Clone,
58 BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
59 BrdcstStrmMkr: Fn(InternalPeerID<Z::Addr>) -> BrdcstStrm + Clone + Send + 'static,
60{
61 type Response = Client<Z>;
62 type Error = HandshakeError;
63 type Future =
64 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
65
66 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
67 Poll::Ready(Ok(()))
68 }
69
70 fn call(&mut self, req: ConnectRequest<Z>) -> Self::Future {
71 tracing::debug!("Connecting to peer: {}", req.addr);
72 let mut handshaker = self.handshaker.clone();
73
74 async move {
75 let (peer_stream, peer_sink) =
76 T::connect_to_peer(req.addr, handshaker.transport_config()).await?;
77 let req = DoHandshakeRequest {
78 addr: InternalPeerID::KnownAddr(req.addr),
79 permit: req.permit,
80 peer_stream,
81 peer_sink,
82 direction: ConnectionDirection::Outbound,
83 };
84 handshaker.ready().await?.call(req).await
85 }
86 .boxed()
87 }
88}