cuprated/
main.rs

1#![doc = include_str!("../README.md")]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3#![allow(
4    unused_imports,
5    unreachable_pub,
6    unreachable_code,
7    unused_crate_dependencies,
8    dead_code,
9    unused_variables,
10    clippy::needless_pass_by_value,
11    clippy::unused_async,
12    clippy::diverging_sub_expression,
13    unused_mut,
14    clippy::let_unit_value,
15    clippy::needless_pass_by_ref_mut,
16    reason = "TODO: remove after v1.0.0"
17)]
18
19use std::{mem, sync::Arc};
20
21use p2p::initialize_zones_p2p;
22use tokio::sync::mpsc;
23use tower::{Service, ServiceExt};
24use tracing::{error, info, level_filters::LevelFilter};
25use tracing_subscriber::{layer::SubscriberExt, reload::Handle, util::SubscriberInitExt, Registry};
26
27use cuprate_consensus_context::{
28    BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService,
29};
30use cuprate_database::{InitError, DATABASE_CORRUPT_MSG};
31use cuprate_helper::time::secs_to_hms;
32use cuprate_p2p_core::{transports::Tcp, ClearNet};
33use cuprate_types::blockchain::BlockchainWriteRequest;
34use txpool::IncomingTxHandler;
35
36use crate::{
37    config::Config, constants::PANIC_CRITICAL_SERVICE_ERROR, logging::CupratedTracingFilter,
38};
39
40mod blockchain;
41mod commands;
42mod config;
43mod constants;
44mod killswitch;
45mod logging;
46mod p2p;
47mod rpc;
48mod signals;
49mod statics;
50mod txpool;
51mod version;
52
53fn main() {
54    // Initialize the killswitch.
55    killswitch::init_killswitch();
56
57    // Initialize global static `LazyLock` data.
58    statics::init_lazylock_statics();
59
60    let config = config::read_config_and_args();
61
62    blockchain::set_fast_sync_hashes(config.fast_sync, config.network());
63
64    // Initialize logging.
65    logging::init_logging(&config);
66
67    //Printing configuration
68    info!("{config}");
69
70    // Initialize the thread-pools
71
72    init_global_rayon_pool(&config);
73
74    let rt = init_tokio_rt(&config);
75
76    let db_thread_pool = cuprate_database_service::init_thread_pool(
77        cuprate_database_service::ReaderThreads::Number(config.storage.reader_threads),
78    );
79
80    // Start the blockchain & tx-pool databases.
81
82    let (mut blockchain_read_handle, mut blockchain_write_handle, _) =
83        cuprate_blockchain::service::init_with_pool(
84            config.blockchain_config(),
85            Arc::clone(&db_thread_pool),
86        )
87        .inspect_err(|e| error!("Blockchain database error: {e}"))
88        .expect(DATABASE_CORRUPT_MSG);
89
90    let (txpool_read_handle, txpool_write_handle, _) =
91        cuprate_txpool::service::init_with_pool(config.txpool_config(), db_thread_pool)
92            .inspect_err(|e| error!("Txpool database error: {e}"))
93            .expect(DATABASE_CORRUPT_MSG);
94
95    // Initialize async tasks.
96
97    rt.block_on(async move {
98        // TODO: Add an argument/option for keeping alt blocks between restart.
99        blockchain_write_handle
100            .ready()
101            .await
102            .expect(PANIC_CRITICAL_SERVICE_ERROR)
103            .call(BlockchainWriteRequest::FlushAltBlocks)
104            .await
105            .expect(PANIC_CRITICAL_SERVICE_ERROR);
106
107        // Check add the genesis block to the blockchain.
108        blockchain::check_add_genesis(
109            &mut blockchain_read_handle,
110            &mut blockchain_write_handle,
111            config.network(),
112        )
113        .await;
114
115        // Start the context service and the block/tx verifier.
116        let context_svc =
117            blockchain::init_consensus(blockchain_read_handle.clone(), config.context_config())
118                .await
119                .unwrap();
120
121        // Start p2p network zones
122        let (network_interfaces, tx_handler_subscribers) = p2p::initialize_zones_p2p(
123            &config,
124            context_svc.clone(),
125            blockchain_write_handle.clone(),
126            blockchain_read_handle.clone(),
127            txpool_write_handle.clone(),
128            txpool_read_handle.clone(),
129        )
130        .await;
131
132        // Create the incoming tx handler service.
133        let tx_handler = IncomingTxHandler::init(
134            network_interfaces.clearnet_network_interface.clone(),
135            txpool_write_handle.clone(),
136            txpool_read_handle,
137            context_svc.clone(),
138            blockchain_read_handle.clone(),
139        );
140
141        // Send tx handler sender to all network zones
142        for zone in tx_handler_subscribers {
143            if zone.send(tx_handler.clone()).is_err() {
144                unreachable!()
145            }
146        }
147
148        // Initialize the blockchain manager.
149        blockchain::init_blockchain_manager(
150            network_interfaces.clearnet_network_interface,
151            blockchain_write_handle,
152            blockchain_read_handle,
153            txpool_write_handle,
154            context_svc.clone(),
155            config.block_downloader_config(),
156        )
157        .await;
158
159        // Start the command listener.
160        if std::io::IsTerminal::is_terminal(&std::io::stdin()) {
161            let (command_tx, command_rx) = mpsc::channel(1);
162            std::thread::spawn(|| commands::command_listener(command_tx));
163
164            // Wait on the io_loop, spawned on a separate task as this improves performance.
165            tokio::spawn(commands::io_loop(command_rx, context_svc))
166                .await
167                .unwrap();
168        } else {
169            // If no STDIN, await OS exit signal.
170            info!("Terminal/TTY not detected, disabling STDIN commands");
171            tokio::signal::ctrl_c().await.unwrap();
172        }
173    });
174}
175
176/// Initialize the [`tokio`] runtime.
177fn init_tokio_rt(config: &Config) -> tokio::runtime::Runtime {
178    tokio::runtime::Builder::new_multi_thread()
179        .worker_threads(config.tokio.threads)
180        .thread_name("cuprated-tokio")
181        .enable_all()
182        .build()
183        .unwrap()
184}
185
186/// Initialize the global [`rayon`] thread-pool.
187fn init_global_rayon_pool(config: &Config) {
188    rayon::ThreadPoolBuilder::new()
189        .num_threads(config.rayon.threads)
190        .thread_name(|index| format!("cuprated-rayon-{index}"))
191        .build_global()
192        .unwrap();
193}