diff --git a/src/future/exec/ExecFuture.php b/src/future/exec/ExecFuture.php index 186e379..0f9f3ba 100644 --- a/src/future/exec/ExecFuture.php +++ b/src/future/exec/ExecFuture.php @@ -1,674 +1,686 @@ array('pipe', 'r'), // stdin 1 => array('pipe', 'w'), // stdout 2 => array('pipe', 'w'), // stderr ); /* -( Creating ExecFutures )----------------------------------------------- */ /** * Create a new ExecFuture. * * $future = new ExecFuture('wc -l %s', $file_path); * * @param string ##sprintf()##-style command string which will be passed * through @{function:csprintf} with the rest of the arguments. * @param ... Zero or more additional arguments for @{function:csprintf}. * @return ExecFuture ExecFuture for running the specified command. * @task create */ public function __construct($command) { $argv = func_get_args(); $this->command = call_user_func_array('csprintf', $argv); } /* -( Command Information )------------------------------------------------ */ /** * Retrieve the raw command to be executed. * * @return string Raw command. * @task info */ public function getCommand() { return $this->command; } /** * Retrieve the byte limit for the stderr buffer. * * @return int Maximum buffer size, in bytes. * @task info */ public function getStderrSizeLimit() { return $this->stderrSizeLimit; } /** * Retrieve the byte limit for the stdout buffer. * * @return int Maximum buffer size, in bytes. * @task info */ public function getStdoutSizeLimit() { return $this->stdoutSizeLimit; } /** * Get the process's pid. This only works after execution is initiated, e.g. * by a call to start(). * * @return int Process ID of the executing process. * @task info */ public function getPID() { $status = $this->procGetStatus(); return $status['pid']; } /* -( Configuring Execution )---------------------------------------------- */ /** * Set a maximum size for the stdout read buffer. To limit stderr, see * @{method:setStderrSizeLimit}. The major use of these methods is to use less * memory if you are running a command which sometimes produces huge volumes * of output that you don't really care about. * * NOTE: Setting this to 0 means "no buffer", not "unlimited buffer". * * @param int Maximum size of the stdout read buffer. * @return this * @task config */ public function setStdoutSizeLimit($limit) { $this->stdoutSizeLimit = $limit; return $this; } /** * Set a maximum size for the stderr read buffer. * See @{method:setStdoutSizeLimit} for discussion. * * @param int Maximum size of the stderr read buffer. * @return this * @task config */ public function setStderrSizeLimit($limit) { $this->stderrSizeLimit = $limit; return $this; } /** * Set the current working directory to use when executing the command. * * @param string Directory to set as CWD before executing the command. * @return this * @task config */ public function setCWD($cwd) { $this->cwd = $cwd; return $this; } /* -( Interacting With Commands )------------------------------------------ */ /** * Read and return output from stdout and stderr, if any is available. This * method keeps a read cursor on each stream, but the entire streams are * still returned when the future resolves. You can call read() again after * resolving the future to retrieve only the parts of the streams you did not * previously read: * * $future = new ExecFuture('...'); * // ... * list($stdout) = $future->read(); // Returns output so far * list($stdout) = $future->read(); // Returns new output since first call * // ... * list($stdout) = $future->resolvex(); // Returns ALL output * list($stdout) = $future->read(); // Returns unread output * * NOTE: If you set a limit with @{method:setStdoutSizeLimit} or * @{method:setStderrSizeLimit}, this method will not be able to read data * past the limit. * * NOTE: If you call @{method:discardBuffers}, all the stdout/stderr data * will be thrown away and the cursors will be reset. * * @return pair <$stdout, $stderr> pair with new output since the last call * to this method. * @task interact */ public function read() { if ($this->start) { $this->isReady(); // Sync } $result = array( (string)substr($this->stdout, $this->stdoutPos), (string)substr($this->stderr, $this->stderrPos), ); $this->stdoutPos = strlen($this->stdout); $this->stderrPos = strlen($this->stderr); return $result; } /** * Write data to stdin of the command. * * @param string Data to write. * @param bool If true, keep the pipe open for writing. By default, the pipe * will be closed as soon as possible so that commands which * listen for EOF will execute. If you want to keep the pipe open * past the start of command execution, do an empty write with * `$keep_pipe = true` first. * @return this * @task interact */ public function write($data, $keep_pipe = false) { $this->stdin .= $data; $this->closePipe = !$keep_pipe; return $this; } /** * Permanently discard the stdout and stderr buffers and reset the read * cursors. This is basically useful only if you are streaming a large amount * of data from some process: * * $future = new ExecFuture('zcat huge_file.gz'); * do { * $done = $future->resolve(0.1); // Every 100ms, * list($stdout) = $future->read(); // read output... * echo $stdout; // send it somewhere... * $future->discardBuffers(); // and then free the buffers. * } while ($done === null); * * Conceivably you might also need to do this if you're writing a client using * ExecFuture and ##netcat##, but you probably should not do that. * * NOTE: This completely discards the data. It won't be available when the * future resolves. This is almost certainly only useful if you need the * buffer memory for some reason. * * @return this * @task interact */ public function discardBuffers() { $this->stdout = ''; $this->stderr = ''; $this->stdoutPos = 0; $this->stderrPos = 0; return $this; } /** * Returns true if this future was killed by a timeout configured with * @{method:setTimeout}. * * @return bool True if the future was killed for exceeding its time limit. */ public function getWasKilledByTimeout() { return $this->killedByTimeout; } /* -( Configuring Execution )---------------------------------------------- */ /** * Set a hard limit on execution time. If the command runs longer, it will * be killed and the future will resolve with an error code. You can test * if a future was killed by a timeout with @{method:getWasKilledByTimeout}. * * @param int Maximum number of seconds this command may execute for. * @return this * @task config */ public function setTimeout($seconds) { $this->timeout = $seconds; return $this; } /* -( Resolving Execution )------------------------------------------------ */ /** * Resolve a command you expect to exit with return code 0. Works like * @{method:resolve}, but throws if $err is nonempty. Returns only * $stdout and $stderr. See also @{function:execx}. * * list($stdout, $stderr) = $future->resolvex(); * * @param float Optional timeout after which resolution will pause and * execution will return to the caller. * @return pair <$stdout, $stderr> pair. * @task resolve */ public function resolvex($timeout = null) { list($err, $stdout, $stderr) = $this->resolve($timeout); if ($err) { $cmd = $this->command; throw new CommandException( "Command failed with error #{$err}!", $cmd, $err, $stdout, $stderr); } return array($stdout, $stderr); } /** * Resolve a command you expect to return valid JSON. Works like * @{method:resolvex}, but also throws if stderr is nonempty, or stdout is not * valid JSON. Returns a PHP array, decoded from the JSON command output. * * @param float Optional timeout after which resolution will pause and * execution will return to the caller. * @return array PHP array, decoded from JSON command output. * @task resolve */ public function resolveJSON($timeout = null) { list($stdout, $stderr) = $this->resolvex($timeout); if (strlen($stderr)) { $cmd = $this->command; throw new CommandException( "JSON command '{$cmd}' emitted text to stderr when none was expected: ". $stderr, $cmd, 0, $stdout, $stderr); } $object = json_decode($stdout, true); if (!is_array($object)) { $cmd = $this->command; throw new CommandException( "JSON command '{$cmd}' did not produce a valid JSON object on stdout: ". $stdout, $cmd, 0, $stdout, $stderr); } return $object; } /** * Resolve the process by abruptly terminating it. * * @return list List of results. * @task resolve */ public function resolveKill() { if (defined('SIGKILL')) { $signal = SIGKILL; } else { $signal = 9; } proc_terminate($this->proc, $signal); $this->result = array( 128 + $signal, $this->stdout, $this->stderr); - $this->__destruct(); - $this->endProfile(); + $this->closeProcess(); return $this->result; } /* -( Internals )---------------------------------------------------------- */ /** * Provides read sockets to the future core. * * @return list List of read sockets. * @task internal */ public function getReadSockets() { list($stdin, $stdout, $stderr) = $this->pipes; $sockets = array(); if (isset($stdout) && !feof($stdout)) { $sockets[] = $stdout; } if (isset($stderr) && !feof($stderr)) { $sockets[] = $stderr; } return $sockets; } /** * Provides write sockets to the future core. * * @return list List of write sockets. * @task internal */ public function getWriteSockets() { list($stdin, $stdout, $stderr) = $this->pipes; $sockets = array(); if (isset($stdin) && strlen($this->stdin) && !feof($stdin)) { $sockets[] = $stdin; } return $sockets; } /** * Reads some bytes from a stream, discarding output once a certain amount * has been accumulated. * * @param resource Stream to read from. * @param int Maximum number of bytes to return from $stream. If * additional bytes are available, they will be read and * discarded. * @param string Human-readable description of stream, for exception * message. * @return string The data read from the stream. * @task internal */ protected function readAndDiscard($stream, $limit, $description) { $output = ''; do { $data = fread($stream, 4096); if (false === $data) { throw new Exception('Failed to read from '.$description); } $read_bytes = strlen($data); if ($read_bytes > 0 && $limit > 0) { if ($read_bytes > $limit) { $data = substr($data, 0, $limit); } $output .= $data; $limit -= strlen($data); } } while ($read_bytes > 0); return $output; } /** * Begin or continue command execution. * * @return bool True if future has resolved. * @task internal */ public function isReady() { if (!$this->pipes) { $profiler = PhutilServiceProfiler::getInstance(); $this->profilerCallID = $profiler->beginServiceCall( array( 'type' => 'exec', 'command' => $this->command, )); if (!$this->start) { // We might already have started the timer via initating resolution. $this->start = microtime(true); } $pipes = array(); $proc = proc_open( $this->command, self::$descriptorSpec, $pipes, $this->cwd); if (!is_resource($proc)) { throw new Exception('Failed to open process.'); } $this->pipes = $pipes; $this->proc = $proc; list($stdin, $stdout, $stderr) = $pipes; if (!phutil_is_windows()) { // On Windows, there's no such thing as nonblocking interprocess I/O. // Just leave the sockets blocking and hope for the best. Some features // will not work. if ((!stream_set_blocking($stdout, false)) || (!stream_set_blocking($stderr, false)) || (!stream_set_blocking($stdin, false))) { $this->__destruct(); throw new Exception('Failed to set streams nonblocking.'); } } $this->tryToCloseStdin(); return false; } if (!$this->proc) { return true; } list($stdin, $stdout, $stderr) = $this->pipes; if (isset($this->stdin) && strlen($this->stdin)) { $bytes = fwrite($stdin, $this->stdin); if ($bytes === false) { throw new Exception('Unable to write to stdin!'); } else if ($bytes) { $this->stdin = substr($this->stdin, $bytes); } } $this->tryToCloseStdin(); // Read status before reading pipes so that we can never miss data that // arrives between our last read and the process exiting. $status = $this->procGetStatus(); $this->stdout .= $this->readAndDiscard( $stdout, $this->getStdoutSizeLimit() - strlen($this->stdout), 'stdout'); $this->stderr .= $this->readAndDiscard( $stderr, $this->getStderrSizeLimit() - strlen($this->stderr), 'stderr'); if (!$status['running']) { $this->result = array( $status['exitcode'], $this->stdout, $this->stderr, ); - $this->__destruct(); - $this->endProfile(); + $this->closeProcess(); return true; } $elapsed = (microtime(true) - $this->start); if ($this->timeout && ($elapsed >= $this->timeout)) { $this->killedByTimeout = true; $this->resolveKill(); return true; } } + /** + * @return void + * @task internal + */ + public function __destruct() { + if (!$this->proc) { + return; + } + + // NOTE: If we try to proc_close() an open process, we hang indefinitely. To + // avoid this, kill the process explicitly if it's still running. + + $status = $this->procGetStatus(); + if ($status['running']) { + $this->resolveKill(); + } else { + $this->closeProcess(); + } + } + + /** * Close and free resources if necessary. * * @return void * @task internal */ - public function __destruct() { + private function closeProcess() { foreach ($this->pipes as $pipe) { if (isset($pipe)) { @fclose($pipe); } } - $this->pipes = array(null, null, null); + $this->pipes = array(null, null, null); if ($this->proc) { @proc_close($this->proc); $this->proc = null; } $this->stdin = null; - } - - /** - * End the service call profiler for this command. - * - * @return void - * @task internal - */ - private function endProfile() { if ($this->profilerCallID !== null) { $profiler = PhutilServiceProfiler::getInstance(); $profiler->endServiceCall( $this->profilerCallID, array( 'err' => $this->result ? idx($this->result, 0) : null, )); + $this->profilerCallID = null; } } + /** * Execute proc_get_status(), but avoid pitfalls. * * @return dict Process status. * @task internal */ private function procGetStatus() { // After the process exits, we only get one chance to read proc_get_status() // before it starts returning garbage. Make sure we don't throw away the // last good read. if ($this->procStatus) { if (!$this->procStatus['running']) { return $this->procStatus; } } $this->procStatus = proc_get_status($this->proc); return $this->procStatus; } /** * Try to close stdin, if we're done using it. This keeps us from hanging if * the process on the other end of the pipe is waiting for EOF. * * @return void * @task internal */ private function tryToCloseStdin() { if (!$this->closePipe) { // We've been told to keep the pipe open by a call to write(..., true). return; } if (strlen($this->stdin)) { // We still have bytes to write. return; } list($stdin) = $this->pipes; if (!$stdin) { // We've already closed stdin. return; } // There's nothing stopping us from closing stdin, so close it. @fclose($stdin); $this->pipes[0] = null; } public function getDefaultWait() { $wait = parent::getDefaultWait(); if ($this->timeout) { if (!$this->start) { $this->start = microtime(true); } $elapsed = (microtime(true) - $this->start); $wait = max(0, min($this->timeout - $elapsed, $wait)); } return $wait; } } diff --git a/src/future/exec/__tests__/ExecFutureTestCase.php b/src/future/exec/__tests__/ExecFutureTestCase.php index 0b88f75..f4ef39b 100644 --- a/src/future/exec/__tests__/ExecFutureTestCase.php +++ b/src/future/exec/__tests__/ExecFutureTestCase.php @@ -1,100 +1,112 @@ write('')->resolvex(); $this->assertEqual('', $stdout); } public function testKeepPipe() { // NOTE: This is mosty testing the semantics of $keep_pipe in write(). list($stdout) = id(new ExecFuture('cat')) ->write('', true) ->start() ->write('x', true) ->write('y', true) ->write('z', false) ->resolvex(); $this->assertEqual('xyz', $stdout); } public function testLargeBuffer() { // NOTE: This is mostly a coverage test to hit branches where we're still // flushing a buffer. $data = str_repeat('x', 1024 * 1024 * 4); list($stdout) = id(new ExecFuture('cat'))->write($data)->resolvex(); $this->assertEqual($data, $stdout); } public function testBufferLimit() { $data = str_repeat('x', 1024 * 1024); list($stdout) = id(new ExecFuture('cat')) ->setStdoutSizeLimit(1024) ->write($data) ->resolvex(); $this->assertEqual(substr($data, 0, 1024), $stdout); } public function testResolveTimeoutTestShouldRunLessThan1Sec() { // NOTE: This tests interactions between the resolve() timeout and the // ExecFuture timeout, which are similar but not identical. $future = id(new ExecFuture('sleep 32000'))->start(); $future->setTimeout(32000); // We expect this to return in 0.01s. $result = $future->resolve(0.01); $this->assertEqual($result, null); // We expect this to now force the time out / kill immediately. If we don't // do this, we'll hang when exiting until our subprocess exits (32000 // seconds!) $future->setTimeout(0.01); $future->resolve(); } public function testTimeoutTestShouldRunLessThan1Sec() { // NOTE: This is partly testing that we choose appropriate select wait // times; this test should run for significantly less than 1 second. $future = new ExecFuture('sleep 32000'); list($err) = $future->setTimeout(0.01)->resolve(); $this->assertEqual(true, $err > 0); $this->assertEqual(true, $future->getWasKilledByTimeout()); } public function testMultipleTimeoutsTestShouldRunLessThan1Sec() { $futures = array(); for ($ii = 0; $ii < 4; $ii++) { $futures[] = id(new ExecFuture('sleep 32000'))->setTimeout(0.01); } foreach (Futures($futures) as $future) { list ($err) = $future->resolve(); $this->assertEqual(true, $err > 0); $this->assertEqual(true, $future->getWasKilledByTimeout()); } } + public function testNoHangOnExecFutureDestructionWithRunningChild() { + $start = microtime(true); + $future = new ExecFuture('sleep 30'); + $future->start(); + unset($future); + $end = microtime(true); + + // If ExecFuture::__destruct() hangs until the child closes, we won't make + // it here in time. + $this->assertEqual(true, ($end - $start) < 5); + } + }