cuprated/txpool/dandelion/
tx_store.rs1use std::task::{Context, Poll};
2
3use bytes::Bytes;
4use futures::{future::BoxFuture, FutureExt};
5use tower::{Service, ServiceExt};
6
7use cuprate_dandelion_tower::{
8 traits::{TxStoreRequest, TxStoreResponse},
9 State,
10};
11use cuprate_database::RuntimeError;
12use cuprate_txpool::service::{
13 interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest},
14 TxpoolReadHandle, TxpoolWriteHandle,
15};
16
17use super::{DandelionTx, TxId};
18
19pub struct TxStoreService {
23 pub txpool_read_handle: TxpoolReadHandle,
24 pub txpool_write_handle: TxpoolWriteHandle,
25}
26
27impl Service<TxStoreRequest<TxId>> for TxStoreService {
28 type Response = TxStoreResponse<DandelionTx>;
29 type Error = tower::BoxError;
30 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
31
32 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
33 Poll::Ready(Ok(()))
34 }
35
36 fn call(&mut self, req: TxStoreRequest<TxId>) -> Self::Future {
37 match req {
38 TxStoreRequest::Get(tx_id) => self
39 .txpool_read_handle
40 .clone()
41 .oneshot(TxpoolReadRequest::TxBlob(tx_id))
42 .map(|res| match res {
43 Ok(TxpoolReadResponse::TxBlob {
44 tx_blob,
45 state_stem,
46 }) => {
47 let state = if state_stem {
48 State::Stem
49 } else {
50 State::Fluff
51 };
52
53 Ok(TxStoreResponse::Transaction(Some((
54 DandelionTx(Bytes::from(tx_blob)),
55 state,
56 ))))
57 }
58 Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Transaction(None)),
59 Err(e) => Err(e.into()),
60 Ok(_) => unreachable!(),
61 })
62 .boxed(),
63 TxStoreRequest::Promote(tx_id) => self
64 .txpool_write_handle
65 .clone()
66 .oneshot(TxpoolWriteRequest::Promote(tx_id))
67 .map(|res| match res {
68 Ok(_) | Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Ok),
69 Err(e) => Err(e.into()),
70 })
71 .boxed(),
72 }
73 }
74}