Page MenuHomec4science

NativeAsyncMongoDbClient.java
No OneTemporary

File Metadata

Created
Wed, May 8, 18:37

NativeAsyncMongoDbClient.java

/*
* Copyright (c) 2014, Yahoo!, Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db;
import com.mongodb.ConnectionString;
import com.mongodb.MongoBulkWriteException;
import com.mongodb.MongoClientSettings;
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
import com.mongodb.async.client.*;
import com.mongodb.async.client.MongoClient;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.model.*;
import com.mongodb.client.result.DeleteResult;
import com.yahoo.ycsb.AsyncDB;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.StringByteIterator;
import org.bson.Document;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import static com.mongodb.client.model.Filters.eq;
import static com.mongodb.client.model.Filters.gte;
import static com.mongodb.client.model.Projections.fields;
import static com.mongodb.client.model.Projections.include;
/**
 * MongoDB asynchronous client for the YCSB framework, built on the
 * callback-based MongoDB Java driver API in
 * <code>com.mongodb.async.client</code>.
 * <p>
 * See the <code>README.md</code> for configuration information.
 * </p>
 * <p>
 * NOTE(review): this comment previously pointed at the allanbank
 * <a href="http://www.allanbank.com/mongodb-async-driver/">Asynchronous Java
 * Driver</a>, but none of the imports of this file come from that driver.
 * </p>
 *
 * @author rjm
 */
