Place the agent module in your mmlf/agents directory.
Place the configuration file in ~/.mmlf/config.
Run with: ./run_mmlf --config world_simple_q.yaml
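Assuming the agent module is saved as academic_agent.py (a hypothetical file name chosen to match the moduleName entry in the configuration shown at the end), the files end up laid out like this:

mmlf/agents/academic_agent.py          # the agent module listed below
~/.mmlf/config/world_simple_q.yaml     # the world configuration listed after the agent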
| """MMLF agent that implements Q-Learning. """ | |
| __author__ = "Adriano Di Luzio & Danilo Francati" | |
| __copyright__ = "Copyright 2015, Sapienza University of Rome, CS" | |
| __credits__ = ['Mark Edgington'] | |
| __license__ = "GPLv3" | |
| __version__ = "1.0" | |
| __maintainer__ = "Adriano Di Luzio" | |
| __email__ = "[email protected]" | |
| import random | |
| import pprint | |
| import mmlf.framework.protocol | |
| from mmlf.agents.agent_base import AgentBase | |
| # Each agent has to inherit directly or indirectly from AgentBase | |
class QAgent(AgentBase):
    """Agent that learns a Q value (expected discounted reward) for every
    state-action pair from the rewards it receives."""

    # Add the default configuration for this agent to this static dict.
    # These values are used whenever the world configuration file does not
    # override them.
    DEFAULT_CONFIG_DICT = {
        'epsilon': 0.5,  # Learning rate: weight of new information in the Q update
        'gamma': 0.9,    # Discount factor for future rewards
    }
    def __init__(self, *args, **kwargs):
        # Create the agent info
        self.agentInfo = \
            mmlf.framework.protocol.AgentInfo(
                # Which communication protocol
                # version can the agent handle?
                versionNumber="0.3",
                # Name of the agent (can be chosen arbitrarily)
                agentName="Q",
                # Can the agent be used in environments
                # with continuous state spaces?
                continuousState=False,
                # Can the agent be used in environments
                # with continuous action spaces?
                continuousAction=False,
                # Can the agent be used in environments
                # with discrete action spaces?
                discreteAction=True,
                # Can the agent be used in
                # non-episodic environments?
                nonEpisodicCapable=False
            )

        # Calls the constructor of the base class.
        # After this call, the agent has an attribute "self.configDict" whose
        # values are evaluated, i.e. instead of the string '0.9' the key
        # 'gamma' holds the float 0.9.
        super(QAgent, self).__init__(*args, **kwargs)

        # The superclass AgentBase implements the methods setStateSpace() and
        # setActionSpace(), which set the attributes stateSpace and actionSpace.
        # They can be overwritten if the agent has to modify these spaces
        # for some reason.
        self.stateSpace = None
        self.actionSpace = None

        # The discount factor and the learning rate of the agent
        self.gamma = self.configDict.get('gamma', 0.9)
        self.epsilon = self.configDict.get('epsilon', 0.5)

        # The Q matrix
        self.Q = None
    # ##################### BEGIN COMMAND-HANDLING METHODS ###################

    def setStateSpace(self, stateSpace):
        """Informs the agent about the state space of the environment.

        More information about state spaces can be found in
        :ref:`state_and_action_spaces`
        """
        super(QAgent, self).setStateSpace(stateSpace)
        # Get a list of all the possible states
        self.states = self.stateSpace.getStateList()

    def setActionSpace(self, actionSpace):
        """Informs the agent about the action space of the environment.

        More information about action spaces can be found in
        :ref:`state_and_action_spaces`
        """
        super(QAgent, self).setActionSpace(actionSpace)
        # We can only deal with one-dimensional action spaces
        assert self.actionSpace.getNumberOfDimensions() == 1, \
            "Action space must be one-dimensional"
        # Get a list of all actions this agent might take
        self.actions = self.actionSpace.getActionList()
        # Init the Q matrix
        self._initQ()
    def getAction(self):
        """Request the next action the agent wants to execute."""
        self.previousAction = self.lastAction
        self.previousState = self.lastState

        # Each action of the agent corresponds to one step.
        # Actions are chosen uniformly at random; the learned Q matrix is
        # logged but not (yet) exploited for action selection.
        self.action = self._chooseRandomAction()

        # Create an action dictionary
        # that maps action dimension to chosen action
        actionDictionary = dict()
        for index, actionName in enumerate(self.actionSpace.iterkeys()):
            actionDictionary[actionName] = self.action[index]

        # Call the superclass method since it updates some internal information
        # (self.lastState, self.lastAction, self.reward, self.state, self.action)
        super(QAgent, self).getAction()
        return self._generateActionObject(actionDictionary)
    def giveReward(self, reward):
        """Provides a reward to the agent."""
        self._updateQ(reward)

    def nextEpisodeStarted(self):
        """Informs the agent that a new episode has started."""
        # We delegate to the superclass, which does the following:
        #   self.episodeCounter += 1
        #   self.stepCounter = 0
        super(QAgent, self).nextEpisodeStarted()
        self.agentLog.info("Q matrix: \n%s", pprint.pformat(self.Q))

    # ####################### END COMMAND-HANDLING METHODS ###################
    def _initQ(self):
        """Initialize the Q matrix.

        Build a dictionary whose keys are the possible states.
        Each value is again a dictionary, whose keys are the possible actions.
        Finally, the values are the Q values.
        """
        # If the stateSpace has been set, create an entry in Q for each state
        if self.states:
            self.Q = {s: None for s in self.states}
        # If the actionSpace has been set, create the actions
        # for each state already in Q
        if self.actions:
            for s in self.Q:
                self.Q[s] = {a: 0 for a in self.actions}
        self.agentLog.info("New Q matrix: \n%s", pprint.pformat(self.Q))
    def _chooseRandomAction(self):
        """Chooses an action randomly from the action space."""
        assert self.actionSpace, \
            "Error: Action requested before actionSpace was specified"
        return random.choice(self.actions)
    def _updateQ(self, reward):
        """Update the Q value after an action from a state."""
        # self.agentLog.info("%s -> %s", self.previousState, self.previousAction)
        # self.agentLog.info("%s -> %s", self.lastState, self.lastAction)
        if self.previousState is not None and \
                self.previousAction is not None:
            # Update the Q value
            self.Q[self.previousState][self.previousAction] = \
                (1 - self.epsilon) * self.Q[self.previousState][self.previousAction] + \
                self.epsilon * \
                (reward + self.gamma * max(self.Q[self.lastState].values()))

# Each module that implements an agent must have a module-level attribute
# "AgentClass" that is set to the class that inherits from AgentBase.
AgentClass = QAgent

# Furthermore, the name of the agent has to be assigned to "AgentName". This
# name is used in the GUI.
AgentName = "Q"
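The assignment in _updateQ is the standard tabular Q-learning rule, with epsilon playing the role of the learning rate and gamma that of the discount factor. The following standalone sketch (illustrative only, not part of the MMLF module; all names are made up) performs the same computation on a toy Q matrix:

def q_update(Q, prev_state, prev_action, reward, new_state,
             epsilon=0.5, gamma=0.9):
    """Blend the old Q estimate with the bootstrapped target, as _updateQ does."""
    target = reward + gamma * max(Q[new_state].values())
    Q[prev_state][prev_action] = \
        (1 - epsilon) * Q[prev_state][prev_action] + epsilon * target
    return Q[prev_state][prev_action]

# Toy Q matrix with two states and two actions:
Q = {'s0': {'left': 0.0, 'right': 0.0},
     's1': {'left': 1.0, 'right': 0.0}}
q_update(Q, 's0', 'right', reward=1.0, new_state='s1')
# Q['s0']['right'] is now 0.5 * 0.0 + 0.5 * (1.0 + 0.9 * 1.0) = 0.95

The world configuration referenced above (world_simple_q.yaml) follows; it runs this agent in the maze2d world and sets its gamma and epsilon parameters.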
worldPackage : maze2d

environment:
    moduleName : "maze2d_environment"
    configDict:
        episodesUntilDoorChange : 10000
        MAZE : "maze_simple.cfg"

agent:
    moduleName : "academic_agent"
    configDict:
        gamma : 0.9
        epsilon : 0.5

monitor:
    policyLogFrequency : 1000