diff --git a/mongodb/README.md b/mongodb/README.md index 1fed1348..dd139c42 100644 --- a/mongodb/README.md +++ b/mongodb/README.md @@ -1,46 +1,46 @@ ## Quick Start This section describes how to run YCSB on MongoDB running locally. ### 1. Start MongoDB First, download MongoDB and start `mongod`. For example, to start MongoDB on x86-64 Linux box: wget http://fastdl.mongodb.org/linux/mongodb-linux-x86_64-1.8.3.tgz tar xfvz mongodb-linux-x86_64-1.8.3.tgz mkdir /tmp/mongodb cd mongodb-linux-x86_64-1.8.3 ./bin/mongod --dbpath /tmp/mongodb ### 2. Set Up YCSB Clone the YCSB git repository and compile: git clone git://github.com/brianfrankcooper/YCSB.git cd YCSB mvn clean package ### 3. Run YCSB Now you are ready to run! First, load the data: ./bin/ycsb load mongodb -s -P workloads/workloada Then, run the workload: ./bin/ycsb run mongodb -s -P workloads/workloada See the next section for the list of configuration parameters for MongoDB. ## MongoDB Configuration Parameters ### `mongodb.url` (default: `mongodb://localhost:27017`) ### `mongodb.database` (default: `ycsb`) -### `mongodb.writeConcern` (default `safe`) +### `mongodb.writeConcern` (default `acknowledged`, options are `errors_ignored`, `unacknowledged`, `acknowledged`, `journaled`, `replica_acknowledged`) ### `mongodb.maxconnections` (default `10`) ### `mongodb.threadsAllowedToBlockForConnectionMultiplier` (default `5`) diff --git a/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java b/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java index 645b233e..d83fd43e 100644 --- a/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java +++ b/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java @@ -1,464 +1,463 @@ /** * MongoDB client binding for YCSB using the Asynchronous Java Driver. * * Submitted by Rob Moore. */ package com.yahoo.ycsb.db; import static com.allanbank.mongodb.builder.QueryBuilder.where; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.Vector; import java.util.concurrent.atomic.AtomicInteger; import com.allanbank.mongodb.Durability; import com.allanbank.mongodb.LockType; import com.allanbank.mongodb.MongoClient; import com.allanbank.mongodb.MongoCollection; import com.allanbank.mongodb.MongoDatabase; import com.allanbank.mongodb.MongoDbUri; import com.allanbank.mongodb.MongoFactory; import com.allanbank.mongodb.MongoIterator; import com.allanbank.mongodb.bson.Document; import com.allanbank.mongodb.bson.Element; import com.allanbank.mongodb.bson.ElementType; import com.allanbank.mongodb.bson.builder.BuilderFactory; import com.allanbank.mongodb.bson.builder.DocumentBuilder; import com.allanbank.mongodb.bson.element.BinaryElement; import com.allanbank.mongodb.builder.Find; import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.DB; import com.yahoo.ycsb.DBException; /** * MongoDB asynchronous client for YCSB framework. * * Properties to set: * * mongodb.url=mongodb://localhost:27017 mongodb.database=ycsb * mongodb.writeConcern=normal * * @author rjm * */ public class AsyncMongoDbClient extends DB { /** The database to use. */ private static String database; /** Thread local document builder. */ private static final ThreadLocal DOCUMENT_BUILDER = new ThreadLocal() { @Override protected DocumentBuilder initialValue() { return BuilderFactory.start(); } }; /** The write concern for the requests. */ private static final AtomicInteger initCount = new AtomicInteger(0); /** The connection to MongoDB. */ private static MongoClient mongo; /** The write concern for the requests. */ private static Durability writeConcern; /** The database to MongoDB. */ private MongoDatabase db; /** * Cleanup any state for this DB. Called once per DB instance; there is one * DB instance per client thread. */ @Override public final void cleanup() throws DBException { if (initCount.decrementAndGet() <= 0) { try { mongo.close(); } catch (final Exception e1) { System.err.println("Could not close MongoDB connection pool: " + e1.toString()); e1.printStackTrace(); return; } } } /** * Delete a record from the database. * * @param table * The name of the table * @param key * The record key of the record to delete. * @return Zero on success, a non-zero error code on error. See this class's * description for a discussion of error codes. */ @Override public final int delete(final String table, final String key) { try { final MongoCollection collection = db.getCollection(table); final Document q = BuilderFactory.start().add("_id", key).build(); final long res = collection.delete(q, writeConcern); return res == 1 ? 0 : 1; } catch (final Exception e) { System.err.println(e.toString()); return 1; } } /** * Initialize any state for this DB. Called once per DB instance; there is * one DB instance per client thread. */ @Override public final void init() throws DBException { final int count = initCount.incrementAndGet(); final Properties props = getProperties(); final String maxConnections = props.getProperty( "mongodb.maxconnections", "10"); final int connections = Integer.parseInt(maxConnections); synchronized (AsyncMongoDbClient.class) { if (mongo != null) { db = mongo.getDatabase(database); // If there are more threads (count) than connections then the // Low latency spin lock is not really needed as we will keep // the connections occupied. if (count > connections) { mongo.getConfig().setLockType(LockType.MUTEX); } return; } // initialize MongoDb driver String url = props.getProperty("mongodb.url", "mongodb://localhost:27017"); database = props.getProperty("mongodb.database", "ycsb"); - final String writeConcernType = props.getProperty( - "mongodb.writeConcern", - props.getProperty("mongodb.durability", "safe")) + String writeConcernType = props.getProperty("mongodb.writeConcern", + props.getProperty("mongodb.durability", "acknowledged")) .toLowerCase(); - if ("none".equals(writeConcernType)) { + if ("errors_ignored".equals(writeConcernType)) { writeConcern = Durability.NONE; } - else if ("safe".equals(writeConcernType)) { - writeConcern = Durability.ACK; + else if ("unacknowledged".equals(writeConcernType)) { + writeConcern = Durability.NONE; } - else if ("normal".equals(writeConcernType)) { + else if ("acknowledged".equals(writeConcernType)) { writeConcern = Durability.ACK; } - else if ("fsync_safe".equals(writeConcernType)) { - writeConcern = Durability.fsyncDurable(10000); + else if ("journaled".equals(writeConcernType)) { + writeConcern = Durability.journalDurable(0); } - else if ("replicas_safe".equals(writeConcernType)) { - writeConcern = Durability.replicaDurable(10000); + else if ("replica_acknowledged".equals(writeConcernType)) { + writeConcern = Durability.replicaDurable(2, 0); } else { System.err - .println("ERROR: Invalid durability: '" + .println("ERROR: Invalid writeConcern: '" + writeConcernType + "'. " - + "Must be [ none | safe | normal | fsync_safe | replicas_safe ]"); + + "Must be [ errors_ignored | unacknowledged | acknowledged | journaled | replica_acknowledged ]"); System.exit(1); } try { // need to append db to url. url += "/" + database; System.out.println("new database url = " + url); mongo = MongoFactory.createClient(new MongoDbUri(url)); mongo.getConfig().setMaxConnectionCount(connections); mongo.getConfig().setLockType(LockType.LOW_LATENCY_SPIN); // assumed... db = mongo.getDatabase(database); System.out.println("mongo connection created with " + url); } catch (final Exception e1) { System.err .println("Could not initialize MongoDB connection pool for Loader: " + e1.toString()); e1.printStackTrace(); return; } } } /** * Insert a record in the database. Any field/value pairs in the specified * values HashMap will be written into the record with the specified record * key. * * @param table * The name of the table * @param key * The record key of the record to insert. * @param values * A HashMap of field/value pairs to insert in the record * @return Zero on success, a non-zero error code on error. See this class's * description for a discussion of error codes. */ @Override public final int insert(final String table, final String key, final HashMap values) { try { final MongoCollection collection = db.getCollection(table); final DocumentBuilder r = DOCUMENT_BUILDER.get().reset() .add("_id", key); for (final Map.Entry entry : values .entrySet()) { r.add(entry.getKey(), entry.getValue().toArray()); } collection.insert(writeConcern, r); return 0; } catch (final Exception e) { e.printStackTrace(); return 1; } } /** * Read a record from the database. Each field/value pair from the result * will be stored in a HashMap. * * @param table * The name of the table * @param key * The record key of the record to read. * @param fields * The list of fields to read, or null for all of them * @param result * A HashMap of field/value pairs for the result * @return Zero on success, a non-zero error code on error or "not found". */ @Override public final int read(final String table, final String key, final Set fields, final HashMap result) { try { final MongoCollection collection = db.getCollection(table); final DocumentBuilder q = BuilderFactory.start().add("_id", key); final DocumentBuilder fieldsToReturn = BuilderFactory.start(); Document queryResult = null; if (fields != null) { final Iterator iter = fields.iterator(); while (iter.hasNext()) { fieldsToReturn.add(iter.next(), 1); } final Find.Builder fb = new Find.Builder(q); fb.projection(fieldsToReturn); fb.setLimit(1); fb.setBatchSize(1); final MongoIterator ci = collection.find(fb.build()); if (ci.hasNext()) { queryResult = ci.next(); ci.close(); } } else { queryResult = collection.findOne(q); } if (queryResult != null) { fillMap(result, queryResult); } return queryResult != null ? 0 : 1; } catch (final Exception e) { System.err.println(e.toString()); return 1; } } /** * Perform a range scan for a set of records in the database. Each * field/value pair from the result will be stored in a HashMap. * * @param table * The name of the table * @param startkey * The record key of the first record to read. * @param recordcount * The number of records to read * @param fields * The list of fields to read, or null for all of them * @param result * A Vector of HashMaps, where each HashMap is a set field/value * pairs for one record * @return Zero on success, a non-zero error code on error. See this class's * description for a discussion of error codes. */ @Override public final int scan(final String table, final String startkey, final int recordcount, final Set fields, final Vector> result) { try { final MongoCollection collection = db.getCollection(table); // { "_id":{"$gte":startKey}} } final Find.Builder fb = new Find.Builder(); fb.setQuery(where("_id").greaterThanOrEqualTo(startkey)); fb.setLimit(recordcount); fb.setBatchSize(recordcount); if (fields != null) { final DocumentBuilder fieldsDoc = BuilderFactory.start(); for (final String field : fields) { fieldsDoc.add(field, 1); } fb.projection(fieldsDoc); } result.ensureCapacity(recordcount); final MongoIterator cursor = collection.find(fb.build()); while (cursor.hasNext()) { // toMap() returns a Map but result.add() expects a // Map. Hence, the suppress warnings. final Document doc = cursor.next(); final HashMap docAsMap = new HashMap(); fillMap(docAsMap, doc); result.add(docAsMap); } return 0; } catch (final Exception e) { System.err.println(e.toString()); return 1; } } /** * Update a record in the database. Any field/value pairs in the specified * values HashMap will be written into the record with the specified record * key, overwriting any existing values with the same field name. * * @param table * The name of the table * @param key * The record key of the record to write. * @param values * A HashMap of field/value pairs to update in the record * @return Zero on success, a non-zero error code on error. See this class's * description for a discussion of error codes. */ @Override public final int update(final String table, final String key, final HashMap values) { try { final MongoCollection collection = db.getCollection(table); final DocumentBuilder q = BuilderFactory.start().add("_id", key); final DocumentBuilder u = BuilderFactory.start(); final DocumentBuilder fieldsToSet = u.push("$set"); for (final Map.Entry entry : values .entrySet()) { fieldsToSet.add(entry.getKey(), entry.getValue().toArray()); } final long res = collection .update(q, u, false, false, writeConcern); return res == 1 ? 0 : 1; } catch (final Exception e) { System.err.println(e.toString()); return 1; } } /** * Fills the map with the ByteIterators from the document. * * @param result * The map to fill. * @param queryResult * The document to fill from. */ protected final void fillMap(final HashMap result, final Document queryResult) { for (final Element be : queryResult) { if (be.getType() == ElementType.BINARY) { result.put(be.getName(), new BinaryByteArrayIterator( (BinaryElement) be)); } } } /** * BinaryByteArrayIterator provides an adapter from a {@link BinaryElement} * to a {@link ByteIterator}. * * @copyright 2013, Allanbank Consulting, Inc., All Rights Reserved */ private final static class BinaryByteArrayIterator extends ByteIterator { /** The binary data. */ private final BinaryElement binaryElement; /** The current offset into the binary element. */ private int offset; /** * Creates a new BinaryByteArrayIterator. * * @param element * The {@link BinaryElement} to iterate over. */ public BinaryByteArrayIterator(final BinaryElement element) { this.binaryElement = element; this.offset = 0; } /** * {@inheritDoc} *

