feat: Automate location extraction and English translation (#642)
cka-y authored Aug 5, 2024
1 parent abc407d commit c8fbae6
Showing 27 changed files with 1,413 additions and 355 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-test.yml
@@ -37,7 +37,7 @@ jobs:

- name: Docker Compose DB
run: |
-docker-compose --env-file ./config/.env.local up -d postgres postgres-test
+docker compose --env-file ./config/.env.local up -d postgres postgres-test
working-directory: ${{ github.workspace }}

- name: Run lint checks
2 changes: 1 addition & 1 deletion .github/workflows/datasets-batch-deployer.yml
@@ -77,7 +77,7 @@ jobs:
- name: Docker Compose DB
run: |
-docker-compose --env-file ./config/.env.local up -d postgres
+docker compose --env-file ./config/.env.local up -d postgres
working-directory: ${{ github.workspace }}

- name: Install Liquibase
2 changes: 1 addition & 1 deletion .github/workflows/integration-tests-pr.yml
@@ -63,7 +63,7 @@ jobs:

- name: Docker Compose DB
run: |
-docker-compose --env-file ./config/.env.local up -d postgres
+docker compose --env-file ./config/.env.local up -d postgres
working-directory: ${{ github.workspace }}

- name: Install Liquibase
5 changes: 5 additions & 0 deletions api/src/scripts/populate_db.py
@@ -117,6 +117,11 @@ def populate_location(self, feed, row, stable_id):
"""
Populate the location for the feed
"""
+        # TODO: validate behaviour for gtfs-rt feeds
+        if feed.locations and feed.data_type == "gtfs":
+            self.logger.warning(f"Location already exists for feed {stable_id}")
+            return
+
country_code = self.get_safe_value(row, "location.country_code", "")
subdivision_name = self.get_safe_value(row, "location.subdivision_name", "")
municipality = self.get_safe_value(row, "location.municipality", "")
18 changes: 0 additions & 18 deletions functions-python/extract_bb/README.md

This file was deleted.

File renamed without changes.
File renamed without changes.
26 changes: 26 additions & 0 deletions functions-python/extract_location/README.md
@@ -0,0 +1,26 @@
## Function Workflow

1. **Eventarc Trigger**: The original function is triggered by a `CloudEvent` indicating a GTFS dataset upload. It parses the event data to identify the dataset and calculates the bounding box and location information from the GTFS feed.

2. **Pub/Sub Triggered Function**: A new function is triggered by Pub/Sub messages. This allows for batch processing of dataset extractions, enabling multiple datasets to be processed in parallel without waiting for each one to complete sequentially.

3. **HTTP Triggered Batch Function**: Another function, triggered via HTTP request, identifies all latest datasets lacking bounding box or location information. It then publishes messages to the Pub/Sub topic to trigger the extraction process for these datasets.

4. **Data Parsing**: Extracts `stable_id`, `dataset_id`, and the GTFS feed `url` from the triggering event or message (see the sketch after this list).

5. **GTFS Feed Processing**: Retrieves bounding box coordinates and other location-related information from the GTFS feed located at the provided URL.

6. **Database Update**: Updates the bounding box and location information for the dataset in the database.
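
As a concrete reference for step 4, here is a minimal sketch of decoding the Pub/Sub payload. The payload keys mirror the fields the batch function publishes, but the helper name and exact schema are illustrative rather than the repository's actual `parse_resource_data`:

```python
import base64
import json


def parse_pubsub_payload(cloud_event_data: dict) -> tuple:
    """Decode a Pub/Sub CloudEvent payload into (stable_id, dataset_id, url).

    Illustrative sketch: Pub/Sub wraps the published JSON under
    data["message"]["data"] as a base64-encoded string.
    """
    raw = base64.b64decode(cloud_event_data["message"]["data"]).decode("utf-8")
    payload = json.loads(raw)
    return payload["stable_id"], payload["dataset_id"], payload["url"]
```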

## Expected Behavior

- Bounding boxes and location information are extracted for the latest datasets that are missing them, improving the efficiency of the process by utilizing both batch and individual dataset processing mechanisms.

## Function Configuration

The functions rely on the following environment variables:
- `FEEDS_DATABASE_URL`: The database URL for connecting to the database containing GTFS datasets.
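
For example, the functions open their database session from this variable, mirroring the pattern in the function source (`start_db_session` is a repo-internal helper; the batch function additionally reads variables such as `PUBSUB_TOPIC_NAME` and `PROJECT_ID`):

```python
import os

from helpers.database import start_db_session  # repo-internal helper

session = start_db_session(os.getenv("FEEDS_DATABASE_URL"))
```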

## Local Development

Local development of these functions should follow standard practices for GCP serverless functions. For general instructions on setting up the development environment, refer to the main [README.md](../README.md) file.
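
As a starting point, the Pub/Sub-triggered function can be exercised locally by handcrafting a `CloudEvent`, much like the Eventarc wrapper does internally. The function name follows this commit; the import path, attribute values, and payload below are purely illustrative:

```python
import base64
import json

from cloudevents.http import CloudEvent

from main import extract_location_pubsub  # assumed import path for this package

# Pub/Sub-style CloudEvent: the published JSON payload is base64-encoded
# under data["message"]["data"].
payload = {
    "stable_id": "mdb-1",  # illustrative feed ID
    "dataset_id": "mdb-1-202408",  # illustrative dataset ID
    "url": "https://example.com/gtfs.zip",  # illustrative feed URL
}
attributes = {
    "type": "google.cloud.pubsub.topic.v1.messagePublished",
    "source": "//pubsub.googleapis.com/projects/demo/topics/demo",  # illustrative
}
data = {"message": {"data": base64.b64encode(json.dumps(payload).encode()).decode()}}

extract_location_pubsub(CloudEvent(attributes=attributes, data=data))
```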
@@ -1,7 +1,7 @@
{
-    "name": "extract-bounding-box",
+    "name": "extract-location",
    "description": "Extracts the bounding box from a dataset",
-    "entry_point": "extract_bounding_box",
+    "entry_point": "extract_location",
    "timeout": 540,
    "memory": "8Gi",
    "trigger_http": false,
File renamed without changes.
@@ -0,0 +1,42 @@
import numpy
from geoalchemy2 import WKTElement

from database_gen.sqlacodegen_models import Gtfsdataset


def create_polygon_wkt_element(bounds: numpy.ndarray) -> WKTElement:
    """
    Create a WKTElement polygon from bounding box coordinates.
    @:param bounds (numpy.ndarray): Bounding box coordinates.
    @:return WKTElement: The polygon representation of the bounding box.
    """
    min_longitude, min_latitude, max_longitude, max_latitude = bounds
    points = [
        (min_longitude, min_latitude),
        (min_longitude, max_latitude),
        (max_longitude, max_latitude),
        (max_longitude, min_latitude),
        (min_longitude, min_latitude),
    ]
    wkt_polygon = f"POLYGON(({', '.join(f'{lon} {lat}' for lon, lat in points)}))"
    return WKTElement(wkt_polygon, srid=4326)


def update_dataset_bounding_box(session, dataset_id, geometry_polygon):
    """
    Update the bounding box of a dataset in the database.
    @:param session (Session): The database session.
    @:param dataset_id (str): The ID of the dataset.
    @:param geometry_polygon (WKTElement): The polygon representing the bounding box.
    @:raises Exception: If the dataset is not found in the database.
    """
    dataset: Gtfsdataset | None = (
        session.query(Gtfsdataset)
        .filter(Gtfsdataset.stable_id == dataset_id)
        .one_or_none()
    )
    if dataset is None:
        raise Exception(f"Dataset {dataset_id} does not exist in the database.")
    dataset.bounding_box = geometry_polygon
    session.add(dataset)
    session.commit()
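
For illustration, a minimal sketch of how these two helpers combine. The session setup and all values below are hypothetical; the deployed function obtains its session via `start_db_session` and `FEEDS_DATABASE_URL`:

```python
import numpy
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# Local import path shown for brevity; in the repo these helpers live in the
# function's bounding_box package.
from bounding_box_extractor import create_polygon_wkt_element, update_dataset_bounding_box

# Hypothetical local session; the connection string is a placeholder.
engine = create_engine("postgresql+psycopg2://user:pass@localhost/feeds")
session = sessionmaker(bind=engine)()

# bounds order: (min_longitude, min_latitude, max_longitude, max_latitude)
bounds = numpy.array([-73.99, 45.40, -73.47, 45.70])  # illustrative values
update_dataset_bounding_box(session, "mdb-1-202408", create_polygon_wkt_element(bounds))
```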
@@ -6,21 +6,26 @@
from datetime import datetime

import functions_framework
-import gtfs_kit
-import numpy
from cloudevents.http import CloudEvent
-from geoalchemy2 import WKTElement
from google.cloud import pubsub_v1
+from sqlalchemy import or_
+from sqlalchemy.orm import joinedload

from database_gen.sqlacodegen_models import Gtfsdataset
-from helpers.database import start_db_session
-from helpers.logger import Logger
from dataset_service.main import (
    DatasetTraceService,
    DatasetTrace,
    Status,
    PipelineStage,
)
+from helpers.database import start_db_session
+from helpers.logger import Logger
+from .bounding_box.bounding_box_extractor import (
+    create_polygon_wkt_element,
+    update_dataset_bounding_box,
+)
+from .reverse_geolocation.location_extractor import update_location, reverse_coords
+from .stops_utils import get_gtfs_feed_bounds_and_points

logging.basicConfig(level=logging.INFO)

@@ -40,64 +45,10 @@ def parse_resource_data(data: dict) -> tuple:
return stable_id, dataset_id, url


-def get_gtfs_feed_bounds(url: str, dataset_id: str) -> numpy.ndarray:
-    """
-    Retrieve the bounding box coordinates from the GTFS feed.
-    @:param url (str): URL to the GTFS feed.
-    @:param dataset_id (str): ID of the dataset for logs
-    @:return numpy.ndarray: An array containing the bounds (min_longitude, min_latitude, max_longitude, max_latitude).
-    @:raises Exception: If the GTFS feed is invalid
-    """
-    try:
-        feed = gtfs_kit.read_feed(url, "km")
-        return feed.compute_bounds()
-    except Exception as e:
-        logging.error(f"[{dataset_id}] Error retrieving GTFS feed from {url}: {e}")
-        raise Exception(e)
-
-
-def create_polygon_wkt_element(bounds: numpy.ndarray) -> WKTElement:
-    """
-    Create a WKTElement polygon from bounding box coordinates.
-    @:param bounds (numpy.ndarray): Bounding box coordinates.
-    @:return WKTElement: The polygon representation of the bounding box.
-    """
-    min_longitude, min_latitude, max_longitude, max_latitude = bounds
-    points = [
-        (min_longitude, min_latitude),
-        (min_longitude, max_latitude),
-        (max_longitude, max_latitude),
-        (max_longitude, min_latitude),
-        (min_longitude, min_latitude),
-    ]
-    wkt_polygon = f"POLYGON(({', '.join(f'{lon} {lat}' for lon, lat in points)}))"
-    return WKTElement(wkt_polygon, srid=4326)
-
-
-def update_dataset_bounding_box(session, dataset_id, geometry_polygon):
-    """
-    Update the bounding box of a dataset in the database.
-    @:param session (Session): The database session.
-    @:param dataset_id (str): The ID of the dataset.
-    @:param geometry_polygon (WKTElement): The polygon representing the bounding box.
-    @:raises Exception: If the dataset is not found in the database.
-    """
-    dataset: Gtfsdataset | None = (
-        session.query(Gtfsdataset)
-        .filter(Gtfsdataset.stable_id == dataset_id)
-        .one_or_none()
-    )
-    if dataset is None:
-        raise Exception(f"Dataset {dataset_id} does not exist in the database.")
-    dataset.bounding_box = geometry_polygon
-    session.add(dataset)
-    session.commit()


@functions_framework.cloud_event
-def extract_bounding_box_pubsub(cloud_event: CloudEvent):
+def extract_location_pubsub(cloud_event: CloudEvent):
"""
-    Main function triggered by a Pub/Sub message to extract and update the bounding box in the database.
+    Main function triggered by a Pub/Sub message to extract and update the location information in the database.
@param cloud_event: The CloudEvent containing the Pub/Sub message.
"""
Logger.init_logger()
@@ -106,6 +57,7 @@ def extract_bounding_box_pubsub(cloud_event: CloudEvent):
except ValueError:
maximum_executions = 1
data = cloud_event.data
+    location_extraction_n_points = os.getenv("LOCATION_EXTRACTION_N_POINTS", 5)
logging.info(f"Function triggered with Pub/Sub event data: {data}")

# Extract the Pub/Sub message data
@@ -164,7 +116,9 @@ def extract_bounding_box_pubsub(cloud_event: CloudEvent):
try:
logging.info(f"[{dataset_id}] accessing url: {url}")
try:
-            bounds = get_gtfs_feed_bounds(url, dataset_id)
+            bounds, location_geo_points = get_gtfs_feed_bounds_and_points(
+                url, dataset_id, location_extraction_n_points
+            )
except Exception as e:
error = f"Error processing GTFS feed: {e}"
raise e
@@ -176,16 +130,19 @@ def extract_bounding_box_pubsub(cloud_event: CloudEvent):
try:
session = start_db_session(os.getenv("FEEDS_DATABASE_URL"))
update_dataset_bounding_box(session, dataset_id, geometry_polygon)
+        update_location(reverse_coords(location_geo_points), dataset_id, session)
except Exception as e:
error = f"Error updating bounding box in database: {e}"
error = f"Error updating location information in database: {e}"
logging.error(f"[{dataset_id}] Error while processing: {e}")
if session is not None:
session.rollback()
raise e
finally:
if session is not None:
session.close()
logging.info(f"[{stable_id} - {dataset_id}] Bounding box updated successfully.")
logging.info(
f"[{stable_id} - {dataset_id}] Location information updated successfully."
)
except Exception:
pass
finally:
@@ -195,7 +152,7 @@ def extract_bounding_box_pubsub(cloud_event: CloudEvent):


@functions_framework.cloud_event
-def extract_bounding_box(cloud_event: CloudEvent):
+def extract_location(cloud_event: CloudEvent):
"""
Wrapper function to extract necessary data from the CloudEvent and call the core function.
@param cloud_event: The CloudEvent containing the Pub/Sub message.
@@ -232,15 +189,16 @@ def extract_bounding_box(cloud_event: CloudEvent):
new_cloud_event = CloudEvent(attributes=attributes, data=new_cloud_event_data)

# Call the pubsub function with the constructed CloudEvent
-    return extract_bounding_box_pubsub(new_cloud_event)
+    return extract_location_pubsub(new_cloud_event)


@functions_framework.http
-def extract_bounding_box_batch(_):
+def extract_location_batch(_):
Logger.init_logger()
logging.info("Batch function triggered.")

pubsub_topic_name = os.getenv("PUBSUB_TOPIC_NAME", None)
+    force_datasets_update = bool(os.getenv("FORCE_DATASETS_UPDATE", False))
if pubsub_topic_name is None:
logging.error("PUBSUB_TOPIC_NAME environment variable not set.")
return "PUBSUB_TOPIC_NAME environment variable not set.", 500
@@ -251,15 +209,22 @@ def extract_bounding_box_batch(_):
datasets_data = []
try:
session = start_db_session(os.getenv("FEEDS_DATABASE_URL"))
+        # Select all latest datasets with no bounding boxes or all datasets if forced
datasets = (
session.query(Gtfsdataset)
-            .filter(Gtfsdataset.bounding_box == None)  # noqa: E711
+            .filter(
+                or_(
+                    force_datasets_update,
+                    Gtfsdataset.bounding_box == None,  # noqa: E711
+                )
+            )
.filter(Gtfsdataset.latest)
+            .options(joinedload(Gtfsdataset.feed))
.all()
)
for dataset in datasets:
data = {
"stable_id": dataset.feed_id,
"stable_id": dataset.feed.stable_id,
"dataset_id": dataset.stable_id,
"url": dataset.hosted_url,
"execution_id": execution_id,
@@ -274,7 +239,7 @@ def extract_bounding_box_batch(_):
if session is not None:
session.close()

-    # Trigger update bounding box for each dataset by publishing to Pub/Sub
+    # Trigger update location for each dataset by publishing to Pub/Sub
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(os.getenv("PROJECT_ID"), pubsub_topic_name)
for data in datasets_data:
