diff --git a/mongodb/README.md b/mongodb/README.md index f6dd8f4c..7340df4c 100644 --- a/mongodb/README.md +++ b/mongodb/README.md @@ -1,128 +1,129 @@ ## Quick Start This section describes how to run YCSB on MongoDB. ### 1. Start MongoDB First, download MongoDB and start `mongod`. For example, to start MongoDB on x86-64 Linux box: wget http://fastdl.mongodb.org/linux/mongodb-linux-x86_64-x.x.x.tgz tar xfvz mongodb-linux-x86_64-*.tgz mkdir /tmp/mongodb cd mongodb-linux-x86_64-* ./bin/mongod --dbpath /tmp/mongodb Replace x.x.x above with the latest stable release version for MongoDB. See http://docs.mongodb.org/manual/installation/ for installation steps for various operating systems. ### 2. Install Java and Maven Go to http://www.oracle.com/technetwork/java/javase/downloads/index.html and get the url to download the rpm into your server. For example: wget http://download.oracle.com/otn-pub/java/jdk/7u40-b43/jdk-7u40-linux-x64.rpm?AuthParam=11232426132 -o jdk-7u40-linux-x64.rpm rpm -Uvh jdk-7u40-linux-x64.rpm Or install via yum/apt-get sudo yum install java-devel Download MVN from http://maven.apache.org/download.cgi wget http://ftp.heanet.ie/mirrors/www.apache.org/dist/maven/maven-3/3.1.1/binaries/apache-maven-3.1.1-bin.tar.gz sudo tar xzf apache-maven-*-bin.tar.gz -C /usr/local cd /usr/local sudo ln -s apache-maven-* maven sudo vi /etc/profile.d/maven.sh Add the following to `maven.sh` export M2_HOME=/usr/local/maven export PATH=${M2_HOME}/bin:${PATH} Reload bash and test mvn bash mvn -version ### 3. Set Up YCSB Download the YCSB zip file and compile: git clone git://github.com/brianfrankcooper/YCSB.git cd YCSB mvn -pl com.yahoo.ycsb:core,com.yahoo.ycsb:mongodb-binding clean package ### 4. Run YCSB Now you are ready to run! First, use the asynchronous driver to load the data: ./bin/ycsb load mongodb-async -s -P workloads/workloada > outputLoad.txt Then, run the workload: ./bin/ycsb run mongodb-async -s -P workloads/workloada > outputRun.txt Similarly, to use the synchronous driver from MongoDB Inc. we load the data: ./bin/ycsb load mongodb -s -P workloads/workloada > outputLoad.txt Then, run the workload: ./bin/ycsb run mongodb -s -P workloads/workloada > outputRun.txt See the next section for the list of configuration parameters for MongoDB. ## MongoDB Configuration Parameters - `mongodb.url` - This should be a MongoDB URI or connection string. - See http://docs.mongodb.org/manual/reference/connection-string/ for the standard options. - For the complete set of options for the asynchronous driver see: - http://www.allanbank.com/mongodb-async-driver/apidocs/index.html?com/allanbank/mongodb/MongoDbUri.html - For the complete set of options for the synchronous driver see: - http://api.mongodb.org/java/current/index.html?com/mongodb/MongoClientURI.html - Default value is `mongodb://localhost:27017/ycsb?w=1` + - Default value of database is `ycsb` - `mongodb.batchsize` - Useful for the insert workload as it will submit the inserts in batches inproving throughput. - Default value is `1`. - `mongodb.writeConcern` - **Deprecated** - Use the `w` and `journal` options on the MongoDB URI provided by the `mongodb.uri`. - Allowed values are : - `errors_ignored` - `unacknowledged` - `acknowledged` - `journaled` - `replica_acknowledged` - `majority` - Default value is `acknowledged`. - `mongodb.readPreference` - **Deprecated** - Use the `readPreference` options on the MongoDB URI provided by the `mongodb.uri`. - Allowed values are : - `primary` - `primary_preferred` - `secondary` - `secondary_preferred` - `nearest` - Default value is `primary`. - `mongodb.maxconnections` - **Deprecated** - Use the `maxPoolSize` options on the MongoDB URI provided by the `mongodb.uri`. - Default value is `100`. - `mongodb.threadsAllowedToBlockForConnectionMultiplier` - **Deprecated** - Use the `waitQueueMultiple` options on the MongoDB URI provided by the `mongodb.uri`. - Default value is `5`. For example: ./bin/ycsb load mongodb-async -s -P workloads/workloada -p mongodb.url=mongodb://localhost:27017/ycsb?w=0 To run with the synchronous driver from MongoDB Inc.: ./bin/ycsb load mongodb -s -P workloads/workloada -p mongodb.url=mongodb://localhost:27017/ycsb?w=0 diff --git a/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java b/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java index c57ff518..238a824d 100644 --- a/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java +++ b/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java @@ -1,546 +1,546 @@ /** * Copyright (c) 2014, Yahoo!, Inc. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb.db; import static com.allanbank.mongodb.builder.QueryBuilder.where; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.Vector; import java.util.concurrent.atomic.AtomicInteger; import com.allanbank.mongodb.Durability; import com.allanbank.mongodb.LockType; import com.allanbank.mongodb.MongoClient; import com.allanbank.mongodb.MongoClientConfiguration; import com.allanbank.mongodb.MongoCollection; import com.allanbank.mongodb.MongoDatabase; import com.allanbank.mongodb.MongoDbUri; import com.allanbank.mongodb.MongoFactory; import com.allanbank.mongodb.MongoIterator; import com.allanbank.mongodb.ReadPreference; import com.allanbank.mongodb.bson.Document; import com.allanbank.mongodb.bson.Element; import com.allanbank.mongodb.bson.ElementType; import com.allanbank.mongodb.bson.builder.BuilderFactory; import com.allanbank.mongodb.bson.builder.DocumentBuilder; import com.allanbank.mongodb.bson.element.BinaryElement; import com.allanbank.mongodb.builder.BatchedWrite; import com.allanbank.mongodb.builder.BatchedWriteMode; import com.allanbank.mongodb.builder.Find; import com.allanbank.mongodb.builder.Sort; import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.DB; import com.yahoo.ycsb.DBException; /** * MongoDB asynchronous client for YCSB framework. * * Properties to set: * - * mongodb.url=mongodb://localhost:27017 mongodb.database=ycsb + * mongodb.url=mongodb://localhost:27017 * mongodb.writeConcern=normal * * @author rjm */ public class AsyncMongoDbClient extends DB { /** Used to include a field in a response. */ protected static final int INCLUDE = 1; /** The database to use. */ private static String databaseName; /** Thread local document builder. */ private static final ThreadLocal DOCUMENT_BUILDER = new ThreadLocal() { @Override protected DocumentBuilder initialValue() { return BuilderFactory.start(); } }; /** The write concern for the requests. */ private static final AtomicInteger initCount = new AtomicInteger(0); /** The connection to MongoDB. */ private static MongoClient mongoClient; /** The write concern for the requests. */ private static Durability writeConcern; /** Which servers to use for reads. */ private static ReadPreference readPreference; /** The database to MongoDB. */ private MongoDatabase database; /** The batch size to use for inserts. */ private static int batchSize; /** The bulk inserts pending for the thread. */ private final BatchedWrite.Builder batchedWrite = BatchedWrite.builder() .mode(BatchedWriteMode.REORDERED); /** The number of writes in the batchedWrite. */ private int batchedWriteCount = 0; /** * Cleanup any state for this DB. Called once per DB instance; there is one * DB instance per client thread. */ @Override public final void cleanup() throws DBException { if (initCount.decrementAndGet() == 0) { try { mongoClient.close(); } catch (final Exception e1) { System.err.println("Could not close MongoDB connection pool: " + e1.toString()); e1.printStackTrace(); return; } finally { mongoClient = null; database = null; } } } /** * Delete a record from the database. * * @param table * The name of the table * @param key * The record key of the record to delete. * @return Zero on success, a non-zero error code on error. See this class's * description for a discussion of error codes. */ @Override public final int delete(final String table, final String key) { try { final MongoCollection collection = database.getCollection(table); final Document q = BuilderFactory.start().add("_id", key).build(); final long res = collection.delete(q, writeConcern); if (res == 0) { System.err.println("Nothing deleted for key " + key); return 1; } return 0; } catch (final Exception e) { System.err.println(e.toString()); return 1; } } /** * Initialize any state for this DB. Called once per DB instance; there is * one DB instance per client thread. */ @Override public final void init() throws DBException { final int count = initCount.incrementAndGet(); synchronized (AsyncMongoDbClient.class) { final Properties props = getProperties(); if (mongoClient != null) { database = mongoClient.getDatabase(databaseName); // If there are more threads (count) than connections then the // Low latency spin lock is not really needed as we will keep // the connections occupied. if (count > mongoClient.getConfig().getMaxConnectionCount()) { mongoClient.getConfig().setLockType(LockType.MUTEX); } return; } // Set insert batchsize, default 1 - to be YCSB-original equivalent batchSize = Integer.parseInt(props.getProperty("mongodb.batchsize", "1")); // Just use the standard connection format URL - // http://docs.mongodatabase.org/manual/reference/connection-string/ + // http://docs.mongodb.org/manual/reference/connection-string/ // to configure the client. String url = props.getProperty("mongodb.url", "mongodb://localhost:27017/ycsb?w=1"); if (!url.startsWith("mongodb://")) { System.err .println("ERROR: Invalid URL: '" + url + "'. Must be of the form " + "'mongodb://:,:/database?options'. " + "See http://docs.mongodb.org/manual/reference/connection-string/."); System.exit(1); } MongoDbUri uri = new MongoDbUri(url); try { databaseName = uri.getDatabase(); if ((databaseName == null) || databaseName.isEmpty()) { System.err .println("ERROR: Invalid URL: '" + url + "'. Must provide a database name with the URI. " + "'mongodb://:,:/database"); System.exit(1); } mongoClient = MongoFactory.createClient(uri); MongoClientConfiguration config = mongoClient.getConfig(); if (!url.toLowerCase().contains("locktype=")) { config.setLockType(LockType.LOW_LATENCY_SPIN); // assumed... } readPreference = config.getDefaultReadPreference(); writeConcern = config.getDefaultDurability(); database = mongoClient.getDatabase(databaseName); System.out.println("mongo connection created with " + url); } catch (final Exception e1) { System.err .println("Could not initialize MongoDB connection pool for Loader: " + e1.toString()); e1.printStackTrace(); return; } } } /** * Insert a record in the database. Any field/value pairs in the specified * values HashMap will be written into the record with the specified record * key. * * @param table * The name of the table * @param key * The record key of the record to insert. * @param values * A HashMap of field/value pairs to insert in the record * @return Zero on success, a non-zero error code on error. See the * {@link DB} class's description for a discussion of error codes. */ @Override public final int insert(final String table, final String key, final HashMap values) { try { final MongoCollection collection = database.getCollection(table); final DocumentBuilder toInsert = DOCUMENT_BUILDER.get().reset() .add("_id", key); final Document query = toInsert.build(); for (final Map.Entry entry : values .entrySet()) { toInsert.add(entry.getKey(), entry.getValue().toArray()); } // Do an upsert. if (batchSize <= 1) { long result = collection.update(query, toInsert, /* multi= */false, /* upsert= */true, writeConcern); return result == 1 ? 0 : 1; } // Use a bulk insert. try { batchedWrite.insert(toInsert); batchedWriteCount += 1; if (batchedWriteCount < batchSize) { return 0; } long count = collection.write(batchedWrite); if (count == batchedWriteCount) { batchedWrite.reset().mode(BatchedWriteMode.REORDERED); batchedWriteCount = 0; return 0; } System.err .println("Number of inserted documents doesn't match the number sent, " + count + " inserted, sent " + batchedWriteCount); batchedWrite.reset().mode(BatchedWriteMode.REORDERED); batchedWriteCount = 0; return 1; } catch (Exception e) { System.err.println("Exception while trying bulk insert with " + batchedWriteCount); e.printStackTrace(); return 1; } } catch (final Exception e) { e.printStackTrace(); return 1; } } /** * Read a record from the database. Each field/value pair from the result * will be stored in a HashMap. * * @param table * The name of the table * @param key * The record key of the record to read. * @param fields * The list of fields to read, or null for all of them * @param result * A HashMap of field/value pairs for the result * @return Zero on success, a non-zero error code on error or "not found". */ @Override public final int read(final String table, final String key, final Set fields, final HashMap result) { try { final MongoCollection collection = database.getCollection(table); final DocumentBuilder query = DOCUMENT_BUILDER.get().reset() .add("_id", key); Document queryResult = null; if (fields != null) { final DocumentBuilder fieldsToReturn = BuilderFactory.start(); final Iterator iter = fields.iterator(); while (iter.hasNext()) { fieldsToReturn.add(iter.next(), 1); } final Find.Builder fb = new Find.Builder(query); fb.projection(fieldsToReturn); fb.setLimit(1); fb.setBatchSize(1); fb.readPreference(readPreference); final MongoIterator ci = collection.find(fb.build()); if (ci.hasNext()) { queryResult = ci.next(); ci.close(); } } else { queryResult = collection.findOne(query); } if (queryResult != null) { fillMap(result, queryResult); } return queryResult != null ? 0 : 1; } catch (final Exception e) { System.err.println(e.toString()); return 1; } } /** * Perform a range scan for a set of records in the database. Each * field/value pair from the result will be stored in a HashMap. * * @param table * The name of the table * @param startkey * The record key of the first record to read. * @param recordcount * The number of records to read * @param fields * The list of fields to read, or null for all of them * @param result * A Vector of HashMaps, where each HashMap is a set field/value * pairs for one record * @return Zero on success, a non-zero error code on error. See the * {@link DB} class's description for a discussion of error codes. */ @Override public final int scan(final String table, final String startkey, final int recordcount, final Set fields, final Vector> result) { try { final MongoCollection collection = database.getCollection(table); final Find.Builder find = Find.builder() .query(where("_id").greaterThanOrEqualTo(startkey)) .limit(recordcount).batchSize(recordcount) .sort(Sort.asc("_id")).readPreference(readPreference); if (fields != null) { final DocumentBuilder fieldsDoc = BuilderFactory.start(); for (final String field : fields) { fieldsDoc.add(field, INCLUDE); } find.projection(fieldsDoc); } result.ensureCapacity(recordcount); final MongoIterator cursor = collection.find(find); if (!cursor.hasNext()) { System.err.println("Nothing found in scan for key " + startkey); return 1; } while (cursor.hasNext()) { // toMap() returns a Map but result.add() expects a // Map. Hence, the suppress warnings. final Document doc = cursor.next(); final HashMap docAsMap = new HashMap(); fillMap(docAsMap, doc); result.add(docAsMap); } return 0; } catch (final Exception e) { System.err.println(e.toString()); return 1; } } /** * Update a record in the database. Any field/value pairs in the specified * values HashMap will be written into the record with the specified record * key, overwriting any existing values with the same field name. * * @param table * The name of the table * @param key * The record key of the record to write. * @param values * A HashMap of field/value pairs to update in the record * @return Zero on success, a non-zero error code on error. See the * {@link DB} class's description for a discussion of error codes. */ @Override public final int update(final String table, final String key, final HashMap values) { try { final MongoCollection collection = database.getCollection(table); final DocumentBuilder query = BuilderFactory.start() .add("_id", key); final DocumentBuilder update = BuilderFactory.start(); final DocumentBuilder fieldsToSet = update.push("$set"); for (final Map.Entry entry : values .entrySet()) { fieldsToSet.add(entry.getKey(), entry.getValue().toArray()); } final long res = collection.update(query, update, false, false, writeConcern); return res == 1 ? 0 : 1; } catch (final Exception e) { System.err.println(e.toString()); return 1; } } /** * Fills the map with the ByteIterators from the document. * * @param result * The map to fill. * @param queryResult * The document to fill from. */ protected final void fillMap(final HashMap result, final Document queryResult) { for (final Element be : queryResult) { if (be.getType() == ElementType.BINARY) { result.put(be.getName(), new BinaryByteArrayIterator( (BinaryElement) be)); } } } /** * BinaryByteArrayIterator provides an adapter from a {@link BinaryElement} * to a {@link ByteIterator}. */ private final static class BinaryByteArrayIterator extends ByteIterator { /** The binary data. */ private final BinaryElement binaryElement; /** The current offset into the binary element. */ private int offset; /** * Creates a new BinaryByteArrayIterator. * * @param element * The {@link BinaryElement} to iterate over. */ public BinaryByteArrayIterator(final BinaryElement element) { this.binaryElement = element; this.offset = 0; } /** * {@inheritDoc} *

