Skip to content

Commit

Permalink
started refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
grunde73 committed Jun 10, 2024
1 parent 668b471 commit 775edad
Show file tree
Hide file tree
Showing 9 changed files with 305 additions and 261 deletions.
4 changes: 2 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"python.languageServer": "Pylance",
"editor.formatOnSave": true,
"editor.formatOnSave": false,
"notebook.formatOnSave.enabled": true,
"notebook.codeActionsOnSave": {
"notebook.source.fixAll": true,
Expand Down Expand Up @@ -29,4 +29,4 @@
"python.terminal.executeInFileDir": true,
"python.terminal.activateEnvironment": true,
"python.terminal.activateEnvInCurrentTerminal": false,
}
}
4 changes: 2 additions & 2 deletions src/maritime_schema/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
"""

# from .state import Position, StateVector
from .meta import MTSBaseModel


__all__ = []
__all__ = [MTSBaseModel, ]
278 changes: 22 additions & 256 deletions src/maritime_schema/types/caga.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,267 +14,33 @@

# change
from maritime_schema.types.caga_examples import (
create_caga_config_example,
create_caga_data_example,
create_caga_event_example,
create_caga_time_frame_example,
create_detected_ship_example,
create_environment_example,
create_initial_example,
create_position_example,
create_predicted_point_example,
create_ship_example,
create_ship_static_example,
create_simulation_data_example,
create_simulation_timeframe_example,
create_software_config_example,
create_waypoint_example,
)
create_caga_config_example, create_caga_data_example,
create_caga_event_example, create_caga_time_frame_example,
create_detected_ship_example, create_environment_example,
create_initial_example, create_position_example,
create_predicted_point_example, create_ship_example,
create_ship_static_example, create_simulation_data_example,
create_simulation_timeframe_example, create_software_config_example,
create_waypoint_example)
from maritime_schema.utils.strings import to_camel

from . import MTSBaseModel

__ALL__ = ["TrafficSituation", "OutputSchema", "publish_schema"]

logger = logging.getLogger(__name__)


class BaseModelConfig(BaseModel):
"""Enables the alias_generator for all cases."""

model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)









class Initial(BaseModelConfig):
position: Position = Field(
None,
description="Initial longitude and latitude of the ship.",
examples=[create_position_example()],
)
sog: float = Field(
None,
ge=0,
description="Initial ship speed over ground in knots",
examples=[10.0],
)
cog: float = Field(
None,
ge=0,
le=360,
description="Initial ship course over ground in degrees",
examples=[45.0],
)
heading: float = Field(
None,
ge=0,
le=360,
description="Initial ship heading in degrees",
examples=[45.2],
)
nav_status: Optional[AISNavStatus] = Field(None, description="AIS Navigational Status")


class DataPoint(BaseModelConfig):
value: Optional[float] = Field(
None,
description="the value of the data at the current timestep",
examples=[12.3],
)
m_before_leg_change: Optional[float] = Field(
None,
description="meters before the waypoint to start interpolating to the new value",
examples=[10],
)
m_after_leg_change: Optional[float] = Field(
None,
description="meters after the waypoint to finish interpolating to the new value",
examples=[10],
)
interp_method: Optional[Union[InterpolationMethod, str]] = Field(None, description="Method used for interpolation")


class Data(BaseModelConfig):
model_config = ConfigDict(
extra="allow",
json_schema_extra={
"additionalProperties": {
"type": "object",
"properties": {
"value": {"type": "number"},
"mBeforeLegChange": {"type": "number"},
"mAfterLegChange": {"type": "number"},
"interpMethod": {"type": "string"},
},
"required": [],
"description": "The 'data' field can include additional properties. All additional properties should be DataPoint objects.",
},
"sog": {
"type": "object",
"properties": {
"value": {"type": "number"},
"mBeforeLegChange": {"type": "number"},
"mAfterLegChange": {"type": "number"},
"interpMethod": {"type": "string"},
},
"required": [],
"description": "Speed data point",
"examples": [
{
"value": 12.3,
"mBeforeLegChange": 100,
"mAfterLegChange": 100,
"interpMethod": "linear",
}
],
},
"heading": {
"type": "object",
"properties": {
"value": {"type": "number"},
"mBeforeLegChange": {"type": "number"},
"mAfterLegChange": {"type": "number"},
"interpMethod": {"type": "string"},
},
"required": [],
"description": "Heading data point",
"examples": [
{
"value": 180,
"mBeforeLegChange": 100,
"mAfterLegChange": 100,
"interpMethod": "linear",
}
],
},
},
)


class Waypoint(BaseModelConfig):
position: Position = Field(
description="A geographical coordinate",
examples=[Position(latitude=51.2123, longitude=11.2313)],
)
turn_radius: Optional[float] = Field(
None,
description="Orthodrome turn radius as defined in RTZ format",
examples=[200],
)
data: Optional[Data] = Field(
None,
description="A `Data` object that includes `speed`, `course`, and `heading` data points",
)


class Ship(BaseModelConfig):
static: ShipStatic = Field(
None,
description="Static ship information which does not change during a scenario.",
examples=[create_ship_static_example()],
)
initial: Optional[Initial] = Field(None, examples=[create_initial_example()])
waypoints: Optional[List[Waypoint]] = Field(
None,
description="An array of `Waypoint` objects. Each waypoint object must have a `position` property. <br /> If no turn radius is provided, it will be assumed to be `0`. <br /> Additional data can be added to each waypoint leg. This allows varying parameters on a per-leg basis, such as speed and heading, or navigational status ",
examples=[create_waypoint_example()],
)

def __init__(self, **data: Any):
super().__init__(**data)
if not self.waypoints:
self.waypoints = self.generate_waypoints()

def generate_waypoints(self) -> Union[List[Waypoint], None]:
"""Generate waypoints if they don't exist."""
waypoints = []

