wait_timeout/
unix.rs

1//! Unix implementation of waiting for children with timeouts
2//!
3//! On unix, wait() and its friends have no timeout parameters, so there is
4//! no way to time out a thread in wait(). From some googling and some
5//! thinking, it appears that there are a few ways to handle timeouts in
6//! wait(), but the only real reasonable one for a multi-threaded program is
7//! to listen for SIGCHLD.
8//!
9//! With this in mind, the waiting mechanism with a timeout only uses
10//! waitpid() with WNOHANG, but otherwise all the necessary blocking is done by
11//! waiting for a SIGCHLD to arrive (and that blocking has a timeout). Note,
12//! however, that waitpid() is still used to actually reap the child.
13//!
14//! Signal handling is super tricky in general, and this is no exception. Due
15//! to the async nature of SIGCHLD, we use the self-pipe trick to transmit
16//! data out of the signal handler to the rest of the application.
17
18#![allow(bad_style)]
19
20use std::cmp;
21use std::collections::HashMap;
22use std::io::{self, Write, Read};
23use std::os::unix::net::UnixStream;
24use std::mem;
25use std::os::unix::prelude::*;
26use std::process::{Child, ExitStatus};
27use std::sync::{Once, ONCE_INIT, Mutex};
28use std::time::{Duration, Instant};
29
30use libc::{self, c_int};
31
32static INIT: Once = ONCE_INIT;
33static mut STATE: *mut State = 0 as *mut _;
34
/// Global bookkeeping for SIGCHLD-driven waiting.
struct State {
    /// SIGCHLD disposition that was in place before ours was installed;
    /// filled in by `sigaction` in `State::init` and chained to from
    /// `sigchld_handler`.
    prev: libc::sigaction,
    /// Write end of the self-pipe; `sigchld_handler` writes to it.
    write: UnixStream,
    /// Read end of the self-pipe; polled and drained in `State::wait_timeout`.
    read: UnixStream,
    /// Children that some thread is currently waiting on.
    map: Mutex<StateMap>,
}
41
/// Maps a pointer to each child being waited on to the write end of that
/// waiter's private notification socket and, once the child has been reaped,
/// its exit status.
type StateMap = HashMap<*mut Child, (UnixStream, Option<ExitStatus>)>;
43
44pub fn wait_timeout(child: &mut Child, dur: Duration)
45                    -> io::Result<Option<ExitStatus>> {
46    INIT.call_once(State::init);
47    unsafe {
48        (*STATE).wait_timeout(child, dur)
49    }
50}
51
// Casts `$value` to whatever type `$target` has.
//
// `$target` is evaluated and bound first so that type inference can fill in
// the `_` of the cast; its value is then overwritten, which is why callers
// carry `#[allow(unused_assignments)]`.
macro_rules! _as {
    ($value:expr, $target:expr) => {{
        let mut converted = $target;
        converted = $value as _;
        converted
    }};
}
62
63impl State {
64    #[allow(unused_assignments)]
65    fn init() {
66        unsafe {
67            // Create our "self pipe" and then set both ends to nonblocking
68            // mode.
69            let (read, write) = UnixStream::pair().unwrap();
70            read.set_nonblocking(true).unwrap();
71            write.set_nonblocking(true).unwrap();
72
73            let mut state = Box::new(State {
74                prev: mem::zeroed(),
75                write: write,
76                read: read,
77                map: Mutex::new(HashMap::new()),
78            });
79
80            // Register our sigchld handler
81            let mut new: libc::sigaction = mem::zeroed();
82            new.sa_sigaction = sigchld_handler as usize;
83
84            // FIXME: remove this workaround when the PR to libc get merged and released
85            //
86            // This is a workaround for the type mismatch in the definition of SA_*
87            // constants for android. See https://github.com/rust-lang/libc/pull/511
88            //
89            let sa_flags = new.sa_flags;
90            new.sa_flags = _as!(libc::SA_NOCLDSTOP, sa_flags) |
91                           _as!(libc::SA_RESTART, sa_flags) |
92                           _as!(libc::SA_SIGINFO, sa_flags);
93
94            assert_eq!(libc::sigaction(libc::SIGCHLD, &new, &mut state.prev), 0);
95
96            STATE = mem::transmute(state);
97        }
98    }
99
100    fn wait_timeout(&self, child: &mut Child, dur: Duration)
101                       -> io::Result<Option<ExitStatus>> {
102        // First up, prep our notification pipe which will tell us when our
103        // child has been reaped (other threads may signal this pipe).
104        let (read, write) = UnixStream::pair()?;
105        read.set_nonblocking(true)?;
106        write.set_nonblocking(true)?;
107
108        // Next, take a lock on the map of children currently waiting. Right
109        // after this, **before** we add ourselves to the map, we check to see
110        // if our child has actually already exited via a `try_wait`. If the
111        // child has exited then we return immediately as we'll never otherwise
112        // receive a SIGCHLD notification.
113        //
114        // If the wait reports the child is still running, however, we add
115        // ourselves to the map and then block in `select` waiting for something
116        // to happen.
117        let mut map = self.map.lock().unwrap();
118        if let Some(status) = child.try_wait()? {
119            return Ok(Some(status))
120        }
121        assert!(map.insert(child, (write, None)).is_none());
122        drop(map);
123
124        // Make sure that no matter what when we exit our pointer is removed
125        // from the map.
126        struct Remove<'a> {
127            state: &'a State,
128            child: &'a mut Child,
129        }
130        impl<'a> Drop for Remove<'a> {
131            fn drop(&mut self) {
132                let mut map = self.state.map.lock().unwrap();
133                drop(map.remove(&(self.child as *mut Child)));
134            }
135        }
136        let remove = Remove { state: self, child };
137
138
139        // Alright, we're guaranteed that we'll eventually get a SIGCHLD due
140        // to our `try_wait` failing, and we're also guaranteed that we'll
141        // get notified about this because we're in the map. Next up wait
142        // for an event.
143        //
144        // Note that this happens in a loop for two reasons; we could
145        // receive EINTR or we could pick up a SIGCHLD for other threads but not
146        // actually be ready oureslves.
147        let start = Instant::now();
148        let mut fds = [
149            libc::pollfd {
150                fd: self.read.as_raw_fd(),
151                events: libc::POLLIN,
152
153                revents: 0,
154            },
155            libc::pollfd {
156                fd: read.as_raw_fd(),
157                events: libc::POLLIN,
158                revents: 0,
159            },
160        ];
161        loop {
162            let elapsed = start.elapsed();
163            if elapsed >= dur {
164                break
165            }
166            let timeout = dur - elapsed;
167            let timeout = timeout.as_secs().checked_mul(1_000)
168                .and_then(|amt| {
169                    amt.checked_add(timeout.subsec_nanos() as u64 / 1_000_000)
170                })
171                .unwrap_or(u64::max_value());
172            let timeout = cmp::min(<c_int>::max_value() as u64, timeout) as c_int;
173            let r = unsafe {
174                libc::poll(fds.as_mut_ptr(), 2, timeout)
175            };
176            let timeout = match r {
177                0 => true,
178                n if n > 0 => false,
179                n => {
180                    let err = io::Error::last_os_error();
181                    if err.kind() == io::ErrorKind::Interrupted {
182                        continue
183                    } else {
184                        panic!("error in select = {}: {}", n, err)
185                    }
186                }
187            };
188
189            // Now that something has happened, we need to process what actually
190            // happened. There's are three reasons we could have woken up:
191            //
192            // 1. The file descriptor in our SIGCHLD handler was written to.
193            //    This means that a SIGCHLD was received and we need to poll the
194            //    entire list of waiting processes to figure out which ones
195            //    actually exited.
196            // 2. Our file descriptor was written to. This means that another
197            //    thread reaped our child and listed the exit status in the
198            //    local map.
199            // 3. We timed out. This means we need to remove ourselves from the
200            //    map and simply carry on.
201            //
202            // In the case that a SIGCHLD signal was received, we do that
203            // processing and keep going. If our fd was written to or a timeout
204            // was received then we break out of the loop and return from this
205            // call.
206            let mut map = self.map.lock().unwrap();
207            if drain(&self.read) {
208                self.process_sigchlds(&mut map);
209            }
210
211            if drain(&read) || timeout {
212                break
213            }
214        }
215
216        let mut map = self.map.lock().unwrap();
217        let (_write, ret) = map.remove(&(remove.child as *mut Child)).unwrap();
218        drop(map);
219        Ok(ret)
220    }
221
222    fn process_sigchlds(&self, map: &mut StateMap) {
223        for (&k, &mut (ref write, ref mut status)) in map {
224            // Already reaped, nothing to do here
225            if status.is_some() {
226                continue
227            }
228
229            *status = unsafe { (*k).try_wait().unwrap() };
230            if status.is_some() {
231                notify(write);
232            }
233        }
234    }
235}
236
/// Reads everything currently available from the nonblocking socket `file`.
///
/// Returns `true` if any data was read or end-of-file was reached, and
/// `false` if the socket was empty. A read that fails with `Interrupted` is
/// retried.
///
/// # Panics
///
/// Panics on any read error other than `WouldBlock` or `Interrupted`.
fn drain(mut file: &UnixStream) -> bool {
    let mut saw_activity = false;
    let mut buf = [0u8; 16];
    loop {
        match file.read(&mut buf) {
            Ok(0) => return true, // EOF == something happened
            Ok(..) => saw_activity = true, // data read, but keep draining
            // A signal cut the read short; nothing was lost, try again.
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                return saw_activity
            }
            Err(e) => panic!("bad read: {}", e),
        }
    }
}
254
/// Writes a single wake-up byte to `file`.
///
/// `WouldBlock` is ignored: a full socket already holds bytes that will wake
/// the reader. A write that fails with `Interrupted` is retried rather than
/// treated as fatal, since this function is also called from the SIGCHLD
/// handler.
///
/// # Panics
///
/// Panics on any write error other than `WouldBlock` or `Interrupted`.
fn notify(mut file: &UnixStream) {
    loop {
        match file.write(&[1]) {
            Ok(..) => return,
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
            Err(e) => panic!("bad error on write fd: {}", e),
        }
    }
}
265
266// Signal handler for SIGCHLD signals, must be async-signal-safe!
267//
268// This function will write to the writing half of the "self pipe" to wake
269// up the helper thread if it's waiting. Note that this write must be
270// nonblocking because if it blocks and the reader is the thread we
271// interrupted, then we'll deadlock.
272//
273// When writing, if the write returns EWOULDBLOCK then we choose to ignore
274// it. At that point we're guaranteed that there's something in the pipe
275// which will wake up the other end at some point, so we just allow this
276// signal to be coalesced with the pending signals on the pipe.
277#[allow(unused_assignments)]
278extern fn sigchld_handler(signum: c_int,
279                          info: *mut libc::siginfo_t,
280                          ptr: *mut libc::c_void) {
281    type FnSigaction = extern fn(c_int, *mut libc::siginfo_t, *mut libc::c_void);
282    type FnHandler = extern fn(c_int);
283
284    unsafe {
285        let state = &*STATE;
286        notify(&state.write);
287
288        let fnptr = state.prev.sa_sigaction;
289        if fnptr == 0 {
290            return
291        }
292        // FIXME: remove this workaround when the PR to libc get merged and released
293        //
294        // This is a workaround for the type mismatch in the definition of SA_*
295        // constants for android. See https://github.com/rust-lang/libc/pull/511
296        //
297        if state.prev.sa_flags & _as!(libc::SA_SIGINFO, state.prev.sa_flags) == 0 {
298            let action = mem::transmute::<usize, FnHandler>(fnptr);
299            action(signum)
300        } else {
301            let action = mem::transmute::<usize, FnSigaction>(fnptr);
302            action(signum, info, ptr)
303        }
304    }
305}