// cuprate_p2p_core/client/connector.rs
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use futures::{FutureExt, Stream};
use tokio::sync::OwnedSemaphorePermit;
use tower::{Service, ServiceExt};
use crate::{
client::{handshaker::HandShaker, Client, DoHandshakeRequest, HandshakeError, InternalPeerID},
AddressBook, BroadcastMessage, ConnectionDirection, CoreSyncSvc, NetworkZone,
ProtocolRequestHandlerMaker,
};
/// A request asking the [`Connector`] service to open a connection to a peer.
pub struct ConnectRequest<Z>
where
    Z: NetworkZone,
{
    /// Address of the peer to connect to.
    pub addr: Z::Addr,
    /// Optional semaphore permit; the connector forwards it unchanged into the
    /// handshake request.
    ///
    /// NOTE(review): presumably held for the lifetime of the connection so callers
    /// can cap the number of connections — confirm against the handshaker.
    pub permit: Option<OwnedSemaphorePermit>,
}
/// Service that connects to a peer in network zone `Z` and hands the resulting
/// stream/sink pair to a [`HandShaker`] to produce a [`Client`].
pub struct Connector<Z, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>
where
    Z: NetworkZone,
{
    /// Handshaker cloned for every connection attempt.
    handshaker: HandShaker<Z, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>,
}
impl<Z, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>
    Connector<Z, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>
where
    Z: NetworkZone,
{
    /// Creates a connector that performs its handshakes with `handshaker`.
    pub const fn new(
        handshaker: HandShaker<Z, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>,
    ) -> Self {
        Self { handshaker }
    }
}
impl<Z: NetworkZone, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr, BrdcstStrm>
Service<ConnectRequest<Z>> for Connector<Z, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>
where
AdrBook: AddressBook<Z> + Clone,
CSync: CoreSyncSvc + Clone,
ProtoHdlrMkr: ProtocolRequestHandlerMaker<Z> + Clone,
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
BrdcstStrmMkr: Fn(InternalPeerID<Z::Addr>) -> BrdcstStrm + Clone + Send + 'static,
{
type Response = Client<Z>;
type Error = HandshakeError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: ConnectRequest<Z>) -> Self::Future {
tracing::debug!("Connecting to peer: {}", req.addr);
let mut handshaker = self.handshaker.clone();
async move {
let (peer_stream, peer_sink) = Z::connect_to_peer(req.addr).await?;
let req = DoHandshakeRequest {
addr: InternalPeerID::KnownAddr(req.addr),
permit: req.permit,
peer_stream,
peer_sink,
direction: ConnectionDirection::Outbound,
};
handshaker.ready().await?.call(req).await
}
.boxed()
}
}