1use std::{
5 collections::{HashMap, HashSet},
6 panic,
7 task::{Context, Poll},
8 time::Duration,
9};
10
11use futures::{
12 future::{ready, Ready},
13 FutureExt,
14};
15use tokio::{
16 task::JoinHandle,
17 time::{interval, Instant, Interval, MissedTickBehavior},
18};
19use tokio_util::time::DelayQueue;
20use tower::Service;
21
22use cuprate_p2p_core::{
23 client::InternalPeerID,
24 handles::ConnectionHandle,
25 services::{AddressBookRequest, AddressBookResponse, ZoneSpecificPeerListEntryBase},
26 NetZoneAddress, NetworkZone,
27};
28use cuprate_pruning::PruningSeed;
29
30use crate::{
31 peer_list::PeerList, store::save_peers_to_disk, AddressBookConfig, AddressBookError,
32 BorshNetworkZone,
33};
34
35#[cfg(test)]
36mod tests;
37
38pub(crate) struct ConnectionPeerEntry<Z: NetworkZone> {
40 addr: Option<Z::Addr>,
41 id: u64,
42 handle: ConnectionHandle,
43 pruning_seed: PruningSeed,
45 rpc_port: u16,
47 rpc_credits_per_hash: u32,
49}
50
51pub struct AddressBook<Z: BorshNetworkZone> {
52 white_list: PeerList<Z>,
54 gray_list: PeerList<Z>,
56 anchor_list: HashSet<Z::Addr>,
59 connected_peers: HashMap<InternalPeerID<Z::Addr>, ConnectionPeerEntry<Z>>,
61 connected_peers_ban_id: HashMap<<Z::Addr as NetZoneAddress>::BanID, HashSet<Z::Addr>>,
62
63 banned_peers: HashMap<<Z::Addr as NetZoneAddress>::BanID, Instant>,
64 banned_peers_queue: DelayQueue<<Z::Addr as NetZoneAddress>::BanID>,
65
66 peer_save_task_handle: Option<JoinHandle<std::io::Result<()>>>,
67 peer_save_interval: Interval,
68
69 cfg: AddressBookConfig,
70}
71
72impl<Z: BorshNetworkZone> AddressBook<Z> {
73 pub fn new(
74 cfg: AddressBookConfig,
75 white_peers: Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>,
76 gray_peers: Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>,
77 anchor_peers: Vec<Z::Addr>,
78 ) -> Self {
79 let white_list = PeerList::new(white_peers);
80 let gray_list = PeerList::new(gray_peers);
81 let anchor_list = HashSet::from_iter(anchor_peers);
82
83 let banned_peers = HashMap::new();
85 let banned_peers_queue = DelayQueue::new();
86
87 let connected_peers = HashMap::new();
88
89 let mut peer_save_interval = interval(cfg.peer_save_period);
90 peer_save_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
91
92 Self {
93 white_list,
94 gray_list,
95 anchor_list,
96 connected_peers,
97 connected_peers_ban_id: HashMap::new(),
98 banned_peers,
99 banned_peers_queue,
100 peer_save_task_handle: None,
101 peer_save_interval,
102 cfg,
103 }
104 }
105
106 fn poll_save_to_disk(&mut self, cx: &mut Context<'_>) {
107 if let Some(handle) = &mut self.peer_save_task_handle {
108 match handle.poll_unpin(cx) {
110 Poll::Pending => return,
111 Poll::Ready(Ok(Err(e))) => {
112 tracing::error!("Could not save peer list to disk, got error: {e}");
113 }
114 Poll::Ready(Err(e)) => {
115 if e.is_panic() {
116 panic::resume_unwind(e.into_panic())
117 }
118 }
119 Poll::Ready(_) => (),
120 }
121 }
122 self.peer_save_task_handle = None;
124
125 let Poll::Ready(_) = self.peer_save_interval.poll_tick(cx) else {
126 return;
127 };
128
129 self.peer_save_task_handle = Some(save_peers_to_disk(
130 &self.cfg,
131 &self.white_list,
132 &self.gray_list,
133 ));
134 }
135
136 fn poll_unban_peers(&mut self, cx: &mut Context<'_>) {
137 while let Poll::Ready(Some(ban_id)) = self.banned_peers_queue.poll_expired(cx) {
138 tracing::debug!("Host {:?} is unbanned, ban has expired.", ban_id.get_ref(),);
139 self.banned_peers.remove(ban_id.get_ref());
140 }
141 }
142
143 fn poll_connected_peers(&mut self) {
144 let mut internal_addr_disconnected = Vec::new();
145 let mut addrs_to_ban = Vec::new();
146
147 #[expect(clippy::iter_over_hash_type, reason = "ordering doesn't matter here")]
148 for (internal_addr, peer) in &mut self.connected_peers {
149 if let Some(time) = peer.handle.check_should_ban() {
150 match internal_addr {
151 InternalPeerID::KnownAddr(addr) => addrs_to_ban.push((*addr, time.0)),
152 InternalPeerID::Unknown(_) => peer.handle.send_close_signal(),
154 }
155 }
156
157 if peer.handle.is_closed() {
158 internal_addr_disconnected.push(*internal_addr);
159 }
160 }
161
162 for (addr, time) in addrs_to_ban {
163 self.ban_peer(addr, time);
164 }
165
166 for disconnected_addr in internal_addr_disconnected {
167 self.connected_peers.remove(&disconnected_addr);
168 if let InternalPeerID::KnownAddr(addr) = disconnected_addr {
169 self.connected_peers_ban_id
171 .get_mut(&addr.ban_id())
172 .unwrap()
173 .remove(&addr);
174
175 if self.connected_peers_ban_id[&addr.ban_id()].is_empty() {
177 self.connected_peers_ban_id.remove(&addr.ban_id());
178 }
179 self.anchor_list.remove(&addr);
181 }
182 }
183 }
184
185 fn ban_peer(&mut self, addr: Z::Addr, time: Duration) {
186 if self.banned_peers.contains_key(&addr.ban_id()) {
187 tracing::error!("Tried to ban peer twice, this shouldn't happen.");
188 }
189
190 if let Some(connected_peers_with_ban_id) = self.connected_peers_ban_id.get(&addr.ban_id()) {
191 for peer in connected_peers_with_ban_id.iter().map(|addr| {
192 tracing::debug!("Banning peer: {}, for: {:?}", addr, time);
193
194 self.connected_peers
195 .get(&InternalPeerID::KnownAddr(*addr))
196 .expect("Peer must be in connected list if in connected_peers_with_ban_id")
197 }) {
198 peer.handle.send_close_signal();
200 self.anchor_list.remove(&addr);
202 }
203 }
204
205 self.white_list.remove_peers_with_ban_id(&addr.ban_id());
206 self.gray_list.remove_peers_with_ban_id(&addr.ban_id());
207
208 let unban_at = Instant::now() + time;
209
210 self.banned_peers_queue.insert_at(addr.ban_id(), unban_at);
211 self.banned_peers.insert(addr.ban_id(), unban_at);
212 }
213
214 fn add_peer_to_gray_list(&mut self, mut peer: ZoneSpecificPeerListEntryBase<Z::Addr>) {
216 if self.white_list.contains_peer(&peer.adr) {
217 tracing::trace!("Peer {} is already in white list skipping.", peer.adr);
218 return;
219 };
220 if !self.gray_list.contains_peer(&peer.adr) {
221 tracing::trace!("Adding peer {} to gray list.", peer.adr);
222 peer.last_seen = 0;
223 self.gray_list.add_new_peer(peer);
224 }
225 }
226
227 fn is_peer_banned(&self, peer: &Z::Addr) -> bool {
229 self.banned_peers.contains_key(&peer.ban_id())
230 }
231
232 fn peer_unban_instant(&self, peer: &Z::Addr) -> Option<Instant> {
238 self.banned_peers.get(&peer.ban_id()).copied()
239 }
240
241 fn handle_incoming_peer_list(
242 &mut self,
243 mut peer_list: Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>,
244 ) {
245 tracing::debug!("Received new peer list, length: {}", peer_list.len());
246
247 peer_list.retain_mut(|peer| {
248 peer.adr.make_canonical();
249
250 if peer.adr.should_add_to_peer_list() {
251 !self.is_peer_banned(&peer.adr)
252 } else {
253 false
254 }
255 });
257
258 for peer in peer_list {
259 self.add_peer_to_gray_list(peer);
260 }
261 self.gray_list
263 .reduce_list(&HashSet::new(), self.cfg.max_gray_list_length);
264 }
265
266 fn take_random_white_peer(
267 &mut self,
268 block_needed: Option<usize>,
269 ) -> Option<ZoneSpecificPeerListEntryBase<Z::Addr>> {
270 tracing::debug!("Retrieving random white peer");
271 self.white_list
272 .take_random_peer(&mut rand::thread_rng(), block_needed, &self.anchor_list)
273 }
274
275 fn take_random_gray_peer(
276 &mut self,
277 block_needed: Option<usize>,
278 ) -> Option<ZoneSpecificPeerListEntryBase<Z::Addr>> {
279 tracing::debug!("Retrieving random gray peer");
280 self.gray_list
281 .take_random_peer(&mut rand::thread_rng(), block_needed, &HashSet::new())
282 }
283
284 fn get_white_peers(&self, len: usize) -> Vec<ZoneSpecificPeerListEntryBase<Z::Addr>> {
285 tracing::debug!("Retrieving white peers, maximum: {}", len);
286 self.white_list
287 .get_random_peers(&mut rand::thread_rng(), len)
288 }
289
290 fn update_white_list_peer_entry(
293 &mut self,
294 peer: &ConnectionPeerEntry<Z>,
295 ) -> Result<(), AddressBookError> {
296 let Some(addr) = &peer.addr else {
297 return Ok(());
299 };
300
301 if let Some(peb) = self.white_list.get_peer_mut(addr) {
302 if peb.pruning_seed != peer.pruning_seed {
303 return Err(AddressBookError::PeersDataChanged("Pruning seed"));
304 }
305 peb.last_seen = 0;
307 peb.rpc_port = peer.rpc_port;
308 peb.rpc_credits_per_hash = peer.rpc_credits_per_hash;
309 } else {
310 let peb = ZoneSpecificPeerListEntryBase {
312 id: peer.id,
313 adr: *addr,
314 last_seen: 0,
315 rpc_port: peer.rpc_port,
316 rpc_credits_per_hash: peer.rpc_credits_per_hash,
317 pruning_seed: peer.pruning_seed,
318 };
319 self.white_list.add_new_peer(peb);
320 }
321 Ok(())
322 }
323
324 fn handle_new_connection(
325 &mut self,
326 internal_peer_id: InternalPeerID<Z::Addr>,
327 peer: ConnectionPeerEntry<Z>,
328 ) -> Result<(), AddressBookError> {
329 if self.connected_peers.contains_key(&internal_peer_id) {
330 return Err(AddressBookError::PeerAlreadyConnected);
331 }
332
333 if let InternalPeerID::KnownAddr(addr) = &internal_peer_id {
335 if self.is_peer_banned(addr) {
336 return Err(AddressBookError::PeerIsBanned);
337 }
338 self.connected_peers_ban_id
340 .entry(addr.ban_id())
341 .or_default()
342 .insert(*addr);
343 }
344
345 if let Some(addr) = peer.addr {
347 self.update_white_list_peer_entry(&peer)?;
349 self.anchor_list.insert(addr);
350 self.white_list
351 .reduce_list(&self.anchor_list, self.cfg.max_white_list_length);
352 }
353
354 self.connected_peers.insert(internal_peer_id, peer);
355 Ok(())
356 }
357}
358
359impl<Z: BorshNetworkZone> Service<AddressBookRequest<Z>> for AddressBook<Z> {
360 type Response = AddressBookResponse<Z>;
361 type Error = AddressBookError;
362 type Future = Ready<Result<Self::Response, Self::Error>>;
363
364 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
365 self.poll_unban_peers(cx);
366 self.poll_save_to_disk(cx);
367 self.poll_connected_peers();
368 Poll::Ready(Ok(()))
369 }
370
371 fn call(&mut self, req: AddressBookRequest<Z>) -> Self::Future {
372 let span = tracing::info_span!("AddressBook");
373 let _guard = span.enter();
374
375 let response = match req {
376 AddressBookRequest::NewConnection {
377 internal_peer_id,
378 public_address,
379 handle,
380 id,
381 pruning_seed,
382 rpc_port,
383 rpc_credits_per_hash,
384 } => self
385 .handle_new_connection(
386 internal_peer_id,
387 ConnectionPeerEntry {
388 addr: public_address,
389 id,
390 handle,
391 pruning_seed,
392 rpc_port,
393 rpc_credits_per_hash,
394 },
395 )
396 .map(|()| AddressBookResponse::Ok),
397 AddressBookRequest::IncomingPeerList(peer_list) => {
398 self.handle_incoming_peer_list(peer_list);
399 Ok(AddressBookResponse::Ok)
400 }
401 AddressBookRequest::TakeRandomWhitePeer { height } => self
402 .take_random_white_peer(height)
403 .map(AddressBookResponse::Peer)
404 .ok_or(AddressBookError::PeerNotFound),
405 AddressBookRequest::TakeRandomGrayPeer { height } => self
406 .take_random_gray_peer(height)
407 .map(AddressBookResponse::Peer)
408 .ok_or(AddressBookError::PeerNotFound),
409 AddressBookRequest::TakeRandomPeer { height } => self
410 .take_random_white_peer(height)
411 .or_else(|| self.take_random_gray_peer(height))
412 .map(AddressBookResponse::Peer)
413 .ok_or(AddressBookError::PeerNotFound),
414 AddressBookRequest::GetWhitePeers(len) => {
415 Ok(AddressBookResponse::Peers(self.get_white_peers(len)))
416 }
417 AddressBookRequest::GetBan(addr) => Ok(AddressBookResponse::GetBan {
418 unban_instant: self.peer_unban_instant(&addr).map(Instant::into_std),
419 }),
420 AddressBookRequest::PeerlistSize
421 | AddressBookRequest::ConnectionCount
422 | AddressBookRequest::SetBan(_)
423 | AddressBookRequest::GetBans
424 | AddressBookRequest::ConnectionInfo => {
425 todo!("finish https://github.com/Cuprate/cuprate/pull/297")
426 }
427 };
428
429 ready(response)
430 }
431}