1use std::{
9 io::{self, ErrorKind},
10 pin::Pin,
11 sync::Arc,
12 task::{Context, Poll},
13};
14
15use arti_client::{DataReader, DataWriter, TorClient, TorClientConfig};
16use async_trait::async_trait;
17use futures::{Stream, StreamExt};
18use tokio_util::codec::{FramedRead, FramedWrite};
19use tor_cell::relaycell::msg::Connected;
20use tor_config_path::CfgPathResolver;
21use tor_hsservice::{handle_rend_requests, OnionService, RunningOnionService};
22use tor_proto::stream::IncomingStreamRequest;
23use tor_rtcompat::PreferredRuntime;
24
25use cuprate_p2p_core::{ClearNet, NetworkZone, Tor, Transport};
26use cuprate_wire::MoneroWireCodec;
27
28use crate::DisabledListener;
29
30#[derive(Clone)]
33pub struct ArtiClientConfig {
34 pub client: TorClient<PreferredRuntime>,
36}
37
38pub struct ArtiServerConfig {
39 pub onion_svc: OnionService,
41 pub port: u16,
43
44 client: TorClient<PreferredRuntime>,
46 path_resolver: Arc<CfgPathResolver>,
47}
48
49impl ArtiServerConfig {
50 pub fn new(
51 onion_svc: OnionService,
52 port: u16,
53 client: &TorClient<PreferredRuntime>,
54 config: &TorClientConfig,
55 ) -> Self {
56 let path_resolver: &CfgPathResolver = config.as_ref();
57
58 Self {
59 onion_svc,
60 port,
61 client: client.clone(),
62 path_resolver: Arc::new(path_resolver.clone()),
63 }
64 }
65}
66
67type PinnedStream<I> = Pin<Box<dyn Stream<Item = I> + Send>>;
70
71pub struct OnionListener {
73 _onion_svc: Arc<RunningOnionService>,
75 listener: PinnedStream<Result<(DataReader, DataWriter), io::Error>>,
77}
78
79impl Stream for OnionListener {
80 type Item = Result<
81 (
82 Option<<Tor as NetworkZone>::Addr>,
83 FramedRead<DataReader, MoneroWireCodec>,
84 FramedWrite<DataWriter, MoneroWireCodec>,
85 ),
86 io::Error,
87 >;
88
89 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
90 match self.listener.poll_next_unpin(cx) {
91 Poll::Pending => Poll::Pending,
92 Poll::Ready(req) => Poll::Ready(req.map(|r| {
93 r.map(|(stream, sink)| {
94 (
95 None, FramedRead::new(stream, MoneroWireCodec::default()),
97 FramedWrite::new(sink, MoneroWireCodec::default()),
98 )
99 })
100 })),
101 }
102 }
103}
104
105#[derive(Clone, Copy)]
106pub struct Arti;
107
108#[async_trait]
109impl Transport<Tor> for Arti {
110 type ClientConfig = ArtiClientConfig;
111 type ServerConfig = ArtiServerConfig;
112
113 type Stream = FramedRead<DataReader, MoneroWireCodec>;
114 type Sink = FramedWrite<DataWriter, MoneroWireCodec>;
115 type Listener = OnionListener;
116
117 async fn connect_to_peer(
118 addr: <Tor as NetworkZone>::Addr,
119 config: &Self::ClientConfig,
120 ) -> Result<(Self::Stream, Self::Sink), io::Error> {
121 config
122 .client
123 .connect((addr.addr_string(), addr.port()))
124 .await
125 .map_err(|e| io::Error::new(ErrorKind::ConnectionAborted, e.to_string()))
126 .map(|stream| {
127 let (stream, sink) = stream.split();
128 (
129 FramedRead::new(stream, MoneroWireCodec::default()),
130 FramedWrite::new(sink, MoneroWireCodec::default()),
131 )
132 })
133 }
134
135 async fn incoming_connection_listener(
136 config: Self::ServerConfig,
137 ) -> Result<Self::Listener, io::Error> {
138 #[expect(clippy::clone_on_ref_ptr)]
140 let (svc, rdv_stream) = config
141 .onion_svc
142 .launch(
143 config.client.runtime().clone(),
144 config.client.dirmgr().clone(),
145 config.client.hs_circ_pool().clone(),
146 config.path_resolver,
147 )
148 .unwrap();
149
150 #[expect(clippy::wildcard_enum_match_arm)]
152 let req_stream = handle_rend_requests(rdv_stream).then(move |sreq| async move {
153 match sreq.request() {
154 IncomingStreamRequest::Begin(r)
159 if r.port() == config.port && r.addr().is_empty() && r.flags().is_empty() =>
160 {
161 let stream = sreq
162 .accept(Connected::new_empty())
163 .await
164 .map_err(|e| io::Error::new(ErrorKind::BrokenPipe, e.to_string()))?;
165
166 Ok(stream.split())
167 }
168 _ => {
169 sreq.shutdown_circuit()
170 .expect("Should never panic, unless programming error from arti's end.");
171
172 Err(io::Error::other("Received invalid command"))
173 }
174 }
175 });
176
177 Ok(OnionListener {
178 _onion_svc: svc,
179 listener: Box::pin(req_stream),
180 })
181 }
182}
183
184#[async_trait]
185impl Transport<ClearNet> for Arti {
186 type ClientConfig = ArtiClientConfig;
187 type ServerConfig = ();
188
189 type Stream = FramedRead<DataReader, MoneroWireCodec>;
190 type Sink = FramedWrite<DataWriter, MoneroWireCodec>;
191 type Listener = DisabledListener<ClearNet, DataReader, DataWriter>;
192
193 async fn connect_to_peer(
194 addr: <ClearNet as NetworkZone>::Addr,
195 config: &Self::ClientConfig,
196 ) -> Result<(Self::Stream, Self::Sink), io::Error> {
197 config
198 .client
199 .connect(addr.to_string())
200 .await
201 .map_err(|e| io::Error::new(ErrorKind::ConnectionAborted, e.to_string()))
202 .map(|stream| {
203 let (stream, sink) = stream.split();
204 (
205 FramedRead::new(stream, MoneroWireCodec::default()),
206 FramedWrite::new(sink, MoneroWireCodec::default()),
207 )
208 })
209 }
210
211 async fn incoming_connection_listener(
212 _config: Self::ServerConfig,
213 ) -> Result<Self::Listener, io::Error> {
214 panic!("In anonymized clearnet mode, inbound is disabled!");
215 }
216}