diff --git a/bin/ycsb b/bin/ycsb index 54dcfbbd..b839e91a 100755 --- a/bin/ycsb +++ b/bin/ycsb @@ -1,232 +1,233 @@ #!/usr/bin/env python # # Copyright (c) 2012 - 2015 YCSB contributors. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you # may not use this file except in compliance with the License. You # may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. See the License for the specific language governing # permissions and limitations under the License. See accompanying # LICENSE file. # import argparse import fnmatch import io import os import shlex import sys import subprocess BASE_URL = "https://github.com/brianfrankcooper/YCSB/tree/master/" COMMANDS = { "shell" : { "command" : "", "description" : "Interactive mode", "main" : "com.yahoo.ycsb.CommandLine", }, "load" : { "command" : "-load", "description" : "Execute the load phase", "main" : "com.yahoo.ycsb.Client", }, "run" : { "command" : "-t", "description" : "Execute the transaction phase", "main" : "com.yahoo.ycsb.Client", }, } DATABASES = { "accumulo" : "com.yahoo.ycsb.db.AccumuloClient", "aerospike" : "com.yahoo.ycsb.db.AerospikeClient", "basic" : "com.yahoo.ycsb.BasicDB", "cassandra-7" : "com.yahoo.ycsb.db.CassandraClient7", "cassandra-8" : "com.yahoo.ycsb.db.CassandraClient8", "cassandra-10" : "com.yahoo.ycsb.db.CassandraClient10", "cassandra-cql": "com.yahoo.ycsb.db.CassandraCQLClient", "cassandra2-cql": "com.yahoo.ycsb.db.CassandraCQLClient", "couchbase" : "com.yahoo.ycsb.db.CouchbaseClient", "dynamodb" : "com.yahoo.ycsb.db.DynamoDBClient", "elasticsearch": "com.yahoo.ycsb.db.ElasticSearchClient", "gemfire" : "com.yahoo.ycsb.db.GemFireClient", "googledatastore" : "com.yahoo.ycsb.db.GoogleDatastoreClient", "hbase094" : 
"com.yahoo.ycsb.db.HBaseClient", "hbase098" : "com.yahoo.ycsb.db.HBaseClient", "hbase10" : "com.yahoo.ycsb.db.HBaseClient10", "hypertable" : "com.yahoo.ycsb.db.HypertableClient", "infinispan-cs": "com.yahoo.ycsb.db.InfinispanRemoteClient", "infinispan" : "com.yahoo.ycsb.db.InfinispanClient", "jdbc" : "com.yahoo.ycsb.db.JdbcDBClient", "kudu" : "com.yahoo.ycsb.db.KuduYCSBClient", "mapkeeper" : "com.yahoo.ycsb.db.MapKeeperClient", "mongodb" : "com.yahoo.ycsb.db.MongoDbClient", "mongodb-async": "com.yahoo.ycsb.db.AsyncMongoDbClient", "nosqldb" : "com.yahoo.ycsb.db.NoSqlDbClient", "orientdb" : "com.yahoo.ycsb.db.OrientDBClient", "redis" : "com.yahoo.ycsb.db.RedisClient", + "s3" : "com.yahoo.ycsb.db.S3Client", "tarantool" : "com.yahoo.ycsb.db.TarantoolClient", "voldemort" : "com.yahoo.ycsb.db.VoldemortClient" } OPTIONS = { "-P file" : "Specify workload file", "-p key=value" : "Override workload property", "-s" : "Print status to stderr", "-target n" : "Target ops/sec (default: unthrottled)", "-threads n" : "Number of client threads (default: 1)", "-cp path" : "Additional Java classpath entries", "-jvm-args args" : "Additional arguments to the JVM", } def usage(): output = io.BytesIO() print >> output, "%s command database [options]" % sys.argv[0] print >> output, "\nCommands:" for command in sorted(COMMANDS.keys()): print >> output, " %s %s" % (command.ljust(14), COMMANDS[command]["description"]) print >> output, "\nDatabases:" for db in sorted(DATABASES.keys()): print >> output, " %s %s" % (db.ljust(14), BASE_URL + db.split("-")[0]) print >> output, "\nOptions:" for option in sorted(OPTIONS.keys()): print >> output, " %s %s" % (option.ljust(14), OPTIONS[option]) print >> output, """\nWorkload Files: There are various predefined workloads under workloads/ directory. 
See https://github.com/brianfrankcooper/YCSB/wiki/Core-Properties for the list of workload properties.""" return output.getvalue() def debug(message): print >> sys.stderr, "[DEBUG] ", message def warn(message): print >> sys.stderr, "[WARN] ", message def error(message): print >> sys.stderr, "[ERROR] ", message def find_jars(dir, glob='*.jar'): jars = [] for (dirpath, dirnames, filenames) in os.walk(dir): for filename in fnmatch.filter(filenames, glob): jars.append(os.path.join(dirpath, filename)) return jars def get_ycsb_home(): dir = os.path.abspath(os.path.dirname(sys.argv[0])) while "LICENSE.txt" not in os.listdir(dir): dir = os.path.join(dir, os.path.pardir) return os.path.abspath(dir) def is_distribution(): # If there's a top level pom, we're a source checkout. otherwise a dist artifact return "pom.xml" not in os.listdir(get_ycsb_home()) # Run the maven dependency plugin to get the local jar paths. # presumes maven can run, so should only be run on source checkouts # will invoke the 'package' goal for the given binding in order to resolve intra-project deps # presumes maven properly handles system-specific path separators # Given module is full module name eg. 
'core' or 'couchbase-binding' def get_classpath_from_maven(module): try: debug("Running 'mvn -pl com.yahoo.ycsb:" + module + " -am package -DskipTests " "dependency:build-classpath -DincludeScope=compile -Dmdep.outputFilterFile=true'") mvn_output = subprocess.check_output(["mvn", "-pl", "com.yahoo.ycsb:" + module, "-am", "package", "-DskipTests", "dependency:build-classpath", "-DincludeScope=compile", "-Dmdep.outputFilterFile=true"]) # the above outputs a "classpath=/path/tojar:/path/to/other/jar" for each module # the last module will be the datastore binding line = [x for x in mvn_output.splitlines() if x.startswith("classpath=")][-1:] return line[0][len("classpath="):] except subprocess.CalledProcessError, err: error("Attempting to generate a classpath from Maven failed " "with return code '" + str(err.returncode) + "'. The output from " "Maven follows, try running " "'mvn -DskipTests package dependency:build=classpath' on your " "own and correct errors." + os.linesep + os.linesep + "mvn output:" + os.linesep + err.output) sys.exit(err.returncode) def main(): p = argparse.ArgumentParser( usage=usage(), formatter_class=argparse.RawDescriptionHelpFormatter) p.add_argument('-cp', dest='classpath', help="""Additional classpath entries, e.g. '-cp /tmp/hbase-1.0.1.1/conf'. Will be prepended to the YCSB classpath.""") p.add_argument("-jvm-args", default=[], type=shlex.split, help="""Additional arguments to pass to 'java', e.g. '-Xmx4g'""") p.add_argument("command", choices=sorted(COMMANDS), help="""Command to run.""") p.add_argument("database", choices=sorted(DATABASES), help="""Database to test.""") args, remaining = p.parse_known_args() ycsb_home = get_ycsb_home() # Use JAVA_HOME to find java binary if set, otherwise just use PATH. 
java = "java" java_home = os.getenv("JAVA_HOME") if java_home: java = os.path.join(java_home, "bin", "java") db_classname = DATABASES[args.database] command = COMMANDS[args.command]["command"] main_classname = COMMANDS[args.command]["main"] # Classpath set up binding = args.database.split("-")[0] if is_distribution(): db_dir = os.path.join(ycsb_home, binding + "-binding") # include top-level conf for when we're a binding-specific artifact. # If we add top-level conf to the general artifact, starting here # will allow binding-specific conf to override (because it's prepended) cp = [os.path.join(ycsb_home, "conf")] cp.extend(find_jars(os.path.join(ycsb_home, "lib"))) cp.extend(find_jars(os.path.join(db_dir, "lib"))) else: warn("Running against a source checkout. In order to get our runtime " "dependencies we'll have to invoke Maven. Depending on the state " "of your system, this may take ~30-45 seconds") db_location = "core" if binding == "basic" else binding project = "core" if binding == "basic" else binding + "-binding" db_dir = os.path.join(ycsb_home, db_location) # goes first so we can rely on side-effect of package maven_says = get_classpath_from_maven(project) # TODO when we have a version property, skip the glob cp = find_jars(os.path.join(db_dir, "target"), project + "*.jar") # alredy in jar:jar:jar form cp.append(maven_says) cp.insert(0, os.path.join(db_dir, "conf")) classpath = os.pathsep.join(cp) if args.classpath: classpath = os.pathsep.join([args.classpath, classpath]) ycsb_command = ([java] + args.jvm_args + ["-cp", classpath, main_classname, "-db", db_classname] + remaining) if command: ycsb_command.append(command) print >> sys.stderr, " ".join(ycsb_command) return subprocess.call(ycsb_command) if __name__ == '__main__': sys.exit(main()) diff --git a/distribution/pom.xml b/distribution/pom.xml index 9c11636a..998daa56 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -1,172 +1,177 @@ 4.0.0 com.yahoo.ycsb root 0.6.0-SNAPSHOT ycsb YCSB 
Release Distribution Builder pom This module creates the release package of the YCSB with all DB library bindings. It is only used by the build process and does not contain any real code of itself. com.yahoo.ycsb core ${project.version} com.yahoo.ycsb accumulo-binding ${project.version} com.yahoo.ycsb aerospike-binding ${project.version} com.yahoo.ycsb cassandra-binding ${project.version} com.yahoo.ycsb couchbase-binding ${project.version} com.yahoo.ycsb dynamodb-binding ${project.version} com.yahoo.ycsb elasticsearch-binding ${project.version} com.yahoo.ycsb gemfire-binding ${project.version} com.yahoo.ycsb googledatastore-binding ${project.version} com.yahoo.ycsb hbase094-binding ${project.version} com.yahoo.ycsb hbase098-binding ${project.version} com.yahoo.ycsb hbase10-binding ${project.version} com.yahoo.ycsb hypertable-binding ${project.version} com.yahoo.ycsb infinispan-binding ${project.version} com.yahoo.ycsb jdbc-binding ${project.version} com.yahoo.ycsb kudu-binding ${project.version} com.yahoo.ycsb mongodb-binding ${project.version} com.yahoo.ycsb orientdb-binding ${project.version} com.yahoo.ycsb redis-binding ${project.version} + + com.yahoo.ycsb + s3-binding + ${project.version} + com.yahoo.ycsb tarantool-binding ${project.version} org.apache.maven.plugins maven-assembly-plugin ${maven.assembly.version} src/main/assembly/distribution.xml false posix package single diff --git a/pom.xml b/pom.xml index a948a5de..b42d651d 100644 --- a/pom.xml +++ b/pom.xml @@ -1,159 +1,161 @@ 4.0.0 com.yahoo.ycsb root 0.6.0-SNAPSHOT pom YCSB Root This is the top level project that builds, packages the core and all the DB bindings for YCSB infrastructure. 
scm:git:git://github.com/brianfrankcooper/YCSB.git master https://github.com/brianfrankcooper/YCSB checkstyle checkstyle 5.0 org.jdom jdom 1.1 com.google.collections google-collections 1.0 org.slf4j slf4j-api 1.6.4 2.5.5 2.10 0.94.27 0.98.14-hadoop2 1.0.2 1.6.0 1.2.9 1.0.3 2.1.8 8.1.0 7.2.2.Final 0.5.0 2.1.1 3.0.3 2.0.1 1.0.1 2.0.0 + 1.10.20 0.81 UTF-8 0.8.0 0.9.5.6 1.1.8 1.6.1 3.1.2 core binding-parent accumulo aerospike cassandra cassandra2 couchbase distribution dynamodb elasticsearch gemfire googledatastore hbase094 hbase098 hbase10 hypertable infinispan jdbc kudu mongodb orientdb redis + s3 tarantool org.apache.maven.plugins maven-compiler-plugin 3.3 - 1.6 - 1.6 + 1.7 + 1.7 org.apache.maven.plugins maven-checkstyle-plugin 2.15 true checkstyle.xml validate validate checkstyle diff --git a/s3/README.md b/s3/README.md new file mode 100644 index 00000000..347449de --- /dev/null +++ b/s3/README.md @@ -0,0 +1,79 @@ + +Quick Start +=============== +### 1. Set Up YCSB + +Download YCSB from this website: + + https://github.com/brianfrankcooper/YCSB/releases/ + +You can choose to download either the full stable version or just one of the available bindings. + +### 2. Configuration of the AWS credentials + +The access key ID and secret access key, the endPoint and region, and client settings such as maxErrorRetry can be set in a properties file under s3-binding/conf/s3.properties or passed on the command line (see below). +It is strongly recommended to keep the credentials in the properties file rather than passing them on the command line. + + +### 3. Run YCSB + +To execute the benchmark using the S3 storage binding, the files must first be uploaded using the "load" option with this command: + + ./bin/ycsb load s3 -p table=theBucket -p s3.endPoint=s3.amazonaws.com -p s3.accessKeyId=yourAccessKeyId -p s3.secretKey=yourSecretKey -p fieldlength=10 -p fieldcount=20 -P workloads/workloada + +This command executes the loading phase of workload A. 
The file size is determined by the number of fields (fieldcount) and by the field size (fieldlength). In this case each file is 200 bytes (10 bytes for each field multiplied by 20 fields). + +Running the command: + + ./bin/ycsb run s3 -p table=theBucket -p s3.endPoint=s3.amazonaws.com -p s3.accessKeyId=yourAccessKeyId -p s3.secretKey=yourSecretKey -p fieldlength=10 -p fieldcount=20 -P workloads/workloada + +executes the transaction phase of workload A with a file size of 200 bytes. + +#### S3 Storage Configuration Parameters + +The parameters to configure the S3 client can be set using the file "s3-binding/conf/s3.properties". This is highly advisable for the parameters s3.accessKeyId and s3.secretKey. All the other parameters can also be set on the command line. Here is the list of all the parameters that can be configured: + +- `table` + - This should be an S3 Storage bucket name; it replaces the standard table name assigned by YCSB. + +- `s3.endPoint` + - This indicates the endpoint used to connect to the S3 Storage service. + - Default value is `s3.amazonaws.com`. + +- `s3.region` + - This indicates the region where your buckets are. + - Default value is `us-east-1`. + +- `s3.accessKeyId` + - This is the access key ID of your S3 account. + +- `s3.secretKey` + - This is the secret access key associated with your S3 account. + +- `s3.maxErrorRetry` + - This is the maxErrorRetry parameter for the S3Client. + +- `s3.protocol` + - This is the protocol parameter for the S3Client. The default value is HTTPS. + +- `s3.sse` + - When set to true, this parameter activates Server Side Encryption (SSE). + +- `s3.ssec` + - When set, this parameter activates server-side encryption with a customer-provided key (SSE-C). The value passed with this parameter is the customer key used to encrypt the files. 
+ diff --git a/s3/pom.xml b/s3/pom.xml new file mode 100644 index 00000000..3ffcc58a --- /dev/null +++ b/s3/pom.xml @@ -0,0 +1,68 @@ + + + + + 4.0.0 + + com.yahoo.ycsb + binding-parent + 0.5.0-SNAPSHOT + ../binding-parent + + + s3-binding + S3 Storage Binding + jar + + + + com.amazonaws + aws-java-sdk-s3 + ${s3.version} + + + + com.yahoo.ycsb + core + ${project.version} + provided + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + 2.15 + + true + ../checkstyle.xml + true + true + + + + validate + validate + + checkstyle + + + + + + + diff --git a/s3/src/main/conf/s3.properties b/s3/src/main/conf/s3.properties new file mode 100644 index 00000000..f36c7745 --- /dev/null +++ b/s3/src/main/conf/s3.properties @@ -0,0 +1,31 @@ +# +# Sample S3 configuration properties +# +# You may either set properties here or via the command line. +# + +# the AWS S3 access key ID +s3.accessKeyId=yourKey + +# the AWS S3 secret access key ID +s3.secretKey=YourSecret + +# the AWS endpoint +s3.endpoint=s3.amazonaws.com + +# activating the SSE server side encryption if true +s3.sse=false + +# activating the SSE-C client side encryption if used +#s3.ssec=U2CccCI40he2mZtg2aCEzofP7nQsfy4nP14VSYu6bFA= + +# set the protocol to use for the Client, default is HTTPS +#s3.protocol=HTTPS + +# set the maxConnections to use for the Client, it should be not less than the +# threads since only one client is created and shared between threads +#s3.maxConnections= + +# set the maxErrorRetry parameter to use for the Client +#s3.maxErrorRetry= + diff --git a/s3/src/main/java/com/yahoo/ycsb/db/S3Client.java b/s3/src/main/java/com/yahoo/ycsb/db/S3Client.java new file mode 100644 index 00000000..c017b013 --- /dev/null +++ b/s3/src/main/java/com/yahoo/ycsb/db/S3Client.java @@ -0,0 +1,537 @@ +/** + * Copyright (c) 2015 YCSB contributors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + * + * S3 storage client binding for YCSB. + */ +package com.yahoo.ycsb.db; + +import java.util.HashMap; +import java.util.Properties; +import java.util.Set; +import java.util.Vector; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.net.*; + +import com.yahoo.ycsb.ByteArrayByteIterator; +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.DB; +import com.yahoo.ycsb.DBException; + +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.*; +import com.amazonaws.auth.*; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PutObjectResult; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.amazonaws.Protocol; +import com.amazonaws.services.s3.model.DeleteObjectRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.model.SSECustomerKey; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.GetObjectMetadataRequest; + +/** + * S3 Storage client for YCSB framework. 
+ * + * Properties to set: + * + * s3.accessKeyId=access key S3 aws + * s3.secretKey=secret key S3 aws + * s3.endPoint=s3.amazonaws.com + * s3.region=us-east-1 + * The parameter table is the name of the Bucket where to upload the files. + * This must be created before to start the benchmark + * The size of the file to upload is determined by two parameters: + * - fieldcount this is the number of fields of a record in YCSB + * - fieldlength this is the size in bytes of a single field in the record + * together these two parameters define the size of the file to upload, + * the size in bytes is given by the fieldlength multiplied by the fieldcount. + * The name of the file is determined by the parameter key. + *This key is automatically generated by YCSB. + * + */ +public class S3Client extends DB { + + private static AmazonS3Client s3Client; + private static String sse; + private static SSECustomerKey ssecKey; + private static final AtomicInteger INIT_COUNT = new AtomicInteger(0); + + /** + * Cleanup any state for this storage. + * Called once per S3 instance; + */ + @Override + public void cleanup() throws DBException { + if (INIT_COUNT.decrementAndGet() == 0) { + try { + s3Client.shutdown(); + System.out.println("The client is shutdown successfully"); + } catch (Exception e){ + System.err.println("Could not shutdown the S3Client: "+e.toString()); + e.printStackTrace(); + } finally { + if (s3Client != null){ + s3Client = null; + } + } + } + } + /** + * Delete a file from S3 Storage. + * + * @param bucket + * The name of the bucket + * @param key + * The record key of the file to delete. + * @return Zero on success, a non-zero error code on error. See the + * {@link DB} class's description for a discussion of error codes. 
+ */ + @Override + public int delete(String bucket, String key) { + try { + s3Client.deleteObject(new DeleteObjectRequest(bucket, key)); + } catch (Exception e){ + System.err.println("Not possible to delete the key "+key); + e.printStackTrace(); + return 1; + } + return 0; + } + /** + * Initialize any state for the storage. + * Called once per S3 instance; If the client is not null it is re-used. + */ + @Override + public void init() throws DBException { + final int count = INIT_COUNT.incrementAndGet(); + synchronized (S3Client.class){ + Properties propsCL = getProperties(); + int recordcount = Integer.parseInt( + propsCL.getProperty("recordcount")); + int operationcount = Integer.parseInt( + propsCL.getProperty("operationcount")); + int numberOfOperations = 0; + if (recordcount > 0){ + if (recordcount > operationcount){ + numberOfOperations = recordcount; + } else { + numberOfOperations = operationcount; + } + } else { + numberOfOperations = operationcount; + } + if (count <= numberOfOperations) { + String accessKeyId = null; + String secretKey = null; + String endPoint = null; + String region = null; + String maxErrorRetry = null; + String maxConnections = null; + String protocol = null; + BasicAWSCredentials s3Credentials; + ClientConfiguration clientConfig; + if (s3Client != null) { + System.out.println("Reusing the same client"); + return; + } + try { + InputStream propFile = S3Client.class.getClassLoader() + .getResourceAsStream("s3.properties"); + Properties props = new Properties(System.getProperties()); + props.load(propFile); + accessKeyId = props.getProperty("s3.accessKeyId"); + if (accessKeyId == null){ + accessKeyId = propsCL.getProperty("s3.accessKeyId"); + } + System.out.println(accessKeyId); + secretKey = props.getProperty("s3.secretKey"); + if (secretKey == null){ + secretKey = propsCL.getProperty("s3.secretKey"); + } + System.out.println(secretKey); + endPoint = props.getProperty("s3.endPoint"); + if (endPoint == null){ + endPoint = 
propsCL.getProperty("s3.endPoint", "s3.amazonaws.com"); + } + System.out.println(endPoint); + region = props.getProperty("s3.region"); + if (region == null){ + region = propsCL.getProperty("s3.region", "us-east-1"); + } + System.out.println(region); + maxErrorRetry = props.getProperty("s3.maxErrorRetry"); + if (maxErrorRetry == null){ + maxErrorRetry = propsCL.getProperty("s3.maxErrorRetry", "15"); + } + maxConnections = props.getProperty("s3.maxConnections"); + if (maxConnections == null){ + maxConnections = propsCL.getProperty("s3.maxConnections"); + } + protocol = props.getProperty("s3.protocol"); + if (protocol == null){ + protocol = propsCL.getProperty("s3.protocol", "HTTPS"); + } + sse = props.getProperty("s3.sse"); + if (sse == null){ + sse = propsCL.getProperty("s3.sse", "false"); + } + String ssec = props.getProperty("s3.ssec"); + if (ssec == null){ + ssec = propsCL.getProperty("s3.ssec", null); + } else { + ssecKey = new SSECustomerKey(ssec); + } + } catch (Exception e){ + System.err.println("The file properties doesn't exist "+e.toString()); + e.printStackTrace(); + } + try { + System.out.println("Inizializing the S3 connection"); + s3Credentials = new BasicAWSCredentials(accessKeyId, secretKey); + clientConfig = new ClientConfiguration(); + clientConfig.setMaxErrorRetry(Integer.parseInt(maxErrorRetry)); + if(protocol.equals("HTTP")) { + clientConfig.setProtocol(Protocol.HTTP); + } else { + clientConfig.setProtocol(Protocol.HTTPS); + } + if(maxConnections != null) { + clientConfig.setMaxConnections(Integer.parseInt(maxConnections)); + } + s3Client = new AmazonS3Client(s3Credentials, clientConfig); + s3Client.setRegion(Region.getRegion(Regions.fromName(region))); + s3Client.setEndpoint(endPoint); + System.out.println("Connection successfully initialized"); + } catch (Exception e){ + System.err.println("Could not connect to S3 storage: "+ e.toString()); + e.printStackTrace(); + throw new DBException(e); + } + } else { + System.err.println( + "The number of 
threads must be less or equal than the operations"); + throw new DBException(new Error( + "The number of threads must be less or equal than the operations")); + } + } + } + /** + * Create a new File in the Bucket. Any field/value pairs in the specified + * values HashMap will be written into the file with the specified record + * key. + * + * @param bucket + * The name of the bucket + * @param key + * The record key of the file to insert. + * @param values + * A HashMap of field/value pairs to insert in the file. + * Only the content of the first field is written to a byteArray + * multiplied by the number of field. In this way the size + * of the file to upload is determined by the fieldlength + * and fieldcount parameters. + * @return Zero on success, a non-zero error code on error. See the + * {@link DB} class's description for a discussion of error codes. + */ + @Override + public int insert(String bucket, String key, + HashMap values) { + return writeToStorage(bucket, key, values, true, sse, ssecKey); + } + /** + * Read a file from the Bucket. Each field/value pair from the result + * will be stored in a HashMap. + * + * @param bucket + * The name of the bucket + * @param key + * The record key of the file to read. + * @param fields + * The list of fields to read, or null for all of them, + * it is null by default + * @param result + * A HashMap of field/value pairs for the result + * @return Zero on success, a non-zero error code on error or "not found". + */ + @Override + public int read(String bucket, String key, Set fields, + HashMap result) { + return readFromStorage(bucket, key, result, ssecKey); + } + /** + * Update a file in the database. Any field/value pairs in the specified + * values HashMap will be written into the file with the specified file + * key, overwriting any existing values with the same field name. + * + * @param bucket + * The name of the bucket + * @param key + * The file key of the file to write. 
+ * @param values + * A HashMap of field/value pairs to update in the record + * @return Zero on success, a non-zero error code on error. See this class's + * description for a discussion of error codes. + */ + @Override + public int update(String bucket, String key, + HashMap values) { + return writeToStorage(bucket, key, values, false, sse, ssecKey); + } + /** + * Perform a range scan for a set of files in the bucket. Each + * field/value pair from the result will be stored in a HashMap. + * + * @param bucket + * The name of the bucket + * @param startkey + * The file key of the first file to read. + * @param recordcount + * The number of files to read + * @param fields + * The list of fields to read, or null for all of them + * @param result + * A Vector of HashMaps, where each HashMap is a set field/value + * pairs for one file + * @return Zero on success, a non-zero error code on error. See the + * {@link DB} class's description for a discussion of error codes. + */ + @Override + public int scan(String bucket, String startkey, int recordcount, + Set fields, Vector> result) { + return scanFromStorage(bucket, startkey, recordcount, result, ssecKey); + } + /** + * Upload a new object to S3 or update an object on S3. + * + * @param bucket + * The name of the bucket + * @param key + * The file key of the object to upload/update. + * @param values + * The data to be written on the object + * @param updateMarker + * A boolean value. If true a new object will be uploaded + * to S3. 
If false an existing object will be re-uploaded + * + */ + protected int writeToStorage(String bucket, String key, + HashMap values, Boolean updateMarker, + String sseLocal, SSECustomerKey ssecLocal) { + int totalSize = 0; + int fieldCount = values.size(); //number of fields to concatenate + // getting the first field in the values + Object keyToSearch = values.keySet().toArray()[0]; + // getting the content of just one field + byte[] sourceArray = values.get(keyToSearch).toArray(); + int sizeArray = sourceArray.length; //size of each array + if (updateMarker){ + totalSize = sizeArray*fieldCount; + } else { + try { + GetObjectRequest getObjectRequest = null; + GetObjectMetadataRequest getObjectMetadataRequest = null; + if (ssecLocal != null) { + getObjectRequest = new GetObjectRequest(bucket, + key).withSSECustomerKey(ssecLocal); + getObjectMetadataRequest = new GetObjectMetadataRequest(bucket, + key).withSSECustomerKey(ssecLocal); + } else { + getObjectRequest = new GetObjectRequest(bucket, key); + getObjectMetadataRequest = new GetObjectMetadataRequest(bucket, + key); + } + S3Object object = + s3Client.getObject(getObjectRequest); + ObjectMetadata objectMetadata = + s3Client.getObjectMetadata(getObjectMetadataRequest); + int sizeOfFile = (int)objectMetadata.getContentLength(); + fieldCount = sizeOfFile/sizeArray; + totalSize = sizeOfFile; + } catch (Exception e){ + System.err.println("Not possible to get the object :"+key); + e.printStackTrace(); + return 1; + } + } + byte[] destinationArray = new byte[totalSize]; + int offset = 0; + for (int i = 0; i < fieldCount; i++) { + System.arraycopy(sourceArray, 0, destinationArray, offset, sizeArray); + offset += sizeArray; + } + try (InputStream input = new ByteArrayInputStream(destinationArray)) { + ObjectMetadata metadata = new ObjectMetadata(); + metadata.setContentLength(totalSize); + PutObjectRequest putObjectRequest = null; + if (sseLocal.equals("true")) { + 
metadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); + putObjectRequest = new PutObjectRequest(bucket, key, + input, metadata); + } else if (ssecLocal != null) { + putObjectRequest = new PutObjectRequest(bucket, key, + input, metadata).withSSECustomerKey(ssecLocal); + } else { + putObjectRequest = new PutObjectRequest(bucket, key, + input, metadata); + } + + try { + PutObjectResult res = + s3Client.putObject(putObjectRequest); + if(res.getETag() == null) { + return 1; + } else { + if (sseLocal.equals("true")) { + System.out.println("Uploaded object encryption status is " + + res.getSSEAlgorithm()); + } else if (ssecLocal != null) { + System.out.println("Uploaded object encryption status is " + + res.getSSEAlgorithm()); + } + } + } catch (Exception e) { + System.err.println("Not possible to write object :"+key); + e.printStackTrace(); + return 1; + } finally { + return 0; + } + } catch (Exception e) { + System.err.println("Error in the creation of the stream :"+e.toString()); + e.printStackTrace(); + return 1; + } + } + + /** + * Download an object from S3. + * + * @param bucket + * The name of the bucket + * @param key + * The file key of the object to upload/update. 
+ * @param result + * The Hash map where data from the object are written + * + */ + protected int readFromStorage(String bucket, String key, + HashMap result, SSECustomerKey ssecLocal) { + try { + GetObjectRequest getObjectRequest = null; + GetObjectMetadataRequest getObjectMetadataRequest = null; + if (ssecLocal != null) { + getObjectRequest = new GetObjectRequest(bucket, + key).withSSECustomerKey(ssecLocal); + getObjectMetadataRequest = new GetObjectMetadataRequest(bucket, + key).withSSECustomerKey(ssecLocal); + } else { + getObjectRequest = new GetObjectRequest(bucket, key); + getObjectMetadataRequest = new GetObjectMetadataRequest(bucket, + key); + } + S3Object object = + s3Client.getObject(getObjectRequest); + ObjectMetadata objectMetadata = + s3Client.getObjectMetadata(getObjectMetadataRequest); + InputStream objectData = object.getObjectContent(); //consuming the stream + // writing the stream to bytes and to results + int sizeOfFile = (int)objectMetadata.getContentLength(); + byte[] inputStreamToByte = new byte[sizeOfFile]; + objectData.read(inputStreamToByte, 0, sizeOfFile); + result.put(key, new ByteArrayByteIterator(inputStreamToByte)); + objectData.close(); + } catch (Exception e){ + System.err.println("Not possible to get the object "+key); + e.printStackTrace(); + return 1; + } finally { + return 0; + } + } + + /** + * Perform an emulation of a database scan operation on a S3 bucket. + * + * @param bucket + * The name of the bucket + * @param startkey + * The file key of the first file to read. 
+ * @param recordcount + * The number of files to read + * @param fields + * The list of fields to read, or null for all of them + * @param result + * A Vector of HashMaps, where each HashMap is a set field/value + * pairs for one file + * + */ + protected int scanFromStorage(String bucket, String startkey, + int recordcount, Vector> result, + SSECustomerKey ssecLocal) { + + int counter = 0; + ObjectListing listing = s3Client.listObjects(bucket); + List summaries = listing.getObjectSummaries(); + List keyList = new ArrayList(); + int startkeyNumber = 0; + int numberOfIteration = 0; + // getting the list of files in the bucket + while (listing.isTruncated()) { + listing = s3Client.listNextBatchOfObjects(listing); + summaries.addAll(listing.getObjectSummaries()); + } + for (S3ObjectSummary summary : summaries) { + String summaryKey = summary.getKey(); + keyList.add(summaryKey); + } + // Sorting the list of files in Alphabetical order + Collections.sort(keyList); // sorting the list + // Getting the position of the startingfile for the scan + for (String key : keyList) { + if (key.equals(startkey)){ + startkeyNumber = counter; + } else { + counter = counter + 1; + } + } + // Checking if the total number of file is bigger than the file to read, + // if not using the total number of Files + if (recordcount < keyList.size()) { + numberOfIteration = recordcount; + } else { + numberOfIteration = keyList.size(); + } + // Reading the Files starting from the startkey File till the end + // of the Files or Till the recordcount number + for (int i = startkeyNumber; i < numberOfIteration; i++){ + HashMap resultTemp = + new HashMap(); + readFromStorage(bucket, keyList.get(i), resultTemp, + ssecLocal); + result.add(resultTemp); + } + return 0; + } +} diff --git a/s3/src/main/java/com/yahoo/ycsb/db/package-info.java b/s3/src/main/java/com/yahoo/ycsb/db/package-info.java new file mode 100644 index 00000000..42859b2f --- /dev/null +++ 
b/s3/src/main/java/com/yahoo/ycsb/db/package-info.java @@ -0,0 +1,21 @@ +/** + * Copyright (c) 2015 YCSB contributors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + * + * S3 storage client binding for YCSB. + */ + +package com.yahoo.ycsb.db; +