diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java b/accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java
index 79be7d91..db78e879 100644
--- a/accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java
+++ b/accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java
@@ -1,452 +1,452 @@
/**
* Copyright (c) 2011 YCSB++ project, 2014 YCSB contributors.
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db;
import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.CleanUp;
import org.apache.hadoop.io.Text;
import org.apache.zookeeper.KeeperException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.Vector;
import java.util.concurrent.TimeUnit;
/**
* Accumulo binding for YCSB.
*/
public class AccumuloClient extends DB {
private ZooKeeperInstance inst;
private Connector connector;
private String table = "";
private BatchWriter bw = null;
private Text colFam = new Text("");
private Scanner singleScanner = null; // A scanner for reads/deletes.
private Scanner scanScanner = null; // A scanner for use by scan()
private static final String PC_PRODUCER = "producer";
private static final String PC_CONSUMER = "consumer";
private String pcFlag = "";
private ZKProducerConsumer.Queue q = null;
private static Hashtable hmKeyReads = null;
private static Hashtable hmKeyNumReads = null;
private Random r = null;
@Override
public void init() throws DBException {
colFam = new Text(getProperties().getProperty("accumulo.columnFamily"));
inst = new ZooKeeperInstance(
getProperties().getProperty("accumulo.instanceName"),
getProperties().getProperty("accumulo.zooKeepers"));
try {
String principal = getProperties().getProperty("accumulo.username");
AuthenticationToken token =
new PasswordToken(getProperties().getProperty("accumulo.password"));
connector = inst.getConnector(principal, token);
} catch (AccumuloException e) {
throw new DBException(e);
} catch (AccumuloSecurityException e) {
throw new DBException(e);
}
pcFlag = getProperties().getProperty("accumulo.PC_FLAG", "none");
if (pcFlag.equals(PC_PRODUCER) || pcFlag.equals(PC_CONSUMER)) {
System.out.println("*** YCSB Client is " + pcFlag);
String address = getProperties().getProperty("accumulo.PC_SERVER");
String root = getProperties().getProperty("accumulo.PC_ROOT_IN_ZK");
System.out
.println("*** PC_INFO(server:" + address + ";root=" + root + ")");
q = new ZKProducerConsumer.Queue(address, root);
r = new Random();
}
if (pcFlag.equals(PC_CONSUMER)) {
hmKeyReads = new Hashtable();
hmKeyNumReads = new Hashtable();
try {
keyNotification(null);
} catch (KeeperException e) {
throw new DBException(e);
}
}
}
@Override
public void cleanup() throws DBException {
try {
if (bw != null) {
bw.close();
}
} catch (MutationsRejectedException e) {
throw new DBException(e);
}
CleanUp.shutdownNow();
}
/**
* Commonly repeated functionality: Before doing any operation, make sure
* we're working on the correct table. If not, open the correct one.
*
* @param t
* The table to open.
*/
public void checkTable(String t) throws TableNotFoundException {
if (!table.equals(t)) {
getTable(t);
}
}
/**
* Called when the user specifies a table that isn't the same as the existing
* table. Connect to it and if necessary, close our current connection.
*
* @param t
* The table to open.
*/
public void getTable(String t) throws TableNotFoundException {
if (bw != null) { // Close the existing writer if necessary.
try {
bw.close();
} catch (MutationsRejectedException e) {
// Couldn't spit out the mutations we wanted.
// Ignore this for now.
System.err.println("MutationsRejectedException: " + e.getMessage());
}
}
BatchWriterConfig bwc = new BatchWriterConfig();
bwc.setMaxLatency(
Long.parseLong(getProperties()
.getProperty("accumulo.batchWriterMaxLatency", "30000")),
TimeUnit.MILLISECONDS);
bwc.setMaxMemory(Long.parseLong(
getProperties().getProperty("accumulo.batchWriterSize", "100000")));
bwc.setMaxWriteThreads(Integer.parseInt(
getProperties().getProperty("accumulo.batchWriterThreads", "1")));
bw = connector.createBatchWriter(table, bwc);
// Create our scanners
singleScanner = connector.createScanner(table, Authorizations.EMPTY);
scanScanner = connector.createScanner(table, Authorizations.EMPTY);
table = t; // Store the name of the table we have open.
}
/**
* Gets a scanner from Accumulo over one row.
*
* @param row
* the row to scan
* @param fields
* the set of columns to scan
* @return an Accumulo {@link Scanner} bound to the given row and columns
*/
private Scanner getRow(Text row, Set fields) {
singleScanner.clearColumns();
singleScanner.setRange(new Range(row));
if (fields != null) {
for (String field : fields) {
singleScanner.fetchColumn(colFam, new Text(field));
}
}
return singleScanner;
}
@Override
public Status read(String t, String key, Set fields,
HashMap result) {
try {
checkTable(t);
} catch (TableNotFoundException e) {
System.err.println("Error trying to connect to Accumulo table." + e);
return Status.ERROR;
}
try {
// Pick out the results we care about.
for (Entry entry : getRow(new Text(key), null)) {
Value v = entry.getValue();
byte[] buf = v.get();
result.put(entry.getKey().getColumnQualifier().toString(),
new ByteArrayByteIterator(buf));
}
} catch (Exception e) {
System.err.println("Error trying to reading Accumulo table" + key + e);
return Status.ERROR;
}
return Status.OK;
}
@Override
public Status scan(String t, String startkey, int recordcount,
Set fields, Vector> result) {
try {
checkTable(t);
} catch (TableNotFoundException e) {
System.err.println("Error trying to connect to Accumulo table." + e);
return Status.ERROR;
}
// There doesn't appear to be a way to create a range for a given
// LENGTH. Just start and end keys. So we'll do this the hard way for
// now:
// Just make the end 'infinity' and only read as much as we need.
scanScanner.clearColumns();
scanScanner.setRange(new Range(new Text(startkey), null));
// Batch size is how many key/values to try to get per call. Here, I'm
// guessing that the number of keys in a row is equal to the number of
// fields
// we're interested in.
// We try to fetch one more so as to tell when we've run out of fields.
if (fields != null) {
// And add each of them as fields we want.
for (String field : fields) {
scanScanner.fetchColumn(colFam, new Text(field));
}
} // else - If no fields are provided, we assume one column/row.
String rowKey = "";
HashMap currentHM = null;
int count = 0;
// Begin the iteration.
for (Entry entry : scanScanner) {
// Check for a new row.
if (!rowKey.equals(entry.getKey().getRow().toString())) {
if (count++ == recordcount) { // Done reading the last row.
break;
}
rowKey = entry.getKey().getRow().toString();
if (fields != null) {
// Initial Capacity for all keys.
currentHM = new HashMap(fields.size());
} else {
// An empty result map.
currentHM = new HashMap();
}
result.add(currentHM);
}
// Now add the key to the hashmap.
Value v = entry.getValue();
byte[] buf = v.get();
currentHM.put(entry.getKey().getColumnQualifier().toString(),
new ByteArrayByteIterator(buf));
}
return Status.OK;
}
@Override
public Status update(String t, String key,
HashMap values) {
try {
checkTable(t);
} catch (TableNotFoundException e) {
System.err.println("Error trying to connect to Accumulo table." + e);
return Status.ERROR;
}
Mutation mutInsert = new Mutation(new Text(key));
for (Map.Entry entry : values.entrySet()) {
mutInsert.put(colFam, new Text(entry.getKey()),
System.currentTimeMillis(), new Value(entry.getValue().toArray()));
}
try {
bw.addMutation(mutInsert);
// Distributed YCSB co-ordination: YCSB on a client produces the key
// to
// be stored in the shared queue in ZooKeeper.
if (pcFlag.equals(PC_PRODUCER)) {
if (r.nextFloat() < 0.01) {
keyNotification(key);
}
}
} catch (MutationsRejectedException e) {
System.err.println("Error performing update.");
e.printStackTrace();
return Status.ERROR;
} catch (KeeperException e) {
System.err.println("Error notifying the Zookeeper Queue.");
e.printStackTrace();
return Status.ERROR;
}
return Status.OK;
}
@Override
public Status insert(String t, String key,
HashMap values) {
return update(t, key, values);
}
@Override
public Status delete(String t, String key) {
try {
checkTable(t);
} catch (TableNotFoundException e) {
System.err.println("Error trying to connect to Accumulo table." + e);
return Status.ERROR;
}
try {
deleteRow(new Text(key));
} catch (MutationsRejectedException e) {
System.err.println("Error performing delete.");
e.printStackTrace();
return Status.ERROR;
} catch (RuntimeException e) {
System.err.println("Error performing delete.");
e.printStackTrace();
return Status.ERROR;
}
return Status.OK;
}
// These functions are adapted from RowOperations.java:
private void deleteRow(Text row) throws MutationsRejectedException {
deleteRow(getRow(row, null));
}
/**
* Deletes a row, given a Scanner of JUST that row.
*/
private void deleteRow(Scanner scanner) throws MutationsRejectedException {
Mutation deleter = null;
// iterate through the keys
for (Entry entry : scanner) {
// create a mutation for the row
if (deleter == null) {
deleter = new Mutation(entry.getKey().getRow());
}
// the remove function adds the key with the delete flag set to true
deleter.putDelete(entry.getKey().getColumnFamily(),
entry.getKey().getColumnQualifier());
}
bw.addMutation(deleter);
}
private void keyNotification(String key) throws KeeperException {
if (pcFlag.equals(PC_PRODUCER)) {
try {
q.produce(key);
} catch (InterruptedException e) {
// Reset the interrupted state.
Thread.currentThread().interrupt();
}
} else {
// XXX: do something better to keep the loop going (while??)
for (int i = 0; i < 10000000; i++) {
try {
String strKey = q.consume();
if (!hmKeyReads.containsKey(strKey)
&& !hmKeyNumReads.containsKey(strKey)) {
hmKeyReads.put(strKey, new Long(System.currentTimeMillis()));
hmKeyNumReads.put(strKey, new Integer(1));
}
// YCSB Consumer will read the key that was fetched from the
// queue in ZooKeeper.
// (current way is kind of ugly but works, i think)
// TODO : Get table name from configuration or argument
String usertable = "usertable";
HashSet fields = new HashSet();
for (int j = 0; j < 9; j++) {
fields.add("field" + j);
}
HashMap result =
new HashMap();
read(usertable, strKey, fields, result);
// If the results are empty, the key is enqueued in
// Zookeeper
// and tried again, until the results are found.
- if (result.size() == 0) {
+ if (result.isEmpty()) {
q.produce(strKey);
int count = ((Integer) hmKeyNumReads.get(strKey)).intValue();
hmKeyNumReads.put(strKey, new Integer(count + 1));
} else {
if (((Integer) hmKeyNumReads.get(strKey)).intValue() > 1) {
long currTime = System.currentTimeMillis();
long writeTime = ((Long) hmKeyReads.get(strKey)).longValue();
System.out.println(
"Key=" + strKey + ";StartSearch=" + writeTime + ";EndSearch="
+ currTime + ";TimeLag=" + (currTime - writeTime));
}
}
} catch (InterruptedException e) {
// Reset the interrupted state.
Thread.currentThread().interrupt();
}
}
}
}
public Status presplit(String t, String[] keys)
throws TableNotFoundException, AccumuloException,
AccumuloSecurityException {
TreeSet splits = new TreeSet();
for (int i = 0; i < keys.length; i++) {
splits.add(new Text(keys[i]));
}
connector.tableOperations().addSplits(t, splits);
return Status.OK;
}
}
diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/ZKProducerConsumer.java b/accumulo/src/main/java/com/yahoo/ycsb/db/ZKProducerConsumer.java
index 2d42c79a..1b478f72 100644
--- a/accumulo/src/main/java/com/yahoo/ycsb/db/ZKProducerConsumer.java
+++ b/accumulo/src/main/java/com/yahoo/ycsb/db/ZKProducerConsumer.java
@@ -1,193 +1,193 @@
/**
* Copyright (c) 2011 YCSB++ project, 2014 YCSB contributors.
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db;
import java.io.IOException;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
/**
* Implementing the PC (Producer/Consumer) Queue in ZooKeeper.
*/
public class ZKProducerConsumer implements Watcher {
private static ZooKeeper zk = null;
private static Integer mutex;
private String root;
/**
* Constructor that takes the address of the ZK server.
*
* @param address
* The address of the ZK server.
*/
ZKProducerConsumer(String address) {
if (zk == null) {
try {
System.out.println("Starting ZK:");
zk = new ZooKeeper(address, 3000, this);
mutex = new Integer(-1);
System.out.println("Finished starting ZK: " + zk);
} catch (IOException e) {
System.out.println(e.toString());
zk = null;
}
}
// else mutex = new Integer(-1);
}
public synchronized void process(WatchedEvent event) {
synchronized (mutex) {
// System.out.println("Process: " + event.getType());
mutex.notify();
}
}
/**
* Returns the root.
*
* @return The root.
*/
protected String getRoot() {
return root;
}
/**
* Sets the root.
*
* @param r
* The root value.
*/
protected void setRoot(String r) {
this.root = r;
}
/**
* QueueElement a single queue element. No longer used.
* @deprecated No longer used.
*/
@Deprecated
public static class QueueElement {
private String key;
private long writeTime;
QueueElement(String key, long writeTime) {
this.key = key;
this.writeTime = writeTime;
}
}
/**
* Producer-Consumer queue.
*/
public static class Queue extends ZKProducerConsumer {
/**
* Constructor of producer-consumer queue.
*
* @param address
* The Zookeeper server address.
* @param name
* The name of the root element for the queue.
*/
Queue(String address, String name) {
super(address);
this.setRoot(name);
// Create ZK node name
if (zk != null) {
try {
Stat s = zk.exists(getRoot(), false);
if (s == null) {
zk.create(getRoot(), new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
System.out.println(
"Keeper exception when instantiating queue: " + e.toString());
} catch (InterruptedException e) {
System.out.println("Interrupted exception");
}
}
}
/**
* Producer calls this method to insert the key in the queue.
*
* @param key
* The key to produce (add to the queue).
* @return True if the key was added.
* @throws KeeperException
* On a failure talking to zookeeper.
* @throws InterruptedException
* If the current thread is interrupted waiting for the zookeeper
* acknowledgement.
*/
//
boolean produce(String key) throws KeeperException, InterruptedException {
byte[] value;
value = key.getBytes();
zk.create(getRoot() + "/key", value, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
return true;
}
/**
* Consumer calls this method to "wait" for the key to the available.
*
* @return The key to consumed (remove from the queue).
* @throws KeeperException
* On a failure talking to zookeeper.
* @throws InterruptedException
* If the current thread is interrupted waiting for the zookeeper
* acknowledgement.
*/
String consume() throws KeeperException, InterruptedException {
String retvalue = null;
Stat stat = null;
// Get the first element available
while (true) {
synchronized (mutex) {
List list = zk.getChildren(getRoot(), true);
- if (list.size() == 0) {
+ if (list.isEmpty()) {
System.out.println("Going to wait");
mutex.wait();
} else {
String path = getRoot() + "/" + list.get(0);
byte[] b = zk.getData(path, false, stat);
retvalue = new String(b);
zk.delete(path, -1);
return retvalue;
}
}
}
}
}
}
diff --git a/core/src/main/java/com/yahoo/ycsb/CommandLine.java b/core/src/main/java/com/yahoo/ycsb/CommandLine.java
index 02885730..80c1fc18 100644
--- a/core/src/main/java/com/yahoo/ycsb/CommandLine.java
+++ b/core/src/main/java/com/yahoo/ycsb/CommandLine.java
@@ -1,411 +1,411 @@
/**
* Copyright (c) 2010 Yahoo! Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb;
import java.util.Properties;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Enumeration;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.Vector;
import com.yahoo.ycsb.Client;
import com.yahoo.ycsb.workloads.*;
/**
* A simple command line client to a database, using the appropriate com.yahoo.ycsb.DB implementation.
*/
public class CommandLine
{
public static final String DEFAULT_DB="com.yahoo.ycsb.BasicDB";
public static void usageMessage()
{
System.out.println("YCSB Command Line Client");
System.out.println("Usage: java com.yahoo.ycsb.CommandLine [options]");
System.out.println("Options:");
System.out.println(" -P filename: Specify a property file");
System.out.println(" -p name=value: Specify a property value");
System.out.println(" -db classname: Use a specified DB class (can also set the \"db\" property)");
System.out.println(" -table tablename: Use the table name instead of the default \""+CoreWorkload.TABLENAME_PROPERTY_DEFAULT+"\"");
System.out.println();
}
public static void help()
{
System.out.println("Commands:");
System.out.println(" read key [field1 field2 ...] - Read a record");
System.out.println(" scan key recordcount [field1 field2 ...] - Scan starting at key");
System.out.println(" insert key name1=value1 [name2=value2 ...] - Insert a new record");
System.out.println(" update key name1=value1 [name2=value2 ...] - Update a record");
System.out.println(" delete key - Delete a record");
System.out.println(" table [tablename] - Get or [set] the name of the table");
System.out.println(" quit - Quit");
}
public static void main(String[] args)
{
int argindex=0;
Properties props=new Properties();
Properties fileprops=new Properties();
String table=CoreWorkload.TABLENAME_PROPERTY_DEFAULT;
while ( (argindex=args.length)
{
usageMessage();
System.exit(0);
}
props.setProperty(Client.DB_PROPERTY, args[argindex]);
argindex++;
}
else if (args[argindex].compareTo("-P")==0)
{
argindex++;
if (argindex>=args.length)
{
usageMessage();
System.exit(0);
}
String propfile=args[argindex];
argindex++;
Properties myfileprops=new Properties();
try
{
myfileprops.load(new FileInputStream(propfile));
}
catch (IOException e)
{
System.out.println(e.getMessage());
System.exit(0);
}
for (Enumeration e=myfileprops.propertyNames(); e.hasMoreElements(); )
{
String prop=(String)e.nextElement();
fileprops.setProperty(prop,myfileprops.getProperty(prop));
}
}
else if (args[argindex].compareTo("-p")==0)
{
argindex++;
if (argindex>=args.length)
{
usageMessage();
System.exit(0);
}
int eq=args[argindex].indexOf('=');
if (eq<0)
{
usageMessage();
System.exit(0);
}
String name=args[argindex].substring(0,eq);
String value=args[argindex].substring(eq+1);
props.put(name,value);
//System.out.println("["+name+"]=["+value+"]");
argindex++;
}
else if (args[argindex].compareTo("-table")==0)
{
argindex++;
if (argindex>=args.length)
{
usageMessage();
System.exit(0);
}
table=args[argindex];
argindex++;
}
else
{
System.out.println("Unknown option "+args[argindex]);
usageMessage();
System.exit(0);
}
if (argindex>=args.length)
{
break;
}
}
if (argindex!=args.length)
{
usageMessage();
System.exit(0);
}
for (Enumeration e=props.propertyNames(); e.hasMoreElements(); )
{
String prop=(String)e.nextElement();
fileprops.setProperty(prop,props.getProperty(prop));
}
props=fileprops;
System.out.println("YCSB Command Line client");
System.out.println("Type \"help\" for command line help");
System.out.println("Start with \"-help\" for usage info");
//create a DB
String dbname=props.getProperty(Client.DB_PROPERTY, DEFAULT_DB);
ClassLoader classLoader = CommandLine.class.getClassLoader();
DB db=null;
try
{
Class dbclass = classLoader.loadClass(dbname);
db=(DB)dbclass.newInstance();
}
catch (Exception e)
{
e.printStackTrace();
System.exit(0);
}
db.setProperties(props);
try
{
db.init();
}
catch (DBException e)
{
e.printStackTrace();
System.exit(0);
}
System.out.println("Connected.");
//main loop
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
for (;;)
{
//get user input
System.out.print("> ");
String input=null;
try
{
input=br.readLine();
}
catch (IOException e)
{
e.printStackTrace();
System.exit(1);
}
if (input.compareTo("")==0)
{
continue;
}
if (input.compareTo("help")==0)
{
help();
continue;
}
if (input.compareTo("quit")==0)
{
break;
}
String[] tokens=input.split(" ");
long st=System.currentTimeMillis();
//handle commands
if (tokens[0].compareTo("table")==0)
{
if (tokens.length==1)
{
System.out.println("Using table \""+table+"\"");
}
else if (tokens.length==2)
{
table=tokens[1];
System.out.println("Using table \""+table+"\"");
}
else
{
System.out.println("Error: syntax is \"table tablename\"");
}
}
else if (tokens[0].compareTo("read")==0)
{
if (tokens.length==1)
{
System.out.println("Error: syntax is \"read keyname [field1 field2 ...]\"");
}
else
{
Set fields=null;
if (tokens.length>2)
{
fields=new HashSet();
for (int i=2; i result=new HashMap();
Status ret=db.read(table,tokens[1],fields,result);
System.out.println("Return code: "+ret.getName());
for (Map.Entry ent : result.entrySet())
{
System.out.println(ent.getKey()+"="+ent.getValue());
}
}
}
else if (tokens[0].compareTo("scan")==0)
{
if (tokens.length<3)
{
System.out.println("Error: syntax is \"scan keyname scanlength [field1 field2 ...]\"");
}
else
{
Set fields=null;
if (tokens.length>3)
{
fields=new HashSet();
for (int i=3; i> results=new Vector>();
Status ret=db.scan(table,tokens[1],Integer.parseInt(tokens[2]),fields,results);
System.out.println("Result: "+ret.getName());
int record=0;
- if (results.size()==0)
+ if (results.isEmpty())
{
System.out.println("0 records");
}
else
{
System.out.println("--------------------------------");
}
for (HashMap result : results)
{
System.out.println("Record "+(record++));
for (Map.Entry ent : result.entrySet())
{
System.out.println(ent.getKey()+"="+ent.getValue());
}
System.out.println("--------------------------------");
}
}
}
else if (tokens[0].compareTo("update")==0)
{
if (tokens.length<3)
{
System.out.println("Error: syntax is \"update keyname name1=value1 [name2=value2 ...]\"");
}
else
{
HashMap values=new HashMap();
for (int i=2; i values=new HashMap();
for (int i=2; i The following options must be passed when using this database client.
*
*
*
couchbase.url=http://127.0.0.1:8091/pools The connection URL from one server.
*
couchbase.bucket=default The bucket name to use./li>
*
couchbase.password= The password of the bucket.
*
couchbase.checkFutures=true If the futures should be inspected (makes ops sync).
couchbase.json=true Use json or java serialization as target format.
*
*
* @author Michael Nitschinger
*/
public class CouchbaseClient extends DB {
public static final String URL_PROPERTY = "couchbase.url";
public static final String BUCKET_PROPERTY = "couchbase.bucket";
public static final String PASSWORD_PROPERTY = "couchbase.password";
public static final String CHECKF_PROPERTY = "couchbase.checkFutures";
public static final String PERSIST_PROPERTY = "couchbase.persistTo";
public static final String REPLICATE_PROPERTY = "couchbase.replicateTo";
public static final String JSON_PROPERTY = "couchbase.json";
protected static final ObjectMapper JSON_MAPPER = new ObjectMapper();
private com.couchbase.client.CouchbaseClient client;
private PersistTo persistTo;
private ReplicateTo replicateTo;
private boolean checkFutures;
private boolean useJson;
private final Logger log = LoggerFactory.getLogger(getClass());
@Override
public void init() throws DBException {
Properties props = getProperties();
String url = props.getProperty(URL_PROPERTY, "http://127.0.0.1:8091/pools");
String bucket = props.getProperty(BUCKET_PROPERTY, "default");
String password = props.getProperty(PASSWORD_PROPERTY, "");
checkFutures = props.getProperty(CHECKF_PROPERTY, "true").equals("true");
useJson = props.getProperty(JSON_PROPERTY, "true").equals("true");
persistTo = parsePersistTo(props.getProperty(PERSIST_PROPERTY, "0"));
replicateTo = parseReplicateTo(props.getProperty(REPLICATE_PROPERTY, "0"));
Properties systemProperties = System.getProperties();
systemProperties.put("net.spy.log.LoggerImpl", "net.spy.memcached.compat.log.SLF4JLogger");
System.setProperties(systemProperties);
try {
client = new com.couchbase.client.CouchbaseClient(
Arrays.asList(new URI(url)),
bucket,
password
);
} catch (Exception e) {
throw new DBException("Could not create CouchbaseClient object.", e);
}
}
/**
* Parse the replicate property into the correct enum.
*
* @param property the stringified property value.
* @throws DBException if parsing the property did fail.
* @return the correct enum.
*/
private ReplicateTo parseReplicateTo(final String property) throws DBException {
int value = Integer.parseInt(property);
switch (value) {
case 0: return ReplicateTo.ZERO;
case 1: return ReplicateTo.ONE;
case 2: return ReplicateTo.TWO;
case 3: return ReplicateTo.THREE;
default:
throw new DBException(REPLICATE_PROPERTY + " must be between 0 and 3");
}
}
/**
* Parse the persist property into the correct enum.
*
* @param property the stringified property value.
* @throws DBException if parsing the property did fail.
* @return the correct enum.
*/
private PersistTo parsePersistTo(final String property) throws DBException {
int value = Integer.parseInt(property);
switch (value) {
case 0: return PersistTo.ZERO;
case 1: return PersistTo.ONE;
case 2: return PersistTo.TWO;
case 3: return PersistTo.THREE;
case 4: return PersistTo.FOUR;
default:
throw new DBException(PERSIST_PROPERTY + " must be between 0 and 4");
}
}
/**
* Shutdown the client.
*/
@Override
public void cleanup() {
client.shutdown();
}
@Override
public Status read(final String table, final String key, final Set fields,
final HashMap result) {
String formattedKey = formatKey(table, key);
try {
Object loaded = client.get(formattedKey);
if (loaded == null) {
return Status.ERROR;
}
decode(loaded, fields, result);
return Status.OK;
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Could not read value for key " + formattedKey, e);
}
return Status.ERROR;
}
}
/**
* Scan is currently not implemented.
*
* @param table The name of the table
* @param startkey The record key of the first record to read.
* @param recordcount The number of records to read
* @param fields The list of fields to read, or null for all of them
* @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record
* @return Status.ERROR, because not implemented yet.
*/
@Override
public Status scan(final String table, final String startkey, final int recordcount,
final Set fields, final Vector> result) {
return Status.ERROR;
}
@Override
public Status update(final String table, final String key, final HashMap values) {
String formattedKey = formatKey(table, key);
try {
final OperationFuture future = client.replace(
formattedKey,
encode(values),
persistTo,
replicateTo
);
return checkFutureStatus(future);
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Could not update value for key " + formattedKey, e);
}
return Status.ERROR;
}
}
@Override
public Status insert(final String table, final String key, final HashMap values) {
String formattedKey = formatKey(table, key);
try {
final OperationFuture future = client.add(
formattedKey,
encode(values),
persistTo,
replicateTo
);
return checkFutureStatus(future);
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Could not insert value for key " + formattedKey, e);
}
return Status.ERROR;
}
}
@Override
public Status delete(final String table, final String key) {
String formattedKey = formatKey(table, key);
try {
final OperationFuture future = client.delete(formattedKey, persistTo, replicateTo);
return checkFutureStatus(future);
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Could not delete value for key " + formattedKey, e);
}
return Status.ERROR;
}
}
/**
* Prefix the key with the given prefix, to establish a unique namespace.
*
* @param prefix the prefix to use.
* @param key the actual key.
* @return the formatted and prefixed key.
*/
private String formatKey(final String prefix, final String key) {
return prefix + ":" + key;
}
/**
* Wrapper method that either inspects the future or not.
*
* @param future the future to potentially verify.
* @return the status of the future result.
*/
private Status checkFutureStatus(final OperationFuture> future) {
if (checkFutures) {
return future.getStatus().isSuccess() ? Status.OK : Status.ERROR;
} else {
return Status.OK;
}
}
/**
* Decode the object from server into the storable result.
*
* @param source the loaded object.
* @param fields the fields to check.
* @param dest the result passed back to the ycsb core.
*/
private void decode(final Object source, final Set fields,
final HashMap dest) {
if (useJson) {
try {
JsonNode json = JSON_MAPPER.readTree((String) source);
- boolean checkFields = fields != null && fields.size() > 0;
+ boolean checkFields = fields != null && !fields.isEmpty();
for (Iterator> jsonFields = json.fields(); jsonFields.hasNext();) {
Map.Entry jsonField = jsonFields.next();
String name = jsonField.getKey();
if (checkFields && fields.contains(name)) {
continue;
}
JsonNode jsonValue = jsonField.getValue();
if (jsonValue != null && !jsonValue.isNull()) {
dest.put(name, new StringByteIterator(jsonValue.asText()));
}
}
} catch (Exception e) {
throw new RuntimeException("Could not decode JSON");
}
} else {
HashMap converted = (HashMap) source;
for (Map.Entry entry : converted.entrySet()) {
dest.put(entry.getKey(), new StringByteIterator(entry.getValue()));
}
}
}
/**
* Encode the object for couchbase storage.
*
* @param source the source value.
* @return the storable object.
*/
private Object encode(final HashMap source) {
HashMap stringMap = StringByteIterator.getStringMap(source);
if (!useJson) {
return stringMap;
}
ObjectNode node = JSON_MAPPER.createObjectNode();
for (Map.Entry pair : stringMap.entrySet()) {
node.put(pair.getKey(), pair.getValue());
}
JsonFactory jsonFactory = new JsonFactory();
Writer writer = new StringWriter();
try {
JsonGenerator jsonGenerator = jsonFactory.createGenerator(writer);
JSON_MAPPER.writeTree(jsonGenerator, node);
} catch (Exception e) {
throw new RuntimeException("Could not encode JSON value");
}
return writer.toString();
}
}
diff --git a/memcached/src/main/java/com/yahoo/ycsb/db/MemcachedClient.java b/memcached/src/main/java/com/yahoo/ycsb/db/MemcachedClient.java
index 8a95d8fb..9ce0b93c 100644
--- a/memcached/src/main/java/com/yahoo/ycsb/db/MemcachedClient.java
+++ b/memcached/src/main/java/com/yahoo/ycsb/db/MemcachedClient.java
@@ -1,294 +1,294 @@
/**
* Copyright (c) 2014-2015 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.StringByteIterator;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.net.InetSocketAddress;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.FailureMode;
// We also use `net.spy.memcached.MemcachedClient`; it is not imported
// explicitly and referred to with its full path to avoid conflicts with the
// class of the same name in this file.
import net.spy.memcached.internal.GetFuture;
import net.spy.memcached.internal.OperationFuture;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ObjectNode;
import org.apache.log4j.Logger;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
/**
* Concrete Memcached client implementation.
*/
public class MemcachedClient extends DB {
private final Logger logger = Logger.getLogger(getClass());
protected static final ObjectMapper MAPPER = new ObjectMapper();
private boolean checkOperationStatus;
private long shutdownTimeoutMillis;
private int objectExpirationTime;
public static final String HOSTS_PROPERTY = "memcached.hosts";
public static final int DEFAULT_PORT = 11211;
private static final String TEMPORARY_FAILURE_MSG = "Temporary failure";
private static final String CANCELLED_MSG = "cancelled";
public static final String SHUTDOWN_TIMEOUT_MILLIS_PROPERTY =
"memcached.shutdownTimeoutMillis";
public static final String DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = "30000";
public static final String OBJECT_EXPIRATION_TIME_PROPERTY =
"memcached.objectExpirationTime";
public static final String DEFAULT_OBJECT_EXPIRATION_TIME =
String.valueOf(Integer.MAX_VALUE);
public static final String CHECK_OPERATION_STATUS_PROPERTY =
"memcached.checkOperationStatus";
public static final String CHECK_OPERATION_STATUS_DEFAULT = "true";
public static final String READ_BUFFER_SIZE_PROPERTY =
"memcached.readBufferSize";
public static final String DEFAULT_READ_BUFFER_SIZE = "3000000";
public static final String OP_TIMEOUT_PROPERTY = "memcached.opTimeoutMillis";
public static final String DEFAULT_OP_TIMEOUT = "60000";
public static final String FAILURE_MODE_PROPERTY = "memcached.failureMode";
public static final FailureMode FAILURE_MODE_PROPERTY_DEFAULT =
FailureMode.Redistribute;
/**
* The MemcachedClient implementation that will be used to communicate
* with the memcached server.
*/
private net.spy.memcached.MemcachedClient client;
/**
* @returns Underlying Memcached protocol client, implemented by
* SpyMemcached.
*/
protected net.spy.memcached.MemcachedClient memcachedClient() {
return client;
}
@Override
public void init() throws DBException {
try {
client = createMemcachedClient();
checkOperationStatus = Boolean.parseBoolean(
getProperties().getProperty(CHECK_OPERATION_STATUS_PROPERTY,
CHECK_OPERATION_STATUS_DEFAULT));
objectExpirationTime = Integer.parseInt(
getProperties().getProperty(OBJECT_EXPIRATION_TIME_PROPERTY,
DEFAULT_OBJECT_EXPIRATION_TIME));
shutdownTimeoutMillis = Integer.parseInt(
getProperties().getProperty(SHUTDOWN_TIMEOUT_MILLIS_PROPERTY,
DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
} catch (Exception e) {
throw new DBException(e);
}
}
protected net.spy.memcached.MemcachedClient createMemcachedClient()
throws Exception {
ConnectionFactoryBuilder connectionFactoryBuilder =
new ConnectionFactoryBuilder();
connectionFactoryBuilder.setReadBufferSize(Integer.parseInt(
getProperties().getProperty(READ_BUFFER_SIZE_PROPERTY,
DEFAULT_READ_BUFFER_SIZE)));
connectionFactoryBuilder.setOpTimeout(Integer.parseInt(
getProperties().getProperty(OP_TIMEOUT_PROPERTY, DEFAULT_OP_TIMEOUT)));
String failureString = getProperties().getProperty(FAILURE_MODE_PROPERTY);
connectionFactoryBuilder.setFailureMode(
failureString == null ? FAILURE_MODE_PROPERTY_DEFAULT
: FailureMode.valueOf(failureString));
// Note: this only works with IPv4 addresses due to its assumption of
// ":" being the separator of hostname/IP and port; this is not the case
// when dealing with IPv6 addresses.
//
// TODO(mbrukman): fix this.
List addresses = new ArrayList();
String[] hosts = getProperties().getProperty(HOSTS_PROPERTY).split(",");
for (String address : hosts) {
int colon = address.indexOf(":");
int port = DEFAULT_PORT;
String host = address;
if (colon != -1) {
port = Integer.parseInt(address.substring(colon + 1));
host = address.substring(0, colon);
}
addresses.add(new InetSocketAddress(host, port));
}
return new net.spy.memcached.MemcachedClient(
connectionFactoryBuilder.build(), addresses);
}
@Override
public Status read(
String table, String key, Set fields,
HashMap result) {
key = createQualifiedKey(table, key);
try {
GetFuture