tokio/task/join_set.rs
1//! A collection of tasks spawned on a Tokio runtime.
2//!
3//! This module provides the [`JoinSet`] type, a collection which stores a set
4//! of spawned tasks and allows asynchronously awaiting the output of those
5//! tasks as they complete. See the documentation for the [`JoinSet`] type for
6//! details.
7use std::future::Future;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10use std::{fmt, panic};
11
12use crate::runtime::Handle;
13use crate::task::Id;
14use crate::task::{unconstrained, AbortHandle, JoinError, JoinHandle, LocalSet};
15use crate::util::IdleNotifiedSet;
16
17/// A collection of tasks spawned on a Tokio runtime.
18///
19/// A `JoinSet` can be used to await the completion of some or all of the tasks
20/// in the set. The set is not ordered, and the tasks will be returned in the
21/// order they complete.
22///
23/// All of the tasks must have the same return type `T`.
24///
25/// When the `JoinSet` is dropped, all tasks in the `JoinSet` are immediately aborted.
26///
27/// # Examples
28///
29/// Spawn multiple tasks and wait for them.
30///
31/// ```
32/// use tokio::task::JoinSet;
33///
34/// #[tokio::main]
35/// async fn main() {
36/// let mut set = JoinSet::new();
37///
38/// for i in 0..10 {
39/// set.spawn(async move { i });
40/// }
41///
42/// let mut seen = [false; 10];
43/// while let Some(res) = set.join_next().await {
44/// let idx = res.unwrap();
45/// seen[idx] = true;
46/// }
47///
48/// for i in 0..10 {
49/// assert!(seen[i]);
50/// }
51/// }
52/// ```
53#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
54pub struct JoinSet<T> {
55 inner: IdleNotifiedSet<JoinHandle<T>>,
56}
57
58/// A variant of [`task::Builder`] that spawns tasks on a [`JoinSet`] rather
59/// than on the current default runtime.
60///
61/// [`task::Builder`]: crate::task::Builder
62#[cfg(all(tokio_unstable, feature = "tracing"))]
63#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
64#[must_use = "builders do nothing unless used to spawn a task"]
65pub struct Builder<'a, T> {
66 joinset: &'a mut JoinSet<T>,
67 builder: super::Builder<'a>,
68}
69
70impl<T> JoinSet<T> {
71 /// Create a new `JoinSet`.
72 pub fn new() -> Self {
73 Self {
74 inner: IdleNotifiedSet::new(),
75 }
76 }
77
78 /// Returns the number of tasks currently in the `JoinSet`.
79 pub fn len(&self) -> usize {
80 self.inner.len()
81 }
82
83 /// Returns whether the `JoinSet` is empty.
84 pub fn is_empty(&self) -> bool {
85 self.inner.is_empty()
86 }
87}
88
89impl<T: 'static> JoinSet<T> {
90 /// Returns a [`Builder`] that can be used to configure a task prior to
91 /// spawning it on this `JoinSet`.
92 ///
93 /// # Examples
94 ///
95 /// ```
96 /// use tokio::task::JoinSet;
97 ///
98 /// #[tokio::main]
99 /// async fn main() -> std::io::Result<()> {
100 /// let mut set = JoinSet::new();
101 ///
102 /// // Use the builder to configure a task's name before spawning it.
103 /// set.build_task()
104 /// .name("my_task")
105 /// .spawn(async { /* ... */ })?;
106 ///
107 /// Ok(())
108 /// }
109 /// ```
110 #[cfg(all(tokio_unstable, feature = "tracing"))]
111 #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
112 pub fn build_task(&mut self) -> Builder<'_, T> {
113 Builder {
114 builder: super::Builder::new(),
115 joinset: self,
116 }
117 }
118
119 /// Spawn the provided task on the `JoinSet`, returning an [`AbortHandle`]
120 /// that can be used to remotely cancel the task.
121 ///
122 /// The provided future will start running in the background immediately
123 /// when this method is called, even if you don't await anything on this
124 /// `JoinSet`.
125 ///
126 /// # Panics
127 ///
128 /// This method panics if called outside of a Tokio runtime.
129 ///
130 /// [`AbortHandle`]: crate::task::AbortHandle
131 #[track_caller]
132 pub fn spawn<F>(&mut self, task: F) -> AbortHandle
133 where
134 F: Future<Output = T>,
135 F: Send + 'static,
136 T: Send,
137 {
138 self.insert(crate::spawn(task))
139 }
140
141 /// Spawn the provided task on the provided runtime and store it in this
142 /// `JoinSet` returning an [`AbortHandle`] that can be used to remotely
143 /// cancel the task.
144 ///
145 /// The provided future will start running in the background immediately
146 /// when this method is called, even if you don't await anything on this
147 /// `JoinSet`.
148 ///
149 /// [`AbortHandle`]: crate::task::AbortHandle
150 #[track_caller]
151 pub fn spawn_on<F>(&mut self, task: F, handle: &Handle) -> AbortHandle
152 where
153 F: Future<Output = T>,
154 F: Send + 'static,
155 T: Send,
156 {
157 self.insert(handle.spawn(task))
158 }
159
160 /// Spawn the provided task on the current [`LocalSet`] and store it in this
161 /// `JoinSet`, returning an [`AbortHandle`] that can be used to remotely
162 /// cancel the task.
163 ///
164 /// The provided future will start running in the background immediately
165 /// when this method is called, even if you don't await anything on this
166 /// `JoinSet`.
167 ///
168 /// # Panics
169 ///
170 /// This method panics if it is called outside of a `LocalSet`.
171 ///
172 /// [`LocalSet`]: crate::task::LocalSet
173 /// [`AbortHandle`]: crate::task::AbortHandle
174 #[track_caller]
175 pub fn spawn_local<F>(&mut self, task: F) -> AbortHandle
176 where
177 F: Future<Output = T>,
178 F: 'static,
179 {
180 self.insert(crate::task::spawn_local(task))
181 }
182
183 /// Spawn the provided task on the provided [`LocalSet`] and store it in
184 /// this `JoinSet`, returning an [`AbortHandle`] that can be used to
185 /// remotely cancel the task.
186 ///
187 /// Unlike the [`spawn_local`] method, this method may be used to spawn local
188 /// tasks on a `LocalSet` that is _not_ currently running. The provided
189 /// future will start running whenever the `LocalSet` is next started.
190 ///
191 /// [`LocalSet`]: crate::task::LocalSet
192 /// [`AbortHandle`]: crate::task::AbortHandle
193 /// [`spawn_local`]: Self::spawn_local
194 #[track_caller]
195 pub fn spawn_local_on<F>(&mut self, task: F, local_set: &LocalSet) -> AbortHandle
196 where
197 F: Future<Output = T>,
198 F: 'static,
199 {
200 self.insert(local_set.spawn_local(task))
201 }
202
203 /// Spawn the blocking code on the blocking threadpool and store
204 /// it in this `JoinSet`, returning an [`AbortHandle`] that can be
205 /// used to remotely cancel the task.
206 ///
207 /// # Examples
208 ///
209 /// Spawn multiple blocking tasks and wait for them.
210 ///
211 /// ```
212 /// use tokio::task::JoinSet;
213 ///
214 /// #[tokio::main]
215 /// async fn main() {
216 /// let mut set = JoinSet::new();
217 ///
218 /// for i in 0..10 {
219 /// set.spawn_blocking(move || { i });
220 /// }
221 ///
222 /// let mut seen = [false; 10];
223 /// while let Some(res) = set.join_next().await {
224 /// let idx = res.unwrap();
225 /// seen[idx] = true;
226 /// }
227 ///
228 /// for i in 0..10 {
229 /// assert!(seen[i]);
230 /// }
231 /// }
232 /// ```
233 ///
234 /// # Panics
235 ///
236 /// This method panics if called outside of a Tokio runtime.
237 ///
238 /// [`AbortHandle`]: crate::task::AbortHandle
239 #[track_caller]
240 pub fn spawn_blocking<F>(&mut self, f: F) -> AbortHandle
241 where
242 F: FnOnce() -> T,
243 F: Send + 'static,
244 T: Send,
245 {
246 self.insert(crate::runtime::spawn_blocking(f))
247 }
248
249 /// Spawn the blocking code on the blocking threadpool of the
250 /// provided runtime and store it in this `JoinSet`, returning an
251 /// [`AbortHandle`] that can be used to remotely cancel the task.
252 ///
253 /// [`AbortHandle`]: crate::task::AbortHandle
254 #[track_caller]
255 pub fn spawn_blocking_on<F>(&mut self, f: F, handle: &Handle) -> AbortHandle
256 where
257 F: FnOnce() -> T,
258 F: Send + 'static,
259 T: Send,
260 {
261 self.insert(handle.spawn_blocking(f))
262 }
263
264 fn insert(&mut self, jh: JoinHandle<T>) -> AbortHandle {
265 let abort = jh.abort_handle();
266 let mut entry = self.inner.insert_idle(jh);
267
268 // Set the waker that is notified when the task completes.
269 entry.with_value_and_context(|jh, ctx| jh.set_join_waker(ctx.waker()));
270 abort
271 }
272
273 /// Waits until one of the tasks in the set completes and returns its output.
274 ///
275 /// Returns `None` if the set is empty.
276 ///
277 /// # Cancel Safety
278 ///
279 /// This method is cancel safe. If `join_next` is used as the event in a `tokio::select!`
280 /// statement and some other branch completes first, it is guaranteed that no tasks were
281 /// removed from this `JoinSet`.
282 pub async fn join_next(&mut self) -> Option<Result<T, JoinError>> {
283 std::future::poll_fn(|cx| self.poll_join_next(cx)).await
284 }
285
286 /// Waits until one of the tasks in the set completes and returns its
287 /// output, along with the [task ID] of the completed task.
288 ///
289 /// Returns `None` if the set is empty.
290 ///
291 /// When this method returns an error, then the id of the task that failed can be accessed
292 /// using the [`JoinError::id`] method.
293 ///
294 /// # Cancel Safety
295 ///
296 /// This method is cancel safe. If `join_next_with_id` is used as the event in a `tokio::select!`
297 /// statement and some other branch completes first, it is guaranteed that no tasks were
298 /// removed from this `JoinSet`.
299 ///
300 /// [task ID]: crate::task::Id
301 /// [`JoinError::id`]: fn@crate::task::JoinError::id
302 pub async fn join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> {
303 std::future::poll_fn(|cx| self.poll_join_next_with_id(cx)).await
304 }
305
306 /// Tries to join one of the tasks in the set that has completed and return its output.
307 ///
308 /// Returns `None` if there are no completed tasks, or if the set is empty.
309 pub fn try_join_next(&mut self) -> Option<Result<T, JoinError>> {
310 // Loop over all notified `JoinHandle`s to find one that's ready, or until none are left.
311 loop {
312 let mut entry = self.inner.try_pop_notified()?;
313
314 let res = entry.with_value_and_context(|jh, ctx| {
315 // Since this function is not async and cannot be forced to yield, we should
316 // disable budgeting when we want to check for the `JoinHandle` readiness.
317 Pin::new(&mut unconstrained(jh)).poll(ctx)
318 });
319
320 if let Poll::Ready(res) = res {
321 let _entry = entry.remove();
322
323 return Some(res);
324 }
325 }
326 }
327
328 /// Tries to join one of the tasks in the set that has completed and return its output,
329 /// along with the [task ID] of the completed task.
330 ///
331 /// Returns `None` if there are no completed tasks, or if the set is empty.
332 ///
333 /// When this method returns an error, then the id of the task that failed can be accessed
334 /// using the [`JoinError::id`] method.
335 ///
336 /// [task ID]: crate::task::Id
337 /// [`JoinError::id`]: fn@crate::task::JoinError::id
338 pub fn try_join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> {
339 // Loop over all notified `JoinHandle`s to find one that's ready, or until none are left.
340 loop {
341 let mut entry = self.inner.try_pop_notified()?;
342
343 let res = entry.with_value_and_context(|jh, ctx| {
344 // Since this function is not async and cannot be forced to yield, we should
345 // disable budgeting when we want to check for the `JoinHandle` readiness.
346 Pin::new(&mut unconstrained(jh)).poll(ctx)
347 });
348
349 if let Poll::Ready(res) = res {
350 let entry = entry.remove();
351
352 return Some(res.map(|output| (entry.id(), output)));
353 }
354 }
355 }
356
357 /// Aborts all tasks and waits for them to finish shutting down.
358 ///
359 /// Calling this method is equivalent to calling [`abort_all`] and then calling [`join_next`] in
360 /// a loop until it returns `None`.
361 ///
362 /// This method ignores any panics in the tasks shutting down. When this call returns, the
363 /// `JoinSet` will be empty.
364 ///
365 /// [`abort_all`]: fn@Self::abort_all
366 /// [`join_next`]: fn@Self::join_next
367 pub async fn shutdown(&mut self) {
368 self.abort_all();
369 while self.join_next().await.is_some() {}
370 }
371
372 /// Awaits the completion of all tasks in this `JoinSet`, returning a vector of their results.
373 ///
374 /// The results will be stored in the order they completed not the order they were spawned.
375 /// This is a convenience method that is equivalent to calling [`join_next`] in
376 /// a loop. If any tasks on the `JoinSet` fail with an [`JoinError`], then this call
377 /// to `join_all` will panic and all remaining tasks on the `JoinSet` are
378 /// cancelled. To handle errors in any other way, manually call [`join_next`]
379 /// in a loop.
380 ///
381 /// # Examples
382 ///
383 /// Spawn multiple tasks and `join_all` them.
384 ///
385 /// ```
386 /// use tokio::task::JoinSet;
387 /// use std::time::Duration;
388 ///
389 /// #[tokio::main]
390 /// async fn main() {
391 /// let mut set = JoinSet::new();
392 ///
393 /// for i in 0..3 {
394 /// set.spawn(async move {
395 /// tokio::time::sleep(Duration::from_secs(3 - i)).await;
396 /// i
397 /// });
398 /// }
399 ///
400 /// let output = set.join_all().await;
401 /// assert_eq!(output, vec![2, 1, 0]);
402 /// }
403 /// ```
404 ///
405 /// Equivalent implementation of `join_all`, using [`join_next`] and loop.
406 ///
407 /// ```
408 /// use tokio::task::JoinSet;
409 /// use std::panic;
410 ///
411 /// #[tokio::main]
412 /// async fn main() {
413 /// let mut set = JoinSet::new();
414 ///
415 /// for i in 0..3 {
416 /// set.spawn(async move {i});
417 /// }
418 ///
419 /// let mut output = Vec::new();
420 /// while let Some(res) = set.join_next().await{
421 /// match res {
422 /// Ok(t) => output.push(t),
423 /// Err(err) if err.is_panic() => panic::resume_unwind(err.into_panic()),
424 /// Err(err) => panic!("{err}"),
425 /// }
426 /// }
427 /// assert_eq!(output.len(),3);
428 /// }
429 /// ```
430 /// [`join_next`]: fn@Self::join_next
431 /// [`JoinError::id`]: fn@crate::task::JoinError::id
432 pub async fn join_all(mut self) -> Vec<T> {
433 let mut output = Vec::with_capacity(self.len());
434
435 while let Some(res) = self.join_next().await {
436 match res {
437 Ok(t) => output.push(t),
438 Err(err) if err.is_panic() => panic::resume_unwind(err.into_panic()),
439 Err(err) => panic!("{err}"),
440 }
441 }
442 output
443 }
444
445 /// Aborts all tasks on this `JoinSet`.
446 ///
447 /// This does not remove the tasks from the `JoinSet`. To wait for the tasks to complete
448 /// cancellation, you should call `join_next` in a loop until the `JoinSet` is empty.
449 pub fn abort_all(&mut self) {
450 self.inner.for_each(|jh| jh.abort());
451 }
452
453 /// Removes all tasks from this `JoinSet` without aborting them.
454 ///
455 /// The tasks removed by this call will continue to run in the background even if the `JoinSet`
456 /// is dropped.
457 pub fn detach_all(&mut self) {
458 self.inner.drain(drop);
459 }
460
461 /// Polls for one of the tasks in the set to complete.
462 ///
463 /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set.
464 ///
465 /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled
466 /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to
467 /// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is
468 /// scheduled to receive a wakeup.
469 ///
470 /// # Returns
471 ///
472 /// This function returns:
473 ///
474 /// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is
475 /// available right now.
476 /// * `Poll::Ready(Some(Ok(value)))` if one of the tasks in this `JoinSet` has completed.
477 /// The `value` is the return value of one of the tasks that completed.
478 /// * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been
479 /// aborted. The `err` is the `JoinError` from the panicked/aborted task.
480 /// * `Poll::Ready(None)` if the `JoinSet` is empty.
481 ///
482 /// Note that this method may return `Poll::Pending` even if one of the tasks has completed.
483 /// This can happen if the [coop budget] is reached.
484 ///
485 /// [coop budget]: crate::task#cooperative-scheduling
486 pub fn poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<T, JoinError>>> {
487 // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
488 // the `notified` list if the waker is notified in the `poll` call below.
489 let mut entry = match self.inner.pop_notified(cx.waker()) {
490 Some(entry) => entry,
491 None => {
492 if self.is_empty() {
493 return Poll::Ready(None);
494 } else {
495 // The waker was set by `pop_notified`.
496 return Poll::Pending;
497 }
498 }
499 };
500
501 let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx));
502
503 if let Poll::Ready(res) = res {
504 let _entry = entry.remove();
505 Poll::Ready(Some(res))
506 } else {
507 // A JoinHandle generally won't emit a wakeup without being ready unless
508 // the coop limit has been reached. We yield to the executor in this
509 // case.
510 cx.waker().wake_by_ref();
511 Poll::Pending
512 }
513 }
514
515 /// Polls for one of the tasks in the set to complete.
516 ///
517 /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set.
518 ///
519 /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled
520 /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to
521 /// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is
522 /// scheduled to receive a wakeup.
523 ///
524 /// # Returns
525 ///
526 /// This function returns:
527 ///
528 /// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is
529 /// available right now.
530 /// * `Poll::Ready(Some(Ok((id, value))))` if one of the tasks in this `JoinSet` has completed.
531 /// The `value` is the return value of one of the tasks that completed, and
532 /// `id` is the [task ID] of that task.
533 /// * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been
534 /// aborted. The `err` is the `JoinError` from the panicked/aborted task.
535 /// * `Poll::Ready(None)` if the `JoinSet` is empty.
536 ///
537 /// Note that this method may return `Poll::Pending` even if one of the tasks has completed.
538 /// This can happen if the [coop budget] is reached.
539 ///
540 /// [coop budget]: crate::task#cooperative-scheduling
541 /// [task ID]: crate::task::Id
542 pub fn poll_join_next_with_id(
543 &mut self,
544 cx: &mut Context<'_>,
545 ) -> Poll<Option<Result<(Id, T), JoinError>>> {
546 // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
547 // the `notified` list if the waker is notified in the `poll` call below.
548 let mut entry = match self.inner.pop_notified(cx.waker()) {
549 Some(entry) => entry,
550 None => {
551 if self.is_empty() {
552 return Poll::Ready(None);
553 } else {
554 // The waker was set by `pop_notified`.
555 return Poll::Pending;
556 }
557 }
558 };
559
560 let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx));
561
562 if let Poll::Ready(res) = res {
563 let entry = entry.remove();
564 // If the task succeeded, add the task ID to the output. Otherwise, the
565 // `JoinError` will already have the task's ID.
566 Poll::Ready(Some(res.map(|output| (entry.id(), output))))
567 } else {
568 // A JoinHandle generally won't emit a wakeup without being ready unless
569 // the coop limit has been reached. We yield to the executor in this
570 // case.
571 cx.waker().wake_by_ref();
572 Poll::Pending
573 }
574 }
575}
576
577impl<T> Drop for JoinSet<T> {
578 fn drop(&mut self) {
579 self.inner.drain(|join_handle| join_handle.abort());
580 }
581}
582
583impl<T> fmt::Debug for JoinSet<T> {
584 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
585 f.debug_struct("JoinSet").field("len", &self.len()).finish()
586 }
587}
588
589impl<T> Default for JoinSet<T> {
590 fn default() -> Self {
591 Self::new()
592 }
593}
594
595/// Collect an iterator of futures into a [`JoinSet`].
596///
597/// This is equivalent to calling [`JoinSet::spawn`] on each element of the iterator.
598///
599/// # Examples
600///
601/// The main example from [`JoinSet`]'s documentation can also be written using [`collect`]:
602///
603/// ```
604/// use tokio::task::JoinSet;
605///
606/// #[tokio::main]
607/// async fn main() {
608/// let mut set: JoinSet<_> = (0..10).map(|i| async move { i }).collect();
609///
610/// let mut seen = [false; 10];
611/// while let Some(res) = set.join_next().await {
612/// let idx = res.unwrap();
613/// seen[idx] = true;
614/// }
615///
616/// for i in 0..10 {
617/// assert!(seen[i]);
618/// }
619/// }
620/// ```
621///
622/// [`collect`]: std::iter::Iterator::collect
623impl<T, F> std::iter::FromIterator<F> for JoinSet<T>
624where
625 F: Future<Output = T>,
626 F: Send + 'static,
627 T: Send + 'static,
628{
629 fn from_iter<I: IntoIterator<Item = F>>(iter: I) -> Self {
630 let mut set = Self::new();
631 iter.into_iter().for_each(|task| {
632 set.spawn(task);
633 });
634 set
635 }
636}
637
638// === impl Builder ===
639
640#[cfg(all(tokio_unstable, feature = "tracing"))]
641#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
642impl<'a, T: 'static> Builder<'a, T> {
643 /// Assigns a name to the task which will be spawned.
644 pub fn name(self, name: &'a str) -> Self {
645 let builder = self.builder.name(name);
646 Self { builder, ..self }
647 }
648
649 /// Spawn the provided task with this builder's settings and store it in the
650 /// [`JoinSet`], returning an [`AbortHandle`] that can be used to remotely
651 /// cancel the task.
652 ///
653 /// # Returns
654 ///
655 /// An [`AbortHandle`] that can be used to remotely cancel the task.
656 ///
657 /// # Panics
658 ///
659 /// This method panics if called outside of a Tokio runtime.
660 ///
661 /// [`AbortHandle`]: crate::task::AbortHandle
662 #[track_caller]
663 pub fn spawn<F>(self, future: F) -> std::io::Result<AbortHandle>
664 where
665 F: Future<Output = T>,
666 F: Send + 'static,
667 T: Send,
668 {
669 Ok(self.joinset.insert(self.builder.spawn(future)?))
670 }
671
672 /// Spawn the provided task on the provided [runtime handle] with this
673 /// builder's settings, and store it in the [`JoinSet`].
674 ///
675 /// # Returns
676 ///
677 /// An [`AbortHandle`] that can be used to remotely cancel the task.
678 ///
679 ///
680 /// [`AbortHandle`]: crate::task::AbortHandle
681 /// [runtime handle]: crate::runtime::Handle
682 #[track_caller]
683 pub fn spawn_on<F>(self, future: F, handle: &Handle) -> std::io::Result<AbortHandle>
684 where
685 F: Future<Output = T>,
686 F: Send + 'static,
687 T: Send,
688 {
689 Ok(self.joinset.insert(self.builder.spawn_on(future, handle)?))
690 }
691
692 /// Spawn the blocking code on the blocking threadpool with this builder's
693 /// settings, and store it in the [`JoinSet`].
694 ///
695 /// # Returns
696 ///
697 /// An [`AbortHandle`] that can be used to remotely cancel the task.
698 ///
699 /// # Panics
700 ///
701 /// This method panics if called outside of a Tokio runtime.
702 ///
703 /// [`JoinSet`]: crate::task::JoinSet
704 /// [`AbortHandle`]: crate::task::AbortHandle
705 #[track_caller]
706 pub fn spawn_blocking<F>(self, f: F) -> std::io::Result<AbortHandle>
707 where
708 F: FnOnce() -> T,
709 F: Send + 'static,
710 T: Send,
711 {
712 Ok(self.joinset.insert(self.builder.spawn_blocking(f)?))
713 }
714
715 /// Spawn the blocking code on the blocking threadpool of the provided
716 /// runtime handle with this builder's settings, and store it in the
717 /// [`JoinSet`].
718 ///
719 /// # Returns
720 ///
721 /// An [`AbortHandle`] that can be used to remotely cancel the task.
722 ///
723 /// [`JoinSet`]: crate::task::JoinSet
724 /// [`AbortHandle`]: crate::task::AbortHandle
725 #[track_caller]
726 pub fn spawn_blocking_on<F>(self, f: F, handle: &Handle) -> std::io::Result<AbortHandle>
727 where
728 F: FnOnce() -> T,
729 F: Send + 'static,
730 T: Send,
731 {
732 Ok(self
733 .joinset
734 .insert(self.builder.spawn_blocking_on(f, handle)?))
735 }
736
737 /// Spawn the provided task on the current [`LocalSet`] with this builder's
738 /// settings, and store it in the [`JoinSet`].
739 ///
740 /// # Returns
741 ///
742 /// An [`AbortHandle`] that can be used to remotely cancel the task.
743 ///
744 /// # Panics
745 ///
746 /// This method panics if it is called outside of a `LocalSet`.
747 ///
748 /// [`LocalSet`]: crate::task::LocalSet
749 /// [`AbortHandle`]: crate::task::AbortHandle
750 #[track_caller]
751 pub fn spawn_local<F>(self, future: F) -> std::io::Result<AbortHandle>
752 where
753 F: Future<Output = T>,
754 F: 'static,
755 {
756 Ok(self.joinset.insert(self.builder.spawn_local(future)?))
757 }
758
759 /// Spawn the provided task on the provided [`LocalSet`] with this builder's
760 /// settings, and store it in the [`JoinSet`].
761 ///
762 /// # Returns
763 ///
764 /// An [`AbortHandle`] that can be used to remotely cancel the task.
765 ///
766 /// [`LocalSet`]: crate::task::LocalSet
767 /// [`AbortHandle`]: crate::task::AbortHandle
768 #[track_caller]
769 pub fn spawn_local_on<F>(self, future: F, local_set: &LocalSet) -> std::io::Result<AbortHandle>
770 where
771 F: Future<Output = T>,
772 F: 'static,
773 {
774 Ok(self
775 .joinset
776 .insert(self.builder.spawn_local_on(future, local_set)?))
777 }
778}
779
780// Manual `Debug` impl so that `Builder` is `Debug` regardless of whether `T` is
781// `Debug`.
782#[cfg(all(tokio_unstable, feature = "tracing"))]
783#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
784impl<'a, T> fmt::Debug for Builder<'a, T> {
785 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
786 f.debug_struct("join_set::Builder")
787 .field("joinset", &self.joinset)
788 .field("builder", &self.builder)
789 .finish()
790 }
791}