Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F60100752
DBWrapper.java
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sat, Apr 27, 12:18
Size
10 KB
Mime Type
text/x-java
Expires
Mon, Apr 29, 12:18 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
17300784
Attached To
R7507 YCSB
DBWrapper.java
View Options
/**
* Copyright (c) 2010 Yahoo! Inc., 2016-2017 YCSB contributors. All rights reserved.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package
com.yahoo.ycsb
;
import
com.yahoo.ycsb.measurements.Measurements
;
import
org.apache.htrace.core.TraceScope
;
import
org.apache.htrace.core.Tracer
;
import
java.util.Arrays
;
import
java.util.HashMap
;
import
java.util.HashSet
;
import
java.util.Map
;
import
java.util.Properties
;
import
java.util.Set
;
import
java.util.Vector
;
import
java.util.concurrent.CompletableFuture
;
/**
* Wrapper around a "real" DB that measures latencies and counts return codes.
* Also reports latency separately between OK and failed operations.
* Waits for async calls to finish during cleanup() operation.
*/
public
class
DBWrapper
extends
AsyncDB
{
private
final
AsyncDB
db
;
private
final
Measurements
measurements
;
private
final
Tracer
tracer
;
private
boolean
reportLatencyForEachError
=
false
;
private
Set
<
String
>
latencyTrackedErrors
=
new
HashSet
<
String
>();
private
final
Set
<
CompletableFuture
<
Status
>>
queries
=
new
HashSet
<>();
private
static
final
String
REPORT_LATENCY_FOR_EACH_ERROR_PROPERTY
=
"reportlatencyforeacherror"
;
private
static
final
String
REPORT_LATENCY_FOR_EACH_ERROR_PROPERTY_DEFAULT
=
"false"
;
private
static
final
String
LATENCY_TRACKED_ERRORS_PROPERTY
=
"latencytrackederrors"
;
private
final
String
scopeStringCleanup
;
private
final
String
scopeStringDelete
;
private
final
String
scopeStringInit
;
private
final
String
scopeStringInsert
;
private
final
String
scopeStringRead
;
private
final
String
scopeStringScan
;
private
final
String
scopeStringUpdate
;
public
DBWrapper
(
final
AsyncDB
db
,
final
Tracer
tracer
)
{
this
.
db
=
db
;
measurements
=
Measurements
.
getMeasurements
();
this
.
tracer
=
tracer
;
final
String
simple
=
db
.
getClass
().
getSimpleName
();
scopeStringCleanup
=
simple
+
"#cleanup"
;
scopeStringDelete
=
simple
+
"#delete"
;
scopeStringInit
=
simple
+
"#init"
;
scopeStringInsert
=
simple
+
"#insert"
;
scopeStringRead
=
simple
+
"#read"
;
scopeStringScan
=
simple
+
"#scan"
;
scopeStringUpdate
=
simple
+
"#update"
;
}
/**
* Set the properties for this DB.
*/
public
void
setProperties
(
Properties
p
)
{
db
.
setProperties
(
p
);
}
/**
* Get the set of properties for this DB.
*/
public
Properties
getProperties
()
{
return
db
.
getProperties
();
}
/**
* Initialize any state for this DB.
* Called once per DB instance; there is one DB instance per client thread.
*/
public
void
init
()
throws
DBException
{
try
(
final
TraceScope
span
=
tracer
.
newScope
(
scopeStringInit
))
{
db
.
init
();
this
.
reportLatencyForEachError
=
Boolean
.
parseBoolean
(
getProperties
().
getProperty
(
REPORT_LATENCY_FOR_EACH_ERROR_PROPERTY
,
REPORT_LATENCY_FOR_EACH_ERROR_PROPERTY_DEFAULT
));
if
(!
reportLatencyForEachError
)
{
String
latencyTrackedErrorsProperty
=
getProperties
().
getProperty
(
LATENCY_TRACKED_ERRORS_PROPERTY
,
null
);
if
(
latencyTrackedErrorsProperty
!=
null
)
{
this
.
latencyTrackedErrors
=
new
HashSet
<
String
>(
Arrays
.
asList
(
latencyTrackedErrorsProperty
.
split
(
","
)));
}
}
System
.
err
.
println
(
"DBWrapper: report latency for each error is "
+
this
.
reportLatencyForEachError
+
" and specific error codes to track"
+
" for latency are: "
+
this
.
latencyTrackedErrors
.
toString
());
}
}
/**
* Cleanup any state for this DB.
* Called once per DB instance; there is one DB instance per client thread.
*/
public
void
cleanup
()
throws
DBException
{
try
(
final
TraceScope
span
=
tracer
.
newScope
(
scopeStringCleanup
))
{
// Wait for asynchronous operations to finish.
System
.
err
.
println
(
"DBWrapper: Waiting for all queries to finish."
);
CompletableFuture
.
allOf
(
queries
.
toArray
(
new
CompletableFuture
[
0
])).
join
();
long
ist
=
measurements
.
getIntendedStartTimeNs
();
long
st
=
System
.
nanoTime
();
db
.
cleanup
();
long
en
=
System
.
nanoTime
();
measure
(
"CLEANUP"
,
Status
.
OK
,
ist
,
st
,
en
);
}
}
/**
* Read a record from the database. Each field/value pair from the result
* will be stored in a HashMap.
*
* @param table The name of the table
* @param key The record key of the record to read.
* @param fields The list of fields to read, or null for all of them
* @param result A HashMap of field/value pairs for the result
* @return The result of the operation.
*/
public
CompletableFuture
<
Status
>
read
(
String
table
,
String
key
,
Set
<
String
>
fields
,
Map
<
String
,
ByteIterator
>
result
)
{
try
(
final
TraceScope
span
=
tracer
.
newScope
(
scopeStringRead
))
{
long
ist
=
measurements
.
getIntendedStartTimeNs
();
long
st
=
System
.
nanoTime
();
CompletableFuture
<
Status
>
readFuture
=
db
.
read
(
table
,
key
,
fields
,
result
).
whenComplete
((
res
,
ex
)
->
{
if
(
ex
!=
null
)
{
System
.
err
.
println
(
"READ failed due to exception: "
+
ex
);
res
=
Status
.
ERROR
;
}
long
en
=
System
.
nanoTime
();
measure
(
"READ"
,
res
,
ist
,
st
,
en
);
measurements
.
reportStatus
(
"READ"
,
res
);
});
queries
.
add
(
readFuture
);
return
readFuture
;
}
}
/**
* Perform a range scan for a set of records in the database.
* Each field/value pair from the result will be stored in a HashMap.
*
* @param table The name of the table
* @param startkey The record key of the first record to read.
* @param recordcount The number of records to read
* @param fields The list of fields to read, or null for all of them
* @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record
* @return The result of the operation.
*/
public
CompletableFuture
<
Status
>
scan
(
String
table
,
String
startkey
,
int
recordcount
,
Set
<
String
>
fields
,
Vector
<
HashMap
<
String
,
ByteIterator
>>
result
)
{
try
(
final
TraceScope
span
=
tracer
.
newScope
(
scopeStringScan
))
{
long
ist
=
measurements
.
getIntendedStartTimeNs
();
long
st
=
System
.
nanoTime
();
CompletableFuture
<
Status
>
scanFuture
=
db
.
scan
(
table
,
startkey
,
recordcount
,
fields
,
result
)
.
whenComplete
((
res
,
ex
)
->
{
if
(
ex
!=
null
)
{
System
.
err
.
println
(
"SCAN failed due to exception: "
+
ex
);
res
=
Status
.
ERROR
;
}
long
en
=
System
.
nanoTime
();
measure
(
"SCAN"
,
res
,
ist
,
st
,
en
);
measurements
.
reportStatus
(
"SCAN"
,
res
);
});
queries
.
add
(
scanFuture
);
return
scanFuture
;
}
}
private
void
measure
(
String
op
,
Status
result
,
long
intendedStartTimeNanos
,
long
startTimeNanos
,
long
endTimeNanos
)
{
String
measurementName
=
op
;
if
(
result
==
null
||
!
result
.
isOk
())
{
if
(
this
.
reportLatencyForEachError
||
this
.
latencyTrackedErrors
.
contains
(
result
.
getName
()))
{
measurementName
=
op
+
"-"
+
result
.
getName
();
}
else
{
measurementName
=
op
+
"-FAILED"
;
}
}
measurements
.
measure
(
measurementName
,
(
int
)
((
endTimeNanos
-
startTimeNanos
)
/
1000
));
measurements
.
measureIntended
(
measurementName
,
(
int
)
((
endTimeNanos
-
intendedStartTimeNanos
)
/
1000
));
}
/**
* Update a record in the database. Any field/value pairs in the specified values HashMap will be written into the
* record with the specified record key, overwriting any existing values with the same field name.
*
* @param table The name of the table
* @param key The record key of the record to write.
* @param values A HashMap of field/value pairs to update in the record
* @return The result of the operation.
*/
public
CompletableFuture
<
Status
>
update
(
String
table
,
String
key
,
Map
<
String
,
ByteIterator
>
values
)
{
try
(
final
TraceScope
span
=
tracer
.
newScope
(
scopeStringUpdate
))
{
long
ist
=
measurements
.
getIntendedStartTimeNs
();
long
st
=
System
.
nanoTime
();
CompletableFuture
<
Status
>
updateFuture
=
db
.
update
(
table
,
key
,
values
).
whenComplete
((
res
,
ex
)
->
{
if
(
ex
!=
null
)
{
System
.
err
.
println
(
"UPDATE failed due to exception: "
+
ex
);
res
=
Status
.
ERROR
;
}
long
en
=
System
.
nanoTime
();
measure
(
"UPDATE"
,
res
,
ist
,
st
,
en
);
measurements
.
reportStatus
(
"UPDATE"
,
res
);
});
queries
.
add
(
updateFuture
);
return
updateFuture
;
}
}
/**
* Insert a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified
* record key.
*
* @param table The name of the table
* @param key The record key of the record to insert.
* @param values A HashMap of field/value pairs to insert in the record
* @return The result of the operation.
*/
public
CompletableFuture
<
Status
>
insert
(
String
table
,
String
key
,
Map
<
String
,
ByteIterator
>
values
)
{
try
(
final
TraceScope
span
=
tracer
.
newScope
(
scopeStringInsert
))
{
long
ist
=
measurements
.
getIntendedStartTimeNs
();
long
st
=
System
.
nanoTime
();
CompletableFuture
<
Status
>
insertFuture
=
db
.
insert
(
table
,
key
,
values
).
whenComplete
((
res
,
ex
)
->
{
if
(
ex
!=
null
)
{
System
.
err
.
println
(
"INSERT failed due to exception: "
+
ex
);
res
=
Status
.
ERROR
;
}
long
en
=
System
.
nanoTime
();
measure
(
"INSERT"
,
res
,
ist
,
st
,
en
);
measurements
.
reportStatus
(
"INSERT"
,
res
);
});
queries
.
add
(
insertFuture
);
return
insertFuture
;
}
}
/**
* Delete a record from the database.
*
* @param table The name of the table
* @param key The record key of the record to delete.
* @return The result of the operation.
*/
public
CompletableFuture
<
Status
>
delete
(
String
table
,
String
key
)
{
try
(
final
TraceScope
span
=
tracer
.
newScope
(
scopeStringDelete
))
{
long
ist
=
measurements
.
getIntendedStartTimeNs
();
long
st
=
System
.
nanoTime
();
CompletableFuture
<
Status
>
deleteFuture
=
db
.
delete
(
table
,
key
).
whenComplete
((
res
,
ex
)
->
{
if
(
ex
!=
null
)
{
System
.
err
.
println
(
"DELETE failed due to exception: "
+
ex
);
res
=
Status
.
ERROR
;
}
long
en
=
System
.
nanoTime
();
measure
(
"DELETE"
,
res
,
ist
,
st
,
en
);
measurements
.
reportStatus
(
"DELETE"
,
res
);
});
queries
.
add
(
deleteFuture
);
return
deleteFuture
;
}
}
}
Event Timeline
Log In to Comment