diff --git a/src/applications/config/check/PhabricatorMySQLSetupCheck.php b/src/applications/config/check/PhabricatorMySQLSetupCheck.php
index 053f31394..44d9cb3cd 100644
--- a/src/applications/config/check/PhabricatorMySQLSetupCheck.php
+++ b/src/applications/config/check/PhabricatorMySQLSetupCheck.php
@@ -1,320 +1,340 @@
 <?php
 
 final class PhabricatorMySQLSetupCheck extends PhabricatorSetupCheck {
 
   public function getDefaultGroup() {
     return self::GROUP_MYSQL;
   }
 
   public static function loadRawConfigValue($key) {
     $conn_raw = id(new PhabricatorUser())->establishConnection('w');
 
     try {
       $value = queryfx_one($conn_raw, 'SELECT @@%Q', $key);
       $value = $value['@@'.$key];
     } catch (AphrontQueryException $ex) {
       $value = null;
     }
 
     return $value;
   }
 
   protected function executeChecks() {
     $max_allowed_packet = self::loadRawConfigValue('max_allowed_packet');
     $recommended_minimum = 1024 * 1024;
     if ($max_allowed_packet < $recommended_minimum) {
       $message = pht(
         "MySQL is configured with a very small 'max_allowed_packet' (%d), ".
         "which may cause some large writes to fail. Strongly consider raising ".
         "this to at least %d in your MySQL configuration.",
         $max_allowed_packet,
         $recommended_minimum);
 
       $this->newIssue('mysql.max_allowed_packet')
         ->setName(pht('Small MySQL "max_allowed_packet"'))
         ->setMessage($message)
         ->addMySQLConfig('max_allowed_packet');
     }
 
     $modes = self::loadRawConfigValue('sql_mode');
     $modes = explode(',', $modes);
 
     if (!in_array('STRICT_ALL_TABLES', $modes)) {
       $summary = pht(
         'MySQL is not in strict mode, but using strict mode is strongly '.
         'encouraged.');
 
       $message = pht(
         "On your MySQL instance, the global %s is not set to %s. ".
         "It is strongly encouraged that you enable this mode when running ".
         "Phabricator.\n\n".
         "By default MySQL will silently ignore some types of errors, which ".
         "can cause data loss and raise security concerns. Enabling strict ".
         "mode makes MySQL raise an explicit error instead, and prevents this ".
         "entire class of problems from doing any damage.\n\n".
         "You can find more information about this mode (and how to configure ".
         "it) in the MySQL manual. Usually, it is sufficient to add this to ".
         "your %s file (in the %s section) and then restart %s:\n\n".
         "%s\n".
         "(Note that if you run other applications against the same database, ".
         "they may not work in strict mode. Be careful about enabling it in ".
         "these cases.)",
         phutil_tag('tt', array(), 'sql_mode'),
         phutil_tag('tt', array(), 'STRICT_ALL_TABLES'),
         phutil_tag('tt', array(), 'my.cnf'),
         phutil_tag('tt', array(), '[mysqld]'),
         phutil_tag('tt', array(), 'mysqld'),
         phutil_tag('pre', array(), 'sql_mode=STRICT_ALL_TABLES'));
 
       $this->newIssue('mysql.mode')
         ->setName(pht('MySQL STRICT_ALL_TABLES Mode Not Set'))
         ->setSummary($summary)
         ->setMessage($message)
         ->addMySQLConfig('sql_mode');
     }
     if (in_array('ONLY_FULL_GROUP_BY', $modes)) {
       $summary = pht(
         'MySQL is in ONLY_FULL_GROUP_BY mode, but using this mode is strongly '.
         'discouraged.');
 
       $message = pht(
         "On your MySQL instance, the global %s is set to %s. ".
         "It is strongly encouraged that you disable this mode when running ".
         "Phabricator.\n\n".
         "With %s enabled, MySQL rejects queries for which the select list ".
         "or (as of MySQL 5.0.23) %s list refer to nonaggregated columns ".
         "that are not named in the %s clause. More importantly, Phabricator ".
         "does not work properly with this mode enabled.\n\n".
         "You can find more information about this mode (and how to configure ".
         "it) in the MySQL manual. Usually, it is sufficient to change the %s ".
         "in your %s file (in the %s section) and then restart %s:\n\n".
         "%s\n".
         "(Note that if you run other applications against the same database, ".
         "they may not work with %s. Be careful about enabling ".
         "it in these cases and consider migrating Phabricator to a different ".
         "database.)",
         phutil_tag('tt', array(), 'sql_mode'),
         phutil_tag('tt', array(), 'ONLY_FULL_GROUP_BY'),
         phutil_tag('tt', array(), 'ONLY_FULL_GROUP_BY'),
         phutil_tag('tt', array(), 'HAVING'),
         phutil_tag('tt', array(), 'GROUP BY'),
         phutil_tag('tt', array(), 'sql_mode'),
         phutil_tag('tt', array(), 'my.cnf'),
         phutil_tag('tt', array(), '[mysqld]'),
         phutil_tag('tt', array(), 'mysqld'),
         phutil_tag('pre', array(), 'sql_mode=STRICT_ALL_TABLES'),
         phutil_tag('tt', array(), 'ONLY_FULL_GROUP_BY'));
 
       $this->newIssue('mysql.mode')
         ->setName(pht('MySQL ONLY_FULL_GROUP_BY Mode Set'))
         ->setSummary($summary)
         ->setMessage($message)
         ->addMySQLConfig('sql_mode');
     }
 
     $stopword_file = self::loadRawConfigValue('ft_stopword_file');
     if (!PhabricatorDefaultSearchEngineSelector::shouldUseElasticSearch()) {
       if ($stopword_file === null) {
         $summary = pht(
           'Your version of MySQL does not support configuration of a '.
           'stopword file. You will not be able to find search results for '.
           'common words.');
 
         $message = pht(
           "Your MySQL instance does not support the %s option. You will not ".
           "be able to find search results for common words. You can gain ".
           "access to this option by upgrading MySQL to a more recent ".
           "version.\n\n".
           "You can ignore this warning if you plan to configure ElasticSearch ".
           "later, or aren't concerned about searching for common words.",
           phutil_tag('tt', array(), 'ft_stopword_file'));
 
         $this->newIssue('mysql.ft_stopword_file')
           ->setName(pht('MySQL ft_stopword_file Not Supported'))
           ->setSummary($summary)
           ->setMessage($message)
           ->addMySQLConfig('ft_stopword_file');
 
       } else if ($stopword_file == '(built-in)') {
         $root = dirname(phutil_get_library_root('phabricator'));
         $stopword_path = $root.'/resources/sql/stopwords.txt';
         $stopword_path = Filesystem::resolvePath($stopword_path);
 
         $namespace = PhabricatorEnv::getEnvConfig('storage.default-namespace');
 
         $summary = pht(
           'MySQL is using a default stopword file, which will prevent '.
           'searching for many common words.');
 
         $message = pht(
           "Your MySQL instance is using the builtin stopword file for ".
           "building search indexes. This can make Phabricator's search ".
           "feature less useful.\n\n".
           "Stopwords are common words which are not indexed and thus can not ".
           "be searched for. The default stopword file has about 500 words, ".
           "including various words which you are likely to wish to search ".
           "for, such as 'various', 'likely', 'wish', and 'zero'.\n\n".
           "To make search more useful, you can use an alternate stopword ".
           "file with fewer words. Alternatively, if you aren't concerned ".
           "about searching for common words, you can ignore this warning. ".
           "If you later plan to configure ElasticSearch, you can also ignore ".
           "this warning: this stopword file only affects MySQL fulltext ".
           "indexes.\n\n".
           "To choose a different stopword file, add this to your %s file ".
           "(in the %s section) and then restart %s:\n\n".
           "%s\n".
           "(You can also use a different file if you prefer. The file ".
           "suggested above has about 50 of the most common English words.)\n\n".
           "Finally, run this command to rebuild indexes using the new ".
           "rules:\n\n".
           "%s",
           phutil_tag('tt', array(), 'my.cnf'),
           phutil_tag('tt', array(), '[mysqld]'),
           phutil_tag('tt', array(), 'mysqld'),
           phutil_tag('pre', array(), 'ft_stopword_file='.$stopword_path),
           phutil_tag(
             'pre',
             array(),
             "mysql> REPAIR TABLE {$namespace}_search.search_documentfield;"));
 
         $this->newIssue('mysql.ft_stopword_file')
           ->setName(pht('MySQL is Using Default Stopword File'))
           ->setSummary($summary)
           ->setMessage($message)
           ->addMySQLConfig('ft_stopword_file');
       }
     }
 
     $min_len = self::loadRawConfigValue('ft_min_word_len');
     if ($min_len >= 4) {
       if (!PhabricatorDefaultSearchEngineSelector::shouldUseElasticSearch()) {
         $namespace = PhabricatorEnv::getEnvConfig('storage.default-namespace');
 
         $summary = pht(
           'MySQL is configured to only index words with at least %d '.
           'characters.',
           $min_len);
 
         $message = pht(
           "Your MySQL instance is configured to use the default minimum word ".
           "length when building search indexes, which is 4. This means words ".
           "which are only 3 characters long will not be indexed and can not ".
           "be searched for.\n\n".
           "For example, you will not be able to find search results for words ".
           "like 'SMS', 'web', or 'DOS'.\n\n".
           "You can change this setting to 3 to allow these words to be ".
           "indexed. Alternatively, you can ignore this warning if you are ".
           "not concerned about searching for 3-letter words. If you later ".
           "plan to configure ElasticSearch, you can also ignore this warning: ".
           "only MySQL fulltext search is affected.\n\n".
           "To reduce the minimum word length to 3, add this to your %s file ".
           "(in the %s section) and then restart %s:\n\n".
           "%s\n".
           "Finally, run this command to rebuild indexes using the new ".
           "rules:\n\n".
           "%s",
           phutil_tag('tt', array(), 'my.cnf'),
           phutil_tag('tt', array(), '[mysqld]'),
           phutil_tag('tt', array(), 'mysqld'),
           phutil_tag('pre', array(), 'ft_min_word_len=3'),
           phutil_tag(
             'pre',
             array(),
             "mysql> REPAIR TABLE {$namespace}_search.search_documentfield;"));
 
         $this->newIssue('mysql.ft_min_word_len')
           ->setName(pht('MySQL is Using Default Minimum Word Length'))
           ->setSummary($summary)
           ->setMessage($message)
           ->addMySQLConfig('ft_min_word_len');
       }
     }
 
     $bool_syntax = self::loadRawConfigValue('ft_boolean_syntax');
     if ($bool_syntax != ' |-><()~*:""&^') {
       if (!PhabricatorDefaultSearchEngineSelector::shouldUseElasticSearch()) {
 
         $summary = pht(
           'MySQL is configured to search on fulltext indexes using "OR" by '.
           'default. Using "AND" is usually the desired behaviour.');
 
         $message = pht(
           "Your MySQL instance is configured to use the default Boolean ".
           "search syntax when using fulltext indexes. This means searching ".
           "for 'search words' will yield the query 'search OR words' ".
           "instead of the desired 'search AND words'.\n\n".
           "This might produce unexpected search results. \n\n".
           "You can change this setting to a more sensible default. ".
           "Alternatively, you can ignore this warning if ".
           "using 'OR' is the desired behaviour. If you later plan ".
           "to configure ElasticSearch, you can also ignore this warning: ".
           "only MySQL fulltext search is affected.\n\n".
           "To change this setting, add this to your %s file ".
           "(in the %s section) and then restart %s:\n\n".
           "%s\n",
           phutil_tag('tt', array(), 'my.cnf'),
           phutil_tag('tt', array(), '[mysqld]'),
           phutil_tag('tt', array(), 'mysqld'),
           phutil_tag('pre', array(), 'ft_boolean_syntax=\' |-><()~*:""&^\''));
 
         $this->newIssue('mysql.ft_boolean_syntax')
           ->setName(pht('MySQL is Using the Default Boolean Syntax'))
           ->setSummary($summary)
           ->setMessage($message)
           ->addMySQLConfig('ft_boolean_syntax');
       }
     }
 
     $innodb_pool = self::loadRawConfigValue('innodb_buffer_pool_size');
     $innodb_bytes = phutil_parse_bytes($innodb_pool);
     $innodb_readable = phutil_format_bytes($innodb_bytes);
 
     // This is arbitrary and just trying to detect values that the user
     // probably didn't set themselves. The Mac OS X default is 128MB and
     // 40% of an AWS EC2 Micro instance is 245MB, so keeping it somewhere
     // between those two values seems like a reasonable approximation.
     $minimum_readable = '225MB';
 
     $minimum_bytes = phutil_parse_bytes($minimum_readable);
     if ($innodb_bytes < $minimum_bytes) {
       $summary = pht(
         'MySQL is configured with a very small innodb_buffer_pool_size, '.
         'which may impact performance.');
 
       $message = pht(
         "Your MySQL instance is configured with a very small %s (%s). ".
         "This may cause poor database performance and lock exhaustion.\n\n".
         "There are no hard-and-fast rules to setting an appropriate value, ".
         "but a reasonable starting point for a standard install is something ".
         "like 40%% of the total memory on the machine. For example, if you ".
         "have 4GB of RAM on the machine you have installed Phabricator on, ".
         "you might set this value to %s.\n\n".
         "You can read more about this option in the MySQL documentation to ".
         "help you make a decision about how to configure it for your use ".
         "case. There are no concerns specific to Phabricator which make it ".
         "different from normal workloads with respect to this setting.\n\n".
         "To adjust the setting, add something like this to your %s file (in ".
         "the %s section), replacing %s with an appropriate value for your ".
         "host and use case. Then restart %s:\n\n".
         "%s\n".
         "If you're satisfied with the current setting, you can safely ".
         "ignore this setup warning.",
         phutil_tag('tt', array(), 'innodb_buffer_pool_size'),
         phutil_tag('tt', array(), $innodb_readable),
         phutil_tag('tt', array(), '1600M'),
         phutil_tag('tt', array(), 'my.cnf'),
         phutil_tag('tt', array(), '[mysqld]'),
         phutil_tag('tt', array(), '1600M'),
         phutil_tag('tt', array(), 'mysqld'),
         phutil_tag('pre', array(), 'innodb_buffer_pool_size=1600M'));
 
       $this->newIssue('mysql.innodb_buffer_pool_size')
         ->setName(pht('MySQL May Run Slowly'))
         ->setSummary($summary)
         ->setMessage($message)
         ->addMySQLConfig('innodb_buffer_pool_size');
     }
 
+    $ok = PhabricatorStorageManagementAPI::isCharacterSetAvailableOnConnection(
+      'utf8mb4',
+      id(new PhabricatorUser())->establishConnection('w'));
+    if (!$ok) {
+      $summary = pht(
+        'You are using an old version of MySQL, and should upgrade.');
+
+      $message = pht(
+        'You are using an old version of MySQL which has poor unicode '.
+        'support (it does not support the "utf8mb4" collation set). You will '.
+        'encounter limitations when working with some unicode data.'.
+        "\n\n".
+        'We strongly recommend you upgrade to MySQL 5.5 or newer.');
+
+      $this->newIssue('mysql.utf8mb4')
+        ->setName(pht('Old MySQL Version'))
+        ->setSummary($summary)
+        ->setMessage($message);
+    }
+
   }
 
 }
