cuprated/txpool/dandelion/
tx_store.rs1use std::{
2 future::ready,
3 task::{Context, Poll},
4};
5
6use bytes::Bytes;
7use futures::{future::BoxFuture, FutureExt};
8use tokio::sync::mpsc;
9use tower::{Service, ServiceExt};
10
11use cuprate_dandelion_tower::{
12 traits::{TxStoreRequest, TxStoreResponse},
13 State,
14};
15use cuprate_database::RuntimeError;
16use cuprate_txpool::service::{
17 interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest},
18 TxpoolReadHandle, TxpoolWriteHandle,
19};
20
21use super::{DandelionTx, TxId};
22
23pub struct TxStoreService {
27 pub txpool_read_handle: TxpoolReadHandle,
28 pub promote_tx: mpsc::UnboundedSender<[u8; 32]>,
29}
30
31impl Service<TxStoreRequest<TxId>> for TxStoreService {
32 type Response = TxStoreResponse<DandelionTx>;
33 type Error = tower::BoxError;
34 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
35
36 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
37 Poll::Ready(Ok(()))
38 }
39
40 fn call(&mut self, req: TxStoreRequest<TxId>) -> Self::Future {
41 match req {
42 TxStoreRequest::Get(tx_id) => self
43 .txpool_read_handle
44 .clone()
45 .oneshot(TxpoolReadRequest::TxBlob(tx_id))
46 .map(|res| match res {
47 Ok(TxpoolReadResponse::TxBlob {
48 tx_blob,
49 state_stem,
50 }) => {
51 let state = if state_stem {
52 State::Stem
53 } else {
54 State::Fluff
55 };
56
57 Ok(TxStoreResponse::Transaction(Some((
58 DandelionTx(Bytes::from(tx_blob)),
59 state,
60 ))))
61 }
62 Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Transaction(None)),
63 Err(e) => Err(e.into()),
64 Ok(_) => unreachable!(),
65 })
66 .boxed(),
67 TxStoreRequest::Promote(tx_id) => ready(
68 self.promote_tx
69 .send(tx_id)
70 .map_err(Into::into)
71 .map(|()| TxStoreResponse::Ok),
72 )
73 .boxed(),
74 }
75 }
76}