from abc import ABC
from typing import Optional
import random
import matplotlib.pyplot as plt
import numpy as np
from gym.error import DependencyNotInstalled
from gym.spaces import Box, Dict
from ray.rllib.env.apis.task_settable_env import TaskSettableEnv, TaskType
from ray.rllib.utils.spaces.repeated import Repeated
from ray.rllib.utils import try_import_torch
from shapely.geometry import Point, Polygon
from decimal import *
from configs.base_config import MAX_PADDING_LEN
from src.utils.utils import line_intersection, MULTI_GEOM_TYPES, NO_EXTERIOR_TYPES, \
project_intervals_into_action_space, inverse_space, dict_key_maximum, SHAPE_COLLECTION, midpoint
# Restrict Decimal arithmetic in the current context to 5 significant digits.
# Values built directly with Decimal(...) keep their full precision; only the
# results of arithmetic operations are rounded.
getcontext().prec = 5
# NOTE(review): presumably yields the torch and torch.nn modules (or
# placeholders when torch is not installed) — confirm against RLlib.
torch, nn = try_import_torch()
class Agent:
    """ The agent representation

    All numeric state is stored as ``Decimal``. Numbers are converted through
    ``str`` so that plain Python numbers, NumPy scalars and ``Decimal`` values
    are all accepted (``repr`` of a NumPy scalar is not parseable by
    ``Decimal`` on NumPy >= 2).

    Args:
        x (float): x-coordinate starting position
        y (float): y-coordinate starting position
        radius (float): Radius of the agent
        perspective (float): Starting perspective
        step_size (float): moving distance with each step
    """

    def __init__(self, x, y, radius, perspective, step_size):
        self.x = Decimal(str(x))
        self.y = Decimal(str(y))
        self.last_action = Decimal(0.0)
        self.radius = Decimal(str(radius))
        self.perspective = Decimal(str(perspective))
        self.step_size = Decimal(str(step_size))
        self.collided = False
        # Stays ``False`` until the first distance is recorded; behaves like 0
        # in the arithmetic of ``set_distance_target``.
        self.distance_target = False
        self.distance_improvement = Decimal(0.0)

    def step(self, direction, dt):
        """ Take a step in a specific direction

        Args:
            direction (Decimal): Angle in degrees in which the next step should be taken
            dt (Decimal or float): Factor applied to the step size
        """
        direction = Decimal(str(direction))
        dt = Decimal(str(dt))
        angle = np.radians(float(direction))
        # Keep the multiplication order (component * step_size * dt): every
        # Decimal operation rounds to the context precision.
        self.x += Decimal(str(np.cos(angle))) * self.step_size * dt
        self.y += Decimal(str(np.sin(angle))) * self.step_size * dt
        self.perspective = direction

    def set_distance_target(self, new_distance):
        """ Sets the improvement and new distance to the target

        Args:
            new_distance (Decimal or float): The new distance to the target
        """
        new_distance = Decimal(str(new_distance))
        # Positive when the agent got closer to the target.
        self.distance_improvement = self.distance_target - new_distance
        self.distance_target = new_distance

    def geometric_representation(self):
        """ Returns the shapely geometry representation of the agent

        Returns:
            shapely geometry object: point at the agent position buffered by its radius
        """
        return Point(float(self.x), float(self.y)).buffer(float(self.radius))
class Obstacle:
    """ The obstacle representation

    The obstacle patrols along its waypoints: forward to the last one, then
    backward to the first one (its initial centroid), and so on.

    Args:
        coordinates (list): Polygon coordinates for the shape of the obstacle
        step_size (float): moving distance with each step
    """

    def __init__(self, coordinates: list, step_size):
        # Object array of Decimal values, one row per polygon corner.
        self.coordinates = np.array([
            [Decimal(str(corner[0])), Decimal(str(corner[1]))] for corner in coordinates
        ])
        self.step_size = Decimal(str(step_size))
        # The first waypoint is always the initial centroid of the polygon.
        self.waypoints = [self.geometric_representation().centroid.coords[0]]
        self.distance = Decimal(0.0)
        self.x = Decimal(str(self.waypoints[0][0]))
        self.y = Decimal(str(self.waypoints[0][1]))
        self.current_target = 1
        self.moving_direction = 'forward'

    def add_waypoint(self, waypoint):
        """ Adds a new waypoint to which the obstacle moves

        Args:
            waypoint (list): waypoint coordinates
        """
        self.waypoints.append(waypoint)

    def step(self, dt):
        """ Takes a step in the direction of the next waypoint in the list

        When the remaining distance is shorter than one step, the obstacle is
        placed exactly on the waypoint and the next target is selected. At
        either end of the route the moving direction is reversed.

        Args:
            dt (Decimal or float): Factor applied to the step size
        """
        if len(self.waypoints) <= 1:
            return

        dt = Decimal(str(dt))
        target = self.waypoints[self.current_target]
        target_x = Decimal(str(target[0]))
        target_y = Decimal(str(target[1]))
        distance = Decimal(str(np.hypot(float(target[0]) - float(self.x),
                                        float(target[1]) - float(self.y))))

        if distance > Decimal(0.0) and distance >= self.step_size * dt:
            unit_vector = [(target_x - self.x) / distance, (target_y - self.y) / distance]
            shift = np.array(unit_vector) * self.step_size * dt
            self.x += shift[0]
            self.y += shift[1]
            self.coordinates = np.add(self.coordinates, shift)
            return

        # Waypoint reached: move the polygon by the remaining offset as well,
        # so the shape stays aligned with the reference point (x, y).
        remainder = np.array([target_x - self.x, target_y - self.y])
        self.coordinates = np.add(self.coordinates, remainder)
        self.x = target_x
        self.y = target_y

        last_index = len(self.waypoints) - 1
        moving_forward = self.moving_direction == 'forward'
        if moving_forward and self.current_target >= last_index:
            # End of the route: turn around and head for the previous waypoint.
            self.moving_direction = 'backward'
            self.current_target = last_index - 1
        elif not moving_forward and self.current_target <= 0:
            # Back at the start: turn around again.
            self.moving_direction = 'forward'
            self.current_target = 1
        else:
            self.current_target += 1 if moving_forward else -1

    def geometric_representation(self):
        """ Returns the shapely geometry representation of the obstacle

        Returns:
            shapely geometry object
        """
        # Plain floats: shapely does not reliably accept Decimal object arrays.
        return Polygon(self.coordinates.astype(float))

    def collision_area(self, radius):
        """ Returns the area which would lead to a collision when the agent enters it

        Args:
            radius: The radius of the agent

        Returns:
            shapely geometry object: obstacle polygon enlarged by ``radius``
        """
        return self.geometric_representation().buffer(float(radius))
