Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F61890952
MongoDbClient.java
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Thu, May 9, 15:19
Size
14 KB
Mime Type
text/html
Expires
Sat, May 11, 15:19 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
17572757
Attached To
R7507 YCSB
MongoDbClient.java
View Options
/**
* Copyright (c) 2012 - 2015 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
/*
* MongoDB client binding for YCSB.
*
* Submitted by Yen Pai on 5/11/2010.
*
* https://gist.github.com/000a66b8db2caf42467b#file_mongo_database.java
*/
package
com.yahoo.ycsb.db
;
import
com.mongodb.MongoClient
;
import
com.mongodb.MongoClientURI
;
import
com.mongodb.ReadPreference
;
import
com.mongodb.WriteConcern
;
import
com.mongodb.client.FindIterable
;
import
com.mongodb.client.MongoCollection
;
import
com.mongodb.client.MongoCursor
;
import
com.mongodb.client.MongoDatabase
;
import
com.mongodb.client.model.InsertManyOptions
;
import
com.mongodb.client.model.UpdateOneModel
;
import
com.mongodb.client.model.UpdateOptions
;
import
com.mongodb.client.result.DeleteResult
;
import
com.mongodb.client.result.UpdateResult
;
import
com.yahoo.ycsb.ByteArrayByteIterator
;
import
com.yahoo.ycsb.ByteIterator
;
import
com.yahoo.ycsb.DB
;
import
com.yahoo.ycsb.DBException
;
import
com.yahoo.ycsb.StatusCode
;
import
org.bson.Document
;
import
org.bson.types.Binary
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Properties
;
import
java.util.Set
;
import
java.util.Vector
;
import
java.util.concurrent.atomic.AtomicInteger
;
/**
* MongoDB asynchronous client for YCSB framework using the MongoDB Inc. <a
* href="http://docs.mongodb.org/ecosystem/drivers/java/">driver</a>
* <p>
* See the <code>README.md</code> for configuration information.
* </p>
*
* @author ypai
* @see <a href="http://docs.mongodb.org/ecosystem/drivers/java/">MongoDB Inc.
* driver</a>
*/
public
class
MongoDbClient
extends
DB
{
/** Used to include a field in a response. */
private
static
final
Integer
INCLUDE
=
Integer
.
valueOf
(
1
);
/** The options to use for inserting many documents. */
private
static
final
InsertManyOptions
INSERT_UNORDERED
=
new
InsertManyOptions
().
ordered
(
false
);
/** The options to use for inserting a single document. */
private
static
final
UpdateOptions
UPDATE_WITH_UPSERT
=
new
UpdateOptions
()
.
upsert
(
true
);
/**
* The database name to access.
*/
private
static
String
databaseName
;
/** The database name to access. */
private
static
MongoDatabase
database
;
/**
* Count the number of times initialized to teardown on the last
* {@link #cleanup()}.
*/
private
static
final
AtomicInteger
INIT_COUNT
=
new
AtomicInteger
(
0
);
/** A singleton Mongo instance. */
private
static
MongoClient
mongoClient
;
/** The default read preference for the test. */
private
static
ReadPreference
readPreference
;
/** The default write concern for the test. */
private
static
WriteConcern
writeConcern
;
/** The batch size to use for inserts. */
private
static
int
batchSize
;
/** If true then use updates with the upsert option for inserts. */
private
static
boolean
useUpsert
;
/** The bulk inserts pending for the thread. */
private
final
List
<
Document
>
bulkInserts
=
new
ArrayList
<
Document
>();
/**
* Cleanup any state for this DB. Called once per DB instance; there is one DB
* instance per client thread.
*/
@Override
public
void
cleanup
()
throws
DBException
{
if
(
INIT_COUNT
.
decrementAndGet
()
==
0
)
{
try
{
mongoClient
.
close
();
}
catch
(
Exception
e1
)
{
System
.
err
.
println
(
"Could not close MongoDB connection pool: "
+
e1
.
toString
());
e1
.
printStackTrace
();
return
;
}
finally
{
database
=
null
;
mongoClient
=
null
;
}
}
}
/**
* Delete a record from the database.
*
* @param table
* The name of the table
* @param key
* The record key of the record to delete.
* @return Zero on success, a non-zero error code on error. See the {@link DB}
* class's description for a discussion of error codes.
*/
@Override
public
int
delete
(
String
table
,
String
key
)
{
try
{
MongoCollection
<
Document
>
collection
=
database
.
getCollection
(
table
);
Document
query
=
new
Document
(
"_id"
,
key
);
DeleteResult
result
=
collection
.
withWriteConcern
(
writeConcern
).
deleteOne
(
query
);
if
(
result
.
wasAcknowledged
()
&&
result
.
getDeletedCount
()
==
0
)
{
System
.
err
.
println
(
"Nothing deleted for key "
+
key
);
return
StatusCode
.
ERROR
;
}
return
StatusCode
.
OK
;
}
catch
(
Exception
e
)
{
System
.
err
.
println
(
e
.
toString
());
return
StatusCode
.
ERROR
;
}
}
/**
* Initialize any state for this DB. Called once per DB instance; there is one
* DB instance per client thread.
*/
@Override
public
void
init
()
throws
DBException
{
INIT_COUNT
.
incrementAndGet
();
synchronized
(
INCLUDE
)
{
if
(
mongoClient
!=
null
)
{
return
;
}
Properties
props
=
getProperties
();
// Set insert batchsize, default 1 - to be YCSB-original equivalent
batchSize
=
Integer
.
parseInt
(
props
.
getProperty
(
"batchsize"
,
"1"
));
// Set is inserts are done as upserts. Defaults to false.
useUpsert
=
Boolean
.
parseBoolean
(
props
.
getProperty
(
"mongodb.upsert"
,
"false"
));
// Just use the standard connection format URL
// http://docs.mongodb.org/manual/reference/connection-string/
// to configure the client.
String
url
=
props
.
getProperty
(
"mongodb.url"
,
null
);
boolean
defaultedUrl
=
false
;
if
(
url
==
null
)
{
defaultedUrl
=
true
;
url
=
"mongodb://localhost:27017/ycsb?w=1"
;
}
url
=
OptionsSupport
.
updateUrl
(
url
,
props
);
if
(!
url
.
startsWith
(
"mongodb://"
))
{
System
.
err
.
println
(
"ERROR: Invalid URL: '"
+
url
+
"'. Must be of the form "
+
"'mongodb://<host1>:<port1>,<host2>:<port2>/database?options'. "
+
"http://docs.mongodb.org/manual/reference/connection-string/"
);
System
.
exit
(
1
);
}
try
{
MongoClientURI
uri
=
new
MongoClientURI
(
url
);
String
uriDb
=
uri
.
getDatabase
();
if
(!
defaultedUrl
&&
(
uriDb
!=
null
)
&&
!
uriDb
.
isEmpty
()
&&
!
"admin"
.
equals
(
uriDb
))
{
databaseName
=
uriDb
;
}
else
{
// If no database is specified in URI, use "ycsb"
databaseName
=
"ycsb"
;
}
readPreference
=
uri
.
getOptions
().
getReadPreference
();
writeConcern
=
uri
.
getOptions
().
getWriteConcern
();
mongoClient
=
new
MongoClient
(
uri
);
database
=
mongoClient
.
getDatabase
(
databaseName
)
.
withReadPreference
(
readPreference
)
.
withWriteConcern
(
writeConcern
);
System
.
out
.
println
(
"mongo client connection created with "
+
url
);
}
catch
(
Exception
e1
)
{
System
.
err
.
println
(
"Could not initialize MongoDB connection pool for Loader: "
+
e1
.
toString
());
e1
.
printStackTrace
();
return
;
}
}
}
/**
* Insert a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key.
*
* @param table
* The name of the table
* @param key
* The record key of the record to insert.
* @param values
* A HashMap of field/value pairs to insert in the record
* @return Zero on success, a non-zero error code on error. See the {@link DB}
* class's description for a discussion of error codes.
*/
@Override
public
int
insert
(
String
table
,
String
key
,
HashMap
<
String
,
ByteIterator
>
values
)
{
try
{
MongoCollection
<
Document
>
collection
=
database
.
getCollection
(
table
);
Document
toInsert
=
new
Document
(
"_id"
,
key
);
for
(
Map
.
Entry
<
String
,
ByteIterator
>
entry
:
values
.
entrySet
())
{
toInsert
.
put
(
entry
.
getKey
(),
entry
.
getValue
().
toArray
());
}
if
(
batchSize
==
1
)
{
if
(
useUpsert
)
{
// this is effectively an insert, but using an upsert instead due
// to current inability of the framework to clean up after itself
// between test runs.
collection
.
replaceOne
(
new
Document
(
"_id"
,
toInsert
.
get
(
"_id"
)),
toInsert
,
UPDATE_WITH_UPSERT
);
}
else
{
collection
.
insertOne
(
toInsert
);
}
}
else
{
bulkInserts
.
add
(
toInsert
);
if
(
bulkInserts
.
size
()
==
batchSize
)
{
if
(
useUpsert
)
{
List
<
UpdateOneModel
<
Document
>>
updates
=
new
ArrayList
<
UpdateOneModel
<
Document
>>(
bulkInserts
.
size
());
for
(
Document
doc
:
bulkInserts
)
{
updates
.
add
(
new
UpdateOneModel
<
Document
>(
new
Document
(
"_id"
,
doc
.
get
(
"_id"
)),
doc
,
UPDATE_WITH_UPSERT
));
}
collection
.
bulkWrite
(
updates
);
}
else
{
collection
.
insertMany
(
bulkInserts
,
INSERT_UNORDERED
);
}
bulkInserts
.
clear
();
}
}
return
StatusCode
.
OK
;
}
catch
(
Exception
e
)
{
System
.
err
.
println
(
"Exception while trying bulk insert with "
+
bulkInserts
.
size
());
e
.
printStackTrace
();
return
StatusCode
.
ERROR
;
}
}
/**
* Read a record from the database. Each field/value pair from the result will
* be stored in a HashMap.
*
* @param table
* The name of the table
* @param key
* The record key of the record to read.
* @param fields
* The list of fields to read, or null for all of them
* @param result
* A HashMap of field/value pairs for the result
* @return Zero on success, a non-zero error code on error or "not found".
*/
@Override
public
int
read
(
String
table
,
String
key
,
Set
<
String
>
fields
,
HashMap
<
String
,
ByteIterator
>
result
)
{
try
{
MongoCollection
<
Document
>
collection
=
database
.
getCollection
(
table
);
Document
query
=
new
Document
(
"_id"
,
key
);
FindIterable
<
Document
>
findIterable
=
collection
.
find
(
query
);
if
(
fields
!=
null
)
{
Document
projection
=
new
Document
();
for
(
String
field
:
fields
)
{
projection
.
put
(
field
,
INCLUDE
);
}
findIterable
.
projection
(
projection
);
}
Document
queryResult
=
findIterable
.
first
();
if
(
queryResult
!=
null
)
{
fillMap
(
result
,
queryResult
);
}
return
queryResult
!=
null
?
StatusCode
.
OK
:
StatusCode
.
ERROR
;
}
catch
(
Exception
e
)
{
System
.
err
.
println
(
e
.
toString
());
return
StatusCode
.
ERROR
;
}
}
/**
* Perform a range scan for a set of records in the database. Each field/value
* pair from the result will be stored in a HashMap.
*
* @param table
* The name of the table
* @param startkey
* The record key of the first record to read.
* @param recordcount
* The number of records to read
* @param fields
* The list of fields to read, or null for all of them
* @param result
* A Vector of HashMaps, where each HashMap is a set field/value
* pairs for one record
* @return Zero on success, a non-zero error code on error. See the {@link DB}
* class's description for a discussion of error codes.
*/
@Override
public
int
scan
(
String
table
,
String
startkey
,
int
recordcount
,
Set
<
String
>
fields
,
Vector
<
HashMap
<
String
,
ByteIterator
>>
result
)
{
MongoCursor
<
Document
>
cursor
=
null
;
try
{
MongoCollection
<
Document
>
collection
=
database
.
getCollection
(
table
);
Document
scanRange
=
new
Document
(
"$gte"
,
startkey
);
Document
query
=
new
Document
(
"_id"
,
scanRange
);
Document
sort
=
new
Document
(
"_id"
,
INCLUDE
);
FindIterable
<
Document
>
findIterable
=
collection
.
find
(
query
).
sort
(
sort
).
limit
(
recordcount
);
if
(
fields
!=
null
)
{
Document
projection
=
new
Document
();
for
(
String
fieldName
:
fields
)
{
projection
.
put
(
fieldName
,
INCLUDE
);
}
findIterable
.
projection
(
projection
);
}
cursor
=
findIterable
.
iterator
();
if
(!
cursor
.
hasNext
())
{
System
.
err
.
println
(
"Nothing found in scan for key "
+
startkey
);
return
StatusCode
.
ERROR
;
}
result
.
ensureCapacity
(
recordcount
);
while
(
cursor
.
hasNext
())
{
HashMap
<
String
,
ByteIterator
>
resultMap
=
new
HashMap
<
String
,
ByteIterator
>();
Document
obj
=
cursor
.
next
();
fillMap
(
resultMap
,
obj
);
result
.
add
(
resultMap
);
}
return
StatusCode
.
OK
;
}
catch
(
Exception
e
)
{
System
.
err
.
println
(
e
.
toString
());
return
StatusCode
.
ERROR
;
}
finally
{
if
(
cursor
!=
null
)
{
cursor
.
close
();
}
}
}
/**
* Update a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key, overwriting any existing values with the same field name.
*
* @param table
* The name of the table
* @param key
* The record key of the record to write.
* @param values
* A HashMap of field/value pairs to update in the record
* @return Zero on success, a non-zero error code on error. See this class's
* description for a discussion of error codes.
*/
@Override
public
int
update
(
String
table
,
String
key
,
HashMap
<
String
,
ByteIterator
>
values
)
{
try
{
MongoCollection
<
Document
>
collection
=
database
.
getCollection
(
table
);
Document
query
=
new
Document
(
"_id"
,
key
);
Document
fieldsToSet
=
new
Document
();
for
(
Map
.
Entry
<
String
,
ByteIterator
>
entry
:
values
.
entrySet
())
{
fieldsToSet
.
put
(
entry
.
getKey
(),
entry
.
getValue
().
toArray
());
}
Document
update
=
new
Document
(
"$set"
,
fieldsToSet
);
UpdateResult
result
=
collection
.
updateOne
(
query
,
update
);
if
(
result
.
wasAcknowledged
()
&&
result
.
getMatchedCount
()
==
0
)
{
System
.
err
.
println
(
"Nothing updated for key "
+
key
);
return
StatusCode
.
ERROR
;
}
return
StatusCode
.
OK
;
}
catch
(
Exception
e
)
{
System
.
err
.
println
(
e
.
toString
());
return
StatusCode
.
ERROR
;
}
}
/**
* Fills the map with the values from the DBObject.
*
* @param resultMap
* The map to fill/
* @param obj
* The object to copy values from.
*/
protected
void
fillMap
(
Map
<
String
,
ByteIterator
>
resultMap
,
Document
obj
)
{
for
(
Map
.
Entry
<
String
,
Object
>
entry
:
obj
.
entrySet
())
{
if
(
entry
.
getValue
()
instanceof
Binary
)
{
resultMap
.
put
(
entry
.
getKey
(),
new
ByteArrayByteIterator
(((
Binary
)
entry
.
getValue
()).
getData
()));
}
}
}
}
Event Timeline
Log In to Comment