Source code for pyrltr.worlds.BaseWorld

import numpy as np


class BaseWorld:
    """
    BaseWorld implements all basic parts of the reinforcement learning
    environment. It is not directly usable by itself.

    Attributes:
    randomState -- required for randomization, because the module-level
                   np.random.*distribution* functions become deterministic
                   after initialization
    initialAgentTrainingPositions -- initial positions to be used in training
                                     for the agent
    initialGoalTrainingPositions -- initial positions to be used in training
                                    for the goal
    initialAgentTestingPositions -- initial positions to be used in testing
                                    for the agent
    initialGoalTestingPositions -- initial positions to be used in testing
                                   for the goal
    initialTrainingDistances -- the initial distances in the training episodes
    initialTestingDistances -- the initial distances in the testing episodes
    agentUndoStack -- allows the agent to undo the last action
    maxEpisodeLength -- number of steps until an episode is over
    """

    def __init__(self, maxEpisodeLength):
        """
        Constructor.

        Parameters:
        maxEpisodeLength -- the maximum number of steps in an episode.
        """
        self.randomState = np.random.RandomState()
        self.maxEpisodeLength = maxEpisodeLength
        trainingAgentPositions, trainingGoalPositions = \
            self.createInitialPositions(2)
        self.useInitialTrainingPositions(trainingAgentPositions,
                                         trainingGoalPositions)
        testingAgentPositions, testingGoalPositions = \
            self.createInitialPositions(2)
        self.useInitialTestingPositions(testingAgentPositions,
                                        testingGoalPositions)
        finalTestingAgentPositions, finalTestingGoalPositions = \
            self.createInitialPositions(2)
        self.useFinalTestingPositions(finalTestingAgentPositions,
                                      finalTestingGoalPositions)
        self.initialTrainingDistances = list()
        self.initialTestingDistances = list()
        self.agentUndoStack = list()
        self.reset()

    def performAction(self, action):
        """
        Performs the given action and updates the distance to the goal.
        """
        self.agentUndoStack.append(self.agentPosition)
        self.agentPosition = self._perform(self.agentPosition, action)
        # for a circular world
        self.agentPosition = self._validatePosition(self.agentPosition)
        self.distance = self.calculateDistanceToTarget(self.agentPosition,
                                                       self.goalPosition)
        self.steps += 1

    def undo(self):
        """
        Undoes the last action.
        """
        self.agentPosition = self.agentUndoStack.pop()
        self.distance = self.calculateDistanceToTarget(self.agentPosition,
                                                       self.goalPosition)

    def getDistanceToTarget(self):
        return self.distance

    def episodeOver(self):
        return self.stepsExceeded() or self.targetReached(self.distance)

    def stepsExceeded(self):
        return self.steps >= self.maxEpisodeLength

    def targetReached(self, distance):
        return distance < 0.5

    def getReward(self):
        reached = self.targetReached(self.distance)
        if self.episodeOver() and not reached:
            return -1
        elif reached:
            print("reached!! at position: %s" % str(self.agentPosition))
            return 1
        else:
            return -0.1

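    # Illustration (hypothetical trace, not part of the original source):
    # in an episode that reaches the goal on its third step, getReward()
    # yields -0.1, -0.1, 1 after the successive calls to performAction();
    # an episode that hits maxEpisodeLength without reaching the goal ends
    # with -1 instead.
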
    def createInitialPositions(self, n):
        """
        Creates n initial positions for the agent and the goal. Makes sure
        that the target is not already reached in these configurations.

        Parameters:
        n -- the number of initial positions to create
        """
        agentPositions = list()
        goalPositions = list()
        for i in range(n):
            # Rejection sampling stands in for the do-while loop that Python
            # does not provide: resample the agent position until the pair
            # does not start in a goal state.
            agentPosition = self._getInitialAgentPosition()
            goalPosition = self._getInitialGoalPosition()
            distance = self.calculateDistanceToTarget(agentPosition,
                                                      goalPosition)
            while self.targetReached(distance):
                agentPosition = self._getInitialAgentPosition()
                distance = self.calculateDistanceToTarget(agentPosition,
                                                          goalPosition)
            agentPositions.append(agentPosition)
            goalPositions.append(goalPosition)
        return np.array(agentPositions), np.array(goalPositions)

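    # Example (assuming a hypothetical 2-D subclass whose positions are
    # length-2 vectors): createInitialPositions(3) would return two arrays
    # of shape (3, 2), one with agent start positions and one with goal
    # positions, where no pair starts closer than 0.5 to each other.
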
    def useInitialTrainingPositions(self, agentPositions, goalPositions):
        """
        Sets the initial positions for the training episodes.

        Parameters:
        agentPositions -- the initial positions for the agent
        goalPositions -- the initial positions for the goal
        """
        assert agentPositions.shape[0] == goalPositions.shape[0], "%s == %s" % \
            (agentPositions.shape[0], goalPositions.shape[0])
        self.immutableInitialAgentTrainingPositions = agentPositions
        self.immutableInitialGoalTrainingPositions = goalPositions
        self.numberOfTrainingEpisodes = len(agentPositions)
        self.resetTrainingEpoch()

    def resetTrainingEpoch(self):
        """
        Resets the training positions to be used in the next epoch.
        """
        self.initialAgentTrainingPositions = \
            np.copy(self.immutableInitialAgentTrainingPositions).tolist()
        self.initialGoalTrainingPositions = \
            np.copy(self.immutableInitialGoalTrainingPositions).tolist()

    def useInitialTestingPositions(self, agentPositions, goalPositions):
        """
        Sets the initial positions for the testing episodes.

        Parameters:
        agentPositions -- the initial positions for the agent
        goalPositions -- the initial positions for the goal
        """
        assert agentPositions.shape[0] == goalPositions.shape[0], "%s == %s" % \
            (agentPositions.shape[0], goalPositions.shape[0])
        self.immutableInitialAgentTestingPositions = agentPositions
        self.immutableInitialGoalTestingPositions = goalPositions
        self.numberOfTestingEpisodes = len(agentPositions)
        self.resetTestingEpoch()

    def useFinalTestingPositions(self, agentPositions, goalPositions):
        """
        Sets the initial positions for the final testing episodes, which are
        run after learning.

        Parameters:
        agentPositions -- the initial positions for the agent
        goalPositions -- the initial positions for the goal
        """
        assert agentPositions.shape[0] == goalPositions.shape[0], "%s == %s" % \
            (agentPositions.shape[0], goalPositions.shape[0])
        self.immutableFinalAgentTestingPositions = agentPositions
        self.immutableFinalGoalTestingPositions = goalPositions
        self.numberOfFinalTestingEpisodes = len(agentPositions)
        self.resetFinalTestingEpoch()

    def resetTestingEpoch(self):
        """
        Resets the testing positions to be used in the next epoch.
        """
        self.initialAgentTestingPositions = \
            np.copy(self.immutableInitialAgentTestingPositions).tolist()
        self.initialGoalTestingPositions = \
            np.copy(self.immutableInitialGoalTestingPositions).tolist()

    def resetFinalTestingEpoch(self):
        """
        Resets the final testing positions to be used in the next epoch.
        Final testing reuses the regular testing position pools.
        """
        self.initialAgentTestingPositions = \
            np.copy(self.immutableFinalAgentTestingPositions).tolist()
        self.initialGoalTestingPositions = \
            np.copy(self.immutableFinalGoalTestingPositions).tolist()

    def getDistanceInActionSpace(self):
        """
        Returns the distance in action space.
        """
        return self.distance

    def getStateQuality(self, state):
        """
        Returns the negated distance to the goal as a quality measure.
        """
        return -self.calculateDistanceToTarget(state, self.goalPosition)

    def setupTrainingEpisode(self):
        """
        Prepares the environment for a training episode.
        """
        self.reset()
        self.agentPosition = np.copy(self.initialAgentTrainingPositions.pop(0))
        self.goalPosition = np.array(self.initialGoalTrainingPositions.pop(0))
        self.distance = self.calculateDistanceToTarget(self.agentPosition,
                                                       self.goalPosition)
        self.initialTrainingDistances.append(self.getDistanceInActionSpace())

    def setupTestingEpisode(self):
        """
        Prepares the environment for a testing episode.
        """
        self.reset()
        self.agentPosition = np.copy(self.initialAgentTestingPositions.pop(0))
        self.goalPosition = np.array(self.initialGoalTestingPositions.pop(0))
        self.distance = self.calculateDistanceToTarget(self.agentPosition,
                                                       self.goalPosition)
        self.initialTestingDistances.append(self.getDistanceInActionSpace())

    def reset(self):
        """
        Resets the environment. Contains only the parts that are necessary
        after every episode, regardless of whether it was a training or a
        testing episode.
        """
        self.visualizations = 0
        self.steps = 0
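
BaseWorld defers the geometry to its subclasses: _perform, _validatePosition, _getInitialAgentPosition, _getInitialGoalPosition, and calculateDistanceToTarget are called above but never defined here. The sketch below is not part of the original module; it shows one minimal, hypothetical way to fill in those hooks for a bounded 2-D world (the class name PointWorld2D, the 10x10 box, and the toy policy are all made up for illustration) and to drive a single training episode.

class PointWorld2D(BaseWorld):
    # Hypothetical subclass for illustration only; not part of pyrltr.

    def _getInitialAgentPosition(self):
        # Uniform start position inside a 10x10 box.
        return self.randomState.uniform(0, 10, size=2)

    def _getInitialGoalPosition(self):
        return self.randomState.uniform(0, 10, size=2)

    def _perform(self, position, action):
        # Interpret the action as a 2-D displacement.
        return position + np.asarray(action)

    def _validatePosition(self, position):
        # Clamp to the box instead of wrapping around.
        return np.clip(position, 0, 10)

    def calculateDistanceToTarget(self, position, goalPosition):
        return np.linalg.norm(np.asarray(position) - np.asarray(goalPosition))


if __name__ == "__main__":
    world = PointWorld2D(maxEpisodeLength=50)
    world.setupTrainingEpisode()
    while not world.episodeOver():
        # Naive hand-coded policy: step half of the way towards the goal.
        action = 0.5 * (world.goalPosition - world.agentPosition)
        world.performAction(action)
        print(world.getReward())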