diff --git a/.github/workflows/db-update.yml b/.github/workflows/db-update.yml
index 28dff01c4..2765a726c 100644
--- a/.github/workflows/db-update.yml
+++ b/.github/workflows/db-update.yml
@@ -87,7 +87,7 @@ jobs:
       - name: Google Cloud Setup
         uses: google-github-actions/setup-gcloud@v2
-
+
       - name: Load secrets from 1Password
         uses: 1password/load-secrets-action@v2.0.0
         with:
@@ -97,12 +97,12 @@ jobs:
           GCP_FEED_SSH_USER: "op://rbiv7rvkkrsdlpcrz3bmv7nmcu/GCP_FEED_SSH_USER/username"
           GCP_FEED_BASTION_NAME: "op://rbiv7rvkkrsdlpcrz3bmv7nmcu/GCP_FEED_BASTION_NAME/username"
           GCP_FEED_BASTION_SSH_KEY: "op://rbiv7rvkkrsdlpcrz3bmv7nmcu/GCP_FEED_BASTION_SSH_KEY/private key"
-
+
       - name: Tunnel
         run: |
           mkdir -p ~/.ssh
           echo "${{ env.GCP_FEED_BASTION_SSH_KEY }}" > ~/.ssh/id_rsa
-          chmod 600 ~/.ssh/id_rsa
+          chmod 600 ~/.ssh/id_rsa
           ./scripts/tunnel-create.sh -project_id ${{ inputs.PROJECT_ID }} -zone ${{ inputs.REGION }}-a -instance ${{ env.GCP_FEED_BASTION_NAME }}-${{ inputs.DB_ENVIRONMENT}} -target_account ${{ env.GCP_FEED_SSH_USER }} -db_instance ${{ secrets.POSTGRE_SQL_INSTANCE_NAME }}
           sleep 10 # Wait for the tunnel to establish
@@ -112,21 +112,21 @@ jobs:
           PGPASSWORD=${{ secrets.DB_USER_PASSWORD }} psql -h localhost -p 5432 -U ${{ secrets.DB_USER_NAME }} -d ${{ inputs.DB_NAME }} -c "SELECT version();"
 
       - name: Run Liquibase
-        run: |
+        run: |
           wget -O- https://repo.liquibase.com/liquibase.asc | gpg --dearmor > liquibase-keyring.gpg && \
           cat liquibase-keyring.gpg | sudo tee /usr/share/keyrings/liquibase-keyring.gpg > /dev/null && \
           echo 'deb [trusted=yes arch=amd64 signed-by=/usr/share/keyrings/liquibase-keyring.gpg] https://repo.liquibase.com stable main' | sudo tee /etc/apt/sources.list.d/liquibase.list
-
+
           sudo apt-get update
           sudo apt-get install liquibase=4.25.1
-
+
           export LIQUIBASE_CLASSPATH="liquibase"
           export LIQUIBASE_COMMAND_CHANGELOG_FILE="changelog.xml"
           export LIQUIBASE_COMMAND_URL=jdbc:postgresql://localhost:5432/${{ inputs.DB_NAME }}
           export LIQUIBASE_COMMAND_USERNAME=${{ secrets.DB_USER_NAME }}
           export LIQUIBASE_COMMAND_PASSWORD=${{ secrets.DB_USER_PASSWORD }}
           export LIQUIBASE_LOG_LEVEL=FINE
-
+
           liquibase update
 
   db-content-update:
@@ -224,7 +224,7 @@ jobs:
     if: ${{ github.event_name == 'repository_dispatch' || github.event_name == 'workflow_dispatch' }}
     runs-on: ubuntu-latest
     steps:
-      - name: Authenticate to Google Cloud QA/PROD
+      - name: Authenticate to Google Cloud
        uses: google-github-actions/auth@v2
        with:
          credentials_json: ${{ secrets.GCP_MOBILITY_FEEDS_SA_KEY }}
diff --git a/api/src/scripts/load_dataset_on_create.py b/api/src/scripts/load_dataset_on_create.py
index c211bfe13..9bd78cc41 100644
--- a/api/src/scripts/load_dataset_on_create.py
+++ b/api/src/scripts/load_dataset_on_create.py
@@ -1,21 +1,22 @@
 import json
-import logging
 import os
 import threading
 import uuid
 from typing import List
+from concurrent import futures
 
