cuprate_address_book/
book.rs

1//! The address book service.
2//!
3//! This module holds the address book service for a specific network zone.
4use std::{
5    collections::{HashMap, HashSet},
6    panic,
7    task::{Context, Poll},
8    time::Duration,
9};
10
11use futures::{
12    future::{ready, Ready},
13    FutureExt,
14};
15use tokio::{
16    task::JoinHandle,
17    time::{interval, Instant, Interval, MissedTickBehavior},
18};
19use tokio_util::time::DelayQueue;
20use tower::Service;
21
22use cuprate_p2p_core::{
23    client::InternalPeerID,
24    handles::ConnectionHandle,
25    services::{AddressBookRequest, AddressBookResponse, ZoneSpecificPeerListEntryBase},
26    NetZoneAddress, NetworkZone,
27};
28use cuprate_pruning::PruningSeed;
29
30use crate::{
31    peer_list::PeerList, store::save_peers_to_disk, AddressBookConfig, AddressBookError,
32    BorshNetworkZone,
33};
34
35#[cfg(test)]
36mod tests;
37
38/// An entry in the connected list.
39pub(crate) struct ConnectionPeerEntry<Z: NetworkZone> {
40    addr: Option<Z::Addr>,
41    id: u64,
42    handle: ConnectionHandle,
43    /// The peers pruning seed
44    pruning_seed: PruningSeed,
45    /// The peers port.
46    rpc_port: u16,
47    /// The peers rpc credits per hash
48    rpc_credits_per_hash: u32,
49}
50
51pub struct AddressBook<Z: BorshNetworkZone> {
52    /// Our white peers - the peers we have previously connected to.
53    white_list: PeerList<Z>,
54    /// Our gray peers - the peers we have been told about but haven't connected to.
55    gray_list: PeerList<Z>,
56    /// Our anchor peers - on start up will contain a list of peers we were connected to before shutting down
57    /// after that will contain a list of peers currently connected to that we can reach.
58    anchor_list: HashSet<Z::Addr>,
59    /// The currently connected peers.
60    connected_peers: HashMap<InternalPeerID<Z::Addr>, ConnectionPeerEntry<Z>>,
61    connected_peers_ban_id: HashMap<<Z::Addr as NetZoneAddress>::BanID, HashSet<Z::Addr>>,
62
63    banned_peers: HashMap<<Z::Addr as NetZoneAddress>::BanID, Instant>,
64    banned_peers_queue: DelayQueue<<Z::Addr as NetZoneAddress>::BanID>,
65
66    peer_save_task_handle: Option<JoinHandle<std::io::Result<()>>>,
67    peer_save_interval: Interval,
68
69    cfg: AddressBookConfig,
70}
71
72impl<Z: BorshNetworkZone> AddressBook<Z> {
73    pub fn new(
74        cfg: AddressBookConfig,
75        white_peers: Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>,
76        gray_peers: Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>,
77        anchor_peers: Vec<Z::Addr>,
78    ) -> Self {
79        let white_list = PeerList::new(white_peers);
80        let gray_list = PeerList::new(gray_peers);
81        let anchor_list = HashSet::from_iter(anchor_peers);
82
83        // TODO: persist banned peers
84        let banned_peers = HashMap::new();
85        let banned_peers_queue = DelayQueue::new();
86
87        let connected_peers = HashMap::new();
88
89        let mut peer_save_interval = interval(cfg.peer_save_period);
90        peer_save_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
91
92        Self {
93            white_list,
94            gray_list,
95            anchor_list,
96            connected_peers,
97            connected_peers_ban_id: HashMap::new(),
98            banned_peers,
99            banned_peers_queue,
100            peer_save_task_handle: None,
101            peer_save_interval,
102            cfg,
103        }
104    }
105
106    fn poll_save_to_disk(&mut self, cx: &mut Context<'_>) {
107        if let Some(handle) = &mut self.peer_save_task_handle {
108            // if we have already spawned a task to save the peer list wait for that to complete.
109            match handle.poll_unpin(cx) {
110                Poll::Pending => return,
111                Poll::Ready(Ok(Err(e))) => {
112                    tracing::error!("Could not save peer list to disk, got error: {e}");
113                }
114                Poll::Ready(Err(e)) => {
115                    if e.is_panic() {
116                        panic::resume_unwind(e.into_panic())
117                    }
118                }
119                Poll::Ready(_) => (),
120            }
121        }
122        // the task is finished.
123        self.peer_save_task_handle = None;
124
125        let Poll::Ready(_) = self.peer_save_interval.poll_tick(cx) else {
126            return;
127        };
128
129        self.peer_save_task_handle = Some(save_peers_to_disk(
130            &self.cfg,
131            &self.white_list,
132            &self.gray_list,
133        ));
134    }
135
136    fn poll_unban_peers(&mut self, cx: &mut Context<'_>) {
137        while let Poll::Ready(Some(ban_id)) = self.banned_peers_queue.poll_expired(cx) {
138            tracing::debug!("Host {:?} is unbanned, ban has expired.", ban_id.get_ref(),);
139            self.banned_peers.remove(ban_id.get_ref());
140        }
141    }
142
143    fn poll_connected_peers(&mut self) {
144        let mut internal_addr_disconnected = Vec::new();
145        let mut addrs_to_ban = Vec::new();
146
147        #[expect(clippy::iter_over_hash_type, reason = "ordering doesn't matter here")]
148        for (internal_addr, peer) in &mut self.connected_peers {
149            if let Some(time) = peer.handle.check_should_ban() {
150                match internal_addr {
151                    InternalPeerID::KnownAddr(addr) => addrs_to_ban.push((*addr, time.0)),
152                    // If we don't know the peers address all we can do is disconnect.
153                    InternalPeerID::Unknown(_) => peer.handle.send_close_signal(),
154                }
155            }
156
157            if peer.handle.is_closed() {
158                internal_addr_disconnected.push(*internal_addr);
159            }
160        }
161
162        for (addr, time) in addrs_to_ban {
163            self.ban_peer(addr, time);
164        }
165
166        for disconnected_addr in internal_addr_disconnected {
167            self.connected_peers.remove(&disconnected_addr);
168            if let InternalPeerID::KnownAddr(addr) = disconnected_addr {
169                // remove the peer from the connected peers with this ban ID.
170                self.connected_peers_ban_id
171                    .get_mut(&addr.ban_id())
172                    .unwrap()
173                    .remove(&addr);
174
175                // If the amount of peers with this ban id is 0 remove the whole set.
176                if self.connected_peers_ban_id[&addr.ban_id()].is_empty() {
177                    self.connected_peers_ban_id.remove(&addr.ban_id());
178                }
179                // remove the peer from the anchor list.
180                self.anchor_list.remove(&addr);
181            }
182        }
183    }
184
185    fn ban_peer(&mut self, addr: Z::Addr, time: Duration) {
186        if self.banned_peers.contains_key(&addr.ban_id()) {
187            tracing::error!("Tried to ban peer twice, this shouldn't happen.");
188        }
189
190        if let Some(connected_peers_with_ban_id) = self.connected_peers_ban_id.get(&addr.ban_id()) {
191            for peer in connected_peers_with_ban_id.iter().map(|addr| {
192                tracing::debug!("Banning peer: {}, for: {:?}", addr, time);
193
194                self.connected_peers
195                    .get(&InternalPeerID::KnownAddr(*addr))
196                    .expect("Peer must be in connected list if in connected_peers_with_ban_id")
197            }) {
198                // The peer will get removed from our connected list once we disconnect
199                peer.handle.send_close_signal();
200                // Remove the peer now from anchors so we don't accidentally persist a bad anchor peer to disk.
201                self.anchor_list.remove(&addr);
202            }
203        }
204
205        self.white_list.remove_peers_with_ban_id(&addr.ban_id());
206        self.gray_list.remove_peers_with_ban_id(&addr.ban_id());
207
208        let unban_at = Instant::now() + time;
209
210        self.banned_peers_queue.insert_at(addr.ban_id(), unban_at);
211        self.banned_peers.insert(addr.ban_id(), unban_at);
212    }
213
214    /// adds a peer to the gray list.
215    fn add_peer_to_gray_list(&mut self, mut peer: ZoneSpecificPeerListEntryBase<Z::Addr>) {
216        if self.white_list.contains_peer(&peer.adr) {
217            tracing::trace!("Peer {} is already in white list skipping.", peer.adr);
218            return;
219        };
220        if !self.gray_list.contains_peer(&peer.adr) {
221            tracing::trace!("Adding peer {} to gray list.", peer.adr);
222            peer.last_seen = 0;
223            self.gray_list.add_new_peer(peer);
224        }
225    }
226
227    /// Checks if a peer is banned.
228    fn is_peer_banned(&self, peer: &Z::Addr) -> bool {
229        self.banned_peers.contains_key(&peer.ban_id())
230    }
231
232    /// Checks when a peer will be unbanned.
233    ///
234    /// - If the peer is banned, this returns [`Some`] containing
235    ///   the [`Instant`] the peer will be unbanned
236    /// - If the peer is not banned, this returns [`None`]
237    fn peer_unban_instant(&self, peer: &Z::Addr) -> Option<Instant> {
238        self.banned_peers.get(&peer.ban_id()).copied()
239    }
240
241    fn handle_incoming_peer_list(
242        &mut self,
243        mut peer_list: Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>,
244    ) {
245        tracing::debug!("Received new peer list, length: {}", peer_list.len());
246
247        peer_list.retain_mut(|peer| {
248            peer.adr.make_canonical();
249
250            if peer.adr.should_add_to_peer_list() {
251                !self.is_peer_banned(&peer.adr)
252            } else {
253                false
254            }
255            // TODO: check rpc/ p2p ports not the same
256        });
257
258        for peer in peer_list {
259            self.add_peer_to_gray_list(peer);
260        }
261        // The gray list has no peers we need to keep in the list so just pass an empty HashSet.
262        self.gray_list
263            .reduce_list(&HashSet::new(), self.cfg.max_gray_list_length);
264    }
265
266    fn take_random_white_peer(
267        &mut self,
268        block_needed: Option<usize>,
269    ) -> Option<ZoneSpecificPeerListEntryBase<Z::Addr>> {
270        tracing::debug!("Retrieving random white peer");
271        self.white_list
272            .take_random_peer(&mut rand::thread_rng(), block_needed, &self.anchor_list)
273    }
274
275    fn take_random_gray_peer(
276        &mut self,
277        block_needed: Option<usize>,
278    ) -> Option<ZoneSpecificPeerListEntryBase<Z::Addr>> {
279        tracing::debug!("Retrieving random gray peer");
280        self.gray_list
281            .take_random_peer(&mut rand::thread_rng(), block_needed, &HashSet::new())
282    }
283
284    fn get_white_peers(&self, len: usize) -> Vec<ZoneSpecificPeerListEntryBase<Z::Addr>> {
285        tracing::debug!("Retrieving white peers, maximum: {}", len);
286        self.white_list
287            .get_random_peers(&mut rand::thread_rng(), len)
288    }
289
290    /// Updates an entry in the white list, if the peer is not found then
291    /// the peer will be added to the white list.
292    fn update_white_list_peer_entry(
293        &mut self,
294        peer: &ConnectionPeerEntry<Z>,
295    ) -> Result<(), AddressBookError> {
296        let Some(addr) = &peer.addr else {
297            // If the peer isn't reachable we shouldn't add it too our address book.
298            return Ok(());
299        };
300
301        if let Some(peb) = self.white_list.get_peer_mut(addr) {
302            if peb.pruning_seed != peer.pruning_seed {
303                return Err(AddressBookError::PeersDataChanged("Pruning seed"));
304            }
305            // TODO: cuprate doesn't need last seen timestamps but should we have them anyway?
306            peb.last_seen = 0;
307            peb.rpc_port = peer.rpc_port;
308            peb.rpc_credits_per_hash = peer.rpc_credits_per_hash;
309        } else {
310            // if the peer is reachable add it to our white list
311            let peb = ZoneSpecificPeerListEntryBase {
312                id: peer.id,
313                adr: *addr,
314                last_seen: 0,
315                rpc_port: peer.rpc_port,
316                rpc_credits_per_hash: peer.rpc_credits_per_hash,
317                pruning_seed: peer.pruning_seed,
318            };
319            self.white_list.add_new_peer(peb);
320        }
321        Ok(())
322    }
323
324    fn handle_new_connection(
325        &mut self,
326        internal_peer_id: InternalPeerID<Z::Addr>,
327        peer: ConnectionPeerEntry<Z>,
328    ) -> Result<(), AddressBookError> {
329        if self.connected_peers.contains_key(&internal_peer_id) {
330            return Err(AddressBookError::PeerAlreadyConnected);
331        }
332
333        // If we know the address then check if it's banned.
334        if let InternalPeerID::KnownAddr(addr) = &internal_peer_id {
335            if self.is_peer_banned(addr) {
336                return Err(AddressBookError::PeerIsBanned);
337            }
338            // although the peer may not be reachable still add it to the connected peers with ban ID.
339            self.connected_peers_ban_id
340                .entry(addr.ban_id())
341                .or_default()
342                .insert(*addr);
343        }
344
345        // if the address is Some that means we can reach it from our node.
346        if let Some(addr) = peer.addr {
347            // The peer is reachable, update our white list and add it to the anchor connections.
348            self.update_white_list_peer_entry(&peer)?;
349            self.anchor_list.insert(addr);
350            self.white_list
351                .reduce_list(&self.anchor_list, self.cfg.max_white_list_length);
352        }
353
354        self.connected_peers.insert(internal_peer_id, peer);
355        Ok(())
356    }
357}
358
359impl<Z: BorshNetworkZone> Service<AddressBookRequest<Z>> for AddressBook<Z> {
360    type Response = AddressBookResponse<Z>;
361    type Error = AddressBookError;
362    type Future = Ready<Result<Self::Response, Self::Error>>;
363
364    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
365        self.poll_unban_peers(cx);
366        self.poll_save_to_disk(cx);
367        self.poll_connected_peers();
368        Poll::Ready(Ok(()))
369    }
370
371    fn call(&mut self, req: AddressBookRequest<Z>) -> Self::Future {
372        let span = tracing::info_span!("AddressBook");
373        let _guard = span.enter();
374
375        let response = match req {
376            AddressBookRequest::NewConnection {
377                internal_peer_id,
378                public_address,
379                handle,
380                id,
381                pruning_seed,
382                rpc_port,
383                rpc_credits_per_hash,
384            } => self
385                .handle_new_connection(
386                    internal_peer_id,
387                    ConnectionPeerEntry {
388                        addr: public_address,
389                        id,
390                        handle,
391                        pruning_seed,
392                        rpc_port,
393                        rpc_credits_per_hash,
394                    },
395                )
396                .map(|()| AddressBookResponse::Ok),
397            AddressBookRequest::IncomingPeerList(peer_list) => {
398                self.handle_incoming_peer_list(peer_list);
399                Ok(AddressBookResponse::Ok)
400            }
401            AddressBookRequest::TakeRandomWhitePeer { height } => self
402                .take_random_white_peer(height)
403                .map(AddressBookResponse::Peer)
404                .ok_or(AddressBookError::PeerNotFound),
405            AddressBookRequest::TakeRandomGrayPeer { height } => self
406                .take_random_gray_peer(height)
407                .map(AddressBookResponse::Peer)
408                .ok_or(AddressBookError::PeerNotFound),
409            AddressBookRequest::TakeRandomPeer { height } => self
410                .take_random_white_peer(height)
411                .or_else(|| self.take_random_gray_peer(height))
412                .map(AddressBookResponse::Peer)
413                .ok_or(AddressBookError::PeerNotFound),
414            AddressBookRequest::GetWhitePeers(len) => {
415                Ok(AddressBookResponse::Peers(self.get_white_peers(len)))
416            }
417            AddressBookRequest::GetBan(addr) => Ok(AddressBookResponse::GetBan {
418                unban_instant: self.peer_unban_instant(&addr).map(Instant::into_std),
419            }),
420            AddressBookRequest::PeerlistSize
421            | AddressBookRequest::ConnectionCount
422            | AddressBookRequest::SetBan(_)
423            | AddressBookRequest::GetBans
424            | AddressBookRequest::ConnectionInfo => {
425                todo!("finish https://github.com/Cuprate/cuprate/pull/297")
426            }
427        };
428
429        ready(response)
430    }
431}