Skip to content

Commit

Permalink
Merge branch 'master' into staging/cor-richard
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Oct 31, 2023
2 parents 4679df3 + 0fe7d64 commit 4ec65e3
Show file tree
Hide file tree
Showing 25 changed files with 1,206 additions and 410 deletions.
3 changes: 2 additions & 1 deletion pipelines/rj_cor/meteorologia/meteorologia_redemet/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ def tratar_dados(dados: pd.DataFrame, backfill: bool = 0) -> pd.DataFrame:

# Remover fuso horário
dados["data"] = dados["data"].dt.strftime("%Y-%m-%d %H:%M:%S")
dados.rename(columns={"data": "data_medicao"}, inplace=True)

# Capitalizar os dados da coluna céu
dados["ceu"] = dados["ceu"].str.capitalize()
Expand All @@ -186,7 +187,7 @@ def salvar_dados(dados: pd.DataFrame) -> Union[str, Path]:
prepath = Path("/tmp/meteorologia_redemet/")
prepath.mkdir(parents=True, exist_ok=True)

partition_column = "data"
partition_column = "data_medicao"
dataframe, partitions = parse_date_columns(dados, partition_column)

# Cria partições a partir da data
Expand Down
2 changes: 0 additions & 2 deletions pipelines/rj_cor/meteorologia/radar/precipitacao/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
name="COR: Meteorologia - Precipitacao RADAR",
code_owners=[
"paty",
"joao",
"gabriel",
],
# skip_if_running=True,
) as cor_meteorologia_precipitacao_radar_flow:
Expand Down
7 changes: 3 additions & 4 deletions pipelines/rj_escritorio/data_catalog/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from pipelines.rj_escritorio.data_catalog.schedules import update_data_catalog_schedule
from pipelines.rj_escritorio.data_catalog.tasks import (
generate_dataframe_from_list_of_tables,
list_projects,
list_tables,
merge_list_of_list_of_tables,
update_gsheets_data_catalog,
Expand All @@ -28,18 +29,16 @@
],
) as rj_escritorio_data_catalog_flow:
# Parameters
project_ids = Parameter("project_ids")
spreadsheet_url = Parameter("spreadsheet_url")
sheet_name = Parameter("sheet_name")
bq_client_mode = Parameter("bq_client_mode", default="prod")
exclude_dev_projects = Parameter("exclude_dev_projects", default=True)
exclude_staging = Parameter("exclude_staging", default=True)
exclude_test = Parameter("exclude_test", default=True)
exclude_logs = Parameter("exclude_logs", default=True)

