/**
* Copyright (c) 2015 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*
* S3 storage client binding for YCSB.
*/
package com.yahoo.ycsb.db;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger;

import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
/**
* S3 storage client for the YCSB framework.
*
* Properties to set:
*
* s3.accessKeyId=your AWS access key
* s3.secretKey=your AWS secret key
* s3.endPoint=s3.amazonaws.com
* s3.region=us-east-1
*
* The "table" parameter is the name of the bucket to which the files are
* uploaded; the bucket must be created before the benchmark starts.
* The size of each uploaded file is determined by two parameters:
* - fieldcount: the number of fields of a record in YCSB
* - fieldlength: the size in bytes of a single field in the record
* Together these two parameters define the size of the file to upload:
* the size in bytes is the fieldlength multiplied by the fieldcount.
* The name of each file is given by the parameter key, which is generated
* automatically by YCSB.
*
*/
public class S3Client extends DB {
private static AmazonS3Client s3Client;
private static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
/**
* Clean up any state for this storage client.
* Called once per S3Client instance; the shared client is shut down
* when the last instance finishes.
*/
@Override
public void cleanup() throws DBException {
if (INIT_COUNT.decrementAndGet() == 0) {
try {
s3Client.shutdown();
System.out.println("The client is shutdown successfully");
} catch (Exception e){
System.err.println("Could not shutdown the S3Client: "+e.toString());
e.printStackTrace();
} finally {
if (s3Client != null){
s3Client = null;
}
}
}
}
/**
* Delete a file from S3 Storage.
*
* @param bucket
* The name of the bucket
* @param key
* The record key of the file to delete.
* @return Zero on success, a non-zero error code on error. See the
* {@link DB} class's description for a discussion of error codes.
*/
@Override
public int delete(String bucket, String key) {
try {
s3Client.deleteObject(new DeleteObjectRequest(bucket, key));
} catch (Exception e){
System.err.println("Not possible to delete the key "+key);
e.printStackTrace();
return 1;
}
return 0;
}
/**
* Initialize any state for the storage client.
* Called once per S3Client instance; if the shared client has already
* been created it is re-used.
*/
@Override
public void init() throws DBException {
final int count = INIT_COUNT.incrementAndGet();
synchronized (S3Client.class){
Properties propsCL = getProperties();
int recordcount = Integer.parseInt(
propsCL.getProperty("recordcount", "0"));
int operationcount = Integer.parseInt(
propsCL.getProperty("operationcount", "0"));
int numberOfOperations = 0;
if (recordcount > 0){
if (recordcount > operationcount){
numberOfOperations = recordcount;
} else {
numberOfOperations = operationcount;
}
} else {
numberOfOperations = operationcount;
}
if (count <= numberOfOperations) {
String accessKeyId = null;
String secretKey = null;
String endPoint = null;
String region = null;
String maxErrorRetry = null;
BasicAWSCredentials s3Credentials;
ClientConfiguration clientConfig;
if (s3Client != null) {
System.out.println("Reusing the same client");
return;
}
try {
InputStream propFile = S3Client.class.getClassLoader()
.getResourceAsStream("s3.properties");
Properties props = new Properties(System.getProperties());
// the s3.properties resource is optional; command-line properties
// are used as a fallback for every key
if (propFile != null) {
props.load(propFile);
}
accessKeyId = props.getProperty("s3.accessKeyId");
if (accessKeyId == null){
accessKeyId = propsCL.getProperty("s3.accessKeyId");
}
secretKey = props.getProperty("s3.secretKey");
if (secretKey == null){
secretKey = propsCL.getProperty("s3.secretKey");
}
endPoint = props.getProperty("s3.endPoint");
if (endPoint == null){
endPoint = propsCL.getProperty("s3.endPoint", "s3.amazonaws.com");
}
System.out.println(endPoint);
region = props.getProperty("s3.region");
if (region == null){
region = propsCL.getProperty("s3.region", "us-east-1");
}
System.out.println(region);
maxErrorRetry = props.getProperty("s3.maxErrorRetry");
if (maxErrorRetry == null){
maxErrorRetry = propsCL.getProperty("s3.maxErrorRetry", "15");
}
System.out.println(maxErrorRetry);
} catch (Exception e){
System.err.println("The properties file could not be loaded: "+e.toString());
e.printStackTrace();
}
try {
System.out.println("Inizializing the S3 connection");
s3Credentials = new BasicAWSCredentials(accessKeyId, secretKey);
clientConfig = new ClientConfiguration();
clientConfig.setMaxErrorRetry(Integer.parseInt(maxErrorRetry));
s3Client = new AmazonS3Client(s3Credentials, clientConfig);
s3Client.setRegion(Region.getRegion(Regions.fromName(region)));
s3Client.setEndpoint(endPoint);
System.out.println("Connection successfully initialized");
} catch (Exception e){
System.err.println("Could not connect to S3 storage: "+ e.toString());
e.printStackTrace();
throw new DBException(e);
}
} else {
System.err.println(
"The number of threads must be less than or equal to the number of operations.");
throw new DBException(
"The number of threads must be less than or equal to the number of operations.");
}
}
}
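// A minimal s3.properties sketch for the lookup in init() (the key names
// are the ones read above; the credential values are AWS's documented
// example placeholders, not real keys):
//
//   s3.accessKeyId=AKIAIOSFODNN7EXAMPLE
//   s3.secretKey=wJalrXUtnFXEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
//   s3.endPoint=s3.amazonaws.com
//   s3.region=us-east-1
//   s3.maxErrorRetry=15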
/**
* Create a new File in the Bucket. Any field/value pairs in the specified
* values HashMap will be written into the file with the specified record
* key.
*
* @param bucket
* The name of the bucket
* @param key
* The record key of the file to insert.
* @param values
* A HashMap of field/value pairs to insert in the file.
* Only the content of the first field is written to a byteArray
* multiplied by the number of field. In this way the size
* of the file to upload is determined by the fieldlength
* and fieldcount parameters.
* @return Zero on success, a non-zero error code on error. See the
* {@link DB} class's description for a discussion of error codes.
*/
@Override
public int insert(String bucket, String key,
HashMap<String, ByteIterator> values) {
return writeToStorage(bucket, key, values, true);
}
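// A worked example of the sizing rule above (a sketch, assuming the usual
// YCSB core workload parameters): with fieldcount=10 and fieldlength=100,
// each field holds 100 bytes, so writeToStorage() repeats the first
// field's bytes 10 times and uploads a 1000-byte object under the key.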
/**
* Read a file from the Bucket. Each field/value pair from the result
* will be stored in a HashMap.
*
* @param bucket
* The name of the bucket
* @param key
* The record key of the file to read.
* @param fields
* The list of fields to read, or null for all of them
* (this binding always reads the whole object)
* @param result
* A HashMap of field/value pairs for the result
* @return Zero on success, a non-zero error code on error or "not found".
*/
@Override
public int read(String bucket, String key, Set<String> fields,
HashMap<String, ByteIterator> result) {
return readFromStorage(bucket, key, result);
}
/**
* Update a file in the database. Any field/value pairs in the specified
* values HashMap will be written into the file with the specified file
* key, overwriting any existing values with the same field name.
*
* @param bucket
* The name of the bucket
* @param key
* The file key of the file to write.
* @param values
* A HashMap of field/value pairs to update in the record
* @return Zero on success, a non-zero error code on error. See the
* {@link DB} class's description for a discussion of error codes.
*/
@Override
public int update(String bucket, String key,
HashMap<String, ByteIterator> values) {
return writeToStorage(bucket, key, values, false);
}
/**
* Perform a range scan for a set of files in the bucket. Each
* field/value pair from the result will be stored in a HashMap.
*
* @param bucket
* The name of the bucket
* @param startkey
* The file key of the first file to read.
* @param recordcount
* The number of files to read
* @param fields
* The list of fields to read, or null for all of them
* @param result
* A Vector of HashMaps, where each HashMap is a set field/value
* pairs for one file
* @return Zero on success, a non-zero error code on error. See the
* {@link DB} class's description for a discussion of error codes.
*/
@Override
public int scan(String bucket, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
return scanFromStorage(bucket, startkey, recordcount, result);
}
/**
* Upload a new object to S3, or update an existing object.
*
* @param bucket
* The name of the bucket
* @param key
* The file key of the object to upload/update.
* @param values
* The data to be written to the object
* @param updateMarker
* If true, a new object is uploaded and its size is derived from
* the values map. If false, an existing object is re-uploaded,
* preserving its current size.
* @return Zero on success, a non-zero error code on error.
*/
protected int writeToStorage(String bucket, String key,
HashMap<String, ByteIterator> values, boolean updateMarker) {
int totalSize = 0;
int fieldCount = values.size(); //number of fields to concatenate
// getting the first field in the values
Object keyToSearch = values.keySet().toArray()[0];
// getting the content of just one field
byte[] sourceArray = values.get(keyToSearch).toArray();
int sizeArray = sourceArray.length; //size of each field
if (updateMarker){
totalSize = sizeArray*fieldCount;
} else {
try {
// only the metadata is needed here, to preserve the existing
// object's size on update
ObjectMetadata objectMetadata =
s3Client.getObjectMetadata(bucket, key);
int sizeOfFile = (int)objectMetadata.getContentLength();
fieldCount = sizeOfFile/sizeArray;
totalSize = sizeOfFile;
} catch (Exception e){
System.err.println("Could not get the object metadata for key: "+key);
e.printStackTrace();
return 1;
}
}
// build the payload by repeating the first field's bytes fieldCount times
byte[] destinationArray = new byte[totalSize];
int offset = 0;
for (int i = 0; i < fieldCount; i++) {
System.arraycopy(sourceArray, 0, destinationArray, offset, sizeArray);
offset += sizeArray;
}
try (InputStream input = new ByteArrayInputStream(destinationArray)) {
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(totalSize);
try {
PutObjectResult res =
s3Client.putObject(bucket, key, input, metadata);
if(res.getETag() == null) {
return 1;
}
} catch (Exception e) {
System.err.println("Could not write the object: "+key);
e.printStackTrace();
return 1;
}
} catch (Exception e) {
System.err.println("Error in the creation of the stream: "+e.toString());
e.printStackTrace();
return 1;
}
return 0;
}
/**
* Download an object from S3.
*
* @param bucket
* The name of the bucket
* @param key
* The file key of the object to download.
* @param result
* The HashMap into which the data from the object is written
* @return Zero on success, a non-zero error code on error.
*/
protected int readFromStorage(String bucket, String key,
HashMap<String, ByteIterator> result) {
try {
S3Object object =
s3Client.getObject(new GetObjectRequest(bucket, key));
ObjectMetadata objectMetadata =
s3Client.getObjectMetadata(bucket, key);
InputStream objectData = object.getObjectContent(); //consuming the stream
// writing the stream to bytes and to results
int sizeOfFile = (int)objectMetadata.getContentLength();
byte[] inputStreamToByte = new byte[sizeOfFile];
// InputStream.read() may return fewer bytes than requested,
// so loop until the whole object has been read
int offset = 0;
while (offset < sizeOfFile) {
int bytesRead = objectData.read(inputStreamToByte, offset,
sizeOfFile - offset);
if (bytesRead < 0) {
break; // end of stream reached early
}
offset += bytesRead;
}
result.put(key, new ByteArrayByteIterator(inputStreamToByte));
objectData.close();
} catch (Exception e){
System.err.println("Could not get the object: "+key);
e.printStackTrace();
return 1;
}
return 0;
}
/**
* Emulate a database scan operation on an S3 bucket: the bucket's keys
* are listed, sorted alphabetically, and read in order from startkey.
*
* @param bucket
* The name of the bucket
* @param startkey
* The file key of the first file to read.
* @param recordcount
* The number of files to read
* @param result
* A Vector of HashMaps, where each HashMap is a set of field/value
* pairs for one file
* @return Zero on success, a non-zero error code on error.
*/
protected int scanFromStorage(String bucket, String startkey,
int recordcount, Vector<HashMap<String, ByteIterator>> result) {
int counter = 0;
ObjectListing listing = s3Client.listObjects(bucket);
List<S3ObjectSummary> summaries = listing.getObjectSummaries();
List<String> keyList = new ArrayList<String>();
int startkeyNumber = 0;
// getting the list of files in the bucket
while (listing.isTruncated()) {
listing = s3Client.listNextBatchOfObjects(listing);
summaries.addAll(listing.getObjectSummaries());
}
for (S3ObjectSummary summary : summaries) {
String summaryKey = summary.getKey();
keyList.add(summaryKey);
}
// Sorting the list of files in alphabetical order
Collections.sort(keyList);
// Getting the position of the startkey for the scan
for (String key : keyList) {
if (key.equals(startkey)){
startkeyNumber = counter;
break;
}
counter = counter + 1;
}
// Reading up to recordcount files, starting from the startkey file and
// stopping early if the end of the key list is reached first
int endIndex = Math.min(startkeyNumber + recordcount, keyList.size());
for (int i = startkeyNumber; i < endIndex; i++){
HashMap<String, ByteIterator> resultTemp =
new HashMap<String, ByteIterator>();
readFromStorage(bucket, keyList.get(i), resultTemp);
result.add(resultTemp);
}
return 0;
}
}
}
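
// A minimal usage sketch (hypothetical, outside the YCSB harness; the
// bucket name "testbucket" is an assumption and must exist beforehand):
//
//   Properties p = new Properties();
//   p.setProperty("recordcount", "1");
//   p.setProperty("operationcount", "1");
//   p.setProperty("s3.accessKeyId", "...");
//   p.setProperty("s3.secretKey", "...");
//   S3Client client = new S3Client();
//   client.setProperties(p);
//   client.init();
//   HashMap<String, ByteIterator> values =
//       new HashMap<String, ByteIterator>();
//   values.put("field0", new ByteArrayByteIterator(new byte[100]));
//   client.insert("testbucket", "user1", values);
//   client.cleanup();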
