Skip to content

Commit

Permalink
rename: extract/transform/load
Browse files Browse the repository at this point in the history
  • Loading branch information
herve.le-bars committed Mar 31, 2024
1 parent 4ea2053 commit 6942149
Show file tree
Hide file tree
Showing 11 changed files with 65 additions and 17 deletions.
4 changes: 3 additions & 1 deletion src/tasks/dimensions/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from .load_dim_port_from_csv import *
from .load_dim_vessel_from_csv import *
from .load_dim_zone_amp_from_csv import *
from .load_dim_zone_amp_from_csv import *

from .compute_port_geometry_buffer import *
1 change: 0 additions & 1 deletion src/tasks/dimensions/load_dim_vessel_from_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ class LoadDimVesselFromCsv(BaseTask):
def map_to_domain(self, row: pd.Series) -> Vessel:
isna = row.isna()
return Vessel(
id=int(row['id']),
mmsi=int(row["mmsi"]) if not isna["mmsi"] else None,
ship_name=row["ship_name"],
width=int(row["width"]) if not isna["width"] else None,
Expand Down
5 changes: 3 additions & 2 deletions src/tasks/dimensions/load_dim_zone_amp_from_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ def run(self, *args, **kwargs):
use_cases = UseCases()
db = use_cases.db()
zone_repository = use_cases.zone_repository()

amp_data_csv_path= kwargs['amp_data_csv_path'] if 'amp_data_csv_path' in kwargs else settings.amp_data_csv_path

total = 0
try:
df = pd.read_csv(kwargs['amp_data_csv_path'], sep=",")
df = pd.read_csv(amp_data_csv_path, sep=',')
df = df.rename(columns={"Geometry": "geometry",
"Index": "index", "WDPAID": "wdpaid",
"Name": "name",
Expand All @@ -48,7 +50,6 @@ def run(self, *args, **kwargs):
zones = zone_repository.batch_create_zone(session, list(zones))
session.commit()
total = len(zones)
print(zones)
except ValidationError as e:
logger.error("Erreur de validation des données de zone AMP")
logger.error(e.errors())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from tasks.base import BaseTask


class LoadSpireDataFromApi(BaseTask):
class ExtractSpireDataFromApi(BaseTask):

def run(self, *args, **kwargs) -> None:
use_cases = UseCases()
Expand Down Expand Up @@ -62,7 +62,7 @@ def run(self, *args, **kwargs) -> None:
args = parser.parse_args()
time_start = perf_counter()
logger.info("DEBUT - Chargement des données JSON depuis l'API SPIRE")
LoadSpireDataFromApi(dump_path=args.dump_path).start()
ExtractSpireDataFromApi(dump_path=args.dump_path).start()
time_end = perf_counter()
duration = time_end - time_start
logger.info(f"FIN - Chargement des données depuis l'API SPIRE en {duration:.2f}s")
47 changes: 47 additions & 0 deletions src/tasks/facts/extract_spire_data_from_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import argparse
import json
from pathlib import Path
from time import perf_counter

from bloom.container import UseCases
from bloom.infra.http.spire_api_utils import map_raw_vessels_to_domain
from bloom.logger import logger
from pydantic import ValidationError
from tasks.base import BaseTask


class ExtractSpireDataFromCsv(BaseTask):
    """Extract Spire AIS data from a local file and batch-insert it via the repository.

    NOTE(review): despite the "FromCsv" name, the implementation parses the
    input file with ``json.load`` and maps the raw vessel payload to domain
    objects — confirm the intended input format with the task's author.
    """

    def run(self, *args, **kwargs) -> None:
        """Read the file at ``kwargs['file_name']``, map raw vessels to domain
        objects and persist them in one session/commit.

        Expects:
            kwargs['file_name']: path (str or Path) to the input file.

        Validation failures are caught and logged, not re-raised; the final
        log line reports how many records were actually persisted.
        """
        use_cases = UseCases()
        spire_ais_data_repository = use_cases.spire_ais_data_repository()
        db = use_cases.db()

        logger.info(f"Loading spire data from {kwargs['file_name']}")
        orm_data = []
        try:
            # Bug fix: removed the stray `df = pd.read_csv(settings.spire_data_csv_path, ...)`
            # line — neither `pd` nor `settings` is imported in this module, so it
            # raised NameError before anything was loaded, and `df` was never used.
            # Also use Path(...).open() instead of the unbound Path.open(str) form,
            # which only works by accident of pathlib internals.
            with Path(kwargs['file_name']).open() as json_data, db.session() as session:
                raw_vessels = json.load(json_data)
                spire_ais_data = map_raw_vessels_to_domain(raw_vessels)
                orm_data = spire_ais_data_repository.batch_create_ais_data(spire_ais_data, session)
                session.commit()
        except ValidationError as e:
            logger.error("Erreur de validation des données JSON")
            logger.error(e.errors())
        logger.info(f"{len(orm_data)} vessel data loaded")


if __name__ == "__main__":
    # CLI entry point: extract Spire data from the file given on the command
    # line and time the whole run.
    # Bug fix: argparse description read "Load Spire data from file CSV file".
    parser = argparse.ArgumentParser(description="Load Spire data from a CSV file")
    parser.add_argument(
        "filename",
        help="Path to CSV file to load",
    )
    args = parser.parse_args()
    time_start = perf_counter()
    logger.info(f"DEBUT - Chargement des données CSV depuis le fichier {args.filename}")
    ExtractSpireDataFromCsv(file_name=args.filename).start()
    time_end = perf_counter()
    duration = time_end - time_start
    logger.info(f"FIN - Chargement des données CSV en {duration:.2f}s")
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from tasks.base import BaseTask


class ComputePortGeometryBuffer(BaseTask):
class ExtractSpireDataFromJson(BaseTask):

def run(self, *args, **kwargs) -> None:
use_cases = UseCases()
Expand All @@ -20,7 +20,7 @@ def run(self, *args, **kwargs) -> None:
logger.info(f"Loading spire data from {kwargs['file_name']}")
orm_data = []
try:
with Path(kwargs['file_name']).open() as json_data, db.session() as session:
with Path.open(kwargs['file_name']) as json_data, db.session() as session:
raw_vessels = json.load(json_data)
spire_ais_data = map_raw_vessels_to_domain(raw_vessels)
orm_data = spire_ais_data_repository.batch_create_ais_data(spire_ais_data, session)
Expand All @@ -40,7 +40,7 @@ def run(self, *args, **kwargs) -> None:
args = parser.parse_args()
time_start = perf_counter()
logger.info(f"DEBUT - Chargement des données JSON depuis le fichier {args.filename}")
ComputePortGeometryBuffer(file_name=args.filename).start()
ExtractSpireDataFromJson(file_name=args.filename).start()
time_end = perf_counter()
duration = time_end - time_start
logger.info(f"FIN - Chargement des données JSON en {duration:.2f}s")
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from tasks.base import BaseTask


class LoadVesselPositionsDataTask(BaseTask):
class ExtractVesselPositionsDataTask(BaseTask):
def run(self, *args, **kwargs):
engine = create_engine(settings.db_url)

Expand All @@ -18,4 +18,4 @@ def run(self, *args, **kwargs):


if __name__ == "__main__":
LoadVesselPositionsDataTask().start()
ExtractVesselPositionsDataTask().start()
6 changes: 3 additions & 3 deletions src/tasks/load_dimensions.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import sys

from tasks.base import BaseTask
from tasks.dimensions import LoadDimPortFromCsv, LoadDimVesselFromCsv, LoadDimZoneAmpFromCsv
from tasks.transformations import ComputePortGeometryBuffer
from tasks.dimensions import LoadDimPortFromCsv, LoadDimVesselFromCsv, LoadDimZoneAmpFromCsv,\
ComputePortGeometryBuffer


class LoadDimensions(BaseTask):
    """Orchestrator task: loads every dimension table sequentially."""

    def run(self, *args, **kwargs):
        # Zones first, then ports; ComputePortGeometryBuffer runs right after
        # LoadDimPortFromCsv — presumably it derives buffers from the ports
        # just loaded (TODO confirm ordering constraint).
        LoadDimZoneAmpFromCsv(*args, **kwargs).start()
        LoadDimPortFromCsv(*args, **kwargs).start()
        ComputePortGeometryBuffer(*args, **kwargs).start()
        # Bug fix: removed a second, duplicate LoadDimPortFromCsv().start() —
        # the port dimension was being loaded twice.
        LoadDimVesselFromCsv(*args, **kwargs).start()


Expand Down
4 changes: 2 additions & 2 deletions src/tasks/load_facts.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import sys

from tasks.base import BaseTask
from tasks.facts import LoadVesselPositionsDataTask
from tasks.facts import CleanPositionsTask


class LoadFacts(BaseTask):
    """Orchestrator task: runs the fact-loading steps one after another."""

    def run(self, *args, **kwargs):
        # NOTE(review): both task invocations below are kept as-is; elsewhere in
        # this changeset LoadVesselPositionsDataTask was renamed to
        # ExtractVesselPositionsDataTask — confirm which name is current and
        # whether both steps are meant to run here.
        LoadVesselPositionsDataTask(*args, **kwargs).start()
        CleanPositionsTask(*args, **kwargs).start()


if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
class PipelinePorts(BaseTask):

def run(self, *args, **kwargs):
LoadSpireDataFromApi(*args, **kwargs).start()
UpdateVesselDataVoyage(*args, **kwargs).start()
CleanPositionsTask(*args, **kwargs).start()

Expand Down

0 comments on commit 6942149

Please sign in to comment.