cuprate_consensus_context/
task.rs

1//! Context Task
2//!
3//! This module contains the async task that handles keeping track of blockchain context.
4//! It holds all the context caches and handles [`tower::Service`] requests.
5//!
6use std::sync::Arc;
7
8use arc_swap::ArcSwap;
9use futures::channel::oneshot;
10use tokio::sync::mpsc;
11use tower::ServiceExt;
12use tracing::Instrument;
13
14use cuprate_consensus_rules::blocks::ContextToVerifyBlock;
15use cuprate_helper::cast::u64_to_usize;
16use cuprate_types::{
17    blockchain::{BlockchainReadRequest, BlockchainResponse},
18    Chain, HardFork,
19};
20
21use crate::{
22    alt_chains::{get_alt_chain_difficulty_cache, get_alt_chain_weight_cache, AltChainMap},
23    difficulty::DifficultyCache,
24    hardforks::HardForkState,
25    rx_vms,
26    weight::BlockWeightsCache,
27    BlockChainContextRequest, BlockChainContextResponse, BlockchainContext, ContextCacheError,
28    ContextConfig, Database, BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW,
29};
30
31/// A request from the context service to the context task.
32pub(super) struct ContextTaskRequest {
33    /// The request.
34    pub req: BlockChainContextRequest,
35    /// The response channel.
36    pub tx: oneshot::Sender<Result<BlockChainContextResponse, tower::BoxError>>,
37    /// The tracing span of the requester.
38    pub span: tracing::Span,
39}
40
41/// The Context task that keeps the blockchain context and handles requests.
42pub(crate) struct ContextTask<D: Database> {
43    context_cache: Arc<ArcSwap<BlockchainContext>>,
44
45    /// The difficulty cache.
46    difficulty_cache: DifficultyCache,
47    /// The weight cache.
48    weight_cache: BlockWeightsCache,
49    /// The RX VM cache.
50    rx_vm_cache: rx_vms::RandomXVmCache,
51    /// The hard-fork state cache.
52    hardfork_state: HardForkState,
53
54    alt_chain_cache_map: AltChainMap,
55
56    /// The current chain height.
57    chain_height: usize,
58    /// The top block hash.
59    top_block_hash: [u8; 32],
60    /// The total amount of coins generated.
61    already_generated_coins: u64,
62
63    database: D,
64}
65
66impl<D: Database + Clone + Send + 'static> ContextTask<D> {
67    /// Initialize the [`ContextTask`], this will need to pull a lot of data from the database so may take a
68    /// while to complete.
69    pub(crate) async fn init_context(
70        cfg: ContextConfig,
71        mut database: D,
72    ) -> Result<(Self, Arc<ArcSwap<BlockchainContext>>), ContextCacheError> {
73        let ContextConfig {
74            difficulty_cfg,
75            weights_config,
76            hard_fork_cfg,
77        } = cfg;
78
79        tracing::debug!("Initialising blockchain context");
80
81        let BlockchainResponse::ChainHeight(chain_height, top_block_hash) = database
82            .ready()
83            .await?
84            .call(BlockchainReadRequest::ChainHeight)
85            .await?
86        else {
87            panic!("Database sent incorrect response!");
88        };
89
90        let BlockchainResponse::GeneratedCoins(already_generated_coins) = database
91            .ready()
92            .await?
93            .call(BlockchainReadRequest::GeneratedCoins(chain_height - 1))
94            .await?
95        else {
96            panic!("Database sent incorrect response!");
97        };
98
99        let db = database.clone();
100        let hardfork_state_handle = tokio::spawn(async move {
101            HardForkState::init_from_chain_height(chain_height, hard_fork_cfg, db).await
102        });
103
104        let db = database.clone();
105        let difficulty_cache_handle = tokio::spawn(async move {
106            DifficultyCache::init_from_chain_height(chain_height, difficulty_cfg, db, Chain::Main)
107                .await
108        });
109
110        let db = database.clone();
111        let weight_cache_handle = tokio::spawn(async move {
112            BlockWeightsCache::init_from_chain_height(chain_height, weights_config, db, Chain::Main)
113                .await
114        });
115
116        // Wait for the hardfork state to finish first as we need it to start the randomX VM cache.
117        let hardfork_state = hardfork_state_handle.await.unwrap()?;
118        let current_hf = hardfork_state.current_hardfork();
119
120        let db = database.clone();
121        let rx_seed_handle = tokio::spawn(async move {
122            rx_vms::RandomXVmCache::init_from_chain_height(chain_height, &current_hf, db).await
123        });
124
125        let difficulty_cache = difficulty_cache_handle.await.unwrap()?;
126        let weight_cache = weight_cache_handle.await.unwrap()?;
127
128        let blockchain_context = blockchain_context(
129            &weight_cache,
130            &difficulty_cache,
131            current_hf,
132            top_block_hash,
133            chain_height,
134            already_generated_coins,
135        );
136
137        let context_cache = Arc::new(ArcSwap::from_pointee(blockchain_context));
138
139        let context_svc = Self {
140            context_cache: Arc::clone(&context_cache),
141            difficulty_cache,
142            weight_cache,
143            rx_vm_cache: rx_seed_handle.await.unwrap()?,
144            hardfork_state,
145            alt_chain_cache_map: AltChainMap::new(),
146            chain_height,
147            already_generated_coins,
148            top_block_hash,
149            database,
150        };
151
152        Ok((context_svc, context_cache))
153    }
154
155    fn update_blockchain_context(&self) {
156        let context = blockchain_context(
157            &self.weight_cache,
158            &self.difficulty_cache,
159            self.hardfork_state.current_hardfork(),
160            self.top_block_hash,
161            self.chain_height,
162            self.already_generated_coins,
163        );
164
165        self.context_cache.store(Arc::new(context));
166    }
167
168    /// Handles a [`BlockChainContextRequest`] and returns a [`BlockChainContextResponse`].
169    pub(crate) async fn handle_req(
170        &mut self,
171        req: BlockChainContextRequest,
172    ) -> Result<BlockChainContextResponse, tower::BoxError> {
173        Ok(match req {
174            BlockChainContextRequest::CurrentRxVms => {
175                BlockChainContextResponse::RxVms(self.rx_vm_cache.get_vms().await)
176            }
177            BlockChainContextRequest::BatchGetDifficulties(blocks) => {
178                tracing::debug!("Getting batch difficulties len: {}", blocks.len() + 1);
179
180                let next_diffs = self
181                    .difficulty_cache
182                    .next_difficulties(blocks, self.hardfork_state.current_hardfork());
183                BlockChainContextResponse::BatchDifficulties(next_diffs)
184            }
185            BlockChainContextRequest::NewRXVM(vm) => {
186                tracing::debug!("Adding randomX VM to cache.");
187
188                self.rx_vm_cache.add_vm(vm);
189                BlockChainContextResponse::Ok
190            }
191            BlockChainContextRequest::Update(new) => {
192                tracing::debug!(
193                    "Updating blockchain cache with new block, height: {}",
194                    new.height
195                );
196
197                self.difficulty_cache.new_block(
198                    new.height,
199                    new.timestamp,
200                    new.cumulative_difficulty,
201                );
202
203                self.weight_cache
204                    .new_block(new.height, new.weight, new.long_term_weight);
205
206                self.hardfork_state.new_block(new.vote, new.height);
207
208                self.rx_vm_cache.new_block(new.height, &new.block_hash);
209
210                self.chain_height = new.height + 1;
211                self.top_block_hash = new.block_hash;
212                self.already_generated_coins = self
213                    .already_generated_coins
214                    .saturating_add(new.generated_coins);
215
216                self.update_blockchain_context();
217
218                BlockChainContextResponse::Ok
219            }
220            BlockChainContextRequest::PopBlocks { numb_blocks } => {
221                assert!(numb_blocks < self.chain_height);
222
223                self.difficulty_cache
224                    .pop_blocks_main_chain(numb_blocks, self.database.clone())
225                    .await?;
226                self.weight_cache
227                    .pop_blocks_main_chain(numb_blocks, self.database.clone())
228                    .await?;
229                self.rx_vm_cache
230                    .pop_blocks_main_chain(self.chain_height - numb_blocks - 1);
231                self.hardfork_state
232                    .pop_blocks_main_chain(numb_blocks, self.database.clone())
233                    .await?;
234
235                self.alt_chain_cache_map.clear();
236
237                self.chain_height -= numb_blocks;
238
239                let BlockchainResponse::GeneratedCoins(already_generated_coins) = self
240                    .database
241                    .ready()
242                    .await?
243                    .call(BlockchainReadRequest::GeneratedCoins(self.chain_height - 1))
244                    .await?
245                else {
246                    panic!("Database sent incorrect response!");
247                };
248
249                let BlockchainResponse::BlockHash(top_block_hash) = self
250                    .database
251                    .ready()
252                    .await?
253                    .call(BlockchainReadRequest::BlockHash(
254                        self.chain_height - 1,
255                        Chain::Main,
256                    ))
257                    .await?
258                else {
259                    panic!("Database returned incorrect response!");
260                };
261
262                self.already_generated_coins = already_generated_coins;
263                self.top_block_hash = top_block_hash;
264
265                self.update_blockchain_context();
266
267                BlockChainContextResponse::Ok
268            }
269            BlockChainContextRequest::ClearAltCache => {
270                self.alt_chain_cache_map.clear();
271
272                BlockChainContextResponse::Ok
273            }
274            BlockChainContextRequest::AltChainContextCache { prev_id, _token } => {
275                BlockChainContextResponse::AltChainContextCache(
276                    self.alt_chain_cache_map
277                        .get_alt_chain_context(prev_id, &mut self.database)
278                        .await?,
279                )
280            }
281            BlockChainContextRequest::AltChainDifficultyCache { prev_id, _token } => {
282                BlockChainContextResponse::AltChainDifficultyCache(
283                    get_alt_chain_difficulty_cache(
284                        prev_id,
285                        &self.difficulty_cache,
286                        self.database.clone(),
287                    )
288                    .await?,
289                )
290            }
291            BlockChainContextRequest::AltChainWeightCache { prev_id, _token } => {
292                BlockChainContextResponse::AltChainWeightCache(
293                    get_alt_chain_weight_cache(prev_id, &self.weight_cache, self.database.clone())
294                        .await?,
295                )
296            }
297            BlockChainContextRequest::AltChainRxVM {
298                height,
299                chain,
300                _token,
301            } => BlockChainContextResponse::AltChainRxVM(
302                self.rx_vm_cache
303                    .get_alt_vm(height, chain, &mut self.database)
304                    .await?,
305            ),
306            BlockChainContextRequest::AddAltChainContextCache { cache, _token } => {
307                self.alt_chain_cache_map.add_alt_cache(cache);
308                BlockChainContextResponse::Ok
309            }
310            BlockChainContextRequest::HardForkInfo(_)
311            | BlockChainContextRequest::FeeEstimate { .. }
312            | BlockChainContextRequest::AltChains
313            | BlockChainContextRequest::CalculatePow { .. } => {
314                todo!("finish https://github.com/Cuprate/cuprate/pull/297")
315            }
316        })
317    }
318
319    /// Run the [`ContextTask`], the task will listen for requests on the passed in channel. When the channel closes the
320    /// task will finish.
321    pub(crate) async fn run(mut self, mut rx: mpsc::Receiver<ContextTaskRequest>) {
322        while let Some(req) = rx.recv().await {
323            let res = self.handle_req(req.req).instrument(req.span).await;
324            drop(req.tx.send(res));
325        }
326
327        tracing::info!("Shutting down blockchain context task.");
328    }
329}
330
331fn blockchain_context(
332    weight_cache: &BlockWeightsCache,
333    difficulty_cache: &DifficultyCache,
334
335    current_hf: HardFork,
336    top_hash: [u8; 32],
337    chain_height: usize,
338    already_generated_coins: u64,
339) -> BlockchainContext {
340    BlockchainContext {
341        context_to_verify_block: ContextToVerifyBlock {
342            median_weight_for_block_reward: weight_cache.median_for_block_reward(current_hf),
343            effective_median_weight: weight_cache.effective_median_block_weight(current_hf),
344            top_hash,
345            median_block_timestamp: difficulty_cache
346                .median_timestamp(u64_to_usize(BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW)),
347            chain_height,
348            current_hf,
349            next_difficulty: difficulty_cache.next_difficulty(current_hf),
350            already_generated_coins,
351        },
352        cumulative_difficulty: difficulty_cache.cumulative_difficulty(),
353        median_long_term_weight: weight_cache.median_long_term_weight(),
354        top_block_timestamp: difficulty_cache.top_block_timestamp(),
355    }
356}