result) {
+ Location location = new Location(new Namespace(bucketType, table), key);
+ FetchValue fv = new FetchValue.Builder(location).withOption(FetchValue.Option.R, rQuorumValue).build();
+
+ try {
+ FetchValue.Response response = fetch(fv);
+
+ if (response.isNotFound()) {
+ if (debug) {
+ System.err.println("Unable to read key " + key + ". Reason: NOT FOUND");
+ }
+
+ return Status.NOT_FOUND;
+ }
+ } catch (TimeoutException e) {
+ if (debug) {
+ System.err.println("Unable to read key " + key + ". Reason: TIME OUT");
+ }
+
+ return TIME_OUT;
+ } catch (Exception e) {
+ if (debug) {
+ System.err.println("Unable to read key " + key + ". Reason: " + e.toString());
+ }
+
+ return Status.ERROR;
+ }
+
+ return Status.OK;
+ }
+
+
+  /**
+   * Perform a range scan for a set of records in the database. Each field/value pair from the result will be stored in
+   * a HashMap.
+   *
+   * Note: The scan operation requires the use of secondary indexes (2i) and LevelDB.
+   *
+   * @param table The name of the table (Riak bucket)
+   * @param startkey The record key of the first record to read.
+   * @param recordcount The number of records to read
+   * @param fields The list of fields to read, or null for all of them
+   * @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record
+   * @return Zero on success, a non-zero error code on error
+   */
+  @Override
+  public Status scan(String table, String startkey, int recordcount, Set<String> fields,
+                     Vector<HashMap<String, ByteIterator>> result) {
+    Namespace ns = new Namespace(bucketType, table);
+
+    // Query the numeric secondary index from startkey up to a practically unbounded
+    // upper limit, asking Riak for at most recordcount results in sorted order.
+    IntIndexQuery iiq = new IntIndexQuery
+        .Builder(ns, "key", getKeyAsLong(startkey), 999999999999999999L)
+        .withMaxResults(recordcount)
+        .withPaginationSort(true)
+        .build();
+
+    RiakFuture<IntIndexQuery.Response, IntIndexQuery> future = riakClient.executeAsync(iiq);
+
+    try {
+      IntIndexQuery.Response response = future.get(transactionTimeLimit, TimeUnit.SECONDS);
+      List<IntIndexQuery.Response.Entry> entries = response.getEntries();
+
+      // Fetch every matched key to verify it is readable with the configured R quorum.
+      // NOTE(review): the fetched values are never deserialized into the result Vector,
+      // so result is left unpopulated -- confirm this is intended for this benchmark.
+      for (IntIndexQuery.Response.Entry entry : entries) {
+        Location location = entry.getRiakObjectLocation();
+        FetchValue fv = new FetchValue.Builder(location)
+            .withOption(FetchValue.Option.R, rQuorumValue)
+            .build();
+
+        FetchValue.Response keyResponse = fetch(fv);
+
+        if (keyResponse.isNotFound()) {
+          if (debug) {
+            System.err.println("Unable to scan starting from key " + startkey + ", aborting transaction. Reason: NOT " +
+                "FOUND");
+          }
+
+          return Status.NOT_FOUND;
+        }
+      }
+    } catch (TimeoutException e) {
+      if (debug) {
+        System.err.println("Unable to scan starting from key " + startkey + ", aborting transaction. Reason: TIME OUT");
+      }
+
+      return TIME_OUT;
+    } catch (Exception e) {
+      if (debug) {
+        System.err.println("Unable to scan starting from key " + startkey + ", aborting transaction. Reason: " +
+            e.toString());
+      }
+
+      return Status.ERROR;
+    }
+
+    return Status.OK;
+  }
+
+  /**
+   * Tries to perform a read and, whenever it fails, retries to do it. It actually does try as many time as indicated,
+   * even if the function riakClient.execute(fv) throws an exception. This is needed for those situation in which the
+   * cluster is unable to respond properly due to overload. Note however that if the cluster doesn't respond after
+   * transactionTimeLimit, the transaction is discarded immediately.
+   *
+   * @param fv The value to fetch from the cluster.
+   * @return the last response obtained; may be null if every attempt failed with a
+   *         non-timeout exception (callers catch the resulting NPE as a generic error).
+   * @throws TimeoutException if the cluster does not answer within transactionTimeLimit.
+   */
+  private FetchValue.Response fetch(FetchValue fv) throws TimeoutException {
+    FetchValue.Response response = null;
+
+    for (int i = 0; i < readRetryCount; i++) {
+      RiakFuture<FetchValue.Response, Location> future = riakClient.executeAsync(fv);
+
+      try {
+        response = future.get(transactionTimeLimit, TimeUnit.SECONDS);
+
+        // Stop retrying as soon as the key is actually found.
+        if (!response.isNotFound()) {
+          break;
+        }
+      } catch (TimeoutException e) {
+        // Let the callee decide how to handle this exception; rethrow the original
+        // so its message and stack trace are preserved.
+        throw e;
+      } catch (Exception e) {
+        // Sleep for a few ms before retrying...
+        try {
+          Thread.sleep(waitTimeBeforeRetry);
+        } catch (InterruptedException e1) {
+          // Restore the interrupt status so callers can still observe the interruption.
+          Thread.currentThread().interrupt();
+          e1.printStackTrace();
+        }
+      }
+    }
+
+    return response;
+  }
+
+  /**
+   * Insert a record in the database. Any field/value pairs in the specified values HashMap
+   * will be written into the record with the specified record key. Also creates a
+   * secondary index (2i) for each record consisting of the key converted to long to be used
+   * for the scan operation
+   *
+   * @param table The name of the table (Riak bucket)
+   * @param key The record key of the record to insert.
+   * @param values A HashMap of field/value pairs to insert in the record
+   * @return Zero on success, a non-zero error code on error
+   */
+  @Override
+  public Status insert(String table, String key, HashMap<String, ByteIterator> values) {
+    Location location = new Location(new Namespace(bucketType, table), key);
+    RiakObject object = new RiakObject();
+
+    // Serialize the whole field/value map into the object body and index the numeric
+    // part of the key so that scan() can range-query it via 2i.
+    object.setValue(BinaryValue.create(serializeTable(values)));
+    object.getIndexes().getIndex(LongIntIndex.named("key_int")).add(getKeyAsLong(key));
+
+    StoreValue store = new StoreValue.Builder(object)
+        .withLocation(location)
+        .withOption(Option.W, wQuorumValue)
+        .build();
+
+    RiakFuture<StoreValue.Response, Location> future = riakClient.executeAsync(store);
+
+    try {
+      future.get(transactionTimeLimit, TimeUnit.SECONDS);
+    } catch (TimeoutException e) {
+      // The stack-trace inspection below distinguishes whether this insert was invoked
+      // directly or delegated from update() (eventual-consistency path), purely so the
+      // debug message names the right operation.
+      if (debug) {
+        System.err.println("Unable to " + (Thread.currentThread().getStackTrace()[2]
+            .getMethodName().equals("update") ? "update" : "insert") + " key " + key + ". Reason: TIME OUT");
+      }
+
+      return TIME_OUT;
+    } catch (Exception e) {
+      if (debug) {
+        System.err.println("Unable to " + (Thread.currentThread().getStackTrace()[2]
+            .getMethodName().equals("update") ? "update" : "insert") + " key " + key + ". Reason: " + e.toString());
+      }
+
+      return Status.ERROR;
+    }
+
+    return Status.OK;
+  }
+
+  /**
+   * Define a class to permit object substitution within the update operation, following the same, identical steps
+   * made in an insert operation. This is needed for strong-consistent updates.
+   */
+  private static final class UpdateEntity extends UpdateValue.Update<RiakObject> {
+    private final RiakObject object;
+
+    private UpdateEntity(RiakObject e) {
+      this.object = e;
+    }
+
+    /**
+     * Ignores the currently stored value and simply returns the replacement object.
+     */
+    @Override
+    public RiakObject apply(RiakObject original) {
+      return object;
+    }
+  }
+
+  /**
+   * Update a record in the database. Any field/value pairs in the specified values
+   * HashMap will be written into the record with the specified
+   * record key, overwriting any existing values with the same field name.
+   *
+   * @param table The name of the table (Riak bucket)
+   * @param key The record key of the record to write.
+   * @param values A HashMap of field/value pairs to update in the record
+   * @return Zero on success, a non-zero error code on error
+   */
+  @Override
+  public Status update(String table, String key, HashMap<String, ByteIterator> values) {
+    // Under eventual consistency a plain overwrite via insert() is sufficient; the
+    // UpdateValue machinery below is only needed for strongly-consistent buckets.
+    if (!strongConsistency) {
+      return insert(table, key, values);
+    }
+
+    Location location = new Location(new Namespace(bucketType, table), key);
+    RiakObject object = new RiakObject();
+
+    object.setValue(BinaryValue.create(serializeTable(values)));
+    object.getIndexes().getIndex(LongIntIndex.named("key_int")).add(getKeyAsLong(key));
+
+    UpdateValue update = new UpdateValue.Builder(location)
+        .withFetchOption(FetchValue.Option.DELETED_VCLOCK, true)
+        .withStoreOption(Option.W, wQuorumValue)
+        .withUpdate(new UpdateEntity(object))
+        .build();
+
+    RiakFuture<UpdateValue.Response, Location> future = riakClient.executeAsync(update);
+
+    try {
+      future.get(transactionTimeLimit, TimeUnit.SECONDS);
+    } catch (TimeoutException e) {
+      if (debug) {
+        System.err.println("Unable to update key " + key + ". Reason: TIME OUT");
+      }
+
+      return TIME_OUT;
+    } catch (Exception e) {
+      if (debug) {
+        System.err.println("Unable to update key " + key + ". Reason: " + e.toString());
+      }
+
+      return Status.ERROR;
+    }
+
+    return Status.OK;
+  }
+
+
+  /**
+   * Delete a record from the database.
+   *
+   * @param table The name of the table (Riak bucket)
+   * @param key The record key of the record to delete.
+   * @return Zero on success, a non-zero error code on error
+   */
+  @Override
+  public Status delete(String table, String key) {
+    Location location = new Location(new Namespace(bucketType, table), key);
+    DeleteValue dv = new DeleteValue.Builder(location).build();
+
+    RiakFuture<Void, Location> future = riakClient.executeAsync(dv);
+
+    try {
+      future.get(transactionTimeLimit, TimeUnit.SECONDS);
+    } catch (TimeoutException e) {
+      if (debug) {
+        System.err.println("Unable to delete key " + key + ". Reason: TIME OUT");
+      }
+
+      return TIME_OUT;
+    } catch (Exception e) {
+      if (debug) {
+        System.err.println("Unable to delete key " + key + ". Reason: " + e.toString());
+      }
+
+      return Status.ERROR;
+    }
+
+    return Status.OK;
+  }
+
+  /**
+   * Shuts down the Riak cluster client, releasing its connections and resources.
+   *
+   * @throws DBException if the cluster cannot be shut down cleanly
+   */
+  public void cleanup() throws DBException {
+    try {
+      riakCluster.shutdown();
+    } catch (Exception e) {
+      System.err.println("Unable to properly shutdown the cluster. Reason: " + e.toString());
+      throw new DBException(e);
+    }
+  }
+}
diff --git a/riak/src/main/java/com/yahoo/ycsb/db/RiakUtils.java b/riak/src/main/java/com/yahoo/ycsb/db/RiakUtils.java
new file mode 100644
index 00000000..2f4d74ca
--- /dev/null
+++ b/riak/src/main/java/com/yahoo/ycsb/db/RiakUtils.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2016 nygard_89
+ * Copyright 2014 Basho Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.ycsb.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Set;
+
+import com.yahoo.ycsb.ByteIterator;
+
+/**
+ * @author nygard_89
+ * @author Basho Technologies, Inc.
+ *
+ */
+final class RiakUtils {
+
+  /** Utility class; not instantiable. */
+  private RiakUtils() {
+    super();
+  }
+
+  /**
+   * Serializes an int into a 4-byte array, most significant byte first (big-endian).
+   *
+   * @param anInteger the value to serialize
+   * @return a new 4-byte big-endian representation of the value
+   */
+  private static byte[] toBytes(final int anInteger) {
+    byte[] aResult = new byte[4];
+
+    aResult[0] = (byte) (anInteger >> 24);
+    aResult[1] = (byte) (anInteger >> 16);
+    aResult[2] = (byte) (anInteger >> 8);
+    aResult[3] = (byte) (anInteger /* >> 0 */);
+
+    return aResult;
+  }
+
+  /**
+   * Closes the given stream, printing (but otherwise ignoring) any I/O failure,
+   * since this is only used on the cleanup path of serializeTable.
+   */
+  private static void close(final OutputStream anOutputStream) {
+    try {
+      anOutputStream.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Serializes a whole table (field/value map) into one byte array using a
+   * length-prefixed layout: for each entry, a 4-byte big-endian length followed by the
+   * column-name bytes, then a 4-byte big-endian length followed by the value bytes.
+   *
+   * @param aTable the field/value pairs to serialize
+   * @return the serialized table
+   * @throws IllegalStateException if an I/O error occurs while writing
+   */
+  static byte[] serializeTable(Map<String, ByteIterator> aTable) {
+    final ByteArrayOutputStream anOutputStream = new ByteArrayOutputStream();
+    final Set<Map.Entry<String, ByteIterator>> theEntries = aTable.entrySet();
+
+    try {
+      for (final Map.Entry<String, ByteIterator> anEntry : theEntries) {
+        // Encode column names explicitly as UTF-8 instead of relying on the
+        // platform default charset.
+        final byte[] aColumnName = anEntry.getKey().getBytes(StandardCharsets.UTF_8);
+
+        anOutputStream.write(toBytes(aColumnName.length));
+        anOutputStream.write(aColumnName);
+
+        final byte[] aColumnValue = anEntry.getValue().toArray();
+
+        anOutputStream.write(toBytes(aColumnValue.length));
+        anOutputStream.write(aColumnValue);
+      }
+      return anOutputStream.toByteArray();
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    } finally {
+      close(anOutputStream);
+    }
+  }
+
+  /**
+   * Derives the numeric secondary-index value for a YCSB key by removing the "user"
+   * prefix and parsing the remaining digits. Long.parseLong already tolerates leading
+   * zeros, so no zero-stripping is performed; this also fixes the corner case of an
+   * all-zero key (e.g. "user0000"), which zero-stripping used to reduce to an
+   * unparseable empty string.
+   *
+   * @param key the YCSB record key, e.g. "user0000123"
+   * @return the numeric portion of the key
+   */
+  static Long getKeyAsLong(String key) {
+    return Long.parseLong(key.replace("user", ""));
+  }
+}
diff --git a/riak/src/main/java/com/yahoo/ycsb/db/package-info.java b/riak/src/main/java/com/yahoo/ycsb/db/package-info.java
new file mode 100644
index 00000000..7e1d7212
--- /dev/null
+++ b/riak/src/main/java/com/yahoo/ycsb/db/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2014 Basho Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * The YCSB binding for Riak KV
+ * 2.0+.
+ */
+package com.yahoo.ycsb.db;
\ No newline at end of file
diff --git a/riak/src/main/resources/riak.properties b/riak/src/main/resources/riak.properties
new file mode 100644
index 00000000..62eddbcf
--- /dev/null
+++ b/riak/src/main/resources/riak.properties
@@ -0,0 +1,39 @@
+# RiakDBClient - Default Properties
+# Note: Change the properties below to set the values to use for your test. You can set them either here or from the
+# command line. Note that the latter choice overrides these settings.
+
+# riak.hosts - string list, comma separated list of IPs or FQDNs.
+# EX: 127.0.0.1,127.0.0.2,127.0.0.3 or riak1.mydomain.com,riak2.mydomain.com,riak3.mydomain.com
+riak.hosts=127.0.0.1
+
+# riak.port - int, the port on which every node is listening. It must match the one specified in the riak.conf file
+# at the line "listener.protobuf.internal".
+riak.port=8087
+
+# riak.bucket_type - string, must match value of bucket type created during setup. See readme.md for more information
+riak.bucket_type=ycsb
+
+# riak.r_val - int, the R value represents the number of Riak nodes that must return results for a read before the read
+# is considered successful.
+riak.r_val=2
+
+# riak.w_val - int, the W value represents the number of Riak nodes that must report success before an update is
+# considered complete.
+riak.w_val=2
+
+# riak.read_retry_count - int, number of times the client will try to read a key from Riak.
+riak.read_retry_count=5
+
+# riak.wait_time_before_retry - int, time (in milliseconds) the client waits before attempting to perform another
+# read if the previous one failed.
+riak.wait_time_before_retry=200
+
+# riak.transaction_time_limit - int, time (in seconds) the client waits before aborting the current transaction.
+riak.transaction_time_limit=10
+
+# riak.strong_consistency - boolean, indicates whether to use strong consistency (true) or eventual consistency (false).
+riak.strong_consistency=false
+
+# riak.debug - boolean, enables debug mode. This displays all the properties (specified or defaults) when a benchmark
+# is started.
+riak.debug=false
\ No newline at end of file