tor_rtcompat/
scheduler.rs1use crate::SleepProvider;
4use futures::channel::mpsc;
5use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
6use futures::{Stream, StreamExt};
7use std::future::Future;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10use std::time::{Duration, Instant, SystemTime};
11
12use pin_project::pin_project;
13
14#[derive(Clone, Debug, thiserror::Error)]
23#[non_exhaustive]
24pub enum SleepError {
25 #[error("All task handles dropped: task exiting.")]
28 ScheduleDropped,
29}
30
31#[derive(Copy, Clone)]
33enum SchedulerCommand {
34 Fire,
36 FireAt(Instant),
38 Cancel,
40 Suspend,
43 Resume,
46}
47
48#[pin_project(project = TaskScheduleP)]
53pub struct TaskSchedule<R: SleepProvider> {
54 sleep: Option<Pin<Box<R::SleepFuture>>>,
56 rx: UnboundedReceiver<SchedulerCommand>,
58 rt: R,
60 instant_fire: bool,
65 suspended: bool,
68}
69
70#[derive(Clone)]
75pub struct TaskHandle {
76 tx: UnboundedSender<SchedulerCommand>,
78}
79
80impl<R: SleepProvider> TaskSchedule<R> {
81 pub fn new(rt: R) -> (Self, TaskHandle) {
83 let (tx, rx) = mpsc::unbounded();
84 (
85 Self {
86 sleep: None,
87 rx,
88 rt,
89 instant_fire: true,
91 suspended: false,
92 },
93 TaskHandle { tx },
94 )
95 }
96
97 pub fn fire_in(&mut self, dur: Duration) {
99 self.instant_fire = false;
100 self.sleep = Some(Box::pin(self.rt.sleep(dur)));
101 }
102
103 pub fn fire(&mut self) {
105 self.instant_fire = true;
106 self.sleep = None;
107 }
108
109 pub async fn sleep(&mut self, dur: Duration) -> Result<(), SleepError> {
128 self.fire_in(dur);
129 self.next().await.ok_or(SleepError::ScheduleDropped)
130 }
131
132 pub async fn sleep_until_wallclock(&mut self, when: SystemTime) -> Result<(), SleepError> {
136 loop {
137 let (finished, delay) = crate::timer::calc_next_delay(self.rt.wallclock(), when);
138 self.sleep(delay).await?;
139 if finished {
140 return Ok(());
141 }
142 }
143 }
144}
145
146impl TaskHandle {
147 pub fn fire(&self) -> bool {
151 self.tx.unbounded_send(SchedulerCommand::Fire).is_ok()
152 }
153 pub fn fire_at(&self, instant: Instant) -> bool {
157 self.tx
158 .unbounded_send(SchedulerCommand::FireAt(instant))
159 .is_ok()
160 }
161 pub fn cancel(&self) -> bool {
165 self.tx.unbounded_send(SchedulerCommand::Cancel).is_ok()
166 }
167
168 pub fn suspend(&self) -> bool {
177 self.tx.unbounded_send(SchedulerCommand::Suspend).is_ok()
178 }
179
180 pub fn resume(&self) -> bool {
190 self.tx.unbounded_send(SchedulerCommand::Resume).is_ok()
191 }
192}
193
194impl<R: SleepProvider> TaskScheduleP<'_, R> {
197 fn handle_command(&mut self, cmd: SchedulerCommand) {
199 match cmd {
200 SchedulerCommand::Fire => {
201 *self.instant_fire = true;
202 *self.sleep = None;
203 }
204 SchedulerCommand::FireAt(instant) => {
205 let now = self.rt.now();
206 let dur = instant.saturating_duration_since(now);
207 *self.instant_fire = false;
208 *self.sleep = Some(Box::pin(self.rt.sleep(dur)));
209 }
210 SchedulerCommand::Cancel => {
211 *self.instant_fire = false;
212 *self.sleep = None;
213 }
214 SchedulerCommand::Suspend => {
215 *self.suspended = true;
216 }
217 SchedulerCommand::Resume => {
218 *self.suspended = false;
219 }
220 }
221 }
222}
223
224impl<R: SleepProvider> Stream for TaskSchedule<R> {
225 type Item = ();
226
227 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
228 let mut this = self.project();
229 while let Poll::Ready(maybe_cmd) = this.rx.poll_next_unpin(cx) {
230 match maybe_cmd {
231 Some(c) => this.handle_command(c),
232 None => {
233 return Poll::Ready(None);
235 }
236 }
237 }
238 if *this.suspended {
239 return Poll::Pending;
240 }
241 if *this.instant_fire {
242 *this.instant_fire = false;
243 return Poll::Ready(Some(()));
244 }
245 if this
246 .sleep
247 .as_mut()
248 .map(|x| x.as_mut().poll(cx).is_ready())
249 .unwrap_or(false)
250 {
251 *this.sleep = None;
252 return Poll::Ready(Some(()));
253 }
254 Poll::Pending
255 }
256}
257
258#[cfg(all(
260 test,
261 any(feature = "native-tls", feature = "rustls"),
262 any(feature = "tokio", feature = "async-std"),
263 not(miri), ))]
265mod test {
266 use crate::scheduler::TaskSchedule;
267 use crate::{test_with_all_runtimes, SleepProvider};
268 use futures::FutureExt;
269 use futures::StreamExt;
270 use std::time::{Duration, Instant};
271
272 #[test]
273 fn it_fires_immediately() {
274 test_with_all_runtimes!(|rt| async move {
275 let (mut sch, _hdl) = TaskSchedule::new(rt);
276 assert!(sch.next().now_or_never().is_some());
277 });
278 }
279
280 #[test]
281 #[allow(clippy::unwrap_used)]
282 fn it_dies_if_dropped() {
283 test_with_all_runtimes!(|rt| async move {
284 let (mut sch, hdl) = TaskSchedule::new(rt);
285 drop(hdl);
286 assert!(sch.next().now_or_never().unwrap().is_none());
287 });
288 }
289
290 #[test]
291 fn it_fires_on_demand() {
292 test_with_all_runtimes!(|rt| async move {
293 let (mut sch, hdl) = TaskSchedule::new(rt);
294 assert!(sch.next().now_or_never().is_some());
295
296 assert!(sch.next().now_or_never().is_none());
297 assert!(hdl.fire());
298 assert!(sch.next().now_or_never().is_some());
299 assert!(sch.next().now_or_never().is_none());
300 });
301 }
302
303 #[test]
304 fn it_cancels_instant_firings() {
305 test_with_all_runtimes!(|rt| async move {
308 let (mut sch, hdl) = TaskSchedule::new(rt);
309 assert!(sch.next().now_or_never().is_some());
310
311 assert!(sch.next().now_or_never().is_none());
312 assert!(hdl.fire());
313 assert!(hdl.cancel());
314 assert!(sch.next().now_or_never().is_none());
315 });
316 }
317
318 #[test]
319 fn it_fires_after_self_reschedule() {
320 test_with_all_runtimes!(|rt| async move {
321 let (mut sch, _hdl) = TaskSchedule::new(rt);
322 assert!(sch.next().now_or_never().is_some());
323
324 sch.fire_in(Duration::from_millis(100));
325
326 assert!(sch.next().now_or_never().is_none());
327 assert!(sch.next().await.is_some());
328 assert!(sch.next().now_or_never().is_none());
329 });
330 }
331
332 #[test]
333 fn it_fires_after_external_reschedule() {
334 test_with_all_runtimes!(|rt| async move {
335 let (mut sch, hdl) = TaskSchedule::new(rt);
336 assert!(sch.next().now_or_never().is_some());
337
338 hdl.fire_at(Instant::now() + Duration::from_millis(100));
339
340 assert!(sch.next().now_or_never().is_none());
341 assert!(sch.next().await.is_some());
342 assert!(sch.next().now_or_never().is_none());
343 });
344 }
345
346 #[test]
351 #[ignore]
352 fn it_cancels_delayed_firings() {
353 test_with_all_runtimes!(|rt| async move {
354 let (mut sch, hdl) = TaskSchedule::new(rt.clone());
355 assert!(sch.next().now_or_never().is_some());
356
357 hdl.fire_at(Instant::now() + Duration::from_millis(100));
358
359 assert!(sch.next().now_or_never().is_none());
360
361 rt.sleep(Duration::from_millis(50)).await;
362
363 assert!(sch.next().now_or_never().is_none());
364
365 hdl.cancel();
366
367 assert!(sch.next().now_or_never().is_none());
368
369 rt.sleep(Duration::from_millis(100)).await;
370
371 assert!(sch.next().now_or_never().is_none());
372 });
373 }
374
375 #[test]
376 fn last_fire_wins() {
377 test_with_all_runtimes!(|rt| async move {
378 let (mut sch, hdl) = TaskSchedule::new(rt.clone());
379 assert!(sch.next().now_or_never().is_some());
380
381 hdl.fire_at(Instant::now() + Duration::from_millis(100));
382 hdl.fire();
383
384 assert!(sch.next().now_or_never().is_some());
385 assert!(sch.next().now_or_never().is_none());
386
387 rt.sleep(Duration::from_millis(150)).await;
388
389 assert!(sch.next().now_or_never().is_none());
390 });
391 }
392
393 #[test]
394 fn suspend_and_resume_with_fire() {
395 test_with_all_runtimes!(|rt| async move {
396 let (mut sch, hdl) = TaskSchedule::new(rt.clone());
397 hdl.fire();
398 hdl.suspend();
399
400 assert!(sch.next().now_or_never().is_none());
401 hdl.resume();
402 assert!(sch.next().now_or_never().is_some());
403 });
404 }
405
406 #[test]
407 fn suspend_and_resume_with_sleep() {
408 test_with_all_runtimes!(|rt| async move {
409 let (mut sch, hdl) = TaskSchedule::new(rt.clone());
410 sch.fire_in(Duration::from_millis(100));
411 hdl.suspend();
412
413 assert!(sch.next().now_or_never().is_none());
414 hdl.resume();
415 assert!(sch.next().now_or_never().is_none());
416 assert!(sch.next().await.is_some());
417 });
418 }
419
420 #[test]
421 fn suspend_and_resume_with_nothing() {
422 test_with_all_runtimes!(|rt| async move {
423 let (mut sch, hdl) = TaskSchedule::new(rt.clone());
424 assert!(sch.next().now_or_never().is_some());
425 hdl.suspend();
426
427 assert!(sch.next().now_or_never().is_none());
428 hdl.resume();
429 });
430 }
431}