From 3562abbbe6415b46e4ad46d00853424af26dff61 Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Fri, 20 Oct 2023 10:12:33 -0300 Subject: [PATCH] feat: add table description and automatically list projects --- pipelines/rj_escritorio/data_catalog/flows.py | 7 ++-- pipelines/rj_escritorio/data_catalog/tasks.py | 40 +++++++++++++++++++ 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/pipelines/rj_escritorio/data_catalog/flows.py b/pipelines/rj_escritorio/data_catalog/flows.py index 07fad2d91..fd5f13654 100644 --- a/pipelines/rj_escritorio/data_catalog/flows.py +++ b/pipelines/rj_escritorio/data_catalog/flows.py @@ -11,6 +11,7 @@ from pipelines.rj_escritorio.data_catalog.schedules import update_data_catalog_schedule from pipelines.rj_escritorio.data_catalog.tasks import ( generate_dataframe_from_list_of_tables, + list_projects, list_tables, merge_list_of_list_of_tables, update_gsheets_data_catalog, @@ -28,18 +29,16 @@ ], ) as rj_escritorio_data_catalog_flow: # Parameters - project_ids = Parameter("project_ids") spreadsheet_url = Parameter("spreadsheet_url") sheet_name = Parameter("sheet_name") bq_client_mode = Parameter("bq_client_mode", default="prod") + exclude_dev_projects = Parameter("exclude_dev_projects", default=True) exclude_staging = Parameter("exclude_staging", default=True) exclude_test = Parameter("exclude_test", default=True) exclude_logs = Parameter("exclude_logs", default=True) # Flow - project_ids = parse_comma_separated_string_to_list( - input_text=project_ids, output_type=str - ) + project_ids = list_projects(mode=bq_client_mode, exclude_dev=exclude_dev_projects) list_of_list_of_tables = list_tables.map( project_id=project_ids, mode=unmapped(bq_client_mode), diff --git a/pipelines/rj_escritorio/data_catalog/tasks.py b/pipelines/rj_escritorio/data_catalog/tasks.py index b124748ba..d1ce8a991 100644 --- a/pipelines/rj_escritorio/data_catalog/tasks.py +++ b/pipelines/rj_escritorio/data_catalog/tasks.py @@ -3,7 +3,10 @@ """ Tasks for 
generating a data catalog from BigQuery. """ +from typing import List + from google.cloud import bigquery +from googleapiclient import discovery import gspread import pandas as pd from prefect import task @@ -15,6 +18,41 @@ from pipelines.utils.utils import get_credentials_from_env, log +@task +def list_projects( + mode: str = "prod", + exclude_dev: bool = True, +) -> List[str]: + """ + Lists all GCP projects that we have access to. + + Args: + mode: Credentials mode. + exclude_dev: Exclude projects that end with "-dev". + + Returns: + List of project IDs. + """ + credentials = get_credentials_from_env(mode=mode) + service = discovery.build("cloudresourcemanager", "v1", credentials=credentials) + request = service.projects().list() + projects = [] + while request is not None: + response = request.execute() + for project in response.get("projects", []): + project_id = project["projectId"] + if exclude_dev and project_id.endswith("-dev"): + log(f"Excluding dev project {project_id}.") + continue + log(f"Found project {project_id}.") + projects.append(project_id) + request = service.projects().list_next( + previous_request=request, previous_response=response + ) + log(f"Found {len(projects)} projects.") + return projects + + @task def list_tables( # pylint: disable=too-many-arguments project_id: str, @@ -68,10 +106,12 @@ def list_tables( # pylint: disable=too-many-arguments if exclude_test and "test" in table_id: log(f"Excluding test table {table_id}.") continue + table_description = table.description table_info = { "project_id": project_id, "dataset_id": dataset_id, "table_id": table_id, + "description": table_description, "url": f"https://console.cloud.google.com/bigquery?p={project_id}&d={dataset_id}&t={table_id}&page=table", "private": not project_id == "datario", }