BQ write and query jobs labeling (#381)
* BQ write and query jobs labeling

* Fixed failing test

* QueryJobConfig.labels instead of job_labels

* Changelog and docs
artnowo-alle authored May 29, 2024
1 parent fcd1226 commit 420c82c
Showing 6 changed files with 67 additions and 19 deletions.
10 changes: 8 additions & 2 deletions CHANGELOG.md
@@ -1,14 +1,20 @@
# BigFlow changelog

## Version 1.10.0

### Added

* BigQuery query job labeling for collect and write operations. Labels are passed via the `job_labels` dict argument in `DatasetConfiguration` and `DatasetManager`.

## Version 1.9.0

-### Changes
+### Changed

* Switched from Google Container Registry to Artifact Registry. Made `-r`/`--docker-repository` common for all deploy commands. Build and deploy commands authenticate to the Docker repository taken from `deployment_config.py` or CLI arguments, instead of hardcoded `https://eu.gcr.io`.

## Version 1.8.0

-### Changes
+### Changed

* Bumped basic dependencies: Apache Beam 2.48.0, google-cloud-bigtable 2.17.0, google-cloud-language 2.10.0, google-cloud-storage 2.11.2, among others (#374).
* Added the `env_variable` argument to `bigflow.Workflow`, which allows changing the name of the variable used to obtain the environment name (#365).
20 changes: 15 additions & 5 deletions bigflow/bigquery/dataset_configuration.py
@@ -1,4 +1,4 @@
-from typing import Dict, List
+from typing import Dict

from ..configuration import Config
from .interface import Dataset
@@ -16,14 +16,16 @@ def __init__(self,
is_master: bool = True,
is_default: bool = True,
tables_labels: Dict[str, Dict[str, str]] = None,
-dataset_labels: Dict[str, str] = None):
+dataset_labels: Dict[str, str] = None,
+job_labels: Dict[str, str] = None):
all_properties = (properties or {}).copy()
all_properties['project_id'] = project_id
all_properties['dataset_name'] = dataset_name
all_properties['internal_tables'] = internal_tables or []
all_properties['external_tables'] = external_tables or {}
all_properties['tables_labels'] = tables_labels or []
all_properties['dataset_labels'] = dataset_labels or []
all_properties['job_labels'] = job_labels or []

self.delegate = Config(name=env, properties=all_properties, is_master=is_master, is_default=is_default)

@@ -36,7 +38,8 @@ def add_configuration(self,
properties: dict = None,
is_default: bool = False,
tables_labels: Dict[str, Dict[str, str]] = None,
-dataset_labels: Dict[str, str] = None):
+dataset_labels: Dict[str, str] = None,
+job_labels: Dict[str, str] = None):

all_properties = (properties or {}).copy()

@@ -57,6 +60,9 @@ if dataset_labels:
if dataset_labels:
all_properties['dataset_labels'] = dataset_labels

if job_labels:
all_properties['job_labels'] = job_labels

self.delegate.add_configuration(env, all_properties, is_default=is_default)
return self

@@ -68,7 +74,8 @@ def create_dataset_manager(self, env: str = None) -> Dataset:
external_tables=self.resolve_external_tables(env),
extras=self.resolve_extra_properties(env),
tables_labels=self.resolve_tables_labels(env),
-dataset_labels=self.resolve_dataset_labels(env))
+dataset_labels=self.resolve_dataset_labels(env),
+job_labels=self.resolve_job_labels(env))

def resolve_extra_properties(self, env: str = None):
return {k: v for (k, v) in self.resolve(env).items() if self._is_extra_property(k)}
@@ -103,5 +110,8 @@ def resolve_tables_labels(self, env: str = None) -> Dict[str, Dict[str, str]]:
def resolve_dataset_labels(self, env: str = None) -> Dict[str, str]:
return self.resolve_property('dataset_labels', env)

def resolve_job_labels(self, env: str = None) -> Dict[str, str]:
return self.resolve_property('job_labels', env)

def _is_extra_property(self, property_name) -> bool:
-return property_name not in ['project_id','dataset_name','internal_tables','external_tables', 'env', 'dataset_labels', 'tables_labels']
+return property_name not in ['project_id','dataset_name','internal_tables','external_tables', 'env', 'dataset_labels', 'tables_labels', 'job_labels']
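
To see how these pieces fit together, here is a hypothetical sketch of a two-environment setup; the project IDs and label values are illustrative, and only parameters visible in this diff are used:

```python
from bigflow.bigquery import DatasetConfig

# Job labels are stored per environment, resolved via resolve_job_labels(),
# and handed to the DatasetManager created for that environment.
dataset_config = DatasetConfig(
    env='dev',
    project_id='my-project-dev',                        # assumed project ID
    dataset_name='example_dataset',
    internal_tables=['example_table'],
    job_labels={'team': 'data-platform', 'env': 'dev'}
).add_configuration(
    env='prod',
    properties={'project_id': 'my-project-prod'},       # assumed project ID
    job_labels={'team': 'data-platform', 'env': 'prod'},
)

# Every BigQuery job issued by this manager carries the dev labels.
dataset = dataset_config.create_dataset_manager(env='dev')
```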
21 changes: 17 additions & 4 deletions bigflow/bigquery/dataset_manager.py
@@ -265,12 +265,14 @@ class DatasetManager(object):
def __init__(self,
bigquery_client: 'google.cloud.bigquery.Client',
dataset: 'google.cloud.bigquery.Dataset',
-logger: logging.Logger):
+logger: logging.Logger,
+job_labels: Dict[str, str] | None = None):
from google.cloud import bigquery
self.bigquery_client: bigquery.Client = bigquery_client
self.dataset = dataset
self.dataset_id = dataset.full_dataset_id.replace(':', '.')
self.logger = logger
self.job_labels = job_labels

def write_tmp(self, table_id: str, sql: str) -> 'google.cloud.bigquery.table.RowIterator':
return self.write(table_id, sql, 'WRITE_TRUNCATE')
@@ -285,6 +287,9 @@ def write(self, table_id: str, sql: str, mode: str) -> 'google.cloud.bigquery.ta
job_config.destination = table_id
job_config.write_disposition = mode

if self.job_labels:
job_config.labels = self.job_labels

job = self.bigquery_client.query(sql, job_config=job_config)
return job.result()

@@ -314,7 +319,13 @@ def create_table(self, create_query: str) -> 'google.cloud.bigquery.table.RowIte
return job.result()

def collect(self, sql: str) -> 'pandas.DataFrame':
-return self._query(sql).to_dataframe()
+from google.cloud import bigquery
+job_config = bigquery.QueryJobConfig()
+
+if self.job_labels:
+job_config.labels = self.job_labels
+
+return self._query(sql, job_config=job_config).to_dataframe()

def collect_list(
self,
@@ -482,7 +493,8 @@ def create_dataset_manager(
location: str = DEFAULT_LOCATION,
logger: Logger | None = None,
tables_labels: Dict[str, Dict[str, str]] | None = None,
-dataset_labels: Dict[str, str] | None = None
+dataset_labels: Dict[str, str] | None = None,
+job_labels: Dict[str, str] | None = None
) -> tp.Tuple[str, PartitionedDatasetManager]:
"""
Dataset manager factory.
@@ -501,6 +513,7 @@ def create_dataset_manager(
:param logger: custom logger.
:param tables_labels: Dict with key as table_name and value as list of key/valued labels.
:param dataset_labels: Dict with key/valued labels.
:param job_labels: Dict with key/valued labels.
:return: tuple (full dataset ID, dataset manager).
"""
dataset_name = dataset_name or random_uuid(suffix='_test_case')
@@ -516,6 +529,6 @@

upsert_tables_labels(dataset_name, tables_labels, client)

-core_dataset_manager = DatasetManager(client, dataset, logger)
+core_dataset_manager = DatasetManager(client, dataset, logger, job_labels)
templated_dataset_manager = TemplatedDatasetManager(core_dataset_manager, internal_tables, external_tables, extras, runtime)
return dataset.full_dataset_id.replace(':', '.'), PartitionedDatasetManager(templated_dataset_manager, get_partition_from_run_datetime_or_none(runtime))
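
Both `write` and the new `collect` path attach the labels to a `QueryJobConfig` before submitting the query. For reference, a minimal standalone equivalent using `google-cloud-bigquery` directly (assuming default credentials and project are configured):

```python
from google.cloud import bigquery

client = bigquery.Client()  # picks up default credentials and project

job_config = bigquery.QueryJobConfig()
job_config.labels = {'owner': 'data-platform'}

# The labels travel with the job and are visible in job metadata
# and in billing exports.
job = client.query('SELECT 1 AS x', job_config=job_config)
job.result()
print(job.labels)  # {'owner': 'data-platform'}
```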
15 changes: 10 additions & 5 deletions bigflow/bigquery/interactive.py
@@ -52,7 +52,7 @@ def decorator(standard_component):


class InteractiveDatasetManager(Dataset):
"""Let's you run operations on a dataset, without the need of creating a component."""
"""Lets you run operations on a dataset, without the need of creating a component."""

def __init__(self,
project_id: str,
@@ -63,7 +63,8 @@ def __init__(self,
extras: Dict = None,
location: str = DEFAULT_LOCATION,
tables_labels: Dict[str, Dict[str, str]] = None,
-dataset_labels: Dict[str, str] = None):
+dataset_labels: Dict[str, str] = None,
+job_labels: Dict[str, str] = None):
self.config = DatasetConfigInternal(
project_id=project_id,
dataset_name=dataset_name,
@@ -73,7 +74,8 @@ def __init__(self,
extras=extras,
location=location,
tables_labels=tables_labels,
-dataset_labels=dataset_labels)
+dataset_labels=dataset_labels,
+job_labels=job_labels)
logger.debug("Create InteractiveDatasetManager, config %s", self.config._as_dict())

def write_truncate(self, table_name, sql, partitioned=True):
@@ -488,7 +490,8 @@ def __init__(self,
extras=None,
location=DEFAULT_LOCATION,
tables_labels: Dict[str, Dict[str, str]] = None,
-dataset_labels: Dict[str, str] = None
+dataset_labels: Dict[str, str] = None,
+job_labels: Dict[str, str] = None
):
self.project_id = project_id
self.dataset_name = dataset_name
@@ -499,6 +502,7 @@ def __init__(self,
self.location = location
self.tables_labels = tables_labels or {}
self.dataset_labels = dataset_labels or {}
self.job_labels = job_labels or {}

def _as_dict(self):
return {
@@ -510,7 +514,8 @@ def _as_dict(self):
'extras': self.extras,
'location': self.location,
'tables_labels': self.tables_labels,
-'dataset_labels': self.dataset_labels
+'dataset_labels': self.dataset_labels,
+'job_labels': self.job_labels
}


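
With this change, `InteractiveDatasetManager` forwards `job_labels` as well, so ad-hoc queries can be labeled too. A sketch (the import path, identifiers, and the trailing `.run()` follow BigFlow's interactive usage and are assumptions, not part of this commit):

```python
from bigflow.bigquery import InteractiveDatasetManager

dataset = InteractiveDatasetManager(
    project_id='my-project-dev',          # assumed project ID
    dataset_name='example_dataset',
    internal_tables=['example_table'],
    job_labels={'purpose': 'ad-hoc-analysis'})

# The query job below is submitted with the labels above attached.
dataset.collect('SELECT COUNT(*) AS cnt FROM {example_table}').run()
```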
18 changes: 15 additions & 3 deletions docs/technologies.md
@@ -309,6 +309,7 @@ to call BigQuery SQL.
Fully qualified names of internal tables are resolved to `{project_id}.{dataset_name}.{table_name}`.
* `external_tables` — Dict that defines aliases for external table names.
Fully qualified names of those tables have to be declared explicitly.
* `job_labels` — Dict of labels that will be set on BigQuery jobs.

The distinction between internal and external tables shouldn't be treated too seriously.
Internal means `mine`. External means any other. It's just a naming convention.
@@ -511,7 +512,7 @@ The `tables_labels` and `dataset_labels` parameters allow your workflow to create
On the first run, the tables do not exist yet, so labels cannot be created then. Labels are added on the second and later runs, once the tables exist.

```python
from bigflow.bigquery import DatasetConfig

dataset_config = DatasetConfig(
env='dev',
@@ -526,8 +527,19 @@
}
},
dataset_labels={"dataset_label_1": "value_1", "dataset_label_2": "value_2"}).create_dataset_manager()
```

The `job_labels` argument lets you label BigQuery jobs. It is passed to [`QueryJobConfig.labels`](https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJobConfig#google_cloud_bigquery_job_QueryJobConfig_labels)
in the `write` and `collect` methods of `DatasetManager`.

```python
from bigflow.bigquery import DatasetConfig

dataset_config = DatasetConfig(
    env='dev',
    project_id='your-project-id',
    dataset_name='example_dataset',
    internal_tables=['example_table'],
    external_tables={},
    job_labels={"owner": "John Doe"}).create_dataset_manager()
```

You can use it as an ad-hoc tool or put a labeling job into a workflow as well.
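
A sketch of using the manager created above ad hoc (the SQL and the trailing `.run()` call follow BigFlow's interactive examples and are assumptions, not part of this commit):

```python
# 'dataset_config' is the manager returned by create_dataset_manager() above;
# every query it submits carries labels={"owner": "John Doe"}.
dataset_config.collect('SELECT * FROM {example_table}').run()
```

Note that BigQuery itself only accepts lowercase letters, digits, dashes, and underscores in label values, so a value like `"John Doe"` may be rejected at query time; something like `"john_doe"` is safer.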
2 changes: 2 additions & 0 deletions test/test_job.py
@@ -28,6 +28,7 @@ def test_component(bigquery_dependency1, bigquery_dependency2):
'location': 'EU',
'tables_labels': {},
'dataset_labels': {},
'job_labels': {},
})

# and
@@ -42,6 +43,7 @@ def test_component(bigquery_dependency1, bigquery_dependency2):
'location': 'EU',
'tables_labels': {},
'dataset_labels': {},
'job_labels': {},
})

job = Job(component=test_component,
