diff --git a/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java b/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java
index 294fa096..f92e1499 100644
--- a/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java
+++ b/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java
@@ -1,501 +1,501 @@
/**
* Copyright (c) 2010 - 2016 Yahoo! Inc., 2016 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.StringByteIterator;
import java.sql.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import com.yahoo.ycsb.db.flavors.DBFlavor;
/**
* A class that wraps a JDBC compliant database to allow it to be interfaced
* with YCSB. This class extends {@link DB} and implements the database
* interface used by YCSB client.
*
*
* Each client will have its own instance of this class. This client is not
* thread safe.
*
*
* This interface expects a schema ... All
- * attributes are of type VARCHAR. All accesses are through the primary key.
+ * attributes are of type TEXT. All accesses are through the primary key.
* Therefore, only one index on the primary key is needed.
*/
public class JdbcDBClient extends DB {
/** The class to use as the jdbc driver. */
public static final String DRIVER_CLASS = "db.driver";
/** The URL to connect to the database. */
public static final String CONNECTION_URL = "db.url";
/** The user name to use to connect to the database. */
public static final String CONNECTION_USER = "db.user";
/** The password to use for establishing the connection. */
public static final String CONNECTION_PASSWD = "db.passwd";
/** The batch size for batched inserts. Set to >0 to use batching */
public static final String DB_BATCH_SIZE = "db.batchsize";
/** The JDBC fetch size hinted to the driver. */
public static final String JDBC_FETCH_SIZE = "jdbc.fetchsize";
/** The JDBC connection auto-commit property for the driver. */
public static final String JDBC_AUTO_COMMIT = "jdbc.autocommit";
public static final String JDBC_BATCH_UPDATES = "jdbc.batchupdateapi";
/** The name of the property for the number of fields in a record. */
public static final String FIELD_COUNT_PROPERTY = "fieldcount";
/** Default number of fields in a record. */
public static final String FIELD_COUNT_PROPERTY_DEFAULT = "10";
/** Representing a NULL value. */
public static final String NULL_VALUE = "NULL";
/** The primary key in the user table. */
public static final String PRIMARY_KEY = "YCSB_KEY";
/** The field name prefix in the table. */
public static final String COLUMN_PREFIX = "FIELD";
private List conns;
private boolean initialized = false;
private Properties props;
private int jdbcFetchSize;
private int batchSize;
private boolean autoCommit;
private boolean batchUpdates;
private static final String DEFAULT_PROP = "";
private ConcurrentMap cachedStatements;
private long numRowsInBatch = 0;
/** DB flavor defines DB-specific syntax and behavior for the
* particular database. Current database flavors are: {default, phoenix} */
private DBFlavor dbFlavor;
/**
* Ordered field information for insert and update statements.
*/
private static class OrderedFieldInfo {
private String fieldKeys;
private List fieldValues;
OrderedFieldInfo(String fieldKeys, List fieldValues) {
this.fieldKeys = fieldKeys;
this.fieldValues = fieldValues;
}
String getFieldKeys() {
return fieldKeys;
}
List getFieldValues() {
return fieldValues;
}
}
/**
* For the given key, returns what shard contains data for this key.
*
* @param key Data key to do operation on
* @return Shard index
*/
private int getShardIndexByKey(String key) {
int ret = Math.abs(key.hashCode()) % conns.size();
return ret;
}
/**
* For the given key, returns Connection object that holds connection to the
* shard that contains this key.
*
* @param key Data key to get information for
* @return Connection object
*/
private Connection getShardConnectionByKey(String key) {
return conns.get(getShardIndexByKey(key));
}
private void cleanupAllConnections() throws SQLException {
for (Connection conn : conns) {
if (!autoCommit) {
conn.commit();
}
conn.close();
}
}
/** Returns parsed int value from the properties if set, otherwise returns -1. */
private static int getIntProperty(Properties props, String key) throws DBException {
String valueStr = props.getProperty(key);
if (valueStr != null) {
try {
return Integer.parseInt(valueStr);
} catch (NumberFormatException nfe) {
System.err.println("Invalid " + key + " specified: " + valueStr);
throw new DBException(nfe);
}
}
return -1;
}
/** Returns parsed boolean value from the properties if set, otherwise returns defaultVal. */
private static boolean getBoolProperty(Properties props, String key, boolean defaultVal) {
String valueStr = props.getProperty(key);
if (valueStr != null) {
return Boolean.parseBoolean(valueStr);
}
return defaultVal;
}
@Override
public void init() throws DBException {
if (initialized) {
System.err.println("Client connection already initialized.");
return;
}
props = getProperties();
String urls = props.getProperty(CONNECTION_URL, DEFAULT_PROP);
String user = props.getProperty(CONNECTION_USER, DEFAULT_PROP);
String passwd = props.getProperty(CONNECTION_PASSWD, DEFAULT_PROP);
String driver = props.getProperty(DRIVER_CLASS);
this.jdbcFetchSize = getIntProperty(props, JDBC_FETCH_SIZE);
this.batchSize = getIntProperty(props, DB_BATCH_SIZE);
this.autoCommit = getBoolProperty(props, JDBC_AUTO_COMMIT, true);
this.batchUpdates = getBoolProperty(props, JDBC_BATCH_UPDATES, false);
try {
if (driver != null) {
Class.forName(driver);
}
int shardCount = 0;
conns = new ArrayList(3);
final String[] urlArr = urls.split(",");
for (String url : urlArr) {
System.out.println("Adding shard node URL: " + url);
Connection conn = DriverManager.getConnection(url, user, passwd);
// Since there is no explicit commit method in the DB interface, all
// operations should auto commit, except when explicitly told not to
// (this is necessary in cases such as for PostgreSQL when running a
// scan workload with fetchSize)
conn.setAutoCommit(autoCommit);
shardCount++;
conns.add(conn);
}
System.out.println("Using shards: " + shardCount + ", batchSize:" + batchSize + ", fetchSize: " + jdbcFetchSize);
cachedStatements = new ConcurrentHashMap();
this.dbFlavor = DBFlavor.fromJdbcUrl(urlArr[0]);
} catch (ClassNotFoundException e) {
System.err.println("Error in initializing the JDBS driver: " + e);
throw new DBException(e);
} catch (SQLException e) {
System.err.println("Error in database operation: " + e);
throw new DBException(e);
} catch (NumberFormatException e) {
System.err.println("Invalid value for fieldcount property. " + e);
throw new DBException(e);
}
initialized = true;
}
@Override
public void cleanup() throws DBException {
if (batchSize > 0) {
try {
// commit un-finished batches
for (PreparedStatement st : cachedStatements.values()) {
if (!st.getConnection().isClosed() && !st.isClosed() && (numRowsInBatch % batchSize != 0)) {
st.executeBatch();
}
}
} catch (SQLException e) {
System.err.println("Error in cleanup execution. " + e);
throw new DBException(e);
}
}
try {
cleanupAllConnections();
} catch (SQLException e) {
System.err.println("Error in closing the connection. " + e);
throw new DBException(e);
}
}
private PreparedStatement createAndCacheInsertStatement(StatementType insertType, String key)
throws SQLException {
String insert = dbFlavor.createInsertStatement(insertType, key);
PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(insert);
PreparedStatement stmt = cachedStatements.putIfAbsent(insertType, insertStatement);
if (stmt == null) {
return insertStatement;
}
return stmt;
}
private PreparedStatement createAndCacheReadStatement(StatementType readType, String key)
throws SQLException {
String read = dbFlavor.createReadStatement(readType, key);
PreparedStatement readStatement = getShardConnectionByKey(key).prepareStatement(read);
PreparedStatement stmt = cachedStatements.putIfAbsent(readType, readStatement);
if (stmt == null) {
return readStatement;
}
return stmt;
}
private PreparedStatement createAndCacheDeleteStatement(StatementType deleteType, String key)
throws SQLException {
String delete = dbFlavor.createDeleteStatement(deleteType, key);
PreparedStatement deleteStatement = getShardConnectionByKey(key).prepareStatement(delete);
PreparedStatement stmt = cachedStatements.putIfAbsent(deleteType, deleteStatement);
if (stmt == null) {
return deleteStatement;
}
return stmt;
}
private PreparedStatement createAndCacheUpdateStatement(StatementType updateType, String key)
throws SQLException {
String update = dbFlavor.createUpdateStatement(updateType, key);
PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(update);
PreparedStatement stmt = cachedStatements.putIfAbsent(updateType, insertStatement);
if (stmt == null) {
return insertStatement;
}
return stmt;
}
private PreparedStatement createAndCacheScanStatement(StatementType scanType, String key)
throws SQLException {
String select = dbFlavor.createScanStatement(scanType, key);
PreparedStatement scanStatement = getShardConnectionByKey(key).prepareStatement(select);
if (this.jdbcFetchSize > 0) {
scanStatement.setFetchSize(this.jdbcFetchSize);
}
PreparedStatement stmt = cachedStatements.putIfAbsent(scanType, scanStatement);
if (stmt == null) {
return scanStatement;
}
return stmt;
}
@Override
public Status read(String tableName, String key, Set fields, Map result) {
try {
StatementType type = new StatementType(StatementType.Type.READ, tableName, 1, "", getShardIndexByKey(key));
PreparedStatement readStatement = cachedStatements.get(type);
if (readStatement == null) {
readStatement = createAndCacheReadStatement(type, key);
}
readStatement.setString(1, key);
ResultSet resultSet = readStatement.executeQuery();
if (!resultSet.next()) {
resultSet.close();
return Status.NOT_FOUND;
}
if (result != null && fields != null) {
for (String field : fields) {
String value = resultSet.getString(field);
result.put(field, new StringByteIterator(value));
}
}
resultSet.close();
return Status.OK;
} catch (SQLException e) {
System.err.println("Error in processing read of table " + tableName + ": " + e);
return Status.ERROR;
}
}
@Override
public Status scan(String tableName, String startKey, int recordcount, Set fields,
Vector> result) {
try {
StatementType type = new StatementType(StatementType.Type.SCAN, tableName, 1, "", getShardIndexByKey(startKey));
PreparedStatement scanStatement = cachedStatements.get(type);
if (scanStatement == null) {
scanStatement = createAndCacheScanStatement(type, startKey);
}
scanStatement.setString(1, startKey);
scanStatement.setInt(2, recordcount);
ResultSet resultSet = scanStatement.executeQuery();
for (int i = 0; i < recordcount && resultSet.next(); i++) {
if (result != null && fields != null) {
HashMap values = new HashMap();
for (String field : fields) {
String value = resultSet.getString(field);
values.put(field, new StringByteIterator(value));
}
result.add(values);
}
}
resultSet.close();
return Status.OK;
} catch (SQLException e) {
System.err.println("Error in processing scan of table: " + tableName + e);
return Status.ERROR;
}
}
@Override
public Status update(String tableName, String key, Map values) {
try {
int numFields = values.size();
OrderedFieldInfo fieldInfo = getFieldInfo(values);
StatementType type = new StatementType(StatementType.Type.UPDATE, tableName,
numFields, fieldInfo.getFieldKeys(), getShardIndexByKey(key));
PreparedStatement updateStatement = cachedStatements.get(type);
if (updateStatement == null) {
updateStatement = createAndCacheUpdateStatement(type, key);
}
int index = 1;
for (String value: fieldInfo.getFieldValues()) {
updateStatement.setString(index++, value);
}
updateStatement.setString(index, key);
int result = updateStatement.executeUpdate();
if (result == 1) {
return Status.OK;
}
return Status.UNEXPECTED_STATE;
} catch (SQLException e) {
System.err.println("Error in processing update to table: " + tableName + e);
return Status.ERROR;
}
}
@Override
public Status insert(String tableName, String key, Map values) {
try {
int numFields = values.size();
OrderedFieldInfo fieldInfo = getFieldInfo(values);
StatementType type = new StatementType(StatementType.Type.INSERT, tableName,
numFields, fieldInfo.getFieldKeys(), getShardIndexByKey(key));
PreparedStatement insertStatement = cachedStatements.get(type);
if (insertStatement == null) {
insertStatement = createAndCacheInsertStatement(type, key);
}
insertStatement.setString(1, key);
int index = 2;
for (String value: fieldInfo.getFieldValues()) {
insertStatement.setString(index++, value);
}
// Using the batch insert API
if (batchUpdates) {
insertStatement.addBatch();
// Check for a sane batch size
if (batchSize > 0) {
// Commit the batch after it grows beyond the configured size
if (++numRowsInBatch % batchSize == 0) {
int[] results = insertStatement.executeBatch();
for (int r : results) {
if (r != 1) {
return Status.ERROR;
}
}
// If autoCommit is off, make sure we commit the batch
if (!autoCommit) {
getShardConnectionByKey(key).commit();
}
return Status.OK;
} // else, the default value of -1 or a nonsense. Treat it as an infinitely large batch.
} // else, we let the batch accumulate
// Added element to the batch, potentially committing the batch too.
return Status.BATCHED_OK;
} else {
// Normal update
int result = insertStatement.executeUpdate();
// If we are not autoCommit, we might have to commit now
if (!autoCommit) {
// Let updates be batcher locally
if (batchSize > 0) {
if (++numRowsInBatch % batchSize == 0) {
// Send the batch of updates
getShardConnectionByKey(key).commit();
}
// uhh
return Status.OK;
} else {
// Commit each update
getShardConnectionByKey(key).commit();
}
}
if (result == 1) {
return Status.OK;
}
}
return Status.UNEXPECTED_STATE;
} catch (SQLException e) {
System.err.println("Error in processing insert to table: " + tableName + e);
return Status.ERROR;
}
}
@Override
public Status delete(String tableName, String key) {
try {
StatementType type = new StatementType(StatementType.Type.DELETE, tableName, 1, "", getShardIndexByKey(key));
PreparedStatement deleteStatement = cachedStatements.get(type);
if (deleteStatement == null) {
deleteStatement = createAndCacheDeleteStatement(type, key);
}
deleteStatement.setString(1, key);
int result = deleteStatement.executeUpdate();
if (result == 1) {
return Status.OK;
}
return Status.UNEXPECTED_STATE;
} catch (SQLException e) {
System.err.println("Error in processing delete to table: " + tableName + e);
return Status.ERROR;
}
}
private OrderedFieldInfo getFieldInfo(Map values) {
String fieldKeys = "";
List fieldValues = new ArrayList<>();
int count = 0;
for (Map.Entry entry : values.entrySet()) {
fieldKeys += entry.getKey();
if (count < values.size() - 1) {
fieldKeys += ",";
}
fieldValues.add(count, entry.getValue().toString());
count++;
}
return new OrderedFieldInfo(fieldKeys, fieldValues);
}
}
diff --git a/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBCreateTable.java b/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBCreateTable.java
index 8dfb26dd..d1128eb6 100644
--- a/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBCreateTable.java
+++ b/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBCreateTable.java
@@ -1,224 +1,224 @@
/**
* Copyright (c) 2010 - 2016 Yahoo! Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db;
import java.io.FileInputStream;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Enumeration;
import java.util.Properties;
/**
* Utility class to create the table to be used by the benchmark.
*
* @author sudipto
*/
public final class JdbcDBCreateTable {
private static void usageMessage() {
System.out.println("Create Table Client. Options:");
System.out.println(" -p key=value properties defined.");
System.out.println(" -P location of the properties file to load.");
System.out.println(" -n name of the table.");
System.out.println(" -f number of fields (default 10).");
}
private static void createTable(Properties props, String tablename) throws SQLException {
String driver = props.getProperty(JdbcDBClient.DRIVER_CLASS);
String username = props.getProperty(JdbcDBClient.CONNECTION_USER);
String password = props.getProperty(JdbcDBClient.CONNECTION_PASSWD, "");
String url = props.getProperty(JdbcDBClient.CONNECTION_URL);
int fieldcount = Integer.parseInt(props.getProperty(JdbcDBClient.FIELD_COUNT_PROPERTY,
JdbcDBClient.FIELD_COUNT_PROPERTY_DEFAULT));
if (driver == null || username == null || url == null) {
throw new SQLException("Missing connection information.");
}
Connection conn = null;
try {
Class.forName(driver);
conn = DriverManager.getConnection(url, username, password);
Statement stmt = conn.createStatement();
StringBuilder sql = new StringBuilder("DROP TABLE IF EXISTS ");
sql.append(tablename);
sql.append(";");
stmt.execute(sql.toString());
sql = new StringBuilder("CREATE TABLE ");
sql.append(tablename);
sql.append(" (YCSB_KEY VARCHAR PRIMARY KEY");
for (int idx = 0; idx < fieldcount; idx++) {
sql.append(", FIELD");
sql.append(idx);
- sql.append(" VARCHAR");
+ sql.append(" TEXT");
}
sql.append(");");
stmt.execute(sql.toString());
System.out.println("Table " + tablename + " created..");
} catch (ClassNotFoundException e) {
throw new SQLException("JDBC Driver class not found.");
} finally {
if (conn != null) {
System.out.println("Closing database connection.");
conn.close();
}
}
}
/**
* @param args
*/
public static void main(String[] args) {
if (args.length == 0) {
usageMessage();
System.exit(0);
}
String tablename = null;
int fieldcount = -1;
Properties props = new Properties();
Properties fileprops = new Properties();
// parse arguments
int argindex = 0;
while (args[argindex].startsWith("-")) {
if (args[argindex].compareTo("-P") == 0) {
argindex++;
if (argindex >= args.length) {
usageMessage();
System.exit(0);
}
String propfile = args[argindex];
argindex++;
Properties myfileprops = new Properties();
try {
myfileprops.load(new FileInputStream(propfile));
} catch (IOException e) {
System.out.println(e.getMessage());
System.exit(0);
}
// Issue #5 - remove call to stringPropertyNames to make compilable
// under Java 1.5
for (Enumeration> e = myfileprops.propertyNames(); e.hasMoreElements();) {
String prop = (String) e.nextElement();
fileprops.setProperty(prop, myfileprops.getProperty(prop));
}
} else if (args[argindex].compareTo("-p") == 0) {
argindex++;
if (argindex >= args.length) {
usageMessage();
System.exit(0);
}
int eq = args[argindex].indexOf('=');
if (eq < 0) {
usageMessage();
System.exit(0);
}
String name = args[argindex].substring(0, eq);
String value = args[argindex].substring(eq + 1);
props.put(name, value);
argindex++;
} else if (args[argindex].compareTo("-n") == 0) {
argindex++;
if (argindex >= args.length) {
usageMessage();
System.exit(0);
}
tablename = args[argindex++];
} else if (args[argindex].compareTo("-f") == 0) {
argindex++;
if (argindex >= args.length) {
usageMessage();
System.exit(0);
}
try {
fieldcount = Integer.parseInt(args[argindex++]);
} catch (NumberFormatException e) {
System.err.println("Invalid number for field count");
usageMessage();
System.exit(1);
}
} else {
System.out.println("Unknown option " + args[argindex]);
usageMessage();
System.exit(0);
}
if (argindex >= args.length) {
break;
}
}
if (argindex != args.length) {
usageMessage();
System.exit(0);
}
// overwrite file properties with properties from the command line
// Issue #5 - remove call to stringPropertyNames to make compilable under
// Java 1.5
for (Enumeration> e = props.propertyNames(); e.hasMoreElements();) {
String prop = (String) e.nextElement();
fileprops.setProperty(prop, props.getProperty(prop));
}
props = fileprops;
if (tablename == null) {
System.err.println("table name missing.");
usageMessage();
System.exit(1);
}
if (fieldcount > 0) {
props.setProperty(JdbcDBClient.FIELD_COUNT_PROPERTY, String.valueOf(fieldcount));
}
try {
createTable(props, tablename);
} catch (SQLException e) {
System.err.println("Error in creating table. " + e);
System.exit(1);
}
}
/**
* Hidden constructor.
*/
private JdbcDBCreateTable() {
super();
}
}