Page MenuHomec4science

Client.java
No OneTemporary

File Metadata

Created
Mon, Jun 10, 06:06

Client.java

/**
* Copyright (c) 2010 - 2016 Yahoo! Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.apache.htrace.core.Tracer;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.HTraceConfiguration;
import com.yahoo.ycsb.measurements.Measurements;
import com.yahoo.ycsb.measurements.exporter.MeasurementsExporter;
import com.yahoo.ycsb.measurements.exporter.TextMeasurementsExporter;
/**
 * A thread to periodically show the status of the experiment, to reassure you that progress is being made.
 *
 * <p>Once per reporting interval it sums the operation counters of the supplied client threads,
 * prints a status line to stderr (and to stdout when requested) and, if enabled, records JVM
 * statistics into the shared {@link Measurements}.
 *
 * @author cooperb
 */
class StatusThread extends Thread
{
  /** Counts down each of the clients completing. */
  private final CountDownLatch _completeLatch;

  /** Stores the measurements for the run. */
  private final Measurements _measurements;

  /** Whether or not to track the JVM stats per run. */
  private final boolean _trackJVMStats;

  /** The clients that are running. */
  private final List<ClientThread> _clients;

  /** Prefix placed in front of the timestamp on every status line. */
  private final String _label;

  /** If true the status line is printed to stdout in addition to stderr. */
  private final boolean _standardstatus;

  /** The interval for reporting status, in nanoseconds. */
  private long _sleeptimeNs;

  /** JVM max/mins observed by {@link #measureJVM()}. */
  private int _maxThreads;
  private int _minThreads = Integer.MAX_VALUE;
  private long _maxUsedMem;
  private long _minUsedMem = Long.MAX_VALUE;
  private double _maxLoadAvg;
  private double _minLoadAvg = Double.MAX_VALUE;

  /** GC totals seen at the previous sample, used to report per-interval deltas. */
  private long lastGCCount = 0;
  private long lastGCTime = 0;

  /**
   * Creates a new StatusThread without JVM stat tracking.
   *
   * @param completeLatch The latch that each client thread will {@link CountDownLatch#countDown()} as they complete.
   * @param clients The clients to collect metrics from.
   * @param label The label for the status.
   * @param standardstatus If true the status is printed to stdout in addition to stderr.
   * @param statusIntervalSeconds The number of seconds between status updates.
   */
  public StatusThread(CountDownLatch completeLatch, List<ClientThread> clients,
                      String label, boolean standardstatus, int statusIntervalSeconds)
  {
    this(completeLatch, clients, label, standardstatus, statusIntervalSeconds, false);
  }

  /**
   * Creates a new StatusThread.
   *
   * @param completeLatch The latch that each client thread will {@link CountDownLatch#countDown()} as they complete.
   * @param clients The clients to collect metrics from.
   * @param label The label for the status.
   * @param standardstatus If true the status is printed to stdout in addition to stderr.
   * @param statusIntervalSeconds The number of seconds between status updates.
   * @param trackJVMStats Whether or not to track JVM stats.
   */
  public StatusThread(CountDownLatch completeLatch, List<ClientThread> clients,
                      String label, boolean standardstatus, int statusIntervalSeconds,
                      boolean trackJVMStats)
  {
    _completeLatch = completeLatch;
    _clients = clients;
    _label = label;
    _standardstatus = standardstatus;
    _sleeptimeNs = TimeUnit.SECONDS.toNanos(statusIntervalSeconds);
    _measurements = Measurements.getMeasurements();
    _trackJVMStats = trackJVMStats;
  }

  /**
   * Run and periodically report status.
   *
   * <p>Loops until {@link #waitForClientsUntil(long)} reports completion (or interruption), then
   * prints one final status line.
   */
  @Override
  public void run()
  {
    final long startTimeMs = System.currentTimeMillis();
    final long startTimeNanos = System.nanoTime();
    // Deadlines are advanced by a fixed step so reporting does not drift with the time spent printing.
    long deadline = startTimeNanos + _sleeptimeNs;
    long startIntervalMs = startTimeMs;
    long lastTotalOps = 0;

    boolean alldone;

    do
    {
      long nowMs = System.currentTimeMillis();

      lastTotalOps = computeStats(startTimeMs, startIntervalMs, nowMs, lastTotalOps);

      if (_trackJVMStats) {
        measureJVM();
      }

      alldone = waitForClientsUntil(deadline);

      startIntervalMs = nowMs;
      deadline += _sleeptimeNs;
    }
    while (!alldone);

    if (_trackJVMStats) {
      measureJVM();
    }
    // Print the final stats.
    computeStats(startTimeMs, startIntervalMs, System.currentTimeMillis(), lastTotalOps);
  }

