Skip to content

Commit

Permalink
New ETL schema
Browse files Browse the repository at this point in the history
  • Loading branch information
nathandf committed Feb 26, 2024
1 parent 5a2f404 commit 3c87dd0
Show file tree
Hide file tree
Showing 7 changed files with 230 additions and 154 deletions.
24 changes: 12 additions & 12 deletions src/api/src/backend/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,25 @@ def build_etl_pipeline_env(body):
# Convert the data integrity policies to dicts. Easier
# to handle for null values via .get
local_inbox_data_integrity_profile = {}
if getattr(body.local_inbox, "data_integrity_profile", None) != None:
local_inbox_data_integrity_profile = body.local_inbox.data_integrity_profile.dict()
if getattr(body.local_inbox.data, "integrity_profile", None) != None:
local_inbox_data_integrity_profile = body.local_inbox.data.integrity_profile.dict()

local_outbox_data_integrity_profile = {}
if getattr(body.local_outbox, "data_integrity_profile", None) != None:
local_outbox_data_integrity_profile = body.local_outbox.data_integrity_profile.dict()
if getattr(body.local_outbox.data, "integrity_profile", None) != None:
local_outbox_data_integrity_profile = body.local_outbox.data.integrity_profile.dict()

remote_inbox_data_integrity_profile = {}
if getattr(body.remote_inbox, "data_integrity_profile", None) != None:
remote_inbox_data_integrity_profile = body.remote_inbox.data_integrity_profile.dict()
if getattr(body.remote_inbox.data, "integrity_profile", None) != None:
remote_inbox_data_integrity_profile = body.remote_inbox.data.integrity_profile.dict()

remote_outbox_data_integrity_profile = {}
if getattr(body.remote_outbox, "data_integrity_profile", None) != None:
remote_outbox_data_integrity_profile = body.remote_outbox.data_integrity_profile.dict()
if getattr(body.remote_outbox.data, "integrity_profile", None) != None:
remote_outbox_data_integrity_profile = body.remote_outbox.data.integrity_profile.dict()

body.local_inbox.data_integrity_profile = local_inbox_data_integrity_profile
body.local_outbox.data_integrity_profile = local_outbox_data_integrity_profile
body.remote_outbox.data_integrity_profile = remote_outbox_data_integrity_profile
body.remote_inbox.data_integrity_profile = remote_inbox_data_integrity_profile
body.local_inbox.data.integrity_profile = local_inbox_data_integrity_profile
body.local_outbox.data.integrity_profile = local_outbox_data_integrity_profile
body.remote_outbox.data.integrity_profile = remote_outbox_data_integrity_profile
body.remote_inbox.data.integrity_profile = remote_inbox_data_integrity_profile

