Page MenuHomec4science

HBaseClient10.java
No OneTemporary

File Metadata

Created
Sat, Oct 19, 12:10

HBaseClient10.java

/**
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.measurements.Measurements;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
/**
* HBase 1.0 client for YCSB framework.
*
* A modified version of HBaseClient (which targets HBase v0.9) utilizing the
* HBase 1.0.0 API.
*
* This client also adds toggleable client-side buffering and configurable write
* durability.
*/
public class HBaseClient10 extends com.yahoo.ycsb.DB {
private Configuration config = HBaseConfiguration.create();
private boolean debug = false;
private String tableName = "";
private Connection connection = null;
// Depending on the value of clientSideBuffering, either bufferedMutator
// (clientSideBuffering) or currentTable (!clientSideBuffering) will be used.
private Table currentTable = null;
private BufferedMutator bufferedMutator = null;
private String columnFamily = "";
private byte[] columnFamilyBytes;
/**
* Durability to use for puts and deletes.
*/
private Durability durability = Durability.USE_DEFAULT;
/** Whether or not a page filter should be used to limit scan length. */
private boolean usePageFilter = true;
/**
* If true, buffer mutations on the client. This is the default behavior for
* HBaseClient. For measuring insert/update/delete latencies, client side
* buffering should be disabled.
*/
private boolean clientSideBuffering = false;
private long writeBufferSize = 1024 * 1024 * 12;
/**
* Initialize any state for this DB. Called once per DB instance; there is one
* DB instance per client thread.
*/
@Override
public void init() throws DBException {
if ("true"
.equals(getProperties().getProperty("clientbuffering", "false"))) {
this.clientSideBuffering = true;
}
if (getProperties().containsKey("writebuffersize")) {
writeBufferSize =
Long.parseLong(getProperties().getProperty("writebuffersize"));
}
if (getProperties().getProperty("durability") != null) {
this.durability =
Durability.valueOf(getProperties().getProperty("durability"));
}
try {
connection = ConnectionFactory.createConnection(config);
} catch (java.io.IOException e) {
throw new DBException(e);
}
if ((getProperties().getProperty("debug") != null)
&& (getProperties().getProperty("debug").compareTo("true") == 0)) {
debug = true;
}
if ("false"
.equals(getProperties().getProperty("hbase.usepagefilter", "true"))) {
usePageFilter = false;
}
columnFamily = getProperties().getProperty("columnfamily");
if (columnFamily == null) {
System.err.println("Error, must specify a columnfamily for HBase table");
throw new DBException("No columnfamily specified");
}
columnFamilyBytes = Bytes.toBytes(columnFamily);
// Terminate right now if table does not exist, since the client
// will not propagate this error upstream once the workload
// starts.
String table = com.yahoo.ycsb.workloads.CoreWorkload.table;
try {
final TableName tName = TableName.valueOf(table);
HTableDescriptor dsc =
connection.getTable(tName).getTableDescriptor();
} catch (IOException e) {
throw new DBException(e);
}
}
/**
* Cleanup any state for this DB. Called once per DB instance; there is one DB
* instance per client thread.
*/
@Override
public void cleanup() throws DBException {
// Get the measurements instance as this is the only client that should
// count clean up time like an update if client-side buffering is
// enabled.
Measurements measurements = Measurements.getMeasurements();
try {
long st = System.nanoTime();
if (bufferedMutator != null) {
bufferedMutator.close();
}
if (currentTable != null) {
currentTable.close();
}
long en = System.nanoTime();
final String type = clientSideBuffering ? "UPDATE" : "CLEANUP";
measurements.measure(type, (int) ((en - st) / 1000));
connection.close();
} catch (IOException e) {
throw new DBException(e);
}
}
public void getHTable(String table) throws IOException {
final TableName tName = TableName.valueOf(table);
this.currentTable = this.connection.getTable(tName);
// suggestions from
// http://ryantwopointoh.blogspot.com/2009/01/
// performance-of-hbase-importing.html
if (clientSideBuffering) {
final BufferedMutatorParams p = new BufferedMutatorParams(tName);
p.writeBufferSize(writeBufferSize);
this.bufferedMutator = this.connection.getBufferedMutator(p);
}
}
/**
* Read a record from the database. Each field/value pair from the result will
* be stored in a HashMap.
*
* @param table
* The name of the table
* @param key
* The record key of the record to read.
* @param fields
* The list of fields to read, or null for all of them
* @param result
* A HashMap of field/value pairs for the result
* @return Zero on success, a non-zero error code on error
*/
public Status read(String table, String key, Set<String> fields,
HashMap<String, ByteIterator> result) {
// if this is a "new" table, init HTable object. Else, use existing one
if (!tableName.equals(table)) {
currentTable = null;
try {
getHTable(table);
tableName = table;
} catch (IOException e) {
System.err.println("Error accessing HBase table: " + e);
return Status.ERROR;
}
}
Result r = null;
try {
if (debug) {
System.out
.println("Doing read from HBase columnfamily " + columnFamily);
System.out.println("Doing read for key: " + key);
}
Get g = new Get(Bytes.toBytes(key));
if (fields == null) {
g.addFamily(columnFamilyBytes);
} else {
for (String field : fields) {
g.addColumn(columnFamilyBytes, Bytes.toBytes(field));
}
}
r = currentTable.get(g);
} catch (IOException e) {
if (debug) {
System.err.println("Error doing get: " + e);
}
return Status.ERROR;
} catch (ConcurrentModificationException e) {
// do nothing for now...need to understand HBase concurrency model better
return Status.ERROR;
}
if (r.isEmpty()) {
return Status.NOT_FOUND;
}
while (r.advance()) {
final Cell c = r.current();
result.put(Bytes.toString(CellUtil.cloneQualifier(c)),
new ByteArrayByteIterator(CellUtil.cloneValue(c)));
if (debug) {
System.out.println(
"Result for field: " + Bytes.toString(CellUtil.cloneQualifier(c))
+ " is: " + Bytes.toString(CellUtil.cloneValue(c)));
}
}
return Status.OK;
}
/**
* Perform a range scan for a set of records in the database. Each field/value
* pair from the result will be stored in a HashMap.
*
* @param table
* The name of the table
* @param startkey
* The record key of the first record to read.
* @param recordcount
* The number of records to read
* @param fields
* The list of fields to read, or null for all of them
* @param result
* A Vector of HashMaps, where each HashMap is a set field/value
* pairs for one record
* @return Zero on success, a non-zero error code on error
*/
@Override
public Status scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
// if this is a "new" table, init HTable object. Else, use existing one
if (!tableName.equals(table)) {
currentTable = null;
try {
getHTable(table);
tableName = table;
} catch (IOException e) {
System.err.println("Error accessing HBase table: " + e);
return Status.ERROR;
}
}
Scan s = new Scan(Bytes.toBytes(startkey));
// HBase has no record limit. Here, assume recordcount is small enough to
// bring back in one call.
// We get back recordcount records
s.setCaching(recordcount);
if (this.usePageFilter) {
s.setFilter(new PageFilter(recordcount));
}
// add specified fields or else all fields
if (fields == null) {
s.addFamily(columnFamilyBytes);
} else {
for (String field : fields) {
s.addColumn(columnFamilyBytes, Bytes.toBytes(field));
}
}
// get results
ResultScanner scanner = null;
try {
scanner = currentTable.getScanner(s);
int numResults = 0;
for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
// get row key
String key = Bytes.toString(rr.getRow());
if (debug) {
System.out.println("Got scan result for key: " + key);
}
HashMap<String, ByteIterator> rowResult =
new HashMap<String, ByteIterator>();
while (rr.advance()) {
final Cell cell = rr.current();
rowResult.put(Bytes.toString(CellUtil.cloneQualifier(cell)),
new ByteArrayByteIterator(CellUtil.cloneValue(cell)));
}
// add rowResult to result vector
result.add(rowResult);
numResults++;
// PageFilter does not guarantee that the number of results is <=
// pageSize, so this
// break is required.
if (numResults >= recordcount) {// if hit recordcount, bail out
break;
}
} // done with row
} catch (IOException e) {
if (debug) {
System.out.println("Error in getting/parsing scan result: " + e);
}
return Status.ERROR;
} finally {
if (scanner != null) {
scanner.close();
}
}
return Status.OK;
}
/**
* Update a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key, overwriting any existing values with the same field name.
*
* @param table
* The name of the table
* @param key
* The record key of the record to write
* @param values
* A HashMap of field/value pairs to update in the record
* @return Zero on success, a non-zero error code on error
*/
@Override
public Status update(String table, String key,
HashMap<String, ByteIterator> values) {
// if this is a "new" table, init HTable object. Else, use existing one
if (!tableName.equals(table)) {
currentTable = null;
try {
getHTable(table);
tableName = table;
} catch (IOException e) {
System.err.println("Error accessing HBase table: " + e);
return Status.ERROR;
}
}
if (debug) {
System.out.println("Setting up put for key: " + key);
}
Put p = new Put(Bytes.toBytes(key));
p.setDurability(durability);
for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
byte[] value = entry.getValue().toArray();
if (debug) {
System.out.println("Adding field/value " + entry.getKey() + "/"
+ Bytes.toStringBinary(value) + " to put request");
}
p.addColumn(columnFamilyBytes, Bytes.toBytes(entry.getKey()), value);
}
try {
if (clientSideBuffering) {
Preconditions.checkNotNull(bufferedMutator);
bufferedMutator.mutate(p);
} else {
currentTable.put(p);
}
} catch (IOException e) {
if (debug) {
System.err.println("Error doing put: " + e);
}
return Status.ERROR;
} catch (ConcurrentModificationException e) {
// do nothing for now...hope this is rare
return Status.ERROR;
}
return Status.OK;
}
/**
* Insert a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key.
*
* @param table
* The name of the table
* @param key
* The record key of the record to insert.
* @param values
* A HashMap of field/value pairs to insert in the record
* @return Zero on success, a non-zero error code on error
*/
@Override
public Status insert(String table, String key,
HashMap<String, ByteIterator> values) {
return update(table, key, values);
}
/**
* Delete a record from the database.
*
* @param table
* The name of the table
* @param key
* The record key of the record to delete.
* @return Zero on success, a non-zero error code on error
*/
@Override
public Status delete(String table, String key) {
// if this is a "new" table, init HTable object. Else, use existing one
if (!tableName.equals(table)) {
currentTable = null;
try {
getHTable(table);
tableName = table;
} catch (IOException e) {
System.err.println("Error accessing HBase table: " + e);
return Status.ERROR;
}
}
if (debug) {
System.out.println("Doing delete for key: " + key);
}
final Delete d = new Delete(Bytes.toBytes(key));
d.setDurability(durability);
try {
if (clientSideBuffering) {
Preconditions.checkNotNull(bufferedMutator);
bufferedMutator.mutate(d);
} else {
currentTable.delete(d);
}
} catch (IOException e) {
if (debug) {
System.err.println("Error doing delete: " + e);
}
return Status.ERROR;
}
return Status.OK;
}
@VisibleForTesting
void setConfiguration(final Configuration newConfig) {
this.config = newConfig;
}
}
/*
* For customized vim control set autoindent set si set shiftwidth=4
*/

Event Timeline