tokio/process/unix/
pidfd_reaper.rs

1use crate::{
2    io::{interest::Interest, PollEvented},
3    process::{
4        imp::{orphan::Wait, OrphanQueue},
5        kill::Kill,
6    },
7    util::error::RUNTIME_SHUTTING_DOWN_ERROR,
8};
9
10use libc::{syscall, SYS_pidfd_open, ENOSYS, PIDFD_NONBLOCK};
11use mio::{event::Source, unix::SourceFd};
12use std::{
13    fs::File,
14    future::Future,
15    io,
16    marker::Unpin,
17    ops::Deref,
18    os::unix::io::{AsRawFd, FromRawFd, RawFd},
19    pin::Pin,
20    process::ExitStatus,
21    sync::atomic::{AtomicBool, Ordering::Relaxed},
22    task::{ready, Context, Poll},
23};
24
25#[derive(Debug)]
26struct Pidfd {
27    fd: File,
28}
29
30impl Pidfd {
31    fn open(pid: u32) -> Option<Pidfd> {
32        // Store false (0) to reduce executable size
33        static NO_PIDFD_SUPPORT: AtomicBool = AtomicBool::new(false);
34
35        if NO_PIDFD_SUPPORT.load(Relaxed) {
36            return None;
37        }
38
39        // Safety: The following function calls invovkes syscall pidfd_open,
40        // which takes two parameter: pidfd_open(fd: c_int, flag: c_int)
41        let fd = unsafe { syscall(SYS_pidfd_open, pid, PIDFD_NONBLOCK) };
42        if fd == -1 {
43            let errno = io::Error::last_os_error().raw_os_error().unwrap();
44
45            if errno == ENOSYS {
46                NO_PIDFD_SUPPORT.store(true, Relaxed)
47            }
48
49            None
50        } else {
51            // Safety: pidfd_open returns -1 on error or a valid fd with ownership.
52            Some(Pidfd {
53                fd: unsafe { File::from_raw_fd(fd as i32) },
54            })
55        }
56    }
57}
58
59impl AsRawFd for Pidfd {
60    fn as_raw_fd(&self) -> RawFd {
61        self.fd.as_raw_fd()
62    }
63}
64
65impl Source for Pidfd {
66    fn register(
67        &mut self,
68        registry: &mio::Registry,
69        token: mio::Token,
70        interest: mio::Interest,
71    ) -> io::Result<()> {
72        SourceFd(&self.as_raw_fd()).register(registry, token, interest)
73    }
74
75    fn reregister(
76        &mut self,
77        registry: &mio::Registry,
78        token: mio::Token,
79        interest: mio::Interest,
80    ) -> io::Result<()> {
81        SourceFd(&self.as_raw_fd()).reregister(registry, token, interest)
82    }
83
84    fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> {
85        SourceFd(&self.as_raw_fd()).deregister(registry)
86    }
87}
88
89#[derive(Debug)]
90struct PidfdReaperInner<W>
91where
92    W: Unpin,
93{
94    inner: W,
95    pidfd: PollEvented<Pidfd>,
96}
97
98#[allow(deprecated)]
99fn is_rt_shutdown_err(err: &io::Error) -> bool {
100    if let Some(inner) = err.get_ref() {
101        // Using `Error::description()` is more efficient than `format!("{inner}")`,
102        // so we use it here even if it is deprecated.
103        err.kind() == io::ErrorKind::Other
104            && inner.source().is_none()
105            && inner.description() == RUNTIME_SHUTTING_DOWN_ERROR
106    } else {
107        false
108    }
109}
110
111impl<W> Future for PidfdReaperInner<W>
112where
113    W: Wait + Unpin,
114{
115    type Output = io::Result<ExitStatus>;
116
117    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
118        let this = Pin::into_inner(self);
119
120        match ready!(this.pidfd.poll_read_ready(cx)) {
121            Err(err) if is_rt_shutdown_err(&err) => {
122                this.pidfd.reregister(Interest::READABLE)?;
123                ready!(this.pidfd.poll_read_ready(cx))?
124            }
125            res => res?,
126        }
127        Poll::Ready(Ok(this
128            .inner
129            .try_wait()?
130            .expect("pidfd is ready to read, the process should have exited")))
131    }
132}
133
134#[derive(Debug)]
135pub(crate) struct PidfdReaper<W, Q>
136where
137    W: Wait + Unpin,
138    Q: OrphanQueue<W> + Unpin,
139{
140    inner: Option<PidfdReaperInner<W>>,
141    orphan_queue: Q,
142}
143
144impl<W, Q> Deref for PidfdReaper<W, Q>
145where
146    W: Wait + Unpin,
147    Q: OrphanQueue<W> + Unpin,
148{
149    type Target = W;
150
151    fn deref(&self) -> &Self::Target {
152        &self.inner.as_ref().expect("inner has gone away").inner
153    }
154}
155
156impl<W, Q> PidfdReaper<W, Q>
157where
158    W: Wait + Unpin,
159    Q: OrphanQueue<W> + Unpin,
160{
161    pub(crate) fn new(inner: W, orphan_queue: Q) -> Result<Self, (Option<io::Error>, W)> {
162        if let Some(pidfd) = Pidfd::open(inner.id()) {
163            match PollEvented::new_with_interest(pidfd, Interest::READABLE) {
164                Ok(pidfd) => Ok(Self {
165                    inner: Some(PidfdReaperInner { pidfd, inner }),
166                    orphan_queue,
167                }),
168                Err(io_error) => Err((Some(io_error), inner)),
169            }
170        } else {
171            Err((None, inner))
172        }
173    }
174
175    pub(crate) fn inner_mut(&mut self) -> &mut W {
176        &mut self.inner.as_mut().expect("inner has gone away").inner
177    }
178}
179
180impl<W, Q> Future for PidfdReaper<W, Q>
181where
182    W: Wait + Unpin,
183    Q: OrphanQueue<W> + Unpin,
184{
185    type Output = io::Result<ExitStatus>;
186
187    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
188        Pin::new(
189            Pin::into_inner(self)
190                .inner
191                .as_mut()
192                .expect("inner has gone away"),
193        )
194        .poll(cx)
195    }
196}
197
198impl<W, Q> Kill for PidfdReaper<W, Q>
199where
200    W: Wait + Unpin + Kill,
201    Q: OrphanQueue<W> + Unpin,
202{
203    fn kill(&mut self) -> io::Result<()> {
204        self.inner_mut().kill()
205    }
206}
207
208impl<W, Q> Drop for PidfdReaper<W, Q>
209where
210    W: Wait + Unpin,
211    Q: OrphanQueue<W> + Unpin,
212{
213    fn drop(&mut self) {
214        let mut orphan = self.inner.take().expect("inner has gone away").inner;
215        if let Ok(Some(_)) = orphan.try_wait() {
216            return;
217        }
218
219        self.orphan_queue.push_orphan(orphan);
220    }
221}
222
#[cfg(all(test, not(loom), not(miri)))]
mod test {
    use super::*;
    use crate::{
        process::unix::orphan::test::MockQueue,
        runtime::{Builder as RuntimeBuilder, Runtime},
    };
    use std::process::{Command, Output};

