cuprated/txpool/dandelion/
tx_store.rs

1use std::{
2    future::ready,
3    task::{Context, Poll},
4};
5
6use bytes::Bytes;
7use futures::{future::BoxFuture, FutureExt};
8use tokio::sync::mpsc;
9use tower::{Service, ServiceExt};
10
11use cuprate_dandelion_tower::{
12    traits::{TxStoreRequest, TxStoreResponse},
13    State,
14};
15use cuprate_database::RuntimeError;
16use cuprate_txpool::service::{
17    interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest},
18    TxpoolReadHandle, TxpoolWriteHandle,
19};
20
21use super::{DandelionTx, TxId};
22
23/// The dandelion tx-store service.
24///
25/// This is just mapping the interface [`cuprate_dandelion_tower`] wants to what [`cuprate_txpool`] provides.
26pub struct TxStoreService {
27    pub txpool_read_handle: TxpoolReadHandle,
28    pub promote_tx: mpsc::UnboundedSender<[u8; 32]>,
29}
30
31impl Service<TxStoreRequest<TxId>> for TxStoreService {
32    type Response = TxStoreResponse<DandelionTx>;
33    type Error = tower::BoxError;
34    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
35
36    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
37        Poll::Ready(Ok(()))
38    }
39
40    fn call(&mut self, req: TxStoreRequest<TxId>) -> Self::Future {
41        match req {
42            TxStoreRequest::Get(tx_id) => self
43                .txpool_read_handle
44                .clone()
45                .oneshot(TxpoolReadRequest::TxBlob(tx_id))
46                .map(|res| match res {
47                    Ok(TxpoolReadResponse::TxBlob {
48                        tx_blob,
49                        state_stem,
50                    }) => {
51                        let state = if state_stem {
52                            State::Stem
53                        } else {
54                            State::Fluff
55                        };
56
57                        Ok(TxStoreResponse::Transaction(Some((
58                            DandelionTx(Bytes::from(tx_blob)),
59                            state,
60                        ))))
61                    }
62                    Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Transaction(None)),
63                    Err(e) => Err(e.into()),
64                    Ok(_) => unreachable!(),
65                })
66                .boxed(),
67            TxStoreRequest::Promote(tx_id) => ready(
68                self.promote_tx
69                    .send(tx_id)
70                    .map_err(Into::into)
71                    .map(|()| TxStoreResponse::Ok),
72            )
73            .boxed(),
74        }
75    }
76}