Skip to content

Commit

Permalink
added projection to the s-d file_writer. Created another file to stor…
Browse files Browse the repository at this point in the history
…e common functions between s-d reader and writer. Minor comment updates
  • Loading branch information
hamza.begic committed Jan 17, 2024
1 parent f4c9e9e commit 889e6d0
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 134 deletions.
135 changes: 135 additions & 0 deletions crdesigner/common/common_file_reader_writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import copy

from commonroad.common.validity import ValidTypes
from commonroad.geometry.shape import Circle, Polygon, Rectangle, Shape
from commonroad.planning.planning_problem import PlanningProblemSet
from commonroad.scenario.scenario import Scenario
from pyproj import CRS, Transformer


def transform_shape(shape: Shape, transformer: Transformer) -> Shape:
"""
Function that transforms a shape.
:param shape: Shape that is to be transformed.
:param transformer: Transformer that transforms the shape.
:return: Transformed shape.
"""
shape_copy = copy.deepcopy(shape)
if isinstance(shape_copy, Rectangle) or isinstance(shape_copy, Circle):
shape_copy.center[0], shape_copy.center[1] = transformer.transform(shape_copy.center[0], shape_copy.center[1])
elif isinstance(shape_copy, Polygon):
for vertex in shape_copy.vertices:
vertex[0], vertex[1] = transformer.transform(vertex[0], vertex[1])
return shape_copy


def project_complete_scenario_and_pps(
scenario: Scenario, planning_problem_set: PlanningProblemSet, proj_string_from: str, proj_string_to: str
) -> [Scenario, PlanningProblemSet]:
"""
Function that performs a projection onto the entire scenario.
:param scenario: Scenario that needs to be projected.
:param planning_problem_set: PlanningProblemSet that needs to be projected (if not None)
:param proj_string_from: Source projection.
:param proj_string_to: Target projection.
:return: Projected scenario.
"""
crs_from = CRS(proj_string_from)
crs_to = CRS(proj_string_to)
transformer = Transformer.from_proj(crs_from, crs_to)

# create a deep copy of the scenario
scenario_copy = copy.deepcopy(scenario)

# transform lanelet vertex coordinates
for lanelet in scenario_copy.lanelet_network.lanelets:
for left_vertex in lanelet.left_vertices:
left_vertex[0], left_vertex[1] = transformer.transform(left_vertex[0], left_vertex[1])
for right_vertex in lanelet.right_vertices:
right_vertex[0], right_vertex[1] = transformer.transform(right_vertex[0], right_vertex[1])
for center_vertex in lanelet.center_vertices:
center_vertex[0], center_vertex[1] = transformer.transform(center_vertex[0], center_vertex[1])

# transform stop line coordinates
if lanelet.stop_line is not None:
lanelet.stop_line.start[0], lanelet.stop_line.start[1] = transformer.transform(
lanelet.stop_line.start[0], lanelet.stop_line.start[1]
)
lanelet.stop_line.end[0], lanelet.stop_line.end[1] = transformer.transform(
lanelet.stop_line.end[0], lanelet.stop_line.end[1]
)

# transform traffic light coordinates
for tl in scenario_copy.lanelet_network.traffic_lights:
tl.position[0], tl.position[1] = transformer.transform(tl.position[0], tl.position[1])

# transform traffic sign coordinates
for ts in scenario_copy.lanelet_network.traffic_signs:
ts.position[0], ts.position[1] = transformer.transform(ts.position[0], ts.position[1])

# transform area coordinates
for area in scenario_copy.lanelet_network.areas:
for border in area.border:
for vertex in border.border_vertices:
vertex[0], vertex[1] = transformer.transform(vertex[0], vertex[1])

# transform obstacle coordinates
for obstacle in scenario_copy.obstacles:
if obstacle.obstacle_shape:
obstacle.obstacle_shape = transform_shape(obstacle.obstacle_shape, transformer)

if obstacle.initial_state:
if obstacle.initial_state.position:
if isinstance(obstacle.initial_state.position, ValidTypes.ARRAY):
obstacle.initial_state.position[0], obstacle.initial_state.position[1] = transformer.transform(
obstacle.initial_state.position[0], obstacle.initial_state.position[1]
)
if isinstance(obstacle.initial_state.position, Shape):
obstacle.initial_state.position = transform_shape(obstacle.initial_state.position, transformer)

if obstacle.prediction:
if obstacle.prediction.occupancy_set:
for occupancy in obstacle.prediction.occupancy_set:
occupancy.shape = transform_shape(occupancy.shape, transformer)

if obstacle.prediction.shape:
obstacle.prediction.shape = transform_shape(obstacle.prediction.shape, transformer)

if obstacle.prediction.trajectory:
for state in obstacle.prediction.trajectory.state_list:
if state.position:
if isinstance(state.position, ValidTypes.ARRAY):
state.position[0], state.position[1] = transformer.transform(
state.position[0], state.position[1]
)
if isinstance(state.position, Shape):
state.position = transform_shape(state.position, transformer)

