monero_simple_request_rpc/
lib.rs#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![doc = include_str!("../README.md")]
#![deny(missing_docs)]
use core::future::Future;
use std::{sync::Arc, io::Read, time::Duration};
use tokio::sync::Mutex;
use digest_auth::{WwwAuthenticateHeader, AuthContext};
use simple_request::{
hyper::{StatusCode, header::HeaderValue, Request},
Response, Client,
};
use monero_rpc::{RpcError, Rpc};
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
#[derive(Clone, Debug)]
enum Authentication {
Unauthenticated(Client),
Authenticated {
username: String,
password: String,
#[allow(clippy::type_complexity)]
connection: Arc<Mutex<(Option<(WwwAuthenticateHeader, u64)>, Client)>>,
},
}
#[derive(Clone, Debug)]
pub struct SimpleRequestRpc {
authentication: Authentication,
url: String,
request_timeout: Duration,
}
impl SimpleRequestRpc {
fn digest_auth_challenge(
response: &Response,
) -> Result<Option<(WwwAuthenticateHeader, u64)>, RpcError> {
Ok(if let Some(header) = response.headers().get("www-authenticate") {
Some((
digest_auth::parse(header.to_str().map_err(|_| {
RpcError::InvalidNode("www-authenticate header wasn't a string".to_string())
})?)
.map_err(|_| RpcError::InvalidNode("invalid digest-auth response".to_string()))?,
0,
))
} else {
None
})
}
pub async fn new(url: String) -> Result<SimpleRequestRpc, RpcError> {
Self::with_custom_timeout(url, DEFAULT_TIMEOUT).await
}
pub async fn with_custom_timeout(
mut url: String,
request_timeout: Duration,
) -> Result<SimpleRequestRpc, RpcError> {
let authentication = if url.contains('@') {
let url_clone = url;
let split_url = url_clone.split('@').collect::<Vec<_>>();
if split_url.len() != 2 {
Err(RpcError::ConnectionError("invalid amount of login specifications".to_string()))?;
}
let mut userpass = split_url[0];
url = split_url[1].to_string();
if userpass.contains("://") {
let split_userpass = userpass.split("://").collect::<Vec<_>>();
if split_userpass.len() != 2 {
Err(RpcError::ConnectionError("invalid amount of protocol specifications".to_string()))?;
}
url = split_userpass[0].to_string() + "://" + &url;
userpass = split_userpass[1];
}
let split_userpass = userpass.split(':').collect::<Vec<_>>();
if split_userpass.len() > 2 {
Err(RpcError::ConnectionError("invalid amount of passwords".to_string()))?;
}
let client = Client::without_connection_pool(&url)
.map_err(|_| RpcError::ConnectionError("invalid URL".to_string()))?;
let challenge = Self::digest_auth_challenge(
&client
.request(
Request::post(url.clone())
.body(vec![].into())
.map_err(|e| RpcError::ConnectionError(format!("couldn't make request: {e:?}")))?,
)
.await
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?,
)?;
Authentication::Authenticated {
username: split_userpass[0].to_string(),
password: (*split_userpass.get(1).unwrap_or(&"")).to_string(),
connection: Arc::new(Mutex::new((challenge, client))),
}
} else {
Authentication::Unauthenticated(Client::with_connection_pool())
};
Ok(SimpleRequestRpc { authentication, url, request_timeout })
}
}
impl SimpleRequestRpc {
async fn inner_post(&self, route: &str, body: Vec<u8>) -> Result<Vec<u8>, RpcError> {
let request_fn = |uri| {
Request::post(uri)
.body(body.clone().into())
.map_err(|e| RpcError::ConnectionError(format!("couldn't make request: {e:?}")))
};
async fn body_from_response(response: Response<'_>) -> Result<Vec<u8>, RpcError> {
let mut res = Vec::with_capacity(128);
response
.body()
.await
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?
.read_to_end(&mut res)
.unwrap();
Ok(res)
}
for attempt in 0 .. 2 {
return Ok(match &self.authentication {
Authentication::Unauthenticated(client) => {
body_from_response(
client
.request(request_fn(self.url.clone() + "/" + route)?)
.await
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?,
)
.await?
}
Authentication::Authenticated { username, password, connection } => {
let mut connection_lock = connection.lock().await;
let mut request = request_fn("/".to_string() + route)?;
if connection_lock.0.is_none() {
connection_lock.0 = Self::digest_auth_challenge(
&connection_lock
.1
.request(request)
.await
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?,
)?;
request = request_fn("/".to_string() + route)?;
}
if let Some((challenge, cnonce)) = connection_lock.0.as_mut() {
*cnonce += 1;
let mut context = AuthContext::new_post::<_, _, _, &[u8]>(
username,
password,
"/".to_string() + route,
None,
);
context.set_custom_cnonce(hex::encode(cnonce.to_le_bytes()));
request.headers_mut().insert(
"Authorization",
HeaderValue::from_str(
&challenge
.respond(&context)
.map_err(|_| {
RpcError::InvalidNode("couldn't respond to digest-auth challenge".to_string())
})?
.to_header_string(),
)
.unwrap(),
);
}
let response = connection_lock
.1
.request(request)
.await
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")));
let (error, is_stale) = match &response {
Err(e) => (Some(e.clone()), false),
Ok(response) => (
None,
if response.status() == StatusCode::UNAUTHORIZED {
if let Some(header) = response.headers().get("www-authenticate") {
header
.to_str()
.map_err(|_| {
RpcError::InvalidNode("www-authenticate header wasn't a string".to_string())
})?
.contains("stale")
} else {
false
}
} else {
false
},
),
};
if error.is_some() || is_stale {
connection_lock.0 = None;
if attempt == 0 {
continue;
}
if let Some(e) = error {
Err(e)?
} else {
debug_assert!(is_stale);
Err(RpcError::InvalidNode(
"node claimed fresh connection had stale authentication".to_string(),
))?
}
} else {
body_from_response(response.unwrap()).await?
}
}
});
}
unreachable!()
}
}
impl Rpc for SimpleRequestRpc {
fn post(
&self,
route: &str,
body: Vec<u8>,
) -> impl Send + Future<Output = Result<Vec<u8>, RpcError>> {
async move {
tokio::time::timeout(self.request_timeout, self.inner_post(route, body))
.await
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?
}
}
}