  /**
   * Computes and prints the stats.
   *
   * @param startTimeMs The start time of the test.
   * @param startIntervalMs The start time of this interval.
   * @param endIntervalMs The end time (now) for the interval.
   * @param lastTotalOps The last total operations count.
   *
   * @return The current operation count.
   */
  private long computeStats(final long startTimeMs, long startIntervalMs, long endIntervalMs,
                            long lastTotalOps) {
    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");

    long totalops = 0;
    long todoops = 0;

    // Calculate the total number of operations completed.
    for (ClientThread t : _clients)
    {
      totalops += t.getOpsDone();
      todoops += t.getOpsTodo();
    }

    long interval = endIntervalMs - startTimeMs;
    double throughput = 1000.0 * (((double) totalops) / (double) interval);
    double curthroughput = 1000.0 * (((double) (totalops - lastTotalOps)) / ((double) (endIntervalMs - startIntervalMs)));

    DecimalFormat d = new DecimalFormat("#.##");
    String label = _label + format.format(new Date());

    StringBuilder msg = new StringBuilder(label).append(" ").append(interval / 1000).append(" sec: ");
    msg.append(totalops).append(" operations; ");

    if (totalops != 0) {
      msg.append(d.format(curthroughput)).append(" current ops/sec; ");
    }
    // An estimate only makes sense once a real throughput is known. With zero elapsed time or zero
    // completed operations the throughput is NaN, 0 or infinite, which previously produced bogus
    // estimates such as "0 seconds" or "106751991167300 days 15 hours".
    if (todoops != 0 && throughput > 0 && !Double.isInfinite(throughput)) {
      long estremaining = (long) Math.ceil(todoops / throughput);
      msg.append("est completion in ").append(RemainingFormatter.format(estremaining));
    }

    msg.append(Measurements.getMeasurements().getSummary());

    System.err.println(msg);

    if (_standardstatus) {
      System.out.println(msg);
    }
    return totalops;
  }

  /**
   * Waits for all of the client to finish or the deadline to expire.
   *
   * @param deadline The current deadline, as a {@link System#nanoTime()} value.
   *
   * @return True if all of the clients completed (or this thread was interrupted).
   */
  private boolean waitForClientsUntil(long deadline) {
    boolean alldone = false;
    // nanoTime values must be compared by difference, never with '<', to stay correct on overflow.
    long remainingNs = deadline - System.nanoTime();

    while (!alldone && remainingNs > 0) {
      try {
        alldone = _completeLatch.await(remainingNs, TimeUnit.NANOSECONDS);
      }
      catch (InterruptedException ie) {
        // If we are interrupted the thread is being asked to shutdown.
        // Return true to indicate that and reset the interrupt state
        // of the thread.
        Thread.currentThread().interrupt();
        alldone = true;
      }
      remainingNs = deadline - System.nanoTime();
    }

    return alldone;
  }

  /** Executes the JVM measurements. */
  private void measureJVM() {
    final int threads = Utils.getActiveThreadCount();
    if (threads < _minThreads) {
      _minThreads = threads;
    }
    if (threads > _maxThreads) {
      _maxThreads = threads;
    }
    _measurements.measure("THREAD_COUNT", threads);

    // TODO - once measurements allow for other number types, switch to using
    // the raw bytes. Otherwise we can track in MB to avoid negative values
    // when faced with huge heaps.
    final int usedMem = Utils.getUsedMemoryMegaBytes();
    if (usedMem < _minUsedMem) {
      _minUsedMem = usedMem;
    }
    if (usedMem > _maxUsedMem) {
      _maxUsedMem = usedMem;
    }
    _measurements.measure("USED_MEM_MB", usedMem);

    // Some JVMs may not implement this feature so if the value is less than
    // zero, just omit it.
    final double systemLoad = Utils.getSystemLoadAverage();
    if (systemLoad >= 0) {
      // TODO - store the double if measurements allows for them
      _measurements.measure("SYS_LOAD_AVG", (int) systemLoad);
      if (systemLoad > _maxLoadAvg) {
        _maxLoadAvg = systemLoad;
      }
      if (systemLoad < _minLoadAvg) {
        _minLoadAvg = systemLoad;
      }
    }

    // GC figures are reported as the delta since the previous sample.
    final long gcs = Utils.getGCTotalCollectionCount();
    _measurements.measure("GCS", (int) (gcs - lastGCCount));
    final long gcTime = Utils.getGCTotalTime();
    _measurements.measure("GCS_TIME", (int) (gcTime - lastGCTime));
    lastGCCount = gcs;
    lastGCTime = gcTime;
  }

  /** @return The maximum threads running during the test. */
  public int getMaxThreads() {
    return _maxThreads;
  }

  /** @return The minimum threads running during the test. */
  public int getMinThreads() {
    return _minThreads;
  }

  /** @return The maximum memory used during the test. */
  public long getMaxUsedMem() {
    return _maxUsedMem;
  }

  /** @return The minimum memory used during the test. */
  public long getMinUsedMem() {
    return _minUsedMem;
  }

  /** @return The maximum load average during the test. */
  public double getMaxLoadAvg() {
    return _maxLoadAvg;
  }

  /** @return The minimum load average during the test. */
  public double getMinLoadAvg() {
    return _minLoadAvg;
  }

