cuprate_p2p_core/client/
weak.rs1use std::task::{ready, Context, Poll};
2
3use futures::channel::oneshot;
4use tokio::sync::OwnedSemaphorePermit;
5use tokio_util::sync::{PollSemaphore, PollSender};
6use tower::Service;
7
8use cuprate_helper::asynch::InfallibleOneshotReceiver;
9
10use crate::{
11 client::{connection, PeerInformation},
12 BroadcastMessage, NetworkZone, PeerError, PeerRequest, PeerResponse, SharedError,
13};
14
15pub struct WeakClient<N: NetworkZone> {
19 pub info: PeerInformation<N::Addr>,
21
22 pub(super) connection_tx: PollSender<connection::ConnectionTaskRequest>,
24
25 pub(super) semaphore: PollSemaphore,
27 pub(super) permit: Option<OwnedSemaphorePermit>,
29
30 pub(super) error: SharedError<PeerError>,
32}
33
34impl<N: NetworkZone> WeakClient<N> {
35 fn set_err(&self, err: PeerError) -> tower::BoxError {
37 let err_str = err.to_string();
38 match self.error.try_insert_err(err) {
39 Ok(()) => err_str,
40 Err(e) => e.to_string(),
41 }
42 .into()
43 }
44
45 pub fn broadcast_client(&mut self) -> WeakBroadcastClient<'_, N> {
49 WeakBroadcastClient(self)
50 }
51}
52
53impl<Z: NetworkZone> Service<PeerRequest> for WeakClient<Z> {
54 type Response = PeerResponse;
55 type Error = tower::BoxError;
56 type Future = InfallibleOneshotReceiver<Result<Self::Response, Self::Error>>;
57
58 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
59 if let Some(err) = self.error.try_get_err() {
60 return Poll::Ready(Err(err.to_string().into()));
61 }
62
63 if self.permit.is_none() {
64 let permit = ready!(self.semaphore.poll_acquire(cx))
65 .expect("Client semaphore should not be closed!");
66
67 self.permit = Some(permit);
68 }
69
70 if ready!(self.connection_tx.poll_reserve(cx)).is_err() {
71 let err = self.set_err(PeerError::ClientChannelClosed);
72 return Poll::Ready(Err(err));
73 }
74
75 Poll::Ready(Ok(()))
76 }
77
78 fn call(&mut self, request: PeerRequest) -> Self::Future {
79 let permit = self
80 .permit
81 .take()
82 .expect("poll_ready did not return ready before call to call");
83
84 let (tx, rx) = oneshot::channel();
85 let req = connection::ConnectionTaskRequest {
86 response_channel: tx,
87 request,
88 permit: Some(permit),
89 };
90
91 if let Err(req) = self.connection_tx.send_item(req) {
92 self.set_err(PeerError::ClientChannelClosed);
95
96 let resp = Err(PeerError::ClientChannelClosed.into());
97 drop(req.into_inner().unwrap().response_channel.send(resp));
98 }
99
100 rx.into()
101 }
102}
103
104pub struct WeakBroadcastClient<'a, N: NetworkZone>(&'a mut WeakClient<N>);
124
125impl<N: NetworkZone> Service<BroadcastMessage> for WeakBroadcastClient<'_, N> {
126 type Response = PeerResponse;
127 type Error = tower::BoxError;
128 type Future = InfallibleOneshotReceiver<Result<Self::Response, Self::Error>>;
129
130 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
131 self.0.permit.take();
132
133 if let Some(err) = self.0.error.try_get_err() {
134 return Poll::Ready(Err(err.to_string().into()));
135 }
136
137 if ready!(self.0.connection_tx.poll_reserve(cx)).is_err() {
138 let err = self.0.set_err(PeerError::ClientChannelClosed);
139 return Poll::Ready(Err(err));
140 }
141
142 Poll::Ready(Ok(()))
143 }
144
145 fn call(&mut self, request: BroadcastMessage) -> Self::Future {
146 let (tx, rx) = oneshot::channel();
147 let req = connection::ConnectionTaskRequest {
148 response_channel: tx,
149 request: request.into(),
150 permit: None,
152 };
153
154 if let Err(req) = self.0.connection_tx.send_item(req) {
155 self.0.set_err(PeerError::ClientChannelClosed);
158
159 let resp = Err(PeerError::ClientChannelClosed.into());
160 drop(req.into_inner().unwrap().response_channel.send(resp));
161 }
162
163 rx.into()
164 }
165}