cuprate_consensus_context/
rx_vms.rs1use std::{
7 collections::{HashMap, VecDeque},
8 sync::Arc,
9};
10
11use futures::{stream::FuturesOrdered, StreamExt};
12use randomx_rs::{RandomXCache, RandomXError, RandomXFlag, RandomXVM as VmInner};
13use rayon::prelude::*;
14use thread_local::ThreadLocal;
15use tower::ServiceExt;
16use tracing::instrument;
17
18use cuprate_consensus_rules::blocks::randomx_seed_height;
19use cuprate_consensus_rules::{
20 blocks::{is_randomx_seed_height, RandomX, RX_SEEDHASH_EPOCH_BLOCKS},
21 HardFork,
22};
23use cuprate_helper::asynch::rayon_spawn_async;
24use cuprate_types::{
25 blockchain::{BlockchainReadRequest, BlockchainResponse},
26 Chain,
27};
28
29use crate::{ContextCacheError, Database};
30
/// The number of most recent RandomX seeds kept in the cache.
///
/// This is both the number of seed heights loaded when the cache is initialised and the
/// maximum length of the seed list maintained as new seed blocks arrive.
pub const RX_SEEDS_CACHED: usize = 2;
33
/// A RandomX VM wrapper that can be shared between threads.
///
/// Each thread that hashes through this wrapper lazily gets its own inner VM; all of them are
/// created from the same cache and flags.
#[derive(Debug)]
pub struct RandomXVm {
    /// Per-thread inner VMs, created on a thread's first call to `calculate_hash`.
    vms: ThreadLocal<VmInner>,
    /// The RandomX cache built from the seed; a clone is handed to every inner VM.
    cache: RandomXCache,
    /// The flags used to build the cache and every inner VM.
    flags: RandomXFlag,
}
44
45impl RandomXVm {
46 pub fn new(seed: &[u8; 32]) -> Result<Self, RandomXError> {
48 let flags = RandomXFlag::get_recommended_flags();
50
51 let cache = RandomXCache::new(flags, seed.as_slice())?;
52
53 Ok(Self {
54 vms: ThreadLocal::new(),
55 cache,
56 flags,
57 })
58 }
59}
60
61impl RandomX for RandomXVm {
62 type Error = RandomXError;
63
64 fn calculate_hash(&self, buf: &[u8]) -> Result<[u8; 32], Self::Error> {
65 self.vms
66 .get_or_try(|| VmInner::new(self.flags, Some(self.cache.clone()), None))?
67 .calculate_hash(buf)
68 .map(|out| out.try_into().unwrap())
69 }
70}
71
/// A cache of the most recent RandomX seeds and the VMs built from them.
#[derive(Clone, Debug)]
pub struct RandomXVmCache {
    /// The cached `(seed height, seed hash)` pairs, newest seed at the front.
    pub seeds: VecDeque<(usize, [u8; 32])>,
    /// The VMs for the cached seeds, keyed by seed height.
    pub vms: HashMap<usize, Arc<RandomXVm>>,