  /** @return Whether or not the thread is tracking JVM stats. */
  public boolean trackJVMStats() {
    return _trackJVMStats;
  }
}
/**
 * Turn seconds remaining into more useful units.
 * i.e. if there are hours or days worth of seconds, use them.
 *
 * <p>Output granularity is deliberately coarse: minutes are only shown when the total is under a
 * day, and seconds are only shown when no larger unit was emitted.
 */
class RemainingFormatter {
  /**
   * Formats a duration given in seconds as a short human-readable string.
   *
   * @param seconds the number of seconds remaining
   * @return a builder holding the formatted text; every emitted unit is followed by a space
   *         (e.g. {@code "1 hours 1 minutes "})
   */
  public static StringBuilder format(long seconds) {
    StringBuilder time = new StringBuilder();

    long days = TimeUnit.SECONDS.toDays(seconds);
    if (days > 0) {
      time.append(days).append(" days ");
      seconds -= TimeUnit.DAYS.toSeconds(days);
    }

    long hours = TimeUnit.SECONDS.toHours(seconds);
    if (hours > 0) {
      time.append(hours).append(" hours ");
      seconds -= TimeUnit.HOURS.toSeconds(hours);
    }

    /* Only include minute granularity if we're < 1 day. */
    if (days < 1) {
      long minutes = TimeUnit.SECONDS.toMinutes(seconds);
      if (minutes > 0) {
        time.append(minutes).append(" minutes ");
        // Subtract the whole minutes just emitted (the original converted the wrong variable).
        seconds -= TimeUnit.MINUTES.toSeconds(minutes);
      }
    }

    /* Only bother to include seconds if we're < 1 minute */
    if (time.length() == 0) {
      time.append(seconds).append(" seconds ");
    }
    return time;
  }
}
/**
 * A thread for executing transactions or data inserts to the database.
 *
 * <p>Each instance initialises its DB binding, runs operations until its quota is reached (or the
 * workload asks to stop), cleans the binding up, and counts down the shared completion latch.
 *
 * @author cooperb
 *
 */
class ClientThread implements Runnable
{
  /** Counts down each of the clients completing. */
  private final CountDownLatch _completeLatch;

  /**
   * If true {@link #sleepUntil(long)} busy-waits instead of parking.
   * NOTE(review): static but (re)assigned by every constructor call, so the last instance
   * constructed decides the value for all threads - confirm this is intended.
   */
  private static boolean _spinSleep;

  DB _db;
  boolean _dotransactions;
  Workload _workload;

  /** Number of operations this thread should perform; 0 means no limit. */
  int _opcount;

  /** Target rate in operations per millisecond; values <= 0 disable throttling. */
  double _targetOpsPerMs;

  /**
   * Operations completed so far.
   * NOTE(review): written by this thread and read through {@link #getOpsDone()} without
   * synchronization, so other threads may observe a slightly stale value.
   */
  int _opsdone;

  /** Passed to {@link Workload#initThread}; not assigned anywhere in this class. */
  int _threadid;

  /** Passed to {@link Workload#initThread}; not assigned anywhere in this class. */
  int _threadcount;

  /** Opaque per-thread state returned by {@link Workload#initThread}. */
  Object _workloadstate;
  Properties _props;

  /** Nanoseconds between consecutive operations when throttling (1000000 / _targetOpsPerMs). */
  long _targetOpsTickNs;
  final Measurements _measurements;

  /**
   * Constructor.
   *
   * @param db the DB implementation to use
   * @param dotransactions true to do transactions, false to insert data
   * @param workload the workload to use
   * @param props the properties defining the experiment
   * @param opcount the number of operations (transactions or inserts) to do
   * @param targetperthreadperms target number of operations per thread per ms
   * @param completeLatch The latch tracking the completion of all clients.
   */
  public ClientThread(DB db, boolean dotransactions, Workload workload, Properties props, int opcount,
                      double targetperthreadperms, CountDownLatch completeLatch)
  {
    _db = db;
    _dotransactions = dotransactions;
    _workload = workload;
    _opcount = opcount;
    _opsdone = 0;
    if (targetperthreadperms > 0) {
      _targetOpsPerMs = targetperthreadperms;
      _targetOpsTickNs = (long) (1000000 / _targetOpsPerMs);
    }
    _props = props;
    _measurements = Measurements.getMeasurements();
    _spinSleep = Boolean.valueOf(_props.getProperty("spin.sleep", "false"));
    _completeLatch = completeLatch;
  }

  /** @return the number of operations this thread has completed so far */
  public int getOpsDone()
  {
    return _opsdone;
  }

