Page MenuHomec4science

LevelDBClient.java
No OneTemporary

File Metadata

Created
Wed, May 15, 18:47

LevelDBClient.java

/*
* Copyright (c) 2018 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db.leveldb;
import com.yahoo.ycsb.AsyncDB;
import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import net.jcip.annotations.GuardedBy;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.CompressionType;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.ReadOptions;
import org.iq80.leveldb.WriteBatch;
import org.iq80.leveldb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.CompletableFuture;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
/**
 * YCSB binding for <a href="https://github.com/google/leveldb">LevelDB</a>, accessed through the
 * leveldbjni JNI wrapper.
 *
 * <p>All client instances share a single, reference-counted database handle: the first
 * {@link #init()} opens it and the last {@link #cleanup()} closes it. Every operation runs
 * synchronously on the calling thread and returns an already-completed future.
 *
 * <p>Recognised properties:
 * <ul>
 *   <li>{@code leveldb.dir} (required) - directory holding the database files;</li>
 *   <li>{@code leveldb.sync} (default {@code false}) - sync every write;</li>
 *   <li>{@code leveldb.verify_checksums} (default {@code false}) - verify checksums on read;</li>
 *   <li>{@code leveldb.compression} (default {@code NONE}) - a {@link CompressionType} name;</li>
 *   <li>{@code leveldb.write_batch_size} (default {@code 1}) - inserts buffered per thread before
 *       they are written; values of 1 or less disable batching.</li>
 * </ul>
 *
 * <p>The {@code table} argument of every operation is ignored; all records live in one key space.
 */
public class LevelDBClient extends AsyncDB {
  private static final String PROPERTY_LEVELDB_DIR = "leveldb.dir";
  private static final String PROPERTY_SYNC_WRITES = "leveldb.sync";
  private static final String PROPERTY_VERIFY_CHECKSUMS = "leveldb.verify_checksums";
  private static final String PROPERTY_COMPRESSION = "leveldb.compression";
  private static final String PROPERTY_WRITE_BATCH_SIZE = "leveldb.write_batch_size";

  private static final Logger LOGGER = LoggerFactory.getLogger(LevelDBClient.class);

  // The fields below are written only while holding the class lock (in init()/cleanup()); the
  // data-path methods read them without locking after init() has returned.
  @GuardedBy("LevelDBClient.class") private static ReadOptions readOptions = null;
  @GuardedBy("LevelDBClient.class") private static WriteOptions writeOptions = null;
  @GuardedBy("LevelDBClient.class") private static org.iq80.leveldb.DB levelDb = null;
  @GuardedBy("LevelDBClient.class") private static int references = 0;
  @GuardedBy("LevelDBClient.class") private static int batchWriteSize;

  /** Number of inserts buffered in the calling thread's current batch. */
  private static final ThreadLocal<Integer> batchWriteCount = ThreadLocal.withInitial(() -> 0);
  /** Per-thread write batch, created lazily from the open database. */
  private static ThreadLocal<WriteBatch> batchWrites;

  /**
   * Opens the shared database on first use and registers this client as a user of it.
   *
   * <p>All properties are validated before the database is opened so that a bad value cannot
   * leave the shared state half-initialised.
   *
   * @throws DBException if {@code leveldb.dir} is missing, a property value is invalid, or the
   *     database cannot be opened
   */
  @Override
  public void init() throws DBException {
    synchronized (LevelDBClient.class) {
      if (levelDb == null) {
        final String dir = getProperties().getProperty(PROPERTY_LEVELDB_DIR);
        if (dir == null || dir.isEmpty()) {
          throw new DBException("Required property \"" + PROPERTY_LEVELDB_DIR + "\" is not set.");
        }

        final Path levelDbDir;
        final CompressionType compressionType;
        final int configuredBatchSize;
        try {
          levelDbDir = Paths.get(dir);
          compressionType =
              CompressionType.valueOf(getProperties().getProperty(PROPERTY_COMPRESSION, "NONE"));
          configuredBatchSize =
              Integer.parseInt(getProperties().getProperty(PROPERTY_WRITE_BATCH_SIZE, "1"));
        } catch (final IllegalArgumentException e) {
          // Covers InvalidPathException, unknown enum constants and NumberFormatException.
          throw new DBException("Invalid LevelDB configuration value", e);
        }
        final boolean verifyChecksums =
            Boolean.parseBoolean(getProperties().getProperty(PROPERTY_VERIFY_CHECKSUMS, "false"));
        final boolean syncWrites =
            Boolean.parseBoolean(getProperties().getProperty(PROPERTY_SYNC_WRITES, "false"));

        LOGGER.info("LevelDB data dir: {}", levelDbDir);
        final Options dbOptions =
            new Options().createIfMissing(true).compressionType(compressionType);
        final org.iq80.leveldb.DB opened;
        try {
          opened = JniDBFactory.factory.open(levelDbDir.toFile(), dbOptions);
        } catch (final IOException e) {
          throw new DBException(e);
        }

        readOptions = new ReadOptions().fillCache(true).verifyChecksums(verifyChecksums);
        writeOptions = new WriteOptions().sync(syncWrites);
        batchWriteSize = configuredBatchSize;
        batchWrites = ThreadLocal.withInitial(opened::createWriteBatch);
        // Publish the handle last: a non-null levelDb means everything above is set.
        levelDb = opened;
      }
      references++;
    }
  }

