import os
import signal
import psutil
import pybullet as p
from pybullet_utils import bullet_client as bc
from .map import Map
from .utils import get_initial_positions
from .world import World
[docs]def kill_all_servers():
    """Kill all PIDs that start with Carla"""
    processes = [p for p in psutil.process_iter() if "carla" in p.name().lower()]
    for process in processes:
        os.kill(process.pid, signal.SIGKILL) 
[docs]class ShastaCore:
    """
    Class responsible of handling all the different CARLA functionalities,
    such as server-client connecting, actor spawning,
    and getting the sensors data.
    """
    def __init__(self, config, actor_groups: dict = None):
        """Initialize the server and client"""
        self.config = config
        self.actor_groups = actor_groups
        # Verify if the actor groups is a dictionary
        if not isinstance(self.actor_groups, dict):
            raise TypeError("Actor groups should be of type dict")
        # Setup world and map
        self.world = World(config)
        self.map = Map()
        self.init_server()
        self._setup_physics_client()
[docs]    def _setup_physics_client(self):
        """Setup the physics client
        Returns
        -------
        None
        """
        # Usage mode
        if self.config["headless"]:
            self.physics_client = bc.BulletClient(connection_mode=p.DIRECT)
        else:
            options = "--background_color_red=0.85 --background_color_green=0.85 --background_color_blue=0.85"  # noqa
            self.physics_client = bc.BulletClient(
                connection_mode=p.GUI, options=options
            )
            # Set the camera parameters
            self.camer_distance = 150.0
            self.camera_yaw = 0.0
            self.camera_pitch = -89.999
            self.camera_target_position = [0, 30, 0]
            self.physics_client.resetDebugVisualizerCamera(
                cameraDistance=self.camer_distance,
                cameraYaw=self.camera_yaw,
                cameraPitch=self.camera_pitch,
                cameraTargetPosition=self.camera_target_position,
            )
            self.physics_client.configureDebugVisualizer(
                self.physics_client.COV_ENABLE_GUI, 0
            )
        # Set gravity
        self.physics_client.setGravity(0, 0, -9.81)
        # Set parameters for simulation
        self.physics_client.setPhysicsEngineParameter(
            fixedTimeStep=self.config["time_step"] / 10,
            numSubSteps=1,
            numSolverIterations=5,
        )
        # Inject physics client
        if self.world.physics_client is None:
            self.world.physics_client = self.physics_client
        return None 
[docs]    def get_physics_client(self):
        """Ge the physics client
        Returns
        -------
        object
            The bullet physics client
        """
        return self.physics_client 
[docs]    def init_server(self):
        """Start a server on a random port"""
        pass 
[docs]    def setup_experiment(self, experiment_config):
        """Initialize the hero and sensors"""
        # Load the environment and setup the map
        self.map.setup(experiment_config)
        read_path = self.map.asset_path + "/environment_collision_free.urdf"
        self.world.load_world_model(read_path)
        # Spawn the actors in the physics client
        self.spawn_actors() 
[docs]    def get_world(self):
        """Get the World object from the simulation
        Returns
        -------
        object
            The world object
        """
        return self.world 
[docs]    def get_map(self):
        """Get the Map object from the simulation
        Returns
        -------
        object
            The map object
        """
        return self.map 
[docs]    def reset(self):
        """This function resets / spawns the hero vehicle and its sensors"""
        # Reset all the actors
        for group_id in self.actor_groups:
            # Check if the entry is a list or not
            if not isinstance(self.actor_groups[group_id], list):
                self.actor_groups[group_id] = [self.actor_groups[group_id]]
            for actor in self.actor_groups[group_id]:
                # Reset the actor and collect the observation
                actor.reset()
        return None 
[docs]    def spawn_actors(self):
        """Spawns vehicles and walkers, also setting up the Traffic Manager and its parameters"""
        for group_id in self.actor_groups:
            # Check if the entry is a list or not
            if not isinstance(self.actor_groups[group_id], list):
                self.actor_groups[group_id] = [self.actor_groups[group_id]]
            # Spawn the actors
            spawn_point = self.map.get_cartesian_spawn_points()
            positions = get_initial_positions(
                spawn_point, 10, len(self.actor_groups[group_id])
            )
            for actor, position in zip(self.actor_groups[group_id], positions):
                if actor.init_pos is None:
                    actor.init_pos = position
                else:
                    actor.init_pos = self.map.convert_to_cartesian(actor.init_pos)
                self.world.spawn_actor(actor, position) 
[docs]    def get_actor_groups(self):
        """Get the actor groups
        Returns
        -------
        dict
            The actor groups as a dict with group id as the key
            list of actors as the value
        """
        return self.actor_groups 
[docs]    def get_actors_by_group_id(self, group_id):
        """Get a list of actor given by group id
        Parameters
        ----------
        group_id : int
            Group id to be returned
        Returns
        -------
        list
            A list of actor given the group id
        """
        return self.actor_groups[group_id] 
[docs]    def tick(self):
        """Performs one tick of the simulation, moving all actors, and getting the sensor data"""
        observations = {}
        # Tick once the simulation
        self.physics_client.stepSimulation()
        # Collect the raw observation from all the actors in each actor group
        for group in self.actor_groups:
            obs_from_each_actor = [
                actor.get_observation() for actor in self.actor_groups[group]
            ]
            observations[group] = obs_from_each_actor
        return observations 
[docs]    def close_simulation(self):
        """Close the simulation"""
        p.disconnect(self.physics_client._client)