diff --git a/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java b/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java
index 3a637eeb..5d0eabd0 100644
--- a/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java
+++ b/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java
@@ -1,1286 +1,1286 @@
/**
* Copyright (c) 2017 YCSB contributors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.workloads;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.Vector;
import java.util.concurrent.TimeUnit;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.Client;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.NumericByteIterator;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.StringByteIterator;
import com.yahoo.ycsb.Utils;
import com.yahoo.ycsb.Workload;
import com.yahoo.ycsb.WorkloadException;
import com.yahoo.ycsb.generator.DiscreteGenerator;
import com.yahoo.ycsb.generator.Generator;
import com.yahoo.ycsb.generator.HotspotIntegerGenerator;
import com.yahoo.ycsb.generator.IncrementingPrintableStringGenerator;
import com.yahoo.ycsb.generator.NumberGenerator;
import com.yahoo.ycsb.generator.RandomDiscreteTimestampGenerator;
import com.yahoo.ycsb.generator.ScrambledZipfianGenerator;
import com.yahoo.ycsb.generator.SequentialGenerator;
import com.yahoo.ycsb.generator.UniformLongGenerator;
import com.yahoo.ycsb.generator.UnixEpochTimestampGenerator;
import com.yahoo.ycsb.generator.ZipfianGenerator;
import com.yahoo.ycsb.measurements.Measurements;
/**
* A specialized workload dealing with time series data, i.e. series of discrete
* events associated with timestamps and identifiers. For this workload, identities
* consist of a {@link String} key and a set of {@link String} tag key/value
* pairs.
*
* For example:
*
* Time Series Key | Tag Keys/Values | 1483228800 | 1483228860 | 1483228920 |
* AA | AA=AA, AB=AA | 42.5 | 1.0 | 85.9 |
* AA | AA=AA, AB=AB | -9.4 | 76.9 | 0.18 |
* AB | AA=AA, AB=AA | -93.0 | 57.1 | -63.8 |
* AB | AA=AA, AB=AB | 7.6 | 56.1 | -0.3 |
*
*
* This table shows four time series with 3 measurements at three different timestamps.
* Keys, tags, timestamps and values (numeric only at this time) are generated by
* this workload. For details on properties and behavior, see the
* {@code workloads/tsworkload_template} file. The Javadocs will focus on implementation
* and how {@link DB} clients can parse the workload.
*
* In order to avoid having existing DB implementations implement a brand new interface
* this workload uses the existing APIs to encode a few special values that can be parsed
* by the client. The special values include the timestamp, numeric value and some
* query (read or scan) parameters. As an example on how to parse the fields, see
* {@link BasicTSDB}.
*
* Timestamps
*
* Timestamps are presented as Unix Epoch values in units of {@link TimeUnit#SECONDS},
* {@link TimeUnit#MILLISECONDS} or {@link TimeUnit#NANOSECONDS} based on the
* {@code timestampunits} property. For calls to {@link DB#insert(String, String, java.util.Map)}
* and {@link DB#update(String, String, java.util.Map)}, the timestamp is added to the
* {@code values} map encoded in a {@link NumericByteIterator} with the key defined
* in the {@code timestampkey} property (defaulting to "YCSBTS"). To pull out the timestamp
* when iterating over the values map, cast the {@link ByteIterator} to a
* {@link NumericByteIterator} and call {@link NumericByteIterator#getLong()}.
*
* Note that for calls to {@link DB#update(String, String, java.util.Map)}, timestamps
* earlier than the timestamp generator's timestamp will be chosen at random to
* mimic a lambda architecture or old job re-reporting some data.
*
* For calls to {@link DB#read(String, String, java.util.Set, java.util.Map)} and
* {@link DB#scan(String, String, int, java.util.Set, Vector)}, timestamps
* are encoded in a {@link StringByteIterator} in a key/value format with the
* {@code tagpairdelimiter} separator. E.g {@code YCSBTS=1483228800}. If {@code querytimespan}
* has been set to a positive value then the value will include a range with the
* starting (oldest) timestamp followed by the {@code querytimespandelimiter} separator
* and the ending (most recent) timestamp. E.g. {@code YCSBTS=1483228800-1483228920}.
*
* For calls to {@link DB#delete(String, String)}, encoding is the same as reads and
* scans but key/value pairs are separated by the {@code deletedelimiter} property value.
*
* By default, the starting timestamp is the current system time without any rounding.
* All timestamps are then offsets from that starting value.
*
* Values
*
* Similar to timestamps, values are encoded in {@link NumericByteIterator}s and stored
* in the values map with the key defined in {@code valuekey} (defaulting to "YCSBV").
* Values can either be 64 bit signed {@link long}s or double precision {@link double}s
* depending on the {@code valuetype} or {@code dataintegrity} properties. When parsing
* out the value, always call {@link NumericByteIterator#isFloatingPoint()} to determine
* whether or not to call {@link NumericByteIterator#getDouble()} (true) or
* {@link NumericByteIterator#getLong()} (false).
*
* When {@code dataintegrity} is set to true, then the value is always set to a
* 64 bit signed integer which is the Java hash code of the concatenation of the
* key and map of values (sorted on the map keys and skipping the timestamp and value
* entries) OR'd with the timestamp of the data point. See
* {@link #validationFunction(String, long, TreeMap)} for the implementation.
*
* Keys and Tags
*
* As mentioned, the workload generates strings for the keys and tags. On initialization
* three string generators are created using the {@link IncrementingPrintableStringGenerator}
* implementation. Then the generators fill three arrays with values based on the
* number of keys, the number of tags and the cardinality of each tag key/value pair.
* This implementation gives us time series like the example table where every string
* starts at something like "AA" (depending on the length of keys, tag keys and tag values)
* and continuing to "ZZ" wherein they rollover back to "AA".
*
* Each time series must have a unique set of tag keys, i.e. the key "AA" cannot appear
* more than once per time series. If the workload is configured for four tags with a
* tag key length of 2, the keys would be "AA", "AB", "AC" and "AD".
*
* Each tag key is then associated with a tag value. Tag values may appear more than once
* in each time series. E.g. time series will usually start with the tags "AA=AA",
* "AB=AA", "AC=AA" and "AD=AA". The {@code tagcardinality} property determines how many
* unique values will be generated per tag key. In the example table above, the
* {@code tagcardinality} property would have been set to {@code 1,2} meaning tag
* key "AA" would always have the tag value "AA" given a cardinality of 1. However
* tag key "AB" would have values "AA" and "AB" due to a cardinality of 2. This
* cardinality map, along with the number of unique time series keys determines how
* many unique time series are generated for the workload. Tag values share a common
* array of generated strings to save on memory.
*
* Operation Order
*
* The default behavior of the workload (for inserts and updates) is to generate a
* value for each time series for a given timestamp before incrementing to the next
* timestamp and writing values. This is an ideal workload and some time series
* databases are designed for this behavior. However in the real-world events will
* arrive grouped close to the current system time with a number of events being
* delayed, hence their timestamps are further in the past. The {@code delayedseries}
* property determines the percentage of time series that are delayed by up to
* {@code delayedintervals} intervals. E.g. setting this value to 0.05 means that
* 5% of the time series will be written with timestamps earlier than the timestamp
* generator's current time.
*
* Reads and Scans
*
* For benchmarking queries, some common tasks implemented by almost every time series
* database are available and are passed in the fields {@link Set}:
*
* GroupBy - A common operation is to aggregate multiple time series into a
* single time series via common parameters. For example, a user may want to see the
* total network traffic in a data center so they'll issue a SQL query like:
* SELECT value FROM timeseriesdb GROUP BY datacenter ORDER BY SUM(value);
* If the {@code groupbyfunction} has been set to a group by function, then the fields
* will contain a key/value pair with the key set in {@code groupbykey}. E.g.
* {@code YCSBGB=SUM}.
*
* Additionally with grouping enabled, fields on tag keys where group bys should
* occur will only have the key defined and will not have a value or delimiter. E.g.
* if grouping on tag key "AA", the field will contain {@code AA} instead of {@code AA=AB}.
*
* Downsampling - Another common operation is to reduce the resolution of the
* queried time series when fetching a wide time range of data so fewer data points
* are returned. For example, a user may fetch a week of data but if the data is
* recorded on a 1 second interval, that would be over 600k data points so they
* may ask for a 1 hour downsampling (also called bucketing) wherein every hour, all
* of the data points for a "bucket" are aggregated into a single value.
*
* To enable downsampling, the {@code downsamplingfunction} property must be set to
* a supported function such as "SUM" and the {@code downsamplinginterval} must be
* set to a valid time interval with the same units as {@code timestampunits}, e.g.
* "3600" which would create 1 hour buckets if the time units were set to seconds.
* With downsampling, query fields will include a key/value pair with
* {@code downsamplingkey} as the key (defaulting to "YCSBDS") and the value being
* a concatenation of {@code downsamplingfunction} and {@code downsamplinginterval},
* for example {@code YCSBDS=SUM60}.
*
* Timestamps - For every read, a random timestamp is selected from the interval
* set. If {@code querytimespan} has been set to a positive value, then the configured
* query time interval is added to the selected timestamp so the read passes the DB
* a range of times. Note that during the run phase, if no data was previously loaded,
* or if there are more {@code recordcount}s set for the run phase, reads may be sent
* to the DB with timestamps that are beyond the written data time range (or even the
* system clock of the DB).
*
* Deletes
*
* Because the delete API only accepts a single key, a full key and tag key/value
* pair map is flattened into a single string for parsing by the database. Common
* workloads include deleting a single time series (wherein all tag key and values are
* defined), deleting all series containing a tag key and value or deleting all of the
* time series sharing a common time series key.
*
* Right now the workload supports deletes with a key and for time series tag key/value
* pairs or a key with tags and a group by on one or more tags (meaning, delete all of
* the series with any value for the given tag key). The parameters are collapsed into
* a single string delimited with the character in the {@code deletedelimiter} property.
* For example, a delete request may look like: {@code AA:AA=AA:AA=AB} to delete the
* first time series in the table above.
*
* Threads
*
* For a multi-threaded execution, the number of time series keys set via the
* {@code fieldcount} property, must be greater than or equal to the number of
* threads set via {@code threads}. This is due to each thread choosing a subset
* of the total number of time series keys and being responsible for writing values
* for each time series containing those keys at each timestamp. Thus each thread
* will have its own timestamp generator, incrementing each time every time series
* it is responsible for has had a value written.
*
* Each thread may, however, issue reads and scans for any time series in the
* complete set.
*
* Sparsity
*
* By default, during loads, every time series will have a data point written at every
* time stamp in the interval set. This is common in workloads where a sensor writes
* a value at regular intervals. However some time series are only reported under
* certain conditions.
*
* For example, a counter may track the number of errors over a
* time period for a web service and only report when the value is greater than 1.
* Or a time series may include tags such as a user ID and IP address when a request
* arrives at the web service and only report values when that combination is seen.
* This means the timeseries will not have a value at every timestamp and in
* some cases there may be only a single value!
*
* This workload has a {@code sparsity} parameter that can choose how often a
* time series should record a value. The default value of 0.0 means every series
* will get a value at every timestamp. A value of 0.95 will mean that for each
* series, only 5% of the timestamps in the interval will have a value. The distribution
* of values is random.
*
* Notes/Warnings
*
*
* - Because time series keys and tag key/values are generated and stored in memory,
* be careful of setting the cardinality too high for the JVM's heap.
* - When running for data integrity, a number of settings are incompatible and will
* throw errors. Check the error messages for details.
* - Databases that support keys only and can't store tags should order and then
* collapse the tag values using a delimiter. For example the series in the example
* table at the top could be written as:
*
* - {@code AA.AA.AA}
* - {@code AA.AA.AB}
* - {@code AB.AA.AA}
* - {@code AB.AA.AB}
*
*
*
* TODOs
*
*
* - Support random time intervals. E.g. some series write every second, others every
* 60 seconds.
* - Support random time series cardinality. Right now every series has the same
* cardinality.
* - Truly random timestamps per time series. We could use bitmaps to determine if
* a series has had a value written for a given timestamp. Right now all of the series
* are in sync time-wise.
* - Possibly a real-time load where values are written with the current system time.
* It's more of a bulk-loading operation now.
*
*/
public class TimeSeriesWorkload extends Workload {
/**
 * The types of values written to the timeseries store.
 */
public enum ValueType {
  INTEGERS("integers"),
  FLOATS("floats"),
  MIXED("mixednumbers");