if self.initial:
# Create waypoints from initial position
geod = Geod(ellps="WGS84")
lon, lat, _ = geod.fwd(
self.initial.position.longitude,
self.initial.position.latitude,
self.initial.cog,
10000, # distance in meters
)
position1 = Position(latitude=lat, longitude=lon)
waypoint1 = Waypoint(position=position1)

position0 = Position(
latitude=self.initial.position.latitude,
longitude=self.initial.position.longitude,
)
waypoint0 = Waypoint(position=position0, data=None)

waypoints = [waypoint0, waypoint1]
return waypoints

else:
return None


class OwnShip(Ship):
pass


class TargetShip(Ship):
pass


class TrafficSituation(BaseModelConfig):
title: str = Field(
None,
description="The title of the traffic situation",
examples=["overtaking_18"],
)
description: Optional[str] = Field(
None,
description="A description of the traffic situation",
examples=["Crossing situation with 3 target vessels in the Oslofjord"],
)
start_time: Optional[datetime] = Field(
None,
description="Starting time of the situation in `ISO 8601` format `YYYY-MM-DDThh:mm:ssZ`",
examples=[datetime.now()],
)
own_ship: Union[OwnShip, Ship] = Field(
title="Own Ship data",
description="Own Ship data",
examples=[create_ship_example()],
)
target_ships: List[Union[TargetShip, Ship]] = Field(
None,
title="Target Ship data",
description="Target Ship data",
examples=[[create_ship_example()]],
)
environment: Optional[Environment] = Field(
None,
description="environmental parameters",
examples=[create_environment_example()],
)

model_config = ConfigDict(
extra="allow",
title="Test Input Schema",
json_schema_extra={"additionalProperties": True, "omit_default": True},
)


class SoftwareConfig(BaseModelConfig):
class SoftwareConfig(MTSBaseModel):
name: str = Field(..., description="The name of the system", examples=["AutoNavigation-System 1"])
vendor: str = Field(..., description="The name of the system vendor", examples=["CompanyABC"])
version: str = Field(..., description="The software version", examples=["1.2.3"])

model_config = ConfigDict(json_schema_extra={"additionalProperties": True})


