cuprate_consensus_context/
rx_vms.rsuse std::{
collections::{HashMap, VecDeque},
sync::Arc,
};
use futures::{stream::FuturesOrdered, StreamExt};
use randomx_rs::{RandomXCache, RandomXError, RandomXFlag, RandomXVM as VmInner};
use rayon::prelude::*;
use thread_local::ThreadLocal;
use tower::ServiceExt;
use tracing::instrument;
use cuprate_consensus_rules::blocks::randomx_seed_height;
use cuprate_consensus_rules::{
blocks::{is_randomx_seed_height, RandomX, RX_SEEDHASH_EPOCH_BLOCKS},
HardFork,
};
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse},
Chain,
};
use crate::{ContextCacheError, Database};
pub const RX_SEEDS_CACHED: usize = 2;
#[derive(Debug)]
pub struct RandomXVm {
vms: ThreadLocal<VmInner>,
cache: RandomXCache,
flags: RandomXFlag,
}
impl RandomXVm {
pub fn new(seed: &[u8; 32]) -> Result<Self, RandomXError> {
let flags = RandomXFlag::get_recommended_flags();
let cache = RandomXCache::new(flags, seed.as_slice())?;
Ok(Self {
vms: ThreadLocal::new(),
cache,
flags,
})
}
}
impl RandomX for RandomXVm {
type Error = RandomXError;
fn calculate_hash(&self, buf: &[u8]) -> Result<[u8; 32], Self::Error> {
self.vms
.get_or_try(|| VmInner::new(self.flags, Some(self.cache.clone()), None))?
.calculate_hash(buf)
.map(|out| out.try_into().unwrap())
}
}
#[derive(Clone, Debug)]
pub struct RandomXVmCache {
pub seeds: VecDeque<(usize, [u8; 32])>,
pub vms: HashMap<usize, Arc<RandomXVm>>,
pub cached_vm: Option<([u8; 32], Arc<RandomXVm>)>,
}
impl RandomXVmCache {
#[instrument(name = "init_rx_vm_cache", level = "info", skip(database))]
pub async fn init_from_chain_height<D: Database + Clone>(
chain_height: usize,
hf: &HardFork,
database: D,
) -> Result<Self, ContextCacheError> {
let seed_heights = get_last_rx_seed_heights(chain_height - 1, RX_SEEDS_CACHED);
let seed_hashes = get_block_hashes(seed_heights.clone(), database).await?;
tracing::debug!("last {RX_SEEDS_CACHED} randomX seed heights: {seed_heights:?}",);
let seeds: VecDeque<(usize, [u8; 32])> =
seed_heights.into_iter().zip(seed_hashes).collect();
let vms = if hf >= &HardFork::V12 {
tracing::debug!("Creating RandomX VMs");
let seeds_clone = seeds.clone();
rayon_spawn_async(move || {
seeds_clone
.par_iter()
.map(|(height, seed)| {
(
*height,
Arc::new(RandomXVm::new(seed).expect("Failed to create RandomX VM!")),
)
})
.collect()
})
.await
} else {
tracing::debug!("We are before hard-fork 12 randomX VMs are not needed.");
HashMap::new()
};
Ok(Self {
seeds,
vms,
cached_vm: None,
})
}
pub fn add_vm(&mut self, vm: ([u8; 32], Arc<RandomXVm>)) {
self.cached_vm.replace(vm);
}
pub async fn get_alt_vm<D: Database>(
&self,
height: usize,
chain: Chain,
database: D,
) -> Result<Arc<RandomXVm>, ContextCacheError> {
let seed_height = randomx_seed_height(height);
let BlockchainResponse::BlockHash(seed_hash) = database
.oneshot(BlockchainReadRequest::BlockHash(seed_height, chain))
.await?
else {
panic!("Database returned wrong response!");
};
for (vm_main_chain_height, vm_seed_hash) in &self.seeds {
if vm_seed_hash == &seed_hash {
let Some(vm) = self.vms.get(vm_main_chain_height) else {
break;
};
return Ok(Arc::clone(vm));
}
}
let alt_vm = rayon_spawn_async(move || Arc::new(RandomXVm::new(&seed_hash).unwrap())).await;
Ok(alt_vm)
}
pub async fn get_vms(&mut self) -> HashMap<usize, Arc<RandomXVm>> {
match self.seeds.len().checked_sub(self.vms.len()) {
Some(0) => (),
Some(1) => {
let (seed_height, next_seed_hash) = *self.seeds.front().unwrap();
let new_vm = 'new_vm_block: {
tracing::debug!(
"Initializing RandomX VM for seed: {}",
hex::encode(next_seed_hash)
);
if let Some((cached_hash, cached_vm)) = self.cached_vm.take() {
if cached_hash == next_seed_hash {
tracing::debug!("VM was already created.");
break 'new_vm_block cached_vm;
}
};
rayon_spawn_async(move || Arc::new(RandomXVm::new(&next_seed_hash).unwrap()))
.await
};
self.vms.insert(seed_height, new_vm);
}
_ => {
tracing::debug!("RandomX has activated, initialising VMs");
let seeds_clone = self.seeds.clone();
self.vms = rayon_spawn_async(move || {
seeds_clone
.par_iter()
.map(|(height, seed)| {
let vm = RandomXVm::new(seed).expect("Failed to create RandomX VM!");
let vm = Arc::new(vm);
(*height, vm)
})
.collect()
})
.await;
}
}
self.vms.clone()
}
pub fn pop_blocks_main_chain(&mut self, new_height: usize) {
self.seeds.retain(|(height, _)| *height < new_height);
self.vms.retain(|height, _| *height < new_height);
}
pub fn new_block(&mut self, height: usize, hash: &[u8; 32]) {
if is_randomx_seed_height(height) {
tracing::debug!("Block {height} is a randomX seed height, adding it to the cache.",);
self.seeds.push_front((height, *hash));
if self.seeds.len() > RX_SEEDS_CACHED {
self.seeds.pop_back();
self.vms.retain(|height, _| {
self.seeds
.iter()
.any(|(cached_height, _)| height == cached_height)
});
}
}
}
}
pub fn get_last_rx_seed_heights(mut last_height: usize, mut amount: usize) -> Vec<usize> {
let mut seeds = Vec::with_capacity(amount);
if is_randomx_seed_height(last_height) {
seeds.push(last_height);
amount -= 1;
}
for _ in 0..amount {
if last_height == 0 {
return seeds;
}
let seed_height = (last_height - 1) & !(RX_SEEDHASH_EPOCH_BLOCKS - 1);
seeds.push(seed_height);
last_height = seed_height;
}
seeds
}
async fn get_block_hashes<D: Database + Clone>(
heights: Vec<usize>,
database: D,
) -> Result<Vec<[u8; 32]>, ContextCacheError> {
let mut fut = FuturesOrdered::new();
for height in heights {
let db = database.clone();
fut.push_back(async move {
let BlockchainResponse::BlockHash(hash) = db
.clone()
.oneshot(BlockchainReadRequest::BlockHash(height, Chain::Main))
.await?
else {
panic!("Database sent incorrect response!");
};
Result::<_, ContextCacheError>::Ok(hash)
});
}
let mut res = Vec::new();
while let Some(hash) = fut.next().await {
res.push(hash?);
}
Ok(res)
}