Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F65244432
CassandraCQLClient.java
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sun, Jun 2, 04:40
Size
15 KB
Mime Type
text/x-java
Expires
Tue, Jun 4, 04:40 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
18035278
Attached To
R7507 YCSB
CassandraCQLClient.java
View Options
/**
* Copyright (c) 2013-2015 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License. See accompanying LICENSE file.
*
* Submitted by Chrisjan Matser on 10/11/2010.
*/
package
com.yahoo.ycsb.db
;
import
com.datastax.driver.core.Cluster
;
import
com.datastax.driver.core.ColumnDefinitions
;
import
com.datastax.driver.core.ConsistencyLevel
;
import
com.datastax.driver.core.Host
;
import
com.datastax.driver.core.HostDistance
;
import
com.datastax.driver.core.Metadata
;
import
com.datastax.driver.core.ResultSet
;
import
com.datastax.driver.core.Row
;
import
com.datastax.driver.core.Session
;
import
com.datastax.driver.core.SimpleStatement
;
import
com.datastax.driver.core.Statement
;
import
com.datastax.driver.core.querybuilder.Insert
;
import
com.datastax.driver.core.querybuilder.QueryBuilder
;
import
com.datastax.driver.core.querybuilder.Select
;
import
com.yahoo.ycsb.ByteArrayByteIterator
;
import
com.yahoo.ycsb.ByteIterator
;
import
com.yahoo.ycsb.DB
;
import
com.yahoo.ycsb.DBException
;
import
java.nio.ByteBuffer
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.Set
;
import
java.util.Vector
;
import
java.util.concurrent.atomic.AtomicInteger
;
/**
* Cassandra 2.x CQL client.
*
* See {@code cassandra2/README.md} for details.
*
* @author cmatser
*/
public
class
CassandraCQLClient
extends
DB
{
protected
static
Cluster
cluster
=
null
;
protected
static
Session
session
=
null
;
private
static
ConsistencyLevel
readConsistencyLevel
=
ConsistencyLevel
.
ONE
;
private
static
ConsistencyLevel
writeConsistencyLevel
=
ConsistencyLevel
.
ONE
;
public
static
final
int
OK
=
0
;
public
static
final
int
ERR
=
-
1
;
public
static
final
int
NOT_FOUND
=
-
3
;
public
static
final
String
YCSB_KEY
=
"y_id"
;
public
static
final
String
KEYSPACE_PROPERTY
=
"cassandra.keyspace"
;
public
static
final
String
KEYSPACE_PROPERTY_DEFAULT
=
"ycsb"
;
public
static
final
String
USERNAME_PROPERTY
=
"cassandra.username"
;
public
static
final
String
PASSWORD_PROPERTY
=
"cassandra.password"
;
public
static
final
String
HOSTS_PROPERTY
=
"hosts"
;
public
static
final
String
PORT_PROPERTY
=
"port"
;
public
static
final
String
READ_CONSISTENCY_LEVEL_PROPERTY
=
"cassandra.readconsistencylevel"
;
public
static
final
String
READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT
=
"ONE"
;
public
static
final
String
WRITE_CONSISTENCY_LEVEL_PROPERTY
=
"cassandra.writeconsistencylevel"
;
public
static
final
String
WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT
=
"ONE"
;
/** Count the number of times initialized to teardown on the last {@link #cleanup()}. */
private
static
final
AtomicInteger
initCount
=
new
AtomicInteger
(
0
);
private
static
boolean
_debug
=
false
;
/**
* Initialize any state for this DB. Called once per DB instance; there is
* one DB instance per client thread.
*/
@Override
public
void
init
()
throws
DBException
{
//Keep track of number of calls to init (for later cleanup)
initCount
.
incrementAndGet
();
//Synchronized so that we only have a single
// cluster/session instance for all the threads.
synchronized
(
initCount
)
{
//Check if the cluster has already been initialized
if
(
cluster
!=
null
)
{
return
;
}
try
{
_debug
=
Boolean
.
parseBoolean
(
getProperties
().
getProperty
(
"debug"
,
"false"
));
String
host
=
getProperties
().
getProperty
(
HOSTS_PROPERTY
);
if
(
host
==
null
)
{
throw
new
DBException
(
String
.
format
(
"Required property \"%s\" missing for CassandraCQLClient"
,
HOSTS_PROPERTY
));
}
String
hosts
[]
=
host
.
split
(
","
);
String
port
=
getProperties
().
getProperty
(
"port"
,
"9042"
);
if
(
port
==
null
)
{
throw
new
DBException
(
String
.
format
(
"Required property \"%s\" missing for CassandraCQLClient"
,
PORT_PROPERTY
));
}
String
username
=
getProperties
().
getProperty
(
USERNAME_PROPERTY
);
String
password
=
getProperties
().
getProperty
(
PASSWORD_PROPERTY
);
String
keyspace
=
getProperties
().
getProperty
(
KEYSPACE_PROPERTY
,
KEYSPACE_PROPERTY_DEFAULT
);
readConsistencyLevel
=
ConsistencyLevel
.
valueOf
(
getProperties
().
getProperty
(
READ_CONSISTENCY_LEVEL_PROPERTY
,
READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT
));
writeConsistencyLevel
=
ConsistencyLevel
.
valueOf
(
getProperties
().
getProperty
(
WRITE_CONSISTENCY_LEVEL_PROPERTY
,
WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT
));
// public void connect(String node) {}
if
((
username
!=
null
)
&&
!
username
.
isEmpty
())
{
cluster
=
Cluster
.
builder
()
.
withCredentials
(
username
,
password
)
.
withPort
(
Integer
.
valueOf
(
port
))
.
addContactPoints
(
hosts
).
build
();
}
else
{
cluster
=
Cluster
.
builder
()
.
withPort
(
Integer
.
valueOf
(
port
))
.
addContactPoints
(
hosts
).
build
();
}
//Update number of connections based on threads
int
threadcount
=
Integer
.
parseInt
(
getProperties
().
getProperty
(
"threadcount"
,
"1"
));
cluster
.
getConfiguration
().
getPoolingOptions
().
setMaxConnectionsPerHost
(
HostDistance
.
LOCAL
,
threadcount
);
//Set connection timeout 3min (default is 5s)
cluster
.
getConfiguration
().
getSocketOptions
().
setConnectTimeoutMillis
(
3
*
60
*
1000
);
//Set read (execute) timeout 3min (default is 12s)
cluster
.
getConfiguration
().
getSocketOptions
().
setReadTimeoutMillis
(
3
*
60
*
1000
);
Metadata
metadata
=
cluster
.
getMetadata
();
System
.
err
.
printf
(
"Connected to cluster: %s\n"
,
metadata
.
getClusterName
());
for
(
Host
discoveredHost
:
metadata
.
getAllHosts
())
{
System
.
out
.
printf
(
"Datacenter: %s; Host: %s; Rack: %s\n"
,
discoveredHost
.
getDatacenter
(),
discoveredHost
.
getAddress
(),
discoveredHost
.
getRack
());
}
session
=
cluster
.
connect
(
keyspace
);
}
catch
(
Exception
e
)
{
throw
new
DBException
(
e
);
}
}
//synchronized
}
/**
* Cleanup any state for this DB. Called once per DB instance; there is one
* DB instance per client thread.
*/
@Override
public
void
cleanup
()
throws
DBException
{
synchronized
(
initCount
)
{
final
int
curInitCount
=
initCount
.
decrementAndGet
();
if
(
curInitCount
<=
0
)
{
session
.
close
();
cluster
.
close
();
cluster
=
null
;
session
=
null
;
}
if
(
curInitCount
<
0
)
{
// This should never happen.
throw
new
DBException
(
String
.
format
(
"initCount is negative: %d"
,
curInitCount
));
}
}
}
/**
* Read a record from the database. Each field/value pair from the result
* will be stored in a HashMap.
*
* @param table The name of the table
* @param key The record key of the record to read.
* @param fields The list of fields to read, or null for all of them
* @param result A HashMap of field/value pairs for the result
* @return Zero on success, a non-zero error code on error
*/
@Override
public
int
read
(
String
table
,
String
key
,
Set
<
String
>
fields
,
HashMap
<
String
,
ByteIterator
>
result
)
{
try
{
Statement
stmt
;
Select
.
Builder
selectBuilder
;
if
(
fields
==
null
)
{
selectBuilder
=
QueryBuilder
.
select
().
all
();
}
else
{
selectBuilder
=
QueryBuilder
.
select
();
for
(
String
col
:
fields
)
{
((
Select
.
Selection
)
selectBuilder
).
column
(
col
);
}
}
stmt
=
selectBuilder
.
from
(
table
).
where
(
QueryBuilder
.
eq
(
YCSB_KEY
,
key
)).
limit
(
1
);
stmt
.
setConsistencyLevel
(
readConsistencyLevel
);
if
(
_debug
)
{
System
.
out
.
println
(
stmt
.
toString
());
}
ResultSet
rs
=
session
.
execute
(
stmt
);
if
(
rs
.
isExhausted
())
{
return
NOT_FOUND
;
}
//Should be only 1 row
Row
row
=
rs
.
one
();
ColumnDefinitions
cd
=
row
.
getColumnDefinitions
();
for
(
ColumnDefinitions
.
Definition
def
:
cd
)
{
ByteBuffer
val
=
row
.
getBytesUnsafe
(
def
.
getName
());
if
(
val
!=
null
)
{
result
.
put
(
def
.
getName
(),
new
ByteArrayByteIterator
(
val
.
array
()));
}
else
{
result
.
put
(
def
.
getName
(),
null
);
}
}
return
OK
;
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
System
.
out
.
println
(
"Error reading key: "
+
key
);
return
ERR
;
}
}
/**
* Perform a range scan for a set of records in the database. Each
* field/value pair from the result will be stored in a HashMap.
*
* Cassandra CQL uses "token" method for range scan which doesn't always
* yield intuitive results.
*
* @param table The name of the table
* @param startkey The record key of the first record to read.
* @param recordcount The number of records to read
* @param fields The list of fields to read, or null for all of them
* @param result A Vector of HashMaps, where each HashMap is a set
* field/value pairs for one record
* @return Zero on success, a non-zero error code on error
*/
@Override
public
int
scan
(
String
table
,
String
startkey
,
int
recordcount
,
Set
<
String
>
fields
,
Vector
<
HashMap
<
String
,
ByteIterator
>>
result
)
{
try
{
Statement
stmt
;
Select
.
Builder
selectBuilder
;
if
(
fields
==
null
)
{
selectBuilder
=
QueryBuilder
.
select
().
all
();
}
else
{
selectBuilder
=
QueryBuilder
.
select
();
for
(
String
col
:
fields
)
{
((
Select
.
Selection
)
selectBuilder
).
column
(
col
);
}
}
stmt
=
selectBuilder
.
from
(
table
);
//The statement builder is not setup right for tokens.
// So, we need to build it manually.
String
initialStmt
=
stmt
.
toString
();
StringBuilder
scanStmt
=
new
StringBuilder
();
scanStmt
.
append
(
initialStmt
.
substring
(
0
,
initialStmt
.
length
()-
1
));
scanStmt
.
append
(
" WHERE "
);
scanStmt
.
append
(
QueryBuilder
.
token
(
YCSB_KEY
));
scanStmt
.
append
(
" >= "
);
scanStmt
.
append
(
"token('"
);
scanStmt
.
append
(
startkey
);
scanStmt
.
append
(
"')"
);
scanStmt
.
append
(
" LIMIT "
);
scanStmt
.
append
(
recordcount
);
stmt
=
new
SimpleStatement
(
scanStmt
.
toString
());
stmt
.
setConsistencyLevel
(
readConsistencyLevel
);
if
(
_debug
)
{
System
.
out
.
println
(
stmt
.
toString
());
}
ResultSet
rs
=
session
.
execute
(
stmt
);
HashMap
<
String
,
ByteIterator
>
tuple
;
while
(!
rs
.
isExhausted
())
{
Row
row
=
rs
.
one
();
tuple
=
new
HashMap
<
String
,
ByteIterator
>
();
ColumnDefinitions
cd
=
row
.
getColumnDefinitions
();
for
(
ColumnDefinitions
.
Definition
def
:
cd
)
{
ByteBuffer
val
=
row
.
getBytesUnsafe
(
def
.
getName
());
if
(
val
!=
null
)
{
tuple
.
put
(
def
.
getName
(),
new
ByteArrayByteIterator
(
val
.
array
()));
}
else
{
tuple
.
put
(
def
.
getName
(),
null
);
}
}
result
.
add
(
tuple
);
}
return
OK
;
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
System
.
out
.
println
(
"Error scanning with startkey: "
+
startkey
);
return
ERR
;
}
}
/**
* Update a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key, overwriting any existing values with the same field name.
*
* @param table The name of the table
* @param key The record key of the record to write.
* @param values A HashMap of field/value pairs to update in the record
* @return Zero on success, a non-zero error code on error
*/
@Override
public
int
update
(
String
table
,
String
key
,
HashMap
<
String
,
ByteIterator
>
values
)
{
//Insert and updates provide the same functionality
return
insert
(
table
,
key
,
values
);
}
/**
* Insert a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key.
*
* @param table The name of the table
* @param key The record key of the record to insert.
* @param values A HashMap of field/value pairs to insert in the record
* @return Zero on success, a non-zero error code on error
*/
@Override
public
int
insert
(
String
table
,
String
key
,
HashMap
<
String
,
ByteIterator
>
values
)
{
try
{
Insert
insertStmt
=
QueryBuilder
.
insertInto
(
table
);
//Add key
insertStmt
.
value
(
YCSB_KEY
,
key
);
//Add fields
for
(
Map
.
Entry
<
String
,
ByteIterator
>
entry
:
values
.
entrySet
())
{
Object
value
;
ByteIterator
byteIterator
=
entry
.
getValue
();
value
=
byteIterator
.
toString
();
insertStmt
.
value
(
entry
.
getKey
(),
value
);
}
insertStmt
.
setConsistencyLevel
(
writeConsistencyLevel
).
enableTracing
();
if
(
_debug
)
{
System
.
out
.
println
(
insertStmt
.
toString
());
}
ResultSet
rs
=
session
.
execute
(
insertStmt
);
return
OK
;
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
return
ERR
;
}
/**
* Delete a record from the database.
*
* @param table The name of the table
* @param key The record key of the record to delete.
* @return Zero on success, a non-zero error code on error
*/
@Override
public
int
delete
(
String
table
,
String
key
)
{
try
{
Statement
stmt
;
stmt
=
QueryBuilder
.
delete
().
from
(
table
).
where
(
QueryBuilder
.
eq
(
YCSB_KEY
,
key
));
stmt
.
setConsistencyLevel
(
writeConsistencyLevel
);
if
(
_debug
)
{
System
.
out
.
println
(
stmt
.
toString
());
}
ResultSet
rs
=
session
.
execute
(
stmt
);
return
OK
;
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
System
.
out
.
println
(
"Error deleting key: "
+
key
);
}
return
ERR
;
}
}
Event Timeline
Log In to Comment