async_executors/iface/
join_handle.rs1#[ allow(unused_imports) ] use
4{
5 std :: { future::Future, sync::atomic::{ AtomicBool, Ordering } } ,
6 std :: { task::{ Poll, Context }, pin::Pin } ,
7 futures_util:: { future::{ AbortHandle, Aborted, RemoteHandle }, ready } ,
8 super :: *,
9};
10
11
12
13
14#[ derive( Debug ) ]
33#[ must_use = "JoinHandle will cancel your future when dropped unless you await it." ]
35pub struct JoinHandle<T> { inner: InnerJh<T> }
37
38
39
40impl<T> JoinHandle<T>
41{
42 #[ cfg(any( feature = "tokio_tp", feature = "tokio_ct" )) ]
45 pub fn tokio( handle: TokioJoinHandle<T> ) -> Self
47 {
48 let detached = false;
49 let inner = InnerJh::Tokio { handle, detached };
50
51 Self{ inner }
52 }
53
54
55
56 #[ cfg( feature = "async_global" ) ]
59 pub fn async_global( task: AsyncGlobalTask<T> ) -> Self
61 {
62 let task = Some( task );
63 let inner = InnerJh::AsyncGlobal{ task };
64
65 Self{ inner }
66 }
67
68
69
70 #[ cfg( feature = "async_std" ) ]
74 pub fn async_std
76 (
77 handle : AsyncStdJoinHandle<Result<T, Aborted>> ,
78 a_handle: AbortHandle ,
79
80 ) -> Self
81 {
82 let detached = false;
83 let inner = InnerJh::AsyncStd{ handle, a_handle, detached };
84
85 Self{ inner }
86 }
87
88
89 pub fn remote_handle( handle: RemoteHandle<T> ) -> Self
92 {
93 let inner = InnerJh::RemoteHandle{ handle: Some(handle) };
94
95 Self{ inner }
96 }
97}
98
99
100
101#[ derive(Debug) ] #[ allow(dead_code) ]
102enum InnerJh<T>
104{
105 #[ cfg(any( feature = "tokio_tp", feature = "tokio_ct" )) ]
108 Tokio
110 {
111 handle : TokioJoinHandle<T> ,
112 detached: bool ,
113 },
114
115 #[ cfg( feature = "async_global" ) ]
118 AsyncGlobal
120 {
121 task: Option< AsyncGlobalTask<T> > ,
122 },
123
124 #[ cfg( feature = "async_std" ) ]
127 AsyncStd
129 {
130 handle : AsyncStdJoinHandle<Result<T, Aborted>> ,
131 a_handle: AbortHandle ,
132 detached: bool ,
133 },
134
135 RemoteHandle
138 {
139 handle: Option<RemoteHandle<T>>,
140 },
141}
142
143
144
145impl<T> JoinHandle<T>
146{
147 pub fn detach( mut self )
152 {
153 match &mut self.inner
154 {
155 #[ cfg(any( feature = "tokio_tp", feature = "tokio_ct" )) ]
156 InnerJh::Tokio{ ref mut detached, .. } =>
158 {
159 *detached = true;
164 }
165
166 #[ cfg( feature = "async_global" ) ] InnerJh::AsyncGlobal{ task } =>
167 {
168 let task = task.take();
169 task.unwrap().detach();
170 }
171
172 #[ cfg( feature = "async_std" ) ] InnerJh::AsyncStd{ ref mut detached, .. } =>
173 {
174 *detached = true;
175 }
176
177 InnerJh::RemoteHandle{ handle } =>
178 {
179 if let Some(rh) = handle.take() { rh.forget() };
180 }
181 }
182 }
183}
184
185
186
187impl<T: 'static> Future for JoinHandle<T>
188{
189 type Output = T;
190
191 fn poll( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Self::Output>
192 {
193 match &mut self.get_mut().inner
194 {
195 #[ cfg(any( feature = "tokio_tp", feature = "tokio_ct" )) ]
196 InnerJh::Tokio{ handle, .. } =>
198 {
199 match ready!( Pin::new( handle ).poll( cx ) )
200 {
201 Ok (t) => Poll::Ready( t ),
202
203 Err(e) =>
204 {
205 panic!( "Task has been canceled or it has panicked. Are you dropping the executor to early? Error: {}", e );
206 }
207 }
208 }
209
210
211 #[ cfg( feature = "async_std" ) ] InnerJh::AsyncStd{ handle, .. } =>
212 {
213 match ready!( Pin::new( handle ).poll( cx ) )
214 {
215 Ok (t) => Poll::Ready( t ),
216 Err(_) => unreachable!(),
217 }
218 }
219
220
221 #[ cfg( feature = "async_global" ) ] InnerJh::AsyncGlobal{ task, .. } =>
222 {
223 Pin::new( task.as_mut().unwrap() ).poll( cx )
224 }
225
226
227 InnerJh::RemoteHandle{ ref mut handle } => Pin::new( handle ).as_pin_mut().expect( "no polling after detach" ).poll( cx ),
228 }
229 }
230}
231
232
233
234impl<T> Drop for JoinHandle<T>
235{
236 fn drop( &mut self )
239 {
240 match &mut self.inner
241 {
242 #[ cfg(any( feature = "tokio_tp", feature = "tokio_ct" )) ]
243 InnerJh::Tokio{ handle, detached, .. } =>
245
246 if !*detached { handle.abort() },
247
248
249 #[ cfg( feature = "async_std" ) ] InnerJh::AsyncStd { a_handle, detached, .. } =>
250
251 if !*detached { a_handle.abort() },
252
253
254 #[ cfg( feature = "async_global" ) ] InnerJh::AsyncGlobal { .. } => {}
257
258
259 InnerJh::RemoteHandle{ .. } => {},
260 };
261 }
262}