diff --git a/core/src/main/java/com/yahoo/ycsb/AsyncDB.java b/core/src/main/java/com/yahoo/ycsb/AsyncDB.java
new file mode 100644
index 00000000..f70ccbfc
--- /dev/null
+++ b/core/src/main/java/com/yahoo/ycsb/AsyncDB.java
@@ -0,0 +1,198 @@
+/**
+ * Copyright (c) 2010-2016 Yahoo! Inc., 2017 YCSB contributors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package com.yahoo.ycsb;
+
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A layer for accessing a database to be benchmarked. Each thread in the client
+ * will be given its own instance of whatever DB class is to be used in the test.
+ * This class should be constructed using a no-argument constructor, so we can
+ * load it dynamically. Any argument-based initialization should be
+ * done by init().
+ *
+ * Note that YCSB does not make any use of the return codes returned by this class.
+ * Instead, it keeps a count of the return values and presents them to the user.
+ *
+ * The semantics of methods such as insert, update and delete vary from database
+ * to database. In particular, operations may or may not be durable once these
+ * methods commit, and some systems may return 'success' regardless of whether
+ * or not a tuple with a matching key existed before the call. Rather than dictate
+ * the exact semantics of these methods, we recommend you either implement them
+ * to match the database's default semantics, or the semantics of your
+ * target application. For the sake of comparison between experiments we also
+ * recommend you explain the semantics you chose when presenting performance results.
+ */
+public abstract class AsyncDB {
+ /**
+ * Properties for configuring this DB.
+ */
+ private Properties properties = new Properties();
+
+ /**
+ * Synchronous view of this DB.
+ */
+ private volatile DB syncDB;
+
+ /**
+ * Set the properties for this DB.
+ */
+ public void setProperties(Properties p) {
+ properties = p;
+ }
+
+ /**
+ * Get the set of properties for this DB.
+ */
+ public Properties getProperties() {
+ return properties;
+ }
+
+ /**
+ * Initialize any state for this DB.
+ * Called once per DB instance; there is one DB instance per client thread.
+ */
+ public void init() throws DBException {
+ }
+
+ /**
+ * Cleanup any state for this DB.
+ * Called once per DB instance; there is one DB instance per client thread.
+ */
+ public void cleanup() throws DBException {
+ }
+
+ /**
+ * Read a record from the database. Each field/value pair from the result will be stored in a HashMap.
+ *
+ * @param table The name of the table
+ * @param key The record key of the record to read.
+ * @param fields The list of fields to read, or null for all of them
+ * @param result A HashMap of field/value pairs for the result
+ * @return The result of the operation.
+ */
+ public abstract CompletableFuture<Status> read(String table, String key, Set<String> fields,
+ Map<String, ByteIterator> result);
+
+ /**
+ * Perform a range scan for a set of records in the database. Each field/value pair from the result will be stored
+ * in a HashMap.
+ *
+ * @param table The name of the table
+ * @param startkey The record key of the first record to read.
+ * @param recordcount The number of records to read
+ * @param fields The list of fields to read, or null for all of them
+ * @param result A Vector of HashMaps, where each HashMap is a set of field/value pairs for one record
+ * @return The result of the operation.
+ */
+ public abstract CompletableFuture<Status> scan(String table, String startkey, int recordcount, Set<String> fields,
+ Vector<HashMap<String, ByteIterator>> result);
+
+ /**
+ * Update a record in the database. Any field/value pairs in the specified values HashMap will be written into the
+ * record with the specified record key, overwriting any existing values with the same field name.
+ *
+ * @param table The name of the table
+ * @param key The record key of the record to write.
+ * @param values A HashMap of field/value pairs to update in the record
+ * @return The result of the operation.
+ */
+ public abstract CompletableFuture<Status> update(String table, String key, Map<String, ByteIterator> values);
+
+ /**
+ * Insert a record in the database. Any field/value pairs in the specified values HashMap will be written into the
+ * record with the specified record key.
+ *
+ * @param table The name of the table
+ * @param key The record key of the record to insert.
+ * @param values A HashMap of field/value pairs to insert in the record
+ * @return The result of the operation.
+ */
+ public abstract CompletableFuture<Status> insert(String table, String key, Map<String, ByteIterator> values);
+
+ /**
+ * Delete a record from the database.
+ *
+ * @param table The name of the table
+ * @param key The record key of the record to delete.
+ * @return The result of the operation.
+ */
+ public abstract CompletableFuture<Status> delete(String table, String key);
+
+ private static final class SyncView extends DB {
+ private final AsyncDB db;
+
+ SyncView(AsyncDB db) {
+ this.db = db;
+ }
+
+ @Override
+ public void setProperties(Properties p) {
+ db.setProperties(p);
+ }
+
+ @Override
+ public Properties getProperties() {
+ return db.getProperties();
+ }
+
+ @Override
+ public Status read(String table, String key, Set<String> fields, Map<String, ByteIterator> result) {
+ return db.read(table, key, fields, result).join();
+ }
+
+ @Override
+ public Status scan(String table, String startkey, int recordcount, Set<String> fields,
+ Vector<HashMap<String, ByteIterator>> result) {
+ return db.scan(table, startkey, recordcount, fields, result).join();
+ }
+
+ @Override
+ public Status update(String table, String key, Map<String, ByteIterator> values) {
+ return db.update(table, key, values).join();
+ }
+
+ @Override
+ public Status insert(String table, String key, Map<String, ByteIterator> values) {
+ return db.insert(table, key, values).join();
+ }
+
+ @Override
+ public Status delete(String table, String key) {
+ return db.delete(table, key).join();
+ }
+ }
+
+ /**
+ * Return a synchronous view of this database.
+ *
+ * @return A DB object that operates synchronously.
+ */
+ public DB syncView() {
+ if (syncDB == null) {
+ synchronized (this) {
+ if (syncDB == null) {
+ syncDB = new SyncView(this);
+ }
+ }
+ }
+ return syncDB;
+ }
+}
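
For orientation, a binding built directly on this interface only has to hand back a CompletableFuture<Status> per operation. Below is a minimal sketch of a hypothetical no-op binding; the class name, worker pool, and trivial Status.OK results are illustrative assumptions, not part of this change.

// Hypothetical sketch; assumes package com.yahoo.ycsb plus
// import java.util.*; and import java.util.concurrent.*;
public class NoOpAsyncDB extends AsyncDB {
  // A real binding would use its driver's own async client instead of a pool.
  private final ExecutorService pool = Executors.newFixedThreadPool(4);

  @Override
  public CompletableFuture<Status> read(String table, String key, Set<String> fields,
                                        Map<String, ByteIterator> result) {
    // Complete off the client thread so the caller is free to issue more work.
    return CompletableFuture.supplyAsync(() -> Status.OK, pool);
  }

  @Override
  public CompletableFuture<Status> scan(String table, String startkey, int recordcount,
                                        Set<String> fields,
                                        Vector<HashMap<String, ByteIterator>> result) {
    return CompletableFuture.supplyAsync(() -> Status.OK, pool);
  }

  @Override
  public CompletableFuture<Status> update(String table, String key, Map<String, ByteIterator> values) {
    return CompletableFuture.supplyAsync(() -> Status.OK, pool);
  }

  @Override
  public CompletableFuture<Status> insert(String table, String key, Map<String, ByteIterator> values) {
    return CompletableFuture.supplyAsync(() -> Status.OK, pool);
  }

  @Override
  public CompletableFuture<Status> delete(String table, String key) {
    return CompletableFuture.supplyAsync(() -> Status.OK, pool);
  }

  @Override
  public void cleanup() throws DBException {
    pool.shutdown(); // release worker threads once this client thread is done
  }
}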
diff --git a/core/src/main/java/com/yahoo/ycsb/AsyncDBAdapter.java b/core/src/main/java/com/yahoo/ycsb/AsyncDBAdapter.java
new file mode 100644
index 00000000..a7bf1179
--- /dev/null
+++ b/core/src/main/java/com/yahoo/ycsb/AsyncDBAdapter.java
@@ -0,0 +1,58 @@
+package com.yahoo.ycsb;
+
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Adapts a synchronous {@link DB} to the {@link AsyncDB} interface by returning
+ * already-completed futures for each operation. {@link DBFactory} uses this
+ * adapter for bindings that still extend {@link DB}.
+ */
+public class AsyncDBAdapter extends AsyncDB {
+
+ private final DB db;
+
+ public AsyncDBAdapter(final DB db) {
+ this.db = db;
+ }
+
+ @Override
+ public void init() throws DBException {
+ // Forward lifecycle calls so the wrapped binding is initialized and torn down.
+ db.init();
+ }
+
+ @Override
+ public void cleanup() throws DBException {
+ db.cleanup();
+ }
+
+ @Override
+ public void setProperties(Properties p) {
+ db.setProperties(p);
+ }
+
+ @Override
+ public Properties getProperties() {
+ return db.getProperties();
+ }
+
+ @Override
+ public CompletableFuture<Status> read(String table, String key, Set<String> fields,
+ Map<String, ByteIterator> result) {
+ return CompletableFuture.completedFuture(db.read(table, key, fields, result));
+ }
+
+ @Override
+ public CompletableFuture<Status> scan(String table, String startkey, int recordcount, Set<String> fields,
+ Vector<HashMap<String, ByteIterator>> result) {
+ return CompletableFuture.completedFuture(db.scan(table, startkey, recordcount, fields, result));
+ }
+
+ @Override
+ public CompletableFuture<Status> update(String table, String key, Map<String, ByteIterator> values) {
+ return CompletableFuture.completedFuture(db.update(table, key, values));
+ }
+
+ @Override
+ public CompletableFuture<Status> insert(String table, String key, Map<String, ByteIterator> values) {
+ return CompletableFuture.completedFuture(db.insert(table, key, values));
+ }
+
+ @Override
+ public CompletableFuture<Status> delete(String table, String key) {
+ return CompletableFuture.completedFuture(db.delete(table, key));
+ }
+
+ @Override
+ public DB syncView() {
+ return db;
+ }
+}
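
The adapter composes with AsyncDB.syncView() in both directions; a short hypothetical usage sketch (assuming the usual com.yahoo.ycsb imports):

// Wrap a synchronous binding so async-aware callers can drive it.
DB sync = new BasicDB();                  // YCSB's built-in debug binding
AsyncDB async = new AsyncDBAdapter(sync);

// Futures from the adapter are already completed, so join() never blocks.
Status s = async.delete("usertable", "user1").join();

// syncView() hands back the original DB instance, not a blocking wrapper.
assert async.syncView() == sync;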
diff --git a/core/src/main/java/com/yahoo/ycsb/Client.java b/core/src/main/java/com/yahoo/ycsb/Client.java
index 00b1e378..b196957c 100644
--- a/core/src/main/java/com/yahoo/ycsb/Client.java
+++ b/core/src/main/java/com/yahoo/ycsb/Client.java
@@ -1,1124 +1,1124 @@
/**
* Copyright (c) 2010-2016 Yahoo! Inc., 2017 YCSB contributors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb;
import com.yahoo.ycsb.measurements.Measurements;
import com.yahoo.ycsb.measurements.exporter.MeasurementsExporter;
import com.yahoo.ycsb.measurements.exporter.TextMeasurementsExporter;
import org.apache.htrace.core.HTraceConfiguration;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
/**
* A thread to periodically show the status of the experiment to reassure you that progress is being made.
*/
class StatusThread extends Thread {
// Counts down each of the clients completing
private final CountDownLatch completeLatch;
// Stores the measurements for the run
private final Measurements measurements;
// Whether or not to track the JVM stats per run
private final boolean trackJVMStats;
// The clients that are running.
private final List<ClientThread> clients;
private final String label;
private final boolean standardstatus;
// The interval for reporting status.
private long sleeptimeNs;
// JVM max/mins
private int maxThreads;
private int minThreads = Integer.MAX_VALUE;
private long maxUsedMem;
private long minUsedMem = Long.MAX_VALUE;
private double maxLoadAvg;
private double minLoadAvg = Double.MAX_VALUE;
private long lastGCCount = 0;
private long lastGCTime = 0;
/**
* Creates a new StatusThread without JVM stat tracking.
*
* @param completeLatch The latch that each client thread will {@link CountDownLatch#countDown()}
* as they complete.
* @param clients The clients to collect metrics from.
* @param label The label for the status.
* @param standardstatus If true the status is printed to stdout in addition to stderr.
* @param statusIntervalSeconds The number of seconds between status updates.
*/
public StatusThread(CountDownLatch completeLatch, List<ClientThread> clients,
String label, boolean standardstatus, int statusIntervalSeconds) {
this(completeLatch, clients, label, standardstatus, statusIntervalSeconds, false);
}
/**
* Creates a new StatusThread.
*
* @param completeLatch The latch that each client thread will {@link CountDownLatch#countDown()}
* as they complete.
* @param clients The clients to collect metrics from.
* @param label The label for the status.
* @param standardstatus If true the status is printed to stdout in addition to stderr.
* @param statusIntervalSeconds The number of seconds between status updates.
* @param trackJVMStats Whether or not to track JVM stats.
*/
public StatusThread(CountDownLatch completeLatch, List<ClientThread> clients,
String label, boolean standardstatus, int statusIntervalSeconds,
boolean trackJVMStats) {
this.completeLatch = completeLatch;
this.clients = clients;
this.label = label;
this.standardstatus = standardstatus;
sleeptimeNs = TimeUnit.SECONDS.toNanos(statusIntervalSeconds);
measurements = Measurements.getMeasurements();
this.trackJVMStats = trackJVMStats;
}
/**
* Run and periodically report status.
*/
@Override
public void run() {
final long startTimeMs = System.currentTimeMillis();
final long startTimeNanos = System.nanoTime();
long deadline = startTimeNanos + sleeptimeNs;
long startIntervalMs = startTimeMs;
long lastTotalOps = 0;
boolean alldone;
do {
long nowMs = System.currentTimeMillis();
lastTotalOps = computeStats(startTimeMs, startIntervalMs, nowMs, lastTotalOps);
if (trackJVMStats) {
measureJVM();
}
alldone = waitForClientsUntil(deadline);
startIntervalMs = nowMs;
deadline += sleeptimeNs;
}
while (!alldone);
if (trackJVMStats) {
measureJVM();
}
// Print the final stats.
computeStats(startTimeMs, startIntervalMs, System.currentTimeMillis(), lastTotalOps);
}
/**
* Computes and prints the stats.
*
* @param startTimeMs The start time of the test.
* @param startIntervalMs The start time of this interval.
* @param endIntervalMs The end time (now) for the interval.
* @param lastTotalOps The last total operations count.
* @return The current operation count.
*/
private long computeStats(final long startTimeMs, long startIntervalMs, long endIntervalMs,
long lastTotalOps) {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
long totalops = 0;
long todoops = 0;
// Calculate the total number of operations completed.
for (ClientThread t : clients) {
totalops += t.getOpsDone();
todoops += t.getOpsTodo();
}
long interval = endIntervalMs - startTimeMs;
double throughput = 1000.0 * (((double) totalops) / (double) interval);
double curthroughput = 1000.0 * (((double) (totalops - lastTotalOps)) /
((double) (endIntervalMs - startIntervalMs)));
long estremaining = (long) Math.ceil(todoops / throughput);
DecimalFormat d = new DecimalFormat("#.##");
String labelString = this.label + format.format(new Date());
StringBuilder msg = new StringBuilder(labelString).append(" ").append(interval / 1000).append(" sec: ");
msg.append(totalops).append(" operations; ");
if (totalops != 0) {
msg.append(d.format(curthroughput)).append(" current ops/sec; ");
}
if (todoops != 0) {
msg.append("est completion in ").append(RemainingFormatter.format(estremaining));
}
msg.append(Measurements.getMeasurements().getSummary());
System.err.println(msg);
if (standardstatus) {
System.out.println(msg);
}
return totalops;
}
/**
* Waits for all of the clients to finish or the deadline to expire.
*
* @param deadline The current deadline.
* @return True if all of the clients completed.
*/
private boolean waitForClientsUntil(long deadline) {
boolean alldone = false;
long now = System.nanoTime();
while (!alldone && now < deadline) {
try {
alldone = completeLatch.await(deadline - now, TimeUnit.NANOSECONDS);
} catch (InterruptedException ie) {
// If we are interrupted the thread is being asked to shutdown.
// Return true to indicate that and reset the interrupt state
// of the thread.
Thread.currentThread().interrupt();
alldone = true;
}
now = System.nanoTime();
}
return alldone;
}
/**
* Executes the JVM measurements.
*/
private void measureJVM() {
final int threads = Utils.getActiveThreadCount();
if (threads < minThreads) {
minThreads = threads;
}
if (threads > maxThreads) {
maxThreads = threads;
}
measurements.measure("THREAD_COUNT", threads);
// TODO - once measurements allow for other number types, switch to using
// the raw bytes. Otherwise we can track in MB to avoid negative values
// when faced with huge heaps.
final int usedMem = Utils.getUsedMemoryMegaBytes();
if (usedMem < minUsedMem) {
minUsedMem = usedMem;
}
if (usedMem > maxUsedMem) {
maxUsedMem = usedMem;
}
measurements.measure("USED_MEM_MB", usedMem);
// Some JVMs may not implement this feature so if the value is less than
// zero, just omit it.
final double systemLoad = Utils.getSystemLoadAverage();
if (systemLoad >= 0) {
// TODO - store the double if measurements allows for them
measurements.measure("SYS_LOAD_AVG", (int) systemLoad);
if (systemLoad > maxLoadAvg) {
maxLoadAvg = systemLoad;
}
if (systemLoad < minLoadAvg) {
minLoadAvg = systemLoad;
}
}
final long gcs = Utils.getGCTotalCollectionCount();
measurements.measure("GCS", (int) (gcs - lastGCCount));
final long gcTime = Utils.getGCTotalTime();
measurements.measure("GCS_TIME", (int) (gcTime - lastGCTime));
lastGCCount = gcs;
lastGCTime = gcTime;
}
/**
* @return The maximum threads running during the test.
*/
public int getMaxThreads() {
return maxThreads;
}
/**
* @return The minimum threads running during the test.
*/
public int getMinThreads() {
return minThreads;
}
/**
* @return The maximum memory used during the test.
*/
public long getMaxUsedMem() {
return maxUsedMem;
}
/**
* @return The minimum memory used during the test.
*/
public long getMinUsedMem() {
return minUsedMem;
}
/**
* @return The maximum load average during the test.
*/
public double getMaxLoadAvg() {
return maxLoadAvg;
}
/**
* @return The minimum load average during the test.
*/
public double getMinLoadAvg() {
return minLoadAvg;
}
/**
* @return Whether or not the thread is tracking JVM stats.
*/
public boolean trackJVMStats() {
return trackJVMStats;
}
}
/**
* Turn seconds remaining into more useful units.
* i.e. if there are hours or days worth of seconds, use them.
*/
final class RemainingFormatter {
private RemainingFormatter() {
// not used
}
public static StringBuilder format(long seconds) {
StringBuilder time = new StringBuilder();
long days = TimeUnit.SECONDS.toDays(seconds);
if (days > 0) {
time.append(days).append(days == 1 ? " day " : " days ");
seconds -= TimeUnit.DAYS.toSeconds(days);
}
long hours = TimeUnit.SECONDS.toHours(seconds);
if (hours > 0) {
time.append(hours).append(hours == 1 ? " hour " : " hours ");
seconds -= TimeUnit.HOURS.toSeconds(hours);
}
/* Only include minute granularity if we're < 1 day. */
if (days < 1) {
long minutes = TimeUnit.SECONDS.toMinutes(seconds);
if (minutes > 0) {
time.append(minutes).append(minutes == 1 ? " minute " : " minutes ");
seconds -= TimeUnit.MINUTES.toSeconds(minutes);
}
}
/* Only bother to include seconds if we're < 1 minute */
if (time.length() == 0) {
time.append(seconds).append(seconds == 1 ? " second " : " seconds ");
}
return time;
}
}
/**
* A thread for executing transactions or data inserts to the database.
*/
class ClientThread implements Runnable {
// Counts down each of the clients completing.
private final CountDownLatch completeLatch;
private static boolean spinSleep;
- private DB db;
+ private AsyncDB db;
private boolean dotransactions;
private Workload workload;
private int opcount;
private double targetOpsPerMs;
private int opsdone;
private int threadid;
private int threadcount;
private Object workloadstate;
private Properties props;
private long targetOpsTickNs;
private final Measurements measurements;
/**
* Constructor.
*
* @param db the DB implementation to use
* @param dotransactions true to do transactions, false to insert data
* @param workload the workload to use
* @param props the properties defining the experiment
* @param opcount the number of operations (transactions or inserts) to do
* @param targetperthreadperms target number of operations per thread per ms
* @param completeLatch The latch tracking the completion of all clients.
*/
- public ClientThread(DB db, boolean dotransactions, Workload workload, Properties props, int opcount,
+ public ClientThread(AsyncDB db, boolean dotransactions, Workload workload, Properties props, int opcount,
double targetperthreadperms, CountDownLatch completeLatch) {
this.db = db;
this.dotransactions = dotransactions;
this.workload = workload;
this.opcount = opcount;
opsdone = 0;
if (targetperthreadperms > 0) {
targetOpsPerMs = targetperthreadperms;
targetOpsTickNs = (long) (1000000 / targetOpsPerMs);
}
this.props = props;
measurements = Measurements.getMeasurements();
spinSleep = Boolean.valueOf(this.props.getProperty("spin.sleep", "false"));
this.completeLatch = completeLatch;
}
public void setThreadId(final int threadId) {
threadid = threadId;
}
public void setThreadCount(final int threadCount) {
threadcount = threadCount;
}
public int getOpsDone() {
return opsdone;
}
@Override
public void run() {
try {
db.init();
} catch (DBException e) {
e.printStackTrace();
e.printStackTrace(System.out);
return;
}
try {
workloadstate = workload.initThread(props, threadid, threadcount);
} catch (WorkloadException e) {
e.printStackTrace();
e.printStackTrace(System.out);
return;
}
//NOTE: Switching to using nanoTime and parkNanos for time management here such that the measurements
// and the client thread have the same view on time.
//spread the thread operations out so they don't all hit the DB at the same time
// GH issue 4 - throws exception if _target>1 because random.nextInt argument must be >0
// and the sleep() doesn't make sense for granularities < 1 ms anyway
if ((targetOpsPerMs > 0) && (targetOpsPerMs <= 1.0)) {
long randomMinorDelay = ThreadLocalRandom.current().nextInt((int) targetOpsTickNs);
sleepUntil(System.nanoTime() + randomMinorDelay);
}
try {
if (dotransactions) {
long startTimeNanos = System.nanoTime();
while (((opcount == 0) || (opsdone < opcount)) && !workload.isStopRequested()) {
if (!workload.doTransaction(db, workloadstate)) {
break;
}
opsdone++;
throttleNanos(startTimeNanos);
}
} else {
long startTimeNanos = System.nanoTime();
while (((opcount == 0) || (opsdone < opcount)) && !workload.isStopRequested()) {
if (!workload.doInsert(db, workloadstate)) {
break;
}
opsdone++;
throttleNanos(startTimeNanos);
}
}
} catch (Exception e) {
e.printStackTrace();
e.printStackTrace(System.out);
System.exit(0);
}
try {
measurements.setIntendedStartTimeNs(0);
db.cleanup();
} catch (DBException e) {
e.printStackTrace();
e.printStackTrace(System.out);
} finally {
completeLatch.countDown();
}
}
private static void sleepUntil(long deadline) {
while (System.nanoTime() < deadline) {
if (!spinSleep) {
LockSupport.parkNanos(deadline - System.nanoTime());
}
}
}
private void throttleNanos(long startTimeNanos) {
//throttle the operations
if (targetOpsPerMs > 0) {
// delay until next tick
long deadline = startTimeNanos + opsdone * targetOpsTickNs;
sleepUntil(deadline);
measurements.setIntendedStartTimeNs(deadline);
}
}
/**
* The total amount of work this thread is still expected to do.
*/
int getOpsTodo() {
int todo = opcount - opsdone;
return todo < 0 ? 0 : todo;
}
}
/**
* Main class for executing YCSB.
*/
public final class Client {
private Client() {
//not used
}
public static final String DEFAULT_RECORD_COUNT = "0";
/**
* The target number of operations to perform.
*/
public static final String OPERATION_COUNT_PROPERTY = "operationcount";
/**
* The number of records to load into the database initially.
*/
public static final String RECORD_COUNT_PROPERTY = "recordcount";
/**
* The workload class to be loaded.
*/
public static final String WORKLOAD_PROPERTY = "workload";
/**
* The database class to be used.
*/
public static final String DB_PROPERTY = "db";
/**
* The exporter class to be used. The default is
* com.yahoo.ycsb.measurements.exporter.TextMeasurementsExporter.
*/
public static final String EXPORTER_PROPERTY = "exporter";
/**
* If set to the path of a file, YCSB will write all output to this file
* instead of STDOUT.
*/
public static final String EXPORT_FILE_PROPERTY = "exportfile";
/**
* The number of YCSB client threads to run.
*/
public static final String THREAD_COUNT_PROPERTY = "threadcount";
/**
* Indicates how many inserts to do if less than recordcount.
* Useful for partitioning the load among multiple servers if the client is the bottleneck.
* Additionally workloads should support the "insertstart" property which tells them which record to start at.
*/
public static final String INSERT_COUNT_PROPERTY = "insertcount";
/**
* Target number of operations per second.
*/
public static final String TARGET_PROPERTY = "target";
/**
* The maximum amount of time (in seconds) for which the benchmark will be run.
*/
public static final String MAX_EXECUTION_TIME = "maxexecutiontime";
/**
* Whether or not this is the transaction phase (run) or not (load).
*/
public static final String DO_TRANSACTIONS_PROPERTY = "dotransactions";
/**
* Whether or not to show status during run.
*/
public static final String STATUS_PROPERTY = "status";
/**
* Use label for status (e.g. to label one experiment out of a whole batch).
*/
public static final String LABEL_PROPERTY = "label";
/**
* An optional thread used to track progress and measure JVM stats.
*/
private static StatusThread statusthread = null;
// HTrace integration related constants.
/**
* All keys for configuring the tracing system start with this prefix.
*/
private static final String HTRACE_KEY_PREFIX = "htrace.";
private static final String CLIENT_WORKLOAD_INIT_SPAN = "Client#workload_init";
private static final String CLIENT_INIT_SPAN = "Client#init";
private static final String CLIENT_WORKLOAD_SPAN = "Client#workload";
private static final String CLIENT_CLEANUP_SPAN = "Client#cleanup";
private static final String CLIENT_EXPORT_MEASUREMENTS_SPAN = "Client#export_measurements";
public static void usageMessage() {
System.out.println("Usage: java com.yahoo.ycsb.Client [options]");
System.out.println("Options:");
System.out.println(" -threads n: execute using n threads (default: 1) - can also be specified as the \n" +
" \"threadcount\" property using -p");
System.out.println(" -target n: attempt to do n operations per second (default: unlimited) - can also\n" +
" be specified as the \"target\" property using -p");
System.out.println(" -load: run the loading phase of the workload");
System.out.println(" -t: run the transactions phase of the workload (default)");
System.out.println(" -db dbname: specify the name of the DB to use (default: com.yahoo.ycsb.BasicDB) - \n" +
" can also be specified as the \"db\" property using -p");
System.out.println(" -P propertyfile: load properties from the given file. Multiple files can");
System.out.println(" be specified, and will be processed in the order specified");
System.out.println(" -p name=value: specify a property to be passed to the DB and workloads;");
System.out.println(" multiple properties can be specified, and override any");
System.out.println(" values in the propertyfile");
System.out.println(" -s: show status during run (default: no status)");
System.out.println(" -l label: use label for status (e.g. to label one experiment out of a whole batch)");
System.out.println("");
System.out.println("Required properties:");
System.out.println(" " + WORKLOAD_PROPERTY + ": the name of the workload class to use (e.g. " +
"com.yahoo.ycsb.workloads.CoreWorkload)");
System.out.println("");
System.out.println("To run the transaction phase from multiple servers, start a separate client on each.");
System.out.println("To run the load phase from multiple servers, start a separate client on each; additionally,");
System.out.println("use the \"insertcount\" and \"insertstart\" properties to divide up the records " +
"to be inserted");
}
public static boolean checkRequiredProperties(Properties props) {
if (props.getProperty(WORKLOAD_PROPERTY) == null) {
System.out.println("Missing property: " + WORKLOAD_PROPERTY);
return false;
}
return true;
}
/**
* Exports the measurements to either sysout or a file using the exporter
* loaded from conf.
*
* @throws IOException Either failed to write to output stream or failed to close it.
*/
private static void exportMeasurements(Properties props, int opcount, long runtime)
throws IOException {
MeasurementsExporter exporter = null;
try {
// if no destination file is provided the results will be written to stdout
OutputStream out;
String exportFile = props.getProperty(EXPORT_FILE_PROPERTY);
if (exportFile == null) {
out = System.out;
} else {
out = new FileOutputStream(exportFile);
}
// if no exporter is provided the default text one will be used
String exporterStr = props.getProperty(EXPORTER_PROPERTY,
"com.yahoo.ycsb.measurements.exporter.TextMeasurementsExporter");
try {
exporter = (MeasurementsExporter) Class.forName(exporterStr).getConstructor(OutputStream.class)
.newInstance(out);
} catch (Exception e) {
System.err.println("Could not find exporter " + exporterStr
+ ", will use default text reporter.");
e.printStackTrace();
exporter = new TextMeasurementsExporter(out);
}
exporter.write("OVERALL", "RunTime(ms)", runtime);
double throughput = 1000.0 * (opcount) / (runtime);
exporter.write("OVERALL", "Throughput(ops/sec)", throughput);
final Map<String, Long[]> gcs = Utils.getGCStatst();
long totalGCCount = 0;
long totalGCTime = 0;
for (final Entry<String, Long[]> entry : gcs.entrySet()) {
exporter.write("TOTAL_GCS_" + entry.getKey(), "Count", entry.getValue()[0]);
exporter.write("TOTAL_GC_TIME_" + entry.getKey(), "Time(ms)", entry.getValue()[1]);
exporter.write("TOTAL_GC_TIME_%_" + entry.getKey(), "Time(%)",
((double) entry.getValue()[1] / runtime) * (double) 100);
totalGCCount += entry.getValue()[0];
totalGCTime += entry.getValue()[1];
}
exporter.write("TOTAL_GCs", "Count", totalGCCount);
exporter.write("TOTAL_GC_TIME", "Time(ms)", totalGCTime);
exporter.write("TOTAL_GC_TIME_%", "Time(%)", ((double) totalGCTime / runtime) * (double) 100);
if (statusthread != null && statusthread.trackJVMStats()) {
exporter.write("MAX_MEM_USED", "MBs", statusthread.getMaxUsedMem());
exporter.write("MIN_MEM_USED", "MBs", statusthread.getMinUsedMem());
exporter.write("MAX_THREADS", "Count", statusthread.getMaxThreads());
exporter.write("MIN_THREADS", "Count", statusthread.getMinThreads());
exporter.write("MAX_SYS_LOAD_AVG", "Load", statusthread.getMaxLoadAvg());
exporter.write("MIN_SYS_LOAD_AVG", "Load", statusthread.getMinLoadAvg());
}
Measurements.getMeasurements().exportMeasurements(exporter);
} finally {
if (exporter != null) {
exporter.close();
}
}
}
@SuppressWarnings("unchecked")
public static void main(String[] args) {
Properties props = parseArguments(args);
boolean status = Boolean.valueOf(props.getProperty(STATUS_PROPERTY, String.valueOf(false)));
String label = props.getProperty(LABEL_PROPERTY, "");
long maxExecutionTime = Integer.parseInt(props.getProperty(MAX_EXECUTION_TIME, "0"));
//get number of threads, target and db
int threadcount = Integer.parseInt(props.getProperty(THREAD_COUNT_PROPERTY, "1"));
String dbname = props.getProperty(DB_PROPERTY, "com.yahoo.ycsb.BasicDB");
int target = Integer.parseInt(props.getProperty(TARGET_PROPERTY, "0"));
//compute the target throughput
double targetperthreadperms = -1;
if (target > 0) {
double targetperthread = ((double) target) / ((double) threadcount);
targetperthreadperms = targetperthread / 1000.0;
}
Thread warningthread = setupWarningThread();
warningthread.start();
Measurements.setProperties(props);
Workload workload = getWorkload(props);
final Tracer tracer = getTracer(props, workload);
initWorkload(props, warningthread, workload, tracer);
System.err.println("Starting test.");
final CountDownLatch completeLatch = new CountDownLatch(threadcount);
final List<ClientThread> clients = initDb(dbname, props, threadcount, targetperthreadperms,
workload, tracer, completeLatch);
if (status) {
boolean standardstatus = false;
if (props.getProperty(Measurements.MEASUREMENT_TYPE_PROPERTY, "").compareTo("timeseries") == 0) {
standardstatus = true;
}
int statusIntervalSeconds = Integer.parseInt(props.getProperty("status.interval", "10"));
boolean trackJVMStats = props.getProperty(Measurements.MEASUREMENT_TRACK_JVM_PROPERTY,
Measurements.MEASUREMENT_TRACK_JVM_PROPERTY_DEFAULT).equals("true");
statusthread = new StatusThread(completeLatch, clients, label, standardstatus, statusIntervalSeconds,
trackJVMStats);
statusthread.start();
}
Thread terminator = null;
long st;
long en;
int opsDone;
try (final TraceScope span = tracer.newScope(CLIENT_WORKLOAD_SPAN)) {
final Map<Thread, ClientThread> threads = new HashMap<>(threadcount);
for (ClientThread client : clients) {
threads.put(new Thread(tracer.wrap(client, "ClientThread")), client);
}
st = System.currentTimeMillis();
for (Thread t : threads.keySet()) {
t.start();
}
if (maxExecutionTime > 0) {
terminator = new TerminatorThread(maxExecutionTime, threads.keySet(), workload);
terminator.start();
}
opsDone = 0;
for (Map.Entry<Thread, ClientThread> entry : threads.entrySet()) {
try {
entry.getKey().join();
opsDone += entry.getValue().getOpsDone();
} catch (InterruptedException ignored) {
// ignored
}
}
en = System.currentTimeMillis();
}
try {
try (final TraceScope span = tracer.newScope(CLIENT_CLEANUP_SPAN)) {
if (terminator != null && !terminator.isInterrupted()) {
terminator.interrupt();
}
if (status) {
// wake up status thread if it's asleep
statusthread.interrupt();
// at this point we assume all the monitored threads are already gone as per above join loop.
try {
statusthread.join();
} catch (InterruptedException ignored) {
// ignored
}
}
workload.cleanup();
}
} catch (WorkloadException e) {
e.printStackTrace();
e.printStackTrace(System.out);
System.exit(0);
}
try {
try (final TraceScope span = tracer.newScope(CLIENT_EXPORT_MEASUREMENTS_SPAN)) {
exportMeasurements(props, opsDone, en - st);
}
} catch (IOException e) {
System.err.println("Could not export measurements, error: " + e.getMessage());
e.printStackTrace();
System.exit(-1);
}
System.exit(0);
}
private static List<ClientThread> initDb(String dbname, Properties props, int threadcount,
double targetperthreadperms, Workload workload, Tracer tracer,
CountDownLatch completeLatch) {
boolean initFailed = false;
boolean dotransactions = Boolean.valueOf(props.getProperty(DO_TRANSACTIONS_PROPERTY, String.valueOf(true)));
final List<ClientThread> clients = new ArrayList<>(threadcount);
try (final TraceScope span = tracer.newScope(CLIENT_INIT_SPAN)) {
int opcount;
if (dotransactions) {
opcount = Integer.parseInt(props.getProperty(OPERATION_COUNT_PROPERTY, "0"));
} else {
if (props.containsKey(INSERT_COUNT_PROPERTY)) {
opcount = Integer.parseInt(props.getProperty(INSERT_COUNT_PROPERTY, "0"));
} else {
opcount = Integer.parseInt(props.getProperty(RECORD_COUNT_PROPERTY, DEFAULT_RECORD_COUNT));
}
}
for (int threadid = 0; threadid < threadcount; threadid++) {
- DB db;
+ AsyncDB db;
try {
db = DBFactory.newDB(dbname, props, tracer);
} catch (UnknownDBException e) {
System.out.println("Unknown DB " + dbname);
initFailed = true;
break;
}
int threadopcount = opcount / threadcount;
// ensure correct number of operations, in case opcount is not a multiple of threadcount
if (threadid < opcount % threadcount) {
++threadopcount;
}
ClientThread t = new ClientThread(db, dotransactions, workload, props, threadopcount, targetperthreadperms,
completeLatch);
t.setThreadId(threadid);
t.setThreadCount(threadcount);
clients.add(t);
}
if (initFailed) {
System.err.println("Error initializing datastore bindings.");
System.exit(0);
}
}
return clients;
}
private static Tracer getTracer(Properties props, Workload workload) {
return new Tracer.Builder("YCSB " + workload.getClass().getSimpleName())
.conf(getHTraceConfiguration(props))
.build();
}
private static void initWorkload(Properties props, Thread warningthread, Workload workload, Tracer tracer) {
try {
try (final TraceScope span = tracer.newScope(CLIENT_WORKLOAD_INIT_SPAN)) {
workload.init(props);
warningthread.interrupt();
}
} catch (WorkloadException e) {
e.printStackTrace();
e.printStackTrace(System.out);
System.exit(0);
}
}
private static HTraceConfiguration getHTraceConfiguration(Properties props) {
final Map<String, String> filteredProperties = new HashMap<>();
for (String key : props.stringPropertyNames()) {
if (key.startsWith(HTRACE_KEY_PREFIX)) {
filteredProperties.put(key.substring(HTRACE_KEY_PREFIX.length()), props.getProperty(key));
}
}
return HTraceConfiguration.fromMap(filteredProperties);
}
private static Thread setupWarningThread() {
//show a warning message that creating the workload is taking a while
//but only do so if it is taking longer than 2 seconds
//(showing the message right away if the setup wasn't taking very long was confusing people)
return new Thread() {
@Override
public void run() {
try {
sleep(2000);
} catch (InterruptedException e) {
return;
}
System.err.println(" (might take a few minutes for large data sets)");
}
};
}
private static Workload getWorkload(Properties props) {
ClassLoader classLoader = Client.class.getClassLoader();
try {
Properties projectProp = new Properties();
projectProp.load(classLoader.getResourceAsStream("project.properties"));
System.err.println("YCSB Client " + projectProp.getProperty("version"));
} catch (IOException e) {
System.err.println("Unable to retrieve client version.");
}
System.err.println();
System.err.println("Loading workload...");
try {
Class workloadclass = classLoader.loadClass(props.getProperty(WORKLOAD_PROPERTY));
return (Workload) workloadclass.newInstance();
} catch (Exception e) {
e.printStackTrace();
e.printStackTrace(System.out);
System.exit(0);
}
return null;
}
private static Properties parseArguments(String[] args) {
Properties props = new Properties();
System.err.print("Command line:");
for (String arg : args) {
System.err.print(" " + arg);
}
System.err.println();
Properties fileprops = new Properties();
int argindex = 0;
if (args.length == 0) {
usageMessage();
System.out.println("At least one argument specifying a workload is required.");
System.exit(0);
}
while (args[argindex].startsWith("-")) {
if (args[argindex].compareTo("-threads") == 0) {
argindex++;
if (argindex >= args.length) {
usageMessage();
System.out.println("Missing argument value for -threads.");
System.exit(0);
}
int tcount = Integer.parseInt(args[argindex]);
props.setProperty(THREAD_COUNT_PROPERTY, String.valueOf(tcount));
argindex++;
} else if (args[argindex].compareTo("-target") == 0) {
argindex++;
if (argindex >= args.length) {
usageMessage();
System.out.println("Missing argument value for -target.");
System.exit(0);
}
int ttarget = Integer.parseInt(args[argindex]);
props.setProperty(TARGET_PROPERTY, String.valueOf(ttarget));
argindex++;
} else if (args[argindex].compareTo("-load") == 0) {
props.setProperty(DO_TRANSACTIONS_PROPERTY, String.valueOf(false));
argindex++;
} else if (args[argindex].compareTo("-t") == 0) {
props.setProperty(DO_TRANSACTIONS_PROPERTY, String.valueOf(true));
argindex++;
} else if (args[argindex].compareTo("-s") == 0) {
props.setProperty(STATUS_PROPERTY, String.valueOf(true));
argindex++;
} else if (args[argindex].compareTo("-db") == 0) {
argindex++;
if (argindex >= args.length) {
usageMessage();
System.out.println("Missing argument value for -db.");
System.exit(0);
}
props.setProperty(DB_PROPERTY, args[argindex]);
argindex++;
} else if (args[argindex].compareTo("-l") == 0) {
argindex++;
if (argindex >= args.length) {
usageMessage();
System.out.println("Missing argument value for -l.");
System.exit(0);
}
props.setProperty(LABEL_PROPERTY, args[argindex]);
argindex++;
} else if (args[argindex].compareTo("-P") == 0) {
argindex++;
if (argindex >= args.length) {
usageMessage();
System.out.println("Missing argument value for -P.");
System.exit(0);
}
String propfile = args[argindex];
argindex++;
Properties myfileprops = new Properties();
try {
myfileprops.load(new FileInputStream(propfile));
} catch (IOException e) {
System.out.println("Unable to open the properties file " + propfile);
System.out.println(e.getMessage());
System.exit(0);
}
//Issue #5 - remove call to stringPropertyNames to make compilable under Java 1.5
for (Enumeration e = myfileprops.propertyNames(); e.hasMoreElements();) {
String prop = (String) e.nextElement();
fileprops.setProperty(prop, myfileprops.getProperty(prop));
}
} else if (args[argindex].compareTo("-p") == 0) {
argindex++;
if (argindex >= args.length) {
usageMessage();
System.out.println("Missing argument value for -p");
System.exit(0);
}
int eq = args[argindex].indexOf('=');
if (eq < 0) {
usageMessage();
System.out.println("Argument '-p' expected to be in key=value format (e.g., -p operationcount=99999)");
System.exit(0);
}
String name = args[argindex].substring(0, eq);
String value = args[argindex].substring(eq + 1);
props.put(name, value);
argindex++;
} else {
usageMessage();
System.out.println("Unknown option " + args[argindex]);
System.exit(0);
}
if (argindex >= args.length) {
break;
}
}
if (argindex != args.length) {
usageMessage();
if (argindex < args.length) {
System.out.println("An argument value without corresponding argument specifier (e.g., -p, -s) was found. "
+ "We expected an argument specifier and instead found " + args[argindex]);
} else {
System.out.println("An argument specifier without corresponding value was found at the end of the supplied " +
"command line arguments.");
}
System.exit(0);
}
//overwrite file properties with properties from the command line
//Issue #5 - remove call to stringPropertyNames to make compilable under Java 1.5
for (Enumeration e = props.propertyNames(); e.hasMoreElements();) {
String prop = (String) e.nextElement();
fileprops.setProperty(prop, props.getProperty(prop));
}
props = fileprops;
if (!checkRequiredProperties(props)) {
System.out.println("Failed check required properties.");
System.exit(0);
}
return props;
}
}
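
To make ClientThread's throttling arithmetic concrete, here is a worked example with hypothetical flags -target 1000 and -threads 10:

// Hypothetical flags: -target 1000, -threads 10
double targetperthread = 1000.0 / 10.0;                    // 100 ops/sec per thread
double targetOpsPerMs = targetperthread / 1000.0;          // 0.1 ops/ms
long targetOpsTickNs = (long) (1000000 / targetOpsPerMs);  // 10,000,000 ns per op

// throttleNanos() parks until startTimeNanos + opsdone * targetOpsTickNs, a
// fixed-rate schedule: a late operation shortens the next sleep rather than
// shifting every subsequent deadline.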
diff --git a/core/src/main/java/com/yahoo/ycsb/DBFactory.java b/core/src/main/java/com/yahoo/ycsb/DBFactory.java
index 2529e6ce..12043635 100644
--- a/core/src/main/java/com/yahoo/ycsb/DBFactory.java
+++ b/core/src/main/java/com/yahoo/ycsb/DBFactory.java
@@ -1,51 +1,55 @@
/**
* Copyright (c) 2010-2016 Yahoo! Inc., 2017 YCSB contributors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb;
import org.apache.htrace.core.Tracer;
import java.util.Properties;
/**
* Creates a DB layer by dynamically classloading the specified DB class.
*/
public final class DBFactory {
private DBFactory() {
// not used
}
- public static DB newDB(String dbname, Properties properties, final Tracer tracer) throws UnknownDBException {
+ public static AsyncDB newDB(String dbname, Properties properties, final Tracer tracer) throws UnknownDBException {
ClassLoader classLoader = DBFactory.class.getClassLoader();
- DB ret;
+ AsyncDB ret;
try {
Class dbclass = classLoader.loadClass(dbname);
- ret = (DB) dbclass.newInstance();
+ if (DB.class.isAssignableFrom(dbclass)) {
+ ret = new AsyncDBAdapter((DB) dbclass.getDeclaredConstructor().newInstance());
+ } else {
+ ret = (AsyncDB) dbclass.getDeclaredConstructor().newInstance();
+ }
} catch (Exception e) {
e.printStackTrace();
return null;
}
ret.setProperties(properties);
return new DBWrapper(ret, tracer);
}
}
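
With this change newDB() accepts both binding styles behind one return type. A sketch of the resulting contract; the async class name is hypothetical, and the calls assume a context that handles UnknownDBException:

// A legacy synchronous binding is wrapped in an AsyncDBAdapter automatically...
AsyncDB a = DBFactory.newDB("com.yahoo.ycsb.BasicDB", props, tracer);

// ...while a class that already extends AsyncDB is instantiated directly.
AsyncDB b = DBFactory.newDB("com.example.MyAsyncDB", props, tracer);

// Either way the result comes back wrapped in a DBWrapper, so measurement is uniform.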
diff --git a/core/src/main/java/com/yahoo/ycsb/DBWrapper.java b/core/src/main/java/com/yahoo/ycsb/DBWrapper.java
index d5a238b6..68e66d2d 100644
--- a/core/src/main/java/com/yahoo/ycsb/DBWrapper.java
+++ b/core/src/main/java/com/yahoo/ycsb/DBWrapper.java
@@ -1,247 +1,294 @@
/**
* Copyright (c) 2010 Yahoo! Inc., 2016-2017 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb;
-import java.util.Map;
import com.yahoo.ycsb.measurements.Measurements;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
import java.util.*;
+import java.util.concurrent.CompletableFuture;
/**
* Wrapper around a "real" DB that measures latencies and counts return codes.
* Also reports latency separately between OK and failed operations.
+ * Waits for outstanding asynchronous calls to finish during cleanup().
*/
-public class DBWrapper extends DB {
- private final DB db;
+public class DBWrapper extends AsyncDB {
+ private final AsyncDB db;
private final Measurements measurements;
private final Tracer tracer;
private boolean reportLatencyForEachError = false;
private Set<String> latencyTrackedErrors = new HashSet<String>();
+ private final Set<CompletableFuture<Status>> queries = new HashSet<>();
private static final String REPORT_LATENCY_FOR_EACH_ERROR_PROPERTY = "reportlatencyforeacherror";
private static final String REPORT_LATENCY_FOR_EACH_ERROR_PROPERTY_DEFAULT = "false";
private static final String LATENCY_TRACKED_ERRORS_PROPERTY = "latencytrackederrors";
private final String scopeStringCleanup;
private final String scopeStringDelete;
private final String scopeStringInit;
private final String scopeStringInsert;
private final String scopeStringRead;
private final String scopeStringScan;
private final String scopeStringUpdate;
- public DBWrapper(final DB db, final Tracer tracer) {
+ public DBWrapper(final AsyncDB db, final Tracer tracer) {
this.db = db;
measurements = Measurements.getMeasurements();
this.tracer = tracer;
final String simple = db.getClass().getSimpleName();
scopeStringCleanup = simple + "#cleanup";
scopeStringDelete = simple + "#delete";
scopeStringInit = simple + "#init";
scopeStringInsert = simple + "#insert";
scopeStringRead = simple + "#read";
scopeStringScan = simple + "#scan";
scopeStringUpdate = simple + "#update";
}
/**
* Set the properties for this DB.
*/
public void setProperties(Properties p) {
db.setProperties(p);
}
/**
* Get the set of properties for this DB.
*/
public Properties getProperties() {
return db.getProperties();
}
/**
* Initialize any state for this DB.
* Called once per DB instance; there is one DB instance per client thread.
*/
public void init() throws DBException {
try (final TraceScope span = tracer.newScope(scopeStringInit)) {
db.init();
this.reportLatencyForEachError = Boolean.parseBoolean(getProperties().
getProperty(REPORT_LATENCY_FOR_EACH_ERROR_PROPERTY,
REPORT_LATENCY_FOR_EACH_ERROR_PROPERTY_DEFAULT));
if (!reportLatencyForEachError) {
String latencyTrackedErrorsProperty = getProperties().getProperty(LATENCY_TRACKED_ERRORS_PROPERTY, null);
if (latencyTrackedErrorsProperty != null) {
this.latencyTrackedErrors = new HashSet<String>(Arrays.asList(
latencyTrackedErrorsProperty.split(",")));
}
}
System.err.println("DBWrapper: report latency for each error is " +
this.reportLatencyForEachError + " and specific error codes to track" +
" for latency are: " + this.latencyTrackedErrors.toString());
}
}
/**
* Cleanup any state for this DB.
* Called once per DB instance; there is one DB instance per client thread.
*/
public void cleanup() throws DBException {
try (final TraceScope span = tracer.newScope(scopeStringCleanup)) {
+ // Wait for any asynchronous operations to finish.
+ System.err.println("DBWrapper: Waiting 100ms for all queries to finish.");
+ CompletableFuture.allOf(queries.toArray(new CompletableFuture[0])).join();
+
long ist = measurements.getIntendedStartTimeNs();
long st = System.nanoTime();
db.cleanup();
long en = System.nanoTime();
measure("CLEANUP", Status.OK, ist, st, en);
}
}
/**
* Read a record from the database. Each field/value pair from the result
* will be stored in a HashMap.
*
* @param table The name of the table
* @param key The record key of the record to read.
* @param fields The list of fields to read, or null for all of them
* @param result A HashMap of field/value pairs for the result
* @return The result of the operation.
*/
- public Status read(String table, String key, Set<String> fields,
- Map<String, ByteIterator> result) {
+ public CompletableFuture<Status> read(String table, String key, Set<String> fields,
+ Map<String, ByteIterator> result) {
try (final TraceScope span = tracer.newScope(scopeStringRead)) {
long ist = measurements.getIntendedStartTimeNs();
long st = System.nanoTime();
- Status res = db.read(table, key, fields, result);
- long en = System.nanoTime();
- measure("READ", res, ist, st, en);
- measurements.reportStatus("READ", res);
- return res;
+
+ CompletableFuture<Status> readFuture = db.read(table, key, fields, result).whenComplete((res, ex) -> {
+ if (ex != null) {
+ System.err.println("READ failed due to exception: " + ex);
+ res = Status.ERROR;
+ }
+ long en = System.nanoTime();
+ measure("READ", res, ist, st, en);
+ measurements.reportStatus("READ", res);
+ });
+
+ queries.add(readFuture);
+ return readFuture;
}
}
/**
* Perform a range scan for a set of records in the database.
* Each field/value pair from the result will be stored in a HashMap.
*
* @param table The name of the table
* @param startkey The record key of the first record to read.
* @param recordcount The number of records to read
* @param fields The list of fields to read, or null for all of them
* @param result A Vector of HashMaps, where each HashMap is a set of field/value pairs for one record
* @return The result of the operation.
*/
- public Status scan(String table, String startkey, int recordcount,
- Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
+ public CompletableFuture<Status> scan(String table, String startkey, int recordcount,
+ Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
try (final TraceScope span = tracer.newScope(scopeStringScan)) {
long ist = measurements.getIntendedStartTimeNs();
long st = System.nanoTime();
- Status res = db.scan(table, startkey, recordcount, fields, result);
- long en = System.nanoTime();
- measure("SCAN", res, ist, st, en);
- measurements.reportStatus("SCAN", res);
- return res;
+
+ CompletableFuture<Status> scanFuture = db.scan(table, startkey, recordcount, fields, result)
+ .whenComplete((res, ex) -> {
+ if (ex != null) {
+ System.err.println("SCAN failed due to exception: " + ex);
+ res = Status.ERROR;
+ }
+ long en = System.nanoTime();
+ measure("SCAN", res, ist, st, en);
+ measurements.reportStatus("SCAN", res);
+ });
+
+ queries.add(scanFuture);
+ return scanFuture;
}
}
private void measure(String op, Status result, long intendedStartTimeNanos,
long startTimeNanos, long endTimeNanos) {
String measurementName = op;
if (result == null || !result.isOk()) {
String resultName = result == null ? "NULL" : result.getName();
if (this.reportLatencyForEachError ||
this.latencyTrackedErrors.contains(resultName)) {
measurementName = op + "-" + resultName;
} else {
measurementName = op + "-FAILED";
}
}
measurements.measure(measurementName,
(int) ((endTimeNanos - startTimeNanos) / 1000));
measurements.measureIntended(measurementName,
(int) ((endTimeNanos - intendedStartTimeNanos) / 1000));
}
/**
* Update a record in the database. Any field/value pairs in the specified values HashMap will be written into the
* record with the specified record key, overwriting any existing values with the same field name.
*
* @param table The name of the table
* @param key The record key of the record to write.
* @param values A HashMap of field/value pairs to update in the record
* @return The result of the operation.
*/
- public Status update(String table, String key,
- Map<String, ByteIterator> values) {
+ public CompletableFuture<Status> update(String table, String key,
+ Map<String, ByteIterator> values) {
try (final TraceScope span = tracer.newScope(scopeStringUpdate)) {
long ist = measurements.getIntendedStartTimeNs();
long st = System.nanoTime();
- Status res = db.update(table, key, values);
- long en = System.nanoTime();
- measure("UPDATE", res, ist, st, en);
- measurements.reportStatus("UPDATE", res);
- return res;
+
+ CompletableFuture<Status> updateFuture = db.update(table, key, values).whenComplete((res, ex) -> {
+ if (ex != null) {
+ System.err.println("UPDATE failed due to exception: " + ex);
+ res = Status.ERROR;
+ }
+ long en = System.nanoTime();
+ measure("UPDATE", res, ist, st, en);
+ measurements.reportStatus("UPDATE", res);
+ });
+
+ queries.add(updateFuture);
+ return updateFuture;
}
}
/**
* Insert a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified
* record key.
*
* @param table The name of the table
* @param key The record key of the record to insert.
* @param values A HashMap of field/value pairs to insert in the record
* @return The result of the operation.
*/
- public Status insert(String table, String key,
- Map<String, ByteIterator> values) {
+ public CompletableFuture<Status> insert(String table, String key,
+ Map<String, ByteIterator> values) {
try (final TraceScope span = tracer.newScope(scopeStringInsert)) {
long ist = measurements.getIntendedStartTimeNs();
long st = System.nanoTime();
- Status res = db.insert(table, key, values);
- long en = System.nanoTime();
- measure("INSERT", res, ist, st, en);
- measurements.reportStatus("INSERT", res);
- return res;
+
+ CompletableFuture<Status> insertFuture = db.insert(table, key, values).whenComplete((res, ex) -> {
+ if (ex != null) {
+ System.err.println("INSERT failed due to exception: " + ex);
+ res = Status.ERROR;
+ }
+ long en = System.nanoTime();
+ measure("INSERT", res, ist, st, en);
+ measurements.reportStatus("INSERT", res);
+ });
+
+ queries.add(insertFuture);
+ return insertFuture;
}
}
/**
* Delete a record from the database.
*
* @param table The name of the table
* @param key The record key of the record to delete.
* @return The result of the operation.
*/
- public Status delete(String table, String key) {
+ public CompletableFuture<Status> delete(String table, String key) {
try (final TraceScope span = tracer.newScope(scopeStringDelete)) {
long ist = measurements.getIntendedStartTimeNs();
long st = System.nanoTime();
- Status res = db.delete(table, key);
- long en = System.nanoTime();
- measure("DELETE", res, ist, st, en);
- measurements.reportStatus("DELETE", res);
- return res;
+
+ CompletableFuture<Status> deleteFuture = db.delete(table, key).whenComplete((res, ex) -> {
+ if (ex != null) {
+ System.err.println("DELETE failed due to exception: " + ex);
+ res = Status.ERROR;
+ }
+ long en = System.nanoTime();
+ measure("DELETE", res, ist, st, en);
+ measurements.reportStatus("DELETE", res);
+ });
+
+ queries.add(deleteFuture);
+ return deleteFuture;
}
}
}
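
One CompletableFuture subtlety in this wrapper is worth spelling out: whenComplete() yields a future with the same outcome as its upstream, so reassigning res inside the callback changes only what gets measured, never what callers observe. A small stand-alone sketch with a synthetic failure:

CompletableFuture<Status> failed = new CompletableFuture<>();
failed.completeExceptionally(new RuntimeException("synthetic driver error"));

CompletableFuture<Status> measured = failed.whenComplete((res, ex) -> {
  if (ex != null) {
    res = Status.ERROR; // feeds the measurement only
  }
});

measured.join(); // still throws CompletionException to the caller

For the same reason, the allOf(...).join() in cleanup() surfaces a CompletionException if any recorded operation failed, so a failed async call aborts cleanup rather than being silently dropped.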
diff --git a/core/src/main/java/com/yahoo/ycsb/Workload.java b/core/src/main/java/com/yahoo/ycsb/Workload.java
index 11aae672..289c2b23 100644
--- a/core/src/main/java/com/yahoo/ycsb/Workload.java
+++ b/core/src/main/java/com/yahoo/ycsb/Workload.java
@@ -1,122 +1,122 @@
/**
* Copyright (c) 2010-2016 Yahoo! Inc., 2017 YCSB contributors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Properties;
/**
* One experiment scenario. One object of this type will
* be instantiated and shared among all client threads. This class
* should be constructed using a no-argument constructor, so we can
* load it dynamically. Any argument-based initialization should be
* done by init().
*
* If you extend this class, you should support the "insertstart" property. This
* allows the Client to proceed from multiple clients on different machines, in case
* the client is the bottleneck. For example, if we want to load 1 million records from
* 2 machines, the first machine should have insertstart=0 and the second insertstart=500000. Additionally,
* the "insertcount" property, which is interpreted by Client, can be used to tell each instance of the
* client how many inserts to do. In the example above, both clients should have insertcount=500000.
*/
public abstract class Workload {
public static final String INSERT_START_PROPERTY = "insertstart";
public static final String INSERT_COUNT_PROPERTY = "insertcount";
public static final String INSERT_START_PROPERTY_DEFAULT = "0";
private volatile AtomicBoolean stopRequested = new AtomicBoolean(false);
/** Operations available for a database. */
public enum Operation {
READ,
UPDATE,
INSERT,
SCAN,
DELETE
}
/**
* Initialize the scenario. Create any generators and other shared objects here.
* Called once, in the main client thread, before any operations are started.
*/
public void init(Properties p) throws WorkloadException {
}
/**
* Initialize any state for a particular client thread. Since the scenario object
* will be shared among all threads, this is the place to create any state that is specific
* to one thread. To be clear, this means the returned object should be created anew on each
* call to initThread(); do not return the same object multiple times.
* The returned object will be passed to invocations of doInsert() and doTransaction()
* for this thread. There should be no side effects from this call; all state should be encapsulated
* in the returned object. If you have no state to retain for this thread, return null. (But if you have
* no state to retain for this thread, probably you don't need to override initThread().)
*
* @return an object holding per-thread state, which will be passed to doInsert() and
* doTransaction() for this thread; null if no per-thread state is needed.
*/
public Object initThread(Properties p, int mythreadid, int threadcount) throws WorkloadException {
return null;
}
/**
* Cleanup the scenario. Called once, in the main client thread, after all operations have completed.
*/
public void cleanup() throws WorkloadException {
}
/**
* Do one insert operation. Because it will be called concurrently from multiple client threads, this
* function must be thread safe. However, avoid synchronized, or the threads will block waiting for each
* other, and it will be difficult to reach the target throughput. Ideally, this function would have no side
* effects other than DB operations and mutations on threadstate. Mutations to threadstate do not need to be
* synchronized, since each thread has its own threadstate instance.
*/
- public abstract boolean doInsert(DB db, Object threadstate);
+ public abstract boolean doInsert(AsyncDB db, Object threadstate);
/**
* Do one transaction operation. Because it will be called concurrently from multiple client threads, this
* function must be thread safe. However, avoid synchronized, or the threads will block waiting for each
* other, and it will be difficult to reach the target throughput. Ideally, this function would have no side
* effects other than DB operations and mutations on threadstate. Mutations to threadstate do not need to be
* synchronized, since each thread has its own threadstate instance.
*
* @return false if the workload knows it is done for this thread. Client will terminate the thread.
* Return true otherwise. Return true for workloads that rely on operationcount. For workloads that read
* traces from a file, return true when there are more to do, false when you are done.
*/
- public abstract boolean doTransaction(DB db, Object threadstate);
+ public abstract boolean doTransaction(AsyncDB db, Object threadstate);
/**
* Allows scheduling a request to stop the workload.
*/
public void requestStop() {
stopRequested.set(true);
}
/**
* Check the status of the stop request flag.
* @return true if stop was requested, false otherwise.
*/
public boolean isStopRequested() {
return stopRequested.get();
}
}
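
To make the insertstart/insertcount contract above concrete, a hedged example of splitting a 1-million-record load across two machines (the property names are real; the split is illustrative):

# machine A
recordcount=1000000
insertstart=0
insertcount=500000

# machine B
recordcount=1000000
insertstart=500000
insertcount=500000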
diff --git a/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java b/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java
index d3fd84b3..564d3ea3 100644
--- a/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java
+++ b/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java
@@ -1,865 +1,913 @@
/**
* Copyright (c) 2010 Yahoo! Inc., Copyright (c) 2016-2017 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.workloads;
import com.yahoo.ycsb.*;
import com.yahoo.ycsb.generator.*;
import com.yahoo.ycsb.generator.UniformLongGenerator;
import com.yahoo.ycsb.measurements.Measurements;
import java.io.IOException;
import java.util.*;
+import java.util.concurrent.CompletableFuture;
/**
* The core benchmark scenario. Represents a set of clients doing simple CRUD operations. The
* relative proportion of different kinds of operations, and other properties of the workload,
* are controlled by parameters specified at runtime.
*
* Properties to control the client:
*
* - fieldcount: the number of fields in a record (default: 10)
*
* - fieldlength: the size of each field (default: 100)
*
* - minfieldlength: the minimum size of each field (default: 1)
*
* - readallfields: should reads read all fields (true) or just one (false) (default: true)
*
* - writeallfields: should updates and read/modify/writes update all fields (true) or just
* one (false) (default: false)
*
* - readproportion: what proportion of operations should be reads (default: 0.95)
*
* - updateproportion: what proportion of operations should be updates (default: 0.05)
*
* - insertproportion: what proportion of operations should be inserts (default: 0)
*
* - scanproportion: what proportion of operations should be scans (default: 0)
*
* - readmodifywriteproportion: what proportion of operations should read a record,
* modify it, and write it back (default: 0)
*
* - requestdistribution: what distribution should be used to select the records to operate
* on - uniform, zipfian, hotspot, sequential, exponential or latest (default: uniform)
*
* - minscanlength: for scans, what is the minimum number of records to scan (default: 1)
*
* - maxscanlength: for scans, what is the maximum number of records to scan (default: 1000)
*
* - scanlengthdistribution: for scans, what distribution should be used to choose the
* number of records to scan, for each scan, between 1 and maxscanlength (default: uniform)
*
* - insertstart: for parallel loads and runs, defines the starting record for this
* YCSB instance (default: 0)
*
* - insertcount: for parallel loads and runs, defines the number of records for this
* YCSB instance (default: recordcount)
*
* - zeropadding: for generating a record sequence compatible with string sort order by
* 0 padding the record number. Controls the number of 0s to use for padding. (default: 1)
* For example for row 5, with zeropadding=1 you get the key 'user5' and with zeropadding=8 you get
* 'user00000005'. To see its impact, zeropadding needs to be bigger than the number of
* digits in the record number.
*
* - insertorder: should records be inserted in order by key ("ordered"), or in hashed
* order ("hashed") (default: hashed)
+ *
+ * - openloop: should requests be issued asynchronously, without waiting for the response to
+ * each previous request (true), or synchronously, one at a time (false) (default: false)
*
*/
public class CoreWorkload extends Workload {
/**
* The name of the database table to run queries against.
*/
public static final String TABLENAME_PROPERTY = "table";
/**
* The default name of the database table to run queries against.
*/
public static final String TABLENAME_PROPERTY_DEFAULT = "usertable";
protected String table;
/**
* The name of the property for the number of fields in a record.
*/
public static final String FIELD_COUNT_PROPERTY = "fieldcount";
/**
* Default number of fields in a record.
*/
public static final String FIELD_COUNT_PROPERTY_DEFAULT = "10";
private List<String> fieldnames;
/**
* The name of the property for the field length distribution. Options are "uniform", "zipfian"
* (favouring short records), "constant", and "histogram".
*
* If "uniform", "zipfian" or "constant", the maximum field length will be that specified by the
* fieldlength property. If "histogram", then the histogram will be read from the filename
* specified in the "fieldlengthhistogram" property.
*/
public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY = "fieldlengthdistribution";
/**
* The default field length distribution.
*/
public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT = "constant";
/**
* The name of the property for the length of a field in bytes.
*/
public static final String FIELD_LENGTH_PROPERTY = "fieldlength";
/**
* The default maximum length of a field in bytes.
*/
public static final String FIELD_LENGTH_PROPERTY_DEFAULT = "100";
/**
* The name of the property for the minimum length of a field in bytes.
*/
public static final String MIN_FIELD_LENGTH_PROPERTY = "minfieldlength";
/**
* The default minimum length of a field in bytes.
*/
public static final String MIN_FIELD_LENGTH_PROPERTY_DEFAULT = "1";
/**
* The name of a property that specifies the filename containing the field length histogram (only
* used if fieldlengthdistribution is "histogram").
*/
public static final String FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY = "fieldlengthhistogram";
/**
* The default filename containing a field length histogram.
*/
public static final String FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY_DEFAULT = "hist.txt";
/**
* Generator object that produces field lengths. The value of this depends on the properties that
* start with "FIELD_LENGTH_".
*/
protected NumberGenerator fieldlengthgenerator;
/**
* The name of the property for deciding whether to read one field (false) or all fields (true) of
* a record.
*/
public static final String READ_ALL_FIELDS_PROPERTY = "readallfields";
/**
* The default value for the readallfields property.
*/
public static final String READ_ALL_FIELDS_PROPERTY_DEFAULT = "true";
protected boolean readallfields;
/**
* The name of the property for deciding whether to write one field (false) or all fields (true)
* of a record.
*/
public static final String WRITE_ALL_FIELDS_PROPERTY = "writeallfields";
/**
* The default value for the writeallfields property.
*/
public static final String WRITE_ALL_FIELDS_PROPERTY_DEFAULT = "false";
protected boolean writeallfields;
/**
* The name of the property for deciding whether to check all returned
* data against the formation template to ensure data integrity.
*/
public static final String DATA_INTEGRITY_PROPERTY = "dataintegrity";
/**
* The default value for the dataintegrity property.
*/
public static final String DATA_INTEGRITY_PROPERTY_DEFAULT = "false";
/**
* Set to true if want to check correctness of reads. Must also
* be set to true during loading phase to function.
*/
private boolean dataintegrity;
/**
* The name of the property for the proportion of transactions that are reads.
*/
public static final String READ_PROPORTION_PROPERTY = "readproportion";
/**
* The default proportion of transactions that are reads.
*/
public static final String READ_PROPORTION_PROPERTY_DEFAULT = "0.95";
/**
* The name of the property for the proportion of transactions that are updates.
*/
public static final String UPDATE_PROPORTION_PROPERTY = "updateproportion";
/**
* The default proportion of transactions that are updates.
*/
public static final String UPDATE_PROPORTION_PROPERTY_DEFAULT = "0.05";
/**
* The name of the property for the proportion of transactions that are inserts.
*/
public static final String INSERT_PROPORTION_PROPERTY = "insertproportion";
/**
* The default proportion of transactions that are inserts.
*/
public static final String INSERT_PROPORTION_PROPERTY_DEFAULT = "0.0";
/**
* The name of the property for the proportion of transactions that are scans.
*/
public static final String SCAN_PROPORTION_PROPERTY = "scanproportion";
/**
* The default proportion of transactions that are scans.
*/
public static final String SCAN_PROPORTION_PROPERTY_DEFAULT = "0.0";
/**
* The name of the property for the proportion of transactions that are read-modify-write.
*/
public static final String READMODIFYWRITE_PROPORTION_PROPERTY = "readmodifywriteproportion";
/**
* The default proportion of transactions that are read-modify-write.
*/
public static final String READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT = "0.0";
/**
* The name of the property for the distribution of requests across the keyspace. Options are
* "uniform", "sequential", "zipfian", "latest", "hotspot" and "exponential".
*/
public static final String REQUEST_DISTRIBUTION_PROPERTY = "requestdistribution";
/**
* The default distribution of requests across the keyspace.
*/
public static final String REQUEST_DISTRIBUTION_PROPERTY_DEFAULT = "uniform";
/**
* The name of the property for adding zero padding to record numbers in order to match
* string sort order. Controls the number of 0s to left pad with.
*/
public static final String ZERO_PADDING_PROPERTY = "zeropadding";
/**
* The default zero padding value. Matches integer sort order
*/
public static final String ZERO_PADDING_PROPERTY_DEFAULT = "1";
/**
* The name of the property for the min scan length (number of records).
*/
public static final String MIN_SCAN_LENGTH_PROPERTY = "minscanlength";
/**
* The default min scan length.
*/
public static final String MIN_SCAN_LENGTH_PROPERTY_DEFAULT = "1";
/**
* The name of the property for the max scan length (number of records).
*/
public static final String MAX_SCAN_LENGTH_PROPERTY = "maxscanlength";
/**
* The default max scan length.
*/
public static final String MAX_SCAN_LENGTH_PROPERTY_DEFAULT = "1000";
/**
* The name of the property for the scan length distribution. Options are "uniform" and "zipfian"
* (favoring short scans)
*/
public static final String SCAN_LENGTH_DISTRIBUTION_PROPERTY = "scanlengthdistribution";
/**
* The default max scan length.
*/
public static final String SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT = "uniform";
/**
* The name of the property for the order to insert records. Options are "ordered" or "hashed"
*/
public static final String INSERT_ORDER_PROPERTY = "insertorder";
/**
* Default insert order.
*/
public static final String INSERT_ORDER_PROPERTY_DEFAULT = "hashed";
/**
* Percentage data items that constitute the hot set.
*/
public static final String HOTSPOT_DATA_FRACTION = "hotspotdatafraction";
/**
* Default value of the size of the hot set.
*/
public static final String HOTSPOT_DATA_FRACTION_DEFAULT = "0.2";
/**
* Percentage operations that access the hot set.
*/
public static final String HOTSPOT_OPN_FRACTION = "hotspotopnfraction";
/**
* Default value of the percentage operations accessing the hot set.
*/
public static final String HOTSPOT_OPN_FRACTION_DEFAULT = "0.8";
/**
* How many times to retry when insertion of a single item to a DB fails.
*/
public static final String INSERTION_RETRY_LIMIT = "core_workload_insertion_retry_limit";
public static final String INSERTION_RETRY_LIMIT_DEFAULT = "0";
/**
* On average, how long to wait between the retries, in seconds.
*/
public static final String INSERTION_RETRY_INTERVAL = "core_workload_insertion_retry_interval";
public static final String INSERTION_RETRY_INTERVAL_DEFAULT = "3";
+ /**
+ * The name of the property that selects open-loop request issue.
+ */
+ public static final String OPEN_LOOP_PROPERTY = "openloop";
+ /**
+ * The default value of the openloop property.
+ */
+ public static final String OPEN_LOOP_PROPERTY_DEFAULT = "false";
+
+ /** Whether to issue requests without waiting for each response (open loop). */
+ protected boolean openloop;
+
protected NumberGenerator keysequence;
protected DiscreteGenerator operationchooser;
protected NumberGenerator keychooser;
protected NumberGenerator fieldchooser;
protected AcknowledgedCounterGenerator transactioninsertkeysequence;
protected NumberGenerator scanlength;
protected boolean orderedinserts;
protected long fieldcount;
protected long recordcount;
protected int zeropadding;
protected int insertionRetryLimit;
protected int insertionRetryInterval;
private Measurements measurements = Measurements.getMeasurements();
protected static NumberGenerator getFieldLengthGenerator(Properties p) throws WorkloadException {
NumberGenerator fieldlengthgenerator;
String fieldlengthdistribution = p.getProperty(
FIELD_LENGTH_DISTRIBUTION_PROPERTY, FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT);
int fieldlength =
Integer.parseInt(p.getProperty(FIELD_LENGTH_PROPERTY, FIELD_LENGTH_PROPERTY_DEFAULT));
int minfieldlength =
Integer.parseInt(p.getProperty(MIN_FIELD_LENGTH_PROPERTY, MIN_FIELD_LENGTH_PROPERTY_DEFAULT));
String fieldlengthhistogram = p.getProperty(
FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY, FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY_DEFAULT);
if (fieldlengthdistribution.compareTo("constant") == 0) {
fieldlengthgenerator = new ConstantIntegerGenerator(fieldlength);
} else if (fieldlengthdistribution.compareTo("uniform") == 0) {
fieldlengthgenerator = new UniformLongGenerator(minfieldlength, fieldlength);
} else if (fieldlengthdistribution.compareTo("zipfian") == 0) {
fieldlengthgenerator = new ZipfianGenerator(minfieldlength, fieldlength);
} else if (fieldlengthdistribution.compareTo("histogram") == 0) {
try {
fieldlengthgenerator = new HistogramGenerator(fieldlengthhistogram);
} catch (IOException e) {
throw new WorkloadException(
"Couldn't read field length histogram file: " + fieldlengthhistogram, e);
}
} else {
throw new WorkloadException(
"Unknown field length distribution \"" + fieldlengthdistribution + "\"");
}
return fieldlengthgenerator;
}
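// Example configuration (illustrative): fieldlengthdistribution=zipfian with
// fieldlength=1000 and minfieldlength=10 yields Zipf-skewed field sizes between
// 10 and 1000 bytes, favoring the short end; fieldlengthdistribution=histogram
// instead reads the size distribution from the file named by fieldlengthhistogram.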
/**
* Initialize the scenario.
* Called once, in the main client thread, before any operations are started.
*/
@Override
public void init(Properties p) throws WorkloadException {
table = p.getProperty(TABLENAME_PROPERTY, TABLENAME_PROPERTY_DEFAULT);
fieldcount =
Long.parseLong(p.getProperty(FIELD_COUNT_PROPERTY, FIELD_COUNT_PROPERTY_DEFAULT));
fieldnames = new ArrayList<>();
for (int i = 0; i < fieldcount; i++) {
fieldnames.add("field" + i);
}
fieldlengthgenerator = CoreWorkload.getFieldLengthGenerator(p);
recordcount =
Long.parseLong(p.getProperty(Client.RECORD_COUNT_PROPERTY, Client.DEFAULT_RECORD_COUNT));
if (recordcount == 0) {
recordcount = Integer.MAX_VALUE;
}
String requestdistrib =
p.getProperty(REQUEST_DISTRIBUTION_PROPERTY, REQUEST_DISTRIBUTION_PROPERTY_DEFAULT);
int minscanlength =
Integer.parseInt(p.getProperty(MIN_SCAN_LENGTH_PROPERTY, MIN_SCAN_LENGTH_PROPERTY_DEFAULT));
int maxscanlength =
Integer.parseInt(p.getProperty(MAX_SCAN_LENGTH_PROPERTY, MAX_SCAN_LENGTH_PROPERTY_DEFAULT));
String scanlengthdistrib =
p.getProperty(SCAN_LENGTH_DISTRIBUTION_PROPERTY, SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT);
long insertstart =
Long.parseLong(p.getProperty(INSERT_START_PROPERTY, INSERT_START_PROPERTY_DEFAULT));
long insertcount =
Integer.parseInt(p.getProperty(INSERT_COUNT_PROPERTY, String.valueOf(recordcount - insertstart)));
// Confirm valid values for insertstart and insertcount in relation to recordcount
if (recordcount < (insertstart + insertcount)) {
System.err.println("Invalid combination of insertstart, insertcount and recordcount.");
System.err.println("recordcount must be bigger than insertstart + insertcount.");
System.exit(-1);
}
zeropadding =
Integer.parseInt(p.getProperty(ZERO_PADDING_PROPERTY, ZERO_PADDING_PROPERTY_DEFAULT));
readallfields = Boolean.parseBoolean(
p.getProperty(READ_ALL_FIELDS_PROPERTY, READ_ALL_FIELDS_PROPERTY_DEFAULT));
writeallfields = Boolean.parseBoolean(
p.getProperty(WRITE_ALL_FIELDS_PROPERTY, WRITE_ALL_FIELDS_PROPERTY_DEFAULT));
dataintegrity = Boolean.parseBoolean(
p.getProperty(DATA_INTEGRITY_PROPERTY, DATA_INTEGRITY_PROPERTY_DEFAULT));
// Confirm that fieldlengthgenerator returns a constant if data
// integrity check requested.
if (dataintegrity && !(p.getProperty(
FIELD_LENGTH_DISTRIBUTION_PROPERTY,
FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT)).equals("constant")) {
System.err.println("Must have constant field size to check data integrity.");
System.exit(-1);
}
if (p.getProperty(INSERT_ORDER_PROPERTY, INSERT_ORDER_PROPERTY_DEFAULT).compareTo("hashed") == 0) {
orderedinserts = false;
} else if (requestdistrib.compareTo("exponential") == 0) {
double percentile = Double.parseDouble(p.getProperty(
ExponentialGenerator.EXPONENTIAL_PERCENTILE_PROPERTY,
ExponentialGenerator.EXPONENTIAL_PERCENTILE_DEFAULT));
double frac = Double.parseDouble(p.getProperty(
ExponentialGenerator.EXPONENTIAL_FRAC_PROPERTY,
ExponentialGenerator.EXPONENTIAL_FRAC_DEFAULT));
keychooser = new ExponentialGenerator(percentile, recordcount * frac);
} else {
orderedinserts = true;
}
keysequence = new CounterGenerator(insertstart);
operationchooser = createOperationGenerator(p);
transactioninsertkeysequence = new AcknowledgedCounterGenerator(recordcount);
if (requestdistrib.compareTo("uniform") == 0) {
keychooser = new UniformLongGenerator(insertstart, insertstart + insertcount - 1);
} else if (requestdistrib.compareTo("sequential") == 0) {
keychooser = new SequentialGenerator(insertstart, insertstart + insertcount - 1);
} else if (requestdistrib.compareTo("zipfian") == 0) {
// The scrambled zipfian generator picks a random "next key" in part by taking the modulus
// over the number of keys. If the number of keys changes, this would shift the modulus, and
// we don't want that to change which keys are popular. So we construct the scrambled zipfian
// generator with a keyspace that is larger than what exists at the beginning of the test:
// we predict the number of inserts and tell the generator the number of existing keys plus
// the number of predicted keys as the total keyspace. Then, if the generator picks a key that
// hasn't been inserted yet, we just ignore it and pick another key. This way, the size of the
// keyspace doesn't change from the perspective of the scrambled zipfian generator.
final double insertproportion = Double.parseDouble(
p.getProperty(INSERT_PROPORTION_PROPERTY, INSERT_PROPORTION_PROPERTY_DEFAULT));
int opcount = Integer.parseInt(p.getProperty(Client.OPERATION_COUNT_PROPERTY));
int expectednewkeys = (int) ((opcount) * insertproportion * 2.0); // 2 is fudge factor
keychooser = new ScrambledZipfianGenerator(insertstart, insertstart + insertcount + expectednewkeys);
} else if (requestdistrib.compareTo("latest") == 0) {
keychooser = new SkewedLatestGenerator(transactioninsertkeysequence);
} else if (requestdistrib.equals("hotspot")) {
double hotsetfraction =
Double.parseDouble(p.getProperty(HOTSPOT_DATA_FRACTION, HOTSPOT_DATA_FRACTION_DEFAULT));
double hotopnfraction =
Double.parseDouble(p.getProperty(HOTSPOT_OPN_FRACTION, HOTSPOT_OPN_FRACTION_DEFAULT));
keychooser = new HotspotIntegerGenerator(insertstart, insertstart + insertcount - 1,
hotsetfraction, hotopnfraction);
} else {
throw new WorkloadException("Unknown request distribution \"" + requestdistrib + "\"");
}
fieldchooser = new UniformLongGenerator(0, fieldcount - 1);
if (scanlengthdistrib.compareTo("uniform") == 0) {
scanlength = new UniformLongGenerator(minscanlength, maxscanlength);
} else if (scanlengthdistrib.compareTo("zipfian") == 0) {
scanlength = new ZipfianGenerator(minscanlength, maxscanlength);
} else {
throw new WorkloadException(
"Distribution \"" + scanlengthdistrib + "\" not allowed for scan length");
}
insertionRetryLimit = Integer.parseInt(p.getProperty(
INSERTION_RETRY_LIMIT, INSERTION_RETRY_LIMIT_DEFAULT));
insertionRetryInterval = Integer.parseInt(p.getProperty(
INSERTION_RETRY_INTERVAL, INSERTION_RETRY_INTERVAL_DEFAULT));
+
+ openloop = Boolean.parseBoolean(
+ p.getProperty(OPEN_LOOP_PROPERTY, OPEN_LOOP_PROPERTY_DEFAULT));
}
protected String buildKeyName(long keynum) {
if (!orderedinserts) {
keynum = Utils.hash(keynum);
}
String value = Long.toString(keynum);
int fill = zeropadding - value.length();
String prekey = "user";
for (int i = 0; i < fill; i++) {
prekey += '0';
}
return prekey + value;
}
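// Illustrative examples (not part of the patch): with insertorder=ordered and
// zeropadding=8, buildKeyName(5) returns "user00000005"; with the default
// zeropadding=1 it returns "user5". With hashed inserts (the default), keynum is
// first passed through Utils.hash(), so the numeric suffix is the hashed value.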
/**
* Builds a value for a randomly chosen field.
*/
private HashMap<String, ByteIterator> buildSingleValue(String key) {
HashMap<String, ByteIterator> value = new HashMap<>();
String fieldkey = fieldnames.get(fieldchooser.nextValue().intValue());
ByteIterator data;
if (dataintegrity) {
data = new StringByteIterator(buildDeterministicValue(key, fieldkey));
} else {
// fill with random data
data = new RandomByteIterator(fieldlengthgenerator.nextValue().longValue());
}
value.put(fieldkey, data);
return value;
}
/**
* Builds values for all fields.
*/
private HashMap<String, ByteIterator> buildValues(String key) {
HashMap<String, ByteIterator> values = new HashMap<>();
for (String fieldkey : fieldnames) {
ByteIterator data;
if (dataintegrity) {
data = new StringByteIterator(buildDeterministicValue(key, fieldkey));
} else {
// fill with random data
data = new RandomByteIterator(fieldlengthgenerator.nextValue().longValue());
}
values.put(fieldkey, data);
}
return values;
}
/**
* Build a deterministic value given the key information.
*/
private String buildDeterministicValue(String key, String fieldkey) {
int size = fieldlengthgenerator.nextValue().intValue();
StringBuilder sb = new StringBuilder(size);
sb.append(key);
sb.append(':');
sb.append(fieldkey);
while (sb.length() < size) {
sb.append(':');
sb.append(sb.toString().hashCode());
}
sb.setLength(size);
return sb.toString();
}
/**
* Do one insert operation. Because it will be called concurrently from multiple client threads,
* this function must be thread safe. However, avoid synchronized, or the threads will block waiting
* for each other, and it will be difficult to reach the target throughput. Ideally, this function would
* have no side effects other than DB operations.
*/
@Override
- public boolean doInsert(DB db, Object threadstate) {
+ public boolean doInsert(AsyncDB db, Object threadstate) {
int keynum = keysequence.nextValue().intValue();
String dbkey = buildKeyName(keynum);
HashMap<String, ByteIterator> values = buildValues(dbkey);
Status status;
int numOfRetries = 0;
do {
- status = db.insert(table, dbkey, values);
+ status = db.syncView().insert(table, dbkey, values);
if (null != status && status.isOk()) {
break;
}
// Retry if configured. Without retrying, the load process will fail
// even if one single insertion fails. User can optionally configure
// an insertion retry limit (default is 0) to enable retry.
if (++numOfRetries <= insertionRetryLimit) {
System.err.println("Retrying insertion, retry count: " + numOfRetries);
try {
// Sleep for a random number between [0.8, 1.2)*insertionRetryInterval.
int sleepTime = (int) (1000 * insertionRetryInterval * (0.8 + 0.4 * Math.random()));
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
break;
}
} else {
System.err.println("Error inserting, not retrying any more. number of attempts: " + numOfRetries +
"Insertion Retry Limit: " + insertionRetryLimit);
break;
}
} while (true);
return null != status && status.isOk();
}
/**
* Do one transaction operation. Because it will be called concurrently from multiple client
* threads, this function must be thread safe. However, avoid synchronized, or the threads will block waiting
* for each other, and it will be difficult to reach the target throughput. Ideally, this function would
* have no side effects other than DB operations.
*/
@Override
- public boolean doTransaction(DB db, Object threadstate) {
+ public boolean doTransaction(AsyncDB db, Object threadstate) {
String operation = operationchooser.nextString();
if(operation == null) {
return false;
}
switch (operation) {
case "READ":
doTransactionRead(db);
break;
case "UPDATE":
doTransactionUpdate(db);
break;
case "INSERT":
doTransactionInsert(db);
break;
case "SCAN":
doTransactionScan(db);
break;
default:
doTransactionReadModifyWrite(db);
}
return true;
}
/**
* Results are reported in the first three buckets of the histogram under
* the label "VERIFY".
* Bucket 0 means the expected data was returned.
* Bucket 1 means incorrect data was returned.
* Bucket 2 means null data was returned when some data was expected.
*/
protected void verifyRow(String key, HashMap<String, ByteIterator> cells) {
Status verifyStatus = Status.OK;
long startTime = System.nanoTime();
if (!cells.isEmpty()) {
for (Map.Entry<String, ByteIterator> entry : cells.entrySet()) {
if (!entry.getValue().toString().equals(buildDeterministicValue(key, entry.getKey()))) {
verifyStatus = Status.UNEXPECTED_STATE;
break;
}
}
} else {
// This assumes that null data is never valid
verifyStatus = Status.ERROR;
}
long endTime = System.nanoTime();
measurements.measure("VERIFY", (int) (endTime - startTime) / 1000);
measurements.reportStatus("VERIFY", verifyStatus);
}
long nextKeynum() {
long keynum;
if (keychooser instanceof ExponentialGenerator) {
do {
keynum = transactioninsertkeysequence.lastValue() - keychooser.nextValue().intValue();
} while (keynum < 0);
} else {
do {
keynum = keychooser.nextValue().intValue();
} while (keynum > transactioninsertkeysequence.lastValue());
}
return keynum;
}
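// The rejection loops above keep transactions from targeting keys whose
// inserts have not yet been acknowledged by transactioninsertkeysequence.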
- public void doTransactionRead(DB db) {
+ public void doTransactionRead(AsyncDB db) {
// choose a random key
long keynum = nextKeynum();
String keyname = buildKeyName(keynum);
HashSet<String> fields = null;
if (!readallfields) {
// read a random field
String fieldname = fieldnames.get(fieldchooser.nextValue().intValue());
fields = new HashSet<String>();
fields.add(fieldname);
} else if (dataintegrity) {
// pass the full field list if dataintegrity is on for verification
fields = new HashSet<String>(fieldnames);
}
HashMap<String, ByteIterator> cells = new HashMap<String, ByteIterator>();
- db.read(table, keyname, fields, cells);
+ CompletableFuture<Void> readFuture = db.read(table, keyname, fields, cells).thenAccept((res) -> {
+ if (res.isOk() && dataintegrity) {
+ verifyRow(keyname, cells);
+ }
+ });
- if (dataintegrity) {
- verifyRow(keyname, cells);
+ if (!openloop) {
+ readFuture.join();
}
}
- public void doTransactionReadModifyWrite(DB db) {
+ public void doTransactionReadModifyWrite(AsyncDB db) {
// choose a random key
long keynum = nextKeynum();
String keyname = buildKeyName(keynum);
HashSet<String> fields = null;
if (!readallfields) {
// read a random field
String fieldname = fieldnames.get(fieldchooser.nextValue().intValue());
fields = new HashSet<String>();
fields.add(fieldname);
}
HashMap<String, ByteIterator> values;
if (writeallfields) {
// new data for all the fields
values = buildValues(keyname);
} else {
// update a random field
values = buildSingleValue(keyname);
}
// do the transaction
HashMap<String, ByteIterator> cells = new HashMap<String, ByteIterator>();
- long ist = measurements.getIntendedtartTimeNs();
+ long ist = measurements.getIntendedStartTimeNs();
long st = System.nanoTime();
- db.read(table, keyname, fields, cells);
-
- db.update(table, keyname, values);
- long en = System.nanoTime();
+ String opName = "READ-MODIFY-WRITE";
+ String opReadFailureName = "READ-MODIFY-WRITE-READ_FAILED";
+ String opWriteFailureName = "READ-MODIFY-WRITE-WRITE_FAILED";
+
+ CompletableFuture<Void> rFuture = db.read(table, keyname, fields, cells).thenAccept((rres) -> {
+ if (!rres.isOk()) {
+ long en = System.nanoTime();
+ measurements.measure(opReadFailureName, (int) ((en - st) / 1000));
+ measurements.measureIntended(opReadFailureName, (int) ((en - ist) / 1000));
+ } else {
+ CompletableFuture<Void> wFuture = db.update(table, keyname, values).thenAccept((wres) -> {
+ long en = System.nanoTime();
+
+ if (wres.isOk() && dataintegrity) {
+ verifyRow(keyname, cells);
+ }
+
+ measurements.measure(wres.isOk() ? opName : opWriteFailureName, (int) ((en - st) / 1000));
+ measurements.measureIntended(wres.isOk() ? opName : opWriteFailureName, (int) ((en - ist) / 1000));
+ });
+
+ if (!openloop) {
+ wFuture.join();
+ }
+ }
+ });
- if (dataintegrity) {
- verifyRow(keyname, cells);
+ if (!openloop) {
+ rFuture.join();
}
- measurements.measure("READ-MODIFY-WRITE", (int) ((en - st) / 1000));
- measurements.measureIntended("READ-MODIFY-WRITE", (int) ((en - ist) / 1000));
}
- public void doTransactionScan(DB db) {
+ public void doTransactionScan(AsyncDB db) {
// choose a random key
long keynum = nextKeynum();
String startkeyname = buildKeyName(keynum);
// choose a random scan length
int len = scanlength.nextValue().intValue();
HashSet<String> fields = null;
if (!readallfields) {
// read a random field
String fieldname = fieldnames.get(fieldchooser.nextValue().intValue());
fields = new HashSet<String>();
fields.add(fieldname);
}
- db.scan(table, startkeyname, len, fields, new Vector<HashMap<String, ByteIterator>>());
+ CompletableFuture<Status> scanFuture = db.scan(table, startkeyname, len, fields,
+ new Vector<HashMap<String, ByteIterator>>());
+
+ if (!openloop) {
+ scanFuture.join();
+ }
}
- public void doTransactionUpdate(DB db) {
+ public void doTransactionUpdate(AsyncDB db) {
// choose a random key
long keynum = nextKeynum();
String keyname = buildKeyName(keynum);
HashMap<String, ByteIterator> values;
if (writeallfields) {
// new data for all the fields
values = buildValues(keyname);
} else {
// update a random field
values = buildSingleValue(keyname);
}
- db.update(table, keyname, values);
+ CompletableFuture<Status> updateFuture = db.update(table, keyname, values);
+
+ if (!openloop) {
+ updateFuture.join();
+ }
}
- public void doTransactionInsert(DB db) {
+ public void doTransactionInsert(AsyncDB db) {
// choose the next key
long keynum = transactioninsertkeysequence.nextValue();
- try {
- String dbkey = buildKeyName(keynum);
+ String dbkey = buildKeyName(keynum);
+
+ HashMap<String, ByteIterator> values = buildValues(dbkey);
+ CompletableFuture<Void> insertFuture = db.insert(table, dbkey, values).thenAccept((res) -> {
+ if (res.isOk()) {
+ transactioninsertkeysequence.acknowledge(keynum);
+ }
+ });
- HashMap<String, ByteIterator> values = buildValues(dbkey);
- db.insert(table, dbkey, values);
- } finally {
- transactioninsertkeysequence.acknowledge(keynum);
+ if (!openloop) {
+ insertFuture.join();
}
}
/**
* Creates a weighted discrete generator of database operations for a workload to perform.
* Weights/proportions are read from the properties list and defaults are used
* when values are not configured.
* Current operations are "READ", "UPDATE", "INSERT", "SCAN" and "READMODIFYWRITE".
*
* @param p The properties list to pull weights from.
* @return A generator that can be used to determine the next operation to perform.
* @throws IllegalArgumentException if the properties object was null.
*/
protected static DiscreteGenerator createOperationGenerator(final Properties p) {
if (p == null) {
throw new IllegalArgumentException("Properties object cannot be null");
}
final double readproportion = Double.parseDouble(
p.getProperty(READ_PROPORTION_PROPERTY, READ_PROPORTION_PROPERTY_DEFAULT));
final double updateproportion = Double.parseDouble(
p.getProperty(UPDATE_PROPORTION_PROPERTY, UPDATE_PROPORTION_PROPERTY_DEFAULT));
final double insertproportion = Double.parseDouble(
p.getProperty(INSERT_PROPORTION_PROPERTY, INSERT_PROPORTION_PROPERTY_DEFAULT));
final double scanproportion = Double.parseDouble(
p.getProperty(SCAN_PROPORTION_PROPERTY, SCAN_PROPORTION_PROPERTY_DEFAULT));
final double readmodifywriteproportion = Double.parseDouble(p.getProperty(
READMODIFYWRITE_PROPORTION_PROPERTY, READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT));
final DiscreteGenerator operationchooser = new DiscreteGenerator();
if (readproportion > 0) {
operationchooser.addValue(readproportion, "READ");
}
if (updateproportion > 0) {
operationchooser.addValue(updateproportion, "UPDATE");
}
if (insertproportion > 0) {
operationchooser.addValue(insertproportion, "INSERT");
}
if (scanproportion > 0) {
operationchooser.addValue(scanproportion, "SCAN");
}
if (readmodifywriteproportion > 0) {
operationchooser.addValue(readmodifywriteproportion, "READMODIFYWRITE");
}
return operationchooser;
}
}
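
As a usage sketch, the new knob can be toggled from the command line like any other workload property; the binding name here is illustrative and assumes a client that implements AsyncDB:

bin/ycsb run mybinding -P workloads/workloada -p openloop=true -p operationcount=100000

With openloop=false (the default) each doTransaction* call joins its future, preserving the old closed-loop, one-outstanding-request-per-thread behavior.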
diff --git a/core/src/main/java/com/yahoo/ycsb/workloads/RestWorkload.java b/core/src/main/java/com/yahoo/ycsb/workloads/RestWorkload.java
index e215ef16..4ed9423e 100644
--- a/core/src/main/java/com/yahoo/ycsb/workloads/RestWorkload.java
+++ b/core/src/main/java/com/yahoo/ycsb/workloads/RestWorkload.java
@@ -1,306 +1,306 @@
/**
* Copyright (c) 2016-2017 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.workloads;
+import com.yahoo.ycsb.AsyncDB;
import com.yahoo.ycsb.ByteIterator;
-import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.RandomByteIterator;
import com.yahoo.ycsb.WorkloadException;
import com.yahoo.ycsb.generator.*;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import com.yahoo.ycsb.generator.UniformLongGenerator;
/**
* Typical RESTful services benchmarking scenario. Represents a set of clients
* calling REST operations like HTTP DELETE, GET, POST, PUT on a web service.
* This scenario is completely different from CoreWorkload which is mainly
* designed for databases benchmarking. However due to some reusable
* functionality this class extends {@link CoreWorkload} and overrides necessary
* methods like init, doTransaction etc.
*/
public class RestWorkload extends CoreWorkload {
/**
* The name of the property for the proportion of transactions that are
* delete.
*/
public static final String DELETE_PROPORTION_PROPERTY = "deleteproportion";
/**
* The default proportion of transactions that are delete.
*/
public static final String DELETE_PROPORTION_PROPERTY_DEFAULT = "0.00";
/**
* The name of the property for the file that holds the field length size for insert operations.
*/
public static final String FIELD_LENGTH_DISTRIBUTION_FILE_PROPERTY = "fieldlengthdistfile";
/**
* The default file name that holds the field length size for insert operations.
*/
public static final String FIELD_LENGTH_DISTRIBUTION_FILE_PROPERTY_DEFAULT = "fieldLengthDistFile.txt";
/**
* In web services even though the CRUD operations follow the same request
* distribution, they have different traces and distribution parameter
* values. Hence configuring the parameters of these operations separately
* makes the benchmark more flexible and capable of generating better
* realistic workloads.
*/
// Read related properties.
private static final String READ_TRACE_FILE = "url.trace.read";
private static final String READ_TRACE_FILE_DEFAULT = "readtrace.txt";
private static final String READ_ZIPFIAN_CONSTANT = "readzipfconstant";
private static final String READ_ZIPFIAN_CONSTANT_DEAFULT = "0.99";
private static final String READ_RECORD_COUNT_PROPERTY = "readrecordcount";
// Insert related properties.
private static final String INSERT_TRACE_FILE = "url.trace.insert";
private static final String INSERT_TRACE_FILE_DEFAULT = "inserttrace.txt";
private static final String INSERT_ZIPFIAN_CONSTANT = "insertzipfconstant";
private static final String INSERT_ZIPFIAN_CONSTANT_DEAFULT = "0.99";
private static final String INSERT_SIZE_ZIPFIAN_CONSTANT = "insertsizezipfconstant";
private static final String INSERT_SIZE_ZIPFIAN_CONSTANT_DEAFULT = "0.99";
private static final String INSERT_RECORD_COUNT_PROPERTY = "insertrecordcount";
// Delete related properties.
private static final String DELETE_TRACE_FILE = "url.trace.delete";
private static final String DELETE_TRACE_FILE_DEFAULT = "deletetrace.txt";
private static final String DELETE_ZIPFIAN_CONSTANT = "deletezipfconstant";
private static final String DELETE_ZIPFIAN_CONSTANT_DEAFULT = "0.99";
private static final String DELETE_RECORD_COUNT_PROPERTY = "deleterecordcount";
// Update related properties.
private static final String UPDATE_TRACE_FILE = "url.trace.update";
private static final String UPDATE_TRACE_FILE_DEFAULT = "updatetrace.txt";
private static final String UPDATE_ZIPFIAN_CONSTANT = "updatezipfconstant";
private static final String UPDATE_ZIPFIAN_CONSTANT_DEAFULT = "0.99";
private static final String UPDATE_RECORD_COUNT_PROPERTY = "updaterecordcount";
private Map<Integer, String> readUrlMap;
private Map<Integer, String> insertUrlMap;
private Map<Integer, String> deleteUrlMap;
private Map<Integer, String> updateUrlMap;
private int readRecordCount;
private int insertRecordCount;
private int deleteRecordCount;
private int updateRecordCount;
private NumberGenerator readKeyChooser;
private NumberGenerator insertKeyChooser;
private NumberGenerator deleteKeyChooser;
private NumberGenerator updateKeyChooser;
private NumberGenerator fieldlengthgenerator;
private DiscreteGenerator operationchooser;
@Override
public void init(Properties p) throws WorkloadException {
readRecordCount = Integer.parseInt(p.getProperty(READ_RECORD_COUNT_PROPERTY, String.valueOf(Integer.MAX_VALUE)));
insertRecordCount = Integer
.parseInt(p.getProperty(INSERT_RECORD_COUNT_PROPERTY, String.valueOf(Integer.MAX_VALUE)));
deleteRecordCount = Integer
.parseInt(p.getProperty(DELETE_RECORD_COUNT_PROPERTY, String.valueOf(Integer.MAX_VALUE)));
updateRecordCount = Integer
.parseInt(p.getProperty(UPDATE_RECORD_COUNT_PROPERTY, String.valueOf(Integer.MAX_VALUE)));
readUrlMap = getTrace(p.getProperty(READ_TRACE_FILE, READ_TRACE_FILE_DEFAULT), readRecordCount);
insertUrlMap = getTrace(p.getProperty(INSERT_TRACE_FILE, INSERT_TRACE_FILE_DEFAULT), insertRecordCount);
deleteUrlMap = getTrace(p.getProperty(DELETE_TRACE_FILE, DELETE_TRACE_FILE_DEFAULT), deleteRecordCount);
updateUrlMap = getTrace(p.getProperty(UPDATE_TRACE_FILE, UPDATE_TRACE_FILE_DEFAULT), updateRecordCount);
operationchooser = createOperationGenerator(p);
// Common distribution for all operations.
String requestDistrib = p.getProperty(REQUEST_DISTRIBUTION_PROPERTY, REQUEST_DISTRIBUTION_PROPERTY_DEFAULT);
double readZipfconstant = Double.parseDouble(p.getProperty(READ_ZIPFIAN_CONSTANT, READ_ZIPFIAN_CONSTANT_DEAFULT));
readKeyChooser = getKeyChooser(requestDistrib, readUrlMap.size(), readZipfconstant, p);
double updateZipfconstant = Double
.parseDouble(p.getProperty(UPDATE_ZIPFIAN_CONSTANT, UPDATE_ZIPFIAN_CONSTANT_DEAFULT));
updateKeyChooser = getKeyChooser(requestDistrib, updateUrlMap.size(), updateZipfconstant, p);
double insertZipfconstant = Double
.parseDouble(p.getProperty(INSERT_ZIPFIAN_CONSTANT, INSERT_ZIPFIAN_CONSTANT_DEAFULT));
insertKeyChooser = getKeyChooser(requestDistrib, insertUrlMap.size(), insertZipfconstant, p);
double deleteZipfconstant = Double
.parseDouble(p.getProperty(DELETE_ZIPFIAN_CONSTANT, DELETE_ZIPFIAN_CONSTANT_DEAFULT));
deleteKeyChooser = getKeyChooser(requestDistrib, deleteUrlMap.size(), deleteZipfconstant, p);
fieldlengthgenerator = getFieldLengthGenerator(p);
}
public static DiscreteGenerator createOperationGenerator(final Properties p) {
// Re-using CoreWorkload method.
final DiscreteGenerator operationChooser = CoreWorkload.createOperationGenerator(p);
// Needs special handling for delete operations not supported in CoreWorkload.
double deleteproportion = Double
.parseDouble(p.getProperty(DELETE_PROPORTION_PROPERTY, DELETE_PROPORTION_PROPERTY_DEFAULT));
if (deleteproportion > 0) {
operationChooser.addValue(deleteproportion, "DELETE");
}
return operationChooser;
}
private static NumberGenerator getKeyChooser(String requestDistrib, int recordCount, double zipfConstant,
Properties p) throws WorkloadException {
NumberGenerator keychooser;
switch (requestDistrib) {
case "exponential":
double percentile = Double.parseDouble(p.getProperty(ExponentialGenerator.EXPONENTIAL_PERCENTILE_PROPERTY,
ExponentialGenerator.EXPONENTIAL_PERCENTILE_DEFAULT));
double frac = Double.parseDouble(p.getProperty(ExponentialGenerator.EXPONENTIAL_FRAC_PROPERTY,
ExponentialGenerator.EXPONENTIAL_FRAC_DEFAULT));
keychooser = new ExponentialGenerator(percentile, recordCount * frac);
break;
case "uniform":
keychooser = new UniformLongGenerator(0, recordCount - 1);
break;
case "zipfian":
keychooser = new ZipfianGenerator(recordCount, zipfConstant);
break;
case "latest":
throw new WorkloadException("Latest request distribution is not supported for RestWorkload.");
case "hotspot":
double hotsetfraction = Double.parseDouble(p.getProperty(HOTSPOT_DATA_FRACTION, HOTSPOT_DATA_FRACTION_DEFAULT));
double hotopnfraction = Double.parseDouble(p.getProperty(HOTSPOT_OPN_FRACTION, HOTSPOT_OPN_FRACTION_DEFAULT));
keychooser = new HotspotIntegerGenerator(0, recordCount - 1, hotsetfraction, hotopnfraction);
break;
default:
throw new WorkloadException("Unknown request distribution \"" + requestDistrib + "\"");
}
return keychooser;
}
protected static NumberGenerator getFieldLengthGenerator(Properties p) throws WorkloadException {
// Re-using CoreWorkload method.
NumberGenerator fieldLengthGenerator = CoreWorkload.getFieldLengthGenerator(p);
String fieldlengthdistribution = p.getProperty(FIELD_LENGTH_DISTRIBUTION_PROPERTY,
FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT);
// Needs special handling for Zipfian distribution for variable Zipf Constant.
if (fieldlengthdistribution.compareTo("zipfian") == 0) {
int fieldlength = Integer.parseInt(p.getProperty(FIELD_LENGTH_PROPERTY, FIELD_LENGTH_PROPERTY_DEFAULT));
double insertsizezipfconstant = Double
.parseDouble(p.getProperty(INSERT_SIZE_ZIPFIAN_CONSTANT, INSERT_SIZE_ZIPFIAN_CONSTANT_DEAFULT));
fieldLengthGenerator = new ZipfianGenerator(1, fieldlength, insertsizezipfconstant);
}
return fieldLengthGenerator;
}
/**
* Reads the trace file and returns a URL map.
*/
private static Map<Integer, String> getTrace(String filePath, int recordCount)
throws WorkloadException {
Map<Integer, String> urlMap = new HashMap<Integer, String>();
int count = 0;
String line;
try {
FileReader inputFile = new FileReader(filePath);
BufferedReader bufferReader = new BufferedReader(inputFile);
while ((line = bufferReader.readLine()) != null) {
urlMap.put(count++, line.trim());
if (count >= recordCount) {
break;
}
}
bufferReader.close();
} catch (IOException e) {
throw new WorkloadException(
"Error while reading the trace. Please make sure the trace file path is correct. "
+ e.getLocalizedMessage());
}
return urlMap;
}
/**
* Not required for Rest Clients as data population is service specific.
*/
@Override
- public boolean doInsert(DB db, Object threadstate) {
+ public boolean doInsert(AsyncDB db, Object threadstate) {
return false;
}
@Override
- public boolean doTransaction(DB db, Object threadstate) {
+ public boolean doTransaction(AsyncDB db, Object threadstate) {
String operation = operationchooser.nextString();
if (operation == null) {
return false;
}
switch (operation) {
case "UPDATE":
doTransactionUpdate(db);
break;
case "INSERT":
doTransactionInsert(db);
break;
case "DELETE":
doTransactionDelete(db);
break;
default:
doTransactionRead(db);
}
return true;
}
/**
* Returns next URL to be called.
*/
private String getNextURL(int opType) {
if (opType == 1) {
return readUrlMap.get(readKeyChooser.nextValue().intValue());
} else if (opType == 2) {
return insertUrlMap.get(insertKeyChooser.nextValue().intValue());
} else if (opType == 3) {
return deleteUrlMap.get(deleteKeyChooser.nextValue().intValue());
} else {
return updateUrlMap.get(updateKeyChooser.nextValue().intValue());
}
}
@Override
- public void doTransactionRead(DB db) {
+ public void doTransactionRead(AsyncDB db) {
HashMap<String, ByteIterator> result = new HashMap<String, ByteIterator>();
db.read(null, getNextURL(1), null, result);
}
@Override
- public void doTransactionInsert(DB db) {
+ public void doTransactionInsert(AsyncDB db) {
HashMap<String, ByteIterator> value = new HashMap<String, ByteIterator>();
// Create random bytes of insert data with a specific size.
value.put("data", new RandomByteIterator(fieldlengthgenerator.nextValue().longValue()));
db.insert(null, getNextURL(2), value);
}
- public void doTransactionDelete(DB db) {
+ public void doTransactionDelete(AsyncDB db) {
db.delete(null, getNextURL(3));
}
@Override
- public void doTransactionUpdate(DB db) {
+ public void doTransactionUpdate(AsyncDB db) {
HashMap<String, ByteIterator> value = new HashMap<String, ByteIterator>();
// Create random bytes of update data with a specific size.
value.put("data", new RandomByteIterator(fieldlengthgenerator.nextValue().longValue()));
db.update(null, getNextURL(4), value);
}
}
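
The trace files consumed by getTrace() are plain text with one target per line, keyed by line number; a hedged sample readtrace.txt (URLs are illustrative):

http://localhost:8080/rest/resource/1
http://localhost:8080/rest/resource/2
http://localhost:8080/rest/resource/3

It would be selected with, e.g., -p url.trace.read=readtrace.txt -p readrecordcount=3.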
diff --git a/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java b/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java
index be97b206..b81e966c 100644
--- a/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java
+++ b/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java
@@ -1,1286 +1,1286 @@
/**
* Copyright (c) 2017 YCSB contributors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.workloads;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.Vector;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import com.yahoo.ycsb.AsyncDB;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.Client;
-import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.NumericByteIterator;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.StringByteIterator;
import com.yahoo.ycsb.Utils;
import com.yahoo.ycsb.Workload;
import com.yahoo.ycsb.WorkloadException;
import com.yahoo.ycsb.generator.DiscreteGenerator;
import com.yahoo.ycsb.generator.Generator;
import com.yahoo.ycsb.generator.HotspotIntegerGenerator;
import com.yahoo.ycsb.generator.IncrementingPrintableStringGenerator;
import com.yahoo.ycsb.generator.NumberGenerator;
import com.yahoo.ycsb.generator.RandomDiscreteTimestampGenerator;
import com.yahoo.ycsb.generator.ScrambledZipfianGenerator;
import com.yahoo.ycsb.generator.SequentialGenerator;
import com.yahoo.ycsb.generator.UniformLongGenerator;
import com.yahoo.ycsb.generator.UnixEpochTimestampGenerator;
import com.yahoo.ycsb.generator.ZipfianGenerator;
import com.yahoo.ycsb.measurements.Measurements;
/**
* A specialized workload dealing with time series data, i.e. series of discrete
* events associated with timestamps and identifiers. For this workload, identities
* consist of a {@link String} key and a set of {@link String} tag key/value
* pairs.
*
* For example:
*
* Time Series Key | Tag Keys/Values | 1483228800 | 1483228860 | 1483228920 |
* AA | AA=AA, AB=AA | 42.5 | 1.0 | 85.9 |
* AA | AA=AA, AB=AB | -9.4 | 76.9 | 0.18 |
* AB | AA=AA, AB=AA | -93.0 | 57.1 | -63.8 |
* AB | AA=AA, AB=AB | 7.6 | 56.1 | -0.3 |
*
*
* This table shows four time series, each with three measurements at three different timestamps.
* Keys, tags, timestamps and values (numeric only at this time) are generated by
* this workload. For details on properties and behavior, see the
* {@code workloads/tsworkload_template} file. The Javadocs will focus on implementation
* and how {@link DB} clients can parse the workload.
*
* In order to avoid having existing DB implementations implement a brand new interface
* this workload uses the existing APIs to encode a few special values that can be parsed
* by the client. The special values include the timestamp, numeric value and some
* query (read or scan) parameters. As an example on how to parse the fields, see
* {@link BasicTSDB}.
*
* Timestamps
*
* Timestamps are presented as Unix Epoch values in units of {@link TimeUnit#SECONDS},
* {@link TimeUnit#MILLISECONDS} or {@link TimeUnit#NANOSECONDS} based on the
* {@code timestampunits} property. For calls to {@link DB#insert(String, String, java.util.Map)}
* and {@link DB#update(String, String, java.util.Map)}, the timestamp is added to the
* {@code values} map encoded in a {@link NumericByteIterator} with the key defined
* in the {@code timestampkey} property (defaulting to "YCSBTS"). To pull out the timestamp
* when iterating over the values map, cast the {@link ByteIterator} to a
* {@link NumericByteIterator} and call {@link NumericByteIterator#getLong()}.
*
* Note that for calls to {@link DB#update(String, String, java.util.Map)}, timestamps
* earlier than the timestamp generator's timestamp will be chosen at random to
* mimic a lambda architecture or old job re-reporting some data.
*
* For calls to {@link DB#read(String, String, java.util.Set, java.util.Map)} and
* {@link DB#scan(String, String, int, java.util.Set, Vector)}, timestamps
* are encoded in a {@link StringByteIterator} in a key/value format with the
* {@code tagpairdelimiter} separator. E.g. {@code YCSBTS=1483228800}. If {@code querytimespan}
* has been set to a positive value then the value will include a range with the
* starting (oldest) timestamp followed by the {@code querytimespandelimiter} separator
* and the ending (most recent) timestamp. E.g. {@code YCSBTS=1483228800-1483228920}.
*
* For calls to {@link DB#delete(String, String)}, encoding is the same as reads and
* scans but key/value pairs are separated by the {@code deletedelimiter} property value.
*
* By default, the starting timestamp is the current system time without any rounding.
* All timestamps are then offsets from that starting value.
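*
* A minimal parsing sketch (a suggestion, assuming the default timestampkey "YCSBTS"):
*
*   long ts = ((NumericByteIterator) values.get("YCSBTS")).getLong();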
*
* Values
*
* Similar to timestamps, values are encoded in {@link NumericByteIterator}s and stored
* in the values map with the key defined in {@code valuekey} (defaulting to "YCSBV").
* Values can either be 64 bit signed {@link long}s or double precision {@link double}s
* depending on the {@code valuetype} or {@code dataintegrity} properties. When parsing
* out the value, always call {@link NumericByteIterator#isFloatingPoint()} to determine
* whether or not to call {@link NumericByteIterator#getDouble()} (true) or
* {@link NumericByteIterator#getLong()} (false).
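*
* A minimal parsing sketch (a suggestion, assuming the default valuekey "YCSBV"):
*
*   NumericByteIterator v = (NumericByteIterator) values.get("YCSBV");
*   if (v.isFloatingPoint()) {
*     double d = v.getDouble();
*   } else {
*     long l = v.getLong();
*   }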
*
* When {@code dataintegrity} is set to true, then the value is always set to a
* 64 bit signed integer which is the Java hash code of the concatenation of the
* key and map of values (sorted on the map keys and skipping the timestamp and value
* entries) OR'd with the timestamp of the data point. See
* {@link #validationFunction(String, long, TreeMap)} for the implementation.
*
* Keys and Tags
*
* As mentioned, the workload generates strings for the keys and tags. On initialization
* three string generators are created using the {@link IncrementingPrintableStringGenerator}
* implementation. Then the generators fill three arrays with values based on the
* number of keys, the number of tags and the cardinality of each tag key/value pair.
* This implementation gives us time series like the example table where every string
* starts at something like "AA" (depending on the length of keys, tag keys and tag values)
* and continuing to "ZZ" wherein they rollover back to "AA".
*
* Each time series must have a unique set of tag keys, i.e. the key "AA" cannot appear
* more than once per time series. If the workload is configured for four tags with a
* tag key length of 2, the keys would be "AA", "AB", "AC" and "AD".
*
* Each tag key is then associated with a tag value. Tag values may appear more than once
* in each time series. E.g. time series will usually start with the tags "AA=AA",
* "AB=AA", "AC=AA" and "AD=AA". The {@code tagcardinality} property determines how many
* unique values will be generated per tag key. In the example table above, the
* {@code tagcardinality} property would have been set to {@code 1,2} meaning tag
* key "AA" would always have the tag value "AA" given a cardinality of 1. However
* tag key "AB" would have values "AA" and "AB" due to a cardinality of 2. This
* cardinality map, along with the number of unique time series keys determines how
* many unique time series are generated for the workload. Tag values share a common
* array of generated strings to save on memory.
*
* Operation Order
*
* The default behavior of the workload (for inserts and updates) is to generate a
* value for each time series for a given timestamp before incrementing to the next
* timestamp and writing values. This is an ideal workload and some time series
* databases are designed for this behavior. However in the real-world events will
* arrive grouped close to the current system time with a number of events being
* delayed, hence their timestamps are further in the past. The {@code delayedseries}
* property determines the percentage of time series that are delayed by up to
* {@code delayedintervals} intervals. E.g. setting this value to 0.05 means that
* 5% of the time series will be written with timestamps earlier than the timestamp
* generator's current time.
*
* Reads and Scans
*
* For benchmarking queries, some common tasks implemented by almost every time series
* data base are available and are passed in the fields {@link Set}:
*
* GroupBy - A common operation is to aggregate multiple time series into a
* single time series via common parameters. For example, a user may want to see the
* total network traffic in a data center so they'll issue a SQL query like:
* SELECT value FROM timeseriesdb GROUP BY datacenter ORDER BY SUM(value);
* If the {@code groupbyfunction} has been set to a group by function, then the fields
* will contain a key/value pair with the key set in {@code groupbykey}. E.g.
* {@code YCSBGB=SUM}.
*
* Additionally with grouping enabled, fields on tag keys where group bys should
* occur will only have the key defined and will not have a value or delimiter. E.g.
* if grouping on tag key "AA", the field will contain {@code AA} instead of {@code AA=AB}.
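 *
 * For example (illustrative field contents), a read grouping on tag key "AA" with
 * {@code groupbyfunction=SUM} might pass the fields {@code AA}, {@code AB=AB} and
 * {@code YCSBGB=SUM}, along with the timestamp field described below.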
*
* Downsampling - Another common operation is to reduce the resolution of the
* queried time series when fetching a wide time range of data so fewer data points
* are returned. For example, a user may fetch a week of data but if the data is
* recorded on a 1 second interval, that would be over 600k data points so they
* may ask for a 1 hour downsampling (also called bucketing) wherein every hour, all
* of the data points for a "bucket" are aggregated into a single value.
*
* To enable downsampling, the {@code downsamplingfunction} property must be set to
* a supported function such as "SUM" and the {@code downsamplinginterval} must be
* set to a valid time interval with the same units as {@code timestampunits}, e.g.
* "3600" which would create 1 hour buckets if the time units were set to seconds.
* With downsampling, query fields will include a key/value pair with
* {@code downsamplingkey} as the key (defaulting to "YCSBDS") and the value being
* a concatenation of {@code downsamplingfunction} and {@code downsamplinginterval},
* for example {@code YCSBDS=SUM60}.
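 *
 * For instance (an illustrative configuration): with {@code timestampunits=SECONDS},
 * setting {@code downsamplingfunction=SUM} and {@code downsamplinginterval=3600}
 * requests hourly buckets and adds the field {@code YCSBDS=SUM3600} to each query.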
*
* Timestamps - For every read, a random timestamp is selected from the interval
* set. If {@code querytimespan} has been set to a positive value, then the configured
* query time interval is added to the selected timestamp so the read passes the DB
 * a range of times. Note that during the run phase, if no data was previously loaded,
 * or if the run phase's {@code recordcount} exceeds the number of records loaded,
 * reads may be sent to the DB with timestamps beyond the written data's time range
 * (or even beyond the system clock of the DB).
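 *
 * For example (illustrative values): with {@code querytimespan=3600} and a randomly
 * selected start of 1451606400, the fields would include
 * {@code YCSBTS=1451606400,1451610000}, i.e. the start and end timestamps joined by
 * the {@code querytimespandelimiter}.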
*
* Deletes
*
* Because the delete API only accepts a single key, a full key and tag key/value
* pair map is flattened into a single string for parsing by the database. Common
* workloads include deleting a single time series (wherein all tag key and values are
* defined), deleting all series containing a tag key and value or deleting all of the
* time series sharing a common time series key.
*
 * Right now the workload supports deletes given a key and time series tag key/value
 * pairs, or a key with tags and a group-by on one or more tags (meaning: delete all of
 * the series with any value for the given tag key). The parameters are collapsed into
* a single string delimited with the character in the {@code deletedelimiter} property.
* For example, a delete request may look like: {@code AA:AA=AA:AA=AB} to delete the
* first time series in the table above.
*
* Threads
*
 * For a multi-threaded execution, the number of time series keys, set via the
 * {@code fieldcount} property, must be greater than or equal to the number of
 * threads set via {@code threads}. This is because each thread chooses a subset
 * of the total number of time series keys and is responsible for writing values
 * for each time series containing those keys at each timestamp. Thus each thread
 * has its own timestamp generator, which increments once every time series
 * it is responsible for has had a value written.
*
* Each thread may, however, issue reads and scans for any time series in the
* complete set.
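 *
 * For example (hypothetical settings): with {@code fieldcount=4} and {@code threads=2},
 * thread 0 would write the series for keys "AA" and "AB" while thread 1 handles "AC"
 * and "AD", each advancing its own timestamp generator independently.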
*
* Sparsity
*
 * By default, during loads, every time series will have a data point written at every
 * timestamp in the interval set. This is common in workloads where a sensor writes
 * a value at regular intervals. However some time series are only reported under
 * certain conditions.
*
* For example, a counter may track the number of errors over a
* time period for a web service and only report when the value is greater than 1.
* Or a time series may include tags such as a user ID and IP address when a request
* arrives at the web service and only report values when that combination is seen.
 * This means the time series will not have a value at every timestamp and in
 * some cases there may be only a single value!
*
 * This workload has a {@code sparsity} parameter that controls how often a
 * time series should record a value. The default value of 0.0 means every series
 * will get a value at every timestamp. A value of 0.95 means that, for each
 * series, only 5% of the timestamps in the interval will have a value. The distribution
 * of values is random.
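 *
 * As a quick sanity check (illustrative numbers): an interval set spanning one hour
 * at a 60 second interval has 60 timestamps; with {@code sparsity=0.95} each series
 * would record a value at roughly 3 of them, chosen at random.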
*
* Notes/Warnings
*
* - Because time series keys and tag key/values are generated and stored in memory,
* be careful of setting the cardinality too high for the JVM's heap.
* - When running for data integrity, a number of settings are incompatible and will
* throw errors. Check the error messages for details.
* - Databases that support keys only and can't store tags should order and then
* collapse the tag values using a delimiter. For example the series in the example
* table at the top could be written as:
*
* - {@code AA.AA.AA}
* - {@code AA.AA.AB}
* - {@code AB.AA.AA}
* - {@code AB.AA.AB}
*
* TODOs
*
* - Support random time intervals. E.g. some series write every second, others every
* 60 seconds.
* - Support random time series cardinality. Right now every series has the same
* cardinality.
 * - Truly random timestamps per time series. We could use bitmaps to determine if
* a series has had a value written for a given timestamp. Right now all of the series
* are in sync time-wise.
* - Possibly a real-time load where values are written with the current system time.
* It's more of a bulk-loading operation now.
*
*/
public class TimeSeriesWorkload extends Workload {
/**
* The types of values written to the timeseries store.
*/
public enum ValueType {
INTEGERS("integers"),
FLOATS("floats"),
MIXED("mixednumbers");
protected final String name;
ValueType(final String name) {
this.name = name;
}
public static ValueType fromString(final String name) {
for (final ValueType type : ValueType.values()) {
if (type.name.equalsIgnoreCase(name)) {
return type;
}
}
throw new IllegalArgumentException("Unrecognized type: " + name);
}
}
/** Name and default value for the timestamp key property. */
public static final String TIMESTAMP_KEY_PROPERTY = "timestampkey";
public static final String TIMESTAMP_KEY_PROPERTY_DEFAULT = "YCSBTS";
/** Name and default value for the value key property. */
public static final String VALUE_KEY_PROPERTY = "valuekey";
public static final String VALUE_KEY_PROPERTY_DEFAULT = "YCSBV";
/** Name and default value for the timestamp interval property. */
public static final String TIMESTAMP_INTERVAL_PROPERTY = "timestampinterval";
public static final String TIMESTAMP_INTERVAL_PROPERTY_DEFAULT = "60";
/** Name and default value for the timestamp units property. */
public static final String TIMESTAMP_UNITS_PROPERTY = "timestampunits";
public static final String TIMESTAMP_UNITS_PROPERTY_DEFAULT = "SECONDS";
/** Name and default value for the number of tags property. */
public static final String TAG_COUNT_PROPERTY = "tagcount";
public static final String TAG_COUNT_PROPERTY_DEFAULT = "4";
/** Name and default value for the tag value cardinality map property. */
public static final String TAG_CARDINALITY_PROPERTY = "tagcardinality";
public static final String TAG_CARDINALITY_PROPERTY_DEFAULT = "1, 2, 4, 8";
/** Name and default value for the tag key length property. */
public static final String TAG_KEY_LENGTH_PROPERTY = "tagkeylength";
public static final String TAG_KEY_LENGTH_PROPERTY_DEFAULT = "8";
/** Name and default value for the tag value length property. */
public static final String TAG_VALUE_LENGTH_PROPERTY = "tagvaluelength";
public static final String TAG_VALUE_LENGTH_PROPERTY_DEFAULT = "8";
/** Name and default value for the tag pair delimiter property. */
public static final String PAIR_DELIMITER_PROPERTY = "tagpairdelimiter";
public static final String PAIR_DELIMITER_PROPERTY_DEFAULT = "=";
/** Name and default value for the delete string delimiter property. */
public static final String DELETE_DELIMITER_PROPERTY = "deletedelimiter";
public static final String DELETE_DELIMITER_PROPERTY_DEFAULT = ":";
/** Name and default value for the random timestamp write order property. */
public static final String RANDOMIZE_TIMESTAMP_ORDER_PROPERTY = "randomwritetimestamporder";
public static final String RANDOMIZE_TIMESTAMP_ORDER_PROPERTY_DEFAULT = "false";
/** Name and default value for the random time series write order property. */
public static final String RANDOMIZE_TIMESERIES_ORDER_PROPERTY = "randomtimeseriesorder";
public static final String RANDOMIZE_TIMESERIES_ORDER_PROPERTY_DEFAULT = "true";
/** Name and default value for the value types property. */
public static final String VALUE_TYPE_PROPERTY = "valuetype";
public static final String VALUE_TYPE_PROPERTY_DEFAULT = "floats";
/** Name and default value for the sparsity property. */
public static final String SPARSITY_PROPERTY = "sparsity";
public static final String SPARSITY_PROPERTY_DEFAULT = "0.00";
/** Name and default value for the delayed series percentage property. */
public static final String DELAYED_SERIES_PROPERTY = "delayedseries";
public static final String DELAYED_SERIES_PROPERTY_DEFAULT = "0.10";
/** Name and default value for the delayed series intervals property. */
public static final String DELAYED_INTERVALS_PROPERTY = "delayedintervals";
public static final String DELAYED_INTERVALS_PROPERTY_DEFAULT = "5";
/** Name and default value for the query time span property. */
public static final String QUERY_TIMESPAN_PROPERTY = "querytimespan";
public static final String QUERY_TIMESPAN_PROPERTY_DEFAULT = "0";
/** Name and default value for the randomized query time span property. */
public static final String QUERY_RANDOM_TIMESPAN_PROPERTY = "queryrandomtimespan";
public static final String QUERY_RANDOM_TIMESPAN_PROPERTY_DEFAULT = "false";
/** Name and default value for the query time stamp delimiter property. */
public static final String QUERY_TIMESPAN_DELIMITER_PROPERTY = "querytimespandelimiter";
public static final String QUERY_TIMESPAN_DELIMITER_PROPERTY_DEFAULT = ",";
/** Name and default value for the group-by key property. */
public static final String GROUPBY_KEY_PROPERTY = "groupbykey";
public static final String GROUPBY_KEY_PROPERTY_DEFAULT = "YCSBGB";
/** Name and default value for the group-by function property. */
public static final String GROUPBY_PROPERTY = "groupbyfunction";
/** Name and default value for the group-by key map property. */
public static final String GROUPBY_KEYS_PROPERTY = "groupbykeys";
/** Name and default value for the downsampling key property. */
public static final String DOWNSAMPLING_KEY_PROPERTY = "downsamplingkey";
public static final String DOWNSAMPLING_KEY_PROPERTY_DEFAULT = "YCSBDS";
/** Name and default value for the downsampling function property. */
public static final String DOWNSAMPLING_FUNCTION_PROPERTY = "downsamplingfunction";
/** Name and default value for the downsampling interval property. */
public static final String DOWNSAMPLING_INTERVAL_PROPERTY = "downsamplinginterval";
/** The properties to pull settings from. */
protected Properties properties;
/** Generators for keys, tag keys and tag values. */
  protected Generator<String> keyGenerator;
  protected Generator<String> tagKeyGenerator;
  protected Generator<String> tagValueGenerator;
/** The timestamp key, defaults to "YCSBTS". */
protected String timestampKey;
  /** The value key, defaults to "YCSBV". */
protected String valueKey;
/** The number of time units in between timestamps. */
protected int timestampInterval;
/** The units of time the timestamp and various intervals represent. */
protected TimeUnit timeUnits;
/** Whether or not to randomize the timestamp order when writing. */
protected boolean randomizeTimestampOrder;
/** Whether or not to randomize (shuffle) the time series order. NOT compatible
* with data integrity. */
protected boolean randomizeTimeseriesOrder;
/** The type of values to generate when writing data. */
protected ValueType valueType;
/** Used to calculate an offset for each time series. */
protected int[] cumulativeCardinality;
/** The calculated total cardinality based on the config. */
protected int totalCardinality;
/** The calculated per-time-series-key cardinality. I.e. the number of unique
* tag key and value combinations. */
protected int perKeyCardinality;
/** How much data to scan for in each call. */
protected NumberGenerator scanlength;
/** A generator used to select a random time series key per read/scan. */
protected NumberGenerator keychooser;
/** A generator to select what operation to perform during the run phase. */
protected DiscreteGenerator operationchooser;
/** The maximum number of interval offsets from the starting timestamp. Calculated
* based on the number of records configured for the run. */
protected int maxOffsets;
/** The number of records or operations to perform for this run. */
protected int recordcount;
/** The number of tag pairs per time series. */
protected int tagPairs;
/** The table we'll write to. */
protected String table;
/** How many time series keys will be generated. */
protected int numKeys;
/** The generated list of possible time series key values. */
protected String[] keys;
/** The generated list of possible tag key values. */
protected String[] tagKeys;
/** The generated list of possible tag value values. */
protected String[] tagValues;
/** The cardinality for each tag key. */
protected int[] tagCardinality;
/** A helper to skip non-incrementing tag values. */
protected int firstIncrementableCardinality;
/** How sparse the data written should be. */
protected double sparsity;
/** The percentage of time series that should be delayed in writes. */
protected double delayedSeries;
/** The maximum number of intervals to delay a series. */
protected int delayedIntervals;
/** Optional query time interval during reads/scans. */
protected int queryTimeSpan;
/** Whether or not the actual interval should be randomly chosen, using
* queryTimeSpan as the maximum value. */
protected boolean queryRandomTimeSpan;
/** The delimiter for tag pairs in fields. */
protected String tagPairDelimiter;
/** The delimiter between parameters for the delete key. */
protected String deleteDelimiter;
/** The delimiter between timestamps for query time spans. */
protected String queryTimeSpanDelimiter;
/** Whether or not to issue group-by queries. */
protected boolean groupBy;
/** The key used for group-by tag keys. */
protected String groupByKey;
/** The function used for group-by's. */
protected String groupByFunction;
/** The tag keys to group on. */
protected boolean[] groupBys;
/** Whether or not to issue downsampling queries. */
protected boolean downsample;
/** The key used for downsampling tag keys. */
protected String downsampleKey;
/** The downsampling function. */
protected String downsampleFunction;
/** The downsampling interval. */
protected int downsampleInterval;
/**
   * Set to true if you want to check the correctness of reads. Must also
   * be set to true during the loading phase to function.
*/
protected boolean dataintegrity;
/** Measurements to write data integrity results to. */
protected Measurements measurements = Measurements.getMeasurements();
@Override
public void init(final Properties p) throws WorkloadException {
properties = p;
recordcount =
Integer.parseInt(p.getProperty(Client.RECORD_COUNT_PROPERTY,
Client.DEFAULT_RECORD_COUNT));
if (recordcount == 0) {
recordcount = Integer.MAX_VALUE;
}
timestampKey = p.getProperty(TIMESTAMP_KEY_PROPERTY, TIMESTAMP_KEY_PROPERTY_DEFAULT);
valueKey = p.getProperty(VALUE_KEY_PROPERTY, VALUE_KEY_PROPERTY_DEFAULT);
operationchooser = CoreWorkload.createOperationGenerator(properties);
final int maxscanlength =
Integer.parseInt(p.getProperty(CoreWorkload.MAX_SCAN_LENGTH_PROPERTY,
CoreWorkload.MAX_SCAN_LENGTH_PROPERTY_DEFAULT));
String scanlengthdistrib =
p.getProperty(CoreWorkload.SCAN_LENGTH_DISTRIBUTION_PROPERTY,
CoreWorkload.SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT);
if (scanlengthdistrib.compareTo("uniform") == 0) {
scanlength = new UniformLongGenerator(1, maxscanlength);
} else if (scanlengthdistrib.compareTo("zipfian") == 0) {
scanlength = new ZipfianGenerator(1, maxscanlength);
} else {
throw new WorkloadException(
"Distribution \"" + scanlengthdistrib + "\" not allowed for scan length");
}
randomizeTimestampOrder = Boolean.parseBoolean(p.getProperty(
RANDOMIZE_TIMESTAMP_ORDER_PROPERTY,
RANDOMIZE_TIMESTAMP_ORDER_PROPERTY_DEFAULT));
randomizeTimeseriesOrder = Boolean.parseBoolean(p.getProperty(
RANDOMIZE_TIMESERIES_ORDER_PROPERTY,
RANDOMIZE_TIMESERIES_ORDER_PROPERTY_DEFAULT));
// setup the cardinality
numKeys = Integer.parseInt(p.getProperty(CoreWorkload.FIELD_COUNT_PROPERTY,
CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT));
tagPairs = Integer.parseInt(p.getProperty(TAG_COUNT_PROPERTY,
TAG_COUNT_PROPERTY_DEFAULT));
sparsity = Double.parseDouble(p.getProperty(SPARSITY_PROPERTY, SPARSITY_PROPERTY_DEFAULT));
tagCardinality = new int[tagPairs];
final String requestdistrib =
p.getProperty(CoreWorkload.REQUEST_DISTRIBUTION_PROPERTY,
CoreWorkload.REQUEST_DISTRIBUTION_PROPERTY_DEFAULT);
if (requestdistrib.compareTo("uniform") == 0) {
keychooser = new UniformLongGenerator(0, numKeys - 1);
} else if (requestdistrib.compareTo("sequential") == 0) {
keychooser = new SequentialGenerator(0, numKeys - 1);
} else if (requestdistrib.compareTo("zipfian") == 0) {
keychooser = new ScrambledZipfianGenerator(0, numKeys - 1);
//} else if (requestdistrib.compareTo("latest") == 0) {
// keychooser = new SkewedLatestGenerator(transactioninsertkeysequence);
} else if (requestdistrib.equals("hotspot")) {
double hotsetfraction =
Double.parseDouble(p.getProperty(CoreWorkload.HOTSPOT_DATA_FRACTION,
CoreWorkload.HOTSPOT_DATA_FRACTION_DEFAULT));
double hotopnfraction =
Double.parseDouble(p.getProperty(CoreWorkload.HOTSPOT_OPN_FRACTION,
CoreWorkload.HOTSPOT_OPN_FRACTION_DEFAULT));
keychooser = new HotspotIntegerGenerator(0, numKeys - 1,
hotsetfraction, hotopnfraction);
} else {
throw new WorkloadException("Unknown request distribution \"" + requestdistrib + "\"");
}
// figure out the start timestamp based on the units, cardinality and interval
try {
timestampInterval = Integer.parseInt(p.getProperty(
TIMESTAMP_INTERVAL_PROPERTY, TIMESTAMP_INTERVAL_PROPERTY_DEFAULT));
} catch (NumberFormatException nfe) {
throw new WorkloadException("Unable to parse the " +
TIMESTAMP_INTERVAL_PROPERTY, nfe);
}
try {
timeUnits = TimeUnit.valueOf(p.getProperty(TIMESTAMP_UNITS_PROPERTY,
TIMESTAMP_UNITS_PROPERTY_DEFAULT).toUpperCase());
} catch (IllegalArgumentException e) {
throw new WorkloadException("Unknown time unit type", e);
}
if (timeUnits == TimeUnit.NANOSECONDS || timeUnits == TimeUnit.MICROSECONDS) {
throw new WorkloadException("YCSB doesn't support " + timeUnits +
" at this time.");
}
tagPairDelimiter = p.getProperty(PAIR_DELIMITER_PROPERTY, PAIR_DELIMITER_PROPERTY_DEFAULT);
deleteDelimiter = p.getProperty(DELETE_DELIMITER_PROPERTY, DELETE_DELIMITER_PROPERTY_DEFAULT);
dataintegrity = Boolean.parseBoolean(
p.getProperty(CoreWorkload.DATA_INTEGRITY_PROPERTY,
CoreWorkload.DATA_INTEGRITY_PROPERTY_DEFAULT));
queryTimeSpan = Integer.parseInt(p.getProperty(QUERY_TIMESPAN_PROPERTY,
QUERY_TIMESPAN_PROPERTY_DEFAULT));
queryRandomTimeSpan = Boolean.parseBoolean(p.getProperty(QUERY_RANDOM_TIMESPAN_PROPERTY,
QUERY_RANDOM_TIMESPAN_PROPERTY_DEFAULT));
queryTimeSpanDelimiter = p.getProperty(QUERY_TIMESPAN_DELIMITER_PROPERTY,
QUERY_TIMESPAN_DELIMITER_PROPERTY_DEFAULT);
groupByKey = p.getProperty(GROUPBY_KEY_PROPERTY, GROUPBY_KEY_PROPERTY_DEFAULT);
groupByFunction = p.getProperty(GROUPBY_PROPERTY);
if (groupByFunction != null && !groupByFunction.isEmpty()) {
final String groupByKeys = p.getProperty(GROUPBY_KEYS_PROPERTY);
if (groupByKeys == null || groupByKeys.isEmpty()) {
throw new WorkloadException("Group by was enabled but no keys were specified.");
}
final String[] gbKeys = groupByKeys.split(",");
if (gbKeys.length != tagKeys.length) {
throw new WorkloadException("Only " + gbKeys.length + " group by keys "
+ "were specified but there were " + tagKeys.length + " tag keys given.");
}
groupBys = new boolean[gbKeys.length];
for (int i = 0; i < gbKeys.length; i++) {
        groupBys[i] = Integer.parseInt(gbKeys[i].trim()) != 0;
}
groupBy = true;
}
downsampleKey = p.getProperty(DOWNSAMPLING_KEY_PROPERTY, DOWNSAMPLING_KEY_PROPERTY_DEFAULT);
downsampleFunction = p.getProperty(DOWNSAMPLING_FUNCTION_PROPERTY);
if (downsampleFunction != null && !downsampleFunction.isEmpty()) {
final String interval = p.getProperty(DOWNSAMPLING_INTERVAL_PROPERTY);
if (interval == null || interval.isEmpty()) {
throw new WorkloadException("'" + DOWNSAMPLING_INTERVAL_PROPERTY + "' was missing despite '"
+ DOWNSAMPLING_FUNCTION_PROPERTY + "' being set.");
}
downsampleInterval = Integer.parseInt(interval);
downsample = true;
}
delayedSeries = Double.parseDouble(p.getProperty(DELAYED_SERIES_PROPERTY, DELAYED_SERIES_PROPERTY_DEFAULT));
delayedIntervals = Integer.parseInt(p.getProperty(DELAYED_INTERVALS_PROPERTY, DELAYED_INTERVALS_PROPERTY_DEFAULT));
valueType = ValueType.fromString(p.getProperty(VALUE_TYPE_PROPERTY, VALUE_TYPE_PROPERTY_DEFAULT));
table = p.getProperty(CoreWorkload.TABLENAME_PROPERTY, CoreWorkload.TABLENAME_PROPERTY_DEFAULT);
initKeysAndTags();
validateSettings();
}
@Override
public Object initThread(Properties p, int mythreadid, int threadcount) throws WorkloadException {
if (properties == null) {
throw new WorkloadException("Workload has not been initialized.");
}
return new ThreadState(mythreadid, threadcount);
}
@Override
- public boolean doInsert(DB db, Object threadstate) {
+ public boolean doInsert(AsyncDB db, Object threadstate) {
if (threadstate == null) {
throw new IllegalStateException("Missing thread state.");
}
    final Map<String, ByteIterator> tags = new TreeMap<String, ByteIterator>();
final String key = ((ThreadState)threadstate).nextDataPoint(tags, true);
- if (db.insert(table, key, tags) == Status.OK) {
+ if (db.insert(table, key, tags).join() == Status.OK) {
return true;
}
return false;
}
@Override
- public boolean doTransaction(DB db, Object threadstate) {
+ public boolean doTransaction(AsyncDB db, Object threadstate) {
if (threadstate == null) {
throw new IllegalStateException("Missing thread state.");
}
switch (operationchooser.nextString()) {
case "READ":
doTransactionRead(db, threadstate);
break;
case "UPDATE":
doTransactionUpdate(db, threadstate);
break;
case "INSERT":
doTransactionInsert(db, threadstate);
break;
case "SCAN":
doTransactionScan(db, threadstate);
break;
case "DELETE":
doTransactionDelete(db, threadstate);
break;
default:
return false;
}
return true;
}
- protected void doTransactionRead(final DB db, Object threadstate) {
+ protected void doTransactionRead(final AsyncDB db, Object threadstate) {
final ThreadState state = (ThreadState) threadstate;
final String keyname = keys[keychooser.nextValue().intValue()];
final Random random = ThreadLocalRandom.current();
int offsets = state.queryOffsetGenerator.nextValue().intValue();
//int offsets = random.nextInt(maxOffsets - 1);
final long startTimestamp;
if (offsets > 0) {
startTimestamp = state.startTimestamp + state.timestampGenerator.getOffset(offsets);
} else {
startTimestamp = state.startTimestamp;
}
    // random tags
    Set<String> fields = new HashSet<String>();
for (int i = 0; i < tagPairs; ++i) {
if (groupBy && groupBys[i]) {
fields.add(tagKeys[i]);
} else {
fields.add(tagKeys[i] + tagPairDelimiter +
tagValues[random.nextInt(tagCardinality[i])]);
}
}
if (queryTimeSpan > 0) {
final long endTimestamp;
if (queryRandomTimeSpan) {
endTimestamp = startTimestamp + (timestampInterval * random.nextInt(queryTimeSpan / timestampInterval));
} else {
endTimestamp = startTimestamp + queryTimeSpan;
}
fields.add(timestampKey + tagPairDelimiter + startTimestamp + queryTimeSpanDelimiter + endTimestamp);
} else {
fields.add(timestampKey + tagPairDelimiter + startTimestamp);
}
if (groupBy) {
fields.add(groupByKey + tagPairDelimiter + groupByFunction);
}
if (downsample) {
fields.add(downsampleKey + tagPairDelimiter + downsampleFunction + downsampleInterval);
}
    final Map<String, ByteIterator> cells = new HashMap<String, ByteIterator>();
- final Status status = db.read(table, keyname, fields, cells);
+ final Status status = db.read(table, keyname, fields, cells).join();
if (dataintegrity && status == Status.OK) {
verifyRow(keyname, cells);
}
}
- protected void doTransactionUpdate(final DB db, Object threadstate) {
+ protected void doTransactionUpdate(final AsyncDB db, Object threadstate) {
if (threadstate == null) {
throw new IllegalStateException("Missing thread state.");
}
    final Map<String, ByteIterator> tags = new TreeMap<String, ByteIterator>();
final String key = ((ThreadState)threadstate).nextDataPoint(tags, false);
db.update(table, key, tags);
}
- protected void doTransactionInsert(final DB db, Object threadstate) {
+ protected void doTransactionInsert(final AsyncDB db, Object threadstate) {
doInsert(db, threadstate);
}
- protected void doTransactionScan(final DB db, Object threadstate) {
+ protected void doTransactionScan(final AsyncDB db, Object threadstate) {
final ThreadState state = (ThreadState) threadstate;
final Random random = ThreadLocalRandom.current();
final String keyname = keys[random.nextInt(keys.length)];
// choose a random scan length
int len = scanlength.nextValue().intValue();
int offsets = random.nextInt(maxOffsets - 1);
final long startTimestamp;
if (offsets > 0) {
startTimestamp = state.startTimestamp + state.timestampGenerator.getOffset(offsets);
} else {
startTimestamp = state.startTimestamp;
}
    // random tags
    Set<String> fields = new HashSet<String>();
for (int i = 0; i < tagPairs; ++i) {
if (groupBy && groupBys[i]) {
fields.add(tagKeys[i]);
} else {
fields.add(tagKeys[i] + tagPairDelimiter +
tagValues[random.nextInt(tagCardinality[i])]);
}
}
if (queryTimeSpan > 0) {
final long endTimestamp;
if (queryRandomTimeSpan) {
endTimestamp = startTimestamp + (timestampInterval * random.nextInt(queryTimeSpan / timestampInterval));
} else {
endTimestamp = startTimestamp + queryTimeSpan;
}
fields.add(timestampKey + tagPairDelimiter + startTimestamp + queryTimeSpanDelimiter + endTimestamp);
} else {
fields.add(timestampKey + tagPairDelimiter + startTimestamp);
}
if (groupBy) {
fields.add(groupByKey + tagPairDelimiter + groupByFunction);
}
if (downsample) {
      fields.add(downsampleKey + tagPairDelimiter + downsampleFunction + downsampleInterval);
}
    final Vector<HashMap<String, ByteIterator>> results = new Vector<HashMap<String, ByteIterator>>();
db.scan(table, keyname, len, fields, results);
}
- protected void doTransactionDelete(final DB db, Object threadstate) {
+ protected void doTransactionDelete(final AsyncDB db, Object threadstate) {
final ThreadState state = (ThreadState) threadstate;
final Random random = ThreadLocalRandom.current();
final StringBuilder buf = new StringBuilder().append(keys[random.nextInt(keys.length)]);
int offsets = random.nextInt(maxOffsets - 1);
final long startTimestamp;
if (offsets > 0) {
startTimestamp = state.startTimestamp + state.timestampGenerator.getOffset(offsets);
} else {
startTimestamp = state.startTimestamp;
}
    // random tags
for (int i = 0; i < tagPairs; ++i) {
if (groupBy && groupBys[i]) {
buf.append(deleteDelimiter)
.append(tagKeys[i]);
} else {
buf.append(deleteDelimiter).append(tagKeys[i] + tagPairDelimiter +
tagValues[random.nextInt(tagCardinality[i])]);
}
}
if (queryTimeSpan > 0) {
final long endTimestamp;
if (queryRandomTimeSpan) {
endTimestamp = startTimestamp + (timestampInterval * random.nextInt(queryTimeSpan / timestampInterval));
} else {
endTimestamp = startTimestamp + queryTimeSpan;
}
buf.append(deleteDelimiter)
.append(timestampKey + tagPairDelimiter + startTimestamp + queryTimeSpanDelimiter + endTimestamp);
} else {
buf.append(deleteDelimiter)
.append(timestampKey + tagPairDelimiter + startTimestamp);
}
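    // The flattened delete key now looks something like (illustrative values)
    // "AA:AA=AA:AB=AB:YCSBTS=1451606400", per the class javadoc on deletes.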
db.delete(table, buf.toString());
}
/**
* Parses the values returned by a read or scan operation and determines whether
* or not the integer value matches the hash and timestamp of the original timestamp.
* Only works for raw data points, will not work for group-by's or downsampled data.
* @param key The time series key.
* @param cells The cells read by the DB.
* @return {@link Status#OK} if the data matched or {@link Status#UNEXPECTED_STATE} if
* the data did not match.
*/
  protected Status verifyRow(final String key, final Map<String, ByteIterator> cells) {
Status verifyStatus = Status.UNEXPECTED_STATE;
long startTime = System.nanoTime();
double value = 0;
long timestamp = 0;
    final TreeMap<String, String> validationTags = new TreeMap<String, String>();
    for (final Entry<String, ByteIterator> entry : cells.entrySet()) {
if (entry.getKey().equals(timestampKey)) {
final NumericByteIterator it = (NumericByteIterator) entry.getValue();
timestamp = it.getLong();
} else if (entry.getKey().equals(valueKey)) {
final NumericByteIterator it = (NumericByteIterator) entry.getValue();
value = it.isFloatingPoint() ? it.getDouble() : it.getLong();
} else {
validationTags.put(entry.getKey(), entry.getValue().toString());
}
}
if (validationFunction(key, timestamp, validationTags) == value) {
verifyStatus = Status.OK;
}
long endTime = System.nanoTime();
measurements.measure("VERIFY", (int) (endTime - startTime) / 1000);
measurements.reportStatus("VERIFY", verifyStatus);
return verifyStatus;
}
/**
* Function used for generating a deterministic hash based on the combination
* of metric, tags and timestamp.
* @param key A non-null string representing the key.
* @param timestamp A timestamp in the proper units for the workload.
* @param tags A non-null map of tag keys and values NOT including the YCSB
* key or timestamp.
* @return A hash value as an 8 byte integer.
*/
  protected long validationFunction(final String key, final long timestamp,
                                    final TreeMap<String, String> tags) {
final StringBuilder validationBuffer = new StringBuilder(keys[0].length() +
(tagPairs * tagKeys[0].length()) + (tagPairs * tagCardinality[1]));
    for (final Entry<String, String> pair : tags.entrySet()) {
validationBuffer.append(pair.getKey()).append(pair.getValue());
}
return (long) validationBuffer.toString().hashCode() ^ timestamp;
}
/**
* Breaks out the keys, tags and cardinality initialization in another method
* to keep CheckStyle happy.
* @throws WorkloadException If something goes pear shaped.
*/
protected void initKeysAndTags() throws WorkloadException {
final int keyLength = Integer.parseInt(properties.getProperty(
CoreWorkload.FIELD_LENGTH_PROPERTY,
CoreWorkload.FIELD_LENGTH_PROPERTY_DEFAULT));
final int tagKeyLength = Integer.parseInt(properties.getProperty(
TAG_KEY_LENGTH_PROPERTY, TAG_KEY_LENGTH_PROPERTY_DEFAULT));
final int tagValueLength = Integer.parseInt(properties.getProperty(
TAG_VALUE_LENGTH_PROPERTY, TAG_VALUE_LENGTH_PROPERTY_DEFAULT));
keyGenerator = new IncrementingPrintableStringGenerator(keyLength);
tagKeyGenerator = new IncrementingPrintableStringGenerator(tagKeyLength);
tagValueGenerator = new IncrementingPrintableStringGenerator(tagValueLength);
final int threads = Integer.parseInt(properties.getProperty(Client.THREAD_COUNT_PROPERTY, "1"));
final String tagCardinalityString = properties.getProperty(
TAG_CARDINALITY_PROPERTY,
TAG_CARDINALITY_PROPERTY_DEFAULT);
final String[] tagCardinalityParts = tagCardinalityString.split(",");
int idx = 0;
totalCardinality = numKeys;
perKeyCardinality = 1;
int maxCardinality = 0;
for (final String card : tagCardinalityParts) {
try {
tagCardinality[idx] = Integer.parseInt(card.trim());
} catch (NumberFormatException nfe) {
throw new WorkloadException("Unable to parse cardinality: " +
card, nfe);
}
if (tagCardinality[idx] < 1) {
throw new WorkloadException("Cardinality must be greater than zero: " +
tagCardinality[idx]);
}
totalCardinality *= tagCardinality[idx];
perKeyCardinality *= tagCardinality[idx];
if (tagCardinality[idx] > maxCardinality) {
maxCardinality = tagCardinality[idx];
}
++idx;
if (idx >= tagPairs) {
// we have more cardinalities than tag keys so bail at this point.
break;
}
}
if (numKeys < threads) {
throw new WorkloadException("Field count " + numKeys + " (keys for time "
+ "series workloads) must be greater or equal to the number of "
+ "threads " + threads);
}
// fill tags without explicit cardinality with 1
    while (idx < tagPairs) {
tagCardinality[idx++] = 1;
}
for (int i = 0; i < tagCardinality.length; ++i) {
if (tagCardinality[i] > 1) {
firstIncrementableCardinality = i;
break;
}
}
keys = new String[numKeys];
tagKeys = new String[tagPairs];
tagValues = new String[maxCardinality];
for (int i = 0; i < numKeys; ++i) {
keys[i] = keyGenerator.nextString();
}
for (int i = 0; i < tagPairs; ++i) {
tagKeys[i] = tagKeyGenerator.nextString();
}
for (int i = 0; i < maxCardinality; i++) {
tagValues[i] = tagValueGenerator.nextString();
}
if (randomizeTimeseriesOrder) {
Utils.shuffleArray(keys);
Utils.shuffleArray(tagValues);
}
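    // Illustrative arithmetic: with recordcount=10 and totalCardinality=4 (the unit
    // test defaults), maxOffsets = 10 / 4 + 1 = 3 timestamp offsets per time series.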
maxOffsets = (recordcount / totalCardinality) + 1;
final int[] keyAndTagCardinality = new int[tagPairs + 1];
keyAndTagCardinality[0] = numKeys;
for (int i = 0; i < tagPairs; i++) {
keyAndTagCardinality[i + 1] = tagCardinality[i];
}
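    // Mixed-radix positional weights: entry i ends up holding the product of the
    // cardinalities to its right. E.g. (illustrative) numKeys=2 and tagCardinality={1,2}
    // give keyAndTagCardinality={2,1,2} and cumulativeCardinality={2,2,1}, so a series'
    // overall index decomposes as keyIdx*2 + tagIdx0*2 + tagIdx1*1.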
cumulativeCardinality = new int[keyAndTagCardinality.length];
for (int i = 0; i < keyAndTagCardinality.length; i++) {
int cumulation = 1;
for (int x = i; x <= keyAndTagCardinality.length - 1; x++) {
cumulation *= keyAndTagCardinality[x];
}
if (i > 0) {
cumulativeCardinality[i - 1] = cumulation;
}
}
cumulativeCardinality[cumulativeCardinality.length - 1] = 1;
}
/**
* Makes sure the settings as given are compatible.
* @throws WorkloadException If one or more settings were invalid.
*/
protected void validateSettings() throws WorkloadException {
if (dataintegrity) {
if (valueType != ValueType.INTEGERS) {
throw new WorkloadException("Data integrity was enabled. 'valuetype' must "
+ "be set to 'integers'.");
}
if (groupBy) {
throw new WorkloadException("Data integrity was enabled. 'groupbyfunction' must "
+ "be empty or null.");
}
if (downsample) {
throw new WorkloadException("Data integrity was enabled. 'downsamplingfunction' must "
+ "be empty or null.");
}
if (queryTimeSpan > 0) {
throw new WorkloadException("Data integrity was enabled. 'querytimespan' must "
+ "be empty or 0.");
}
if (randomizeTimeseriesOrder) {
throw new WorkloadException("Data integrity was enabled. 'randomizetimeseriesorder' must "
+ "be false.");
}
final String startTimestamp = properties.getProperty(CoreWorkload.INSERT_START_PROPERTY);
if (startTimestamp == null || startTimestamp.isEmpty()) {
throw new WorkloadException("Data integrity was enabled. 'insertstart' must "
+ "be set to a Unix Epoch timestamp.");
}
}
}
/**
* Thread state class holding thread local generators and indices.
*/
protected class ThreadState {
/** The timestamp generator for this thread. */
protected final UnixEpochTimestampGenerator timestampGenerator;
/** An offset generator to select a random offset for queries. */
protected final NumberGenerator queryOffsetGenerator;
/** The current write key index. */
protected int keyIdx;
/** The starting fence for writing keys. */
protected int keyIdxStart;
/** The ending fence for writing keys. */
protected int keyIdxEnd;
/** Indices for each tag value for writes. */
protected int[] tagValueIdxs;
/** Whether or not all time series have written values for the current timestamp. */
protected boolean rollover;
/** The starting timestamp. */
protected long startTimestamp;
/**
* Default ctor.
* @param threadID The zero based thread ID.
* @param threadCount The total number of threads.
* @throws WorkloadException If something went pear shaped.
*/
protected ThreadState(final int threadID, final int threadCount) throws WorkloadException {
int totalThreads = threadCount > 0 ? threadCount : 1;
if (threadID >= totalThreads) {
throw new IllegalStateException("Thread ID " + threadID + " cannot be greater "
+ "than or equal than the thread count " + totalThreads);
}
if (keys.length < threadCount) {
throw new WorkloadException("Thread count " + totalThreads + " must be greater "
+ "than or equal to key count " + keys.length);
}
int keysPerThread = keys.length / totalThreads;
keyIdx = keysPerThread * threadID;
keyIdxStart = keyIdx;
if (totalThreads - 1 == threadID) {
keyIdxEnd = keys.length;
} else {
keyIdxEnd = keyIdxStart + keysPerThread;
}
tagValueIdxs = new int[tagPairs]; // all zeros
final String startingTimestamp =
properties.getProperty(CoreWorkload.INSERT_START_PROPERTY);
if (startingTimestamp == null || startingTimestamp.isEmpty()) {
timestampGenerator = randomizeTimestampOrder ?
new RandomDiscreteTimestampGenerator(timestampInterval, timeUnits, maxOffsets) :
new UnixEpochTimestampGenerator(timestampInterval, timeUnits);
} else {
try {
timestampGenerator = randomizeTimestampOrder ?
new RandomDiscreteTimestampGenerator(timestampInterval, timeUnits,
Long.parseLong(startingTimestamp), maxOffsets) :
new UnixEpochTimestampGenerator(timestampInterval, timeUnits,
Long.parseLong(startingTimestamp));
} catch (NumberFormatException nfe) {
throw new WorkloadException("Unable to parse the " +
CoreWorkload.INSERT_START_PROPERTY, nfe);
}
}
// Set the last value properly for the timestamp, otherwise it may start
// one interval ago.
startTimestamp = timestampGenerator.nextValue();
// TODO - pick it
queryOffsetGenerator = new UniformLongGenerator(0, maxOffsets - 2);
}
/**
* Generates the next write value for thread.
* @param map An initialized map to populate with tag keys and values as well
* as the timestamp and actual value.
* @param isInsert Whether or not it's an insert or an update. Updates will pick
* an older timestamp (if random isn't enabled).
* @return The next key to write.
*/
    protected String nextDataPoint(final Map<String, ByteIterator> map, final boolean isInsert) {
final Random random = ThreadLocalRandom.current();
int iterations = sparsity <= 0 ? 1 : random.nextInt((int) ((double) perKeyCardinality * sparsity));
if (iterations < 1) {
iterations = 1;
}
while (true) {
iterations--;
if (rollover) {
timestampGenerator.nextValue();
rollover = false;
}
String key = null;
if (iterations <= 0) {
          final TreeMap<String, String> validationTags;
          if (dataintegrity) {
            validationTags = new TreeMap<String, String>();
} else {
validationTags = null;
}
key = keys[keyIdx];
int overallIdx = keyIdx * cumulativeCardinality[0];
for (int i = 0; i < tagPairs; ++i) {
int tvidx = tagValueIdxs[i];
map.put(tagKeys[i], new StringByteIterator(tagValues[tvidx]));
if (dataintegrity) {
validationTags.put(tagKeys[i], tagValues[tvidx]);
}
if (delayedSeries > 0) {
overallIdx += (tvidx * cumulativeCardinality[i + 1]);
}
}
if (!isInsert) {
final long delta = (timestampGenerator.currentValue() - startTimestamp) / timestampInterval;
final int intervals = random.nextInt((int) delta);
map.put(timestampKey, new NumericByteIterator(startTimestamp + (intervals * timestampInterval)));
} else if (delayedSeries > 0) {
// See if the series falls in a delay bucket and calculate an offset earlier
// than the current timestamp value if so.
double pct = (double) overallIdx / (double) totalCardinality;
if (pct < delayedSeries) {
int modulo = overallIdx % delayedIntervals;
if (modulo < 0) {
modulo *= -1;
}
map.put(timestampKey, new NumericByteIterator(timestampGenerator.currentValue() -
timestampInterval * modulo));
} else {
map.put(timestampKey, new NumericByteIterator(timestampGenerator.currentValue()));
}
} else {
map.put(timestampKey, new NumericByteIterator(timestampGenerator.currentValue()));
}
if (dataintegrity) {
map.put(valueKey, new NumericByteIterator(validationFunction(key,
timestampGenerator.currentValue(), validationTags)));
} else {
switch (valueType) {
case INTEGERS:
map.put(valueKey, new NumericByteIterator(random.nextInt()));
break;
case FLOATS:
map.put(valueKey, new NumericByteIterator(random.nextDouble() * (double) 100000));
break;
case MIXED:
if (random.nextBoolean()) {
map.put(valueKey, new NumericByteIterator(random.nextInt()));
} else {
map.put(valueKey, new NumericByteIterator(random.nextDouble() * (double) 100000));
}
break;
default:
throw new IllegalStateException("Somehow we didn't have a value "
+ "type configured that we support: " + valueType);
}
}
}
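      // Advance the tag value indices like an odometer: increment the right-most tag
      // with cardinality > 1; once the first incrementable tag wraps, move on to the
      // next time series key owned by this thread.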
boolean tagRollover = false;
for (int i = tagCardinality.length - 1; i >= 0; --i) {
if (tagCardinality[i] <= 1) {
tagRollover = true; // Only one tag so needs roll over.
continue;
}
if (tagValueIdxs[i] + 1 >= tagCardinality[i]) {
tagValueIdxs[i] = 0;
if (i == firstIncrementableCardinality) {
tagRollover = true;
}
} else {
++tagValueIdxs[i];
break;
}
}
if (tagRollover) {
if (keyIdx + 1 >= keyIdxEnd) {
keyIdx = keyIdxStart;
rollover = true;
} else {
++keyIdx;
}
}
if (iterations <= 0) {
return key;
}
}
}
}
}
\ No newline at end of file
diff --git a/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java b/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java
index 409331ac..9b8464b9 100644
--- a/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java
+++ b/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java
@@ -1,577 +1,579 @@
/**
* Copyright (c) 2017 YCSB contributors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.workloads;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.Vector;
+import java.util.concurrent.CompletableFuture;
+
+import com.yahoo.ycsb.AsyncDB;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.Client;
-import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.NumericByteIterator;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.StringByteIterator;
import com.yahoo.ycsb.Utils;
import com.yahoo.ycsb.WorkloadException;
import com.yahoo.ycsb.measurements.Measurements;
import org.testng.annotations.Test;
public class TestTimeSeriesWorkload {
@Test
public void twoThreads() throws Exception {
final Properties p = getUTProperties();
Measurements.setProperties(p);
final TimeSeriesWorkload wl = new TimeSeriesWorkload();
wl.init(p);
Object threadState = wl.initThread(p, 0, 2);
MockDB db = new MockDB();
for (int i = 0; i < 74; i++) {
assertTrue(wl.doInsert(db, threadState));
}
assertEquals(db.keys.size(), 74);
assertEquals(db.values.size(), 74);
long timestamp = 1451606400;
for (int i = 0; i < db.keys.size(); i++) {
assertEquals(db.keys.get(i), "AAAA");
assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
assertEquals(Utils.bytesToLong(db.values.get(i).get(
TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT));
if (i % 2 == 0) {
assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
} else {
assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
timestamp += 60;
}
}
threadState = wl.initThread(p, 1, 2);
db = new MockDB();
for (int i = 0; i < 74; i++) {
assertTrue(wl.doInsert(db, threadState));
}
assertEquals(db.keys.size(), 74);
assertEquals(db.values.size(), 74);
timestamp = 1451606400;
for (int i = 0; i < db.keys.size(); i++) {
assertEquals(db.keys.get(i), "AAAB");
assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
assertEquals(Utils.bytesToLong(db.values.get(i).get(
TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT));
if (i % 2 == 0) {
assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
} else {
assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
timestamp += 60;
}
}
}
@Test (expectedExceptions = WorkloadException.class)
public void badTimeUnit() throws Exception {
final Properties p = new Properties();
p.put(TimeSeriesWorkload.TIMESTAMP_UNITS_PROPERTY, "foobar");
getWorkload(p, true);
}
@Test (expectedExceptions = WorkloadException.class)
public void failedToInitWorkloadBeforeThreadInit() throws Exception {
final Properties p = getUTProperties();
final TimeSeriesWorkload wl = getWorkload(p, false);
//wl.init(p); // <-- we NEED this :(
final Object threadState = wl.initThread(p, 0, 2);
final MockDB db = new MockDB();
wl.doInsert(db, threadState);
}
@Test (expectedExceptions = IllegalStateException.class)
public void failedToInitThread() throws Exception {
final Properties p = getUTProperties();
final TimeSeriesWorkload wl = getWorkload(p, true);
final MockDB db = new MockDB();
wl.doInsert(db, null);
}
@Test
public void insertOneKeyOneTagCardinalityOne() throws Exception {
final Properties p = getUTProperties();
p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "1");
p.put(TimeSeriesWorkload.TAG_COUNT_PROPERTY, "1");
p.put(TimeSeriesWorkload.TAG_CARDINALITY_PROPERTY, "1");
final TimeSeriesWorkload wl = getWorkload(p, true);
final Object threadState = wl.initThread(p, 0, 1);
final MockDB db = new MockDB();
for (int i = 0; i < 74; i++) {
assertTrue(wl.doInsert(db, threadState));
}
assertEquals(db.keys.size(), 74);
assertEquals(db.values.size(), 74);
long timestamp = 1451606400;
for (int i = 0; i < db.keys.size(); i++) {
assertEquals(db.keys.get(i), "AAAA");
assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
assertEquals(Utils.bytesToLong(db.values.get(i).get(
TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
assertTrue(((NumericByteIterator) db.values.get(i)
.get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint());
timestamp += 60;
}
}
@Test
public void insertOneKeyTwoTagsLowCardinality() throws Exception {
final Properties p = getUTProperties();
p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "1");
final TimeSeriesWorkload wl = getWorkload(p, true);
final Object threadState = wl.initThread(p, 0, 1);
final MockDB db = new MockDB();
for (int i = 0; i < 74; i++) {
assertTrue(wl.doInsert(db, threadState));
}
assertEquals(db.keys.size(), 74);
assertEquals(db.values.size(), 74);
long timestamp = 1451606400;
for (int i = 0; i < db.keys.size(); i++) {
assertEquals(db.keys.get(i), "AAAA");
assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
assertEquals(Utils.bytesToLong(db.values.get(i).get(
TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
assertTrue(((NumericByteIterator) db.values.get(i)
.get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint());
if (i % 2 == 0) {
assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
} else {
assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
timestamp += 60;
}
}
}
@Test
public void insertTwoKeysTwoTagsLowCardinality() throws Exception {
final Properties p = getUTProperties();
final TimeSeriesWorkload wl = getWorkload(p, true);
final Object threadState = wl.initThread(p, 0, 1);
final MockDB db = new MockDB();
for (int i = 0; i < 74; i++) {
assertTrue(wl.doInsert(db, threadState));
}
assertEquals(db.keys.size(), 74);
assertEquals(db.values.size(), 74);
long timestamp = 1451606400;
int metricCtr = 0;
for (int i = 0; i < db.keys.size(); i++) {
assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
assertEquals(Utils.bytesToLong(db.values.get(i).get(
TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
assertTrue(((NumericByteIterator) db.values.get(i)
.get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint());
if (i % 2 == 0) {
assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
} else {
assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
}
if (metricCtr++ > 1) {
assertEquals(db.keys.get(i), "AAAB");
if (metricCtr >= 4) {
metricCtr = 0;
timestamp += 60;
}
} else {
assertEquals(db.keys.get(i), "AAAA");
}
}
}
@Test
public void insertTwoKeysTwoThreads() throws Exception {
final Properties p = getUTProperties();
final TimeSeriesWorkload wl = getWorkload(p, true);
Object threadState = wl.initThread(p, 0, 2);
MockDB db = new MockDB();
for (int i = 0; i < 74; i++) {
assertTrue(wl.doInsert(db, threadState));
}
assertEquals(db.keys.size(), 74);
assertEquals(db.values.size(), 74);
long timestamp = 1451606400;
for (int i = 0; i < db.keys.size(); i++) {
assertEquals(db.keys.get(i), "AAAA"); // <-- key 1
assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
assertEquals(Utils.bytesToLong(db.values.get(i).get(
TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
assertTrue(((NumericByteIterator) db.values.get(i)
.get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint());
if (i % 2 == 0) {
assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
} else {
assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
timestamp += 60;
}
}
threadState = wl.initThread(p, 1, 2);
db = new MockDB();
for (int i = 0; i < 74; i++) {
assertTrue(wl.doInsert(db, threadState));
}
assertEquals(db.keys.size(), 74);
assertEquals(db.values.size(), 74);
timestamp = 1451606400;
for (int i = 0; i < db.keys.size(); i++) {
assertEquals(db.keys.get(i), "AAAB"); // <-- key 2
assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
assertEquals(Utils.bytesToLong(db.values.get(i).get(
TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT));
if (i % 2 == 0) {
assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
} else {
assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
timestamp += 60;
}
}
}
@Test
public void insertThreeKeysTwoThreads() throws Exception {
// To make sure the distribution doesn't miss any metrics
final Properties p = getUTProperties();
p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "3");
final TimeSeriesWorkload wl = getWorkload(p, true);
Object threadState = wl.initThread(p, 0, 2);
MockDB db = new MockDB();
for (int i = 0; i < 74; i++) {
assertTrue(wl.doInsert(db, threadState));
}
assertEquals(db.keys.size(), 74);
assertEquals(db.values.size(), 74);
long timestamp = 1451606400;
for (int i = 0; i < db.keys.size(); i++) {
assertEquals(db.keys.get(i), "AAAA");
assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
assertEquals(Utils.bytesToLong(db.values.get(i).get(
TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
assertTrue(((NumericByteIterator) db.values.get(i)
.get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint());
if (i % 2 == 0) {
assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
} else {
assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
timestamp += 60;
}
}
threadState = wl.initThread(p, 1, 2);
db = new MockDB();
for (int i = 0; i < 74; i++) {
assertTrue(wl.doInsert(db, threadState));
}
timestamp = 1451606400;
int metricCtr = 0;
for (int i = 0; i < db.keys.size(); i++) {
assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
assertEquals(Utils.bytesToLong(db.values.get(i).get(
TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT));
if (i % 2 == 0) {
assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
} else {
assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
}
if (metricCtr++ > 1) {
assertEquals(db.keys.get(i), "AAAC");
if (metricCtr >= 4) {
metricCtr = 0;
timestamp += 60;
}
} else {
assertEquals(db.keys.get(i), "AAAB");
}
}
}
@Test
public void insertWithValidation() throws Exception {
final Properties p = getUTProperties();
p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "1");
p.put(CoreWorkload.DATA_INTEGRITY_PROPERTY, "true");
p.put(TimeSeriesWorkload.VALUE_TYPE_PROPERTY, "integers");
final TimeSeriesWorkload wl = getWorkload(p, true);
final Object threadState = wl.initThread(p, 0, 1);
final MockDB db = new MockDB();
for (int i = 0; i < 74; i++) {
assertTrue(wl.doInsert(db, threadState));
}
assertEquals(db.keys.size(), 74);
assertEquals(db.values.size(), 74);
long timestamp = 1451606400;
for (int i = 0; i < db.keys.size(); i++) {
assertEquals(db.keys.get(i), "AAAA");
assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
assertEquals(Utils.bytesToLong(db.values.get(i).get(
TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
assertFalse(((NumericByteIterator) db.values.get(i)
.get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint());
// validation check
      final TreeMap<String, String> validationTags = new TreeMap<String, String>();
      for (final Entry<String, ByteIterator> entry : db.values.get(i).entrySet()) {
if (entry.getKey().equals(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT) ||
entry.getKey().equals(TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT)) {
continue;
}
validationTags.put(entry.getKey(), entry.getValue().toString());
}
assertEquals(wl.validationFunction(db.keys.get(i), timestamp, validationTags),
((NumericByteIterator) db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).getLong());
if (i % 2 == 0) {
assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
} else {
assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
timestamp += 60;
}
}
}
@Test
public void read() throws Exception {
final Properties p = getUTProperties();
final TimeSeriesWorkload wl = getWorkload(p, true);
final Object threadState = wl.initThread(p, 0, 1);
final MockDB db = new MockDB();
for (int i = 0; i < 20; i++) {
wl.doTransactionRead(db, threadState);
}
}
@Test
public void verifyRow() throws Exception {
final Properties p = getUTProperties();
final TimeSeriesWorkload wl = getWorkload(p, true);
    final TreeMap<String, String> validationTags = new TreeMap<String, String>();
    final HashMap<String, ByteIterator> cells = new HashMap<String, ByteIterator>();
validationTags.put("AA", "AAAA");
cells.put("AA", new StringByteIterator("AAAA"));
validationTags.put("AB", "AAAB");
cells.put("AB", new StringByteIterator("AAAB"));
long hash = wl.validationFunction("AAAA", 1451606400L, validationTags);
cells.put(TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT, new NumericByteIterator(1451606400L));
cells.put(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT, new NumericByteIterator(hash));
assertEquals(wl.verifyRow("AAAA", cells), Status.OK);
// tweak the last value a bit
for (final ByteIterator it : cells.values()) {
it.reset();
}
cells.put(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT, new NumericByteIterator(hash + 1));
assertEquals(wl.verifyRow("AAAA", cells), Status.UNEXPECTED_STATE);
// no value cell, returns an unexpected state
for (final ByteIterator it : cells.values()) {
it.reset();
}
cells.remove(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT);
assertEquals(wl.verifyRow("AAAA", cells), Status.UNEXPECTED_STATE);
}
@Test
public void validateSettingsDataIntegrity() throws Exception {
Properties p = getUTProperties();
// data validation incompatibilities
p.setProperty(CoreWorkload.DATA_INTEGRITY_PROPERTY, "true");
try {
getWorkload(p, true);
fail("Expected WorkloadException");
} catch (WorkloadException e) { }
p.setProperty(TimeSeriesWorkload.VALUE_TYPE_PROPERTY, "integers"); // now it's ok
p.setProperty(TimeSeriesWorkload.GROUPBY_PROPERTY, "sum"); // now it's not
try {
getWorkload(p, true);
fail("Expected WorkloadException");
} catch (WorkloadException e) { }
p.setProperty(TimeSeriesWorkload.GROUPBY_PROPERTY, "");
p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_FUNCTION_PROPERTY, "sum");
p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_INTERVAL_PROPERTY, "60");
try {
getWorkload(p, true);
fail("Expected WorkloadException");
} catch (WorkloadException e) { }
p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_FUNCTION_PROPERTY, "");
p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_INTERVAL_PROPERTY, "");
p.setProperty(TimeSeriesWorkload.QUERY_TIMESPAN_PROPERTY, "60");
try {
getWorkload(p, true);
fail("Expected WorkloadException");
} catch (WorkloadException e) { }
p = getUTProperties();
p.setProperty(CoreWorkload.DATA_INTEGRITY_PROPERTY, "true");
p.setProperty(TimeSeriesWorkload.VALUE_TYPE_PROPERTY, "integers");
p.setProperty(TimeSeriesWorkload.RANDOMIZE_TIMESERIES_ORDER_PROPERTY, "true");
try {
getWorkload(p, true);
fail("Expected WorkloadException");
} catch (WorkloadException e) { }
p.setProperty(TimeSeriesWorkload.RANDOMIZE_TIMESERIES_ORDER_PROPERTY, "false");
    p.setProperty(CoreWorkload.INSERT_START_PROPERTY, "");
try {
getWorkload(p, true);
fail("Expected WorkloadException");
} catch (WorkloadException e) { }
}
/** Helper method that generates unit testing defaults for the properties map */
private Properties getUTProperties() {
final Properties p = new Properties();
p.put(Client.RECORD_COUNT_PROPERTY, "10");
p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "2");
p.put(CoreWorkload.FIELD_LENGTH_PROPERTY, "4");
p.put(TimeSeriesWorkload.TAG_KEY_LENGTH_PROPERTY, "2");
p.put(TimeSeriesWorkload.TAG_VALUE_LENGTH_PROPERTY, "4");
p.put(TimeSeriesWorkload.TAG_COUNT_PROPERTY, "2");
p.put(TimeSeriesWorkload.TAG_CARDINALITY_PROPERTY, "1,2");
p.put(CoreWorkload.INSERT_START_PROPERTY, "1451606400");
p.put(TimeSeriesWorkload.DELAYED_SERIES_PROPERTY, "0");
p.put(TimeSeriesWorkload.RANDOMIZE_TIMESERIES_ORDER_PROPERTY, "false");
return p;
}
/** Helper to setup the workload for testing. */
private TimeSeriesWorkload getWorkload(final Properties p, final boolean init)
throws WorkloadException {
Measurements.setProperties(p);
if (!init) {
return new TimeSeriesWorkload();
} else {
final TimeSeriesWorkload workload = new TimeSeriesWorkload();
workload.init(p);
return workload;
}
}
- static class MockDB extends DB {
+ static class MockDB extends AsyncDB {
    final List<String> keys = new ArrayList<String>();
    final List<Map<String, ByteIterator>> values = new ArrayList<Map<String, ByteIterator>>();