diff --git a/kudu/README.md b/kudu/README.md index e7c4fa01..cd5cffd6 100644 --- a/kudu/README.md +++ b/kudu/README.md @@ -1,45 +1,44 @@ # Kudu bindings for YCSB [Kudu](http://getkudu.io) is a storage engine that enables fast analytics on fast data. ## Benchmarking Kudu Use the following command line to load the initial data into an existing Kudu cluster with default configurations. ``` bin/ycsb load kudu -P workloads/workloada ``` Additional configurations: * `kudu_master_addresses`: The master's address. The default configuration expects a master on localhost. * `kudu_pre_split_num_tablets`: The number of tablets (or partitions) to create for the table. The default uses 4 tablets. A good rule of thumb is to use 5 per tablet server. * `kudu_table_num_replicas`: The number of replicas that each tablet will have. The default is 3. Should only be configured to use 1 instead, for single node tests. -* `kudu_sync_ops`: If the client should buffer data before sending it. The default is false. Should -always be set to true for the run phase. +* `kudu_sync_ops`: If the client should wait after every write operation. The default is true. * `kudu_block_size`: The data block size used to configure columns. The default is 4096 bytes. Then, you can run the workload: ``` -bin/ycsb run kudu -P workloads/workloada -p kudu_sync_ops=true +bin/ycsb run kudu -P workloads/workloada ``` diff --git a/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java b/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java index d467f452..9e65407a 100644 --- a/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java +++ b/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java @@ -1,320 +1,320 @@ /** * Copyright (c) 2015 YCSB contributors. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. 
You
 * may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License. See accompanying
 * LICENSE file.
 */
package com.yahoo.ycsb.db;

import com.stumbleupon.async.TimeoutException;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.StringByteIterator;
import com.yahoo.ycsb.workloads.CoreWorkload;
import org.kududb.ColumnSchema;
import org.kududb.Schema;
import org.kududb.client.*;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;

import static org.kududb.Type.STRING;

/**
 * Kudu client for YCSB framework.
 *
 * <p>Example to load:
 * $ ./bin/ycsb load kudu -P workloads/workloada -threads 5
 *
 * <p>Example to run:
 * $ ./bin/ycsb run kudu -P workloads/workloada -threads 5
 *
 * <p>Writes use {@code AUTO_FLUSH_SYNC} unless {@code kudu_sync_ops=false} is passed, in which
 * case the session is switched to {@code AUTO_FLUSH_BACKGROUND}.
 *
 * <p>The {@link KuduClient}, the {@link Schema} and the column name list are static and shared
 * by every instance of this class; the session and table handle are per instance.
 */
public class KuduYCSBClient extends com.yahoo.ycsb.DB {
  public static final String KEY = "key";
  public static final int OK = 0;
  public static final int SERVER_ERROR = -1;
  public static final int NO_MATCHING_RECORD = -2;
  public static final int TIMEOUT = -3;
  public static final int MAX_TABLETS = 9000;
  public static final long DEFAULT_SLEEP = 60000;

  private static final String SYNC_OPS_OPT = "kudu_sync_ops";
  private static final String DEBUG_OPT = "kudu_debug";
  private static final String PRINT_ROW_ERRORS_OPT = "kudu_print_row_errors";
  private static final String PRE_SPLIT_NUM_TABLETS_OPT = "kudu_pre_split_num_tablets";
  private static final String TABLE_NUM_REPLICAS = "kudu_table_num_replicas";
  private static final String BLOCK_SIZE_OPT = "kudu_block_size";
  private static final String MASTER_ADDRESSES_OPT = "kudu_master_addresses";
  private static final int BLOCK_SIZE_DEFAULT = 4096;

  /** Column names in schema order: the key column first, then field0..fieldN-1. */
  private static final List<String> columnNames = new ArrayList<String>();
  private static KuduClient client;
  private static Schema schema;
  private static int fieldCount;

  private boolean debug = false;
  private boolean printErrors = false;
  private String tableName;
  private KuduSession session;
  private KuduTable table;

