cuprate_p2p_core/
handles.rs

1//! Connection Handles.
2//!
3//! This module contains the [`ConnectionHandle`] which allows banning a peer, disconnecting a peer and
4//! checking if the peer is still connected.
5use std::{
6    sync::{Arc, OnceLock},
7    time::Duration,
8};
9
10use tokio::sync::OwnedSemaphorePermit;
11use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};
12
13/// A [`ConnectionHandle`] builder.
14#[derive(Default, Debug)]
15pub struct HandleBuilder {
16    permit: Option<OwnedSemaphorePermit>,
17}
18
19impl HandleBuilder {
20    /// Create a new builder.
21    pub const fn new() -> Self {
22        Self { permit: None }
23    }
24
25    /// Sets the permit for this connection.
26    #[must_use]
27    pub fn with_permit(mut self, permit: Option<OwnedSemaphorePermit>) -> Self {
28        self.permit = permit;
29        self
30    }
31
32    /// Builds the [`ConnectionGuard`] which should be handed to the connection task and the [`ConnectionHandle`].
33    ///
34    /// This will panic if a permit was not set [`HandleBuilder::with_permit`]
35    pub fn build(self) -> (ConnectionGuard, ConnectionHandle) {
36        let token = CancellationToken::new();
37
38        (
39            ConnectionGuard {
40                token: token.clone(),
41                _permit: self.permit,
42            },
43            ConnectionHandle {
44                token,
45                ban: Arc::new(OnceLock::new()),
46            },
47        )
48    }
49}
50
51/// A struct representing the time a peer should be banned for.
52#[derive(Debug, Copy, Clone)]
53pub struct BanPeer(pub Duration);
54
55/// A struct given to the connection task.
56pub struct ConnectionGuard {
57    token: CancellationToken,
58    _permit: Option<OwnedSemaphorePermit>,
59}
60
61impl ConnectionGuard {
62    /// Checks if we should close the connection.
63    pub fn should_shutdown(&self) -> WaitForCancellationFutureOwned {
64        self.token.clone().cancelled_owned()
65    }
66    /// Tell the corresponding [`ConnectionHandle`]s that this connection is closed.
67    ///
68    /// This will be called on [`Drop::drop`].
69    pub fn connection_closed(&self) {
70        self.token.cancel();
71    }
72}
73
74impl Drop for ConnectionGuard {
75    fn drop(&mut self) {
76        self.token.cancel();
77    }
78}
79
80/// A handle given to a task that needs to ban, disconnect, check if the peer should be banned or check
81/// the peer is still connected.
82#[derive(Debug, Clone)]
83pub struct ConnectionHandle {
84    token: CancellationToken,
85    ban: Arc<OnceLock<BanPeer>>,
86}
87
88impl ConnectionHandle {
89    pub fn closed(&self) -> WaitForCancellationFutureOwned {
90        self.token.clone().cancelled_owned()
91    }
92    /// Bans the peer for the given `duration`.
93    pub fn ban_peer(&self, duration: Duration) {
94        #[expect(
95            clippy::let_underscore_must_use,
96            reason = "error means peer is already banned; fine to ignore"
97        )]
98        let _ = self.ban.set(BanPeer(duration));
99        self.token.cancel();
100    }
101    /// Checks if this connection is closed.
102    pub fn is_closed(&self) -> bool {
103        self.token.is_cancelled()
104    }
105    /// Returns if this peer has been banned and the [`Duration`] of that ban.
106    pub fn check_should_ban(&mut self) -> Option<BanPeer> {
107        self.ban.get().copied()
108    }
109    /// Sends the signal to the connection task to disconnect.
110    pub fn send_close_signal(&self) {
111        self.token.cancel();
112    }
113}