cuprated/
main.rs

1#![doc = include_str!("../README.md")]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3#![allow(
4    unused_imports,
5    unreachable_pub,
6    unreachable_code,
7    unused_crate_dependencies,
8    dead_code,
9    unused_variables,
10    clippy::needless_pass_by_value,
11    clippy::unused_async,
12    clippy::diverging_sub_expression,
13    unused_mut,
14    clippy::let_unit_value,
15    clippy::needless_pass_by_ref_mut,
16    reason = "TODO: remove after v1.0.0"
17)]
18
19use std::{mem, sync::Arc};
20
21use p2p::initialize_zones_p2p;
22use tokio::sync::mpsc;
23use tower::{Service, ServiceExt};
24use tracing::{error, info, level_filters::LevelFilter};
25use tracing_subscriber::{layer::SubscriberExt, reload::Handle, util::SubscriberInitExt, Registry};
26
27use cuprate_consensus_context::{
28    BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService,
29};
30use cuprate_database::{InitError, DATABASE_CORRUPT_MSG};
31use cuprate_helper::time::secs_to_hms;
32use cuprate_p2p_core::{transports::Tcp, ClearNet};
33use cuprate_types::blockchain::BlockchainWriteRequest;
34use txpool::IncomingTxHandler;
35
36use crate::{
37    config::Config,
38    constants::PANIC_CRITICAL_SERVICE_ERROR,
39    logging::CupratedTracingFilter,
40    tor::{initialize_tor_if_enabled, TorMode},
41};
42
43mod blockchain;
44mod commands;
45mod config;
46mod constants;
47mod killswitch;
48mod logging;
49mod p2p;
50mod rpc;
51mod signals;
52mod statics;
53mod tor;
54mod txpool;
55mod version;
56
57fn main() {
58    // Initialize the killswitch.
59    killswitch::init_killswitch();
60
61    // Initialize global static `LazyLock` data.
62    statics::init_lazylock_statics();
63
64    let config = config::read_config_and_args();
65
66    blockchain::set_fast_sync_hashes(config.fast_sync, config.network());
67
68    // Initialize logging.
69    logging::init_logging(&config);
70
71    //Printing configuration
72    info!("{config}");
73
74    // Initialize the thread-pools
75
76    init_global_rayon_pool(&config);
77
78    let rt = init_tokio_rt(&config);
79
80    let db_thread_pool = cuprate_database_service::init_thread_pool(
81        cuprate_database_service::ReaderThreads::Number(config.storage.reader_threads),
82    );
83
84    // Start the blockchain & tx-pool databases.
85
86    let (mut blockchain_read_handle, mut blockchain_write_handle, _) =
87        cuprate_blockchain::service::init_with_pool(
88            config.blockchain_config(),
89            Arc::clone(&db_thread_pool),
90        )
91        .inspect_err(|e| error!("Blockchain database error: {e}"))
92        .expect(DATABASE_CORRUPT_MSG);
93
94    let (txpool_read_handle, txpool_write_handle, _) =
95        cuprate_txpool::service::init_with_pool(&config.txpool_config(), db_thread_pool)
96            .inspect_err(|e| error!("Txpool database error: {e}"))
97            .expect(DATABASE_CORRUPT_MSG);
98
99    // Initialize async tasks.
100
101    rt.block_on(async move {
102        // TODO: Add an argument/option for keeping alt blocks between restart.
103        blockchain_write_handle
104            .ready()
105            .await
106            .expect(PANIC_CRITICAL_SERVICE_ERROR)
107            .call(BlockchainWriteRequest::FlushAltBlocks)
108            .await
109            .expect(PANIC_CRITICAL_SERVICE_ERROR);
110
111        // Check add the genesis block to the blockchain.
112        blockchain::check_add_genesis(
113            &mut blockchain_read_handle,
114            &mut blockchain_write_handle,
115            config.network(),
116        )
117        .await;
118
119        // Start the context service and the block/tx verifier.
120        let context_svc =
121            blockchain::init_consensus(blockchain_read_handle.clone(), config.context_config())
122                .await
123                .unwrap();
124
125        // Bootstrap or configure Tor if enabled.
126        let tor_context = initialize_tor_if_enabled(&config).await;
127
128        // Start p2p network zones
129        let (network_interfaces, tx_handler_subscribers) = p2p::initialize_zones_p2p(
130            &config,
131            context_svc.clone(),
132            blockchain_read_handle.clone(),
133            txpool_read_handle.clone(),
134            tor_context,
135        )
136        .await;
137
138        // Create the incoming tx handler service.
139        let tx_handler = IncomingTxHandler::init(
140            config.storage.txpool.clone(),
141            network_interfaces.clearnet_network_interface.clone(),
142            network_interfaces.tor_network_interface,
143            txpool_write_handle.clone(),
144            txpool_read_handle.clone(),
145            context_svc.clone(),
146            blockchain_read_handle.clone(),
147        )
148        .await;
149
150        // Send tx handler sender to all network zones
151        for zone in tx_handler_subscribers {
152            if zone.send(tx_handler.clone()).is_err() {
153                unreachable!()
154            }
155        }
156
157        // Initialize the blockchain manager.
158        blockchain::init_blockchain_manager(
159            network_interfaces.clearnet_network_interface,
160            blockchain_write_handle,
161            blockchain_read_handle.clone(),
162            tx_handler.txpool_manager.clone(),
163            context_svc.clone(),
164            config.block_downloader_config(),
165        )
166        .await;
167
168        // Initialize the RPC server(s).
169        rpc::init_rpc_servers(
170            config.rpc,
171            config.network,
172            blockchain_read_handle,
173            context_svc.clone(),
174            txpool_read_handle,
175            tx_handler,
176        );
177
178        // Start the command listener.
179        if std::io::IsTerminal::is_terminal(&std::io::stdin()) {
180            let (command_tx, command_rx) = mpsc::channel(1);
181            std::thread::spawn(|| commands::command_listener(command_tx));
182
183            // Wait on the io_loop, spawned on a separate task as this improves performance.
184            tokio::spawn(commands::io_loop(command_rx, context_svc))
185                .await
186                .unwrap();
187        } else {
188            // If no STDIN, await OS exit signal.
189            info!("Terminal/TTY not detected, disabling STDIN commands");
190            tokio::signal::ctrl_c().await.unwrap();
191        }
192    });
193}
194
195/// Initialize the [`tokio`] runtime.
196fn init_tokio_rt(config: &Config) -> tokio::runtime::Runtime {
197    tokio::runtime::Builder::new_multi_thread()
198        .worker_threads(config.tokio.threads)
199        .thread_name("cuprated-tokio")
200        .enable_all()
201        .build()
202        .unwrap()
203}
204
205/// Initialize the global [`rayon`] thread-pool.
206fn init_global_rayon_pool(config: &Config) {
207    rayon::ThreadPoolBuilder::new()
208        .num_threads(config.rayon.threads)
209        .thread_name(|index| format!("cuprated-rayon-{index}"))
210        .build_global()
211        .unwrap();
212}