diff --git a/bin/ycsb b/bin/ycsb index 16c5a82a..de01a975 100755 --- a/bin/ycsb +++ b/bin/ycsb @@ -1,259 +1,260 @@ #!/usr/bin/env python # # Copyright (c) 2012 - 2015 YCSB contributors. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you # may not use this file except in compliance with the License. You # may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. See the License for the specific language governing # permissions and limitations under the License. See accompanying # LICENSE file. # import errno import fnmatch import io import os import shlex import sys import subprocess try: import argparse except ImportError: print >> sys.stderr, '[ERROR] argparse not found. Try installing it via "pip".' raise BASE_URL = "https://github.com/brianfrankcooper/YCSB/tree/master/" COMMANDS = { "shell" : { "command" : "", "description" : "Interactive mode", "main" : "com.yahoo.ycsb.CommandLine", }, "load" : { "command" : "-load", "description" : "Execute the load phase", "main" : "com.yahoo.ycsb.Client", }, "run" : { "command" : "-t", "description" : "Execute the transaction phase", "main" : "com.yahoo.ycsb.Client", }, } DATABASES = { "accumulo" : "com.yahoo.ycsb.db.accumulo.AccumuloClient", "aerospike" : "com.yahoo.ycsb.db.AerospikeClient", "basic" : "com.yahoo.ycsb.BasicDB", "cassandra-7" : "com.yahoo.ycsb.db.CassandraClient7", "cassandra-8" : "com.yahoo.ycsb.db.CassandraClient8", "cassandra-10" : "com.yahoo.ycsb.db.CassandraClient10", "cassandra-cql": "com.yahoo.ycsb.db.CassandraCQLClient", "cassandra2-cql": "com.yahoo.ycsb.db.CassandraCQLClient", "couchbase" : "com.yahoo.ycsb.db.CouchbaseClient", "dynamodb" : "com.yahoo.ycsb.db.DynamoDBClient", "elasticsearch": "com.yahoo.ycsb.db.ElasticsearchClient", "geode" : "com.yahoo.ycsb.db.GeodeClient", "googledatastore" : "com.yahoo.ycsb.db.GoogleDatastoreClient", "hbase094" : "com.yahoo.ycsb.db.HBaseClient", "hbase098" : "com.yahoo.ycsb.db.HBaseClient", "hbase10" : "com.yahoo.ycsb.db.HBaseClient10", "hypertable" : "com.yahoo.ycsb.db.HypertableClient", "infinispan-cs": "com.yahoo.ycsb.db.InfinispanRemoteClient", "infinispan" : "com.yahoo.ycsb.db.InfinispanClient", "jdbc" : "com.yahoo.ycsb.db.JdbcDBClient", "kudu" : "com.yahoo.ycsb.db.KuduYCSBClient", "mapkeeper" : "com.yahoo.ycsb.db.MapKeeperClient", "memcached" : "com.yahoo.ycsb.db.MemcachedClient", "mongodb" : "com.yahoo.ycsb.db.MongoDbClient", "mongodb-async": "com.yahoo.ycsb.db.AsyncMongoDbClient", "nosqldb" : "com.yahoo.ycsb.db.NoSqlDbClient", "orientdb" : "com.yahoo.ycsb.db.OrientDBClient", "redis" : "com.yahoo.ycsb.db.RedisClient", + "riak" : "com.yahoo.ycsb.db.RiakKVClient", "s3" : "com.yahoo.ycsb.db.S3Client", "solr" : "com.yahoo.ycsb.db.SolrClient", "tarantool" : "com.yahoo.ycsb.db.TarantoolClient", "voldemort" : "com.yahoo.ycsb.db.VoldemortClient" } OPTIONS = { "-P file" : "Specify workload file", "-p key=value" : "Override workload property", "-s" : "Print status to stderr", "-target n" : "Target ops/sec (default: unthrottled)", "-threads n" : "Number of client threads (default: 1)", "-cp path" : "Additional Java classpath entries", "-jvm-args args" : "Additional arguments to the JVM", } def usage(): output = io.BytesIO() print >> output, "%s command database [options]" % sys.argv[0] print >> output, "\nCommands:" for command in sorted(COMMANDS.keys()): print >> output, " %s %s" % (command.ljust(14), COMMANDS[command]["description"]) print >> output, "\nDatabases:" for db in sorted(DATABASES.keys()): print >> output, " %s %s" % (db.ljust(14), BASE_URL + db.split("-")[0]) print >> output, "\nOptions:" for option in sorted(OPTIONS.keys()): print >> output, " %s %s" % (option.ljust(14), OPTIONS[option]) print >> output, """\nWorkload Files: There are various predefined workloads under workloads/ directory. See https://github.com/brianfrankcooper/YCSB/wiki/Core-Properties for the list of workload properties.""" return output.getvalue() def check_output(cmd): p = subprocess.Popen(cmd, stdout=subprocess.PIPE) stdout, _ = p.communicate() if p.returncode: raise subprocess.CalledProcessError(p.returncode, cmd) return stdout def debug(message): print >> sys.stderr, "[DEBUG] ", message def warn(message): print >> sys.stderr, "[WARN] ", message def error(message): print >> sys.stderr, "[ERROR] ", message def find_jars(dir, glob='*.jar'): jars = [] for (dirpath, dirnames, filenames) in os.walk(dir): for filename in fnmatch.filter(filenames, glob): jars.append(os.path.join(dirpath, filename)) return jars def get_ycsb_home(): dir = os.path.abspath(os.path.dirname(sys.argv[0])) while "LICENSE.txt" not in os.listdir(dir): dir = os.path.join(dir, os.path.pardir) return os.path.abspath(dir) def is_distribution(): # If there's a top level pom, we're a source checkout. otherwise a dist artifact return "pom.xml" not in os.listdir(get_ycsb_home()) # Run the maven dependency plugin to get the local jar paths. # presumes maven can run, so should only be run on source checkouts # will invoke the 'package' goal for the given binding in order to resolve intra-project deps # presumes maven properly handles system-specific path separators # Given module is full module name eg. 'core' or 'couchbase-binding' def get_classpath_from_maven(module): try: debug("Running 'mvn -pl com.yahoo.ycsb:" + module + " -am package -DskipTests " "dependency:build-classpath -DincludeScope=compile -Dmdep.outputFilterFile=true'") mvn_output = check_output(["mvn", "-pl", "com.yahoo.ycsb:" + module, "-am", "package", "-DskipTests", "dependency:build-classpath", "-DincludeScope=compile", "-Dmdep.outputFilterFile=true"]) # the above outputs a "classpath=/path/tojar:/path/to/other/jar" for each module # the last module will be the datastore binding line = [x for x in mvn_output.splitlines() if x.startswith("classpath=")][-1:] return line[0][len("classpath="):] except subprocess.CalledProcessError, err: error("Attempting to generate a classpath from Maven failed " "with return code '" + str(err.returncode) + "'. The output from " "Maven follows, try running " "'mvn -DskipTests package dependency:build=classpath' on your " "own and correct errors." + os.linesep + os.linesep + "mvn output:" + os.linesep + err.output) sys.exit(err.returncode) def main(): p = argparse.ArgumentParser( usage=usage(), formatter_class=argparse.RawDescriptionHelpFormatter) p.add_argument('-cp', dest='classpath', help="""Additional classpath entries, e.g. '-cp /tmp/hbase-1.0.1.1/conf'. Will be prepended to the YCSB classpath.""") p.add_argument("-jvm-args", default=[], type=shlex.split, help="""Additional arguments to pass to 'java', e.g. '-Xmx4g'""") p.add_argument("command", choices=sorted(COMMANDS), help="""Command to run.""") p.add_argument("database", choices=sorted(DATABASES), help="""Database to test.""") args, remaining = p.parse_known_args() ycsb_home = get_ycsb_home() # Use JAVA_HOME to find java binary if set, otherwise just use PATH. java = "java" java_home = os.getenv("JAVA_HOME") if java_home: java = os.path.join(java_home, "bin", "java") db_classname = DATABASES[args.database] command = COMMANDS[args.command]["command"] main_classname = COMMANDS[args.command]["main"] # Classpath set up binding = args.database.split("-")[0] # Deprecation message for the entire cassandra-binding if binding == "cassandra": warn("The 'cassandra-7', 'cassandra-8', 'cassandra-10', and " "cassandra-cql' clients are deprecated. If you are using " "Cassandra 2.X try using the 'cassandra2-cql' client instead.") if is_distribution(): db_dir = os.path.join(ycsb_home, binding + "-binding") # include top-level conf for when we're a binding-specific artifact. # If we add top-level conf to the general artifact, starting here # will allow binding-specific conf to override (because it's prepended) cp = [os.path.join(ycsb_home, "conf")] cp.extend(find_jars(os.path.join(ycsb_home, "lib"))) cp.extend(find_jars(os.path.join(db_dir, "lib"))) else: warn("Running against a source checkout. In order to get our runtime " "dependencies we'll have to invoke Maven. Depending on the state " "of your system, this may take ~30-45 seconds") db_location = "core" if binding == "basic" else binding project = "core" if binding == "basic" else binding + "-binding" db_dir = os.path.join(ycsb_home, db_location) # goes first so we can rely on side-effect of package maven_says = get_classpath_from_maven(project) # TODO when we have a version property, skip the glob cp = find_jars(os.path.join(db_dir, "target"), project + "*.jar") # alredy in jar:jar:jar form cp.append(maven_says) cp.insert(0, os.path.join(db_dir, "conf")) classpath = os.pathsep.join(cp) if args.classpath: classpath = os.pathsep.join([args.classpath, classpath]) ycsb_command = ([java] + args.jvm_args + ["-cp", classpath, main_classname, "-db", db_classname] + remaining) if command: ycsb_command.append(command) print >> sys.stderr, " ".join(ycsb_command) try: return subprocess.call(ycsb_command) except OSError as e: if e.errno == errno.ENOENT: error('Command failed. Is java installed and on your PATH?') return 1 else: raise if __name__ == '__main__': sys.exit(main()) diff --git a/pom.xml b/pom.xml index d1f662fb..a5085d37 100644 --- a/pom.xml +++ b/pom.xml @@ -1,170 +1,172 @@ 4.0.0 com.yahoo.ycsb root 0.9.0-SNAPSHOT pom YCSB Root This is the top level project that builds, packages the core and all the DB bindings for YCSB infrastructure. scm:git:git://github.com/brianfrankcooper/YCSB.git master https://github.com/brianfrankcooper/YCSB checkstyle checkstyle 5.0 org.jdom jdom 1.1 com.google.collections google-collections 1.0 org.slf4j slf4j-api 1.6.4 2.5.5 2.10 0.94.27 0.98.14-hadoop2 1.0.2 1.6.0 1.2.9 1.0.3 3.0.0 1.0.0-incubating.M1 7.2.2.Final 0.6.0 2.1.1 3.0.3 2.0.1 2.1.8 2.0.0 1.10.20 0.81 UTF-8 0.8.0 0.9.5.6 1.4.10 1.6.5 + 2.0.5 3.1.2 5.4.0 core binding-parent accumulo aerospike cassandra cassandra2 couchbase - distribution dynamodb elasticsearch geode googledatastore hbase094 hbase098 hbase10 hypertable infinispan jdbc kudu memcached mongodb nosqldb orientdb redis + riak s3 solr tarantool + distribution org.apache.maven.plugins maven-checkstyle-plugin 2.15 org.apache.maven.plugins maven-compiler-plugin 3.3 1.7 1.7 org.apache.maven.plugins maven-checkstyle-plugin validate validate check checkstyle.xml diff --git a/riak/README.md b/riak/README.md new file mode 100644 index 00000000..ca536caf --- /dev/null +++ b/riak/README.md @@ -0,0 +1,51 @@ +Riak KV Client for Yahoo! Cloud System Benchmark (YCSB) +-------------------------------------------------------- + +The Riak KV YCSB client is designed to work with the Yahoo! Cloud System Benchmark (YCSB) project (https://github.com/brianfrankcooper/YCSB) to support performance testing for the 2.0.X line of the Riak KV database. + +Creating a bucket type to use with YCSB +---------------------------- + +Perform the following operations on your Riak cluster to configure it for the benchmarks. + +Set the default backend for Riak to LevelDB in `riak.conf` (required to support secondary indexes used for the scan workloads): + +``` +storage_backend = leveldb +``` + +Create a bucket type named "ycsb"[1](#f1) by logging into one of the nodes in your cluster. Then if you want to use the + +* default consistency model (i.e. eventual), run the following riak-admin commands: + +``` +riak-admin bucket-type create ycsb '{"props":{"allow_mult":"false"}}' +riak-admin bucket-type activate ycsb +``` + +* strong consistency model, type: + +``` +riak-admin bucket-type create ycsb '{"props":{"allow_mult":"false","consistent":true}}' +riak-admin bucket-type activate ycsb +``` +Note that you may want to specify the number of replicas to create for each object. To do so, you can add `"n_val":N` to the list of properties shown above (by default `N` is set to 3). + +Riak KV configuration parameters +---------------------------- +You can either specify these configuration parameters via command line or set them in the `riak.properties` file. + +* `riak.hosts` - string list, comma separated list of IPs or FQDNs. Example: `riak.hosts=127.0.0.1,127.0.0.2,127.0.0.3` or `riak.hosts=riak1.mydomain.com,riak2.mydomain.com,riak3.mydomain.com`. +* `riak.port` - int, the port on which every node is listening. It must match the one specified in the `riak.conf` file at the line `listener.protobuf.internal`. +* `riak.bucket_type` - string, it must match the value of the bucket type created during setup (see section above). +* `riak.r_val` - int, the R value represents the number of Riak nodes that must return results for a read before the read is considered successful. +* `riak.w_val` - int, the W value represents the number of Riak nodes that must report success before an update is considered complete. +* `riak.read_retry_count` - int, the number of times the client will try to read a key from Riak. +* `riak.wait_time_before_retry` - int, the time (in milliseconds) before client attempts to perform another read if the previous one failed. +* `riak.transaction_time_limit` - int, the time (in seconds) the client waits before aborting the current transaction. +* `riak.strong_consistency` - boolean, indicates whether to use strong consistency (true) or eventual consistency (false). +* `riak.debug` - boolean, enables debug mode. This displays all the properties (specified or defaults) when a benchmark is started. Moreover, it shows error causes whenever these occur. + +Note: For more information on workloads and how to run them please see: https://github.com/brianfrankcooper/YCSB/wiki/Running-a-Workload + +1 As specified in the `riak.properties` file. See parameters configuration section for further info. [↩](#a1) diff --git a/riak/pom.xml b/riak/pom.xml new file mode 100644 index 00000000..a7430a3d --- /dev/null +++ b/riak/pom.xml @@ -0,0 +1,52 @@ + + + + + 4.0.0 + + + com.yahoo.ycsb + binding-parent + 0.9.0-SNAPSHOT + ../binding-parent + + + riak-binding + Riak KV Binding + jar + + + + com.basho.riak + riak-client + 2.0.5 + + + com.yahoo.ycsb + core + ${project.version} + provided + + + com.google.collections + google-collections + 1.0 + + + + \ No newline at end of file diff --git a/riak/src/main/java/com/yahoo/ycsb/db/RiakKVClient.java b/riak/src/main/java/com/yahoo/ycsb/db/RiakKVClient.java new file mode 100644 index 00000000..83c9be60 --- /dev/null +++ b/riak/src/main/java/com/yahoo/ycsb/db/RiakKVClient.java @@ -0,0 +1,493 @@ +/* + * Copyright 2016 nygard_89 + * Copyright 2014 Basho Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.ycsb.db; + +import com.basho.riak.client.api.commands.kv.UpdateValue; +import com.basho.riak.client.core.RiakFuture; +import com.yahoo.ycsb.DB; +import com.yahoo.ycsb.DBException; +import com.yahoo.ycsb.Status; +import com.yahoo.ycsb.ByteIterator; + +import java.io.IOException; +import java.io.InputStream; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.basho.riak.client.api.RiakClient; +import com.basho.riak.client.api.cap.Quorum; +import com.basho.riak.client.api.commands.indexes.IntIndexQuery; +import com.basho.riak.client.api.commands.kv.DeleteValue; +import com.basho.riak.client.api.commands.kv.FetchValue; +import com.basho.riak.client.api.commands.kv.StoreValue; +import com.basho.riak.client.api.commands.kv.StoreValue.Option; +import com.basho.riak.client.core.RiakCluster; +import com.basho.riak.client.core.RiakNode; +import com.basho.riak.client.core.query.Location; +import com.basho.riak.client.core.query.Namespace; +import com.basho.riak.client.core.query.RiakObject; +import com.basho.riak.client.core.query.indexes.LongIntIndex; +import com.basho.riak.client.core.util.BinaryValue; + +import static com.yahoo.ycsb.db.RiakUtils.getKeyAsLong; +import static com.yahoo.ycsb.db.RiakUtils.serializeTable; + + +/** + * @author nygard_89 + * @author Basho Technologies, Inc. + * + */ +public final class RiakKVClient extends DB { + private static final String HOST_PROPERTY = "riak.hosts"; + private static final String PORT_PROPERTY = "riak.port"; + private static final String BUCKET_TYPE_PROPERTY = "riak.bucket_type"; + private static final String R_VALUE_PROPERTY = "riak.r_val"; + private static final String W_VALUE_PROPERTY = "riak.w_val"; + private static final String READ_RETRY_COUNT_PROPERTY = "riak.read_retry_count"; + private static final String WAIT_TIME_BEFORE_RETRY_PROPERTY = "riak.wait_time_before_retry"; + private static final String TRANSACTION_TIME_LIMIT_PROPERTY = "riak.transaction_time_limit"; + private static final String STRONG_CONSISTENCY_PROPERTY = "riak.strong_consistency"; + private static final String DEBUG_PROPERTY = "riak.debug"; + + private static final Status TIME_OUT = new Status("TIME_OUT", "Cluster didn't respond after maximum wait time " + + "for transaction indicated"); + + private String[] hosts; + private int port; + private String bucketType; + private Quorum rQuorumValue; + private Quorum wQuorumValue; + private int readRetryCount; + private int waitTimeBeforeRetry; + private int transactionTimeLimit; + private boolean strongConsistency; + private boolean debug; + + private RiakClient riakClient; + private RiakCluster riakCluster; + + private void loadDefaultProperties() { + InputStream propFile = RiakKVClient.class.getClassLoader().getResourceAsStream("riak.properties"); + Properties propsPF = new Properties(System.getProperties()); + + try { + propsPF.load(propFile); + } catch (IOException e) { + e.printStackTrace(); + } + + hosts = propsPF.getProperty(HOST_PROPERTY).split(","); + port = Integer.parseInt(propsPF.getProperty(PORT_PROPERTY)); + bucketType = propsPF.getProperty(BUCKET_TYPE_PROPERTY); + rQuorumValue = new Quorum(Integer.parseInt(propsPF.getProperty(R_VALUE_PROPERTY))); + wQuorumValue = new Quorum(Integer.parseInt(propsPF.getProperty(W_VALUE_PROPERTY))); + readRetryCount = Integer.parseInt(propsPF.getProperty(READ_RETRY_COUNT_PROPERTY)); + waitTimeBeforeRetry = Integer.parseInt(propsPF.getProperty(WAIT_TIME_BEFORE_RETRY_PROPERTY)); + transactionTimeLimit = Integer.parseInt(propsPF.getProperty(TRANSACTION_TIME_LIMIT_PROPERTY)); + strongConsistency = Boolean.parseBoolean(propsPF.getProperty(STRONG_CONSISTENCY_PROPERTY)); + debug = Boolean.parseBoolean(propsPF.getProperty(DEBUG_PROPERTY)); + } + + private void loadProperties() { + loadDefaultProperties(); + + Properties props = getProperties(); + + String portString = props.getProperty(PORT_PROPERTY); + if (portString != null) { + port = Integer.parseInt(portString); + } + + String hostsString = props.getProperty(HOST_PROPERTY); + if (hostsString != null) { + hosts = hostsString.split(","); + } + + String bucketTypeString = props.getProperty(BUCKET_TYPE_PROPERTY); + if (bucketTypeString != null) { + bucketType = bucketTypeString; + } + + String rQuorumValueString = props.getProperty(R_VALUE_PROPERTY); + if (rQuorumValueString != null) { + rQuorumValue = new Quorum(Integer.parseInt(rQuorumValueString)); + } + + String wQuorumValueString = props.getProperty(W_VALUE_PROPERTY); + if (wQuorumValueString != null) { + wQuorumValue = new Quorum(Integer.parseInt(wQuorumValueString)); + } + + String readRetryCountString = props.getProperty(READ_RETRY_COUNT_PROPERTY); + if (readRetryCountString != null) { + readRetryCount = Integer.parseInt(readRetryCountString); + } + + String waitTimeBeforeRetryString = props.getProperty(WAIT_TIME_BEFORE_RETRY_PROPERTY); + if (waitTimeBeforeRetryString != null) { + waitTimeBeforeRetry = Integer.parseInt(waitTimeBeforeRetryString); + } + + String transactionTimeLimitString = props.getProperty(TRANSACTION_TIME_LIMIT_PROPERTY); + if (transactionTimeLimitString != null) { + transactionTimeLimit = Integer.parseInt(transactionTimeLimitString); + } + + String strongConsistencyString = props.getProperty(STRONG_CONSISTENCY_PROPERTY); + if (strongConsistencyString != null) { + strongConsistency = Boolean.parseBoolean(strongConsistencyString); + } + + String debugString = props.getProperty(DEBUG_PROPERTY); + if (debugString != null) { + debug = Boolean.parseBoolean(debugString); + } + } + + public void init() throws DBException { + loadProperties(); + + if (debug) { + System.out.println("DEBUG ENABLED. Configuration parameters:"); + System.out.println("-----------------------------------------"); + System.out.println("Hosts: " + Arrays.toString(hosts)); + System.out.println("Port: " + port); + System.out.println("Bucket Type: " + bucketType); + System.out.println("R Quorum Value: " + rQuorumValue.toString()); + System.out.println("W Quorum Value: " + wQuorumValue.toString()); + System.out.println("Read Retry Count: " + readRetryCount); + System.out.println("Wait Time Before Retry: " + waitTimeBeforeRetry + " ms"); + System.out.println("Transaction Time Limit: " + transactionTimeLimit + " s"); + System.out.println("Consistency model: " + (strongConsistency ? "Strong" : "Eventual")); + } + + RiakNode.Builder builder = new RiakNode.Builder().withRemotePort(port); + List nodes = RiakNode.Builder.buildNodes(builder, Arrays.asList(hosts)); + riakCluster = new RiakCluster.Builder(nodes).build(); + + try { + riakCluster.start(); + riakClient = new RiakClient(riakCluster); + } catch (Exception e) { + System.err.println("Unable to properly start up the cluster. Reason: " + e.toString()); + throw new DBException(e); + } + } + + /** + * Read a record from the database. Each field/value pair from the result will be stored in a HashMap. + * + * @param table The name of the table (Riak bucket) + * @param key The record key of the record to read. + * @param fields The list of fields to read, or null for all of them + * @param result A HashMap of field/value pairs for the result + * @return Zero on success, a non-zero error code on error + */ + @Override + public Status read(String table, String key, Set fields, HashMap result) { + Location location = new Location(new Namespace(bucketType, table), key); + FetchValue fv = new FetchValue.Builder(location).withOption(FetchValue.Option.R, rQuorumValue).build(); + + try { + FetchValue.Response response = fetch(fv); + + if (response.isNotFound()) { + if (debug) { + System.err.println("Unable to read key " + key + ". Reason: NOT FOUND"); + } + + return Status.NOT_FOUND; + } + } catch (TimeoutException e) { + if (debug) { + System.err.println("Unable to read key " + key + ". Reason: TIME OUT"); + } + + return TIME_OUT; + } catch (Exception e) { + if (debug) { + System.err.println("Unable to read key " + key + ". Reason: " + e.toString()); + } + + return Status.ERROR; + } + + return Status.OK; + } + + + /** + * Perform a range scan for a set of records in the database. Each field/value pair from the result will be stored in + * a HashMap. + *

