Page MenuHomec4science

DBWrapper.java
No OneTemporary

File Metadata

Created
Sat, Apr 27, 12:18

DBWrapper.java

/**
* Copyright (c) 2010 Yahoo! Inc., 2016-2017 YCSB contributors. All rights reserved.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb;
import com.yahoo.ycsb.measurements.Measurements;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
/**
* Wrapper around a "real" DB that measures latencies and counts return codes.
* Also reports latency separately between OK and failed operations.
* Waits for async calls to finish during cleanup() operation.
*/
public class DBWrapper extends AsyncDB {
private final AsyncDB db;
private final Measurements measurements;
private final Tracer tracer;
private boolean reportLatencyForEachError = false;
private Set<String> latencyTrackedErrors = new HashSet<String>();
private final Set<CompletableFuture<Status>> queries = new HashSet<>();
private static final String REPORT_LATENCY_FOR_EACH_ERROR_PROPERTY = "reportlatencyforeacherror";
private static final String REPORT_LATENCY_FOR_EACH_ERROR_PROPERTY_DEFAULT = "false";
private static final String LATENCY_TRACKED_ERRORS_PROPERTY = "latencytrackederrors";
private final String scopeStringCleanup;
private final String scopeStringDelete;
private final String scopeStringInit;
private final String scopeStringInsert;
private final String scopeStringRead;
private final String scopeStringScan;
private final String scopeStringUpdate;
public DBWrapper(final AsyncDB db, final Tracer tracer) {
this.db = db;
measurements = Measurements.getMeasurements();
this.tracer = tracer;
final String simple = db.getClass().getSimpleName();
scopeStringCleanup = simple + "#cleanup";
scopeStringDelete = simple + "#delete";
scopeStringInit = simple + "#init";
scopeStringInsert = simple + "#insert";
scopeStringRead = simple + "#read";
scopeStringScan = simple + "#scan";
scopeStringUpdate = simple + "#update";
}
/**
* Set the properties for this DB.
*/
public void setProperties(Properties p) {
db.setProperties(p);
}
/**
* Get the set of properties for this DB.
*/
public Properties getProperties() {
return db.getProperties();
}
/**
* Initialize any state for this DB.
* Called once per DB instance; there is one DB instance per client thread.
*/
public void init() throws DBException {
try (final TraceScope span = tracer.newScope(scopeStringInit)) {
db.init();
this.reportLatencyForEachError = Boolean.parseBoolean(getProperties().
getProperty(REPORT_LATENCY_FOR_EACH_ERROR_PROPERTY,
REPORT_LATENCY_FOR_EACH_ERROR_PROPERTY_DEFAULT));
if (!reportLatencyForEachError) {
String latencyTrackedErrorsProperty = getProperties().getProperty(LATENCY_TRACKED_ERRORS_PROPERTY, null);
if (latencyTrackedErrorsProperty != null) {
this.latencyTrackedErrors = new HashSet<String>(Arrays.asList(
latencyTrackedErrorsProperty.split(",")));
}
}
System.err.println("DBWrapper: report latency for each error is " +
this.reportLatencyForEachError + " and specific error codes to track" +
" for latency are: " + this.latencyTrackedErrors.toString());
}
}
/**
* Cleanup any state for this DB.
* Called once per DB instance; there is one DB instance per client thread.
*/
public void cleanup() throws DBException {
try (final TraceScope span = tracer.newScope(scopeStringCleanup)) {
// Wait for asynchronous operations to finish.
System.err.println("DBWrapper: Waiting for all queries to finish.");
CompletableFuture.allOf(queries.toArray(new CompletableFuture[0])).join();
long ist = measurements.getIntendedStartTimeNs();
long st = System.nanoTime();
db.cleanup();
long en = System.nanoTime();
measure("CLEANUP", Status.OK, ist, st, en);
}
}
/**
* Read a record from the database. Each field/value pair from the result
* will be stored in a HashMap.
*
* @param table The name of the table
* @param key The record key of the record to read.
* @param fields The list of fields to read, or null for all of them
* @param result A HashMap of field/value pairs for the result
* @return The result of the operation.
*/
public CompletableFuture<Status> read(String table, String key, Set<String> fields,
Map<String, ByteIterator> result) {
try (final TraceScope span = tracer.newScope(scopeStringRead)) {
long ist = measurements.getIntendedStartTimeNs();
long st = System.nanoTime();
CompletableFuture<Status> readFuture = db.read(table, key, fields, result).whenComplete((res, ex) -> {
if (ex != null) {
System.err.println("READ failed due to exception: " + ex);
res = Status.ERROR;
}
long en = System.nanoTime();
measure("READ", res, ist, st, en);
measurements.reportStatus("READ", res);
});
queries.add(readFuture);
return readFuture;
}
}
/**
* Perform a range scan for a set of records in the database.
* Each field/value pair from the result will be stored in a HashMap.
*
* @param table The name of the table
* @param startkey The record key of the first record to read.
* @param recordcount The number of records to read
* @param fields The list of fields to read, or null for all of them
* @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record
* @return The result of the operation.
*/
public CompletableFuture<Status> scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
try (final TraceScope span = tracer.newScope(scopeStringScan)) {
long ist = measurements.getIntendedStartTimeNs();
long st = System.nanoTime();
CompletableFuture<Status> scanFuture = db.scan(table, startkey, recordcount, fields, result)
.whenComplete((res, ex) -> {
if (ex != null) {
System.err.println("SCAN failed due to exception: " + ex);
res = Status.ERROR;
}
long en = System.nanoTime();
measure("SCAN", res, ist, st, en);
measurements.reportStatus("SCAN", res);
});
queries.add(scanFuture);
return scanFuture;
}
}
private void measure(String op, Status result, long intendedStartTimeNanos,
long startTimeNanos, long endTimeNanos) {
String measurementName = op;
if (result == null || !result.isOk()) {
if (this.reportLatencyForEachError ||
this.latencyTrackedErrors.contains(result.getName())) {
measurementName = op + "-" + result.getName();
} else {
measurementName = op + "-FAILED";
}
}
measurements.measure(measurementName,
(int) ((endTimeNanos - startTimeNanos) / 1000));
measurements.measureIntended(measurementName,
(int) ((endTimeNanos - intendedStartTimeNanos) / 1000));
}
/**
* Update a record in the database. Any field/value pairs in the specified values HashMap will be written into the
* record with the specified record key, overwriting any existing values with the same field name.
*
* @param table The name of the table
* @param key The record key of the record to write.
* @param values A HashMap of field/value pairs to update in the record
* @return The result of the operation.
*/
public CompletableFuture<Status> update(String table, String key,
Map<String, ByteIterator> values) {
try (final TraceScope span = tracer.newScope(scopeStringUpdate)) {
long ist = measurements.getIntendedStartTimeNs();
long st = System.nanoTime();
CompletableFuture<Status> updateFuture = db.update(table, key, values).whenComplete((res, ex) -> {
if (ex != null) {
System.err.println("UPDATE failed due to exception: " + ex);
res = Status.ERROR;
}
long en = System.nanoTime();
measure("UPDATE", res, ist, st, en);
measurements.reportStatus("UPDATE", res);
});
queries.add(updateFuture);
return updateFuture;
}
}
/**
* Insert a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified
* record key.
*
* @param table The name of the table
* @param key The record key of the record to insert.
* @param values A HashMap of field/value pairs to insert in the record
* @return The result of the operation.
*/
public CompletableFuture<Status> insert(String table, String key,
Map<String, ByteIterator> values) {
try (final TraceScope span = tracer.newScope(scopeStringInsert)) {
long ist = measurements.getIntendedStartTimeNs();
long st = System.nanoTime();
CompletableFuture<Status> insertFuture = db.insert(table, key, values).whenComplete((res, ex) -> {
if (ex != null) {
System.err.println("INSERT failed due to exception: " + ex);
res = Status.ERROR;
}
long en = System.nanoTime();
measure("INSERT", res, ist, st, en);
measurements.reportStatus("INSERT", res);
});
queries.add(insertFuture);
return insertFuture;
}
}
/**
* Delete a record from the database.
*
* @param table The name of the table
* @param key The record key of the record to delete.
* @return The result of the operation.
*/
public CompletableFuture<Status> delete(String table, String key) {
try (final TraceScope span = tracer.newScope(scopeStringDelete)) {
long ist = measurements.getIntendedStartTimeNs();
long st = System.nanoTime();
CompletableFuture<Status> deleteFuture = db.delete(table, key).whenComplete((res, ex) -> {
if (ex != null) {
System.err.println("DELETE failed due to exception: " + ex);
res = Status.ERROR;
}
long en = System.nanoTime();
measure("DELETE", res, ist, st, en);
measurements.reportStatus("DELETE", res);
});
queries.add(deleteFuture);
return deleteFuture;
}
}
}

Event Timeline