tokio_util/sync/cancellation_token.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387
//! An asynchronously awaitable `CancellationToken`.
//! The token allows to signal a cancellation request to one or more tasks.
pub(crate) mod guard;
mod tree_node;
use crate::loom::sync::Arc;
use crate::util::MaybeDangling;
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use guard::DropGuard;
use pin_project_lite::pin_project;
/// A token which can be used to signal a cancellation request to one or more
/// tasks.
///
/// Tasks can call [`CancellationToken::cancelled()`] in order to
/// obtain a Future which will be resolved when cancellation is requested.
///
/// Cancellation can be requested through the [`CancellationToken::cancel`] method.
///
/// # Examples
///
/// ```no_run
/// use tokio::select;
/// use tokio_util::sync::CancellationToken;
///
/// #[tokio::main]
/// async fn main() {
/// let token = CancellationToken::new();
/// let cloned_token = token.clone();
///
/// let join_handle = tokio::spawn(async move {
/// // Wait for either cancellation or a very long time
/// select! {
/// _ = cloned_token.cancelled() => {
/// // The token was cancelled
/// 5
/// }
/// _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => {
/// 99
/// }
/// }
/// });
///
/// tokio::spawn(async move {
/// tokio::time::sleep(std::time::Duration::from_millis(10)).await;
/// token.cancel();
/// });
///
/// assert_eq!(5, join_handle.await.unwrap());
/// }
/// ```
pub struct CancellationToken {
inner: Arc<tree_node::TreeNode>,
}
impl std::panic::UnwindSafe for CancellationToken {}
impl std::panic::RefUnwindSafe for CancellationToken {}
pin_project! {
/// A Future that is resolved once the corresponding [`CancellationToken`]
/// is cancelled.
#[must_use = "futures do nothing unless polled"]
pub struct WaitForCancellationFuture<'a> {
cancellation_token: &'a CancellationToken,
#[pin]
future: tokio::sync::futures::Notified<'a>,
}
}
pin_project! {
/// A Future that is resolved once the corresponding [`CancellationToken`]
/// is cancelled.
///
/// This is the counterpart to [`WaitForCancellationFuture`] that takes
/// [`CancellationToken`] by value instead of using a reference.
#[must_use = "futures do nothing unless polled"]
pub struct WaitForCancellationFutureOwned {
// This field internally has a reference to the cancellation token, but camouflages
// the relationship with `'static`. To avoid Undefined Behavior, we must ensure
// that the reference is only used while the cancellation token is still alive. To
// do that, we ensure that the future is the first field, so that it is dropped
// before the cancellation token.
//
// We use `MaybeDanglingFuture` here because without it, the compiler could assert
// the reference inside `future` to be valid even after the destructor of that
// field runs. (Specifically, when the `WaitForCancellationFutureOwned` is passed
// as an argument to a function, the reference can be asserted to be valid for the
// rest of that function.) To avoid that, we use `MaybeDangling` which tells the
// compiler that the reference stored inside it might not be valid.
//
// See <https://users.rust-lang.org/t/unsafe-code-review-semi-owning-weak-rwlock-t-guard/95706>
// for more info.
#[pin]
future: MaybeDangling<tokio::sync::futures::Notified<'static>>,
cancellation_token: CancellationToken,
}
}
// ===== impl CancellationToken =====
impl core::fmt::Debug for CancellationToken {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("CancellationToken")
.field("is_cancelled", &self.is_cancelled())
.finish()
}
}
impl Clone for CancellationToken {
/// Creates a clone of the `CancellationToken` which will get cancelled
/// whenever the current token gets cancelled, and vice versa.
fn clone(&self) -> Self {
tree_node::increase_handle_refcount(&self.inner);
CancellationToken {
inner: self.inner.clone(),
}
}
}
impl Drop for CancellationToken {
fn drop(&mut self) {
tree_node::decrease_handle_refcount(&self.inner);
}
}
impl Default for CancellationToken {
fn default() -> CancellationToken {
CancellationToken::new()
}
}
impl CancellationToken {
/// Creates a new `CancellationToken` in the non-cancelled state.
pub fn new() -> CancellationToken {
CancellationToken {
inner: Arc::new(tree_node::TreeNode::new()),
}
}
/// Creates a `CancellationToken` which will get cancelled whenever the
/// current token gets cancelled. Unlike a cloned `CancellationToken`,
/// cancelling a child token does not cancel the parent token.
///
/// If the current token is already cancelled, the child token will get
/// returned in cancelled state.
///
/// # Examples
///
/// ```no_run
/// use tokio::select;
/// use tokio_util::sync::CancellationToken;
///
/// #[tokio::main]
/// async fn main() {
/// let token = CancellationToken::new();
/// let child_token = token.child_token();
///
/// let join_handle = tokio::spawn(async move {
/// // Wait for either cancellation or a very long time
/// select! {
/// _ = child_token.cancelled() => {
/// // The token was cancelled
/// 5
/// }
/// _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => {
/// 99
/// }
/// }
/// });
///
/// tokio::spawn(async move {
/// tokio::time::sleep(std::time::Duration::from_millis(10)).await;
/// token.cancel();
/// });
///
/// assert_eq!(5, join_handle.await.unwrap());
/// }
/// ```
pub fn child_token(&self) -> CancellationToken {
CancellationToken {
inner: tree_node::child_node(&self.inner),
}
}
/// Cancel the [`CancellationToken`] and all child tokens which had been
/// derived from it.
///
/// This will wake up all tasks which are waiting for cancellation.
///
/// Be aware that cancellation is not an atomic operation. It is possible
/// for another thread running in parallel with a call to `cancel` to first
/// receive `true` from `is_cancelled` on one child node, and then receive
/// `false` from `is_cancelled` on another child node. However, once the
/// call to `cancel` returns, all child nodes have been fully cancelled.
pub fn cancel(&self) {
tree_node::cancel(&self.inner);
}
/// Returns `true` if the `CancellationToken` is cancelled.
pub fn is_cancelled(&self) -> bool {
tree_node::is_cancelled(&self.inner)
}
/// Returns a `Future` that gets fulfilled when cancellation is requested.
///
/// The future will complete immediately if the token is already cancelled
/// when this method is called.
///
/// # Cancel safety
///
/// This method is cancel safe.
pub fn cancelled(&self) -> WaitForCancellationFuture<'_> {
WaitForCancellationFuture {
cancellation_token: self,
future: self.inner.notified(),
}
}
/// Returns a `Future` that gets fulfilled when cancellation is requested.
///
/// The future will complete immediately if the token is already cancelled
/// when this method is called.
///
/// The function takes self by value and returns a future that owns the
/// token.
///
/// # Cancel safety
///
/// This method is cancel safe.
pub fn cancelled_owned(self) -> WaitForCancellationFutureOwned {
WaitForCancellationFutureOwned::new(self)
}
/// Creates a `DropGuard` for this token.
///
/// Returned guard will cancel this token (and all its children) on drop
/// unless disarmed.
pub fn drop_guard(self) -> DropGuard {
DropGuard { inner: Some(self) }
}
/// Runs a future to completion and returns its result wrapped inside of an `Option`
/// unless the `CancellationToken` is cancelled. In that case the function returns
/// `None` and the future gets dropped.
///
/// # Cancel safety
///
/// This method is only cancel safe if `fut` is cancel safe.
pub async fn run_until_cancelled<F>(&self, fut: F) -> Option<F::Output>
where
F: Future,
{
pin_project! {
/// A Future that is resolved once the corresponding [`CancellationToken`]
/// is cancelled or a given Future gets resolved. It is biased towards the
/// Future completion.
#[must_use = "futures do nothing unless polled"]
struct RunUntilCancelledFuture<'a, F: Future> {
#[pin]
cancellation: WaitForCancellationFuture<'a>,
#[pin]
future: F,
}
}
impl<'a, F: Future> Future for RunUntilCancelledFuture<'a, F> {
type Output = Option<F::Output>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if let Poll::Ready(res) = this.future.poll(cx) {
Poll::Ready(Some(res))
} else if this.cancellation.poll(cx).is_ready() {
Poll::Ready(None)
} else {
Poll::Pending
}
}
}
RunUntilCancelledFuture {
cancellation: self.cancelled(),
future: fut,
}
.await
}
}
// ===== impl WaitForCancellationFuture =====
impl<'a> core::fmt::Debug for WaitForCancellationFuture<'a> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("WaitForCancellationFuture").finish()
}
}
impl<'a> Future for WaitForCancellationFuture<'a> {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let mut this = self.project();
loop {
if this.cancellation_token.is_cancelled() {
return Poll::Ready(());
}
// No wakeups can be lost here because there is always a call to
// `is_cancelled` between the creation of the future and the call to
// `poll`, and the code that sets the cancelled flag does so before
// waking the `Notified`.
if this.future.as_mut().poll(cx).is_pending() {
return Poll::Pending;
}
this.future.set(this.cancellation_token.inner.notified());
}
}
}
// ===== impl WaitForCancellationFutureOwned =====
impl core::fmt::Debug for WaitForCancellationFutureOwned {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("WaitForCancellationFutureOwned").finish()
}
}
impl WaitForCancellationFutureOwned {
fn new(cancellation_token: CancellationToken) -> Self {
WaitForCancellationFutureOwned {
// cancellation_token holds a heap allocation and is guaranteed to have a
// stable deref, thus it would be ok to move the cancellation_token while
// the future holds a reference to it.
//
// # Safety
//
// cancellation_token is dropped after future due to the field ordering.
future: MaybeDangling::new(unsafe { Self::new_future(&cancellation_token) }),
cancellation_token,
}
}
/// # Safety
/// The returned future must be destroyed before the cancellation token is
/// destroyed.
unsafe fn new_future(
cancellation_token: &CancellationToken,
) -> tokio::sync::futures::Notified<'static> {
let inner_ptr = Arc::as_ptr(&cancellation_token.inner);
// SAFETY: The `Arc::as_ptr` method guarantees that `inner_ptr` remains
// valid until the strong count of the Arc drops to zero, and the caller
// guarantees that they will drop the future before that happens.
(*inner_ptr).notified()
}
}
impl Future for WaitForCancellationFutureOwned {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let mut this = self.project();
loop {
if this.cancellation_token.is_cancelled() {
return Poll::Ready(());
}
// No wakeups can be lost here because there is always a call to
// `is_cancelled` between the creation of the future and the call to
// `poll`, and the code that sets the cancelled flag does so before
// waking the `Notified`.
if this.future.as_mut().poll(cx).is_pending() {
return Poll::Pending;
}
// # Safety
//
// cancellation_token is dropped after future due to the field ordering.
this.future.set(MaybeDangling::new(unsafe {
Self::new_future(this.cancellation_token)
}));
}
}
}