Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/add tests #4

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Prev Previous commit
Next Next commit
Fix(mtd) fixing sync and adding geonatures fix
  • Loading branch information
Pierre-Narcisi committed Aug 27, 2024
commit 68d3b5bf01eda8319a7fd9ced005fc8addd7bcdf
20 changes: 17 additions & 3 deletions src/mtd_sync/mtd_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def _get_xml(self, path):
def _get_af_xml(self):
return self._get_xml(self.af_path)

def get_af_list(self):
def get_af_list(self) -> list:
xml = self._get_af_xml()
_xml_parser = etree.XMLParser(ns_clean=True, recover=True, encoding="utf-8")
root = etree.fromstring(xml, parser=_xml_parser)
Expand All @@ -82,7 +82,7 @@ def get_af_list(self):
def _get_ds_xml(self):
return self._get_xml(self.ds_path)

def get_ds_list(self):
def get_ds_list(self) -> list:
xml = self._get_ds_xml()
return parse_jdd_xml(xml)

Expand Down Expand Up @@ -221,11 +221,19 @@ def process_af_and_ds(af_list, ds_list, id_role=None):
continue
user_add_total_time += time.time() - start_add_user_time
af = sync_af(af)
# TODO: choose whether or not to commit retrieval of the AF before association of actors
# and possibly retrieve an AF without any actor associated to it
db.session.commit()
# If AF has not been synchronized ; due to the lack of a UUID ; actor cannot be associated to it
# and thus we skip to the next AF
if not af:
continue
associate_actors(
actors,
CorAcquisitionFrameworkActor,
"id_acquisition_framework",
af.id_acquisition_framework,
af.unique_acquisition_framework_id,
)
# TODO: remove actors removed from MTD
db.session.commit()
Expand All @@ -242,7 +250,13 @@ def process_af_and_ds(af_list, ds_list, id_role=None):
user_add_total_time += time.time() - start_add_user_time
ds = sync_ds(ds, list_cd_nomenclature)
if ds is not None:
associate_actors(actors, CorDatasetActor, "id_dataset", ds.id_dataset)
associate_actors(
actors,
CorDatasetActor,
"id_dataset",
ds.id_dataset,
ds.unique_dataset_id,
)

user_add_total_time = round(user_add_total_time, 2)
db.session.commit()
Expand Down
147 changes: 126 additions & 21 deletions src/mtd_sync/mtd_utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import logging
import json
from copy import copy
import pprint
from typing import Literal
from flask import current_app

from sqlalchemy import select, exists
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.exc import SQLAlchemyError, IntegrityError
from sqlalchemy.sql import func, update

from sqlalchemy.dialects.postgresql import insert as pg_insert
Expand Down Expand Up @@ -122,7 +124,19 @@ def sync_af(af):
TAcquisitionFramework
The updated or inserted acquisition framework.
"""
# TODO: handle case where af_uuid is None ; as will raise an error at database level when executing the statement below ;
# af_uuid being None, i.e. af UUID is missing, could be due to no UUID specified in `<ca:identifiantCadre/>` tag in the XML file
# Solutions - if UUID is missing:
# - Just pass the sync of the AF
# - Generate a UUID for the AF
af_uuid = af["unique_acquisition_framework_id"]

if not af_uuid:
log.warning(
f"No UUID provided for the AF with UUID '{af_uuid}' - SKIPPING SYNCHRONIZATION FOR THIS AF."
)
return None

af_exists = DB.session.scalar(
exists().where(TAcquisitionFramework.unique_acquisition_framework_id == af_uuid).select()
)
Expand Down Expand Up @@ -185,21 +199,34 @@ def add_or_update_organism(uuid, nom, email):
return DB.session.execute(statement).scalar()


def associate_actors(actors, CorActor, pk_name, pk_value):
def associate_actors(
actors,
CorActor: CorAcquisitionFrameworkActor | CorDatasetActor,
pk_name: Literal["id_acquisition_framework", "id_dataset"],
pk_value: str,
uuid_mtd: str,
):
"""
Associate actor and DS or AF according to CorActor value.
Associate actors with either a given :
- Acquisition framework - writing to the table `gn_meta.cor_acquisition_framework_actor`.
- Dataset - writing to the table `gn_meta.cor_dataset_actor`.