+ * Note: The scan operation requires the use of secondary indexes (2i) and LevelDB. + * + * @param table The name of the table (Riak bucket) + * @param startkey The record key of the first record to read. + * @param recordcount The number of records to read + * @param fields The list of fields to read, or null for all of them + * @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record + * @return Zero on success, a non-zero error code on error + */ + @Override + public Status scan(String table, String startkey, int recordcount, Set fields, + Vector> result) { + Namespace ns = new Namespace(bucketType, table); + + IntIndexQuery iiq = new IntIndexQuery + .Builder(ns, "key", getKeyAsLong(startkey), 999999999999999999L) + .withMaxResults(recordcount) + .withPaginationSort(true) + .build(); + + RiakFuture future = riakClient.executeAsync(iiq); + + try { + IntIndexQuery.Response response = future.get(transactionTimeLimit, TimeUnit.SECONDS); + List entries = response.getEntries(); + + for (IntIndexQuery.Response.Entry entry : entries) { + Location location = entry.getRiakObjectLocation(); + FetchValue fv = new FetchValue.Builder(location) + .withOption(FetchValue.Option.R, rQuorumValue) + .build(); + + FetchValue.Response keyResponse = fetch(fv); + + if (keyResponse.isNotFound()) { + if (debug) { + System.err.println("Unable to scan starting from key " + startkey + ", aborting transaction. Reason: NOT " + + "FOUND"); + } + + return Status.NOT_FOUND; + } + } + } catch (TimeoutException e) { + if (debug) { + System.err.println("Unable to scan starting from key " + startkey + ", aborting transaction. Reason: TIME OUT"); + } + + return TIME_OUT; + } catch (Exception e) { + if (debug) { + System.err.println("Unable to scan starting from key " + startkey + ", aborting transaction. Reason: " + + e.toString()); + } + + return Status.ERROR; + } + + return Status.OK; + } + + /** + * Tries to perform a read and, whenever it fails, retries to do it. It actually does try as many time as indicated, + * even if the function riakClient.execute(fv) throws an exception. This is needed for those situation in which the + * cluster is unable to respond properly due to overload. Note however that if the cluster doesn't respond after + * transactionTimeLimit, the transaction is discarded immediately. + * + * @param fv The value to fetch from the cluster. + */ + private FetchValue.Response fetch(FetchValue fv) throws TimeoutException { + FetchValue.Response response = null; + + for (int i = 0; i < readRetryCount; i++) { + RiakFuture future = riakClient.executeAsync(fv); + + try { + response = future.get(transactionTimeLimit, TimeUnit.SECONDS); + + if (!response.isNotFound()) { + break; + } + } catch (TimeoutException e) { + // Let the callee decide how to handle this exception... + throw new TimeoutException(); + } catch (Exception e) { + // Sleep for a few ms before retrying... + try { + Thread.sleep(waitTimeBeforeRetry); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + } + + return response; + } + + /** + * Insert a record in the database. Any field/value pairs in the specified values HashMap + * will be written into the record with the specified record key. Also creates a + * secondary index (2i) for each record consisting of the key converted to long to be used + * for the scan operation + * + * @param table The name of the table (Riak bucket) + * @param key The record key of the record to insert. + * @param values A HashMap of field/value pairs to insert in the record + * @return Zero on success, a non-zero error code on error + */ + @Override + public Status insert(String table, String key, HashMap values) { + Location location = new Location(new Namespace(bucketType, table), key); + RiakObject object = new RiakObject(); + + object.setValue(BinaryValue.create(serializeTable(values))); + object.getIndexes().getIndex(LongIntIndex.named("key_int")).add(getKeyAsLong(key)); + + StoreValue store = new StoreValue.Builder(object) + .withLocation(location) + .withOption(Option.W, wQuorumValue) + .build(); + + RiakFuture future = riakClient.executeAsync(store); + + try { + future.get(transactionTimeLimit, TimeUnit.SECONDS); + } catch (TimeoutException e) { + if (debug) { + System.err.println("Unable to " + (Thread.currentThread().getStackTrace()[2] + .getMethodName().equals("update") ? "update" : "insert") + " key " + key + ". Reason: TIME OUT"); + } + + return TIME_OUT; + } catch (Exception e) { + if (debug) { + System.err.println("Unable to " + (Thread.currentThread().getStackTrace()[2] + .getMethodName().equals("update") ? "update" : "insert") + " key " + key + ". Reason: " + e.toString()); + } + + return Status.ERROR; + } + + return Status.OK; + } + + /** + * Define a class to permit object substitution within the update operation, following the same, identical steps + * made in an insert operation. This is needed for strong-consistent updates. + */ + private static final class UpdateEntity extends UpdateValue.Update { + private final RiakObject object; + + private UpdateEntity(RiakObject e) { + this.object = e; + } + + /* Simply returns the object. + */ + @Override + public RiakObject apply(RiakObject original) { + return object; + } + } + + /** + * Update a record in the database. Any field/value pairs in the specified values + * HashMap will be written into the record with the specified + * record key, overwriting any existing values with the same field name. + * + * @param table The name of the table (Riak bucket) + * @param key The record key of the record to write. + * @param values A HashMap of field/value pairs to update in the record + * @return Zero on success, a non-zero error code on error + */ + @Override + public Status update(String table, String key, HashMap values) { + if (!strongConsistency) { + return insert(table, key, values); + } + + Location location = new Location(new Namespace(bucketType, table), key); + RiakObject object = new RiakObject(); + + object.setValue(BinaryValue.create(serializeTable(values))); + object.getIndexes().getIndex(LongIntIndex.named("key_int")).add(getKeyAsLong(key)); + + UpdateValue update = new UpdateValue.Builder(location) + .withFetchOption(FetchValue.Option.DELETED_VCLOCK, true) + .withStoreOption(Option.W, wQuorumValue) + .withUpdate(new UpdateEntity(object)) + .build(); + + RiakFuture future = riakClient.executeAsync(update); + + try { + future.get(transactionTimeLimit, TimeUnit.SECONDS); + } catch (TimeoutException e) { + if (debug) { + System.err.println("Unable to update key " + key + ". Reason: TIME OUT"); + } + + return TIME_OUT; + } catch (Exception e) { + if (debug) { + System.err.println("Unable to update key " + key + ". Reason: " + e.toString()); + } + + return Status.ERROR; + } + + return Status.OK; + } + + + /** + * Delete a record from the database. + * + * @param table The name of the table (Riak bucket) + * @param key The record key of the record to delete. + * @return Zero on success, a non-zero error code on error + */ + @Override + public Status delete(String table, String key) { + Location location = new Location(new Namespace(bucketType, table), key); + DeleteValue dv = new DeleteValue.Builder(location).build(); + + RiakFuture future = riakClient.executeAsync(dv); + + try { + future.get(transactionTimeLimit, TimeUnit.SECONDS); + } catch (TimeoutException e) { + if (debug) { + System.err.println("Unable to delete key " + key + ". Reason: TIME OUT"); + } + + return TIME_OUT; + } catch (Exception e) { + if (debug) { + System.err.println("Unable to delete key " + key + ". Reason: " + e.toString()); + } + + return Status.ERROR; + } + + return Status.OK; + } + + public void cleanup() throws DBException { + try { + riakCluster.shutdown(); + } catch (Exception e) { + System.err.println("Unable to properly shutdown the cluster. Reason: " + e.toString()); + throw new DBException(e); + } + } +} diff --git a/riak/src/main/java/com/yahoo/ycsb/db/RiakUtils.java b/riak/src/main/java/com/yahoo/ycsb/db/RiakUtils.java new file mode 100644 index 00000000..2f4d74ca --- /dev/null +++ b/riak/src/main/java/com/yahoo/ycsb/db/RiakUtils.java @@ -0,0 +1,85 @@ +/* + * Copyright 2016 nygard_89 + * Copyright 2014 Basho Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.ycsb.db; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Map; +import java.util.Set; + +import com.yahoo.ycsb.ByteIterator; + +/** + * @author nygard_89 + * @author Basho Technologies, Inc. + * + */ +final class RiakUtils { + + private RiakUtils() { + super(); + } + + private static byte[] toBytes(final int anInteger) { + byte[] aResult = new byte[4]; + + aResult[0] = (byte) (anInteger >> 24); + aResult[1] = (byte) (anInteger >> 16); + aResult[2] = (byte) (anInteger >> 8); + aResult[3] = (byte) (anInteger /* >> 0 */); + + return aResult; + } + + private static void close(final OutputStream anOutputStream) { + try { + anOutputStream.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + static byte[] serializeTable(Map aTable) { + final ByteArrayOutputStream anOutputStream = new ByteArrayOutputStream(); + final Set> theEntries = aTable.entrySet(); + + try { + for (final Map.Entry anEntry : theEntries) { + final byte[] aColumnName = anEntry.getKey().getBytes(); + + anOutputStream.write(toBytes(aColumnName.length)); + anOutputStream.write(aColumnName); + + final byte[] aColumnValue = anEntry.getValue().toArray(); + + anOutputStream.write(toBytes(aColumnValue.length)); + anOutputStream.write(aColumnValue); + } + return anOutputStream.toByteArray(); + } catch (IOException e) { + throw new IllegalStateException(e); + } finally { + close(anOutputStream); + } + } + + static Long getKeyAsLong(String key) { + String keyString = key.replace("user", "").replaceFirst("^0*", ""); + return Long.parseLong(keyString); + } +} diff --git a/riak/src/main/java/com/yahoo/ycsb/db/package-info.java b/riak/src/main/java/com/yahoo/ycsb/db/package-info.java new file mode 100644 index 00000000..7e1d7212 --- /dev/null +++ b/riak/src/main/java/com/yahoo/ycsb/db/package-info.java @@ -0,0 +1,21 @@ +/* + * Copyright 2014 Basho Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * The YCSB binding for Riak KV + * 2.0+. + */ +package com.yahoo.ycsb.db; \ No newline at end of file diff --git a/riak/src/main/resources/riak.properties b/riak/src/main/resources/riak.properties new file mode 100644 index 00000000..62eddbcf --- /dev/null +++ b/riak/src/main/resources/riak.properties @@ -0,0 +1,39 @@ +# RiakDBClient - Default Properties +# Note: Change the properties below to set the values to use for your test. You can set them either here or from the +# command line. Note that the latter choice overrides these settings. + +# riak.hosts - string list, comma separated list of IPs or FQDNs. +# EX: 127.0.0.1,127.0.0.2,127.0.0.3 or riak1.mydomain.com,riak2.mydomain.com,riak3.mydomain.com +riak.hosts=127.0.0.1 + +# riak.port - int, the port on which every node is listening. It must match the one specified in the riak.conf file +# at the line "listener.protobuf.internal". +riak.port=8087 + +# riak.bucket_type - string, must match value of bucket type created during setup. See readme.md for more information +riak.bucket_type=ycsb + +# riak.r_val - int, the R value represents the number of Riak nodes that must return results for a read before the read +# is considered successful. +riak.r_val=2 + +# riak.w_val - int, the W value represents the number of Riak nodes that must report success before an update is +# considered complete. +riak.w_val=2 + +# riak.read_retry_count - int, number of times the client will try to read a key from Riak. +riak.read_retry_count=5 + +# riak.wait_time_before_retry - int, time (in milliseconds) the client waits before attempting to perform another +# read if the previous one failed. +riak.wait_time_before_retry=200 + +# riak.transaction_time_limit - int, time (in seconds) the client waits before aborting the current transaction. +riak.transaction_time_limit=10 + +# riak.strong_consistency - boolean, indicates whether to use strong consistency (true) or eventual consistency (false). +riak.strong_consistency=false + +# riak.debug - boolean, enables debug mode. This displays all the properties (specified or defaults) when a benchmark +# is started. +riak.debug=false \ No newline at end of file