cuprate_p2p_core/client/
weak.rs

1use std::task::{ready, Context, Poll};
2
3use futures::channel::oneshot;
4use tokio::sync::OwnedSemaphorePermit;
5use tokio_util::sync::{PollSemaphore, PollSender};
6use tower::Service;
7
8use cuprate_helper::asynch::InfallibleOneshotReceiver;
9
10use crate::{
11    client::{connection, PeerInformation},
12    BroadcastMessage, NetworkZone, PeerError, PeerRequest, PeerResponse, SharedError,
13};
14
15/// A weak handle to a [`Client`](super::Client).
16///
17/// When this is dropped the peer will not be disconnected.
18pub struct WeakClient<N: NetworkZone> {
19    /// Information on the connected peer.
20    pub info: PeerInformation<N::Addr>,
21
22    /// The channel to the [`Connection`](connection::Connection) task.
23    pub(super) connection_tx: PollSender<connection::ConnectionTaskRequest>,
24
25    /// The semaphore that limits the requests sent to the peer.
26    pub(super) semaphore: PollSemaphore,
27    /// A permit for the semaphore, will be [`Some`] after `poll_ready` returns ready.
28    pub(super) permit: Option<OwnedSemaphorePermit>,
29
30    /// The error slot shared between the [`Client`] and [`Connection`](connection::Connection).
31    pub(super) error: SharedError<PeerError>,
32}
33
34impl<N: NetworkZone> WeakClient<N> {
35    /// Internal function to set an error on the [`SharedError`].
36    fn set_err(&self, err: PeerError) -> tower::BoxError {
37        let err_str = err.to_string();
38        match self.error.try_insert_err(err) {
39            Ok(()) => err_str,
40            Err(e) => e.to_string(),
41        }
42        .into()
43    }
44
45    /// Create a [`WeakBroadcastClient`] from this [`WeakClient`].
46    ///
47    /// See the docs for [`WeakBroadcastClient`] for what this type can do.
48    pub fn broadcast_client(&mut self) -> WeakBroadcastClient<'_, N> {
49        WeakBroadcastClient(self)
50    }
51}
52
53impl<Z: NetworkZone> Service<PeerRequest> for WeakClient<Z> {
54    type Response = PeerResponse;
55    type Error = tower::BoxError;
56    type Future = InfallibleOneshotReceiver<Result<Self::Response, Self::Error>>;
57
58    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
59        if let Some(err) = self.error.try_get_err() {
60            return Poll::Ready(Err(err.to_string().into()));
61        }
62
63        if self.permit.is_none() {
64            let permit = ready!(self.semaphore.poll_acquire(cx))
65                .expect("Client semaphore should not be closed!");
66
67            self.permit = Some(permit);
68        }
69
70        if ready!(self.connection_tx.poll_reserve(cx)).is_err() {
71            let err = self.set_err(PeerError::ClientChannelClosed);
72            return Poll::Ready(Err(err));
73        }
74
75        Poll::Ready(Ok(()))
76    }
77
78    fn call(&mut self, request: PeerRequest) -> Self::Future {
79        let permit = self
80            .permit
81            .take()
82            .expect("poll_ready did not return ready before call to call");
83
84        let (tx, rx) = oneshot::channel();
85        let req = connection::ConnectionTaskRequest {
86            response_channel: tx,
87            request,
88            permit: Some(permit),
89        };
90
91        if let Err(req) = self.connection_tx.send_item(req) {
92            // The connection task could have closed between a call to `poll_ready` and the call to
93            // `call`, which means if we don't handle the error here the receiver would panic.
94            self.set_err(PeerError::ClientChannelClosed);
95
96            let resp = Err(PeerError::ClientChannelClosed.into());
97            drop(req.into_inner().unwrap().response_channel.send(resp));
98        }
99
100        rx.into()
101    }
102}
103
104/// A client used to send [`BroadcastMessage`]s directly to a single peer, although these messages
105/// can be sent using a [`WeakClient`] or [`Client`](super::Client), using this client type allows
106/// bypassing the single request being handled at a time.
107///
108/// This means that if another [`WeakClient`] has a request in progress [`WeakBroadcastClient`] can
109/// still send messages and does not need to wait for the other [`WeakClient`] to finish.
110///
111/// A thing to note is that a call to [`WeakBroadcastClient::poll_ready`] will still reserve a slot
112/// in the queue, this should be kept in mind as many [`WeakBroadcastClient`]s calling [`WeakBroadcastClient::poll_ready`]
113/// without [`WeakBroadcastClient::call`] will stop other [`WeakBroadcastClient`]s & the other types
114/// of clients.
115///
116/// This type is kept in state with the [`WeakClient`] it was produced from, allowing you to do this:
117///
118/// ```rust, ignore
119/// weak_client.broadcast_client().poll_ready(cx)
120///
121/// weak_client.broadcast_client().call(req)
122/// ```
123pub struct WeakBroadcastClient<'a, N: NetworkZone>(&'a mut WeakClient<N>);
124
125impl<N: NetworkZone> Service<BroadcastMessage> for WeakBroadcastClient<'_, N> {
126    type Response = PeerResponse;
127    type Error = tower::BoxError;
128    type Future = InfallibleOneshotReceiver<Result<Self::Response, Self::Error>>;
129
130    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
131        self.0.permit.take();
132
133        if let Some(err) = self.0.error.try_get_err() {
134            return Poll::Ready(Err(err.to_string().into()));
135        }
136
137        if ready!(self.0.connection_tx.poll_reserve(cx)).is_err() {
138            let err = self.0.set_err(PeerError::ClientChannelClosed);
139            return Poll::Ready(Err(err));
140        }
141
142        Poll::Ready(Ok(()))
143    }
144
145    fn call(&mut self, request: BroadcastMessage) -> Self::Future {
146        let (tx, rx) = oneshot::channel();
147        let req = connection::ConnectionTaskRequest {
148            response_channel: tx,
149            request: request.into(),
150            // We don't need a permit as we only accept `BroadcastMessage`, which does not require a response.
151            permit: None,
152        };
153
154        if let Err(req) = self.0.connection_tx.send_item(req) {
155            // The connection task could have closed between a call to `poll_ready` and the call to
156            // `call`, which means if we don't handle the error here the receiver would panic.
157            self.0.set_err(PeerError::ClientChannelClosed);
158
159            let resp = Err(PeerError::ClientChannelClosed.into());
160            drop(req.into_inner().unwrap().response_channel.send(resp));
161        }
162
163        rx.into()
164    }
165}