# transform planning problems
for planning_problem in planning_problem_set.planning_problem_dict.values():
if planning_problem.initial_state:
if isinstance(planning_problem.initial_state.position, ValidTypes.ARRAY):
(
planning_problem.initial_state.position[0],
planning_problem.initial_state.position[1],
) = transformer.transform(
planning_problem.initial_state.position[0], planning_problem.initial_state.position[1]
)
if isinstance(planning_problem.initial_state.position, Shape):
planning_problem.initial_state.position = transform_shape(
planning_problem.initial_state.position, transformer
)

if planning_problem.goal:
for state in planning_problem.goal.state_list:
if state.position:
if isinstance(state.position, ValidTypes.ARRAY):
state.position[0], state.position[1] = transformer.transform(
state.position[0], state.position[1]
)
if isinstance(state.position, Shape):
state.position = transform_shape(state.position, transformer)

return scenario_copy, planning_problem_set
137 changes: 4 additions & 133 deletions crdesigner/common/file_reader.py
Original file line number Diff line number Diff line change
@@ -1,149 +1,20 @@
import copy
from typing import Optional, Tuple

from commonroad.common.file_reader import CommonRoadFileReader
from commonroad.common.util import FileFormat, Path_T
from commonroad.common.validity import ValidTypes
from commonroad.geometry.shape import Circle, Polygon, Rectangle, Shape
from commonroad.planning.planning_problem import PlanningProblemSet
from commonroad.scenario.lanelet import LaneletNetwork
from commonroad.scenario.scenario import Scenario
from pyproj import CRS, Transformer

from crdesigner.common.common_file_reader_writer import (
project_complete_scenario_and_pps,
)
from crdesigner.verification_repairing.map_verification_repairing import (
verify_and_repair_map,
verify_and_repair_scenario,
)


def transform_shape(shape: Shape, transformer: Transformer) -> Shape:
"""
Function that transforms a shape.
:param shape: Shape that is to be transformed.
:param transformer: Transformer that transforms the shape.
:return: Transformed shape.
"""
shape_copy = copy.deepcopy(shape)
if isinstance(shape_copy, Rectangle) or isinstance(shape_copy, Circle):
shape_copy.center[0], shape_copy.center[1] = transformer.transform(shape_copy.center[0], shape_copy.center[1])
elif isinstance(shape_copy, Polygon):
for vertex in shape_copy.vertices:
vertex[0], vertex[1] = transformer.transform(vertex[0], vertex[1])
return shape_copy


def project_complete_scenario_and_pps(
scenario: Scenario, planning_problem_set: PlanningProblemSet, proj_string_from: str, proj_string_to: str
) -> [Scenario, PlanningProblemSet]:
"""
Function that performs a projection onto the entire scenario.
:param scenario: Scenario that needs to be projected.
:param planning_problem_set: PlanningProblemSet that needs to be projected (if not None)
:param proj_string_from: Source projection.
:param proj_string_to: Target projection.
:return: Projected scenario.
"""
crs_from = CRS(proj_string_from)
crs_to = CRS(proj_string_to)
transformer = Transformer.from_proj(crs_from, crs_to)

# create a deep copy of the scenario
scenario_copy = copy.deepcopy(scenario)

# transform lanelet vertex coordinates
for lanelet in scenario_copy.lanelet_network.lanelets:
for left_vertex in lanelet.left_vertices:
left_vertex[0], left_vertex[1] = transformer.transform(left_vertex[0], left_vertex[1])
for right_vertex in lanelet.right_vertices:
right_vertex[0], right_vertex[1] = transformer.transform(right_vertex[0], right_vertex[1])
for center_vertex in lanelet.center_vertices:
center_vertex[0], center_vertex[1] = transformer.transform(center_vertex[0], center_vertex[1])

# transform stop line coordinates
if lanelet.stop_line is not None:
lanelet.stop_line.start[0], lanelet.stop_line.start[1] = transformer.transform(
lanelet.stop_line.start[0], lanelet.stop_line.start[1]
)
lanelet.stop_line.end[0], lanelet.stop_line.end[1] = transformer.transform(
lanelet.stop_line.end[0], lanelet.stop_line.end[1]
)

# transform traffic light coordinates
for tl in scenario_copy.lanelet_network.traffic_lights:
tl.position[0], tl.position[1] = transformer.transform(tl.position[0], tl.position[1])

# transform traffic sign coordinates
for ts in scenario_copy.lanelet_network.traffic_signs:
ts.position[0], ts.position[1] = transformer.transform(ts.position[0], ts.position[1])

# transform area coordinates
for area in scenario_copy.lanelet_network.areas:
for border in area.border:
for vertex in border.border_vertices:
vertex[0], vertex[1] = transformer.transform(vertex[0], vertex[1])

# transform obstacle coordinates
for obstacle in scenario_copy.obstacles:
if obstacle.obstacle_shape:
obstacle.obstacle_shape = transform_shape(obstacle.obstacle_shape, transformer)

