cuprate_consensus_context/
rx_vms.rs

1//! RandomX VM Cache
2//!
3//! This module keeps track of the RandomX VM to calculate the next blocks proof-of-work, if the block needs a randomX VM and potentially
4//! more VMs around this height.
5//!
6use std::{
7    collections::{HashMap, VecDeque},
8    sync::Arc,
9};
10
11use futures::{stream::FuturesOrdered, StreamExt};
12use randomx_rs::{RandomXCache, RandomXError, RandomXFlag, RandomXVM as VmInner};
13use rayon::prelude::*;
14use thread_local::ThreadLocal;
15use tower::ServiceExt;
16use tracing::instrument;
17
18use cuprate_consensus_rules::blocks::randomx_seed_height;
19use cuprate_consensus_rules::{
20    blocks::{is_randomx_seed_height, RandomX, RX_SEEDHASH_EPOCH_BLOCKS},
21    HardFork,
22};
23use cuprate_helper::asynch::rayon_spawn_async;
24use cuprate_types::{
25    blockchain::{BlockchainReadRequest, BlockchainResponse},
26    Chain,
27};
28
29use crate::{ContextCacheError, Database};
30
31/// The amount of randomX VMs to keep in the cache.
32pub const RX_SEEDS_CACHED: usize = 2;
33
34/// A multithreaded randomX VM.
35#[derive(Debug)]
36pub struct RandomXVm {
37    /// These RandomX VMs all share the same cache.
38    vms: ThreadLocal<VmInner>,
39    /// The RandomX cache.
40    cache: RandomXCache,
41    /// The flags used to start the RandomX VMs.
42    flags: RandomXFlag,
43}
44
45impl RandomXVm {
46    /// Create a new multithreaded randomX VM with the provided seed.
47    pub fn new(seed: &[u8; 32]) -> Result<Self, RandomXError> {
48        // TODO: allow passing in flags.
49        let flags = RandomXFlag::get_recommended_flags();
50
51        let cache = RandomXCache::new(flags, seed.as_slice())?;
52
53        Ok(Self {
54            vms: ThreadLocal::new(),
55            cache,
56            flags,
57        })
58    }
59}
60
61impl RandomX for RandomXVm {
62    type Error = RandomXError;
63
64    fn calculate_hash(&self, buf: &[u8]) -> Result<[u8; 32], Self::Error> {
65        self.vms
66            .get_or_try(|| VmInner::new(self.flags, Some(self.cache.clone()), None))?
67            .calculate_hash(buf)
68            .map(|out| out.try_into().unwrap())
69    }
70}
71
72/// The randomX VMs cache, keeps the VM needed to calculate the current block's proof-of-work hash (if a VM is needed) and a
73/// couple more around this VM.
74#[derive(Clone, Debug)]
75pub struct RandomXVmCache {
76    /// The top [`RX_SEEDS_CACHED`] RX seeds.  
77    pub seeds: VecDeque<(usize, [u8; 32])>,
78    /// The VMs for `seeds` (if after hf 12, otherwise this will be empty).
79    pub vms: HashMap<usize, Arc<RandomXVm>>,
80
81    /// A single cached VM that was given to us from a part of Cuprate.
82    pub cached_vm: Option<([u8; 32], Arc<RandomXVm>)>,
83}
84
85impl RandomXVmCache {
86    #[instrument(name = "init_rx_vm_cache", level = "info", skip(database))]
87    pub async fn init_from_chain_height<D: Database + Clone>(
88        chain_height: usize,
89        hf: &HardFork,
90        database: D,
91    ) -> Result<Self, ContextCacheError> {
92        let seed_heights = get_last_rx_seed_heights(chain_height - 1, RX_SEEDS_CACHED);
93        let seed_hashes = get_block_hashes(seed_heights.clone(), database).await?;
94
95        tracing::debug!("last {RX_SEEDS_CACHED} randomX seed heights: {seed_heights:?}",);
96
97        let seeds: VecDeque<(usize, [u8; 32])> =
98            seed_heights.into_iter().zip(seed_hashes).collect();
99
100        let vms = if hf >= &HardFork::V12 {
101            tracing::debug!("Creating RandomX VMs");
102            let seeds_clone = seeds.clone();
103            rayon_spawn_async(move || {
104                seeds_clone
105                    .par_iter()
106                    .map(|(height, seed)| {
107                        (
108                            *height,
109                            Arc::new(RandomXVm::new(seed).expect("Failed to create RandomX VM!")),
110                        )
111                    })
112                    .collect()
113            })
114            .await
115        } else {
116            tracing::debug!("We are before hard-fork 12 randomX VMs are not needed.");
117            HashMap::new()
118        };
119
120        Ok(Self {
121            seeds,
122            vms,
123            cached_vm: None,
124        })
125    }
126
127    /// Add a randomX VM to the cache, with the seed it was created with.
128    pub fn add_vm(&mut self, vm: ([u8; 32], Arc<RandomXVm>)) {
129        self.cached_vm.replace(vm);
130    }
131
132    /// Creates a RX VM for an alt chain, looking at the main chain RX VMs to see if we can use one
133    /// of them first.
134    pub async fn get_alt_vm<D: Database>(
135        &self,
136        height: usize,
137        chain: Chain,
138        database: D,
139    ) -> Result<Arc<RandomXVm>, ContextCacheError> {
140        let seed_height = randomx_seed_height(height);
141
142        let BlockchainResponse::BlockHash(seed_hash) = database
143            .oneshot(BlockchainReadRequest::BlockHash(seed_height, chain))
144            .await?
145        else {
146            panic!("Database returned wrong response!");
147        };
148
149        for (vm_main_chain_height, vm_seed_hash) in &self.seeds {
150            if vm_seed_hash == &seed_hash {
151                let Some(vm) = self.vms.get(vm_main_chain_height) else {
152                    break;
153                };
154
155                return Ok(Arc::clone(vm));
156            }
157        }
158
159        let alt_vm = rayon_spawn_async(move || Arc::new(RandomXVm::new(&seed_hash).unwrap())).await;
160
161        Ok(alt_vm)
162    }
163
164    /// Get the main-chain RandomX VMs.
165    pub async fn get_vms(&mut self) -> HashMap<usize, Arc<RandomXVm>> {
166        match self.seeds.len().checked_sub(self.vms.len()) {
167            // No difference in the amount of seeds to VMs.
168            Some(0) => (),
169            // One more seed than VM.
170            Some(1) => {
171                let (seed_height, next_seed_hash) = *self.seeds.front().unwrap();
172
173                let new_vm = 'new_vm_block: {
174                    tracing::debug!(
175                        "Initializing RandomX VM for seed: {}",
176                        hex::encode(next_seed_hash)
177                    );
178
179                    // Check if we have been given the RX VM from another part of Cuprate.
180                    if let Some((cached_hash, cached_vm)) = self.cached_vm.take() {
181                        if cached_hash == next_seed_hash {
182                            tracing::debug!("VM was already created.");
183                            break 'new_vm_block cached_vm;
184                        }
185                    };
186
187                    rayon_spawn_async(move || Arc::new(RandomXVm::new(&next_seed_hash).unwrap()))
188                        .await
189                };
190
191                self.vms.insert(seed_height, new_vm);
192            }
193            // More than one more seed than VM.
194            _ => {
195                // this will only happen when syncing and rx activates.
196                tracing::debug!("RandomX has activated, initialising VMs");
197
198                let seeds_clone = self.seeds.clone();
199                self.vms = rayon_spawn_async(move || {
200                    seeds_clone
201                        .par_iter()
202                        .map(|(height, seed)| {
203                            let vm = RandomXVm::new(seed).expect("Failed to create RandomX VM!");
204                            let vm = Arc::new(vm);
205                            (*height, vm)
206                        })
207                        .collect()
208                })
209                .await;
210            }
211        }
212
213        self.vms.clone()
214    }
215
216    /// Removes all the RandomX VMs above the `new_height`.
217    pub fn pop_blocks_main_chain(&mut self, new_height: usize) {
218        self.seeds.retain(|(height, _)| *height < new_height);
219        self.vms.retain(|height, _| *height < new_height);
220    }
221
222    /// Add a new block to the VM cache.
223    ///
224    /// hash is the block hash not the blocks proof-of-work hash.
225    pub fn new_block(&mut self, height: usize, hash: &[u8; 32]) {
226        if is_randomx_seed_height(height) {
227            tracing::debug!("Block {height} is a randomX seed height, adding it to the cache.",);
228
229            self.seeds.push_front((height, *hash));
230
231            if self.seeds.len() > RX_SEEDS_CACHED {
232                self.seeds.pop_back();
233                // HACK: This is really inefficient but the amount of VMs cached is not a lot.
234                self.vms.retain(|height, _| {
235                    self.seeds
236                        .iter()
237                        .any(|(cached_height, _)| height == cached_height)
238                });
239            }
240        }
241    }
242}
243
244/// Get the last `amount` of RX seeds, the top height returned here will not necessarily be the RX VM for the top block
245/// in the chain as VMs include some lag before a seed activates.
246pub fn get_last_rx_seed_heights(mut last_height: usize, mut amount: usize) -> Vec<usize> {
247    let mut seeds = Vec::with_capacity(amount);
248    if is_randomx_seed_height(last_height) {
249        seeds.push(last_height);
250        amount -= 1;
251    }
252
253    for _ in 0..amount {
254        if last_height == 0 {
255            return seeds;
256        }
257
258        // We don't include the lag as we only want seeds not the specific seed for this height.
259        let seed_height = (last_height - 1) & !(RX_SEEDHASH_EPOCH_BLOCKS - 1);
260        seeds.push(seed_height);
261        last_height = seed_height;
262    }
263
264    seeds
265}
266
267/// Gets the block hashes for the heights specified.
268async fn get_block_hashes<D: Database + Clone>(
269    heights: Vec<usize>,
270    database: D,
271) -> Result<Vec<[u8; 32]>, ContextCacheError> {
272    let mut fut = FuturesOrdered::new();
273
274    for height in heights {
275        let db = database.clone();
276        fut.push_back(async move {
277            let BlockchainResponse::BlockHash(hash) = db
278                .clone()
279                .oneshot(BlockchainReadRequest::BlockHash(height, Chain::Main))
280                .await?
281            else {
282                panic!("Database sent incorrect response!");
283            };
284            Result::<_, ContextCacheError>::Ok(hash)
285        });
286    }
287
288    let mut res = Vec::new();
289    while let Some(hash) = fut.next().await {
290        res.push(hash?);
291    }
292    Ok(res)
293}