tokio/process/unix/
orphan.rs

1use crate::loom::sync::{Mutex, MutexGuard};
2use crate::runtime::signal::Handle as SignalHandle;
3use crate::signal::unix::{signal_with_handle, SignalKind};
4use crate::sync::watch;
5use std::io;
6use std::process::ExitStatus;
7
/// An interface for waiting on a process to exit.
///
/// `drain_orphan_queue` keeps an orphan queued while `try_wait` answers
/// `Ok(None)` and discards it on `Ok(Some(_))` or `Err(_)`.
pub(crate) trait Wait {
    /// Gets the identifier for this process, for diagnostics.
    #[allow(dead_code)]
    fn id(&self) -> u32;
    /// Tries waiting for the process to exit in a non-blocking manner.
    ///
    /// The result is an `io::Result` wrapping an optional `ExitStatus`.
    fn try_wait(&mut self) -> io::Result<Option<ExitStatus>>;
}
16
17impl<T: Wait> Wait for &mut T {
18    fn id(&self) -> u32 {
19        (**self).id()
20    }
21
22    fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
23        (**self).try_wait()
24    }
25}
26
/// An interface for queueing up an orphaned process so that it can be reaped.
///
/// `T` is the type of the orphan being queued.
pub(crate) trait OrphanQueue<T> {
    /// Adds an orphan to the queue.
    ///
    /// Takes `&self`, so implementors that store the orphan need interior
    /// mutability.
    fn push_orphan(&self, orphan: T);
}
32
33impl<T, O: OrphanQueue<T>> OrphanQueue<T> for &O {
34    fn push_orphan(&self, orphan: T) {
35        (**self).push_orphan(orphan);
36    }
37}
38
/// An implementation of `OrphanQueue`.
///
/// Holds the orphans that are still waiting to be reaped together with the
/// lazily-created SIGCHLD listener that tells `reap_orphans` when another
/// reap attempt is worthwhile.
#[derive(Debug)]
pub(crate) struct OrphanQueueImpl<T> {
    /// SIGCHLD listener; stays `None` until `reap_orphans` sees a non-empty
    /// queue and manages to register one.
    sigchild: Mutex<Option<watch::Receiver<()>>>,
    /// Orphans that have been pushed and not yet discarded by a drain.
    queue: Mutex<Vec<T>>,
}
45
46impl<T> OrphanQueueImpl<T> {
47    cfg_not_has_const_mutex_new! {
48        pub(crate) fn new() -> Self {
49            Self {
50                sigchild: Mutex::new(None),
51                queue: Mutex::new(Vec::new()),
52            }
53        }
54    }
55
56    cfg_has_const_mutex_new! {
57        pub(crate) const fn new() -> Self {
58            Self {
59                sigchild: Mutex::const_new(None),
60                queue: Mutex::const_new(Vec::new()),
61            }
62        }
63    }
64
65    #[cfg(test)]
66    fn len(&self) -> usize {
67        self.queue.lock().len()
68    }
69
70    pub(crate) fn push_orphan(&self, orphan: T)
71    where
72        T: Wait,
73    {
74        self.queue.lock().push(orphan);
75    }
76
77    /// Attempts to reap every process in the queue, ignoring any errors and
78    /// enqueueing any orphans which have not yet exited.
79    pub(crate) fn reap_orphans(&self, handle: &SignalHandle)
80    where
81        T: Wait,
82    {
83        // If someone else is holding the lock, they will be responsible for draining
84        // the queue as necessary, so we can safely bail if that happens
85        if let Some(mut sigchild_guard) = self.sigchild.try_lock() {
86            match &mut *sigchild_guard {
87                Some(sigchild) => {
88                    if sigchild.try_has_changed().and_then(Result::ok).is_some() {
89                        drain_orphan_queue(self.queue.lock());
90                    }
91                }
92                None => {
93                    let queue = self.queue.lock();
94
95                    // Be lazy and only initialize the SIGCHLD listener if there
96                    // are any orphaned processes in the queue.
97                    if !queue.is_empty() {
98                        // An errors shouldn't really happen here, but if it does it
99                        // means that the signal driver isn't running, in
100                        // which case there isn't anything we can
101                        // register/initialize here, so we can try again later
102                        if let Ok(sigchild) = signal_with_handle(SignalKind::child(), handle) {
103                            *sigchild_guard = Some(sigchild);
104                            drain_orphan_queue(queue);
105                        }
106                    }
107                }
108            }
109        }
110    }
111}
112
113fn drain_orphan_queue<T>(mut queue: MutexGuard<'_, Vec<T>>)
114where
115    T: Wait,
116{
117    for i in (0..queue.len()).rev() {
118        match queue[i].try_wait() {
119            Ok(None) => {}
120            Ok(Some(_)) | Err(_) => {
121                // The stdlib handles interruption errors (EINTR) when polling a child process.
122                // All other errors represent invalid inputs or pids that have already been
123                // reaped, so we can drop the orphan in case an error is raised.
124                queue.swap_remove(i);
125            }
126        }
127    }
128
129    drop(queue);
130}
131
132#[cfg(all(test, not(loom)))]
133pub(crate) mod test {
134    use super::*;
135    use crate::runtime::io::Driver as IoDriver;
136    use crate::runtime::signal::{Driver as SignalDriver, Handle as SignalHandle};
137    use crate::sync::watch;
138    use std::cell::{Cell, RefCell};
139    use std::io;
140    use std::os::unix::process::ExitStatusExt;
141    use std::process::ExitStatus;
142    use std::rc::Rc;
143
    /// Test double for `OrphanQueue` that simply records what is pushed.
    pub(crate) struct MockQueue<W> {
        /// Every orphan handed to `push_orphan`, in the order it was pushed.
        pub(crate) all_enqueued: RefCell<Vec<W>>,
    }
147
148    impl<W> MockQueue<W> {
149        pub(crate) fn new() -> Self {
150            Self {
151                all_enqueued: RefCell::new(Vec::new()),
152            }
153        }
154    }
155
156    impl<W> OrphanQueue<W> for MockQueue<W> {
157        fn push_orphan(&self, orphan: W) {
158            self.all_enqueued.borrow_mut().push(orphan);
159        }
160    }
161
162    struct MockWait {
163        total_waits: Rc<Cell<usize>>,
164        num_wait_until_status: usize,
165        return_err: bool,
166    }
167
168    impl MockWait {
169        fn new(num_wait_until_status: usize) -> Self {
170            Self {
171                total_waits: Rc::new(Cell::new(0)),
172                num_wait_until_status,
173                return_err: false,
174            }
175        }
176
177        fn with_err() -> Self {
178            Self {
179                total_waits: Rc::new(Cell::new(0)),
180                num_wait_until_status: 0,
181                return_err: true,
182            }
183        }
184    }
185
186    impl Wait for MockWait {
187        fn id(&self) -> u32 {
188            42
189        }
190
191        fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
192            let waits = self.total_waits.get();
193
194            let ret = if self.num_wait_until_status == waits {
195                if self.return_err {
196                    Ok(Some(ExitStatus::from_raw(0)))
197                } else {
198                    Err(io::Error::new(io::ErrorKind::Other, "mock err"))
199                }
200            } else {
201                Ok(None)
202            };
203
204            self.total_waits.set(waits + 1);
205            ret
206        }
207    }
208
209    #[test]
210    fn drain_attempts_a_single_reap_of_all_queued_orphans() {
211        let first_orphan = MockWait::new(0);
212        let second_orphan = MockWait::new(1);
213        let third_orphan = MockWait::new(2);
214        let fourth_orphan = MockWait::with_err();
215
216        let first_waits = first_orphan.total_waits.clone();
217        let second_waits = second_orphan.total_waits.clone();
218        let third_waits = third_orphan.total_waits.clone();
219        let fourth_waits = fourth_orphan.total_waits.clone();
220
221        let orphanage = OrphanQueueImpl::new();
222        orphanage.push_orphan(first_orphan);
223        orphanage.push_orphan(third_orphan);
224        orphanage.push_orphan(second_orphan);
225        orphanage.push_orphan(fourth_orphan);
226
227        assert_eq!(orphanage.len(), 4);
228
229        drain_orphan_queue(orphanage.queue.lock());
230        assert_eq!(orphanage.len(), 2);
231        assert_eq!(first_waits.get(), 1);
232        assert_eq!(second_waits.get(), 1);
233        assert_eq!(third_waits.get(), 1);
234        assert_eq!(fourth_waits.get(), 1);
235
236        drain_orphan_queue(orphanage.queue.lock());
237        assert_eq!(orphanage.len(), 1);
238        assert_eq!(first_waits.get(), 1);
239        assert_eq!(second_waits.get(), 2);
240        assert_eq!(third_waits.get(), 2);
241        assert_eq!(fourth_waits.get(), 1);
242
243        drain_orphan_queue(orphanage.queue.lock());
244        assert_eq!(orphanage.len(), 0);
245        assert_eq!(first_waits.get(), 1);
246        assert_eq!(second_waits.get(), 2);
247        assert_eq!(third_waits.get(), 3);
248        assert_eq!(fourth_waits.get(), 1);
249
250        // Safe to reap when empty
251        drain_orphan_queue(orphanage.queue.lock());
252    }
253
254    #[test]
255    fn no_reap_if_no_signal_received() {
256        let (tx, rx) = watch::channel(());
257
258        let handle = SignalHandle::default();
259
260        let orphanage = OrphanQueueImpl::new();
261        *orphanage.sigchild.lock() = Some(rx);
262
263        let orphan = MockWait::new(2);
264        let waits = orphan.total_waits.clone();
265        orphanage.push_orphan(orphan);
266
267        orphanage.reap_orphans(&handle);
268        assert_eq!(waits.get(), 0);
269
270        orphanage.reap_orphans(&handle);
271        assert_eq!(waits.get(), 0);
272
273        tx.send(()).unwrap();
274        orphanage.reap_orphans(&handle);
275        assert_eq!(waits.get(), 1);
276    }
277
278    #[test]
279    fn no_reap_if_signal_lock_held() {
280        let handle = SignalHandle::default();
281
282        let orphanage = OrphanQueueImpl::new();
283        let signal_guard = orphanage.sigchild.lock();
284
285        let orphan = MockWait::new(2);
286        let waits = orphan.total_waits.clone();
287        orphanage.push_orphan(orphan);
288
289        orphanage.reap_orphans(&handle);
290        assert_eq!(waits.get(), 0);
291
292        drop(signal_guard);
293    }
294
295    #[cfg_attr(miri, ignore)] // Miri does not support epoll.
296    #[test]
297    fn does_not_register_signal_if_queue_empty() {
298        let (io_driver, io_handle) = IoDriver::new(1024).unwrap();
299        let signal_driver = SignalDriver::new(io_driver, &io_handle).unwrap();
300        let handle = signal_driver.handle();
301
302        let orphanage = OrphanQueueImpl::new();
303        assert!(orphanage.sigchild.lock().is_none()); // Sanity
304
305        // No register when queue empty
306        orphanage.reap_orphans(&handle);
307        assert!(orphanage.sigchild.lock().is_none());
308
309        let orphan = MockWait::new(2);
310        let waits = orphan.total_waits.clone();
311        orphanage.push_orphan(orphan);
312
313        orphanage.reap_orphans(&handle);
314        assert!(orphanage.sigchild.lock().is_some());
315        assert_eq!(waits.get(), 1); // Eager reap when registering listener
316    }
317
318    #[test]
319    fn does_nothing_if_signal_could_not_be_registered() {
320        let handle = SignalHandle::default();
321
322        let orphanage = OrphanQueueImpl::new();
323        assert!(orphanage.sigchild.lock().is_none());
324
325        let orphan = MockWait::new(2);
326        let waits = orphan.total_waits.clone();
327        orphanage.push_orphan(orphan);
328
329        // Signal handler has "gone away", nothing to register or reap
330        orphanage.reap_orphans(&handle);
331        assert!(orphanage.sigchild.lock().is_none());
332        assert_eq!(waits.get(), 0);
333    }
334}