diff --git a/src/infrastructure/daemon/workers/PhabricatorWorker.php b/src/infrastructure/daemon/workers/PhabricatorWorker.php index c448e7221..473649ba1 100644 --- a/src/infrastructure/daemon/workers/PhabricatorWorker.php +++ b/src/infrastructure/daemon/workers/PhabricatorWorker.php @@ -1,286 +1,288 @@ <?php /** * @task config Configuring Retries and Failures */ abstract class PhabricatorWorker extends Phobject { private $data; private static $runAllTasksInProcess = false; private $queuedTasks = array(); private $currentWorkerTask; // NOTE: Lower priority numbers execute first. The priority numbers have to // have the same ordering that IDs do (lowest first) so MySQL can use a // multipart key across both of them efficiently. const PRIORITY_ALERTS = 1000; const PRIORITY_DEFAULT = 2000; const PRIORITY_BULK = 3000; const PRIORITY_IMPORT = 4000; /** * Special owner indicating that the task has yielded. */ const YIELD_OWNER = '(yield)'; /* -( Configuring Retries and Failures )----------------------------------- */ /** * Return the number of seconds this worker needs hold a lease on the task for * while it performs work. For most tasks you can leave this at `null`, which * will give you a default lease (currently 2 hours). * * For tasks which may take a very long time to complete, you should return * an upper bound on the amount of time the task may require. * * @return int|null Number of seconds this task needs to remain leased for, * or null for a default lease. * * @task config */ public function getRequiredLeaseTime() { return null; } /** * Return the maximum number of times this task may be retried before it is * considered permanently failed. By default, tasks retry indefinitely. You * can throw a @{class:PhabricatorWorkerPermanentFailureException} to cause an * immediate permanent failure. * * @return int|null Number of times the task will retry before permanent * failure. Return `null` to retry indefinitely. * * @task config */ public function getMaximumRetryCount() { return null; } /** * Return the number of seconds a task should wait after a failure before * retrying. For most tasks you can leave this at `null`, which will give you * a short default retry period (currently 60 seconds). * * @param PhabricatorWorkerTask The task itself. This object is probably * useful mostly to examine the failure count * if you want to implement staggered retries, * or to examine the execution exception if * you want to react to different failures in * different ways. * @return int|null Number of seconds to wait between retries, * or null for a default retry period * (currently 60 seconds). * * @task config */ public function getWaitBeforeRetry(PhabricatorWorkerTask $task) { return null; } public function setCurrentWorkerTask(PhabricatorWorkerTask $task) { $this->currentWorkerTask = $task; return $this; } public function getCurrentWorkerTask() { return $this->currentWorkerTask; } public function getCurrentWorkerTaskID() { $task = $this->getCurrentWorkerTask(); if (!$task) { return null; } return $task->getID(); } abstract protected function doWork(); final public function __construct($data) { $this->data = $data; } final protected function getTaskData() { return $this->data; } final protected function getTaskDataValue($key, $default = null) { $data = $this->getTaskData(); if (!is_array($data)) { throw new PhabricatorWorkerPermanentFailureException( pht('Expected task data to be a dictionary.')); } return idx($data, $key, $default); } final public function executeTask() { $this->doWork(); } final public static function scheduleTask( $task_class, $data, $options = array()) { PhutilTypeSpec::checkMap( $options, array( 'priority' => 'optional int|null', 'objectPHID' => 'optional string|null', 'delayUntil' => 'optional int|null', )); $priority = idx($options, 'priority'); if ($priority === null) { $priority = self::PRIORITY_DEFAULT; } $object_phid = idx($options, 'objectPHID'); $task = id(new PhabricatorWorkerActiveTask()) ->setTaskClass($task_class) ->setData($data) ->setPriority($priority) ->setObjectPHID($object_phid); $delay = idx($options, 'delayUntil'); if ($delay) { $task->setLeaseExpires($delay); } if (self::$runAllTasksInProcess) { // Do the work in-process. $worker = newv($task_class, array($data)); while (true) { try { $worker->doWork(); foreach ($worker->getQueuedTasks() as $queued_task) { - list($queued_class, $queued_data, $queued_priority) = $queued_task; - $queued_options = array('priority' => $queued_priority); + list($queued_class, $queued_data, $queued_options) = $queued_task; self::scheduleTask($queued_class, $queued_data, $queued_options); } break; } catch (PhabricatorWorkerYieldException $ex) { phlog( pht( 'In-process task "%s" yielded for %s seconds, sleeping...', $task_class, $ex->getDuration())); sleep($ex->getDuration()); } } // Now, save a task row and immediately archive it so we can return an // object with a valid ID. $task->openTransaction(); $task->save(); $archived = $task->archiveTask( PhabricatorWorkerArchiveTask::RESULT_SUCCESS, 0); $task->saveTransaction(); return $archived; } else { $task->save(); return $task; } } public function renderForDisplay(PhabricatorUser $viewer) { return null; } /** * Set this flag to execute scheduled tasks synchronously, in the same * process. This is useful for debugging, and otherwise dramatically worse * in every way imaginable. */ public static function setRunAllTasksInProcess($all) { self::$runAllTasksInProcess = $all; } final protected function log($pattern /* , ... */) { $console = PhutilConsole::getConsole(); $argv = func_get_args(); call_user_func_array(array($console, 'writeLog'), $argv); return $this; } /** * Queue a task to be executed after this one succeeds. * * The followup task will be queued only if this task completes cleanly. * * @param string Task class to queue. * @param array Data for the followup task. - * @param int|null Priority for the followup task. + * @param array Options for the followup task. * @return this */ - final protected function queueTask($class, array $data, $priority = null) { - $this->queuedTasks[] = array($class, $data, $priority); + final protected function queueTask( + $class, + array $data, + array $options = array()) { + $this->queuedTasks[] = array($class, $data, $options); return $this; } /** * Get tasks queued as followups by @{method:queueTask}. * * @return list<tuple<string, wild, int|null>> Queued task specifications. */ final public function getQueuedTasks() { return $this->queuedTasks; } /** * Awaken tasks that have yielded. * * Reschedules the specified tasks if they are currently queued in a yielded, * unleased, unretried state so they'll execute sooner. This can let the * queue avoid unnecessary waits. * * This method does not provide any assurances about when these tasks will * execute, or even guarantee that it will have any effect at all. * * @param list<id> List of task IDs to try to awaken. * @return void */ final public static function awakenTaskIDs(array $ids) { if (!$ids) { return; } $table = new PhabricatorWorkerActiveTask(); $conn_w = $table->establishConnection('w'); // NOTE: At least for now, we're keeping these tasks yielded, just // pretending that they threw a shorter yield than they really did. // Overlap the windows here to handle minor client/server time differences // and because it's likely correct to push these tasks to the head of their // respective priorities. There is a good chance they are ready to execute. $window = phutil_units('1 hour in seconds'); $epoch_ago = (PhabricatorTime::getNow() - $window); queryfx( $conn_w, 'UPDATE %T SET leaseExpires = %d WHERE id IN (%Ld) AND leaseOwner = %s AND leaseExpires > %d AND failureCount = 0', $table->getTableName(), $epoch_ago, $ids, self::YIELD_OWNER, $epoch_ago); } } diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php index 6c2f5b446..57a1794ec 100644 --- a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php +++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php @@ -1,233 +1,234 @@ <?php final class PhabricatorWorkerActiveTask extends PhabricatorWorkerTask { protected $failureTime; private $serverTime; private $localTime; protected function getConfiguration() { $parent = parent::getConfiguration(); $config = array( self::CONFIG_IDS => self::IDS_COUNTER, self::CONFIG_TIMESTAMPS => false, self::CONFIG_KEY_SCHEMA => array( 'dataID' => array( 'columns' => array('dataID'), 'unique' => true, ), 'taskClass' => array( 'columns' => array('taskClass'), ), 'leaseExpires' => array( 'columns' => array('leaseExpires'), ), 'leaseOwner' => array( 'columns' => array('leaseOwner(16)'), ), 'key_failuretime' => array( 'columns' => array('failureTime'), ), 'leaseOwner_2' => array( 'columns' => array('leaseOwner', 'priority', 'id'), ), ) + $parent[self::CONFIG_KEY_SCHEMA], ); $config[self::CONFIG_COLUMN_SCHEMA] = array( // T6203/NULLABILITY // This isn't nullable in the archive table, so at a minimum these // should be the same. 'dataID' => 'uint32?', ) + $parent[self::CONFIG_COLUMN_SCHEMA]; return $config + $parent; } public function setServerTime($server_time) { $this->serverTime = $server_time; $this->localTime = time(); return $this; } public function setLeaseDuration($lease_duration) { $this->checkLease(); $server_lease_expires = $this->serverTime + $lease_duration; $this->setLeaseExpires($server_lease_expires); // NOTE: This is primarily to allow unit tests to set negative lease // durations so they don't have to wait around for leases to expire. We // check that the lease is valid above. return $this->forceSaveWithoutLease(); } public function save() { $this->checkLease(); return $this->forceSaveWithoutLease(); } public function forceSaveWithoutLease() { $is_new = !$this->getID(); if ($is_new) { $this->failureCount = 0; } if ($is_new && ($this->getData() !== null)) { $data = new PhabricatorWorkerTaskData(); $data->setData($this->getData()); $data->save(); $this->setDataID($data->getID()); } return parent::save(); } protected function checkLease() { $owner = $this->leaseOwner; if (!$owner) { return; } if ($owner == PhabricatorWorker::YIELD_OWNER) { return; } $current_server_time = $this->serverTime + (time() - $this->localTime); if ($current_server_time >= $this->leaseExpires) { throw new Exception( pht( 'Trying to update Task %d (%s) after lease expiration!', $this->getID(), $this->getTaskClass())); } } public function delete() { throw new Exception( pht( 'Active tasks can not be deleted directly. '. 'Use %s to move tasks to the archive.', 'archiveTask()')); } public function archiveTask($result, $duration) { if ($this->getID() === null) { throw new Exception( pht("Attempting to archive a task which hasn't been saved!")); } $this->checkLease(); $archive = id(new PhabricatorWorkerArchiveTask()) ->setID($this->getID()) ->setTaskClass($this->getTaskClass()) ->setLeaseOwner($this->getLeaseOwner()) ->setLeaseExpires($this->getLeaseExpires()) ->setFailureCount($this->getFailureCount()) ->setDataID($this->getDataID()) ->setPriority($this->getPriority()) ->setObjectPHID($this->getObjectPHID()) ->setResult($result) ->setDuration($duration); // NOTE: This deletes the active task (this object)! $archive->save(); return $archive; } public function executeTask() { // We do this outside of the try .. catch because we don't have permission // to release the lease otherwise. $this->checkLease(); $did_succeed = false; $worker = null; try { $worker = $this->getWorkerInstance(); $worker->setCurrentWorkerTask($this); $maximum_failures = $worker->getMaximumRetryCount(); if ($maximum_failures !== null) { if ($this->getFailureCount() > $maximum_failures) { throw new PhabricatorWorkerPermanentFailureException( pht( 'Task % has exceeded the maximum number of failures (%d).', $this->getID(), $maximum_failures)); } } $lease = $worker->getRequiredLeaseTime(); if ($lease !== null) { $this->setLeaseDuration($lease); } $t_start = microtime(true); $worker->executeTask(); $t_end = microtime(true); $duration = (int)(1000000 * ($t_end - $t_start)); $result = $this->archiveTask( PhabricatorWorkerArchiveTask::RESULT_SUCCESS, $duration); $did_succeed = true; } catch (PhabricatorWorkerPermanentFailureException $ex) { $result = $this->archiveTask( PhabricatorWorkerArchiveTask::RESULT_FAILURE, 0); $result->setExecutionException($ex); } catch (PhabricatorWorkerYieldException $ex) { $this->setExecutionException($ex); $this->setLeaseOwner(PhabricatorWorker::YIELD_OWNER); $retry = $ex->getDuration(); $retry = max($retry, 5); // NOTE: As a side effect, this saves the object. $this->setLeaseDuration($retry); $result = $this; } catch (Exception $ex) { $this->setExecutionException($ex); $this->setFailureCount($this->getFailureCount() + 1); $this->setFailureTime(time()); $retry = null; if ($worker) { $retry = $worker->getWaitBeforeRetry($this); } $retry = coalesce( $retry, PhabricatorWorkerLeaseQuery::getDefaultWaitBeforeRetry()); // NOTE: As a side effect, this saves the object. $this->setLeaseDuration($retry); $result = $this; } // NOTE: If this throws, we don't want it to cause the task to fail again, // so execute it out here and just let the exception escape. if ($did_succeed) { foreach ($worker->getQueuedTasks() as $task) { - list($class, $data) = $task; - PhabricatorWorker::scheduleTask( - $class, - $data, - array( - 'priority' => (int)$this->getPriority(), - )); + list($class, $data, $options) = $task; + + // Default the new task priority to our own priority. + $options = $options + array( + 'priority' => (int)$this->getPriority(), + ); + + PhabricatorWorker::scheduleTask($class, $data, $options); } } return $result; } }