diff --git a/eppo_metrics_sync/__main__.py b/eppo_metrics_sync/__main__.py
index e30cb19..e6f1417 100644
--- a/eppo_metrics_sync/__main__.py
+++ b/eppo_metrics_sync/__main__.py
@@ -3,27 +3,32 @@
 from eppo_metrics_sync.eppo_metrics_sync import EppoMetricsSync
 
 if __name__ == '__main__':
-    
+
     parser = argparse.ArgumentParser(
         description="Scan specified directory for Eppo yaml files and sync with Eppo"
     )
     parser.add_argument("directory", help="The directory of yaml files to process")
     parser.add_argument("--dryrun", action="store_true", help="Run in dry run mode")
     parser.add_argument("--schema", help="One of: eppo[default], dbt-model", default='eppo')
+    parser.add_argument("--sync-prefix", help="Used for testing in a shared Q/A workspace. "
+                        "Will use this as a sync tag and prefix all fact source and metric names with it.",
+                        required=False
+                        )
     parser.add_argument(
-        "--dbt-model-prefix", 
-        help="The warehouse and schema where the dbt models live", 
+        "--dbt-model-prefix",
+        help="The warehouse and schema where the dbt models live",
         default=None
     )
 
     args = parser.parse_args()
 
     eppo_metrics_sync = EppoMetricsSync(
-        directory = args.directory,
+        directory=args.directory,
         schema_type=args.schema,
-        dbt_model_prefix=args.dbt_model_prefix
+        dbt_model_prefix=args.dbt_model_prefix,
+        sync_prefix=args.sync_prefix
     )
-    
+
     if args.dryrun:
         eppo_metrics_sync.read_yaml_files()
         eppo_metrics_sync.validate()
diff --git a/eppo_metrics_sync/eppo_metrics_sync.py b/eppo_metrics_sync/eppo_metrics_sync.py
index cab1a7d..d4dfe83 100644
--- a/eppo_metrics_sync/eppo_metrics_sync.py
+++ b/eppo_metrics_sync/eppo_metrics_sync.py
@@ -2,7 +2,6 @@
 import jsonschema
 import os
 import requests
-import yaml
 
 from eppo_metrics_sync.validation import (
     unique_names,
@@ -13,16 +12,16 @@
 from eppo_metrics_sync.dbt_model_parser import DbtModelParser
 from eppo_metrics_sync.helper import load_yaml
 
-
 API_ENDPOINT = 'https://eppo.cloud/api/v1/metrics/sync'
 
 
 class EppoMetricsSync:
     def __init__(
-        self, 
-        directory, 
-        schema_type = 'eppo',
-        dbt_model_prefix = None
+            self,
+            directory,
+            schema_type='eppo',
+            dbt_model_prefix=None,
+            sync_prefix=None
     ):
         self.directory = directory
         self.fact_sources = []
@@ -30,6 +29,7 @@ def __init__(
         self.validation_errors = []
         self.schema_type = schema_type
         self.dbt_model_prefix = dbt_model_prefix
+        self.sync_prefix = sync_prefix
 
         # temporary: ideally would pull this from Eppo API
         package_root = os.path.dirname(os.path.abspath(__file__))
@@ -37,7 +37,6 @@ def __init__(
         with open(schema_path) as schema_file:
             self.schema = json.load(schema_file)
 
-
     def load_eppo_yaml(self, path):
         yaml_data = load_yaml(path)
         if 'fact_sources' in yaml_data:
@@ -84,18 +83,26 @@ def read_yaml_files(self):
                             self.validation_errors.append(
                                 f"Schema violation in {yaml_path}: \n{valid['error_message']}"
                             )
-                    
+
                     elif self.schema_type == 'dbt-model':
                         self.load_dbt_yaml(yaml_path)
-                    
+
                     else:
                         raise ValueError(f'Unexpected schema_type: {self.schema_type}')
-        
+
         if len(self.fact_sources) == 0 and len(self.metrics) == 0:
             raise ValueError(
                 'No valid yaml files found. ' + ', '.join(self.validation_errors)
             )
 
+    def _add_sync_prefix(self):
+        """Prefix every fact source and metric name with "[<sync_prefix>] "."""
+        for source in self.fact_sources:
+            source['name'] = f"[{self.sync_prefix}] {source['name']}"
+
+        for metric in self.metrics:
+            metric['name'] = f"[{self.sync_prefix}] {metric['name']}"
+
     def validate(self):
 
         if len(self.fact_sources) == 0 and len(self.metrics) == 0:
@@ -113,15 +120,24 @@ def validate(self):
 
         return True
 
+    def _determine_sync_tag(self):
+        """Return sync_prefix when set, else the EPPO_SYNC_TAG environment variable."""
+        if self.sync_prefix is not None:
+            return self.sync_prefix
+
+        return os.getenv('EPPO_SYNC_TAG')
+
     def sync(self):
         self.read_yaml_files()
+        if self.sync_prefix is not None:
+            self._add_sync_prefix()
         self.validate()
 
         api_key = os.getenv('EPPO_API_KEY')
         if not api_key:
             raise Exception('EPPO_API_KEY not set in environment variables. Please set and try again')
 
-        sync_tag = os.getenv('EPPO_SYNC_TAG')
-        if not api_key:
+        sync_tag = self._determine_sync_tag()
+        if not sync_tag:
             raise Exception('EPPO_SYNC_TAG not set in environment variables. Please set and try again')
 