cuprate_p2p_transport/
arti.rs

1//! Arti Transport
2//!
3//! This module defines a transport method for the `Tor` network zone using the `arti_client` library.
4//!
5
6//---------------------------------------------------------------------------------------------------- Imports
7
8use std::{
9    io::{self, ErrorKind},
10    pin::Pin,
11    sync::Arc,
12    task::{Context, Poll},
13};
14
15use arti_client::{DataReader, DataWriter, TorClient, TorClientConfig};
16use async_trait::async_trait;
17use futures::{Stream, StreamExt};
18use tokio_util::codec::{FramedRead, FramedWrite};
19use tor_cell::relaycell::msg::Connected;
20use tor_config_path::CfgPathResolver;
21use tor_hsservice::{handle_rend_requests, OnionService, RunningOnionService};
22use tor_proto::stream::IncomingStreamRequest;
23use tor_rtcompat::PreferredRuntime;
24
25use cuprate_p2p_core::{ClearNet, NetworkZone, Tor, Transport};
26use cuprate_wire::MoneroWireCodec;
27
28use crate::DisabledListener;
29
30//---------------------------------------------------------------------------------------------------- Configuration
31
32#[derive(Clone)]
33pub struct ArtiClientConfig {
34    /// Arti bootstrapped client
35    pub client: TorClient<PreferredRuntime>,
36}
37
38pub struct ArtiServerConfig {
39    /// Arti onion service
40    pub onion_svc: OnionService,
41    /// Listening port
42    pub port: u16,
43
44    // Mandatory resources for launching the onion service
45    client: TorClient<PreferredRuntime>,
46    path_resolver: Arc<CfgPathResolver>,
47}
48
49impl ArtiServerConfig {
50    pub fn new(
51        onion_svc: OnionService,
52        port: u16,
53        client: &TorClient<PreferredRuntime>,
54        config: &TorClientConfig,
55    ) -> Self {
56        let path_resolver: &CfgPathResolver = config.as_ref();
57
58        Self {
59            onion_svc,
60            port,
61            client: client.clone(),
62            path_resolver: Arc::new(path_resolver.clone()),
63        }
64    }
65}
66
67//---------------------------------------------------------------------------------------------------- Transport
68
69type PinnedStream<I> = Pin<Box<dyn Stream<Item = I> + Send>>;
70
71/// An onion service listening for incoming peer connections.
72pub struct OnionListener {
73    /// A handle to the onion service instance.
74    _onion_svc: Arc<RunningOnionService>,
75    /// A modified stream that produce a data stream and sink from rendez-vous requests.
76    listener: PinnedStream<Result<(DataReader, DataWriter), io::Error>>,
77}
78
79impl Stream for OnionListener {
80    type Item = Result<
81        (
82            Option<<Tor as NetworkZone>::Addr>,
83            FramedRead<DataReader, MoneroWireCodec>,
84            FramedWrite<DataWriter, MoneroWireCodec>,
85        ),
86        io::Error,
87    >;
88
89    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
90        match self.listener.poll_next_unpin(cx) {
91            Poll::Pending => Poll::Pending,
92            Poll::Ready(req) => Poll::Ready(req.map(|r| {
93                r.map(|(stream, sink)| {
94                    (
95                        None, // Inbound is anonymous
96                        FramedRead::new(stream, MoneroWireCodec::default()),
97                        FramedWrite::new(sink, MoneroWireCodec::default()),
98                    )
99                })
100            })),
101        }
102    }
103}
104
/// Marker type for the Arti-based transport.
///
/// It carries no data; the transport behaviour lives in its `Transport`
/// implementations for the `Tor` and `ClearNet` network zones.
#[derive(Debug, Clone, Copy)]
pub struct Arti;
107
108#[async_trait]
109impl Transport<Tor> for Arti {
110    type ClientConfig = ArtiClientConfig;
111    type ServerConfig = ArtiServerConfig;
112
113    type Stream = FramedRead<DataReader, MoneroWireCodec>;
114    type Sink = FramedWrite<DataWriter, MoneroWireCodec>;
115    type Listener = OnionListener;
116
117    async fn connect_to_peer(
118        addr: <Tor as NetworkZone>::Addr,
119        config: &Self::ClientConfig,
120    ) -> Result<(Self::Stream, Self::Sink), io::Error> {
121        config
122            .client
123            .connect((addr.addr_string(), addr.port()))
124            .await
125            .map_err(|e| io::Error::new(ErrorKind::ConnectionAborted, e.to_string()))
126            .map(|stream| {
127                let (stream, sink) = stream.split();
128                (
129                    FramedRead::new(stream, MoneroWireCodec::default()),
130                    FramedWrite::new(sink, MoneroWireCodec::default()),
131                )
132            })
133    }
134
135    async fn incoming_connection_listener(
136        config: Self::ServerConfig,
137    ) -> Result<Self::Listener, io::Error> {
138        // Launch onion service
139        #[expect(clippy::clone_on_ref_ptr)]
140        let (svc, rdv_stream) = config
141            .onion_svc
142            .launch(
143                config.client.runtime().clone(),
144                config.client.dirmgr().clone(),
145                config.client.hs_circ_pool().clone(),
146                config.path_resolver,
147            )
148            .unwrap();
149
150        // Accept all rendez-vous and await correct stream request
151        #[expect(clippy::wildcard_enum_match_arm)]
152        let req_stream = handle_rend_requests(rdv_stream).then(move |sreq| async move {
153            match sreq.request() {
154                // As specified in: <https://spec.torproject.org/rend-spec/managing-streams.html>
155                //
156                // A client that wishes to open a data stream with us needs to send a BEGIN message with an empty address
157                // and no flags. We additionally filter requests to the correct port configured and advertised on P2P.
158                IncomingStreamRequest::Begin(r)
159                    if r.port() == config.port && r.addr().is_empty() && r.flags().is_empty() =>
160                {
161                    let stream = sreq
162                        .accept(Connected::new_empty())
163                        .await
164                        .map_err(|e| io::Error::new(ErrorKind::BrokenPipe, e.to_string()))?;
165
166                    Ok(stream.split())
167                }
168                _ => {
169                    sreq.shutdown_circuit()
170                        .expect("Should never panic, unless programming error from arti's end.");
171
172                    Err(io::Error::other("Received invalid command"))
173                }
174            }
175        });
176
177        Ok(OnionListener {
178            _onion_svc: svc,
179            listener: Box::pin(req_stream),
180        })
181    }
182}
183
184#[async_trait]
185impl Transport<ClearNet> for Arti {
186    type ClientConfig = ArtiClientConfig;
187    type ServerConfig = ();
188
189    type Stream = FramedRead<DataReader, MoneroWireCodec>;
190    type Sink = FramedWrite<DataWriter, MoneroWireCodec>;
191    type Listener = DisabledListener<ClearNet, DataReader, DataWriter>;
192
193    async fn connect_to_peer(
194        addr: <ClearNet as NetworkZone>::Addr,
195        config: &Self::ClientConfig,
196    ) -> Result<(Self::Stream, Self::Sink), io::Error> {
197        config
198            .client
199            .connect(addr.to_string())
200            .await
201            .map_err(|e| io::Error::new(ErrorKind::ConnectionAborted, e.to_string()))
202            .map(|stream| {
203                let (stream, sink) = stream.split();
204                (
205                    FramedRead::new(stream, MoneroWireCodec::default()),
206                    FramedWrite::new(sink, MoneroWireCodec::default()),
207                )
208            })
209    }
210
211    async fn incoming_connection_listener(
212        _config: Self::ServerConfig,
213    ) -> Result<Self::Listener, io::Error> {
214        panic!("In anonymized clearnet mode, inbound is disabled!");
215    }
216}