Subsídio - Capture of manually extracted vehicle files (#670)
* add manual files

* adapt vehicle data capture flows

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
pixuimpou and mergify[bot] authored Apr 19, 2024
1 parent 8387292 commit 65f41c3
Showing 2 changed files with 84 additions and 8 deletions.
36 changes: 32 additions & 4 deletions pipelines/rj_smtr/veiculo/flows.py
@@ -6,6 +6,7 @@

from copy import deepcopy
from prefect import Parameter
from prefect.tasks.control_flow import ifelse, merge
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.utilities.edges import unmapped
@@ -49,6 +50,7 @@
from pipelines.rj_smtr.veiculo.tasks import (
pre_treatment_sppo_licenciamento,
pre_treatment_sppo_infracao,
get_veiculo_raw_storage,
)

from pipelines.utils.execute_dbt_model.tasks import run_dbt_model
@@ -62,7 +64,10 @@
f"SMTR: {constants.VEICULO_DATASET_ID.value} {constants.SPPO_LICENCIAMENTO_TABLE_ID.value} - Captura",
code_owners=["caio", "fernanda", "boris", "rodrigo"],
) as sppo_licenciamento_captura:
timestamp = get_current_timestamp()
timestamp = Parameter("timestamp", default=None)
get_from_storage = Parameter("get_from_storage", default=False)

timestamp = get_current_timestamp(timestamp=timestamp)

LABELS = get_current_flow_labels()
MODE = get_current_flow_mode(LABELS)
@@ -85,12 +90,23 @@
)

# EXTRACT
raw_status = get_raw(
raw_status_gcs = get_veiculo_raw_storage(
dataset_id=constants.VEICULO_DATASET_ID.value,
table_id=constants.SPPO_LICENCIAMENTO_TABLE_ID.value,
timestamp=timestamp,
csv_args=constants.SPPO_LICENCIAMENTO_CSV_ARGS.value,
)

raw_status_url = get_raw(
url=constants.SPPO_LICENCIAMENTO_URL.value,
filetype="txt",
csv_args=constants.SPPO_LICENCIAMENTO_CSV_ARGS.value,
)

ifelse(get_from_storage.is_equal(True), raw_status_gcs, raw_status_url)

raw_status = merge(raw_status_gcs, raw_status_url)

raw_filepath = save_raw_local(status=raw_status, file_path=filepath)

# TREAT
@@ -130,7 +146,10 @@
f"SMTR: {constants.VEICULO_DATASET_ID.value} {constants.SPPO_INFRACAO_TABLE_ID.value} - Captura",
code_owners=["caio", "fernanda", "boris", "rodrigo"],
) as sppo_infracao_captura:
timestamp = get_current_timestamp()
timestamp = Parameter("timestamp", default=None)
get_from_storage = Parameter("get_from_storage", default=False)

timestamp = get_current_timestamp(timestamp=timestamp)

LABELS = get_current_flow_labels()
MODE = get_current_flow_mode(LABELS)
@@ -153,11 +172,20 @@
)

# EXTRACT
raw_status = get_raw(
raw_status_gcs = get_veiculo_raw_storage(
dataset_id=constants.VEICULO_DATASET_ID.value,
table_id=constants.SPPO_INFRACAO_TABLE_ID.value,
timestamp=timestamp,
csv_args=constants.SPPO_INFRACAO_CSV_ARGS.value,
)
raw_status_url = get_raw(
url=constants.SPPO_INFRACAO_URL.value,
filetype="txt",
csv_args=constants.SPPO_INFRACAO_CSV_ARGS.value,
)
ifelse(get_from_storage.is_equal(True), raw_status_gcs, raw_status_url)

raw_status = merge(raw_status_gcs, raw_status_url)

raw_filepath = save_raw_local(status=raw_status, file_path=filepath)

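Both capture flows now pick their extraction branch with Prefect 1.x's ifelse/merge control-flow tasks: ifelse skips whichever branch does not match get_from_storage, and merge forwards the result of the branch that actually ran. Below is a minimal, self-contained sketch of that pattern; the task names and flow name are placeholders, not the repo's own tasks.

from prefect import Flow, Parameter, task
from prefect.tasks.control_flow import ifelse, merge


@task
def extract_from_storage() -> dict:
    # Stand-in for get_veiculo_raw_storage: read the manually uploaded file.
    return {"data": "records from GCS", "error": None}


@task
def extract_from_url() -> dict:
    # Stand-in for get_raw: download from the source URL.
    return {"data": "records from URL", "error": None}


@task
def save_raw(status: dict) -> None:
    print(status)


with Flow("conditional-extraction-sketch") as flow:
    get_from_storage = Parameter("get_from_storage", default=False)

    raw_gcs = extract_from_storage()
    raw_url = extract_from_url()

    # ifelse skips the branch whose condition is not met...
    ifelse(get_from_storage.is_equal(True), raw_gcs, raw_url)

    # ...and merge forwards the result of whichever branch ran.
    raw_status = merge(raw_gcs, raw_url)
    save_raw(raw_status)


if __name__ == "__main__":
    # Runs only the storage branch; the URL branch is skipped.
    flow.run(parameters={"get_from_storage": True})

In the flows above the same wiring is applied to the licenciamento and infracao captures, with get_from_storage defaulting to False so the URL download remains the normal path.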
56 changes: 52 additions & 4 deletions pipelines/rj_smtr/veiculo/tasks.py
@@ -2,11 +2,14 @@
"""
Tasks for veiculos
"""

import traceback
import zipfile
import io
from datetime import datetime
import pandas as pd
import numpy as np
from prefect import task
import basedosdados as bd

# EMD Imports #

@@ -23,6 +26,51 @@
# Tasks #


@task
def get_veiculo_raw_storage(
dataset_id: str,
table_id: str,
timestamp: datetime,
csv_args: dict,
) -> dict:
"""Get data from daily manually extracted files received by email
Args:
dataset_id (str): dataset_id on BigQuery
table_id (str): table_id on BigQuery
timestamp (datetime): file extraction date
csv_args (dict): Arguments for read_csv
"""
data = None
error = None
filename_map = {
constants.SPPO_LICENCIAMENTO_TABLE_ID.value: "Cadastro de Veiculos",
constants.SPPO_INFRACAO_TABLE_ID.value: "MULTAS",
}

filename = f"{filename_map[table_id]}_{timestamp.date().strftime('%Y%m%d')}"

try:
bucket = bd.Storage(dataset_id=dataset_id, table_id=table_id)
blob = (
bucket.client["storage_staging"]
.bucket(bucket.bucket_name)
.get_blob(f"upload/{dataset_id}/{table_id}/{filename}.zip")
)
data = blob.download_as_bytes()
with zipfile.ZipFile(io.BytesIO(data), "r") as zipped_file:
data = zipped_file.read(f"{filename}.txt")

data = data.decode(encoding="utf-8")

data = pd.read_csv(io.StringIO(data), **csv_args).to_dict(orient="records")
except Exception:
error = traceback.format_exc()
log(f"[CATCHED] Task failed with error: \n{error}", level="error")

return {"data": data, "error": error}


@task
def pre_treatment_sppo_licenciamento(status: dict, timestamp: datetime):
"""Basic data treatment for vehicle data. Apply filtering to raw data.
@@ -78,9 +126,9 @@ def pre_treatment_sppo_licenciamento(status: dict, timestamp: datetime):

log("Update indicador_ar_condicionado based on tipo_veiculo...", level="info")
data["indicador_ar_condicionado"] = data["tipo_veiculo"].map(
lambda x: None
if not isinstance(x, str)
else bool("C/AR" in x.replace(" ", ""))
lambda x: (
None if not isinstance(x, str) else bool("C/AR" in x.replace(" ", ""))
)
)

log("Update status...", level="info")
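The new get_veiculo_raw_storage task works entirely in memory: it downloads the zipped upload as bytes, opens the archive with zipfile and io, and parses the inner .txt with pandas before returning the records. A self-contained sketch of that unzip-and-parse pattern follows; the file name, separator, and sample rows are made up for illustration only.

import io
import zipfile

import pandas as pd

# Build fake zipped bytes, standing in for blob.download_as_bytes().
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as zf:
    zf.writestr("MULTAS_20240419.txt", "placa;infracao\nABC1234;velocidade\n")
zipped_bytes = buffer.getvalue()

# The same unzip-decode-parse steps the task applies to the GCS blob.
with zipfile.ZipFile(io.BytesIO(zipped_bytes), "r") as zipped_file:
    text = zipped_file.read("MULTAS_20240419.txt").decode("utf-8")

records = pd.read_csv(io.StringIO(text), sep=";").to_dict(orient="records")
print(records)  # [{'placa': 'ABC1234', 'infracao': 'velocidade'}]

Keeping everything in BytesIO/StringIO avoids writing the intermediate zip or text file to disk; only the downstream save_raw_local step in the flow persists the raw status to a file.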
