diff --git a/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java b/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java
index 08d9b7b1..7974bcfe 100644
--- a/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java
+++ b/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java
@@ -1,772 +1,765 @@
/**
 * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License. You
 * may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License. See accompanying
 * LICENSE file.
 */

package com.yahoo.ycsb.workloads;

import java.util.Properties;

import com.yahoo.ycsb.*;
import com.yahoo.ycsb.generator.AcknowledgedCounterGenerator;
import com.yahoo.ycsb.generator.CounterGenerator;
import com.yahoo.ycsb.generator.DiscreteGenerator;
import com.yahoo.ycsb.generator.ExponentialGenerator;
import com.yahoo.ycsb.generator.Generator;
import com.yahoo.ycsb.generator.ConstantIntegerGenerator;
import com.yahoo.ycsb.generator.HotspotIntegerGenerator;
import com.yahoo.ycsb.generator.HistogramGenerator;
import com.yahoo.ycsb.generator.IntegerGenerator;
import com.yahoo.ycsb.generator.ScrambledZipfianGenerator;
import com.yahoo.ycsb.generator.SkewedLatestGenerator;
import com.yahoo.ycsb.generator.UniformIntegerGenerator;
import com.yahoo.ycsb.generator.ZipfianGenerator;
import com.yahoo.ycsb.measurements.Measurements;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Vector;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;

/**
 * The core benchmark scenario. Represents a set of clients doing simple CRUD operations. The relative
 * proportion of different kinds of operations, and other properties of the workload, are controlled
 * by parameters specified at runtime.
 *
 * Properties to control the client:
 */
public class CoreWorkload extends Workload {
  /**
   * The name of the database table to run queries against.
   */
  public static final String TABLENAME_PROPERTY="table";

  /**
   * The default name of the database table to run queries against.
   */
  public static final String TABLENAME_PROPERTY_DEFAULT="usertable";

  public static String table;

  /**
   * The name of the property for the number of fields in a record.
   */
  public static final String FIELD_COUNT_PROPERTY="fieldcount";

  /**
   * Default number of fields in a record.
   */
  public static final String FIELD_COUNT_PROPERTY_DEFAULT="10";

  int fieldcount;

  private List<String> fieldnames;

  /**
   * The name of the property for the field length distribution. Options are "uniform", "zipfian"
   * (favoring short records), "constant", and "histogram".
   *
   * If "uniform", "zipfian" or "constant", the maximum field length will be that specified by the
   * fieldlength property. If "histogram", then the histogram will be read from the filename
   * specified in the "fieldlengthhistogram" property.
   */
  public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY="fieldlengthdistribution";

  /**
   * The default field length distribution.
   */
  public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT = "constant";
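To make the field-length knobs concrete: they are ordinary workload properties, consumed by getFieldLengthGenerator() further down. A minimal sketch of supplying them programmatically, with illustrative (non-default) values:

    // Illustrative values only; in practice these usually live in a YCSB workload file.
    Properties p = new Properties();
    p.setProperty(CoreWorkload.FIELD_LENGTH_DISTRIBUTION_PROPERTY, "zipfian"); // favors short fields
    p.setProperty(CoreWorkload.FIELD_LENGTH_PROPERTY, "100");                  // upper bound, in bytes
    // For "histogram", set FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY to the histogram file path instead.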
  /**
   * The name of the property for the length of a field in bytes.
   */
  public static final String FIELD_LENGTH_PROPERTY="fieldlength";

  /**
   * The default maximum length of a field in bytes.
   */
  public static final String FIELD_LENGTH_PROPERTY_DEFAULT="100";

  /**
   * The name of a property that specifies the filename containing the field length histogram
   * (only used if fieldlengthdistribution is "histogram").
   */
  public static final String FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY = "fieldlengthhistogram";

  /**
   * The default filename containing a field length histogram.
   */
  public static final String FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY_DEFAULT = "hist.txt";

  /**
   * Generator object that produces field lengths. The value of this depends on the properties that
   * start with "FIELD_LENGTH_".
   */
  IntegerGenerator fieldlengthgenerator;

  /**
   * The name of the property for deciding whether to read one field (false) or all fields (true) of a record.
   */
  public static final String READ_ALL_FIELDS_PROPERTY="readallfields";

  /**
   * The default value for the readallfields property.
   */
  public static final String READ_ALL_FIELDS_PROPERTY_DEFAULT="true";

  boolean readallfields;

  /**
   * The name of the property for deciding whether to write one field (false) or all fields (true) of a record.
   */
  public static final String WRITE_ALL_FIELDS_PROPERTY="writeallfields";

  /**
   * The default value for the writeallfields property.
   */
  public static final String WRITE_ALL_FIELDS_PROPERTY_DEFAULT="false";

  boolean writeallfields;

  /**
   * The name of the property for deciding whether to check all returned
   * data against the formation template to ensure data integrity.
   */
  public static final String DATA_INTEGRITY_PROPERTY = "dataintegrity";

  /**
   * The default value for the dataintegrity property.
   */
  public static final String DATA_INTEGRITY_PROPERTY_DEFAULT = "false";

  /**
   * Set to true if we want to check the correctness of reads. Must also
   * be set to true during the loading phase to function.
   */
  private boolean dataintegrity;

-  /**
-   * Response values for data integrity checks.
-   * Need to be multiples of 1000 to match bucket offsets of
-   * measurements/OneMeasurementHistogram.java.
-   */
-  private final int DATA_INT_MATCH = 0;
-  private final int DATA_INT_DEVIATE = 1000;
-  private final int DATA_INT_UNEXPECTED_NULL = 2000;
-
  /**
   * The name of the property for the proportion of transactions that are reads.
   */
  public static final String READ_PROPORTION_PROPERTY="readproportion";

  /**
   * The default proportion of transactions that are reads.
   */
  public static final String READ_PROPORTION_PROPERTY_DEFAULT="0.95";

  /**
   * The name of the property for the proportion of transactions that are updates.
   */
  public static final String UPDATE_PROPORTION_PROPERTY="updateproportion";

  /**
   * The default proportion of transactions that are updates.
   */
  public static final String UPDATE_PROPORTION_PROPERTY_DEFAULT="0.05";

  /**
   * The name of the property for the proportion of transactions that are inserts.
   */
  public static final String INSERT_PROPORTION_PROPERTY="insertproportion";

  /**
   * The default proportion of transactions that are inserts.
   */
  public static final String INSERT_PROPORTION_PROPERTY_DEFAULT="0.0";

  /**
   * The name of the property for the proportion of transactions that are scans.
   */
  public static final String SCAN_PROPORTION_PROPERTY="scanproportion";

  /**
   * The default proportion of transactions that are scans.
   */
  public static final String SCAN_PROPORTION_PROPERTY_DEFAULT="0.0";
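The proportion properties are relative weights over the operation mix: init() only registers a proportion that is greater than zero, and the DiscreteGenerator it builds draws each operation with probability proportional to its weight, so the weights need not sum to exactly 1. A hypothetical 50/50 read/update mix, for illustration:

    // Illustrative only: an update-heavy mix.
    Properties p = new Properties();
    p.setProperty(CoreWorkload.READ_PROPORTION_PROPERTY, "0.5");
    p.setProperty(CoreWorkload.UPDATE_PROPORTION_PROPERTY, "0.5");
    p.setProperty(CoreWorkload.INSERT_PROPORTION_PROPERTY, "0");
    p.setProperty(CoreWorkload.SCAN_PROPORTION_PROPERTY, "0");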
  /**
   * The name of the property for the proportion of transactions that are read-modify-write.
   */
  public static final String READMODIFYWRITE_PROPORTION_PROPERTY="readmodifywriteproportion";

  /**
   * The default proportion of transactions that are read-modify-write.
   */
  public static final String READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT="0.0";

  /**
   * The name of the property for the distribution of requests across the keyspace. Options are
   * "uniform", "zipfian", "latest", "hotspot" and "exponential".
   */
  public static final String REQUEST_DISTRIBUTION_PROPERTY="requestdistribution";

  /**
   * The default distribution of requests across the keyspace.
   */
  public static final String REQUEST_DISTRIBUTION_PROPERTY_DEFAULT="uniform";

  /**
   * The name of the property for the max scan length (number of records).
   */
  public static final String MAX_SCAN_LENGTH_PROPERTY="maxscanlength";

  /**
   * The default max scan length.
   */
  public static final String MAX_SCAN_LENGTH_PROPERTY_DEFAULT="1000";

  /**
   * The name of the property for the scan length distribution. Options are "uniform" and "zipfian"
   * (favoring short scans).
   */
  public static final String SCAN_LENGTH_DISTRIBUTION_PROPERTY="scanlengthdistribution";

  /**
   * The default scan length distribution.
   */
  public static final String SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT="uniform";

  /**
   * The name of the property for the order to insert records. Options are "ordered" or "hashed".
   */
  public static final String INSERT_ORDER_PROPERTY="insertorder";

  /**
   * Default insert order.
   */
  public static final String INSERT_ORDER_PROPERTY_DEFAULT="hashed";

  /**
   * The percentage of data items that constitute the hot set.
   */
  public static final String HOTSPOT_DATA_FRACTION = "hotspotdatafraction";

  /**
   * Default value of the size of the hot set.
   */
  public static final String HOTSPOT_DATA_FRACTION_DEFAULT = "0.2";

  /**
   * The percentage of operations that access the hot set.
   */
  public static final String HOTSPOT_OPN_FRACTION = "hotspotopnfraction";

  /**
   * Default value of the percentage of operations accessing the hot set.
   */
  public static final String HOTSPOT_OPN_FRACTION_DEFAULT = "0.8";
  IntegerGenerator keysequence;

  DiscreteGenerator operationchooser;

  IntegerGenerator keychooser;

  Generator fieldchooser;

  AcknowledgedCounterGenerator transactioninsertkeysequence;

  IntegerGenerator scanlength;

  boolean orderedinserts;

  int recordcount;

  private Measurements _measurements = Measurements.getMeasurements();

  protected static IntegerGenerator getFieldLengthGenerator(Properties p) throws WorkloadException {
    IntegerGenerator fieldlengthgenerator;
    String fieldlengthdistribution = p.getProperty(FIELD_LENGTH_DISTRIBUTION_PROPERTY, FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT);
    int fieldlength=Integer.parseInt(p.getProperty(FIELD_LENGTH_PROPERTY,FIELD_LENGTH_PROPERTY_DEFAULT));
    String fieldlengthhistogram = p.getProperty(FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY, FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY_DEFAULT);
    if(fieldlengthdistribution.compareTo("constant") == 0) {
      fieldlengthgenerator = new ConstantIntegerGenerator(fieldlength);
    } else if(fieldlengthdistribution.compareTo("uniform") == 0) {
      fieldlengthgenerator = new UniformIntegerGenerator(1, fieldlength);
    } else if(fieldlengthdistribution.compareTo("zipfian") == 0) {
      fieldlengthgenerator = new ZipfianGenerator(1, fieldlength);
    } else if(fieldlengthdistribution.compareTo("histogram") == 0) {
      try {
        fieldlengthgenerator = new HistogramGenerator(fieldlengthhistogram);
      } catch(IOException e) {
        throw new WorkloadException("Couldn't read field length histogram file: "+fieldlengthhistogram, e);
      }
    } else {
      throw new WorkloadException("Unknown field length distribution \""+fieldlengthdistribution+"\"");
    }
    return fieldlengthgenerator;
  }
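As a quick usage sketch of the factory above (property values are hypothetical; the method is protected static, so this assumes a caller in the same package or a subclass, and it throws WorkloadException on an unknown distribution):

    Properties p = new Properties();
    p.setProperty(CoreWorkload.FIELD_LENGTH_DISTRIBUTION_PROPERTY, "uniform"); // example value
    p.setProperty(CoreWorkload.FIELD_LENGTH_PROPERTY, "200");                  // example value

    // Yields a UniformIntegerGenerator over [1, 200]; each call draws a fresh length.
    IntegerGenerator lengths = CoreWorkload.getFieldLengthGenerator(p);
    int len = lengths.nextInt();

  /**
   * Initialize the scenario.
   * Called once, in the main client thread, before any operations are started.
   */
  public void init(Properties p) throws WorkloadException {
    table = p.getProperty(TABLENAME_PROPERTY,TABLENAME_PROPERTY_DEFAULT);

    fieldcount=Integer.parseInt(p.getProperty(FIELD_COUNT_PROPERTY,FIELD_COUNT_PROPERTY_DEFAULT));
    fieldnames = new ArrayList<String>();
    for (int i = 0; i < fieldcount; i++) {
      fieldnames.add("field" + i);
    }
    fieldlengthgenerator = CoreWorkload.getFieldLengthGenerator(p);

    double readproportion=Double.parseDouble(p.getProperty(READ_PROPORTION_PROPERTY,READ_PROPORTION_PROPERTY_DEFAULT));
    double updateproportion=Double.parseDouble(p.getProperty(UPDATE_PROPORTION_PROPERTY,UPDATE_PROPORTION_PROPERTY_DEFAULT));
    double insertproportion=Double.parseDouble(p.getProperty(INSERT_PROPORTION_PROPERTY,INSERT_PROPORTION_PROPERTY_DEFAULT));
    double scanproportion=Double.parseDouble(p.getProperty(SCAN_PROPORTION_PROPERTY,SCAN_PROPORTION_PROPERTY_DEFAULT));
    double readmodifywriteproportion=Double.parseDouble(p.getProperty(READMODIFYWRITE_PROPORTION_PROPERTY,READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT));

    recordcount=Integer.parseInt(p.getProperty(Client.RECORD_COUNT_PROPERTY, Client.DEFAULT_RECORD_COUNT));
    if(recordcount == 0)
      recordcount = Integer.MAX_VALUE;
    String requestdistrib=p.getProperty(REQUEST_DISTRIBUTION_PROPERTY,REQUEST_DISTRIBUTION_PROPERTY_DEFAULT);
    int maxscanlength=Integer.parseInt(p.getProperty(MAX_SCAN_LENGTH_PROPERTY,MAX_SCAN_LENGTH_PROPERTY_DEFAULT));
    String scanlengthdistrib=p.getProperty(SCAN_LENGTH_DISTRIBUTION_PROPERTY,SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT);

    int insertstart=Integer.parseInt(p.getProperty(INSERT_START_PROPERTY,INSERT_START_PROPERTY_DEFAULT));

    readallfields=Boolean.parseBoolean(p.getProperty(READ_ALL_FIELDS_PROPERTY,READ_ALL_FIELDS_PROPERTY_DEFAULT));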
    writeallfields=Boolean.parseBoolean(p.getProperty(WRITE_ALL_FIELDS_PROPERTY,WRITE_ALL_FIELDS_PROPERTY_DEFAULT));

    dataintegrity = Boolean.parseBoolean(p.getProperty(DATA_INTEGRITY_PROPERTY, DATA_INTEGRITY_PROPERTY_DEFAULT));
    //Confirm that fieldlengthgenerator returns a constant if a data
    //integrity check is requested.
    if (dataintegrity && !(p.getProperty(FIELD_LENGTH_DISTRIBUTION_PROPERTY, FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT)).equals("constant")) {
      System.err.println("Must have constant field size to check data integrity.");
      System.exit(-1);
    }

    if (p.getProperty(INSERT_ORDER_PROPERTY,INSERT_ORDER_PROPERTY_DEFAULT).compareTo("hashed")==0) {
      orderedinserts=false;
    } else if (requestdistrib.compareTo("exponential")==0) {
      double percentile = Double.parseDouble(p.getProperty(ExponentialGenerator.EXPONENTIAL_PERCENTILE_PROPERTY,
          ExponentialGenerator.EXPONENTIAL_PERCENTILE_DEFAULT));
      double frac = Double.parseDouble(p.getProperty(ExponentialGenerator.EXPONENTIAL_FRAC_PROPERTY,
          ExponentialGenerator.EXPONENTIAL_FRAC_DEFAULT));
      keychooser = new ExponentialGenerator(percentile, recordcount*frac);
    } else {
      orderedinserts=true;
    }

    keysequence=new CounterGenerator(insertstart);
    operationchooser=new DiscreteGenerator();
    if (readproportion>0) {
      operationchooser.addValue(readproportion,"READ");
    }

    if (updateproportion>0) {
      operationchooser.addValue(updateproportion,"UPDATE");
    }

    if (insertproportion>0) {
      operationchooser.addValue(insertproportion,"INSERT");
    }

    if (scanproportion>0) {
      operationchooser.addValue(scanproportion,"SCAN");
    }

    if (readmodifywriteproportion>0) {
      operationchooser.addValue(readmodifywriteproportion,"READMODIFYWRITE");
    }

    transactioninsertkeysequence=new AcknowledgedCounterGenerator(recordcount);
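For intuition, the DiscreteGenerator populated above implements weighted random choice over the operation names. A minimal sketch of that technique (an illustration of the idea, not the generator's actual source):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    // Illustrative weighted choice; weights are relative and need not total 1.0.
    class WeightedChoiceSketch {
      private static class Pair {
        final double weight; final String value;
        Pair(double w, String v) { weight = w; value = v; }
      }

      private final List<Pair> values = new ArrayList<Pair>();
      private final Random rand = new Random();
      private double sum = 0;

      void addValue(double weight, String value) {
        values.add(new Pair(weight, value));
        sum += weight;
      }

      String nextString() {
        double chooser = rand.nextDouble() * sum; // uniform in [0, sum)
        for (Pair p : values) {
          if (chooser < p.weight) {
            return p.value;      // landed inside this value's slice
          }
          chooser -= p.weight;   // skip past this slice
        }
        return null; // unreachable once at least one value was added
      }
    }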
    if (requestdistrib.compareTo("uniform")==0) {
      keychooser=new UniformIntegerGenerator(0,recordcount-1);
    } else if (requestdistrib.compareTo("zipfian")==0) {
      //it does this by generating a random "next key" in part by taking the modulus over the number of keys.
      //if the number of keys changes, this would shift the modulus, and we don't want that to change which keys are popular.
      //so we'll actually construct the scrambled zipfian generator with a keyspace that is larger than what exists at the
      //beginning of the test. that is, we'll predict the number of inserts, and tell the scrambled zipfian generator the
      //number of existing keys plus the number of predicted keys as the total keyspace. then, if the generator picks a key
      //that hasn't been inserted yet, we'll just ignore it and pick another key. this way, the size of the keyspace doesn't
      //change from the perspective of the scrambled zipfian generator.
      int opcount=Integer.parseInt(p.getProperty(Client.OPERATION_COUNT_PROPERTY));
      int expectednewkeys=(int)(((double)opcount)*insertproportion*2.0); //2 is fudge factor

      keychooser=new ScrambledZipfianGenerator(recordcount+expectednewkeys);
    } else if (requestdistrib.compareTo("latest")==0) {
      keychooser=new SkewedLatestGenerator(transactioninsertkeysequence);
    } else if (requestdistrib.equals("hotspot")) {
      double hotsetfraction = Double.parseDouble(p.getProperty(HOTSPOT_DATA_FRACTION, HOTSPOT_DATA_FRACTION_DEFAULT));
      double hotopnfraction = Double.parseDouble(p.getProperty(HOTSPOT_OPN_FRACTION, HOTSPOT_OPN_FRACTION_DEFAULT));
      keychooser = new HotspotIntegerGenerator(0, recordcount - 1, hotsetfraction, hotopnfraction);
    } else {
      throw new WorkloadException("Unknown request distribution \""+requestdistrib+"\"");
    }

    fieldchooser=new UniformIntegerGenerator(0,fieldcount-1);

    if (scanlengthdistrib.compareTo("uniform")==0) {
      scanlength=new UniformIntegerGenerator(1,maxscanlength);
    } else if (scanlengthdistrib.compareTo("zipfian")==0) {
      scanlength=new ZipfianGenerator(1,maxscanlength);
    } else {
      throw new WorkloadException("Distribution \""+scanlengthdistrib+"\" not allowed for scan length");
    }
  }

  public String buildKeyName(long keynum) {
    if (!orderedinserts) {
      keynum=Utils.hash(keynum);
    }
    return "user"+keynum;
  }

  /**
   * Builds a value for a randomly chosen field.
   */
  private HashMap<String, ByteIterator> buildSingleValue(String key) {
    HashMap<String, ByteIterator> value = new HashMap<String, ByteIterator>();

    String fieldkey = fieldnames.get(Integer.parseInt(fieldchooser.nextString()));
    ByteIterator data;
    if (dataintegrity) {
      data = new StringByteIterator(buildDeterministicValue(key, fieldkey));
    } else {
      //fill with random data
      data = new RandomByteIterator(fieldlengthgenerator.nextInt());
    }
    value.put(fieldkey,data);

    return value;
  }

  /**
   * Builds values for all fields.
   */
  private HashMap<String, ByteIterator> buildValues(String key) {
    HashMap<String, ByteIterator> values = new HashMap<String, ByteIterator>();

    for (String fieldkey : fieldnames) {
      ByteIterator data;
      if (dataintegrity) {
        data = new StringByteIterator(buildDeterministicValue(key, fieldkey));
      } else {
        //fill with random data
        data = new RandomByteIterator(fieldlengthgenerator.nextInt());
      }
      values.put(fieldkey,data);
    }
    return values;
  }

  /**
   * Build a deterministic value given the key information.
   */
  private String buildDeterministicValue(String key, String fieldkey) {
    int size = fieldlengthgenerator.nextInt();
    StringBuilder sb = new StringBuilder(size);
    sb.append(key);
    sb.append(':');
    sb.append(fieldkey);
    while (sb.length() < size) {
      sb.append(':');
      sb.append(sb.toString().hashCode());
    }
    sb.setLength(size);

    return sb.toString();
  }
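To see what buildDeterministicValue() produces, consider a hypothetical key "user123" and field "field0" with a constant field length of 30: the builder starts from "user123:field0" and keeps appending ':' plus the hash code of the string so far until it reaches 30 characters, then truncates. A stand-alone replica of that loop (inputs are made up for illustration):

    // Illustrative replica of buildDeterministicValue with hypothetical inputs.
    public class DeterministicValueDemo {
      public static void main(String[] args) {
        int size = 30; // pretend fieldlengthgenerator returned 30
        StringBuilder sb = new StringBuilder(size);
        sb.append("user123").append(':').append("field0");
        while (sb.length() < size) {
          sb.append(':');
          sb.append(sb.toString().hashCode());
        }
        sb.setLength(size);
        // The same key, field, and field length always yield the same string,
        // which is what lets verifyRow() recompute the expected value on read.
        System.out.println(sb);
      }
    }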
  /**
   * Do one insert operation. Because it will be called concurrently from multiple client threads, this
   * function must be thread safe. However, avoid synchronized, or the threads will block waiting for each
   * other, and it will be difficult to reach the target throughput. Ideally, this function would have no side
   * effects other than DB operations.
   */
  public boolean doInsert(DB db, Object threadstate) {
    int keynum=keysequence.nextInt();
    String dbkey = buildKeyName(keynum);
    HashMap<String, ByteIterator> values = buildValues(dbkey);
    if (db.insert(table,dbkey,values).equals(Status.OK))
      return true;
    else
      return false;
  }

  /**
   * Do one transaction operation. Because it will be called concurrently from multiple client threads, this
   * function must be thread safe. However, avoid synchronized, or the threads will block waiting for each
   * other, and it will be difficult to reach the target throughput. Ideally, this function would have no side
   * effects other than DB operations.
   */
  public boolean doTransaction(DB db, Object threadstate) {
    String op=operationchooser.nextString();

    if (op.compareTo("READ")==0) {
      doTransactionRead(db);
    } else if (op.compareTo("UPDATE")==0) {
      doTransactionUpdate(db);
    } else if (op.compareTo("INSERT")==0) {
      doTransactionInsert(db);
    } else if (op.compareTo("SCAN")==0) {
      doTransactionScan(db);
    } else {
      doTransactionReadModifyWrite(db);
    }

    return true;
  }

  /**
   * Results are reported in the first three buckets of the histogram under
   * the label "VERIFY".
   * Bucket 0 means the expected data was returned.
   * Bucket 1 means incorrect data was returned.
   * Bucket 2 means null data was returned when some data was expected.
   */
  protected void verifyRow(String key, HashMap<String, ByteIterator> cells) {
-    int matchType = DATA_INT_MATCH;
-    if (!cells.isEmpty()) {
+    Status verifyStatus = Status.OK;
+    long startTime = System.nanoTime();
+    if (!cells.isEmpty()) {
      for (Map.Entry<String, ByteIterator> entry : cells.entrySet()) {
-        if (!entry.getValue().toString().equals(
+        if (!entry.getValue().toString().equals(
            buildDeterministicValue(key, entry.getKey()))) {
-          matchType = DATA_INT_DEVIATE;
-          break;
+          verifyStatus = Status.UNEXPECTED_STATE;
+          break;
        }
      }
    } else {
      //This assumes that null data is never valid
-      matchType = DATA_INT_UNEXPECTED_NULL;
+      verifyStatus = Status.ERROR;
    }
-    Measurements.getMeasurements().measure("VERIFY", matchType);
+    long endTime = System.nanoTime();
+    _measurements.measure("VERIFY", (int) (endTime - startTime) / 1000);
+    _measurements.reportStatus("VERIFY", verifyStatus);
  }

  int nextKeynum() {
    int keynum;
    if(keychooser instanceof ExponentialGenerator) {
      do {
        keynum=transactioninsertkeysequence.lastInt() - keychooser.nextInt();
      } while(keynum < 0);
    } else {
      do {
        keynum=keychooser.nextInt();
      } while (keynum > transactioninsertkeysequence.lastInt());
    }
    return keynum;
  }

  public void doTransactionRead(DB db) {
    //choose a random key
    int keynum = nextKeynum();

    String keyname = buildKeyName(keynum);

    HashSet<String> fields=null;

    if (!readallfields) {
      //read a random field
      String fieldname=fieldnames.get(Integer.parseInt(fieldchooser.nextString()));

      fields=new HashSet<String>();
      fields.add(fieldname);
    }

    HashMap<String, ByteIterator> cells = new HashMap<String, ByteIterator>();
    db.read(table,keyname,fields,cells);

    if (dataintegrity) {
      verifyRow(keyname, cells);
    }
  }

  public void doTransactionReadModifyWrite(DB db) {
    //choose a random key
    int keynum = nextKeynum();

    String keyname = buildKeyName(keynum);

    HashSet<String> fields=null;

    if (!readallfields) {
      //read a random field
      String fieldname=fieldnames.get(Integer.parseInt(fieldchooser.nextString()));

      fields=new HashSet<String>();
      fields.add(fieldname);
    }

    HashMap<String, ByteIterator> values;

    if (writeallfields) {
      //new data for all the fields
      values = buildValues(keyname);
    } else {
      //update a random field
      values = buildSingleValue(keyname);
    }

    //do the transaction

    HashMap<String, ByteIterator> cells = new HashMap<String, ByteIterator>();

    long ist=_measurements.getIntendedtartTimeNs();
    long st = System.nanoTime();
    db.read(table,keyname,fields,cells);
    db.update(table,keyname,values);
    long en=System.nanoTime();

    if (dataintegrity) {
      verifyRow(keyname, cells);
    }

    _measurements.measure("READ-MODIFY-WRITE", (int)((en-st)/1000));
    _measurements.measureIntended("READ-MODIFY-WRITE", (int)((en-ist)/1000));
  }
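nextKeynum() above is plain rejection sampling: a candidate key drawn from keychooser is discarded until it refers to a record that is already known to exist (at most transactioninsertkeysequence.lastInt()). A stripped-down sketch of the pattern for the non-exponential case, with hypothetical parameter names standing in for the fields:

    // Illustrative rejection sampling; 'chooser' and 'lastInsertedKey' stand in
    // for keychooser and transactioninsertkeysequence.lastInt().
    static int nextValidKey(IntegerGenerator chooser, int lastInsertedKey) {
      int keynum;
      do {
        keynum = chooser.nextInt();       // draw a candidate from the request distribution
      } while (keynum > lastInsertedKey); // reject keys that may not have been inserted yet
      return keynum;
    }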
  public void doTransactionScan(DB db) {
    //choose a random key
    int keynum = nextKeynum();

    String startkeyname = buildKeyName(keynum);

    //choose a random scan length
    int len=scanlength.nextInt();

    HashSet<String> fields=null;

    if (!readallfields) {
      //read a random field
      String fieldname=fieldnames.get(Integer.parseInt(fieldchooser.nextString()));

      fields=new HashSet<String>();
      fields.add(fieldname);
    }

    db.scan(table,startkeyname,len,fields,new Vector<HashMap<String, ByteIterator>>());
  }

  public void doTransactionUpdate(DB db) {
    //choose a random key
    int keynum = nextKeynum();

    String keyname=buildKeyName(keynum);

    HashMap<String, ByteIterator> values;

    if (writeallfields) {
      //new data for all the fields
      values = buildValues(keyname);
    } else {
      //update a random field
      values = buildSingleValue(keyname);
    }

    db.update(table,keyname,values);
  }

  public void doTransactionInsert(DB db) {
    //choose the next key
    int keynum=transactioninsertkeysequence.nextInt();

    try {
      String dbkey = buildKeyName(keynum);

      HashMap<String, ByteIterator> values = buildValues(dbkey);
      db.insert(table,dbkey,values);
    } finally {
      transactioninsertkeysequence.acknowledge(keynum);
    }
  }
}
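Worth noting: doTransactionInsert() acknowledges each key in a finally block, so transactioninsertkeysequence.lastInt() only advances past keys whose inserts have at least been attempted; nextKeynum() relies on that bound to avoid reading records that do not exist yet. A minimal sketch of the acknowledge-then-advance idea (an illustration of the technique, not the actual AcknowledgedCounterGenerator source):

    import java.util.concurrent.ConcurrentSkipListSet;
    import java.util.concurrent.atomic.AtomicInteger;

    // Illustrative acknowledged counter: hand out keys eagerly, but only advance the
    // published limit across a contiguous prefix of acknowledged keys.
    class AcknowledgedCounterSketch {
      private final AtomicInteger counter;
      private final ConcurrentSkipListSet<Integer> acked = new ConcurrentSkipListSet<Integer>();
      private volatile int limit;

      AcknowledgedCounterSketch(int start) {
        counter = new AtomicInteger(start);
        limit = start - 1;
      }

      int nextInt() {
        return counter.getAndIncrement(); // hand out the next key immediately
      }

      synchronized void acknowledge(int value) {
        acked.add(value);
        // advance the limit over any contiguous run of acknowledged values
        while (acked.remove(limit + 1)) {
          limit++;
        }
      }

      int lastInt() {
        return limit; // readers never see a key beyond this bound
      }
    }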