Parameters
----------
actors : list
list of actors
CorActor : db.Model
table model
pk_name : str
pk attribute name
CorActor : CorAcquisitionFrameworkActor | CorDatasetActor
the SQLAlchemy model corresponding to the destination table
pk_name : Literal['id_acquisition_framework', 'id_dataset']
pk attribute name:
- 'id_acquisition_framework' for AF
- 'id_dataset' for DS
pk_value : str
pk value
pk value: ID of the AF or DS
uuid_mtd : str
UUID of the AF or DS
"""
type_mtd = "AF" if pk_name == "id_acquisition_framework" else "DS"
for actor in actors:
id_organism = None
uuid_organism = actor["uuid_organism"]
Expand All @@ -218,20 +245,49 @@ def associate_actors(actors, CorActor, pk_name, pk_value):
),
**{pk_name: pk_value},
)
if not id_organism:
values["id_role"] = DB.session.scalar(
select(User.id_role).filter_by(email=actor["email"])
)
else:
# TODO: choose wether to:
# - (retained) Try to associate to an organism first and then to a user
# - Try to associate to a user first and then to an organism
if id_organism:
values["id_organism"] = id_organism
statement = (
pg_insert(CorActor)
.values(**values)
.on_conflict_do_nothing(
index_elements=[pk_name, "id_organism", "id_nomenclature_actor_role"],
# TODO: handle case where no user is retrieved for the actor email:
# - (retained) Just do not try to associate the actor with the metadata
# - Try to retrieve and id_organism from the organism name - field `organism`
# - Try to retrieve and id_organism from the actor email considered as an organism email - field `email`
# - Try to insert a new user from the actor name - field `name` - and possibly also email - field `email`
else:
id_user_from_email = DB.session.scalar(
select(User.id_role).filter_by(email=actor["email"]).where(User.groupe.is_(False))
)
if id_user_from_email:
values["id_role"] = id_user_from_email
else:
log.warning(
f"MTD - actor association impossible for {type_mtd} with UUID '{uuid_mtd}' because no id_organism nor id_organism could be retrieved - with the following actor information:\n"
+ format_str_dict_actor_for_logging(actor)
)
continue
try:
statement = (
pg_insert(CorActor)
.values(**values)
.on_conflict_do_nothing(
index_elements=[
pk_name,
"id_organism" if id_organism else "id_role",
"id_nomenclature_actor_role",
],
)
)
DB.session.execute(statement)
except IntegrityError as I:
db.session.rollback()
# TODO: uncomment following lines when work is done - commented to limit log while developping
log.error(
f"MTD - DB INTEGRITY ERROR - actor association failed for {type_mtd} with UUID '{uuid_mtd}' and following actor information:\n"
+ format_sqlalchemy_error_for_logging(I)
+ format_str_dict_actor_for_logging(actor)
)
)
DB.session.execute(statement)


def associate_dataset_modules(dataset):
Expand All @@ -251,6 +307,54 @@ def associate_dataset_modules(dataset):
)


def format_sqlalchemy_error_for_logging(error: SQLAlchemyError):
"""
Format SQLAlchemy error information in a nice way for MTD logging

Parameters
----------
error : SQLAlchemyError
the SQLAlchemy error

Returns
-------
str
formatted error information
"""
indented_original_error_message = str(error.orig).replace("\n", "\n\t")

formatted_error_message = "".join(
[
f"\t{indented_original_error_message}",
f"SQL QUERY: {error.statement}\n",
f"\tSQL PARAMS: {error.params}\n",
]
)

return formatted_error_message


def format_str_dict_actor_for_logging(actor: dict):
"""
Format actor information in a nice way for MTD logging

Parameters
----------
actor : dict
actor information: actor_role, email, name, organism, uuid_organism, ...

Returns
-------
str
formatted actor information
"""
formatted_str_dict_actor = "\tACTOR:\n\t\t" + pprint.pformat(actor).replace(
"\n", "\n\t\t"
).rstrip("\t")

return formatted_str_dict_actor


class CasAuthentificationError(GeonatureApiError):
pass

Expand All @@ -259,7 +363,8 @@ def insert_user_and_org(info_user, update_user_organism: bool = True):
id_provider_inpn = current_app.config["MTD_SYNC"]["ID_PROVIDER_INPN"]
idprov = AuthenficationCASINPN()
idprov.id_provider = id_provider_inpn
auth_manager.add_provider(id_provider_inpn, idprov)
if id_provider_inpn not in auth_manager:
auth_manager.add_provider(id_provider_inpn, idprov)

# if not id_provider_inpn in auth_manager:
# raise GeonatureApiError(
Expand Down
27 changes: 26 additions & 1 deletion src/mtd_sync/xml_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,35 @@ def parse_jdd_xml(xml):

root = ET.fromstring(xml, parser=_xml_parser)
jdd_list = []

def format_acquisition_framework_id_from_xml(provided_af_uuid) -> str | None:
"""
Format the acquisition framework UUID provided for the dataset
i.e. the value for the tag `<jdd:identifiantCadre>` in the XML file

Args:
provided_af_uuid (str): The acquisition framework UUID
Returns:
str | None: The formatted acquisition framework UUID, or None if none was provided
"""
if not provided_af_uuid:
return None

if provided_af_uuid.startswith("http://oafs.fr/meta/ca/"):
return provided_af_uuid.split("/")[-1]

return provided_af_uuid

for jdd in root.findall(".//" + namespace + "JeuDeDonnees"):
# We extract all the required informations from the different tags of the XML file
jdd_uuid = get_tag_content(jdd, "identifiantJdd")
ca_uuid = get_tag_content(jdd, "identifiantCadre")
# TODO: handle case where value for the tag `<jdd:identifiantCadre>` in the XML file is not of the form `xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx`
# Solutions - if in the form `http://oafs.fr/meta/ca/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx` (has some entries for INPN MTD PREPROD and instance 'Nationale') :
# - (retained) Format by keeping only the `xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx` part
# - Add a check further in the MTD sync to process only if ca_uuid is in the right format
ca_uuid = format_acquisition_framework_id_from_xml(
get_tag_content(jdd, "identifiantCadre")
)
dataset_name = get_tag_content(jdd, "libelle")
dataset_shortname = get_tag_content(jdd, "libelleCourt", default_value="")
dataset_desc = get_tag_content(jdd, "description", default_value="")
Expand Down
Loading