  /**
   * Writes out any inserts still buffered by the calling thread, then releases this client's
   * reference to the shared database, closing it when the last reference goes away.
   *
   * @throws DBException if flushing the buffered writes or closing the database fails
   */
  @Override
  public void cleanup() throws DBException {
    super.cleanup();
    synchronized (LevelDBClient.class) {
      try {
        try {
          flushPendingWrites();
        } finally {
          if (references == 1 && levelDb != null) {
            // Clear the static before closing so a later init() reopens the database instead of
            // reusing a closed handle, even if close() itself throws.
            final org.iq80.leveldb.DB closing = levelDb;
            levelDb = null;
            closing.close();
          }
        }
      } catch (final IOException | org.iq80.leveldb.DBException e) {
        throw new DBException(e);
      } finally {
        references--;
      }
    }
  }

  /**
   * Looks up one record.
   *
   * @param table ignored
   * @param key key of the record to read
   * @param fields field names to return, or {@code null} for all fields
   * @param result receives the selected fields of the record
   * @return {@code OK}, {@code NOT_FOUND} if the key is absent, or {@code ERROR} if LevelDB threw
   */
  @Override
  public CompletableFuture<Status> read(final String table, final String key, final Set<String> fields,
                                        final Map<String, ByteIterator> result) {
    final byte[] values;
    try {
      // Pass readOptions so that leveldb.verify_checksums applies to point reads as well as scans.
      values = levelDb.get(bytes(key), readOptions);
    } catch (final org.iq80.leveldb.DBException e) {
      LOGGER.error("Exception thrown reading from DB", e);
      return CompletableFuture.completedFuture(Status.ERROR);
    }
    if (values == null) {
      LOGGER.warn("Record with key {} not found.", key);
      return CompletableFuture.completedFuture(Status.NOT_FOUND);
    }
    deserializeValues(values, fields, result);
    return CompletableFuture.completedFuture(Status.OK);
  }

  /**
   * Reads up to {@code recordcount} consecutive records starting at the first key that is greater
   * than or equal to {@code startkey}.
   *
   * @param table ignored
   * @param startkey key to seek to
   * @param recordcount number of records requested
   * @param fields field names to return, or {@code null} for all fields
   * @param result receives one map per record read, in iteration order
   * @return {@code OK} if {@code recordcount} records were read; {@code NOT_FOUND} if the
   *     iterator ran out first (records read so far remain in {@code result}); {@code ERROR} if
   *     LevelDB threw
   */
  @Override
  public CompletableFuture<Status> scan(final String table, final String startkey, final int recordcount,
                                        final Set<String> fields,
                                        final Vector<HashMap<String, ByteIterator>> result) {
    try (final DBIterator iterator = levelDb.iterator(readOptions)) {
      iterator.seek(bytes(startkey));
      if (!iterator.hasNext()) {
        return CompletableFuture.completedFuture(Status.NOT_FOUND);
      }
      int records = 0;
      while (records < recordcount && iterator.hasNext()) {
        final Map.Entry<byte[], byte[]> record = iterator.next();
        final HashMap<String, ByteIterator> values = new HashMap<>();
        deserializeValues(record.getValue(), fields, values);
        result.add(values);
        records++;
      }
      if (records < recordcount) {
        LOGGER.warn("Tried retrieving {} records, only found {}.", recordcount, records);
        return CompletableFuture.completedFuture(Status.NOT_FOUND);
      }
      return CompletableFuture.completedFuture(Status.OK);
    } catch (final org.iq80.leveldb.DBException | IOException e) {
      LOGGER.error("Exception thrown scanning DB", e);
      return CompletableFuture.completedFuture(Status.ERROR);
    }
  }

  /**
   * Merges {@code values} into the stored record: reads the current record, overlays the given
   * fields and writes the result back through {@link #insert}.
   *
   * @param table ignored
   * @param key key of the record to update
   * @param values fields to add or overwrite
   * @return the failing read status if the record could not be read, otherwise the status of the
   *     insert
   */
  @Override
  public CompletableFuture<Status> update(final String table, final String key,
                                          final Map<String, ByteIterator> values) {
    final Map<String, ByteIterator> merged = new HashMap<>();
    return read(table, key, null, merged).thenCompose(readStatus -> {
      if (!readStatus.isOk()) {
        return CompletableFuture.completedFuture(readStatus);
      }
      merged.putAll(values);
      return insert(table, key, merged);
    });
  }