diff --git a/src/infrastructure/storage/management/PhabricatorStorageManagementAPI.php b/src/infrastructure/storage/management/PhabricatorStorageManagementAPI.php
index 16d4c5674..9244f647a 100644
--- a/src/infrastructure/storage/management/PhabricatorStorageManagementAPI.php
+++ b/src/infrastructure/storage/management/PhabricatorStorageManagementAPI.php
@@ -1,287 +1,300 @@
 <?php
 
 final class PhabricatorStorageManagementAPI {
 
   private $host;
   private $user;
   private $port;
   private $password;
   private $namespace;
   private $conns = array();
   private $disableUTF8MB4;
 
   const CHARSET_DEFAULT = 'CHARSET';
   const CHARSET_FULLTEXT = 'CHARSET_FULLTEXT';
   const COLLATE_TEXT = 'COLLATE_TEXT';
   const COLLATE_SORT = 'COLLATE_SORT';
   const COLLATE_FULLTEXT = 'COLLATE_FULLTEXT';
 
   public function setDisableUTF8MB4($disable_utf8_mb4) {
     $this->disableUTF8MB4 = $disable_utf8_mb4;
     return $this;
   }
 
   public function getDisableUTF8MB4() {
     return $this->disableUTF8MB4;
   }
 
   public function setNamespace($namespace) {
     $this->namespace = $namespace;
     PhabricatorLiskDAO::pushStorageNamespace($namespace);
     return $this;
   }
 
   public function getNamespace() {
     return $this->namespace;
   }
 
   public function setUser($user) {
     $this->user = $user;
     return $this;
   }
 
   public function getUser() {
     return $this->user;
   }
 
   public function setPassword($password) {
     $this->password = $password;
     return $this;
   }
 
   public function getPassword() {
     return $this->password;
   }
 
   public function setHost($host) {
     $this->host = $host;
     return $this;
   }
 
   public function getHost() {
     return $this->host;
   }
 
   public function setPort($port) {
     $this->port = $port;
     return $this;
   }
 
   public function getPort() {
     return $this->port;
   }
 
   public function getDatabaseName($fragment) {
     return $this->namespace.'_'.$fragment;
   }
 
   public function getDatabaseList(array $patches, $only_living = false) {
     assert_instances_of($patches, 'PhabricatorStoragePatch');
 
     $list = array();
 
     foreach ($patches as $patch) {
       if ($patch->getType() == 'db') {
         if ($only_living && $patch->isDead()) {
           continue;
         }
         $list[] = $this->getDatabaseName($patch->getName());
       }
     }
 
     return $list;
   }
 
   public function getConn($fragment) {
     $database = $this->getDatabaseName($fragment);
     $return = &$this->conns[$this->host][$this->user][$database];
     if (!$return) {
       $return = PhabricatorEnv::newObjectFromConfig(
       'mysql.implementation',
       array(
         array(
           'user'      => $this->user,
           'pass'      => $this->password,
           'host'      => $this->host,
           'port'      => $this->port,
           'database'  => $fragment
             ? $database
             : null,
         ),
       ));
     }
     return $return;
   }
 
   public function getAppliedPatches() {
     try {
       $applied = queryfx_all(
         $this->getConn('meta_data'),
         'SELECT patch FROM patch_status');
       return ipull($applied, 'patch');
     } catch (AphrontQueryException $ex) {
       return null;
     }
   }
 
   public function createDatabase($fragment) {
     $info = $this->getCharsetInfo();
 
     queryfx(
       $this->getConn(null),
       'CREATE DATABASE IF NOT EXISTS %T COLLATE %T',
       $this->getDatabaseName($fragment),
       $info[self::COLLATE_TEXT]);
   }
 
   public function createTable($fragment, $table, array $cols) {
     queryfx(
       $this->getConn($fragment),
       'CREATE TABLE IF NOT EXISTS %T.%T (%Q) '.
       'ENGINE=InnoDB, COLLATE utf8_general_ci',
       $this->getDatabaseName($fragment),
       $table,
       implode(', ', $cols));
   }
 
   public function getLegacyPatches(array $patches) {
     assert_instances_of($patches, 'PhabricatorStoragePatch');
 
     try {
       $row = queryfx_one(
         $this->getConn('meta_data'),
         'SELECT version FROM %T',
         'schema_version');
       $version = $row['version'];
     } catch (AphrontQueryException $ex) {
       return array();
     }
 
     $legacy = array();
     foreach ($patches as $key => $patch) {
       if ($patch->getLegacy() !== false && $patch->getLegacy() <= $version) {
         $legacy[] = $key;
       }
     }
 
     return $legacy;
   }
 
   public function markPatchApplied($patch) {
     queryfx(
       $this->getConn('meta_data'),
       'INSERT INTO %T (patch, applied) VALUES (%s, %d)',
       'patch_status',
       $patch,
       time());
   }
 
   public function applyPatch(PhabricatorStoragePatch $patch) {
     $type = $patch->getType();
     $name = $patch->getName();
     switch ($type) {
       case 'db':
         $this->createDatabase($name);
         break;
       case 'sql':
         $this->applyPatchSQL($name);
         break;
       case 'php':
         $this->applyPatchPHP($name);
         break;
       default:
         throw new Exception("Unable to apply patch of type '{$type}'.");
     }
   }
 
   public function applyPatchSQL($sql) {
     $sql = Filesystem::readFile($sql);
     $queries = preg_split('/;\s+/', $sql);
     $queries = array_filter($queries);
 
     $conn = $this->getConn(null);
 
     $charset_info = $this->getCharsetInfo();
     foreach ($charset_info as $key => $value) {
       $charset_info[$key] = qsprintf($conn, '%T', $value);
     }
 
     foreach ($queries as $query) {
       $query = str_replace('{$NAMESPACE}', $this->namespace, $query);
 
       foreach ($charset_info as $key => $value) {
         $query = str_replace('{$'.$key.'}', $value, $query);
       }
 
       queryfx(
         $conn,
         '%Q',
         $query);
     }
   }
 
   public function applyPatchPHP($script) {
     $schema_conn = $this->getConn(null);
     require_once $script;
   }
 
   public function isCharacterSetAvailable($character_set) {
     if ($character_set == 'utf8mb4') {
       if ($this->getDisableUTF8MB4()) {
         return false;
       }
     }
 
     $conn = $this->getConn(null);
+    return self::isCharacterSetAvailableOnConnection($character_set, $conn);
+  }
 
+  public static function isCharacterSetAvailableOnConnection(
+    $character_set,
+    AphrontDatabaseConnection $conn) {
     $result = queryfx_one(
       $conn,
       'SELECT CHARACTER_SET_NAME FROM INFORMATION_SCHEMA.CHARACTER_SETS
         WHERE CHARACTER_SET_NAME = %s',
       $character_set);
 
     return (bool)$result;
   }
 
   public function getCharsetInfo() {
     if ($this->isCharacterSetAvailable('utf8mb4')) {
       // If utf8mb4 is available, we use it with the utf8mb4_unicode_ci
       // collation. This is most correct, and will sort properly.
 
       $charset = 'utf8mb4';
       $charset_full = 'utf8mb4';
       $collate_text = 'utf8mb4_bin';
       $collate_sort = 'utf8mb4_unicode_ci';
       $collate_full = 'utf8mb4_unicode_ci';
     } else {
-      // If utf8mb4 is not available, we use binary. This allows us to store
-      // 4-byte unicode characters. This has some tradeoffs:
-      //
-      // Unicode characters won't sort correctly. There's nothing we can do
-      // about this while still supporting 4-byte characters.
+      // If utf8mb4 is not available, we use binary for most data. This allows
+      // us to store 4-byte unicode characters.
       //
       // It's possible that strings will be truncated in the middle of a
       // character on insert. We encourage users to set STRICT_ALL_TABLES
       // to prevent this.
       //
-      // There's no valid collation we can use to get a fulltext index on
-      // 4-byte unicode characters: we can't add a fulltext key to a binary
-      // column.
+      // For "fulltext" and "sort" columns, we don't use binary.
+      //
+      // With "fulltext", we can not use binary because MySQL won't let us.
+      // We use 3-byte utf8 instead and accept being unable to index 4-byte
+      // characters.
+      //
+      // With "sort", if we use binary we lose case insensitivity (for
+      // example, "ALincoln@logcabin.com" and "alincoln@logcabin.com" would no
+      // longer be identified as the same email address). This can be very
+      // confusing and is far worse overall than not supporting 4-byte unicode
+      // characters, so we use 3-byte utf8 and accept limited 4-byte support as
+      // a tradeoff to get sensible collation behavior. Many columns where
+      // collation is important rarely contain 4-byte characters anyway, so we
+      // are not giving up too much.
 
       $charset = 'binary';
       $charset_full = 'utf8';
       $collate_text = 'binary';
-      $collate_sort = 'binary';
+      $collate_sort = 'utf8_general_ci';
       $collate_full = 'utf8_general_ci';
     }
 
     return array(
       self::CHARSET_DEFAULT => $charset,
       self::CHARSET_FULLTEXT => $charset_full,
       self::COLLATE_TEXT => $collate_text,
       self::COLLATE_SORT => $collate_sort,
       self::COLLATE_FULLTEXT => $collate_full,
     );
   }
 
 }
