Page MenuHomec4science

evolution_strategy.py
No OneTemporary

File Metadata

Created
Fri, Oct 18, 10:43

evolution_strategy.py

from __future__ import print_function
import numpy as np
import pickle
import time
# Seed NumPy's global random generator with a fixed value when this module is imported.
# The population perturbations below are drawn with np.random.randn, which uses this global state.
np.random.seed(0)
class EvolutionStrategy(object):
    """Evolution strategy class, implements the population of generators and the generation of kinetic parameters (Kms)

    Each iteration draws a population of Gaussian perturbations of the stored generator weights, scores every
    perturbed candidate with the supplied reward function, and moves the stored weights along the
    reward-weighted sum of the perturbations.
    """

    def __init__(self, mlp, get_reward_func, save_path, path_to_weights, save_step, population_size=50, sigma=0.1,
                 learning_rate=0.03, decay=0.999, print_step=1, verbose=False):
        """Store the generator wrapper, the reward function and the hyper-parameters.

        :param mlp: object exposing ``generator.get_weights()``, ``generator.set_weights(weights)`` and
            ``sample_parameters()``
        :param get_reward_func: callable invoked as ``get_reward_func(params, state)`` for candidates and as
            ``get_reward_func(params, state, True)`` for the updated weights; its result is indexed with
            ``'group'`` and iterated with ``.items()``
        :param save_path: directory in which the weights and rewards pickles are written
        :param path_to_weights: stored on the instance; not used by this class
        :param save_step: weights and rewards are pickled every ``save_step`` iterations
        :param population_size: number of perturbed candidates per iteration
        :param sigma: scale applied to the standard-normal perturbations
        :param learning_rate: step size of the weight update (multiplied by ``decay`` after each update)
        :param decay: multiplicative learning-rate decay
        :param print_step: an iteration summary is printed on the first iteration and every ``print_step`` iterations
        :param verbose: if true, print a progress message before each stage of every iteration
        """
        self.mlp = mlp
        self.weights = mlp.generator.get_weights()
        self.get_reward = get_reward_func
        self.save_path = save_path
        self.path_to_weights = path_to_weights
        self.save_step = save_step
        self.POPULATION_SIZE = population_size
        self.SIGMA = sigma
        self.learning_rate = learning_rate
        self.decay = decay
        self.print_step = print_step
        self.verbose = verbose

    def _get_weights_try(self, w, p):
        """Generate alternate candidate generator weights jittered from the stored generator weights

        :param w: weights of the stored generator (one array per layer)
        :param p: perturbations of one candidate (one array per layer, same order as ``w``)
        :return: list with ``w[k] + SIGMA * p[k]`` for every layer ``k`` of ``p``
        """
        return [w[index] + self.SIGMA * noise for index, noise in enumerate(p)]

    def get_weights(self):
        """Return the currently stored generator weights."""
        return self.weights

    def _get_population(self):
        """Create a list of standard-normal perturbations, one set of per-layer arrays per candidate

        :return: generator candidates population (``POPULATION_SIZE`` lists of arrays shaped like the weights)
        """
        # Draw order (candidate by candidate, layer by layer) is kept so a seeded run is reproducible.
        return [
            [np.random.randn(*w.shape) for w in self.weights]
            for _ in range(self.POPULATION_SIZE)
        ]

    def _get_rewards(self, population, state=None):
        """Call the reward function of the Renaissance class object for each generator candidate

        :param population: population of generators
        :param state: given if the current optimization mode is 'singular'
        :return: array with the ``'group'`` reward of every candidate, in population order
        """
        rewards = []
        for p in population:
            weights_try = self._get_weights_try(self.weights, p)
            generated_params = self._sample_parameters(weights_try)
            # ``state`` is forwarded so candidates are scored the same way as the updated weights in ``run``.
            # Previously it was accepted but silently dropped. NOTE(review): confirm against the reward function.
            rewards.append(self.get_reward(generated_params, state)['group'])
        return np.array(rewards)

    def _sample_parameters(self, weights):
        """Generate kinetic parameters from the candidate generator's weights

        Side effect: the given weights are loaded into ``self.mlp.generator``.

        :param weights: candidate's weights
        :return: generated Kms
        """
        self.mlp.generator.set_weights(weights)
        return self.mlp.sample_parameters()

    def _update_weights(self, rewards, population):
        """Update the generator weights by estimating a gradient step from the population rewards

        Rewards are standardised (zero mean, unit standard deviation). If all rewards are equal the method
        returns without touching the weights or the learning rate.

        :param rewards: rewards of the Natural Evolution Strategy
        :param population: population of generators
        """
        std = rewards.std()
        if std == 0:
            # No ranking information in this generation; skip the update (and the decay).
            return
        rewards = (rewards - rewards.mean()) / std
        # Same for every layer, so computed once.
        update_factor = self.learning_rate / (self.POPULATION_SIZE * self.SIGMA)
        for index, w in enumerate(self.weights):
            layer_population = np.array([p[index] for p in population])
            # Transposing moves the population axis last so the dot product sums over candidates;
            # the second transpose restores the layer's own shape.
            self.weights[index] = w + update_factor * np.dot(layer_population.T, rewards).T
        self.learning_rate *= self.decay

    def _log_stage(self, iteration, stage, start):
        """Print a progress message for one stage of an iteration; does nothing unless ``self.verbose``."""
        if self.verbose:
            print(f"EvoStrat iteration {iteration+1}: {stage}\n"
                  f"Time elapsed: {round((time.time()-start)/60, 3)}\n")

    def run(self, iterations, state=None):
        """Natural Evolution strategy loop involving creation of a population of generators, computation of reward of
        each candidate and estimation of the gradient step to update the stored weights

        :param iterations: number of iterations wanted
        :param state: given if the current optimization mode is 'singular'
        :return: array with the ``'group'`` reward of the updated weights after each iteration
        """
        start = time.time()
        all_rewards = []
        for iteration in range(iterations):
            self._log_stage(iteration, "creating new population", start)
            population = self._get_population()

            self._log_stage(iteration, "call reward func", start)
            rewards = self._get_rewards(population, state)

            self._log_stage(iteration, "update weights", start)
            self._update_weights(rewards, population)

            self._log_stage(iteration, "compute current weights reward", start)
            this_reward = self.get_reward(self._sample_parameters(self.weights), state, True)

            # save parameters and results
            # File names carry the 0-based iteration index, whereas the printed summary is 1-based.
            if (iteration + 1) % self.save_step == 0:
                with open(f'{self.save_path}/weights_{iteration}.pkl', 'wb') as f:
                    pickle.dump(self.weights, f)
                with open(f'{self.save_path}/rewards_{iteration}.pkl', 'wb') as f:
                    pickle.dump(this_reward, f)

            # print summary of iteration
            if iteration == 0 or (iteration + 1) % self.print_step == 0:
                summary_parts = [f'*********** iteration {iteration+1} ***********\n']
                for name, value in this_reward.items():
                    if name != 'group':
                        summary_parts.append(f"steady-state {name} reward: {value:.5}\n")
                    elif state is not None:
                        summary_parts.append(f"steady-state {state} reward: {value:.5}\n")
                    else:
                        summary_parts.append(f"{name} reward: {value:.5}")
                print("".join(summary_parts))
                # NOTE(review): the elapsed-time line is assumed to belong to the summary branch — confirm.
                this_end = time.time()
                print(f'Time elapsed: {round((this_end-start)/60, 3)} minutes\n')

            all_rewards.append(this_reward['group'])
        return np.array(all_rewards)

Event Timeline