diff --git a/src/applications/aphlict/management/PhabricatorAphlictManagementDebugWorkflow.php b/src/applications/aphlict/management/PhabricatorAphlictManagementDebugWorkflow.php index 510e95d4c..269301fdd 100644 --- a/src/applications/aphlict/management/PhabricatorAphlictManagementDebugWorkflow.php +++ b/src/applications/aphlict/management/PhabricatorAphlictManagementDebugWorkflow.php @@ -1,21 +1,24 @@ setName('debug') ->setSynopsis( pht( 'Start the notifications server in the foreground and print large '. - 'volumes of diagnostic information to the console.')) - ->setArguments(array()); + 'volumes of diagnostic information to the console.')); } public function execute(PhutilArgumentParser $args) { - $this->willLaunch(true); - return $this->launch(true); + parent::execute($args); + $this->setDebug(true); + + $this->willLaunch(); + return $this->launch(); } } diff --git a/src/applications/aphlict/management/PhabricatorAphlictManagementRestartWorkflow.php b/src/applications/aphlict/management/PhabricatorAphlictManagementRestartWorkflow.php index 4772587b5..8d96ecd98 100644 --- a/src/applications/aphlict/management/PhabricatorAphlictManagementRestartWorkflow.php +++ b/src/applications/aphlict/management/PhabricatorAphlictManagementRestartWorkflow.php @@ -1,21 +1,23 @@ setName('restart') - ->setSynopsis(pht('Stop, then start the notifications server.')) - ->setArguments(array()); + ->setSynopsis(pht('Stop, then start the notifications server.')); } public function execute(PhutilArgumentParser $args) { + parent::execute($args); + $err = $this->executeStopCommand(); if ($err) { return $err; } return $this->executeStartCommand(); } } diff --git a/src/applications/aphlict/management/PhabricatorAphlictManagementStartWorkflow.php b/src/applications/aphlict/management/PhabricatorAphlictManagementStartWorkflow.php index 38c9ab88e..3e1ab8a13 100644 --- a/src/applications/aphlict/management/PhabricatorAphlictManagementStartWorkflow.php +++ b/src/applications/aphlict/management/PhabricatorAphlictManagementStartWorkflow.php @@ -1,17 +1,18 @@ setName('start') - ->setSynopsis(pht('Start the notifications server.')) - ->setArguments(array()); + ->setSynopsis(pht('Start the notifications server.')); } public function execute(PhutilArgumentParser $args) { + parent::execute($args); return $this->executeStartCommand(); } } diff --git a/src/applications/aphlict/management/PhabricatorAphlictManagementWorkflow.php b/src/applications/aphlict/management/PhabricatorAphlictManagementWorkflow.php index abf77b11f..cc21d68d6 100644 --- a/src/applications/aphlict/management/PhabricatorAphlictManagementWorkflow.php +++ b/src/applications/aphlict/management/PhabricatorAphlictManagementWorkflow.php @@ -1,247 +1,275 @@ setArguments( + array( + array( + 'name' => 'client-host', + 'param' => 'hostname', + 'help' => pht('Hostname to bind to for the client server.'), + ), + )); + } + + public function execute(PhutilArgumentParser $args) { + $this->clientHost = $args->getArg('client-host'); + return 0; + } + final public function getPIDPath() { return PhabricatorEnv::getEnvConfig('notification.pidfile'); } final public function getPID() { $pid = null; if (Filesystem::pathExists($this->getPIDPath())) { $pid = (int)Filesystem::readFile($this->getPIDPath()); } return $pid; } final public function cleanup($signo = '?') { global $g_future; if ($g_future) { $g_future->resolveKill(); $g_future = null; } Filesystem::remove($this->getPIDPath()); exit(1); } + protected final function setDebug($debug) { + $this->debug = $debug; + } + public static function requireExtensions() { self::mustHaveExtension('pcntl'); self::mustHaveExtension('posix'); } private static function mustHaveExtension($ext) { if (!extension_loaded($ext)) { echo "ERROR: The PHP extension '{$ext}' is not installed. You must ". "install it to run aphlict on this machine.\n"; exit(1); } $extension = new ReflectionExtension($ext); foreach ($extension->getFunctions() as $function) { $function = $function->name; if (!function_exists($function)) { echo "ERROR: The PHP function {$function}() is disabled. You must ". "enable it to run aphlict on this machine.\n"; exit(1); } } } - final protected function willLaunch($debug = false) { + final protected function willLaunch() { $console = PhutilConsole::getConsole(); $pid = $this->getPID(); if ($pid) { throw new PhutilArgumentUsageException( pht( 'Unable to start notifications server because it is already '. 'running. Use `aphlict restart` to restart it.')); } if (posix_getuid() == 0) { throw new PhutilArgumentUsageException( pht( // TODO: Update this message after a while. 'The notification server should not be run as root. It no '. 'longer requires access to privileged ports.')); } // Make sure we can write to the PID file. - if (!$debug) { + if (!$this->debug) { Filesystem::writeFile($this->getPIDPath(), ''); } // First, start the server in configuration test mode with --test. This // will let us error explicitly if there are missing modules, before we // fork and lose access to the console. - $test_argv = $this->getServerArgv($debug); + $test_argv = $this->getServerArgv(); $test_argv[] = '--test=true'; execx( '%s %s %Ls', $this->getNodeBinary(), $this->getAphlictScriptPath(), $test_argv); } - private function getServerArgv($debug) { + private function getServerArgv() { $ssl_key = PhabricatorEnv::getEnvConfig('notification.ssl-key'); $ssl_cert = PhabricatorEnv::getEnvConfig('notification.ssl-cert'); $server_uri = PhabricatorEnv::getEnvConfig('notification.server-uri'); $server_uri = new PhutilURI($server_uri); $client_uri = PhabricatorEnv::getEnvConfig('notification.client-uri'); $client_uri = new PhutilURI($client_uri); $log = PhabricatorEnv::getEnvConfig('notification.log'); $server_argv = array(); - $server_argv[] = '--port='.$client_uri->getPort(); - $server_argv[] = '--admin='.$server_uri->getPort(); - $server_argv[] = '--host='.$server_uri->getDomain(); + $server_argv[] = '--client-port='.$client_uri->getPort(); + $server_argv[] = '--admin-port='.$server_uri->getPort(); + $server_argv[] = '--admin-host='.$server_uri->getDomain(); if ($ssl_key) { $server_argv[] = '--ssl-key='.$ssl_key; } if ($ssl_cert) { $server_argv[] = '--ssl-cert='.$ssl_cert; } - if (!$debug) { + if (!$this->debug) { $server_argv[] = '--log='.$log; } + if ($this->clientHost) { + $server_argv[] = '--client-host='.$this->clientHost; + } + return $server_argv; } private function getAphlictScriptPath() { $root = dirname(phutil_get_library_root('phabricator')); return $root.'/support/aphlict/server/aphlict_server.js'; } - final protected function launch($debug = false) { + final protected function launch() { $console = PhutilConsole::getConsole(); - if ($debug) { + if ($this->debug) { $console->writeOut(pht("Starting Aphlict server in foreground...\n")); } else { Filesystem::writeFile($this->getPIDPath(), getmypid()); } $command = csprintf( '%s %s %Ls', $this->getNodeBinary(), $this->getAphlictScriptPath(), - $this->getServerArgv($debug)); + $this->getServerArgv()); - if (!$debug) { + if (!$this->debug) { declare(ticks = 1); pcntl_signal(SIGINT, array($this, 'cleanup')); pcntl_signal(SIGTERM, array($this, 'cleanup')); } register_shutdown_function(array($this, 'cleanup')); - if ($debug) { + if ($this->debug) { $console->writeOut("Launching server:\n\n $ ".$command."\n\n"); $err = phutil_passthru('%C', $command); $console->writeOut(">>> Server exited!\n"); exit($err); } else { while (true) { global $g_future; $g_future = new ExecFuture('exec %C', $command); $g_future->resolve(); // If the server exited, wait a couple of seconds and restart it. unset($g_future); sleep(2); } } } /* -( Commands )----------------------------------------------------------- */ final protected function executeStartCommand() { $console = PhutilConsole::getConsole(); $this->willLaunch(); $pid = pcntl_fork(); if ($pid < 0) { throw new Exception('Failed to fork()!'); } else if ($pid) { $console->writeErr(pht("Aphlict Server started.\n")); exit(0); } // When we fork, the child process will inherit its parent's set of open // file descriptors. If the parent process of bin/aphlict is waiting for // bin/aphlict's file descriptors to close, it will be stuck waiting on // the daemonized process. (This happens if e.g. bin/aphlict is started // in another script using passthru().) fclose(STDOUT); fclose(STDERR); $this->launch(); return 0; } final protected function executeStopCommand() { $console = PhutilConsole::getConsole(); $pid = $this->getPID(); if (!$pid) { $console->writeErr(pht("Aphlict is not running.\n")); return 0; } $console->writeErr(pht("Stopping Aphlict Server (%s)...\n", $pid)); posix_kill($pid, SIGINT); $start = time(); do { if (!PhabricatorDaemonReference::isProcessRunning($pid)) { $console->writeOut( "%s\n", pht('Aphlict Server (%s) exited normally.', $pid)); $pid = null; break; } usleep(100000); } while (time() < $start + 5); if ($pid) { $console->writeErr(pht('Sending %s a SIGKILL.', $pid)."\n"); posix_kill($pid, SIGKILL); unset($pid); } Filesystem::remove($this->getPIDPath()); return 0; } private function getNodeBinary() { if (Filesystem::binaryExists('nodejs')) { return 'nodejs'; } if (Filesystem::binaryExists('node')) { return 'node'; } throw new PhutilArgumentUsageException( pht( 'No `nodejs` or `node` binary was found in $PATH. You must install '. 'Node.js to start the Aphlict server.')); } } diff --git a/support/aphlict/server/aphlict_server.js b/support/aphlict/server/aphlict_server.js index d4c94ca83..9649c50c8 100644 --- a/support/aphlict/server/aphlict_server.js +++ b/support/aphlict/server/aphlict_server.js @@ -1,239 +1,245 @@ var JX = require('./lib/javelin').JX; var http = require('http'); var https = require('https'); var util = require('util'); var fs = require('fs'); JX.require('lib/AphlictListenerList', __dirname); JX.require('lib/AphlictLog', __dirname); function parse_command_line_arguments(argv) { var config = { - port: 22280, - admin: 22281, - host: '127.0.0.1', + 'client-port': 22280, + 'admin-port': 22281, + 'client-host': '0.0.0.0', + 'admin-host': '127.0.0.1', log: '/var/log/aphlict.log', 'ssl-key': null, 'ssl-cert': null, test: false }; for (var ii = 2; ii < argv.length; ii++) { var arg = argv[ii]; var matches = arg.match(/^--([^=]+)=(.*)$/); if (!matches) { throw new Error("Unknown argument '" + arg + "'!"); } if (!(matches[1] in config)) { throw new Error("Unknown argument '" + matches[1] + "'!"); } config[matches[1]] = matches[2]; } - config.port = parseInt(config.port, 10); - config.admin = parseInt(config.admin, 10); + config['client-port'] = parseInt(config['client-port'], 10); + config['admin-port'] = parseInt(config['admin-port'], 10); return config; } var debug = new JX.AphlictLog() .addConsole(console); var config = parse_command_line_arguments(process.argv); process.on('uncaughtException', function(err) { debug.log('\n<<< UNCAUGHT EXCEPTION! >>>\n' + err.stack); process.exit(1); }); var WebSocket; try { WebSocket = require('ws'); } catch (ex) { throw new Error( 'You need to install the Node.js "ws" module for websocket support. ' + 'See "Notifications User Guide: Setup and Configuration" in the ' + 'documentation for instructions. ' + ex.toString()); } var ssl_config = { enabled: (config['ssl-key'] || config['ssl-cert']) }; // Load the SSL certificates (if any were provided) now, so that runs with // `--test` will see any errors. if (ssl_config.enabled) { ssl_config.key = fs.readFileSync(config['ssl-key']); ssl_config.cert = fs.readFileSync(config['ssl-cert']); } // Add the logfile so we'll fail if we can't write to it. if (config.logfile) { debug.addLogfile(config.logfile); } // If we're just doing a configuration test, exit here before starting any // servers. if (config.test) { debug.log('Configuration test OK.'); process.exit(0); } var start_time = new Date().getTime(); var messages_out = 0; var messages_in = 0; var clients = new JX.AphlictListenerList(); function https_discard_handler(req, res) { res.writeHead(501); res.end('HTTP/501 Use Websockets\n'); } var ws; if (ssl_config.enabled) { var https_server = https.createServer({ key: ssl_config.key, cert: ssl_config.cert - }, https_discard_handler).listen(config.port); + }, https_discard_handler).listen( + config['client-port'], + config['client-host']); ws = new WebSocket.Server({server: https_server}); } else { - ws = new WebSocket.Server({port: config.port}); + ws = new WebSocket.Server({ + port: config['client-port'], + host: config['client-host'], + }); } ws.on('connection', function(ws) { var listener = clients.addListener(ws); function log() { debug.log( util.format('<%s>', listener.getDescription()) + ' ' + util.format.apply(null, arguments)); } log('Connected from %s.', ws._socket.remoteAddress); ws.on('message', function(data) { log('Received message: %s', data); var message; try { message = JSON.parse(data); } catch (err) { log('Message is invalid: %s', err.message); return; } switch (message.command) { case 'subscribe': log( 'Subscribed to: %s', JSON.stringify(message.data)); listener.subscribe(message.data); break; case 'unsubscribe': log( 'Unsubscribed from: %s', JSON.stringify(message.data)); listener.unsubscribe(message.data); break; default: log('Unrecognized command "%s".', message.command || ''); } }); ws.on('close', function() { clients.removeListener(listener); log('Disconnected.'); }); ws.on('error', function(err) { log('Error: %s', err.message); }); }); function transmit(msg) { var listeners = clients.getListeners().filter(function(client) { return client.isSubscribedToAny(msg.subscribers); }); for (var i = 0; i < listeners.length; i++) { var listener = listeners[i]; try { listener.writeMessage(msg); ++messages_out; debug.log('<%s> Wrote Message', listener.getDescription()); } catch (error) { clients.removeListener(listener); debug.log('<%s> Write Error: %s', listener.getDescription(), error); } } } http.createServer(function(request, response) { // Publishing a notification. if (request.url == '/') { if (request.method == 'POST') { var body = ''; request.on('data', function(data) { body += data; }); request.on('end', function() { try { var msg = JSON.parse(body); debug.log('Received notification: ' + JSON.stringify(msg)); ++messages_in; try { transmit(msg); response.writeHead(200, {'Content-Type': 'text/plain'}); } catch (err) { debug.log( '<%s> Internal Server Error! %s', request.socket.remoteAddress, err); response.writeHead(500, 'Internal Server Error'); } } catch (err) { debug.log( '<%s> Bad Request! %s', request.socket.remoteAddress, err); response.writeHead(400, 'Bad Request'); } finally { response.end(); } }); } else { response.writeHead(405, 'Method Not Allowed'); response.end(); } } else if (request.url == '/status/') { var status = { 'uptime': (new Date().getTime() - start_time), 'clients.active': clients.getActiveListenerCount(), 'clients.total': clients.getTotalListenerCount(), 'messages.in': messages_in, 'messages.out': messages_out, 'log': config.log, 'version': 6 }; response.writeHead(200, {'Content-Type': 'application/json'}); response.write(JSON.stringify(status)); response.end(); } else { response.writeHead(404, 'Not Found'); response.end(); } -}).listen(config.admin, config.host); +}).listen(config['admin-port'], config['admin-host']); debug.log('Started Server (PID %d)', process.pid);