diff --git a/kudu/README.md b/kudu/README.md index e1f2b286..704ca8c2 100644 --- a/kudu/README.md +++ b/kudu/README.md @@ -1,56 +1,61 @@ # Kudu bindings for YCSB -[Kudu](http://getkudu.io) is a storage engine that enables fast analytics on fast data. +[Apache Kudu](https://kudu.apache.org) is a storage engine that enables fast +analytics on fast data. ## Benchmarking Kudu -Use the following command line to load the initial data into an existing Kudu cluster with default -configurations. +Use the following command line to load the initial data into an existing Kudu +cluster with default configurations. ``` bin/ycsb load kudu -P workloads/workloada ``` Additional configurations: -* `kudu_master_addresses`: The master's address. The default configuration expects a master on localhost. -* `kudu_pre_split_num_tablets`: The number of tablets (or partitions) to create for the table. The default -uses 4 tablets. A good rule of thumb is to use 5 per tablet server. -* `kudu_table_num_replicas`: The number of replicas that each tablet will have. The default is 3. Should -only be configured to use 1 instead, for single node tests. -* `kudu_sync_ops`: If the client should wait after every write operation. The default is true. -* `kudu_block_size`: The data block size used to configure columns. The default is 4096 bytes. +* `kudu_master_addresses`: The master's address. The default configuration + expects a master on localhost. +* `kudu_pre_split_num_tablets`: The number of tablets (or partitions) to create + for the table. The default uses 4 tablets. A good rule of thumb is to use 5 + per tablet server. +* `kudu_table_num_replicas`: The number of replicas that each tablet will have. + The default is 3. Should only be configured to use 1 instead, for single node tests. +* `kudu_sync_ops`: If the client should wait after every write operation. The + default is true. +* `kudu_block_size`: The data block size used to configure columns. The default + is 4096 bytes. Then, you can run the workload: ``` bin/ycsb run kudu -P workloads/workloada ``` ## Using a previous client version -If you wish to use a different Kudu client version than the one shipped with YCSB, you can specify on the -command line with `-Dkudu.version=x`. For example: +If you wish to use a different Kudu client version than the one shipped with +YCSB, you can specify on the command line with `-Dkudu.version=x`. For example: ``` -mvn -pl com.yahoo.ycsb:kudu-binding -am package -DskipTests -Dkudu.version=0.7.1 +mvn -pl com.yahoo.ycsb:kudu-binding -am package -DskipTests -Dkudu.version=1.0.1 ``` -Note that prior to 1.0, Kudu doesn't guarantee wire or API compability between versions and only the latest -one is officially supported. +Note that only versions since 1.0 are supported, since Kudu did not guarantee +wire or API compatibility prior to 1.0. diff --git a/kudu/pom.xml b/kudu/pom.xml index 4d703244..f1c7b00d 100644 --- a/kudu/pom.xml +++ b/kudu/pom.xml @@ -1,58 +1,55 @@ 4.0.0 com.yahoo.ycsb binding-parent 0.13.0-SNAPSHOT ../binding-parent kudu-binding Kudu DB Binding jar - org.kududb + org.apache.kudu kudu-client ${kudu.version} com.yahoo.ycsb core ${project.version} provided + + org.slf4j + slf4j-api + 1.7.21 + + + org.slf4j + slf4j-log4j12 + 1.7.21 + - - - - true - - - false - - cloudera-repo - Cloudera Releases - https://repository.cloudera.com/artifactory/cloudera-repos - - diff --git a/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java b/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java index 4a771a7f..2d68a178 100644 --- a/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java +++ b/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java @@ -1,352 +1,319 @@ /** - * Copyright (c) 2015 YCSB contributors. All rights reserved. + * Copyright (c) 2015-2016 YCSB contributors. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb.db; import com.stumbleupon.async.TimeoutException; import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.DBException; import com.yahoo.ycsb.Status; import com.yahoo.ycsb.StringByteIterator; import com.yahoo.ycsb.workloads.CoreWorkload; -import org.kududb.ColumnSchema; -import org.kududb.Schema; -import org.kududb.client.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.client.*; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Properties; import java.util.Set; import java.util.Vector; -import static org.kududb.Type.STRING; +import static org.apache.kudu.Type.STRING; +import static org.apache.kudu.client.KuduPredicate.ComparisonOp.EQUAL; +import static org.apache.kudu.client.KuduPredicate.ComparisonOp.GREATER_EQUAL; /** * Kudu client for YCSB framework. Example to load:
- * + * *
  * 
