/*
* Copyright (c) 2018 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db.leveldb;

import com.yahoo.ycsb.AsyncDB;
import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;

import net.jcip.annotations.GuardedBy;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.CompressionType;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.ReadOptions;
import org.iq80.leveldb.WriteBatch;
import org.iq80.leveldb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.CompletableFuture;

import static org.fusesource.leveldbjni.JniDBFactory.bytes;
/**
 * YCSB binding for <a href="https://github.com/google/leveldb">LevelDB</a>.
*
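 * <p>
 * Configuration is read from the YCSB properties defined by the constants below:
 * {@code leveldb.dir} (database directory), {@code leveldb.sync} (synchronous writes,
 * default {@code false}), {@code leveldb.verify_checksums} (default {@code false}),
 * {@code leveldb.compression} (default {@code NONE}) and
 * {@code leveldb.write_batch_size} (default {@code 1}).
 * <p>
 * Illustrative invocation, assuming the binding is registered under the name
 * {@code leveldb} and that the data directory shown is writable:
 * <pre>
 *   ./bin/ycsb load leveldb -P workloads/workloada -p leveldb.dir=/tmp/ycsb-leveldb
 * </pre>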
*/
public class LevelDBClient extends AsyncDB {
  private static final String PROPERTY_LEVELDB_DIR = "leveldb.dir";
  private static final String PROPERTY_SYNC_WRITES = "leveldb.sync";
  private static final String PROPERTY_VERIFY_CHECKSUMS = "leveldb.verify_checksums";
  private static final String PROPERTY_COMPRESSION = "leveldb.compression";
  private static final String PROPERTY_WRITE_BATCH_SIZE = "leveldb.write_batch_size";

  private static final Logger LOGGER = LoggerFactory.getLogger(LevelDBClient.class);

  @GuardedBy("LevelDBClient.class") private static ReadOptions readOptions = null;
  @GuardedBy("LevelDBClient.class") private static WriteOptions writeOptions = null;
  @GuardedBy("LevelDBClient.class") private static org.iq80.leveldb.DB levelDb = null;
  @GuardedBy("LevelDBClient.class") private static int references = 0;
  @GuardedBy("LevelDBClient.class") private static int batchWriteSize;

  private static ThreadLocal<Integer> batchWriteCount = ThreadLocal.withInitial(() -> 0);
  private static ThreadLocal<WriteBatch> batchWrites;
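
  // A single LevelDB handle and its read/write options are shared by all client
  // threads; the static state above is guarded by the LevelDBClient.class lock,
  // and 'references' counts open clients so that init() opens the database once
  // and cleanup() closes it only when the last client is done.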
  @Override
  public void init() throws DBException {
    synchronized (LevelDBClient.class) {
      if (levelDb == null) {
        Path levelDbDir = Paths.get(getProperties().getProperty(PROPERTY_LEVELDB_DIR));
        LOGGER.info("LevelDB data dir: " + levelDbDir);

        CompressionType compressionType =
            CompressionType.valueOf(getProperties().getProperty(PROPERTY_COMPRESSION, "NONE"));
        Options dbOptions = new Options().createIfMissing(true).compressionType(compressionType);
        try {
          levelDb = JniDBFactory.factory.open(levelDbDir.toFile(), dbOptions);
        } catch (final IOException e) {
          throw new DBException(e);
        }

        boolean verifyChecksums =
            Boolean.parseBoolean(getProperties().getProperty(PROPERTY_VERIFY_CHECKSUMS, "false"));
        readOptions = new ReadOptions().fillCache(true).verifyChecksums(verifyChecksums);

        boolean syncWrites =
            Boolean.parseBoolean(getProperties().getProperty(PROPERTY_SYNC_WRITES, "false"));
        writeOptions = new WriteOptions().sync(syncWrites);

        batchWriteSize = Integer.parseInt(getProperties().getProperty(PROPERTY_WRITE_BATCH_SIZE, "1"));
        batchWrites = ThreadLocal.withInitial(levelDb::createWriteBatch);
      }
      references++;
    }
  }
  @Override
  public void cleanup() throws DBException {
    super.cleanup();
    synchronized (LevelDBClient.class) {
      try {
        if (references == 1) {
          levelDb.close();
        }
      } catch (final IOException e) {
        throw new DBException(e);
      } finally {
        references--;
      }
    }
  }
  @Override
  public CompletableFuture<Status> read(final String table, final String key, final Set<String> fields,
      final Map<String, ByteIterator> result) {
    final byte[] values;
    try {
      values = levelDb.get(bytes(key));
    } catch (org.iq80.leveldb.DBException e) {
      LOGGER.error("Exception thrown reading from DB: " + e);
      return CompletableFuture.completedFuture(Status.ERROR);
    }

    if (values == null) {
      LOGGER.warn("Record with key " + key + " not found.");
      return CompletableFuture.completedFuture(Status.NOT_FOUND);
    }

    deserializeValues(values, fields, result);
    return CompletableFuture.completedFuture(Status.OK);
  }
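
  // scan() seeks the iterator to startkey and collects up to recordcount consecutive
  // records; it returns NOT_FOUND if the key range is exhausted before recordcount
  // records have been read.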
  @Override
  public CompletableFuture<Status> scan(final String table, final String startkey, final int recordcount,
      final Set<String> fields, final Vector<HashMap<String, ByteIterator>> result) {
    try (final DBIterator iterator = levelDb.iterator(readOptions)) {
      iterator.seek(bytes(startkey));
      if (!iterator.hasNext()) {
        return CompletableFuture.completedFuture(Status.NOT_FOUND);
      }

      int records = 0;
      while (records < recordcount) {
        if (!iterator.hasNext()) {
          LOGGER.warn("Tried retrieving " + recordcount + " records, only found " + records + ".");
          return CompletableFuture.completedFuture(Status.NOT_FOUND);
        }

        Map.Entry<byte[], byte[]> record = iterator.next();
        final HashMap<String, ByteIterator> values = new HashMap<>();
        deserializeValues(record.getValue(), fields, values);
        result.add(values);
        records++;
      }
      return CompletableFuture.completedFuture(Status.OK);
    } catch (org.iq80.leveldb.DBException | IOException e) {
      LOGGER.error("Exception thrown scanning DB: " + e);
      return CompletableFuture.completedFuture(Status.ERROR);
    }
  }
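
  // update() is implemented as a read-modify-write: the stored record is read in
  // full, the supplied fields are merged over it, and the merged record is written
  // back through insert().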
  @Override
  public CompletableFuture<Status> update(final String table, final String key,
      final Map<String, ByteIterator> values) {
    final Map<String, ByteIterator> newValues = new HashMap<>();

    return read(table, key, null, newValues).thenApply(r -> {
      if (!r.isOk()) {
        return r;
      }
      newValues.putAll(values);
      return insert(table, key, newValues).join();
    });
  }
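
  // When leveldb.write_batch_size > 1, puts are buffered in a per-thread WriteBatch
  // and reported as BATCHED_OK until the batch reaches the configured size, at which
  // point the whole batch is written; otherwise each put goes straight to the database.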
  @Override
  public CompletableFuture<Status> insert(final String table, final String key,
      final Map<String, ByteIterator> values) {
    try {
      if (batchWriteSize > 1) {
        batchWriteCount.set(batchWriteCount.get() + 1);
        batchWrites.get().put(bytes(key), serializeValues(values));
        if (batchWriteCount.get() < batchWriteSize) {
          return CompletableFuture.completedFuture(Status.BATCHED_OK);
        } else {
          levelDb.write(batchWrites.get(), writeOptions);
          batchWriteCount.set(0);
          batchWrites.set(levelDb.createWriteBatch());
          return CompletableFuture.completedFuture(Status.OK);
        }
      } else {
        levelDb.put(bytes(key), serializeValues(values));
        return CompletableFuture.completedFuture(Status.OK);
      }
    } catch (org.iq80.leveldb.DBException | IOException e) {
      LOGGER.error("Exception thrown writing to DB: " + e);
      return CompletableFuture.completedFuture(Status.ERROR);
    }
  }
  @Override
  public CompletableFuture<Status> delete(final String table, final String key) {
    try {
      levelDb.delete(bytes(key), writeOptions);
      return CompletableFuture.completedFuture(Status.OK);
    } catch (org.iq80.leveldb.DBException e) {
      LOGGER.error("Exception thrown deleting from DB: " + e);
      return CompletableFuture.completedFuture(Status.ERROR);
    }
  }
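
  // Records are stored as a flat byte array: for every field, a 4-byte (big-endian)
  // key length, the key bytes, a 4-byte value length and the value bytes, concatenated
  // in iteration order. deserializeValues() walks this layout; serializeValues() produces it.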
  private Map<String, ByteIterator> deserializeValues(final byte[] values, final Set<String> fields,
      final Map<String, ByteIterator> result) {
    final ByteBuffer buf = ByteBuffer.allocate(4);

    int offset = 0;
    while (offset < values.length) {
      buf.put(values, offset, 4);
      buf.flip();
      final int keyLen = buf.getInt();
      buf.clear();
      offset += 4;

      final String key = new String(values, offset, keyLen);
      offset += keyLen;

      buf.put(values, offset, 4);
      buf.flip();
      final int valueLen = buf.getInt();
      buf.clear();
      offset += 4;

      if (fields == null || fields.contains(key)) {
        result.put(key, new ByteArrayByteIterator(values, offset, valueLen));
      }
      offset += valueLen;
    }

    return result;
  }
  private byte[] serializeValues(final Map<String, ByteIterator> values) throws IOException {
    try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
      final ByteBuffer buf = ByteBuffer.allocate(4);

      for (final Map.Entry<String, ByteIterator> value : values.entrySet()) {
        final byte[] keyBytes = bytes(value.getKey());
        final byte[] valueBytes = value.getValue().toArray();

        buf.putInt(keyBytes.length);
        baos.write(buf.array());
        baos.write(keyBytes);

        buf.clear();

        buf.putInt(valueBytes.length);
        baos.write(buf.array());
        baos.write(valueBytes);
        buf.clear();
      }
      return baos.toByteArray();
    }
  }
}