diff --git a/couchbase/README.md b/couchbase/README.md index 4004cb7f..efe16bac 100644 --- a/couchbase/README.md +++ b/couchbase/README.md @@ -1,67 +1,76 @@ # Couchbase Driver for YCSB This driver is a binding for the YCSB facilities to operate against a Couchbase Server cluster. It uses the official Couchbase Java SDK and provides a rich set of configuration options. ## Quickstart ### 1. Start Couchbase Server You need to start a single node or a cluster to point the client at. Please see [http://couchbase.com](couchbase.com) for more details and instructions. ### 2. Set up YCSB You need to clone the repository and compile everything. ``` git clone git://github.com/brianfrankcooper/YCSB.git cd YCSB mvn clean package ``` ### 3. Run the Workload Before you can actually run the workload, you need to "load" the data first. ``` bin/ycsb load couchbase -s -P workloads/workloada ``` Then, you can run the workload: ``` bin/ycsb run couchbase -s -P workloads/workloada ``` Please see the general instructions in the `doc` folder if you are not sure how it all works. You can apply a property (as seen in the next section) like this: ``` bin/ycsb run couchbase -s -P workloads/workloada -p couchbase.useJson=false ``` +## Scans in the CouchbaseClient +The scan operation in the CouchbaseClient requires a Couchbase View to be created manually. To do this: +1. Go to the Couchbase UI, then to Views +2. Create a new development view, specify a ddoc and view name, use these in your YCSB properties. See Configuration Options below. +3. The default map code is sufficient. +4. Save, and publish this View. + ## Configuration Options Since no setup is the same and the goal of YCSB is to deliver realistic benchmarks, here are some setups that you can tune. Note that if you need more flexibility (let's say a custom transcoder), you still need to extend this driver and implement the facilities on your own. You can set the following properties (with the default settings applied): - couchbase.url=http://127.0.0.1:8091/pools => The connection URL from one server. - couchbase.bucket=default => The bucket name to use. - couchbase.password= => The password of the bucket. - - couchbase.checkFutures=true => If the futures should be inspected (makes ops sync). - couchbase.persistTo=0 => Observe Persistence ("PersistTo" constraint). - couchbase.replicateTo=0 => Observe Replication ("ReplicateTo" constraint). + - couchbase.ddoc => The ddoc name used for scanning + - couchbase.view => The view name used for scanning + - couchbase.stale => How to deal with stale values in View Query for scanning. (OK, FALSE, UPDATE_AFTER) - couchbase.json=true => Use json or java serialization as target format. diff --git a/couchbase/src/main/java/com/yahoo/ycsb/db/CouchbaseClient.java b/couchbase/src/main/java/com/yahoo/ycsb/db/CouchbaseClient.java index d5179b34..1eb3aeb3 100644 --- a/couchbase/src/main/java/com/yahoo/ycsb/db/CouchbaseClient.java +++ b/couchbase/src/main/java/com/yahoo/ycsb/db/CouchbaseClient.java @@ -1,345 +1,390 @@ /** - * Copyright (c) 2013 Yahoo! Inc. All rights reserved. + * Copyright (c) 2013 - 2016 YCSB contributors. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb.db; +import com.couchbase.client.protocol.views.*; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.DB; import com.yahoo.ycsb.DBException; import com.yahoo.ycsb.Status; import com.yahoo.ycsb.StringByteIterator; import net.spy.memcached.PersistTo; import net.spy.memcached.ReplicateTo; import net.spy.memcached.internal.OperationFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.StringWriter; import java.io.Writer; import java.net.URI; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.Vector; /** * A class that wraps the CouchbaseClient to allow it to be interfaced with YCSB. * This class extends {@link DB} and implements the database interface used by YCSB client. * *

The following options must be passed when using this database client. * *

