(gcBeans.size());
+ for (final GarbageCollectorMXBean bean : gcBeans) {
+ if (!bean.isValid() || bean.getCollectionCount() < 0 ||
+ bean.getCollectionTime() < 0) {
+ continue;
+ }
+
+ final Long[] measurements = new Long[] {
+ bean.getCollectionCount(),
+ bean.getCollectionTime()
+ };
+ map.put(bean.getName().replace(" ", "_"), measurements);
+ }
+ return map;
+ }
}
diff --git a/core/src/main/java/com/yahoo/ycsb/Workload.java b/core/src/main/java/com/yahoo/ycsb/Workload.java
index 6b3e8ba7..7bebafbf 100644
--- a/core/src/main/java/com/yahoo/ycsb/Workload.java
+++ b/core/src/main/java/com/yahoo/ycsb/Workload.java
@@ -1,112 +1,113 @@
-/**
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you
- * may not use this file except in compliance with the License. You
- * may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License. See accompanying
- * LICENSE file.
+/**
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
*/
package com.yahoo.ycsb;
-import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Properties;
+
/**
* One experiment scenario. One object of this type will
* be instantiated and shared among all client threads. This class
* should be constructed using a no-argument constructor, so we can
* load it dynamically. Any argument-based initialization should be
* done by init().
*
- * If you extend this class, you should support the "insertstart" property. This
- * allows the load phase to proceed from multiple clients on different machines, in case
+ * If you extend this class, you should support the "insertstart" property. This
+ * allows the load phase to be split across multiple clients on different machines, in case
* the client is the bottleneck. For example, if we want to load 1 million records from
* 2 machines, the first machine should have insertstart=0 and the second insertstart=500000. Additionally,
* the "insertcount" property, which is interpreted by Client, can be used to tell each instance of the
* client how many inserts to do. In the example above, both clients should have insertcount=500000.
*/
-public abstract class Workload
-{
- public static final String INSERT_START_PROPERTY="insertstart";
-
- public static final String INSERT_START_PROPERTY_DEFAULT="0";
-
- private volatile AtomicBoolean stopRequested = new AtomicBoolean(false);
-
- /**
- * Initialize the scenario. Create any generators and other shared objects here.
- * Called once, in the main client thread, before any operations are started.
- */
- public void init(Properties p) throws WorkloadException
- {
- }
+public abstract class Workload {
+ public static final String INSERT_START_PROPERTY = "insertstart";
+ public static final String INSERT_COUNT_PROPERTY = "insertcount";
+
+ public static final String INSERT_START_PROPERTY_DEFAULT = "0";
+
+ private volatile AtomicBoolean stopRequested = new AtomicBoolean(false);
+
+ /**
+ * Initialize the scenario. Create any generators and other shared objects here.
+ * Called once, in the main client thread, before any operations are started.
+ */
+ public void init(Properties p) throws WorkloadException {
+ }
- /**
- * Initialize any state for a particular client thread. Since the scenario object
- * will be shared among all threads, this is the place to create any state that is specific
- * to one thread. To be clear, this means the returned object should be created anew on each
- * call to initThread(); do not return the same object multiple times.
- * The returned object will be passed to invocations of doInsert() and doTransaction()
- * for this thread. There should be no side effects from this call; all state should be encapsulated
- * in the returned object. If you have no state to retain for this thread, return null. (But if you have
- * no state to retain for this thread, probably you don't need to override initThread().)
- *
- * @return false if the workload knows it is done for this thread. Client will terminate the thread. Return true otherwise. Return true for workloads that rely on operationcount. For workloads that read traces from a file, return true when there are more to do, false when you are done.
- */
- public Object initThread(Properties p, int mythreadid, int threadcount) throws WorkloadException
- {
- return null;
- }
-
- /**
- * Cleanup the scenario. Called once, in the main client thread, after all operations have completed.
- */
- public void cleanup() throws WorkloadException
- {
- }
-
- /**
- * Do one insert operation. Because it will be called concurrently from multiple client threads, this
- * function must be thread safe. However, avoid synchronized, or the threads will block waiting for each
- * other, and it will be difficult to reach the target throughput. Ideally, this function would have no side
- * effects other than DB operations and mutations on threadstate. Mutations to threadstate do not need to be
- * synchronized, since each thread has its own threadstate instance.
- */
- public abstract boolean doInsert(DB db, Object threadstate);
+ /**
+ * Initialize any state for a particular client thread. Since the scenario object
+ * will be shared among all threads, this is the place to create any state that is specific
+ * to one thread. To be clear, this means the returned object should be created anew on each
+ * call to initThread(); do not return the same object multiple times.
+ * The returned object will be passed to invocations of doInsert() and doTransaction()
+ * for this thread. There should be no side effects from this call; all state should be encapsulated
+ * in the returned object. If you have no state to retain for this thread, return null. (But if you have
+ * no state to retain for this thread, probably you don't need to override initThread().)
+ *
+ * @return false if the workload knows it is done for this thread. Client will terminate the thread.
+ * Return true otherwise. Return true for workloads that rely on operationcount. For workloads that read
+ * traces from a file, return true when there are more to do, false when you are done.
+ */
+ public Object initThread(Properties p, int mythreadid, int threadcount) throws WorkloadException {
+ return null;
+ }
- /**
- * Do one transaction operation. Because it will be called concurrently from multiple client threads, this
- * function must be thread safe. However, avoid synchronized, or the threads will block waiting for each
- * other, and it will be difficult to reach the target throughput. Ideally, this function would have no side
- * effects other than DB operations and mutations on threadstate. Mutations to threadstate do not need to be
- * synchronized, since each thread has its own threadstate instance.
- *
- * @return false if the workload knows it is done for this thread. Client will terminate the thread. Return true otherwise. Return true for workloads that rely on operationcount. For workloads that read traces from a file, return true when there are more to do, false when you are done.
- */
- public abstract boolean doTransaction(DB db, Object threadstate);
-
- /**
- * Allows scheduling a request to stop the workload.
- */
- public void requestStop() {
- stopRequested.set(true);
- }
-
- /**
- * Check the status of the stop request flag.
- * @return true if stop was requested, false otherwise.
- */
- public boolean isStopRequested() {
- if (stopRequested.get() == true) return true;
- else return false;
- }
+ /**
+ * Cleanup the scenario. Called once, in the main client thread, after all operations have completed.
+ */
+ public void cleanup() throws WorkloadException {
+ }
+
+ /**
+ * Do one insert operation. Because it will be called concurrently from multiple client threads, this
+ * function must be thread safe. However, avoid synchronized, or the threads will block waiting for each
+ * other, and it will be difficult to reach the target throughput. Ideally, this function would have no side
+ * effects other than DB operations and mutations on threadstate. Mutations to threadstate do not need to be
+ * synchronized, since each thread has its own threadstate instance.
+ */
+ public abstract boolean doInsert(DB db, Object threadstate);
+
+ /**
+ * Do one transaction operation. Because it will be called concurrently from multiple client threads, this
+ * function must be thread safe. However, avoid synchronized, or the threads will block waiting for each
+ * other, and it will be difficult to reach the target throughput. Ideally, this function would have no side
+ * effects other than DB operations and mutations on threadstate. Mutations to threadstate do not need to be
+ * synchronized, since each thread has its own threadstate instance.
+ *
+ * @return false if the workload knows it is done for this thread. Client will terminate the thread.
+ * Return true otherwise. Return true for workloads that rely on operationcount. For workloads that read
+ * traces from a file, return true when there are more to do, false when you are done.
+ */
+ public abstract boolean doTransaction(DB db, Object threadstate);
+
+ /**
+ * Allows scheduling a request to stop the workload.
+ */
+ public void requestStop() {
+ stopRequested.set(true);
+ }
+
+ /**
+ * Check the status of the stop request flag.
+ * @return true if stop was requested, false otherwise.
+ */
+ public boolean isStopRequested() {
+ return stopRequested.get();
+ }
}
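
The insertstart/insertcount contract documented in the Workload javadoc above can be seen in a minimal sketch. Assuming the property constants added by this patch (the wrapper class itself is hypothetical, not part of the change), two load clients on different machines would split one million records like this:

    import java.util.Properties;

    import com.yahoo.ycsb.Workload;

    // Hypothetical illustration of splitting a 1M-record load across two
    // machines, per the Workload javadoc: each client gets half the key range.
    public final class ParallelLoadSketch {
      public static void main(String[] args) {
        // First loader: keys 0..499999.
        Properties firstLoader = new Properties();
        firstLoader.setProperty(Workload.INSERT_START_PROPERTY, "0");
        firstLoader.setProperty(Workload.INSERT_COUNT_PROPERTY, "500000");

        // Second loader: keys 500000..999999.
        Properties secondLoader = new Properties();
        secondLoader.setProperty(Workload.INSERT_START_PROPERTY, "500000");
        secondLoader.setProperty(Workload.INSERT_COUNT_PROPERTY, "500000");
      }
    }
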
diff --git a/core/src/main/java/com/yahoo/ycsb/generator/IncrementingPrintableStringGenerator.java b/core/src/main/java/com/yahoo/ycsb/generator/IncrementingPrintableStringGenerator.java
new file mode 100644
index 00000000..82406f02
--- /dev/null
+++ b/core/src/main/java/com/yahoo/ycsb/generator/IncrementingPrintableStringGenerator.java
@@ -0,0 +1,389 @@
+/**
+ * Copyright (c) 2016 YCSB contributors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+package com.yahoo.ycsb.generator;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * A generator that produces strings of {@link #length} using a set of code points
+ * from {@link #characterSet}. Each time {@link #nextValue()} is executed, the string
+ * is incremented by one character. Eventually the string may rollover to the beginning
+ * and the user may choose to have the generator throw a NoSuchElementException at that
+ * point or continue incrementing. (By default the generator will continue incrementing).
+ *
+ * For example, if we set a length of 2 characters and the character set includes
+ * [A, B] then the generator output will be:
+ *
+ * AA
+ * AB
+ * BA
+ * BB
+ * AA <-- rolled over
+ *
+ *
+ * This class includes some default character sets to choose from including ASCII
+ * and plane 0 UTF.
+ */
+public class IncrementingPrintableStringGenerator extends Generator<String> {
+
+ /** Default string length for the generator. */
+ public static final int DEFAULTSTRINGLENGTH = 8;
+
+ /**
+ * Set of all character types that include every symbol other than non-printable
+ * control characters.
+ */
+ public static final Set<Integer> CHAR_TYPES_ALL_BUT_CONTROL;
+ static {
+ CHAR_TYPES_ALL_BUT_CONTROL = new HashSet<Integer>(24);
+ // numbers
+ CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.DECIMAL_DIGIT_NUMBER);
+ CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.LETTER_NUMBER);
+ CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.OTHER_NUMBER);
+
+ // letters
+ CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.UPPERCASE_LETTER);
+ CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.LOWERCASE_LETTER);
+ CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.TITLECASE_LETTER);
+ CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.OTHER_LETTER);
+
+ // marks
+ CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.COMBINING_SPACING_MARK);
+ CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.NON_SPACING_MARK);
+ CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.ENCLOSING_MARK);
+
+ // punctuation
+ CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.CONNECTOR_PUNCTUATION);
+ CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.DASH_PUNCTUATION);
+ CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.START_PUNCTUATION);
+ CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.END_PUNCTUATION);
+ CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.INITIAL_QUOTE_PUNCTUATION);
+ CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.FINAL_QUOTE_PUNCTUATION);
+ CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.OTHER_PUNCTUATION);
+
+ // symbols
+ CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.MATH_SYMBOL);
+ CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.CURRENCY_SYMBOL);
+ CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.MODIFIER_SYMBOL);
+ CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.OTHER_SYMBOL);
+
+ // separators
+ CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.SPACE_SEPARATOR);
+ CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.LINE_SEPARATOR);
+ CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.PARAGRAPH_SEPARATOR);
+ }
+
+ /**
+ * Set of character types including only upper and lower case letters.
+ */
+ public static final Set<Integer> CHAR_TYPES_BASIC_ALPHA;
+ static {
+ CHAR_TYPES_BASIC_ALPHA = new HashSet<Integer>(2);
+ CHAR_TYPES_BASIC_ALPHA.add((int)Character.UPPERCASE_LETTER);
+ CHAR_TYPES_BASIC_ALPHA.add((int)Character.LOWERCASE_LETTER);
+ }
+
+ /**
+ * Set of character types including only decimals, upper and lower case letters.
+ */
+ public static final Set<Integer> CHAR_TYPES_BASIC_ALPHANUMERICS;
+ static {
+ CHAR_TYPES_BASIC_ALPHANUMERICS = new HashSet<Integer>(3);
+ CHAR_TYPES_BASIC_ALPHANUMERICS.add((int)Character.DECIMAL_DIGIT_NUMBER);
+ CHAR_TYPES_BASIC_ALPHANUMERICS.add((int)Character.UPPERCASE_LETTER);
+ CHAR_TYPES_BASIC_ALPHANUMERICS.add((int)Character.LOWERCASE_LETTER);
+ }
+
+ /**
+ * Set of character types including only decimals, letter numbers,
+ * other numbers, upper, lower, title case as well as letter modifiers
+ * and other letters.
+ */
+ public static final Set<Integer> CHAR_TYPE_EXTENDED_ALPHANUMERICS;
+ static {
+ CHAR_TYPE_EXTENDED_ALPHANUMERICS = new HashSet<Integer>(8);
+ CHAR_TYPE_EXTENDED_ALPHANUMERICS.add((int)Character.DECIMAL_DIGIT_NUMBER);
+ CHAR_TYPE_EXTENDED_ALPHANUMERICS.add((int)Character.LETTER_NUMBER);
+ CHAR_TYPE_EXTENDED_ALPHANUMERICS.add((int)Character.OTHER_NUMBER);
+ CHAR_TYPE_EXTENDED_ALPHANUMERICS.add((int)Character.UPPERCASE_LETTER);
+ CHAR_TYPE_EXTENDED_ALPHANUMERICS.add((int)Character.LOWERCASE_LETTER);
+ CHAR_TYPE_EXTENDED_ALPHANUMERICS.add((int)Character.TITLECASE_LETTER);
+ CHAR_TYPE_EXTENDED_ALPHANUMERICS.add((int)Character.MODIFIER_LETTER);
+ CHAR_TYPE_EXTENDED_ALPHANUMERICS.add((int)Character.OTHER_LETTER);
+ }
+
+ /** The character set to iterate over. */
+ private final int[] characterSet;
+
+ /** An array of indices, one per position in the output string. */
+ private int[] indices;
+
+ /** The length of the output string in characters. */
+ private final int length;
+
+ /** The last value returned by the generator. Should be null if {@link #nextValue()}
+ * has not been called.*/
+ private String lastValue;
+
+ /** Whether or not to throw an exception when the string rolls over. */
+ private boolean throwExceptionOnRollover;
+
+ /** Whether or not the generator has rolled over. */
+ private boolean hasRolledOver;
+
+ /**
+ * Generates strings of 8 characters using only the upper and lower case alphabetical
+ * characters from the ASCII set.
+ */
+ public IncrementingPrintableStringGenerator() {
+ this(DEFAULTSTRINGLENGTH, printableBasicAlphaASCIISet());
+ }
+
+ /**
+ * Generates strings of {@link #length} characters using only the upper and lower
+ * case alphabetical characters from the ASCII set.
+ * @param length The length of string to return from the generator.
+ * @throws IllegalArgumentException if the length is less than one.
+ */
+ public IncrementingPrintableStringGenerator(final int length) {
+ this(length, printableBasicAlphaASCIISet());
+ }
+
+ /**
+ * Generates strings of {@link #length} characters using the code points in
+ * {@link #characterSet}.
+ * @param length The length of string to return from the generator.
+ * @param characterSet A set of code points to choose from. Code points in the
+ * set can be in any order, not necessarily lexical.
+ * @throws IllegalArgumentException if the length is less than one or the character
+ * set is null or empty.
+ */
+ public IncrementingPrintableStringGenerator(final int length, final int[] characterSet) {
+ if (length < 1) {
+ throw new IllegalArgumentException("Length must be greater than or equal to 1");
+ }
+ if (characterSet == null || characterSet.length < 1) {
+ throw new IllegalArgumentException("Character set must have at least one character");
+ }
+ this.length = length;
+ this.characterSet = characterSet;
+ indices = new int[length];
+ }
+
+ @Override
+ public String nextValue() {
+ if (hasRolledOver && throwExceptionOnRollover) {
+ throw new NoSuchElementException("The generator has rolled over to the beginning");
+ }
+
+ final StringBuilder buffer = new StringBuilder(length);
+ for (int i = 0; i < length; i++) {
+ buffer.append(Character.toChars(characterSet[indices[i]]));
+ }
+
+ // increment the indices;
+ for (int i = length - 1; i >= 0; --i) {
+ if (indices[i] >= characterSet.length - 1) {
+ indices[i] = 0;
+ if (i == 0 || characterSet.length == 1 && lastValue != null) {
+ hasRolledOver = true;
+ }
+ } else {
+ ++indices[i];
+ break;
+ }
+ }
+
+ lastValue = buffer.toString();
+ return lastValue;
+ }
+
+ @Override
+ public String lastValue() {
+ return lastValue;
+ }
+
+ /** @param exceptionOnRollover Whether or not to throw an exception on rollover. */
+ public void setThrowExceptionOnRollover(final boolean exceptionOnRollover) {
+ this.throwExceptionOnRollover = exceptionOnRollover;
+ }
+
+ /** @return Whether or not to throw an exception on rollover. */
+ public boolean getThrowExceptionOnRollover() {
+ return throwExceptionOnRollover;
+ }
+
+ /**
+ * Returns an array of printable code points with only the upper and lower
+ * case alphabetical characters from the basic ASCII set.
+ * @return An array of code points
+ */
+ public static int[] printableBasicAlphaASCIISet() {
+ final List<Integer> validCharacters =
+ generatePrintableCharacterSet(0, 127, null, false, CHAR_TYPES_BASIC_ALPHA);
+ final int[] characterSet = new int[validCharacters.size()];
+ for (int i = 0; i < validCharacters.size(); i++) {
+ characterSet[i] = validCharacters.get(i);
+ }
+ return characterSet;
+ }
+
+ /**
+ * Returns an array of printable code points with the upper and lower case
+ * alphabetical characters as well as the numeric values from the basic
+ * ASCII set.
+ * @return An array of code points
+ */
+ public static int[] printableBasicAlphaNumericASCIISet() {
+ final List<Integer> validCharacters =
+ generatePrintableCharacterSet(0, 127, null, false, CHAR_TYPES_BASIC_ALPHANUMERICS);
+ final int[] characterSet = new int[validCharacters.size()];
+ for (int i = 0; i < validCharacters.size(); i++) {
+ characterSet[i] = validCharacters.get(i);
+ }
+ return characterSet;
+ }
+
+ /**
+ * Returns an array of printable code points with the entire basic ASCII table,
+ * including spaces. Excludes new lines.
+ * @return An array of code points
+ */
+ public static int[] fullPrintableBasicASCIISet() {
+ final List<Integer> validCharacters =
+ generatePrintableCharacterSet(32, 127, null, false, null);
+ final int[] characterSet = new int[validCharacters.size()];
+ for (int i = 0; i < validCharacters.size(); i++) {
+ characterSet[i] = validCharacters.get(i);
+ }
+ return characterSet;
+ }
+
+ /**
+ * Returns an array of printable code points with the entire basic ASCII table,
+ * including spaces and new lines.
+ * @return An array of code points
+ */
+ public static int[] fullPrintableBasicASCIISetWithNewlines() {
+ final List<Integer> validCharacters = new ArrayList<Integer>();
+ validCharacters.add(10); // newline
+ validCharacters.addAll(generatePrintableCharacterSet(32, 127, null, false, null));
+ final int[] characterSet = new int[validCharacters.size()];
+ for (int i = 0; i < validCharacters.size(); i++) {
+ characterSet[i] = validCharacters.get(i);
+ }
+ return characterSet;
+ }
+
+ /**
+ * Returns an array of printable code points in the first plane of Unicode characters
+ * including only the alpha-numeric values.
+ * @return An array of code points
+ */
+ public static int[] printableAlphaNumericPlaneZeroSet() {
+ final List<Integer> validCharacters =
+ generatePrintableCharacterSet(0, 65535, null, false, CHAR_TYPES_BASIC_ALPHANUMERICS);
+ final int[] characterSet = new int[validCharacters.size()];
+ for (int i = 0; i < validCharacters.size(); i++) {
+ characterSet[i] = validCharacters.get(i);
+ }
+ return characterSet;
+ }
+
+ /**
+ * Returns an array of printable code points in the first plane of Unicode characters
+ * including all printable characters.
+ * @return An array of code points
+ */
+ public static int[] fullPrintablePlaneZeroSet() {
+ final List<Integer> validCharacters =
+ generatePrintableCharacterSet(0, 65535, null, false, CHAR_TYPES_ALL_BUT_CONTROL);
+ final int[] characterSet = new int[validCharacters.size()];
+ for (int i = 0; i < validCharacters.size(); i++) {
+ characterSet[i] = validCharacters.get(i);
+ }
+ return characterSet;
+ }
+
+ /**
+ * Generates a list of code points based on a range and filters.
+ * These can be used for generating strings with various ASCII and/or
+ * Unicode printable character sets for use with DBs that may have
+ * character limitations.
+ *
+ * Note that control, surrogate, format, private use and unassigned
+ * code points are skipped.
+ * @param startCodePoint The starting code point, inclusive.
+ * @param lastCodePoint The final code point, inclusive.
+ * @param characterTypesFilter An optional set of code points for inclusion or
+ * exclusion.
+ * @param isFilterAllowableList Determines whether the {@code characterTypesFilter}
+ * set is inclusive or exclusive. When true, only those code points that
+ * appear in the set will be included in the resulting list. Otherwise
+ * matching code points are excluded.
+ * @param allowableTypes An optional set of allowable character
+ * types. See {@link Character} for types.
+ * @return A list of code points matching the given range and filters. The
+ * list may be empty but is guaranteed not to be null.
+ */
+ public static List<Integer> generatePrintableCharacterSet(
+ final int startCodePoint,
+ final int lastCodePoint,
+ final Set<Integer> characterTypesFilter,
+ final boolean isFilterAllowableList,
+ final Set<Integer> allowableTypes) {
+
+ // since we don't know the final size of the allowable character list we
+ // start with a list then we'll flatten it to an array.
+ final List<Integer> validCharacters = new ArrayList<Integer>(lastCodePoint);
+
+ for (int codePoint = startCodePoint; codePoint <= lastCodePoint; ++codePoint) {
+ if (allowableTypes != null &&
+ !allowableTypes.contains(Character.getType(codePoint))) {
+ continue;
+ } else {
+ // skip control points, formats, surrogates, etc
+ final int type = Character.getType(codePoint);
+ if (type == Character.CONTROL ||
+ type == Character.SURROGATE ||
+ type == Character.FORMAT ||
+ type == Character.PRIVATE_USE ||
+ type == Character.UNASSIGNED) {
+ continue;
+ }
+ }
+
+ if (characterTypesFilter != null) {
+ // if the filter is enabled then we need to make sure the code point
+ // is in the allowable list if it's a whitelist or that the code point
+ // is NOT in the list if it's a blacklist.
+ if ((isFilterAllowableList && !characterTypesFilter.contains(codePoint)) ||
+ (!isFilterAllowableList && characterTypesFilter.contains(codePoint))) {
+ continue;
+ }
+ }
+
+ validCharacters.add(codePoint);
+ }
+ return validCharacters;
+ }
+
+}
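
A minimal usage sketch for the new generator, mirroring the two-character AA, AB, BA, BB example from the class javadoc (the wrapper class is hypothetical, not part of the patch):

    import java.util.NoSuchElementException;

    import com.yahoo.ycsb.generator.IncrementingPrintableStringGenerator;

    // Two-character strings over the code points for 'A' and 'B'. With
    // setThrowExceptionOnRollover(true) the fifth call throws instead of
    // silently wrapping back to "AA".
    public final class RolloverSketch {
      public static void main(String[] args) {
        IncrementingPrintableStringGenerator gen =
            new IncrementingPrintableStringGenerator(2, new int[] {'A', 'B'});
        gen.setThrowExceptionOnRollover(true);
        try {
          while (true) {
            System.out.println(gen.nextValue()); // AA, AB, BA, BB
          }
        } catch (NoSuchElementException e) {
          System.out.println("rolled over after " + gen.lastValue()); // BB
        }
      }
    }
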
diff --git a/core/src/main/java/com/yahoo/ycsb/generator/SequentialGenerator.java b/core/src/main/java/com/yahoo/ycsb/generator/SequentialGenerator.java
new file mode 100644
index 00000000..4d53385e
--- /dev/null
+++ b/core/src/main/java/com/yahoo/ycsb/generator/SequentialGenerator.java
@@ -0,0 +1,65 @@
+/**
+ * Copyright (c) 2016 YCSB Contributors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package com.yahoo.ycsb.generator;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Generates a sequence of integers: countstart, countstart + 1, ..., countend, wrapping around.
+ */
+public class SequentialGenerator extends NumberGenerator {
+ final AtomicInteger counter;
+ int _interval, _countstart;
+
+ /**
+ * Create a counter that starts at countstart.
+ */
+ public SequentialGenerator(int countstart, int countend) {
+ counter = new AtomicInteger();
+ setLastValue(counter.get());
+ _countstart = countstart;
+ _interval = countend - countstart + 1;
+ }
+
+ /**
+ * Returns the next value in the sequence as an int, wrapping back to
+ * countstart once countend has been passed.
+ */
+ public int nextInt() {
+ int ret = _countstart + counter.getAndIncrement() % _interval;
+ setLastValue(ret);
+ return ret;
+ }
+
+ @Override
+ public Number nextValue() {
+ int ret = _countstart + counter.getAndIncrement() % _interval;
+ setLastValue(ret);
+ return ret;
+ }
+
+ @Override
+ public Number lastValue() {
+ return counter.get() + 1;
+ }
+
+ @Override
+ public double mean() {
+ throw new UnsupportedOperationException("Can't compute mean of non-stationary distribution!");
+ }
+}
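
A quick sketch of the wrap-around behavior (wrapper class hypothetical): nextValue() walks the closed range [countstart, countend], then starts over at countstart:

    import com.yahoo.ycsb.generator.SequentialGenerator;

    // A three-key range: _interval is countend - countstart + 1 = 3.
    public final class SequentialSketch {
      public static void main(String[] args) {
        SequentialGenerator gen = new SequentialGenerator(10, 12);
        for (int i = 0; i < 5; i++) {
          System.out.print(gen.nextValue() + " "); // prints: 10 11 12 10 11
        }
      }
    }
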
diff --git a/core/src/main/java/com/yahoo/ycsb/measurements/Measurements.java b/core/src/main/java/com/yahoo/ycsb/measurements/Measurements.java
index fe1e9cc1..26d340cc 100644
--- a/core/src/main/java/com/yahoo/ycsb/measurements/Measurements.java
+++ b/core/src/main/java/com/yahoo/ycsb/measurements/Measurements.java
@@ -1,311 +1,314 @@
/**
* Copyright (c) 2010 Yahoo! Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.measurements;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.measurements.exporter.MeasurementsExporter;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
/**
* Collects latency measurements, and reports them when requested.
*
* @author cooperb
*
*/
public class Measurements {
/**
* All supported measurement types are defined in this enum.
*
*/
public enum MeasurementType {
HISTOGRAM,
HDRHISTOGRAM,
HDRHISTOGRAM_AND_HISTOGRAM,
HDRHISTOGRAM_AND_RAW,
TIMESERIES,
RAW
}
public static final String MEASUREMENT_TYPE_PROPERTY = "measurementtype";
private static final String MEASUREMENT_TYPE_PROPERTY_DEFAULT = "hdrhistogram";
public static final String MEASUREMENT_INTERVAL = "measurement.interval";
private static final String MEASUREMENT_INTERVAL_DEFAULT = "op";
+
+ public static final String MEASUREMENT_TRACK_JVM_PROPERTY = "measurement.trackjvm";
+ public static final String MEASUREMENT_TRACK_JVM_PROPERTY_DEFAULT = "false";
static Measurements singleton=null;
static Properties measurementproperties=null;
public static void setProperties(Properties props)
{
measurementproperties=props;
}
/**
* Return the singleton Measurements object.
*/
public synchronized static Measurements getMeasurements()
{
if (singleton==null)
{
singleton=new Measurements(measurementproperties);
}
return singleton;
}
final ConcurrentHashMap<String, OneMeasurement> _opToMesurementMap;
final ConcurrentHashMap<String, OneMeasurement> _opToIntendedMesurementMap;
final MeasurementType _measurementType;
final int _measurementInterval;
private Properties _props;
/**
* Create a new object with the specified properties.
*/
public Measurements(Properties props)
{
_opToMesurementMap=new ConcurrentHashMap<String, OneMeasurement>();
_opToIntendedMesurementMap=new ConcurrentHashMap<String, OneMeasurement>();
_props=props;
String mTypeString = _props.getProperty(MEASUREMENT_TYPE_PROPERTY, MEASUREMENT_TYPE_PROPERTY_DEFAULT);
if (mTypeString.equals("histogram"))
{
_measurementType = MeasurementType.HISTOGRAM;
}
else if (mTypeString.equals("hdrhistogram"))
{
_measurementType = MeasurementType.HDRHISTOGRAM;
}
else if (mTypeString.equals("hdrhistogram+histogram"))
{
_measurementType = MeasurementType.HDRHISTOGRAM_AND_HISTOGRAM;
}
else if (mTypeString.equals("hdrhistogram+raw"))
{
_measurementType = MeasurementType.HDRHISTOGRAM_AND_RAW;
}
else if (mTypeString.equals("timeseries"))
{
_measurementType = MeasurementType.TIMESERIES;
}
else if (mTypeString.equals("raw"))
{
_measurementType = MeasurementType.RAW;
}
else {
throw new IllegalArgumentException("unknown "+MEASUREMENT_TYPE_PROPERTY+"="+mTypeString);
}
String mIntervalString = _props.getProperty(MEASUREMENT_INTERVAL, MEASUREMENT_INTERVAL_DEFAULT);
if (mIntervalString.equals("op"))
{
_measurementInterval = 0;
}
else if (mIntervalString.equals("intended"))
{
_measurementInterval = 1;
}
else if (mIntervalString.equals("both"))
{
_measurementInterval = 2;
}
else {
throw new IllegalArgumentException("unknown "+MEASUREMENT_INTERVAL+"="+mIntervalString);
}
}
OneMeasurement constructOneMeasurement(String name)
{
switch (_measurementType)
{
case HISTOGRAM:
return new OneMeasurementHistogram(name, _props);
case HDRHISTOGRAM:
return new OneMeasurementHdrHistogram(name, _props);
case HDRHISTOGRAM_AND_HISTOGRAM:
return new TwoInOneMeasurement(name,
new OneMeasurementHdrHistogram("Hdr"+name, _props),
new OneMeasurementHistogram("Bucket"+name, _props));
case HDRHISTOGRAM_AND_RAW:
return new TwoInOneMeasurement(name,
new OneMeasurementHdrHistogram("Hdr"+name, _props),
new OneMeasurementHistogram("Raw"+name, _props));
case TIMESERIES:
return new OneMeasurementTimeSeries(name, _props);
case RAW:
return new OneMeasurementRaw(name, _props);
default:
throw new AssertionError("Impossible to be here. Dead code reached. Bugs?");
}
}
static class StartTimeHolder {
long time;
long startTime(){
if(time == 0) {
return System.nanoTime();
}
else {
return time;
}
}
}
ThreadLocal<StartTimeHolder> tlIntendedStartTime = new ThreadLocal<StartTimeHolder>() {
protected StartTimeHolder initialValue() {
return new StartTimeHolder();
}
};
public void setIntendedStartTimeNs(long time) {
if(_measurementInterval==0)
return;
tlIntendedStartTime.get().time=time;
}
public long getIntendedtartTimeNs() {
if(_measurementInterval==0)
return 0L;
return tlIntendedStartTime.get().startTime();
}
/**
* Report a single value of a single metric. E.g. for read latency, operation="READ" and latency is the measured
* value.
*/
public void measure(String operation, int latency)
{
if(_measurementInterval==1)
return;
try
{
OneMeasurement m = getOpMeasurement(operation);
m.measure(latency);
}
// This seems like a terribly hacky way to cover up for a bug in the measurement code
catch (java.lang.ArrayIndexOutOfBoundsException e)
{
System.out.println("ERROR: java.lang.ArrayIndexOutOfBoundsException - ignoring and continuing");
e.printStackTrace();
e.printStackTrace(System.out);
}
}
/**
* Report a single value of a single metric. E.g. for read latency, operation="READ" and latency is the measured
* value.
*/
public void measureIntended(String operation, int latency)
{
if(_measurementInterval==0)
return;
try
{
OneMeasurement m = getOpIntendedMeasurement(operation);
m.measure(latency);
}
// This seems like a terribly hacky way to cover up for a bug in the measurement code
catch (java.lang.ArrayIndexOutOfBoundsException e)
{
System.out.println("ERROR: java.lang.ArrayIndexOutOfBoundsException - ignoring and continuing");
e.printStackTrace();
e.printStackTrace(System.out);
}
}
private OneMeasurement getOpMeasurement(String operation) {
OneMeasurement m = _opToMesurementMap.get(operation);
if(m == null)
{
m = constructOneMeasurement(operation);
OneMeasurement oldM = _opToMesurementMap.putIfAbsent(operation, m);
if(oldM != null)
{
m = oldM;
}
}
return m;
}
private OneMeasurement getOpIntendedMeasurement(String operation) {
OneMeasurement m = _opToIntendedMesurementMap.get(operation);
if(m == null)
{
final String name = _measurementInterval==1 ? operation : "Intended-" + operation;
m = constructOneMeasurement(name);
OneMeasurement oldM = _opToIntendedMesurementMap.putIfAbsent(operation, m);
if(oldM != null)
{
m = oldM;
}
}
return m;
}
/**
* Report a return code for a single DB operation.
*/
public void reportStatus(final String operation, final Status status)
{
OneMeasurement m = _measurementInterval==1 ?
getOpIntendedMeasurement(operation) :
getOpMeasurement(operation);
m.reportStatus(status);
}
/**
* Export the current measurements to a suitable format.
*
* @param exporter Exporter representing the type of format to write to.
* @throws IOException Thrown if the export failed.
*/
public void exportMeasurements(MeasurementsExporter exporter) throws IOException
{
for (OneMeasurement measurement : _opToMesurementMap.values())
{
measurement.exportMeasurements(exporter);
}
for (OneMeasurement measurement : _opToIntendedMesurementMap.values())
{
measurement.exportMeasurements(exporter);
}
}
/**
* Return a one line summary of the measurements.
*/
public synchronized String getSummary()
{
String ret="";
for (OneMeasurement m : _opToMesurementMap.values())
{
ret += m.getSummary()+" ";
}
for (OneMeasurement m : _opToIntendedMesurementMap.values())
{
ret += m.getSummary()+" ";
}
return ret;
}
}
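
A sketch of switching on the new measurement.trackjvm property (wrapper class hypothetical; the flag is consumed by the GC measurement helper at the top of this patch). Properties must be registered before the first getMeasurements() call, since the singleton captures them at construction:

    import java.util.Properties;

    import com.yahoo.ycsb.measurements.Measurements;

    public final class TrackJvmSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(Measurements.MEASUREMENT_TYPE_PROPERTY, "hdrhistogram");
        props.setProperty(Measurements.MEASUREMENT_TRACK_JVM_PROPERTY, "true");
        Measurements.setProperties(props); // must happen before getMeasurements()

        Measurements m = Measurements.getMeasurements();
        m.measure("READ", 42); // one READ latency sample, in microseconds
      }
    }
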
diff --git a/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java b/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java
index ad127d8b..b9ff7e73 100644
--- a/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java
+++ b/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java
@@ -1,804 +1,863 @@
/**
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * Copyright (c) 2010 Yahoo! Inc., Copyright (c) 2016 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.workloads;
import java.util.Properties;
import com.yahoo.ycsb.*;
import com.yahoo.ycsb.generator.AcknowledgedCounterGenerator;
+import com.yahoo.ycsb.generator.ConstantIntegerGenerator;
import com.yahoo.ycsb.generator.CounterGenerator;
import com.yahoo.ycsb.generator.DiscreteGenerator;
import com.yahoo.ycsb.generator.ExponentialGenerator;
-import com.yahoo.ycsb.generator.ConstantIntegerGenerator;
-import com.yahoo.ycsb.generator.HotspotIntegerGenerator;
import com.yahoo.ycsb.generator.HistogramGenerator;
+import com.yahoo.ycsb.generator.HotspotIntegerGenerator;
import com.yahoo.ycsb.generator.NumberGenerator;
import com.yahoo.ycsb.generator.ScrambledZipfianGenerator;
+import com.yahoo.ycsb.generator.SequentialGenerator;
import com.yahoo.ycsb.generator.SkewedLatestGenerator;
import com.yahoo.ycsb.generator.UniformIntegerGenerator;
import com.yahoo.ycsb.generator.ZipfianGenerator;
import com.yahoo.ycsb.measurements.Measurements;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Vector;
import java.util.List;
import java.util.Map;
-import java.util.ArrayList;
+import java.util.Vector;
+
/**
* The core benchmark scenario. Represents a set of clients doing simple CRUD operations. The
* relative proportion of different kinds of operations, and other properties of the workload,
* are controlled by parameters specified at runtime.
*
* Properties to control the client:
*
* fieldcount : the number of fields in a record (default: 10)
* fieldlength : the size of each field (default: 100)
* readallfields : should reads read all fields (true) or just one (false) (default: true)
* writeallfields : should updates and read/modify/writes update all fields (true) or just
* one (false) (default: false)
* readproportion : what proportion of operations should be reads (default: 0.95)
* updateproportion : what proportion of operations should be updates (default: 0.05)
* insertproportion : what proportion of operations should be inserts (default: 0)
* scanproportion : what proportion of operations should be scans (default: 0)
* readmodifywriteproportion : what proportion of operations should be read a record,
* modify it, write it back (default: 0)
* requestdistribution : what distribution should be used to select the records to operate
- * on - uniform, zipfian, hotspot, or latest (default: uniform)
+ * on - uniform, zipfian, hotspot, sequential, exponential or latest (default: uniform)
* maxscanlength : for scans, what is the maximum number of records to scan (default: 1000)
* scanlengthdistribution : for scans, what distribution should be used to choose the
* number of records to scan, for each scan, between 1 and maxscanlength (default: uniform)
+ * insertstart : for parallel loads and runs, defines the starting record for this
+ * YCSB instance (default: 0)
+ * insertcount : for parallel loads and runs, defines the number of records for this
+ * YCSB instance (default: recordcount)
+ * zeropadding : for generating a record sequence compatible with string sort order by
+ * 0 padding the record number. Controls the number of 0s to use for padding. (default: 1)
+ * For example, for row 5, with zeropadding=1 you get the key 'user5' and with zeropadding=8 you get
+ * 'user00000005'. To have an effect, zeropadding needs to be bigger than the number of
+ * digits in the record number.
* insertorder : should records be inserted in order by key ("ordered"), or in hashed
* order ("hashed") (default: hashed)
*
*/
public class CoreWorkload extends Workload {
/**
* The name of the database table to run queries against.
*/
public static final String TABLENAME_PROPERTY = "table";
/**
* The default name of the database table to run queries against.
*/
public static final String TABLENAME_PROPERTY_DEFAULT = "usertable";
public static String table;
/**
* The name of the property for the number of fields in a record.
*/
public static final String FIELD_COUNT_PROPERTY = "fieldcount";
/**
* Default number of fields in a record.
*/
public static final String FIELD_COUNT_PROPERTY_DEFAULT = "10";
int fieldcount;
private List<String> fieldnames;
/**
* The name of the property for the field length distribution. Options are "uniform", "zipfian"
- * (favoring short records), "constant", and "histogram".
+ * (favoring short records), "constant", and "histogram".
*
* If "uniform", "zipfian" or "constant", the maximum field length will be that specified by the
- * fieldlength property. If "histogram", then the
- * histogram will be read from the filename specified in the "fieldlengthhistogram" property.
+ * fieldlength property. If "histogram", then the histogram will be read from the filename
+ * specified in the "fieldlengthhistogram" property.
*/
public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY = "fieldlengthdistribution";
/**
* The default field length distribution.
*/
public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT = "constant";
/**
* The name of the property for the length of a field in bytes.
*/
public static final String FIELD_LENGTH_PROPERTY = "fieldlength";
/**
* The default maximum length of a field in bytes.
*/
public static final String FIELD_LENGTH_PROPERTY_DEFAULT = "100";
/**
* The name of a property that specifies the filename containing the field length histogram (only
* used if fieldlengthdistribution is "histogram").
*/
public static final String FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY = "fieldlengthhistogram";
/**
* The default filename containing a field length histogram.
*/
public static final String FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY_DEFAULT = "hist.txt";
/**
* Generator object that produces field lengths. The value of this depends on the properties that
* start with "FIELD_LENGTH_".
*/
NumberGenerator fieldlengthgenerator;
/**
* The name of the property for deciding whether to read one field (false) or all fields (true) of
* a record.
*/
public static final String READ_ALL_FIELDS_PROPERTY = "readallfields";
/**
* The default value for the readallfields property.
*/
public static final String READ_ALL_FIELDS_PROPERTY_DEFAULT = "true";
boolean readallfields;
/**
* The name of the property for deciding whether to write one field (false) or all fields (true)
* of a record.
*/
public static final String WRITE_ALL_FIELDS_PROPERTY = "writeallfields";
/**
* The default value for the writeallfields property.
*/
public static final String WRITE_ALL_FIELDS_PROPERTY_DEFAULT = "false";
boolean writeallfields;
/**
* The name of the property for deciding whether to check all returned
* data against the formation template to ensure data integrity.
*/
public static final String DATA_INTEGRITY_PROPERTY = "dataintegrity";
/**
* The default value for the dataintegrity property.
*/
public static final String DATA_INTEGRITY_PROPERTY_DEFAULT = "false";
/**
* Set to true if want to check correctness of reads. Must also
* be set to true during loading phase to function.
*/
private boolean dataintegrity;
/**
* The name of the property for the proportion of transactions that are reads.
*/
public static final String READ_PROPORTION_PROPERTY = "readproportion";
/**
* The default proportion of transactions that are reads.
*/
public static final String READ_PROPORTION_PROPERTY_DEFAULT = "0.95";
/**
* The name of the property for the proportion of transactions that are updates.
*/
public static final String UPDATE_PROPORTION_PROPERTY = "updateproportion";
/**
* The default proportion of transactions that are updates.
*/
public static final String UPDATE_PROPORTION_PROPERTY_DEFAULT = "0.05";
/**
* The name of the property for the proportion of transactions that are inserts.
*/
public static final String INSERT_PROPORTION_PROPERTY = "insertproportion";
/**
* The default proportion of transactions that are inserts.
*/
public static final String INSERT_PROPORTION_PROPERTY_DEFAULT = "0.0";
/**
* The name of the property for the proportion of transactions that are scans.
*/
public static final String SCAN_PROPORTION_PROPERTY = "scanproportion";
/**
* The default proportion of transactions that are scans.
*/
public static final String SCAN_PROPORTION_PROPERTY_DEFAULT = "0.0";
/**
* The name of the property for the proportion of transactions that are read-modify-write.
*/
public static final String READMODIFYWRITE_PROPORTION_PROPERTY = "readmodifywriteproportion";
/**
* The default proportion of transactions that are read-modify-write.
*/
public static final String READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT = "0.0";
/**
* The name of the property for the distribution of requests across the keyspace. Options are
* "uniform", "zipfian", "hotspot", "sequential", "exponential" and "latest".
*/
public static final String REQUEST_DISTRIBUTION_PROPERTY = "requestdistribution";
/**
- * The default distribution of requests across the keyspace
+ * The default distribution of requests across the keyspace.
*/
public static final String REQUEST_DISTRIBUTION_PROPERTY_DEFAULT = "uniform";
+ /**
+ * The name of the property for adding zero padding to record numbers in order to match
+ * string sort order. Controls the number of 0s to left pad with.
+ */
+ public static final String ZERO_PADDING_PROPERTY = "zeropadding";
+
/**
- * The name of the property for the max scan length (number of records)
+ * The default zero padding value. Matches integer sort order.
+ */
+ public static final String ZERO_PADDING_PROPERTY_DEFAULT = "1";
+
+
+ /**
+ * The name of the property for the max scan length (number of records).
*/
public static final String MAX_SCAN_LENGTH_PROPERTY = "maxscanlength";
/**
* The default max scan length.
*/
public static final String MAX_SCAN_LENGTH_PROPERTY_DEFAULT = "1000";
/**
* The name of the property for the scan length distribution. Options are "uniform" and "zipfian"
* (favoring short scans)
*/
public static final String SCAN_LENGTH_DISTRIBUTION_PROPERTY = "scanlengthdistribution";
/**
* The default scan length distribution.
*/
public static final String SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT = "uniform";
/**
* The name of the property for the order to insert records. Options are "ordered" or "hashed"
*/
public static final String INSERT_ORDER_PROPERTY = "insertorder";
/**
* Default insert order.
*/
public static final String INSERT_ORDER_PROPERTY_DEFAULT = "hashed";
/**
* Percentage data items that constitute the hot set.
*/
public static final String HOTSPOT_DATA_FRACTION = "hotspotdatafraction";
/**
* Default value of the size of the hot set.
*/
public static final String HOTSPOT_DATA_FRACTION_DEFAULT = "0.2";
/**
* Percentage operations that access the hot set.
*/
public static final String HOTSPOT_OPN_FRACTION = "hotspotopnfraction";
/**
* Default value of the percentage operations accessing the hot set.
*/
public static final String HOTSPOT_OPN_FRACTION_DEFAULT = "0.8";
/**
* How many times to retry when insertion of a single item to a DB fails.
*/
public static final String INSERTION_RETRY_LIMIT = "core_workload_insertion_retry_limit";
public static final String INSERTION_RETRY_LIMIT_DEFAULT = "0";
/**
* On average, how long to wait between the retries, in seconds.
*/
public static final String INSERTION_RETRY_INTERVAL = "core_workload_insertion_retry_interval";
public static final String INSERTION_RETRY_INTERVAL_DEFAULT = "3";
NumberGenerator keysequence;
DiscreteGenerator operationchooser;
NumberGenerator keychooser;
NumberGenerator fieldchooser;
AcknowledgedCounterGenerator transactioninsertkeysequence;
NumberGenerator scanlength;
boolean orderedinserts;
int recordcount;
+ int zeropadding;
int insertionRetryLimit;
int insertionRetryInterval;
private Measurements _measurements = Measurements.getMeasurements();
protected static NumberGenerator getFieldLengthGenerator(Properties p) throws WorkloadException {
NumberGenerator fieldlengthgenerator;
String fieldlengthdistribution = p.getProperty(
FIELD_LENGTH_DISTRIBUTION_PROPERTY, FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT);
int fieldlength =
Integer.parseInt(p.getProperty(FIELD_LENGTH_PROPERTY, FIELD_LENGTH_PROPERTY_DEFAULT));
String fieldlengthhistogram = p.getProperty(
FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY, FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY_DEFAULT);
if (fieldlengthdistribution.compareTo("constant") == 0) {
fieldlengthgenerator = new ConstantIntegerGenerator(fieldlength);
} else if (fieldlengthdistribution.compareTo("uniform") == 0) {
fieldlengthgenerator = new UniformIntegerGenerator(1, fieldlength);
} else if (fieldlengthdistribution.compareTo("zipfian") == 0) {
fieldlengthgenerator = new ZipfianGenerator(1, fieldlength);
} else if (fieldlengthdistribution.compareTo("histogram") == 0) {
try {
fieldlengthgenerator = new HistogramGenerator(fieldlengthhistogram);
} catch (IOException e) {
throw new WorkloadException(
"Couldn't read field length histogram file: " + fieldlengthhistogram, e);
}
} else {
throw new WorkloadException(
"Unknown field length distribution \"" + fieldlengthdistribution + "\"");
}
return fieldlengthgenerator;
}
/**
* Initialize the scenario.
* Called once, in the main client thread, before any operations are started.
*/
@Override
public void init(Properties p) throws WorkloadException {
table = p.getProperty(TABLENAME_PROPERTY, TABLENAME_PROPERTY_DEFAULT);
fieldcount =
Integer.parseInt(p.getProperty(FIELD_COUNT_PROPERTY, FIELD_COUNT_PROPERTY_DEFAULT));
fieldnames = new ArrayList<String>();
for (int i = 0; i < fieldcount; i++) {
fieldnames.add("field" + i);
}
fieldlengthgenerator = CoreWorkload.getFieldLengthGenerator(p);
-
- double readproportion = Double.parseDouble(
- p.getProperty(READ_PROPORTION_PROPERTY, READ_PROPORTION_PROPERTY_DEFAULT));
- double updateproportion = Double.parseDouble(
- p.getProperty(UPDATE_PROPORTION_PROPERTY, UPDATE_PROPORTION_PROPERTY_DEFAULT));
- double insertproportion = Double.parseDouble(
- p.getProperty(INSERT_PROPORTION_PROPERTY, INSERT_PROPORTION_PROPERTY_DEFAULT));
- double scanproportion = Double.parseDouble(
- p.getProperty(SCAN_PROPORTION_PROPERTY, SCAN_PROPORTION_PROPERTY_DEFAULT));
- double readmodifywriteproportion = Double.parseDouble(p.getProperty(
- READMODIFYWRITE_PROPORTION_PROPERTY, READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT));
+
recordcount =
Integer.parseInt(p.getProperty(Client.RECORD_COUNT_PROPERTY, Client.DEFAULT_RECORD_COUNT));
- if (recordcount == 0)
+ if (recordcount == 0) {
recordcount = Integer.MAX_VALUE;
+ }
String requestdistrib =
p.getProperty(REQUEST_DISTRIBUTION_PROPERTY, REQUEST_DISTRIBUTION_PROPERTY_DEFAULT);
int maxscanlength =
Integer.parseInt(p.getProperty(MAX_SCAN_LENGTH_PROPERTY, MAX_SCAN_LENGTH_PROPERTY_DEFAULT));
String scanlengthdistrib =
p.getProperty(SCAN_LENGTH_DISTRIBUTION_PROPERTY, SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT);
int insertstart =
Integer.parseInt(p.getProperty(INSERT_START_PROPERTY, INSERT_START_PROPERTY_DEFAULT));
+ int insertcount =
+ Integer.parseInt(p.getProperty(INSERT_COUNT_PROPERTY, String.valueOf(recordcount - insertstart)));
+ // Confirm valid values for insertstart and insertcount in relation to recordcount
+ if (recordcount < (insertstart + insertcount)) {
+ System.err.println("Invalid combination of insertstart, insertcount and recordcount.");
+ System.err.println("recordcount must be bigger than insertstart + insertcount.");
+ System.exit(-1);
+ }
+ zeropadding =
+ Integer.parseInt(p.getProperty(ZERO_PADDING_PROPERTY, ZERO_PADDING_PROPERTY_DEFAULT));
readallfields = Boolean.parseBoolean(
p.getProperty(READ_ALL_FIELDS_PROPERTY, READ_ALL_FIELDS_PROPERTY_DEFAULT));
writeallfields = Boolean.parseBoolean(
p.getProperty(WRITE_ALL_FIELDS_PROPERTY, WRITE_ALL_FIELDS_PROPERTY_DEFAULT));
dataintegrity = Boolean.parseBoolean(
p.getProperty(DATA_INTEGRITY_PROPERTY, DATA_INTEGRITY_PROPERTY_DEFAULT));
// Confirm that fieldlengthgenerator returns a constant if data
// integrity check requested.
- if (dataintegrity
- && !(p.getProperty(
- FIELD_LENGTH_DISTRIBUTION_PROPERTY,
- FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT)).equals("constant")) {
+ if (dataintegrity && !(p.getProperty(
+ FIELD_LENGTH_DISTRIBUTION_PROPERTY,
+ FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT)).equals("constant")) {
System.err.println("Must have constant field size to check data integrity.");
System.exit(-1);
}
- if (p.getProperty(INSERT_ORDER_PROPERTY, INSERT_ORDER_PROPERTY_DEFAULT).compareTo("hashed")
- == 0) {
+ if (p.getProperty(INSERT_ORDER_PROPERTY, INSERT_ORDER_PROPERTY_DEFAULT).compareTo("hashed") == 0) {
orderedinserts = false;
} else if (requestdistrib.compareTo("exponential") == 0) {
double percentile = Double.parseDouble(p.getProperty(
ExponentialGenerator.EXPONENTIAL_PERCENTILE_PROPERTY,
ExponentialGenerator.EXPONENTIAL_PERCENTILE_DEFAULT));
double frac = Double.parseDouble(p.getProperty(
ExponentialGenerator.EXPONENTIAL_FRAC_PROPERTY,
ExponentialGenerator.EXPONENTIAL_FRAC_DEFAULT));
keychooser = new ExponentialGenerator(percentile, recordcount * frac);
} else {
orderedinserts = true;
}
keysequence = new CounterGenerator(insertstart);
- operationchooser = new DiscreteGenerator();
- if (readproportion > 0) {
- operationchooser.addValue(readproportion, "READ");
- }
-
- if (updateproportion > 0) {
- operationchooser.addValue(updateproportion, "UPDATE");
- }
-
- if (insertproportion > 0) {
- operationchooser.addValue(insertproportion, "INSERT");
- }
-
- if (scanproportion > 0) {
- operationchooser.addValue(scanproportion, "SCAN");
- }
-
- if (readmodifywriteproportion > 0) {
- operationchooser.addValue(readmodifywriteproportion, "READMODIFYWRITE");
- }
+ operationchooser = createOperationGenerator(p);
transactioninsertkeysequence = new AcknowledgedCounterGenerator(recordcount);
if (requestdistrib.compareTo("uniform") == 0) {
- keychooser = new UniformIntegerGenerator(0, recordcount - 1);
- } else if (requestdistrib.compareTo("zipfian") == 0) {
+ keychooser = new UniformIntegerGenerator(insertstart, insertstart + insertcount - 1);
+ } else if (requestdistrib.compareTo("sequential") == 0) {
+ keychooser = new SequentialGenerator(insertstart, insertstart + insertcount - 1);
+ }else if (requestdistrib.compareTo("zipfian") == 0) {
// it does this by generating a random "next key" in part by taking the modulus over the
// number of keys.
// If the number of keys changes, this would shift the modulus, and we don't want that to
// change which keys are popular so we'll actually construct the scrambled zipfian generator
// with a keyspace that is larger than exists at the beginning of the test. that is, we'll predict
// the number of inserts, and tell the scrambled zipfian generator the number of existing keys
// plus the number of predicted keys as the total keyspace. then, if the generator picks a key
// that hasn't been inserted yet, will just ignore it and pick another key. this way, the size of
// the keyspace doesn't change from the perspective of the scrambled zipfian generator
-
+ final double insertproportion = Double.parseDouble(
+ p.getProperty(INSERT_PROPORTION_PROPERTY, INSERT_PROPORTION_PROPERTY_DEFAULT));
int opcount = Integer.parseInt(p.getProperty(Client.OPERATION_COUNT_PROPERTY));
int expectednewkeys = (int) ((opcount) * insertproportion * 2.0); // 2 is fudge factor
- keychooser = new ScrambledZipfianGenerator(recordcount + expectednewkeys);
+ keychooser = new ScrambledZipfianGenerator(insertstart, insertstart + insertcount + expectednewkeys);
} else if (requestdistrib.compareTo("latest") == 0) {
keychooser = new SkewedLatestGenerator(transactioninsertkeysequence);
} else if (requestdistrib.equals("hotspot")) {
double hotsetfraction =
Double.parseDouble(p.getProperty(HOTSPOT_DATA_FRACTION, HOTSPOT_DATA_FRACTION_DEFAULT));
double hotopnfraction =
Double.parseDouble(p.getProperty(HOTSPOT_OPN_FRACTION, HOTSPOT_OPN_FRACTION_DEFAULT));
- keychooser = new HotspotIntegerGenerator(0, recordcount - 1, hotsetfraction, hotopnfraction);
+ keychooser = new HotspotIntegerGenerator(insertstart, insertstart + insertcount - 1,
+ hotsetfraction, hotopnfraction);
} else {
throw new WorkloadException("Unknown request distribution \"" + requestdistrib + "\"");
}
fieldchooser = new UniformIntegerGenerator(0, fieldcount - 1);
if (scanlengthdistrib.compareTo("uniform") == 0) {
scanlength = new UniformIntegerGenerator(1, maxscanlength);
} else if (scanlengthdistrib.compareTo("zipfian") == 0) {
scanlength = new ZipfianGenerator(1, maxscanlength);
} else {
throw new WorkloadException(
"Distribution \"" + scanlengthdistrib + "\" not allowed for scan length");
}
insertionRetryLimit = Integer.parseInt(p.getProperty(
INSERTION_RETRY_LIMIT, INSERTION_RETRY_LIMIT_DEFAULT));
-
insertionRetryInterval = Integer.parseInt(p.getProperty(
INSERTION_RETRY_INTERVAL, INSERTION_RETRY_INTERVAL_DEFAULT));
}
public String buildKeyName(long keynum) {
if (!orderedinserts) {
keynum = Utils.hash(keynum);
}
- return "user" + keynum;
+ String value = Long.toString(keynum);
+ int fill = zeropadding - value.length();
+ String prekey = "user";
+ for (int i = 0; i < fill; i++) {
+ prekey += '0';
+ }
+ return prekey + value;
}
/**
* Builds a value for a randomly chosen field.
*/
private HashMap<String, ByteIterator> buildSingleValue(String key) {
HashMap<String, ByteIterator> value = new HashMap<String, ByteIterator>();
String fieldkey = fieldnames.get(fieldchooser.nextValue().intValue());
ByteIterator data;
if (dataintegrity) {
data = new StringByteIterator(buildDeterministicValue(key, fieldkey));
} else {
// fill with random data
data = new RandomByteIterator(fieldlengthgenerator.nextValue().longValue());
}
value.put(fieldkey, data);
return value;
}
/**
* Builds values for all fields.
*/
private HashMap<String, ByteIterator> buildValues(String key) {
HashMap<String, ByteIterator> values = new HashMap<String, ByteIterator>();
for (String fieldkey : fieldnames) {
ByteIterator data;
if (dataintegrity) {
data = new StringByteIterator(buildDeterministicValue(key, fieldkey));
} else {
// fill with random data
data = new RandomByteIterator(fieldlengthgenerator.nextValue().longValue());
}
values.put(fieldkey, data);
}
return values;
}
/**
* Build a deterministic value given the key information.
*/
private String buildDeterministicValue(String key, String fieldkey) {
int size = fieldlengthgenerator.nextValue().intValue();
StringBuilder sb = new StringBuilder(size);
sb.append(key);
sb.append(':');
sb.append(fieldkey);
while (sb.length() < size) {
sb.append(':');
sb.append(sb.toString().hashCode());
}
sb.setLength(size);
return sb.toString();
}
/**
* Do one insert operation. Because it will be called concurrently from multiple client threads,
* this function must be thread safe. However, avoid synchronized, or the threads will block waiting
* for each other, and it will be difficult to reach the target throughput. Ideally, this function would
* have no side effects other than DB operations.
*/
@Override
public boolean doInsert(DB db, Object threadstate) {
int keynum = keysequence.nextValue().intValue();
String dbkey = buildKeyName(keynum);
HashMap<String, ByteIterator> values = buildValues(dbkey);
Status status;
int numOfRetries = 0;
do {
status = db.insert(table, dbkey, values);
if (status == Status.OK) {
break;
}
// Retry if configured. Without retrying, the load process will fail
// even if one single insertion fails. User can optionally configure
// an insertion retry limit (default is 0) to enable retry.
if (++numOfRetries <= insertionRetryLimit) {
System.err.println("Retrying insertion, retry count: " + numOfRetries);
try {
// Sleep for a random number between [0.8, 1.2)*insertionRetryInterval.
int sleepTime = (int) (1000 * insertionRetryInterval * (0.8 + 0.4 * Math.random()));
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
break;
}
} else {
System.err.println("Error inserting, not retrying any more. number of attempts: " + numOfRetries +
"Insertion Retry Limit: " + insertionRetryLimit);
break;
}
} while (true);
return (status == Status.OK);
}
/**
* Do one transaction operation. Because it will be called concurrently from multiple client
* threads, this function must be thread safe. However, avoid synchronized, or the threads will block waiting
* for each other, and it will be difficult to reach the target throughput. Ideally, this function would
* have no side effects other than DB operations.
*/
@Override
public boolean doTransaction(DB db, Object threadstate) {
switch (operationchooser.nextString()) {
-
- case "READ":
- doTransactionRead(db);
- break;
- case "UPDATE":
- doTransactionUpdate(db);
- break;
- case "INSERT":
- doTransactionInsert(db);
- break;
- case "SCAN":
- doTransactionScan(db);
- break;
- default:
- doTransactionReadModifyWrite(db);
+ case "READ":
+ doTransactionRead(db);
+ break;
+ case "UPDATE":
+ doTransactionUpdate(db);
+ break;
+ case "INSERT":
+ doTransactionInsert(db);
+ break;
+ case "SCAN":
+ doTransactionScan(db);
+ break;
+ default:
+ doTransactionReadModifyWrite(db);
}
return true;
}
/**
* Results are reported in the first three buckets of the histogram under
* the label "VERIFY".
* Bucket 0 means the expected data was returned.
* Bucket 1 means incorrect data was returned.
* Bucket 2 means null data was returned when some data was expected.
*/
protected void verifyRow(String key, HashMap<String, ByteIterator> cells) {
Status verifyStatus = Status.OK;
long startTime = System.nanoTime();
if (!cells.isEmpty()) {
for (Map.Entry<String, ByteIterator> entry : cells.entrySet()) {
if (!entry.getValue().toString().equals(buildDeterministicValue(key, entry.getKey()))) {
verifyStatus = Status.UNEXPECTED_STATE;
break;
}
}
} else {
// This assumes that null data is never valid
verifyStatus = Status.ERROR;
}
long endTime = System.nanoTime();
_measurements.measure("VERIFY", (int) (endTime - startTime) / 1000);
_measurements.reportStatus("VERIFY", verifyStatus);
}
int nextKeynum() {
int keynum;
if (keychooser instanceof ExponentialGenerator) {
do {
keynum = transactioninsertkeysequence.lastValue() - keychooser.nextValue().intValue();
} while (keynum < 0);
} else {
do {
keynum = keychooser.nextValue().intValue();
} while (keynum > transactioninsertkeysequence.lastValue());
}
return keynum;
}
public void doTransactionRead(DB db) {
// choose a random key
int keynum = nextKeynum();
String keyname = buildKeyName(keynum);
HashSet<String> fields = null;
if (!readallfields) {
// read a random field
String fieldname = fieldnames.get(fieldchooser.nextValue().intValue());
fields = new HashSet<String>();
fields.add(fieldname);
} else if (dataintegrity) {
// pass the full field list if dataintegrity is on for verification
fields = new HashSet<String>(fieldnames);
}
HashMap<String, ByteIterator> cells = new HashMap<String, ByteIterator>();
db.read(table, keyname, fields, cells);
if (dataintegrity) {
verifyRow(keyname, cells);
}
}
public void doTransactionReadModifyWrite(DB db) {
// choose a random key
int keynum = nextKeynum();
String keyname = buildKeyName(keynum);
HashSet<String> fields = null;
if (!readallfields) {
// read a random field
String fieldname = fieldnames.get(fieldchooser.nextValue().intValue());
fields = new HashSet<String>();
fields.add(fieldname);
}
HashMap<String, ByteIterator> values;
if (writeallfields) {
// new data for all the fields
values = buildValues(keyname);
} else {
// update a random field
values = buildSingleValue(keyname);
}
// do the transaction
HashMap<String, ByteIterator> cells = new HashMap<String, ByteIterator>();
long ist = _measurements.getIntendedtartTimeNs();
long st = System.nanoTime();
db.read(table, keyname, fields, cells);
db.update(table, keyname, values);
long en = System.nanoTime();
if (dataintegrity) {
verifyRow(keyname, cells);
}
_measurements.measure("READ-MODIFY-WRITE", (int) ((en - st) / 1000));
_measurements.measureIntended("READ-MODIFY-WRITE", (int) ((en - ist) / 1000));
}
public void doTransactionScan(DB db) {
// choose a random key
int keynum = nextKeynum();
String startkeyname = buildKeyName(keynum);
// choose a random scan length
int len = scanlength.nextValue().intValue();
HashSet<String> fields = null;
if (!readallfields) {
// read a random field
String fieldname = fieldnames.get(fieldchooser.nextValue().intValue());
fields = new HashSet<String>();
fields.add(fieldname);
}
db.scan(table, startkeyname, len, fields, new Vector<HashMap<String, ByteIterator>>());
}
public void doTransactionUpdate(DB db) {
// choose a random key
int keynum = nextKeynum();
String keyname = buildKeyName(keynum);
HashMap<String, ByteIterator> values;
if (writeallfields) {
// new data for all the fields
values = buildValues(keyname);
} else {
// update a random field
values = buildSingleValue(keyname);
}
db.update(table, keyname, values);
}
public void doTransactionInsert(DB db) {
// choose the next key
int keynum = transactioninsertkeysequence.nextValue();
try {
String dbkey = buildKeyName(keynum);
HashMap<String, ByteIterator> values = buildValues(dbkey);
db.insert(table, dbkey, values);
} finally {
transactioninsertkeysequence.acknowledge(keynum);
}
}
+
+ /**
+ * Creates a weighted DiscreteGenerator of database operations for a workload to perform.
+ * Weights/proportions are read from the properties list and defaults are used
+ * when values are not configured.
+ * Current operations are "READ", "UPDATE", "INSERT", "SCAN" and "READMODIFYWRITE".
+ * @param p The properties list to pull weights from.
+ * @return A generator that can be used to determine the next operation to perform.
+ * @throws IllegalArgumentException if the properties object was null.
+ */
+ public static DiscreteGenerator createOperationGenerator(final Properties p) {
+ if (p == null) {
+ throw new IllegalArgumentException("Properties object cannot be null");
+ }
+ final double readproportion = Double.parseDouble(
+ p.getProperty(READ_PROPORTION_PROPERTY, READ_PROPORTION_PROPERTY_DEFAULT));
+ final double updateproportion = Double.parseDouble(
+ p.getProperty(UPDATE_PROPORTION_PROPERTY, UPDATE_PROPORTION_PROPERTY_DEFAULT));
+ final double insertproportion = Double.parseDouble(
+ p.getProperty(INSERT_PROPORTION_PROPERTY, INSERT_PROPORTION_PROPERTY_DEFAULT));
+ final double scanproportion = Double.parseDouble(
+ p.getProperty(SCAN_PROPORTION_PROPERTY, SCAN_PROPORTION_PROPERTY_DEFAULT));
+ final double readmodifywriteproportion = Double.parseDouble(p.getProperty(
+ READMODIFYWRITE_PROPORTION_PROPERTY, READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT));
+
+ final DiscreteGenerator operationchooser = new DiscreteGenerator();
+ if (readproportion > 0) {
+ operationchooser.addValue(readproportion, "READ");
+ }
+
+ if (updateproportion > 0) {
+ operationchooser.addValue(updateproportion, "UPDATE");
+ }
+
+ if (insertproportion > 0) {
+ operationchooser.addValue(insertproportion, "INSERT");
+ }
+
+ if (scanproportion > 0) {
+ operationchooser.addValue(scanproportion, "SCAN");
+ }
+
+ if (readmodifywriteproportion > 0) {
+ operationchooser.addValue(readmodifywriteproportion, "READMODIFYWRITE");
+ }
+ return operationchooser;
+ }
}
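As a quick illustration of the refactored `createOperationGenerator` helper above, here is a minimal, hypothetical usage sketch (not part of the patch); it relies only on the public `CoreWorkload` property constants and the `DiscreteGenerator.nextString()` call that appear elsewhere in this diff, and any proportion left unset falls back to its default:

```java
import java.util.Properties;
import com.yahoo.ycsb.generator.DiscreteGenerator;
import com.yahoo.ycsb.workloads.CoreWorkload;

public final class OperationGeneratorDemo {
  public static void main(String[] args) {
    // Weight reads four times as heavily as inserts.
    Properties p = new Properties();
    p.setProperty(CoreWorkload.READ_PROPORTION_PROPERTY, "0.8");
    p.setProperty(CoreWorkload.INSERT_PROPORTION_PROPERTY, "0.2");

    DiscreteGenerator chooser = CoreWorkload.createOperationGenerator(p);
    int reads = 0;
    for (int i = 0; i < 100; i++) {
      if ("READ".equals(chooser.nextString())) {
        reads++;
      }
    }
    System.out.println("READ was chosen " + reads + " times out of 100");
  }
}
```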
diff --git a/core/src/test/java/com/yahoo/ycsb/TestUtils.java b/core/src/test/java/com/yahoo/ycsb/TestUtils.java
index cde51776..a84eca86 100644
--- a/core/src/test/java/com/yahoo/ycsb/TestUtils.java
+++ b/core/src/test/java/com/yahoo/ycsb/TestUtils.java
@@ -1,129 +1,146 @@
/**
* Copyright (c) 2016 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.util.Arrays;
import org.testng.annotations.Test;
public class TestUtils {
@Test
public void bytesToFromLong() throws Exception {
byte[] bytes = new byte[8];
assertEquals(Utils.bytesToLong(bytes), 0L);
assertArrayEquals(Utils.longToBytes(0), bytes);
bytes[7] = 1;
assertEquals(Utils.bytesToLong(bytes), 1L);
assertArrayEquals(Utils.longToBytes(1L), bytes);
bytes = new byte[] { 127, -1, -1, -1, -1, -1, -1, -1 };
assertEquals(Utils.bytesToLong(bytes), Long.MAX_VALUE);
assertArrayEquals(Utils.longToBytes(Long.MAX_VALUE), bytes);
bytes = new byte[] { -128, 0, 0, 0, 0, 0, 0, 0 };
assertEquals(Utils.bytesToLong(bytes), Long.MIN_VALUE);
assertArrayEquals(Utils.longToBytes(Long.MIN_VALUE), bytes);
bytes = new byte[] { (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF,
(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF };
assertEquals(Utils.bytesToLong(bytes), -1L);
assertArrayEquals(Utils.longToBytes(-1L), bytes);
// if the array is too long we just skip the remainder
bytes = new byte[] { 0, 0, 0, 0, 0, 0, 0, 1, 42, 42, 42 };
assertEquals(Utils.bytesToLong(bytes), 1L);
}
@Test
public void bytesToFromDouble() throws Exception {
byte[] bytes = new byte[8];
assertEquals(Utils.bytesToDouble(bytes), 0, 0.0001);
assertArrayEquals(Utils.doubleToBytes(0), bytes);
bytes = new byte[] { 63, -16, 0, 0, 0, 0, 0, 0 };
assertEquals(Utils.bytesToDouble(bytes), 1, 0.0001);
assertArrayEquals(Utils.doubleToBytes(1), bytes);
bytes = new byte[] { -65, -16, 0, 0, 0, 0, 0, 0 };
assertEquals(Utils.bytesToDouble(bytes), -1, 0.0001);
assertArrayEquals(Utils.doubleToBytes(-1), bytes);
bytes = new byte[] { 127, -17, -1, -1, -1, -1, -1, -1 };
assertEquals(Utils.bytesToDouble(bytes), Double.MAX_VALUE, 0.0001);
assertArrayEquals(Utils.doubleToBytes(Double.MAX_VALUE), bytes);
bytes = new byte[] { 0, 0, 0, 0, 0, 0, 0, 1 };
assertEquals(Utils.bytesToDouble(bytes), Double.MIN_VALUE, 0.0001);
assertArrayEquals(Utils.doubleToBytes(Double.MIN_VALUE), bytes);
bytes = new byte[] { 127, -8, 0, 0, 0, 0, 0, 0 };
assertTrue(Double.isNaN(Utils.bytesToDouble(bytes)));
assertArrayEquals(Utils.doubleToBytes(Double.NaN), bytes);
bytes = new byte[] { 63, -16, 0, 0, 0, 0, 0, 0, 42, 42, 42 };
assertEquals(Utils.bytesToDouble(bytes), 1, 0.0001);
}
@Test (expectedExceptions = NullPointerException.class)
public void bytesToLongNull() throws Exception {
Utils.bytesToLong(null);
}
@Test (expectedExceptions = IndexOutOfBoundsException.class)
public void bytesToLongTooShort() throws Exception {
Utils.bytesToLong(new byte[] { 0, 0, 0, 0, 0, 0, 0 });
}
@Test (expectedExceptions = IllegalArgumentException.class)
public void bytesToDoubleTooShort() throws Exception {
Utils.bytesToDouble(new byte[] { 0, 0, 0, 0, 0, 0, 0 });
}
+ @Test
+ public void jvmUtils() throws Exception {
+ // This should ALWAYS return at least one thread.
+ assertTrue(Utils.getActiveThreadCount() > 0);
+ // This should always be greater than 0 or something is goofed up in the JVM.
+ assertTrue(Utils.getUsedMemoryBytes() > 0);
+ // Some operating systems may not implement this so we don't have a good
+ // test. Just make sure it doesn't throw an exception.
+ Utils.getSystemLoadAverage();
+ // This will probably be zero but should never be negative.
+ assertTrue(Utils.getGCTotalCollectionCount() >= 0);
+ // Could be zero similar to GC total collection count
+ assertTrue(Utils.getGCTotalTime() >= 0);
+ // Could be empty
+ assertTrue(Utils.getGCStatst().size() >= 0);
+ }
+
/**
* Since this version of TestNG doesn't appear to have an assertArrayEquals,
* this will compare the two to make sure they're the same.
* @param actual Actual array to validate
* @param expected What the array should contain
* @throws AssertionError if the test fails.
*/
public void assertArrayEquals(final byte[] actual, final byte[] expected) {
if (actual == null && expected != null) {
throw new AssertionError("Expected " + Arrays.toString(expected) +
" but found [null]");
}
if (actual != null && expected == null) {
throw new AssertionError("Expected [null] but found " +
Arrays.toString(actual));
}
if (actual.length != expected.length) {
throw new AssertionError("Expected length " + expected.length +
" but found " + actual.length);
}
for (int i = 0; i < expected.length; i++) {
if (actual[i] != expected[i]) {
throw new AssertionError("Expected byte [" + expected[i] +
"] at index " + i + " but found [" + actual[i] + "]");
}
}
}
}
\ No newline at end of file
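A small, self-contained sketch (not part of the patch) of how the JVM helpers exercised by `jvmUtils` might be used for ad-hoc reporting. The test above only checks the size of `Utils.getGCStatst()`, so the `Map<String, Long[]>` layout of per-collector {collection count, collection time in ms} used below is an assumption:

```java
import java.util.Map;
import com.yahoo.ycsb.Utils;

public final class JvmStatsDemo {
  public static void main(String[] args) {
    System.out.println("threads=" + Utils.getActiveThreadCount());
    System.out.println("usedMemBytes=" + Utils.getUsedMemoryBytes());
    System.out.println("loadAvg=" + Utils.getSystemLoadAverage()); // may be negative where unsupported
    System.out.println("gcCount=" + Utils.getGCTotalCollectionCount());
    System.out.println("gcTimeMs=" + Utils.getGCTotalTime());
    // Assumed per-collector breakdown: name -> { collection count, collection time }.
    for (Map.Entry<String, Long[]> e : Utils.getGCStatst().entrySet()) {
      System.out.println(e.getKey() + ": count=" + e.getValue()[0] + ", timeMs=" + e.getValue()[1]);
    }
  }
}
```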
diff --git a/core/src/test/java/com/yahoo/ycsb/generator/TestIncrementingPrintableStringGenerator.java b/core/src/test/java/com/yahoo/ycsb/generator/TestIncrementingPrintableStringGenerator.java
new file mode 100644
index 00000000..eea3d507
--- /dev/null
+++ b/core/src/test/java/com/yahoo/ycsb/generator/TestIncrementingPrintableStringGenerator.java
@@ -0,0 +1,130 @@
+/**
+ * Copyright (c) 2016 YCSB contributors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+package com.yahoo.ycsb.generator;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.fail;
+
+import java.util.NoSuchElementException;
+
+import org.testng.annotations.Test;
+
+public class TestIncrementingPrintableStringGenerator {
+ private final static int[] ATOC = new int[] { 65, 66, 67 };
+
+ @Test
+ public void rolloverOK() throws Exception {
+ final IncrementingPrintableStringGenerator gen =
+ new IncrementingPrintableStringGenerator(2, ATOC);
+
+ assertNull(gen.lastValue());
+ assertEquals(gen.nextValue(), "AA");
+ assertEquals(gen.lastValue(), "AA");
+ assertEquals(gen.nextValue(), "AB");
+ assertEquals(gen.lastValue(), "AB");
+ assertEquals(gen.nextValue(), "AC");
+ assertEquals(gen.lastValue(), "AC");
+ assertEquals(gen.nextValue(), "BA");
+ assertEquals(gen.lastValue(), "BA");
+ assertEquals(gen.nextValue(), "BB");
+ assertEquals(gen.lastValue(), "BB");
+ assertEquals(gen.nextValue(), "BC");
+ assertEquals(gen.lastValue(), "BC");
+ assertEquals(gen.nextValue(), "CA");
+ assertEquals(gen.lastValue(), "CA");
+ assertEquals(gen.nextValue(), "CB");
+ assertEquals(gen.lastValue(), "CB");
+ assertEquals(gen.nextValue(), "CC");
+ assertEquals(gen.lastValue(), "CC");
+ assertEquals(gen.nextValue(), "AA"); // <-- rollover
+ assertEquals(gen.lastValue(), "AA");
+ }
+
+ @Test
+ public void rolloverOneCharacterOK() throws Exception {
+ // It would be silly to create a generator with one character.
+ final IncrementingPrintableStringGenerator gen =
+ new IncrementingPrintableStringGenerator(2, new int[] { 65 });
+ for (int i = 0; i < 5; i++) {
+ assertEquals(gen.nextValue(), "AA");
+ }
+ }
+
+ @Test
+ public void rolloverException() throws Exception {
+ final IncrementingPrintableStringGenerator gen =
+ new IncrementingPrintableStringGenerator(2, ATOC);
+ gen.setThrowExceptionOnRollover(true);
+
+ int i = 0;
+ try {
+ while(i < 11) {
+ ++i;
+ gen.nextValue();
+ }
+ fail("Expected NoSuchElementException");
+ } catch (NoSuchElementException e) {
+ assertEquals(i, 10);
+ }
+ }
+
+ @Test
+ public void rolloverOneCharacterException() throws Exception {
+ // It would be silly to create a generator with one character.
+ final IncrementingPrintableStringGenerator gen =
+ new IncrementingPrintableStringGenerator(2, new int[] { 65 });
+ gen.setThrowExceptionOnRollover(true);
+
+ int i = 0;
+ try {
+ while(i < 3) {
+ ++i;
+ gen.nextValue();
+ }
+ fail("Expected NoSuchElementException");
+ } catch (NoSuchElementException e) {
+ assertEquals(i, 2);
+ }
+ }
+
+ @Test
+ public void invalidLengths() throws Exception {
+ try {
+ new IncrementingPrintableStringGenerator(0, ATOC);
+ fail("Expected IllegalArgumentException");
+ } catch (IllegalArgumentException e) { }
+
+ try {
+ new IncrementingPrintableStringGenerator(-42, ATOC);
+ fail("Expected IllegalArgumentException");
+ } catch (IllegalArgumentException e) { }
+ }
+
+ @Test
+ public void invalidCharacterSets() throws Exception {
+ try {
+ new IncrementingPrintableStringGenerator(2, null);
+ fail("Expected IllegalArgumentException");
+ } catch (IllegalArgumentException e) { }
+
+ try {
+ new IncrementingPrintableStringGenerator(2, new int[] {});
+ fail("Expected IllegalArgumentException");
+ } catch (IllegalArgumentException e) { }
+ }
+}
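For readers skimming the tests above, a minimal sketch (not part of the patch) of the generator's happy path; the two-argument constructor, `setThrowExceptionOnRollover`, `nextValue()`, and `lastValue()` are exactly the calls exercised by the tests:

```java
import java.util.NoSuchElementException;
import com.yahoo.ycsb.generator.IncrementingPrintableStringGenerator;

public final class StringGenDemo {
  public static void main(String[] args) {
    // Two-character strings over the set {A, B, C}: AA, AB, AC, BA, ... CC, then rollover.
    IncrementingPrintableStringGenerator gen =
        new IncrementingPrintableStringGenerator(2, new int[] { 65, 66, 67 });
    gen.setThrowExceptionOnRollover(true); // fail instead of wrapping back to "AA"
    try {
      while (true) {
        System.out.println(gen.nextValue());
      }
    } catch (NoSuchElementException e) {
      System.out.println("exhausted after " + gen.lastValue());
    }
  }
}
```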
diff --git a/core/src/main/java/com/yahoo/ycsb/DBFactory.java b/core/src/test/java/com/yahoo/ycsb/workloads/TestCoreWorkload.java
similarity index 61%
copy from core/src/main/java/com/yahoo/ycsb/DBFactory.java
copy to core/src/test/java/com/yahoo/ycsb/workloads/TestCoreWorkload.java
index 18f7f5e1..d52f29b2 100644
--- a/core/src/main/java/com/yahoo/ycsb/DBFactory.java
+++ b/core/src/test/java/com/yahoo/ycsb/workloads/TestCoreWorkload.java
@@ -1,52 +1,70 @@
/**
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * Copyright (c) 2016 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
+package com.yahoo.ycsb.workloads;
-package com.yahoo.ycsb;
+import static org.testng.Assert.assertTrue;
import java.util.Properties;
-/**
- * Creates a DB layer by dynamically classloading the specified DB class.
- */
-public class DBFactory
-{
- @SuppressWarnings("unchecked")
- public static DB newDB(String dbname, Properties properties) throws UnknownDBException
- {
- ClassLoader classLoader = DBFactory.class.getClassLoader();
+import org.testng.annotations.Test;
- DB ret=null;
+import com.yahoo.ycsb.generator.DiscreteGenerator;
- try
- {
- Class dbclass = classLoader.loadClass(dbname);
- //System.out.println("dbclass.getName() = " + dbclass.getName());
-
- ret=(DB)dbclass.newInstance();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- return null;
- }
-
- ret.setProperties(properties);
+public class TestCoreWorkload {
- return new DBWrapper(ret);
- }
-
-}
+ @Test
+ public void createOperationChooser() {
+ final Properties p = new Properties();
+ p.setProperty(CoreWorkload.READ_PROPORTION_PROPERTY, "0.20");
+ p.setProperty(CoreWorkload.UPDATE_PROPORTION_PROPERTY, "0.20");
+ p.setProperty(CoreWorkload.INSERT_PROPORTION_PROPERTY, "0.20");
+ p.setProperty(CoreWorkload.SCAN_PROPORTION_PROPERTY, "0.20");
+ p.setProperty(CoreWorkload.READMODIFYWRITE_PROPORTION_PROPERTY, "0.20");
+ final DiscreteGenerator generator = CoreWorkload.createOperationGenerator(p);
+ final int[] counts = new int[5];
+
+ for (int i = 0; i < 100; ++i) {
+ switch (generator.nextString()) {
+ case "READ":
+ ++counts[0];
+ break;
+ case "UPDATE":
+ ++counts[1];
+ break;
+ case "INSERT":
+ ++counts[2];
+ break;
+ case "SCAN":
+ ++counts[3];
+ break;
+ default:
+ ++counts[4];
+ }
+ }
+
+ for (int i : counts) {
+ // Doesn't do a wonderful job of equal distribution, but in a hundred draws, if we
+ // don't see at least a couple of each operation then the generator is really broken.
+ assertTrue(i > 1);
+ }
+ }
+
+ @Test (expectedExceptions = IllegalArgumentException.class)
+ public void createOperationChooserNullProperties() {
+ CoreWorkload.createOperationGenerator(null);
+ }
+}
\ No newline at end of file
diff --git a/couchbase/pom.xml b/couchbase/pom.xml
index 5f4780aa..6f3157e9 100644
--- a/couchbase/pom.xml
+++ b/couchbase/pom.xml
@@ -1,55 +1,55 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.yahoo.ycsb</groupId>
    <artifactId>binding-parent</artifactId>
-    <version>0.9.0-SNAPSHOT</version>
+    <version>0.10.0-SNAPSHOT</version>
    <relativePath>../binding-parent</relativePath>
  </parent>
  <artifactId>couchbase-binding</artifactId>
  <name>Couchbase Binding</name>
  <packaging>jar</packaging>
  <dependencies>
    <dependency>
      <groupId>com.couchbase.client</groupId>
      <artifactId>couchbase-client</artifactId>
      <version>${couchbase.version}</version>
    </dependency>
    <dependency>
      <groupId>com.yahoo.ycsb</groupId>
      <artifactId>core</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.2.2</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
    </dependency>
  </dependencies>
</project>
diff --git a/couchbase2/README.md b/couchbase2/README.md
new file mode 100644
index 00000000..786060da
--- /dev/null
+++ b/couchbase2/README.md
@@ -0,0 +1,137 @@
+
+
+# Couchbase (SDK 2.x) Driver for YCSB
+This driver is a binding for the YCSB facilities to operate against a Couchbase Server cluster. It uses the official
+Couchbase Java SDK (version 2.x) and provides a rich set of configuration options, including support for the N1QL
+query language.
+
+## Quickstart
+
+### 1. Start Couchbase Server
+You need to start a single node or a cluster to point the client at. Please see [couchbase.com](http://couchbase.com)
+for more details and instructions.
+
+### 2. Set up YCSB
+You can either download the release zip and run it, or just clone from master.
+
+```
+git clone git://github.com/brianfrankcooper/YCSB.git
+cd YCSB
+mvn clean package
+```
+
+### 3. Run the Workload
+Before you can actually run the workload, you need to "load" the data first.
+
+```
+bin/ycsb load couchbase2 -s -P workloads/workloada
+```
+
+Then, you can run the workload:
+
+```
+bin/ycsb run couchbase2 -s -P workloads/workloada
+```
+
+Please see the general instructions in the `doc` folder if you are not sure how it all works. You can apply a property
+(as seen in the next section) like this:
+
+```
+bin/ycsb run couchbase2 -s -P workloads/workloada -p couchbase.epoll=true
+```
+
+## N1QL Index Setup
+In general, every time N1QL is used (either implicitly through using `workloade` or through setting `kv=false`) some
+kind of index must be present to make it work. Depending on the workload and data size, choosing the right index is
+crucial at runtime in order to get the best performance. If in doubt, please ask at the
+[forums](http://forums.couchbase.com) or get in touch with our team at Couchbase.
+
+For `workloade` and the default `readallfields=true` we recommend creating the following index; if you are using
+Couchbase Server 4.5 or later, also enable the "Memory Optimized Index" setting on the bucket.
+
+```
+CREATE INDEX wle_idx ON `bucketname`(meta().id);
+```
+
+For other workloads, different index setups might be even more performant.
+
+## Performance Considerations
+As with any benchmark, there are lots of knobs to tune in order to get great or (if you are reading
+this and trying to write a competitor benchmark ;-)) bad performance.
+
+The first setting you should consider if you are running on 64-bit Linux is `-p couchbase.epoll=true`. This will
+turn on the Epoll IO mechanism in the underlying Netty library, which provides better performance since it has less
+synchronization to do than the NIO default. This only works on Linux, but you are benchmarking on the OS you are
+deploying to, right?
+
+The second option, `boost`, sounds more magic than it actually is. By default this benchmark trades CPU for throughput,
+but this can be disabled by setting `-p couchbase.boost=0`. This defaults to 3, and 3 is the number of event loops run
+in the IO layer. 3 is a reasonable default but you should set it to the number of **physical** cores you have available
+on the machine if you only plan to run one YCSB instance. Make sure (using profiling) to max out your cores, but don't
+overdo it.
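+
+For example, an illustrative invocation (values are placeholders) combining both flags on a 4-core Linux box:
+
+```
+bin/ycsb run couchbase2 -s -P workloads/workloada -p couchbase.epoll=true -p couchbase.boost=4
+```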
+
+## Sync vs Async
+By default, since YCSB is synchronous, the code will always wait for the operation to complete. In some cases it can
+be useful to just "drive load" and disable the waiting. Note that when the "-p couchbase.syncMutationResponse=false"
+option is used, the results measured by YCSB can basically be thrown away. Still, it is sometimes helpful during load
+phases to speed them up :)
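+
+For example, to drive load without waiting on mutation responses (again, numbers measured this way should be
+discarded):
+
+```
+bin/ycsb load couchbase2 -s -P workloads/workloada -p couchbase.syncMutationResponse=false
+```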
+
+## Debugging Latency
+The Couchbase Java SDK has the ability to collect and dump different kinds of metrics which allow you to analyze
+performance during benchmarking and production. By default this option is disabled in the benchmark, but by setting
+`couchbase.networkMetricsInterval` and/or `couchbase.runtimeMetricsInterval` to something greater than 0 it will
+output the information as JSON into the configured logger. The number provided is the interval in seconds. If you are
+unsure what interval to pick, start with 10 or 30 seconds, depending on your runtime length.
+
+This is what such logs look like:
+
+```
+INFO: {"heap.used":{"init":268435456,"used":36500912,"committed":232259584,"max":3817865216},"gc.ps marksweep.collectionTime":0,"gc.ps scavenge.collectionTime":54,"gc.ps scavenge.collectionCount":17,"thread.count":26,"offHeap.used":{"init":2555904,"used":30865944,"committed":31719424,"max":-1},"gc.ps marksweep.collectionCount":0,"heap.pendingFinalize":0,"thread.peakCount":26,"event":{"name":"RuntimeMetrics","type":"METRIC"},"thread.startedCount":28}
+INFO: {"localhost/127.0.0.1:11210":{"BINARY":{"ReplaceRequest":{"SUCCESS":{"metrics":{"percentiles":{"50.0":102,"90.0":136,"95.0":155,"99.0":244,"99.9":428},"min":55,"max":1564,"count":35787,"timeUnit":"MICROSECONDS"}}},"GetRequest":{"SUCCESS":{"metrics":{"percentiles":{"50.0":74,"90.0":98,"95.0":110,"99.0":158,"99.9":358},"min":34,"max":2310,"count":35604,"timeUnit":"MICROSECONDS"}}},"GetBucketConfigRequest":{"SUCCESS":{"metrics":{"percentiles":{"50.0":462,"90.0":462,"95.0":462,"99.0":462,"99.9":462},"min":460,"max":462,"count":1,"timeUnit":"MICROSECONDS"}}}}},"event":{"name":"NetworkLatencyMetrics","type":"METRIC"}}
+```
+
+It is recommended to either feed the output into a program that can analyze and visualize JSON, or to dump it into a
+JSON pretty printer and inspect it manually. Since the output can be changed (only by changing the code at the moment),
+you can even configure it to put those messages into another Couchbase bucket and then analyze them through N1QL! You
+can learn more about this in general [in the official docs](http://developer.couchbase.com/documentation/server/4.0/sdks/java-2.2/event-bus-metrics.html).
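+
+For example, to emit network latency metrics every 30 seconds and runtime metrics every 10:
+
+```
+bin/ycsb run couchbase2 -s -P workloads/workloada -p couchbase.networkMetricsInterval=30 -p couchbase.runtimeMetricsInterval=10
+```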
+
+
+## Configuration Options
+Since no setup is the same, and the goal of YCSB is to deliver realistic benchmarks, here are some settings you can
+tune. Note that if you need more flexibility (let's say a custom transcoder), you still need to extend this driver and
+implement the facilities on your own.
+
+You can set the following properties (with the default settings applied):
+
+ - couchbase.host=127.0.0.1: The hostname of one server.
+ - couchbase.bucket=default: The bucket name to use.
+ - couchbase.password=: The password of the bucket.
+ - couchbase.syncMutationResponse=true: If mutations should wait for the response to complete.
+ - couchbase.persistTo=0: Persistence durability requirement.
+ - couchbase.replicateTo=0: Replication durability requirement.
+ - couchbase.upsert=false: Use upsert instead of insert or replace.
+ - couchbase.adhoc=false: If set to true, prepared statements are not used.
+ - couchbase.kv=true: If set to false, mutation operations will also be performed through N1QL.
+ - couchbase.maxParallelism=1: The server parallelism for all N1QL queries.
+ - couchbase.kvEndpoints=1: The number of KV sockets to open per server.
+ - couchbase.queryEndpoints=1: The number of N1QL query sockets to open per server.
+ - couchbase.epoll=false: If Epoll instead of NIO should be used (only available on Linux).
+ - couchbase.boost=3: If > 0, trades CPU for higher throughput. N is the number of event loops; ideally
+ set to the number of physical cores. Setting it higher than that will likely degrade performance.
+ - couchbase.networkMetricsInterval=0: The interval in seconds at which latency metrics will be logged.
+ - couchbase.runtimeMetricsInterval=0: The interval in seconds at which runtime metrics will be logged.
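+
+For example, a run against a hypothetical node `10.0.0.1` and bucket `ycsb` (both placeholders) could combine
+several of these:
+
+```
+bin/ycsb run couchbase2 -s -P workloads/workloada -p couchbase.host=10.0.0.1 -p couchbase.bucket=ycsb -p couchbase.epoll=true
+```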
\ No newline at end of file
diff --git a/couchbase/pom.xml b/couchbase2/pom.xml
similarity index 61%
copy from couchbase/pom.xml
copy to couchbase2/pom.xml
index 5f4780aa..a73ad4cb 100644
--- a/couchbase/pom.xml
+++ b/couchbase2/pom.xml
@@ -1,55 +1,48 @@
-
-
+
+
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.yahoo.ycsb</groupId>
    <artifactId>binding-parent</artifactId>
-    <version>0.9.0-SNAPSHOT</version>
+    <version>0.10.0-SNAPSHOT</version>
    <relativePath>../binding-parent</relativePath>
  </parent>
-  <artifactId>couchbase-binding</artifactId>
-  <name>Couchbase Binding</name>
+  <artifactId>couchbase2-binding</artifactId>
+  <name>Couchbase Java SDK 2.x Binding</name>
  <packaging>jar</packaging>
  <dependencies>
    <dependency>
      <groupId>com.couchbase.client</groupId>
-      <artifactId>couchbase-client</artifactId>
-      <version>${couchbase.version}</version>
+      <artifactId>java-client</artifactId>
+      <version>${couchbase2.version}</version>
    </dependency>
    <dependency>
      <groupId>com.yahoo.ycsb</groupId>
      <artifactId>core</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-      <version>2.2.2</version>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
  </dependencies>
</project>
diff --git a/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java b/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java
new file mode 100644
index 00000000..3d0bc039
--- /dev/null
+++ b/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java
@@ -0,0 +1,939 @@
+/**
+ * Copyright (c) 2016 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package com.yahoo.ycsb.db.couchbase2;
+
+import com.couchbase.client.core.env.DefaultCoreEnvironment;
+import com.couchbase.client.core.env.resources.IoPoolShutdownHook;
+import com.couchbase.client.core.logging.CouchbaseLogger;
+import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
+import com.couchbase.client.core.metrics.DefaultLatencyMetricsCollectorConfig;
+import com.couchbase.client.core.metrics.DefaultMetricsCollectorConfig;
+import com.couchbase.client.core.metrics.LatencyMetricsCollectorConfig;
+import com.couchbase.client.core.metrics.MetricsCollectorConfig;
+import com.couchbase.client.deps.com.fasterxml.jackson.core.JsonFactory;
+import com.couchbase.client.deps.com.fasterxml.jackson.core.JsonGenerator;
+import com.couchbase.client.deps.com.fasterxml.jackson.databind.JsonNode;
+import com.couchbase.client.deps.com.fasterxml.jackson.databind.node.ObjectNode;
+import com.couchbase.client.deps.io.netty.channel.DefaultSelectStrategyFactory;
+import com.couchbase.client.deps.io.netty.channel.EventLoopGroup;
+import com.couchbase.client.deps.io.netty.channel.SelectStrategy;
+import com.couchbase.client.deps.io.netty.channel.SelectStrategyFactory;
+import com.couchbase.client.deps.io.netty.channel.epoll.EpollEventLoopGroup;
+import com.couchbase.client.deps.io.netty.channel.nio.NioEventLoopGroup;
+import com.couchbase.client.deps.io.netty.util.IntSupplier;
+import com.couchbase.client.deps.io.netty.util.concurrent.DefaultThreadFactory;
+import com.couchbase.client.java.Bucket;
+import com.couchbase.client.java.Cluster;
+import com.couchbase.client.java.CouchbaseCluster;
+import com.couchbase.client.java.PersistTo;
+import com.couchbase.client.java.ReplicateTo;
+import com.couchbase.client.java.document.Document;
+import com.couchbase.client.java.document.RawJsonDocument;
+import com.couchbase.client.java.document.json.JsonArray;
+import com.couchbase.client.java.document.json.JsonObject;
+import com.couchbase.client.java.env.CouchbaseEnvironment;
+import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
+import com.couchbase.client.java.error.TemporaryFailureException;
+import com.couchbase.client.java.query.*;
+import com.couchbase.client.java.transcoder.JacksonTransformers;
+import com.couchbase.client.java.util.Blocking;
+import com.yahoo.ycsb.ByteIterator;
+import com.yahoo.ycsb.DB;
+import com.yahoo.ycsb.DBException;
+import com.yahoo.ycsb.Status;
+import com.yahoo.ycsb.StringByteIterator;
+import rx.Observable;
+import rx.Subscriber;
+import rx.functions.Action1;
+import rx.functions.Func1;
+
+import java.io.StringWriter;
+import java.io.Writer;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.*;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * A class that wraps the 2.x Couchbase SDK to be used with YCSB.
+ *
+ * <p>The following options can be passed when using this database client to override the defaults.
+ *
+ * <ul>
+ * <li><b>couchbase.host=127.0.0.1</b> The hostname of one server.</li>
+ * <li><b>couchbase.bucket=default</b> The bucket name to use.</li>
+ * <li><b>couchbase.password=</b> The password of the bucket.</li>
+ * <li><b>couchbase.syncMutationResponse=true</b> If mutations should wait for the response to complete.</li>
+ * <li><b>couchbase.persistTo=0</b> Persistence durability requirement.</li>
+ * <li><b>couchbase.replicateTo=0</b> Replication durability requirement.</li>
+ * <li><b>couchbase.upsert=false</b> Use upsert instead of insert or replace.</li>
+ * <li><b>couchbase.adhoc=false</b> If set to true, prepared statements are not used.</li>
+ * <li><b>couchbase.kv=true</b> If set to false, mutation operations will also be performed through N1QL.</li>
+ * <li><b>couchbase.maxParallelism=1</b> The server parallelism for all N1QL queries.</li>
+ * <li><b>couchbase.kvEndpoints=1</b> The number of KV sockets to open per server.</li>
+ * <li><b>couchbase.queryEndpoints=1</b> The number of N1QL query sockets to open per server.</li>
+ * <li><b>couchbase.epoll=false</b> If Epoll instead of NIO should be used (only available on Linux).</li>
+ * <li><b>couchbase.boost=3</b> If &gt; 0, trades CPU for higher throughput. N is the number of event loops; ideally
+ * set to the number of physical cores. Setting it higher than that will likely degrade performance.</li>
+ * <li><b>couchbase.networkMetricsInterval=0</b> The interval in seconds at which latency metrics will be logged.</li>
+ * <li><b>couchbase.runtimeMetricsInterval=0</b> The interval in seconds at which runtime metrics will be logged.</li>
+ * </ul>
+ */
+public class Couchbase2Client extends DB {
+
+ static {
+ // No need to send the full encoded_plan for this benchmark workload, less network overhead!
+ System.setProperty("com.couchbase.query.encodedPlanEnabled", "false");
+ }
+
+ private static final String SEPARATOR = ":";
+ private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(Couchbase2Client.class);
+ private static final Object INIT_COORDINATOR = new Object();
+
+ private static volatile CouchbaseEnvironment env = null;
+
+ private Cluster cluster;
+ private Bucket bucket;
+ private String bucketName;
+ private boolean upsert;
+ private PersistTo persistTo;
+ private ReplicateTo replicateTo;
+ private boolean syncMutResponse;
+ private boolean epoll;
+ private long kvTimeout;
+ private boolean adhoc;
+ private boolean kv;
+ private int maxParallelism;
+ private String host;
+ private int kvEndpoints;
+ private int queryEndpoints;
+ private int boost;
+ private int networkMetricsInterval;
+ private int runtimeMetricsInterval;
+ private String scanAllQuery;
+
+ @Override
+ public void init() throws DBException {
+ Properties props = getProperties();
+
+ host = props.getProperty("couchbase.host", "127.0.0.1");
+ bucketName = props.getProperty("couchbase.bucket", "default");
+ String bucketPassword = props.getProperty("couchbase.password", "");
+
+ upsert = props.getProperty("couchbase.upsert", "false").equals("true");
+ persistTo = parsePersistTo(props.getProperty("couchbase.persistTo", "0"));
+ replicateTo = parseReplicateTo(props.getProperty("couchbase.replicateTo", "0"));
+ syncMutResponse = props.getProperty("couchbase.syncMutationResponse", "true").equals("true");
+ adhoc = props.getProperty("couchbase.adhoc", "false").equals("true");
+ kv = props.getProperty("couchbase.kv", "true").equals("true");
+ maxParallelism = Integer.parseInt(props.getProperty("couchbase.maxParallelism", "1"));
+ kvEndpoints = Integer.parseInt(props.getProperty("couchbase.kvEndpoints", "1"));
+ queryEndpoints = Integer.parseInt(props.getProperty("couchbase.queryEndpoints", "1"));
+ epoll = props.getProperty("couchbase.epoll", "false").equals("true");
+ boost = Integer.parseInt(props.getProperty("couchbase.boost", "3"));
+ networkMetricsInterval = Integer.parseInt(props.getProperty("couchbase.networkMetricsInterval", "0"));
+ runtimeMetricsInterval = Integer.parseInt(props.getProperty("couchbase.runtimeMetricsInterval", "0"));
+ scanAllQuery = "SELECT meta().id as id FROM `" + bucketName + "` WHERE meta().id >= '$1' LIMIT $2";
+
+ try {
+ synchronized (INIT_COORDINATOR) {
+ if (env == null) {
+
+ LatencyMetricsCollectorConfig latencyConfig = networkMetricsInterval <= 0
+ ? DefaultLatencyMetricsCollectorConfig.disabled()
+ : DefaultLatencyMetricsCollectorConfig
+ .builder()
+ .emitFrequency(networkMetricsInterval)
+ .emitFrequencyUnit(TimeUnit.SECONDS)
+ .build();
+
+ MetricsCollectorConfig runtimeConfig = runtimeMetricsInterval <= 0
+ ? DefaultMetricsCollectorConfig.disabled()
+ : DefaultMetricsCollectorConfig.create(runtimeMetricsInterval, TimeUnit.SECONDS);
+
+ DefaultCouchbaseEnvironment.Builder builder = DefaultCouchbaseEnvironment
+ .builder()
+ .queryEndpoints(queryEndpoints)
+ .callbacksOnIoPool(true)
+ .runtimeMetricsCollectorConfig(runtimeConfig)
+ .networkLatencyMetricsCollectorConfig(latencyConfig)
+ .socketConnectTimeout(10000) // 10 secs socket connect timeout
+ .connectTimeout(30000) // 30 secs overall bucket open timeout
+ .kvTimeout(10000) // 10 instead of 2.5s for KV ops
+ .kvEndpoints(kvEndpoints);
+
+ // Tune boosting and epoll based on settings
+ SelectStrategyFactory factory = boost > 0 ?
+ new BackoffSelectStrategyFactory() : DefaultSelectStrategyFactory.INSTANCE;
+
+ int poolSize = boost > 0 ? boost : Integer.parseInt(
+ System.getProperty("com.couchbase.ioPoolSize", Integer.toString(DefaultCoreEnvironment.IO_POOL_SIZE))
+ );
+ ThreadFactory threadFactory = new DefaultThreadFactory("cb-io", true);
+
+ EventLoopGroup group = epoll ? new EpollEventLoopGroup(poolSize, threadFactory, factory)
+ : new NioEventLoopGroup(poolSize, threadFactory, SelectorProvider.provider(), factory);
+ builder.ioPool(group, new IoPoolShutdownHook(group));
+
+ env = builder.build();
+ logParams();
+ }
+ }
+
+ cluster = CouchbaseCluster.create(env, host);
+ bucket = cluster.openBucket(bucketName, bucketPassword);
+ kvTimeout = env.kvTimeout();
+ } catch (Exception ex) {
+ throw new DBException("Could not connect to Couchbase Bucket.", ex);
+ }
+
+ if (!kv && !syncMutResponse) {
+ throw new DBException("Not waiting for N1QL responses on mutations not yet implemented.");
+ }
+ }
+
+ /**
+ * Helper method to log the CLI params so that on the command line debugging is easier.
+ */
+ private void logParams() {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("host=").append(host);
+ sb.append(", bucket=").append(bucketName);
+ sb.append(", upsert=").append(upsert);
+ sb.append(", persistTo=").append(persistTo);
+ sb.append(", replicateTo=").append(replicateTo);
+ sb.append(", syncMutResponse=").append(syncMutResponse);
+ sb.append(", adhoc=").append(adhoc);
+ sb.append(", kv=").append(kv);
+ sb.append(", maxParallelism=").append(maxParallelism);
+ sb.append(", queryEndpoints=").append(queryEndpoints);
+ sb.append(", kvEndpoints=").append(kvEndpoints);
+ sb.append(", queryEndpoints=").append(queryEndpoints);
+ sb.append(", epoll=").append(epoll);
+ sb.append(", boost=").append(boost);
+ sb.append(", networkMetricsInterval=").append(networkMetricsInterval);
+ sb.append(", runtimeMetricsInterval=").append(runtimeMetricsInterval);
+
+ LOGGER.info("===> Using Params: " + sb.toString());
+ }
+
+ @Override
+ public Status read(final String table, final String key, Set<String> fields,
+ final HashMap<String, ByteIterator> result) {
+ try {
+ String docId = formatId(table, key);
+ if (kv) {
+ return readKv(docId, fields, result);
+ } else {
+ return readN1ql(docId, fields, result);
+ }
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ return Status.ERROR;
+ }
+ }
+
+ /**
+ * Performs the {@link #read(String, String, Set, HashMap)} operation via Key/Value ("get").
+ *
+ * @param docId the document ID
+ * @param fields the fields to be loaded
+ * @param result the result map where the doc needs to be converted into
+ * @return The result of the operation.
+ */
+ private Status readKv(final String docId, final Set<String> fields, final HashMap<String, ByteIterator> result)
+ throws Exception {
+ RawJsonDocument loaded = bucket.get(docId, RawJsonDocument.class);
+ if (loaded == null) {
+ return Status.NOT_FOUND;
+ }
+ decode(loaded.content(), fields, result);
+ return Status.OK;
+ }
+
+ /**
+ * Performs the {@link #read(String, String, Set, HashMap)} operation via N1QL ("SELECT").
+ *
+ * If this option should be used, the "-p couchbase.kv=false" property must be set.
+ *
+ * @param docId the document ID
+ * @param fields the fields to be loaded
+ * @param result the result map where the doc needs to be converted into
+ * @return The result of the operation.
+ */
+ private Status readN1ql(final String docId, Set<String> fields, final HashMap<String, ByteIterator> result)
+ throws Exception {
+ String readQuery = "SELECT " + joinFields(fields) + " FROM `" + bucketName + "` USE KEYS [$1]";
+ N1qlQueryResult queryResult = bucket.query(N1qlQuery.parameterized(
+ readQuery,
+ JsonArray.from(docId),
+ N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism)
+ ));
+
+ if (!queryResult.parseSuccess() || !queryResult.finalSuccess()) {
+ throw new DBException("Error while parsing N1QL Result. Query: " + readQuery
+ + ", Errors: " + queryResult.errors());
+ }
+
+ N1qlQueryRow row;
+ try {
+ row = queryResult.rows().next();
+ } catch (NoSuchElementException ex) {
+ return Status.NOT_FOUND;
+ }
+
+ JsonObject content = row.value();
+ if (fields == null) {
+ content = content.getObject(bucketName); // n1ql result set scoped under *.bucketName
+ fields = content.getNames();
+ }
+
+ for (String field : fields) {
+ Object value = content.get(field);
+ result.put(field, new StringByteIterator(value != null ? value.toString() : ""));
+ }
+
+ return Status.OK;
+ }
+
+ @Override
+ public Status update(final String table, final String key, final HashMap<String, ByteIterator> values) {
+ if (upsert) {
+ return upsert(table, key, values);
+ }
+
+ try {
+ String docId = formatId(table, key);
+ if (kv) {
+ return updateKv(docId, values);
+ } else {
+ return updateN1ql(docId, values);
+ }
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ return Status.ERROR;
+ }
+ }
+
+ /**
+ * Performs the {@link #update(String, String, HashMap)} operation via Key/Value ("replace").
+ *
+ * @param docId the document ID
+ * @param values the values to update the document with.
+ * @return The result of the operation.
+ */
+ private Status updateKv(final String docId, final HashMap<String, ByteIterator> values) {
+ waitForMutationResponse(bucket.async().replace(
+ RawJsonDocument.create(docId, encode(values)),
+ persistTo,
+ replicateTo
+ ));
+ return Status.OK;
+ }
+
+ /**
+ * Performs the {@link #update(String, String, HashMap)} operation via N1QL ("UPDATE").
+ *
+ * If this option should be used, the "-p couchbase.kv=false" property must be set.
+ *
+ * @param docId the document ID
+ * @param values the values to update the document with.
+ * @return The result of the operation.
+ */
+ private Status updateN1ql(final String docId, final HashMap<String, ByteIterator> values)
+ throws Exception {
+ String fields = encodeN1qlFields(values);
+ String updateQuery = "UPDATE `" + bucketName + "` USE KEYS [$1] SET " + fields;
+
+ N1qlQueryResult queryResult = bucket.query(N1qlQuery.parameterized(
+ updateQuery,
+ JsonArray.from(docId),
+ N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism)
+ ));
+
+ if (!queryResult.parseSuccess() || !queryResult.finalSuccess()) {
+ throw new DBException("Error while parsing N1QL Result. Query: " + updateQuery
+ + ", Errors: " + queryResult.errors());
+ }
+ return Status.OK;
+ }
+
+ @Override
+ public Status insert(final String table, final String key, final HashMap<String, ByteIterator> values) {
+ if (upsert) {
+ return upsert(table, key, values);
+ }
+
+ try {
+ String docId = formatId(table, key);
+ if (kv) {
+ return insertKv(docId, values);
+ } else {
+ return insertN1ql(docId, values);
+ }
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ return Status.ERROR;
+ }
+ }
+
+ /**
+ * Performs the {@link #insert(String, String, HashMap)} operation via Key/Value ("INSERT").
+ *
+ * Note that during the "load" phase it makes sense to retry TMPFAILS (so that even if the server is
+ * overloaded temporarily the ops will succeed eventually). The current code will retry TMPFAILs
+ * for maximum of one minute and then bubble up the error.
+ *
+ * @param docId the document ID
+ * @param values the values to update the document with.
+ * @return The result of the operation.
+ */
+ private Status insertKv(final String docId, final HashMap<String, ByteIterator> values) {
+ int tries = 60; // roughly 60 seconds with the 1 second sleep, not 100% accurate.
+
+ for(int i = 0; i < tries; i++) {
+ try {
+ waitForMutationResponse(bucket.async().insert(
+ RawJsonDocument.create(docId, encode(values)),
+ persistTo,
+ replicateTo
+ ));
+ return Status.OK;
+ } catch (TemporaryFailureException ex) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted while sleeping on TMPFAIL backoff.", ex);
+ }
+ }
+ }
+
+ throw new RuntimeException("Still receiving TMPFAIL from the server after trying " + tries + " times. " +
+ "Check your server.");
+ }
+
+ /**
+ * Performs the {@link #insert(String, String, HashMap)} operation via N1QL ("INSERT").
+ *
+ * If this option should be used, the "-p couchbase.kv=false" property must be set.
+ *
+ * @param docId the document ID
+ * @param values the values to update the document with.
+ * @return The result of the operation.
+ */
+ private Status insertN1ql(final String docId, final HashMap<String, ByteIterator> values)
+ throws Exception {
+ String insertQuery = "INSERT INTO `" + bucketName + "`(KEY,VALUE) VALUES ($1,$2)";
+
+ N1qlQueryResult queryResult = bucket.query(N1qlQuery.parameterized(
+ insertQuery,
+ JsonArray.from(docId, valuesToJsonObject(values)),
+ N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism)
+ ));
+
+ if (!queryResult.parseSuccess() || !queryResult.finalSuccess()) {
+ throw new DBException("Error while parsing N1QL Result. Query: " + insertQuery
+ + ", Errors: " + queryResult.errors());
+ }
+ return Status.OK;
+ }
+
+ /**
+ * Performs an upsert instead of insert or update using either Key/Value or N1QL.
+ *
+ * If this option should be used, the "-p couchbase.upsert=true" property must be set.
+ *
+ * @param table The name of the table
+ * @param key The record key of the record to insert.
+ * @param values A HashMap of field/value pairs to insert in the record
+ * @return The result of the operation.
+ */
+ private Status upsert(final String table, final String key, final HashMap<String, ByteIterator> values) {
+ try {
+ String docId = formatId(table, key);
+ if (kv) {
+ return upsertKv(docId, values);
+ } else {
+ return upsertN1ql(docId, values);
+ }
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ return Status.ERROR;
+ }
+ }
+
+ /**
+ * Performs the {@link #upsert(String, String, HashMap)} operation via Key/Value ("upsert").
+ *
+ * If this option should be used, the "-p couchbase.upsert=true" property must be set.
+ *
+ * @param docId the document ID
+ * @param values the values to update the document with.
+ * @return The result of the operation.
+ */
+ private Status upsertKv(final String docId, final HashMap<String, ByteIterator> values) {
+ waitForMutationResponse(bucket.async().upsert(
+ RawJsonDocument.create(docId, encode(values)),
+ persistTo,
+ replicateTo
+ ));
+ return Status.OK;
+ }
+
+ /**
+ * Performs the {@link #upsert(String, String, HashMap)} operation via N1QL ("UPSERT").
+ *
+ * If this option should be used, the "-p couchbase.upsert=true -p couchbase.kv=false" properties must be set.
+ *
+ * @param docId the document ID
+ * @param values the values to update the document with.
+ * @return The result of the operation.
+ */
+ private Status upsertN1ql(final String docId, final HashMap<String, ByteIterator> values)
+ throws Exception {
+ String upsertQuery = "UPSERT INTO `" + bucketName + "`(KEY,VALUE) VALUES ($1,$2)";
+
+ N1qlQueryResult queryResult = bucket.query(N1qlQuery.parameterized(
+ upsertQuery,
+ JsonArray.from(docId, valuesToJsonObject(values)),
+ N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism)
+ ));
+
+ if (!queryResult.parseSuccess() || !queryResult.finalSuccess()) {
+ throw new DBException("Error while parsing N1QL Result. Query: " + upsertQuery
+ + ", Errors: " + queryResult.errors());
+ }
+ return Status.OK;
+ }
+
+ @Override
+ public Status delete(final String table, final String key) {
+ try {
+ String docId = formatId(table, key);
+ if (kv) {
+ return deleteKv(docId);
+ } else {
+ return deleteN1ql(docId);
+ }
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ return Status.ERROR;
+ }
+ }
+
+ /**
+ * Performs the {@link #delete(String, String)} operation via Key/Value ("remove").
+ *
+ * @param docId the document ID.
+ * @return The result of the operation.
+ */
+ private Status deleteKv(final String docId) {
+ waitForMutationResponse(bucket.async().remove(
+ docId,
+ persistTo,
+ replicateTo
+ ));
+ return Status.OK;
+ }
+
+ /**
+ * Performs the {@link #delete(String, String)} operation via N1QL ("DELETE").
+ *
+ * If this option should be used, the "-p couchbase.kv=false" property must be set.
+ *
+ * @param docId the document ID.
+ * @return The result of the operation.
+ */
+ private Status deleteN1ql(final String docId) throws Exception {
+ String deleteQuery = "DELETE FROM `" + bucketName + "` USE KEYS [$1]";
+ N1qlQueryResult queryResult = bucket.query(N1qlQuery.parameterized(
+ deleteQuery,
+ JsonArray.from(docId),
+ N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism)
+ ));
+
+ if (!queryResult.parseSuccess() || !queryResult.finalSuccess()) {
+ throw new DBException("Error while parsing N1QL Result. Query: " + deleteQuery
+ + ", Errors: " + queryResult.errors());
+ }
+ return Status.OK;
+ }
+
+ @Override
+ public Status scan(final String table, final String startkey, final int recordcount, final Set<String> fields,
+ final Vector<HashMap<String, ByteIterator>> result) {
+ try {
+ if (fields == null || fields.isEmpty()) {
+ return scanAllFields(table, startkey, recordcount, result);
+ } else {
+ return scanSpecificFields(table, startkey, recordcount, fields, result);
+ }
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ return Status.ERROR;
+ }
+ }
+
+ /**
+ * Performs the {@link #scan(String, String, int, Set, Vector)} operation, optimized for all fields.
+ *
+ * Since the full document bodies need to be loaded anyways, it makes sense to just grab the document IDs
+ * from N1QL and then perform the bulk loading via KV for better performance. This is a usual pattern with
+ * Couchbase and shows the benefits of using both N1QL and KV together.
+ *
+ * @param table The name of the table
+ * @param startkey The record key of the first record to read.
+ * @param recordcount The number of records to read
+ * @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record
+ * @return The result of the operation.
+ */
+ private Status scanAllFields(final String table, final String startkey, final int recordcount,
+ final Vector<HashMap<String, ByteIterator>> result) {
+ final List<HashMap<String, ByteIterator>> data = new ArrayList<HashMap<String, ByteIterator>>(recordcount);
+
+ bucket.async()
+ .query(N1qlQuery.parameterized(
+ scanAllQuery,
+ JsonArray.from(formatId(table, startkey), recordcount),
+ N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism)
+ ))
+ .doOnNext(new Action1<AsyncN1qlQueryResult>() {
+ @Override
+ public void call(AsyncN1qlQueryResult result) {
+ if (!result.parseSuccess()) {
+ throw new RuntimeException("Error while parsing N1QL Result. Query: " + scanAllQuery
+ + ", Errors: " + result.errors());
+ }
+ }
+ })
+ .flatMap(new Func1<AsyncN1qlQueryResult, Observable<AsyncN1qlQueryRow>>() {
+ @Override
+ public Observable call(AsyncN1qlQueryResult result) {
+ return result.rows();
+ }
+ })
+ .flatMap(new Func1<AsyncN1qlQueryRow, Observable<RawJsonDocument>>() {
+ @Override
+ public Observable call(AsyncN1qlQueryRow row) {
+ String id = new String(row.byteValue());
+ return bucket.async().get(
+ id.substring(id.indexOf(table + SEPARATOR), id.lastIndexOf('"')),
+ RawJsonDocument.class
+ );
+ }
+ })
+ .map(new Func1<RawJsonDocument, HashMap<String, ByteIterator>>() {
+ @Override
+ public HashMap<String, ByteIterator> call(RawJsonDocument document) {
+ HashMap<String, ByteIterator> tuple = new HashMap<String, ByteIterator>();
+ decode(document.content(), null, tuple);
+ return tuple;
+ }
+ })
+ .toBlocking()
+ .forEach(new Action1<HashMap<String, ByteIterator>>() {
+ @Override
+ public void call(HashMap<String, ByteIterator> tuple) {
+ data.add(tuple);
+ }
+ });
+
+ result.addAll(data);
+ return Status.OK;
+ }
+
+ /**
+ * Performs the {@link #scan(String, String, int, Set, Vector)} operation via N1QL, only for a subset of the fields.
+ *
+ * @param table The name of the table
+ * @param startkey The record key of the first record to read.
+ * @param recordcount The number of records to read
+ * @param fields The list of fields to read, or null for all of them
+ * @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record
+ * @return The result of the operation.
+ */
+ private Status scanSpecificFields(final String table, final String startkey, final int recordcount,
+ final Set<String> fields, final Vector<HashMap<String, ByteIterator>> result) {
+ String scanSpecQuery = "SELECT " + joinFields(fields) + " FROM `" + bucketName
+ + "` WHERE meta().id >= '$1' LIMIT $2";
+ N1qlQueryResult queryResult = bucket.query(N1qlQuery.parameterized(
+ scanSpecQuery,
+ JsonArray.from(formatId(table, startkey), recordcount),
+ N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism)
+ ));
+
+ if (!queryResult.parseSuccess() || !queryResult.finalSuccess()) {
+ throw new RuntimeException("Error while parsing N1QL Result. Query: " + scanSpecQuery
+ + ", Errors: " + queryResult.errors());
+ }
+
+ boolean allFields = fields == null || fields.isEmpty();
+ result.ensureCapacity(recordcount);
+
+ for (N1qlQueryRow row : queryResult) {
+ JsonObject value = row.value();
+ if (allFields) {
+ value = value.getObject(bucketName);
+ }
+ Set<String> f = allFields ? value.getNames() : fields;
+ HashMap<String, ByteIterator> tuple = new HashMap<String, ByteIterator>(f.size());
+ for (String field : f) {
+ tuple.put(field, new StringByteIterator(value.getString(field)));
+ }
+ result.add(tuple);
+ }
+ return Status.OK;
+ }
+
+ /**
+ * Helper method to block on the response, depending on the property set.
+ *
+ * By default, since YCSB is synchronous, the code will always wait for the operation to complete. In some
+ * cases it can be useful to just "drive load" and disable the waiting. Note that when the
+ * "-p couchbase.syncMutationResponse=false" option is used, the results measured by YCSB can basically
+ * be thrown away. Still helpful sometimes during load phases to speed them up :)
+ *
+ * @param input the async input observable.
+ */
+ private void waitForMutationResponse(final Observable<? extends Document<?>> input) {
+ if (!syncMutResponse) {
+ input.subscribe(new Subscriber<Document<?>>() {
+ @Override
+ public void onCompleted() {
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ }
+
+ @Override
+ public void onNext(Document<?> document) {
+ }
+ });
+ } else {
+ Blocking.blockForSingle(input, kvTimeout, TimeUnit.MILLISECONDS);
+ }
+ }
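For illustration, a mutation path would hand its async result to this helper roughly like so; `docId` and `content` are hypothetical locals, not names taken from this patch:

```
// Illustrative call site: whether this blocks is decided by
// couchbase.syncMutationResponse via waitForMutationResponse().
Observable<RawJsonDocument> mutation =
    bucket.async().upsert(RawJsonDocument.create(docId, content));
waitForMutationResponse(mutation);
```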
+
+ /**
+ * Helper method to turn the values into a String, used with {@link #upsertN1ql(String, HashMap)}.
+ *
+ * @param values the values to encode.
+ * @return the encoded string.
+ */
+ private static String encodeN1qlFields(final HashMap<String, ByteIterator> values) {
+ if (values.isEmpty()) {
+ return "";
+ }
+
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
+ String raw = entry.getValue().toString();
+ String escaped = raw.replace("\"", "\\\"").replace("\'", "\\\'");
+ sb.append(entry.getKey()).append("=\"").append(escaped).append("\" ");
+ }
+ String toReturn = sb.toString();
+ return toReturn.substring(0, toReturn.length() - 1);
+ }
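Concretely, the helper renders a map into a space-separated list of `key="value"` pairs suitable for splicing into a N1QL `UPDATE ... SET` clause. An illustrative input/output pair:

```
// Input map (conceptually):  field0 -> say "hi",  field1 -> plain
// encodeN1qlFields(...) returns:
//   field0="say \"hi\"" field1="plain"
```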
+
+ /**
+ * Helper method to turn the map of values into a {@link JsonObject} for further use.
+ *
+ * @param values the values to transform.
+ * @return the created json object.
+ */
+ private static JsonObject valuesToJsonObject(final HashMap<String, ByteIterator> values) {
+ JsonObject result = JsonObject.create();
+ for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
+ result.put(entry.getKey(), entry.getValue().toString());
+ }
+ return result;
+ }
+
+ /**
+ * Helper method to join the set of fields into a String suitable for N1QL.
+ *
+ * @param fields the fields to join.
+ * @return the joined fields as a String.
+ */
+ private static String joinFields(final Set<String> fields) {
+ if (fields == null || fields.isEmpty()) {
+ return "*";
+ }
+ StringBuilder builder = new StringBuilder();
+ for (String f : fields) {
+ builder.append("`").append(f).append("`").append(",");
+ }
+ String toReturn = builder.toString();
+ return toReturn.substring(0, toReturn.length() - 1);
+ }
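For instance (illustrative values):

```
// joinFields({"field0", "field1"})  ->  `field0`,`field1`
// joinFields(null) or an empty set  ->  *
```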
+
+ /**
+ * Helper method to turn the prefix and key into a proper document ID.
+ *
+ * @param prefix the prefix (table).
+ * @param key the key itself.
+ * @return a document ID that can be used with Couchbase.
+ */
+ private static String formatId(final String prefix, final String key) {
+ return prefix + SEPARATOR + key;
+ }
+
+ /**
+ * Helper method to parse the "ReplicateTo" property on startup.
+ *
+ * @param property the property to parse.
+ * @return the parsed setting.
+ */
+ private static ReplicateTo parseReplicateTo(final String property) throws DBException {
+ int value = Integer.parseInt(property);
+
+ switch (value) {
+ case 0:
+ return ReplicateTo.NONE;
+ case 1:
+ return ReplicateTo.ONE;
+ case 2:
+ return ReplicateTo.TWO;
+ case 3:
+ return ReplicateTo.THREE;
+ default:
+ throw new DBException("\"couchbase.replicateTo\" must be between 0 and 3");
+ }
+ }
+
+ /**
+ * Helper method to parse the "PersistTo" property on startup.
+ *
+ * @param property the property to parse.
+ * @return the parsed setting.
+ */
+ private static PersistTo parsePersistTo(final String property) throws DBException {
+ int value = Integer.parseInt(property);
+
+ switch (value) {
+ case 0:
+ return PersistTo.NONE;
+ case 1:
+ return PersistTo.ONE;
+ case 2:
+ return PersistTo.TWO;
+ case 3:
+ return PersistTo.THREE;
+ case 4:
+ return PersistTo.FOUR;
+ default:
+ throw new DBException("\"couchbase.persistTo\" must be between 0 and 4");
+ }
+ }
+
+ /**
+ * Decode the String from the server and transfer the fields into the destination map.
+ *
+ * @param source the loaded object.
+ * @param fields the fields to check.
+ * @param dest the result passed back to YCSB.
+ */
+ private void decode(final String source, final Set<String> fields,
+ final HashMap<String, ByteIterator> dest) {
+ try {
+ JsonNode json = JacksonTransformers.MAPPER.readTree(source);
+ boolean checkFields = fields != null && !fields.isEmpty();
+ for (Iterator<Map.Entry<String, JsonNode>> jsonFields = json.fields(); jsonFields.hasNext();) {
+ Map.Entry<String, JsonNode> jsonField = jsonFields.next();
+ String name = jsonField.getKey();
+ if (checkFields && !fields.contains(name)) {
+ continue;
+ }
+ JsonNode jsonValue = jsonField.getValue();
+ if (jsonValue != null && !jsonValue.isNull()) {
+ dest.put(name, new StringByteIterator(jsonValue.asText()));
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Could not decode JSON");
+ }
+ }
+
+ /**
+ * Encode the source into a String for storage.
+ *
+ * @param source the source value.
+ * @return the encoded string.
+ */
+ private String encode(final HashMap<String, ByteIterator> source) {
+ HashMap<String, String> stringMap = StringByteIterator.getStringMap(source);
+ ObjectNode node = JacksonTransformers.MAPPER.createObjectNode();
+ for (Map.Entry<String, String> pair : stringMap.entrySet()) {
+ node.put(pair.getKey(), pair.getValue());
+ }
+ JsonFactory jsonFactory = new JsonFactory();
+ Writer writer = new StringWriter();
+ try {
+ JsonGenerator jsonGenerator = jsonFactory.createGenerator(writer);
+ JacksonTransformers.MAPPER.writeTree(jsonGenerator, node);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not encode JSON value");
+ }
+ return writer.toString();
+ }
+}
+
+/**
+ * Factory for the {@link BackoffSelectStrategy} to be used with boosting.
+ */
+class BackoffSelectStrategyFactory implements SelectStrategyFactory {
+ @Override
+ public SelectStrategy newSelectStrategy() {
+ return new BackoffSelectStrategy();
+ }
+}
+
+/**
+ * Custom IO select strategy which trades CPU for throughput, used with the boost setting.
+ */
+class BackoffSelectStrategy implements SelectStrategy {
+
+ private int counter = 0;
+
+ @Override
+ public int calculateStrategy(final IntSupplier supplier, final boolean hasTasks) throws Exception {
+ int selectNowResult = supplier.get();
+ if (hasTasks || selectNowResult != 0) {
+ counter = 0;
+ return selectNowResult;
+ }
+ counter++;
+
+ if (counter > 5000) {
+ // defer to the blocking select once the busy-spin phases are exhausted
+ counter = 0;
+ return SelectStrategy.SELECT;
+ } else if (counter > 4000) {
+ LockSupport.parkNanos(1000);
+ } else if (counter > 3000) {
+ Thread.yield();
+ } else if (counter > 2000) {
+ LockSupport.parkNanos(1);
+ }
+
+ return SelectStrategy.CONTINUE;
+ }
+}
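This hunk does not show where the factory is installed. A plausible wiring sketch, assuming the Netty 4.1 `NioEventLoopGroup` constructor overload that accepts a `SelectStrategyFactory`, would be:

```
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.nio.channels.spi.SelectorProvider;

// Hypothetical wiring, not taken from this patch: build an IO pool that uses
// the backoff strategy, to be handed to the Couchbase environment builder.
EventLoopGroup ioPool = new NioEventLoopGroup(
    Runtime.getRuntime().availableProcessors(),  // IO thread count
    new DefaultThreadFactory("cb-io", true),     // daemon IO threads
    SelectorProvider.provider(),
    new BackoffSelectStrategyFactory());
```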
diff --git a/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/package-info.java b/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/package-info.java
new file mode 100644
index 00000000..0eb3b399
--- /dev/null
+++ b/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2015 - 2016 YCSB contributors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+/**
+ * The YCSB binding for Couchbase, new driver.
+ */
+package com.yahoo.ycsb.db.couchbase2;
+
diff --git a/distribution/pom.xml b/distribution/pom.xml
index a6f21943..d3888c9e 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -1,197 +1,217 @@
4.0.0
com.yahoo.ycsb
root
- 0.9.0-SNAPSHOT
+ 0.10.0-SNAPSHOT
ycsb
YCSB Release Distribution Builder
pom
This module creates the release package of the YCSB with all DB library bindings.
It is only used by the build process and does not contain any real
code itself.
com.yahoo.ycsb
core
${project.version}
com.yahoo.ycsb
accumulo-binding
${project.version}
com.yahoo.ycsb
aerospike-binding
${project.version}
+
+ com.yahoo.ycsb
+ asynchbase-binding
+ ${project.version}
+
com.yahoo.ycsb
cassandra-binding
${project.version}
com.yahoo.ycsb
couchbase-binding
${project.version}
+
+ com.yahoo.ycsb
+ couchbase2-binding
+ ${project.version}
+
com.yahoo.ycsb
dynamodb-binding
${project.version}
com.yahoo.ycsb
elasticsearch-binding
${project.version}
com.yahoo.ycsb
geode-binding
${project.version}
com.yahoo.ycsb
googledatastore-binding
${project.version}
+
+ com.yahoo.ycsb
+ googlebigtable-binding
+ ${project.version}
+
com.yahoo.ycsb
hbase094-binding
${project.version}
com.yahoo.ycsb
hbase098-binding
${project.version}
com.yahoo.ycsb
hbase10-binding
${project.version}
com.yahoo.ycsb
hypertable-binding
${project.version}
com.yahoo.ycsb
infinispan-binding
${project.version}
com.yahoo.ycsb
jdbc-binding
${project.version}
com.yahoo.ycsb
kudu-binding
${project.version}
com.yahoo.ycsb
memcached-binding
${project.version}
com.yahoo.ycsb
mongodb-binding
${project.version}
com.yahoo.ycsb
nosqldb-binding
${project.version}
com.yahoo.ycsb
orientdb-binding
${project.version}
com.yahoo.ycsb
rados-binding
${project.version}
com.yahoo.ycsb
redis-binding
${project.version}
+
+ com.yahoo.ycsb
+ riak-binding
+ ${project.version}
+
com.yahoo.ycsb
s3-binding
${project.version}
com.yahoo.ycsb
solr-binding
${project.version}
com.yahoo.ycsb
tarantool-binding
${project.version}
org.apache.maven.plugins
maven-assembly-plugin
${maven.assembly.version}
src/main/assembly/distribution.xml
false
posix
package
single
diff --git a/dynamodb/pom.xml b/dynamodb/pom.xml
index 593371a6..3aa162c6 100644
--- a/dynamodb/pom.xml
+++ b/dynamodb/pom.xml
@@ -1,77 +1,77 @@
4.0.0
com.yahoo.ycsb
binding-parent
- 0.9.0-SNAPSHOT
+ 0.10.0-SNAPSHOT
../binding-parent
dynamodb-binding
DynamoDB DB Binding
false
com.amazonaws
aws-java-sdk
1.10.48
log4j
log4j
1.2.17
com.yahoo.ycsb
core
${project.version}
provided
org.apache.maven.plugins
maven-checkstyle-plugin
2.15
true
../checkstyle.xml
true
true
validate
validate
checkstyle
diff --git a/elasticsearch/README.md b/elasticsearch/README.md
index 344ea9c9..157ccec0 100644
--- a/elasticsearch/README.md
+++ b/elasticsearch/README.md
@@ -1,90 +1,79 @@
## Quick Start
This section describes how to run YCSB on Elasticsearch running locally.
### 1. Set Up YCSB
Clone the YCSB git repository and compile:
git clone git://github.com/brianfrankcooper/YCSB.git
cd YCSB
mvn clean package
### 2. Run YCSB
Now you are ready to run! First, load the data:
./bin/ycsb load elasticsearch -s -P workloads/workloada
Then, run the workload:
./bin/ycsb run elasticsearch -s -P workloads/workloada
For further configuration see below:
### Defaults Configuration
The default settings for the Elasticsearch node that is created are as follows:
cluster.name=es.ycsb.cluster
- node.local=true
- path.data=$TEMP_DIR/esdata
- discovery.zen.ping.multicast.enabled=false
- index.mapping._id.indexed=true
- index.gateway.type=none
- gateway.type=none
- index.number_of_shards=1
- index.number_of_replicas=0
es.index.key=es.ycsb
+ es.number_of_shards=1
+ es.number_of_replicas=0
+ es.remote=false
+ es.newdb=false
+ es.hosts.list=localhost:9300 (only applies if es.remote=true)
### Custom Configuration
If you wish to customize the settings used to create the Elasticsearch node
you can create a new property file that contains your desired Elasticsearch
node settings and pass it in via the -P parameter to the 'bin/ycsb' script. Note that
the default properties will be kept if you don't explicitly overwrite them.
Assuming that we have a properties file named "myproperties.data" that contains
custom Elasticsearch node configuration you can execute the following to
pass it into the Elasticsearch client:
./bin/ycsb run elasticsearch -P workloads/workloada -P myproperties.data -s
-
-If you wish to use a in-memory store type rather than the default disk store add
-the following properties to your custom properties file. For a large number of
-insert operations insure that you have sufficient memory on your test system
-otherwise you will run out of memory.
-
- index.store.type=memory
- index.store.fs.memory.enabled=true
- cache.memory.small_buffer_size=4mb
- cache.memory.large_cache_size=1024mb
-
If you wish to change the default index name you can set the following property:
es.index.key=my_index_key
-### Troubleshoot
-If you encounter error messages such as :
-"Primary shard is not active or isn't assigned is a known node."
+If you wish to run against a remote cluster you can set the following property:
+
+ es.remote=true
+
+By default this will use localhost:9300 as a seed node to discover the cluster.
+You can also specify
-Try removing /tmp/esdata/ folder.
- rm -rf /tmp/esdata
+ es.hosts.list=(\w+:\d+)+
+(a comma-separated list of host/port pairs) to change this.
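For example, a run against a two-node remote cluster might look like this (host names are illustrative):

```
./bin/ycsb run elasticsearch -s -P workloads/workloada -p es.remote=true -p es.hosts.list=node1.example.com:9300,node2.example.com:9300
```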
diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml
index f285dab2..90598be7 100644
--- a/elasticsearch/pom.xml
+++ b/elasticsearch/pom.xml
@@ -1,60 +1,60 @@
4.0.0
com.yahoo.ycsb
binding-parent
- 0.9.0-SNAPSHOT
+ 0.10.0-SNAPSHOT
../binding-parent
elasticsearch-binding
Elasticsearch Binding
jar
- 2.2.0
+ 2.3.2
net.java.dev.jna
jna
4.1.0
com.yahoo.ycsb
core
${project.version}
provided
org.elasticsearch
elasticsearch
${elasticsearch-version}
org.testng
testng
6.1.1
test
diff --git a/elasticsearch/src/main/java/com/yahoo/ycsb/db/ElasticsearchClient.java b/elasticsearch/src/main/java/com/yahoo/ycsb/db/ElasticsearchClient.java
index 1d79e3c2..6a95d9ce 100644
--- a/elasticsearch/src/main/java/com/yahoo/ycsb/db/ElasticsearchClient.java
+++ b/elasticsearch/src/main/java/com/yahoo/ycsb/db/ElasticsearchClient.java
@@ -1,362 +1,360 @@
/**
* Copyright (c) 2012 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db;
import static org.elasticsearch.common.settings.Settings.Builder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
-
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.StringByteIterator;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.node.Node;
import org.elasticsearch.search.SearchHit;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
/**
* Elasticsearch client for YCSB framework.
*
*
* Default properties to set:
*
*
- * es.cluster.name = es.ycsb.cluster
- * es.client = true
+ * cluster.name = es.ycsb.cluster
* es.index.key = es.ycsb
+ * es.number_of_shards = 1
+ * es.number_of_replicas = 0
*
- *
- * @author Sharmarke Aden
- *
*/
public class ElasticsearchClient extends DB {
- public static final String DEFAULT_CLUSTER_NAME = "es.ycsb.cluster";
- public static final String DEFAULT_INDEX_KEY = "es.ycsb";
- public static final String DEFAULT_REMOTE_HOST = "localhost:9300";
+ private static final String DEFAULT_CLUSTER_NAME = "es.ycsb.cluster";
+ private static final String DEFAULT_INDEX_KEY = "es.ycsb";
+ private static final String DEFAULT_REMOTE_HOST = "localhost:9300";
+ private static final int NUMBER_OF_SHARDS = 1;
+ private static final int NUMBER_OF_REPLICAS = 0;
private Node node;
private Client client;
private String indexKey;
private Boolean remoteMode;
/**
* Initialize any state for this DB. Called once per DB instance; there is one
* DB instance per client thread.
*/
@Override
public void init() throws DBException {
Properties props = getProperties();
this.indexKey = props.getProperty("es.index.key", DEFAULT_INDEX_KEY);
- String clusterName =
- props.getProperty("cluster.name", DEFAULT_CLUSTER_NAME);
+
+ int numberOfShards = parseIntegerProperty(props, "es.number_of_shards", NUMBER_OF_SHARDS);
+ int numberOfReplicas = parseIntegerProperty(props, "es.number_of_replicas", NUMBER_OF_REPLICAS);
+
// Check if transport client needs to be used (To connect to multiple
// elasticsearch nodes)
- remoteMode = Boolean
- .parseBoolean(props.getProperty("elasticsearch.remote", "false"));
- Boolean newdb =
- Boolean.parseBoolean(props.getProperty("elasticsearch.newdb", "false"));
+ remoteMode = Boolean.parseBoolean(props.getProperty("es.remote", "false"));
+ Boolean newdb = Boolean.parseBoolean(props.getProperty("es.newdb", "false"));
Builder settings = Settings.settingsBuilder()
- .put("node.local", "true")
- .put("path.data", System.getProperty("java.io.tmpdir") + "/esdata")
- .put("discovery.zen.ping.multicast.enabled", "false")
- .put("index.mapping._id.indexed", "true")
- .put("index.gateway.type", "none")
- .put("index.number_of_shards", "1")
- .put("index.number_of_replicas", "0")
+ .put("cluster.name", DEFAULT_CLUSTER_NAME)
+ .put("node.local", Boolean.toString(!remoteMode))
.put("path.home", System.getProperty("java.io.tmpdir"));
// if properties file contains elasticsearch user defined properties
// add it to the settings file (will overwrite the defaults).
settings.put(props);
- System.out.println(
- "Elasticsearch starting node = " + settings.get("cluster.name"));
- System.out
- .println("Elasticsearch node data path = " + settings.get("path.data"));
- System.out.println("Elasticsearch Remote Mode = " + remoteMode);
+ final String clusterName = settings.get("cluster.name");
+ System.err.println("Elasticsearch starting node = " + clusterName);
+ System.err.println("Elasticsearch node path.home = " + settings.get("path.home"));
+ System.err.println("Elasticsearch Remote Mode = " + remoteMode);
// Remote mode support for connecting to remote elasticsearch cluster
if (remoteMode) {
settings.put("client.transport.sniff", true)
.put("client.transport.ignore_cluster_name", false)
.put("client.transport.ping_timeout", "30s")
.put("client.transport.nodes_sampler_interval", "30s");
// Default it to localhost:9300
- String[] nodeList =
- props.getProperty("elasticsearch.hosts.list", DEFAULT_REMOTE_HOST)
- .split(",");
- System.out.println("Elasticsearch Remote Hosts = "
- + props.getProperty("elasticsearch.hosts.list", DEFAULT_REMOTE_HOST));
- TransportClient tClient = TransportClient.builder()
- .settings(settings).build();
+ String[] nodeList = props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST).split(",");
+ System.out.println("Elasticsearch Remote Hosts = " + props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST));
+ TransportClient tClient = TransportClient.builder().settings(settings).build();
for (String h : nodeList) {
String[] nodes = h.split(":");
try {
tClient.addTransportAddress(new InetSocketTransportAddress(
InetAddress.getByName(nodes[0]),
Integer.parseInt(nodes[1])
));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Unable to parse port number.", e);
} catch (UnknownHostException e) {
throw new IllegalArgumentException("Unable to Identify host.", e);
}
}
client = tClient;
} else { // Start node only if transport client mode is disabled
node = nodeBuilder().clusterName(clusterName).settings(settings).node();
node.start();
client = node.client();
}
- //wait for shards to be ready
- client.admin().cluster()
- .health(new ClusterHealthRequest("lists").waitForActiveShards(1))
- .actionGet();
- if (newdb) {
+ final boolean exists =
+ client.admin().indices()
+ .exists(Requests.indicesExistsRequest(indexKey)).actionGet()
+ .isExists();
+ if (exists && newdb) {
client.admin().indices().prepareDelete(indexKey).execute().actionGet();
- client.admin().indices().prepareCreate(indexKey).execute().actionGet();
- } else {
- boolean exists = client.admin().indices()
- .exists(Requests.indicesExistsRequest(indexKey)).actionGet()
- .isExists();
- if (!exists) {
- client.admin().indices().prepareCreate(indexKey).execute().actionGet();
- }
}
+ if (!exists || newdb) {
+ client.admin().indices().create(
+ new CreateIndexRequest(indexKey)
+ .settings(
+ Settings.builder()
+ .put("index.number_of_shards", numberOfShards)
+ .put("index.number_of_replicas", numberOfReplicas)
+ .put("index.mapping._id.indexed", true)
+ )).actionGet();
+ }
+ client.admin().cluster().health(new ClusterHealthRequest().waitForGreenStatus()).actionGet();
+ }
+
+ private int parseIntegerProperty(Properties properties, String key, int defaultValue) {
+ String value = properties.getProperty(key);
+ return value == null ? defaultValue : Integer.parseInt(value);
}
@Override
public void cleanup() throws DBException {
if (!remoteMode) {
if (!node.isClosed()) {
client.close();
node.close();
}
} else {
client.close();
}
}
/**
* Insert a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key.
*
* @param table
* The name of the table
* @param key
* The record key of the record to insert.
* @param values
* A HashMap of field/value pairs to insert in the record
* @return Zero on success, a non-zero error code on error. See this class's
* description for a discussion of error codes.
*/
@Override
- public Status insert(String table, String key,
- HashMap<String, ByteIterator> values) {
+ public Status insert(String table, String key, HashMap<String, ByteIterator> values) {
try {
final XContentBuilder doc = jsonBuilder().startObject();
- for (Entry<String, String> entry : StringByteIterator.getStringMap(values)
- .entrySet()) {
+ for (Entry<String, String> entry : StringByteIterator.getStringMap(values).entrySet()) {
doc.field(entry.getKey(), entry.getValue());
}
doc.endObject();
- client.prepareIndex(indexKey, table, key).setSource(doc).execute()
- .actionGet();
+ client.prepareIndex(indexKey, table, key).setSource(doc).execute().actionGet();
return Status.OK;
} catch (Exception e) {
e.printStackTrace();
+ return Status.ERROR;
}
- return Status.ERROR;
}
/**
* Delete a record from the database.
*
* @param table
* The name of the table
* @param key
* The record key of the record to delete.
* @return Zero on success, a non-zero error code on error. See this class's
* description for a discussion of error codes.
*/
@Override
public Status delete(String table, String key) {
try {
- client.prepareDelete(indexKey, table, key).execute().actionGet();
- return Status.OK;
+ DeleteResponse response = client.prepareDelete(indexKey, table, key).execute().actionGet();
+ if (response.isFound()) {
+ return Status.OK;
+ } else {
+ return Status.NOT_FOUND;
+ }
} catch (Exception e) {
e.printStackTrace();
+ return Status.ERROR;
}
- return Status.ERROR;
}
/**
* Read a record from the database. Each field/value pair from the result will
* be stored in a HashMap.
*
* @param table
* The name of the table
* @param key
* The record key of the record to read.
* @param fields
* The list of fields to read, or null for all of them
* @param result
* A HashMap of field/value pairs for the result
* @return Zero on success, a non-zero error code on error or "not found".
*/
@Override
- public Status read(String table, String key, Set<String> fields,
- HashMap<String, ByteIterator> result) {
+ public Status read(String table, String key, Set<String> fields, HashMap<String, ByteIterator> result) {
try {
- final GetResponse response =
- client.prepareGet(indexKey, table, key).execute().actionGet();
+ final GetResponse response = client.prepareGet(indexKey, table, key).execute().actionGet();
if (response.isExists()) {
if (fields != null) {
for (String field : fields) {
result.put(field, new StringByteIterator(
(String) response.getSource().get(field)));
}
} else {
for (String field : response.getSource().keySet()) {
result.put(field, new StringByteIterator(
(String) response.getSource().get(field)));
}
}
return Status.OK;
+ } else {
+ return Status.NOT_FOUND;
}
} catch (Exception e) {
e.printStackTrace();
+ return Status.ERROR;
}
- return Status.ERROR;
}
/**
* Update a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key, overwriting any existing values with the same field name.
*
* @param table
* The name of the table
* @param key
* The record key of the record to write.
* @param values
* A HashMap of field/value pairs to update in the record
* @return Zero on success, a non-zero error code on error. See this class's
* description for a discussion of error codes.
*/
@Override
- public Status update(String table, String key,
- HashMap<String, ByteIterator> values) {
+ public Status update(String table, String key, HashMap<String, ByteIterator> values) {
try {
- final GetResponse response =
- client.prepareGet(indexKey, table, key).execute().actionGet();
+ final GetResponse response = client.prepareGet(indexKey, table, key).execute().actionGet();
if (response.isExists()) {
- for (Entry<String, String> entry : StringByteIterator
- .getStringMap(values).entrySet()) {
+ for (Entry<String, String> entry : StringByteIterator.getStringMap(values).entrySet()) {
response.getSource().put(entry.getKey(), entry.getValue());
}
- client.prepareIndex(indexKey, table, key)
- .setSource(response.getSource()).execute().actionGet();
+ client.prepareIndex(indexKey, table, key).setSource(response.getSource()).execute().actionGet();
return Status.OK;
+ } else {
+ return Status.NOT_FOUND;
}
-
} catch (Exception e) {
e.printStackTrace();
+ return Status.ERROR;
}
- return Status.ERROR;
}
/**
* Perform a range scan for a set of records in the database. Each field/value
* pair from the result will be stored in a HashMap.
*
* @param table
* The name of the table
* @param startkey
* The record key of the first record to read.
* @param recordcount
* The number of records to read
* @param fields
* The list of fields to read, or null for all of them
* @param result
* A Vector of HashMaps, where each HashMap is a set field/value
* pairs for one record
* @return Zero on success, a non-zero error code on error. See this class's
* description for a discussion of error codes.
*/
@Override
- public Status scan(String table, String startkey, int recordcount,
- Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
+ public Status scan(
+ String table,
+ String startkey,
+ int recordcount,
+ Set<String> fields,
+ Vector<HashMap<String, ByteIterator>> result) {
try {
final RangeQueryBuilder rangeQuery = rangeQuery("_id").gte(startkey);
final SearchResponse response = client.prepareSearch(indexKey)
.setTypes(table)
.setQuery(rangeQuery)
.setSize(recordcount)
.execute()
.actionGet();
HashMap<String, ByteIterator> entry;
for (SearchHit hit : response.getHits()) {
- entry = new HashMap<String, ByteIterator>(fields.size());
-
+ entry = new HashMap<>(fields.size());
for (String field : fields) {
- entry.put(field,
- new StringByteIterator((String) hit.getSource().get(field)));
+ entry.put(field, new StringByteIterator((String) hit.getSource().get(field)));
}
-
result.add(entry);
}
return Status.OK;
} catch (Exception e) {
e.printStackTrace();
+ return Status.ERROR;
}
- return Status.ERROR;
}
}
diff --git a/elasticsearch/src/test/java/com/yahoo/ycsb/db/ElasticsearchClientTest.java b/elasticsearch/src/test/java/com/yahoo/ycsb/db/ElasticsearchClientTest.java
index 1a80cca3..69e52ff6 100644
--- a/elasticsearch/src/test/java/com/yahoo/ycsb/db/ElasticsearchClientTest.java
+++ b/elasticsearch/src/test/java/com/yahoo/ycsb/db/ElasticsearchClientTest.java
@@ -1,153 +1,149 @@
/**
* Copyright (c) 2012 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package com.yahoo.ycsb.db;
import static org.testng.AssertJUnit.assertEquals;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.StringByteIterator;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.HashMap;
import java.util.Set;
import java.util.Vector;
-/**
- *
- * @author saden
- */
public class ElasticsearchClientTest {
protected final static ElasticsearchClient instance = new ElasticsearchClient();
protected final static HashMap<String, ByteIterator> MOCK_DATA;
protected final static String MOCK_TABLE = "MOCK_TABLE";
protected final static String MOCK_KEY0 = "0";
protected final static String MOCK_KEY1 = "1";
protected final static String MOCK_KEY2 = "2";
static {
MOCK_DATA = new HashMap<String, ByteIterator>(10);
for (int i = 1; i <= 10; i++) {
MOCK_DATA.put("field" + i, new StringByteIterator("value" + i));
}
}
@BeforeClass
public static void setUpClass() throws DBException {
instance.init();
}
@AfterClass
public static void tearDownClass() throws DBException {
instance.cleanup();
}
@BeforeMethod
public void setUp() {
instance.insert(MOCK_TABLE, MOCK_KEY1, MOCK_DATA);
instance.insert(MOCK_TABLE, MOCK_KEY2, MOCK_DATA);
}
@AfterMethod
public void tearDown() {
instance.delete(MOCK_TABLE, MOCK_KEY1);
instance.delete(MOCK_TABLE, MOCK_KEY2);
}
/**
* Test of insert method, of class ElasticsearchClient.
*/
@Test
public void testInsert() {
System.out.println("insert");
Status result = instance.insert(MOCK_TABLE, MOCK_KEY0, MOCK_DATA);
assertEquals(Status.OK, result);
}
/**
* Test of delete method, of class ElasticsearchClient.
*/
@Test
public void testDelete() {
System.out.println("delete");
Status result = instance.delete(MOCK_TABLE, MOCK_KEY1);
assertEquals(Status.OK, result);
}
/**
* Test of read method, of class ElasticsearchClient.
*/
@Test
public void testRead() {
System.out.println("read");
Set<String> fields = MOCK_DATA.keySet();
HashMap<String, ByteIterator> resultParam = new HashMap<String, ByteIterator>(10);
Status result = instance.read(MOCK_TABLE, MOCK_KEY1, fields, resultParam);
assertEquals(Status.OK, result);
}
/**
* Test of update method, of class ElasticsearchClient.
*/
@Test
public void testUpdate() {
System.out.println("update");
int i;
HashMap<String, ByteIterator> newValues = new HashMap<String, ByteIterator>(10);
for (i = 1; i <= 10; i++) {
newValues.put("field" + i, new StringByteIterator("newvalue" + i));
}
Status result = instance.update(MOCK_TABLE, MOCK_KEY1, newValues);
assertEquals(Status.OK, result);
//validate that the values changed
HashMap<String, ByteIterator> resultParam = new HashMap<String, ByteIterator>(10);
instance.read(MOCK_TABLE, MOCK_KEY1, MOCK_DATA.keySet(), resultParam);
for (i = 1; i <= 10; i++) {
assertEquals("newvalue" + i, resultParam.get("field" + i).toString());
}
}
/**
* Test of scan method, of class ElasticsearchClient.
*/
@Test
public void testScan() {
System.out.println("scan");
int recordcount = 10;
Set<String> fields = MOCK_DATA.keySet();
Vector<HashMap<String, ByteIterator>> resultParam = new Vector<HashMap<String, ByteIterator>>(10);
Status result = instance.scan(MOCK_TABLE, MOCK_KEY1, recordcount, fields, resultParam);
assertEquals(Status.OK, result);
}
}
diff --git a/geode/pom.xml b/geode/pom.xml
index 40111413..2ac23979 100644
--- a/geode/pom.xml
+++ b/geode/pom.xml
@@ -1,74 +1,74 @@
4.0.0
com.yahoo.ycsb
binding-parent
- 0.9.0-SNAPSHOT
+ 0.10.0-SNAPSHOT
../binding-parent
geode-binding
Geode DB Binding
jar
false
org.apache.geode
- gemfire-core
+ geode-core
${geode.version}
com.yahoo.ycsb
core
${project.version}
provided
org.apache.maven.plugins
maven-checkstyle-plugin
2.15
true
../checkstyle.xml
true
true
validate
validate
checkstyle
diff --git a/googlebigtable/README.md b/googlebigtable/README.md
new file mode 100644
index 00000000..3938b525
--- /dev/null
+++ b/googlebigtable/README.md
@@ -0,0 +1,80 @@
+
+
+# Google Bigtable Driver for YCSB
+
+This driver provides a YCSB workload binding for Google's hosted Bigtable, the inspiration for a number of key-value stores like HBase and Cassandra. The Bigtable Java client provides both Protobuf based GRPC and HBase client APIs. This binding implements the Protobuf API for testing the native client. To test Bigtable using the HBase API, see the `hbase10` binding.
+
+## Quickstart
+
+### 1. Setup a Bigtable Cluster
+
+Login to the Google Cloud Console and follow the [Creating Cluster](https://cloud.google.com/bigtable/docs/creating-cluster) steps. Make a note of your cluster name, zone and project ID.
+
+### 2. Launch the Bigtable Shell
+
+From the Cloud Console, launch a shell and follow the [Quickstart](https://cloud.google.com/bigtable/docs/quickstart) up to step 4 where you launch the HBase shell.
+
+### 3. Create a Table
+
+For best results, use the pre-splitting strategy recommended in [HBASE-4163](https://issues.apache.org/jira/browse/HBASE-4163):
+
+```
+hbase(main):001:0> n_splits = 200 # HBase recommends (10 * number of regionservers)
+hbase(main):002:0> create 'usertable', 'cf', {SPLITS => (1..n_splits).map {|i| "user#{1000+i*(9999-1000)/n_splits}"}}
+```
+
+Make a note of the column family; in this example it's `cf`.
+
+### 4. Fetch the Proper ALPN Boot Jar
+
+The Bigtable protocol uses HTTP/2 which requires an ALPN protocol negotiation implementation. On JVM instantiation the implementation must be loaded before attempting to connect to the cluster. If you're using Java 7 or 8, use this [Jetty Version Table](http://www.eclipse.org/jetty/documentation/current/alpn-chapter.html#alpn-versions) to determine the version appropriate for your JVM. (ALPN is included in JDK 9+). Download the proper jar from [Maven](http://search.maven.org/#search%7Cgav%7C1%7Cg%3A%22org.mortbay.jetty.alpn%22%20AND%20a%3A%22alpn-boot%22) and place it somewhere on your system.
+
+### 5. Download JSON Credentials
+
+Follow these instructions for [Generating a JSON key](https://cloud.google.com/bigtable/docs/installing-hbase-shell#service-account) and save it to your host.
+
+### 6. Load a Workload
+
+Switch to the root of the YCSB repo and choose the workload you want to run and `load` it first. With the CLI you must provide the column family, cluster properties and the ALPN jar to load.
+
+```
+bin/ycsb load googlebigtable -p columnfamily=cf -p google.bigtable.project.id=<project id> -p google.bigtable.cluster.name=<cluster name> -p google.bigtable.zone.name=<zone> -p google.bigtable.auth.service.account.enable=true -p google.bigtable.auth.json.keyfile=<path to json key> -jvm-args='-Xbootclasspath/p:<path to alpn jar>' -P workloads/workloada
+
+```
+
+Make sure to replace the variables in the angle brackets above with the proper value from your cluster. Additional configuration parameters are available below.
+
+The `load` step only executes inserts into the datastore. After loading data, run the same workload to mix reads with writes.
+
+```
+bin/ycsb run googlebigtable -p columnfamily=cf -p google.bigtable.project.id=<project id> -p google.bigtable.cluster.name=<cluster name> -p google.bigtable.zone.name=<zone> -p google.bigtable.auth.service.account.enable=true -p google.bigtable.auth.json.keyfile=<path to json key> -jvm-args='-Xbootclasspath/p:<path to alpn jar>' -P workloads/workloada
+
+```
+
+## Configuration Options
+
+The following options can be configured using CLI (using the `-p` parameter) or hbase-site.xml (add the HBase config directory to YCSB's class path via CLI). Check the [Cloud Bigtable Client](https://github.com/manolama/cloud-bigtable-client) project for additional tuning parameters.
+
+* `columnfamily`: (Required) The Bigtable column family to target.
+* `google.bigtable.project.id`: (Required) The ID of a Bigtable project.
+* `google.bigtable.cluster.name`: (Required) The name of a Bigtable cluster.
+* `google.bigtable.zone.name`: (Required) Zone where the Bigtable cluster is running.
+* `google.bigtable.auth.service.account.enable`: Whether or not to authenticate with a service account. The default is true.
+* `google.bigtable.auth.json.keyfile`: (Required) A service account key for authentication.
+* `debug`: If true, prints debug information to standard out. The default is false.
+* `clientbuffering`: Whether or not to use client side buffering and batching of write operations. This can significantly improve load performance. The default is false.
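For example, to measure individual write latencies rather than buffered throughput, keep buffering disabled for the run phase; the `...` below stands for the connection flags shown in the Quickstart and is not a literal argument:

```
bin/ycsb run googlebigtable -p columnfamily=cf -p clientbuffering=false ... -P workloads/workloada
```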
diff --git a/hbase098/pom.xml b/googlebigtable/pom.xml
similarity index 74%
copy from hbase098/pom.xml
copy to googlebigtable/pom.xml
index 29600f7f..802cd6ee 100644
--- a/hbase098/pom.xml
+++ b/googlebigtable/pom.xml
@@ -1,48 +1,47 @@
4.0.0
com.yahoo.ycsb
binding-parent
- 0.9.0-SNAPSHOT
+ 0.10.0-SNAPSHOT
../binding-parent/
- hbase098-binding
- HBase 0.98.x DB Binding
-
-
- false
-
+ googlebigtable-binding
+ Google Cloud Bigtable Binding
+ jar
- org.apache.hbase
- hbase-client
- ${hbase098.version}
+ com.google.cloud.bigtable
+ bigtable-hbase-1.0
+ ${googlebigtable.version}
+
com.yahoo.ycsb
core
${project.version}
provided
+
-
+
\ No newline at end of file
diff --git a/googlebigtable/src/main/java/com/yahoo/ycsb/db/GoogleBigtableClient.java b/googlebigtable/src/main/java/com/yahoo/ycsb/db/GoogleBigtableClient.java
new file mode 100644
index 00000000..d0d21dda
--- /dev/null
+++ b/googlebigtable/src/main/java/com/yahoo/ycsb/db/GoogleBigtableClient.java
@@ -0,0 +1,445 @@
+/**
+ * Copyright (c) 2016 YCSB contributors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+package com.yahoo.ycsb.db;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.Set;
+import java.util.Vector;
+import java.util.concurrent.ExecutionException;
+
+import com.google.bigtable.repackaged.com.google.protobuf.ByteString;
+import com.google.bigtable.repackaged.com.google.protobuf.ServiceException;
+import com.google.bigtable.v1.Column;
+import com.google.bigtable.v1.Family;
+import com.google.bigtable.v1.MutateRowRequest;
+import com.google.bigtable.v1.Mutation;
+import com.google.bigtable.v1.ReadRowsRequest;
+import com.google.bigtable.v1.Row;
+import com.google.bigtable.v1.RowFilter;
+import com.google.bigtable.v1.RowRange;
+import com.google.bigtable.v1.Mutation.DeleteFromRow;
+import com.google.bigtable.v1.Mutation.SetCell;
+import com.google.bigtable.v1.RowFilter.Chain.Builder;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.grpc.BigtableDataClient;
+import com.google.cloud.bigtable.grpc.BigtableSession;
+import com.google.cloud.bigtable.grpc.async.AsyncExecutor;
+import com.google.cloud.bigtable.grpc.async.HeapSizeManager;
+import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
+import com.google.cloud.bigtable.util.ByteStringer;
+import com.yahoo.ycsb.ByteArrayByteIterator;
+import com.yahoo.ycsb.ByteIterator;
+import com.yahoo.ycsb.DBException;
+import com.yahoo.ycsb.Status;
+
+/**
+ * Google Bigtable Proto client for YCSB framework.
+ *
+ * Bigtable offers two APIs. These include a native Protobuf GRPC API as well as
+ * an HBase API wrapper for the GRPC API. This client implements the Protobuf
+ * API to test the underlying calls wrapped up in the HBase API. To use the
+ * HBase API, see the hbase10 client binding.
+ */
+public class GoogleBigtableClient extends com.yahoo.ycsb.DB {
+ public static final Charset UTF8_CHARSET = Charset.forName("UTF8");
+
+ /** Property names for the CLI. */
+ private static final String ASYNC_MUTATOR_MAX_MEMORY = "mutatorMaxMemory";
+ private static final String ASYNC_MAX_INFLIGHT_RPCS = "mutatorMaxInflightRPCs";
+ private static final String CLIENT_SIDE_BUFFERING = "clientbuffering";
+
+ /** Tracks running thread counts so we know when to close the session. */
+ private static int threadCount = 0;
+
+ /** This will load the hbase-site.xml config file and/or store CLI options. */
+ private static final Configuration CONFIG = HBaseConfiguration.create();
+
+ /** Print debug information to standard out. */
+ private boolean debug = false;
+
+ /** Global Bigtable native API objects. */
+ private static BigtableOptions options;
+ private static BigtableSession session;
+
+ /** Thread local Bigtable native API objects. */
+ private BigtableDataClient client;
+ private HeapSizeManager heapSizeManager;
+ private AsyncExecutor asyncExecutor;
+
+ /** The column family use for the workload. */
+ private byte[] columnFamilyBytes;
+
+ /** Cache for the last table name/ID to avoid byte conversions. */
+ private String lastTable = "";
+ private byte[] lastTableBytes;
+
+ /**
+ * If true, buffer mutations on the client. For measuring insert/update/delete
+ * latencies, client side buffering should be disabled.
+ */
+ private boolean clientSideBuffering = false;
+
+ @Override
+ public void init() throws DBException {
+ Properties props = getProperties();
+
+ // Defaults the user can override if needed
+ CONFIG.set("google.bigtable.auth.service.account.enable", "true");
+
+ // make it easy on ourselves by copying all CLI properties into the config object.
+ final Iterator<Entry<Object, Object>> it = props.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<Object, Object> entry = it.next();
+ CONFIG.set((String)entry.getKey(), (String)entry.getValue());
+ }
+
+ clientSideBuffering = getProperties()
+ .getProperty(CLIENT_SIDE_BUFFERING, "false").equals("true");
+
+ System.err.println("Running Google Bigtable with Proto API" +
+ (clientSideBuffering ? " and client side buffering." : "."));
+
+ synchronized (CONFIG) {
+ ++threadCount;
+ if (session == null) {
+ try {
+ options = BigtableOptionsFactory.fromConfiguration(CONFIG);
+ session = new BigtableSession(options);
+ // important to instantiate the first client here, otherwise the
+ // other threads may receive an NPE from the options when they try
+ // to read the cluster name.
+ client = session.getDataClient();
+ } catch (IOException e) {
+ throw new DBException("Error loading options from config: ", e);
+ }
+ } else {
+ client = session.getDataClient();
+ }
+
+ if (clientSideBuffering) {
+ heapSizeManager = new HeapSizeManager(
+ Long.parseLong(
+ getProperties().getProperty(ASYNC_MUTATOR_MAX_MEMORY,
+ Long.toString(AsyncExecutor.ASYNC_MUTATOR_MAX_MEMORY_DEFAULT))),
+ Integer.parseInt(
+ getProperties().getProperty(ASYNC_MAX_INFLIGHT_RPCS,
+ Integer.toString(AsyncExecutor.MAX_INFLIGHT_RPCS_DEFAULT))));
+ asyncExecutor = new AsyncExecutor(client, heapSizeManager);
+ }
+ }
+
+ if ((getProperties().getProperty("debug") != null)
+ && (getProperties().getProperty("debug").compareTo("true") == 0)) {
+ debug = true;
+ }
+
+ final String columnFamily = getProperties().getProperty("columnfamily");
+ if (columnFamily == null) {
+ System.err.println("Error, must specify a columnfamily for Bigtable table");
+ throw new DBException("No columnfamily specified");
+ }
+ columnFamilyBytes = Bytes.toBytes(columnFamily);
+ }
+
+ @Override
+ public void cleanup() throws DBException {
+ if (asyncExecutor != null) {
+ try {
+ asyncExecutor.flush();
+ } catch (IOException e) {
+ throw new DBException(e);
+ }
+ }
+ synchronized (CONFIG) {
+ --threadCount;
+ if (threadCount <= 0) {
+ try {
+ session.close();
+ } catch (IOException e) {
+ throw new DBException(e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public Status read(String table, String key, Set<String> fields,
+ HashMap<String, ByteIterator> result) {
+ if (debug) {
+ System.out.println("Doing read from Bigtable columnfamily "
+ + new String(columnFamilyBytes));
+ System.out.println("Doing read for key: " + key);
+ }
+
+ setTable(table);
+
+ RowFilter filter = RowFilter.newBuilder()
+ .setFamilyNameRegexFilterBytes(ByteStringer.wrap(columnFamilyBytes))
+ .build();
+ if (fields != null && fields.size() > 0) {
+ Builder filterChain = RowFilter.Chain.newBuilder();
+ filterChain.addFilters(filter);
+ filterChain.addFilters(RowFilter.newBuilder()
+ .setCellsPerColumnLimitFilter(1)
+ .build());
+ int count = 0;
+ // usually "field#" so pre-alloc
+ final StringBuilder regex = new StringBuilder(fields.size() * 6);
+ for (final String field : fields) {
+ if (count++ > 0) {
+ regex.append("|");
+ }
+ regex.append(field);
+ }
+ filterChain.addFilters(RowFilter.newBuilder()
+ .setColumnQualifierRegexFilter(
+ ByteStringer.wrap(regex.toString().getBytes()))).build();
+ filter = RowFilter.newBuilder().setChain(filterChain.build()).build();
+ }
+
+ final ReadRowsRequest.Builder rrr = ReadRowsRequest.newBuilder()
+ .setTableNameBytes(ByteStringer.wrap(lastTableBytes))
+ .setFilter(filter)
+ .setRowKey(ByteStringer.wrap(key.getBytes()));
+
+ List<Row> rows;
+ try {
+ rows = client.readRowsAsync(rrr.build()).get();
+ if (rows == null || rows.isEmpty()) {
+ return Status.NOT_FOUND;
+ }
+ for (final Row row : rows) {
+ for (final Family family : row.getFamiliesList()) {
+ if (Arrays.equals(family.getNameBytes().toByteArray(), columnFamilyBytes)) {
+ for (final Column column : family.getColumnsList()) {
+ // we should only have a single cell per column
+ result.put(column.getQualifier().toString(UTF8_CHARSET),
+ new ByteArrayByteIterator(column.getCells(0).getValue().toByteArray()));
+ if (debug) {
+ System.out.println(
+ "Result for field: " + column.getQualifier().toString(UTF8_CHARSET)
+ + " is: " + column.getCells(0).getValue().toString(UTF8_CHARSET));
+ }
+ }
+ }
+ }
+ }
+
+ return Status.OK;
+ } catch (InterruptedException e) {
+ System.err.println("Interrupted during get: " + e);
+ Thread.currentThread().interrupt();
+ return Status.ERROR;
+ } catch (ExecutionException e) {
+ System.err.println("Exception during get: " + e);
+ return Status.ERROR;
+ }
+ }
+
+ @Override
+ public Status scan(String table, String startkey, int recordcount,
+ Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
+ setTable(table);
+
+ RowFilter filter = RowFilter.newBuilder()
+ .setFamilyNameRegexFilterBytes(ByteStringer.wrap(columnFamilyBytes))
+ .build();
+ if (fields != null && fields.size() > 0) {
+ Builder filterChain = RowFilter.Chain.newBuilder();
+ filterChain.addFilters(filter);
+ filterChain.addFilters(RowFilter.newBuilder()
+ .setCellsPerColumnLimitFilter(1)
+ .build());
+ int count = 0;
+ // usually "field#" so pre-alloc
+ final StringBuilder regex = new StringBuilder(fields.size() * 6);
+ for (final String field : fields) {
+ if (count++ > 0) {
+ regex.append("|");
+ }
+ regex.append(field);
+ }
+ filterChain.addFilters(RowFilter.newBuilder()
+ .setColumnQualifierRegexFilter(
+ ByteStringer.wrap(regex.toString().getBytes()))).build();
+ filter = RowFilter.newBuilder().setChain(filterChain.build()).build();
+ }
+
+ final RowRange range = RowRange.newBuilder()
+ .setStartKey(ByteStringer.wrap(startkey.getBytes()))
+ .build();
+
+ final ReadRowsRequest.Builder rrr = ReadRowsRequest.newBuilder()
+ .setTableNameBytes(ByteStringer.wrap(lastTableBytes))
+ .setFilter(filter)
+ .setRowRange(range);
+
+ List<Row> rows;
+ try {
+ rows = client.readRowsAsync(rrr.build()).get();
+ if (rows == null || rows.isEmpty()) {
+ return Status.NOT_FOUND;
+ }
+ int numResults = 0;
+
+ for (final Row row : rows) {
+ final HashMap<String, ByteIterator> rowResult =
+ new HashMap<String, ByteIterator>(fields != null ? fields.size() : 10);
+
+ for (final Family family : row.getFamiliesList()) {
+ if (Arrays.equals(family.getNameBytes().toByteArray(), columnFamilyBytes)) {
+ for (final Column column : family.getColumnsList()) {
+ // we should only have a single cell per column
+ rowResult.put(column.getQualifier().toString(UTF8_CHARSET),
+ new ByteArrayByteIterator(column.getCells(0).getValue().toByteArray()));
+ if (debug) {
+ System.out.println(
+ "Result for field: " + column.getQualifier().toString(UTF8_CHARSET)
+ + " is: " + column.getCells(0).getValue().toString(UTF8_CHARSET));
+ }
+ }
+ }
+ }
+
+ result.add(rowResult);
+
+ numResults++;
+ if (numResults >= recordcount) { // if we hit recordcount, bail out
+ break;
+ }
+ }
+ return Status.OK;
+ } catch (InterruptedException e) {
+ System.err.println("Interrupted during scan: " + e);
+ Thread.currentThread().interrupt();
+ return Status.ERROR;
+ } catch (ExecutionException e) {
+ System.err.println("Exception during scan: " + e);
+ return Status.ERROR;
+ }
+ }
+
+ @Override
+ public Status update(String table, String key,
+ HashMap<String, ByteIterator> values) {
+ if (debug) {
+ System.out.println("Setting up put for key: " + key);
+ }
+
+ setTable(table);
+
+ final MutateRowRequest.Builder rowMutation = MutateRowRequest.newBuilder();
+ rowMutation.setRowKey(ByteString.copyFromUtf8(key));
+ rowMutation.setTableNameBytes(ByteStringer.wrap(lastTableBytes));
+
+ for (final Entry<String, ByteIterator> entry : values.entrySet()) {
+ final Mutation.Builder mutationBuilder = rowMutation.addMutationsBuilder();
+ final SetCell.Builder setCellBuilder = mutationBuilder.getSetCellBuilder();
+
+ setCellBuilder.setFamilyNameBytes(ByteStringer.wrap(columnFamilyBytes));
+ setCellBuilder.setColumnQualifier(ByteStringer.wrap(entry.getKey().getBytes()));
+ setCellBuilder.setValue(ByteStringer.wrap(entry.getValue().toArray()));
+
+ // Bigtable uses a 1ms granularity
+ setCellBuilder.setTimestampMicros(System.currentTimeMillis() * 1000);
+ }
+
+ try {
+ if (clientSideBuffering) {
+ asyncExecutor.mutateRowAsync(rowMutation.build());
+ } else {
+ client.mutateRow(rowMutation.build());
+ }
+ return Status.OK;
+ } catch (ServiceException e) {
+ System.err.println("Failed to insert key: " + key + " " + e.getMessage());
+ return Status.ERROR;
+ } catch (InterruptedException e) {
+ System.err.println("Interrupted while inserting key: " + key + " "
+ + e.getMessage());
+ Thread.currentThread().interrupt();
+ return Status.ERROR; // never get here, but let's make the compiler happy
+ }
+ }
+
+ @Override
+ public Status insert(String table, String key,
+ HashMap<String, ByteIterator> values) {
+ return update(table, key, values);
+ }
+
+ @Override
+ public Status delete(String table, String key) {
+ if (debug) {
+ System.out.println("Doing delete for key: " + key);
+ }
+
+ setTable(table);
+
+ final MutateRowRequest.Builder rowMutation = MutateRowRequest.newBuilder()
+ .setRowKey(ByteString.copyFromUtf8(key))
+ .setTableNameBytes(ByteStringer.wrap(lastTableBytes));
+ rowMutation.addMutationsBuilder().setDeleteFromRow(
+ DeleteFromRow.getDefaultInstance());
+
+ try {
+ if (clientSideBuffering) {
+ asyncExecutor.mutateRowAsync(rowMutation.build());
+ } else {
+ client.mutateRow(rowMutation.build());
+ }
+ return Status.OK;
+ } catch (ServiceException e) {
+ System.err.println("Failed to delete key: " + key + " " + e.getMessage());
+ return Status.ERROR;
+ } catch (InterruptedException e) {
+ System.err.println("Interrupted while delete key: " + key + " "
+ + e.getMessage());
+ Thread.currentThread().interrupt();
+ return Status.ERROR; // never get here, but let's make the compiler happy
+ }
+ }
+
+ /**
+ * Little helper to set the table byte array. If it's different from the last
+ * table we reset the byte array. Otherwise we just use the existing array.
+ * @param table The table we're operating against
+ */
+ private void setTable(final String table) {
+ if (!lastTable.equals(table)) {
+ lastTable = table;
+ lastTableBytes = options
+ .getClusterName()
+ .toTableName(table)
+ .toString()
+ .getBytes();
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/googlebigtable/src/main/java/com/yahoo/ycsb/db/package-info.java b/googlebigtable/src/main/java/com/yahoo/ycsb/db/package-info.java
new file mode 100644
index 00000000..f0ab9e74
--- /dev/null
+++ b/googlebigtable/src/main/java/com/yahoo/ycsb/db/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2016 YCSB contributors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+/**
+ * The YCSB binding for Google's
+ * Bigtable.
+ */
+package com.yahoo.ycsb.db;
diff --git a/googledatastore/README.md b/googledatastore/README.md
index a6755a65..80b6a4cf 100644
--- a/googledatastore/README.md
+++ b/googledatastore/README.md
@@ -1,91 +1,94 @@
# Google Cloud Datastore Binding
https://cloud.google.com/datastore/docs/concepts/overview?hl=en
Please refer [here](https://cloud.google.com/datastore/docs/apis/overview) for more information on
+Google Cloud Datastore API.
+
## Configure
YCSB_HOME - YCSB home directory
DATASTORE_HOME - Google Cloud Datastore YCSB client package files
Please refer to https://github.com/brianfrankcooper/YCSB/wiki/Using-the-Database-Libraries
for more information on setup.
# Benchmark
$YCSB_HOME/bin/ycsb load googledatastore -P workloads/workloada -P googledatastore.properties
$YCSB_HOME/bin/ycsb run googledatastore -P workloads/workloada -P googledatastore.properties
# Properties
$DATASTORE_HOME/conf/googledatastore.properties
# Details
A. Configuration and setup:
See this link for instructions about setting up Google Cloud Datastore and
authentication:
-https://cloud.google.com/datastore/docs/getstarted/start_java/
+https://cloud.google.com/datastore/docs/activate#accessing_the_datastore_api_from_another_platform
After you setup your environment, you will have 3 pieces of information ready:
- datasetId,
- service account email, and
- a private key file in P12 format.
These will be configured via corresponding properties in the googledatastore.properties file.
B. EntityGroupingMode
In Google Datastore, the entity group is the unit in which the user can
perform strongly consistent queries on multiple items. Meanwhile, entity groups
also have certain performance limitations, especially with write QPS.
We support two modes here:
1. [default] One entity per group (ONE_ENTITY_PER_GROUP)
In this mode, every entity is a "root" entity and sits in one group,
and every entity group has only one entity. Write QPS is high in this
mode (and there is no documented limitation on this). But queries across
multiple entities are eventually consistent.
When this mode is set, every entity is created with no ancestor key (meaning
the entity itself is the "root" entity).
2. Multiple entities per group (MULTI_ENTITY_PER_GROUP)
In this mode, all entities in one benchmark run are placed under one
ancestor (root) node therefore inside one entity group. Query/scan
performed on these entities will be strongly consistent but write QPS
will be subject to the documented limitation (currently 1 QPS).
Because of this write QPS limit, it's highly recommended that you rate
limit your benchmark to avoid excessive errors.
The goal of this MULTI_ENTITY_PER_GROUP mode is to allow users to
benchmark and understand performance characteristics of a single entity
group of the Google Datastore.
While in this mode, one can optionally specify a root key name. If not
specified, a default name will be used.
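To make the two modes concrete, below is a sketch of how the binding builds
keys, mirroring the buildPrimaryKey() helper in the client change further down
(`table`, `key`, and `rootEntityName` as in that code):

```
Key.Builder result = Key.newBuilder();
if (entityGroupingMode == EntityGroupingMode.MULTI_ENTITY_PER_GROUP) {
  // Shared ancestor path element: every entity hangs off this root,
  // so all of them fall into a single entity group.
  result.addPath(Key.PathElement.newBuilder()
      .setKind(table).setName(rootEntityName));
}
// The entity's own path element; with no ancestor it is a "root" entity.
result.addPath(Key.PathElement.newBuilder().setKind(table).setName(key));
```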
diff --git a/googledatastore/conf/googledatastore.properties b/googledatastore/conf/googledatastore.properties
index ac95b570..408acf0d 100644
--- a/googledatastore/conf/googledatastore.properties
+++ b/googledatastore/conf/googledatastore.properties
@@ -1,56 +1,56 @@
# Copyright (c) 2015 YCSB contributors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License. See accompanying
# LICENSE file.
#
# Sample property file for Google Cloud Datastore DB client
## Mandatory parameters
#
# Your credentials to Google datastore. See README.md for details.
#
# googledatastore.datasetId=
# googledatastore.privateKeyFile=
# googledatastore.serviceAccountEmail=
# Google Cloud Datastore's read and update APIs do not support
# reading or updating a select subset of properties for an entity.
-# (as of version v1beta2-rev1-3.0.2)
+# (as of version v1beta3)
# Therefore, it's recommended that you set writeallfields and readallfields
# to true to get stable and comparable performance numbers.
writeallfields = true
readallfields = true
## Optional parameters
#
# Decides the consistency level of read requests. Acceptable values are:
# EVENTUAL, STRONG (default is STRONG)
#
# googledatastore.readConsistency=STRONG
# Decides how we group entities into entity groups.
# (See the details section in README.md for documentation)
#
# googledatastore.entityGroupingMode=ONE_ENTITY_PER_GROUP
# If you set the googledatastore.entityGroupingMode property to
# MULTI_ENTITY_PER_GROUP, you can optionally specify the name of the root entity
#
# googledatastore.rootEntityName="YCSB_ROOT_ENTITY"
# Strongly recommended to set to uniform.
# requestdistribution = uniform
# Enable/disable debug message, default is false.
# googledatastore.debug = false
\ No newline at end of file
diff --git a/googledatastore/pom.xml b/googledatastore/pom.xml
index 57db3505..4beef6d1 100644
--- a/googledatastore/pom.xml
+++ b/googledatastore/pom.xml
@@ -1,50 +1,50 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.yahoo.ycsb</groupId>
    <artifactId>binding-parent</artifactId>
-   <version>0.9.0-SNAPSHOT</version>
+   <version>0.10.0-SNAPSHOT</version>
    <relativePath>../binding-parent</relativePath>
  </parent>
  <artifactId>googledatastore-binding</artifactId>
  <name>Google Cloud Datastore Binding</name>
  <url>https://github.com/GoogleCloudPlatform/google-cloud-datastore</url>
  <dependencies>
    <dependency>
-     <groupId>com.google.apis</groupId>
-     <artifactId>google-api-services-datastore-protobuf</artifactId>
-     <version>v1beta2-rev1-3.0.2</version>
+     <groupId>com.google.cloud.datastore</groupId>
+     <artifactId>datastore-v1beta3-proto-client</artifactId>
+     <version>1.0.0-beta.1</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
    <dependency>
      <groupId>com.yahoo.ycsb</groupId>
      <artifactId>core</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
diff --git a/googledatastore/src/main/java/com/yahoo/ycsb/db/GoogleDatastoreClient.java b/googledatastore/src/main/java/com/yahoo/ycsb/db/GoogleDatastoreClient.java
index 12fc0fac..a3f65534 100644
--- a/googledatastore/src/main/java/com/yahoo/ycsb/db/GoogleDatastoreClient.java
+++ b/googledatastore/src/main/java/com/yahoo/ycsb/db/GoogleDatastoreClient.java
@@ -1,336 +1,335 @@
/*
* Copyright 2015 YCSB contributors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db;
import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.services.datastore.DatastoreV1.*;
-import com.google.api.services.datastore.DatastoreV1.CommitRequest.Mode;
-import com.google.api.services.datastore.DatastoreV1.ReadOptions
- .ReadConsistency;
-import com.google.api.services.datastore.client.Datastore;
-import com.google.api.services.datastore.client.DatastoreException;
-import com.google.api.services.datastore.client.DatastoreFactory;
-import com.google.api.services.datastore.client.DatastoreHelper;
-import com.google.api.services.datastore.client.DatastoreOptions;
+import com.google.datastore.v1beta3.*;
+import com.google.datastore.v1beta3.CommitRequest.Mode;
+import com.google.datastore.v1beta3.ReadOptions.ReadConsistency;
+import com.google.datastore.v1beta3.client.Datastore;
+import com.google.datastore.v1beta3.client.DatastoreException;
+import com.google.datastore.v1beta3.client.DatastoreFactory;
+import com.google.datastore.v1beta3.client.DatastoreHelper;
+import com.google.datastore.v1beta3.client.DatastoreOptions;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.StringByteIterator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Vector;
import javax.annotation.Nullable;
/**
* Google Cloud Datastore Client for YCSB.
*/
public class GoogleDatastoreClient extends DB {
/**
* Defines a MutationType used in this class.
*/
private enum MutationType {
UPSERT,
UPDATE,
DELETE
}
/**
* Defines an EntityGroupingMode enum used in this class.
*/
private enum EntityGroupingMode {
ONE_ENTITY_PER_GROUP,
MULTI_ENTITY_PER_GROUP
}
private static Logger logger =
Logger.getLogger(GoogleDatastoreClient.class);
// Read consistency defaults to "STRONG" per YCSB guidance.
// Users can override this via configuration.
private ReadConsistency readConsistency = ReadConsistency.STRONG;
private EntityGroupingMode entityGroupingMode =
EntityGroupingMode.ONE_ENTITY_PER_GROUP;
private String rootEntityName;
private Datastore datastore = null;
/**
* Initialize any state for this DB. Called once per DB instance; there is
* one DB instance per client thread.
*/
@Override
public void init() throws DBException {
String debug = getProperties().getProperty("googledatastore.debug", null);
if (null != debug && "true".equalsIgnoreCase(debug)) {
logger.setLevel(Level.DEBUG);
}
// We need the following 3 essential properties to initialize datastore:
//
// - DatasetId,
// - Path to private key file,
// - Service account email address.
String datasetId = getProperties().getProperty(
"googledatastore.datasetId", null);
if (datasetId == null) {
throw new DBException(
"Required property \"datasetId\" missing.");
}
String privateKeyFile = getProperties().getProperty(
"googledatastore.privateKeyFile", null);
if (privateKeyFile == null) {
throw new DBException(
"Required property \"privateKeyFile\" missing.");
}
String serviceAccountEmail = getProperties().getProperty(
"googledatastore.serviceAccountEmail", null);
if (serviceAccountEmail == null) {
throw new DBException(
"Required property \"serviceAccountEmail\" missing.");
}
// Below are properties related to benchmarking.
String readConsistencyConfig = getProperties().getProperty(
"googledatastore.readConsistency", null);
if (readConsistencyConfig != null) {
try {
this.readConsistency = ReadConsistency.valueOf(
readConsistencyConfig.trim().toUpperCase());
} catch (IllegalArgumentException e) {
throw new DBException("Invalid read consistency specified: " +
readConsistencyConfig + ". Expecting STRONG or EVENTUAL.");
}
}
//
// Entity Grouping Mode (googledatastore.entityGroupingMode), see
// documentation in conf/googledatastore.properties.
//
String entityGroupingConfig = getProperties().getProperty(
"googledatastore.entityGroupingMode", null);
if (entityGroupingConfig != null) {
try {
this.entityGroupingMode = EntityGroupingMode.valueOf(
entityGroupingConfig.trim().toUpperCase());
} catch (IllegalArgumentException e) {
throw new DBException("Invalid entity grouping mode specified: " +
entityGroupingConfig + ". Expecting ONE_ENTITY_PER_GROUP or " +
"MULTI_ENTITY_PER_GROUP.");
}
}
this.rootEntityName = getProperties().getProperty(
"googledatastore.rootEntityName", "YCSB_ROOT_ENTITY");
try {
// Set up the connection to Google Cloud Datastore with the credentials
// obtained from the configuration.
DatastoreOptions.Builder options = new DatastoreOptions.Builder();
Credential credential = DatastoreHelper.getServiceAccountCredential(
serviceAccountEmail, privateKeyFile);
logger.info("Using JWT Service Account credential.");
logger.info("DatasetID: " + datasetId + ", Service Account Email: " +
serviceAccountEmail + ", Private Key File Path: " + privateKeyFile);
datastore = DatastoreFactory.get().create(
- options.credential(credential).dataset(datasetId).build());
+ options.credential(credential).projectId(datasetId).build());
} catch (GeneralSecurityException exception) {
throw new DBException("Security error connecting to the datastore: " +
exception.getMessage(), exception);
} catch (IOException exception) {
throw new DBException("I/O error connecting to the datastore: " +
exception.getMessage(), exception);
}
logger.info("Datastore client instance created: " +
datastore.toString());
}
@Override
public Status read(String table, String key, Set<String> fields,
HashMap<String, ByteIterator> result) {
LookupRequest.Builder lookupRequest = LookupRequest.newBuilder();
- lookupRequest.addKey(buildPrimaryKey(table, key));
+ lookupRequest.addKeys(buildPrimaryKey(table, key));
lookupRequest.getReadOptionsBuilder().setReadConsistency(
this.readConsistency);
// Note: a datastore lookup always reads the entire entity; it does
// not support reading a subset of "fields" (properties) of an entity.
logger.debug("Built lookup request as: " + lookupRequest.toString());
LookupResponse response = null;
try {
response = datastore.lookup(lookupRequest.build());
} catch (DatastoreException exception) {
logger.error(
String.format("Datastore Exception when reading (%s): %s %s",
exception.getMessage(),
exception.getMethodName(),
exception.getCode()));
// DatastoreException.getCode() returns an HTTP response code which we
// will bubble up to the user as part of the YCSB Status "name".
return new Status("ERROR-" + exception.getCode(), exception.getMessage());
}
if (response.getFoundCount() == 0) {
return new Status("ERROR-404", "Not Found, key is: " + key);
} else if (response.getFoundCount() > 1) {
// We only asked to look up one key; we shouldn't have gotten more
// than one entity back. Unexpected state.
return Status.UNEXPECTED_STATE;
}
Entity entity = response.getFound(0).getEntity();
logger.debug("Read entity: " + entity.toString());
- Map<String, Value> properties = DatastoreHelper.getPropertyMap(entity);
+ Map<String, Value> properties = entity.getProperties();
Set<String> propertiesToReturn =
(fields == null ? properties.keySet() : fields);
for (String name : propertiesToReturn) {
if (properties.containsKey(name)) {
result.put(name, new StringByteIterator(properties.get(name)
.getStringValue()));
}
}
return Status.OK;
}
@Override
public Status scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
// TODO: Implement Scan as query on primary key.
return Status.NOT_IMPLEMENTED;
}
@Override
public Status update(String table, String key,
HashMap<String, ByteIterator> values) {
return doSingleItemMutation(table, key, values, MutationType.UPDATE);
}
@Override
public Status insert(String table, String key,
HashMap<String, ByteIterator> values) {
// Use Upsert to allow overwrite of existing key instead of failing the
// load (or run) just because the DB already has the key.
// This is the same behavior as other DB bindings here (such as
// the DynamoDB client).
return doSingleItemMutation(table, key, values, MutationType.UPSERT);
}
@Override
public Status delete(String table, String key) {
return doSingleItemMutation(table, key, null, MutationType.DELETE);
}
private Key.Builder buildPrimaryKey(String table, String key) {
Key.Builder result = Key.newBuilder();
if (this.entityGroupingMode == EntityGroupingMode.MULTI_ENTITY_PER_GROUP) {
// All entities are inside the same group when we are in this mode.
- result.addPathElement(Key.PathElement.newBuilder().setKind(table).
+ result.addPath(Key.PathElement.newBuilder().setKind(table).
setName(rootEntityName));
}
- return result.addPathElement(Key.PathElement.newBuilder().setKind(table)
+ return result.addPath(Key.PathElement.newBuilder().setKind(table)
.setName(key));
}
private Status doSingleItemMutation(String table, String key,
@Nullable HashMap<String, ByteIterator> values,
MutationType mutationType) {
// First build the key.
Key.Builder datastoreKey = buildPrimaryKey(table, key);
// Build a commit request in non-transactional mode.
// Single item mutation to google datastore
// is always atomic and strongly consistent. Transaction is only necessary
// for multi-item mutation, or Read-modify-write operation.
CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
commitRequest.setMode(Mode.NON_TRANSACTIONAL);
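+ // In v1beta3, CommitRequest carries a repeated "mutations" field; each
+ // addMutationsBuilder() call below appends one Mutation to this commit.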
if (mutationType == MutationType.DELETE) {
- commitRequest.getMutationBuilder().addDelete(datastoreKey);
+ commitRequest.addMutationsBuilder().setDelete(datastoreKey);
} else {
// If this is not for delete, build the entity.
Entity.Builder entityBuilder = Entity.newBuilder();
entityBuilder.setKey(datastoreKey);
for (Entry<String, ByteIterator> val : values.entrySet()) {
- entityBuilder.addProperty(Property.newBuilder()
- .setName(val.getKey())
- .setValue(Value.newBuilder()
- .setStringValue(val.getValue().toString())));
+ entityBuilder.getMutableProperties()
+ .put(val.getKey(),
+ Value.newBuilder()
+ .setStringValue(val.getValue().toString()).build());
}
Entity entity = entityBuilder.build();
logger.debug("entity built as: " + entity.toString());
if (mutationType == MutationType.UPSERT) {
- commitRequest.getMutationBuilder().addUpsert(entity);
+ commitRequest.addMutationsBuilder().setUpsert(entity);
} else if (mutationType == MutationType.UPDATE){
- commitRequest.getMutationBuilder().addUpdate(entity);
+ commitRequest.addMutationsBuilder().setUpdate(entity);
} else {
throw new RuntimeException("Impossible MutationType, code bug.");
}
}
try {
datastore.commit(commitRequest.build());
logger.debug("successfully committed.");
} catch (DatastoreException exception) {
// Catch all Datastore rpc errors.
// Log the exception, the name of the method called and the error code.
logger.error(
String.format("Datastore Exception when committing (%s): %s %s",
exception.getMessage(),
exception.getMethodName(),
exception.getCode()));
// DatastoreException.getCode() returns an HTTP response code which we
// will bubble up to the user as part of the YCSB Status "name".
return new Status("ERROR-" + exception.getCode(), exception.getMessage());
}
return Status.OK;
}
}
diff --git a/hbase094/pom.xml b/hbase094/pom.xml
index ca7d4c60..1823b41f 100644
--- a/hbase094/pom.xml
+++ b/hbase094/pom.xml
@@ -1,68 +1,68 @@
4.0.0
com.yahoo.ycsb
binding-parent
- 0.9.0-SNAPSHOT
+ 0.10.0-SNAPSHOT
../binding-parent/
hbase094-binding
HBase 0.94.x DB Binding
org.apache.hbase
hbase
${hbase094.version}
org.apache.hadoop
hadoop-core
1.0.4
com.yahoo.ycsb
hbase098-binding
${project.version}
*
*
org.slf4j
slf4j-simple
1.7.12
com.yahoo.ycsb
core
${project.version}
provided
diff --git a/hbase098/README.md b/hbase098/README.md
index e6a7fb41..83c3c7a0 100644
--- a/hbase098/README.md
+++ b/hbase098/README.md
@@ -1,79 +1,82 @@
# HBase (0.98.x) Driver for YCSB
This driver is a binding for the YCSB facilities to operate against a HBase 0.98.x Server cluster.
To run against an HBase 0.94.x cluster, use the `hbase094` binding.
To run against an HBase >= 1.0 cluster, use the `hbase10` binding.
## Quickstart
### 1. Start a HBase Server
You need to start a single node or a cluster to point the client at. Please see [Apache HBase Reference Guide](http://hbase.apache.org/book.html) for more details and instructions.
### 2. Set up YCSB
You need to clone the repository and compile everything.
```
git clone git://github.com/brianfrankcooper/YCSB.git
cd YCSB
mvn clean package
```
### 3. Create a HBase table for testing
For best results, use the pre-splitting strategy recommended in [HBASE-4163](https://issues.apache.org/jira/browse/HBASE-4163):
```
hbase(main):001:0> n_splits = 200 # HBase recommends (10 * number of regionservers)
hbase(main):002:0> create 'usertable', 'family', {SPLITS => (1..n_splits).map {|i| "user#{1000+i*(9999-1000)/n_splits}"}}
```
*Failing to do so will cause all writes to initially target a single region server*.
### 4. Run the Workload
Before you can actually run the workload, you need to "load" the data first.
You should specify a HBase config directory (or any other directory containing your hbase-site.xml), a table name, and a column family (-cp is used to set the java classpath and -p is used to set various properties).
```
bin/ycsb load hbase -P workloads/workloada -cp /HBASE-HOME-DIR/conf -p table=usertable -p columnfamily=family
```
Then, you can run the workload:
```
bin/ycsb run hbase -P workloads/workloada -cp /HBASE-HOME-DIR/conf -p table=usertable -p columnfamily=family
```
Please see the general instructions in the `doc` folder if you are not sure how it all works. You can apply additional properties (as seen in the next section) like this:
```
bin/ycsb run hbase -P workloads/workloada -cp /HBASE-HOME-DIR/conf -p table=usertable -p columnfamily=family -p clientbuffering=true
```
## Configuration Options
The following options can be configured using `-p`.
* `columnfamily`: The HBase column family to target.
* `debug` : If true, debugging logs are activated. The default is false.
* `hbase.usepagefilter` : If true, HBase
[PageFilter](https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/filter/PageFilter.html)s
are used to limit the number of records consumed in a scan operation. The default is true.
* `principal`: If testing needs to be done against a secure HBase cluster using a Kerberos keytab,
this property can be used to pass the principal in the keytab file.
* `keytab`: The Kerberos keytab file name and location can be passed through this property.
+* `writebuffersize`: The maximum amount, in bytes, of data to buffer on the client side before a flush is forced. The default is 12MB.
+
+Additional HBase settings should be provided in the `hbase-site.xml` file located in your `/HBASE-HOME-DIR/conf` directory. Typically this will be `/etc/hbase/conf`.
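+
+As a rough sketch of what `clientbuffering` and `writebuffersize` control, the
+hbase10 client in this change sizes its client-side buffer via the HBase 1.0
+BufferedMutator API (the table name here is illustrative):
+
+```
+BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf("usertable"));
+p.writeBufferSize(1024 * 1024 * 12); // default: force a flush at 12MB
+BufferedMutator mutator = connection.getBufferedMutator(p);
+```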
\ No newline at end of file
diff --git a/hbase098/pom.xml b/hbase098/pom.xml
index 29600f7f..18efe411 100644
--- a/hbase098/pom.xml
+++ b/hbase098/pom.xml
@@ -1,48 +1,48 @@
4.0.0
com.yahoo.ycsb
binding-parent
- 0.9.0-SNAPSHOT
+ 0.10.0-SNAPSHOT
../binding-parent/
hbase098-binding
HBase 0.98.x DB Binding
false
org.apache.hbase
hbase-client
${hbase098.version}
com.yahoo.ycsb
core
${project.version}
provided
diff --git a/hbase10/README.md b/hbase10/README.md
index 1da5bc43..dd01249e 100644
--- a/hbase10/README.md
+++ b/hbase10/README.md
@@ -1,23 +1,116 @@
# HBase (1.0.x) Driver for YCSB
-This driver is a binding for the YCSB facilities to operate against a HBase 1.0.x Server cluster.
+This driver is a binding for the YCSB facilities to operate against a HBase 1.0.x Server cluster or Google's hosted Bigtable.
To run against an HBase 0.94.x cluster, use the `hbase094` binding.
To run against an HBase 0.98.x cluster, use the `hbase098` binding.
-See `hbase098/README.md` for configuration details.
+See `hbase098/README.md` for a quickstart to set up HBase for load testing and common configuration details.
+
+## Configuration Options
+In addition to those options available for the `hbase098` binding, the following options are available for the `hbase10` binding:
+
+* `durability`: Whether or not writes should be appended to the WAL. Bypassing the WAL can improve throughput but data cannot be recovered in the event of a crash. The default is true.
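+
+As a sketch of what this toggles, the client tags every Put and Delete it
+builds with the configured durability (`key` is the record key;
+`Durability.SKIP_WAL` is the HBase value that bypasses the write-ahead log):
+
+```
+Put p = new Put(Bytes.toBytes(key));
+p.setDurability(Durability.SKIP_WAL); // faster writes, unrecoverable on crash
+currentTable.put(p);
+```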
+
+## Bigtable
+
+Google's Bigtable service provides an implementation of the HBase API for migrating existing applications. Users can perform load tests against Bigtable using this binding.
+
+### 1. Setup a Bigtable Cluster
+
+Login to the Google Cloud Console and follow the [Creating Cluster](https://cloud.google.com/bigtable/docs/creating-cluster) steps. Make a note of your cluster name, zone and project ID.
+
+### 2. Launch the Bigtable Shell
+
+From the Cloud Console, launch a shell and follow the [Quickstart](https://cloud.google.com/bigtable/docs/quickstart) up to step 4 where you launch the HBase shell.
+
+### 3. Create a Table
+
+For best results, use the pre-splitting strategy recommended in [HBASE-4163](https://issues.apache.org/jira/browse/HBASE-4163):
+
+```
+hbase(main):001:0> n_splits = 200 # HBase recommends (10 * number of regionservers)
+hbase(main):002:0> create 'usertable', 'cf', {SPLITS => (1..n_splits).map {|i| "user#{1000+i*(9999-1000)/n_splits}"}}
+```
+
+Make a note of the column family; in this example it's `cf`.
+
+### 4. Fetch the Proper ALPN Boot Jar
+
+The Bigtable protocol uses HTTP/2 which requires an ALPN protocol negotiation implementation. On JVM instantiation the implementation must be loaded before attempting to connect to the cluster. If you're using Java 7 or 8, use this [Jetty Version Table](http://www.eclipse.org/jetty/documentation/current/alpn-chapter.html#alpn-versions) to determine the version appropriate for your JVM. (ALPN is included in JDK 9+). Download the proper jar from [Maven](http://search.maven.org/#search%7Cgav%7C1%7Cg%3A%22org.mortbay.jetty.alpn%22%20AND%20a%3A%22alpn-boot%22) and place it somewhere on your system.
+
+### 5. Download the Bigtable Client Jar
+
+Download one of the `bigtable-hbase-1.#` jars from [Maven](http://search.maven.org/#search%7Cga%7C1%7Ccom.google.cloud.bigtable) to your host.
+
+### 6. Download JSON Credentials
+
+Follow these instructions for [Generating a JSON key](https://cloud.google.com/bigtable/docs/installing-hbase-shell#service-account) and save it to your host.
+
+### 7. Create or Edit hbase-site.xml
+
+If you have an existing HBase configuration directory with an `hbase-site.xml` file, edit the file as per below. If not, create a directory called `conf` under the `hbase10` directory. Create a file in the conf directory named `hbase-site.xml`. Provide the following settings in the XML file, making sure to replace the bracketed examples with the proper values from your Cloud console.
+
+```
+<configuration>
+  <property>
+    <name>hbase.client.connection.impl</name>
+    <value>com.google.cloud.bigtable.hbase1_0.BigtableConnection</value>
+  </property>
+  <property>
+    <name>google.bigtable.cluster.name</name>
+    <value>[YOUR-CLUSTER-ID]</value>
+  </property>
+  <property>
+    <name>google.bigtable.project.id</name>
+    <value>[YOUR-PROJECT-ID]</value>
+  </property>
+  <property>
+    <name>google.bigtable.zone.name</name>
+    <value>[YOUR-ZONE-NAME]</value>
+  </property>
+  <property>
+    <name>google.bigtable.auth.service.account.enable</name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>google.bigtable.auth.json.keyfile</name>
+    <value>[PATH-TO-YOUR-KEY-FILE]</value>
+  </property>
+</configuration>
+```
+
+If you wish to try other API implementations (1.1.x or 1.2.x), change the `hbase.client.connection.impl` appropriately to match the JAR you downloaded.
+
+If you have an existing HBase config directory, make sure to add it to the class path via `-cp :`.
+
+### 8. Execute a Workload
+
+Switch to the root of the YCSB repo, choose the workload you want to run, and `load` it first. With the CLI you must provide the column family, cluster properties and the ALPN jar to load.
+
+```
+bin/ycsb load hbase10 -p columnfamily=cf -cp -jvm-args='-Xbootclasspath/p:' -P workloads/workloada
+
+```
+
+The `load` step only executes inserts into the datastore. After loading data, run the same workload to mix reads with writes.
+
+```
+bin/ycsb run hbase10 -p columnfamily=cf -jvm-args='-Xbootclasspath/p:' -P workloads/workloada
+
+```
\ No newline at end of file
diff --git a/hbase10/pom.xml b/hbase10/pom.xml
index 3f6bec07..b3b53673 100644
--- a/hbase10/pom.xml
+++ b/hbase10/pom.xml
@@ -1,56 +1,56 @@
4.0.0
com.yahoo.ycsb
binding-parent
- 0.9.0-SNAPSHOT
+ 0.10.0-SNAPSHOT
../binding-parent/
hbase10-binding
HBase 1.0 DB Binding
org.apache.hbase
hbase-client
${hbase10.version}
com.yahoo.ycsb
core
${project.version}
provided
junit
junit
4.12
test
org.apache.hbase
hbase-testing-util
${hbase10.version}
test
diff --git a/hbase10/src/main/java/com/yahoo/ycsb/db/HBaseClient10.java b/hbase10/src/main/java/com/yahoo/ycsb/db/HBaseClient10.java
index a41c1987..da72f4f8 100644
--- a/hbase10/src/main/java/com/yahoo/ycsb/db/HBaseClient10.java
+++ b/hbase10/src/main/java/com/yahoo/ycsb/db/HBaseClient10.java
@@ -1,524 +1,538 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.measurements.Measurements;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* HBase 1.0 client for YCSB framework.
*
* A modified version of HBaseClient (which targets HBase v0.9) utilizing the
* HBase 1.0.0 API.
*
* This client also adds toggleable client-side buffering and configurable write
* durability.
*/
public class HBaseClient10 extends com.yahoo.ycsb.DB {
private Configuration config = HBaseConfiguration.create();
-
- // Must be an object for synchronization and tracking running thread counts.
- private static Integer threadCount = 0;
+
+ private static AtomicInteger threadCount = new AtomicInteger(0);
private boolean debug = false;
private String tableName = "";
+
+ /**
+ * A Cluster Connection instance that is shared by all running ycsb threads.
+ * Needs to be initialized late so we pick up command-line configs if any.
+ * To ensure one instance only in a multi-threaded context, guard access
+ * with a 'lock' object.
+ * @see #CONNECTION_LOCK
+ */
private static Connection connection = null;
+ private static final Object CONNECTION_LOCK = new Object();
// Depending on the value of clientSideBuffering, either bufferedMutator
// (clientSideBuffering) or currentTable (!clientSideBuffering) will be used.
private Table currentTable = null;
private BufferedMutator bufferedMutator = null;
private String columnFamily = "";
private byte[] columnFamilyBytes;
/**
* Durability to use for puts and deletes.
*/
private Durability durability = Durability.USE_DEFAULT;
/** Whether or not a page filter should be used to limit scan length. */
private boolean usePageFilter = true;
/**
* If true, buffer mutations on the client. This is the default behavior for
* HBaseClient. For measuring insert/update/delete latencies, client side
* buffering should be disabled.
*/
private boolean clientSideBuffering = false;
private long writeBufferSize = 1024 * 1024 * 12;
/**
* Initialize any state for this DB. Called once per DB instance; there is one
* DB instance per client thread.
*/
@Override
public void init() throws DBException {
if ("true"
.equals(getProperties().getProperty("clientbuffering", "false"))) {
this.clientSideBuffering = true;
}
if (getProperties().containsKey("writebuffersize")) {
writeBufferSize =
Long.parseLong(getProperties().getProperty("writebuffersize"));
}
if (getProperties().getProperty("durability") != null) {
this.durability =
Durability.valueOf(getProperties().getProperty("durability"));
}
if ("kerberos".equalsIgnoreCase(config.get("hbase.security.authentication"))) {
config.set("hadoop.security.authentication", "Kerberos");
UserGroupInformation.setConfiguration(config);
}
- if ((getProperties().getProperty("principal")!=null)
+ if ((getProperties().getProperty("principal")!=null)
&& (getProperties().getProperty("keytab")!=null)) {
try {
- UserGroupInformation.loginUserFromKeytab(getProperties().getProperty("principal"),
+ UserGroupInformation.loginUserFromKeytab(getProperties().getProperty("principal"),
getProperties().getProperty("keytab"));
} catch (IOException e) {
System.err.println("Keytab file is not readable or not found");
throw new DBException(e);
}
}
try {
- synchronized(threadCount) {
- ++threadCount;
+ threadCount.getAndIncrement();
+ synchronized (CONNECTION_LOCK) {
if (connection == null) {
+ // Initialize if not set up already.
connection = ConnectionFactory.createConnection(config);
}
}
} catch (java.io.IOException e) {
throw new DBException(e);
}
if ((getProperties().getProperty("debug") != null)
&& (getProperties().getProperty("debug").compareTo("true") == 0)) {
debug = true;
}
if ("false"
.equals(getProperties().getProperty("hbase.usepagefilter", "true"))) {
usePageFilter = false;
}
columnFamily = getProperties().getProperty("columnfamily");
if (columnFamily == null) {
System.err.println("Error, must specify a columnfamily for HBase table");
throw new DBException("No columnfamily specified");
}
columnFamilyBytes = Bytes.toBytes(columnFamily);
// Terminate right now if table does not exist, since the client
// will not propagate this error upstream once the workload
// starts.
String table = com.yahoo.ycsb.workloads.CoreWorkload.table;
try {
final TableName tName = TableName.valueOf(table);
- connection.getTable(tName).getTableDescriptor();
+ synchronized (CONNECTION_LOCK) {
+ connection.getTable(tName).getTableDescriptor();
+ }
} catch (IOException e) {
throw new DBException(e);
}
}
/**
* Cleanup any state for this DB. Called once per DB instance; there is one DB
* instance per client thread.
*/
@Override
public void cleanup() throws DBException {
// Get the measurements instance as this is the only client that should
// count clean up time like an update if client-side buffering is
// enabled.
Measurements measurements = Measurements.getMeasurements();
try {
long st = System.nanoTime();
if (bufferedMutator != null) {
bufferedMutator.close();
}
if (currentTable != null) {
currentTable.close();
}
long en = System.nanoTime();
final String type = clientSideBuffering ? "UPDATE" : "CLEANUP";
measurements.measure(type, (int) ((en - st) / 1000));
- synchronized(threadCount) {
- --threadCount;
- if (threadCount <= 0 && connection != null) {
- connection.close();
- connection = null;
+ threadCount.decrementAndGet();
+ if (threadCount.get() <= 0) {
+ // Means we are done so ok to shut down the Connection.
+ synchronized (CONNECTION_LOCK) {
+ if (connection != null) {
+ connection.close();
+ connection = null;
+ }
}
}
} catch (IOException e) {
throw new DBException(e);
}
}
public void getHTable(String table) throws IOException {
final TableName tName = TableName.valueOf(table);
- this.currentTable = this.connection.getTable(tName);
- // suggestions from
- // http://ryantwopointoh.blogspot.com/2009/01/
- // performance-of-hbase-importing.html
- if (clientSideBuffering) {
- final BufferedMutatorParams p = new BufferedMutatorParams(tName);
- p.writeBufferSize(writeBufferSize);
- this.bufferedMutator = this.connection.getBufferedMutator(p);
+ synchronized (CONNECTION_LOCK) {
+ this.currentTable = connection.getTable(tName);
+ if (clientSideBuffering) {
+ final BufferedMutatorParams p = new BufferedMutatorParams(tName);
+ p.writeBufferSize(writeBufferSize);
+ this.bufferedMutator = connection.getBufferedMutator(p);
+ }
}
}
/**
* Read a record from the database. Each field/value pair from the result will
* be stored in a HashMap.
*
* @param table
* The name of the table
* @param key
* The record key of the record to read.
* @param fields
* The list of fields to read, or null for all of them
* @param result
* A HashMap of field/value pairs for the result
* @return Zero on success, a non-zero error code on error
*/
public Status read(String table, String key, Set<String> fields,
HashMap<String, ByteIterator> result) {
// if this is a "new" table, init HTable object. Else, use existing one
if (!tableName.equals(table)) {
currentTable = null;
try {
getHTable(table);
tableName = table;
} catch (IOException e) {
System.err.println("Error accessing HBase table: " + e);
return Status.ERROR;
}
}
Result r = null;
try {
if (debug) {
System.out
.println("Doing read from HBase columnfamily " + columnFamily);
System.out.println("Doing read for key: " + key);
}
Get g = new Get(Bytes.toBytes(key));
if (fields == null) {
g.addFamily(columnFamilyBytes);
} else {
for (String field : fields) {
g.addColumn(columnFamilyBytes, Bytes.toBytes(field));
}
}
r = currentTable.get(g);
} catch (IOException e) {
if (debug) {
System.err.println("Error doing get: " + e);
}
return Status.ERROR;
} catch (ConcurrentModificationException e) {
// do nothing for now...need to understand HBase concurrency model better
return Status.ERROR;
}
if (r.isEmpty()) {
return Status.NOT_FOUND;
}
while (r.advance()) {
final Cell c = r.current();
result.put(Bytes.toString(CellUtil.cloneQualifier(c)),
new ByteArrayByteIterator(CellUtil.cloneValue(c)));
if (debug) {
System.out.println(
"Result for field: " + Bytes.toString(CellUtil.cloneQualifier(c))
+ " is: " + Bytes.toString(CellUtil.cloneValue(c)));
}
}
return Status.OK;
}
/**
* Perform a range scan for a set of records in the database. Each field/value
* pair from the result will be stored in a HashMap.
*
* @param table
* The name of the table
* @param startkey
* The record key of the first record to read.
* @param recordcount
* The number of records to read
* @param fields
* The list of fields to read, or null for all of them
* @param result
* A Vector of HashMaps, where each HashMap is a set field/value
* pairs for one record
* @return Zero on success, a non-zero error code on error
*/
@Override
public Status scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
// if this is a "new" table, init HTable object. Else, use existing one
if (!tableName.equals(table)) {
currentTable = null;
try {
getHTable(table);
tableName = table;
} catch (IOException e) {
System.err.println("Error accessing HBase table: " + e);
return Status.ERROR;
}
}
Scan s = new Scan(Bytes.toBytes(startkey));
// HBase has no record limit. Here, assume recordcount is small enough to
// bring back in one call.
// We get back recordcount records
s.setCaching(recordcount);
if (this.usePageFilter) {
s.setFilter(new PageFilter(recordcount));
}
// add specified fields or else all fields
if (fields == null) {
s.addFamily(columnFamilyBytes);
} else {
for (String field : fields) {
s.addColumn(columnFamilyBytes, Bytes.toBytes(field));
}
}
// get results
ResultScanner scanner = null;
try {
scanner = currentTable.getScanner(s);
int numResults = 0;
for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
// get row key
String key = Bytes.toString(rr.getRow());
if (debug) {
System.out.println("Got scan result for key: " + key);
}
HashMap<String, ByteIterator> rowResult =
new HashMap<String, ByteIterator>();
while (rr.advance()) {
final Cell cell = rr.current();
rowResult.put(Bytes.toString(CellUtil.cloneQualifier(cell)),
new ByteArrayByteIterator(CellUtil.cloneValue(cell)));
}
// add rowResult to result vector
result.add(rowResult);
numResults++;
// PageFilter does not guarantee that the number of results is <=
// pageSize, so this
// break is required.
if (numResults >= recordcount) {// if hit recordcount, bail out
break;
}
} // done with row
} catch (IOException e) {
if (debug) {
System.out.println("Error in getting/parsing scan result: " + e);
}
return Status.ERROR;
} finally {
if (scanner != null) {
scanner.close();
}
}
return Status.OK;
}
/**
* Update a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key, overwriting any existing values with the same field name.
*
* @param table
* The name of the table
* @param key
* The record key of the record to write
* @param values
* A HashMap of field/value pairs to update in the record
* @return Zero on success, a non-zero error code on error
*/
@Override
public Status update(String table, String key,
HashMap<String, ByteIterator> values) {
// if this is a "new" table, init HTable object. Else, use existing one
if (!tableName.equals(table)) {
currentTable = null;
try {
getHTable(table);
tableName = table;
} catch (IOException e) {
System.err.println("Error accessing HBase table: " + e);
return Status.ERROR;
}
}
if (debug) {
System.out.println("Setting up put for key: " + key);
}
Put p = new Put(Bytes.toBytes(key));
p.setDurability(durability);
for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
byte[] value = entry.getValue().toArray();
if (debug) {
System.out.println("Adding field/value " + entry.getKey() + "/"
+ Bytes.toStringBinary(value) + " to put request");
}
p.addColumn(columnFamilyBytes, Bytes.toBytes(entry.getKey()), value);
}
try {
if (clientSideBuffering) {
Preconditions.checkNotNull(bufferedMutator);
bufferedMutator.mutate(p);
} else {
currentTable.put(p);
}
} catch (IOException e) {
if (debug) {
System.err.println("Error doing put: " + e);
}
return Status.ERROR;
} catch (ConcurrentModificationException e) {
// do nothing for now...hope this is rare
return Status.ERROR;
}
return Status.OK;
}
/**
* Insert a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key.
*
* @param table
* The name of the table
* @param key
* The record key of the record to insert.
* @param values
* A HashMap of field/value pairs to insert in the record
* @return Zero on success, a non-zero error code on error
*/
@Override
public Status insert(String table, String key,
HashMap<String, ByteIterator> values) {
return update(table, key, values);
}
/**
* Delete a record from the database.
*
* @param table
* The name of the table
* @param key
* The record key of the record to delete.
* @return Zero on success, a non-zero error code on error
*/
@Override
public Status delete(String table, String key) {
// if this is a "new" table, init HTable object. Else, use existing one
if (!tableName.equals(table)) {
currentTable = null;
try {
getHTable(table);
tableName = table;
} catch (IOException e) {
System.err.println("Error accessing HBase table: " + e);
return Status.ERROR;
}
}
if (debug) {
System.out.println("Doing delete for key: " + key);
}
final Delete d = new Delete(Bytes.toBytes(key));
d.setDurability(durability);
try {
if (clientSideBuffering) {
Preconditions.checkNotNull(bufferedMutator);
bufferedMutator.mutate(d);
} else {
currentTable.delete(d);
}
} catch (IOException e) {
if (debug) {
System.err.println("Error doing delete: " + e);
}
return Status.ERROR;
}
return Status.OK;
}
@VisibleForTesting
void setConfiguration(final Configuration newConfig) {
this.config = newConfig;
}
}
/*
* For customized vim control set autoindent set si set shiftwidth=4
*/
diff --git a/hypertable/pom.xml b/hypertable/pom.xml
index 6c457f72..991b660b 100644
--- a/hypertable/pom.xml
+++ b/hypertable/pom.xml
@@ -1,85 +1,85 @@
4.0.0
com.yahoo.ycsb
binding-parent
- 0.9.0-SNAPSHOT
+ 0.10.0-SNAPSHOT
../binding-parent
hypertable-binding
Hypertable DB Binding
jar
false
com.yahoo.ycsb
core
${project.version}
provided
org.apache.thrift
libthrift
${thrift.version}
org.hypertable
hypertable
${hypertable.version}
org.apache.maven.plugins
maven-checkstyle-plugin
2.15
true
../checkstyle.xml
true
true
validate
validate
checkstyle
clojars.org
http://clojars.org/repo
diff --git a/infinispan/pom.xml b/infinispan/pom.xml
index 943b1937..4b3fad03 100644
--- a/infinispan/pom.xml
+++ b/infinispan/pom.xml
@@ -1,78 +1,78 @@
4.0.0
com.yahoo.ycsb
binding-parent
- 0.9.0-SNAPSHOT
+ 0.10.0-SNAPSHOT
../binding-parent
infinispan-binding
Infinispan DB Binding
jar
false
org.infinispan
infinispan-client-hotrod
${infinispan.version}
org.infinispan
infinispan-core
${infinispan.version}
com.yahoo.ycsb
core
${project.version}
provided
org.apache.maven.plugins
maven-checkstyle-plugin
2.15
true
../checkstyle.xml
true
true
validate
validate
checkstyle
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index 67770336..6ad5bc40 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -1,57 +1,57 @@
4.0.0
com.yahoo.ycsb
binding-parent
- 0.9.0-SNAPSHOT
+ 0.10.0-SNAPSHOT
../binding-parent
jdbc-binding
JDBC DB Binding
jar
org.apache.openjpa
openjpa-jdbc
${openjpa.jdbc.version}
com.yahoo.ycsb
core
${project.version}
provided
junit
junit
4.12
test
org.hsqldb
hsqldb
2.3.3
test
diff --git a/kudu/README.md b/kudu/README.md
index cd5cffd6..e1f2b286 100644
--- a/kudu/README.md
+++ b/kudu/README.md
@@ -1,44 +1,56 @@
# Kudu bindings for YCSB
[Kudu](http://getkudu.io) is a storage engine that enables fast analytics on fast data.
## Benchmarking Kudu
Use the following command line to load the initial data into an existing Kudu cluster with default
configurations.
```
bin/ycsb load kudu -P workloads/workloada
```
Additional configurations:
* `kudu_master_addresses`: The master's address. The default configuration expects a master on localhost.
* `kudu_pre_split_num_tablets`: The number of tablets (or partitions) to create for the table. The default
uses 4 tablets. A good rule of thumb is to use 5 per tablet server.
* `kudu_table_num_replicas`: The number of replicas that each tablet will have. The default is 3. It should
only be set to 1 for single-node tests.
* `kudu_sync_ops`: Whether the client should wait after every write operation. The default is true.
* `kudu_block_size`: The data block size used to configure columns. The default is 4096 bytes.
Then, you can run the workload:
```
bin/ycsb run kudu -P workloads/workloada
```
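+
+As a sketch of what `kudu_sync_ops` toggles inside the client (mirroring the
+session setup in KuduYCSBClient below; `syncOps` stands in for the parsed
+property), synchronous mode flushes every write, while asynchronous mode
+buffers mutations in the background:
+
+```
+if (syncOps) {
+  session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_SYNC);
+} else {
+  session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
+  session.setMutationBufferSpace(100); // small buffer, as in the client
+}
+```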
+
+## Using a previous client version
+
+If you wish to use a different Kudu client version than the one shipped with YCSB, you can specify on the
+command line with `-Dkudu.version=x`. For example:
+
+```
+mvn -pl com.yahoo.ycsb:kudu-binding -am package -DskipTests -Dkudu.version=0.7.1
+```
+
+Note that prior to 1.0, Kudu doesn't guarantee wire or API compatibility between versions, and only the latest
+one is officially supported.
diff --git a/kudu/pom.xml b/kudu/pom.xml
index f0d3088c..ec6d5d65 100644
--- a/kudu/pom.xml
+++ b/kudu/pom.xml
@@ -1,58 +1,58 @@
4.0.0
com.yahoo.ycsb
binding-parent
- 0.9.0-SNAPSHOT
+ 0.10.0-SNAPSHOT
../binding-parent
kudu-binding
Kudu DB Binding
jar
org.kududb
kudu-client
${kudu.version}
com.yahoo.ycsb
core
${project.version}
provided
true
false
cloudera-repo
Cloudera Releases
https://repository.cloudera.com/artifactory/cloudera-repos
diff --git a/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java b/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java
index 503c574a..05757b41 100644
--- a/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java
+++ b/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java
@@ -1,348 +1,351 @@
/**
* Copyright (c) 2015 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db;
import com.stumbleupon.async.TimeoutException;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.StringByteIterator;
import com.yahoo.ycsb.workloads.CoreWorkload;
import org.kududb.ColumnSchema;
import org.kududb.Schema;
import org.kududb.client.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
import static org.kududb.Type.STRING;
/**
 * Kudu client for YCSB framework. Example to load:
 *
 *   $ ./bin/ycsb load kudu -P workloads/workloada -threads 5
 *
 * Example to run:
 *
 *   ./bin/ycsb run kudu -P workloads/workloada -p kudu_sync_ops=true -threads 5
 */
public class KuduYCSBClient extends com.yahoo.ycsb.DB {
public static final String KEY = "key";
public static final Status TIMEOUT =
new Status("TIMEOUT", "The operation timed out.");
public static final int MAX_TABLETS = 9000;
public static final long DEFAULT_SLEEP = 60000;
private static final String SYNC_OPS_OPT = "kudu_sync_ops";
private static final String DEBUG_OPT = "kudu_debug";
private static final String PRINT_ROW_ERRORS_OPT = "kudu_print_row_errors";
private static final String PRE_SPLIT_NUM_TABLETS_OPT =
"kudu_pre_split_num_tablets";
private static final String TABLE_NUM_REPLICAS = "kudu_table_num_replicas";
private static final String BLOCK_SIZE_OPT = "kudu_block_size";
private static final String MASTER_ADDRESSES_OPT = "kudu_master_addresses";
private static final int BLOCK_SIZE_DEFAULT = 4096;
private static final List<String> COLUMN_NAMES = new ArrayList<String>();
private static KuduClient client;
private static Schema schema;
private static int fieldCount;
private boolean debug = false;
private boolean printErrors = false;
private String tableName;
private KuduSession session;
private KuduTable kuduTable;
@Override
public void init() throws DBException {
if (getProperties().getProperty(DEBUG_OPT) != null) {
this.debug = getProperties().getProperty(DEBUG_OPT).equals("true");
}
if (getProperties().getProperty(PRINT_ROW_ERRORS_OPT) != null) {
this.printErrors =
getProperties().getProperty(PRINT_ROW_ERRORS_OPT).equals("true");
}
this.tableName = com.yahoo.ycsb.workloads.CoreWorkload.table;
initClient(debug, tableName, getProperties());
this.session = client.newSession();
if (getProperties().getProperty(SYNC_OPS_OPT) != null
&& getProperties().getProperty(SYNC_OPS_OPT).equals("false")) {
this.session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
this.session.setMutationBufferSpace(100);
} else {
this.session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_SYNC);
}
try {
this.kuduTable = client.openTable(tableName);
} catch (Exception e) {
throw new DBException("Could not open a table because of:", e);
}
}
private static synchronized void initClient(boolean debug, String tableName,
Properties prop) throws DBException {
if (client != null) {
return;
}
String masterAddresses = prop.getProperty(MASTER_ADDRESSES_OPT);
if (masterAddresses == null) {
masterAddresses = "localhost:7051";
}
int numTablets = getIntFromProp(prop, PRE_SPLIT_NUM_TABLETS_OPT, 4);
if (numTablets > MAX_TABLETS) {
throw new DBException("Specified number of tablets (" + numTablets
+ ") must be equal " + "or below " + MAX_TABLETS);
}
int numReplicas = getIntFromProp(prop, TABLE_NUM_REPLICAS, 3);
int blockSize = getIntFromProp(prop, BLOCK_SIZE_OPT, BLOCK_SIZE_DEFAULT);
client = new KuduClient.KuduClientBuilder(masterAddresses)
.defaultSocketReadTimeoutMs(DEFAULT_SLEEP)
.defaultOperationTimeoutMs(DEFAULT_SLEEP)
.defaultAdminOperationTimeoutMs(DEFAULT_SLEEP).build();
if (debug) {
System.out.println("Connecting to the masters at " + masterAddresses);
}
fieldCount = getIntFromProp(prop, CoreWorkload.FIELD_COUNT_PROPERTY,
Integer.parseInt(CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT));
List<ColumnSchema> columns = new ArrayList<ColumnSchema>(fieldCount + 1);
ColumnSchema keyColumn = new ColumnSchema.ColumnSchemaBuilder(KEY, STRING)
.key(true).desiredBlockSize(blockSize).build();
columns.add(keyColumn);
COLUMN_NAMES.add(KEY);
for (int i = 0; i < fieldCount; i++) {
String name = "field" + i;
COLUMN_NAMES.add(name);
columns.add(new ColumnSchema.ColumnSchemaBuilder(name, STRING)
.desiredBlockSize(blockSize).build());
}
schema = new Schema(columns);
CreateTableOptions builder = new CreateTableOptions();
builder.setNumReplicas(numReplicas);
// create n-1 split keys, which will end up being n tablets master-side
for (int i = 1; i < numTablets + 0; i++) {
// We do +1000 since YCSB starts at user1.
int startKeyInt = (MAX_TABLETS / numTablets * i) + 1000;
String startKey = String.format("%04d", startKeyInt);
PartialRow splitRow = schema.newPartialRow();
splitRow.addString(0, "user" + startKey);
builder.addSplitRow(splitRow);
}
try {
client.createTable(tableName, schema, builder);
} catch (Exception e) {
if (!e.getMessage().contains("ALREADY_PRESENT")) {
throw new DBException("Couldn't create the table", e);
}
}
}
private static int getIntFromProp(Properties prop, String propName,
int defaultValue) throws DBException {
String intStr = prop.getProperty(propName);
if (intStr == null) {
return defaultValue;
} else {
try {
return Integer.valueOf(intStr);
} catch (NumberFormatException ex) {
throw new DBException(
"Provided number for " + propName + " isn't a valid integer");
}
}
}
@Override
public void cleanup() throws DBException {
try {
this.session.close();
} catch (Exception e) {
throw new DBException("Couldn't cleanup the session", e);
}
}
@Override
public Status read(String table, String key, Set<String> fields,
HashMap<String, ByteIterator> result) {
Vector<HashMap<String, ByteIterator>> results =
new Vector<HashMap<String, ByteIterator>>();
final Status status = scan(table, key, 1, fields, results);
if (!status.equals(Status.OK)) {
return status;
}
if (results.size() != 1) {
return Status.NOT_FOUND;
}
result.putAll(results.firstElement());
return Status.OK;
}
@Override
public Status scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
try {
KuduScanner.KuduScannerBuilder scannerBuilder =
client.newScannerBuilder(this.kuduTable);
List<String> querySchema;
if (fields == null) {
querySchema = COLUMN_NAMES;
// No need to set the projected columns with the whole schema.
} else {
querySchema = new ArrayList<String>(fields);
scannerBuilder.setProjectedColumnNames(querySchema);
}
- PartialRow lowerBound = schema.newPartialRow();
- lowerBound.addString(0, startkey);
- scannerBuilder.lowerBound(lowerBound);
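+ // A key-column predicate replaces the old PartialRow bounds: EQUAL for a
+ // single-key read, GREATER_EQUAL to scan forward from startkey.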
+ KuduPredicate.ComparisonOp comparisonOp;
if (recordcount == 1) {
- PartialRow upperBound = schema.newPartialRow();
- // Keys are fixed length, just adding something at the end is safe.
- upperBound.addString(0, startkey.concat(" "));
- scannerBuilder.exclusiveUpperBound(upperBound);
+ comparisonOp = KuduPredicate.ComparisonOp.EQUAL;
+ } else {
+ comparisonOp = KuduPredicate.ComparisonOp.GREATER_EQUAL;
}
+ KuduPredicate keyPredicate = KuduPredicate.newComparisonPredicate(
+ schema.getColumnByIndex(0),
+ comparisonOp,
+ startkey);
- KuduScanner scanner = scannerBuilder.limit(recordcount) // currently noop
+ KuduScanner scanner = scannerBuilder
+ .addPredicate(keyPredicate)
+ .limit(recordcount) // currently noop
.build();
while (scanner.hasMoreRows()) {
RowResultIterator data = scanner.nextRows();
addAllRowsToResult(data, recordcount, querySchema, result);
if (recordcount == result.size()) {
break;
}
}
RowResultIterator closer = scanner.close();
addAllRowsToResult(closer, recordcount, querySchema, result);
} catch (TimeoutException te) {
if (printErrors) {
System.err.println(
"Waited too long for a scan operation with start key=" + startkey);
}
return TIMEOUT;
} catch (Exception e) {
System.err.println("Unexpected exception " + e);
e.printStackTrace();
return Status.ERROR;
}
return Status.OK;
}
private void addAllRowsToResult(RowResultIterator it, int recordcount,
List<String> querySchema, Vector<HashMap<String, ByteIterator>> result)
throws Exception {
RowResult row;
HashMap<String, ByteIterator> rowResult =
new HashMap<String, ByteIterator>(querySchema.size());
if (it == null) {
return;
}
while (it.hasNext()) {
if (result.size() == recordcount) {
return;
}
row = it.next();
int colIdx = 0;
for (String col : querySchema) {
rowResult.put(col, new StringByteIterator(row.getString(colIdx)));
colIdx++;
}
result.add(rowResult);
}
}
@Override
public Status update(String table, String key,
HashMap<String, ByteIterator> values) {
Update update = this.kuduTable.newUpdate();
PartialRow row = update.getRow();
row.addString(KEY, key);
for (int i = 1; i < schema.getColumnCount(); i++) {
String columnName = schema.getColumnByIndex(i).getName();
if (values.containsKey(columnName)) {
String value = values.get(columnName).toString();
row.addString(columnName, value);
}
}
apply(update);
return Status.OK;
}
@Override
public Status insert(String table, String key,
HashMap<String, ByteIterator> values) {
Insert insert = this.kuduTable.newInsert();
PartialRow row = insert.getRow();
row.addString(KEY, key);
for (int i = 1; i < schema.getColumnCount(); i++) {
row.addString(i, new String(
values.get(schema.getColumnByIndex(i).getName()).toArray()));
}
apply(insert);
return Status.OK;
}
@Override
public Status delete(String table, String key) {
Delete delete = this.kuduTable.newDelete();
PartialRow row = delete.getRow();
row.addString(KEY, key);
apply(delete);
return Status.OK;
}
private void apply(Operation op) {
try {
OperationResponse response = session.apply(op);
if (response != null && response.hasRowError() && printErrors) {
System.err.println("Got a row error " + response.getRowError());
}
} catch (Exception ex) {
if (printErrors) {
System.err.println("Failed to apply an operation " + ex.toString());
ex.printStackTrace();
}
}
}
}
diff --git a/mapkeeper/pom.xml b/mapkeeper/pom.xml
index 6fac3795..5ac0a611 100644
--- a/mapkeeper/pom.xml
+++ b/mapkeeper/pom.xml
@@ -1,52 +1,52 @@
4.0.0
com.yahoo.ycsb
binding-parent
- 0.9.0-SNAPSHOT
+ 0.10.0-SNAPSHOT
../binding-parent
mapkeeper-binding
Mapkeeper DB Binding
jar
com.yahoo.mapkeeper
mapkeeper
${mapkeeper.version}
com.yahoo.ycsb
core
${project.version}
provided
mapkeeper-releases
https://raw.github.com/m1ch1/m1ch1-mvn-repo/master/releases
diff --git a/memcached/pom.xml b/memcached/pom.xml
index 10bcbbe9..9c6843fe 100644
--- a/memcached/pom.xml
+++ b/memcached/pom.xml
@@ -1,78 +1,78 @@
4.0.0
com.yahoo.ycsb
binding-parent
- 0.9.0-SNAPSHOT
+ 0.10.0-SNAPSHOT
../binding-parent
memcached-binding
memcached binding
jar
log4j
log4j
1.2.17
com.yahoo.ycsb
core
${project.version}
org.codehaus.jackson
jackson-mapper-asl
1.9.13
net.spy
spymemcached
2.11.4
org.apache.maven.plugins
maven-assembly-plugin
${maven.assembly.version}
jar-with-dependencies
false
package
single
diff --git a/mongodb/pom.xml b/mongodb/pom.xml
index f510d19a..8d9dcf18 100644
--- a/mongodb/pom.xml
+++ b/mongodb/pom.xml
@@ -1,82 +1,82 @@
4.0.0
com.yahoo.ycsb
binding-parent
- 0.9.0-SNAPSHOT
+ 0.10.0-SNAPSHOT
../binding-parent
mongodb-binding
MongoDB Binding
jar
org.mongodb
mongo-java-driver
${mongodb.version}
com.allanbank
mongodb-async-driver
${mongodb.async.version}
com.yahoo.ycsb
core
${project.version}
provided
ch.qos.logback
logback-classic
1.1.2
runtime
junit
junit
4.12
test
true
always
warn
false
never
fail
allanbank
Allanbank Releases
http://www.allanbank.com/repo/
default
diff --git a/nosqldb/pom.xml b/nosqldb/pom.xml
index e2823102..e11a2895 100644
--- a/nosqldb/pom.xml
+++ b/nosqldb/pom.xml
@@ -1,45 +1,45 @@
4.0.0
com.yahoo.ycsb
binding-parent
- 0.9.0-SNAPSHOT
+ 0.10.0-SNAPSHOT
../binding-parent
nosqldb-binding
Oracle NoSQL Database Binding
jar
com.oracle.kv
oracle-nosql-client
3.0.5
com.yahoo.ycsb
core
${project.version}
provided
diff --git a/orientdb/pom.xml b/orientdb/pom.xml
index db83a942..2f7e3c80 100644
--- a/orientdb/pom.xml
+++ b/orientdb/pom.xml
@@ -1,62 +1,62 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.yahoo.ycsb</groupId>
    <artifactId>binding-parent</artifactId>
-   <version>0.9.0-SNAPSHOT</version>
+   <version>0.10.0-SNAPSHOT</version>
    <relativePath>../binding-parent</relativePath>
  </parent>
  <artifactId>orientdb-binding</artifactId>
  <name>OrientDB Binding</name>
  <packaging>jar</packaging>
  <repositories>
    <repository>
      <id>sonatype-nexus-snapshots</id>
      <name>Sonatype Nexus Snapshots</name>
      <url>https://oss.sonatype.org/content/repositories/snapshots</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>com.yahoo.ycsb</groupId>
      <artifactId>core</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.orientechnologies</groupId>
      <artifactId>orientdb-client</artifactId>
      <version>${orientdb.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.10</version>
    </dependency>
  </dependencies>
diff --git a/pom.xml b/pom.xml
index 14cd0885..0a6a072a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,171 +1,179 @@
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.yahoo.ycsb</groupId>
  <artifactId>root</artifactId>
- <version>0.9.0-SNAPSHOT</version>
+ <version>0.10.0-SNAPSHOT</version>
  <packaging>pom</packaging>
  <name>YCSB Root</name>
  <description>This is the top level project that builds, packages the core and all the DB bindings for YCSB infrastructure.</description>
  <scm>
    <connection>scm:git:git://github.com/brianfrankcooper/YCSB.git</connection>
    <tag>master</tag>
    <url>https://github.com/brianfrankcooper/YCSB</url>
  </scm>
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>checkstyle</groupId>
        <artifactId>checkstyle</artifactId>
        <version>5.0</version>
      </dependency>
      <dependency>
        <groupId>org.jdom</groupId>
        <artifactId>jdom</artifactId>
        <version>1.1</version>
      </dependency>
      <dependency>
        <groupId>com.google.collections</groupId>
        <artifactId>google-collections</artifactId>
        <version>1.0</version>
      </dependency>
      <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.6.4</version>
      </dependency>
    </dependencies>
  </dependencyManagement>
  <properties>
    <maven.assembly.version>2.5.5</maven.assembly.version>
    <maven.dependency.version>2.10</maven.dependency.version>
+   <asynchbase.version>1.7.1</asynchbase.version>
    <hbase094.version>0.94.27</hbase094.version>
    <hbase098.version>0.98.14-hadoop2</hbase098.version>
    <hbase10.version>1.0.2</hbase10.version>
    <accumulo.version>1.6.0</accumulo.version>
    <cassandra.version>1.2.9</cassandra.version>
    <cassandra.cql.version>1.0.3</cassandra.cql.version>
    <cassandra2.cql.version>3.0.0</cassandra2.cql.version>
-   <geode.version>1.0.0-incubating.M1</geode.version>
+   <geode.version>1.0.0-incubating.M2</geode.version>
+   <googlebigtable.version>0.2.3</googlebigtable.version>
    <infinispan.version>7.2.2.Final</infinispan.version>
-   <kudu.version>0.6.0</kudu.version>
+   <kudu.version>0.8.0</kudu.version>
    <openjpa.jdbc.version>2.1.1</openjpa.jdbc.version>
    <mongodb.version>3.0.3</mongodb.version>
    <mongodb.async.version>2.0.1</mongodb.async.version>
    <orientdb.version>2.1.8</orientdb.version>
    <redis.version>2.0.0</redis.version>
    <s3.version>1.10.20</s3.version>
    <voldemort.version>0.81</voldemort.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <thrift.version>0.8.0</thrift.version>
    <hypertable.version>0.9.5.6</hypertable.version>
    <couchbase.version>1.4.10</couchbase.version>
+   <couchbase2.version>2.2.6</couchbase2.version>
    <tarantool.version>1.6.5</tarantool.version>
+   <riak.version>2.0.5</riak.version>
    <aerospike.version>3.1.2</aerospike.version>
    <solr.version>5.4.0</solr.version>
  </properties>
  <modules>
    <module>core</module>
    <module>binding-parent</module>
    <module>accumulo</module>
    <module>aerospike</module>
+   <module>asynchbase</module>
    <module>cassandra</module>
    <module>cassandra2</module>
    <module>couchbase</module>
+   <module>couchbase2</module>
    <module>distribution</module>
    <module>dynamodb</module>
    <module>elasticsearch</module>
    <module>geode</module>
+   <module>googlebigtable</module>
    <module>googledatastore</module>
    <module>hbase094</module>
    <module>hbase098</module>
    <module>hbase10</module>
    <module>hypertable</module>
    <module>infinispan</module>
    <module>jdbc</module>
    <module>kudu</module>
    <module>memcached</module>
    <module>mongodb</module>
    <module>nosqldb</module>
    <module>orientdb</module>
    <module>rados</module>
    <module>redis</module>
+   <module>riak</module>
    <module>s3</module>
    <module>solr</module>
    <module>tarantool</module>
  </modules>
  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-checkstyle-plugin</artifactId>
          <version>2.15</version>
        </plugin>
      </plugins>
    </pluginManagement>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.3</version>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-checkstyle-plugin</artifactId>
        <executions>
          <execution>
            <id>validate</id>
            <phase>validate</phase>
            <goals>
              <goal>check</goal>
            </goals>
            <configuration>
              <configLocation>checkstyle.xml</configLocation>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
diff --git a/redis/pom.xml b/redis/pom.xml
index 65dbc50b..f05d5c09 100644
--- a/redis/pom.xml
+++ b/redis/pom.xml
@@ -1,45 +1,45 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.yahoo.ycsb</groupId>
    <artifactId>binding-parent</artifactId>
-   <version>0.9.0-SNAPSHOT</version>
+   <version>0.10.0-SNAPSHOT</version>
    <relativePath>../binding-parent</relativePath>
  </parent>
  <artifactId>redis-binding</artifactId>
  <name>Redis DB Binding</name>
  <packaging>jar</packaging>
  <dependencies>
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>${redis.version}</version>
    </dependency>
    <dependency>
      <groupId>com.yahoo.ycsb</groupId>
      <artifactId>core</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
diff --git a/riak/README.md b/riak/README.md
new file mode 100644
index 00000000..58bd6d48
--- /dev/null
+++ b/riak/README.md
@@ -0,0 +1,92 @@
+<!--
+Copyright (c) 2016 YCSB contributors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License"); you
+may not use this file except in compliance with the License. You
+may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+implied. See the License for the specific language governing
+permissions and limitations under the License. See accompanying
+LICENSE file.
+-->
+
+Riak KV Client for Yahoo! Cloud System Benchmark (YCSB)
+=======================================================
+
+The Riak KV YCSB client is designed to work with the Yahoo! Cloud System Benchmark (YCSB) project (https://github.com/brianfrankcooper/YCSB) to support performance testing for the 2.x.y line of the Riak KV database.
+
+Creating a bucket-type to use with YCSB
+----------------------------
+
+Perform the following operations on your Riak cluster to configure it for the benchmarks.
+
+Set the default backend for Riak to LevelDB in the `riak.conf` file of every node of your cluster. This is required to support secondary indexes, which are used for `scan` transactions. You can do this by modifying the proper line as shown below.
+
+```
+storage_backend = leveldb
+```
+After this, create a bucket type named "ycsb"[1](#f1) by logging into one of the nodes in your cluster. You're then ready to set the cluster up to use either the strong or the eventual consistency model, as shown in the next two subsections.
+
+### Strong consistency model
+
+To use the strong consistency model (the default), follow these two steps.
+
+1. In every `riak.conf` file, search for the `##strong_consistency=on` line and uncomment it. It's important that you do this before you start your cluster!
+2. Run the following `riak-admin` commands:
+
+ ```
+ riak-admin bucket-type create ycsb '{"props":{"consistent":true}}'
+ riak-admin bucket-type activate ycsb
+ ```
+
+When using this model, you **may want to specify the number of replicas to create for each object**[2](#f2): the *R* and *W* parameters (see next section) will in fact be ignored. The only information this consistency model needs is how many nodes the system has to successfully query before a transaction is considered complete. To set this parameter, add `"n_val":N` to the list of properties shown above (by default `N` is set to 3).
+
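+For instance, an illustrative creation command asking for five replicas per object would be:
+```
+riak-admin bucket-type create ycsb '{"props":{"consistent":true,"n_val":5}}'
+```
+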
+#### A note on scan transactions
+Currently, `scan` transactions are not _directly_ supported, as there is no suitable means to perform them properly. This will not cause the benchmark to fail; it simply won't perform any scan transactions at all (these immediately return with a `Status.NOT_IMPLEMENTED` code).
+
+However, a possible workaround exists: since Riak doesn't allow strongly consistent bucket-types to use secondary indexes, we can create an eventually consistent bucket-type just to store (*key*, *2i index*) pairs. It is used only to obtain the keys of the objects, which are then retrieved from the strongly consistent bucket. If you want to use this workaround, create and activate a "_fake bucket-type_" using the following commands:
+```
+riak-admin bucket-type create fakeBucketType '{"props":{"allow_mult":"false","n_val":1,"dvv_enabled":false,"last_write_wins":true}}'
+riak-admin bucket-type activate fakeBucketType
+```
+A bucket-type defined this way isn't allowed to _create siblings_ (`"allow_mult":"false"`); it has just _one replica_ (`"n_val":1`) which stores the _last value provided_ (`"last_write_wins":true`), and _vector clocks_ are used instead of _dotted version vectors_ (`"dvv_enabled":false`). Note that setting `"n_val":1` makes `scan` transactions less *fault-tolerant*: if a node fails, many of them could fail as well. You may increase this value, but doing so necessarily loads the cluster with more work. So, the choice is yours to make!
+Then set the `riak.strong_consistent_scans_bucket_type` property (see next section) to the name you gave this "fake bucket-type" (e.g. `fakeBucketType` in this case).
+
+Please note that this workaround involves a **double store operation for each insert transaction**: one to store the actual object and another to save the corresponding 2i index. In practice, the client won't notice any difference, as the latter operation is performed asynchronously. However, the cluster is obviously loaded more, which is why the proposed "fake bucket-type" is as _resource-friendly_ as possible.
+
+### Eventual consistency model
+
+If you want to use the eventual consistency model implemented in Riak, you just have to run:
+```
+riak-admin bucket-type create ycsb '{"props":{"allow_mult":"false"}}'
+riak-admin bucket-type activate ycsb
+```
+
+Riak KV configuration parameters
+----------------------------
+You can either specify these configuration parameters via command line or set them in the `riak.properties` file.
+
+* `riak.hosts` - string list, comma-separated list of IPs or FQDNs. For example: `riak.hosts=127.0.0.1,127.0.0.2,127.0.0.3` or `riak.hosts=riak1.mydomain.com,riak2.mydomain.com,riak3.mydomain.com`.
+* `riak.port` - int, the port on which every node is listening. It must match the one specified in the `riak.conf` file at the line `listener.protobuf.internal`.
+* `riak.bucket_type` - string, it must match the name of the bucket type created during setup (see section above).
+* `riak.r_val` - int, the number of Riak nodes that must return results for a read operation before the transaction is considered successfully completed.
+* `riak.w_val` - int, the number of Riak nodes that must report success before an insert/update transaction is considered complete.
+* `riak.read_retry_count` - int, the number of times the client will try to read a key from Riak.
+* `riak.wait_time_before_retry` - int, the time (in milliseconds) before the client attempts to perform another read if the previous one failed.
+* `riak.transaction_time_limit` - int, the time (in seconds) the client waits before aborting the current transaction.
+* `riak.strong_consistency` - boolean, indicates whether to use *strong consistency* (true) or *eventual consistency* (false).
+* `riak.strong_consistent_scans_bucket_type` - string, indicates the bucket-type to use to allow scan transactions when using the strong consistency model.
+* `riak.debug` - boolean, enables debug mode. This displays all the properties (specified or defaults) when a benchmark is started. Moreover, it shows error causes whenever these occur.
+
+Note: for more information on workloads and how to run them, please see: https://github.com/brianfrankcooper/YCSB/wiki/Running-a-Workload
+
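+As an illustration (the hosts and workload file here are examples to adapt), a load phase against a local node could be launched with:
+```
+./bin/ycsb load riak -s -P workloads/workloada -p riak.hosts=127.0.0.1 -p riak.bucket_type=ycsb
+```
+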
+<b id="f1">1</b> As specified in the `riak.properties` file. See the configuration parameters section for further info. [↩](#a1)
+
+<b id="f2">2</b> More info about properly setting up a fault-tolerant cluster can be found at http://docs.basho.com/riak/kv/2.1.4/configuring/strong-consistency/#enabling-strong-consistency. [↩](#a2)
+
diff --git a/jdbc/pom.xml b/riak/pom.xml
similarity index 64%
copy from jdbc/pom.xml
copy to riak/pom.xml
index 67770336..4cb303c0 100644
--- a/jdbc/pom.xml
+++ b/riak/pom.xml
@@ -1,57 +1,59 @@
-
-
+
  <modelVersion>4.0.0</modelVersion>
+
  <parent>
    <groupId>com.yahoo.ycsb</groupId>
    <artifactId>binding-parent</artifactId>
-   <version>0.9.0-SNAPSHOT</version>
+   <version>0.10.0-SNAPSHOT</version>
    <relativePath>../binding-parent</relativePath>
  </parent>
-
- <artifactId>jdbc-binding</artifactId>
- <name>JDBC DB Binding</name>
+
+ <artifactId>riak-binding</artifactId>
+ <name>Riak KV Binding</name>
  <packaging>jar</packaging>
  <dependencies>
    <dependency>
-     <groupId>org.apache.openjpa</groupId>
-     <artifactId>openjpa-jdbc</artifactId>
-     <version>${openjpa.jdbc.version}</version>
+     <groupId>com.basho.riak</groupId>
+     <artifactId>riak-client</artifactId>
+     <version>2.0.5</version>
    </dependency>
    <dependency>
      <groupId>com.yahoo.ycsb</groupId>
      <artifactId>core</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
+   <dependency>
+     <groupId>com.google.collections</groupId>
+     <artifactId>google-collections</artifactId>
+     <version>1.0</version>
+   </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
-   <dependency>
-     <groupId>org.hsqldb</groupId>
-     <artifactId>hsqldb</artifactId>
-     <version>2.3.3</version>
-     <scope>test</scope>
-   </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/riak/src/main/java/com/yahoo/ycsb/db/riak/RiakKVClient.java b/riak/src/main/java/com/yahoo/ycsb/db/riak/RiakKVClient.java
new file mode 100644
index 00000000..42c3e90e
--- /dev/null
+++ b/riak/src/main/java/com/yahoo/ycsb/db/riak/RiakKVClient.java
@@ -0,0 +1,594 @@
+/**
+ * Copyright (c) 2016 YCSB contributors All rights reserved.
+ * Copyright 2014 Basho Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package com.yahoo.ycsb.db.riak;
+
+import com.basho.riak.client.api.commands.buckets.StoreBucketProperties;
+import com.basho.riak.client.api.commands.kv.StoreValue;
+import com.basho.riak.client.api.commands.kv.UpdateValue;
+import com.basho.riak.client.core.RiakFuture;
+import com.basho.riak.client.core.query.RiakObject;
+import com.basho.riak.client.core.query.indexes.LongIntIndex;
+import com.basho.riak.client.core.util.BinaryValue;
+import com.yahoo.ycsb.*;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.basho.riak.client.api.RiakClient;
+import com.basho.riak.client.api.cap.Quorum;
+import com.basho.riak.client.api.commands.indexes.IntIndexQuery;
+import com.basho.riak.client.api.commands.kv.DeleteValue;
+import com.basho.riak.client.api.commands.kv.FetchValue;
+import com.basho.riak.client.core.RiakCluster;
+import com.basho.riak.client.core.RiakNode;
+import com.basho.riak.client.core.query.Location;
+import com.basho.riak.client.core.query.Namespace;
+
+import static com.yahoo.ycsb.db.riak.RiakUtils.createResultHashMap;
+import static com.yahoo.ycsb.db.riak.RiakUtils.getKeyAsLong;
+import static com.yahoo.ycsb.db.riak.RiakUtils.serializeTable;
+
+/**
+ * Riak KV 2.x.y client for YCSB framework.
+ *
+ */
+public class RiakKVClient extends DB {
+ private static final String HOST_PROPERTY = "riak.hosts";
+ private static final String PORT_PROPERTY = "riak.port";
+ private static final String BUCKET_TYPE_PROPERTY = "riak.bucket_type";
+ private static final String R_VALUE_PROPERTY = "riak.r_val";
+ private static final String W_VALUE_PROPERTY = "riak.w_val";
+ private static final String READ_RETRY_COUNT_PROPERTY = "riak.read_retry_count";
+ private static final String WAIT_TIME_BEFORE_RETRY_PROPERTY = "riak.wait_time_before_retry";
+ private static final String TRANSACTION_TIME_LIMIT_PROPERTY = "riak.transaction_time_limit";
+ private static final String STRONG_CONSISTENCY_PROPERTY = "riak.strong_consistency";
+ private static final String STRONG_CONSISTENT_SCANS_BUCKET_TYPE_PROPERTY = "riak.strong_consistent_scans_bucket_type";
+ private static final String DEBUG_PROPERTY = "riak.debug";
+
+ private static final Status TIME_OUT = new Status("TIME_OUT", "Cluster didn't respond after maximum wait time.");
+
+ private String[] hosts;
+ private int port;
+ private String bucketType;
+ private String bucketType2i;
+ private Quorum rvalue;
+ private Quorum wvalue;
+ private int readRetryCount;
+ private int waitTimeBeforeRetry;
+ private int transactionTimeLimit;
+ private boolean strongConsistency;
+ private String strongConsistentScansBucketType;
+ private boolean performStrongConsistentScans;
+ private boolean debug;
+
+ private RiakClient riakClient;
+ private RiakCluster riakCluster;
+
+ private void loadDefaultProperties() {
+ InputStream propFile = RiakKVClient.class.getClassLoader().getResourceAsStream("riak.properties");
+ Properties propsPF = new Properties(System.getProperties());
+
+ try {
+ propsPF.load(propFile);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ hosts = propsPF.getProperty(HOST_PROPERTY).split(",");
+ port = Integer.parseInt(propsPF.getProperty(PORT_PROPERTY));
+ bucketType = propsPF.getProperty(BUCKET_TYPE_PROPERTY);
+ rvalue = new Quorum(Integer.parseInt(propsPF.getProperty(R_VALUE_PROPERTY)));
+ wvalue = new Quorum(Integer.parseInt(propsPF.getProperty(W_VALUE_PROPERTY)));
+ readRetryCount = Integer.parseInt(propsPF.getProperty(READ_RETRY_COUNT_PROPERTY));
+ waitTimeBeforeRetry = Integer.parseInt(propsPF.getProperty(WAIT_TIME_BEFORE_RETRY_PROPERTY));
+ transactionTimeLimit = Integer.parseInt(propsPF.getProperty(TRANSACTION_TIME_LIMIT_PROPERTY));
+ strongConsistency = Boolean.parseBoolean(propsPF.getProperty(STRONG_CONSISTENCY_PROPERTY));
+ strongConsistentScansBucketType = propsPF.getProperty(STRONG_CONSISTENT_SCANS_BUCKET_TYPE_PROPERTY);
+ debug = Boolean.parseBoolean(propsPF.getProperty(DEBUG_PROPERTY));
+ }
+
+ private void loadProperties() {
+ // First, load the default properties...
+ loadDefaultProperties();
+
+ // ...then, check for some props set at command line!
+ Properties props = getProperties();
+
+ String portString = props.getProperty(PORT_PROPERTY);
+ if (portString != null) {
+ port = Integer.parseInt(portString);
+ }
+
+ String hostsString = props.getProperty(HOST_PROPERTY);
+ if (hostsString != null) {
+ hosts = hostsString.split(",");
+ }
+
+ String bucketTypeString = props.getProperty(BUCKET_TYPE_PROPERTY);
+ if (bucketTypeString != null) {
+ bucketType = bucketTypeString;
+ }
+
+ String rValueString = props.getProperty(R_VALUE_PROPERTY);
+ if (rValueString != null) {
+ rvalue = new Quorum(Integer.parseInt(rValueString));
+ }
+
+ String wValueString = props.getProperty(W_VALUE_PROPERTY);
+ if (wValueString != null) {
+ wvalue = new Quorum(Integer.parseInt(wValueString));
+ }
+
+ String readRetryCountString = props.getProperty(READ_RETRY_COUNT_PROPERTY);
+ if (readRetryCountString != null) {
+ readRetryCount = Integer.parseInt(readRetryCountString);
+ }
+
+ String waitTimeBeforeRetryString = props.getProperty(WAIT_TIME_BEFORE_RETRY_PROPERTY);
+ if (waitTimeBeforeRetryString != null) {
+ waitTimeBeforeRetry = Integer.parseInt(waitTimeBeforeRetryString);
+ }
+
+ String transactionTimeLimitString = props.getProperty(TRANSACTION_TIME_LIMIT_PROPERTY);
+ if (transactionTimeLimitString != null) {
+ transactionTimeLimit = Integer.parseInt(transactionTimeLimitString);
+ }
+
+ String strongConsistencyString = props.getProperty(STRONG_CONSISTENCY_PROPERTY);
+ if (strongConsistencyString != null) {
+ strongConsistency = Boolean.parseBoolean(strongConsistencyString);
+ }
+
+ String strongConsistentScansBucketTypeString = props.getProperty(STRONG_CONSISTENT_SCANS_BUCKET_TYPE_PROPERTY);
+ if (strongConsistentScansBucketTypeString != null) {
+ strongConsistentScansBucketType = strongConsistentScansBucketTypeString;
+ }
+
+ String debugString = props.getProperty(DEBUG_PROPERTY);
+ if (debugString != null) {
+ debug = Boolean.parseBoolean(debugString);
+ }
+ }
+
+ public void init() throws DBException {
+ loadProperties();
+
+ RiakNode.Builder builder = new RiakNode.Builder().withRemotePort(port);
+ List<RiakNode> nodes = RiakNode.Builder.buildNodes(builder, Arrays.asList(hosts));
+ riakCluster = new RiakCluster.Builder(nodes).build();
+
+ try {
+ riakCluster.start();
+ riakClient = new RiakClient(riakCluster);
+ } catch (Exception e) {
+ System.err.println("Unable to properly start up the cluster. Reason: " + e.toString());
+ throw new DBException(e);
+ }
+
+ // If strong consistency is in use, we need to change the bucket-type where the 2i indexes will be stored.
+ if (strongConsistency && !strongConsistentScansBucketType.isEmpty()) {
+ // The 2i indexes have to be stored in the purposely created strongConsistentScansBucketType. This, however,
+ // works only if the user actually created it: if the latter doesn't exist, then the scan transactions
+ // will not be performed at all.
+ bucketType2i = strongConsistentScansBucketType;
+ performStrongConsistentScans = true;
+ } else {
+ // If instead eventual consistency is in use, then the 2i indexes have to be stored in the bucket-type
+ // indicated with the bucketType variable.
+ bucketType2i = bucketType;
+ performStrongConsistentScans = false;
+ }
+
+ if (debug) {
+ System.err.println("DEBUG ENABLED. Configuration parameters:");
+ System.err.println("-----------------------------------------");
+ System.err.println("Hosts: " + Arrays.toString(hosts));
+ System.err.println("Port: " + port);
+ System.err.println("Bucket Type: " + bucketType);
+ System.err.println("R Val: " + rvalue.toString());
+ System.err.println("W Val: " + wvalue.toString());
+ System.err.println("Read Retry Count: " + readRetryCount);
+ System.err.println("Wait Time Before Retry: " + waitTimeBeforeRetry + " ms");
+ System.err.println("Transaction Time Limit: " + transactionTimeLimit + " s");
+ System.err.println("Consistency model: " + (strongConsistency ? "Strong" : "Eventual"));
+
+ if (strongConsistency) {
+ System.err.println("Strong Consistent Scan Transactions " + (performStrongConsistentScans ? "" : "NOT ") +
+ "allowed.");
+ }
+ }
+ }
+
+ /**
+ * Read a record from the database. Each field/value pair from the result will be stored in a HashMap.
+ *
+ * @param table The name of the table (Riak bucket)
+ * @param key The record key of the record to read.
+ * @param fields The list of fields to read, or null for all of them
+ * @param result A HashMap of field/value pairs for the result
+ * @return Zero on success, a non-zero error code on error
+ */
+ @Override
+ public Status read(String table, String key, Set<String> fields, HashMap<String, ByteIterator> result) {
+ Location location = new Location(new Namespace(bucketType, table), key);
+ FetchValue fv = new FetchValue.Builder(location).withOption(FetchValue.Option.R, rvalue).build();
+ FetchValue.Response response;
+
+ try {
+ response = fetch(fv);
+
+ if (response.isNotFound()) {
+ if (debug) {
+ System.err.println("Unable to read key " + key + ". Reason: NOT FOUND");
+ }
+
+ return Status.NOT_FOUND;
+ }
+ } catch (TimeoutException e) {
+ if (debug) {
+ System.err.println("Unable to read key " + key + ". Reason: TIME OUT");
+ }
+
+ return TIME_OUT;
+ } catch (Exception e) {
+ if (debug) {
+ System.err.println("Unable to read key " + key + ". Reason: " + e.toString());
+ }
+
+ return Status.ERROR;
+ }
+
+ // Create the result HashMap.
+ createResultHashMap(fields, response, result);
+
+ return Status.OK;
+ }
+
+ /**
+ * Perform a range scan for a set of records in the database. Each field/value pair from the result will be stored in
+ * a HashMap.
+ * Note: The scan operation requires the use of secondary indexes (2i) and LevelDB.
+ *
+ * @param table The name of the table (Riak bucket)
+ * @param startkey The record key of the first record to read.
+ * @param recordcount The number of records to read
+ * @param fields The list of fields to read, or null for all of them
+ * @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record
+ * @return Zero on success, a non-zero error code on error
+ */
+ @Override
+ public Status scan(String table, String startkey, int recordcount, Set<String> fields,
+ Vector<HashMap<String, ByteIterator>> result) {
+ if (strongConsistency && !performStrongConsistentScans) {
+ return Status.NOT_IMPLEMENTED;
+ }
+
+ // The strongly consistent bucket-type is not capable of storing 2i indexes. So, we need to read them from the
+ // fake one (which we use only to store indexes). This is why, when using such a consistency model, the
+ // bucketType2i variable is set to the fake bucket-type's name.
+ IntIndexQuery iiq = new IntIndexQuery
+ .Builder(new Namespace(bucketType2i, table), "key", getKeyAsLong(startkey), Long.MAX_VALUE)
+ .withMaxResults(recordcount)
+ .withPaginationSort(true)
+ .build();
+
+ Location location;
+ RiakFuture<IntIndexQuery.Response, IntIndexQuery> future = riakClient.executeAsync(iiq);
+
+ try {
+ IntIndexQuery.Response response = future.get(transactionTimeLimit, TimeUnit.SECONDS);
+ List<IntIndexQuery.Response.Entry> entries = response.getEntries();
+
+ // If no entries were retrieved, then something bad happened...
+ if (entries.size() == 0) {
+ if (debug) {
+ System.err.println("Unable to scan any record starting from key " + startkey + ", aborting transaction. " +
+ "Reason: NOT FOUND");
+ }
+
+ return Status.NOT_FOUND;
+ }
+
+ for (IntIndexQuery.Response.Entry entry : entries) {
+ // If strong consistency is in use, then the actual location of the object we want to read is obtained by
+ // fetching the key from the one retrieved with the 2i indexes search operation.
+ if (strongConsistency) {
+ location = new Location(new Namespace(bucketType, table), entry.getRiakObjectLocation().getKeyAsString());
+ } else {
+ location = entry.getRiakObjectLocation();
+ }
+
+ FetchValue fv = new FetchValue.Builder(location)
+ .withOption(FetchValue.Option.R, rvalue)
+ .build();
+
+ FetchValue.Response keyResponse = fetch(fv);
+
+ if (keyResponse.isNotFound()) {
+ if (debug) {
+ System.err.println("Unable to scan all requested records starting from key " + startkey + ", aborting " +
+ "transaction. Reason: NOT FOUND");
+ }
+
+ return Status.NOT_FOUND;
+ }
+
+ // Create the partial result to add to the result vector.
+ HashMap<String, ByteIterator> partialResult = new HashMap<>();
+ createResultHashMap(fields, keyResponse, partialResult);
+ result.add(partialResult);
+ }
+ } catch (TimeoutException e) {
+ if (debug) {
+ System.err.println("Unable to scan all requested records starting from key " + startkey + ", aborting " +
+ "transaction. Reason: TIME OUT");
+ }
+
+ return TIME_OUT;
+ } catch (Exception e) {
+ if (debug) {
+ System.err.println("Unable to scan all records starting from key " + startkey + ", aborting transaction. " +
+ "Reason: " + e.toString());
+ }
+
+ return Status.ERROR;
+ }
+
+ return Status.OK;
+ }
+
+ /**
+ * Tries to perform a read and, whenever it fails, retries it. It actually tries as many times as indicated,
+ * even if riakClient.execute(fv) throws an exception. This is needed for those situations in which the
+ * cluster is unable to respond properly due to overload. Note however that if the cluster doesn't respond within
+ * transactionTimeLimit, the transaction is discarded immediately.
+ *
+ * @param fv The value to fetch from the cluster.
+ */
+ private FetchValue.Response fetch(FetchValue fv) throws TimeoutException {
+ FetchValue.Response response = null;
+
+ for (int i = 0; i < readRetryCount; i++) {
+ RiakFuture<FetchValue.Response, Location> future = riakClient.executeAsync(fv);
+
+ try {
+ response = future.get(transactionTimeLimit, TimeUnit.SECONDS);
+
+ if (!response.isNotFound()) {
+ break;
+ }
+ } catch (TimeoutException e) {
+ // Let the callee decide how to handle this exception...
+ throw new TimeoutException();
+ } catch (Exception e) {
+ // Sleep for a few ms before retrying...
+ try {
+ Thread.sleep(waitTimeBeforeRetry);
+ } catch (InterruptedException e1) {
+ e1.printStackTrace();
+ }
+ }
+ }
+
+ return response;
+ }
+
+ /**
+ * Insert a record in the database. Any field/value pairs in the specified values HashMap will be written into the
+ * record with the specified record key. Also creates a secondary index (2i) for each record consisting of the key
+ * converted to long to be used for the scan operation.
+ *
+ * @param table The name of the table (Riak bucket)
+ * @param key The record key of the record to insert.
+ * @param values A HashMap of field/value pairs to insert in the record
+ * @return Zero on success, a non-zero error code on error
+ */
+ @Override
+ public Status insert(String table, String key, HashMap<String, ByteIterator> values) {
+ Location location = new Location(new Namespace(bucketType, table), key);
+ RiakObject object = new RiakObject();
+
+ // Strong consistency doesn't support secondary indexing, but the eventually consistent model does. So, we can
+ // mock 2i usage by creating a fake object stored in an eventually consistent bucket-type with the SAME KEY AS
+ // THE ACTUAL OBJECT. The latter is obviously stored in the strongly consistent bucket-type indicated with the
+ // riak.bucket_type property.
+ if (strongConsistency && performStrongConsistentScans) {
+ // Create a fake object to store in the default bucket-type just to keep track of the 2i indices.
+ Location fakeLocation = new Location(new Namespace(strongConsistentScansBucketType, table), key);
+
+ // Obviously, we want the fake object to contain as little data as possible. We can't create an empty object,
+ // so we have to choose the minimum data size allowed: one byte.
+ RiakObject fakeObject = new RiakObject();
+ fakeObject.setValue(BinaryValue.create(new byte[]{0x00}));
+ fakeObject.getIndexes().getIndex(LongIntIndex.named("key_int")).add(getKeyAsLong(key));
+
+ StoreValue fakeStore = new StoreValue.Builder(fakeObject)
+ .withLocation(fakeLocation)
+ .build();
+
+ // We don't mind whether the operation is finished or not, because waiting for it to complete would slow down the
+ // client and make our solution too heavy to be seen as a valid compromise. This will obviously mean that under
+ // heavy load conditions a scan operation could fail due to an unfinished "fakeStore".
+ riakClient.executeAsync(fakeStore);
+ } else if (!strongConsistency) {
+ // The next operation is useless when using strong consistency model, so it's ok to perform it only when using
+ // eventual consistency.
+ object.getIndexes().getIndex(LongIntIndex.named("key_int")).add(getKeyAsLong(key));
+ }
+
+ // Store proper values into the object.
+ object.setValue(BinaryValue.create(serializeTable(values)));
+
+ StoreValue store = new StoreValue.Builder(object)
+ .withOption(StoreValue.Option.W, wvalue)
+ .withLocation(location)
+ .build();
+
+ RiakFuture<StoreValue.Response, Location> future = riakClient.executeAsync(store);
+
+ try {
+ future.get(transactionTimeLimit, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ if (debug) {
+ System.err.println("Unable to insert key " + key + ". Reason: TIME OUT");
+ }
+
+ return TIME_OUT;
+ } catch (Exception e) {
+ if (debug) {
+ System.err.println("Unable to insert key " + key + ". Reason: " + e.toString());
+ }
+
+ return Status.ERROR;
+ }
+
+ return Status.OK;
+ }
+
+ /**
+ * Auxiliary class needed for object substitution within the update operation. It is a fundamental part of the
+ * fetch-update (locally)-store cycle described by Basho to properly perform a strong-consistent update.
+ */
+ private static final class UpdateEntity extends UpdateValue.Update {
+ private final RiakObject object;
+
+ private UpdateEntity(RiakObject object) {
+ this.object = object;
+ }
+
+ // Simply returns the new object, discarding the fetched original and completing the fetch-update-store cycle.
+ @Override
+ public RiakObject apply(RiakObject original) {
+ return object;
+ }
+ }
+
+ /**
+ * Update a record in the database. Any field/value pairs in the specified values HashMap will be written into the
+ * record with the specified record key, overwriting any existing values with the same field name.
+ *
+ * @param table The name of the table (Riak bucket)
+ * @param key The record key of the record to write.
+ * @param values A HashMap of field/value pairs to update in the record
+ * @return Zero on success, a non-zero error code on error
+ */
+ @Override
+ public Status update(String table, String key, HashMap<String, ByteIterator> values) {
+ // If the eventual consistency model is in use, then an update operation is practically equivalent to an insert.
+ if (!strongConsistency) {
+ return insert(table, key, values);
+ }
+
+ Location location = new Location(new Namespace(bucketType, table), key);
+
+ UpdateValue update = new UpdateValue.Builder(location)
+ .withUpdate(new UpdateEntity(new RiakObject().setValue(BinaryValue.create(serializeTable(values)))))
+ .build();
+
+ RiakFuture<UpdateValue.Response, Location> future = riakClient.executeAsync(update);
+
+ try {
+ // For some reason, the update transaction doesn't throw any exception when no cluster has been started, so one
+ // needs to check whether it was done or not. When calling the wasUpdated() function with no nodes available, a
+ // NullPointerException is thrown.
+ // Moreover, such an exception could be thrown when multiple threads are trying to update the same key or, more
+ // generally, when the system is being queried by many clients (i.e. overloaded). This is a known limitation of
+ // Riak KV's strong consistency implementation.
+ future.get(transactionTimeLimit, TimeUnit.SECONDS).wasUpdated();
+ } catch (TimeoutException e) {
+ if (debug) {
+ System.err.println("Unable to update key " + key + ". Reason: TIME OUT");
+ }
+
+ return TIME_OUT;
+ } catch (Exception e) {
+ if (debug) {
+ System.err.println("Unable to update key " + key + ". Reason: " + e.toString());
+ }
+
+ return Status.ERROR;
+ }
+
+ return Status.OK;
+ }
+
+ /**
+ * Delete a record from the database.
+ *
+ * @param table The name of the table (Riak bucket)
+ * @param key The record key of the record to delete.
+ * @return Zero on success, a non-zero error code on error
+ */
+ @Override
+ public Status delete(String table, String key) {
+ Location location = new Location(new Namespace(bucketType, table), key);
+ DeleteValue dv = new DeleteValue.Builder(location).build();
+
+ RiakFuture<Void, Location> future = riakClient.executeAsync(dv);
+
+ try {
+ future.get(transactionTimeLimit, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ if (debug) {
+ System.err.println("Unable to delete key " + key + ". Reason: TIME OUT");
+ }
+
+ return TIME_OUT;
+ } catch (Exception e) {
+ if (debug) {
+ System.err.println("Unable to delete key " + key + ". Reason: " + e.toString());
+ }
+
+ return Status.ERROR;
+ }
+
+ return Status.OK;
+ }
+
+ public void cleanup() throws DBException {
+ try {
+ riakCluster.shutdown();
+ } catch (Exception e) {
+ System.err.println("Unable to properly shutdown the cluster. Reason: " + e.toString());
+ throw new DBException(e);
+ }
+ }
+
+ /**
+ * Auxiliary function needed for testing. It configures the default bucket-type to take care of the consistency
+ * problem by disallowing the siblings creation. Moreover, it disables strong consistency, because we don't have
+ * the possibility to create a proper bucket-type to use to fake 2i indexes usage.
+ *
+ * @param bucket The bucket name.
+ * @throws Exception Thrown if something bad happens.
+ */
+ void setTestEnvironment(String bucket) throws Exception {
+ bucketType = "default";
+ bucketType2i = bucketType;
+ strongConsistency = false;
+
+ Namespace ns = new Namespace(bucketType, bucket);
+ StoreBucketProperties newBucketProperties = new StoreBucketProperties.Builder(ns).withAllowMulti(false).build();
+
+ riakClient.execute(newBucketProperties);
+ }
+}
diff --git a/riak/src/main/java/com/yahoo/ycsb/db/riak/RiakUtils.java b/riak/src/main/java/com/yahoo/ycsb/db/riak/RiakUtils.java
new file mode 100644
index 00000000..59090fa1
--- /dev/null
+++ b/riak/src/main/java/com/yahoo/ycsb/db/riak/RiakUtils.java
@@ -0,0 +1,188 @@
+/**
+ * Copyright (c) 2016 YCSB contributors All rights reserved.
+ * Copyright 2014 Basho Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package com.yahoo.ycsb.db.riak;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import com.basho.riak.client.api.commands.kv.FetchValue;
+import com.yahoo.ycsb.ByteArrayByteIterator;
+import com.yahoo.ycsb.ByteIterator;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Utility class for Riak KV Client.
+ *
+ */
+final class RiakUtils {
+
+ private RiakUtils() {
+ super();
+ }
+
+ private static byte[] toBytes(final int anInteger) {
+ byte[] aResult = new byte[4];
+
+ aResult[0] = (byte) (anInteger >> 24);
+ aResult[1] = (byte) (anInteger >> 16);
+ aResult[2] = (byte) (anInteger >> 8);
+ aResult[3] = (byte) (anInteger /* >> 0 */);
+
+ return aResult;
+ }
+
+ private static int fromBytes(final byte[] aByteArray) {
+ checkArgument(aByteArray.length == 4);
+
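+ // Reassemble the four bytes in big-endian order; the 0xFF masks prevent sign extension of the lower bytes.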
+ return (aByteArray[0] << 24) | (aByteArray[1] & 0xFF) << 16 | (aByteArray[2] & 0xFF) << 8 | (aByteArray[3] & 0xFF);
+ }
+
+ private static void close(final OutputStream anOutputStream) {
+ try {
+ anOutputStream.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private static void close(final InputStream anInputStream) {
+ try {
+ anInputStream.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Serializes a Map, transforming the contained list of (String, ByteIterator) couples into a byte array.
+ *
+ * @param aTable A Map to serialize.
+ * @return A byte array containing the serialized table.
+ */
+ static byte[] serializeTable(Map<String, ByteIterator> aTable) {
+ final ByteArrayOutputStream anOutputStream = new ByteArrayOutputStream();
+ final Set<Map.Entry<String, ByteIterator>> theEntries = aTable.entrySet();
+
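+ // Each field is written as [4-byte big-endian name length][name bytes]
+ // followed by [4-byte big-endian value length][value bytes], one field after another.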
+ try {
+ for (final Map.Entry<String, ByteIterator> anEntry : theEntries) {
+ final byte[] aColumnName = anEntry.getKey().getBytes();
+
+ anOutputStream.write(toBytes(aColumnName.length));
+ anOutputStream.write(aColumnName);
+
+ final byte[] aColumnValue = anEntry.getValue().toArray();
+
+ anOutputStream.write(toBytes(aColumnValue.length));
+ anOutputStream.write(aColumnValue);
+ }
+ return anOutputStream.toByteArray();
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ } finally {
+ close(anOutputStream);
+ }
+ }
+
+ /**
+ * Deserializes an input byte array, transforming it into a list of (String, ByteIterator) pairs (i.e. a Map).
+ *
+ * @param aValue A byte array containing the table to deserialize.
+ * @param theResult A Map containing the deserialized table.
+ */
+ private static void deserializeTable(final byte[] aValue, final Map<String, ByteIterator> theResult) {
+ final ByteArrayInputStream anInputStream = new ByteArrayInputStream(aValue);
+ byte[] aSizeBuffer = new byte[4];
+
+ try {
+ while (anInputStream.available() > 0) {
+ anInputStream.read(aSizeBuffer);
+ final int aColumnNameLength = fromBytes(aSizeBuffer);
+
+ final byte[] aColumnNameBuffer = new byte[aColumnNameLength];
+ anInputStream.read(aColumnNameBuffer);
+
+ anInputStream.read(aSizeBuffer);
+ final int aColumnValueLength = fromBytes(aSizeBuffer);
+
+ final byte[] aColumnValue = new byte[aColumnValueLength];
+ anInputStream.read(aColumnValue);
+
+ theResult.put(new String(aColumnNameBuffer), new ByteArrayByteIterator(aColumnValue));
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ } finally {
+ close(anInputStream);
+ }
+ }
+
+ /**
+ * Obtains a Long number from a key string. This will be the key used by Riak for all the transactions.
+ *
+ * @param key The key to convert from String to Long.
+ * @return A Long number parsed from the key String.
+ */
+ static Long getKeyAsLong(String key) {
+ String keyString = key.replaceFirst("[a-zA-Z]*", "");
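+ // e.g. a CoreWorkload-style key such as "user1234" yields 1234.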
+
+ return Long.parseLong(keyString);
+ }
+
+ /**
+ * Function that retrieves all the fields searched within a read or scan operation and puts them in the result
+ * HashMap.
+ *
+ * @param fields The list of fields to read, or null for all of them.
+ * @param response A Vector of HashMaps, where each HashMap is a set field/value pairs for one record.
+ * @param resultHashMap The HashMap to return as result.
+ */
+ static void createResultHashMap(Set<String> fields, FetchValue.Response response,
+ HashMap<String, ByteIterator> resultHashMap) {
+ // If everything went fine, then a result must be given. Such an object is a hash table containing the (field,
+ // value) pairs based on the requested fields. Note that in a read operation, ONLY ONE OBJECT IS RETRIEVED!
+ // The following line retrieves the previously serialized table, which was stored with an insert transaction.
+ byte[] responseFieldsAndValues = response.getValues().get(0).getValue().getValue();
+
+ // Deserialize the stored response table.
+ HashMap deserializedTable = new HashMap<>();
+ deserializeTable(responseFieldsAndValues, deserializedTable);
+
+ // If only specific fields are requested, then only these should be put in the result object!
+ if (fields != null) {
+ // Populate the HashMap to provide as result.
+ for (Object field : fields.toArray()) {
+ // Comparison between a requested field and the ones retrieved. If they're equal (i.e. the get() operation
+ // DOES NOT return a null value), then proceed to store the pair in the resultHashMap.
+ ByteIterator value = deserializedTable.get(field);
+
+ if (value != null) {
+ resultHashMap.put((String) field, value);
+ }
+ }
+ } else {
+ // If, instead, no field is specified, then all those retrieved must be provided as result.
+ for (String field : deserializedTable.keySet()) {
+ resultHashMap.put(field, deserializedTable.get(field));
+ }
+ }
+ }
+}
diff --git a/riak/src/main/java/com/yahoo/ycsb/db/riak/package-info.java b/riak/src/main/java/com/yahoo/ycsb/db/riak/package-info.java
new file mode 100644
index 00000000..32d163fd
--- /dev/null
+++ b/riak/src/main/java/com/yahoo/ycsb/db/riak/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Copyright (c) 2016 YCSB contributors All rights reserved.
+ * Copyright 2014 Basho Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+/**
+ * The YCSB binding for Riak KV 2.x.y.
+ *
+ */
+package com.yahoo.ycsb.db.riak;
\ No newline at end of file
diff --git a/riak/src/main/resources/riak.properties b/riak/src/main/resources/riak.properties
new file mode 100644
index 00000000..46c598fa
--- /dev/null
+++ b/riak/src/main/resources/riak.properties
@@ -0,0 +1,61 @@
+##
+# Copyright (c) 2016 YCSB contributors All rights reserved.
+# Copyright 2014 Basho Technologies, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you
+# may not use this file except in compliance with the License. You
+# may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License. See accompanying
+# LICENSE file.
+#
+
+# RiakKVClient - Default Properties
+# Note: Change the properties below to set the values to use for your test. You can set them either here or from the
+# command line. Note that the latter choice overrides these settings.
+
+# riak.hosts - string list, comma separated list of IPs or FQDNs.
+# EX: 127.0.0.1,127.0.0.2,127.0.0.3 or riak1.mydomain.com,riak2.mydomain.com,riak3.mydomain.com
+riak.hosts=127.0.0.1
+
+# riak.port - int, the port on which every node is listening. It must match the one specified in the riak.conf file
+# at the line "listener.protobuf.internal".
+riak.port=8087
+
+# riak.bucket_type - string, must match value of bucket type created during setup. See readme.md for more information
+riak.bucket_type=ycsb
+
+# riak.r_val - int, the R value represents the number of Riak nodes that must return results for a read before the read
+# is considered successful.
+riak.r_val=2
+
+# riak.w_val - int, the W value represents the number of Riak nodes that must report success before an update is
+# considered complete.
+riak.w_val=2
+
+# riak.read_retry_count - int, number of times the client will try to read a key from Riak.
+riak.read_retry_count=5
+
+# riak.wait_time_before_retry - int, time (in milliseconds) the client waits before attempting to perform another
+# read if the previous one failed.
+riak.wait_time_before_retry=200
+
+# riak.transaction_time_limit - int, time (in seconds) the client waits before aborting the current transaction.
+riak.transaction_time_limit=10
+
+# riak.strong_consistency - boolean, indicates whether to use strong consistency (true) or eventual consistency (false).
+riak.strong_consistency=true
+
+# riak.strong_consistent_scans_bucket_type - string, indicates the bucket-type to use to allow scans transactions
+# when using strong consistency mode. Example: fakeBucketType.
+riak.strong_consistent_scans_bucket_type=
+
+# riak.debug - boolean, enables debug mode. This displays all the properties (specified or defaults) when a benchmark
+# is started.
+riak.debug=false
diff --git a/riak/src/test/java/com/yahoo/ycsb/db/riak/RiakKVClientTest.java b/riak/src/test/java/com/yahoo/ycsb/db/riak/RiakKVClientTest.java
new file mode 100644
index 00000000..a571fe43
--- /dev/null
+++ b/riak/src/test/java/com/yahoo/ycsb/db/riak/RiakKVClientTest.java
@@ -0,0 +1,264 @@
+/**
+ * Copyright (c) 2016 YCSB contributors All rights reserved.
+ * Copyright 2014 Basho Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package com.yahoo.ycsb.db.riak;
+
+import java.util.*;
+
+import com.yahoo.ycsb.ByteIterator;
+import com.yahoo.ycsb.Status;
+import com.yahoo.ycsb.StringByteIterator;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeNoException;
+import static org.junit.Assume.assumeThat;
+
+/**
+ * Integration tests for the Riak KV client.
+ */
+public class RiakKVClientTest {
+ private static RiakKVClient riakClient;
+
+ private static final String bucket = "testBucket";
+ private static final String keyPrefix = "testKey";
+ private static final int recordsToInsert = 20;
+ private static final int recordsToScan = 7;
+ private static final String firstField = "Key number";
+ private static final String secondField = "Key number doubled";
+ private static final String thirdField = "Key number square";
+
+ private static boolean testStarted = false;
+
+ /**
+ * Creates a cluster for testing purposes.
+ */
+ @BeforeClass
+ public static void setUpClass() throws Exception {
+ riakClient = new RiakKVClient();
+ riakClient.init();
+
+ // Set the test bucket environment with the appropriate parameters.
+ try {
+ riakClient.setTestEnvironment(bucket);
+ } catch(Exception e) {
+ assumeNoException("Unable to configure Riak KV for test, aborting.", e);
+ }
+
+ // Just add some records to work on...
+ for (int i = 0; i < recordsToInsert; i++) {
+ // Abort the entire test whenever the dataset population operation fails.
+ assumeThat("Riak KV is NOT RUNNING, aborting test.",
+ riakClient.insert(bucket, keyPrefix + String.valueOf(i), StringByteIterator.getByteIteratorMap(
+ createExpectedHashMap(i))),
+ is(Status.OK));
+ }
+
+ // Variable to check to determine whether the test has started or not.
+ testStarted = true;
+ }
+
+ /**
+ * Shuts down the cluster created.
+ */
+ @AfterClass
+ public static void tearDownClass() throws Exception {
+ // Delete all added keys before cleanup ONLY IF TEST ACTUALLY STARTED.
+ if (testStarted) {
+ for (int i = 0; i <= recordsToInsert; i++) {
+ delete(keyPrefix + Integer.toString(i));
+ }
+ }
+
+ riakClient.cleanup();
+ }
+
+ /**
+ * Test method for read transaction. It is designed to read two of the three fields stored for each key, to also test
+ * if the createResultHashMap() function implemented in RiakKVClient.java works as expected.
+ */
+ @Test
+ public void testRead() {
+ // Choose a random key to read, among the available ones.
+ int readKeyNumber = new Random().nextInt(recordsToInsert);
+
+ // Prepare two fields to read.
+ Set<String> fields = new HashSet<>();
+ fields.add(firstField);
+ fields.add(thirdField);
+
+ // Prepare an expected result.
+ HashMap<String, String> expectedValue = new HashMap<>();
+ expectedValue.put(firstField, Integer.toString(readKeyNumber));
+ expectedValue.put(thirdField, Integer.toString(readKeyNumber * readKeyNumber));
+
+ // Define a HashMap to store the actual result.
+ HashMap<String, ByteIterator> readValue = new HashMap<>();
+
+ // If a read transaction has been properly done, then one has to receive a Status.OK return from the read()
+ // function. Moreover, the actual returned result MUST match the expected one.
+ assertEquals("Read transaction FAILED.",
+ Status.OK,
+ riakClient.read(bucket, keyPrefix + Integer.toString(readKeyNumber), fields, readValue));
+
+ assertEquals("Read test FAILED. Actual read transaction value is NOT MATCHING the expected one.",
+ expectedValue.toString(),
+ readValue.toString());
+ }
+
+ /**
+ * Test method for scan transaction. A scan transaction has to be considered successfully completed only if all the
+ * requested values are read (i.e. scan transaction returns with Status.OK). Moreover, one has to check if the
+ * obtained results match the expected ones.
+ */
+ @Test
+ public void testScan() {
+ // Choose, among the available ones, a random key as starting point for the scan transaction.
+ int startScanKeyNumber = new Random().nextInt(recordsToInsert - recordsToScan);
+
+ // Prepare a HashMap vector to store the scan transaction results.
+ Vector<HashMap<String, ByteIterator>> scannedValues = new Vector<>();
+
+ // Check whether the scan transaction is correctly performed or not.
+ assertEquals("Scan transaction FAILED.",
+ Status.OK,
+ riakClient.scan(bucket, keyPrefix + Integer.toString(startScanKeyNumber), recordsToScan, null,
+ scannedValues));
+
+ // After the scan transaction completes, compare the obtained results with the expected ones.
+ for (int i = 0; i < recordsToScan; i++) {
+ assertEquals("Scan test FAILED: the current scanned key is NOT MATCHING the expected one.",
+ createExpectedHashMap(startScanKeyNumber + i).toString(),
+ scannedValues.get(i).toString());
+ }
+ }
+
+ /**
+ * Test method for update transaction. The test is designed to restore the previously read key. It is assumed to be
+ * correct when, after performing the update transaction, one reads the just provided values.
+ */
+ @Test
+ public void testUpdate() {
+ // Choose a random key to read, among the available ones.
+ int updateKeyNumber = new Random().nextInt(recordsToInsert);
+
+ // Define a HashMap to save the previously stored values for eventually restoring them.
+ HashMap<String, ByteIterator> readValueBeforeUpdate = new HashMap<>();
+ riakClient.read(bucket, keyPrefix + Integer.toString(updateKeyNumber), null, readValueBeforeUpdate);
+
+ // Prepare an update HashMap to store.
+ HashMap<String, String> updateValue = new HashMap<>();
+ updateValue.put(firstField, "UPDATED");
+ updateValue.put(secondField, "UPDATED");
+ updateValue.put(thirdField, "UPDATED");
+
+ // First of all, perform the update and check whether it's failed or not.
+ assertEquals("Update transaction FAILED.",
+ Status.OK,
+ riakClient.update(bucket, keyPrefix + Integer.toString(updateKeyNumber), StringByteIterator
+ .getByteIteratorMap(updateValue)));
+
+ // Then, read the key again and...
+ HashMap<String, ByteIterator> readValueAfterUpdate = new HashMap<>();
+ assertEquals("Update test FAILED. Unable to read key value.",
+ Status.OK,
+ riakClient.read(bucket, keyPrefix + Integer.toString(updateKeyNumber), null, readValueAfterUpdate));
+
+ // ...compare the result with the new one!
+ assertEquals("Update transaction NOT EXECUTED PROPERLY. Values DID NOT CHANGE.",
+ updateValue.toString(),
+ readValueAfterUpdate.toString());
+
+ // Finally, restore the previously read key.
+ assertEquals("Update test FAILED. Unable to restore previous key value.",
+ Status.OK,
+ riakClient.update(bucket, keyPrefix + Integer.toString(updateKeyNumber), readValueBeforeUpdate));
+ }
+
+ /**
+ * Test method for insert transaction. It is designed to insert a key just after the last key inserted in the setUp()
+ * phase.
+ */
+ @Test
+ public void testInsert() {
+ // Define a HashMap to insert and another one for the comparison operation.
+ HashMap<String, String> insertValue = createExpectedHashMap(recordsToInsert);
+ HashMap<String, ByteIterator> readValue = new HashMap<>();
+
+ // Check whether the insertion transaction was performed or not.
+ assertEquals("Insert transaction FAILED.",
+ Status.OK,
+ riakClient.insert(bucket, keyPrefix + Integer.toString(recordsToInsert), StringByteIterator.
+ getByteIteratorMap(insertValue)));
+
+ // Finally, compare the insertion performed with the one expected by reading the key.
+ assertEquals("Insert test FAILED. Unable to read inserted value.",
+ Status.OK,
+ riakClient.read(bucket, keyPrefix + Integer.toString(recordsToInsert), null, readValue));
+ assertEquals("Insert test FAILED. Actual read transaction value is NOT MATCHING the inserted one.",
+ insertValue.toString(),
+ readValue.toString());
+ }
+
+ /**
+ * Test method for delete transaction. The test deletes a key, then performs a read that should give a
+ * Status.NOT_FOUND response. Finally, it restores the previously read key.
+ */
+ @Test
+ public void testDelete() {
+ // Choose a random key to delete, among the available ones.
+ int deleteKeyNumber = new Random().nextInt(recordsToInsert);
+
+ // Define a HashMap to save the previously stored values for its eventual restore.
+ HashMap<String, ByteIterator> readValueBeforeDelete = new HashMap<>();
+ riakClient.read(bucket, keyPrefix + Integer.toString(deleteKeyNumber), null, readValueBeforeDelete);
+
+ // First of all, delete the key.
+ assertEquals("Delete transaction FAILED.",
+ Status.OK,
+ delete(keyPrefix + Integer.toString(deleteKeyNumber)));
+
+ // Then, check if the deletion was actually achieved.
+ assertEquals("Delete test FAILED. Key NOT deleted.",
+ Status.NOT_FOUND,
+ riakClient.read(bucket, keyPrefix + Integer.toString(deleteKeyNumber), null, null));
+
+ // Finally, restore the previously deleted key.
+ assertEquals("Delete test FAILED. Unable to restore previous key value.",
+ Status.OK,
+ riakClient.insert(bucket, keyPrefix + Integer.toString(deleteKeyNumber), readValueBeforeDelete));
+ }
+
+ private static Status delete(String key) {
+ return riakClient.delete(bucket, key);
+ }
+
+ private static HashMap<String, String> createExpectedHashMap(int value) {
+ HashMap values = new HashMap<>();
+
+ values.put(firstField, Integer.toString(value));
+ values.put(secondField, Integer.toString(2 * value));
+ values.put(thirdField, Integer.toString(value * value));
+
+ return values;
+ }
+}
diff --git a/s3/pom.xml b/s3/pom.xml
index d5726a46..f2246417 100644
--- a/s3/pom.xml
+++ b/s3/pom.xml
@@ -1,44 +1,44 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.yahoo.ycsb</groupId>
    <artifactId>binding-parent</artifactId>
-   <version>0.9.0-SNAPSHOT</version>
+   <version>0.10.0-SNAPSHOT</version>
    <relativePath>../binding-parent</relativePath>
  </parent>
  <artifactId>s3-binding</artifactId>
  <name>S3 Storage Binding</name>
  <packaging>jar</packaging>
  <dependencies>
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk-s3</artifactId>
      <version>${s3.version}</version>
    </dependency>
    <dependency>
      <groupId>com.yahoo.ycsb</groupId>
      <artifactId>core</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
diff --git a/solr/pom.xml b/solr/pom.xml
index 8253ea02..2959bf9c 100644
--- a/solr/pom.xml
+++ b/solr/pom.xml
@@ -1,59 +1,59 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.yahoo.ycsb</groupId>
    <artifactId>binding-parent</artifactId>
-   <version>0.9.0-SNAPSHOT</version>
+   <version>0.10.0-SNAPSHOT</version>
    <relativePath>../binding-parent</relativePath>
  </parent>
  <artifactId>solr-binding</artifactId>
  <name>Solr Binding</name>
  <packaging>jar</packaging>
  <dependencies>
    <dependency>
      <groupId>com.yahoo.ycsb</groupId>
      <artifactId>core</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.solr</groupId>
      <artifactId>solr-solrj</artifactId>
      <version>${solr.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.10</version>
    </dependency>
    <dependency>
      <groupId>org.apache.solr</groupId>
      <artifactId>solr-test-framework</artifactId>
      <version>${solr.version}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
diff --git a/tarantool/pom.xml b/tarantool/pom.xml
index acaea4ff..4b43ffc2 100644
--- a/tarantool/pom.xml
+++ b/tarantool/pom.xml
@@ -1,75 +1,75 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.yahoo.ycsb</groupId>
    <artifactId>binding-parent</artifactId>
-   <version>0.9.0-SNAPSHOT</version>
+   <version>0.10.0-SNAPSHOT</version>
    <relativePath>../binding-parent/</relativePath>
  </parent>
  <artifactId>tarantool-binding</artifactId>
  <name>Tarantool DB Binding</name>
  <packaging>jar</packaging>
  false
  <dependencies>
    <dependency>
      <groupId>org.tarantool</groupId>
      <artifactId>connector</artifactId>
      <version>${tarantool.version}</version>
    </dependency>
    <dependency>
      <groupId>com.yahoo.ycsb</groupId>
      <artifactId>core</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-checkstyle-plugin</artifactId>
        <version>2.15</version>
        <configuration>
          <consoleOutput>true</consoleOutput>
          <configLocation>../checkstyle.xml</configLocation>
          <failsOnError>true</failsOnError>
          <failOnViolation>true</failOnViolation>
        </configuration>
        <executions>
          <execution>
            <id>validate</id>
            <phase>validate</phase>
            <goals>
              <goal>checkstyle</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
diff --git a/voldemort/pom.xml b/voldemort/pom.xml
index f891a65a..fab15f0b 100644
--- a/voldemort/pom.xml
+++ b/voldemort/pom.xml
@@ -1,51 +1,51 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.yahoo.ycsb</groupId>
    <artifactId>binding-parent</artifactId>
-   <version>0.9.0-SNAPSHOT</version>
+   <version>0.10.0-SNAPSHOT</version>
    <relativePath>../binding-parent</relativePath>
  </parent>
  <artifactId>voldemort-binding</artifactId>
  <name>Voldemort DB Binding</name>
  <packaging>jar</packaging>
  <dependencies>
    <dependency>
      <groupId>voldemort</groupId>
      <artifactId>voldemort</artifactId>
      <version>${voldemort.version}</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.16</version>
    </dependency>
    <dependency>
      <groupId>com.yahoo.ycsb</groupId>
      <artifactId>core</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
diff --git a/workloads/workload_template b/workloads/workload_template
index f5e80c88..b66d3b6e 100644
--- a/workloads/workload_template
+++ b/workloads/workload_template
@@ -1,171 +1,203 @@
-# Copyright (c) 2012 YCSB contributors. All rights reserved.
+# Copyright (c) 2012-2016 YCSB contributors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License. See accompanying
# LICENSE file.
# Yahoo! Cloud System Benchmark
# Workload Template: Default Values
#
# File contains all properties that can be set to define a
# YCSB session. All properties are set to their default
# value if one exists. If not, the property is commented
# out. When a property has a finite number of settings,
# the default is enabled and the alternates are shown in
# comments below it.
#
# Use of most properties is explained through comments in Client.java,
# CoreWorkload.java, or on the YCSB wiki page:
# https://github.com/brianfrankcooper/YCSB/wiki/Core-Properties
# The name of the workload class to use
workload=com.yahoo.ycsb.workloads.CoreWorkload
# There is no default setting for recordcount but it is
# required to be set.
# The number of records in the table to be inserted in
# the load phase or the number of records already in the
# table before the run phase.
recordcount=1000000
# There is no default setting for operationcount but it is
# required to be set.
# The number of operations to use during the run phase.
operationcount=3000000
# The number of insertions to do, if different from recordcount.
# Used with insertstart to grow an existing table.
#insertcount=
# The offset of the first insertion
insertstart=0
# The number of fields in a record
fieldcount=10
# The size of each field (in bytes)
fieldlength=100
# Should read all fields
readallfields=true
# Should write all fields on update
writeallfields=false
# The distribution used to choose the length of a field
fieldlengthdistribution=constant
#fieldlengthdistribution=uniform
#fieldlengthdistribution=zipfian
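[Editor's note: with the defaults above, the raw value payload per record is
fieldcount x fieldlength = 10 x 100 = 1,000 bytes, so recordcount=1000000 is
roughly 1 GB of field data before keys and storage-engine overhead. A sketch
of that back-of-the-envelope estimate, which is mine, not a YCSB formula:]

    public final class DatasetSizeSketch {
      public static void main(String[] args) {
        long recordcount = 1_000_000L;
        int fieldcount = 10;    // defaults from this template
        int fieldlength = 100;  // bytes per field
        long bytesPerRecord = (long) fieldcount * fieldlength; // 1,000 B
        long totalBytes = recordcount * bytesPerRecord;
        // Ignores key bytes and storage overhead (an assumption).
        System.out.printf("~%.2f GB of raw field data%n", totalBytes / 1e9);
      }
    }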
# What proportion of operations are reads
readproportion=0.95
# What proportion of operations are updates
updateproportion=0.05
# What proportion of operations are inserts
insertproportion=0
# What proportion of operations read then modify a record
readmodifywriteproportion=0
# What proportion of operations are scans
scanproportion=0
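[Editor's note: these proportions act as weights for a weighted chooser
(DiscreteGenerator in YCSB), which draws in proportion to each weight over
their sum, so they need not total exactly 1.0. A minimal sketch of such a
chooser, not YCSB's implementation verbatim:]

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Random;

    public final class OperationChooserSketch {
      // Sketch: pick an operation with probability weight / sum(weights).
      static String choose(Map<String, Double> weights, Random rng) {
        double total = weights.values().stream()
            .mapToDouble(Double::doubleValue).sum();
        double roll = rng.nextDouble() * total;
        for (Map.Entry<String, Double> e : weights.entrySet()) {
          roll -= e.getValue();
          if (roll <= 0) {
            return e.getKey();
          }
        }
        return "READ"; // guard against floating-point rounding
      }

      public static void main(String[] args) {
        Map<String, Double> mix = new LinkedHashMap<>();
        mix.put("READ", 0.95);
        mix.put("UPDATE", 0.05);
        System.out.println(choose(mix, new Random()));
      }
    }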
# On a single scan, the maximum number of records to access
maxscanlength=1000
# The distribution used to choose the number of records to access on a scan
scanlengthdistribution=uniform
#scanlengthdistribution=zipfian
# Should records be inserted in order or pseudo-randomly
insertorder=hashed
#insertorder=ordered
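[Editor's note: with insertorder=hashed, keys are built from a hash of the
insertion counter rather than the counter itself, scattering otherwise
sequential inserts across the keyspace. YCSB's Utils.hash is an FNV-based
hash; the sketch below illustrates the idea with FNV-1a and is not
byte-for-byte identical to it.]

    public final class KeyOrderSketch {
      // Sketch: FNV-1a over the 8 bytes of the key number (illustrative only).
      static long fnv1a64(long keynum) {
        long hash = 0xCBF29CE484222325L;
        for (int i = 0; i < 8; i++) {
          hash ^= (keynum >>> (i * 8)) & 0xFF;
          hash *= 0x100000001B3L;
        }
        return hash & Long.MAX_VALUE; // keep it non-negative for a readable key
      }

      static String buildKeyName(long keynum, boolean ordered) {
        return "user" + (ordered ? keynum : fnv1a64(keynum));
      }

      public static void main(String[] args) {
        System.out.println(buildKeyName(42, false)); // a scattered "user..." key
      }
    }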
# The distribution of requests across the keyspace
requestdistribution=zipfian
#requestdistribution=uniform
#requestdistribution=latest
# Percentage of data items that constitute the hot set
hotspotdatafraction=0.2
# Percentage of operations that access the hot set
hotspotopnfraction=0.8
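[Editor's note: the two hotspot knobs describe an 80/20-style skew; with the
defaults, 80% of operations land on the first 20% of the key space. A minimal
sketch of that selection, assuming the hot set is the lowest keys, which
matches my reading of YCSB's hotspot generator:]

    import java.util.Random;

    public final class HotspotSketch {
      // Sketch: choose a key from the hot set with probability opFraction.
      static long nextKey(long recordcount, double dataFraction,
                          double opFraction, Random rng) {
        long hotSetSize = (long) (recordcount * dataFraction);
        if (rng.nextDouble() < opFraction) {
          return (long) (rng.nextDouble() * hotSetSize);  // hot key
        }
        // cold key from the remainder of the key space
        return hotSetSize + (long) (rng.nextDouble() * (recordcount - hotSetSize));
      }

      public static void main(String[] args) {
        System.out.println(nextKey(1_000_000L, 0.2, 0.8, new Random()));
      }
    }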
# Maximum execution time in seconds
#maxexecutiontime=
# The name of the database table to run queries against
table=usertable
# The column family of fields (required by some databases)
#columnfamily=
# How the latency measurements are presented
measurementtype=histogram
#measurementtype=timeseries
#measurementtype=raw
# When measurementtype is set to raw, measurements will be output
# as RAW datapoints in the following csv format:
# "operation, timestamp of the measurement, latency in us"
#
# Raw datapoints are collected in-memory while the test is running. Each
# data point consumes about 50 bytes (including java object overhead).
# For a typical run of 1 million to 10 million operations, this should
# fit into memory most of the time. If you plan to do 100s of millions of
# operations per run, consider provisioning a machine with larger RAM when using
# the RAW measurement type, or split the run into multiple runs.
#
# Optionally, you can specify an output file to save raw datapoints.
# Otherwise, raw datapoints will be written to stdout.
# The output file will be appended to if it already exists, otherwise
# a new output file will be created.
#measurement.raw.output_file = /tmp/your_output_file_for_this_run
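[Editor's note: the 50-bytes-per-point figure above makes the sizing easy to
check: 10 million operations is about 0.5 GB of heap. A sketch of that
arithmetic, treating the per-point size as the comment's estimate rather than
a measured constant:]

    public final class RawMeasurementSizingSketch {
      public static void main(String[] args) {
        long operations = 10_000_000L;
        long bytesPerPoint = 50L; // estimate from the comment above
        double gb = operations * bytesPerPoint / 1e9;
        System.out.printf("raw datapoints: ~%.1f GB of heap%n", gb); // ~0.5 GB
      }
    }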
+# JVM Reporting.
+#
+# Measure JVM information over time including GC counts, max and min memory
+# used, max and min thread counts, max and min system load and others. This
+# setting must be enabled in conjunction with the "-s" flag to run the status
+# thread. Every "status.interval", the status thread will capture JVM
+# statistics and record the results. At the end of the run, the maxes and
+# mins will be reported.
+# measurement.trackjvm = false
+
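[Editor's note: a sketch of the kind of snapshot the status thread could take
each status.interval when measurement.trackjvm=true, using standard JMX beans;
names and selection of metrics are mine, not the tracker's exact code.]

    import java.lang.management.ManagementFactory;

    public final class JvmStatsSketch {
      public static void main(String[] args) {
        // Sketch: one periodic sample of memory, threads, and system load.
        long usedMb = (Runtime.getRuntime().totalMemory()
            - Runtime.getRuntime().freeMemory()) / (1024 * 1024);
        int threads = ManagementFactory.getThreadMXBean().getThreadCount();
        double load = ManagementFactory.getOperatingSystemMXBean()
            .getSystemLoadAverage(); // -1 if unavailable on this platform
        System.out.printf("used=%dMB threads=%d load=%.2f%n",
            usedMb, threads, load);
      }
    }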
# The range of latencies to track in the histogram (milliseconds)
histogram.buckets=1000
# Granularity for time series (in milliseconds)
timeseries.granularity=1000
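[Editor's note: with the default of 1000, the histogram counts latencies in
1 ms buckets up to one second; slower operations are folded into an overflow
bucket, which matches my understanding of the histogram measurement. A minimal
sketch; field names are mine:]

    public final class HistogramSketch {
      private final int[] buckets; // one counter per millisecond of latency
      private int overflow;        // everything at or beyond buckets.length ms

      HistogramSketch(int bucketCount) {
        this.buckets = new int[bucketCount];
      }

      // Sketch: record one latency measurement, given in microseconds.
      void measure(long latencyUs) {
        int ms = (int) (latencyUs / 1000);
        if (ms < buckets.length) {
          buckets[ms]++;
        } else {
          overflow++;
        }
      }
    }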
# Latency reporting.
#
# YCSB records latency of failed operations separately from successful ones.
# Latency of all OK operations will be reported under their operation name,
# such as [READ], [UPDATE], etc.
#
# For failed operations:
# By default we don't track latency numbers of specific error statuses.
# We just report latency of all failed operations under one measurement name
# such as [READ-FAILED]. But optionally, the user can configure YCSB to either:
# 1. Record and report latency for each and every error status code by
# setting reportLatencyForEachError to true, or
# 2. Record and report latency for a select set of error status codes by
# providing a CSV list of Status codes via the "latencytrackederrors"
# property.
# reportlatencyforeacherror=false
# latencytrackederrors=""
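[Editor's note: in other words, a result's status decides which measurement
series its latency lands in. A hedged sketch of that routing; the parameters
stand in for YCSB's Status handling and are simplified.]

    import java.util.Set;

    public final class ErrorLatencyRoutingSketch {
      // Sketch: map an operation plus its status to a measurement name.
      static String measurementName(String op, boolean isOk, String statusName,
                                    boolean reportLatencyForEachError,
                                    Set<String> latencyTrackedErrors) {
        if (isOk) {
          return op;                               // e.g. [READ]
        }
        if (reportLatencyForEachError
            || latencyTrackedErrors.contains(statusName)) {
          return op + "-" + statusName;            // per-error series
        }
        return op + "-FAILED";                     // e.g. [READ-FAILED]
      }
    }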
# Insertion error retry for the core workload.
#
# By default, the YCSB core workload does not retry any operations.
# However, during the load process, if any insertion fails, the entire
# load process is terminated.
# If a user desires to have more robust behavior during this phase, they can
# enable retry for insertion by setting the following property to a positive
# number.
# core_workload_insertion_retry_limit = 0
#
# The following number controls the interval between retries (in seconds):
# core_workload_insertion_retry_interval = 3
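[Editor's note: together, the two retry properties describe a bounded retry
loop around each load-phase insert. A sketch of that control flow, assuming a
doInsert-style call that reports success as a boolean; `tryInsert` is a
hypothetical stand-in.]

    public final class InsertRetrySketch {
      // Sketch: retry a failed insert up to `retryLimit` times, pausing between tries.
      static boolean insertWithRetry(java.util.function.BooleanSupplier tryInsert,
                                     int retryLimit, long retryIntervalSeconds)
          throws InterruptedException {
        for (int attempt = 0; attempt <= retryLimit; attempt++) {
          if (tryInsert.getAsBoolean()) {
            return true;
          }
          if (attempt < retryLimit) {
            Thread.sleep(retryIntervalSeconds * 1000L);
          }
        }
        return false; // with the default limit of 0, this fails on the first error
      }
    }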
+
+# Distributed Tracing via Apache HTrace (http://htrace.incubator.apache.org/)
+#
+# Defaults to blank / no tracing
+# Below sends to a local file, sampling at 0.1%
+#
+# htrace.sampler.classes=ProbabilitySampler
+# htrace.sampler.fraction=0.001
+# htrace.span.receiver.classes=org.apache.htrace.core.LocalFileSpanReceiver
+# htrace.local.file.span.receiver.path=/some/path/to/local/file
+#
+# To capture all spans, use the AlwaysSampler
+#
+# htrace.sampler.classes=AlwaysSampler
+#
+# To send spans to an HTraced receiver, use the settings below and ensure
+# your classpath contains the htrace-htraced jar (e.g. when invoking the ycsb
+# command, add -cp /path/to/htrace-htraced.jar)
+#
+# htrace.span.receiver.classes=org.apache.htrace.impl.HTracedSpanReceiver
+# htrace.htraced.receiver.address=example.com:9075
+# htrace.htraced.error.log.period.ms=10000
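[Editor's note: these htrace.* keys are handed to HTrace when the client
builds its tracer, with the "htrace." prefix stripped, since HTrace itself
names the keys without it. A heavily hedged sketch of wiring the sampled
local-file example above into an htrace-core4 Tracer; this is illustrative,
not the client's exact code, and assumes htrace-core is on the classpath.]

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.htrace.core.HTraceConfiguration;
    import org.apache.htrace.core.Tracer;

    public final class TracerSketch {
      public static void main(String[] args) {
        // Sketch: mirror the ProbabilitySampler/local-file example above.
        Map<String, String> conf = new HashMap<>();
        conf.put("sampler.classes", "ProbabilitySampler");
        conf.put("sampler.fraction", "0.001");
        conf.put("span.receiver.classes",
            "org.apache.htrace.core.LocalFileSpanReceiver");
        conf.put("local.file.span.receiver.path", "/some/path/to/local/file");

        Tracer tracer = new Tracer.Builder("YCSB")
            .conf(HTraceConfiguration.fromMap(conf))
            .build();
        tracer.close();
      }
    }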