
Commit d5c46c2
fix: trigger downstream tasks when a new feed is added (#815)
cka-y authored Nov 13, 2024
1 parent ca3e5cb commit d5c46c2
Showing 3 changed files with 34 additions and 22 deletions.
.github/workflows/db-update.yml (16 changes: 8 additions & 8 deletions)
@@ -87,7 +87,7 @@ jobs:

      - name: Google Cloud Setup
        uses: google-github-actions/setup-gcloud@v2

      - name: Load secrets from 1Password
        uses: 1password/load-secrets-action@v2
        with:
@@ -97,12 +97,12 @@ jobs:
          GCP_FEED_SSH_USER: "op://rbiv7rvkkrsdlpcrz3bmv7nmcu/GCP_FEED_SSH_USER/username"
          GCP_FEED_BASTION_NAME: "op://rbiv7rvkkrsdlpcrz3bmv7nmcu/GCP_FEED_BASTION_NAME/username"
          GCP_FEED_BASTION_SSH_KEY: "op://rbiv7rvkkrsdlpcrz3bmv7nmcu/GCP_FEED_BASTION_SSH_KEY/private key"

      - name: Tunnel
        run: |
          mkdir -p ~/.ssh
          echo "${{ env.GCP_FEED_BASTION_SSH_KEY }}" > ~/.ssh/id_rsa
-          chmod 600 ~/.ssh/id_rsa
+          chmod 600 ~/.ssh/id_rsa
          ./scripts/tunnel-create.sh -project_id ${{ inputs.PROJECT_ID }} -zone ${{ inputs.REGION }}-a -instance ${{ env.GCP_FEED_BASTION_NAME }}-${{ inputs.DB_ENVIRONMENT}} -target_account ${{ env.GCP_FEED_SSH_USER }} -db_instance ${{ secrets.POSTGRE_SQL_INSTANCE_NAME }}
          sleep 10 # Wait for the tunnel to establish
@@ -112,21 +112,21 @@ jobs:
          PGPASSWORD=${{ secrets.DB_USER_PASSWORD }} psql -h localhost -p 5432 -U ${{ secrets.DB_USER_NAME }} -d ${{ inputs.DB_NAME }} -c "SELECT version();"
      - name: Run Liquibase
-        run: |
+        run: |
          wget -O- https://repo.liquibase.com/liquibase.asc | gpg --dearmor > liquibase-keyring.gpg && \
          cat liquibase-keyring.gpg | sudo tee /usr/share/keyrings/liquibase-keyring.gpg > /dev/null && \
          echo 'deb [trusted=yes arch=amd64 signed-by=/usr/share/keyrings/liquibase-keyring.gpg] https://repo.liquibase.com stable main' | sudo tee /etc/apt/sources.list.d/liquibase.list
          sudo apt-get update
          sudo apt-get install liquibase=4.25.1
          export LIQUIBASE_CLASSPATH="liquibase"
          export LIQUIBASE_COMMAND_CHANGELOG_FILE="changelog.xml"
          export LIQUIBASE_COMMAND_URL=jdbc:postgresql://localhost:5432/${{ inputs.DB_NAME }}
          export LIQUIBASE_COMMAND_USERNAME=${{ secrets.DB_USER_NAME }}
          export LIQUIBASE_COMMAND_PASSWORD=${{ secrets.DB_USER_PASSWORD }}
          export LIQUIBASE_LOG_LEVEL=FINE
          liquibase update
  db-content-update:
@@ -224,7 +224,7 @@ jobs:
    if: ${{ github.event_name == 'repository_dispatch' || github.event_name == 'workflow_dispatch' }}
    runs-on: ubuntu-latest
    steps:
-      - name: Authenticate to Google Cloud QA/PROD
+      - name: Authenticate to Google Cloud
        uses: google-github-actions/auth@v2
        with:
          credentials_json: ${{ secrets.GCP_MOBILITY_FEEDS_SA_KEY }}
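The Run Liquibase step configures the CLI entirely through LIQUIBASE_COMMAND_* environment variables rather than command-line flags. A minimal Python sketch of an equivalent invocation, assuming the SSH tunnel from the Tunnel step is already listening on localhost:5432 (the helper name is hypothetical):

import os
import subprocess


def run_liquibase_update(db_name: str, user: str, password: str) -> None:
    """Hypothetical equivalent of the workflow's Run Liquibase step."""
    env = dict(os.environ)
    env.update(
        {
            "LIQUIBASE_CLASSPATH": "liquibase",
            "LIQUIBASE_COMMAND_CHANGELOG_FILE": "changelog.xml",
            "LIQUIBASE_COMMAND_URL": f"jdbc:postgresql://localhost:5432/{db_name}",
            "LIQUIBASE_COMMAND_USERNAME": user,
            "LIQUIBASE_COMMAND_PASSWORD": password,
            "LIQUIBASE_LOG_LEVEL": "FINE",
        }
    )
    # Liquibase picks the connection settings up from the environment,
    # so the command line stays a bare `liquibase update`.
    subprocess.run(["liquibase", "update"], env=env, check=True)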
api/src/scripts/load_dataset_on_create.py (38 changes: 25 additions & 13 deletions)
@@ -1,35 +1,37 @@
import json
-import logging
import os
import threading
import uuid
from typing import List
+from concurrent import futures

+from database_gen.sqlacodegen_models import Feed
+from google.auth import default
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.futures import Future

-env = os.getenv("ENV", "dev")
-pubsub_topic_name = f"datasets-batch-topic-{env}"
-project_id = f"mobility-feeds-{env}"
-from database_gen.sqlacodegen_models import Feed
+from utils.logger import Logger

# Lazy create so we won't try to connect to google cloud when the file is imported.
pubsub_client = None

lock = threading.Lock()
+logger = Logger("load_dataset_on_create").get_logger()


def get_pubsub_client():
    with lock:
        global pubsub_client
        if pubsub_client is None:
            pubsub_client = pubsub_v1.PublisherClient()

    return pubsub_client


def get_topic_path():
-    if pubsub_topic_name is None or project_id is None:
-        raise ValueError("PUBSUB_TOPIC_NAME and PROJECT_ID must be set in the environment")

+    env = os.getenv("ENV", "dev")
+    pubsub_topic_name = f"datasets-batch-topic-{env}"
+    project_id = f"mobility-feeds-{env}"  # Cannot use GOOGLE_CLOUD_PROJECT because it points to QA for DEV
    return get_pubsub_client().topic_path(project_id, pubsub_topic_name)


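The lock-guarded get_pubsub_client above keeps module import free of network side effects: the PublisherClient is only constructed on first use, and the lock prevents two threads from racing to create it. The same idea in a reusable form (a generic sketch, not code from this repository):

import threading
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class LazySingleton(Generic[T]):
    """Create an expensive object on first use, at most once, thread-safely."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._value: Optional[T] = None
        self._lock = threading.Lock()

    def get(self) -> T:
        # Take the lock only long enough to create the value once.
        with self._lock:
            if self._value is None:
                self._value = self._factory()
        return self._value

Usage would look like client = LazySingleton(pubsub_v1.PublisherClient), with client.get() returning the shared instance everywhere.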
@@ -42,16 +44,17 @@ def publish_callback(future: Future, stable_id: str, topic_path: str):
    @param topic_path: The path to the Pub/Sub topic
    """
    if future.exception():
-        logging.info(f"Error publishing feed {stable_id} to Pub/Sub topic {topic_path}: {future.exception()}")
+        logger.info(f"Error publishing feed {stable_id} to Pub/Sub topic {topic_path}: {future.exception()}")
    else:
-        logging.info(f"Published stable_id = {stable_id}.")
+        logger.info(f"Published stable_id = {stable_id}.")


-def publish(feed: Feed, topic_path: str):
+def publish(feed: Feed, topic_path: str) -> Future:
    """
    Publishes a feed to the Pub/Sub topic.
    :param feed: The feed to publish
    :param topic_path: The path to the Pub/Sub topic
+    :return: The Future object representing the result of the publishing operation
    """
    payload = {
        "execution_id": f"batch-uuid-{uuid.uuid4()}",
@@ -67,6 +70,7 @@ def publish(feed: Feed, topic_path: str):
    data_bytes = json.dumps(payload).encode("utf-8")
    future = get_pubsub_client().publish(topic_path, data=data_bytes)
    future.add_done_callback(lambda _: publish_callback(future, feed.stable_id, topic_path))
+    return future


def publish_all(feeds: List[Feed]):
@@ -75,6 +79,14 @@ def publish_all(feeds: List[Feed]):
    :param feeds: The list of feeds to publish
    """
    topic_path = get_topic_path()
+    logger.info(f"Publishing {len(feeds)} feeds to Pub/Sub topic {topic_path}...")
+    credentials, project = default()
+    logger.info(f"Authenticated project: {project}")
+    logger.info(f"Service Account Email: {credentials.service_account_email}")
+    publish_futures = []
    for feed in feeds:
-        publish(feed, topic_path)
-    logging.info(f"Published {len(feeds)} feeds to Pub/Sub topic {topic_path}.")
+        logger.info(f"Publishing feed {feed.stable_id} to Pub/Sub topic {topic_path}...")
+        future = publish(feed, topic_path)
+        publish_futures.append(future)
+    futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
+    logger.info(f"Published {len(feeds)} feeds to Pub/Sub topic {topic_path}.")
api/src/scripts/populate_db_gtfs.py (2 changes: 1 addition & 1 deletion)
@@ -228,7 +228,7 @@ def trigger_downstream_tasks(self):
        Trigger downstream tasks after populating the database
        """
        self.logger.info("Triggering downstream tasks")
-        self.logger.debug(
+        self.logger.info(
            f"New feeds added to the database: "
            f"{','.join([feed.stable_id for feed in self.added_gtfs_feeds] if self.added_gtfs_feeds else [])}"
        )
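Only the log level changes here, from debug to info, so the list of newly added stable_ids shows up under production logging. For the commit title to hold, this method presumably hands those feeds to publish_all from load_dataset_on_create.py; a hypothetical sketch of that wiring (class shape and import path are assumptions for illustration):

from load_dataset_on_create import publish_all


class DatabasePopulator:
    """Illustrative stand-in for the populate_db_gtfs helper class."""

    def __init__(self, logger, added_gtfs_feeds):
        self.logger = logger
        self.added_gtfs_feeds = added_gtfs_feeds

    def trigger_downstream_tasks(self):
        """Trigger downstream tasks after populating the database."""
        self.logger.info("Triggering downstream tasks")
        self.logger.info(
            f"New feeds added to the database: "
            f"{','.join([feed.stable_id for feed in self.added_gtfs_feeds] if self.added_gtfs_feeds else [])}"
        )
        if self.added_gtfs_feeds:
            # publish_all blocks until every feed is published (see above).
            publish_all(self.added_gtfs_feeds)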
