wait_timeout/unix.rs
//! Unix implementation of waiting for children with timeouts.
//!
//! On Unix, `wait()` and its relatives take no timeout parameter, so a thread
//! blocked in `wait()` cannot be timed out. There are a few ways to emulate a
//! timeout, but the only reasonable one for a multi-threaded program is to
//! listen for SIGCHLD.
//!
//! With this in mind, the timed wait only ever calls `waitpid()` with
//! `WNOHANG` (through `Child::try_wait`); all of the actual blocking is done by
//! waiting, with a timeout, for a SIGCHLD to arrive. `waitpid()` is still what
//! actually reaps the child.
//!
//! Signal handling is tricky in general, and this is no exception. Because
//! SIGCHLD is delivered asynchronously, we use the self-pipe trick: the signal
//! handler writes a byte into one end of a socket pair, and waiting threads
//! `poll` the other end.
17
18#![allow(bad_style)]
19
20use std::cmp;
21use std::collections::HashMap;
22use std::io::{self, Write, Read};
23use std::os::unix::net::UnixStream;
24use std::mem;
25use std::os::unix::prelude::*;
26use std::process::{Child, ExitStatus};
27use std::sync::{Once, ONCE_INIT, Mutex};
28use std::time::{Duration, Instant};
29
30use libc::{self, c_int};
31
32static INIT: Once = ONCE_INIT;
33static mut STATE: *mut State = 0 as *mut _;
34
35struct State {
36 prev: libc::sigaction,
37 write: UnixStream,
38 read: UnixStream,
39 map: Mutex<StateMap>,
40}
41
42type StateMap = HashMap<*mut Child, (UnixStream, Option<ExitStatus>)>;
43
44pub fn wait_timeout(child: &mut Child, dur: Duration)
45 -> io::Result<Option<ExitStatus>> {
46 INIT.call_once(State::init);
47 unsafe {
48 (*STATE).wait_timeout(child, dur)
49 }
50}
51
52// Do $value as type_of($target)
53macro_rules! _as {
54 ($value:expr, $target:expr) => (
55 {
56 let mut x = $target;
57 x = $value as _;
58 x
59 }
60 )
61}
62
63impl State {
64 #[allow(unused_assignments)]
65 fn init() {
66 unsafe {
67 // Create our "self pipe" and then set both ends to nonblocking
68 // mode.
69 let (read, write) = UnixStream::pair().unwrap();
70 read.set_nonblocking(true).unwrap();
71 write.set_nonblocking(true).unwrap();
72
73 let mut state = Box::new(State {
74 prev: mem::zeroed(),
75 write: write,
76 read: read,
77 map: Mutex::new(HashMap::new()),
78 });
79
80 // Register our sigchld handler
81 let mut new: libc::sigaction = mem::zeroed();
82 new.sa_sigaction = sigchld_handler as usize;
83
84 // FIXME: remove this workaround when the PR to libc get merged and released
85 //
86 // This is a workaround for the type mismatch in the definition of SA_*
87 // constants for android. See https://github.com/rust-lang/libc/pull/511
88 //
89 let sa_flags = new.sa_flags;
90 new.sa_flags = _as!(libc::SA_NOCLDSTOP, sa_flags) |
91 _as!(libc::SA_RESTART, sa_flags) |
92 _as!(libc::SA_SIGINFO, sa_flags);
93
94 assert_eq!(libc::sigaction(libc::SIGCHLD, &new, &mut state.prev), 0);
95
96 STATE = mem::transmute(state);
97 }
98 }
99
100 fn wait_timeout(&self, child: &mut Child, dur: Duration)
101 -> io::Result<Option<ExitStatus>> {
102 // First up, prep our notification pipe which will tell us when our
103 // child has been reaped (other threads may signal this pipe).
104 let (read, write) = UnixStream::pair()?;
105 read.set_nonblocking(true)?;
106 write.set_nonblocking(true)?;
107
108 // Next, take a lock on the map of children currently waiting. Right
109 // after this, **before** we add ourselves to the map, we check to see
110 // if our child has actually already exited via a `try_wait`. If the
111 // child has exited then we return immediately as we'll never otherwise
112 // receive a SIGCHLD notification.
113 //
114 // If the wait reports the child is still running, however, we add
115 // ourselves to the map and then block in `select` waiting for something
116 // to happen.
117 let mut map = self.map.lock().unwrap();
118 if let Some(status) = child.try_wait()? {
119 return Ok(Some(status))
120 }
121 assert!(map.insert(child, (write, None)).is_none());
122 drop(map);
123
124 // Make sure that no matter what when we exit our pointer is removed
125 // from the map.
126 struct Remove<'a> {
127 state: &'a State,
128 child: &'a mut Child,
129 }
130 impl<'a> Drop for Remove<'a> {
131 fn drop(&mut self) {
132 let mut map = self.state.map.lock().unwrap();
133 drop(map.remove(&(self.child as *mut Child)));
134 }
135 }
136 let remove = Remove { state: self, child };
137
138
139 // Alright, we're guaranteed that we'll eventually get a SIGCHLD due
140 // to our `try_wait` failing, and we're also guaranteed that we'll
141 // get notified about this because we're in the map. Next up wait
142 // for an event.
143 //
144 // Note that this happens in a loop for two reasons; we could
145 // receive EINTR or we could pick up a SIGCHLD for other threads but not
146 // actually be ready oureslves.
147 let start = Instant::now();
148 let mut fds = [
149 libc::pollfd {
150 fd: self.read.as_raw_fd(),
151 events: libc::POLLIN,
152
153 revents: 0,
154 },
155 libc::pollfd {
156 fd: read.as_raw_fd(),
157 events: libc::POLLIN,
158 revents: 0,
159 },
160 ];
161 loop {
162 let elapsed = start.elapsed();
163 if elapsed >= dur {
164 break
165 }
166 let timeout = dur - elapsed;
167 let timeout = timeout.as_secs().checked_mul(1_000)
168 .and_then(|amt| {
169 amt.checked_add(timeout.subsec_nanos() as u64 / 1_000_000)
170 })
171 .unwrap_or(u64::max_value());
172 let timeout = cmp::min(<c_int>::max_value() as u64, timeout) as c_int;
173 let r = unsafe {
174 libc::poll(fds.as_mut_ptr(), 2, timeout)
175 };
176 let timeout = match r {
177 0 => true,
178 n if n > 0 => false,
179 n => {
180 let err = io::Error::last_os_error();
181 if err.kind() == io::ErrorKind::Interrupted {
182 continue
183 } else {
184 panic!("error in select = {}: {}", n, err)
185 }
186 }
187 };
188
189 // Now that something has happened, we need to process what actually
190 // happened. There's are three reasons we could have woken up:
191 //
192 // 1. The file descriptor in our SIGCHLD handler was written to.
193 // This means that a SIGCHLD was received and we need to poll the
194 // entire list of waiting processes to figure out which ones
195 // actually exited.
196 // 2. Our file descriptor was written to. This means that another
197 // thread reaped our child and listed the exit status in the
198 // local map.
199 // 3. We timed out. This means we need to remove ourselves from the
200 // map and simply carry on.
201 //
202 // In the case that a SIGCHLD signal was received, we do that
203 // processing and keep going. If our fd was written to or a timeout
204 // was received then we break out of the loop and return from this
205 // call.
206 let mut map = self.map.lock().unwrap();
207 if drain(&self.read) {
208 self.process_sigchlds(&mut map);
209 }
210
211 if drain(&read) || timeout {
212 break
213 }
214 }
215
216 let mut map = self.map.lock().unwrap();
217 let (_write, ret) = map.remove(&(remove.child as *mut Child)).unwrap();
218 drop(map);
219 Ok(ret)
220 }
221
222 fn process_sigchlds(&self, map: &mut StateMap) {
223 for (&k, &mut (ref write, ref mut status)) in map {
224 // Already reaped, nothing to do here
225 if status.is_some() {
226 continue
227 }
228
229 *status = unsafe { (*k).try_wait().unwrap() };
230 if status.is_some() {
231 notify(write);
232 }
233 }
234 }
235}
236
/// Reads everything currently buffered in the nonblocking `file`.
///
/// Returns `true` if any bytes were consumed or the peer has hung up (EOF),
/// and `false` if nothing was pending.
///
/// Panics on any read error other than `WouldBlock`.
fn drain(mut file: &UnixStream) -> bool {
    let mut saw_activity = false;
    let mut scratch = [0u8; 16];
    loop {
        let failure = match file.read(&mut scratch) {
            // EOF: the other end went away, which also counts as activity.
            Ok(0) => return true,
            Ok(_) => {
                saw_activity = true;
                continue;
            }
            Err(e) => e,
        };

        // Running out of buffered data is the normal way to stop.
        if failure.kind() != io::ErrorKind::WouldBlock {
            panic!("bad read: {}", failure)
        }
        return saw_activity;
    }
}
254
/// Writes a single wake-up byte (`1`) to the nonblocking `file`.
///
/// A full buffer (`WouldBlock`) is ignored: a pending byte is already there
/// to wake the reader. Any other write error panics.
fn notify(mut file: &UnixStream) {
    if let Err(e) = file.write(&[1]) {
        if e.kind() != io::ErrorKind::WouldBlock {
            panic!("bad error on write fd: {}", e)
        }
    }
}
265
266// Signal handler for SIGCHLD signals, must be async-signal-safe!
267//
268// This function will write to the writing half of the "self pipe" to wake
269// up the helper thread if it's waiting. Note that this write must be
270// nonblocking because if it blocks and the reader is the thread we
271// interrupted, then we'll deadlock.
272//
273// When writing, if the write returns EWOULDBLOCK then we choose to ignore
274// it. At that point we're guaranteed that there's something in the pipe
275// which will wake up the other end at some point, so we just allow this
276// signal to be coalesced with the pending signals on the pipe.
277#[allow(unused_assignments)]
278extern fn sigchld_handler(signum: c_int,
279 info: *mut libc::siginfo_t,
280 ptr: *mut libc::c_void) {
281 type FnSigaction = extern fn(c_int, *mut libc::siginfo_t, *mut libc::c_void);
282 type FnHandler = extern fn(c_int);
283
284 unsafe {
285 let state = &*STATE;
286 notify(&state.write);
287
288 let fnptr = state.prev.sa_sigaction;
289 if fnptr == 0 {
290 return
291 }
292 // FIXME: remove this workaround when the PR to libc get merged and released
293 //
294 // This is a workaround for the type mismatch in the definition of SA_*
295 // constants for android. See https://github.com/rust-lang/libc/pull/511
296 //
297 if state.prev.sa_flags & _as!(libc::SA_SIGINFO, state.prev.sa_flags) == 0 {
298 let action = mem::transmute::<usize, FnHandler>(fnptr);
299 action(signum)
300 } else {
301 let action = mem::transmute::<usize, FnSigaction>(fnptr);
302 action(signum, info, ptr)
303 }
304 }
305}