Skip to content

Commit

Permalink
Add data integrity check functionality. Add lifecycle hook functions …
Browse files Browse the repository at this point in the history
…in function task code
  • Loading branch information
nathandf committed Feb 8, 2024
1 parent b173ae1 commit d822e9e
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 37 deletions.
33 changes: 11 additions & 22 deletions src/api/specs/WorkflowsAPI.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2860,16 +2860,9 @@ components:
allOf:
- $ref: "#/components/schemas/BaseDataIntegrityProfile"
- type: object
required:
- checksum_algo
- checksums_path
properties:
type:
type: string
checksum_algo:
$ref: "#/components/schemas/ETLEnumChecksumAlgorithm"
checksums_path:
type: string

DoneFileDataIntegrityProfile:
allOf:
Expand All @@ -2882,7 +2875,9 @@ components:
type: string
done_files_path:
type: string
pattern:
include_pattern:
type: string
exclude_pattern:
type: string

ByteCheckDataIntegrityProfile:
Expand Down Expand Up @@ -2924,10 +2919,10 @@ components:
default: 'oldest'
data_integrity_profile:
$ref: "#/components/schemas/DataIntegrityProfile"
include_pattern:
type: string
exclude_pattern:
type: array
items:
type: string
type: string

ETLRemoteOutbox:
type: object
Expand All @@ -2940,7 +2935,7 @@ components:
properties:
manifest_generation_policy:
$ref: "#/components/schemas/ETLEnumManifestGenerationPolicy"
default: 'one_per_file'
default: 'auto_one_per_file'

ETLLocalOutbox:
allOf:
Expand All @@ -2949,7 +2944,7 @@ components:
properties:
manifest_generation_policy:
$ref: "#/components/schemas/ETLEnumManifestGenerationPolicy"
default: 'one_for_all'
default: 'auto_one_for_all'

ETLTapisSystemRemoteInbox:
type: object
Expand All @@ -2969,8 +2964,9 @@ components:
ETLEnumManifestGenerationPolicy:
type: string
enum:
- one_per_file
- one_for_all
- manual
- auto_one_per_file
- auto_one_for_all

ETLEnumManifestPriority:
type: string
Expand All @@ -2986,13 +2982,6 @@ components:
- byte_check
- done_file

ETLEnumChecksumAlgorithm:
type: string
enum:
- md5
- sha256
- sha3

################################# End ETL

ReqCIPipeline:
Expand Down
19 changes: 8 additions & 11 deletions src/api/src/backend/views/http/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,30 @@


class EnumManifestGenerationPolicy(str, Enum, metaclass=_EnumMeta):
OnePerFile = "one_per_file"
OneForAll = "one_for_all"
Manual = "manual"
AutoOnePerFile = "auto_one_per_file"
AutoOneForAll = "auto_one_for_all"

class EnumManifestPriority(str, Enum, metaclass=_EnumMeta):
Oldest = "oldest"
Newest = "newest"
Any = "any"

class EnumChecksumAlgorithm(str, Enum, metaclass=_EnumMeta):
MD5 = "md5"
SHA256 = "sha256"
SHA3 = "sha3"

class BaseDataIntegrityProfile(BaseModel):
type: Literal["checksum", "byte_check", "done_file"]

class ChecksumDataIntegrityProfile(BaseDataIntegrityProfile):
type: Literal["checksum"]
checksum_algo: EnumChecksumAlgorithm
checksums_path: str

class ByteCheckDataIntegrityProfile(BaseDataIntegrityProfile):
type: Literal["byte_check"]

class DoneFileDataIntegrityProfile(BaseDataIntegrityProfile):
type: Literal["done_file"]
done_files_path: str
pattern: str
include_pattern: str = None
exclude_pattern: str = None


DataIntegrityProfile = Annotated[
Union[
Expand All @@ -52,7 +48,8 @@ class LocalIOBox(BaseModel):
manifests_path: str = None
manifest_generation_policy: EnumManifestGenerationPolicy
manifest_priority: EnumManifestPriority = EnumManifestPriority.Oldest
exclude_pattern: List[str] = []
exclude_pattern: str = None
include_pattern: str = None

class LocalInbox(LocalIOBox):
manifest_generation_policy: EnumManifestGenerationPolicy = EnumManifestGenerationPolicy.OnePerFile
Expand Down
44 changes: 40 additions & 4 deletions src/engine/src/owe_python_sdk/runtime/execution_context.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import os, sys, json

from functools import partial
from typing import Dict

from owe_python_sdk.runtime.runtime import Runtime
from owe_python_sdk.constants import INPUT_PREFIX


class ExecutionContext:
Expand All @@ -12,6 +14,20 @@ def __init__(self, runtime: Runtime):
self.exec_dir = runtime.EXEC_DIR
self.input_schema = runtime.INPUT_SCHEMA
self.input_ids = list(self.input_schema.keys())
self._hook_active = False
self._exit_hooks: Dict[int, partial] = {0: [],1: []}

def add_hook(self, exit_code: int, hook: callable, *args, **kwargs):
if type(exit_code) != int:
raise TypeError("Error registering hook. Argument 'exit_code' must be an integer")

if not callable(hook):
raise TypeError("Error registering hook. Argument 'hook' must be callable")

self._exit_hooks(exit_code, partial(hook, *args, **kwargs))

return self


def get_input(self, input_id, default=None):
contents = None
Expand Down Expand Up @@ -40,23 +56,43 @@ def set_output(self, _id, value, encoding=None):
with open(os.path.join(self.output_dir, _id), flag, encoding=encoding) as file:
file.write(value)


def stderr(self, code: int, message):
def stderr(self, code: int, message, call_hooks=True):
"""Exits with error"""
if code < 1:
raise Exception("Exit code provided must be an int with a value >= 1")

with open(self._runtime.STDERR, "w") as file:
file.write(message)

call_hooks and self._call_hooks(code)

sys.exit(code)

def stdout(self, value):
def stdout(self, value, call_hooks=True):
"""Exits the process with no error"""
with open(self._runtime.STDOUT, "a") as file:
if type(value) == dict:
value = json.dumps(value)

file.write(str(value))

call_hooks and self._call_hooks(0)

sys.exit(0)

def _call_hooks(self, exit_code):
"""Enables users to trigger functionality when lifecycle methods such as
.stdout and .stderr are called"""
# Prevent hooks from being called recursively
if self._hook_active:
raise Exception("Recursive hook calls are not permitted. This happens when user-defined hooks call functions on the 'runtime.execution_context' object that trigger hooks. Try passing the 'call_hooks' keyword argument as false in you hook.")
# Call each hook in the order that they were registered
for hook in self._exit_hooks.get(exit_code, []):
self._hook_active = True
try:
hook()
except Exception as e:
self.stderr(1, f"Error caught in hook: {str(e)}", call_hooks=False)
self._hook_active = False

execution_context = ExecutionContext(Runtime())

0 comments on commit d822e9e

Please sign in to comment.