  /**
   * Initialises the DB and workload state, runs the operation loop and cleans up.
   *
   * <p>The completion latch is counted down on every exit path, including the early returns taken
   * when initialisation fails, so that anything awaiting the latch is not left waiting for a
   * thread that has already given up.
   */
  @Override
  public void run()
  {
    try
    {
      try
      {
        _db.init();
      }
      catch (DBException e)
      {
        e.printStackTrace();
        e.printStackTrace(System.out);
        return;
      }

      try
      {
        _workloadstate = _workload.initThread(_props, _threadid, _threadcount);
      }
      catch (WorkloadException e)
      {
        e.printStackTrace();
        e.printStackTrace(System.out);
        return;
      }

      //NOTE: Switching to using nanoTime and parkNanos for time management here such that the measurements
      // and the client thread have the same view on time.

      //spread the thread operations out so they don't all hit the DB at the same time
      // GH issue 4 - throws exception if _target>1 because random.nextInt argument must be >0
      // and the sleep() doesn't make sense for granularities < 1 ms anyway
      if ((_targetOpsPerMs > 0) && (_targetOpsPerMs <= 1.0))
      {
        // The tick can exceed Integer.MAX_VALUE for very low targets; clamp it so the narrowing
        // cast cannot produce a zero or negative bound for nextInt.
        int maxDelayNs = (int) Math.min(_targetOpsTickNs, (long) Integer.MAX_VALUE);
        long randomMinorDelay = Utils.random().nextInt(maxDelayNs);
        sleepUntil(System.nanoTime() + randomMinorDelay);
      }

      try
      {
        if (_dotransactions)
        {
          long startTimeNanos = System.nanoTime();

          while (((_opcount == 0) || (_opsdone < _opcount)) && !_workload.isStopRequested())
          {
            if (!_workload.doTransaction(_db, _workloadstate))
            {
              break;
            }

            _opsdone++;

            throttleNanos(startTimeNanos);
          }
        }
        else
        {
          long startTimeNanos = System.nanoTime();

          while (((_opcount == 0) || (_opsdone < _opcount)) && !_workload.isStopRequested())
          {
            if (!_workload.doInsert(_db, _workloadstate))
            {
              break;
            }

            _opsdone++;

            throttleNanos(startTimeNanos);
          }
        }
      }
      catch (Exception e)
      {
        e.printStackTrace();
        e.printStackTrace(System.out);
        // NOTE(review): terminates the whole JVM with a success status on failure - confirm intended.
        System.exit(0);
      }

      try
      {
        _measurements.setIntendedStartTimeNs(0);
        _db.cleanup();
      }
      catch (DBException e)
      {
        e.printStackTrace();
        e.printStackTrace(System.out);
      }
    }
    finally
    {
      _completeLatch.countDown();
    }
  }

  /**
   * Blocks the calling thread until {@link System#nanoTime()} reaches {@code deadline}.
   * Parks for the remaining time unless spin-sleep is enabled, in which case it busy-waits.
   *
   * @param deadline the {@link System#nanoTime()} value to wait for
   */
  static void sleepUntil(long deadline) {
    long remainingNs;
    // nanoTime values must be compared by difference, never with '<', to stay correct on overflow.
    while ((remainingNs = deadline - System.nanoTime()) > 0) {
      if (!_spinSleep) {
        LockSupport.parkNanos(remainingNs);
      }
    }
  }

  /**
   * Delays the caller until the next scheduled operation slot when a target rate is set, then
   * records that slot as the intended start time of the next operation.
   *
   * @param startTimeNanos the {@link System#nanoTime()} value at which the operation loop started
   */
  private void throttleNanos(long startTimeNanos) {
    //throttle the operations
    if (_targetOpsPerMs > 0)
    {
      // delay until next tick
      long deadline = startTimeNanos + _opsdone * _targetOpsTickNs;
      sleepUntil(deadline);
      _measurements.setIntendedStartTimeNs(deadline);
    }
  }

  /**
   * the total amount of work this thread is still expected to do
   *
   * @return the remaining operation count, never negative
   */
  public int getOpsTodo()
  {
    int todo = _opcount - _opsdone;
    return todo < 0 ? 0 : todo;
  }
}
/**
 * Main class for executing YCSB.
 *
 * <p>Parses the command line, merges property files with command-line properties, loads the
 * workload and DB bindings, runs the client threads and finally exports the measurements.
 */
public class Client
{
  public static final String DEFAULT_RECORD_COUNT = "0";

  /**
   * The target number of operations to perform.
   */
  public static final String OPERATION_COUNT_PROPERTY="operationcount";

  /**
   * The number of records to load into the database initially.
   */
  public static final String RECORD_COUNT_PROPERTY="recordcount";

  /**
   * The workload class to be loaded.
   */
  public static final String WORKLOAD_PROPERTY="workload";

  /**
   * The database class to be used.
   */
  public static final String DB_PROPERTY="db";

  /**
   * The exporter class to be used. The default is
   * com.yahoo.ycsb.measurements.exporter.TextMeasurementsExporter.
   */
  public static final String EXPORTER_PROPERTY="exporter";

  /**
   * If set to the path of a file, YCSB will write all output to this file
   * instead of STDOUT.
   */
  public static final String EXPORT_FILE_PROPERTY="exportfile";

  /**
   * The number of YCSB client threads to run.
   */
  public static final String THREAD_COUNT_PROPERTY="threadcount";