-from database_gen.sqlacodegen_models import Feed
+from google.auth import default
 from google.cloud import pubsub_v1
 from google.cloud.pubsub_v1.futures import Future
 
-env = os.getenv("ENV", "dev")
-pubsub_topic_name = f"datasets-batch-topic-{env}"
-project_id = f"mobility-feeds-{env}"
+from database_gen.sqlacodegen_models import Feed
+from utils.logger import Logger
+
 # Lazy create so we won't try to connect to google cloud when the file is imported.
 pubsub_client = None
 lock = threading.Lock()
+logger = Logger("load_dataset_on_create").get_logger()
 
 
 def get_pubsub_client():
@@ -23,13 +24,14 @@ def get_pubsub_client():
     global pubsub_client
     if pubsub_client is None:
         pubsub_client = pubsub_v1.PublisherClient()
+    return pubsub_client
 
 
 def get_topic_path():
-    if pubsub_topic_name is None or project_id is None:
-        raise ValueError("PUBSUB_TOPIC_NAME and PROJECT_ID must be set in the environment")
-
+    env = os.getenv("ENV", "dev")
+    pubsub_topic_name = f"datasets-batch-topic-{env}"
+    project_id = f"mobility-feeds-{env}"  # Cannot use GOOGLE_CLOUD_PROJECT because it points to QA for DEV
     return get_pubsub_client().topic_path(project_id, pubsub_topic_name)
 
@@ -42,16 +44,17 @@ def publish_callback(future: Future, stable_id: str, topic_path: str):
     @param topic_path: The path to the Pub/Sub topic
     """
     if future.exception():
-        logging.info(f"Error publishing feed {stable_id} to Pub/Sub topic {topic_path}: {future.exception()}")
+        logger.info(f"Error publishing feed {stable_id} to Pub/Sub topic {topic_path}: {future.exception()}")
     else:
-        logging.info(f"Published stable_id = {stable_id}.")
+        logger.info(f"Published stable_id = {stable_id}.")
 
 
-def publish(feed: Feed, topic_path: str):
+def publish(feed: Feed, topic_path: str) -> Future:
     """
     Publishes a feed to the Pub/Sub topic.
     :param feed: The feed to publish
     :param topic_path: The path to the Pub/Sub topic
+    :return: The Future object representing the result of the publishing operation
     """
     payload = {
         "execution_id": f"batch-uuid-{uuid.uuid4()}",
@@ -67,6 +70,7 @@ def publish(feed: Feed, topic_path: str):
     data_bytes = json.dumps(payload).encode("utf-8")
     future = get_pubsub_client().publish(topic_path, data=data_bytes)
     future.add_done_callback(lambda _: publish_callback(future, feed.stable_id, topic_path))
+    return future
 
 
 def publish_all(feeds: List[Feed]):
@@ -75,6 +79,14 @@
     :param feeds: The list of feeds to publish
     """
     topic_path = get_topic_path()
+    logger.info(f"Publishing {len(feeds)} feeds to Pub/Sub topic {topic_path}...")
+    credentials, project = default()
+    logger.info(f"Authenticated project: {project}")
+    logger.info(f"Service Account Email: {credentials.service_account_email}")
+    publish_futures = []
     for feed in feeds:
-        publish(feed, topic_path)
-    logging.info(f"Published {len(feeds)} feeds to Pub/Sub topic {topic_path}.")
+        logger.info(f"Publishing feed {feed.stable_id} to Pub/Sub topic {topic_path}...")
+        future = publish(feed, topic_path)
+        publish_futures.append(future)
+    futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
+    logger.info(f"Published {len(feeds)} feeds to Pub/Sub topic {topic_path}.")
diff --git a/api/src/scripts/populate_db_gtfs.py b/api/src/scripts/populate_db_gtfs.py
index 0a749f429..e391363f5 100644
--- a/api/src/scripts/populate_db_gtfs.py
+++ b/api/src/scripts/populate_db_gtfs.py
@@ -228,7 +228,7 @@ def trigger_downstream_tasks(self):
         Trigger downstream tasks after populating the database
         """
         self.logger.info("Triggering downstream tasks")
-        self.logger.debug(
+        self.logger.info(
             f"New feeds added to the database: "
             f"{','.join([feed.stable_id for feed in self.added_gtfs_feeds] if self.added_gtfs_feeds else [])}"
         )