Page MenuHomec4science

PhutilDaemon.php
No OneTemporary

File Metadata

Created
Sat, Apr 27, 15:11

PhutilDaemon.php

<?php
/**
* Scaffolding for implementing robust background processing scripts.
*
*
* Autoscaling
* ===========
*
* Autoscaling automatically launches copies of a daemon when it is busy
* (scaling the pool up) and stops them when they're idle (scaling the pool
* down). This is appropriate for daemons which perform highly parallelizable
* work.
*
* To make a daemon support autoscaling, the implementation should look
* something like this:
*
*   while (!$this->shouldExit()) {
*     if (work_available()) {
*       $this->willBeginWork();
*       do_work();
*       $this->sleep(0);
*     } else {
*       $this->willBeginIdle();
*       $this->sleep(1);
*     }
*   }
*
* In particular, call @{method:willBeginWork} before becoming busy, and
* @{method:willBeginIdle} when no work is available. If the daemon is launched
* into an autoscale pool, this will cause the pool to automatically scale up
* when busy and down when idle.
*
* See @{class:PhutilHighIntensityIntervalDaemon} for an example of a simple
* autoscaling daemon.
*
* Launching a daemon into an autoscale pool has no effect if the daemon does
* not make these callbacks.
*
* @task overseer Communicating With the Overseer
* @task autoscale Autoscaling Daemon Pools
*/
abstract class PhutilDaemon extends Phobject {

  // Message types sent to the overseer on stdout. Each message is one line of
  // JSON; see encodeOverseerMessage().
  const MESSAGETYPE_STDOUT = 'stdout';
  const MESSAGETYPE_HEARTBEAT = 'heartbeat';
  const MESSAGETYPE_BUSY = 'busy';
  const MESSAGETYPE_IDLE = 'idle';
  const MESSAGETYPE_DOWN = 'down';
  const MESSAGETYPE_HIBERNATE = 'hibernate';

  // Values of $workState, set by willBeginWork() and willBeginIdle().
  const WORKSTATE_BUSY = 'busy';
  const WORKSTATE_IDLE = 'idle';

  private $argv;
  private $traceMode;
  private $traceMemory;
  private $verbose;

  // Set by onNotifySignal(); cleared at the start of every sleep().
  private $notifyReceived;

  // Set by onGracefulSignal(), or by sleep() when scaling the pool down.
  private $inGracefulShutdown;

  // One of the WORKSTATE_* constants, or null before the first transition.
  private $workState = null;

  // Unix timestamp at which the daemon last became idle, or null when busy.
  private $idleSince = null;

  private $scaledownDuration;


  /**
   * Enable or disable verbose logging; see @{method:log}.
   *
   * @param bool $verbose True to write log messages to stderr.
   * @return this
   */
  final public function setVerbose($verbose) {
    $this->verbose = $verbose;
    return $this;
  }

  /**
   * @return bool|null The value passed to @{method:setVerbose}, if any.
   */
  final public function getVerbose() {
    return $this->verbose;
  }

  /**
   * Set how long the daemon may stay idle before @{method:sleep} scales the
   * pool down by shutting this daemon down.
   *
   * @param int $scaledown_duration Idle time, in seconds. A falsey value
   *   disables scaledown.
   * @return this
   * @task autoscale
   */
  final public function setScaledownDuration($scaledown_duration) {
    $this->scaledownDuration = $scaledown_duration;
    return $this;
  }

  /**
   * @return int|null The value passed to @{method:setScaledownDuration}.
   * @task autoscale
   */
  final public function getScaledownDuration() {
    return $this->scaledownDuration;
  }

  /**
   * Construct a daemon, install its signal handlers and begin capturing
   * stdout so output is delivered to the overseer as structured messages.
   *
   * @param list<string> $argv Arguments for the daemon; see
   *   @{method:getArgv}.
   */
  final public function __construct(array $argv) {
    $this->argv = $argv;

    // The SIGTERM handler is static and registered under a fixed key, so it
    // is installed only once no matter how many daemons are constructed.
    $router = PhutilSignalRouter::getRouter();
    $handler_key = 'daemon.term';
    if (!$router->getHandler($handler_key)) {
      $handler = new PhutilCallbackSignalHandler(
        SIGTERM,
        __CLASS__.'::onTermSignal');
      $router->installHandler($handler_key, $handler);
    }

    pcntl_signal(SIGINT, array($this, 'onGracefulSignal'));
    pcntl_signal(SIGUSR2, array($this, 'onNotifySignal'));

    // Without discard mode, this consumes unbounded amounts of memory. Keep
    // memory bounded.
    PhutilServiceProfiler::getInstance()->enableDiscardMode();

    $this->beginStdoutCapture();
  }

