cuprate_consensus_context/
task.rs
1use std::sync::Arc;
7
8use arc_swap::ArcSwap;
9use futures::channel::oneshot;
10use tokio::sync::mpsc;
11use tower::ServiceExt;
12use tracing::Instrument;
13
14use cuprate_consensus_rules::blocks::ContextToVerifyBlock;
15use cuprate_helper::cast::u64_to_usize;
16use cuprate_types::{
17 blockchain::{BlockchainReadRequest, BlockchainResponse},
18 Chain, HardFork,
19};
20
21use crate::{
22 alt_chains::{get_alt_chain_difficulty_cache, get_alt_chain_weight_cache, AltChainMap},
23 difficulty::DifficultyCache,
24 hardforks::HardForkState,
25 rx_vms,
26 weight::BlockWeightsCache,
27 BlockChainContextRequest, BlockChainContextResponse, BlockchainContext, ContextCacheError,
28 ContextConfig, Database, BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW,
29};
30
31pub(super) struct ContextTaskRequest {
33 pub req: BlockChainContextRequest,
35 pub tx: oneshot::Sender<Result<BlockChainContextResponse, tower::BoxError>>,
37 pub span: tracing::Span,
39}
40
41pub(crate) struct ContextTask<D: Database> {
43 context_cache: Arc<ArcSwap<BlockchainContext>>,
44
45 difficulty_cache: DifficultyCache,
47 weight_cache: BlockWeightsCache,
49 rx_vm_cache: rx_vms::RandomXVmCache,
51 hardfork_state: HardForkState,
53
54 alt_chain_cache_map: AltChainMap,
55
56 chain_height: usize,
58 top_block_hash: [u8; 32],
60 already_generated_coins: u64,
62
63 database: D,
64}
65
66impl<D: Database + Clone + Send + 'static> ContextTask<D> {
67 pub(crate) async fn init_context(
70 cfg: ContextConfig,
71 mut database: D,
72 ) -> Result<(Self, Arc<ArcSwap<BlockchainContext>>), ContextCacheError> {
73 let ContextConfig {
74 difficulty_cfg,
75 weights_config,
76 hard_fork_cfg,
77 } = cfg;
78
79 tracing::debug!("Initialising blockchain context");
80
81 let BlockchainResponse::ChainHeight(chain_height, top_block_hash) = database
82 .ready()
83 .await?
84 .call(BlockchainReadRequest::ChainHeight)
85 .await?
86 else {
87 panic!("Database sent incorrect response!");
88 };
89
90 let BlockchainResponse::GeneratedCoins(already_generated_coins) = database
91 .ready()
92 .await?
93 .call(BlockchainReadRequest::GeneratedCoins(chain_height - 1))
94 .await?
95 else {
96 panic!("Database sent incorrect response!");
97 };
98
99 let db = database.clone();
100 let hardfork_state_handle = tokio::spawn(async move {
101 HardForkState::init_from_chain_height(chain_height, hard_fork_cfg, db).await
102 });
103
104 let db = database.clone();
105 let difficulty_cache_handle = tokio::spawn(async move {
106 DifficultyCache::init_from_chain_height(chain_height, difficulty_cfg, db, Chain::Main)
107 .await
108 });
109
110 let db = database.clone();
111 let weight_cache_handle = tokio::spawn(async move {
112 BlockWeightsCache::init_from_chain_height(chain_height, weights_config, db, Chain::Main)
113 .await
114 });
115
116 let hardfork_state = hardfork_state_handle.await.unwrap()?;
118 let current_hf = hardfork_state.current_hardfork();
119
120 let db = database.clone();
121 let rx_seed_handle = tokio::spawn(async move {
122 rx_vms::RandomXVmCache::init_from_chain_height(chain_height, ¤t_hf, db).await
123 });
124
125 let difficulty_cache = difficulty_cache_handle.await.unwrap()?;
126 let weight_cache = weight_cache_handle.await.unwrap()?;
127
128 let blockchain_context = blockchain_context(
129 &weight_cache,
130 &difficulty_cache,
131 current_hf,
132 top_block_hash,
133 chain_height,
134 already_generated_coins,
135 );
136
137 let context_cache = Arc::new(ArcSwap::from_pointee(blockchain_context));
138
139 let context_svc = Self {
140 context_cache: Arc::clone(&context_cache),
141 difficulty_cache,
142 weight_cache,
143 rx_vm_cache: rx_seed_handle.await.unwrap()?,
144 hardfork_state,
145 alt_chain_cache_map: AltChainMap::new(),
146 chain_height,
147 already_generated_coins,
148 top_block_hash,
149 database,
150 };
151
152 Ok((context_svc, context_cache))
153 }
154
155 fn update_blockchain_context(&self) {
156 let context = blockchain_context(
157 &self.weight_cache,
158 &self.difficulty_cache,
159 self.hardfork_state.current_hardfork(),
160 self.top_block_hash,
161 self.chain_height,
162 self.already_generated_coins,
163 );
164
165 self.context_cache.store(Arc::new(context));
166 }
167
168 pub(crate) async fn handle_req(
170 &mut self,
171 req: BlockChainContextRequest,
172 ) -> Result<BlockChainContextResponse, tower::BoxError> {
173 Ok(match req {
174 BlockChainContextRequest::CurrentRxVms => {
175 BlockChainContextResponse::RxVms(self.rx_vm_cache.get_vms().await)
176 }
177 BlockChainContextRequest::BatchGetDifficulties(blocks) => {
178 tracing::debug!("Getting batch difficulties len: {}", blocks.len() + 1);
179
180 let next_diffs = self
181 .difficulty_cache
182 .next_difficulties(blocks, self.hardfork_state.current_hardfork());
183 BlockChainContextResponse::BatchDifficulties(next_diffs)
184 }
185 BlockChainContextRequest::NewRXVM(vm) => {
186 tracing::debug!("Adding randomX VM to cache.");
187
188 self.rx_vm_cache.add_vm(vm);
189 BlockChainContextResponse::Ok
190 }
191 BlockChainContextRequest::Update(new) => {
192 tracing::debug!(
193 "Updating blockchain cache with new block, height: {}",
194 new.height
195 );
196
197 self.difficulty_cache.new_block(
198 new.height,
199 new.timestamp,
200 new.cumulative_difficulty,
201 );
202
203 self.weight_cache
204 .new_block(new.height, new.weight, new.long_term_weight);
205
206 self.hardfork_state.new_block(new.vote, new.height);
207
208 self.rx_vm_cache.new_block(new.height, &new.block_hash);
209
210 self.chain_height = new.height + 1;
211 self.top_block_hash = new.block_hash;
212 self.already_generated_coins = self
213 .already_generated_coins
214 .saturating_add(new.generated_coins);
215
216 self.update_blockchain_context();
217
218 BlockChainContextResponse::Ok
219 }
220 BlockChainContextRequest::PopBlocks { numb_blocks } => {
221 assert!(numb_blocks < self.chain_height);
222
223 self.difficulty_cache
224 .pop_blocks_main_chain(numb_blocks, self.database.clone())
225 .await?;
226 self.weight_cache
227 .pop_blocks_main_chain(numb_blocks, self.database.clone())
228 .await?;
229 self.rx_vm_cache
230 .pop_blocks_main_chain(self.chain_height - numb_blocks - 1);
231 self.hardfork_state
232 .pop_blocks_main_chain(numb_blocks, self.database.clone())
233 .await?;
234
235 self.alt_chain_cache_map.clear();
236
237 self.chain_height -= numb_blocks;
238
239 let BlockchainResponse::GeneratedCoins(already_generated_coins) = self
240 .database
241 .ready()
242 .await?
243 .call(BlockchainReadRequest::GeneratedCoins(self.chain_height - 1))
244 .await?
245 else {
246 panic!("Database sent incorrect response!");
247 };
248
249 let BlockchainResponse::BlockHash(top_block_hash) = self
250 .database
251 .ready()
252 .await?
253 .call(BlockchainReadRequest::BlockHash(
254 self.chain_height - 1,
255 Chain::Main,
256 ))
257 .await?
258 else {
259 panic!("Database returned incorrect response!");
260 };
261
262 self.already_generated_coins = already_generated_coins;
263 self.top_block_hash = top_block_hash;
264
265 self.update_blockchain_context();
266
267 BlockChainContextResponse::Ok
268 }
269 BlockChainContextRequest::ClearAltCache => {
270 self.alt_chain_cache_map.clear();
271
272 BlockChainContextResponse::Ok
273 }
274 BlockChainContextRequest::AltChainContextCache { prev_id, _token } => {
275 BlockChainContextResponse::AltChainContextCache(
276 self.alt_chain_cache_map
277 .get_alt_chain_context(prev_id, &mut self.database)
278 .await?,
279 )
280 }
281 BlockChainContextRequest::AltChainDifficultyCache { prev_id, _token } => {
282 BlockChainContextResponse::AltChainDifficultyCache(
283 get_alt_chain_difficulty_cache(
284 prev_id,
285 &self.difficulty_cache,
286 self.database.clone(),
287 )
288 .await?,
289 )
290 }
291 BlockChainContextRequest::AltChainWeightCache { prev_id, _token } => {
292 BlockChainContextResponse::AltChainWeightCache(
293 get_alt_chain_weight_cache(prev_id, &self.weight_cache, self.database.clone())
294 .await?,
295 )
296 }
297 BlockChainContextRequest::AltChainRxVM {
298 height,
299 chain,
300 _token,
301 } => BlockChainContextResponse::AltChainRxVM(
302 self.rx_vm_cache
303 .get_alt_vm(height, chain, &mut self.database)
304 .await?,
305 ),
306 BlockChainContextRequest::AddAltChainContextCache { cache, _token } => {
307 self.alt_chain_cache_map.add_alt_cache(cache);
308 BlockChainContextResponse::Ok
309 }
310 BlockChainContextRequest::HardForkInfo(_)
311 | BlockChainContextRequest::FeeEstimate { .. }
312 | BlockChainContextRequest::AltChains
313 | BlockChainContextRequest::CalculatePow { .. } => {
314 todo!("finish https://github.com/Cuprate/cuprate/pull/297")
315 }
316 })
317 }
318
319 pub(crate) async fn run(mut self, mut rx: mpsc::Receiver<ContextTaskRequest>) {
322 while let Some(req) = rx.recv().await {
323 let res = self.handle_req(req.req).instrument(req.span).await;
324 drop(req.tx.send(res));
325 }
326
327 tracing::info!("Shutting down blockchain context task.");
328 }
329}
330
331fn blockchain_context(
332 weight_cache: &BlockWeightsCache,
333 difficulty_cache: &DifficultyCache,
334
335 current_hf: HardFork,
336 top_hash: [u8; 32],
337 chain_height: usize,
338 already_generated_coins: u64,
339) -> BlockchainContext {
340 BlockchainContext {
341 context_to_verify_block: ContextToVerifyBlock {
342 median_weight_for_block_reward: weight_cache.median_for_block_reward(current_hf),
343 effective_median_weight: weight_cache.effective_median_block_weight(current_hf),
344 top_hash,
345 median_block_timestamp: difficulty_cache
346 .median_timestamp(u64_to_usize(BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW)),
347 chain_height,
348 current_hf,
349 next_difficulty: difficulty_cache.next_difficulty(current_hf),
350 already_generated_coins,
351 },
352 cumulative_difficulty: difficulty_cache.cumulative_difficulty(),
353 median_long_term_weight: weight_cache.median_long_term_weight(),
354 top_block_timestamp: difficulty_cache.top_block_timestamp(),
355 }
356}