diff --git a/s3/src/main/conf/s3.properties b/s3/src/main/conf/s3.properties index 652988f0..93274232 100644 --- a/s3/src/main/conf/s3.properties +++ b/s3/src/main/conf/s3.properties @@ -1,15 +1,18 @@ # # Sample S3 configuration properties # # You may either set properties here or via the command line. # # the AWS S3 access key ID s3.accessKeyId=yourKey # the AWS S3 secret access key ID s3.secretKey=YourSecret # the AWS endpoint s3.endpoint=s3.amazonaws.com +# activating the SSE server side encryption if true + +s3.sse=false diff --git a/s3/src/main/java/com/yahoo/ycsb/db/S3Client.java b/s3/src/main/java/com/yahoo/ycsb/db/S3Client.java index c50a1778..07e0942c 100644 --- a/s3/src/main/java/com/yahoo/ycsb/db/S3Client.java +++ b/s3/src/main/java/com/yahoo/ycsb/db/S3Client.java @@ -1,457 +1,470 @@ /** * Copyright (c) 2015 YCSB contributors. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. * * S3 storage client binding for YCSB. */ package com.yahoo.ycsb.db; import java.util.HashMap; import java.util.Properties; import java.util.Set; import java.util.Vector; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.net.*; import com.yahoo.ycsb.ByteArrayByteIterator; import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.DB; import com.yahoo.ycsb.DBException; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.*; import com.amazonaws.auth.*; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PutObjectResult; import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.ClientConfiguration; import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.model.DeleteObjectRequest; import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.S3ObjectSummary; /** * S3 Storage client for YCSB framework. * * Properties to set: * * s3.accessKeyId=access key S3 aws * s3.secretKey=secret key S3 aws * s3.endPoint=s3.amazonaws.com * s3.region=us-east-1 * The parameter table is the name of the Bucket where to upload the files. * This must be created before to start the benchmark * The size of the file to upload is determined by two parameters: * - fieldcount this is the number of fields of a record in YCSB * - fieldlength this is the size in bytes of a single field in the record * together these two parameters define the size of the file to upload, * the size in bytes is given by the fieldlength multiplied by the fieldcount. * The name of the file is determined by the parameter key. *This key is automatically generated by YCSB. * */ public class S3Client extends DB { private static AmazonS3Client s3Client; + private static String sse; private static final AtomicInteger INIT_COUNT = new AtomicInteger(0); //private static int initCount = 0; /** * Cleanup any state for this storage. * Called once per S3 instance; */ @Override public void cleanup() throws DBException { if (INIT_COUNT.decrementAndGet() == 0) { try { s3Client.shutdown(); System.out.println("The client is shutdown successfully"); } catch (Exception e){ System.err.println("Could not shutdown the S3Client: "+e.toString()); e.printStackTrace(); } finally { if (s3Client != null){ s3Client = null; } } } } /** * Delete a file from S3 Storage. * * @param bucket * The name of the bucket * @param key * The record key of the file to delete. * @return Zero on success, a non-zero error code on error. See the * {@link DB} class's description for a discussion of error codes. */ @Override public int delete(String bucket, String key) { try { s3Client.deleteObject(new DeleteObjectRequest(bucket, key)); } catch (Exception e){ System.err.println("Not possible to delete the key "+key); e.printStackTrace(); return 1; } return 0; } /** * Initialize any state for the storage. * Called once per S3 instance; If the client is not null it is re-used. */ @Override public void init() throws DBException { final int count = INIT_COUNT.incrementAndGet(); synchronized (S3Client.class){ Properties propsCL = getProperties(); int recordcount = Integer.parseInt( propsCL.getProperty("recordcount")); int operationcount = Integer.parseInt( propsCL.getProperty("operationcount")); int numberOfOperations = 0; if (recordcount > 0){ if (recordcount > operationcount){ numberOfOperations = recordcount; } else { numberOfOperations = operationcount; } } else { numberOfOperations = operationcount; } if (count <= numberOfOperations) { String accessKeyId = null; String secretKey = null; String endPoint = null; String region = null; String maxErrorRetry = null; BasicAWSCredentials s3Credentials; ClientConfiguration clientConfig; if (s3Client != null) { System.out.println("Reusing the same client"); return; } try { InputStream propFile = S3Client.class.getClassLoader() .getResourceAsStream("s3.properties"); Properties props = new Properties(System.getProperties()); props.load(propFile); accessKeyId = props.getProperty("s3.accessKeyId"); if (accessKeyId == null){ accessKeyId = propsCL.getProperty("s3.accessKeyId"); } System.out.println(accessKeyId); secretKey = props.getProperty("s3.secretKey"); if (secretKey == null){ secretKey = propsCL.getProperty("s3.secretKey"); } System.out.println(secretKey); endPoint = props.getProperty("s3.endPoint"); if (endPoint == null){ endPoint = propsCL.getProperty("s3.endPoint", "s3.amazonaws.com"); } System.out.println(endPoint); region = props.getProperty("s3.region"); if (region == null){ region = propsCL.getProperty("s3.region", "us-east-1"); } System.out.println(region); maxErrorRetry = props.getProperty("s3.maxErrorRetry"); if (maxErrorRetry == null){ maxErrorRetry = propsCL.getProperty("s3.maxErrorRetry", "15"); } - System.out.println(maxErrorRetry); + sse = props.getProperty("s3.sse"); + if (sse == null){ + sse = propsCL.getProperty("s3.sse", "false"); + } } catch (Exception e){ System.err.println("The file properties doesn't exist "+e.toString()); e.printStackTrace(); } try { System.out.println("Inizializing the S3 connection"); s3Credentials = new BasicAWSCredentials(accessKeyId, secretKey); clientConfig = new ClientConfiguration(); clientConfig.setMaxErrorRetry(Integer.parseInt(maxErrorRetry)); s3Client = new AmazonS3Client(s3Credentials, clientConfig); s3Client.setRegion(Region.getRegion(Regions.fromName(region))); s3Client.setEndpoint(endPoint); System.out.println("Connection successfully initialized"); } catch (Exception e){ System.err.println("Could not connect to S3 storage: "+ e.toString()); e.printStackTrace(); throw new DBException(e); } } else { System.err.println( "The number of threads must be less or equal than the operations"); throw new DBException(new Error( "The number of threads must be less or equal than the operations")); } } } /** * Create a new File in the Bucket. Any field/value pairs in the specified * values HashMap will be written into the file with the specified record * key. * * @param bucket * The name of the bucket * @param key * The record key of the file to insert. * @param values * A HashMap of field/value pairs to insert in the file. * Only the content of the first field is written to a byteArray * multiplied by the number of field. In this way the size * of the file to upload is determined by the fieldlength * and fieldcount parameters. * @return Zero on success, a non-zero error code on error. See the * {@link DB} class's description for a discussion of error codes. */ @Override public int insert(String bucket, String key, HashMap values) { - return writeToStorage(bucket, key, values, true); + return writeToStorage(bucket, key, values, true, sse); } /** * Read a file from the Bucket. Each field/value pair from the result * will be stored in a HashMap. * * @param bucket * The name of the bucket * @param key * The record key of the file to read. * @param fields * The list of fields to read, or null for all of them, * it is null by default * @param result * A HashMap of field/value pairs for the result * @return Zero on success, a non-zero error code on error or "not found". */ @Override public int read(String bucket, String key, Set fields, HashMap result) { return readFromStorage(bucket, key, result); } /** * Update a file in the database. Any field/value pairs in the specified * values HashMap will be written into the file with the specified file * key, overwriting any existing values with the same field name. * * @param bucket * The name of the bucket * @param key * The file key of the file to write. * @param values * A HashMap of field/value pairs to update in the record * @return Zero on success, a non-zero error code on error. See this class's * description for a discussion of error codes. */ @Override public int update(String bucket, String key, HashMap values) { - return writeToStorage(bucket, key, values, false); + return writeToStorage(bucket, key, values, false, sse); } /** * Perform a range scan for a set of files in the bucket. Each * field/value pair from the result will be stored in a HashMap. * * @param bucket * The name of the bucket * @param startkey * The file key of the first file to read. * @param recordcount * The number of files to read * @param fields * The list of fields to read, or null for all of them * @param result * A Vector of HashMaps, where each HashMap is a set field/value * pairs for one file * @return Zero on success, a non-zero error code on error. See the * {@link DB} class's description for a discussion of error codes. */ @Override public int scan(String bucket, String startkey, int recordcount, Set fields, Vector> result) { return scanFromStorage(bucket, startkey, recordcount, result); } /** * Upload a new object to S3 or update an object on S3. * * @param bucket * The name of the bucket * @param key * The file key of the object to upload/update. * @param values * The data to be written on the object * @param updateMarker * A boolean value. If true a new object will be uploaded * to S3. If false an existing object will be re-uploaded * */ protected int writeToStorage(String bucket, String key, - HashMap values, Boolean updateMarker) { + HashMap values, Boolean updateMarker, + String sseLocal) { int totalSize = 0; int fieldCount = values.size(); //number of fields to concatenate // getting the first field in the values Object keyToSearch = values.keySet().toArray()[0]; // getting the content of just one field byte[] sourceArray = values.get(keyToSearch).toArray(); int sizeArray = sourceArray.length; //size of each array if (updateMarker){ totalSize = sizeArray*fieldCount; } else { try { S3Object object = s3Client.getObject(new GetObjectRequest(bucket, key)); ObjectMetadata objectMetadata = s3Client.getObjectMetadata(bucket, key); int sizeOfFile = (int)objectMetadata.getContentLength(); fieldCount = sizeOfFile/sizeArray; totalSize = sizeOfFile; } catch (Exception e){ System.err.println("Not possible to get the object :"+key); e.printStackTrace(); return 1; } } byte[] destinationArray = new byte[totalSize]; int offset = 0; for (int i = 0; i < fieldCount; i++) { System.arraycopy(sourceArray, 0, destinationArray, offset, sizeArray); offset += sizeArray; } try (InputStream input = new ByteArrayInputStream(destinationArray)) { ObjectMetadata metadata = new ObjectMetadata(); + if (sseLocal.equals("true")) { + metadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); + } metadata.setContentLength(totalSize); try { PutObjectResult res = s3Client.putObject(bucket, key, input, metadata); if(res.getETag() == null) { return 1; + } else { + if (sseLocal.equals("true")) { + System.out.println("Uploaded object encryption status is " + + res.getSSEAlgorithm()); + } } } catch (Exception e) { System.err.println("Not possible to write object :"+key); e.printStackTrace(); return 1; } finally { return 0; } } catch (Exception e) { System.err.println("Error in the creation of the stream :"+e.toString()); e.printStackTrace(); return 1; } } /** * Download an object from S3. * * @param bucket * The name of the bucket * @param key * The file key of the object to upload/update. * @param result * The Hash map where data from the object are written * */ protected int readFromStorage(String bucket, String key, HashMap result) { try { S3Object object = s3Client.getObject(new GetObjectRequest(bucket, key)); ObjectMetadata objectMetadata = s3Client.getObjectMetadata(bucket, key); InputStream objectData = object.getObjectContent(); //consuming the stream // writing the stream to bytes and to results int sizeOfFile = (int)objectMetadata.getContentLength(); byte[] inputStreamToByte = new byte[sizeOfFile]; objectData.read(inputStreamToByte, 0, sizeOfFile); result.put(key, new ByteArrayByteIterator(inputStreamToByte)); objectData.close(); } catch (Exception e){ System.err.println("Not possible to get the object "+key); e.printStackTrace(); return 1; } finally { return 0; } } /** * Perform an emulation of a database scan operation on a S3 bucket. * * @param bucket * The name of the bucket * @param startkey * The file key of the first file to read. * @param recordcount * The number of files to read * @param fields * The list of fields to read, or null for all of them * @param result * A Vector of HashMaps, where each HashMap is a set field/value * pairs for one file * */ protected int scanFromStorage(String bucket, String startkey, int recordcount, Vector> result) { int counter = 0; ObjectListing listing = s3Client.listObjects(bucket); List summaries = listing.getObjectSummaries(); List keyList = new ArrayList(); int startkeyNumber = 0; int numberOfIteration = 0; // getting the list of files in the bucket while (listing.isTruncated()) { listing = s3Client.listNextBatchOfObjects(listing); summaries.addAll(listing.getObjectSummaries()); } for (S3ObjectSummary summary : summaries) { String summaryKey = summary.getKey(); keyList.add(summaryKey); } // Sorting the list of files in Alphabetical order Collections.sort(keyList); // sorting the list // Getting the position of the startingfile for the scan for (String key : keyList) { if (key.equals(startkey)){ startkeyNumber = counter; } else { counter = counter + 1; } } // Checking if the total number of file is bigger than the file to read, // if not using the total number of Files if (recordcount < keyList.size()) { numberOfIteration = recordcount; } else { numberOfIteration = keyList.size(); } // Reading the Files starting from the startkey File till the end // of the Files or Till the recordcount number for (int i = startkeyNumber; i < numberOfIteration; i++){ HashMap resultTemp = new HashMap(); readFromStorage(bucket, keyList.get(i), resultTemp); result.add(resultTemp); } return 0; } }