    /// Builds a current-thread runtime with the I/O driver enabled.
    fn create_runtime() -> Runtime {
        let mut builder = RuntimeBuilder::new_current_thread();
        builder.enable_io();
        builder.build().unwrap()
    }

    /// Drives `fut` to completion on a fresh runtime.
    fn run_test(fut: impl Future<Output = ()>) {
        let runtime = create_runtime();
        runtime.block_on(fut)
    }

    /// Decides from `uname -r` whether the kernel is at least version 5.10.
    fn is_pidfd_available() -> bool {
        let Output { stdout, status, .. } = Command::new("uname").arg("-r").output().unwrap();
        assert!(status.success());
        let release = String::from_utf8_lossy(&stdout);

        // Only the part before the first '-' (if any) holds the version numbers.
        let version = release
            .split_once('-')
            .map_or(&*release, |(version, _)| version);
        let mut components = version.split('.');

        let major: u32 = components.next().unwrap().parse().unwrap();
        let minor: u32 = components.next().unwrap().parse().unwrap();

        (major, minor) >= (5, 10)
    }

    /// Returns `true`, after printing a notice, when the pidfd tests must be skipped.
    fn should_skip() -> bool {
        let unsupported = !is_pidfd_available();
        if unsupported {
            eprintln!("pidfd is not available on this linux kernel, skip this test");
        }
        unsupported
    }

    /// Awaiting the reaper yields the exit status and enqueues nothing.
    #[test]
    fn test_pidfd_reaper_poll() {
        if should_skip() {
            return;
        }

        let queue = MockQueue::new();

        run_test(async {
            let child = Command::new("true").spawn().unwrap();
            let reaper = PidfdReaper::new(child, &queue).unwrap();

            let status = reaper.await.unwrap();
            assert!(status.success());
        });

        assert!(queue.all_enqueued.borrow().is_empty());
    }

    /// A killed child is reported as unsuccessful and enqueues nothing.
    #[test]
    fn test_pidfd_reaper_kill() {
        if should_skip() {
            return;
        }

        let queue = MockQueue::new();

        run_test(async {
            let child = Command::new("sleep").arg("1800").spawn().unwrap();
            let mut reaper = PidfdReaper::new(child, &queue).unwrap();

            reaper.kill().unwrap();

            let status = reaper.await.unwrap();
            assert!(!status.success());
        });

        assert!(queue.all_enqueued.borrow().is_empty());
    }

    /// Dropping the reaper while the child still runs enqueues one orphan.
    #[test]
    fn test_pidfd_reaper_drop() {
        if should_skip() {
            return;
        }

        let queue = MockQueue::new();

        let mut child = Command::new("sleep").arg("1800").spawn().unwrap();

        run_test(async {
            let _reaper = PidfdReaper::new(&mut child, &queue).unwrap();
        });

        assert_eq!(queue.all_enqueued.borrow().len(), 1);

        // Clean up the long-running child ourselves.
        child.kill().unwrap();
        child.wait().unwrap();
    }
}