tokio/task/
join_set.rs

1//! A collection of tasks spawned on a Tokio runtime.
2//!
3//! This module provides the [`JoinSet`] type, a collection which stores a set
4//! of spawned tasks and allows asynchronously awaiting the output of those
5//! tasks as they complete. See the documentation for the [`JoinSet`] type for
6//! details.
7use std::future::Future;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10use std::{fmt, panic};
11
12use crate::runtime::Handle;
13use crate::task::Id;
14use crate::task::{unconstrained, AbortHandle, JoinError, JoinHandle, LocalSet};
15use crate::util::IdleNotifiedSet;
16
17/// A collection of tasks spawned on a Tokio runtime.
18///
19/// A `JoinSet` can be used to await the completion of some or all of the tasks
20/// in the set. The set is not ordered, and the tasks will be returned in the
21/// order they complete.
22///
23/// All of the tasks must have the same return type `T`.
24///
25/// When the `JoinSet` is dropped, all tasks in the `JoinSet` are immediately aborted.
26///
27/// # Examples
28///
29/// Spawn multiple tasks and wait for them.
30///
31/// ```
32/// use tokio::task::JoinSet;
33///
34/// #[tokio::main]
35/// async fn main() {
36///     let mut set = JoinSet::new();
37///
38///     for i in 0..10 {
39///         set.spawn(async move { i });
40///     }
41///
42///     let mut seen = [false; 10];
43///     while let Some(res) = set.join_next().await {
44///         let idx = res.unwrap();
45///         seen[idx] = true;
46///     }
47///
48///     for i in 0..10 {
49///         assert!(seen[i]);
50///     }
51/// }
52/// ```
53#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
pub struct JoinSet<T> {
    // The `JoinHandle` of every task stored in this set. Handles are inserted as
    // idle entries when a task is spawned, and the join methods pop entries that
    // have been notified in order to poll them for their output.
    inner: IdleNotifiedSet<JoinHandle<T>>,
}
57
58/// A variant of [`task::Builder`] that spawns tasks on a [`JoinSet`] rather
59/// than on the current default runtime.
60///
61/// [`task::Builder`]: crate::task::Builder
62#[cfg(all(tokio_unstable, feature = "tracing"))]
63#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
64#[must_use = "builders do nothing unless used to spawn a task"]
pub struct Builder<'a, T> {
    // The set that receives the handle of the task spawned through this builder.
    joinset: &'a mut JoinSet<T>,
    // The wrapped `task::Builder`; it holds the configured settings (such as the
    // task name) and performs the actual spawn.
    builder: super::Builder<'a>,
}
69
70impl<T> JoinSet<T> {
71    /// Create a new `JoinSet`.
72    pub fn new() -> Self {
73        Self {
74            inner: IdleNotifiedSet::new(),
75        }
76    }
77
78    /// Returns the number of tasks currently in the `JoinSet`.
79    pub fn len(&self) -> usize {
80        self.inner.len()
81    }
82
83    /// Returns whether the `JoinSet` is empty.
84    pub fn is_empty(&self) -> bool {
85        self.inner.is_empty()
86    }
87}
88
89impl<T: 'static> JoinSet<T> {
90    /// Returns a [`Builder`] that can be used to configure a task prior to
91    /// spawning it on this `JoinSet`.
92    ///
93    /// # Examples
94    ///
95    /// ```
96    /// use tokio::task::JoinSet;
97    ///
98    /// #[tokio::main]
99    /// async fn main() -> std::io::Result<()> {
100    ///     let mut set = JoinSet::new();
101    ///
102    ///     // Use the builder to configure a task's name before spawning it.
103    ///     set.build_task()
104    ///         .name("my_task")
105    ///         .spawn(async { /* ... */ })?;
106    ///
107    ///     Ok(())
108    /// }
109    /// ```
110    #[cfg(all(tokio_unstable, feature = "tracing"))]
111    #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
112    pub fn build_task(&mut self) -> Builder<'_, T> {
113        Builder {
114            builder: super::Builder::new(),
115            joinset: self,
116        }
117    }
118
119    /// Spawn the provided task on the `JoinSet`, returning an [`AbortHandle`]
120    /// that can be used to remotely cancel the task.
121    ///
122    /// The provided future will start running in the background immediately
123    /// when this method is called, even if you don't await anything on this
124    /// `JoinSet`.
125    ///
126    /// # Panics
127    ///
128    /// This method panics if called outside of a Tokio runtime.
129    ///
130    /// [`AbortHandle`]: crate::task::AbortHandle
131    #[track_caller]
132    pub fn spawn<F>(&mut self, task: F) -> AbortHandle
133    where
134        F: Future<Output = T>,
135        F: Send + 'static,
136        T: Send,
137    {
138        self.insert(crate::spawn(task))
139    }
140
141    /// Spawn the provided task on the provided runtime and store it in this
    /// `JoinSet`, returning an [`AbortHandle`] that can be used to remotely