  /**
   * Indicates how many inserts to do, if less than recordcount. Useful for partitioning
   * the load among multiple servers, if the client is the bottleneck. Additionally, workloads
   * should support the "insertstart" property, which tells them which record to start at.
   */
  public static final String INSERT_COUNT_PROPERTY="insertcount";

  /**
   * Target number of operations per second
   */
  public static final String TARGET_PROPERTY="target";

  /**
   * The maximum amount of time (in seconds) for which the benchmark will be run.
   */
  public static final String MAX_EXECUTION_TIME = "maxexecutiontime";

  /**
   * Whether or not this is the transaction phase (run) or not (load).
   */
  public static final String DO_TRANSACTIONS_PROPERTY = "dotransactions";

  /** An optional thread used to track progress and measure JVM stats. */
  private static StatusThread statusthread = null;

  // HTrace integration related constants.

  /**
   * All keys for configuring the tracing system start with this prefix.
   */
  private static final String HTRACE_KEY_PREFIX="htrace.";
  private static final String CLIENT_WORKLOAD_INIT_SPAN = "Client#workload_init";
  private static final String CLIENT_INIT_SPAN = "Client#init";
  private static final String CLIENT_WORKLOAD_SPAN = "Client#workload";
  private static final String CLIENT_CLEANUP_SPAN = "Client#cleanup";
  private static final String CLIENT_EXPORT_MEASUREMENTS_SPAN = "Client#export_measurements";

  /** Prints the command-line usage summary to stdout. */
  public static void usageMessage()
  {
    System.out.println("Usage: java com.yahoo.ycsb.Client [options]");
    System.out.println("Options:");
    System.out.println(" -threads n: execute using n threads (default: 1) - can also be specified as the \n" +
        " \"threadcount\" property using -p");
    System.out.println(" -target n: attempt to do n operations per second (default: unlimited) - can also\n" +
        " be specified as the \"target\" property using -p");
    System.out.println(" -load: run the loading phase of the workload");
    System.out.println(" -t: run the transactions phase of the workload (default)");
    System.out.println(" -db dbname: specify the name of the DB to use (default: com.yahoo.ycsb.BasicDB) - \n" +
        " can also be specified as the \"db\" property using -p");
    System.out.println(" -P propertyfile: load properties from the given file. Multiple files can");
    System.out.println(" be specified, and will be processed in the order specified");
    System.out.println(" -p name=value: specify a property to be passed to the DB and workloads;");
    System.out.println(" multiple properties can be specified, and override any");
    System.out.println(" values in the propertyfile");
    System.out.println(" -s: show status during run (default: no status)");
    System.out.println(" -l label: use label for status (e.g. to label one experiment out of a whole batch)");
    System.out.println("");
    System.out.println("Required properties:");
    System.out.println(" "+WORKLOAD_PROPERTY+": the name of the workload class to use (e.g. com.yahoo.ycsb.workloads.CoreWorkload)");
    System.out.println("");
    System.out.println("To run the transaction phase from multiple servers, start a separate client on each.");
    System.out.println("To run the load phase from multiple servers, start a separate client on each; additionally,");
    System.out.println("use the \"insertcount\" and \"insertstart\" properties to divide up the records to be inserted");
  }

  /**
   * Verifies that every mandatory property is present, reporting the first missing one on stdout.
   *
   * @param props the merged properties to check
   * @return true if all required properties are set
   */
  public static boolean checkRequiredProperties(Properties props)
  {
    if (props.getProperty(WORKLOAD_PROPERTY)==null)
    {
      System.out.println("Missing property: "+WORKLOAD_PROPERTY);
      return false;
    }

    return true;
  }

