From 3c87dd09308ed1b0d622653f47214d688a8e0b12 Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Mon, 26 Feb 2024 15:16:06 -0600 Subject: [PATCH] New ETL schema --- src/api/src/backend/utils/__init__.py | 24 ++-- src/api/src/backend/views/http/etl.py | 19 +++- src/api/src/backend/views/http/etlV2.py | 66 ----------- .../src/backend/views/http/tapis_etl.old.py | 90 +++++++++++++++ src/api/src/backend/views/http/tapis_etl.py | 103 +++++++++++------- src/api/src/tests/TestTapisETLPipeline.py | 8 +- .../tests/fixtures/tapis-etl-pipeline.json | 74 ++++++++----- 7 files changed, 230 insertions(+), 154 deletions(-) delete mode 100644 src/api/src/backend/views/http/etlV2.py create mode 100644 src/api/src/backend/views/http/tapis_etl.old.py diff --git a/src/api/src/backend/utils/__init__.py b/src/api/src/backend/utils/__init__.py index 13e9a1bb..529d3f1a 100644 --- a/src/api/src/backend/utils/__init__.py +++ b/src/api/src/backend/utils/__init__.py @@ -27,25 +27,25 @@ def build_etl_pipeline_env(body): # Convert the data integrity policies to dicts. Easier # to handle for null values via .get local_inbox_data_integrity_profile = {} - if getattr(body.local_inbox, "data_integrity_profile", None) != None: - local_inbox_data_integrity_profile = body.local_inbox.data_integrity_profile.dict() + if getattr(body.local_inbox.data, "integrity_profile", None) != None: + local_inbox_data_integrity_profile = body.local_inbox.data.integrity_profile.dict() local_outbox_data_integrity_profile = {} - if getattr(body.local_outbox, "data_integrity_profile", None) != None: - local_outbox_data_integrity_profile = body.local_outbox.data_integrity_profile.dict() + if getattr(body.local_outbox.data, "integrity_profile", None) != None: + local_outbox_data_integrity_profile = body.local_outbox.data.integrity_profile.dict() remote_inbox_data_integrity_profile = {} - if getattr(body.remote_inbox, "data_integrity_profile", None) != None: - remote_inbox_data_integrity_profile = body.remote_inbox.data_integrity_profile.dict() + if getattr(body.remote_inbox.data, "integrity_profile", None) != None: + remote_inbox_data_integrity_profile = body.remote_inbox.data.integrity_profile.dict() remote_outbox_data_integrity_profile = {} - if getattr(body.remote_outbox, "data_integrity_profile", None) != None: - remote_outbox_data_integrity_profile = body.remote_outbox.data_integrity_profile.dict() + if getattr(body.remote_outbox.data, "integrity_profile", None) != None: + remote_outbox_data_integrity_profile = body.remote_outbox.data.integrity_profile.dict() - body.local_inbox.data_integrity_profile = local_inbox_data_integrity_profile - body.local_outbox.data_integrity_profile = local_outbox_data_integrity_profile - body.remote_outbox.data_integrity_profile = remote_outbox_data_integrity_profile - body.remote_inbox.data_integrity_profile = remote_inbox_data_integrity_profile + body.local_inbox.data.integrity_profile = local_inbox_data_integrity_profile + body.local_outbox.data.integrity_profile = local_outbox_data_integrity_profile + body.remote_outbox.data.integrity_profile = remote_outbox_data_integrity_profile + body.remote_inbox.data.integrity_profile = remote_inbox_data_integrity_profile return { "REMOTE_OUTBOX": { diff --git a/src/api/src/backend/views/http/etl.py b/src/api/src/backend/views/http/etl.py index 3130d975..33c2aba0 100644 --- a/src/api/src/backend/views/http/etl.py +++ b/src/api/src/backend/views/http/etl.py @@ -40,10 +40,19 @@ class DoneFileDataIntegrityProfile(BaseDataIntegrityProfile): Field(discriminator="type") ] -class IOSystem(BaseModel): - data_path: str - data_integrity_profile: DataIntegrityProfile = None - manifests_path: str = None - exclude_pattern: str = None +class IOSystemProfile(BaseModel): + path: str include_pattern: str = None + exclude_pattern: str = None + +class DataProfile(IOSystemProfile): + integrity_profile: DataIntegrityProfile = None + +class ManifestsProfile(IOSystemProfile): + generation_policy: EnumManifestGenerationPolicy = None + priority: EnumManifestPriority = None + +class IOSystem(BaseModel): + data: DataProfile + manifests: ManifestsProfile diff --git a/src/api/src/backend/views/http/etlV2.py b/src/api/src/backend/views/http/etlV2.py deleted file mode 100644 index 440b7f0b..00000000 --- a/src/api/src/backend/views/http/etlV2.py +++ /dev/null @@ -1,66 +0,0 @@ -from enum import Enum -from typing import Union, Literal, Annotated, List - -from pydantic import BaseModel, Field - -from .requests import _EnumMeta - - -class EnumManifestGenerationPolicy(str, Enum, metaclass=_EnumMeta): - Manual = "manual" - AutoOnePerFile = "auto_one_per_file" - AutoOneForAll = "auto_one_for_all" - -class EnumManifestPriority(str, Enum, metaclass=_EnumMeta): - Oldest = "oldest" - Newest = "newest" - Any = "any" - -class BaseDataIntegrityProfile(BaseModel): - type: Literal["checksum", "byte_check", "done_file"] - -class ChecksumDataIntegrityProfile(BaseDataIntegrityProfile): - type: Literal["checksum"] - -class ByteCheckDataIntegrityProfile(BaseDataIntegrityProfile): - type: Literal["byte_check"] - -class DoneFileDataIntegrityProfile(BaseDataIntegrityProfile): - type: Literal["done_file"] - done_files_path: str - include_pattern: str = None - exclude_pattern: str = None - -DataIntegrityProfile = Annotated[ - Union[ - ChecksumDataIntegrityProfile, - ByteCheckDataIntegrityProfile, - DoneFileDataIntegrityProfile - ], - Field(discriminator="type") -] - -class IOProfile(BaseModel): - data_path: str - data_integrity_profile: DataIntegrityProfile = None - manifests_path: str = None - exclude_pattern: str = None - include_pattern: str = None - -class ETLSystem(BaseModel): - writable: bool = True - ingress_profile: IOProfile - egress_profile: IOProfile - -class EnumXferAction(str, Enum, metaclass=_EnumMeta): - PUSH = "push" - PULL = "pull" - NOOP = "noop" - -class ETLCycle(BaseModel): - ingress: ETLSystem - egress: ETLSystem - xfer_action: EnumXferAction - transforms: List[object] - - \ No newline at end of file diff --git a/src/api/src/backend/views/http/tapis_etl.old.py b/src/api/src/backend/views/http/tapis_etl.old.py new file mode 100644 index 00000000..31c906ae --- /dev/null +++ b/src/api/src/backend/views/http/tapis_etl.old.py @@ -0,0 +1,90 @@ +from typing import List + +from pydantic import BaseModel, validator, Extra, root_validator, conlist + +from .requests import Pipeline + +from backend.views.http.etl import ( + EnumManifestGenerationPolicy, + EnumManifestPriority, + IOSystem +) + +class TapisIOBox(IOSystem): + writable_system_id: str = None + data_transfer_system_id: str = None + + @root_validator + def validate_system_ids(cls, values): + if ( + values.get("writable_system_id") == None + and values.get("data_transfer_system_id") == None + ): + raise ValueError("Must define one or both of the following properties: ['writable_system_id', 'data_transfer_system_id']") + + return values + +class TapisRemoteOutbox(TapisIOBox): + manifest_generation_policy: EnumManifestGenerationPolicy = None + manifest_priority: EnumManifestPriority = EnumManifestPriority.Oldest + data_path: str = "/ETL/REMOTE-OUTBOX/DATA" + manifests_path: str = "/ETL/REMOTE-OUTBOX/MANFIFESTS" + +class TapisLocalInbox(TapisIOBox): + manifest_generation_policy: EnumManifestGenerationPolicy = EnumManifestGenerationPolicy.AutoOnePerFile + manifest_priority: EnumManifestPriority = EnumManifestPriority.Oldest + data_path: str = "/ETL/LOCAL-INBOX/DATA" + manifests_path: str = "/ETL/LOCAL-INBOX/MANFIFESTS" + inbound_transfer_manifests_path: str = "/ETL/LOCAL-INBOX/" + +class TapisLocalOutbox(TapisIOBox): + manifest_generation_policy: EnumManifestGenerationPolicy = EnumManifestGenerationPolicy.AutoOneForAll + manifest_priority: EnumManifestPriority = EnumManifestPriority.Oldest + data_path: str = "/ETL/LOCAL-INBOX/DATA" + manifests_path: str = "/ETL/LOCAL-INBOX/MANFIFESTS" + +class TapisRemoteInbox(TapisIOBox): + manifest_generation_policy: EnumManifestGenerationPolicy = None + manifest_priority: EnumManifestPriority = None + data_path: str = "/ETL/REMOTE-INBOX/DATA" + manifests_path: str = "/ETL/REMOTE-INBOX/MANFIFESTS" + + +class TapisJobDef(BaseModel): + pass + +class TapisJobWorkflowsETL(BaseModel): + input: str + output: str + +class TapisJobWorkflowsExtension(BaseModel): + etl: TapisJobWorkflowsETL + +class ExetendedTapisJob(TapisJobDef): + workflows: TapisJobWorkflowsExtension = None + + class Config: + extra = Extra.allow + +ListOfStrMinOneItem = conlist(str, min_items=1) + +class ActionFilter(BaseModel): + pipeline_ids: ListOfStrMinOneItem = None + run_async: bool = True + +class TapisETLPipeline(Pipeline): + before: ActionFilter = None + remote_outbox: TapisRemoteOutbox + local_inbox: TapisLocalInbox + jobs: List[ExetendedTapisJob] + local_outbox: TapisLocalOutbox + remote_inbox: TapisRemoteInbox + after: ActionFilter = None + + @validator("jobs") + def one_or_more_jobs(cls, value): + # Check that the pipeline contains at least 1 tapis job definition + if len(value) < 1: + raise ValueError("A Tapis ETL pipeline must contain at least 1 Tapis Job definition") + + return value \ No newline at end of file diff --git a/src/api/src/backend/views/http/tapis_etl.py b/src/api/src/backend/views/http/tapis_etl.py index e26b5852..20c2c3c9 100644 --- a/src/api/src/backend/views/http/tapis_etl.py +++ b/src/api/src/backend/views/http/tapis_etl.py @@ -1,54 +1,67 @@ from typing import List -from pydantic import BaseModel, validator, Extra, root_validator +from pydantic import BaseModel, validator, Extra, root_validator, conlist from .requests import Pipeline from backend.views.http.etl import ( EnumManifestGenerationPolicy, EnumManifestPriority, + DataProfile, + ManifestsProfile, IOSystem ) -class TapisIOBox(IOSystem): - writable_system_id: str = None - data_transfer_system_id: str = None +class TapisIOSystemProfile(BaseModel): + system_id: str - @root_validator - def validate_system_ids(cls, values): - if ( - values.get("writable_system_id") == None - and values.get("data_transfer_system_id") == None - ): - raise ValueError("Must define one or both of the following properties: ['writable_system_id', 'data_transfer_system_id']") +class RODataProfile(TapisIOSystemProfile, DataProfile): + path: str = "/ETL/REMOTE-OUTBOX/DATA" - return values +class ROManifestsProfile(TapisIOSystemProfile, ManifestsProfile): + path: str = "/ETL/REMOTE-OUTBOX/MANFIFESTS" + generation_policy: EnumManifestGenerationPolicy = None + priority: EnumManifestPriority = EnumManifestPriority.Oldest -class TapisRemoteOutbox(TapisIOBox): - manifest_generation_policy: EnumManifestGenerationPolicy = None - manifest_priority: EnumManifestPriority = EnumManifestPriority.Oldest - data_path: str = "/ETL/REMOTE-OUTBOX/DATA" - manifests_path: str = "/ETL/REMOTE-OUTBOX/MANFIFESTS" - -class TapisLocalInbox(TapisIOBox): - manifest_generation_policy: EnumManifestGenerationPolicy = EnumManifestGenerationPolicy.AutoOnePerFile - manifest_priority: EnumManifestPriority = EnumManifestPriority.Oldest - data_path: str = "/ETL/LOCAL-INBOX/DATA" - manifests_path: str = "/ETL/LOCAL-INBOX/MANFIFESTS" - inbound_transfer_manifests_path: str = "/ETL/LOCAL-INBOX/" - -class TapisLocalOutbox(TapisIOBox): - manifest_generation_policy: EnumManifestGenerationPolicy = EnumManifestGenerationPolicy.AutoOneForAll - manifest_priority: EnumManifestPriority = EnumManifestPriority.Oldest - data_path: str = "/ETL/LOCAL-INBOX/DATA" - manifests_path: str = "/ETL/LOCAL-INBOX/MANFIFESTS" - -class TapisRemoteInbox(TapisIOBox): - manifest_generation_policy: EnumManifestGenerationPolicy = None - manifest_priority: EnumManifestPriority = None - data_path: str = "/ETL/REMOTE-INBOX/DATA" - manifests_path: str = "/ETL/REMOTE-INBOX/MANFIFESTS" +class RemoteOutbox(IOSystem): + data: RODataProfile + manifests: ROManifestsProfile +class LIDataProfile(TapisIOSystemProfile, DataProfile): + path: str = "/ETL/LOCAL-INBOX/DATA" + +class LIManifestsProfile(TapisIOSystemProfile, ManifestsProfile): + path: str = "/ETL/LOCAL-INBOX/MANFIFESTS" + generation_policy: EnumManifestGenerationPolicy = EnumManifestGenerationPolicy.AutoOnePerFile + priority: EnumManifestPriority = EnumManifestPriority.Oldest + +class LocalInbox(IOSystem): + data: LIDataProfile + manifests: LIManifestsProfile + +class LODataProfile(TapisIOSystemProfile, DataProfile): + path: str = "/ETL/LOCAL-OUTBOX/DATA" + +class LOManifestsProfile(TapisIOSystemProfile, ManifestsProfile): + path: str = "/ETL/LOCAL-OUTBOX/MANIFESTS" + generation_policy: EnumManifestGenerationPolicy = EnumManifestGenerationPolicy.AutoOneForAll + priority: EnumManifestPriority = EnumManifestPriority.Oldest + +class LocalOutbox(IOSystem): + data: LODataProfile + manifests: LOManifestsProfile + +class RIDataProfile(TapisIOSystemProfile, DataProfile): + path: str = "/ETL/REMOTE-INBOX/DATA" + +class RIManifestsProfile(TapisIOSystemProfile, ManifestsProfile): + path: str = "/ETL/REMOTE-INBOX/MANIFESTS" + generation_policy: EnumManifestGenerationPolicy = None + priority: EnumManifestPriority = None + +class RemoteInbox(IOSystem): + data: RIDataProfile + manifests: RIManifestsProfile class TapisJobDef(BaseModel): pass @@ -66,14 +79,20 @@ class ExetendedTapisJob(TapisJobDef): class Config: extra = Extra.allow +ListOfStrMinOneItem = conlist(str, min_items=1) + +class ActionFilter(BaseModel): + pipeline_ids: ListOfStrMinOneItem = None + run_async: bool = True + class TapisETLPipeline(Pipeline): - run_before: List[str] = [] - remote_outbox: TapisRemoteOutbox - local_inbox: TapisLocalInbox + before: ActionFilter = None + remote_outbox: RemoteOutbox + local_inbox: LocalInbox jobs: List[ExetendedTapisJob] - local_outbox: TapisLocalOutbox - remote_inbox: TapisRemoteInbox - run_after: List[str] = [] + local_outbox: LocalOutbox + remote_inbox: RemoteInbox + after: ActionFilter = None @validator("jobs") def one_or_more_jobs(cls, value): diff --git a/src/api/src/tests/TestTapisETLPipeline.py b/src/api/src/tests/TestTapisETLPipeline.py index 64ea1278..9d2660ba 100644 --- a/src/api/src/tests/TestTapisETLPipeline.py +++ b/src/api/src/tests/TestTapisETLPipeline.py @@ -22,10 +22,10 @@ def testETLEnvBuilderUtil(self): env = build_etl_pipeline_env(self.pipeline) assert type(env) == dict - assert json.loads(env.get("REMOTE_OUTBOX").get("value")).get("manifest_generation_policy") == None - assert json.loads(env.get("LOCAL_INBOX").get("value")).get("manifest_generation_policy") == "auto_one_per_file" - assert json.loads(env.get("LOCAL_OUTBOX").get("value")).get("manifest_generation_policy") == "auto_one_for_all" - assert json.loads(env.get("REMOTE_INBOX").get("value")).get("manifest_generation_policy") == None + assert json.loads(env.get("REMOTE_OUTBOX").get("value")).get("manifests").get("generation_policy") == "manual" + assert json.loads(env.get("LOCAL_INBOX").get("value")).get("manifests").get("generation_policy") == "auto_one_per_file" + assert json.loads(env.get("LOCAL_OUTBOX").get("value")).get("manifests").get("generation_policy") == "auto_one_for_all" + assert json.loads(env.get("REMOTE_INBOX").get("value")).get("manifests").get("generation_policy") == None if __name__ == "__main__": unittest.main() diff --git a/src/api/src/tests/fixtures/tapis-etl-pipeline.json b/src/api/src/tests/fixtures/tapis-etl-pipeline.json index 68e500f7..769aa936 100644 --- a/src/api/src/tests/fixtures/tapis-etl-pipeline.json +++ b/src/api/src/tests/fixtures/tapis-etl-pipeline.json @@ -1,39 +1,63 @@ { "id": "test-etl-ls6", + "before": {"pipeline_ids": ["test"]}, "remote_outbox": { - "writable_system_id": "test.etl.ls6.remote.outbox", - "data_transfer_system_id": "test.etl.ls6.remote.outbox.egress", - "data_path": "/ETL/REMOTE-OUTBOX/DATA", - "data_integrity_profile": { - "type": "done_file", - "done_files_path": "/ETL/REMOTE-OUTBOX/DATA", - "include_pattern": "*.md5" + "data": { + "system_id": "test.etl.ls6.xfer", + "path": "/ETL/REMOTE-OUTBOX/DATA", + "integrity_profile": { + "type": "done_file", + "done_files_path": "/ETL/REMOTE-OUTBOX/DATA", + "include_pattern": "*.md5", + "exclude_pattern": null + }, + "include_pattern": "*.txt", + "exclude_pattern": null }, - "manifest_geration_policy": "manual", - "manifest_priority": "oldest", - "manifests_path": "/ETL/REMOTE-OUTBOX/MANIFESTS", - "exclude_pattern": "*.md5" + "manifests": { + "system_id": "test.etl.ls6.writable", + "generation_policy": "manual", + "priority": "oldest", + "path": "/ETL/REMOTE-OUTBOX/MANIFESTS", + "include_pattern": null, + "exclude_pattern": null + } }, "local_inbox": { - "writable_system_id": "test.etl.ls6.local.inbox", - "data_transfer_system_id": "test.etl.ls6.local.inbox.ingress", - "data_path": "/ETL/LOCAL-INBOX/DATA", - "inbound_transfer_manifests_path": "ETL/LOCAL-INBOX/INBOUND-TRANSER-MANIFESTS", - "manifests_path": "/ETL/LOCAL-INBOX/MANIFESTS", - "exclude_pattern": "*.md5" + "data": { + "system_id": "test.etl.ls6.xfer", + "path": "/ETL/LOCAL-INBOX/DATA", + "include_pattern": null, + "exclude_pattern": null + }, + "manifests": { + "system_id": "test.etl.ls6.writable", + "path": "/ETL/LOCAL-INBOX/MANIFESTS", + "include_pattern": null, + "exclude_pattern": null + } }, "local_outbox": { - "writable_system_id": "test.etl.ls6.local.outbox", - "data_transfer_system_id": "test.etl.ls6.local.outbox", - "data_path": "/ETL/LOCAL-OUTBOX/DATA", - "manifests_path": "/ETL/LOCAL-OUTBOX/MANIFESTS" + "data": { + "system_id": "test.etl.ls6.xfer", + "path": "/ETL/LOCAL-OUTBOX/DATA" + }, + "manifests": { + "system_id": "test.etl.ls6.writable", + "path": "/ETL/LOCAL-OUTBOX/MANIFESTS" + } }, "remote_inbox": { - "writable_system_id": "test.etl.ls6.remote.inbox", - "data_transfer_system_id": "test.etl.ls6.remote.inbox.egress", - "data_path": "/ETL/REMOTE-INBOX/DATA", - "manifests_path": "/ETL/REMOTE-INBOX/MANIFESTS" + "data": { + "system_id": "test.etl.ls6.xfer", + "path": "/ETL/REMOTE-INBOX/DATA" + }, + "manifests": { + "system_id": "test.etl.ls6.writable", + "path": "/ETL/REMOTE-INBOX/MANIFESTS" + } }, + "after": null, "jobs": [ { "name": "string-transform",