cuprated/rpc/service/
txpool.rs

1//! Functions to send [`TxpoolReadRequest`]s.
2
3use std::{
4    collections::{HashMap, HashSet},
5    convert::Infallible,
6    num::NonZero,
7};
8
9use anyhow::{anyhow, Error};
10use monero_serai::transaction::Transaction;
11use tower::{Service, ServiceExt};
12
13use cuprate_helper::cast::usize_to_u64;
14use cuprate_rpc_types::misc::{SpentKeyImageInfo, TxInfo};
15use cuprate_txpool::{
16    service::{
17        interface::{TxpoolReadRequest, TxpoolReadResponse},
18        TxpoolReadHandle,
19    },
20    TxEntry,
21};
22use cuprate_types::{
23    rpc::{PoolInfo, PoolInfoFull, PoolInfoIncremental, PoolTxInfo, TxpoolStats},
24    TransactionVerificationData, TxInPool, TxRelayChecks,
25};
26
27// FIXME: use `anyhow::Error` over `tower::BoxError` in txpool.
28
29/// [`TxpoolReadRequest::Backlog`]
30pub async fn backlog(txpool_read: &mut TxpoolReadHandle) -> Result<Vec<TxEntry>, Error> {
31    let TxpoolReadResponse::Backlog(tx_entries) = txpool_read
32        .ready()
33        .await
34        .map_err(|e| anyhow!(e))?
35        .call(TxpoolReadRequest::Backlog)
36        .await
37        .map_err(|e| anyhow!(e))?
38    else {
39        unreachable!();
40    };
41
42    Ok(tx_entries)
43}
44
45/// [`TxpoolReadRequest::Size`]
46pub async fn size(
47    txpool_read: &mut TxpoolReadHandle,
48    include_sensitive_txs: bool,
49) -> Result<u64, Error> {
50    let TxpoolReadResponse::Size(size) = txpool_read
51        .ready()
52        .await
53        .map_err(|e| anyhow!(e))?
54        .call(TxpoolReadRequest::Size {
55            include_sensitive_txs,
56        })
57        .await
58        .map_err(|e| anyhow!(e))?
59    else {
60        unreachable!();
61    };
62
63    Ok(usize_to_u64(size))
64}
65
66/// [`TxpoolReadRequest::PoolInfo`]
67pub async fn pool_info(
68    txpool_read: &mut TxpoolReadHandle,
69    include_sensitive_txs: bool,
70    max_tx_count: usize,
71    start_time: Option<NonZero<usize>>,
72) -> Result<PoolInfo, Error> {
73    let TxpoolReadResponse::PoolInfo(pool_info) = txpool_read
74        .ready()
75        .await
76        .map_err(|e| anyhow!(e))?
77        .call(TxpoolReadRequest::PoolInfo {
78            include_sensitive_txs,
79            max_tx_count,
80            start_time,
81        })
82        .await
83        .map_err(|e| anyhow!(e))?
84    else {
85        unreachable!();
86    };
87
88    Ok(pool_info)
89}
90
91/// [`TxpoolReadRequest::TxsByHash`]
92pub async fn txs_by_hash(
93    txpool_read: &mut TxpoolReadHandle,
94    tx_hashes: Vec<[u8; 32]>,
95    include_sensitive_txs: bool,
96) -> Result<Vec<TxInPool>, Error> {
97    let TxpoolReadResponse::TxsByHash(txs_in_pool) = txpool_read
98        .ready()
99        .await
100        .map_err(|e| anyhow!(e))?
101        .call(TxpoolReadRequest::TxsByHash {
102            tx_hashes,
103            include_sensitive_txs,
104        })
105        .await
106        .map_err(|e| anyhow!(e))?
107    else {
108        unreachable!();
109    };
110
111    Ok(txs_in_pool)
112}
113
114/// [`TxpoolReadRequest::KeyImagesSpent`]
115pub async fn key_images_spent(
116    txpool_read: &mut TxpoolReadHandle,
117    key_images: HashSet<[u8; 32]>,
118    include_sensitive_txs: bool,
119) -> Result<bool, Error> {
120    let TxpoolReadResponse::KeyImagesSpent(status) = txpool_read
121        .ready()
122        .await
123        .map_err(|e| anyhow!(e))?
124        .call(TxpoolReadRequest::KeyImagesSpent {
125            key_images,
126            include_sensitive_txs,
127        })
128        .await
129        .map_err(|e| anyhow!(e))?
130    else {
131        unreachable!();
132    };
133
134    Ok(status)
135}
136
137/// [`TxpoolReadRequest::KeyImagesSpentVec`]
138pub async fn key_images_spent_vec(
139    txpool_read: &mut TxpoolReadHandle,
140    key_images: Vec<[u8; 32]>,
141    include_sensitive_txs: bool,
142) -> Result<Vec<bool>, Error> {
143    let TxpoolReadResponse::KeyImagesSpentVec(status) = txpool_read
144        .ready()
145        .await
146        .map_err(|e| anyhow!(e))?
147        .call(TxpoolReadRequest::KeyImagesSpentVec {
148            key_images,
149            include_sensitive_txs,
150        })
151        .await
152        .map_err(|e| anyhow!(e))?
153    else {
154        unreachable!();
155    };
156
157    Ok(status)
158}
159
160/// [`TxpoolReadRequest::Pool`]
161pub async fn pool(
162    txpool_read: &mut TxpoolReadHandle,
163    include_sensitive_txs: bool,
164) -> Result<(Vec<TxInfo>, Vec<SpentKeyImageInfo>), Error> {
165    let TxpoolReadResponse::Pool {
166        txs,
167        spent_key_images,
168    } = txpool_read
169        .ready()
170        .await
171        .map_err(|e| anyhow!(e))?
172        .call(TxpoolReadRequest::Pool {
173            include_sensitive_txs,
174        })
175        .await
176        .map_err(|e| anyhow!(e))?
177    else {
178        unreachable!();
179    };
180
181    let txs = txs.into_iter().map(Into::into).collect();
182    let spent_key_images = spent_key_images.into_iter().map(Into::into).collect();
183
184    Ok((txs, spent_key_images))
185}
186
187/// [`TxpoolReadRequest::PoolStats`]
188pub async fn pool_stats(
189    txpool_read: &mut TxpoolReadHandle,
190    include_sensitive_txs: bool,
191) -> Result<TxpoolStats, Error> {
192    let TxpoolReadResponse::PoolStats(txpool_stats) = txpool_read
193        .ready()
194        .await
195        .map_err(|e| anyhow!(e))?
196        .call(TxpoolReadRequest::PoolStats {
197            include_sensitive_txs,
198        })
199        .await
200        .map_err(|e| anyhow!(e))?
201    else {
202        unreachable!();
203    };
204
205    Ok(txpool_stats)
206}
207
208/// [`TxpoolReadRequest::AllHashes`]
209pub async fn all_hashes(
210    txpool_read: &mut TxpoolReadHandle,
211    include_sensitive_txs: bool,
212) -> Result<Vec<[u8; 32]>, Error> {
213    let TxpoolReadResponse::AllHashes(hashes) = txpool_read
214        .ready()
215        .await
216        .map_err(|e| anyhow!(e))?
217        .call(TxpoolReadRequest::AllHashes {
218            include_sensitive_txs,
219        })
220        .await
221        .map_err(|e| anyhow!(e))?
222    else {
223        unreachable!();
224    };
225
226    Ok(hashes)
227}
228
229/// [`TxpoolReadRequest::TxsForBlock`]
230pub async fn txs_for_block(
231    txpool_read: &mut TxpoolReadHandle,
232    tx_hashes: Vec<[u8; 32]>,
233) -> Result<(HashMap<[u8; 32], TransactionVerificationData>, Vec<usize>), Error> {
234    let TxpoolReadResponse::TxsForBlock { txs, missing } = txpool_read
235        .ready()
236        .await
237        .map_err(|e| anyhow!(e))?
238        .call(TxpoolReadRequest::TxsForBlock(tx_hashes))
239        .await
240        .map_err(|e| anyhow!(e))?
241    else {
242        unreachable!();
243    };
244
245    Ok((txs, missing))
246}
247
248/// TODO: impl txpool manager.
249pub async fn flush(txpool_manager: &mut Infallible, tx_hashes: Vec<[u8; 32]>) -> Result<(), Error> {
250    todo!();
251    Ok(())
252}
253
254/// TODO: impl txpool manager.
255pub async fn relay(txpool_manager: &mut Infallible, tx_hashes: Vec<[u8; 32]>) -> Result<(), Error> {
256    todo!();
257    Ok(())
258}