if obstacle.initial_state:
if obstacle.initial_state.position:
if isinstance(obstacle.initial_state.position, ValidTypes.ARRAY):
obstacle.initial_state.position[0], obstacle.initial_state.position[1] = transformer.transform(
obstacle.initial_state.position[0], obstacle.initial_state.position[1]
)
if isinstance(obstacle.initial_state.position, Shape):
obstacle.initial_state.position = transform_shape(obstacle.initial_state.position, transformer)

if obstacle.prediction:
if obstacle.prediction.occupancy_set:
for occupancy in obstacle.prediction.occupancy_set:
occupancy.shape = transform_shape(occupancy.shape, transformer)

if obstacle.prediction.shape:
obstacle.prediction.shape = transform_shape(obstacle.prediction.shape, transformer)

if obstacle.prediction.trajectory:
for state in obstacle.prediction.trajectory.state_list:
if state.position:
if isinstance(state.position, ValidTypes.ARRAY):
state.position[0], state.position[1] = transformer.transform(
state.position[0], state.position[1]
)
if isinstance(state.position, Shape):
state.position = transform_shape(state.position, transformer)

# transform planning problems
for planning_problem in planning_problem_set.planning_problem_dict.values():
if planning_problem.initial_state:
if isinstance(planning_problem.initial_state.position, ValidTypes.ARRAY):
(
planning_problem.initial_state.position[0],
planning_problem.initial_state.position[1],
) = transformer.transform(
planning_problem.initial_state.position[0], planning_problem.initial_state.position[1]
)
if isinstance(planning_problem.initial_state.position, Shape):
planning_problem.initial_state.position = transform_shape(
planning_problem.initial_state.position, transformer
)

if planning_problem.goal:
for state in planning_problem.goal.state_list:
if state.position:
if isinstance(state.position, ValidTypes.ARRAY):
state.position[0], state.position[1] = transformer.transform(
state.position[0], state.position[1]
)
if isinstance(state.position, Shape):
state.position = transform_shape(state.position, transformer)

return scenario_copy, planning_problem_set


class CRDesignerFileReader(CommonRoadFileReader):
def __init__(self, filename: Path_T, file_format: Optional[FileFormat] = None):
"""
Expand Down Expand Up @@ -172,7 +43,7 @@ def open(
scenario, planning_problem_set = super().open(lanelet_assignment)

# check for a projection
# If target projection is provided, no projection should be applied
# If target projection is not provided, no projection should be applied
if target_projection is not None:
proj_string_from = scenario.location.geo_transformation.geo_reference
# If no source projection is defined in the scenario location element, we should skip the projection
Expand Down
20 changes: 20 additions & 0 deletions crdesigner/common/file_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
from commonroad.planning.planning_problem import PlanningProblemSet
from commonroad.scenario.scenario import Location, Scenario, Tag

from crdesigner.common.common_file_reader_writer import (
project_complete_scenario_and_pps,
)
from crdesigner.verification_repairing.map_verification_repairing import (
verify_and_repair_scenario,
)
Expand Down Expand Up @@ -49,6 +52,7 @@ def write_to_file(
overwrite_existing_file: OverwriteExistingFile = OverwriteExistingFile.ASK_USER_INPUT,
check_validity: bool = False,
verify_repair_scenario=False,
target_projection: str = None,
):
"""
Writes CommonRoad scenario and planning problems to file.
Expand All @@ -58,8 +62,24 @@ def write_to_file(
:param overwrite_existing_file: Overwriting mode
:param check_validity: Validity check or not
:param verify_repair_scenario: Boolean that indicates if the function will verify and repair the scenario.
:param target_projection: Target projection that the user provides.
:return: Scenario and planning problems
"""

# check for a projection
# If target projection is not provided, no projection should be applied
if target_projection is not None:
proj_string_from = self._file_writer.scenario.location.geo_transformation.geo_reference
# If no source projection is defined in the scenario location element, we should skip the projection
if proj_string_from is not None:
self._file_writer.scenario, self._file_writer.planning_problem_set = project_complete_scenario_and_pps(
self._file_writer.scenario,
self._file_writer.planning_problem_set,
proj_string_from,
target_projection,
)

# check for verifying and repairing the scenario
if verify_repair_scenario is True:
self._file_writer.scenario = verify_and_repair_scenario(self._file_writer.scenario)[0]

Expand Down
8 changes: 7 additions & 1 deletion tests/common/test_crdesigner_file_reader_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,16 @@ def test_crdesigner_file_reader(self):
scenario = crdesigner_reader.open(verify_repair_scenario=False)[0]
# opening it with verifying and repairing
repaired_scenario = crdesigner_reader.open(verify_repair_scenario=True)[0]

self.assertNotEqual(scenario, repaired_scenario)
self.assertEqual(repaired_scenario, verify_and_repair_scenario(scenario)[0])

# opening it with the projection
projected_scenario = crdesigner_reader.open(
target_projection=scenario.location.geo_transformation.geo_reference
)[0]
# scenarios should be the same as the proj_string is the same
self.assertEqual(projected_scenario.lanelet_network, scenario.lanelet_network)

def test_crdesigner_file_writer(self):
# reading a scenario
crdesigner_reader = CRDesignerFileReader(
Expand Down

0 comments on commit 889e6d0

Please sign in to comment.