cuprate_p2p_core/
handles.rs1use std::{
6 sync::{Arc, OnceLock},
7 time::Duration,
8};
9
10use tokio::sync::OwnedSemaphorePermit;
11use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};
12
13#[derive(Default, Debug)]
15pub struct HandleBuilder {
16 permit: Option<OwnedSemaphorePermit>,
17}
18
19impl HandleBuilder {
20 pub const fn new() -> Self {
22 Self { permit: None }
23 }
24
25 #[must_use]
27 pub fn with_permit(mut self, permit: Option<OwnedSemaphorePermit>) -> Self {
28 self.permit = permit;
29 self
30 }
31
32 pub fn build(self) -> (ConnectionGuard, ConnectionHandle) {
36 let token = CancellationToken::new();
37
38 (
39 ConnectionGuard {
40 token: token.clone(),
41 _permit: self.permit,
42 },
43 ConnectionHandle {
44 token,
45 ban: Arc::new(OnceLock::new()),
46 },
47 )
48 }
49}
50
51#[derive(Debug, Copy, Clone)]
53pub struct BanPeer(pub Duration);
54
55pub struct ConnectionGuard {
57 token: CancellationToken,
58 _permit: Option<OwnedSemaphorePermit>,
59}
60
61impl ConnectionGuard {
62 pub fn should_shutdown(&self) -> WaitForCancellationFutureOwned {
64 self.token.clone().cancelled_owned()
65 }
66 pub fn connection_closed(&self) {
70 self.token.cancel();
71 }
72}
73
74impl Drop for ConnectionGuard {
75 fn drop(&mut self) {
76 self.token.cancel();
77 }
78}
79
80#[derive(Debug, Clone)]
83pub struct ConnectionHandle {
84 token: CancellationToken,
85 ban: Arc<OnceLock<BanPeer>>,
86}
87
88impl ConnectionHandle {
89 pub fn closed(&self) -> WaitForCancellationFutureOwned {
90 self.token.clone().cancelled_owned()
91 }
92 pub fn ban_peer(&self, duration: Duration) {
94 #[expect(
95 clippy::let_underscore_must_use,
96 reason = "error means peer is already banned; fine to ignore"
97 )]
98 let _ = self.ban.set(BanPeer(duration));
99 self.token.cancel();
100 }
101 pub fn is_closed(&self) -> bool {
103 self.token.is_cancelled()
104 }
105 pub fn check_should_ban(&mut self) -> Option<BanPeer> {
107 self.ban.get().copied()
108 }
109 pub fn send_close_signal(&self) {
111 self.token.cancel();
112 }
113}