diff --git a/functions-python/helpers/locations.py b/functions-python/helpers/locations.py new file mode 100644 index 000000000..9042b67b5 --- /dev/null +++ b/functions-python/helpers/locations.py @@ -0,0 +1,30 @@ +from typing import Dict + +from database_gen.sqlacodegen_models import Feed + + +def translate_feed_locations(feed: Feed, location_translations: Dict): + """ + Translate the locations of a feed. + :param feed: The feed object + :param location_translations: The location translations + """ + for location in feed.locations: + location_translation = location_translations.get(location.id) + + if location_translation: + location.subdivision_name = ( + location_translation["subdivision_name_translation"] + if location_translation["subdivision_name_translation"] + else location.subdivision_name + ) + location.municipality = ( + location_translation["municipality_translation"] + if location_translation["municipality_translation"] + else location.municipality + ) + location.country = ( + location_translation["country_translation"] + if location_translation["country_translation"] + else location.country + ) diff --git a/functions-python/helpers/requirements.txt b/functions-python/helpers/requirements.txt index f586cdb4b..0c58ed566 100644 --- a/functions-python/helpers/requirements.txt +++ b/functions-python/helpers/requirements.txt @@ -11,8 +11,8 @@ psycopg2-binary==2.9.6 aiohttp asyncio urllib3~=2.1.0 -SQLAlchemy==1.4.49 -geoalchemy2 +SQLAlchemy==2.0.23 +geoalchemy2==0.14.7 requests~=2.31.0 cloudevents~=1.10.1 requests_mock \ No newline at end of file diff --git a/functions-python/helpers/tests/test_locations.py b/functions-python/helpers/tests/test_locations.py new file mode 100644 index 000000000..38180cdc2 --- /dev/null +++ b/functions-python/helpers/tests/test_locations.py @@ -0,0 +1,91 @@ +import unittest +from unittest.mock import MagicMock +from database_gen.sqlacodegen_models import Feed, Location +from helpers.locations import translate_feed_locations + + +class TestTranslateFeedLocations(unittest.TestCase): + def test_translate_feed_locations(self): + # Mock a location object with specific attributes + mock_location = MagicMock(spec=Location) + mock_location.id = 1 + mock_location.subdivision_name = "Original Subdivision" + mock_location.municipality = "Original Municipality" + mock_location.country = "Original Country" + + # Mock a feed object with locations + mock_feed = MagicMock(spec=Feed) + mock_feed.locations = [mock_location] + + # Define a translation dictionary + location_translations = { + 1: { + "subdivision_name_translation": "Translated Subdivision", + "municipality_translation": "Translated Municipality", + "country_translation": "Translated Country", + } + } + + # Call the translate_feed_locations function + translate_feed_locations(mock_feed, location_translations) + + # Assert that the location's attributes were updated with translations + self.assertEqual(mock_location.subdivision_name, "Translated Subdivision") + self.assertEqual(mock_location.municipality, "Translated Municipality") + self.assertEqual(mock_location.country, "Translated Country") + + def test_translate_feed_locations_with_missing_translations(self): + # Mock a location object with specific attributes + mock_location = MagicMock(spec=Location) + mock_location.id = 1 + mock_location.subdivision_name = "Original Subdivision" + mock_location.municipality = "Original Municipality" + mock_location.country = "Original Country" + + # Mock a feed object with locations + mock_feed = 
MagicMock(spec=Feed) + mock_feed.locations = [mock_location] + + # Define a translation dictionary with missing translations + location_translations = { + 1: { + "subdivision_name_translation": None, + "municipality_translation": None, + "country_translation": "Translated Country", + } + } + + # Call the translate_feed_locations function + translate_feed_locations(mock_feed, location_translations) + + # Assert that the location's attributes were updated correctly + self.assertEqual( + mock_location.subdivision_name, "Original Subdivision" + ) # No translation + self.assertEqual( + mock_location.municipality, "Original Municipality" + ) # No translation + self.assertEqual(mock_location.country, "Translated Country") # Translated + + def test_translate_feed_locations_with_no_translation(self): + # Mock a location object with specific attributes + mock_location = MagicMock(spec=Location) + mock_location.id = 1 + mock_location.subdivision_name = "Original Subdivision" + mock_location.municipality = "Original Municipality" + mock_location.country = "Original Country" + + # Mock a feed object with locations + mock_feed = MagicMock(spec=Feed) + mock_feed.locations = [mock_location] + + # Define an empty translation dictionary + location_translations = {} + + # Call the translate_feed_locations function + translate_feed_locations(mock_feed, location_translations) + + # Assert that the location's attributes remain unchanged + self.assertEqual(mock_location.subdivision_name, "Original Subdivision") + self.assertEqual(mock_location.municipality, "Original Municipality") + self.assertEqual(mock_location.country, "Original Country") diff --git a/functions-python/preprocessed_analytics/.coveragerc b/functions-python/preprocessed_analytics/.coveragerc new file mode 100644 index 000000000..d3ef5cbc8 --- /dev/null +++ b/functions-python/preprocessed_analytics/.coveragerc @@ -0,0 +1,9 @@ +[run] +omit = + */test*/* + */helpers/* + */database_gen/* + +[report] +exclude_lines = + if __name__ == .__main__.: \ No newline at end of file diff --git a/functions-python/preprocessed_analytics/README.md b/functions-python/preprocessed_analytics/README.md new file mode 100644 index 000000000..94dd05c55 --- /dev/null +++ b/functions-python/preprocessed_analytics/README.md @@ -0,0 +1,197 @@ +# GTFS & GBFS Analytics Processor + +This directory contains Google Cloud Functions that automate the retrieval, processing, and analytics generation for GTFS and GBFS datasets. The project is designed to handle and analyze both GTFS and GBFS data, storing the results in Google Cloud Storage. + +## Overview + +### `process_analytics_gtfs` + +This HTTP-triggered Cloud Function processes GTFS datasets by performing the following steps: + +1. **Retrieving Data**: Fetches the latest GTFS dataset per feed from the database. +2. **Processing Data**: Analyzes the dataset, extracting metrics related to validation notices, features, and geographical locations. +3. **Storing Analytics**: Saves the processed data as JSON files in the Google Cloud Storage bucket, updating metrics and analytics files. + +#### Files Modified/Created: +- **`analytics_YYYY-MM-DD.json`**: Contains the GTFS analytics data for the specific date in JSON format. 
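+  For example, a consumer can load the file directly with pandas (a minimal sketch: the bucket name and date below are illustrative and depend on the deployment environment defined in `infra/metrics/main.tf`):
+  ```python
+  import pandas as pd
+
+  # Illustrative URL: buckets are created as "mobilitydata-gtfs-analytics-<environment>"
+  # and the analytics blobs are made public when they are uploaded.
+  url = "https://storage.googleapis.com/mobilitydata-gtfs-analytics-prod/analytics_2024-08-01.json"
+  analytics = pd.read_json(url)
+  print(analytics[["feed_id", "dataset_id", "provider"]].head())
+  ```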
+ **Format:** + ```json + { + "feed_id": "string", + "dataset_id": "string", + "notices": { + "errors": ["string"], + "warnings": ["string"], + "infos": ["string"] + }, + "features": ["string"], + "created_on": "datetime", + "last_modified": "datetime", + "provider": "string", + "locations": [ + { + "country_code": "string", + "country": "string", + "municipality": "string", + "subdivision_name": "string" + } + ] + } + ``` + +- **`feed_metrics.json`**: Stores aggregated feed-level metrics, including error, warning, and info counts. + **Format:** + ```json + { + "feed_id": "string", + "computed_on": ["datetime"], + "errors_count": ["int"], + "warnings_count": ["int"], + "infos_count": ["int"] + } + ``` + +- **`features_metrics.json`**: Tracks feature usage across feeds, showing the number of feeds using specific features. + **Format:** + ```json + { + "feature": "string", + "computed_on": ["datetime"], + "feeds_count": ["int"] + } + ``` + +- **`notices_metrics.json`**: Records notice metrics by severity level (error, warning, info). + **Format:** + ```json + { + "notice": "string", + "severity": "string", + "computed_on": ["datetime"], + "feeds_count": ["int"] + } + ``` + +- **`summary/summary_YYYY-MM-DD.json`**: Contains aggregated metrics for the specific date, including feed metrics, feature metrics, and notice metrics. + **Format:** + ```json + { + "feed_metrics": [...], + "features_metrics": [...], + "notices_metrics": [...] + } + ``` + +- **`analytics_files.json`**: Index of all `analytics_YYYY-MM-DD.json` files stored in the bucket. + **Format:** + ```json + { + "file_name": "string", + "created_on": "datetime" + } + ``` + +### `process_analytics_gbfs` + +This HTTP-triggered Cloud Function processes GBFS datasets by performing the following steps: + +1. **Retrieving Data**: Fetches the latest GBFS snapshot per feed from the database. +2. **Processing Data**: Analyzes the snapshot, extracting metrics related to validation notices, versions, and geographical locations. +3. **Storing Analytics**: Saves the processed data as JSON files in the Google Cloud Storage bucket, updating metrics and analytics files. + +#### Files Modified/Created: +- **`analytics_YYYY-MM-DD.json`**: Contains the GBFS analytics data for the specific date in JSON format. + **Format:** + ```json + { + "feed_id": "string", + "snapshot_id": "string", + "notices": [ + { + "keyword": "string", + "gbfs_file": "string", + "schema_path": "string" + } + ], + "created_on": "datetime", + "operator": "string", + "locations": [ + { + "country_code": "string", + "country": "string", + "municipality": "string", + "subdivision_name": "string" + } + ] + } + ``` + +- **`feed_metrics.json`**: Stores aggregated feed-level metrics, including error counts. + **Format:** + ```json + { + "feed_id": "string", + "computed_on": ["datetime"], + "errors_count": ["int"] + } + ``` + +- **`versions_metrics.json`**: Tracks the usage of different GBFS versions across feeds. + **Format:** + ```json + { + "version": "string", + "computed_on": ["datetime"], + "feeds_count": ["int"] + } + ``` + +- **`notices_metrics.json`**: Records notice metrics specific to GBFS, categorized by keyword, file, and schema path. + **Format:** + ```json + { + "keyword": "string", + "gbfs_file": "string", + "schema_path": "string", + "computed_on": ["datetime"], + "feeds_count": ["int"] + } + ``` + +- **`summary/summary_YYYY-MM-DD.json`**: Contains aggregated metrics for the specific date, including feed metrics, version metrics, and notice metrics. 
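+  When the cumulative metric files are rebuilt from these summaries, entries are merged on their identifying keys (for example `version` for version metrics) and their parallel `computed_on`/`feeds_count` lists grow by one element per run. A minimal sketch with illustrative values:
+  ```python
+  # One versions_metrics entry before and after a second run is aggregated (values are made up).
+  entry = {"version": "2.3", "computed_on": ["2024-07-02"], "feeds_count": [120]}
+  entry["computed_on"].append("2024-08-02")
+  entry["feeds_count"].append(131)
+  ```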
+ **Format:** + ```json + { + "feed_metrics": [...], + "versions_metrics": [...], + "notices_metrics": [...] + } + ``` + +- **`analytics_files.json`**: Index of all `analytics_YYYY-MM-DD.json` files stored in the bucket. + **Format:** + ```json + { + "file_name": "string", + "created_on": "datetime" + } + ``` + +## Project Structure + +- **`main.py`**: Defines the HTTP-triggered Cloud Functions that initiate the GTFS and GBFS data analytics processes. +- **`processors/base_analytics_processor.py`**: Contains the base class for analytics processing, providing common logic for GTFS and GBFS processors. +- **`processors/gtfs_analytics_processor.py`**: Implements GTFS-specific data retrieval and processing logic. +- **`processors/gbfs_analytics_processor.py`**: Implements GBFS-specific data retrieval and processing logic. +- **`tests/`**: Unit tests for all modules and functions, ensuring correct functionality and robustness. + +## Project Configuration + +The following environment variables need to be set: + +- `FEEDS_DATABASE_URL`: The URL for the database containing GTFS and GBFS feeds. +- `ANALYTICS_BUCKET`: The name of the Google Cloud Storage bucket where analytics results are stored. + +## Local Development + +Refer to the main [README.md](../README.md) for general setup instructions for the development environment. \ No newline at end of file diff --git a/functions-python/preprocessed_analytics/function_config.json b/functions-python/preprocessed_analytics/function_config.json new file mode 100644 index 000000000..2d8d7589b --- /dev/null +++ b/functions-python/preprocessed_analytics/function_config.json @@ -0,0 +1,20 @@ +{ + "name": "preprocess-analytics", + "description": "Preprocess analytics", + "entry_point": "preprocess_analytics", + "timeout": 540, + "memory": "2Gi", + "trigger_http": false, + "include_folders": ["database_gen", "helpers"], + "environment_variables": [], + "secret_environment_variables": [ + { + "key": "FEEDS_DATABASE_URL" + } + ], + "ingress_settings": "ALLOW_ALL", + "max_instance_request_concurrency": 1, + "max_instance_count": 5, + "min_instance_count": 0, + "available_cpu": 1 +} diff --git a/functions-python/preprocessed_analytics/requirements.txt b/functions-python/preprocessed_analytics/requirements.txt new file mode 100644 index 000000000..145b6372f --- /dev/null +++ b/functions-python/preprocessed_analytics/requirements.txt @@ -0,0 +1,15 @@ +functions-framework==3.* +google-cloud-logging +google-cloud-bigquery +google-cloud-storage +psycopg2-binary==2.9.6 +aiohttp~=3.8.6 +asyncio~=3.4.3 +urllib3~=2.1.0 +SQLAlchemy==2.0.23 +geoalchemy2==0.14.7 +requests~=2.31.0 +attrs~=23.1.0 +pluggy~=1.3.0 +certifi~=2023.7.22 +pandas \ No newline at end of file diff --git a/functions-python/preprocessed_analytics/requirements_dev.txt b/functions-python/preprocessed_analytics/requirements_dev.txt new file mode 100644 index 000000000..800a4ac11 --- /dev/null +++ b/functions-python/preprocessed_analytics/requirements_dev.txt @@ -0,0 +1,4 @@ +Faker +pytest~=7.4.3 +urllib3-mock +requests-mock \ No newline at end of file diff --git a/functions-python/preprocessed_analytics/src/__init__.py b/functions-python/preprocessed_analytics/src/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/functions-python/preprocessed_analytics/src/main.py b/functions-python/preprocessed_analytics/src/main.py new file mode 100644 index 000000000..b29154e19 --- /dev/null +++ b/functions-python/preprocessed_analytics/src/main.py @@ -0,0 +1,68 @@ +import logging +import traceback 
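+
+# Note: both entry points accept an optional JSON body such as {"compute_date": "YYYYMMDD"}.
+# If the field is missing or malformed, analytics are computed for today at midnight
+# (see get_compute_date below).
+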
+from datetime import datetime + +import flask +import functions_framework +from flask import Response + +from helpers.logger import Logger +from .processors.base_analytics_processor import NoFeedDataException +from .processors.gbfs_analytics_processor import GBFSAnalyticsProcessor +from .processors.gtfs_analytics_processor import GTFSAnalyticsProcessor + +logging.basicConfig(level=logging.INFO) + + +def get_compute_date(request: flask.Request) -> datetime: + """ + Get the compute date from the request JSON. If the date is invalid, return today at midnight. + """ + try: + json_request = request.get_json() + compute_date_str = json_request.get("compute_date", None) + if compute_date_str: + return datetime.strptime(compute_date_str, "%Y%m%d") + except Exception as e: + logging.error(f"Error getting compute date: {e}") + # Return today at midnight if the date is invalid + return datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) + + +def preprocess_analytics(request: flask.Request, processor_class) -> Response: + """ + Common logic to process analytics using the given processor class. + """ + Logger.init_logger() + logging.info(f"{processor_class.__name__} Function triggered") + compute_date = get_compute_date(request) + logging.info(f"Compute date: {compute_date}") + try: + processor = processor_class(compute_date) + processor.run() + except NoFeedDataException as e: + logging.warning(f"No feed data found for date {compute_date}: {e}") + return Response(f"No feed data found for date {compute_date}: {e}", status=404) + except Exception as e: + # Extracting the traceback details + tb = traceback.format_exc() + logging.error( + f"Error processing {processor_class.__name__} analytics: {e}\nTraceback:\n{tb}" + ) + return Response( + f"Error processing analytics for date {compute_date}: {e}", status=500 + ) + + return Response( + f"Successfully processed analytics for date: {compute_date}", status=200 + ) + + +@functions_framework.http +def preprocess_analytics_gtfs(request: flask.Request) -> Response: + return preprocess_analytics(request, GTFSAnalyticsProcessor) + + +@functions_framework.http +def preprocess_analytics_gbfs(request: flask.Request) -> Response: + return preprocess_analytics(request, GBFSAnalyticsProcessor) diff --git a/functions-python/preprocessed_analytics/src/processors/base_analytics_processor.py b/functions-python/preprocessed_analytics/src/processors/base_analytics_processor.py new file mode 100644 index 000000000..461be3cb2 --- /dev/null +++ b/functions-python/preprocessed_analytics/src/processors/base_analytics_processor.py @@ -0,0 +1,236 @@ +import logging +import os +from typing import List, Dict, Tuple + +import json +import pandas as pd +from google.cloud import storage +from sqlalchemy.orm import Query + +from database_gen.sqlacodegen_models import ( + Gbfsfeed, + Gbfssnapshot, + Gtfsfeed, + Gtfsdataset, +) +from helpers.database import start_db_session + + +class NoFeedDataException(Exception): + pass + + +class BaseAnalyticsProcessor: + def __init__(self, run_date): + self.run_date = run_date + self.session = start_db_session(os.getenv("FEEDS_DATABASE_URL"), echo=False) + self.processed_feeds = set() + self.data = [] + self.feed_metrics_data = [] + self.notices_metrics_data = [] + self.storage_client = storage.Client() + self.analytics_bucket = self.storage_client.bucket( + os.getenv("ANALYTICS_BUCKET") + ) + + def get_latest_data(self) -> Query: + raise NotImplementedError("Subclasses should implement this method.") + + def process_feed_data( + self, 
+ feed: Gtfsfeed | Gbfsfeed, + dataset_or_snapshot: Gtfsdataset | Gbfssnapshot, + translations: Dict, + ) -> None: + raise NotImplementedError("Subclasses should implement this method.") + + def save(self) -> None: + raise NotImplementedError("Subclasses should implement this method.") + + def save_summary(self) -> None: + raise NotImplementedError("Subclasses should implement this method.") + + def _load_json(self, file_name: str) -> Tuple[List[Dict], storage.Blob]: + # Read the JSON file from the specified GCS bucket + blob = self.analytics_bucket.blob(file_name) + + if blob.exists(): + json_data = blob.download_as_text() + try: + return ( + pd.read_json(json_data, convert_dates=["computed_on"]).to_dict( + orient="records" + ), + blob, + ) + except Exception as e: + logging.warning( + f"Unable to convert data to DataFrame using Pandas: {e}" + ) + return json.loads(json_data), blob + return [], blob + + @staticmethod + def _save_blob(blob: storage.Blob, data: List[Dict]) -> None: + try: + # Convert the data to JSON format + json_data = pd.DataFrame(data).to_json(orient="records", date_format="iso") + except Exception as e: + logging.warning(f"Unable to convert data to JSON using Pandas: {e}") + json_data = json.dumps(data, default=str) + + # Save the JSON file to the specified GCS bucket + blob.upload_from_string(json_data, content_type="application/json") + blob.make_public() + logging.info(f"{blob.name} saved to bucket") + + def _save_json(self, file_name: str, data: List[Dict]) -> None: + # Save the JSON file to the specified GCS bucket + blob = self.analytics_bucket.blob(file_name) + self._save_blob(blob, data) + + def aggregate_summary_files( + self, metrics_file_data: Dict[str, List], merging_keys: Dict[str, List[str]] + ) -> None: + blobs = self.analytics_bucket.list_blobs(prefix="summary/summary_") + for blob in blobs: + logging.info(f"Aggregating data from {blob.name}") + summary_data, _ = self._load_json(blob.name) + for key, new_data in summary_data.items(): + if key in metrics_file_data: + metrics_file_data[key] = self.append_new_data_if_not_exists( + metrics_file_data[key], new_data, merging_keys[key] + ) + # Save metrics to the bucket + for file_name, data in metrics_file_data.items(): + self._save_json(f"{file_name}.json", data) + + @staticmethod + def append_new_data_if_not_exists( + old_data: List[Dict], new_data: List[Dict], keys: List[str] + ) -> List[Dict]: + for new_entry in new_data: + exists = any( + all(new_entry[key] == old_entry[key] for key in keys) + for old_entry in old_data + ) + if not exists: + old_data.append(new_entry) + else: + matching_entries = [ + old_entry + for old_entry in old_data + if all(new_entry[key] == old_entry[key] for key in keys) + ] + list_to_append = [key for key in new_entry if key not in keys] + if len(list_to_append) > 0: + for entry in matching_entries: + for key in list_to_append: + entry[key].extend(new_entry[key]) + return old_data + + def save_analytics(self) -> None: + file_name = f"analytics_{self.run_date.strftime('%Y-%m-%d')}.json" + self._save_json(file_name, self.data) + self.save() + logging.info(f"Analytics saved to bucket as {file_name}") + + def run(self) -> None: + for ( + feed, + dataset_or_snapshot, + translation_fields, + ) in self._get_data_with_translations(): + self.process_feed_data(feed, dataset_or_snapshot, translation_fields) + + self.session.close() + self.save_summary() + self.save_analytics() + self.update_analytics_files() + logging.info(f"Finished running analytics for date: {self.run_date}") + + def 
_get_data_with_translations(self): + query = self.get_latest_data() + all_results = query.all() + if len(all_results) == 0: + raise NoFeedDataException("No feed data found") + logging.info(f"Loaded {len(all_results)} feeds to process") + try: + location_translations = [ + self._extract_translation_fields(result[2:]) for result in all_results + ] + logging.info("Location translations loaded") + location_translations_dict = { + translation["location_id"]: translation + for translation in location_translations + if translation["location_id"] is not None + } + except Exception as e: + logging.warning( + f"Error loading location translations: {e}\n Continuing without translations" + ) + location_translations_dict = {} + unique_feeds = {result[0].stable_id: result for result in all_results} + logging.info(f"Nb of unique feeds loaded: {len(unique_feeds)}") + return [ + (result[0], result[1], location_translations_dict) + for result in unique_feeds.values() + ] + + @staticmethod + def _extract_translation_fields(translation_data): + keys = [ + "location_id", + "country_code", + "country", + "subdivision_name", + "municipality", + "country_translation", + "subdivision_name_translation", + "municipality_translation", + ] + try: + return dict(zip(keys, translation_data)) + except Exception as e: + logging.error(f"Error extracting translation fields: {e}") + return dict(zip(keys, [None] * len(keys))) + + def update_analytics_files(self) -> None: + try: + # List all blobs in the analytics bucket + blobs = self.analytics_bucket.list_blobs() + + # Initialize a list to store information about each analytics file + analytics_files_list = [] + + for blob in blobs: + # Only include blobs that match the pattern for monthly analytics files + if ( + blob.name.startswith("analytics_") + and blob.name.endswith(".json") + and blob.name != "analytics_files.json" + ): + created_on = blob.time_created + analytics_files_list.append( + { + "file_name": blob.name, + "created_on": created_on, + } + ) + + # Convert the list to a DataFrame + analytics_files = pd.DataFrame(analytics_files_list) + logging.info("Analytics files list created.") + logging.info(analytics_files) + + # Save the DataFrame as analytics_files.json in the bucket + self._save_json( + "analytics_files.json", analytics_files.to_dict(orient="records") + ) + + logging.info( + "analytics_files.json created and saved to bucket successfully." 
+ ) + + except Exception as e: + logging.error(f"Error updating analytics files: {e}") diff --git a/functions-python/preprocessed_analytics/src/processors/gbfs_analytics_processor.py b/functions-python/preprocessed_analytics/src/processors/gbfs_analytics_processor.py new file mode 100644 index 000000000..4a78ff72a --- /dev/null +++ b/functions-python/preprocessed_analytics/src/processors/gbfs_analytics_processor.py @@ -0,0 +1,171 @@ +from typing import List + +import sqlalchemy +from sqlalchemy.orm import joinedload +from sqlalchemy.sql import func, and_ + +from database_gen.sqlacodegen_models import ( + Gbfsfeed, + Gbfssnapshot, + Gbfsvalidationreport, + Gbfsnotice, +) +from .base_analytics_processor import BaseAnalyticsProcessor + + +class GBFSAnalyticsProcessor(BaseAnalyticsProcessor): + def __init__(self, run_date): + super().__init__(run_date) + self.versions_metrics_data = [] + + def get_latest_data(self) -> sqlalchemy.orm.Query: + subquery = ( + self.session.query( + Gbfssnapshot.feed_id, + func.max(Gbfssnapshot.downloaded_at).label("max_downloaded_at"), + ) + .filter(Gbfssnapshot.downloaded_at < self.run_date) + .group_by(Gbfssnapshot.feed_id) + .subquery() + ) + + query = ( + self.session.query(Gbfsfeed, Gbfssnapshot) + .join(Gbfssnapshot, Gbfsfeed.id == Gbfssnapshot.feed_id) + .join( + subquery, + and_( + Gbfssnapshot.feed_id == subquery.c.feed_id, + Gbfssnapshot.downloaded_at == subquery.c.max_downloaded_at, + ), + ) + .options( + joinedload(Gbfsfeed.locations), + joinedload(Gbfssnapshot.gbfsvalidationreports).joinedload( + Gbfsvalidationreport.gbfsnotices + ), + joinedload(Gbfssnapshot.gbfsvalidationreports), + ) + .order_by(Gbfssnapshot.downloaded_at.desc()) + ) + return query + + def process_feed_data(self, feed: Gbfsfeed, snapshot: Gbfssnapshot, _) -> None: + if feed.stable_id in self.processed_feeds: + return + self.processed_feeds.add(feed.stable_id) + + validation_reports = snapshot.gbfsvalidationreports + if not validation_reports: + return + + latest_validation_report = max(validation_reports, key=lambda x: x.validated_at) + notices = latest_validation_report.gbfsnotices + + self.data.append( + { + "feed_id": feed.stable_id, + "snapshot_id": snapshot.stable_id, + "notices": [ + { + "keyword": notice.keyword, + "gbfs_file": notice.gbfs_file, + "schema_path": notice.schema_path, + } + for notice in notices + ], + "created_on": feed.created_at, + "operator": feed.operator, + "locations": [ + { + "country_code": location.country_code, + "country": location.country, + "municipality": location.municipality, + "subdivision_name": location.subdivision_name, + } + for location in feed.locations + ], + } + ) + + self.feed_metrics_data.append( + { + "feed_id": feed.stable_id, + "computed_on": [self.run_date], + "errors_count": [len(notices)], + } + ) + + self._process_versions(feed) + self._process_notices(notices) + + def save_summary(self) -> None: + # Save the summary data for the current run date + summary_file_name = f"summary/summary_{self.run_date.strftime('%Y-%m-%d')}.json" + summary_data = { + "feed_metrics": self.feed_metrics_data, + "notices_metrics": self.notices_metrics_data, + "versions_metrics": self.versions_metrics_data, + } + self._save_json(summary_file_name, summary_data) + + def save(self) -> None: + metrics_file_data = { + "feed_metrics": [], + "versions_metrics": [], + "notices_metrics": [], + } + merging_keys = { + "feed_metrics": ["feed_id"], + "versions_metrics": ["version"], + "notices_metrics": ["keyword", "gbfs_file", "schema_path"], + } + 
self.aggregate_summary_files(metrics_file_data, merging_keys) + + def _process_versions(self, feed: Gbfsfeed) -> None: + for version in feed.gbfsversions: + existing_version_index = next( + ( + index + for (index, d) in enumerate(self.versions_metrics_data) + if d["version"] == version.version + ), + None, + ) + if existing_version_index is not None: + self.versions_metrics_data[existing_version_index]["feeds_count"][ + -1 + ] += 1 + else: + self.versions_metrics_data.append( + { + "version": version.version, + "computed_on": [self.run_date], + "feeds_count": [1], + } + ) + + def _process_notices(self, notices: List[Gbfsnotice]) -> None: + for notice in notices: + existing_notice_index = next( + ( + index + for (index, d) in enumerate(self.notices_metrics_data) + if d["keyword"] == notice.keyword + and d["gbfs_file"] == notice.gbfs_file + and d["schema_path"] == notice.schema_path + ), + None, + ) + if existing_notice_index is not None: + self.notices_metrics_data[existing_notice_index]["feeds_count"][-1] += 1 + else: + self.notices_metrics_data.append( + { + "keyword": notice.keyword, + "gbfs_file": notice.gbfs_file, + "schema_path": notice.schema_path, + "computed_on": [self.run_date], + "feeds_count": [1], + } + ) diff --git a/functions-python/preprocessed_analytics/src/processors/gtfs_analytics_processor.py b/functions-python/preprocessed_analytics/src/processors/gtfs_analytics_processor.py new file mode 100644 index 000000000..c4d36d7e8 --- /dev/null +++ b/functions-python/preprocessed_analytics/src/processors/gtfs_analytics_processor.py @@ -0,0 +1,190 @@ +from typing import List, Dict + +import sqlalchemy +from sqlalchemy.orm import joinedload +from sqlalchemy.sql import func, and_ +from database_gen.sqlacodegen_models import ( + Gtfsdataset, + Gtfsfeed, + Validationreport, + Notice, + Feature, + Feed, + t_location_with_translations_en, + Location, +) +from helpers.locations import translate_feed_locations +from .base_analytics_processor import BaseAnalyticsProcessor + + +class GTFSAnalyticsProcessor(BaseAnalyticsProcessor): + def __init__(self, run_date): + super().__init__(run_date) + self.features_metrics_data = [] + + def get_latest_data(self) -> sqlalchemy.orm.Query: + subquery = ( + self.session.query( + Gtfsdataset.feed_id, + func.max(Gtfsdataset.downloaded_at).label("max_downloaded_at"), + ) + .filter(Gtfsdataset.downloaded_at < self.run_date) + .group_by(Gtfsdataset.feed_id) + .subquery() + ) + + query = ( + self.session.query(Gtfsfeed, Gtfsdataset, t_location_with_translations_en) + .join(Gtfsdataset, Gtfsfeed.id == Gtfsdataset.feed_id) + .join( + subquery, + and_( + Gtfsdataset.feed_id == subquery.c.feed_id, + Gtfsdataset.downloaded_at == subquery.c.max_downloaded_at, + ), + ) + .outerjoin(Location, Feed.locations) + .outerjoin( + t_location_with_translations_en, + Location.id == t_location_with_translations_en.c.location_id, + ) + .where(Gtfsfeed.status != "deprecated") + .options( + joinedload(Gtfsfeed.locations), + joinedload(Gtfsdataset.validation_reports).joinedload( + Validationreport.notices + ), + joinedload(Gtfsdataset.validation_reports).joinedload( + Validationreport.features + ), + ) + .order_by(Gtfsdataset.downloaded_at.desc()) + ) + return query + + def save_summary(self) -> None: + # Save the summary data for the current run date + summary_file_name = f"summary/summary_{self.run_date.strftime('%Y-%m-%d')}.json" + summary_data = { + "feed_metrics": self.feed_metrics_data, + "notices_metrics": self.notices_metrics_data, + "features_metrics": 
self.features_metrics_data, + } + self._save_json(summary_file_name, summary_data) + + def process_feed_data( + self, feed: Feed, dataset: Gtfsdataset, translations: Dict + ) -> None: + if feed.stable_id in self.processed_feeds: + return + self.processed_feeds.add(feed.stable_id) + + validation_reports = dataset.validation_reports + if not validation_reports: + return + + translate_feed_locations(feed, translations) + + latest_validation_report = max(validation_reports, key=lambda x: x.validated_at) + notices = latest_validation_report.notices + errors = [notice for notice in notices if notice.severity == "ERROR"] + warnings = [notice for notice in notices if notice.severity == "WARNING"] + infos = [notice for notice in notices if notice.severity == "INFO"] + features = latest_validation_report.features + + self.data.append( + { + "feed_id": feed.stable_id, + "dataset_id": dataset.stable_id, + "notices": { + "errors": [error.notice_code for error in errors], + "warnings": [warning.notice_code for warning in warnings], + "infos": [info.notice_code for info in infos], + }, + "features": [feature.name for feature in features], + "created_on": feed.created_at, + "last_modified": dataset.downloaded_at, + "provider": feed.provider, + "locations": [ + { + "country_code": location.country_code, + "country": location.country, + "municipality": location.municipality, + "subdivision_name": location.subdivision_name, + } + for location in feed.locations + ], + } + ) + + self.feed_metrics_data.append( + { + "feed_id": feed.stable_id, + "computed_on": [self.run_date], + "errors_count": [len(errors)], + "warnings_count": [len(warnings)], + "infos_count": [len(infos)], + } + ) + + self._process_features(features) + self._process_notices(notices) + + def save(self) -> None: + metrics_file_data = { + "feed_metrics": [], + "features_metrics": [], + "notices_metrics": [], + } + merging_keys = { + "feed_metrics": ["feed_id"], + "features_metrics": ["feature"], + "notices_metrics": ["notice", "severity"], + } + self.aggregate_summary_files(metrics_file_data, merging_keys) + + def _process_features(self, features: List[Feature]) -> None: + for feature in features: + existing_feature_index = next( + ( + index + for (index, d) in enumerate(self.features_metrics_data) + if d["feature"] == feature.name + ), + None, + ) + if existing_feature_index is not None: + self.features_metrics_data[existing_feature_index]["feeds_count"][ + -1 + ] += 1 + else: + self.features_metrics_data.append( + { + "feature": feature.name, + "computed_on": [self.run_date], + "feeds_count": [1], + } + ) + + def _process_notices(self, notices: List[Notice]) -> None: + for notice in notices: + existing_notice_index = next( + ( + index + for (index, d) in enumerate(self.notices_metrics_data) + if d["notice"] == notice.notice_code + and d["severity"] == notice.severity + ), + None, + ) + if existing_notice_index is not None: + self.notices_metrics_data[existing_notice_index]["feeds_count"][-1] += 1 + else: + self.notices_metrics_data.append( + { + "notice": notice.notice_code, + "severity": notice.severity, + "computed_on": [self.run_date], + "feeds_count": [1], + } + ) diff --git a/functions-python/preprocessed_analytics/tests/test_base_processor.py b/functions-python/preprocessed_analytics/tests/test_base_processor.py new file mode 100644 index 000000000..bf709da22 --- /dev/null +++ b/functions-python/preprocessed_analytics/tests/test_base_processor.py @@ -0,0 +1,146 @@ +import unittest +from unittest.mock import patch, MagicMock +from 
datetime import datetime +import pandas as pd + +from preprocessed_analytics.src.processors.base_analytics_processor import ( + BaseAnalyticsProcessor, +) + + +class TestBaseAnalyticsProcessor(unittest.TestCase): + @patch( + "preprocessed_analytics.src.processors.base_analytics_processor.start_db_session" + ) + @patch( + "preprocessed_analytics.src.processors.base_analytics_processor.storage.Client" + ) + def setUp(self, mock_storage_client, mock_start_db_session): + self.mock_session = MagicMock() + mock_start_db_session.return_value = self.mock_session + + self.mock_storage_client = mock_storage_client + self.mock_bucket = MagicMock() + self.mock_storage_client().bucket.return_value = self.mock_bucket + + self.run_date = datetime(2024, 8, 22) + self.processor = BaseAnalyticsProcessor(self.run_date) + + @patch( + "preprocessed_analytics.src.processors.base_analytics_processor.pd.read_json" + ) + def test_load_json_exists(self, mock_read_json): + mock_blob = MagicMock() + mock_blob.exists.return_value = True + mock_blob.download_as_text.return_value = '{"key": "value"}' + mock_read_json.return_value = pd.DataFrame([{"key": "value"}]) + + data, blob = self.processor._load_json("test.json") + + mock_read_json.assert_called_once() + self.assertEqual(data, [{"key": "value"}]) + + def test_load_json_not_exists(self): + mock_blob = MagicMock() + mock_blob.exists.return_value = False + self.mock_bucket.blob.return_value = mock_blob + + data, blob = self.processor._load_json("test.json") + + mock_blob.exists.assert_called_once() + self.assertEqual(data, []) + self.assertEqual(blob, mock_blob) + + def test_save_blob(self): + mock_blob = MagicMock() + data = [{"key": "value"}] + + self.processor._save_blob(mock_blob, data) + + mock_blob.upload_from_string.assert_called_once_with( + pd.DataFrame(data).to_json(orient="records", date_format="iso"), + content_type="application/json", + ) + mock_blob.make_public.assert_called_once() + + @patch( + "preprocessed_analytics.src.processors.base_analytics_processor.BaseAnalyticsProcessor._save_json" + ) + def test_save_analytics(self, _): + self.processor.data = [{"key": "value"}] + with self.assertRaises(NotImplementedError): + self.processor.save_analytics() + + @patch( + "preprocessed_analytics.src.processors.base_analytics_processor.BaseAnalyticsProcessor._save_json" + ) + @patch( + "preprocessed_analytics.src.processors.base_analytics_processor.storage.Blob" + ) + def test_update_analytics_files(self, mock_blob, mock_save_json): + mock_blob = MagicMock() + self.mock_bucket.list_blobs.return_value = [mock_blob] + mock_blob.name = "analytics_2024_08.json" + mock_blob.time_created = datetime(2024, 8, 22) + + self.processor.update_analytics_files() + + mock_save_json.assert_called_once() + + @patch( + "preprocessed_analytics.src.processors.base_analytics_processor.BaseAnalyticsProcessor.get_latest_data" + ) + @patch( + "preprocessed_analytics.src.processors.base_analytics_processor.BaseAnalyticsProcessor.process_feed_data" + ) + @patch( + "preprocessed_analytics.src.processors.base_analytics_processor.BaseAnalyticsProcessor.save_analytics" + ) + @patch( + "preprocessed_analytics.src.processors.base_analytics_processor.BaseAnalyticsProcessor.update_analytics_files" + ) + @patch( + "preprocessed_analytics.src.processors.base_analytics_processor.BaseAnalyticsProcessor.save_summary" + ) + def test_run( + self, + mock_save_summary, + mock_update_analytics_files, + mock_save_analytics, + mock_process_feed_data, + mock_get_latest_data, + ): + # Create mock feed 
objects with a stable_id attribute + mock_feed1 = MagicMock() + mock_feed1.stable_id = "stable_id_1" + + mock_feed2 = MagicMock() + mock_feed2.stable_id = "stable_id_2" + + # Mock the dataset_or_snapshot and translation data + mock_dataset1 = MagicMock() + mock_dataset2 = MagicMock() + + translation_data1 = "translation1" + translation_data2 = "translation2" + + # Mock query and its all() method + mock_query = MagicMock() + mock_get_latest_data.return_value = mock_query + mock_query.all.return_value = [ + (mock_feed1, mock_dataset1, translation_data1), + (mock_feed2, mock_dataset2, translation_data2), + ] + + # Run the processor's run method + self.processor.run() + + # Assert that get_latest_data was called + mock_get_latest_data.assert_called_once() + + # Assert that process_feed_data was called twice (once for each feed-dataset pair) + self.assertEqual(mock_process_feed_data.call_count, 2) + + mock_save_analytics.assert_called_once() + mock_update_analytics_files.assert_called_once() + mock_save_summary.assert_called_once() diff --git a/functions-python/preprocessed_analytics/tests/test_gbfs_processor.py b/functions-python/preprocessed_analytics/tests/test_gbfs_processor.py new file mode 100644 index 000000000..215f4782d --- /dev/null +++ b/functions-python/preprocessed_analytics/tests/test_gbfs_processor.py @@ -0,0 +1,175 @@ +import unittest +from unittest.mock import patch, MagicMock +from datetime import datetime +from preprocessed_analytics.src.processors.gbfs_analytics_processor import ( + GBFSAnalyticsProcessor, +) + + +class TestGBFSAnalyticsProcessor(unittest.TestCase): + @patch( + "preprocessed_analytics.src.processors.base_analytics_processor.start_db_session" + ) + @patch( + "preprocessed_analytics.src.processors.base_analytics_processor.storage.Client" + ) + def setUp(self, mock_storage_client, mock_start_db_session): + self.mock_session = MagicMock() + mock_start_db_session.return_value = self.mock_session + + self.mock_storage_client = mock_storage_client + self.mock_bucket = MagicMock() + self.mock_storage_client().bucket.return_value = self.mock_bucket + + self.run_date = datetime(2024, 8, 22) + self.processor = GBFSAnalyticsProcessor(self.run_date) + + @patch( + "preprocessed_analytics.src.processors.gbfs_analytics_processor.GBFSAnalyticsProcessor.get_latest_data" + ) + @patch( + "preprocessed_analytics.src.processors.gbfs_analytics_processor.GBFSAnalyticsProcessor.process_feed_data" + ) + @patch( + "preprocessed_analytics.src.processors.gbfs_analytics_processor.GBFSAnalyticsProcessor.save" + ) + @patch( + "preprocessed_analytics.src.processors.gbfs_analytics_processor." 
+ "GBFSAnalyticsProcessor.update_analytics_files" + ) + def test_run( + self, + mock_update_analytics_files, + mock_save, + mock_process_feed_data, + mock_get_latest_data, + ): + # Create mock feed objects with a stable_id attribute + mock_feed1 = MagicMock() + mock_feed1.stable_id = "stable_id_1" + + mock_feed2 = MagicMock() + mock_feed2.stable_id = "stable_id_2" + + # Mock the snapshot data + mock_snapshot1 = MagicMock() + mock_snapshot2 = MagicMock() + + # Mock query and its all() method + mock_query = MagicMock() + mock_get_latest_data.return_value = mock_query + mock_query.all.return_value = [ + (mock_feed1, mock_snapshot1), + (mock_feed2, mock_snapshot2), + ] + + # Run the processor's run method + self.processor.run() + + # Assert that get_latest_data was called + mock_get_latest_data.assert_called_once() + + # Assert that process_feed_data was called twice (once for each feed-snapshot pair) + self.assertEqual(mock_process_feed_data.call_count, 2) + mock_process_feed_data.assert_any_call(mock_feed1, mock_snapshot1, {}) + mock_process_feed_data.assert_any_call(mock_feed2, mock_snapshot2, {}) + + # Assert that save was called once + mock_save.assert_called_once() + + # Assert that update_analytics_files was called once + mock_update_analytics_files.assert_called_once() + + def test_process_feed_data(self): + # Create mock feed and snapshot + mock_feed = MagicMock() + mock_feed.stable_id = "feed1" + mock_feed.gbfsversions = [MagicMock(version="v1"), MagicMock(version="v2")] + mock_feed.locations = [ + MagicMock( + country_code="US", + country="United States", + municipality="City", + subdivision_name="State", + ) + ] + mock_feed.operator = "Operator1" + mock_feed.created_at = datetime(2024, 8, 1) + + mock_snapshot = MagicMock() + mock_snapshot.stable_id = "snapshot1" + mock_snapshot.gbfsvalidationreports = [ + MagicMock(validated_at=datetime(2024, 8, 20)) + ] + mock_snapshot.gbfsvalidationreports[0].gbfsnotices = [ + MagicMock(keyword="keyword1", gbfs_file="file1", schema_path="path1"), + MagicMock(keyword="keyword2", gbfs_file="file2", schema_path="path2"), + ] + + # Run process_feed_data + self.processor.process_feed_data(mock_feed, mock_snapshot, None) + + # Assert the data was appended correctly + self.assertEqual(len(self.processor.data), 1) + self.assertEqual(len(self.processor.feed_metrics_data), 1) + self.assertEqual(len(self.processor.versions_metrics_data), 2) + self.assertEqual(len(self.processor.notices_metrics_data), 2) + + @patch( + "preprocessed_analytics.src.processors.gbfs_analytics_processor.GBFSAnalyticsProcessor._save_blob" + ) + @patch( + "preprocessed_analytics.src.processors.gbfs_analytics_processor.GBFSAnalyticsProcessor._load_json" + ) + def test_save(self, mock_load_json, mock_save_blob): + # Mock the return values of _load_json + # First two calls return empty lists, the last one returns data + mock_load_json.return_value = ( + { + "feed_metrics": [ + { + "feed_id": "feed1", + "errors_count": [1], + "computed_on": ["2024-08-22"], + } + ] + }, + MagicMock(), + ) + + self.mock_bucket.list_blobs.return_value = [ + MagicMock(name="summary/summary_2024-08-22.json"), + MagicMock(name="feed_metrics/feed_metrics_2024-08-22.json"), + MagicMock(name="versions_metrics/versions_metrics_2024-08-22.json"), + ] + self.processor.save() + self.assertEqual(mock_load_json.call_count, 3) + self.assertEqual(mock_save_blob.call_count, 3) + + def test_process_versions(self): + # Create a mock feed with versions + mock_feed = MagicMock() + mock_feed.gbfsversions = 
[MagicMock(version="v1"), MagicMock(version="v2")] + + # Process versions + self.processor._process_versions(mock_feed) + + # Assert versions_metrics_data was updated correctly + self.assertEqual(len(self.processor.versions_metrics_data), 2) + self.assertEqual(self.processor.versions_metrics_data[0]["version"], "v1") + self.assertEqual(self.processor.versions_metrics_data[1]["version"], "v2") + + def test_process_notices(self): + # Create mock notices + mock_notices = [ + MagicMock(keyword="keyword1", gbfs_file="file1", schema_path="path1"), + MagicMock(keyword="keyword2", gbfs_file="file2", schema_path="path2"), + ] + + # Process notices + self.processor._process_notices(mock_notices) + + # Assert notices_metrics_data was updated correctly + self.assertEqual(len(self.processor.notices_metrics_data), 2) + self.assertEqual(self.processor.notices_metrics_data[0]["keyword"], "keyword1") + self.assertEqual(self.processor.notices_metrics_data[1]["keyword"], "keyword2") diff --git a/functions-python/preprocessed_analytics/tests/test_gtfs_processor.py b/functions-python/preprocessed_analytics/tests/test_gtfs_processor.py new file mode 100644 index 000000000..fc587ba90 --- /dev/null +++ b/functions-python/preprocessed_analytics/tests/test_gtfs_processor.py @@ -0,0 +1,181 @@ +import unittest +from unittest.mock import patch, MagicMock +from datetime import datetime +from preprocessed_analytics.src.processors.gtfs_analytics_processor import ( + GTFSAnalyticsProcessor, +) + + +class TestGTFSAnalyticsProcessor(unittest.TestCase): + @patch( + "preprocessed_analytics.src.processors.base_analytics_processor.start_db_session" + ) + @patch( + "preprocessed_analytics.src.processors.base_analytics_processor.storage.Client" + ) + def setUp(self, mock_storage_client, mock_start_db_session): + self.mock_session = MagicMock() + mock_start_db_session.return_value = self.mock_session + + self.mock_storage_client = mock_storage_client + self.mock_bucket = MagicMock() + self.mock_storage_client().bucket.return_value = self.mock_bucket + + self.run_date = datetime(2024, 8, 22) + self.processor = GTFSAnalyticsProcessor(self.run_date) + + @patch( + "preprocessed_analytics.src.processors.gtfs_analytics_processor.GTFSAnalyticsProcessor.get_latest_data" + ) + @patch( + "preprocessed_analytics.src.processors.gtfs_analytics_processor.GTFSAnalyticsProcessor.process_feed_data" + ) + @patch( + "preprocessed_analytics.src.processors.gtfs_analytics_processor.GTFSAnalyticsProcessor.save" + ) + @patch( + "preprocessed_analytics.src.processors.gtfs_analytics_processor.GTFSAnalyticsProcessor" + ".update_analytics_files" + ) + def test_run( + self, + mock_update_analytics_files, + mock_save, + mock_process_feed_data, + mock_get_latest_data, + ): + mock_feed1 = MagicMock() + mock_feed1.stable_id = "stable_id_1" + + mock_feed2 = MagicMock() + mock_feed2.stable_id = "stable_id_2" + + # Mock the dataset data + mock_dataset1 = MagicMock() + mock_dataset2 = MagicMock() + + # Mock query and its all() method + mock_query = MagicMock() + mock_get_latest_data.return_value = mock_query + mock_query.all.return_value = [ + (mock_feed1, mock_dataset1), + (mock_feed2, mock_dataset2), + ] + + # Run the processor's run method + self.processor.run() + + # Assert that get_latest_data was called + mock_get_latest_data.assert_called_once() + + # Assert that process_feed_data was called twice (once for each feed-dataset pair) + self.assertEqual(mock_process_feed_data.call_count, 2) + mock_process_feed_data.assert_any_call(mock_feed1, mock_dataset1, {}) + 
mock_process_feed_data.assert_any_call(mock_feed2, mock_dataset2, {}) + + # Assert that save was called once + mock_save.assert_called_once() + + # Assert that update_analytics_files was called once + mock_update_analytics_files.assert_called_once() + + def test_process_feed_data(self): + # Create mock feed and dataset + mock_feed = MagicMock() + mock_feed.stable_id = "feed1" + mock_feed.locations = [ + MagicMock( + country_code="US", + country="United States", + municipality="City", + subdivision_name="State", + ) + ] + mock_feed.provider = "Provider1" + mock_feed.created_at = datetime(2024, 8, 1) + + mock_dataset = MagicMock() + mock_dataset.stable_id = "dataset1" + mock_dataset.downloaded_at = datetime(2024, 8, 2) + mock_dataset.locations = mock_feed.locations + mock_dataset.validation_reports = [ + MagicMock(validated_at=datetime(2024, 8, 20)) + ] + mock_dataset.validation_reports[0].notices = [ + MagicMock(notice_code="error1", severity="ERROR"), + MagicMock(notice_code="warning1", severity="WARNING"), + MagicMock(notice_code="info1", severity="INFO"), + ] + mock_dataset.validation_reports[0].features = [MagicMock(name="feature1")] + + # Run process_feed_data + self.processor.process_feed_data(mock_feed, mock_dataset, {}) + + # Assert the data was appended correctly + self.assertEqual(len(self.processor.data), 1) + self.assertEqual(len(self.processor.feed_metrics_data), 1) + self.assertEqual(len(self.processor.features_metrics_data), 1) + self.assertEqual(len(self.processor.notices_metrics_data), 3) + + @patch( + "preprocessed_analytics.src.processors.gtfs_analytics_processor.GTFSAnalyticsProcessor._save_blob" + ) + @patch( + "preprocessed_analytics.src.processors.gtfs_analytics_processor.GTFSAnalyticsProcessor._load_json" + ) + def test_save(self, mock_load_json, mock_save_blob): + mock_load_json.return_value = ( + { + "feed_metrics": [ + { + "feed_id": "feed1", + "errors_count": [1], + "computed_on": ["2024-08-22"], + } + ] + }, + MagicMock(), + ) + + # Mock the list_blobs method to return some blobs + self.mock_bucket.list_blobs.return_value = [ + MagicMock(name="summary/summary_2024-08-22.json"), + MagicMock(name="feed_metrics/feed_metrics_2024-08-22.json"), + MagicMock(name="features_metrics/features_metrics_2024-08-22.json"), + ] + + # Call save + self.processor.save() + + # Assert that _load_json was called for each metrics file + self.assertEqual(mock_load_json.call_count, 3) + + # Assert that _save_blob was called three times + self.assertEqual(mock_save_blob.call_count, 3) + + def test_process_features(self): + # Create mock features + mock_features = [MagicMock(name="feature1"), MagicMock(name="feature2")] + + # Process features + self.processor._process_features(mock_features) + + # Assert features_metrics_data was updated correctly + self.assertEqual(len(self.processor.features_metrics_data), 2) + + def test_process_notices(self): + # Create mock notices + mock_notices = [ + MagicMock(notice_code="error1", severity="ERROR"), + MagicMock(notice_code="warning1", severity="WARNING"), + MagicMock(notice_code="info1", severity="INFO"), + ] + + # Process notices + self.processor._process_notices(mock_notices) + + # Assert notices_metrics_data was updated correctly + self.assertEqual(len(self.processor.notices_metrics_data), 3) + self.assertEqual(self.processor.notices_metrics_data[0]["notice"], "error1") + self.assertEqual(self.processor.notices_metrics_data[1]["notice"], "warning1") + self.assertEqual(self.processor.notices_metrics_data[2]["notice"], "info1") diff --git 
a/functions-python/preprocessed_analytics/tests/test_main.py b/functions-python/preprocessed_analytics/tests/test_main.py new file mode 100644 index 000000000..42142c14a --- /dev/null +++ b/functions-python/preprocessed_analytics/tests/test_main.py @@ -0,0 +1,93 @@ +import unittest +from unittest.mock import patch, MagicMock +from datetime import datetime +from flask import Request +from preprocessed_analytics.src.main import ( + preprocess_analytics_gtfs, + preprocess_analytics_gbfs, + get_compute_date, + preprocess_analytics, +) +from preprocessed_analytics.src.processors.gtfs_analytics_processor import ( + GTFSAnalyticsProcessor, +) +from preprocessed_analytics.src.processors.gbfs_analytics_processor import ( + GBFSAnalyticsProcessor, +) +from flask import Response + + +class TestAnalyticsFunctions(unittest.TestCase): + @patch("preprocessed_analytics.src.main.preprocess_analytics_gtfs") + @patch("preprocessed_analytics.src.main.preprocess_analytics_gbfs") + def setUp(self, mock_process_analytics_gtfs, mock_process_analytics_gbfs): + self.mock_request = MagicMock(spec=Request) + self.mock_request.get_json.return_value = {"compute_date": "20240822"} + + @patch("preprocessed_analytics.src.main.get_compute_date") + def test_get_compute_date_valid(self, mock_get_compute_date): + compute_date = get_compute_date(self.mock_request) + self.assertEqual(compute_date, datetime(2024, 8, 22)) + + def test_get_compute_date_invalid(self): + # Mock request with invalid compute_date format + self.mock_request.get_json.return_value = {"compute_date": "invalid_date"} + compute_date = get_compute_date(self.mock_request) + self.assertIsInstance(compute_date, datetime) + self.assertLessEqual(compute_date, datetime.now()) + + @patch("preprocessed_analytics.src.main.GTFSAnalyticsProcessor.run") + @patch("preprocessed_analytics.src.main.preprocess_analytics") + def test_process_analytics_gtfs_success(self, mock_process_analytics, mock_run): + mock_run.return_value = None + mock_process_analytics.return_value = Response(None, status=200) + response = preprocess_analytics_gtfs(self.mock_request) + mock_process_analytics.assert_called_once_with( + self.mock_request, GTFSAnalyticsProcessor + ) + self.assertEqual(response.status_code, 200) + + @patch("preprocessed_analytics.src.main.GBFSAnalyticsProcessor.run") + @patch("preprocessed_analytics.src.main.preprocess_analytics") + def test_process_analytics_gbfs_success(self, mock_process_analytics, mock_run): + mock_run.return_value = None + mock_process_analytics.return_value = Response(None, status=200) + response = preprocess_analytics_gbfs(self.mock_request) + mock_process_analytics.assert_called_once_with( + self.mock_request, GBFSAnalyticsProcessor + ) + self.assertEqual(response.status_code, 200) + + @patch("preprocessed_analytics.src.main.Logger.init_logger") + @patch("preprocessed_analytics.src.main.GTFSAnalyticsProcessor.run") + def test_process_analytics_gtfs_error(self, mock_run, _): + mock_run.side_effect = Exception("Test error") + response = preprocess_analytics_gtfs(self.mock_request) + self.assertEqual(response.status_code, 500) + + @patch("preprocessed_analytics.src.main.Logger.init_logger") + @patch("preprocessed_analytics.src.main.GBFSAnalyticsProcessor.run") + def test_process_analytics_gbfs_error(self, mock_run, _): + mock_run.side_effect = Exception("Test error") + response = preprocess_analytics_gbfs(self.mock_request) + self.assertEqual(response.status_code, 500) + + @patch("preprocessed_analytics.src.main.Logger.init_logger") + 
@patch("preprocessed_analytics.src.main.GTFSAnalyticsProcessor.run") + @patch("preprocessed_analytics.src.main.GTFSAnalyticsProcessor.__init__") + def test_process_analytics_success(self, mock_init, mock_run, _): + mock_run.return_value = None + mock_init.return_value = None + response = preprocess_analytics(self.mock_request, GTFSAnalyticsProcessor) + self.assertEqual(response.status_code, 200) + self.assertIn( + "Successfully processed analytics for date:", response.data.decode() + ) + + @patch("preprocessed_analytics.src.main.Logger.init_logger") + @patch("preprocessed_analytics.src.main.GTFSAnalyticsProcessor.run") + def test_process_analytics_failure(self, mock_run, _): + mock_run.side_effect = Exception("Processing error") + response = preprocess_analytics(self.mock_request, GTFSAnalyticsProcessor) + self.assertEqual(response.status_code, 500) + self.assertIn("Error processing analytics for date", response.data.decode()) diff --git a/infra/metrics/main.tf b/infra/metrics/main.tf index c3e2fc73c..ade1658c6 100644 --- a/infra/metrics/main.tf +++ b/infra/metrics/main.tf @@ -26,6 +26,10 @@ locals { # Validation report conversion to ndjson function config function_validation_report_conversion_config = jsondecode(file("${path.module}/../../functions-python/validation_to_ndjson/function_config.json")) function_validation_report_conversion_zip = "${path.module}/../../functions-python/validation_to_ndjson/.dist/validation_to_ndjson.zip" + + # Preprocessed analytics function config + function_preprocessed_analytics_config = jsondecode(file("${path.module}/../../functions-python/preprocessed_analytics/function_config.json")) + function_preprocessed_analytics_zip = "${path.module}/../../functions-python/preprocessed_analytics/.dist/preprocessed_analytics.zip" } locals { @@ -33,7 +37,8 @@ locals { # Combine all keys into a list all_secret_keys_list = concat( [for x in local.function_big_query_ingest_config.secret_environment_variables : x.key], - [for x in local.function_validation_report_conversion_config.secret_environment_variables : x.key] + [for x in local.function_validation_report_conversion_config.secret_environment_variables : x.key], + [for x in local.function_preprocessed_analytics_config.secret_environment_variables : x.key] ) # Convert the list to a set to ensure uniqueness @@ -85,6 +90,13 @@ resource "google_storage_bucket_object" "function_validation_report_conversion" source = local.function_validation_report_conversion_zip } +# 3. Preprocessed analytics function +resource "google_storage_bucket_object" "function_preprocessed_analytics" { + name = "preprocessed-analytics-${substr(filebase64sha256(local.function_preprocessed_analytics_zip),0,10)}.zip" + bucket = google_storage_bucket.functions_bucket.name + source = local.function_preprocessed_analytics_zip +} + # 2. Cloud Function # 2.1. 
GTFS - Big Query data ingestion function resource "google_cloudfunctions2_function" "gtfs_big_query_ingest" { @@ -402,6 +414,118 @@ resource "google_cloudfunctions2_function" "gbfs_validation_report_conversion_ba } } +# 2.7 GTFS - Preprocessed analytics function +resource "google_storage_bucket" "gtfs_analytics_bucket" { + location = var.gcp_region + name = "mobilitydata-gtfs-analytics-${var.environment}" + cors { + origin = ["*"] + method = ["GET"] + response_header = ["*"] + } +} + +resource "google_cloudfunctions2_function" "gtfs_preprocessed_analytics" { + name = "${local.function_preprocessed_analytics_config.name}-gtfs" + project = var.project_id + description = local.function_preprocessed_analytics_config.description + location = var.gcp_region + depends_on = [google_secret_manager_secret_iam_member.secret_iam_member] + + build_config { + runtime = var.python_runtime + entry_point = "${local.function_preprocessed_analytics_config.entry_point}_gtfs" + source { + storage_source { + bucket = google_storage_bucket.functions_bucket.name + object = google_storage_bucket_object.function_preprocessed_analytics.name + } + } + } + service_config { + environment_variables = { + PYTHONNODEBUGRANGES = 0 + ANALYTICS_BUCKET = google_storage_bucket.gtfs_analytics_bucket.name + } + available_memory = local.function_preprocessed_analytics_config.memory + timeout_seconds = local.function_preprocessed_analytics_config.timeout + available_cpu = local.function_preprocessed_analytics_config.available_cpu + max_instance_request_concurrency = local.function_preprocessed_analytics_config.max_instance_request_concurrency + max_instance_count = local.function_preprocessed_analytics_config.max_instance_count + min_instance_count = local.function_preprocessed_analytics_config.min_instance_count + service_account_email = google_service_account.metrics_service_account.email + ingress_settings = "ALLOW_ALL" + vpc_connector = data.google_vpc_access_connector.vpc_connector.id + vpc_connector_egress_settings = "PRIVATE_RANGES_ONLY" + dynamic "secret_environment_variables" { + for_each = local.function_preprocessed_analytics_config.secret_environment_variables + content { + key = secret_environment_variables.value["key"] + project_id = var.project_id + secret = "${upper(var.environment)}_${secret_environment_variables.value["key"]}" + version = "latest" + } + } + } + +} + +# 2.7 GBFS - Preprocessed analytics function +resource "google_storage_bucket" "gbfs_analytics_bucket" { + location = var.gcp_region + name = "mobilitydata-gbfs-analytics-${var.environment}" + cors { + origin = ["*"] + method = ["GET"] + response_header = ["*"] + } +} +resource "google_cloudfunctions2_function" "gbfs_preprocessed_analytics" { + name = "${local.function_preprocessed_analytics_config.name}-gbfs" + project = var.project_id + description = local.function_preprocessed_analytics_config.description + location = var.gcp_region + depends_on = [google_secret_manager_secret_iam_member.secret_iam_member] + + build_config { + runtime = var.python_runtime + entry_point = "${local.function_preprocessed_analytics_config.entry_point}_gbfs" + source { + storage_source { + bucket = google_storage_bucket.functions_bucket.name + object = google_storage_bucket_object.function_preprocessed_analytics.name + } + } + } + service_config { + environment_variables = { + PYTHONNODEBUGRANGES = 0 + ANALYTICS_BUCKET = google_storage_bucket.gbfs_analytics_bucket.name + } + available_memory = local.function_preprocessed_analytics_config.memory + timeout_seconds = 
local.function_preprocessed_analytics_config.timeout + available_cpu = local.function_preprocessed_analytics_config.available_cpu + max_instance_request_concurrency = local.function_preprocessed_analytics_config.max_instance_request_concurrency + max_instance_count = local.function_preprocessed_analytics_config.max_instance_count + min_instance_count = local.function_preprocessed_analytics_config.min_instance_count + service_account_email = google_service_account.metrics_service_account.email + ingress_settings = "ALLOW_ALL" + vpc_connector = data.google_vpc_access_connector.vpc_connector.id + vpc_connector_egress_settings = "PRIVATE_RANGES_ONLY" + dynamic "secret_environment_variables" { + for_each = local.function_preprocessed_analytics_config.secret_environment_variables + content { + key = secret_environment_variables.value["key"] + project_id = var.project_id + secret = "${upper(var.environment)}_${secret_environment_variables.value["key"]}" + version = "latest" + } + } + } + +} + + # Grant permissions to the service account # 1. BigQuery roles resource "google_project_iam_member" "big_query_data_editor_permissions" { @@ -490,4 +614,39 @@ resource "google_cloud_scheduler_job" "gbfs_ingestion_scheduler" { } } } +# 3. GTFS - data preprocessed analytics scheduler +resource "google_cloud_scheduler_job" "gtfs_preprocessed_analytics_scheduler" { + name = "gtfs-preprocessed-analytics-scheduler" + project = var.project_id + description = "GTFS preprocessed analytics scheduler" + region = var.gcp_region + paused = var.environment == "prod" ? false : true + schedule = var.gtfs_data_preprocessor_schedule + time_zone = "UTC" + + http_target { + uri = google_cloudfunctions2_function.gtfs_preprocessed_analytics.url + http_method = "POST" + oidc_token { + service_account_email = google_service_account.metrics_service_account.email + } + } +} +# 4. GBFS - data preprocessed analytics scheduler +resource "google_cloud_scheduler_job" "gbfs_preprocessed_analytics_scheduler" { + name = "gbfs-preprocessed-analytics-scheduler" + project = var.project_id + description = "GBFS preprocessed analytics scheduler" + region = var.gcp_region + paused = var.environment == "prod" ? false : true + schedule = var.gbfs_data_preprocessor_schedule + time_zone = "UTC" + http_target { + uri = google_cloudfunctions2_function.gbfs_preprocessed_analytics.url + http_method = "POST" + oidc_token { + service_account_email = google_service_account.metrics_service_account.email + } + } +} diff --git a/infra/metrics/vars.tf b/infra/metrics/vars.tf index 0e015a407..e21a5ae91 100644 --- a/infra/metrics/vars.tf +++ b/infra/metrics/vars.tf @@ -75,4 +75,16 @@ variable gbfs_data_schedule { type = string description = "Schedule for GBFS data ingestion" default = "0 0 2 * *" # Midnight on the 2nd of every month +} + +variable "gtfs_data_preprocessor_schedule" { + type = string + description = "Schedule for GTFS data preprocessor" + default = "0 0 2 * *" # Midnight on the 2nd of every month +} + +variable "gbfs_data_preprocessor_schedule" { + type = string + description = "Schedule for GBFS data preprocessor" + default = "0 0 2 * *" # Midnight on the 2nd of every month } \ No newline at end of file