diff --git a/bin/bindings.properties b/bin/bindings.properties index 6f03c633..063a90cb 100644 --- a/bin/bindings.properties +++ b/bin/bindings.properties @@ -1,80 +1,81 @@ # # Copyright (c) 2012 - 2016 YCSB contributors. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you # may not use this file except in compliance with the License. You # may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. See the License for the specific language governing # permissions and limitations under the License. See accompanying # LICENSE file. # #DATABASE BINDINGS # # Available bindings should be listed here in the form of # name:class # # - the name must start in column 0. # - the name is also the directory where the class can be found. # - if the directory contains multiple versions with different classes, # use a dash with the version. (e.g. cassandra-7, cassandra-cql) # accumulo:com.yahoo.ycsb.db.accumulo.AccumuloClient accumulo1.6:com.yahoo.ycsb.db.accumulo.AccumuloClient accumulo1.7:com.yahoo.ycsb.db.accumulo.AccumuloClient accumulo1.8:com.yahoo.ycsb.db.accumulo.AccumuloClient aerospike:com.yahoo.ycsb.db.AerospikeClient asynchbase:com.yahoo.ycsb.db.AsyncHBaseClient arangodb:com.yahoo.ycsb.db.arangodb.ArangoDBClient arangodb3:com.yahoo.ycsb.db.arangodb.ArangoDBClient azuredocumentdb:com.yahoo.ycsb.db.azuredocumentdb.AzureDocumentDBClient azuretablestorage:com.yahoo.ycsb.db.azuretablestorage.AzureClient basic:com.yahoo.ycsb.BasicDB basicts:com.yahoo.ycsb.BasicTSDB cassandra-cql:com.yahoo.ycsb.db.CassandraCQLClient cassandra2-cql:com.yahoo.ycsb.db.CassandraCQLClient cloudspanner:com.yahoo.ycsb.db.cloudspanner.CloudSpannerClient couchbase:com.yahoo.ycsb.db.CouchbaseClient couchbase2:com.yahoo.ycsb.db.couchbase2.Couchbase2Client dynamodb:com.yahoo.ycsb.db.DynamoDBClient elasticsearch:com.yahoo.ycsb.db.ElasticsearchClient elasticsearch5:com.yahoo.ycsb.db.elasticsearch5.ElasticsearchClient elasticsearch5-rest:com.yahoo.ycsb.db.elasticsearch5.ElasticsearchRestClient foundationdb:com.yahoo.ycsb.db.foundationdb.FoundationDBClient geode:com.yahoo.ycsb.db.GeodeClient googlebigtable:com.yahoo.ycsb.db.GoogleBigtableClient googledatastore:com.yahoo.ycsb.db.GoogleDatastoreClient hbase098:com.yahoo.ycsb.db.HBaseClient hbase10:com.yahoo.ycsb.db.HBaseClient10 hbase12:com.yahoo.ycsb.db.hbase12.HBaseClient12 hbase14:com.yahoo.ycsb.db.hbase14.HBaseClient14 hbase20:com.yahoo.ycsb.db.hbase14.HBaseClient20 hypertable:com.yahoo.ycsb.db.HypertableClient ignite:com.yahoo.ycsb.db.ignite.IgniteClient ignite-sql:com.yahoo.ycsb.db.ignite.IgniteSqlClient infinispan-cs:com.yahoo.ycsb.db.InfinispanRemoteClient infinispan:com.yahoo.ycsb.db.InfinispanClient jdbc:com.yahoo.ycsb.db.JdbcDBClient kudu:com.yahoo.ycsb.db.KuduYCSBClient memcached:com.yahoo.ycsb.db.MemcachedClient mongodb:com.yahoo.ycsb.db.MongoDbClient mongodb-async:com.yahoo.ycsb.db.AsyncMongoDbClient +mongodb-asyncnative:com.yahoo.ycsb.db.NativeAsyncMongoDbClient mysql:com.yahoo.ycsb.db.JAsyncMySQLDBClient nosqldb:com.yahoo.ycsb.db.NoSqlDbClient orientdb:com.yahoo.ycsb.db.OrientDBClient rados:com.yahoo.ycsb.db.RadosClient redis:com.yahoo.ycsb.db.RedisClient rest:com.yahoo.ycsb.webservice.rest.RestClient riak:com.yahoo.ycsb.db.riak.RiakKVClient rocksdb:com.yahoo.ycsb.db.rocksdb.RocksDBClient s3:com.yahoo.ycsb.db.S3Client solr:com.yahoo.ycsb.db.solr.SolrClient solr6:com.yahoo.ycsb.db.solr6.SolrClient tarantool:com.yahoo.ycsb.db.TarantoolClient diff --git a/bin/ycsb b/bin/ycsb index 0db74913..d77d8e8b 100755 --- a/bin/ycsb +++ b/bin/ycsb @@ -1,343 +1,344 @@ #!/usr/bin/env python # # Copyright (c) 2012 - 2015 YCSB contributors. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you # may not use this file except in compliance with the License. You # may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. See the License for the specific language governing # permissions and limitations under the License. See accompanying # LICENSE file. # import errno import fnmatch import io import os import shlex import sys import subprocess try: mod = __import__('argparse') import argparse except ImportError: print >> sys.stderr, '[ERROR] argparse not found. Try installing it via "pip".' exit(1) BASE_URL = "https://github.com/brianfrankcooper/YCSB/tree/master/" COMMANDS = { "shell" : { "command" : "", "description" : "Interactive mode", "main" : "com.yahoo.ycsb.CommandLine", }, "load" : { "command" : "-load", "description" : "Execute the load phase", "main" : "com.yahoo.ycsb.Client", }, "run" : { "command" : "-t", "description" : "Execute the transaction phase", "main" : "com.yahoo.ycsb.Client", }, } DATABASES = { "accumulo" : "com.yahoo.ycsb.db.accumulo.AccumuloClient", "accumulo1.6" : "com.yahoo.ycsb.db.accumulo.AccumuloClient", "accumulo1.7" : "com.yahoo.ycsb.db.accumulo.AccumuloClient", "accumulo1.8" : "com.yahoo.ycsb.db.accumulo.AccumuloClient", "aerospike" : "com.yahoo.ycsb.db.AerospikeClient", "arangodb" : "com.yahoo.ycsb.db.arangodb.ArangoDBClient", "arangodb3" : "com.yahoo.ycsb.db.arangodb.ArangoDBClient", "asynchbase" : "com.yahoo.ycsb.db.AsyncHBaseClient", "azuredocumentdb" : "com.yahoo.ycsb.db.azuredocumentdb.AzureDocumentDBClient", "azuretablestorage" : "com.yahoo.ycsb.db.azuretablestorage.AzureClient", "basic" : "com.yahoo.ycsb.BasicDB", "basicts" : "com.yahoo.ycsb.BasicTSDB", "cassandra-cql": "com.yahoo.ycsb.db.CassandraCQLClient", "cassandra2-cql": "com.yahoo.ycsb.db.CassandraCQLClient", "cloudspanner" : "com.yahoo.ycsb.db.cloudspanner.CloudSpannerClient", "couchbase" : "com.yahoo.ycsb.db.CouchbaseClient", "couchbase2" : "com.yahoo.ycsb.db.couchbase2.Couchbase2Client", "dynamodb" : "com.yahoo.ycsb.db.DynamoDBClient", "elasticsearch": "com.yahoo.ycsb.db.ElasticsearchClient", "elasticsearch5": "com.yahoo.ycsb.db.elasticsearch5.ElasticsearchClient", "foundationdb" : "com.yahoo.ycsb.db.foundationdb.FoundationDBClient", "geode" : "com.yahoo.ycsb.db.GeodeClient", "googlebigtable" : "com.yahoo.ycsb.db.GoogleBigtableClient", "googledatastore" : "com.yahoo.ycsb.db.GoogleDatastoreClient", "hbase098" : "com.yahoo.ycsb.db.HBaseClient", "hbase10" : "com.yahoo.ycsb.db.HBaseClient10", "hbase12" : "com.yahoo.ycsb.db.hbase12.HBaseClient12", "hbase14" : "com.yahoo.ycsb.db.hbase14.HBaseClient14", "hbase20" : "com.yahoo.ycsb.db.hbase20.HBaseClient20", "hypertable" : "com.yahoo.ycsb.db.HypertableClient", "ignite" : "com.yahoo.ycsb.db.ignite.IgniteClient", "ignite-sql" : "com.yahoo.ycsb.db.ignite.IgniteSqlClient", "infinispan-cs": "com.yahoo.ycsb.db.InfinispanRemoteClient", "infinispan" : "com.yahoo.ycsb.db.InfinispanClient", "jdbc" : "com.yahoo.ycsb.db.JdbcDBClient", "kudu" : "com.yahoo.ycsb.db.KuduYCSBClient", "memcached" : "com.yahoo.ycsb.db.MemcachedClient", "maprdb" : "com.yahoo.ycsb.db.mapr.MapRDBClient", "maprjsondb" : "com.yahoo.ycsb.db.mapr.MapRJSONDBClient", "mongodb" : "com.yahoo.ycsb.db.MongoDbClient", "mongodb-async": "com.yahoo.ycsb.db.AsyncMongoDbClient", + "mongodb-asyncnative": "com.yahoo.ycsb.db.NativeAsyncMongoDbClient", "mysql" : "com.yahoo.ycsb.db.JAsyncMySQLDBClient", "nosqldb" : "com.yahoo.ycsb.db.NoSqlDbClient", "orientdb" : "com.yahoo.ycsb.db.OrientDBClient", "rados" : "com.yahoo.ycsb.db.RadosClient", "redis" : "com.yahoo.ycsb.db.RedisClient", "rest" : "com.yahoo.ycsb.webservice.rest.RestClient", "riak" : "com.yahoo.ycsb.db.riak.RiakKVClient", "rocksdb" : "com.yahoo.ycsb.db.rocksdb.RocksDBClient", "s3" : "com.yahoo.ycsb.db.S3Client", "solr" : "com.yahoo.ycsb.db.solr.SolrClient", "solr6" : "com.yahoo.ycsb.db.solr6.SolrClient", "tarantool" : "com.yahoo.ycsb.db.TarantoolClient", } OPTIONS = { "-P file" : "Specify workload file", "-p key=value" : "Override workload property", "-s" : "Print status to stderr", "-target n" : "Target ops/sec (default: unthrottled)", "-threads n" : "Number of client threads (default: 1)", "-cp path" : "Additional Java classpath entries", "-jvm-args args" : "Additional arguments to the JVM", } def usage(): output = io.BytesIO() print >> output, "%s command database [options]" % sys.argv[0] print >> output, "\nCommands:" for command in sorted(COMMANDS.keys()): print >> output, " %s %s" % (command.ljust(14), COMMANDS[command]["description"]) print >> output, "\nDatabases:" for db in sorted(DATABASES.keys()): print >> output, " %s %s" % (db.ljust(14), BASE_URL + db.split("-")[0]) print >> output, "\nOptions:" for option in sorted(OPTIONS.keys()): print >> output, " %s %s" % (option.ljust(14), OPTIONS[option]) print >> output, """\nWorkload Files: There are various predefined workloads under workloads/ directory. See https://github.com/brianfrankcooper/YCSB/wiki/Core-Properties for the list of workload properties.""" return output.getvalue() # Python 2.6 doesn't have check_output. Add the method as it is in Python 2.7 # Based on https://github.com/python/cpython/blob/2.7/Lib/subprocess.py#L545 def check_output(*popenargs, **kwargs): r"""Run command with arguments and return its output as a byte string. If the exit code was non-zero it raises a CalledProcessError. The CalledProcessError object will have the return code in the returncode attribute and output in the output attribute. The arguments are the same as for the Popen constructor. Example: >>> check_output(["ls", "-l", "/dev/null"]) 'crw-rw-rw- 1 root root 1, 3 Oct 18 2007 /dev/null\n' The stdout argument is not allowed as it is used internally. To capture standard error in the result, use stderr=STDOUT. >>> check_output(["/bin/sh", "-c", ... "ls -l non_existent_file ; exit 0"], ... stderr=STDOUT) 'ls: non_existent_file: No such file or directory\n' """ if 'stdout' in kwargs: raise ValueError('stdout argument not allowed, it will be overridden.') process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs) output, unused_err = process.communicate() retcode = process.poll() if retcode: cmd = kwargs.get("args") if cmd is None: cmd = popenargs[0] error = subprocess.CalledProcessError(retcode, cmd) error.output = output raise error return output def debug(message): print >> sys.stderr, "[DEBUG] ", message def warn(message): print >> sys.stderr, "[WARN] ", message def error(message): print >> sys.stderr, "[ERROR] ", message def find_jars(dir, glob='*.jar'): jars = [] for (dirpath, dirnames, filenames) in os.walk(dir): for filename in fnmatch.filter(filenames, glob): jars.append(os.path.join(dirpath, filename)) return jars def get_ycsb_home(): dir = os.path.abspath(os.path.dirname(sys.argv[0])) while "LICENSE.txt" not in os.listdir(dir): dir = os.path.join(dir, os.path.pardir) return os.path.abspath(dir) def is_distribution(): # If there's a top level pom, we're a source checkout. otherwise a dist artifact return "pom.xml" not in os.listdir(get_ycsb_home()) # Run the maven dependency plugin to get the local jar paths. # presumes maven can run, so should only be run on source checkouts # will invoke the 'package' goal for the given binding in order to resolve intra-project deps # presumes maven properly handles system-specific path separators # Given module is full module name eg. 'core' or 'couchbase-binding' def get_classpath_from_maven(module): try: debug("Running 'mvn -pl com.yahoo.ycsb:" + module + " -am package -DskipTests " "dependency:build-classpath -DincludeScope=compile -Dmdep.outputFilterFile=true'") mvn_output = check_output(["mvn", "-pl", "com.yahoo.ycsb:" + module, "-am", "package", "-DskipTests", "dependency:build-classpath", "-DincludeScope=compile", "-Dmdep.outputFilterFile=true"]) # the above outputs a "classpath=/path/tojar:/path/to/other/jar" for each module # the last module will be the datastore binding line = [x for x in mvn_output.splitlines() if x.startswith("classpath=")][-1:] return line[0][len("classpath="):] except subprocess.CalledProcessError, err: error("Attempting to generate a classpath from Maven failed " "with return code '" + str(err.returncode) + "'. The output from " "Maven follows, try running " "'mvn -DskipTests package dependency:build-classpath' on your " "own and correct errors." + os.linesep + os.linesep + "mvn output:" + os.linesep + err.output) sys.exit(err.returncode) def main(): p = argparse.ArgumentParser( usage=usage(), formatter_class=argparse.RawDescriptionHelpFormatter) p.add_argument('-cp', dest='classpath', help="""Additional classpath entries, e.g. '-cp /tmp/hbase-1.0.1.1/conf'. Will be prepended to the YCSB classpath.""") p.add_argument("-jvm-args", default=[], type=shlex.split, help="""Additional arguments to pass to 'java', e.g. '-Xmx4g'""") p.add_argument("command", choices=sorted(COMMANDS), help="""Command to run.""") p.add_argument("database", choices=sorted(DATABASES), help="""Database to test.""") args, remaining = p.parse_known_args() ycsb_home = get_ycsb_home() # Use JAVA_HOME to find java binary if set, otherwise just use PATH. java = "java" java_home = os.getenv("JAVA_HOME") if java_home: java = os.path.join(java_home, "bin", "java") db_classname = DATABASES[args.database] command = COMMANDS[args.command]["command"] main_classname = COMMANDS[args.command]["main"] # Classpath set up binding = args.database.split("-")[0] if binding == "accumulo": warn("The 'accumulo' client has been deprecated in favor of version " "specific bindings. This name still maps to the binding for " "Accumulo 1.6, which is named 'accumulo-1.6'. This alias will " "be removed in a future YCSB release.") binding = "accumulo1.6" if binding == "accumulo1.6": warn("The 'accumulo1.6' client has been deprecated because Accumulo 1.6 " "is EOM. If you are using Accumulo 1.7+ try using the 'accumulo1.7' " "client instead.") if binding == "cassandra2": warn("The 'cassandra2-cql' client has been deprecated. It has been " "renamed to simply 'cassandra-cql'. This alias will be removed" " in the next YCSB release.") binding = "cassandra" if binding == "couchbase": warn("The 'couchbase' client has been deprecated. If you are using " "Couchbase 4.0+ try using the 'couchbase2' client instead.") if binding == "hbase098": warn("The 'hbase098' client has been deprecated because HBase 0.98 " "is EOM. If you are using HBase 1.2+ try using the 'hbase12' " "client instead.") if binding == "hbase10": warn("The 'hbase10' client has been deprecated because HBase 1.0 " "is EOM. If you are using HBase 1.2+ try using the 'hbase12' " "client instead.") if binding == "arangodb3": warn("The 'arangodb3' client has been deprecated. The binding 'arangodb' " "now covers every ArangoDB version. This alias will be removed " "in the next YCSB release.") binding = "arangodb" if is_distribution(): db_dir = os.path.join(ycsb_home, binding + "-binding") # include top-level conf for when we're a binding-specific artifact. # If we add top-level conf to the general artifact, starting here # will allow binding-specific conf to override (because it's prepended) cp = [os.path.join(ycsb_home, "conf")] cp.extend(find_jars(os.path.join(ycsb_home, "lib"))) cp.extend(find_jars(os.path.join(db_dir, "lib"))) else: warn("Running against a source checkout. In order to get our runtime " "dependencies we'll have to invoke Maven. Depending on the state " "of your system, this may take ~30-45 seconds") db_location = "core" if (binding == "basic" or binding == "basicts") else binding project = "core" if (binding == "basic" or binding == "basicts") else binding + "-binding" db_dir = os.path.join(ycsb_home, db_location) # goes first so we can rely on side-effect of package maven_says = get_classpath_from_maven(project) # TODO when we have a version property, skip the glob cp = find_jars(os.path.join(db_dir, "target"), project + "*.jar") # alredy in jar:jar:jar form cp.append(maven_says) cp.insert(0, os.path.join(db_dir, "conf")) classpath = os.pathsep.join(cp) if args.classpath: classpath = os.pathsep.join([args.classpath, classpath]) ycsb_command = ([java] + args.jvm_args + ["-cp", classpath, main_classname, "-db", db_classname] + remaining) if command: ycsb_command.append(command) print >> sys.stderr, " ".join(ycsb_command) try: return subprocess.call(ycsb_command) except OSError as e: if e.errno == errno.ENOENT: error('Command failed. Is java installed and on your PATH?') return 1 else: raise if __name__ == '__main__': sys.exit(main()) diff --git a/mongodb/pom.xml b/mongodb/pom.xml index a9041967..7b5c396d 100644 --- a/mongodb/pom.xml +++ b/mongodb/pom.xml @@ -1,89 +1,94 @@ 4.0.0 com.yahoo.ycsb binding-parent 0.15.0 ../binding-parent mongodb-binding MongoDB Binding jar org.mongodb mongo-java-driver ${mongodb.version} + + org.mongodb + mongodb-driver-async + ${mongodb.version} + com.allanbank mongodb-async-driver ${mongodb.async.version} com.yahoo.ycsb core ${project.version} provided ch.qos.logback logback-classic 1.1.2 runtime junit junit 4.12 test org.xerial.snappy snappy-java 1.1.7.1 jar compile true always warn false never fail allanbank Allanbank Releases http://www.allanbank.com/repo/ default diff --git a/mongodb/src/main/java/com/yahoo/ycsb/db/NativeAsyncMongoDbClient.java b/mongodb/src/main/java/com/yahoo/ycsb/db/NativeAsyncMongoDbClient.java new file mode 100644 index 00000000..5226fc91 --- /dev/null +++ b/mongodb/src/main/java/com/yahoo/ycsb/db/NativeAsyncMongoDbClient.java @@ -0,0 +1,474 @@ +/* + * Copyright (c) 2014, Yahoo!, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +package com.yahoo.ycsb.db; + +import com.mongodb.ConnectionString; +import com.mongodb.MongoBulkWriteException; +import com.mongodb.MongoClientSettings; +import com.mongodb.ReadPreference; +import com.mongodb.WriteConcern; +import com.mongodb.async.client.*; +import com.mongodb.async.client.MongoClient; +import com.mongodb.bulk.BulkWriteResult; +import com.mongodb.client.model.*; +import com.mongodb.client.result.DeleteResult; +import com.yahoo.ycsb.AsyncDB; +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.DBException; +import com.yahoo.ycsb.Status; +import com.yahoo.ycsb.StringByteIterator; +import org.bson.Document; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.Vector; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +import static com.mongodb.client.model.Filters.eq; +import static com.mongodb.client.model.Filters.gte; +import static com.mongodb.client.model.Projections.fields; +import static com.mongodb.client.model.Projections.include; + +/** + * MongoDB asynchronous client for YCSB framework using the Asynchronous Java + * Driver + *

