cuprate_p2p_core/
handles.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
//! Connection Handles.
//!
//! This module contains the [`ConnectionHandle`] which allows banning a peer, disconnecting a peer and
//! checking if the peer is still connected.
use std::{
    sync::{Arc, OnceLock},
    time::Duration,
};

use tokio::sync::OwnedSemaphorePermit;
use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};

/// A [`ConnectionHandle`] builder.
#[derive(Default, Debug)]
pub struct HandleBuilder {
    permit: Option<OwnedSemaphorePermit>,
}

impl HandleBuilder {
    /// Create a new builder.
    pub const fn new() -> Self {
        Self { permit: None }
    }

    /// Sets the permit for this connection.
    #[must_use]
    pub fn with_permit(mut self, permit: Option<OwnedSemaphorePermit>) -> Self {
        self.permit = permit;
        self
    }

    /// Builds the [`ConnectionGuard`] which should be handed to the connection task and the [`ConnectionHandle`].
    ///
    /// This will panic if a permit was not set [`HandleBuilder::with_permit`]
    pub fn build(self) -> (ConnectionGuard, ConnectionHandle) {
        let token = CancellationToken::new();

        (
            ConnectionGuard {
                token: token.clone(),
                _permit: self.permit,
            },
            ConnectionHandle {
                token,
                ban: Arc::new(OnceLock::new()),
            },
        )
    }
}

/// A struct representing the time a peer should be banned for.
#[derive(Debug, Copy, Clone)]
pub struct BanPeer(pub Duration);

/// A struct given to the connection task.
pub struct ConnectionGuard {
    token: CancellationToken,
    _permit: Option<OwnedSemaphorePermit>,
}

impl ConnectionGuard {
    /// Checks if we should close the connection.
    pub fn should_shutdown(&self) -> bool {
        self.token.is_cancelled()
    }
    /// Tell the corresponding [`ConnectionHandle`]s that this connection is closed.
    ///
    /// This will be called on [`Drop::drop`].
    pub fn connection_closed(&self) {
        self.token.cancel();
    }
}

impl Drop for ConnectionGuard {
    fn drop(&mut self) {
        self.token.cancel();
    }
}

/// A handle given to a task that needs to ban, disconnect, check if the peer should be banned or check
/// the peer is still connected.
#[derive(Debug, Clone)]
pub struct ConnectionHandle {
    token: CancellationToken,
    ban: Arc<OnceLock<BanPeer>>,
}

impl ConnectionHandle {
    pub fn closed(&self) -> WaitForCancellationFutureOwned {
        self.token.clone().cancelled_owned()
    }
    /// Bans the peer for the given `duration`.
    pub fn ban_peer(&self, duration: Duration) {
        #[expect(
            clippy::let_underscore_must_use,
            reason = "error means peer is already banned; fine to ignore"
        )]
        let _ = self.ban.set(BanPeer(duration));
        self.token.cancel();
    }
    /// Checks if this connection is closed.
    pub fn is_closed(&self) -> bool {
        self.token.is_cancelled()
    }
    /// Returns if this peer has been banned and the [`Duration`] of that ban.
    pub fn check_should_ban(&mut self) -> Option<BanPeer> {
        self.ban.get().copied()
    }
    /// Sends the signal to the connection task to disconnect.
    pub fn send_close_signal(&self) {
        self.token.cancel();
    }
}