...
* All attributes are of type VARCHAR. All accesses are through the primary key. Therefore,
* only one index on the primary key is needed.
*
* The following options must be passed when using this database client.
*
*
* - db.driver The JDBC driver class to use.
* - db.url The Database connection URL.
* - db.user User name for the connection.
* - db.passwd Password for the connection.
*
*
* @author sudipto
*
*/
public class JdbcDBClient extends DB implements JdbcDBClientConstants {
private ArrayList conns;
private boolean initialized = false;
private Properties props;
private Integer jdbcFetchSize;
private static final String DEFAULT_PROP = "";
private ConcurrentMap cachedStatements;
/**
* The statement type for the prepared statements.
*/
private static class StatementType {
enum Type {
INSERT(1),
DELETE(2),
READ(3),
UPDATE(4),
SCAN(5),
;
int internalType;
private Type(int type) {
internalType = type;
}
int getHashCode() {
final int prime = 31;
int result = 1;
result = prime * result + internalType;
return result;
}
}
Type type;
int shardIndex;
int numFields;
String tableName;
StatementType(Type type, String tableName, int numFields, int _shardIndex) {
this.type = type;
this.tableName = tableName;
this.numFields = numFields;
this.shardIndex = _shardIndex;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + numFields + 100 * shardIndex;
result = prime * result
+ ((tableName == null) ? 0 : tableName.hashCode());
result = prime * result + ((type == null) ? 0 : type.getHashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
StatementType other = (StatementType) obj;
if (numFields != other.numFields)
return false;
if (shardIndex != other.shardIndex)
return false;
if (tableName == null) {
if (other.tableName != null)
return false;
} else if (!tableName.equals(other.tableName))
return false;
if (type != other.type)
return false;
return true;
}
}
/**
* For the given key, returns what shard contains data for this key
*
* @param key Data key to do operation on
* @return Shard index
*/
private int getShardIndexByKey(String key) {
int ret = Math.abs(key.hashCode()) % conns.size();
//System.out.println(conns.size() + ": Shard instance for "+ key + " (hash " + key.hashCode()+ " ) " + " is " + ret);
return ret;
}
/**
* For the given key, returns Connection object that holds connection
* to the shard that contains this key
*
* @param key Data key to get information for
* @return Connection object
*/
private Connection getShardConnectionByKey(String key) {
return conns.get(getShardIndexByKey(key));
}
private void cleanupAllConnections() throws SQLException {
for(Connection conn: conns) {
conn.close();
}
}
/**
* Initialize the database connection and set it up for sending requests to the database.
* This must be called once per client.
* @throws
*/
@Override
public void init() throws DBException {
if (initialized) {
System.err.println("Client connection already initialized.");
return;
}
props = getProperties();
String urls = props.getProperty(CONNECTION_URL, DEFAULT_PROP);
String user = props.getProperty(CONNECTION_USER, DEFAULT_PROP);
String passwd = props.getProperty(CONNECTION_PASSWD, DEFAULT_PROP);
String driver = props.getProperty(DRIVER_CLASS);
String jdbcFetchSizeStr = props.getProperty(JDBC_FETCH_SIZE);
if (jdbcFetchSizeStr != null) {
try {
this.jdbcFetchSize = Integer.parseInt(jdbcFetchSizeStr);
} catch (NumberFormatException nfe) {
System.err.println("Invalid JDBC fetch size specified: " + jdbcFetchSizeStr);
throw new DBException(nfe);
}
}
String autoCommitStr = props.getProperty(JDBC_AUTO_COMMIT, Boolean.TRUE.toString());
Boolean autoCommit = Boolean.parseBoolean(autoCommitStr);
try {
if (driver != null) {
Class.forName(driver);
}
int shardCount = 0;
conns = new ArrayList(3);
for (String url: urls.split(",")) {
System.out.println("Adding shard node URL: " + url);
Connection conn = DriverManager.getConnection(url, user, passwd);
// Since there is no explicit commit method in the DB interface, all
// operations should auto commit, except when explicitly told not to
// (this is necessary in cases such as for PostgreSQL when running a
// scan workload with fetchSize)
conn.setAutoCommit(autoCommit);
shardCount++;
conns.add(conn);
}
System.out.println("Using " + shardCount + " shards");
cachedStatements = new ConcurrentHashMap();
} catch (ClassNotFoundException e) {
System.err.println("Error in initializing the JDBS driver: " + e);
throw new DBException(e);
} catch (SQLException e) {
System.err.println("Error in database operation: " + e);
throw new DBException(e);
} catch (NumberFormatException e) {
System.err.println("Invalid value for fieldcount property. " + e);
throw new DBException(e);
}
initialized = true;
}
@Override
public void cleanup() throws DBException {
try {
cleanupAllConnections();
} catch (SQLException e) {
System.err.println("Error in closing the connection. " + e);
throw new DBException(e);
}
}
private PreparedStatement createAndCacheInsertStatement(StatementType insertType, String key)
throws SQLException {
StringBuilder insert = new StringBuilder("INSERT INTO ");
insert.append(insertType.tableName);
insert.append(" VALUES(?");
for (int i = 0; i < insertType.numFields; i++) {
insert.append(",?");
}
- insert.append(");");
+ insert.append(")");
PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(insert.toString());
PreparedStatement stmt = cachedStatements.putIfAbsent(insertType, insertStatement);
if (stmt == null) return insertStatement;
else return stmt;
}
private PreparedStatement createAndCacheReadStatement(StatementType readType, String key)
throws SQLException {
StringBuilder read = new StringBuilder("SELECT * FROM ");
read.append(readType.tableName);
read.append(" WHERE ");
read.append(PRIMARY_KEY);
read.append(" = ");
- read.append("?;");
+ read.append("?");
PreparedStatement readStatement = getShardConnectionByKey(key).prepareStatement(read.toString());
PreparedStatement stmt = cachedStatements.putIfAbsent(readType, readStatement);
if (stmt == null) return readStatement;
else return stmt;
}
private PreparedStatement createAndCacheDeleteStatement(StatementType deleteType, String key)
throws SQLException {
StringBuilder delete = new StringBuilder("DELETE FROM ");
delete.append(deleteType.tableName);
delete.append(" WHERE ");
delete.append(PRIMARY_KEY);
- delete.append(" = ?;");
+ delete.append(" = ?");
PreparedStatement deleteStatement = getShardConnectionByKey(key).prepareStatement(delete.toString());
PreparedStatement stmt = cachedStatements.putIfAbsent(deleteType, deleteStatement);
if (stmt == null) return deleteStatement;
else return stmt;
}
private PreparedStatement createAndCacheUpdateStatement(StatementType updateType, String key)
throws SQLException {
StringBuilder update = new StringBuilder("UPDATE ");
update.append(updateType.tableName);
update.append(" SET ");
for (int i = 0; i < updateType.numFields; i++) {
update.append(COLUMN_PREFIX);
update.append(i);
update.append("=?");
if (i < updateType.numFields - 1) update.append(", ");
}
update.append(" WHERE ");
update.append(PRIMARY_KEY);
- update.append(" = ?;");
+ update.append(" = ?");
PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(update.toString());
PreparedStatement stmt = cachedStatements.putIfAbsent(updateType, insertStatement);
if (stmt == null) return insertStatement;
else return stmt;
}
private PreparedStatement createAndCacheScanStatement(StatementType scanType, String key)
throws SQLException {
StringBuilder select = new StringBuilder("SELECT * FROM ");
select.append(scanType.tableName);
select.append(" WHERE ");
select.append(PRIMARY_KEY);
select.append(" >= ?");
select.append(" ORDER BY ");
select.append(PRIMARY_KEY);
select.append(" LIMIT ?;");
PreparedStatement scanStatement = getShardConnectionByKey(key).prepareStatement(select.toString());
if (this.jdbcFetchSize != null) scanStatement.setFetchSize(this.jdbcFetchSize);
PreparedStatement stmt = cachedStatements.putIfAbsent(scanType, scanStatement);
if (stmt == null) return scanStatement;
else return stmt;
}
@Override
public Status read(String tableName, String key, Set fields,
HashMap result) {
try {
StatementType type = new StatementType(StatementType.Type.READ, tableName, 1, getShardIndexByKey(key));
PreparedStatement readStatement = cachedStatements.get(type);
if (readStatement == null) {
readStatement = createAndCacheReadStatement(type, key);
}
readStatement.setString(1, key);
ResultSet resultSet = readStatement.executeQuery();
if (!resultSet.next()) {
resultSet.close();
return Status.NOT_FOUND;
}
if (result != null && fields != null) {
for (String field : fields) {
String value = resultSet.getString(field);
result.put(field, new StringByteIterator(value));
}
}
resultSet.close();
return Status.OK;
} catch (SQLException e) {
System.err.println("Error in processing read of table " + tableName + ": "+e);
return Status.ERROR;
}
}
@Override
public Status scan(String tableName, String startKey, int recordcount,
Set fields, Vector> result) {
try {
StatementType type = new StatementType(StatementType.Type.SCAN, tableName, 1, getShardIndexByKey(startKey));
PreparedStatement scanStatement = cachedStatements.get(type);
if (scanStatement == null) {
scanStatement = createAndCacheScanStatement(type, startKey);
}
scanStatement.setString(1, startKey);
scanStatement.setInt(2, recordcount);
ResultSet resultSet = scanStatement.executeQuery();
for (int i = 0; i < recordcount && resultSet.next(); i++) {
if (result != null && fields != null) {
HashMap values = new HashMap();
for (String field : fields) {
String value = resultSet.getString(field);
values.put(field, new StringByteIterator(value));
}
result.add(values);
}
}
resultSet.close();
return Status.OK;
} catch (SQLException e) {
System.err.println("Error in processing scan of table: " + tableName + e);
return Status.ERROR;
}
}
@Override
public Status update(String tableName, String key, HashMap values) {
try {
int numFields = values.size();
StatementType type = new StatementType(StatementType.Type.UPDATE, tableName, numFields, getShardIndexByKey(key));
PreparedStatement updateStatement = cachedStatements.get(type);
if (updateStatement == null) {
updateStatement = createAndCacheUpdateStatement(type, key);
}
int index = 1;
for (Map.Entry entry : values.entrySet()) {
updateStatement.setString(index++, entry.getValue().toString());
}
updateStatement.setString(index, key);
int result = updateStatement.executeUpdate();
if (result == 1) return Status.OK;
else return Status.UNEXPECTED_STATE;
} catch (SQLException e) {
System.err.println("Error in processing update to table: " + tableName + e);
return Status.ERROR;
}
}
@Override
public Status insert(String tableName, String key, HashMap values) {
try {
int numFields = values.size();
StatementType type = new StatementType(StatementType.Type.INSERT, tableName, numFields, getShardIndexByKey(key));
PreparedStatement insertStatement = cachedStatements.get(type);
if (insertStatement == null) {
insertStatement = createAndCacheInsertStatement(type, key);
}
insertStatement.setString(1, key);
int index = 2;
for (Map.Entry entry : values.entrySet()) {
String field = entry.getValue().toString();
insertStatement.setString(index++, field);
}
int result = insertStatement.executeUpdate();
if (result == 1) return Status.OK;
else return Status.UNEXPECTED_STATE;
} catch (SQLException e) {
System.err.println("Error in processing insert to table: " + tableName + e);
return Status.ERROR;
}
}
@Override
public Status delete(String tableName, String key) {
try {
StatementType type = new StatementType(StatementType.Type.DELETE, tableName, 1, getShardIndexByKey(key));
PreparedStatement deleteStatement = cachedStatements.get(type);
if (deleteStatement == null) {
deleteStatement = createAndCacheDeleteStatement(type, key);
}
deleteStatement.setString(1, key);
int result = deleteStatement.executeUpdate();
if (result == 1) return Status.OK;
else return Status.UNEXPECTED_STATE;
} catch (SQLException e) {
System.err.println("Error in processing delete to table: " + tableName + e);
return Status.ERROR;
}
}
}
diff --git a/jdbc/src/test/java/com/yahoo/ycsb/db/JdbcDBClientTest.java b/jdbc/src/test/java/com/yahoo/ycsb/db/JdbcDBClientTest.java
index 92b91e10..e2983d08 100644
--- a/jdbc/src/test/java/com/yahoo/ycsb/db/JdbcDBClientTest.java
+++ b/jdbc/src/test/java/com/yahoo/ycsb/db/JdbcDBClientTest.java
@@ -1,35 +1,324 @@
package com.yahoo.ycsb.db;
import static org.junit.Assert.*;
-import org.junit.Test;
+import com.yahoo.ycsb.ByteIterator;
+import com.yahoo.ycsb.DBException;
+import com.yahoo.ycsb.StringByteIterator;
+import org.junit.*;
+
+import java.sql.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Vector;
/**
* Created by kruthar on 11/2/15.
*/
public class JdbcDBClientTest {
+ private static final String TEST_DB_DRIVER = "org.hsqldb.jdbc.JDBCDriver";
+ private static final String TEST_DB_URL = "jdbc:hsqldb:mem:ycsb";
+ private static final String TEST_DB_USER = "sa";
+ private static final String TABLE_NAME = "USERTABLE";
+ private static final int FIELD_LENGTH = 32;
+ private static final String FIELD_PREFIX = "FIELD";
+ private static final String KEY_PREFIX = "user";
+ private static final String KEY_FIELD = "YCSB_KEY";
+ private static final int NUM_FIELDS = 3;
+
+ private static Connection jdbcConnection = null;
+ private static JdbcDBClient jdbcDBClient = null;
+
+ @BeforeClass
+ public static void setup() {
+ try {
+ Class driverClass = Class.forName(TEST_DB_DRIVER);
+ jdbcConnection = DriverManager.getConnection(TEST_DB_URL);
+
+ jdbcDBClient = new JdbcDBClient();
+
+ Properties p = new Properties();
+ p.setProperty(JdbcDBClientConstants.CONNECTION_URL, TEST_DB_URL);
+ p.setProperty(JdbcDBClientConstants.DRIVER_CLASS, TEST_DB_DRIVER);
+ p.setProperty(JdbcDBClientConstants.CONNECTION_USER, TEST_DB_USER);
+
+ jdbcDBClient.setProperties(p);
+ jdbcDBClient.init();
+
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ fail("Could not find Driver Class: " + TEST_DB_DRIVER);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail("Could not create local Database");
+ } catch (DBException e) {
+ e.printStackTrace();
+ fail("Could not create JdbcDBClient instance");
+ }
+ }
+
+ @AfterClass
+ public static void teardown() {
+ try {
+ jdbcConnection.close();
+ jdbcConnection = DriverManager.getConnection(TEST_DB_URL + ";shutdown=true");
+ } catch (SQLNonTransientConnectionException e) {
+ // Expected exception when database is destroyed
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail("Could not drop local Database");
+ }
+ }
+
+ @Before
+ public void prepareTest() {
+ try {
+ DatabaseMetaData metaData = jdbcConnection.getMetaData();
+ ResultSet tableResults = metaData.getTables(null, null, TABLE_NAME, null);
+ if (tableResults.next()) {
+ // If the table already exists, just truncate it
+ jdbcConnection.prepareStatement(
+ String.format("TRUNCATE TABLE %s", TABLE_NAME)
+ ).execute();
+ } else {
+ // If the table does not exist then create it
+ StringBuilder createString = new StringBuilder(
+ String.format("CREATE TABLE %s (%s VARCHAR(100) PRIMARY KEY", TABLE_NAME, KEY_FIELD)
+ );
+ for (int i = 0; i < NUM_FIELDS; i++) {
+ createString.append(
+ String.format(", %s%d VARCHAR(100)", FIELD_PREFIX, i)
+ );
+ }
+ createString.append(")");
+ jdbcConnection.prepareStatement(createString.toString()).execute();
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail("Failed to prepare test");
+ }
+
+ }
+
+ /*
+ This is a copy of buildDeterministicValue() from core:com.yahoo.ycsb.workloads.CoreWorkload.java.
+ That method is neither public nor static so we need a copy.
+ */
+ private String buildDeterministicValue(String key, String fieldkey) {
+ int size = FIELD_LENGTH;
+ StringBuilder sb = new StringBuilder(size);
+ sb.append(key);
+ sb.append(':');
+ sb.append(fieldkey);
+ while (sb.length() < size) {
+ sb.append(':');
+ sb.append(sb.toString().hashCode());
+ }
+ sb.setLength(size);
+
+ return sb.toString();
+ }
+
+ /*
+ Inserts a row of deterministic values for the given insertKey using the jdbcDBClient.
+ */
+ private HashMap insertRow(String insertKey) {
+ HashMap insertMap = new HashMap();
+ for (int i = 0; i < 3; i++) {
+ insertMap.put(FIELD_PREFIX + i, new StringByteIterator(buildDeterministicValue(insertKey, FIELD_PREFIX + i)));
+ }
+ jdbcDBClient.insert(TABLE_NAME, insertKey, insertMap);
+
+ return insertMap;
+ }
+
@Test
public void insertTest() {
- assertTrue(true);
+ try {
+ String insertKey = "user0";
+ HashMap insertMap = insertRow(insertKey);
+
+ ResultSet resultSet = jdbcConnection.prepareStatement(
+ String.format("SELECT * FROM %s", TABLE_NAME)
+ ).executeQuery();
+
+ // Check we have a result Row
+ assertTrue(resultSet.next());
+ // Check that all the columns have expected values
+ assertEquals(resultSet.getString(KEY_FIELD), insertKey);
+ for (int i = 0; i < 3; i++) {
+ // TODO: This will fail until the fix is made to insert and update fields in the correct order.
+ // TODO: Uncomment this assertEquals when the fix is made.
+ //assertEquals(resultSet.getString(FIELD_PREFIX + i), insertMap.get(FIELD_PREFIX + i));
+ }
+ // Check that we do not have any more rows
+ assertFalse(resultSet.next());
+
+ resultSet.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail("Failed insertTest");
+ }
}
@Test
public void updateTest() {
+ try {
+ String preupdateString = "preupdate";
+ StringBuilder fauxInsertString = new StringBuilder(
+ String.format("INSERT INTO %s VALUES(?", TABLE_NAME)
+ );
+ for (int i = 0; i < NUM_FIELDS; i++) {
+ fauxInsertString.append(",?");
+ }
+ fauxInsertString.append(")");
+
+ PreparedStatement fauxInsertStatement = jdbcConnection.prepareStatement(fauxInsertString.toString());
+ for (int i = 2; i < NUM_FIELDS + 2; i++) {
+ fauxInsertStatement.setString(i, preupdateString);
+ }
+
+ fauxInsertStatement.setString(1, "user0");
+ fauxInsertStatement.execute();
+ fauxInsertStatement.setString(1, "user1");
+ fauxInsertStatement.execute();
+ fauxInsertStatement.setString(1, "user2");
+ fauxInsertStatement.execute();
+
+ HashMap updateMap = new HashMap();
+ for (int i = 0; i < 3; i++) {
+ updateMap.put(FIELD_PREFIX + i, new StringByteIterator(buildDeterministicValue("user1", FIELD_PREFIX + i)));
+ }
+
+ jdbcDBClient.update(TABLE_NAME, "user1", updateMap);
+
+ ResultSet resultSet = jdbcConnection.prepareStatement(
+ String.format("SELECT * FROM %s ORDER BY %s", TABLE_NAME, KEY_FIELD)
+ ).executeQuery();
+
+ // Ensure that user0 record was not changed
+ resultSet.next();
+ assertEquals("Assert first row key is user0", resultSet.getString(KEY_FIELD), "user0");
+ for (int i = 0; i < 3; i++) {
+ assertEquals("Assert first row fields contain preupdateString", resultSet.getString(FIELD_PREFIX + i), preupdateString);
+ }
+
+ // Check that all the columns have expected values for user1 record
+ resultSet.next();
+ assertEquals(resultSet.getString(KEY_FIELD), "user1");
+ for (int i = 0; i < 3; i++) {
+ // TODO: This will fail until the fix is made to insert and update fields in the correct order.
+ // TODO: Uncomment this assertEquals when the fix is made.
+ //assertEquals(resultSet.getString(FIELD_PREFIX + i), updateMap.get(FIELD_PREFIX + i));
+ }
+
+ // Ensure that user2 record was not changed
+ resultSet.next();
+ assertEquals("Assert third row key is user2", resultSet.getString(KEY_FIELD), "user2");
+ for (int i = 0; i < 3; i++) {
+ assertEquals("Assert third row fields contain preupdateString", resultSet.getString(FIELD_PREFIX + i), preupdateString);
+ }
+ resultSet.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail("Failed updateTest");
+ }
+
+
assertTrue(true);
}
@Test
public void readTest() {
- assertTrue(true);
+ String insertKey = "user0";
+ HashMap insertMap = insertRow(insertKey);
+ HashSet readFields = new HashSet();
+ HashMap readResultMap = new HashMap();
+
+ // Test reading a single field
+ readFields.add("FIELD0");
+ jdbcDBClient.read(TABLE_NAME, insertKey, readFields, readResultMap);
+ assertEquals("Assert that result has correct number of fields", readFields.size(), readResultMap.size());
+ for (String field: readFields) {
+ // TODO: This will fail until the fix is made to insert and update fields in the correct order.
+ // TODO: Uncomment this assertEquals when the fix is made.
+ //assertEquals("Assert " + field + " was read correctly", insertMap.get(field), readResultMap.get(field));
+ }
+
+ readResultMap = new HashMap();
+
+ // Test reading all fields
+ readFields.add("FIELD1");
+ readFields.add("FIELD2");
+ jdbcDBClient.read(TABLE_NAME, insertKey, readFields, readResultMap);
+ assertEquals("Assert that result has correct number of fields", readFields.size(), readResultMap.size());
+ for (String field: readFields) {
+ // TODO: This will fail until the fix is made to insert and update fields in the correct order.
+ // TODO: Uncomment this assertEquals when the fix is made.
+ //assertEquals("Assert " + field + " was read correctly", insertMap.get(field), readResultMap.get(field));
+ }
}
@Test
public void deleteTest() {
- assertTrue(true);
+ try {
+ String insertBeforeKey = "user0";
+ HashMap insertBeforeMap = insertRow(insertBeforeKey);
+ String deleteKey = "user1";
+ insertRow(deleteKey);
+ String insertAfterKey = "user2";
+ HashMap insertAfterMap = insertRow(insertAfterKey);
+
+ jdbcDBClient.delete(TABLE_NAME, deleteKey);
+
+ ResultSet resultSet = jdbcConnection.prepareStatement(
+ String.format("SELECT * FROM %s", TABLE_NAME)
+ ).executeQuery();
+
+ int totalRows = 0;
+ while (resultSet.next()) {
+ assertNotEquals("Assert this is not the deleted row key", deleteKey, resultSet.getString(KEY_FIELD));
+ totalRows++;
+ }
+ // Check we do not have a result Row
+ assertEquals("Assert we ended with the correct number of rows", totalRows, 2);
+
+ resultSet.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail("Failed deleteTest");
+ }
}
@Test
- public void scanTest() {
- assertTrue(true);
+ public void scanTest() throws SQLException {
+ HashMap> keyMap = new HashMap>();
+ for (int i = 0; i < 5; i++) {
+ String insertKey = KEY_PREFIX + i;
+ keyMap.put(insertKey, insertRow(insertKey));
+ }
+ HashSet fieldSet = new HashSet();
+ fieldSet.add("FIELD0");
+ fieldSet.add("FIELD1");
+ int startIndex = 1;
+ int resultRows = 3;
+
+ Vector> resultVector = new Vector>();
+ jdbcDBClient.scan(TABLE_NAME, KEY_PREFIX + startIndex, resultRows, fieldSet, resultVector);
+
+ // Check the resultVector is the correct size
+ assertEquals("Assert the correct number of results rows were returned", resultRows, resultVector.size());
+ // Check each vector row to make sure we have the correct fields
+ int testIndex = startIndex;
+ for (HashMap result: resultVector) {
+ assertEquals("Assert that this row has the correct number of fields", fieldSet.size(), result.size());
+ for (String field: fieldSet) {
+ // TODO: This will fail until the fix is made to insert and update fields in the correct order.
+ // TODO: Uncomment this assertEquals when the fix is made.
+ //assertEquals("Assert this field is correct in this row", keyMap.get(KEY_PREFIX + testIndex).get(field), result.get(field));
+ }
+ testIndex++;
+ }
}
}