diff --git a/bigflow/bigquery/dataset_configuration.py b/bigflow/bigquery/dataset_configuration.py index c02bcb44..3750d520 100644 --- a/bigflow/bigquery/dataset_configuration.py +++ b/bigflow/bigquery/dataset_configuration.py @@ -1,4 +1,4 @@ -from typing import Dict, List +from typing import Dict from ..configuration import Config from .interface import Dataset @@ -16,7 +16,8 @@ def __init__(self, is_master: bool = True, is_default: bool = True, tables_labels: Dict[str, Dict[str, str]] = None, - dataset_labels: Dict[str, str] = None): + dataset_labels: Dict[str, str] = None, + job_labels: Dict[str, str] = None): all_properties = (properties or {}).copy() all_properties['project_id'] = project_id all_properties['dataset_name'] = dataset_name @@ -24,6 +25,7 @@ def __init__(self, all_properties['external_tables'] = external_tables or {} all_properties['tables_labels'] = tables_labels or [] all_properties['dataset_labels'] = dataset_labels or [] + all_properties['job_labels'] = job_labels or [] self.delegate = Config(name=env, properties=all_properties, is_master=is_master, is_default=is_default) @@ -36,7 +38,8 @@ def add_configuration(self, properties: dict = None, is_default: bool = False, tables_labels: Dict[str, Dict[str, str]] = None, - dataset_labels: Dict[str, str] = None): + dataset_labels: Dict[str, str] = None, + job_labels: Dict[str, str] = None): all_properties = (properties or {}).copy() @@ -57,6 +60,9 @@ def add_configuration(self, if dataset_labels: all_properties['dataset_labels'] = dataset_labels + if job_labels: + all_properties['job_labels'] = job_labels + self.delegate.add_configuration(env, all_properties, is_default=is_default) return self @@ -68,7 +74,8 @@ def create_dataset_manager(self, env: str = None) -> Dataset: external_tables=self.resolve_external_tables(env), extras=self.resolve_extra_properties(env), tables_labels=self.resolve_tables_labels(env), - dataset_labels=self.resolve_dataset_labels(env)) + dataset_labels=self.resolve_dataset_labels(env), + job_labels=self.resolve_job_labels(env)) def resolve_extra_properties(self, env: str = None): return {k: v for (k, v) in self.resolve(env).items() if self._is_extra_property(k)} @@ -103,5 +110,8 @@ def resolve_tables_labels(self, env: str = None) -> Dict[str, Dict[str, str]]: def resolve_dataset_labels(self, env: str = None) -> Dict[str, str]: return self.resolve_property('dataset_labels', env) + def resolve_job_labels(self, env: str = None) -> Dict[str, str]: + return self.resolve_property('job_labels', env) + def _is_extra_property(self, property_name) -> bool: - return property_name not in ['project_id','dataset_name','internal_tables','external_tables', 'env', 'dataset_labels', 'tables_labels'] \ No newline at end of file + return property_name not in ['project_id','dataset_name','internal_tables','external_tables', 'env', 'dataset_labels', 'tables_labels', 'job_labels'] diff --git a/bigflow/bigquery/dataset_manager.py b/bigflow/bigquery/dataset_manager.py index 37019e66..4c54b9e3 100644 --- a/bigflow/bigquery/dataset_manager.py +++ b/bigflow/bigquery/dataset_manager.py @@ -265,12 +265,14 @@ class DatasetManager(object): def __init__(self, bigquery_client: 'google.cloud.bigquery.Client', dataset: 'google.cloud.bigquery.Dataset', - logger: logging.Logger): + logger: logging.Logger, + job_labels: Dict[str, str] | None = None): from google.cloud import bigquery self.bigquery_client: bigquery.Client = bigquery_client self.dataset = dataset self.dataset_id = dataset.full_dataset_id.replace(':', '.') self.logger = logger + self.job_labels = job_labels def write_tmp(self, table_id: str, sql: str) -> 'google.cloud.bigquery.table.RowIterator': return self.write(table_id, sql, 'WRITE_TRUNCATE') @@ -285,6 +287,9 @@ def write(self, table_id: str, sql: str, mode: str) -> 'google.cloud.bigquery.ta job_config.destination = table_id job_config.write_disposition = mode + if self.job_labels: + job_config.job_labels = self.job_labels + job = self.bigquery_client.query(sql, job_config=job_config) return job.result() @@ -314,7 +319,13 @@ def create_table(self, create_query: str) -> 'google.cloud.bigquery.table.RowIte return job.result() def collect(self, sql: str) -> 'pandas.DataFrame': - return self._query(sql).to_dataframe() + from google.cloud import bigquery + job_config = bigquery.QueryJobConfig() + + if self.job_labels: + job_config.job_labels = self.job_labels + + return self._query(sql, job_config=job_config).to_dataframe() def collect_list( self, @@ -482,7 +493,8 @@ def create_dataset_manager( location: str = DEFAULT_LOCATION, logger: Logger | None = None, tables_labels: Dict[str, Dict[str, str]] | None = None, - dataset_labels: Dict[str, str] | None = None + dataset_labels: Dict[str, str] | None = None, + job_labels: Dict[str, str] | None = None ) -> tp.Tuple[str, PartitionedDatasetManager]: """ Dataset manager factory. @@ -501,6 +513,7 @@ def create_dataset_manager( :param logger: custom logger. :param tables_labels: Dict with key as table_name and value as list of key/valued labels. :param dataset_labels: Dict with key/valued labels. + :param job_labels: Dict with key/valued labels. :return: tuple (full dataset ID, dataset manager). """ dataset_name = dataset_name or random_uuid(suffix='_test_case') @@ -516,6 +529,6 @@ def create_dataset_manager( upsert_tables_labels(dataset_name, tables_labels, client) - core_dataset_manager = DatasetManager(client, dataset, logger) + core_dataset_manager = DatasetManager(client, dataset, logger, job_labels) templated_dataset_manager = TemplatedDatasetManager(core_dataset_manager, internal_tables, external_tables, extras, runtime) return dataset.full_dataset_id.replace(':', '.'), PartitionedDatasetManager(templated_dataset_manager, get_partition_from_run_datetime_or_none(runtime)) diff --git a/bigflow/bigquery/interactive.py b/bigflow/bigquery/interactive.py index 37922e80..19a76dc4 100644 --- a/bigflow/bigquery/interactive.py +++ b/bigflow/bigquery/interactive.py @@ -52,7 +52,7 @@ def decorator(standard_component): class InteractiveDatasetManager(Dataset): - """Let's you run operations on a dataset, without the need of creating a component.""" + """Lets you run operations on a dataset, without the need of creating a component.""" def __init__(self, project_id: str, @@ -63,7 +63,8 @@ def __init__(self, extras: Dict = None, location: str = DEFAULT_LOCATION, tables_labels: Dict[str, Dict[str, str]] = None, - dataset_labels: Dict[str, str] = None): + dataset_labels: Dict[str, str] = None, + job_labels: Dict[str, str] = None): self.config = DatasetConfigInternal( project_id=project_id, dataset_name=dataset_name, @@ -73,7 +74,8 @@ def __init__(self, extras=extras, location=location, tables_labels=tables_labels, - dataset_labels=dataset_labels) + dataset_labels=dataset_labels, + job_labels=job_labels) logger.debug("Create InteractiveDatasetManager, config %s", self.config._as_dict()) def write_truncate(self, table_name, sql, partitioned=True): @@ -488,7 +490,8 @@ def __init__(self, extras=None, location=DEFAULT_LOCATION, tables_labels: Dict[str, Dict[str, str]] = None, - dataset_labels: Dict[str, str] = None + dataset_labels: Dict[str, str] = None, + job_labels: Dict[str, str] = None ): self.project_id = project_id self.dataset_name = dataset_name @@ -499,6 +502,7 @@ def __init__(self, self.location = location self.tables_labels = tables_labels or {} self.dataset_labels = dataset_labels or {} + self.job_labels = job_labels or {} def _as_dict(self): return { @@ -510,7 +514,8 @@ def _as_dict(self): 'extras': self.extras, 'location': self.location, 'tables_labels': self.tables_labels, - 'dataset_labels': self.dataset_labels + 'dataset_labels': self.dataset_labels, + 'job_labels': self.job_labels }