Skip to content

Commit

Permalink
refatora sppo_infracao e sppo_licenciamento_stu para buscar do ftp
Browse files Browse the repository at this point in the history
  • Loading branch information
vtr363 committed May 16, 2024
1 parent 675f87a commit 6929600
Show file tree
Hide file tree
Showing 3 changed files with 299 additions and 232 deletions.
15 changes: 15 additions & 0 deletions pipelines/rj_smtr/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -1499,6 +1499,21 @@ class constants(Enum): # pylint: disable=c0103

# INFRAÇÃO
SPPO_INFRACAO_URL = "https://siurblab.rio.rj.gov.br/SMTR/Multas/multas.txt"

SPPO_INFRACAO_COLUMNS = [
"permissao",
"modo",
"placa",
"id_auto_infracao",
"data_infracao",
"valor",
"id_infracao",
"infracao",
"status",
"data_pagamento",
"servico",
]

SPPO_INFRACAO_MAPPING_KEYS = {
"permissao": "permissao",
"modal": "modo",
Expand Down
160 changes: 41 additions & 119 deletions pipelines/rj_smtr/veiculo/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# EMD Imports #

from pipelines.constants import constants as emd_constants

from pipelines.utils.decorators import Flow
from pipelines.utils.execute_dbt_model.tasks import get_k8s_dbt_client
from pipelines.utils.utils import set_default_parameters
Expand All @@ -36,6 +37,7 @@
create_local_partition_path,
get_current_timestamp,
get_raw,
get_rounded_timestamp,
parse_timestamp_to_string,
save_raw_local,
save_treated_local,
Expand All @@ -48,6 +50,8 @@
)