  /** The configuration string this type is selected by. */
  protected final String name;

  ValueType(final String name) {
    this.name = name;
  }

  /**
   * Resolves a configuration string to its {@link ValueType}, ignoring case.
   * @param name The configured value type name, e.g. "floats".
   * @return The matching value type.
   * @throws IllegalArgumentException if no type matches the given name.
   */
  public static ValueType fromString(final String name) {
    for (final ValueType candidate : ValueType.values()) {
      if (candidate.name.equalsIgnoreCase(name)) {
        return candidate;
      }
    }
    throw new IllegalArgumentException("Unrecognized type: " + name);
  }
}
/** Name and default value for the timestamp key property. */
public static final String TIMESTAMP_KEY_PROPERTY = "timestampkey";
public static final String TIMESTAMP_KEY_PROPERTY_DEFAULT = "YCSBTS";

/** Name and default value for the value key property. */
public static final String VALUE_KEY_PROPERTY = "valuekey";
public static final String VALUE_KEY_PROPERTY_DEFAULT = "YCSBV";

/** Name and default value for the timestamp interval property (time units between data points). */
public static final String TIMESTAMP_INTERVAL_PROPERTY = "timestampinterval";
public static final String TIMESTAMP_INTERVAL_PROPERTY_DEFAULT = "60";

/** Name and default value for the timestamp units property (a {@link TimeUnit} name). */
public static final String TIMESTAMP_UNITS_PROPERTY = "timestampunits";
public static final String TIMESTAMP_UNITS_PROPERTY_DEFAULT = "SECONDS";

/** Name and default value for the number of tags property. */
public static final String TAG_COUNT_PROPERTY = "tagcount";
public static final String TAG_COUNT_PROPERTY_DEFAULT = "4";

/** Name and default value for the tag value cardinality map property (one count per tag key). */
public static final String TAG_CARDINALITY_PROPERTY = "tagcardinality";
public static final String TAG_CARDINALITY_PROPERTY_DEFAULT = "1, 2, 4, 8";

/** Name and default value for the tag key length property. */
public static final String TAG_KEY_LENGTH_PROPERTY = "tagkeylength";
public static final String TAG_KEY_LENGTH_PROPERTY_DEFAULT = "8";

/** Name and default value for the tag value length property. */
public static final String TAG_VALUE_LENGTH_PROPERTY = "tagvaluelength";
public static final String TAG_VALUE_LENGTH_PROPERTY_DEFAULT = "8";

/** Name and default value for the tag pair delimiter property (separates a tag key from its value). */
public static final String PAIR_DELIMITER_PROPERTY = "tagpairdelimiter";
public static final String PAIR_DELIMITER_PROPERTY_DEFAULT = "=";

/** Name and default value for the delete string delimiter property. */
public static final String DELETE_DELIMITER_PROPERTY = "deletedelimiter";
public static final String DELETE_DELIMITER_PROPERTY_DEFAULT = ":";

/** Name and default value for the random timestamp write order property. */
public static final String RANDOMIZE_TIMESTAMP_ORDER_PROPERTY = "randomwritetimestamporder";
public static final String RANDOMIZE_TIMESTAMP_ORDER_PROPERTY_DEFAULT = "false";

/** Name and default value for the random time series write order property. */
public static final String RANDOMIZE_TIMESERIES_ORDER_PROPERTY = "randomtimeseriesorder";
public static final String RANDOMIZE_TIMESERIES_ORDER_PROPERTY_DEFAULT = "true";

/** Name and default value for the value types property. See {@link ValueType}. */
public static final String VALUE_TYPE_PROPERTY = "valuetype";
public static final String VALUE_TYPE_PROPERTY_DEFAULT = "floats";

/** Name and default value for the sparsity property (0.0 = a value at every timestamp). */
public static final String SPARSITY_PROPERTY = "sparsity";
public static final String SPARSITY_PROPERTY_DEFAULT = "0.00";

/** Name and default value for the delayed series percentage property. */
public static final String DELAYED_SERIES_PROPERTY = "delayedseries";
public static final String DELAYED_SERIES_PROPERTY_DEFAULT = "0.10";

/** Name and default value for the delayed series intervals property. */
public static final String DELAYED_INTERVALS_PROPERTY = "delayedintervals";
public static final String DELAYED_INTERVALS_PROPERTY_DEFAULT = "5";

/** Name and default value for the query time span property. */
public static final String QUERY_TIMESPAN_PROPERTY = "querytimespan";
public static final String QUERY_TIMESPAN_PROPERTY_DEFAULT = "0";

/** Name and default value for the randomized query time span property. */
public static final String QUERY_RANDOM_TIMESPAN_PROPERTY = "queryrandomtimespan";
public static final String QUERY_RANDOM_TIMESPAN_PROPERTY_DEFAULT = "false";

/** Name and default value for the query time stamp delimiter property. */
public static final String QUERY_TIMESPAN_DELIMITER_PROPERTY = "querytimespandelimiter";
public static final String QUERY_TIMESPAN_DELIMITER_PROPERTY_DEFAULT = ",";

/** Name and default value for the group-by key property. */
public static final String GROUPBY_KEY_PROPERTY = "groupbykey";
public static final String GROUPBY_KEY_PROPERTY_DEFAULT = "YCSBGB";

/** Name for the group-by function property; no default — group-bys are disabled when unset. */
public static final String GROUPBY_PROPERTY = "groupbyfunction";

/** Name for the group-by key map property; required when the group-by function is set. */
public static final String GROUPBY_KEYS_PROPERTY = "groupbykeys";

/** Name and default value for the downsampling key property. */
public static final String DOWNSAMPLING_KEY_PROPERTY = "downsamplingkey";
public static final String DOWNSAMPLING_KEY_PROPERTY_DEFAULT = "YCSBDS";

/** Name for the downsampling function property; no default — downsampling is disabled when unset. */
public static final String DOWNSAMPLING_FUNCTION_PROPERTY = "downsamplingfunction";

/** Name for the downsampling interval property; required when the downsampling function is set. */
public static final String DOWNSAMPLING_INTERVAL_PROPERTY = "downsamplinginterval";
/** The properties to pull settings from. */
protected Properties properties;

/** Generators for keys, tag keys and tag values. */
protected Generator keyGenerator;
protected Generator tagKeyGenerator;
protected Generator tagValueGenerator;

/** The timestamp key, defaults to "YCSBTS". */
protected String timestampKey;

/** The value key, defaults to "YCSBV". */
protected String valueKey;

/** The number of time units in between timestamps. */
protected int timestampInterval;

/** The units of time the timestamp and various intervals represent. */
protected TimeUnit timeUnits;

/** Whether or not to randomize the timestamp order when writing. */
protected boolean randomizeTimestampOrder;

/** Whether or not to randomize (shuffle) the time series order. NOT compatible
 * with data integrity. */
protected boolean randomizeTimeseriesOrder;

/** The type of values to generate when writing data. */
protected ValueType valueType;

/** Used to calculate an offset for each time series. */
protected int[] cumulativeCardinality;

/** The calculated total cardinality based on the config. */
protected int totalCardinality;

/** The calculated per-time-series-key cardinality. I.e. the number of unique
 * tag key and value combinations. */
protected int perKeyCardinality;

/** How much data to scan for in each call. */
protected NumberGenerator scanlength;

/** A generator used to select a random time series key per read/scan. */
protected NumberGenerator keychooser;

/** A generator to select what operation to perform during the run phase. */
protected DiscreteGenerator operationchooser;

/** The maximum number of interval offsets from the starting timestamp. Calculated
 * based on the number of records configured for the run. */
protected int maxOffsets;

/** The number of records or operations to perform for this run. */
protected int recordcount;

/** The number of tag pairs per time series. */
protected int tagPairs;

/** The table we'll write to. */
protected String table;

/** How many time series keys will be generated. */
protected int numKeys;

/** The generated list of possible time series key values. */
protected String[] keys;

/** The generated list of possible tag key values. */
protected String[] tagKeys;

/** The generated list of possible tag value values. */
protected String[] tagValues;

/** The cardinality for each tag key. */
protected int[] tagCardinality;

/** A helper to skip non-incrementing tag values. */
protected int firstIncrementableCardinality;

/** How sparse the data written should be. */
protected double sparsity;

/** The percentage of time series that should be delayed in writes. */
protected double delayedSeries;

/** The maximum number of intervals to delay a series. */
protected int delayedIntervals;

/** Optional query time interval during reads/scans. */
protected int queryTimeSpan;

/** Whether or not the actual interval should be randomly chosen, using
 * queryTimeSpan as the maximum value. */
protected boolean queryRandomTimeSpan;

/** The delimiter for tag pairs in fields. */
protected String tagPairDelimiter;

/** The delimiter between parameters for the delete key. */
protected String deleteDelimiter;

/** The delimiter between timestamps for query time spans. */
protected String queryTimeSpanDelimiter;

/** Whether or not to issue group-by queries. */
protected boolean groupBy;

/** The key used for group-by tag keys. */
protected String groupByKey;

/** The function used for group-by's. */
protected String groupByFunction;

/** The tag keys to group on (parallel to {@code tagKeys}; true = group on that key). */
protected boolean[] groupBys;

/** Whether or not to issue downsampling queries. */
protected boolean downsample;

/** The key used for downsampling tag keys. */
protected String downsampleKey;

/** The downsampling function. */
protected String downsampleFunction;

/** The downsampling interval, in the same units as {@code timeUnits}. */
protected int downsampleInterval;

/**
 * Set to true if want to check correctness of reads. Must also
 * be set to true during loading phase to function.
 */
protected boolean dataintegrity;

/** Measurements to write data integrity results to. */
protected Measurements measurements = Measurements.getMeasurements();
/**
 * Parses the workload configuration, builds the generators used for keys,
 * operations, scan lengths and timestamps, then validates the settings.
 * @param p The workload properties.
 * @throws WorkloadException if a property cannot be parsed or is invalid.
 */
@Override
public void init(final Properties p) throws WorkloadException {
  properties = p;
  recordcount =
      Integer.parseInt(p.getProperty(Client.RECORD_COUNT_PROPERTY,
          Client.DEFAULT_RECORD_COUNT));
  if (recordcount == 0) {
    // A count of 0 means "no limit"; treat it as effectively unbounded.
    recordcount = Integer.MAX_VALUE;
  }
  timestampKey = p.getProperty(TIMESTAMP_KEY_PROPERTY, TIMESTAMP_KEY_PROPERTY_DEFAULT);
  valueKey = p.getProperty(VALUE_KEY_PROPERTY, VALUE_KEY_PROPERTY_DEFAULT);
  operationchooser = CoreWorkload.createOperationGenerator(properties);

  // How many records each scan operation will ask for.
  final int maxscanlength =
      Integer.parseInt(p.getProperty(CoreWorkload.MAX_SCAN_LENGTH_PROPERTY,
          CoreWorkload.MAX_SCAN_LENGTH_PROPERTY_DEFAULT));
  final String scanlengthdistrib =
      p.getProperty(CoreWorkload.SCAN_LENGTH_DISTRIBUTION_PROPERTY,
          CoreWorkload.SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT);
  if ("uniform".equals(scanlengthdistrib)) {
    scanlength = new UniformLongGenerator(1, maxscanlength);
  } else if ("zipfian".equals(scanlengthdistrib)) {
    scanlength = new ZipfianGenerator(1, maxscanlength);
  } else {
    throw new WorkloadException(
        "Distribution \"" + scanlengthdistrib + "\" not allowed for scan length");
  }

  randomizeTimestampOrder = Boolean.parseBoolean(p.getProperty(
      RANDOMIZE_TIMESTAMP_ORDER_PROPERTY,
      RANDOMIZE_TIMESTAMP_ORDER_PROPERTY_DEFAULT));
  randomizeTimeseriesOrder = Boolean.parseBoolean(p.getProperty(
      RANDOMIZE_TIMESERIES_ORDER_PROPERTY,
      RANDOMIZE_TIMESERIES_ORDER_PROPERTY_DEFAULT));

  // Setup the cardinality: the number of time series keys and tag pairs per series.
  numKeys = Integer.parseInt(p.getProperty(CoreWorkload.FIELD_COUNT_PROPERTY,
      CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT));
  tagPairs = Integer.parseInt(p.getProperty(TAG_COUNT_PROPERTY,
      TAG_COUNT_PROPERTY_DEFAULT));
  sparsity = Double.parseDouble(p.getProperty(SPARSITY_PROPERTY, SPARSITY_PROPERTY_DEFAULT));
  tagCardinality = new int[tagPairs];

  // Distribution used to pick which time series key a read/scan targets.
  final String requestdistrib =
      p.getProperty(CoreWorkload.REQUEST_DISTRIBUTION_PROPERTY,
          CoreWorkload.REQUEST_DISTRIBUTION_PROPERTY_DEFAULT);
  if ("uniform".equals(requestdistrib)) {
    keychooser = new UniformLongGenerator(0, numKeys - 1);
  } else if ("sequential".equals(requestdistrib)) {
    keychooser = new SequentialGenerator(0, numKeys - 1);
  } else if ("zipfian".equals(requestdistrib)) {
    keychooser = new ScrambledZipfianGenerator(0, numKeys - 1);
  } else if ("hotspot".equals(requestdistrib)) {
    final double hotsetfraction =
        Double.parseDouble(p.getProperty(CoreWorkload.HOTSPOT_DATA_FRACTION,
            CoreWorkload.HOTSPOT_DATA_FRACTION_DEFAULT));
    final double hotopnfraction =
        Double.parseDouble(p.getProperty(CoreWorkload.HOTSPOT_OPN_FRACTION,
            CoreWorkload.HOTSPOT_OPN_FRACTION_DEFAULT));
    keychooser = new HotspotIntegerGenerator(0, numKeys - 1,
        hotsetfraction, hotopnfraction);
  } else {
    throw new WorkloadException("Unknown request distribution \"" + requestdistrib + "\"");
  }

  // Figure out the start timestamp based on the units, cardinality and interval.
  try {
    timestampInterval = Integer.parseInt(p.getProperty(
        TIMESTAMP_INTERVAL_PROPERTY, TIMESTAMP_INTERVAL_PROPERTY_DEFAULT));
  } catch (NumberFormatException nfe) {
    throw new WorkloadException("Unable to parse the " +
        TIMESTAMP_INTERVAL_PROPERTY, nfe);
  }
  try {
    timeUnits = TimeUnit.valueOf(p.getProperty(TIMESTAMP_UNITS_PROPERTY,
        TIMESTAMP_UNITS_PROPERTY_DEFAULT).toUpperCase());
  } catch (IllegalArgumentException e) {
    throw new WorkloadException("Unknown time unit type", e);
  }
  if (timeUnits == TimeUnit.NANOSECONDS || timeUnits == TimeUnit.MICROSECONDS) {
    throw new WorkloadException("YCSB doesn't support " + timeUnits +
        " at this time.");
  }

  tagPairDelimiter = p.getProperty(PAIR_DELIMITER_PROPERTY, PAIR_DELIMITER_PROPERTY_DEFAULT);
  deleteDelimiter = p.getProperty(DELETE_DELIMITER_PROPERTY, DELETE_DELIMITER_PROPERTY_DEFAULT);
  dataintegrity = Boolean.parseBoolean(
      p.getProperty(CoreWorkload.DATA_INTEGRITY_PROPERTY,
          CoreWorkload.DATA_INTEGRITY_PROPERTY_DEFAULT));
  queryTimeSpan = Integer.parseInt(p.getProperty(QUERY_TIMESPAN_PROPERTY,
      QUERY_TIMESPAN_PROPERTY_DEFAULT));
  queryRandomTimeSpan = Boolean.parseBoolean(p.getProperty(QUERY_RANDOM_TIMESPAN_PROPERTY,
      QUERY_RANDOM_TIMESPAN_PROPERTY_DEFAULT));
  queryTimeSpanDelimiter = p.getProperty(QUERY_TIMESPAN_DELIMITER_PROPERTY,
      QUERY_TIMESPAN_DELIMITER_PROPERTY_DEFAULT);

  groupByKey = p.getProperty(GROUPBY_KEY_PROPERTY, GROUPBY_KEY_PROPERTY_DEFAULT);
  groupByFunction = p.getProperty(GROUPBY_PROPERTY);
  if (groupByFunction != null && !groupByFunction.isEmpty()) {
    final String groupByKeys = p.getProperty(GROUPBY_KEYS_PROPERTY);
    if (groupByKeys == null || groupByKeys.isEmpty()) {
      throw new WorkloadException("Group by was enabled but no keys were specified.");
    }
    final String[] gbKeys = groupByKeys.split(",");
    // BUGFIX: compare against the configured tag count, not tagKeys.length.
    // tagKeys is not populated until initKeysAndTags() runs at the end of this
    // method, so dereferencing it here threw a NullPointerException whenever
    // group-bys were enabled.
    if (gbKeys.length != tagPairs) {
      throw new WorkloadException("Only " + gbKeys.length + " group by keys "
          + "were specified but there were " + tagPairs + " tag keys given.");
    }
    groupBys = new boolean[gbKeys.length];
    for (int i = 0; i < gbKeys.length; i++) {
      // Any non-zero flag marks the corresponding tag key as a group-by key.
      groupBys[i] = Integer.parseInt(gbKeys[i].trim()) != 0;
    }
    groupBy = true;
  }

  downsampleKey = p.getProperty(DOWNSAMPLING_KEY_PROPERTY, DOWNSAMPLING_KEY_PROPERTY_DEFAULT);
  downsampleFunction = p.getProperty(DOWNSAMPLING_FUNCTION_PROPERTY);
  if (downsampleFunction != null && !downsampleFunction.isEmpty()) {
    final String interval = p.getProperty(DOWNSAMPLING_INTERVAL_PROPERTY);
    if (interval == null || interval.isEmpty()) {
      throw new WorkloadException("'" + DOWNSAMPLING_INTERVAL_PROPERTY + "' was missing despite '"
          + DOWNSAMPLING_FUNCTION_PROPERTY + "' being set.");
    }
    downsampleInterval = Integer.parseInt(interval);
    downsample = true;
  }

  delayedSeries = Double.parseDouble(p.getProperty(DELAYED_SERIES_PROPERTY, DELAYED_SERIES_PROPERTY_DEFAULT));
  delayedIntervals = Integer.parseInt(p.getProperty(DELAYED_INTERVALS_PROPERTY, DELAYED_INTERVALS_PROPERTY_DEFAULT));

  valueType = ValueType.fromString(p.getProperty(VALUE_TYPE_PROPERTY, VALUE_TYPE_PROPERTY_DEFAULT));
  table = p.getProperty(CoreWorkload.TABLENAME_PROPERTY, CoreWorkload.TABLENAME_PROPERTY_DEFAULT);
  initKeysAndTags();
  validateSettings();
}
/**
 * Builds the per-thread state object holding this thread's generators and
 * assigned portion of the workload.
 * @param p The workload properties (the copy saved by init() is used instead).
 * @param mythreadid This thread's ID.
 * @param threadcount The total number of client threads.
 * @return A new {@code ThreadState} for this thread.
 * @throws WorkloadException if init() has not been called yet.
 */
@Override
public Object initThread(Properties p, int mythreadid, int threadcount) throws WorkloadException {
  if (properties == null) {
    throw new WorkloadException("Workload has not been initialized.");
  }
  return new ThreadState(mythreadid, threadcount);
}
/**
 * Writes one data point for the next time series, as chosen by the thread
 * state. The generated timestamp and value are embedded in the {@code tags}
 * map under {@code timestampKey} and {@code valueKey}.
 * @param db The DB to write to.
 * @param threadstate The {@code ThreadState} created by initThread().
 * @return true if the insert returned {@link Status#OK}, false otherwise.
 */
@Override
public boolean doInsert(DB db, Object threadstate) {
  if (threadstate == null) {
    throw new IllegalStateException("Missing thread state.");
  }
  // TreeMap keeps the tag keys sorted for deterministic ordering.
  final Map<String, ByteIterator> tags = new TreeMap<String, ByteIterator>();
  final String key = ((ThreadState) threadstate).nextDataPoint(tags, true);
  return db.insert(table, key, tags) == Status.OK;
}
/**
 * Executes one run-phase operation chosen by the weighted operation
 * generator. Unknown operations are reported as failures.
 * @param db The DB to operate on.
 * @param threadstate The {@code ThreadState} created by initThread().
 * @return true if a known operation was dispatched, false otherwise.
 */
@Override
public boolean doTransaction(DB db, Object threadstate) {
  if (threadstate == null) {
    throw new IllegalStateException("Missing thread state.");
  }
  final String operation = operationchooser.nextString();
  switch (operation) {
  case "READ":
    doTransactionRead(db, threadstate);
    return true;
  case "UPDATE":
    doTransactionUpdate(db, threadstate);
    return true;
  case "INSERT":
    doTransactionInsert(db, threadstate);
    return true;
  case "SCAN":
    doTransactionScan(db, threadstate);
    return true;
  case "DELETE":
    doTransactionDelete(db, threadstate);
    return true;
  default:
    return false;
  }
}
/**
 * Issues a read for a randomly chosen time series key and tag combination at
 * a random timestamp (or time range if {@code querytimespan} is set). The
 * timestamp, group-by and downsampling parameters are encoded as pseudo
 * entries in the {@code fields} set for the DB client to parse.
 * @param db The DB to read from.
 * @param threadstate The {@code ThreadState} created by initThread().
 */
protected void doTransactionRead(final DB db, Object threadstate) {
  final ThreadState state = (ThreadState) threadstate;
  final String keyname = keys[keychooser.nextValue().intValue()];

  // Pick a random offset into the interval set for the query's start time.
  int offsets = state.queryOffsetGenerator.nextValue().intValue();
  final long startTimestamp;
  if (offsets > 0) {
    startTimestamp = state.startTimestamp + state.timestampGenerator.getOffset(offsets);
  } else {
    startTimestamp = state.startTimestamp;
  }

  // Randomly select a tag value for each tag key. For group-by keys only the
  // key is sent (no delimiter or value) so the DB knows to aggregate over it.
  final Set<String> fields = new HashSet<String>();
  for (int i = 0; i < tagPairs; ++i) {
    if (groupBy && groupBys[i]) {
      fields.add(tagKeys[i]);
    } else {
      fields.add(tagKeys[i] + tagPairDelimiter +
          tagValues[Utils.random().nextInt(tagCardinality[i])]);
    }
  }

  // Encode the timestamp, or a start/end range when a query time span is set.
  if (queryTimeSpan > 0) {
    final long endTimestamp;
    if (queryRandomTimeSpan) {
      endTimestamp = startTimestamp + (timestampInterval * Utils.random().nextInt(queryTimeSpan / timestampInterval));
    } else {
      endTimestamp = startTimestamp + queryTimeSpan;
    }
    fields.add(timestampKey + tagPairDelimiter + startTimestamp + queryTimeSpanDelimiter + endTimestamp);
  } else {
    fields.add(timestampKey + tagPairDelimiter + startTimestamp);
  }
  if (groupBy) {
    fields.add(groupByKey + tagPairDelimiter + groupByFunction);
  }
  if (downsample) {
    fields.add(downsampleKey + tagPairDelimiter + downsampleFunction + downsampleInterval);
  }

  final HashMap<String, ByteIterator> cells = new HashMap<String, ByteIterator>();
  final Status status = db.read(table, keyname, fields, cells);
  if (dataintegrity && status == Status.OK) {
    verifyRow(keyname, cells);
  }
}
/**
 * Executes an update by generating a data point for the thread's current
 * series. Passing {@code false} to nextDataPoint selects update semantics,
 * i.e. a timestamp in the already-written range may be chosen.
 * @param db The DB to execute against.
 * @param threadstate The {@link ThreadState} created by {@link #initThread}.
 */
protected void doTransactionUpdate(final DB db, Object threadstate) {
  if (threadstate == null) {
    throw new IllegalStateException("Missing thread state.");
  }
  final Map<String, ByteIterator> tags = new TreeMap<String, ByteIterator>();
  final String key = ((ThreadState) threadstate).nextDataPoint(tags, false);
  db.update(table, key, tags);
}
/**
 * Inserts during the transaction phase by delegating to
 * {@link #doInsert(DB, Object)}; insert semantics are identical in both phases.
 * @param db The DB to execute against.
 * @param threadstate The {@link ThreadState} created by {@link #initThread}.
 */
protected void doTransactionInsert(final DB db, Object threadstate) {
  doInsert(db, threadstate);
}
protected void doTransactionScan(final DB db, Object threadstate) {
final ThreadState state = (ThreadState) threadstate;
final String keyname = keys[Utils.random().nextInt(keys.length)];
// choose a random scan length
int len = scanlength.nextValue().intValue();
int offsets = Utils.random().nextInt(maxOffsets - 1);
final long startTimestamp;
if (offsets > 0) {
startTimestamp = state.startTimestamp + state.timestampGenerator.getOffset(offsets);
} else {
startTimestamp = state.startTimestamp;
}
// rando tags
Set fields = new HashSet();
for (int i = 0; i < tagPairs; ++i) {
if (groupBy && groupBys[i]) {
fields.add(tagKeys[i]);
} else {
fields.add(tagKeys[i] + tagPairDelimiter +
tagValues[Utils.random().nextInt(tagCardinality[i])]);
}
}
if (queryTimeSpan > 0) {
final long endTimestamp;
if (queryRandomTimeSpan) {
endTimestamp = startTimestamp + (timestampInterval * Utils.random().nextInt(queryTimeSpan / timestampInterval));
} else {
endTimestamp = startTimestamp + queryTimeSpan;
}
fields.add(timestampKey + tagPairDelimiter + startTimestamp + queryTimeSpanDelimiter + endTimestamp);
} else {
fields.add(timestampKey + tagPairDelimiter + startTimestamp);
}
if (groupBy) {
fields.add(groupByKey + tagPairDelimiter + groupByFunction);
}
if (downsample) {
fields.add(downsampleKey + tagPairDelimiter + downsampleFunction + tagPairDelimiter + downsampleInterval);
}
final Vector> results = new Vector>();
db.scan(table, keyname, len, fields, results);
}
/**
 * Performs a delete for a random series. Because {@link DB#delete} only takes
 * a single key string, the key, tag pairs and timestamp (or time range) are
 * packed into one string joined by {@code deleteDelimiter}.
 * @param db The DB to execute against.
 * @param threadstate The {@link ThreadState} created by {@link #initThread}.
 */
protected void doTransactionDelete(final DB db, Object threadstate) {
  final ThreadState state = (ThreadState) threadstate;
  final StringBuilder buf = new StringBuilder().append(keys[Utils.random().nextInt(keys.length)]);
  // Pick a random offset into the written time range for the delete start.
  int offsets = Utils.random().nextInt(maxOffsets - 1);
  final long startTimestamp;
  if (offsets > 0) {
    startTimestamp = state.startTimestamp + state.timestampGenerator.getOffset(offsets);
  } else {
    startTimestamp = state.startTimestamp;
  }

  // Random tag values; group-by tags are appended as bare keys.
  for (int i = 0; i < tagPairs; ++i) {
    if (groupBy && groupBys[i]) {
      buf.append(deleteDelimiter)
         .append(tagKeys[i]);
    } else {
      buf.append(deleteDelimiter).append(tagKeys[i] + tagPairDelimiter +
          tagValues[Utils.random().nextInt(tagCardinality[i])]);
    }
  }

  if (queryTimeSpan > 0) {
    final long endTimestamp;
    if (queryRandomTimeSpan) {
      endTimestamp = startTimestamp + (timestampInterval * Utils.random().nextInt(queryTimeSpan / timestampInterval));
    } else {
      endTimestamp = startTimestamp + queryTimeSpan;
    }
    buf.append(deleteDelimiter)
       .append(timestampKey + tagPairDelimiter + startTimestamp + queryTimeSpanDelimiter + endTimestamp);
  } else {
    buf.append(deleteDelimiter)
       .append(timestampKey + tagPairDelimiter + startTimestamp);
  }

  db.delete(table, buf.toString());
}
/**
 * Parses the values returned by a read or scan operation and determines whether
 * or not the integer value matches the hash and timestamp of the original timestamp.
 * Only works for raw data points, will not work for group-by's or downsampled data.
 * @param key The time series key.
 * @param cells The cells read by the DB.
 * @return {@link Status#OK} if the data matched or {@link Status#UNEXPECTED_STATE} if
 * the data did not match.
 */
protected Status verifyRow(final String key, final Map<String, ByteIterator> cells) {
  Status verifyStatus = Status.UNEXPECTED_STATE;
  long startTime = System.nanoTime();

  double value = 0;
  long timestamp = 0;
  final TreeMap<String, String> validationTags = new TreeMap<String, String>();
  // Split the returned cells into the timestamp, the value and the tag pairs
  // that feed the validation hash.
  for (final Entry<String, ByteIterator> entry : cells.entrySet()) {
    if (entry.getKey().equals(timestampKey)) {
      final NumericByteIterator it = (NumericByteIterator) entry.getValue();
      timestamp = it.getLong();
    } else if (entry.getKey().equals(valueKey)) {
      final NumericByteIterator it = (NumericByteIterator) entry.getValue();
      value = it.isFloatingPoint() ? it.getDouble() : it.getLong();
    } else {
      validationTags.put(entry.getKey(), entry.getValue().toString());
    }
  }

  if (validationFunction(key, timestamp, validationTags) == value) {
    verifyStatus = Status.OK;
  }

  long endTime = System.nanoTime();
  // Divide BEFORE narrowing to int: the original cast the full nanosecond
  // delta first, which overflows for verifications longer than ~2.1 seconds.
  measurements.measure("VERIFY", (int) ((endTime - startTime) / 1000));
  measurements.reportStatus("VERIFY", verifyStatus);
  return verifyStatus;
}
/**
 * Function used for generating a deterministic hash based on the combination
 * of metric, tags and timestamp.
 * @param key A non-null string representing the key.
 * @param timestamp A timestamp in the proper units for the workload.
 * @param tags A non-null map of tag keys and values NOT including the YCSB
 * key or timestamp.
 * @return A hash value as an 8 byte integer.
 */
protected long validationFunction(final String key, final long timestamp,
                                  final TreeMap<String, String> tags) {
  // NOTE(review): the capacity hint uses tagCardinality[1] rather than a value
  // length; it is only a sizing hint so behavior is unaffected — confirm intent.
  final StringBuilder validationBuffer = new StringBuilder(keys[0].length() +
      (tagPairs * tagKeys[0].length()) + (tagPairs * tagCardinality[1]));
  for (final Entry<String, String> pair : tags.entrySet()) {
    validationBuffer.append(pair.getKey()).append(pair.getValue());
  }
  // XOR the string hash with the timestamp so the same tags at different
  // times produce different expected values.
  return (long) validationBuffer.toString().hashCode() ^ timestamp;
}
/**
 * Breaks out the keys, tags and cardinality initialization in another method
 * to keep CheckStyle happy.
 * @throws WorkloadException If something goes pear shaped.
 */
protected void initKeysAndTags() throws WorkloadException {
  // Lengths (in printable characters) of the generated key and tag strings.
  final int keyLength = Integer.parseInt(properties.getProperty(
      CoreWorkload.FIELD_LENGTH_PROPERTY,
      CoreWorkload.FIELD_LENGTH_PROPERTY_DEFAULT));
  final int tagKeyLength = Integer.parseInt(properties.getProperty(
      TAG_KEY_LENGTH_PROPERTY, TAG_KEY_LENGTH_PROPERTY_DEFAULT));
  final int tagValueLength = Integer.parseInt(properties.getProperty(
      TAG_VALUE_LENGTH_PROPERTY, TAG_VALUE_LENGTH_PROPERTY_DEFAULT));

  keyGenerator = new IncrementingPrintableStringGenerator(keyLength);
  tagKeyGenerator = new IncrementingPrintableStringGenerator(tagKeyLength);
  tagValueGenerator = new IncrementingPrintableStringGenerator(tagValueLength);

  final int threads = Integer.parseInt(properties.getProperty(Client.THREAD_COUNT_PROPERTY, "1"));
  final String tagCardinalityString = properties.getProperty(
      TAG_CARDINALITY_PROPERTY,
      TAG_CARDINALITY_PROPERTY_DEFAULT);
  final String[] tagCardinalityParts = tagCardinalityString.split(",");
  int idx = 0;
  totalCardinality = numKeys;
  perKeyCardinality = 1;
  int maxCardinality = 0;
  // Parse the comma-separated per-tag cardinalities, accumulating the total
  // and per-key products and tracking the largest single cardinality.
  for (final String card : tagCardinalityParts) {
    try {
      tagCardinality[idx] = Integer.parseInt(card.trim());
    } catch (NumberFormatException nfe) {
      throw new WorkloadException("Unable to parse cardinality: " +
          card, nfe);
    }
    if (tagCardinality[idx] < 1) {
      throw new WorkloadException("Cardinality must be greater than zero: " +
          tagCardinality[idx]);
    }
    totalCardinality *= tagCardinality[idx];
    perKeyCardinality *= tagCardinality[idx];
    if (tagCardinality[idx] > maxCardinality) {
      maxCardinality = tagCardinality[idx];
    }
    ++idx;
    if (idx >= tagPairs) {
      // we have more cardinalities than tag keys so bail at this point.
      break;
    }
  }
  if (numKeys < threads) {
    throw new WorkloadException("Field count " + numKeys + " (keys for time "
        + "series workloads) must be greater or equal to the number of "
        + "threads " + threads);
  }

  // fill tags without explicit cardinality with 1
  // NOTE(review): this fills only the FIRST unspecified slot; if more than one
  // tag lacks an explicit cardinality the rest keep the array default —
  // confirm whether a loop was intended here.
  if (idx < tagPairs) {
    tagCardinality[idx++] = 1;
  }

  // Remember the first tag index that can actually roll over (cardinality > 1);
  // used by ThreadState.nextDataPoint() to detect a full tag-space sweep.
  for (int i = 0; i < tagCardinality.length; ++i) {
    if (tagCardinality[i] > 1) {
      firstIncrementableCardinality = i;
      break;
    }
  }

  keys = new String[numKeys];
  tagKeys = new String[tagPairs];
  tagValues = new String[maxCardinality];
  for (int i = 0; i < numKeys; ++i) {
    keys[i] = keyGenerator.nextString();
  }

  for (int i = 0; i < tagPairs; ++i) {
    tagKeys[i] = tagKeyGenerator.nextString();
  }

  for (int i = 0; i < maxCardinality; i++) {
    tagValues[i] = tagValueGenerator.nextString();
  }
  if (randomizeTimeseriesOrder) {
    Utils.shuffleArray(keys);
    Utils.shuffleArray(tagValues);
  }

  // How many distinct timestamps each series receives over the whole run.
  maxOffsets = (recordcount / totalCardinality) + 1;
  final int[] keyAndTagCardinality = new int[tagPairs + 1];
  keyAndTagCardinality[0] = numKeys;
  for (int i = 0; i < tagPairs; i++) {
    keyAndTagCardinality[i + 1] = tagCardinality[i];
  }

  // cumulativeCardinality[i] holds the product of all cardinalities "below"
  // slot i, letting nextDataPoint() compute a flat series index.
  cumulativeCardinality = new int[keyAndTagCardinality.length];
  for (int i = 0; i < keyAndTagCardinality.length; i++) {
    int cumulation = 1;
    for (int x = i; x <= keyAndTagCardinality.length - 1; x++) {
      cumulation *= keyAndTagCardinality[x];
    }
    if (i > 0) {
      cumulativeCardinality[i - 1] = cumulation;
    }
  }
  cumulativeCardinality[cumulativeCardinality.length - 1] = 1;
}
/**
 * Makes sure the settings as given are compatible.
 * @throws WorkloadException If one or more settings were invalid.
 */
protected void validateSettings() throws WorkloadException {
  // Data integrity verification recomputes an integer hash per data point, so
  // it is incompatible with anything that transforms or randomizes the data.
  if (dataintegrity) {
    if (valueType != ValueType.INTEGERS) {
      throw new WorkloadException("Data integrity was enabled. 'valuetype' must "
          + "be set to 'integers'.");
    }
    if (groupBy) {
      throw new WorkloadException("Data integrity was enabled. 'groupbyfunction' must "
          + "be empty or null.");
    }
    if (downsample) {
      throw new WorkloadException("Data integrity was enabled. 'downsamplingfunction' must "
          + "be empty or null.");
    }
    if (queryTimeSpan > 0) {
      throw new WorkloadException("Data integrity was enabled. 'querytimespan' must "
          + "be empty or 0.");
    }
    if (randomizeTimeseriesOrder) {
      throw new WorkloadException("Data integrity was enabled. 'randomizetimeseriesorder' must "
          + "be false.");
    }
    // A fixed start timestamp is required so the expected hashes are reproducible.
    final String startTimestamp = properties.getProperty(CoreWorkload.INSERT_START_PROPERTY);
    if (startTimestamp == null || startTimestamp.isEmpty()) {
      throw new WorkloadException("Data integrity was enabled. 'insertstart' must "
          + "be set to a Unix Epoch timestamp.");
    }
  }
}
/**
 * Thread state class holding thread local generators and indices.
 */
protected class ThreadState {
  /** The timestamp generator for this thread. */
  protected final UnixEpochTimestampGenerator timestampGenerator;