public class NativeAsyncMongoDbClient extends AsyncDB {
/**
 * Used to include a field in a response. Not referenced by any method of
 * this class.
 */
protected static final int INCLUDE = 1;
/**
 * Name of the database to use. Set by the first call to init() from the
 * connection URL, falling back to "ycsb" when the URL names no database.
 */
private static String databaseName;
/**
 * Number of instances that have called init() and not yet called cleanup().
 * cleanup() closes the shared client when this drops back to zero.
 */
private static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
/**
 * The connection to MongoDB. Static, so a single client is shared by every
 * instance of this class.
 */
private static MongoClient mongoClient;
/**
 * The write concern for the requests, taken from the client settings built
 * from the connection URL.
 */
private static WriteConcern writeConcern;
/**
 * Which servers to use for reads, taken from the client settings built from
 * the connection URL.
 */
private static ReadPreference readPreference;
/**
 * This instance's handle on the database named by databaseName.
 */
private MongoDatabase database;
/**
 * The batch size to use for inserts, read from the "mongodb.batchsize"
 * property (default 1, meaning every insert is sent on its own).
 */
private static int batchSize;
/**
 * If true then use updates with the upsert option for inserts. Read from the
 * "mongodb.upsert" property (default false).
 */
private static boolean useUpsert;
/**
 * The bulk inserts pending for this instance when upserts are not in use.
 */
private final ArrayList<Document> batchedWrites = new ArrayList<>();
/**
 * The bulk upserts pending for this instance when useUpsert is set.
 */
private final ArrayList<WriteModel<Document>> batchedUpserts = new ArrayList<>();
/**
 * The number of writes queued since the last flush, in whichever of
 * batchedWrites and batchedUpserts is in use.
 */
private int batchedWriteCount = 0;
/** Options for insertMany: unordered. */
private final InsertManyOptions insertManyOptions = new InsertManyOptions().ordered(false);
/** Options for the update used to emulate an insert: upsert enabled. */
private final UpdateOptions upsertOptions = new UpdateOptions().upsert(true);
/** Options for bulkWrite: unordered. */
private final BulkWriteOptions bulkWriteOptions = new BulkWriteOptions().ordered(false);
/**
 * Cleanup any state for this DB. Called once per DB instance; there is one DB
 * instance per client thread.
 * <p>
 * Each call decrements the count of initialized instances. The instance that
 * brings the count to zero closes the shared client and clears the shared
 * reference, so that a later init() creates a fresh client.
 * </p>
 * <p>
 * Failures while closing are reported on standard error and not rethrown.
 * </p>
 */
@Override
public final void cleanup() throws DBException {
  // Same lock as init(): both methods read and write the shared static
  // client, and init() must never pick up a client that is being closed.
  synchronized (NativeAsyncMongoDbClient.class) {
    if (INIT_COUNT.decrementAndGet() != 0) {
      return;
    }
    try {
      // The client is still null when init() never managed to create it.
      if (mongoClient != null) {
        mongoClient.close();
      }
    } catch (final Exception e1) {
      System.err.println("Could not close MongoDB connection pool: "
          + e1.toString());
      e1.printStackTrace();
    } finally {
      mongoClient = null;
      database = null;
    }
  }
}
/**
 * Delete a record from the database.
 *
 * @param table The name of the table
 * @param key The record key of the record to delete.
 * @return A future completed from the driver callback with
 *         {@link Status#OK} when a document was deleted (or when the write
 *         concern is unacknowledged, in which case no count is available),
 *         {@link Status#NOT_FOUND} when nothing matched the key,
 *         {@link Status#UNEXPECTED_STATE} when an acknowledged write concern
 *         produced an unacknowledged result, and {@link Status#ERROR} when
 *         the driver reported an exception.
 */
@Override
public final CompletableFuture<Status> delete(final String table, final String key) {
  final CompletableFuture<Status> deleteResult = new CompletableFuture<>();
  final MongoCollection<Document> collection =
      database.getCollection(table).withWriteConcern(writeConcern);
  // eq(key) with a single argument filters on the _id field.
  collection.deleteOne(eq(key), (result, ex) -> {
    if (ex != null) {
      System.err.println(ex.toString());
      deleteResult.complete(Status.ERROR);
    } else if (!result.wasAcknowledged()) {
      if (writeConcern.isAcknowledged()) {
        System.err.println("Delete was not acknowledged for key " + key);
        deleteResult.complete(Status.UNEXPECTED_STATE);
      } else {
        // An unacknowledged result carries no count; asking for it throws,
        // which would leave the future incomplete forever.
        deleteResult.complete(Status.OK);
      }
    } else if (result.getDeletedCount() == 0) {
      System.err.println("Nothing deleted for key " + key);
      deleteResult.complete(Status.NOT_FOUND);
    } else {
      deleteResult.complete(Status.OK);
    }
  });
  return deleteResult;
}
/**
 * Initialize any state for this DB. Called once per DB instance; there is one
 * DB instance per client thread.
 * <p>
 * The first instance to get here reads the {@code mongodb.batchsize},
 * {@code mongodb.upsert} and {@code mongodb.url} properties and creates the
 * client shared by all instances. Every instance, first or not, then obtains
 * its own database handle from that client.
 * </p>
 * <p>
 * A URL that does not start with {@code mongodb://} terminates the JVM with
 * exit status 1.
 * </p>
 *
 * @throws DBException if the connection URL cannot be parsed or the client
 *         cannot be created.
 */
@Override
public final void init() throws DBException {
  // Counted before taking the lock; cleanup() closes the shared client once
  // the count drops back to zero.
  INIT_COUNT.incrementAndGet();
  synchronized (NativeAsyncMongoDbClient.class) {
    final Properties props = getProperties();
    if (mongoClient == null) {
      // Set insert batchsize, default 1 - to be YCSB-original equivalent
      batchSize = Integer.parseInt(props.getProperty("mongodb.batchsize", "1"));
      // Set if inserts are done as upserts. Defaults to false.
      useUpsert = Boolean.parseBoolean(
          props.getProperty("mongodb.upsert", "false"));
      // Just use the standard connection format URL
      // http://docs.mongodb.org/manual/reference/connection-string/
      // to configure the client.
      final String url =
          props.getProperty("mongodb.url", "mongodb://localhost:27017/ycsb?w=1");
      if (!url.startsWith("mongodb://")) {
        System.err.println("ERROR: Invalid URL: '" + url
            + "'. Must be of the form "
            + "'mongodb://<host1>:<port1>,<host2>:<port2>/database?"
            + "options'. See "
            + "http://docs.mongodb.org/manual/reference/connection-string/.");
        System.exit(1);
      }
      try {
        // Parsed inside the try so that a malformed URL is reported the same
        // way as any other connection failure.
        final ConnectionString uri = new ConnectionString(url);
        databaseName = uri.getDatabase();
        if ((databaseName == null) || databaseName.isEmpty()) {
          // Default database is "ycsb" if database is not
          // specified in URL
          databaseName = "ycsb";
        }
        final MongoClientSettings config =
            MongoClientSettings.builder().applyConnectionString(uri).build();
        mongoClient = MongoClients.create(config);
        readPreference = config.getReadPreference();
        writeConcern = config.getWriteConcern();
        System.out.println("mongo connection created with " + url);
      } catch (final Exception e1) {
        System.err
            .println("Could not initialize MongoDB connection pool for Loader: "
                + e1.toString());
        e1.printStackTrace();
        // Returning normally here would leave this instance without a
        // database handle and make every later operation fail with a
        // NullPointerException.
        throw new DBException(
            "Could not initialize MongoDB connection pool for Loader", e1);
      }
    }
    database = mongoClient.getDatabase(databaseName);
    // Pre-size the buffer this instance will actually fill, for every
    // instance rather than only the one that created the client.
    if (batchSize > 1) {
      if (useUpsert) {
        batchedUpserts.ensureCapacity(batchSize);
      } else {
        batchedWrites.ensureCapacity(batchSize);
      }
    }
  }
}
/**
 * Insert a record in the database. Any field/value pairs in the specified
 * values HashMap will be written into the record with the specified record
 * key. Field values are stored as the byte arrays returned by
 * {@link ByteIterator#toArray()}.
 * <p>
 * With a batch size of 1 the record is sent immediately, either as a plain
 * insert or, when upserts are enabled, as an update with
 * <code>$setOnInsert</code>. With a larger batch size the record is buffered
 * and the buffer is sent as one unordered bulk operation once it holds
 * <code>batchSize</code> records.
 * </p>
 *
 * @param table The name of the table
 * @param key The record key of the record to insert.
 * @param values A HashMap of field/value pairs to insert in the record
 * @return A future completed with {@link Status#BATCHED_OK} when the record
 *         was only buffered, and otherwise from the driver callback with
 *         {@link Status#OK}, {@link Status#NOT_FOUND} (single upsert that
 *         did not insert a document with this key),
 *         {@link Status#UNEXPECTED_STATE} (acknowledged write concern but
 *         unacknowledged result) or {@link Status#ERROR}.
 */
@Override
public final CompletableFuture<Status> insert(final String table, final String key,
    final Map<String, ByteIterator> values) {
  final CompletableFuture<Status> insertResult = new CompletableFuture<>();
  // Completes the future for a flushed batch; ex is null on success.
  final Consumer<Throwable> bulkWriteExceptionHandler = ex -> {
    if (ex == null) {
      insertResult.complete(Status.OK);
      return;
    }
    if (!(ex instanceof MongoBulkWriteException)) {
      System.err.println("Exception while trying to bulk-insert " + batchSize + " records.");
      insertResult.complete(Status.ERROR);
      return;
    }
    final BulkWriteResult wResult = ((MongoBulkWriteException) ex).getWriteResult();
    if (wResult.wasAcknowledged()) {
      // Plain inserts are counted as inserted, upserts as upserted; only one
      // of the two is non-zero for any given batch.
      final int written = wResult.getInsertedCount() + wResult.getUpserts().size();
      System.err.println("Bulk insert of " + batchSize
          + " records failed for " + (batchSize - written) + " records.");
      insertResult.complete(Status.ERROR);
    } else if (writeConcern.isAcknowledged()) {
      System.err.println("Bulk insert was not acknowledged.");
      insertResult.complete(Status.UNEXPECTED_STATE);
    } else {
      // Counts cannot be read from an unacknowledged result.
      System.err.println("Exception while trying to bulk-insert " + batchSize + " records.");
      insertResult.complete(Status.ERROR);
    }
  };
  final MongoCollection<Document> collection =
      database.getCollection(table).withWriteConcern(writeConcern);
  final Document toInsert = new Document("_id", key);
  for (final Map.Entry<String, ByteIterator> entry : values.entrySet()) {
    toInsert.append(entry.getKey(), entry.getValue().toArray());
  }
  if (batchSize <= 1) {
    if (useUpsert) {
      collection.updateOne(eq(key), new Document("$setOnInsert", toInsert), upsertOptions, (r, ex) -> {
        if (ex != null) {
          System.err.println("Exception while trying to insert one record: " + ex);
          insertResult.complete(Status.ERROR);
        } else if (!r.wasAcknowledged()) {
          if (writeConcern.isAcknowledged()) {
            System.err.println("Upsert was not acknowledged for key " + key);
            insertResult.complete(Status.UNEXPECTED_STATE);
          } else {
            // An unacknowledged result has no upserted id to inspect.
            insertResult.complete(Status.OK);
          }
        } else if (r.getUpsertedId() == null || !r.getUpsertedId().isString()
            || !r.getUpsertedId().asString().getValue().equals(key)) {
          // getValue() yields the raw key; toString() on the BSON wrapper
          // yields a debug rendering that never equals it.
          insertResult.complete(Status.NOT_FOUND);
        } else {
          insertResult.complete(Status.OK);
        }
      });
    } else {
      collection.insertOne(toInsert, (r, ex) -> {
        if (ex != null) {
          System.err.println("Exception while trying to insert one record: " + ex);
          insertResult.complete(Status.ERROR);
        } else {
          insertResult.complete(Status.OK);
        }
      });
    }
    return insertResult;
  }
  if (useUpsert) {
    batchedUpserts.add(new UpdateOneModel<>(eq(key), new Document("$setOnInsert", toInsert), upsertOptions));
  } else {
    batchedWrites.add(toInsert);
  }
  batchedWriteCount += 1;
  if (batchedWriteCount < batchSize) {
    // Not enough records yet: report the record as buffered.
    insertResult.complete(Status.BATCHED_OK);
    return insertResult;
  }
  if (useUpsert) {
    collection.bulkWrite(batchedUpserts, bulkWriteOptions, (r, ex) -> bulkWriteExceptionHandler.accept(ex));
    batchedUpserts.clear();
  } else {
    collection.insertMany(batchedWrites, insertManyOptions, (r, ex) -> bulkWriteExceptionHandler.accept(ex));
    batchedWrites.clear();
  }
  batchedWriteCount = 0;
  return insertResult;
}
/**
 * Read a record from the database. Each field/value pair from the result will
 * be stored in a HashMap.
 * <p>
 * Binary field values (the form in which insert stores them) are returned as
 * their raw bytes; any other value, including the <code>_id</code>, is
 * returned as its string form.
 * </p>
 *
 * @param table The name of the table
 * @param key The record key of the record to read.
 * @param fields The list of fields to read, or null for all of them
 * @param result A HashMap of field/value pairs for the result; filled from
 *        the driver callback before the returned future completes
 * @return A future completed with {@link Status#OK} when the record was
 *         found, {@link Status#NOT_FOUND} when no document has this key, and
 *         {@link Status#ERROR} when the driver reported an exception.
 */
@Override
public final CompletableFuture<Status> read(final String table, final String key,
    final Set<String> fields, final Map<String, ByteIterator> result) {
  final CompletableFuture<Status> readResult = new CompletableFuture<>();
  final MongoCollection<Document> collection =
      database.getCollection(table).withReadPreference(readPreference);
  FindIterable<Document> findQuery = collection.find(eq(key)).limit(1).batchSize(1);
  if (fields != null) {
    findQuery = findQuery.projection(fields(include(new ArrayList<>(fields))));
  }
  findQuery.first((doc, ex) -> {
    if (ex != null) {
      System.err.println("Exception while trying to read key " + key + ": " + ex);
      readResult.complete(Status.ERROR);
    } else if (doc == null) {
      readResult.complete(Status.NOT_FOUND);
    } else {
      for (final Map.Entry<String, Object> field : doc.entrySet()) {
        final Object value = field.getValue();
        final ByteIterator converted;
        // A blind cast to String throws on binary fields, and an exception
        // in this callback would leave the future incomplete.
        if (value instanceof org.bson.types.Binary) {
          converted = new com.yahoo.ycsb.ByteArrayByteIterator(
              ((org.bson.types.Binary) value).getData());
        } else {
          converted = new StringByteIterator(String.valueOf(value));
        }
        result.put(field.getKey(), converted);
      }
      readResult.complete(Status.OK);
    }
  });
  return readResult;
}
/**
 * Perform a range scan for a set of records in the database. Each field/value
 * pair from the result will be stored in a HashMap.
 * <p>
 * Records are returned in ascending <code>_id</code> order starting at
 * <code>startkey</code>. Binary field values are returned as their raw bytes;
 * any other value, including the <code>_id</code>, is returned as its string
 * form.
 * </p>
 *
 * @param table The name of the table
 * @param startkey The record key of the first record to read.
 * @param recordcount The number of records to read
 * @param fields The list of fields to read, or null for all of them
 * @param result A Vector of HashMaps, where each HashMap is a set field/value
 *        pairs for one record
 * @return A future completed with {@link Status#OK} when at least
 *         <code>recordcount</code> records are in <code>result</code>,
 *         {@link Status#NOT_FOUND} when there are fewer, and
 *         {@link Status#ERROR} when the driver reported an exception.
 */
@Override
public final CompletableFuture<Status> scan(final String table, final String startkey,
    final int recordcount, final Set<String> fields,
    final Vector<HashMap<String, ByteIterator>> result) {
  final CompletableFuture<Status> scanResult = new CompletableFuture<>();
  final MongoCollection<Document> collection =
      database.getCollection(table).withReadPreference(readPreference);
  FindIterable<Document> scanQuery = collection.find(gte("_id", startkey))
      .limit(recordcount).batchSize(recordcount).sort(Sorts.ascending("_id"));
  if (fields != null) {
    scanQuery = scanQuery.projection(fields(include(new ArrayList<>(fields))));
  }
  result.ensureCapacity(recordcount);
  scanQuery.forEach(doc -> {
    final HashMap<String, ByteIterator> row = new HashMap<>();
    for (final Map.Entry<String, Object> field : doc.entrySet()) {
      final Object value = field.getValue();
      final ByteIterator converted;
      // A blind cast to String throws on the binary fields written by
      // insert, so convert by the value's actual type.
      if (value instanceof org.bson.types.Binary) {
        converted = new com.yahoo.ycsb.ByteArrayByteIterator(
            ((org.bson.types.Binary) value).getData());
      } else {
        converted = new StringByteIterator(String.valueOf(value));
      }
      row.put(field.getKey(), converted);
    }
    result.add(row);
  }, (r, ex) -> {
    if (ex != null) {
      System.err.println("Exception while trying to scan from key " + startkey
          + " " + recordcount + " records: " + ex);
      scanResult.complete(Status.ERROR);
    } else if (result.size() < recordcount) {
      System.err.println("Scan returned fewer records than expected: " + result.size()
          + " instead of " + recordcount + ".");
      scanResult.complete(Status.NOT_FOUND);
    } else {
      scanResult.complete(Status.OK);
    }
  });
  return scanResult;
}
/**
 * Update a record in the database. Any field/value pairs in the specified
 * values HashMap will be written into the record with the specified record
 * key, overwriting any existing values with the same field name. Field values
 * are stored as the byte arrays returned by {@link ByteIterator#toArray()},
 * the same form insert uses.
 *
 * @param table The name of the table
 * @param key The record key of the record to write.
 * @param values A HashMap of field/value pairs to update in the record
 * @return A future completed from the driver callback with
 *         {@link Status#OK} when a document matched the key (or when the
 *         write concern is unacknowledged, in which case no count is
 *         available), {@link Status#NOT_FOUND} when nothing matched,
 *         {@link Status#UNEXPECTED_STATE} when an acknowledged write concern
 *         produced an unacknowledged result, and {@link Status#ERROR} when
 *         the driver reported an exception.
 */
@Override
public final CompletableFuture<Status> update(final String table, final String key,
    final Map<String, ByteIterator> values) {
  final CompletableFuture<Status> updateResult = new CompletableFuture<>();
  final MongoCollection<Document> collection =
      database.getCollection(table).withWriteConcern(writeConcern);
  final Document toUpdate = new Document();
  for (final Map.Entry<String, ByteIterator> entry : values.entrySet()) {
    // Store the bytes, not the iterator object: the driver has no way to
    // encode a ByteIterator.
    toUpdate.append(entry.getKey(), entry.getValue().toArray());
  }
  collection.updateOne(eq(key), new Document("$set", toUpdate), (r, ex) -> {
    if (ex != null) {
      System.err.println("Exception while trying to update one record: " + ex);
      updateResult.complete(Status.ERROR);
    } else if (!r.wasAcknowledged()) {
      if (writeConcern.isAcknowledged()) {
        System.err.println("Update was not acknowledged for key " + key);
        updateResult.complete(Status.UNEXPECTED_STATE);
      } else {
        // An unacknowledged result carries no counts; asking for them
        // throws, which would leave the future incomplete forever.
        updateResult.complete(Status.OK);
      }
    } else if (r.getMatchedCount() < 1) {
      updateResult.complete(Status.NOT_FOUND);
    } else {
      updateResult.complete(Status.OK);
    }
  });
  return updateResult;
}
}

Event Timeline