use crate::future::Future;
use crate::loom::cell::UnsafeCell;
use crate::runtime::task::{JoinHandle, LocalNotified, Notified, Schedule, Task};
use crate::util::linked_list::{Link, LinkedList};
use crate::util::sharded_list;

use crate::loom::sync::atomic::{AtomicBool, Ordering};
use std::marker::PhantomData;
use std::num::NonZeroU64;

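// Each `OwnedTasks`/`LocalOwnedTasks` collection is assigned a unique, non-zero
// ID from this monotonically increasing counter. Task headers record the ID of
// the collection that owns them, which lets `assert_owner` and `remove` check
// that a task really belongs to the collection it is used with.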
cfg_has_atomic_u64! {
    use std::sync::atomic::AtomicU64;

    static NEXT_OWNED_TASKS_ID: AtomicU64 = AtomicU64::new(1);

    fn get_next_id() -> NonZeroU64 {
        loop {
            let id = NEXT_OWNED_TASKS_ID.fetch_add(1, Ordering::Relaxed);
            if let Some(id) = NonZeroU64::new(id) {
                return id;
            }
        }
    }
}

cfg_not_has_atomic_u64! {
    use std::sync::atomic::AtomicU32;

    static NEXT_OWNED_TASKS_ID: AtomicU32 = AtomicU32::new(1);

    fn get_next_id() -> NonZeroU64 {
        loop {
            let id = NEXT_OWNED_TASKS_ID.fetch_add(1, Ordering::Relaxed);
            if let Some(id) = NonZeroU64::new(u64::from(id)) {
                return id;
            }
        }
    }
}

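/// A thread-safe collection of the tasks owned by a scheduler. Tasks must be
/// `Send`; the backing list is sharded so that concurrent workers inserting and
/// removing tasks contend on different locks.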
pub(crate) struct OwnedTasks<S: 'static> {
    list: List<S>,
    pub(crate) id: NonZeroU64,
    closed: AtomicBool,
}

type List<S> = sharded_list::ShardedList<Task<S>, <Task<S> as Link>::Target>;

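/// The `!Send` counterpart of `OwnedTasks`: it can store tasks whose futures do
/// not implement `Send`, and is itself neither `Send` nor `Sync` (enforced by
/// the raw-pointer `PhantomData` marker).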
pub(crate) struct LocalOwnedTasks<S: 'static> {
    inner: UnsafeCell<OwnedTasksInner<S>>,
    pub(crate) id: NonZeroU64,
    _not_send_or_sync: PhantomData<*const ()>,
}

struct OwnedTasksInner<S: 'static> {
    list: LinkedList<Task<S>, <Task<S> as Link>::Target>,
    closed: bool,
}

impl<S: 'static> OwnedTasks<S> {
    pub(crate) fn new(num_cores: usize) -> Self {
        let shard_size = Self::gen_shared_list_size(num_cores);
        Self {
            list: List::new(shard_size),
            closed: AtomicBool::new(false),
            id: get_next_id(),
        }
    }

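    /// Binds the provided future to this collection, returning its
    /// `JoinHandle` and, if the collection is still open, a `Notified` that
    /// must be used to schedule the task. Returns `None` for the `Notified`
    /// if the collection has already been closed; the task is shut down
    /// instead of being added.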
    pub(crate) fn bind<T>(
        &self,
        task: T,
        scheduler: S,
        id: super::Id,
    ) -> (JoinHandle<T::Output>, Option<Notified<S>>)
    where
        S: Schedule,
        T: Future + Send + 'static,
        T::Output: Send + 'static,
    {
        let (task, notified, join) = super::new_task(task, scheduler, id);
        let notified = unsafe { self.bind_inner(task, notified) };
        (join, notified)
    }

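    /// Like `bind`, but for futures that are not `Send`.
    ///
    /// # Safety
    ///
    /// The caller must ensure the task is only ever polled and dropped on the
    /// thread that spawned it, since its future does not implement `Send`.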
    pub(crate) unsafe fn bind_local<T>(
        &self,
        task: T,
        scheduler: S,
        id: super::Id,
    ) -> (JoinHandle<T::Output>, Option<Notified<S>>)
    where
        S: Schedule,
        T: Future + 'static,
        T::Output: 'static,
    {
        let (task, notified, join) = super::new_task(task, scheduler, id);
        let notified = unsafe { self.bind_inner(task, notified) };
        (join, notified)
    }

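    /// The part of `bind` that is identical for `Send` and `!Send` futures:
    /// records this collection's ID in the task header and pushes the task
    /// onto its shard, unless the collection has been closed.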
    unsafe fn bind_inner(&self, task: Task<S>, notified: Notified<S>) -> Option<Notified<S>>
    where
        S: Schedule,
    {
        unsafe {
            // safety: We just created the task, so we have exclusive access
            // to the owner-id field in its header.
            task.header().set_owner_id(self.id);
        }

        let shard = self.list.lock_shard(&task);
        // Check the closed flag while holding the shard lock: either a
        // concurrent `close_and_shutdown_all` will see this task in the list,
        // or we see the closed flag and shut the task down here.
        if self.closed.load(Ordering::Acquire) {
            drop(shard);
            task.shutdown();
            return None;
        }
        shard.push(task);
        Some(notified)
    }

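    /// Asserts that the given task is owned by this collection and converts it
    /// to a `LocalNotified`, giving the current thread permission to poll it.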
    #[inline]
    pub(crate) fn assert_owner(&self, task: Notified<S>) -> LocalNotified<S> {
        debug_assert_eq!(task.header().get_owner_id(), Some(self.id));
        LocalNotified {
            task: task.0,
            _not_send: PhantomData,
        }
    }

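    /// Closes the collection so no new tasks can be bound, then shuts down
    /// every task currently in it. `start` selects the shard at which the
    /// sweep begins, so different callers can start at different shards and
    /// avoid contending on the same locks.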
    pub(crate) fn close_and_shutdown_all(&self, start: usize)
    where
        S: Schedule,
    {
        self.closed.store(true, Ordering::Release);
        for i in start..self.get_shard_size() + start {
            while let Some(task) = self.list.pop_back(i) {
                task.shutdown();
            }
        }
    }

    #[inline]
    pub(crate) fn get_shard_size(&self) -> usize {
        self.list.shard_size()
    }

    pub(crate) fn num_alive_tasks(&self) -> usize {
        self.list.len()
    }

    cfg_64bit_metrics! {
        pub(crate) fn spawned_tasks_count(&self) -> u64 {
            self.list.added()
        }
    }

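    /// Removes the task from this collection if it is present, returning
    /// ownership of it. Panics if the task is owned by a different collection.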
    pub(crate) fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
        // A task with no owner ID was never bound to a collection, so it
        // cannot be in this list.
        let task_id = task.header().get_owner_id()?;

        assert_eq!(task_id, self.id);

        // safety: The owner ID matches ours, so if the task is in any list,
        // it is this one.
        unsafe { self.list.remove(task.header_ptr()) }
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.list.is_empty()
    }

    /// Shard-count heuristic: the core count rounded up to a power of two,
    /// multiplied by four, and capped at `1 << 16`.
    fn gen_shared_list_size(num_cores: usize) -> usize {
        const MAX_SHARED_LIST_SIZE: usize = 1 << 16;
        usize::min(MAX_SHARED_LIST_SIZE, num_cores.next_power_of_two() * 4)
    }
}

cfg_taskdump! {
    impl<S: 'static> OwnedTasks<S> {
        /// Calls `f` on every task currently in the collection; used when
        /// producing task dumps.
        pub(crate) fn for_each<F>(&self, f: F)
        where
            F: FnMut(&Task<S>),
        {
            self.list.for_each(f);
        }
    }
}

impl<S: 'static> LocalOwnedTasks<S> {
    pub(crate) fn new() -> Self {
        Self {
            inner: UnsafeCell::new(OwnedTasksInner {
                list: LinkedList::new(),
                closed: false,
            }),
            id: get_next_id(),
            _not_send_or_sync: PhantomData,
        }
    }

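    /// Binds the provided future to this collection, returning its
    /// `JoinHandle` and, if the collection is still open, a `Notified` used to
    /// schedule the task. The future does not need to be `Send`.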
    pub(crate) fn bind<T>(
        &self,
        task: T,
        scheduler: S,
        id: super::Id,
    ) -> (JoinHandle<T::Output>, Option<Notified<S>>)
    where
        S: Schedule,
        T: Future + 'static,
        T::Output: 'static,
    {
        let (task, notified, join) = super::new_task(task, scheduler, id);

        unsafe {
            // safety: We just created the task, so we have exclusive access
            // to the owner-id field in its header.
            task.header().set_owner_id(self.id);
        }

        if self.is_closed() {
            drop(notified);
            task.shutdown();
            (join, None)
        } else {
            self.with_inner(|inner| {
                inner.list.push_front(task);
            });
            (join, Some(notified))
        }
    }

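    /// Closes the collection so no new tasks can be bound, then shuts down
    /// every task currently in it.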
    pub(crate) fn close_and_shutdown_all(&self)
    where
        S: Schedule,
    {
        self.with_inner(|inner| inner.closed = true);

        while let Some(task) = self.with_inner(|inner| inner.list.pop_back()) {
            task.shutdown();
        }
    }

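    /// Removes the task from this collection if it is present, returning
    /// ownership of it. Panics if the task is owned by a different collection.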
    pub(crate) fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
        // A task with no owner ID was never bound to a collection, so it
        // cannot be in this list.
        let task_id = task.header().get_owner_id()?;

        assert_eq!(task_id, self.id);

        // safety: The owner ID matches ours, so if the task is in any list,
        // it is this one.
        self.with_inner(|inner| unsafe { inner.list.remove(task.header_ptr()) })
    }

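    /// Asserts that the given task is owned by this collection and converts it
    /// to a `LocalNotified`, giving the current thread permission to poll it.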
    #[inline]
    pub(crate) fn assert_owner(&self, task: Notified<S>) -> LocalNotified<S> {
        assert_eq!(task.header().get_owner_id(), Some(self.id));

        LocalNotified {
            task: task.0,
            _not_send: PhantomData,
        }
    }

    #[inline]
    fn with_inner<F, T>(&self, f: F) -> T
    where
        F: FnOnce(&mut OwnedTasksInner<S>) -> T,
    {
        // safety: This type is neither `Send` nor `Sync`, so this method is
        // never called concurrently and the exclusive borrow is sound.
        self.inner.with_mut(|ptr| unsafe { f(&mut *ptr) })
    }

    pub(crate) fn is_closed(&self) -> bool {
        self.with_inner(|inner| inner.closed)
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.with_inner(|inner| inner.list.is_empty())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    // The global counter only moves forward, so successive calls must return
    // strictly increasing IDs, even if other tests allocate IDs concurrently.
    #[test]
    fn test_id_not_broken() {
        let mut last_id = get_next_id();

        for _ in 0..1000 {
            let next_id = get_next_id();
            assert!(last_id < next_id);
            last_id = next_id;
        }
    }
}