diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml
index cfed5691..dfcb1810 100644
--- a/elasticsearch/pom.xml
+++ b/elasticsearch/pom.xml
@@ -1,60 +1,77 @@
4.0.0
com.yahoo.ycsb
binding-parent
0.6.0-SNAPSHOT
../binding-parent
elasticsearch-binding
ElasticSearch Binding
jar
0.19.8
-
-
- sonatype-nexus-snapshots
- Sonatype releases
- https://oss.sonatype.org/content/repositories/releases
-
-
com.yahoo.ycsb
core
${project.version}
provided
org.elasticsearch
elasticsearch
${elasticsearch-version}
org.testng
testng
6.1.1
test
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+ 2.15
+
+ true
+ ../checkstyle.xml
+ true
+ true
+
+
+
+ validate
+ validate
+
+ checkstyle
+
+
+
+
+
+
diff --git a/elasticsearch/src/main/java/com/yahoo/ycsb/db/ElasticSearchClient.java b/elasticsearch/src/main/java/com/yahoo/ycsb/db/ElasticSearchClient.java
index 927b304c..49c2c604 100644
--- a/elasticsearch/src/main/java/com/yahoo/ycsb/db/ElasticSearchClient.java
+++ b/elasticsearch/src/main/java/com/yahoo/ycsb/db/ElasticSearchClient.java
@@ -1,316 +1,345 @@
/**
* Copyright (c) 2012 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db;
-import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
+import static org.elasticsearch.common.settings.ImmutableSettings.*;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.FilterBuilders.rangeFilter;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.StringByteIterator;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings.Builder;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.RangeFilterBuilder;
import org.elasticsearch.node.Node;
import org.elasticsearch.search.SearchHit;
import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
/**
* ElasticSearch client for YCSB framework.
*
- *
Default properties to set:
- es.cluster.name = es.ycsb.cluster
- *
- es.client = true
- es.index.key = es.ycsb
+ *
+ * Default properties to set:
+ *
+ *
+ * - es.cluster.name = es.ycsb.cluster
+ *
- es.client = true
+ *
- es.index.key = es.ycsb
+ *
*
* @author Sharmarke Aden
*
*/
public class ElasticSearchClient extends DB {
- public static final String DEFAULT_CLUSTER_NAME = "es.ycsb.cluster";
- public static final String DEFAULT_INDEX_KEY = "es.ycsb";
- public static final String DEFAULT_REMOTE_HOST = "localhost:9300";
- private Node node;
- private Client client;
- private String indexKey;
-
- private Boolean remoteMode;
-
- /**
- * Initialize any state for this DB. Called once per DB instance; there is
- * one DB instance per client thread.
- */
- @Override
- public void init() throws DBException {
- // initialize OrientDB driver
- Properties props = getProperties();
- this.indexKey = props.getProperty("es.index.key", DEFAULT_INDEX_KEY);
- String clusterName = props.getProperty("cluster.name", DEFAULT_CLUSTER_NAME);
- //Check if transport client needs to be used (To connect to multiple elasticsearch nodes)
- remoteMode = Boolean.parseBoolean(props.getProperty("elasticsearch.remote", "false"));
- Boolean newdb = Boolean.parseBoolean(props.getProperty("elasticsearch.newdb", "false"));
- Builder settings = settingsBuilder()
- .put("node.local", "true")
- .put("path.data", System.getProperty("java.io.tmpdir") + "/esdata")
- .put("discovery.zen.ping.multicast.enabled", "false")
- .put("index.mapping._id.indexed", "true")
- .put("index.gateway.type", "none")
- .put("gateway.type", "none")
- .put("index.number_of_shards", "1")
- .put("index.number_of_replicas", "0");
-
-
- //if properties file contains elasticsearch user defined properties
- //add it to the settings file (will overwrite the defaults).
- settings.put(props);
- System.out.println("ElasticSearch starting node = " + settings.get("cluster.name"));
- System.out.println("ElasticSearch node data path = " + settings.get("path.data"));
- System.out.println("ElasticSearch Remote Mode = " +remoteMode);
- //Remote mode support for connecting to remote elasticsearch cluster
- if(remoteMode) {
- settings.put("client.transport.sniff", true)
- .put("client.transport.ignore_cluster_name", false)
- .put("client.transport.ping_timeout", "30s")
- .put("client.transport.nodes_sampler_interval", "30s");
- //Default it to localhost:9300
- String nodeList[] = props.getProperty("elasticsearch.hosts.list", DEFAULT_REMOTE_HOST).split(",");
- System.out.println("ElasticSearch Remote Hosts = " +props.getProperty("elasticsearch.hosts.list", DEFAULT_REMOTE_HOST));
- TransportClient tClient = new TransportClient(settings);
- for(String h : nodeList) {
- String node[] = h.split(":");
- tClient.addTransportAddress(new InetSocketTransportAddress(node[0], Integer.parseInt(node[1])));
- }
- client = tClient;
- } else { //Start node only if transport client mode is disabled
- node = nodeBuilder().clusterName(clusterName).settings(settings).node();
- node.start();
- client = node.client();
- }
-
-
- if (newdb) {
- client.admin().indices().prepareDelete(indexKey).execute().actionGet();
- client.admin().indices().prepareCreate(indexKey).execute().actionGet();
- } else {
- boolean exists = client.admin().indices().exists(Requests.indicesExistsRequest(indexKey)).actionGet().isExists();
- if (!exists) {
- client.admin().indices().prepareCreate(indexKey).execute().actionGet();
- }
- }
+ public static final String DEFAULT_CLUSTER_NAME = "es.ycsb.cluster";
+ public static final String DEFAULT_INDEX_KEY = "es.ycsb";
+ public static final String DEFAULT_REMOTE_HOST = "localhost:9300";
+ private Node node;
+ private Client client;
+ private String indexKey;
+
+ private Boolean remoteMode;
+
+ /**
+ * Initialize any state for this DB. Called once per DB instance; there is one
+ * DB instance per client thread.
+ */
+ @Override
+ public void init() throws DBException {
+ // initialize OrientDB driver
+ Properties props = getProperties();
+ this.indexKey = props.getProperty("es.index.key", DEFAULT_INDEX_KEY);
+ String clusterName =
+ props.getProperty("cluster.name", DEFAULT_CLUSTER_NAME);
+ // Check if transport client needs to be used (To connect to multiple
+ // elasticsearch nodes)
+ remoteMode = Boolean
+ .parseBoolean(props.getProperty("elasticsearch.remote", "false"));
+ Boolean newdb =
+ Boolean.parseBoolean(props.getProperty("elasticsearch.newdb", "false"));
+ Builder settings = settingsBuilder().put("node.local", "true")
+ .put("path.data", System.getProperty("java.io.tmpdir") + "/esdata")
+ .put("discovery.zen.ping.multicast.enabled", "false")
+ .put("index.mapping._id.indexed", "true")
+ .put("index.gateway.type", "none").put("gateway.type", "none")
+ .put("index.number_of_shards", "1")
+ .put("index.number_of_replicas", "0");
+
+ // if properties file contains elasticsearch user defined properties
+ // add it to the settings file (will overwrite the defaults).
+ settings.put(props);
+ System.out.println(
+ "ElasticSearch starting node = " + settings.get("cluster.name"));
+ System.out
+ .println("ElasticSearch node data path = " + settings.get("path.data"));
+ System.out.println("ElasticSearch Remote Mode = " + remoteMode);
+ // Remote mode support for connecting to remote elasticsearch cluster
+ if (remoteMode) {
+ settings.put("client.transport.sniff", true)
+ .put("client.transport.ignore_cluster_name", false)
+ .put("client.transport.ping_timeout", "30s")
+ .put("client.transport.nodes_sampler_interval", "30s");
+ // Default it to localhost:9300
+ String[] nodeList =
+ props.getProperty("elasticsearch.hosts.list", DEFAULT_REMOTE_HOST)
+ .split(",");
+ System.out.println("ElasticSearch Remote Hosts = "
+ + props.getProperty("elasticsearch.hosts.list", DEFAULT_REMOTE_HOST));
+ TransportClient tClient = new TransportClient(settings);
+ for (String h : nodeList) {
+ String[] nodes = h.split(":");
+ tClient.addTransportAddress(
+ new InetSocketTransportAddress(nodes[0],
+ Integer.parseInt(nodes[1])));
+ }
+ client = tClient;
+ } else { // Start node only if transport client mode is disabled
+ node = nodeBuilder().clusterName(clusterName).settings(settings).node();
+ node.start();
+ client = node.client();
}
- @Override
- public void cleanup() throws DBException {
- if(!remoteMode) {
- if (!node.isClosed()) {
- client.close();
- node.stop();
- node.close();
- }
- } else {
- client.close();
- }
+ if (newdb) {
+ client.admin().indices().prepareDelete(indexKey).execute().actionGet();
+ client.admin().indices().prepareCreate(indexKey).execute().actionGet();
+ } else {
+ boolean exists = client.admin().indices()
+ .exists(Requests.indicesExistsRequest(indexKey)).actionGet()
+ .isExists();
+ if (!exists) {
+ client.admin().indices().prepareCreate(indexKey).execute().actionGet();
+ }
}
-
- /**
- * Insert a record in the database. Any field/value pairs in the specified
- * values HashMap will be written into the record with the specified record
- * key.
- *
- * @param table The name of the table
- * @param key The record key of the record to insert.
- * @param values A HashMap of field/value pairs to insert in the record
- * @return Zero on success, a non-zero error code on error. See this class's
- * description for a discussion of error codes.
- */
- @Override
- public Status insert(String table, String key, HashMap values) {
- try {
- final XContentBuilder doc = jsonBuilder().startObject();
-
- for (Entry entry : StringByteIterator.getStringMap(values).entrySet()) {
- doc.field(entry.getKey(), entry.getValue());
- }
-
- doc.endObject();
-
- client.prepareIndex(indexKey, table, key)
- .setSource(doc)
- .execute()
- .actionGet();
-
- return Status.OK;
- } catch (Exception e) {
- e.printStackTrace();
- }
- return Status.ERROR;
+ }
+
+ @Override
+ public void cleanup() throws DBException {
+ if (!remoteMode) {
+ if (!node.isClosed()) {
+ client.close();
+ node.stop();
+ node.close();
+ }
+ } else {
+ client.close();
}
-
- /**
- * Delete a record from the database.
- *
- * @param table The name of the table
- * @param key The record key of the record to delete.
- * @return Zero on success, a non-zero error code on error. See this class's
- * description for a discussion of error codes.
- */
- @Override
- public Status delete(String table, String key) {
- try {
- client.prepareDelete(indexKey, table, key)
- .execute()
- .actionGet();
- return Status.OK;
- } catch (Exception e) {
- e.printStackTrace();
- }
- return Status.ERROR;
+ }
+
+ /**
+ * Insert a record in the database. Any field/value pairs in the specified
+ * values HashMap will be written into the record with the specified record
+ * key.
+ *
+ * @param table
+ * The name of the table
+ * @param key
+ * The record key of the record to insert.
+ * @param values
+ * A HashMap of field/value pairs to insert in the record
+ * @return Zero on success, a non-zero error code on error. See this class's
+ * description for a discussion of error codes.
+ */
+ @Override
+ public Status insert(String table, String key,
+ HashMap values) {
+ try {
+ final XContentBuilder doc = jsonBuilder().startObject();
+
+ for (Entry entry : StringByteIterator.getStringMap(values)
+ .entrySet()) {
+ doc.field(entry.getKey(), entry.getValue());
+ }
+
+ doc.endObject();
+
+ client.prepareIndex(indexKey, table, key).setSource(doc).execute()
+ .actionGet();
+
+ return Status.OK;
+ } catch (Exception e) {
+ e.printStackTrace();
}
-
- /**
- * Read a record from the database. Each field/value pair from the result
- * will be stored in a HashMap.
- *
- * @param table The name of the table
- * @param key The record key of the record to read.
- * @param fields The list of fields to read, or null for all of them
- * @param result A HashMap of field/value pairs for the result
- * @return Zero on success, a non-zero error code on error or "not found".
- */
- @Override
- public Status read(String table, String key, Set fields, HashMap result) {
- try {
- final GetResponse response = client.prepareGet(indexKey, table, key)
- .execute()
- .actionGet();
-
- if (response.isExists()) {
- if (fields != null) {
- for (String field : fields) {
- result.put(field, new StringByteIterator((String) response.getSource().get(field)));
- }
- } else {
- for (String field : response.getSource().keySet()) {
- result.put(field, new StringByteIterator((String) response.getSource().get(field)));
- }
- }
- return Status.OK;
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- return Status.ERROR;
+ return Status.ERROR;
+ }
+
+ /**
+ * Delete a record from the database.
+ *
+ * @param table
+ * The name of the table
+ * @param key
+ * The record key of the record to delete.
+ * @return Zero on success, a non-zero error code on error. See this class's
+ * description for a discussion of error codes.
+ */
+ @Override
+ public Status delete(String table, String key) {
+ try {
+ client.prepareDelete(indexKey, table, key).execute().actionGet();
+ return Status.OK;
+ } catch (Exception e) {
+ e.printStackTrace();
}
-
- /**
- * Update a record in the database. Any field/value pairs in the specified
- * values HashMap will be written into the record with the specified record
- * key, overwriting any existing values with the same field name.
- *
- * @param table The name of the table
- * @param key The record key of the record to write.
- * @param values A HashMap of field/value pairs to update in the record
- * @return Zero on success, a non-zero error code on error. See this class's
- * description for a discussion of error codes.
- */
- @Override
- public Status update(String table, String key, HashMap values) {
- try {
- final GetResponse response = client.prepareGet(indexKey, table, key)
- .execute()
- .actionGet();
-
- if (response.isExists()) {
- for (Entry entry : StringByteIterator.getStringMap(values).entrySet()) {
- response.getSource().put(entry.getKey(), entry.getValue());
- }
-
- client.prepareIndex(indexKey, table, key)
- .setSource(response.getSource())
- .execute()
- .actionGet();
-
- return Status.OK;
- }
-
- } catch (Exception e) {
- e.printStackTrace();
+ return Status.ERROR;
+ }
+
+ /**
+ * Read a record from the database. Each field/value pair from the result will
+ * be stored in a HashMap.
+ *
+ * @param table
+ * The name of the table
+ * @param key
+ * The record key of the record to read.
+ * @param fields
+ * The list of fields to read, or null for all of them
+ * @param result
+ * A HashMap of field/value pairs for the result
+ * @return Zero on success, a non-zero error code on error or "not found".
+ */
+ @Override
+ public Status read(String table, String key, Set fields,
+ HashMap result) {
+ try {
+ final GetResponse response =
+ client.prepareGet(indexKey, table, key).execute().actionGet();
+
+ if (response.isExists()) {
+ if (fields != null) {
+ for (String field : fields) {
+ result.put(field, new StringByteIterator(
+ (String) response.getSource().get(field)));
+ }
+ } else {
+ for (String field : response.getSource().keySet()) {
+ result.put(field, new StringByteIterator(
+ (String) response.getSource().get(field)));
+ }
}
- return Status.ERROR;
+ return Status.OK;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
}
+ return Status.ERROR;
+ }
+
+ /**
+ * Update a record in the database. Any field/value pairs in the specified
+ * values HashMap will be written into the record with the specified record
+ * key, overwriting any existing values with the same field name.
+ *
+ * @param table
+ * The name of the table
+ * @param key
+ * The record key of the record to write.
+ * @param values
+ * A HashMap of field/value pairs to update in the record
+ * @return Zero on success, a non-zero error code on error. See this class's
+ * description for a discussion of error codes.
+ */
+ @Override
+ public Status update(String table, String key,
+ HashMap values) {
+ try {
+ final GetResponse response =
+ client.prepareGet(indexKey, table, key).execute().actionGet();
+
+ if (response.isExists()) {
+ for (Entry entry : StringByteIterator
+ .getStringMap(values).entrySet()) {
+ response.getSource().put(entry.getKey(), entry.getValue());
+ }
- /**
- * Perform a range scan for a set of records in the database. Each
- * field/value pair from the result will be stored in a HashMap.
- *
- * @param table The name of the table
- * @param startkey The record key of the first record to read.
- * @param recordcount The number of records to read
- * @param fields The list of fields to read, or null for all of them
- * @param result A Vector of HashMaps, where each HashMap is a set
- * field/value pairs for one record
- * @return Zero on success, a non-zero error code on error. See this class's
- * description for a discussion of error codes.
- */
- @Override
- public Status scan(String table, String startkey, int recordcount, Set fields, Vector> result) {
- try {
- final RangeFilterBuilder filter = rangeFilter("_id").gte(startkey);
- final SearchResponse response = client.prepareSearch(indexKey)
- .setTypes(table)
- .setQuery(matchAllQuery())
- .setFilter(filter)
- .setSize(recordcount)
- .execute()
- .actionGet();
-
- HashMap entry;
+ client.prepareIndex(indexKey, table, key)
+ .setSource(response.getSource()).execute().actionGet();
- for (SearchHit hit : response.getHits()) {
- entry = new HashMap(fields.size());
+ return Status.OK;
+ }
- for (String field : fields) {
- entry.put(field, new StringByteIterator((String) hit.getSource().get(field)));
- }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return Status.ERROR;
+ }
+
+ /**
+ * Perform a range scan for a set of records in the database. Each field/value
+ * pair from the result will be stored in a HashMap.
+ *
+ * @param table
+ * The name of the table
+ * @param startkey
+ * The record key of the first record to read.
+ * @param recordcount
+ * The number of records to read
+ * @param fields
+ * The list of fields to read, or null for all of them
+ * @param result
+ * A Vector of HashMaps, where each HashMap is a set field/value
+ * pairs for one record
+ * @return Zero on success, a non-zero error code on error. See this class's
+ * description for a discussion of error codes.
+ */
+ @Override
+ public Status scan(String table, String startkey, int recordcount,
+ Set fields, Vector> result) {
+ try {
+ final RangeFilterBuilder filter = rangeFilter("_id").gte(startkey);
+ final SearchResponse response = client.prepareSearch(indexKey)
+ .setTypes(table).setQuery(matchAllQuery()).setFilter(filter)
+ .setSize(recordcount).execute().actionGet();
+
+ HashMap entry;
+
+ for (SearchHit hit : response.getHits()) {
+ entry = new HashMap(fields.size());
+
+ for (String field : fields) {
+ entry.put(field,
+ new StringByteIterator((String) hit.getSource().get(field)));
+ }
- result.add(entry);
- }
+ result.add(entry);
+ }
- return Status.OK;
- } catch (Exception e) {
- e.printStackTrace();
- }
- return Status.ERROR;
+ return Status.OK;
+ } catch (Exception e) {
+ e.printStackTrace();
}
+ return Status.ERROR;
+ }
}
diff --git a/elasticsearch/src/main/java/com/yahoo/ycsb/db/package-info.java b/elasticsearch/src/main/java/com/yahoo/ycsb/db/package-info.java
new file mode 100644
index 00000000..5b831713
--- /dev/null
+++ b/elasticsearch/src/main/java/com/yahoo/ycsb/db/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2014, Yahoo!, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+/**
+ * The YCSB binding for
+ * Elasticsearch.
+ */
+package com.yahoo.ycsb.db;
+