diff --git a/kudu/pom.xml b/kudu/pom.xml index 1eead10b..5ee69c15 100644 --- a/kudu/pom.xml +++ b/kudu/pom.xml @@ -1,58 +1,82 @@ 4.0.0 com.yahoo.ycsb binding-parent 0.6.0-SNAPSHOT ../binding-parent kudu-binding Kudu DB Binding jar org.kududb kudu-client ${kudu.version} com.yahoo.ycsb core ${project.version} provided + + + + org.apache.maven.plugins + maven-checkstyle-plugin + 2.15 + + true + ../checkstyle.xml + true + true + + + + validate + validate + + checkstyle + + + + + + true false cloudera-repo Cloudera Releases https://repository.cloudera.com/artifactory/cloudera-repos diff --git a/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java b/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java index d823ff46..f063ed7d 100644 --- a/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java +++ b/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java @@ -1,318 +1,347 @@ /** * Copyright (c) 2015 YCSB contributors. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb.db; import com.stumbleupon.async.TimeoutException; import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.DBException; import com.yahoo.ycsb.Status; import com.yahoo.ycsb.StringByteIterator; import com.yahoo.ycsb.workloads.CoreWorkload; import org.kududb.ColumnSchema; import org.kududb.Schema; import org.kududb.client.*; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Properties; import java.util.Set; import java.util.Vector; import static org.kududb.Type.STRING; /** - * Kudu client for YCSB framework - * Example to load: - * $ ./bin/ycsb load kudu -P workloads/workloada -threads 5 - * Example to run: + * Kudu client for YCSB framework. Example to load:
+ * + *
+ * 
+ * $ ./bin/ycsb load kudu -P workloads/workloada -threads 5 
+ * 
+ * 
+ * + *
Example to run:
+ * + *
+ * 
  * ./bin/ycsb run kudu -P workloads/workloada -p kudu_sync_ops=true -threads 5
