1//! RandomX VM Cache
2//!
//! This module keeps track of the RandomX VM needed to calculate the next block's proof-of-work (if that block
//! needs a RandomX VM), and potentially a few more VMs for seeds around this height.
5//!
6use std::{
7 collections::{HashMap, VecDeque},
8 sync::Arc,
9};
1011use futures::{stream::FuturesOrdered, StreamExt};
12use randomx_rs::{RandomXCache, RandomXError, RandomXFlag, RandomXVM as VmInner};
13use rayon::prelude::*;
14use thread_local::ThreadLocal;
15use tower::ServiceExt;
16use tracing::instrument;
1718use cuprate_consensus_rules::blocks::randomx_seed_height;
19use cuprate_consensus_rules::{
20 blocks::{is_randomx_seed_height, RandomX, RX_SEEDHASH_EPOCH_BLOCKS},
21 HardFork,
22};
23use cuprate_helper::asynch::rayon_spawn_async;
24use cuprate_types::{
25 blockchain::{BlockchainReadRequest, BlockchainResponse},
26 Chain,
27};
2829use crate::{ContextCacheError, Database};
/// How many RandomX seeds (and, at most, that many main-chain VMs) the cache keeps around.
pub const RX_SEEDS_CACHED: usize = 2;
3334/// A multithreaded randomX VM.
35#[derive(Debug)]
36pub struct RandomXVm {
37/// These RandomX VMs all share the same cache.
38vms: ThreadLocal<VmInner>,
39/// The RandomX cache.
40cache: RandomXCache,
41/// The flags used to start the RandomX VMs.
42flags: RandomXFlag,
43}
4445impl RandomXVm {
46/// Create a new multithreaded randomX VM with the provided seed.
47pub fn new(seed: &[u8; 32]) -> Result<Self, RandomXError> {
48// TODO: allow passing in flags.
49let flags = RandomXFlag::get_recommended_flags();
5051let cache = RandomXCache::new(flags, seed.as_slice())?;
5253Ok(Self {
54 vms: ThreadLocal::new(),
55 cache,
56 flags,
57 })
58 }
59}
6061impl RandomX for RandomXVm {
62type Error = RandomXError;
6364fn calculate_hash(&self, buf: &[u8]) -> Result<[u8; 32], Self::Error> {
65self.vms
66 .get_or_try(|| VmInner::new(self.flags, Some(self.cache.clone()), None))?
67.calculate_hash(buf)
68 .map(|out| out.try_into().unwrap())
69 }
70}
7172/// The randomX VMs cache, keeps the VM needed to calculate the current block's proof-of-work hash (if a VM is needed) and a
73/// couple more around this VM.
74#[derive(Clone, Debug)]
75pub struct RandomXVmCache {
76/// The top [`RX_SEEDS_CACHED`] RX seeds.
77pub seeds: VecDeque<(usize, [u8; 32])>,
78/// The VMs for `seeds` (if after hf 12, otherwise this will be empty).
79pub vms: HashMap<usize, Arc<RandomXVm>>,
8081/// A single cached VM that was given to us from a part of Cuprate.
82pub cached_vm: Option<([u8; 32], Arc<RandomXVm>)>,
83}
8485impl RandomXVmCache {
86#[instrument(name = "init_rx_vm_cache", level = "info", skip(database))]
87pub async fn init_from_chain_height<D: Database + Clone>(
88 chain_height: usize,
89 hf: &HardFork,
90 database: D,
91 ) -> Result<Self, ContextCacheError> {
92let seed_heights = get_last_rx_seed_heights(chain_height - 1, RX_SEEDS_CACHED);
93let seed_hashes = get_block_hashes(seed_heights.clone(), database).await?;
9495tracing::debug!("last {RX_SEEDS_CACHED} randomX seed heights: {seed_heights:?}",);
9697let seeds: VecDeque<(usize, [u8; 32])> =
98 seed_heights.into_iter().zip(seed_hashes).collect();
99100let vms = if hf >= &HardFork::V12 {
101tracing::debug!("Creating RandomX VMs");
102let seeds_clone = seeds.clone();
103 rayon_spawn_async(move || {
104 seeds_clone
105 .par_iter()
106 .map(|(height, seed)| {
107 (
108*height,
109 Arc::new(RandomXVm::new(seed).expect("Failed to create RandomX VM!")),
110 )
111 })
112 .collect()
113 })
114 .await
115} else {
116tracing::debug!("We are before hard-fork 12 randomX VMs are not needed.");
117 HashMap::new()
118 };
119120Ok(Self {
121 seeds,
122 vms,
123 cached_vm: None,
124 })
125 }
126127/// Add a randomX VM to the cache, with the seed it was created with.
128pub fn add_vm(&mut self, vm: ([u8; 32], Arc<RandomXVm>)) {
129self.cached_vm.replace(vm);
130 }
131132/// Creates a RX VM for an alt chain, looking at the main chain RX VMs to see if we can use one
133 /// of them first.
134pub async fn get_alt_vm<D: Database>(
135&self,
136 height: usize,
137 chain: Chain,
138 database: D,
139 ) -> Result<Arc<RandomXVm>, ContextCacheError> {
140let seed_height = randomx_seed_height(height);
141142let BlockchainResponse::BlockHash(seed_hash) = database
143 .oneshot(BlockchainReadRequest::BlockHash(seed_height, chain))
144 .await?
145else {
146panic!("Database returned wrong response!");
147 };
148149for (vm_main_chain_height, vm_seed_hash) in &self.seeds {
150if vm_seed_hash == &seed_hash {
151let Some(vm) = self.vms.get(vm_main_chain_height) else {
152break;
153 };
154155return Ok(Arc::clone(vm));
156 }
157 }
158159let alt_vm = rayon_spawn_async(move || Arc::new(RandomXVm::new(&seed_hash).unwrap())).await;
160161Ok(alt_vm)
162 }
163164/// Get the main-chain RandomX VMs.
165pub async fn get_vms(&mut self) -> HashMap<usize, Arc<RandomXVm>> {
166match self.seeds.len().checked_sub(self.vms.len()) {
167// No difference in the amount of seeds to VMs.
168Some(0) => (),
169// One more seed than VM.
170Some(1) => {
171let (seed_height, next_seed_hash) = *self.seeds.front().unwrap();
172173let new_vm = 'new_vm_block: {
174tracing::debug!(
175"Initializing RandomX VM for seed: {}",
176 hex::encode(next_seed_hash)
177 );
178179// Check if we have been given the RX VM from another part of Cuprate.
180if let Some((cached_hash, cached_vm)) = self.cached_vm.take() {
181if cached_hash == next_seed_hash {
182tracing::debug!("VM was already created.");
183break 'new_vm_block cached_vm;
184 }
185 };
186187 rayon_spawn_async(move || Arc::new(RandomXVm::new(&next_seed_hash).unwrap()))
188 .await
189};
190191self.vms.insert(seed_height, new_vm);
192 }
193// More than one more seed than VM.
194_ => {
195// this will only happen when syncing and rx activates.
196tracing::debug!("RandomX has activated, initialising VMs");
197198let seeds_clone = self.seeds.clone();
199self.vms = rayon_spawn_async(move || {
200 seeds_clone
201 .par_iter()
202 .map(|(height, seed)| {
203let vm = RandomXVm::new(seed).expect("Failed to create RandomX VM!");
204let vm = Arc::new(vm);
205 (*height, vm)
206 })
207 .collect()
208 })
209 .await;
210 }
211 }
212213self.vms.clone()
214 }
215216/// Removes all the RandomX VMs above the `new_height`.
217pub fn pop_blocks_main_chain(&mut self, new_height: usize) {
218self.seeds.retain(|(height, _)| *height < new_height);
219self.vms.retain(|height, _| *height < new_height);
220 }
221222/// Add a new block to the VM cache.
223 ///
224 /// hash is the block hash not the blocks proof-of-work hash.
225pub fn new_block(&mut self, height: usize, hash: &[u8; 32]) {
226if is_randomx_seed_height(height) {
227tracing::debug!("Block {height} is a randomX seed height, adding it to the cache.",);
228229self.seeds.push_front((height, *hash));
230231if self.seeds.len() > RX_SEEDS_CACHED {
232self.seeds.pop_back();
233// HACK: This is really inefficient but the amount of VMs cached is not a lot.
234self.vms.retain(|height, _| {
235self.seeds
236 .iter()
237 .any(|(cached_height, _)| height == cached_height)
238 });
239 }
240 }
241 }
242}
243244/// Get the last `amount` of RX seeds, the top height returned here will not necessarily be the RX VM for the top block
245/// in the chain as VMs include some lag before a seed activates.
246pub fn get_last_rx_seed_heights(mut last_height: usize, mut amount: usize) -> Vec<usize> {
247let mut seeds = Vec::with_capacity(amount);
248if is_randomx_seed_height(last_height) {
249 seeds.push(last_height);
250 amount -= 1;
251 }
252253for _ in 0..amount {
254if last_height == 0 {
255return seeds;
256 }
257258// We don't include the lag as we only want seeds not the specific seed for this height.
259let seed_height = (last_height - 1) & !(RX_SEEDHASH_EPOCH_BLOCKS - 1);
260 seeds.push(seed_height);
261 last_height = seed_height;
262 }
263264 seeds
265}
266267/// Gets the block hashes for the heights specified.
268async fn get_block_hashes<D: Database + Clone>(
269 heights: Vec<usize>,
270 database: D,
271) -> Result<Vec<[u8; 32]>, ContextCacheError> {
272let mut fut = FuturesOrdered::new();
273274for height in heights {
275let db = database.clone();
276 fut.push_back(async move {
277let BlockchainResponse::BlockHash(hash) = db
278 .clone()
279 .oneshot(BlockchainReadRequest::BlockHash(height, Chain::Main))
280 .await?
281else {
282panic!("Database sent incorrect response!");
283 };
284Result::<_, ContextCacheError>::Ok(hash)
285 });
286 }
287288let mut res = Vec::new();
289while let Some(hash) = fut.next().await {
290 res.push(hash?);
291 }
292Ok(res)
293}