diff --git a/CHANGELOG.md b/CHANGELOG.md
index ffecb1c6..34bc184d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,14 +1,20 @@
 # BigFlow changelog
 
+## Version 1.10.0
+
+### Added
+
+* BigQuery query job labeling for `collect` and `write` operations. Labels are passed via the `job_labels` dict argument of `DatasetConfig` and `DatasetManager`.
+
 ## Version 1.9.0
 
-### Changes
+### Changed
 
 * Switched from Google Container Registry to Artifact Registry. Made `-r`/`--docker-repository` common for all deploy commands. Build and deploy commands authenticate to the Docker repository taken from `deployment_config.py` or CLI arguments, instead of hardcoded `https://eu.gcr.io`.
 
 ## Version 1.8.0
 
-### Changes
+### Changed
 
 * Bumped basic dependencies: Apache Beam 2.48.0, google-cloud-bigtable 2.17.0, google-cloud-language 2.10.0, google-cloud-storage 2.11.2, among others (#374).
 * Added the `env_variable` argument to `bigflow.Workflow` which enables to change a name of the variable used to obtain environment name (#365).
diff --git a/bigflow/bigquery/dataset_configuration.py b/bigflow/bigquery/dataset_configuration.py
index c02bcb44..3750d520 100644
--- a/bigflow/bigquery/dataset_configuration.py
+++ b/bigflow/bigquery/dataset_configuration.py
@@ -1,4 +1,4 @@
-from typing import Dict, List
+from typing import Dict
 
 from ..configuration import Config
 from .interface import Dataset
@@ -16,7 +16,8 @@ def __init__(self,
                  is_master: bool = True,
                  is_default: bool = True,
                  tables_labels: Dict[str, Dict[str, str]] = None,
-                 dataset_labels: Dict[str, str] = None):
+                 dataset_labels: Dict[str, str] = None,
+                 job_labels: Dict[str, str] = None):
         all_properties = (properties or {}).copy()
         all_properties['project_id'] = project_id
         all_properties['dataset_name'] = dataset_name
@@ -24,6 +25,7 @@ def __init__(self,
         all_properties['external_tables'] = external_tables or {}
         all_properties['tables_labels'] = tables_labels or []
         all_properties['dataset_labels'] = dataset_labels or []
+        all_properties['job_labels'] = job_labels or []
 
         self.delegate = Config(name=env, properties=all_properties, is_master=is_master, is_default=is_default)
 
@@ -36,7 +38,8 @@ def add_configuration(self,
                           properties: dict = None,
                           is_default: bool = False,
                           tables_labels: Dict[str, Dict[str, str]] = None,
-                          dataset_labels: Dict[str, str] = None):
+                          dataset_labels: Dict[str, str] = None,
+                          job_labels: Dict[str, str] = None):
 
         all_properties = (properties or {}).copy()
 
@@ -57,6 +60,9 @@ def add_configuration(self,
         if dataset_labels:
             all_properties['dataset_labels'] = dataset_labels
 
+        if job_labels:
+            all_properties['job_labels'] = job_labels
+
         self.delegate.add_configuration(env, all_properties, is_default=is_default)
         return self
 
@@ -68,7 +74,8 @@ def create_dataset_manager(self, env: str = None) -> Dataset:
             external_tables=self.resolve_external_tables(env),
             extras=self.resolve_extra_properties(env),
             tables_labels=self.resolve_tables_labels(env),
-            dataset_labels=self.resolve_dataset_labels(env))
+            dataset_labels=self.resolve_dataset_labels(env),
+            job_labels=self.resolve_job_labels(env))
 
     def resolve_extra_properties(self, env: str = None):
         return {k: v for (k, v) in self.resolve(env).items() if self._is_extra_property(k)}
@@ -103,5 +110,8 @@ def resolve_tables_labels(self, env: str = None) -> Dict[str, Dict[str, str]]:
     def resolve_dataset_labels(self, env: str = None) -> Dict[str, str]:
         return self.resolve_property('dataset_labels', env)
 
+    def resolve_job_labels(self, env: str = None) -> Dict[str, str]:
+        return self.resolve_property('job_labels', env)
+
     def _is_extra_property(self, property_name) -> bool:
-        return property_name not in ['project_id','dataset_name','internal_tables','external_tables', 'env', 'dataset_labels', 'tables_labels']
\ No newline at end of file
+        return property_name not in ['project_id','dataset_name','internal_tables','external_tables', 'env', 'dataset_labels', 'tables_labels', 'job_labels']
diff --git a/bigflow/bigquery/dataset_manager.py b/bigflow/bigquery/dataset_manager.py
index 37019e66..51252bbc 100644
--- a/bigflow/bigquery/dataset_manager.py
+++ b/bigflow/bigquery/dataset_manager.py
@@ -265,12 +265,14 @@ class DatasetManager(object):
     def __init__(self,
                  bigquery_client: 'google.cloud.bigquery.Client',
                  dataset: 'google.cloud.bigquery.Dataset',
-                 logger: logging.Logger):
+                 logger: logging.Logger,
+                 job_labels: Dict[str, str] | None = None):
         from google.cloud import bigquery
         self.bigquery_client: bigquery.Client = bigquery_client
         self.dataset = dataset
         self.dataset_id = dataset.full_dataset_id.replace(':', '.')
         self.logger = logger
+        self.job_labels = job_labels
 
     def write_tmp(self, table_id: str, sql: str) -> 'google.cloud.bigquery.table.RowIterator':
         return self.write(table_id, sql, 'WRITE_TRUNCATE')
@@ -285,6 +287,9 @@ def write(self, table_id: str, sql: str, mode: str) -> 'google.cloud.bigquery.ta
         job_config.destination = table_id
         job_config.write_disposition = mode
 
+        if self.job_labels:
+            job_config.labels = self.job_labels
+
         job = self.bigquery_client.query(sql, job_config=job_config)
         return job.result()
 
@@ -314,7 +319,13 @@ def create_table(self, create_query: str) -> 'google.cloud.bigquery.table.RowIte
         return job.result()
 
     def collect(self, sql: str) -> 'pandas.DataFrame':
-        return self._query(sql).to_dataframe()
+        from google.cloud import bigquery
+        job_config = bigquery.QueryJobConfig()
+
+        if self.job_labels:
+            job_config.labels = self.job_labels
+
+        return self._query(sql, job_config=job_config).to_dataframe()
 
     def collect_list(
             self,
@@ -482,7 +493,8 @@ def create_dataset_manager(
         location: str = DEFAULT_LOCATION,
         logger: Logger | None = None,
         tables_labels: Dict[str, Dict[str, str]] | None = None,
-        dataset_labels: Dict[str, str] | None = None
+        dataset_labels: Dict[str, str] | None = None,
+        job_labels: Dict[str, str] | None = None
 ) -> tp.Tuple[str, PartitionedDatasetManager]:
     """
     Dataset manager factory.
@@ -501,6 +513,7 @@
     :param logger: custom logger.
    :param tables_labels: Dict with key as table_name and value as list of key/valued labels.
     :param dataset_labels: Dict with key/valued labels.
+    :param job_labels: Dict with key/valued labels set on BigQuery query jobs.
     :return: tuple (full dataset ID, dataset manager).
""" dataset_name = dataset_name or random_uuid(suffix='_test_case') @@ -516,6 +529,6 @@ def create_dataset_manager( upsert_tables_labels(dataset_name, tables_labels, client) - core_dataset_manager = DatasetManager(client, dataset, logger) + core_dataset_manager = DatasetManager(client, dataset, logger, job_labels) templated_dataset_manager = TemplatedDatasetManager(core_dataset_manager, internal_tables, external_tables, extras, runtime) return dataset.full_dataset_id.replace(':', '.'), PartitionedDatasetManager(templated_dataset_manager, get_partition_from_run_datetime_or_none(runtime)) diff --git a/bigflow/bigquery/interactive.py b/bigflow/bigquery/interactive.py index 37922e80..19a76dc4 100644 --- a/bigflow/bigquery/interactive.py +++ b/bigflow/bigquery/interactive.py @@ -52,7 +52,7 @@ def decorator(standard_component): class InteractiveDatasetManager(Dataset): - """Let's you run operations on a dataset, without the need of creating a component.""" + """Lets you run operations on a dataset, without the need of creating a component.""" def __init__(self, project_id: str, @@ -63,7 +63,8 @@ def __init__(self, extras: Dict = None, location: str = DEFAULT_LOCATION, tables_labels: Dict[str, Dict[str, str]] = None, - dataset_labels: Dict[str, str] = None): + dataset_labels: Dict[str, str] = None, + job_labels: Dict[str, str] = None): self.config = DatasetConfigInternal( project_id=project_id, dataset_name=dataset_name, @@ -73,7 +74,8 @@ def __init__(self, extras=extras, location=location, tables_labels=tables_labels, - dataset_labels=dataset_labels) + dataset_labels=dataset_labels, + job_labels=job_labels) logger.debug("Create InteractiveDatasetManager, config %s", self.config._as_dict()) def write_truncate(self, table_name, sql, partitioned=True): @@ -488,7 +490,8 @@ def __init__(self, extras=None, location=DEFAULT_LOCATION, tables_labels: Dict[str, Dict[str, str]] = None, - dataset_labels: Dict[str, str] = None + dataset_labels: Dict[str, str] = None, + job_labels: Dict[str, str] = None ): self.project_id = project_id self.dataset_name = dataset_name @@ -499,6 +502,7 @@ def __init__(self, self.location = location self.tables_labels = tables_labels or {} self.dataset_labels = dataset_labels or {} + self.job_labels = job_labels or {} def _as_dict(self): return { @@ -510,7 +514,8 @@ def _as_dict(self): 'extras': self.extras, 'location': self.location, 'tables_labels': self.tables_labels, - 'dataset_labels': self.dataset_labels + 'dataset_labels': self.dataset_labels, + 'job_labels': self.job_labels } diff --git a/docs/technologies.md b/docs/technologies.md index 26d26cdf..015649ac 100644 --- a/docs/technologies.md +++ b/docs/technologies.md @@ -309,6 +309,7 @@ to call BigQuery SQL. Fully qualified names of internal tables are resolved to `{project_id}.{dataset_name}.{table_name}`. * `external_tables` — Dict that defines aliases for external table names. Fully qualified names of those tables have to be declared explicitly. +* `job_labels` — Dict of labels that will be set on BigQuery jobs. The distinction between internal and external tables shouldn't be treated too seriously. Internal means `mine`. External means any other. It's just a naming convention. @@ -511,7 +512,7 @@ The `table_labels` and `dataset_labels` parameters allow your workflow to create On the first run, tables are not created yet, so we can not create labels then. Labels are added on second and later run when tables are already created. 
 ```python
-from bigflow.bigquery import DatasetConfig
+from bigflow.bigquery import DatasetConfig 
 
 dataset_config = DatasetConfig(
     env='dev',
@@ -526,8 +527,19 @@ dataset_config = DatasetConfig(
         }
     },
     dataset_labels={"dataset_label_1": "value_1", "dataset_label_2": "value_2"}).create_dataset_manager()
+```
+The `job_labels` argument lets you label the BigQuery query jobs run by a dataset manager. It is passed to [`QueryJobConfig.labels`](https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJobConfig#google_cloud_bigquery_job_QueryJobConfig_labels)
+in the `write` and `collect` methods of `DatasetManager`.
 
 
-```
+```python
+from bigflow.bigquery import DatasetConfig
 
-You can us it as an ad-hoc tool or put a labeling job to a workflow as well.
\ No newline at end of file
+dataset_config = DatasetConfig(
+    env='dev',
+    project_id='your-project-id',
+    dataset_name='example_dataset',
+    internal_tables=['example_table'],
+    external_tables={},
+    job_labels={"owner": "john-doe"}).create_dataset_manager()
+```
diff --git a/test/test_job.py b/test/test_job.py
index 644b21db..a2f917ed 100644
--- a/test/test_job.py
+++ b/test/test_job.py
@@ -28,6 +28,7 @@ def test_component(bigquery_dependency1, bigquery_dependency2):
             'location': 'EU',
             'tables_labels': {},
             'dataset_labels': {},
+            'job_labels': {},
         })
 
     # and
@@ -42,6 +43,7 @@ def test_component(bigquery_dependency1, bigquery_dependency2):
             'location': 'EU',
             'tables_labels': {},
             'dataset_labels': {},
+            'job_labels': {},
         })
 
     job = Job(component=test_component,
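Below is a minimal usage sketch (an editor's illustration, not part of the diff above) of how the new `job_labels` argument is expected to reach BigQuery query jobs: per the changes in `dataset_manager.py`, `create_dataset_manager` forwards `job_labels` to `DatasetManager`, which copies them into `QueryJobConfig.labels` for `collect` and `write`. The project, dataset, and label values are placeholders, and running it requires GCP credentials.

```python
# Illustrative sketch only: placeholder project/dataset names, requires GCP credentials.
from bigflow.bigquery.dataset_manager import create_dataset_manager

# `job_labels` flow: create_dataset_manager -> DatasetManager.__init__ -> QueryJobConfig.labels
dataset_id, dataset_manager = create_dataset_manager(
    project_id='your-project-id',      # placeholder
    runtime='2023-01-01',              # run date, used to resolve table partitions
    dataset_name='example_dataset',    # placeholder
    internal_tables=['example_table'],
    # BigQuery label keys/values allow only lowercase letters, digits, '-' and '_'
    job_labels={'owner': 'data-team', 'workflow': 'example-workflow'},
)

# This query job is submitted with the labels above; write_truncate/write_append/write_tmp
# go through DatasetManager.write(), so their jobs carry the same labels.
df = dataset_manager.collect('SELECT 1 AS x')
print(df)
```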