result) {
Location location = new Location(new Namespace(bucketType, table), key);
FetchValue fv = new FetchValue.Builder(location).withOption(FetchValue.Option.R, rvalue).build();
FetchValue.Response response;
try {
response = fetch(fv);
if (response.isNotFound()) {
if (debug) {
System.err.println("Unable to read key " + key + ". Reason: NOT FOUND");
}
return Status.NOT_FOUND;
}
} catch (TimeoutException e) {
if (debug) {
System.err.println("Unable to read key " + key + ". Reason: TIME OUT");
}
return TIME_OUT;
} catch (Exception e) {
if (debug) {
System.err.println("Unable to read key " + key + ". Reason: " + e.toString());
}
return Status.ERROR;
}
result.put(key, getFields(fields, response));
return Status.OK;
}
/**
* Perform a range scan for a set of records in the database. Each field/value pair from the result will be stored in
* a HashMap.
*
* Note: The scan operation requires the use of secondary indexes (2i) and LevelDB.
*
* @param table The name of the table (Riak bucket)
* @param startkey The record key of the first record to read.
* @param recordcount The number of records to read
* @param fields The list of fields to read, or null for all of them
* @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record
* @return Zero on success, a non-zero error code on error
*/
@Override
public Status scan(String table, String startkey, int recordcount, Set fields,
Vector> result) {
Namespace ns = new Namespace(bucketType, table);
IntIndexQuery iiq = new IntIndexQuery
.Builder(ns, "key", getKeyAsLong(startkey), Long.MAX_VALUE)
.withMaxResults(recordcount)
.withPaginationSort(true)
.build();
RiakFuture future = riakClient.executeAsync(iiq);
try {
IntIndexQuery.Response response = future.get(transactionTimeLimit, TimeUnit.SECONDS);
List entries = response.getEntries();
for (IntIndexQuery.Response.Entry entry : entries) {
Location location = entry.getRiakObjectLocation();
FetchValue fv = new FetchValue.Builder(location)
.withOption(FetchValue.Option.R, rvalue)
.build();
FetchValue.Response keyResponse = fetch(fv);
if (keyResponse.isNotFound()) {
if (debug) {
System.err.println("Unable to scan all records starting from key " + startkey + ", aborting transaction. " +
"Reason: NOT FOUND");
}
return Status.NOT_FOUND;
}
HashMap partialResult = new HashMap<>();
partialResult.put(location.getKeyAsString(), getFields(fields, keyResponse));
result.add(partialResult);
}
} catch (TimeoutException e) {
if (debug) {
System.err.println("Unable to scan all records starting from key " + startkey + ", aborting transaction. " +
"Reason: TIME OUT");
}
return TIME_OUT;
} catch (Exception e) {
if (debug) {
System.err.println("Unable to scan all records starting from key " + startkey + ", aborting transaction. " +
"Reason: " + e.toString());
}
return Status.ERROR;
}
return Status.OK;
}
/**
* Tries to perform a read and, whenever it fails, retries to do it. It actually does try as many time as indicated,
* even if the function riakClient.execute(fv) throws an exception. This is needed for those situation in which the
* cluster is unable to respond properly due to overload. Note however that if the cluster doesn't respond after
* transactionTimeLimit, the transaction is discarded immediately.
*
* @param fv The value to fetch from the cluster.
*/
private FetchValue.Response fetch(FetchValue fv) throws TimeoutException {
FetchValue.Response response = null;
for (int i = 0; i < readRetryCount; i++) {
RiakFuture future = riakClient.executeAsync(fv);
try {
response = future.get(transactionTimeLimit, TimeUnit.SECONDS);
if (!response.isNotFound()) {
break;
}
} catch (TimeoutException e) {
// Let the callee decide how to handle this exception...
throw new TimeoutException();
} catch (Exception e) {
// Sleep for a few ms before retrying...
try {
Thread.sleep(waitTimeBeforeRetry);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
return response;
}
/**
* Function that retrieves all the fields searched within a read or scan operation.
*
* @param fields The list of fields to read, or null for all of them
* @param response A Vector of HashMaps, where each HashMap is a set field/value pairs for one record
* @return A ByteIterator containing all the values that correspond to the fields provided.
*/
private ByteIterator getFields(Set fields, FetchValue.Response response) {
// If everything went fine, then a result must be given. Such an object is an hash table containing the (key,
// value) pairs based on the requested fields. Note that in a read operation, ONLY ONE OBJECT IS RETRIEVED!
byte[] responseFieldsAndValues = response.getValues().get(0).getValue().getValue();
ByteIterator valuesToPut;
// If only specific field are requested, then only these should be put in the result object!
if (fields != null) {
HashMap deserializedTable = new HashMap<>();
deserializeTable(responseFieldsAndValues, deserializedTable);
// Instantiate a new HashMap for returning only the requested fields.
HashMap returnMap = new HashMap<>();
// Build the return HashMap to provide as result.
for (Object field : fields.toArray()) {
// Comparison between a requested field and the ones retrieved: if they're equal, then proceed to store the
// couple in the returnMap.
ByteIterator value = deserializedTable.get(field);
if (value != null) {
returnMap.put((String) field, value);
}
}
// Finally, convert the returnMap to a byte array.
valuesToPut = new ByteArrayByteIterator(serializeTable(returnMap));
} else {
// If, instead, no field is specified, then all the ones retrieved must be provided as result.
valuesToPut = new ByteArrayByteIterator(responseFieldsAndValues);
}
// Results.
return valuesToPut;
}
/**
* Insert a record in the database. Any field/value pairs in the specified values HashMap
* will be written into the record with the specified record key. Also creates a
* secondary index (2i) for each record consisting of the key converted to long to be used
* for the scan operation
*
* @param table The name of the table (Riak bucket)
* @param key The record key of the record to insert.
* @param values A HashMap of field/value pairs to insert in the record
* @return Zero on success, a non-zero error code on error
*/
@Override
public Status insert(String table, String key, HashMap values) {
Location location = new Location(new Namespace(bucketType, table), key);
RiakObject object = new RiakObject();
object.setValue(BinaryValue.create(serializeTable(values)));
object.getIndexes().getIndex(LongIntIndex.named("key_int")).add(getKeyAsLong(key));
StoreValue store = new StoreValue.Builder(object)
.withLocation(location)
.withOption(Option.W, wvalue)
.build();
RiakFuture future = riakClient.executeAsync(store);
try {
future.get(transactionTimeLimit, TimeUnit.SECONDS);
} catch (TimeoutException e) {
if (debug) {
System.err.println("Unable to " + (Thread.currentThread().getStackTrace()[2]
.getMethodName().equals("update") ? "update" : "insert") + " key " + key + ". Reason: TIME OUT");
}
return TIME_OUT;
} catch (Exception e) {
if (debug) {
System.err.println("Unable to " + (Thread.currentThread().getStackTrace()[2]
.getMethodName().equals("update") ? "update" : "insert") + " key " + key + ". Reason: " + e.toString());
}
return Status.ERROR;
}
return Status.OK;
}
/**
* Define a class to permit object substitution within the update operation, following the same, identical steps
* made in an insert operation. This is needed for strong-consistent updates.
*/
private static final class UpdateEntity extends UpdateValue.Update {
private final RiakObject object;
private UpdateEntity(RiakObject e) {
this.object = e;
}
/* Simply returns the object.
*/
@Override
public RiakObject apply(RiakObject original) {
return object;
}
}
/**
* Update a record in the database. Any field/value pairs in the specified values
* HashMap will be written into the record with the specified
* record key, overwriting any existing values with the same field name.
*
* @param table The name of the table (Riak bucket)
* @param key The record key of the record to write.
* @param values A HashMap of field/value pairs to update in the record
* @return Zero on success, a non-zero error code on error
*/
@Override
public Status update(String table, String key, HashMap values) {
if (!strongConsistency) {
return insert(table, key, values);
}
Location location = new Location(new Namespace(bucketType, table), key);
RiakObject object = new RiakObject();
object.setValue(BinaryValue.create(serializeTable(values)));
object.getIndexes().getIndex(LongIntIndex.named("key_int")).add(getKeyAsLong(key));
UpdateValue update = new UpdateValue.Builder(location)
.withFetchOption(FetchValue.Option.DELETED_VCLOCK, true)
.withStoreOption(Option.W, wvalue)
.withUpdate(new UpdateEntity(object))
.build();
RiakFuture future = riakClient.executeAsync(update);
try {
- future.get(transactionTimeLimit, TimeUnit.SECONDS);
+ // For some reason, the update transaction doesn't throw any exception when no cluster has been started, so one
+ // needs to check whether it was done or not. When calling the wasUpdated() function with no nodes available, a
+ // NullPointerException is thrown.
+ future.get(transactionTimeLimit, TimeUnit.SECONDS).wasUpdated();
} catch (TimeoutException e) {
if (debug) {
System.err.println("Unable to update key " + key + ". Reason: TIME OUT");
}
return TIME_OUT;
} catch (Exception e) {
if (debug) {
System.err.println("Unable to update key " + key + ". Reason: " + e.toString());
}
return Status.ERROR;
}
return Status.OK;
}
/**
* Delete a record from the database.
*
* @param table The name of the table (Riak bucket)
* @param key The record key of the record to delete.
* @return Zero on success, a non-zero error code on error
*/
@Override
public Status delete(String table, String key) {
Location location = new Location(new Namespace(bucketType, table), key);
DeleteValue dv = new DeleteValue.Builder(location).build();
RiakFuture future = riakClient.executeAsync(dv);
try {
future.get(transactionTimeLimit, TimeUnit.SECONDS);
} catch (TimeoutException e) {
if (debug) {
System.err.println("Unable to delete key " + key + ". Reason: TIME OUT");
}
return TIME_OUT;
} catch (Exception e) {
if (debug) {
System.err.println("Unable to delete key " + key + ". Reason: " + e.toString());
}
return Status.ERROR;
}
return Status.OK;
}
public void cleanup() throws DBException {
try {
riakCluster.shutdown();
} catch (Exception e) {
System.err.println("Unable to properly shutdown the cluster. Reason: " + e.toString());
throw new DBException(e);
}
}
}
diff --git a/riak/src/main/java/com/yahoo/ycsb/db/RiakUtils.java b/riak/src/main/java/com/yahoo/ycsb/db/riak/RiakUtils.java
similarity index 99%
rename from riak/src/main/java/com/yahoo/ycsb/db/RiakUtils.java
rename to riak/src/main/java/com/yahoo/ycsb/db/riak/RiakUtils.java
index 4c072028..2b271fd2 100644
--- a/riak/src/main/java/com/yahoo/ycsb/db/RiakUtils.java
+++ b/riak/src/main/java/com/yahoo/ycsb/db/riak/RiakUtils.java
@@ -1,129 +1,129 @@
/**
* Copyright (c) 2016 YCSB contributors All rights reserved.
* Copyright 2014 Basho Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
-package com.yahoo.ycsb.db;
+package com.yahoo.ycsb.db.riak;
import java.io.*;
import java.util.Map;
import java.util.Set;
import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
import static com.google.common.base.Preconditions.checkArgument;
/**
* Utility class for Riak KV Client.
*
*/
final class RiakUtils {
private RiakUtils() {
super();
}
private static byte[] toBytes(final int anInteger) {
byte[] aResult = new byte[4];
aResult[0] = (byte) (anInteger >> 24);
aResult[1] = (byte) (anInteger >> 16);
aResult[2] = (byte) (anInteger >> 8);
aResult[3] = (byte) (anInteger /* >> 0 */);
return aResult;
}
private static int fromBytes(final byte[] aByteArray) {
checkArgument(aByteArray.length == 4);
return (aByteArray[0] << 24) | (aByteArray[1] & 0xFF) << 16 | (aByteArray[2] & 0xFF) << 8 | (aByteArray[3] & 0xFF);
}
private static void close(final OutputStream anOutputStream) {
try {
anOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private static void close(final InputStream anInputStream) {
try {
anInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
static byte[] serializeTable(Map aTable) {
final ByteArrayOutputStream anOutputStream = new ByteArrayOutputStream();
final Set> theEntries = aTable.entrySet();
try {
for (final Map.Entry anEntry : theEntries) {
final byte[] aColumnName = anEntry.getKey().getBytes();
anOutputStream.write(toBytes(aColumnName.length));
anOutputStream.write(aColumnName);
final byte[] aColumnValue = anEntry.getValue().toArray();
anOutputStream.write(toBytes(aColumnValue.length));
anOutputStream.write(aColumnValue);
}
return anOutputStream.toByteArray();
} catch (IOException e) {
throw new IllegalStateException(e);
} finally {
close(anOutputStream);
}
}
static void deserializeTable(final byte[] aValue, final Map theResult) {
final ByteArrayInputStream anInputStream = new ByteArrayInputStream(aValue);
byte[] aSizeBuffer = new byte[4];
try {
while (anInputStream.available() > 0) {
anInputStream.read(aSizeBuffer);
final int aColumnNameLength = fromBytes(aSizeBuffer);
final byte[] aColumnNameBuffer = new byte[aColumnNameLength];
anInputStream.read(aColumnNameBuffer);
anInputStream.read(aSizeBuffer);
final int aColumnValueLength = fromBytes(aSizeBuffer);
final byte[] aColumnValue = new byte[aColumnValueLength];
anInputStream.read(aColumnValue);
theResult.put(new String(aColumnNameBuffer), new ByteArrayByteIterator(aColumnValue));
}
} catch (Exception e) {
throw new IllegalStateException(e);
} finally {
close(anInputStream);
}
}
static Long getKeyAsLong(String key) {
String keyString = key.replaceFirst("[a-zA-Z]*", "");
return Long.parseLong(keyString);
}
}
diff --git a/riak/src/main/java/com/yahoo/ycsb/db/package-info.java b/riak/src/main/java/com/yahoo/ycsb/db/riak/package-info.java
similarity index 93%
rename from riak/src/main/java/com/yahoo/ycsb/db/package-info.java
rename to riak/src/main/java/com/yahoo/ycsb/db/riak/package-info.java
index 3f517a5d..dbaafb96 100644
--- a/riak/src/main/java/com/yahoo/ycsb/db/package-info.java
+++ b/riak/src/main/java/com/yahoo/ycsb/db/riak/package-info.java
@@ -1,23 +1,23 @@
/**
* Copyright (c) 2016 YCSB contributors All rights reserved.
* Copyright 2014 Basho Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
/**
- * The YCSB binding for Riak KV
- * 2.0.x.
+ * The YCSB binding for Riak KV 2.0.x.
+ *
*/
-package com.yahoo.ycsb.db;
\ No newline at end of file
+package com.yahoo.ycsb.db.riak;
\ No newline at end of file
diff --git a/riak/src/test/java/com/yahoo/ycsb/db/RiakKVClientTest.java b/riak/src/test/java/com/yahoo/ycsb/db/RiakKVClientTest.java
deleted file mode 100644
index 72092eff..00000000
--- a/riak/src/test/java/com/yahoo/ycsb/db/RiakKVClientTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Copyright (c) 2016 YCSB contributors All rights reserved.
- * Copyright 2014 Basho Technologies, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you
- * may not use this file except in compliance with the License. You
- * may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License. See accompanying
- * LICENSE file.
- */
-
-package com.yahoo.ycsb.db;
-
-import java.util.*;
-
-import com.yahoo.ycsb.ByteIterator;
-import com.yahoo.ycsb.Status;
-import com.yahoo.ycsb.StringByteIterator;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Integration tests for the Riak KV client. All tests are configured to return positive.
- */
-public class RiakKVClientTest {
- private static RiakKVClient riakClient;
- private static String bucket = "testBucket";
- private static String startKey = "testKey";
- private static int recordsToInitiallyInsert = 10;
-
- /**
- * Creates a cluster for testing purposes.
- */
- @BeforeClass
- public static void setUpClass() throws Exception {
- riakClient = new RiakKVClient();
- riakClient.init();
-
- HashMap values = new HashMap<>();
-
- // Just add some random values to work on...
- for (int i = 0; i < recordsToInitiallyInsert; i++) {
- values.put("testRecord_1", "testValue_1");
- values.put("testRecord_2", "testValue_2");
-
- riakClient.insert(bucket, startKey + String.valueOf(i), StringByteIterator.getByteIteratorMap(values));
- }
- }
-
- /**
- * Shuts down the cluster created.
- */
- @AfterClass
- public static void tearDownClass() throws Exception {
- riakClient.cleanup();
- }
-
- /**
- * Test method for read transaction.
- */
- @Test
- public void testRead() {
- Set fields = new HashSet<>();
- fields.add("testRecord_1");
- fields.add("testRecord_2");
-
- HashMap results = new HashMap<>();
-
- assertEquals(Status.OK, riakClient.read(bucket, startKey + "1", fields, results));
- }
-
- /**
- * Test method for scan transaction.
- */
- @Test
- public void testScan() {
- Vector> results = new Vector<>();
-
- assertEquals(Status.OK, riakClient.scan(bucket, startKey + "1", recordsToInitiallyInsert - 1, null, results));
- }
-
- /**
- * Test method for update transaction.
- */
- @Test
- public void testUpdate() {
- HashMap values = new HashMap<>();
- values.put("testRecord_1", "testValue_1_updated");
- values.put("testRecord_2", "testValue_2_updated");
-
- assertEquals(Status.OK, riakClient.update(bucket, startKey + "0", StringByteIterator.getByteIteratorMap(values)));
- }
-
- /**
- * Test method for insert transaction.
- */
- @Test
- public void testInsert() {
- HashMap values = new HashMap<>();
- values.put("testRecord_1", "testValue_1");
- values.put("testRecord_2", "testValue_2");
-
- assertEquals(Status.OK, riakClient.insert(bucket, startKey + Integer.toString(recordsToInitiallyInsert),
- StringByteIterator.getByteIteratorMap(values)));
- }
-
- /**
- * Test method for delete transaction.
- */
- @Test
- public void testDelete() {
- assertEquals(Status.OK, riakClient.delete(bucket, startKey + "0"));
- }
-}