diff --git a/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java b/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java index 294fa096..f92e1499 100644 --- a/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java +++ b/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java @@ -1,501 +1,501 @@ /** * Copyright (c) 2010 - 2016 Yahoo! Inc., 2016 YCSB contributors. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb.db; import com.yahoo.ycsb.DB; import com.yahoo.ycsb.DBException; import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.Status; import com.yahoo.ycsb.StringByteIterator; import java.sql.*; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import com.yahoo.ycsb.db.flavors.DBFlavor; /** * A class that wraps a JDBC compliant database to allow it to be interfaced * with YCSB. This class extends {@link DB} and implements the database * interface used by YCSB client. * *
* Each client will have its own instance of this class. This client is not * thread safe. * *
* This interface expects a schema ... All - * attributes are of type VARCHAR. All accesses are through the primary key. + * attributes are of type TEXT. All accesses are through the primary key. * Therefore, only one index on the primary key is needed. */ public class JdbcDBClient extends DB { /** The class to use as the jdbc driver. */ public static final String DRIVER_CLASS = "db.driver"; /** The URL to connect to the database. */ public static final String CONNECTION_URL = "db.url"; /** The user name to use to connect to the database. */ public static final String CONNECTION_USER = "db.user"; /** The password to use for establishing the connection. */ public static final String CONNECTION_PASSWD = "db.passwd"; /** The batch size for batched inserts. Set to >0 to use batching */ public static final String DB_BATCH_SIZE = "db.batchsize"; /** The JDBC fetch size hinted to the driver. */ public static final String JDBC_FETCH_SIZE = "jdbc.fetchsize"; /** The JDBC connection auto-commit property for the driver. */ public static final String JDBC_AUTO_COMMIT = "jdbc.autocommit"; public static final String JDBC_BATCH_UPDATES = "jdbc.batchupdateapi"; /** The name of the property for the number of fields in a record. */ public static final String FIELD_COUNT_PROPERTY = "fieldcount"; /** Default number of fields in a record. */ public static final String FIELD_COUNT_PROPERTY_DEFAULT = "10"; /** Representing a NULL value. */ public static final String NULL_VALUE = "NULL"; /** The primary key in the user table. */ public static final String PRIMARY_KEY = "YCSB_KEY"; /** The field name prefix in the table. */ public static final String COLUMN_PREFIX = "FIELD"; private List conns; private boolean initialized = false; private Properties props; private int jdbcFetchSize; private int batchSize; private boolean autoCommit; private boolean batchUpdates; private static final String DEFAULT_PROP = ""; private ConcurrentMap cachedStatements; private long numRowsInBatch = 0; /** DB flavor defines DB-specific syntax and behavior for the * particular database. Current database flavors are: {default, phoenix} */ private DBFlavor dbFlavor; /** * Ordered field information for insert and update statements. */ private static class OrderedFieldInfo { private String fieldKeys; private List fieldValues; OrderedFieldInfo(String fieldKeys, List fieldValues) { this.fieldKeys = fieldKeys; this.fieldValues = fieldValues; } String getFieldKeys() { return fieldKeys; } List getFieldValues() { return fieldValues; } } /** * For the given key, returns what shard contains data for this key. * * @param key Data key to do operation on * @return Shard index */ private int getShardIndexByKey(String key) { int ret = Math.abs(key.hashCode()) % conns.size(); return ret; } /** * For the given key, returns Connection object that holds connection to the * shard that contains this key. * * @param key Data key to get information for * @return Connection object */ private Connection getShardConnectionByKey(String key) { return conns.get(getShardIndexByKey(key)); } private void cleanupAllConnections() throws SQLException { for (Connection conn : conns) { if (!autoCommit) { conn.commit(); } conn.close(); } } /** Returns parsed int value from the properties if set, otherwise returns -1. */ private static int getIntProperty(Properties props, String key) throws DBException { String valueStr = props.getProperty(key); if (valueStr != null) { try { return Integer.parseInt(valueStr); } catch (NumberFormatException nfe) { System.err.println("Invalid " + key + " specified: " + valueStr); throw new DBException(nfe); } } return -1; } /** Returns parsed boolean value from the properties if set, otherwise returns defaultVal. */ private static boolean getBoolProperty(Properties props, String key, boolean defaultVal) { String valueStr = props.getProperty(key); if (valueStr != null) { return Boolean.parseBoolean(valueStr); } return defaultVal; } @Override public void init() throws DBException { if (initialized) { System.err.println("Client connection already initialized."); return; } props = getProperties(); String urls = props.getProperty(CONNECTION_URL, DEFAULT_PROP); String user = props.getProperty(CONNECTION_USER, DEFAULT_PROP); String passwd = props.getProperty(CONNECTION_PASSWD, DEFAULT_PROP); String driver = props.getProperty(DRIVER_CLASS); this.jdbcFetchSize = getIntProperty(props, JDBC_FETCH_SIZE); this.batchSize = getIntProperty(props, DB_BATCH_SIZE); this.autoCommit = getBoolProperty(props, JDBC_AUTO_COMMIT, true); this.batchUpdates = getBoolProperty(props, JDBC_BATCH_UPDATES, false); try { if (driver != null) { Class.forName(driver); } int shardCount = 0; conns = new ArrayList(3); final String[] urlArr = urls.split(","); for (String url : urlArr) { System.out.println("Adding shard node URL: " + url); Connection conn = DriverManager.getConnection(url, user, passwd); // Since there is no explicit commit method in the DB interface, all // operations should auto commit, except when explicitly told not to // (this is necessary in cases such as for PostgreSQL when running a // scan workload with fetchSize) conn.setAutoCommit(autoCommit); shardCount++; conns.add(conn); } System.out.println("Using shards: " + shardCount + ", batchSize:" + batchSize + ", fetchSize: " + jdbcFetchSize); cachedStatements = new ConcurrentHashMap(); this.dbFlavor = DBFlavor.fromJdbcUrl(urlArr[0]); } catch (ClassNotFoundException e) { System.err.println("Error in initializing the JDBS driver: " + e); throw new DBException(e); } catch (SQLException e) { System.err.println("Error in database operation: " + e); throw new DBException(e); } catch (NumberFormatException e) { System.err.println("Invalid value for fieldcount property. " + e); throw new DBException(e); } initialized = true; } @Override public void cleanup() throws DBException { if (batchSize > 0) { try { // commit un-finished batches for (PreparedStatement st : cachedStatements.values()) { if (!st.getConnection().isClosed() && !st.isClosed() && (numRowsInBatch % batchSize != 0)) { st.executeBatch(); } } } catch (SQLException e) { System.err.println("Error in cleanup execution. " + e); throw new DBException(e); } } try { cleanupAllConnections(); } catch (SQLException e) { System.err.println("Error in closing the connection. " + e); throw new DBException(e); } } private PreparedStatement createAndCacheInsertStatement(StatementType insertType, String key) throws SQLException { String insert = dbFlavor.createInsertStatement(insertType, key); PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(insert); PreparedStatement stmt = cachedStatements.putIfAbsent(insertType, insertStatement); if (stmt == null) { return insertStatement; } return stmt; } private PreparedStatement createAndCacheReadStatement(StatementType readType, String key) throws SQLException { String read = dbFlavor.createReadStatement(readType, key); PreparedStatement readStatement = getShardConnectionByKey(key).prepareStatement(read); PreparedStatement stmt = cachedStatements.putIfAbsent(readType, readStatement); if (stmt == null) { return readStatement; } return stmt; } private PreparedStatement createAndCacheDeleteStatement(StatementType deleteType, String key) throws SQLException { String delete = dbFlavor.createDeleteStatement(deleteType, key); PreparedStatement deleteStatement = getShardConnectionByKey(key).prepareStatement(delete); PreparedStatement stmt = cachedStatements.putIfAbsent(deleteType, deleteStatement); if (stmt == null) { return deleteStatement; } return stmt; } private PreparedStatement createAndCacheUpdateStatement(StatementType updateType, String key) throws SQLException { String update = dbFlavor.createUpdateStatement(updateType, key); PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(update); PreparedStatement stmt = cachedStatements.putIfAbsent(updateType, insertStatement); if (stmt == null) { return insertStatement; } return stmt; } private PreparedStatement createAndCacheScanStatement(StatementType scanType, String key) throws SQLException { String select = dbFlavor.createScanStatement(scanType, key); PreparedStatement scanStatement = getShardConnectionByKey(key).prepareStatement(select); if (this.jdbcFetchSize > 0) { scanStatement.setFetchSize(this.jdbcFetchSize); } PreparedStatement stmt = cachedStatements.putIfAbsent(scanType, scanStatement); if (stmt == null) { return scanStatement; } return stmt; } @Override public Status read(String tableName, String key, Set fields, Map result) { try { StatementType type = new StatementType(StatementType.Type.READ, tableName, 1, "", getShardIndexByKey(key)); PreparedStatement readStatement = cachedStatements.get(type); if (readStatement == null) { readStatement = createAndCacheReadStatement(type, key); } readStatement.setString(1, key); ResultSet resultSet = readStatement.executeQuery(); if (!resultSet.next()) { resultSet.close(); return Status.NOT_FOUND; } if (result != null && fields != null) { for (String field : fields) { String value = resultSet.getString(field); result.put(field, new StringByteIterator(value)); } } resultSet.close(); return Status.OK; } catch (SQLException e) { System.err.println("Error in processing read of table " + tableName + ": " + e); return Status.ERROR; } } @Override public Status scan(String tableName, String startKey, int recordcount, Set fields, Vector> result) { try { StatementType type = new StatementType(StatementType.Type.SCAN, tableName, 1, "", getShardIndexByKey(startKey)); PreparedStatement scanStatement = cachedStatements.get(type); if (scanStatement == null) { scanStatement = createAndCacheScanStatement(type, startKey); } scanStatement.setString(1, startKey); scanStatement.setInt(2, recordcount); ResultSet resultSet = scanStatement.executeQuery(); for (int i = 0; i < recordcount && resultSet.next(); i++) { if (result != null && fields != null) { HashMap values = new HashMap(); for (String field : fields) { String value = resultSet.getString(field); values.put(field, new StringByteIterator(value)); } result.add(values); } } resultSet.close(); return Status.OK; } catch (SQLException e) { System.err.println("Error in processing scan of table: " + tableName + e); return Status.ERROR; } } @Override public Status update(String tableName, String key, Map values) { try { int numFields = values.size(); OrderedFieldInfo fieldInfo = getFieldInfo(values); StatementType type = new StatementType(StatementType.Type.UPDATE, tableName, numFields, fieldInfo.getFieldKeys(), getShardIndexByKey(key)); PreparedStatement updateStatement = cachedStatements.get(type); if (updateStatement == null) { updateStatement = createAndCacheUpdateStatement(type, key); } int index = 1; for (String value: fieldInfo.getFieldValues()) { updateStatement.setString(index++, value); } updateStatement.setString(index, key); int result = updateStatement.executeUpdate(); if (result == 1) { return Status.OK; } return Status.UNEXPECTED_STATE; } catch (SQLException e) { System.err.println("Error in processing update to table: " + tableName + e); return Status.ERROR; } } @Override public Status insert(String tableName, String key, Map values) { try { int numFields = values.size(); OrderedFieldInfo fieldInfo = getFieldInfo(values); StatementType type = new StatementType(StatementType.Type.INSERT, tableName, numFields, fieldInfo.getFieldKeys(), getShardIndexByKey(key)); PreparedStatement insertStatement = cachedStatements.get(type); if (insertStatement == null) { insertStatement = createAndCacheInsertStatement(type, key); } insertStatement.setString(1, key); int index = 2; for (String value: fieldInfo.getFieldValues()) { insertStatement.setString(index++, value); } // Using the batch insert API if (batchUpdates) { insertStatement.addBatch(); // Check for a sane batch size if (batchSize > 0) { // Commit the batch after it grows beyond the configured size if (++numRowsInBatch % batchSize == 0) { int[] results = insertStatement.executeBatch(); for (int r : results) { if (r != 1) { return Status.ERROR; } } // If autoCommit is off, make sure we commit the batch if (!autoCommit) { getShardConnectionByKey(key).commit(); } return Status.OK; } // else, the default value of -1 or a nonsense. Treat it as an infinitely large batch. } // else, we let the batch accumulate // Added element to the batch, potentially committing the batch too. return Status.BATCHED_OK; } else { // Normal update int result = insertStatement.executeUpdate(); // If we are not autoCommit, we might have to commit now if (!autoCommit) { // Let updates be batcher locally if (batchSize > 0) { if (++numRowsInBatch % batchSize == 0) { // Send the batch of updates getShardConnectionByKey(key).commit(); } // uhh return Status.OK; } else { // Commit each update getShardConnectionByKey(key).commit(); } } if (result == 1) { return Status.OK; } } return Status.UNEXPECTED_STATE; } catch (SQLException e) { System.err.println("Error in processing insert to table: " + tableName + e); return Status.ERROR; } } @Override public Status delete(String tableName, String key) { try { StatementType type = new StatementType(StatementType.Type.DELETE, tableName, 1, "", getShardIndexByKey(key)); PreparedStatement deleteStatement = cachedStatements.get(type); if (deleteStatement == null) { deleteStatement = createAndCacheDeleteStatement(type, key); } deleteStatement.setString(1, key); int result = deleteStatement.executeUpdate(); if (result == 1) { return Status.OK; } return Status.UNEXPECTED_STATE; } catch (SQLException e) { System.err.println("Error in processing delete to table: " + tableName + e); return Status.ERROR; } } private OrderedFieldInfo getFieldInfo(Map values) { String fieldKeys = ""; List fieldValues = new ArrayList<>(); int count = 0; for (Map.Entry entry : values.entrySet()) { fieldKeys += entry.getKey(); if (count < values.size() - 1) { fieldKeys += ","; } fieldValues.add(count, entry.getValue().toString()); count++; } return new OrderedFieldInfo(fieldKeys, fieldValues); } } diff --git a/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBCreateTable.java b/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBCreateTable.java index 8dfb26dd..d1128eb6 100644 --- a/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBCreateTable.java +++ b/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBCreateTable.java @@ -1,224 +1,224 @@ /** * Copyright (c) 2010 - 2016 Yahoo! Inc. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb.db; import java.io.FileInputStream; import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import java.util.Enumeration; import java.util.Properties; /** * Utility class to create the table to be used by the benchmark. * * @author sudipto */ public final class JdbcDBCreateTable { private static void usageMessage() { System.out.println("Create Table Client. Options:"); System.out.println(" -p key=value properties defined."); System.out.println(" -P location of the properties file to load."); System.out.println(" -n name of the table."); System.out.println(" -f number of fields (default 10)."); } private static void createTable(Properties props, String tablename) throws SQLException { String driver = props.getProperty(JdbcDBClient.DRIVER_CLASS); String username = props.getProperty(JdbcDBClient.CONNECTION_USER); String password = props.getProperty(JdbcDBClient.CONNECTION_PASSWD, ""); String url = props.getProperty(JdbcDBClient.CONNECTION_URL); int fieldcount = Integer.parseInt(props.getProperty(JdbcDBClient.FIELD_COUNT_PROPERTY, JdbcDBClient.FIELD_COUNT_PROPERTY_DEFAULT)); if (driver == null || username == null || url == null) { throw new SQLException("Missing connection information."); } Connection conn = null; try { Class.forName(driver); conn = DriverManager.getConnection(url, username, password); Statement stmt = conn.createStatement(); StringBuilder sql = new StringBuilder("DROP TABLE IF EXISTS "); sql.append(tablename); sql.append(";"); stmt.execute(sql.toString()); sql = new StringBuilder("CREATE TABLE "); sql.append(tablename); sql.append(" (YCSB_KEY VARCHAR PRIMARY KEY"); for (int idx = 0; idx < fieldcount; idx++) { sql.append(", FIELD"); sql.append(idx); - sql.append(" VARCHAR"); + sql.append(" TEXT"); } sql.append(");"); stmt.execute(sql.toString()); System.out.println("Table " + tablename + " created.."); } catch (ClassNotFoundException e) { throw new SQLException("JDBC Driver class not found."); } finally { if (conn != null) { System.out.println("Closing database connection."); conn.close(); } } } /** * @param args */ public static void main(String[] args) { if (args.length == 0) { usageMessage(); System.exit(0); } String tablename = null; int fieldcount = -1; Properties props = new Properties(); Properties fileprops = new Properties(); // parse arguments int argindex = 0; while (args[argindex].startsWith("-")) { if (args[argindex].compareTo("-P") == 0) { argindex++; if (argindex >= args.length) { usageMessage(); System.exit(0); } String propfile = args[argindex]; argindex++; Properties myfileprops = new Properties(); try { myfileprops.load(new FileInputStream(propfile)); } catch (IOException e) { System.out.println(e.getMessage()); System.exit(0); } // Issue #5 - remove call to stringPropertyNames to make compilable // under Java 1.5 for (Enumeration e = myfileprops.propertyNames(); e.hasMoreElements();) { String prop = (String) e.nextElement(); fileprops.setProperty(prop, myfileprops.getProperty(prop)); } } else if (args[argindex].compareTo("-p") == 0) { argindex++; if (argindex >= args.length) { usageMessage(); System.exit(0); } int eq = args[argindex].indexOf('='); if (eq < 0) { usageMessage(); System.exit(0); } String name = args[argindex].substring(0, eq); String value = args[argindex].substring(eq + 1); props.put(name, value); argindex++; } else if (args[argindex].compareTo("-n") == 0) { argindex++; if (argindex >= args.length) { usageMessage(); System.exit(0); } tablename = args[argindex++]; } else if (args[argindex].compareTo("-f") == 0) { argindex++; if (argindex >= args.length) { usageMessage(); System.exit(0); } try { fieldcount = Integer.parseInt(args[argindex++]); } catch (NumberFormatException e) { System.err.println("Invalid number for field count"); usageMessage(); System.exit(1); } } else { System.out.println("Unknown option " + args[argindex]); usageMessage(); System.exit(0); } if (argindex >= args.length) { break; } } if (argindex != args.length) { usageMessage(); System.exit(0); } // overwrite file properties with properties from the command line // Issue #5 - remove call to stringPropertyNames to make compilable under // Java 1.5 for (Enumeration e = props.propertyNames(); e.hasMoreElements();) { String prop = (String) e.nextElement(); fileprops.setProperty(prop, props.getProperty(prop)); } props = fileprops; if (tablename == null) { System.err.println("table name missing."); usageMessage(); System.exit(1); } if (fieldcount > 0) { props.setProperty(JdbcDBClient.FIELD_COUNT_PROPERTY, String.valueOf(fieldcount)); } try { createTable(props, tablename); } catch (SQLException e) { System.err.println("Error in creating table. " + e); System.exit(1); } } /** * Hidden constructor. */ private JdbcDBCreateTable() { super(); } }