From 3562abbbe6415b46e4ad46d00853424af26dff61 Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Fri, 20 Oct 2023 10:12:33 -0300 Subject: [PATCH 1/4] feat: add table description and automatically list projects --- pipelines/rj_escritorio/data_catalog/flows.py | 7 ++-- pipelines/rj_escritorio/data_catalog/tasks.py | 40 +++++++++++++++++++ 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/pipelines/rj_escritorio/data_catalog/flows.py b/pipelines/rj_escritorio/data_catalog/flows.py index 07fad2d91..fd5f13654 100644 --- a/pipelines/rj_escritorio/data_catalog/flows.py +++ b/pipelines/rj_escritorio/data_catalog/flows.py @@ -11,6 +11,7 @@ from pipelines.rj_escritorio.data_catalog.schedules import update_data_catalog_schedule from pipelines.rj_escritorio.data_catalog.tasks import ( generate_dataframe_from_list_of_tables, + list_projects, list_tables, merge_list_of_list_of_tables, update_gsheets_data_catalog, @@ -28,18 +29,16 @@ ], ) as rj_escritorio_data_catalog_flow: # Parameters - project_ids = Parameter("project_ids") spreadsheet_url = Parameter("spreadsheet_url") sheet_name = Parameter("sheet_name") bq_client_mode = Parameter("bq_client_mode", default="prod") + exclude_dev_projects = Parameter("exclude_dev_projects", default=True) exclude_staging = Parameter("exclude_staging", default=True) exclude_test = Parameter("exclude_test", default=True) exclude_logs = Parameter("exclude_logs", default=True) # Flow - project_ids = parse_comma_separated_string_to_list( - input_text=project_ids, output_type=str - ) + project_ids = list_projects(mode=bq_client_mode, exclude_dev=exclude_dev_projects) list_of_list_of_tables = list_tables.map( project_id=project_ids, mode=unmapped(bq_client_mode), diff --git a/pipelines/rj_escritorio/data_catalog/tasks.py b/pipelines/rj_escritorio/data_catalog/tasks.py index b124748ba..d1ce8a991 100644 --- a/pipelines/rj_escritorio/data_catalog/tasks.py +++ b/pipelines/rj_escritorio/data_catalog/tasks.py @@ -3,7 +3,10 @@ """ Tasks for generating a data catalog from BigQuery. """ +from typing import List + from google.cloud import bigquery +from googleapiclient import discovery import gspread import pandas as pd from prefect import task @@ -15,6 +18,41 @@ from pipelines.utils.utils import get_credentials_from_env, log +@task +def list_projects( + mode: str = "prod", + exclude_dev: bool = True, +) -> List[str]: + """ + Lists all GCP projects that we have access to. + + Args: + mode: Credentials mode. + exclude_dev: Exclude projects that ends with "-dev". + + Returns: + List of project IDs. + """ + credentials = get_credentials_from_env(mode=mode) + service = discovery.build("cloudresourcemanager", "v1", credentials=credentials) + request = service.projects().list() + projects = [] + while request is not None: + response = request.execute() + for project in response.get("projects", []): + project_id = project["projectId"] + if exclude_dev and project_id.endswith("-dev"): + log(f"Excluding dev project {project_id}.") + continue + log(f"Found project {project_id}.") + projects.append(project_id) + request = service.projects().list_next( + previous_request=request, previous_response=response + ) + log(f"Found {len(projects)} projects.") + return projects + + @task def list_tables( # pylint: disable=too-many-arguments project_id: str, @@ -68,10 +106,12 @@ def list_tables( # pylint: disable=too-many-arguments if exclude_test and "test" in table_id: log(f"Excluding test table {table_id}.") continue + table_description = table.description table_info = { "project_id": project_id, "dataset_id": dataset_id, "table_id": table_id, + "description": table_description, "url": f"https://console.cloud.google.com/bigquery?p={project_id}&d={dataset_id}&t={table_id}&page=table", "private": not project_id == "datario", } From b95b3566563f2089a8c1163addfa3f3e1aa8a6ee Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Fri, 20 Oct 2023 10:41:12 -0300 Subject: [PATCH 2/4] fix: get table object for querying description --- pipelines/rj_escritorio/data_catalog/tasks.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pipelines/rj_escritorio/data_catalog/tasks.py b/pipelines/rj_escritorio/data_catalog/tasks.py index d1ce8a991..415158399 100644 --- a/pipelines/rj_escritorio/data_catalog/tasks.py +++ b/pipelines/rj_escritorio/data_catalog/tasks.py @@ -103,10 +103,11 @@ def list_tables( # pylint: disable=too-many-arguments continue for table in client.list_tables(dataset): table_id = table.table_id + table_object = client.get_table(table.reference) if exclude_test and "test" in table_id: log(f"Excluding test table {table_id}.") continue - table_description = table.description + table_description = table_object.description table_info = { "project_id": project_id, "dataset_id": dataset_id, From f26ce3427e96cfb84384d77a4984cef7c6e08045 Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Fri, 20 Oct 2023 11:34:02 -0300 Subject: [PATCH 3/4] fix: ignore projects when bigquery api is not enabled --- pipelines/rj_escritorio/data_catalog/tasks.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pipelines/rj_escritorio/data_catalog/tasks.py b/pipelines/rj_escritorio/data_catalog/tasks.py index 415158399..ba71132de 100644 --- a/pipelines/rj_escritorio/data_catalog/tasks.py +++ b/pipelines/rj_escritorio/data_catalog/tasks.py @@ -5,6 +5,7 @@ """ from typing import List +from google.api_core.exceptions import NotFound from google.cloud import bigquery from googleapiclient import discovery import gspread @@ -88,7 +89,13 @@ def list_tables( # pylint: disable=too-many-arguments client = get_bigquery_client(mode=mode) log(f"Listing tables in project {project_id}.") tables = [] - for dataset in client.list_datasets(project=project_id): + try: + datasets = client.list_datasets(project=project_id) + except NotFound: + # This will happen if BigQuery API is not enabled for this project. Just return an empty + # list + return tables + for dataset in datasets: dataset_id: str = dataset.dataset_id if exclude_staging and dataset_id.endswith("_staging"): log(f"Excluding staging dataset {dataset_id}.") From 511aab7aaf1f3019b9d1434c53f5b87758630fd5 Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Fri, 20 Oct 2023 11:53:44 -0300 Subject: [PATCH 4/4] fix: ignore projects when bigquery api is not enabled --- pipelines/rj_escritorio/data_catalog/tasks.py | 58 +++++++++---------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/pipelines/rj_escritorio/data_catalog/tasks.py b/pipelines/rj_escritorio/data_catalog/tasks.py index ba71132de..6b7951fae 100644 --- a/pipelines/rj_escritorio/data_catalog/tasks.py +++ b/pipelines/rj_escritorio/data_catalog/tasks.py @@ -91,39 +91,39 @@ def list_tables( # pylint: disable=too-many-arguments tables = [] try: datasets = client.list_datasets(project=project_id) + for dataset in datasets: + dataset_id: str = dataset.dataset_id + if exclude_staging and dataset_id.endswith("_staging"): + log(f"Excluding staging dataset {dataset_id}.") + continue + if exclude_test and "test" in dataset_id: + log(f"Excluding test dataset {dataset_id}.") + continue + if exclude_logs and ( + dataset_id.startswith("logs_") or dataset_id.endswith("_logs") + ): + log(f"Excluding logs dataset {dataset_id}.") + continue + for table in client.list_tables(dataset): + table_id = table.table_id + table_object = client.get_table(table.reference) + if exclude_test and "test" in table_id: + log(f"Excluding test table {table_id}.") + continue + table_description = table_object.description + table_info = { + "project_id": project_id, + "dataset_id": dataset_id, + "table_id": table_id, + "description": table_description, + "url": f"https://console.cloud.google.com/bigquery?p={project_id}&d={dataset_id}&t={table_id}&page=table", + "private": not project_id == "datario", + } + tables.append(table_info) except NotFound: # This will happen if BigQuery API is not enabled for this project. Just return an empty # list return tables - for dataset in datasets: - dataset_id: str = dataset.dataset_id - if exclude_staging and dataset_id.endswith("_staging"): - log(f"Excluding staging dataset {dataset_id}.") - continue - if exclude_test and "test" in dataset_id: - log(f"Excluding test dataset {dataset_id}.") - continue - if exclude_logs and ( - dataset_id.startswith("logs_") or dataset_id.endswith("_logs") - ): - log(f"Excluding logs dataset {dataset_id}.") - continue - for table in client.list_tables(dataset): - table_id = table.table_id - table_object = client.get_table(table.reference) - if exclude_test and "test" in table_id: - log(f"Excluding test table {table_id}.") - continue - table_description = table_object.description - table_info = { - "project_id": project_id, - "dataset_id": dataset_id, - "table_id": table_id, - "description": table_description, - "url": f"https://console.cloud.google.com/bigquery?p={project_id}&d={dataset_id}&t={table_id}&page=table", - "private": not project_id == "datario", - } - tables.append(table_info) log(f"Found {len(tables)} tables in project {project_id}.") return tables