diff --git a/core/src/main/java/com/yahoo/ycsb/AsyncDB.java b/core/src/main/java/com/yahoo/ycsb/AsyncDB.java new file mode 100644 index 00000000..f70ccbfc --- /dev/null +++ b/core/src/main/java/com/yahoo/ycsb/AsyncDB.java @@ -0,0 +1,198 @@ +/** + * Copyright (c) 2010-2016 Yahoo! Inc., 2017 YCSB contributors All rights reserved. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package com.yahoo.ycsb; + +import java.util.*; +import java.util.concurrent.CompletableFuture; + +/** + * A layer for accessing a database to be benchmarked. Each thread in the client + * will be given its own instance of whatever DB class is to be used in the test. + * This class should be constructed using a no-argument constructor, so we can + * load it dynamically. Any argument-based initialization should be + * done by init(). + * + * Note that YCSB does not make any use of the return codes returned by this class. + * Instead, it keeps a count of the return values and presents them to the user. + * + * The semantics of methods such as insert, update and delete vary from database + * to database. In particular, operations may or may not be durable once these + * methods commit, and some systems may return 'success' regardless of whether + * or not a tuple with a matching key existed before the call. Rather than dictate + * the exact semantics of these methods, we recommend you either implement them + * to match the database's default semantics, or the semantics of your + * target application. For the sake of comparison between experiments we also + * recommend you explain the semantics you chose when presenting performance results. + */ +public abstract class AsyncDB { + /** + * Properties for configuring this DB. + */ + private Properties properties = new Properties(); + + /** + * Synchronous view of this DB. + */ + private DB syncDB; + + /** + * Set the properties for this DB. + */ + public void setProperties(Properties p) { + properties = p; + + } + + /** + * Get the set of properties for this DB. + */ + public Properties getProperties() { + return properties; + } + + /** + * Initialize any state for this DB. + * Called once per DB instance; there is one DB instance per client thread. + */ + public void init() throws DBException { + } + + /** + * Cleanup any state for this DB. + * Called once per DB instance; there is one DB instance per client thread. + */ + public void cleanup() throws DBException { + } + + /** + * Read a record from the database. Each field/value pair from the result will be stored in a HashMap. + * + * @param table The name of the table + * @param key The record key of the record to read. + * @param fields The list of fields to read, or null for all of them + * @param result A HashMap of field/value pairs for the result + * @return The result of the operation. + */ + public abstract CompletableFuture read(String table, String key, Set fields, + Map result); + + /** + * Perform a range scan for a set of records in the database. Each field/value pair from the result will be stored + * in a HashMap. + * + * @param table The name of the table + * @param startkey The record key of the first record to read. + * @param recordcount The number of records to read + * @param fields The list of fields to read, or null for all of them + * @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record + * @return The result of the operation. 
+ */ + public abstract CompletableFuture scan(String table, String startkey, int recordcount, Set fields, + Vector> result); + + /** + * Update a record in the database. Any field/value pairs in the specified values HashMap will be written into the + * record with the specified record key, overwriting any existing values with the same field name. + * + * @param table The name of the table + * @param key The record key of the record to write. + * @param values A HashMap of field/value pairs to update in the record + * @return The result of the operation. + */ + public abstract CompletableFuture update(String table, String key, Map values); + + /** + * Insert a record in the database. Any field/value pairs in the specified values HashMap will be written into the + * record with the specified record key. + * + * @param table The name of the table + * @param key The record key of the record to insert. + * @param values A HashMap of field/value pairs to insert in the record + * @return The result of the operation. + */ + public abstract CompletableFuture insert(String table, String key, Map values); + + /** + * Delete a record from the database. + * + * @param table The name of the table + * @param key The record key of the record to delete. + * @return The result of the operation. + */ + public abstract CompletableFuture delete(String table, String key); + + private static final class SyncView extends DB { + private final AsyncDB db; + + SyncView(AsyncDB db) { + this.db = db; + } + + @Override + public void setProperties(Properties p) { + db.setProperties(p); + } + + @Override + public Properties getProperties() { + return db.getProperties(); + } + + @Override + public Status read(String table, String key, Set fields, Map result) { + return db.read(table, key, fields, result).join(); + } + + @Override + public Status scan(String table, String startkey, int recordcount, Set fields, + Vector> result) { + return db.scan(table, startkey, recordcount, fields, result).join(); + } + + @Override + public Status update(String table, String key, Map values) { + return db.update(table, key, values).join(); + } + + @Override + public Status insert(String table, String key, Map values) { + return db.insert(table, key, values).join(); + } + + @Override + public Status delete(String table, String key) { + return db.delete(table, key).join(); + } + } + + /** + * Return a synchronous view of this database. + * + * @return A DB object that operates synchronously. 
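For orientation, here is a minimal sketch of what a binding written directly against this AsyncDB contract could look like. Everything specific in it is invented for illustration (the class name, the in-memory map, the four-thread executor, the table+"/"+key scheme); only the method shapes and the CompletableFuture of Status results follow the declarations above. Existing synchronous bindings do not need any of this: as the DBFactory change further below shows, a plain DB is wrapped in an AsyncDBAdapter automatically.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.yahoo.ycsb.AsyncDB;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;

/** Illustrative only: a toy in-memory binding against the AsyncDB contract above. */
public class ExampleAsyncDB extends AsyncDB {
  private final ConcurrentMap<String, Map<String, ByteIterator>> store = new ConcurrentHashMap<>();
  private final ExecutorService pool = Executors.newFixedThreadPool(4);

  @Override
  public CompletableFuture<Status> read(String table, String key, Set<String> fields,
                                        Map<String, ByteIterator> result) {
    return CompletableFuture.supplyAsync(() -> {
      Map<String, ByteIterator> row = store.get(table + "/" + key);
      if (row == null) {
        return Status.NOT_FOUND;
      }
      result.putAll(row); // a real binding would honour the requested field subset
      return Status.OK;
    }, pool);
  }

  @Override
  public CompletableFuture<Status> scan(String table, String startkey, int recordcount,
      Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
    return CompletableFuture.completedFuture(Status.NOT_IMPLEMENTED);
  }

  @Override
  public CompletableFuture<Status> update(String table, String key, Map<String, ByteIterator> values) {
    return insert(table, key, values); // toy semantics: overwrite the whole record
  }

  @Override
  public CompletableFuture<Status> insert(String table, String key, Map<String, ByteIterator> values) {
    return CompletableFuture.supplyAsync(() -> {
      store.put(table + "/" + key, new HashMap<>(values));
      return Status.OK;
    }, pool);
  }

  @Override
  public CompletableFuture<Status> delete(String table, String key) {
    return CompletableFuture.supplyAsync(
        () -> store.remove(table + "/" + key) == null ? Status.NOT_FOUND : Status.OK, pool);
  }

  @Override
  public void cleanup() throws DBException {
    pool.shutdown();
  }
}
```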
+ */ + public DB syncView() { + if (syncDB == null) { + synchronized (this) { + if (syncDB == null) { + syncDB = new SyncView(this); + } + } + } + return syncDB; + } +} diff --git a/core/src/main/java/com/yahoo/ycsb/AsyncDBAdapter.java b/core/src/main/java/com/yahoo/ycsb/AsyncDBAdapter.java new file mode 100644 index 00000000..a7bf1179 --- /dev/null +++ b/core/src/main/java/com/yahoo/ycsb/AsyncDBAdapter.java @@ -0,0 +1,58 @@ +package com.yahoo.ycsb; + +import java.util.*; +import java.util.concurrent.CompletableFuture; + +/** + * + */ +public class AsyncDBAdapter extends AsyncDB { + + private final DB db; + + public AsyncDBAdapter(final DB db) { + this.db = db; + } + + @Override + public void setProperties(Properties p) { + db.setProperties(p); + } + + @Override + public Properties getProperties() { + return db.getProperties(); + } + + @Override + public CompletableFuture read(String table, String key, Set fields, + Map result) { + return CompletableFuture.completedFuture(db.read(table, key, fields, result)); + } + + @Override + public CompletableFuture scan(String table, String startkey, int recordcount, Set fields, + Vector> result) { + return CompletableFuture.completedFuture(db.scan(table, startkey, recordcount, fields, result)); + } + + @Override + public CompletableFuture update(String table, String key, Map values) { + return CompletableFuture.completedFuture(db.update(table, key, values)); + } + + @Override + public CompletableFuture insert(String table, String key, Map values) { + return CompletableFuture.completedFuture(db.insert(table, key, values)); + } + + @Override + public CompletableFuture delete(String table, String key) { + return CompletableFuture.completedFuture(db.delete(table, key)); + } + + @Override + public DB syncView() { + return db; + } +} diff --git a/core/src/main/java/com/yahoo/ycsb/Client.java b/core/src/main/java/com/yahoo/ycsb/Client.java index 00b1e378..b196957c 100644 --- a/core/src/main/java/com/yahoo/ycsb/Client.java +++ b/core/src/main/java/com/yahoo/ycsb/Client.java @@ -1,1124 +1,1124 @@ /** * Copyright (c) 2010-2016 Yahoo! Inc., 2017 YCSB contributors All rights reserved. *

* Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb; import com.yahoo.ycsb.measurements.Measurements; import com.yahoo.ycsb.measurements.exporter.MeasurementsExporter; import com.yahoo.ycsb.measurements.exporter.TextMeasurementsExporter; import org.apache.htrace.core.HTraceConfiguration; import org.apache.htrace.core.TraceScope; import org.apache.htrace.core.Tracer; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; /** * A thread to periodically show the status of the experiment to reassure you that progress is being made. */ class StatusThread extends Thread { // Counts down each of the clients completing private final CountDownLatch completeLatch; // Stores the measurements for the run private final Measurements measurements; // Whether or not to track the JVM stats per run private final boolean trackJVMStats; // The clients that are running. private final List clients; private final String label; private final boolean standardstatus; // The interval for reporting status. private long sleeptimeNs; // JVM max/mins private int maxThreads; private int minThreads = Integer.MAX_VALUE; private long maxUsedMem; private long minUsedMem = Long.MAX_VALUE; private double maxLoadAvg; private double minLoadAvg = Double.MAX_VALUE; private long lastGCCount = 0; private long lastGCTime = 0; /** * Creates a new StatusThread without JVM stat tracking. * * @param completeLatch The latch that each client thread will {@link CountDownLatch#countDown()} * as they complete. * @param clients The clients to collect metrics from. * @param label The label for the status. * @param standardstatus If true the status is printed to stdout in addition to stderr. * @param statusIntervalSeconds The number of seconds between status updates. */ public StatusThread(CountDownLatch completeLatch, List clients, String label, boolean standardstatus, int statusIntervalSeconds) { this(completeLatch, clients, label, standardstatus, statusIntervalSeconds, false); } /** * Creates a new StatusThread. * * @param completeLatch The latch that each client thread will {@link CountDownLatch#countDown()} * as they complete. * @param clients The clients to collect metrics from. * @param label The label for the status. * @param standardstatus If true the status is printed to stdout in addition to stderr. * @param statusIntervalSeconds The number of seconds between status updates. * @param trackJVMStats Whether or not to track JVM stats. 
*/ public StatusThread(CountDownLatch completeLatch, List clients, String label, boolean standardstatus, int statusIntervalSeconds, boolean trackJVMStats) { this.completeLatch = completeLatch; this.clients = clients; this.label = label; this.standardstatus = standardstatus; sleeptimeNs = TimeUnit.SECONDS.toNanos(statusIntervalSeconds); measurements = Measurements.getMeasurements(); this.trackJVMStats = trackJVMStats; } /** * Run and periodically report status. */ @Override public void run() { final long startTimeMs = System.currentTimeMillis(); final long startTimeNanos = System.nanoTime(); long deadline = startTimeNanos + sleeptimeNs; long startIntervalMs = startTimeMs; long lastTotalOps = 0; boolean alldone; do { long nowMs = System.currentTimeMillis(); lastTotalOps = computeStats(startTimeMs, startIntervalMs, nowMs, lastTotalOps); if (trackJVMStats) { measureJVM(); } alldone = waitForClientsUntil(deadline); startIntervalMs = nowMs; deadline += sleeptimeNs; } while (!alldone); if (trackJVMStats) { measureJVM(); } // Print the final stats. computeStats(startTimeMs, startIntervalMs, System.currentTimeMillis(), lastTotalOps); } /** * Computes and prints the stats. * * @param startTimeMs The start time of the test. * @param startIntervalMs The start time of this interval. * @param endIntervalMs The end time (now) for the interval. * @param lastTotalOps The last total operations count. * @return The current operation count. */ private long computeStats(final long startTimeMs, long startIntervalMs, long endIntervalMs, long lastTotalOps) { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS"); long totalops = 0; long todoops = 0; // Calculate the total number of operations completed. for (ClientThread t : clients) { totalops += t.getOpsDone(); todoops += t.getOpsTodo(); } long interval = endIntervalMs - startTimeMs; double throughput = 1000.0 * (((double) totalops) / (double) interval); double curthroughput = 1000.0 * (((double) (totalops - lastTotalOps)) / ((double) (endIntervalMs - startIntervalMs))); long estremaining = (long) Math.ceil(todoops / throughput); DecimalFormat d = new DecimalFormat("#.##"); String labelString = this.label + format.format(new Date()); StringBuilder msg = new StringBuilder(labelString).append(" ").append(interval / 1000).append(" sec: "); msg.append(totalops).append(" operations; "); if (totalops != 0) { msg.append(d.format(curthroughput)).append(" current ops/sec; "); } if (todoops != 0) { msg.append("est completion in ").append(RemainingFormatter.format(estremaining)); } msg.append(Measurements.getMeasurements().getSummary()); System.err.println(msg); if (standardstatus) { System.out.println(msg); } return totalops; } /** * Waits for all of the client to finish or the deadline to expire. * * @param deadline The current deadline. * @return True if all of the clients completed. */ private boolean waitForClientsUntil(long deadline) { boolean alldone = false; long now = System.nanoTime(); while (!alldone && now < deadline) { try { alldone = completeLatch.await(deadline - now, TimeUnit.NANOSECONDS); } catch (InterruptedException ie) { // If we are interrupted the thread is being asked to shutdown. // Return true to indicate that and reset the interrupt state // of the thread. Thread.currentThread().interrupt(); alldone = true; } now = System.nanoTime(); } return alldone; } /** * Executes the JVM measurements. 
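For a quick sanity check of the computeStats arithmetic above, with made-up figures:

```java
final class StatusMath {
  public static void main(String[] args) {
    // Hypothetical figures, only to illustrate the formulas used in computeStats above.
    long totalops = 500_000;   // operations completed so far
    long todoops = 250_000;    // operations still outstanding
    long intervalMs = 100_000; // milliseconds since the test started
    double throughput = 1000.0 * totalops / intervalMs;         // 5000.0 ops/sec overall
    long estremaining = (long) Math.ceil(todoops / throughput); // 50 seconds to completion
    System.out.println(throughput + " ops/sec, ~" + estremaining + " s remaining");
  }
}
```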
*/ private void measureJVM() { final int threads = Utils.getActiveThreadCount(); if (threads < minThreads) { minThreads = threads; } if (threads > maxThreads) { maxThreads = threads; } measurements.measure("THREAD_COUNT", threads); // TODO - once measurements allow for other number types, switch to using // the raw bytes. Otherwise we can track in MB to avoid negative values // when faced with huge heaps. final int usedMem = Utils.getUsedMemoryMegaBytes(); if (usedMem < minUsedMem) { minUsedMem = usedMem; } if (usedMem > maxUsedMem) { maxUsedMem = usedMem; } measurements.measure("USED_MEM_MB", usedMem); // Some JVMs may not implement this feature so if the value is less than // zero, just ommit it. final double systemLoad = Utils.getSystemLoadAverage(); if (systemLoad >= 0) { // TODO - store the double if measurements allows for them measurements.measure("SYS_LOAD_AVG", (int) systemLoad); if (systemLoad > maxLoadAvg) { maxLoadAvg = systemLoad; } if (systemLoad < minLoadAvg) { minLoadAvg = systemLoad; } } final long gcs = Utils.getGCTotalCollectionCount(); measurements.measure("GCS", (int) (gcs - lastGCCount)); final long gcTime = Utils.getGCTotalTime(); measurements.measure("GCS_TIME", (int) (gcTime - lastGCTime)); lastGCCount = gcs; lastGCTime = gcTime; } /** * @return The maximum threads running during the test. */ public int getMaxThreads() { return maxThreads; } /** * @return The minimum threads running during the test. */ public int getMinThreads() { return minThreads; } /** * @return The maximum memory used during the test. */ public long getMaxUsedMem() { return maxUsedMem; } /** * @return The minimum memory used during the test. */ public long getMinUsedMem() { return minUsedMem; } /** * @return The maximum load average during the test. */ public double getMaxLoadAvg() { return maxLoadAvg; } /** * @return The minimum load average during the test. */ public double getMinLoadAvg() { return minLoadAvg; } /** * @return Whether or not the thread is tracking JVM stats. */ public boolean trackJVMStats() { return trackJVMStats; } } /** * Turn seconds remaining into more useful units. * i.e. if there are hours or days worth of seconds, use them. */ final class RemainingFormatter { private RemainingFormatter() { // not used } public static StringBuilder format(long seconds) { StringBuilder time = new StringBuilder(); long days = TimeUnit.SECONDS.toDays(seconds); if (days > 0) { time.append(days).append(days == 1 ? " day " : " days "); seconds -= TimeUnit.DAYS.toSeconds(days); } long hours = TimeUnit.SECONDS.toHours(seconds); if (hours > 0) { time.append(hours).append(hours == 1 ? " hour " : " hours "); seconds -= TimeUnit.HOURS.toSeconds(hours); } /* Only include minute granularity if we're < 1 day. */ if (days < 1) { long minutes = TimeUnit.SECONDS.toMinutes(seconds); if (minutes > 0) { time.append(minutes).append(minutes == 1 ? " minute " : " minutes "); seconds -= TimeUnit.MINUTES.toSeconds(seconds); } } /* Only bother to include seconds if we're < 1 minute */ if (time.length() == 0) { time.append(seconds).append(time.length() == 1 ? " second " : " seconds "); } return time; } } /** * A thread for executing transactions or data inserts to the database. */ class ClientThread implements Runnable { // Counts down each of the clients completing. 
private final CountDownLatch completeLatch; private static boolean spinSleep; - private DB db; + private AsyncDB db; private boolean dotransactions; private Workload workload; private int opcount; private double targetOpsPerMs; private int opsdone; private int threadid; private int threadcount; private Object workloadstate; private Properties props; private long targetOpsTickNs; private final Measurements measurements; /** * Constructor. * * @param db the DB implementation to use * @param dotransactions true to do transactions, false to insert data * @param workload the workload to use * @param props the properties defining the experiment * @param opcount the number of operations (transactions or inserts) to do * @param targetperthreadperms target number of operations per thread per ms * @param completeLatch The latch tracking the completion of all clients. */ - public ClientThread(DB db, boolean dotransactions, Workload workload, Properties props, int opcount, + public ClientThread(AsyncDB db, boolean dotransactions, Workload workload, Properties props, int opcount, double targetperthreadperms, CountDownLatch completeLatch) { this.db = db; this.dotransactions = dotransactions; this.workload = workload; this.opcount = opcount; opsdone = 0; if (targetperthreadperms > 0) { targetOpsPerMs = targetperthreadperms; targetOpsTickNs = (long) (1000000 / targetOpsPerMs); } this.props = props; measurements = Measurements.getMeasurements(); spinSleep = Boolean.valueOf(this.props.getProperty("spin.sleep", "false")); this.completeLatch = completeLatch; } public void setThreadId(final int threadId) { threadid = threadId; } public void setThreadCount(final int threadCount) { threadcount = threadCount; } public int getOpsDone() { return opsdone; } @Override public void run() { try { db.init(); } catch (DBException e) { e.printStackTrace(); e.printStackTrace(System.out); return; } try { workloadstate = workload.initThread(props, threadid, threadcount); } catch (WorkloadException e) { e.printStackTrace(); e.printStackTrace(System.out); return; } //NOTE: Switching to using nanoTime and parkNanos for time management here such that the measurements // and the client thread have the same view on time. 
//spread the thread operations out so they don't all hit the DB at the same time // GH issue 4 - throws exception if _target>1 because random.nextInt argument must be >0 // and the sleep() doesn't make sense for granularities < 1 ms anyway if ((targetOpsPerMs > 0) && (targetOpsPerMs <= 1.0)) { long randomMinorDelay = ThreadLocalRandom.current().nextInt((int) targetOpsTickNs); sleepUntil(System.nanoTime() + randomMinorDelay); } try { if (dotransactions) { long startTimeNanos = System.nanoTime(); while (((opcount == 0) || (opsdone < opcount)) && !workload.isStopRequested()) { if (!workload.doTransaction(db, workloadstate)) { break; } opsdone++; throttleNanos(startTimeNanos); } } else { long startTimeNanos = System.nanoTime(); while (((opcount == 0) || (opsdone < opcount)) && !workload.isStopRequested()) { if (!workload.doInsert(db, workloadstate)) { break; } opsdone++; throttleNanos(startTimeNanos); } } } catch (Exception e) { e.printStackTrace(); e.printStackTrace(System.out); System.exit(0); } try { measurements.setIntendedStartTimeNs(0); db.cleanup(); } catch (DBException e) { e.printStackTrace(); e.printStackTrace(System.out); } finally { completeLatch.countDown(); } } private static void sleepUntil(long deadline) { while (System.nanoTime() < deadline) { if (!spinSleep) { LockSupport.parkNanos(deadline - System.nanoTime()); } } } private void throttleNanos(long startTimeNanos) { //throttle the operations if (targetOpsPerMs > 0) { // delay until next tick long deadline = startTimeNanos + opsdone * targetOpsTickNs; sleepUntil(deadline); measurements.setIntendedStartTimeNs(deadline); } } /** * The total amount of work this thread is still expected to do. */ int getOpsTodo() { int todo = opcount - opsdone; return todo < 0 ? 0 : todo; } } /** * Main class for executing YCSB. */ public final class Client { private Client() { //not used } public static final String DEFAULT_RECORD_COUNT = "0"; /** * The target number of operations to perform. */ public static final String OPERATION_COUNT_PROPERTY = "operationcount"; /** * The number of records to load into the database initially. */ public static final String RECORD_COUNT_PROPERTY = "recordcount"; /** * The workload class to be loaded. */ public static final String WORKLOAD_PROPERTY = "workload"; /** * The database class to be used. */ public static final String DB_PROPERTY = "db"; /** * The exporter class to be used. The default is * com.yahoo.ycsb.measurements.exporter.TextMeasurementsExporter. */ public static final String EXPORTER_PROPERTY = "exporter"; /** * If set to the path of a file, YCSB will write all output to this file * instead of STDOUT. */ public static final String EXPORT_FILE_PROPERTY = "exportfile"; /** * The number of YCSB client threads to run. */ public static final String THREAD_COUNT_PROPERTY = "threadcount"; /** * Indicates how many inserts to do if less than recordcount. * Useful for partitioning the load among multiple servers if the client is the bottleneck. * Additionally workloads should support the "insertstart" property which tells them which record to start at. */ public static final String INSERT_COUNT_PROPERTY = "insertcount"; /** * Target number of operations per second. */ public static final String TARGET_PROPERTY = "target"; /** * The maximum amount of time (in seconds) for which the benchmark will be run. */ public static final String MAX_EXECUTION_TIME = "maxexecutiontime"; /** * Whether or not this is the transaction phase (run) or not (load). 
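The throttling in throttleNanos and sleepUntil above boils down to a fixed nanosecond tick per operation; a small illustration, assuming a rate of 0.5 operations per millisecond for this thread:

```java
final class ThrottleMath {
  public static void main(String[] args) {
    // Assumed per-thread rate, purely for illustration.
    double targetOpsPerMs = 0.5;
    long targetOpsTickNs = (long) (1_000_000 / targetOpsPerMs); // 2_000_000 ns between operations
    long startTimeNanos = System.nanoTime();
    int opsdone = 10;
    // After 10 operations, the 11th may not start before this deadline;
    // sleepUntil() reaches it by parking (or spinning when spin.sleep=true).
    long deadline = startTimeNanos + opsdone * targetOpsTickNs;
    System.out.println("next op due in " + (deadline - System.nanoTime()) + " ns");
  }
}
```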
*/ public static final String DO_TRANSACTIONS_PROPERTY = "dotransactions"; /** * Whether or not to show status during run. */ public static final String STATUS_PROPERTY = "status"; /** * Use label for status (e.g. to label one experiment out of a whole batch). */ public static final String LABEL_PROPERTY = "label"; /** * An optional thread used to track progress and measure JVM stats. */ private static StatusThread statusthread = null; // HTrace integration related constants. /** * All keys for configuring the tracing system start with this prefix. */ private static final String HTRACE_KEY_PREFIX = "htrace."; private static final String CLIENT_WORKLOAD_INIT_SPAN = "Client#workload_init"; private static final String CLIENT_INIT_SPAN = "Client#init"; private static final String CLIENT_WORKLOAD_SPAN = "Client#workload"; private static final String CLIENT_CLEANUP_SPAN = "Client#cleanup"; private static final String CLIENT_EXPORT_MEASUREMENTS_SPAN = "Client#export_measurements"; public static void usageMessage() { System.out.println("Usage: java com.yahoo.ycsb.Client [options]"); System.out.println("Options:"); System.out.println(" -threads n: execute using n threads (default: 1) - can also be specified as the \n" + " \"threadcount\" property using -p"); System.out.println(" -target n: attempt to do n operations per second (default: unlimited) - can also\n" + " be specified as the \"target\" property using -p"); System.out.println(" -load: run the loading phase of the workload"); System.out.println(" -t: run the transactions phase of the workload (default)"); System.out.println(" -db dbname: specify the name of the DB to use (default: com.yahoo.ycsb.BasicDB) - \n" + " can also be specified as the \"db\" property using -p"); System.out.println(" -P propertyfile: load properties from the given file. Multiple files can"); System.out.println(" be specified, and will be processed in the order specified"); System.out.println(" -p name=value: specify a property to be passed to the DB and workloads;"); System.out.println(" multiple properties can be specified, and override any"); System.out.println(" values in the propertyfile"); System.out.println(" -s: show status during run (default: no status)"); System.out.println(" -l label: use label for status (e.g. to label one experiment out of a whole batch)"); System.out.println(""); System.out.println("Required properties:"); System.out.println(" " + WORKLOAD_PROPERTY + ": the name of the workload class to use (e.g. " + "com.yahoo.ycsb.workloads.CoreWorkload)"); System.out.println(""); System.out.println("To run the transaction phase from multiple servers, start a separate client on each."); System.out.println("To run the load phase from multiple servers, start a separate client on each; additionally,"); System.out.println("use the \"insertcount\" and \"insertstart\" properties to divide up the records " + "to be inserted"); } public static boolean checkRequiredProperties(Properties props) { if (props.getProperty(WORKLOAD_PROPERTY) == null) { System.out.println("Missing property: " + WORKLOAD_PROPERTY); return false; } return true; } /** * Exports the measurements to either sysout or a file using the exporter * loaded from conf. * * @throws IOException Either failed to write to output stream or failed to close it. 
*/ private static void exportMeasurements(Properties props, int opcount, long runtime) throws IOException { MeasurementsExporter exporter = null; try { // if no destination file is provided the results will be written to stdout OutputStream out; String exportFile = props.getProperty(EXPORT_FILE_PROPERTY); if (exportFile == null) { out = System.out; } else { out = new FileOutputStream(exportFile); } // if no exporter is provided the default text one will be used String exporterStr = props.getProperty(EXPORTER_PROPERTY, "com.yahoo.ycsb.measurements.exporter.TextMeasurementsExporter"); try { exporter = (MeasurementsExporter) Class.forName(exporterStr).getConstructor(OutputStream.class) .newInstance(out); } catch (Exception e) { System.err.println("Could not find exporter " + exporterStr + ", will use default text reporter."); e.printStackTrace(); exporter = new TextMeasurementsExporter(out); } exporter.write("OVERALL", "RunTime(ms)", runtime); double throughput = 1000.0 * (opcount) / (runtime); exporter.write("OVERALL", "Throughput(ops/sec)", throughput); final Map gcs = Utils.getGCStatst(); long totalGCCount = 0; long totalGCTime = 0; for (final Entry entry : gcs.entrySet()) { exporter.write("TOTAL_GCS_" + entry.getKey(), "Count", entry.getValue()[0]); exporter.write("TOTAL_GC_TIME_" + entry.getKey(), "Time(ms)", entry.getValue()[1]); exporter.write("TOTAL_GC_TIME_%_" + entry.getKey(), "Time(%)", ((double) entry.getValue()[1] / runtime) * (double) 100); totalGCCount += entry.getValue()[0]; totalGCTime += entry.getValue()[1]; } exporter.write("TOTAL_GCs", "Count", totalGCCount); exporter.write("TOTAL_GC_TIME", "Time(ms)", totalGCTime); exporter.write("TOTAL_GC_TIME_%", "Time(%)", ((double) totalGCTime / runtime) * (double) 100); if (statusthread != null && statusthread.trackJVMStats()) { exporter.write("MAX_MEM_USED", "MBs", statusthread.getMaxUsedMem()); exporter.write("MIN_MEM_USED", "MBs", statusthread.getMinUsedMem()); exporter.write("MAX_THREADS", "Count", statusthread.getMaxThreads()); exporter.write("MIN_THREADS", "Count", statusthread.getMinThreads()); exporter.write("MAX_SYS_LOAD_AVG", "Load", statusthread.getMaxLoadAvg()); exporter.write("MIN_SYS_LOAD_AVG", "Load", statusthread.getMinLoadAvg()); } Measurements.getMeasurements().exportMeasurements(exporter); } finally { if (exporter != null) { exporter.close(); } } } @SuppressWarnings("unchecked") public static void main(String[] args) { Properties props = parseArguments(args); boolean status = Boolean.valueOf(props.getProperty(STATUS_PROPERTY, String.valueOf(false))); String label = props.getProperty(LABEL_PROPERTY, ""); long maxExecutionTime = Integer.parseInt(props.getProperty(MAX_EXECUTION_TIME, "0")); //get number of threads, target and db int threadcount = Integer.parseInt(props.getProperty(THREAD_COUNT_PROPERTY, "1")); String dbname = props.getProperty(DB_PROPERTY, "com.yahoo.ycsb.BasicDB"); int target = Integer.parseInt(props.getProperty(TARGET_PROPERTY, "0")); //compute the target throughput double targetperthreadperms = -1; if (target > 0) { double targetperthread = ((double) target) / ((double) threadcount); targetperthreadperms = targetperthread / 1000.0; } Thread warningthread = setupWarningThread(); warningthread.start(); Measurements.setProperties(props); Workload workload = getWorkload(props); final Tracer tracer = getTracer(props, workload); initWorkload(props, warningthread, workload, tracer); System.err.println("Starting test."); final CountDownLatch completeLatch = new CountDownLatch(threadcount); final List 
clients = initDb(dbname, props, threadcount, targetperthreadperms, workload, tracer, completeLatch); if (status) { boolean standardstatus = false; if (props.getProperty(Measurements.MEASUREMENT_TYPE_PROPERTY, "").compareTo("timeseries") == 0) { standardstatus = true; } int statusIntervalSeconds = Integer.parseInt(props.getProperty("status.interval", "10")); boolean trackJVMStats = props.getProperty(Measurements.MEASUREMENT_TRACK_JVM_PROPERTY, Measurements.MEASUREMENT_TRACK_JVM_PROPERTY_DEFAULT).equals("true"); statusthread = new StatusThread(completeLatch, clients, label, standardstatus, statusIntervalSeconds, trackJVMStats); statusthread.start(); } Thread terminator = null; long st; long en; int opsDone; try (final TraceScope span = tracer.newScope(CLIENT_WORKLOAD_SPAN)) { final Map threads = new HashMap<>(threadcount); for (ClientThread client : clients) { threads.put(new Thread(tracer.wrap(client, "ClientThread")), client); } st = System.currentTimeMillis(); for (Thread t : threads.keySet()) { t.start(); } if (maxExecutionTime > 0) { terminator = new TerminatorThread(maxExecutionTime, threads.keySet(), workload); terminator.start(); } opsDone = 0; for (Map.Entry entry : threads.entrySet()) { try { entry.getKey().join(); opsDone += entry.getValue().getOpsDone(); } catch (InterruptedException ignored) { // ignored } } en = System.currentTimeMillis(); } try { try (final TraceScope span = tracer.newScope(CLIENT_CLEANUP_SPAN)) { if (terminator != null && !terminator.isInterrupted()) { terminator.interrupt(); } if (status) { // wake up status thread if it's asleep statusthread.interrupt(); // at this point we assume all the monitored threads are already gone as per above join loop. try { statusthread.join(); } catch (InterruptedException ignored) { // ignored } } workload.cleanup(); } } catch (WorkloadException e) { e.printStackTrace(); e.printStackTrace(System.out); System.exit(0); } try { try (final TraceScope span = tracer.newScope(CLIENT_EXPORT_MEASUREMENTS_SPAN)) { exportMeasurements(props, opsDone, en - st); } } catch (IOException e) { System.err.println("Could not export measurements, error: " + e.getMessage()); e.printStackTrace(); System.exit(-1); } System.exit(0); } private static List initDb(String dbname, Properties props, int threadcount, double targetperthreadperms, Workload workload, Tracer tracer, CountDownLatch completeLatch) { boolean initFailed = false; boolean dotransactions = Boolean.valueOf(props.getProperty(DO_TRANSACTIONS_PROPERTY, String.valueOf(true))); final List clients = new ArrayList<>(threadcount); try (final TraceScope span = tracer.newScope(CLIENT_INIT_SPAN)) { int opcount; if (dotransactions) { opcount = Integer.parseInt(props.getProperty(OPERATION_COUNT_PROPERTY, "0")); } else { if (props.containsKey(INSERT_COUNT_PROPERTY)) { opcount = Integer.parseInt(props.getProperty(INSERT_COUNT_PROPERTY, "0")); } else { opcount = Integer.parseInt(props.getProperty(RECORD_COUNT_PROPERTY, DEFAULT_RECORD_COUNT)); } } for (int threadid = 0; threadid < threadcount; threadid++) { - DB db; + AsyncDB db; try { db = DBFactory.newDB(dbname, props, tracer); } catch (UnknownDBException e) { System.out.println("Unknown DB " + dbname); initFailed = true; break; } int threadopcount = opcount / threadcount; // ensure correct number of operations, in case opcount is not a multiple of threadcount if (threadid < opcount % threadcount) { ++threadopcount; } ClientThread t = new ClientThread(db, dotransactions, workload, props, threadopcount, targetperthreadperms, completeLatch); 
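The per-thread split of opcount computed just above can be checked with a small standalone example (the counts are arbitrary):

```java
final class OpSplitExample {
  public static void main(String[] args) {
    int opcount = 1000, threadcount = 3; // arbitrary figures
    for (int threadid = 0; threadid < threadcount; threadid++) {
      int threadopcount = opcount / threadcount;      // 333
      if (threadid < opcount % threadcount) {
        ++threadopcount;                              // thread 0 picks up the remainder: 334
      }
      System.out.println("thread " + threadid + " -> " + threadopcount + " ops");
    }
    // Prints 334, 333, 333: the three threads together cover all 1000 operations.
  }
}
```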
t.setThreadId(threadid); t.setThreadCount(threadcount); clients.add(t); } if (initFailed) { System.err.println("Error initializing datastore bindings."); System.exit(0); } } return clients; } private static Tracer getTracer(Properties props, Workload workload) { return new Tracer.Builder("YCSB " + workload.getClass().getSimpleName()) .conf(getHTraceConfiguration(props)) .build(); } private static void initWorkload(Properties props, Thread warningthread, Workload workload, Tracer tracer) { try { try (final TraceScope span = tracer.newScope(CLIENT_WORKLOAD_INIT_SPAN)) { workload.init(props); warningthread.interrupt(); } } catch (WorkloadException e) { e.printStackTrace(); e.printStackTrace(System.out); System.exit(0); } } private static HTraceConfiguration getHTraceConfiguration(Properties props) { final Map filteredProperties = new HashMap<>(); for (String key : props.stringPropertyNames()) { if (key.startsWith(HTRACE_KEY_PREFIX)) { filteredProperties.put(key.substring(HTRACE_KEY_PREFIX.length()), props.getProperty(key)); } } return HTraceConfiguration.fromMap(filteredProperties); } private static Thread setupWarningThread() { //show a warning message that creating the workload is taking a while //but only do so if it is taking longer than 2 seconds //(showing the message right away if the setup wasn't taking very long was confusing people) return new Thread() { @Override public void run() { try { sleep(2000); } catch (InterruptedException e) { return; } System.err.println(" (might take a few minutes for large data sets)"); } }; } private static Workload getWorkload(Properties props) { ClassLoader classLoader = Client.class.getClassLoader(); try { Properties projectProp = new Properties(); projectProp.load(classLoader.getResourceAsStream("project.properties")); System.err.println("YCSB Client " + projectProp.getProperty("version")); } catch (IOException e) { System.err.println("Unable to retrieve client version."); } System.err.println(); System.err.println("Loading workload..."); try { Class workloadclass = classLoader.loadClass(props.getProperty(WORKLOAD_PROPERTY)); return (Workload) workloadclass.newInstance(); } catch (Exception e) { e.printStackTrace(); e.printStackTrace(System.out); System.exit(0); } return null; } private static Properties parseArguments(String[] args) { Properties props = new Properties(); System.err.print("Command line:"); for (String arg : args) { System.err.print(" " + arg); } System.err.println(); Properties fileprops = new Properties(); int argindex = 0; if (args.length == 0) { usageMessage(); System.out.println("At least one argument specifying a workload is required."); System.exit(0); } while (args[argindex].startsWith("-")) { if (args[argindex].compareTo("-threads") == 0) { argindex++; if (argindex >= args.length) { usageMessage(); System.out.println("Missing argument value for -threads."); System.exit(0); } int tcount = Integer.parseInt(args[argindex]); props.setProperty(THREAD_COUNT_PROPERTY, String.valueOf(tcount)); argindex++; } else if (args[argindex].compareTo("-target") == 0) { argindex++; if (argindex >= args.length) { usageMessage(); System.out.println("Missing argument value for -target."); System.exit(0); } int ttarget = Integer.parseInt(args[argindex]); props.setProperty(TARGET_PROPERTY, String.valueOf(ttarget)); argindex++; } else if (args[argindex].compareTo("-load") == 0) { props.setProperty(DO_TRANSACTIONS_PROPERTY, String.valueOf(false)); argindex++; } else if (args[argindex].compareTo("-t") == 0) { 
props.setProperty(DO_TRANSACTIONS_PROPERTY, String.valueOf(true)); argindex++; } else if (args[argindex].compareTo("-s") == 0) { props.setProperty(STATUS_PROPERTY, String.valueOf(true)); argindex++; } else if (args[argindex].compareTo("-db") == 0) { argindex++; if (argindex >= args.length) { usageMessage(); System.out.println("Missing argument value for -db."); System.exit(0); } props.setProperty(DB_PROPERTY, args[argindex]); argindex++; } else if (args[argindex].compareTo("-l") == 0) { argindex++; if (argindex >= args.length) { usageMessage(); System.out.println("Missing argument value for -l."); System.exit(0); } props.setProperty(LABEL_PROPERTY, args[argindex]); argindex++; } else if (args[argindex].compareTo("-P") == 0) { argindex++; if (argindex >= args.length) { usageMessage(); System.out.println("Missing argument value for -P."); System.exit(0); } String propfile = args[argindex]; argindex++; Properties myfileprops = new Properties(); try { myfileprops.load(new FileInputStream(propfile)); } catch (IOException e) { System.out.println("Unable to open the properties file " + propfile); System.out.println(e.getMessage()); System.exit(0); } //Issue #5 - remove call to stringPropertyNames to make compilable under Java 1.5 for (Enumeration e = myfileprops.propertyNames(); e.hasMoreElements();) { String prop = (String) e.nextElement(); fileprops.setProperty(prop, myfileprops.getProperty(prop)); } } else if (args[argindex].compareTo("-p") == 0) { argindex++; if (argindex >= args.length) { usageMessage(); System.out.println("Missing argument value for -p"); System.exit(0); } int eq = args[argindex].indexOf('='); if (eq < 0) { usageMessage(); System.out.println("Argument '-p' expected to be in key=value format (e.g., -p operationcount=99999)"); System.exit(0); } String name = args[argindex].substring(0, eq); String value = args[argindex].substring(eq + 1); props.put(name, value); argindex++; } else { usageMessage(); System.out.println("Unknown option " + args[argindex]); System.exit(0); } if (argindex >= args.length) { break; } } if (argindex != args.length) { usageMessage(); if (argindex < args.length) { System.out.println("An argument value without corresponding argument specifier (e.g., -p, -s) was found. " + "We expected an argument specifier and instead found " + args[argindex]); } else { System.out.println("An argument specifier without corresponding value was found at the end of the supplied " + "command line arguments."); } System.exit(0); } //overwrite file properties with properties from the command line //Issue #5 - remove call to stringPropertyNames to make compilable under Java 1.5 for (Enumeration e = props.propertyNames(); e.hasMoreElements();) { String prop = (String) e.nextElement(); fileprops.setProperty(prop, props.getProperty(prop)); } props = fileprops; if (!checkRequiredProperties(props)) { System.out.println("Failed check required properties."); System.exit(0); } return props; } } diff --git a/core/src/main/java/com/yahoo/ycsb/DBFactory.java b/core/src/main/java/com/yahoo/ycsb/DBFactory.java index 2529e6ce..12043635 100644 --- a/core/src/main/java/com/yahoo/ycsb/DBFactory.java +++ b/core/src/main/java/com/yahoo/ycsb/DBFactory.java @@ -1,51 +1,55 @@ /** * Copyright (c) 2010-2016 Yahoo! Inc., 2017 YCSB contributors All rights reserved. *

* Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb; import org.apache.htrace.core.Tracer; import java.util.Properties; /** * Creates a DB layer by dynamically classloading the specified DB class. */ public final class DBFactory { private DBFactory() { // not used } - public static DB newDB(String dbname, Properties properties, final Tracer tracer) throws UnknownDBException { + public static AsyncDB newDB(String dbname, Properties properties, final Tracer tracer) throws UnknownDBException { ClassLoader classLoader = DBFactory.class.getClassLoader(); - DB ret; + AsyncDB ret; try { Class dbclass = classLoader.loadClass(dbname); - ret = (DB) dbclass.newInstance(); + if (DB.class.isAssignableFrom(dbclass)) { + ret = new AsyncDBAdapter((DB) dbclass.getDeclaredConstructor().newInstance()); + } else { + ret = (AsyncDB) dbclass.getDeclaredConstructor().newInstance(); + } } catch (Exception e) { e.printStackTrace(); return null; } ret.setProperties(properties); return new DBWrapper(ret, tracer); } } diff --git a/core/src/main/java/com/yahoo/ycsb/DBWrapper.java b/core/src/main/java/com/yahoo/ycsb/DBWrapper.java index d5a238b6..68e66d2d 100644 --- a/core/src/main/java/com/yahoo/ycsb/DBWrapper.java +++ b/core/src/main/java/com/yahoo/ycsb/DBWrapper.java @@ -1,247 +1,294 @@ /** * Copyright (c) 2010 Yahoo! Inc., 2016-2017 YCSB contributors. All rights reserved. *

* Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb; -import java.util.Map; import com.yahoo.ycsb.measurements.Measurements; import org.apache.htrace.core.TraceScope; import org.apache.htrace.core.Tracer; import java.util.*; +import java.util.concurrent.CompletableFuture; /** * Wrapper around a "real" DB that measures latencies and counts return codes. * Also reports latency separately between OK and failed operations. + * Waits for async calls to finish during cleanup() operation. */ -public class DBWrapper extends DB { - private final DB db; +public class DBWrapper extends AsyncDB { + private final AsyncDB db; private final Measurements measurements; private final Tracer tracer; private boolean reportLatencyForEachError = false; private Set latencyTrackedErrors = new HashSet(); + private final Set> queries = new HashSet<>(); private static final String REPORT_LATENCY_FOR_EACH_ERROR_PROPERTY = "reportlatencyforeacherror"; private static final String REPORT_LATENCY_FOR_EACH_ERROR_PROPERTY_DEFAULT = "false"; private static final String LATENCY_TRACKED_ERRORS_PROPERTY = "latencytrackederrors"; private final String scopeStringCleanup; private final String scopeStringDelete; private final String scopeStringInit; private final String scopeStringInsert; private final String scopeStringRead; private final String scopeStringScan; private final String scopeStringUpdate; - public DBWrapper(final DB db, final Tracer tracer) { + public DBWrapper(final AsyncDB db, final Tracer tracer) { this.db = db; measurements = Measurements.getMeasurements(); this.tracer = tracer; final String simple = db.getClass().getSimpleName(); scopeStringCleanup = simple + "#cleanup"; scopeStringDelete = simple + "#delete"; scopeStringInit = simple + "#init"; scopeStringInsert = simple + "#insert"; scopeStringRead = simple + "#read"; scopeStringScan = simple + "#scan"; scopeStringUpdate = simple + "#update"; } /** * Set the properties for this DB. */ public void setProperties(Properties p) { db.setProperties(p); } /** * Get the set of properties for this DB. */ public Properties getProperties() { return db.getProperties(); } /** * Initialize any state for this DB. * Called once per DB instance; there is one DB instance per client thread. */ public void init() throws DBException { try (final TraceScope span = tracer.newScope(scopeStringInit)) { db.init(); this.reportLatencyForEachError = Boolean.parseBoolean(getProperties(). getProperty(REPORT_LATENCY_FOR_EACH_ERROR_PROPERTY, REPORT_LATENCY_FOR_EACH_ERROR_PROPERTY_DEFAULT)); if (!reportLatencyForEachError) { String latencyTrackedErrorsProperty = getProperties().getProperty(LATENCY_TRACKED_ERRORS_PROPERTY, null); if (latencyTrackedErrorsProperty != null) { this.latencyTrackedErrors = new HashSet(Arrays.asList( latencyTrackedErrorsProperty.split(","))); } } System.err.println("DBWrapper: report latency for each error is " + this.reportLatencyForEachError + " and specific error codes to track" + " for latency are: " + this.latencyTrackedErrors.toString()); } } /** * Cleanup any state for this DB. * Called once per DB instance; there is one DB instance per client thread. 
*/ public void cleanup() throws DBException { try (final TraceScope span = tracer.newScope(scopeStringCleanup)) { + // Wait for any asynchronous operations to finish. + System.err.println("DBWrapper: Waiting 100ms for all queries to finish."); + CompletableFuture.allOf(queries.toArray(new CompletableFuture[0])).join(); + long ist = measurements.getIntendedStartTimeNs(); long st = System.nanoTime(); db.cleanup(); long en = System.nanoTime(); measure("CLEANUP", Status.OK, ist, st, en); } } /** * Read a record from the database. Each field/value pair from the result * will be stored in a HashMap. * * @param table The name of the table * @param key The record key of the record to read. * @param fields The list of fields to read, or null for all of them * @param result A HashMap of field/value pairs for the result * @return The result of the operation. */ - public Status read(String table, String key, Set fields, - Map result) { + public CompletableFuture read(String table, String key, Set fields, + Map result) { try (final TraceScope span = tracer.newScope(scopeStringRead)) { long ist = measurements.getIntendedStartTimeNs(); long st = System.nanoTime(); - Status res = db.read(table, key, fields, result); - long en = System.nanoTime(); - measure("READ", res, ist, st, en); - measurements.reportStatus("READ", res); - return res; + + CompletableFuture readFuture = db.read(table, key, fields, result).whenComplete((res, ex) -> { + if (ex != null) { + System.err.println("READ failed due to exception: " + ex); + res = Status.ERROR; + } + long en = System.nanoTime(); + measure("READ", res, ist, st, en); + measurements.reportStatus("READ", res); + }); + + queries.add(readFuture); + return readFuture; } } /** * Perform a range scan for a set of records in the database. * Each field/value pair from the result will be stored in a HashMap. * * @param table The name of the table * @param startkey The record key of the first record to read. * @param recordcount The number of records to read * @param fields The list of fields to read, or null for all of them * @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record * @return The result of the operation. 
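Callers get the measurement behaviour above regardless of whether they block on the returned future or chain onto it; both styles are sketched below. The helper class and key are invented for illustration; "usertable" is simply the default table name, and passing null for the field set requests all fields, as the javadoc states.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import com.yahoo.ycsb.AsyncDB;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.Status;

final class AsyncReadExamples {
  private AsyncReadExamples() { }

  /** Blocking style: equivalent to what the synchronous view does internally. */
  static Status readBlocking(AsyncDB db, String key) {
    Map<String, ByteIterator> row = new HashMap<>();
    return db.read("usertable", key, null, row).join();
  }

  /** Non-blocking style: chain a continuation and let the future complete later. */
  static CompletableFuture<Void> readAndLog(AsyncDB db, String key) {
    Map<String, ByteIterator> row = new HashMap<>();
    return db.read("usertable", key, null, row)
        .thenAccept(status -> System.out.println(key + " -> " + status));
  }
}
```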
*/ - public Status scan(String table, String startkey, int recordcount, - Set fields, Vector> result) { + public CompletableFuture scan(String table, String startkey, int recordcount, + Set fields, Vector> result) { try (final TraceScope span = tracer.newScope(scopeStringScan)) { long ist = measurements.getIntendedStartTimeNs(); long st = System.nanoTime(); - Status res = db.scan(table, startkey, recordcount, fields, result); - long en = System.nanoTime(); - measure("SCAN", res, ist, st, en); - measurements.reportStatus("SCAN", res); - return res; + + CompletableFuture scanFuture = db.scan(table, startkey, recordcount, fields, result) + .whenComplete((res, ex) -> { + if (ex != null) { + System.err.println("SCAN failed due to exception: " + ex); + res = Status.ERROR; + } + long en = System.nanoTime(); + measure("SCAN", res, ist, st, en); + measurements.reportStatus("SCAN", res); + }); + + queries.add(scanFuture); + return scanFuture; } } private void measure(String op, Status result, long intendedStartTimeNanos, long startTimeNanos, long endTimeNanos) { String measurementName = op; if (result == null || !result.isOk()) { if (this.reportLatencyForEachError || this.latencyTrackedErrors.contains(result.getName())) { measurementName = op + "-" + result.getName(); } else { measurementName = op + "-FAILED"; } } measurements.measure(measurementName, (int) ((endTimeNanos - startTimeNanos) / 1000)); measurements.measureIntended(measurementName, (int) ((endTimeNanos - intendedStartTimeNanos) / 1000)); } /** * Update a record in the database. Any field/value pairs in the specified values HashMap will be written into the * record with the specified record key, overwriting any existing values with the same field name. * * @param table The name of the table * @param key The record key of the record to write. * @param values A HashMap of field/value pairs to update in the record * @return The result of the operation. */ - public Status update(String table, String key, - Map values) { + public CompletableFuture update(String table, String key, + Map values) { try (final TraceScope span = tracer.newScope(scopeStringUpdate)) { long ist = measurements.getIntendedStartTimeNs(); long st = System.nanoTime(); - Status res = db.update(table, key, values); - long en = System.nanoTime(); - measure("UPDATE", res, ist, st, en); - measurements.reportStatus("UPDATE", res); - return res; + + CompletableFuture updateFuture = db.update(table, key, values).whenComplete((res, ex) -> { + if (ex != null) { + System.err.println("UPDATE failed due to exception: " + ex); + res = Status.ERROR; + } + long en = System.nanoTime(); + measure("UPDATE", res, ist, st, en); + measurements.reportStatus("UPDATE", res); + }); + + queries.add(updateFuture); + return updateFuture; } } /** * Insert a record in the database. Any field/value pairs in the specified * values HashMap will be written into the record with the specified * record key. * * @param table The name of the table * @param key The record key of the record to insert. * @param values A HashMap of field/value pairs to insert in the record * @return The result of the operation. 
*/ - public Status insert(String table, String key, - Map values) { + public CompletableFuture insert(String table, String key, + Map values) { try (final TraceScope span = tracer.newScope(scopeStringInsert)) { long ist = measurements.getIntendedStartTimeNs(); long st = System.nanoTime(); - Status res = db.insert(table, key, values); - long en = System.nanoTime(); - measure("INSERT", res, ist, st, en); - measurements.reportStatus("INSERT", res); - return res; + + CompletableFuture insertFuture = db.insert(table, key, values).whenComplete((res, ex) -> { + if (ex != null) { + System.err.println("INSERT failed due to exception: " + ex); + res = Status.ERROR; + } + long en = System.nanoTime(); + measure("INSERT", res, ist, st, en); + measurements.reportStatus("INSERT", res); + }); + + queries.add(insertFuture); + return insertFuture; } } /** * Delete a record from the database. * * @param table The name of the table * @param key The record key of the record to delete. * @return The result of the operation. */ - public Status delete(String table, String key) { + public CompletableFuture delete(String table, String key) { try (final TraceScope span = tracer.newScope(scopeStringDelete)) { long ist = measurements.getIntendedStartTimeNs(); long st = System.nanoTime(); - Status res = db.delete(table, key); - long en = System.nanoTime(); - measure("DELETE", res, ist, st, en); - measurements.reportStatus("DELETE", res); - return res; + + CompletableFuture deleteFuture = db.delete(table, key).whenComplete((res, ex) -> { + if (ex != null) { + System.err.println("DELETE failed due to exception: " + ex); + res = Status.ERROR; + } + long en = System.nanoTime(); + measure("DELETE", res, ist, st, en); + measurements.reportStatus("DELETE", res); + }); + + queries.add(deleteFuture); + return deleteFuture; } } } diff --git a/core/src/main/java/com/yahoo/ycsb/Workload.java b/core/src/main/java/com/yahoo/ycsb/Workload.java index 11aae672..289c2b23 100644 --- a/core/src/main/java/com/yahoo/ycsb/Workload.java +++ b/core/src/main/java/com/yahoo/ycsb/Workload.java @@ -1,122 +1,122 @@ /** * Copyright (c) 2010-2016 Yahoo! Inc., 2017 YCSB contributors All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb; import java.util.concurrent.atomic.AtomicBoolean; import java.util.Properties; /** * One experiment scenario. One object of this type will * be instantiated and shared among all client threads. This class * should be constructed using a no-argument constructor, so we can * load it dynamically. Any argument-based initialization should be * done by init(). * * If you extend this class, you should support the "insertstart" property. This * allows the Client to proceed from multiple clients on different machines, in case * the client is the bottleneck. For example, if we want to load 1 million records from * 2 machines, the first machine should have insertstart=0 and the second insertstart=500000. 
Additionally, * the "insertcount" property, which is interpreted by Client, can be used to tell each instance of the * client how many inserts to do. In the example above, both clients should have insertcount=500000. */ public abstract class Workload { public static final String INSERT_START_PROPERTY = "insertstart"; public static final String INSERT_COUNT_PROPERTY = "insertcount"; public static final String INSERT_START_PROPERTY_DEFAULT = "0"; private volatile AtomicBoolean stopRequested = new AtomicBoolean(false); /** Operations available for a database. */ public enum Operation { READ, UPDATE, INSERT, SCAN, DELETE } /** * Initialize the scenario. Create any generators and other shared objects here. * Called once, in the main client thread, before any operations are started. */ public void init(Properties p) throws WorkloadException { } /** * Initialize any state for a particular client thread. Since the scenario object * will be shared among all threads, this is the place to create any state that is specific * to one thread. To be clear, this means the returned object should be created anew on each * call to initThread(); do not return the same object multiple times. * The returned object will be passed to invocations of doInsert() and doTransaction() * for this thread. There should be no side effects from this call; all state should be encapsulated * in the returned object. If you have no state to retain for this thread, return null. (But if you have * no state to retain for this thread, probably you don't need to override initThread().) * * @return false if the workload knows it is done for this thread. Client will terminate the thread. * Return true otherwise. Return true for workloads that rely on operationcount. For workloads that read * traces from a file, return true when there are more to do, false when you are done. */ public Object initThread(Properties p, int mythreadid, int threadcount) throws WorkloadException { return null; } /** * Cleanup the scenario. Called once, in the main client thread, after all operations have completed. */ public void cleanup() throws WorkloadException { } /** * Do one insert operation. Because it will be called concurrently from multiple client threads, this * function must be thread safe. However, avoid synchronized, or the threads will block waiting for each * other, and it will be difficult to reach the target throughput. Ideally, this function would have no side * effects other than DB operations and mutations on threadstate. Mutations to threadstate do not need to be * synchronized, since each thread has its own threadstate instance. */ - public abstract boolean doInsert(DB db, Object threadstate); + public abstract boolean doInsert(AsyncDB db, Object threadstate); /** * Do one transaction operation. Because it will be called concurrently from multiple client threads, this * function must be thread safe. However, avoid synchronized, or the threads will block waiting for each * other, and it will be difficult to reach the target throughput. Ideally, this function would have no side * effects other than DB operations and mutations on threadstate. Mutations to threadstate do not need to be * synchronized, since each thread has its own threadstate instance. * * @return false if the workload knows it is done for this thread. Client will terminate the thread. * Return true otherwise. Return true for workloads that rely on operationcount. For workloads that read * traces from a file, return true when there are more to do, false when you are done. 
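Because doInsert and doTransaction now receive an AsyncDB, a workload has two straightforward options: keep its old synchronous code by going through syncView(), or use the futures directly and block only when the result is needed. A minimal sketch follows; the workload class, table and key are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

import com.yahoo.ycsb.*;

/** Illustrative only; not part of this patch. */
public class ExampleWorkload extends Workload {
  @Override
  public boolean doInsert(AsyncDB db, Object threadstate) {
    Map<String, ByteIterator> values = new HashMap<>();
    values.put("field0", new StringByteIterator("hello"));
    // Option 1: stay synchronous via the adapter view.
    return db.syncView().insert("usertable", "user0", values).isOk();
  }

  @Override
  public boolean doTransaction(AsyncDB db, Object threadstate) {
    // Option 2: use the future directly and block only when the result is needed.
    Map<String, ByteIterator> result = new HashMap<>();
    Status status = db.read("usertable", "user0", null, result).join();
    return status.isOk();
  }
}
```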
*/ - public abstract boolean doTransaction(DB db, Object threadstate); + public abstract boolean doTransaction(AsyncDB db, Object threadstate); /** * Allows scheduling a request to stop the workload. */ public void requestStop() { stopRequested.set(true); } /** * Check the status of the stop request flag. * @return true if stop was requested, false otherwise. */ public boolean isStopRequested() { return stopRequested.get(); } } diff --git a/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java b/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java index d3fd84b3..564d3ea3 100644 --- a/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java +++ b/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java @@ -1,865 +1,913 @@ /** * Copyright (c) 2010 Yahoo! Inc., Copyright (c) 2016-2017 YCSB contributors. All rights reserved. *

* Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb.workloads; import com.yahoo.ycsb.*; import com.yahoo.ycsb.generator.*; import com.yahoo.ycsb.generator.UniformLongGenerator; import com.yahoo.ycsb.measurements.Measurements; import java.io.IOException; import java.util.*; +import java.util.concurrent.CompletableFuture; /** * The core benchmark scenario. Represents a set of clients doing simple CRUD operations. The * relative proportion of different kinds of operations, and other properties of the workload, * are controlled by parameters specified at runtime. *

* Properties to control the client: *

*/ public class CoreWorkload extends Workload { /** * The name of the database table to run queries against. */ public static final String TABLENAME_PROPERTY = "table"; /** * The default name of the database table to run queries against. */ public static final String TABLENAME_PROPERTY_DEFAULT = "usertable"; protected String table; /** * The name of the property for the number of fields in a record. */ public static final String FIELD_COUNT_PROPERTY = "fieldcount"; /** * Default number of fields in a record. */ public static final String FIELD_COUNT_PROPERTY_DEFAULT = "10"; private List fieldnames; /** * The name of the property for the field length distribution. Options are "uniform", "zipfian" * (favouring short records), "constant", and "histogram". *

* If "uniform", "zipfian" or "constant", the maximum field length will be that specified by the * fieldlength property. If "histogram", then the histogram will be read from the filename * specified in the "fieldlengthhistogram" property. */ public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY = "fieldlengthdistribution"; /** * The default field length distribution. */ public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT = "constant"; /** * The name of the property for the length of a field in bytes. */ public static final String FIELD_LENGTH_PROPERTY = "fieldlength"; /** * The default maximum length of a field in bytes. */ public static final String FIELD_LENGTH_PROPERTY_DEFAULT = "100"; /** * The name of the property for the minimum length of a field in bytes. */ public static final String MIN_FIELD_LENGTH_PROPERTY = "minfieldlength"; /** * The default minimum length of a field in bytes. */ public static final String MIN_FIELD_LENGTH_PROPERTY_DEFAULT = "1"; /** * The name of a property that specifies the filename containing the field length histogram (only * used if fieldlengthdistribution is "histogram"). */ public static final String FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY = "fieldlengthhistogram"; /** * The default filename containing a field length histogram. */ public static final String FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY_DEFAULT = "hist.txt"; /** * Generator object that produces field lengths. The value of this depends on the properties that * start with "FIELD_LENGTH_". */ protected NumberGenerator fieldlengthgenerator; /** * The name of the property for deciding whether to read one field (false) or all fields (true) of * a record. */ public static final String READ_ALL_FIELDS_PROPERTY = "readallfields"; /** * The default value for the readallfields property. */ public static final String READ_ALL_FIELDS_PROPERTY_DEFAULT = "true"; protected boolean readallfields; /** * The name of the property for deciding whether to write one field (false) or all fields (true) * of a record. */ public static final String WRITE_ALL_FIELDS_PROPERTY = "writeallfields"; /** * The default value for the writeallfields property. */ public static final String WRITE_ALL_FIELDS_PROPERTY_DEFAULT = "false"; protected boolean writeallfields; /** * The name of the property for deciding whether to check all returned * data against the formation template to ensure data integrity. */ public static final String DATA_INTEGRITY_PROPERTY = "dataintegrity"; /** * The default value for the dataintegrity property. */ public static final String DATA_INTEGRITY_PROPERTY_DEFAULT = "false"; /** * Set to true if want to check correctness of reads. Must also * be set to true during loading phase to function. */ private boolean dataintegrity; /** * The name of the property for the proportion of transactions that are reads. */ public static final String READ_PROPORTION_PROPERTY = "readproportion"; /** * The default proportion of transactions that are reads. */ public static final String READ_PROPORTION_PROPERTY_DEFAULT = "0.95"; /** * The name of the property for the proportion of transactions that are updates. */ public static final String UPDATE_PROPORTION_PROPERTY = "updateproportion"; /** * The default proportion of transactions that are updates. */ public static final String UPDATE_PROPORTION_PROPERTY_DEFAULT = "0.05"; /** * The name of the property for the proportion of transactions that are inserts. 
*/ public static final String INSERT_PROPORTION_PROPERTY = "insertproportion"; /** * The default proportion of transactions that are inserts. */ public static final String INSERT_PROPORTION_PROPERTY_DEFAULT = "0.0"; /** * The name of the property for the proportion of transactions that are scans. */ public static final String SCAN_PROPORTION_PROPERTY = "scanproportion"; /** * The default proportion of transactions that are scans. */ public static final String SCAN_PROPORTION_PROPERTY_DEFAULT = "0.0"; /** * The name of the property for the proportion of transactions that are read-modify-write. */ public static final String READMODIFYWRITE_PROPORTION_PROPERTY = "readmodifywriteproportion"; /** * The default proportion of transactions that are scans. */ public static final String READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT = "0.0"; /** * The name of the property for the the distribution of requests across the keyspace. Options are * "uniform", "zipfian" and "latest" */ public static final String REQUEST_DISTRIBUTION_PROPERTY = "requestdistribution"; /** * The default distribution of requests across the keyspace. */ public static final String REQUEST_DISTRIBUTION_PROPERTY_DEFAULT = "uniform"; /** * The name of the property for adding zero padding to record numbers in order to match * string sort order. Controls the number of 0s to left pad with. */ public static final String ZERO_PADDING_PROPERTY = "zeropadding"; /** * The default zero padding value. Matches integer sort order */ public static final String ZERO_PADDING_PROPERTY_DEFAULT = "1"; /** * The name of the property for the min scan length (number of records). */ public static final String MIN_SCAN_LENGTH_PROPERTY = "minscanlength"; /** * The default min scan length. */ public static final String MIN_SCAN_LENGTH_PROPERTY_DEFAULT = "1"; /** * The name of the property for the max scan length (number of records). */ public static final String MAX_SCAN_LENGTH_PROPERTY = "maxscanlength"; /** * The default max scan length. */ public static final String MAX_SCAN_LENGTH_PROPERTY_DEFAULT = "1000"; /** * The name of the property for the scan length distribution. Options are "uniform" and "zipfian" * (favoring short scans) */ public static final String SCAN_LENGTH_DISTRIBUTION_PROPERTY = "scanlengthdistribution"; /** * The default max scan length. */ public static final String SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT = "uniform"; /** * The name of the property for the order to insert records. Options are "ordered" or "hashed" */ public static final String INSERT_ORDER_PROPERTY = "insertorder"; /** * Default insert order. */ public static final String INSERT_ORDER_PROPERTY_DEFAULT = "hashed"; /** * Percentage data items that constitute the hot set. */ public static final String HOTSPOT_DATA_FRACTION = "hotspotdatafraction"; /** * Default value of the size of the hot set. */ public static final String HOTSPOT_DATA_FRACTION_DEFAULT = "0.2"; /** * Percentage operations that access the hot set. */ public static final String HOTSPOT_OPN_FRACTION = "hotspotopnfraction"; /** * Default value of the percentage operations accessing the hot set. */ public static final String HOTSPOT_OPN_FRACTION_DEFAULT = "0.8"; /** * How many times to retry when insertion of a single item to a DB fails. */ public static final String INSERTION_RETRY_LIMIT = "core_workload_insertion_retry_limit"; public static final String INSERTION_RETRY_LIMIT_DEFAULT = "0"; /** * On average, how long to wait between the retries, in seconds. 
*/ public static final String INSERTION_RETRY_INTERVAL = "core_workload_insertion_retry_interval"; public static final String INSERTION_RETRY_INTERVAL_DEFAULT = "3"; + public static final String OPEN_LOOP_PROPERTY = "openloop"; + public static final String OPEN_LOOP_PROPERTY_DEFAULT = "false"; + + protected boolean openloop; + protected NumberGenerator keysequence; protected DiscreteGenerator operationchooser; protected NumberGenerator keychooser; protected NumberGenerator fieldchooser; protected AcknowledgedCounterGenerator transactioninsertkeysequence; protected NumberGenerator scanlength; protected boolean orderedinserts; protected long fieldcount; protected long recordcount; protected int zeropadding; protected int insertionRetryLimit; protected int insertionRetryInterval; private Measurements measurements = Measurements.getMeasurements(); protected static NumberGenerator getFieldLengthGenerator(Properties p) throws WorkloadException { NumberGenerator fieldlengthgenerator; String fieldlengthdistribution = p.getProperty( FIELD_LENGTH_DISTRIBUTION_PROPERTY, FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT); int fieldlength = Integer.parseInt(p.getProperty(FIELD_LENGTH_PROPERTY, FIELD_LENGTH_PROPERTY_DEFAULT)); int minfieldlength = Integer.parseInt(p.getProperty(MIN_FIELD_LENGTH_PROPERTY, MIN_FIELD_LENGTH_PROPERTY_DEFAULT)); String fieldlengthhistogram = p.getProperty( FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY, FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY_DEFAULT); if (fieldlengthdistribution.compareTo("constant") == 0) { fieldlengthgenerator = new ConstantIntegerGenerator(fieldlength); } else if (fieldlengthdistribution.compareTo("uniform") == 0) { fieldlengthgenerator = new UniformLongGenerator(minfieldlength, fieldlength); } else if (fieldlengthdistribution.compareTo("zipfian") == 0) { fieldlengthgenerator = new ZipfianGenerator(minfieldlength, fieldlength); } else if (fieldlengthdistribution.compareTo("histogram") == 0) { try { fieldlengthgenerator = new HistogramGenerator(fieldlengthhistogram); } catch (IOException e) { throw new WorkloadException( "Couldn't read field length histogram file: " + fieldlengthhistogram, e); } } else { throw new WorkloadException( "Unknown field length distribution \"" + fieldlengthdistribution + "\""); } return fieldlengthgenerator; } /** * Initialize the scenario. * Called once, in the main client thread, before any operations are started. 
*/ @Override public void init(Properties p) throws WorkloadException { table = p.getProperty(TABLENAME_PROPERTY, TABLENAME_PROPERTY_DEFAULT); fieldcount = Long.parseLong(p.getProperty(FIELD_COUNT_PROPERTY, FIELD_COUNT_PROPERTY_DEFAULT)); fieldnames = new ArrayList<>(); for (int i = 0; i < fieldcount; i++) { fieldnames.add("field" + i); } fieldlengthgenerator = CoreWorkload.getFieldLengthGenerator(p); recordcount = Long.parseLong(p.getProperty(Client.RECORD_COUNT_PROPERTY, Client.DEFAULT_RECORD_COUNT)); if (recordcount == 0) { recordcount = Integer.MAX_VALUE; } String requestdistrib = p.getProperty(REQUEST_DISTRIBUTION_PROPERTY, REQUEST_DISTRIBUTION_PROPERTY_DEFAULT); int minscanlength = Integer.parseInt(p.getProperty(MIN_SCAN_LENGTH_PROPERTY, MIN_SCAN_LENGTH_PROPERTY_DEFAULT)); int maxscanlength = Integer.parseInt(p.getProperty(MAX_SCAN_LENGTH_PROPERTY, MAX_SCAN_LENGTH_PROPERTY_DEFAULT)); String scanlengthdistrib = p.getProperty(SCAN_LENGTH_DISTRIBUTION_PROPERTY, SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT); long insertstart = Long.parseLong(p.getProperty(INSERT_START_PROPERTY, INSERT_START_PROPERTY_DEFAULT)); long insertcount= Integer.parseInt(p.getProperty(INSERT_COUNT_PROPERTY, String.valueOf(recordcount - insertstart))); // Confirm valid values for insertstart and insertcount in relation to recordcount if (recordcount < (insertstart + insertcount)) { System.err.println("Invalid combination of insertstart, insertcount and recordcount."); System.err.println("recordcount must be bigger than insertstart + insertcount."); System.exit(-1); } zeropadding = Integer.parseInt(p.getProperty(ZERO_PADDING_PROPERTY, ZERO_PADDING_PROPERTY_DEFAULT)); readallfields = Boolean.parseBoolean( p.getProperty(READ_ALL_FIELDS_PROPERTY, READ_ALL_FIELDS_PROPERTY_DEFAULT)); writeallfields = Boolean.parseBoolean( p.getProperty(WRITE_ALL_FIELDS_PROPERTY, WRITE_ALL_FIELDS_PROPERTY_DEFAULT)); dataintegrity = Boolean.parseBoolean( p.getProperty(DATA_INTEGRITY_PROPERTY, DATA_INTEGRITY_PROPERTY_DEFAULT)); // Confirm that fieldlengthgenerator returns a constant if data // integrity check requested. if (dataintegrity && !(p.getProperty( FIELD_LENGTH_DISTRIBUTION_PROPERTY, FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT)).equals("constant")) { System.err.println("Must have constant field size to check data integrity."); System.exit(-1); } if (p.getProperty(INSERT_ORDER_PROPERTY, INSERT_ORDER_PROPERTY_DEFAULT).compareTo("hashed") == 0) { orderedinserts = false; } else if (requestdistrib.compareTo("exponential") == 0) { double percentile = Double.parseDouble(p.getProperty( ExponentialGenerator.EXPONENTIAL_PERCENTILE_PROPERTY, ExponentialGenerator.EXPONENTIAL_PERCENTILE_DEFAULT)); double frac = Double.parseDouble(p.getProperty( ExponentialGenerator.EXPONENTIAL_FRAC_PROPERTY, ExponentialGenerator.EXPONENTIAL_FRAC_DEFAULT)); keychooser = new ExponentialGenerator(percentile, recordcount * frac); } else { orderedinserts = true; } keysequence = new CounterGenerator(insertstart); operationchooser = createOperationGenerator(p); transactioninsertkeysequence = new AcknowledgedCounterGenerator(recordcount); if (requestdistrib.compareTo("uniform") == 0) { keychooser = new UniformLongGenerator(insertstart, insertstart + insertcount - 1); } else if (requestdistrib.compareTo("sequential") == 0) { keychooser = new SequentialGenerator(insertstart, insertstart + insertcount - 1); } else if (requestdistrib.compareTo("zipfian") == 0) { // it does this by generating a random "next key" in part by taking the modulus over the // number of keys. 
// If the number of keys changes, this would shift the modulus, and we don't want that to // change which keys are popular so we'll actually construct the scrambled zipfian generator // with a keyspace that is larger than exists at the beginning of the test. that is, we'll predict // the number of inserts, and tell the scrambled zipfian generator the number of existing keys // plus the number of predicted keys as the total keyspace. then, if the generator picks a key // that hasn't been inserted yet, will just ignore it and pick another key. this way, the size of // the keyspace doesn't change from the perspective of the scrambled zipfian generator final double insertproportion = Double.parseDouble( p.getProperty(INSERT_PROPORTION_PROPERTY, INSERT_PROPORTION_PROPERTY_DEFAULT)); int opcount = Integer.parseInt(p.getProperty(Client.OPERATION_COUNT_PROPERTY)); int expectednewkeys = (int) ((opcount) * insertproportion * 2.0); // 2 is fudge factor keychooser = new ScrambledZipfianGenerator(insertstart, insertstart + insertcount + expectednewkeys); } else if (requestdistrib.compareTo("latest") == 0) { keychooser = new SkewedLatestGenerator(transactioninsertkeysequence); } else if (requestdistrib.equals("hotspot")) { double hotsetfraction = Double.parseDouble(p.getProperty(HOTSPOT_DATA_FRACTION, HOTSPOT_DATA_FRACTION_DEFAULT)); double hotopnfraction = Double.parseDouble(p.getProperty(HOTSPOT_OPN_FRACTION, HOTSPOT_OPN_FRACTION_DEFAULT)); keychooser = new HotspotIntegerGenerator(insertstart, insertstart + insertcount - 1, hotsetfraction, hotopnfraction); } else { throw new WorkloadException("Unknown request distribution \"" + requestdistrib + "\""); } fieldchooser = new UniformLongGenerator(0, fieldcount - 1); if (scanlengthdistrib.compareTo("uniform") == 0) { scanlength = new UniformLongGenerator(minscanlength, maxscanlength); } else if (scanlengthdistrib.compareTo("zipfian") == 0) { scanlength = new ZipfianGenerator(minscanlength, maxscanlength); } else { throw new WorkloadException( "Distribution \"" + scanlengthdistrib + "\" not allowed for scan length"); } insertionRetryLimit = Integer.parseInt(p.getProperty( INSERTION_RETRY_LIMIT, INSERTION_RETRY_LIMIT_DEFAULT)); insertionRetryInterval = Integer.parseInt(p.getProperty( INSERTION_RETRY_INTERVAL, INSERTION_RETRY_INTERVAL_DEFAULT)); + + openloop = Boolean.parseBoolean( + p.getProperty(OPEN_LOOP_PROPERTY, OPEN_LOOP_PROPERTY_DEFAULT)); } protected String buildKeyName(long keynum) { if (!orderedinserts) { keynum = Utils.hash(keynum); } String value = Long.toString(keynum); int fill = zeropadding - value.length(); String prekey = "user"; for (int i = 0; i < fill; i++) { prekey += '0'; } return prekey + value; } /** * Builds a value for a randomly chosen field. */ private HashMap buildSingleValue(String key) { HashMap value = new HashMap<>(); String fieldkey = fieldnames.get(fieldchooser.nextValue().intValue()); ByteIterator data; if (dataintegrity) { data = new StringByteIterator(buildDeterministicValue(key, fieldkey)); } else { // fill with random data data = new RandomByteIterator(fieldlengthgenerator.nextValue().longValue()); } value.put(fieldkey, data); return value; } /** * Builds values for all fields. 
*/ private HashMap buildValues(String key) { HashMap values = new HashMap<>(); for (String fieldkey : fieldnames) { ByteIterator data; if (dataintegrity) { data = new StringByteIterator(buildDeterministicValue(key, fieldkey)); } else { // fill with random data data = new RandomByteIterator(fieldlengthgenerator.nextValue().longValue()); } values.put(fieldkey, data); } return values; } /** * Build a deterministic value given the key information. */ private String buildDeterministicValue(String key, String fieldkey) { int size = fieldlengthgenerator.nextValue().intValue(); StringBuilder sb = new StringBuilder(size); sb.append(key); sb.append(':'); sb.append(fieldkey); while (sb.length() < size) { sb.append(':'); sb.append(sb.toString().hashCode()); } sb.setLength(size); return sb.toString(); } /** * Do one insert operation. Because it will be called concurrently from multiple client threads, * this function must be thread safe. However, avoid synchronized, or the threads will block waiting * for each other, and it will be difficult to reach the target throughput. Ideally, this function would * have no side effects other than DB operations. */ @Override - public boolean doInsert(DB db, Object threadstate) { + public boolean doInsert(AsyncDB db, Object threadstate) { int keynum = keysequence.nextValue().intValue(); String dbkey = buildKeyName(keynum); HashMap values = buildValues(dbkey); Status status; int numOfRetries = 0; do { - status = db.insert(table, dbkey, values); + status = db.syncView().insert(table, dbkey, values); if (null != status && status.isOk()) { break; } // Retry if configured. Without retrying, the load process will fail // even if one single insertion fails. User can optionally configure // an insertion retry limit (default is 0) to enable retry. if (++numOfRetries <= insertionRetryLimit) { System.err.println("Retrying insertion, retry count: " + numOfRetries); try { // Sleep for a random number between [0.8, 1.2)*insertionRetryInterval. int sleepTime = (int) (1000 * insertionRetryInterval * (0.8 + 0.4 * Math.random())); Thread.sleep(sleepTime); } catch (InterruptedException e) { break; } } else { System.err.println("Error inserting, not retrying any more. number of attempts: " + numOfRetries + "Insertion Retry Limit: " + insertionRetryLimit); break; } } while (true); return null != status && status.isOk(); } /** * Do one transaction operation. Because it will be called concurrently from multiple client * threads, this function must be thread safe. However, avoid synchronized, or the threads will block waiting * for each other, and it will be difficult to reach the target throughput. Ideally, this function would * have no side effects other than DB operations. */ @Override - public boolean doTransaction(DB db, Object threadstate) { + public boolean doTransaction(AsyncDB db, Object threadstate) { String operation = operationchooser.nextString(); if(operation == null) { return false; } switch (operation) { case "READ": doTransactionRead(db); break; case "UPDATE": doTransactionUpdate(db); break; case "INSERT": doTransactionInsert(db); break; case "SCAN": doTransactionScan(db); break; default: doTransactionReadModifyWrite(db); } return true; } /** * Results are reported in the first three buckets of the histogram under * the label "VERIFY". * Bucket 0 means the expected data was returned. * Bucket 1 means incorrect data was returned. * Bucket 2 means null data was returned when some data was expected. 
*/ protected void verifyRow(String key, HashMap cells) { Status verifyStatus = Status.OK; long startTime = System.nanoTime(); if (!cells.isEmpty()) { for (Map.Entry entry : cells.entrySet()) { if (!entry.getValue().toString().equals(buildDeterministicValue(key, entry.getKey()))) { verifyStatus = Status.UNEXPECTED_STATE; break; } } } else { // This assumes that null data is never valid verifyStatus = Status.ERROR; } long endTime = System.nanoTime(); measurements.measure("VERIFY", (int) (endTime - startTime) / 1000); measurements.reportStatus("VERIFY", verifyStatus); } long nextKeynum() { long keynum; if (keychooser instanceof ExponentialGenerator) { do { keynum = transactioninsertkeysequence.lastValue() - keychooser.nextValue().intValue(); } while (keynum < 0); } else { do { keynum = keychooser.nextValue().intValue(); } while (keynum > transactioninsertkeysequence.lastValue()); } return keynum; } - public void doTransactionRead(DB db) { + public void doTransactionRead(AsyncDB db) { // choose a random key long keynum = nextKeynum(); String keyname = buildKeyName(keynum); HashSet fields = null; if (!readallfields) { // read a random field String fieldname = fieldnames.get(fieldchooser.nextValue().intValue()); fields = new HashSet(); fields.add(fieldname); } else if (dataintegrity) { // pass the full field list if dataintegrity is on for verification fields = new HashSet(fieldnames); } HashMap cells = new HashMap(); - db.read(table, keyname, fields, cells); + CompletableFuture readFuture = db.read(table, keyname, fields, cells).thenAccept((res) -> { + if (res.isOk() && dataintegrity) { + verifyRow(keyname, cells); + } + }); - if (dataintegrity) { - verifyRow(keyname, cells); + if (!openloop) { + readFuture.join(); } } - public void doTransactionReadModifyWrite(DB db) { + public void doTransactionReadModifyWrite(AsyncDB db) { // choose a random key long keynum = nextKeynum(); String keyname = buildKeyName(keynum); HashSet fields = null; if (!readallfields) { // read a random field String fieldname = fieldnames.get(fieldchooser.nextValue().intValue()); fields = new HashSet(); fields.add(fieldname); } HashMap values; if (writeallfields) { // new data for all the fields values = buildValues(keyname); } else { // update a random field values = buildSingleValue(keyname); } // do the transaction HashMap cells = new HashMap(); - long ist = measurements.getIntendedtartTimeNs(); + long ist = measurements.getIntendedStartTimeNs(); long st = System.nanoTime(); - db.read(table, keyname, fields, cells); - - db.update(table, keyname, values); - long en = System.nanoTime(); + String opName = "READ-MODIFY-WRITE"; + String opReadFailureName = "READ-MODIFY-WRITE-READ_FAILED"; + String opWriteFailureName = "READ-MODIFY-WRITE-WRITE_FAILED"; + + CompletableFuture rFuture = db.read(table, keyname, fields, cells).thenAccept((rres) -> { + if (!rres.isOk()) { + long en = System.nanoTime(); + measurements.measure(opReadFailureName, (int) ((en - st) / 1000)); + measurements.measureIntended(opReadFailureName, (int) ((en - ist) / 1000)); + } else { + CompletableFuture wFuture = db.update(table, keyname, values).thenAccept((wres) -> { + long en = System.nanoTime(); + + if (wres.isOk() && dataintegrity) { + verifyRow(keyname, cells); + } + + measurements.measure(wres.isOk() ? opName : opWriteFailureName, (int) ((en - st) / 1000)); + measurements.measureIntended(wres.isOk() ? 
opName : opWriteFailureName, (int) ((en - ist) / 1000)); + }); + + if (!openloop) { + wFuture.join(); + } + } + }); - if (dataintegrity) { - verifyRow(keyname, cells); + if (!openloop) { + rFuture.join(); } - measurements.measure("READ-MODIFY-WRITE", (int) ((en - st) / 1000)); - measurements.measureIntended("READ-MODIFY-WRITE", (int) ((en - ist) / 1000)); } - public void doTransactionScan(DB db) { + public void doTransactionScan(AsyncDB db) { // choose a random key long keynum = nextKeynum(); String startkeyname = buildKeyName(keynum); // choose a random scan length int len = scanlength.nextValue().intValue(); HashSet fields = null; if (!readallfields) { // read a random field String fieldname = fieldnames.get(fieldchooser.nextValue().intValue()); fields = new HashSet(); fields.add(fieldname); } - db.scan(table, startkeyname, len, fields, new Vector>()); + CompletableFuture scanFuture = db.scan(table, startkeyname, len, fields, + new Vector>()); + + if (!openloop) { + scanFuture.join(); + } } - public void doTransactionUpdate(DB db) { + public void doTransactionUpdate(AsyncDB db) { // choose a random key long keynum = nextKeynum(); String keyname = buildKeyName(keynum); HashMap values; if (writeallfields) { // new data for all the fields values = buildValues(keyname); } else { // update a random field values = buildSingleValue(keyname); } - db.update(table, keyname, values); + CompletableFuture updateFuture = db.update(table, keyname, values); + + if (!openloop) { + updateFuture.join(); + } } - public void doTransactionInsert(DB db) { + public void doTransactionInsert(AsyncDB db) { // choose the next key long keynum = transactioninsertkeysequence.nextValue(); - try { - String dbkey = buildKeyName(keynum); + String dbkey = buildKeyName(keynum); + + HashMap values = buildValues(dbkey); + CompletableFuture insertFuture = db.insert(table, dbkey, values).thenAccept((res) -> { + if (res.isOk()) { + transactioninsertkeysequence.acknowledge(keynum); + } + }); - HashMap values = buildValues(dbkey); - db.insert(table, dbkey, values); - } finally { - transactioninsertkeysequence.acknowledge(keynum); + if (!openloop) { + insertFuture.join(); } } /** * Creates a weighted discrete values with database operations for a workload to perform. * Weights/proportions are read from the properties list and defaults are used * when values are not configured. * Current operations are "READ", "UPDATE", "INSERT", "SCAN" and "READMODIFYWRITE". * * @param p The properties list to pull weights from. * @return A generator that can be used to determine the next operation to perform. * @throws IllegalArgumentException if the properties object was null. 
*/ protected static DiscreteGenerator createOperationGenerator(final Properties p) { if (p == null) { throw new IllegalArgumentException("Properties object cannot be null"); } final double readproportion = Double.parseDouble( p.getProperty(READ_PROPORTION_PROPERTY, READ_PROPORTION_PROPERTY_DEFAULT)); final double updateproportion = Double.parseDouble( p.getProperty(UPDATE_PROPORTION_PROPERTY, UPDATE_PROPORTION_PROPERTY_DEFAULT)); final double insertproportion = Double.parseDouble( p.getProperty(INSERT_PROPORTION_PROPERTY, INSERT_PROPORTION_PROPERTY_DEFAULT)); final double scanproportion = Double.parseDouble( p.getProperty(SCAN_PROPORTION_PROPERTY, SCAN_PROPORTION_PROPERTY_DEFAULT)); final double readmodifywriteproportion = Double.parseDouble(p.getProperty( READMODIFYWRITE_PROPORTION_PROPERTY, READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT)); final DiscreteGenerator operationchooser = new DiscreteGenerator(); if (readproportion > 0) { operationchooser.addValue(readproportion, "READ"); } if (updateproportion > 0) { operationchooser.addValue(updateproportion, "UPDATE"); } if (insertproportion > 0) { operationchooser.addValue(insertproportion, "INSERT"); } if (scanproportion > 0) { operationchooser.addValue(scanproportion, "SCAN"); } if (readmodifywriteproportion > 0) { operationchooser.addValue(readmodifywriteproportion, "READMODIFYWRITE"); } return operationchooser; } } diff --git a/core/src/main/java/com/yahoo/ycsb/workloads/RestWorkload.java b/core/src/main/java/com/yahoo/ycsb/workloads/RestWorkload.java index e215ef16..4ed9423e 100644 --- a/core/src/main/java/com/yahoo/ycsb/workloads/RestWorkload.java +++ b/core/src/main/java/com/yahoo/ycsb/workloads/RestWorkload.java @@ -1,306 +1,306 @@ /** * Copyright (c) 2016-2017 YCSB contributors. All rights reserved. *

* Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb.workloads; +import com.yahoo.ycsb.AsyncDB; import com.yahoo.ycsb.ByteIterator; -import com.yahoo.ycsb.DB; import com.yahoo.ycsb.RandomByteIterator; import com.yahoo.ycsb.WorkloadException; import com.yahoo.ycsb.generator.*; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Properties; import com.yahoo.ycsb.generator.UniformLongGenerator; /** * Typical RESTFul services benchmarking scenario. Represents a set of client * calling REST operations like HTTP DELETE, GET, POST, PUT on a web service. * This scenario is completely different from CoreWorkload which is mainly * designed for databases benchmarking. However due to some reusable * functionality this class extends {@link CoreWorkload} and overrides necessary * methods like init, doTransaction etc. */ public class RestWorkload extends CoreWorkload { /** * The name of the property for the proportion of transactions that are * delete. */ public static final String DELETE_PROPORTION_PROPERTY = "deleteproportion"; /** * The default proportion of transactions that are delete. */ public static final String DELETE_PROPORTION_PROPERTY_DEFAULT = "0.00"; /** * The name of the property for the file that holds the field length size for insert operations. */ public static final String FIELD_LENGTH_DISTRIBUTION_FILE_PROPERTY = "fieldlengthdistfile"; /** * The default file name that holds the field length size for insert operations. */ public static final String FIELD_LENGTH_DISTRIBUTION_FILE_PROPERTY_DEFAULT = "fieldLengthDistFile.txt"; /** * In web services even though the CRUD operations follow the same request * distribution, they have different traces and distribution parameter * values. Hence configuring the parameters of these operations separately * makes the benchmark more flexible and capable of generating better * realistic workloads. */ // Read related properties. private static final String READ_TRACE_FILE = "url.trace.read"; private static final String READ_TRACE_FILE_DEFAULT = "readtrace.txt"; private static final String READ_ZIPFIAN_CONSTANT = "readzipfconstant"; private static final String READ_ZIPFIAN_CONSTANT_DEAFULT = "0.99"; private static final String READ_RECORD_COUNT_PROPERTY = "readrecordcount"; // Insert related properties. private static final String INSERT_TRACE_FILE = "url.trace.insert"; private static final String INSERT_TRACE_FILE_DEFAULT = "inserttrace.txt"; private static final String INSERT_ZIPFIAN_CONSTANT = "insertzipfconstant"; private static final String INSERT_ZIPFIAN_CONSTANT_DEAFULT = "0.99"; private static final String INSERT_SIZE_ZIPFIAN_CONSTANT = "insertsizezipfconstant"; private static final String INSERT_SIZE_ZIPFIAN_CONSTANT_DEAFULT = "0.99"; private static final String INSERT_RECORD_COUNT_PROPERTY = "insertrecordcount"; // Delete related properties. 
private static final String DELETE_TRACE_FILE = "url.trace.delete"; private static final String DELETE_TRACE_FILE_DEFAULT = "deletetrace.txt"; private static final String DELETE_ZIPFIAN_CONSTANT = "deletezipfconstant"; private static final String DELETE_ZIPFIAN_CONSTANT_DEAFULT = "0.99"; private static final String DELETE_RECORD_COUNT_PROPERTY = "deleterecordcount"; // Delete related properties. private static final String UPDATE_TRACE_FILE = "url.trace.update"; private static final String UPDATE_TRACE_FILE_DEFAULT = "updatetrace.txt"; private static final String UPDATE_ZIPFIAN_CONSTANT = "updatezipfconstant"; private static final String UPDATE_ZIPFIAN_CONSTANT_DEAFULT = "0.99"; private static final String UPDATE_RECORD_COUNT_PROPERTY = "updaterecordcount"; private Map readUrlMap; private Map insertUrlMap; private Map deleteUrlMap; private Map updateUrlMap; private int readRecordCount; private int insertRecordCount; private int deleteRecordCount; private int updateRecordCount; private NumberGenerator readKeyChooser; private NumberGenerator insertKeyChooser; private NumberGenerator deleteKeyChooser; private NumberGenerator updateKeyChooser; private NumberGenerator fieldlengthgenerator; private DiscreteGenerator operationchooser; @Override public void init(Properties p) throws WorkloadException { readRecordCount = Integer.parseInt(p.getProperty(READ_RECORD_COUNT_PROPERTY, String.valueOf(Integer.MAX_VALUE))); insertRecordCount = Integer .parseInt(p.getProperty(INSERT_RECORD_COUNT_PROPERTY, String.valueOf(Integer.MAX_VALUE))); deleteRecordCount = Integer .parseInt(p.getProperty(DELETE_RECORD_COUNT_PROPERTY, String.valueOf(Integer.MAX_VALUE))); updateRecordCount = Integer .parseInt(p.getProperty(UPDATE_RECORD_COUNT_PROPERTY, String.valueOf(Integer.MAX_VALUE))); readUrlMap = getTrace(p.getProperty(READ_TRACE_FILE, READ_TRACE_FILE_DEFAULT), readRecordCount); insertUrlMap = getTrace(p.getProperty(INSERT_TRACE_FILE, INSERT_TRACE_FILE_DEFAULT), insertRecordCount); deleteUrlMap = getTrace(p.getProperty(DELETE_TRACE_FILE, DELETE_TRACE_FILE_DEFAULT), deleteRecordCount); updateUrlMap = getTrace(p.getProperty(UPDATE_TRACE_FILE, UPDATE_TRACE_FILE_DEFAULT), updateRecordCount); operationchooser = createOperationGenerator(p); // Common distribution for all operations. String requestDistrib = p.getProperty(REQUEST_DISTRIBUTION_PROPERTY, REQUEST_DISTRIBUTION_PROPERTY_DEFAULT); double readZipfconstant = Double.parseDouble(p.getProperty(READ_ZIPFIAN_CONSTANT, READ_ZIPFIAN_CONSTANT_DEAFULT)); readKeyChooser = getKeyChooser(requestDistrib, readUrlMap.size(), readZipfconstant, p); double updateZipfconstant = Double .parseDouble(p.getProperty(UPDATE_ZIPFIAN_CONSTANT, UPDATE_ZIPFIAN_CONSTANT_DEAFULT)); updateKeyChooser = getKeyChooser(requestDistrib, updateUrlMap.size(), updateZipfconstant, p); double insertZipfconstant = Double .parseDouble(p.getProperty(INSERT_ZIPFIAN_CONSTANT, INSERT_ZIPFIAN_CONSTANT_DEAFULT)); insertKeyChooser = getKeyChooser(requestDistrib, insertUrlMap.size(), insertZipfconstant, p); double deleteZipfconstant = Double .parseDouble(p.getProperty(DELETE_ZIPFIAN_CONSTANT, DELETE_ZIPFIAN_CONSTANT_DEAFULT)); deleteKeyChooser = getKeyChooser(requestDistrib, deleteUrlMap.size(), deleteZipfconstant, p); fieldlengthgenerator = getFieldLengthGenerator(p); } public static DiscreteGenerator createOperationGenerator(final Properties p) { // Re-using CoreWorkload method. 
final DiscreteGenerator operationChooser = CoreWorkload.createOperationGenerator(p); // Needs special handling for delete operations not supported in CoreWorkload. double deleteproportion = Double .parseDouble(p.getProperty(DELETE_PROPORTION_PROPERTY, DELETE_PROPORTION_PROPERTY_DEFAULT)); if (deleteproportion > 0) { operationChooser.addValue(deleteproportion, "DELETE"); } return operationChooser; } private static NumberGenerator getKeyChooser(String requestDistrib, int recordCount, double zipfContant, Properties p) throws WorkloadException { NumberGenerator keychooser; switch (requestDistrib) { case "exponential": double percentile = Double.parseDouble(p.getProperty(ExponentialGenerator.EXPONENTIAL_PERCENTILE_PROPERTY, ExponentialGenerator.EXPONENTIAL_PERCENTILE_DEFAULT)); double frac = Double.parseDouble(p.getProperty(ExponentialGenerator.EXPONENTIAL_FRAC_PROPERTY, ExponentialGenerator.EXPONENTIAL_FRAC_DEFAULT)); keychooser = new ExponentialGenerator(percentile, recordCount * frac); break; case "uniform": keychooser = new UniformLongGenerator(0, recordCount - 1); break; case "zipfian": keychooser = new ZipfianGenerator(recordCount, zipfContant); break; case "latest": throw new WorkloadException("Latest request distribution is not supported for RestWorkload."); case "hotspot": double hotsetfraction = Double.parseDouble(p.getProperty(HOTSPOT_DATA_FRACTION, HOTSPOT_DATA_FRACTION_DEFAULT)); double hotopnfraction = Double.parseDouble(p.getProperty(HOTSPOT_OPN_FRACTION, HOTSPOT_OPN_FRACTION_DEFAULT)); keychooser = new HotspotIntegerGenerator(0, recordCount - 1, hotsetfraction, hotopnfraction); break; default: throw new WorkloadException("Unknown request distribution \"" + requestDistrib + "\""); } return keychooser; } protected static NumberGenerator getFieldLengthGenerator(Properties p) throws WorkloadException { // Re-using CoreWorkload method. NumberGenerator fieldLengthGenerator = CoreWorkload.getFieldLengthGenerator(p); String fieldlengthdistribution = p.getProperty(FIELD_LENGTH_DISTRIBUTION_PROPERTY, FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT); // Needs special handling for Zipfian distribution for variable Zipf Constant. if (fieldlengthdistribution.compareTo("zipfian") == 0) { int fieldlength = Integer.parseInt(p.getProperty(FIELD_LENGTH_PROPERTY, FIELD_LENGTH_PROPERTY_DEFAULT)); double insertsizezipfconstant = Double .parseDouble(p.getProperty(INSERT_SIZE_ZIPFIAN_CONSTANT, INSERT_SIZE_ZIPFIAN_CONSTANT_DEAFULT)); fieldLengthGenerator = new ZipfianGenerator(1, fieldlength, insertsizezipfconstant); } return fieldLengthGenerator; } /** * Reads the trace file and returns a URL map. */ private static Map getTrace(String filePath, int recordCount) throws WorkloadException { Map urlMap = new HashMap(); int count = 0; String line; try { FileReader inputFile = new FileReader(filePath); BufferedReader bufferReader = new BufferedReader(inputFile); while ((line = bufferReader.readLine()) != null) { urlMap.put(count++, line.trim()); if (count >= recordCount) { break; } } bufferReader.close(); } catch (IOException e) { throw new WorkloadException( "Error while reading the trace. Please make sure the trace file path is correct. " + e.getLocalizedMessage()); } return urlMap; } /** * Not required for Rest Clients as data population is service specific. 
*/ @Override - public boolean doInsert(DB db, Object threadstate) { + public boolean doInsert(AsyncDB db, Object threadstate) { return false; } @Override - public boolean doTransaction(DB db, Object threadstate) { + public boolean doTransaction(AsyncDB db, Object threadstate) { String operation = operationchooser.nextString(); if (operation == null) { return false; } switch (operation) { case "UPDATE": doTransactionUpdate(db); break; case "INSERT": doTransactionInsert(db); break; case "DELETE": doTransactionDelete(db); break; default: doTransactionRead(db); } return true; } /** * Returns next URL to be called. */ private String getNextURL(int opType) { if (opType == 1) { return readUrlMap.get(readKeyChooser.nextValue().intValue()); } else if (opType == 2) { return insertUrlMap.get(insertKeyChooser.nextValue().intValue()); } else if (opType == 3) { return deleteUrlMap.get(deleteKeyChooser.nextValue().intValue()); } else { return updateUrlMap.get(updateKeyChooser.nextValue().intValue()); } } @Override - public void doTransactionRead(DB db) { + public void doTransactionRead(AsyncDB db) { HashMap result = new HashMap(); db.read(null, getNextURL(1), null, result); } @Override - public void doTransactionInsert(DB db) { + public void doTransactionInsert(AsyncDB db) { HashMap value = new HashMap(); // Create random bytes of insert data with a specific size. value.put("data", new RandomByteIterator(fieldlengthgenerator.nextValue().longValue())); db.insert(null, getNextURL(2), value); } - public void doTransactionDelete(DB db) { + public void doTransactionDelete(AsyncDB db) { db.delete(null, getNextURL(3)); } @Override - public void doTransactionUpdate(DB db) { + public void doTransactionUpdate(AsyncDB db) { HashMap value = new HashMap(); // Create random bytes of update data with a specific size. value.put("data", new RandomByteIterator(fieldlengthgenerator.nextValue().longValue())); db.update(null, getNextURL(4), value); } } diff --git a/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java b/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java index be97b206..b81e966c 100644 --- a/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java +++ b/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java @@ -1,1286 +1,1286 @@ /** * Copyright (c) 2017 YCSB contributors All rights reserved. *

* Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb.workloads; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.Random; import java.util.Set; import java.util.TreeMap; import java.util.Vector; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import com.yahoo.ycsb.AsyncDB; import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.Client; -import com.yahoo.ycsb.DB; import com.yahoo.ycsb.NumericByteIterator; import com.yahoo.ycsb.Status; import com.yahoo.ycsb.StringByteIterator; import com.yahoo.ycsb.Utils; import com.yahoo.ycsb.Workload; import com.yahoo.ycsb.WorkloadException; import com.yahoo.ycsb.generator.DiscreteGenerator; import com.yahoo.ycsb.generator.Generator; import com.yahoo.ycsb.generator.HotspotIntegerGenerator; import com.yahoo.ycsb.generator.IncrementingPrintableStringGenerator; import com.yahoo.ycsb.generator.NumberGenerator; import com.yahoo.ycsb.generator.RandomDiscreteTimestampGenerator; import com.yahoo.ycsb.generator.ScrambledZipfianGenerator; import com.yahoo.ycsb.generator.SequentialGenerator; import com.yahoo.ycsb.generator.UniformLongGenerator; import com.yahoo.ycsb.generator.UnixEpochTimestampGenerator; import com.yahoo.ycsb.generator.ZipfianGenerator; import com.yahoo.ycsb.measurements.Measurements; /** * A specialized workload dealing with time series data, i.e. series of discreet * events associated with timestamps and identifiers. For this workload, identities * consist of a {@link String} key and a set of {@link String} tag key/value * pairs. *

* For example:
*
*   Time Series Key | Tag Keys/Values | 1483228800 | 1483228860 | 1483228920
*   ----------------|-----------------|------------|------------|-----------
*   AA              | AA=AA, AB=AA    |       42.5 |        1.0 |       85.9
*   AA              | AA=AA, AB=AB    |       -9.4 |       76.9 |       0.18
*   AB              | AA=AA, AB=AA    |      -93.0 |       57.1 |      -63.8
*   AB              | AA=AA, AB=AB    |        7.6 |       56.1 |       -0.3
*

* This table shows four time series with 3 measurements at three different timestamps. * Keys, tags, timestamps and values (numeric only at this time) are generated by * this workload. For details on properties and behavior, see the * {@code workloads/tsworkload_template} file. The Javadocs will focus on implementation * and how {@link DB} clients can parse the workload. *

* In order to avoid having existing DB implementations implement a brand new interface * this workload uses the existing APIs to encode a few special values that can be parsed * by the client. The special values include the timestamp, numeric value and some * query (read or scan) parameters. As an example on how to parse the fields, see * {@link BasicTSDB}. *

* Timestamps *

* Timestamps are presented as Unix Epoch values in units of {@link TimeUnit#SECONDS}, * {@link TimeUnit#MILLISECONDS} or {@link TimeUnit#NANOSECONDS} based on the * {@code timestampunits} property. For calls to {@link DB#insert(String, String, java.util.Map)} * and {@link DB#update(String, String, java.util.Map)}, the timestamp is added to the * {@code values} map encoded in a {@link NumericByteIterator} with the key defined * in the {@code timestampkey} property (defaulting to "YCSBTS"). To pull out the timestamp * when iterating over the values map, cast the {@link ByteIterator} to a * {@link NumericByteIterator} and call {@link NumericByteIterator#getLong()}. *

* Note that for calls to {@link DB#update(String, String, java.util.Map)}, timestamps * earlier than the timestamp generator's timestamp will be chosen at random to * mimic a lambda architecture or old job re-reporting some data. *

* For calls to {@link DB#read(String, String, java.util.Set, java.util.Map)} and * {@link DB#scan(String, String, int, java.util.Set, Vector)}, timestamps * are encoded in a {@link StringByteIterator} in a key/value format with the * {@code tagpairdelimiter} separator. E.g. {@code YCSBTS=1483228800}. If {@code querytimespan} * has been set to a positive value then the value will include a range with the * starting (oldest) timestamp followed by the {@code querytimespandelimiter} separator * and the ending (most recent) timestamp. E.g. {@code YCSBTS=1483228800-1483228920}. *

* For calls to {@link DB#delete(String, String)}, encoding is the same as reads and * scans but key/value pairs are separated by the {@code deletedelimiter} property value. *

* By default, the starting timestamp is the current system time without any rounding. * All timestamps are then offsets from that starting value. *

* Values *

* Similar to timestamps, values are encoded in {@link NumericByteIterator}s and stored * in the values map with the key defined in {@code valuekey} (defaulting to "YCSBV"). * Values can either be 64 bit signed {@link long}s or double precision {@link double}s * depending on the {@code valuetype} or {@code dataintegrity} properties. When parsing * out the value, always call {@link NumericByteIterator#isFloatingPoint()} to determine * whether or not to call {@link NumericByteIterator#getDouble()} (true) or * {@link NumericByteIterator#getLong()} (false). *

* When {@code dataintegrity} is set to true, then the value is always set to a * 64 bit signed integer which is the Java hash code of the concatenation of the * key and map of values (sorted on the map keys and skipping the timestamp and value * entries) OR'd with the timestamp of the data point. See * {@link #validationFunction(String, long, TreeMap)} for the implementation. *
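* As a rough illustration of the two encodings above, a client binding might unpack the special entries from the values map along these lines. This is a minimal sketch, not part of this patch; it hard-codes the default "YCSBTS" and "YCSBV" keys rather than reading the timestampkey/valuekey properties:
*
*   long timestamp = 0L;
*   double value = 0.0;
*   Map<String, String> tags = new HashMap<>();
*   for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
*     if ("YCSBTS".equals(entry.getKey())) {
*       // the timestamp is always encoded as a long
*       timestamp = ((NumericByteIterator) entry.getValue()).getLong();
*     } else if ("YCSBV".equals(entry.getKey())) {
*       NumericByteIterator it = (NumericByteIterator) entry.getValue();
*       value = it.isFloatingPoint() ? it.getDouble() : it.getLong();
*     } else {
*       // everything else is an ordinary tag key/value pair
*       tags.put(entry.getKey(), entry.getValue().toString());
*     }
*   }
*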

* Keys and Tags *

* As mentioned, the workload generates strings for the keys and tags. On initialization * three string generators are created using the {@link IncrementingPrintableStringGenerator} * implementation. Then the generators fill three arrays with values based on the * number of keys, the number of tags and the cardinality of each tag key/value pair. * This implementation gives us time series like the example table where every string * starts at something like "AA" (depending on the length of keys, tag keys and tag values) * and continues to "ZZ", at which point they roll over back to "AA". *

* Each time series must have a unique set of tag keys, i.e. the key "AA" cannot appear * more than once per time series. If the workload is configured for four tags with a * tag key length of 2, the keys would be "AA", "AB", "AC" and "AD". *

* Each tag key is then associated with a tag value. Tag values may appear more than once * in each time series. E.g. time series will usually start with the tags "AA=AA", * "AB=AA", "AC=AA" and "AD=AA". The {@code tagcardinality} property determines how many * unique values will be generated per tag key. In the example table above, the * {@code tagcardinality} property would have been set to {@code 1,2} meaning tag * key "AA" would always have the tag value "AA" given a cardinality of 1. However * tag key "AB" would have values "AA" and "AB" due to a cardinality of 2. This * cardinality map, along with the number of unique time series keys determines how * many unique time series are generated for the workload. Tag values share a common * array of generated strings to save on memory. *
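* As a concrete illustration of how the cardinality map and key count combine (a minimal sketch using the example values above, not code from this patch): with two time series keys and {@code tagcardinality=1,2}, the per-key cardinality is 1 * 2 = 2 and the total is 2 * 2 = 4 unique series, matching the four rows in the example table.
*
*   int numKeys = 2;                       // e.g. keys "AA" and "AB"
*   int[] tagCardinality = {1, 2};         // e.g. the "tagcardinality" property "1,2"
*   int perKeyCardinality = 1;             // unique tag value combinations per key
*   for (int cardinality : tagCardinality) {
*     perKeyCardinality *= cardinality;
*   }
*   int totalCardinality = numKeys * perKeyCardinality;  // 2 * (1 * 2) = 4 series
*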

* Operation Order *

* The default behavior of the workload (for inserts and updates) is to generate a * value for each time series for a given timestamp before incrementing to the next * timestamp and writing values. This is an ideal workload and some time series * databases are designed for this behavior. However, in the real world events will * arrive grouped close to the current system time, with a number of events being * delayed, hence their timestamps are further in the past. The {@code delayedseries} * property determines the percentage of time series that are delayed by up to * {@code delayedintervals} intervals. E.g. setting this value to 0.05 means that * 5% of the time series will be written with timestamps earlier than the timestamp * generator's current time. *

* Reads and Scans *

* For benchmarking queries, some common tasks implemented by almost every time series * database are available and are passed in the fields {@link Set}: *

* GroupBy - A common operation is to aggregate multiple time series into a * single time series via common parameters. For example, a user may want to see the * total network traffic in a data center so they'll issue a SQL query like: * SELECT value FROM timeseriesdb GROUP BY datacenter ORDER BY SUM(value); * If the {@code groupbyfunction} has been set to a group by function, then the fields * will contain a key/value pair with the key set in {@code groupbykey}. E.g. * {@code YCSBGB=SUM}. *

* Additionally with grouping enabled, fields on tag keys where group bys should * occur will only have the key defined and will not have a value or delimiter. E.g. * if grouping on tag key "AA", the field will contain {@code AA} instead of {@code AA=AB}. *

* Downsampling - Another common operation is to reduce the resolution of the * queried time series when fetching a wide time range of data so fewer data points * are returned. For example, a user may fetch a week of data but if the data is * recorded on a 1 second interval, that would be over 600k data points so they * may ask for a 1 hour downsampling (also called bucketing) wherein every hour, all * of the data points for a "bucket" are aggregated into a single value. *

* To enable downsampling, the {@code downsamplingfunction} property must be set to * a supported function such as "SUM" and the {@code downsamplinginterval} must be * set to a valid time interval with the same units as {@code timestampunits}, e.g. * "3600" which would create 1 hour buckets if the time units were set to seconds. * With downsampling, query fields will include a key/value pair with * {@code downsamplingkey} as the key (defaulting to "YCSBDS") and the value being * a concatenation of {@code downsamplingfunction} and {@code downsamplinginterval}, * for example {@code YCSBDS=SUM60}. *

* Timestamps - For every read, a random timestamp is selected from the interval * set. If {@code querytimespan} has been set to a positive value, then the configured * query time interval is added to the selected timestamp so the read passes the DB * a range of times. Note that during the run phase, if no data was previously loaded, * or if there are more {@code recordcount}s set for the run phase, reads may be sent * to the DB with timestamps that are beyond the written data time range (or even the * system clock of the DB). *
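* A minimal sketch of how a binding might parse the read/scan fields described above; it assumes the default delimiters and keys ("=" for tagpairdelimiter, "," for querytimespandelimiter, "YCSBTS", "YCSBGB" and "YCSBDS") and is illustrative rather than part of this patch:
*
*   long startTs = 0L;
*   long endTs = 0L;
*   String groupByFunction = null;
*   String downsampleSpec = null;
*   Map<String, String> tags = new HashMap<>();
*   Set<String> groupByTagKeys = new HashSet<>();
*   for (String field : fields) {
*     String[] pair = field.split("=", 2);        // default tagpairdelimiter
*     if (pair.length == 1) {
*       groupByTagKeys.add(pair[0]);              // bare tag key => group on this tag
*     } else if ("YCSBTS".equals(pair[0])) {
*       String[] range = pair[1].split(",");      // default querytimespandelimiter
*       startTs = Long.parseLong(range[0]);
*       endTs = range.length > 1 ? Long.parseLong(range[1]) : startTs;
*     } else if ("YCSBGB".equals(pair[0])) {
*       groupByFunction = pair[1];                // e.g. "SUM"
*     } else if ("YCSBDS".equals(pair[0])) {
*       downsampleSpec = pair[1];                 // e.g. "SUM60"
*     } else {
*       tags.put(pair[0], pair[1]);               // ordinary tag key/value pair
*     }
*   }
*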

* Deletes *

* Because the delete API only accepts a single key, a full key and tag key/value * pair map is flattened into a single string for parsing by the database. Common * workloads include deleting a single time series (wherein all tag key and values are * defined), deleting all series containing a tag key and value or deleting all of the * time series sharing a common time series key. *

* Right now the workload supports deletes with a key and for time series tag key/value * pairs or a key with tags and a group by on one or more tags (meaning, delete all of * the series with any value for the given tag key). The parameters are collapsed into * a single string delimited with the character in the {@code deletedelimiter} property. * For example, a delete request may look like: {@code AA:AA=AA:AA=AB} to delete the * first time series in the table above. *
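* A minimal sketch of unpacking such a delete key, assuming the default deletedelimiter (":") and tagpairdelimiter ("="); illustrative only, not code from this patch:
*
*   String[] parts = key.split(":");              // default deletedelimiter
*   String seriesKey = parts[0];
*   Map<String, String> tags = new HashMap<>();
*   Set<String> groupedTagKeys = new HashSet<>();
*   for (int i = 1; i < parts.length; i++) {
*     String[] pair = parts[i].split("=", 2);     // default tagpairdelimiter
*     if (pair.length == 2) {
*       tags.put(pair[0], pair[1]);               // fully specified tag pair
*     } else {
*       groupedTagKeys.add(pair[0]);              // bare key => match any value for this tag
*     }
*   }
*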

* Threads *

* For a multi-threaded execution, the number of time series keys set via the * {@code fieldcount} property must be greater than or equal to the number of * threads set via {@code threads}. This is because each thread chooses a subset * of the total number of time series keys and is responsible for writing values * for each time series containing those keys at each timestamp. Thus each thread * has its own timestamp generator, which increments once every time series * it is responsible for has had a value written. *

* Each thread may, however, issue reads and scans for any time series in the * complete set. *
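* The constraint above amounts to a simple check a workload or binding could perform at initialization; this is a hypothetical sketch, not necessarily the exact validation in this class:
*
*   if (threadcount > numKeys) {
*     throw new WorkloadException("Number of client threads (" + threadcount
*         + ") must not exceed the number of time series keys (" + numKeys
*         + ") configured via 'fieldcount'.");
*   }
*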

* Sparsity *

* By default, during loads, every time series will have a data point written at every * time stamp in the interval set. This is common in workloads where a sensor writes * a value at regular intervals. However some time series are only reported under * certain conditions. *

* For example, a counter may track the number of errors over a * time period for a web service and only report when the value is greater than 1. * Or a time series may include tags such as a user ID and IP address when a request * arrives at the web service and only report values when that combination is seen. * This means the timeseries will not have a value at every timestamp and in * some cases there may be only a single value! *

* This workload has a {@code sparsity} parameter that can choose how often a * time series should record a value. The default value of 0.0 means every series * will get a value at every timestamp. A value of 0.95 will mean that for each * series, only 5% of the timestamps in the interval will have a value. The distribution * of values is random. *
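* A minimal sketch of the idea (the workload's actual sampling logic may differ): for each series and timestamp, a uniform random draw decides whether a value is written.
*
*   double sparsity = 0.95;                        // e.g. only ~5% of timestamps get a value
*   boolean writePoint = sparsity <= 0.0
*       || ThreadLocalRandom.current().nextDouble() >= sparsity;
*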

* Notes/Warnings *

*

*

* TODOs *

*

*/ public class TimeSeriesWorkload extends Workload { /** * The types of values written to the timeseries store. */ public enum ValueType { INTEGERS("integers"), FLOATS("floats"), MIXED("mixednumbers"); protected final String name; ValueType(final String name) { this.name = name; } public static ValueType fromString(final String name) { for (final ValueType type : ValueType.values()) { if (type.name.equalsIgnoreCase(name)) { return type; } } throw new IllegalArgumentException("Unrecognized type: " + name); } } /** Name and default value for the timestamp key property. */ public static final String TIMESTAMP_KEY_PROPERTY = "timestampkey"; public static final String TIMESTAMP_KEY_PROPERTY_DEFAULT = "YCSBTS"; /** Name and default value for the value key property. */ public static final String VALUE_KEY_PROPERTY = "valuekey"; public static final String VALUE_KEY_PROPERTY_DEFAULT = "YCSBV"; /** Name and default value for the timestamp interval property. */ public static final String TIMESTAMP_INTERVAL_PROPERTY = "timestampinterval"; public static final String TIMESTAMP_INTERVAL_PROPERTY_DEFAULT = "60"; /** Name and default value for the timestamp units property. */ public static final String TIMESTAMP_UNITS_PROPERTY = "timestampunits"; public static final String TIMESTAMP_UNITS_PROPERTY_DEFAULT = "SECONDS"; /** Name and default value for the number of tags property. */ public static final String TAG_COUNT_PROPERTY = "tagcount"; public static final String TAG_COUNT_PROPERTY_DEFAULT = "4"; /** Name and default value for the tag value cardinality map property. */ public static final String TAG_CARDINALITY_PROPERTY = "tagcardinality"; public static final String TAG_CARDINALITY_PROPERTY_DEFAULT = "1, 2, 4, 8"; /** Name and default value for the tag key length property. */ public static final String TAG_KEY_LENGTH_PROPERTY = "tagkeylength"; public static final String TAG_KEY_LENGTH_PROPERTY_DEFAULT = "8"; /** Name and default value for the tag value length property. */ public static final String TAG_VALUE_LENGTH_PROPERTY = "tagvaluelength"; public static final String TAG_VALUE_LENGTH_PROPERTY_DEFAULT = "8"; /** Name and default value for the tag pair delimiter property. */ public static final String PAIR_DELIMITER_PROPERTY = "tagpairdelimiter"; public static final String PAIR_DELIMITER_PROPERTY_DEFAULT = "="; /** Name and default value for the delete string delimiter property. */ public static final String DELETE_DELIMITER_PROPERTY = "deletedelimiter"; public static final String DELETE_DELIMITER_PROPERTY_DEFAULT = ":"; /** Name and default value for the random timestamp write order property. */ public static final String RANDOMIZE_TIMESTAMP_ORDER_PROPERTY = "randomwritetimestamporder"; public static final String RANDOMIZE_TIMESTAMP_ORDER_PROPERTY_DEFAULT = "false"; /** Name and default value for the random time series write order property. */ public static final String RANDOMIZE_TIMESERIES_ORDER_PROPERTY = "randomtimeseriesorder"; public static final String RANDOMIZE_TIMESERIES_ORDER_PROPERTY_DEFAULT = "true"; /** Name and default value for the value types property. */ public static final String VALUE_TYPE_PROPERTY = "valuetype"; public static final String VALUE_TYPE_PROPERTY_DEFAULT = "floats"; /** Name and default value for the sparsity property. */ public static final String SPARSITY_PROPERTY = "sparsity"; public static final String SPARSITY_PROPERTY_DEFAULT = "0.00"; /** Name and default value for the delayed series percentage property. 
*/ public static final String DELAYED_SERIES_PROPERTY = "delayedseries"; public static final String DELAYED_SERIES_PROPERTY_DEFAULT = "0.10"; /** Name and default value for the delayed series intervals property. */ public static final String DELAYED_INTERVALS_PROPERTY = "delayedintervals"; public static final String DELAYED_INTERVALS_PROPERTY_DEFAULT = "5"; /** Name and default value for the query time span property. */ public static final String QUERY_TIMESPAN_PROPERTY = "querytimespan"; public static final String QUERY_TIMESPAN_PROPERTY_DEFAULT = "0"; /** Name and default value for the randomized query time span property. */ public static final String QUERY_RANDOM_TIMESPAN_PROPERTY = "queryrandomtimespan"; public static final String QUERY_RANDOM_TIMESPAN_PROPERTY_DEFAULT = "false"; /** Name and default value for the query time stamp delimiter property. */ public static final String QUERY_TIMESPAN_DELIMITER_PROPERTY = "querytimespandelimiter"; public static final String QUERY_TIMESPAN_DELIMITER_PROPERTY_DEFAULT = ","; /** Name and default value for the group-by key property. */ public static final String GROUPBY_KEY_PROPERTY = "groupbykey"; public static final String GROUPBY_KEY_PROPERTY_DEFAULT = "YCSBGB"; /** Name and default value for the group-by function property. */ public static final String GROUPBY_PROPERTY = "groupbyfunction"; /** Name and default value for the group-by key map property. */ public static final String GROUPBY_KEYS_PROPERTY = "groupbykeys"; /** Name and default value for the downsampling key property. */ public static final String DOWNSAMPLING_KEY_PROPERTY = "downsamplingkey"; public static final String DOWNSAMPLING_KEY_PROPERTY_DEFAULT = "YCSBDS"; /** Name and default value for the downsampling function property. */ public static final String DOWNSAMPLING_FUNCTION_PROPERTY = "downsamplingfunction"; /** Name and default value for the downsampling interval property. */ public static final String DOWNSAMPLING_INTERVAL_PROPERTY = "downsamplinginterval"; /** The properties to pull settings from. */ protected Properties properties; /** Generators for keys, tag keys and tag values. */ protected Generator keyGenerator; protected Generator tagKeyGenerator; protected Generator tagValueGenerator; /** The timestamp key, defaults to "YCSBTS". */ protected String timestampKey; /** The value key, defaults to "YCSBV". */ protected String valueKey; /** The number of time units in between timestamps. */ protected int timestampInterval; /** The units of time the timestamp and various intervals represent. */ protected TimeUnit timeUnits; /** Whether or not to randomize the timestamp order when writing. */ protected boolean randomizeTimestampOrder; /** Whether or not to randomize (shuffle) the time series order. NOT compatible * with data integrity. */ protected boolean randomizeTimeseriesOrder; /** The type of values to generate when writing data. */ protected ValueType valueType; /** Used to calculate an offset for each time series. */ protected int[] cumulativeCardinality; /** The calculated total cardinality based on the config. */ protected int totalCardinality; /** The calculated per-time-series-key cardinality. I.e. the number of unique * tag key and value combinations. */ protected int perKeyCardinality; /** How much data to scan for in each call. */ protected NumberGenerator scanlength; /** A generator used to select a random time series key per read/scan. */ protected NumberGenerator keychooser; /** A generator to select what operation to perform during the run phase.
*/ protected DiscreteGenerator operationchooser; /** The maximum number of interval offsets from the starting timestamp. Calculated * based on the number of records configured for the run. */ protected int maxOffsets; /** The number of records or operations to perform for this run. */ protected int recordcount; /** The number of tag pairs per time series. */ protected int tagPairs; /** The table we'll write to. */ protected String table; /** How many time series keys will be generated. */ protected int numKeys; /** The generated list of possible time series key values. */ protected String[] keys; /** The generated list of possible tag key values. */ protected String[] tagKeys; /** The generated list of possible tag value values. */ protected String[] tagValues; /** The cardinality for each tag key. */ protected int[] tagCardinality; /** A helper to skip non-incrementing tag values. */ protected int firstIncrementableCardinality; /** How sparse the data written should be. */ protected double sparsity; /** The percentage of time series that should be delayed in writes. */ protected double delayedSeries; /** The maximum number of intervals to delay a series. */ protected int delayedIntervals; /** Optional query time interval during reads/scans. */ protected int queryTimeSpan; /** Whether or not the actual interval should be randomly chosen, using * queryTimeSpan as the maximum value. */ protected boolean queryRandomTimeSpan; /** The delimiter for tag pairs in fields. */ protected String tagPairDelimiter; /** The delimiter between parameters for the delete key. */ protected String deleteDelimiter; /** The delimiter between timestamps for query time spans. */ protected String queryTimeSpanDelimiter; /** Whether or not to issue group-by queries. */ protected boolean groupBy; /** The key used for group-by tag keys. */ protected String groupByKey; /** The function used for group-by's. */ protected String groupByFunction; /** The tag keys to group on. */ protected boolean[] groupBys; /** Whether or not to issue downsampling queries. */ protected boolean downsample; /** The key used for downsampling tag keys. */ protected String downsampleKey; /** The downsampling function. */ protected String downsampleFunction; /** The downsampling interval. */ protected int downsampleInterval; /** * Set to true if want to check correctness of reads. Must also * be set to true during loading phase to function. */ protected boolean dataintegrity; /** Measurements to write data integrity results to. 
*/ protected Measurements measurements = Measurements.getMeasurements(); @Override public void init(final Properties p) throws WorkloadException { properties = p; recordcount = Integer.parseInt(p.getProperty(Client.RECORD_COUNT_PROPERTY, Client.DEFAULT_RECORD_COUNT)); if (recordcount == 0) { recordcount = Integer.MAX_VALUE; } timestampKey = p.getProperty(TIMESTAMP_KEY_PROPERTY, TIMESTAMP_KEY_PROPERTY_DEFAULT); valueKey = p.getProperty(VALUE_KEY_PROPERTY, VALUE_KEY_PROPERTY_DEFAULT); operationchooser = CoreWorkload.createOperationGenerator(properties); final int maxscanlength = Integer.parseInt(p.getProperty(CoreWorkload.MAX_SCAN_LENGTH_PROPERTY, CoreWorkload.MAX_SCAN_LENGTH_PROPERTY_DEFAULT)); String scanlengthdistrib = p.getProperty(CoreWorkload.SCAN_LENGTH_DISTRIBUTION_PROPERTY, CoreWorkload.SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT); if (scanlengthdistrib.compareTo("uniform") == 0) { scanlength = new UniformLongGenerator(1, maxscanlength); } else if (scanlengthdistrib.compareTo("zipfian") == 0) { scanlength = new ZipfianGenerator(1, maxscanlength); } else { throw new WorkloadException( "Distribution \"" + scanlengthdistrib + "\" not allowed for scan length"); } randomizeTimestampOrder = Boolean.parseBoolean(p.getProperty( RANDOMIZE_TIMESTAMP_ORDER_PROPERTY, RANDOMIZE_TIMESTAMP_ORDER_PROPERTY_DEFAULT)); randomizeTimeseriesOrder = Boolean.parseBoolean(p.getProperty( RANDOMIZE_TIMESERIES_ORDER_PROPERTY, RANDOMIZE_TIMESERIES_ORDER_PROPERTY_DEFAULT)); // setup the cardinality numKeys = Integer.parseInt(p.getProperty(CoreWorkload.FIELD_COUNT_PROPERTY, CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT)); tagPairs = Integer.parseInt(p.getProperty(TAG_COUNT_PROPERTY, TAG_COUNT_PROPERTY_DEFAULT)); sparsity = Double.parseDouble(p.getProperty(SPARSITY_PROPERTY, SPARSITY_PROPERTY_DEFAULT)); tagCardinality = new int[tagPairs]; final String requestdistrib = p.getProperty(CoreWorkload.REQUEST_DISTRIBUTION_PROPERTY, CoreWorkload.REQUEST_DISTRIBUTION_PROPERTY_DEFAULT); if (requestdistrib.compareTo("uniform") == 0) { keychooser = new UniformLongGenerator(0, numKeys - 1); } else if (requestdistrib.compareTo("sequential") == 0) { keychooser = new SequentialGenerator(0, numKeys - 1); } else if (requestdistrib.compareTo("zipfian") == 0) { keychooser = new ScrambledZipfianGenerator(0, numKeys - 1); //} else if (requestdistrib.compareTo("latest") == 0) { // keychooser = new SkewedLatestGenerator(transactioninsertkeysequence); } else if (requestdistrib.equals("hotspot")) { double hotsetfraction = Double.parseDouble(p.getProperty(CoreWorkload.HOTSPOT_DATA_FRACTION, CoreWorkload.HOTSPOT_DATA_FRACTION_DEFAULT)); double hotopnfraction = Double.parseDouble(p.getProperty(CoreWorkload.HOTSPOT_OPN_FRACTION, CoreWorkload.HOTSPOT_OPN_FRACTION_DEFAULT)); keychooser = new HotspotIntegerGenerator(0, numKeys - 1, hotsetfraction, hotopnfraction); } else { throw new WorkloadException("Unknown request distribution \"" + requestdistrib + "\""); } // figure out the start timestamp based on the units, cardinality and interval try { timestampInterval = Integer.parseInt(p.getProperty( TIMESTAMP_INTERVAL_PROPERTY, TIMESTAMP_INTERVAL_PROPERTY_DEFAULT)); } catch (NumberFormatException nfe) { throw new WorkloadException("Unable to parse the " + TIMESTAMP_INTERVAL_PROPERTY, nfe); } try { timeUnits = TimeUnit.valueOf(p.getProperty(TIMESTAMP_UNITS_PROPERTY, TIMESTAMP_UNITS_PROPERTY_DEFAULT).toUpperCase()); } catch (IllegalArgumentException e) { throw new WorkloadException("Unknown time unit type", e); } if (timeUnits == TimeUnit.NANOSECONDS || 
timeUnits == TimeUnit.MICROSECONDS) { throw new WorkloadException("YCSB doesn't support " + timeUnits + " at this time."); } tagPairDelimiter = p.getProperty(PAIR_DELIMITER_PROPERTY, PAIR_DELIMITER_PROPERTY_DEFAULT); deleteDelimiter = p.getProperty(DELETE_DELIMITER_PROPERTY, DELETE_DELIMITER_PROPERTY_DEFAULT); dataintegrity = Boolean.parseBoolean( p.getProperty(CoreWorkload.DATA_INTEGRITY_PROPERTY, CoreWorkload.DATA_INTEGRITY_PROPERTY_DEFAULT)); queryTimeSpan = Integer.parseInt(p.getProperty(QUERY_TIMESPAN_PROPERTY, QUERY_TIMESPAN_PROPERTY_DEFAULT)); queryRandomTimeSpan = Boolean.parseBoolean(p.getProperty(QUERY_RANDOM_TIMESPAN_PROPERTY, QUERY_RANDOM_TIMESPAN_PROPERTY_DEFAULT)); queryTimeSpanDelimiter = p.getProperty(QUERY_TIMESPAN_DELIMITER_PROPERTY, QUERY_TIMESPAN_DELIMITER_PROPERTY_DEFAULT); groupByKey = p.getProperty(GROUPBY_KEY_PROPERTY, GROUPBY_KEY_PROPERTY_DEFAULT); groupByFunction = p.getProperty(GROUPBY_PROPERTY); if (groupByFunction != null && !groupByFunction.isEmpty()) { final String groupByKeys = p.getProperty(GROUPBY_KEYS_PROPERTY); if (groupByKeys == null || groupByKeys.isEmpty()) { throw new WorkloadException("Group by was enabled but no keys were specified."); } final String[] gbKeys = groupByKeys.split(","); if (gbKeys.length != tagKeys.length) { throw new WorkloadException("Only " + gbKeys.length + " group by keys " + "were specified but there were " + tagKeys.length + " tag keys given."); } groupBys = new boolean[gbKeys.length]; for (int i = 0; i < gbKeys.length; i++) { groupBys[i] = Integer.parseInt(gbKeys[i].trim()) == 0 ? false : true; } groupBy = true; } downsampleKey = p.getProperty(DOWNSAMPLING_KEY_PROPERTY, DOWNSAMPLING_KEY_PROPERTY_DEFAULT); downsampleFunction = p.getProperty(DOWNSAMPLING_FUNCTION_PROPERTY); if (downsampleFunction != null && !downsampleFunction.isEmpty()) { final String interval = p.getProperty(DOWNSAMPLING_INTERVAL_PROPERTY); if (interval == null || interval.isEmpty()) { throw new WorkloadException("'" + DOWNSAMPLING_INTERVAL_PROPERTY + "' was missing despite '" + DOWNSAMPLING_FUNCTION_PROPERTY + "' being set."); } downsampleInterval = Integer.parseInt(interval); downsample = true; } delayedSeries = Double.parseDouble(p.getProperty(DELAYED_SERIES_PROPERTY, DELAYED_SERIES_PROPERTY_DEFAULT)); delayedIntervals = Integer.parseInt(p.getProperty(DELAYED_INTERVALS_PROPERTY, DELAYED_INTERVALS_PROPERTY_DEFAULT)); valueType = ValueType.fromString(p.getProperty(VALUE_TYPE_PROPERTY, VALUE_TYPE_PROPERTY_DEFAULT)); table = p.getProperty(CoreWorkload.TABLENAME_PROPERTY, CoreWorkload.TABLENAME_PROPERTY_DEFAULT); initKeysAndTags(); validateSettings(); } @Override public Object initThread(Properties p, int mythreadid, int threadcount) throws WorkloadException { if (properties == null) { throw new WorkloadException("Workload has not been initialized."); } return new ThreadState(mythreadid, threadcount); } @Override - public boolean doInsert(DB db, Object threadstate) { + public boolean doInsert(AsyncDB db, Object threadstate) { if (threadstate == null) { throw new IllegalStateException("Missing thread state."); } final Map tags = new TreeMap(); final String key = ((ThreadState)threadstate).nextDataPoint(tags, true); - if (db.insert(table, key, tags) == Status.OK) { + if (db.insert(table, key, tags).join() == Status.OK) { return true; } return false; } @Override - public boolean doTransaction(DB db, Object threadstate) { + public boolean doTransaction(AsyncDB db, Object threadstate) { if (threadstate == null) { throw new IllegalStateException("Missing thread 
state."); } switch (operationchooser.nextString()) { case "READ": doTransactionRead(db, threadstate); break; case "UPDATE": doTransactionUpdate(db, threadstate); break; case "INSERT": doTransactionInsert(db, threadstate); break; case "SCAN": doTransactionScan(db, threadstate); break; case "DELETE": doTransactionDelete(db, threadstate); break; default: return false; } return true; } - protected void doTransactionRead(final DB db, Object threadstate) { + protected void doTransactionRead(final AsyncDB db, Object threadstate) { final ThreadState state = (ThreadState) threadstate; final String keyname = keys[keychooser.nextValue().intValue()]; final Random random = ThreadLocalRandom.current(); int offsets = state.queryOffsetGenerator.nextValue().intValue(); //int offsets = random.nextInt(maxOffsets - 1); final long startTimestamp; if (offsets > 0) { startTimestamp = state.startTimestamp + state.timestampGenerator.getOffset(offsets); } else { startTimestamp = state.startTimestamp; } // rando tags Set fields = new HashSet(); for (int i = 0; i < tagPairs; ++i) { if (groupBy && groupBys[i]) { fields.add(tagKeys[i]); } else { fields.add(tagKeys[i] + tagPairDelimiter + tagValues[random.nextInt(tagCardinality[i])]); } } if (queryTimeSpan > 0) { final long endTimestamp; if (queryRandomTimeSpan) { endTimestamp = startTimestamp + (timestampInterval * random.nextInt(queryTimeSpan / timestampInterval)); } else { endTimestamp = startTimestamp + queryTimeSpan; } fields.add(timestampKey + tagPairDelimiter + startTimestamp + queryTimeSpanDelimiter + endTimestamp); } else { fields.add(timestampKey + tagPairDelimiter + startTimestamp); } if (groupBy) { fields.add(groupByKey + tagPairDelimiter + groupByFunction); } if (downsample) { fields.add(downsampleKey + tagPairDelimiter + downsampleFunction + downsampleInterval); } final Map cells = new HashMap(); - final Status status = db.read(table, keyname, fields, cells); + final Status status = db.read(table, keyname, fields, cells).join(); if (dataintegrity && status == Status.OK) { verifyRow(keyname, cells); } } - protected void doTransactionUpdate(final DB db, Object threadstate) { + protected void doTransactionUpdate(final AsyncDB db, Object threadstate) { if (threadstate == null) { throw new IllegalStateException("Missing thread state."); } final Map tags = new TreeMap(); final String key = ((ThreadState)threadstate).nextDataPoint(tags, false); db.update(table, key, tags); } - protected void doTransactionInsert(final DB db, Object threadstate) { + protected void doTransactionInsert(final AsyncDB db, Object threadstate) { doInsert(db, threadstate); } - protected void doTransactionScan(final DB db, Object threadstate) { + protected void doTransactionScan(final AsyncDB db, Object threadstate) { final ThreadState state = (ThreadState) threadstate; final Random random = ThreadLocalRandom.current(); final String keyname = keys[random.nextInt(keys.length)]; // choose a random scan length int len = scanlength.nextValue().intValue(); int offsets = random.nextInt(maxOffsets - 1); final long startTimestamp; if (offsets > 0) { startTimestamp = state.startTimestamp + state.timestampGenerator.getOffset(offsets); } else { startTimestamp = state.startTimestamp; } // rando tags Set fields = new HashSet(); for (int i = 0; i < tagPairs; ++i) { if (groupBy && groupBys[i]) { fields.add(tagKeys[i]); } else { fields.add(tagKeys[i] + tagPairDelimiter + tagValues[random.nextInt(tagCardinality[i])]); } } if (queryTimeSpan > 0) { final long endTimestamp; if (queryRandomTimeSpan) { 
endTimestamp = startTimestamp + (timestampInterval * random.nextInt(queryTimeSpan / timestampInterval)); } else { endTimestamp = startTimestamp + queryTimeSpan; } fields.add(timestampKey + tagPairDelimiter + startTimestamp + queryTimeSpanDelimiter + endTimestamp); } else { fields.add(timestampKey + tagPairDelimiter + startTimestamp); } if (groupBy) { fields.add(groupByKey + tagPairDelimiter + groupByFunction); } if (downsample) { fields.add(downsampleKey + tagPairDelimiter + downsampleFunction + tagPairDelimiter + downsampleInterval); } final Vector> results = new Vector>(); db.scan(table, keyname, len, fields, results); } - protected void doTransactionDelete(final DB db, Object threadstate) { + protected void doTransactionDelete(final AsyncDB db, Object threadstate) { final ThreadState state = (ThreadState) threadstate; final Random random = ThreadLocalRandom.current(); final StringBuilder buf = new StringBuilder().append(keys[random.nextInt(keys.length)]); int offsets = random.nextInt(maxOffsets - 1); final long startTimestamp; if (offsets > 0) { startTimestamp = state.startTimestamp + state.timestampGenerator.getOffset(offsets); } else { startTimestamp = state.startTimestamp; } // rando tags for (int i = 0; i < tagPairs; ++i) { if (groupBy && groupBys[i]) { buf.append(deleteDelimiter) .append(tagKeys[i]); } else { buf.append(deleteDelimiter).append(tagKeys[i] + tagPairDelimiter + tagValues[random.nextInt(tagCardinality[i])]); } } if (queryTimeSpan > 0) { final long endTimestamp; if (queryRandomTimeSpan) { endTimestamp = startTimestamp + (timestampInterval * random.nextInt(queryTimeSpan / timestampInterval)); } else { endTimestamp = startTimestamp + queryTimeSpan; } buf.append(deleteDelimiter) .append(timestampKey + tagPairDelimiter + startTimestamp + queryTimeSpanDelimiter + endTimestamp); } else { buf.append(deleteDelimiter) .append(timestampKey + tagPairDelimiter + startTimestamp); } db.delete(table, buf.toString()); } /** * Parses the values returned by a read or scan operation and determines whether * or not the integer value matches the hash and timestamp of the original timestamp. * Only works for raw data points, will not work for group-by's or downsampled data. * @param key The time series key. * @param cells The cells read by the DB. * @return {@link Status#OK} if the data matched or {@link Status#UNEXPECTED_STATE} if * the data did not match. */ protected Status verifyRow(final String key, final Map cells) { Status verifyStatus = Status.UNEXPECTED_STATE; long startTime = System.nanoTime(); double value = 0; long timestamp = 0; final TreeMap validationTags = new TreeMap(); for (final Entry entry : cells.entrySet()) { if (entry.getKey().equals(timestampKey)) { final NumericByteIterator it = (NumericByteIterator) entry.getValue(); timestamp = it.getLong(); } else if (entry.getKey().equals(valueKey)) { final NumericByteIterator it = (NumericByteIterator) entry.getValue(); value = it.isFloatingPoint() ? it.getDouble() : it.getLong(); } else { validationTags.put(entry.getKey(), entry.getValue().toString()); } } if (validationFunction(key, timestamp, validationTags) == value) { verifyStatus = Status.OK; } long endTime = System.nanoTime(); measurements.measure("VERIFY", (int) (endTime - startTime) / 1000); measurements.reportStatus("VERIFY", verifyStatus); return verifyStatus; } /** * Function used for generating a deterministic hash based on the combination * of metric, tags and timestamp. * @param key A non-null string representing the key. 
* @param timestamp A timestamp in the proper units for the workload. * @param tags A non-null map of tag keys and values NOT including the YCSB * key or timestamp. * @return A hash value as an 8 byte integer. */ protected long validationFunction(final String key, final long timestamp, final TreeMap tags) { final StringBuilder validationBuffer = new StringBuilder(keys[0].length() + (tagPairs * tagKeys[0].length()) + (tagPairs * tagCardinality[1])); for (final Entry pair : tags.entrySet()) { validationBuffer.append(pair.getKey()).append(pair.getValue()); } return (long) validationBuffer.toString().hashCode() ^ timestamp; } /** * Breaks out the keys, tags and cardinality initialization in another method * to keep CheckStyle happy. * @throws WorkloadException If something goes pear shaped. */ protected void initKeysAndTags() throws WorkloadException { final int keyLength = Integer.parseInt(properties.getProperty( CoreWorkload.FIELD_LENGTH_PROPERTY, CoreWorkload.FIELD_LENGTH_PROPERTY_DEFAULT)); final int tagKeyLength = Integer.parseInt(properties.getProperty( TAG_KEY_LENGTH_PROPERTY, TAG_KEY_LENGTH_PROPERTY_DEFAULT)); final int tagValueLength = Integer.parseInt(properties.getProperty( TAG_VALUE_LENGTH_PROPERTY, TAG_VALUE_LENGTH_PROPERTY_DEFAULT)); keyGenerator = new IncrementingPrintableStringGenerator(keyLength); tagKeyGenerator = new IncrementingPrintableStringGenerator(tagKeyLength); tagValueGenerator = new IncrementingPrintableStringGenerator(tagValueLength); final int threads = Integer.parseInt(properties.getProperty(Client.THREAD_COUNT_PROPERTY, "1")); final String tagCardinalityString = properties.getProperty( TAG_CARDINALITY_PROPERTY, TAG_CARDINALITY_PROPERTY_DEFAULT); final String[] tagCardinalityParts = tagCardinalityString.split(","); int idx = 0; totalCardinality = numKeys; perKeyCardinality = 1; int maxCardinality = 0; for (final String card : tagCardinalityParts) { try { tagCardinality[idx] = Integer.parseInt(card.trim()); } catch (NumberFormatException nfe) { throw new WorkloadException("Unable to parse cardinality: " + card, nfe); } if (tagCardinality[idx] < 1) { throw new WorkloadException("Cardinality must be greater than zero: " + tagCardinality[idx]); } totalCardinality *= tagCardinality[idx]; perKeyCardinality *= tagCardinality[idx]; if (tagCardinality[idx] > maxCardinality) { maxCardinality = tagCardinality[idx]; } ++idx; if (idx >= tagPairs) { // we have more cardinalities than tag keys so bail at this point. 
break; } } if (numKeys < threads) { throw new WorkloadException("Field count " + numKeys + " (keys for time " + "series workloads) must be greater or equal to the number of " + "threads " + threads); } // fill tags without explicit cardinality with 1 if (idx < tagPairs) { tagCardinality[idx++] = 1; } for (int i = 0; i < tagCardinality.length; ++i) { if (tagCardinality[i] > 1) { firstIncrementableCardinality = i; break; } } keys = new String[numKeys]; tagKeys = new String[tagPairs]; tagValues = new String[maxCardinality]; for (int i = 0; i < numKeys; ++i) { keys[i] = keyGenerator.nextString(); } for (int i = 0; i < tagPairs; ++i) { tagKeys[i] = tagKeyGenerator.nextString(); } for (int i = 0; i < maxCardinality; i++) { tagValues[i] = tagValueGenerator.nextString(); } if (randomizeTimeseriesOrder) { Utils.shuffleArray(keys); Utils.shuffleArray(tagValues); } maxOffsets = (recordcount / totalCardinality) + 1; final int[] keyAndTagCardinality = new int[tagPairs + 1]; keyAndTagCardinality[0] = numKeys; for (int i = 0; i < tagPairs; i++) { keyAndTagCardinality[i + 1] = tagCardinality[i]; } cumulativeCardinality = new int[keyAndTagCardinality.length]; for (int i = 0; i < keyAndTagCardinality.length; i++) { int cumulation = 1; for (int x = i; x <= keyAndTagCardinality.length - 1; x++) { cumulation *= keyAndTagCardinality[x]; } if (i > 0) { cumulativeCardinality[i - 1] = cumulation; } } cumulativeCardinality[cumulativeCardinality.length - 1] = 1; } /** * Makes sure the settings as given are compatible. * @throws WorkloadException If one or more settings were invalid. */ protected void validateSettings() throws WorkloadException { if (dataintegrity) { if (valueType != ValueType.INTEGERS) { throw new WorkloadException("Data integrity was enabled. 'valuetype' must " + "be set to 'integers'."); } if (groupBy) { throw new WorkloadException("Data integrity was enabled. 'groupbyfunction' must " + "be empty or null."); } if (downsample) { throw new WorkloadException("Data integrity was enabled. 'downsamplingfunction' must " + "be empty or null."); } if (queryTimeSpan > 0) { throw new WorkloadException("Data integrity was enabled. 'querytimespan' must " + "be empty or 0."); } if (randomizeTimeseriesOrder) { throw new WorkloadException("Data integrity was enabled. 'randomizetimeseriesorder' must " + "be false."); } final String startTimestamp = properties.getProperty(CoreWorkload.INSERT_START_PROPERTY); if (startTimestamp == null || startTimestamp.isEmpty()) { throw new WorkloadException("Data integrity was enabled. 'insertstart' must " + "be set to a Unix Epoch timestamp."); } } } /** * Thread state class holding thread local generators and indices. */ protected class ThreadState { /** The timestamp generator for this thread. */ protected final UnixEpochTimestampGenerator timestampGenerator; /** An offset generator to select a random offset for queries. */ protected final NumberGenerator queryOffsetGenerator; /** The current write key index. */ protected int keyIdx; /** The starting fence for writing keys. */ protected int keyIdxStart; /** The ending fence for writing keys. */ protected int keyIdxEnd; /** Indices for each tag value for writes. */ protected int[] tagValueIdxs; /** Whether or not all time series have written values for the current timestamp. */ protected boolean rollover; /** The starting timestamp. */ protected long startTimestamp; /** * Default ctor. * @param threadID The zero based thread ID. * @param threadCount The total number of threads. 
* @throws WorkloadException If something went pear shaped. */ protected ThreadState(final int threadID, final int threadCount) throws WorkloadException { int totalThreads = threadCount > 0 ? threadCount : 1; if (threadID >= totalThreads) { throw new IllegalStateException("Thread ID " + threadID + " cannot be greater " + "than or equal to the thread count " + totalThreads); } if (keys.length < threadCount) { throw new WorkloadException("Key count " + keys.length + " must be greater " + "than or equal to the thread count " + totalThreads); } int keysPerThread = keys.length / totalThreads; keyIdx = keysPerThread * threadID; keyIdxStart = keyIdx; if (totalThreads - 1 == threadID) { keyIdxEnd = keys.length; } else { keyIdxEnd = keyIdxStart + keysPerThread; } tagValueIdxs = new int[tagPairs]; // all zeros final String startingTimestamp = properties.getProperty(CoreWorkload.INSERT_START_PROPERTY); if (startingTimestamp == null || startingTimestamp.isEmpty()) { timestampGenerator = randomizeTimestampOrder ? new RandomDiscreteTimestampGenerator(timestampInterval, timeUnits, maxOffsets) : new UnixEpochTimestampGenerator(timestampInterval, timeUnits); } else { try { timestampGenerator = randomizeTimestampOrder ? new RandomDiscreteTimestampGenerator(timestampInterval, timeUnits, Long.parseLong(startingTimestamp), maxOffsets) : new UnixEpochTimestampGenerator(timestampInterval, timeUnits, Long.parseLong(startingTimestamp)); } catch (NumberFormatException nfe) { throw new WorkloadException("Unable to parse the " + CoreWorkload.INSERT_START_PROPERTY, nfe); } } // Set the last value properly for the timestamp, otherwise it may start // one interval ago. startTimestamp = timestampGenerator.nextValue(); // TODO - pick it queryOffsetGenerator = new UniformLongGenerator(0, maxOffsets - 2); } /** * Generates the next write value for the thread. * @param map An initialized map to populate with tag keys and values as well * as the timestamp and actual value. * @param isInsert Whether or not it's an insert or an update. Updates will pick * an older timestamp (if random isn't enabled). * @return The next key to write. */ protected String nextDataPoint(final Map map, final boolean isInsert) { final Random random = ThreadLocalRandom.current(); int iterations = sparsity <= 0 ? 1 : random.nextInt((int) ((double) perKeyCardinality * sparsity)); if (iterations < 1) { iterations = 1; } while (true) { iterations--; if (rollover) { timestampGenerator.nextValue(); rollover = false; } String key = null; if (iterations <= 0) { final TreeMap validationTags; if (dataintegrity) { validationTags = new TreeMap(); } else { validationTags = null; } key = keys[keyIdx]; int overallIdx = keyIdx * cumulativeCardinality[0]; for (int i = 0; i < tagPairs; ++i) { int tvidx = tagValueIdxs[i]; map.put(tagKeys[i], new StringByteIterator(tagValues[tvidx])); if (dataintegrity) { validationTags.put(tagKeys[i], tagValues[tvidx]); } if (delayedSeries > 0) { overallIdx += (tvidx * cumulativeCardinality[i + 1]); } } if (!isInsert) { final long delta = (timestampGenerator.currentValue() - startTimestamp) / timestampInterval; final int intervals = random.nextInt((int) delta); map.put(timestampKey, new NumericByteIterator(startTimestamp + (intervals * timestampInterval))); } else if (delayedSeries > 0) { // See if the series falls in a delay bucket and calculate an offset earlier // than the current timestamp value if so.
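// Illustrative arithmetic: with the default delayedseries=0.10 and
// delayedintervals=5, a series whose overallIdx lands in the first 10% of
// totalCardinality is written (overallIdx % 5) intervals behind the newest
// timestamp, e.g. 3 * 60 = 180 seconds late at the default 60-second interval.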
double pct = (double) overallIdx / (double) totalCardinality; if (pct < delayedSeries) { int modulo = overallIdx % delayedIntervals; if (modulo < 0) { modulo *= -1; } map.put(timestampKey, new NumericByteIterator(timestampGenerator.currentValue() - timestampInterval * modulo)); } else { map.put(timestampKey, new NumericByteIterator(timestampGenerator.currentValue())); } } else { map.put(timestampKey, new NumericByteIterator(timestampGenerator.currentValue())); } if (dataintegrity) { map.put(valueKey, new NumericByteIterator(validationFunction(key, timestampGenerator.currentValue(), validationTags))); } else { switch (valueType) { case INTEGERS: map.put(valueKey, new NumericByteIterator(random.nextInt())); break; case FLOATS: map.put(valueKey, new NumericByteIterator(random.nextDouble() * (double) 100000)); break; case MIXED: if (random.nextBoolean()) { map.put(valueKey, new NumericByteIterator(random.nextInt())); } else { map.put(valueKey, new NumericByteIterator(random.nextDouble() * (double) 100000)); } break; default: throw new IllegalStateException("Somehow we didn't have a value " + "type configured that we support: " + valueType); } } } boolean tagRollover = false; for (int i = tagCardinality.length - 1; i >= 0; --i) { if (tagCardinality[i] <= 1) { tagRollover = true; // Only one tag so needs roll over. continue; } if (tagValueIdxs[i] + 1 >= tagCardinality[i]) { tagValueIdxs[i] = 0; if (i == firstIncrementableCardinality) { tagRollover = true; } } else { ++tagValueIdxs[i]; break; } } if (tagRollover) { if (keyIdx + 1 >= keyIdxEnd) { keyIdx = keyIdxStart; rollover = true; } else { ++keyIdx; } } if (iterations <= 0) { return key; } } } } } \ No newline at end of file diff --git a/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java b/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java index 409331ac..9b8464b9 100644 --- a/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java +++ b/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java @@ -1,577 +1,579 @@ /** * Copyright (c) 2017 YCSB contributors All rights reserved. *

* Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb.workloads; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import java.util.TreeMap; import java.util.Vector; +import java.util.concurrent.CompletableFuture; + +import com.yahoo.ycsb.AsyncDB; import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.Client; -import com.yahoo.ycsb.DB; import com.yahoo.ycsb.NumericByteIterator; import com.yahoo.ycsb.Status; import com.yahoo.ycsb.StringByteIterator; import com.yahoo.ycsb.Utils; import com.yahoo.ycsb.WorkloadException; import com.yahoo.ycsb.measurements.Measurements; import org.testng.annotations.Test; public class TestTimeSeriesWorkload { @Test public void twoThreads() throws Exception { final Properties p = getUTProperties(); Measurements.setProperties(p); final TimeSeriesWorkload wl = new TimeSeriesWorkload(); wl.init(p); Object threadState = wl.initThread(p, 0, 2); MockDB db = new MockDB(); for (int i = 0; i < 74; i++) { assertTrue(wl.doInsert(db, threadState)); } assertEquals(db.keys.size(), 74); assertEquals(db.values.size(), 74); long timestamp = 1451606400; for (int i = 0; i < db.keys.size(); i++) { assertEquals(db.keys.get(i), "AAAA"); assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); assertEquals(Utils.bytesToLong(db.values.get(i).get( TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)); if (i % 2 == 0) { assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); } else { assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); timestamp += 60; } } threadState = wl.initThread(p, 1, 2); db = new MockDB(); for (int i = 0; i < 74; i++) { assertTrue(wl.doInsert(db, threadState)); } assertEquals(db.keys.size(), 74); assertEquals(db.values.size(), 74); timestamp = 1451606400; for (int i = 0; i < db.keys.size(); i++) { assertEquals(db.keys.get(i), "AAAB"); assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); assertEquals(Utils.bytesToLong(db.values.get(i).get( TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)); if (i % 2 == 0) { assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); } else { assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); timestamp += 60; } } } @Test (expectedExceptions = WorkloadException.class) public void badTimeUnit() throws Exception { final Properties p = new Properties(); p.put(TimeSeriesWorkload.TIMESTAMP_UNITS_PROPERTY, "foobar"); getWorkload(p, true); } @Test (expectedExceptions = WorkloadException.class) public void failedToInitWorkloadBeforeThreadInit() throws Exception { final Properties p = getUTProperties(); final TimeSeriesWorkload wl = getWorkload(p, false); //wl.init(p); // <-- we NEED this :( final Object threadState = wl.initThread(p, 0, 2); 
final MockDB db = new MockDB(); wl.doInsert(db, threadState); } @Test (expectedExceptions = IllegalStateException.class) public void failedToInitThread() throws Exception { final Properties p = getUTProperties(); final TimeSeriesWorkload wl = getWorkload(p, true); final MockDB db = new MockDB(); wl.doInsert(db, null); } @Test public void insertOneKeyOneTagCardinalityOne() throws Exception { final Properties p = getUTProperties(); p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "1"); p.put(TimeSeriesWorkload.TAG_COUNT_PROPERTY, "1"); p.put(TimeSeriesWorkload.TAG_CARDINALITY_PROPERTY, "1"); final TimeSeriesWorkload wl = getWorkload(p, true); final Object threadState = wl.initThread(p, 0, 1); final MockDB db = new MockDB(); for (int i = 0; i < 74; i++) { assertTrue(wl.doInsert(db, threadState)); } assertEquals(db.keys.size(), 74); assertEquals(db.values.size(), 74); long timestamp = 1451606400; for (int i = 0; i < db.keys.size(); i++) { assertEquals(db.keys.get(i), "AAAA"); assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); assertEquals(Utils.bytesToLong(db.values.get(i).get( TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); assertTrue(((NumericByteIterator) db.values.get(i) .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint()); timestamp += 60; } } @Test public void insertOneKeyTwoTagsLowCardinality() throws Exception { final Properties p = getUTProperties(); p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "1"); final TimeSeriesWorkload wl = getWorkload(p, true); final Object threadState = wl.initThread(p, 0, 1); final MockDB db = new MockDB(); for (int i = 0; i < 74; i++) { assertTrue(wl.doInsert(db, threadState)); } assertEquals(db.keys.size(), 74); assertEquals(db.values.size(), 74); long timestamp = 1451606400; for (int i = 0; i < db.keys.size(); i++) { assertEquals(db.keys.get(i), "AAAA"); assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); assertEquals(Utils.bytesToLong(db.values.get(i).get( TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); assertTrue(((NumericByteIterator) db.values.get(i) .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint()); if (i % 2 == 0) { assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); } else { assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); timestamp += 60; } } } @Test public void insertTwoKeysTwoTagsLowCardinality() throws Exception { final Properties p = getUTProperties(); final TimeSeriesWorkload wl = getWorkload(p, true); final Object threadState = wl.initThread(p, 0, 1); final MockDB db = new MockDB(); for (int i = 0; i < 74; i++) { assertTrue(wl.doInsert(db, threadState)); } assertEquals(db.keys.size(), 74); assertEquals(db.values.size(), 74); long timestamp = 1451606400; int metricCtr = 0; for (int i = 0; i < db.keys.size(); i++) { assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); assertEquals(Utils.bytesToLong(db.values.get(i).get( TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); assertTrue(((NumericByteIterator) db.values.get(i) .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint()); if (i % 2 == 0) { assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); } else { assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); } if (metricCtr++ > 1) { assertEquals(db.keys.get(i), "AAAB"); if (metricCtr >= 4) { metricCtr = 0; timestamp += 60; } } else { assertEquals(db.keys.get(i), "AAAA"); } } } @Test public void insertTwoKeysTwoThreads() throws Exception { final Properties p = 
getUTProperties(); final TimeSeriesWorkload wl = getWorkload(p, true); Object threadState = wl.initThread(p, 0, 2); MockDB db = new MockDB(); for (int i = 0; i < 74; i++) { assertTrue(wl.doInsert(db, threadState)); } assertEquals(db.keys.size(), 74); assertEquals(db.values.size(), 74); long timestamp = 1451606400; for (int i = 0; i < db.keys.size(); i++) { assertEquals(db.keys.get(i), "AAAA"); // <-- key 1 assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); assertEquals(Utils.bytesToLong(db.values.get(i).get( TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); assertTrue(((NumericByteIterator) db.values.get(i) .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint()); if (i % 2 == 0) { assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); } else { assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); timestamp += 60; } } threadState = wl.initThread(p, 1, 2); db = new MockDB(); for (int i = 0; i < 74; i++) { assertTrue(wl.doInsert(db, threadState)); } assertEquals(db.keys.size(), 74); assertEquals(db.values.size(), 74); timestamp = 1451606400; for (int i = 0; i < db.keys.size(); i++) { assertEquals(db.keys.get(i), "AAAB"); // <-- key 2 assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); assertEquals(Utils.bytesToLong(db.values.get(i).get( TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)); if (i % 2 == 0) { assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); } else { assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); timestamp += 60; } } } @Test public void insertThreeKeysTwoThreads() throws Exception { // To make sure the distribution doesn't miss any metrics final Properties p = getUTProperties(); p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "3"); final TimeSeriesWorkload wl = getWorkload(p, true); Object threadState = wl.initThread(p, 0, 2); MockDB db = new MockDB(); for (int i = 0; i < 74; i++) { assertTrue(wl.doInsert(db, threadState)); } assertEquals(db.keys.size(), 74); assertEquals(db.values.size(), 74); long timestamp = 1451606400; for (int i = 0; i < db.keys.size(); i++) { assertEquals(db.keys.get(i), "AAAA"); assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); assertEquals(Utils.bytesToLong(db.values.get(i).get( TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); assertTrue(((NumericByteIterator) db.values.get(i) .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint()); if (i % 2 == 0) { assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); } else { assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); timestamp += 60; } } threadState = wl.initThread(p, 1, 2); db = new MockDB(); for (int i = 0; i < 74; i++) { assertTrue(wl.doInsert(db, threadState)); } timestamp = 1451606400; int metricCtr = 0; for (int i = 0; i < db.keys.size(); i++) { assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); assertEquals(Utils.bytesToLong(db.values.get(i).get( TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)); if (i % 2 == 0) { assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); } else { assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); } if (metricCtr++ > 1) { assertEquals(db.keys.get(i), "AAAC"); if (metricCtr >= 4) { metricCtr = 0; timestamp += 60; } } else { assertEquals(db.keys.get(i), "AAAB"); } } } @Test public 
void insertWithValidation() throws Exception { final Properties p = getUTProperties(); p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "1"); p.put(CoreWorkload.DATA_INTEGRITY_PROPERTY, "true"); p.put(TimeSeriesWorkload.VALUE_TYPE_PROPERTY, "integers"); final TimeSeriesWorkload wl = getWorkload(p, true); final Object threadState = wl.initThread(p, 0, 1); final MockDB db = new MockDB(); for (int i = 0; i < 74; i++) { assertTrue(wl.doInsert(db, threadState)); } assertEquals(db.keys.size(), 74); assertEquals(db.values.size(), 74); long timestamp = 1451606400; for (int i = 0; i < db.keys.size(); i++) { assertEquals(db.keys.get(i), "AAAA"); assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); assertEquals(Utils.bytesToLong(db.values.get(i).get( TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); assertFalse(((NumericByteIterator) db.values.get(i) .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint()); // validation check final TreeMap validationTags = new TreeMap(); for (final Entry entry : db.values.get(i).entrySet()) { if (entry.getKey().equals(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT) || entry.getKey().equals(TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT)) { continue; } validationTags.put(entry.getKey(), entry.getValue().toString()); } assertEquals(wl.validationFunction(db.keys.get(i), timestamp, validationTags), ((NumericByteIterator) db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).getLong()); if (i % 2 == 0) { assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); } else { assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); timestamp += 60; } } } @Test public void read() throws Exception { final Properties p = getUTProperties(); final TimeSeriesWorkload wl = getWorkload(p, true); final Object threadState = wl.initThread(p, 0, 1); final MockDB db = new MockDB(); for (int i = 0; i < 20; i++) { wl.doTransactionRead(db, threadState); } } @Test public void verifyRow() throws Exception { final Properties p = getUTProperties(); final TimeSeriesWorkload wl = getWorkload(p, true); final TreeMap validationTags = new TreeMap(); final HashMap cells = new HashMap(); validationTags.put("AA", "AAAA"); cells.put("AA", new StringByteIterator("AAAA")); validationTags.put("AB", "AAAB"); cells.put("AB", new StringByteIterator("AAAB")); long hash = wl.validationFunction("AAAA", 1451606400L, validationTags); cells.put(TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT, new NumericByteIterator(1451606400L)); cells.put(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT, new NumericByteIterator(hash)); assertEquals(wl.verifyRow("AAAA", cells), Status.OK); // tweak the last value a bit for (final ByteIterator it : cells.values()) { it.reset(); } cells.put(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT, new NumericByteIterator(hash + 1)); assertEquals(wl.verifyRow("AAAA", cells), Status.UNEXPECTED_STATE); // no value cell, returns an unexpected state for (final ByteIterator it : cells.values()) { it.reset(); } cells.remove(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT); assertEquals(wl.verifyRow("AAAA", cells), Status.UNEXPECTED_STATE); } @Test public void validateSettingsDataIntegrity() throws Exception { Properties p = getUTProperties(); // data validation incompatibilities p.setProperty(CoreWorkload.DATA_INTEGRITY_PROPERTY, "true"); try { getWorkload(p, true); fail("Expected WorkloadException"); } catch (WorkloadException e) { } p.setProperty(TimeSeriesWorkload.VALUE_TYPE_PROPERTY, "integers"); // now it's ok 
p.setProperty(TimeSeriesWorkload.GROUPBY_PROPERTY, "sum"); // now it's not try { getWorkload(p, true); fail("Expected WorkloadException"); } catch (WorkloadException e) { } p.setProperty(TimeSeriesWorkload.GROUPBY_PROPERTY, ""); p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_FUNCTION_PROPERTY, "sum"); p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_INTERVAL_PROPERTY, "60"); try { getWorkload(p, true); fail("Expected WorkloadException"); } catch (WorkloadException e) { } p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_FUNCTION_PROPERTY, ""); p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_INTERVAL_PROPERTY, ""); p.setProperty(TimeSeriesWorkload.QUERY_TIMESPAN_PROPERTY, "60"); try { getWorkload(p, true); fail("Expected WorkloadException"); } catch (WorkloadException e) { } p = getUTProperties(); p.setProperty(CoreWorkload.DATA_INTEGRITY_PROPERTY, "true"); p.setProperty(TimeSeriesWorkload.VALUE_TYPE_PROPERTY, "integers"); p.setProperty(TimeSeriesWorkload.RANDOMIZE_TIMESERIES_ORDER_PROPERTY, "true"); try { getWorkload(p, true); fail("Expected WorkloadException"); } catch (WorkloadException e) { } p.setProperty(TimeSeriesWorkload.RANDOMIZE_TIMESERIES_ORDER_PROPERTY, "false"); p.setProperty(TimeSeriesWorkload.INSERT_START_PROPERTY, ""); try { getWorkload(p, true); fail("Expected WorkloadException"); } catch (WorkloadException e) { } } /** Helper method that generates unit testing defaults for the properties map */ private Properties getUTProperties() { final Properties p = new Properties(); p.put(Client.RECORD_COUNT_PROPERTY, "10"); p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "2"); p.put(CoreWorkload.FIELD_LENGTH_PROPERTY, "4"); p.put(TimeSeriesWorkload.TAG_KEY_LENGTH_PROPERTY, "2"); p.put(TimeSeriesWorkload.TAG_VALUE_LENGTH_PROPERTY, "4"); p.put(TimeSeriesWorkload.TAG_COUNT_PROPERTY, "2"); p.put(TimeSeriesWorkload.TAG_CARDINALITY_PROPERTY, "1,2"); p.put(CoreWorkload.INSERT_START_PROPERTY, "1451606400"); p.put(TimeSeriesWorkload.DELAYED_SERIES_PROPERTY, "0"); p.put(TimeSeriesWorkload.RANDOMIZE_TIMESERIES_ORDER_PROPERTY, "false"); return p; } /** Helper to setup the workload for testing. 
*/ private TimeSeriesWorkload getWorkload(final Properties p, final boolean init) throws WorkloadException { Measurements.setProperties(p); if (!init) { return new TimeSeriesWorkload(); } else { final TimeSeriesWorkload workload = new TimeSeriesWorkload(); workload.init(p); return workload; } } - static class MockDB extends DB { + static class MockDB extends AsyncDB { final List keys = new ArrayList(); final List> values = new ArrayList>(); @Override - public Status read(String table, String key, Set fields, - Map result) { - return Status.OK; + public CompletableFuture read(String table, String key, Set fields, + Map result) { + return CompletableFuture.completedFuture(Status.OK); } @Override - public Status scan(String table, String startkey, int recordcount, - Set fields, Vector> result) { + public CompletableFuture scan(String table, String startkey, int recordcount, + Set fields, Vector> result) { // TODO Auto-generated method stub - return Status.OK; + return CompletableFuture.completedFuture(Status.OK); } @Override - public Status update(String table, String key, - Map values) { + public CompletableFuture update(String table, String key, + Map values) { // TODO Auto-generated method stub - return Status.OK; + return CompletableFuture.completedFuture(Status.OK); } @Override - public Status insert(String table, String key, - Map values) { + public CompletableFuture insert(String table, String key, + Map values) { keys.add(key); this.values.add(values); - return Status.OK; + return CompletableFuture.completedFuture(Status.OK); } @Override - public Status delete(String table, String key) { + public CompletableFuture delete(String table, String key) { // TODO Auto-generated method stub - return Status.OK; + return CompletableFuture.completedFuture(Status.OK); } public void dumpStdout() { for (int i = 0; i < keys.size(); i++) { System.out.print("[" + i + "] Key: " + keys.get(i) + " Values: {"); int x = 0; for (final Entry entry : values.get(i).entrySet()) { if (x++ > 0) { System.out.print(", "); } System.out.print("{" + entry.getKey() + " => "); if (entry.getKey().equals("YCSBV")) { System.out.print(new String(Utils.bytesToDouble(entry.getValue().toArray()) + "}")); } else if (entry.getKey().equals("YCSBTS")) { System.out.print(new String(Utils.bytesToLong(entry.getValue().toArray()) + "}")); } else { System.out.print(new String(entry.getValue().toArray()) + "}"); } } System.out.println("}"); } } } } \ No newline at end of file
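
One way a binding built on a blocking client could satisfy the new AsyncDB contract (a sketch under assumptions, not part of this change): run the blocking call on the binding's own executor and hand back the future, which the workload then consumes with join(), as in db.insert(table, key, tags).join() above. ExampleAsyncBinding and its in-memory map are hypothetical stand-ins for a real client; the method signatures are assumed to mirror the existing DB interface with CompletableFuture<Status> results.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.yahoo.ycsb.AsyncDB;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;

public class ExampleAsyncBinding extends AsyncDB {
  /** Stand-in for a real client; YCSB hands each client thread its own instance. */
  private final Map<String, Map<String, ByteIterator>> store = new ConcurrentHashMap<>();
  /** Executor that absorbs the blocking work so callers only block when they join(). */
  private final ExecutorService pool = Executors.newFixedThreadPool(4);

  @Override
  public void cleanup() throws DBException {
    pool.shutdown();
  }

  @Override
  public CompletableFuture<Status> read(String table, String key, Set<String> fields,
                                        Map<String, ByteIterator> result) {
    return CompletableFuture.supplyAsync(() -> {
      Map<String, ByteIterator> row = store.get(key); // a real binding would call its client here
      if (row == null) {
        return Status.NOT_FOUND;
      }
      result.putAll(row);
      return Status.OK;
    }, pool);
  }

  @Override
  public CompletableFuture<Status> scan(String table, String startkey, int recordcount,
                                        Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
    // Range scans are store-specific; this toy version just reports success.
    return CompletableFuture.completedFuture(Status.OK);
  }

  @Override
  public CompletableFuture<Status> update(String table, String key, Map<String, ByteIterator> values) {
    return insert(table, key, values); // the toy store treats updates as upserts
  }

  @Override
  public CompletableFuture<Status> insert(String table, String key, Map<String, ByteIterator> values) {
    return CompletableFuture.supplyAsync(() -> {
      store.put(key, new HashMap<>(values));
      return Status.OK;
    }, pool);
  }

  @Override
  public CompletableFuture<Status> delete(String table, String key) {
    return CompletableFuture.supplyAsync(() -> store.remove(key) == null ? Status.NOT_FOUND : Status.OK, pool);
  }
}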