KDD Cup Humanities Track Tutorial: Genetic Algorithm
This tutorial builds on the previous tutorial to demonstrate a baseline implementation of an evolutionary search for high-performing Policies.
Reward
$R_\pi \in (-\infty, \infty)$
Policy
A Policy ($\pi$) for this challenge consists of a temporal sequence of Actions.
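For example, a five-year Policy can be represented as a dictionary such as {1: [0.7, 0.2], 2: [0.5, 0.5], 3: [0.9, 0.1], 4: [0.3, 0.6], 5: [0.0, 1.0]}, mapping each year to an (ITN, IRS) action pair as in the previous tutorial; the values shown here are purely illustrative.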
In [ ]:
!pip3 install git+https://github.com/slremy/netsapi --user --upgrade
from netsapi.challenge import *
import pandas as pd
import numpy as np
import itertools
Learning High-Performing Policies Based on a Population-Based Search
In [ ]:
class RemyGA:
'''
Simple Genetic Algorithm.
https://github.com/slremy/estool/
'''
def __init__(self, num_params, # number of model parameters
random_individuals_fcn,
mutate_fcn,
immigrant_ratio=.2, # percentage of new individuals
sigma_init=0.1, # initial standard deviation
sigma_decay=0.999, # anneal standard deviation
sigma_limit=0.01, # stop annealing if less than this
popsize=256, # population size
elite_ratio=0.1, # percentage of the elites
forget_best=False, # forget the historical best elites
weight_decay=0.01, # weight decay coefficient
):
self.num_params = num_params
self.sigma_init = sigma_init
self.sigma_decay = sigma_decay
self.sigma_limit = sigma_limit
self.popsize = popsize
self.random_individuals_fcn = random_individuals_fcn
self.mutate_fcn = mutate_fcn
self.solutions = None
self.elite_ratio = elite_ratio
self.elite_popsize = int(self.popsize * self.elite_ratio)
self.immigrant_ratio = immigrant_ratio
self.immigrant_popsize = int(self.popsize * self.immigrant_ratio)
self.sigma = self.sigma_init
self.elite_params = np.zeros((self.elite_popsize, self.num_params))
#self.elite_rewards = np.zeros(self.elite_popsize)
self.best_param = np.zeros(self.num_params)
self.best_reward = 0
self.reward_pdf = np.zeros(self.popsize+1)
self.solutions = np.zeros((self.popsize, self.num_params))
self.first_iteration = True
self.forget_best = forget_best
self.weight_decay = weight_decay
def rms_stdev(self):
return self.sigma # same sigma for all parameters.
def ask(self, process=lambda x:x):
'''returns a list of parameters'''
self.epsilon = np.random.randn(self.popsize, self.num_params) * self.sigma
        def mate(a, b):
            # uniform crossover: copy a, then overwrite a random subset of genes with b's
            # (defined here for reference but not used below)
            c = np.copy(a)
            idx = np.where(np.random.rand(c.size) > 0.5)
            c[idx] = b[idx]
            return c
        def crossover(a, b):
            # single-point crossover: a's genes up to a random cut point, b's genes after it
            cross_point = int((self.num_params - 1) * np.random.rand())
            c = np.append(a[:cross_point], b[cross_point:self.num_params])
            return c
index_array = np.arange(self.popsize)
if self.first_iteration:
self.solutions = process(self.random_individuals_fcn(self.popsize,self.num_params))
else:
            # initialize the index list for "mating" chromosomes
            childrenIDX = range(self.popsize - self.elite_popsize - self.immigrant_popsize)
            selected = np.arange(2 * len(childrenIDX))
            for i in range(len(selected)):
                # roulette-wheel selection: pick the first individual whose cumulative
                # reward fraction exceeds a single uniform random draw
                r = np.random.rand()
                testNo = 0
                while self.reward_pdf[testNo] < r:
                    testNo = testNo + 1
                selected[i] = index_array[testNo]
children = []
for i in range(len(childrenIDX)):
chromosomeA = self.solutions[selected[i*2+0], :];
chromosomeB = self.solutions[selected[i*2+1], :];
child = crossover(chromosomeA,chromosomeB) if 0.5 < np.random.rand() else crossover(chromosomeB,chromosomeA)
children.append(self.mutate_fcn(child))
self.solutions = process(np.concatenate((self.elite_params, self.random_individuals_fcn(self.immigrant_popsize,self.num_params), np.array(children))))
return self.solutions
def tell(self, reward_table_result):
# input must be a numpy float array
assert(len(reward_table_result) == self.popsize), "Inconsistent reward_table size reported."
reward_table = np.array(reward_table_result)
        if self.weight_decay > 0:
            # optional L2 penalty on the parameters (see compute_weight_decay defined after the class)
            l2_decay = compute_weight_decay(self.weight_decay, self.solutions)
            reward_table += l2_decay
reward = reward_table
solution = self.solutions
        # mask out invalid (NaN/inf) rewards before ranking and selection
        reward_masked = np.ma.masked_array(reward, mask=(np.isnan(reward) | np.isinf(reward)))
        # cumulative reward distribution used for roulette-wheel parent selection in ask()
        self.reward_pdf = (np.cumsum(reward_masked) / np.sum(reward_masked)).compressed()
        # rank the candidates from highest to lowest reward and keep the elites
        sorted_idx = np.argsort(reward_masked)[::-1]
        idx = sorted_idx[~reward_masked.mask][0:self.elite_popsize]
assert(len(idx) == self.elite_popsize), "Inconsistent elite size reported."
self.elite_rewards = reward[idx]
self.elite_params = solution[idx]
self.curr_best_reward = self.elite_rewards[0]
if self.first_iteration or (self.curr_best_reward > self.best_reward):
self.first_iteration = False
self.best_reward = self.elite_rewards[0]
self.best_param = np.copy(self.elite_params[0])
if (self.sigma > self.sigma_limit):
self.sigma *= self.sigma_decay
self.first_iteration = False
def current_param(self):
return self.elite_params[0]
def set_mu(self, mu):
pass
    def best_param(self):
        # note: the best_param attribute set in __init__ and tell() shadows this method on
        # instances, so self.solver.best_param below returns the parameter array directly
        return self.best_param
def result(self): # return best params so far, along with historically best reward, curr reward, sigma
return (self.best_param, self.best_reward, self.curr_best_reward, self.sigma)
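# compute_weight_decay is called in RemyGA.tell() but is not defined in this tutorial.
# The sketch below follows the estool implementation referenced in the class docstring
# (an assumption): a negative L2 penalty per individual, scaled by weight_decay.
def compute_weight_decay(weight_decay, model_param_list):
    model_param_grid = np.array(model_param_list)
    return -weight_decay * np.mean(model_param_grid * model_param_grid, axis=1)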
def mutate(chromosome):
    # mutate each gene with probability mutation_rate by adding Gaussian noise,
    # then wrap the result back into [0, 0.99) with a modulo
    mutation_rate = .5
    for j in range(chromosome.shape[0]):
        if np.random.rand() < mutation_rate:
            chromosome[j] = np.remainder(chromosome[j] + np.random.randn(), 0.99)
    return chromosome
def make_random_individuals(x, y):
    # generate x random individuals, each with y parameters drawn uniformly from [0, 1)
    return np.random.rand(x, y)
def boundary(individual):
    # keep every parameter within [0, 1] by wrapping values with a modulo
    return individual % (1 + np.finfo(float).eps)
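A quick way to check the solver before wiring it to the challenge environment is to run the ask/tell loop on a toy objective. The cell below is a minimal sketch (not part of the challenge submission): it maximizes the sum of the ten parameters, reusing the helper functions defined above.
In [ ]:
# Minimal sanity check of the ask/tell loop on a toy objective (not part of the challenge):
# maximize the sum of the parameters, each kept in [0, 1] by the boundary() helper.
toy_solver = RemyGA(10,
                    random_individuals_fcn=make_random_individuals,
                    mutate_fcn=mutate,
                    popsize=10,
                    elite_ratio=0.2,
                    weight_decay=0.0)
for generation in range(5):
    candidates = toy_solver.ask(boundary)      # population of candidate parameter vectors
    rewards = [np.sum(c) for c in candidates]  # toy reward: larger parameter sums are better
    toy_solver.tell(rewards)                   # report rewards to produce the next generation
print(toy_solver.result())                     # (best_param, best_reward, curr_best_reward, sigma)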
Creating a Valid Submission from Agent Code:
In [ ]:
class SRGAAgent():
def __init__(self, environment):
        self._epsilon = 0.2 # 20% chance to apply a random action (not used by this agent)
        self._gamma = 0.99 # discounting factor (not used by this agent)
        self._alpha = 0.5 # soft update parameter (not used by this agent)
        self.environment = environment
        self._resolution = .1 # self.environment.resolution
        self.popsize = 10
        self.num_parameters = 10
        self.solver = RemyGA(self.num_parameters, # number of model parameters
random_individuals_fcn=make_random_individuals,
mutate_fcn = mutate,
sigma_init=1, # initial standard deviation
popsize=self.popsize, # population size
elite_ratio=0.2, # percentage of the elites
forget_best=False, # forget the historical best elites
weight_decay=0.00, # weight decay coefficient
)
def stateSpace(self):
return range(1,self.environment.policyDimension+1)
    def train(self):
        allrewards = []
        # 20 episodes with a population of 10 gives two generations of candidate solutions
        for episode in range(20):
            if episode % self.popsize == 0:
                # ask for a set of candidate solutions to be evaluated
                solutions = self.solver.ask(boundary)
                # convert each array of 10 floats into a policy of (itn, irs) per year for 5 years
                policies = []
                for v in solutions:
                    actions = [i for i in itertools.zip_longest(*[iter(v)] * 2, fillvalue="")]
                    policy = {i + 1: list(actions[i]) for i in range(5)}
                    policies.append(policy)
                # calculate the reward for each candidate policy using the environment's batch method
                batchRewards = self.environment.evaluatePolicy(policies)
                allrewards.append(batchRewards)
                # report the rewards back to the solver to produce the next generation
                self.solver.tell(batchRewards)
        return np.array(allrewards)
def generate(self):
self.train()
#generate a policy from the array used to represent the candidate solution
actions = [i for i in itertools.zip_longest(*[iter(self.solver.best_param)] * 2, fillvalue="")]
best_policy = {state: list(actions[state-1]) for state in self.stateSpace()}
best_reward = self.environment.evaluatePolicy(best_policy)
print(best_policy, best_reward)
return best_policy, best_reward
Run the EvaluateChallengeSubmission Method with your Agent Class
In [ ]:
EvaluateChallengeSubmission(ChallengeSeqDecEnvironment, SRGAAgent, "SRGAAgent_20.csv")