monero_simple_request_rpc/
lib.rs

1#![cfg_attr(docsrs, feature(doc_auto_cfg))]
2#![doc = include_str!("../README.md")]
3#![deny(missing_docs)]
4
5use core::future::Future;
6use std::{sync::Arc, io::Read, time::Duration};
7
8use tokio::sync::Mutex;
9
10use digest_auth::{WwwAuthenticateHeader, AuthContext};
11use simple_request::{
12  hyper::{StatusCode, header::HeaderValue, Request},
13  Response, Client,
14};
15
16use monero_rpc::{RpcError, Rpc};
17
18const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
19
20#[derive(Clone, Debug)]
21enum Authentication {
22  // If unauthenticated, use a single client
23  Unauthenticated(Client),
24  // If authenticated, use a single client which supports being locked and tracks its nonce
25  // This ensures that if a nonce is requested, another caller doesn't make a request invalidating
26  // it
27  Authenticated {
28    username: String,
29    password: String,
30    #[allow(clippy::type_complexity)]
31    connection: Arc<Mutex<(Option<(WwwAuthenticateHeader, u64)>, Client)>>,
32  },
33}
34
35/// An HTTP(S) transport for the RPC.
36///
37/// Requires tokio.
38#[derive(Clone, Debug)]
39pub struct SimpleRequestRpc {
40  authentication: Authentication,
41  url: String,
42  request_timeout: Duration,
43}
44
45impl SimpleRequestRpc {
46  fn digest_auth_challenge(
47    response: &Response,
48  ) -> Result<Option<(WwwAuthenticateHeader, u64)>, RpcError> {
49    Ok(if let Some(header) = response.headers().get("www-authenticate") {
50      Some((
51        digest_auth::parse(header.to_str().map_err(|_| {
52          RpcError::InvalidNode("www-authenticate header wasn't a string".to_string())
53        })?)
54        .map_err(|_| RpcError::InvalidNode("invalid digest-auth response".to_string()))?,
55        0,
56      ))
57    } else {
58      None
59    })
60  }
61
62  /// Create a new HTTP(S) RPC connection.
63  ///
64  /// A daemon requiring authentication can be used via including the username and password in the
65  /// URL.
66  pub async fn new(url: String) -> Result<SimpleRequestRpc, RpcError> {
67    Self::with_custom_timeout(url, DEFAULT_TIMEOUT).await
68  }
69
70  /// Create a new HTTP(S) RPC connection with a custom timeout.
71  ///
72  /// A daemon requiring authentication can be used via including the username and password in the
73  /// URL.
74  pub async fn with_custom_timeout(
75    mut url: String,
76    request_timeout: Duration,
77  ) -> Result<SimpleRequestRpc, RpcError> {
78    let authentication = if url.contains('@') {
79      // Parse out the username and password
80      let url_clone = url;
81      let split_url = url_clone.split('@').collect::<Vec<_>>();
82      if split_url.len() != 2 {
83        Err(RpcError::ConnectionError("invalid amount of login specifications".to_string()))?;
84      }
85      let mut userpass = split_url[0];
86      url = split_url[1].to_string();
87
88      // If there was additionally a protocol string, restore that to the daemon URL
89      if userpass.contains("://") {
90        let split_userpass = userpass.split("://").collect::<Vec<_>>();
91        if split_userpass.len() != 2 {
92          Err(RpcError::ConnectionError("invalid amount of protocol specifications".to_string()))?;
93        }
94        url = split_userpass[0].to_string() + "://" + &url;
95        userpass = split_userpass[1];
96      }
97
98      let split_userpass = userpass.split(':').collect::<Vec<_>>();
99      if split_userpass.len() > 2 {
100        Err(RpcError::ConnectionError("invalid amount of passwords".to_string()))?;
101      }
102
103      let client = Client::without_connection_pool(&url)
104        .map_err(|_| RpcError::ConnectionError("invalid URL".to_string()))?;
105      // Obtain the initial challenge, which also somewhat validates this connection
106      let challenge = Self::digest_auth_challenge(
107        &client
108          .request(
109            Request::post(url.clone())
110              .body(vec![].into())
111              .map_err(|e| RpcError::ConnectionError(format!("couldn't make request: {e:?}")))?,
112          )
113          .await
114          .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?,
115      )?;
116      Authentication::Authenticated {
117        username: split_userpass[0].to_string(),
118        password: (*split_userpass.get(1).unwrap_or(&"")).to_string(),
119        connection: Arc::new(Mutex::new((challenge, client))),
120      }
121    } else {
122      Authentication::Unauthenticated(Client::with_connection_pool())
123    };
124
125    Ok(SimpleRequestRpc { authentication, url, request_timeout })
126  }
127}
128
129impl SimpleRequestRpc {
130  async fn inner_post(&self, route: &str, body: Vec<u8>) -> Result<Vec<u8>, RpcError> {
131    let request_fn = |uri| {
132      Request::post(uri)
133        .body(body.clone().into())
134        .map_err(|e| RpcError::ConnectionError(format!("couldn't make request: {e:?}")))
135    };
136
137    async fn body_from_response(response: Response<'_>) -> Result<Vec<u8>, RpcError> {
138      /*
139      let length = usize::try_from(
140        response
141          .headers()
142          .get("content-length")
143          .ok_or(RpcError::InvalidNode("no content-length header"))?
144          .to_str()
145          .map_err(|_| RpcError::InvalidNode("non-ascii content-length value"))?
146          .parse::<u32>()
147          .map_err(|_| RpcError::InvalidNode("non-u32 content-length value"))?,
148      )
149      .unwrap();
150      // Only pre-allocate 1 MB so a malicious node which claims a content-length of 1 GB actually
151      // has to send 1 GB of data to cause a 1 GB allocation
152      let mut res = Vec::with_capacity(length.max(1024 * 1024));
153      let mut body = response.into_body();
154      while res.len() < length {
155        let Some(data) = body.data().await else { break };
156        res.extend(data.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?.as_ref());
157      }
158      */
159
160      let mut res = Vec::with_capacity(128);
161      response
162        .body()
163        .await
164        .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?
165        .read_to_end(&mut res)
166        .unwrap();
167      Ok(res)
168    }
169
170    for attempt in 0 .. 2 {
171      return Ok(match &self.authentication {
172        Authentication::Unauthenticated(client) => {
173          body_from_response(
174            client
175              .request(request_fn(self.url.clone() + "/" + route)?)
176              .await
177              .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?,
178          )
179          .await?
180        }
181        Authentication::Authenticated { username, password, connection } => {
182          let mut connection_lock = connection.lock().await;
183
184          let mut request = request_fn("/".to_string() + route)?;
185
186          // If we don't have an auth challenge, obtain one
187          if connection_lock.0.is_none() {
188            connection_lock.0 = Self::digest_auth_challenge(
189              &connection_lock
190                .1
191                .request(request)
192                .await
193                .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?,
194            )?;
195            request = request_fn("/".to_string() + route)?;
196          }
197
198          // Insert the challenge response, if we have a challenge
199          if let Some((challenge, cnonce)) = connection_lock.0.as_mut() {
200            // Update the cnonce
201            // Overflow isn't a concern as this is a u64
202            *cnonce += 1;
203
204            let mut context = AuthContext::new_post::<_, _, _, &[u8]>(
205              username,
206              password,
207              "/".to_string() + route,
208              None,
209            );
210            context.set_custom_cnonce(hex::encode(cnonce.to_le_bytes()));
211
212            request.headers_mut().insert(
213              "Authorization",
214              HeaderValue::from_str(
215                &challenge
216                  .respond(&context)
217                  .map_err(|_| {
218                    RpcError::InvalidNode("couldn't respond to digest-auth challenge".to_string())
219                  })?
220                  .to_header_string(),
221              )
222              .unwrap(),
223            );
224          }
225
226          let response = connection_lock
227            .1
228            .request(request)
229            .await
230            .map_err(|e| RpcError::ConnectionError(format!("{e:?}")));
231
232          let (error, is_stale) = match &response {
233            Err(e) => (Some(e.clone()), false),
234            Ok(response) => (
235              None,
236              if response.status() == StatusCode::UNAUTHORIZED {
237                if let Some(header) = response.headers().get("www-authenticate") {
238                  header
239                    .to_str()
240                    .map_err(|_| {
241                      RpcError::InvalidNode("www-authenticate header wasn't a string".to_string())
242                    })?
243                    .contains("stale")
244                } else {
245                  false
246                }
247              } else {
248                false
249              },
250            ),
251          };
252
253          // If the connection entered an error state, drop the cached challenge as challenges are
254          // per-connection
255          // We don't need to create a new connection as simple-request will for us
256          if error.is_some() || is_stale {
257            connection_lock.0 = None;
258            // If we're not already on our second attempt, move to the next loop iteration
259            // (retrying all of this once)
260            if attempt == 0 {
261              continue;
262            }
263            if let Some(e) = error {
264              Err(e)?
265            } else {
266              debug_assert!(is_stale);
267              Err(RpcError::InvalidNode(
268                "node claimed fresh connection had stale authentication".to_string(),
269              ))?
270            }
271          } else {
272            body_from_response(response.unwrap()).await?
273          }
274        }
275      });
276    }
277
278    unreachable!()
279  }
280}
281
282impl Rpc for SimpleRequestRpc {
283  fn post(
284    &self,
285    route: &str,
286    body: Vec<u8>,
287  ) -> impl Send + Future<Output = Result<Vec<u8>, RpcError>> {
288    async move {
289      tokio::time::timeout(self.request_timeout, self.inner_post(route, body))
290        .await
291        .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?
292    }
293  }
294}