* Overridden to return the number of bytes remaining in the iterator. *

*/ @Override public long bytesLeft() { return Math.max(0, binaryElement.length() - offset); } /** * {@inheritDoc} *

* Overridden to return true if there is more data in the * {@link BinaryElement}. *

*/ @Override public boolean hasNext() { return (offset < binaryElement.length()); } /** * {@inheritDoc} *

* Overridden to return the next value and advance the iterator. *

*/ @Override public byte nextByte() { final byte value = binaryElement.get(offset); offset += 1; return value; } } } diff --git a/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java b/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java index a2e50b32..343a2e5e 100644 --- a/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java +++ b/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java @@ -1,480 +1,471 @@ /** * MongoDB client binding for YCSB. * * Submitted by Yen Pai on 5/11/2010. * * https://gist.github.com/000a66b8db2caf42467b#file_mongo_database.java * */ package com.yahoo.ycsb.db; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.Vector; import java.util.concurrent.atomic.AtomicInteger; import org.bson.Document; import org.bson.types.Binary; import com.mongodb.MongoClient; import com.mongodb.MongoClientURI; import com.mongodb.ReadPreference; import com.mongodb.WriteConcern; import com.mongodb.bulk.BulkWriteResult; import com.mongodb.client.FindIterable; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.BulkWriteOptions; import com.mongodb.client.model.InsertOneModel; import com.mongodb.client.model.UpdateOptions; import com.mongodb.client.result.DeleteResult; import com.mongodb.client.result.UpdateResult; import com.yahoo.ycsb.ByteArrayByteIterator; import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.DB; import com.yahoo.ycsb.DBException; /** * MongoDB client for YCSB framework. * * Properties to set: * - * mongodatabase.url=mongodb://localhost:27017 mongodatabase.database=ycsb - * mongodatabase.writeConcern=acknowledged + * mongodb.url=mongodb://localhost:27017 + * mongodb.writeConcern=acknowledged * * @author ypai */ public class MongoDbClient extends DB { /** Update options to do an upsert. */ private static final UpdateOptions UPSERT = new UpdateOptions() .upsert(true); /** Used to include a field in a response. */ protected static final Integer INCLUDE = Integer.valueOf(1); /** The database name to access. */ private static String databaseName; /** The database name to access. */ private static MongoDatabase database; /** * Count the number of times initialized to teardown on the last * {@link #cleanup()}. */ private static final AtomicInteger initCount = new AtomicInteger(0); /** A singleton Mongo instance. */ private static MongoClient mongoClient; /** The default read preference for the test */ private static ReadPreference readPreference; /** The default write concern for the test. */ private static WriteConcern writeConcern; /** The batch size to use for inserts. */ private static int batchSize; /** The bulk inserts pending for the thread. */ private final List> bulkInserts = new ArrayList>(); /** * Cleanup any state for this DB. Called once per DB instance; there is one * DB instance per client thread. */ @Override public void cleanup() throws DBException { if (initCount.decrementAndGet() == 0) { try { mongoClient.close(); } catch (Exception e1) { System.err.println("Could not close MongoDB connection pool: " + e1.toString()); e1.printStackTrace(); return; } finally { database = null; mongoClient = null; } } } /** * Delete a record from the database. * * @param table * The name of the table * @param key * The record key of the record to delete. * @return Zero on success, a non-zero error code on error. See the * {@link DB} class's description for a discussion of error codes. */ @Override public int delete(String table, String key) { try { MongoCollection collection = database .getCollection(table); Document query = new Document("_id", key); DeleteResult result = collection.withWriteConcern(writeConcern) .deleteOne(query); if (result.wasAcknowledged() && result.getDeletedCount() == 0) { System.err.println("Nothing deleted for key " + key); return 1; } return 0; } catch (Exception e) { System.err.println(e.toString()); return 1; } } /** * Initialize any state for this DB. Called once per DB instance; there is * one DB instance per client thread. */ @Override public void init() throws DBException { initCount.incrementAndGet(); synchronized (INCLUDE) { if (mongoClient != null) { return; } Properties props = getProperties(); // Set insert batchsize, default 1 - to be YCSB-original equivalent batchSize = Integer.parseInt(props.getProperty("batchsize", "1")); // Just use the standard connection format URL - // http://docs.mongodatabase.org/manual/reference/connection-string/ + // http://docs.mongodb.org/manual/reference/connection-string/ // to configure the client. - // - // Support legacy options by updating the URL as appropriate. - String url = props.getProperty("mongodatabase.url", null); + String url = props.getProperty("mongodb.url", null); boolean defaultedUrl = false; if (url == null) { defaultedUrl = true; url = "mongodb://localhost:27017/ycsb?w=1"; } url = OptionsSupport.updateUrl(url, props); if (!url.startsWith("mongodb://")) { System.err .println("ERROR: Invalid URL: '" + url + "'. Must be of the form " + "'mongodb://:,:/database?options'. " - + "See http://docs.mongodatabase.org/manual/reference/connection-string/."); + + "http://docs.mongodb.org/manual/reference/connection-string/"); System.exit(1); } try { MongoClientURI uri = new MongoClientURI(url); String uriDb = uri.getDatabase(); if (!defaultedUrl && (uriDb != null) && !uriDb.isEmpty() && !"admin".equals(uriDb)) { databaseName = uriDb; } else { - databaseName = props.getProperty("mongodatabase.database", - "ycsb"); - } + //If no database is specified in URI, use "ycsb" + databaseName = "ycsb"; - if ((databaseName == null) || databaseName.isEmpty()) { - System.err - .println("ERROR: Invalid URL: '" - + url - + "'. Must provide a database name with the URI. " - + "'mongodb://:,:/database"); - System.exit(1); } + readPreference = uri.getOptions().getReadPreference(); writeConcern = uri.getOptions().getWriteConcern(); mongoClient = new MongoClient(uri); database = mongoClient.getDatabase(databaseName); System.out.println("mongo client connection created with " + url); } catch (Exception e1) { System.err .println("Could not initialize MongoDB connection pool for Loader: " + e1.toString()); e1.printStackTrace(); return; } } } /** * Insert a record in the database. Any field/value pairs in the specified * values HashMap will be written into the record with the specified record * key. * * @param table * The name of the table * @param key * The record key of the record to insert. * @param values * A HashMap of field/value pairs to insert in the record * @return Zero on success, a non-zero error code on error. See the * {@link DB} class's description for a discussion of error codes. */ @Override public int insert(String table, String key, HashMap values) { try { MongoCollection collection = database .getCollection(table); Document criteria = new Document("_id", key); Document toInsert = new Document("_id", key); for (Map.Entry entry : values.entrySet()) { toInsert.put(entry.getKey(), entry.getValue().toArray()); } // Do a single upsert. if (batchSize <= 1) { UpdateResult result = collection.withWriteConcern(writeConcern) .replaceOne(criteria, toInsert, UPSERT); if (!result.wasAcknowledged() || result.getMatchedCount() > 0 || (result.isModifiedCountAvailable() && (result .getModifiedCount() > 0)) || result.getUpsertedId() != null) { return 0; } System.err.println("Nothing inserted for key " + key); return 1; } // Use a bulk insert. try { bulkInserts.add(new InsertOneModel(toInsert)); if (bulkInserts.size() < batchSize) { return 0; } BulkWriteResult result = collection.withWriteConcern( writeConcern).bulkWrite(bulkInserts, new BulkWriteOptions().ordered(false)); if (!result.wasAcknowledged() || result.getInsertedCount() == bulkInserts.size()) { bulkInserts.clear(); return 0; } System.err .println("Number of inserted documents doesn't match the number sent, " + result.getInsertedCount() + " inserted, sent " + bulkInserts.size()); bulkInserts.clear(); return 1; } catch (Exception e) { System.err.println("Exception while trying bulk insert with " + bulkInserts.size()); e.printStackTrace(); return 1; } } catch (Exception e) { e.printStackTrace(); return 1; } } /** * Read a record from the database. Each field/value pair from the result * will be stored in a HashMap. * * @param table * The name of the table * @param key * The record key of the record to read. * @param fields * The list of fields to read, or null for all of them * @param result * A HashMap of field/value pairs for the result * @return Zero on success, a non-zero error code on error or "not found". */ @Override public int read(String table, String key, Set fields, HashMap result) { try { MongoCollection collection = database .getCollection(table); Document query = new Document("_id", key); Document fieldsToReturn = new Document(); Document queryResult = null; if (fields != null) { Iterator iter = fields.iterator(); while (iter.hasNext()) { fieldsToReturn.put(iter.next(), INCLUDE); } queryResult = collection.withReadPreference(readPreference) .find(query).projection(fieldsToReturn).first(); } else { queryResult = collection.withReadPreference(readPreference) .find(query).first(); } if (queryResult != null) { fillMap(result, queryResult); } return queryResult != null ? 0 : 1; } catch (Exception e) { System.err.println(e.toString()); return 1; } } /** * Perform a range scan for a set of records in the database. Each * field/value pair from the result will be stored in a HashMap. * * @param table * The name of the table * @param startkey * The record key of the first record to read. * @param recordcount * The number of records to read * @param fields * The list of fields to read, or null for all of them * @param result * A Vector of HashMaps, where each HashMap is a set field/value * pairs for one record * @return Zero on success, a non-zero error code on error. See the * {@link DB} class's description for a discussion of error codes. */ @Override public int scan(String table, String startkey, int recordcount, Set fields, Vector> result) { FindIterable cursor = null; MongoCursor iter = null; try { MongoCollection collection = database .getCollection(table); Document scanRange = new Document("$gte", startkey); Document query = new Document("_id", scanRange); Document sort = new Document("_id", INCLUDE); Document projection = null; if (fields != null) { projection = new Document(); for (String fieldName : fields) { projection.put(fieldName, INCLUDE); } } cursor = collection.withReadPreference(readPreference).find(query) .projection(projection).sort(sort).limit(recordcount); // Do the query. iter = cursor.iterator(); if (!iter.hasNext()) { System.err.println("Nothing found in scan for key " + startkey); return 1; } while (iter.hasNext()) { HashMap resultMap = new HashMap(); Document obj = iter.next(); fillMap(resultMap, obj); result.add(resultMap); } return 0; } catch (Exception e) { System.err.println(e.toString()); return 1; } finally { if (iter != null) { iter.close(); } } } /** * Update a record in the database. Any field/value pairs in the specified * values HashMap will be written into the record with the specified record * key, overwriting any existing values with the same field name. * * @param table * The name of the table * @param key * The record key of the record to write. * @param values * A HashMap of field/value pairs to update in the record * @return Zero on success, a non-zero error code on error. See this class's * description for a discussion of error codes. */ @Override public int update(String table, String key, HashMap values) { try { MongoCollection collection = database .getCollection(table); Document query = new Document("_id", key); Document fieldsToSet = new Document(); for (Map.Entry entry : values.entrySet()) { fieldsToSet.put(entry.getKey(), entry.getValue().toArray()); } Document update = new Document("$set", fieldsToSet); UpdateResult result = collection.withWriteConcern(writeConcern) .updateOne(query, update); if (result.wasAcknowledged() && result.getMatchedCount() == 0) { System.err.println("Nothing updated for key " + key); return 1; } return 0; } catch (Exception e) { System.err.println(e.toString()); return 1; } } /** * Fills the map with the values from the DBObject. * * @param resultMap * The map to fill/ * @param obj * The object to copy values from. */ protected void fillMap(HashMap resultMap, Document obj) { for (Map.Entry entry : obj.entrySet()) { if (entry.getValue() instanceof Binary) { resultMap.put(entry.getKey(), new ByteArrayByteIterator( ((Binary) entry.getValue()).getData())); } } } }