143    /// cancel the task.
144    ///
145    /// The provided future will start running in the background immediately
146    /// when this method is called, even if you don't await anything on this
147    /// `JoinSet`.
148    ///
149    /// [`AbortHandle`]: crate::task::AbortHandle
150    #[track_caller]
151    pub fn spawn_on<F>(&mut self, task: F, handle: &Handle) -> AbortHandle
152    where
153        F: Future<Output = T>,
154        F: Send + 'static,
155        T: Send,
156    {
157        self.insert(handle.spawn(task))
158    }
159
160    /// Spawn the provided task on the current [`LocalSet`] and store it in this
161    /// `JoinSet`, returning an [`AbortHandle`] that can be used to remotely
162    /// cancel the task.
163    ///
164    /// The provided future will start running in the background immediately
165    /// when this method is called, even if you don't await anything on this
166    /// `JoinSet`.
167    ///
168    /// # Panics
169    ///
170    /// This method panics if it is called outside of a `LocalSet`.
171    ///
172    /// [`LocalSet`]: crate::task::LocalSet
173    /// [`AbortHandle`]: crate::task::AbortHandle
174    #[track_caller]
175    pub fn spawn_local<F>(&mut self, task: F) -> AbortHandle
176    where
177        F: Future<Output = T>,
178        F: 'static,
179    {
180        self.insert(crate::task::spawn_local(task))
181    }
182
183    /// Spawn the provided task on the provided [`LocalSet`] and store it in
184    /// this `JoinSet`, returning an [`AbortHandle`] that can be used to
185    /// remotely cancel the task.
186    ///
187    /// Unlike the [`spawn_local`] method, this method may be used to spawn local
188    /// tasks on a `LocalSet` that is _not_ currently running. The provided
189    /// future will start running whenever the `LocalSet` is next started.
190    ///
191    /// [`LocalSet`]: crate::task::LocalSet
192    /// [`AbortHandle`]: crate::task::AbortHandle
193    /// [`spawn_local`]: Self::spawn_local
194    #[track_caller]
195    pub fn spawn_local_on<F>(&mut self, task: F, local_set: &LocalSet) -> AbortHandle
196    where
197        F: Future<Output = T>,
198        F: 'static,
199    {
200        self.insert(local_set.spawn_local(task))
201    }
202
203    /// Spawn the blocking code on the blocking threadpool and store
204    /// it in this `JoinSet`, returning an [`AbortHandle`] that can be
205    /// used to remotely cancel the task.
206    ///
207    /// # Examples
208    ///
209    /// Spawn multiple blocking tasks and wait for them.
210    ///
211    /// ```
212    /// use tokio::task::JoinSet;
213    ///
214    /// #[tokio::main]
215    /// async fn main() {
216    ///     let mut set = JoinSet::new();
217    ///
218    ///     for i in 0..10 {
219    ///         set.spawn_blocking(move || { i });
220    ///     }
221    ///
222    ///     let mut seen = [false; 10];
223    ///     while let Some(res) = set.join_next().await {
224    ///         let idx = res.unwrap();
225    ///         seen[idx] = true;
226    ///     }
227    ///
228    ///     for i in 0..10 {
229    ///         assert!(seen[i]);
230    ///     }
231    /// }
232    /// ```
233    ///
234    /// # Panics
235    ///
236    /// This method panics if called outside of a Tokio runtime.
237    ///
238    /// [`AbortHandle`]: crate::task::AbortHandle
239    #[track_caller]
240    pub fn spawn_blocking<F>(&mut self, f: F) -> AbortHandle
241    where
242        F: FnOnce() -> T,
243        F: Send + 'static,
244        T: Send,
245    {
246        self.insert(crate::runtime::spawn_blocking(f))
247    }
248
249    /// Spawn the blocking code on the blocking threadpool of the
250    /// provided runtime and store it in this `JoinSet`, returning an
251    /// [`AbortHandle`] that can be used to remotely cancel the task.
252    ///
253    /// [`AbortHandle`]: crate::task::AbortHandle
254    #[track_caller]
255    pub fn spawn_blocking_on<F>(&mut self, f: F, handle: &Handle) -> AbortHandle
256    where
257        F: FnOnce() -> T,
258        F: Send + 'static,
259        T: Send,
260    {
261        self.insert(handle.spawn_blocking(f))
262    }
263
264    fn insert(&mut self, jh: JoinHandle<T>) -> AbortHandle {
265        let abort = jh.abort_handle();
266        let mut entry = self.inner.insert_idle(jh);
267
268        // Set the waker that is notified when the task completes.
269        entry.with_value_and_context(|jh, ctx| jh.set_join_waker(ctx.waker()));
270        abort
271    }
272
273    /// Waits until one of the tasks in the set completes and returns its output.
274    ///
275    /// Returns `None` if the set is empty.
276    ///
277    /// # Cancel Safety
278    ///
279    /// This method is cancel safe. If `join_next` is used as the event in a `tokio::select!`
280    /// statement and some other branch completes first, it is guaranteed that no tasks were
281    /// removed from this `JoinSet`.
282    pub async fn join_next(&mut self) -> Option<Result<T, JoinError>> {
283        std::future::poll_fn(|cx| self.poll_join_next(cx)).await
284    }
285
286    /// Waits until one of the tasks in the set completes and returns its
287    /// output, along with the [task ID] of the completed task.
288    ///
289    /// Returns `None` if the set is empty.
290    ///
291    /// When this method returns an error, then the id of the task that failed can be accessed
292    /// using the [`JoinError::id`] method.
293    ///
294    /// # Cancel Safety
295    ///
296    /// This method is cancel safe. If `join_next_with_id` is used as the event in a `tokio::select!`
297    /// statement and some other branch completes first, it is guaranteed that no tasks were
298    /// removed from this `JoinSet`.
299    ///
300    /// [task ID]: crate::task::Id
301    /// [`JoinError::id`]: fn@crate::task::JoinError::id
302    pub async fn join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> {
303        std::future::poll_fn(|cx| self.poll_join_next_with_id(cx)).await
304    }
305
306    /// Tries to join one of the tasks in the set that has completed and return its output.
307    ///
308    /// Returns `None` if there are no completed tasks, or if the set is empty.
309    pub fn try_join_next(&mut self) -> Option<Result<T, JoinError>> {
310        // Loop over all notified `JoinHandle`s to find one that's ready, or until none are left.
311        loop {
312            let mut entry = self.inner.try_pop_notified()?;
313
314            let res = entry.with_value_and_context(|jh, ctx| {
315                // Since this function is not async and cannot be forced to yield, we should
316                // disable budgeting when we want to check for the `JoinHandle` readiness.
317                Pin::new(&mut unconstrained(jh)).poll(ctx)
318            });
319
320            if let Poll::Ready(res) = res {
321                let _entry = entry.remove();
322
323                return Some(res);
324            }
325        }
326    }
327
328    /// Tries to join one of the tasks in the set that has completed and return its output,
329    /// along with the [task ID] of the completed task.
330    ///
331    /// Returns `None` if there are no completed tasks, or if the set is empty.
332    ///
333    /// When this method returns an error, then the id of the task that failed can be accessed
334    /// using the [`JoinError::id`] method.
335    ///
336    /// [task ID]: crate::task::Id
337    /// [`JoinError::id`]: fn@crate::task::JoinError::id
338    pub fn try_join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> {
339        // Loop over all notified `JoinHandle`s to find one that's ready, or until none are left.
340        loop {
341            let mut entry = self.inner.try_pop_notified()?;
342
343            let res = entry.with_value_and_context(|jh, ctx| {
344                // Since this function is not async and cannot be forced to yield, we should
345                // disable budgeting when we want to check for the `JoinHandle` readiness.
346                Pin::new(&mut unconstrained(jh)).poll(ctx)
347            });
348
349            if let Poll::Ready(res) = res {
350                let entry = entry.remove();
351
352                return Some(res.map(|output| (entry.id(), output)));
353            }
354        }
355    }
356
357    /// Aborts all tasks and waits for them to finish shutting down.
358    ///
359    /// Calling this method is equivalent to calling [`abort_all`] and then calling [`join_next`] in
360    /// a loop until it returns `None`.
361    ///
362    /// This method ignores any panics in the tasks shutting down. When this call returns, the
363    /// `JoinSet` will be empty.
364    ///
365    /// [`abort_all`]: fn@Self::abort_all
366    /// [`join_next`]: fn@Self::join_next
367    pub async fn shutdown(&mut self) {
368        self.abort_all();
369        while self.join_next().await.is_some() {}
370    }
371
372    /// Awaits the completion of all tasks in this `JoinSet`, returning a vector of their results.
373    ///
    /// The results will be stored in the order they completed, not the order they were spawned.
    /// This is a convenience method that is equivalent to calling [`join_next`] in
    /// a loop. If any tasks on the `JoinSet` fail with a [`JoinError`], then this call
