Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F60154104
Couchbase2Client.java
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sat, Apr 27, 22:09
Size
34 KB
Mime Type
text/x-java
Expires
Mon, Apr 29, 22:09 (2 d)
Engine
blob
Format
Raw Data
Handle
17309680
Attached To
R7507 YCSB
Couchbase2Client.java
View Options
/**
* Copyright (c) 2016 Yahoo! Inc. All rights reserved.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package
com.yahoo.ycsb.db.couchbase2
;
import
com.couchbase.client.core.env.DefaultCoreEnvironment
;
import
com.couchbase.client.core.env.resources.IoPoolShutdownHook
;
import
com.couchbase.client.core.logging.CouchbaseLogger
;
import
com.couchbase.client.core.logging.CouchbaseLoggerFactory
;
import
com.couchbase.client.core.metrics.DefaultLatencyMetricsCollectorConfig
;
import
com.couchbase.client.core.metrics.DefaultMetricsCollectorConfig
;
import
com.couchbase.client.core.metrics.LatencyMetricsCollectorConfig
;
import
com.couchbase.client.core.metrics.MetricsCollectorConfig
;
import
com.couchbase.client.deps.com.fasterxml.jackson.core.JsonFactory
;
import
com.couchbase.client.deps.com.fasterxml.jackson.core.JsonGenerator
;
import
com.couchbase.client.deps.com.fasterxml.jackson.databind.JsonNode
;
import
com.couchbase.client.deps.com.fasterxml.jackson.databind.node.ObjectNode
;
import
com.couchbase.client.deps.io.netty.channel.DefaultSelectStrategyFactory
;
import
com.couchbase.client.deps.io.netty.channel.EventLoopGroup
;
import
com.couchbase.client.deps.io.netty.channel.SelectStrategy
;
import
com.couchbase.client.deps.io.netty.channel.SelectStrategyFactory
;
import
com.couchbase.client.deps.io.netty.channel.epoll.EpollEventLoopGroup
;
import
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoopGroup
;
import
com.couchbase.client.deps.io.netty.util.IntSupplier
;
import
com.couchbase.client.deps.io.netty.util.concurrent.DefaultThreadFactory
;
import
com.couchbase.client.java.Bucket
;
import
com.couchbase.client.java.Cluster
;
import
com.couchbase.client.java.CouchbaseCluster
;
import
com.couchbase.client.java.PersistTo
;
import
com.couchbase.client.java.ReplicateTo
;
import
com.couchbase.client.java.document.Document
;
import
com.couchbase.client.java.document.RawJsonDocument
;
import
com.couchbase.client.java.document.json.JsonArray
;
import
com.couchbase.client.java.document.json.JsonObject
;
import
com.couchbase.client.java.env.CouchbaseEnvironment
;
import
com.couchbase.client.java.env.DefaultCouchbaseEnvironment
;
import
com.couchbase.client.java.error.TemporaryFailureException
;
import
com.couchbase.client.java.query.*
;
import
com.couchbase.client.java.transcoder.JacksonTransformers
;
import
com.couchbase.client.java.util.Blocking
;
import
com.yahoo.ycsb.ByteIterator
;
import
com.yahoo.ycsb.DB
;
import
com.yahoo.ycsb.DBException
;
import
com.yahoo.ycsb.Status
;
import
com.yahoo.ycsb.StringByteIterator
;
import
rx.Observable
;
import
rx.Subscriber
;
import
rx.functions.Action1
;
import
rx.functions.Func1
;
import
java.io.StringWriter
;
import
java.io.Writer
;
import
java.nio.channels.spi.SelectorProvider
;
import
java.util.*
;
import
java.util.concurrent.ThreadFactory
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.locks.LockSupport
;
/**
* A class that wraps the 2.x Couchbase SDK to be used with YCSB.
*
* <p> The following options can be passed when using this database client to override the defaults.
*
* <ul>
* <li><b>couchbase.host=127.0.0.1</b> The hostname from one server.</li>
* <li><b>couchbase.bucket=default</b> The bucket name to use.</li>
* <li><b>couchbase.password=</b> The password of the bucket.</li>
* <li><b>couchbase.syncMutationResponse=true</b> If mutations should wait for the response to complete.</li>
* <li><b>couchbase.persistTo=0</b> Persistence durability requirement</li>
* <li><b>couchbase.replicateTo=0</b> Replication durability requirement</li>
* <li><b>couchbase.upsert=false</b> Use upsert instead of insert or replace.</li>
* <li><b>couchbase.adhoc=false</b> If set to true, prepared statements are not used.</li>
* <li><b>couchbase.kv=true</b> If set to false, mutation operations will also be performed through N1QL.</li>
* <li><b>couchbase.maxParallelism=1</b> The server parallelism for all n1ql queries.</li>
* <li><b>couchbase.kvEndpoints=1</b> The number of KV sockets to open per server.</li>
* <li><b>couchbase.queryEndpoints=5</b> The number of N1QL Query sockets to open per server.</li>
* <li><b>couchbase.epoll=false</b> If Epoll instead of NIO should be used (only available for linux.</li>
* <li><b>couchbase.boost=3</b> If > 0 trades CPU for higher throughput. N is the number of event loops, ideally
* set to the number of physical cores. Setting higher than that will likely degrade performance.</li>
* <li><b>couchbase.networkMetricsInterval=0</b> The interval in seconds when latency metrics will be logged.</li>
* <li><b>couchbase.runtimeMetricsInterval=0</b> The interval in seconds when runtime metrics will be logged.</li>
* <li><b>couchbase.documentExpiry=0</b> Document Expiry is the amount of time until a document expires in
* Couchbase.</li>
* </ul>
*/
public
class
Couchbase2Client
extends
DB
{
static
{
// No need to send the full encoded_plan for this benchmark workload, less network overhead!
System
.
setProperty
(
"com.couchbase.query.encodedPlanEnabled"
,
"false"
);
}
private
static
final
String
SEPARATOR
=
":"
;
private
static
final
CouchbaseLogger
LOGGER
=
CouchbaseLoggerFactory
.
getInstance
(
Couchbase2Client
.
class
);
private
static
final
Object
INIT_COORDINATOR
=
new
Object
();
private
static
volatile
CouchbaseEnvironment
env
=
null
;
private
Cluster
cluster
;
private
Bucket
bucket
;
private
String
bucketName
;
private
boolean
upsert
;
private
PersistTo
persistTo
;
private
ReplicateTo
replicateTo
;
private
boolean
syncMutResponse
;
private
boolean
epoll
;
private
long
kvTimeout
;
private
boolean
adhoc
;
private
boolean
kv
;
private
int
maxParallelism
;
private
String
host
;
private
int
kvEndpoints
;
private
int
queryEndpoints
;
private
int
boost
;
private
int
networkMetricsInterval
;
private
int
runtimeMetricsInterval
;
private
String
scanAllQuery
;
private
int
documentExpiry
;
@Override
public
void
init
()
throws
DBException
{
Properties
props
=
getProperties
();
host
=
props
.
getProperty
(
"couchbase.host"
,
"127.0.0.1"
);
bucketName
=
props
.
getProperty
(
"couchbase.bucket"
,
"default"
);
String
bucketPassword
=
props
.
getProperty
(
"couchbase.password"
,
""
);
upsert
=
props
.
getProperty
(
"couchbase.upsert"
,
"false"
).
equals
(
"true"
);
persistTo
=
parsePersistTo
(
props
.
getProperty
(
"couchbase.persistTo"
,
"0"
));
replicateTo
=
parseReplicateTo
(
props
.
getProperty
(
"couchbase.replicateTo"
,
"0"
));
syncMutResponse
=
props
.
getProperty
(
"couchbase.syncMutationResponse"
,
"true"
).
equals
(
"true"
);
adhoc
=
props
.
getProperty
(
"couchbase.adhoc"
,
"false"
).
equals
(
"true"
);
kv
=
props
.
getProperty
(
"couchbase.kv"
,
"true"
).
equals
(
"true"
);
maxParallelism
=
Integer
.
parseInt
(
props
.
getProperty
(
"couchbase.maxParallelism"
,
"1"
));
kvEndpoints
=
Integer
.
parseInt
(
props
.
getProperty
(
"couchbase.kvEndpoints"
,
"1"
));
queryEndpoints
=
Integer
.
parseInt
(
props
.
getProperty
(
"couchbase.queryEndpoints"
,
"1"
));
epoll
=
props
.
getProperty
(
"couchbase.epoll"
,
"false"
).
equals
(
"true"
);
boost
=
Integer
.
parseInt
(
props
.
getProperty
(
"couchbase.boost"
,
"3"
));
networkMetricsInterval
=
Integer
.
parseInt
(
props
.
getProperty
(
"couchbase.networkMetricsInterval"
,
"0"
));
runtimeMetricsInterval
=
Integer
.
parseInt
(
props
.
getProperty
(
"couchbase.runtimeMetricsInterval"
,
"0"
));
documentExpiry
=
Integer
.
parseInt
(
props
.
getProperty
(
"couchbase.documentExpiry"
,
"0"
));
scanAllQuery
=
"SELECT RAW meta().id FROM `"
+
bucketName
+
"` WHERE meta().id >= '$1' ORDER BY meta().id LIMIT $2"
;
try
{
synchronized
(
INIT_COORDINATOR
)
{
if
(
env
==
null
)
{
LatencyMetricsCollectorConfig
latencyConfig
=
networkMetricsInterval
<=
0
?
DefaultLatencyMetricsCollectorConfig
.
disabled
()
:
DefaultLatencyMetricsCollectorConfig
.
builder
()
.
emitFrequency
(
networkMetricsInterval
)
.
emitFrequencyUnit
(
TimeUnit
.
SECONDS
)
.
build
();
MetricsCollectorConfig
runtimeConfig
=
runtimeMetricsInterval
<=
0
?
DefaultMetricsCollectorConfig
.
disabled
()
:
DefaultMetricsCollectorConfig
.
create
(
runtimeMetricsInterval
,
TimeUnit
.
SECONDS
);
DefaultCouchbaseEnvironment
.
Builder
builder
=
DefaultCouchbaseEnvironment
.
builder
()
.
queryEndpoints
(
queryEndpoints
)
.
callbacksOnIoPool
(
true
)
.
runtimeMetricsCollectorConfig
(
runtimeConfig
)
.
networkLatencyMetricsCollectorConfig
(
latencyConfig
)
.
socketConnectTimeout
(
10000
)
// 10 secs socket connect timeout
.
connectTimeout
(
30000
)
// 30 secs overall bucket open timeout
.
kvTimeout
(
10000
)
// 10 instead of 2.5s for KV ops
.
kvEndpoints
(
kvEndpoints
);
// Tune boosting and epoll based on settings
SelectStrategyFactory
factory
=
boost
>
0
?
new
BackoffSelectStrategyFactory
()
:
DefaultSelectStrategyFactory
.
INSTANCE
;
int
poolSize
=
boost
>
0
?
boost
:
Integer
.
parseInt
(
System
.
getProperty
(
"com.couchbase.ioPoolSize"
,
Integer
.
toString
(
DefaultCoreEnvironment
.
IO_POOL_SIZE
))
);
ThreadFactory
threadFactory
=
new
DefaultThreadFactory
(
"cb-io"
,
true
);
EventLoopGroup
group
=
epoll
?
new
EpollEventLoopGroup
(
poolSize
,
threadFactory
,
factory
)
:
new
NioEventLoopGroup
(
poolSize
,
threadFactory
,
SelectorProvider
.
provider
(),
factory
);
builder
.
ioPool
(
group
,
new
IoPoolShutdownHook
(
group
));
env
=
builder
.
build
();
logParams
();
}
}
cluster
=
CouchbaseCluster
.
create
(
env
,
host
);
bucket
=
cluster
.
openBucket
(
bucketName
,
bucketPassword
);
kvTimeout
=
env
.
kvTimeout
();
}
catch
(
Exception
ex
)
{
throw
new
DBException
(
"Could not connect to Couchbase Bucket."
,
ex
);
}
if
(!
kv
&&
!
syncMutResponse
)
{
throw
new
DBException
(
"Not waiting for N1QL responses on mutations not yet implemented."
);
}
}
/**
* Helper method to log the CLI params so that on the command line debugging is easier.
*/
private
void
logParams
()
{
StringBuilder
sb
=
new
StringBuilder
();
sb
.
append
(
"host="
).
append
(
host
);
sb
.
append
(
", bucket="
).
append
(
bucketName
);
sb
.
append
(
", upsert="
).
append
(
upsert
);
sb
.
append
(
", persistTo="
).
append
(
persistTo
);
sb
.
append
(
", replicateTo="
).
append
(
replicateTo
);
sb
.
append
(
", syncMutResponse="
).
append
(
syncMutResponse
);
sb
.
append
(
", adhoc="
).
append
(
adhoc
);
sb
.
append
(
", kv="
).
append
(
kv
);
sb
.
append
(
", maxParallelism="
).
append
(
maxParallelism
);
sb
.
append
(
", queryEndpoints="
).
append
(
queryEndpoints
);
sb
.
append
(
", kvEndpoints="
).
append
(
kvEndpoints
);
sb
.
append
(
", queryEndpoints="
).
append
(
queryEndpoints
);
sb
.
append
(
", epoll="
).
append
(
epoll
);
sb
.
append
(
", boost="
).
append
(
boost
);
sb
.
append
(
", networkMetricsInterval="
).
append
(
networkMetricsInterval
);
sb
.
append
(
", runtimeMetricsInterval="
).
append
(
runtimeMetricsInterval
);
LOGGER
.
info
(
"===> Using Params: "
+
sb
.
toString
());
}
@Override
public
Status
read
(
final
String
table
,
final
String
key
,
Set
<
String
>
fields
,
final
Map
<
String
,
ByteIterator
>
result
)
{
try
{
String
docId
=
formatId
(
table
,
key
);
if
(
kv
)
{
return
readKv
(
docId
,
fields
,
result
);
}
else
{
return
readN1ql
(
docId
,
fields
,
result
);
}
}
catch
(
Exception
ex
)
{
ex
.
printStackTrace
();
return
Status
.
ERROR
;
}
}
/**
* Performs the {@link #read(String, String, Set, Map)} operation via Key/Value ("get").
*
* @param docId the document ID
* @param fields the fields to be loaded
* @param result the result map where the doc needs to be converted into
* @return The result of the operation.
*/
private
Status
readKv
(
final
String
docId
,
final
Set
<
String
>
fields
,
final
Map
<
String
,
ByteIterator
>
result
)
throws
Exception
{
RawJsonDocument
loaded
=
bucket
.
get
(
docId
,
RawJsonDocument
.
class
);
if
(
loaded
==
null
)
{
return
Status
.
NOT_FOUND
;
}
decode
(
loaded
.
content
(),
fields
,
result
);
return
Status
.
OK
;
}
/**
* Performs the {@link #read(String, String, Set, Map)} operation via N1QL ("SELECT").
*
* If this option should be used, the "-p couchbase.kv=false" property must be set.
*
* @param docId the document ID
* @param fields the fields to be loaded
* @param result the result map where the doc needs to be converted into
* @return The result of the operation.
*/
private
Status
readN1ql
(
final
String
docId
,
Set
<
String
>
fields
,
final
Map
<
String
,
ByteIterator
>
result
)
throws
Exception
{
String
readQuery
=
"SELECT "
+
joinFields
(
fields
)
+
" FROM `"
+
bucketName
+
"` USE KEYS [$1]"
;
N1qlQueryResult
queryResult
=
bucket
.
query
(
N1qlQuery
.
parameterized
(
readQuery
,
JsonArray
.
from
(
docId
),
N1qlParams
.
build
().
adhoc
(
adhoc
).
maxParallelism
(
maxParallelism
)
));
if
(!
queryResult
.
parseSuccess
()
||
!
queryResult
.
finalSuccess
())
{
throw
new
DBException
(
"Error while parsing N1QL Result. Query: "
+
readQuery
+
", Errors: "
+
queryResult
.
errors
());
}
N1qlQueryRow
row
;
try
{
row
=
queryResult
.
rows
().
next
();
}
catch
(
NoSuchElementException
ex
)
{
return
Status
.
NOT_FOUND
;
}
JsonObject
content
=
row
.
value
();
if
(
fields
==
null
)
{
content
=
content
.
getObject
(
bucketName
);
// n1ql result set scoped under *.bucketName
fields
=
content
.
getNames
();
}
for
(
String
field
:
fields
)
{
Object
value
=
content
.
get
(
field
);
result
.
put
(
field
,
new
StringByteIterator
(
value
!=
null
?
value
.
toString
()
:
""
));
}
return
Status
.
OK
;
}
@Override
public
Status
update
(
final
String
table
,
final
String
key
,
final
Map
<
String
,
ByteIterator
>
values
)
{
if
(
upsert
)
{
return
upsert
(
table
,
key
,
values
);
}
try
{
String
docId
=
formatId
(
table
,
key
);
if
(
kv
)
{
return
updateKv
(
docId
,
values
);
}
else
{
return
updateN1ql
(
docId
,
values
);
}
}
catch
(
Exception
ex
)
{
ex
.
printStackTrace
();
return
Status
.
ERROR
;
}
}
/**
* Performs the {@link #update(String, String, Map)} operation via Key/Value ("replace").
*
* @param docId the document ID
* @param values the values to update the document with.
* @return The result of the operation.
*/
private
Status
updateKv
(
final
String
docId
,
final
Map
<
String
,
ByteIterator
>
values
)
{
waitForMutationResponse
(
bucket
.
async
().
replace
(
RawJsonDocument
.
create
(
docId
,
documentExpiry
,
encode
(
values
)),
persistTo
,
replicateTo
));
return
Status
.
OK
;
}
/**
* Performs the {@link #update(String, String, Map)} operation via N1QL ("UPDATE").
*
* If this option should be used, the "-p couchbase.kv=false" property must be set.
*
* @param docId the document ID
* @param values the values to update the document with.
* @return The result of the operation.
*/
private
Status
updateN1ql
(
final
String
docId
,
final
Map
<
String
,
ByteIterator
>
values
)
throws
Exception
{
String
fields
=
encodeN1qlFields
(
values
);
String
updateQuery
=
"UPDATE `"
+
bucketName
+
"` USE KEYS [$1] SET "
+
fields
;
N1qlQueryResult
queryResult
=
bucket
.
query
(
N1qlQuery
.
parameterized
(
updateQuery
,
JsonArray
.
from
(
docId
),
N1qlParams
.
build
().
adhoc
(
adhoc
).
maxParallelism
(
maxParallelism
)
));
if
(!
queryResult
.
parseSuccess
()
||
!
queryResult
.
finalSuccess
())
{
throw
new
DBException
(
"Error while parsing N1QL Result. Query: "
+
updateQuery
+
", Errors: "
+
queryResult
.
errors
());
}
return
Status
.
OK
;
}
@Override
public
Status
insert
(
final
String
table
,
final
String
key
,
final
Map
<
String
,
ByteIterator
>
values
)
{
if
(
upsert
)
{
return
upsert
(
table
,
key
,
values
);
}
try
{
String
docId
=
formatId
(
table
,
key
);
if
(
kv
)
{
return
insertKv
(
docId
,
values
);
}
else
{
return
insertN1ql
(
docId
,
values
);
}
}
catch
(
Exception
ex
)
{
ex
.
printStackTrace
();
return
Status
.
ERROR
;
}
}
/**
* Performs the {@link #insert(String, String, Map)} operation via Key/Value ("INSERT").
*
* Note that during the "load" phase it makes sense to retry TMPFAILS (so that even if the server is
* overloaded temporarily the ops will succeed eventually). The current code will retry TMPFAILs
* for maximum of one minute and then bubble up the error.
*
* @param docId the document ID
* @param values the values to update the document with.
* @return The result of the operation.
*/
private
Status
insertKv
(
final
String
docId
,
final
Map
<
String
,
ByteIterator
>
values
)
{
int
tries
=
60
;
// roughly 60 seconds with the 1 second sleep, not 100% accurate.
for
(
int
i
=
0
;
i
<
tries
;
i
++)
{
try
{
waitForMutationResponse
(
bucket
.
async
().
insert
(
RawJsonDocument
.
create
(
docId
,
documentExpiry
,
encode
(
values
)),
persistTo
,
replicateTo
));
return
Status
.
OK
;
}
catch
(
TemporaryFailureException
ex
)
{
try
{
Thread
.
sleep
(
1000
);
}
catch
(
InterruptedException
e
)
{
throw
new
RuntimeException
(
"Interrupted while sleeping on TMPFAIL backoff."
,
ex
);
}
}
}
throw
new
RuntimeException
(
"Still receiving TMPFAIL from the server after trying "
+
tries
+
" times. "
+
"Check your server."
);
}
/**
* Performs the {@link #insert(String, String, Map)} operation via N1QL ("INSERT").
*
* If this option should be used, the "-p couchbase.kv=false" property must be set.
*
* @param docId the document ID
* @param values the values to update the document with.
* @return The result of the operation.
*/
private
Status
insertN1ql
(
final
String
docId
,
final
Map
<
String
,
ByteIterator
>
values
)
throws
Exception
{
String
insertQuery
=
"INSERT INTO `"
+
bucketName
+
"`(KEY,VALUE) VALUES ($1,$2)"
;
N1qlQueryResult
queryResult
=
bucket
.
query
(
N1qlQuery
.
parameterized
(
insertQuery
,
JsonArray
.
from
(
docId
,
valuesToJsonObject
(
values
)),
N1qlParams
.
build
().
adhoc
(
adhoc
).
maxParallelism
(
maxParallelism
)
));
if
(!
queryResult
.
parseSuccess
()
||
!
queryResult
.
finalSuccess
())
{
throw
new
DBException
(
"Error while parsing N1QL Result. Query: "
+
insertQuery
+
", Errors: "
+
queryResult
.
errors
());
}
return
Status
.
OK
;
}
/**
* Performs an upsert instead of insert or update using either Key/Value or N1QL.
*
* If this option should be used, the "-p couchbase.upsert=true" property must be set.
*
* @param table The name of the table
* @param key The record key of the record to insert.
* @param values A HashMap of field/value pairs to insert in the record
* @return The result of the operation.
*/
private
Status
upsert
(
final
String
table
,
final
String
key
,
final
Map
<
String
,
ByteIterator
>
values
)
{
try
{
String
docId
=
formatId
(
table
,
key
);
if
(
kv
)
{
return
upsertKv
(
docId
,
values
);
}
else
{
return
upsertN1ql
(
docId
,
values
);
}
}
catch
(
Exception
ex
)
{
ex
.
printStackTrace
();
return
Status
.
ERROR
;
}
}
/**
* Performs the {@link #upsert(String, String, Map)} operation via Key/Value ("upsert").
*
* If this option should be used, the "-p couchbase.upsert=true" property must be set.
*
* @param docId the document ID
* @param values the values to update the document with.
* @return The result of the operation.
*/
private
Status
upsertKv
(
final
String
docId
,
final
Map
<
String
,
ByteIterator
>
values
)
{
waitForMutationResponse
(
bucket
.
async
().
upsert
(
RawJsonDocument
.
create
(
docId
,
documentExpiry
,
encode
(
values
)),
persistTo
,
replicateTo
));
return
Status
.
OK
;
}
/**
* Performs the {@link #upsert(String, String, Map)} operation via N1QL ("UPSERT").
*
* If this option should be used, the "-p couchbase.upsert=true -p couchbase.kv=false" properties must be set.
*
* @param docId the document ID
* @param values the values to update the document with.
* @return The result of the operation.
*/
private
Status
upsertN1ql
(
final
String
docId
,
final
Map
<
String
,
ByteIterator
>
values
)
throws
Exception
{
String
upsertQuery
=
"UPSERT INTO `"
+
bucketName
+
"`(KEY,VALUE) VALUES ($1,$2)"
;
N1qlQueryResult
queryResult
=
bucket
.
query
(
N1qlQuery
.
parameterized
(
upsertQuery
,
JsonArray
.
from
(
docId
,
valuesToJsonObject
(
values
)),
N1qlParams
.
build
().
adhoc
(
adhoc
).
maxParallelism
(
maxParallelism
)
));
if
(!
queryResult
.
parseSuccess
()
||
!
queryResult
.
finalSuccess
())
{
throw
new
DBException
(
"Error while parsing N1QL Result. Query: "
+
upsertQuery
+
", Errors: "
+
queryResult
.
errors
());
}
return
Status
.
OK
;
}
@Override
public
Status
delete
(
final
String
table
,
final
String
key
)
{
try
{
String
docId
=
formatId
(
table
,
key
);
if
(
kv
)
{
return
deleteKv
(
docId
);
}
else
{
return
deleteN1ql
(
docId
);
}
}
catch
(
Exception
ex
)
{
ex
.
printStackTrace
();
return
Status
.
ERROR
;
}
}
/**
* Performs the {@link #delete(String, String)} (String, String)} operation via Key/Value ("remove").
*
* @param docId the document ID.
* @return The result of the operation.
*/
private
Status
deleteKv
(
final
String
docId
)
{
waitForMutationResponse
(
bucket
.
async
().
remove
(
docId
,
persistTo
,
replicateTo
));
return
Status
.
OK
;
}
/**
* Performs the {@link #delete(String, String)} (String, String)} operation via N1QL ("DELETE").
*
* If this option should be used, the "-p couchbase.kv=false" property must be set.
*
* @param docId the document ID.
* @return The result of the operation.
*/
private
Status
deleteN1ql
(
final
String
docId
)
throws
Exception
{
String
deleteQuery
=
"DELETE FROM `"
+
bucketName
+
"` USE KEYS [$1]"
;
N1qlQueryResult
queryResult
=
bucket
.
query
(
N1qlQuery
.
parameterized
(
deleteQuery
,
JsonArray
.
from
(
docId
),
N1qlParams
.
build
().
adhoc
(
adhoc
).
maxParallelism
(
maxParallelism
)
));
if
(!
queryResult
.
parseSuccess
()
||
!
queryResult
.
finalSuccess
())
{
throw
new
DBException
(
"Error while parsing N1QL Result. Query: "
+
deleteQuery
+
", Errors: "
+
queryResult
.
errors
());
}
return
Status
.
OK
;
}
@Override
public
Status
scan
(
final
String
table
,
final
String
startkey
,
final
int
recordcount
,
final
Set
<
String
>
fields
,
final
Vector
<
HashMap
<
String
,
ByteIterator
>>
result
)
{
try
{
if
(
fields
==
null
||
fields
.
isEmpty
())
{
return
scanAllFields
(
table
,
startkey
,
recordcount
,
result
);
}
else
{
return
scanSpecificFields
(
table
,
startkey
,
recordcount
,
fields
,
result
);
}
}
catch
(
Exception
ex
)
{
ex
.
printStackTrace
();
return
Status
.
ERROR
;
}
}
/**
* Performs the {@link #scan(String, String, int, Set, Vector)} operation, optimized for all fields.
*
* Since the full document bodies need to be loaded anyways, it makes sense to just grab the document IDs
* from N1QL and then perform the bulk loading via KV for better performance. This is a usual pattern with
* Couchbase and shows the benefits of using both N1QL and KV together.
*
* @param table The name of the table
* @param startkey The record key of the first record to read.
* @param recordcount The number of records to read
* @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record
* @return The result of the operation.
*/
private
Status
scanAllFields
(
final
String
table
,
final
String
startkey
,
final
int
recordcount
,
final
Vector
<
HashMap
<
String
,
ByteIterator
>>
result
)
{
final
List
<
HashMap
<
String
,
ByteIterator
>>
data
=
new
ArrayList
<
HashMap
<
String
,
ByteIterator
>>(
recordcount
);
bucket
.
async
()
.
query
(
N1qlQuery
.
parameterized
(
scanAllQuery
,
JsonArray
.
from
(
formatId
(
table
,
startkey
),
recordcount
),
N1qlParams
.
build
().
adhoc
(
adhoc
).
maxParallelism
(
maxParallelism
)
))
.
doOnNext
(
new
Action1
<
AsyncN1qlQueryResult
>()
{
@Override
public
void
call
(
AsyncN1qlQueryResult
result
)
{
if
(!
result
.
parseSuccess
())
{
throw
new
RuntimeException
(
"Error while parsing N1QL Result. Query: "
+
scanAllQuery
+
", Errors: "
+
result
.
errors
());
}
}
})
.
flatMap
(
new
Func1
<
AsyncN1qlQueryResult
,
Observable
<
AsyncN1qlQueryRow
>>()
{
@Override
public
Observable
<
AsyncN1qlQueryRow
>
call
(
AsyncN1qlQueryResult
result
)
{
return
result
.
rows
();
}
})
.
flatMap
(
new
Func1
<
AsyncN1qlQueryRow
,
Observable
<
RawJsonDocument
>>()
{
@Override
public
Observable
<
RawJsonDocument
>
call
(
AsyncN1qlQueryRow
row
)
{
String
id
=
new
String
(
row
.
byteValue
()).
trim
();
return
bucket
.
async
().
get
(
id
.
substring
(
1
,
id
.
length
()-
1
),
RawJsonDocument
.
class
);
}
})
.
map
(
new
Func1
<
RawJsonDocument
,
HashMap
<
String
,
ByteIterator
>>()
{
@Override
public
HashMap
<
String
,
ByteIterator
>
call
(
RawJsonDocument
document
)
{
HashMap
<
String
,
ByteIterator
>
tuple
=
new
HashMap
<
String
,
ByteIterator
>();
decode
(
document
.
content
(),
null
,
tuple
);
return
tuple
;
}
})
.
toBlocking
()
.
forEach
(
new
Action1
<
HashMap
<
String
,
ByteIterator
>>()
{
@Override
public
void
call
(
HashMap
<
String
,
ByteIterator
>
tuple
)
{
data
.
add
(
tuple
);
}
});
result
.
addAll
(
data
);
return
Status
.
OK
;
}
/**
* Performs the {@link #scan(String, String, int, Set, Vector)} operation N1Ql only for a subset of the fields.
*
* @param table The name of the table
* @param startkey The record key of the first record to read.
* @param recordcount The number of records to read
* @param fields The list of fields to read, or null for all of them
* @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record
* @return The result of the operation.
*/
private
Status
scanSpecificFields
(
final
String
table
,
final
String
startkey
,
final
int
recordcount
,
final
Set
<
String
>
fields
,
final
Vector
<
HashMap
<
String
,
ByteIterator
>>
result
)
{
String
scanSpecQuery
=
"SELECT "
+
joinFields
(
fields
)
+
" FROM `"
+
bucketName
+
"` WHERE meta().id >= '$1' LIMIT $2"
;
N1qlQueryResult
queryResult
=
bucket
.
query
(
N1qlQuery
.
parameterized
(
scanSpecQuery
,
JsonArray
.
from
(
formatId
(
table
,
startkey
),
recordcount
),
N1qlParams
.
build
().
adhoc
(
adhoc
).
maxParallelism
(
maxParallelism
)
));
if
(!
queryResult
.
parseSuccess
()
||
!
queryResult
.
finalSuccess
())
{
throw
new
RuntimeException
(
"Error while parsing N1QL Result. Query: "
+
scanSpecQuery
+
", Errors: "
+
queryResult
.
errors
());
}
boolean
allFields
=
fields
==
null
||
fields
.
isEmpty
();
result
.
ensureCapacity
(
recordcount
);
for
(
N1qlQueryRow
row
:
queryResult
)
{
JsonObject
value
=
row
.
value
();
if
(
fields
==
null
)
{
value
=
value
.
getObject
(
bucketName
);
}
Set
<
String
>
f
=
allFields
?
value
.
getNames
()
:
fields
;
HashMap
<
String
,
ByteIterator
>
tuple
=
new
HashMap
<
String
,
ByteIterator
>(
f
.
size
());
for
(
String
field
:
f
)
{
tuple
.
put
(
field
,
new
StringByteIterator
(
value
.
getString
(
field
)));
}
result
.
add
(
tuple
);
}
return
Status
.
OK
;
}
/**
* Helper method to block on the response, depending on the property set.
*
* By default, since YCSB is sync the code will always wait for the operation to complete. In some
* cases it can be useful to just "drive load" and disable the waiting. Note that when the
* "-p couchbase.syncMutationResponse=false" option is used, the measured results by YCSB can basically
* be thrown away. Still helpful sometimes during load phases to speed them up :)
*
* @param input the async input observable.
*/
private
void
waitForMutationResponse
(
final
Observable
<?
extends
Document
<?>>
input
)
{
if
(!
syncMutResponse
)
{
((
Observable
<
Document
<?>>)
input
).
subscribe
(
new
Subscriber
<
Document
<?>>()
{
@Override
public
void
onCompleted
()
{
}
@Override
public
void
onError
(
Throwable
e
)
{
}
@Override
public
void
onNext
(
Document
<?>
document
)
{
}
});
}
else
{
Blocking
.
blockForSingle
(
input
,
kvTimeout
,
TimeUnit
.
MILLISECONDS
);
}
}
/**
* Helper method to turn the values into a String, used with {@link #upsertN1ql(String, Map)}.
*
* @param values the values to encode.
* @return the encoded string.
*/
private
static
String
encodeN1qlFields
(
final
Map
<
String
,
ByteIterator
>
values
)
{
if
(
values
.
isEmpty
())
{
return
""
;
}
StringBuilder
sb
=
new
StringBuilder
();
for
(
Map
.
Entry
<
String
,
ByteIterator
>
entry
:
values
.
entrySet
())
{
String
raw
=
entry
.
getValue
().
toString
();
String
escaped
=
raw
.
replace
(
"\""
,
"\\\""
).
replace
(
"\'"
,
"\\\'"
);
sb
.
append
(
entry
.
getKey
()).
append
(
"=\""
).
append
(
escaped
).
append
(
"\" "
);
}
String
toReturn
=
sb
.
toString
();
return
toReturn
.
substring
(
0
,
toReturn
.
length
()
-
1
);
}
/**
* Helper method to turn the map of values into a {@link JsonObject} for further use.
*
* @param values the values to transform.
* @return the created json object.
*/
private
static
JsonObject
valuesToJsonObject
(
final
Map
<
String
,
ByteIterator
>
values
)
{
JsonObject
result
=
JsonObject
.
create
();
for
(
Map
.
Entry
<
String
,
ByteIterator
>
entry
:
values
.
entrySet
())
{
result
.
put
(
entry
.
getKey
(),
entry
.
getValue
().
toString
());
}
return
result
;
}
/**
* Helper method to join the set of fields into a String suitable for N1QL.
*
* @param fields the fields to join.
* @return the joined fields as a String.
*/
private
static
String
joinFields
(
final
Set
<
String
>
fields
)
{
if
(
fields
==
null
||
fields
.
isEmpty
())
{
return
"*"
;
}
StringBuilder
builder
=
new
StringBuilder
();
for
(
String
f
:
fields
)
{
builder
.
append
(
"`"
).
append
(
f
).
append
(
"`"
).
append
(
","
);
}
String
toReturn
=
builder
.
toString
();
return
toReturn
.
substring
(
0
,
toReturn
.
length
()
-
1
);
}
/**
* Helper method to turn the prefix and key into a proper document ID.
*
* @param prefix the prefix (table).
* @param key the key itself.
* @return a document ID that can be used with Couchbase.
*/
private
static
String
formatId
(
final
String
prefix
,
final
String
key
)
{
return
prefix
+
SEPARATOR
+
key
;
}
/**
* Helper method to parse the "ReplicateTo" property on startup.
*
* @param property the proeprty to parse.
* @return the parsed setting.
*/
private
static
ReplicateTo
parseReplicateTo
(
final
String
property
)
throws
DBException
{
int
value
=
Integer
.
parseInt
(
property
);
switch
(
value
)
{
case
0
:
return
ReplicateTo
.
NONE
;
case
1
:
return
ReplicateTo
.
ONE
;
case
2
:
return
ReplicateTo
.
TWO
;
case
3
:
return
ReplicateTo
.
THREE
;
default
:
throw
new
DBException
(
"\"couchbase.replicateTo\" must be between 0 and 3"
);
}
}
/**
* Helper method to parse the "PersistTo" property on startup.
*
* @param property the proeprty to parse.
* @return the parsed setting.
*/
private
static
PersistTo
parsePersistTo
(
final
String
property
)
throws
DBException
{
int
value
=
Integer
.
parseInt
(
property
);
switch
(
value
)
{
case
0
:
return
PersistTo
.
NONE
;
case
1
:
return
PersistTo
.
ONE
;
case
2
:
return
PersistTo
.
TWO
;
case
3
:
return
PersistTo
.
THREE
;
case
4
:
return
PersistTo
.
FOUR
;
default
:
throw
new
DBException
(
"\"couchbase.persistTo\" must be between 0 and 4"
);
}
}
/**
* Decode the String from server and pass it into the decoded destination.
*
* @param source the loaded object.
* @param fields the fields to check.
* @param dest the result passed back to YCSB.
*/
private
void
decode
(
final
String
source
,
final
Set
<
String
>
fields
,
final
Map
<
String
,
ByteIterator
>
dest
)
{
try
{
JsonNode
json
=
JacksonTransformers
.
MAPPER
.
readTree
(
source
);
boolean
checkFields
=
fields
!=
null
&&
!
fields
.
isEmpty
();
for
(
Iterator
<
Map
.
Entry
<
String
,
JsonNode
>>
jsonFields
=
json
.
fields
();
jsonFields
.
hasNext
();)
{
Map
.
Entry
<
String
,
JsonNode
>
jsonField
=
jsonFields
.
next
();
String
name
=
jsonField
.
getKey
();
if
(
checkFields
&&
!
fields
.
contains
(
name
))
{
continue
;
}
JsonNode
jsonValue
=
jsonField
.
getValue
();
if
(
jsonValue
!=
null
&&
!
jsonValue
.
isNull
())
{
dest
.
put
(
name
,
new
StringByteIterator
(
jsonValue
.
asText
()));
}
}
}
catch
(
Exception
e
)
{
throw
new
RuntimeException
(
"Could not decode JSON"
);
}
}
/**
* Encode the source into a String for storage.
*
* @param source the source value.
* @return the encoded string.
*/
private
String
encode
(
final
Map
<
String
,
ByteIterator
>
source
)
{
Map
<
String
,
String
>
stringMap
=
StringByteIterator
.
getStringMap
(
source
);
ObjectNode
node
=
JacksonTransformers
.
MAPPER
.
createObjectNode
();
for
(
Map
.
Entry
<
String
,
String
>
pair
:
stringMap
.
entrySet
())
{
node
.
put
(
pair
.
getKey
(),
pair
.
getValue
());
}
JsonFactory
jsonFactory
=
new
JsonFactory
();
Writer
writer
=
new
StringWriter
();
try
{
JsonGenerator
jsonGenerator
=
jsonFactory
.
createGenerator
(
writer
);
JacksonTransformers
.
MAPPER
.
writeTree
(
jsonGenerator
,
node
);
}
catch
(
Exception
e
)
{
throw
new
RuntimeException
(
"Could not encode JSON value"
);
}
return
writer
.
toString
();
}
}
/**
* Factory for the {@link BackoffSelectStrategy} to be used with boosting.
*/
class
BackoffSelectStrategyFactory
implements
SelectStrategyFactory
{
@Override
public
SelectStrategy
newSelectStrategy
()
{
return
new
BackoffSelectStrategy
();
}
}
/**
* Custom IO select strategy which trades CPU for throughput, used with the boost setting.
*/
class
BackoffSelectStrategy
implements
SelectStrategy
{
private
int
counter
=
0
;
@Override
public
int
calculateStrategy
(
final
IntSupplier
supplier
,
final
boolean
hasTasks
)
throws
Exception
{
int
selectNowResult
=
supplier
.
get
();
if
(
hasTasks
||
selectNowResult
!=
0
)
{
counter
=
0
;
return
selectNowResult
;
}
counter
++;
if
(
counter
>
2000
)
{
LockSupport
.
parkNanos
(
1
);
}
else
if
(
counter
>
3000
)
{
Thread
.
yield
();
}
else
if
(
counter
>
4000
)
{
LockSupport
.
parkNanos
(
1000
);
}
else
if
(
counter
>
5000
)
{
// defer to blocking select
counter
=
0
;
return
SelectStrategy
.
SELECT
;
}
return
SelectStrategy
.
CONTINUE
;
}
}
Event Timeline
Log In to Comment