diff --git a/core/src/main/java/com/yahoo/ycsb/Client.java b/core/src/main/java/com/yahoo/ycsb/Client.java index 14d8e41b..f68352f4 100644 --- a/core/src/main/java/com/yahoo/ycsb/Client.java +++ b/core/src/main/java/com/yahoo/ycsb/Client.java @@ -1,842 +1,842 @@ /** * Copyright (c) 2010 Yahoo! Inc. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Enumeration; import java.util.Properties; import java.util.Vector; import java.util.concurrent.locks.LockSupport; import com.yahoo.ycsb.measurements.Measurements; import com.yahoo.ycsb.measurements.exporter.MeasurementsExporter; import com.yahoo.ycsb.measurements.exporter.TextMeasurementsExporter; //import org.apache.log4j.BasicConfigurator; /** * A thread to periodically show the status of the experiment, to reassure you that progress is being made. * * @author cooperb * */ class StatusThread extends Thread { Vector _threads; String _label; boolean _standardstatus; /** * The interval for reporting status. */ public static final long sleeptime=10000; public StatusThread(Vector threads, String label, boolean standardstatus) { _threads=threads; _label=label; _standardstatus=standardstatus; } /** * Run and periodically report status. */ public void run() { long st=System.currentTimeMillis(); long lasten=st; long lasttotalops=0; boolean alldone; SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS"); do { alldone=true; int totalops=0; //terminate this thread when all the worker threads are done for (Thread t : _threads) { if (t.getState()!=Thread.State.TERMINATED) { alldone=false; } ClientThread ct=(ClientThread)t; totalops+=ct.getOpsDone(); } long en=System.currentTimeMillis(); long interval=en-st; //double throughput=1000.0*((double)totalops)/((double)interval); double curthroughput=1000.0*(((double)(totalops-lasttotalops))/((double)(en-lasten))); lasttotalops=totalops; lasten=en; DecimalFormat d = new DecimalFormat("#.##"); String label = _label + format.format(new Date()); StringBuilder msg = new StringBuilder(label).append(" ").append(interval/1000).append(" sec: "); msg.append(totalops).append(" operations; "); if (totalops != 0) { msg.append(d.format(curthroughput)).append(" current ops/sec; "); } msg.append(Measurements.getMeasurements().getSummary()); System.err.println(msg); if (_standardstatus) { System.out.println(msg); } try { sleep(sleeptime); } catch (InterruptedException e) { //do nothing } } while (!alldone); } } /** * A thread for executing transactions or data inserts to the database. * * @author cooperb * */ class ClientThread extends Thread { DB _db; boolean _dotransactions; Workload _workload; int _opcount; double _targetOpsPerMs; int _opsdone; int _threadid; int _threadcount; Object _workloadstate; Properties _props; long _targetOpsTickNs; final Measurements _measurements; final boolean _measureFromIntendedDeadline; /** * Constructor. * * @param db the DB implementation to use * @param dotransactions true to do transactions, false to insert data * @param workload the workload to use * @param threadid the id of this thread * @param threadcount the total number of threads * @param props the properties defining the experiment * @param opcount the number of operations (transactions or inserts) to do * @param targetperthreadperms target number of operations per thread per ms */ public ClientThread(DB db, boolean dotransactions, Workload workload, int threadid, int threadcount, Properties props, int opcount, double targetperthreadperms, boolean measureFromIntendedDeadline) { //TODO: consider removing threadcount and threadid _db=db; _dotransactions=dotransactions; _workload=workload; _opcount=opcount; _opsdone=0; if(targetperthreadperms > 0){ _targetOpsPerMs=targetperthreadperms; _targetOpsTickNs=(long)(1000000/_targetOpsPerMs); } _threadid=threadid; _threadcount=threadcount; _props=props; _measurements = Measurements.getMeasurements(); //System.out.println("Interval = "+interval); _measureFromIntendedDeadline = false; } public int getOpsDone() { return _opsdone; } public void run() { try { _db.init(); } catch (DBException e) { e.printStackTrace(); e.printStackTrace(System.out); return; } try { _workloadstate=_workload.initThread(_props,_threadid,_threadcount); } catch (WorkloadException e) { e.printStackTrace(); e.printStackTrace(System.out); return; } //NOTE: Switching to using nanoTime and parkNanos for time management here such that the measurements // and the client thread have the same view on time. //spread the thread operations out so they don't all hit the DB at the same time // GH issue 4 - throws exception if _target>1 because random.nextInt argument must be >0 // and the sleep() doesn't make sense for granularities < 1 ms anyway if ((_targetOpsPerMs > 0) && (_targetOpsPerMs <= 1.0)) { long randomMinorDelay = Utils.random().nextInt((int) _targetOpsTickNs); sleepUntil(System.nanoTime() + randomMinorDelay); } try { if (_dotransactions) { long startTimeNanos = System.nanoTime(); while (((_opcount == 0) || (_opsdone < _opcount)) && !_workload.isStopRequested()) { if (!_workload.doTransaction(_db,_workloadstate)) { break; } _opsdone++; throttleNanos(startTimeNanos); } } else { long startTimeNanos = System.nanoTime(); while (((_opcount == 0) || (_opsdone < _opcount)) && !_workload.isStopRequested()) { if (!_workload.doInsert(_db,_workloadstate)) { break; } _opsdone++; throttleNanos(startTimeNanos); } } } catch (Exception e) { e.printStackTrace(); e.printStackTrace(System.out); System.exit(0); } try { - _measurements.setStartTimeNs(0); + _measurements.setIntendedStartTimeNs(0); _db.cleanup(); } catch (DBException e) { e.printStackTrace(); e.printStackTrace(System.out); return; } } private void sleepUntil(long deadline) { long now = System.nanoTime(); while((now = System.nanoTime()) < deadline) { LockSupport.parkNanos(deadline - now); } } private void throttleNanos(long startTimeNanos) { //throttle the operations if (_targetOpsPerMs > 0) { // delay until next tick long deadline = startTimeNanos + _opsdone*_targetOpsTickNs; sleepUntil(deadline); if(_measureFromIntendedDeadline) - _measurements.setStartTimeNs(deadline); + _measurements.setIntendedStartTimeNs(deadline); } } } /** * Main class for executing YCSB. */ public class Client { public static final String DEFAULT_RECORD_COUNT = "0"; /** * The target number of operations to perform. */ public static final String OPERATION_COUNT_PROPERTY="operationcount"; /** * The number of records to load into the database initially. */ public static final String RECORD_COUNT_PROPERTY="recordcount"; /** * The workload class to be loaded. */ public static final String WORKLOAD_PROPERTY="workload"; /** * The database class to be used. */ public static final String DB_PROPERTY="db"; /** * The exporter class to be used. The default is * com.yahoo.ycsb.measurements.exporter.TextMeasurementsExporter. */ public static final String EXPORTER_PROPERTY="exporter"; /** * If set to the path of a file, YCSB will write all output to this file * instead of STDOUT. */ public static final String EXPORT_FILE_PROPERTY="exportfile"; /** * The number of YCSB client threads to run. */ public static final String THREAD_COUNT_PROPERTY="threadcount"; /** * Indicates how many inserts to do, if less than recordcount. Useful for partitioning * the load among multiple servers, if the client is the bottleneck. Additionally, workloads * should support the "insertstart" property, which tells them which record to start at. */ public static final String INSERT_COUNT_PROPERTY="insertcount"; /** * Target number of operations per second */ public static final String TARGET_PROPERTY="target"; /** * The maximum amount of time (in seconds) for which the benchmark will be run. */ public static final String MAX_EXECUTION_TIME = "maxexecutiontime"; private static final String CO_CORRECT_PROPERTY = "co.correct"; private static final String DEFAULT_CO_CORRECT = "true"; public static void usageMessage() { System.out.println("Usage: java com.yahoo.ycsb.Client [options]"); System.out.println("Options:"); System.out.println(" -threads n: execute using n threads (default: 1) - can also be specified as the \n" + " \"threadcount\" property using -p"); System.out.println(" -target n: attempt to do n operations per second (default: unlimited) - can also\n" + " be specified as the \"target\" property using -p"); System.out.println(" -load: run the loading phase of the workload"); System.out.println(" -t: run the transactions phase of the workload (default)"); System.out.println(" -db dbname: specify the name of the DB to use (default: com.yahoo.ycsb.BasicDB) - \n" + " can also be specified as the \"db\" property using -p"); System.out.println(" -P propertyfile: load properties from the given file. Multiple files can"); System.out.println(" be specified, and will be processed in the order specified"); System.out.println(" -p name=value: specify a property to be passed to the DB and workloads;"); System.out.println(" multiple properties can be specified, and override any"); System.out.println(" values in the propertyfile"); System.out.println(" -s: show status during run (default: no status)"); System.out.println(" -l label: use label for status (e.g. to label one experiment out of a whole batch)"); System.out.println(""); System.out.println("Required properties:"); System.out.println(" "+WORKLOAD_PROPERTY+": the name of the workload class to use (e.g. com.yahoo.ycsb.workloads.CoreWorkload)"); System.out.println(""); System.out.println("To run the transaction phase from multiple servers, start a separate client on each."); System.out.println("To run the load phase from multiple servers, start a separate client on each; additionally,"); System.out.println("use the \"insertcount\" and \"insertstart\" properties to divide up the records to be inserted"); } public static boolean checkRequiredProperties(Properties props) { if (props.getProperty(WORKLOAD_PROPERTY)==null) { System.out.println("Missing property: "+WORKLOAD_PROPERTY); return false; } return true; } /** * Exports the measurements to either sysout or a file using the exporter * loaded from conf. * @throws IOException Either failed to write to output stream or failed to close it. */ private static void exportMeasurements(Properties props, int opcount, long runtime) throws IOException { MeasurementsExporter exporter = null; try { // if no destination file is provided the results will be written to stdout OutputStream out; String exportFile = props.getProperty(EXPORT_FILE_PROPERTY); if (exportFile == null) { out = System.out; } else { out = new FileOutputStream(exportFile); } // if no exporter is provided the default text one will be used String exporterStr = props.getProperty(EXPORTER_PROPERTY, "com.yahoo.ycsb.measurements.exporter.TextMeasurementsExporter"); try { exporter = (MeasurementsExporter) Class.forName(exporterStr).getConstructor(OutputStream.class).newInstance(out); } catch (Exception e) { System.err.println("Could not find exporter " + exporterStr + ", will use default text reporter."); e.printStackTrace(); exporter = new TextMeasurementsExporter(out); } exporter.write("OVERALL", "RunTime(ms)", runtime); double throughput = 1000.0 * ((double) opcount) / ((double) runtime); exporter.write("OVERALL", "Throughput(ops/sec)", throughput); Measurements.getMeasurements().exportMeasurements(exporter); } finally { if (exporter != null) { exporter.close(); } } } @SuppressWarnings("unchecked") public static void main(String[] args) { String dbname; Properties props=new Properties(); Properties fileprops=new Properties(); boolean dotransactions=true; int threadcount=1; int target=0; boolean status=false; String label=""; //parse arguments int argindex=0; if (args.length==0) { usageMessage(); System.exit(0); } while (args[argindex].startsWith("-")) { if (args[argindex].compareTo("-threads")==0) { argindex++; if (argindex>=args.length) { usageMessage(); System.exit(0); } int tcount=Integer.parseInt(args[argindex]); props.setProperty(THREAD_COUNT_PROPERTY, tcount+""); argindex++; } else if (args[argindex].compareTo("-target")==0) { argindex++; if (argindex>=args.length) { usageMessage(); System.exit(0); } int ttarget=Integer.parseInt(args[argindex]); props.setProperty(TARGET_PROPERTY, ttarget+""); argindex++; } else if (args[argindex].compareTo("-load")==0) { dotransactions=false; argindex++; } else if (args[argindex].compareTo("-t")==0) { dotransactions=true; argindex++; } else if (args[argindex].compareTo("-s")==0) { status=true; argindex++; } else if (args[argindex].compareTo("-db")==0) { argindex++; if (argindex>=args.length) { usageMessage(); System.exit(0); } props.setProperty(DB_PROPERTY,args[argindex]); argindex++; } else if (args[argindex].compareTo("-l")==0) { argindex++; if (argindex>=args.length) { usageMessage(); System.exit(0); } label=args[argindex]; argindex++; } else if (args[argindex].compareTo("-P")==0) { argindex++; if (argindex>=args.length) { usageMessage(); System.exit(0); } String propfile=args[argindex]; argindex++; Properties myfileprops=new Properties(); try { myfileprops.load(new FileInputStream(propfile)); } catch (IOException e) { System.out.println(e.getMessage()); System.exit(0); } //Issue #5 - remove call to stringPropertyNames to make compilable under Java 1.5 for (Enumeration e=myfileprops.propertyNames(); e.hasMoreElements(); ) { String prop=(String)e.nextElement(); fileprops.setProperty(prop,myfileprops.getProperty(prop)); } } else if (args[argindex].compareTo("-p")==0) { argindex++; if (argindex>=args.length) { usageMessage(); System.exit(0); } int eq=args[argindex].indexOf('='); if (eq<0) { usageMessage(); System.exit(0); } String name=args[argindex].substring(0,eq); String value=args[argindex].substring(eq+1); props.put(name,value); //System.out.println("["+name+"]=["+value+"]"); argindex++; } else { System.out.println("Unknown option "+args[argindex]); usageMessage(); System.exit(0); } if (argindex>=args.length) { break; } } if (argindex!=args.length) { usageMessage(); System.exit(0); } //set up logging //BasicConfigurator.configure(); //overwrite file properties with properties from the command line //Issue #5 - remove call to stringPropertyNames to make compilable under Java 1.5 for (Enumeration e=props.propertyNames(); e.hasMoreElements(); ) { String prop=(String)e.nextElement(); fileprops.setProperty(prop,props.getProperty(prop)); } props=fileprops; if (!checkRequiredProperties(props)) { System.exit(0); } long maxExecutionTime = Integer.parseInt(props.getProperty(MAX_EXECUTION_TIME, "0")); //get number of threads, target and db threadcount=Integer.parseInt(props.getProperty(THREAD_COUNT_PROPERTY,"1")); dbname=props.getProperty(DB_PROPERTY,"com.yahoo.ycsb.BasicDB"); target=Integer.parseInt(props.getProperty(TARGET_PROPERTY,"0")); //compute the target throughput double targetperthreadperms=-1; if (target>0) { double targetperthread=((double)target)/((double)threadcount); targetperthreadperms=targetperthread/1000.0; } System.out.println("YCSB Client 0.1"); System.out.print("Command line:"); for (int i=0; i threads=new Vector(); for (int threadid=0; threadid 0) { terminator = new TerminatorThread(maxExecutionTime, threads, workload); terminator.start(); } int opsDone = 0; for (Thread t : threads) { try { t.join(); opsDone += ((ClientThread)t).getOpsDone(); } catch (InterruptedException e) { } } long en=System.currentTimeMillis(); if (terminator != null && !terminator.isInterrupted()) { terminator.interrupt(); } if (status) { // wake up status thread if it's asleep statusthread.interrupt(); // at this point we assume all the monitored threads are already gone as per above join loop. try { statusthread.join(); } catch (InterruptedException e) { } } try { workload.cleanup(); } catch (WorkloadException e) { e.printStackTrace(); e.printStackTrace(System.out); System.exit(0); } try { exportMeasurements(props, opsDone, en - st); } catch (IOException e) { System.err.println("Could not export measurements, error: " + e.getMessage()); e.printStackTrace(); System.exit(-1); } System.exit(0); } } diff --git a/core/src/main/java/com/yahoo/ycsb/DBWrapper.java b/core/src/main/java/com/yahoo/ycsb/DBWrapper.java index dae8d784..9882f381 100644 --- a/core/src/main/java/com/yahoo/ycsb/DBWrapper.java +++ b/core/src/main/java/com/yahoo/ycsb/DBWrapper.java @@ -1,171 +1,182 @@ /** * Copyright (c) 2010 Yahoo! Inc. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb; import java.util.HashMap; import java.util.Properties; import java.util.Set; import java.util.Vector; import com.yahoo.ycsb.measurements.Measurements; /** * Wrapper around a "real" DB that measures latencies and counts return codes. */ public class DBWrapper extends DB { DB _db; Measurements _measurements; public DBWrapper(DB db) { _db=db; _measurements=Measurements.getMeasurements(); } /** * Set the properties for this DB. */ public void setProperties(Properties p) { _db.setProperties(p); } /** * Get the set of properties for this DB. */ public Properties getProperties() { return _db.getProperties(); } /** * Initialize any state for this DB. * Called once per DB instance; there is one DB instance per client thread. */ public void init() throws DBException { _db.init(); } /** * Cleanup any state for this DB. * Called once per DB instance; there is one DB instance per client thread. */ public void cleanup() throws DBException { - long st=_measurements.startTimeNs(); + long ist=_measurements.getIntendedtartTimeNs(); + long st = System.nanoTime(); _db.cleanup(); long en=System.nanoTime(); - _measurements.measure("CLEANUP", (int)((en-st)/1000)); + measure(ist, st, en); } /** * Read a record from the database. Each field/value pair from the result will be stored in a HashMap. * * @param table The name of the table * @param key The record key of the record to read. * @param fields The list of fields to read, or null for all of them * @param result A HashMap of field/value pairs for the result * @return Zero on success, a non-zero error code on error */ public int read(String table, String key, Set fields, HashMap result) { - long st=_measurements.startTimeNs(); - int res=_db.read(table,key,fields,result); + long ist=_measurements.getIntendedtartTimeNs(); + long st = System.nanoTime(); + int res=_db.read(table,key,fields,result); long en=System.nanoTime(); - _measurements.measure("READ",(int)((en-st)/1000)); - _measurements.reportReturnCode("READ",res); + measure(ist, st, en); + _measurements.reportReturnCode("READ",res); return res; } /** * Perform a range scan for a set of records in the database. Each field/value pair from the result will be stored in a HashMap. * * @param table The name of the table * @param startkey The record key of the first record to read. * @param recordcount The number of records to read * @param fields The list of fields to read, or null for all of them * @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record * @return Zero on success, a non-zero error code on error */ public int scan(String table, String startkey, int recordcount, Set fields, Vector> result) { - long st=_measurements.startTimeNs(); - int res=_db.scan(table,startkey,recordcount,fields,result); + long ist=_measurements.getIntendedtartTimeNs(); + long st = System.nanoTime(); + int res=_db.scan(table,startkey,recordcount,fields,result); long en=System.nanoTime(); - _measurements.measure("SCAN",(int)((en-st)/1000)); - _measurements.reportReturnCode("SCAN",res); + measure(ist, st, en); + _measurements.reportReturnCode("SCAN",res); return res; } + + private void measure(long intendedStartTimeNanos, long startTimeNanos, long endTimeNanos) { + _measurements.measure("CLEANUP", (int)((endTimeNanos-startTimeNanos)/1000)); + _measurements.measureIntended("CLEANUP", (int)((endTimeNanos-intendedStartTimeNanos)/1000)); + } /** * Update a record in the database. Any field/value pairs in the specified values HashMap will be written into the record with the specified * record key, overwriting any existing values with the same field name. * * @param table The name of the table * @param key The record key of the record to write. * @param values A HashMap of field/value pairs to update in the record * @return Zero on success, a non-zero error code on error */ public int update(String table, String key, HashMap values) { - long st=_measurements.startTimeNs(); + long ist=_measurements.getIntendedtartTimeNs(); + long st = System.nanoTime(); int res=_db.update(table,key,values); long en=System.nanoTime(); - _measurements.measure("UPDATE",(int)((en-st)/1000)); + measure(ist, st, en); _measurements.reportReturnCode("UPDATE",res); return res; } /** * Insert a record in the database. Any field/value pairs in the specified values HashMap will be written into the record with the specified * record key. * * @param table The name of the table * @param key The record key of the record to insert. * @param values A HashMap of field/value pairs to insert in the record * @return Zero on success, a non-zero error code on error */ public int insert(String table, String key, HashMap values) { - long st=_measurements.startTimeNs(); + long ist=_measurements.getIntendedtartTimeNs(); + long st = System.nanoTime(); int res=_db.insert(table,key,values); long en=System.nanoTime(); - _measurements.measure("INSERT",(int)((en-st)/1000)); + measure(ist, st, en); _measurements.reportReturnCode("INSERT",res); return res; } /** * Delete a record from the database. * * @param table The name of the table * @param key The record key of the record to delete. * @return Zero on success, a non-zero error code on error */ public int delete(String table, String key) { - long st=_measurements.startTimeNs(); + long ist=_measurements.getIntendedtartTimeNs(); + long st = System.nanoTime(); int res=_db.delete(table,key); long en=System.nanoTime(); - _measurements.measure("DELETE",(int)((en-st)/1000)); + measure(ist, st, en); _measurements.reportReturnCode("DELETE",res); return res; } } diff --git a/core/src/main/java/com/yahoo/ycsb/measurements/Measurements.java b/core/src/main/java/com/yahoo/ycsb/measurements/Measurements.java index e0223b01..dc2e5f3e 100644 --- a/core/src/main/java/com/yahoo/ycsb/measurements/Measurements.java +++ b/core/src/main/java/com/yahoo/ycsb/measurements/Measurements.java @@ -1,197 +1,243 @@ /** * Copyright (c) 2010 Yahoo! Inc. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb.measurements; import java.io.IOException; -import java.util.HashMap; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import com.yahoo.ycsb.measurements.exporter.MeasurementsExporter; /** * Collects latency measurements, and reports them when requested. * * @author cooperb * */ public class Measurements { public static final String MEASUREMENT_TYPE_PROPERTY = "measurementtype"; private static final String MEASUREMENT_TYPE_PROPERTY_DEFAULT = "histogram"; static Measurements singleton=null; static Properties measurementproperties=null; public static void setProperties(Properties props) { measurementproperties=props; } /** * Return the singleton Measurements object. */ public synchronized static Measurements getMeasurements() { if (singleton==null) { singleton=new Measurements(measurementproperties); } return singleton; } - final ConcurrentHashMap data; - int measurementType=0; + final ConcurrentHashMap opToMesurementMap; + final ConcurrentHashMap opToIntendedMesurementMap; + final int measurementType; private Properties _props; /** * Create a new object with the specified properties. */ public Measurements(Properties props) { - data=new ConcurrentHashMap(); + opToMesurementMap=new ConcurrentHashMap(); + opToIntendedMesurementMap=new ConcurrentHashMap(); _props=props; String mTypeString = _props.getProperty(MEASUREMENT_TYPE_PROPERTY, MEASUREMENT_TYPE_PROPERTY_DEFAULT); if (mTypeString.equals("histogram")) { measurementType=0; } else if (mTypeString.equals("hdrhistogram")) { measurementType=1; + } + else if (mTypeString.equals("hdrhistogram+buckethistogram")) + { + measurementType=2; } else { - measurementType=2; + measurementType=3; } } OneMeasurement constructOneMeasurement(String name) { switch (measurementType) { case 0: return new OneMeasurementHistogram(name, _props); case 1: return new OneMeasurementHdrHistogram(name, _props); - + case 2: + return new TwoInOneMeasurement(name, + new OneMeasurementHdrHistogram("Hdr"+name, _props), + new OneMeasurementHistogram("Bucket"+name, _props)); default: return new OneMeasurementTimeSeries(name, _props); } } static class StartTimeHolder{ long time; long startTime(){ if(time == 0) { return System.nanoTime(); } else { return time; } } } ThreadLocal tls = new ThreadLocal(){ protected StartTimeHolder initialValue() { return new StartTimeHolder(); }; }; - public void setStartTimeNs(long time){ + public void setIntendedStartTimeNs(long time){ tls.get().time=time; } - public long startTimeNs(){ + + public long getIntendedtartTimeNs(){ return tls.get().startTime(); } - /** - * Report a single value of a single metric. E.g. for read latency, operation="READ" and latency is the measured value. - */ + + /** + * Report a single value of a single metric. E.g. for read latency, operation="READ" and latency is the measured + * value. + */ public void measure(String operation, int latency) { - try { - OneMeasurement m = data.get(operation); - if(m == null) { - m = constructOneMeasurement(operation); - OneMeasurement oldM = data.putIfAbsent(operation, m); - if(oldM != null) - { - m = oldM; - } - } + OneMeasurement m = getOpMeasurement(operation); m.measure(latency); } + // This seems like a terribly hacky way to cover up for a bug in the measurement code catch (java.lang.ArrayIndexOutOfBoundsException e) { System.out.println("ERROR: java.lang.ArrayIndexOutOfBoundsException - ignoring and continuing"); e.printStackTrace(); e.printStackTrace(System.out); } } + /** + * Report a single value of a single metric. E.g. for read latency, operation="READ" and latency is the measured + * value. + */ + public void measureIntended(String operation, int latency) + { + try + { + OneMeasurement m = getOpIntendedMeasurement(operation); + m.measure(latency); + } + // This seems like a terribly hacky way to cover up for a bug in the measurement code + catch (java.lang.ArrayIndexOutOfBoundsException e) + { + System.out.println("ERROR: java.lang.ArrayIndexOutOfBoundsException - ignoring and continuing"); + e.printStackTrace(); + e.printStackTrace(System.out); + } + } + private OneMeasurement getOpMeasurement(String operation) { + OneMeasurement m = opToMesurementMap.get(operation); + if(m == null) + { + m = constructOneMeasurement(operation); + OneMeasurement oldM = opToMesurementMap.putIfAbsent(operation, m); + if(oldM != null) + { + m = oldM; + } + } + return m; + } + private OneMeasurement getOpIntendedMeasurement(String operation) { + OneMeasurement m = opToIntendedMesurementMap.get(operation); + if(m == null) + { + m = constructOneMeasurement("Intended-"+operation); + OneMeasurement oldM = opToIntendedMesurementMap.putIfAbsent(operation, m); + if(oldM != null) + { + m = oldM; + } + } + return m; + } /** * Report a return code for a single DB operaiton. */ public void reportReturnCode(String operation, int code) { - if (!data.containsKey(operation)) - { - synchronized(this) - { - if (!data.containsKey(operation)) - { - data.put(operation,constructOneMeasurement(operation)); - } - } - } - data.get(operation).reportReturnCode(code); + OneMeasurement m = getOpMeasurement(operation); + m.reportReturnCode(code); } /** * Export the current measurements to a suitable format. * * @param exporter Exporter representing the type of format to write to. * @throws IOException Thrown if the export failed. */ public void exportMeasurements(MeasurementsExporter exporter) throws IOException { - for (OneMeasurement measurement : data.values()) - { - measurement.exportMeasurements(exporter); - } + for (OneMeasurement measurement : opToMesurementMap.values()) + { + measurement.exportMeasurements(exporter); + } + for (OneMeasurement measurement : opToIntendedMesurementMap.values()) + { + measurement.exportMeasurements(exporter); + } } /** * Return a one line summary of the measurements. */ public synchronized String getSummary() { String ret=""; - for (OneMeasurement m : data.values()) + for (OneMeasurement m : opToMesurementMap.values()) { ret+=m.getSummary()+" "; } - + for (OneMeasurement m : opToIntendedMesurementMap.values()) + { + ret+=m.getSummary()+" "; + } return ret; } } diff --git a/core/src/main/java/com/yahoo/ycsb/measurements/OneMeasurementHdrHistogram.java b/core/src/main/java/com/yahoo/ycsb/measurements/OneMeasurementHdrHistogram.java index 1265a65c..1ed37733 100644 --- a/core/src/main/java/com/yahoo/ycsb/measurements/OneMeasurementHdrHistogram.java +++ b/core/src/main/java/com/yahoo/ycsb/measurements/OneMeasurementHdrHistogram.java @@ -1,134 +1,133 @@ /** * Copyright (c) 2010 Yahoo! Inc. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb.measurements; import java.io.IOException; import java.text.DecimalFormat; import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -import org.HdrHistogram.ConcurrentHistogram; import org.HdrHistogram.Histogram; import org.HdrHistogram.Recorder; import com.yahoo.ycsb.measurements.exporter.MeasurementsExporter; /** - * Take measurements and maintain a histogram of a given metric, such as READ LATENCY. + * Take measurements and maintain a HdrHistogram of a given metric, such as READ LATENCY. * - * @author cooperb + * @author nitsanw * */ public class OneMeasurementHdrHistogram extends OneMeasurement { Recorder histogram = new Recorder(3); final ConcurrentHashMap returncodes; Histogram totalHistogram; public OneMeasurementHdrHistogram(String name, Properties props) { super(name); returncodes = new ConcurrentHashMap(); } /** * No need for synchronization, using CHM to deal with that * * @see com.yahoo.ycsb.OneMeasurement#reportReturnCode(int) */ public void reportReturnCode(int code) { Integer Icode = code; AtomicInteger counter = returncodes.get(Icode); if (counter == null) { AtomicInteger other = returncodes.putIfAbsent(Icode, counter = new AtomicInteger()); if (other != null) { counter = other; } } counter.incrementAndGet(); } /** * It appears latency is reported in micros. - * Using {@link ConcurrentHistogram} to support concurrent updates to histogram. + * Using {@link Recorder} to support concurrent updates to histogram. * * @see com.yahoo.ycsb.OneMeasurement#measure(int) */ public void measure(int latencyInMicros) { histogram.recordValue(latencyInMicros); } /** * This is called from a main thread, on orderly termination. * * @see com.yahoo.ycsb.measurements.OneMeasurement#exportMeasurements(com.yahoo.ycsb.measurements.exporter.MeasurementsExporter) */ @Override public void exportMeasurements(MeasurementsExporter exporter) throws IOException { - Histogram lastIntervalHistogram = histogram.getIntervalHistogram(); - // add this to the total time histogram. - if (totalHistogram == null) { - totalHistogram = lastIntervalHistogram; - } - else { - totalHistogram.add(lastIntervalHistogram); - } + // accumulate the last interval which was not caught by status thread + getIntervalHistogramAndAccumulate(); exporter.write(getName(), "Operations", totalHistogram.getTotalCount()); exporter.write(getName(), "AverageLatency(us)", totalHistogram.getMean()); exporter.write(getName(), "MinLatency(us)", totalHistogram.getMinValue()); exporter.write(getName(), "MaxLatency(us)", totalHistogram.getMaxValue()); exporter.write(getName(), "95thPercentileLatency(ms)", totalHistogram.getValueAtPercentile(90)/1000); exporter.write(getName(), "99thPercentileLatency(ms)", totalHistogram.getValueAtPercentile(99)/1000); for (Map.Entry entry : returncodes.entrySet()) { exporter.write(getName(), "Return=" + entry.getKey(), entry.getValue().get()); } } /** * This is called periodically from the StatusThread. There's a single StatusThread per Client process. + * We optionally serialize the interval to log on this opportunity. * @see com.yahoo.ycsb.measurements.OneMeasurement#getSummary() */ @Override public String getSummary() { - Histogram intervalHistogram = histogram.getIntervalHistogram(); - // add this to the total time histogram. - if (totalHistogram == null) { - totalHistogram = intervalHistogram; - } - else { - totalHistogram.add(intervalHistogram); - } + Histogram intervalHistogram = getIntervalHistogramAndAccumulate(); DecimalFormat d = new DecimalFormat("#.##"); return "[" + getName() + ": Count=" + intervalHistogram.getTotalCount() + ", Max=" + intervalHistogram.getMaxValue() + ", Min=" + intervalHistogram.getMinValue() + ", Avg=" + d.format(intervalHistogram.getMean()) + ", 90=" + d.format(intervalHistogram.getValueAtPercentile(90)) + ", 99=" + d.format(intervalHistogram.getValueAtPercentile(99)) + ", 99.9=" + d.format(intervalHistogram.getValueAtPercentile(99.9)) + ", 99.99=" + d.format(intervalHistogram.getValueAtPercentile(99.99)) +"]"; } + private Histogram getIntervalHistogramAndAccumulate() { + Histogram intervalHistogram = histogram.getIntervalHistogram(); + // add this to the total time histogram. + if (totalHistogram == null) { + totalHistogram = intervalHistogram; + } + else { + totalHistogram.add(intervalHistogram); + } + return intervalHistogram; + } + } diff --git a/core/src/main/java/com/yahoo/ycsb/measurements/OneMeasurementHdrHistogram.java b/core/src/main/java/com/yahoo/ycsb/measurements/TwoInOneMeasurement.java similarity index 53% copy from core/src/main/java/com/yahoo/ycsb/measurements/OneMeasurementHdrHistogram.java copy to core/src/main/java/com/yahoo/ycsb/measurements/TwoInOneMeasurement.java index 1265a65c..8fdabde0 100644 --- a/core/src/main/java/com/yahoo/ycsb/measurements/OneMeasurementHdrHistogram.java +++ b/core/src/main/java/com/yahoo/ycsb/measurements/TwoInOneMeasurement.java @@ -1,134 +1,79 @@ /** * Copyright (c) 2010 Yahoo! Inc. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb.measurements; import java.io.IOException; -import java.text.DecimalFormat; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import org.HdrHistogram.ConcurrentHistogram; -import org.HdrHistogram.Histogram; import org.HdrHistogram.Recorder; import com.yahoo.ycsb.measurements.exporter.MeasurementsExporter; /** - * Take measurements and maintain a histogram of a given metric, such as READ LATENCY. - * - * @author cooperb + * delegates to 2 measuremement instances. + * @author nitsanw * */ -public class OneMeasurementHdrHistogram extends OneMeasurement { - - Recorder histogram = new Recorder(3); - - final ConcurrentHashMap returncodes; - - Histogram totalHistogram; - - public OneMeasurementHdrHistogram(String name, Properties props) { +public class TwoInOneMeasurement extends OneMeasurement { + final OneMeasurement thing1,thing2; + public TwoInOneMeasurement(String name, OneMeasurement thing1,OneMeasurement thing2) { super(name); - returncodes = new ConcurrentHashMap(); + this.thing1 = thing1; + this.thing2 = thing2; } /** * No need for synchronization, using CHM to deal with that * * @see com.yahoo.ycsb.OneMeasurement#reportReturnCode(int) */ public void reportReturnCode(int code) { - Integer Icode = code; - AtomicInteger counter = returncodes.get(Icode); - if (counter == null) { - AtomicInteger other = returncodes.putIfAbsent(Icode, counter = new AtomicInteger()); - if (other != null) { - counter = other; - } - } - - counter.incrementAndGet(); + thing1.reportReturnCode(code); } /** * It appears latency is reported in micros. - * Using {@link ConcurrentHistogram} to support concurrent updates to histogram. + * Using {@link Recorder} to support concurrent updates to histogram. * * @see com.yahoo.ycsb.OneMeasurement#measure(int) */ public void measure(int latencyInMicros) { - histogram.recordValue(latencyInMicros); + thing1.measure(latencyInMicros); + thing2.measure(latencyInMicros); } /** * This is called from a main thread, on orderly termination. * * @see com.yahoo.ycsb.measurements.OneMeasurement#exportMeasurements(com.yahoo.ycsb.measurements.exporter.MeasurementsExporter) */ @Override public void exportMeasurements(MeasurementsExporter exporter) throws IOException { - Histogram lastIntervalHistogram = histogram.getIntervalHistogram(); - // add this to the total time histogram. - if (totalHistogram == null) { - totalHistogram = lastIntervalHistogram; - } - else { - totalHistogram.add(lastIntervalHistogram); - } - exporter.write(getName(), "Operations", totalHistogram.getTotalCount()); - exporter.write(getName(), "AverageLatency(us)", totalHistogram.getMean()); - exporter.write(getName(), "MinLatency(us)", totalHistogram.getMinValue()); - exporter.write(getName(), "MaxLatency(us)", totalHistogram.getMaxValue()); - exporter.write(getName(), "95thPercentileLatency(ms)", totalHistogram.getValueAtPercentile(90)/1000); - exporter.write(getName(), "99thPercentileLatency(ms)", totalHistogram.getValueAtPercentile(99)/1000); - - for (Map.Entry entry : returncodes.entrySet()) { - exporter.write(getName(), "Return=" + entry.getKey(), entry.getValue().get()); - } - + thing1.exportMeasurements(exporter); + thing2.exportMeasurements(exporter); } /** * This is called periodically from the StatusThread. There's a single StatusThread per Client process. + * We optionally serialize the interval to log on this opportunity. * @see com.yahoo.ycsb.measurements.OneMeasurement#getSummary() */ @Override public String getSummary() { - Histogram intervalHistogram = histogram.getIntervalHistogram(); - // add this to the total time histogram. - if (totalHistogram == null) { - totalHistogram = intervalHistogram; - } - else { - totalHistogram.add(intervalHistogram); - } - DecimalFormat d = new DecimalFormat("#.##"); - return "[" + getName() + - ": Count=" + intervalHistogram.getTotalCount() + - ", Max=" + intervalHistogram.getMaxValue() + - ", Min=" + intervalHistogram.getMinValue() + - ", Avg=" + d.format(intervalHistogram.getMean()) + - ", 90=" + d.format(intervalHistogram.getValueAtPercentile(90)) + - ", 99=" + d.format(intervalHistogram.getValueAtPercentile(99)) + - ", 99.9=" + d.format(intervalHistogram.getValueAtPercentile(99.9)) + - ", 99.99=" + d.format(intervalHistogram.getValueAtPercentile(99.99)) +"]"; + return thing1.getSummary() + "\n" + thing2.getSummary(); } - } diff --git a/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java b/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java index ec63c96f..5e65c18c 100644 --- a/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java +++ b/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java @@ -1,762 +1,765 @@ /** * Copyright (c) 2010 Yahoo! Inc. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb.workloads; import java.util.Properties; + import com.yahoo.ycsb.*; import com.yahoo.ycsb.generator.CounterGenerator; import com.yahoo.ycsb.generator.DiscreteGenerator; import com.yahoo.ycsb.generator.ExponentialGenerator; import com.yahoo.ycsb.generator.Generator; import com.yahoo.ycsb.generator.ConstantIntegerGenerator; import com.yahoo.ycsb.generator.HotspotIntegerGenerator; import com.yahoo.ycsb.generator.HistogramGenerator; import com.yahoo.ycsb.generator.IntegerGenerator; import com.yahoo.ycsb.generator.ScrambledZipfianGenerator; import com.yahoo.ycsb.generator.SkewedLatestGenerator; import com.yahoo.ycsb.generator.UniformIntegerGenerator; import com.yahoo.ycsb.generator.ZipfianGenerator; import com.yahoo.ycsb.measurements.Measurements; import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.Vector; import java.util.List; import java.util.Map; import java.util.ArrayList; /** * The core benchmark scenario. Represents a set of clients doing simple CRUD operations. The relative * proportion of different kinds of operations, and other properties of the workload, are controlled * by parameters specified at runtime. * * Properties to control the client: *
    *
  • fieldcount: the number of fields in a record (default: 10) *
  • fieldlength: the size of each field (default: 100) *
  • readallfields: should reads read all fields (true) or just one (false) (default: true) *
  • writeallfields: should updates and read/modify/writes update all fields (true) or just one (false) (default: false) *
  • readproportion: what proportion of operations should be reads (default: 0.95) *
  • updateproportion: what proportion of operations should be updates (default: 0.05) *
  • insertproportion: what proportion of operations should be inserts (default: 0) *
  • scanproportion: what proportion of operations should be scans (default: 0) *
  • readmodifywriteproportion: what proportion of operations should be read a record, modify it, write it back (default: 0) *
  • requestdistribution: what distribution should be used to select the records to operate on - uniform, zipfian, hotspot, or latest (default: uniform) *
  • maxscanlength: for scans, what is the maximum number of records to scan (default: 1000) *
  • scanlengthdistribution: for scans, what distribution should be used to choose the number of records to scan, for each scan, between 1 and maxscanlength (default: uniform) *
  • insertorder: should records be inserted in order by key ("ordered"), or in hashed order ("hashed") (default: hashed) *
*/ public class CoreWorkload extends Workload { /** * The name of the database table to run queries against. */ public static final String TABLENAME_PROPERTY="table"; /** * The default name of the database table to run queries against. */ public static final String TABLENAME_PROPERTY_DEFAULT="usertable"; public static String table; /** * The name of the property for the number of fields in a record. */ public static final String FIELD_COUNT_PROPERTY="fieldcount"; /** * Default number of fields in a record. */ public static final String FIELD_COUNT_PROPERTY_DEFAULT="10"; int fieldcount; private List fieldnames; /** * The name of the property for the field length distribution. Options are "uniform", "zipfian" (favoring short records), "constant", and "histogram". * * If "uniform", "zipfian" or "constant", the maximum field length will be that specified by the fieldlength property. If "histogram", then the * histogram will be read from the filename specified in the "fieldlengthhistogram" property. */ public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY="fieldlengthdistribution"; /** * The default field length distribution. */ public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT = "constant"; /** * The name of the property for the length of a field in bytes. */ public static final String FIELD_LENGTH_PROPERTY="fieldlength"; /** * The default maximum length of a field in bytes. */ public static final String FIELD_LENGTH_PROPERTY_DEFAULT="100"; /** * The name of a property that specifies the filename containing the field length histogram (only used if fieldlengthdistribution is "histogram"). */ public static final String FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY = "fieldlengthhistogram"; /** * The default filename containing a field length histogram. */ public static final String FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY_DEFAULT = "hist.txt"; /** * Generator object that produces field lengths. The value of this depends on the properties that start with "FIELD_LENGTH_". */ IntegerGenerator fieldlengthgenerator; /** * The name of the property for deciding whether to read one field (false) or all fields (true) of a record. */ public static final String READ_ALL_FIELDS_PROPERTY="readallfields"; /** * The default value for the readallfields property. */ public static final String READ_ALL_FIELDS_PROPERTY_DEFAULT="true"; boolean readallfields; /** * The name of the property for deciding whether to write one field (false) or all fields (true) of a record. */ public static final String WRITE_ALL_FIELDS_PROPERTY="writeallfields"; /** * The default value for the writeallfields property. */ public static final String WRITE_ALL_FIELDS_PROPERTY_DEFAULT="false"; boolean writeallfields; /** * The name of the property for deciding whether to check all returned * data against the formation template to ensure data integrity. */ public static final String DATA_INTEGRITY_PROPERTY = "dataintegrity"; /** * The default value for the dataintegrity property. */ public static final String DATA_INTEGRITY_PROPERTY_DEFAULT = "false"; /** * Set to true if want to check correctness of reads. Must also * be set to true during loading phase to function. */ private boolean dataintegrity; /** * Response values for data integrity checks. * Need to be multiples of 1000 to match bucket offsets of * measurements/OneMeasurementHistogram.java. */ private final int DATA_INT_MATCH = 0; private final int DATA_INT_DEVIATE = 1000; private final int DATA_INT_UNEXPECTED_NULL = 2000; /** * The name of the property for the proportion of transactions that are reads. */ public static final String READ_PROPORTION_PROPERTY="readproportion"; /** * The default proportion of transactions that are reads. */ public static final String READ_PROPORTION_PROPERTY_DEFAULT="0.95"; /** * The name of the property for the proportion of transactions that are updates. */ public static final String UPDATE_PROPORTION_PROPERTY="updateproportion"; /** * The default proportion of transactions that are updates. */ public static final String UPDATE_PROPORTION_PROPERTY_DEFAULT="0.05"; /** * The name of the property for the proportion of transactions that are inserts. */ public static final String INSERT_PROPORTION_PROPERTY="insertproportion"; /** * The default proportion of transactions that are inserts. */ public static final String INSERT_PROPORTION_PROPERTY_DEFAULT="0.0"; /** * The name of the property for the proportion of transactions that are scans. */ public static final String SCAN_PROPORTION_PROPERTY="scanproportion"; /** * The default proportion of transactions that are scans. */ public static final String SCAN_PROPORTION_PROPERTY_DEFAULT="0.0"; /** * The name of the property for the proportion of transactions that are read-modify-write. */ public static final String READMODIFYWRITE_PROPORTION_PROPERTY="readmodifywriteproportion"; /** * The default proportion of transactions that are scans. */ public static final String READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT="0.0"; /** * The name of the property for the the distribution of requests across the keyspace. Options are "uniform", "zipfian" and "latest" */ public static final String REQUEST_DISTRIBUTION_PROPERTY="requestdistribution"; /** * The default distribution of requests across the keyspace */ public static final String REQUEST_DISTRIBUTION_PROPERTY_DEFAULT="uniform"; /** * The name of the property for the max scan length (number of records) */ public static final String MAX_SCAN_LENGTH_PROPERTY="maxscanlength"; /** * The default max scan length. */ public static final String MAX_SCAN_LENGTH_PROPERTY_DEFAULT="1000"; /** * The name of the property for the scan length distribution. Options are "uniform" and "zipfian" (favoring short scans) */ public static final String SCAN_LENGTH_DISTRIBUTION_PROPERTY="scanlengthdistribution"; /** * The default max scan length. */ public static final String SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT="uniform"; /** * The name of the property for the order to insert records. Options are "ordered" or "hashed" */ public static final String INSERT_ORDER_PROPERTY="insertorder"; /** * Default insert order. */ public static final String INSERT_ORDER_PROPERTY_DEFAULT="hashed"; /** * Percentage data items that constitute the hot set. */ public static final String HOTSPOT_DATA_FRACTION = "hotspotdatafraction"; /** * Default value of the size of the hot set. */ public static final String HOTSPOT_DATA_FRACTION_DEFAULT = "0.2"; /** * Percentage operations that access the hot set. */ public static final String HOTSPOT_OPN_FRACTION = "hotspotopnfraction"; /** * Default value of the percentage operations accessing the hot set. */ public static final String HOTSPOT_OPN_FRACTION_DEFAULT = "0.8"; IntegerGenerator keysequence; DiscreteGenerator operationchooser; IntegerGenerator keychooser; Generator fieldchooser; CounterGenerator transactioninsertkeysequence; IntegerGenerator scanlength; boolean orderedinserts; int recordcount; private Measurements _measurements = Measurements.getMeasurements(); protected static IntegerGenerator getFieldLengthGenerator(Properties p) throws WorkloadException{ IntegerGenerator fieldlengthgenerator; String fieldlengthdistribution = p.getProperty(FIELD_LENGTH_DISTRIBUTION_PROPERTY, FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT); int fieldlength=Integer.parseInt(p.getProperty(FIELD_LENGTH_PROPERTY,FIELD_LENGTH_PROPERTY_DEFAULT)); String fieldlengthhistogram = p.getProperty(FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY, FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY_DEFAULT); if(fieldlengthdistribution.compareTo("constant") == 0) { fieldlengthgenerator = new ConstantIntegerGenerator(fieldlength); } else if(fieldlengthdistribution.compareTo("uniform") == 0) { fieldlengthgenerator = new UniformIntegerGenerator(1, fieldlength); } else if(fieldlengthdistribution.compareTo("zipfian") == 0) { fieldlengthgenerator = new ZipfianGenerator(1, fieldlength); } else if(fieldlengthdistribution.compareTo("histogram") == 0) { try { fieldlengthgenerator = new HistogramGenerator(fieldlengthhistogram); } catch(IOException e) { throw new WorkloadException("Couldn't read field length histogram file: "+fieldlengthhistogram, e); } } else { throw new WorkloadException("Unknown field length distribution \""+fieldlengthdistribution+"\""); } return fieldlengthgenerator; } /** * Initialize the scenario. * Called once, in the main client thread, before any operations are started. */ public void init(Properties p) throws WorkloadException { table = p.getProperty(TABLENAME_PROPERTY,TABLENAME_PROPERTY_DEFAULT); fieldcount=Integer.parseInt(p.getProperty(FIELD_COUNT_PROPERTY,FIELD_COUNT_PROPERTY_DEFAULT)); fieldnames = new ArrayList(); for (int i = 0; i < fieldcount; i++) { fieldnames.add("field" + i); } fieldlengthgenerator = CoreWorkload.getFieldLengthGenerator(p); double readproportion=Double.parseDouble(p.getProperty(READ_PROPORTION_PROPERTY,READ_PROPORTION_PROPERTY_DEFAULT)); double updateproportion=Double.parseDouble(p.getProperty(UPDATE_PROPORTION_PROPERTY,UPDATE_PROPORTION_PROPERTY_DEFAULT)); double insertproportion=Double.parseDouble(p.getProperty(INSERT_PROPORTION_PROPERTY,INSERT_PROPORTION_PROPERTY_DEFAULT)); double scanproportion=Double.parseDouble(p.getProperty(SCAN_PROPORTION_PROPERTY,SCAN_PROPORTION_PROPERTY_DEFAULT)); double readmodifywriteproportion=Double.parseDouble(p.getProperty(READMODIFYWRITE_PROPORTION_PROPERTY,READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT)); recordcount=Integer.parseInt(p.getProperty(Client.RECORD_COUNT_PROPERTY, Client.DEFAULT_RECORD_COUNT)); String requestdistrib=p.getProperty(REQUEST_DISTRIBUTION_PROPERTY,REQUEST_DISTRIBUTION_PROPERTY_DEFAULT); int maxscanlength=Integer.parseInt(p.getProperty(MAX_SCAN_LENGTH_PROPERTY,MAX_SCAN_LENGTH_PROPERTY_DEFAULT)); String scanlengthdistrib=p.getProperty(SCAN_LENGTH_DISTRIBUTION_PROPERTY,SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT); int insertstart=Integer.parseInt(p.getProperty(INSERT_START_PROPERTY,INSERT_START_PROPERTY_DEFAULT)); readallfields=Boolean.parseBoolean(p.getProperty(READ_ALL_FIELDS_PROPERTY,READ_ALL_FIELDS_PROPERTY_DEFAULT)); writeallfields=Boolean.parseBoolean(p.getProperty(WRITE_ALL_FIELDS_PROPERTY,WRITE_ALL_FIELDS_PROPERTY_DEFAULT)); dataintegrity = Boolean.parseBoolean(p.getProperty(DATA_INTEGRITY_PROPERTY, DATA_INTEGRITY_PROPERTY_DEFAULT)); //Confirm that fieldlengthgenerator returns a constant if data //integrity check requested. if (dataintegrity && !(p.getProperty(FIELD_LENGTH_DISTRIBUTION_PROPERTY, FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT)).equals("constant")) { System.err.println("Must have constant field size to check data integrity."); System.exit(-1); } if (p.getProperty(INSERT_ORDER_PROPERTY,INSERT_ORDER_PROPERTY_DEFAULT).compareTo("hashed")==0) { orderedinserts=false; } else if (requestdistrib.compareTo("exponential")==0) { double percentile = Double.parseDouble(p.getProperty(ExponentialGenerator.EXPONENTIAL_PERCENTILE_PROPERTY, ExponentialGenerator.EXPONENTIAL_PERCENTILE_DEFAULT)); double frac = Double.parseDouble(p.getProperty(ExponentialGenerator.EXPONENTIAL_FRAC_PROPERTY, ExponentialGenerator.EXPONENTIAL_FRAC_DEFAULT)); keychooser = new ExponentialGenerator(percentile, recordcount*frac); } else { orderedinserts=true; } keysequence=new CounterGenerator(insertstart); operationchooser=new DiscreteGenerator(); if (readproportion>0) { operationchooser.addValue(readproportion,"READ"); } if (updateproportion>0) { operationchooser.addValue(updateproportion,"UPDATE"); } if (insertproportion>0) { operationchooser.addValue(insertproportion,"INSERT"); } if (scanproportion>0) { operationchooser.addValue(scanproportion,"SCAN"); } if (readmodifywriteproportion>0) { operationchooser.addValue(readmodifywriteproportion,"READMODIFYWRITE"); } transactioninsertkeysequence=new CounterGenerator(recordcount); if (requestdistrib.compareTo("uniform")==0) { keychooser=new UniformIntegerGenerator(0,recordcount-1); } else if (requestdistrib.compareTo("zipfian")==0) { //it does this by generating a random "next key" in part by taking the modulus over the number of keys //if the number of keys changes, this would shift the modulus, and we don't want that to change which keys are popular //so we'll actually construct the scrambled zipfian generator with a keyspace that is larger than exists at the beginning //of the test. that is, we'll predict the number of inserts, and tell the scrambled zipfian generator the number of existing keys //plus the number of predicted keys as the total keyspace. then, if the generator picks a key that hasn't been inserted yet, will //just ignore it and pick another key. this way, the size of the keyspace doesn't change from the perspective of the scrambled zipfian generator int opcount=Integer.parseInt(p.getProperty(Client.OPERATION_COUNT_PROPERTY)); int expectednewkeys=(int)(((double)opcount)*insertproportion*2.0); //2 is fudge factor keychooser=new ScrambledZipfianGenerator(recordcount+expectednewkeys); } else if (requestdistrib.compareTo("latest")==0) { keychooser=new SkewedLatestGenerator(transactioninsertkeysequence); } else if (requestdistrib.equals("hotspot")) { double hotsetfraction = Double.parseDouble(p.getProperty( HOTSPOT_DATA_FRACTION, HOTSPOT_DATA_FRACTION_DEFAULT)); double hotopnfraction = Double.parseDouble(p.getProperty( HOTSPOT_OPN_FRACTION, HOTSPOT_OPN_FRACTION_DEFAULT)); keychooser = new HotspotIntegerGenerator(0, recordcount - 1, hotsetfraction, hotopnfraction); } else { throw new WorkloadException("Unknown request distribution \""+requestdistrib+"\""); } fieldchooser=new UniformIntegerGenerator(0,fieldcount-1); if (scanlengthdistrib.compareTo("uniform")==0) { scanlength=new UniformIntegerGenerator(1,maxscanlength); } else if (scanlengthdistrib.compareTo("zipfian")==0) { scanlength=new ZipfianGenerator(1,maxscanlength); } else { throw new WorkloadException("Distribution \""+scanlengthdistrib+"\" not allowed for scan length"); } } public String buildKeyName(long keynum) { if (!orderedinserts) { keynum=Utils.hash(keynum); } return "user"+keynum; } /** * Builds a value for a randomly chosen field. */ private HashMap buildSingleValue(String key) { HashMap value = new HashMap(); String fieldkey = fieldnames.get(Integer.parseInt(fieldchooser.nextString())); ByteIterator data; if (dataintegrity) { data = new StringByteIterator(buildDeterministicValue(key, fieldkey)); } else { //fill with random data data = new RandomByteIterator(fieldlengthgenerator.nextInt()); } value.put(fieldkey,data); return value; } /** * Builds values for all fields. */ private HashMap buildValues(String key) { HashMap values = new HashMap(); for (String fieldkey : fieldnames) { ByteIterator data; if (dataintegrity) { data = new StringByteIterator(buildDeterministicValue(key, fieldkey)); } else { //fill with random data data = new RandomByteIterator(fieldlengthgenerator.nextInt()); } values.put(fieldkey,data); } return values; } /** * Build a deterministic value given the key information. */ private String buildDeterministicValue(String key, String fieldkey) { int size = fieldlengthgenerator.nextInt(); StringBuilder sb = new StringBuilder(size); sb.append(key); sb.append(':'); sb.append(fieldkey); while (sb.length() < size) { sb.append(':'); sb.append(sb.toString().hashCode()); } sb.setLength(size); return sb.toString(); } /** * Do one insert operation. Because it will be called concurrently from multiple client threads, this * function must be thread safe. However, avoid synchronized, or the threads will block waiting for each * other, and it will be difficult to reach the target throughput. Ideally, this function would have no side * effects other than DB operations. */ public boolean doInsert(DB db, Object threadstate) { int keynum=keysequence.nextInt(); String dbkey = buildKeyName(keynum); HashMap values = buildValues(dbkey); if (db.insert(table,dbkey,values) == 0) return true; else return false; } /** * Do one transaction operation. Because it will be called concurrently from multiple client threads, this * function must be thread safe. However, avoid synchronized, or the threads will block waiting for each * other, and it will be difficult to reach the target throughput. Ideally, this function would have no side * effects other than DB operations. */ public boolean doTransaction(DB db, Object threadstate) { String op=operationchooser.nextString(); if (op.compareTo("READ")==0) { doTransactionRead(db); } else if (op.compareTo("UPDATE")==0) { doTransactionUpdate(db); } else if (op.compareTo("INSERT")==0) { doTransactionInsert(db); } else if (op.compareTo("SCAN")==0) { doTransactionScan(db); } else { doTransactionReadModifyWrite(db); } return true; } /** * Results are reported in the first three buckets of the histogram under * the label "VERIFY". * Bucket 0 means the expected data was returned. * Bucket 1 means incorrect data was returned. * Bucket 2 means null data was returned when some data was expected. */ protected void verifyRow(String key, HashMap cells) { int matchType = DATA_INT_MATCH; if (!cells.isEmpty()) { for (Map.Entry entry : cells.entrySet()) { if (!entry.getValue().toString().equals( buildDeterministicValue(key, entry.getKey()))) { matchType = DATA_INT_DEVIATE; break; } } } else { //This assumes that null data is never valid matchType = DATA_INT_UNEXPECTED_NULL; } Measurements.getMeasurements().measure("VERIFY", matchType); } int nextKeynum() { int keynum; if(keychooser instanceof ExponentialGenerator) { do { keynum=transactioninsertkeysequence.lastInt() - keychooser.nextInt(); } while(keynum < 0); } else { do { keynum=keychooser.nextInt(); } while (keynum > transactioninsertkeysequence.lastInt()); } return keynum; } public void doTransactionRead(DB db) { //choose a random key int keynum = nextKeynum(); String keyname = buildKeyName(keynum); HashSet fields=null; if (!readallfields) { //read a random field String fieldname=fieldnames.get(Integer.parseInt(fieldchooser.nextString())); fields=new HashSet(); fields.add(fieldname); } HashMap cells = new HashMap(); db.read(table,keyname,fields,cells); if (dataintegrity) { verifyRow(keyname, cells); } } public void doTransactionReadModifyWrite(DB db) { //choose a random key int keynum = nextKeynum(); String keyname = buildKeyName(keynum); HashSet fields=null; if (!readallfields) { //read a random field String fieldname=fieldnames.get(Integer.parseInt(fieldchooser.nextString())); fields=new HashSet(); fields.add(fieldname); } HashMap values; if (writeallfields) { //new data for all the fields values = buildValues(keyname); } else { //update a random field values = buildSingleValue(keyname); } //do the transaction HashMap cells = new HashMap(); - long st=_measurements.startTimeNs(); - + + long ist=_measurements.getIntendedtartTimeNs(); + long st = System.nanoTime(); db.read(table,keyname,fields,cells); db.update(table,keyname,values); long en=System.nanoTime(); if (dataintegrity) { verifyRow(keyname, cells); } _measurements .measure("READ-MODIFY-WRITE", (int)((en-st)/1000)); + _measurements .measureIntended("READ-MODIFY-WRITE", (int)((en-ist)/1000)); } public void doTransactionScan(DB db) { //choose a random key int keynum = nextKeynum(); String startkeyname = buildKeyName(keynum); //choose a random scan length int len=scanlength.nextInt(); HashSet fields=null; if (!readallfields) { //read a random field String fieldname=fieldnames.get(Integer.parseInt(fieldchooser.nextString())); fields=new HashSet(); fields.add(fieldname); } db.scan(table,startkeyname,len,fields,new Vector>()); } public void doTransactionUpdate(DB db) { //choose a random key int keynum = nextKeynum(); String keyname=buildKeyName(keynum); HashMap values; if (writeallfields) { //new data for all the fields values = buildValues(keyname); } else { //update a random field values = buildSingleValue(keyname); } db.update(table,keyname,values); } public void doTransactionInsert(DB db) { //choose the next key int keynum=transactioninsertkeysequence.nextInt(); String dbkey = buildKeyName(keynum); HashMap values = buildValues(dbkey); db.insert(table,dbkey,values); } }