diff --git a/providers/src/airflow/providers/google/cloud/openlineage/mixins.py b/providers/src/airflow/providers/google/cloud/openlineage/mixins.py index 8ed91424046f0..c9d1bb009fb4c 100644 --- a/providers/src/airflow/providers/google/cloud/openlineage/mixins.py +++ b/providers/src/airflow/providers/google/cloud/openlineage/mixins.py @@ -20,10 +20,12 @@ import copy import json import traceback +from collections.abc import Iterable from typing import TYPE_CHECKING, cast from airflow.providers.common.compat.openlineage.facet import ( ColumnLineageDatasetFacet, + DatasetFacet, ErrorMessageRunFacet, ExternalQueryRunFacet, Fields, @@ -32,13 +34,15 @@ OutputDataset, OutputStatisticsOutputDatasetFacet, SchemaDatasetFacet, - SchemaDatasetFacetFields, SQLJobFacet, ) from airflow.providers.google.cloud.openlineage.utils import ( BIGQUERY_NAMESPACE, BigQueryJobRunFacet, + get_facets_from_bq_table, get_from_nullable_chain, + get_identity_column_lineage_facet, + get_namespace_name_from_source_uris, merge_column_lineage_facets, ) @@ -100,7 +104,7 @@ def get_openlineage_facets_on_complete(self, _): if get_from_nullable_chain(job_properties, ["status", "state"]) != "DONE": raise ValueError(f"Trying to extract data from running bigquery job: `{self.job_id}`") - run_facets.update(self._get_run_facets(job_properties)) + run_facets["bigQueryJob"] = self._get_bigquery_job_run_facet(job_properties) if get_from_nullable_chain(job_properties, ["statistics", "numChildJobs"]): self.log.debug("Found SCRIPT job. Extracting lineage from child jobs instead.") # type: ignore[attr-defined] @@ -108,12 +112,11 @@ def get_openlineage_facets_on_complete(self, _): # https://cloud.google.com/bigquery/docs/information-schema-jobs#multi-statement_query_job for child_job_id in self._client.list_jobs(parent_job=self.job_id): child_job_properties = self._client.get_job(job_id=child_job_id)._properties # type: ignore - child_inputs, child_output = self._get_inputs_and_output(child_job_properties) + child_inputs, child_outputs = self._get_inputs_and_outputs(child_job_properties) inputs.extend(child_inputs) - outputs.append(child_output) + outputs.extend(child_outputs) else: - inputs, _output = self._get_inputs_and_output(job_properties) - outputs.append(_output) + inputs, outputs = self._get_inputs_and_outputs(job_properties) except Exception as e: self.log.warning("Cannot retrieve job details from BigQuery.Client. 
%s", e, exc_info=True) # type: ignore[attr-defined] @@ -128,33 +131,30 @@ def get_openlineage_facets_on_complete(self, _): ) return OperatorLineage( - inputs=inputs, + inputs=list(inputs), outputs=self._deduplicate_outputs(outputs), run_facets=run_facets, job_facets={"sql": SQLJobFacet(query=SQLParser.normalize_sql(self.sql))} if self.sql else {}, ) - def _get_run_facets(self, properties: dict) -> dict[str, RunFacet]: + def _get_inputs_and_outputs(self, properties: dict) -> tuple[list[InputDataset], list[OutputDataset]]: job_type = get_from_nullable_chain(properties, ["configuration", "jobType"]) - run_facets: dict[str, RunFacet] = {} if job_type == "QUERY": - run_facets["bigQueryJob"] = self._get_bigquery_job_run_facet(properties) - - return run_facets - - def _get_inputs_and_output(self, properties: dict) -> tuple[list[InputDataset], OutputDataset | None]: - job_type = get_from_nullable_chain(properties, ["configuration", "jobType"]) - - if job_type == "QUERY": - inputs, output = self._get_inputs_and_output_for_query_job(properties) + inputs, outputs = self._get_inputs_and_outputs_for_query_job(properties) + elif job_type == "LOAD": + inputs, outputs = self._get_inputs_and_outputs_for_load_job(properties) + elif job_type == "COPY": + inputs, outputs = self._get_inputs_and_outputs_for_copy_job(properties) + elif job_type == "EXTRACT": + inputs, outputs = self._get_inputs_and_outputs_for_extract_job(properties) else: self.log.debug("Unsupported job type for input/output extraction: `%s`.", job_type) # type: ignore[attr-defined] - inputs, output = [], None + inputs, outputs = [], [] - return inputs, output + return inputs, outputs - def _deduplicate_outputs(self, outputs: list[OutputDataset | None]) -> list[OutputDataset]: + def _deduplicate_outputs(self, outputs: Iterable[OutputDataset | None]) -> list[OutputDataset]: final_outputs = {} for single_output in outputs: if not single_output: @@ -199,92 +199,133 @@ def _get_dataset(self, table: dict, dataset_type: str) -> Dataset: table_name = table.get("tableId") dataset_name = f"{project}.{dataset}.{table_name}" - dataset_schema = self._get_table_schema_safely(dataset_name) + dataset_facets = self._get_table_facets_safely(dataset_name) if dataset_type == "input": # Logic specific to creating InputDataset (if needed) return InputDataset( namespace=BIGQUERY_NAMESPACE, name=dataset_name, - facets={ - "schema": dataset_schema, - } - if dataset_schema - else {}, + facets=dataset_facets, ) elif dataset_type == "output": # Logic specific to creating OutputDataset (if needed) return OutputDataset( namespace=BIGQUERY_NAMESPACE, name=dataset_name, - facets={ - "schema": dataset_schema, - } - if dataset_schema - else {}, + facets=dataset_facets, ) else: raise ValueError("Invalid dataset_type. Must be 'input' or 'output'") - def _get_table_schema_safely(self, table_name: str) -> SchemaDatasetFacet | None: + def _get_table_facets_safely(self, table_name: str) -> dict[str, DatasetFacet]: try: - return self._get_table_schema(table_name) + bq_table = self._client.get_table(table_name) + return get_facets_from_bq_table(bq_table) except Exception as e: - self.log.warning("Could not extract output schema from bigquery. %s", e) # type: ignore[attr-defined] - return None - - def _get_table_schema(self, table: str) -> SchemaDatasetFacet | None: - bq_table = self._client.get_table(table) - - if not bq_table._properties: - return None + self.log.warning("Could not extract facets from bigquery table: `%s`. 
%s", table_name, e) # type: ignore[attr-defined] + return {} - fields = get_from_nullable_chain(bq_table._properties, ["schema", "fields"]) - if not fields: - return None - - return SchemaDatasetFacet( - fields=[ - SchemaDatasetFacetFields( - name=field.get("name"), - type=field.get("type"), - description=field.get("description"), - ) - for field in fields - ] - ) - - def _get_inputs_and_output_for_query_job( + def _get_inputs_and_outputs_for_query_job( self, properties: dict - ) -> tuple[list[InputDataset], OutputDataset | None]: + ) -> tuple[list[InputDataset], list[OutputDataset]]: input_tables = get_from_nullable_chain(properties, ["statistics", "query", "referencedTables"]) or [] output_table = get_from_nullable_chain(properties, ["configuration", "query", "destinationTable"]) inputs = [ - (self._get_input_dataset(input_table)) + self._get_input_dataset(input_table) for input_table in input_tables if input_table != output_table # Output table is in `referencedTables` and needs to be removed ] if not output_table: - return inputs, None + return inputs, [] output = self._get_output_dataset(output_table) - if dataset_stat_facet := self._get_statistics_dataset_facet(properties): + if dataset_stat_facet := self._get_output_statistics_dataset_facet(properties): output.outputFacets = output.outputFacets or {} output.outputFacets["outputStatistics"] = dataset_stat_facet - if cll_facet := self._get_column_level_lineage_facet(properties, output, inputs): + if cll_facet := self._get_column_level_lineage_facet_for_query_job(properties, output, inputs): output.facets = output.facets or {} output.facets["columnLineage"] = cll_facet - return inputs, output + return inputs, [output] + + def _get_inputs_and_outputs_for_load_job( + self, properties: dict + ) -> tuple[list[InputDataset], list[OutputDataset]]: + output = self._get_output_dataset(properties["configuration"]["load"]["destinationTable"]) + output_table_schema_facet = output.facets.get("schema") if output.facets else None + + source_uris = properties["configuration"]["load"]["sourceUris"] + inputs = [ + InputDataset( + namespace=namespace, + name=name, + facets={"schema": output_table_schema_facet} if output_table_schema_facet else {}, + ) + for namespace, name in get_namespace_name_from_source_uris(source_uris) + ] + + if dataset_stat_facet := self._get_output_statistics_dataset_facet(properties): + output.outputFacets = output.outputFacets or {} + output.outputFacets["outputStatistics"] = dataset_stat_facet + if cll_facet := get_identity_column_lineage_facet(self._extract_column_names(output), inputs): + output.facets = {**output.facets, **cll_facet} if output.facets else cll_facet + return inputs, [output] + + def _get_inputs_and_outputs_for_copy_job( + self, properties: dict + ) -> tuple[list[InputDataset], list[OutputDataset]]: + input_tables = get_from_nullable_chain(properties, ["configuration", "copy", "sourceTables"]) or [ + get_from_nullable_chain(properties, ["configuration", "copy", "sourceTable"]) + ] + inputs = [self._get_input_dataset(input_table) for input_table in input_tables] + + output = self._get_output_dataset(properties["configuration"]["copy"]["destinationTable"]) + if dataset_stat_facet := self._get_output_statistics_dataset_facet(properties): + output.outputFacets = output.outputFacets or {} + output.outputFacets["outputStatistics"] = dataset_stat_facet + if cll_facet := get_identity_column_lineage_facet(self._extract_column_names(output), inputs): + output.facets = {**output.facets, **cll_facet} if output.facets 
else cll_facet + return inputs, [output] + + def _get_inputs_and_outputs_for_extract_job( + self, properties: dict + ) -> tuple[list[InputDataset], list[OutputDataset]]: + source_table = get_from_nullable_chain(properties, ["configuration", "extract", "sourceTable"]) + input_dataset = self._get_input_dataset(source_table) if source_table else None + + destination_uris = get_from_nullable_chain( + properties, ["configuration", "extract", "destinationUris"] + ) or [get_from_nullable_chain(properties, ["configuration", "extract", "destinationUri"])] + + outputs = [] + for namespace, name in get_namespace_name_from_source_uris(destination_uris): + output_facets = {} + if input_dataset: + input_schema = input_dataset.facets.get("schema") if input_dataset.facets else None + if input_schema: + output_facets["schema"] = input_schema + if cll_facet := get_identity_column_lineage_facet( + self._extract_column_names(input_dataset), [input_dataset] + ): + output_facets = {**output_facets, **cll_facet} + outputs.append(OutputDataset(namespace=namespace, name=name, facets=output_facets)) + + inputs = [input_dataset] if input_dataset else [] + return inputs, outputs @staticmethod def _get_bigquery_job_run_facet(properties: dict) -> BigQueryJobRunFacet: - if get_from_nullable_chain(properties, ["configuration", "query", "query"]): - # Exclude the query to avoid event size issues and duplicating SqlJobFacet information. - properties = copy.deepcopy(properties) - properties["configuration"]["query"].pop("query") - cache_hit = get_from_nullable_chain(properties, ["statistics", "query", "cacheHit"]) - billed_bytes = get_from_nullable_chain(properties, ["statistics", "query", "totalBytesBilled"]) + job_type = get_from_nullable_chain(properties, ["configuration", "jobType"]) + cache_hit, billed_bytes = None, None + if job_type == "QUERY": + if get_from_nullable_chain(properties, ["configuration", "query", "query"]): + # Exclude the query to avoid event size issues and duplicating SqlJobFacet information. 
+                properties = copy.deepcopy(properties)
+                properties["configuration"]["query"].pop("query")
+            cache_hit = get_from_nullable_chain(properties, ["statistics", "query", "cacheHit"])
+            billed_bytes = get_from_nullable_chain(properties, ["statistics", "query", "totalBytesBilled"])
+
         return BigQueryJobRunFacet(
             cached=str(cache_hit).lower() == "true",
             billedBytes=int(billed_bytes) if billed_bytes else None,
@@ -292,22 +333,32 @@ def _get_bigquery_job_run_facet(properties: dict) -> BigQueryJobRunFacet:
         )
 
     @staticmethod
-    def _get_statistics_dataset_facet(
+    def _get_output_statistics_dataset_facet(
         properties,
     ) -> OutputStatisticsOutputDatasetFacet | None:
-        query_plan = get_from_nullable_chain(properties, chain=["statistics", "query", "queryPlan"])
-        if not query_plan:
-            return None
+        job_type = get_from_nullable_chain(properties, ["configuration", "jobType"])
+        out_rows, out_bytes = None, None
+        if job_type == "QUERY":
+            query_plan = get_from_nullable_chain(properties, chain=["statistics", "query", "queryPlan"])
+            if not query_plan:  # Without a query plan there are no statistics
+                return None
+            out_stage = query_plan[-1]  # The last stage of the query plan writes the data and carries the statistics
+            out_rows = out_stage.get("recordsWritten", None)
+            out_bytes = out_stage.get("shuffleOutputBytes", None)
+        elif job_type == "LOAD":
+            out_rows = get_from_nullable_chain(properties, ["statistics", "load", "outputRows"])
+            out_bytes = get_from_nullable_chain(properties, ["statistics", "load", "outputBytes"])
+        elif job_type == "COPY":
+            out_rows = get_from_nullable_chain(properties, ["statistics", "copy", "copiedRows"])
+            out_bytes = get_from_nullable_chain(properties, ["statistics", "copy", "copiedLogicalBytes"])
+        # No statistics available for EXTRACT job type
 
-        out_stage = query_plan[-1]
-        out_rows = out_stage.get("recordsWritten", None)
-        out_bytes = out_stage.get("shuffleOutputBytes", None)
         if out_bytes and out_rows:
             return OutputStatisticsOutputDatasetFacet(rowCount=int(out_rows), size=int(out_bytes))
         return None
 
-    def _get_column_level_lineage_facet(
-        self, properties: dict, output: OutputDataset, inputs: list[InputDataset]
+    def _get_column_level_lineage_facet_for_query_job(
+        self, properties: dict, output: OutputDataset, inputs: Iterable[InputDataset]
     ) -> ColumnLineageDatasetFacet | None:
         """
         Extract column-level lineage information from a BigQuery job and return it as a facet.
@@ -330,9 +381,13 @@ def _get_column_level_lineage_facet(
         # Extract SQL query and parse it
         self.log.debug("Extracting column-level lineage facet from BigQuery query.")  # type: ignore[attr-defined]
-        query = get_from_nullable_chain(properties, ["configuration", "query", "query"]) or ""
-        parse_result = SQLParser("bigquery").parse(SQLParser.split_sql_string(SQLParser.normalize_sql(query)))
+        query = get_from_nullable_chain(properties, ["configuration", "query", "query"])
+        if query is None:
+            self.log.debug("No query found in BQ job configuration. Facet generation skipped.")  # type: ignore[attr-defined]
+            return None
+
+        parse_result = SQLParser("bigquery").parse(SQLParser.split_sql_string(SQLParser.normalize_sql(query)))
         if parse_result is None or parse_result.column_lineage == []:
             self.log.debug("No column-level lineage found in the SQL query. Facet generation skipped.")  # type: ignore[attr-defined]
             return None
diff --git a/providers/src/airflow/providers/google/cloud/openlineage/utils.py b/providers/src/airflow/providers/google/cloud/openlineage/utils.py
index c6fbadba953ee..2e00ca327b65b 100644
--- a/providers/src/airflow/providers/google/cloud/openlineage/utils.py
+++ b/providers/src/airflow/providers/google/cloud/openlineage/utils.py
@@ -21,15 +21,17 @@
 import logging
 import os
 import pathlib
+import re
 from collections import defaultdict
+from collections.abc import Iterable
 from typing import TYPE_CHECKING, Any
 
 from attr import define, field
 from google.cloud.dataproc_v1 import Batch, RuntimeConfig
 
 from airflow.providers.common.compat.openlineage.facet import (
-    BaseFacet,
     ColumnLineageDatasetFacet,
+    DatasetFacet,
     DocumentationDatasetFacet,
     Fields,
     Identifier,
@@ -212,9 +214,9 @@ def extract_ds_name_from_gcs_path(path: str) -> str:
     return path
 
 
-def get_facets_from_bq_table(table: Table) -> dict[str, BaseFacet]:
+def get_facets_from_bq_table(table: Table) -> dict[str, DatasetFacet]:
     """Get facets from BigQuery table object."""
-    facets: dict[str, BaseFacet] = {}
+    facets: dict[str, DatasetFacet] = {}
     if table.schema:
         facets["schema"] = SchemaDatasetFacet(
             fields=[
@@ -228,26 +230,37 @@
         facets["documentation"] = DocumentationDatasetFacet(description=table.description)
 
     if table.external_data_configuration:
-        symlinks = set()
-        for uri in table.external_data_configuration.source_uris:
-            if uri.startswith("gs://"):
-                bucket, blob = _parse_gcs_url(uri)
-                blob = extract_ds_name_from_gcs_path(blob)
-                symlinks.add((f"gs://{bucket}", blob))
-
+        symlinks = get_namespace_name_from_source_uris(table.external_data_configuration.source_uris)
         facets["symlink"] = SymlinksDatasetFacet(
             identifiers=[
-                Identifier(namespace=namespace, name=name, type="file")
+                Identifier(
+                    namespace=namespace, name=name, type="file" if namespace.startswith("gs://") else "table"
+                )
                 for namespace, name in sorted(symlinks)
             ]
         )
 
     return facets
 
 
+def get_namespace_name_from_source_uris(source_uris: Iterable[str]) -> set[tuple[str, str]]:
+    result = set()
+    for uri in source_uris:
+        if uri.startswith("gs://"):
+            bucket, blob = _parse_gcs_url(uri)
+            result.add((f"gs://{bucket}", extract_ds_name_from_gcs_path(blob)))
+        elif uri.startswith("https://googleapis.com/bigtable"):
+            regex = r"https://googleapis.com/bigtable/projects/([^/]+)/instances/([^/]+)(?:/appProfiles/([^/]+))?/tables/([^/]+)"
+            match = re.match(regex, uri)
+            if match:
+                project_id, instance_id, table_id = match.groups()[0], match.groups()[1], match.groups()[3]
+                result.add((f"bigtable://{project_id}/{instance_id}", table_id))
+    return result
+
+
 def get_identity_column_lineage_facet(
-    dest_field_names: list[str],
-    input_datasets: list[Dataset],
-) -> dict[str, ColumnLineageDatasetFacet]:
+    dest_field_names: Iterable[str],
+    input_datasets: Iterable[Dataset],
+) -> dict[str, DatasetFacet]:
     """
     Get column lineage facet for identity transformations.
diff --git a/providers/tests/google/cloud/openlineage/test_mixins.py b/providers/tests/google/cloud/openlineage/test_mixins.py index 824da4b8447e8..770a2bf61e6c4 100644 --- a/providers/tests/google/cloud/openlineage/test_mixins.py +++ b/providers/tests/google/cloud/openlineage/test_mixins.py @@ -20,13 +20,15 @@ import json import logging import os -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pytest +from google.cloud.bigquery.table import Table from airflow.providers.common.compat.openlineage.facet import ( ColumnLineageDatasetFacet, Dataset, + DocumentationDatasetFacet, ExternalQueryRunFacet, Fields, InputDataset, @@ -110,22 +112,12 @@ def read_common_json_file(rel: str): return json.load(f) -class TableMock(MagicMock): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.inputs = [ - read_common_json_file("table_details.json"), - read_common_json_file("out_table_details.json"), - ] - - @property - def _properties(self): - return self.inputs.pop() - - class TestBigQueryOpenLineageMixin: def setup_method(self): - self.job_details = read_common_json_file("job_details.json") + self.copy_job_details = read_common_json_file("copy_job_details.json") + self.extract_job_details = read_common_json_file("extract_job_details.json") + self.load_job_details = read_common_json_file("load_job_details.json") + self.query_job_details = read_common_json_file("query_job_details.json") self.script_job_details = read_common_json_file("script_job_details.json") hook = MagicMock() self.client = MagicMock() @@ -143,19 +135,22 @@ def hook(self): hook.get_client.return_value = self.client - self.client.get_table.return_value = TableMock() - self.operator = BQOperator() + self.operator._client = self.client - def test_bq_job_information(self): - self.client.get_job.return_value._properties = self.job_details + def test_get_openlineage_facets_on_complete_query_job(self): + self.client.get_job.return_value._properties = self.query_job_details + self.client.get_table.side_effect = [ + Table.from_api_repr(read_common_json_file("table_details.json")), + Table.from_api_repr(read_common_json_file("out_table_details.json")), + ] lineage = self.operator.get_openlineage_facets_on_complete(None) - self.job_details["configuration"]["query"].pop("query") + self.query_job_details["configuration"]["query"].pop("query") assert lineage.run_facets == { "bigQueryJob": BigQueryJobRunFacet( - cached=False, billedBytes=111149056, properties=json.dumps(self.job_details) + cached=False, billedBytes=111149056, properties=json.dumps(self.query_job_details) ), "externalQuery": ExternalQueryRunFacet(externalQueryId="job_id", source="bigquery"), } @@ -174,7 +169,10 @@ def test_bq_job_information(self): "number", "INTEGER", "Number of occurrences of the name" ), ] - ) + ), + "documentation": DocumentationDatasetFacet( + "The table contains the number of applicants for a Social Security card by year of birth and sex." 
+ ), }, ) ] @@ -187,13 +185,226 @@ def test_bq_job_information(self): rowCount=20, size=321, fileCount=None ) }, + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields("name", "STRING"), + SchemaDatasetFacetFields("total_people", "INTEGER"), + ] + ), + }, ), ] - def test_bq_script_job_information(self): + def test_get_openlineage_facets_on_complete_copy_job(self): + self.client.get_job.return_value._properties = self.copy_job_details + self.client.get_table.return_value = Table.from_api_repr(read_common_json_file("table_details.json")) + expected_facets = { + "schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields("state", "STRING", "2-digit state code"), + SchemaDatasetFacetFields("gender", "STRING", "Sex (M=male or F=female)"), + SchemaDatasetFacetFields("year", "INTEGER", "4-digit year of birth"), + SchemaDatasetFacetFields("name", "STRING", "Given name of a person at birth"), + SchemaDatasetFacetFields("number", "INTEGER", "Number of occurrences of the name"), + ] + ), + "documentation": DocumentationDatasetFacet( + "The table contains the number of applicants for a Social Security card by year of birth and sex." + ), + } + + lineage = self.operator.get_openlineage_facets_on_complete(None) + + assert lineage.run_facets == { + "bigQueryJob": BigQueryJobRunFacet( + cached=False, billedBytes=None, properties=json.dumps(self.copy_job_details) + ), + "externalQuery": ExternalQueryRunFacet(externalQueryId="job_id", source="bigquery"), + } + assert lineage.inputs == [ + InputDataset( + namespace="bigquery", + name="airflow-openlineage.new_dataset.copy_job_source", + facets=expected_facets, + ), + InputDataset( + namespace="bigquery", + name="airflow-openlineage.new_dataset.copy_job_source2", + facets=expected_facets, + ), + ] + assert lineage.outputs == [ + OutputDataset( + namespace="bigquery", + name="airflow-openlineage.new_dataset.copy_job_result", + outputFacets={ + "outputStatistics": OutputStatisticsOutputDatasetFacet( + rowCount=20, size=3800, fileCount=None + ) + }, + facets={ + **expected_facets, + "columnLineage": ColumnLineageDatasetFacet( + fields={ + col: Fields( + inputFields=[ + InputField( + "bigquery", "airflow-openlineage.new_dataset.copy_job_source", col + ), + InputField( + "bigquery", "airflow-openlineage.new_dataset.copy_job_source2", col + ), + ], + transformationType="IDENTITY", + transformationDescription="identical", + ) + for col in ["state", "gender", "year", "name", "number"] + } + ), + }, + ), + ] + + def test_get_openlineage_facets_on_complete_load_job(self): + self.client.get_job.return_value._properties = self.load_job_details + self.client.get_table.return_value = Table.from_api_repr( + read_common_json_file("out_table_details.json") + ) + + lineage = self.operator.get_openlineage_facets_on_complete(None) + + assert lineage.run_facets == { + "bigQueryJob": BigQueryJobRunFacet( + cached=False, billedBytes=None, properties=json.dumps(self.load_job_details) + ), + "externalQuery": ExternalQueryRunFacet(externalQueryId="job_id", source="bigquery"), + } + assert lineage.inputs == [ + InputDataset( + namespace="gs://airflow-openlineage", + name="/", + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields("name", "STRING"), + SchemaDatasetFacetFields("total_people", "INTEGER"), + ] + ) + }, + ), + ] + assert lineage.outputs == [ + OutputDataset( + namespace="bigquery", + name="airflow-openlineage.new_dataset.job_load", + outputFacets={ + "outputStatistics": OutputStatisticsOutputDatasetFacet( + 
rowCount=10, size=546, fileCount=None + ) + }, + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields("name", "STRING"), + SchemaDatasetFacetFields("total_people", "INTEGER"), + ] + ), + "columnLineage": ColumnLineageDatasetFacet( + fields={ + "name": Fields( + inputFields=[InputField("gs://airflow-openlineage", "/", "name")], + transformationType="IDENTITY", + transformationDescription="identical", + ), + "total_people": Fields( + inputFields=[InputField("gs://airflow-openlineage", "/", "total_people")], + transformationType="IDENTITY", + transformationDescription="identical", + ), + } + ), + }, + ), + ] + + def test_get_openlineage_facets_on_complete_extract_job(self): + self.client.get_job.return_value._properties = self.extract_job_details + self.client.get_table.return_value = Table.from_api_repr( + read_common_json_file("out_table_details.json") + ) + + lineage = self.operator.get_openlineage_facets_on_complete(None) + + assert lineage.run_facets == { + "bigQueryJob": BigQueryJobRunFacet( + cached=False, billedBytes=None, properties=json.dumps(self.extract_job_details) + ), + "externalQuery": ExternalQueryRunFacet(externalQueryId="job_id", source="bigquery"), + } + assert lineage.inputs == [ + InputDataset( + namespace="bigquery", + name="airflow-openlineage.new_dataset.extract_job_source", + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields("name", "STRING"), + SchemaDatasetFacetFields("total_people", "INTEGER"), + ] + ) + }, + ), + ] + assert lineage.outputs == [ + OutputDataset( + namespace="gs://airflow-openlineage", + name="extract_job_source", + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields("name", "STRING"), + SchemaDatasetFacetFields("total_people", "INTEGER"), + ] + ), + "columnLineage": ColumnLineageDatasetFacet( + fields={ + "name": Fields( + inputFields=[ + InputField( + "bigquery", + "airflow-openlineage.new_dataset.extract_job_source", + "name", + ) + ], + transformationType="IDENTITY", + transformationDescription="identical", + ), + "total_people": Fields( + inputFields=[ + InputField( + "bigquery", + "airflow-openlineage.new_dataset.extract_job_source", + "total_people", + ) + ], + transformationType="IDENTITY", + transformationDescription="identical", + ), + } + ), + }, + ), + ] + + def test_get_openlineage_facets_on_complete_script_job(self): self.client.get_job.side_effect = [ MagicMock(_properties=self.script_job_details), - MagicMock(_properties=self.job_details), + MagicMock(_properties=self.query_job_details), + ] + self.client.get_table.side_effect = [ + Table.from_api_repr(read_common_json_file("table_details.json")), + Table.from_api_repr(read_common_json_file("out_table_details.json")), ] self.client.list_jobs.return_value = ["child_job_id"] @@ -221,7 +432,10 @@ def test_bq_script_job_information(self): "number", "INTEGER", "Number of occurrences of the name" ), ] - ) + ), + "documentation": DocumentationDatasetFacet( + "The table contains the number of applicants for a Social Security card by year of birth and sex." 
+ ), }, ) ] @@ -234,6 +448,14 @@ def test_bq_script_job_information(self): rowCount=20, size=321, fileCount=None ) }, + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields("name", "STRING"), + SchemaDatasetFacetFields("total_people", "INTEGER"), + ] + ), + }, ), ] @@ -367,11 +589,63 @@ def test_deduplicate_outputs_with_cll(self): } ) + @patch("airflow.providers.google.cloud.openlineage.mixins.get_facets_from_bq_table") + def test_get_table_facets_safely(self, mock_get_facets): + mock_get_facets.return_value = {"some": "facets"} + result = self.operator._get_table_facets_safely("some_table") + assert result == {"some": "facets"} + + @patch("airflow.providers.google.cloud.openlineage.mixins.get_facets_from_bq_table") + def test_get_table_facets_safely_empty_when_error(self, mock_get_facets): + mock_get_facets.side_effect = ValueError("Some error") + result = self.operator._get_table_facets_safely("some_table") + assert result == {} + + @patch("airflow.providers.google.cloud.openlineage.mixins.get_facets_from_bq_table") + def test_get_dataset(self, mock_get_facets): + mock_get_facets.return_value = {"some": "facets"} + table_ref = {"projectId": "p", "datasetId": "d", "tableId": "t"} + input_result = self.operator._get_dataset(table_ref, "input") + assert input_result == InputDataset("bigquery", "p.d.t", facets={"some": "facets"}) + + output_result = self.operator._get_dataset(table_ref, "output") + assert output_result == OutputDataset("bigquery", "p.d.t", facets={"some": "facets"}) + + with pytest.raises(ValueError): + self.operator._get_dataset(table_ref, "wrong") + + @patch("airflow.providers.google.cloud.openlineage.mixins.get_facets_from_bq_table") + def test_get_input_dataset(self, mock_get_facets): + mock_get_facets.return_value = {"some": "facets"} + table_ref = {"projectId": "p", "datasetId": "d", "tableId": "t"} + expected_result = self.operator._get_dataset(table_ref, "input") + result = self.operator._get_input_dataset(table_ref) + assert result == expected_result + + @patch("airflow.providers.google.cloud.openlineage.mixins.get_facets_from_bq_table") + def test_get_output_dataset(self, mock_get_facets): + mock_get_facets.return_value = {"some": "facets"} + table_ref = {"projectId": "p", "datasetId": "d", "tableId": "t"} + expected_result = self.operator._get_dataset(table_ref, "output") + result = self.operator._get_output_dataset(table_ref) + assert result == expected_result + + @pytest.mark.parametrize("job_type", ("LOAD", "COPY", "EXTRACT")) + def test_get_bigquery_job_run_facet_non_query_jobs(self, job_type): + properties = { + "statistics": {"some": "stats"}, + "configuration": {"jobType": job_type}, + } + result = self.operator._get_bigquery_job_run_facet(properties) + assert result.cached is False + assert result.billedBytes is None + assert result.properties == json.dumps(properties) + @pytest.mark.parametrize("cache", (None, "false", False, 0)) - def test_get_job_run_facet_no_cache_and_with_bytes(self, cache): + def test_get_bigquery_job_run_facet_query_no_cache_and_with_bytes(self, cache): properties = { "statistics": {"query": {"cacheHit": cache, "totalBytesBilled": 10}}, - "configuration": {"query": {"query": "SELECT ..."}}, + "configuration": {"query": {"query": "SELECT ..."}, "jobType": "QUERY"}, } result = self.operator._get_bigquery_job_run_facet(properties) assert result.cached is False @@ -380,14 +654,14 @@ def test_get_job_run_facet_no_cache_and_with_bytes(self, cache): assert result.properties == json.dumps(properties) 
@pytest.mark.parametrize("cache", ("true", True)) - def test_get_job_run_facet_with_cache_and_no_bytes(self, cache): + def test_get_bigquery_job_run_facet_query_with_cache_and_no_bytes(self, cache): properties = { "statistics": { "query": { "cacheHit": cache, } }, - "configuration": {"query": {"query": "SELECT ..."}}, + "configuration": {"query": {"query": "SELECT ..."}, "jobType": "QUERY"}, } result = self.operator._get_bigquery_job_run_facet(properties) assert result.cached is True @@ -395,23 +669,23 @@ def test_get_job_run_facet_with_cache_and_no_bytes(self, cache): properties["configuration"]["query"].pop("query") assert result.properties == json.dumps(properties) - def test_get_statistics_dataset_facet_no_query_plan(self): + def test_get_output_statistics_dataset_facet_query_no_query_plan(self): properties = { "statistics": {"query": {"totalBytesBilled": 10}}, - "configuration": {"query": {"query": "SELECT ..."}}, + "configuration": {"query": {"query": "SELECT ..."}, "jobType": "QUERY"}, } - result = self.operator._get_statistics_dataset_facet(properties) + result = self.operator._get_output_statistics_dataset_facet(properties) assert result is None - def test_get_statistics_dataset_facet_no_stats(self): + def test_get_output_statistics_dataset_facet_query_no_stats(self): properties = { "statistics": {"query": {"totalBytesBilled": 10, "queryPlan": [{"test": "test"}]}}, - "configuration": {"query": {"query": "SELECT ..."}}, + "configuration": {"query": {"query": "SELECT ..."}, "jobType": "QUERY"}, } - result = self.operator._get_statistics_dataset_facet(properties) + result = self.operator._get_output_statistics_dataset_facet(properties) assert result is None - def test_get_statistics_dataset_facet_with_stats(self): + def test_get_output_statistics_dataset_facet_query(self): properties = { "statistics": { "query": { @@ -419,14 +693,42 @@ def test_get_statistics_dataset_facet_with_stats(self): "queryPlan": [{"recordsWritten": 123, "shuffleOutputBytes": "321"}], } }, - "configuration": {"query": {"query": "SELECT ..."}}, + "configuration": {"query": {"query": "SELECT ..."}, "jobType": "QUERY"}, } - result = self.operator._get_statistics_dataset_facet(properties) + result = self.operator._get_output_statistics_dataset_facet(properties) + assert result.rowCount == 123 + assert result.size == 321 + + def test_get_output_statistics_dataset_facet_copy(self): + properties = { + "statistics": { + "copy": { + "copiedRows": 123, + "copiedLogicalBytes": 321, + } + }, + "configuration": {"jobType": "COPY"}, + } + result = self.operator._get_output_statistics_dataset_facet(properties) + assert result.rowCount == 123 + assert result.size == 321 + + def test_get_output_statistics_dataset_facet_load(self): + properties = { + "statistics": { + "load": { + "outputRows": 123, + "outputBytes": 321, + } + }, + "configuration": {"jobType": "LOAD"}, + } + result = self.operator._get_output_statistics_dataset_facet(properties) assert result.rowCount == 123 assert result.size == 321 def test_get_column_level_lineage_facet(self): - result = self.operator._get_column_level_lineage_facet( + result = self.operator._get_column_level_lineage_facet_for_query_job( QUERY_JOB_PROPERTIES, OUTPUT_DATASET, INPUT_DATASETS ) assert result == ColumnLineageDatasetFacet( @@ -446,15 +748,23 @@ def test_get_column_level_lineage_facet(self): def test_get_column_level_lineage_facet_early_exit_empty_cll_from_parser(self): properties = {"configuration": {"query": {"query": "SELECT 1"}}} assert ( - 
self.operator._get_column_level_lineage_facet(properties, OUTPUT_DATASET, INPUT_DATASETS) is None + self.operator._get_column_level_lineage_facet_for_query_job( + properties, OUTPUT_DATASET, INPUT_DATASETS + ) + is None + ) + assert ( + self.operator._get_column_level_lineage_facet_for_query_job({}, OUTPUT_DATASET, INPUT_DATASETS) + is None ) - assert self.operator._get_column_level_lineage_facet({}, OUTPUT_DATASET, INPUT_DATASETS) is None def test_get_column_level_lineage_facet_early_exit_output_table_id_mismatch(self): output = copy.deepcopy(OUTPUT_DATASET) output.name = "different.name.table" assert ( - self.operator._get_column_level_lineage_facet(QUERY_JOB_PROPERTIES, output, INPUT_DATASETS) + self.operator._get_column_level_lineage_facet_for_query_job( + QUERY_JOB_PROPERTIES, output, INPUT_DATASETS + ) is None ) @@ -464,7 +774,9 @@ def test_get_column_level_lineage_facet_early_exit_output_columns_mismatch(self) SchemaDatasetFacetFields("different_col", "STRING"), ] assert ( - self.operator._get_column_level_lineage_facet(QUERY_JOB_PROPERTIES, output, INPUT_DATASETS) + self.operator._get_column_level_lineage_facet_for_query_job( + QUERY_JOB_PROPERTIES, output, INPUT_DATASETS + ) is None ) @@ -480,7 +792,10 @@ def test_get_column_level_lineage_facet_early_exit_wrong_parsed_input_tables(sel } } assert ( - self.operator._get_column_level_lineage_facet(properties, OUTPUT_DATASET, INPUT_DATASETS) is None + self.operator._get_column_level_lineage_facet_for_query_job( + properties, OUTPUT_DATASET, INPUT_DATASETS + ) + is None ) def test_get_column_level_lineage_facet_early_exit_wrong_parsed_input_columns(self): @@ -495,7 +810,10 @@ def test_get_column_level_lineage_facet_early_exit_wrong_parsed_input_columns(se } } assert ( - self.operator._get_column_level_lineage_facet(properties, OUTPUT_DATASET, INPUT_DATASETS) is None + self.operator._get_column_level_lineage_facet_for_query_job( + properties, OUTPUT_DATASET, INPUT_DATASETS + ) + is None ) def test_get_qualified_name_from_parse_result(self): diff --git a/providers/tests/google/cloud/openlineage/test_utils.py b/providers/tests/google/cloud/openlineage/test_utils.py index b5e451debe5fa..8fa9c90e0e71f 100644 --- a/providers/tests/google/cloud/openlineage/test_utils.py +++ b/providers/tests/google/cloud/openlineage/test_utils.py @@ -46,6 +46,7 @@ extract_ds_name_from_gcs_path, get_facets_from_bq_table, get_identity_column_lineage_facet, + get_namespace_name_from_source_uris, inject_openlineage_properties_into_dataproc_batch, inject_openlineage_properties_into_dataproc_job, inject_openlineage_properties_into_dataproc_workflow_template, @@ -832,6 +833,34 @@ def test_inject_openlineage_properties_into_dataproc_batch(mock_is_ol_accessible assert result == expected_batch +@pytest.mark.parametrize( + "input_uris, expected_output", + [ + (["gs://bucket/blob"], {("gs://bucket", "/")}), + (["gs://bucket/blob/*"], {("gs://bucket", "blob")}), + ( + [ + "https://googleapis.com/bigtable/projects/project/instances/instance/appProfiles/profile/tables/table", + "https://googleapis.com/bigtable/projects/project/instances/instance/tables/table", + ], + {("bigtable://project/instance", "table"), ("bigtable://project/instance", "table")}, + ), + ( + [ + "gs://bucket/blob", + "https://googleapis.com/bigtable/projects/project/instances/instance/tables/table", + "invalid_uri", + ], + {("gs://bucket", "/"), ("bigtable://project/instance", "table")}, + ), + ([], set()), + (["invalid_uri"], set()), + ], +) +def test_get_namespace_name_from_source_uris(input_uris, 
expected_output): + assert get_namespace_name_from_source_uris(input_uris) == expected_output + + @patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible") def test_inject_openlineage_properties_into_dataproc_workflow_template_provider_not_accessible( mock_is_accessible, diff --git a/providers/tests/google/cloud/operators/test_bigquery.py b/providers/tests/google/cloud/operators/test_bigquery.py index ab89443a69e75..2f7d5ef57df05 100644 --- a/providers/tests/google/cloud/operators/test_bigquery.py +++ b/providers/tests/google/cloud/operators/test_bigquery.py @@ -1603,7 +1603,7 @@ def test_execute_openlineage_events(self, mock_hook): assert result == real_job_id - with open(os.path.dirname(__file__) + "/../utils/job_details.json") as f: + with open(os.path.dirname(__file__) + "/../utils/query_job_details.json") as f: job_details = json.loads(f.read()) mock_hook.return_value.get_client.return_value.get_job.return_value._properties = job_details mock_hook.return_value.get_client.return_value.get_table.side_effect = Exception() diff --git a/providers/tests/google/cloud/utils/copy_job_details.json b/providers/tests/google/cloud/utils/copy_job_details.json new file mode 100644 index 0000000000000..5ae953957ac38 --- /dev/null +++ b/providers/tests/google/cloud/utils/copy_job_details.json @@ -0,0 +1,52 @@ +{ + "kind": "bigquery#job", + "etag": "AbKXChCUwE1fgklx3toVRA==", + "id": "job_5f02bb13d96f64be13effec4ad23ee66", + "selfLink": "https://bigquery.googleapis.com/bigquery/v2/projects/airflow-openlineage/jobs/job_5f02bb13d96f64be13effec4ad23ee66?location=EU", + "user_email": "svc-account@airflow-openlineage.iam.gserviceaccount.com", + "configuration": { + "copy": { + "sourceTables": [ + { + "projectId": "airflow-openlineage", + "datasetId": "new_dataset", + "tableId": "copy_job_source" + }, + { + "projectId": "airflow-openlineage", + "datasetId": "new_dataset", + "tableId": "copy_job_source2" + } + ], + "destinationTable": { + "projectId": "airflow-openlineage", + "datasetId": "new_dataset", + "tableId": "copy_job_result" + }, + "writeDisposition": "WRITE_TRUNCATE", + "operationType": "COPY" + }, + "jobType": "COPY" + }, + "jobReference": { + "projectId": "airflow-openlineage", + "jobId": "job_5f02bb13d96f64be13effec4ad23ee66", + "location": "EU" + }, + "statistics": { + "creationTime": 1734541406502.0, + "startTime": 1734541406646.0, + "endTime": 1734541408159.0, + "copy": { + "copiedRows": "20", + "copiedLogicalBytes": "3800" + } + }, + "status": { + "state": "DONE" + }, + "principal_subject": "serviceAccount:svc-account@airflow-openlineage.iam.gserviceaccount.com", + "jobCreationReason": { + "code": "REQUESTED" + } +} diff --git a/providers/tests/google/cloud/utils/extract_job_details.json b/providers/tests/google/cloud/utils/extract_job_details.json new file mode 100644 index 0000000000000..bcb8c918dc116 --- /dev/null +++ b/providers/tests/google/cloud/utils/extract_job_details.json @@ -0,0 +1,63 @@ +{ + "kind": "bigquery#job", + "etag": "SCVmMFN/kToRoL/iZXznhg==", + "id": "job_5f3c370056c5ae9414ef45e7c9219f68", + "selfLink": "https://bigquery.googleapis.com/bigquery/v2/projects/airflow-openlineage/jobs/job_5f3c370056c5ae9414ef45e7c9219f68?location=EU", + "user_email": "svc-account@airflow-openlineage.iam.gserviceaccount.com", + "configuration": { + "extract": { + "sourceTable": { + "projectId": "airflow-openlineage", + "datasetId": "new_dataset", + "tableId": "extract_job_source" + }, + "destinationUri": 
"gs://airflow-openlineage/extract_job_source/pre*.csv", + "destinationUris": [ + "gs://airflow-openlineage/extract_job_source/pre*.csv" + ], + "destinationFormat": "CSV" + }, + "jobType": "EXTRACT" + }, + "jobReference": { + "projectId": "airflow-openlineage", + "jobId": "job_5f3c370056c5ae9414ef45e7c9219f68", + "location": "EU" + }, + "statistics": { + "creationTime": 1734541420811.0, + "startTime": 1734541420897.0, + "endTime": 1734541421686.0, + "query": { + "totalSlotMs": "958", + "performanceInsights": { + "avgPreviousExecutionMs": "338" + } + }, + "extract": { + "destinationUriFileCounts": [ + "2" + ], + "inputBytes": "1092", + "timeline": [ + { + "elapsedMs": "494", + "totalSlotMs": "958", + "pendingUnits": "0", + "completedUnits": "2", + "estimatedRunnableUnits": "0" + } + ] + }, + "totalSlotMs": "958", + "reservation_id": "default-pipeline", + "finalExecutionDurationMs": "388" + }, + "status": { + "state": "DONE" + }, + "principal_subject": "serviceAccount:svc-account@airflow-openlineage.iam.gserviceaccount.com", + "jobCreationReason": { + "code": "REQUESTED" + } +} diff --git a/providers/tests/google/cloud/utils/load_job_details.json b/providers/tests/google/cloud/utils/load_job_details.json new file mode 100644 index 0000000000000..6fff5120da845 --- /dev/null +++ b/providers/tests/google/cloud/utils/load_job_details.json @@ -0,0 +1,87 @@ +{ + "kind": "bigquery#job", + "etag": "PJw7+a1NbcFIk85B7OCePA==", + "id": "job_f94357cf9ddbab1dd8d084a74cf525b3", + "selfLink": "https://bigquery.googleapis.com/bigquery/v2/projects/airflow-openlineage/jobs/job_f94357cf9ddbab1dd8d084a74cf525b3?location=EU", + "user_email": "svc-account@airflow-openlineage.iam.gserviceaccount.com", + "configuration": { + "load": { + "sourceUris": [ + "gs://airflow-openlineage/*.csv" + ], + "schema": { + "fields": [ + { + "name": "name", + "type": "STRING" + }, + { + "name": "age", + "type": "INTEGER" + }, + { + "name": "email", + "type": "STRING" + }, + { + "name": "country", + "type": "STRING" + } + ] + }, + "destinationTable": { + "projectId": "airflow-openlineage", + "datasetId": "new_dataset", + "tableId": "job_load" + }, + "writeDisposition": "WRITE_TRUNCATE", + "skipLeadingRows": 1, + "sourceFormat": "CSV" + }, + "jobType": "LOAD" + }, + "jobReference": { + "projectId": "airflow-openlineage", + "jobId": "job_f94357cf9ddbab1dd8d084a74cf525b3", + "location": "EU" + }, + "statistics": { + "creationTime": 1734541391943.0, + "startTime": 1734541392059.0, + "endTime": 1734541393953.0, + "completionRatio": 1, + "load": { + "inputFiles": "2", + "inputFileBytes": "512", + "outputRows": "10", + "outputBytes": "546", + "badRecords": "0", + "timeline": [ + { + "elapsedMs": "1044", + "totalSlotMs": "196", + "pendingUnits": "0", + "completedUnits": "1", + "activeUnits": "0" + }, + { + "elapsedMs": "853", + "totalSlotMs": "196", + "pendingUnits": "0", + "completedUnits": "1", + "estimatedRunnableUnits": "0" + } + ] + }, + "totalSlotMs": "196", + "reservation_id": "default-pipeline", + "finalExecutionDurationMs": "843" + }, + "status": { + "state": "DONE" + }, + "principal_subject": "serviceAccount:svc-account@airflow-openlineage.iam.gserviceaccount.com", + "jobCreationReason": { + "code": "REQUESTED" + } +} diff --git a/providers/tests/google/cloud/utils/job_details.json b/providers/tests/google/cloud/utils/query_job_details.json similarity index 100% rename from providers/tests/google/cloud/utils/job_details.json rename to providers/tests/google/cloud/utils/query_job_details.json diff --git 
a/providers/tests/google/cloud/utils/table_details.json b/providers/tests/google/cloud/utils/table_details.json index 7904c913b48b4..696ed98765784 100644 --- a/providers/tests/google/cloud/utils/table_details.json +++ b/providers/tests/google/cloud/utils/table_details.json @@ -8,7 +8,7 @@ "datasetId": "usa_names", "tableId": "usa_1910_2013" }, - "description": "The table contains the number of applicants for a Social Security card by year of birth and sex. The number of such applicants is restricted to U.S. births where the year of birth, sex, State of birth (50 States and District of Columbia) are known, and where the given name is at least 2 characters long.\n\nsource: http://www.ssa.gov/OACT/babynames/limits.html", + "description": "The table contains the number of applicants for a Social Security card by year of birth and sex.", "schema": { "fields": [ {
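One pattern worth calling out across the new LOAD/COPY/EXTRACT paths: when BigQuery provides no column mapping, the mixin assumes identity lineage, wiring every destination column to the same-named column of each input. A condensed sketch of the facet shape `get_identity_column_lineage_facet` returns — with local dataclasses standing in for the `airflow.providers.common.compat.openlineage.facet` classes and plain `(namespace, name)` tuples standing in for `Dataset` objects:

```python
from dataclasses import dataclass


# Local stand-ins for the OpenLineage facet classes asserted in the tests above.
@dataclass
class InputField:
    namespace: str
    name: str
    field: str


@dataclass
class Fields:
    inputFields: list
    transformationType: str = "IDENTITY"
    transformationDescription: str = "identical"


@dataclass
class ColumnLineageDatasetFacet:
    fields: dict


def identity_column_lineage_facet(dest_field_names, input_datasets):
    # No destination columns or no inputs -> no facet; the mixin then leaves
    # output.facets untouched instead of attaching an empty columnLineage.
    if not dest_field_names or not input_datasets:
        return {}
    return {
        "columnLineage": ColumnLineageDatasetFacet(
            fields={
                column: Fields(
                    inputFields=[
                        InputField(namespace, name, column) for namespace, name in input_datasets
                    ]
                )
                for column in dest_field_names
            }
        )
    }


# Mirrors test_get_openlineage_facets_on_complete_load_job: a single GCS input
# and the destination columns "name" and "total_people".
facet = identity_column_lineage_facet(
    ["name", "total_people"], [("gs://airflow-openlineage", "/")]
)
assert facet["columnLineage"].fields["name"] == Fields(
    inputFields=[InputField("gs://airflow-openlineage", "/", "name")]
)
```

Returning an empty dict when either side is missing is what lets the mixin use the `if cll_facet := get_identity_column_lineage_facet(...)` guard and skip attaching an empty `columnLineage` facet to the output dataset.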