  /**
   * Stop capturing stdout when the daemon object is destroyed.
   */
  final public function __destruct() {
    $this->endStdoutCapture();
  }

  /**
   * Send a heartbeat to the overseer. If memory tracing is enabled, also
   * write the current memory usage to stderr.
   *
   * @return void
   * @task overseer
   */
  final public function stillWorking() {
    $this->emitOverseerMessage(self::MESSAGETYPE_HEARTBEAT, null);

    if ($this->traceMemory) {
      $daemon = get_class($this);
      fprintf(
        STDERR,
        "%s %s %s\n",
        '<RAMS>',
        $daemon,
        pht(
          'Memory Usage: %s KB',
          new PhutilNumber(memory_get_usage() / 1024, 1)));
    }
  }

  /**
   * Test whether the daemon has been asked to shut down gracefully, either
   * by a signal (see @{method:onGracefulSignal}) or because it is scaling
   * its pool down (see @{method:sleep}).
   *
   * @return bool True if the main loop should stop.
   */
  final public function shouldExit() {
    // The flag is unset (null) until a shutdown is requested; always give
    // callers a real boolean.
    return (bool)$this->inGracefulShutdown;
  }

  /**
   * Decide whether to hibernate instead of sleeping, and tell the overseer
   * if so.
   *
   * Hibernation is declined for sleeps shorter than 30 seconds and for
   * daemons with a scaledown duration. Otherwise, a hibernate message is sent
   * with the duration capped at three minutes.
   *
   * @param int $duration Intended sleep duration, in seconds.
   * @return bool True if a hibernate message was sent.
   * @task overseer
   */
  final protected function shouldHibernate($duration) {
    // Don't hibernate if we don't have very long to sleep.
    if ($duration < 30) {
      return false;
    }

    // Never hibernate if we're part of a pool and could scale down instead.
    // We only hibernate the last process to drop the pool size to zero.
    if ($this->getScaledownDuration()) {
      return false;
    }

    // Don't hibernate for too long.
    $duration = min($duration, phutil_units('3 minutes in seconds'));

    $this->emitOverseerMessage(
      self::MESSAGETYPE_HIBERNATE,
      array(
        'duration' => $duration,
      ));

    $this->log(
      pht(
        'Preparing to hibernate for %s second(s).',
        new PhutilNumber($duration)));

    return true;
  }

  /**
   * Sleep for up to `$duration` seconds while keeping the overseer informed.
   *
   * The sleep is split into slices of at most 60 seconds (or the scaledown
   * duration, if that is shorter). A heartbeat is sent before sleeping and
   * after every slice. The loop stops early once a notify signal has been
   * recorded, once a graceful shutdown has been requested, or when an idle
   * daemon exceeds its scaledown duration. In the last case a "down" message
   * is sent and the daemon is flagged for shutdown.
   *
   * @param int $duration Time to sleep, in seconds.
   * @return void
   */
  final protected function sleep($duration) {
    $this->notifyReceived = false;
    $this->willSleep($duration);
    $this->stillWorking();

    $scale_down = $this->getScaledownDuration();

    $max_sleep = 60;
    if ($scale_down) {
      // Wake up at least once per scaledown period so the idle check in the
      // loop below runs often enough.
      $max_sleep = min($max_sleep, $scale_down);

      if ($this->workState === self::WORKSTATE_IDLE) {
        $dur = $this->getIdleDuration();
        $this->log(pht('Idle for %s seconds.', $dur));
      }
    }

    while ($duration > 0 &&
      !$this->notifyReceived &&
      !$this->shouldExit()) {

      // If this is an autoscaling clone and we've been idle for too long,
      // we're going to scale the pool down by exiting and not restarting. The
      // DOWN message tells the overseer that we don't want to be restarted.
      $idle_too_long =
        $scale_down &&
        ($this->workState === self::WORKSTATE_IDLE) &&
        $this->idleSince &&
        ($this->idleSince + $scale_down < time());

      if ($idle_too_long) {
        $this->inGracefulShutdown = true;
        $this->emitOverseerMessage(self::MESSAGETYPE_DOWN, null);
        $this->log(
          pht(
            'Daemon was idle for more than %s second(s), '.
            'scaling pool down.',
            new PhutilNumber($scale_down)));
        break;
      }

      sleep(min($duration, $max_sleep));

      // Subtracting the full slice may drive the remainder negative on the
      // final pass; that simply ends the loop.
      $duration -= $max_sleep;

      $this->stillWorking();
    }
  }

  /**
   * Hook for subclasses, called at the start of @{method:sleep}.
   *
   * @param int $duration Requested sleep duration, in seconds.
   * @return void
   */
  protected function willSleep($duration) {
    return;
  }

