Skip to content

Commit

Permalink
Merge pull request #10 from Eppo-exp/add-sync-prefix
Browse files Browse the repository at this point in the history
Added a sync prefix
  • Loading branch information
tbuffington7 authored Aug 20, 2024
2 parents 2c5f87b + f04bf8e commit c2670b8
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 17 deletions.
17 changes: 11 additions & 6 deletions eppo_metrics_sync/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,32 @@
from eppo_metrics_sync.eppo_metrics_sync import EppoMetricsSync

if __name__ == '__main__':

parser = argparse.ArgumentParser(
description="Scan specified directory for Eppo yaml files and sync with Eppo"
)
parser.add_argument("directory", help="The directory of yaml files to process")
parser.add_argument("--dryrun", action="store_true", help="Run in dry run mode")
parser.add_argument("--schema", help="One of: eppo[default], dbt-model", default='eppo')
parser.add_argument("--sync-prefix", help="Used for testing in a shared Q/A workspace. "
"Will use this as a sync tag and append all fact and metric definitions with this prefix.",
required=False
)
parser.add_argument(
"--dbt-model-prefix",
help="The warehouse and schema where the dbt models live",
"--dbt-model-prefix",
help="The warehouse and schema where the dbt models live",
default=None
)

args = parser.parse_args()

eppo_metrics_sync = EppoMetricsSync(
directory = args.directory,
directory=args.directory,
schema_type=args.schema,
dbt_model_prefix=args.dbt_model_prefix
dbt_model_prefix=args.dbt_model_prefix,
sync_prefix=args.sync_prefix
)

if args.dryrun:
eppo_metrics_sync.read_yaml_files()
eppo_metrics_sync.validate()
Expand Down
36 changes: 25 additions & 11 deletions eppo_metrics_sync/eppo_metrics_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import jsonschema
import os
import requests
import yaml

from eppo_metrics_sync.validation import (
unique_names,
Expand All @@ -13,31 +12,31 @@
from eppo_metrics_sync.dbt_model_parser import DbtModelParser
from eppo_metrics_sync.helper import load_yaml


API_ENDPOINT = 'https://eppo.cloud/api/v1/metrics/sync'


class EppoMetricsSync:
def __init__(
    self,
    directory,
    schema_type='eppo',
    dbt_model_prefix=None,
    sync_prefix=None
):
    """Prepare a sync run over a directory of yaml definitions.

    Args:
        directory: Directory whose yaml files will be processed.
        schema_type: Format of the yaml files; 'eppo' (default) or
            'dbt-model'.
        dbt_model_prefix: Stored for the dbt-model loader; the CLI help
            describes it as the warehouse and schema where the dbt
            models live.
        sync_prefix: Optional prefix. When not None it is used as the
            sync tag and prepended to every fact source and metric name.

    Raises:
        OSError: If the bundled schema file cannot be opened.
        json.JSONDecodeError: If the bundled schema file is not valid JSON.
    """
    self.directory = directory
    self.fact_sources = []
    self.metrics = []
    self.validation_errors = []
    self.schema_type = schema_type
    self.dbt_model_prefix = dbt_model_prefix
    self.sync_prefix = sync_prefix

    # temporary: ideally would pull this from Eppo API
    package_root = os.path.dirname(os.path.abspath(__file__))
    schema_path = os.path.join(package_root, 'schema', 'eppo_metric_schema.json')
    # Explicit encoding so the schema loads the same regardless of locale.
    with open(schema_path, encoding='utf-8') as schema_file:
        self.schema = json.load(schema_file)


def load_eppo_yaml(self, path):
yaml_data = load_yaml(path)
if 'fact_sources' in yaml_data:
Expand Down Expand Up @@ -84,18 +83,25 @@ def read_yaml_files(self):
self.validation_errors.append(
f"Schema violation in {yaml_path}: \n{valid['error_message']}"
)

elif self.schema_type == 'dbt-model':
self.load_dbt_yaml(yaml_path)

else:
raise ValueError(f'Unexpected schema_type: {self.schema_type}')

if len(self.fact_sources) == 0 and len(self.metrics) == 0:
raise ValueError(
'No valid yaml files found. ' + ', '.join(self.validation_errors)
)

def _add_sync_prefix(self):
    """Rename every loaded fact source and metric to "[<sync_prefix>] <name>".

    The entries in ``self.fact_sources`` and ``self.metrics`` are updated
    in place; nothing is returned.
    """
    def tagged(original_name):
        # The prefix is wrapped in square brackets and followed by a space.
        return f"[{self.sync_prefix}] {original_name}"

    for fact_source in self.fact_sources:
        fact_source['name'] = tagged(fact_source['name'])

    for metric_def in self.metrics:
        metric_def['name'] = tagged(metric_def['name'])

def validate(self):

if len(self.fact_sources) == 0 and len(self.metrics) == 0:
Expand All @@ -113,15 +119,23 @@ def validate(self):

return True

def _determine_sync_tag(self):
    """Return the tag to use for the sync request.

    An explicit ``sync_prefix`` takes precedence. Otherwise the value of
    the ``EPPO_SYNC_TAG`` environment variable is used.

    Returns:
        ``self.sync_prefix`` when it is not None; otherwise the value of
        ``EPPO_SYNC_TAG``, or None when that variable is unset.
    """
    if self.sync_prefix is not None:
        return self.sync_prefix

    # The lookup must be returned: without it the fallback is discarded
    # and the caller sees None even when EPPO_SYNC_TAG is set.
    return os.getenv('EPPO_SYNC_TAG')

def sync(self):
self.read_yaml_files()
if self.sync_prefix is not None:
self._add_sync_prefix()
self.validate()

api_key = os.getenv('EPPO_API_KEY')
if not api_key:
raise Exception('EPPO_API_KEY not set in environment variables. Please set and try again')

sync_tag = os.getenv('EPPO_SYNC_TAG')
sync_tag = self._determine_sync_tag()
if not api_key:
raise Exception('EPPO_SYNC_TAG not set in environment variables. Please set and try again')

Expand Down

0 comments on commit c2670b8

Please sign in to comment.