class ObstacleAvoidance(TaskSettableEnv, ABC):
    """ The obstacle avoidance environment

    Args:
        env_config (dict): Setup of the environment
        render_mode (str): Select how to visualize the environment. Options are human, rgb_array, and jupyter
    """
    metadata = {
        'render_modes': ['human', 'rgb_array', 'jupyter'],
        'render_fps': 4,
        'video.frames_per_second': 4
    }

    def __init__(self, env_config, render_mode: Optional[str] = None):
        for key in ('STEPS_PER_EPISODE', 'ACTION_RANGE', 'DT', 'SAFETY_DISTANCE', 'REWARD', 'LEVELS'):
            assert key in env_config, f'env_config is missing {key}'
        for key in ('REWARD_COEFFICIENT', 'TIMESTEP_PENALTY_COEFFICIENT', 'GOAL', 'COLLISION'):
            assert key in env_config['REWARD'], f'env_config REWARD is missing {key}'
        assert 1 in env_config['LEVELS'], 'env_config LEVELS must define level 1'
        for key in ('HEIGHT', 'WIDTH', 'AGENT', 'GOAL'):
            assert key in env_config['LEVELS'][1], f'level 1 is missing {key}'

        self.STEPS_PER_EPISODE = env_config['STEPS_PER_EPISODE']
        self.MAX_LEVEL = max(env_config['LEVELS'].keys())
        self.ACTION_RANGE = Decimal(str(env_config["ACTION_RANGE"]))
        self.SAFETY_DISTANCE = Decimal(str(env_config["SAFETY_DISTANCE"]))
        self.MAXIMUM_HEIGHT = dict_key_maximum(env_config['LEVELS'], 'HEIGHT')
        self.MAXIMUM_WIDTH = dict_key_maximum(env_config['LEVELS'], 'WIDTH')
        self.REWARD_COEFFICIENT = Decimal(str(env_config["REWARD"]["REWARD_COEFFICIENT"]))
        self.REWARD_GOAL = Decimal(str(env_config["REWARD"]["GOAL"]))
        self.REWARD_COLLISION = Decimal(str(env_config["REWARD"]["COLLISION"]))
        self.TIMESTEP_PENALTY_COEFFICIENT = Decimal(str(env_config['REWARD']['TIMESTEP_PENALTY_COEFFICIENT']))
        self.DT = Decimal(str(env_config["DT"]))
        self.levels = env_config['LEVELS']

        self.current_height = 0.0
        self.current_width = 0.0
        self.goal = None
        self.goal_radius = None
        self.agent = None
        self.map = None
        self.obstacles = []
        self.map_collision_area = None
        self.current_step = 0
        self.previous_position = [Decimal(0.0), Decimal(0.0)]
        self.safety_angle = Decimal(0.0)
        self.allowed_actions = []
        self.last_reward = 0.0
        self.current_level = 1
        self.trajectory = []
        first_level = env_config['LEVELS'][1]
        # -1 marks "no generated obstacles configured for level 1".
        self.current_seed = (first_level['GENERATE_OBSTACLES']['START_SEED']
                             if 'GENERATE_OBSTACLES' in first_level else -1)

        self.window_scale = 50
        self.window = None
        self.clock = None
        self.render_mode = render_mode
        self.reload = False

        self.load_map(self.levels[self.current_level])

        # Observation and Action Space
        self.observation_space = Dict({
            'observation': Dict({
                'location': Box(low=-2.0, high=np.max([self.MAXIMUM_WIDTH, self.MAXIMUM_HEIGHT]) + 2.0, shape=(2,),
                                dtype=np.float32),
                'perspective': Box(low=0.0, high=360.0, shape=(1,), dtype=np.float32),
                'target_angle': Box(low=0.0, high=360.0, shape=(1,), dtype=np.float32),
                'target_distance': Box(low=0.0, high=np.sqrt(self.MAXIMUM_WIDTH ** 2 + self.MAXIMUM_HEIGHT ** 2),
                                       shape=(1,), dtype=np.float32),
                'current_step': Box(low=0.0, high=self.STEPS_PER_EPISODE, shape=(1,), dtype=np.float32)
            }),
            'allowed_actions': Repeated(Box(low=-180.0, high=180.0, shape=(2,)), max_len=MAX_PADDING_LEN)
        })
        self.action_space = Box(low=float(-self.ACTION_RANGE / 2), high=float(self.ACTION_RANGE / 2), shape=(1,),
                                dtype=np.float32)

        if 'RANDOM_SEED' in env_config:
            self.seed(env_config['RANDOM_SEED'])

    def load_map(self, structure: dict):
        """ Loads the current level of the map.

        Creates the goal, the agent, the map outline and all obstacles of the
        level. Obstacles are appended to ``self.obstacles``; the list is not
        cleared here (``reset`` clears it before calling this method).

        Args:
            structure (dict): Setup of the environment
        """
        self.current_height = structure['HEIGHT']
        self.current_width = structure['WIDTH']
        self.goal_radius = structure['GOAL']['radius']
        self.goal = Point(structure['GOAL']['x'], structure['GOAL']['y']).buffer(self.goal_radius)

        agent_setup = structure['AGENT']
        self.agent = Agent(x=agent_setup['x'], y=agent_setup['y'], radius=agent_setup['radius'],
                           perspective=agent_setup['angle'], step_size=agent_setup['step_size'])

        half_safety_ratio = float((self.SAFETY_DISTANCE / Decimal(2.0)) / self.agent.radius)
        self.safety_angle = Decimal(2.0) * Decimal(str(np.rad2deg(np.arcsin(half_safety_ratio))))

        self.map = Polygon([(0.0, 0.0), (self.current_width, 0.0), (self.current_width, self.current_height),
                            (0.0, self.current_height)])
        self.map_collision_area = self.map.exterior.buffer(float(self.agent.radius))

        obstacles = None
        if 'GENERATE_OBSTACLES' in structure:
            obstacle_properties = structure['GENERATE_OBSTACLES']
            # Use a new seed for every load and wrap around at 10e7.
            if self.current_seed < 10e7:
                self.current_seed += 1
            else:
                self.current_seed = obstacle_properties['START_SEED']
            obstacles = {
                **generate_obstacles(
                    self.current_width, self.current_height, obstacle_properties['COUNT'],
                    obstacle_properties['POSITION_COVARIANCE'], obstacle_properties['MEAN_SIZE'],
                    obstacle_properties['VARIANCE_SIZE'], obstacle_properties['RANGE_SIZE'],
                    obstacle_properties['WAYPOINTS'], obstacle_properties['DISTANCE_WAYPOINTS'],
                    obstacle_properties['VARIANCE_DISTANCE'], obstacle_properties['STEP_SIZE'],
                    self.current_seed,
                    # Fixed obstacles are optional when obstacles are generated.
                    obstacles=structure.get('OBSTACLES', {}).copy(),
                    forbidden_circles=[
                        (float(self.agent.x), float(self.agent.y),
                         float(self.agent.radius + self.agent.step_size)),
                        (structure['GOAL']['x'], structure['GOAL']['y'], self.goal_radius)
                    ])
            }
        elif 'OBSTACLES' in structure:
            obstacles = structure['OBSTACLES'].copy()

        for obstacle_structure in (obstacles or {}).values():
            obstacle = Obstacle(coordinates=obstacle_structure['coordinates'],
                                step_size=obstacle_structure['step_size'])
            for waypoint in obstacle_structure['waypoints']:
                obstacle.add_waypoint(waypoint)
            self.obstacles.append(obstacle)

    def angle_to_target(self):
        """ Calculates the angle between the agent and the goal

        NOTE(review): the computation only uses absolute coordinate
        differences, so it does not distinguish in which quadrant the goal
        lies relative to the agent. Kept unchanged because it defines the
        observation trained policies see — confirm whether this is intended.

        Returns:
            angle_to_target (Decimal): value in the range [0, 360)
        """
        goal_x = Decimal(str(self.goal.centroid.coords[0][0]))
        goal_y = Decimal(str(self.goal.centroid.coords[0][1]))
        if goal_x == self.agent.x:
            angle_agent = Decimal(0.0)
        else:
            slope = float(abs(goal_y - self.agent.y) / abs(goal_x - self.agent.x))
            angle_agent = self.agent.perspective - Decimal(str(np.rad2deg(np.arctan(slope))))
        return angle_agent if angle_agent >= Decimal(0.0) else Decimal(360.0) + angle_agent

    def distance_to_target(self):
        """ Calculates the distance between the agent and the goal

        Returns:
            distance_to_target (Decimal): distance from the goal centre to the agent's shape
        """
        return Decimal(str(self.goal.centroid.distance(self.agent.geometric_representation())))

    def detect_collision(self):
        """ Checks if the agent violated any of the restrictions

        Returns:
            violation (bool)
        """
        # Check if allowed actions got violated
        if not any(interval[0] <= self.agent.last_action <= interval[1] for interval in self.allowed_actions):
            return True

        # Check if agent is on the map and not collided with the boundaries
        agent_shape = self.agent.geometric_representation()
        agent_centre = Point(float(self.agent.x), float(self.agent.y))
        boundary_distance = Decimal(str(self.map.exterior.distance(agent_centre)))
        if not self.map.contains(agent_shape) or self.agent.radius - boundary_distance > Decimal(0.0):
            return True

        # Check if agent collided with one of the obstacles. A distance is
        # never negative, so overlap has to be tested with ``intersects``.
        return any(obstacle.geometric_representation().intersects(agent_shape) for obstacle in self.obstacles)

    def get_reward(self):
        """ Calculates the reward based on collisions, improvement and the distance to the goal

        Returns:
            reward (float)
        """
        if self.agent.collided:
            reward = self.REWARD_COLLISION
        elif self.agent.distance_target <= self.goal_radius:
            reward = self.REWARD_GOAL
        else:
            # Progress towards the goal minus a penalty that grows with the step count.
            reward = self.REWARD_COEFFICIENT * self.agent.distance_improvement - (
                Decimal(str(self.current_step)) * self.TIMESTEP_PENALTY_COEFFICIENT)
        return float(reward)

    def seed(self, seed: int = None):
        """ Set the seed of the environment

        Args:
            seed (int)
        """
        # torch is optional and ``manual_seed`` does not accept None.
        if torch is not None and seed is not None:
            torch.manual_seed(seed)
        random.seed(seed)
        np.random.seed(seed)
        self.action_space.seed(seed)

    def _observation(self):
        """ Builds the observation dictionary from the current agent state. """
        return {'location': np.array([self.agent.x, self.agent.y], dtype=np.float32),
                'perspective': np.array([self.agent.perspective], dtype=np.float32),
                'target_angle': np.array([self.angle_to_target()], dtype=np.float32),
                'target_distance': np.array([self.agent.distance_target], dtype=np.float32),
                'current_step': np.array([self.current_step], dtype=np.float32)}

    def step(self, action):
        """ Perform an environment iteration including moving the agent and obstacles.

        Args:
            action (list): Angle of the agent's next step

        Returns:
            tuple: observation (dict), reward (float), done (bool), info (dict)
        """
        action = Decimal(str(action[0]))
        step_direction = self.agent.perspective + action
        # Wrap the direction into [0, 360)
        if step_direction < Decimal(0.0):
            step_direction += Decimal(360.0)
        elif step_direction >= Decimal(360.0):
            step_direction -= Decimal(360.0)

        self.agent.step(step_direction, self.DT)
        self.agent.last_action = action
        # The collision check uses the allowed actions computed before this step.
        self.agent.collided = self.detect_collision()
        self.agent.set_distance_target(self.distance_to_target())
        self.last_reward = self.get_reward()

        for moving_obstacle in self.obstacles:
            moving_obstacle.step(self.DT)

        self.allowed_actions = self.get_allowed_actions()
        self.trajectory.append([float(self.agent.x), float(self.agent.y)])
        self.current_step += 1

        observation = self._observation()
        done = self.agent.collided or (self.agent.distance_target <= self.goal_radius)
        truncated = self.current_step >= self.STEPS_PER_EPISODE
        info = {
            'goal_distance': float(self.agent.distance_target),
            'solved': self.agent.distance_target <= Decimal(f'{self.goal_radius}'),
            'level': self.current_level
        }
        return {'observation': observation,
                'allowed_actions': np.array(self.allowed_actions, dtype=np.float32)
                }, self.last_reward, done or truncated, info

    def reset(self):
        """ Resets and loads the structure of the map again

        Returns:
            observation (dict)
        """
        # NOTE(review): this records the agent position from *before* the map
        # is reloaded and the trajectory is never cleared, so it keeps growing
        # across episodes — confirm whether that is intended.
        self.trajectory.append([float(self.agent.x), float(self.agent.y)])
        self.obstacles = []
        self.load_map(self.levels[self.current_level])
        self.agent.distance_target = self.distance_to_target()
        self.allowed_actions = self.get_allowed_actions()
        self.current_step = 0
        return {'observation': self._observation(),
                'allowed_actions': np.array(self.allowed_actions, dtype=np.float32)}

    def set_task(self, task: TaskType) -> None:
        """ Sets the next environment level when the episode is reset

        Levels above the maximum or without a configuration are ignored.

        Args:
            task (int): next environment level to load
        """
        if task <= self.MAX_LEVEL and task in self.levels:
            self.current_level = task
            # Forces ``render`` to rebuild its window for the new map size.
            self.reload = True

    def get_task(self) -> TaskType:
        """ Returns the level of the environment

        Returns:
            current_level (int)
        """
        return self.current_level

    def get_restrictions_for_polygon(self, polygon_coordinates):
        """ Calculates the restriction angles for the agent and a single polygon

        Args:
            polygon_coordinates (list): List of polygon corner points that define the shape of the obstacle

        Returns:
            restrictions (list): Interval [min, max] relative to the agent's perspective which would lead
                to a collision. For example [-10, 30]
        """
        max_angle = Decimal(-np.inf)
        min_angle = Decimal(np.inf)
        # NOTE(review): this compares the agent's y with the *x* value of the
        # first corner ([0][0]); the boundary line is horizontal, so [0][1]
        # looks more plausible. Kept unchanged — confirm before altering.
        agent_on_action_space_boundary = self.agent.y == Decimal(str(polygon_coordinates[0][0])
                                                                 ) if len(polygon_coordinates) > 0 else False
        boundary_crossed_negative = False
        boundary_crossed_positive = False

        for index, coordinates in enumerate(polygon_coordinates):
            coordinates = list(coordinates)
            coordinates[0] = Decimal(str(coordinates[0]))
            coordinates[1] = Decimal(str(coordinates[1]))

            # Check if next coordinates go beyond max and min action space boundaries.
            # For example: Coordinate 1 -> -170 and coordinate 2 -> -190 with boundary -180
            if index != 0:
                coordinate_direction_line = (coordinates[0], coordinates[1],
                                             Decimal(str(polygon_coordinates[index - 1][0])),
                                             Decimal(str(polygon_coordinates[index - 1][1])))
                # Segment from the agent towards negative x, i.e. the +/-180 degree boundary.
                action_space_boundary_line = (self.agent.x, self.agent.y,
                                              self.agent.x - self.agent.radius - self.agent.step_size, self.agent.y)
                line_crossed = line_intersection(*coordinate_direction_line, *action_space_boundary_line)

                if not boundary_crossed_positive and line_crossed in ['negative_positive', 'negative_line']:
                    boundary_crossed_negative = True
                elif not boundary_crossed_negative and line_crossed in ['positive_negative', 'line_negative']:
                    boundary_crossed_positive = True
                elif boundary_crossed_negative and line_crossed in ['positive_negative', 'line_negative',
                                                                    'line_right_out'
                                                                    ] and not agent_on_action_space_boundary:
                    boundary_crossed_negative = False
                elif boundary_crossed_positive and line_crossed in ['negative_positive', 'negative_line']:
                    boundary_crossed_positive = False

                if agent_on_action_space_boundary and line_crossed in ['line_positive']:
                    agent_on_action_space_boundary = False
                if agent_on_action_space_boundary and line_crossed in ['line_negative']:
                    agent_on_action_space_boundary = False

            # Angle to polygon corner
            if Decimal(coordinates[0]) == self.agent.x:
                angle_to_coordinates = Decimal(90.0)
            else:
                angle_to_coordinates = Decimal(str(np.rad2deg(np.arctan(float(
                    abs(coordinates[1] - self.agent.y) / abs(coordinates[0] - self.agent.x))))))

            # Subtract 180 if polygon corner lies left to agent
            if self.agent.x > coordinates[0]:
                angle_to_coordinates = Decimal(180.0) - angle_to_coordinates

            # Negative if polygon corner is below agent
            if self.agent.y > coordinates[1] or index == 0 and self.agent.y == coordinates[1] and index + 1 != len(
                    polygon_coordinates) and Decimal(
                    str(polygon_coordinates[index + 1][1])) < self.agent.y:
                angle_to_coordinates = -angle_to_coordinates

            # Correct if polygon corner goes beyond possible action space
            if boundary_crossed_negative and angle_to_coordinates != -180:
                angle_to_coordinates = angle_to_coordinates - Decimal(360.0)
            elif boundary_crossed_positive and angle_to_coordinates != 180:
                angle_to_coordinates = angle_to_coordinates + Decimal(360.0)

            if angle_to_coordinates > max_angle:
                max_angle = angle_to_coordinates
            if angle_to_coordinates < min_angle:
                min_angle = angle_to_coordinates

        return [min_angle - self.agent.perspective,
                max_angle - self.agent.perspective]

    def get_allowed_actions(self):
        """ Iterates through the obstacles and calculates the intervals which are allowed and do not lead to a collision

        Returns:
            allowed_actions (list): Allowed action space
        """
        agent_position = Point(float(self.agent.x), float(self.agent.y))
        # Every position the agent can reach with its next step
        step_circle = agent_position.buffer(float(self.agent.step_size * self.DT))
        restrictions = []

        for obstacle in self.obstacles + [self.map_collision_area]:
            if isinstance(obstacle, Obstacle):
                obstacle = obstacle.collision_area(float(self.agent.radius))

            is_in_collision_area = obstacle.contains(agent_position) or obstacle.boundary.contains(agent_position)
            obstacle_step_circle_intersection = step_circle.intersection(obstacle) if not is_in_collision_area else (
                step_circle.boundary.difference(obstacle))

            # If intersection consists of multiple parts, iterate through them
            if obstacle_step_circle_intersection.geom_type in MULTI_GEOM_TYPES:
                restrictions_for_part = []
                for polygon in obstacle_step_circle_intersection.geoms:
                    restriction = self.get_restrictions_for_polygon(
                        polygon.exterior.coords if not is_in_collision_area and not (
                            polygon.geom_type in NO_EXTERIOR_TYPES) else polygon.coords)
                    restrictions_for_part.append(restriction)

                # Bring each restriction into the action space
                restrictions_for_part = project_intervals_into_action_space(restrictions_for_part,
                                                                            low=Decimal(-180), high=Decimal(180))
                # Intervals reaching below -180 are split in two. The list grows
                # while it is iterated; appended parts start at -180 and are
                # therefore not split again.
                for restriction in restrictions_for_part:
                    if restriction[0] < Decimal(-180.0):
                        restrictions_for_part.append([Decimal(-180.0), restriction[1]])
                        restriction[0] = Decimal(360) + restriction[0]
                        restriction[1] = Decimal(180)

                # Merge overlapping restrictions for different parts
                if len(restrictions_for_part) > 1:
                    for index, restriction in enumerate(restrictions_for_part):
                        if index != (len(restrictions_for_part) - 1):
                            if restriction[1] == restrictions_for_part[index + 1][0]:
                                restrictions_for_part[index + 1][0] = restriction[0]
                                # Mark the merged interval for removal
                                restriction[0] = Decimal(np.inf)
                    restrictions_for_part = [res for res in restrictions_for_part if res[0] != Decimal(np.inf)]

                    # When agent is inside the collision area, inverse the space to get restrictions
                    if is_in_collision_area:
                        restrictions_for_part = inverse_space(restrictions_for_part,
                                                              low=Decimal(-180.0), high=Decimal(180.0))
                else:
                    restrictions_for_part = [np.flip(restrictions_for_part[0])
                                             ] if is_in_collision_area else restrictions_for_part

                restrictions += restrictions_for_part
            else:
                object_restrictions = self.get_restrictions_for_polygon(
                    obstacle_step_circle_intersection.exterior.coords if not is_in_collision_area and not (
                        obstacle_step_circle_intersection.geom_type in NO_EXTERIOR_TYPES
                    ) else obstacle_step_circle_intersection.coords)
                restrictions.append(np.flip(object_restrictions) if is_in_collision_area else object_restrictions)

        restrictions = project_intervals_into_action_space(restrictions,
                                                           low=Decimal(-180.0), high=Decimal(180.0))
        restrictions = [restriction for restriction in restrictions if restriction[0] != restriction[1]]

        # Build allowed action space from restrictions. Subsets are modified in
        # place, ``np.inf`` in position 0 marks a subset as removed, and new
        # subsets appended during the loop are visited in the same pass.
        allowed_action_space = [[-self.ACTION_RANGE / 2, self.ACTION_RANGE / 2]]
        for restriction in restrictions:
            for index, allowed_subset in enumerate(allowed_action_space):
                if restriction[0] <= restriction[1]:
                    if restriction[0] < allowed_subset[0] <= restriction[1] <= allowed_subset[1]:
                        allowed_subset[0] = restriction[1]
                    if restriction[1] > allowed_subset[1] >= restriction[0] >= allowed_subset[0]:
                        allowed_subset[1] = restriction[0]
                    if restriction[0] >= allowed_subset[0] and restriction[1] <= allowed_subset[1]:
                        if allowed_subset[0] != restriction[0]:
                            allowed_action_space.append([allowed_subset[0], restriction[0]])
                        if allowed_subset[1] != restriction[1]:
                            allowed_action_space.append([restriction[1], allowed_subset[1]])
                        allowed_subset[0] = np.inf
                    if restriction[0] < allowed_subset[0] and restriction[1] > allowed_subset[1]:
                        allowed_subset[0] = np.inf
                else:
                    if restriction[0] <= allowed_subset[0] and restriction[1] <= allowed_subset[0] or (
                            restriction[0] >= allowed_subset[1]) and restriction[1] >= allowed_subset[1]:
                        allowed_subset[0] = np.inf
                    if allowed_subset[1] > restriction[0] > allowed_subset[0]:
                        allowed_subset[1] = restriction[0]
                    if allowed_subset[0] < restriction[1] < allowed_subset[1]:
                        allowed_subset[0] = restriction[1]

        allowed_action_space = np.array(
            [subset for subset in allowed_action_space if subset[0] != np.inf and subset[0] != subset[1]])

        if len(allowed_action_space) > 0:
            # Shrink every border created by a restriction by the safety angle;
            # the outer limits of the action range stay untouched.
            allowed_action_space[allowed_action_space[:, 0] != -self.ACTION_RANGE / 2, 0] += self.safety_angle
            allowed_action_space[allowed_action_space[:, 1] != self.ACTION_RANGE / 2, 1] -= self.safety_angle

        return [list(subset) for subset in allowed_action_space if subset[0] < subset[1]]

    def render(self, render_mode: Optional[str] = 'rgb_array', draw_trajectory: bool = False, draw_information=True):
        """ Renders the environment

        Args:
            render_mode (str): Used only when no render mode was given to the constructor
            draw_trajectory (bool): Whether past steps should be indicated on the map
            draw_information (bool): Whether to show information about the reward, target distance, and allowed actions

        Returns:
            numpy array of the frame in 'rgb_array' mode, otherwise None
        """
        try:
            import pygame
            from pygame import gfxdraw
        except ImportError as error:
            raise DependencyNotInstalled('Pygame is not installed, run `pip install pygame`') from error

        def draw_polygon_border(polygon_coordinates):
            # Connect every corner with its successor; the last one closes the shape.
            for index, coordinate in enumerate(polygon_coordinates):
                if index == len(polygon_coordinates) - 1:
                    pygame.draw.line(canvas, (0, 0, 0), coordinate, polygon_coordinates[0], 2)
                else:
                    pygame.draw.line(canvas, (0, 0, 0), coordinate, polygon_coordinates[index + 1], 2)

        # The mode chosen at construction wins; the argument is the fallback.
        mode = self.render_mode if self.render_mode is not None else render_mode

        self.window_scale = 50 if self.current_width < 15 else 30
        window_width = int(self.current_width * self.window_scale)
        window_height = int(self.current_height * self.window_scale)

        if self.window is None or self.reload:
            pygame.init()
            pygame.font.init()
            if mode == 'human':
                pygame.display.init()
                self.window = pygame.display.set_mode((window_width, window_height))
                pygame.display.set_caption('Obstacle Avoidance')
            else:
                self.window = pygame.Surface((window_width, window_height))
            # The window now matches the current level; rebuild only on the next level change.
            self.reload = False
        if self.clock is None:
            self.clock = pygame.time.Clock()

        canvas = pygame.Surface((window_width, window_height))
        canvas.fill((232, 232, 232))

        for obstacle in self.obstacles:
            gfxdraw.filled_polygon(canvas, obstacle.coordinates * self.window_scale, (136, 136, 136))
            draw_polygon_border(obstacle.coordinates * self.window_scale)

        agent_x = int(float(self.agent.x) * self.window_scale)
        agent_y = int(float(self.agent.y) * self.window_scale)
        gfxdraw.pie(canvas, agent_x, agent_y,
                    int(float(self.agent.step_size + self.agent.radius) * self.window_scale),
                    int(float(self.agent.perspective) - float(self.ACTION_RANGE) / 2),
                    int(float(self.agent.perspective) + float(self.ACTION_RANGE) / 2), (0, 0, 0))
        gfxdraw.filled_circle(canvas, agent_x, agent_y,
                              int(float(self.agent.radius) * self.window_scale), (65, 105, 225))
        gfxdraw.circle(canvas, int(self.goal.centroid.coords[0][0] * self.window_scale),
                       int(self.goal.centroid.coords[0][1] * self.window_scale),
                       int((self.goal.bounds[3] - self.goal.centroid.coords[0][1]) * self.window_scale), (34, 139, 34))

        if draw_trajectory and len(self.trajectory) > 1:
            # NOTE(review): the line colour equals the background colour, so the
            # trajectory is only visible where it crosses other shapes.
            pygame.draw.aalines(canvas, (232, 232, 232), False, np.multiply(self.trajectory, self.window_scale), 0)

        if draw_information:
            font = pygame.font.SysFont('Arial', 14)
            text_canvas = font.render(
                f'Reward: {np.round(self.last_reward, 2)}',
                True, (0, 0, 0))
            perspective_canvas = font.render(
                f'Perspective: {np.round(float(self.agent.perspective), 2)}',
                True, (0, 0, 0))
            allowed_actions_canvas = font.render(
                f'Allowed Actions: {[(np.round(float(subset[0]), 2), np.round(float(subset[1]), 2)) for subset in self.allowed_actions]}',
                True, (0, 0, 0))

        # Flip vertically so the y-axis points upwards; text is added afterwards to stay readable.
        canvas = pygame.transform.flip(canvas, False, True)
        self.window.blit(canvas, (0, 0))
        if draw_information:
            self.window.blit(text_canvas, (self.window_scale / 4, self.window_scale / 4))
            self.window.blit(perspective_canvas, (self.window_scale / 4, 4 * self.window_scale / 4))
            self.window.blit(allowed_actions_canvas, (self.window_scale / 4, 7 * self.window_scale / 4))

        if mode == 'human':
            pygame.event.pump()
            self.clock.tick(self.metadata['render_fps'])
            pygame.display.flip()
        elif mode == 'rgb_array':
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(self.window)), axes=(1, 0, 2)
            )
        elif mode == 'jupyter':
            plt.imshow(np.transpose(
                np.array(pygame.surfarray.pixels3d(self.window)), axes=(1, 0, 2)
            ))
            plt.axis('off')
            plt.show()

    def close(self):
        """ Closes the visualization
        """
        if self.window is not None:
            import pygame
            pygame.display.quit()
            pygame.quit()
            # Drop the stale handles so a later ``render`` creates a new window.
            self.window = None
            self.clock = None
