diff --git a/riak/pom.xml b/riak/pom.xml index b3feb39e..2d80d86d 100644 --- a/riak/pom.xml +++ b/riak/pom.xml @@ -1,58 +1,53 @@ 4.0.0 com.yahoo.ycsb binding-parent 0.9.0-SNAPSHOT ../binding-parent riak-binding Riak KV Binding jar com.basho.riak riak-client 2.0.5 com.yahoo.ycsb core ${project.version} provided com.google.collections google-collections 1.0 - - junit - junit - 4.12 - test - \ No newline at end of file diff --git a/riak/src/main/java/com/yahoo/ycsb/db/RiakKVClient.java b/riak/src/main/java/com/yahoo/ycsb/db/riak/RiakKVClient.java similarity index 97% rename from riak/src/main/java/com/yahoo/ycsb/db/RiakKVClient.java rename to riak/src/main/java/com/yahoo/ycsb/db/riak/RiakKVClient.java index c2da947f..2a176ee8 100644 --- a/riak/src/main/java/com/yahoo/ycsb/db/RiakKVClient.java +++ b/riak/src/main/java/com/yahoo/ycsb/db/riak/RiakKVClient.java @@ -1,542 +1,545 @@ /** * Copyright (c) 2016 YCSB contributors All rights reserved. * Copyright 2014 Basho Technologies, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ -package com.yahoo.ycsb.db; +package com.yahoo.ycsb.db.riak; import com.basho.riak.client.api.commands.kv.UpdateValue; import com.basho.riak.client.core.RiakFuture; import com.yahoo.ycsb.*; import java.io.IOException; import java.io.InputStream; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import com.basho.riak.client.api.RiakClient; import com.basho.riak.client.api.cap.Quorum; import com.basho.riak.client.api.commands.indexes.IntIndexQuery; import com.basho.riak.client.api.commands.kv.DeleteValue; import com.basho.riak.client.api.commands.kv.FetchValue; import com.basho.riak.client.api.commands.kv.StoreValue; import com.basho.riak.client.api.commands.kv.StoreValue.Option; import com.basho.riak.client.core.RiakCluster; import com.basho.riak.client.core.RiakNode; import com.basho.riak.client.core.query.Location; import com.basho.riak.client.core.query.Namespace; import com.basho.riak.client.core.query.RiakObject; import com.basho.riak.client.core.query.indexes.LongIntIndex; import com.basho.riak.client.core.util.BinaryValue; -import static com.yahoo.ycsb.db.RiakUtils.deserializeTable; -import static com.yahoo.ycsb.db.RiakUtils.getKeyAsLong; -import static com.yahoo.ycsb.db.RiakUtils.serializeTable; +import static com.yahoo.ycsb.db.riak.RiakUtils.deserializeTable; +import static com.yahoo.ycsb.db.riak.RiakUtils.getKeyAsLong; +import static com.yahoo.ycsb.db.riak.RiakUtils.serializeTable; /** * Riak KV 2.0.x client for YCSB framework. 
* */ public final class RiakKVClient extends DB { private static final String HOST_PROPERTY = "riak.hosts"; private static final String PORT_PROPERTY = "riak.port"; private static final String BUCKET_TYPE_PROPERTY = "riak.bucket_type"; private static final String R_VALUE_PROPERTY = "riak.r_val"; private static final String W_VALUE_PROPERTY = "riak.w_val"; private static final String READ_RETRY_COUNT_PROPERTY = "riak.read_retry_count"; private static final String WAIT_TIME_BEFORE_RETRY_PROPERTY = "riak.wait_time_before_retry"; private static final String TRANSACTION_TIME_LIMIT_PROPERTY = "riak.transaction_time_limit"; private static final String STRONG_CONSISTENCY_PROPERTY = "riak.strong_consistency"; private static final String DEBUG_PROPERTY = "riak.debug"; private static final Status TIME_OUT = new Status("TIME_OUT", "Cluster didn't respond after maximum wait time " + "for transaction indicated"); private String[] hosts; private int port; private String bucketType; private Quorum rvalue; private Quorum wvalue; private int readRetryCount; private int waitTimeBeforeRetry; private int transactionTimeLimit; private boolean strongConsistency; private boolean debug; private RiakClient riakClient; private RiakCluster riakCluster; private void loadDefaultProperties() { InputStream propFile = RiakKVClient.class.getClassLoader().getResourceAsStream("riak.properties"); Properties propsPF = new Properties(System.getProperties()); try { propsPF.load(propFile); } catch (IOException e) { e.printStackTrace(); } hosts = propsPF.getProperty(HOST_PROPERTY).split(","); port = Integer.parseInt(propsPF.getProperty(PORT_PROPERTY)); bucketType = propsPF.getProperty(BUCKET_TYPE_PROPERTY); rvalue = new Quorum(Integer.parseInt(propsPF.getProperty(R_VALUE_PROPERTY))); wvalue = new Quorum(Integer.parseInt(propsPF.getProperty(W_VALUE_PROPERTY))); readRetryCount = Integer.parseInt(propsPF.getProperty(READ_RETRY_COUNT_PROPERTY)); waitTimeBeforeRetry = Integer.parseInt(propsPF.getProperty(WAIT_TIME_BEFORE_RETRY_PROPERTY)); transactionTimeLimit = Integer.parseInt(propsPF.getProperty(TRANSACTION_TIME_LIMIT_PROPERTY)); strongConsistency = Boolean.parseBoolean(propsPF.getProperty(STRONG_CONSISTENCY_PROPERTY)); debug = Boolean.parseBoolean(propsPF.getProperty(DEBUG_PROPERTY)); } private void loadProperties() { loadDefaultProperties(); Properties props = getProperties(); String portString = props.getProperty(PORT_PROPERTY); if (portString != null) { port = Integer.parseInt(portString); } String hostsString = props.getProperty(HOST_PROPERTY); if (hostsString != null) { hosts = hostsString.split(","); } String bucketTypeString = props.getProperty(BUCKET_TYPE_PROPERTY); if (bucketTypeString != null) { bucketType = bucketTypeString; } String rValueString = props.getProperty(R_VALUE_PROPERTY); if (rValueString != null) { rvalue = new Quorum(Integer.parseInt(rValueString)); } String wValueString = props.getProperty(W_VALUE_PROPERTY); if (wValueString != null) { wvalue = new Quorum(Integer.parseInt(wValueString)); } String readRetryCountString = props.getProperty(READ_RETRY_COUNT_PROPERTY); if (readRetryCountString != null) { readRetryCount = Integer.parseInt(readRetryCountString); } String waitTimeBeforeRetryString = props.getProperty(WAIT_TIME_BEFORE_RETRY_PROPERTY); if (waitTimeBeforeRetryString != null) { waitTimeBeforeRetry = Integer.parseInt(waitTimeBeforeRetryString); } String transactionTimeLimitString = props.getProperty(TRANSACTION_TIME_LIMIT_PROPERTY); if (transactionTimeLimitString != null) { transactionTimeLimit = 
Integer.parseInt(transactionTimeLimitString); } String strongConsistencyString = props.getProperty(STRONG_CONSISTENCY_PROPERTY); if (strongConsistencyString != null) { strongConsistency = Boolean.parseBoolean(strongConsistencyString); } String debugString = props.getProperty(DEBUG_PROPERTY); if (debugString != null) { debug = Boolean.parseBoolean(debugString); } } public void init() throws DBException { loadProperties(); if (debug) { System.err.println("DEBUG ENABLED. Configuration parameters:"); System.err.println("-----------------------------------------"); System.err.println("Hosts: " + Arrays.toString(hosts)); System.err.println("Port: " + port); System.err.println("Bucket Type: " + bucketType); System.err.println("R Val: " + rvalue.toString()); System.err.println("W Val: " + wvalue.toString()); System.err.println("Read Retry Count: " + readRetryCount); System.err.println("Wait Time Before Retry: " + waitTimeBeforeRetry + " ms"); System.err.println("Transaction Time Limit: " + transactionTimeLimit + " s"); System.err.println("Consistency model: " + (strongConsistency ? "Strong" : "Eventual")); } RiakNode.Builder builder = new RiakNode.Builder().withRemotePort(port); List nodes = RiakNode.Builder.buildNodes(builder, Arrays.asList(hosts)); riakCluster = new RiakCluster.Builder(nodes).build(); try { riakCluster.start(); riakClient = new RiakClient(riakCluster); } catch (Exception e) { System.err.println("Unable to properly start up the cluster. Reason: " + e.toString()); throw new DBException(e); } } /** * Read a record from the database. Each field/value pair from the result will be stored in a HashMap. * * @param table The name of the table (Riak bucket) * @param key The record key of the record to read. * @param fields The list of fields to read, or null for all of them * @param result A HashMap of field/value pairs for the result * @return Zero on success, a non-zero error code on error */ @Override public Status read(String table, String key, Set fields, HashMap result) { Location location = new Location(new Namespace(bucketType, table), key); FetchValue fv = new FetchValue.Builder(location).withOption(FetchValue.Option.R, rvalue).build(); FetchValue.Response response; try { response = fetch(fv); if (response.isNotFound()) { if (debug) { System.err.println("Unable to read key " + key + ". Reason: NOT FOUND"); } return Status.NOT_FOUND; } } catch (TimeoutException e) { if (debug) { System.err.println("Unable to read key " + key + ". Reason: TIME OUT"); } return TIME_OUT; } catch (Exception e) { if (debug) { System.err.println("Unable to read key " + key + ". Reason: " + e.toString()); } return Status.ERROR; } result.put(key, getFields(fields, response)); return Status.OK; } /** * Perform a range scan for a set of records in the database. Each field/value pair from the result will be stored in * a HashMap. *

   * Note: The scan operation requires the use of secondary indexes (2i) and LevelDB.
   *
   * @param table The name of the table (Riak bucket)
   * @param startkey The record key of the first record to read.
   * @param recordcount The number of records to read
   * @param fields The list of fields to read, or null for all of them
   * @param result A Vector of HashMaps, where each HashMap is a set of field/value pairs for one record
   * @return Zero on success, a non-zero error code on error
   */
  @Override
  public Status scan(String table, String startkey, int recordcount, Set<String> fields,
                     Vector<HashMap<String, ByteIterator>> result) {
    Namespace ns = new Namespace(bucketType, table);

    IntIndexQuery iiq = new IntIndexQuery
        .Builder(ns, "key", getKeyAsLong(startkey), Long.MAX_VALUE)
        .withMaxResults(recordcount)
        .withPaginationSort(true)
        .build();

    RiakFuture<IntIndexQuery.Response, IntIndexQuery> future = riakClient.executeAsync(iiq);

    try {
      IntIndexQuery.Response response = future.get(transactionTimeLimit, TimeUnit.SECONDS);
      List<IntIndexQuery.Response.Entry> entries = response.getEntries();

      for (IntIndexQuery.Response.Entry entry : entries) {
        Location location = entry.getRiakObjectLocation();

        FetchValue fv = new FetchValue.Builder(location)
            .withOption(FetchValue.Option.R, rvalue)
            .build();

        FetchValue.Response keyResponse = fetch(fv);

        if (keyResponse.isNotFound()) {
          if (debug) {
            System.err.println("Unable to scan all records starting from key " + startkey + ", aborting transaction. " +
                "Reason: NOT FOUND");
          }

          return Status.NOT_FOUND;
        }

        HashMap<String, ByteIterator> partialResult = new HashMap<>();
        partialResult.put(location.getKeyAsString(), getFields(fields, keyResponse));
        result.add(partialResult);
      }
    } catch (TimeoutException e) {
      if (debug) {
        System.err.println("Unable to scan all records starting from key " + startkey + ", aborting transaction. " +
            "Reason: TIME OUT");
      }

      return TIME_OUT;
    } catch (Exception e) {
      if (debug) {
        System.err.println("Unable to scan all records starting from key " + startkey + ", aborting transaction. " +
            "Reason: " + e.toString());
      }

      return Status.ERROR;
    }

    return Status.OK;
  }

  /**
   * Tries to perform a read and, whenever it fails, retries it. It retries as many times as indicated, even if
   * riakClient.execute(fv) throws an exception. This is needed for those situations in which the cluster is unable
   * to respond properly due to overload. Note however that if the cluster doesn't respond after
   * transactionTimeLimit, the transaction is discarded immediately.
   *
   * @param fv The FetchValue operation to execute against the cluster.
   */
  private FetchValue.Response fetch(FetchValue fv) throws TimeoutException {
    FetchValue.Response response = null;

    for (int i = 0; i < readRetryCount; i++) {
      RiakFuture<FetchValue.Response, Location> future = riakClient.executeAsync(fv);

      try {
        response = future.get(transactionTimeLimit, TimeUnit.SECONDS);

        if (!response.isNotFound()) {
          break;
        }
      } catch (TimeoutException e) {
        // Let the caller decide how to handle this exception...
        throw new TimeoutException();
      } catch (Exception e) {
        // Sleep for a few ms before retrying...
        try {
          Thread.sleep(waitTimeBeforeRetry);
        } catch (InterruptedException e1) {
          e1.printStackTrace();
        }
      }
    }

    return response;
  }

  /**
   * Retrieves all the fields requested within a read or scan operation.
   *
   * @param fields The list of fields to read, or null for all of them
   * @param response The FetchValue.Response from which the requested fields are extracted
   * @return A ByteIterator containing all the values that correspond to the fields provided.
   */
  private ByteIterator getFields(Set<String> fields, FetchValue.Response response) {
    // If everything went fine, then a result must be given. Such an object is a hash table containing the (key,
    // value) pairs based on the requested fields. Note that in a read operation, ONLY ONE OBJECT IS RETRIEVED!
    byte[] responseFieldsAndValues = response.getValues().get(0).getValue().getValue();
    ByteIterator valuesToPut;

    // If only specific fields are requested, then only these should be put in the result object!
    if (fields != null) {
      HashMap<String, ByteIterator> deserializedTable = new HashMap<>();
      deserializeTable(responseFieldsAndValues, deserializedTable);

      // Instantiate a new HashMap for returning only the requested fields.
      HashMap<String, ByteIterator> returnMap = new HashMap<>();

      // Build the return HashMap to provide as result.
      for (Object field : fields.toArray()) {
        // Compare a requested field with the ones retrieved: if they're equal, then store the pair in the returnMap.
        ByteIterator value = deserializedTable.get(field);

        if (value != null) {
          returnMap.put((String) field, value);
        }
      }

      // Finally, convert the returnMap to a byte array.
      valuesToPut = new ByteArrayByteIterator(serializeTable(returnMap));
    } else {
      // If, instead, no field is specified, then all the ones retrieved must be provided as result.
      valuesToPut = new ByteArrayByteIterator(responseFieldsAndValues);
    }

    // Results.
    return valuesToPut;
  }

  /**
   * Insert a record in the database. Any field/value pairs in the specified values HashMap will be written into
   * the record with the specified record key. Also creates a secondary index (2i) for each record, consisting of
   * the key converted to long, to be used for the scan operation.
   *
   * @param table The name of the table (Riak bucket)
   * @param key The record key of the record to insert.
   * @param values A HashMap of field/value pairs to insert in the record
   * @return Zero on success, a non-zero error code on error
   */
  @Override
  public Status insert(String table, String key, HashMap<String, ByteIterator> values) {
    Location location = new Location(new Namespace(bucketType, table), key);
    RiakObject object = new RiakObject();

    object.setValue(BinaryValue.create(serializeTable(values)));
    object.getIndexes().getIndex(LongIntIndex.named("key_int")).add(getKeyAsLong(key));

    StoreValue store = new StoreValue.Builder(object)
        .withLocation(location)
        .withOption(Option.W, wvalue)
        .build();

    RiakFuture<StoreValue.Response, Location> future = riakClient.executeAsync(store);

    try {
      future.get(transactionTimeLimit, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
      if (debug) {
        System.err.println("Unable to " + (Thread.currentThread().getStackTrace()[2]
            .getMethodName().equals("update") ? "update" : "insert") + " key " + key + ". Reason: TIME OUT");
      }

      return TIME_OUT;
    } catch (Exception e) {
      if (debug) {
        System.err.println("Unable to " + (Thread.currentThread().getStackTrace()[2]
            .getMethodName().equals("update") ? "update" : "insert") + " key " + key + ". Reason: " + e.toString());
      }

      return Status.ERROR;
    }

    return Status.OK;
  }

  /**
   * Defines a class to permit object substitution within the update operation, following the same, identical steps
   * made in an insert operation. This is needed for strongly-consistent updates.
   */
  private static final class UpdateEntity extends UpdateValue.Update<RiakObject> {
    private final RiakObject object;

    private UpdateEntity(RiakObject e) {
      this.object = e;
    }

    /* Simply returns the object. */
    @Override
    public RiakObject apply(RiakObject original) {
      return object;
    }
  }

  /**
   * Update a record in the database.
Any field/value pairs in the specified values * HashMap will be written into the record with the specified * record key, overwriting any existing values with the same field name. * * @param table The name of the table (Riak bucket) * @param key The record key of the record to write. * @param values A HashMap of field/value pairs to update in the record * @return Zero on success, a non-zero error code on error */ @Override public Status update(String table, String key, HashMap values) { if (!strongConsistency) { return insert(table, key, values); } Location location = new Location(new Namespace(bucketType, table), key); RiakObject object = new RiakObject(); object.setValue(BinaryValue.create(serializeTable(values))); object.getIndexes().getIndex(LongIntIndex.named("key_int")).add(getKeyAsLong(key)); UpdateValue update = new UpdateValue.Builder(location) .withFetchOption(FetchValue.Option.DELETED_VCLOCK, true) .withStoreOption(Option.W, wvalue) .withUpdate(new UpdateEntity(object)) .build(); RiakFuture future = riakClient.executeAsync(update); try { - future.get(transactionTimeLimit, TimeUnit.SECONDS); + // For some reason, the update transaction doesn't throw any exception when no cluster has been started, so one + // needs to check whether it was done or not. When calling the wasUpdated() function with no nodes available, a + // NullPointerException is thrown. + future.get(transactionTimeLimit, TimeUnit.SECONDS).wasUpdated(); } catch (TimeoutException e) { if (debug) { System.err.println("Unable to update key " + key + ". Reason: TIME OUT"); } return TIME_OUT; } catch (Exception e) { if (debug) { System.err.println("Unable to update key " + key + ". Reason: " + e.toString()); } return Status.ERROR; } return Status.OK; } /** * Delete a record from the database. * * @param table The name of the table (Riak bucket) * @param key The record key of the record to delete. * @return Zero on success, a non-zero error code on error */ @Override public Status delete(String table, String key) { Location location = new Location(new Namespace(bucketType, table), key); DeleteValue dv = new DeleteValue.Builder(location).build(); RiakFuture future = riakClient.executeAsync(dv); try { future.get(transactionTimeLimit, TimeUnit.SECONDS); } catch (TimeoutException e) { if (debug) { System.err.println("Unable to delete key " + key + ". Reason: TIME OUT"); } return TIME_OUT; } catch (Exception e) { if (debug) { System.err.println("Unable to delete key " + key + ". Reason: " + e.toString()); } return Status.ERROR; } return Status.OK; } public void cleanup() throws DBException { try { riakCluster.shutdown(); } catch (Exception e) { System.err.println("Unable to properly shutdown the cluster. Reason: " + e.toString()); throw new DBException(e); } } } diff --git a/riak/src/main/java/com/yahoo/ycsb/db/RiakUtils.java b/riak/src/main/java/com/yahoo/ycsb/db/riak/RiakUtils.java similarity index 99% rename from riak/src/main/java/com/yahoo/ycsb/db/RiakUtils.java rename to riak/src/main/java/com/yahoo/ycsb/db/riak/RiakUtils.java index 4c072028..2b271fd2 100644 --- a/riak/src/main/java/com/yahoo/ycsb/db/RiakUtils.java +++ b/riak/src/main/java/com/yahoo/ycsb/db/riak/RiakUtils.java @@ -1,129 +1,129 @@ /** * Copyright (c) 2016 YCSB contributors All rights reserved. * Copyright 2014 Basho Technologies, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. 
You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ -package com.yahoo.ycsb.db; +package com.yahoo.ycsb.db.riak; import java.io.*; import java.util.Map; import java.util.Set; import com.yahoo.ycsb.ByteArrayByteIterator; import com.yahoo.ycsb.ByteIterator; import static com.google.common.base.Preconditions.checkArgument; /** * Utility class for Riak KV Client. * */ final class RiakUtils { private RiakUtils() { super(); } private static byte[] toBytes(final int anInteger) { byte[] aResult = new byte[4]; aResult[0] = (byte) (anInteger >> 24); aResult[1] = (byte) (anInteger >> 16); aResult[2] = (byte) (anInteger >> 8); aResult[3] = (byte) (anInteger /* >> 0 */); return aResult; } private static int fromBytes(final byte[] aByteArray) { checkArgument(aByteArray.length == 4); return (aByteArray[0] << 24) | (aByteArray[1] & 0xFF) << 16 | (aByteArray[2] & 0xFF) << 8 | (aByteArray[3] & 0xFF); } private static void close(final OutputStream anOutputStream) { try { anOutputStream.close(); } catch (IOException e) { e.printStackTrace(); } } private static void close(final InputStream anInputStream) { try { anInputStream.close(); } catch (IOException e) { e.printStackTrace(); } } static byte[] serializeTable(Map aTable) { final ByteArrayOutputStream anOutputStream = new ByteArrayOutputStream(); final Set> theEntries = aTable.entrySet(); try { for (final Map.Entry anEntry : theEntries) { final byte[] aColumnName = anEntry.getKey().getBytes(); anOutputStream.write(toBytes(aColumnName.length)); anOutputStream.write(aColumnName); final byte[] aColumnValue = anEntry.getValue().toArray(); anOutputStream.write(toBytes(aColumnValue.length)); anOutputStream.write(aColumnValue); } return anOutputStream.toByteArray(); } catch (IOException e) { throw new IllegalStateException(e); } finally { close(anOutputStream); } } static void deserializeTable(final byte[] aValue, final Map theResult) { final ByteArrayInputStream anInputStream = new ByteArrayInputStream(aValue); byte[] aSizeBuffer = new byte[4]; try { while (anInputStream.available() > 0) { anInputStream.read(aSizeBuffer); final int aColumnNameLength = fromBytes(aSizeBuffer); final byte[] aColumnNameBuffer = new byte[aColumnNameLength]; anInputStream.read(aColumnNameBuffer); anInputStream.read(aSizeBuffer); final int aColumnValueLength = fromBytes(aSizeBuffer); final byte[] aColumnValue = new byte[aColumnValueLength]; anInputStream.read(aColumnValue); theResult.put(new String(aColumnNameBuffer), new ByteArrayByteIterator(aColumnValue)); } } catch (Exception e) { throw new IllegalStateException(e); } finally { close(anInputStream); } } static Long getKeyAsLong(String key) { String keyString = key.replaceFirst("[a-zA-Z]*", ""); return Long.parseLong(keyString); } } diff --git a/riak/src/main/java/com/yahoo/ycsb/db/package-info.java b/riak/src/main/java/com/yahoo/ycsb/db/riak/package-info.java similarity index 93% rename from riak/src/main/java/com/yahoo/ycsb/db/package-info.java rename to riak/src/main/java/com/yahoo/ycsb/db/riak/package-info.java index 3f517a5d..dbaafb96 100644 --- a/riak/src/main/java/com/yahoo/ycsb/db/package-info.java +++ 
b/riak/src/main/java/com/yahoo/ycsb/db/riak/package-info.java @@ -1,23 +1,23 @@ /** * Copyright (c) 2016 YCSB contributors All rights reserved. * Copyright 2014 Basho Technologies, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ /** - * The YCSB binding for Riak KV - * 2.0.x. + * The YCSB binding for Riak KV 2.0.x. + * */ -package com.yahoo.ycsb.db; \ No newline at end of file +package com.yahoo.ycsb.db.riak; \ No newline at end of file diff --git a/riak/src/test/java/com/yahoo/ycsb/db/RiakKVClientTest.java b/riak/src/test/java/com/yahoo/ycsb/db/RiakKVClientTest.java deleted file mode 100644 index 72092eff..00000000 --- a/riak/src/test/java/com/yahoo/ycsb/db/RiakKVClientTest.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Copyright (c) 2016 YCSB contributors All rights reserved. - * Copyright 2014 Basho Technologies, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. You - * may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. See accompanying - * LICENSE file. - */ - -package com.yahoo.ycsb.db; - -import java.util.*; - -import com.yahoo.ycsb.ByteIterator; -import com.yahoo.ycsb.Status; -import com.yahoo.ycsb.StringByteIterator; - -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -/** - * Integration tests for the Riak KV client. All tests are configured to return positive. - */ -public class RiakKVClientTest { - private static RiakKVClient riakClient; - private static String bucket = "testBucket"; - private static String startKey = "testKey"; - private static int recordsToInitiallyInsert = 10; - - /** - * Creates a cluster for testing purposes. - */ - @BeforeClass - public static void setUpClass() throws Exception { - riakClient = new RiakKVClient(); - riakClient.init(); - - HashMap values = new HashMap<>(); - - // Just add some random values to work on... - for (int i = 0; i < recordsToInitiallyInsert; i++) { - values.put("testRecord_1", "testValue_1"); - values.put("testRecord_2", "testValue_2"); - - riakClient.insert(bucket, startKey + String.valueOf(i), StringByteIterator.getByteIteratorMap(values)); - } - } - - /** - * Shuts down the cluster created. - */ - @AfterClass - public static void tearDownClass() throws Exception { - riakClient.cleanup(); - } - - /** - * Test method for read transaction. 
- */ - @Test - public void testRead() { - Set fields = new HashSet<>(); - fields.add("testRecord_1"); - fields.add("testRecord_2"); - - HashMap results = new HashMap<>(); - - assertEquals(Status.OK, riakClient.read(bucket, startKey + "1", fields, results)); - } - - /** - * Test method for scan transaction. - */ - @Test - public void testScan() { - Vector> results = new Vector<>(); - - assertEquals(Status.OK, riakClient.scan(bucket, startKey + "1", recordsToInitiallyInsert - 1, null, results)); - } - - /** - * Test method for update transaction. - */ - @Test - public void testUpdate() { - HashMap values = new HashMap<>(); - values.put("testRecord_1", "testValue_1_updated"); - values.put("testRecord_2", "testValue_2_updated"); - - assertEquals(Status.OK, riakClient.update(bucket, startKey + "0", StringByteIterator.getByteIteratorMap(values))); - } - - /** - * Test method for insert transaction. - */ - @Test - public void testInsert() { - HashMap values = new HashMap<>(); - values.put("testRecord_1", "testValue_1"); - values.put("testRecord_2", "testValue_2"); - - assertEquals(Status.OK, riakClient.insert(bucket, startKey + Integer.toString(recordsToInitiallyInsert), - StringByteIterator.getByteIteratorMap(values))); - } - - /** - * Test method for delete transaction. - */ - @Test - public void testDelete() { - assertEquals(Status.OK, riakClient.delete(bucket, startKey + "0")); - } -}
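
Note on the storage layout used by the binding: serializeTable() and deserializeTable() in RiakUtils pack a whole YCSB record into a single Riak object value as a flat sequence of length-prefixed (field name, field value) pairs with 4-byte big-endian lengths, and getKeyAsLong() strips YCSB's alphabetic key prefix so the numeric remainder can feed the key_int secondary index used by scan(). The stand-alone sketch below reproduces that layout with plain byte[] values standing in for YCSB's ByteIterator; the RiakValueFormatDemo class and its method names are illustrative only and are not part of the binding.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustration of the [name length][name][value length][value] layout used by RiakUtils. */
public final class RiakValueFormatDemo {

  static byte[] serialize(Map<String, byte[]> record) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    for (Map.Entry<String, byte[]> field : record.entrySet()) {
      byte[] name = field.getKey().getBytes(StandardCharsets.UTF_8);
      out.writeInt(name.length);            // Same big-endian 4-byte length as RiakUtils.toBytes().
      out.write(name);
      out.writeInt(field.getValue().length);
      out.write(field.getValue());
    }
    return bytes.toByteArray();
  }

  static Map<String, byte[]> deserialize(byte[] value) throws IOException {
    Map<String, byte[]> record = new LinkedHashMap<>();
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(value));
    while (in.available() > 0) {
      byte[] name = new byte[in.readInt()];
      in.readFully(name);
      byte[] fieldValue = new byte[in.readInt()];
      in.readFully(fieldValue);
      record.put(new String(name, StandardCharsets.UTF_8), fieldValue);
    }
    return record;
  }

  /** Mirrors RiakUtils.getKeyAsLong(): "user12345" -> 12345, ready for the key_int 2i index. */
  static long keyAsLong(String key) {
    return Long.parseLong(key.replaceFirst("[a-zA-Z]*", ""));
  }

  public static void main(String[] args) throws IOException {
    Map<String, byte[]> record = new LinkedHashMap<>();
    record.put("field0", "hello".getBytes(StandardCharsets.UTF_8));
    record.put("field1", "world".getBytes(StandardCharsets.UTF_8));

    byte[] stored = serialize(record);                        // What the binding writes as the Riak object value.
    Map<String, byte[]> roundTripped = deserialize(stored);   // What read()/scan() recover from it.
    System.out.println(new String(roundTripped.get("field1"), StandardCharsets.UTF_8));  // world
    System.out.println(keyAsLong("user12345"));                                          // 12345
  }
}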
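
The read path also relies on a small retry discipline: fetch() re-executes the FetchValue command up to riak.read_retry_count times, pauses riak.wait_time_before_retry milliseconds after a transient error, and gives up on any single attempt that exceeds riak.transaction_time_limit seconds, letting the caller map that to the TIME_OUT status. Below is a minimal, Riak-free sketch of the same pattern, under the assumption that a per-attempt deadline via Future.get(timeout) is an acceptable stand-in for the Riak client's async future; BoundedRetryDemo and fetchWithRetry are hypothetical names used only for illustration.

import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Sketch of the bounded-retry lookup used by RiakKVClient.fetch(). */
public final class BoundedRetryDemo {

  static <T> Optional<T> fetchWithRetry(Callable<T> lookup, int readRetryCount,
                                        long waitTimeBeforeRetryMs, long transactionTimeLimitSec)
      throws TimeoutException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      for (int i = 0; i < readRetryCount; i++) {
        Future<T> future = executor.submit(lookup);
        try {
          // Per-attempt deadline, like future.get(transactionTimeLimit, TimeUnit.SECONDS) in the binding.
          T value = future.get(transactionTimeLimitSec, TimeUnit.SECONDS);
          if (value != null) {
            return Optional.of(value);   // Found: stop retrying (the binding checks !response.isNotFound()).
          }
          // "Not found" without an error: retry right away, as the binding does.
        } catch (TimeoutException e) {
          throw e;                       // Deadline exceeded: the caller maps this to TIME_OUT.
        } catch (Exception e) {
          try {
            Thread.sleep(waitTimeBeforeRetryMs);   // Transient error: pause briefly before the next attempt.
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return Optional.empty();
          }
        }
      }
      return Optional.empty();           // Still not found after all the retries.
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) throws Exception {
    // Toy lookup that only "finds" its value on the third attempt.
    final int[] attempts = {0};
    Optional<String> result = fetchWithRetry(() -> ++attempts[0] < 3 ? null : "value", 5, 50, 10);
    System.out.println(result.orElse("NOT_FOUND"));   // Prints "value".
  }
}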