Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F61754210
NativeAsyncMongoDbClient.java
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Wed, May 8, 18:37
Size
17 KB
Mime Type
text/html
Expires
Fri, May 10, 18:37 (2 d)
Engine
blob
Format
Raw Data
Handle
17554218
Attached To
R7507 YCSB
NativeAsyncMongoDbClient.java
View Options
/*
* Copyright (c) 2014, Yahoo!, Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package
com.yahoo.ycsb.db
;
import
com.mongodb.ConnectionString
;
import
com.mongodb.MongoBulkWriteException
;
import
com.mongodb.MongoClientSettings
;
import
com.mongodb.ReadPreference
;
import
com.mongodb.WriteConcern
;
import
com.mongodb.async.client.*
;
import
com.mongodb.async.client.MongoClient
;
import
com.mongodb.bulk.BulkWriteResult
;
import
com.mongodb.client.model.*
;
import
com.mongodb.client.result.DeleteResult
;
import
com.yahoo.ycsb.AsyncDB
;
import
com.yahoo.ycsb.ByteIterator
;
import
com.yahoo.ycsb.DBException
;
import
com.yahoo.ycsb.Status
;
import
com.yahoo.ycsb.StringByteIterator
;
import
org.bson.Document
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.Properties
;
import
java.util.Set
;
import
java.util.Vector
;
import
java.util.concurrent.CompletableFuture
;
import
java.util.concurrent.atomic.AtomicInteger
;
import
java.util.function.Consumer
;
import static
com.mongodb.client.model.Filters.eq
;
import static
com.mongodb.client.model.Filters.gte
;
import static
com.mongodb.client.model.Projections.fields
;
import static
com.mongodb.client.model.Projections.include
;
/**
* MongoDB asynchronous client for YCSB framework using the <a
* href="http://www.allanbank.com/mongodb-async-driver/">Asynchronous Java
* Driver</a>
* <p>
* See the <code>README.md</code> for configuration information.
* </p>
*
* @author rjm
* @see <a href="http://www.allanbank.com/mongodb-async-driver/">Asynchronous
* Java Driver</a>
*/
public
class
NativeAsyncMongoDbClient
extends
AsyncDB
{
/**
* Used to include a field in a response.
*/
protected
static
final
int
INCLUDE
=
1
;
/**
* The database to use.
*/
private
static
String
databaseName
;
/**
* The write concern for the requests.
*/
private
static
final
AtomicInteger
INIT_COUNT
=
new
AtomicInteger
(
0
);
/**
* The connection to MongoDB.
*/
private
static
MongoClient
mongoClient
;
/**
* The write concern for the requests.
*/
private
static
WriteConcern
writeConcern
;
/**
* Which servers to use for reads.
*/
private
static
ReadPreference
readPreference
;
/**
* The database to MongoDB.
*/
private
MongoDatabase
database
;
/**
* The batch size to use for inserts.
*/
private
static
int
batchSize
;
/**
* If true then use updates with the upsert option for inserts.
*/
private
static
boolean
useUpsert
;
/**
* The bulk inserts pending for the thread.
*/
private
final
ArrayList
<
Document
>
batchedWrites
=
new
ArrayList
<>();
private
final
ArrayList
<
WriteModel
<
Document
>>
batchedUpserts
=
new
ArrayList
<>();
/**
* The number of writes in the batchedWrite.
*/
private
int
batchedWriteCount
=
0
;
private
final
InsertManyOptions
insertManyOptions
=
new
InsertManyOptions
().
ordered
(
false
);
private
final
UpdateOptions
upsertOptions
=
new
UpdateOptions
().
upsert
(
true
);
private
final
BulkWriteOptions
bulkWriteOptions
=
new
BulkWriteOptions
().
ordered
(
false
);
/**
* Cleanup any state for this DB. Called once per DB instance; there is one DB
* instance per client thread.
*/
@Override
public
final
void
cleanup
()
throws
DBException
{
if
(
INIT_COUNT
.
decrementAndGet
()
==
0
)
{
try
{
mongoClient
.
close
();
}
catch
(
final
Exception
e1
)
{
System
.
err
.
println
(
"Could not close MongoDB connection pool: "
+
e1
.
toString
());
e1
.
printStackTrace
();
return
;
}
finally
{
mongoClient
=
null
;
database
=
null
;
}
}
}
/**
* Delete a record from the database.
*
* @param table The name of the table
* @param key The record key of the record to delete.
* @return Zero on success, a non-zero error code on error. See this class's
* description for a discussion of error codes.
*/
@Override
public
final
CompletableFuture
<
Status
>
delete
(
final
String
table
,
final
String
key
)
{
CompletableFuture
<
Status
>
deleteResult
=
new
CompletableFuture
<>();
final
MongoCollection
collection
=
database
.
getCollection
(
table
).
withWriteConcern
(
writeConcern
);
collection
.
deleteOne
(
eq
(
key
),
(
r
,
ex
)
->
{
DeleteResult
result
=
(
DeleteResult
)
r
;
if
(
ex
!=
null
)
{
System
.
err
.
println
(
ex
.
toString
());
deleteResult
.
complete
(
Status
.
ERROR
);
}
else
if
(!
result
.
wasAcknowledged
()
&&
writeConcern
.
isAcknowledged
())
{
System
.
err
.
println
(
"Delete was not acknowledged for key "
+
key
);
deleteResult
.
complete
(
Status
.
UNEXPECTED_STATE
);
}
else
if
(
result
.
getDeletedCount
()
==
0
)
{
System
.
err
.
println
(
"Nothing deleted for key "
+
key
);
deleteResult
.
complete
(
Status
.
NOT_FOUND
);
}
else
{
deleteResult
.
complete
(
Status
.
OK
);
}
});
return
deleteResult
;
}
/**
* Initialize any state for this DB. Called once per DB instance; there is one
* DB instance per client thread.
*/
@Override
public
final
void
init
()
throws
DBException
{
final
int
count
=
INIT_COUNT
.
incrementAndGet
();
synchronized
(
NativeAsyncMongoDbClient
.
class
)
{
final
Properties
props
=
getProperties
();
if
(
mongoClient
!=
null
)
{
database
=
mongoClient
.
getDatabase
(
databaseName
);
// // If there are more threads (count) than connections then the
// // Low latency spin lock is not really needed as we will keep
// // the connections occupied.
// if (count > mongoClient.getConfig().getMaxConnectionCount()) {
// mongoClient.getConfig().setLockType(LockType.MUTEX);
// }
return
;
}
// Set insert batchsize, default 1 - to be YCSB-original equivalent
batchSize
=
Integer
.
parseInt
(
props
.
getProperty
(
"mongodb.batchsize"
,
"1"
));
if
(
batchSize
>
1
)
{
batchedWrites
.
ensureCapacity
(
batchSize
);
}
// Set is inserts are done as upserts. Defaults to false.
useUpsert
=
Boolean
.
parseBoolean
(
props
.
getProperty
(
"mongodb.upsert"
,
"false"
));
// Just use the standard connection format URL
// http://docs.mongodb.org/manual/reference/connection-string/
// to configure the client.
String
url
=
props
.
getProperty
(
"mongodb.url"
,
"mongodb://localhost:27017/ycsb?w=1"
);
if
(!
url
.
startsWith
(
"mongodb://"
))
{
System
.
err
.
println
(
"ERROR: Invalid URL: '"
+
url
+
"'. Must be of the form "
+
"'mongodb://<host1>:<port1>,<host2>:<port2>/database?"
+
"options'. See "
+
"http://docs.mongodb.org/manual/reference/connection-string/."
);
System
.
exit
(
1
);
}
ConnectionString
uri
=
new
ConnectionString
(
url
);
try
{
databaseName
=
uri
.
getDatabase
();
if
((
databaseName
==
null
)
||
databaseName
.
isEmpty
())
{
// Default database is "ycsb" if database is not
// specified in URL
databaseName
=
"ycsb"
;
}
MongoClientSettings
config
=
MongoClientSettings
.
builder
().
applyConnectionString
(
uri
).
build
();
mongoClient
=
MongoClients
.
create
(
config
);
// MongoClientConfiguration config = mongoClient.getConfig();
// if (!url.toLowerCase().contains("locktype=")) {
// config.setLockType(LockType.LOW_LATENCY_SPIN); // assumed...
// }
readPreference
=
config
.
getReadPreference
();
writeConcern
=
config
.
getWriteConcern
();
database
=
mongoClient
.
getDatabase
(
databaseName
);
System
.
out
.
println
(
"mongo connection created with "
+
url
);
}
catch
(
final
Exception
e1
)
{
System
.
err
.
println
(
"Could not initialize MongoDB connection pool for Loader: "
+
e1
.
toString
());
e1
.
printStackTrace
();
return
;
}
}
}
  /**
   * Insert a record in the database. Any field/value pairs in the specified
   * values HashMap will be written into the record with the specified record
   * key. When batching is enabled (batchSize > 1), inserts are buffered
   * per-thread and flushed as one bulk operation once batchSize records have
   * accumulated; buffered records complete with Status.BATCHED_OK.
   *
   * @param table The name of the table
   * @param key The record key of the record to insert.
   * @param values A HashMap of field/value pairs to insert in the record
   * @return Zero on success, a non-zero error code on error. See the {@link AsyncDB}
   *         class's description for a discussion of error codes.
   */
  @Override
  public final CompletableFuture<Status> insert(final String table, final String key,
      final Map<String, ByteIterator> values) {
    CompletableFuture<Status> insertResult = new CompletableFuture<>();

    // Shared completion handler for both bulk paths (insertMany / bulkWrite).
    // A null throwable means the whole batch succeeded.
    Consumer<Throwable> bulkWriteExceptionHandler = ex -> {
      if (ex != null) {
        if (ex instanceof MongoBulkWriteException) {
          // Partial failure: the driver reports the per-document outcome.
          BulkWriteResult wResult = ((MongoBulkWriteException) ex).getWriteResult();
          if (!wResult.wasAcknowledged() && writeConcern.isAcknowledged()) {
            System.err.println("Bulk insert was not acknowledged.");
            insertResult.complete(Status.UNEXPECTED_STATE);
          } else {
            System.err.println("Bulk insert of " + batchSize + " records failed for "
                + (batchSize - wResult.getInsertedCount()) + " records.");
            insertResult.complete(Status.ERROR);
          }
        } else {
          System.err.println("Exception while trying to bulk-insert " + batchSize
              + " records.");
          insertResult.complete(Status.ERROR);
        }
      } else {
        insertResult.complete(Status.OK);
      }
    };

    final MongoCollection<Document> collection =
        database.getCollection(table).withWriteConcern(writeConcern);

    // Build the document with the YCSB key as MongoDB's _id.
    final Document toInsert = new Document("_id", key);
    for (final Map.Entry<String, ByteIterator> entry : values.entrySet()) {
      toInsert.append(entry.getKey(), entry.getValue().toArray());
    }

    if (batchSize <= 1) {
      // Unbatched path: one async operation per record.
      if (useUpsert) {
        // $setOnInsert only writes the fields when the document is created,
        // so an existing record is matched but left unchanged.
        collection.updateOne(eq(key), new Document("$setOnInsert", toInsert), upsertOptions,
            (r, ex) -> {
              if (ex != null) {
                System.err.println("Exception while trying to insert one record: " + ex);
                insertResult.complete(Status.ERROR);
              } else if (!r.wasAcknowledged() && writeConcern.isAcknowledged()) {
                System.err.println("Upsert was not acknowledged for key " + key);
                insertResult.complete(Status.UNEXPECTED_STATE);
              } else if (r.getUpsertedId() == null
                  // NOTE(review): BsonString#toString() appears to include the
                  // "BsonString{value=...}" wrapper, so this comparison with the
                  // raw key looks like it can never be true — verify whether
                  // getUpsertedId().asString().getValue() was intended.
                  || !r.getUpsertedId().asString().toString().equals(key)) {
                insertResult.complete(Status.NOT_FOUND);
              } else {
                insertResult.complete(Status.OK);
              }
            });
      } else {
        collection.insertOne(toInsert, (r, ex) -> {
          if (ex != null) {
            System.err.println("Exception while trying to insert one record: " + ex);
            insertResult.complete(Status.ERROR);
          } else {
            insertResult.complete(Status.OK);
          }
        });
      }
    } else {
      // Batched path: buffer the record and flush when the batch is full.
      if (useUpsert) {
        batchedUpserts.add(new UpdateOneModel<>(eq(key),
            new Document("$setOnInsert", toInsert), upsertOptions));
      } else {
        batchedWrites.add(toInsert);
      }
      batchedWriteCount += 1;
      if (batchedWriteCount < batchSize) {
        // Not full yet; report the record as buffered.
        insertResult.complete(Status.BATCHED_OK);
      } else {
        // Flush the full batch; the shared handler completes the future.
        if (useUpsert) {
          collection.bulkWrite(batchedUpserts, bulkWriteOptions,
              (r, ex) -> bulkWriteExceptionHandler.accept(ex));
          batchedUpserts.clear();
        } else {
          collection.insertMany(batchedWrites, insertManyOptions,
              (r, ex) -> bulkWriteExceptionHandler.accept(ex));
          batchedWrites.clear();
        }
        batchedWriteCount = 0;
      }
    }
    return insertResult;
  }
/**
* Read a record from the database. Each field/value pair from the result will
* be stored in a HashMap.
*
* @param table The name of the table
* @param key The record key of the record to read.
* @param fields The list of fields to read, or null for all of them
* @param result A HashMap of field/value pairs for the result
* @return Zero on success, a non-zero error code on error or "not found".
*/
@Override
public
final
CompletableFuture
<
Status
>
read
(
final
String
table
,
final
String
key
,
final
Set
<
String
>
fields
,
final
Map
<
String
,
ByteIterator
>
result
)
{
CompletableFuture
<
Status
>
readResult
=
new
CompletableFuture
<>();
final
MongoCollection
<
Document
>
collection
=
database
.
getCollection
(
table
).
withReadPreference
(
readPreference
);
FindIterable
<
Document
>
findQuery
=
collection
.
find
(
eq
(
key
)).
limit
(
1
).
batchSize
(
1
);
if
(
fields
!=
null
)
{
findQuery
=
findQuery
.
projection
(
fields
(
include
(
new
ArrayList
<>(
fields
))));
}
findQuery
.
first
((
r
,
ex
)
->
{
if
(
ex
!=
null
)
{
System
.
err
.
println
(
"Exception while trying to read key "
+
key
+
": "
+
ex
);
readResult
.
complete
(
Status
.
ERROR
);
}
else
{
if
(
r
==
null
)
{
readResult
.
complete
(
Status
.
NOT_FOUND
);
}
else
{
r
.
forEach
((
k
,
v
)
->
result
.
put
(
k
,
new
StringByteIterator
((
String
)
v
)));
readResult
.
complete
(
Status
.
OK
);
}
}
});
return
readResult
;
}
/**
* Perform a range scan for a set of records in the database. Each field/value
* pair from the result will be stored in a HashMap.
*
* @param table The name of the table
* @param startkey The record key of the first record to read.
* @param recordcount The number of records to read
* @param fields The list of fields to read, or null for all of them
* @param result A Vector of HashMaps, where each HashMap is a set field/value
* pairs for one record
* @return Zero on success, a non-zero error code on error. See the {@link AsyncDB}
* class's description for a discussion of error codes.
*/
@Override
public
final
CompletableFuture
<
Status
>
scan
(
final
String
table
,
final
String
startkey
,
final
int
recordcount
,
final
Set
<
String
>
fields
,
final
Vector
<
HashMap
<
String
,
ByteIterator
>>
result
)
{
CompletableFuture
<
Status
>
scanResult
=
new
CompletableFuture
<>();
final
MongoCollection
<
Document
>
collection
=
database
.
getCollection
(
table
).
withReadPreference
(
readPreference
);
FindIterable
<
Document
>
scanQuery
=
collection
.
find
(
gte
(
"_id"
,
startkey
))
.
limit
(
recordcount
).
batchSize
(
recordcount
).
sort
(
Sorts
.
ascending
(
"_id"
));
if
(
fields
!=
null
)
{
scanQuery
=
scanQuery
.
projection
(
fields
(
include
(
new
ArrayList
<>(
fields
))));
}
result
.
ensureCapacity
(
recordcount
);
scanQuery
.
forEach
((
r
->
{
HashMap
<
String
,
ByteIterator
>
map
=
new
HashMap
<>();
r
.
forEach
((
k
,
v
)
->
map
.
put
(
k
,
new
StringByteIterator
((
String
)
v
)));
result
.
add
(
map
);
}),
(
r
,
ex
)
->
{
if
(
ex
!=
null
)
{
System
.
err
.
println
(
"Exception while trying to scan from key "
+
startkey
+
" "
+
recordcount
+
" records: "
+
ex
);
scanResult
.
complete
(
Status
.
ERROR
);
}
else
if
(
result
.
size
()
<
recordcount
)
{
System
.
err
.
println
(
"Scan returned fewer records than expected: "
+
result
.
size
()
+
" instead of "
+
recordcount
+
"."
);
scanResult
.
complete
(
Status
.
NOT_FOUND
);
}
else
{
scanResult
.
complete
(
Status
.
OK
);
}
});
return
scanResult
;
}
/**
* Update a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key, overwriting any existing values with the same field name.
*
* @param table The name of the table
* @param key The record key of the record to write.
* @param values A HashMap of field/value pairs to update in the record
* @return Zero on success, a non-zero error code on error. See the {@link DB}
* class's description for a discussion of error codes.
*/
@Override
public
final
CompletableFuture
<
Status
>
update
(
final
String
table
,
final
String
key
,
final
Map
<
String
,
ByteIterator
>
values
)
{
CompletableFuture
<
Status
>
updateResult
=
new
CompletableFuture
<>();
final
MongoCollection
<
Document
>
collection
=
database
.
getCollection
(
table
).
withWriteConcern
(
writeConcern
);
Document
toUpdate
=
new
Document
();
for
(
final
Map
.
Entry
<
String
,
ByteIterator
>
entry
:
values
.
entrySet
())
{
toUpdate
.
append
(
entry
.
getKey
(),
entry
.
getValue
());
}
collection
.
updateOne
(
eq
(
key
),
new
Document
(
"$set"
,
toUpdate
),
(
r
,
ex
)
->
{
if
(
ex
!=
null
)
{
System
.
err
.
println
(
"Exception while trying to update one record: "
+
ex
);
updateResult
.
complete
(
Status
.
ERROR
);
}
else
if
(!
r
.
wasAcknowledged
()
&&
writeConcern
.
isAcknowledged
())
{
System
.
err
.
println
(
"Update was not acknowledged for key "
+
key
);
updateResult
.
complete
(
Status
.
UNEXPECTED_STATE
);
}
else
if
(
r
.
getMatchedCount
()
<
1
)
{
updateResult
.
complete
(
Status
.
NOT_FOUND
);
}
else
{
updateResult
.
complete
(
Status
.
OK
);
}
});
return
updateResult
;
}
}
Event Timeline
Log In to Comment