diff --git a/src/api/src/backend/utils/__init__.py b/src/api/src/backend/utils/__init__.py index ecc90063..13e9a1bb 100644 --- a/src/api/src/backend/utils/__init__.py +++ b/src/api/src/backend/utils/__init__.py @@ -41,223 +41,28 @@ def build_etl_pipeline_env(body): remote_outbox_data_integrity_profile = {} if getattr(body.remote_outbox, "data_integrity_profile", None) != None: remote_outbox_data_integrity_profile = body.remote_outbox.data_integrity_profile.dict() - - return { - "REMOTE_OUTBOX": json.dumps(body.remote_outbox.dict()), - "LOCAL_INBOX": json.dumps(body.local_inbox.dict()), - "LOCAL_OUTBOX": json.dumps(body.local_outbox.dict()), - "REMOTE_INBOX": json.dumps(body.remote_inbox.dict()), - "REMOTE_OUTBOX_SYSTEM_ID": { - "type": "string", - "value": body.remote_outbox.system_id - }, - "REMOTE_OUTBOX_DATA_PATH": { - "type": "string", - "value": body.remote_outbox.data_path - }, - "REMOTE_OUTBOX_INCLUDE_PATTERN": { - "type": "string", - "value": body.remote_outbox.include_pattern - }, - "REMOTE_OUTBOX_EXCLUDE_PATTERN": { - "type": "string", - "value": body.remote_outbox.exclude_pattern - }, - "REMOTE_OUTBOX_MANIFESTS_PATH": { - "type": "string", - "value": body.remote_outbox.manifests_path - }, - "REMOTE_OUTBOX_MANIFEST_GENERATION_POLICY": { - "type": "string", - "value": body.remote_outbox.manifest_generation_policy - }, - "REMOTE_OUTBOX_MANIFEST_PRIORITY": { - "type": "string", - "value": body.remote_outbox.manifest_priority - }, - "REMOTE_OUTBOX_DATA_INTEGRITY_TYPE": { - "type": "string", - "value": remote_outbox_data_integrity_profile.get( - "type", - None - ) - }, - "REMOTE_OUTBOX_DATA_INTEGRITY_DONE_FILES_PATH": { - "type": "string", - "value": remote_outbox_data_integrity_profile.get( - "done_files_path", - None - ) - }, - "REMOTE_OUTBOX_DATA_INTEGRITY_DONE_FILE_INCLUDE_PATTERN": { - "type": "string", - "value": remote_outbox_data_integrity_profile.get( - "include_pattern", - None - ) - }, - "REMOTE_OUTBOX_DATA_INTEGRITY_DONE_FILE_EXCLUDE_PATTERN": { - "type": "string", - "value": remote_outbox_data_integrity_profile.get( - "exclude_pattern", - None - ) - }, + body.local_inbox.data_integrity_profile = local_inbox_data_integrity_profile + body.local_outbox.data_integrity_profile = local_outbox_data_integrity_profile + body.remote_outbox.data_integrity_profile = remote_outbox_data_integrity_profile + body.remote_inbox.data_integrity_profile = remote_inbox_data_integrity_profile - "LOCAL_INBOX_SYSTEM_ID": { - "type": "string", - "value": body.local_inbox.system_id - }, - "LOCAL_INBOX_DATA_PATH": { - "type": "string", - "value": body.local_inbox.data_path - }, - "LOCAL_INBOX_INCLUDE_PATTERN": { - "type": "string", - "value": body.local_inbox.include_pattern - }, - "LOCAL_INBOX_EXCLUDE_PATTERN": { - "type": "string", - "value": body.local_inbox.exclude_pattern - }, - "LOCAL_INBOX_MANIFESTS_PATH": { - "type": "string", - "value": body.local_inbox.manifests_path - }, - "LOCAL_INBOX_DATA_INTEGRITY_TYPE": { - "type": "string", - "value": local_inbox_data_integrity_profile.get( - "type", - None - ) - }, - "LOCAL_INBOX_DATA_INTEGRITY_DONE_FILES_PATH": { - "type": "string", - "value": local_inbox_data_integrity_profile.get( - "done_files_path", - None - ) - }, - "LOCAL_INBOX_DATA_INTEGRITY_DONE_FILE_INCLUDE_PATTERN": { - "type": "string", - "value": local_inbox_data_integrity_profile.get( - "include_pattern", - None - ) - }, - "LOCAL_INBOX_DATA_INTEGRITY_DONE_FILE_EXCLUDE_PATTERN": { - "type": "string", - "value": local_inbox_data_integrity_profile.get( - "exclude_pattern", - None - ) - }, - - "LOCAL_OUTBOX_SYSTEM_ID": { - "type": "string", - "value": body.local_outbox.system_id - }, - "LOCAL_OUTBOX_DATA_PATH": { - "type": "string", - "value": body.local_outbox.data_path - }, - "LOCAL_OUTBOX_INCLUDE_PATTERN": { - "type": "string", - "value": body.local_outbox.include_pattern - }, - "LOCAL_OUTBOX_EXCLUDE_PATTERN": { - "type": "string", - "value": body.local_outbox.exclude_pattern - }, - "LOCAL_OUTBOX_MANIFESTS_PATH": { - "type": "string", - "value": body.local_outbox.manifests_path - }, - "LOCAL_OUTBOX_MANIFEST_GENERATION_POLICY": { - "type": "string", - "value": body.local_outbox.manifest_generation_policy - }, - "LOCAL_OUTBOX_MANIFEST_PRIORITY": { - "type": "string", - "value": body.local_outbox.manifest_priority - }, - "LOCAL_OUTBOX_DATA_INTEGRITY_TYPE": { - "type": "string", - "value": local_outbox_data_integrity_profile.get( - "type", - None - ) - }, - "LOCAL_OUTBOX_DATA_INTEGRITY_DONE_FILES_PATH": { - "type": "string", - "value": local_outbox_data_integrity_profile.get( - "done_files_path", - None - ) - }, - "LOCAL_OUTBOX_DATA_INTEGRITY_DONE_FILE_INCLUDE_PATTERN": { - "type": "string", - "value": local_outbox_data_integrity_profile.get( - "include_pattern", - None - ) - }, - "LOCAL_OUTBOX_DATA_INTEGRITY_DONE_FILE_EXCLUDE_PATTERN": { - "type": "string", - "value": local_outbox_data_integrity_profile.get( - "exclude_pattern", - None - ) - }, - - "REMOTE_INBOX_SYSTEM_ID": { - "type": "string", - "value": body.remote_inbox.system_id - }, - "REMOTE_INBOX_DATA_PATH": { - "type": "string", - "value": body.remote_inbox.data_path - }, - "REMOTE_INBOX_INCLUDE_PATTERN": { - "type": "string", - "value": body.remote_inbox.include_pattern - }, - "REMOTE_INBOX_EXCLUDE_PATTERN": { - "type": "string", - "value": body.remote_inbox.exclude_pattern - }, - "REMOTE_INBOX_MANIFESTS_PATH": { - "type": "string", - "value": body.remote_inbox.manifests_path - }, - "REMOTE_INBOX_DATA_INTEGRITY_TYPE": { + return { + "REMOTE_OUTBOX": { "type": "string", - "value": remote_inbox_data_integrity_profile.get( - "type", - None - ) + "value": json.dumps(body.remote_outbox.dict()) }, - "REMOTE_INBOX_DATA_INTEGRITY_DONE_FILES_PATH": { + "LOCAL_INBOX": { "type": "string", - "value": remote_inbox_data_integrity_profile.get( - "done_files_path", - None - ) + "value": json.dumps(body.local_inbox.dict()) }, - "REMOTE_INBOX_DATA_INTEGRITY_DONE_FILE_INCLUDE_PATTERN": { + "LOCAL_OUTBOX": { "type": "string", - "value": remote_inbox_data_integrity_profile.get( - "include_pattern", - None - ) + "value": json.dumps(body.local_outbox.dict()) }, - "REMOTE_INBOX_DATA_INTEGRITY_DONE_FILE_EXCLUDE_PATTERN": { + "REMOTE_INBOX": { "type": "string", - "value": remote_inbox_data_integrity_profile.get( - "exclude_pattern", - None - ) + "value": json.dumps(body.remote_inbox.dict()) }, } diff --git a/src/api/src/backend/views/http/etl.py b/src/api/src/backend/views/http/etl.py index 2326441b..d956b85b 100644 --- a/src/api/src/backend/views/http/etl.py +++ b/src/api/src/backend/views/http/etl.py @@ -40,31 +40,11 @@ class DoneFileDataIntegrityProfile(BaseDataIntegrityProfile): Field(discriminator="type") ] -class IOBox(BaseModel): +class IOSystem(BaseModel): + writable: bool = True data_path: str data_integrity_profile: DataIntegrityProfile = None manifests_path: str = None exclude_pattern: str = None include_pattern: str = None -class DataEgress(IOBox): - manifest_generation_policy: EnumManifestGenerationPolicy = EnumManifestGenerationPolicy.AutoOnePerFile - manifest_priority: EnumManifestPriority = EnumManifestPriority.Oldest - -class DataIngress(IOBox): - pass - -class RemoteOutbox(DataEgress): - manifest_generation_policy: EnumManifestGenerationPolicy = EnumManifestGenerationPolicy.AutoOnePerFile - manifest_priority: EnumManifestPriority = EnumManifestPriority.Oldest - -class LocalInbox(DataIngress): - pass - -class LocalOutbox(DataEgress): - manifest_generation_policy: EnumManifestGenerationPolicy = EnumManifestGenerationPolicy.AutoOneForAll - manifest_priority: EnumManifestPriority = None - -class RemoteInbox(DataIngress): - pass - diff --git a/src/api/src/backend/views/http/etlV2.py b/src/api/src/backend/views/http/etlV2.py new file mode 100644 index 00000000..440b7f0b --- /dev/null +++ b/src/api/src/backend/views/http/etlV2.py @@ -0,0 +1,66 @@ +from enum import Enum +from typing import Union, Literal, Annotated, List + +from pydantic import BaseModel, Field + +from .requests import _EnumMeta + + +class EnumManifestGenerationPolicy(str, Enum, metaclass=_EnumMeta): + Manual = "manual" + AutoOnePerFile = "auto_one_per_file" + AutoOneForAll = "auto_one_for_all" + +class EnumManifestPriority(str, Enum, metaclass=_EnumMeta): + Oldest = "oldest" + Newest = "newest" + Any = "any" + +class BaseDataIntegrityProfile(BaseModel): + type: Literal["checksum", "byte_check", "done_file"] + +class ChecksumDataIntegrityProfile(BaseDataIntegrityProfile): + type: Literal["checksum"] + +class ByteCheckDataIntegrityProfile(BaseDataIntegrityProfile): + type: Literal["byte_check"] + +class DoneFileDataIntegrityProfile(BaseDataIntegrityProfile): + type: Literal["done_file"] + done_files_path: str + include_pattern: str = None + exclude_pattern: str = None + +DataIntegrityProfile = Annotated[ + Union[ + ChecksumDataIntegrityProfile, + ByteCheckDataIntegrityProfile, + DoneFileDataIntegrityProfile + ], + Field(discriminator="type") +] + +class IOProfile(BaseModel): + data_path: str + data_integrity_profile: DataIntegrityProfile = None + manifests_path: str = None + exclude_pattern: str = None + include_pattern: str = None + +class ETLSystem(BaseModel): + writable: bool = True + ingress_profile: IOProfile + egress_profile: IOProfile + +class EnumXferAction(str, Enum, metaclass=_EnumMeta): + PUSH = "push" + PULL = "pull" + NOOP = "noop" + +class ETLCycle(BaseModel): + ingress: ETLSystem + egress: ETLSystem + xfer_action: EnumXferAction + transforms: List[object] + + \ No newline at end of file diff --git a/src/api/src/backend/views/http/tapis_etl.py b/src/api/src/backend/views/http/tapis_etl.py index d055f56f..0e4958ef 100644 --- a/src/api/src/backend/views/http/tapis_etl.py +++ b/src/api/src/backend/views/http/tapis_etl.py @@ -5,27 +5,44 @@ from .requests import Pipeline from backend.views.http.etl import ( - LocalInbox, - LocalOutbox, - RemoteInbox, - RemoteOutbox + EnumManifestGenerationPolicy, + EnumManifestPriority, + IOSystem ) + - -class TapisIOBox(BaseModel): +class TapisIOBox(IOSystem): system_id: str -class TapisRemoteInbox(TapisIOBox, RemoteInbox): - pass +class TapisLocalIOBox(IOSystem): + writable_system_id: str + data_transfer_system_id: str -class TapisLocalInbox(TapisIOBox, LocalInbox): - pass +class TapisRemoteOutbox(TapisIOBox): + manifest_generation_policy: EnumManifestGenerationPolicy = None + manifest_priority: EnumManifestPriority = EnumManifestPriority.Oldest + data_path: str = "/ETL/REMOTE-OUTBOX/DATA" + manifests_path: str = "/ETL/REMOTE-OUTBOX/MANFIFESTS" -class TapisLocalOutbox(TapisIOBox, LocalOutbox): - pass +class TapisLocalInbox(TapisLocalIOBox): + manifest_generation_policy: EnumManifestGenerationPolicy = EnumManifestGenerationPolicy.AutoOnePerFile + manifest_priority: EnumManifestPriority = EnumManifestPriority.Oldest + data_path: str = "/ETL/LOCAL-INBOX/DATA" + manifests_path: str = "/ETL/LOCAL-INBOX/MANFIFESTS" + inbound_transfer_manifests_path: str = "/ETL/LOCAL-INBOX/" + +class TapisLocalOutbox(TapisLocalIOBox): + manifest_generation_policy: EnumManifestGenerationPolicy = EnumManifestGenerationPolicy.AutoOneForAll + manifest_priority: EnumManifestPriority = EnumManifestPriority.Oldest + data_path: str = "/ETL/LOCAL-INBOX/DATA" + manifests_path: str = "/ETL/LOCAL-INBOX/MANFIFESTS" + +class TapisRemoteInbox(TapisIOBox): + manifest_generation_policy: EnumManifestGenerationPolicy = None + manifest_priority: EnumManifestPriority = None + data_path: str = "/ETL/REMOTE-INBOX/DATA" + manifests_path: str = "/ETL/REMOTE-INBOX/MANFIFESTS" -class TapisRemoteOutbox(TapisIOBox, RemoteOutbox): - pass class TapisJobDef(BaseModel): pass @@ -44,11 +61,13 @@ class Config: extra = Extra.allow class TapisETLPipeline(Pipeline): + run_before: List[str] = [] remote_outbox: TapisRemoteOutbox local_inbox: TapisLocalInbox jobs: List[ExetendedTapisJob] local_outbox: TapisLocalOutbox remote_inbox: TapisRemoteInbox + run_after: List[str] = [] @validator("jobs") def one_or_more_jobs(cls, value): diff --git a/src/api/src/tests/TestImports.py b/src/api/src/tests/TestImports.py index 20d0cb8c..15d690f0 100644 --- a/src/api/src/tests/TestImports.py +++ b/src/api/src/tests/TestImports.py @@ -3,15 +3,10 @@ class TestImports(unittest.TestCase): def testETLImports(self): - from backend.views.http.tapis_etl import ( - RemoteInbox, - RemoteOutbox, - LocalInbox, - LocalOutbox - ) + import backend.views.http.etl def testTapisETLImports(self): - from backend.views.http.tapis_etl import TapisETLPipeline + import backend.views.http.tapis_etl if __name__ == "__main__": unittest.main() diff --git a/src/api/src/tests/TestTapisETLPipeline.py b/src/api/src/tests/TestTapisETLPipeline.py index 1db2d2b1..64ea1278 100644 --- a/src/api/src/tests/TestTapisETLPipeline.py +++ b/src/api/src/tests/TestTapisETLPipeline.py @@ -13,21 +13,19 @@ def testInstance(self): assert type(self.pipeline) == TapisETLPipeline def testIOBoxToDictSerialization(self): - serialized = { - "REMOTE_OUTBOX": json.dumps(self.pipeline.remote_outbox.dict()), - "LOCAL_INBOX": json.dumps(self.pipeline.local_inbox.dict()), - "LOCAL_OUTBOX": json.dumps(self.pipeline.local_outbox.dict()), - "REMOTE_INBOX": json.dumps(self.pipeline.remote_inbox.dict()), - } + json.dumps(self.pipeline.remote_outbox.dict()) + json.dumps(self.pipeline.local_inbox.dict()) + json.dumps(self.pipeline.local_outbox.dict()) + json.dumps(self.pipeline.remote_inbox.dict()) def testETLEnvBuilderUtil(self): env = build_etl_pipeline_env(self.pipeline) assert type(env) == dict - assert env.get("REMOTE_OUTBOX_MANIFEST_GENERATION_POLICY").get("value") == "auto_one_per_file" - assert env.get("LOCAL_INBOX_MANIFEST_GENERATION_POLICY") == None - assert env.get("LOCAL_OUTBOX_MANIFEST_GENERATION_POLICY").get("value") == "auto_one_for_all" - assert env.get("REMOTE_INBOX_MANIFEST_GENERATION_POLICY") == None + assert json.loads(env.get("REMOTE_OUTBOX").get("value")).get("manifest_generation_policy") == None + assert json.loads(env.get("LOCAL_INBOX").get("value")).get("manifest_generation_policy") == "auto_one_per_file" + assert json.loads(env.get("LOCAL_OUTBOX").get("value")).get("manifest_generation_policy") == "auto_one_for_all" + assert json.loads(env.get("REMOTE_INBOX").get("value")).get("manifest_generation_policy") == None if __name__ == "__main__": unittest.main() diff --git a/src/api/src/tests/fixtures/tapis-etl-pipeline.json b/src/api/src/tests/fixtures/tapis-etl-pipeline.json index 68f13cfb..849d49cf 100644 --- a/src/api/src/tests/fixtures/tapis-etl-pipeline.json +++ b/src/api/src/tests/fixtures/tapis-etl-pipeline.json @@ -1,32 +1,38 @@ { "id": "test-etl-ls6", "remote_outbox": { - "system_id": "test.etl.ls6.remote.inbox", - "data_path": "/ETL/REMOTE-INBOX/DATA", + "system_id": "test.etl.ls6.remote.outbox", + "data_path": "/ETL/REMOTE-BOX/DATA", "data_integrity_profile": { "type": "done_file", - "done_files_path": "/ETL/REMOTE-INBOX/DATA", + "done_files_path": "/ETL/LOCAL-INBOX/DATA", "include_pattern": "*.md5" }, - "manifests_path": "/ETL/REMOTE-INBOX/MANIFESTS", + "manifests_path": "/ETL/REMOTE-OUTBOX/MANIFESTS", "exclude_pattern": "*.md5" }, "local_inbox": { - "system_id": "test.etl.ls6.local.inbox", + "writable_system_id": "test.etl.ls6.local.inbox", + "data_transfer_system_id": "test.etl.ls6.local.inbox", "data_path": "/ETL/LOCAL-INBOX/DATA", - "data_integrity_profile": null, + "data_integrity_profile": { + "type": "done_file", + "done_files_path": "/ETL/LOCAL-INBOX/DATA", + "include_pattern": "*.md5" + }, "manifests_path": "/ETL/LOCAL-INBOX/MANIFESTS", "exclude_pattern": "*.md5" }, "local_outbox": { - "system_id": "test.etl.ls6.local.outbox", + "writable_system_id": "test.etl.ls6.local.outbox", + "data_transfer_system_id": "test.etl.ls6.local.outbox", "data_path": "/ETL/LOCAL-OUTBOX/DATA", "manifests_path": "/ETL/LOCAL-OUTBOX/MANIFESTS" }, "remote_inbox": { - "system_id": "test.etl.ls6.remote.inbox", "data_path": "/ETL/REMOTE-INBOX/DATA", - "manifests_path": "/ETL/REMOTE-INBOX/MANIFESTS" + "manifests_path": "/ETL/REMOTE-INBOX/MANIFESTS", + "system_id": "test.etl.ls6.remote.inbox" }, "jobs": [ {