BQ write and query jobs labeling #381

Merged: 4 commits, merged May 29, 2024
Changes from 3 commits
20 changes: 15 additions & 5 deletions bigflow/bigquery/dataset_configuration.py
@@ -1,4 +1,4 @@
from typing import Dict, List
from typing import Dict

from ..configuration import Config
from .interface import Dataset
@@ -16,14 +16,16 @@ def __init__(self,
is_master: bool = True,
is_default: bool = True,
tables_labels: Dict[str, Dict[str, str]] = None,
dataset_labels: Dict[str, str] = None):
dataset_labels: Dict[str, str] = None,
job_labels: Dict[str, str] = None):
all_properties = (properties or {}).copy()
all_properties['project_id'] = project_id
all_properties['dataset_name'] = dataset_name
all_properties['internal_tables'] = internal_tables or []
all_properties['external_tables'] = external_tables or {}
all_properties['tables_labels'] = tables_labels or []
all_properties['dataset_labels'] = dataset_labels or []
all_properties['job_labels'] = job_labels or []

self.delegate = Config(name=env, properties=all_properties, is_master=is_master, is_default=is_default)

@@ -36,7 +38,8 @@ def add_configuration(self,
properties: dict = None,
is_default: bool = False,
tables_labels: Dict[str, Dict[str, str]] = None,
dataset_labels: Dict[str, str] = None):
dataset_labels: Dict[str, str] = None,
job_labels: Dict[str, str] = None):

all_properties = (properties or {}).copy()

@@ -57,6 +60,9 @@
if dataset_labels:
all_properties['dataset_labels'] = dataset_labels

if job_labels:
all_properties['job_labels'] = job_labels

self.delegate.add_configuration(env, all_properties, is_default=is_default)
return self

@@ -68,7 +74,8 @@ def create_dataset_manager(self, env: str = None) -> Dataset:
external_tables=self.resolve_external_tables(env),
extras=self.resolve_extra_properties(env),
tables_labels=self.resolve_tables_labels(env),
dataset_labels=self.resolve_dataset_labels(env))
dataset_labels=self.resolve_dataset_labels(env),
job_labels=self.resolve_job_labels(env))

def resolve_extra_properties(self, env: str = None):
return {k: v for (k, v) in self.resolve(env).items() if self._is_extra_property(k)}
@@ -103,5 +110,8 @@ def resolve_tables_labels(self, env: str = None) -> Dict[str, Dict[str, str]]:
def resolve_dataset_labels(self, env: str = None) -> Dict[str, str]:
return self.resolve_property('dataset_labels', env)

def resolve_job_labels(self, env: str = None) -> Dict[str, str]:
return self.resolve_property('job_labels', env)

def _is_extra_property(self, property_name) -> bool:
return property_name not in ['project_id','dataset_name','internal_tables','external_tables', 'env', 'dataset_labels', 'tables_labels']
return property_name not in ['project_id','dataset_name','internal_tables','external_tables', 'env', 'dataset_labels', 'tables_labels', 'job_labels']
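
A hedged usage sketch of the new `job_labels` parameter on the dataset configuration changed above. The keyword arguments follow this diff; the exported class name (`DatasetConfig`) and all project, dataset, and label values are illustrative assumptions, not taken from this PR:

```python
# Sketch only: assumes the configuration class changed above is exported as
# bigflow.bigquery.DatasetConfig; project, dataset and label values are made up.
from bigflow.bigquery import DatasetConfig

dataset_config = DatasetConfig(
    env='dev',
    project_id='my-gcp-project',
    dataset_name='analytics',
    internal_tables=['user_events'],
    dataset_labels={'team': 'data-platform'},
    # New in this PR: labels attached to every BigQuery job run by the dataset manager.
    job_labels={'pipeline': 'user_events_daily', 'owner': 'data-platform'},
)

# job_labels is resolved per environment, like the other configuration properties.
dev_job_labels = dataset_config.resolve_job_labels('dev')
```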
21 changes: 17 additions & 4 deletions bigflow/bigquery/dataset_manager.py
@@ -265,12 +265,14 @@ class DatasetManager(object):
def __init__(self,
bigquery_client: 'google.cloud.bigquery.Client',
dataset: 'google.cloud.bigquery.Dataset',
logger: logging.Logger):
logger: logging.Logger,
job_labels: Dict[str, str] | None = None):
from google.cloud import bigquery
self.bigquery_client: bigquery.Client = bigquery_client
self.dataset = dataset
self.dataset_id = dataset.full_dataset_id.replace(':', '.')
self.logger = logger
self.job_labels = job_labels

def write_tmp(self, table_id: str, sql: str) -> 'google.cloud.bigquery.table.RowIterator':
return self.write(table_id, sql, 'WRITE_TRUNCATE')
@@ -285,6 +287,9 @@ def write(self, table_id: str, sql: str, mode: str) -> 'google.cloud.bigquery.ta
job_config.destination = table_id
job_config.write_disposition = mode

if self.job_labels:
job_config.labels = self.job_labels

job = self.bigquery_client.query(sql, job_config=job_config)
return job.result()

@@ -314,7 +319,13 @@ def create_table(self, create_query: str) -> 'google.cloud.bigquery.table.RowIte
return job.result()

def collect(self, sql: str) -> 'pandas.DataFrame':
return self._query(sql).to_dataframe()
from google.cloud import bigquery
job_config = bigquery.QueryJobConfig()

if self.job_labels:
job_config.labels = self.job_labels

return self._query(sql, job_config=job_config).to_dataframe()

def collect_list(
self,
@@ -482,7 +493,8 @@ def create_dataset_manager(
location: str = DEFAULT_LOCATION,
logger: Logger | None = None,
tables_labels: Dict[str, Dict[str, str]] | None = None,
dataset_labels: Dict[str, str] | None = None
dataset_labels: Dict[str, str] | None = None,
job_labels: Dict[str, str] | None = None
) -> tp.Tuple[str, PartitionedDatasetManager]:
"""
Dataset manager factory.
@@ -501,6 +513,7 @@ def create_dataset_manager(
:param logger: custom logger.
:param tables_labels: Dict with key as table_name and value as list of key/valued labels.
:param dataset_labels: Dict with key/valued labels.
:param job_labels: Dict with key/valued labels.
:return: tuple (full dataset ID, dataset manager).
"""
dataset_name = dataset_name or random_uuid(suffix='_test_case')
@@ -516,6 +529,6 @@

upsert_tables_labels(dataset_name, tables_labels, client)

core_dataset_manager = DatasetManager(client, dataset, logger)
core_dataset_manager = DatasetManager(client, dataset, logger, job_labels)
templated_dataset_manager = TemplatedDatasetManager(core_dataset_manager, internal_tables, external_tables, extras, runtime)
return dataset.full_dataset_id.replace(':', '.'), PartitionedDatasetManager(templated_dataset_manager, get_partition_from_run_datetime_or_none(runtime))
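
The change above boils down to passing the configured labels through `QueryJobConfig.labels` in `write` and `collect`. A minimal sketch of that underlying google-cloud-bigquery mechanism (client, SQL, table, and label values are illustrative only):

```python
from google.cloud import bigquery

client = bigquery.Client(project='my-gcp-project')  # hypothetical project

job_config = bigquery.QueryJobConfig()
job_config.destination = 'my-gcp-project.analytics.user_events'  # write jobs only
job_config.write_disposition = 'WRITE_TRUNCATE'
job_config.labels = {'pipeline': 'user_events_daily'}  # what DatasetManager now sets

# The labels end up on the query job itself and are visible in the BigQuery
# job metadata (UI, INFORMATION_SCHEMA.JOBS, billing export).
job = client.query(
    'SELECT user_id, event_time FROM `my-gcp-project.analytics.user_events_raw`',
    job_config=job_config,
)
rows = job.result()
```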
15 changes: 10 additions & 5 deletions bigflow/bigquery/interactive.py
@@ -52,7 +52,7 @@ def decorator(standard_component):


class InteractiveDatasetManager(Dataset):
"""Let's you run operations on a dataset, without the need of creating a component."""
"""Lets you run operations on a dataset, without the need of creating a component."""

def __init__(self,
project_id: str,
@@ -63,7 +63,8 @@ def __init__(self,
extras: Dict = None,
location: str = DEFAULT_LOCATION,
tables_labels: Dict[str, Dict[str, str]] = None,
dataset_labels: Dict[str, str] = None):
dataset_labels: Dict[str, str] = None,
job_labels: Dict[str, str] = None):
self.config = DatasetConfigInternal(
project_id=project_id,
dataset_name=dataset_name,
@@ -73,7 +74,8 @@
extras=extras,
location=location,
tables_labels=tables_labels,
dataset_labels=dataset_labels)
dataset_labels=dataset_labels,
job_labels=job_labels)
logger.debug("Create InteractiveDatasetManager, config %s", self.config._as_dict())

def write_truncate(self, table_name, sql, partitioned=True):
@@ -488,7 +490,8 @@ def __init__(self,
extras=None,
location=DEFAULT_LOCATION,
tables_labels: Dict[str, Dict[str, str]] = None,
dataset_labels: Dict[str, str] = None
dataset_labels: Dict[str, str] = None,
job_labels: Dict[str, str] = None
):
self.project_id = project_id
self.dataset_name = dataset_name
@@ -499,6 +502,7 @@ def __init__(self,
self.location = location
self.tables_labels = tables_labels or {}
self.dataset_labels = dataset_labels or {}
self.job_labels = job_labels or {}

def _as_dict(self):
return {
@@ -510,7 +514,8 @@ def _as_dict(self):
'extras': self.extras,
'location': self.location,
'tables_labels': self.tables_labels,
'dataset_labels': self.dataset_labels
'dataset_labels': self.dataset_labels,
'job_labels': self.job_labels
}


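For the interactive path, the same parameter is threaded into `InteractiveDatasetManager`, so ad-hoc notebook operations are labeled as well. A hedged sketch (constructor arguments match this diff; project, dataset, table, and SQL are made up):

```python
# Sketch only: illustrative values, not taken from the PR.
from bigflow.bigquery.interactive import InteractiveDatasetManager

dataset = InteractiveDatasetManager(
    project_id='my-gcp-project',
    dataset_name='analytics',
    job_labels={'context': 'notebook', 'owner': 'data-platform'},
)

# Operations built from this manager go through the DatasetConfigInternal shown
# above, so the resulting BigQuery jobs carry the configured job_labels.
dataset.write_truncate('user_events', 'SELECT 1 AS user_id', partitioned=False)
```
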
2 changes: 2 additions & 0 deletions test/test_job.py
@@ -28,6 +28,7 @@ def test_component(bigquery_dependency1, bigquery_dependency2):
'location': 'EU',
'tables_labels': {},
'dataset_labels': {},
'job_labels': {},
})

# and
@@ -42,6 +43,7 @@ def test_component(bigquery_dependency1, bigquery_dependency2):
'location': 'EU',
'tables_labels': {},
'dataset_labels': {},
'job_labels': {},
})

job = Job(component=test_component,
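
Beyond the unit-test assertions above, one way to verify the feature end to end is to list recent jobs and inspect their labels via the public google-cloud-bigquery client. A hedged sketch (project and label values are illustrative):

```python
from google.cloud import bigquery

client = bigquery.Client(project='my-gcp-project')  # hypothetical project

# Jobs started by a labeled DatasetManager should carry the configured labels.
for job in client.list_jobs(max_results=20):
    if job.labels and job.labels.get('pipeline') == 'user_events_daily':
        print(job.job_id, job.state, job.labels)
```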