  /**
   * Static callback installed as the SIGTERM handler; reports the signal on
   * stderr.
   *
   * @param int $signo Signal number.
   * @return void
   */
  public static function onTermSignal($signo) {
    self::didCatchSignal($signo);
  }

  /**
   * @return list<string> The arguments passed to the constructor.
   */
  final protected function getArgv() {
    return $this->argv;
  }

  /**
   * Run the daemon: invoke the @{method:willRun} hook, then @{method:run}.
   *
   * @return void
   */
  final public function execute() {
    $this->willRun();
    $this->run();
  }

  /**
   * Main body of the daemon, implemented by subclasses.
   *
   * @return void
   */
  abstract protected function run();

  /**
   * Enable memory tracing: every heartbeat also writes the current memory
   * usage to stderr. See @{method:stillWorking}.
   *
   * @return this
   */
  final public function setTraceMemory() {
    $this->traceMemory = true;
    return $this;
  }

  /**
   * @return bool|null True once @{method:setTraceMemory} has been called.
   */
  final public function getTraceMemory() {
    return $this->traceMemory;
  }

  /**
   * Enable trace mode: install the service profiler echo listener, enable
   * logging on the console server, and invoke @{method:didSetTraceMode}.
   *
   * @return this
   */
  final public function setTraceMode() {
    $this->traceMode = true;
    PhutilServiceProfiler::installEchoListener();
    PhutilConsole::getConsole()->getServer()->setEnableLog(true);
    $this->didSetTraceMode();
    return $this;
  }

  /**
   * @return bool|null True once @{method:setTraceMode} has been called.
   */
  final public function getTraceMode() {
    return $this->traceMode;
  }

  /**
   * SIGINT handler: report the signal and request a graceful shutdown, which
   * @{method:shouldExit} then reports.
   *
   * @param int $signo Signal number.
   * @return void
   */
  final public function onGracefulSignal($signo) {
    self::didCatchSignal($signo);
    $this->inGracefulShutdown = true;
  }

  /**
   * SIGUSR2 handler: report the signal, record the notification so that
   * @{method:sleep} stops waiting, and invoke the @{method:onNotify} hook.
   *
   * @param int $signo Signal number.
   * @return void
   */
  final public function onNotifySignal($signo) {
    self::didCatchSignal($signo);
    $this->notifyReceived = true;
    $this->onNotify($signo);
  }

  /**
   * Hook for subclasses, called when a notify signal is received.
   *
   * @param int $signo Signal number.
   * @return void
   */
  protected function onNotify($signo) {
    // This is a hook for subclasses.
  }

  /**
   * Hook for subclasses, called by @{method:execute} before @{method:run}.
   *
   * @return void
   */
  protected function willRun() {
    // This is a hook for subclasses.
  }

  /**
   * Hook for subclasses, called after trace mode has been enabled.
   *
   * @return void
   */
  protected function didSetTraceMode() {
    // This is a hook for subclasses.
  }

  /**
   * Write a message to stderr, tagged with the daemon class name, if verbose
   * mode is enabled. Otherwise, do nothing.
   *
   * @param string $message Message to log.
   * @return void
   */
  final protected function log($message) {
    if (!$this->verbose) {
      return;
    }

    $daemon = get_class($this);
    fprintf(STDERR, "%s %s %s\n", '<VERB>', $daemon, $message);
  }

  /**
   * Report a caught signal (number and name) on stderr.
   *
   * @param int $signo Signal number.
   * @return void
   */
  private static function didCatchSignal($signo) {
    $signame = phutil_get_signal_name($signo);
    fprintf(
      STDERR,
      "%s Caught signal %s (%s).\n",
      '<SGNL>',
      $signo,
      $signame);
  }


/* -( Communicating With the Overseer )------------------------------------ */


  /**
   * Start an output buffer which passes everything written to stdout through
   * @{method:didReceiveStdout}.
   *
   * @return void
   * @task overseer
   */
  private function beginStdoutCapture() {
    ob_start(array($this, 'didReceiveStdout'), 2);
  }

  /**
   * Flush and close the active output buffer, if there is one.
   *
   * @return void
   * @task overseer
   */
  private function endStdoutCapture() {
    // ob_end_flush() raises an E_NOTICE when no buffer is active (for
    // example, if output buffering has already been torn down by the time
    // the destructor runs), so only end a buffer when one exists.
    if (ob_get_level() > 0) {
      ob_end_flush();
    }
  }

  /**
   * Output buffer callback: wrap captured output in a "stdout" overseer
   * message. Empty chunks produce no output.
   *
   * @param string $data Captured output.
   * @return string Encoded message, or the empty string.
   * @task overseer
   */
  public function didReceiveStdout($data) {
    if (!strlen($data)) {
      return '';
    }

    return $this->encodeOverseerMessage(self::MESSAGETYPE_STDOUT, $data);
  }

