Source code for shasta.core

import os
import signal

import psutil
import pybullet as p
from pybullet_utils import bullet_client as bc

from .map import Map
from .utils import get_initial_positions
from .world import World


[docs]def kill_all_servers(): """Kill all PIDs that start with Carla""" processes = [p for p in psutil.process_iter() if "carla" in p.name().lower()] for process in processes: os.kill(process.pid, signal.SIGKILL)
[docs]class ShastaCore: """ Class responsible of handling all the different CARLA functionalities, such as server-client connecting, actor spawning, and getting the sensors data. """ def __init__(self, config, actor_groups: dict = None): """Initialize the server and client""" self.config = config self.actor_groups = actor_groups # Verify if the actor groups is a dictionary if not isinstance(self.actor_groups, dict): raise TypeError("Actor groups should be of type dict") # Setup world and map self.world = World(config) self.map = Map() self.init_server() self._setup_physics_client()
[docs] def _setup_physics_client(self): """Setup the physics client Returns ------- None """ # Usage mode if self.config["headless"]: self.physics_client = bc.BulletClient(connection_mode=p.DIRECT) else: options = "--background_color_red=0.85 --background_color_green=0.85 --background_color_blue=0.85" # noqa self.physics_client = bc.BulletClient( connection_mode=p.GUI, options=options ) # Set the camera parameters self.camer_distance = 150.0 self.camera_yaw = 0.0 self.camera_pitch = -89.999 self.camera_target_position = [0, 30, 0] self.physics_client.resetDebugVisualizerCamera( cameraDistance=self.camer_distance, cameraYaw=self.camera_yaw, cameraPitch=self.camera_pitch, cameraTargetPosition=self.camera_target_position, ) self.physics_client.configureDebugVisualizer( self.physics_client.COV_ENABLE_GUI, 0 ) # Set gravity self.physics_client.setGravity(0, 0, -9.81) # Set parameters for simulation self.physics_client.setPhysicsEngineParameter( fixedTimeStep=self.config["time_step"] / 10, numSubSteps=1, numSolverIterations=5, ) # Inject physics client if self.world.physics_client is None: self.world.physics_client = self.physics_client return None
[docs] def get_physics_client(self): """Ge the physics client Returns ------- object The bullet physics client """ return self.physics_client
[docs] def init_server(self): """Start a server on a random port""" pass
[docs] def setup_experiment(self, experiment_config): """Initialize the hero and sensors""" # Load the environment and setup the map self.map.setup(experiment_config) read_path = self.map.asset_path + "/environment_collision_free.urdf" self.world.load_world_model(read_path) # Spawn the actors in the physics client self.spawn_actors()
[docs] def get_world(self): """Get the World object from the simulation Returns ------- object The world object """ return self.world
[docs] def get_map(self): """Get the Map object from the simulation Returns ------- object The map object """ return self.map
[docs] def reset(self): """This function resets / spawns the hero vehicle and its sensors""" # Reset all the actors for group_id in self.actor_groups: # Check if the entry is a list or not if not isinstance(self.actor_groups[group_id], list): self.actor_groups[group_id] = [self.actor_groups[group_id]] for actor in self.actor_groups[group_id]: # Reset the actor and collect the observation actor.reset() return None
[docs] def spawn_actors(self): """Spawns vehicles and walkers, also setting up the Traffic Manager and its parameters""" for group_id in self.actor_groups: # Check if the entry is a list or not if not isinstance(self.actor_groups[group_id], list): self.actor_groups[group_id] = [self.actor_groups[group_id]] # Spawn the actors spawn_point = self.map.get_cartesian_spawn_points() positions = get_initial_positions( spawn_point, 10, len(self.actor_groups[group_id]) ) for actor, position in zip(self.actor_groups[group_id], positions): if actor.init_pos is None: actor.init_pos = position else: actor.init_pos = self.map.convert_to_cartesian(actor.init_pos) self.world.spawn_actor(actor, position)
[docs] def get_actor_groups(self): """Get the actor groups Returns ------- dict The actor groups as a dict with group id as the key list of actors as the value """ return self.actor_groups
[docs] def get_actors_by_group_id(self, group_id): """Get a list of actor given by group id Parameters ---------- group_id : int Group id to be returned Returns ------- list A list of actor given the group id """ return self.actor_groups[group_id]
[docs] def tick(self): """Performs one tick of the simulation, moving all actors, and getting the sensor data""" observations = {} # Tick once the simulation self.physics_client.stepSimulation() # Collect the raw observation from all the actors in each actor group for group in self.actor_groups: obs_from_each_actor = [ actor.get_observation() for actor in self.actor_groups[group] ] observations[group] = obs_from_each_actor return observations
[docs] def close_simulation(self): """Close the simulation""" p.disconnect(self.physics_client._client)