[WIP] Refactor SPPO GPS recaptures #168

Closed
wants to merge 62 commits into from
Commits
eafcdb6
refactor recaptura
Hellcassius Aug 17, 2022
0685dcd
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 17, 2022
6a7fa7a
Merge branch 'master' into staging/recaptura
mergify[bot] Aug 17, 2022
25dd922
change labels to dev, projects to staging
Hellcassius Aug 17, 2022
e0afe8d
Merge branch 'staging/recaptura' of https://github.com/prefeitura-rio…
Hellcassius Aug 17, 2022
f0cf644
change datetime_filter to str
Hellcassius Aug 17, 2022
83ea843
trigger cd
Hellcassius Aug 17, 2022
bed57f3
add merge to capture flow
Hellcassius Aug 17, 2022
fde4b1a
Merge branch 'master' into staging/recaptura
mergify[bot] Aug 17, 2022
d682335
add logs and memory/cpu requests
Hellcassius Aug 17, 2022
baacfc6
Merge branch 'staging/recaptura' of https://github.com/prefeitura-rio…
Hellcassius Aug 17, 2022
506ef44
Merge branch 'master' into staging/recaptura
mergify[bot] Aug 18, 2022
c0e0336
uncomment log critical
Hellcassius Aug 19, 2022
5a6db5b
Merge branch 'staging/recaptura' of https://github.com/prefeitura-rio…
Hellcassius Aug 19, 2022
a59f95e
Merge branch 'master' into staging/recaptura
mergify[bot] Aug 26, 2022
cdb1e09
Merge branch 'master' into staging/recaptura
mergify[bot] Aug 26, 2022
5100088
Merge branch 'master' into staging/recaptura
mergify[bot] Aug 26, 2022
00ed4f3
Merge branch 'master' into staging/recaptura
mergify[bot] Aug 26, 2022
b06f547
Merge branch 'master' into staging/recaptura
mergify[bot] Aug 26, 2022
c16468f
add timestamp to task get_last_run
Hellcassius Aug 29, 2022
8d2a911
Merge branch 'master' into staging/recaptura
mergify[bot] Aug 30, 2022
58b6863
Merge branch 'master' into staging/recaptura
mergify[bot] Sep 1, 2022
e9f7338
Merge branch 'master' into staging/recaptura
mergify[bot] Sep 1, 2022
2b3d94a
Merge branch 'master' into staging/recaptura
mergify[bot] Sep 1, 2022
7aea263
Merge branch 'master' into staging/recaptura
mergify[bot] Sep 1, 2022
9baec39
Merge branch 'master' into staging/recaptura
mergify[bot] Sep 1, 2022
d9878cb
Merge branch 'master' into staging/recaptura
mergify[bot] Sep 1, 2022
7b348c9
Merge branch 'master' into staging/recaptura
mergify[bot] Sep 1, 2022
db66690
Merge branch 'master' into staging/recaptura
mergify[bot] Sep 1, 2022
aa7e2c2
Merge branch 'master' into staging/recaptura
mergify[bot] Sep 1, 2022
20dd122
Merge branch 'master' into staging/recaptura
mergify[bot] Sep 2, 2022
a71af91
add limit and request to recapture
Hellcassius Sep 6, 2022
7819f95
Merge branch 'staging/recaptura' of https://github.com/prefeitura-rio…
Hellcassius Sep 6, 2022
2d24e3e
trigger cd
Hellcassius Sep 6, 2022
e8da421
fix(ci): modify ref for wait-on-check action
gabriel-milan Sep 6, 2022
899ab33
add empty response return
Hellcassius Sep 6, 2022
1e19d9a
Merge branch 'master' into staging/recaptura
mergify[bot] Sep 8, 2022
335795e
Merge branch 'staging/recaptura' of https://github.com/prefeitura-rio…
Hellcassius Sep 8, 2022
2bdb51f
add bool check to cases
Hellcassius Sep 8, 2022
eefb24d
replace tzinfo on materialization_date_range
Hellcassius Sep 8, 2022
7247c76
Merge branch 'master' into staging/recaptura
Hellcassius Sep 9, 2022
7233675
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 9, 2022
f4aa189
remove unused imports
Hellcassius Sep 9, 2022
5c8fe43
Merge branch 'master' into staging/recaptura
mergify[bot] Sep 9, 2022
1dca5e5
simplify logs logic
Hellcassius Sep 9, 2022
1d8f1f4
Merge branch 'staging/recaptura' of https://github.com/prefeitura-rio…
Hellcassius Sep 9, 2022
c266695
Merge branch 'master' into staging/recaptura
mergify[bot] Sep 13, 2022
7c4cbb4
improve materialize behaviour
Hellcassius Sep 13, 2022
3bd64a1
Merge branch 'master' into staging/recaptura
Hellcassius Sep 13, 2022
cb2ea3b
change labels to prod
Hellcassius Sep 13, 2022
2d058ec
Merge branch 'staging/recaptura' of https://github.com/prefeitura-rio…
Hellcassius Sep 13, 2022
c33ed64
fix comments
Hellcassius Sep 14, 2022
acec940
change labels to dev
Hellcassius Sep 14, 2022
cbb11af
remove duplicated code
fernandascovino Sep 15, 2022
92a68cd
fix end_ts + clean datee parser
fernandascovino Sep 15, 2022
10c20dc
fix typo
Hellcassius Sep 16, 2022
7c79a33
change brt label to dev
Hellcassius Sep 16, 2022
418f88f
update brt materialize flow to use labels and mode
Hellcassius Sep 16, 2022
f34b2d0
Merge branch 'master' into staging/recaptura
mergify[bot] Sep 20, 2022
49f98d7
Merge branch 'master' into staging/recaptura
mergify[bot] Sep 20, 2022
797586e
Merge branch 'master' into staging/recaptura
mergify[bot] Sep 21, 2022
52bff32
Merge branch 'master' into staging/recaptura
mergify[bot] Sep 23, 2022
18 changes: 15 additions & 3 deletions pipelines/rj_smtr/br_rj_riodejaneiro_brt_gps/flows.py
@@ -12,7 +12,12 @@
from pipelines.constants import constants as emd_constants
from pipelines.utils.decorators import Flow
from pipelines.utils.execute_dbt_model.tasks import get_k8s_dbt_client
from pipelines.utils.tasks import get_now_time, rename_current_flow_run_now_time
from pipelines.utils.tasks import (
get_now_time,
rename_current_flow_run_now_time,
get_current_flow_mode,
get_current_flow_labels,
)

# SMTR Imports #

@@ -67,8 +72,11 @@
table_id = Parameter("table_id", default=constants.GPS_BRT_TREATED_TABLE_ID.value)
rebuild = Parameter("rebuild", False)

LABELS = get_current_flow_labels()
MODE = get_current_flow_mode(LABELS)

# Set dbt client #
dbt_client = get_k8s_dbt_client(mode="prod", wait=rename_flow_run)
dbt_client = get_k8s_dbt_client(mode=MODE, wait=rename_flow_run)
# Use the command below to get the dbt client in dev mode:
# dbt_client = get_local_dbt_client(host="localhost", port=3001)

@@ -79,6 +87,8 @@
raw_dataset_id=raw_dataset_id,
raw_table_id=raw_table_id,
table_date_column_name="data",
mode=MODE,
delay_hours=constants.GPS_BRT_MATERIALIZE_DELAY_HOURS.value,
)
dataset_sha = fetch_dataset_sha(
dataset_id=dataset_id,
@@ -99,6 +109,7 @@
table_id=table_id,
timestamp=date_range["date_range_end"],
wait=RUN,
mode=MODE,
)
with case(rebuild, False):
RUN = run_dbt_model(
@@ -113,12 +124,13 @@
table_id=table_id,
timestamp=date_range["date_range_end"],
wait=RUN,
mode=MODE,
)

materialize_brt.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
materialize_brt.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value],
)
materialize_brt.schedule = every_hour
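
Reviewer sketch (not part of the diff): the BRT materialization flow now derives `MODE` from the agent labels and forwards a `delay_hours` constant. The snippet below is a minimal illustration of what those two inputs could mean, assuming a label ending in `-dev` selects dev mode and that the delay shifts the end of the materialization window back by whole hours. `mode_from_labels` and `window_end` are hypothetical names, and the real `get_current_flow_mode` and `get_materialization_date_range` tasks may behave differently.

```python
from datetime import datetime, timedelta
from typing import List


def mode_from_labels(labels: List[str], dev_suffix: str = "-dev") -> str:
    """Hypothetical stand-in: return 'dev' if any label ends with dev_suffix."""
    return "dev" if any(label.endswith(dev_suffix) for label in labels) else "prod"


def window_end(now: datetime, delay_hours: int) -> datetime:
    """Assumed semantics: floor to the hour, then step back delay_hours hours."""
    floored = now.replace(minute=0, second=0, microsecond=0)
    return floored - timedelta(hours=delay_hours)


if __name__ == "__main__":
    now = datetime(2022, 9, 16, 10, 37, 12)
    # Label strings here are made up for illustration.
    print(mode_from_labels(["example-agent-dev"]))
    print(window_end(now, delay_hours=1))  # SPPO uses 1 in this PR
    print(window_end(now, delay_hours=0))  # BRT uses 0 in this PR
```

Under these assumptions a delay of 0 leaves the BRT window ending at the current hour, while the SPPO value of 1 would leave the most recent hour free for recaptures to land first.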

114 changes: 49 additions & 65 deletions pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py
@@ -7,6 +7,7 @@
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.tasks.control_flow import merge
from prefect.utilities.edges import unmapped


@@ -38,7 +39,7 @@
get_materialization_date_range,
# get_local_dbt_client,
get_raw,
parse_timestamp_to_string,
get_bool,
query_logs,
save_raw_local,
save_treated_local,
@@ -90,6 +91,7 @@
raw_table_id=raw_table_id,
table_date_column_name="data",
mode=MODE,
delay_hours=constants.GPS_SPPO_MATERIALIZE_DELAY_HOURS.value,
)
dataset_sha = fetch_dataset_sha(
dataset_id=dataset_id,
@@ -131,7 +133,7 @@
materialize_sppo.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
materialize_sppo.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value],
)


@@ -142,25 +144,33 @@

version = Parameter("version", default=2)

# RECAPTURE PARAMETERS
datetime_filter = Parameter("datetime_filter", default=None)
recapture = Parameter("recapture", default=False)
previous_error = Parameter("previous_error", default=None)

# SETUP #
timestamp = get_current_timestamp()
with case(get_bool(datetime_filter), False):
timestamp_now = get_current_timestamp()
with case(get_bool(datetime_filter), True):
timestamp_default = datetime_filter

timestamp = merge(timestamp_default, timestamp_now)

rename_flow_run = rename_current_flow_run_now_time(
prefix="GPS SPPO: ", now_time=timestamp
)

partitions = create_date_hour_partition(timestamp)

filename = parse_timestamp_to_string(timestamp)

filepath = create_local_partition_path(
dataset_id=constants.GPS_SPPO_RAW_DATASET_ID.value,
table_id=constants.GPS_SPPO_RAW_TABLE_ID.value,
filename=filename,
filename=timestamp,
partitions=partitions,
)

url = create_api_url_onibus_gps(version=version)
url = create_api_url_onibus_gps(version=version, timestamp=timestamp)

# EXTRACT #
raw_status = get_raw(url)
@@ -183,18 +193,24 @@
partitions=partitions,
status=treated_status,
)

upload_logs_to_bq(
dataset_id=constants.GPS_SPPO_RAW_DATASET_ID.value,
parent_table_id=constants.GPS_SPPO_RAW_TABLE_ID.value,
error=error,
previous_error=previous_error,
timestamp=timestamp,
recapture=recapture,
)


captura_sppo_v2.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
captura_sppo_v2.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value],
cpu_limit="50m",
cpu_request="10m",
memory_limit="256Mi",
memory_request="64Mi",
)
captura_sppo_v2.schedule = every_minute

@@ -205,20 +221,22 @@
datetime_filter = Parameter("datetime_filter", default=None)
# SETUP #
LABELS = get_current_flow_labels()
errors, timestamps = query_logs(
errors, recapture_parameters = query_logs(
dataset_id=constants.GPS_SPPO_RAW_DATASET_ID.value,
table_id=constants.GPS_SPPO_RAW_TABLE_ID.value,
datetime_filter=datetime_filter,
)

rename_flow_run = rename_current_flow_run_now_time(
prefix="GPS SPPO Recapturas: ", now_time=get_now_time(), wait=timestamps
prefix="GPS SPPO Recapturas: ",
now_time=get_now_time(),
wait=recapture_parameters,
)

with case(errors, False):
materialize_no_errors = create_flow_run(
flow_name=materialize_sppo.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
project_name="staging",
labels=LABELS,
run_name=materialize_sppo.name,
)
@@ -229,69 +247,35 @@
raise_final_state=True,
)
with case(errors, True):
# SETUP #
partitions = create_date_hour_partition.map(timestamps)
filename = parse_timestamp_to_string.map(timestamps)

filepath = create_local_partition_path.map(
dataset_id=unmapped(constants.GPS_SPPO_RAW_DATASET_ID.value),
table_id=unmapped(constants.GPS_SPPO_RAW_TABLE_ID.value),
filename=filename,
partitions=partitions,
)

url = create_api_url_onibus_gps.map(
version=unmapped(version), timestamp=timestamps
)

# EXTRACT #
raw_status = get_raw.map(url)

raw_filepath = save_raw_local.map(status=raw_status, file_path=filepath)

# # CLEAN #
trated_status = pre_treatment_br_rj_riodejaneiro_onibus_gps.map(
status=raw_status, timestamp=timestamps, version=unmapped(version)
)

treated_filepath = save_treated_local.map(
status=trated_status, file_path=filepath
recaptura_runs = create_flow_run.map(
flow_name=unmapped(captura_sppo_v2.name),
project_name=unmapped("staging"),
labels=unmapped(LABELS),
run_name=unmapped(captura_sppo_v2.name),
parameters=recapture_parameters,
)

# # LOAD #
error = bq_upload.map(
dataset_id=unmapped(constants.GPS_SPPO_RAW_DATASET_ID.value),
table_id=unmapped(constants.GPS_SPPO_RAW_TABLE_ID.value),
filepath=treated_filepath,
raw_filepath=raw_filepath,
partitions=partitions,
status=trated_status,
)

UPLOAD_LOGS = upload_logs_to_bq.map(
dataset_id=unmapped(constants.GPS_SPPO_RAW_DATASET_ID.value),
parent_table_id=unmapped(constants.GPS_SPPO_RAW_TABLE_ID.value),
error=error,
timestamp=timestamps,
recapture=unmapped(True),
wait_for_recapturas = wait_for_flow_run.map(
recaptura_runs,
stream_states=unmapped(True),
stream_logs=unmapped(True),
raise_final_state=unmapped(True),
)
materialize = create_flow_run(
flow_name=materialize_sppo.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
project_name="staging",
labels=LABELS,
run_name=materialize_sppo.name,
)
wait_materialize = wait_for_flow_run(
materialize,
stream_states=True,
stream_logs=True,
raise_final_state=True,
wait_for_materialize = wait_for_flow_run(
materialize, stream_states=True, stream_logs=True, raise_final_state=True
)
recaptura.set_dependencies(task=materialize, upstream_tasks=[UPLOAD_LOGS])

materialize.set_upstream(wait_for_recapturas)
recaptura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
recaptura.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value],
memory_limit="256Mi",
memory_request="64Mi",
cpu_limit="50m",
)
recaptura.schedule = every_hour_minute_six
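
Reviewer sketch (not part of the diff): the refactor replaces the mapped capture steps inside `recaptura` with one `captura_sppo_v2` run per failed timestamp, each receiving its own parameters. In plain Python, without Prefect, the two ideas look roughly like this: pick `datetime_filter` when it is set, otherwise the current timestamp (what the `case`/`merge` pair expresses), and build one parameter dict per logged failure. `as_bool`, `choose_timestamp` and `build_recapture_parameters` are hypothetical names, and the shape of the rows returned by `query_logs` is an assumption.

```python
from typing import Any, Dict, List, Optional


def as_bool(value: Any) -> bool:
    """Hypothetical stand-in for the get_bool task: plain truthiness."""
    return bool(value)


def choose_timestamp(datetime_filter: Optional[str], now: str) -> str:
    """Mirror the case/merge branches: prefer the filter, fall back to now."""
    return datetime_filter if as_bool(datetime_filter) else now


def build_recapture_parameters(failed: List[Dict[str, str]]) -> List[Dict[str, Any]]:
    """One parameter dict per failed capture (assumed row keys: timestamp, error)."""
    return [
        {
            "datetime_filter": row["timestamp"],
            "recapture": True,
            "previous_error": row["error"],
        }
        for row in failed
    ]


if __name__ == "__main__":
    print(choose_timestamp(None, "2022-09-16T10:06:00"))
    print(build_recapture_parameters(
        [{"timestamp": "2022-09-16T09:00:00", "error": "timeout"}]
    ))
```

The keys in each dict match the three recapture `Parameter`s declared in `captura_sppo_v2` (`datetime_filter`, `recapture`, `previous_error`), which is what lets `create_flow_run.map(..., parameters=recapture_parameters)` fan out one run per failure.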
5 changes: 4 additions & 1 deletion pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py
@@ -22,7 +22,7 @@


@task
def create_api_url_onibus_gps(version: str, timestamp: datetime = None) -> str:
def create_api_url_onibus_gps(version: str, timestamp: str = None) -> str:
"""
Generates the complete URL to get data from API.
"""
@@ -38,6 +38,8 @@ def create_api_url_onibus_gps(version: str, timestamp: datetime = None) -> str:
timestamp = pendulum.now(constants.TIMEZONE.value).replace(
second=0, microsecond=0
)
else:
timestamp = datetime.fromisoformat(timestamp)

headers = get_vault_secret(source)["data"]
key = list(headers)[0]
@@ -83,6 +85,7 @@ def pre_treatment_br_rj_riodejaneiro_onibus_gps(
timezone = constants.TIMEZONE.value

log(f"Data received to treat: \n{status['data'][:5]}")

df = pd.DataFrame(status["data"]) # pylint: disable=c0103
df["timestamp_captura"] = timestamp
log(f"Before converting, datahora is: \n{df['datahora']}")
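
Reviewer sketch (not part of the diff): `create_api_url_onibus_gps` now takes `timestamp` as a string and parses it with `datetime.fromisoformat`, falling back to the current minute when nothing is passed. A standalone version of that branch is shown below, assuming an ISO 8601 string and using UTC from the standard library in place of `pendulum.now(constants.TIMEZONE.value)`. `resolve_capture_timestamp` is a hypothetical name.

```python
from datetime import datetime, timezone
from typing import Optional


def resolve_capture_timestamp(timestamp: Optional[str] = None) -> datetime:
    """Parse an ISO 8601 string, or default to now truncated to the minute."""
    if timestamp is None:
        # Assumption: UTC here; the task in this PR uses the project timezone.
        return datetime.now(timezone.utc).replace(second=0, microsecond=0)
    return datetime.fromisoformat(timestamp)


if __name__ == "__main__":
    print(resolve_capture_timestamp("2022-09-16T10:05:00"))
    print(resolve_capture_timestamp())
```

Note that a string without an offset parses to a naive `datetime`, while the default branch is timezone-aware, so callers comparing the two would need to normalize them first.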
2 changes: 2 additions & 0 deletions pipelines/rj_smtr/constants.py
@@ -47,12 +47,14 @@ class constants(Enum): # pylint: disable=c0103
GPS_SPPO_CAPTURE_DELAY_V1 = 1
GPS_SPPO_CAPTURE_DELAY_V2 = 60
GPS_SPPO_RECAPTURE_DELAY_V2 = 6
GPS_SPPO_MATERIALIZE_DELAY_HOURS = 1
# GPS BRT #
GPS_BRT_SECRET_PATH = "brt_api"
GPS_BRT_DATASET_ID = "br_rj_riodejaneiro_veiculos"
GPS_BRT_RAW_DATASET_ID = "br_rj_riodejaneiro_brt_gps"
GPS_BRT_RAW_TABLE_ID = "registros"
GPS_BRT_TREATED_TABLE_ID = "gps_brt"
GPS_BRT_MATERIALIZE_DELAY_HOURS = 0
GPS_BRT_MAPPING_KEYS = {
"vei_nro_gestor": "id_veiculo",
"linha": "servico",
2 changes: 1 addition & 1 deletion pipelines/rj_smtr/schedules.py
@@ -59,7 +59,7 @@
2021, 1, 1, 0, 6, 0, tzinfo=timezone(constants.TIMEZONE.value)
),
labels=[
emd_constants.RJ_SMTR_AGENT_LABEL.value,
emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value,
],
),
]