Source code for pyrltr.worlds.WorldFactory

import numpy as np
from pyrltr.worlds import NumericalSimulator
from pyrltr.worlds import InverseKinematics
import os

class WorldFactory:
    """
    Creates, stores and reads world objects.

    Attributes:
    NUMERICAL_ONE_DOF -- name of the numerical one DoF world
    NUMERICAL_TWO_DOF -- name of the numerical two DoF world
    NUMERICAL_THREE_DOF -- name of the numerical three DoF world
    NUMERICAL_NIMBRO -- name of the numerical nimbro world
    INVERSE_TWO_DOF -- name of the inverse kinematics two DoF world
    INVERSE_TWO_DOF_REDUNDANT -- name of the redundant inverse kinematics
                                 two DoF world
    VREP_ONE_DOF -- name of the v-rep one DoF world
    VREP_TWO_DOF -- name of the v-rep two DoF world
    AGENT_TRAINING_POSITIONS -- file suffix of the agent's training positions
    GOAL_TRAINING_POSITIONS -- file suffix of the goal's training positions
    AGENT_TESTING_POSITIONS -- file suffix of the agent's testing positions
    GOAL_TESTING_POSITIONS -- file suffix of the goal's testing positions
    FINAL_AGENT_TESTING_POSITIONS -- file suffix of the agent's final testing
                                     positions
    FINAL_GOAL_TESTING_POSITIONS -- file suffix of the goal's final testing
                                    positions

    The position files are stored as "<path>/<world name>_<file suffix>".
    """

    # names of the environments
    NUMERICAL_ONE_DOF = "numerical.onedof"
    NUMERICAL_TWO_DOF = "numerical.twodof"
    NUMERICAL_THREE_DOF = "numerical.threedof"
    NUMERICAL_NIMBRO = "numerical.nimbro"
    INVERSE_TWO_DOF = "inversekinematics.twodof"
    INVERSE_TWO_DOF_REDUNDANT = "inversekinematics.twodofredundant"
    VREP_ONE_DOF = "vrep.onedof"
    VREP_TWO_DOF = "vrep.twodof"

    # names of the files
    AGENT_TRAINING_POSITIONS = "agentTrainingPositions.dat.gz"
    GOAL_TRAINING_POSITIONS = "goalTrainingPositions.dat.gz"
    AGENT_TESTING_POSITIONS = "agentTestingPositions.dat.gz"
    GOAL_TESTING_POSITIONS = "goalTestingPositions.dat.gz"
    FINAL_AGENT_TESTING_POSITIONS = "final_agentTestingPositions.dat.gz"
    FINAL_GOAL_TESTING_POSITIONS = "final_goalTestingPositions.dat.gz"

    def createWorld(self, name, path, numberOfTrainingPositions,
                    numberOfTestingPositions, numberOfFinalTestingPositions):
        """
        Creates the world and if necessary, creates the initial positions as
        well.

        Parameters:
        name -- the name of the world, i.e. numerical.onedof or
                numerical.twodof. See class attributes for possible values.
                The comparison is case-insensitive.
        path -- the path to look in for the files
        numberOfTrainingPositions -- the number of positions to use for
                                     training
        numberOfTestingPositions -- the number of positions to use for testing
        numberOfFinalTestingPositions -- the number of final positions to use
                                         after the learning is completed

        Returns the created world with its initial positions set.
        Raises an Exception if the name does not match any known world.
        """
        lowerName = name.lower()
        if lowerName == self.NUMERICAL_ONE_DOF:
            world = NumericalSimulator.SingleDoF()
        elif lowerName == self.NUMERICAL_TWO_DOF:
            world = NumericalSimulator.TwoDoF()
        elif lowerName == self.NUMERICAL_THREE_DOF:
            world = NumericalSimulator.ThreeDoF()
        elif lowerName == self.NUMERICAL_NIMBRO:
            world = NumericalSimulator.Nimbro()
        elif lowerName == self.INVERSE_TWO_DOF:
            world = InverseKinematics.TwoDoF()
        elif lowerName == self.INVERSE_TWO_DOF_REDUNDANT:
            # NOTE(review): spelling "Reduntant" kept from the original;
            # verify against the InverseKinematics module.
            world = InverseKinematics.TwoDoFReduntant()
        elif lowerName == self.VREP_ONE_DOF:
            # Imported lazily so V-REP is only required when it is used.
            # NOTE(review): package is "Worlds" here but "pyrltr.worlds" in
            # the module-level imports -- confirm which one is correct.
            from Worlds import Vrep
            world = Vrep.SingleDoF("localhost", 19997)
        elif lowerName == self.VREP_TWO_DOF:
            from Worlds import Vrep
            world = Vrep.TwoDoF("localhost", 19997)
        else:
            errorMessage = "Could not find world: %s" % lowerName
            print(errorMessage)
            raise Exception(errorMessage)

        files = self._findPositionFiles(path, lowerName)
        if len(files) != 6:
            print("creating initial positions")
            self._createInitialPositions(world, numberOfTrainingPositions,
                                         self.AGENT_TRAINING_POSITIONS,
                                         self.GOAL_TRAINING_POSITIONS,
                                         path, lowerName)
            self._createInitialPositions(world, numberOfTestingPositions,
                                         self.AGENT_TESTING_POSITIONS,
                                         self.GOAL_TESTING_POSITIONS,
                                         path, lowerName)
            self._createInitialPositions(world, numberOfFinalTestingPositions,
                                         self.FINAL_AGENT_TESTING_POSITIONS,
                                         self.FINAL_GOAL_TESTING_POSITIONS,
                                         path, lowerName)
            # The listing taken above is stale now; look again so the files
            # that were just written are found on the very first run.
            files = self._findPositionFiles(path, lowerName)

        print("reading initial positions.")
        # The files are stored under the lower-case name, so the same name
        # has to be used to read them back.
        self._readInitialPositionsIntoWorld(world, files, path,
                                            numberOfTrainingPositions,
                                            numberOfTestingPositions,
                                            numberOfFinalTestingPositions,
                                            lowerName)
        return world

    def _findPositionFiles(self, path, name):
        """
        Returns the names (without path) of the position files of the given
        world that exist in the given folder.

        Only the six exact file names belonging to the world are considered,
        so files of a world whose name merely starts with the same characters
        (e.g. "inversekinematics.twodofredundant" vs.
        "inversekinematics.twodof") are not picked up.

        Parameters:
        path -- folder to look in
        name -- name of the world the files were stored under
        """
        suffixes = (self.AGENT_TRAINING_POSITIONS,
                    self.GOAL_TRAINING_POSITIONS,
                    self.AGENT_TESTING_POSITIONS,
                    self.GOAL_TESTING_POSITIONS,
                    self.FINAL_AGENT_TESTING_POSITIONS,
                    self.FINAL_GOAL_TESTING_POSITIONS)
        existingFiles = set(os.listdir(path))
        expectedFiles = ["%s_%s" % (name, suffix) for suffix in suffixes]
        return [filename for filename in expectedFiles
                if filename in existingFiles]

    def _createInitialPositions(self, world, numberOfEpisodes, agentFilename,
                                goalFilename, path, name):
        """
        Creates the initial positions and stores them in files with the given
        names.

        Parameters:
        world -- the world to use to create the initial positions
        numberOfEpisodes -- the number of episodes to create the initial
                            positions for
        agentFilename -- name of the file to store the agent's positions
        goalFilename -- name of the file to store the goal's positions
        path -- folder in which to store the files
        name -- name of the world, so it can be read later
        """
        # one for the last reset
        initialAgentPositions, initialGoalPositions = \
            world.createInitialPositions(numberOfEpisodes + 1)
        agentFilename = os.path.join(path, "%s_%s" % (name, agentFilename))
        np.savetxt(agentFilename, initialAgentPositions)
        goalFilename = os.path.join(path, "%s_%s" % (name, goalFilename))
        np.savetxt(goalFilename, initialGoalPositions)

    def _readInitialPositionsIntoWorld(self, world, files, path,
                                       numberOfTrainingPositions,
                                       numberOfTestingPositions,
                                       numberOfFinalTestingPositions, name):
        """
        Reads the initial positions from the file system and sets them to be
        used by the world.

        Parameters:
        world -- the world which is supposed to use the initial positions
        files -- names (without path) of the six files to read the positions
                 from
        path -- path in which the files lie
        numberOfTrainingPositions -- the number of positions to use for
                                     training
        numberOfTestingPositions -- the number of positions to use for testing
        numberOfFinalTestingPositions -- the number of final positions to use
                                         after the learning is completed
        name -- the name of the environment the files were stored under

        Raises an AssertionError if the number of files is not six or if a
        file does not hold the requested number of positions plus one.
        """
        # Accept any iterable (e.g. a filter object) for the file names.
        files = list(files)
        assert 6 == len(files), "%s == %s" % (6, len(files))

        # Map the bare file suffix (the class constants) to the full path.
        prefix = name + "_"
        filenameToCompleteFile = {}
        for filename in files:
            if filename.startswith(prefix):
                key = filename[len(prefix):]
            else:
                key = filename
            filenameToCompleteFile[key] = os.path.join(path, filename)

        # Every file holds one position more than requested (see
        # _createInitialPositions), hence the "- 1" in the checks below.
        agentTrainingPositions = self._read(
            filenameToCompleteFile[self.AGENT_TRAINING_POSITIONS])
        goalTrainingPositions = self._read(
            filenameToCompleteFile[self.GOAL_TRAINING_POSITIONS])
        assert numberOfTrainingPositions == len(agentTrainingPositions) - 1, \
            "%s, == %s" % (numberOfTrainingPositions,
                           len(agentTrainingPositions) - 1)
        assert numberOfTrainingPositions == len(goalTrainingPositions) - 1, \
            "%s, == %s" % (numberOfTrainingPositions,
                           len(goalTrainingPositions) - 1)

        agentTestingPositions = self._read(
            filenameToCompleteFile[self.AGENT_TESTING_POSITIONS])
        goalTestingPositions = self._read(
            filenameToCompleteFile[self.GOAL_TESTING_POSITIONS])
        assert numberOfTestingPositions == len(agentTestingPositions) - 1, \
            "%s, == %s" % (numberOfTestingPositions,
                           len(agentTestingPositions) - 1)
        assert numberOfTestingPositions == len(goalTestingPositions) - 1, \
            "%s, == %s" % (numberOfTestingPositions,
                           len(goalTestingPositions) - 1)

        finalAgentTestingPositions = self._read(
            filenameToCompleteFile[self.FINAL_AGENT_TESTING_POSITIONS])
        finalGoalTestingPositions = self._read(
            filenameToCompleteFile[self.FINAL_GOAL_TESTING_POSITIONS])
        # Report the sizes of the *final* arrays in the messages.
        assert numberOfFinalTestingPositions == \
            len(finalAgentTestingPositions) - 1, \
            "%s, == %s" % (numberOfFinalTestingPositions,
                           len(finalAgentTestingPositions) - 1)
        assert numberOfFinalTestingPositions == \
            len(finalGoalTestingPositions) - 1, \
            "%s, == %s" % (numberOfFinalTestingPositions,
                           len(finalGoalTestingPositions) - 1)

        world.useInitialTrainingPositions(agentTrainingPositions,
                                          goalTrainingPositions)
        world.useInitialTestingPositions(agentTestingPositions,
                                         goalTestingPositions)
        world.useFinalTestingPositions(finalAgentTestingPositions,
                                       finalGoalTestingPositions)

    def _read(self, filename):
        """
        Reads the content of the filename as a numpy.ndarray.

        NOTE(review): the result is whatever np.loadtxt returns with its
        default arguments; for files holding a single row or a single column
        this is presumably a 1-D array rather than a 2-D one -- confirm that
        the worlds can deal with that.
        """
        return np.loadtxt(filename)