cuprate_txpool/service.rs
1//! [`tower::Service`] integration + thread-pool.
2//!
3//! ## `service`
4//! The `service` module implements the [`tower`] integration,
5//! along with the reader/writer thread-pool system.
6//!
7//! The thread-pool allows outside crates to communicate with it by
8//! sending database [`Request`][req_r]s and receiving [`Response`][resp]s `async`hronously -
9//! without having to actually worry about and handle the database themselves.
10//!
11//! The system is managed by this crate, and only requires [`init`] by the user.
12//!
13//! ## Handles
14//! The 2 handles to the database are:
15//! - [`TxpoolReadHandle`]
16//! - [`TxpoolWriteHandle`]
17//!
18//! The 1st allows any caller to send [`TxpoolReadRequest`][req_r]s.
19//!
20//! The 2nd allows any caller to send [`TxpoolWriteRequest`][req_w]s.
21//!
22//! Both the handles are cheaply [`Clone`]able.
23//!
24//! ## Initialization
25//! The database & thread-pool system can be initialized with [`init()`].
26//!
27//! This causes the underlying database/threads to be set up
28//! and returns a read/write handle to that database.
29//!
30//! ## Shutdown
31//! Upon the above handles being dropped, the corresponding thread(s) will automatically exit, i.e.:
32//! - The last [`TxpoolReadHandle`] is dropped => reader thread-pool exits
33//! - The last [`TxpoolWriteHandle`] is dropped => writer thread exits
34//!
35//! Upon dropping the [`cuprate_database::Env`]:
36//! - All un-processed database transactions are completed
37//! - All data gets flushed to disk (caused by [`Drop::drop`] impl on `Env`)
38//!
39//! ## Request and Response
40//! To interact with the database (whether reading or writing data),
41//! a `Request` can be sent using one of the above handles.
42//!
43//! Both the handles implement [`tower::Service`], so they can be [`tower::Service::call`]ed.
44//!
45//! An `async`hronous channel will be returned from the call.
46//! This channel can be `.await`ed upon to (eventually) receive
47//! the corresponding `Response` to your `Request`.
48//!
49//! [req_r]: interface::TxpoolReadRequest
50//!
51//! [req_w]: interface::TxpoolWriteRequest
52//!
53//! // TODO: we have 2 responses
54//!
55//! [resp]: interface::TxpoolWriteResponse
56//!
57//! # Example
58//! Simple usage of `service`.
59//!
60//! ```rust
61//! use std::sync::Arc;
62//!
63//! use hex_literal::hex;
64//! use tower::{Service, ServiceExt};
65//!
66//! use cuprate_test_utils::data::TX_V1_SIG2;
67//!
68//! use cuprate_txpool::{
69//! cuprate_database::Env,
70//! config::ConfigBuilder,
71//! service::interface::{
72//! TxpoolWriteRequest,
73//! TxpoolWriteResponse,
74//! TxpoolReadRequest,
75//! TxpoolReadResponse
76//! }
77//! };
78//!
79//! # #[tokio::main]
80//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
81//! // Create a configuration for the database environment.
82//! use cuprate_test_utils::data::TX_V1_SIG2;
83//! let tmp_dir = tempfile::tempdir()?;
84//! let db_dir = tmp_dir.path().to_owned();
85//! let config = ConfigBuilder::new()
86//! .data_directory(db_dir.into())
87//! .build();
88//!
89//! // Initialize the database thread-pool.
90//! let (mut read_handle, mut write_handle, _) = cuprate_txpool::service::init(config)?;
91//!
92//! // Prepare a request to write a transaction.
93//! let tx = TX_V1_SIG2.clone();
94//! let request = TxpoolWriteRequest::AddTransaction {
95//! tx: Box::new(tx.try_into().unwrap()),
96//! state_stem: false,
97//! };
98//!
99//! // Send the request.
100//! // We receive back an `async` channel that will
101//! // eventually yield the result when `service`
102//! // is done writing the tx.
103//! let response_channel = write_handle.ready().await?.call(request);
104//!
105//! // Transaction write was OK.
106//! let TxpoolWriteResponse::AddTransaction(double_spent) = response_channel.await? else {
107//! panic!("tx-pool returned wrong response!");
108//! };
109//! assert!(double_spent.is_none());
110//!
111//! // Now, let's try getting the blob
112//! // of the transaction we just wrote.
113//! let request = TxpoolReadRequest::TxBlob(TX_V1_SIG2.tx_hash);
114//! let response_channel = read_handle.ready().await?.call(request);
115//! let response = response_channel.await?;
116//!
117//! // This causes the writer thread on the
118//! // other side of this handle to exit...
119//! drop(write_handle);
120//! // ...and this causes the reader thread-pool to exit.
121//! drop(read_handle);
122//! # Ok(()) }
123//! ```
124
125mod free;
126pub mod interface;
127mod read;
128mod types;
129mod write;
130
131pub use free::{init, init_with_pool};
132pub use types::{TxpoolReadHandle, TxpoolWriteHandle};