cuprate_p2p_core/transports/
tcp.rs
1use std::{
10 net::{Ipv4Addr, Ipv6Addr, SocketAddr},
11 pin::Pin,
12 task::{Context, Poll},
13 time::Duration,
14};
15
16use futures::Stream;
17use tokio::net::{
18 tcp::{OwnedReadHalf, OwnedWriteHalf},
19 TcpListener, TcpStream,
20};
21use tokio_util::codec::{FramedRead, FramedWrite};
22
23use cuprate_wire::MoneroWireCodec;
24
25use crate::{NetworkZone, Transport};
26
/// Marker type selecting plain TCP as the transport.
///
/// The type carries no state; in this file it implements [`Transport`] for
/// every [`NetworkZone`] whose address type is [`SocketAddr`].
#[derive(Clone, Copy, Debug, Default)]
pub struct Tcp;
30
/// Settings used when opening the inbound TCP listener(s).
#[derive(Clone, Debug)]
pub struct TcpServerConfig {
    /// IPv4 address to listen on; `None` means no IPv4 listener is requested.
    pub ipv4: Option<Ipv4Addr>,
    /// IPv6 address to listen on; `None` means no IPv6 listener is requested.
    pub ipv6: Option<Ipv6Addr>,

    /// TCP port used for every listener that gets bound.
    pub port: u16,

    /// Send timeout.
    ///
    /// Not read by any code visible in this file; because the field is
    /// private, values outside this module can only be obtained through
    /// [`Default`].
    _send_timeout: Duration,
}
45
46impl Default for TcpServerConfig {
47 fn default() -> Self {
48 Self {
49 ipv4: Some(Ipv4Addr::UNSPECIFIED),
50 ipv6: None,
51 port: 18081,
52 _send_timeout: Duration::from_secs(20),
53 }
54 }
55}
56
57pub struct TcpInBoundStream {
59 listener_v4: Option<TcpListener>,
61 listener_v6: Option<TcpListener>,
63}
64
65impl Stream for TcpInBoundStream {
66 type Item = Result<
67 (
68 Option<SocketAddr>,
69 FramedRead<OwnedReadHalf, MoneroWireCodec>,
70 FramedWrite<OwnedWriteHalf, MoneroWireCodec>,
71 ),
72 std::io::Error,
73 >;
74
75 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
78 self.listener_v4
79 .as_ref()
80 .and_then(|l| match l.poll_accept(cx) {
81 Poll::Ready(r) => Some(Poll::Ready(r)),
82 Poll::Pending => None,
83 })
84 .or(self
85 .listener_v6
86 .as_ref()
87 .and_then(|l| match l.poll_accept(cx) {
88 Poll::Ready(r) => Some(Poll::Ready(r)),
89 Poll::Pending => None,
90 }))
91 .unwrap_or(Poll::Pending)
92 .map_ok(|(stream, mut addr)| {
93 let ip = addr.ip().to_canonical();
94 addr.set_ip(ip);
95
96 let (read, write) = stream.into_split();
97 (
98 Some(addr),
99 FramedRead::new(read, MoneroWireCodec::default()),
100 FramedWrite::new(write, MoneroWireCodec::default()),
101 )
102 })
103 .map(Some)
104 }
105}
106
107#[async_trait::async_trait]
108impl<Z: NetworkZone<Addr = SocketAddr>> Transport<Z> for Tcp {
109 type ClientConfig = ();
110 type ServerConfig = TcpServerConfig;
111
112 type Stream = FramedRead<OwnedReadHalf, MoneroWireCodec>;
113 type Sink = FramedWrite<OwnedWriteHalf, MoneroWireCodec>;
114 type Listener = TcpInBoundStream;
115
116 async fn connect_to_peer(
117 addr: Z::Addr,
118 _config: &Self::ClientConfig,
119 ) -> Result<(Self::Stream, Self::Sink), std::io::Error> {
120 let (read, write) = TcpStream::connect(addr).await?.into_split();
121 Ok((
122 FramedRead::new(read, MoneroWireCodec::default()),
123 FramedWrite::new(write, MoneroWireCodec::default()),
124 ))
125 }
126
127 async fn incoming_connection_listener(
128 config: Self::ServerConfig,
129 ) -> Result<Self::Listener, std::io::Error> {
130 let ipv4_listener = if let Some(ipv4) = config.ipv4 {
132 Some(TcpListener::bind(SocketAddr::new(ipv4.into(), config.port)).await?)
133 } else {
134 None
135 };
136
137 #[cfg(target_os = "linux")]
139 if config.ipv4 == Some(Ipv4Addr::UNSPECIFIED) && config.ipv6 == Some(Ipv6Addr::UNSPECIFIED)
140 {
141 return Ok(TcpInBoundStream {
142 listener_v4: ipv4_listener,
143 listener_v6: None,
144 });
145 }
146
147 let ipv6_listener = if let Some(ipv6) = config.ipv6 {
148 Some(TcpListener::bind(SocketAddr::new(ipv6.into(), config.port)).await?)
149 } else {
150 None
151 };
152
153 Ok(TcpInBoundStream {
154 listener_v4: ipv4_listener,
155 listener_v6: ipv6_listener,
156 })
157 }
158}