Skip to content

Commit

Permalink
Add unit tests. Update etl schema
Browse files Browse the repository at this point in the history
  • Loading branch information
nathandf committed Feb 23, 2024
1 parent f076491 commit 29da043
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 270 deletions.
221 changes: 13 additions & 208 deletions src/api/src/backend/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,223 +41,28 @@ def build_etl_pipeline_env(body):
remote_outbox_data_integrity_profile = {}
if getattr(body.remote_outbox, "data_integrity_profile", None) != None:
remote_outbox_data_integrity_profile = body.remote_outbox.data_integrity_profile.dict()

return {
"REMOTE_OUTBOX": json.dumps(body.remote_outbox.dict()),
"LOCAL_INBOX": json.dumps(body.local_inbox.dict()),
"LOCAL_OUTBOX": json.dumps(body.local_outbox.dict()),
"REMOTE_INBOX": json.dumps(body.remote_inbox.dict()),

"REMOTE_OUTBOX_SYSTEM_ID": {
"type": "string",
"value": body.remote_outbox.system_id
},
"REMOTE_OUTBOX_DATA_PATH": {
"type": "string",
"value": body.remote_outbox.data_path
},
"REMOTE_OUTBOX_INCLUDE_PATTERN": {
"type": "string",
"value": body.remote_outbox.include_pattern
},
"REMOTE_OUTBOX_EXCLUDE_PATTERN": {
"type": "string",
"value": body.remote_outbox.exclude_pattern
},
"REMOTE_OUTBOX_MANIFESTS_PATH": {
"type": "string",
"value": body.remote_outbox.manifests_path
},
"REMOTE_OUTBOX_MANIFEST_GENERATION_POLICY": {
"type": "string",
"value": body.remote_outbox.manifest_generation_policy
},
"REMOTE_OUTBOX_MANIFEST_PRIORITY": {
"type": "string",
"value": body.remote_outbox.manifest_priority
},
"REMOTE_OUTBOX_DATA_INTEGRITY_TYPE": {
"type": "string",
"value": remote_outbox_data_integrity_profile.get(
"type",
None
)
},
"REMOTE_OUTBOX_DATA_INTEGRITY_DONE_FILES_PATH": {
"type": "string",
"value": remote_outbox_data_integrity_profile.get(
"done_files_path",
None
)
},
"REMOTE_OUTBOX_DATA_INTEGRITY_DONE_FILE_INCLUDE_PATTERN": {
"type": "string",
"value": remote_outbox_data_integrity_profile.get(
"include_pattern",
None
)
},
"REMOTE_OUTBOX_DATA_INTEGRITY_DONE_FILE_EXCLUDE_PATTERN": {
"type": "string",
"value": remote_outbox_data_integrity_profile.get(
"exclude_pattern",
None
)
},
body.local_inbox.data_integrity_profile = local_inbox_data_integrity_profile
body.local_outbox.data_integrity_profile = local_outbox_data_integrity_profile
body.remote_outbox.data_integrity_profile = remote_outbox_data_integrity_profile
body.remote_inbox.data_integrity_profile = remote_inbox_data_integrity_profile

"LOCAL_INBOX_SYSTEM_ID": {
"type": "string",
"value": body.local_inbox.system_id
},
"LOCAL_INBOX_DATA_PATH": {
"type": "string",
"value": body.local_inbox.data_path
},
"LOCAL_INBOX_INCLUDE_PATTERN": {
"type": "string",
"value": body.local_inbox.include_pattern
},
"LOCAL_INBOX_EXCLUDE_PATTERN": {
"type": "string",
"value": body.local_inbox.exclude_pattern
},
"LOCAL_INBOX_MANIFESTS_PATH": {
"type": "string",
"value": body.local_inbox.manifests_path
},
"LOCAL_INBOX_DATA_INTEGRITY_TYPE": {
"type": "string",
"value": local_inbox_data_integrity_profile.get(
"type",
None
)
},
"LOCAL_INBOX_DATA_INTEGRITY_DONE_FILES_PATH": {
"type": "string",
"value": local_inbox_data_integrity_profile.get(
"done_files_path",
None
)
},
"LOCAL_INBOX_DATA_INTEGRITY_DONE_FILE_INCLUDE_PATTERN": {
"type": "string",
"value": local_inbox_data_integrity_profile.get(
"include_pattern",
None
)
},
"LOCAL_INBOX_DATA_INTEGRITY_DONE_FILE_EXCLUDE_PATTERN": {
"type": "string",
"value": local_inbox_data_integrity_profile.get(
"exclude_pattern",
None
)
},

"LOCAL_OUTBOX_SYSTEM_ID": {
"type": "string",
"value": body.local_outbox.system_id
},
"LOCAL_OUTBOX_DATA_PATH": {
"type": "string",
"value": body.local_outbox.data_path
},
"LOCAL_OUTBOX_INCLUDE_PATTERN": {
"type": "string",
"value": body.local_outbox.include_pattern
},
"LOCAL_OUTBOX_EXCLUDE_PATTERN": {
"type": "string",
"value": body.local_outbox.exclude_pattern
},
"LOCAL_OUTBOX_MANIFESTS_PATH": {
"type": "string",
"value": body.local_outbox.manifests_path
},
"LOCAL_OUTBOX_MANIFEST_GENERATION_POLICY": {
"type": "string",
"value": body.local_outbox.manifest_generation_policy
},
"LOCAL_OUTBOX_MANIFEST_PRIORITY": {
"type": "string",
"value": body.local_outbox.manifest_priority
},
"LOCAL_OUTBOX_DATA_INTEGRITY_TYPE": {
"type": "string",
"value": local_outbox_data_integrity_profile.get(
"type",
None
)
},
"LOCAL_OUTBOX_DATA_INTEGRITY_DONE_FILES_PATH": {
"type": "string",
"value": local_outbox_data_integrity_profile.get(
"done_files_path",
None
)
},
"LOCAL_OUTBOX_DATA_INTEGRITY_DONE_FILE_INCLUDE_PATTERN": {
"type": "string",
"value": local_outbox_data_integrity_profile.get(
"include_pattern",
None
)
},
"LOCAL_OUTBOX_DATA_INTEGRITY_DONE_FILE_EXCLUDE_PATTERN": {
"type": "string",
"value": local_outbox_data_integrity_profile.get(
"exclude_pattern",
None
)
},

"REMOTE_INBOX_SYSTEM_ID": {
"type": "string",
"value": body.remote_inbox.system_id
},
"REMOTE_INBOX_DATA_PATH": {
"type": "string",
"value": body.remote_inbox.data_path
},
"REMOTE_INBOX_INCLUDE_PATTERN": {
"type": "string",
"value": body.remote_inbox.include_pattern
},
"REMOTE_INBOX_EXCLUDE_PATTERN": {
"type": "string",
"value": body.remote_inbox.exclude_pattern
},
"REMOTE_INBOX_MANIFESTS_PATH": {
"type": "string",
"value": body.remote_inbox.manifests_path
},
"REMOTE_INBOX_DATA_INTEGRITY_TYPE": {
return {
"REMOTE_OUTBOX": {
"type": "string",
"value": remote_inbox_data_integrity_profile.get(
"type",
None
)
"value": json.dumps(body.remote_outbox.dict())
},
"REMOTE_INBOX_DATA_INTEGRITY_DONE_FILES_PATH": {
"LOCAL_INBOX": {
"type": "string",
"value": remote_inbox_data_integrity_profile.get(
"done_files_path",
None
)
"value": json.dumps(body.local_inbox.dict())
},
"REMOTE_INBOX_DATA_INTEGRITY_DONE_FILE_INCLUDE_PATTERN": {
"LOCAL_OUTBOX": {
"type": "string",
"value": remote_inbox_data_integrity_profile.get(
"include_pattern",
None
)
"value": json.dumps(body.local_outbox.dict())
},
"REMOTE_INBOX_DATA_INTEGRITY_DONE_FILE_EXCLUDE_PATTERN": {
"REMOTE_INBOX": {
"type": "string",
"value": remote_inbox_data_integrity_profile.get(
"exclude_pattern",
None
)
"value": json.dumps(body.remote_inbox.dict())
},
}

Expand Down
24 changes: 2 additions & 22 deletions src/api/src/backend/views/http/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,31 +40,11 @@ class DoneFileDataIntegrityProfile(BaseDataIntegrityProfile):
Field(discriminator="type")
]

class IOBox(BaseModel):
class IOSystem(BaseModel):
writable: bool = True
data_path: str
data_integrity_profile: DataIntegrityProfile = None
manifests_path: str = None
exclude_pattern: str = None
include_pattern: str = None

class DataEgress(IOBox):
manifest_generation_policy: EnumManifestGenerationPolicy = EnumManifestGenerationPolicy.AutoOnePerFile
manifest_priority: EnumManifestPriority = EnumManifestPriority.Oldest

class DataIngress(IOBox):
pass

class RemoteOutbox(DataEgress):
manifest_generation_policy: EnumManifestGenerationPolicy = EnumManifestGenerationPolicy.AutoOnePerFile
manifest_priority: EnumManifestPriority = EnumManifestPriority.Oldest

class LocalInbox(DataIngress):
pass

class LocalOutbox(DataEgress):
manifest_generation_policy: EnumManifestGenerationPolicy = EnumManifestGenerationPolicy.AutoOneForAll
manifest_priority: EnumManifestPriority = None

class RemoteInbox(DataIngress):
pass

66 changes: 66 additions & 0 deletions src/api/src/backend/views/http/etlV2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from enum import Enum
from typing import Annotated, List, Literal, Optional, Union

from pydantic import BaseModel, Field

from .requests import _EnumMeta


class EnumManifestGenerationPolicy(str, Enum, metaclass=_EnumMeta):
    """String-valued enumeration of the accepted manifest generation policies."""

    Manual = "manual"
    AutoOnePerFile = "auto_one_per_file"
    AutoOneForAll = "auto_one_for_all"

class EnumManifestPriority(str, Enum, metaclass=_EnumMeta):
    """String-valued enumeration of the accepted manifest priorities."""

    Oldest = "oldest"
    Newest = "newest"
    Any = "any"

class BaseDataIntegrityProfile(BaseModel):
    """Common base for the data integrity profile models.

    Subclasses narrow ``type`` to a single literal value.
    """

    # Tag naming the kind of integrity profile.
    type: Literal["checksum", "byte_check", "done_file"]

class ChecksumDataIntegrityProfile(BaseDataIntegrityProfile):
    """Data integrity profile whose ``type`` is fixed to ``"checksum"``."""

    type: Literal["checksum"]

class ByteCheckDataIntegrityProfile(BaseDataIntegrityProfile):
    """Data integrity profile whose ``type`` is fixed to ``"byte_check"``."""

    type: Literal["byte_check"]

class DoneFileDataIntegrityProfile(BaseDataIntegrityProfile):
    """Data integrity profile whose ``type`` is fixed to ``"done_file"``.

    Requires ``done_files_path``; the include/exclude patterns are optional
    and default to ``None``.
    """

    type: Literal["done_file"]
    done_files_path: str
    # Optional[...] makes the None default part of the declared type instead
    # of relying on the validator to tolerate an out-of-type default.
    include_pattern: Optional[str] = None
    exclude_pattern: Optional[str] = None

# Union of the concrete integrity profile models, discriminated on the
# "type" field.
DataIntegrityProfile = Annotated[
    Union[
        ChecksumDataIntegrityProfile,
        ByteCheckDataIntegrityProfile,
        DoneFileDataIntegrityProfile,
    ],
    Field(discriminator="type"),
]

class IOProfile(BaseModel):
    """Describes one data location: a required ``data_path`` plus optional
    integrity profile, manifests path and include/exclude patterns.
    """

    data_path: str
    # NOTE(review): annotation intentionally left as the bare alias so the
    # Annotated discriminator metadata stays at the top level of the field;
    # confirm against the pydantic version in use before wrapping in Optional.
    data_integrity_profile: DataIntegrityProfile = None
    manifests_path: Optional[str] = None
    exclude_pattern: Optional[str] = None
    include_pattern: Optional[str] = None

class ETLSystem(BaseModel):
    """A system taking part in an ETL cycle, with one ingress and one egress
    ``IOProfile``.
    """

    # Defaults to True when omitted.
    writable: bool = True
    ingress_profile: IOProfile
    egress_profile: IOProfile

class EnumXferAction(str, Enum, metaclass=_EnumMeta):
    """String-valued enumeration of the accepted transfer actions."""

    PUSH = "push"
    PULL = "pull"
    NOOP = "noop"

class ETLCycle(BaseModel):
    """One ETL cycle: an ingress system, an egress system, the transfer
    action to apply, and a list of transforms.
    """

    ingress: ETLSystem
    egress: ETLSystem
    xfer_action: EnumXferAction
    # Element type is unconstrained here (``object``).
    transforms: List[object]


Loading

0 comments on commit 29da043

Please sign in to comment.