diff --git a/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java b/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java index 3a637eeb..5d0eabd0 100644 --- a/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java +++ b/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java @@ -1,1286 +1,1286 @@ /** * Copyright (c) 2017 YCSB contributors All rights reserved. *

* Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb.workloads; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import java.util.TreeMap; import java.util.Vector; import java.util.concurrent.TimeUnit; import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.Client; import com.yahoo.ycsb.DB; import com.yahoo.ycsb.NumericByteIterator; import com.yahoo.ycsb.Status; import com.yahoo.ycsb.StringByteIterator; import com.yahoo.ycsb.Utils; import com.yahoo.ycsb.Workload; import com.yahoo.ycsb.WorkloadException; import com.yahoo.ycsb.generator.DiscreteGenerator; import com.yahoo.ycsb.generator.Generator; import com.yahoo.ycsb.generator.HotspotIntegerGenerator; import com.yahoo.ycsb.generator.IncrementingPrintableStringGenerator; import com.yahoo.ycsb.generator.NumberGenerator; import com.yahoo.ycsb.generator.RandomDiscreteTimestampGenerator; import com.yahoo.ycsb.generator.ScrambledZipfianGenerator; import com.yahoo.ycsb.generator.SequentialGenerator; import com.yahoo.ycsb.generator.UniformLongGenerator; import com.yahoo.ycsb.generator.UnixEpochTimestampGenerator; import com.yahoo.ycsb.generator.ZipfianGenerator; import com.yahoo.ycsb.measurements.Measurements; /** * A specialized workload dealing with time series data, i.e. series of discrete * events associated with timestamps and identifiers. For this workload, identities * consist of a {@link String} key and a set of {@link String} tag key/value * pairs. *

* For example:
* Time Series Key | Tag Keys/Values | 1483228800 | 1483228860 | 1483228920
* AA              | AA=AA, AB=AA    |       42.5 |        1.0 |       85.9
* AA              | AA=AA, AB=AB    |       -9.4 |       76.9 |       0.18
* AB              | AA=AA, AB=AA    |      -93.0 |       57.1 |      -63.8
* AB              | AA=AA, AB=AB    |        7.6 |       56.1 |       -0.3
*

* This table shows four time series, each with a measurement at three different timestamps. * Keys, tags, timestamps and values (numeric only at this time) are generated by * this workload. For details on properties and behavior, see the * {@code workloads/tsworkload_template} file. The Javadocs focus on the implementation * and how {@link DB} clients can parse the workload. *

* In order to avoid having existing DB implementations implement a brand new interface * this workload uses the existing APIs to encode a few special values that can be parsed * by the client. The special values include the timestamp, numeric value and some * query (read or scan) parameters. As an example on how to parse the fields, see * {@link BasicTSDB}. *

* Timestamps *

* Timestamps are presented as Unix Epoch values in units of {@link TimeUnit#SECONDS}, * {@link TimeUnit#MILLISECONDS} or {@link TimeUnit#NANOSECONDS} based on the * {@code timestampunits} property. For calls to {@link DB#insert(String, String, java.util.Map)} * and {@link DB#update(String, String, java.util.Map)}, the timestamp is added to the * {@code values} map encoded in a {@link NumericByteIterator} with the key defined * in the {@code timestampkey} property (defaulting to "YCSBTS"). To pull out the timestamp * when iterating over the values map, cast the {@link ByteIterator} to a * {@link NumericByteIterator} and call {@link NumericByteIterator#getLong()}. *
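* As a rough illustration (not part of this patch), a client iterating over the values map could
* pull the timestamp out like this, assuming the default {@code timestampkey} of "YCSBTS":
* <pre>{@code
* // Hypothetical client-side helper; the key name is an assumption based on the default property.
* long extractTimestamp(final Map<String, ByteIterator> values) {
*   final ByteIterator raw = values.get("YCSBTS");
*   if (raw == null) {
*     throw new IllegalArgumentException("No timestamp entry in the values map");
*   }
*   // The workload always encodes the timestamp in a NumericByteIterator.
*   return ((NumericByteIterator) raw).getLong();
* }
* }</pre>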

* Note that for calls to {@link DB#update(String, String, java.util.Map)}, timestamps * earlier than the timestamp generator's current time will be chosen at random to * mimic a lambda architecture or an old job re-reporting some data. *

* For calls to {@link DB#read(String, String, java.util.Set, java.util.Map)} and * {@link DB#scan(String, String, int, java.util.Set, Vector)}, timestamps * are encoded in a {@link StringByteIterator} in a key/value format with the * {@code tagpairdelimiter} separator. E.g. {@code YCSBTS=1483228800}. If {@code querytimespan} * has been set to a positive value then the value will include a range with the * starting (oldest) timestamp followed by the {@code querytimespandelimiter} separator * (default ",") and the ending (most recent) timestamp. E.g. {@code YCSBTS=1483228800,1483228920}. *
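* A minimal sketch of how a client might split that field back out, assuming the default
* delimiters ("=" for {@code tagpairdelimiter} and "," for {@code querytimespandelimiter}):
* <pre>{@code
* // Hypothetical parsing of the timestamp entry from a read/scan fields set.
* long[] parseTimeRange(final Set<String> fields) {
*   for (final String field : fields) {
*     if (field.startsWith("YCSBTS=")) {
*       final String[] parts = field.substring("YCSBTS=".length()).split(",");
*       final long start = Long.parseLong(parts[0]);
*       final long end = parts.length > 1 ? Long.parseLong(parts[1]) : start;
*       return new long[] { start, end };
*     }
*   }
*   throw new IllegalArgumentException("No timestamp field was given");
* }
* }</pre>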

* For calls to {@link DB#delete(String, String)}, encoding is the same as reads and * scans but key/value pairs are separated by the {@code deletedelimiter} property value. *

* By default, the starting timestamp is the current system time without any rounding. * All timestamps are then offsets from that starting value. *

* Values *

* Similar to timestamps, values are encoded in {@link NumericByteIterator}s and stored * in the values map with the key defined in {@code valuekey} (defaulting to "YCSBV"). * Values can either be 64 bit signed {@link long}s or double precision {@link double}s * depending on the {@code valuetype} or {@code dataintegrity} properties. When parsing * out the value, always call {@link NumericByteIterator#isFloatingPoint()} to determine * whether or not to call {@link NumericByteIterator#getDouble()} (true) or * {@link NumericByteIterator#getLong()} (false). *
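* An illustrative helper (not part of this patch) showing the value extraction a client might
* perform, assuming the default {@code valuekey} of "YCSBV":
* <pre>{@code
* double extractValue(final Map<String, ByteIterator> values) {
*   final NumericByteIterator it = (NumericByteIterator) values.get("YCSBV");
*   // Always check the type first; the workload may write either longs or doubles.
*   return it.isFloatingPoint() ? it.getDouble() : (double) it.getLong();
* }
* }</pre>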

* When {@code dataintegrity} is set to true, then the value is always set to a * 64 bit signed integer which is the Java hash code of the concatenation of the * key and map of values (sorted on the map keys and skipping the timestamp and value * entries) XOR'd with the timestamp of the data point. See * {@link #validationFunction(String, long, TreeMap)} for the implementation. *

* Keys and Tags *

* As mentioned, the workload generates strings for the keys and tags. On initialization, * three string generators are created using the {@link IncrementingPrintableStringGenerator} * implementation. The generators then fill three arrays with values based on the * number of keys, the number of tags and the cardinality of each tag key/value pair. * This implementation gives us time series like those in the example table, where every string * starts at something like "AA" (depending on the length of keys, tag keys and tag values) * and continues to "ZZ", at which point it rolls over back to "AA". *

* Each time series must have a unique set of tag keys, i.e. the tag key "AA" cannot appear * more than once per time series. If the workload is configured for four tags with a * tag key length of 2, the tag keys would be "AA", "AB", "AC" and "AD". *

* Each tag key is then associated with a tag value. Tag values may appear more than once * in each time series. E.g. time series will usually start with the tags "AA=AA", * "AB=AA", "AC=AA" and "AD=AA". The {@code tagcardinality} property determines how many * unique values will be generated per tag key. In the example table above, the * {@code tagcardinality} property would have been set to {@code 1,2}, meaning tag * key "AA" would always have the tag value "AA" given a cardinality of 1, whereas * tag key "AB" would have the values "AA" and "AB" due to a cardinality of 2. This * cardinality map, along with the number of unique time series keys, determines how * many unique time series are generated for the workload. Tag values share a common * array of generated strings to save on memory. *
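* As a worked example of that arithmetic (matching the table above), two time series keys
* with {@code tagcardinality} set to {@code 1,2} yield four unique series:
* <pre>{@code
* int numKeys = 2;                      // 'fieldcount' time series keys
* int[] tagCardinality = { 1, 2 };      // parsed from 'tagcardinality'
* int perKeyCardinality = 1;
* for (int c : tagCardinality) {
*   perKeyCardinality *= c;             // 1 * 2 = 2 tag combinations per key
* }
* int totalCardinality = numKeys * perKeyCardinality;  // 2 * 2 = 4 unique time series
* }</pre>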

* Operation Order *

* The default behavior of the workload (for inserts and updates) is to generate a * value for each time series for a given timestamp before incrementing to the next * timestamp and writing values. This is an ideal workload and some time series * databases are designed for this behavior. However, in the real world, events * arrive grouped close to the current system time, with a number of events being * delayed so that their timestamps are further in the past. The {@code delayedseries} * property determines the percentage of time series that are delayed by up to * {@code delayedintervals} intervals. E.g. setting this value to 0.05 means that * 5% of the time series will be written with timestamps earlier than the timestamp * generator's current time. *

* Reads and Scans *

* For benchmarking queries, some common tasks implemented by almost every time series * database are available and are passed in the fields {@link Set}: *

* GroupBy - A common operation is to aggregate multiple time series into a * single time series via common parameters. For example, a user may want to see the * total network traffic in a data center so they'll issue a SQL query like: * SELECT value FROM timeseriesdb GROUP BY datacenter ORDER BY SUM(value); * If the {@code groupbyfunction} has been set to a group by function, then the fields * will contain a key/value pair with the key set in {@code groupbykey}. E.g. * {@code YCSBGB=SUM}. *

* Additionally with grouping enabled, fields on tag keys where group bys should * occur will only have the key defined and will not have a value or delimiter. E.g. * if grouping on tag key "AA", the field will contain {@code AA} instead of {@code AA=AB}. *
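* An illustrative sketch of how a client might classify the fields when group-by is enabled,
* assuming the default "=" {@code tagpairdelimiter} and "YCSBGB" {@code groupbykey}; the
* collection names are only placeholders:
* <pre>{@code
* // 'fields' is the Set<String> handed to DB.read()/DB.scan().
* final Set<String> groupedTagKeys = new HashSet<>();
* final Map<String, String> tagFilters = new HashMap<>();
* String groupByFunction = null;
* for (final String field : fields) {
*   if (field.startsWith("YCSBGB=")) {
*     groupByFunction = field.substring("YCSBGB=".length());   // e.g. "SUM"
*   } else if (!field.contains("=")) {
*     groupedTagKeys.add(field);                               // bare tag key: group on it
*   } else {
*     final String[] pair = field.split("=");                  // fully specified key/value pair
*     tagFilters.put(pair[0], pair[1]);                        // (timestamp/downsampling entries too)
*   }
* }
* }</pre>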

* Downsampling - Another common operation is to reduce the resolution of the * queried time series when fetching a wide time range of data so fewer data points * are returned. For example, a user may fetch a week of data but if the data is * recorded on a 1 second interval, that would be over 600k data points so they * may ask for a 1 hour downsampling (also called bucketing) wherein every hour, all * of the data points for a "bucket" are aggregated into a single value. *

* To enable downsampling, the {@code downsamplingfunction} property must be set to * a supported function such as "SUM" and the {@code downsamplinginterval} must be * set to a valid time interval with the same units as {@code timestampunits}, e.g. * "3600" which would create 1 hour buckets if the time units were set to seconds. * With downsampling, query fields will include a key/value pair with * {@code downsamplingkey} as the key (defaulting to "YCSBDS") and the value being * a concatenation of {@code downsamplingfunction} and {@code downsamplinginterval}, * for example {@code YCSBDS=SUM60}. *
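* A minimal sketch of splitting that value back apart (the function is alphabetic, the interval
* numeric), assuming the defaults "YCSBDS" and "=":
* <pre>{@code
* final String field = "YCSBDS=SUM60";                         // example field from a query
* final String spec = field.substring("YCSBDS=".length());     // "SUM60"
* int split = 0;
* while (split < spec.length() && Character.isLetter(spec.charAt(split))) {
*   split++;
* }
* final String function = spec.substring(0, split);            // "SUM"
* final long interval = Long.parseLong(spec.substring(split)); // 60, in 'timestampunits'
* }</pre>
* Note that the scan path in this file appears to join the function and interval with the
* {@code tagpairdelimiter} instead, e.g. {@code YCSBDS=SUM=60}, so clients may want to handle both forms.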

* Timestamps - For every read, a random timestamp is selected from the interval * set. If {@code querytimespan} has been set to a positive value, then the configured * query time interval is added to the selected timestamp so the read passes the DB * a range of times. Note that during the run phase, if no data was previously loaded, * or if the run phase is configured with a larger {@code recordcount} than the load phase, reads may be sent * to the DB with timestamps that are beyond the written data time range (or even the * system clock of the DB). *

* Deletes *

* Because the delete API only accepts a single key, the full key and tag key/value * pair map is flattened into a single string for parsing by the database. Common * workloads include deleting a single time series (wherein all tag keys and values are * defined), deleting all series containing a given tag key and value, or deleting all of the * time series sharing a common time series key. *

* Right now the workload supports deletes with a key and a set of time series tag key/value * pairs, or a key with tags and a group by on one or more tags (meaning, delete all of * the series with any value for the given tag key). The parameters are collapsed into * a single string delimited with the character in the {@code deletedelimiter} property. * For example, a delete request may look like {@code AA:AA=AA:AB=AA} to delete the * first time series in the table above. *
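* An illustrative sketch of taking such a delete key apart, assuming the default ":"
* {@code deletedelimiter} and "=" {@code tagpairdelimiter}:
* <pre>{@code
* final String deleteKey = "AA:AA=AA:AB=AA";     // the example from above
* final String[] parts = deleteKey.split(":");
* final String seriesKey = parts[0];             // time series key "AA"
* final Map<String, String> exactTags = new HashMap<>();
* final Set<String> anyValueTags = new HashSet<>();
* for (int i = 1; i < parts.length; i++) {
*   if (parts[i].contains("=")) {
*     final String[] pair = parts[i].split("=");
*     exactTags.put(pair[0], pair[1]);           // delete series matching this tag pair
*   } else {
*     anyValueTags.add(parts[i]);                // bare tag key: match any value for it
*   }
* }
* }</pre>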

* Threads *

* For a multi-threaded execution, the number of time series keys set via the * {@code fieldcount} property must be greater than or equal to the number of * threads set via {@code threads}. This is because each thread chooses a subset * of the total number of time series keys and is responsible for writing values * for each time series containing those keys at each timestamp. Thus each thread * has its own timestamp generator, which increments once every time series * the thread is responsible for has had a value written. *
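* A worked example of the key partitioning (mirroring the {@code ThreadState} constructor
* below), e.g. with 10 time series keys and 3 threads:
* <pre>{@code
* int totalKeys = 10;                           // 'fieldcount'
* int totalThreads = 3;
* int keysPerThread = totalKeys / totalThreads; // 3
* for (int threadID = 0; threadID < totalThreads; threadID++) {
*   int start = keysPerThread * threadID;
*   // the last thread picks up the remainder
*   int end = (threadID == totalThreads - 1) ? totalKeys : start + keysPerThread;
*   // thread 0 -> keys [0,3), thread 1 -> keys [3,6), thread 2 -> keys [6,10)
* }
* }</pre>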

* Each thread may, however, issue reads and scans for any time series in the * complete set. *

* Sparsity *

* By default, during loads, every time series will have a data point written at every * timestamp in the interval set. This is common in workloads where a sensor writes * a value at regular intervals. However, some time series are only reported under * certain conditions. *

* For example, a counter may track the number of errors over a * time period for a web service and only report when the value is greater than 1. * Or a time series may include tags such as a user ID and IP address when a request * arrives at the web service and only report values when that combination is seen. * This means the time series will not have a value at every timestamp and in * some cases there may be only a single value! *

* This workload has a {@code sparsity} parameter that controls how often a * time series should record a value. The default value of 0.0 means every series * will get a value at every timestamp. A value of 0.95 means that, for each * series, only 5% of the timestamps in the interval will have a value. The distribution * of values is random. *

* Notes/Warnings *

*

*

* TODOs *

*

*/ public class TimeSeriesWorkload extends Workload { /** * The types of values written to the timeseries store. */ public enum ValueType { INTEGERS("integers"), FLOATS("floats"), MIXED("mixednumbers"); protected final String name; ValueType(final String name) { this.name = name; } public static ValueType fromString(final String name) { for (final ValueType type : ValueType.values()) { if (type.name.equalsIgnoreCase(name)) { return type; } } throw new IllegalArgumentException("Unrecognized type: " + name); } } /** Name and default value for the timestamp key property. */ public static final String TIMESTAMP_KEY_PROPERTY = "timestampkey"; public static final String TIMESTAMP_KEY_PROPERTY_DEFAULT = "YCSBTS"; /** Name and default value for the value key property. */ public static final String VALUE_KEY_PROPERTY = "valuekey"; public static final String VALUE_KEY_PROPERTY_DEFAULT = "YCSBV"; /** Name and default value for the timestamp interval property. */ public static final String TIMESTAMP_INTERVAL_PROPERTY = "timestampinterval"; public static final String TIMESTAMP_INTERVAL_PROPERTY_DEFAULT = "60"; /** Name and default value for the timestamp units property. */ public static final String TIMESTAMP_UNITS_PROPERTY = "timestampunits"; public static final String TIMESTAMP_UNITS_PROPERTY_DEFAULT = "SECONDS"; /** Name and default value for the number of tags property. */ public static final String TAG_COUNT_PROPERTY = "tagcount"; public static final String TAG_COUNT_PROPERTY_DEFAULT = "4"; /** Name and default value for the tag value cardinality map property. */ public static final String TAG_CARDINALITY_PROPERTY = "tagcardinality"; public static final String TAG_CARDINALITY_PROPERTY_DEFAULT = "1, 2, 4, 8"; /** Name and default value for the tag key length property. */ public static final String TAG_KEY_LENGTH_PROPERTY = "tagkeylength"; public static final String TAG_KEY_LENGTH_PROPERTY_DEFAULT = "8"; /** Name and default value for the tag value length property. */ public static final String TAG_VALUE_LENGTH_PROPERTY = "tagvaluelength"; public static final String TAG_VALUE_LENGTH_PROPERTY_DEFAULT = "8"; /** Name and default value for the tag pair delimiter property. */ public static final String PAIR_DELIMITER_PROPERTY = "tagpairdelimiter"; public static final String PAIR_DELIMITER_PROPERTY_DEFAULT = "="; /** Name and default value for the delete string delimiter property. */ public static final String DELETE_DELIMITER_PROPERTY = "deletedelimiter"; public static final String DELETE_DELIMITER_PROPERTY_DEFAULT = ":"; /** Name and default value for the random timestamp write order property. */ public static final String RANDOMIZE_TIMESTAMP_ORDER_PROPERTY = "randomwritetimestamporder"; public static final String RANDOMIZE_TIMESTAMP_ORDER_PROPERTY_DEFAULT = "false"; /** Name and default value for the random time series write order property. */ public static final String RANDOMIZE_TIMESERIES_ORDER_PROPERTY = "randomtimeseriesorder"; public static final String RANDOMIZE_TIMESERIES_ORDER_PROPERTY_DEFAULT = "true"; /** Name and default value for the value types property. */ public static final String VALUE_TYPE_PROPERTY = "valuetype"; public static final String VALUE_TYPE_PROPERTY_DEFAULT = "floats"; /** Name and default value for the sparsity property. */ public static final String SPARSITY_PROPERTY = "sparsity"; public static final String SPARSITY_PROPERTY_DEFAULT = "0.00"; /** Name and default value for the delayed series percentage property. 
*/ public static final String DELAYED_SERIES_PROPERTY = "delayedseries"; public static final String DELAYED_SERIES_PROPERTY_DEFAULT = "0.10"; /** Name and default value for the delayed series intervals property. */ public static final String DELAYED_INTERVALS_PROPERTY = "delayedintervals"; public static final String DELAYED_INTERVALS_PROPERTY_DEFAULT = "5"; /** Name and default value for the query time span property. */ public static final String QUERY_TIMESPAN_PROPERTY = "querytimespan"; public static final String QUERY_TIMESPAN_PROPERTY_DEFAULT = "0"; /** Name and default value for the randomized query time span property. */ public static final String QUERY_RANDOM_TIMESPAN_PROPERTY = "queryrandomtimespan"; public static final String QUERY_RANDOM_TIMESPAN_PROPERTY_DEFAULT = "false"; /** Name and default value for the query time stamp delimiter property. */ public static final String QUERY_TIMESPAN_DELIMITER_PROPERTY = "querytimespandelimiter"; public static final String QUERY_TIMESPAN_DELIMITER_PROPERTY_DEFAULT = ","; /** Name and default value for the group-by key property. */ public static final String GROUPBY_KEY_PROPERTY = "groupbykey"; public static final String GROUPBY_KEY_PROPERTY_DEFAULT = "YCSBGB"; /** Name and default value for the group-by function property. */ public static final String GROUPBY_PROPERTY = "groupbyfunction"; /** Name and default value for the group-by key map property. */ public static final String GROUPBY_KEYS_PROPERTY = "groupbykeys"; /** Name and default value for the downsampling key property. */ public static final String DOWNSAMPLING_KEY_PROPERTY = "downsamplingkey"; public static final String DOWNSAMPLING_KEY_PROPERTY_DEFAULT = "YCSBDS"; /** Name and default value for the downsampling function property. */ public static final String DOWNSAMPLING_FUNCTION_PROPERTY = "downsamplingfunction"; /** Name and default value for the downsampling interval property. */ public static final String DOWNSAMPLING_INTERVAL_PROPERTY = "downsamplinginterval"; /** The properties to pull settings from. */ protected Properties properties; /** Generators for keys, tag keys and tag values. */ protected Generator keyGenerator; protected Generator tagKeyGenerator; protected Generator tagValueGenerator; /** The timestamp key, defaults to "YCSBTS". */ protected String timestampKey; /** The value key, defaults to "YCSBDS". */ protected String valueKey; /** The number of time units in between timestamps. */ protected int timestampInterval; /** The units of time the timestamp and various intervals represent. */ protected TimeUnit timeUnits; /** Whether or not to randomize the timestamp order when writing. */ protected boolean randomizeTimestampOrder; /** Whether or not to randomize (shuffle) the time series order. NOT compatible * with data integrity. */ protected boolean randomizeTimeseriesOrder; /** The type of values to generate when writing data. */ protected ValueType valueType; /** Used to calculate an offset for each time series. */ protected int[] cumulativeCardinality; /** The calculated total cardinality based on the config. */ protected int totalCardinality; /** The calculated per-time-series-key cardinality. I.e. the number of unique * tag key and value combinations. */ protected int perKeyCardinality; /** How much data to scan for in each call. */ protected NumberGenerator scanlength; /** A generator used to select a random time series key per read/scan. */ protected NumberGenerator keychooser; /** A generator to select what operation to perform during the run phase. 
*/ protected DiscreteGenerator operationchooser; /** The maximum number of interval offsets from the starting timestamp. Calculated * based on the number of records configured for the run. */ protected int maxOffsets; /** The number of records or operations to perform for this run. */ protected int recordcount; /** The number of tag pairs per time series. */ protected int tagPairs; /** The table we'll write to. */ protected String table; /** How many time series keys will be generated. */ protected int numKeys; /** The generated list of possible time series key values. */ protected String[] keys; /** The generated list of possible tag key values. */ protected String[] tagKeys; /** The generated list of possible tag value values. */ protected String[] tagValues; /** The cardinality for each tag key. */ protected int[] tagCardinality; /** A helper to skip non-incrementing tag values. */ protected int firstIncrementableCardinality; /** How sparse the data written should be. */ protected double sparsity; /** The percentage of time series that should be delayed in writes. */ protected double delayedSeries; /** The maximum number of intervals to delay a series. */ protected int delayedIntervals; /** Optional query time interval during reads/scans. */ protected int queryTimeSpan; /** Whether or not the actual interval should be randomly chosen, using * queryTimeSpan as the maximum value. */ protected boolean queryRandomTimeSpan; /** The delimiter for tag pairs in fields. */ protected String tagPairDelimiter; /** The delimiter between parameters for the delete key. */ protected String deleteDelimiter; /** The delimiter between timestamps for query time spans. */ protected String queryTimeSpanDelimiter; /** Whether or not to issue group-by queries. */ protected boolean groupBy; /** The key used for group-by tag keys. */ protected String groupByKey; /** The function used for group-by's. */ protected String groupByFunction; /** The tag keys to group on. */ protected boolean[] groupBys; /** Whether or not to issue downsampling queries. */ protected boolean downsample; /** The key used for downsampling tag keys. */ protected String downsampleKey; /** The downsampling function. */ protected String downsampleFunction; /** The downsampling interval. */ protected int downsampleInterval; /** * Set to true if want to check correctness of reads. Must also * be set to true during loading phase to function. */ protected boolean dataintegrity; /** Measurements to write data integrity results to. 
*/ protected Measurements measurements = Measurements.getMeasurements(); @Override public void init(final Properties p) throws WorkloadException { properties = p; recordcount = Integer.parseInt(p.getProperty(Client.RECORD_COUNT_PROPERTY, Client.DEFAULT_RECORD_COUNT)); if (recordcount == 0) { recordcount = Integer.MAX_VALUE; } timestampKey = p.getProperty(TIMESTAMP_KEY_PROPERTY, TIMESTAMP_KEY_PROPERTY_DEFAULT); valueKey = p.getProperty(VALUE_KEY_PROPERTY, VALUE_KEY_PROPERTY_DEFAULT); operationchooser = CoreWorkload.createOperationGenerator(properties); final int maxscanlength = Integer.parseInt(p.getProperty(CoreWorkload.MAX_SCAN_LENGTH_PROPERTY, CoreWorkload.MAX_SCAN_LENGTH_PROPERTY_DEFAULT)); String scanlengthdistrib = p.getProperty(CoreWorkload.SCAN_LENGTH_DISTRIBUTION_PROPERTY, CoreWorkload.SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT); if (scanlengthdistrib.compareTo("uniform") == 0) { scanlength = new UniformLongGenerator(1, maxscanlength); } else if (scanlengthdistrib.compareTo("zipfian") == 0) { scanlength = new ZipfianGenerator(1, maxscanlength); } else { throw new WorkloadException( "Distribution \"" + scanlengthdistrib + "\" not allowed for scan length"); } randomizeTimestampOrder = Boolean.parseBoolean(p.getProperty( RANDOMIZE_TIMESTAMP_ORDER_PROPERTY, RANDOMIZE_TIMESTAMP_ORDER_PROPERTY_DEFAULT)); randomizeTimeseriesOrder = Boolean.parseBoolean(p.getProperty( RANDOMIZE_TIMESERIES_ORDER_PROPERTY, RANDOMIZE_TIMESERIES_ORDER_PROPERTY_DEFAULT)); // setup the cardinality numKeys = Integer.parseInt(p.getProperty(CoreWorkload.FIELD_COUNT_PROPERTY, CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT)); tagPairs = Integer.parseInt(p.getProperty(TAG_COUNT_PROPERTY, TAG_COUNT_PROPERTY_DEFAULT)); sparsity = Double.parseDouble(p.getProperty(SPARSITY_PROPERTY, SPARSITY_PROPERTY_DEFAULT)); tagCardinality = new int[tagPairs]; final String requestdistrib = p.getProperty(CoreWorkload.REQUEST_DISTRIBUTION_PROPERTY, CoreWorkload.REQUEST_DISTRIBUTION_PROPERTY_DEFAULT); if (requestdistrib.compareTo("uniform") == 0) { keychooser = new UniformLongGenerator(0, numKeys - 1); } else if (requestdistrib.compareTo("sequential") == 0) { keychooser = new SequentialGenerator(0, numKeys - 1); } else if (requestdistrib.compareTo("zipfian") == 0) { keychooser = new ScrambledZipfianGenerator(0, numKeys - 1); //} else if (requestdistrib.compareTo("latest") == 0) { // keychooser = new SkewedLatestGenerator(transactioninsertkeysequence); } else if (requestdistrib.equals("hotspot")) { double hotsetfraction = Double.parseDouble(p.getProperty(CoreWorkload.HOTSPOT_DATA_FRACTION, CoreWorkload.HOTSPOT_DATA_FRACTION_DEFAULT)); double hotopnfraction = Double.parseDouble(p.getProperty(CoreWorkload.HOTSPOT_OPN_FRACTION, CoreWorkload.HOTSPOT_OPN_FRACTION_DEFAULT)); keychooser = new HotspotIntegerGenerator(0, numKeys - 1, hotsetfraction, hotopnfraction); } else { throw new WorkloadException("Unknown request distribution \"" + requestdistrib + "\""); } // figure out the start timestamp based on the units, cardinality and interval try { timestampInterval = Integer.parseInt(p.getProperty( TIMESTAMP_INTERVAL_PROPERTY, TIMESTAMP_INTERVAL_PROPERTY_DEFAULT)); } catch (NumberFormatException nfe) { throw new WorkloadException("Unable to parse the " + TIMESTAMP_INTERVAL_PROPERTY, nfe); } try { timeUnits = TimeUnit.valueOf(p.getProperty(TIMESTAMP_UNITS_PROPERTY, TIMESTAMP_UNITS_PROPERTY_DEFAULT).toUpperCase()); } catch (IllegalArgumentException e) { throw new WorkloadException("Unknown time unit type", e); } if (timeUnits == TimeUnit.NANOSECONDS || 
timeUnits == TimeUnit.MICROSECONDS) { throw new WorkloadException("YCSB doesn't support " + timeUnits + " at this time."); } tagPairDelimiter = p.getProperty(PAIR_DELIMITER_PROPERTY, PAIR_DELIMITER_PROPERTY_DEFAULT); deleteDelimiter = p.getProperty(DELETE_DELIMITER_PROPERTY, DELETE_DELIMITER_PROPERTY_DEFAULT); dataintegrity = Boolean.parseBoolean( p.getProperty(CoreWorkload.DATA_INTEGRITY_PROPERTY, CoreWorkload.DATA_INTEGRITY_PROPERTY_DEFAULT)); queryTimeSpan = Integer.parseInt(p.getProperty(QUERY_TIMESPAN_PROPERTY, QUERY_TIMESPAN_PROPERTY_DEFAULT)); queryRandomTimeSpan = Boolean.parseBoolean(p.getProperty(QUERY_RANDOM_TIMESPAN_PROPERTY, QUERY_RANDOM_TIMESPAN_PROPERTY_DEFAULT)); queryTimeSpanDelimiter = p.getProperty(QUERY_TIMESPAN_DELIMITER_PROPERTY, QUERY_TIMESPAN_DELIMITER_PROPERTY_DEFAULT); groupByKey = p.getProperty(GROUPBY_KEY_PROPERTY, GROUPBY_KEY_PROPERTY_DEFAULT); groupByFunction = p.getProperty(GROUPBY_PROPERTY); if (groupByFunction != null && !groupByFunction.isEmpty()) { final String groupByKeys = p.getProperty(GROUPBY_KEYS_PROPERTY); if (groupByKeys == null || groupByKeys.isEmpty()) { throw new WorkloadException("Group by was enabled but no keys were specified."); } final String[] gbKeys = groupByKeys.split(","); if (gbKeys.length != tagKeys.length) { throw new WorkloadException("Only " + gbKeys.length + " group by keys " + "were specified but there were " + tagKeys.length + " tag keys given."); } groupBys = new boolean[gbKeys.length]; for (int i = 0; i < gbKeys.length; i++) { groupBys[i] = Integer.parseInt(gbKeys[i].trim()) == 0 ? false : true; } groupBy = true; } downsampleKey = p.getProperty(DOWNSAMPLING_KEY_PROPERTY, DOWNSAMPLING_KEY_PROPERTY_DEFAULT); downsampleFunction = p.getProperty(DOWNSAMPLING_FUNCTION_PROPERTY); if (downsampleFunction != null && !downsampleFunction.isEmpty()) { final String interval = p.getProperty(DOWNSAMPLING_INTERVAL_PROPERTY); if (interval == null || interval.isEmpty()) { throw new WorkloadException("'" + DOWNSAMPLING_INTERVAL_PROPERTY + "' was missing despite '" + DOWNSAMPLING_FUNCTION_PROPERTY + "' being set."); } downsampleInterval = Integer.parseInt(interval); downsample = true; } delayedSeries = Double.parseDouble(p.getProperty(DELAYED_SERIES_PROPERTY, DELAYED_SERIES_PROPERTY_DEFAULT)); delayedIntervals = Integer.parseInt(p.getProperty(DELAYED_INTERVALS_PROPERTY, DELAYED_INTERVALS_PROPERTY_DEFAULT)); valueType = ValueType.fromString(p.getProperty(VALUE_TYPE_PROPERTY, VALUE_TYPE_PROPERTY_DEFAULT)); table = p.getProperty(CoreWorkload.TABLENAME_PROPERTY, CoreWorkload.TABLENAME_PROPERTY_DEFAULT); initKeysAndTags(); validateSettings(); } @Override public Object initThread(Properties p, int mythreadid, int threadcount) throws WorkloadException { if (properties == null) { throw new WorkloadException("Workload has not been initialized."); } return new ThreadState(mythreadid, threadcount); } @Override public boolean doInsert(DB db, Object threadstate) { if (threadstate == null) { throw new IllegalStateException("Missing thread state."); } final Map tags = new TreeMap(); final String key = ((ThreadState)threadstate).nextDataPoint(tags, true); if (db.insert(table, key, tags) == Status.OK) { return true; } return false; } @Override public boolean doTransaction(DB db, Object threadstate) { if (threadstate == null) { throw new IllegalStateException("Missing thread state."); } switch (operationchooser.nextString()) { case "READ": doTransactionRead(db, threadstate); break; case "UPDATE": doTransactionUpdate(db, threadstate); break; case "INSERT": 
doTransactionInsert(db, threadstate); break; case "SCAN": doTransactionScan(db, threadstate); break; case "DELETE": doTransactionDelete(db, threadstate); break; default: return false; } return true; } protected void doTransactionRead(final DB db, Object threadstate) { final ThreadState state = (ThreadState) threadstate; final String keyname = keys[keychooser.nextValue().intValue()]; int offsets = state.queryOffsetGenerator.nextValue().intValue(); //int offsets = Utils.random().nextInt(maxOffsets - 1); final long startTimestamp; if (offsets > 0) { startTimestamp = state.startTimestamp + state.timestampGenerator.getOffset(offsets); } else { startTimestamp = state.startTimestamp; } // rando tags Set fields = new HashSet(); for (int i = 0; i < tagPairs; ++i) { if (groupBy && groupBys[i]) { fields.add(tagKeys[i]); } else { fields.add(tagKeys[i] + tagPairDelimiter + tagValues[Utils.random().nextInt(tagCardinality[i])]); } } if (queryTimeSpan > 0) { final long endTimestamp; if (queryRandomTimeSpan) { endTimestamp = startTimestamp + (timestampInterval * Utils.random().nextInt(queryTimeSpan / timestampInterval)); } else { endTimestamp = startTimestamp + queryTimeSpan; } fields.add(timestampKey + tagPairDelimiter + startTimestamp + queryTimeSpanDelimiter + endTimestamp); } else { fields.add(timestampKey + tagPairDelimiter + startTimestamp); } if (groupBy) { fields.add(groupByKey + tagPairDelimiter + groupByFunction); } if (downsample) { fields.add(downsampleKey + tagPairDelimiter + downsampleFunction + downsampleInterval); } final Map cells = new HashMap(); final Status status = db.read(table, keyname, fields, cells); if (dataintegrity && status == Status.OK) { verifyRow(keyname, cells); } } protected void doTransactionUpdate(final DB db, Object threadstate) { if (threadstate == null) { throw new IllegalStateException("Missing thread state."); } final Map tags = new TreeMap(); final String key = ((ThreadState)threadstate).nextDataPoint(tags, false); db.update(table, key, tags); } protected void doTransactionInsert(final DB db, Object threadstate) { doInsert(db, threadstate); } protected void doTransactionScan(final DB db, Object threadstate) { final ThreadState state = (ThreadState) threadstate; final String keyname = keys[Utils.random().nextInt(keys.length)]; // choose a random scan length int len = scanlength.nextValue().intValue(); int offsets = Utils.random().nextInt(maxOffsets - 1); final long startTimestamp; if (offsets > 0) { startTimestamp = state.startTimestamp + state.timestampGenerator.getOffset(offsets); } else { startTimestamp = state.startTimestamp; } // rando tags Set fields = new HashSet(); for (int i = 0; i < tagPairs; ++i) { if (groupBy && groupBys[i]) { fields.add(tagKeys[i]); } else { fields.add(tagKeys[i] + tagPairDelimiter + tagValues[Utils.random().nextInt(tagCardinality[i])]); } } if (queryTimeSpan > 0) { final long endTimestamp; if (queryRandomTimeSpan) { endTimestamp = startTimestamp + (timestampInterval * Utils.random().nextInt(queryTimeSpan / timestampInterval)); } else { endTimestamp = startTimestamp + queryTimeSpan; } fields.add(timestampKey + tagPairDelimiter + startTimestamp + queryTimeSpanDelimiter + endTimestamp); } else { fields.add(timestampKey + tagPairDelimiter + startTimestamp); } if (groupBy) { fields.add(groupByKey + tagPairDelimiter + groupByFunction); } if (downsample) { fields.add(downsampleKey + tagPairDelimiter + downsampleFunction + tagPairDelimiter + downsampleInterval); } final Vector> results = new Vector>(); db.scan(table, keyname, len, fields, 
results); } protected void doTransactionDelete(final DB db, Object threadstate) { final ThreadState state = (ThreadState) threadstate; final StringBuilder buf = new StringBuilder().append(keys[Utils.random().nextInt(keys.length)]); int offsets = Utils.random().nextInt(maxOffsets - 1); final long startTimestamp; if (offsets > 0) { startTimestamp = state.startTimestamp + state.timestampGenerator.getOffset(offsets); } else { startTimestamp = state.startTimestamp; } // rando tags for (int i = 0; i < tagPairs; ++i) { if (groupBy && groupBys[i]) { buf.append(deleteDelimiter) .append(tagKeys[i]); } else { buf.append(deleteDelimiter).append(tagKeys[i] + tagPairDelimiter + tagValues[Utils.random().nextInt(tagCardinality[i])]); } } if (queryTimeSpan > 0) { final long endTimestamp; if (queryRandomTimeSpan) { endTimestamp = startTimestamp + (timestampInterval * Utils.random().nextInt(queryTimeSpan / timestampInterval)); } else { endTimestamp = startTimestamp + queryTimeSpan; } buf.append(deleteDelimiter) .append(timestampKey + tagPairDelimiter + startTimestamp + queryTimeSpanDelimiter + endTimestamp); } else { buf.append(deleteDelimiter) .append(timestampKey + tagPairDelimiter + startTimestamp); } db.delete(table, buf.toString()); } /** * Parses the values returned by a read or scan operation and determines whether * or not the integer value matches the hash and timestamp of the original timestamp. * Only works for raw data points, will not work for group-by's or downsampled data. * @param key The time series key. * @param cells The cells read by the DB. * @return {@link Status#OK} if the data matched or {@link Status#UNEXPECTED_STATE} if * the data did not match. */ protected Status verifyRow(final String key, final Map cells) { Status verifyStatus = Status.UNEXPECTED_STATE; long startTime = System.nanoTime(); double value = 0; long timestamp = 0; final TreeMap validationTags = new TreeMap(); for (final Entry entry : cells.entrySet()) { if (entry.getKey().equals(timestampKey)) { final NumericByteIterator it = (NumericByteIterator) entry.getValue(); timestamp = it.getLong(); } else if (entry.getKey().equals(valueKey)) { final NumericByteIterator it = (NumericByteIterator) entry.getValue(); value = it.isFloatingPoint() ? it.getDouble() : it.getLong(); } else { validationTags.put(entry.getKey(), entry.getValue().toString()); } } if (validationFunction(key, timestamp, validationTags) == value) { verifyStatus = Status.OK; } long endTime = System.nanoTime(); measurements.measure("VERIFY", (int) (endTime - startTime) / 1000); measurements.reportStatus("VERIFY", verifyStatus); return verifyStatus; } /** * Function used for generating a deterministic hash based on the combination * of metric, tags and timestamp. * @param key A non-null string representing the key. * @param timestamp A timestamp in the proper units for the workload. * @param tags A non-null map of tag keys and values NOT including the YCSB * key or timestamp. * @return A hash value as an 8 byte integer. */ protected long validationFunction(final String key, final long timestamp, final TreeMap tags) { final StringBuilder validationBuffer = new StringBuilder(keys[0].length() + (tagPairs * tagKeys[0].length()) + (tagPairs * tagCardinality[1])); for (final Entry pair : tags.entrySet()) { validationBuffer.append(pair.getKey()).append(pair.getValue()); } return (long) validationBuffer.toString().hashCode() ^ timestamp; } /** * Breaks out the keys, tags and cardinality initialization in another method * to keep CheckStyle happy. 
* @throws WorkloadException If something goes pear shaped. */ protected void initKeysAndTags() throws WorkloadException { final int keyLength = Integer.parseInt(properties.getProperty( CoreWorkload.FIELD_LENGTH_PROPERTY, CoreWorkload.FIELD_LENGTH_PROPERTY_DEFAULT)); final int tagKeyLength = Integer.parseInt(properties.getProperty( TAG_KEY_LENGTH_PROPERTY, TAG_KEY_LENGTH_PROPERTY_DEFAULT)); final int tagValueLength = Integer.parseInt(properties.getProperty( TAG_VALUE_LENGTH_PROPERTY, TAG_VALUE_LENGTH_PROPERTY_DEFAULT)); keyGenerator = new IncrementingPrintableStringGenerator(keyLength); tagKeyGenerator = new IncrementingPrintableStringGenerator(tagKeyLength); tagValueGenerator = new IncrementingPrintableStringGenerator(tagValueLength); final int threads = Integer.parseInt(properties.getProperty(Client.THREAD_COUNT_PROPERTY, "1")); final String tagCardinalityString = properties.getProperty( TAG_CARDINALITY_PROPERTY, TAG_CARDINALITY_PROPERTY_DEFAULT); final String[] tagCardinalityParts = tagCardinalityString.split(","); int idx = 0; totalCardinality = numKeys; perKeyCardinality = 1; int maxCardinality = 0; for (final String card : tagCardinalityParts) { try { tagCardinality[idx] = Integer.parseInt(card.trim()); } catch (NumberFormatException nfe) { throw new WorkloadException("Unable to parse cardinality: " + card, nfe); } if (tagCardinality[idx] < 1) { throw new WorkloadException("Cardinality must be greater than zero: " + tagCardinality[idx]); } totalCardinality *= tagCardinality[idx]; perKeyCardinality *= tagCardinality[idx]; if (tagCardinality[idx] > maxCardinality) { maxCardinality = tagCardinality[idx]; } ++idx; if (idx >= tagPairs) { // we have more cardinalities than tag keys so bail at this point. break; } } if (numKeys < threads) { throw new WorkloadException("Field count " + numKeys + " (keys for time " + "series workloads) must be greater or equal to the number of " + "threads " + threads); } // fill tags without explicit cardinality with 1 if (idx < tagPairs) { tagCardinality[idx++] = 1; } for (int i = 0; i < tagCardinality.length; ++i) { if (tagCardinality[i] > 1) { firstIncrementableCardinality = i; break; } } keys = new String[numKeys]; tagKeys = new String[tagPairs]; tagValues = new String[maxCardinality]; for (int i = 0; i < numKeys; ++i) { keys[i] = keyGenerator.nextString(); } for (int i = 0; i < tagPairs; ++i) { tagKeys[i] = tagKeyGenerator.nextString(); } for (int i = 0; i < maxCardinality; i++) { tagValues[i] = tagValueGenerator.nextString(); } if (randomizeTimeseriesOrder) { Utils.shuffleArray(keys); Utils.shuffleArray(tagValues); } maxOffsets = (recordcount / totalCardinality) + 1; final int[] keyAndTagCardinality = new int[tagPairs + 1]; keyAndTagCardinality[0] = numKeys; for (int i = 0; i < tagPairs; i++) { keyAndTagCardinality[i + 1] = tagCardinality[i]; } cumulativeCardinality = new int[keyAndTagCardinality.length]; for (int i = 0; i < keyAndTagCardinality.length; i++) { int cumulation = 1; for (int x = i; x <= keyAndTagCardinality.length - 1; x++) { cumulation *= keyAndTagCardinality[x]; } if (i > 0) { cumulativeCardinality[i - 1] = cumulation; } } cumulativeCardinality[cumulativeCardinality.length - 1] = 1; } /** * Makes sure the settings as given are compatible. * @throws WorkloadException If one or more settings were invalid. */ protected void validateSettings() throws WorkloadException { if (dataintegrity) { if (valueType != ValueType.INTEGERS) { throw new WorkloadException("Data integrity was enabled. 
'valuetype' must " + "be set to 'integers'."); } if (groupBy) { throw new WorkloadException("Data integrity was enabled. 'groupbyfunction' must " + "be empty or null."); } if (downsample) { throw new WorkloadException("Data integrity was enabled. 'downsamplingfunction' must " + "be empty or null."); } if (queryTimeSpan > 0) { throw new WorkloadException("Data integrity was enabled. 'querytimespan' must " + "be empty or 0."); } if (randomizeTimeseriesOrder) { throw new WorkloadException("Data integrity was enabled. 'randomizetimeseriesorder' must " + "be false."); } final String startTimestamp = properties.getProperty(CoreWorkload.INSERT_START_PROPERTY); if (startTimestamp == null || startTimestamp.isEmpty()) { throw new WorkloadException("Data integrity was enabled. 'insertstart' must " + "be set to a Unix Epoch timestamp."); } } } /** * Thread state class holding thread local generators and indices. */ protected class ThreadState { /** The timestamp generator for this thread. */ protected final UnixEpochTimestampGenerator timestampGenerator; /** An offset generator to select a random offset for queries. */ protected final NumberGenerator queryOffsetGenerator; /** The current write key index. */ protected int keyIdx; /** The starting fence for writing keys. */ protected int keyIdxStart; /** The ending fence for writing keys. */ protected int keyIdxEnd; /** Indices for each tag value for writes. */ protected int[] tagValueIdxs; /** Whether or not all time series have written values for the current timestamp. */ protected boolean rollover; /** The starting timestamp. */ protected long startTimestamp; /** * Default ctor. * @param threadID The zero based thread ID. * @param threadCount The total number of threads. * @throws WorkloadException If something went pear shaped. */ protected ThreadState(final int threadID, final int threadCount) throws WorkloadException { int totalThreads = threadCount > 0 ? threadCount : 1; if (threadID >= totalThreads) { throw new IllegalStateException("Thread ID " + threadID + " cannot be greater " + "than or equal than the thread count " + totalThreads); } if (keys.length < threadCount) { throw new WorkloadException("Thread count " + totalThreads + " must be greater " + "than or equal to key count " + keys.length); } int keysPerThread = keys.length / totalThreads; keyIdx = keysPerThread * threadID; keyIdxStart = keyIdx; if (totalThreads - 1 == threadID) { keyIdxEnd = keys.length; } else { keyIdxEnd = keyIdxStart + keysPerThread; } tagValueIdxs = new int[tagPairs]; // all zeros final String startingTimestamp = properties.getProperty(CoreWorkload.INSERT_START_PROPERTY); if (startingTimestamp == null || startingTimestamp.isEmpty()) { timestampGenerator = randomizeTimestampOrder ? new RandomDiscreteTimestampGenerator(timestampInterval, timeUnits, maxOffsets) : new UnixEpochTimestampGenerator(timestampInterval, timeUnits); } else { try { timestampGenerator = randomizeTimestampOrder ? new RandomDiscreteTimestampGenerator(timestampInterval, timeUnits, Long.parseLong(startingTimestamp), maxOffsets) : new UnixEpochTimestampGenerator(timestampInterval, timeUnits, Long.parseLong(startingTimestamp)); } catch (NumberFormatException nfe) { throw new WorkloadException("Unable to parse the " + CoreWorkload.INSERT_START_PROPERTY, nfe); } } // Set the last value properly for the timestamp, otherwise it may start // one interval ago. 
startTimestamp = timestampGenerator.nextValue(); // TODO - pick it queryOffsetGenerator = new UniformLongGenerator(0, maxOffsets - 2); } /** * Generates the next write value for thread. * @param map An initialized map to populate with tag keys and values as well * as the timestamp and actual value. * @param isInsert Whether or not it's an insert or an update. Updates will pick * an older timestamp (if random isn't enabled). * @return The next key to write. */ protected String nextDataPoint(final Map map, final boolean isInsert) { int iterations = sparsity <= 0 ? 1 : Utils.random().nextInt((int) ((double) perKeyCardinality * sparsity)); if (iterations < 1) { iterations = 1; } while (true) { iterations--; if (rollover) { timestampGenerator.nextValue(); rollover = false; } String key = null; if (iterations <= 0) { final TreeMap validationTags; if (dataintegrity) { validationTags = new TreeMap(); } else { validationTags = null; } key = keys[keyIdx]; int overallIdx = keyIdx * cumulativeCardinality[0]; for (int i = 0; i < tagPairs; ++i) { int tvidx = tagValueIdxs[i]; map.put(tagKeys[i], new StringByteIterator(tagValues[tvidx])); if (dataintegrity) { validationTags.put(tagKeys[i], tagValues[tvidx]); } if (delayedSeries > 0) { overallIdx += (tvidx * cumulativeCardinality[i + 1]); } } if (!isInsert) { final long delta = (timestampGenerator.currentValue() - startTimestamp) / timestampInterval; final int intervals = Utils.random().nextInt((int) delta); map.put(timestampKey, new NumericByteIterator(startTimestamp + (intervals * timestampInterval))); } else if (delayedSeries > 0) { // See if the series falls in a delay bucket and calculate an offset earlier // than the current timestamp value if so. double pct = (double) overallIdx / (double) totalCardinality; if (pct < delayedSeries) { int modulo = overallIdx % delayedIntervals; if (modulo < 0) { modulo *= -1; } map.put(timestampKey, new NumericByteIterator(timestampGenerator.currentValue() - timestampInterval * modulo)); } else { map.put(timestampKey, new NumericByteIterator(timestampGenerator.currentValue())); } } else { map.put(timestampKey, new NumericByteIterator(timestampGenerator.currentValue())); } if (dataintegrity) { map.put(valueKey, new NumericByteIterator(validationFunction(key, timestampGenerator.currentValue(), validationTags))); } else { switch (valueType) { case INTEGERS: map.put(valueKey, new NumericByteIterator(Utils.random().nextInt())); break; case FLOATS: map.put(valueKey, new NumericByteIterator( Utils.random().nextDouble() * (double) 100000)); break; case MIXED: if (Utils.random().nextBoolean()) { map.put(valueKey, new NumericByteIterator(Utils.random().nextInt())); } else { map.put(valueKey, new NumericByteIterator( Utils.random().nextDouble() * (double) 100000)); } break; default: throw new IllegalStateException("Somehow we didn't have a value " + "type configured that we support: " + valueType); } } } boolean tagRollover = false; for (int i = tagCardinality.length - 1; i >= 0; --i) { if (tagCardinality[i] <= 1) { - // nothing to increment here + tagRollover = true; // Only one tag so needs roll over. 
continue; } if (tagValueIdxs[i] + 1 >= tagCardinality[i]) { tagValueIdxs[i] = 0; if (i == firstIncrementableCardinality) { tagRollover = true; } } else { ++tagValueIdxs[i]; break; } } if (tagRollover) { if (keyIdx + 1 >= keyIdxEnd) { keyIdx = keyIdxStart; rollover = true; } else { ++keyIdx; } } if (iterations <= 0) { return key; } } } } } \ No newline at end of file diff --git a/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java b/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java index 55acc1f6..409331ac 100644 --- a/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java +++ b/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java @@ -1,550 +1,577 @@ /** * Copyright (c) 2017 YCSB contributors All rights reserved. *

* Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb.workloads; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import java.util.TreeMap; import java.util.Vector; import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.Client; import com.yahoo.ycsb.DB; import com.yahoo.ycsb.NumericByteIterator; import com.yahoo.ycsb.Status; import com.yahoo.ycsb.StringByteIterator; import com.yahoo.ycsb.Utils; import com.yahoo.ycsb.WorkloadException; import com.yahoo.ycsb.measurements.Measurements; import org.testng.annotations.Test; public class TestTimeSeriesWorkload { @Test public void twoThreads() throws Exception { final Properties p = getUTProperties(); Measurements.setProperties(p); final TimeSeriesWorkload wl = new TimeSeriesWorkload(); wl.init(p); Object threadState = wl.initThread(p, 0, 2); MockDB db = new MockDB(); for (int i = 0; i < 74; i++) { assertTrue(wl.doInsert(db, threadState)); } assertEquals(db.keys.size(), 74); assertEquals(db.values.size(), 74); long timestamp = 1451606400; for (int i = 0; i < db.keys.size(); i++) { assertEquals(db.keys.get(i), "AAAA"); assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); assertEquals(Utils.bytesToLong(db.values.get(i).get( TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)); if (i % 2 == 0) { assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); } else { assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); timestamp += 60; } } threadState = wl.initThread(p, 1, 2); db = new MockDB(); for (int i = 0; i < 74; i++) { assertTrue(wl.doInsert(db, threadState)); } assertEquals(db.keys.size(), 74); assertEquals(db.values.size(), 74); timestamp = 1451606400; for (int i = 0; i < db.keys.size(); i++) { assertEquals(db.keys.get(i), "AAAB"); assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); assertEquals(Utils.bytesToLong(db.values.get(i).get( TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)); if (i % 2 == 0) { assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); } else { assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); timestamp += 60; } } } @Test (expectedExceptions = WorkloadException.class) public void badTimeUnit() throws Exception { final Properties p = new Properties(); p.put(TimeSeriesWorkload.TIMESTAMP_UNITS_PROPERTY, "foobar"); getWorkload(p, true); } @Test (expectedExceptions = WorkloadException.class) public void failedToInitWorkloadBeforeThreadInit() throws Exception { final Properties p = getUTProperties(); final TimeSeriesWorkload wl = getWorkload(p, false); //wl.init(p); // <-- we NEED this :( final Object threadState = wl.initThread(p, 0, 2); final MockDB db = new MockDB(); wl.doInsert(db, threadState); } @Test 
(expectedExceptions = IllegalStateException.class) public void failedToInitThread() throws Exception { final Properties p = getUTProperties(); final TimeSeriesWorkload wl = getWorkload(p, true); final MockDB db = new MockDB(); wl.doInsert(db, null); } + + @Test + public void insertOneKeyOneTagCardinalityOne() throws Exception { + final Properties p = getUTProperties(); + p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "1"); + p.put(TimeSeriesWorkload.TAG_COUNT_PROPERTY, "1"); + p.put(TimeSeriesWorkload.TAG_CARDINALITY_PROPERTY, "1"); + final TimeSeriesWorkload wl = getWorkload(p, true); + final Object threadState = wl.initThread(p, 0, 1); + + final MockDB db = new MockDB(); + for (int i = 0; i < 74; i++) { + assertTrue(wl.doInsert(db, threadState)); + } + assertEquals(db.keys.size(), 74); + assertEquals(db.values.size(), 74); + long timestamp = 1451606400; + for (int i = 0; i < db.keys.size(); i++) { + assertEquals(db.keys.get(i), "AAAA"); + assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); + assertEquals(Utils.bytesToLong(db.values.get(i).get( + TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); + assertTrue(((NumericByteIterator) db.values.get(i) + .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint()); + timestamp += 60; + } + } @Test public void insertOneKeyTwoTagsLowCardinality() throws Exception { final Properties p = getUTProperties(); p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "1"); final TimeSeriesWorkload wl = getWorkload(p, true); final Object threadState = wl.initThread(p, 0, 1); final MockDB db = new MockDB(); for (int i = 0; i < 74; i++) { assertTrue(wl.doInsert(db, threadState)); } assertEquals(db.keys.size(), 74); assertEquals(db.values.size(), 74); long timestamp = 1451606400; for (int i = 0; i < db.keys.size(); i++) { assertEquals(db.keys.get(i), "AAAA"); assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); assertEquals(Utils.bytesToLong(db.values.get(i).get( TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); assertTrue(((NumericByteIterator) db.values.get(i) .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint()); if (i % 2 == 0) { assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); } else { assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); timestamp += 60; } } } @Test public void insertTwoKeysTwoTagsLowCardinality() throws Exception { final Properties p = getUTProperties(); final TimeSeriesWorkload wl = getWorkload(p, true); final Object threadState = wl.initThread(p, 0, 1); final MockDB db = new MockDB(); for (int i = 0; i < 74; i++) { assertTrue(wl.doInsert(db, threadState)); } assertEquals(db.keys.size(), 74); assertEquals(db.values.size(), 74); long timestamp = 1451606400; int metricCtr = 0; for (int i = 0; i < db.keys.size(); i++) { assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); assertEquals(Utils.bytesToLong(db.values.get(i).get( TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); assertTrue(((NumericByteIterator) db.values.get(i) .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint()); if (i % 2 == 0) { assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); } else { assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); } if (metricCtr++ > 1) { assertEquals(db.keys.get(i), "AAAB"); if (metricCtr >= 4) { metricCtr = 0; timestamp += 60; } } else { assertEquals(db.keys.get(i), "AAAA"); } } } @Test public void insertTwoKeysTwoThreads() throws Exception { final Properties p = getUTProperties(); 
final TimeSeriesWorkload wl = getWorkload(p, true); Object threadState = wl.initThread(p, 0, 2); MockDB db = new MockDB(); for (int i = 0; i < 74; i++) { assertTrue(wl.doInsert(db, threadState)); } assertEquals(db.keys.size(), 74); assertEquals(db.values.size(), 74); long timestamp = 1451606400; for (int i = 0; i < db.keys.size(); i++) { assertEquals(db.keys.get(i), "AAAA"); // <-- key 1 assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); assertEquals(Utils.bytesToLong(db.values.get(i).get( TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); assertTrue(((NumericByteIterator) db.values.get(i) .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint()); if (i % 2 == 0) { assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); } else { assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); timestamp += 60; } } threadState = wl.initThread(p, 1, 2); db = new MockDB(); for (int i = 0; i < 74; i++) { assertTrue(wl.doInsert(db, threadState)); } assertEquals(db.keys.size(), 74); assertEquals(db.values.size(), 74); timestamp = 1451606400; for (int i = 0; i < db.keys.size(); i++) { assertEquals(db.keys.get(i), "AAAB"); // <-- key 2 assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); assertEquals(Utils.bytesToLong(db.values.get(i).get( TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)); if (i % 2 == 0) { assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); } else { assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); timestamp += 60; } } } @Test public void insertThreeKeysTwoThreads() throws Exception { // To make sure the distribution doesn't miss any metrics final Properties p = getUTProperties(); p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "3"); final TimeSeriesWorkload wl = getWorkload(p, true); Object threadState = wl.initThread(p, 0, 2); MockDB db = new MockDB(); for (int i = 0; i < 74; i++) { assertTrue(wl.doInsert(db, threadState)); } assertEquals(db.keys.size(), 74); assertEquals(db.values.size(), 74); long timestamp = 1451606400; for (int i = 0; i < db.keys.size(); i++) { assertEquals(db.keys.get(i), "AAAA"); assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); assertEquals(Utils.bytesToLong(db.values.get(i).get( TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); assertTrue(((NumericByteIterator) db.values.get(i) .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint()); if (i % 2 == 0) { assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); } else { assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); timestamp += 60; } } threadState = wl.initThread(p, 1, 2); db = new MockDB(); for (int i = 0; i < 74; i++) { assertTrue(wl.doInsert(db, threadState)); } timestamp = 1451606400; int metricCtr = 0; for (int i = 0; i < db.keys.size(); i++) { assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); assertEquals(Utils.bytesToLong(db.values.get(i).get( TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)); if (i % 2 == 0) { assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); } else { assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); } if (metricCtr++ > 1) { assertEquals(db.keys.get(i), "AAAC"); if (metricCtr >= 4) { metricCtr = 0; timestamp += 60; } } else { assertEquals(db.keys.get(i), "AAAB"); } } } @Test public void 
  public void insertWithValidation() throws Exception {
    final Properties p = getUTProperties();
    p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "1");
    p.put(CoreWorkload.DATA_INTEGRITY_PROPERTY, "true");
    p.put(TimeSeriesWorkload.VALUE_TYPE_PROPERTY, "integers");
    final TimeSeriesWorkload wl = getWorkload(p, true);
    final Object threadState = wl.initThread(p, 0, 1);

    final MockDB db = new MockDB();
    for (int i = 0; i < 74; i++) {
      assertTrue(wl.doInsert(db, threadState));
    }

    assertEquals(db.keys.size(), 74);
    assertEquals(db.values.size(), 74);
    long timestamp = 1451606400;
    for (int i = 0; i < db.keys.size(); i++) {
      assertEquals(db.keys.get(i), "AAAA");
      assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
      assertEquals(Utils.bytesToLong(db.values.get(i).get(
          TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
      assertFalse(((NumericByteIterator) db.values.get(i)
          .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint());

      // validation check
      final TreeMap<String, String> validationTags = new TreeMap<String, String>();
      for (final Entry<String, ByteIterator> entry : db.values.get(i).entrySet()) {
        if (entry.getKey().equals(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT) ||
            entry.getKey().equals(TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT)) {
          continue;
        }
        validationTags.put(entry.getKey(), entry.getValue().toString());
      }
      assertEquals(wl.validationFunction(db.keys.get(i), timestamp, validationTags),
          ((NumericByteIterator) db.values.get(i)
              .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).getLong());

      if (i % 2 == 0) {
        assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
      } else {
        assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
        timestamp += 60;
      }
    }
  }

  @Test
  public void read() throws Exception {
    final Properties p = getUTProperties();
    final TimeSeriesWorkload wl = getWorkload(p, true);
    final Object threadState = wl.initThread(p, 0, 1);

    final MockDB db = new MockDB();
    for (int i = 0; i < 20; i++) {
      wl.doTransactionRead(db, threadState);
    }
  }

  @Test
  public void verifyRow() throws Exception {
    final Properties p = getUTProperties();
    final TimeSeriesWorkload wl = getWorkload(p, true);

    final TreeMap<String, String> validationTags = new TreeMap<String, String>();
    final HashMap<String, ByteIterator> cells = new HashMap<String, ByteIterator>();

    validationTags.put("AA", "AAAA");
    cells.put("AA", new StringByteIterator("AAAA"));
    validationTags.put("AB", "AAAB");
    cells.put("AB", new StringByteIterator("AAAB"));
    long hash = wl.validationFunction("AAAA", 1451606400L, validationTags);

    cells.put(TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT, new NumericByteIterator(1451606400L));
    cells.put(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT, new NumericByteIterator(hash));

    assertEquals(wl.verifyRow("AAAA", cells), Status.OK);

    // tweak the last value a bit
    for (final ByteIterator it : cells.values()) {
      it.reset();
    }
    cells.put(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT, new NumericByteIterator(hash + 1));
    assertEquals(wl.verifyRow("AAAA", cells), Status.UNEXPECTED_STATE);

    // no value cell, returns an unexpected state
    for (final ByteIterator it : cells.values()) {
      it.reset();
    }
    cells.remove(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT);
    assertEquals(wl.verifyRow("AAAA", cells), Status.UNEXPECTED_STATE);
  }

  @Test
  public void validateSettingsDataIntegrity() throws Exception {
    Properties p = getUTProperties();

    // data validation incompatibilities
    p.setProperty(CoreWorkload.DATA_INTEGRITY_PROPERTY, "true");
    try {
      getWorkload(p, true);
      fail("Expected WorkloadException");
    } catch (WorkloadException e) { }

    p.setProperty(TimeSeriesWorkload.VALUE_TYPE_PROPERTY, "integers"); // now it's ok
    p.setProperty(TimeSeriesWorkload.GROUPBY_PROPERTY, "sum"); // now it's not
    try {
      getWorkload(p, true);
      fail("Expected WorkloadException");
    } catch (WorkloadException e) { }

    p.setProperty(TimeSeriesWorkload.GROUPBY_PROPERTY, "");
    p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_FUNCTION_PROPERTY, "sum");
    p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_INTERVAL_PROPERTY, "60");
    try {
      getWorkload(p, true);
      fail("Expected WorkloadException");
    } catch (WorkloadException e) { }

    p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_FUNCTION_PROPERTY, "");
    p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_INTERVAL_PROPERTY, "");
    p.setProperty(TimeSeriesWorkload.QUERY_TIMESPAN_PROPERTY, "60");
    try {
      getWorkload(p, true);
      fail("Expected WorkloadException");
    } catch (WorkloadException e) { }

    p = getUTProperties();
    p.setProperty(CoreWorkload.DATA_INTEGRITY_PROPERTY, "true");
    p.setProperty(TimeSeriesWorkload.VALUE_TYPE_PROPERTY, "integers");
    p.setProperty(TimeSeriesWorkload.RANDOMIZE_TIMESERIES_ORDER_PROPERTY, "true");
    try {
      getWorkload(p, true);
      fail("Expected WorkloadException");
    } catch (WorkloadException e) { }

    p.setProperty(TimeSeriesWorkload.RANDOMIZE_TIMESERIES_ORDER_PROPERTY, "false");
    p.setProperty(TimeSeriesWorkload.INSERT_START_PROPERTY, "");
    try {
      getWorkload(p, true);
      fail("Expected WorkloadException");
    } catch (WorkloadException e) { }
  }

  /** Helper method that generates unit testing defaults for the properties map */
  private Properties getUTProperties() {
    final Properties p = new Properties();
    p.put(Client.RECORD_COUNT_PROPERTY, "10");
    p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "2");
    p.put(CoreWorkload.FIELD_LENGTH_PROPERTY, "4");
    p.put(TimeSeriesWorkload.TAG_KEY_LENGTH_PROPERTY, "2");
    p.put(TimeSeriesWorkload.TAG_VALUE_LENGTH_PROPERTY, "4");
    p.put(TimeSeriesWorkload.TAG_COUNT_PROPERTY, "2");
    p.put(TimeSeriesWorkload.TAG_CARDINALITY_PROPERTY, "1,2");
    p.put(CoreWorkload.INSERT_START_PROPERTY, "1451606400");
    p.put(TimeSeriesWorkload.DELAYED_SERIES_PROPERTY, "0");
    p.put(TimeSeriesWorkload.RANDOMIZE_TIMESERIES_ORDER_PROPERTY, "false");
    return p;
  }

  /** Helper to setup the workload for testing.
   */
  private TimeSeriesWorkload getWorkload(final Properties p, final boolean init)
      throws WorkloadException {
    Measurements.setProperties(p);
    if (!init) {
      return new TimeSeriesWorkload();
    } else {
      final TimeSeriesWorkload workload = new TimeSeriesWorkload();
      workload.init(p);
      return workload;
    }
  }

  static class MockDB extends DB {
    final List<String> keys = new ArrayList<String>();
    final List<Map<String, ByteIterator>> values = new ArrayList<Map<String, ByteIterator>>();

    @Override
    public Status read(String table, String key, Set<String> fields,
        Map<String, ByteIterator> result) {
      return Status.OK;
    }

    @Override
    public Status scan(String table, String startkey, int recordcount, Set<String> fields,
        Vector<HashMap<String, ByteIterator>> result) {
      // TODO Auto-generated method stub
      return Status.OK;
    }

    @Override
    public Status update(String table, String key, Map<String, ByteIterator> values) {
      // TODO Auto-generated method stub
      return Status.OK;
    }

    @Override
    public Status insert(String table, String key, Map<String, ByteIterator> values) {
      keys.add(key);
      this.values.add(values);
      return Status.OK;
    }

    @Override
    public Status delete(String table, String key) {
      // TODO Auto-generated method stub
      return Status.OK;
    }

    public void dumpStdout() {
      for (int i = 0; i < keys.size(); i++) {
        System.out.print("[" + i + "] Key: " + keys.get(i) + " Values: {");
        int x = 0;
        for (final Entry<String, ByteIterator> entry : values.get(i).entrySet()) {
          if (x++ > 0) {
            System.out.print(", ");
          }
          System.out.print("{" + entry.getKey() + " => ");
          if (entry.getKey().equals("YCSBV")) {
            System.out.print(new String(Utils.bytesToDouble(entry.getValue().toArray()) + "}"));
          } else if (entry.getKey().equals("YCSBTS")) {
            System.out.print(new String(Utils.bytesToLong(entry.getValue().toArray()) + "}"));
          } else {
            System.out.print(new String(entry.getValue().toArray()) + "}");
          }
        }
        System.out.println("}");
      }
    }
  }
}
\ No newline at end of file