diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java b/accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java index 79be7d91..db78e879 100644 --- a/accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java +++ b/accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java @@ -1,452 +1,452 @@ /** * Copyright (c) 2011 YCSB++ project, 2014 YCSB contributors. * All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb.db; import com.yahoo.ycsb.ByteArrayByteIterator; import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.DB; import com.yahoo.ycsb.DBException; import com.yahoo.ycsb.Status; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.CleanUp; import org.apache.hadoop.io.Text; import org.apache.zookeeper.KeeperException; import java.util.HashMap; import java.util.HashSet; import java.util.Hashtable; import java.util.Map; import java.util.Map.Entry; import java.util.Random; import java.util.Set; import java.util.TreeSet; import java.util.Vector; import java.util.concurrent.TimeUnit; /** * Accumulo binding for YCSB. */ public class AccumuloClient extends DB { private ZooKeeperInstance inst; private Connector connector; private String table = ""; private BatchWriter bw = null; private Text colFam = new Text(""); private Scanner singleScanner = null; // A scanner for reads/deletes. private Scanner scanScanner = null; // A scanner for use by scan() private static final String PC_PRODUCER = "producer"; private static final String PC_CONSUMER = "consumer"; private String pcFlag = ""; private ZKProducerConsumer.Queue q = null; private static Hashtable hmKeyReads = null; private static Hashtable hmKeyNumReads = null; private Random r = null; @Override public void init() throws DBException { colFam = new Text(getProperties().getProperty("accumulo.columnFamily")); inst = new ZooKeeperInstance( getProperties().getProperty("accumulo.instanceName"), getProperties().getProperty("accumulo.zooKeepers")); try { String principal = getProperties().getProperty("accumulo.username"); AuthenticationToken token = new PasswordToken(getProperties().getProperty("accumulo.password")); connector = inst.getConnector(principal, token); } catch (AccumuloException e) { throw new DBException(e); } catch (AccumuloSecurityException e) { throw new DBException(e); } pcFlag = getProperties().getProperty("accumulo.PC_FLAG", "none"); if (pcFlag.equals(PC_PRODUCER) || pcFlag.equals(PC_CONSUMER)) { System.out.println("*** YCSB Client is " + pcFlag); String address = getProperties().getProperty("accumulo.PC_SERVER"); String root = getProperties().getProperty("accumulo.PC_ROOT_IN_ZK"); System.out .println("*** PC_INFO(server:" + address + ";root=" + root + ")"); q = new ZKProducerConsumer.Queue(address, root); r = new Random(); } if (pcFlag.equals(PC_CONSUMER)) { hmKeyReads = new Hashtable(); hmKeyNumReads = new Hashtable(); try { keyNotification(null); } catch (KeeperException e) { throw new DBException(e); } } } @Override public void cleanup() throws DBException { try { if (bw != null) { bw.close(); } } catch (MutationsRejectedException e) { throw new DBException(e); } CleanUp.shutdownNow(); } /** * Commonly repeated functionality: Before doing any operation, make sure * we're working on the correct table. If not, open the correct one. * * @param t * The table to open. */ public void checkTable(String t) throws TableNotFoundException { if (!table.equals(t)) { getTable(t); } } /** * Called when the user specifies a table that isn't the same as the existing * table. Connect to it and if necessary, close our current connection. * * @param t * The table to open. */ public void getTable(String t) throws TableNotFoundException { if (bw != null) { // Close the existing writer if necessary. try { bw.close(); } catch (MutationsRejectedException e) { // Couldn't spit out the mutations we wanted. // Ignore this for now. System.err.println("MutationsRejectedException: " + e.getMessage()); } } BatchWriterConfig bwc = new BatchWriterConfig(); bwc.setMaxLatency( Long.parseLong(getProperties() .getProperty("accumulo.batchWriterMaxLatency", "30000")), TimeUnit.MILLISECONDS); bwc.setMaxMemory(Long.parseLong( getProperties().getProperty("accumulo.batchWriterSize", "100000"))); bwc.setMaxWriteThreads(Integer.parseInt( getProperties().getProperty("accumulo.batchWriterThreads", "1"))); bw = connector.createBatchWriter(table, bwc); // Create our scanners singleScanner = connector.createScanner(table, Authorizations.EMPTY); scanScanner = connector.createScanner(table, Authorizations.EMPTY); table = t; // Store the name of the table we have open. } /** * Gets a scanner from Accumulo over one row. * * @param row * the row to scan * @param fields * the set of columns to scan * @return an Accumulo {@link Scanner} bound to the given row and columns */ private Scanner getRow(Text row, Set fields) { singleScanner.clearColumns(); singleScanner.setRange(new Range(row)); if (fields != null) { for (String field : fields) { singleScanner.fetchColumn(colFam, new Text(field)); } } return singleScanner; } @Override public Status read(String t, String key, Set fields, HashMap result) { try { checkTable(t); } catch (TableNotFoundException e) { System.err.println("Error trying to connect to Accumulo table." + e); return Status.ERROR; } try { // Pick out the results we care about. for (Entry entry : getRow(new Text(key), null)) { Value v = entry.getValue(); byte[] buf = v.get(); result.put(entry.getKey().getColumnQualifier().toString(), new ByteArrayByteIterator(buf)); } } catch (Exception e) { System.err.println("Error trying to reading Accumulo table" + key + e); return Status.ERROR; } return Status.OK; } @Override public Status scan(String t, String startkey, int recordcount, Set fields, Vector> result) { try { checkTable(t); } catch (TableNotFoundException e) { System.err.println("Error trying to connect to Accumulo table." + e); return Status.ERROR; } // There doesn't appear to be a way to create a range for a given // LENGTH. Just start and end keys. So we'll do this the hard way for // now: // Just make the end 'infinity' and only read as much as we need. scanScanner.clearColumns(); scanScanner.setRange(new Range(new Text(startkey), null)); // Batch size is how many key/values to try to get per call. Here, I'm // guessing that the number of keys in a row is equal to the number of // fields // we're interested in. // We try to fetch one more so as to tell when we've run out of fields. if (fields != null) { // And add each of them as fields we want. for (String field : fields) { scanScanner.fetchColumn(colFam, new Text(field)); } } // else - If no fields are provided, we assume one column/row. String rowKey = ""; HashMap currentHM = null; int count = 0; // Begin the iteration. for (Entry entry : scanScanner) { // Check for a new row. if (!rowKey.equals(entry.getKey().getRow().toString())) { if (count++ == recordcount) { // Done reading the last row. break; } rowKey = entry.getKey().getRow().toString(); if (fields != null) { // Initial Capacity for all keys. currentHM = new HashMap(fields.size()); } else { // An empty result map. currentHM = new HashMap(); } result.add(currentHM); } // Now add the key to the hashmap. Value v = entry.getValue(); byte[] buf = v.get(); currentHM.put(entry.getKey().getColumnQualifier().toString(), new ByteArrayByteIterator(buf)); } return Status.OK; } @Override public Status update(String t, String key, HashMap values) { try { checkTable(t); } catch (TableNotFoundException e) { System.err.println("Error trying to connect to Accumulo table." + e); return Status.ERROR; } Mutation mutInsert = new Mutation(new Text(key)); for (Map.Entry entry : values.entrySet()) { mutInsert.put(colFam, new Text(entry.getKey()), System.currentTimeMillis(), new Value(entry.getValue().toArray())); } try { bw.addMutation(mutInsert); // Distributed YCSB co-ordination: YCSB on a client produces the key // to // be stored in the shared queue in ZooKeeper. if (pcFlag.equals(PC_PRODUCER)) { if (r.nextFloat() < 0.01) { keyNotification(key); } } } catch (MutationsRejectedException e) { System.err.println("Error performing update."); e.printStackTrace(); return Status.ERROR; } catch (KeeperException e) { System.err.println("Error notifying the Zookeeper Queue."); e.printStackTrace(); return Status.ERROR; } return Status.OK; } @Override public Status insert(String t, String key, HashMap values) { return update(t, key, values); } @Override public Status delete(String t, String key) { try { checkTable(t); } catch (TableNotFoundException e) { System.err.println("Error trying to connect to Accumulo table." + e); return Status.ERROR; } try { deleteRow(new Text(key)); } catch (MutationsRejectedException e) { System.err.println("Error performing delete."); e.printStackTrace(); return Status.ERROR; } catch (RuntimeException e) { System.err.println("Error performing delete."); e.printStackTrace(); return Status.ERROR; } return Status.OK; } // These functions are adapted from RowOperations.java: private void deleteRow(Text row) throws MutationsRejectedException { deleteRow(getRow(row, null)); } /** * Deletes a row, given a Scanner of JUST that row. */ private void deleteRow(Scanner scanner) throws MutationsRejectedException { Mutation deleter = null; // iterate through the keys for (Entry entry : scanner) { // create a mutation for the row if (deleter == null) { deleter = new Mutation(entry.getKey().getRow()); } // the remove function adds the key with the delete flag set to true deleter.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier()); } bw.addMutation(deleter); } private void keyNotification(String key) throws KeeperException { if (pcFlag.equals(PC_PRODUCER)) { try { q.produce(key); } catch (InterruptedException e) { // Reset the interrupted state. Thread.currentThread().interrupt(); } } else { // XXX: do something better to keep the loop going (while??) for (int i = 0; i < 10000000; i++) { try { String strKey = q.consume(); if (!hmKeyReads.containsKey(strKey) && !hmKeyNumReads.containsKey(strKey)) { hmKeyReads.put(strKey, new Long(System.currentTimeMillis())); hmKeyNumReads.put(strKey, new Integer(1)); } // YCSB Consumer will read the key that was fetched from the // queue in ZooKeeper. // (current way is kind of ugly but works, i think) // TODO : Get table name from configuration or argument String usertable = "usertable"; HashSet fields = new HashSet(); for (int j = 0; j < 9; j++) { fields.add("field" + j); } HashMap result = new HashMap(); read(usertable, strKey, fields, result); // If the results are empty, the key is enqueued in // Zookeeper // and tried again, until the results are found. - if (result.size() == 0) { + if (result.isEmpty()) { q.produce(strKey); int count = ((Integer) hmKeyNumReads.get(strKey)).intValue(); hmKeyNumReads.put(strKey, new Integer(count + 1)); } else { if (((Integer) hmKeyNumReads.get(strKey)).intValue() > 1) { long currTime = System.currentTimeMillis(); long writeTime = ((Long) hmKeyReads.get(strKey)).longValue(); System.out.println( "Key=" + strKey + ";StartSearch=" + writeTime + ";EndSearch=" + currTime + ";TimeLag=" + (currTime - writeTime)); } } } catch (InterruptedException e) { // Reset the interrupted state. Thread.currentThread().interrupt(); } } } } public Status presplit(String t, String[] keys) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { TreeSet splits = new TreeSet(); for (int i = 0; i < keys.length; i++) { splits.add(new Text(keys[i])); } connector.tableOperations().addSplits(t, splits); return Status.OK; } } diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/ZKProducerConsumer.java b/accumulo/src/main/java/com/yahoo/ycsb/db/ZKProducerConsumer.java index 2d42c79a..1b478f72 100644 --- a/accumulo/src/main/java/com/yahoo/ycsb/db/ZKProducerConsumer.java +++ b/accumulo/src/main/java/com/yahoo/ycsb/db/ZKProducerConsumer.java @@ -1,193 +1,193 @@ /** * Copyright (c) 2011 YCSB++ project, 2014 YCSB contributors. * All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb.db; import java.io.IOException; import java.util.List; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; /** * Implementing the PC (Producer/Consumer) Queue in ZooKeeper. */ public class ZKProducerConsumer implements Watcher { private static ZooKeeper zk = null; private static Integer mutex; private String root; /** * Constructor that takes the address of the ZK server. * * @param address * The address of the ZK server. */ ZKProducerConsumer(String address) { if (zk == null) { try { System.out.println("Starting ZK:"); zk = new ZooKeeper(address, 3000, this); mutex = new Integer(-1); System.out.println("Finished starting ZK: " + zk); } catch (IOException e) { System.out.println(e.toString()); zk = null; } } // else mutex = new Integer(-1); } public synchronized void process(WatchedEvent event) { synchronized (mutex) { // System.out.println("Process: " + event.getType()); mutex.notify(); } } /** * Returns the root. * * @return The root. */ protected String getRoot() { return root; } /** * Sets the root. * * @param r * The root value. */ protected void setRoot(String r) { this.root = r; } /** * QueueElement a single queue element. No longer used. * @deprecated No longer used. */ @Deprecated public static class QueueElement { private String key; private long writeTime; QueueElement(String key, long writeTime) { this.key = key; this.writeTime = writeTime; } } /** * Producer-Consumer queue. */ public static class Queue extends ZKProducerConsumer { /** * Constructor of producer-consumer queue. * * @param address * The Zookeeper server address. * @param name * The name of the root element for the queue. */ Queue(String address, String name) { super(address); this.setRoot(name); // Create ZK node name if (zk != null) { try { Stat s = zk.exists(getRoot(), false); if (s == null) { zk.create(getRoot(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { System.out.println( "Keeper exception when instantiating queue: " + e.toString()); } catch (InterruptedException e) { System.out.println("Interrupted exception"); } } } /** * Producer calls this method to insert the key in the queue. * * @param key * The key to produce (add to the queue). * @return True if the key was added. * @throws KeeperException * On a failure talking to zookeeper. * @throws InterruptedException * If the current thread is interrupted waiting for the zookeeper * acknowledgement. */ // boolean produce(String key) throws KeeperException, InterruptedException { byte[] value; value = key.getBytes(); zk.create(getRoot() + "/key", value, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); return true; } /** * Consumer calls this method to "wait" for the key to the available. * * @return The key to consumed (remove from the queue). * @throws KeeperException * On a failure talking to zookeeper. * @throws InterruptedException * If the current thread is interrupted waiting for the zookeeper * acknowledgement. */ String consume() throws KeeperException, InterruptedException { String retvalue = null; Stat stat = null; // Get the first element available while (true) { synchronized (mutex) { List list = zk.getChildren(getRoot(), true); - if (list.size() == 0) { + if (list.isEmpty()) { System.out.println("Going to wait"); mutex.wait(); } else { String path = getRoot() + "/" + list.get(0); byte[] b = zk.getData(path, false, stat); retvalue = new String(b); zk.delete(path, -1); return retvalue; } } } } } } diff --git a/core/src/main/java/com/yahoo/ycsb/CommandLine.java b/core/src/main/java/com/yahoo/ycsb/CommandLine.java index 02885730..80c1fc18 100644 --- a/core/src/main/java/com/yahoo/ycsb/CommandLine.java +++ b/core/src/main/java/com/yahoo/ycsb/CommandLine.java @@ -1,411 +1,411 @@ /** * Copyright (c) 2010 Yahoo! Inc. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb; import java.util.Properties; import java.io.FileInputStream; import java.io.IOException; import java.util.Enumeration; import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.HashSet; import java.util.Vector; import com.yahoo.ycsb.Client; import com.yahoo.ycsb.workloads.*; /** * A simple command line client to a database, using the appropriate com.yahoo.ycsb.DB implementation. */ public class CommandLine { public static final String DEFAULT_DB="com.yahoo.ycsb.BasicDB"; public static void usageMessage() { System.out.println("YCSB Command Line Client"); System.out.println("Usage: java com.yahoo.ycsb.CommandLine [options]"); System.out.println("Options:"); System.out.println(" -P filename: Specify a property file"); System.out.println(" -p name=value: Specify a property value"); System.out.println(" -db classname: Use a specified DB class (can also set the \"db\" property)"); System.out.println(" -table tablename: Use the table name instead of the default \""+CoreWorkload.TABLENAME_PROPERTY_DEFAULT+"\""); System.out.println(); } public static void help() { System.out.println("Commands:"); System.out.println(" read key [field1 field2 ...] - Read a record"); System.out.println(" scan key recordcount [field1 field2 ...] - Scan starting at key"); System.out.println(" insert key name1=value1 [name2=value2 ...] - Insert a new record"); System.out.println(" update key name1=value1 [name2=value2 ...] - Update a record"); System.out.println(" delete key - Delete a record"); System.out.println(" table [tablename] - Get or [set] the name of the table"); System.out.println(" quit - Quit"); } public static void main(String[] args) { int argindex=0; Properties props=new Properties(); Properties fileprops=new Properties(); String table=CoreWorkload.TABLENAME_PROPERTY_DEFAULT; while ( (argindex=args.length) { usageMessage(); System.exit(0); } props.setProperty(Client.DB_PROPERTY, args[argindex]); argindex++; } else if (args[argindex].compareTo("-P")==0) { argindex++; if (argindex>=args.length) { usageMessage(); System.exit(0); } String propfile=args[argindex]; argindex++; Properties myfileprops=new Properties(); try { myfileprops.load(new FileInputStream(propfile)); } catch (IOException e) { System.out.println(e.getMessage()); System.exit(0); } for (Enumeration e=myfileprops.propertyNames(); e.hasMoreElements(); ) { String prop=(String)e.nextElement(); fileprops.setProperty(prop,myfileprops.getProperty(prop)); } } else if (args[argindex].compareTo("-p")==0) { argindex++; if (argindex>=args.length) { usageMessage(); System.exit(0); } int eq=args[argindex].indexOf('='); if (eq<0) { usageMessage(); System.exit(0); } String name=args[argindex].substring(0,eq); String value=args[argindex].substring(eq+1); props.put(name,value); //System.out.println("["+name+"]=["+value+"]"); argindex++; } else if (args[argindex].compareTo("-table")==0) { argindex++; if (argindex>=args.length) { usageMessage(); System.exit(0); } table=args[argindex]; argindex++; } else { System.out.println("Unknown option "+args[argindex]); usageMessage(); System.exit(0); } if (argindex>=args.length) { break; } } if (argindex!=args.length) { usageMessage(); System.exit(0); } for (Enumeration e=props.propertyNames(); e.hasMoreElements(); ) { String prop=(String)e.nextElement(); fileprops.setProperty(prop,props.getProperty(prop)); } props=fileprops; System.out.println("YCSB Command Line client"); System.out.println("Type \"help\" for command line help"); System.out.println("Start with \"-help\" for usage info"); //create a DB String dbname=props.getProperty(Client.DB_PROPERTY, DEFAULT_DB); ClassLoader classLoader = CommandLine.class.getClassLoader(); DB db=null; try { Class dbclass = classLoader.loadClass(dbname); db=(DB)dbclass.newInstance(); } catch (Exception e) { e.printStackTrace(); System.exit(0); } db.setProperties(props); try { db.init(); } catch (DBException e) { e.printStackTrace(); System.exit(0); } System.out.println("Connected."); //main loop BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); for (;;) { //get user input System.out.print("> "); String input=null; try { input=br.readLine(); } catch (IOException e) { e.printStackTrace(); System.exit(1); } if (input.compareTo("")==0) { continue; } if (input.compareTo("help")==0) { help(); continue; } if (input.compareTo("quit")==0) { break; } String[] tokens=input.split(" "); long st=System.currentTimeMillis(); //handle commands if (tokens[0].compareTo("table")==0) { if (tokens.length==1) { System.out.println("Using table \""+table+"\""); } else if (tokens.length==2) { table=tokens[1]; System.out.println("Using table \""+table+"\""); } else { System.out.println("Error: syntax is \"table tablename\""); } } else if (tokens[0].compareTo("read")==0) { if (tokens.length==1) { System.out.println("Error: syntax is \"read keyname [field1 field2 ...]\""); } else { Set fields=null; if (tokens.length>2) { fields=new HashSet(); for (int i=2; i result=new HashMap(); Status ret=db.read(table,tokens[1],fields,result); System.out.println("Return code: "+ret.getName()); for (Map.Entry ent : result.entrySet()) { System.out.println(ent.getKey()+"="+ent.getValue()); } } } else if (tokens[0].compareTo("scan")==0) { if (tokens.length<3) { System.out.println("Error: syntax is \"scan keyname scanlength [field1 field2 ...]\""); } else { Set fields=null; if (tokens.length>3) { fields=new HashSet(); for (int i=3; i> results=new Vector>(); Status ret=db.scan(table,tokens[1],Integer.parseInt(tokens[2]),fields,results); System.out.println("Result: "+ret.getName()); int record=0; - if (results.size()==0) + if (results.isEmpty()) { System.out.println("0 records"); } else { System.out.println("--------------------------------"); } for (HashMap result : results) { System.out.println("Record "+(record++)); for (Map.Entry ent : result.entrySet()) { System.out.println(ent.getKey()+"="+ent.getValue()); } System.out.println("--------------------------------"); } } } else if (tokens[0].compareTo("update")==0) { if (tokens.length<3) { System.out.println("Error: syntax is \"update keyname name1=value1 [name2=value2 ...]\""); } else { HashMap values=new HashMap(); for (int i=2; i values=new HashMap(); for (int i=2; i The following options must be passed when using this database client. * *
    *
  • couchbase.url=http://127.0.0.1:8091/pools The connection URL from one server.
  • *
  • couchbase.bucket=default The bucket name to use./li> *
  • couchbase.password= The password of the bucket.
  • *
  • couchbase.checkFutures=true If the futures should be inspected (makes ops sync).
  • *
  • couchbase.persistTo=0 Observe Persistence ("PersistTo" constraint)
  • *
  • couchbase.replicateTo=0 Observe Replication ("ReplicateTo" constraint)
  • *
  • couchbase.json=true Use json or java serialization as target format.
  • *
* * @author Michael Nitschinger */ public class CouchbaseClient extends DB { public static final String URL_PROPERTY = "couchbase.url"; public static final String BUCKET_PROPERTY = "couchbase.bucket"; public static final String PASSWORD_PROPERTY = "couchbase.password"; public static final String CHECKF_PROPERTY = "couchbase.checkFutures"; public static final String PERSIST_PROPERTY = "couchbase.persistTo"; public static final String REPLICATE_PROPERTY = "couchbase.replicateTo"; public static final String JSON_PROPERTY = "couchbase.json"; protected static final ObjectMapper JSON_MAPPER = new ObjectMapper(); private com.couchbase.client.CouchbaseClient client; private PersistTo persistTo; private ReplicateTo replicateTo; private boolean checkFutures; private boolean useJson; private final Logger log = LoggerFactory.getLogger(getClass()); @Override public void init() throws DBException { Properties props = getProperties(); String url = props.getProperty(URL_PROPERTY, "http://127.0.0.1:8091/pools"); String bucket = props.getProperty(BUCKET_PROPERTY, "default"); String password = props.getProperty(PASSWORD_PROPERTY, ""); checkFutures = props.getProperty(CHECKF_PROPERTY, "true").equals("true"); useJson = props.getProperty(JSON_PROPERTY, "true").equals("true"); persistTo = parsePersistTo(props.getProperty(PERSIST_PROPERTY, "0")); replicateTo = parseReplicateTo(props.getProperty(REPLICATE_PROPERTY, "0")); Properties systemProperties = System.getProperties(); systemProperties.put("net.spy.log.LoggerImpl", "net.spy.memcached.compat.log.SLF4JLogger"); System.setProperties(systemProperties); try { client = new com.couchbase.client.CouchbaseClient( Arrays.asList(new URI(url)), bucket, password ); } catch (Exception e) { throw new DBException("Could not create CouchbaseClient object.", e); } } /** * Parse the replicate property into the correct enum. * * @param property the stringified property value. * @throws DBException if parsing the property did fail. * @return the correct enum. */ private ReplicateTo parseReplicateTo(final String property) throws DBException { int value = Integer.parseInt(property); switch (value) { case 0: return ReplicateTo.ZERO; case 1: return ReplicateTo.ONE; case 2: return ReplicateTo.TWO; case 3: return ReplicateTo.THREE; default: throw new DBException(REPLICATE_PROPERTY + " must be between 0 and 3"); } } /** * Parse the persist property into the correct enum. * * @param property the stringified property value. * @throws DBException if parsing the property did fail. * @return the correct enum. */ private PersistTo parsePersistTo(final String property) throws DBException { int value = Integer.parseInt(property); switch (value) { case 0: return PersistTo.ZERO; case 1: return PersistTo.ONE; case 2: return PersistTo.TWO; case 3: return PersistTo.THREE; case 4: return PersistTo.FOUR; default: throw new DBException(PERSIST_PROPERTY + " must be between 0 and 4"); } } /** * Shutdown the client. */ @Override public void cleanup() { client.shutdown(); } @Override public Status read(final String table, final String key, final Set fields, final HashMap result) { String formattedKey = formatKey(table, key); try { Object loaded = client.get(formattedKey); if (loaded == null) { return Status.ERROR; } decode(loaded, fields, result); return Status.OK; } catch (Exception e) { if (log.isErrorEnabled()) { log.error("Could not read value for key " + formattedKey, e); } return Status.ERROR; } } /** * Scan is currently not implemented. * * @param table The name of the table * @param startkey The record key of the first record to read. * @param recordcount The number of records to read * @param fields The list of fields to read, or null for all of them * @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record * @return Status.ERROR, because not implemented yet. */ @Override public Status scan(final String table, final String startkey, final int recordcount, final Set fields, final Vector> result) { return Status.ERROR; } @Override public Status update(final String table, final String key, final HashMap values) { String formattedKey = formatKey(table, key); try { final OperationFuture future = client.replace( formattedKey, encode(values), persistTo, replicateTo ); return checkFutureStatus(future); } catch (Exception e) { if (log.isErrorEnabled()) { log.error("Could not update value for key " + formattedKey, e); } return Status.ERROR; } } @Override public Status insert(final String table, final String key, final HashMap values) { String formattedKey = formatKey(table, key); try { final OperationFuture future = client.add( formattedKey, encode(values), persistTo, replicateTo ); return checkFutureStatus(future); } catch (Exception e) { if (log.isErrorEnabled()) { log.error("Could not insert value for key " + formattedKey, e); } return Status.ERROR; } } @Override public Status delete(final String table, final String key) { String formattedKey = formatKey(table, key); try { final OperationFuture future = client.delete(formattedKey, persistTo, replicateTo); return checkFutureStatus(future); } catch (Exception e) { if (log.isErrorEnabled()) { log.error("Could not delete value for key " + formattedKey, e); } return Status.ERROR; } } /** * Prefix the key with the given prefix, to establish a unique namespace. * * @param prefix the prefix to use. * @param key the actual key. * @return the formatted and prefixed key. */ private String formatKey(final String prefix, final String key) { return prefix + ":" + key; } /** * Wrapper method that either inspects the future or not. * * @param future the future to potentially verify. * @return the status of the future result. */ private Status checkFutureStatus(final OperationFuture future) { if (checkFutures) { return future.getStatus().isSuccess() ? Status.OK : Status.ERROR; } else { return Status.OK; } } /** * Decode the object from server into the storable result. * * @param source the loaded object. * @param fields the fields to check. * @param dest the result passed back to the ycsb core. */ private void decode(final Object source, final Set fields, final HashMap dest) { if (useJson) { try { JsonNode json = JSON_MAPPER.readTree((String) source); - boolean checkFields = fields != null && fields.size() > 0; + boolean checkFields = fields != null && !fields.isEmpty(); for (Iterator> jsonFields = json.fields(); jsonFields.hasNext();) { Map.Entry jsonField = jsonFields.next(); String name = jsonField.getKey(); if (checkFields && fields.contains(name)) { continue; } JsonNode jsonValue = jsonField.getValue(); if (jsonValue != null && !jsonValue.isNull()) { dest.put(name, new StringByteIterator(jsonValue.asText())); } } } catch (Exception e) { throw new RuntimeException("Could not decode JSON"); } } else { HashMap converted = (HashMap) source; for (Map.Entry entry : converted.entrySet()) { dest.put(entry.getKey(), new StringByteIterator(entry.getValue())); } } } /** * Encode the object for couchbase storage. * * @param source the source value. * @return the storable object. */ private Object encode(final HashMap source) { HashMap stringMap = StringByteIterator.getStringMap(source); if (!useJson) { return stringMap; } ObjectNode node = JSON_MAPPER.createObjectNode(); for (Map.Entry pair : stringMap.entrySet()) { node.put(pair.getKey(), pair.getValue()); } JsonFactory jsonFactory = new JsonFactory(); Writer writer = new StringWriter(); try { JsonGenerator jsonGenerator = jsonFactory.createGenerator(writer); JSON_MAPPER.writeTree(jsonGenerator, node); } catch (Exception e) { throw new RuntimeException("Could not encode JSON value"); } return writer.toString(); } } diff --git a/memcached/src/main/java/com/yahoo/ycsb/db/MemcachedClient.java b/memcached/src/main/java/com/yahoo/ycsb/db/MemcachedClient.java index 8a95d8fb..9ce0b93c 100644 --- a/memcached/src/main/java/com/yahoo/ycsb/db/MemcachedClient.java +++ b/memcached/src/main/java/com/yahoo/ycsb/db/MemcachedClient.java @@ -1,294 +1,294 @@ /** * Copyright (c) 2014-2015 YCSB contributors. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb.db; import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.DB; import com.yahoo.ycsb.DBException; import com.yahoo.ycsb.Status; import com.yahoo.ycsb.StringByteIterator; import java.io.IOException; import java.io.StringWriter; import java.io.Writer; import java.net.InetSocketAddress; import java.text.MessageFormat; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.Vector; import net.spy.memcached.ConnectionFactoryBuilder; import net.spy.memcached.FailureMode; // We also use `net.spy.memcached.MemcachedClient`; it is not imported // explicitly and referred to with its full path to avoid conflicts with the // class of the same name in this file. import net.spy.memcached.internal.GetFuture; import net.spy.memcached.internal.OperationFuture; import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.node.ObjectNode; import org.apache.log4j.Logger; import static java.util.concurrent.TimeUnit.MILLISECONDS; /** * Concrete Memcached client implementation. */ public class MemcachedClient extends DB { private final Logger logger = Logger.getLogger(getClass()); protected static final ObjectMapper MAPPER = new ObjectMapper(); private boolean checkOperationStatus; private long shutdownTimeoutMillis; private int objectExpirationTime; public static final String HOSTS_PROPERTY = "memcached.hosts"; public static final int DEFAULT_PORT = 11211; private static final String TEMPORARY_FAILURE_MSG = "Temporary failure"; private static final String CANCELLED_MSG = "cancelled"; public static final String SHUTDOWN_TIMEOUT_MILLIS_PROPERTY = "memcached.shutdownTimeoutMillis"; public static final String DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = "30000"; public static final String OBJECT_EXPIRATION_TIME_PROPERTY = "memcached.objectExpirationTime"; public static final String DEFAULT_OBJECT_EXPIRATION_TIME = String.valueOf(Integer.MAX_VALUE); public static final String CHECK_OPERATION_STATUS_PROPERTY = "memcached.checkOperationStatus"; public static final String CHECK_OPERATION_STATUS_DEFAULT = "true"; public static final String READ_BUFFER_SIZE_PROPERTY = "memcached.readBufferSize"; public static final String DEFAULT_READ_BUFFER_SIZE = "3000000"; public static final String OP_TIMEOUT_PROPERTY = "memcached.opTimeoutMillis"; public static final String DEFAULT_OP_TIMEOUT = "60000"; public static final String FAILURE_MODE_PROPERTY = "memcached.failureMode"; public static final FailureMode FAILURE_MODE_PROPERTY_DEFAULT = FailureMode.Redistribute; /** * The MemcachedClient implementation that will be used to communicate * with the memcached server. */ private net.spy.memcached.MemcachedClient client; /** * @returns Underlying Memcached protocol client, implemented by * SpyMemcached. */ protected net.spy.memcached.MemcachedClient memcachedClient() { return client; } @Override public void init() throws DBException { try { client = createMemcachedClient(); checkOperationStatus = Boolean.parseBoolean( getProperties().getProperty(CHECK_OPERATION_STATUS_PROPERTY, CHECK_OPERATION_STATUS_DEFAULT)); objectExpirationTime = Integer.parseInt( getProperties().getProperty(OBJECT_EXPIRATION_TIME_PROPERTY, DEFAULT_OBJECT_EXPIRATION_TIME)); shutdownTimeoutMillis = Integer.parseInt( getProperties().getProperty(SHUTDOWN_TIMEOUT_MILLIS_PROPERTY, DEFAULT_SHUTDOWN_TIMEOUT_MILLIS)); } catch (Exception e) { throw new DBException(e); } } protected net.spy.memcached.MemcachedClient createMemcachedClient() throws Exception { ConnectionFactoryBuilder connectionFactoryBuilder = new ConnectionFactoryBuilder(); connectionFactoryBuilder.setReadBufferSize(Integer.parseInt( getProperties().getProperty(READ_BUFFER_SIZE_PROPERTY, DEFAULT_READ_BUFFER_SIZE))); connectionFactoryBuilder.setOpTimeout(Integer.parseInt( getProperties().getProperty(OP_TIMEOUT_PROPERTY, DEFAULT_OP_TIMEOUT))); String failureString = getProperties().getProperty(FAILURE_MODE_PROPERTY); connectionFactoryBuilder.setFailureMode( failureString == null ? FAILURE_MODE_PROPERTY_DEFAULT : FailureMode.valueOf(failureString)); // Note: this only works with IPv4 addresses due to its assumption of // ":" being the separator of hostname/IP and port; this is not the case // when dealing with IPv6 addresses. // // TODO(mbrukman): fix this. List addresses = new ArrayList(); String[] hosts = getProperties().getProperty(HOSTS_PROPERTY).split(","); for (String address : hosts) { int colon = address.indexOf(":"); int port = DEFAULT_PORT; String host = address; if (colon != -1) { port = Integer.parseInt(address.substring(colon + 1)); host = address.substring(0, colon); } addresses.add(new InetSocketAddress(host, port)); } return new net.spy.memcached.MemcachedClient( connectionFactoryBuilder.build(), addresses); } @Override public Status read( String table, String key, Set fields, HashMap result) { key = createQualifiedKey(table, key); try { GetFuture future = memcachedClient().asyncGet(key); Object document = future.get(); if (document != null) { fromJson((String) document, fields, result); } return Status.OK; } catch (Exception e) { logger.error("Error encountered for key: " + key, e); return Status.ERROR; } } @Override public Status scan( String table, String startkey, int recordcount, Set fields, Vector> result){ return Status.NOT_IMPLEMENTED; } @Override public Status update( String table, String key, HashMap values) { key = createQualifiedKey(table, key); try { OperationFuture future = memcachedClient().replace(key, objectExpirationTime, toJson(values)); return getReturnCode(future); } catch (Exception e) { logger.error("Error updating value with key: " + key, e); return Status.ERROR; } } @Override public Status insert( String table, String key, HashMap values) { key = createQualifiedKey(table, key); try { OperationFuture future = memcachedClient().add(key, objectExpirationTime, toJson(values)); return getReturnCode(future); } catch (Exception e) { logger.error("Error inserting value", e); return Status.ERROR; } } @Override public Status delete(String table, String key) { key = createQualifiedKey(table, key); try { OperationFuture future = memcachedClient().delete(key); return getReturnCode(future); } catch (Exception e) { logger.error("Error deleting value", e); return Status.ERROR; } } protected Status getReturnCode(OperationFuture future) { if (!checkOperationStatus) { return Status.OK; } if (future.getStatus().isSuccess()) { return Status.OK; } else if (TEMPORARY_FAILURE_MSG.equals(future.getStatus().getMessage())) { return new Status("TEMPORARY_FAILURE", TEMPORARY_FAILURE_MSG); } else if (CANCELLED_MSG.equals(future.getStatus().getMessage())) { return new Status("CANCELLED_MSG", CANCELLED_MSG); } return new Status("ERROR", future.getStatus().getMessage()); } @Override public void cleanup() throws DBException { if (client != null) { memcachedClient().shutdown(shutdownTimeoutMillis, MILLISECONDS); } } protected static String createQualifiedKey(String table, String key) { return MessageFormat.format("{0}-{1}", table, key); } protected static void fromJson( String value, Set fields, Map result) throws IOException { JsonNode json = MAPPER.readTree(value); - boolean checkFields = fields != null && fields.size() > 0; + boolean checkFields = fields != null && !fields.isEmpty(); for (Iterator> jsonFields = json.getFields(); jsonFields.hasNext(); /* increment in loop body */) { Map.Entry jsonField = jsonFields.next(); String name = jsonField.getKey(); if (checkFields && fields.contains(name)) { continue; } JsonNode jsonValue = jsonField.getValue(); if (jsonValue != null && !jsonValue.isNull()) { result.put(name, new StringByteIterator(jsonValue.asText())); } } } protected static String toJson(Map values) throws IOException { ObjectNode node = MAPPER.createObjectNode(); HashMap stringMap = StringByteIterator.getStringMap(values); for (Map.Entry pair : stringMap.entrySet()) { node.put(pair.getKey(), pair.getValue()); } JsonFactory jsonFactory = new JsonFactory(); Writer writer = new StringWriter(); JsonGenerator jsonGenerator = jsonFactory.createJsonGenerator(writer); MAPPER.writeTree(jsonGenerator, node); return writer.toString(); } }