diff --git a/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/Elasticsearch5.java b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/Elasticsearch5.java
new file mode 100644
index 00000000..a67e0447
--- /dev/null
+++ b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/Elasticsearch5.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2017 YCSB contributors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package com.yahoo.ycsb.db.elasticsearch5;
+
+import java.util.Properties;
+
+final class Elasticsearch5 {
+
+ private Elasticsearch5() {
+
+ }
+
+ static final String KEY = "key";
+
+ static int parseIntegerProperty(final Properties properties, final String key, final int defaultValue) {
+ final String value = properties.getProperty(key);
+ return value == null ? defaultValue : Integer.parseInt(value);
+ }
+
+}
diff --git a/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchClient.java b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchClient.java
index a34dc0a2..cf8100dc 100644
--- a/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchClient.java
+++ b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchClient.java
@@ -1,294 +1,291 @@
/*
* Copyright (c) 2017 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db.elasticsearch5;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.StringByteIterator;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
+import static com.yahoo.ycsb.db.elasticsearch5.Elasticsearch5.KEY;
+import static com.yahoo.ycsb.db.elasticsearch5.Elasticsearch5.parseIntegerProperty;
import static org.elasticsearch.common.settings.Settings.Builder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
* Elasticsearch client for YCSB framework.
*/
public class ElasticsearchClient extends DB {
private static final String DEFAULT_CLUSTER_NAME = "es.ycsb.cluster";
private static final String DEFAULT_INDEX_KEY = "es.ycsb";
private static final String DEFAULT_REMOTE_HOST = "localhost:9300";
private static final int NUMBER_OF_SHARDS = 1;
private static final int NUMBER_OF_REPLICAS = 0;
- private Client client;
+ private TransportClient client;
private String indexKey;
/**
*
* Initialize any state for this DB. Called once per DB instance; there is one
* DB instance per client thread.
*/
@Override
public void init() throws DBException {
final Properties props = getProperties();
this.indexKey = props.getProperty("es.index.key", DEFAULT_INDEX_KEY);
- int numberOfShards = parseIntegerProperty(props, "es.number_of_shards", NUMBER_OF_SHARDS);
- int numberOfReplicas = parseIntegerProperty(props, "es.number_of_replicas", NUMBER_OF_REPLICAS);
+ final int numberOfShards = parseIntegerProperty(props, "es.number_of_shards", NUMBER_OF_SHARDS);
+ final int numberOfReplicas = parseIntegerProperty(props, "es.number_of_replicas", NUMBER_OF_REPLICAS);
- Boolean newdb = Boolean.parseBoolean(props.getProperty("es.newdb", "false"));
- Builder settings = Settings.builder().put("cluster.name", DEFAULT_CLUSTER_NAME);
+ final Boolean newdb = Boolean.parseBoolean(props.getProperty("es.newdb", "false"));
+ final Builder settings = Settings.builder().put("cluster.name", DEFAULT_CLUSTER_NAME);
// if properties file contains elasticsearch user defined properties
// add it to the settings file (will overwrite the defaults).
for (final Entry e : props.entrySet()) {
if (e.getKey() instanceof String) {
final String key = (String) e.getKey();
if (key.startsWith("es.setting.")) {
settings.put(key.substring("es.setting.".length()), e.getValue());
}
}
}
final String clusterName = settings.get("cluster.name");
- System.err.println("Elasticsearch starting node = " + clusterName);
+ System.out.println("Elasticsearch cluster name = " + clusterName);
settings.put("client.transport.sniff", true)
.put("client.transport.ignore_cluster_name", false)
.put("client.transport.ping_timeout", "30s")
.put("client.transport.nodes_sampler_interval", "30s");
// Default it to localhost:9300
- String[] nodeList = props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST).split(",");
+ final String[] nodeList = props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST).split(",");
System.out.println("Elasticsearch Remote Hosts = " + props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST));
- TransportClient tClient = new PreBuiltTransportClient(settings.build());
+ client = new PreBuiltTransportClient(settings.build());
for (String h : nodeList) {
String[] nodes = h.split(":");
+
+ final InetAddress address;
try {
- tClient.addTransportAddress(new InetSocketTransportAddress(
- InetAddress.getByName(nodes[0]),
- Integer.parseInt(nodes[1])
- ));
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException("Unable to parse port number.", e);
+ address = InetAddress.getByName(nodes[0]);
} catch (UnknownHostException e) {
- throw new IllegalArgumentException("Unable to Identify host.", e);
+ throw new IllegalArgumentException("unable to identity host [" + nodes[0]+ "]", e);
+ }
+ final int port;
+ try {
+ port = Integer.parseInt(nodes[1]);
+ } catch (final NumberFormatException e) {
+ throw new IllegalArgumentException("unable to parse port [" + nodes[1] + "]", e);
}
+ client.addTransportAddress(new InetSocketTransportAddress(address, port));
}
- client = tClient;
final boolean exists =
client.admin().indices()
.exists(Requests.indicesExistsRequest(indexKey)).actionGet()
.isExists();
if (exists && newdb) {
client.admin().indices().prepareDelete(indexKey).get();
}
if (!exists || newdb) {
client.admin().indices().create(
new CreateIndexRequest(indexKey)
.settings(
Settings.builder()
.put("index.number_of_shards", numberOfShards)
.put("index.number_of_replicas", numberOfReplicas)
)).actionGet();
}
client.admin().cluster().health(new ClusterHealthRequest().waitForGreenStatus()).actionGet();
}
- private int parseIntegerProperty(final Properties properties, final String key, final int defaultValue) {
- final String value = properties.getProperty(key);
- return value == null ? defaultValue : Integer.parseInt(value);
- }
-
@Override
public void cleanup() throws DBException {
if (client != null) {
client.close();
client = null;
}
}
@Override
public Status insert(String table, String key, Map values) {
try {
final XContentBuilder doc = jsonBuilder();
doc.startObject();
for (final Entry entry : StringByteIterator.getStringMap(values).entrySet()) {
doc.field(entry.getKey(), entry.getValue());
}
-
- doc.field("key", key);
+ doc.field(KEY, key);
doc.endObject();
client.prepareIndex(indexKey, table).setSource(doc).execute().actionGet();
return Status.OK;
} catch (final Exception e) {
e.printStackTrace();
return Status.ERROR;
}
}
@Override
public Status delete(final String table, final String key) {
try {
final SearchResponse searchResponse = search(table, key);
if (searchResponse.getHits().totalHits == 0) {
return Status.NOT_FOUND;
}
final String id = searchResponse.getHits().getAt(0).getId();
final DeleteResponse deleteResponse = client.prepareDelete(indexKey, table, id).execute().actionGet();
if (deleteResponse.status().equals(RestStatus.NOT_FOUND)) {
return Status.NOT_FOUND;
- } else {
- return Status.OK;
}
+
+ return Status.OK;
} catch (final Exception e) {
e.printStackTrace();
return Status.ERROR;
}
}
@Override
public Status read(
final String table,
final String key,
final Set fields,
final Map result) {
try {
final SearchResponse searchResponse = search(table, key);
if (searchResponse.getHits().totalHits == 0) {
return Status.NOT_FOUND;
}
final SearchHit hit = searchResponse.getHits().getAt(0);
if (fields != null) {
for (final String field : fields) {
result.put(field, new StringByteIterator(
(String) hit.getField(field).getValue()));
}
} else {
for (final Map.Entry e : hit.getFields().entrySet()) {
- if ("key".equals(e.getKey())) {
+ if (KEY.equals(e.getKey())) {
continue;
}
result.put(e.getKey(), new StringByteIterator((String) e.getValue().getValue()));
}
}
- return Status.OK;
+ return Status.OK;
} catch (final Exception e) {
e.printStackTrace();
return Status.ERROR;
}
}
@Override
public Status update(final String table, final String key, final Map values) {
try {
final SearchResponse response = search(table, key);
if (response.getHits().totalHits == 0) {
return Status.NOT_FOUND;
}
final SearchHit hit = response.getHits().getAt(0);
for (final Entry entry : StringByteIterator.getStringMap(values).entrySet()) {
hit.getSource().put(entry.getKey(), entry.getValue());
}
client.prepareIndex(indexKey, table, hit.getId()).setSource(hit.getSource()).get();
return Status.OK;
-
} catch (final Exception e) {
e.printStackTrace();
return Status.ERROR;
}
}
@Override
public Status scan(
final String table,
final String startkey,
final int recordcount,
final Set fields,
final Vector> result) {
try {
- final RangeQueryBuilder query = new RangeQueryBuilder("key").gte(startkey);
+ final RangeQueryBuilder query = new RangeQueryBuilder(KEY).gte(startkey);
final SearchResponse response = client.prepareSearch(indexKey).setQuery(query).setSize(recordcount).get();
for (final SearchHit hit : response.getHits()) {
final HashMap entry;
if (fields != null) {
entry = new HashMap<>(fields.size());
for (final String field : fields) {
entry.put(field, new StringByteIterator((String) hit.getSource().get(field)));
}
} else {
entry = new HashMap<>(hit.getFields().size());
for (final Map.Entry field : hit.getFields().entrySet()) {
- if ("key".equals(field.getKey())) {
+ if (KEY.equals(field.getKey())) {
continue;
}
entry.put(field.getKey(), new StringByteIterator((String) field.getValue().getValue()));
}
}
result.add(entry);
}
return Status.OK;
} catch (final Exception e) {
e.printStackTrace();
return Status.ERROR;
}
}
private SearchResponse search(final String table, final String key) {
- return client.prepareSearch(indexKey).setTypes(table).setQuery(new TermQueryBuilder("key", key)).get();
+ return client.prepareSearch(indexKey).setTypes(table).setQuery(new TermQueryBuilder(KEY, key)).get();
}
}
diff --git a/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchRestClient.java b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchRestClient.java
index d1c12d63..07952c10 100644
--- a/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchRestClient.java
+++ b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchRestClient.java
@@ -1,419 +1,407 @@
/*
* Copyright (c) 2017 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db.elasticsearch5;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.StringByteIterator;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicHeader;
import org.apache.http.nio.entity.NStringEntity;
import org.codehaus.jackson.map.ObjectMapper;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
-import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
+import static com.yahoo.ycsb.db.elasticsearch5.Elasticsearch5.KEY;
+import static com.yahoo.ycsb.db.elasticsearch5.Elasticsearch5.parseIntegerProperty;
import static java.util.Collections.emptyMap;
-import static org.elasticsearch.common.settings.Settings.Builder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
* Elasticsearch REST client for YCSB framework.
*/
public class ElasticsearchRestClient extends DB {
private static final String DEFAULT_CLUSTER_NAME = "es.ycsb.cluster";
private static final String DEFAULT_INDEX_KEY = "es.ycsb";
private static final String DEFAULT_REMOTE_HOST = "localhost:9200";
private static final int NUMBER_OF_SHARDS = 1;
private static final int NUMBER_OF_REPLICAS = 0;
private RestClient restClient;
private String indexKey;
/**
*
* Initialize any state for this DB. Called once per DB instance; there is one
* DB instance per client thread.
*/
@Override
public void init() throws DBException {
final Properties props = getProperties();
this.indexKey = props.getProperty("es.index.key", DEFAULT_INDEX_KEY);
- int numberOfShards = Integer.valueOf(props.getProperty("es.number_of_shards",
- String.valueOf(NUMBER_OF_SHARDS)));
- int numberOfReplicas = Integer.valueOf(props.getProperty("es.number_of_replicas",
- String.valueOf(NUMBER_OF_REPLICAS)));
-
- Boolean newdb = Boolean.parseBoolean(props.getProperty("es.newdb", "false"));
- Builder settings = Settings.builder().put("cluster.name", DEFAULT_CLUSTER_NAME);
-
- // if properties file contains elasticsearch user defined properties
- // add it to the settings file (will overwrite the defaults).
- for (final Map.Entry e : props.entrySet()) {
- if (e.getKey() instanceof String) {
- final String key = (String) e.getKey();
- if (key.startsWith("es.setting.")) {
- settings.put(key.substring("es.setting.".length()), e.getValue());
- }
- }
- }
- final String clusterName = settings.get("cluster.name");
- System.err.println("Elasticsearch starting node = " + clusterName);
+ final int numberOfShards = parseIntegerProperty(props, "es.number_of_shards", NUMBER_OF_SHARDS);
+ final int numberOfReplicas = parseIntegerProperty(props, "es.number_of_replicas", NUMBER_OF_REPLICAS);
+
+ final Boolean newdb = Boolean.parseBoolean(props.getProperty("es.newdb", "false"));
- String[] nodeList = props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST).split(",");
+ final String[] nodeList = props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST).split(",");
System.out.println("Elasticsearch Remote Hosts = " + props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST));
- List esHttpHosts = new ArrayList<>(nodeList.length);
+ final List esHttpHosts = new ArrayList<>(nodeList.length);
for (String h : nodeList) {
String[] nodes = h.split(":");
esHttpHosts.add(new HttpHost(nodes[0], Integer.valueOf(nodes[1]), "http"));
}
restClient = RestClient.builder(esHttpHosts.toArray(new HttpHost[esHttpHosts.size()])).build();
final Response existsResponse = performRequest(restClient, "HEAD", "/" + indexKey);
final boolean exists = existsResponse.getStatusLine().getStatusCode() == 200;
if (exists && newdb) {
final Response deleteResponse = performRequest(restClient, "DELETE", "/" + indexKey);
final int statusCode = deleteResponse.getStatusLine().getStatusCode();
if (statusCode != 200) {
throw new DBException("delete [" + indexKey + "] failed with status [" + statusCode + "]");
}
}
if (!exists || newdb) {
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
builder.startObject("settings");
builder.field("index.number_of_shards", numberOfShards);
builder.field("index.number_of_replicas", numberOfReplicas);
builder.endObject();
builder.endObject();
final Map params = emptyMap();
final StringEntity entity = new StringEntity(builder.string());
final Response createResponse = performRequest(restClient, "PUT", "/" + indexKey, params, entity);
final int statusCode = createResponse.getStatusLine().getStatusCode();
if (statusCode != 200) {
throw new DBException("create [" + indexKey + "] failed with status [" + statusCode + "]");
}
} catch (final IOException e) {
throw new DBException(e);
}
}
final Map params = Collections.singletonMap("wait_for_status", "green");
final Response healthResponse = performRequest(restClient, "GET", "/_cluster/health/" + indexKey, params);
final int healthStatusCode = healthResponse.getStatusLine().getStatusCode();
if (healthStatusCode != 200) {
throw new DBException("cluster health [" + indexKey + "] failed with status [" + healthStatusCode + "]");
}
}
private static Response performRequest(
final RestClient restClient,
final String method,
final String endpoint) throws DBException {
final Map params = emptyMap();
return performRequest(restClient, method, endpoint, params);
}
private static Response performRequest(
final RestClient restClient,
final String method,
final String endpoint,
final Map params) throws DBException {
return performRequest(restClient, method, endpoint, params, null);
}
private static Header[] emptyHeaders = new Header[0];
private static Response performRequest(
final RestClient restClient,
final String method,
final String endpoint,
final Map params,
final HttpEntity entity) throws DBException {
try {
final Header[] headers;
if (entity != null) {
headers = new Header[]{new BasicHeader("content-type", ContentType.APPLICATION_JSON.toString())};
} else {
headers = emptyHeaders;
}
return restClient.performRequest(
method,
endpoint,
params,
entity,
headers);
} catch (final IOException e) {
e.printStackTrace();
throw new DBException(e);
}
}
@Override
public void cleanup() throws DBException {
if (restClient != null) {
try {
restClient.close();
restClient = null;
} catch (final IOException e) {
throw new DBException(e);
}
}
}
@Override
public Status insert(final String table, final String key, final Map values) {
try {
final Map data = StringByteIterator.getStringMap(values);
- data.put("key", key);
+ data.put(KEY, key);
final Response response = restClient.performRequest(
"POST",
"/" + indexKey + "/" + table + "/",
Collections.emptyMap(),
new NStringEntity(new ObjectMapper().writeValueAsString(data), ContentType.APPLICATION_JSON));
if (response.getStatusLine().getStatusCode() == 201) {
return Status.OK;
} else {
return Status.ERROR;
}
} catch (final Exception e) {
e.printStackTrace();
return Status.ERROR;
}
}
@Override
- public Status delete(String table, String key) {
+ public Status delete(final String table, final String key) {
try {
final Response searchResponse = search(table, key);
final int statusCode = searchResponse.getStatusLine().getStatusCode();
if (statusCode == 404) {
return Status.NOT_FOUND;
} else if (statusCode != 200) {
return Status.ERROR;
}
final Map map = map(searchResponse);
@SuppressWarnings("unchecked") final Map hits = (Map)map.get("hits");
final int total = (int)hits.get("total");
if (total == 0) {
return Status.NOT_FOUND;
}
- @SuppressWarnings("unchecked") final Map hit = (Map)((List)hits.get("hits")).get(0);
+ @SuppressWarnings("unchecked") final Map hit =
+ (Map)((List)hits.get("hits")).get(0);
@SuppressWarnings("unchecked") final Map source = (Map)hit.get("_source");
final Response deleteResponse =
restClient.performRequest("DELETE", "/" + indexKey + "/" + table + "/" + source.get("_id"));
if (deleteResponse.getStatusLine().getStatusCode() != 200) {
return Status.ERROR;
}
return Status.OK;
} catch (final Exception e) {
e.printStackTrace();
return Status.ERROR;
}
}
@Override
public Status read(
final String table,
final String key,
final Set fields,
final Map result) {
try {
final Response searchResponse = search(table, key);
final int statusCode = searchResponse.getStatusLine().getStatusCode();
if (statusCode == 404) {
return Status.NOT_FOUND;
} else if (statusCode != 200) {
return Status.ERROR;
}
final Map map = map(searchResponse);
@SuppressWarnings("unchecked") final Map hits = (Map)map.get("hits");
final int total = (int)hits.get("total");
if (total == 0) {
return Status.NOT_FOUND;
}
- @SuppressWarnings("unchecked") final Map hit = (Map)((List)hits.get("hits")).get(0);
+ @SuppressWarnings("unchecked") final Map hit =
+ (Map)((List)hits.get("hits")).get(0);
@SuppressWarnings("unchecked") final Map source = (Map)hit.get("_source");
if (fields != null) {
for (final String field : fields) {
result.put(field, new StringByteIterator((String) source.get(field)));
}
} else {
for (final Map.Entry e : source.entrySet()) {
- if ("key".equals(e.getKey())) {
+ if (KEY.equals(e.getKey())) {
continue;
}
result.put(e.getKey(), new StringByteIterator((String) e.getValue()));
}
}
return Status.OK;
} catch (final Exception e) {
e.printStackTrace();
return Status.ERROR;
}
}
@Override
- public Status update(String table, String key, Map values) {
+ public Status update(final String table, final String key, final Map values) {
try {
final Response searchResponse = search(table, key);
final int statusCode = searchResponse.getStatusLine().getStatusCode();
if (statusCode == 404) {
return Status.NOT_FOUND;
} else if (statusCode != 200) {
return Status.ERROR;
}
final Map map = map(searchResponse);
@SuppressWarnings("unchecked") final Map hits = (Map) map.get("hits");
final int total = (int) hits.get("total");
if (total == 0) {
return Status.NOT_FOUND;
}
- @SuppressWarnings("unchecked") final Map hit = (Map) ((List) hits.get("hits")).get(0);
+ @SuppressWarnings("unchecked") final Map hit =
+ (Map) ((List) hits.get("hits")).get(0);
@SuppressWarnings("unchecked") final Map source = (Map) hit.get("_source");
for (final Map.Entry entry : StringByteIterator.getStringMap(values).entrySet()) {
source.put(entry.getKey(), entry.getValue());
}
final Map params = emptyMap();
final Response response = restClient.performRequest(
"PUT",
"/" + indexKey + "/" + table + "/" + source.get("_id"),
params,
new NStringEntity(new ObjectMapper().writeValueAsString(source), ContentType.APPLICATION_JSON));
if (response.getStatusLine().getStatusCode() != 200) {
return Status.ERROR;
}
return Status.OK;
} catch (final Exception e) {
e.printStackTrace();
return Status.ERROR;
}
}
@Override
public Status scan(
- String table,
- String startkey,
- int recordcount,
- Set fields,
- Vector> result) {
+ final String table,
+ final String startkey,
+ final int recordcount,
+ final Set fields,
+ final Vector> result) {
try {
final Response response;
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
builder.startObject("query");
builder.startObject("range");
- builder.startObject("key");
+ builder.startObject(KEY);
builder.field("gte", startkey);
builder.endObject();
builder.endObject();
builder.endObject();
builder.field("size", recordcount);
builder.endObject();
response = search(table, builder);
@SuppressWarnings("unchecked") final Map map = map(response);
@SuppressWarnings("unchecked") final Map hits = (Map)map.get("hits");
- @SuppressWarnings("unchecked") final List> list = (List>) hits.get("hits");
+ @SuppressWarnings("unchecked") final List> list =
+ (List>) hits.get("hits");
for (final Map hit : list) {
@SuppressWarnings("unchecked") final Map source = (Map)hit.get("_source");
final HashMap entry;
if (fields != null) {
entry = new HashMap<>(fields.size());
for (final String field : fields) {
entry.put(field, new StringByteIterator((String) source.get(field)));
}
} else {
entry = new HashMap<>(hit.size());
for (final Map.Entry field : source.entrySet()) {
- if ("key".equals(field.getKey())) {
+ if (KEY.equals(field.getKey())) {
continue;
}
entry.put(field.getKey(), new StringByteIterator((String) field.getValue()));
}
}
result.add(entry);
}
}
return Status.OK;
} catch (final Exception e) {
e.printStackTrace();
return Status.ERROR;
}
}
private Response search(final String table, final String key) throws IOException {
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
builder.startObject("query");
builder.startObject("term");
- builder.field("key", key);
+ builder.field(KEY, key);
builder.endObject();
builder.endObject();
builder.endObject();
return search(table, builder);
}
}
private Response search(final String table, final XContentBuilder builder) throws IOException {
final Map params = emptyMap();
final StringEntity entity = new StringEntity(builder.string());
final Header header = new BasicHeader("content-type", ContentType.APPLICATION_JSON.toString());
return restClient.performRequest("GET", "/" + indexKey + "/" + table + "/_search", params, entity, header);
}
private Map map(final Response response) throws IOException {
try (InputStream is = response.getEntity().getContent()) {
final ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("unchecked") final Map map = mapper.readValue(is, Map.class);
return map;
}
}
}