diff --git a/s3/src/main/java/com/yahoo/ycsb/db/S3Client.java b/s3/src/main/java/com/yahoo/ycsb/db/S3Client.java index fc59c85a..ffe19b80 100644 --- a/s3/src/main/java/com/yahoo/ycsb/db/S3Client.java +++ b/s3/src/main/java/com/yahoo/ycsb/db/S3Client.java @@ -1,343 +1,346 @@
/**
 * S3 storage client binding for YCSB.
 *
 * Submitted by Ivan Baldinotti on 14/07/2015
 *
 */
package com.yahoo.ycsb.db;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;

import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;

/**
 * S3 Storage client for YCSB framework.
 *
 * Properties to set:
 *
 * s3.accessKeyId=access key S3 aws
 * s3.secretKey=secret key S3 aws
 * s3.endPoint=s3.amazonaws.com
 * s3.region=us-east-1
 *
 * The parameter table is the name of the Bucket where to upload the files.
 * This must be created before starting the benchmark.
 * The size of the file to upload is determined by two parameters:
 * - fieldcount: the number of fields of a record in YCSB
 * - fieldlength: the size in bytes of a single field in the record
 * Together these define the size of the uploaded file:
 * fieldlength multiplied by fieldcount bytes.
 * The name of the file is determined by the parameter key,
 * which is automatically generated by YCSB.
 *
 * @author ivanB1975
 */
public class S3Client extends DB {

  // Connection settings read from the workload properties in init().
  // All state is static: one AmazonS3Client is shared by every client thread.
  private static String accessKeyId;
  private static String secretKey;
  private static String endPoint;
  private static String region;
  private static String maxErrorRetry;
  private static BasicAWSCredentials s3Credentials;
  private static AmazonS3Client s3Client;
  private static ClientConfiguration clientConfig;

  /**
   * Cleanup any state for this storage.
   * Called once per S3 instance; there is one S3 instance per client thread.
   *
   * NOTE(review): s3Client is static and shared between threads, so the first
   * thread to reach cleanup() shuts the client down for everyone. The null
   * check and null-out make the shutdown happen at most once.
   */
  @Override
  public void cleanup() throws DBException {
    if (this.s3Client != null) {
      try {
        this.s3Client.shutdown();
      } catch (Exception e) {
        e.printStackTrace();
      } finally {
        this.s3Client = null;
      }
    }
  }

  /**
   * Delete a file from S3 Storage.
   *
   * @param bucket
   *          The name of the bucket
   * @param key
   *          The record key of the file to delete.
   * @return Zero on success, a non-zero error code on error. See the
   *         {@link DB} class's description for a discussion of error codes.
   */
  @Override
  public int delete(String bucket, String key) {
    try {
      this.s3Client.deleteObject(new DeleteObjectRequest(bucket, key));
    } catch (Exception e) {
      e.printStackTrace();
      return 1;
    }
    return 0;
  }

  /**
   * Initialize any state for the storage.
   * Called once per S3 instance; if the client is not null it is re-used.
   */
  @Override
  public void init() throws DBException {
    // Serialize initialization: many client threads call init() concurrently
    // but only the first one must actually build the shared client.
    synchronized (S3Client.class) {
      Properties props = getProperties();
      accessKeyId = props.getProperty("s3.accessKeyId", "accessKeyId");
      secretKey = props.getProperty("s3.secretKey", "secretKey");
      endPoint = props.getProperty("s3.endPoint", "s3.amazonaws.com");
      region = props.getProperty("s3.region", "us-east-1");
      maxErrorRetry = props.getProperty("s3.maxErrorRetry", "15");
      System.out.println("Inizializing the S3 connection");
      s3Credentials = new BasicAWSCredentials(accessKeyId, secretKey);
      clientConfig = new ClientConfiguration();
      clientConfig.setMaxErrorRetry(Integer.parseInt(maxErrorRetry));
      if (s3Client != null) {
        System.out.println("Reusing the same client");
        return;
      }
      try {
        s3Client = new AmazonS3Client(s3Credentials, clientConfig);
        s3Client.setRegion(Region.getRegion(Regions.fromName(region)));
        s3Client.setEndpoint(endPoint);
        System.out.println("Connection successfully initialized");
      } catch (Exception e) {
        System.err.println("Could not connect to S3 storage: " + e.toString());
        e.printStackTrace();
        return;
      }
    }
  }