diff --git a/src/infrastructure/storage/management/workflow/PhabricatorStorageManagementWorkflow.php b/src/infrastructure/storage/management/workflow/PhabricatorStorageManagementWorkflow.php
index 35a02bff3..f8da37961 100644
--- a/src/infrastructure/storage/management/workflow/PhabricatorStorageManagementWorkflow.php
+++ b/src/infrastructure/storage/management/workflow/PhabricatorStorageManagementWorkflow.php
@@ -1,681 +1,683 @@
 <?php
 
 abstract class PhabricatorStorageManagementWorkflow
   extends PhabricatorManagementWorkflow {
 
   private $patches;
   private $api;
 
   public function setPatches(array $patches) {
     assert_instances_of($patches, 'PhabricatorStoragePatch');
     $this->patches = $patches;
     return $this;
   }
 
   public function getPatches() {
     return $this->patches;
   }
 
   final public function setAPI(PhabricatorStorageManagementAPI $api) {
     $this->api = $api;
     return $this;
   }
 
   final public function getAPI() {
     return $this->api;
   }
 
   private function loadSchemata() {
     $query = id(new PhabricatorConfigSchemaQuery())
       ->setAPI($this->getAPI());
 
     $actual = $query->loadActualSchema();
     $expect = $query->loadExpectedSchema();
     $comp = $query->buildComparisonSchema($expect, $actual);
 
     return array($comp, $expect, $actual);
   }
 
   protected function adjustSchemata($force, $unsafe, $dry_run) {
     $console = PhutilConsole::getConsole();
 
     $console->writeOut(
       "%s\n",
       pht('Verifying database schemata...'));
 
     list($adjustments, $errors) = $this->findAdjustments();
     $api = $this->getAPI();
 
     if (!$adjustments) {
       $console->writeOut(
         "%s\n",
         pht('Found no adjustments for schemata.'));
 
       return $this->printErrors($errors, 0);
     }
 
     if (!$force && !$api->isCharacterSetAvailable('utf8mb4')) {
       $message = pht(
         "You have an old version of MySQL (older than 5.5) which does not ".
-        "support the utf8mb4 character set. If you apply adjustments now ".
-        "and later update MySQL to 5.5 or newer, you'll need to apply ".
-        "adjustments again (and they will take a long time).\n\n".
+        "support the utf8mb4 character set. We strongly recomend upgrading to ".
+        "5.5 or newer.\n\n".
+        "If you apply adjustments now and later update MySQL to 5.5 or newer, ".
+        "you'll need to apply adjustments again (and they will take a long ".
+        "time).\n\n".
         "You can exit this workflow, update MySQL now, and then run this ".
         "workflow again. This is recommended, but may cause a lot of downtime ".
         "right now.\n\n".
         "You can exit this workflow, continue using Phabricator without ".
         "applying adjustments, update MySQL at a later date, and then run ".
         "this workflow again. This is also a good approach, and will let you ".
         "delay downtime until later.\n\n".
         "You can proceed with this workflow, and then optionally update ".
         "MySQL at a later date. After you do, you'll need to apply ".
         "adjustments again.\n\n".
         "For more information, see \"Managing Storage Adjustments\" in ".
         "the documentation.");
 
       $console->writeOut(
         "\n**<bg:yellow> %s </bg>**\n\n%s\n",
         pht('OLD MySQL VERSION'),
         phutil_console_wrap($message));
 
       $prompt = pht('Continue with old MySQL version?');
       if (!phutil_console_confirm($prompt, $default_no = true)) {
         return;
       }
     }
 
     $table = id(new PhutilConsoleTable())
       ->addColumn('database', array('title' => pht('Database')))
       ->addColumn('table', array('title' => pht('Table')))
       ->addColumn('name', array('title' => pht('Name')))
       ->addColumn('info', array('title' => pht('Issues')));
 
     foreach ($adjustments as $adjust) {
       $info = array();
       foreach ($adjust['issues'] as $issue) {
         $info[] = PhabricatorConfigStorageSchema::getIssueName($issue);
       }
 
       $table->addRow(array(
         'database' => $adjust['database'],
         'table' => idx($adjust, 'table'),
         'name' => idx($adjust, 'name'),
         'info' => implode(', ', $info),
       ));
     }
 
     $console->writeOut("\n\n");
 
     $table->draw();
 
     if ($dry_run) {
       $console->writeOut(
         "%s\n",
         pht('DRYRUN: Would apply adjustments.'));
       return 0;
     } else if (!$force) {
       $console->writeOut(
         "\n%s\n",
         pht(
           "Found %s issues(s) with schemata, detailed above.\n\n".
           "You can review issues in more detail from the web interface, ".
           "in Config > Database Status. To better understand the adjustment ".
           "workflow, see \"Managing Storage Adjustments\" in the ".
           "documentation.\n\n".
           "MySQL needs to copy table data to make some adjustments, so these ".
           "migrations may take some time.",
           new PhutilNumber(count($adjustments))));
 
       $prompt = pht('Fix these schema issues?');
       if (!phutil_console_confirm($prompt, $default_no = true)) {
         return 1;
       }
     }
 
     $console->writeOut(
       "%s\n",
       pht('Fixing schema issues...'));
 
     $conn = $api->getConn(null);
 
     if ($unsafe) {
       queryfx($conn, 'SET SESSION sql_mode = %s', '');
     } else {
       queryfx($conn, 'SET SESSION sql_mode = %s', 'STRICT_ALL_TABLES');
     }
 
     $failed = array();
 
     // We make changes in several phases.
     $phases = array(
       // Drop surplus autoincrements. This allows us to drop primary keys on
       // autoincrement columns.
       'drop_auto',
 
       // Drop all keys we're going to adjust. This prevents them from
       // interfering with column changes.
       'drop_keys',
 
       // Apply all database, table, and column changes.
       'main',
 
       // Restore adjusted keys.
       'add_keys',
 
       // Add missing autoincrements.
       'add_auto',
     );
 
     $bar = id(new PhutilConsoleProgressBar())
       ->setTotal(count($adjustments) * count($phases));
 
     foreach ($phases as $phase) {
       foreach ($adjustments as $adjust) {
         try {
           switch ($adjust['kind']) {
             case 'database':
               if ($phase == 'main') {
                 queryfx(
                   $conn,
                   'ALTER DATABASE %T CHARACTER SET = %s COLLATE = %s',
                   $adjust['database'],
                   $adjust['charset'],
                   $adjust['collation']);
               }
               break;
             case 'table':
               if ($phase == 'main') {
                 queryfx(
                   $conn,
                   'ALTER TABLE %T.%T COLLATE = %s',
                   $adjust['database'],
                   $adjust['table'],
                   $adjust['collation']);
               }
               break;
             case 'column':
               $apply = false;
               $auto = false;
               $new_auto = idx($adjust, 'auto');
               if ($phase == 'drop_auto') {
                 if ($new_auto === false) {
                   $apply = true;
                   $auto = false;
                 }
               } else if ($phase == 'main') {
                 $apply = true;
                 if ($new_auto === false) {
                   $auto = false;
                 } else {
                   $auto = $adjust['is_auto'];
                 }
               } else if ($phase == 'add_auto') {
                 if ($new_auto === true) {
                   $apply = true;
                   $auto = true;
                 }
               }
 
               if ($apply) {
                 $parts = array();
 
                 if ($auto) {
                   $parts[] = qsprintf(
                     $conn,
                     'AUTO_INCREMENT');
                 }
 
                 if ($adjust['charset']) {
                   $parts[] = qsprintf(
                     $conn,
                     'CHARACTER SET %Q COLLATE %Q',
                     $adjust['charset'],
                     $adjust['collation']);
                 }
 
                 queryfx(
                   $conn,
                   'ALTER TABLE %T.%T MODIFY %T %Q %Q %Q',
                   $adjust['database'],
                   $adjust['table'],
                   $adjust['name'],
                   $adjust['type'],
                   implode(' ', $parts),
                   $adjust['nullable'] ? 'NULL' : 'NOT NULL');
               }
               break;
             case 'key':
               if (($phase == 'drop_keys') && $adjust['exists']) {
                 if ($adjust['name'] == 'PRIMARY') {
                   $key_name = 'PRIMARY KEY';
                 } else {
                   $key_name = qsprintf($conn, 'KEY %T', $adjust['name']);
                 }
 
                 queryfx(
                   $conn,
                   'ALTER TABLE %T.%T DROP %Q',
                   $adjust['database'],
                   $adjust['table'],
                   $key_name);
               }
 
               if (($phase == 'add_keys') && $adjust['keep']) {
                 // Different keys need different creation syntax. Notable
                 // special cases are primary keys and fulltext keys.
                 if ($adjust['name'] == 'PRIMARY') {
                   $key_name = 'PRIMARY KEY';
                 } else if ($adjust['indexType'] == 'FULLTEXT') {
                   $key_name = qsprintf($conn, 'FULLTEXT %T', $adjust['name']);
                 } else {
                   if ($adjust['unique']) {
                     $key_name = qsprintf(
                       $conn,
                       'UNIQUE KEY %T',
                       $adjust['name']);
                   } else {
                     $key_name = qsprintf(
                       $conn,
                       '/* NONUNIQUE */ KEY %T',
                       $adjust['name']);
                   }
                 }
 
                 queryfx(
                   $conn,
                   'ALTER TABLE %T.%T ADD %Q (%Q)',
                   $adjust['database'],
                   $adjust['table'],
                   $key_name,
                   implode(', ', $adjust['columns']));
               }
               break;
             default:
               throw new Exception(
                 pht('Unknown schema adjustment kind "%s"!', $adjust['kind']));
           }
         } catch (AphrontQueryException $ex) {
           $failed[] = array($adjust, $ex);
         }
         $bar->update(1);
       }
     }
     $bar->done();
 
     if (!$failed) {
       $console->writeOut(
         "%s\n",
         pht('Completed fixing all schema issues.'));
 
       $err = 0;
     } else {
       $table = id(new PhutilConsoleTable())
         ->addColumn('target', array('title' => pht('Target')))
         ->addColumn('error', array('title' => pht('Error')));
 
       foreach ($failed as $failure) {
         list($adjust, $ex) = $failure;
 
         $pieces = array_select_keys(
           $adjust,
           array('database', 'table', 'name'));
         $pieces = array_filter($pieces);
         $target = implode('.', $pieces);
 
         $table->addRow(
           array(
             'target' => $target,
             'error' => $ex->getMessage(),
           ));
       }
 
       $console->writeOut("\n");
       $table->draw();
       $console->writeOut(
         "\n%s\n",
         pht('Failed to make some schema adjustments, detailed above.'));
       $console->writeOut(
         "%s\n",
         pht(
           'For help troubleshooting adjustments, see "Managing Storage '.
           'Adjustments" in the documentation.'));
 
       $err = 1;
     }
 
     return $this->printErrors($errors, $err);
   }
 
   private function findAdjustments() {
     list($comp, $expect, $actual) = $this->loadSchemata();
 
     $issue_charset = PhabricatorConfigStorageSchema::ISSUE_CHARSET;
     $issue_collation = PhabricatorConfigStorageSchema::ISSUE_COLLATION;
     $issue_columntype = PhabricatorConfigStorageSchema::ISSUE_COLUMNTYPE;
     $issue_surpluskey = PhabricatorConfigStorageSchema::ISSUE_SURPLUSKEY;
     $issue_missingkey = PhabricatorConfigStorageSchema::ISSUE_MISSINGKEY;
     $issue_columns = PhabricatorConfigStorageSchema::ISSUE_KEYCOLUMNS;
     $issue_unique = PhabricatorConfigStorageSchema::ISSUE_UNIQUE;
     $issue_longkey = PhabricatorConfigStorageSchema::ISSUE_LONGKEY;
     $issue_auto = PhabricatorConfigStorageSchema::ISSUE_AUTOINCREMENT;
 
     $adjustments = array();
     $errors = array();
     foreach ($comp->getDatabases() as $database_name => $database) {
       foreach ($this->findErrors($database) as $issue) {
         $errors[] = array(
           'database' => $database_name,
           'issue' => $issue,
         );
       }
 
       $expect_database = $expect->getDatabase($database_name);
       $actual_database = $actual->getDatabase($database_name);
 
       if (!$expect_database || !$actual_database) {
         // If there's a real issue here, skip this stuff.
         continue;
       }
 
       $issues = array();
       if ($database->hasIssue($issue_charset)) {
         $issues[] = $issue_charset;
       }
       if ($database->hasIssue($issue_collation)) {
         $issues[] = $issue_collation;
       }
 
       if ($issues) {
         $adjustments[] = array(
           'kind' => 'database',
           'database' => $database_name,
           'issues' => $issues,
           'charset' => $expect_database->getCharacterSet(),
           'collation' => $expect_database->getCollation(),
         );
       }
 
       foreach ($database->getTables() as $table_name => $table) {
         foreach ($this->findErrors($table) as $issue) {
           $errors[] = array(
             'database' => $database_name,
             'table' => $table_name,
             'issue' => $issue,
           );
         }
 
         $expect_table = $expect_database->getTable($table_name);
         $actual_table = $actual_database->getTable($table_name);
 
         if (!$expect_table || !$actual_table) {
           continue;
         }
 
         $issues = array();
         if ($table->hasIssue($issue_collation)) {
           $issues[] = $issue_collation;
         }
 
         if ($issues) {
           $adjustments[] = array(
             'kind' => 'table',
             'database' => $database_name,
             'table' => $table_name,
             'issues' => $issues,
             'collation' => $expect_table->getCollation(),
           );
         }
 
         foreach ($table->getColumns() as $column_name => $column) {
           foreach ($this->findErrors($column) as $issue) {
             $errors[] = array(
               'database' => $database_name,
               'table' => $table_name,
               'name' => $column_name,
               'issue' => $issue,
             );
           }
 
           $expect_column = $expect_table->getColumn($column_name);
           $actual_column = $actual_table->getColumn($column_name);
 
           if (!$expect_column || !$actual_column) {
             continue;
           }
 
           $issues = array();
           if ($column->hasIssue($issue_collation)) {
             $issues[] = $issue_collation;
           }
           if ($column->hasIssue($issue_charset)) {
             $issues[] = $issue_charset;
           }
           if ($column->hasIssue($issue_columntype)) {
             $issues[] = $issue_columntype;
           }
           if ($column->hasIssue($issue_auto)) {
             $issues[] = $issue_auto;
           }
 
           if ($issues) {
             if ($expect_column->getCharacterSet() === null) {
               // For non-text columns, we won't be specifying a collation or
               // character set.
               $charset = null;
               $collation = null;
             } else {
               $charset = $expect_column->getCharacterSet();
               $collation = $expect_column->getCollation();
             }
 
             $adjustment = array(
               'kind' => 'column',
               'database' => $database_name,
               'table' => $table_name,
               'name' => $column_name,
               'issues' => $issues,
               'collation' => $collation,
               'charset' => $charset,
               'type' => $expect_column->getColumnType(),
 
               // NOTE: We don't adjust column nullability because it is
               // dangerous, so always use the current nullability.
               'nullable' => $actual_column->getNullable(),
 
               // NOTE: This always stores the current value, because we have
               // to make these updates separately.
               'is_auto' => $actual_column->getAutoIncrement(),
             );
 
             if ($column->hasIssue($issue_auto)) {
               $adjustment['auto'] = $expect_column->getAutoIncrement();
             }
 
             $adjustments[] = $adjustment;
           }
         }
 
         foreach ($table->getKeys() as $key_name => $key) {
           foreach ($this->findErrors($key) as $issue) {
             $errors[] = array(
               'database' => $database_name,
               'table' => $table_name,
               'name' => $key_name,
               'issue' => $issue,
             );
           }
 
           $expect_key = $expect_table->getKey($key_name);
           $actual_key = $actual_table->getKey($key_name);
 
           $issues = array();
           $keep_key = true;
           if ($key->hasIssue($issue_surpluskey)) {
             $issues[] = $issue_surpluskey;
             $keep_key = false;
           }
 
           if ($key->hasIssue($issue_missingkey)) {
             $issues[] = $issue_missingkey;
           }
 
           if ($key->hasIssue($issue_columns)) {
             $issues[] = $issue_columns;
           }
 
           if ($key->hasIssue($issue_unique)) {
             $issues[] = $issue_unique;
           }
 
           // NOTE: We can't really fix this, per se, but we may need to remove
           // the key to change the column type. In the best case, the new
           // column type won't be overlong and recreating the key really will
           // fix the issue. In the worst case, we get the right column type and
           // lose the key, which is still better than retaining the key having
           // the wrong column type.
           if ($key->hasIssue($issue_longkey)) {
             $issues[] = $issue_longkey;
           }
 
           if ($issues) {
             $adjustment = array(
               'kind' => 'key',
               'database' => $database_name,
               'table' => $table_name,
               'name' => $key_name,
               'issues' => $issues,
               'exists' => (bool)$actual_key,
               'keep' => $keep_key,
             );
 
             if ($keep_key) {
               $adjustment += array(
                 'columns' => $expect_key->getColumnNames(),
                 'unique' => $expect_key->getUnique(),
                 'indexType' => $expect_key->getIndexType(),
               );
             }
 
             $adjustments[] = $adjustment;
           }
         }
       }
     }
 
     return array($adjustments, $errors);
   }
 
   private function findErrors(PhabricatorConfigStorageSchema $schema) {
     $result = array();
     foreach ($schema->getLocalIssues() as $issue) {
       $status = PhabricatorConfigStorageSchema::getIssueStatus($issue);
       if ($status == PhabricatorConfigStorageSchema::STATUS_FAIL) {
         $result[] = $issue;
       }
     }
     return $result;
   }
 
   private function printErrors(array $errors, $default_return) {
     if (!$errors) {
       return $default_return;
     }
 
     $console = PhutilConsole::getConsole();
 
     $table = id(new PhutilConsoleTable())
       ->addColumn('target', array('title' => pht('Target')))
       ->addColumn('error', array('title' => pht('Error')));
 
     $any_surplus = false;
     $all_surplus = true;
     foreach ($errors as $error) {
       $pieces = array_select_keys(
         $error,
         array('database', 'table', 'name'));
       $pieces = array_filter($pieces);
       $target = implode('.', $pieces);
 
       $name = PhabricatorConfigStorageSchema::getIssueName($error['issue']);
 
       if ($error['issue'] === PhabricatorConfigStorageSchema::ISSUE_SURPLUS) {
         $any_surplus = true;
       } else {
         $all_surplus = false;
       }
 
       $table->addRow(
         array(
           'target' => $target,
           'error' => $name,
         ));
     }
 
     $console->writeOut("\n");
     $table->draw();
     $console->writeOut("\n");
 
 
     $message = array();
     if ($all_surplus) {
       $message[] = pht(
         'You have surplus schemata (extra tables or columns which Phabricator '.
         'does not expect). For information on resolving these '.
         'issues, see the "Surplus Schemata" section in the "Managing Storage '.
         'Adjustments" article in the documentation.');
     } else {
       $message[] = pht(
         'The schemata have errors (detailed above) which the adjustment '.
         'workflow can not fix.');
 
       if ($any_surplus) {
         $message[] = pht(
           'Some of these errors are caused by surplus schemata (extra '.
           'tables or columsn which Phabricator does not expect). These are '.
           'not serious. For information on resolving these issues, see the '.
           '"Surplus Schemata" section in the "Managing Storage Adjustments" '.
           'article in the documentation.');
       }
 
       $message[] = pht(
         'If you are not developing Phabricator itself, report this issue to '.
         'the upstream.');
 
       $message[] = pht(
         'If you are developing Phabricator, these errors usually indicate '.
         'that your schema specifications do not agree with the schemata your '.
         'code actually builds.');
     }
     $message = implode("\n\n", $message);
 
     if ($all_surplus) {
       $console->writeOut(
         "**<bg:yellow> %s </bg>**\n\n%s\n",
         pht('SURPLUS SCHEMATA'),
         phutil_console_wrap($message));
     } else {
       $console->writeOut(
         "**<bg:red> %s </bg>**\n\n%s\n",
         pht('SCHEMATA ERRORS'),
         phutil_console_wrap($message));
     }
 
     return 2;
   }
 
   protected final function getBareHostAndPort($host) {
     // Split out port information, since the command-line client requires a
     // separate flag for the port.
     $uri = new PhutilURI('mysql://'.$host);
     if ($uri->getPort()) {
       $port = $uri->getPort();
       $bare_hostname = $uri->getDomain();
     } else {
       $port = null;
       $bare_hostname = $host;
     }
 
     return array($bare_hostname, $port);
   }
 
 }