Move to cc2, add sensors to long-running tasks #305

Merged 2 commits on Mar 18, 2024
106 changes: 67 additions & 39 deletions orca_data_pipeline.py
@@ -4,7 +4,7 @@
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCheckOperator,
BigQueryInsertJobOperator,
@@ -14,6 +14,7 @@
ComputeEngineStopInstanceOperator,
)
from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
from airflow.providers.google.cloud.transfers.bigquery_to_bigquery import (
BigQueryToBigQueryOperator,
)
@@ -37,30 +38,32 @@
This DAG retrieves data from GitHub and updates the tables in the `orca` BigQuery dataset
"""

args = get_default_args(pocs=["Jennifer"])
args["retries"] = 1

production_dataset = "orca"
staging_dataset = f"staging_{production_dataset}"
backup_dataset = f"{production_dataset}_backups"
tmp_dir = f"{production_dataset}/tmp"
sql_dir = f"sql/{production_dataset}"
gce_resource_id = "orca-etl"
vm_working_dir = "current_run"

default_args = get_default_args()

dag = DAG(
with DAG(
"orca_updater",
default_args=default_args,
default_args=args,
description="Updates ORCA data",
user_defined_macros={
"staging_dataset": staging_dataset,
"production_dataset": production_dataset,
},
schedule_interval="0 0 1 * *",
catchup=False,
)
) as dag:
tmp_dir = f"{production_dataset}/tmp"
sql_dir = f"sql/{production_dataset}"
gce_resource_id = "orca-etl"
ssh_command = (
f"gcloud compute ssh jm3312@{gce_resource_id} --zone {GCP_ZONE} --command "
+ '"{}"'
)

with dag:
clear_dl_dir = GCSDeleteObjectsOperator(
task_id="clear_dl_dir", bucket_name=DATA_BUCKET, prefix=tmp_dir
)
@@ -90,46 +93,67 @@
task_id="start-" + gce_resource_id,
)

# Pull the repos from BQ, along with repos specified by custom lists
retrieve_repos_sequence = [
f"mkdir {vm_working_dir}",
f"cd {vm_working_dir}",
working_dir = "current_run"
prep_environment_sequence = [
f"gsutil cp -r gs://{DATA_BUCKET}/{production_dataset}/code/scripts/*.sh .",
f"mkdir {working_dir}",
f"cd {working_dir}",
f"gsutil cp -r gs://{DATA_BUCKET}/{production_dataset}/code/scripts .",
f"gsutil cp -r gs://{DATA_BUCKET}/{production_dataset}/code/input_data .",
f"gsutil cp -r gs://{DATA_BUCKET}/{production_dataset}/code/requirements.txt .",
"python3 -m pip install -r requirements.txt",
"PYTHONPATH='.' python3 scripts/retrieve_repos.py --query_bq",
]
vm_script = f"rm -r {vm_working_dir};" + " && ".join(retrieve_repos_sequence)
prep_environment_script = f"rm -r {working_dir};" + " && ".join(
prep_environment_sequence
)
prep_environment = BashOperator(
task_id="prep_environment",
bash_command=ssh_command.format(prep_environment_script),
)

# Pull the repos from BQ, along with repos specified by custom lists
retrieve_repos = BashOperator(
task_id="retrieve_repos",
bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {GCP_ZONE} --command "{vm_script}"',
bash_command=ssh_command.format(
f"bash retrieve_repos.sh {working_dir} {DATA_BUCKET} "
f"{production_dataset} &> retrieve_repos_log &"
),
)
wait_for_retrieve_repos = GCSObjectExistenceSensor(
task_id="wait_for_retrieve_repos",
bucket=DATA_BUCKET,
object=f"{production_dataset}/done_files/retrieve_repos",
deferrable=True,
)

# Retrieve full metadata for each repo from the GitHub API
get_full_repo_metadata_sequence = [
f"cd {vm_working_dir}",
"PYTHONPATH='.' python3 scripts/backfill_top_level_repo_data.py",
]
vm_script = " && ".join(get_full_repo_metadata_sequence)

get_full_repo_metadata = BashOperator(
task_id="get_full_repo_metadata",
bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {GCP_ZONE} --command "{vm_script}"',
get_full_metadata = BashOperator(
task_id="get_full_metadata",
bash_command=ssh_command.format(
f"bash get_full_metadata.sh {working_dir} {DATA_BUCKET} "
f"{production_dataset} &> get_full_metadata_log &"
),
)
wait_for_get_full_metadata = GCSObjectExistenceSensor(
task_id="wait_for_get_full_metadata",
bucket=DATA_BUCKET,
object=f"{production_dataset}/done_files/get_full_metadata",
deferrable=True,
)

# Scrape GitHub for READMEs and additional metadata we aren't otherwise able to collect
scrape_gh_sequence = [
f"cd {vm_working_dir}",
"PYTHONPATH='.' python3 scripts/retrieve_repo_metadata.py curr_repos_filled.jsonl curr_repos_final.jsonl",
f"gsutil cp curr_repos_final.jsonl gs://{DATA_BUCKET}/{tmp_dir}/",
]
vm_script = " && ".join(scrape_gh_sequence)

scrape_gh = BashOperator(
task_id="scrape_gh",
bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {GCP_ZONE} --command "{vm_script}"',
scrape_additional_metadata = BashOperator(
task_id="scrape_additional_metadata",
bash_command=ssh_command.format(
f"bash scrape_additional_metadata.sh {working_dir} {DATA_BUCKET} "
f"{production_dataset} {tmp_dir} &> scrape_additional_metadata_log &"
),
)
wait_for_scrape_additional_metadata = GCSObjectExistenceSensor(
task_id="wait_for_scrape_additional_metadata",
bucket=DATA_BUCKET,
object=f"{production_dataset}/done_files/scrape_additional_metadata",
deferrable=True,
)

gce_instance_stop = ComputeEngineStopInstanceOperator(
@@ -154,9 +178,13 @@
clear_dl_dir
>> extract_repo_mentions
>> gce_instance_start
>> prep_environment
>> retrieve_repos
>> get_full_repo_metadata
>> scrape_gh
>> wait_for_retrieve_repos
>> get_full_metadata
>> wait_for_get_full_metadata
>> scrape_additional_metadata
>> wait_for_scrape_additional_metadata
>> gce_instance_stop
>> load_data_to_bq
)
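
Every long-running VM step in the updated DAG follows the same pairing: a BashOperator backgrounds a script on the instance over SSH so the task returns immediately, and a deferrable GCSObjectExistenceSensor waits for the done file the script uploads when it finishes. Below is a minimal sketch of that pairing; make_vm_step is a hypothetical helper (not part of this PR), and the SSH template, working directory, and bucket values are passed in explicitly so the sketch stands on its own.

# Hypothetical helper sketching the run-then-wait pattern used for each
# long-running step above; the name and signature are illustrative only.
from airflow.operators.bash_operator import BashOperator
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor


def make_vm_step(name, ssh_command, working_dir, data_bucket, production_dataset):
    # Background the script on the VM so the SSH call (and the Airflow task)
    # exits right away instead of blocking a worker for hours.
    run = BashOperator(
        task_id=name,
        bash_command=ssh_command.format(
            f"bash {name}.sh {working_dir} {data_bucket} "
            f"{production_dataset} &> {name}_log &"
        ),
    )
    # Defer until the script uploads its done file to GCS.
    wait = GCSObjectExistenceSensor(
        task_id=f"wait_for_{name}",
        bucket=data_bucket,
        object=f"{production_dataset}/done_files/{name}",
        deferrable=True,
    )
    run >> wait
    return run, wait

Backgrounding the script with "&> ... &" keeps the SSH command short-lived, and deferrable=True lets the sensor release its worker slot while it waits, which is what makes multi-hour steps cheap to monitor.
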
12 changes: 6 additions & 6 deletions push_to_airflow.sh
@@ -1,12 +1,12 @@
gsutil cp orca_data_pipeline.py gs://us-east1-production2023-cc1-01d75926-bucket/dags/
gsutil cp orca_data_pipeline.py gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/
gsutil -m rm gs://airflow-data-exchange/orca/schemas/*
gsutil -m cp schemas/* gs://airflow-data-exchange/orca/schemas/
gsutil -m rm gs://us-east1-production2023-cc1-01d75926-bucket/dags/schemas/orca/*
gsutil -m cp schemas/* gs://us-east1-production2023-cc1-01d75926-bucket/dags/schemas/orca/
gsutil -m rm gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/schemas/orca/*
gsutil -m cp schemas/* gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/schemas/orca/
gsutil rm -r gs://airflow-data-exchange/orca/code
gsutil -m cp -r scripts gs://airflow-data-exchange/orca/code/
gsutil -m cp -r input_data gs://airflow-data-exchange/orca/code/
gsutil cp requirements.txt gs://airflow-data-exchange/orca/code/
gsutil cp sequences/* gs://us-east1-production2023-cc1-01d75926-bucket/dags/sequences/orca/
gsutil -m rm gs://us-east1-production2023-cc1-01d75926-bucket/dags/sql/orca/*
gsutil -m cp sql/* gs://us-east1-production2023-cc1-01d75926-bucket/dags/sql/orca/
gsutil cp sequences/* gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/sequences/orca/
gsutil -m rm gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/sql/orca/*
gsutil -m cp sql/* gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/sql/orca/
11 changes: 11 additions & 0 deletions scripts/get_full_metadata.sh
@@ -0,0 +1,11 @@
#!/bin/bash

working_dir=$1
data_bucket=$2
production_dataset=$3
done_file="get_full_metadata"

cd $working_dir
gsutil rm "gs://${data_bucket}/${production_dataset}/done_files/${done_file}"
PYTHONPATH='.' python3 scripts/backfill_top_level_repo_data.py && touch $done_file
gsutil cp $done_file "gs://${data_bucket}/${production_dataset}/done_files/"
11 changes: 11 additions & 0 deletions scripts/retrieve_repos.sh
@@ -0,0 +1,11 @@
#!/bin/bash

working_dir=$1
data_bucket=$2
production_dataset=$3
done_file="retrieve_repos"

cd $working_dir
gsutil rm "gs://${data_bucket}/${production_dataset}/done_files/${done_file}"
PYTHONPATH='.' python3 scripts/retrieve_repos.py --query_bq && touch $done_file
gsutil cp $done_file "gs://${data_bucket}/${production_dataset}/done_files/"
13 changes: 13 additions & 0 deletions scripts/scrape_additional_metadata.sh
@@ -0,0 +1,13 @@
#!/bin/bash

working_dir=$1
data_bucket=$2
production_dataset=$3
tmp_dir=$4
done_file="scrape_additional_metadata"

cd $working_dir
gsutil rm "gs://${data_bucket}/${production_dataset}/done_files/${done_file}"
PYTHONPATH='.' python3 scripts/retrieve_repo_metadata.py curr_repos_filled.jsonl curr_repos_final.jsonl
gsutil cp curr_repos_final.jsonl "gs://${data_bucket}/${tmp_dir}/" && touch $done_file
gsutil cp $done_file "gs://${data_bucket}/${production_dataset}/done_files/"
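
All three scripts implement the same done-file handshake the sensors rely on: remove any stale marker, run the step, and upload the marker once the work finishes successfully. A quick way to check that handshake by hand, outside Airflow, is to look for the marker object directly; this sketch assumes the google-cloud-storage client library, and the bucket, dataset, and step names in the example are placeholders.

# Standalone check for a step's done file; the values passed in must match
# whatever bucket and dataset the scripts above actually write to.
from google.cloud import storage


def step_is_done(bucket_name, production_dataset, step):
    # Same object path the GCSObjectExistenceSensor tasks watch.
    blob = storage.Client().bucket(bucket_name).blob(
        f"{production_dataset}/done_files/{step}"
    )
    return blob.exists()


# e.g. step_is_done("airflow-data-exchange", "orca", "retrieve_repos")
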