  /**
   * Encode a message for the overseer as a single line of JSON: a list
   * holding the message type, followed by the payload if it is not null.
   *
   * @param string $type One of the MESSAGETYPE_* constants.
   * @param mixed $data Message payload, or null for none.
   * @return string JSON-encoded message terminated by a newline.
   * @task overseer
   */
  private function encodeOverseerMessage($type, $data) {
    $structure = array($type);
    if ($data !== null) {
      $structure[] = $data;
    }

    $json = json_encode($structure);

    if ($json === false) {
      // json_encode() fails outright on payloads it can not represent, most
      // commonly captured output containing bytes which are not valid UTF-8.
      // Retry leniently so the overseer still receives a well-formed message
      // instead of a bare newline. The flags are only available on newer
      // runtimes, so check for them before use.
      $flags = 0;
      if (defined('JSON_INVALID_UTF8_SUBSTITUTE')) {
        $flags |= JSON_INVALID_UTF8_SUBSTITUTE;
      }
      if (defined('JSON_PARTIAL_OUTPUT_ON_ERROR')) {
        $flags |= JSON_PARTIAL_OUTPUT_ON_ERROR;
      }

      if ($flags) {
        $json = json_encode($structure, $flags);
      }
    }

    return $json."\n";
  }

  /**
   * Send a message to the overseer by writing it directly to stdout.
   *
   * Capture is suspended around the write so the message is not itself
   * wrapped in a "stdout" message by @{method:didReceiveStdout}.
   *
   * @param string $type One of the MESSAGETYPE_* constants.
   * @param mixed $data Message payload, or null for none.
   * @return void
   * @task overseer
   */
  private function emitOverseerMessage($type, $data) {
    $this->endStdoutCapture();
    echo $this->encodeOverseerMessage($type, $data);
    $this->beginStdoutCapture();
  }

  /**
   * Error listener which echoes errors (and stack traces, when available) to
   * stderr.
   *
   * @param mixed $event Unused.
   * @param mixed $value Unused.
   * @param map<string, wild> $metadata Error metadata; the `default_message`
   *   and `trace` keys are read.
   * @return void
   * @task overseer
   */
  public static function errorListener($event, $value, array $metadata) {
    // If the caller has redirected the error log to a file, PHP won't output
    // messages to stderr, so the overseer can't capture them. Install a
    // listener which just echoes errors to stderr, so the overseer is always
    // aware of errors.

    $console = PhutilConsole::getConsole();

    $message = idx($metadata, 'default_message');
    if ($message) {
      $console->writeErr("%s\n", $message);
    }

    if (idx($metadata, 'trace')) {
      $trace = PhutilErrorHandler::formatStacktrace($metadata['trace']);
      $console->writeErr("%s\n", $trace);
    }
  }


/* -( Autoscaling )-------------------------------------------------------- */


  /**
   * Prepare to become busy. This may autoscale the pool up.
   *
   * This notifies the overseer that the daemon has become busy. If daemons
   * that are part of an autoscale pool are continuously busy for a prolonged
   * period of time, the overseer may scale up the pool.
   *
   * The notification is only sent on a transition into the busy state;
   * repeated calls while already busy do nothing.
   *
   * @return this
   * @task autoscale
   */
  protected function willBeginWork() {
    if ($this->workState !== self::WORKSTATE_BUSY) {
      $this->workState = self::WORKSTATE_BUSY;
      $this->idleSince = null;
      $this->emitOverseerMessage(self::MESSAGETYPE_BUSY, null);
    }

    return $this;
  }

  /**
   * Prepare to idle. This may autoscale the pool down.
   *
   * This notifies the overseer that the daemon is no longer busy. If daemons
   * that are part of an autoscale pool are idle for a prolonged period of
   * time, they may exit to scale the pool down.
   *
   * The notification is only sent on a transition into the idle state, which
   * also records the time at which the daemon became idle; repeated calls
   * while already idle do nothing.
   *
   * @return this
   * @task autoscale
   */
  protected function willBeginIdle() {
    if ($this->workState !== self::WORKSTATE_IDLE) {
      $this->workState = self::WORKSTATE_IDLE;
      $this->idleSince = time();
      $this->emitOverseerMessage(self::MESSAGETYPE_IDLE, null);
    }

    return $this;
  }

  /**
   * Get how long the daemon has been idle.
   *
   * @return int|null Seconds since the daemon last became idle, or null if
   *   no idle start time is recorded.
   * @task autoscale
   */
  protected function getIdleDuration() {
    if (!$this->idleSince) {
      return null;
    }

    return (time() - $this->idleSince);
  }

}

Event Timeline