377    /// to `join_all` will panic and all remaining tasks on the `JoinSet` are
378    /// cancelled. To handle errors in any other way, manually call [`join_next`]
379    /// in a loop.
380    ///
381    /// # Examples
382    ///
383    /// Spawn multiple tasks and `join_all` them.
384    ///
385    /// ```
386    /// use tokio::task::JoinSet;
387    /// use std::time::Duration;
388    ///
389    /// #[tokio::main]
390    /// async fn main() {
391    ///     let mut set = JoinSet::new();
392    ///
    ///     for i in 0..3 {
    ///         set.spawn(async move {
    ///             tokio::time::sleep(Duration::from_secs(3 - i)).await;
    ///             i
    ///         });
    ///     }
    ///
    ///     let output = set.join_all().await;
401    ///     assert_eq!(output, vec![2, 1, 0]);
402    /// }
403    /// ```
404    ///
405    /// Equivalent implementation of `join_all`, using [`join_next`] and loop.
406    ///
407    /// ```
408    /// use tokio::task::JoinSet;
409    /// use std::panic;
410    ///
411    /// #[tokio::main]
412    /// async fn main() {
413    ///     let mut set = JoinSet::new();
414    ///
    ///     for i in 0..3 {
    ///         set.spawn(async move { i });
    ///     }
    ///
    ///     let mut output = Vec::new();
    ///     while let Some(res) = set.join_next().await {
    ///         match res {
    ///             Ok(t) => output.push(t),
    ///             Err(err) if err.is_panic() => panic::resume_unwind(err.into_panic()),
    ///             Err(err) => panic!("{err}"),
    ///         }
    ///     }
    ///     assert_eq!(output.len(), 3);
