Creating a Custom Experiment
This tutorial shows how to create a custom experiment by subclassing the BaseExperiment class. It explains each method a custom experiment can override and walks through an example custom experiment class called SearchingExperiment.
BaseExperiment Class Overview
The BaseExperiment class provides a basic structure for defining experiments in a simulation environment. It is designed to be inherited by custom experiment classes. Here’s a brief overview of the functions defined in the BaseExperiment class:
from abc import ABCMeta, abstractmethod

# Assumed source of with_metaclass; six.with_metaclass is equivalent.
from future.utils import with_metaclass


class BaseExperiment(with_metaclass(ABCMeta, object)):
    def __init__(self, config, core, experiment_config=None, *args, **kargs):
        """The base experiment which other experiments should inherit

        Parameters
        ----------
        config : yaml
            The experiment configuration file
        core : object
            The SHASTA core object
        experiment_config : optional
            Experiment-specific configuration, stored as self.exp_config

        Returns
        -------
        None
        """
        self.config = config
        self.core = core
        self.exp_config = experiment_config

    def reset(self):
        """Called at the beginning and each time the simulation is reset"""
        pass

    @abstractmethod
    def get_action_space(self):
        """Returns the action space"""
        raise NotImplementedError

    @abstractmethod
    def get_observation_space(self):
        """Returns the observation space"""
        raise NotImplementedError

    def get_actions(self):
        """Returns the actions"""
        raise NotImplementedError

    @abstractmethod
    def apply_actions(self, actions, core):
        """Given the actions, applies them to a group or an individual actor

        :param actions: value outputted by the policy
        """
        raise NotImplementedError

    @abstractmethod
    def get_observation(self, observation, core):
        """Function to do all the post processing of observations (sensor data).

        :param observation: dictionary {sensor_name: sensor_data}

        Should return a tuple or list with two items, the processed observations,
        as well as a variable with additional information about such observation.
        The information variable can be empty
        """
        # Raise (not return) so a missing override fails loudly
        raise NotImplementedError

    def get_done_status(self, observation, core):
        """Returns whether or not the experiment has to end"""
        raise NotImplementedError

    def compute_reward(self, observation, core):
        """Computes the reward"""
        raise NotImplementedError
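__init__(self, config, core, experiment_config=None, *args, **kargs): Stores the configuration, the SHASTA core object, and the optional experiment configuration on the instance as self.config, self.core, and self.exp_config.
reset(self): Called at the beginning and each time the simulation is reset. The base implementation does nothing.
get_action_space(self): Returns the action space. Abstract: every subclass must override it.
get_observation_space(self): Returns the observation space. Abstract: every subclass must override it.
get_actions(self): Returns the actions. Not abstract, so overriding it is optional.
apply_actions(self, actions, core): Applies the actions produced by the policy to a group or an individual actor. Abstract: every subclass must override it.
get_observation(self, observation, core): Post-processes the observation (sensor data) and returns a two-item tuple or list: the processed observations and a variable with additional information, which can be empty. Abstract: every subclass must override it.
get_done_status(self, observation, core): Returns whether or not the experiment has to end.
compute_reward(self, observation, core): Computes the reward.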
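The sketch below illustrates what the abstract methods mean in practice. To stay runnable without SHASTA installed, it defines a reduced stand-in for BaseExperiment using Python's abc module; IncompleteExperiment, MinimalExperiment, and the step driver are hypothetical names invented for this illustration, and the call order inside step is an assumption, not SHASTA's actual loop.

```python
from abc import ABCMeta, abstractmethod


class BaseExperiment(metaclass=ABCMeta):
    """Reduced stand-in for shasta.base_experiment.BaseExperiment."""

    def __init__(self, config, core, experiment_config=None, *args, **kargs):
        self.config = config
        self.core = core
        self.exp_config = experiment_config

    @abstractmethod
    def get_action_space(self):
        raise NotImplementedError

    @abstractmethod
    def get_observation_space(self):
        raise NotImplementedError

    @abstractmethod
    def apply_actions(self, actions, core):
        raise NotImplementedError

    @abstractmethod
    def get_observation(self, observation, core):
        raise NotImplementedError


class IncompleteExperiment(BaseExperiment):
    """Overrides only one abstract method, so it cannot be instantiated."""

    def get_action_space(self):
        return None


class MinimalExperiment(BaseExperiment):
    """Hypothetical experiment that overrides all four abstract methods."""

    def get_action_space(self):
        return None  # placeholder, as in the tutorial's example

    def get_observation_space(self):
        return None  # placeholder

    def apply_actions(self, actions, core):
        self.last_actions = actions  # a real experiment would move actors

    def get_observation(self, observation, core):
        return observation, {}  # (processed observation, info)

    def get_done_status(self, observation, core):
        return False

    def compute_reward(self, observation, core):
        return 0


def step(experiment, core, actions, raw_observation):
    """Hypothetical driver showing one plausible call order for a step."""
    experiment.apply_actions(actions, core)
    observation, info = experiment.get_observation(raw_observation, core)
    reward = experiment.compute_reward(observation, core)
    done = experiment.get_done_status(observation, core)
    return observation, reward, done, info


try:
    IncompleteExperiment(config={}, core=None)
except TypeError as error:
    print("Cannot instantiate:", error)

experiment = MinimalExperiment(config={}, core=None)
print(step(experiment, None, {0: "go"}, {"lidar": [1, 2]}))
```

Because the abstract methods are enforced when the class is instantiated, a subclass that forgets one of them fails immediately with a TypeError rather than later during a simulation run.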
Custom Experiment: SearchingExperiment
The SearchingExperiment class is an example of a custom experiment inheriting from the BaseExperiment class. This experiment is designed for a simulation environment where agents (UAVs and UGVs) need to search for a target building. Below is the implementation of the SearchingExperiment class along with explanations for each overridden function:
from collections import defaultdict

from shasta.base_experiment import BaseExperiment

from .actions import SimpleActionDecoder
from .custom_primitive import FormationWithPlanning
from .states import StatesExtractor


def group_actors_by_type(actor_groups):
    """Merge actor groups by type, read from the first actor of each group."""
    actor_type = defaultdict(list)
    for _, actors in sorted(actor_groups.items()):
        actor_type[actors[0].type].extend(actors)
    return actor_type


class SearchingExperiment(BaseExperiment):
    def __init__(self, config, core, experiment_config=None, *args, **kargs):
        super().__init__(config, core, experiment_config, *args, **kargs)
        self.state_extractor = StatesExtractor(experiment_config, core)
        self.actions_decoder = SimpleActionDecoder(experiment_config)

        # One planning primitive per group ID (0 through 5), sharing one map
        self.actions = {}
        env_map = core.get_map()
        for i in range(6):
            self.actions[i] = FormationWithPlanning(env_map)

        # Completion flags from the most recent apply_actions call
        self.actions_done = []

    def reset(self):
        pass

    def get_action_space(self):
        pass

    def get_observation_space(self):
        pass

    def get_actions(self):
        pass

    def apply_actions(self, actions, core):
        paretos = self.state_extractor.get_pareto_node()
        decoded_action = self.actions_decoder.get_action(actions, paretos)

        # Execute every group's primitive once per tick until all report done
        while True:
            actor_groups = core.get_actor_groups()
            self.actions_done = []
            for group_id, action_args in decoded_action.items():
                self.actions_done.append(
                    self.actions[group_id].execute(actor_groups[group_id], action_args)
                )
            core.tick()

            if all(self.actions_done):
                # Clear the planned paths so the next call plans afresh
                for group_id in actor_groups.keys():
                    self.actions[group_id].path_points = None
                break

    def get_observation(self, observation, core):
        actor_groups = core.get_actor_groups()
        grouped_by_type = group_actors_by_type(actor_groups)
        states = self.state_extractor.get_state(
            grouped_by_type['uav'], grouped_by_type['ugv']
        )
        done = self.state_extractor.update_progrees(
            grouped_by_type['uav'], grouped_by_type['ugv']
        )
        return states, {"searching_done": done}

    def get_done_status(self, observation, core):
        # True only once at least one action has run and all have completed
        return bool(self.actions_done) and all(self.actions_done)

    def compute_reward(self, observation, core):
        return 0
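The group_actors_by_type helper above is easiest to understand with a small input. The following sketch repeats the helper and runs it on hypothetical data: the Actor namedtuple is a stand-in invented for this illustration (real SHASTA actors are richer objects), and only its type attribute matters to the helper.

```python
from collections import defaultdict, namedtuple

# Hypothetical stand-in for a SHASTA actor; only `type` is used by the helper.
Actor = namedtuple("Actor", ["name", "type"])


def group_actors_by_type(actor_groups):
    """Merge actor groups by type, read from the first actor of each group."""
    actor_type = defaultdict(list)
    for _, actors in sorted(actor_groups.items()):
        actor_type[actors[0].type].extend(actors)
    return actor_type


# Keys are group IDs; each value is the list of actors in that group.
actor_groups = {
    1: [Actor("ugv_a", "ugv")],
    0: [Actor("uav_a", "uav"), Actor("uav_b", "uav")],
    2: [Actor("uav_c", "uav")],
}

grouped = group_actors_by_type(actor_groups)
uav_names = [actor.name for actor in grouped["uav"]]
ugv_names = [actor.name for actor in grouped["ugv"]]
print(uav_names)  # ['uav_a', 'uav_b', 'uav_c']
print(ugv_names)  # ['ugv_a']
```

Two properties follow from the implementation: groups are visited in sorted group-ID order, and each group is assumed to be non-empty and to contain a single actor type, because only the first actor's type is inspected. Since the result is a defaultdict, looking up a type with no actors yields an empty list instead of raising a KeyError.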
Now let’s explain each function in detail:
__init__(self, config, core, experiment_config=None, *args, **kargs): Initializes the SearchingExperiment class by creating the state extractor, the action decoder, and six FormationWithPlanning primitives (keyed 0 through 5 and indexed by actor group ID), all built from the map returned by core.get_map().
reset(self): Called at the beginning and each time the simulation is reset. In this example it does nothing.
get_action_space(self): Should return the action space. In this example it is a placeholder that returns None; it must still be defined because the base method is abstract.
get_observation_space(self): Should return the observation space. In this example it is a placeholder that returns None; it must still be defined because the base method is abstract.
get_actions(self): Should return the actions. In this example it is a placeholder that returns None.
apply_actions(self, actions, core): Decodes the policy output into per-group action arguments using the current Pareto nodes from the state extractor, then loops: each iteration executes every group's primitive once and ticks the simulation. The loop ends when all primitives report completion, at which point each primitive's path_points is cleared.
get_observation(self, observation, core): Groups the actors by type, extracts the states of the UAVs and UGVs with the state extractor, updates the search progress, and returns the states together with an information dictionary of the form {"searching_done": done}. The observation argument is not used.
get_done_status(self, observation, core): Reports whether the experiment has to end, based on the completion flags recorded by the most recent apply_actions call.
compute_reward(self, observation, core): Computes the reward. In this example, it returns a constant reward of 0.
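The loop inside apply_actions can be tried in isolation. The sketch below reproduces its control flow with stubs so that it runs without SHASTA: CountdownPrimitive, StubCore, and run_until_done are hypothetical names invented for this illustration, and the stub primitive simply reports completion after a fixed number of execute calls instead of planning a path.

```python
class CountdownPrimitive:
    """Hypothetical primitive: done after a fixed number of execute() calls."""

    def __init__(self, steps_needed):
        self.steps_left = steps_needed
        self.path_points = []

    def execute(self, actors, action_args):
        if self.steps_left > 0:
            self.steps_left -= 1
        return self.steps_left == 0


class StubCore:
    """Hypothetical core that serves fixed actor groups and counts ticks."""

    def __init__(self, actor_groups):
        self._actor_groups = actor_groups
        self.ticks = 0

    def get_actor_groups(self):
        return self._actor_groups

    def tick(self):
        self.ticks += 1


def run_until_done(primitives, decoded_action, core):
    """Execute every group's primitive once per tick until all report done."""
    while True:
        actor_groups = core.get_actor_groups()
        actions_done = [
            primitives[group_id].execute(actor_groups[group_id], action_args)
            for group_id, action_args in decoded_action.items()
        ]
        core.tick()
        if all(actions_done):
            # Clear the planned paths, mirroring the tutorial's loop
            for group_id in actor_groups:
                primitives[group_id].path_points = None
            return actions_done


primitives = {0: CountdownPrimitive(2), 1: CountdownPrimitive(4)}
core = StubCore({0: ["uav_0"], 1: ["ugv_0"]})
result = run_until_done(primitives, {0: "goal_a", 1: "goal_b"}, core)
print(core.ticks)  # 4
print(result)      # [True, True]
```

The simulation advances until the slowest group finishes, so one call to apply_actions can span many ticks. The loop has no upper bound: if a primitive never reports completion, the call never returns, so a tick budget may be worth adding in a real experiment.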
This concludes the tutorial on creating a custom experiment using the BaseExperiment class. You can further extend and customize this experiment class according to your specific simulation requirements.