def generate_obstacles(width: float, height: float, num_obstacles: int,
                       position_covariance: list = None,
                       mean_size_obstacle: float = 1.0, sigma_size_obstacle: float = 0.2,
                       range_size_obstacle: float = 0.5,
                       num_waypoints: int = 0, distance_waypoints: float = 2.0,
                       sigma_distance: float = 1.0, step_size: float = 0.3,
                       seed: int = None, obstacles: dict = None, forbidden_circles: list = None,
                       max_iterations: int = 10000, uniform: bool = False):
    """ Algorithm to generate environment setups

    Obstacles are sampled until ``obstacles`` holds ``num_obstacles`` entries.
    The passed ``obstacles`` dictionary is extended in place and returned.
    When ``seed`` is given, the global NumPy random state is re-seeded.

    Args:
        width (float): Width of the map
        height (float): Height of the map
        num_obstacles (int): Number of obstacles
        position_covariance (list): Covariance matrix
        mean_size_obstacle (float): Mean size of an obstacle
        sigma_size_obstacle (float): Standard deviation of the obstacle size
        range_size_obstacle (float): Defines the minimum and maximum allowed obstacle sizes
        num_waypoints (int): Number of waypoints for each obstacle
        distance_waypoints (float): Mean distance of the straight path
        sigma_distance (float): Standard deviation of the waypoints' distance
        step_size (float): Step size of the obstacles
        seed (int): Seed to make generations reproducible
        obstacles (dict): Already existing obstacles in the environment
        forbidden_circles (list): List of circles (x, y, radius) in which no obstacle or waypoint should be placed
        max_iterations (int): Maximum generation trials before the next setup is taken
        uniform (bool): Whether to sample uniformly instead of a normal distribution

    Returns:
        setup (dict): obstacle setup which can be used in an environment configuration
    """
    if position_covariance is None:
        position_covariance = [[4.0, 0.0], [0.0, 4.0]]

    def is_valid(el_coordinates):
        # ``minimum_distance`` is read from the enclosing scope; it is set for
        # the obstacle currently being generated before this function is called.
        candidate = Point(el_coordinates)

        out_of_map = minimum_distance > el_coordinates[0] or el_coordinates[0] > width - minimum_distance or (
            minimum_distance > el_coordinates[1]) or el_coordinates[1] > height - minimum_distance

        def too_close(geometry):
            # Coordinates from a configuration may be plain lists; convert
            # them so that column slicing works.
            y_values = np.asarray(geometry['coordinates'], dtype=float)[:, 1]
            extent = np.sqrt(2 * ((max(y_values) - min(y_values)) / 2) ** 2)
            if Point(midpoint(geometry['coordinates'])).distance(candidate) < minimum_distance + extent:
                return True
            return any(Point(waypoint).distance(candidate) < minimum_distance
                       for waypoint in geometry['waypoints'])

        collision = any(too_close(geometry) for geometry in obstacles.values())

        if forbidden_circles is not None:
            in_forbidden_circle = any(
                Point(float(circle[0]), float(circle[1])).distance(candidate) < circle[2] + minimum_distance
                for circle in forbidden_circles)
        else:
            in_forbidden_circle = False

        return not collision and not out_of_map and not in_forbidden_circle

    if seed is not None:
        np.random.seed(seed)
    if obstacles is None:
        obstacles = {}

    iteration = 0
    while len(obstacles) < num_obstacles:
        iteration += 1
        if uniform:
            size_obstacle = np.random.uniform(mean_size_obstacle - range_size_obstacle,
                                              mean_size_obstacle + range_size_obstacle)
        else:
            size_obstacle = np.clip(np.random.normal(mean_size_obstacle, sigma_size_obstacle),
                                    mean_size_obstacle - range_size_obstacle,
                                    mean_size_obstacle + range_size_obstacle)
        minimum_distance = np.sqrt(2 * (size_obstacle / 2) ** 2) + 0.95

        position = np.random.multivariate_normal([width / 2, height / 2],
                                                 position_covariance)
        position[0] = np.clip(position[0], 0.0, width - size_obstacle)
        position[1] = np.clip(position[1], 0.0, height - size_obstacle)

        # NOTE(review): the upper bound of ``randint`` is exclusive, so the last
        # entry of SHAPE_COLLECTION is never chosen. Kept unchanged so existing
        # seeds still produce the same layouts; the ``max`` only prevents a
        # crash for a collection with a single shape.
        shape_index = np.random.randint(0, max(len(SHAPE_COLLECTION) - 1, 1))
        coordinates = SHAPE_COLLECTION[shape_index] * size_obstacle + position - (size_obstacle / 2)
        centroid = np.array(Polygon(coordinates).centroid.coords[0])

        # After too many failed trials the placement is accepted even if invalid.
        if is_valid(position) or iteration > max_iterations:
            iteration = 0
            waypoints = []
            while len(waypoints) < num_waypoints:
                iteration += 1
                radius = np.clip(np.random.normal(distance_waypoints, sigma_distance), 1.0, 4.0)
                # NOTE(review): x and y use two independently sampled angles and
                # the upper bounds (259.9 / 259.0) look like typos for a full
                # circle. Kept unchanged so existing seeds still produce the
                # same layouts — confirm the intended sampling.
                update = np.array([np.cos(np.radians(np.random.uniform(low=0.0, high=259.9))) * radius,
                                   np.sin(np.radians(np.random.uniform(low=0.0, high=259.0))) * radius])
                if is_valid(centroid + update) or iteration > max_iterations:
                    iteration = 0
                    centroid += update
                    waypoints.append(centroid.copy())

            # Pick a key that is not in use: overwriting an existing entry
            # would keep the dictionary from growing and the loop from ending.
            key = len(obstacles)
            while key in obstacles:
                key += 1
            obstacles[key] = {'coordinates': coordinates, 'waypoints': waypoints, 'step_size': step_size}

    return obstacles