cuprate_p2p_core/client/
weak.rsuse std::task::{ready, Context, Poll};
use futures::channel::oneshot;
use tokio::sync::{mpsc, OwnedSemaphorePermit};
use tokio_util::sync::PollSemaphore;
use tower::Service;
use cuprate_helper::asynch::InfallibleOneshotReceiver;
use crate::{
client::{connection, PeerInformation},
NetworkZone, PeerError, PeerRequest, PeerResponse, SharedError,
};
pub struct WeakClient<N: NetworkZone> {
pub info: PeerInformation<N::Addr>,
pub(super) connection_tx: mpsc::WeakSender<connection::ConnectionTaskRequest>,
pub(super) semaphore: PollSemaphore,
pub(super) permit: Option<OwnedSemaphorePermit>,
pub(super) error: SharedError<PeerError>,
}
impl<N: NetworkZone> WeakClient<N> {
fn set_err(&self, err: PeerError) -> tower::BoxError {
let err_str = err.to_string();
match self.error.try_insert_err(err) {
Ok(()) => err_str,
Err(e) => e.to_string(),
}
.into()
}
}
impl<Z: NetworkZone> Service<PeerRequest> for WeakClient<Z> {
type Response = PeerResponse;
type Error = tower::BoxError;
type Future = InfallibleOneshotReceiver<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if let Some(err) = self.error.try_get_err() {
return Poll::Ready(Err(err.to_string().into()));
}
if self.connection_tx.strong_count() == 0 {
let err = self.set_err(PeerError::ClientChannelClosed);
return Poll::Ready(Err(err));
}
if self.permit.is_some() {
return Poll::Ready(Ok(()));
}
let permit = ready!(self.semaphore.poll_acquire(cx))
.expect("Client semaphore should not be closed!");
self.permit = Some(permit);
Poll::Ready(Ok(()))
}
#[expect(clippy::significant_drop_tightening)]
fn call(&mut self, request: PeerRequest) -> Self::Future {
let permit = self
.permit
.take()
.expect("poll_ready did not return ready before call to call");
let (tx, rx) = oneshot::channel();
let req = connection::ConnectionTaskRequest {
response_channel: tx,
request,
permit: Some(permit),
};
match self.connection_tx.upgrade() {
None => {
self.set_err(PeerError::ClientChannelClosed);
let resp = Err(PeerError::ClientChannelClosed.into());
drop(req.response_channel.send(resp));
}
Some(sender) => {
if let Err(e) = sender.try_send(req) {
use mpsc::error::TrySendError;
match e {
TrySendError::Closed(req) | TrySendError::Full(req) => {
self.set_err(PeerError::ClientChannelClosed);
let resp = Err(PeerError::ClientChannelClosed.into());
drop(req.response_channel.send(resp));
}
}
}
}
}
rx.into()
}
}