- * $ ./bin/ycsb load kudu -P workloads/workloada -threads 5 
+ * $ ./bin/ycsb load kudu -P workloads/workloada -threads 5
  * 
  * 
- * + * *
Example to run:
- * + * *
  * 
  * ./bin/ycsb run kudu -P workloads/workloada -p kudu_sync_ops=true -threads 5
  * 
  * 
- * + * *
*/ public class KuduYCSBClient extends com.yahoo.ycsb.DB { - public static final String KEY = "key"; - public static final Status TIMEOUT = - new Status("TIMEOUT", "The operation timed out."); - public static final int MAX_TABLETS = 9000; - public static final long DEFAULT_SLEEP = 60000; + private static final Logger LOG = LoggerFactory.getLogger(KuduYCSBClient.class); + private static final String KEY = "key"; + private static final Status TIMEOUT = new Status("TIMEOUT", "The operation timed out."); + private static final int MAX_TABLETS = 9000; + private static final long DEFAULT_SLEEP = 60000; private static final String SYNC_OPS_OPT = "kudu_sync_ops"; - private static final String DEBUG_OPT = "kudu_debug"; - private static final String PRINT_ROW_ERRORS_OPT = "kudu_print_row_errors"; - private static final String PRE_SPLIT_NUM_TABLETS_OPT = - "kudu_pre_split_num_tablets"; + private static final String PRE_SPLIT_NUM_TABLETS_OPT = "kudu_pre_split_num_tablets"; private static final String TABLE_NUM_REPLICAS = "kudu_table_num_replicas"; private static final String BLOCK_SIZE_OPT = "kudu_block_size"; private static final String MASTER_ADDRESSES_OPT = "kudu_master_addresses"; private static final int BLOCK_SIZE_DEFAULT = 4096; - private static final List COLUMN_NAMES = new ArrayList(); + private static final List COLUMN_NAMES = new ArrayList<>(); private static KuduClient client; private static Schema schema; - private static int fieldCount; - private boolean debug = false; - private boolean printErrors = false; - private String tableName; private KuduSession session; private KuduTable kuduTable; @Override public void init() throws DBException { - if (getProperties().getProperty(DEBUG_OPT) != null) { - this.debug = getProperties().getProperty(DEBUG_OPT).equals("true"); - } - if (getProperties().getProperty(PRINT_ROW_ERRORS_OPT) != null) { - this.printErrors = - getProperties().getProperty(PRINT_ROW_ERRORS_OPT).equals("true"); - } - if (getProperties().getProperty(PRINT_ROW_ERRORS_OPT) != null) { - this.printErrors = - getProperties().getProperty(PRINT_ROW_ERRORS_OPT).equals("true"); - } - this.tableName = com.yahoo.ycsb.workloads.CoreWorkload.table; - initClient(debug, tableName, getProperties()); + String tableName = CoreWorkload.table; + initClient(tableName, getProperties()); this.session = client.newSession(); if (getProperties().getProperty(SYNC_OPS_OPT) != null && getProperties().getProperty(SYNC_OPS_OPT).equals("false")) { this.session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_BACKGROUND); this.session.setMutationBufferSpace(100); } else { this.session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_SYNC); } try { this.kuduTable = client.openTable(tableName); } catch (Exception e) { throw new DBException("Could not open a table because of:", e); } } - private static synchronized void initClient(boolean debug, String tableName, - Properties prop) throws DBException { + private static synchronized void initClient(String tableName, + Properties prop) throws DBException { if (client != null) { return; } String masterAddresses = prop.getProperty(MASTER_ADDRESSES_OPT); if (masterAddresses == null) { masterAddresses = "localhost:7051"; } int numTablets = getIntFromProp(prop, PRE_SPLIT_NUM_TABLETS_OPT, 4); if (numTablets > MAX_TABLETS) { - throw new DBException("Specified number of tablets (" + numTablets - + ") must be equal " + "or below " + MAX_TABLETS); + throw new DBException(String.format( + "Specified number of tablets (%s) must be equal or below %s", numTablets, MAX_TABLETS)); } int numReplicas = getIntFromProp(prop, TABLE_NUM_REPLICAS, 3); - int blockSize = getIntFromProp(prop, BLOCK_SIZE_OPT, BLOCK_SIZE_DEFAULT); client = new KuduClient.KuduClientBuilder(masterAddresses) - .defaultSocketReadTimeoutMs(DEFAULT_SLEEP) - .defaultOperationTimeoutMs(DEFAULT_SLEEP) - .defaultAdminOperationTimeoutMs(DEFAULT_SLEEP).build(); - if (debug) { - System.out.println("Connecting to the masters at " + masterAddresses); - } + .defaultSocketReadTimeoutMs(DEFAULT_SLEEP) + .defaultOperationTimeoutMs(DEFAULT_SLEEP) + .defaultAdminOperationTimeoutMs(DEFAULT_SLEEP) + .build(); + LOG.debug("Connecting to the masters at {}", masterAddresses); - fieldCount = getIntFromProp(prop, CoreWorkload.FIELD_COUNT_PROPERTY, - Integer.parseInt(CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT)); + int fieldCount = getIntFromProp(prop, CoreWorkload.FIELD_COUNT_PROPERTY, + Integer.parseInt(CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT)); - List columns = new ArrayList(fieldCount + 1); + List columns = new ArrayList<>(fieldCount + 1); ColumnSchema keyColumn = new ColumnSchema.ColumnSchemaBuilder(KEY, STRING) - .key(true).desiredBlockSize(blockSize).build(); + .key(true) + .desiredBlockSize(blockSize) + .build(); columns.add(keyColumn); COLUMN_NAMES.add(KEY); for (int i = 0; i < fieldCount; i++) { String name = "field" + i; COLUMN_NAMES.add(name); columns.add(new ColumnSchema.ColumnSchemaBuilder(name, STRING) - .desiredBlockSize(blockSize).build()); + .desiredBlockSize(blockSize) + .build()); } schema = new Schema(columns); CreateTableOptions builder = new CreateTableOptions(); - List rangePartitionColumns = new ArrayList<>(1); - rangePartitionColumns.add(KEY); - builder.setRangePartitionColumns(rangePartitionColumns); + builder.setRangePartitionColumns(new ArrayList()); + List hashPartitionColumns = new ArrayList<>(); + hashPartitionColumns.add(KEY); + builder.addHashPartitions(hashPartitionColumns, numTablets); builder.setNumReplicas(numReplicas); - // create n-1 split keys, which will end up being n tablets master-side - for (int i = 1; i < numTablets + 0; i++) { - // We do +1000 since YCSB starts at user1. - int startKeyInt = (MAX_TABLETS / numTablets * i) + 1000; - String startKey = String.format("%04d", startKeyInt); - PartialRow splitRow = schema.newPartialRow(); - splitRow.addString(0, "user" + startKey); - builder.addSplitRow(splitRow); - } try { client.createTable(tableName, schema, builder); } catch (Exception e) { - if (!e.getMessage().contains("ALREADY_PRESENT")) { + if (!e.getMessage().contains("already exists")) { throw new DBException("Couldn't create the table", e); } } } - private static int getIntFromProp(Properties prop, String propName, - int defaultValue) throws DBException { + private static int getIntFromProp(Properties prop, + String propName, + int defaultValue) throws DBException { String intStr = prop.getProperty(propName); if (intStr == null) { return defaultValue; } else { try { return Integer.valueOf(intStr); } catch (NumberFormatException ex) { - throw new DBException( - "Provided number for " + propName + " isn't a valid integer"); + throw new DBException("Provided number for " + propName + " isn't a valid integer"); } } } @Override public void cleanup() throws DBException { try { this.session.close(); } catch (Exception e) { throw new DBException("Couldn't cleanup the session", e); } } @Override - public Status read(String table, String key, Set fields, - HashMap result) { - Vector> results = - new Vector>(); + public Status read(String table, + String key, + Set fields, + HashMap result) { + Vector> results = new Vector<>(); final Status status = scan(table, key, 1, fields, results); if (!status.equals(Status.OK)) { return status; } if (results.size() != 1) { return Status.NOT_FOUND; } result.putAll(results.firstElement()); return Status.OK; } @Override - public Status scan(String table, String startkey, int recordcount, - Set fields, Vector> result) { + public Status scan(String table, + String startkey, + int recordcount, + Set fields, + Vector> result) { try { - KuduScanner.KuduScannerBuilder scannerBuilder = - client.newScannerBuilder(this.kuduTable); + KuduScanner.KuduScannerBuilder scannerBuilder = client.newScannerBuilder(kuduTable); List querySchema; if (fields == null) { querySchema = COLUMN_NAMES; // No need to set the projected columns with the whole schema. } else { querySchema = new ArrayList<>(fields); scannerBuilder.setProjectedColumnNames(querySchema); } - PartialRow lowerBound = schema.newPartialRow(); - lowerBound.addString(0, startkey); - scannerBuilder.lowerBound(lowerBound); - - if (recordcount == 1) { - PartialRow upperBound = schema.newPartialRow(); - upperBound.addString(0, startkey + '\0'); - scannerBuilder.exclusiveUpperBound(upperBound); - } - + ColumnSchema column = schema.getColumnByIndex(0); + KuduPredicate.ComparisonOp predicateOp = recordcount == 1 ? EQUAL : GREATER_EQUAL; + KuduPredicate predicate = KuduPredicate.newComparisonPredicate(column, predicateOp, startkey); + scannerBuilder.addPredicate(predicate); scannerBuilder.limit(recordcount); // currently noop KuduScanner scanner = scannerBuilder.build(); while (scanner.hasMoreRows()) { RowResultIterator data = scanner.nextRows(); addAllRowsToResult(data, recordcount, querySchema, result); if (recordcount == result.size()) { break; } } RowResultIterator closer = scanner.close(); addAllRowsToResult(closer, recordcount, querySchema, result); } catch (TimeoutException te) { - if (printErrors) { - System.err.println( - "Waited too long for a scan operation with start key=" + startkey); - } + LOG.info("Waited too long for a scan operation with start key={}", startkey); return TIMEOUT; } catch (Exception e) { - System.err.println("Unexpected exception " + e); - e.printStackTrace(); + LOG.warn("Unexpected exception", e); return Status.ERROR; } return Status.OK; } - private void addAllRowsToResult(RowResultIterator it, int recordcount, - List querySchema, Vector> result) - throws Exception { + private void addAllRowsToResult(RowResultIterator it, + int recordcount, + List querySchema, + Vector> result) throws Exception { RowResult row; - HashMap rowResult = - new HashMap(querySchema.size()); + HashMap rowResult = new HashMap<>(querySchema.size()); if (it == null) { return; } while (it.hasNext()) { if (result.size() == recordcount) { return; } row = it.next(); int colIdx = 0; for (String col : querySchema) { rowResult.put(col, new StringByteIterator(row.getString(colIdx))); colIdx++; } result.add(rowResult); } } @Override - public Status update(String table, String key, - HashMap values) { + public Status update(String table, String key, HashMap values) { Update update = this.kuduTable.newUpdate(); PartialRow row = update.getRow(); row.addString(KEY, key); for (int i = 1; i < schema.getColumnCount(); i++) { String columnName = schema.getColumnByIndex(i).getName(); if (values.containsKey(columnName)) { String value = values.get(columnName).toString(); row.addString(columnName, value); } } apply(update); return Status.OK; } @Override - public Status insert(String table, String key, - HashMap values) { + public Status insert(String table, String key, HashMap values) { Insert insert = this.kuduTable.newInsert(); PartialRow row = insert.getRow(); row.addString(KEY, key); for (int i = 1; i < schema.getColumnCount(); i++) { - row.addString(i, new String( - values.get(schema.getColumnByIndex(i).getName()).toArray())); + row.addString(i, values.get(schema.getColumnByIndex(i).getName()).toString()); } apply(insert); return Status.OK; } @Override public Status delete(String table, String key) { Delete delete = this.kuduTable.newDelete(); PartialRow row = delete.getRow(); row.addString(KEY, key); apply(delete); return Status.OK; } private void apply(Operation op) { try { OperationResponse response = session.apply(op); - if (response != null && response.hasRowError() && printErrors) { - System.err.println("Got a row error " + response.getRowError()); - } - } catch (Exception ex) { - if (printErrors) { - System.err.println("Failed to apply an operation " + ex.toString()); - ex.printStackTrace(); + if (response != null && response.hasRowError()) { + LOG.info("Write operation failed: {}", response.getRowError()); } + } catch (KuduException ex) { + LOG.warn("Write operation failed", ex); } } } diff --git a/kudu/src/main/java/com/yahoo/ycsb/db/package-info.java b/kudu/src/main/java/com/yahoo/ycsb/db/package-info.java index 5d6c4961..a76fb24f 100644 --- a/kudu/src/main/java/com/yahoo/ycsb/db/package-info.java +++ b/kudu/src/main/java/com/yahoo/ycsb/db/package-info.java @@ -1,22 +1,22 @@ -/* - * Copyright (c) 2014, Yahoo!, Inc. All rights reserved. +/** + * Copyright (c) 2015-2016 YCSB contributors. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ /** - * The YCSB binding for Kudu. + * The YCSB binding for Apache Kudu. */ package com.yahoo.ycsb.db; diff --git a/kudu/src/main/resources/log4j.properties b/kudu/src/main/resources/log4j.properties new file mode 100644 index 00000000..7a754a8b --- /dev/null +++ b/kudu/src/main/resources/log4j.properties @@ -0,0 +1,11 @@ +# Root logger option +log4j.rootLogger=DEBUG, stderr + +log4j.logger.com.stumbleupon.async=WARN +log4j.logger.org.apache.kudu=WARN + +# Direct log messages to stderr +log4j.appender.stderr = org.apache.log4j.ConsoleAppender +log4j.appender.stderr.Target=System.err +log4j.appender.stderr.layout = org.apache.log4j.PatternLayout +log4j.appender.stderr.layout.ConversionPattern = [%p] %d{HH:mm:ss.SSS} (%F:%L) %m%n diff --git a/pom.xml b/pom.xml index af538ca3..6f935968 100644 --- a/pom.xml +++ b/pom.xml @@ -1,183 +1,183 @@ 4.0.0 com.yahoo.ycsb root 0.13.0-SNAPSHOT pom YCSB Root This is the top level project that builds, packages the core and all the DB bindings for YCSB infrastructure. scm:git:git://github.com/brianfrankcooper/YCSB.git master https://github.com/brianfrankcooper/YCSB com.puppycrawl.tools checkstyle 7.7.1 org.jdom jdom 1.1 com.google.collections google-collections 1.0 org.slf4j slf4j-api 1.6.4 2.5.5 2.10 1.7.1 0.94.27 0.98.14-hadoop2 1.0.2 1.6.0 3.0.0 1.0.0-incubating.M3 0.2.3 7.2.2.Final - 0.9.0 + 1.1.0 2.1.1 3.0.3 2.0.1 2.2.10 2.0.0 1.10.20 0.81 UTF-8 0.8.0 0.9.5.6 1.4.10 2.3.1 1.6.5 2.0.5 3.1.2 5.4.0 6.2.1 2.7.3 4.0.0 core binding-parent accumulo aerospike arangodb asynchbase azuretablestorage cassandra couchbase couchbase2 distribution dynamodb elasticsearch geode googlebigtable googledatastore hbase094 hbase098 hbase10 hypertable infinispan jdbc kudu memcached mongodb nosqldb orientdb rados redis rest riak s3 solr solr6 tarantool org.apache.maven.plugins maven-checkstyle-plugin 2.16 org.apache.maven.plugins maven-compiler-plugin 3.3 1.7 1.7 org.apache.maven.plugins maven-checkstyle-plugin validate validate check checkstyle.xml