From d4041c2ecf5b015096f2d48f0eb89c31d7a3e331 Mon Sep 17 00:00:00 2001
From: Jennifer Melot
Date: Mon, 11 Mar 2024 10:56:03 -0400
Subject: [PATCH 1/2] Move to cc2, add sensors to long-running tasks

---
 orca_data_pipeline.py                 | 107 ++++++++++++++++----------
 push_to_airflow.sh                    |  12 +--
 scripts/get_full_metadata.sh          |  11 +++
 scripts/retrieve_repos.sh             |  11 +++
 scripts/scrape_additional_metadata.sh |  13 ++++
 5 files changed, 109 insertions(+), 45 deletions(-)
 create mode 100644 scripts/get_full_metadata.sh
 create mode 100644 scripts/retrieve_repos.sh
 create mode 100644 scripts/scrape_additional_metadata.sh

diff --git a/orca_data_pipeline.py b/orca_data_pipeline.py
index bcdfe63..b884c4c 100644
--- a/orca_data_pipeline.py
+++ b/orca_data_pipeline.py
@@ -4,7 +4,7 @@
 from airflow import DAG
 from airflow.operators.bash_operator import BashOperator
 from airflow.operators.dummy import DummyOperator
-from airflow.operators.python import BranchPythonOperator, PythonOperator
+from airflow.operators.python import PythonOperator
 from airflow.providers.google.cloud.operators.bigquery import (
     BigQueryCheckOperator,
     BigQueryInsertJobOperator,
@@ -14,6 +14,7 @@
     ComputeEngineStopInstanceOperator,
 )
 from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator
+from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
 from airflow.providers.google.cloud.transfers.bigquery_to_bigquery import (
     BigQueryToBigQueryOperator,
 )
@@ -37,20 +38,17 @@
 This DAG retrieves data from GitHub and updates the tables in the `orca` BigQuery dataset
 """
+args = get_default_args(pocs=["Jennifer"])
+args["retries"] = 1
+args["on_failure_callback"] = None
 
 production_dataset = "orca"
 staging_dataset = f"staging_{production_dataset}"
 backup_dataset = f"{production_dataset}_backups"
-tmp_dir = f"{production_dataset}/tmp"
-sql_dir = f"sql/{production_dataset}"
-gce_resource_id = "orca-etl"
-vm_working_dir = "current_run"
-
-default_args = get_default_args()
-
-dag = DAG(
+with DAG(
     "orca_updater",
-    default_args=default_args,
+    default_args=args,
     description="Updates ORCA data",
     user_defined_macros={
         "staging_dataset": staging_dataset,
@@ -58,9 +56,15 @@
     },
     schedule_interval="0 0 1 * *",
     catchup=False,
-)
+) as dag:
+    tmp_dir = f"{production_dataset}/tmp"
+    sql_dir = f"sql/{production_dataset}"
+    gce_resource_id = "orca-etl"
+    ssh_command = (
+        f"gcloud compute ssh jm3312@{gce_resource_id} --zone {GCP_ZONE} --command "
+        + '"{}"'
+    )
 
-with dag:
     clear_dl_dir = GCSDeleteObjectsOperator(
         task_id="clear_dl_dir", bucket_name=DATA_BUCKET, prefix=tmp_dir
     )
@@ -90,46 +94,67 @@
         task_id="start-" + gce_resource_id,
     )
 
-    # Pull the repos from BQ, along with repos specified by custom lists
-    retrieve_repos_sequence = [
-        f"mkdir {vm_working_dir}",
-        f"cd {vm_working_dir}",
+    working_dir = "current_run"
+    prep_environment_sequence = [
+        f"gsutil cp -r gs://{DATA_BUCKET}/{production_dataset}/code/scripts/*.sh .",
+        f"mkdir {working_dir}",
+        f"cd {working_dir}",
         f"gsutil cp -r gs://{DATA_BUCKET}/{production_dataset}/code/scripts .",
         f"gsutil cp -r gs://{DATA_BUCKET}/{production_dataset}/code/input_data .",
         f"gsutil cp -r gs://{DATA_BUCKET}/{production_dataset}/code/requirements.txt .",
         "python3 -m pip install -r requirements.txt",
-        "PYTHONPATH='.' python3 scripts/retrieve_repos.py --query_bq",
     ]
-    vm_script = f"rm -r {vm_working_dir};" + " && ".join(retrieve_repos_sequence)
+    prep_environment_script = f"rm -r {working_dir};" + " && ".join(
+        prep_environment_sequence
+    )
+    prep_environment = BashOperator(
+        task_id="prep_environment",
+        bash_command=ssh_command.format(prep_environment_script),
+    )
 
+    # Pull the repos from BQ, along with repos specified by custom lists
     retrieve_repos = BashOperator(
         task_id="retrieve_repos",
-        bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {GCP_ZONE} --command "{vm_script}"',
+        bash_command=ssh_command.format(
+            f"bash retrieve_repos.sh {working_dir} {DATA_BUCKET} "
+            f"{production_dataset} &> retrieve_repos_log &"
+        ),
+    )
+    wait_for_retrieve_repos = GCSObjectExistenceSensor(
+        task_id="wait_for_retrieve_repos",
+        bucket=DATA_BUCKET,
+        object=f"{production_dataset}/done_files/retrieve_repos",
+        deferrable=True,
     )
 
     # Retrieve full metadata for each repo from the GitHub API
-    get_full_repo_metadata_sequence = [
-        f"cd {vm_working_dir}",
-        "PYTHONPATH='.' python3 scripts/backfill_top_level_repo_data.py",
-    ]
-    vm_script = " && ".join(get_full_repo_metadata_sequence)
-
-    get_full_repo_metadata = BashOperator(
-        task_id="get_full_repo_metadata",
-        bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {GCP_ZONE} --command "{vm_script}"',
+    get_full_metadata = BashOperator(
+        task_id="get_full_metadata",
+        bash_command=ssh_command.format(
+            f"bash get_full_metadata.sh {working_dir} {DATA_BUCKET} "
+            f"{production_dataset} &> get_full_metadata_log &"
+        ),
+    )
+    wait_for_get_full_metadata = GCSObjectExistenceSensor(
+        task_id="wait_for_get_full_metadata",
+        bucket=DATA_BUCKET,
+        object=f"{production_dataset}/done_files/get_full_metadata",
+        deferrable=True,
    )
 
     # Scrape GitHub for READMEs and additional metadata we aren't otherwise able to collect
-    scrape_gh_sequence = [
-        f"cd {vm_working_dir}",
-        "PYTHONPATH='.' python3 scripts/retrieve_repo_metadata.py curr_repos_filled.jsonl curr_repos_final.jsonl",
-        f"gsutil cp curr_repos_final.jsonl gs://{DATA_BUCKET}/{tmp_dir}/",
-    ]
-    vm_script = " && ".join(scrape_gh_sequence)
-
-    scrape_gh = BashOperator(
-        task_id="scrape_gh",
-        bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {GCP_ZONE} --command "{vm_script}"',
+    scrape_additional_metadata = BashOperator(
+        task_id="scrape_additional_metadata",
+        bash_command=ssh_command.format(
+            f"bash scrape_additional_metadata.sh {working_dir} {DATA_BUCKET} "
+            f"{production_dataset} {tmp_dir} &> scrape_additional_metadata_log &"
+        ),
+    )
+    wait_for_scrape_additional_metadata = GCSObjectExistenceSensor(
+        task_id="wait_for_scrape_additional_metadata",
+        bucket=DATA_BUCKET,
+        object=f"{production_dataset}/done_files/scrape_additional_metadata",
+        deferrable=True,
     )
 
     gce_instance_stop = ComputeEngineStopInstanceOperator(
@@ -154,9 +179,13 @@
         clear_dl_dir
         >> extract_repo_mentions
         >> gce_instance_start
+        >> prep_environment
         >> retrieve_repos
-        >> get_full_repo_metadata
-        >> scrape_gh
+        >> wait_for_retrieve_repos
+        >> get_full_metadata
+        >> wait_for_get_full_metadata
+        >> scrape_additional_metadata
+        >> wait_for_scrape_additional_metadata
         >> gce_instance_stop
         >> load_data_to_bq
     )
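Note on the hunk above: the three run/wait pairs follow a single pattern. Each stage script is launched on the VM detached (`&> <log> &`, so the SSH command, and with it the BashOperator, returns immediately), and a sensor then polls GCS for a done-file. `deferrable=True` hands the polling to the triggerer so no worker slot is held while the stage runs; this requires a Google provider version with deferrable GCS sensor support. A minimal sketch of how the pattern could be factored into a helper; the `make_vm_stage` name and signature are illustrative, not part of this patch, and it assumes it is called inside the `with DAG(...)` block with the `ssh_command` template defined above:

    from airflow.operators.bash_operator import BashOperator
    from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor

    def make_vm_stage(name, ssh_command, working_dir, data_bucket, dataset, extra_args=""):
        """Build one (run, wait) task pair for a long-running VM stage."""
        run = BashOperator(
            task_id=name,
            # Detach the script on the VM so the SSH session returns at once.
            bash_command=ssh_command.format(
                f"bash {name}.sh {working_dir} {data_bucket} {dataset}"
                f"{' ' + extra_args if extra_args else ''} &> {name}_log &"
            ),
        )
        # The stage script publishes this object only after it succeeds.
        wait = GCSObjectExistenceSensor(
            task_id=f"wait_for_{name}",
            bucket=data_bucket,
            object=f"{dataset}/done_files/{name}",
            deferrable=True,
        )
        run >> wait
        return run, wait

    # e.g.: retrieve_repos, wait_for_retrieve_repos = make_vm_stage(
    #     "retrieve_repos", ssh_command, working_dir, DATA_BUCKET, production_dataset)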
diff --git a/push_to_airflow.sh b/push_to_airflow.sh
index afeee02..30f5b89 100644
--- a/push_to_airflow.sh
+++ b/push_to_airflow.sh
@@ -1,12 +1,12 @@
-gsutil cp orca_data_pipeline.py gs://us-east1-production2023-cc1-01d75926-bucket/dags/
+gsutil cp orca_data_pipeline.py gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/
 gsutil -m rm gs://airflow-data-exchange/orca/schemas/*
 gsutil -m cp schemas/* gs://airflow-data-exchange/orca/schemas/
-gsutil -m rm gs://us-east1-production2023-cc1-01d75926-bucket/dags/schemas/orca/*
-gsutil -m cp schemas/* gs://us-east1-production2023-cc1-01d75926-bucket/dags/schemas/orca/
+gsutil -m rm gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/schemas/orca/*
+gsutil -m cp schemas/* gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/schemas/orca/
 gsutil rm -r gs://airflow-data-exchange/orca/code
 gsutil -m cp -r scripts gs://airflow-data-exchange/orca/code/
 gsutil -m cp -r input_data gs://airflow-data-exchange/orca/code/
 gsutil cp requirements.txt gs://airflow-data-exchange/orca/code/
-gsutil cp sequences/* gs://us-east1-production2023-cc1-01d75926-bucket/dags/sequences/orca/
-gsutil -m rm gs://us-east1-production2023-cc1-01d75926-bucket/dags/sql/orca/*
-gsutil -m cp sql/* gs://us-east1-production2023-cc1-01d75926-bucket/dags/sql/orca/
+gsutil cp sequences/* gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/sequences/orca/
+gsutil -m rm gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/sql/orca/*
+gsutil -m cp sql/* gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/sql/orca/
diff --git a/scripts/get_full_metadata.sh b/scripts/get_full_metadata.sh
new file mode 100644
index 0000000..ac25405
--- /dev/null
+++ b/scripts/get_full_metadata.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+
+working_dir=$1
+data_bucket=$2
+production_dataset=$3
+done_file="get_full_metadata"
+
+cd "$working_dir"
+gsutil rm "gs://${data_bucket}/${production_dataset}/done_files/${done_file}"
+PYTHONPATH='.' python3 scripts/backfill_top_level_repo_data.py && touch "$done_file"
+gsutil cp "$done_file" "gs://${data_bucket}/${production_dataset}/done_files/"
diff --git a/scripts/retrieve_repos.sh b/scripts/retrieve_repos.sh
new file mode 100644
index 0000000..2adc346
--- /dev/null
+++ b/scripts/retrieve_repos.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+
+working_dir=$1
+data_bucket=$2
+production_dataset=$3
+done_file="retrieve_repos"
+
+cd "$working_dir"
+gsutil rm "gs://${data_bucket}/${production_dataset}/done_files/${done_file}"
+PYTHONPATH='.' python3 scripts/retrieve_repos.py --query_bq && touch "$done_file"
+gsutil cp "$done_file" "gs://${data_bucket}/${production_dataset}/done_files/"
diff --git a/scripts/scrape_additional_metadata.sh b/scripts/scrape_additional_metadata.sh
new file mode 100644
index 0000000..5386548
--- /dev/null
+++ b/scripts/scrape_additional_metadata.sh
@@ -0,0 +1,13 @@
+#!/bin/bash
+
+working_dir=$1
+data_bucket=$2
+production_dataset=$3
+tmp_dir=$4
+done_file="scrape_additional_metadata"
+
+cd "$working_dir"
+gsutil rm "gs://${data_bucket}/${production_dataset}/done_files/${done_file}"
+PYTHONPATH='.' python3 scripts/retrieve_repo_metadata.py curr_repos_filled.jsonl curr_repos_final.jsonl
+gsutil cp curr_repos_final.jsonl "gs://${data_bucket}/${tmp_dir}/" && touch "$done_file"
+gsutil cp "$done_file" "gs://${data_bucket}/${production_dataset}/done_files/"
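All three new scripts implement the same done-file handshake the sensors depend on: remove the marker object, run the stage, and publish the marker only if the stage succeeded (the `&&` guarantees a failed stage leaves no marker, so the matching sensor times out instead of passing). The leading `gsutil rm` is expected to fail harmlessly on a first run, when no marker exists yet. For clarity, the same protocol rendered in Python: a hypothetical equivalent using the google-cloud-storage client, not code this patch adds:

    from google.cloud import storage

    def run_stage(data_bucket: str, dataset: str, stage: str, work) -> None:
        """Clear the stage's done-file, do the work, then publish the marker."""
        bucket = storage.Client().bucket(data_bucket)
        marker = bucket.blob(f"{dataset}/done_files/{stage}")
        if marker.exists():
            marker.delete()  # mirrors the leading `gsutil rm`
        work()  # the stage body; an exception here means no marker is written
        marker.upload_from_string("")  # empty object the GCS sensor waits on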
From e9339b8eb996ca13ecd205a9f8e73b186bf266a4 Mon Sep 17 00:00:00 2001
From: Jennifer Melot
Date: Mon, 11 Mar 2024 11:03:27 -0400
Subject: [PATCH 2/2] Re-enable failure notifications

---
 orca_data_pipeline.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/orca_data_pipeline.py b/orca_data_pipeline.py
index b884c4c..13fb283 100644
--- a/orca_data_pipeline.py
+++ b/orca_data_pipeline.py
@@ -40,7 +40,6 @@
 
 args = get_default_args(pocs=["Jennifer"])
 args["retries"] = 1
-args["on_failure_callback"] = None
 
 production_dataset = "orca"
 staging_dataset = f"staging_{production_dataset}"
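With the override removed in PATCH 2/2, `on_failure_callback` falls back to whatever `get_default_args(pocs=["Jennifer"])` supplies, presumably the team's notification hook, which is what the commit message means by re-enabling failure notifications. A quick local parse check one might run before `push_to_airflow.sh`; it assumes the DAG file and its helper imports (`get_default_args`, `DATA_BUCKET`, `GCP_ZONE`) resolve from the current directory:

    from airflow.models import DagBag

    dagbag = DagBag(dag_folder=".", include_examples=False)
    assert not dagbag.import_errors, dagbag.import_errors
    assert "orca_updater" in dagbag.dag_ids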