  /**
   * Create a new File in the Bucket. Any field/value pairs in the specified
   * values HashMap will be written into the file with the specified record
   * key.
   *
   * @param bucket
   *          The name of the bucket
   * @param key
   *          The record key of the file to insert.
   * @param values
   *          A HashMap of field/value pairs to insert in the file.
   *          Only the content of the first field is written to a byteArray
   *          multiplied by the number of fields. In this way the size
   *          of the file to upload is determined by the fieldlength
   *          and fieldcount parameters.
   * @return Zero on success, a non-zero error code on error. See the
   *         {@link DB} class's description for a discussion of error codes.
   */
  @Override
  public int insert(String bucket, String key,
      HashMap<String, ByteIterator> values) {
    return writeToStorage(bucket, key, values, 0);
  }

  /**
   * Read a file from the Bucket. Each field/value pair from the result
   * will be stored in a HashMap.
   *
   * @param bucket
   *          The name of the bucket
   * @param key
   *          The record key of the file to read.
   * @param fields
   *          The list of fields to read, or null for all of them,
   *          it is null by default
   * @param result
   *          A HashMap of field/value pairs for the result
   * @return Zero on success, a non-zero error code on error or "not found".
   */
  @Override
  public int read(String bucket, String key, Set<String> fields,
      HashMap<String, ByteIterator> result) {
    // The fields parameter is ignored: the whole object is always fetched.
    return readFromStorage(bucket, key, result);
  }

  /**
   * Update a file in the database. Any field/value pairs in the specified
   * values HashMap will be written into the file with the specified file
   * key, overwriting any existing values with the same field name.
   *
   * @param bucket
   *          The name of the bucket
   * @param key
   *          The file key of the file to write.
   * @param values
   *          A HashMap of field/value pairs to update in the record
   * @return Zero on success, a non-zero error code on error. See this class's
   *         description for a discussion of error codes.
   */
  @Override
  public int update(String bucket, String key,
      HashMap<String, ByteIterator> values) {
    return writeToStorage(bucket, key, values, 1);
  }

  /**
   * Perform a range scan for a set of files in the bucket. Each
   * field/value pair from the result will be stored in a HashMap.
   *
   * @param bucket
   *          The name of the bucket
   * @param startkey
   *          The file key of the first file to read.
   * @param recordcount
   *          The number of files to read
   * @param fields
   *          The list of fields to read, or null for all of them
   * @param result
   *          A Vector of HashMaps, where each HashMap is a set field/value
   *          pairs for one file
   * @return Zero on success, a non-zero error code on error. See the
   *         {@link DB} class's description for a discussion of error codes.
   */
  @Override
  public int scan(String bucket, String startkey, int recordcount,
      Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
    return scanFromStorage(bucket, startkey, recordcount, result);
  }

