cuprated/txpool/dandelion/
tx_store.rs

1use std::task::{Context, Poll};
2
3use bytes::Bytes;
4use futures::{future::BoxFuture, FutureExt};
5use tower::{Service, ServiceExt};
6
7use cuprate_dandelion_tower::{
8    traits::{TxStoreRequest, TxStoreResponse},
9    State,
10};
11use cuprate_database::RuntimeError;
12use cuprate_txpool::service::{
13    interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest},
14    TxpoolReadHandle, TxpoolWriteHandle,
15};
16
17use super::{DandelionTx, TxId};
18
19/// The dandelion tx-store service.
20///
21/// This is just mapping the interface [`cuprate_dandelion_tower`] wants to what [`cuprate_txpool`] provides.
22pub struct TxStoreService {
23    pub txpool_read_handle: TxpoolReadHandle,
24    pub txpool_write_handle: TxpoolWriteHandle,
25}
26
27impl Service<TxStoreRequest<TxId>> for TxStoreService {
28    type Response = TxStoreResponse<DandelionTx>;
29    type Error = tower::BoxError;
30    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
31
32    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
33        Poll::Ready(Ok(()))
34    }
35
36    fn call(&mut self, req: TxStoreRequest<TxId>) -> Self::Future {
37        match req {
38            TxStoreRequest::Get(tx_id) => self
39                .txpool_read_handle
40                .clone()
41                .oneshot(TxpoolReadRequest::TxBlob(tx_id))
42                .map(|res| match res {
43                    Ok(TxpoolReadResponse::TxBlob {
44                        tx_blob,
45                        state_stem,
46                    }) => {
47                        let state = if state_stem {
48                            State::Stem
49                        } else {
50                            State::Fluff
51                        };
52
53                        Ok(TxStoreResponse::Transaction(Some((
54                            DandelionTx(Bytes::from(tx_blob)),
55                            state,
56                        ))))
57                    }
58                    Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Transaction(None)),
59                    Err(e) => Err(e.into()),
60                    Ok(_) => unreachable!(),
61                })
62                .boxed(),
63            TxStoreRequest::Promote(tx_id) => self
64                .txpool_write_handle
65                .clone()
66                .oneshot(TxpoolWriteRequest::Promote(tx_id))
67                .map(|res| match res {
68                    Ok(_) | Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Ok),
69                    Err(e) => Err(e.into()),
70                })
71                .boxed(),
72        }
73    }
74}