428    /// }
429    /// ```
430    /// [`join_next`]: fn@Self::join_next
431    /// [`JoinError::id`]: fn@crate::task::JoinError::id
432    pub async fn join_all(mut self) -> Vec<T> {
433        let mut output = Vec::with_capacity(self.len());
434
435        while let Some(res) = self.join_next().await {
436            match res {
437                Ok(t) => output.push(t),
438                Err(err) if err.is_panic() => panic::resume_unwind(err.into_panic()),
439                Err(err) => panic!("{err}"),
440            }
441        }
442        output
443    }
444
445    /// Aborts all tasks on this `JoinSet`.
446    ///
447    /// This does not remove the tasks from the `JoinSet`. To wait for the tasks to complete
448    /// cancellation, you should call `join_next` in a loop until the `JoinSet` is empty.
449    pub fn abort_all(&mut self) {
450        self.inner.for_each(|jh| jh.abort());
451    }
452
453    /// Removes all tasks from this `JoinSet` without aborting them.
454    ///
455    /// The tasks removed by this call will continue to run in the background even if the `JoinSet`
456    /// is dropped.
457    pub fn detach_all(&mut self) {
458        self.inner.drain(drop);
459    }
460
461    /// Polls for one of the tasks in the set to complete.
462    ///
463    /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set.
464    ///
465    /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled
466    /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to
467    /// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is
468    /// scheduled to receive a wakeup.
469    ///
470    /// # Returns
471    ///
472    /// This function returns:
473    ///
474    ///  * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is
475    ///     available right now.
476    ///  * `Poll::Ready(Some(Ok(value)))` if one of the tasks in this `JoinSet` has completed.
477    ///     The `value` is the return value of one of the tasks that completed.
478    ///  * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been
479    ///     aborted. The `err` is the `JoinError` from the panicked/aborted task.
480    ///  * `Poll::Ready(None)` if the `JoinSet` is empty.
481    ///
482    /// Note that this method may return `Poll::Pending` even if one of the tasks has completed.
483    /// This can happen if the [coop budget] is reached.
484    ///
485    /// [coop budget]: crate::task#cooperative-scheduling
486    pub fn poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<T, JoinError>>> {
487        // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
488        // the `notified` list if the waker is notified in the `poll` call below.
489        let mut entry = match self.inner.pop_notified(cx.waker()) {
490            Some(entry) => entry,
491            None => {
492                if self.is_empty() {
493                    return Poll::Ready(None);
494                } else {
495                    // The waker was set by `pop_notified`.
496                    return Poll::Pending;
497                }
498            }
499        };
500
501        let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx));
502
503        if let Poll::Ready(res) = res {
504            let _entry = entry.remove();
505            Poll::Ready(Some(res))
506        } else {
507            // A JoinHandle generally won't emit a wakeup without being ready unless
508            // the coop limit has been reached. We yield to the executor in this
509            // case.
510            cx.waker().wake_by_ref();
511            Poll::Pending
512        }
513    }
514
515    /// Polls for one of the tasks in the set to complete.
516    ///
517    /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set.
518    ///
519    /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled
520    /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to
    /// `poll_join_next_with_id`, only the `Waker` from the `Context` passed to the most recent
    /// call is scheduled to receive a wakeup.
