Skip to content

Commit

Permalink
BQ write and query jobs labeling
Browse files Browse the repository at this point in the history
  • Loading branch information
artnowo-alle committed Apr 10, 2024
1 parent aab532e commit 5942597
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 14 deletions.
20 changes: 15 additions & 5 deletions bigflow/bigquery/dataset_configuration.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, List
from typing import Dict

from ..configuration import Config
from .interface import Dataset
Expand All @@ -16,14 +16,16 @@ def __init__(self,
is_master: bool = True,
is_default: bool = True,
tables_labels: Dict[str, Dict[str, str]] = None,
dataset_labels: Dict[str, str] = None):
dataset_labels: Dict[str, str] = None,
job_labels: Dict[str, str] = None):
all_properties = (properties or {}).copy()
all_properties['project_id'] = project_id
all_properties['dataset_name'] = dataset_name
all_properties['internal_tables'] = internal_tables or []
all_properties['external_tables'] = external_tables or {}
all_properties['tables_labels'] = tables_labels or []
all_properties['dataset_labels'] = dataset_labels or []
all_properties['job_labels'] = job_labels or []

self.delegate = Config(name=env, properties=all_properties, is_master=is_master, is_default=is_default)

Expand All @@ -36,7 +38,8 @@ def add_configuration(self,
properties: dict = None,
is_default: bool = False,
tables_labels: Dict[str, Dict[str, str]] = None,
dataset_labels: Dict[str, str] = None):
dataset_labels: Dict[str, str] = None,
job_labels: Dict[str, str] = None):

all_properties = (properties or {}).copy()

Expand All @@ -57,6 +60,9 @@ def add_configuration(self,
if dataset_labels:
all_properties['dataset_labels'] = dataset_labels

if job_labels:
all_properties['job_labels'] = job_labels

self.delegate.add_configuration(env, all_properties, is_default=is_default)
return self

Expand All @@ -68,7 +74,8 @@ def create_dataset_manager(self, env: str = None) -> Dataset:
external_tables=self.resolve_external_tables(env),
extras=self.resolve_extra_properties(env),
tables_labels=self.resolve_tables_labels(env),
dataset_labels=self.resolve_dataset_labels(env))
dataset_labels=self.resolve_dataset_labels(env),
job_labels=self.resolve_job_labels(env))

def resolve_extra_properties(self, env: str = None):
return {k: v for (k, v) in self.resolve(env).items() if self._is_extra_property(k)}
Expand Down Expand Up @@ -103,5 +110,8 @@ def resolve_tables_labels(self, env: str = None) -> Dict[str, Dict[str, str]]:
def resolve_dataset_labels(self, env: str = None) -> Dict[str, str]:
return self.resolve_property('dataset_labels', env)

def resolve_job_labels(self, env: str = None) -> Dict[str, str]:
return self.resolve_property('job_labels', env)

def _is_extra_property(self, property_name) -> bool:
return property_name not in ['project_id','dataset_name','internal_tables','external_tables', 'env', 'dataset_labels', 'tables_labels']
return property_name not in ['project_id','dataset_name','internal_tables','external_tables', 'env', 'dataset_labels', 'tables_labels', 'job_labels']
21 changes: 17 additions & 4 deletions bigflow/bigquery/dataset_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,14 @@ class DatasetManager(object):
def __init__(self,
bigquery_client: 'google.cloud.bigquery.Client',
dataset: 'google.cloud.bigquery.Dataset',
logger: logging.Logger):
logger: logging.Logger,
job_labels: Dict[str, str] | None = None):
from google.cloud import bigquery
self.bigquery_client: bigquery.Client = bigquery_client
self.dataset = dataset
self.dataset_id = dataset.full_dataset_id.replace(':', '.')
self.logger = logger
self.job_labels = job_labels

def write_tmp(self, table_id: str, sql: str) -> 'google.cloud.bigquery.table.RowIterator':
return self.write(table_id, sql, 'WRITE_TRUNCATE')
Expand All @@ -285,6 +287,9 @@ def write(self, table_id: str, sql: str, mode: str) -> 'google.cloud.bigquery.ta
job_config.destination = table_id
job_config.write_disposition = mode

if self.job_labels:
job_config.job_labels = self.job_labels