from pipelines.rj_smtr.veiculo.tasks import (
download_and_save_local_from_ftp,
get_ftp_filepaths,
pre_treatment_sppo_licenciamento,
pre_treatment_sppo_infracao,
get_veiculo_raw_storage,
Expand All @@ -62,77 +66,35 @@
# flake8: noqa: E501
with Flow(
f"SMTR: {constants.VEICULO_DATASET_ID.value} {constants.SPPO_LICENCIAMENTO_TABLE_ID.value} - Captura",
code_owners=["caio", "fernanda", "boris", "rodrigo"],
# code_owners=["caio", "fernanda", "boris", "rodrigo"],
) as sppo_licenciamento_captura:
timestamp = Parameter("timestamp", default=None)
get_from_storage = Parameter("get_from_storage", default=False)

timestamp = get_current_timestamp(timestamp=timestamp)

LABELS = get_current_flow_labels()
MODE = get_current_flow_mode(LABELS)

# Rename flow run
rename_flow_run = rename_current_flow_run_now_time(
prefix=f"{sppo_licenciamento_captura.name} - ", now_time=timestamp
)

# SETUP #
partitions = create_date_hour_partition(timestamp, partition_date_only=True)

filename = parse_timestamp_to_string(timestamp)

filepath = create_local_partition_path(
dataset_id=constants.VEICULO_DATASET_ID.value,
table_id=constants.SPPO_LICENCIAMENTO_TABLE_ID.value,
filename=filename,
partitions=partitions,
timestamp = Parameter("timestamp", default=None)
search_dir = Parameter("search_dir", default="licenciamento")
dataset_id = Parameter("dataset_id", default=constants.VEICULO_DATASET_ID.value)
table_id = Parameter(
"table_id", default=constants.SPPO_LICENCIAMENTO_TABLE_ID.value
)

timestamp = get_rounded_timestamp(timestamp)
# EXTRACT
raw_status_gcs = get_veiculo_raw_storage(
dataset_id=constants.VEICULO_DATASET_ID.value,
table_id=constants.SPPO_LICENCIAMENTO_TABLE_ID.value,
timestamp=timestamp,
csv_args=constants.SPPO_LICENCIAMENTO_CSV_ARGS.value,
files = get_ftp_filepaths(search_dir=search_dir, timestamp=timestamp)
updated_files_info = download_and_save_local_from_ftp.map(
file_info=files, dataset_id=dataset_id, table_id=table_id
)

raw_status_url = get_raw(
url=constants.SPPO_LICENCIAMENTO_URL.value,
filetype="txt",
csv_args=constants.SPPO_LICENCIAMENTO_CSV_ARGS.value,
)

ifelse(get_from_storage.is_equal(True), raw_status_gcs, raw_status_url)

raw_status = merge(raw_status_gcs, raw_status_url)

raw_filepath = save_raw_local(status=raw_status, file_path=filepath)

# TREAT
treated_status = pre_treatment_sppo_licenciamento(
status=raw_status, timestamp=timestamp
# TRANSFORM
treated_paths, raw_paths, partitions, status = pre_treatment_sppo_licenciamento(
files=updated_files_info
)

treated_filepath = save_treated_local(status=treated_status, file_path=filepath)

# LOAD
error = bq_upload(
dataset_id=constants.VEICULO_DATASET_ID.value,
table_id=constants.SPPO_LICENCIAMENTO_TABLE_ID.value,
filepath=treated_filepath,
raw_filepath=raw_filepath,
errors = bq_upload.map(
dataset_id=unmapped(dataset_id),
table_id=unmapped(table_id),
filepath=treated_paths,
raw_filepath=raw_paths,
partitions=partitions,
status=treated_status,
)
upload_logs_to_bq(
dataset_id=constants.VEICULO_DATASET_ID.value,
parent_table_id=constants.SPPO_LICENCIAMENTO_TABLE_ID.value,
timestamp=timestamp,
error=error,
)
sppo_licenciamento_captura.set_dependencies(
task=partitions, upstream_tasks=[rename_flow_run]
status=status,
)

sppo_licenciamento_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
Expand All @@ -144,73 +106,33 @@

with Flow(
f"SMTR: {constants.VEICULO_DATASET_ID.value} {constants.SPPO_INFRACAO_TABLE_ID.value} - Captura",
code_owners=["caio", "fernanda", "boris", "rodrigo"],
# code_owners=["caio", "fernanda", "boris", "rodrigo"],
) as sppo_infracao_captura:
timestamp = Parameter("timestamp", default=None)
get_from_storage = Parameter("get_from_storage", default=False)

timestamp = get_current_timestamp(timestamp=timestamp)

LABELS = get_current_flow_labels()
MODE = get_current_flow_mode(LABELS)

# Rename flow run
rename_flow_run = rename_current_flow_run_now_time(
prefix=f"{sppo_infracao_captura.name} - ", now_time=timestamp
)

# SETUP #
partitions = create_date_hour_partition(timestamp, partition_date_only=True)

filename = parse_timestamp_to_string(timestamp)

filepath = create_local_partition_path(
dataset_id=constants.VEICULO_DATASET_ID.value,
table_id=constants.SPPO_INFRACAO_TABLE_ID.value,
filename=filename,
partitions=partitions,
)
search_dir = Parameter("search_dir", default="multas")
dataset_id = Parameter("dataset_id", default=constants.VEICULO_DATASET_ID.value)
table_id = Parameter("table_id", default=constants.SPPO_INFRACAO_TABLE_ID.value)

# MODE = get_current_flow_mode()
timestamp = get_rounded_timestamp(timestamp)
# EXTRACT
raw_status_gcs = get_veiculo_raw_storage(
dataset_id=constants.VEICULO_DATASET_ID.value,
table_id=constants.SPPO_INFRACAO_TABLE_ID.value,
timestamp=timestamp,
csv_args=constants.SPPO_INFRACAO_CSV_ARGS.value,
files = get_ftp_filepaths(search_dir=search_dir, timestamp=timestamp)
updated_files_info = download_and_save_local_from_ftp.map(
file_info=files, dataset_id=dataset_id, table_id=table_id
)
raw_status_url = get_raw(
url=constants.SPPO_INFRACAO_URL.value,
filetype="txt",
csv_args=constants.SPPO_INFRACAO_CSV_ARGS.value,
# TRANSFORM
treated_paths, raw_paths, partitions, status = pre_treatment_sppo_infracao(
files=updated_files_info
)
ifelse(get_from_storage.is_equal(True), raw_status_gcs, raw_status_url)

raw_status = merge(raw_status_gcs, raw_status_url)

raw_filepath = save_raw_local(status=raw_status, file_path=filepath)

# TREAT
treated_status = pre_treatment_sppo_infracao(status=raw_status, timestamp=timestamp)

treated_filepath = save_treated_local(status=treated_status, file_path=filepath)

# LOAD
error = bq_upload(
dataset_id=constants.VEICULO_DATASET_ID.value,
table_id=constants.SPPO_INFRACAO_TABLE_ID.value,
filepath=treated_filepath,
raw_filepath=raw_filepath,
errors = bq_upload.map(
dataset_id=unmapped(dataset_id),
table_id=unmapped(table_id),
filepath=treated_paths,
raw_filepath=raw_paths,
partitions=partitions,
status=treated_status,
)
upload_logs_to_bq(
dataset_id=constants.VEICULO_DATASET_ID.value,
parent_table_id=constants.SPPO_INFRACAO_TABLE_ID.value,
timestamp=timestamp,
error=error,
)
sppo_infracao_captura.set_dependencies(
task=partitions, upstream_tasks=[rename_flow_run]
status=status,
)

sppo_infracao_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
Expand Down
Loading

0 comments on commit 6929600

Please sign in to comment.