+ * See the README.md for configuration information. + *

+ * + * @author rjm + * @see Asynchronous + * Java Driver + */ +public class NativeAsyncMongoDbClient extends AsyncDB { + + /** + * Used to include a field in a response. + */ + protected static final int INCLUDE = 1; + + /** + * The database to use. + */ + private static String databaseName; + + /** + * The write concern for the requests. + */ + private static final AtomicInteger INIT_COUNT = new AtomicInteger(0); + + /** + * The connection to MongoDB. + */ + private static MongoClient mongoClient; + + /** + * The write concern for the requests. + */ + private static WriteConcern writeConcern; + + /** + * Which servers to use for reads. + */ + private static ReadPreference readPreference; + + /** + * The database to MongoDB. + */ + private MongoDatabase database; + + /** + * The batch size to use for inserts. + */ + private static int batchSize; + + /** + * If true then use updates with the upsert option for inserts. + */ + private static boolean useUpsert; + + /** + * The bulk inserts pending for the thread. + */ + private final ArrayList batchedWrites = new ArrayList<>(); + private final ArrayList> batchedUpserts = new ArrayList<>(); + + /** + * The number of writes in the batchedWrite. + */ + private int batchedWriteCount = 0; + + private final InsertManyOptions insertManyOptions = new InsertManyOptions().ordered(false); + private final UpdateOptions upsertOptions = new UpdateOptions().upsert(true); + private final BulkWriteOptions bulkWriteOptions = new BulkWriteOptions().ordered(false); + + /** + * Cleanup any state for this DB. Called once per DB instance; there is one DB + * instance per client thread. + */ + @Override + public final void cleanup() throws DBException { + if (INIT_COUNT.decrementAndGet() == 0) { + try { + mongoClient.close(); + } catch (final Exception e1) { + System.err.println("Could not close MongoDB connection pool: " + + e1.toString()); + e1.printStackTrace(); + return; + } finally { + mongoClient = null; + database = null; + } + } + } + + /** + * Delete a record from the database. + * + * @param table The name of the table + * @param key The record key of the record to delete. + * @return Zero on success, a non-zero error code on error. See this class's + * description for a discussion of error codes. + */ + @Override + public final CompletableFuture delete(final String table, final String key) { + CompletableFuture deleteResult = new CompletableFuture<>(); + final MongoCollection collection = database.getCollection(table).withWriteConcern(writeConcern); + collection.deleteOne(eq(key), (r, ex) -> { + DeleteResult result = (DeleteResult) r; + if (ex != null) { + System.err.println(ex.toString()); + deleteResult.complete(Status.ERROR); + } else if (!result.wasAcknowledged() && writeConcern.isAcknowledged()) { + System.err.println("Delete was not acknowledged for key " + key); + deleteResult.complete(Status.UNEXPECTED_STATE); + } else if (result.getDeletedCount() == 0) { + System.err.println("Nothing deleted for key " + key); + deleteResult.complete(Status.NOT_FOUND); + } else { + deleteResult.complete(Status.OK); + } + }); + return deleteResult; + } + + /** + * Initialize any state for this DB. Called once per DB instance; there is one + * DB instance per client thread. + */ + @Override + public final void init() throws DBException { + final int count = INIT_COUNT.incrementAndGet(); + + synchronized (NativeAsyncMongoDbClient.class) { + final Properties props = getProperties(); + + if (mongoClient != null) { + database = mongoClient.getDatabase(databaseName); + +// // If there are more threads (count) than connections then the +// // Low latency spin lock is not really needed as we will keep +// // the connections occupied. +// if (count > mongoClient.getConfig().getMaxConnectionCount()) { +// mongoClient.getConfig().setLockType(LockType.MUTEX); +// } + + return; + } + + // Set insert batchsize, default 1 - to be YCSB-original equivalent + batchSize = Integer.parseInt(props.getProperty("mongodb.batchsize", "1")); + if (batchSize > 1) { + batchedWrites.ensureCapacity(batchSize); + } + + // Set is inserts are done as upserts. Defaults to false. + useUpsert = Boolean.parseBoolean( + props.getProperty("mongodb.upsert", "false")); + + // Just use the standard connection format URL + // http://docs.mongodb.org/manual/reference/connection-string/ + // to configure the client. + String url = + props + .getProperty("mongodb.url", "mongodb://localhost:27017/ycsb?w=1"); + if (!url.startsWith("mongodb://")) { + System.err.println("ERROR: Invalid URL: '" + url + + "'. Must be of the form " + + "'mongodb://:,:/database?" + + "options'. See " + + "http://docs.mongodb.org/manual/reference/connection-string/."); + System.exit(1); + } + + ConnectionString uri = new ConnectionString(url); + + try { + databaseName = uri.getDatabase(); + if ((databaseName == null) || databaseName.isEmpty()) { + // Default database is "ycsb" if database is not + // specified in URL + databaseName = "ycsb"; + } + + MongoClientSettings config = MongoClientSettings.builder().applyConnectionString(uri).build(); + mongoClient = MongoClients.create(config); + +// MongoClientConfiguration config = mongoClient.getConfig(); +// if (!url.toLowerCase().contains("locktype=")) { +// config.setLockType(LockType.LOW_LATENCY_SPIN); // assumed... +// } + + readPreference = config.getReadPreference(); + writeConcern = config.getWriteConcern(); + + database = mongoClient.getDatabase(databaseName); + + System.out.println("mongo connection created with " + url); + } catch (final Exception e1) { + System.err + .println("Could not initialize MongoDB connection pool for Loader: " + + e1.toString()); + e1.printStackTrace(); + return; + } + } + } + + /** + * Insert a record in the database. Any field/value pairs in the specified + * values HashMap will be written into the record with the specified record + * key. + * + * @param table The name of the table + * @param key The record key of the record to insert. + * @param values A HashMap of field/value pairs to insert in the record + * @return Zero on success, a non-zero error code on error. See the {@link AsyncDB} + * class's description for a discussion of error codes. + */ + @Override + public final CompletableFuture insert(final String table, final String key, + final Map values) { + CompletableFuture insertResult = new CompletableFuture<>(); + Consumer bulkWriteExceptionHandler = ex -> { + if (ex != null) { + if (ex instanceof MongoBulkWriteException) { + BulkWriteResult wResult = ((MongoBulkWriteException) ex).getWriteResult(); + if (!wResult.wasAcknowledged() && writeConcern.isAcknowledged()) { + System.err.println("Bulk insert was not acknowledged."); + insertResult.complete(Status.UNEXPECTED_STATE); + } else { + System.err.println("Bulk insert of " + batchSize + + " records failed for " + (batchSize - wResult.getInsertedCount()) + " records."); + insertResult.complete(Status.ERROR); + } + } else { + System.err.println("Exception while trying to bulk-insert " + batchSize + " records."); + insertResult.complete(Status.ERROR); + } + } else { + insertResult.complete(Status.OK); + } + }; + final MongoCollection collection = database.getCollection(table).withWriteConcern(writeConcern); + final Document toInsert = new Document("_id", key); + for (final Map.Entry entry : values.entrySet()) { + toInsert.append(entry.getKey(), entry.getValue().toArray()); + } + + if (batchSize <= 1) { + if (useUpsert) { + collection.updateOne(eq(key), new Document("$setOnInsert", toInsert), upsertOptions, (r, ex) -> { + if (ex != null) { + System.err.println("Exception while trying to insert one record: " + ex); + insertResult.complete(Status.ERROR); + } else if (!r.wasAcknowledged() && writeConcern.isAcknowledged()) { + System.err.println("Upsert was not acknowledged for key " + key); + insertResult.complete(Status.UNEXPECTED_STATE); + } else if (r.getUpsertedId() == null || !r.getUpsertedId().asString().toString().equals(key)) { + insertResult.complete(Status.NOT_FOUND); + } else { + insertResult.complete(Status.OK); + } + }); + } else { + collection.insertOne(toInsert, (r, ex) -> { + if (ex != null) { + System.err.println("Exception while trying to insert one record: " + ex); + insertResult.complete(Status.ERROR); + } else { + insertResult.complete(Status.OK); + } + }); + } + } else { + if (useUpsert) { + batchedUpserts.add(new UpdateOneModel<>(eq(key), new Document("$setOnInsert", toInsert), upsertOptions)); + } else { + batchedWrites.add(toInsert); + } + batchedWriteCount += 1; + + if (batchedWriteCount < batchSize) { + insertResult.complete(Status.BATCHED_OK); + } else { + if (useUpsert) { + collection.bulkWrite(batchedUpserts, bulkWriteOptions, (r, ex) -> bulkWriteExceptionHandler.accept(ex)); + batchedUpserts.clear(); + } else { + collection.insertMany(batchedWrites, insertManyOptions, (r, ex) -> bulkWriteExceptionHandler.accept(ex)); + batchedWrites.clear(); + } + batchedWriteCount = 0; + } + } + + return insertResult; + } + + /** + * Read a record from the database. Each field/value pair from the result will + * be stored in a HashMap. + * + * @param table The name of the table + * @param key The record key of the record to read. + * @param fields The list of fields to read, or null for all of them + * @param result A HashMap of field/value pairs for the result + * @return Zero on success, a non-zero error code on error or "not found". + */ + @Override + public final CompletableFuture read(final String table, final String key, + final Set fields, final Map result) { + CompletableFuture readResult = new CompletableFuture<>(); + final MongoCollection collection = database.getCollection(table).withReadPreference(readPreference); + FindIterable findQuery = collection.find(eq(key)).limit(1).batchSize(1); + if (fields != null) { + findQuery = findQuery.projection(fields(include(new ArrayList<>(fields)))); + } + findQuery.first((r, ex) -> { + if (ex != null) { + System.err.println("Exception while trying to read key " + key + ": " + ex); + readResult.complete(Status.ERROR); + } else { + if (r == null) { + readResult.complete(Status.NOT_FOUND); + } else { + r.forEach((k, v) -> result.put(k, new StringByteIterator((String) v))); + readResult.complete(Status.OK); + } + } + }); + + return readResult; + } + + /** + * Perform a range scan for a set of records in the database. Each field/value + * pair from the result will be stored in a HashMap. + * + * @param table The name of the table + * @param startkey The record key of the first record to read. + * @param recordcount The number of records to read + * @param fields The list of fields to read, or null for all of them + * @param result A Vector of HashMaps, where each HashMap is a set field/value + * pairs for one record + * @return Zero on success, a non-zero error code on error. See the {@link AsyncDB} + * class's description for a discussion of error codes. + */ + @Override + public final CompletableFuture scan(final String table, final String startkey, + final int recordcount, final Set fields, + final Vector> result) { + CompletableFuture scanResult = new CompletableFuture<>(); + final MongoCollection collection = database.getCollection(table).withReadPreference(readPreference); + FindIterable scanQuery = collection.find(gte("_id", startkey)) + .limit(recordcount).batchSize(recordcount).sort(Sorts.ascending("_id")); + if (fields != null) { + scanQuery = scanQuery.projection(fields(include(new ArrayList<>(fields)))); + } + + result.ensureCapacity(recordcount); + + scanQuery.forEach((r -> { + HashMap map = new HashMap<>(); + r.forEach((k, v) -> map.put(k, new StringByteIterator((String) v))); + result.add(map); + }), (r, ex) -> { + if (ex != null) { + System.err.println("Exception while trying to scan from key " + startkey + + " " + recordcount + " records: " + ex); + scanResult.complete(Status.ERROR); + } else if (result.size() < recordcount) { + System.err.println("Scan returned fewer records than expected: " + result.size() + + " instead of " + recordcount + "."); + scanResult.complete(Status.NOT_FOUND); + } else { + scanResult.complete(Status.OK); + } + }); + + return scanResult; + + } + + /** + * Update a record in the database. Any field/value pairs in the specified + * values HashMap will be written into the record with the specified record + * key, overwriting any existing values with the same field name. + * + * @param table The name of the table + * @param key The record key of the record to write. + * @param values A HashMap of field/value pairs to update in the record + * @return Zero on success, a non-zero error code on error. See the {@link DB} + * class's description for a discussion of error codes. + */ + @Override + public final CompletableFuture update(final String table, final String key, + final Map values) { + + CompletableFuture updateResult = new CompletableFuture<>(); + final MongoCollection collection = database.getCollection(table).withWriteConcern(writeConcern); + Document toUpdate = new Document(); + for (final Map.Entry entry : values.entrySet()) { + toUpdate.append(entry.getKey(), entry.getValue()); + } + + collection.updateOne(eq(key), new Document("$set", toUpdate), (r, ex) -> { + if (ex != null) { + System.err.println("Exception while trying to update one record: " + ex); + updateResult.complete(Status.ERROR); + } else if (!r.wasAcknowledged() && writeConcern.isAcknowledged()) { + System.err.println("Update was not acknowledged for key " + key); + updateResult.complete(Status.UNEXPECTED_STATE); + } else if (r.getMatchedCount() < 1) { + updateResult.complete(Status.NOT_FOUND); + } else { + updateResult.complete(Status.OK); + } + }); + + return updateResult; + } + +} diff --git a/pom.xml b/pom.xml index ec3483d3..da1d3d7c 100644 --- a/pom.xml +++ b/pom.xml @@ -1,225 +1,225 @@ 4.0.0 com.yahoo.ycsb root 0.15.0 pom YCSB Root This is the top level project that builds, packages the core and all the DB bindings for YCSB infrastructure. scm:git:git://github.com/brianfrankcooper/YCSB.git master https://github.com/brianfrankcooper/YCSB com.puppycrawl.tools checkstyle 7.7.1 org.jdom jdom 1.1 com.google.collections google-collections 1.0 org.slf4j slf4j-api 1.7.25 2.5.5 2.10 1.6.6 1.7.4 1.9.1 3.1.2 4.4.1 1.7.1 1.8.1 4.0.0 3.0.0 0.24.0-beta 1.4.10 2.3.1 5.5.1 5.2.5 1.2.0 1.3.0 0.98.14-hadoop2 1.0.2 1.2.5 1.4.2 2.0.0 0.9.5.6 2.6.0 7.2.2.Final 1.6.0 1.1.8-mapr-1710 - 3.6.3 + 3.8.2 2.0.1 2.1.1 2.2.10 UTF-8 2.9.0 2.0.5 5.11.3 1.10.20 5.5.3 6.4.1 1.6.5 0.8.0 0.81 core binding-parent distribution accumulo1.6 accumulo1.7 accumulo1.8 aerospike arangodb asynchbase azuredocumentdb azuretablestorage cassandra cloudspanner couchbase couchbase2 dynamodb elasticsearch elasticsearch5 foundationdb geode googlebigtable googledatastore hbase098 hbase10 hbase12 hbase14 hbase20 hypertable ignite infinispan jdbc kudu maprdb maprjsondb memcached mongodb mysql nosqldb orientdb rados redis rest riak rocksdb s3 solr solr6 tarantool org.apache.maven.plugins maven-checkstyle-plugin 2.16 org.apache.maven.plugins maven-enforcer-plugin 3.0.0-M1 enforce-maven enforce 3.1.0 org.apache.maven.plugins maven-compiler-plugin 3.7.0 1.8 1.8 org.apache.maven.plugins maven-checkstyle-plugin validate validate check checkstyle.xml