* Overridden to return the number of bytes remaining in the iterator. *

*/ @Override public long bytesLeft() { return Math.max(0, binaryElement.length() - offset); } /** * {@inheritDoc} *

* Overridden to return true if there is more data in the * {@link BinaryElement}. *

*/ @Override public boolean hasNext() { return (offset < binaryElement.length()); } /** * {@inheritDoc} *

* Overridden to return the next value and advance the iterator. *

*/ @Override public byte nextByte() { final byte value = binaryElement.get(offset); offset += 1; return value; } } } diff --git a/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java b/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java index 792c0418..814eb46e 100644 --- a/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java +++ b/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java @@ -1,432 +1,434 @@ /** * MongoDB client binding for YCSB. * * Submitted by Yen Pai on 5/11/2010. * * https://gist.github.com/000a66b8db2caf42467b#file_mongo_db.java * */ package com.yahoo.ycsb.db; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Properties; import java.util.Random; import java.util.Set; import java.util.Vector; import java.util.concurrent.atomic.AtomicInteger; import com.mongodb.BasicDBObject; import com.mongodb.DBAddress; import com.mongodb.DBCollection; import com.mongodb.DBCursor; import com.mongodb.DBObject; import com.mongodb.MongoClient; import com.mongodb.MongoClientOptions; import com.mongodb.WriteConcern; import com.mongodb.WriteResult; import com.yahoo.ycsb.ByteArrayByteIterator; import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.DB; import com.yahoo.ycsb.DBException; /** * MongoDB client for YCSB framework. * * Properties to set: * * mongodb.url=mongodb://localhost:27017 mongodb.database=ycsb - * mongodb.writeConcern=normal + * mongodb.writeConcern=acknowledged * * @author ypai */ public class MongoDbClient extends DB { /** Used to include a field in a response. */ protected static final Integer INCLUDE = Integer.valueOf(1); /** A singleton Mongo instance. */ private static MongoClient mongo; /** The default write concern for the test. */ private static WriteConcern writeConcern; /** The database to access. */ private static String database; /** * Count the number of times initialized to teardown on the last * {@link #cleanup()}. */ private static final AtomicInteger initCount = new AtomicInteger(0); /** * Initialize any state for this DB. Called once per DB instance; there is * one DB instance per client thread. */ @Override public void init() throws DBException { initCount.incrementAndGet(); synchronized (INCLUDE) { if (mongo != null) { return; } // initialize MongoDb driver Properties props = getProperties(); String url = props.getProperty("mongodb.url", "mongodb://localhost:27017"); if (url.contains(",")) { //pick one and random String[] urls = url.split(","); int index = new Random().nextInt(urls.length); url = urls[index]; System.out.printf("Using Mongo URL: %s\n", url); } database = props.getProperty("mongodb.database", "ycsb"); String writeConcernType = props.getProperty("mongodb.writeConcern", - "safe").toLowerCase(); + "acknowledged").toLowerCase(); + + // Set connectionpool to size of ycsb thread pool final String maxConnections = props.getProperty( "mongodb.maxconnections", "10"); final String threadsAllowedToBlockForConnectionMultiplier = props .getProperty( "mongodb.threadsAllowedToBlockForConnectionMultiplier", "5"); - if ("none".equals(writeConcernType)) { - writeConcern = WriteConcern.NONE; + if ("errors_ignored".equals(writeConcernType)) { + writeConcern = WriteConcern.ERRORS_IGNORED; } - else if ("safe".equals(writeConcernType)) { - writeConcern = WriteConcern.SAFE; + else if ("unacknowledged".equals(writeConcernType)) { + writeConcern = WriteConcern.UNACKNOWLEDGED; } - else if ("normal".equals(writeConcernType)) { - writeConcern = WriteConcern.NORMAL; + else if ("acknowledged".equals(writeConcernType)) { + writeConcern = WriteConcern.ACKNOWLEDGED; } - else if ("fsync_safe".equals(writeConcernType)) { - writeConcern = WriteConcern.FSYNC_SAFE; + else if ("journaled".equals(writeConcernType)) { + writeConcern = WriteConcern.JOURNALED; } - else if ("replicas_safe".equals(writeConcernType)) { - writeConcern = WriteConcern.REPLICAS_SAFE; + else if ("replica_acknowledged".equals(writeConcernType)) { + writeConcern = WriteConcern.REPLICA_ACKNOWLEDGED; } else { System.err .println("ERROR: Invalid writeConcern: '" + writeConcernType + "'. " - + "Must be [ none | safe | normal | fsync_safe | replicas_safe ]"); + + "Must be [ errors_ignored | unacknowledged | acknowledged | journaled | replica_acknowledged ]"); System.exit(1); } try { // strip out prefix since Java driver doesn't currently support // standard connection format URL yet // http://www.mongodb.org/display/DOCS/Connections if (url.startsWith("mongodb://")) { url = url.substring(10); } // need to append db to url. url += "/" + database; System.out.println("new database url = " + url); MongoClientOptions options = MongoClientOptions .builder() .cursorFinalizerEnabled(false) .connectionsPerHost(Integer.parseInt(maxConnections)) .threadsAllowedToBlockForConnectionMultiplier( Integer.parseInt(threadsAllowedToBlockForConnectionMultiplier)) .build(); mongo = new MongoClient(new DBAddress(url), options); System.out.println("mongo connection created with " + url); } catch (Exception e1) { System.err .println("Could not initialize MongoDB connection pool for Loader: " + e1.toString()); e1.printStackTrace(); return; } } } /** * Cleanup any state for this DB. Called once per DB instance; there is one * DB instance per client thread. */ @Override public void cleanup() throws DBException { if (initCount.decrementAndGet() <= 0) { try { mongo.close(); } catch (Exception e1) { System.err.println("Could not close MongoDB connection pool: " + e1.toString()); e1.printStackTrace(); return; } } } /** * Delete a record from the database. * * @param table * The name of the table * @param key * The record key of the record to delete. * @return Zero on success, a non-zero error code on error. See this class's * description for a discussion of error codes. */ @Override public int delete(String table, String key) { com.mongodb.DB db = null; try { db = mongo.getDB(database); db.requestStart(); DBCollection collection = db.getCollection(table); DBObject q = new BasicDBObject().append("_id", key); WriteResult res = collection.remove(q, writeConcern); return res.getN() == 1 ? 0 : 1; } catch (Exception e) { System.err.println(e.toString()); return 1; } finally { if (db != null) { db.requestDone(); } } } /** * Insert a record in the database. Any field/value pairs in the specified * values HashMap will be written into the record with the specified record * key. * * @param table * The name of the table * @param key * The record key of the record to insert. * @param values * A HashMap of field/value pairs to insert in the record * @return Zero on success, a non-zero error code on error. See this class's * description for a discussion of error codes. */ @Override public int insert(String table, String key, HashMap values) { com.mongodb.DB db = null; try { db = mongo.getDB(database); db.requestStart(); DBCollection collection = db.getCollection(table); DBObject r = new BasicDBObject().append("_id", key); for (Map.Entry entry : values.entrySet()) { r.put(entry.getKey(), entry.getValue().toArray()); } WriteResult res = collection.insert(r, writeConcern); return res.getError() == null ? 0 : 1; } catch (Exception e) { e.printStackTrace(); return 1; } finally { if (db != null) { db.requestDone(); } } } /** * Read a record from the database. Each field/value pair from the result * will be stored in a HashMap. * * @param table * The name of the table * @param key * The record key of the record to read. * @param fields * The list of fields to read, or null for all of them * @param result * A HashMap of field/value pairs for the result * @return Zero on success, a non-zero error code on error or "not found". */ @Override @SuppressWarnings("unchecked") public int read(String table, String key, Set fields, HashMap result) { com.mongodb.DB db = null; try { db = mongo.getDB(database); db.requestStart(); DBCollection collection = db.getCollection(table); DBObject q = new BasicDBObject().append("_id", key); DBObject fieldsToReturn = new BasicDBObject(); DBObject queryResult = null; if (fields != null) { Iterator iter = fields.iterator(); while (iter.hasNext()) { fieldsToReturn.put(iter.next(), INCLUDE); } queryResult = collection.findOne(q, fieldsToReturn); } else { queryResult = collection.findOne(q); } if (queryResult != null) { result.putAll(queryResult.toMap()); } return queryResult != null ? 0 : 1; } catch (Exception e) { System.err.println(e.toString()); return 1; } finally { if (db != null) { db.requestDone(); } } } /** * Update a record in the database. Any field/value pairs in the specified * values HashMap will be written into the record with the specified record * key, overwriting any existing values with the same field name. * * @param table * The name of the table * @param key * The record key of the record to write. * @param values * A HashMap of field/value pairs to update in the record * @return Zero on success, a non-zero error code on error. See this class's * description for a discussion of error codes. */ @Override public int update(String table, String key, HashMap values) { com.mongodb.DB db = null; try { db = mongo.getDB(database); db.requestStart(); DBCollection collection = db.getCollection(table); DBObject q = new BasicDBObject().append("_id", key); DBObject u = new BasicDBObject(); DBObject fieldsToSet = new BasicDBObject(); Iterator keys = values.keySet().iterator(); while (keys.hasNext()) { String tmpKey = keys.next(); fieldsToSet.put(tmpKey, values.get(tmpKey).toArray()); } u.put("$set", fieldsToSet); WriteResult res = collection.update(q, u, false, false, writeConcern); return res.getN() == 1 ? 0 : 1; } catch (Exception e) { System.err.println(e.toString()); return 1; } finally { if (db != null) { db.requestDone(); } } } /** * Perform a range scan for a set of records in the database. Each * field/value pair from the result will be stored in a HashMap. * * @param table * The name of the table * @param startkey * The record key of the first record to read. * @param recordcount * The number of records to read * @param fields * The list of fields to read, or null for all of them * @param result * A Vector of HashMaps, where each HashMap is a set field/value * pairs for one record * @return Zero on success, a non-zero error code on error. See this class's * description for a discussion of error codes. */ @Override public int scan(String table, String startkey, int recordcount, Set fields, Vector> result) { com.mongodb.DB db = null; DBCursor cursor = null; try { db = mongo.getDB(database); db.requestStart(); DBCollection collection = db.getCollection(table); // { "_id":{"$gte":startKey, "$lte":{"appId":key+"\uFFFF"}} } DBObject scanRange = new BasicDBObject().append("$gte", startkey); DBObject q = new BasicDBObject().append("_id", scanRange); cursor = collection.find(q).limit(recordcount); while (cursor.hasNext()) { // toMap() returns a Map, but result.add() expects a // Map. Hence, the suppress warnings. HashMap resultMap = new HashMap(); DBObject obj = cursor.next(); fillMap(resultMap, obj); result.add(resultMap); } return 0; } catch (Exception e) { System.err.println(e.toString()); return 1; } finally { if (db != null) { if (cursor != null) { cursor.close(); } db.requestDone(); } } } /** * Fills the map with the values from the DBObject. * * @param resultMap * The map to fill/ * @param obj * The object to copy values from. */ @SuppressWarnings("unchecked") protected void fillMap(HashMap resultMap, DBObject obj) { Map objMap = obj.toMap(); for (Map.Entry entry : objMap.entrySet()) { if (entry.getValue() instanceof byte[]) { resultMap.put(entry.getKey(), new ByteArrayByteIterator( (byte[]) entry.getValue())); } } } }