tokio/process/unix/
reap.rs

1use crate::process::imp::orphan::{OrphanQueue, Wait};
2use crate::process::kill::Kill;
3use crate::signal::unix::InternalStream;
4
5use std::future::Future;
6use std::io;
7use std::ops::Deref;
8use std::pin::Pin;
9use std::process::ExitStatus;
10use std::task::Context;
11use std::task::Poll;
12
13/// Orchestrates between registering interest for receiving signals when a
14/// child process has exited, and attempting to poll for process completion.
15#[derive(Debug)]
16pub(crate) struct Reaper<W, Q, S>
17where
18    W: Wait,
19    Q: OrphanQueue<W>,
20{
21    inner: Option<W>,
22    orphan_queue: Q,
23    signal: S,
24}
25
26impl<W, Q, S> Deref for Reaper<W, Q, S>
27where
28    W: Wait,
29    Q: OrphanQueue<W>,
30{
31    type Target = W;
32
33    fn deref(&self) -> &Self::Target {
34        self.inner()
35    }
36}
37
38impl<W, Q, S> Reaper<W, Q, S>
39where
40    W: Wait,
41    Q: OrphanQueue<W>,
42{
43    pub(crate) fn new(inner: W, orphan_queue: Q, signal: S) -> Self {
44        Self {
45            inner: Some(inner),
46            orphan_queue,
47            signal,
48        }
49    }
50
51    fn inner(&self) -> &W {
52        self.inner.as_ref().expect("inner has gone away")
53    }
54
55    pub(crate) fn inner_mut(&mut self) -> &mut W {
56        self.inner.as_mut().expect("inner has gone away")
57    }
58}
59
60impl<W, Q, S> Future for Reaper<W, Q, S>
61where
62    W: Wait + Unpin,
63    Q: OrphanQueue<W> + Unpin,
64    S: InternalStream + Unpin,
65{
66    type Output = io::Result<ExitStatus>;
67
68    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
69        loop {
70            // If the child hasn't exited yet, then it's our responsibility to
71            // ensure the current task gets notified when it might be able to
72            // make progress. We can use the delivery of a SIGCHLD signal as a
73            // sign that we can potentially make progress.
74            //
75            // However, we will register for a notification on the next signal
76            // BEFORE we poll the child. Otherwise it is possible that the child
77            // can exit and the signal can arrive after we last polled the child,
78            // but before we've registered for a notification on the next signal
79            // (this can cause a deadlock if there are no more spawned children
80            // which can generate a different signal for us). A side effect of
81            // pre-registering for signal notifications is that when the child
82            // exits, we will have already registered for an additional
83            // notification we don't need to consume. If another signal arrives,
84            // this future's task will be notified/woken up again. Since the
85            // futures model allows for spurious wake ups this extra wakeup
86            // should not cause significant issues with parent futures.
87            let registered_interest = self.signal.poll_recv(cx).is_pending();
88
89            if let Some(status) = self.inner_mut().try_wait()? {
90                return Poll::Ready(Ok(status));
91            }
92
93            // If our attempt to poll for the next signal was not ready, then
94            // we've arranged for our task to get notified and we can bail out.
95            if registered_interest {
96                return Poll::Pending;
97            } else {
98                // Otherwise, if the signal stream delivered a signal to us, we
99                // won't get notified at the next signal, so we'll loop and try
100                // again.
101                continue;
102            }
103        }
104    }
105}
106
107impl<W, Q, S> Kill for Reaper<W, Q, S>
108where
109    W: Kill + Wait,
110    Q: OrphanQueue<W>,
111{
112    fn kill(&mut self) -> io::Result<()> {
113        self.inner_mut().kill()
114    }
115}
116
117impl<W, Q, S> Drop for Reaper<W, Q, S>
118where
119    W: Wait,
120    Q: OrphanQueue<W>,
121{
122    fn drop(&mut self) {
123        if let Ok(Some(_)) = self.inner_mut().try_wait() {
124            return;
125        }
126
127        let orphan = self.inner.take().unwrap();
128        self.orphan_queue.push_orphan(orphan);
129    }
130}
131
132#[cfg(all(test, not(loom)))]
133mod test {
134    use super::*;
135
136    use crate::process::unix::orphan::test::MockQueue;
137    use futures::future::FutureExt;
138    use std::os::unix::process::ExitStatusExt;
139    use std::process::ExitStatus;
140    use std::task::Context;
141    use std::task::Poll;
142
143    #[derive(Debug)]
144    struct MockWait {
145        total_kills: usize,
146        total_waits: usize,
147        num_wait_until_status: usize,
148        status: ExitStatus,
149    }
150
151    impl MockWait {
152        fn new(status: ExitStatus, num_wait_until_status: usize) -> Self {
153            Self {
154                total_kills: 0,
155                total_waits: 0,
156                num_wait_until_status,
157                status,
158            }
159        }
160    }
161
162    impl Wait for MockWait {
163        fn id(&self) -> u32 {
164            0
165        }
166
167        fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
168            let ret = if self.num_wait_until_status == self.total_waits {
169                Some(self.status)
170            } else {
171                None
172            };
173
174            self.total_waits += 1;
175            Ok(ret)
176        }
177    }
178
179    impl Kill for MockWait {
180        fn kill(&mut self) -> io::Result<()> {
181            self.total_kills += 1;
182            Ok(())
183        }
184    }
185
186    struct MockStream {
187        total_polls: usize,
188        values: Vec<Option<()>>,
189    }
190
191    impl MockStream {
192        fn new(values: Vec<Option<()>>) -> Self {
193            Self {
194                total_polls: 0,
195                values,
196            }
197        }
198    }
199
200    impl InternalStream for MockStream {
201        fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> {
202            self.total_polls += 1;
203            match self.values.remove(0) {
204                Some(()) => Poll::Ready(Some(())),
205                None => Poll::Pending,
206            }
207        }
208    }
209
210    #[test]
211    fn reaper() {
212        let exit = ExitStatus::from_raw(0);
213        let mock = MockWait::new(exit, 3);
214        let mut grim = Reaper::new(
215            mock,
216            MockQueue::new(),
217            MockStream::new(vec![None, Some(()), None, None, None]),
218        );
219
220        let waker = futures::task::noop_waker();
221        let mut context = Context::from_waker(&waker);
222
223        // Not yet exited, interest registered
224        assert!(grim.poll_unpin(&mut context).is_pending());
225        assert_eq!(1, grim.signal.total_polls);
226        assert_eq!(1, grim.total_waits);
227        assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
228
229        // Not yet exited, couldn't register interest the first time
230        // but managed to register interest the second time around
231        assert!(grim.poll_unpin(&mut context).is_pending());
232        assert_eq!(3, grim.signal.total_polls);
233        assert_eq!(3, grim.total_waits);
234        assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
235
236        // Exited
237        if let Poll::Ready(r) = grim.poll_unpin(&mut context) {
238            assert!(r.is_ok());
239            let exit_code = r.unwrap();
240            assert_eq!(exit_code, exit);
241        } else {
242            unreachable!();
243        }
244        assert_eq!(4, grim.signal.total_polls);
245        assert_eq!(4, grim.total_waits);
246        assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
247    }
248
249    #[test]
250    fn kill() {
251        let exit = ExitStatus::from_raw(0);
252        let mut grim = Reaper::new(
253            MockWait::new(exit, 0),
254            MockQueue::new(),
255            MockStream::new(vec![None]),
256        );
257
258        grim.kill().unwrap();
259        assert_eq!(1, grim.total_kills);
260        assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
261    }
262
263    #[test]
264    fn drop_reaps_if_possible() {
265        let exit = ExitStatus::from_raw(0);
266        let mut mock = MockWait::new(exit, 0);
267
268        {
269            let queue = MockQueue::new();
270
271            let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![]));
272
273            drop(grim);
274
275            assert!(queue.all_enqueued.borrow().is_empty());
276        }
277
278        assert_eq!(1, mock.total_waits);
279        assert_eq!(0, mock.total_kills);
280    }
281
282    #[test]
283    fn drop_enqueues_orphan_if_wait_fails() {
284        let exit = ExitStatus::from_raw(0);
285        let mut mock = MockWait::new(exit, 2);
286
287        {
288            let queue = MockQueue::<&mut MockWait>::new();
289            let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![]));
290            drop(grim);
291
292            assert_eq!(1, queue.all_enqueued.borrow().len());
293        }
294
295        assert_eq!(1, mock.total_waits);
296        assert_eq!(0, mock.total_kills);
297    }
298}