# Flow
project_ids = parse_comma_separated_string_to_list(
input_text=project_ids, output_type=str
)
project_ids = list_projects(mode=bq_client_mode, exclude_dev=exclude_dev_projects)
list_of_list_of_tables = list_tables.map(
project_id=project_ids,
mode=unmapped(bq_client_mode),
Expand Down
98 changes: 73 additions & 25 deletions pipelines/rj_escritorio/data_catalog/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
"""
Tasks for generating a data catalog from BigQuery.
"""
from typing import List

from google.api_core.exceptions import NotFound
from google.cloud import bigquery
from googleapiclient import discovery
import gspread
import pandas as pd
from prefect import task
Expand All @@ -15,6 +19,41 @@
from pipelines.utils.utils import get_credentials_from_env, log


@task
def list_projects(
    mode: str = "prod",
    exclude_dev: bool = True,
) -> List[str]:
    """
    Collect the IDs of the GCP projects returned by the Cloud Resource Manager
    `projects().list()` endpoint for the configured credentials.

    Args:
        mode: Credentials mode, forwarded to `get_credentials_from_env`.
        exclude_dev: When true, project IDs ending in "-dev" are left out.

    Returns:
        The project IDs that were kept, in the order the API returned them.
    """
    creds = get_credentials_from_env(mode=mode)
    resource_manager = discovery.build(
        "cloudresourcemanager", "v1", credentials=creds
    )

    kept_ids = []
    # Walk the result pages; `list_next` yields None once the last page is consumed.
    page_request = resource_manager.projects().list()
    while page_request is not None:
        page = page_request.execute()
        for entry in page.get("projects", []):
            project_id = entry["projectId"]
            is_dev = project_id.endswith("-dev")
            if exclude_dev and is_dev:
                log(f"Excluding dev project {project_id}.")
            else:
                log(f"Found project {project_id}.")
                kept_ids.append(project_id)
        page_request = resource_manager.projects().list_next(
            previous_request=page_request, previous_response=page
        )

    log(f"Found {len(kept_ids)} projects.")
    return kept_ids


@task
def list_tables( # pylint: disable=too-many-arguments
project_id: str,
Expand Down Expand Up @@ -50,32 +89,41 @@ def list_tables( # pylint: disable=too-many-arguments
client = get_bigquery_client(mode=mode)
log(f"Listing tables in project {project_id}.")
tables = []
for dataset in client.list_datasets(project=project_id):
dataset_id: str = dataset.dataset_id
if exclude_staging and dataset_id.endswith("_staging"):
log(f"Excluding staging dataset {dataset_id}.")
continue
if exclude_test and "test" in dataset_id:
log(f"Excluding test dataset {dataset_id}.")
continue
if exclude_logs and (
dataset_id.startswith("logs_") or dataset_id.endswith("_logs")
):
log(f"Excluding logs dataset {dataset_id}.")
continue
for table in client.list_tables(dataset):
table_id = table.table_id
if exclude_test and "test" in table_id:
log(f"Excluding test table {table_id}.")
try:
datasets = client.list_datasets(project=project_id)
for dataset in datasets:
dataset_id: str = dataset.dataset_id
if exclude_staging and dataset_id.endswith("_staging"):
log(f"Excluding staging dataset {dataset_id}.")
continue
if exclude_test and "test" in dataset_id:
log(f"Excluding test dataset {dataset_id}.")
continue
if exclude_logs and (
dataset_id.startswith("logs_") or dataset_id.endswith("_logs")
):
log(f"Excluding logs dataset {dataset_id}.")
continue
table_info = {
"project_id": project_id,
"dataset_id": dataset_id,
"table_id": table_id,
"url": f"https://console.cloud.google.com/bigquery?p={project_id}&d={dataset_id}&t={table_id}&page=table",
"private": not project_id == "datario",
}
tables.append(table_info)
for table in client.list_tables(dataset):
table_id = table.table_id
table_object = client.get_table(table.reference)
if exclude_test and "test" in table_id:
log(f"Excluding test table {table_id}.")
continue
table_description = table_object.description
table_info = {
"project_id": project_id,
"dataset_id": dataset_id,
"table_id": table_id,
"description": table_description,
"url": f"https://console.cloud.google.com/bigquery?p={project_id}&d={dataset_id}&t={table_id}&page=table",
"private": not project_id == "datario",
}
tables.append(table_info)
except NotFound:
# This will happen if BigQuery API is not enabled for this project. Just return an empty
# list
return tables
log(f"Found {len(tables)} tables in project {project_id}.")
return tables

Expand Down
2 changes: 1 addition & 1 deletion pipelines/rj_smfp/dump_db_sigma/flows.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
Database dumping flows for SMFP SIGMA system
Database dumping flows for SMFP SIGMA system.
"""

from copy import deepcopy
Expand Down
2 changes: 1 addition & 1 deletion pipelines/rj_smfp/dump_url_metas/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
"url": "https://docs.google.com/spreadsheets/d/1nqnwgigE0_Ac6-jkWiNwAdB4EMJfz1WV5nn0HaOhfu0\
/edit#gid=1236673479",
"url_type": "google_sheet",
"gsheets_sheet_name": "Lista Órgãos",
"gsheets_sheet_name": "ListaOrgaos",
"materialize_after_dump": True,
"dataset_id": "planejamento_gestao_dashboard_metas",
},
Expand Down
1 change: 1 addition & 0 deletions pipelines/rj_smtr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@
from pipelines.rj_smtr.veiculo.flows import *
from pipelines.rj_smtr.example.flows import *
from pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.flows import *
from pipelines.rj_smtr.br_rj_riodejaneiro_gtfs.flows import *
Loading

0 comments on commit 4ec65e3

Please sign in to comment.