Page MenuHomec4science

PhageAgentAction.php
No OneTemporary

File Metadata

Created
Sat, May 18, 23:58

PhageAgentAction.php

<?php
abstract class PhageAgentAction
extends PhageAction {
private $future;
private $channel;
private $commands = array();
private $commandKey = 0;
private $isExiting = false;
private $isActive = false;
private $limit = 0;
private $throttle = 0;
private $waitUntil;
private $queued = array();
private $active = array();
final public function isContainerAction() {
return true;
}
protected function willAddAction(PhageAction $action) {
// TODO: You should probably be able to add agents to an agent, is that
// explicit or by making agents execute actions?
if (!($action instanceof PhageExecuteAction)) {
throw new Exception(
pht(
'Can only add execute actions to an agent.'));
}
if ($this->isExiting) {
throw new Exception(
pht(
'You can not add new actions to an exiting agent.'));
}
$key = 'command/'.$this->commandKey++;
$this->commands[$key] = array(
'key' => $key,
'command' => $action,
);
$this->queued[$key] = $key;
}
public function isActiveAgent() {
return $this->isActive;
}
final public function setLimit($limit) {
$this->limit = $limit;
return $this;
}
final public function getLimit() {
return $this->limit;
}
final public function setThrottle($throttle) {
$this->throttle = $throttle;
return $this;
}
final public function getThrottle() {
return $this->throttle;
}
abstract protected function newAgentFuture(PhutilCommandString $command);
protected function getAllWaitingChannels() {
$channels = array();
if ($this->isActiveAgent()) {
$channels[] = $this->channel;
}
return $channels;
}
public function startAgent() {
$bootloader = new PhagePHPAgentBootloader();
$future = $this->newAgentFuture($bootloader->getBootCommand());
$future->write($bootloader->getBootSequence(), $keep_open = true);
$channel = new PhutilExecChannel($future);
$channel->setStderrHandler(array($this, 'didReadAgentStderr'));
$channel = new PhutilJSONProtocolChannel($channel);
$this->future = $future;
$this->channel = $channel;
$this->isActive = true;
}
private function updateQueue() {
// If we don't have anything waiting in queue, we have nothing to do.
if (!$this->queued) {
return false;
}
$now = microtime(true);
// If we're throttling commands and recently started one, don't start
// another one yet.
$throttle = $this->getThrottle();
if ($throttle) {
if ($this->waitUntil && ($now < $this->waitUntil)) {
return false;
}
}
// If we're limiting parallelism and the active list is full, don't
// start anything else yet.
$limit = $this->getLimit();
if ($limit) {
if (count($this->active) >= $limit) {
return false;
}
}
// Move a command out of the queue and tell the agent to execute it.
$key = head($this->queued);
unset($this->queued[$key]);
$this->active[$key] = $key;
$command = $this->commands[$key]['command'];
$channel = $this->getChannel();
$channel->write(
array(
'type' => 'EXEC',
'key' => $key,
'command' => $command->getCommand()->getUnmaskedString(),
));
if ($throttle) {
$this->waitUntil = ($now + $throttle);
}
return true;
}
private function getChannel() {
return $this->channel;
}
public function updateAgent() {
if (!$this->isActiveAgent()) {
return;
}
$channel = $this->channel;
while (true) {
do {
$did_update = $this->updateQueue();
} while ($did_update);
$is_open = $channel->update();
$message = $channel->read();
if ($message !== null) {
switch ($message['type']) {
case 'TEXT':
$key = $message['key'];
$this->writeOutput($key, $message['kind'], $message['text']);
break;
case 'RSLV':
$key = $message['key'];
$command = $this->commands[$key]['command'];
$this->writeOutput($key, 'stdout', $message['stdout']);
$this->writeOutput($key, 'stderr', $message['stderr']);
$exit_code = $message['err'];
$command->setExitCode($exit_code);
if ($exit_code != 0) {
$exit_code = $this->formatOutput(
pht(
'Command ("%s") exited nonzero ("%s")!',
$command->getCommand(),
$exit_code),
$command->getLabel());
fprintf(STDOUT, '%s', $exit_code);
}
unset($this->active[$key]);
if (!$this->active && !$this->queued) {
$channel->write(
array(
'type' => 'EXIT',
'key' => 'exit',
));
$this->isExiting = true;
break;
}
}
}
if (!$is_open) {
if ($this->isExiting) {
$this->isActive = false;
break;
} else {
throw new Exception(pht('Channel closed unexpectedly!'));
}
}
if ($message === null) {
break;
}
}
}
private function writeOutput($key, $kind, $text) {
if (!strlen($text)) {
return;
}
switch ($kind) {
case 'stdout':
$target = STDOUT;
break;
case 'stderr':
$target = STDERR;
break;
default:
throw new Exception(pht('Unknown output kind "%s".', $kind));
}
$command = $this->commands[$key]['command'];
$label = $command->getLabel();
if (!strlen($label)) {
$label = pht('Unknown Command');
}
$text = $this->formatOutput($text, $label);
fprintf($target, '%s', $text);
}
private function formatOutput($output, $context) {
$output = phutil_split_lines($output, false);
foreach ($output as $key => $line) {
$output[$key] = tsprintf("[%s] %R\n", $context, $line);
}
$output = implode('', $output);
return $output;
}
public function didReadAgentStderr($channel, $stderr) {
throw new Exception(
pht(
'Unexpected output on agent stderr: %s.',
$stderr));
}
}

Event Timeline