523    ///
524    /// # Returns
525    ///
526    /// This function returns:
527    ///
528    ///  * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is
529    ///     available right now.
530    ///  * `Poll::Ready(Some(Ok((id, value))))` if one of the tasks in this `JoinSet` has completed.
531    ///     The `value` is the return value of one of the tasks that completed, and
532    ///    `id` is the [task ID] of that task.
533    ///  * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been
534    ///     aborted. The `err` is the `JoinError` from the panicked/aborted task.
535    ///  * `Poll::Ready(None)` if the `JoinSet` is empty.
536    ///
537    /// Note that this method may return `Poll::Pending` even if one of the tasks has completed.
538    /// This can happen if the [coop budget] is reached.
539    ///
540    /// [coop budget]: crate::task#cooperative-scheduling
541    /// [task ID]: crate::task::Id
542    pub fn poll_join_next_with_id(
543        &mut self,
544        cx: &mut Context<'_>,
545    ) -> Poll<Option<Result<(Id, T), JoinError>>> {
546        // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
547        // the `notified` list if the waker is notified in the `poll` call below.
548        let mut entry = match self.inner.pop_notified(cx.waker()) {
549            Some(entry) => entry,
550            None => {
551                if self.is_empty() {
552                    return Poll::Ready(None);
553                } else {
554                    // The waker was set by `pop_notified`.
555                    return Poll::Pending;
556                }
557            }
558        };
559
560        let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx));
561
562        if let Poll::Ready(res) = res {
563            let entry = entry.remove();
564            // If the task succeeded, add the task ID to the output. Otherwise, the
565            // `JoinError` will already have the task's ID.
566            Poll::Ready(Some(res.map(|output| (entry.id(), output))))
567        } else {
568            // A JoinHandle generally won't emit a wakeup without being ready unless
569            // the coop limit has been reached. We yield to the executor in this
570            // case.
571            cx.waker().wake_by_ref();
572            Poll::Pending
573        }
574    }
575}
576
577impl<T> Drop for JoinSet<T> {
578    fn drop(&mut self) {
579        self.inner.drain(|join_handle| join_handle.abort());
580    }
581}
582
583impl<T> fmt::Debug for JoinSet<T> {
584    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
585        f.debug_struct("JoinSet").field("len", &self.len()).finish()
586    }
587}
588
589impl<T> Default for JoinSet<T> {
590    fn default() -> Self {
591        Self::new()
592    }
593}
594
595/// Collect an iterator of futures into a [`JoinSet`].
596///
597/// This is equivalent to calling [`JoinSet::spawn`] on each element of the iterator.
598///
599/// # Examples
600///
601/// The main example from [`JoinSet`]'s documentation can also be written using [`collect`]:
602///
603/// ```
604/// use tokio::task::JoinSet;
605///
606/// #[tokio::main]
607/// async fn main() {
608///     let mut set: JoinSet<_> = (0..10).map(|i| async move { i }).collect();
609///
610///     let mut seen = [false; 10];
611///     while let Some(res) = set.join_next().await {
612///         let idx = res.unwrap();
613///         seen[idx] = true;
614///     }
615///
616///     for i in 0..10 {
617///         assert!(seen[i]);
618///     }
619/// }
620/// ```
621///
622/// [`collect`]: std::iter::Iterator::collect
623impl<T, F> std::iter::FromIterator<F> for JoinSet<T>
624where
625    F: Future<Output = T>,
626    F: Send + 'static,
627    T: Send + 'static,
628{
629    fn from_iter<I: IntoIterator<Item = F>>(iter: I) -> Self {
630        let mut set = Self::new();
631        iter.into_iter().for_each(|task| {
632            set.spawn(task);
633        });
634        set
635    }
636}
637
638// === impl Builder ===
639
640#[cfg(all(tokio_unstable, feature = "tracing"))]
641#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
642impl<'a, T: 'static> Builder<'a, T> {
643    /// Assigns a name to the task which will be spawned.
644    pub fn name(self, name: &'a str) -> Self {
645        let builder = self.builder.name(name);
646        Self { builder, ..self }
647    }
648
649    /// Spawn the provided task with this builder's settings and store it in the
650    /// [`JoinSet`], returning an [`AbortHandle`] that can be used to remotely
651    /// cancel the task.
652    ///
653    /// # Returns
654    ///
655    /// An [`AbortHandle`] that can be used to remotely cancel the task.
656    ///
657    /// # Panics
658    ///
659    /// This method panics if called outside of a Tokio runtime.
660    ///
661    /// [`AbortHandle`]: crate::task::AbortHandle
662    #[track_caller]
663    pub fn spawn<F>(self, future: F) -> std::io::Result<AbortHandle>
664    where
665        F: Future<Output = T>,
666        F: Send + 'static,
667        T: Send,
668    {
669        Ok(self.joinset.insert(self.builder.spawn(future)?))
670    }
671
672    /// Spawn the provided task on the provided [runtime handle] with this
673    /// builder's settings, and store it in the [`JoinSet`].
674    ///
675    /// # Returns
676    ///
677    /// An [`AbortHandle`] that can be used to remotely cancel the task.
678    ///
679    ///
680    /// [`AbortHandle`]: crate::task::AbortHandle
681    /// [runtime handle]: crate::runtime::Handle
682    #[track_caller]
683    pub fn spawn_on<F>(self, future: F, handle: &Handle) -> std::io::Result<AbortHandle>
684    where
685        F: Future<Output = T>,
686        F: Send + 'static,
687        T: Send,
688    {
689        Ok(self.joinset.insert(self.builder.spawn_on(future, handle)?))
690    }
691
692    /// Spawn the blocking code on the blocking threadpool with this builder's
693    /// settings, and store it in the [`JoinSet`].
694    ///
695    /// # Returns
696    ///
697    /// An [`AbortHandle`] that can be used to remotely cancel the task.
698    ///
699    /// # Panics
700    ///
701    /// This method panics if called outside of a Tokio runtime.
702    ///
703    /// [`JoinSet`]: crate::task::JoinSet
704    /// [`AbortHandle`]: crate::task::AbortHandle
705    #[track_caller]
706    pub fn spawn_blocking<F>(self, f: F) -> std::io::Result<AbortHandle>
707    where
708        F: FnOnce() -> T,
709        F: Send + 'static,
710        T: Send,
711    {
712        Ok(self.joinset.insert(self.builder.spawn_blocking(f)?))
713    }
714
715    /// Spawn the blocking code on the blocking threadpool of the provided
716    /// runtime handle with this builder's settings, and store it in the
717    /// [`JoinSet`].
718    ///
719    /// # Returns
720    ///
721    /// An [`AbortHandle`] that can be used to remotely cancel the task.
722    ///
723    /// [`JoinSet`]: crate::task::JoinSet
724    /// [`AbortHandle`]: crate::task::AbortHandle
725    #[track_caller]
726    pub fn spawn_blocking_on<F>(self, f: F, handle: &Handle) -> std::io::Result<AbortHandle>
727    where
728        F: FnOnce() -> T,
729        F: Send + 'static,
730        T: Send,
731    {
732        Ok(self
733            .joinset
734            .insert(self.builder.spawn_blocking_on(f, handle)?))
735    }
736
737    /// Spawn the provided task on the current [`LocalSet`] with this builder's
738    /// settings, and store it in the [`JoinSet`].
739    ///
740    /// # Returns
741    ///
742    /// An [`AbortHandle`] that can be used to remotely cancel the task.
743    ///
744    /// # Panics
745    ///
746    /// This method panics if it is called outside of a `LocalSet`.
747    ///
748    /// [`LocalSet`]: crate::task::LocalSet
749    /// [`AbortHandle`]: crate::task::AbortHandle
750    #[track_caller]
751    pub fn spawn_local<F>(self, future: F) -> std::io::Result<AbortHandle>
752    where
753        F: Future<Output = T>,
754        F: 'static,
755    {
756        Ok(self.joinset.insert(self.builder.spawn_local(future)?))
757    }
758
759    /// Spawn the provided task on the provided [`LocalSet`] with this builder's
760    /// settings, and store it in the [`JoinSet`].
761    ///
762    /// # Returns
763    ///
764    /// An [`AbortHandle`] that can be used to remotely cancel the task.
765    ///
766    /// [`LocalSet`]: crate::task::LocalSet
767    /// [`AbortHandle`]: crate::task::AbortHandle
768    #[track_caller]
769    pub fn spawn_local_on<F>(self, future: F, local_set: &LocalSet) -> std::io::Result<AbortHandle>
770    where
771        F: Future<Output = T>,
772        F: 'static,
773    {
774        Ok(self
775            .joinset
776            .insert(self.builder.spawn_local_on(future, local_set)?))
777    }
778}
779
780// Manual `Debug` impl so that `Builder` is `Debug` regardless of whether `T` is
781// `Debug`.
782#[cfg(all(tokio_unstable, feature = "tracing"))]
783#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
784impl<'a, T> fmt::Debug for Builder<'a, T> {
785    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
786        f.debug_struct("join_set::Builder")
787            .field("joinset", &self.joinset)
788            .field("builder", &self.builder)
789            .finish()
790    }
791}