diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java index 96b869e2..25c75499 100644 --- a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java +++ b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java @@ -1,348 +1,349 @@ /** * Copyright (c) 2011 YCSB++ project, 2014-2016 YCSB contributors. * All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb.db.accumulo; import com.yahoo.ycsb.ByteArrayByteIterator; import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.DB; import com.yahoo.ycsb.DBException; import com.yahoo.ycsb.Status; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ClientConfiguration; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.CleanUp; import org.apache.hadoop.io.Text; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.Vector; import java.util.concurrent.TimeUnit; /** * Accumulo binding for YCSB. */ public class AccumuloClient extends DB { private ZooKeeperInstance inst; private Connector connector; private String table = ""; private BatchWriter bw = null; private Text colFam = new Text(""); private Scanner singleScanner = null; // A scanner for reads/deletes. private Scanner scanScanner = null; // A scanner for use by scan() static { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { CleanUp.shutdownNow(); } }); } @Override public void init() throws DBException { colFam = new Text(getProperties().getProperty("accumulo.columnFamily")); - inst = new ZooKeeperInstance( - getProperties().getProperty("accumulo.instanceName"), - getProperties().getProperty("accumulo.zooKeepers")); + inst = new ZooKeeperInstance(new ClientConfiguration() + .withInstance(getProperties().getProperty("accumulo.instanceName")) + .withZkHosts(getProperties().getProperty("accumulo.zooKeepers"))); try { String principal = getProperties().getProperty("accumulo.username"); AuthenticationToken token = new PasswordToken(getProperties().getProperty("accumulo.password")); connector = inst.getConnector(principal, token); } catch (AccumuloException e) { throw new DBException(e); } catch (AccumuloSecurityException e) { throw new DBException(e); } if (!(getProperties().getProperty("accumulo.pcFlag", "none").equals("none"))) { System.err.println("Sorry, the ZK based producer/consumer implementation has been removed. " + "Please see YCSB issue #416 for work on adding a general solution to coordinated work."); } } @Override public void cleanup() throws DBException { try { if (bw != null) { bw.close(); } } catch (MutationsRejectedException e) { throw new DBException(e); } } /** * Commonly repeated functionality: Before doing any operation, make sure * we're working on the correct table. If not, open the correct one. * * @param t * The table to open. */ public void checkTable(String t) throws TableNotFoundException { if (!table.equals(t)) { getTable(t); } } /** * Called when the user specifies a table that isn't the same as the existing * table. Connect to it and if necessary, close our current connection. * * @param t * The table to open. */ public void getTable(String t) throws TableNotFoundException { if (bw != null) { // Close the existing writer if necessary. try { bw.close(); } catch (MutationsRejectedException e) { // Couldn't spit out the mutations we wanted. // Ignore this for now. System.err.println("MutationsRejectedException: " + e.getMessage()); } } BatchWriterConfig bwc = new BatchWriterConfig(); bwc.setMaxLatency( Long.parseLong(getProperties() .getProperty("accumulo.batchWriterMaxLatency", "30000")), TimeUnit.MILLISECONDS); bwc.setMaxMemory(Long.parseLong( getProperties().getProperty("accumulo.batchWriterSize", "100000"))); bwc.setMaxWriteThreads(Integer.parseInt( getProperties().getProperty("accumulo.batchWriterThreads", "1"))); bw = connector.createBatchWriter(t, bwc); // Create our scanners singleScanner = connector.createScanner(t, Authorizations.EMPTY); scanScanner = connector.createScanner(t, Authorizations.EMPTY); table = t; // Store the name of the table we have open. } /** * Gets a scanner from Accumulo over one row. * * @param row the row to scan * @param fields the set of columns to scan * @return an Accumulo {@link Scanner} bound to the given row and columns */ private Scanner getRow(Text row, Set fields) { singleScanner.clearColumns(); singleScanner.setRange(new Range(row)); if (fields != null) { for (String field : fields) { singleScanner.fetchColumn(colFam, new Text(field)); } } return singleScanner; } @Override public Status read(String t, String key, Set fields, HashMap result) { try { checkTable(t); } catch (TableNotFoundException e) { System.err.println("Error trying to connect to Accumulo table." + e); return Status.ERROR; } try { // Pick out the results we care about. for (Entry entry : getRow(new Text(key), null)) { Value v = entry.getValue(); byte[] buf = v.get(); result.put(entry.getKey().getColumnQualifier().toString(), new ByteArrayByteIterator(buf)); } } catch (Exception e) { System.err.println("Error trying to reading Accumulo table" + key + e); return Status.ERROR; } return Status.OK; } @Override public Status scan(String t, String startkey, int recordcount, Set fields, Vector> result) { try { checkTable(t); } catch (TableNotFoundException e) { System.err.println("Error trying to connect to Accumulo table." + e); return Status.ERROR; } // There doesn't appear to be a way to create a range for a given // LENGTH. Just start and end keys. So we'll do this the hard way for // now: // Just make the end 'infinity' and only read as much as we need. scanScanner.clearColumns(); scanScanner.setRange(new Range(new Text(startkey), null)); // Batch size is how many key/values to try to get per call. Here, I'm // guessing that the number of keys in a row is equal to the number of // fields we're interested in. // We try to fetch one more so as to tell when we've run out of fields. // If no fields are provided, we assume one column/row. if (fields != null) { // And add each of them as fields we want. for (String field : fields) { scanScanner.fetchColumn(colFam, new Text(field)); } } String rowKey = ""; HashMap currentHM = null; int count = 0; // Begin the iteration. for (Entry entry : scanScanner) { // Check for a new row. if (!rowKey.equals(entry.getKey().getRow().toString())) { if (count++ == recordcount) { // Done reading the last row. break; } rowKey = entry.getKey().getRow().toString(); if (fields != null) { // Initial Capacity for all keys. currentHM = new HashMap(fields.size()); } else { // An empty result map. currentHM = new HashMap(); } result.add(currentHM); } // Now add the key to the hashmap. Value v = entry.getValue(); byte[] buf = v.get(); currentHM.put(entry.getKey().getColumnQualifier().toString(), new ByteArrayByteIterator(buf)); } return Status.OK; } @Override public Status update(String t, String key, HashMap values) { try { checkTable(t); } catch (TableNotFoundException e) { System.err.println("Error trying to connect to Accumulo table." + e); return Status.ERROR; } Mutation mutInsert = new Mutation(new Text(key)); for (Map.Entry entry : values.entrySet()) { mutInsert.put(colFam, new Text(entry.getKey()), System.currentTimeMillis(), new Value(entry.getValue().toArray())); } try { bw.addMutation(mutInsert); } catch (MutationsRejectedException e) { System.err.println("Error performing update."); e.printStackTrace(); return Status.ERROR; } return Status.OK; } @Override public Status insert(String t, String key, HashMap values) { return update(t, key, values); } @Override public Status delete(String t, String key) { try { checkTable(t); } catch (TableNotFoundException e) { System.err.println("Error trying to connect to Accumulo table." + e); return Status.ERROR; } try { deleteRow(new Text(key)); } catch (MutationsRejectedException e) { System.err.println("Error performing delete."); e.printStackTrace(); return Status.ERROR; } catch (RuntimeException e) { System.err.println("Error performing delete."); e.printStackTrace(); return Status.ERROR; } return Status.OK; } // These functions are adapted from RowOperations.java: private void deleteRow(Text row) throws MutationsRejectedException { deleteRow(getRow(row, null)); } /** * Deletes a row, given a Scanner of JUST that row. */ private void deleteRow(Scanner scanner) throws MutationsRejectedException { Mutation deleter = null; // iterate through the keys for (Entry entry : scanner) { // create a mutation for the row if (deleter == null) { deleter = new Mutation(entry.getKey().getRow()); } // the remove function adds the key with the delete flag set to true deleter.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier()); } bw.addMutation(deleter); } } diff --git a/accumulo/src/test/resources/log4j.properties b/accumulo/src/test/resources/log4j.properties index e03d54a3..2d48dce5 100644 --- a/accumulo/src/test/resources/log4j.properties +++ b/accumulo/src/test/resources/log4j.properties @@ -1,29 +1,29 @@ # # Copyright (c) 2015 YCSB contributors. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you # may not use this file except in compliance with the License. You # may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. See the License for the specific language governing # permissions and limitations under the License. See accompanying # LICENSE file. # # Root logger option log4j.rootLogger=INFO, stderr log4j.appender.stderr=org.apache.log4j.ConsoleAppender log4j.appender.stderr.target=System.err log4j.appender.stderr.layout=org.apache.log4j.PatternLayout log4j.appender.stderr.layout.conversionPattern=%d{yyyy/MM/dd HH:mm:ss} %-5p %c %x - %m%n # Suppress messages from ZooKeeper -log4j.logger.com.yahoo.ycsb.db.accumulo=INFO +log4j.logger.com.yahoo.ycsb.db.accumulo=DEBUG log4j.logger.org.apache.zookeeper=ERROR log4j.logger.org.apache.accumulo=WARN diff --git a/pom.xml b/pom.xml index 2966efdc..b94e0c18 100644 --- a/pom.xml +++ b/pom.xml @@ -1,179 +1,179 @@ 4.0.0 com.yahoo.ycsb root 0.11.0-SNAPSHOT pom YCSB Root This is the top level project that builds, packages the core and all the DB bindings for YCSB infrastructure. scm:git:git://github.com/brianfrankcooper/YCSB.git master https://github.com/brianfrankcooper/YCSB checkstyle checkstyle 5.0 org.jdom jdom 1.1 com.google.collections google-collections 1.0 org.slf4j slf4j-api 1.6.4 2.5.5 2.10 1.7.1 0.94.27 0.98.14-hadoop2 1.0.2 - 1.6.0 + 1.7.2 1.2.9 1.0.3 3.0.0 1.0.0-incubating.M2 0.2.3 7.2.2.Final 0.9.0 2.1.1 3.0.3 2.0.1 2.1.8 2.0.0 1.10.20 0.81 UTF-8 0.8.0 0.9.5.6 1.4.10 2.3.1 1.6.5 2.0.5 3.1.2 5.4.0 core binding-parent accumulo aerospike asynchbase cassandra cassandra2 couchbase couchbase2 distribution dynamodb elasticsearch geode googlebigtable googledatastore hbase094 hbase098 hbase10 hypertable infinispan jdbc kudu memcached mongodb nosqldb orientdb rados redis riak s3 solr tarantool org.apache.maven.plugins maven-checkstyle-plugin 2.15 org.apache.maven.plugins maven-compiler-plugin 3.3 1.7 1.7 org.apache.maven.plugins maven-checkstyle-plugin validate validate check checkstyle.xml