diff --git a/s3/src/main/java/com/yahoo/ycsb/db/S3Client.java b/s3/src/main/java/com/yahoo/ycsb/db/S3Client.java
index 8beb52d5..c50a1778 100644
--- a/s3/src/main/java/com/yahoo/ycsb/db/S3Client.java
+++ b/s3/src/main/java/com/yahoo/ycsb/db/S3Client.java
@@ -1,459 +1,457 @@
 /**
  * Copyright (c) 2015 YCSB contributors. All rights reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you
  * may not use this file except in compliance with the License. You
  * may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  * implied. See the License for the specific language governing
  * permissions and limitations under the License. See accompanying
  * LICENSE file.
  *
  * S3 storage client binding for YCSB.
  */
 package com.yahoo.ycsb.db;
 
 import java.util.HashMap;
 import java.util.Properties;
 import java.util.Set;
 import java.util.Vector;
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.net.*;
 
 import com.yahoo.ycsb.ByteArrayByteIterator;
 import com.yahoo.ycsb.ByteIterator;
 import com.yahoo.ycsb.DB;
 import com.yahoo.ycsb.DBException;
 
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.*;
 import com.amazonaws.auth.*;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.PutObjectResult;
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.regions.Region;
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.s3.model.DeleteObjectRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 
 /**
  * S3 storage client for the YCSB framework.
  *
  * Properties to set:
  *
  * s3.accessKeyId=your AWS access key
  * s3.secretKey=your AWS secret key
  * s3.endPoint=s3.amazonaws.com
  * s3.region=us-east-1
  *
  * The table parameter is the name of the bucket where the files are
  * uploaded; the bucket must be created before starting the benchmark.
  * The size of the file to upload is determined by two parameters:
  * - fieldcount: the number of fields of a record in YCSB
  * - fieldlength: the size in bytes of a single field in the record
  * Together these two parameters define the size of the file to upload:
  * the size in bytes is fieldlength multiplied by fieldcount (for example,
  * fieldcount=10 and fieldlength=100 give 1000-byte files).
  * The name of the file is determined by the parameter key, which is
  * generated automatically by YCSB.
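+ *
+ * Example run (a sketch: it assumes the standard YCSB runner script, an
+ * existing bucket named "mybucket", and an "s3" binding name; flags and
+ * binding names may differ across YCSB versions):
+ *
+ * ./bin/ycsb load s3 -P workloads/workloada -p table=mybucket \
+ *     -p s3.accessKeyId=yourAccessKey -p s3.secretKey=yourSecretKey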
  */
 public class S3Client extends DB {
 
   private static AmazonS3Client s3Client;
   private static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
-  //private static int initCount = 0;
 
   /**
    * Cleanup any state for this storage.
    * Called once per S3 instance.
    */
   @Override
   public void cleanup() throws DBException {
     if (INIT_COUNT.decrementAndGet() == 0) {
       try {
         s3Client.shutdown();
         System.out.println("The client has been shut down successfully");
       } catch (Exception e){
         System.err.println("Could not shut down the S3Client: "+e.toString());
         e.printStackTrace();
       } finally {
         if (s3Client != null){
           s3Client = null;
         }
       }
     }
   }
 
   /**
    * Delete a file from S3 storage.
    *
    * @param bucket
    *          The name of the bucket
    * @param key
    *          The record key of the file to delete.
    * @return Zero on success, a non-zero error code on error. See the
    *         {@link DB} class's description for a discussion of error codes.
    */
   @Override
   public int delete(String bucket, String key) {
     try {
       s3Client.deleteObject(new DeleteObjectRequest(bucket, key));
     } catch (Exception e){
       System.err.println("Not possible to delete the key "+key);
       e.printStackTrace();
       return 1;
     }
     return 0;
   }
 
   /**
    * Initialize any state for the storage.
    * Called once per S3 instance; if the client is not null it is reused.
    */
   @Override
   public void init() throws DBException {
     final int count = INIT_COUNT.incrementAndGet();
     synchronized (S3Client.class){
       Properties propsCL = getProperties();
       int recordcount = Integer.parseInt(
           propsCL.getProperty("recordcount"));
       int operationcount = Integer.parseInt(
           propsCL.getProperty("operationcount"));
       int numberOfOperations = 0;
       if (recordcount > 0){
         if (recordcount > operationcount){
           numberOfOperations = recordcount;
         } else {
           numberOfOperations = operationcount;
         }
       } else {
         numberOfOperations = operationcount;
       }
       if (count <= numberOfOperations) {
         String accessKeyId = null;
         String secretKey = null;
         String endPoint = null;
         String region = null;
         String maxErrorRetry = null;
         BasicAWSCredentials s3Credentials;
         ClientConfiguration clientConfig;
         if (s3Client != null) {
           System.out.println("Reusing the same client");
           return;
         }
         try {
           InputStream propFile = S3Client.class.getClassLoader()
               .getResourceAsStream("s3.properties");
           Properties props = new Properties(System.getProperties());
           props.load(propFile);
           accessKeyId = props.getProperty("s3.accessKeyId");
           if (accessKeyId == null){
             accessKeyId = propsCL.getProperty("s3.accessKeyId");
           }
           System.out.println(accessKeyId);
           secretKey = props.getProperty("s3.secretKey");
           if (secretKey == null){
             secretKey = propsCL.getProperty("s3.secretKey");
           }
           System.out.println(secretKey);
           endPoint = props.getProperty("s3.endPoint");
           if (endPoint == null){
             endPoint = propsCL.getProperty("s3.endPoint", "s3.amazonaws.com");
           }
           System.out.println(endPoint);
           region = props.getProperty("s3.region");
           if (region == null){
             region = propsCL.getProperty("s3.region", "us-east-1");
           }
           System.out.println(region);
           maxErrorRetry = props.getProperty("s3.maxErrorRetry");
           if (maxErrorRetry == null){
             maxErrorRetry = propsCL.getProperty("s3.maxErrorRetry", "15");
           }
           System.out.println(maxErrorRetry);
         } catch (Exception e){
           System.err.println("The properties file could not be loaded: "
               +e.toString());
           e.printStackTrace();
         }
         try {
           System.out.println("Initializing the S3 connection");
           s3Credentials = new BasicAWSCredentials(accessKeyId, secretKey);
           clientConfig = new ClientConfiguration();
           clientConfig.setMaxErrorRetry(Integer.parseInt(maxErrorRetry));
           s3Client = new AmazonS3Client(s3Credentials, clientConfig);
           s3Client.setRegion(Region.getRegion(Regions.fromName(region)));
           s3Client.setEndpoint(endPoint);
           System.out.println("Connection successfully initialized");
         } catch (Exception e){
           System.err.println("Could not connect to S3 storage: "+ e.toString());
           e.printStackTrace();
           throw new DBException(e);
         }
       } else {
         System.err.println(
             "The number of threads must be less than or equal to the operations");
         throw new DBException(new Error(
             "The number of threads must be less than or equal to the operations"));
       }
     }
   }
 
   /**
    * Create a new file in the bucket. Any field/value pairs in the specified
    * values HashMap will be written into the file with the specified record
    * key.
    *
    * @param bucket
    *          The name of the bucket
    * @param key
    *          The record key of the file to insert.
    * @param values
    *          A HashMap of field/value pairs to insert in the file.
    *          Only the content of the first field is written to a byteArray,
    *          repeated once per field. In this way the size of the file to
    *          upload is determined by the fieldlength and fieldcount
    *          parameters.
    * @return Zero on success, a non-zero error code on error. See the
    *         {@link DB} class's description for a discussion of error codes.
    */
   @Override
   public int insert(String bucket, String key,
       HashMap<String, ByteIterator> values) {
     return writeToStorage(bucket, key, values, true);
   }
 
   /**
    * Read a file from the bucket. Each field/value pair from the result
    * will be stored in a HashMap.
    *
    * @param bucket
    *          The name of the bucket
    * @param key
    *          The record key of the file to read.
    * @param fields
    *          The list of fields to read, or null for all of them;
    *          it is null by default.
    * @param result
    *          A HashMap of field/value pairs for the result
    * @return Zero on success, a non-zero error code on error or "not found".
    */
   @Override
   public int read(String bucket, String key, Set<String> fields,
       HashMap<String, ByteIterator> result) {
     return readFromStorage(bucket, key, result);
   }
 
   /**
    * Update a file in the database. Any field/value pairs in the specified
    * values HashMap will be written into the file with the specified file
    * key, overwriting any existing values with the same field name.
    *
    * @param bucket
    *          The name of the bucket
    * @param key
    *          The file key of the file to write.
    * @param values
    *          A HashMap of field/value pairs to update in the record
    * @return Zero on success, a non-zero error code on error. See this class's
    *         description for a discussion of error codes.
    */
   @Override
   public int update(String bucket, String key,
       HashMap<String, ByteIterator> values) {
     return writeToStorage(bucket, key, values, false);
   }
 
   /**
    * Perform a range scan for a set of files in the bucket. Each
    * field/value pair from the result will be stored in a HashMap.
    *
    * @param bucket
    *          The name of the bucket
    * @param startkey
    *          The file key of the first file to read.
    * @param recordcount
    *          The number of files to read
    * @param fields
    *          The list of fields to read, or null for all of them
    * @param result
    *          A Vector of HashMaps, where each HashMap is a set of field/value
    *          pairs for one file
    * @return Zero on success, a non-zero error code on error. See the
    *         {@link DB} class's description for a discussion of error codes.
    */
   @Override
   public int scan(String bucket, String startkey, int recordcount,
       Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
     return scanFromStorage(bucket, startkey, recordcount, result);
   }
 
   /**
    * Upload a new object to S3 or update an object on S3.
    *
    * @param bucket
    *          The name of the bucket
    * @param key
    *          The file key of the object to upload/update.
    * @param values
    *          The data to be written on the object
    * @param updateMarker
    *          A boolean value. If true a new object will be uploaded
    *          to S3.
    *          If false an existing object will be re-uploaded.
    */
   protected int writeToStorage(String bucket, String key,
       HashMap<String, ByteIterator> values, Boolean updateMarker) {
     int totalSize = 0;
     int fieldCount = values.size(); //number of fields to concatenate
     // getting the first field in the values
     Object keyToSearch = values.keySet().toArray()[0];
     // getting the content of just one field
     byte[] sourceArray = values.get(keyToSearch).toArray();
     int sizeArray = sourceArray.length; //size of each array
     if (updateMarker){
       totalSize = sizeArray*fieldCount;
     } else {
       try {
         S3Object object =
             s3Client.getObject(new GetObjectRequest(bucket, key));
         ObjectMetadata objectMetadata =
             s3Client.getObjectMetadata(bucket, key);
         int sizeOfFile = (int)objectMetadata.getContentLength();
         fieldCount = sizeOfFile/sizeArray;
         totalSize = sizeOfFile;
       } catch (Exception e){
         System.err.println("Not possible to get the object :"+key);
         e.printStackTrace();
         return 1;
       }
     }
     byte[] destinationArray = new byte[totalSize];
     int offset = 0;
     for (int i = 0; i < fieldCount; i++) {
       System.arraycopy(sourceArray, 0, destinationArray, offset, sizeArray);
       offset += sizeArray;
     }
-    InputStream input = new ByteArrayInputStream(destinationArray);
-    ObjectMetadata metadata = new ObjectMetadata();
-    metadata.setContentLength(totalSize);
-    try {
-      PutObjectResult res =
-          s3Client.putObject(bucket, key, input, metadata);
-      if(res.getETag() == null) {
-        return 1;
-      }
-    } catch (Exception e) {
-      System.err.println("Not possible to write object :"+key);
-      e.printStackTrace();
-      return 1;
-    } finally {
+    // try-with-resources closes the stream even on the early error returns
+    try (InputStream input = new ByteArrayInputStream(destinationArray)) {
+      ObjectMetadata metadata = new ObjectMetadata();
+      metadata.setContentLength(totalSize);
       try {
-        input.close();
+        PutObjectResult res =
+            s3Client.putObject(bucket, key, input, metadata);
+        if(res.getETag() == null) {
+          return 1;
+        }
       } catch (Exception e) {
-        System.err.println("Not possible to close the stream :"+e.toString());
+        System.err.println("Not possible to write object :"+key);
         e.printStackTrace();
         return 1;
       }
       return 0;
+    } catch (Exception e) {
+      System.err.println("Error in the creation of the stream :"+e.toString());
+      e.printStackTrace();
+      return 1;
     }
   }
 
   /**
    * Download an object from S3.
    *
    * @param bucket
    *          The name of the bucket
    * @param key
    *          The file key of the object to download.
    * @param result
    *          The HashMap where data from the object are written
    */
   protected int readFromStorage(String bucket, String key,
       HashMap<String, ByteIterator> result) {
     try {
       S3Object object =
           s3Client.getObject(new GetObjectRequest(bucket, key));
       ObjectMetadata objectMetadata =
           s3Client.getObjectMetadata(bucket, key);
       InputStream objectData = object.getObjectContent(); //consuming the stream
       // writing the stream to bytes and to results
       int sizeOfFile = (int)objectMetadata.getContentLength();
       byte[] inputStreamToByte = new byte[sizeOfFile];
       objectData.read(inputStreamToByte, 0, sizeOfFile);
       result.put(key, new ByteArrayByteIterator(inputStreamToByte));
       objectData.close();
     } catch (Exception e){
       System.err.println("Not possible to get the object "+key);
       e.printStackTrace();
       return 1;
-    } finally {
-      return 0;
     }
+    // moved out of the finally block so error returns are not swallowed
+    return 0;
   }
 
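+  // Note: scanFromStorage below emulates a range scan by listing the entire
+  // bucket and sorting the keys client-side, so the cost of a single scan
+  // grows with the total number of objects in the bucket, not with the
+  // requested recordcount.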
   /**
    * Perform an emulation of a database scan operation on an S3 bucket.
    *
    * @param bucket
    *          The name of the bucket
    * @param startkey
    *          The file key of the first file to read.
    * @param recordcount
    *          The number of files to read
    * @param fields
    *          The list of fields to read, or null for all of them
    * @param result
    *          A Vector of HashMaps, where each HashMap is a set of field/value
    *          pairs for one file
    */
   protected int scanFromStorage(String bucket, String startkey,
       int recordcount, Vector<HashMap<String, ByteIterator>> result) {
     int counter = 0;
     ObjectListing listing = s3Client.listObjects(bucket);
     List<S3ObjectSummary> summaries = listing.getObjectSummaries();
     List<String> keyList = new ArrayList<String>();
     int startkeyNumber = 0;
     int numberOfIteration = 0;
     // getting the list of files in the bucket
     while (listing.isTruncated()) {
       listing = s3Client.listNextBatchOfObjects(listing);
       summaries.addAll(listing.getObjectSummaries());
     }
     for (S3ObjectSummary summary : summaries) {
       String summaryKey = summary.getKey();
       keyList.add(summaryKey);
     }
     // Sorting the list of files in alphabetical order
     Collections.sort(keyList);
     // Getting the position of the starting file for the scan
     for (String key : keyList) {
       if (key.equals(startkey)){
         startkeyNumber = counter;
       } else {
         counter = counter + 1;
       }
     }
-    // Checking if the total number of file is bigger than the file to read,
-    // if not using the total number of Files
-    if (recordcount < keyList.size()) {
-      numberOfIteration = recordcount;
+    // Limiting the scan to recordcount files past the start key, or to the
+    // end of the listing when fewer files remain
+    if (startkeyNumber + recordcount < keyList.size()) {
+      numberOfIteration = startkeyNumber + recordcount;
     } else {
       numberOfIteration = keyList.size();
     }
     // Reading the files starting from the startkey file until the end of
     // the files or until the recordcount number is reached
     for (int i = startkeyNumber; i < numberOfIteration; i++){
       HashMap<String, ByteIterator> resultTemp =
           new HashMap<String, ByteIterator>();
       readFromStorage(bucket, keyList.get(i), resultTemp);
       result.add(resultTemp);
     }
     return 0;
   }
 }
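
For reference, a minimal s3.properties that the init() method above can load
from the classpath (a sketch: the values are placeholders, while the property
names and defaults come from the code in this patch):

  s3.accessKeyId=yourAccessKey
  s3.secretKey=yourSecretKey
  s3.endPoint=s3.amazonaws.com
  s3.region=us-east-1
  s3.maxErrorRetry=15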