diff --git a/mysql/src/main/java/com/yahoo/ycsb/db/JAsyncMySQLDBClient.java b/mysql/src/main/java/com/yahoo/ycsb/db/JAsyncMySQLDBClient.java index a144636e..017e6f42 100644 --- a/mysql/src/main/java/com/yahoo/ycsb/db/JAsyncMySQLDBClient.java +++ b/mysql/src/main/java/com/yahoo/ycsb/db/JAsyncMySQLDBClient.java @@ -1,218 +1,224 @@ package com.yahoo.ycsb.db; import com.github.jasync.sql.db.Configuration; import com.github.jasync.sql.db.QueryResult; import com.github.jasync.sql.db.RowData; import com.github.jasync.sql.db.exceptions.UnableToParseURLException; import com.github.jasync.sql.db.mysql.MySQLConnection; import com.github.jasync.sql.db.mysql.util.URLParser; import com.yahoo.ycsb.AsyncDB; import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.Status; import com.yahoo.ycsb.StringByteIterator; import com.yahoo.ycsb.db.flavors.DBFlavor; import com.yahoo.ycsb.db.flavors.DefaultDBFlavor; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.Vector; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** * */ public class JAsyncMySQLDBClient extends AsyncDB { /** The database URL to use when establishing the connection. * Alternatively db.host, db.port, db.user, and db.passwd can be used. */ public static final String CONNECTION_URL = "db.url"; /** The database host to connect to. */ private static final String CONNECTION_HOST = "db.host"; /** The database port to connect to. */ private static final String CONNECTION_PORT = "db.port"; /** The user name to use to connect to the database. */ private static final String CONNECTION_USER = "db.user"; /** The password to use for establishing the connection. */ private static final String CONNECTION_PASSWD = "db.passwd"; /** The name of the database to connect to. */ private static final String CONNECTION_DB = "db.name"; /** The maximum number of database connections to use (by default unlimited). */ private static final String CONNECTION_MAX = "db.max_conn"; /** Whether to display debug information about the connection pool. */ private static final String CONNECTION_DEBUG_POOL = "db.debug_pool"; private static final String DEFAULT_PORT = "3306"; private static final String DEFAULT_PROP = ""; private boolean initialized = false; private DBFlavor dbFlavor = new DefaultDBFlavor(); private MySQLConnectionPool connectionPool; private Configuration fallbackConfig(Properties props) { String host = props.getProperty(CONNECTION_HOST, DEFAULT_PROP); String user = props.getProperty(CONNECTION_USER, DEFAULT_PROP); int port = Integer.parseInt(props.getProperty(CONNECTION_PORT, DEFAULT_PORT)); String db = props.getProperty(CONNECTION_DB, DEFAULT_PROP); String password = props.getProperty(CONNECTION_PASSWD, null); return new Configuration(user, host, port, password, db); } @Override public void init() { if (initialized) { System.err.println("Client connection already initialized."); return; } Properties props = getProperties(); Configuration configuration; if (props.containsKey(CONNECTION_URL)) { String url = props.getProperty(CONNECTION_URL, DEFAULT_PROP); try { configuration = URLParser.INSTANCE.parseOrDie(url, Charset.defaultCharset()); } catch (UnableToParseURLException e) { System.err.println("Unable to parse \"db.url\" parameter, falling back to separate parameter parsing."); configuration = fallbackConfig(props); } } else { configuration = fallbackConfig(props); } int maxConnections = Integer.MAX_VALUE; if (props.containsKey(CONNECTION_MAX)) { maxConnections = Integer.parseInt(props.getProperty(CONNECTION_MAX)); } boolean debugPool = Boolean.parseBoolean(props.getProperty(CONNECTION_DEBUG_POOL, "false")); initialized = true; connectionPool = new MySQLConnectionPool(configuration, maxConnections, true); if (debugPool) { Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { System.out.println("Available: " + connectionPool.getNumAvailableObjects() + " Used: " + connectionPool.getNumTakenObjects()); }, 0, 100, TimeUnit.MILLISECONDS); } } @Override public void cleanup() { connectionPool.close(); } @Override public CompletableFuture read(String tableName, String key, Set fields, Map result) { StatementType type = new StatementType(StatementType.Type.READ, tableName, 1, "", 0); String read = dbFlavor.createReadStatement(type, key); MySQLConnection connection = connectionPool.take(); return connection.sendPreparedStatement(read, Collections.singletonList(key)) .thenApply(QueryResult::getRows) .thenApply((resultRows) -> { connectionPool.giveBack(connection); if (resultRows == null) { return Status.UNEXPECTED_STATE; } if (resultRows.isEmpty()) { return Status.NOT_FOUND; } if (result != null && fields != null) { for (String field : fields) { String value = (String) resultRows.get(0).get(field); result.put(field, new StringByteIterator(value)); } } return Status.OK; }); } @Override public CompletableFuture scan(String tableName, String startKey, int recordCount, Set fields, Vector> result) { StatementType type = new StatementType(StatementType.Type.SCAN, tableName, 1, "", 0); String read = dbFlavor.createScanStatement(type, startKey); MySQLConnection connection = connectionPool.take(); return connection.sendPreparedStatement(read, Arrays.asList(startKey, recordCount)) .thenApply(QueryResult::getRows) .thenApply((resultRows) -> { connectionPool.giveBack(connection); if (resultRows == null || resultRows.size() < recordCount) { return Status.UNEXPECTED_STATE; } for (RowData row : resultRows) { if (result != null && fields != null) { HashMap values = new HashMap(); for (String field : fields) { String value = (String) row.get(field); values.put(field, new StringByteIterator(value)); } result.add(values); } } return Status.OK; }); } @Override - public CompletableFuture update(String tableName, String key, Map values) { - Set> orderedValues = values.entrySet(); - String fieldString = orderedValues.stream().map(Map.Entry::getKey) - .reduce((a, b) -> a = a + ", " + b).orElse(""); - List queryParams = orderedValues.stream().map(Map.Entry::getValue) - .map(ByteIterator::toString).collect(Collectors.toCollection(ArrayList::new)); - queryParams.add(key); - StatementType type = new StatementType(StatementType.Type.UPDATE, tableName, - values.size(), fieldString, 0); - - return runModifyingQuery(queryParams, dbFlavor.createUpdateStatement(type, key)); + public CompletableFuture update(String table, String key, Map values) { + return runAlteringQuery(StatementType.Type.UPDATE, table, key, values); } @Override public CompletableFuture insert(String table, String key, Map values) { - Set> orderedValues = values.entrySet(); - String fieldString = orderedValues.stream().map(Map.Entry::getKey) - .reduce((a, b) -> a = a + ", " + b).orElse(""); - List queryParams = orderedValues.stream().map(Map.Entry::getValue) - .map(ByteIterator::toString).collect(Collectors.toCollection(ArrayList::new)); - StatementType type = new StatementType(StatementType.Type.INSERT, table, - values.size(), fieldString, 0); - - return runModifyingQuery(queryParams, dbFlavor.createInsertStatement(type, key)); + return runAlteringQuery(StatementType.Type.INSERT, table, key, values); } @Override - public CompletableFuture delete(String tableName, String key) { - StatementType type = new StatementType(StatementType.Type.READ, tableName, 1, "", 0); - return runModifyingQuery(Collections.singletonList(key), dbFlavor.createDeleteStatement(type, key)); + public CompletableFuture delete(String table, String key) { + return runAlteringQuery(StatementType.Type.DELETE, table, key, null); } - private CompletableFuture runModifyingQuery(List queryParams, String query) { + private CompletableFuture runAlteringQuery(StatementType.Type queryType, String table, String key, + Map values) { + final String query; + final List queryParams; + if (queryType == StatementType.Type.DELETE) { + queryParams = Collections.singletonList(key); + StatementType type = new StatementType(StatementType.Type.DELETE, table, 1, "", 0); + query = dbFlavor.createDeleteStatement(type, key); + } else { + // INSERT or UPDATE + Set> orderedValues = values.entrySet(); + String fieldString = orderedValues.stream().map(Map.Entry::getKey) + .reduce((a, b) -> a = a + ", " + b).orElse(""); + queryParams = orderedValues.stream().map(Map.Entry::getValue) + .map(ByteIterator::toString).collect(Collectors.toCollection(ArrayList::new)); + StatementType type = new StatementType(queryType, table, values.size(), fieldString, 0); + if (queryType == StatementType.Type.INSERT) { + queryParams.add(0, key); + query = dbFlavor.createInsertStatement(type, key); + } else { + queryParams.add(key); + query = dbFlavor.createUpdateStatement(type, key); + } + } + MySQLConnection connection = connectionPool.take(); return connection.sendPreparedStatement(query, queryParams).thenApply((qResult) -> { connectionPool.giveBack(connection); if (qResult.getRowsAffected() != 1) { return Status.UNEXPECTED_STATE; } return Status.OK; }); } }