Skip to content

Commit

Permalink
Merge branch 'master' into staging/cor-richard
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Oct 20, 2023
2 parents e957730 + c2245d1 commit 928f729
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 29 deletions.
7 changes: 3 additions & 4 deletions pipelines/rj_escritorio/data_catalog/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from pipelines.rj_escritorio.data_catalog.schedules import update_data_catalog_schedule
from pipelines.rj_escritorio.data_catalog.tasks import (
generate_dataframe_from_list_of_tables,
list_projects,
list_tables,
merge_list_of_list_of_tables,
update_gsheets_data_catalog,
Expand All @@ -28,18 +29,16 @@
],
) as rj_escritorio_data_catalog_flow:
# Parameters
project_ids = Parameter("project_ids")
spreadsheet_url = Parameter("spreadsheet_url")
sheet_name = Parameter("sheet_name")
bq_client_mode = Parameter("bq_client_mode", default="prod")
exclude_dev_projects = Parameter("exclude_dev_projects", default=True)
exclude_staging = Parameter("exclude_staging", default=True)
exclude_test = Parameter("exclude_test", default=True)
exclude_logs = Parameter("exclude_logs", default=True)

# Flow
project_ids = parse_comma_separated_string_to_list(
input_text=project_ids, output_type=str
)
project_ids = list_projects(mode=bq_client_mode, exclude_dev=exclude_dev_projects)
list_of_list_of_tables = list_tables.map(
project_id=project_ids,
mode=unmapped(bq_client_mode),
Expand Down
98 changes: 73 additions & 25 deletions pipelines/rj_escritorio/data_catalog/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
"""
Tasks for generating a data catalog from BigQuery.
"""
from typing import List

from google.api_core.exceptions import NotFound
from google.cloud import bigquery
from googleapiclient import discovery
import gspread
import pandas as pd
from prefect import task
Expand All @@ -15,6 +19,41 @@
from pipelines.utils.utils import get_credentials_from_env, log


@task
def list_projects(
mode: str = "prod",
exclude_dev: bool = True,
) -> List[str]:
"""
Lists all GCP projects that we have access to.
Args:
mode: Credentials mode.
exclude_dev: Exclude projects that ends with "-dev".
Returns:
List of project IDs.
"""
credentials = get_credentials_from_env(mode=mode)
service = discovery.build("cloudresourcemanager", "v1", credentials=credentials)
request = service.projects().list()
projects = []
while request is not None:
response = request.execute()
for project in response.get("projects", []):
project_id = project["projectId"]
if exclude_dev and project_id.endswith("-dev"):
log(f"Excluding dev project {project_id}.")
continue
log(f"Found project {project_id}.")
projects.append(project_id)
request = service.projects().list_next(
previous_request=request, previous_response=response
)
log(f"Found {len(projects)} projects.")
return projects


@task
def list_tables( # pylint: disable=too-many-arguments
project_id: str,
Expand Down Expand Up @@ -50,32 +89,41 @@ def list_tables( # pylint: disable=too-many-arguments
client = get_bigquery_client(mode=mode)
log(f"Listing tables in project {project_id}.")
tables = []
for dataset in client.list_datasets(project=project_id):
dataset_id: str = dataset.dataset_id
if exclude_staging and dataset_id.endswith("_staging"):
log(f"Excluding staging dataset {dataset_id}.")
continue
if exclude_test and "test" in dataset_id:
log(f"Excluding test dataset {dataset_id}.")
continue
if exclude_logs and (
dataset_id.startswith("logs_") or dataset_id.endswith("_logs")
):
log(f"Excluding logs dataset {dataset_id}.")
continue
for table in client.list_tables(dataset):
table_id = table.table_id
if exclude_test and "test" in table_id:
log(f"Excluding test table {table_id}.")
try:
datasets = client.list_datasets(project=project_id)
for dataset in datasets:
dataset_id: str = dataset.dataset_id
if exclude_staging and dataset_id.endswith("_staging"):
log(f"Excluding staging dataset {dataset_id}.")
continue
if exclude_test and "test" in dataset_id:
log(f"Excluding test dataset {dataset_id}.")
continue
if exclude_logs and (
dataset_id.startswith("logs_") or dataset_id.endswith("_logs")
):
log(f"Excluding logs dataset {dataset_id}.")
continue
table_info = {
"project_id": project_id,
"dataset_id": dataset_id,
"table_id": table_id,
"url": f"https://console.cloud.google.com/bigquery?p={project_id}&d={dataset_id}&t={table_id}&page=table",
"private": not project_id == "datario",
}
tables.append(table_info)
for table in client.list_tables(dataset):
table_id = table.table_id
table_object = client.get_table(table.reference)
if exclude_test and "test" in table_id:
log(f"Excluding test table {table_id}.")
continue
table_description = table_object.description
table_info = {
"project_id": project_id,
"dataset_id": dataset_id,
"table_id": table_id,
"description": table_description,
"url": f"https://console.cloud.google.com/bigquery?p={project_id}&d={dataset_id}&t={table_id}&page=table",
"private": not project_id == "datario",
}
tables.append(table_info)
except NotFound:
# This will happen if BigQuery API is not enabled for this project. Just return an empty
# list
return tables
log(f"Found {len(tables)} tables in project {project_id}.")
return tables

Expand Down

0 comments on commit 928f729

Please sign in to comment.