diff --git a/core/src/main/java/com/yahoo/ycsb/AsyncDBPoolAdapter.java b/core/src/main/java/com/yahoo/ycsb/AsyncDBPoolAdapter.java new file mode 100644 index 00000000..fe2a5cd9 --- /dev/null +++ b/core/src/main/java/com/yahoo/ycsb/AsyncDBPoolAdapter.java @@ -0,0 +1,197 @@ +package com.yahoo.ycsb; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.Vector; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ForkJoinPool; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * Adapter that creates an extendable pool of DB instances which are run concurrently + * to simulate the behaviour of an AsyncDB object. + */ +public class AsyncDBPoolAdapter extends AsyncDB { + + private boolean opened = false; + private Properties props; + private T syncViewDB; + private Class dbType; + private ObjectPool clients = new ObjectPool(Long.MAX_VALUE) { + + private Constructor constructor; + + private synchronized void populateConstructor() { + if (constructor != null) { + return; + } + try { + constructor = dbType.getDeclaredConstructor(); + } catch (NoSuchMethodException e) { + System.err.println("Unable to find constructor of selected DB."); + throw new AsyncDBPoolCannotCreateClientException(); + } + } + + @Override + protected T create() { + if (constructor == null) { + populateConstructor(); + } + T newClient = null; + try { + newClient = constructor.newInstance(); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { + System.err.println("Unable to create new instance of selected DB: " + e); + throw new AsyncDBPoolCannotCreateClientException(); + } + + if (props != null) { + newClient.setProperties(props); + } + + try { + newClient.init(); + } catch (DBException e) { + System.err.println("Unable to initialize new instance of selected DB: " + e); + throw new AsyncDBPoolCannotCreateClientException(); + } + + opened = true; + return newClient; + } + + @Override + protected void destroy(T object) { + try { + object.cleanup(); + } catch (DBException e) { + System.err.println("Exception thrown when cleaning up DB client: " + e); + } + } + + @Override + protected boolean validate(T object) { + return true; + } + }; + + private static class AsyncDBPoolAlreadyStartedException extends IllegalStateException { + AsyncDBPoolAlreadyStartedException() { + super("AsyncDBPool already started, cannot change properties anymore."); + } + } + + private static class AsyncDBPoolCannotCreateClientException extends IllegalStateException { + AsyncDBPoolCannotCreateClientException() { + super("AsyncDBPool cannot create a new client instance."); + } + } + + private AsyncDBPoolAdapter() {} + + AsyncDBPoolAdapter(Class dbClass) { + this.dbType = dbClass; + } + + private CompletableFuture asyncOp(Supplier op) { + class BlockingResult implements ForkJoinPool.ManagedBlocker { + private boolean finished = false; + private Status result = null; + + @Override + public boolean block() { + + result = op.get(); + + finished = true; + return true; + } + + @Override + public boolean isReleasable() { + return finished; + } + + public Status getResult() { + if (!isReleasable()) { + return Status.UNEXPECTED_STATE; + } + return result; + } + } + return CompletableFuture.supplyAsync(() -> { + try { + BlockingResult br = new BlockingResult(); + ForkJoinPool.managedBlock(br); + return br.getResult(); + } catch (InterruptedException e) { + System.err.println("AsyncDBPoolAdapter: asyncOp() got an InterruptedException: " + e); + return Status.ERROR; + } + }); + } + + @Override + public void cleanup() { + clients.close(); + } + + @Override + public void setProperties(Properties p) { + if (opened) { + throw new AsyncDBPoolAlreadyStartedException(); + } + this.props = p; + } + + @Override + public Properties getProperties() { + return props; + } + + private CompletableFuture runInSeparateClient(Function> op) { + T client = clients.take(); + return asyncOp(op.apply(client)).whenComplete((res, ex) -> clients.giveBack(client)); + } + + @Override + public CompletableFuture read(String table, String key, Set fields, + Map result) { + return runInSeparateClient(c -> () -> c.read(table, key, fields, result)); + } + + @Override + public CompletableFuture scan(String table, String startkey, int recordcount, Set fields, + Vector> result) { + return runInSeparateClient(c -> () -> c.scan(table, startkey, recordcount, fields, result)); + } + + @Override + public CompletableFuture update(String table, String key, Map values) { + return runInSeparateClient(c -> () -> c.update(table, key, values)); + } + + @Override + public CompletableFuture insert(String table, String key, Map values) { + return runInSeparateClient(c -> () -> c.insert(table, key, values)); + } + + @Override + public CompletableFuture delete(String table, String key) { + return runInSeparateClient(c -> () -> c.delete(table, key)); + } + + @Override + public synchronized DB syncView() { + if (syncViewDB == null) { + syncViewDB = clients.create(); + } + return syncViewDB; + } +} diff --git a/core/src/main/java/com/yahoo/ycsb/DBFactory.java b/core/src/main/java/com/yahoo/ycsb/DBFactory.java index 12043635..394e1a0d 100644 --- a/core/src/main/java/com/yahoo/ycsb/DBFactory.java +++ b/core/src/main/java/com/yahoo/ycsb/DBFactory.java @@ -1,55 +1,63 @@ /** * Copyright (c) 2010-2016 Yahoo! Inc., 2017 YCSB contributors All rights reserved. *

* Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. */ package com.yahoo.ycsb; import org.apache.htrace.core.Tracer; import java.util.Properties; /** * Creates a DB layer by dynamically classloading the specified DB class. */ public final class DBFactory { + + private static final String SIMULATE_ASYNCDB = "simulate-asyncdb"; + private DBFactory() { // not used } public static AsyncDB newDB(String dbname, Properties properties, final Tracer tracer) throws UnknownDBException { + boolean simulateAsyncDB = Boolean.parseBoolean(properties.getProperty(SIMULATE_ASYNCDB, "false")); ClassLoader classLoader = DBFactory.class.getClassLoader(); AsyncDB ret; try { Class dbclass = classLoader.loadClass(dbname); if (DB.class.isAssignableFrom(dbclass)) { - ret = new AsyncDBAdapter((DB) dbclass.getDeclaredConstructor().newInstance()); + if (simulateAsyncDB) { + ret = new AsyncDBPoolAdapter(dbclass); + } else { + ret = new AsyncDBAdapter((DB) dbclass.getDeclaredConstructor().newInstance()); + } } else { ret = (AsyncDB) dbclass.getDeclaredConstructor().newInstance(); } } catch (Exception e) { e.printStackTrace(); return null; } ret.setProperties(properties); return new DBWrapper(ret, tracer); } }