class CagaConfiguration(BaseModelConfig):
class CagaConfiguration(MTSBaseModel):
name: str = Field(..., description="The name of the system", examples=["AutoNavigation-System 1"])
vendor: str = Field(..., description="The name of the system vendor", examples=["CompanyABC"])
version: str = Field(..., description="The software version", examples=["1.2.3"])
Expand Down Expand Up @@ -317,7 +83,7 @@ class EncounterType(str, Enum):
NO_RISK = "No Risk"


class PredictedPoint(BaseModelConfig):
class PredictedPoint(MTSBaseModel):
time: Union[datetime, int] = Field(
...,
description="Date and Time of the predicted value `ISO 8601` format `YYYY-MM-DDThh:mm:ssZ`",
Expand All @@ -330,7 +96,7 @@ class PredictedPoint(BaseModelConfig):
)


class DetectedShip(BaseModelConfig):
class DetectedShip(MTSBaseModel):
id: UUID = Field(..., description="Unique Identifier", examples=[uuid4()])

position: Position = Field(
Expand Down Expand Up @@ -389,7 +155,7 @@ class DetectedShip(BaseModelConfig):
model_config = ConfigDict(json_schema_extra={"additionalProperties": True})


class SimulatedShip(BaseModelConfig):
class SimulatedShip(MTSBaseModel):
id: UUID = Field(..., description="Unique Identifier", examples=[uuid4()])

position: Position = Field(
Expand Down Expand Up @@ -426,7 +192,7 @@ class SimulatedShip(BaseModelConfig):
model_config = ConfigDict(json_schema_extra={"additionalProperties": True})


class CagaTimeStep(BaseModelConfig):
class CagaTimeStep(MTSBaseModel):
time: Union[datetime, int] = Field(
...,
description="Date and Time of the predicted value `ISO 8601` format `YYYY-MM-DDThh:mm:ssZ`",
Expand All @@ -443,7 +209,7 @@ class CagaTimeStep(BaseModelConfig):
)


class CagaEvent(BaseModelConfig):
class CagaEvent(MTSBaseModel):
time: Union[datetime, int] = Field(..., description="Date and Time of the event", examples=[datetime.now()])
route: List[Waypoint] = Field(
None,
Expand All @@ -462,13 +228,13 @@ class CagaEvent(BaseModelConfig):
model_config = ConfigDict(json_schema_extra={"additionalProperties": True})


class SimulatorEvent(BaseModelConfig):
class SimulatorEvent(MTSBaseModel):
time: Union[datetime, int] = Field(..., description="Date and Time of the event", examples=[datetime.now()])

model_config = ConfigDict(json_schema_extra={"additionalProperties": True})


class CagaData(BaseModelConfig):
class CagaData(MTSBaseModel):
configuration: Optional[CagaConfiguration] = Field(
None, description="System Configuration", examples=[create_caga_config_example()]
)
Expand All @@ -484,7 +250,7 @@ class CagaData(BaseModelConfig):
)


class SimulationTimeFrame(BaseModelConfig):
class SimulationTimeFrame(MTSBaseModel):
time: Union[datetime, int] = Field(
...,
description="Date and Time of the predicted value `ISO 8601` format `YYYY-MM-DDThh:mm:ssZ`",
Expand All @@ -494,7 +260,7 @@ class SimulationTimeFrame(BaseModelConfig):
target_ships: List[SimulatedShip]


class SimulationData(BaseModelConfig):
class SimulationData(MTSBaseModel):
configuration: Optional[SoftwareConfig] = Field(
None,
description="Simulator software configuration",
Expand All @@ -513,7 +279,7 @@ class SimulationData(BaseModelConfig):
event_data: Optional[List[SimulatorEvent]] = Field(None, description="Event data from the simulator")


class OutputSchema(BaseModelConfig):
class OutputSchema(MTSBaseModel):
creation_time: datetime = Field(
...,
description="Date and Time that this file was created, in `ISO 8601` format `YYYY-MM-DDThh:mm:ssZ`. This should be the simulation end time.",
Expand Down
Loading

0 comments on commit 775edad

Please sign in to comment.