diff --git a/jdbc/README.md b/jdbc/README.md index 841375a8..f74ff54f 100644 --- a/jdbc/README.md +++ b/jdbc/README.md @@ -1,105 +1,106 @@ # JDBC Driver for YCSB This driver enables YCSB to work with databases accessible via the JDBC protocol. ## Getting Started ### 1. Start your database This driver will connect to databases that use the JDBC protocol, please refer to your databases documentation on information on how to install, configure and start your system. ### 2. Set up YCSB You can clone the YCSB project and compile it to stay up to date with the latest changes. Or you can just download the latest release and unpack it. Either way, instructions for doing so can be found here: https://github.com/brianfrankcooper/YCSB. ### 3. Configure your database and table. You can name your database what ever you want, you will need to provide the database name in the JDBC connection string. You can name your table whatever you like also, but it needs to be specified using the YCSB core properties, the default is to just use 'usertable' as the table name. The expected table schema will look similar to the following, syntactical differences may exist with your specific database: ```sql CREATE TABLE usertable ( YCSB_KEY VARCHAR(255) PRIMARY KEY, FIELD0 TEXT, FIELD1 TEXT, FIELD2 TEXT, FIELD3 TEXT, FIELD4 TEXT, FIELD5 TEXT, FIELD6 TEXT, FIELD7 TEXT, FIELD8 TEXT, FIELD9 TEXT ); ``` Key take aways: * The primary key field needs to be named YCSB_KEY * The other fields need to be prefixed with FIELD and count up starting from 1 * Add the same number of FIELDs as you specify in the YCSB core properties, default is 10. * The type of the fields is not so important as long as they can accept strings of the length that you specify in the YCSB core properties, default is 100. #### JdbcDBCreateTable Utility YCSB has a utility to help create your SQL table. NOTE: It does not support all databases flavors, if it does not work for you, you will have to create your table manually with the schema given above. An example usage of the utility: ```sh java -cp YCSB_HOME/jdbc-binding/lib/jdbc-binding-0.4.0.jar:mysql-connector-java-5.1.37-bin.jar com.yahoo.ycsb.db.JdbcDBCreateTable -P testworkload -P db.properties -n usertable ``` Hint: you need to include your Driver jar in the classpath as well as specify your loading options via a workload file, JDBC connection information, and a table name with ```-n```. Simply executing the JdbcDBCreateTable class without any other parameters will print out usage information. ### 4. Configure YCSB connection properties You need to set the following connection configurations: ```sh db.driver=com.mysql.jdbc.Driver db.url=jdbc:mysql://127.0.0.1:3306/ycsb db.user=admin db.passwd=admin ``` Be sure to use your driver class, a valid JDBC connection string, and credentials to your database. You can add these to your workload configuration or a separate properties file and specify it with ```-P``` or you can add the properties individually to your ycsb command with ```-p```. ### 5. Add your JDBC Driver to the classpath There are several ways to do this, but a couple easy methods are to put a copy of your Driver jar in ```YCSB_HOME/jdbc-binding/lib/``` or just specify the path to your Driver jar with ```-cp``` in your ycsb command. ### 6. Running a workload Before you can actually run the workload, you need to "load" the data first. ```sh bin/ycsb load jdbc -P workloads/workloada -P db.properties -cp mysql-connector-java.jar ``` Then, you can run the workload: ```sh bin/ycsb run jdbc -P workloads/workloada -P db.properties -cp mysql-connector-java.jar ``` ## Configuration Properties ```sh db.driver=com.mysql.jdbc.Driver # The JDBC driver class to use. db.url=jdbc:mysql://127.0.0.1:3306/ycsb # The Database connection URL. db.user=admin # User name for the connection. db.passwd=admin # Password for the connection. +db.batchsize=1000 # The batch size for doing batched inserts. Defaults to 0. Set to >0 to use batching. jdbc.fetchsize=10 # The JDBC fetch size hinted to the driver. jdbc.autocommit=true # The JDBC connection auto-commit property for the driver. ``` Please refer to https://github.com/brianfrankcooper/YCSB/wiki/Core-Properties for all other YCSB core properties. diff --git a/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java b/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java index f6d5c3e2..80dd338e 100644 --- a/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java +++ b/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java @@ -1,523 +1,564 @@ /** * Copyright (c) 2010 - 2016 Yahoo! Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you + * + * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You - * may obtain a copy of the License at - * + * may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing - * permissions and limitations under the License. See accompanying - * LICENSE file. + * permissions and limitations under the License. See accompanying + * LICENSE file. */ package com.yahoo.ycsb.db; import com.yahoo.ycsb.DB; import com.yahoo.ycsb.DBException; import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.Status; import com.yahoo.ycsb.StringByteIterator; import java.sql.*; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** * A class that wraps a JDBC compliant database to allow it to be interfaced * with YCSB. This class extends {@link DB} and implements the database * interface used by YCSB client. - * + * *
* Each client will have its own instance of this class. This client is not * thread safe. - * + * *
* This interface expects a schema ... All * attributes are of type VARCHAR. All accesses are through the primary key. * Therefore, only one index on the primary key is needed. */ public class JdbcDBClient extends DB { /** The class to use as the jdbc driver. */ public static final String DRIVER_CLASS = "db.driver"; /** The URL to connect to the database. */ public static final String CONNECTION_URL = "db.url"; /** The user name to use to connect to the database. */ public static final String CONNECTION_USER = "db.user"; /** The password to use for establishing the connection. */ public static final String CONNECTION_PASSWD = "db.passwd"; + /** The batch size for batched inserts. Set to >0 to use batching */ + public static final String DB_BATCH_SIZE = "db.batchsize"; + /** The JDBC fetch size hinted to the driver. */ public static final String JDBC_FETCH_SIZE = "jdbc.fetchsize"; /** The JDBC connection auto-commit property for the driver. */ public static final String JDBC_AUTO_COMMIT = "jdbc.autocommit"; /** The name of the property for the number of fields in a record. */ public static final String FIELD_COUNT_PROPERTY = "fieldcount"; /** Default number of fields in a record. */ public static final String FIELD_COUNT_PROPERTY_DEFAULT = "10"; /** Representing a NULL value. */ public static final String NULL_VALUE = "NULL"; /** The primary key in the user table. */ public static final String PRIMARY_KEY = "YCSB_KEY"; /** The field name prefix in the table. */ public static final String COLUMN_PREFIX = "FIELD"; private ArrayList conns; private boolean initialized = false; private Properties props; - private Integer jdbcFetchSize; + private int jdbcFetchSize; + private int batchSize; private static final String DEFAULT_PROP = ""; private ConcurrentMap cachedStatements; + private long numRowsInBatch = 0; /** * Ordered field information for insert and update statements. */ private static class OrderedFieldInfo { private String fieldKeys; private List fieldValues; OrderedFieldInfo(String fieldKeys, List fieldValues) { this.fieldKeys = fieldKeys; this.fieldValues = fieldValues; } String getFieldKeys() { return fieldKeys; } List getFieldValues() { return fieldValues; } } /** * The statement type for the prepared statements. */ private static class StatementType { enum Type { INSERT(1), DELETE(2), READ(3), UPDATE(4), SCAN(5); private final int internalType; private Type(int type) { internalType = type; } int getHashCode() { final int prime = 31; int result = 1; result = prime * result + internalType; return result; } } private Type type; private int shardIndex; private int numFields; private String tableName; private String fieldString; StatementType(Type type, String tableName, int numFields, String fieldString, int shardIndex) { this.type = type; this.tableName = tableName; this.numFields = numFields; this.fieldString = fieldString; this.shardIndex = shardIndex; } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + numFields + 100 * shardIndex; result = prime * result + ((tableName == null) ? 0 : tableName.hashCode()); result = prime * result + ((type == null) ? 0 : type.getHashCode()); return result; } @Override public boolean equals(Object obj) { if (this == obj) { return true; } if (obj == null) { return false; } if (getClass() != obj.getClass()) { return false; } StatementType other = (StatementType) obj; if (numFields != other.numFields) { return false; } if (shardIndex != other.shardIndex) { return false; } if (tableName == null) { if (other.tableName != null) { return false; } } else if (!tableName.equals(other.tableName)) { return false; } if (type != other.type) { return false; } if (!fieldString.equals(other.fieldString)) { return false; } return true; } } /** * For the given key, returns what shard contains data for this key. * * @param key Data key to do operation on * @return Shard index */ private int getShardIndexByKey(String key) { int ret = Math.abs(key.hashCode()) % conns.size(); return ret; } /** * For the given key, returns Connection object that holds connection to the * shard that contains this key. * * @param key Data key to get information for * @return Connection object */ private Connection getShardConnectionByKey(String key) { return conns.get(getShardIndexByKey(key)); } private void cleanupAllConnections() throws SQLException { for (Connection conn : conns) { conn.close(); } } + /** Returns parsed int value from the properties if set, otherwise returns -1. */ + private static int getIntProperty(Properties props, String key) throws DBException { + String valueStr = props.getProperty(key); + if (valueStr != null) { + try { + return Integer.parseInt(valueStr); + } catch (NumberFormatException nfe) { + System.err.println("Invalid " + key + " specified: " + valueStr); + throw new DBException(nfe); + } + } + return -1; + } + @Override public void init() throws DBException { if (initialized) { System.err.println("Client connection already initialized."); return; } props = getProperties(); String urls = props.getProperty(CONNECTION_URL, DEFAULT_PROP); String user = props.getProperty(CONNECTION_USER, DEFAULT_PROP); String passwd = props.getProperty(CONNECTION_PASSWD, DEFAULT_PROP); String driver = props.getProperty(DRIVER_CLASS); - String jdbcFetchSizeStr = props.getProperty(JDBC_FETCH_SIZE); - if (jdbcFetchSizeStr != null) { - try { - this.jdbcFetchSize = Integer.parseInt(jdbcFetchSizeStr); - } catch (NumberFormatException nfe) { - System.err.println("Invalid JDBC fetch size specified: " + jdbcFetchSizeStr); - throw new DBException(nfe); - } - } + this.jdbcFetchSize = getIntProperty(props, JDBC_FETCH_SIZE); + this.batchSize = getIntProperty(props, DB_BATCH_SIZE); String autoCommitStr = props.getProperty(JDBC_AUTO_COMMIT, Boolean.TRUE.toString()); Boolean autoCommit = Boolean.parseBoolean(autoCommitStr); try { if (driver != null) { Class.forName(driver); } int shardCount = 0; conns = new ArrayList(3); for (String url : urls.split(",")) { System.out.println("Adding shard node URL: " + url); Connection conn = DriverManager.getConnection(url, user, passwd); // Since there is no explicit commit method in the DB interface, all // operations should auto commit, except when explicitly told not to // (this is necessary in cases such as for PostgreSQL when running a // scan workload with fetchSize) conn.setAutoCommit(autoCommit); shardCount++; conns.add(conn); } - System.out.println("Using " + shardCount + " shards"); + System.out.println("Using shards: " + shardCount + ", batchSize:" + batchSize + ", fetchSize: " + jdbcFetchSize); cachedStatements = new ConcurrentHashMap(); } catch (ClassNotFoundException e) { System.err.println("Error in initializing the JDBS driver: " + e); throw new DBException(e); } catch (SQLException e) { System.err.println("Error in database operation: " + e); throw new DBException(e); } catch (NumberFormatException e) { System.err.println("Invalid value for fieldcount property. " + e); throw new DBException(e); } initialized = true; } @Override public void cleanup() throws DBException { + if (batchSize > 0) { + try { + // commit un-finished batches + for (PreparedStatement st : cachedStatements.values()) { + if (!st.getConnection().isClosed() && !st.isClosed() && (numRowsInBatch % batchSize != 0)) { + st.executeBatch(); + } + } + } catch (SQLException e) { + System.err.println("Error in cleanup execution. " + e); + throw new DBException(e); + } + } + try { cleanupAllConnections(); } catch (SQLException e) { System.err.println("Error in closing the connection. " + e); throw new DBException(e); } } private PreparedStatement createAndCacheInsertStatement(StatementType insertType, String key) throws SQLException { StringBuilder insert = new StringBuilder("INSERT INTO "); insert.append(insertType.tableName); insert.append(" (" + PRIMARY_KEY + "," + insertType.fieldString + ")"); insert.append(" VALUES(?"); for (int i = 0; i < insertType.numFields; i++) { insert.append(",?"); } insert.append(")"); PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(insert.toString()); PreparedStatement stmt = cachedStatements.putIfAbsent(insertType, insertStatement); if (stmt == null) { return insertStatement; } return stmt; } private PreparedStatement createAndCacheReadStatement(StatementType readType, String key) throws SQLException { StringBuilder read = new StringBuilder("SELECT * FROM "); read.append(readType.tableName); read.append(" WHERE "); read.append(PRIMARY_KEY); read.append(" = "); read.append("?"); PreparedStatement readStatement = getShardConnectionByKey(key).prepareStatement(read.toString()); PreparedStatement stmt = cachedStatements.putIfAbsent(readType, readStatement); if (stmt == null) { return readStatement; } return stmt; } private PreparedStatement createAndCacheDeleteStatement(StatementType deleteType, String key) throws SQLException { StringBuilder delete = new StringBuilder("DELETE FROM "); delete.append(deleteType.tableName); delete.append(" WHERE "); delete.append(PRIMARY_KEY); delete.append(" = ?"); PreparedStatement deleteStatement = getShardConnectionByKey(key).prepareStatement(delete.toString()); PreparedStatement stmt = cachedStatements.putIfAbsent(deleteType, deleteStatement); if (stmt == null) { return deleteStatement; } return stmt; } private PreparedStatement createAndCacheUpdateStatement(StatementType updateType, String key) throws SQLException { String[] fieldKeys = updateType.fieldString.split(","); StringBuilder update = new StringBuilder("UPDATE "); update.append(updateType.tableName); update.append(" SET "); for (int i = 0; i < fieldKeys.length; i++) { update.append(fieldKeys[i]); update.append("=?"); if (i < fieldKeys.length - 1) { update.append(", "); } } update.append(" WHERE "); update.append(PRIMARY_KEY); update.append(" = ?"); PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(update.toString()); PreparedStatement stmt = cachedStatements.putIfAbsent(updateType, insertStatement); if (stmt == null) { return insertStatement; } return stmt; } private PreparedStatement createAndCacheScanStatement(StatementType scanType, String key) throws SQLException { StringBuilder select = new StringBuilder("SELECT * FROM "); select.append(scanType.tableName); select.append(" WHERE "); select.append(PRIMARY_KEY); select.append(" >= ?"); select.append(" ORDER BY "); select.append(PRIMARY_KEY); select.append(" LIMIT ?"); PreparedStatement scanStatement = getShardConnectionByKey(key).prepareStatement(select.toString()); - if (this.jdbcFetchSize != null) { + if (this.jdbcFetchSize > 0) { scanStatement.setFetchSize(this.jdbcFetchSize); } PreparedStatement stmt = cachedStatements.putIfAbsent(scanType, scanStatement); if (stmt == null) { return scanStatement; } return stmt; } @Override public Status read(String tableName, String key, Set fields, HashMap result) { try { StatementType type = new StatementType(StatementType.Type.READ, tableName, 1, "", getShardIndexByKey(key)); PreparedStatement readStatement = cachedStatements.get(type); if (readStatement == null) { readStatement = createAndCacheReadStatement(type, key); } readStatement.setString(1, key); ResultSet resultSet = readStatement.executeQuery(); if (!resultSet.next()) { resultSet.close(); return Status.NOT_FOUND; } if (result != null && fields != null) { for (String field : fields) { String value = resultSet.getString(field); result.put(field, new StringByteIterator(value)); } } resultSet.close(); return Status.OK; } catch (SQLException e) { System.err.println("Error in processing read of table " + tableName + ": " + e); return Status.ERROR; } } @Override public Status scan(String tableName, String startKey, int recordcount, Set fields, Vector> result) { try { StatementType type = new StatementType(StatementType.Type.SCAN, tableName, 1, "", getShardIndexByKey(startKey)); PreparedStatement scanStatement = cachedStatements.get(type); if (scanStatement == null) { scanStatement = createAndCacheScanStatement(type, startKey); } scanStatement.setString(1, startKey); scanStatement.setInt(2, recordcount); ResultSet resultSet = scanStatement.executeQuery(); for (int i = 0; i < recordcount && resultSet.next(); i++) { if (result != null && fields != null) { HashMap values = new HashMap(); for (String field : fields) { String value = resultSet.getString(field); values.put(field, new StringByteIterator(value)); } result.add(values); } } resultSet.close(); return Status.OK; } catch (SQLException e) { System.err.println("Error in processing scan of table: " + tableName + e); return Status.ERROR; } } @Override public Status update(String tableName, String key, HashMap values) { try { int numFields = values.size(); OrderedFieldInfo fieldInfo = getFieldInfo(values); StatementType type = new StatementType(StatementType.Type.UPDATE, tableName, numFields, fieldInfo.getFieldKeys(), getShardIndexByKey(key)); PreparedStatement updateStatement = cachedStatements.get(type); if (updateStatement == null) { updateStatement = createAndCacheUpdateStatement(type, key); } int index = 1; for (String value: fieldInfo.getFieldValues()) { updateStatement.setString(index++, value); } updateStatement.setString(index, key); int result = updateStatement.executeUpdate(); if (result == 1) { return Status.OK; } return Status.UNEXPECTED_STATE; } catch (SQLException e) { System.err.println("Error in processing update to table: " + tableName + e); return Status.ERROR; } } @Override public Status insert(String tableName, String key, HashMap values) { try { int numFields = values.size(); OrderedFieldInfo fieldInfo = getFieldInfo(values); StatementType type = new StatementType(StatementType.Type.INSERT, tableName, numFields, fieldInfo.getFieldKeys(), getShardIndexByKey(key)); PreparedStatement insertStatement = cachedStatements.get(type); if (insertStatement == null) { insertStatement = createAndCacheInsertStatement(type, key); } insertStatement.setString(1, key); int index = 2; for (String value: fieldInfo.getFieldValues()) { insertStatement.setString(index++, value); } - int result = insertStatement.executeUpdate(); + int result; + if (batchSize > 0) { + insertStatement.addBatch(); + if (++numRowsInBatch % batchSize == 0) { + int[] results = insertStatement.executeBatch(); + for (int r : results) { + if (r != 1) { + return Status.ERROR; + } + } + return Status.OK; + } + return Status.BATCHED_OK; + } else { + result = insertStatement.executeUpdate(); + } if (result == 1) { return Status.OK; } return Status.UNEXPECTED_STATE; } catch (SQLException e) { System.err.println("Error in processing insert to table: " + tableName + e); return Status.ERROR; } } @Override public Status delete(String tableName, String key) { try { StatementType type = new StatementType(StatementType.Type.DELETE, tableName, 1, "", getShardIndexByKey(key)); PreparedStatement deleteStatement = cachedStatements.get(type); if (deleteStatement == null) { deleteStatement = createAndCacheDeleteStatement(type, key); } deleteStatement.setString(1, key); int result = deleteStatement.executeUpdate(); if (result == 1) { return Status.OK; } return Status.UNEXPECTED_STATE; } catch (SQLException e) { System.err.println("Error in processing delete to table: " + tableName + e); return Status.ERROR; } } private OrderedFieldInfo getFieldInfo(HashMap values) { String fieldKeys = ""; List fieldValues = new ArrayList(); int count = 0; for (Map.Entry entry : values.entrySet()) { fieldKeys += entry.getKey(); if (count < values.size() - 1) { fieldKeys += ","; } fieldValues.add(count, entry.getValue().toString()); count++; } return new OrderedFieldInfo(fieldKeys, fieldValues); } -} \ No newline at end of file +} diff --git a/jdbc/src/test/java/com/yahoo/ycsb/db/JdbcDBClientTest.java b/jdbc/src/test/java/com/yahoo/ycsb/db/JdbcDBClientTest.java index c3cc3024..6317f483 100644 --- a/jdbc/src/test/java/com/yahoo/ycsb/db/JdbcDBClientTest.java +++ b/jdbc/src/test/java/com/yahoo/ycsb/db/JdbcDBClientTest.java @@ -1,322 +1,388 @@ /** * Copyright (c) 2015 - 2016 Yahoo! Inc. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb.db; import static org.junit.Assert.*; import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.DBException; import com.yahoo.ycsb.StringByteIterator; import org.junit.*; import java.sql.*; import java.util.HashMap; import java.util.HashSet; import java.util.Properties; import java.util.Vector; public class JdbcDBClientTest { private static final String TEST_DB_DRIVER = "org.hsqldb.jdbc.JDBCDriver"; private static final String TEST_DB_URL = "jdbc:hsqldb:mem:ycsb"; private static final String TEST_DB_USER = "sa"; private static final String TABLE_NAME = "USERTABLE"; private static final int FIELD_LENGTH = 32; private static final String FIELD_PREFIX = "FIELD"; private static final String KEY_PREFIX = "user"; private static final String KEY_FIELD = "YCSB_KEY"; private static final int NUM_FIELDS = 3; private static Connection jdbcConnection = null; private static JdbcDBClient jdbcDBClient = null; @BeforeClass public static void setup() { - try { - jdbcConnection = DriverManager.getConnection(TEST_DB_URL); - jdbcDBClient = new JdbcDBClient(); - - Properties p = new Properties(); - p.setProperty(JdbcDBClient.CONNECTION_URL, TEST_DB_URL); - p.setProperty(JdbcDBClient.DRIVER_CLASS, TEST_DB_DRIVER); - p.setProperty(JdbcDBClient.CONNECTION_USER, TEST_DB_USER); + setupWithBatch(1); + } - jdbcDBClient.setProperties(p); - jdbcDBClient.init(); - } catch (SQLException e) { - e.printStackTrace(); - fail("Could not create local Database"); - } catch (DBException e) { - e.printStackTrace(); - fail("Could not create JdbcDBClient instance"); - } + public static void setupWithBatch(int batchSize) { + try { + jdbcConnection = DriverManager.getConnection(TEST_DB_URL); + jdbcDBClient = new JdbcDBClient(); + + Properties p = new Properties(); + p.setProperty(JdbcDBClient.CONNECTION_URL, TEST_DB_URL); + p.setProperty(JdbcDBClient.DRIVER_CLASS, TEST_DB_DRIVER); + p.setProperty(JdbcDBClient.CONNECTION_USER, TEST_DB_USER); + p.setProperty(JdbcDBClient.DB_BATCH_SIZE, Integer.toString(batchSize)); + + jdbcDBClient.setProperties(p); + jdbcDBClient.init(); + } catch (SQLException e) { + e.printStackTrace(); + fail("Could not create local Database"); + } catch (DBException e) { + e.printStackTrace(); + fail("Could not create JdbcDBClient instance"); + } } @AfterClass public static void teardown() { try { if (jdbcConnection != null) { jdbcConnection.close(); } } catch (SQLException e) { e.printStackTrace(); } - + try { if (jdbcDBClient != null) { jdbcDBClient.cleanup(); } } catch (DBException e) { e.printStackTrace(); } } @Before public void prepareTest() { try { DatabaseMetaData metaData = jdbcConnection.getMetaData(); ResultSet tableResults = metaData.getTables(null, null, TABLE_NAME, null); if (tableResults.next()) { // If the table already exists, just truncate it jdbcConnection.prepareStatement( String.format("TRUNCATE TABLE %s", TABLE_NAME) ).execute(); } else { // If the table does not exist then create it StringBuilder createString = new StringBuilder( String.format("CREATE TABLE %s (%s VARCHAR(100) PRIMARY KEY", TABLE_NAME, KEY_FIELD) ); for (int i = 0; i < NUM_FIELDS; i++) { createString.append( String.format(", %s%d VARCHAR(100)", FIELD_PREFIX, i) ); } createString.append(")"); jdbcConnection.prepareStatement(createString.toString()).execute(); } } catch (SQLException e) { e.printStackTrace(); fail("Failed to prepare test"); } } /* This is a copy of buildDeterministicValue() from core:com.yahoo.ycsb.workloads.CoreWorkload.java. That method is neither public nor static so we need a copy. */ private String buildDeterministicValue(String key, String fieldkey) { int size = FIELD_LENGTH; StringBuilder sb = new StringBuilder(size); sb.append(key); sb.append(':'); sb.append(fieldkey); while (sb.length() < size) { sb.append(':'); sb.append(sb.toString().hashCode()); } sb.setLength(size); return sb.toString(); } /* Inserts a row of deterministic values for the given insertKey using the jdbcDBClient. */ private HashMap insertRow(String insertKey) { HashMap insertMap = new HashMap(); for (int i = 0; i < 3; i++) { insertMap.put(FIELD_PREFIX + i, new StringByteIterator(buildDeterministicValue(insertKey, FIELD_PREFIX + i))); } jdbcDBClient.insert(TABLE_NAME, insertKey, insertMap); return insertMap; } @Test public void insertTest() { try { String insertKey = "user0"; HashMap insertMap = insertRow(insertKey); ResultSet resultSet = jdbcConnection.prepareStatement( String.format("SELECT * FROM %s", TABLE_NAME) ).executeQuery(); // Check we have a result Row assertTrue(resultSet.next()); // Check that all the columns have expected values assertEquals(resultSet.getString(KEY_FIELD), insertKey); for (int i = 0; i < 3; i++) { assertEquals(resultSet.getString(FIELD_PREFIX + i), insertMap.get(FIELD_PREFIX + i).toString()); } // Check that we do not have any more rows assertFalse(resultSet.next()); resultSet.close(); } catch (SQLException e) { e.printStackTrace(); fail("Failed insertTest"); } } @Test public void updateTest() { try { String preupdateString = "preupdate"; StringBuilder fauxInsertString = new StringBuilder( String.format("INSERT INTO %s VALUES(?", TABLE_NAME) ); for (int i = 0; i < NUM_FIELDS; i++) { fauxInsertString.append(",?"); } fauxInsertString.append(")"); PreparedStatement fauxInsertStatement = jdbcConnection.prepareStatement(fauxInsertString.toString()); for (int i = 2; i < NUM_FIELDS + 2; i++) { fauxInsertStatement.setString(i, preupdateString); } fauxInsertStatement.setString(1, "user0"); fauxInsertStatement.execute(); fauxInsertStatement.setString(1, "user1"); fauxInsertStatement.execute(); fauxInsertStatement.setString(1, "user2"); fauxInsertStatement.execute(); HashMap updateMap = new HashMap(); for (int i = 0; i < 3; i++) { updateMap.put(FIELD_PREFIX + i, new StringByteIterator(buildDeterministicValue("user1", FIELD_PREFIX + i))); } jdbcDBClient.update(TABLE_NAME, "user1", updateMap); ResultSet resultSet = jdbcConnection.prepareStatement( String.format("SELECT * FROM %s ORDER BY %s", TABLE_NAME, KEY_FIELD) ).executeQuery(); // Ensure that user0 record was not changed resultSet.next(); assertEquals("Assert first row key is user0", resultSet.getString(KEY_FIELD), "user0"); for (int i = 0; i < 3; i++) { assertEquals("Assert first row fields contain preupdateString", resultSet.getString(FIELD_PREFIX + i), preupdateString); } // Check that all the columns have expected values for user1 record resultSet.next(); assertEquals(resultSet.getString(KEY_FIELD), "user1"); for (int i = 0; i < 3; i++) { assertEquals(resultSet.getString(FIELD_PREFIX + i), updateMap.get(FIELD_PREFIX + i).toString()); } // Ensure that user2 record was not changed resultSet.next(); assertEquals("Assert third row key is user2", resultSet.getString(KEY_FIELD), "user2"); for (int i = 0; i < 3; i++) { assertEquals("Assert third row fields contain preupdateString", resultSet.getString(FIELD_PREFIX + i), preupdateString); } resultSet.close(); } catch (SQLException e) { e.printStackTrace(); fail("Failed updateTest"); } } @Test public void readTest() { String insertKey = "user0"; HashMap insertMap = insertRow(insertKey); HashSet readFields = new HashSet(); HashMap readResultMap = new HashMap(); // Test reading a single field readFields.add("FIELD0"); jdbcDBClient.read(TABLE_NAME, insertKey, readFields, readResultMap); assertEquals("Assert that result has correct number of fields", readFields.size(), readResultMap.size()); for (String field: readFields) { assertEquals("Assert " + field + " was read correctly", insertMap.get(field).toString(), readResultMap.get(field).toString()); } readResultMap = new HashMap(); // Test reading all fields readFields.add("FIELD1"); readFields.add("FIELD2"); jdbcDBClient.read(TABLE_NAME, insertKey, readFields, readResultMap); assertEquals("Assert that result has correct number of fields", readFields.size(), readResultMap.size()); for (String field: readFields) { assertEquals("Assert " + field + " was read correctly", insertMap.get(field).toString(), readResultMap.get(field).toString()); } } @Test public void deleteTest() { try { insertRow("user0"); String deleteKey = "user1"; insertRow(deleteKey); insertRow("user2"); jdbcDBClient.delete(TABLE_NAME, deleteKey); ResultSet resultSet = jdbcConnection.prepareStatement( String.format("SELECT * FROM %s", TABLE_NAME) ).executeQuery(); int totalRows = 0; while (resultSet.next()) { assertNotEquals("Assert this is not the deleted row key", deleteKey, resultSet.getString(KEY_FIELD)); totalRows++; } // Check we do not have a result Row assertEquals("Assert we ended with the correct number of rows", totalRows, 2); resultSet.close(); } catch (SQLException e) { e.printStackTrace(); fail("Failed deleteTest"); } } @Test public void scanTest() throws SQLException { HashMap> keyMap = new HashMap>(); for (int i = 0; i < 5; i++) { String insertKey = KEY_PREFIX + i; keyMap.put(insertKey, insertRow(insertKey)); } HashSet fieldSet = new HashSet(); fieldSet.add("FIELD0"); fieldSet.add("FIELD1"); int startIndex = 1; int resultRows = 3; Vector> resultVector = new Vector>(); jdbcDBClient.scan(TABLE_NAME, KEY_PREFIX + startIndex, resultRows, fieldSet, resultVector); // Check the resultVector is the correct size assertEquals("Assert the correct number of results rows were returned", resultRows, resultVector.size()); // Check each vector row to make sure we have the correct fields int testIndex = startIndex; for (HashMap result: resultVector) { assertEquals("Assert that this row has the correct number of fields", fieldSet.size(), result.size()); for (String field: fieldSet) { assertEquals("Assert this field is correct in this row", keyMap.get(KEY_PREFIX + testIndex).get(field).toString(), result.get(field).toString()); } testIndex++; } } + + @Test + public void insertBatchTest() throws DBException { + insertBatchTest(20); + } + + @Test + public void insertPartialBatchTest() throws DBException { + insertBatchTest(19); + } + + public void insertBatchTest(int numRows) throws DBException { + teardown(); + setupWithBatch(10); + + try { + String insertKey = "user0"; + HashMap insertMap = insertRow(insertKey); + + ResultSet resultSet = jdbcConnection.prepareStatement( + String.format("SELECT * FROM %s", TABLE_NAME) + ).executeQuery(); + + // Check we do not have a result Row (because batch is not full yet + assertFalse(resultSet.next()); + + // insert more rows, completing 1 batch (still results are partial). + for (int i = 1; i < numRows; i++) { + insertMap = insertRow("user" + i); + } + + // + assertNumRows(10 * (numRows / 10)); + + // call cleanup, which should insert the partial batch + jdbcDBClient.cleanup(); + + // Check that we have all rows + assertNumRows(numRows); + + } catch (SQLException e) { + e.printStackTrace(); + fail("Failed insertBatchTest"); + } finally { + teardown(); // for next tests + setup(); + } + } + + private void assertNumRows(long numRows) throws SQLException { + ResultSet resultSet = jdbcConnection.prepareStatement( + String.format("SELECT * FROM %s", TABLE_NAME) + ).executeQuery(); + + for (int i = 0; i < numRows; i++) { + assertTrue("expecting " + numRows + " results, received only " + i, resultSet.next()); + } + assertFalse("expecting " + numRows + " results, received more", resultSet.next()); + + resultSet.close(); + } }