  /** An offset generator to select a random offset for queries. */
  protected final NumberGenerator queryOffsetGenerator;

  /** The current write key index. */
  protected int keyIdx;

  /** The starting fence for writing keys. */
  protected int keyIdxStart;

  /** The ending fence for writing keys. */
  protected int keyIdxEnd;

  /** Indices for each tag value for writes. */
  protected int[] tagValueIdxs;

  /** Whether or not all time series have written values for the current timestamp. */
  protected boolean rollover;

  /** The starting timestamp. */
  protected long startTimestamp;

  /**
   * Default ctor.
   * @param threadID The zero based thread ID.
   * @param threadCount The total number of threads.
   * @throws WorkloadException If something went pear shaped.
   */
  protected ThreadState(final int threadID, final int threadCount) throws WorkloadException {
    int totalThreads = threadCount > 0 ? threadCount : 1;

    if (threadID >= totalThreads) {
      throw new IllegalStateException("Thread ID " + threadID + " cannot be greater "
          + "than or equal than the thread count " + totalThreads);
    }

    if (keys.length < threadCount) {
      // Fixed message: the condition requires enough KEYS to cover the
      // threads; the original message stated the inverse relationship.
      throw new WorkloadException("Key count " + keys.length + " must be greater "
          + "than or equal to the thread count " + totalThreads);
    }

    // Partition the key space evenly; the last thread takes the remainder.
    int keysPerThread = keys.length / totalThreads;
    keyIdx = keysPerThread * threadID;
    keyIdxStart = keyIdx;
    if (totalThreads - 1 == threadID) {
      keyIdxEnd = keys.length;
    } else {
      keyIdxEnd = keyIdxStart + keysPerThread;
    }

    tagValueIdxs = new int[tagPairs]; // all zeros

    final String startingTimestamp =
        properties.getProperty(CoreWorkload.INSERT_START_PROPERTY);
    if (startingTimestamp == null || startingTimestamp.isEmpty()) {
      timestampGenerator = randomizeTimestampOrder ?
          new RandomDiscreteTimestampGenerator(timestampInterval, timeUnits, maxOffsets) :
          new UnixEpochTimestampGenerator(timestampInterval, timeUnits);
    } else {
      try {
        timestampGenerator = randomizeTimestampOrder ?
            new RandomDiscreteTimestampGenerator(timestampInterval, timeUnits,
                Long.parseLong(startingTimestamp), maxOffsets) :
            new UnixEpochTimestampGenerator(timestampInterval, timeUnits,
                Long.parseLong(startingTimestamp));
      } catch (NumberFormatException nfe) {
        throw new WorkloadException("Unable to parse the " +
            CoreWorkload.INSERT_START_PROPERTY, nfe);
      }
    }
    // Set the last value properly for the timestamp, otherwise it may start
    // one interval ago.
    startTimestamp = timestampGenerator.nextValue();
    // TODO - pick it
    queryOffsetGenerator = new UniformLongGenerator(0, maxOffsets - 2);
  }

