Commit

feat: Implement script to update existing datasets with validator reports (#410)
cka-y authored May 9, 2024
1 parent a50ec2c commit 3d7578b
Showing 16 changed files with 624 additions and 75 deletions.
8 changes: 4 additions & 4 deletions functions-python/helpers/database.py
@@ -36,11 +36,11 @@ def get_db_engine(database_url: str = None, echo: bool = True):
return create_engine(database_url, echo=echo)


-def start_new_db_session(database_url: str = None):
+def start_new_db_session(database_url: str = None, echo: bool = True):
if database_url is None:
raise Exception("Database URL is not provided")
logging.info("Starting new database session.")
-    return sessionmaker(bind=get_db_engine(database_url))()
+    return sessionmaker(bind=get_db_engine(database_url, echo=echo))()


def start_singleton_db_session(database_url: str = None):
@@ -59,7 +59,7 @@ def start_singleton_db_session(database_url: str = None):
raise Exception(f"Error creating database session: {error}")


-def start_db_session(database_url: str = None):
+def start_db_session(database_url: str = None, echo: bool = True):
"""
:return: Database session
"""
@@ -69,7 +69,7 @@ def start_db_session(database_url: str = None):
if is_session_reusable():
return start_singleton_db_session(database_url)
logging.info("Not reusing the previous session, starting new database session.")
-    return start_new_db_session(database_url)
+    return start_new_db_session(database_url, echo)
except Exception as error:
raise Exception(f"Error creating database session: {error}")
finally:
9 changes: 9 additions & 0 deletions functions-python/update_validation_report/.coveragerc
@@ -0,0 +1,9 @@
[run]
omit =
*/test*/*
*/helpers/*
*/database_gen/*

[report]
exclude_lines =
if __name__ == .__main__.:
7 changes: 7 additions & 0 deletions functions-python/update_validation_report/.env.rename_me
@@ -0,0 +1,7 @@
# Environment variables for the validation report updates to run locally
FEEDS_DATABASE_URL={{FEEDS_DATABASE_URL}}
ENV={{ENV}}
BATCH_SIZE={{BATCH_SIZE}}
WEB_VALIDATOR_URL={{WEB_VALIDATOR_URL}}
LOCATION={{LOCATION}}
SLEEP_TIME={{SLEEP_TIME}}
29 changes: 29 additions & 0 deletions functions-python/update_validation_report/README.md
@@ -0,0 +1,29 @@
# Update Validation Report
This function triggers the process that updates the validation report for all latest datasets that lack a report from the current validator version.

## Function Workflow
1. **HTTP Request Trigger**: The function is initiated via an HTTP request.
2. **Retrieve Latest Datasets**: Retrieves the latest datasets from the database that do not have the latest version of the validation report.
3. **Validate Accessibility of Datasets**: Checks the availability of the latest datasets to ensure that the data is accessible for validation report processing.
4. **Trigger Validation Report Processing**: If the latest dataset lacks the current validation report, this action initiates the `gtfs_validator_execution` workflow.
5. **Return Response**: Outputs a response indicating the status of the validation report update. The response format is as follows:
```json
{
  "message": "Validation report update needed for X datasets and triggered for Y datasets",
  "dataset_workflow_triggered": ["dataset_id1", "dataset_id2", ...],
  "datasets_not_updated": ["dataset_id3", "dataset_id4", ...],
  "ignored_datasets": ["dataset_id5", "dataset_id6", ...]
}
```
The response message provides information on the number of datasets that require a validation report update and the number of datasets for which the update has been triggered. It also lists the datasets that were not updated and those that were ignored due to unavailability of the data.

## Function Configuration
The function relies on several environmental variables:
- `FEEDS_DATABASE_URL`: URL used to connect to the database that holds GTFS datasets and related data.
- `ENV`: Specifies the environment (`dev`, `qa`, or `prod`), used to determine the appropriate bucket name and project id for retrieving validation reports and executing the `gtfs_validator_execution` workflow.
- `BATCH_SIZE`: Number of datasets processed in each batch to prevent rate limiting by the web validator.
- `SLEEP_TIME`: Time in seconds to wait between batches to prevent rate limiting by the web validator.
- `WEB_VALIDATOR_URL`: URL for the web validator that checks for the latest validation report version.
- `LOCATION`: Location of the GCP workflow execution.

## Local Development
Follow standard practices for local development of GCP serverless functions. Refer to the main [README.md](../README.md) for general setup instructions for the development environment.
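
The snippet below is a minimal sketch of triggering the function and reading its response, assuming it is being served locally on `http://localhost:8080` (for example via the Functions Framework); the URL and port are assumptions for local runs:

```python
# Hypothetical local invocation; URL, port, and timeout are local-run assumptions.
import requests

response = requests.post("http://localhost:8080", timeout=3600)
body = response.json()

print(body["message"])
print("Triggered:", body["dataset_workflow_triggered"])
print("Not updated:", body["datasets_not_updated"])
print("Ignored:", body["ignored_datasets"])
```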
19 changes: 19 additions & 0 deletions functions-python/update_validation_report/function_config.json
@@ -0,0 +1,19 @@
{
"name": "update-validation-report",
"description": "Batch update of validation report for the latest datasets",
"entry_point": "update_validation_report",
"timeout": 3600,
"memory": "256Mi",
"trigger_http": true,
"include_folders": ["database_gen", "helpers"],
"secret_environment_variables": [
{
"key": "FEEDS_DATABASE_URL"
}
],
"ingress_settings": "ALLOW_INTERNAL_AND_GCLB",
"max_instance_request_concurrency": 1,
"max_instance_count": 5,
"min_instance_count": 0,
"available_cpu": 1
}
15 changes: 15 additions & 0 deletions functions-python/update_validation_report/requirements.txt
@@ -0,0 +1,15 @@
functions-framework==3.*
google-cloud-logging
google-cloud-storage
google-cloud-workflows
psycopg2-binary==2.9.6
aiohttp~=3.8.6
asyncio~=3.4.3
urllib3~=2.1.0
SQLAlchemy==2.0.23
geoalchemy2==0.14.7
requests~=2.31.0
cloudevents~=1.10.1
attrs~=23.1.0
pluggy~=1.3.0
certifi~=2023.7.22
@@ -0,0 +1,2 @@
Faker
pytest~=7.4.3
Empty file.
229 changes: 229 additions & 0 deletions functions-python/update_validation_report/src/main.py
@@ -0,0 +1,229 @@
#
# MobilityData 2024
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import os
from time import sleep
from typing import List

import functions_framework
import requests
import sqlalchemy.orm
import json
from sqlalchemy import or_
from google.cloud import storage
from sqlalchemy.engine import Row
from sqlalchemy.engine.interfaces import Any

from database_gen.sqlacodegen_models import Gtfsdataset, Gtfsfeed, Validationreport
from helpers.database import start_db_session
from google.cloud import workflows_v1
from google.cloud.workflows import executions_v1
from google.cloud.workflows.executions_v1 import Execution

from helpers.logger import Logger

logging.basicConfig(level=logging.INFO)
env = os.getenv("ENV", "dev").lower()
bucket_name = f"mobilitydata-datasets-{env}"


@functions_framework.http
def update_validation_report(_):
"""
Update the validation report for the datasets that need it
"""
Logger.init_logger()

# Get validator version
validator_version = get_validator_version()
logging.info(f"Accessing bucket {bucket_name}")

session = start_db_session(os.getenv("FEEDS_DATABASE_URL"), echo=False)
latest_datasets = get_latest_datasets_without_validation_reports(
session, validator_version
)
logging.info(f"Retrieved {len(latest_datasets)} latest datasets.")

valid_latest_datasets = get_datasets_for_validation(latest_datasets)
logging.info(f"Retrieved {len(latest_datasets)} blobs to update.")

execution_triggered_datasets = execute_workflows(valid_latest_datasets)
response = {
"message": f"Validation report update needed for {len(valid_latest_datasets)} datasets and triggered for "
f"{len(execution_triggered_datasets)} datasets.",
"dataset_workflow_triggered": sorted(execution_triggered_datasets),
"datasets_not_updated": sorted(
[
dataset_id
for _, dataset_id in valid_latest_datasets
if dataset_id not in execution_triggered_datasets
]
),
"ignored_datasets": sorted(
[
dataset_id
for _, dataset_id in latest_datasets
if dataset_id not in valid_latest_datasets
]
),
}
return response, 200


def get_validator_version():
"""
Get the version of the validator
:return: the version of the validator
"""
web_validator_endpoint = os.getenv("WEB_VALIDATOR_URL")
response = requests.get(f"{web_validator_endpoint}/version")
validator_version = response.json()["version"]
logging.info(f"Validator version: {validator_version}")
return validator_version


def get_latest_datasets_without_validation_reports(
session: sqlalchemy.orm.Session,
validator_version: str,
) -> List[Row[tuple[Any, Any]]]:
"""
Retrieve the latest datasets for each feed that do not have a validation report
:param session: The database session
:param validator_version: The version of the validator
:return: A list of tuples containing the feed stable id and dataset stable id
"""
query = (
session.query(
Gtfsfeed.stable_id,
Gtfsdataset.stable_id,
)
.select_from(Gtfsfeed)
.join(Gtfsdataset, Gtfsdataset.feed_id == Gtfsfeed.id)
.outerjoin(Validationreport, Gtfsdataset.validation_reports)
.filter(Gtfsdataset.latest.is_(True))
.filter(
or_(
Validationreport.validator_version != validator_version,
Validationreport.id.is_(None),
)
)
.distinct(Gtfsfeed.stable_id, Gtfsdataset.stable_id)
)
return query.all()


def get_datasets_for_validation(
latest_datasets: List[Row[tuple[Any, Any]]]
) -> List[tuple[str, str]]:
"""
Get the valid dataset blobs that need their validation report to be updated
:param latest_datasets: List of tuples containing the feed stable id and dataset stable id
:return: List of tuples containing the feed stable id and dataset stable id
"""
report_update_needed = []
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
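    # Only datasets whose zip blob still exists in the bucket can be re-validated;
    # missing blobs are logged and skipped.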

for feed_id, dataset_id in latest_datasets:
try:
dataset_blob = bucket.blob(f"{feed_id}/{dataset_id}/{dataset_id}.zip")
if not dataset_blob.exists():
logging.warning(f"Dataset blob not found for {feed_id}/{dataset_id}")
else:
report_update_needed.append((feed_id, dataset_id))
logging.info(
f"Dataset blob found for {feed_id}/{dataset_id} -- Adding to update list"
)
except Exception as e:
logging.error(
f"Error while accessing dataset blob for {feed_id}/{dataset_id}: {e}"
)
return report_update_needed


def execute_workflow(
project: str,
location: str = "northamerica-northeast1",
workflow: str = "gtfs_validator_execution",
input_data: dict = None,
) -> Execution:
"""
    Execute a workflow with the given input data and return the execution.
@param project: The Google Cloud project id which contains the workflow to execute.
@param location: The location for the workflow.
@param workflow: The ID of the workflow to execute.
@param input_data: A dictionary containing input data for the workflow.
@return: The execution response.
"""
execution_client = executions_v1.ExecutionsClient()
workflows_client = workflows_v1.WorkflowsClient()
parent = workflows_client.workflow_path(project, location, workflow)

# Prepare the execution input as a JSON string.
input_json = json.dumps(input_data) if input_data else "{}"

# Create and configure the execution request with input data.
execution_request = Execution(argument=input_json)
response = execution_client.create_execution(
parent=parent, execution=execution_request
)
logging.info(f"Created execution: {response.name}")
execution = execution_client.get_execution(request={"name": response.name})
return execution


def execute_workflows(latest_datasets):
"""
Execute the workflow for the latest datasets that need their validation report to be updated
:param latest_datasets: List of tuples containing the feed stable id and dataset stable id
:return: List of dataset stable ids for which the workflow was executed
"""
project_id = f"mobility-feeds-{env}"
location = os.getenv("LOCATION", "northamerica-northeast1")
execution_triggered_datasets = []
batch_size = int(os.getenv("BATCH_SIZE", 5))
sleep_time = int(os.getenv("SLEEP_TIME", 5))
count = 0
logging.info(f"Executing workflow for {len(latest_datasets)} datasets")
for feed_id, dataset_id in latest_datasets:
try:
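            # The payload points the gtfs_validator_execution workflow at the
            # existing dataset zip for this feed/dataset in the GCS bucket.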
input_data = {
"data": {
"protoPayload": {
"resourceName": "projects/_/"
f"buckets/{bucket_name}/"
f"objects/{feed_id}/{dataset_id}/{dataset_id}.zip"
},
"resource": {
"labels": {"location": location, "project_id": project_id},
},
}
}
logging.info(f"Executing workflow for {feed_id}/{dataset_id}")
execute_workflow(project_id, input_data=input_data)
execution_triggered_datasets.append(dataset_id)
except Exception as e:
logging.error(
f"Error while executing workflow for {feed_id}/{dataset_id}: {e}"
)
count += 1
logging.info(f"Triggered workflow execution for {count} datasets")
if count % batch_size == 0:
logging.info(
f"Sleeping for {sleep_time} seconds before next batch to avoid rate limiting.."
)
sleep(sleep_time)
return execution_triggered_datasets