monero_simple_request_rpc/
lib.rs1#![cfg_attr(docsrs, feature(doc_auto_cfg))]
2#![doc = include_str!("../README.md")]
3#![deny(missing_docs)]
4
5use core::future::Future;
6use std::{sync::Arc, io::Read, time::Duration};
7
8use tokio::sync::Mutex;
9
10use digest_auth::{WwwAuthenticateHeader, AuthContext};
11use simple_request::{
12 hyper::{StatusCode, header::HeaderValue, Request},
13 Response, Client,
14};
15
16use monero_rpc::{RpcError, Rpc};
17
// Timeout applied to each request when the caller doesn't choose one (see `SimpleRequestRpc::new`).
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
19
20#[derive(Clone, Debug)]
21enum Authentication {
22 Unauthenticated(Client),
24 Authenticated {
28 username: String,
29 password: String,
30 #[allow(clippy::type_complexity)]
31 connection: Arc<Mutex<(Option<(WwwAuthenticateHeader, u64)>, Client)>>,
32 },
33}
34
35#[derive(Clone, Debug)]
39pub struct SimpleRequestRpc {
40 authentication: Authentication,
41 url: String,
42 request_timeout: Duration,
43}
44
45impl SimpleRequestRpc {
46 fn digest_auth_challenge(
47 response: &Response,
48 ) -> Result<Option<(WwwAuthenticateHeader, u64)>, RpcError> {
49 Ok(if let Some(header) = response.headers().get("www-authenticate") {
50 Some((
51 digest_auth::parse(header.to_str().map_err(|_| {
52 RpcError::InvalidNode("www-authenticate header wasn't a string".to_string())
53 })?)
54 .map_err(|_| RpcError::InvalidNode("invalid digest-auth response".to_string()))?,
55 0,
56 ))
57 } else {
58 None
59 })
60 }
61
62 pub async fn new(url: String) -> Result<SimpleRequestRpc, RpcError> {
67 Self::with_custom_timeout(url, DEFAULT_TIMEOUT).await
68 }
69
70 pub async fn with_custom_timeout(
75 mut url: String,
76 request_timeout: Duration,
77 ) -> Result<SimpleRequestRpc, RpcError> {
78 let authentication = if url.contains('@') {
79 let url_clone = url;
81 let split_url = url_clone.split('@').collect::<Vec<_>>();
82 if split_url.len() != 2 {
83 Err(RpcError::ConnectionError("invalid amount of login specifications".to_string()))?;
84 }
85 let mut userpass = split_url[0];
86 url = split_url[1].to_string();
87
88 if userpass.contains("://") {
90 let split_userpass = userpass.split("://").collect::<Vec<_>>();
91 if split_userpass.len() != 2 {
92 Err(RpcError::ConnectionError("invalid amount of protocol specifications".to_string()))?;
93 }
94 url = split_userpass[0].to_string() + "://" + &url;
95 userpass = split_userpass[1];
96 }
97
98 let split_userpass = userpass.split(':').collect::<Vec<_>>();
99 if split_userpass.len() > 2 {
100 Err(RpcError::ConnectionError("invalid amount of passwords".to_string()))?;
101 }
102
103 let client = Client::without_connection_pool(&url)
104 .map_err(|_| RpcError::ConnectionError("invalid URL".to_string()))?;
105 let challenge = Self::digest_auth_challenge(
107 &client
108 .request(
109 Request::post(url.clone())
110 .body(vec![].into())
111 .map_err(|e| RpcError::ConnectionError(format!("couldn't make request: {e:?}")))?,
112 )
113 .await
114 .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?,
115 )?;
116 Authentication::Authenticated {
117 username: split_userpass[0].to_string(),
118 password: (*split_userpass.get(1).unwrap_or(&"")).to_string(),
119 connection: Arc::new(Mutex::new((challenge, client))),
120 }
121 } else {
122 Authentication::Unauthenticated(Client::with_connection_pool())
123 };
124
125 Ok(SimpleRequestRpc { authentication, url, request_timeout })
126 }
127}
128
129impl SimpleRequestRpc {
130 async fn inner_post(&self, route: &str, body: Vec<u8>) -> Result<Vec<u8>, RpcError> {
131 let request_fn = |uri| {
132 Request::post(uri)
133 .body(body.clone().into())
134 .map_err(|e| RpcError::ConnectionError(format!("couldn't make request: {e:?}")))
135 };
136
137 async fn body_from_response(response: Response<'_>) -> Result<Vec<u8>, RpcError> {
138 let mut res = Vec::with_capacity(128);
161 response
162 .body()
163 .await
164 .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?
165 .read_to_end(&mut res)
166 .unwrap();
167 Ok(res)
168 }
169
170 for attempt in 0 .. 2 {
171 return Ok(match &self.authentication {
172 Authentication::Unauthenticated(client) => {
173 body_from_response(
174 client
175 .request(request_fn(self.url.clone() + "/" + route)?)
176 .await
177 .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?,
178 )
179 .await?
180 }
181 Authentication::Authenticated { username, password, connection } => {
182 let mut connection_lock = connection.lock().await;
183
184 let mut request = request_fn("/".to_string() + route)?;
185
186 if connection_lock.0.is_none() {
188 connection_lock.0 = Self::digest_auth_challenge(
189 &connection_lock
190 .1
191 .request(request)
192 .await
193 .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?,
194 )?;
195 request = request_fn("/".to_string() + route)?;
196 }
197
198 if let Some((challenge, cnonce)) = connection_lock.0.as_mut() {
200 *cnonce += 1;
203
204 let mut context = AuthContext::new_post::<_, _, _, &[u8]>(
205 username,
206 password,
207 "/".to_string() + route,
208 None,
209 );
210 context.set_custom_cnonce(hex::encode(cnonce.to_le_bytes()));
211
212 request.headers_mut().insert(
213 "Authorization",
214 HeaderValue::from_str(
215 &challenge
216 .respond(&context)
217 .map_err(|_| {
218 RpcError::InvalidNode("couldn't respond to digest-auth challenge".to_string())
219 })?
220 .to_header_string(),
221 )
222 .unwrap(),
223 );
224 }
225
226 let response = connection_lock
227 .1
228 .request(request)
229 .await
230 .map_err(|e| RpcError::ConnectionError(format!("{e:?}")));
231
232 let (error, is_stale) = match &response {
233 Err(e) => (Some(e.clone()), false),
234 Ok(response) => (
235 None,
236 if response.status() == StatusCode::UNAUTHORIZED {
237 if let Some(header) = response.headers().get("www-authenticate") {
238 header
239 .to_str()
240 .map_err(|_| {
241 RpcError::InvalidNode("www-authenticate header wasn't a string".to_string())
242 })?
243 .contains("stale")
244 } else {
245 false
246 }
247 } else {
248 false
249 },
250 ),
251 };
252
253 if error.is_some() || is_stale {
257 connection_lock.0 = None;
258 if attempt == 0 {
261 continue;
262 }
263 if let Some(e) = error {
264 Err(e)?
265 } else {
266 debug_assert!(is_stale);
267 Err(RpcError::InvalidNode(
268 "node claimed fresh connection had stale authentication".to_string(),
269 ))?
270 }
271 } else {
272 body_from_response(response.unwrap()).await?
273 }
274 }
275 });
276 }
277
278 unreachable!()
279 }
280}
281
282impl Rpc for SimpleRequestRpc {
283 fn post(
284 &self,
285 route: &str,
286 body: Vec<u8>,
287 ) -> impl Send + Future<Output = Result<Vec<u8>, RpcError>> {
288 async move {
289 tokio::time::timeout(self.request_timeout, self.inner_post(route, body))
290 .await
291 .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?
292 }
293 }
294}