diff --git a/kudu/pom.xml b/kudu/pom.xml
index 1eead10b..5ee69c15 100644
--- a/kudu/pom.xml
+++ b/kudu/pom.xml
@@ -1,58 +1,82 @@
4.0.0
com.yahoo.ycsb
binding-parent
0.6.0-SNAPSHOT
../binding-parent
kudu-binding
Kudu DB Binding
jar
org.kududb
kudu-client
${kudu.version}
com.yahoo.ycsb
core
${project.version}
provided
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+ 2.15
+
+ true
+ ../checkstyle.xml
+ true
+ true
+
+
+
+ validate
+ validate
+
+ checkstyle
+
+
+
+
+
+
true
false
cloudera-repo
Cloudera Releases
https://repository.cloudera.com/artifactory/cloudera-repos
diff --git a/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java b/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java
index d823ff46..f063ed7d 100644
--- a/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java
+++ b/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java
@@ -1,318 +1,347 @@
/**
* Copyright (c) 2015 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db;
import com.stumbleupon.async.TimeoutException;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.StringByteIterator;
import com.yahoo.ycsb.workloads.CoreWorkload;
import org.kududb.ColumnSchema;
import org.kududb.Schema;
import org.kududb.client.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
import static org.kududb.Type.STRING;
/**
- * Kudu client for YCSB framework
- * Example to load:
- * $ ./bin/ycsb load kudu -P workloads/workloada -threads 5
- * Example to run:
+ * Kudu client for YCSB framework. Example to load:
+ *
+ *
+ *
+ * $ ./bin/ycsb load kudu -P workloads/workloada -threads 5
+ *
+ *
+ *
+ *
Example to run:
+ *
+ *
+ *
* ./bin/ycsb run kudu -P workloads/workloada -p kudu_sync_ops=true -threads 5
- *
+ *
+ *
+ *
+ *
*/
public class KuduYCSBClient extends com.yahoo.ycsb.DB {
public static final String KEY = "key";
- public static final Status TIMEOUT = new Status("TIMEOUT", "The operation timed out.");
+ public static final Status TIMEOUT =
+ new Status("TIMEOUT", "The operation timed out.");
public static final int MAX_TABLETS = 9000;
public static final long DEFAULT_SLEEP = 60000;
private static final String SYNC_OPS_OPT = "kudu_sync_ops";
private static final String DEBUG_OPT = "kudu_debug";
private static final String PRINT_ROW_ERRORS_OPT = "kudu_print_row_errors";
- private static final String PRE_SPLIT_NUM_TABLETS_OPT = "kudu_pre_split_num_tablets";
+ private static final String PRE_SPLIT_NUM_TABLETS_OPT =
+ "kudu_pre_split_num_tablets";
private static final String TABLE_NUM_REPLICAS = "kudu_table_num_replicas";
private static final String BLOCK_SIZE_OPT = "kudu_block_size";
private static final String MASTER_ADDRESSES_OPT = "kudu_master_addresses";
private static final int BLOCK_SIZE_DEFAULT = 4096;
- private static final List columnNames = new ArrayList();
+ private static final List COLUMN_NAMES = new ArrayList();
private static KuduClient client;
private static Schema schema;
private static int fieldCount;
private boolean debug = false;
private boolean printErrors = false;
private String tableName;
private KuduSession session;
- private KuduTable table;
+ private KuduTable kuduTable;
@Override
public void init() throws DBException {
if (getProperties().getProperty(DEBUG_OPT) != null) {
this.debug = getProperties().getProperty(DEBUG_OPT).equals("true");
}
if (getProperties().getProperty(PRINT_ROW_ERRORS_OPT) != null) {
- this.printErrors = getProperties().getProperty(PRINT_ROW_ERRORS_OPT).equals("true");
+ this.printErrors =
+ getProperties().getProperty(PRINT_ROW_ERRORS_OPT).equals("true");
}
if (getProperties().getProperty(PRINT_ROW_ERRORS_OPT) != null) {
- this.printErrors = getProperties().getProperty(PRINT_ROW_ERRORS_OPT).equals("true");
+ this.printErrors =
+ getProperties().getProperty(PRINT_ROW_ERRORS_OPT).equals("true");
}
this.tableName = com.yahoo.ycsb.workloads.CoreWorkload.table;
initClient(debug, tableName, getProperties());
this.session = client.newSession();
- if (getProperties().getProperty(SYNC_OPS_OPT) != null &&
- getProperties().getProperty(SYNC_OPS_OPT).equals("false")) {
+ if (getProperties().getProperty(SYNC_OPS_OPT) != null
+ && getProperties().getProperty(SYNC_OPS_OPT).equals("false")) {
this.session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
this.session.setMutationBufferSpace(100);
} else {
this.session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_SYNC);
}
try {
- this.table = client.openTable(tableName);
+ this.kuduTable = client.openTable(tableName);
} catch (Exception e) {
throw new DBException("Could not open a table because of:", e);
}
}
- private synchronized static void initClient(boolean debug, String tableName, Properties prop)
- throws DBException {
- if (client != null) return;
+ private static synchronized void initClient(boolean debug, String tableName,
+ Properties prop) throws DBException {
+ if (client != null) {
+ return;
+ }
String masterAddresses = prop.getProperty(MASTER_ADDRESSES_OPT);
if (masterAddresses == null) {
masterAddresses = "localhost:7051";
}
int numTablets = getIntFromProp(prop, PRE_SPLIT_NUM_TABLETS_OPT, 4);
if (numTablets > MAX_TABLETS) {
- throw new DBException("Specified number of tablets (" + numTablets + ") must be equal " +
- "or below " + MAX_TABLETS);
+ throw new DBException("Specified number of tablets (" + numTablets
+ + ") must be equal " + "or below " + MAX_TABLETS);
}
int numReplicas = getIntFromProp(prop, TABLE_NUM_REPLICAS, 3);
int blockSize = getIntFromProp(prop, BLOCK_SIZE_OPT, BLOCK_SIZE_DEFAULT);
client = new KuduClient.KuduClientBuilder(masterAddresses)
.defaultSocketReadTimeoutMs(DEFAULT_SLEEP)
- .defaultOperationTimeoutMs(DEFAULT_SLEEP)
- .build();
+ .defaultOperationTimeoutMs(DEFAULT_SLEEP).build();
if (debug) {
System.out.println("Connecting to the masters at " + masterAddresses);
}
fieldCount = getIntFromProp(prop, CoreWorkload.FIELD_COUNT_PROPERTY,
Integer.parseInt(CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT));
List columns = new ArrayList(fieldCount + 1);
ColumnSchema keyColumn = new ColumnSchema.ColumnSchemaBuilder(KEY, STRING)
- .key(true)
- .desiredBlockSize(blockSize)
- .build();
+ .key(true).desiredBlockSize(blockSize).build();
columns.add(keyColumn);
- columnNames.add(KEY);
+ COLUMN_NAMES.add(KEY);
for (int i = 0; i < fieldCount; i++) {
String name = "field" + i;
- columnNames.add(name);
+ COLUMN_NAMES.add(name);
columns.add(new ColumnSchema.ColumnSchemaBuilder(name, STRING)
- .desiredBlockSize(blockSize)
- .build());
+ .desiredBlockSize(blockSize).build());
}
schema = new Schema(columns);
CreateTableBuilder builder = new CreateTableBuilder();
builder.setNumReplicas(numReplicas);
// create n-1 split keys, which will end up being n tablets master-side
for (int i = 1; i < numTablets + 0; i++) {
// We do +1000 since YCSB starts at user1.
int startKeyInt = (MAX_TABLETS / numTablets * i) + 1000;
String startKey = String.format("%04d", startKeyInt);
PartialRow splitRow = schema.newPartialRow();
splitRow.addString(0, "user" + startKey);
builder.addSplitRow(splitRow);
}
try {
client.createTable(tableName, schema, builder);
} catch (Exception e) {
if (!e.getMessage().contains("ALREADY_PRESENT")) {
throw new DBException("Couldn't create the table", e);
}
}
}
- private static int getIntFromProp(Properties prop, String propName, int defaultValue)
- throws DBException {
+ private static int getIntFromProp(Properties prop, String propName,
+ int defaultValue) throws DBException {
String intStr = prop.getProperty(propName);
if (intStr == null) {
return defaultValue;
} else {
try {
return Integer.valueOf(intStr);
} catch (NumberFormatException ex) {
- throw new DBException("Provided number for " + propName + " isn't a valid integer");
+ throw new DBException(
+ "Provided number for " + propName + " isn't a valid integer");
}
}
}
@Override
public void cleanup() throws DBException {
try {
this.session.close();
} catch (Exception e) {
throw new DBException("Couldn't cleanup the session", e);
}
}
@Override
public Status read(String table, String key, Set fields,
- HashMap result) {
- Vector> results = new Vector>();
+ HashMap result) {
+ Vector> results =
+ new Vector>();
final Status status = scan(table, key, 1, fields, results);
- if (!status.equals(Status.OK)) return status;
- if (results.size() != 1) return Status.NOT_FOUND;
+ if (!status.equals(Status.OK)) {
+ return status;
+ }
+ if (results.size() != 1) {
+ return Status.NOT_FOUND;
+ }
result.putAll(results.firstElement());
return Status.OK;
}
@Override
- public Status scan(String table, String startkey, int recordcount, Set fields,
- Vector> result) {
+ public Status scan(String table, String startkey, int recordcount,
+ Set fields, Vector> result) {
try {
- KuduScanner.KuduScannerBuilder scannerBuilder = client.newScannerBuilder(this.table);
+ KuduScanner.KuduScannerBuilder scannerBuilder =
+ client.newScannerBuilder(this.kuduTable);
List querySchema;
if (fields == null) {
- querySchema = columnNames;
+ querySchema = COLUMN_NAMES;
// No need to set the projected columns with the whole schema.
} else {
querySchema = new ArrayList(fields);
scannerBuilder.setProjectedColumnNames(querySchema);
}
PartialRow lowerBound = schema.newPartialRow();
lowerBound.addString(0, startkey);
scannerBuilder.lowerBound(lowerBound);
if (recordcount == 1) {
PartialRow upperBound = schema.newPartialRow();
// Keys are fixed length, just adding something at the end is safe.
upperBound.addString(0, startkey.concat(" "));
scannerBuilder.exclusiveUpperBound(upperBound);
}
- KuduScanner scanner = scannerBuilder
- .limit(recordcount) // currently noop
+ KuduScanner scanner = scannerBuilder.limit(recordcount) // currently noop
.build();
while (scanner.hasMoreRows()) {
RowResultIterator data = scanner.nextRows();
addAllRowsToResult(data, recordcount, querySchema, result);
- if (recordcount == result.size()) break;
+ if (recordcount == result.size()) {
+ break;
+ }
}
RowResultIterator closer = scanner.close();
addAllRowsToResult(closer, recordcount, querySchema, result);
} catch (TimeoutException te) {
if (printErrors) {
- System.err.println("Waited too long for a scan operation with start key=" + startkey);
+ System.err.println(
+ "Waited too long for a scan operation with start key=" + startkey);
}
return TIMEOUT;
} catch (Exception e) {
System.err.println("Unexpected exception " + e);
e.printStackTrace();
return Status.ERROR;
}
return Status.OK;
}
private void addAllRowsToResult(RowResultIterator it, int recordcount,
- List querySchema,
- Vector> result)
- throws Exception {
+ List querySchema, Vector> result)
+ throws Exception {
RowResult row;
- HashMap rowResult = new HashMap(querySchema.size());
- if (it == null) return;
+ HashMap rowResult =
+ new HashMap(querySchema.size());
+ if (it == null) {
+ return;
+ }
while (it.hasNext()) {
- if (result.size() == recordcount) return;
+ if (result.size() == recordcount) {
+ return;
+ }
row = it.next();
int colIdx = 0;
for (String col : querySchema) {
rowResult.put(col, new StringByteIterator(row.getString(colIdx)));
colIdx++;
}
result.add(rowResult);
}
}
@Override
- public Status update(String table, String key, HashMap values) {
- Update update = this.table.newUpdate();
+ public Status update(String table, String key,
+ HashMap values) {
+ Update update = this.kuduTable.newUpdate();
PartialRow row = update.getRow();
row.addString(KEY, key);
for (int i = 1; i < schema.getColumnCount(); i++) {
String columnName = schema.getColumnByIndex(i).getName();
if (values.containsKey(columnName)) {
String value = values.get(columnName).toString();
row.addString(columnName, value);
}
}
apply(update);
return Status.OK;
}
@Override
- public Status insert(String table, String key, HashMap values) {
- Insert insert = this.table.newInsert();
+ public Status insert(String table, String key,
+ HashMap values) {
+ Insert insert = this.kuduTable.newInsert();
PartialRow row = insert.getRow();
row.addString(KEY, key);
for (int i = 1; i < schema.getColumnCount(); i++) {
- row.addString(i, new String(values.get(schema.getColumnByIndex(i).getName()).toArray()));
+ row.addString(i, new String(
+ values.get(schema.getColumnByIndex(i).getName()).toArray()));
}
apply(insert);
return Status.OK;
}
@Override
public Status delete(String table, String key) {
- Delete delete = this.table.newDelete();
+ Delete delete = this.kuduTable.newDelete();
PartialRow row = delete.getRow();
row.addString(KEY, key);
apply(delete);
return Status.OK;
}
private void apply(Operation op) {
try {
OperationResponse response = session.apply(op);
if (response != null && response.hasRowError() && printErrors) {
System.err.println("Got a row error " + response.getRowError());
}
} catch (Exception ex) {
if (printErrors) {
System.err.println("Failed to apply an operation " + ex.toString());
ex.printStackTrace();
}
}
}
}
diff --git a/kudu/src/main/java/com/yahoo/ycsb/db/package-info.java b/kudu/src/main/java/com/yahoo/ycsb/db/package-info.java
new file mode 100644
index 00000000..5d6c4961
--- /dev/null
+++ b/kudu/src/main/java/com/yahoo/ycsb/db/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2014, Yahoo!, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+/**
+ * The YCSB binding for Kudu.
+ */
+package com.yahoo.ycsb.db;
+