  /**
   * Reads the binding's properties, makes sure the shared client and table exist, then opens a
   * session and a table handle for this instance.
   *
   * @throws DBException if the shared client cannot be initialised or the table cannot be opened
   */
  @Override
  public void init() throws DBException {
    Properties props = getProperties();

    // "true".equals(null) is false, which matches the field defaults when a property is unset.
    this.debug = "true".equals(props.getProperty(DEBUG_OPT));
    this.printErrors = "true".equals(props.getProperty(PRINT_ROW_ERRORS_OPT));

    this.tableName = CoreWorkload.table;
    initClient(debug, tableName, props);

    this.session = client.newSession();
    // Synchronous flushing is the default; only an explicit "false" opts into background
    // flushing, and only that mode gets the explicit mutation buffer size.
    if ("false".equals(props.getProperty(SYNC_OPS_OPT))) {
      this.session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
      this.session.setMutationBufferSpace(100);
    } else {
      this.session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_SYNC);
    }

    try {
      this.table = client.openTable(tableName);
    } catch (Exception e) {
      throw new DBException("Could not open a table because of:", e);
    }
  }

  /**
   * Builds the shared {@link KuduClient} and schema, and tries to create the benchmark table.
   * Only the first call does any work; later calls return immediately because {@code client} is
   * already set.
   *
   * @param debug whether to print the master addresses being used
   * @param tableName name of the table to create
   * @param prop properties holding the {@code kudu_*} options and the YCSB field count
   * @throws DBException on an invalid option value or a table-creation failure other than
   *     "ALREADY_PRESENT"
   */
  private synchronized static void initClient(boolean debug, String tableName, Properties prop)
      throws DBException {
    if (client != null) {
      return;
    }

    String masterAddresses = prop.getProperty(MASTER_ADDRESSES_OPT);
    if (masterAddresses == null) {
      masterAddresses = "localhost:7051";
    }

    int numTablets = getIntFromProp(prop, PRE_SPLIT_NUM_TABLETS_OPT, 4);
    if (numTablets > MAX_TABLETS) {
      throw new DBException("Specified number of tablets (" + numTablets + ") must be equal "
          + "or below " + MAX_TABLETS);
    }

    int numReplicas = getIntFromProp(prop, TABLE_NUM_REPLICAS, 3);
    int blockSize = getIntFromProp(prop, BLOCK_SIZE_OPT, BLOCK_SIZE_DEFAULT);

    client = new KuduClient.KuduClientBuilder(masterAddresses)
        .defaultSocketReadTimeoutMs(DEFAULT_SLEEP)
        .defaultOperationTimeoutMs(DEFAULT_SLEEP)
        .build();
    if (debug) {
      System.out.println("Connecting to the masters at " + masterAddresses);
    }

    fieldCount = getIntFromProp(prop, CoreWorkload.FIELD_COUNT_PROPERTY,
        Integer.parseInt(CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT));

    List<ColumnSchema> columns = new ArrayList<ColumnSchema>(fieldCount + 1);

    ColumnSchema keyColumn = new ColumnSchema.ColumnSchemaBuilder(KEY, STRING)
        .key(true)
        .desiredBlockSize(blockSize)
        .build();
    columns.add(keyColumn);
    columnNames.add(KEY);
    for (int i = 0; i < fieldCount; i++) {
      String name = "field" + i;
      columnNames.add(name);
      columns.add(new ColumnSchema.ColumnSchemaBuilder(name, STRING)
          .desiredBlockSize(blockSize)
          .build());
    }
    schema = new Schema(columns);

    CreateTableBuilder builder = new CreateTableBuilder();
    builder.setNumReplicas(numReplicas);
    // create n-1 split keys, which will end up being n tablets master-side
    for (int i = 1; i < numTablets; i++) {
      // We do +1000 since YCSB starts at user1.
      int startKeyInt = (MAX_TABLETS / numTablets * i) + 1000;
      String startKey = String.format("%04d", startKeyInt);
      PartialRow splitRow = schema.newPartialRow();
      splitRow.addString(0, "user" + startKey);
      builder.addSplitRow(splitRow);
    }

    try {
      client.createTable(tableName, schema, builder);
    } catch (Exception e) {
      // An already existing table is fine. The message may be null, so check it before
      // searching it instead of failing with a NullPointerException.
      String message = e.getMessage();
      if (message == null || !message.contains("ALREADY_PRESENT")) {
        throw new DBException("Couldn't create the table", e);
      }
    }
  }

  /**
   * Reads an integer property.
   *
   * @param prop properties to read from
   * @param propName name of the property
   * @param defaultValue value returned when the property is not set
   * @return the parsed value, or {@code defaultValue} when the property is absent
   * @throws DBException if the property is set but is not a valid integer
   */
  private static int getIntFromProp(Properties prop, String propName, int defaultValue)
      throws DBException {
    String intStr = prop.getProperty(propName);
    if (intStr == null) {
      return defaultValue;
    }
    try {
      return Integer.parseInt(intStr);
    } catch (NumberFormatException ex) {
      throw new DBException("Provided number for " + propName + " isn't a valid integer", ex);
    }
  }

  /**
   * Closes this instance's session. The static client is shared between instances and is not
   * closed here.
   *
   * @throws DBException if closing the session fails
   */
  @Override
  public void cleanup() throws DBException {
    try {
      this.session.close();
    } catch (Exception e) {
      throw new DBException("Couldn't cleanup the session", e);
    }
  }

  /**
   * Reads a single record by delegating to {@link #scan} with a record count of 1.
   *
   * @return {@link #OK} when exactly one row was found, {@link #NO_MATCHING_RECORD} when the scan
   *     succeeded with any other row count, otherwise the scan's error code
   */
  @Override
  public int read(String table, String key, Set<String> fields,
                  HashMap<String, ByteIterator> result) {
    Vector<HashMap<String, ByteIterator>> results =
        new Vector<HashMap<String, ByteIterator>>();
    int ret = scan(table, key, 1, fields, results);
    if (ret != OK) {
      return ret;
    }
    if (results.size() != 1) {
      return NO_MATCHING_RECORD;
    }
    result.putAll(results.firstElement());
    return OK;
  }

  /**
   * Scans up to {@code recordcount} rows starting at {@code startkey}, appending one map per row
   * to {@code result}. The {@code table} argument is not used; the handle opened in
   * {@link #init()} is scanned.
   *
   * @return {@link #OK}, {@link #TIMEOUT} on a {@link TimeoutException}, or
   *     {@link #SERVER_ERROR} on any other exception
   */
  @Override
  public int scan(String table, String startkey, int recordcount, Set<String> fields,
                  Vector<HashMap<String, ByteIterator>> result) {
    try {
      KuduScanner.KuduScannerBuilder scannerBuilder = client.newScannerBuilder(this.table);
      List<String> querySchema;
      if (fields == null) {
        querySchema = columnNames;
        // No need to set the projected columns with the whole schema.
      } else {
        querySchema = new ArrayList<String>(fields);
        scannerBuilder.setProjectedColumnNames(querySchema);
      }

      PartialRow lowerBound = schema.newPartialRow();
      lowerBound.addString(0, startkey);
      scannerBuilder.lowerBound(lowerBound);
      if (recordcount == 1) {
        PartialRow upperBound = schema.newPartialRow();
        // Keys are fixed length, just adding something at the end is safe.
        upperBound.addString(0, startkey.concat(" "));
        scannerBuilder.exclusiveUpperBound(upperBound);
      }

      KuduScanner scanner = scannerBuilder
          .limit(recordcount) // currently noop
          .build();

      while (scanner.hasMoreRows()) {
        RowResultIterator data = scanner.nextRows();
        addAllRowsToResult(data, recordcount, querySchema, result);
        if (recordcount == result.size()) {
          break;
        }
      }
      // close() hands back an iterator as well; fold any rows it carries into the result.
      RowResultIterator closer = scanner.close();
      addAllRowsToResult(closer, recordcount, querySchema, result);
    } catch (TimeoutException te) {
      if (printErrors) {
        System.err.println("Waited too long for a scan operation with start key=" + startkey);
      }
      return TIMEOUT;
    } catch (Exception e) {
      System.err.println("Unexpected exception " + e);
      e.printStackTrace();
      return SERVER_ERROR;
    }
    return OK;
  }

  /**
   * Copies rows from {@code it} into {@code result} until the iterator is exhausted or
   * {@code result} holds {@code recordcount} entries.
   *
   * @param it rows to copy; {@code null} is treated as empty
   * @param recordcount maximum total size of {@code result}
   * @param querySchema column names, in the positional order of the row's columns
   * @param result destination; receives a distinct map for every row
   */
  private void addAllRowsToResult(RowResultIterator it, int recordcount,
                                  List<String> querySchema,
                                  Vector<HashMap<String, ByteIterator>> result)
      throws Exception {
    if (it == null) {
      return;
    }
    while (it.hasNext()) {
      if (result.size() == recordcount) {
        return;
      }
      RowResult row = it.next();
      // A fresh map per row: reusing a single instance would leave every entry of result
      // pointing at the same map, holding only the last row's values.
      HashMap<String, ByteIterator> rowResult =
          new HashMap<String, ByteIterator>(querySchema.size());
      int colIdx = 0;
      for (String col : querySchema) {
        rowResult.put(col, new StringByteIterator(row.getString(colIdx)));
        colIdx++;
      }
      result.add(rowResult);
    }
  }

  /**
   * Updates the columns present in {@code values} for the row identified by {@code key}.
   *
   * @return always {@link #OK}; failures are only reported through {@link #apply(Operation)}
   */
  @Override
  public int update(String table, String key, HashMap<String, ByteIterator> values) {
    Update update = this.table.newUpdate();
    PartialRow row = update.getRow();
    row.addString(KEY, key);
    // Index 0 is the key column, so the value columns start at 1.
    for (int i = 1; i < schema.getColumnCount(); i++) {
      String columnName = schema.getColumnByIndex(i).getName();
      if (values.containsKey(columnName)) {
        String value = values.get(columnName).toString();
        row.addString(columnName, value);
      }
    }
    apply(update);
    return OK;
  }

  /**
   * Inserts a row with the given key. Every non-key column of the schema is read from
   * {@code values}, so the map is expected to contain all of them.
   *
   * @return always {@link #OK}; failures are only reported through {@link #apply(Operation)}
   */
  @Override
  public int insert(String table, String key, HashMap<String, ByteIterator> values) {
    Insert insert = this.table.newInsert();
    PartialRow row = insert.getRow();
    row.addString(KEY, key);
    // Index 0 is the key column, so the value columns start at 1.
    for (int i = 1; i < schema.getColumnCount(); i++) {
      row.addString(i, new String(values.get(schema.getColumnByIndex(i).getName()).toArray()));
    }
    apply(insert);
    return OK;
  }

  /**
   * Deletes the row identified by {@code key}.
   *
   * @return always {@link #OK}; failures are only reported through {@link #apply(Operation)}
   */
  @Override
  public int delete(String table, String key) {
    Delete delete = this.table.newDelete();
    PartialRow row = delete.getRow();
    row.addString(KEY, key);
    apply(delete);
    return OK;
  }

  /**
   * Applies an operation on this instance's session. Row errors and exceptions are deliberately
   * not propagated; they are printed to stderr only when {@code kudu_print_row_errors} is
   * enabled.
   */
  private void apply(Operation op) {
    try {
      OperationResponse response = session.apply(op);
      if (response != null && response.hasRowError() && printErrors) {
        System.err.println("Got a row error " + response.getRowError());
      }
    } catch (Exception ex) {
      if (printErrors) {
        System.err.println("Failed to apply an operation " + ex.toString());
        ex.printStackTrace();
      }
    }
  }
}