cuprate_p2p_core/transports/
tcp.rs

//! TCP Transport
//!
//! This module defines the default transport method used by `monerod`: TCP sockets.
//!
//! Since TCP sockets can only connect to IP addresses, its implementation is constrained
//! to `Z: NetworkZone<Addr = SocketAddr>`.
//!
8
9use std::{
10    net::{Ipv4Addr, Ipv6Addr, SocketAddr},
11    pin::Pin,
12    task::{Context, Poll},
13    time::Duration,
14};
15
16use futures::Stream;
17use tokio::net::{
18    tcp::{OwnedReadHalf, OwnedWriteHalf},
19    TcpListener, TcpStream,
20};
21use tokio_util::codec::{FramedRead, FramedWrite};
22
23use cuprate_wire::MoneroWireCodec;
24
25use crate::{NetworkZone, Transport};
26
/// The default transport: plain TCP sockets.
///
/// This is a zero-sized marker type. It carries no state of its own; it only
/// selects the TCP implementation of the transport trait.
#[derive(Clone, Copy, Debug, Default)]
pub struct Tcp;
30
/// Parameters required to start the inbound TCP p2p listener.
///
/// Setting an address to `None` means no listener is bound for that IP
/// version.
#[derive(Clone, Debug)]
pub struct TcpServerConfig {
    /// IPv4 address to listen on, if any.
    pub ipv4: Option<Ipv4Addr>,
    /// IPv6 address to listen on, if any.
    pub ipv6: Option<Ipv6Addr>,

    /// Port to listen on; the same port is used for the IPv4 and the IPv6
    /// listener.
    pub port: u16,

    /// Timeout for TCP writes.
    ///
    /// Not read anywhere in this file (hence the leading underscore). The
    /// field is private, so code outside this module can only obtain a value
    /// through `Default`.
    _send_timeout: Duration,
}
45
46impl Default for TcpServerConfig {
47    fn default() -> Self {
48        Self {
49            ipv4: Some(Ipv4Addr::UNSPECIFIED),
50            ipv6: None,
51            port: 18081,
52            _send_timeout: Duration::from_secs(20),
53        }
54    }
55}
56
57/// A set of listener to which new peers can connect to
58pub struct TcpInBoundStream {
59    /// IPv4 TCP listener
60    listener_v4: Option<TcpListener>,
61    /// IPv6 TCP listener
62    listener_v6: Option<TcpListener>,
63}
64
65impl Stream for TcpInBoundStream {
66    type Item = Result<
67        (
68            Option<SocketAddr>,
69            FramedRead<OwnedReadHalf, MoneroWireCodec>,
70            FramedWrite<OwnedWriteHalf, MoneroWireCodec>,
71        ),
72        std::io::Error,
73    >;
74
75    /// SAFETY: Caller must ensure that at least one of the listener is `Some`, otherwise this function
76    /// will always return `Poll::Pending`
77    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
78        self.listener_v4
79            .as_ref()
80            .and_then(|l| match l.poll_accept(cx) {
81                Poll::Ready(r) => Some(Poll::Ready(r)),
82                Poll::Pending => None,
83            })
84            .or(self
85                .listener_v6
86                .as_ref()
87                .and_then(|l| match l.poll_accept(cx) {
88                    Poll::Ready(r) => Some(Poll::Ready(r)),
89                    Poll::Pending => None,
90                }))
91            .unwrap_or(Poll::Pending)
92            .map_ok(|(stream, mut addr)| {
93                let ip = addr.ip().to_canonical();
94                addr.set_ip(ip);
95
96                let (read, write) = stream.into_split();
97                (
98                    Some(addr),
99                    FramedRead::new(read, MoneroWireCodec::default()),
100                    FramedWrite::new(write, MoneroWireCodec::default()),
101                )
102            })
103            .map(Some)
104    }
105}
106
107#[async_trait::async_trait]
108impl<Z: NetworkZone<Addr = SocketAddr>> Transport<Z> for Tcp {
109    type ClientConfig = ();
110    type ServerConfig = TcpServerConfig;
111
112    type Stream = FramedRead<OwnedReadHalf, MoneroWireCodec>;
113    type Sink = FramedWrite<OwnedWriteHalf, MoneroWireCodec>;
114    type Listener = TcpInBoundStream;
115
116    async fn connect_to_peer(
117        addr: Z::Addr,
118        _config: &Self::ClientConfig,
119    ) -> Result<(Self::Stream, Self::Sink), std::io::Error> {
120        let (read, write) = TcpStream::connect(addr).await?.into_split();
121        Ok((
122            FramedRead::new(read, MoneroWireCodec::default()),
123            FramedWrite::new(write, MoneroWireCodec::default()),
124        ))
125    }
126
127    async fn incoming_connection_listener(
128        config: Self::ServerConfig,
129    ) -> Result<Self::Listener, std::io::Error> {
130        // Start up the IPv4/6 listeners
131        let ipv4_listener = if let Some(ipv4) = config.ipv4 {
132            Some(TcpListener::bind(SocketAddr::new(ipv4.into(), config.port)).await?)
133        } else {
134            None
135        };
136
137        // Linux INADDR_ANY bind all local interfaces regardless of the IP versioning.
138        #[cfg(target_os = "linux")]
139        if config.ipv4 == Some(Ipv4Addr::UNSPECIFIED) && config.ipv6 == Some(Ipv6Addr::UNSPECIFIED)
140        {
141            return Ok(TcpInBoundStream {
142                listener_v4: ipv4_listener,
143                listener_v6: None,
144            });
145        }
146
147        let ipv6_listener = if let Some(ipv6) = config.ipv6 {
148            Some(TcpListener::bind(SocketAddr::new(ipv6.into(), config.port)).await?)
149        } else {
150            None
151        };
152
153        Ok(TcpInBoundStream {
154            listener_v4: ipv4_listener,
155            listener_v6: ipv6_listener,
156        })
157    }
158}