diff --git a/accumulo/README.md b/accumulo/README.md index e2555833..fd9b4e8d 100644 --- a/accumulo/README.md +++ b/accumulo/README.md @@ -1,101 +1,83 @@ ## Quick Start This section describes how to run YCSB on [Accumulo](https://accumulo.apache.org/). ### 1. Start Accumulo See the [Accumulo Documentation](https://accumulo.apache.org/1.7/accumulo_user_manual.html#_installation) for details on installing and running Accumulo. Before running the YCSB test you must create the Accumulo table. Again see the [Accumulo Documentation](https://accumulo.apache.org/1.7/accumulo_user_manual.html#_basic_administration) for details. The default table name is `ycsb`. ### 2. Set Up YCSB Git clone YCSB and compile: git clone http://github.com/brianfrankcooper/YCSB.git cd YCSB mvn -pl com.yahoo.ycsb:aerospike-binding -am clean package ### 3. Load Data and Run Tests Load the data: ./bin/ycsb load accumulo -s -P workloads/workloada \ -p accumulo.zooKeepers=localhost \ -p accumulo.columnFamily=ycsb \ -p accumulo.instanceName=ycsb \ -p accumulo.username=user \ -p accumulo.password=supersecret \ > outputLoad.txt Run the workload test: ./bin/ycsb run accumulo -s -P workloads/workloada \ -p accumulo.zooKeepers=localhost \ -p accumulo.columnFamily=ycsb \ -p accumulo.instanceName=ycsb \ -p accumulo.username=user \ -p accumulo.password=supersecret \ > outputLoad.txt ## Accumulo Configuration Parameters - `accumulo.zooKeepers` - The Accumulo cluster's [zookeeper servers](https://accumulo.apache.org/1.7/accumulo_user_manual.html#_connecting). - Should contain a comma separated list of of hostname or hostname:port values. - No default value. - `accumulo.columnFamily` - The name of the column family to use to store the data within the table. - No default value. - `accumulo.instanceName` - Name of the Accumulo [instance](https://accumulo.apache.org/1.7/accumulo_user_manual.html#_connecting). - No default value. - `accumulo.username` - The username to use when connecting to Accumulo. - No default value. - `accumulo.password` - The password for the user connecting to Accumulo. - No default value. - -- `accumulo.PC_FLAG` - - Provides support for distributed clients using ZooKeeper to manage the Producers and Consumers. - - If not set then the YCSB client will perform all work locally. - - Allowed values are: - - `producer` - - `consumer` - - Not set - - Default value is not set. - -- `accumulo.PC_SERVER` - - The set of ZooKeeper servers to use for the prioducers and consumers to communicate. - - Should contain a comma separated list of of hostname or hostname:port values. - - No default value. - -- `accumulo.PC_ROOT_IN_ZK` - - The root node in the ZooKeepers for the producers and consumers to communicate. - - No default value. diff --git a/accumulo/pom.xml b/accumulo/pom.xml index 2c69a60d..9aab8fe9 100644 --- a/accumulo/pom.xml +++ b/accumulo/pom.xml @@ -1,91 +1,76 @@ 4.0.0 com.yahoo.ycsb binding-parent 0.7.0-SNAPSHOT ../binding-parent accumulo-binding Accumulo DB Binding + + + 2.2.0 + org.apache.accumulo accumulo-core ${accumulo.version} - - - org.apache.hadoop - hadoop-common - - - org.apache.thrift - thrift - - - org.apache.zookeeper - zookeeper - - - - - org.apache.zookeeper - zookeeper - 3.3.1 org.apache.hadoop - hadoop-core - 0.20.203.0 + hadoop-common + ${hadoop.version} com.yahoo.ycsb core ${project.version} provided org.apache.maven.plugins maven-checkstyle-plugin 2.15 true ../checkstyle.xml true true validate validate checkstyle diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java index 51dcd5e5..9f3448d8 100644 --- a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java +++ b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java @@ -1,452 +1,339 @@ /** - * Copyright (c) 2011 YCSB++ project, 2014 YCSB contributors. + * Copyright (c) 2011 YCSB++ project, 2014-2016 YCSB contributors. * All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb.db.accumulo; import com.yahoo.ycsb.ByteArrayByteIterator; import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.DB; import com.yahoo.ycsb.DBException; import com.yahoo.ycsb.Status; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.CleanUp; import org.apache.hadoop.io.Text; -import org.apache.zookeeper.KeeperException; import java.util.HashMap; -import java.util.HashSet; -import java.util.Hashtable; import java.util.Map; import java.util.Map.Entry; -import java.util.Random; import java.util.Set; -import java.util.TreeSet; import java.util.Vector; import java.util.concurrent.TimeUnit; /** * Accumulo binding for YCSB. */ public class AccumuloClient extends DB { private ZooKeeperInstance inst; private Connector connector; private String table = ""; private BatchWriter bw = null; private Text colFam = new Text(""); private Scanner singleScanner = null; // A scanner for reads/deletes. private Scanner scanScanner = null; // A scanner for use by scan() - private static final String PC_PRODUCER = "producer"; - private static final String PC_CONSUMER = "consumer"; - private String pcFlag = ""; - private ZKProducerConsumer.Queue q = null; - private static Hashtable hmKeyReads = null; - private static Hashtable hmKeyNumReads = null; - private Random r = null; - @Override public void init() throws DBException { colFam = new Text(getProperties().getProperty("accumulo.columnFamily")); inst = new ZooKeeperInstance( getProperties().getProperty("accumulo.instanceName"), getProperties().getProperty("accumulo.zooKeepers")); try { String principal = getProperties().getProperty("accumulo.username"); AuthenticationToken token = new PasswordToken(getProperties().getProperty("accumulo.password")); connector = inst.getConnector(principal, token); } catch (AccumuloException e) { throw new DBException(e); } catch (AccumuloSecurityException e) { throw new DBException(e); } - pcFlag = getProperties().getProperty("accumulo.PC_FLAG", "none"); - if (pcFlag.equals(PC_PRODUCER) || pcFlag.equals(PC_CONSUMER)) { - System.out.println("*** YCSB Client is " + pcFlag); - String address = getProperties().getProperty("accumulo.PC_SERVER"); - String root = getProperties().getProperty("accumulo.PC_ROOT_IN_ZK"); - System.out - .println("*** PC_INFO(server:" + address + ";root=" + root + ")"); - q = new ZKProducerConsumer.Queue(address, root); - r = new Random(); - } - - if (pcFlag.equals(PC_CONSUMER)) { - hmKeyReads = new Hashtable(); - hmKeyNumReads = new Hashtable(); - try { - keyNotification(null); - } catch (KeeperException e) { - throw new DBException(e); - } + if (!(getProperties().getProperty("accumulo.pcFlag", "none").equals("none"))) { + System.err.println("Sorry, the ZK based producer/consumer implementation has been removed. " + + "Please see YCSB issue #416 for work on adding a general solution to coordinated work."); } } @Override public void cleanup() throws DBException { try { if (bw != null) { bw.close(); } } catch (MutationsRejectedException e) { throw new DBException(e); } CleanUp.shutdownNow(); } /** * Commonly repeated functionality: Before doing any operation, make sure * we're working on the correct table. If not, open the correct one. * * @param t * The table to open. */ public void checkTable(String t) throws TableNotFoundException { if (!table.equals(t)) { getTable(t); } } /** * Called when the user specifies a table that isn't the same as the existing * table. Connect to it and if necessary, close our current connection. * * @param t * The table to open. */ public void getTable(String t) throws TableNotFoundException { if (bw != null) { // Close the existing writer if necessary. try { bw.close(); } catch (MutationsRejectedException e) { // Couldn't spit out the mutations we wanted. // Ignore this for now. System.err.println("MutationsRejectedException: " + e.getMessage()); } } BatchWriterConfig bwc = new BatchWriterConfig(); bwc.setMaxLatency( Long.parseLong(getProperties() .getProperty("accumulo.batchWriterMaxLatency", "30000")), TimeUnit.MILLISECONDS); bwc.setMaxMemory(Long.parseLong( getProperties().getProperty("accumulo.batchWriterSize", "100000"))); bwc.setMaxWriteThreads(Integer.parseInt( getProperties().getProperty("accumulo.batchWriterThreads", "1"))); bw = connector.createBatchWriter(t, bwc); // Create our scanners singleScanner = connector.createScanner(t, Authorizations.EMPTY); scanScanner = connector.createScanner(t, Authorizations.EMPTY); table = t; // Store the name of the table we have open. } /** * Gets a scanner from Accumulo over one row. - * - * @param row - * the row to scan - * @param fields - * the set of columns to scan + * + * @param row the row to scan + * @param fields the set of columns to scan * @return an Accumulo {@link Scanner} bound to the given row and columns */ private Scanner getRow(Text row, Set fields) { singleScanner.clearColumns(); singleScanner.setRange(new Range(row)); if (fields != null) { for (String field : fields) { singleScanner.fetchColumn(colFam, new Text(field)); } } return singleScanner; } @Override public Status read(String t, String key, Set fields, HashMap result) { try { checkTable(t); } catch (TableNotFoundException e) { System.err.println("Error trying to connect to Accumulo table." + e); return Status.ERROR; } try { // Pick out the results we care about. for (Entry entry : getRow(new Text(key), null)) { Value v = entry.getValue(); byte[] buf = v.get(); result.put(entry.getKey().getColumnQualifier().toString(), new ByteArrayByteIterator(buf)); } } catch (Exception e) { System.err.println("Error trying to reading Accumulo table" + key + e); return Status.ERROR; } return Status.OK; } @Override public Status scan(String t, String startkey, int recordcount, Set fields, Vector> result) { try { checkTable(t); } catch (TableNotFoundException e) { System.err.println("Error trying to connect to Accumulo table." + e); return Status.ERROR; } // There doesn't appear to be a way to create a range for a given // LENGTH. Just start and end keys. So we'll do this the hard way for // now: // Just make the end 'infinity' and only read as much as we need. scanScanner.clearColumns(); scanScanner.setRange(new Range(new Text(startkey), null)); // Batch size is how many key/values to try to get per call. Here, I'm // guessing that the number of keys in a row is equal to the number of - // fields - // we're interested in. + // fields we're interested in. + // We try to fetch one more so as to tell when we've run out of fields. + // If no fields are provided, we assume one column/row. if (fields != null) { // And add each of them as fields we want. for (String field : fields) { scanScanner.fetchColumn(colFam, new Text(field)); } - } // else - If no fields are provided, we assume one column/row. + } String rowKey = ""; HashMap currentHM = null; int count = 0; // Begin the iteration. for (Entry entry : scanScanner) { // Check for a new row. if (!rowKey.equals(entry.getKey().getRow().toString())) { if (count++ == recordcount) { // Done reading the last row. break; } rowKey = entry.getKey().getRow().toString(); if (fields != null) { // Initial Capacity for all keys. currentHM = new HashMap(fields.size()); } else { // An empty result map. currentHM = new HashMap(); } result.add(currentHM); } // Now add the key to the hashmap. Value v = entry.getValue(); byte[] buf = v.get(); currentHM.put(entry.getKey().getColumnQualifier().toString(), new ByteArrayByteIterator(buf)); } return Status.OK; } @Override public Status update(String t, String key, HashMap values) { try { checkTable(t); } catch (TableNotFoundException e) { System.err.println("Error trying to connect to Accumulo table." + e); return Status.ERROR; } Mutation mutInsert = new Mutation(new Text(key)); for (Map.Entry entry : values.entrySet()) { mutInsert.put(colFam, new Text(entry.getKey()), System.currentTimeMillis(), new Value(entry.getValue().toArray())); } try { bw.addMutation(mutInsert); - // Distributed YCSB co-ordination: YCSB on a client produces the key - // to - // be stored in the shared queue in ZooKeeper. - if (pcFlag.equals(PC_PRODUCER)) { - if (r.nextFloat() < 0.01) { - keyNotification(key); - } - } } catch (MutationsRejectedException e) { System.err.println("Error performing update."); e.printStackTrace(); return Status.ERROR; - } catch (KeeperException e) { - System.err.println("Error notifying the Zookeeper Queue."); - e.printStackTrace(); - return Status.ERROR; } return Status.OK; } @Override public Status insert(String t, String key, HashMap values) { return update(t, key, values); } @Override public Status delete(String t, String key) { try { checkTable(t); } catch (TableNotFoundException e) { System.err.println("Error trying to connect to Accumulo table." + e); return Status.ERROR; } try { deleteRow(new Text(key)); } catch (MutationsRejectedException e) { System.err.println("Error performing delete."); e.printStackTrace(); return Status.ERROR; } catch (RuntimeException e) { System.err.println("Error performing delete."); e.printStackTrace(); return Status.ERROR; } return Status.OK; } // These functions are adapted from RowOperations.java: private void deleteRow(Text row) throws MutationsRejectedException { deleteRow(getRow(row, null)); } /** * Deletes a row, given a Scanner of JUST that row. */ private void deleteRow(Scanner scanner) throws MutationsRejectedException { Mutation deleter = null; // iterate through the keys for (Entry entry : scanner) { // create a mutation for the row if (deleter == null) { deleter = new Mutation(entry.getKey().getRow()); } // the remove function adds the key with the delete flag set to true deleter.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier()); } bw.addMutation(deleter); } - - private void keyNotification(String key) throws KeeperException { - - if (pcFlag.equals(PC_PRODUCER)) { - try { - q.produce(key); - } catch (InterruptedException e) { - // Reset the interrupted state. - Thread.currentThread().interrupt(); - } - } else { - // XXX: do something better to keep the loop going (while??) - for (int i = 0; i < 10000000; i++) { - try { - String strKey = q.consume(); - - if (!hmKeyReads.containsKey(strKey) - && !hmKeyNumReads.containsKey(strKey)) { - hmKeyReads.put(strKey, new Long(System.currentTimeMillis())); - hmKeyNumReads.put(strKey, new Integer(1)); - } - - // YCSB Consumer will read the key that was fetched from the - // queue in ZooKeeper. - // (current way is kind of ugly but works, i think) - // TODO : Get table name from configuration or argument - String usertable = "usertable"; - HashSet fields = new HashSet(); - for (int j = 0; j < 9; j++) { - fields.add("field" + j); - } - HashMap result = - new HashMap(); - - read(usertable, strKey, fields, result); - // If the results are empty, the key is enqueued in - // Zookeeper - // and tried again, until the results are found. - if (result.isEmpty()) { - q.produce(strKey); - int count = ((Integer) hmKeyNumReads.get(strKey)).intValue(); - hmKeyNumReads.put(strKey, new Integer(count + 1)); - } else { - if (((Integer) hmKeyNumReads.get(strKey)).intValue() > 1) { - long currTime = System.currentTimeMillis(); - long writeTime = ((Long) hmKeyReads.get(strKey)).longValue(); - System.out.println( - "Key=" + strKey + ";StartSearch=" + writeTime + ";EndSearch=" - + currTime + ";TimeLag=" + (currTime - writeTime)); - } - } - } catch (InterruptedException e) { - // Reset the interrupted state. - Thread.currentThread().interrupt(); - } - } - } - - } - - public Status presplit(String t, String[] keys) - throws TableNotFoundException, AccumuloException, - AccumuloSecurityException { - TreeSet splits = new TreeSet(); - for (int i = 0; i < keys.length; i++) { - splits.add(new Text(keys[i])); - } - connector.tableOperations().addSplits(t, splits); - return Status.OK; - } - } diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/ZKProducerConsumer.java b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/ZKProducerConsumer.java deleted file mode 100644 index 1b478f72..00000000 --- a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/ZKProducerConsumer.java +++ /dev/null @@ -1,193 +0,0 @@ -/** - * Copyright (c) 2011 YCSB++ project, 2014 YCSB contributors. - * All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. You - * may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. See accompanying - * LICENSE file. - */ - -package com.yahoo.ycsb.db; - -import java.io.IOException; -import java.util.List; - -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.data.Stat; - -/** - * Implementing the PC (Producer/Consumer) Queue in ZooKeeper. - */ -public class ZKProducerConsumer implements Watcher { - - private static ZooKeeper zk = null; - private static Integer mutex; - - private String root; - - /** - * Constructor that takes the address of the ZK server. - * - * @param address - * The address of the ZK server. - */ - ZKProducerConsumer(String address) { - if (zk == null) { - try { - System.out.println("Starting ZK:"); - zk = new ZooKeeper(address, 3000, this); - mutex = new Integer(-1); - System.out.println("Finished starting ZK: " + zk); - } catch (IOException e) { - System.out.println(e.toString()); - zk = null; - } - } - // else mutex = new Integer(-1); - } - - public synchronized void process(WatchedEvent event) { - synchronized (mutex) { - // System.out.println("Process: " + event.getType()); - mutex.notify(); - } - } - - /** - * Returns the root. - * - * @return The root. - */ - protected String getRoot() { - return root; - } - - /** - * Sets the root. - * - * @param r - * The root value. - */ - protected void setRoot(String r) { - this.root = r; - } - - /** - * QueueElement a single queue element. No longer used. - * @deprecated No longer used. - */ - @Deprecated - public static class QueueElement { - private String key; - private long writeTime; - - QueueElement(String key, long writeTime) { - this.key = key; - this.writeTime = writeTime; - } - } - - /** - * Producer-Consumer queue. - */ - public static class Queue extends ZKProducerConsumer { - - /** - * Constructor of producer-consumer queue. - * - * @param address - * The Zookeeper server address. - * @param name - * The name of the root element for the queue. - */ - Queue(String address, String name) { - super(address); - this.setRoot(name); - // Create ZK node name - if (zk != null) { - try { - Stat s = zk.exists(getRoot(), false); - if (s == null) { - zk.create(getRoot(), new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } - } catch (KeeperException e) { - System.out.println( - "Keeper exception when instantiating queue: " + e.toString()); - } catch (InterruptedException e) { - System.out.println("Interrupted exception"); - } - } - } - - /** - * Producer calls this method to insert the key in the queue. - * - * @param key - * The key to produce (add to the queue). - * @return True if the key was added. - * @throws KeeperException - * On a failure talking to zookeeper. - * @throws InterruptedException - * If the current thread is interrupted waiting for the zookeeper - * acknowledgement. - */ - // - boolean produce(String key) throws KeeperException, InterruptedException { - byte[] value; - value = key.getBytes(); - zk.create(getRoot() + "/key", value, Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT_SEQUENTIAL); - - return true; - } - - /** - * Consumer calls this method to "wait" for the key to the available. - * - * @return The key to consumed (remove from the queue). - * @throws KeeperException - * On a failure talking to zookeeper. - * @throws InterruptedException - * If the current thread is interrupted waiting for the zookeeper - * acknowledgement. - */ - String consume() throws KeeperException, InterruptedException { - String retvalue = null; - Stat stat = null; - - // Get the first element available - while (true) { - synchronized (mutex) { - List list = zk.getChildren(getRoot(), true); - if (list.isEmpty()) { - System.out.println("Going to wait"); - mutex.wait(); - } else { - String path = getRoot() + "/" + list.get(0); - byte[] b = zk.getData(path, false, stat); - retvalue = new String(b); - zk.delete(path, -1); - - return retvalue; - - } - } - } - } - } -} diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java index 7c85b6ac..e38d200c 100644 --- a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java +++ b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java @@ -1,22 +1,22 @@ -/* - * Copyright (c) 2015, YCSB contributors. All rights reserved. +/** + * Copyright (c) 2015 YCSB contributors. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ /** - * The YCSB binding for Apache Accumulo. + * YCSB binding for Apache Accumulo. */ package com.yahoo.ycsb.db.accumulo; diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/package-info.java b/accumulo/src/main/java/com/yahoo/ycsb/db/package-info.java deleted file mode 100644 index fbd7cf85..00000000 --- a/accumulo/src/main/java/com/yahoo/ycsb/db/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Copyright (c) 2015 YCSB contributors. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. You - * may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. See accompanying - * LICENSE file. - */ - -/** - * YCSB binding for Accumulo. - */ -package com.yahoo.ycsb.db; \ No newline at end of file