Source code for pyrltr.agents.Cacla

# -*- coding: utf-8 -*-
"""
Implementation of the Continuous Actor-Critic Learning Automaton (CACLA).
Uses neural networks as actor and critic.
"""

from pymlp.mlp.NeuralNetworkAdapter import FFNetworkFactory
import numpy as np


class Cacla():
    """
    Implementation of Continuous Actor-Critic Learning Automata (CACLA).
    """

    def __init__(self, actorLayout, transferFunctions, replicationNumber,
                 maxReward=1, alpha=0.2, beta=0.2, gamma=0.1, sigma=0.05,
                 folder="Cacla"):
        """
        Initializes the basic member variables, not the controller itself.

        Parameters:
        actorLayout -- the layout of the actor's neural network
        transferFunctions -- the transfer functions for the neural network
        replicationNumber -- the number of the replication, simply an
                             identifier for the controllers
        maxReward -- the maximum possible reward, so the received reward can
                     be scaled to be < 1
        alpha -- the critic's learning rate
        beta -- the actor's learning rate
        gamma -- the discount factor
        sigma -- the standard deviation for the exploration
        folder -- the folder in which the data shall be stored
        """
        # Make sure it uses its own random seed
        self.randomState = np.random.RandomState()
        self.zeta = 0.001
        self.inputDimensions = actorLayout[0]
        self.actionDimensions = actorLayout[-1]
        self.actorLayout = actorLayout
        self.transferFunctions = transferFunctions
        self.maxReward = maxReward
        self.alpha = alpha
        self.beta = beta
        self.gamma = gamma
        self.sigma = sigma
        self.replicationNumber = replicationNumber
        self.folder = folder
        self.networkFactory = FFNetworkFactory(folder)
        self.errors = list()
        self.actorErrors = list()
        self.sigmas = list()

    def initController(self):
        """
        Initializes the controllers for the critic and actor.
        """
        self.critic = self.networkFactory.createNetwork(
            "critic", self.actorLayout[0:-1] + (1,), self.transferFunctions,
            self.alpha, self.replicationNumber)
        self.actor = self.networkFactory.createNetwork(
            "actor", self.actorLayout, self.transferFunctions, self.beta,
            self.replicationNumber)
        self.var = 1.0

    def updateReward(self, state, reward, nextState, action, episodeOver):
        """
        Updates the critic and actor according to the algorithm's definition.

        Parameters:
        state -- the current state of the environment
        reward -- the just received reward
        nextState -- the state of the environment after performing the action
        action -- the just performed action
        episodeOver -- True if this was the last step in this episode
        """
        reward = self.scaleReward(reward)
        futureOpinion = self.getCriticOpinion(nextState)
        # ignore the future, if there is none, because this episode is over
        stillRunning = not episodeOver
        target = reward + (stillRunning * self.gamma * futureOpinion)
        criticOpinion = self.getCriticOpinion(state)
        delta_t = target - criticOpinion
        self.errors.append(delta_t)
        # number of actor training iterations, scaled by the running standard
        # deviation of the td-error (CACLA+Var)
        numIt = np.ceil(np.nan_to_num(delta_t / np.sqrt(self.var)))
        self.var = self.updateVar(delta_t)
        self.critic.train(np.array([state]), np.array([target]), 1)
        updateActor = delta_t[0] > 0  # actual performance better than expected
        if updateActor:
            self.actor.train(np.array([state]), np.array([action]), numIt)

    def updateVar(self, delta_t):
        """
        Updates the running variance based on the td-error.

        Parameters:
        delta_t -- the current temporal difference error

        returns -- the new value for the running variance
        """
        return (1 - self.zeta) * self.var + self.zeta * delta_t ** 2

    def _explore(self, actions):
        """
        Adds the exploration to the actions. Uses a simple simulation of the
        truncated normal distribution.

        Parameters:
        actions -- actions to which the exploratory noise shall be added

        returns -- the actions + noise
        """
        explored = actions
        if self.sigma > 0:
            # list() keeps this working under Python 3, where map() returns an
            # iterator that np.array() would not unpack into an array
            explored = list(map(self._sample, actions))
        return np.array(explored)

    def _sample(self, mean):
        """
        Samples from a truncated normal distribution with the borders -1, 1
        around the given mean value.

        Parameters:
        mean -- the mean value of the truncated normal distribution to sample
                from
        """
        X = self.randomState.normal(mean, self.sigma)
        return X

    def selectAction(self, state):
        """
        Selects an action with exploration.

        Parameters:
        state -- the state for which to select the action

        returns -- the result from the actor's controller plus the exploration
        """
        actions = self.getAction(state)
        actions = self._explore(actions)
        return actions

    def getAction(self, state):
        """
        Selects an action without exploration.

        Parameters:
        state -- the state for which to select the action

        returns -- the result from the actor's controller
        """
        action = self.actor.forward(np.array([state]))[0]
        return action

    def getCriticOpinion(self, state):
        """
        Returns the critic's current estimate of the value function for the
        state.

        Parameters:
        state -- the state the critic shall evaluate

        returns -- the current estimate of the value function for the state
        """
        return self.critic.forward(np.array([state]))

    def scaleReward(self, reward):
        """
        Scales the reward to be within [-1, 1].
        """
        reward = reward / float(self.maxReward)
        # reward = (reward + 1) / 2.0
        return reward

    def reset(self):
        """
        Resets the learner.
        """
        self.sigmas.append(self.sigma)

    def getDataFolderName(self):
        """
        Returns the name of the folder containing the stored data.
        """
        return "%s/results/%s_%s_%s_%s" % (self.folder, self.alpha, self.beta,
                                           self.sigma, self.gamma)

    def finalize(self, folder, index):
        """
        Finalizes this learner, e.g. saves the controllers after training.
        """
        pathName = folder + "/networks"
        self.networkFactory.saveNetwork(self.actor, "actor", pathName, index)
        self.networkFactory.saveNetwork(self.critic, "critic", pathName, index)
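
# Note added for illustration (not part of the original module): the update in
# Cacla.updateReward follows the CACLA+Var scheme. A minimal sketch of the
# quantities involved, assuming a scalar reward r, discount factor gamma and
# critic estimates V_s and V_next for the current and next state:
#
#     target  = r + gamma * V_next                      # TD target (dropped at episode end)
#     delta_t = target - V_s                            # temporal-difference error
#     var     = (1 - zeta) * var + zeta * delta_t ** 2  # running variance of delta_t
#     numIt   = np.ceil(delta_t / np.sqrt(var))         # number of actor updates,
#                                                       # applied only when delta_t > 0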


class ADCacla(Cacla):
    """
    An Action Dependent implementation of Cacla. The main difference is that
    the critic does not learn the value function but the Q-function.
    """

    def initController(self):
        """
        Initializes the controllers for the critic and actor.
        """
        criticLayout = ((self.actorLayout[0] + self.actorLayout[-1],)
                        + self.actorLayout[1:-1] + (1,))
        self.critic = self.networkFactory.createNetwork(
            "critic", criticLayout, self.transferFunctions, self.alpha,
            self.replicationNumber)
        self.actor = self.networkFactory.createNetwork(
            "actor", self.actorLayout, self.transferFunctions, self.beta,
            self.replicationNumber)
        self.var = 1.0

    def updateReward(self, state, reward, nextState, action, episodeOver):
        """
        Updates the critic and actor according to the algorithm's definition.

        Parameters:
        state -- the current state of the environment
        reward -- the just received reward
        nextState -- the state of the environment after performing the action
        action -- the just performed action
        episodeOver -- True if this was the last step in this episode
        """
        reward = self.scaleReward(reward)
        futureOpinion = self.getCriticOpinion(
            np.concatenate((nextState, self.getAction(nextState))))
        # ignore the future, if there is none, because this episode is over
        stillRunning = not episodeOver
        target = reward + (stillRunning * self.gamma * futureOpinion)
        criticOpinion = self.getCriticOpinion(np.concatenate((state, action)))
        delta_t = target - criticOpinion
        self.errors.append(delta_t)
        # number of actor training iterations, scaled by the running standard
        # deviation of the td-error (CACLA+Var)
        numIt = np.ceil(np.nan_to_num(delta_t / np.sqrt(self.var)))
        self.var = self.updateVar(delta_t)
        self.critic.train(np.array([np.concatenate((state, action))]),
                          np.array([target]), 1)
        updateActor = delta_t[0] > 0  # actual performance better than expected
        if updateActor:
            self.actor.train(np.array([state]), np.array([action]), numIt)
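

# A hedged usage sketch (added for illustration; the environment object `env`
# with getState(), performAction(), getReward() and isOver(), the layout and
# the transfer functions are hypothetical placeholders, not part of pyrltr or
# pymlp). Only calls to methods defined in this module are used:
#
#     agent = Cacla(actorLayout=(4, 10, 2),
#                   transferFunctions=transferFunctions,
#                   replicationNumber=0, maxReward=10)
#     agent.initController()
#     for episode in range(numEpisodes):
#         state = env.getState()
#         done = False
#         while not done:
#             action = agent.selectAction(state)
#             env.performAction(action)
#             nextState, reward, done = env.getState(), env.getReward(), env.isOver()
#             agent.updateReward(state, reward, nextState, action, done)
#             state = nextState
#         agent.reset()
#
# ADCacla is used in the same way; it only replaces the critic so that it is
# trained on the concatenated (state, action) input, i.e. it estimates the
# Q-function instead of the state-value function.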