  /**
   * Stores a record, replacing any existing record with the same key.
   *
   * <p>When {@code leveldb.write_batch_size} is greater than 1 the write is buffered in a
   * per-thread batch that is written out once it holds that many inserts.
   *
   * @param table ignored
   * @param key key of the record
   * @param values fields of the record
   * @return {@code OK} once written, {@code BATCHED_OK} if only buffered, or {@code ERROR} if
   *     serialisation or LevelDB failed
   */
  @Override
  public CompletableFuture<Status> insert(final String table, final String key,
                                          final Map<String, ByteIterator> values) {
    try {
      final byte[] serialized = serializeValues(values);
      if (batchWriteSize <= 1) {
        // Pass writeOptions so that leveldb.sync is honoured for unbatched writes.
        levelDb.put(bytes(key), serialized, writeOptions);
        return CompletableFuture.completedFuture(Status.OK);
      }

      batchWrites.get().put(bytes(key), serialized);
      final int pending = batchWriteCount.get() + 1;
      batchWriteCount.set(pending);
      if (pending < batchWriteSize) {
        return CompletableFuture.completedFuture(Status.BATCHED_OK);
      }
      flushPendingWrites();
      return CompletableFuture.completedFuture(Status.OK);
    } catch (final org.iq80.leveldb.DBException | IOException e) {
      LOGGER.error("Exception thrown writing to DB", e);
      return CompletableFuture.completedFuture(Status.ERROR);
    }
  }

  /**
   * Removes a record.
   *
   * @param table ignored
   * @param key key of the record to delete
   * @return {@code OK}, or {@code ERROR} if LevelDB threw
   */
  @Override
  public CompletableFuture<Status> delete(final String table, final String key) {
    try {
      levelDb.delete(bytes(key), writeOptions);
      return CompletableFuture.completedFuture(Status.OK);
    } catch (final org.iq80.leveldb.DBException e) {
      LOGGER.error("Exception thrown deleting from DB", e);
      return CompletableFuture.completedFuture(Status.ERROR);
    }
  }

  /**
   * Writes the calling thread's buffered inserts to the database and releases the batch.
   * Does nothing when the database is not open or the thread has nothing buffered.
   *
   * <p>If the write fails the batch and its counter are left untouched, so the buffered inserts
   * are retried by the next flush.
   */
  private void flushPendingWrites() throws IOException {
    if (levelDb == null || batchWrites == null || batchWriteCount.get() == 0) {
      return;
    }
    final WriteBatch batch = batchWrites.get();
    levelDb.write(batch, writeOptions);
    batchWriteCount.set(0);
    // Drop the thread-local first so the next get() lazily creates a fresh batch even if
    // close() below throws.
    batchWrites.remove();
    batch.close();
  }

  /**
   * Decodes a record produced by {@link #serializeValues} into {@code result}.
   *
   * <p>The layout is a sequence of {@code [4-byte big-endian name length][UTF-8 name]
   * [4-byte big-endian value length][value bytes]} entries.
   *
   * @param values serialised record
   * @param fields field names to keep, or {@code null} to keep every field
   * @param result map that receives the decoded fields; value iterators reference {@code values}
   *     rather than copying it
   * @return {@code result}
   */
  private Map<String, ByteIterator> deserializeValues(final byte[] values, final Set<String> fields,
                                                      final Map<String, ByteIterator> result) {
    // wrap() starts at array offset 0, so buffer positions double as indices into values.
    final ByteBuffer buf = ByteBuffer.wrap(values);
    while (buf.hasRemaining()) {
      final int keyLen = buf.getInt();
      // Field names are written with JniDBFactory.bytes(), i.e. UTF-8; decode them the same way
      // instead of relying on the platform default charset.
      final String key = new String(values, buf.position(), keyLen, StandardCharsets.UTF_8);
      buf.position(buf.position() + keyLen);

      final int valueLen = buf.getInt();
      if (fields == null || fields.contains(key)) {
        result.put(key, new ByteArrayByteIterator(values, buf.position(), valueLen));
      }
      buf.position(buf.position() + valueLen);
    }
    return result;
  }

  /**
   * Encodes a record as a sequence of length-prefixed field names and values; see
   * {@link #deserializeValues} for the layout.
   *
   * @param values fields to encode; each value iterator is consumed
   * @return the serialised record
   * @throws IOException if writing to the in-memory stream fails
   */
  private byte[] serializeValues(final Map<String, ByteIterator> values) throws IOException {
    try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
      final ByteBuffer lengthPrefix = ByteBuffer.allocate(Integer.BYTES);
      for (final Map.Entry<String, ByteIterator> value : values.entrySet()) {
        writeLengthPrefixed(baos, lengthPrefix, bytes(value.getKey()));
        writeLengthPrefixed(baos, lengthPrefix, value.getValue().toArray());
      }
      return baos.toByteArray();
    }
  }

  /** Appends {@code payload} to {@code out}, preceded by its length as a big-endian int. */
  private static void writeLengthPrefixed(final ByteArrayOutputStream out, final ByteBuffer lengthPrefix,
                                          final byte[] payload) throws IOException {
    lengthPrefix.clear();
    lengthPrefix.putInt(payload.length);
    out.write(lengthPrefix.array());
    out.write(payload);
  }
}

Event Timeline