    /// A VM created ahead of time together with the seed hash it was built from.
    ///
    /// It is set by `add_vm` and consumed by `get_vms` the next time a VM has to be added.
    pub cached_vm: Option<([u8; 32], Arc<RandomXVm>)>,
}
84
85impl RandomXVmCache {
86 #[instrument(name = "init_rx_vm_cache", level = "info", skip(database))]
87 pub async fn init_from_chain_height<D: Database + Clone>(
88 chain_height: usize,
89 hf: &HardFork,
90 database: D,
91 ) -> Result<Self, ContextCacheError> {
92 let seed_heights = get_last_rx_seed_heights(chain_height - 1, RX_SEEDS_CACHED);
93 let seed_hashes = get_block_hashes(seed_heights.clone(), database).await?;
94
95 tracing::debug!("last {RX_SEEDS_CACHED} randomX seed heights: {seed_heights:?}",);
96
97 let seeds: VecDeque<(usize, [u8; 32])> =
98 seed_heights.into_iter().zip(seed_hashes).collect();
99
100 let vms = if hf >= &HardFork::V12 {
101 tracing::debug!("Creating RandomX VMs");
102 let seeds_clone = seeds.clone();
103 rayon_spawn_async(move || {
104 seeds_clone
105 .par_iter()
106 .map(|(height, seed)| {
107 (
108 *height,
109 Arc::new(RandomXVm::new(seed).expect("Failed to create RandomX VM!")),
110 )
111 })
112 .collect()
113 })
114 .await
115 } else {
116 tracing::debug!("We are before hard-fork 12 randomX VMs are not needed.");
117 HashMap::new()
118 };
119
120 Ok(Self {
121 seeds,
122 vms,
123 cached_vm: None,
124 })
125 }
126
127 pub fn add_vm(&mut self, vm: ([u8; 32], Arc<RandomXVm>)) {
129 self.cached_vm.replace(vm);
130 }
131
132 pub async fn get_alt_vm<D: Database>(
135 &self,
136 height: usize,
137 chain: Chain,
138 database: D,
139 ) -> Result<Arc<RandomXVm>, ContextCacheError> {
140 let seed_height = randomx_seed_height(height);
141
142 let BlockchainResponse::BlockHash(seed_hash) = database
143 .oneshot(BlockchainReadRequest::BlockHash(seed_height, chain))
144 .await?
145 else {
146 panic!("Database returned wrong response!");
147 };
148
149 for (vm_main_chain_height, vm_seed_hash) in &self.seeds {
150 if vm_seed_hash == &seed_hash {
151 let Some(vm) = self.vms.get(vm_main_chain_height) else {
152 break;
153 };
154
155 return Ok(Arc::clone(vm));
156 }
157 }
158
159 let alt_vm = rayon_spawn_async(move || Arc::new(RandomXVm::new(&seed_hash).unwrap())).await;
160
161 Ok(alt_vm)
162 }
163
164 pub async fn get_vms(&mut self) -> HashMap<usize, Arc<RandomXVm>> {
166 match self.seeds.len().checked_sub(self.vms.len()) {
167 Some(0) => (),
169 Some(1) => {
171 let (seed_height, next_seed_hash) = *self.seeds.front().unwrap();
172
173 let new_vm = 'new_vm_block: {
174 tracing::debug!(
175 "Initializing RandomX VM for seed: {}",
176 hex::encode(next_seed_hash)
177 );
178
179 if let Some((cached_hash, cached_vm)) = self.cached_vm.take() {
181 if cached_hash == next_seed_hash {
182 tracing::debug!("VM was already created.");
183 break 'new_vm_block cached_vm;
184 }
185 };
186
187 rayon_spawn_async(move || Arc::new(RandomXVm::new(&next_seed_hash).unwrap()))
188 .await
189 };
190
191 self.vms.insert(seed_height, new_vm);
192 }
193 _ => {
195 tracing::debug!("RandomX has activated, initialising VMs");
197
198 let seeds_clone = self.seeds.clone();
199 self.vms = rayon_spawn_async(move || {
200 seeds_clone
201 .par_iter()
202 .map(|(height, seed)| {
203 let vm = RandomXVm::new(seed).expect("Failed to create RandomX VM!");
204 let vm = Arc::new(vm);
205 (*height, vm)
206 })
207 .collect()
208 })
209 .await;
210 }
211 }
212
213 self.vms.clone()
214 }
215
216 pub fn pop_blocks_main_chain(&mut self, new_height: usize) {
218 self.seeds.retain(|(height, _)| *height < new_height);
219 self.vms.retain(|height, _| *height < new_height);
220 }
221
222 pub fn new_block(&mut self, height: usize, hash: &[u8; 32]) {
226 if is_randomx_seed_height(height) {
227 tracing::debug!("Block {height} is a randomX seed height, adding it to the cache.",);
228
229 self.seeds.push_front((height, *hash));
230
231 if self.seeds.len() > RX_SEEDS_CACHED {
232 self.seeds.pop_back();
233 self.vms.retain(|height, _| {
235 self.seeds
236 .iter()
237 .any(|(cached_height, _)| height == cached_height)
238 });
239 }
240 }
241 }
242}
243
244pub fn get_last_rx_seed_heights(mut last_height: usize, mut amount: usize) -> Vec<usize> {
247 let mut seeds = Vec::with_capacity(amount);
248 if is_randomx_seed_height(last_height) {
249 seeds.push(last_height);
250 amount -= 1;
251 }
252
253 for _ in 0..amount {
254 if last_height == 0 {
255 return seeds;
256 }
257
258 let seed_height = (last_height - 1) & !(RX_SEEDHASH_EPOCH_BLOCKS - 1);
260 seeds.push(seed_height);
261 last_height = seed_height;
262 }
263
264 seeds
265}
266
267async fn get_block_hashes<D: Database + Clone>(
269 heights: Vec<usize>,
270 database: D,
271) -> Result<Vec<[u8; 32]>, ContextCacheError> {
272 let mut fut = FuturesOrdered::new();
273
274 for height in heights {
275 let db = database.clone();
276 fut.push_back(async move {
277 let BlockchainResponse::BlockHash(hash) = db
278 .clone()
279 .oneshot(BlockchainReadRequest::BlockHash(height, Chain::Main))
280 .await?
281 else {
282 panic!("Database sent incorrect response!");
283 };
284 Result::<_, ContextCacheError>::Ok(hash)
285 });
286 }
287
288 let mut res = Vec::new();
289 while let Some(hash) = fut.next().await {
290 res.push(hash?);
291 }
292 Ok(res)
293}