1use std::task::{ready, Context, Poll};
23use futures::channel::oneshot;
4use tokio::sync::OwnedSemaphorePermit;
5use tokio_util::sync::{PollSemaphore, PollSender};
6use tower::Service;
78use cuprate_helper::asynch::InfallibleOneshotReceiver;
910use crate::{
11 client::{connection, PeerInformation},
12 BroadcastMessage, NetworkZone, PeerError, PeerRequest, PeerResponse, SharedError,
13};
1415/// A weak handle to a [`Client`](super::Client).
16///
17/// When this is dropped the peer will not be disconnected.
18pub struct WeakClient<N: NetworkZone> {
19/// Information on the connected peer.
20pub info: PeerInformation<N::Addr>,
2122/// The channel to the [`Connection`](connection::Connection) task.
23pub(super) connection_tx: PollSender<connection::ConnectionTaskRequest>,
2425/// The semaphore that limits the requests sent to the peer.
26pub(super) semaphore: PollSemaphore,
27/// A permit for the semaphore, will be [`Some`] after `poll_ready` returns ready.
28pub(super) permit: Option<OwnedSemaphorePermit>,
2930/// The error slot shared between the [`Client`] and [`Connection`](connection::Connection).
31pub(super) error: SharedError<PeerError>,
32}
3334impl<N: NetworkZone> WeakClient<N> {
35/// Internal function to set an error on the [`SharedError`].
36fn set_err(&self, err: PeerError) -> tower::BoxError {
37let err_str = err.to_string();
38match self.error.try_insert_err(err) {
39Ok(()) => err_str,
40Err(e) => e.to_string(),
41 }
42 .into()
43 }
4445/// Create a [`WeakBroadcastClient`] from this [`WeakClient`].
46 ///
47 /// See the docs for [`WeakBroadcastClient`] for what this type can do.
48pub const fn broadcast_client(&mut self) -> WeakBroadcastClient<'_, N> {
49 WeakBroadcastClient(self)
50 }
51}
5253impl<Z: NetworkZone> Service<PeerRequest> for WeakClient<Z> {
54type Response = PeerResponse;
55type Error = tower::BoxError;
56type Future = InfallibleOneshotReceiver<Result<Self::Response, Self::Error>>;
5758fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
59if let Some(err) = self.error.try_get_err() {
60return Poll::Ready(Err(err.to_string().into()));
61 }
6263if self.permit.is_none() {
64let permit = ready!(self.semaphore.poll_acquire(cx))
65 .expect("Client semaphore should not be closed!");
6667self.permit = Some(permit);
68 }
6970if ready!(self.connection_tx.poll_reserve(cx)).is_err() {
71let err = self.set_err(PeerError::ClientChannelClosed);
72return Poll::Ready(Err(err));
73 }
7475 Poll::Ready(Ok(()))
76 }
7778fn call(&mut self, request: PeerRequest) -> Self::Future {
79let permit = self
80.permit
81 .take()
82 .expect("poll_ready did not return ready before call to call");
8384let (tx, rx) = oneshot::channel();
85let req = connection::ConnectionTaskRequest {
86 response_channel: tx,
87 request,
88 permit: Some(permit),
89 };
9091if let Err(req) = self.connection_tx.send_item(req) {
92// The connection task could have closed between a call to `poll_ready` and the call to
93 // `call`, which means if we don't handle the error here the receiver would panic.
94self.set_err(PeerError::ClientChannelClosed);
9596let resp = Err(PeerError::ClientChannelClosed.into());
97 drop(req.into_inner().unwrap().response_channel.send(resp));
98 }
99100 rx.into()
101 }
102}
103104/// A client used to send [`BroadcastMessage`]s directly to a single peer, although these messages
105/// can be sent using a [`WeakClient`] or [`Client`](super::Client), using this client type allows
106/// bypassing the single request being handled at a time.
107///
108/// This means that if another [`WeakClient`] has a request in progress [`WeakBroadcastClient`] can
109/// still send messages and does not need to wait for the other [`WeakClient`] to finish.
110///
111/// A thing to note is that a call to [`WeakBroadcastClient::poll_ready`] will still reserve a slot
112/// in the queue, this should be kept in mind as many [`WeakBroadcastClient`]s calling [`WeakBroadcastClient::poll_ready`]
113/// without [`WeakBroadcastClient::call`] will stop other [`WeakBroadcastClient`]s & the other types
114/// of clients.
115///
116/// This type is kept in state with the [`WeakClient`] it was produced from, allowing you to do this:
117///
118/// ```rust, ignore
119/// weak_client.broadcast_client().poll_ready(cx)
120///
121/// weak_client.broadcast_client().call(req)
122/// ```
123pub struct WeakBroadcastClient<'a, N: NetworkZone>(&'a mut WeakClient<N>);
124125impl<N: NetworkZone> Service<BroadcastMessage> for WeakBroadcastClient<'_, N> {
126type Response = PeerResponse;
127type Error = tower::BoxError;
128type Future = InfallibleOneshotReceiver<Result<Self::Response, Self::Error>>;
129130fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
131self.0.permit.take();
132133if let Some(err) = self.0.error.try_get_err() {
134return Poll::Ready(Err(err.to_string().into()));
135 }
136137if ready!(self.0.connection_tx.poll_reserve(cx)).is_err() {
138let err = self.0.set_err(PeerError::ClientChannelClosed);
139return Poll::Ready(Err(err));
140 }
141142 Poll::Ready(Ok(()))
143 }
144145fn call(&mut self, request: BroadcastMessage) -> Self::Future {
146let (tx, rx) = oneshot::channel();
147let req = connection::ConnectionTaskRequest {
148 response_channel: tx,
149 request: request.into(),
150// We don't need a permit as we only accept `BroadcastMessage`, which does not require a response.
151permit: None,
152 };
153154if let Err(req) = self.0.connection_tx.send_item(req) {
155// The connection task could have closed between a call to `poll_ready` and the call to
156 // `call`, which means if we don't handle the error here the receiver would panic.
157self.0.set_err(PeerError::ClientChannelClosed);
158159let resp = Err(PeerError::ClientChannelClosed.into());
160 drop(req.into_inner().unwrap().response_channel.send(resp));
161 }
162163 rx.into()
164 }
165}