  /**
   * Exports the measurements to either sysout or a file using the exporter
   * loaded from conf.
   *
   * @param props the run properties; selects the export file and exporter class
   * @param opcount the number of operations completed, used for the overall throughput
   * @param runtime the wall-clock run time in milliseconds
   * @throws IOException Either failed to write to output stream or failed to close it.
   */
  private static void exportMeasurements(Properties props, int opcount, long runtime)
      throws IOException
  {
    MeasurementsExporter exporter = null;
    try
    {
      // if no destination file is provided the results will be written to stdout
      OutputStream out;
      String exportFile = props.getProperty(EXPORT_FILE_PROPERTY);
      if (exportFile == null)
      {
        out = System.out;
      } else
      {
        out = new FileOutputStream(exportFile);
      }

      // if no exporter is provided the default text one will be used
      String exporterStr = props.getProperty(EXPORTER_PROPERTY, "com.yahoo.ycsb.measurements.exporter.TextMeasurementsExporter");
      try
      {
        exporter = (MeasurementsExporter) Class.forName(exporterStr).getConstructor(OutputStream.class).newInstance(out);
      } catch (Exception e)
      {
        System.err.println("Could not find exporter " + exporterStr
            + ", will use default text reporter.");
        e.printStackTrace();
        exporter = new TextMeasurementsExporter(out);
      }

      exporter.write("OVERALL", "RunTime(ms)", runtime);
      double throughput = 1000.0 * (opcount) / (runtime);
      exporter.write("OVERALL", "Throughput(ops/sec)", throughput);

      // Per-collector GC figures; the code reads index 0 as a count and index 1 as a time in ms.
      final Map<String, Long[]> gcs = Utils.getGCStatst();
      long totalGCCount = 0;
      long totalGCTime = 0;
      for (final Entry<String, Long[]> entry : gcs.entrySet()) {
        exporter.write("TOTAL_GCS_" + entry.getKey(), "Count", entry.getValue()[0]);
        exporter.write("TOTAL_GC_TIME_" + entry.getKey(), "Time(ms)", entry.getValue()[1]);
        exporter.write("TOTAL_GC_TIME_%_" + entry.getKey(), "Time(%)",((double)entry.getValue()[1] / runtime) * (double)100);
        totalGCCount += entry.getValue()[0];
        totalGCTime += entry.getValue()[1];
      }
      exporter.write("TOTAL_GCs", "Count", totalGCCount);

      exporter.write("TOTAL_GC_TIME", "Time(ms)", totalGCTime);
      exporter.write("TOTAL_GC_TIME_%", "Time(%)", ((double)totalGCTime / runtime) * (double)100);
      if (statusthread != null && statusthread.trackJVMStats()) {
        exporter.write("MAX_MEM_USED", "MBs", statusthread.getMaxUsedMem());
        exporter.write("MIN_MEM_USED", "MBs", statusthread.getMinUsedMem());
        exporter.write("MAX_THREADS", "Count", statusthread.getMaxThreads());
        exporter.write("MIN_THREADS", "Count", statusthread.getMinThreads());
        exporter.write("MAX_SYS_LOAD_AVG", "Load", statusthread.getMaxLoadAvg());
        exporter.write("MIN_SYS_LOAD_AVG", "Load", statusthread.getMinLoadAvg());
      }

      Measurements.getMeasurements().exportMeasurements(exporter);
    } finally
    {
      if (exporter != null)
      {
        exporter.close();
      }
    }
  }