- *
+ * 
+ * 
+ * + *
*/ public class KuduYCSBClient extends com.yahoo.ycsb.DB { public static final String KEY = "key"; - public static final Status TIMEOUT = new Status("TIMEOUT", "The operation timed out."); + public static final Status TIMEOUT = + new Status("TIMEOUT", "The operation timed out."); public static final int MAX_TABLETS = 9000; public static final long DEFAULT_SLEEP = 60000; private static final String SYNC_OPS_OPT = "kudu_sync_ops"; private static final String DEBUG_OPT = "kudu_debug"; private static final String PRINT_ROW_ERRORS_OPT = "kudu_print_row_errors"; - private static final String PRE_SPLIT_NUM_TABLETS_OPT = "kudu_pre_split_num_tablets"; + private static final String PRE_SPLIT_NUM_TABLETS_OPT = + "kudu_pre_split_num_tablets"; private static final String TABLE_NUM_REPLICAS = "kudu_table_num_replicas"; private static final String BLOCK_SIZE_OPT = "kudu_block_size"; private static final String MASTER_ADDRESSES_OPT = "kudu_master_addresses"; private static final int BLOCK_SIZE_DEFAULT = 4096; - private static final List columnNames = new ArrayList(); + private static final List COLUMN_NAMES = new ArrayList(); private static KuduClient client; private static Schema schema; private static int fieldCount; private boolean debug = false; private boolean printErrors = false; private String tableName; private KuduSession session; - private KuduTable table; + private KuduTable kuduTable; @Override public void init() throws DBException { if (getProperties().getProperty(DEBUG_OPT) != null) { this.debug = getProperties().getProperty(DEBUG_OPT).equals("true"); } if (getProperties().getProperty(PRINT_ROW_ERRORS_OPT) != null) { - this.printErrors = getProperties().getProperty(PRINT_ROW_ERRORS_OPT).equals("true"); + this.printErrors = + getProperties().getProperty(PRINT_ROW_ERRORS_OPT).equals("true"); } if (getProperties().getProperty(PRINT_ROW_ERRORS_OPT) != null) { - this.printErrors = getProperties().getProperty(PRINT_ROW_ERRORS_OPT).equals("true"); + this.printErrors = + getProperties().getProperty(PRINT_ROW_ERRORS_OPT).equals("true"); } this.tableName = com.yahoo.ycsb.workloads.CoreWorkload.table; initClient(debug, tableName, getProperties()); this.session = client.newSession(); - if (getProperties().getProperty(SYNC_OPS_OPT) != null && - getProperties().getProperty(SYNC_OPS_OPT).equals("false")) { + if (getProperties().getProperty(SYNC_OPS_OPT) != null + && getProperties().getProperty(SYNC_OPS_OPT).equals("false")) { this.session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_BACKGROUND); this.session.setMutationBufferSpace(100); } else { this.session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_SYNC); } try { - this.table = client.openTable(tableName); + this.kuduTable = client.openTable(tableName); } catch (Exception e) { throw new DBException("Could not open a table because of:", e); } } - private synchronized static void initClient(boolean debug, String tableName, Properties prop) - throws DBException { - if (client != null) return; + private static synchronized void initClient(boolean debug, String tableName, + Properties prop) throws DBException { + if (client != null) { + return; + } String masterAddresses = prop.getProperty(MASTER_ADDRESSES_OPT); if (masterAddresses == null) { masterAddresses = "localhost:7051"; } int numTablets = getIntFromProp(prop, PRE_SPLIT_NUM_TABLETS_OPT, 4); if (numTablets > MAX_TABLETS) { - throw new DBException("Specified number of tablets (" + numTablets + ") must be equal " + - "or below " + MAX_TABLETS); + throw new DBException("Specified number of tablets (" + numTablets + + ") must be equal " + "or below " + MAX_TABLETS); } int numReplicas = getIntFromProp(prop, TABLE_NUM_REPLICAS, 3); int blockSize = getIntFromProp(prop, BLOCK_SIZE_OPT, BLOCK_SIZE_DEFAULT); client = new KuduClient.KuduClientBuilder(masterAddresses) .defaultSocketReadTimeoutMs(DEFAULT_SLEEP) - .defaultOperationTimeoutMs(DEFAULT_SLEEP) - .build(); + .defaultOperationTimeoutMs(DEFAULT_SLEEP).build(); if (debug) { System.out.println("Connecting to the masters at " + masterAddresses); } fieldCount = getIntFromProp(prop, CoreWorkload.FIELD_COUNT_PROPERTY, Integer.parseInt(CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT)); List columns = new ArrayList(fieldCount + 1); ColumnSchema keyColumn = new ColumnSchema.ColumnSchemaBuilder(KEY, STRING) - .key(true) - .desiredBlockSize(blockSize) - .build(); + .key(true).desiredBlockSize(blockSize).build(); columns.add(keyColumn); - columnNames.add(KEY); + COLUMN_NAMES.add(KEY); for (int i = 0; i < fieldCount; i++) { String name = "field" + i; - columnNames.add(name); + COLUMN_NAMES.add(name); columns.add(new ColumnSchema.ColumnSchemaBuilder(name, STRING) - .desiredBlockSize(blockSize) - .build()); + .desiredBlockSize(blockSize).build()); } schema = new Schema(columns); CreateTableBuilder builder = new CreateTableBuilder(); builder.setNumReplicas(numReplicas); // create n-1 split keys, which will end up being n tablets master-side for (int i = 1; i < numTablets + 0; i++) { // We do +1000 since YCSB starts at user1. int startKeyInt = (MAX_TABLETS / numTablets * i) + 1000; String startKey = String.format("%04d", startKeyInt); PartialRow splitRow = schema.newPartialRow(); splitRow.addString(0, "user" + startKey); builder.addSplitRow(splitRow); } try { client.createTable(tableName, schema, builder); } catch (Exception e) { if (!e.getMessage().contains("ALREADY_PRESENT")) { throw new DBException("Couldn't create the table", e); } } } - private static int getIntFromProp(Properties prop, String propName, int defaultValue) - throws DBException { + private static int getIntFromProp(Properties prop, String propName, + int defaultValue) throws DBException { String intStr = prop.getProperty(propName); if (intStr == null) { return defaultValue; } else { try { return Integer.valueOf(intStr); } catch (NumberFormatException ex) { - throw new DBException("Provided number for " + propName + " isn't a valid integer"); + throw new DBException( + "Provided number for " + propName + " isn't a valid integer"); } } } @Override public void cleanup() throws DBException { try { this.session.close(); } catch (Exception e) { throw new DBException("Couldn't cleanup the session", e); } } @Override public Status read(String table, String key, Set fields, - HashMap result) { - Vector> results = new Vector>(); + HashMap result) { + Vector> results = + new Vector>(); final Status status = scan(table, key, 1, fields, results); - if (!status.equals(Status.OK)) return status; - if (results.size() != 1) return Status.NOT_FOUND; + if (!status.equals(Status.OK)) { + return status; + } + if (results.size() != 1) { + return Status.NOT_FOUND; + } result.putAll(results.firstElement()); return Status.OK; } @Override - public Status scan(String table, String startkey, int recordcount, Set fields, - Vector> result) { + public Status scan(String table, String startkey, int recordcount, + Set fields, Vector> result) { try { - KuduScanner.KuduScannerBuilder scannerBuilder = client.newScannerBuilder(this.table); + KuduScanner.KuduScannerBuilder scannerBuilder = + client.newScannerBuilder(this.kuduTable); List querySchema; if (fields == null) { - querySchema = columnNames; + querySchema = COLUMN_NAMES; // No need to set the projected columns with the whole schema. } else { querySchema = new ArrayList(fields); scannerBuilder.setProjectedColumnNames(querySchema); } PartialRow lowerBound = schema.newPartialRow(); lowerBound.addString(0, startkey); scannerBuilder.lowerBound(lowerBound); if (recordcount == 1) { PartialRow upperBound = schema.newPartialRow(); // Keys are fixed length, just adding something at the end is safe. upperBound.addString(0, startkey.concat(" ")); scannerBuilder.exclusiveUpperBound(upperBound); } - KuduScanner scanner = scannerBuilder - .limit(recordcount) // currently noop + KuduScanner scanner = scannerBuilder.limit(recordcount) // currently noop .build(); while (scanner.hasMoreRows()) { RowResultIterator data = scanner.nextRows(); addAllRowsToResult(data, recordcount, querySchema, result); - if (recordcount == result.size()) break; + if (recordcount == result.size()) { + break; + } } RowResultIterator closer = scanner.close(); addAllRowsToResult(closer, recordcount, querySchema, result); } catch (TimeoutException te) { if (printErrors) { - System.err.println("Waited too long for a scan operation with start key=" + startkey); + System.err.println( + "Waited too long for a scan operation with start key=" + startkey); } return TIMEOUT; } catch (Exception e) { System.err.println("Unexpected exception " + e); e.printStackTrace(); return Status.ERROR; } return Status.OK; } private void addAllRowsToResult(RowResultIterator it, int recordcount, - List querySchema, - Vector> result) - throws Exception { + List querySchema, Vector> result) + throws Exception { RowResult row; - HashMap rowResult = new HashMap(querySchema.size()); - if (it == null) return; + HashMap rowResult = + new HashMap(querySchema.size()); + if (it == null) { + return; + } while (it.hasNext()) { - if (result.size() == recordcount) return; + if (result.size() == recordcount) { + return; + } row = it.next(); int colIdx = 0; for (String col : querySchema) { rowResult.put(col, new StringByteIterator(row.getString(colIdx))); colIdx++; } result.add(rowResult); } } @Override - public Status update(String table, String key, HashMap values) { - Update update = this.table.newUpdate(); + public Status update(String table, String key, + HashMap values) { + Update update = this.kuduTable.newUpdate(); PartialRow row = update.getRow(); row.addString(KEY, key); for (int i = 1; i < schema.getColumnCount(); i++) { String columnName = schema.getColumnByIndex(i).getName(); if (values.containsKey(columnName)) { String value = values.get(columnName).toString(); row.addString(columnName, value); } } apply(update); return Status.OK; } @Override - public Status insert(String table, String key, HashMap values) { - Insert insert = this.table.newInsert(); + public Status insert(String table, String key, + HashMap values) { + Insert insert = this.kuduTable.newInsert(); PartialRow row = insert.getRow(); row.addString(KEY, key); for (int i = 1; i < schema.getColumnCount(); i++) { - row.addString(i, new String(values.get(schema.getColumnByIndex(i).getName()).toArray())); + row.addString(i, new String( + values.get(schema.getColumnByIndex(i).getName()).toArray())); } apply(insert); return Status.OK; } @Override public Status delete(String table, String key) { - Delete delete = this.table.newDelete(); + Delete delete = this.kuduTable.newDelete(); PartialRow row = delete.getRow(); row.addString(KEY, key); apply(delete); return Status.OK; } private void apply(Operation op) { try { OperationResponse response = session.apply(op); if (response != null && response.hasRowError() && printErrors) { System.err.println("Got a row error " + response.getRowError()); } } catch (Exception ex) { if (printErrors) { System.err.println("Failed to apply an operation " + ex.toString()); ex.printStackTrace(); } } } } diff --git a/kudu/src/main/java/com/yahoo/ycsb/db/package-info.java b/kudu/src/main/java/com/yahoo/ycsb/db/package-info.java new file mode 100644 index 00000000..5d6c4961 --- /dev/null +++ b/kudu/src/main/java/com/yahoo/ycsb/db/package-info.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2014, Yahoo!, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +/** + * The YCSB binding for Kudu. + */ +package com.yahoo.ycsb.db; +