use std::{
collections::HashSet,
sync::Arc,
task::{Context, Poll},
};
use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt};
use monero_serai::transaction::Transaction;
use tower::{Service, ServiceExt};
use cuprate_consensus::{
transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse,
BlockChainContextService, ExtendedConsensusError, VerifyTxRequest,
};
use cuprate_dandelion_tower::{
pool::{DandelionPoolService, IncomingTxBuilder},
State, TxState,
};
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_p2p::NetworkInterface;
use cuprate_p2p_core::ClearNet;
use cuprate_txpool::{
service::{
interface::{
TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse,
},
TxpoolReadHandle, TxpoolWriteHandle,
},
transaction_blob_hash,
};
use cuprate_types::TransactionVerificationData;
use crate::{
blockchain::ConcreteTxVerifierService,
constants::PANIC_CRITICAL_SERVICE_ERROR,
p2p::CrossNetworkInternalPeerId,
signals::REORG_LOCK,
txpool::{
dandelion,
txs_being_handled::{TxsBeingHandled, TxsBeingHandledLocally},
},
};
#[derive(Debug, thiserror::Error)]
pub enum IncomingTxError {
#[error("Error parsing tx: {0}")]
Parse(std::io::Error),
#[error(transparent)]
Consensus(ExtendedConsensusError),
#[error("Duplicate tx in message")]
DuplicateTransaction,
}
pub struct IncomingTxs {
pub txs: Vec<Bytes>,
pub state: TxState<CrossNetworkInternalPeerId>,
}
#[derive(Clone)]
pub struct DandelionTx(pub Bytes);
pub(super) type TxId = [u8; 32];
#[derive(Clone)]
pub struct IncomingTxHandler {
pub(super) txs_being_handled: TxsBeingHandled,
pub(super) blockchain_context_cache: BlockChainContextService,
pub(super) dandelion_pool_manager:
DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
pub(super) tx_verifier_service: ConcreteTxVerifierService,
pub(super) txpool_write_handle: TxpoolWriteHandle,
pub(super) txpool_read_handle: TxpoolReadHandle,
}
impl IncomingTxHandler {
#[expect(clippy::significant_drop_tightening)]
pub fn init(
clear_net: NetworkInterface<ClearNet>,
txpool_write_handle: TxpoolWriteHandle,
txpool_read_handle: TxpoolReadHandle,
blockchain_context_cache: BlockChainContextService,
tx_verifier_service: ConcreteTxVerifierService,
) -> Self {
let dandelion_router = dandelion::dandelion_router(clear_net);
let dandelion_pool_manager = dandelion::start_dandelion_pool_manager(
dandelion_router,
txpool_read_handle.clone(),
txpool_write_handle.clone(),
);
Self {
txs_being_handled: TxsBeingHandled::new(),
blockchain_context_cache,
dandelion_pool_manager,
tx_verifier_service,
txpool_write_handle,
txpool_read_handle,
}
}
}
impl Service<IncomingTxs> for IncomingTxHandler {
type Response = ();
type Error = IncomingTxError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: IncomingTxs) -> Self::Future {
handle_incoming_txs(
req,
self.txs_being_handled.clone(),
self.blockchain_context_cache.clone(),
self.tx_verifier_service.clone(),
self.txpool_write_handle.clone(),
self.txpool_read_handle.clone(),
self.dandelion_pool_manager.clone(),
)
.boxed()
}
}
async fn handle_incoming_txs(
IncomingTxs { txs, state }: IncomingTxs,
txs_being_handled: TxsBeingHandled,
mut blockchain_context_cache: BlockChainContextService,
mut tx_verifier_service: ConcreteTxVerifierService,
mut txpool_write_handle: TxpoolWriteHandle,
mut txpool_read_handle: TxpoolReadHandle,
mut dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
) -> Result<(), IncomingTxError> {
let _reorg_guard = REORG_LOCK.read().await;
let (txs, stem_pool_txs, txs_being_handled_guard) =
prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?;
let BlockChainContextResponse::Context(context) = blockchain_context_cache
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockChainContextRequest::Context)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
unreachable!()
};
let context = context.unchecked_blockchain_context();
tx_verifier_service
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(VerifyTxRequest::Prepped {
txs: txs.clone(),
current_chain_height: context.chain_height,
top_hash: context.top_hash,
time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(),
hf: context.current_hf,
})
.await
.map_err(IncomingTxError::Consensus)?;
for tx in txs {
handle_valid_tx(
tx,
state.clone(),
&mut txpool_write_handle,
&mut dandelion_pool_manager,
)
.await;
}
for stem_tx in stem_pool_txs {
rerelay_stem_tx(
&stem_tx,
state.clone(),
&mut txpool_read_handle,
&mut dandelion_pool_manager,
)
.await;
}
Ok(())
}
async fn prepare_incoming_txs(
tx_blobs: Vec<Bytes>,
txs_being_handled: TxsBeingHandled,
txpool_read_handle: &mut TxpoolReadHandle,
) -> Result<
(
Vec<Arc<TransactionVerificationData>>,
Vec<TxId>,
TxsBeingHandledLocally,
),
IncomingTxError,
> {
let mut tx_blob_hashes = HashSet::new();
let mut txs_being_handled_locally = txs_being_handled.local_tracker();
let txs = tx_blobs
.into_iter()
.filter_map(|tx_blob| {
let tx_blob_hash = transaction_blob_hash(&tx_blob);
if !tx_blob_hashes.insert(tx_blob_hash) {
return Some(Err(IncomingTxError::DuplicateTransaction));
}
if !txs_being_handled_locally.try_add_tx(tx_blob_hash) {
return None;
}
Some(Ok((tx_blob_hash, tx_blob)))
})
.collect::<Result<Vec<_>, _>>()?;
let TxpoolReadResponse::FilterKnownTxBlobHashes {
unknown_blob_hashes,
stem_pool_hashes,
} = txpool_read_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes))
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
unreachable!()
};
rayon_spawn_async(move || {
let txs = txs
.into_iter()
.filter_map(|(tx_blob_hash, tx_blob)| {
if unknown_blob_hashes.contains(&tx_blob_hash) {
Some(tx_blob)
} else {
None
}
})
.map(|bytes| {
let tx = Transaction::read(&mut bytes.as_ref()).map_err(IncomingTxError::Parse)?;
let tx = new_tx_verification_data(tx)
.map_err(|e| IncomingTxError::Consensus(e.into()))?;
Ok(Arc::new(tx))
})
.collect::<Result<Vec<_>, IncomingTxError>>()?;
Ok((txs, stem_pool_hashes, txs_being_handled_locally))
})
.await
}
async fn handle_valid_tx(
tx: Arc<TransactionVerificationData>,
state: TxState<CrossNetworkInternalPeerId>,
txpool_write_handle: &mut TxpoolWriteHandle,
dandelion_pool_manager: &mut DandelionPoolService<
DandelionTx,
TxId,
CrossNetworkInternalPeerId,
>,
) {
let incoming_tx =
IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx.tx_blob)), tx.tx_hash);
let TxpoolWriteResponse::AddTransaction(double_spend) = txpool_write_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(TxpoolWriteRequest::AddTransaction {
tx,
state_stem: state.is_stem_stage(),
})
.await
.expect("TODO")
else {
unreachable!()
};
if let Some(tx_hash) = double_spend {
return;
};
let incoming_tx = incoming_tx
.with_routing_state(state)
.with_state_in_db(None)
.build()
.unwrap();
dandelion_pool_manager
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(incoming_tx)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR);
}
async fn rerelay_stem_tx(
tx_hash: &TxId,
state: TxState<CrossNetworkInternalPeerId>,
txpool_read_handle: &mut TxpoolReadHandle,
dandelion_pool_manager: &mut DandelionPoolService<
DandelionTx,
TxId,
CrossNetworkInternalPeerId,
>,
) {
let Ok(TxpoolReadResponse::TxBlob { tx_blob, .. }) = txpool_read_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(TxpoolReadRequest::TxBlob(*tx_hash))
.await
else {
return;
};
let incoming_tx =
IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx_blob)), *tx_hash);
let incoming_tx = incoming_tx
.with_routing_state(state)
.with_state_in_db(Some(State::Stem))
.build()
.unwrap();
dandelion_pool_manager
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(incoming_tx)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR);
}