cuprate_p2p_transport/
tor.rs

1//! Tor Daemon Transport
2//!
3//! This module defines a transport method for the `Tor` network zone using an external Tor daemon supporting SOCKS5.
4//!
5
6//---------------------------------------------------------------------------------------------------- Imports
7
8use std::{
9    io::{self, ErrorKind},
10    net::{IpAddr, SocketAddr},
11    pin::Pin,
12    task::{Context, Poll},
13};
14
15use async_trait::async_trait;
16use futures::Stream;
17use tokio::net::{
18    tcp::{OwnedReadHalf, OwnedWriteHalf},
19    TcpListener,
20};
21use tokio_socks::tcp::Socks5Stream;
22use tokio_util::codec::{FramedRead, FramedWrite};
23
24use cuprate_p2p_core::{NetworkZone, Tor, Transport};
25use cuprate_wire::MoneroWireCodec;
26
27//---------------------------------------------------------------------------------------------------- Configuration
28
/// Client-side (outbound) configuration of the Tor daemon transport.
///
/// Holds the address that outbound connections are proxied through.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DaemonClientConfig {
    /// Socket address of the external Tor daemon.
    ///
    /// This is the address handed to the SOCKS5 client as the proxy endpoint
    /// when connecting to a peer.
    pub tor_daemon: SocketAddr,
}
34
/// Server-side (inbound) configuration of the Tor daemon transport.
///
/// Describes the local TCP endpoint the inbound listener binds to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DaemonServerConfig {
    /// Listening IP Address.
    pub ip: IpAddr,

    /// Listening TCP Port.
    pub port: u16,
}
43
44//---------------------------------------------------------------------------------------------------- Transport
45
46/// A simple TCP server waiting for connections from the Tor daemon
47pub struct DaemonInboundStream {
48    listener: TcpListener,
49}
50
51impl Stream for DaemonInboundStream {
52    type Item = Result<
53        (
54            Option<<Tor as NetworkZone>::Addr>,
55            FramedRead<OwnedReadHalf, MoneroWireCodec>,
56            FramedWrite<OwnedWriteHalf, MoneroWireCodec>,
57        ),
58        io::Error,
59    >;
60
61    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
62        self.listener
63            .poll_accept(cx)
64            .map_ok(|(stream, _)| {
65                let (stream, sink) = stream.into_split();
66
67                (
68                    None, // Inbound is anonymous
69                    FramedRead::new(stream, MoneroWireCodec::default()),
70                    FramedWrite::new(sink, MoneroWireCodec::default()),
71                )
72            })
73            .map(Some)
74    }
75}
76
/// Marker type selecting the external Tor daemon transport.
///
/// Carries no data; it exists to implement `Transport<Tor>`.
#[derive(Debug, Clone, Copy)]
pub struct Daemon;
79
80#[async_trait]
81impl Transport<Tor> for Daemon {
82    type ClientConfig = DaemonClientConfig;
83    type ServerConfig = DaemonServerConfig;
84
85    type Stream = FramedRead<OwnedReadHalf, MoneroWireCodec>;
86    type Sink = FramedWrite<OwnedWriteHalf, MoneroWireCodec>;
87    type Listener = DaemonInboundStream;
88
89    async fn connect_to_peer(
90        addr: <Tor as NetworkZone>::Addr,
91        config: &Self::ClientConfig,
92    ) -> Result<(Self::Stream, Self::Sink), io::Error> {
93        Socks5Stream::connect(config.tor_daemon, addr.to_string())
94            .await
95            .map_err(|e| io::Error::new(ErrorKind::ConnectionAborted, e.to_string()))
96            .map(|stream| {
97                let (stream, sink) = stream.into_inner().into_split();
98                (
99                    FramedRead::new(stream, MoneroWireCodec::default()),
100                    FramedWrite::new(sink, MoneroWireCodec::default()),
101                )
102            })
103    }
104
105    async fn incoming_connection_listener(
106        config: Self::ServerConfig,
107    ) -> Result<Self::Listener, io::Error> {
108        let listener = TcpListener::bind((config.ip, config.port)).await?;
109
110        Ok(DaemonInboundStream { listener })
111    }
112}