job = self.bigquery_client.query(sql, job_config=job_config)
return job.result()

Expand Down Expand Up @@ -314,7 +319,13 @@ def create_table(self, create_query: str) -> 'google.cloud.bigquery.table.RowIte
return job.result()

def collect(self, sql: str) -> 'pandas.DataFrame':
return self._query(sql).to_dataframe()
from google.cloud import bigquery
job_config = bigquery.QueryJobConfig()

if self.job_labels:
job_config.job_labels = self.job_labels

return self._query(sql, job_config=job_config).to_dataframe()

def collect_list(
self,
Expand Down Expand Up @@ -482,7 +493,8 @@ def create_dataset_manager(
location: str = DEFAULT_LOCATION,
logger: Logger | None = None,
tables_labels: Dict[str, Dict[str, str]] | None = None,
dataset_labels: Dict[str, str] | None = None
dataset_labels: Dict[str, str] | None = None,
job_labels: Dict[str, str] | None = None
) -> tp.Tuple[str, PartitionedDatasetManager]:
"""
Dataset manager factory.
Expand All @@ -501,6 +513,7 @@ def create_dataset_manager(
:param logger: custom logger.
:param tables_labels: Dict with key as table_name and value as list of key/valued labels.
:param dataset_labels: Dict with key/valued labels.
:param job_labels: Dict with key/valued labels.
:return: tuple (full dataset ID, dataset manager).
"""
dataset_name = dataset_name or random_uuid(suffix='_test_case')
Expand All @@ -516,6 +529,6 @@ def create_dataset_manager(

upsert_tables_labels(dataset_name, tables_labels, client)

core_dataset_manager = DatasetManager(client, dataset, logger)
core_dataset_manager = DatasetManager(client, dataset, logger, job_labels)
templated_dataset_manager = TemplatedDatasetManager(core_dataset_manager, internal_tables, external_tables, extras, runtime)
return dataset.full_dataset_id.replace(':', '.'), PartitionedDatasetManager(templated_dataset_manager, get_partition_from_run_datetime_or_none(runtime))
15 changes: 10 additions & 5 deletions bigflow/bigquery/interactive.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def decorator(standard_component):


class InteractiveDatasetManager(Dataset):
"""Let's you run operations on a dataset, without the need of creating a component."""
"""Lets you run operations on a dataset, without the need of creating a component."""

def __init__(self,
project_id: str,
Expand All @@ -63,7 +63,8 @@ def __init__(self,
extras: Dict = None,
location: str = DEFAULT_LOCATION,
tables_labels: Dict[str, Dict[str, str]] = None,
dataset_labels: Dict[str, str] = None):
dataset_labels: Dict[str, str] = None,
job_labels: Dict[str, str] = None):
self.config = DatasetConfigInternal(
project_id=project_id,
dataset_name=dataset_name,
Expand All @@ -73,7 +74,8 @@ def __init__(self,
extras=extras,
location=location,
tables_labels=tables_labels,
dataset_labels=dataset_labels)
dataset_labels=dataset_labels,
job_labels=job_labels)
logger.debug("Create InteractiveDatasetManager, config %s", self.config._as_dict())

def write_truncate(self, table_name, sql, partitioned=True):
Expand Down Expand Up @@ -488,7 +490,8 @@ def __init__(self,
extras=None,
location=DEFAULT_LOCATION,
tables_labels: Dict[str, Dict[str, str]] = None,
dataset_labels: Dict[str, str] = None
dataset_labels: Dict[str, str] = None,
job_labels: Dict[str, str] = None
):
self.project_id = project_id
self.dataset_name = dataset_name
Expand All @@ -499,6 +502,7 @@ def __init__(self,
self.location = location
self.tables_labels = tables_labels or {}
self.dataset_labels = dataset_labels or {}
self.job_labels = job_labels or {}

def _as_dict(self):
return {
Expand All @@ -510,7 +514,8 @@ def _as_dict(self):
'extras': self.extras,
'location': self.location,
'tables_labels': self.tables_labels,
'dataset_labels': self.dataset_labels
'dataset_labels': self.dataset_labels,
'job_labels': self.job_labels
}


Expand Down

0 comments on commit 5942597

Please sign in to comment.