  /**
   * Generates the next write value for thread.
   * @param map An initialized map to populate with tag keys and values as well
   * as the timestamp and actual value.
   * @param isInsert Whether or not it's an insert or an update. Updates will pick
   * an older timestamp (if random isn't enabled).
   * @return The next key to write.
   */
  protected String nextDataPoint(final Map<String, ByteIterator> map, final boolean isInsert) {
    // Sparsity support: randomly advance past a number of series so not every
    // series reports a value for every timestamp.
    int iterations = sparsity <= 0 ? 1 :
        Utils.random().nextInt((int) ((double) perKeyCardinality * sparsity));
    if (iterations < 1) {
      iterations = 1;
    }
    while (true) {
      iterations--;
      if (rollover) {
        timestampGenerator.nextValue();
        rollover = false;
      }
      String key = null;
      if (iterations <= 0) {
        final TreeMap<String, String> validationTags;
        if (dataintegrity) {
          validationTags = new TreeMap<String, String>();
        } else {
          validationTags = null;
        }
        key = keys[keyIdx];
        int overallIdx = keyIdx * cumulativeCardinality[0];
        for (int i = 0; i < tagPairs; ++i) {
          int tvidx = tagValueIdxs[i];
          map.put(tagKeys[i], new StringByteIterator(tagValues[tvidx]));
          if (dataintegrity) {
            validationTags.put(tagKeys[i], tagValues[tvidx]);
          }
          if (delayedSeries > 0) {
            overallIdx += (tvidx * cumulativeCardinality[i + 1]);
          }
        }

        if (!isInsert) {
          // Updates pick a random timestamp in the already-written range.
          // NOTE(review): if no interval has elapsed yet, delta is 0 and
          // Random.nextInt(0) throws — confirm updates cannot run before the
          // first timestamp rollover.
          final long delta = (timestampGenerator.currentValue() - startTimestamp) / timestampInterval;
          final int intervals = Utils.random().nextInt((int) delta);
          map.put(timestampKey, new NumericByteIterator(startTimestamp + (intervals * timestampInterval)));
        } else if (delayedSeries > 0) {
          // See if the series falls in a delay bucket and calculate an offset earlier
          // than the current timestamp value if so.
          double pct = (double) overallIdx / (double) totalCardinality;
          if (pct < delayedSeries) {
            int modulo = overallIdx % delayedIntervals;
            if (modulo < 0) {
              modulo *= -1;
            }
            map.put(timestampKey, new NumericByteIterator(timestampGenerator.currentValue() -
                timestampInterval * modulo));
          } else {
            map.put(timestampKey, new NumericByteIterator(timestampGenerator.currentValue()));
          }
        } else {
          map.put(timestampKey, new NumericByteIterator(timestampGenerator.currentValue()));
        }

        if (dataintegrity) {
          map.put(valueKey, new NumericByteIterator(validationFunction(key,
              timestampGenerator.currentValue(), validationTags)));
        } else {
          switch (valueType) {
          case INTEGERS:
            map.put(valueKey, new NumericByteIterator(Utils.random().nextInt()));
            break;
          case FLOATS:
            map.put(valueKey, new NumericByteIterator(
                Utils.random().nextDouble() * (double) 100000));
            break;
          case MIXED:
            if (Utils.random().nextBoolean()) {
              map.put(valueKey, new NumericByteIterator(Utils.random().nextInt()));
            } else {
              map.put(valueKey, new NumericByteIterator(
                  Utils.random().nextDouble() * (double) 100000));
            }
            break;
          default:
            throw new IllegalStateException("Somehow we didn't have a value "
                + "type configured that we support: " + valueType);
          }
        }
      }

      // Advance the tag value indices like an odometer; when the tag space for
      // the current key is exhausted, move to the next key (and eventually
      // roll the timestamp over).
      boolean tagRollover = false;
      for (int i = tagCardinality.length - 1; i >= 0; --i) {
        if (tagCardinality[i] <= 1) {
          // Resolved from the embedded diff hunk: cardinality-one tags can
          // never increment, so they always count as rolled over. Without
          // this, single-tag workloads never advanced the key or timestamp.
          tagRollover = true; // Only one tag so needs roll over.
          continue;
        }

        if (tagValueIdxs[i] + 1 >= tagCardinality[i]) {
          tagValueIdxs[i] = 0;
          if (i == firstIncrementableCardinality) {
            tagRollover = true;
          }
        } else {
          ++tagValueIdxs[i];
          break;
        }
      }
      if (tagRollover) {
        if (keyIdx + 1 >= keyIdxEnd) {
          keyIdx = keyIdxStart;
          rollover = true;
        } else {
          ++keyIdx;
        }
      }

      if (iterations <= 0) {
        return key;
      }
    }
  }
}
}
\ No newline at end of file
diff --git a/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java b/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java
index 55acc1f6..409331ac 100644
--- a/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java
+++ b/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java
@@ -1,550 +1,577 @@
/**
* Copyright (c) 2017 YCSB contributors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.workloads;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.Vector;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.Client;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.NumericByteIterator;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.StringByteIterator;
import com.yahoo.ycsb.Utils;
import com.yahoo.ycsb.WorkloadException;
import com.yahoo.ycsb.measurements.Measurements;
import org.testng.annotations.Test;
public class TestTimeSeriesWorkload {
// Two threads over two keys: thread 0 must write only "AAAA" and thread 1 only
// "AAAB", each with the same alternating tag values and 60s timestamp steps.
@Test
public void twoThreads() throws Exception {
  final Properties p = getUTProperties();
  Measurements.setProperties(p);
  final TimeSeriesWorkload wl = new TimeSeriesWorkload();
  wl.init(p);
  Object threadState = wl.initThread(p, 0, 2);

  MockDB db = new MockDB();
  for (int i = 0; i < 74; i++) {
    assertTrue(wl.doInsert(db, threadState));
  }

  assertEquals(db.keys.size(), 74);
  assertEquals(db.values.size(), 74);
  long timestamp = 1451606400;
  for (int i = 0; i < db.keys.size(); i++) {
    assertEquals(db.keys.get(i), "AAAA");
    assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
    assertEquals(Utils.bytesToLong(db.values.get(i).get(
        TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
    assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT));
    if (i % 2 == 0) {
      assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
    } else {
      assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
      // two series per key, so the timestamp advances every second write
      timestamp += 60;
    }
  }

  threadState = wl.initThread(p, 1, 2);
  db = new MockDB();
  for (int i = 0; i < 74; i++) {
    assertTrue(wl.doInsert(db, threadState));
  }

  assertEquals(db.keys.size(), 74);
  assertEquals(db.values.size(), 74);
  timestamp = 1451606400;
  for (int i = 0; i < db.keys.size(); i++) {
    assertEquals(db.keys.get(i), "AAAB");
    assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
    assertEquals(Utils.bytesToLong(db.values.get(i).get(
        TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
    assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT));
    if (i % 2 == 0) {
      assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
    } else {
      assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
      timestamp += 60;
    }
  }
}
// An unparseable timestamp unit must be rejected at init time.
@Test (expectedExceptions = WorkloadException.class)
public void badTimeUnit() throws Exception {
  final Properties p = new Properties();
  p.put(TimeSeriesWorkload.TIMESTAMP_UNITS_PROPERTY, "foobar");
  getWorkload(p, true);
}
// initThread() must fail if the workload's init() was never called.
@Test (expectedExceptions = WorkloadException.class)
public void failedToInitWorkloadBeforeThreadInit() throws Exception {
  final Properties p = getUTProperties();
  final TimeSeriesWorkload wl = getWorkload(p, false);
  //wl.init(p); // <-- we NEED this :(
  final Object threadState = wl.initThread(p, 0, 2);

  final MockDB db = new MockDB();
  wl.doInsert(db, threadState);
}
// doInsert() with a null thread state must throw IllegalStateException.
@Test (expectedExceptions = IllegalStateException.class)
public void failedToInitThread() throws Exception {
  final Properties p = getUTProperties();
  final TimeSeriesWorkload wl = getWorkload(p, true);

  final MockDB db = new MockDB();
  wl.doInsert(db, null);
}
+
+ @Test
+ public void insertOneKeyOneTagCardinalityOne() throws Exception {
+ final Properties p = getUTProperties();
+ p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "1");
+ p.put(TimeSeriesWorkload.TAG_COUNT_PROPERTY, "1");
+ p.put(TimeSeriesWorkload.TAG_CARDINALITY_PROPERTY, "1");
+ final TimeSeriesWorkload wl = getWorkload(p, true);
+ final Object threadState = wl.initThread(p, 0, 1);
+
+ final MockDB db = new MockDB();
+ for (int i = 0; i < 74; i++) {
+ assertTrue(wl.doInsert(db, threadState));
+ }
+ assertEquals(db.keys.size(), 74);
+ assertEquals(db.values.size(), 74);
+ long timestamp = 1451606400;
+ for (int i = 0; i < db.keys.size(); i++) {
+ assertEquals(db.keys.get(i), "AAAA");
+ assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
+ assertEquals(Utils.bytesToLong(db.values.get(i).get(
+ TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
+ assertTrue(((NumericByteIterator) db.values.get(i)
+ .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint());
+ timestamp += 60;
+ }
+ }
// One key, two tags (cardinality 1 and 2): two series per timestamp, so the
// timestamp advances every second write.
@Test
public void insertOneKeyTwoTagsLowCardinality() throws Exception {
  final Properties p = getUTProperties();
  p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "1");
  final TimeSeriesWorkload wl = getWorkload(p, true);
  final Object threadState = wl.initThread(p, 0, 1);

  final MockDB db = new MockDB();
  for (int i = 0; i < 74; i++) {
    assertTrue(wl.doInsert(db, threadState));
  }

  assertEquals(db.keys.size(), 74);
  assertEquals(db.values.size(), 74);
  long timestamp = 1451606400;
  for (int i = 0; i < db.keys.size(); i++) {
    assertEquals(db.keys.get(i), "AAAA");
    assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
    assertEquals(Utils.bytesToLong(db.values.get(i).get(
        TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
    assertTrue(((NumericByteIterator) db.values.get(i)
        .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint());
    if (i % 2 == 0) {
      assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
    } else {
      assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
      timestamp += 60;
    }
  }
}
// Two keys, two tags: four series total, so each timestamp covers writes for
// key "AAAA" (first two) then key "AAAB" (next two) before advancing.
@Test
public void insertTwoKeysTwoTagsLowCardinality() throws Exception {
  final Properties p = getUTProperties();

  final TimeSeriesWorkload wl = getWorkload(p, true);
  final Object threadState = wl.initThread(p, 0, 1);

  final MockDB db = new MockDB();
  for (int i = 0; i < 74; i++) {
    assertTrue(wl.doInsert(db, threadState));
  }

  assertEquals(db.keys.size(), 74);
  assertEquals(db.values.size(), 74);
  long timestamp = 1451606400;
  int metricCtr = 0;
  for (int i = 0; i < db.keys.size(); i++) {
    assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
    assertEquals(Utils.bytesToLong(db.values.get(i).get(
        TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
    assertTrue(((NumericByteIterator) db.values.get(i)
        .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint());
    if (i % 2 == 0) {
      assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
    } else {
      assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
    }
    if (metricCtr++ > 1) {
      assertEquals(db.keys.get(i), "AAAB");
      if (metricCtr >= 4) {
        metricCtr = 0;
        timestamp += 60;
      }
    } else {
      assertEquals(db.keys.get(i), "AAAA");
    }
  }
}
// Two keys over two threads: each thread owns exactly one key and writes its
// full tag rotation independently.
@Test
public void insertTwoKeysTwoThreads() throws Exception {
  final Properties p = getUTProperties();

  final TimeSeriesWorkload wl = getWorkload(p, true);
  Object threadState = wl.initThread(p, 0, 2);

  MockDB db = new MockDB();
  for (int i = 0; i < 74; i++) {
    assertTrue(wl.doInsert(db, threadState));
  }

  assertEquals(db.keys.size(), 74);
  assertEquals(db.values.size(), 74);
  long timestamp = 1451606400;
  for (int i = 0; i < db.keys.size(); i++) {
    assertEquals(db.keys.get(i), "AAAA"); // <-- key 1
    assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
    assertEquals(Utils.bytesToLong(db.values.get(i).get(
        TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
    assertTrue(((NumericByteIterator) db.values.get(i)
        .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint());
    if (i % 2 == 0) {
      assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
    } else {
      assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
      timestamp += 60;
    }
  }

  threadState = wl.initThread(p, 1, 2);
  db = new MockDB();
  for (int i = 0; i < 74; i++) {
    assertTrue(wl.doInsert(db, threadState));
  }

  assertEquals(db.keys.size(), 74);
  assertEquals(db.values.size(), 74);
  timestamp = 1451606400;
  for (int i = 0; i < db.keys.size(); i++) {
    assertEquals(db.keys.get(i), "AAAB"); // <-- key 2
    assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
    assertEquals(Utils.bytesToLong(db.values.get(i).get(
        TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
    assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT));
    if (i % 2 == 0) {
      assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
    } else {
      assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
      timestamp += 60;
    }
  }
}
// Three keys over two threads: thread 0 gets one key, thread 1 picks up the
// remaining two ("AAAB" and "AAAC"), exercising the remainder partitioning.
@Test
public void insertThreeKeysTwoThreads() throws Exception {
  // To make sure the distribution doesn't miss any metrics
  final Properties p = getUTProperties();
  p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "3");

  final TimeSeriesWorkload wl = getWorkload(p, true);
  Object threadState = wl.initThread(p, 0, 2);

  MockDB db = new MockDB();
  for (int i = 0; i < 74; i++) {
    assertTrue(wl.doInsert(db, threadState));
  }

  assertEquals(db.keys.size(), 74);
  assertEquals(db.values.size(), 74);
  long timestamp = 1451606400;
  for (int i = 0; i < db.keys.size(); i++) {
    assertEquals(db.keys.get(i), "AAAA");
    assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
    assertEquals(Utils.bytesToLong(db.values.get(i).get(
        TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
    assertTrue(((NumericByteIterator) db.values.get(i)
        .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint());
    if (i % 2 == 0) {
      assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
    } else {
      assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
      timestamp += 60;
    }
  }

  threadState = wl.initThread(p, 1, 2);
  db = new MockDB();
  for (int i = 0; i < 74; i++) {
    assertTrue(wl.doInsert(db, threadState));
  }

  timestamp = 1451606400;
  int metricCtr = 0;
  for (int i = 0; i < db.keys.size(); i++) {
    assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
    assertEquals(Utils.bytesToLong(db.values.get(i).get(
        TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
    assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT));
    if (i % 2 == 0) {
      assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
    } else {
      assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
    }
    if (metricCtr++ > 1) {
      assertEquals(db.keys.get(i), "AAAC");
      if (metricCtr >= 4) {
        metricCtr = 0;
        timestamp += 60;
      }
    } else {
      assertEquals(db.keys.get(i), "AAAB");
    }
  }
}
// With data integrity on, every written value must be the integer produced by
// validationFunction() for that key/timestamp/tag combination.
@Test
public void insertWithValidation() throws Exception {
  final Properties p = getUTProperties();
  p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "1");
  p.put(CoreWorkload.DATA_INTEGRITY_PROPERTY, "true");
  p.put(TimeSeriesWorkload.VALUE_TYPE_PROPERTY, "integers");
  final TimeSeriesWorkload wl = getWorkload(p, true);
  final Object threadState = wl.initThread(p, 0, 1);

  final MockDB db = new MockDB();
  for (int i = 0; i < 74; i++) {
    assertTrue(wl.doInsert(db, threadState));
  }

  assertEquals(db.keys.size(), 74);
  assertEquals(db.values.size(), 74);
  long timestamp = 1451606400;
  for (int i = 0; i < db.keys.size(); i++) {
    assertEquals(db.keys.get(i), "AAAA");
    assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
    assertEquals(Utils.bytesToLong(db.values.get(i).get(
        TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
    assertFalse(((NumericByteIterator) db.values.get(i)
        .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint());

    // validation check: recompute the hash from the written tags and compare
    // it against the stored value.
    final TreeMap validationTags = new TreeMap();
    for (final Entry entry : db.values.get(i).entrySet()) {
      if (entry.getKey().equals(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT) ||
          entry.getKey().equals(TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT)) {
        continue;
      }
      validationTags.put(entry.getKey(), entry.getValue().toString());
    }
    assertEquals(wl.validationFunction(db.keys.get(i), timestamp, validationTags),
        ((NumericByteIterator) db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).getLong());

    if (i % 2 == 0) {
      assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
    } else {
      assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
      timestamp += 60;
    }
  }
}
// Smoke test: reads against the mock DB must not throw for a variety of
// randomly generated queries.
@Test
public void read() throws Exception {
  final Properties p = getUTProperties();
  final TimeSeriesWorkload wl = getWorkload(p, true);
  final Object threadState = wl.initThread(p, 0, 1);

  final MockDB db = new MockDB();
  for (int i = 0; i < 20; i++) {
    wl.doTransactionRead(db, threadState);
  }
}
// verifyRow(): OK when the stored value matches the validation hash,
// UNEXPECTED_STATE when the value differs or the value cell is missing.
@Test
public void verifyRow() throws Exception {
  final Properties p = getUTProperties();
  final TimeSeriesWorkload wl = getWorkload(p, true);

  final TreeMap validationTags = new TreeMap();
  final HashMap cells = new HashMap();

  validationTags.put("AA", "AAAA");
  cells.put("AA", new StringByteIterator("AAAA"));
  validationTags.put("AB", "AAAB");
  cells.put("AB", new StringByteIterator("AAAB"));
  long hash = wl.validationFunction("AAAA", 1451606400L, validationTags);

  cells.put(TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT, new NumericByteIterator(1451606400L));
  cells.put(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT, new NumericByteIterator(hash));

  assertEquals(wl.verifyRow("AAAA", cells), Status.OK);

  // tweak the last value a bit
  for (final ByteIterator it : cells.values()) {
    it.reset();
  }
  cells.put(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT, new NumericByteIterator(hash + 1));
  assertEquals(wl.verifyRow("AAAA", cells), Status.UNEXPECTED_STATE);

  // no value cell, returns an unexpected state
  for (final ByteIterator it : cells.values()) {
    it.reset();
  }
  cells.remove(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT);
  assertEquals(wl.verifyRow("AAAA", cells), Status.UNEXPECTED_STATE);
}
// Walks through each setting that is incompatible with data integrity mode,
// expecting a WorkloadException for every bad combination. The empty catch
// blocks are intentional: the exception IS the expected outcome.
@Test
public void validateSettingsDataIntegrity() throws Exception {
  Properties p = getUTProperties();

  // data validation incompatibilities
  p.setProperty(CoreWorkload.DATA_INTEGRITY_PROPERTY, "true");
  try {
    getWorkload(p, true);
    fail("Expected WorkloadException");
  } catch (WorkloadException e) { }

  p.setProperty(TimeSeriesWorkload.VALUE_TYPE_PROPERTY, "integers"); // now it's ok
  p.setProperty(TimeSeriesWorkload.GROUPBY_PROPERTY, "sum"); // now it's not
  try {
    getWorkload(p, true);
    fail("Expected WorkloadException");
  } catch (WorkloadException e) { }

  p.setProperty(TimeSeriesWorkload.GROUPBY_PROPERTY, "");
  p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_FUNCTION_PROPERTY, "sum");
  p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_INTERVAL_PROPERTY, "60");
  try {
    getWorkload(p, true);
    fail("Expected WorkloadException");
  } catch (WorkloadException e) { }

  p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_FUNCTION_PROPERTY, "");
  p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_INTERVAL_PROPERTY, "");
  p.setProperty(TimeSeriesWorkload.QUERY_TIMESPAN_PROPERTY, "60");
  try {
    getWorkload(p, true);
    fail("Expected WorkloadException");
  } catch (WorkloadException e) { }

  p = getUTProperties();
  p.setProperty(CoreWorkload.DATA_INTEGRITY_PROPERTY, "true");
  p.setProperty(TimeSeriesWorkload.VALUE_TYPE_PROPERTY, "integers");
  p.setProperty(TimeSeriesWorkload.RANDOMIZE_TIMESERIES_ORDER_PROPERTY, "true");
  try {
    getWorkload(p, true);
    fail("Expected WorkloadException");
  } catch (WorkloadException e) { }

  p.setProperty(TimeSeriesWorkload.RANDOMIZE_TIMESERIES_ORDER_PROPERTY, "false");
  p.setProperty(TimeSeriesWorkload.INSERT_START_PROPERTY, "");
  try {
    getWorkload(p, true);
    fail("Expected WorkloadException");
  } catch (WorkloadException e) { }
}
/**
 * Helper method that generates unit testing defaults for the properties map.
 * <p>
 * Uses {@link Properties#setProperty(String, String)} instead of the raw,
 * inherited {@code Hashtable.put(Object, Object)} so only String values can
 * be stored, matching how {@code Properties.getProperty} reads them back.
 * @return A fresh Properties object seeded with small, deterministic values.
 */