  /**
   * Upload a synthetic object to S3.
   *
   * The payload is the content of the first field repeated fieldCount times,
   * so its size is fieldlength * fieldcount bytes (see class javadoc).
   *
   * @param bucket
   *          The name of the bucket
   * @param key
   *          The object key to write.
   * @param values
   *          Field/value pairs; only the first field's bytes are used.
   * @param updateMarker
   *          0 for insert (size derived from values), 1 for update
   *          (size derived from the existing object's metadata so the
   *          object keeps its original length).
   * @return Zero on success, 1 on error.
   */
  protected int writeToStorage(String bucket, String key,
      HashMap<String, ByteIterator> values, int updateMarker) {
    int totalSize = 0;
    int fieldCount = values.size(); // number of fields to concatenate
    // Use the content of the first field as the repeated building block.
    Object keyToSearch = values.keySet().toArray()[0];
    byte[] sourceArray = values.get(keyToSearch).toArray();
    int sizeArray = sourceArray.length; // size of each field's byte array
    if (updateMarker == 0) {
      totalSize = sizeArray * fieldCount;
    } else {
      // Update: preserve the stored object's size. Only the metadata is
      // needed here; fetching the full object (as the old code did) leaked
      // an unclosed content stream.
      try {
        ObjectMetadata objectMetadata =
            this.s3Client.getObjectMetadata(bucket, key);
        int sizeOfFile = (int) objectMetadata.getContentLength();
        fieldCount = sizeOfFile / sizeArray;
        totalSize = sizeOfFile;
      } catch (Exception e) {
        e.printStackTrace();
        return 1;
      }
    }
    byte[] destinationArray = new byte[totalSize];
    int offset = 0;
    for (int i = 0; i < fieldCount; i++) {
      System.arraycopy(sourceArray, 0, destinationArray, offset, sizeArray);
      offset += sizeArray;
    }
    InputStream input = new ByteArrayInputStream(destinationArray);
    ObjectMetadata metadata = new ObjectMetadata();
    metadata.setContentLength(totalSize);
    try {
      PutObjectResult res =
          this.s3Client.putObject(bucket, key, input, metadata);
      if (res.getETag() == null) {
        return 1;
      }
    } catch (Exception e) {
      e.printStackTrace();
      return 1;
    } finally {
      // BUGFIX: the old code returned from inside finally, which discarded
      // the catch block's error code and reported every failure as success
      // (JLS 14.20.2). Close best-effort and let the normal returns stand.
      try {
        input.close();
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
    return 0;
  }

  /**
   * Download an object from S3 and store its bytes in the result map under
   * the object key.
   *
   * @param bucket
   *          The name of the bucket
   * @param key
   *          The object key to read.
   * @param result
   *          Receives one entry: key -> object bytes.
   * @return Zero on success, 1 on error.
   */
  protected int readFromStorage(String bucket, String key,
      HashMap<String, ByteIterator> result) {
    try {
      S3Object object =
          this.s3Client.getObject(new GetObjectRequest(bucket, key));
      ObjectMetadata objectMetadata =
          this.s3Client.getObjectMetadata(bucket, key);
      int sizeOfFile = (int) objectMetadata.getContentLength();
      byte[] inputStreamToByte = new byte[sizeOfFile];
      InputStream objectData = object.getObjectContent();
      try {
        // BUGFIX: InputStream.read() may return fewer bytes than requested;
        // loop until the whole object is consumed (or EOF).
        int bytesRead = 0;
        while (bytesRead < sizeOfFile) {
          int n = objectData.read(inputStreamToByte, bytesRead,
              sizeOfFile - bytesRead);
          if (n < 0) {
            break; // premature EOF; keep what we have
          }
          bytesRead += n;
        }
      } finally {
        objectData.close();
      }
      result.put(key, new ByteArrayByteIterator(inputStreamToByte));
    } catch (Exception e) {
      // BUGFIX: the old code had "finally { return 0; }", which overrode this
      // error return and reported every failed read as success.
      e.printStackTrace();
      return 1;
    }
    return 0;
  }

  /**
   * Scan the bucket: list all keys, sort them, and read recordcount objects
   * starting from startkey (or from the first key if startkey is absent).
   *
   * @param bucket
   *          The name of the bucket
   * @param startkey
   *          The key of the first object to read.
   * @param recordcount
   *          The number of objects to read.
   * @param result
   *          Receives one HashMap per object read.
   * @return Zero on success, 1 on error.
   */
  protected int scanFromStorage(String bucket, String startkey,
      int recordcount, Vector<HashMap<String, ByteIterator>> result) {
    // Collect the complete key listing (the S3 API pages results).
    ObjectListing listing = s3Client.listObjects(bucket);
    List<S3ObjectSummary> summaries = listing.getObjectSummaries();
    while (listing.isTruncated()) {
      listing = s3Client.listNextBatchOfObjects(listing);
      summaries.addAll(listing.getObjectSummaries());
    }
    List<String> keyList = new ArrayList<String>();
    for (S3ObjectSummary summary : summaries) {
      keyList.add(summary.getKey());
    }
    // Scan order is alphabetical.
    Collections.sort(keyList);
    // Position of the start key; fall back to the beginning when absent
    // (matches the old code's default of 0).
    int startkeyNumber = keyList.indexOf(startkey);
    if (startkeyNumber < 0) {
      startkeyNumber = 0;
    }
    // BUGFIX: the old loop used "i < numberOfIteration" where
    // numberOfIteration was min(recordcount, size), so any non-zero start
    // offset silently shortened (or emptied) the scan. Bound the window by
    // start + recordcount, clamped to the listing size.
    int endExclusive = Math.min(startkeyNumber + recordcount, keyList.size());
    for (int i = startkeyNumber; i < endExclusive; i++) {
      HashMap<String, ByteIterator> resultTemp =
          new HashMap<String, ByteIterator>();
      if (readFromStorage(bucket, keyList.get(i), resultTemp) != 0) {
        return 1; // propagate read failures instead of ignoring them
      }
      result.add(resultTemp);
    }
    return 0;
  }
}