diff --git a/core/src/main/java/com/yahoo/ycsb/AsyncDBAdapter.java b/core/src/main/java/com/yahoo/ycsb/AsyncDBAdapter.java index a7bf1179..258dc4b2 100644 --- a/core/src/main/java/com/yahoo/ycsb/AsyncDBAdapter.java +++ b/core/src/main/java/com/yahoo/ycsb/AsyncDBAdapter.java @@ -1,58 +1,113 @@ package com.yahoo.ycsb; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ForkJoinPool; +import java.util.function.Function; +import java.util.function.Supplier; /** * */ public class AsyncDBAdapter extends AsyncDB { + /* + * Option to select whether the underlying synchronous DB is simply wrapped but maintains its blocking properties, + * or whether the operations are dispatched to a separate thread using the ForkJoinPool. + */ + private static final String DB_SIMULATE_ASYNC = "db-simulate-async"; private final DB db; + private Function, CompletableFuture> opExecutor = this::syncOp; public AsyncDBAdapter(final DB db) { this.db = db; } @Override public void setProperties(Properties p) { db.setProperties(p); + boolean simulateAsyncDB = Boolean.parseBoolean(p.getProperty(DB_SIMULATE_ASYNC, "false")); + if (simulateAsyncDB) { + opExecutor = this::asyncOp; + } } @Override public Properties getProperties() { return db.getProperties(); } @Override public CompletableFuture read(String table, String key, Set fields, Map result) { - return CompletableFuture.completedFuture(db.read(table, key, fields, result)); + return opExecutor.apply(() -> db.read(table, key, fields, result)); } @Override public CompletableFuture scan(String table, String startkey, int recordcount, Set fields, Vector> result) { - return CompletableFuture.completedFuture(db.scan(table, startkey, recordcount, fields, result)); + return opExecutor.apply(() -> db.scan(table, startkey, recordcount, fields, result)); } @Override public CompletableFuture update(String table, String key, Map values) { - return CompletableFuture.completedFuture(db.update(table, key, values)); + return opExecutor.apply(() -> db.update(table, key, values)); } @Override public CompletableFuture insert(String table, String key, Map values) { - return CompletableFuture.completedFuture(db.insert(table, key, values)); + return opExecutor.apply(() -> db.insert(table, key, values)); } @Override public CompletableFuture delete(String table, String key) { - return CompletableFuture.completedFuture(db.delete(table, key)); + return opExecutor.apply(() -> db.delete(table, key)); } @Override public DB syncView() { return db; } + + private CompletableFuture asyncOp(Supplier op) { + class BlockingResult implements ForkJoinPool.ManagedBlocker { + private boolean finished = false; + private Status result = null; + + @Override + public boolean block() { + + result = op.get(); + + finished = true; + return true; + } + + @Override + public boolean isReleasable() { + return finished; + } + + public Status getResult() { + if (!isReleasable()) { + return Status.UNEXPECTED_STATE; + } + return result; + } + } + return CompletableFuture.supplyAsync(() -> { + try { + BlockingResult br = new BlockingResult(); + ForkJoinPool.managedBlock(br); + return br.getResult(); + } catch (InterruptedException e) { + System.err.println("AsyncDBAdapter: asyncOp() got an InterruptedException: " + e); + return Status.ERROR; + } + }); + } + + private CompletableFuture syncOp(Supplier op) { + return CompletableFuture.completedFuture(op.get()); + } } diff --git a/core/src/main/java/com/yahoo/ycsb/AsyncDBPoolAdapter.java b/core/src/main/java/com/yahoo/ycsb/AsyncDBPoolAdapter.java index 47c7126e..0121c24a 100644 --- a/core/src/main/java/com/yahoo/ycsb/AsyncDBPoolAdapter.java +++ b/core/src/main/java/com/yahoo/ycsb/AsyncDBPoolAdapter.java @@ -1,178 +1,136 @@ package com.yahoo.ycsb; -import java.lang.reflect.InvocationTargetException; import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.Vector; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ForkJoinPool; import java.util.function.Function; import java.util.function.Supplier; /** - * Adapter that creates an extendable pool of DB instances which are run concurrently - * to simulate the behaviour of an AsyncDB object. + * Adapter that creates an extendable pool of AsyncDB instances which are run concurrently + * to simulate the behaviour of multiple concurrent clients. */ -public class AsyncDBPoolAdapter extends AsyncDB { +public class AsyncDBPoolAdapter extends AsyncDB { private boolean opened = false; private Properties props; - private T syncViewDB; - private AbstractObjectPool clients; + private AsyncDB syncViewDB; + private AbstractObjectPool clients; private AsyncDBPoolAdapter() {} - AsyncDBPoolAdapter(Class dbClass, boolean blocking, int maxClients) { - clients = ObjectPoolFactory.getObjectPool(blocking, maxClients, new ObjectPoolItemHandler() { + AsyncDBPoolAdapter(Supplier clientGenerator, boolean blocking, int maxClients) { + clients = ObjectPoolFactory.getObjectPool(blocking, maxClients, new ObjectPoolItemHandler() { @Override - public T create() { - T newClient = null; - try { - newClient = dbClass.getDeclaredConstructor().newInstance(); - } catch (InstantiationException | IllegalAccessException | - InvocationTargetException | NoSuchMethodException e) { - System.err.println("Unable to create new instance of selected DB: " + e); - throw new AsyncDBPoolCannotCreateClientException(); - } - + public AsyncDB create() { + AsyncDB newClient = clientGenerator.get(); if (props != null) { newClient.setProperties(props); } try { newClient.init(); } catch (DBException e) { System.err.println("Unable to initialize new instance of selected DB: " + e); throw new AsyncDBPoolCannotCreateClientException(); } opened = true; return newClient; } @Override - public void destroy(T object) { + public void destroy(AsyncDB object) { try { object.cleanup(); } catch (DBException e) { System.err.println("Exception thrown when cleaning up DB client: " + e); } } @Override - public boolean validate(T object) { + public boolean validate(AsyncDB object) { return true; } }); } private static class AsyncDBPoolAlreadyStartedException extends IllegalStateException { AsyncDBPoolAlreadyStartedException() { super("AsyncDBPool already started, cannot change properties anymore."); } } private static class AsyncDBPoolCannotCreateClientException extends IllegalStateException { AsyncDBPoolCannotCreateClientException() { super("AsyncDBPool cannot create a new client instance."); } } - private CompletableFuture asyncOp(Supplier op) { - class BlockingResult implements ForkJoinPool.ManagedBlocker { - private boolean finished = false; - private Status result = null; - - @Override - public boolean block() { - - result = op.get(); - - finished = true; - return true; - } - - @Override - public boolean isReleasable() { - return finished; - } - - public Status getResult() { - if (!isReleasable()) { - return Status.UNEXPECTED_STATE; - } - return result; - } - } - return CompletableFuture.supplyAsync(() -> { - try { - BlockingResult br = new BlockingResult(); - ForkJoinPool.managedBlock(br); - return br.getResult(); - } catch (InterruptedException e) { - System.err.println("AsyncDBPoolAdapter: asyncOp() got an InterruptedException: " + e); - return Status.ERROR; - } - }); - } @Override public void cleanup() { + synchronized (this) { + if (syncViewDB != null) { + clients.giveBack(syncViewDB); + } + } clients.close(); } @Override public void setProperties(Properties p) { if (opened) { throw new AsyncDBPoolAlreadyStartedException(); } this.props = p; } @Override public Properties getProperties() { return props; } - private CompletableFuture runInSeparateClient(Function> op) { - T client = clients.take(); - return asyncOp(op.apply(client)).whenComplete((res, ex) -> clients.giveBack(client)); + private CompletableFuture runInSeparateClient(Function>> op) { + AsyncDB client = clients.take(); + return op.apply(client).get().whenComplete((res, ex) -> clients.giveBack(client)); } @Override public CompletableFuture read(String table, String key, Set fields, Map result) { return runInSeparateClient(c -> () -> c.read(table, key, fields, result)); } @Override public CompletableFuture scan(String table, String startkey, int recordcount, Set fields, Vector> result) { return runInSeparateClient(c -> () -> c.scan(table, startkey, recordcount, fields, result)); } @Override public CompletableFuture update(String table, String key, Map values) { return runInSeparateClient(c -> () -> c.update(table, key, values)); } @Override public CompletableFuture insert(String table, String key, Map values) { return runInSeparateClient(c -> () -> c.insert(table, key, values)); } @Override public CompletableFuture delete(String table, String key) { return runInSeparateClient(c -> () -> c.delete(table, key)); } @Override public synchronized DB syncView() { if (syncViewDB == null) { syncViewDB = clients.take(); } - return syncViewDB; + return syncViewDB.syncView(); } } diff --git a/core/src/main/java/com/yahoo/ycsb/DBFactory.java b/core/src/main/java/com/yahoo/ycsb/DBFactory.java index c8593f9d..e04e83f3 100644 --- a/core/src/main/java/com/yahoo/ycsb/DBFactory.java +++ b/core/src/main/java/com/yahoo/ycsb/DBFactory.java @@ -1,70 +1,87 @@ /** * Copyright (c) 2010-2016 Yahoo! Inc., 2017 YCSB contributors All rights reserved. *

* Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb; import org.apache.htrace.core.Tracer; import java.util.Properties; +import java.util.function.Supplier; /** * Creates a DB layer by dynamically classloading the specified DB class. */ public final class DBFactory { - private static final String SIMULATE_ASYNCDB = "simulate-asyncdb"; - private static final String BLOCKING_DB_POOL = "blocking-db-pool"; + private static final String DB_POOL_ENABLE = "db-pool-enable"; + private static final String DB_POOL_BLOCKING = "db-pool-blocking"; private static final String DB_POOL_MAX_SIZE = "db-pool-max-size"; private DBFactory() { // not used } public static AsyncDB newDB(String dbname, Properties properties, final Tracer tracer) throws UnknownDBException { - boolean simulateAsyncDB = Boolean.parseBoolean(properties.getProperty(SIMULATE_ASYNCDB, "false")); - boolean blockingAsyncDBPool = Boolean.parseBoolean(properties.getProperty(BLOCKING_DB_POOL, "true")); + boolean dbPoolEnabled = Boolean.parseBoolean(properties.getProperty(DB_POOL_ENABLE, "false")); + boolean dbPoolBlocking = Boolean.parseBoolean(properties.getProperty(DB_POOL_BLOCKING, "true")); int dbPoolMaxSize = Integer.MAX_VALUE; if (properties.containsKey(DB_POOL_MAX_SIZE)) { dbPoolMaxSize = Integer.parseInt(properties.getProperty(DB_POOL_MAX_SIZE)); } ClassLoader classLoader = DBFactory.class.getClassLoader(); AsyncDB ret; try { Class dbclass = classLoader.loadClass(dbname); + Supplier dbCreator; if (DB.class.isAssignableFrom(dbclass)) { - if (simulateAsyncDB) { - ret = new AsyncDBPoolAdapter(dbclass, blockingAsyncDBPool, dbPoolMaxSize); - } else { - ret = new AsyncDBAdapter((DB) dbclass.getDeclaredConstructor().newInstance()); - } + dbCreator = () -> { + try { + return new AsyncDBAdapter((DB) dbclass.getDeclaredConstructor().newInstance()); + } catch (Exception e) { + e.printStackTrace(); + return null; + } + }; } else { - ret = (AsyncDB) dbclass.getDeclaredConstructor().newInstance(); + dbCreator = () -> { + try { + return (AsyncDB) dbclass.getDeclaredConstructor().newInstance(); + } catch (Exception e) { + e.printStackTrace(); + return null; + } + }; + } + if (dbPoolEnabled) { + ret = new AsyncDBPoolAdapter(dbCreator, dbPoolBlocking, dbPoolMaxSize); + } else { + ret = dbCreator.get(); } } catch (Exception e) { e.printStackTrace(); return null; } ret.setProperties(properties); return new DBWrapper(ret, tracer); } }