cuprate_p2p_transport/
tor.rs1use std::{
9 io::{self, ErrorKind},
10 net::{IpAddr, SocketAddr},
11 pin::Pin,
12 task::{Context, Poll},
13};
14
15use async_trait::async_trait;
16use futures::Stream;
17use tokio::net::{
18 tcp::{OwnedReadHalf, OwnedWriteHalf},
19 TcpListener,
20};
21use tokio_socks::tcp::Socks5Stream;
22use tokio_util::codec::{FramedRead, FramedWrite};
23
24use cuprate_p2p_core::{NetworkZone, Tor, Transport};
25use cuprate_wire::MoneroWireCodec;
26
27#[derive(Clone, Copy)]
30pub struct DaemonClientConfig {
31 pub tor_daemon: SocketAddr,
33}
34
35#[derive(Clone, Copy)]
36pub struct DaemonServerConfig {
37 pub ip: IpAddr,
39
40 pub port: u16,
42}
43
44pub struct DaemonInboundStream {
48 listener: TcpListener,
49}
50
51impl Stream for DaemonInboundStream {
52 type Item = Result<
53 (
54 Option<<Tor as NetworkZone>::Addr>,
55 FramedRead<OwnedReadHalf, MoneroWireCodec>,
56 FramedWrite<OwnedWriteHalf, MoneroWireCodec>,
57 ),
58 io::Error,
59 >;
60
61 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
62 self.listener
63 .poll_accept(cx)
64 .map_ok(|(stream, _)| {
65 let (stream, sink) = stream.into_split();
66
67 (
68 None, FramedRead::new(stream, MoneroWireCodec::default()),
70 FramedWrite::new(sink, MoneroWireCodec::default()),
71 )
72 })
73 .map(Some)
74 }
75}
76
77#[derive(Clone, Copy)]
78pub struct Daemon;
79
80#[async_trait]
81impl Transport<Tor> for Daemon {
82 type ClientConfig = DaemonClientConfig;
83 type ServerConfig = DaemonServerConfig;
84
85 type Stream = FramedRead<OwnedReadHalf, MoneroWireCodec>;
86 type Sink = FramedWrite<OwnedWriteHalf, MoneroWireCodec>;
87 type Listener = DaemonInboundStream;
88
89 async fn connect_to_peer(
90 addr: <Tor as NetworkZone>::Addr,
91 config: &Self::ClientConfig,
92 ) -> Result<(Self::Stream, Self::Sink), io::Error> {
93 Socks5Stream::connect(config.tor_daemon, addr.to_string())
94 .await
95 .map_err(|e| io::Error::new(ErrorKind::ConnectionAborted, e.to_string()))
96 .map(|stream| {
97 let (stream, sink) = stream.into_inner().into_split();
98 (
99 FramedRead::new(stream, MoneroWireCodec::default()),
100 FramedWrite::new(sink, MoneroWireCodec::default()),
101 )
102 })
103 }
104
105 async fn incoming_connection_listener(
106 config: Self::ServerConfig,
107 ) -> Result<Self::Listener, io::Error> {
108 let listener = TcpListener::bind((config.ip, config.port)).await?;
109
110 Ok(DaemonInboundStream { listener })
111 }
112}