  /**
   * Entry point: parses arguments, runs the selected phase and exports the results.
   *
   * <p>NOTE(review): every failure path except a failed export terminates with exit status 0.
   * That is preserved here because scripts may rely on it, but it should be revisited.
   *
   * @param args the command-line arguments, see {@link #usageMessage()}
   */
  @SuppressWarnings("unchecked")
  public static void main(String[] args)
  {
    String dbname;
    Properties props=new Properties();
    Properties fileprops=new Properties();
    boolean dotransactions=true;
    int threadcount=1;
    int target=0;
    boolean status=false;
    String label="";

    //parse arguments
    int argindex=0;

    if (args.length==0)
    {
      usageMessage();
      System.out.println("At least one argument specifying a workload is required.");
      System.exit(0);
    }

    while (args[argindex].startsWith("-"))
    {
      if (args[argindex].compareTo("-threads")==0)
      {
        argindex++;
        if (argindex>=args.length)
        {
          usageMessage();
          System.out.println("Missing argument value for -threads.");
          System.exit(0);
        }
        int tcount=Integer.parseInt(args[argindex]);
        props.setProperty(THREAD_COUNT_PROPERTY, String.valueOf(tcount));
        argindex++;
      }
      else if (args[argindex].compareTo("-target")==0)
      {
        argindex++;
        if (argindex>=args.length)
        {
          usageMessage();
          System.out.println("Missing argument value for -target.");
          System.exit(0);
        }
        int ttarget=Integer.parseInt(args[argindex]);
        props.setProperty(TARGET_PROPERTY, String.valueOf(ttarget));
        argindex++;
      }
      else if (args[argindex].compareTo("-load")==0)
      {
        dotransactions=false;
        argindex++;
      }
      else if (args[argindex].compareTo("-t")==0)
      {
        dotransactions=true;
        argindex++;
      }
      else if (args[argindex].compareTo("-s")==0)
      {
        status=true;
        argindex++;
      }
      else if (args[argindex].compareTo("-db")==0)
      {
        argindex++;
        if (argindex>=args.length)
        {
          usageMessage();
          System.out.println("Missing argument value for -db.");
          System.exit(0);
        }
        props.setProperty(DB_PROPERTY,args[argindex]);
        argindex++;
      }
      else if (args[argindex].compareTo("-l")==0)
      {
        argindex++;
        if (argindex>=args.length)
        {
          usageMessage();
          System.out.println("Missing argument value for -l.");
          System.exit(0);
        }
        label=args[argindex];
        argindex++;
      }
      else if (args[argindex].compareTo("-P")==0)
      {
        argindex++;
        if (argindex>=args.length)
        {
          usageMessage();
          System.out.println("Missing argument value for -P.");
          System.exit(0);
        }
        String propfile=args[argindex];
        argindex++;

        Properties myfileprops=new Properties();
        // try-with-resources so the file handle is released once the properties are loaded
        try (FileInputStream propstream = new FileInputStream(propfile))
        {
          myfileprops.load(propstream);
        }
        catch (IOException e)
        {
          System.out.println("Unable to open the properties file " + propfile);
          System.out.println(e.getMessage());
          System.exit(0);
        }

        // later -P files override earlier ones because they are copied over the same target
        for (String prop : myfileprops.stringPropertyNames())
        {
          fileprops.setProperty(prop,myfileprops.getProperty(prop));
        }
      }
      else if (args[argindex].compareTo("-p")==0)
      {
        argindex++;
        if (argindex>=args.length)
        {
          usageMessage();
          System.out.println("Missing argument value for -p");
          System.exit(0);
        }
        int eq=args[argindex].indexOf('=');
        if (eq<0)
        {
          usageMessage();
          System.out.println("Argument '-p' expected to be in key=value format (e.g., -p operationcount=99999)");
          System.exit(0);
        }

        String name=args[argindex].substring(0,eq);
        String value=args[argindex].substring(eq+1);
        props.put(name,value);
        argindex++;
      }
      else
      {
        usageMessage();
        System.out.println("Unknown option " + args[argindex]);
        System.exit(0);
      }

      if (argindex>=args.length)
      {
        break;
      }
    }

    if (argindex != args.length)
    {
      usageMessage();
      if (argindex < args.length) {
        System.out.println("An argument value without corresponding argument specifier (e.g., -p, -s) was found. "
            + "We expected an argument specifier and instead found " + args[argindex]);
      } else {
        System.out.println("An argument specifier without corresponding value was found at the end of the supplied command line arguments.");
      }
      System.exit(0);
    }

    //set up logging
    //BasicConfigurator.configure();

    //overwrite file properties with properties from the command line
    for (String prop : props.stringPropertyNames())
    {
      fileprops.setProperty(prop,props.getProperty(prop));
    }

    props=fileprops;

    if (!checkRequiredProperties(props))
    {
      System.out.println("Failed check required properties.");
      System.exit(0);
    }

    props.setProperty(DO_TRANSACTIONS_PROPERTY, String.valueOf(dotransactions));

    long maxExecutionTime = Integer.parseInt(props.getProperty(MAX_EXECUTION_TIME, "0"));

    //get number of threads, target and db
    threadcount=Integer.parseInt(props.getProperty(THREAD_COUNT_PROPERTY,"1"));
    dbname=props.getProperty(DB_PROPERTY,"com.yahoo.ycsb.BasicDB");
    target=Integer.parseInt(props.getProperty(TARGET_PROPERTY,"0"));

    //compute the target throughput
    double targetperthreadperms=-1;
    if (target>0)
    {
      double targetperthread=((double)target)/((double)threadcount);
      targetperthreadperms=targetperthread/1000.0;
    }

    // Collect the "htrace."-prefixed properties, with the prefix stripped, for the tracer.
    final Map<String, String> filteredProperties = new HashMap<>();
    for (String key : props.stringPropertyNames()) {
      if (key.startsWith(HTRACE_KEY_PREFIX)) {
        filteredProperties.put(key.substring(HTRACE_KEY_PREFIX.length()), props.getProperty(key));
      }
    }
    final HTraceConfiguration conf = HTraceConfiguration.fromMap(filteredProperties);

    //show a warning message that creating the workload is taking a while
    //but only do so if it is taking longer than 2 seconds
    //(showing the message right away if the setup wasn't taking very long was confusing people)
    Thread warningthread=new Thread()
    {
      @Override
      public void run()
      {
        try
        {
          sleep(2000);
        }
        catch (InterruptedException e)
        {
          return;
        }
        System.err.println(" (might take a few minutes for large data sets)");
      }
    };

    warningthread.start();

    //set up measurements
    Measurements.setProperties(props);

    //load the workload
    ClassLoader classLoader = Client.class.getClassLoader();

    // getResourceAsStream returns null when the resource is absent; guard against it so a
    // missing project.properties is reported instead of crashing with a NullPointerException.
    try (java.io.InputStream projectStream = classLoader.getResourceAsStream("project.properties")) {
      if (projectStream == null) {
        System.err.println("Unable to retrieve client version.");
      } else {
        Properties projectProp = new Properties();
        projectProp.load(projectStream);
        System.err.println("YCSB Client " + projectProp.getProperty("version"));
      }
    } catch (IOException e) {
      System.err.println("Unable to retrieve client version.");
    }

    System.err.print("Command line:");
    for (int i=0; i<args.length; i++)
    {
      System.err.print(" "+args[i]);
    }
    System.err.println();
    System.err.println("Loading workload...");

    Workload workload = null;

    try
    {
      Class<?> workloadclass = classLoader.loadClass(props.getProperty(WORKLOAD_PROPERTY));

      workload = (Workload)workloadclass.newInstance();
    }
    catch (Exception e)
    {
      e.printStackTrace();
      e.printStackTrace(System.out);
      System.exit(0);
    }

    final Tracer tracer = new Tracer.Builder("YCSB " + workload.getClass().getSimpleName())
        .conf(conf)
        .build();

    try
    {
      try (final TraceScope span = tracer.newScope(CLIENT_WORKLOAD_INIT_SPAN)) {
        workload.init(props);
        // workload is ready: stop the "might take a few minutes" warning from firing
        warningthread.interrupt();
      }
    }
    catch (WorkloadException e)
    {
      e.printStackTrace();
      e.printStackTrace(System.out);
      System.exit(0);
    }

    //run the workload

    System.err.println("Starting test.");
    final CountDownLatch completeLatch = new CountDownLatch(threadcount);
    final List<ClientThread> clients = new ArrayList<ClientThread>(threadcount);

    boolean initFailed = false;
    try (final TraceScope span = tracer.newScope(CLIENT_INIT_SPAN)) {

      int opcount;
      if (dotransactions)
      {
        opcount=Integer.parseInt(props.getProperty(OPERATION_COUNT_PROPERTY,"0"));
      }
      else
      {
        if (props.containsKey(INSERT_COUNT_PROPERTY))
        {
          opcount=Integer.parseInt(props.getProperty(INSERT_COUNT_PROPERTY,"0"));
        }
        else
        {
          opcount=Integer.parseInt(props.getProperty(RECORD_COUNT_PROPERTY, DEFAULT_RECORD_COUNT));
        }
      }

      for (int threadid=0; threadid<threadcount; threadid++)
      {
        DB db = null;
        try
        {
          db = DBFactory.newDB(dbname, props, tracer);
        }
        catch (UnknownDBException e)
        {
          System.out.println("Unknown DB " + dbname);
          initFailed = true;
          break;
        }

        int threadopcount = opcount / threadcount;

        // ensure correct number of operations, in case opcount is not a multiple of threadcount
        if (threadid<opcount%threadcount)
        {
          ++threadopcount;
        }

        ClientThread t=new ClientThread(db,dotransactions,workload,props,threadopcount, targetperthreadperms, completeLatch);
        // The constructor takes no thread identity, so without these assignments every thread
        // would hand 0/0 to Workload.initThread.
        t._threadid = threadid;
        t._threadcount = threadcount;

        clients.add(t);
      }
    }

    if (initFailed) {
      System.err.println("Error initializing datastore bindings.");
      System.exit(0);
    }

    if (status)
    {
      boolean standardstatus=false;
      if (props.getProperty(Measurements.MEASUREMENT_TYPE_PROPERTY,"").compareTo("timeseries")==0)
      {
        standardstatus=true;
      }
      int statusIntervalSeconds = Integer.parseInt(props.getProperty("status.interval","10"));
      boolean trackJVMStats = props.getProperty(Measurements.MEASUREMENT_TRACK_JVM_PROPERTY,
          Measurements.MEASUREMENT_TRACK_JVM_PROPERTY_DEFAULT).equals("true");
      statusthread=new StatusThread(completeLatch,clients,label,standardstatus,statusIntervalSeconds,trackJVMStats);
      statusthread.start();
    }

    Thread terminator = null;
    long st;
    long en;
    int opsDone;

    try (final TraceScope span = tracer.newScope(CLIENT_WORKLOAD_SPAN)) {

      final Map<Thread, ClientThread> threads = new HashMap<Thread, ClientThread>(threadcount);
      for (ClientThread client : clients) {
        threads.put(new Thread(tracer.wrap(client, "ClientThread")), client);
      }

      st=System.currentTimeMillis();

      for (Thread t : threads.keySet())
      {
        t.start();
      }

      if (maxExecutionTime > 0) {
        terminator = new TerminatorThread(maxExecutionTime, threads.keySet(), workload);
        terminator.start();
      }

      opsDone = 0;

      for (Map.Entry<Thread, ClientThread> entry : threads.entrySet())
      {
        try
        {
          entry.getKey().join();
          opsDone += entry.getValue().getOpsDone();
        }
        catch (InterruptedException e)
        {
          // NOTE(review): swallowed as in the original; an interrupted join leaves this
          // thread's operations out of the total.
        }
      }

      en=System.currentTimeMillis();
    }

    try
    {
      try (final TraceScope span = tracer.newScope(CLIENT_CLEANUP_SPAN)) {

        if (terminator != null && !terminator.isInterrupted()) {
          terminator.interrupt();
        }

        if (status)
        {
          // wake up status thread if it's asleep
          statusthread.interrupt();
          // at this point we assume all the monitored threads are already gone as per above join loop.
          try {
            statusthread.join();
          } catch (InterruptedException e) {
            // NOTE(review): swallowed as in the original; proceed with cleanup regardless.
          }
        }

        workload.cleanup();
      }
    }
    catch (WorkloadException e)
    {
      e.printStackTrace();
      e.printStackTrace(System.out);
      System.exit(0);
    }

    try
    {
      try (final TraceScope span = tracer.newScope(CLIENT_EXPORT_MEASUREMENTS_SPAN)) {
        exportMeasurements(props, opsDone, en - st);
      }
    } catch (IOException e)
    {
      System.err.println("Could not export measurements, error: " + e.getMessage());
      e.printStackTrace();
      System.exit(-1);
    }

    System.exit(0);
  }
}

Event Timeline