return {
"REMOTE_OUTBOX": {
Expand Down
19 changes: 14 additions & 5 deletions src/api/src/backend/views/http/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,19 @@ class DoneFileDataIntegrityProfile(BaseDataIntegrityProfile):
Field(discriminator="type")
]

class IOSystem(BaseModel):
data_path: str
data_integrity_profile: DataIntegrityProfile = None
manifests_path: str = None
exclude_pattern: str = None
# Base model holding a required path plus two optional pattern strings.
class IOSystemProfile(BaseModel):
path: str
# Both patterns default to None. Their matching syntax (glob vs. regex) is
# not visible in this hunk -- TODO confirm with the consumer.
include_pattern: str = None
exclude_pattern: str = None

# IOSystemProfile used for the `data` field of IOSystem; adds an optional
# integrity profile on top of the inherited path/pattern fields.
class DataProfile(IOSystemProfile):
# Defaults to None when no integrity profile is supplied.
integrity_profile: DataIntegrityProfile = None

# IOSystemProfile used for the `manifests` field of IOSystem; adds manifest
# generation and priority settings to the inherited path/pattern fields.
class ManifestsProfile(IOSystemProfile):
# Both settings are optional and default to None.
generation_policy: EnumManifestGenerationPolicy = None
priority: EnumManifestPriority = None

# An I/O system is described by two required sub-profiles: one for its data
# and one for its manifests.
class IOSystem(BaseModel):
data: DataProfile
manifests: ManifestsProfile

66 changes: 0 additions & 66 deletions src/api/src/backend/views/http/etlV2.py

This file was deleted.

90 changes: 90 additions & 0 deletions src/api/src/backend/views/http/tapis_etl.old.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
from typing import List

from pydantic import BaseModel, validator, Extra, root_validator, conlist

from .requests import Pipeline

from backend.views.http.etl import (
EnumManifestGenerationPolicy,
EnumManifestPriority,
IOSystem
)

class TapisIOBox(IOSystem):
    # IOSystem extended with two optional system ids. Each defaults to None,
    # but validate_system_ids rejects a payload that leaves both unset.
    writable_system_id: str = None
    data_transfer_system_id: str = None

    @root_validator
    def validate_system_ids(cls, values):
        """Require at least one of the two system ids.

        Args:
            values: Mapping of field names to the values collected for the
                model.

        Returns:
            The unchanged ``values`` mapping.

        Raises:
            ValueError: If both ``writable_system_id`` and
                ``data_transfer_system_id`` are None or absent.
        """
        # Identity comparison is the correct None test; `== None` can be
        # fooled by objects that override __eq__.
        if (
            values.get("writable_system_id") is None
            and values.get("data_transfer_system_id") is None
        ):
            raise ValueError("Must define one or both of the following properties: ['writable_system_id', 'data_transfer_system_id']")

        return values

# Remote outbox model: a TapisIOBox with default paths and manifest settings.
class TapisRemoteOutbox(TapisIOBox):
# No generation policy by default; priority defaults to Oldest.
manifest_generation_policy: EnumManifestGenerationPolicy = None
manifest_priority: EnumManifestPriority = EnumManifestPriority.Oldest
data_path: str = "/ETL/REMOTE-OUTBOX/DATA"
# NOTE(review): "MANFIFESTS" looks like a misspelling of "MANIFESTS". The
# string is a runtime default, so confirm what exists on the target system
# before changing it.
manifests_path: str = "/ETL/REMOTE-OUTBOX/MANFIFESTS"

# Local inbox model: a TapisIOBox with default paths and manifest settings.
class TapisLocalInbox(TapisIOBox):
# Defaults: AutoOnePerFile generation policy, Oldest priority.
manifest_generation_policy: EnumManifestGenerationPolicy = EnumManifestGenerationPolicy.AutoOnePerFile
manifest_priority: EnumManifestPriority = EnumManifestPriority.Oldest
data_path: str = "/ETL/LOCAL-INBOX/DATA"
# NOTE(review): "MANFIFESTS" looks like a misspelling of "MANIFESTS". The
# string is a runtime default, so confirm what exists on the target system
# before changing it.
manifests_path: str = "/ETL/LOCAL-INBOX/MANFIFESTS"
# Only this box declares a separate path for inbound-transfer manifests.
inbound_transfer_manifests_path: str = "/ETL/LOCAL-INBOX/"

class TapisLocalOutbox(TapisIOBox):
    # Local outbox model: a TapisIOBox with default paths and manifest
    # settings. Defaults: AutoOneForAll generation policy, Oldest priority.
    manifest_generation_policy: EnumManifestGenerationPolicy = EnumManifestGenerationPolicy.AutoOneForAll
    manifest_priority: EnumManifestPriority = EnumManifestPriority.Oldest
    # These defaults previously pointed under /ETL/LOCAL-INBOX/ (copied from
    # TapisLocalInbox), which made the outbox share the inbox's directories.
    data_path: str = "/ETL/LOCAL-OUTBOX/DATA"
    manifests_path: str = "/ETL/LOCAL-OUTBOX/MANIFESTS"

# Remote inbox model: a TapisIOBox with default paths; both manifest settings
# default to None.
class TapisRemoteInbox(TapisIOBox):
manifest_generation_policy: EnumManifestGenerationPolicy = None
manifest_priority: EnumManifestPriority = None
data_path: str = "/ETL/REMOTE-INBOX/DATA"
# NOTE(review): "MANFIFESTS" looks like a misspelling of "MANIFESTS". The
# string is a runtime default, so confirm what exists on the target system
# before changing it.
manifests_path: str = "/ETL/REMOTE-INBOX/MANFIFESTS"


# Placeholder model with no declared fields; ExetendedTapisJob builds on it.
class TapisJobDef(BaseModel):
pass

# The `etl` section of a job's workflows extension: two required strings.
# NOTE(review): what `input` / `output` refer to (paths, ids, ...) is not
# visible here -- confirm against the consumer.
class TapisJobWorkflowsETL(BaseModel):
input: str
output: str

# Wrapper model whose single required field `etl` holds a TapisJobWorkflowsETL.
class TapisJobWorkflowsExtension(BaseModel):
etl: TapisJobWorkflowsETL

# TapisJobDef extended with an optional `workflows` section.
# NOTE(review): the class name is spelled "Exetended"; TapisETLPipeline.jobs
# references it by this name, so any rename must be coordinated.
class ExetendedTapisJob(TapisJobDef):
# Defaults to None when the job carries no workflows extension.
workflows: TapisJobWorkflowsExtension = None

# Extra.allow: fields not declared on the model are accepted and kept rather
# than rejected.
class Config:
extra = Extra.allow

# Constrained list type: a list of str that must contain at least one item.
ListOfStrMinOneItem = conlist(str, min_items=1)

# Model used by TapisETLPipeline for its optional `before` / `after` fields.
class ActionFilter(BaseModel):
# Optional (defaults to None); when supplied it must satisfy
# ListOfStrMinOneItem.
pipeline_ids: ListOfStrMinOneItem = None
# Defaults to True.
run_async: bool = True

class TapisETLPipeline(Pipeline):
    # Pipeline specialised for Tapis ETL: four I/O boxes, the job list, and
    # optional `before` / `after` action filters (both default to None).
    before: ActionFilter = None
    remote_outbox: TapisRemoteOutbox
    local_inbox: TapisLocalInbox
    jobs: List[ExetendedTapisJob]
    local_outbox: TapisLocalOutbox
    remote_inbox: TapisRemoteInbox
    after: ActionFilter = None

    @validator("jobs")
    def one_or_more_jobs(cls, value):
        """Reject a pipeline whose ``jobs`` list is empty.

        Returns:
            The validated list, unchanged.

        Raises:
            ValueError: If no Tapis job definition was supplied.
        """
        if value:
            return value

        # An empty list is the only way to reach this point.
        raise ValueError("A Tapis ETL pipeline must contain at least 1 Tapis Job definition")
103 changes: 61 additions & 42 deletions src/api/src/backend/views/http/tapis_etl.py
Original file line number Diff line number Diff line change
@@ -1,54 +1,67 @@
from typing import List

from pydantic import BaseModel, validator, Extra, root_validator
from pydantic import BaseModel, validator, Extra, root_validator, conlist

from .requests import Pipeline

from backend.views.http.etl import (
EnumManifestGenerationPolicy,
EnumManifestPriority,
DataProfile,
ManifestsProfile,
IOSystem
)

class TapisIOBox(IOSystem):
writable_system_id: str = None
data_transfer_system_id: str = None
# Mixin-style model contributing a single required `system_id` field; it is
# combined with DataProfile / ManifestsProfile by the profile classes below.
class TapisIOSystemProfile(BaseModel):
system_id: str

@root_validator
def validate_system_ids(cls, values):
if (
values.get("writable_system_id") == None
and values.get("data_transfer_system_id") == None
):
raise ValueError("Must define one or both of the following properties: ['writable_system_id', 'data_transfer_system_id']")
# Data profile of the remote outbox: DataProfile plus a required system_id.
class RODataProfile(TapisIOSystemProfile, DataProfile):
# Re-declares `path` with a default value.
path: str = "/ETL/REMOTE-OUTBOX/DATA"

return values
class ROManifestsProfile(TapisIOSystemProfile, ManifestsProfile):
    # Manifests profile of the remote outbox: ManifestsProfile plus a
    # required system_id.
    # The default path was spelled "MANFIFESTS"; it now matches the
    # "MANIFESTS" spelling used by LOManifestsProfile and RIManifestsProfile.
    path: str = "/ETL/REMOTE-OUTBOX/MANIFESTS"
    # No generation policy by default; priority defaults to Oldest.
    generation_policy: EnumManifestGenerationPolicy = None
    priority: EnumManifestPriority = EnumManifestPriority.Oldest

class TapisRemoteOutbox(TapisIOBox):
manifest_generation_policy: EnumManifestGenerationPolicy = None
manifest_priority: EnumManifestPriority = EnumManifestPriority.Oldest
data_path: str = "/ETL/REMOTE-OUTBOX/DATA"
manifests_path: str = "/ETL/REMOTE-OUTBOX/MANFIFESTS"

class TapisLocalInbox(TapisIOBox):
manifest_generation_policy: EnumManifestGenerationPolicy = EnumManifestGenerationPolicy.AutoOnePerFile
manifest_priority: EnumManifestPriority = EnumManifestPriority.Oldest
data_path: str = "/ETL/LOCAL-INBOX/DATA"
manifests_path: str = "/ETL/LOCAL-INBOX/MANFIFESTS"
inbound_transfer_manifests_path: str = "/ETL/LOCAL-INBOX/"

class TapisLocalOutbox(TapisIOBox):
manifest_generation_policy: EnumManifestGenerationPolicy = EnumManifestGenerationPolicy.AutoOneForAll
manifest_priority: EnumManifestPriority = EnumManifestPriority.Oldest
data_path: str = "/ETL/LOCAL-INBOX/DATA"
manifests_path: str = "/ETL/LOCAL-INBOX/MANFIFESTS"

class TapisRemoteInbox(TapisIOBox):
manifest_generation_policy: EnumManifestGenerationPolicy = None
manifest_priority: EnumManifestPriority = None
data_path: str = "/ETL/REMOTE-INBOX/DATA"
manifests_path: str = "/ETL/REMOTE-INBOX/MANFIFESTS"
# IOSystem for the remote outbox; narrows both sub-profiles to the
# remote-outbox-specific classes.
class RemoteOutbox(IOSystem):
data: RODataProfile
manifests: ROManifestsProfile

# Data profile of the local inbox: DataProfile plus a required system_id.
class LIDataProfile(TapisIOSystemProfile, DataProfile):
# Re-declares `path` with a default value.
path: str = "/ETL/LOCAL-INBOX/DATA"

class LIManifestsProfile(TapisIOSystemProfile, ManifestsProfile):
    # Manifests profile of the local inbox: ManifestsProfile plus a required
    # system_id.
    # The default path was spelled "MANFIFESTS"; it now matches the
    # "MANIFESTS" spelling used by LOManifestsProfile and RIManifestsProfile.
    path: str = "/ETL/LOCAL-INBOX/MANIFESTS"
    # Defaults: AutoOnePerFile generation policy, Oldest priority.
    generation_policy: EnumManifestGenerationPolicy = EnumManifestGenerationPolicy.AutoOnePerFile
    priority: EnumManifestPriority = EnumManifestPriority.Oldest

# IOSystem for the local inbox; narrows both sub-profiles to the
# local-inbox-specific classes.
class LocalInbox(IOSystem):
data: LIDataProfile
manifests: LIManifestsProfile

# Data profile of the local outbox: DataProfile plus a required system_id.
class LODataProfile(TapisIOSystemProfile, DataProfile):
# Re-declares `path` with a default value.
path: str = "/ETL/LOCAL-OUTBOX/DATA"

# Manifests profile of the local outbox: ManifestsProfile plus a required
# system_id.
class LOManifestsProfile(TapisIOSystemProfile, ManifestsProfile):
path: str = "/ETL/LOCAL-OUTBOX/MANIFESTS"
# Defaults: AutoOneForAll generation policy, Oldest priority.
generation_policy: EnumManifestGenerationPolicy = EnumManifestGenerationPolicy.AutoOneForAll
priority: EnumManifestPriority = EnumManifestPriority.Oldest

# IOSystem for the local outbox; narrows both sub-profiles to the
# local-outbox-specific classes.
class LocalOutbox(IOSystem):
data: LODataProfile
manifests: LOManifestsProfile

# Data profile of the remote inbox: DataProfile plus a required system_id.
class RIDataProfile(TapisIOSystemProfile, DataProfile):
# Re-declares `path` with a default value.
path: str = "/ETL/REMOTE-INBOX/DATA"

# Manifests profile of the remote inbox: ManifestsProfile plus a required
# system_id.
class RIManifestsProfile(TapisIOSystemProfile, ManifestsProfile):
path: str = "/ETL/REMOTE-INBOX/MANIFESTS"
# Neither a generation policy nor a priority is set by default.
generation_policy: EnumManifestGenerationPolicy = None
priority: EnumManifestPriority = None

# IOSystem for the remote inbox; narrows both sub-profiles to the
# remote-inbox-specific classes.
class RemoteInbox(IOSystem):
data: RIDataProfile
manifests: RIManifestsProfile

# Placeholder model with no declared fields.
class TapisJobDef(BaseModel):
pass
Expand All @@ -66,14 +79,20 @@ class ExetendedTapisJob(TapisJobDef):
class Config:
extra = Extra.allow

# Constrained list type: a list of str that must contain at least one item.
ListOfStrMinOneItem = conlist(str, min_items=1)

# Model used by TapisETLPipeline for its optional `before` / `after` fields.
class ActionFilter(BaseModel):
# Optional (defaults to None); when supplied it must satisfy
# ListOfStrMinOneItem.
pipeline_ids: ListOfStrMinOneItem = None
# Defaults to True.
run_async: bool = True

class TapisETLPipeline(Pipeline):
run_before: List[str] = []
remote_outbox: TapisRemoteOutbox
local_inbox: TapisLocalInbox
before: ActionFilter = None
remote_outbox: RemoteOutbox
local_inbox: LocalInbox
jobs: List[ExetendedTapisJob]
local_outbox: TapisLocalOutbox
remote_inbox: TapisRemoteInbox
run_after: List[str] = []
local_outbox: LocalOutbox
remote_inbox: RemoteInbox
after: ActionFilter = None

@validator("jobs")
def one_or_more_jobs(cls, value):
Expand Down
8 changes: 4 additions & 4 deletions src/api/src/tests/TestTapisETLPipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ def testETLEnvBuilderUtil(self):
env = build_etl_pipeline_env(self.pipeline)

assert type(env) == dict
assert json.loads(env.get("REMOTE_OUTBOX").get("value")).get("manifest_generation_policy") == None
assert json.loads(env.get("LOCAL_INBOX").get("value")).get("manifest_generation_policy") == "auto_one_per_file"
assert json.loads(env.get("LOCAL_OUTBOX").get("value")).get("manifest_generation_policy") == "auto_one_for_all"
assert json.loads(env.get("REMOTE_INBOX").get("value")).get("manifest_generation_policy") == None
assert json.loads(env.get("REMOTE_OUTBOX").get("value")).get("manifests").get("generation_policy") == "manual"
assert json.loads(env.get("LOCAL_INBOX").get("value")).get("manifests").get("generation_policy") == "auto_one_per_file"
assert json.loads(env.get("LOCAL_OUTBOX").get("value")).get("manifests").get("generation_policy") == "auto_one_for_all"
assert json.loads(env.get("REMOTE_INBOX").get("value")).get("manifests").get("generation_policy") == None

if __name__ == "__main__":
unittest.main()
Expand Down
Loading

0 comments on commit 3c87dd0

Please sign in to comment.