* * @author Michael Nitschinger */ public class CouchbaseClient extends DB { public static final String URL_PROPERTY = "couchbase.url"; public static final String BUCKET_PROPERTY = "couchbase.bucket"; public static final String PASSWORD_PROPERTY = "couchbase.password"; public static final String CHECKF_PROPERTY = "couchbase.checkFutures"; public static final String PERSIST_PROPERTY = "couchbase.persistTo"; public static final String REPLICATE_PROPERTY = "couchbase.replicateTo"; public static final String JSON_PROPERTY = "couchbase.json"; + public static final String DESIGN_DOC_PROPERTY = "couchbase.ddoc"; + public static final String VIEW_PROPERTY = "couchbase.view"; + public static final String STALE_PROPERTY = "couchbase.stale"; + public static final String SCAN_PROPERTY = "scanproportion"; + + public static final String STALE_PROPERTY_DEFAULT = Stale.OK.name(); + public static final String SCAN_PROPERTY_DEFAULT = "0.0"; protected static final ObjectMapper JSON_MAPPER = new ObjectMapper(); private com.couchbase.client.CouchbaseClient client; private PersistTo persistTo; private ReplicateTo replicateTo; private boolean checkFutures; private boolean useJson; + private String designDoc; + private String viewName; + private Stale stale; + private View view; private final Logger log = LoggerFactory.getLogger(getClass()); @Override public void init() throws DBException { Properties props = getProperties(); String url = props.getProperty(URL_PROPERTY, "http://127.0.0.1:8091/pools"); String bucket = props.getProperty(BUCKET_PROPERTY, "default"); String password = props.getProperty(PASSWORD_PROPERTY, ""); checkFutures = props.getProperty(CHECKF_PROPERTY, "true").equals("true"); useJson = props.getProperty(JSON_PROPERTY, "true").equals("true"); persistTo = parsePersistTo(props.getProperty(PERSIST_PROPERTY, "0")); replicateTo = parseReplicateTo(props.getProperty(REPLICATE_PROPERTY, "0")); + designDoc = getProperties().getProperty(DESIGN_DOC_PROPERTY); + viewName = getProperties().getProperty(VIEW_PROPERTY); + stale = Stale.valueOf(getProperties().getProperty(STALE_PROPERTY, STALE_PROPERTY_DEFAULT).toUpperCase()); + + Double scanproportion = Double.valueOf(props.getProperty(SCAN_PROPERTY, SCAN_PROPERTY_DEFAULT)); + Properties systemProperties = System.getProperties(); systemProperties.put("net.spy.log.LoggerImpl", "net.spy.memcached.compat.log.SLF4JLogger"); System.setProperties(systemProperties); try { client = new com.couchbase.client.CouchbaseClient( Arrays.asList(new URI(url)), bucket, password ); } catch (Exception e) { throw new DBException("Could not create CouchbaseClient object.", e); } + + if (scanproportion > 0) { + try { + view = client.getView(designDoc, viewName); + } catch (Exception e) { + throw new DBException(String.format("%s=%s and %s=%s provided, unable to connect to view.", + DESIGN_DOC_PROPERTY, designDoc, VIEW_PROPERTY, viewName), e.getCause()); + } + } } /** * Parse the replicate property into the correct enum. * * @param property the stringified property value. * @throws DBException if parsing the property did fail. * @return the correct enum. */ private ReplicateTo parseReplicateTo(final String property) throws DBException { int value = Integer.parseInt(property); switch (value) { case 0: return ReplicateTo.ZERO; case 1: return ReplicateTo.ONE; case 2: return ReplicateTo.TWO; case 3: return ReplicateTo.THREE; default: throw new DBException(REPLICATE_PROPERTY + " must be between 0 and 3"); } } /** * Parse the persist property into the correct enum. * * @param property the stringified property value. * @throws DBException if parsing the property did fail. * @return the correct enum. */ private PersistTo parsePersistTo(final String property) throws DBException { int value = Integer.parseInt(property); switch (value) { case 0: return PersistTo.ZERO; case 1: return PersistTo.ONE; case 2: return PersistTo.TWO; case 3: return PersistTo.THREE; case 4: return PersistTo.FOUR; default: throw new DBException(PERSIST_PROPERTY + " must be between 0 and 4"); } } /** * Shutdown the client. */ @Override public void cleanup() { client.shutdown(); } @Override public Status read(final String table, final String key, final Set fields, final HashMap result) { String formattedKey = formatKey(table, key); try { Object loaded = client.get(formattedKey); if (loaded == null) { return Status.ERROR; } decode(loaded, fields, result); return Status.OK; } catch (Exception e) { if (log.isErrorEnabled()) { log.error("Could not read value for key " + formattedKey, e); } return Status.ERROR; } } /** * Scan is currently not implemented. * * @param table The name of the table * @param startkey The record key of the first record to read. * @param recordcount The number of records to read * @param fields The list of fields to read, or null for all of them * @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record * @return Status.ERROR, because not implemented yet. */ @Override - public Status scan(final String table, final String startkey, final int recordcount, - final Set fields, final Vector> result) { + public Status scan(final String table, final String startkey, final int recordcount, final Set fields, + final Vector> result) { + try { + Query query = new Query().setRangeStart(startkey) + .setLimit(recordcount) + .setIncludeDocs(true) + .setStale(stale); + ViewResponse response = client.query(view, query); + + for (ViewRow row : response) { + HashMap rowMap = new HashMap(); + decode(row.getDocument(), fields, rowMap); + result.add(rowMap); + } + + return Status.OK; + } catch (Exception e) { + log.error(e.getMessage()); + } + return Status.ERROR; } @Override public Status update(final String table, final String key, final HashMap values) { String formattedKey = formatKey(table, key); try { final OperationFuture future = client.replace( formattedKey, encode(values), persistTo, replicateTo ); return checkFutureStatus(future); } catch (Exception e) { if (log.isErrorEnabled()) { log.error("Could not update value for key " + formattedKey, e); } return Status.ERROR; } } @Override public Status insert(final String table, final String key, final HashMap values) { String formattedKey = formatKey(table, key); try { final OperationFuture future = client.add( formattedKey, encode(values), persistTo, replicateTo ); return checkFutureStatus(future); } catch (Exception e) { if (log.isErrorEnabled()) { log.error("Could not insert value for key " + formattedKey, e); } return Status.ERROR; } } @Override public Status delete(final String table, final String key) { String formattedKey = formatKey(table, key); try { final OperationFuture future = client.delete(formattedKey, persistTo, replicateTo); return checkFutureStatus(future); } catch (Exception e) { if (log.isErrorEnabled()) { log.error("Could not delete value for key " + formattedKey, e); } return Status.ERROR; } } /** * Prefix the key with the given prefix, to establish a unique namespace. * * @param prefix the prefix to use. * @param key the actual key. * @return the formatted and prefixed key. */ private String formatKey(final String prefix, final String key) { return prefix + ":" + key; } /** * Wrapper method that either inspects the future or not. * * @param future the future to potentially verify. * @return the status of the future result. */ private Status checkFutureStatus(final OperationFuture future) { if (checkFutures) { return future.getStatus().isSuccess() ? Status.OK : Status.ERROR; } else { return Status.OK; } } /** * Decode the object from server into the storable result. * * @param source the loaded object. * @param fields the fields to check. * @param dest the result passed back to the ycsb core. */ private void decode(final Object source, final Set fields, final HashMap dest) { if (useJson) { try { JsonNode json = JSON_MAPPER.readTree((String) source); boolean checkFields = fields != null && !fields.isEmpty(); for (Iterator> jsonFields = json.fields(); jsonFields.hasNext();) { Map.Entry jsonField = jsonFields.next(); String name = jsonField.getKey(); if (checkFields && fields.contains(name)) { continue; } JsonNode jsonValue = jsonField.getValue(); if (jsonValue != null && !jsonValue.isNull()) { dest.put(name, new StringByteIterator(jsonValue.asText())); } } } catch (Exception e) { throw new RuntimeException("Could not decode JSON"); } } else { HashMap converted = (HashMap) source; for (Map.Entry entry : converted.entrySet()) { dest.put(entry.getKey(), new StringByteIterator(entry.getValue())); } } } /** * Encode the object for couchbase storage. * * @param source the source value. * @return the storable object. */ private Object encode(final HashMap source) { HashMap stringMap = StringByteIterator.getStringMap(source); if (!useJson) { return stringMap; } ObjectNode node = JSON_MAPPER.createObjectNode(); for (Map.Entry pair : stringMap.entrySet()) { node.put(pair.getKey(), pair.getValue()); } JsonFactory jsonFactory = new JsonFactory(); Writer writer = new StringWriter(); try { JsonGenerator jsonGenerator = jsonFactory.createGenerator(writer); JSON_MAPPER.writeTree(jsonGenerator, node); } catch (Exception e) { throw new RuntimeException("Could not encode JSON value"); } return writer.toString(); } }