diff --git a/core/src/main/java/com/yahoo/ycsb/Client.java b/core/src/main/java/com/yahoo/ycsb/Client.java index b196957c..d9bc5717 100644 --- a/core/src/main/java/com/yahoo/ycsb/Client.java +++ b/core/src/main/java/com/yahoo/ycsb/Client.java @@ -1,1124 +1,1125 @@ /** * Copyright (c) 2010-2016 Yahoo! Inc., 2017 YCSB contributors All rights reserved. *

* Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb; import com.yahoo.ycsb.measurements.Measurements; import com.yahoo.ycsb.measurements.exporter.MeasurementsExporter; import com.yahoo.ycsb.measurements.exporter.TextMeasurementsExporter; import org.apache.htrace.core.HTraceConfiguration; import org.apache.htrace.core.TraceScope; import org.apache.htrace.core.Tracer; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; /** * A thread to periodically show the status of the experiment to reassure you that progress is being made. */ class StatusThread extends Thread { // Counts down each of the clients completing private final CountDownLatch completeLatch; // Stores the measurements for the run private final Measurements measurements; // Whether or not to track the JVM stats per run private final boolean trackJVMStats; // The clients that are running. private final List clients; private final String label; private final boolean standardstatus; // The interval for reporting status. 
private long sleeptimeNs; // JVM max/mins private int maxThreads; private int minThreads = Integer.MAX_VALUE; private long maxUsedMem; private long minUsedMem = Long.MAX_VALUE; private double maxLoadAvg; private double minLoadAvg = Double.MAX_VALUE; private long lastGCCount = 0; private long lastGCTime = 0; /** * Creates a new StatusThread without JVM stat tracking. * * @param completeLatch The latch that each client thread will {@link CountDownLatch#countDown()} * as they complete. * @param clients The clients to collect metrics from. * @param label The label for the status. * @param standardstatus If true the status is printed to stdout in addition to stderr. * @param statusIntervalSeconds The number of seconds between status updates. */ public StatusThread(CountDownLatch completeLatch, List clients, String label, boolean standardstatus, int statusIntervalSeconds) { this(completeLatch, clients, label, standardstatus, statusIntervalSeconds, false); } /** * Creates a new StatusThread. * * @param completeLatch The latch that each client thread will {@link CountDownLatch#countDown()} * as they complete. * @param clients The clients to collect metrics from. * @param label The label for the status. * @param standardstatus If true the status is printed to stdout in addition to stderr. * @param statusIntervalSeconds The number of seconds between status updates. * @param trackJVMStats Whether or not to track JVM stats. */ public StatusThread(CountDownLatch completeLatch, List clients, String label, boolean standardstatus, int statusIntervalSeconds, boolean trackJVMStats) { this.completeLatch = completeLatch; this.clients = clients; this.label = label; this.standardstatus = standardstatus; sleeptimeNs = TimeUnit.SECONDS.toNanos(statusIntervalSeconds); measurements = Measurements.getMeasurements(); this.trackJVMStats = trackJVMStats; } /** * Run and periodically report status. 
*/ @Override public void run() { final long startTimeMs = System.currentTimeMillis(); final long startTimeNanos = System.nanoTime(); long deadline = startTimeNanos + sleeptimeNs; long startIntervalMs = startTimeMs; long lastTotalOps = 0; boolean alldone; do { long nowMs = System.currentTimeMillis(); lastTotalOps = computeStats(startTimeMs, startIntervalMs, nowMs, lastTotalOps); if (trackJVMStats) { measureJVM(); } alldone = waitForClientsUntil(deadline); startIntervalMs = nowMs; deadline += sleeptimeNs; } while (!alldone); if (trackJVMStats) { measureJVM(); } // Print the final stats. computeStats(startTimeMs, startIntervalMs, System.currentTimeMillis(), lastTotalOps); } /** * Computes and prints the stats. * * @param startTimeMs The start time of the test. * @param startIntervalMs The start time of this interval. * @param endIntervalMs The end time (now) for the interval. * @param lastTotalOps The last total operations count. * @return The current operation count. */ private long computeStats(final long startTimeMs, long startIntervalMs, long endIntervalMs, long lastTotalOps) { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS"); long totalops = 0; long todoops = 0; // Calculate the total number of operations completed. 
for (ClientThread t : clients) { totalops += t.getOpsDone(); todoops += t.getOpsTodo(); } long interval = endIntervalMs - startTimeMs; double throughput = 1000.0 * (((double) totalops) / (double) interval); double curthroughput = 1000.0 * (((double) (totalops - lastTotalOps)) / ((double) (endIntervalMs - startIntervalMs))); long estremaining = (long) Math.ceil(todoops / throughput); DecimalFormat d = new DecimalFormat("#.##"); String labelString = this.label + format.format(new Date()); StringBuilder msg = new StringBuilder(labelString).append(" ").append(interval / 1000).append(" sec: "); msg.append(totalops).append(" operations; "); if (totalops != 0) { msg.append(d.format(curthroughput)).append(" current ops/sec; "); } if (todoops != 0) { msg.append("est completion in ").append(RemainingFormatter.format(estremaining)); } msg.append(Measurements.getMeasurements().getSummary()); System.err.println(msg); if (standardstatus) { System.out.println(msg); } return totalops; } /** * Waits for all of the client to finish or the deadline to expire. * * @param deadline The current deadline. * @return True if all of the clients completed. */ private boolean waitForClientsUntil(long deadline) { boolean alldone = false; long now = System.nanoTime(); while (!alldone && now < deadline) { try { alldone = completeLatch.await(deadline - now, TimeUnit.NANOSECONDS); } catch (InterruptedException ie) { // If we are interrupted the thread is being asked to shutdown. // Return true to indicate that and reset the interrupt state // of the thread. Thread.currentThread().interrupt(); alldone = true; } now = System.nanoTime(); } return alldone; } /** * Executes the JVM measurements. 
*/ private void measureJVM() { final int threads = Utils.getActiveThreadCount(); if (threads < minThreads) { minThreads = threads; } if (threads > maxThreads) { maxThreads = threads; } measurements.measure("THREAD_COUNT", threads); // TODO - once measurements allow for other number types, switch to using // the raw bytes. Otherwise we can track in MB to avoid negative values // when faced with huge heaps. final int usedMem = Utils.getUsedMemoryMegaBytes(); if (usedMem < minUsedMem) { minUsedMem = usedMem; } if (usedMem > maxUsedMem) { maxUsedMem = usedMem; } measurements.measure("USED_MEM_MB", usedMem); // Some JVMs may not implement this feature so if the value is less than // zero, just ommit it. final double systemLoad = Utils.getSystemLoadAverage(); if (systemLoad >= 0) { // TODO - store the double if measurements allows for them measurements.measure("SYS_LOAD_AVG", (int) systemLoad); if (systemLoad > maxLoadAvg) { maxLoadAvg = systemLoad; } if (systemLoad < minLoadAvg) { minLoadAvg = systemLoad; } } final long gcs = Utils.getGCTotalCollectionCount(); measurements.measure("GCS", (int) (gcs - lastGCCount)); final long gcTime = Utils.getGCTotalTime(); measurements.measure("GCS_TIME", (int) (gcTime - lastGCTime)); lastGCCount = gcs; lastGCTime = gcTime; } /** * @return The maximum threads running during the test. */ public int getMaxThreads() { return maxThreads; } /** * @return The minimum threads running during the test. */ public int getMinThreads() { return minThreads; } /** * @return The maximum memory used during the test. */ public long getMaxUsedMem() { return maxUsedMem; } /** * @return The minimum memory used during the test. */ public long getMinUsedMem() { return minUsedMem; } /** * @return The maximum load average during the test. */ public double getMaxLoadAvg() { return maxLoadAvg; } /** * @return The minimum load average during the test. 
*/ public double getMinLoadAvg() { return minLoadAvg; } /** * @return Whether or not the thread is tracking JVM stats. */ public boolean trackJVMStats() { return trackJVMStats; } } /** * Turn seconds remaining into more useful units. * i.e. if there are hours or days worth of seconds, use them. */ final class RemainingFormatter { private RemainingFormatter() { // not used } public static StringBuilder format(long seconds) { StringBuilder time = new StringBuilder(); long days = TimeUnit.SECONDS.toDays(seconds); if (days > 0) { time.append(days).append(days == 1 ? " day " : " days "); seconds -= TimeUnit.DAYS.toSeconds(days); } long hours = TimeUnit.SECONDS.toHours(seconds); if (hours > 0) { time.append(hours).append(hours == 1 ? " hour " : " hours "); seconds -= TimeUnit.HOURS.toSeconds(hours); } /* Only include minute granularity if we're < 1 day. */ if (days < 1) { long minutes = TimeUnit.SECONDS.toMinutes(seconds); if (minutes > 0) { time.append(minutes).append(minutes == 1 ? " minute " : " minutes "); seconds -= TimeUnit.MINUTES.toSeconds(seconds); } } /* Only bother to include seconds if we're < 1 minute */ if (time.length() == 0) { time.append(seconds).append(time.length() == 1 ? " second " : " seconds "); } return time; } } /** * A thread for executing transactions or data inserts to the database. */ class ClientThread implements Runnable { // Counts down each of the clients completing. private final CountDownLatch completeLatch; private static boolean spinSleep; private AsyncDB db; private boolean dotransactions; private Workload workload; private int opcount; private double targetOpsPerMs; private int opsdone; private int threadid; private int threadcount; private Object workloadstate; private Properties props; private long targetOpsTickNs; private final Measurements measurements; /** * Constructor. 
* * @param db the DB implementation to use * @param dotransactions true to do transactions, false to insert data * @param workload the workload to use * @param props the properties defining the experiment * @param opcount the number of operations (transactions or inserts) to do * @param targetperthreadperms target number of operations per thread per ms * @param completeLatch The latch tracking the completion of all clients. */ public ClientThread(AsyncDB db, boolean dotransactions, Workload workload, Properties props, int opcount, double targetperthreadperms, CountDownLatch completeLatch) { this.db = db; this.dotransactions = dotransactions; this.workload = workload; this.opcount = opcount; opsdone = 0; if (targetperthreadperms > 0) { targetOpsPerMs = targetperthreadperms; targetOpsTickNs = (long) (1000000 / targetOpsPerMs); } this.props = props; measurements = Measurements.getMeasurements(); spinSleep = Boolean.valueOf(this.props.getProperty("spin.sleep", "false")); this.completeLatch = completeLatch; } public void setThreadId(final int threadId) { threadid = threadId; } public void setThreadCount(final int threadCount) { threadcount = threadCount; } public int getOpsDone() { return opsdone; } @Override public void run() { try { db.init(); } catch (DBException e) { e.printStackTrace(); e.printStackTrace(System.out); return; } try { workloadstate = workload.initThread(props, threadid, threadcount); } catch (WorkloadException e) { e.printStackTrace(); e.printStackTrace(System.out); return; } //NOTE: Switching to using nanoTime and parkNanos for time management here such that the measurements // and the client thread have the same view on time. 
//spread the thread operations out so they don't all hit the DB at the same time // GH issue 4 - throws exception if _target>1 because random.nextInt argument must be >0 // and the sleep() doesn't make sense for granularities < 1 ms anyway if ((targetOpsPerMs > 0) && (targetOpsPerMs <= 1.0)) { long randomMinorDelay = ThreadLocalRandom.current().nextInt((int) targetOpsTickNs); sleepUntil(System.nanoTime() + randomMinorDelay); } try { if (dotransactions) { long startTimeNanos = System.nanoTime(); while (((opcount == 0) || (opsdone < opcount)) && !workload.isStopRequested()) { if (!workload.doTransaction(db, workloadstate)) { break; } opsdone++; throttleNanos(startTimeNanos); } } else { long startTimeNanos = System.nanoTime(); while (((opcount == 0) || (opsdone < opcount)) && !workload.isStopRequested()) { if (!workload.doInsert(db, workloadstate)) { break; } opsdone++; throttleNanos(startTimeNanos); } } } catch (Exception e) { e.printStackTrace(); e.printStackTrace(System.out); System.exit(0); } try { + workload.threadCleanup(workloadstate); measurements.setIntendedStartTimeNs(0); db.cleanup(); } catch (DBException e) { e.printStackTrace(); e.printStackTrace(System.out); } finally { completeLatch.countDown(); } } private static void sleepUntil(long deadline) { while (System.nanoTime() < deadline) { if (!spinSleep) { LockSupport.parkNanos(deadline - System.nanoTime()); } } } private void throttleNanos(long startTimeNanos) { //throttle the operations if (targetOpsPerMs > 0) { // delay until next tick long deadline = startTimeNanos + opsdone * targetOpsTickNs; sleepUntil(deadline); measurements.setIntendedStartTimeNs(deadline); } } /** * The total amount of work this thread is still expected to do. */ int getOpsTodo() { int todo = opcount - opsdone; return todo < 0 ? 0 : todo; } } /** * Main class for executing YCSB. 
*/ public final class Client { private Client() { //not used } public static final String DEFAULT_RECORD_COUNT = "0"; /** * The target number of operations to perform. */ public static final String OPERATION_COUNT_PROPERTY = "operationcount"; /** * The number of records to load into the database initially. */ public static final String RECORD_COUNT_PROPERTY = "recordcount"; /** * The workload class to be loaded. */ public static final String WORKLOAD_PROPERTY = "workload"; /** * The database class to be used. */ public static final String DB_PROPERTY = "db"; /** * The exporter class to be used. The default is * com.yahoo.ycsb.measurements.exporter.TextMeasurementsExporter. */ public static final String EXPORTER_PROPERTY = "exporter"; /** * If set to the path of a file, YCSB will write all output to this file * instead of STDOUT. */ public static final String EXPORT_FILE_PROPERTY = "exportfile"; /** * The number of YCSB client threads to run. */ public static final String THREAD_COUNT_PROPERTY = "threadcount"; /** * Indicates how many inserts to do if less than recordcount. * Useful for partitioning the load among multiple servers if the client is the bottleneck. * Additionally workloads should support the "insertstart" property which tells them which record to start at. */ public static final String INSERT_COUNT_PROPERTY = "insertcount"; /** * Target number of operations per second. */ public static final String TARGET_PROPERTY = "target"; /** * The maximum amount of time (in seconds) for which the benchmark will be run. */ public static final String MAX_EXECUTION_TIME = "maxexecutiontime"; /** * Whether or not this is the transaction phase (run) or not (load). */ public static final String DO_TRANSACTIONS_PROPERTY = "dotransactions"; /** * Whether or not to show status during run. */ public static final String STATUS_PROPERTY = "status"; /** * Use label for status (e.g. to label one experiment out of a whole batch). 
*/ public static final String LABEL_PROPERTY = "label"; /** * An optional thread used to track progress and measure JVM stats. */ private static StatusThread statusthread = null; // HTrace integration related constants. /** * All keys for configuring the tracing system start with this prefix. */ private static final String HTRACE_KEY_PREFIX = "htrace."; private static final String CLIENT_WORKLOAD_INIT_SPAN = "Client#workload_init"; private static final String CLIENT_INIT_SPAN = "Client#init"; private static final String CLIENT_WORKLOAD_SPAN = "Client#workload"; private static final String CLIENT_CLEANUP_SPAN = "Client#cleanup"; private static final String CLIENT_EXPORT_MEASUREMENTS_SPAN = "Client#export_measurements"; public static void usageMessage() { System.out.println("Usage: java com.yahoo.ycsb.Client [options]"); System.out.println("Options:"); System.out.println(" -threads n: execute using n threads (default: 1) - can also be specified as the \n" + " \"threadcount\" property using -p"); System.out.println(" -target n: attempt to do n operations per second (default: unlimited) - can also\n" + " be specified as the \"target\" property using -p"); System.out.println(" -load: run the loading phase of the workload"); System.out.println(" -t: run the transactions phase of the workload (default)"); System.out.println(" -db dbname: specify the name of the DB to use (default: com.yahoo.ycsb.BasicDB) - \n" + " can also be specified as the \"db\" property using -p"); System.out.println(" -P propertyfile: load properties from the given file. 
Multiple files can"); System.out.println(" be specified, and will be processed in the order specified"); System.out.println(" -p name=value: specify a property to be passed to the DB and workloads;"); System.out.println(" multiple properties can be specified, and override any"); System.out.println(" values in the propertyfile"); System.out.println(" -s: show status during run (default: no status)"); System.out.println(" -l label: use label for status (e.g. to label one experiment out of a whole batch)"); System.out.println(""); System.out.println("Required properties:"); System.out.println(" " + WORKLOAD_PROPERTY + ": the name of the workload class to use (e.g. " + "com.yahoo.ycsb.workloads.CoreWorkload)"); System.out.println(""); System.out.println("To run the transaction phase from multiple servers, start a separate client on each."); System.out.println("To run the load phase from multiple servers, start a separate client on each; additionally,"); System.out.println("use the \"insertcount\" and \"insertstart\" properties to divide up the records " + "to be inserted"); } public static boolean checkRequiredProperties(Properties props) { if (props.getProperty(WORKLOAD_PROPERTY) == null) { System.out.println("Missing property: " + WORKLOAD_PROPERTY); return false; } return true; } /** * Exports the measurements to either sysout or a file using the exporter * loaded from conf. * * @throws IOException Either failed to write to output stream or failed to close it. 
*/ private static void exportMeasurements(Properties props, int opcount, long runtime) throws IOException { MeasurementsExporter exporter = null; try { // if no destination file is provided the results will be written to stdout OutputStream out; String exportFile = props.getProperty(EXPORT_FILE_PROPERTY); if (exportFile == null) { out = System.out; } else { out = new FileOutputStream(exportFile); } // if no exporter is provided the default text one will be used String exporterStr = props.getProperty(EXPORTER_PROPERTY, "com.yahoo.ycsb.measurements.exporter.TextMeasurementsExporter"); try { exporter = (MeasurementsExporter) Class.forName(exporterStr).getConstructor(OutputStream.class) .newInstance(out); } catch (Exception e) { System.err.println("Could not find exporter " + exporterStr + ", will use default text reporter."); e.printStackTrace(); exporter = new TextMeasurementsExporter(out); } exporter.write("OVERALL", "RunTime(ms)", runtime); double throughput = 1000.0 * (opcount) / (runtime); exporter.write("OVERALL", "Throughput(ops/sec)", throughput); final Map gcs = Utils.getGCStatst(); long totalGCCount = 0; long totalGCTime = 0; for (final Entry entry : gcs.entrySet()) { exporter.write("TOTAL_GCS_" + entry.getKey(), "Count", entry.getValue()[0]); exporter.write("TOTAL_GC_TIME_" + entry.getKey(), "Time(ms)", entry.getValue()[1]); exporter.write("TOTAL_GC_TIME_%_" + entry.getKey(), "Time(%)", ((double) entry.getValue()[1] / runtime) * (double) 100); totalGCCount += entry.getValue()[0]; totalGCTime += entry.getValue()[1]; } exporter.write("TOTAL_GCs", "Count", totalGCCount); exporter.write("TOTAL_GC_TIME", "Time(ms)", totalGCTime); exporter.write("TOTAL_GC_TIME_%", "Time(%)", ((double) totalGCTime / runtime) * (double) 100); if (statusthread != null && statusthread.trackJVMStats()) { exporter.write("MAX_MEM_USED", "MBs", statusthread.getMaxUsedMem()); exporter.write("MIN_MEM_USED", "MBs", statusthread.getMinUsedMem()); exporter.write("MAX_THREADS", "Count", 
statusthread.getMaxThreads()); exporter.write("MIN_THREADS", "Count", statusthread.getMinThreads()); exporter.write("MAX_SYS_LOAD_AVG", "Load", statusthread.getMaxLoadAvg()); exporter.write("MIN_SYS_LOAD_AVG", "Load", statusthread.getMinLoadAvg()); } Measurements.getMeasurements().exportMeasurements(exporter); } finally { if (exporter != null) { exporter.close(); } } } @SuppressWarnings("unchecked") public static void main(String[] args) { Properties props = parseArguments(args); boolean status = Boolean.valueOf(props.getProperty(STATUS_PROPERTY, String.valueOf(false))); String label = props.getProperty(LABEL_PROPERTY, ""); long maxExecutionTime = Integer.parseInt(props.getProperty(MAX_EXECUTION_TIME, "0")); //get number of threads, target and db int threadcount = Integer.parseInt(props.getProperty(THREAD_COUNT_PROPERTY, "1")); String dbname = props.getProperty(DB_PROPERTY, "com.yahoo.ycsb.BasicDB"); int target = Integer.parseInt(props.getProperty(TARGET_PROPERTY, "0")); //compute the target throughput double targetperthreadperms = -1; if (target > 0) { double targetperthread = ((double) target) / ((double) threadcount); targetperthreadperms = targetperthread / 1000.0; } Thread warningthread = setupWarningThread(); warningthread.start(); Measurements.setProperties(props); Workload workload = getWorkload(props); final Tracer tracer = getTracer(props, workload); initWorkload(props, warningthread, workload, tracer); System.err.println("Starting test."); final CountDownLatch completeLatch = new CountDownLatch(threadcount); final List clients = initDb(dbname, props, threadcount, targetperthreadperms, workload, tracer, completeLatch); if (status) { boolean standardstatus = false; if (props.getProperty(Measurements.MEASUREMENT_TYPE_PROPERTY, "").compareTo("timeseries") == 0) { standardstatus = true; } int statusIntervalSeconds = Integer.parseInt(props.getProperty("status.interval", "10")); boolean trackJVMStats = 
props.getProperty(Measurements.MEASUREMENT_TRACK_JVM_PROPERTY, Measurements.MEASUREMENT_TRACK_JVM_PROPERTY_DEFAULT).equals("true"); statusthread = new StatusThread(completeLatch, clients, label, standardstatus, statusIntervalSeconds, trackJVMStats); statusthread.start(); } Thread terminator = null; long st; long en; int opsDone; try (final TraceScope span = tracer.newScope(CLIENT_WORKLOAD_SPAN)) { final Map threads = new HashMap<>(threadcount); for (ClientThread client : clients) { threads.put(new Thread(tracer.wrap(client, "ClientThread")), client); } st = System.currentTimeMillis(); for (Thread t : threads.keySet()) { t.start(); } if (maxExecutionTime > 0) { terminator = new TerminatorThread(maxExecutionTime, threads.keySet(), workload); terminator.start(); } opsDone = 0; for (Map.Entry entry : threads.entrySet()) { try { entry.getKey().join(); opsDone += entry.getValue().getOpsDone(); } catch (InterruptedException ignored) { // ignored } } en = System.currentTimeMillis(); } try { try (final TraceScope span = tracer.newScope(CLIENT_CLEANUP_SPAN)) { if (terminator != null && !terminator.isInterrupted()) { terminator.interrupt(); } if (status) { // wake up status thread if it's asleep statusthread.interrupt(); // at this point we assume all the monitored threads are already gone as per above join loop. 
try { statusthread.join(); } catch (InterruptedException ignored) { // ignored } } workload.cleanup(); } } catch (WorkloadException e) { e.printStackTrace(); e.printStackTrace(System.out); System.exit(0); } try { try (final TraceScope span = tracer.newScope(CLIENT_EXPORT_MEASUREMENTS_SPAN)) { exportMeasurements(props, opsDone, en - st); } } catch (IOException e) { System.err.println("Could not export measurements, error: " + e.getMessage()); e.printStackTrace(); System.exit(-1); } System.exit(0); } private static List initDb(String dbname, Properties props, int threadcount, double targetperthreadperms, Workload workload, Tracer tracer, CountDownLatch completeLatch) { boolean initFailed = false; boolean dotransactions = Boolean.valueOf(props.getProperty(DO_TRANSACTIONS_PROPERTY, String.valueOf(true))); final List clients = new ArrayList<>(threadcount); try (final TraceScope span = tracer.newScope(CLIENT_INIT_SPAN)) { int opcount; if (dotransactions) { opcount = Integer.parseInt(props.getProperty(OPERATION_COUNT_PROPERTY, "0")); } else { if (props.containsKey(INSERT_COUNT_PROPERTY)) { opcount = Integer.parseInt(props.getProperty(INSERT_COUNT_PROPERTY, "0")); } else { opcount = Integer.parseInt(props.getProperty(RECORD_COUNT_PROPERTY, DEFAULT_RECORD_COUNT)); } } for (int threadid = 0; threadid < threadcount; threadid++) { AsyncDB db; try { db = DBFactory.newDB(dbname, props, tracer); } catch (UnknownDBException e) { System.out.println("Unknown DB " + dbname); initFailed = true; break; } int threadopcount = opcount / threadcount; // ensure correct number of operations, in case opcount is not a multiple of threadcount if (threadid < opcount % threadcount) { ++threadopcount; } ClientThread t = new ClientThread(db, dotransactions, workload, props, threadopcount, targetperthreadperms, completeLatch); t.setThreadId(threadid); t.setThreadCount(threadcount); clients.add(t); } if (initFailed) { System.err.println("Error initializing datastore bindings."); System.exit(0); } } 
return clients; } private static Tracer getTracer(Properties props, Workload workload) { return new Tracer.Builder("YCSB " + workload.getClass().getSimpleName()) .conf(getHTraceConfiguration(props)) .build(); } private static void initWorkload(Properties props, Thread warningthread, Workload workload, Tracer tracer) { try { try (final TraceScope span = tracer.newScope(CLIENT_WORKLOAD_INIT_SPAN)) { workload.init(props); warningthread.interrupt(); } } catch (WorkloadException e) { e.printStackTrace(); e.printStackTrace(System.out); System.exit(0); } } private static HTraceConfiguration getHTraceConfiguration(Properties props) { final Map filteredProperties = new HashMap<>(); for (String key : props.stringPropertyNames()) { if (key.startsWith(HTRACE_KEY_PREFIX)) { filteredProperties.put(key.substring(HTRACE_KEY_PREFIX.length()), props.getProperty(key)); } } return HTraceConfiguration.fromMap(filteredProperties); } private static Thread setupWarningThread() { //show a warning message that creating the workload is taking a while //but only do so if it is taking longer than 2 seconds //(showing the message right away if the setup wasn't taking very long was confusing people) return new Thread() { @Override public void run() { try { sleep(2000); } catch (InterruptedException e) { return; } System.err.println(" (might take a few minutes for large data sets)"); } }; } private static Workload getWorkload(Properties props) { ClassLoader classLoader = Client.class.getClassLoader(); try { Properties projectProp = new Properties(); projectProp.load(classLoader.getResourceAsStream("project.properties")); System.err.println("YCSB Client " + projectProp.getProperty("version")); } catch (IOException e) { System.err.println("Unable to retrieve client version."); } System.err.println(); System.err.println("Loading workload..."); try { Class workloadclass = classLoader.loadClass(props.getProperty(WORKLOAD_PROPERTY)); return (Workload) workloadclass.newInstance(); } catch (Exception e) 
{ e.printStackTrace(); e.printStackTrace(System.out); System.exit(0); } return null; } private static Properties parseArguments(String[] args) { Properties props = new Properties(); System.err.print("Command line:"); for (String arg : args) { System.err.print(" " + arg); } System.err.println(); Properties fileprops = new Properties(); int argindex = 0; if (args.length == 0) { usageMessage(); System.out.println("At least one argument specifying a workload is required."); System.exit(0); } while (args[argindex].startsWith("-")) { if (args[argindex].compareTo("-threads") == 0) { argindex++; if (argindex >= args.length) { usageMessage(); System.out.println("Missing argument value for -threads."); System.exit(0); } int tcount = Integer.parseInt(args[argindex]); props.setProperty(THREAD_COUNT_PROPERTY, String.valueOf(tcount)); argindex++; } else if (args[argindex].compareTo("-target") == 0) { argindex++; if (argindex >= args.length) { usageMessage(); System.out.println("Missing argument value for -target."); System.exit(0); } int ttarget = Integer.parseInt(args[argindex]); props.setProperty(TARGET_PROPERTY, String.valueOf(ttarget)); argindex++; } else if (args[argindex].compareTo("-load") == 0) { props.setProperty(DO_TRANSACTIONS_PROPERTY, String.valueOf(false)); argindex++; } else if (args[argindex].compareTo("-t") == 0) { props.setProperty(DO_TRANSACTIONS_PROPERTY, String.valueOf(true)); argindex++; } else if (args[argindex].compareTo("-s") == 0) { props.setProperty(STATUS_PROPERTY, String.valueOf(true)); argindex++; } else if (args[argindex].compareTo("-db") == 0) { argindex++; if (argindex >= args.length) { usageMessage(); System.out.println("Missing argument value for -db."); System.exit(0); } props.setProperty(DB_PROPERTY, args[argindex]); argindex++; } else if (args[argindex].compareTo("-l") == 0) { argindex++; if (argindex >= args.length) { usageMessage(); System.out.println("Missing argument value for -l."); System.exit(0); } props.setProperty(LABEL_PROPERTY, 
args[argindex]); argindex++; } else if (args[argindex].compareTo("-P") == 0) { argindex++; if (argindex >= args.length) { usageMessage(); System.out.println("Missing argument value for -P."); System.exit(0); } String propfile = args[argindex]; argindex++; Properties myfileprops = new Properties(); try { myfileprops.load(new FileInputStream(propfile)); } catch (IOException e) { System.out.println("Unable to open the properties file " + propfile); System.out.println(e.getMessage()); System.exit(0); } //Issue #5 - remove call to stringPropertyNames to make compilable under Java 1.5 for (Enumeration e = myfileprops.propertyNames(); e.hasMoreElements();) { String prop = (String) e.nextElement(); fileprops.setProperty(prop, myfileprops.getProperty(prop)); } } else if (args[argindex].compareTo("-p") == 0) { argindex++; if (argindex >= args.length) { usageMessage(); System.out.println("Missing argument value for -p"); System.exit(0); } int eq = args[argindex].indexOf('='); if (eq < 0) { usageMessage(); System.out.println("Argument '-p' expected to be in key=value format (e.g., -p operationcount=99999)"); System.exit(0); } String name = args[argindex].substring(0, eq); String value = args[argindex].substring(eq + 1); props.put(name, value); argindex++; } else { usageMessage(); System.out.println("Unknown option " + args[argindex]); System.exit(0); } if (argindex >= args.length) { break; } } if (argindex != args.length) { usageMessage(); if (argindex < args.length) { System.out.println("An argument value without corresponding argument specifier (e.g., -p, -s) was found. 
" + "We expected an argument specifier and instead found " + args[argindex]); } else { System.out.println("An argument specifier without corresponding value was found at the end of the supplied " + "command line arguments."); } System.exit(0); } //overwrite file properties with properties from the command line //Issue #5 - remove call to stringPropertyNames to make compilable under Java 1.5 for (Enumeration e = props.propertyNames(); e.hasMoreElements();) { String prop = (String) e.nextElement(); fileprops.setProperty(prop, props.getProperty(prop)); } props = fileprops; if (!checkRequiredProperties(props)) { System.out.println("Failed check required properties."); System.exit(0); } return props; } } diff --git a/core/src/main/java/com/yahoo/ycsb/DBWrapper.java b/core/src/main/java/com/yahoo/ycsb/DBWrapper.java index 68e66d2d..e4278fc8 100644 --- a/core/src/main/java/com/yahoo/ycsb/DBWrapper.java +++ b/core/src/main/java/com/yahoo/ycsb/DBWrapper.java @@ -1,294 +1,300 @@ /** * Copyright (c) 2010 Yahoo! Inc., 2016-2017 YCSB contributors. All rights reserved. *

* Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb; import com.yahoo.ycsb.measurements.Measurements; import org.apache.htrace.core.TraceScope; import org.apache.htrace.core.Tracer; -import java.util.*; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.Vector; import java.util.concurrent.CompletableFuture; /** * Wrapper around a "real" DB that measures latencies and counts return codes. * Also reports latency separately between OK and failed operations. * Waits for async calls to finish during cleanup() operation. */ public class DBWrapper extends AsyncDB { private final AsyncDB db; private final Measurements measurements; private final Tracer tracer; private boolean reportLatencyForEachError = false; private Set latencyTrackedErrors = new HashSet(); private final Set> queries = new HashSet<>(); private static final String REPORT_LATENCY_FOR_EACH_ERROR_PROPERTY = "reportlatencyforeacherror"; private static final String REPORT_LATENCY_FOR_EACH_ERROR_PROPERTY_DEFAULT = "false"; private static final String LATENCY_TRACKED_ERRORS_PROPERTY = "latencytrackederrors"; private final String scopeStringCleanup; private final String scopeStringDelete; private final String scopeStringInit; private final String scopeStringInsert; private final String scopeStringRead; private final String scopeStringScan; private final String scopeStringUpdate; public DBWrapper(final AsyncDB db, final Tracer tracer) { this.db = db; measurements = Measurements.getMeasurements(); this.tracer = tracer; final String simple = db.getClass().getSimpleName(); 
scopeStringCleanup = simple + "#cleanup"; scopeStringDelete = simple + "#delete"; scopeStringInit = simple + "#init"; scopeStringInsert = simple + "#insert"; scopeStringRead = simple + "#read"; scopeStringScan = simple + "#scan"; scopeStringUpdate = simple + "#update"; } /** * Set the properties for this DB. */ public void setProperties(Properties p) { db.setProperties(p); } /** * Get the set of properties for this DB. */ public Properties getProperties() { return db.getProperties(); } /** * Initialize any state for this DB. * Called once per DB instance; there is one DB instance per client thread. */ public void init() throws DBException { try (final TraceScope span = tracer.newScope(scopeStringInit)) { db.init(); this.reportLatencyForEachError = Boolean.parseBoolean(getProperties(). getProperty(REPORT_LATENCY_FOR_EACH_ERROR_PROPERTY, REPORT_LATENCY_FOR_EACH_ERROR_PROPERTY_DEFAULT)); if (!reportLatencyForEachError) { String latencyTrackedErrorsProperty = getProperties().getProperty(LATENCY_TRACKED_ERRORS_PROPERTY, null); if (latencyTrackedErrorsProperty != null) { this.latencyTrackedErrors = new HashSet(Arrays.asList( latencyTrackedErrorsProperty.split(","))); } } System.err.println("DBWrapper: report latency for each error is " + this.reportLatencyForEachError + " and specific error codes to track" + " for latency are: " + this.latencyTrackedErrors.toString()); } } /** * Cleanup any state for this DB. * Called once per DB instance; there is one DB instance per client thread. */ public void cleanup() throws DBException { try (final TraceScope span = tracer.newScope(scopeStringCleanup)) { - // Wait for any asynchronous operations to finish. - System.err.println("DBWrapper: Waiting 100ms for all queries to finish."); + // Wait for asynchronous operations to finish. 
+ System.err.println("DBWrapper: Waiting for all queries to finish."); CompletableFuture.allOf(queries.toArray(new CompletableFuture[0])).join(); long ist = measurements.getIntendedStartTimeNs(); long st = System.nanoTime(); db.cleanup(); long en = System.nanoTime(); measure("CLEANUP", Status.OK, ist, st, en); } } /** * Read a record from the database. Each field/value pair from the result * will be stored in a HashMap. * * @param table The name of the table * @param key The record key of the record to read. * @param fields The list of fields to read, or null for all of them * @param result A HashMap of field/value pairs for the result * @return The result of the operation. */ public CompletableFuture read(String table, String key, Set fields, Map result) { try (final TraceScope span = tracer.newScope(scopeStringRead)) { long ist = measurements.getIntendedStartTimeNs(); long st = System.nanoTime(); CompletableFuture readFuture = db.read(table, key, fields, result).whenComplete((res, ex) -> { if (ex != null) { System.err.println("READ failed due to exception: " + ex); res = Status.ERROR; } long en = System.nanoTime(); measure("READ", res, ist, st, en); measurements.reportStatus("READ", res); }); queries.add(readFuture); return readFuture; } } /** * Perform a range scan for a set of records in the database. * Each field/value pair from the result will be stored in a HashMap. * * @param table The name of the table * @param startkey The record key of the first record to read. * @param recordcount The number of records to read * @param fields The list of fields to read, or null for all of them * @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record * @return The result of the operation. 
*/ public CompletableFuture scan(String table, String startkey, int recordcount, Set fields, Vector> result) { try (final TraceScope span = tracer.newScope(scopeStringScan)) { long ist = measurements.getIntendedStartTimeNs(); long st = System.nanoTime(); CompletableFuture scanFuture = db.scan(table, startkey, recordcount, fields, result) .whenComplete((res, ex) -> { if (ex != null) { System.err.println("SCAN failed due to exception: " + ex); res = Status.ERROR; } long en = System.nanoTime(); measure("SCAN", res, ist, st, en); measurements.reportStatus("SCAN", res); }); queries.add(scanFuture); return scanFuture; } } private void measure(String op, Status result, long intendedStartTimeNanos, long startTimeNanos, long endTimeNanos) { String measurementName = op; if (result == null || !result.isOk()) { if (this.reportLatencyForEachError || this.latencyTrackedErrors.contains(result.getName())) { measurementName = op + "-" + result.getName(); } else { measurementName = op + "-FAILED"; } } measurements.measure(measurementName, (int) ((endTimeNanos - startTimeNanos) / 1000)); measurements.measureIntended(measurementName, (int) ((endTimeNanos - intendedStartTimeNanos) / 1000)); } /** * Update a record in the database. Any field/value pairs in the specified values HashMap will be written into the * record with the specified record key, overwriting any existing values with the same field name. * * @param table The name of the table * @param key The record key of the record to write. * @param values A HashMap of field/value pairs to update in the record * @return The result of the operation. 
*/
  public CompletableFuture<Status> update(String table, String key, Map<String, ByteIterator> values) {
    try (final TraceScope span = tracer.newScope(scopeStringUpdate)) {
      final long intendedStartNs = measurements.getIntendedStartTimeNs();
      final long startNs = System.nanoTime();
      final CompletableFuture<Status> updateFuture =
          db.update(table, key, values).whenComplete((status, failure) -> {
            Status outcome = status;
            if (failure != null) {
              System.err.println("UPDATE failed due to exception: " + failure);
              outcome = Status.ERROR;
            }
            final long endNs = System.nanoTime();
            measure("UPDATE", outcome, intendedStartNs, startNs, endNs);
            measurements.reportStatus("UPDATE", outcome);
          });
      // Track the future so cleanup() can wait for it to complete.
      queries.add(updateFuture);
      return updateFuture;
    }
  }

  /**
   * Insert a record in the database. Any field/value pairs in the specified
   * values HashMap will be written into the record with the specified
   * record key.
   *
   * @param table The name of the table
   * @param key The record key of the record to insert.
   * @param values A HashMap of field/value pairs to insert in the record
   * @return The result of the operation.
   */
  public CompletableFuture<Status> insert(String table, String key, Map<String, ByteIterator> values) {
    try (final TraceScope span = tracer.newScope(scopeStringInsert)) {
      final long intendedStartNs = measurements.getIntendedStartTimeNs();
      final long startNs = System.nanoTime();
      final CompletableFuture<Status> insertFuture =
          db.insert(table, key, values).whenComplete((status, failure) -> {
            Status outcome = status;
            if (failure != null) {
              System.err.println("INSERT failed due to exception: " + failure);
              outcome = Status.ERROR;
            }
            final long endNs = System.nanoTime();
            measure("INSERT", outcome, intendedStartNs, startNs, endNs);
            measurements.reportStatus("INSERT", outcome);
          });
      queries.add(insertFuture);
      return insertFuture;
    }
  }

  /**
   * Delete a record from the database.
   *
   * @param table The name of the table
   * @param key The record key of the record to delete.
   * @return The result of the operation.
*/ public CompletableFuture delete(String table, String key) { try (final TraceScope span = tracer.newScope(scopeStringDelete)) { long ist = measurements.getIntendedStartTimeNs(); long st = System.nanoTime(); CompletableFuture deleteFuture = db.delete(table, key).whenComplete((res, ex) -> { if (ex != null) { System.err.println("DELETE failed due to exception: " + ex); res = Status.ERROR; } long en = System.nanoTime(); measure("DELETE", res, ist, st, en); measurements.reportStatus("DELETE", res); }); queries.add(deleteFuture); return deleteFuture; } } } diff --git a/core/src/main/java/com/yahoo/ycsb/Workload.java b/core/src/main/java/com/yahoo/ycsb/Workload.java index 289c2b23..b1fad69f 100644 --- a/core/src/main/java/com/yahoo/ycsb/Workload.java +++ b/core/src/main/java/com/yahoo/ycsb/Workload.java @@ -1,122 +1,125 @@ /** * Copyright (c) 2010-2016 Yahoo! Inc., 2017 YCSB contributors All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb; import java.util.concurrent.atomic.AtomicBoolean; import java.util.Properties; /** * One experiment scenario. One object of this type will * be instantiated and shared among all client threads. This class * should be constructed using a no-argument constructor, so we can * load it dynamically. Any argument-based initialization should be * done by init(). * * If you extend this class, you should support the "insertstart" property. 
This * allows the Client to proceed from multiple clients on different machines, in case * the client is the bottleneck. For example, if we want to load 1 million records from * 2 machines, the first machine should have insertstart=0 and the second insertstart=500000. Additionally, * the "insertcount" property, which is interpreted by Client, can be used to tell each instance of the * client how many inserts to do. In the example above, both clients should have insertcount=500000. */ public abstract class Workload { public static final String INSERT_START_PROPERTY = "insertstart"; public static final String INSERT_COUNT_PROPERTY = "insertcount"; public static final String INSERT_START_PROPERTY_DEFAULT = "0"; private volatile AtomicBoolean stopRequested = new AtomicBoolean(false); /** Operations available for a database. */ public enum Operation { READ, UPDATE, INSERT, SCAN, DELETE } /** * Initialize the scenario. Create any generators and other shared objects here. * Called once, in the main client thread, before any operations are started. */ public void init(Properties p) throws WorkloadException { } /** * Initialize any state for a particular client thread. Since the scenario object * will be shared among all threads, this is the place to create any state that is specific * to one thread. To be clear, this means the returned object should be created anew on each * call to initThread(); do not return the same object multiple times. * The returned object will be passed to invocations of doInsert() and doTransaction() * for this thread. There should be no side effects from this call; all state should be encapsulated * in the returned object. If you have no state to retain for this thread, return null. (But if you have * no state to retain for this thread, probably you don't need to override initThread().) * * @return false if the workload knows it is done for this thread. Client will terminate the thread. * Return true otherwise. 
Return true for workloads that rely on operationcount. For workloads that read * traces from a file, return true when there are more to do, false when you are done. */ public Object initThread(Properties p, int mythreadid, int threadcount) throws WorkloadException { return null; } /** * Cleanup the scenario. Called once, in the main client thread, after all operations have completed. */ public void cleanup() throws WorkloadException { } /** * Do one insert operation. Because it will be called concurrently from multiple client threads, this * function must be thread safe. However, avoid synchronized, or the threads will block waiting for each * other, and it will be difficult to reach the target throughput. Ideally, this function would have no side * effects other than DB operations and mutations on threadstate. Mutations to threadstate do not need to be * synchronized, since each thread has its own threadstate instance. */ public abstract boolean doInsert(AsyncDB db, Object threadstate); /** * Do one transaction operation. Because it will be called concurrently from multiple client threads, this * function must be thread safe. However, avoid synchronized, or the threads will block waiting for each * other, and it will be difficult to reach the target throughput. Ideally, this function would have no side * effects other than DB operations and mutations on threadstate. Mutations to threadstate do not need to be * synchronized, since each thread has its own threadstate instance. * * @return false if the workload knows it is done for this thread. Client will terminate the thread. * Return true otherwise. Return true for workloads that rely on operationcount. For workloads that read * traces from a file, return true when there are more to do, false when you are done. */ public abstract boolean doTransaction(AsyncDB db, Object threadstate); /** * Allows scheduling a request to stop the workload. 
*/ public void requestStop() { stopRequested.set(true); } /** * Check the status of the stop request flag. * @return true if stop was requested, false otherwise. */ public boolean isStopRequested() { return stopRequested.get(); } + + public void threadCleanup(Object threadstate) {} + } diff --git a/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java b/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java index 564d3ea3..22068400 100644 --- a/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java +++ b/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java @@ -1,913 +1,926 @@ /** * Copyright (c) 2010 Yahoo! Inc., Copyright (c) 2016-2017 YCSB contributors. All rights reserved. *

* Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb.workloads; import com.yahoo.ycsb.*; import com.yahoo.ycsb.generator.*; import com.yahoo.ycsb.generator.UniformLongGenerator; import com.yahoo.ycsb.measurements.Measurements; import java.io.IOException; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; /** * The core benchmark scenario. Represents a set of clients doing simple CRUD operations. The * relative proportion of different kinds of operations, and other properties of the workload, * are controlled by parameters specified at runtime. *

* Properties to control the client: *

*/ public class CoreWorkload extends Workload { /** * The name of the database table to run queries against. */ public static final String TABLENAME_PROPERTY = "table"; /** * The default name of the database table to run queries against. */ public static final String TABLENAME_PROPERTY_DEFAULT = "usertable"; protected String table; /** * The name of the property for the number of fields in a record. */ public static final String FIELD_COUNT_PROPERTY = "fieldcount"; /** * Default number of fields in a record. */ public static final String FIELD_COUNT_PROPERTY_DEFAULT = "10"; private List fieldnames; /** * The name of the property for the field length distribution. Options are "uniform", "zipfian" * (favouring short records), "constant", and "histogram". *

* If "uniform", "zipfian" or "constant", the maximum field length will be that specified by the * fieldlength property. If "histogram", then the histogram will be read from the filename * specified in the "fieldlengthhistogram" property. */ public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY = "fieldlengthdistribution"; /** * The default field length distribution. */ public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT = "constant"; /** * The name of the property for the length of a field in bytes. */ public static final String FIELD_LENGTH_PROPERTY = "fieldlength"; /** * The default maximum length of a field in bytes. */ public static final String FIELD_LENGTH_PROPERTY_DEFAULT = "100"; /** * The name of the property for the minimum length of a field in bytes. */ public static final String MIN_FIELD_LENGTH_PROPERTY = "minfieldlength"; /** * The default minimum length of a field in bytes. */ public static final String MIN_FIELD_LENGTH_PROPERTY_DEFAULT = "1"; /** * The name of a property that specifies the filename containing the field length histogram (only * used if fieldlengthdistribution is "histogram"). */ public static final String FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY = "fieldlengthhistogram"; /** * The default filename containing a field length histogram. */ public static final String FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY_DEFAULT = "hist.txt"; /** * Generator object that produces field lengths. The value of this depends on the properties that * start with "FIELD_LENGTH_". */ protected NumberGenerator fieldlengthgenerator; /** * The name of the property for deciding whether to read one field (false) or all fields (true) of * a record. */ public static final String READ_ALL_FIELDS_PROPERTY = "readallfields"; /** * The default value for the readallfields property. 
*/ public static final String READ_ALL_FIELDS_PROPERTY_DEFAULT = "true"; protected boolean readallfields; /** * The name of the property for deciding whether to write one field (false) or all fields (true) * of a record. */ public static final String WRITE_ALL_FIELDS_PROPERTY = "writeallfields"; /** * The default value for the writeallfields property. */ public static final String WRITE_ALL_FIELDS_PROPERTY_DEFAULT = "false"; protected boolean writeallfields; /** * The name of the property for deciding whether to check all returned * data against the formation template to ensure data integrity. */ public static final String DATA_INTEGRITY_PROPERTY = "dataintegrity"; /** * The default value for the dataintegrity property. */ public static final String DATA_INTEGRITY_PROPERTY_DEFAULT = "false"; /** * Set to true if want to check correctness of reads. Must also * be set to true during loading phase to function. */ private boolean dataintegrity; /** * The name of the property for the proportion of transactions that are reads. */ public static final String READ_PROPORTION_PROPERTY = "readproportion"; /** * The default proportion of transactions that are reads. */ public static final String READ_PROPORTION_PROPERTY_DEFAULT = "0.95"; /** * The name of the property for the proportion of transactions that are updates. */ public static final String UPDATE_PROPORTION_PROPERTY = "updateproportion"; /** * The default proportion of transactions that are updates. */ public static final String UPDATE_PROPORTION_PROPERTY_DEFAULT = "0.05"; /** * The name of the property for the proportion of transactions that are inserts. */ public static final String INSERT_PROPORTION_PROPERTY = "insertproportion"; /** * The default proportion of transactions that are inserts. */ public static final String INSERT_PROPORTION_PROPERTY_DEFAULT = "0.0"; /** * The name of the property for the proportion of transactions that are scans. 
*/ public static final String SCAN_PROPORTION_PROPERTY = "scanproportion"; /** * The default proportion of transactions that are scans. */ public static final String SCAN_PROPORTION_PROPERTY_DEFAULT = "0.0"; /** * The name of the property for the proportion of transactions that are read-modify-write. */ public static final String READMODIFYWRITE_PROPORTION_PROPERTY = "readmodifywriteproportion"; /** * The default proportion of transactions that are scans. */ public static final String READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT = "0.0"; /** * The name of the property for the the distribution of requests across the keyspace. Options are * "uniform", "zipfian" and "latest" */ public static final String REQUEST_DISTRIBUTION_PROPERTY = "requestdistribution"; /** * The default distribution of requests across the keyspace. */ public static final String REQUEST_DISTRIBUTION_PROPERTY_DEFAULT = "uniform"; /** * The name of the property for adding zero padding to record numbers in order to match * string sort order. Controls the number of 0s to left pad with. */ public static final String ZERO_PADDING_PROPERTY = "zeropadding"; /** * The default zero padding value. Matches integer sort order */ public static final String ZERO_PADDING_PROPERTY_DEFAULT = "1"; /** * The name of the property for the min scan length (number of records). */ public static final String MIN_SCAN_LENGTH_PROPERTY = "minscanlength"; /** * The default min scan length. */ public static final String MIN_SCAN_LENGTH_PROPERTY_DEFAULT = "1"; /** * The name of the property for the max scan length (number of records). */ public static final String MAX_SCAN_LENGTH_PROPERTY = "maxscanlength"; /** * The default max scan length. */ public static final String MAX_SCAN_LENGTH_PROPERTY_DEFAULT = "1000"; /** * The name of the property for the scan length distribution. 
Options are "uniform" and "zipfian" * (favoring short scans) */ public static final String SCAN_LENGTH_DISTRIBUTION_PROPERTY = "scanlengthdistribution"; /** * The default max scan length. */ public static final String SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT = "uniform"; /** * The name of the property for the order to insert records. Options are "ordered" or "hashed" */ public static final String INSERT_ORDER_PROPERTY = "insertorder"; /** * Default insert order. */ public static final String INSERT_ORDER_PROPERTY_DEFAULT = "hashed"; /** * Percentage data items that constitute the hot set. */ public static final String HOTSPOT_DATA_FRACTION = "hotspotdatafraction"; /** * Default value of the size of the hot set. */ public static final String HOTSPOT_DATA_FRACTION_DEFAULT = "0.2"; /** * Percentage operations that access the hot set. */ public static final String HOTSPOT_OPN_FRACTION = "hotspotopnfraction"; /** * Default value of the percentage operations accessing the hot set. */ public static final String HOTSPOT_OPN_FRACTION_DEFAULT = "0.8"; /** * How many times to retry when insertion of a single item to a DB fails. */ public static final String INSERTION_RETRY_LIMIT = "core_workload_insertion_retry_limit"; public static final String INSERTION_RETRY_LIMIT_DEFAULT = "0"; /** * On average, how long to wait between the retries, in seconds. 
*/ public static final String INSERTION_RETRY_INTERVAL = "core_workload_insertion_retry_interval"; public static final String INSERTION_RETRY_INTERVAL_DEFAULT = "3"; public static final String OPEN_LOOP_PROPERTY = "openloop"; public static final String OPEN_LOOP_PROPERTY_DEFAULT = "false"; protected boolean openloop; protected NumberGenerator keysequence; protected DiscreteGenerator operationchooser; protected NumberGenerator keychooser; protected NumberGenerator fieldchooser; protected AcknowledgedCounterGenerator transactioninsertkeysequence; protected NumberGenerator scanlength; protected boolean orderedinserts; protected long fieldcount; protected long recordcount; protected int zeropadding; protected int insertionRetryLimit; protected int insertionRetryInterval; private Measurements measurements = Measurements.getMeasurements(); + private Set> rmwQueries = ConcurrentHashMap.newKeySet(); + protected static NumberGenerator getFieldLengthGenerator(Properties p) throws WorkloadException { NumberGenerator fieldlengthgenerator; String fieldlengthdistribution = p.getProperty( FIELD_LENGTH_DISTRIBUTION_PROPERTY, FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT); int fieldlength = Integer.parseInt(p.getProperty(FIELD_LENGTH_PROPERTY, FIELD_LENGTH_PROPERTY_DEFAULT)); int minfieldlength = Integer.parseInt(p.getProperty(MIN_FIELD_LENGTH_PROPERTY, MIN_FIELD_LENGTH_PROPERTY_DEFAULT)); String fieldlengthhistogram = p.getProperty( FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY, FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY_DEFAULT); if (fieldlengthdistribution.compareTo("constant") == 0) { fieldlengthgenerator = new ConstantIntegerGenerator(fieldlength); } else if (fieldlengthdistribution.compareTo("uniform") == 0) { fieldlengthgenerator = new UniformLongGenerator(minfieldlength, fieldlength); } else if (fieldlengthdistribution.compareTo("zipfian") == 0) { fieldlengthgenerator = new ZipfianGenerator(minfieldlength, fieldlength); } else if (fieldlengthdistribution.compareTo("histogram") == 0) { try { 
fieldlengthgenerator = new HistogramGenerator(fieldlengthhistogram); } catch (IOException e) { throw new WorkloadException( "Couldn't read field length histogram file: " + fieldlengthhistogram, e); } } else { throw new WorkloadException( "Unknown field length distribution \"" + fieldlengthdistribution + "\""); } return fieldlengthgenerator; } /** * Initialize the scenario. * Called once, in the main client thread, before any operations are started. */ @Override public void init(Properties p) throws WorkloadException { table = p.getProperty(TABLENAME_PROPERTY, TABLENAME_PROPERTY_DEFAULT); fieldcount = Long.parseLong(p.getProperty(FIELD_COUNT_PROPERTY, FIELD_COUNT_PROPERTY_DEFAULT)); fieldnames = new ArrayList<>(); for (int i = 0; i < fieldcount; i++) { fieldnames.add("field" + i); } fieldlengthgenerator = CoreWorkload.getFieldLengthGenerator(p); recordcount = Long.parseLong(p.getProperty(Client.RECORD_COUNT_PROPERTY, Client.DEFAULT_RECORD_COUNT)); if (recordcount == 0) { recordcount = Integer.MAX_VALUE; } String requestdistrib = p.getProperty(REQUEST_DISTRIBUTION_PROPERTY, REQUEST_DISTRIBUTION_PROPERTY_DEFAULT); int minscanlength = Integer.parseInt(p.getProperty(MIN_SCAN_LENGTH_PROPERTY, MIN_SCAN_LENGTH_PROPERTY_DEFAULT)); int maxscanlength = Integer.parseInt(p.getProperty(MAX_SCAN_LENGTH_PROPERTY, MAX_SCAN_LENGTH_PROPERTY_DEFAULT)); String scanlengthdistrib = p.getProperty(SCAN_LENGTH_DISTRIBUTION_PROPERTY, SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT); long insertstart = Long.parseLong(p.getProperty(INSERT_START_PROPERTY, INSERT_START_PROPERTY_DEFAULT)); long insertcount= Integer.parseInt(p.getProperty(INSERT_COUNT_PROPERTY, String.valueOf(recordcount - insertstart))); // Confirm valid values for insertstart and insertcount in relation to recordcount if (recordcount < (insertstart + insertcount)) { System.err.println("Invalid combination of insertstart, insertcount and recordcount."); System.err.println("recordcount must be bigger than insertstart + insertcount."); 
      System.exit(-1);
    }

    // --- Tail of init(): property parsing and generator wiring (method opens above this chunk). ---

    // Number of '0' characters used to left-pad key numbers in buildKeyName().
    zeropadding = Integer.parseInt(p.getProperty(ZERO_PADDING_PROPERTY, ZERO_PADDING_PROPERTY_DEFAULT));

    readallfields = Boolean.parseBoolean(
        p.getProperty(READ_ALL_FIELDS_PROPERTY, READ_ALL_FIELDS_PROPERTY_DEFAULT));
    writeallfields = Boolean.parseBoolean(
        p.getProperty(WRITE_ALL_FIELDS_PROPERTY, WRITE_ALL_FIELDS_PROPERTY_DEFAULT));
    dataintegrity = Boolean.parseBoolean(
        p.getProperty(DATA_INTEGRITY_PROPERTY, DATA_INTEGRITY_PROPERTY_DEFAULT));

    // Data-integrity verification rebuilds the expected value from (key, field), which only
    // works when every generated field has the same, constant length.
    if (dataintegrity && !(p.getProperty(
        FIELD_LENGTH_DISTRIBUTION_PROPERTY,
        FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT)).equals("constant")) {
      System.err.println("Must have constant field size to check data integrity.");
      System.exit(-1);
    }

    // insertorder=hashed -> key numbers are scattered via Utils.hash(); otherwise keys stay ordered.
    // NOTE(review): when requestdistrib=exponential, orderedinserts is never assigned in this
    // chain and keeps its field default — confirm that is intentional.
    if (p.getProperty(INSERT_ORDER_PROPERTY, INSERT_ORDER_PROPERTY_DEFAULT).compareTo("hashed") == 0) {
      orderedinserts = false;
    } else if (requestdistrib.compareTo("exponential") == 0) {
      double percentile = Double.parseDouble(p.getProperty(
          ExponentialGenerator.EXPONENTIAL_PERCENTILE_PROPERTY,
          ExponentialGenerator.EXPONENTIAL_PERCENTILE_DEFAULT));
      double frac = Double.parseDouble(p.getProperty(
          ExponentialGenerator.EXPONENTIAL_FRAC_PROPERTY,
          ExponentialGenerator.EXPONENTIAL_FRAC_DEFAULT));
      keychooser = new ExponentialGenerator(percentile, recordcount * frac);
    } else {
      orderedinserts = true;
    }

    // Sequence that hands out key numbers during the load phase.
    keysequence = new CounterGenerator(insertstart);
    operationchooser = createOperationGenerator(p);
    // Tracks which transactional inserts have been acknowledged, so reads never target keys
    // that have not been inserted yet (see nextKeynum()).
    transactioninsertkeysequence = new AcknowledgedCounterGenerator(recordcount);

    // NOTE(review): "exponential" is only handled in the insert-order chain above and has no
    // branch in this chain, so requestdistrib=exponential falls through to the "Unknown request
    // distribution" WorkloadException below — verify against the full file / upstream fix.
    if (requestdistrib.compareTo("uniform") == 0) {
      keychooser = new UniformLongGenerator(insertstart, insertstart + insertcount - 1);
    } else if (requestdistrib.compareTo("sequential") == 0) {
      keychooser = new SequentialGenerator(insertstart, insertstart + insertcount - 1);
    } else if (requestdistrib.compareTo("zipfian") == 0) {
      // The scrambled zipfian generator picks a random "next key" in part by taking the modulus
      // over the number of keys. If the number of keys changes, this would shift the modulus, and
      // we don't want that to change which keys are popular. So we construct the generator with a
      // keyspace larger than exists at the beginning of the test: predict the number of inserts
      // and pass the number of existing keys plus the predicted new keys as the total keyspace.
      // If the generator then picks a key that hasn't been inserted yet, it is simply ignored and
      // another key is picked; the keyspace size never changes from the generator's perspective.
      final double insertproportion = Double.parseDouble(
          p.getProperty(INSERT_PROPORTION_PROPERTY, INSERT_PROPORTION_PROPERTY_DEFAULT));
      int opcount = Integer.parseInt(p.getProperty(Client.OPERATION_COUNT_PROPERTY));
      int expectednewkeys = (int) ((opcount) * insertproportion * 2.0); // 2 is fudge factor
      keychooser = new ScrambledZipfianGenerator(insertstart, insertstart + insertcount + expectednewkeys);
    } else if (requestdistrib.compareTo("latest") == 0) {
      keychooser = new SkewedLatestGenerator(transactioninsertkeysequence);
    } else if (requestdistrib.equals("hotspot")) {
      double hotsetfraction = Double.parseDouble(p.getProperty(HOTSPOT_DATA_FRACTION,
          HOTSPOT_DATA_FRACTION_DEFAULT));
      double hotopnfraction = Double.parseDouble(p.getProperty(HOTSPOT_OPN_FRACTION,
          HOTSPOT_OPN_FRACTION_DEFAULT));
      keychooser = new HotspotIntegerGenerator(insertstart, insertstart + insertcount - 1,
          hotsetfraction, hotopnfraction);
    } else {
      throw new WorkloadException("Unknown request distribution \"" + requestdistrib + "\"");
    }

    fieldchooser = new UniformLongGenerator(0, fieldcount - 1);

    // Scan length distribution: uniform or zipfian over [minscanlength, maxscanlength].
    if (scanlengthdistrib.compareTo("uniform") == 0) {
      scanlength = new UniformLongGenerator(minscanlength, maxscanlength);
    } else if (scanlengthdistrib.compareTo("zipfian") == 0) {
      scanlength = new ZipfianGenerator(minscanlength, maxscanlength);
    } else {
      throw new
          WorkloadException(
          "Distribution \"" + scanlengthdistrib + "\" not allowed for scan length");
    }

    insertionRetryLimit = Integer.parseInt(p.getProperty(
        INSERTION_RETRY_LIMIT, INSERTION_RETRY_LIMIT_DEFAULT));
    insertionRetryInterval = Integer.parseInt(p.getProperty(
        INSERTION_RETRY_INTERVAL, INSERTION_RETRY_INTERVAL_DEFAULT));
    // openloop=true lets transactions issue asynchronous DB calls without joining each future.
    openloop = Boolean.parseBoolean(
        p.getProperty(OPEN_LOOP_PROPERTY, OPEN_LOOP_PROPERTY_DEFAULT));
  }

+  /**
+   * Drains outstanding asynchronous read-modify-write futures before the thread exits, so no
+   * in-flight operation outlives the workload.
+   *
+   * NOTE(review): rmwQueries is appended to on every RMW call (see doTransactionReadModifyWrite),
+   * so this join may wait on a large backlog of already-completed futures; confirm rmwQueries is
+   * a thread-safe collection and consider clearing it after the join.
+   */
+  @Override
+  public void threadCleanup(Object threadstate) {
+    if (!rmwQueries.isEmpty()) {
+      // Wait for asynchronous operations to finish.
+      System.err.println("CoreWorkload: Waiting for all RMW queries to finish.");
+      CompletableFuture.allOf(rmwQueries.toArray(new CompletableFuture[0])).join();
+    }
+  }
+
  /**
   * Maps a key number to its string key. With unordered inserts the number is first scattered
   * with Utils.hash(); the result is "user" followed by the zero-padded number.
   */
  protected String buildKeyName(long keynum) {
    if (!orderedinserts) {
      keynum = Utils.hash(keynum);
    }
    String value = Long.toString(keynum);
    int fill = zeropadding - value.length();
    String prekey = "user";
    // NOTE(review): per-character String concatenation in a loop is O(fill^2) copying; a
    // StringBuilder (or a width format) would be cheaper. Harmless at typical padding widths.
    for (int i = 0; i < fill; i++) {
      prekey += '0';
    }
    return prekey + value;
  }

  /**
   * Builds a value for a randomly chosen field.
   * (Generic type parameters appear stripped by extraction throughout this chunk — presumably
   * HashMap&lt;String, ByteIterator&gt;; confirm against the original file.)
   */
  private HashMap buildSingleValue(String key) {
    HashMap value = new HashMap<>();
    String fieldkey = fieldnames.get(fieldchooser.nextValue().intValue());
    ByteIterator data;
    if (dataintegrity) {
      // Deterministic contents so verifyRow() can reconstruct and compare the value later.
      data = new StringByteIterator(buildDeterministicValue(key, fieldkey));
    } else {
      // fill with random data
      data = new RandomByteIterator(fieldlengthgenerator.nextValue().longValue());
    }
    value.put(fieldkey, data);
    return value;
  }

  /**
   * Builds values for all fields.
   */
  private HashMap buildValues(String key) {
    HashMap values = new HashMap<>();
    for (String fieldkey : fieldnames) {
      ByteIterator data;
      if (dataintegrity) {
        // Deterministic contents so verifyRow() can reconstruct and compare the value later.
        data = new StringByteIterator(buildDeterministicValue(key, fieldkey));
      } else {
        // fill with random data
        data = new RandomByteIterator(fieldlengthgenerator.nextValue().longValue());
      }
      values.put(fieldkey, data);
    }
    return values;
  }

  /**
   * Build a deterministic value given the key information.
*/ private String buildDeterministicValue(String key, String fieldkey) { int size = fieldlengthgenerator.nextValue().intValue(); StringBuilder sb = new StringBuilder(size); sb.append(key); sb.append(':'); sb.append(fieldkey); while (sb.length() < size) { sb.append(':'); sb.append(sb.toString().hashCode()); } sb.setLength(size); return sb.toString(); } /** * Do one insert operation. Because it will be called concurrently from multiple client threads, * this function must be thread safe. However, avoid synchronized, or the threads will block waiting * for each other, and it will be difficult to reach the target throughput. Ideally, this function would * have no side effects other than DB operations. */ @Override public boolean doInsert(AsyncDB db, Object threadstate) { int keynum = keysequence.nextValue().intValue(); String dbkey = buildKeyName(keynum); HashMap values = buildValues(dbkey); Status status; int numOfRetries = 0; do { status = db.syncView().insert(table, dbkey, values); if (null != status && status.isOk()) { break; } // Retry if configured. Without retrying, the load process will fail // even if one single insertion fails. User can optionally configure // an insertion retry limit (default is 0) to enable retry. if (++numOfRetries <= insertionRetryLimit) { System.err.println("Retrying insertion, retry count: " + numOfRetries); try { // Sleep for a random number between [0.8, 1.2)*insertionRetryInterval. int sleepTime = (int) (1000 * insertionRetryInterval * (0.8 + 0.4 * Math.random())); Thread.sleep(sleepTime); } catch (InterruptedException e) { break; } } else { System.err.println("Error inserting, not retrying any more. number of attempts: " + numOfRetries + "Insertion Retry Limit: " + insertionRetryLimit); break; } } while (true); return null != status && status.isOk(); } /** * Do one transaction operation. Because it will be called concurrently from multiple client * threads, this function must be thread safe. 
   However, avoid synchronized, or the threads will block waiting
   * for each other, and it will be difficult to reach the target throughput. Ideally, this
   * function would have no side effects other than DB operations.
   */
  @Override
  public boolean doTransaction(AsyncDB db, Object threadstate) {
    String operation = operationchooser.nextString();
    if(operation == null) {
      return false;
    }
    // Dispatch on the operation drawn from the weighted operation chooser.
    // NOTE(review): the default arm treats every non-null string other than
    // READ/UPDATE/INSERT/SCAN as READMODIFYWRITE; that matches the generator's value set
    // (see createOperationGenerator) but silently absorbs any unexpected value.
    switch (operation) {
    case "READ":
      doTransactionRead(db);
      break;
    case "UPDATE":
      doTransactionUpdate(db);
      break;
    case "INSERT":
      doTransactionInsert(db);
      break;
    case "SCAN":
      doTransactionScan(db);
      break;
    default:
      doTransactionReadModifyWrite(db);
    }
    return true;
  }

  /**
   * Verifies that the cells read back for {@code key} match the deterministic values generated
   * at write time. Results are reported in the first three buckets of the histogram under
   * the label "VERIFY".
   * Bucket 0 means the expected data was returned.
   * Bucket 1 means incorrect data was returned.
   * Bucket 2 means null data was returned when some data was expected.
   */
  protected void verifyRow(String key, HashMap cells) {
    Status verifyStatus = Status.OK;
    long startTime = System.nanoTime();
    if (!cells.isEmpty()) {
      for (Map.Entry entry : cells.entrySet()) {
        if (!entry.getValue().toString().equals(buildDeterministicValue(key, entry.getKey()))) {
          verifyStatus = Status.UNEXPECTED_STATE;
          break;
        }
      }
    } else {
      // This assumes that null data is never valid
      verifyStatus = Status.ERROR;
    }
    long endTime = System.nanoTime();
    // NOTE(review): the cast binds before the division — (int) (endTime - startTime) / 1000 —
    // so elapsed times above ~2.1 seconds overflow the int before the microsecond conversion;
    // the intended form is (int) ((endTime - startTime) / 1000).
    measurements.measure("VERIFY", (int) (endTime - startTime) / 1000);
    measurements.reportStatus("VERIFY", verifyStatus);
  }

  /**
   * Picks the next key number to operate on, re-drawing until the chosen number refers to a key
   * that has already been acknowledged as inserted (never beyond the insert sequence).
   */
  long nextKeynum() {
    long keynum;
    if (keychooser instanceof ExponentialGenerator) {
      do {
        keynum = transactioninsertkeysequence.lastValue() - keychooser.nextValue().intValue();
      } while (keynum < 0);
    } else {
      do {
        keynum = keychooser.nextValue().intValue();
      } while (keynum > transactioninsertkeysequence.lastValue());
    }
    return keynum;
  }

  /** Issues one (possibly asynchronous) read of a random key. */
  public void doTransactionRead(AsyncDB db) {
    // choose a random key
    long keynum = nextKeynum();
    String keyname = buildKeyName(keynum);
    HashSet fields = null;
    if (!readallfields) {
      //
      // read a random field
      String fieldname = fieldnames.get(fieldchooser.nextValue().intValue());
      fields = new HashSet();
      fields.add(fieldname);
    } else if (dataintegrity) {
      // pass the full field list if dataintegrity is on for verification
      fields = new HashSet(fieldnames);
    }

    HashMap cells = new HashMap();
    // Verification (when enabled) runs on the asynchronous completion; in closed-loop mode the
    // join below makes the whole call synchronous.
    CompletableFuture readFuture = db.read(table, keyname, fields, cells).thenAccept((res) -> {
      if (res.isOk() && dataintegrity) {
        verifyRow(keyname, cells);
      }
    });
    if (!openloop) {
      readFuture.join();
    }
  }

  /**
   * Issues one read-modify-write: read the row, then update it. The combined latency (read start
   * to write end) is measured under "READ-MODIFY-WRITE", or under the *-READ_FAILED /
   * *-WRITE_FAILED labels when the respective step fails.
   */
  public void doTransactionReadModifyWrite(AsyncDB db) {
    // choose a random key
    long keynum = nextKeynum();
    String keyname = buildKeyName(keynum);
    HashSet fields = null;
    if (!readallfields) {
      // read a random field
      String fieldname = fieldnames.get(fieldchooser.nextValue().intValue());
      fields = new HashSet();
      fields.add(fieldname);
    }
    HashMap values;
    if (writeallfields) {
      // new data for all the fields
      values = buildValues(keyname);
    } else {
      // update a random field
      values = buildSingleValue(keyname);
    }
    // do the transaction
    HashMap cells = new HashMap();
    long ist = measurements.getIntendedStartTimeNs();
    long st = System.nanoTime();
    String opName = "READ-MODIFY-WRITE";
    String opReadFailureName = "READ-MODIFY-WRITE-READ_FAILED";
    String opWriteFailureName = "READ-MODIFY-WRITE-WRITE_FAILED";
    CompletableFuture rFuture = db.read(table, keyname, fields, cells).thenAccept((rres) -> {
      if (!rres.isOk()) {
        // Read failed: record the elapsed time under the read-failure label only.
        long en = System.nanoTime();
        measurements.measure(opReadFailureName, (int) ((en - st) / 1000));
        measurements.measureIntended(opReadFailureName, (int) ((en - ist) / 1000));
      } else {
        // Chain the write onto the successful read; latency spans read start to write end.
        CompletableFuture wFuture = db.update(table, keyname, values).thenAccept((wres) -> {
          long en = System.nanoTime();
          if (wres.isOk() && dataintegrity) {
            verifyRow(keyname, cells);
          }
          measurements.measure(wres.isOk() ? opName : opWriteFailureName, (int) ((en - st) / 1000));
          measurements.measureIntended(wres.isOk() ?
              opName : opWriteFailureName, (int) ((en - ist) / 1000));
        });
        // NOTE(review): joining inside the read's completion callback can block a DB callback
        // thread in closed-loop mode — confirm the AsyncDB implementation tolerates this.
        if (!openloop) {
          wFuture.join();
        }
      }
    });
    if (!openloop) {
      rFuture.join();
    }
+    // NOTE(review): the future is recorded unconditionally, even in closed-loop mode where it
+    // has already been joined above, so rmwQueries grows by one entry per RMW operation for the
+    // whole run (unbounded memory, and a long threadCleanup join). Consider adding only when
+    // openloop, and confirm rmwQueries is a thread-safe collection.
+    rmwQueries.add(rFuture);
  }

  /** Issues one (possibly asynchronous) scan of random length starting at a random key. */
  public void doTransactionScan(AsyncDB db) {
    // choose a random key
    long keynum = nextKeynum();
    String startkeyname = buildKeyName(keynum);
    // choose a random scan length
    int len = scanlength.nextValue().intValue();
    HashSet fields = null;
    if (!readallfields) {
      // read a random field
      String fieldname = fieldnames.get(fieldchooser.nextValue().intValue());
      fields = new HashSet();
      fields.add(fieldname);
    }
    // (Generic parameters appear stripped by extraction here — presumably
    // Vector<HashMap<String, ByteIterator>>; confirm against the original file.)
    CompletableFuture scanFuture = db.scan(table, startkeyname, len, fields, new Vector>());
    if (!openloop) {
      scanFuture.join();
    }
  }

  /** Issues one (possibly asynchronous) update of one or all fields of a random key. */
  public void doTransactionUpdate(AsyncDB db) {
    // choose a random key
    long keynum = nextKeynum();
    String keyname = buildKeyName(keynum);
    HashMap values;
    if (writeallfields) {
      // new data for all the fields
      values = buildValues(keyname);
    } else {
      // update a random field
      values = buildSingleValue(keyname);
    }
    CompletableFuture updateFuture = db.update(table, keyname, values);
    if (!openloop) {
      updateFuture.join();
    }
  }

  /** Issues one (possibly asynchronous) insert; the key is acknowledged only on success. */
  public void doTransactionInsert(AsyncDB db) {
    // choose the next key
    long keynum = transactioninsertkeysequence.nextValue();
    String dbkey = buildKeyName(keynum);
    HashMap values = buildValues(dbkey);
    CompletableFuture insertFuture = db.insert(table, dbkey, values).thenAccept((res) -> {
      if (res.isOk()) {
        // Acknowledge only on success, so nextKeynum() never selects a missing key.
        transactioninsertkeysequence.acknowledge(keynum);
      }
    });
    if (!openloop) {
      insertFuture.join();
    }
  }

  /**
   * Creates a weighted discrete values with database operations for a workload to perform.
   * Weights/proportions are read from the properties list and defaults are used
   * when values are not configured.
   * Current operations are "READ", "UPDATE", "INSERT", "SCAN" and "READMODIFYWRITE".
   *
   * @param p The properties list to pull weights from.
   * @return A generator that can be used to determine the next operation to perform.
* @throws IllegalArgumentException if the properties object was null. */ protected static DiscreteGenerator createOperationGenerator(final Properties p) { if (p == null) { throw new IllegalArgumentException("Properties object cannot be null"); } final double readproportion = Double.parseDouble( p.getProperty(READ_PROPORTION_PROPERTY, READ_PROPORTION_PROPERTY_DEFAULT)); final double updateproportion = Double.parseDouble( p.getProperty(UPDATE_PROPORTION_PROPERTY, UPDATE_PROPORTION_PROPERTY_DEFAULT)); final double insertproportion = Double.parseDouble( p.getProperty(INSERT_PROPORTION_PROPERTY, INSERT_PROPORTION_PROPERTY_DEFAULT)); final double scanproportion = Double.parseDouble( p.getProperty(SCAN_PROPORTION_PROPERTY, SCAN_PROPORTION_PROPERTY_DEFAULT)); final double readmodifywriteproportion = Double.parseDouble(p.getProperty( READMODIFYWRITE_PROPORTION_PROPERTY, READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT)); final DiscreteGenerator operationchooser = new DiscreteGenerator(); if (readproportion > 0) { operationchooser.addValue(readproportion, "READ"); } if (updateproportion > 0) { operationchooser.addValue(updateproportion, "UPDATE"); } if (insertproportion > 0) { operationchooser.addValue(insertproportion, "INSERT"); } if (scanproportion > 0) { operationchooser.addValue(scanproportion, "SCAN"); } if (readmodifywriteproportion > 0) { operationchooser.addValue(readmodifywriteproportion, "READMODIFYWRITE"); } return operationchooser; } }