diff --git a/core/src/main/java/com/yahoo/ycsb/ObjectPool.java b/core/src/main/java/com/yahoo/ycsb/ObjectPool.java new file mode 100644 index 00000000..6526fc68 --- /dev/null +++ b/core/src/main/java/com/yahoo/ycsb/ObjectPool.java @@ -0,0 +1,104 @@ +package com.yahoo.ycsb; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * + */ +public abstract class ObjectPool { + private ConcurrentLinkedQueue available = new ConcurrentLinkedQueue<>(); + private ConcurrentHashMap checkedOut = new ConcurrentHashMap<>(); + private AtomicLong objectCount = new AtomicLong(0); + private boolean closed = false; + private ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(); + private final long maxObjects; + + public ObjectPool(long maxObjects) { + this.maxObjects = maxObjects; + if (maxObjects <= 0) { + throw new IllegalArgumentException("maxObjects cannot be less than or equal to 0."); + } + } + + private static class PoolAlreadyClosedException extends IllegalStateException { + PoolAlreadyClosedException() { + super("This object pool has already been closed."); + } + } + + private static class PoolExhaustedException extends IllegalStateException { + PoolExhaustedException() { + super("This pool has no more objects available."); + } + } + + protected abstract T create(); + + protected abstract void destroy(T object); + + protected abstract boolean validate(T object); + + public T take() { + if (closed) { + throw new PoolAlreadyClosedException(); + } + T object = available.poll(); + if (null == object) { + long connIdx = objectCount.getAndUpdate(count -> count == maxObjects ? count : count + 1); + if (connIdx == maxObjects) { + throw new PoolExhaustedException(); + } + object = create(); + } + checkedOut.put(object, object); + return object; + } + + public void giveBack(T object) { + if (!checkedOut.containsKey(object)) { + throw new IllegalArgumentException("This object is not part of the pool or was already returned."); + } + + if (validate(object)) { + checkedOut.remove(object); + available.offer(object); + } + } + + public long getTotalObjects() { + return objectCount.get(); + } + + public int getNumTakenObjects() { + return checkedOut.size(); + } + + public int getNumAvailableObjects() { + return available.size(); + } + + private void destroyAllObjects() { + available.forEach(this::destroy); + } + + public void close() { + closed = true; + + destroyAllObjects(); + if (!checkedOut.isEmpty()) { + ses.scheduleAtFixedRate(() -> { + if (checkedOut.isEmpty()) { + destroyAllObjects(); + ses.shutdown(); + } else { + System.out.println("Waiting 100ms for all objects to be released."); + } + }, 0, 100, TimeUnit.MILLISECONDS); + } + } +} diff --git a/mysql/src/main/java/com/yahoo/ycsb/db/JAsyncMySQLDBClient.java b/mysql/src/main/java/com/yahoo/ycsb/db/JAsyncMySQLDBClient.java index 5391dd64..0501066b 100644 --- a/mysql/src/main/java/com/yahoo/ycsb/db/JAsyncMySQLDBClient.java +++ b/mysql/src/main/java/com/yahoo/ycsb/db/JAsyncMySQLDBClient.java @@ -1,209 +1,218 @@ package com.yahoo.ycsb.db; import com.github.jasync.sql.db.Configuration; import com.github.jasync.sql.db.QueryResult; import com.github.jasync.sql.db.RowData; import com.github.jasync.sql.db.exceptions.UnableToParseURLException; import com.github.jasync.sql.db.mysql.MySQLConnection; import com.github.jasync.sql.db.mysql.util.URLParser; import com.yahoo.ycsb.AsyncDB; import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.Status; import com.yahoo.ycsb.StringByteIterator; import com.yahoo.ycsb.db.flavors.DBFlavor; import com.yahoo.ycsb.db.flavors.DefaultDBFlavor; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.Vector; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** * */ public class JAsyncMySQLDBClient extends AsyncDB { /** The database URL to use when establishing the connection. * Alternatively db.host, db.port, db.user, and db.passwd can be used. */ public static final String CONNECTION_URL = "db.url"; /** The database host to connect to. */ private static final String CONNECTION_HOST = "db.host"; /** The database port to connect to. */ private static final String CONNECTION_PORT = "db.port"; /** The user name to use to connect to the database. */ private static final String CONNECTION_USER = "db.user"; /** The password to use for establishing the connection. */ private static final String CONNECTION_PASSWD = "db.passwd"; /** The name of the database to connect to. */ private static final String CONNECTION_DB = "db.name"; /** The maximum number of database connections to use (by default unlimited). */ private static final String CONNECTION_MAX = "db.max_conn"; /** Whether to display debug information about the connection pool. */ private static final String CONNECTION_DEBUG_POOL = "db.debug_pool"; private static final String DEFAULT_PORT = "3306"; private static final String DEFAULT_PROP = ""; private boolean initialized = false; private DBFlavor dbFlavor = new DefaultDBFlavor(); private MySQLConnectionPool connectionPool; + private Configuration fallbackConfig(Properties props) { + String host = props.getProperty(CONNECTION_HOST, DEFAULT_PROP); + String user = props.getProperty(CONNECTION_USER, DEFAULT_PROP); + int port = Integer.parseInt(props.getProperty(CONNECTION_PORT, DEFAULT_PORT)); + String db = props.getProperty(CONNECTION_DB, DEFAULT_PROP); + String password = props.getProperty(CONNECTION_PASSWD, null); + return new Configuration(user, host, port, password, db); + } + + @Override public void init() { if (initialized) { System.err.println("Client connection already initialized."); return; } Properties props = getProperties(); - String url = props.getProperty(CONNECTION_URL, DEFAULT_PROP); Configuration configuration; - try { - configuration = URLParser.INSTANCE.parseOrDie(url, Charset.defaultCharset()); - } catch (UnableToParseURLException e) { - System.err.println("Unable to parse \"db.url\" parameter, falling back to separate parameter parsing."); - String host = props.getProperty(CONNECTION_HOST, DEFAULT_PROP); - String user = props.getProperty(CONNECTION_USER, DEFAULT_PROP); - int port = Integer.parseInt(props.getProperty(CONNECTION_PORT, DEFAULT_PORT)); - String db = props.getProperty(CONNECTION_DB, DEFAULT_PROP); - String password = props.getProperty(CONNECTION_PASSWD, null); - configuration = new Configuration(user, host, port, password, db); + if (props.containsKey(CONNECTION_URL)) { + String url = props.getProperty(CONNECTION_URL, DEFAULT_PROP); + try { + configuration = URLParser.INSTANCE.parseOrDie(url, Charset.defaultCharset()); + } catch (UnableToParseURLException e) { + System.err.println("Unable to parse \"db.url\" parameter, falling back to separate parameter parsing."); + configuration = fallbackConfig(props); + } + } else { + configuration = fallbackConfig(props); } int maxConnections = Integer.MAX_VALUE; if (props.containsKey(CONNECTION_MAX)) { maxConnections = Integer.parseInt(props.getProperty(CONNECTION_MAX)); } boolean debugPool = Boolean.parseBoolean(props.getProperty(CONNECTION_DEBUG_POOL, "false")); initialized = true; connectionPool = new MySQLConnectionPool(configuration, maxConnections); if (debugPool) { Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { - System.out.println("Available: " + connectionPool.getNumAvailableConnections() + - " Used: " + connectionPool.getNumTakenConnections()); + System.out.println("Available: " + connectionPool.getNumAvailableObjects() + + " Used: " + connectionPool.getNumTakenObjects()); }, 0, 100, TimeUnit.MILLISECONDS); } } @Override public void cleanup() { connectionPool.close(); } @Override public CompletableFuture read(String tableName, String key, Set fields, Map result) { StatementType type = new StatementType(StatementType.Type.READ, tableName, 1, "", 0); String read = dbFlavor.createReadStatement(type, key); MySQLConnection connection = connectionPool.take(); return connection.sendPreparedStatement(read, Collections.singletonList(key)) .thenApply(QueryResult::getRows) .thenApply((resultRows) -> { connectionPool.giveBack(connection); if (resultRows == null) { return Status.UNEXPECTED_STATE; } if (resultRows.isEmpty()) { return Status.NOT_FOUND; } if (result != null && fields != null) { for (String field : fields) { String value = (String) resultRows.get(0).get(field); result.put(field, new StringByteIterator(value)); } } return Status.OK; }); } @Override public CompletableFuture scan(String tableName, String startKey, int recordCount, Set fields, Vector> result) { StatementType type = new StatementType(StatementType.Type.SCAN, tableName, 1, "", 0); String read = dbFlavor.createScanStatement(type, startKey); MySQLConnection connection = connectionPool.take(); return connection.sendPreparedStatement(read, Arrays.asList(startKey, recordCount)) .thenApply(QueryResult::getRows) .thenApply((resultRows) -> { connectionPool.giveBack(connection); if (resultRows == null || resultRows.size() < recordCount) { return Status.UNEXPECTED_STATE; } for (RowData row : resultRows) { if (result != null && fields != null) { HashMap values = new HashMap(); for (String field : fields) { String value = (String) row.get(field); values.put(field, new StringByteIterator(value)); } result.add(values); } } return Status.OK; }); } @Override public CompletableFuture update(String tableName, String key, Map values) { Set> orderedValues = values.entrySet(); String fieldString = orderedValues.stream().map(Map.Entry::getKey) .reduce((a, b) -> a = a + ", " + b).orElse(""); List queryParams = orderedValues.stream().map(Map.Entry::getValue) .map(ByteIterator::toString).collect(Collectors.toCollection(ArrayList::new)); queryParams.add(key); StatementType type = new StatementType(StatementType.Type.UPDATE, tableName, values.size(), fieldString, 0); return runModifyingQuery(queryParams, dbFlavor.createUpdateStatement(type, key)); } @Override public CompletableFuture insert(String table, String key, Map values) { Set> orderedValues = values.entrySet(); String fieldString = orderedValues.stream().map(Map.Entry::getKey) .reduce((a, b) -> a = a + ", " + b).orElse(""); List queryParams = orderedValues.stream().map(Map.Entry::getValue) .map(ByteIterator::toString).collect(Collectors.toCollection(ArrayList::new)); StatementType type = new StatementType(StatementType.Type.INSERT, table, values.size(), fieldString, 0); return runModifyingQuery(queryParams, dbFlavor.createInsertStatement(type, key)); } @Override public CompletableFuture delete(String tableName, String key) { StatementType type = new StatementType(StatementType.Type.READ, tableName, 1, "", 0); return runModifyingQuery(Collections.singletonList(key), dbFlavor.createDeleteStatement(type, key)); } private CompletableFuture runModifyingQuery(List queryParams, String query) { MySQLConnection connection = connectionPool.take(); return connection.sendPreparedStatement(query, queryParams).thenApply((qResult) -> { connectionPool.giveBack(connection); if (qResult.getRowsAffected() != 1) { return Status.UNEXPECTED_STATE; } return Status.OK; }); } } diff --git a/mysql/src/main/java/com/yahoo/ycsb/db/MySQLConnectionPool.java b/mysql/src/main/java/com/yahoo/ycsb/db/MySQLConnectionPool.java index 70b46a47..52847d66 100644 --- a/mysql/src/main/java/com/yahoo/ycsb/db/MySQLConnectionPool.java +++ b/mysql/src/main/java/com/yahoo/ycsb/db/MySQLConnectionPool.java @@ -1,90 +1,34 @@ package com.yahoo.ycsb.db; import com.github.jasync.sql.db.Configuration; import com.github.jasync.sql.db.mysql.MySQLConnection; import com.github.jasync.sql.db.mysql.pool.MySQLConnectionFactory; -import com.github.jasync.sql.db.pool.PoolAlreadyTerminatedException; -import com.github.jasync.sql.db.pool.PoolExhaustedException; - -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicLong; +import com.yahoo.ycsb.ObjectPool; /** * */ -public class MySQLConnectionPool { +public class MySQLConnectionPool extends ObjectPool { + private final MySQLConnectionFactory connectionFactory; - private ConcurrentLinkedQueue available = new ConcurrentLinkedQueue<>(); - private ConcurrentHashMap checkedOut = new ConcurrentHashMap<>(); - private AtomicLong connectionCount = new AtomicLong(0); - private boolean closed = false; - private ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(); - private final long maxConnections; public MySQLConnectionPool(Configuration config, long maxConnections) { + super(maxConnections); this.connectionFactory = new MySQLConnectionFactory(config); - this.maxConnections = maxConnections; - if (maxConnections <= 0) { - throw new IllegalArgumentException("maxConnections cannot be less than or equal to 0."); - } - } - - public MySQLConnection take() { - if (closed) { - throw new PoolAlreadyTerminatedException(); - } - MySQLConnection connection = available.poll(); - if (null == connection) { - long connIdx = connectionCount.getAndUpdate(count -> count == maxConnections ? count : count + 1); - if (connIdx == maxConnections) { - throw new PoolExhaustedException("No connections are available."); - } - connection = connectionFactory.create(); - } - checkedOut.put(connection, connection); - return connection; - } - - public void giveBack(MySQLConnection connection) { - if (!checkedOut.containsKey(connection)) { - throw new IllegalArgumentException("This connection is not part of the pool or was already returned."); - } - - if (connectionFactory.validate(connection).isSuccess()) { - checkedOut.remove(connection); - available.offer(connection); - } - } - - public long getTotalConnections() { - return connectionCount.get(); } - public int getNumTakenConnections() { - return checkedOut.size(); + @Override + protected MySQLConnection create() { + return connectionFactory.create(); } - public int getNumAvailableConnections() { - return available.size(); + @Override + protected void destroy(MySQLConnection connection) { + connection.disconnect(); } - private void closeAllConnections() { - available.forEach((MySQLConnection::disconnect)); - } - - public void close() { - closed = true; - - closeAllConnections(); - if (!checkedOut.isEmpty()) { - ses.scheduleAtFixedRate(() -> { - if (checkedOut.isEmpty()) { - closeAllConnections(); - ses.shutdown(); - } else { - System.out.println("Waiting 100ms for all connections to be released."); - } - }, 0, 100, TimeUnit.MILLISECONDS); - } + @Override + protected boolean validate(MySQLConnection connection) { + return connectionFactory.validate(connection).isSuccess(); } }