private Properties getUTProperties() {
  final Properties p = new Properties();
  p.setProperty(Client.RECORD_COUNT_PROPERTY, "10");
  p.setProperty(CoreWorkload.FIELD_COUNT_PROPERTY, "2");
  p.setProperty(CoreWorkload.FIELD_LENGTH_PROPERTY, "4");
  p.setProperty(TimeSeriesWorkload.TAG_KEY_LENGTH_PROPERTY, "2");
  p.setProperty(TimeSeriesWorkload.TAG_VALUE_LENGTH_PROPERTY, "4");
  p.setProperty(TimeSeriesWorkload.TAG_COUNT_PROPERTY, "2");
  p.setProperty(TimeSeriesWorkload.TAG_CARDINALITY_PROPERTY, "1,2");
  p.setProperty(CoreWorkload.INSERT_START_PROPERTY, "1451606400");
  p.setProperty(TimeSeriesWorkload.DELAYED_SERIES_PROPERTY, "0");
  p.setProperty(TimeSeriesWorkload.RANDOMIZE_TIMESERIES_ORDER_PROPERTY, "false");
  return p;
}
/**
 * Helper to setup the workload for testing.
 * @param p The properties to hand to Measurements and, when requested, init().
 * @param init Whether to run {@link TimeSeriesWorkload#init(Properties)} on the instance.
 * @return A workload instance, initialized when {@code init} is true.
 * @throws WorkloadException If initialization fails.
 */
private TimeSeriesWorkload getWorkload(final Properties p, final boolean init)
    throws WorkloadException {
  Measurements.setProperties(p);
  final TimeSeriesWorkload workload = new TimeSeriesWorkload();
  if (init) {
    workload.init(p);
  }
  return workload;
}
static class MockDB extends DB {
final List keys = new ArrayList();
final List