From a619098dd30c7c9ced201e79da01293fb12b239b Mon Sep 17 00:00:00 2001 From: Abdelhak Marouane Date: Tue, 29 Oct 2024 15:33:50 -0500 Subject: [PATCH] Add S3 event bridge --- .../lambda_function.py | 60 -- .../requirements.txt | 1 - infrastructure/s3_event_bridge_lambda.tf | 203 ------ sm2a/dags/__init__.py | 0 sm2a/dags/example_dag.py | 94 +++ sm2a/dags/generate_dags.py | 62 ++ sm2a/dags/rds_example_dag.py | 89 +++ sm2a/dags/requirements-constraints.txt | 663 ++++++++++++++++++ sm2a/dags/requirements.txt | 20 + sm2a/dags/veda_data_pipeline/__init__.py | 0 .../veda_data_pipeline/groups/__init__.py | 0 .../groups/collection_group.py | 91 +++ .../groups/discover_group.py | 89 +++ .../veda_data_pipeline/groups/ecs_tasks.py | 117 ++++ .../groups/processing_tasks.py | 40 ++ .../groups/transfer_group.py | 65 ++ .../veda_data_pipeline/requirements_dev.txt | 1 + sm2a/dags/veda_data_pipeline/utils/README.md | 26 + .../dags/veda_data_pipeline/utils/__init__.py | 0 .../utils/build_stac/handler.py | 133 ++++ .../utils/build_stac/utils/__init__.py | 0 .../utils/build_stac/utils/events.py | 19 + .../utils/build_stac/utils/regex.py | 91 +++ .../utils/build_stac/utils/role.py | 10 + .../utils/build_stac/utils/stac.py | 142 ++++ .../utils/cogify_transfer/handler.py | 86 +++ .../utils/cogify_transfer/requirements.txt | 11 + .../utils/collection_generation.py | 138 ++++ .../veda_data_pipeline/utils/s3_discovery.py | 292 ++++++++ sm2a/dags/veda_data_pipeline/utils/schemas.py | 15 + .../veda_data_pipeline/utils/submit_stac.py | 135 ++++ .../dags/veda_data_pipeline/utils/transfer.py | 110 +++ .../utils/vector_ingest/handler.py | 377 ++++++++++ .../utils/vector_ingest/requirements.txt | 7 + .../veda_collection_pipeline.py | 49 ++ .../veda_dataset_pipeline.py | 101 +++ .../veda_discover_pipeline.py | 112 +++ .../veda_generic_vector_pipeline.py | 84 +++ .../veda_transfer_pipeline.py | 50 ++ .../veda_vector_pipeline.py | 71 ++ sm2a/infrastructure/.terraform.lock.hcl | 19 + .../lambda_function.py | 61 ++ sm2a/infrastructure/locals.tf | 12 + sm2a/infrastructure/s3_event_bridge_lambda.tf | 173 +++++ sm2a/infrastructure/variables.tf | 15 + 45 files changed, 3670 insertions(+), 264 deletions(-) delete mode 100644 infrastructure/functions/s3_event_bridge_to_sfn_execute/lambda_function.py delete mode 100644 infrastructure/functions/s3_event_bridge_to_sfn_execute/requirements.txt delete mode 100644 infrastructure/s3_event_bridge_lambda.tf create mode 100644 sm2a/dags/__init__.py create mode 100644 sm2a/dags/example_dag.py create mode 100644 sm2a/dags/generate_dags.py create mode 100644 sm2a/dags/rds_example_dag.py create mode 100644 sm2a/dags/requirements-constraints.txt create mode 100644 sm2a/dags/requirements.txt create mode 100644 sm2a/dags/veda_data_pipeline/__init__.py create mode 100644 sm2a/dags/veda_data_pipeline/groups/__init__.py create mode 100644 sm2a/dags/veda_data_pipeline/groups/collection_group.py create mode 100644 sm2a/dags/veda_data_pipeline/groups/discover_group.py create mode 100644 sm2a/dags/veda_data_pipeline/groups/ecs_tasks.py create mode 100644 sm2a/dags/veda_data_pipeline/groups/processing_tasks.py create mode 100644 sm2a/dags/veda_data_pipeline/groups/transfer_group.py create mode 100644 sm2a/dags/veda_data_pipeline/requirements_dev.txt create mode 100644 sm2a/dags/veda_data_pipeline/utils/README.md create mode 100644 sm2a/dags/veda_data_pipeline/utils/__init__.py create mode 100644 sm2a/dags/veda_data_pipeline/utils/build_stac/handler.py create mode 100644 
sm2a/dags/veda_data_pipeline/utils/build_stac/utils/__init__.py create mode 100644 sm2a/dags/veda_data_pipeline/utils/build_stac/utils/events.py create mode 100644 sm2a/dags/veda_data_pipeline/utils/build_stac/utils/regex.py create mode 100644 sm2a/dags/veda_data_pipeline/utils/build_stac/utils/role.py create mode 100644 sm2a/dags/veda_data_pipeline/utils/build_stac/utils/stac.py create mode 100644 sm2a/dags/veda_data_pipeline/utils/cogify_transfer/handler.py create mode 100644 sm2a/dags/veda_data_pipeline/utils/cogify_transfer/requirements.txt create mode 100644 sm2a/dags/veda_data_pipeline/utils/collection_generation.py create mode 100644 sm2a/dags/veda_data_pipeline/utils/s3_discovery.py create mode 100644 sm2a/dags/veda_data_pipeline/utils/schemas.py create mode 100644 sm2a/dags/veda_data_pipeline/utils/submit_stac.py create mode 100644 sm2a/dags/veda_data_pipeline/utils/transfer.py create mode 100644 sm2a/dags/veda_data_pipeline/utils/vector_ingest/handler.py create mode 100644 sm2a/dags/veda_data_pipeline/utils/vector_ingest/requirements.txt create mode 100644 sm2a/dags/veda_data_pipeline/veda_collection_pipeline.py create mode 100644 sm2a/dags/veda_data_pipeline/veda_dataset_pipeline.py create mode 100644 sm2a/dags/veda_data_pipeline/veda_discover_pipeline.py create mode 100644 sm2a/dags/veda_data_pipeline/veda_generic_vector_pipeline.py create mode 100644 sm2a/dags/veda_data_pipeline/veda_transfer_pipeline.py create mode 100644 sm2a/dags/veda_data_pipeline/veda_vector_pipeline.py create mode 100644 sm2a/infrastructure/functions/s3_event_bridge_to_sfn_execute/lambda_function.py create mode 100644 sm2a/infrastructure/locals.tf create mode 100644 sm2a/infrastructure/s3_event_bridge_lambda.tf diff --git a/infrastructure/functions/s3_event_bridge_to_sfn_execute/lambda_function.py b/infrastructure/functions/s3_event_bridge_to_sfn_execute/lambda_function.py deleted file mode 100644 index 65ff8d6f..00000000 --- a/infrastructure/functions/s3_event_bridge_to_sfn_execute/lambda_function.py +++ /dev/null @@ -1,60 +0,0 @@ -import boto3 -import http.client -import os -import base64 -import ast -import json - - -#mwaa_env_name = 'veda-pipeline-dev-mwaa' -mwaa_env_name = os.getenv('TARGET_MWAA_ENV') -dag_name = os.getenv('TARGET_DAG_NAME') -mwaa_cli_command = os.getenv('TARGET_DAG_COMMAND') -client = boto3.client('mwaa') - - -def lambda_handler(event, context): - for record in event['Records']: - print(f"[ RECORD ]: {record}") - s3_event_key = record['s3']['object']['key'] - print(f"[ S3 EVENT KEY ]: {s3_event_key}") - s3_filename_target = os.path.split(s3_event_key)[-1] - print(f"[ S3 FILENAME TARGET ]: {s3_filename_target}") - s3_filename_no_ext = os.path.splitext(s3_filename_target)[0] - print(f"[ S3 FILENAME NO EXT ]: {s3_filename_no_ext}") - - bucket_key_prefix = "EIS/FEDSoutput/Snapshot_COPY/" - if s3_filename_no_ext.startswith("lf_"): - bucket_key_prefix = "EIS/FEDSoutput/LFArchive_COPY/" - - # get web token - mwaa_cli_token = client.create_cli_token( - Name=mwaa_env_name - ) - print(f"[ CLI TOKEN ]: {mwaa_cli_token}") - serialized_args = json.dumps({ - "discovery": "s3", - "collection": s3_filename_no_ext, - "prefix": bucket_key_prefix, - "bucket": "veda-data-store-staging", - "filename_regex": f"^(.*){s3_filename_target}$", - "vector": True - }) - conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname']) - payload = f"{mwaa_cli_command} {dag_name} --conf '{serialized_args}'" - print(f"[ CLI PAYLOAD ]: {payload}") - headers = { - 'Authorization': 'Bearer ' + 
mwaa_cli_token['CliToken'], - 'Content-Type': 'text/plain' - } - conn.request("POST", "/aws_mwaa/cli", payload, headers) - res = conn.getresponse() - data = res.read() - dict_str = data.decode("UTF-8") - mydata = ast.literal_eval(dict_str) - print(f"[ DATA ]: {mydata}") - print(f"[ STDOUT ]: {base64.b64decode(mydata['stdout'])}") - return { - 'statusCode': 200, - 'body': json.dumps('Hello from Lambda!') - } diff --git a/infrastructure/functions/s3_event_bridge_to_sfn_execute/requirements.txt b/infrastructure/functions/s3_event_bridge_to_sfn_execute/requirements.txt deleted file mode 100644 index 30ddf823..00000000 --- a/infrastructure/functions/s3_event_bridge_to_sfn_execute/requirements.txt +++ /dev/null @@ -1 +0,0 @@ -boto3 diff --git a/infrastructure/s3_event_bridge_lambda.tf b/infrastructure/s3_event_bridge_lambda.tf deleted file mode 100644 index d4e6a0b4..00000000 --- a/infrastructure/s3_event_bridge_lambda.tf +++ /dev/null @@ -1,203 +0,0 @@ -##################################################### -# Variables -##################################################### -variable "DEPLOY_VECTOR_AUTOMATION" { - type = bool - default = false -} - -##################################################### -# Execution Role -##################################################### -resource "aws_iam_role" "lambda_exec_role" { - count = var.deploy_vector_automation ? 1 : 0 - - provider = aws.aws_current - name = "lambda-exec-role-s3-event-bridge-veda-${var.stage}" - - assume_role_policy = <> discover_from_cmr + + start >> discover_from_s3 >> move_files_to_maap_store + ( + [discover_from_cmr, move_files_to_maap_store] + >> generate_cmr_metadata + >> push_to_cmr + >> end + ) diff --git a/sm2a/dags/generate_dags.py b/sm2a/dags/generate_dags.py new file mode 100644 index 00000000..7bd2c31b --- /dev/null +++ b/sm2a/dags/generate_dags.py @@ -0,0 +1,62 @@ +""" +Builds a DAG for each collection (indicated by a .json file) in the /collections/ folder. +These DAGs are used to discover and ingest items for each collection. 
+""" + +from airflow.models.variable import Variable + +from veda_data_pipeline.veda_discover_pipeline import get_discover_dag + + +def generate_dags(): + import boto3 + import json + from botocore.exceptions import ClientError, NoCredentialsError + + from pathlib import Path + + airflow_vars = Variable.get("aws_dags_variables") + airflow_vars_json = json.loads(airflow_vars) + bucket = airflow_vars_json.get("EVENT_BUCKET") + + try: + client = boto3.client("s3") + response = client.list_objects_v2(Bucket=bucket, Prefix="collections/") + except ClientError as e: + # Handle general AWS service errors (e.g., wrong bucket name) + print(f"ClientError: {e}") + return + except NoCredentialsError: + # Handle missing credentials + print("Credentials not found.") + return + except Exception as ex: + print(f"An unexpected error occurred: {ex}") + return + for file_ in response.get("Contents", []): + key = file_["Key"] + if key.endswith("/"): + continue + file_name = Path(key).stem + result = client.get_object(Bucket=bucket, Key=key) + discovery_configs = result["Body"].read().decode() + discovery_configs = json.loads(discovery_configs) + + # Allow the file content to be either one config or a list of configs + if type(discovery_configs) is dict: + discovery_configs = [discovery_configs] + scheduled_discovery_configs = [ + discovery_config + for discovery_config in discovery_configs + if discovery_config.get("schedule") + ] + for idx, discovery_config in enumerate(scheduled_discovery_configs): + id = f"discover-{file_name}" + if idx > 0: + id = f"{id}-{idx}" + get_discover_dag( + id=id, event=discovery_config + ) + + +generate_dags() diff --git a/sm2a/dags/rds_example_dag.py b/sm2a/dags/rds_example_dag.py new file mode 100644 index 00000000..66420b79 --- /dev/null +++ b/sm2a/dags/rds_example_dag.py @@ -0,0 +1,89 @@ +from __future__ import annotations +from airflow import DAG +from airflow.providers.postgres.operators.postgres import PostgresOperator +from airflow.hooks.postgres_hook import PostgresHook +from datetime import datetime, date +import json +from airflow.decorators import task + + +def json_serial(obj): + """JSON serializer for objects not serializable by default json code""" + if isinstance(obj, (datetime, date)): + return obj.isoformat() + raise TypeError("Type %s not serializable" % type(obj)) + + +# [START postgres_operator_howto_guide] + + +# create_pet_table, populate_pet_table, get_all_pets, and get_birth_date are examples of tasks created by +# instantiating the Postgres Operator + +DAG_ID = "postgres_operator_dag" + + +with DAG( + dag_id=DAG_ID, + start_date=datetime(2020, 2, 2), + schedule="@once", + catchup=False, + tags=["example"], +) as dag: + # [START postgres_operator_howto_guide_create_pet_table] + create_pet_table = PostgresOperator( + postgres_conn_id="cluster_rds_connection", + task_id="create_pet_table", + sql=""" + CREATE TABLE IF NOT EXISTS pet ( + pet_id SERIAL PRIMARY KEY, + name VARCHAR NOT NULL, + pet_type VARCHAR NOT NULL, + birth_date DATE NOT NULL, + OWNER VARCHAR NOT NULL); + """, + ) + # [END postgres_operator_howto_guide_create_pet_table] + # [START postgres_operator_howto_guide_populate_pet_table] + populate_pet_table = PostgresOperator( + postgres_conn_id="cluster_rds_connection", + task_id="populate_pet_table", + sql=""" + INSERT INTO pet (name, pet_type, birth_date, OWNER) + VALUES ( 'Max', 'Dog', '2018-07-05', 'Jane'); + INSERT INTO pet (name, pet_type, birth_date, OWNER) + VALUES ( 'Susie', 'Cat', '2019-05-01', 'Phil'); + INSERT INTO pet (name, 
pet_type, birth_date, OWNER) + VALUES ( 'Lester', 'Hamster', '2020-06-23', 'Lily'); + INSERT INTO pet (name, pet_type, birth_date, OWNER) + VALUES ( 'Quincy', 'Parrot', '2013-08-11', 'Anne'); + """, + ) + # [END postgres_operator_howto_guide_populate_pet_table] + # [START postgres_operator_howto_guide_get_all_pets] + + @task + def get_all_pets(): + sql = "SELECT * FROM pet" + pg_hook = PostgresHook(postgres_conn_id="cluster_rds_connection") + connection = pg_hook.get_conn() + cursor = connection.cursor() + cursor.execute(sql) + results = cursor.fetchall() + for result in results: + print(result) + return {"results": json.dumps(results, default=json_serial)} + + # [END postgres_operator_howto_guide_get_all_pets] + # [START postgres_operator_howto_guide_get_birth_date] + get_birth_date = PostgresOperator( + postgres_conn_id="cluster_rds_connection", + task_id="get_birth_date", + sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC %(begin_date)s AND %(end_date)s", + parameters={"begin_date": "2020-01-01", "end_date": "2020-12-31"}, + runtime_parameters={"statement_timeout": "3000ms"}, + ) + # [END postgres_operator_howto_guide_get_birth_date] + + create_pet_table >> populate_pet_table >> get_all_pets() >> get_birth_date + # [END postgres_operator_howto_guide] diff --git a/sm2a/dags/requirements-constraints.txt b/sm2a/dags/requirements-constraints.txt new file mode 100644 index 00000000..a3bfd18b --- /dev/null +++ b/sm2a/dags/requirements-constraints.txt @@ -0,0 +1,663 @@ +# +# This constraints file was automatically generated on 2023-01-18T18:46:04Z +# via "eager-upgrade" mechanism of PIP. For the "v2-5-test" branch of Airflow. +# This variant of constraints install uses the HEAD of the branch version for 'apache-airflow' but installs +# the providers from PIP-released packages at the moment of the constraint generation. +# +# Those constraints are actually those that regular users use to install released version of Airflow. +# We also use those constraints after "apache-airflow" is released and the constraints are tagged with +# "constraints-X.Y.Z" tag to build the production image for that version. +# +# +# This constraints file is meant to be used only in the "apache-airflow" installation command and not +# in all subsequent pip commands. By using a constraints.txt file, we ensure that solely the Airflow +# installation step is reproducible. Subsequent pip commands may install packages that would have +# been incompatible with the constraints used in Airflow reproducible installation step. Finally, pip +# commands that might change the installed version of apache-airflow should include "apache-airflow==X.Y.Z" +# in the list of install targets to prevent Airflow accidental upgrade or downgrade. +# +# Typical installation process of airflow for Python 3.8 is (with random selection of extras and custom +# dependencies added), usually consists of two steps: +# +# 1. Reproducible installation of airflow with selected providers (note constraints are used): +# +# pip install "apache-airflow[celery,cncf.kubernetes,google,amazon,snowflake]==X.Y.Z" \ +# --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-X.Y.Z/constraints-3.8.txt" +# +# 2. Installing own dependencies that are potentially not matching the constraints (note constraints are not +# used, and apache-airflow==X.Y.Z is used to make sure there is no accidental airflow upgrade/downgrade. 
+# +# pip install "apache-airflow==X.Y.Z" "snowflake-connector-python[pandas]==2.9.0" +# +APScheduler==3.6.3 +Authlib==1.2.0 +Babel==2.11.0 +ConfigUpdater==3.1.1 +Deprecated==1.2.13 +Flask-AppBuilder==4.1.4 +Flask-Babel==2.0.0 +Flask-Bcrypt==1.0.1 +Flask-Caching==2.0.2 +Flask-JWT-Extended==4.4.4 +Flask-Login==0.6.2 +Flask-SQLAlchemy==2.5.1 +Flask-Session==0.4.0 +Flask-WTF==1.1.1 +Flask==2.2.2 +GitPython==3.1.30 +HeapDict==1.0.1 +JPype1==1.4.1 +JayDeBeApi==1.2.3 +Jinja2==3.1.2 +Mako==1.2.4 +Markdown==3.4.1 +MarkupSafe==2.1.2 +PyGithub==1.57 +PyHive==0.6.5 +PyJWT==2.6.0 +PyNaCl==1.5.0 +PyYAML==6.0 +Pygments==2.14.0 +SQLAlchemy-JSONField==1.0.1.post0 +SQLAlchemy-Utils==0.39.0 +SQLAlchemy==1.4.46 +SecretStorage==3.3.3 +Sphinx==5.3.0 +Unidecode==1.3.6 +WTForms==3.0.1 +Werkzeug==2.2.2 +adal==1.2.7 +aiofiles==22.1.0 +aiohttp==3.8.3 +aiosignal==1.3.1 +alabaster==0.7.13 +alembic==1.9.2 +aliyun-python-sdk-core==2.13.36 +aliyun-python-sdk-kms==2.16.0 +amqp==5.1.1 +analytics-python==1.4.0 +ansiwrap==0.8.4 +anyio==3.6.2 +apache-airflow-providers-airbyte==3.2.0 +apache-airflow-providers-alibaba==2.2.0 +apache-airflow-providers-amazon==7.1.0 +apache-airflow-providers-apache-beam==4.1.1 +apache-airflow-providers-apache-cassandra==3.1.0 +apache-airflow-providers-apache-drill==2.3.1 +apache-airflow-providers-apache-druid==3.3.1 +apache-airflow-providers-apache-hdfs==3.2.0 +apache-airflow-providers-apache-hive==5.1.1 +apache-airflow-providers-apache-kylin==3.1.0 +apache-airflow-providers-apache-livy==3.2.0 +apache-airflow-providers-apache-pig==4.0.0 +apache-airflow-providers-apache-pinot==4.0.1 +apache-airflow-providers-apache-spark==4.0.0 +apache-airflow-providers-apache-sqoop==3.1.0 +apache-airflow-providers-arangodb==2.1.0 +apache-airflow-providers-asana==2.1.0 +apache-airflow-providers-atlassian-jira==2.0.0 +apache-airflow-providers-celery==3.1.0 +apache-airflow-providers-cloudant==3.1.0 +apache-airflow-providers-cncf-kubernetes==5.1.1 +apache-airflow-providers-common-sql==1.3.3 +apache-airflow-providers-databricks==4.0.0 +apache-airflow-providers-datadog==3.1.0 +apache-airflow-providers-dbt-cloud==2.3.1 +apache-airflow-providers-dingding==3.1.0 +apache-airflow-providers-discord==3.1.0 +apache-airflow-providers-docker==3.4.0 +apache-airflow-providers-elasticsearch==4.3.3 +apache-airflow-providers-exasol==4.1.3 +apache-airflow-providers-facebook==3.1.0 +apache-airflow-providers-ftp==3.3.0 +apache-airflow-providers-github==2.2.0 +apache-airflow-providers-google==8.8.0 +apache-airflow-providers-grpc==3.1.0 +apache-airflow-providers-hashicorp==3.2.0 +apache-airflow-providers-http==4.1.1 +apache-airflow-providers-imap==3.1.1 +apache-airflow-providers-influxdb==2.1.0 +apache-airflow-providers-jdbc==3.3.0 +apache-airflow-providers-jenkins==3.2.0 +apache-airflow-providers-microsoft-azure==5.1.0 +apache-airflow-providers-microsoft-mssql==3.3.2 +apache-airflow-providers-microsoft-psrp==2.2.0 +apache-airflow-providers-microsoft-winrm==3.1.1 +apache-airflow-providers-mongo==3.1.1 +apache-airflow-providers-mysql==4.0.0 +apache-airflow-providers-neo4j==3.2.1 +apache-airflow-providers-odbc==3.2.1 +apache-airflow-providers-openfaas==3.1.0 +apache-airflow-providers-opsgenie==5.0.0 +apache-airflow-providers-oracle==3.6.0 +apache-airflow-providers-pagerduty==3.1.0 +apache-airflow-providers-papermill==3.1.0 +apache-airflow-providers-plexus==3.1.0 +apache-airflow-providers-postgres==5.4.0 +apache-airflow-providers-presto==4.2.1 +apache-airflow-providers-qubole==3.3.1 +apache-airflow-providers-redis==3.1.0 
+apache-airflow-providers-salesforce==5.3.0 +apache-airflow-providers-samba==4.1.0 +apache-airflow-providers-segment==3.1.0 +apache-airflow-providers-sendgrid==3.1.0 +apache-airflow-providers-sftp==4.2.1 +apache-airflow-providers-singularity==3.1.0 +apache-airflow-providers-slack==7.2.0 +apache-airflow-providers-snowflake==4.0.2 +apache-airflow-providers-sqlite==3.3.1 +apache-airflow-providers-ssh==3.4.0 +apache-airflow-providers-tableau==4.0.0 +apache-airflow-providers-tabular==1.1.0 +apache-airflow-providers-telegram==3.1.1 +apache-airflow-providers-trino==4.3.1 +apache-airflow-providers-vertica==3.3.1 +apache-airflow-providers-yandex==3.2.0 +apache-airflow-providers-zendesk==4.2.0 +apache-beam==2.44.0 +apispec==3.3.2 +appdirs==1.4.4 +argcomplete==2.0.0 +arrow==1.2.3 +asana==3.0.0 +asgiref==3.6.0 +asn1crypto==1.5.1 +astroid==2.11.7 +asttokens==2.2.1 +async-timeout==4.0.2 +asynctest==0.13.0 +atlasclient==1.0.0 +atlassian-python-api==3.32.2 +attrs==22.2.0 +aws-sam-translator==1.57.0 +aws-xray-sdk==2.11.0 +azure-batch==13.0.0 +azure-common==1.1.28 +azure-core==1.26.2 +azure-cosmos==4.3.0 +azure-datalake-store==0.0.52 +azure-identity==1.12.0 +azure-keyvault-secrets==4.6.0 +azure-kusto-data==0.0.45 +azure-mgmt-containerinstance==1.5.0 +azure-mgmt-core==1.3.2 +azure-mgmt-datafactory==1.1.0 +azure-mgmt-datalake-nspkg==3.0.1 +azure-mgmt-datalake-store==0.5.0 +azure-mgmt-nspkg==3.0.2 +azure-mgmt-resource==22.0.0 +azure-nspkg==3.0.2 +azure-servicebus==7.8.2 +azure-storage-blob==12.14.1 +azure-storage-common==2.1.0 +azure-storage-file-datalake==12.9.1 +azure-storage-file==2.1.0 +azure-synapse-spark==0.7.0 +backcall==0.2.0 +backoff==1.10.0 +bcrypt==4.0.1 +beautifulsoup4==4.11.1 +billiard==3.6.4.0 +black==23.1a1 +bleach==5.0.1 +blinker==1.5 +boto3==1.26.51 +boto==2.49.0 +botocore==1.29.51 +bowler==0.9.0 +cachelib==0.9.0 +cachetools==4.2.2 +cassandra-driver==3.25.0 +cattrs==22.2.0 +celery==5.2.7 +certifi==2022.12.7 +cffi==1.15.1 +cfgv==3.3.1 +cfn-lint==0.72.9 +cgroupspy==0.2.2 +chardet==4.0.0 +charset-normalizer==2.1.1 +checksumdir==1.2.0 +ciso8601==2.3.0 +click-default-group==1.2.2 +click-didyoumean==0.3.0 +click-plugins==1.1.1 +click-repl==0.2.0 +click==8.1.3 +clickclick==20.10.2 +cloudant==2.15.0 +cloudpickle==2.2.0 +colorama==0.4.6 +colorlog==4.8.0 +commonmark==0.9.1 +connexion==2.14.1 +coverage==7.0.5 +crcmod==1.7 +cron-descriptor==1.2.32 +croniter==1.3.8 +cryptography==38.0.4 +curlify==2.2.1 +dask==2023.1.0 +databricks-sql-connector==2.2.0 +datadog==0.44.0 +db-dtypes==1.0.5 +decorator==5.1.1 +defusedxml==0.7.1 +dill==0.3.1.1 +distlib==0.3.6 +distributed==2023.1.0 +dnspython==2.3.0 +docker==6.0.1 +docopt==0.6.2 +docutils==0.19 +ecdsa==0.18.0 +elasticsearch-dbapi==0.2.9 +elasticsearch-dsl==7.4.0 +elasticsearch==7.13.4 +email-validator==1.3.0 +entrypoints==0.4 +eralchemy2==1.3.6 +eventlet==0.33.3 +exceptiongroup==1.1.0 +execnet==1.9.0 +executing==1.2.0 +facebook-business==15.0.2 +fastavro==1.7.0 +fasteners==0.18 +fastjsonschema==2.16.2 +filelock==3.9.0 +fissix==21.11.13 +flake8-colors==0.1.9 +flake8==6.0.0 +flake8_implicit_str_concat==0.3.0 +flaky==3.7.0 +flower==1.2.0 +freezegun==1.2.2 +frozenlist==1.3.3 +fsspec==2022.11.0 +future==0.18.3 +gcloud-aio-auth==4.1.5 +gcloud-aio-bigquery==6.2.0 +gcloud-aio-storage==8.0.0 +gcsfs==2022.11.0 +geomet==0.2.1.post1 +gevent==22.10.2 +gitdb==4.0.10 +google-ads==18.0.0 +google-api-core==2.8.2 +google-api-python-client==1.12.11 +google-auth-httplib2==0.1.0 +google-auth-oauthlib==0.8.0 +google-auth==2.16.0 +google-cloud-aiplatform==1.16.1 
+google-cloud-appengine-logging==1.1.3 +google-cloud-audit-log==0.2.4 +google-cloud-automl==2.8.0 +google-cloud-bigquery-datatransfer==3.7.0 +google-cloud-bigquery-storage==2.14.1 +google-cloud-bigquery==2.34.4 +google-cloud-bigtable==1.7.3 +google-cloud-build==3.9.0 +google-cloud-compute==0.7.0 +google-cloud-container==2.11.1 +google-cloud-core==2.3.2 +google-cloud-datacatalog==3.9.0 +google-cloud-dataform==0.2.0 +google-cloud-dataplex==1.1.0 +google-cloud-dataproc-metastore==1.6.0 +google-cloud-dataproc==5.0.0 +google-cloud-dlp==1.0.2 +google-cloud-kms==2.12.0 +google-cloud-language==1.3.2 +google-cloud-logging==3.2.1 +google-cloud-memcache==1.4.1 +google-cloud-monitoring==2.11.0 +google-cloud-orchestration-airflow==1.4.1 +google-cloud-os-login==2.7.1 +google-cloud-pubsub==2.13.5 +google-cloud-redis==2.9.0 +google-cloud-resource-manager==1.6.0 +google-cloud-secret-manager==1.0.2 +google-cloud-spanner==1.19.3 +google-cloud-speech==1.3.4 +google-cloud-storage==2.7.0 +google-cloud-tasks==2.10.1 +google-cloud-texttospeech==1.0.3 +google-cloud-translate==1.7.2 +google-cloud-videointelligence==1.16.3 +google-cloud-vision==1.0.2 +google-cloud-workflows==1.7.1 +google-crc32c==1.5.0 +google-resumable-media==2.4.0 +googleapis-common-protos==1.56.4 +graphql-core==3.2.3 +graphviz==0.20.1 +greenlet==2.0.1 +grpc-google-iam-v1==0.12.4 +grpcio-gcp==0.2.2 +grpcio-status==1.48.2 +grpcio==1.51.1 +gssapi==1.8.2 +gunicorn==20.1.0 +h11==0.14.0 +hdfs==2.7.0 +hmsclient==0.1.1 +httpcore==0.16.3 +httplib2==0.20.4 +httpx==0.23.3 +humanize==4.4.0 +hvac==1.0.2 +identify==2.5.13 +idna==3.4 +ijson==3.2.0.post0 +imagesize==1.4.1 +importlib-metadata==6.0.0 +incremental==22.10.0 +inflection==0.5.1 +influxdb-client==1.35.0 +iniconfig==2.0.0 +ipdb==0.13.11 +ipython==8.8.0 +isodate==0.6.1 +isort==5.11.2 +itsdangerous==2.1.2 +jaraco.classes==3.2.3 +jedi==0.18.2 +jeepney==0.8.0 +jira==3.4.1 +jmespath==0.10.0 +jschema-to-python==1.2.3 +json-merge-patch==0.2 +jsondiff==2.0.0 +jsonpatch==1.32 +jsonpath-ng==1.5.3 +jsonpickle==3.0.1 +jsonpointer==2.3 +jsonschema-spec==0.1.2 +jsonschema==4.17.3 +junit-xml==1.9 +jupyter-client==7.3.4 +jupyter_core==5.1.3 +keyring==23.13.1 +kombu==5.2.4 +krb5==0.4.1 +kubernetes==23.6.0 +kylinpy==2.8.4 +lazy-object-proxy==1.9.0 +ldap3==2.9.1 +linkify-it-py==2.0.0 +locket==1.0.0 +lockfile==0.12.2 +looker-sdk==22.20.0 +lxml==4.9.2 +lz4==4.3.2 +markdown-it-py==2.1.0 +marshmallow-enum==1.5.1 +marshmallow-oneofschema==3.0.1 +marshmallow-sqlalchemy==0.26.1 +marshmallow==3.19.0 +matplotlib-inline==0.1.6 +mccabe==0.7.0 +mdit-py-plugins==0.3.3 +mdurl==0.1.2 +mongomock==4.1.2 +monotonic==1.6 +more-itertools==8.14.0 +moreorless==0.4.0 +moto==4.1.0 +msal-extensions==1.0.0 +msal==1.20.0 +msgpack==1.0.4 +msrest==0.7.1 +msrestazure==0.6.4 +multi-key-dict==2.0.3 +multidict==6.0.4 +mypy-boto3-appflow==1.26.32 +mypy-boto3-rds==1.26.47 +mypy-boto3-redshift-data==1.26.30 +mypy-extensions==0.4.3 +mypy==0.971 +mysql-connector-python==8.0.32 +mysqlclient==2.1.1 +nbclient==0.7.2 +nbformat==5.7.3 +neo4j==5.4.0 +nest-asyncio==1.5.6 +networkx==2.8.8 +nodeenv==1.7.0 +ntlm-auth==1.5.0 +numpy==1.22.4 +oauthlib==3.2.2 +objsize==0.6.1 +openapi-schema-validator==0.4.0 +openapi-spec-validator==0.5.2 +opsgenie-sdk==2.1.5 +oracledb==1.2.1 +orjson==3.8.5 +oscrypto==1.3.0 +oss2==2.16.0 +packaging==21.3 +pandas-gbq==0.17.9 +pandas==1.5.2 +papermill==2.4.0 +parameterized==0.8.1 +paramiko==2.12.0 +parso==0.8.3 +partd==1.3.0 +pathable==0.4.3 +pathspec==0.9.0 +pbr==5.11.1 +pdpyras==4.5.2 +pendulum==2.1.2 +pexpect==4.8.0 +pickleshare==0.7.5 
+pinotdb==0.4.12 +pipdeptree==2.3.3 +pipx==1.1.0 +pkginfo==1.9.6 +platformdirs==2.6.2 +pluggy==1.0.0 +ply==3.11 +plyvel==1.5.0 +portalocker==2.6.0 +pre-commit==2.21.0 +presto-python-client==0.8.3 +prison==0.2.1 +prometheus-client==0.15.0 +prompt-toolkit==3.0.36 +proto-plus==1.19.6 +protobuf==3.20.0 +psutil==5.9.4 +psycopg2-binary==2.9.5 +psycopg2==2.9.5 +ptyprocess==0.7.0 +pure-eval==0.2.2 +pure-sasl==0.6.2 +py4j==0.10.9.5 +py==1.11.0 +pyOpenSSL==22.1.0 +pyarrow==9.0.0 +pyasn1-modules==0.2.8 +pyasn1==0.4.8 +pycodestyle==2.10.0 +pycountry==22.3.5 +pycparser==2.21 +pycryptodome==3.16.0 +pycryptodomex==3.16.0 +pydantic==1.10.4 +pydata-google-auth==1.5.0 +pydot==1.4.2 +pydruid==0.6.5 +pyenchant==3.2.2 +pyexasol==0.25.1 +pyflakes==3.0.1 +pygraphviz==1.10 +pyhcl==0.4.4 +pykerberos==1.2.4 +pymongo==3.13.0 +pymssql==2.2.8 +pyodbc==4.0.35 +pyparsing==3.0.9 +pypsrp==0.8.1 +pyrsistent==0.19.3 +pyspark==3.3.1 +pyspnego==0.7.0 +pytest-asyncio==0.20.3 +pytest-capture-warnings==0.0.4 +pytest-cov==4.0.0 +pytest-httpx==0.21.2 +pytest-instafail==0.4.2 +pytest-rerunfailures==9.1.1 +pytest-timeouts==1.2.1 +pytest-xdist==3.1.0 +pytest==6.2.5 +python-arango==7.5.5 +python-daemon==2.3.2 +python-dateutil==2.8.2 +python-dotenv==0.21.0 +python-http-client==3.3.7 +python-jenkins==1.7.0 +python-jose==3.3.0 +python-ldap==3.4.3 +python-nvd3==0.15.0 +python-slugify==7.0.0 +python-telegram-bot==13.15 +pytz-deprecation-shim==0.1.0.post0 +pytz==2022.7.1 +pytzdata==2020.1 +pywinrm==0.4.3 +pyzmq==25.0.0 +qds-sdk==1.16.1 +reactivex==4.0.4 +readme-renderer==37.3 +redis==3.5.3 +redshift-connector==2.0.909 +regex==2022.10.31 +requests-file==1.5.1 +requests-kerberos==0.14.0 +requests-mock==1.10.0 +requests-ntlm==1.1.0 +requests-oauthlib==1.3.1 +requests-toolbelt==0.10.1 +requests==2.28.2 +responses==0.22.0 +rfc3986==1.5.0 +rich-click==1.6.0 +rich==13.1.0 +rsa==4.9 +s3transfer==0.6.0 +sarif-om==1.0.4 +sasl==0.3.1 +scramp==1.4.4 +scrapbook==0.5.0 +semver==2.13.0 +sendgrid==6.9.7 +sentinels==1.0.0 +sentry-sdk==1.13.0 +setproctitle==1.3.2 +simple-salesforce==1.12.3 +six==1.16.0 +slack-sdk==3.19.5 +smbprotocol==1.10.1 +smmap==5.0.0 +snakebite-py3==3.0.5 +sniffio==1.3.0 +snowballstemmer==2.2.0 +snowflake-connector-python==2.9.0 +snowflake-sqlalchemy==1.4.4 +sortedcontainers==2.4.0 +soupsieve==2.3.2.post1 +sphinx-airflow-theme==0.0.11 +sphinx-argparse==0.4.0 +sphinx-autoapi==2.0.1 +sphinx-copybutton==0.5.1 +sphinx-jinja==2.0.2 +sphinx-rtd-theme==1.1.1 +sphinxcontrib-devhelp==1.0.2 +sphinxcontrib-htmlhelp==2.0.0 +sphinxcontrib-httpdomain==1.8.1 +sphinxcontrib-jsmath==1.0.1 +sphinxcontrib-qthelp==1.0.3 +sphinxcontrib-redoc==1.6.0 +sphinxcontrib-serializinghtml==1.1.5 +sphinxcontrib-spelling==7.7.0 +sphinxcontrib.applehelp==1.0.3 +spython==0.3.0 +sqlalchemy-bigquery==1.5.0 +sqlalchemy-drill==1.1.2 +sqlalchemy-redshift==0.8.12 +sqlparse==0.4.3 +sshpubkeys==3.3.1 +sshtunnel==0.4.0 +stack-data==0.6.2 +starkbank-ecdsa==2.2.0 +statsd==4.0.1 +tableauserverclient==0.23.4 +tabulate==0.9.0 +tblib==1.7.0 +tenacity==8.1.0 +termcolor==2.2.0 +text-unidecode==1.3 +textwrap3==0.9.2 +thrift-sasl==0.4.3 +thrift==0.16.0 +toml==0.10.2 +tomli==2.0.1 +toolz==0.12.0 +tornado==6.1 +towncrier==22.12.0 +tqdm==4.64.1 +traitlets==5.8.1 +trino==0.321.0 +twine==4.0.2 +types-Deprecated==1.2.9 +types-Markdown==3.4.2.2 +types-PyMySQL==1.0.19.2 +types-PyYAML==6.0.12.3 +types-boto==2.49.18.5 +types-certifi==2021.10.8.3 +types-croniter==1.3.2.2 +types-docutils==0.19.1.2 +types-freezegun==1.1.10 +types-paramiko==2.12.0.3 +types-protobuf==4.21.0.3 +types-pyOpenSSL==23.0.0.1 
+types-python-dateutil==2.8.19.6 +types-python-slugify==7.0.0.1 +types-pytz==2022.7.1.0 +types-redis==4.4.0.2 +types-requests==2.28.11.8 +types-setuptools==65.7.0.2 +types-tabulate==0.9.0.0 +types-termcolor==1.1.6 +types-toml==0.10.8.1 +types-urllib3==1.26.25.4 +typing_extensions==4.4.0 +tzdata==2022.7 +tzlocal==4.2 +uamqp==1.6.3 +uc-micro-py==1.0.1 +unicodecsv==0.14.1 +uritemplate==3.0.1 +urllib3==1.26.14 +userpath==1.8.0 +vertica-python==1.2.0 +vine==5.0.0 +virtualenv==20.17.1 +volatile==2.1.0 +watchtower==2.0.1 +wcwidth==0.2.6 +webencodings==0.5.1 +websocket-client==1.4.2 +wrapt==1.14.1 +xmltodict==0.13.0 +yamllint==1.29.0 +yandexcloud==0.194.0 +yarl==1.8.2 +zeep==4.2.1 +zenpy==2.0.25 +zict==2.2.0 +zipp==3.11.0 +zope.event==4.6 +zope.interface==5.5.2 +zstandard==0.19.0 diff --git a/sm2a/dags/requirements.txt b/sm2a/dags/requirements.txt new file mode 100644 index 00000000..8c9ec097 --- /dev/null +++ b/sm2a/dags/requirements.txt @@ -0,0 +1,20 @@ +#--constraint /usr/local/airflow/dags/requirements-constraints.txt +affine==2.4.0 +netCDF4==1.6.2 +pydantic==1.10.4 +requests==2.28.1 +rio-cogeo==3.5.0 +smart-open==6.3.0 +airflow_multi_dagrun==2.3.1 +apache-airflow-providers-docker==3.2.0 +apache-airflow-providers-postgres==5.2.2 +apache-airflow-providers-common-sql==1.2.0 +typing-extensions==4.4.0 +psycopg2-binary==2.9.5 +pypgstac==0.7.4 +pyOpenSSL==22.0.0 +stac-pydantic +fsspec +s3fs +xarray +xstac diff --git a/sm2a/dags/veda_data_pipeline/__init__.py b/sm2a/dags/veda_data_pipeline/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/sm2a/dags/veda_data_pipeline/groups/__init__.py b/sm2a/dags/veda_data_pipeline/groups/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/sm2a/dags/veda_data_pipeline/groups/collection_group.py b/sm2a/dags/veda_data_pipeline/groups/collection_group.py new file mode 100644 index 00000000..0a561175 --- /dev/null +++ b/sm2a/dags/veda_data_pipeline/groups/collection_group.py @@ -0,0 +1,91 @@ +import requests +from airflow.models.variable import Variable +from airflow.operators.python import PythonOperator +from airflow.utils.task_group import TaskGroup +from veda_data_pipeline.utils.collection_generation import GenerateCollection +from veda_data_pipeline.utils.submit_stac import submission_handler + +generator = GenerateCollection() + + +def check_collection_exists(endpoint: str, collection_id: str): + """ + Check if a collection exists in the STAC catalog + + Args: + endpoint (str): STAC catalog endpoint + collection_id (str): collection id + """ + response = requests.get(f"{endpoint}/collections/{collection_id}") + return ( + "Collection.existing_collection" + if (response.status_code == 200) + else "Collection.generate_collection" + ) + + +def ingest_collection_task(ti): + """ + Ingest a collection into the STAC catalog + + Args: + dataset (Dict[str, Any]): dataset dictionary (JSON) + role_arn (str): role arn for Zarr collection generation + """ + import json + collection = ti.xcom_pull(task_ids='Collection.generate_collection') + airflow_vars = Variable.get("aws_dags_variables") + airflow_vars_json = json.loads(airflow_vars) + cognito_app_secret = airflow_vars_json.get("COGNITO_APP_SECRET") + stac_ingestor_api_url = airflow_vars_json.get("STAC_INGESTOR_API_URL") + + return submission_handler( + event=collection, + endpoint="/collections", + cognito_app_secret=cognito_app_secret, + stac_ingestor_api_url=stac_ingestor_api_url + ) + + +# NOTE unused, but useful for item ingests, since collections are a dependency for items +def 
check_collection_exists_task(ti): + import json + config = ti.dag_run.conf + airflow_vars = Variable.get("aws_dags_variables") + airflow_vars_json = json.loads(airflow_vars) + stac_url = airflow_vars_json.get("STAC_URL") + return check_collection_exists( + endpoint=stac_url, + collection_id=config.get("collection"), + ) + + +def generate_collection_task(ti): + import json + config = ti.dag_run.conf + airflow_vars = Variable.get("aws_dags_variables") + airflow_vars_json = json.loads(airflow_vars) + role_arn = airflow_vars_json.get("ASSUME_ROLE_READ_ARN") + + # TODO it would be ideal if this also works with complete collections where provided - this would make the collection ingest more re-usable + collection = generator.generate_stac( + dataset_config=config, role_arn=role_arn + ) + return collection + + + +group_kwgs = {"group_id": "Collection", "tooltip": "Collection"} + + +def collection_task_group(): + with TaskGroup(**group_kwgs) as collection_task_grp: + generate_collection = PythonOperator( + task_id="generate_collection", python_callable=generate_collection_task + ) + ingest_collection = PythonOperator( + task_id="ingest_collection", python_callable=ingest_collection_task + ) + generate_collection >> ingest_collection + + return collection_task_grp diff --git a/sm2a/dags/veda_data_pipeline/groups/discover_group.py b/sm2a/dags/veda_data_pipeline/groups/discover_group.py new file mode 100644 index 00000000..63f41dd6 --- /dev/null +++ b/sm2a/dags/veda_data_pipeline/groups/discover_group.py @@ -0,0 +1,89 @@ +from datetime import timedelta +import json +import uuid + +from airflow.models.variable import Variable +from airflow.models.xcom import LazyXComAccess +from airflow.decorators import task +from veda_data_pipeline.utils.s3_discovery import ( + s3_discovery_handler, EmptyFileListError +) + +group_kwgs = {"group_id": "Discover", "tooltip": "Discover"} + + +@task(retries=1, retry_delay=timedelta(minutes=1)) +def discover_from_s3_task(ti=None, event={}, **kwargs): + """Discover grouped assets/files from S3 in batches of 2800. Produce a list of such files stored on S3 to process. + This task is used as part of the discover_group subdag and outputs data to EVENT_BUCKET. + """ + config = { + **event, + **ti.dag_run.conf, + } + # TODO test that this context var is available in taskflow + last_successful_execution = kwargs.get("prev_start_date_success") + if event.get("schedule") and last_successful_execution: + config["last_successful_execution"] = last_successful_execution.isoformat() + # (event, chunk_size=2800, role_arn=None, bucket_output=None): + + airflow_vars = Variable.get("aws_dags_variables") + airflow_vars_json = json.loads(airflow_vars) + event_bucket = airflow_vars_json.get("EVENT_BUCKET") + read_assume_arn = airflow_vars_json.get("ASSUME_ROLE_READ_ARN") + # Making the chunk size small, this helped us process large data faster than + # passing a large chunk of 500 + chunk_size = config.get("chunk_size", 500) + try: + return s3_discovery_handler( + event=config, + role_arn=read_assume_arn, + bucket_output=event_bucket, + chunk_size=chunk_size + ) + except EmptyFileListError as ex: + print(f"Received an exception {ex}") + # TODO test continued short circuit operator behavior (no files -> skip remaining tasks) + return {} + + +@task +def get_files_to_process(payload, ti=None): + """Get files from S3 produced by the discovery task. + Used as part of both the parallel_run_process_rasters and parallel_run_process_vectors tasks. 
+ """ + if isinstance(payload, LazyXComAccess): # if used as part of a dynamic task mapping + payloads_xcom = payload[0].pop("payload", []) + payload = payload[0] + else: + payloads_xcom = payload.pop("payload", []) + dag_run_id = ti.dag_run.run_id + return [{ + "run_id": f"{dag_run_id}_{uuid.uuid4()}_{indx}", + **payload, + "payload": payload_xcom, + } for indx, payload_xcom in enumerate(payloads_xcom)] + + +@task +def get_dataset_files_to_process(payload, ti=None): + """Get files from S3 produced by the dataset task. + This is different from the get_files_to_process task as it produces a combined structure from repeated mappings. + """ + dag_run_id = ti.dag_run.run_id + + result = [] + for x in payload: + if isinstance(x, LazyXComAccess): # if used as part of a dynamic task mapping + payloads_xcom = x[0].pop("payload", []) + payload_0 = x[0] + else: + payloads_xcom = x.pop("payload", []) + payload_0 = x + for indx, payload_xcom in enumerate(payloads_xcom): + result.append({ + "run_id": f"{dag_run_id}_{uuid.uuid4()}_{indx}", + **payload_0, + "payload": payload_xcom, + }) + return result diff --git a/sm2a/dags/veda_data_pipeline/groups/ecs_tasks.py b/sm2a/dags/veda_data_pipeline/groups/ecs_tasks.py new file mode 100644 index 00000000..2c8852e2 --- /dev/null +++ b/sm2a/dags/veda_data_pipeline/groups/ecs_tasks.py @@ -0,0 +1,117 @@ +import json + +from airflow.hooks.base import BaseHook +from airflow.providers.amazon.aws.operators.ecs import ( + EcsDeregisterTaskDefinitionOperator, + EcsRegisterTaskDefinitionOperator, + EcsRunTaskOperator, +) +from airflow.utils.task_group import TaskGroup +from airflow.utils.trigger_rule import TriggerRule + + +def get_aws_keys_from_connection(connection_id="aws_default"): + conn = BaseHook.get_connection(connection_id) + return { + "AWS_ACCESS_KEY_ID": conn.login, + "AWS_SECRET_ACCESS_KEY": conn.password, + "AWS_DEFAULT_REGION": json.loads(conn.extra).get("region_name", "us-west-2"), + } + + +group_kwgs = {"group_id": "ECSTasks", "tooltip": "ECSTasks"} + + +def subdag_ecs_task( + task_id, + task_definition_family, + container_name, + docker_image, + cmd: str, + mwaa_stack_conf, + aws_region="us-west-2", + cpu="256", + memory="512", + stage="dev", + environment_vars=None, +): + if environment_vars is None: + environment_vars = list() + with TaskGroup(**group_kwgs) as ecs_task_grp: + if stage == "local": + from airflow.providers.docker.operators.docker import DockerOperator + + return DockerOperator( + task_id=task_id, + container_name=container_name, + image=docker_image, + api_version="auto", + auto_remove=True, + command=cmd, + environment=get_aws_keys_from_connection(), + docker_url="tcp://docker-in-docker:2375", + mount_tmp_dir=False, + network_mode="bridge", + ) + + register_task = EcsRegisterTaskDefinitionOperator( + task_id=f"{task_id}_task_register", + family=task_definition_family, + trigger_rule=TriggerRule.ONE_SUCCESS, + container_definitions=[ + { + "name": container_name, + "image": docker_image, + "entryPoint": ["sh", "-c"], + "command": ["ls"], + "logConfiguration": { + "logDriver": "awslogs", + "options": { + "awslogs-group": mwaa_stack_conf.get("LOG_GROUP_NAME"), + "awslogs-region": aws_region, + "awslogs-stream-prefix": "ecs", + }, + }, + } + ], + register_task_kwargs={ + "cpu": cpu, + "memory": memory, + "networkMode": "awsvpc", + "taskRoleArn": mwaa_stack_conf.get("MWAA_EXECUTION_ROLE_ARN"), + "executionRoleArn": mwaa_stack_conf.get("MWAA_EXECUTION_ROLE_ARN"), + "requiresCompatibilities": ["FARGATE"], + }, + ) + ecs_task_run = 
EcsRunTaskOperator( + task_id=task_id, + cluster=mwaa_stack_conf.get("ECS_CLUSTER_NAME"), + task_definition=register_task.output, + launch_type="FARGATE", + do_xcom_push=True, + overrides={ + "containerOverrides": [ + { + "name": container_name, + "command": [cmd], + "environment": environment_vars, + }, + ], + }, + network_configuration={ + "awsvpcConfiguration": { + "securityGroups": mwaa_stack_conf.get("SECURITYGROUPS"), + "subnets": mwaa_stack_conf.get("SUBNETS"), + }, + }, + awslogs_region="us-west-2", + awslogs_group=mwaa_stack_conf.get("LOG_GROUP_NAME"), + awslogs_stream_prefix=f"ecs/{container_name}", + ) + deregister_task = EcsDeregisterTaskDefinitionOperator( + task_id=f"{task_id}_deregister_task", + task_definition=register_task.output, + ) + + register_task >> ecs_task_run >> deregister_task + return ecs_task_grp diff --git a/sm2a/dags/veda_data_pipeline/groups/processing_tasks.py b/sm2a/dags/veda_data_pipeline/groups/processing_tasks.py new file mode 100644 index 00000000..48758fcc --- /dev/null +++ b/sm2a/dags/veda_data_pipeline/groups/processing_tasks.py @@ -0,0 +1,40 @@ +from datetime import timedelta +import json +import logging +import smart_open +from airflow.models.variable import Variable +from airflow.decorators import task +from veda_data_pipeline.utils.submit_stac import submission_handler + +group_kwgs = {"group_id": "Process", "tooltip": "Process"} + + +def log_task(text: str): + logging.info(text) + + +@task(retries=1, retry_delay=timedelta(minutes=1)) +def submit_to_stac_ingestor_task(built_stac: dict): + """Submit STAC items to the STAC ingestor API.""" + event = built_stac.copy() + success_file = event["payload"]["success_event_key"] + + airflow_vars = Variable.get("aws_dags_variables") + airflow_vars_json = json.loads(airflow_vars) + cognito_app_secret = airflow_vars_json.get("COGNITO_APP_SECRET") + stac_ingestor_api_url = airflow_vars_json.get("STAC_INGESTOR_API_URL") + with smart_open.open(success_file, "r") as _file: + stac_items = json.loads(_file.read()) + + for item in stac_items: + submission_handler( + event=item, + endpoint="/ingestions", + cognito_app_secret=cognito_app_secret, + stac_ingestor_api_url=stac_ingestor_api_url, + ) + return event + + + + diff --git a/sm2a/dags/veda_data_pipeline/groups/transfer_group.py b/sm2a/dags/veda_data_pipeline/groups/transfer_group.py new file mode 100644 index 00000000..3bf30e1f --- /dev/null +++ b/sm2a/dags/veda_data_pipeline/groups/transfer_group.py @@ -0,0 +1,65 @@ +from datetime import timedelta + +from airflow.models.variable import Variable +from airflow.operators.python import BranchPythonOperator, PythonOperator +import json +from airflow.utils.task_group import TaskGroup +from airflow.utils.trigger_rule import TriggerRule + +group_kwgs = {"group_id": "Transfer", "tooltip": "Transfer"} + + +def cogify_choice(ti): + """Choos whether to cogify or not; if yes, use a docker container""" + payload = ti.dag_run.conf + + if payload.get("cogify"): + return f"{group_kwgs['group_id']}.cogify_and_copy_data" + else: + return f"{group_kwgs['group_id']}.copy_data" + + +def cogify_copy_task(ti): + from veda_data_pipeline.utils.cogify_transfer.handler import cogify_transfer_handler + config = ti.dag_run.conf + airflow_vars = Variable.get("aws_dags_variables") + airflow_vars_json = json.loads(airflow_vars) + external_role_arn = airflow_vars_json.get("ASSUME_ROLE_WRITE_ARN") + return cogify_transfer_handler(event_src=config, external_role_arn=external_role_arn) + + +def transfer_data(ti): + """Transfer data from one S3 
bucket to another; s3 copy, no need for docker""" + from veda_data_pipeline.utils.transfer import ( + data_transfer_handler, + ) + config = ti.dag_run.conf + airflow_vars = Variable.get("aws_dags_variables") + airflow_vars_json = json.loads(airflow_vars) + external_role_arn = airflow_vars_json.get("ASSUME_ROLE_WRITE_ARN") + # (event, chunk_size=2800, role_arn=None, bucket_output=None): + return data_transfer_handler(event=config, role_arn=external_role_arn) + + +# TODO: cogify_transfer handler is missing arg parser so this subdag will not work +def subdag_transfer(): + with TaskGroup(**group_kwgs) as discover_grp: + cogify_branching = BranchPythonOperator( + task_id="cogify_branching", + trigger_rule=TriggerRule.ONE_SUCCESS, + python_callable=cogify_choice, + ) + + run_copy = PythonOperator( + task_id="copy_data", + python_callable=transfer_data, + op_kwargs={"text": "Copy files on S3"}, + ) + run_cogify_copy = PythonOperator( + task_id="cogify_and_copy_data", + trigger_rule="none_failed", + python_callable=cogify_copy_task + ) + + (cogify_branching >> [run_copy, run_cogify_copy]) + return discover_grp diff --git a/sm2a/dags/veda_data_pipeline/requirements_dev.txt b/sm2a/dags/veda_data_pipeline/requirements_dev.txt new file mode 100644 index 00000000..e21ff359 --- /dev/null +++ b/sm2a/dags/veda_data_pipeline/requirements_dev.txt @@ -0,0 +1 @@ +requests_mock==1.12.1 \ No newline at end of file diff --git a/sm2a/dags/veda_data_pipeline/utils/README.md b/sm2a/dags/veda_data_pipeline/utils/README.md new file mode 100644 index 00000000..42c1d982 --- /dev/null +++ b/sm2a/dags/veda_data_pipeline/utils/README.md @@ -0,0 +1,26 @@ +# Data Pipeline Utils + +## submit_stac + +Test with python locally (uses example data in [hlss30_stac_example.ndjson](./hlss30_stac_example.ndjson)) + +```bash +python -m submit_stac +``` + +---------------- + +## s3_discovery + +Module to query an `s3` bucket to discover COGs +```bash +docker build -t s3-discovery . +# Currently runs an example for OMI Ozone +docker run s3-discovery python -m s3_discovery_handler +``` + +To run this locally, you may need to pass your AWS credentials to the module: `docker run -e AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY s3-discovery python -m s3_discovery_handler` + +AWS Provisioning +This Lambda needs to list the contents of an S3 bucket in order to discover files.
+- Add `s3:ListBucket` to the Lambda's execution role diff --git a/sm2a/dags/veda_data_pipeline/utils/__init__.py b/sm2a/dags/veda_data_pipeline/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/sm2a/dags/veda_data_pipeline/utils/build_stac/handler.py b/sm2a/dags/veda_data_pipeline/utils/build_stac/handler.py new file mode 100644 index 00000000..a13350b0 --- /dev/null +++ b/sm2a/dags/veda_data_pipeline/utils/build_stac/handler.py @@ -0,0 +1,133 @@ +import json +from typing import Any, Dict, TypedDict, Union +from uuid import uuid4 +import smart_open +from veda_data_pipeline.utils.build_stac.utils import events +from veda_data_pipeline.utils.build_stac.utils import stac +from concurrent.futures import ThreadPoolExecutor, as_completed + + + +class S3LinkOutput(TypedDict): + stac_file_url: str + + +def using_pool(objects, workers_count: int): + returned_results = [] + with ThreadPoolExecutor(max_workers=workers_count) as executor: + # Submit tasks to the executor + futures = {executor.submit(handler, obj): obj for obj in objects} + + for future in as_completed(futures): + try: + result = future.result() # Get result from future + returned_results.append(result) + except Exception as nex: + print(f"Error {nex} with object {futures[future]}") + + return returned_results + + +class StacItemOutput(TypedDict): + stac_item: Dict[str, Any] + + +def handler(event: Dict[str, Any]) -> Union[S3LinkOutput, StacItemOutput]: + """ + Handler for STAC Collection Item generation + + Arguments: + event - object with event parameters + { + "collection": "OMDOAO3e", + "id_regex": "_(.*).tif", + "assets": { + "OMDOAO3e_LUT": { + "title": "OMDOAO3e_LUT", + "description": "OMDOAO3e_LUT, described", + "href": "s3://climatedashboard-data/OMDOAO3e/OMDOAO3e_LUT.tif", + }, + "OMDOAO3e_LUT": { + "title": "OMDOAO3e_LUT", + "description": "OMDOAO3e_LUT, described", + "href": "s3://climatedashboard-data/OMDOAO3e/OMDOAO3e_LUT.tif", + } + } + } + + """ + + parsed_event = events.RegexEvent.parse_obj(event) + try: + stac_item = stac.generate_stac(parsed_event).to_dict() + except Exception as ex: + out_err: StacItemOutput = {"stac_item": {"error": f"{ex}", "event": event}} + return out_err + + output: StacItemOutput = {"stac_item": stac_item} + return output + + +def sequential_processing(objects): + returned_results = [] + for _object in objects: + result = handler(_object) + returned_results.append(result) + return returned_results + + +def write_outputs_to_s3(key, payload_success, payload_failures): + success_key = f"{key}/build_stac_output_{uuid4()}.json" + with smart_open.open(success_key, "w") as _file: + _file.write(json.dumps(payload_success)) + dead_letter_key = "" + if payload_failures: + dead_letter_key = f"{key}/dead_letter_events/build_stac_failed_{uuid4()}.json" + with smart_open.open(dead_letter_key, "w") as _file: + _file.write(json.dumps(payload_failures)) + return [success_key, dead_letter_key] + + +def stac_handler(payload_src: dict, bucket_output): + payload_event = payload_src.copy() + s3_event = payload_event.pop("payload") + collection = payload_event.get("collection", "not_provided") + key = f"s3://{bucket_output}/events/{collection}" + payload_success = [] + payload_failures = [] + with smart_open.open(s3_event, "r") as _file: + s3_event_read = _file.read() + event_received = json.loads(s3_event_read) + objects = event_received["objects"] + use_multithreading = payload_event.get("use_multithreading", True) + payloads = ( + using_pool(objects, workers_count=4) + if 
use_multithreading + else sequential_processing(objects) + ) + for payload in payloads: + stac_item = payload["stac_item"] + if "error" in stac_item: + payload_failures.append(stac_item) + else: + payload_success.append(stac_item) + success_key, dead_letter_key = write_outputs_to_s3( + key=key, payload_success=payload_success, payload_failures=payload_failures + ) + + # Silent dead letters are nice, but we want the Airflow UI to quickly alert us if something went wrong. + if len(payload_failures) != 0: + raise ValueError( + f"Some items failed to be processed. Failures logged here: {dead_letter_key}" + ) + + return { + "payload": { + "success_event_key": success_key, + "failed_event_key": dead_letter_key, + "status": { + "successes": len(payload_success), + "failures": len(payload_failures), + }, + } + } diff --git a/sm2a/dags/veda_data_pipeline/utils/build_stac/utils/__init__.py b/sm2a/dags/veda_data_pipeline/utils/build_stac/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/sm2a/dags/veda_data_pipeline/utils/build_stac/utils/events.py b/sm2a/dags/veda_data_pipeline/utils/build_stac/utils/events.py new file mode 100644 index 00000000..c7e835b9 --- /dev/null +++ b/sm2a/dags/veda_data_pipeline/utils/build_stac/utils/events.py @@ -0,0 +1,19 @@ +from datetime import datetime +from typing import Dict, Literal, Optional + +from pydantic import BaseModel, Field + +INTERVAL = Literal["month", "year", "day"] + + +class RegexEvent(BaseModel, frozen=True): + collection: str + item_id: str + assets: Dict + + start_datetime: Optional[datetime] = None + end_datetime: Optional[datetime] = None + single_datetime: Optional[datetime] = None + + properties: Optional[Dict] = Field(default_factory=dict) + datetime_range: Optional[INTERVAL] = None diff --git a/sm2a/dags/veda_data_pipeline/utils/build_stac/utils/regex.py b/sm2a/dags/veda_data_pipeline/utils/build_stac/utils/regex.py new file mode 100644 index 00000000..471832ee --- /dev/null +++ b/sm2a/dags/veda_data_pipeline/utils/build_stac/utils/regex.py @@ -0,0 +1,91 @@ +import re +from datetime import datetime +from typing import Callable, Dict, Tuple, Union + +from dateutil.relativedelta import relativedelta + +from . import events + +DATERANGE = Tuple[datetime, datetime] + + +def _calculate_year_range(datetime_obj: datetime) -> DATERANGE: + start_datetime = datetime_obj.replace(month=1, day=1) + end_datetime = datetime_obj.replace(month=12, day=31) + return start_datetime, end_datetime + + +def _calculate_month_range(datetime_obj: datetime) -> DATERANGE: + start_datetime = datetime_obj.replace(day=1) + end_datetime = datetime_obj + relativedelta(day=31) + return start_datetime, end_datetime + + +def _calculate_day_range(datetime_obj: datetime) -> DATERANGE: + start_datetime = datetime_obj + end_datetime = datetime_obj + relativedelta(hour=23, minute=59, second=59) + return start_datetime, end_datetime + + +DATETIME_RANGE_METHODS: Dict[events.INTERVAL, Callable[[datetime], DATERANGE]] = { + "month": _calculate_month_range, + "year": _calculate_year_range, + "day": _calculate_day_range, +} + + +def extract_dates( + filename: str, datetime_range: events.INTERVAL +) -> Union[Tuple[datetime, datetime, None], Tuple[None, None, datetime]]: + """ + Extracts start & end or single date string from filename. 
+ """ + DATE_REGEX_STRATEGIES = [ + (r"_(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})", "%Y-%m-%dT%H:%M:%S"), + (r"_(\d{8}T\d{6})", "%Y%m%dT%H%M%S"), + (r"_(\d{4}_\d{2}_\d{2})", "%Y_%m_%d"), + (r"_(\d{4}-\d{2}-\d{2})", "%Y-%m-%d"), + (r"_(\d{8})", "%Y%m%d"), + (r"_(\d{6})", "%Y%m"), + (r"_(\d{4})", "%Y"), + ] + + # Find dates in filename + dates = [] + for pattern, dateformat in DATE_REGEX_STRATEGIES: + dates_found = re.compile(pattern).findall(filename) + if not dates_found: + continue + + for date_str in dates_found: + dates.append(datetime.strptime(date_str, dateformat)) + + break + + num_dates_found = len(dates) + + # No dates found + if not num_dates_found: + raise Exception( + f"No dates provided in {filename=}. " + "At least one date in format yyyy-mm-dd is required." + ) + + # Many dates found + if num_dates_found > 1: + dates.sort() + start_datetime, *_, end_datetime = dates + return start_datetime, end_datetime, None + + # Single date found + single_datetime = dates[0] + + # Convert single date to range + if datetime_range: + start_datetime, end_datetime = DATETIME_RANGE_METHODS[datetime_range]( + single_datetime + ) + return start_datetime, end_datetime, None + + # Return single date + return None, None, single_datetime diff --git a/sm2a/dags/veda_data_pipeline/utils/build_stac/utils/role.py b/sm2a/dags/veda_data_pipeline/utils/build_stac/utils/role.py new file mode 100644 index 00000000..817c0ad3 --- /dev/null +++ b/sm2a/dags/veda_data_pipeline/utils/build_stac/utils/role.py @@ -0,0 +1,10 @@ +import boto3 + + +def assume_role(role_arn, session_name): + sts = boto3.client("sts") + creds = sts.assume_role( + RoleArn=role_arn, + RoleSessionName=session_name, + ) + return creds["Credentials"] diff --git a/sm2a/dags/veda_data_pipeline/utils/build_stac/utils/stac.py b/sm2a/dags/veda_data_pipeline/utils/build_stac/utils/stac.py new file mode 100644 index 00000000..9ec69f14 --- /dev/null +++ b/sm2a/dags/veda_data_pipeline/utils/build_stac/utils/stac.py @@ -0,0 +1,142 @@ +import os + +import pystac +import rasterio +from pystac.utils import datetime_to_str +from rasterio.session import AWSSession +from rio_stac import stac +from rio_stac.stac import PROJECTION_EXT_VERSION, RASTER_EXT_VERSION + + +from . 
import events, regex, role + + +def get_sts_session(): + if role_arn := os.environ.get("EXTERNAL_ROLE_ARN"): + creds = role.assume_role(role_arn, "veda-data-pipelines_build-stac") + return AWSSession( + aws_access_key_id=creds["AccessKeyId"], + aws_secret_access_key=creds["SecretAccessKey"], + aws_session_token=creds["SessionToken"], + ) + return + + +def create_item( + item_id, + bbox, + properties, + datetime, + collection, + assets, +) -> pystac.Item: + """ + Function to create a stac item from a COG using rio_stac + """ + # item + item = pystac.Item( + id=item_id, + geometry=stac.bbox_to_geom(bbox), + bbox=bbox, + collection=collection, + stac_extensions=[ + f"https://stac-extensions.github.io/raster/{RASTER_EXT_VERSION}/schema.json", + f"https://stac-extensions.github.io/projection/{PROJECTION_EXT_VERSION}/schema.json", + ], + datetime=datetime, + properties=properties, + ) + + # if we add a collection we MUST add a link + if collection: + item.add_link( + pystac.Link( + pystac.RelType.COLLECTION, + collection, + media_type=pystac.MediaType.JSON, + ) + ) + + for key, asset in assets.items(): + item.add_asset(key=key, asset=asset) + return item + + +def generate_stac(event: events.RegexEvent) -> pystac.Item: + """ + Generate STAC item from user provided datetime range or regex & filename + """ + start_datetime = end_datetime = single_datetime = None + if event.start_datetime and event.end_datetime: + start_datetime = event.start_datetime + end_datetime = event.end_datetime + single_datetime = None + elif single_datetime := event.single_datetime: + start_datetime = end_datetime = None + single_datetime = single_datetime + else: + # Having multiple assets, we try against all filenames. + for asset_name, asset in event.assets.items(): + try: + filename = asset["href"].split("/")[-1] + start_datetime, end_datetime, single_datetime = regex.extract_dates( + filename, event.datetime_range + ) + break + except Exception: + continue + # Raise if dates can't be found + if not (start_datetime or end_datetime or single_datetime): + raise ValueError("No dates found in event config or by regex") + + properties = event.properties or {} + if start_datetime and end_datetime: + properties["start_datetime"] = datetime_to_str(start_datetime) + properties["end_datetime"] = datetime_to_str(end_datetime) + single_datetime = None + assets = {} + + rasterio_kwargs = {} + rasterio_kwargs["session"] = get_sts_session() + with rasterio.Env( + session=rasterio_kwargs.get("session"), + options={**rasterio_kwargs}, + ): + bboxes = [] + for asset_name, asset_definition in event.assets.items(): + with rasterio.open(asset_definition["href"]) as src: + # Get BBOX and Footprint + dataset_geom = stac.get_dataset_geom(src, densify_pts=0, precision=-1) + bboxes.append(dataset_geom["bbox"]) + + media_type = stac.get_media_type(src) + proj_info = { + f"proj:{name}": value + for name, value in stac.get_projection_info(src).items() + } + raster_info = {"raster:bands": stac.get_raster_info(src, max_size=1024)} + + # The default asset name for cogs is "cog_default", so we need to intercept 'default' + if asset_name == "default": + asset_name = "cog_default" + assets[asset_name] = pystac.Asset( + title=asset_definition["title"], + description=asset_definition["description"], + href=asset_definition["href"], + media_type=media_type, + roles=["data", "layer"], + extra_fields={**proj_info, **raster_info}, + ) + + minx, miny, maxx, maxy = zip(*bboxes) + bbox = [min(minx), min(miny), max(maxx), max(maxy)] + + create_item_response = 
create_item( + item_id=event.item_id, + bbox=bbox, + properties=properties, + datetime=single_datetime, + collection=event.collection, + assets=assets, + ) + return create_item_response diff --git a/sm2a/dags/veda_data_pipeline/utils/cogify_transfer/handler.py b/sm2a/dags/veda_data_pipeline/utils/cogify_transfer/handler.py new file mode 100644 index 00000000..0e9db5eb --- /dev/null +++ b/sm2a/dags/veda_data_pipeline/utils/cogify_transfer/handler.py @@ -0,0 +1,86 @@ +import re +import tempfile + +import boto3 +from rio_cogeo.cogeo import cog_translate + + +def assume_role(role_arn, session_name="veda-airflow-pipelines_transfer_files"): + sts = boto3.client("sts") + credentials = sts.assume_role( + RoleArn=role_arn, + RoleSessionName=session_name, + ) + creds = credentials["Credentials"] + return { + "aws_access_key_id": creds["AccessKeyId"], + "aws_secret_access_key": creds.get("SecretAccessKey"), + "aws_session_token": creds.get("SessionToken"), + } + + +def get_matching_files(s3_client, bucket, prefix, regex_pattern): + matching_files = [] + + response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix) + while True: + for obj in response["Contents"]: + file_key = obj["Key"] + if re.match(regex_pattern, file_key): + matching_files.append(file_key) + + if "NextContinuationToken" in response: + response = s3_client.list_objects_v2( + Bucket=bucket, + Prefix=prefix, + ContinuationToken=response["NextContinuationToken"], + ) + else: + break + + return matching_files + + +def transfer_file(s3_client, file_key, local_file_path, destination_bucket, collection): + filename = file_key.split("/")[-1] + target_key = f"{collection}/{filename}" + s3_client.upload_file(local_file_path, destination_bucket, target_key) + + +def cogify_transfer_handler(event_src, external_role_arn=None): + event = event_src.copy() + kwargs = {} + if external_role_arn: + creds = assume_role(external_role_arn, "veda-data-pipelines_data-transfer") + kwargs = { + "aws_access_key_id": creds["AccessKeyId"], + "aws_secret_access_key": creds["SecretAccessKey"], + "aws_session_token": creds["SessionToken"], + } + source_s3 = boto3.client("s3") + target_s3 = boto3.client("s3", **kwargs) + + origin_bucket = event.get("origin_bucket") + origin_prefix = event.get("origin_prefix") + regex_pattern = event.get("filename_regex") + target_bucket = event.get("target_bucket", "veda-data-store-staging") + collection = event.get("collection") + + matching_files = get_matching_files( + source_s3, origin_bucket, origin_prefix, regex_pattern + ) + if not event.get("dry_run"): + for origin_key in matching_files: + with tempfile.NamedTemporaryFile() as local_tif, tempfile.NamedTemporaryFile() as local_cog: + local_tif_path = local_tif.name + local_cog_path = local_cog.name + source_s3.download_file(origin_bucket, origin_key, local_tif_path) + cog_translate(local_tif_path, local_cog_path, quiet=True) + filename = origin_key.split("/")[-1] + destination_key = f"{collection}/{filename}" + target_s3.upload_file(local_cog_path, target_bucket, destination_key) + else: + print( + f"Would have copied {len(matching_files)} files from {origin_bucket} to {target_bucket}" + ) + print(f"Files matched: {matching_files}") diff --git a/sm2a/dags/veda_data_pipeline/utils/cogify_transfer/requirements.txt b/sm2a/dags/veda_data_pipeline/utils/cogify_transfer/requirements.txt new file mode 100644 index 00000000..56e091b1 --- /dev/null +++ b/sm2a/dags/veda_data_pipeline/utils/cogify_transfer/requirements.txt @@ -0,0 +1,11 @@ +aws-lambda-powertools +awslambdaric 
+boto3
+pystac==1.4.0
+python-cmr
+rasterio==1.3.3
+rio-cogeo==4.0.0
+shapely
+smart-open==6.3.0
+pydantic==1.10.7
+typing-extensions==4.5.0
diff --git a/sm2a/dags/veda_data_pipeline/utils/collection_generation.py b/sm2a/dags/veda_data_pipeline/utils/collection_generation.py
new file mode 100644
index 00000000..abba2de5
--- /dev/null
+++ b/sm2a/dags/veda_data_pipeline/utils/collection_generation.py
@@ -0,0 +1,138 @@
+from typing import Any, Dict
+
+import fsspec
+import xarray as xr
+import xstac
+from veda_data_pipeline.utils.schemas import SpatioTemporalExtent
+from datetime import datetime, timezone
+
+
+class GenerateCollection:
+    common = {
+        "links": [],
+        "extent": {
+            "spatial": {"bbox": [[-180, -90, 180, 90]]},
+            "temporal": {"interval": [[None, None]]},
+        },
+        "type": "Collection",
+        "stac_version": "1.0.0",
+    }
+    keys_to_ignore = [
+        "collection",
+        "data_type",
+        "sample_files",
+        "discovery_items",
+        "spatial_extent",
+        "temporal_extent",
+        "is_periodic",
+        "time_density",
+        "type",
+    ]
+
+    def get_template(self, dataset: Dict[str, Any]) -> dict:
+        extra_fields = {
+            key: dataset[key]
+            for key in dataset.keys()
+            if key not in GenerateCollection.keys_to_ignore
+        }
+
+        collection_dict = {
+            "id": dataset["collection"],
+            **GenerateCollection.common,
+            **extra_fields,
+        }
+
+        # Default REQUIRED fields
+        if not collection_dict.get("description"):
+            collection_dict["description"] = dataset["collection"]
+        if not collection_dict.get("license"):
+            collection_dict["license"] = "proprietary"
+
+        return collection_dict
+
+    def _create_zarr_template(self, dataset: Dict[str, Any], store_path: str) -> dict:
+        template = self.get_template(dataset)
+        template["assets"] = {
+            "zarr": {
+                "href": store_path,
+                "title": "Zarr Array Store",
+                "description": "Zarr array store with one or several arrays (variables)",
+                "roles": ["data", "zarr"],
+                "type": "application/vnd+zarr",
+                "xarray:open_kwargs": {
+                    "engine": "zarr",
+                    "chunks": {},
+                    **dataset.xarray_kwargs,
+                },
+            }
+        }
+        return template
+
+    def create_zarr_collection(self, dataset: Dict[str, Any], role_arn: str) -> dict:
+        """
+        Creates a zarr stac collection based on the user input
+        """
+        discovery = dataset.discovery_items[0]
+        store_path = f"s3://{discovery.bucket}/{discovery.prefix}{discovery.zarr_store}"
+        template = self._create_zarr_template(dataset, store_path)
+
+        fs = fsspec.filesystem("s3", anon=False, role_arn=role_arn)
+        store = fs.get_mapper(store_path)
+        ds = xr.open_zarr(
+            store, consolidated=bool(dataset.xarray_kwargs.get("consolidated"))
+        )
+
+        collection = xstac.xarray_to_stac(
+            ds,
+            template,
+            temporal_dimension=dataset.temporal_dimension or "time",
+            x_dimension=dataset.x_dimension or "lon",
+            y_dimension=dataset.y_dimension or "lat",
+            reference_system=dataset.reference_system or 4326,
+        )
+        return collection.to_dict()
+
+    def create_cog_collection(self, dataset: Dict[str, Any]) -> dict:
+        collection_stac = self.get_template(dataset)
+
+        # Override the extents if they exist
+        if spatial_extent := dataset.get("spatial_extent"):
+            collection_stac["extent"]["spatial"] = {"bbox": [list(spatial_extent.values())]}
+
+        if temporal_extent := dataset.get("temporal_extent"):
+            collection_stac["extent"]["temporal"] = {
+                "interval": [
+                    # most of our data uses the Z suffix for UTC - isoformat() doesn't
+                    [
+                        datetime.fromisoformat(x).astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
+                        if x else None
+                        for x in list(temporal_extent.values())
+                    ]
+                ]
+            }
+
+        collection_stac["item_assets"] = {
"cog_default": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "roles": ["data", "layer"], + "title": "Default COG Layer", + "description": "Cloud optimized default layer to display on map", + } + } + return collection_stac + + def generate_stac( + self, dataset_config: Dict[str, Any], role_arn: str = None + ) -> dict: + """ + Generates a STAC collection based on the dataset and data type + + Args: + dataset_config (Dict[str, Any]): dataset configuration + role_arn (str): role arn for Zarr collection generation + """ + data_type = dataset_config.get("data_type", "cog") + if data_type == "zarr": + return self.create_zarr_collection(dataset_config, role_arn) + else: + return self.create_cog_collection(dataset_config) diff --git a/sm2a/dags/veda_data_pipeline/utils/s3_discovery.py b/sm2a/dags/veda_data_pipeline/utils/s3_discovery.py new file mode 100644 index 00000000..4b164298 --- /dev/null +++ b/sm2a/dags/veda_data_pipeline/utils/s3_discovery.py @@ -0,0 +1,292 @@ +import itertools +import json +import os +import re +from typing import List +from uuid import uuid4 +from pathlib import Path + +from datetime import datetime +from dateutil.tz import tzlocal +import boto3 +from smart_open import open as smrt_open + + +# Adding a custom exception for empty list +class EmptyFileListError(Exception): + def __init__(self, error_message): + self.error_message = error_message + super().__init__(self.error_message) + + +def assume_role(role_arn, session_name="veda-data-pipelines_s3-discovery"): + sts = boto3.client("sts") + credentials = sts.assume_role( + RoleArn=role_arn, + RoleSessionName=session_name, + ) + creds = credentials["Credentials"] + return { + "aws_access_key_id": creds["AccessKeyId"], + "aws_secret_access_key": creds.get("SecretAccessKey"), + "aws_session_token": creds.get("SessionToken"), + } + + +def get_s3_resp_iterator(bucket_name, prefix, s3_client, page_size=1000): + """ + Returns an s3 paginator. + :param bucket_name: The bucket. + :param prefix: The path for the s3 granules. + :param s3_client: Initialized boto3 S3 client + :param page_size: Number of records returned + """ + s3_paginator = s3_client.get_paginator("list_objects") + print(f"Getting S3 response iterator for bucket: {bucket_name}, prefix: {prefix}") + return s3_paginator.paginate( + Bucket=bucket_name, Prefix=prefix, PaginationConfig={"page_size": page_size} + ) + + +def discover_from_s3( + response_iterator, filename_regex: str, last_execution: datetime +) -> dict: + """Iterate through pages of S3 objects returned by a ListObjectsV2 operation. + The discover_from_s3 function takes in an iterator over the pages of S3 objects returned + by a ListObjectsV2 operation. It iterates through the pages and yields each S3 object in the page as a dictionary. + This function can be used to iterate through a large number of S3 objects returned by a ListObjectsV2 operation + without having to load all the objects into memory at once. + + Parameters: + response_iterator (iter): An iterator over the pages of S3 objects returned by a ListObjectsV2 operation. + filename_regex (str): A regular expression used to filter the S3 objects returned by the ListObjectsV2 operation. + + Yields: + dict: A dictionary representing an S3 object. 
+ """ + for page in response_iterator: + for s3_object in page.get("Contents", {}): + key = s3_object["Key"] + conditionals = [re.match(filename_regex, key)] + if last_execution: + last_modified = s3_object["LastModified"] + conditionals.append(last_modified > last_execution) + if all(conditionals): + yield s3_object + + +def group_by_item(discovered_files: List[str], id_regex: str, assets: dict) -> dict: + """Group assets by matching regex patterns against discovered files.""" + grouped_files = [] + for uri in discovered_files: + # Each file gets its matched asset type and id + filename = uri.split("/")[-1] + prefix = "/".join(uri.split("/")[:-1]) + asset_type = None + if match := re.match(id_regex, filename): + # At least one match; can use the match here to construct an ID (match groups separated by '-') + item_id = "-".join(match.groups()) + for asset_name, asset_definition in assets.items(): + regex = asset_definition["regex"] + if re.match(regex, filename): + asset_type = asset_name + break + if asset_type: + grouped_files.append( + { + "prefix": prefix, + "filename": filename, + "asset_type": asset_type, + "item_id": item_id, + } + ) + else: + print(f"Warning: skipping file. No id match found: {filename}") + # At this point, files are labeled with type and id. Now, group them by id + sorted_list = sorted(grouped_files, key=lambda x: x["item_id"]) + grouped_data = [ + {"item_id": key, "data": list(group)} + for key, group in itertools.groupby(sorted_list, key=lambda x: x["item_id"]) + ] + items_with_assets = [] + # Produce a dictionary in which each record is keyed by an item ID and contains a list of associated asset hrefs + for group in grouped_data: + item = {"item_id": group["item_id"], "assets": {}} + for file in group["data"]: + asset_type = file["asset_type"] + filename = file["filename"] + # Copy the asset definition and update the href + updated_asset = assets[file["asset_type"]].copy() + updated_asset["href"] = f"{file['prefix']}/{file['filename']}" + item["assets"][asset_type] = updated_asset + items_with_assets.append(item) + return items_with_assets + + +def construct_single_asset_items(discovered_files: List[str], assets: dict|None) -> dict: + items_with_assets = [] + asset_key = "default" + asset_value = {} + if assets: + asset_key = list(assets.keys())[0] + asset_value = assets[asset_key] + for uri in discovered_files: + # Each file gets its matched asset type and id + filename = uri.split("/")[-1] + filename_without_extension = Path(filename).stem + prefix = "/".join(uri.split("/")[:-1]) + item = { + "item_id": filename_without_extension, + "assets": { + asset_key: { + "title": "Default COG Layer", + "description": "Cloud optimized default layer to display on map", + "href": f"{prefix}/{filename}", + **asset_value + } + }, + } + items_with_assets.append(item) + return items_with_assets + + +def generate_payload(s3_prefix_key: str, payload: dict): + """Generate a payload and write it to an S3 file. + This function takes in a prefix for an S3 key and a dictionary containing a payload. + The function then writes the payload to an S3 file using the provided prefix and a randomly + generated UUID as the key. The key of the output file is then returned. + Parameters: + s3_prefix_key (str): The prefix for the S3 key where the output file will be written. + payload (dict): A dictionary containing the payload to be written to the output file. + + Returns: + str: The S3 key of the output file. 
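+    Example (illustrative): with s3_prefix_key="s3://<bucket>/events/<collection>",
+    the returned key looks like "s3://<bucket>/events/<collection>/s3_discover_output_<uuid4>.json".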
+ """ + output_key = f"{s3_prefix_key}/s3_discover_output_{uuid4()}.json" + with smrt_open(output_key, "w") as file: + file.write(json.dumps(payload)) + return output_key + + +def propagate_forward_datetime_args(event): + """ + This function extracts datetime-related arguments from the input event dictionary. + The purpose is to forward these datetime arguments to other functions that may require them. + + The function looks for the keys "single_datetime", "start_datetime", "end_datetime", + and "datetime_range" in the event dictionary. If any of these keys are present, + it includes them in the output dictionary. + + Parameters: + event (dict): Input dictionary potentially containing datetime arguments. + + Returns: + dict: A new dictionary containing only the datetime-related keys from the input + that were present. If none of the specified keys are in the event, + the function returns an empty dictionary. + """ + keys = ["single_datetime", "start_datetime", "end_datetime", "datetime_range"] + return {key: event[key] for key in keys if key in event} + + +def s3_discovery_handler(event, chunk_size=2800, role_arn=None, bucket_output=None): + bucket = event.get("bucket") + prefix = event.get("prefix", "") + filename_regex = event.get("filename_regex", None) + collection = event.get("collection", prefix.rstrip("/")) + properties = event.get("properties", {}) + assets = event.get("assets") + id_regex = event.get("id_regex") + id_template = event.get("id_template", "{}") + date_fields = propagate_forward_datetime_args(event) + dry_run = event.get("dry_run", False) + if process_from := event.get("process_from_yyyy_mm_dd"): + process_from = datetime.strptime(process_from, "%Y-%m-%d").replace( + tzinfo=tzlocal() + ) + if last_execution := event.get("last_successful_execution"): + last_execution = datetime.fromisoformat(last_execution) + if dry_run: + print("Running discovery in dry run mode") + + payload = {**event, "objects": []} + slice = event.get("slice") + + bucket_output = os.environ.get("EVENT_BUCKET", bucket_output) + key = f"s3://{bucket_output}/events/{collection}" + records = 0 + out_keys = [] + discovered = [] + + kwargs = assume_role(role_arn=role_arn) if role_arn else {} + s3client = boto3.client("s3", **kwargs) + s3_iterator = get_s3_resp_iterator( + bucket_name=bucket, prefix=prefix, s3_client=s3client + ) + file_uris = [ + f"s3://{bucket}/{obj['Key']}" + for obj in discover_from_s3( + s3_iterator, filename_regex, last_execution=process_from or last_execution + ) + ] + + if len(file_uris) == 0: + raise EmptyFileListError(f"No files discovered at bucket: {bucket}, prefix: {prefix}") + + # group only if more than 1 assets + if assets and len(assets.keys()) > 1: + items_with_assets = group_by_item(file_uris, id_regex, assets) + else: + # out of convenience, we might not always want to explicitly define assets + # or if only a single asset is defined, follow default flow + items_with_assets = construct_single_asset_items(file_uris, assets) + + if len(items_with_assets) == 0: + raise EmptyFileListError( + f"No items could be constructed for files at bucket: {bucket}, prefix: {prefix}" + ) + + # Update IDs using id_template + for item in items_with_assets: + item["item_id"] = id_template.format(item["item_id"]) + + item_count = 0 + for item in items_with_assets: + item_count += 1 + # Logic to ingest a 'slice' of data + if slice: + if item_count < slice[0]: # Skip until we reach the start of the slice + continue + if ( + item_count >= slice[1] + ): # Stop once we reach the end of the 
slice, while saving progress + break + file_obj = { + "collection": collection, + "item_id": item["item_id"], + "assets": item["assets"], + "properties": properties, + **date_fields, + } + + if dry_run and item_count < 10: + print("-DRYRUN- Example item") + print(json.dumps(file_obj)) + + payload["objects"].append(file_obj) + if records == chunk_size: + out_keys.append(generate_payload(s3_prefix_key=key, payload=payload)) + records = 0 + discovered.append(len(payload["objects"])) + payload["objects"] = [] + records += 1 + + if payload["objects"]: + out_keys.append(generate_payload(s3_prefix_key=key, payload=payload)) + discovered.append(len(payload["objects"])) + # We need to make sure the payload isn't too large for ECS overrides + try: + del event["assets"] + except KeyError: + pass + return {**event, "payload": out_keys, "discovered": discovered} diff --git a/sm2a/dags/veda_data_pipeline/utils/schemas.py b/sm2a/dags/veda_data_pipeline/utils/schemas.py new file mode 100644 index 00000000..c5f33b9e --- /dev/null +++ b/sm2a/dags/veda_data_pipeline/utils/schemas.py @@ -0,0 +1,15 @@ +# Description: Lightweight schema definitions + +from datetime import datetime +from typing import List, Union +from stac_pydantic.collection import Extent, TimeInterval + + +class DatetimeInterval(TimeInterval): + # reimplement stac_pydantic's TimeInterval to leverage datetime types + interval: List[List[Union[datetime, None]]] + + +class SpatioTemporalExtent(Extent): + # reimplement stac_pydantic's Extent to leverage datetime types + temporal: DatetimeInterval diff --git a/sm2a/dags/veda_data_pipeline/utils/submit_stac.py b/sm2a/dags/veda_data_pipeline/utils/submit_stac.py new file mode 100644 index 00000000..d57b57fe --- /dev/null +++ b/sm2a/dags/veda_data_pipeline/utils/submit_stac.py @@ -0,0 +1,135 @@ +import json +import os +import sys +from dataclasses import dataclass + +if sys.version_info >= (3, 8): + from typing import TypedDict +else: + from typing_extensions import TypedDict + +from typing import Any, Dict, Optional, Union + +import boto3 +import requests + + +class InputBase(TypedDict): + dry_run: Optional[Any] + + +class S3LinkInput(InputBase): + stac_file_url: str + + +class StacItemInput(InputBase): + stac_item: Dict[str, Any] + + +class AppConfig(TypedDict): + cognito_domain: str + client_id: str + client_secret: str + scope: str + + +class Creds(TypedDict): + access_token: str + expires_in: int + token_type: str + + +@dataclass +class IngestionApi: + base_url: str + token: str + + @classmethod + def from_veda_auth_secret(cls, *, secret_id: str, base_url: str) -> "IngestionApi": + cognito_details = cls._get_cognito_service_details(secret_id) + credentials = cls._get_app_credentials(**cognito_details) + return cls(token=credentials["access_token"], base_url=base_url) + + @staticmethod + def _get_cognito_service_details(secret_id: str) -> AppConfig: + client = boto3.client("secretsmanager") + response = client.get_secret_value(SecretId=secret_id) + return json.loads(response["SecretString"]) + + @staticmethod + def _get_app_credentials( + cognito_domain: str, client_id: str, client_secret: str, scope: str, **kwargs + ) -> Creds: + response = requests.post( + f"{cognito_domain}/oauth2/token", + headers={ + "Content-Type": "application/x-www-form-urlencoded", + }, + auth=(client_id, client_secret), + data={ + "grant_type": "client_credentials", + # A space-separated list of scopes to request for the generated access token. 
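+                # The token request uses the OAuth2 client_credentials grant against the
+                # Cognito /oauth2/token endpoint; client_id and client_secret are sent via
+                # HTTP basic auth (the `auth` argument above).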
+                "scope": scope,
+            },
+        )
+        try:
+            response.raise_for_status()
+        except Exception as ex:
+            print(response.text)
+            raise Exception(f"Error, {ex}") from ex
+        return response.json()
+
+    def submit(self, event: Dict[str, Any], endpoint: str) -> Dict[str, Any]:
+        headers = {
+            "Authorization": f"Bearer {self.token}",
+            "Content-Type": "application/json",
+        }
+        response = requests.post(
+            f"{self.base_url.rstrip('/')}{endpoint}",
+            json=event,
+            headers=headers,
+        )
+        try:
+            response.raise_for_status()
+        except Exception as e:
+            print(response.text)
+            raise e
+        return response.json()
+
+
+def submission_handler(
+    event: Union[S3LinkInput, StacItemInput, Dict[str, Any]],
+    endpoint: str = "/ingestions",
+    cognito_app_secret=None,
+    stac_ingestor_api_url=None,
+    context=None,
+) -> None:
+    if context is None:
+        context = {}
+
+    stac_item = event
+
+    if stac_item.get("dry_run"):
+        print("Dry run, not inserting, would have inserted:")
+        print(json.dumps(stac_item, indent=2))
+        return
+
+    cognito_app_secret = cognito_app_secret or os.getenv("COGNITO_APP_SECRET")
+    stac_ingestor_api_url = stac_ingestor_api_url or os.getenv("STAC_INGESTOR_API_URL")
+
+    ingestor = IngestionApi.from_veda_auth_secret(
+        secret_id=cognito_app_secret,
+        base_url=stac_ingestor_api_url,
+    )
+    ingestor.submit(event=stac_item, endpoint=endpoint)
+
+
+if __name__ == "__main__":
+    filename = "example.ndjson"
+    sample_event = {
+        "stac_file_url": "example.ndjson",
+        # or
+        "stac_item": {},
+        "type": "collections",
+    }
+    submission_handler(sample_event)
diff --git a/sm2a/dags/veda_data_pipeline/utils/transfer.py b/sm2a/dags/veda_data_pipeline/utils/transfer.py
new file mode 100644
index 00000000..20823f37
--- /dev/null
+++ b/sm2a/dags/veda_data_pipeline/utils/transfer.py
@@ -0,0 +1,110 @@
+import re
+
+import boto3
+from airflow.exceptions import AirflowException
+
+
+def assume_role(role_arn, session_name="veda-data-airflow_s3-discovery"):
+    sts = boto3.client("sts")
+    print(f"Assuming role: {role_arn}")
+    credentials = sts.assume_role(
+        RoleArn=role_arn,
+        RoleSessionName=session_name,
+    )
+    creds = credentials["Credentials"]
+    return {
+        "aws_access_key_id": creds["AccessKeyId"],
+        "aws_secret_access_key": creds.get("SecretAccessKey"),
+        "aws_session_token": creds.get("SessionToken"),
+    }
+
+
+def get_matching_files(s3_client, bucket, prefix, regex_pattern):
+    matching_files = []
+
+    response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
+    while True:
+        for obj in response["Contents"]:
+            file_key = obj["Key"]
+            if re.match(regex_pattern, file_key):
+                matching_files.append(file_key)
+
+        if "NextContinuationToken" in response:
+            response = s3_client.list_objects_v2(
+                Bucket=bucket,
+                Prefix=prefix,
+                ContinuationToken=response["NextContinuationToken"],
+            )
+        else:
+            break
+
+    return matching_files
+
+
+def transfer_files_within_s3(
+    s3_client, origin_bucket, matching_files, destination_bucket, collection
+):
+    for file_key in matching_files:
+        filename = file_key.split("/")[-1]
+        # print(f"Transferring file: {filename}")
+        target_key = f"{collection}/{filename}"
+        copy_source = {"Bucket": origin_bucket, "Key": file_key}
+
+        # We can use the etag to check if the file has already been copied and avoid duplication of effort
+        # by using the CopySourceIfNoneMatch parameter below.
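+        # If the source ETag matches the destination ETag the object is unchanged; S3 then rejects
+        # the conditional copy with a 412 (PreconditionFailed), which the ClientError handler below
+        # does not re-raise, so unchanged files are effectively skipped.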
+ try: + target_metadata = s3_client.head_object( + Bucket=destination_bucket, Key=target_key + ) + target_etag = target_metadata["ETag"] + # print(f"File already exists, checking Etag: {filename}") + s3_client.copy_object( + CopySource=copy_source, + Bucket=destination_bucket, + Key=target_key, + CopySourceIfNoneMatch=target_etag, + ) + except s3_client.exceptions.ClientError as err: + if err.response["Error"]["Code"] == "404": + # print(f"Copying file: {filename}") + s3_client.copy_object( + CopySource=copy_source, + Bucket=destination_bucket, + Key=target_key + ) + + +def data_transfer_handler(event, role_arn=None): + origin_bucket = event.get("origin_bucket") + origin_prefix = event.get("origin_prefix") + filename_regex = event.get("filename_regex") + target_bucket = event.get("target_bucket") + collection = event.get("collection") + + kwargs = assume_role(role_arn=role_arn) if role_arn else {} + s3client = boto3.client("s3", **kwargs) + matching_files = get_matching_files( + s3_client=s3client, + bucket=origin_bucket, + prefix=origin_prefix, + regex_pattern=filename_regex, + ) + + if len(matching_files) == 0: + raise AirflowException("No matching files found") + + if not event.get("dry_run"): + transfer_files_within_s3( + s3_client=s3client, + origin_bucket=origin_bucket, + matching_files=matching_files, + destination_bucket=target_bucket, + collection=collection, + ) + else: + print( + f"Would have copied {len(matching_files)} files from {origin_bucket} to {target_bucket}" + ) + print(f"Files matched: {matching_files}") + + return {**event} diff --git a/sm2a/dags/veda_data_pipeline/utils/vector_ingest/handler.py b/sm2a/dags/veda_data_pipeline/utils/vector_ingest/handler.py new file mode 100644 index 00000000..09e7d437 --- /dev/null +++ b/sm2a/dags/veda_data_pipeline/utils/vector_ingest/handler.py @@ -0,0 +1,377 @@ +import base64 +from argparse import ArgumentParser +import boto3 +import os +import subprocess +import json +import smart_open +from urllib.parse import urlparse +import psycopg2 +import geopandas as gpd +from shapely import wkb +from geoalchemy2 import Geometry +import sqlalchemy +from sqlalchemy import create_engine, MetaData, Table, Column, inspect +import concurrent.futures +from sqlalchemy.dialects.postgresql import DOUBLE_PRECISION, INTEGER, VARCHAR, TIMESTAMP + + +def download_file(file_uri: str, role_arn:[str, None]): + session = boto3.Session() + if role_arn: + sts = boto3.client("sts") + response = sts.assume_role( + RoleArn=role_arn, + RoleSessionName="airflow_vector_ingest", + ) + session = boto3.Session( + aws_access_key_id=response["Credentials"]["AccessKeyId"], + aws_secret_access_key=response["Credentials"]["SecretAccessKey"], + aws_session_token=response["Credentials"]["SessionToken"], + ) + s3 = session.client("s3") + + url_parse = urlparse(file_uri) + + bucket = url_parse.netloc + path = url_parse.path[1:] + filename = url_parse.path.split("/")[-1] + target_filepath = os.path.join("/tmp", filename) + + s3.download_file(bucket, path, target_filepath) + + print(f"downloaded {target_filepath}") + + + return target_filepath + + +def get_connection_string(secret: dict, as_uri: bool = False) -> str: + if as_uri: + return f"postgresql://{secret['username']}:{secret['password']}@{secret['host']}:5432/{secret['dbname']}" + else: + return f"PG:host={secret['host']} dbname={secret['dbname']} user={secret['username']} password={secret['password']}" + + +def get_gdf_schema(gdf, target_projection): + """map GeoDataFrame columns into a table schema + + :param gdf: 
GeoDataFrame from geopandas + :param target_projection: srid for the target table geometry column + :return: + """ + # map geodatafrome dtypes to sqlalchemy types + dtype_map = { + "int64": INTEGER, + "float64": DOUBLE_PRECISION, + "object": VARCHAR, + "datetime64": TIMESTAMP, + } + schema = [] + for column, dtype in zip(gdf.columns, gdf.dtypes): + if str(dtype) == "geometry": + # do not inpsect to retrieve geom type, just use generic GEOMETRY + # geom_type = str(gdf[column].geom_type.unique()[0]).upper() + geom_type = str(dtype).upper() + # do not taKe SRID from existing file for target table + # we always want to transform from file EPSG to Table EPSG() + column_type = Geometry(geometry_type=geom_type, srid=target_projection) + else: + dtype_str = str(dtype) + column_type = dtype_map.get(dtype_str.split("[")[0], VARCHAR) + + if column == "primarykey": + schema.append(Column(column.lower(), column_type, unique=True)) + else: + schema.append(Column(column.lower(), column_type)) + return schema + + +def ensure_table_exists( + db_metadata: MetaData, gpkg_file: str, target_projection: int, table_name: str +): + """create a table if it doesn't exist or just + validate GeoDataFrame columns against existing table + + :param db_metadata: instance of sqlalchemy.MetaData + :param gpkg_file: file path to GPKG + :param target_projection: srid for target DB table geometry column + :param table_name: name of table to create + :return: None + """ + gdf = gpd.read_file(gpkg_file) + gdf_schema = get_gdf_schema(gdf, target_projection) + engine = db_metadata.bind + try: + Table(table_name, db_metadata, autoload_with=engine) + except sqlalchemy.exc.NoSuchTableError: + Table(table_name, db_metadata, *gdf_schema) + db_metadata.create_all(engine) + + # validate gdf schema against existing table schema + insp = inspect(engine) + existing_columns = insp.get_columns(table_name) + existing_column_names = [col["name"] for col in existing_columns] + for column in gdf_schema: + if column.name not in existing_column_names: + raise ValueError( + f"your .gpkg seems to have a column={column.name} that does not exist in the existing table columns={existing_column_names}" + ) + + +def delete_region( + engine, + gpkg_path: str, + table_name: str, +): + gdf = gpd.read_file(gpkg_path) + if 'region' in gdf.columns: + region_name = gdf["region"].iloc[0] + with engine.connect() as conn: + with conn.begin(): + delete_sql = sqlalchemy.text( + f""" + DELETE FROM {table_name} WHERE region=:region_name + """ + ) + conn.execute(delete_sql, {'region_name': region_name}) + else: + print(f"'region' column not found in {gpkg_path}. 
No records deleted.") + + +def upsert_to_postgis( + engine, + gpkg_path: str, + target_projection: int, + table_name: str, + batch_size: int = 10000, +): + """batch the GPKG file and upsert via threads + + :param engine: instance of sqlalchemy.Engine + :param gpkg_path: file path to GPKG + :param table_name: name of the target table + :param batch_size: upper limit of batch size + :return: + """ + gdf = gpd.read_file(gpkg_path) + source_epsg_code = gdf.crs.to_epsg() + if not source_epsg_code: + # assume NAD27 Equal Area for now :shrug: + # since that's what the default is for Fire Atlas team exports + # that's what PROJ4 does under the hood for 9311 :wethinksmirk: + source_epsg_code = 2163 + + # convert the `t` column to something suitable for sql insertion otherwise we get 'Timestamp()' + gdf["t"] = gdf["t"].dt.strftime("%Y-%m-%d %H:%M:%S") + # convert to WKB + gdf["geometry"] = gdf["geometry"].apply(lambda geom: wkb.dumps(geom, hex=True)) + + def upsert_batch(batch): + with engine.connect() as conn: + with conn.begin(): + for row in batch.to_dict(orient="records"): + # make sure all column names are lower case for keys and values + row = {k.lower(): v for k, v in row.items()} + columns = [col.lower() for col in batch.columns] + + non_geom_placeholders = ", ".join( + [f":{col}" for col in columns[:-1]] + ) + # NOTE: we need to escape `::geometry` so parameterized statements don't try to replace it + # because parametrized statements in sqlalchemy are `:` + geom_placeholder = f"ST_Transform(ST_SetSRID(ST_GeomFromWKB(:geometry\:\:geometry), {source_epsg_code}), {target_projection})" # noqa: W605 + upsert_sql = sqlalchemy.text( + f""" + INSERT INTO {table_name} ({', '.join([col for col in columns])}) + VALUES ({non_geom_placeholders},{geom_placeholder}) + ON CONFLICT (primarykey) + DO UPDATE SET {', '.join(f"{col}=EXCLUDED.{col}" for col in columns if col != 'primarykey')} + """ + ) + + # logging.debug(f"[ UPSERT SQL ]:\n{str(upsert_sql)}") + conn.execute(upsert_sql, row) + + batches = [gdf.iloc[i : i + batch_size] for i in range(0, len(gdf), batch_size)] + # set `max_workers` to something below max concurrent connections for postgresql + # https://www.postgresql.org/docs/14/runtime-config-connection.html + with concurrent.futures.ThreadPoolExecutor(max_workers=75) as executor: + executor.map(upsert_batch, batches) + + +def get_secret(secret_name: str, region_name: str = "us-west-2") -> None: + """Retrieve secrets from AWS Secrets Manager + + Args: + secret_name (str): name of aws secrets manager secret containing database connection secrets + + Returns: + secrets (dict): decrypted secrets in dict + """ + + # Create a Secrets Manager client + session = boto3.session.Session(region_name=region_name) + client = session.client(service_name="secretsmanager") + + # In this sample we only handle the specific exceptions for the 'GetSecretValue' API. + # See https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html + # We rethrow the exception by default. + + get_secret_value_response = client.get_secret_value(SecretId=secret_name) + + # Decrypts secret using the associated KMS key. + # Depending on whether the secret is a string or binary, one of these fields will be populated. 
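+    # SecretString is parsed directly as JSON; SecretBinary is base64-decoded first and then parsed.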
+ if "SecretString" in get_secret_value_response: + return json.loads(get_secret_value_response["SecretString"]) + else: + return json.loads(base64.b64decode(get_secret_value_response["SecretBinary"])) + + +def load_to_featuresdb( + filename: str, + collection: str, + vector_secret_name: str, + extra_flags: list = None, + target_projection: str = "EPSG:4326", +): + if extra_flags is None: + extra_flags = ["-overwrite", "-progress"] + + secret_name = vector_secret_name + + con_secrets = get_secret(secret_name) + connection = get_connection_string(con_secrets) + + print(f"running ogr2ogr import for collection: {collection}") + options = [ + "ogr2ogr", + "-f", + "PostgreSQL", + connection, + "-t_srs", + target_projection, + filename, + "-nln", + collection, + *extra_flags, + ] + out = subprocess.run( + options, + check=False, + capture_output=True, + ) + + if out.stderr: + error_description = f"Error: {out.stderr}" + print(error_description) + return {"status": "failure", "reason": error_description} + + return {"status": "success"} + + +def load_to_featuresdb_eis( + filename: str, + collection: str, + vector_secret_name: str, + target_projection: int = 4326, +): + """create table if not exists and upload GPKG + + :param filename: the file path to the downloaded GPKG + :param collection: the name of the collection + :param target_projection: srid for the target table + :return: None + """ + secret_name = vector_secret_name + conn_secrets = get_secret(secret_name) + connection_string = get_connection_string(conn_secrets, as_uri=True) + + # NOTE: about `collection.rsplit` below: + # + # EIS Fire team naming convention for outputs + # Snapshots: "snapshot_{layer_name}_nrt_{region_name}.gpkg" + # Lf_archive: "lf_{layer_name}_archive_{region_name}.gpkg" + # Lf_nrt: "lf_{layer_name}_nrt_{region_name}.gpkg" + # + # Insert/Alter on table call everything except the region name: + # e.g. `snapshot_perimeter_nrt_conus` this gets inserted into the table `eis_fire_snapshot_perimeter_nrt` + collection = collection.rsplit("_", 1)[0] + target_table_name = f"eis_fire_{collection}" + + engine = create_engine(connection_string) + metadata = MetaData() + metadata.bind = engine + + ensure_table_exists(metadata, filename, target_projection, target_table_name) + delete_region(engine, filename, target_table_name) + upsert_to_postgis(engine, filename, target_projection, target_table_name) + return {"status": "success"} + + +def alter_datetime_add_indexes_eis(collection: str,vector_secret_name: str ): + # NOTE: about `collection.rsplit` below: + # + # EIS Fire team naming convention for outputs + # Snapshots: "snapshot_{layer_name}_nrt_{region_name}.gpkg" + # Lf_archive: "lf_{layer_name}_archive_{region_name}.gpkg" + # Lf_nrt: "lf_{layer_name}_nrt_{region_name}.gpkg" + # + # Insert/Alter on table call everything except the region name: + # e.g. 
`snapshot_perimeter_nrt_conus` this gets inserted into the table `eis_fire_snapshot_perimeter_nrt` + collection = collection.rsplit("_", 1)[0] + + secret_name = vector_secret_name + conn_secrets = get_secret(secret_name) + conn = psycopg2.connect( + host=conn_secrets["host"], + dbname=conn_secrets["dbname"], + user=conn_secrets["username"], + password=conn_secrets["password"], + ) + + cur = conn.cursor() + cur.execute( + f"ALTER table eis_fire_{collection} " + f"ALTER COLUMN t TYPE TIMESTAMP USING t::timestamp without time zone; " + f"CREATE INDEX IF NOT EXISTS idx_eis_fire_{collection}_datetime ON eis_fire_{collection}(t);" + f"CREATE INDEX IF NOT EXISTS idx_eis_fire_{collection}_primarykey ON eis_fire_{collection}(primarykey);" + f"CREATE INDEX IF NOT EXISTS idx_eis_fire_{collection}_region ON eis_fire_{collection}(region);" + ) + conn.commit() + + +def handler(payload_src: dict, vector_secret_name: str, assume_role_arn: [str, None]): + + payload_event = payload_src.copy() + s3_event = payload_event.pop("payload") + with smart_open.open(s3_event, "r") as _file: + s3_event_read = _file.read() + event_received = json.loads(s3_event_read) + s3_objects = event_received["objects"] + status = list() + for s3_object in s3_objects: + href = s3_object["assets"]["default"]["href"] + collection = s3_object["collection"] + downloaded_filepath = download_file(href, assume_role_arn) + print(f"[ DOWNLOAD FILEPATH ]: {downloaded_filepath}") + print(f"[ COLLECTION ]: {collection}") + + s3_object_prefix = event_received["prefix"] + if s3_object_prefix.startswith("EIS/"): + coll_status = load_to_featuresdb_eis(downloaded_filepath, collection, vector_secret_name) + else: + coll_status = load_to_featuresdb(downloaded_filepath, collection, vector_secret_name) + + status.append(coll_status) + # delete file after ingest + os.remove(downloaded_filepath) + + if coll_status["status"] == "success" and s3_object_prefix.startswith("EIS/"): + alter_datetime_add_indexes_eis(collection, vector_secret_name) + elif coll_status["status"] != "success": + # bubble exception so Airflow shows it as a failure + raise Exception(coll_status["reason"]) + return status + + diff --git a/sm2a/dags/veda_data_pipeline/utils/vector_ingest/requirements.txt b/sm2a/dags/veda_data_pipeline/utils/vector_ingest/requirements.txt new file mode 100644 index 00000000..35d23946 --- /dev/null +++ b/sm2a/dags/veda_data_pipeline/utils/vector_ingest/requirements.txt @@ -0,0 +1,7 @@ +smart-open==6.3.0 +psycopg2-binary==2.9.9 +requests==2.30.0 +boto3==1.26.129 +GeoAlchemy2==0.14.2 +geopandas==0.14.4 +SQLAlchemy==2.0.23 diff --git a/sm2a/dags/veda_data_pipeline/veda_collection_pipeline.py b/sm2a/dags/veda_data_pipeline/veda_collection_pipeline.py new file mode 100644 index 00000000..8e67584a --- /dev/null +++ b/sm2a/dags/veda_data_pipeline/veda_collection_pipeline.py @@ -0,0 +1,49 @@ +import pendulum +from airflow import DAG +from airflow.operators.dummy_operator import DummyOperator as EmptyOperator +from airflow.utils.trigger_rule import TriggerRule +from veda_data_pipeline.groups.collection_group import collection_task_group + +dag_doc_md = """ +### Collection Creation and Ingestion +Generates a collection based on the Dataset model and ingests into the catalog +#### Notes +- This DAG can run with the following configuration
+```json +{ + "collection": "collection-id", + "data_type": "cog", + "description": "collection description", + "is_periodic": true, + "license": "collection-LICENSE", + "time_density": "year", + "title": "collection-title" +} +``` +""" + +dag_args = { + "start_date": pendulum.today("UTC").add(days=-1), + "schedule_interval": None, + "catchup": False, + "doc_md": dag_doc_md, + "tags": ["collection"], +} + +template_dag_run_conf = { + "collection": "", + "data_type": "cog", + "description": "", + "is_periodic": "", + "license": "", + "time_density": "", + "title": "" +} + +with DAG("veda_collection_pipeline", params=template_dag_run_conf, **dag_args) as dag: + start = EmptyOperator(task_id="start", dag=dag) + end = EmptyOperator(task_id="end", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, dag=dag) + + collection_grp = collection_task_group() + + start >> collection_grp >> end diff --git a/sm2a/dags/veda_data_pipeline/veda_dataset_pipeline.py b/sm2a/dags/veda_data_pipeline/veda_dataset_pipeline.py new file mode 100644 index 00000000..1c8746cb --- /dev/null +++ b/sm2a/dags/veda_data_pipeline/veda_dataset_pipeline.py @@ -0,0 +1,101 @@ +import pendulum +from airflow import DAG +from airflow.decorators import task +from airflow.operators.dummy_operator import DummyOperator as EmptyOperator +from airflow.models.variable import Variable +import json +from veda_data_pipeline.groups.collection_group import collection_task_group +from veda_data_pipeline.groups.discover_group import discover_from_s3_task, get_dataset_files_to_process +from veda_data_pipeline.groups.processing_tasks import submit_to_stac_ingestor_task + +dag_doc_md = """ +### Dataset Pipeline +Generates a collection and triggers the file discovery process +#### Notes +- This DAG can run with the following configuration
+```json +{ + "collection": "collection-id", + "data_type": "cog", + "description": "collection description", + "discovery_items": + [ + { + "bucket": "veda-data-store-staging", + "datetime_range": "year", + "discovery": "s3", + "filename_regex": "^(.*).tif$", + "prefix": "example-prefix/" + } + ], + "is_periodic": true, + "license": "collection-LICENSE", + "time_density": "year", + "title": "collection-title" +} +``` +""" + +dag_args = { + "start_date": pendulum.today("UTC").add(days=-1), + "schedule_interval": None, + "catchup": False, + "doc_md": dag_doc_md, + "tags": ["collection", "discovery"], +} + + +@task +def extract_discovery_items(**kwargs): + ti = kwargs.get("ti") + discovery_items = ti.dag_run.conf.get("discovery_items") + print(discovery_items) + return discovery_items + + +@task(max_active_tis_per_dag=3) +def build_stac_task(payload): + from veda_data_pipeline.utils.build_stac.handler import stac_handler + airflow_vars = Variable.get("aws_dags_variables") + airflow_vars_json = json.loads(airflow_vars) + event_bucket = airflow_vars_json.get("EVENT_BUCKET") + return stac_handler(payload_src=payload, bucket_output=event_bucket) + + +template_dag_run_conf = { + "collection": "", + "data_type": "cog", + "description": "", + "discovery_items": + [ + { + "bucket": "", + "datetime_range": "", + "discovery": "s3", + "filename_regex": "", + "prefix": "" + } + ], + "is_periodic": "", + "license": "", + "time_density": "", + "title": "" +} + +with DAG("veda_dataset_pipeline", params=template_dag_run_conf, **dag_args) as dag: + # ECS dependency variable + + start = EmptyOperator(task_id="start", dag=dag) + end = EmptyOperator(task_id="end", dag=dag) + + collection_grp = collection_task_group() + discover = discover_from_s3_task.expand(event=extract_discovery_items()) + discover.set_upstream(collection_grp) # do not discover until collection exists + get_files = get_dataset_files_to_process(payload=discover) + + build_stac = build_stac_task.expand(payload=get_files) + # .output is needed coming from a non-taskflow operator + submit_stac = submit_to_stac_ingestor_task.expand(built_stac=build_stac) + + collection_grp.set_upstream(start) + submit_stac.set_downstream(end) diff --git a/sm2a/dags/veda_data_pipeline/veda_discover_pipeline.py b/sm2a/dags/veda_data_pipeline/veda_discover_pipeline.py new file mode 100644 index 00000000..49d790b2 --- /dev/null +++ b/sm2a/dags/veda_data_pipeline/veda_discover_pipeline.py @@ -0,0 +1,112 @@ +import pendulum +from airflow import DAG +from airflow.operators.dummy_operator import DummyOperator +from airflow.decorators import task +from airflow.models.variable import Variable +import json +from veda_data_pipeline.groups.discover_group import discover_from_s3_task, get_files_to_process +from veda_data_pipeline.groups.processing_tasks import submit_to_stac_ingestor_task + +dag_doc_md = """ +### Discover files from S3 +#### Purpose +This DAG discovers files from either S3 and/or CMR then runs a DAG id `veda_ingest`. +The DAG `veda_ingest` will run in parallel processing (2800 files per each DAG) +#### Notes +- This DAG can run with the following configuration
+```json +{ + "collection": "collection-id", + "bucket": "veda-data-store-staging", + "prefix": "s3-prefix/", + "filename_regex": "^(.*).tif$", + "id_regex": ".*_(.*).tif$", + "process_from_yyyy_mm_dd": "YYYY-MM-DD", + "id_template": "example-id-prefix-{}", + "datetime_range": "month", + "last_successful_execution": datetime(2015,01,01), + "assets": { + "asset1": { + "title": "Asset type 1", + "description": "First of a multi-asset item.", + "regex": ".*asset1.*", + }, + "asset2": { + "title": "Asset type 2", + "description": "Second of a multi-asset item.", + "regex": ".*asset2.*", + }, + } +} +``` +- [Supports linking to external content](https://github.com/NASA-IMPACT/veda-data-pipelines) +""" + +dag_args = { + "start_date": pendulum.today("UTC").add(days=-1), + "catchup": False, + "doc_md": dag_doc_md, + "is_paused_upon_creation": False, +} + +template_dag_run_conf = { + "collection": "", + "bucket": "", + "prefix": "/", + "filename_regex": "", + "id_regex": "", + "id_template": "", + "datetime_range": "||", + "assets": { + "": { + "title": "", + "description": "", + "regex": "", + }, + "": { + "title": "", + "description": "", + "regex": "", + }, + }, +} + + +@task(max_active_tis_per_dag=5) +def build_stac_task(payload): + from veda_data_pipeline.utils.build_stac.handler import stac_handler + airflow_vars = Variable.get("aws_dags_variables") + airflow_vars_json = json.loads(airflow_vars) + event_bucket = airflow_vars_json.get("EVENT_BUCKET") + return stac_handler(payload_src=payload, bucket_output=event_bucket) + + +def get_discover_dag(id, event=None): + if not event: + event = {} + params_dag_run_conf = event or template_dag_run_conf + with DAG( + id, + schedule_interval=event.get("schedule"), + params=params_dag_run_conf, + **dag_args + ) as dag: + start = DummyOperator(task_id="Start", dag=dag) + end = DummyOperator( + task_id="End", dag=dag + ) + # define DAG using taskflow notation + + discover = discover_from_s3_task(event=event) + get_files = get_files_to_process(payload=discover) + build_stac = build_stac_task.expand(payload=get_files) + # .output is needed coming from a non-taskflow operator + submit_stac = submit_to_stac_ingestor_task.expand(built_stac=build_stac) + + discover.set_upstream(start) + submit_stac.set_downstream(end) + + return dag + + +get_discover_dag("veda_discover") diff --git a/sm2a/dags/veda_data_pipeline/veda_generic_vector_pipeline.py b/sm2a/dags/veda_data_pipeline/veda_generic_vector_pipeline.py new file mode 100644 index 00000000..b359d6dd --- /dev/null +++ b/sm2a/dags/veda_data_pipeline/veda_generic_vector_pipeline.py @@ -0,0 +1,84 @@ +import pendulum +from airflow.decorators import task +from airflow import DAG +from airflow.models.variable import Variable +import json +from airflow.operators.dummy_operator import DummyOperator +from airflow.utils.trigger_rule import TriggerRule +from veda_data_pipeline.groups.discover_group import discover_from_s3_task, get_files_to_process + +dag_doc_md = """ +### Generic Ingest Vector +#### Purpose +This DAG is used to ingest vector data for use in the VEDA Features API + +#### Notes +- This DAG can run with the following configuration
+```json
+{
+    "collection": "",
+    "prefix": "transformed_csv/",
+    "bucket": "ghgc-data-store-develop",
+    "filename_regex": ".*.csv$",
+    "discovery": "s3",
+    "datetime_range": "month",
+    "vector": true,
+    "id_regex": "",
+    "id_template": "NIST_Urban_Testbed_test-{}",
+    "datetime_range": "",
+    "vector": true,
+    "x_possible": "longitude",
+    "y_possible": "latitude",
+    "source_projection": "EPSG:4326",
+    "target_projection": "EPSG:4326",
+    "extra_flags": ["-overwrite", "-lco", "OVERWRITE=YES"],
+    "discovered": 33,
+    "payload": "s3://data-pipeline-ghgc-dev-mwaa-597746869805/events/test_layer_name2/s3_discover_output_f88257e8-ee50-4a14-ace4-5612ae6ebf38.json"
+}
+```
+- [Supports linking to external content](https://github.com/NASA-IMPACT/veda-data-pipelines)
+"""
+
+template_dag_run_conf = {
+    "collection": "",
+    "prefix": "/",
+    "bucket": "",
+    "filename_regex": "",
+    "id_template": "-{}",
+    "datetime_range": "|",
+    "vector": "false | true",
+    "x_possible": "",
+    "y_possible": "",
+    "source_projection": "",
+    "target_projection": "",
+    "extra_flags": "",
+    "payload": "",
+}
+dag_args = {
+    "start_date": pendulum.today("UTC").add(days=-1),
+    "schedule_interval": None,
+    "catchup": False,
+    "doc_md": dag_doc_md,
+}
+
+
+@task
+def ingest_vector_task(payload):
+    from veda_data_pipeline.utils.vector_ingest.handler import handler
+    airflow_vars = Variable.get("aws_dags_variables")
+    airflow_vars_json = json.loads(airflow_vars)
+    read_role_arn = airflow_vars_json.get("ASSUME_ROLE_READ_ARN")
+    vector_secret_name = airflow_vars_json.get("VECTOR_SECRET_NAME")
+    return handler(payload_src=payload, vector_secret_name=vector_secret_name,
+                   assume_role_arn=read_role_arn)
+
+
+with DAG(dag_id="veda_generic_ingest_vector", params=template_dag_run_conf, **dag_args) as dag:
+
+    start = DummyOperator(task_id="Start", dag=dag)
+    end = DummyOperator(task_id="End", trigger_rule=TriggerRule.ONE_SUCCESS, dag=dag)
+    discover = discover_from_s3_task()
+    get_files = get_files_to_process(payload=discover)
+    vector_ingest = ingest_vector_task.expand(payload=get_files)
+    discover.set_upstream(start)
+    vector_ingest.set_downstream(end)
diff --git a/sm2a/dags/veda_data_pipeline/veda_transfer_pipeline.py b/sm2a/dags/veda_data_pipeline/veda_transfer_pipeline.py
new file mode 100644
index 00000000..6c1b4f3a
--- /dev/null
+++ b/sm2a/dags/veda_data_pipeline/veda_transfer_pipeline.py
@@ -0,0 +1,50 @@
+import pendulum
+from airflow import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.trigger_rule import TriggerRule
+from veda_data_pipeline.groups.transfer_group import subdag_transfer
+
+dag_doc_md = """
+### Transfer files within S3
+#### Purpose
+This DAG is used to transfer files to permanent locations for indexing with STAC.
+#### Notes
+- This DAG can run with a configuration similar to this
+```json
+{
+    "origin_bucket": "covid-eo-dashboard",
+    "origin_prefix": "s3-prefix/",
+    "filename_regex": "^(.*).tif$",
+    "target_bucket": "target_s3_bucket",
+    "collection": "collection-id",
+    "cogify": false,
+    "dry_run": true
+}
+```
+- [Supports linking to external content](https://github.com/NASA-IMPACT/veda-data-pipelines)
+"""
+
+dag_args = {
+    "start_date": pendulum.today("UTC").add(days=-1),
+    "schedule_interval": None,
+    "catchup": False,
+    "doc_md": dag_doc_md,
+}
+
+template_dag_run_conf = {
+    "origin_bucket": "",
+    "origin_prefix": "/",
+    "filename_regex": "",
+    "target_bucket": "",
+    "collection": "",
+    "cogify": "true|false",
+    "dry_run": "true|false",
+}
+
+with DAG("veda_transfer", params=template_dag_run_conf, **dag_args) as dag:
+    start = DummyOperator(task_id="Start", dag=dag)
+    end = DummyOperator(task_id="End", trigger_rule=TriggerRule.ONE_SUCCESS, dag=dag)
+
+    transfer_grp = subdag_transfer()
+
+    start >> transfer_grp >> end
diff --git a/sm2a/dags/veda_data_pipeline/veda_vector_pipeline.py b/sm2a/dags/veda_data_pipeline/veda_vector_pipeline.py
new file mode 100644
index 00000000..c264ea92
--- /dev/null
+++ b/sm2a/dags/veda_data_pipeline/veda_vector_pipeline.py
@@ -0,0 +1,71 @@
+import pendulum
+from airflow.decorators import task
+from airflow import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.trigger_rule import TriggerRule
+from airflow.models.variable import Variable
+from veda_data_pipeline.groups.discover_group import discover_from_s3_task, get_files_to_process
+import json
+
+dag_doc_md = """
+### Ingest vector
+#### Purpose
+This DAG is normally triggered by `veda_discover`, but it can also be triggered manually or through the API.
+
+#### Notes
+- This DAG can run with the following configuration
+```json +{ + "collection": "geoglam", + "prefix": "geoglam/", + "bucket": "veda-data-store-staging", + "filename_regex": "^(.*).tif$", + "discovery": "s3", + "datetime_range": "month", + "upload": false, + "cogify": false, + "discovered": 33, + "payload": "s3://veda-uah-sit-mwaa-853558080719/events/geoglam/s3_discover_output_6c46b57a-7474-41fe-977a-19d164531cdc.json" +} +``` +- [Supports linking to external content](https://github.com/NASA-IMPACT/veda-data-pipelines) +""" + +template_dag_run_conf = { + "collection": "", + "prefix": "/", + "bucket": "", + "filename_regex": "", + "discovery": "|cmr", + "datetime_range": "|", + "upload": " | true", + "cogify": "false | true" +} +dag_args = { + "start_date": pendulum.today("UTC").add(days=-1), + "schedule_interval": None, + "catchup": False, + "doc_md": dag_doc_md, +} + + +@task +def ingest_vector_task(payload): + from veda_data_pipeline.utils.vector_ingest.handler import handler + + airflow_vars = Variable.get("aws_dags_variables") + airflow_vars_json = json.loads(airflow_vars) + read_role_arn = airflow_vars_json.get("ASSUME_ROLE_READ_ARN") + vector_secret_name = airflow_vars_json.get("VECTOR_SECRET_NAME") + return handler(payload_src=payload, vector_secret_name=vector_secret_name, + assume_role_arn=read_role_arn) + + +with DAG(dag_id="veda_ingest_vector", params=template_dag_run_conf, **dag_args) as dag: + start = DummyOperator(task_id="Start", dag=dag) + end = DummyOperator(task_id="End", trigger_rule=TriggerRule.ONE_SUCCESS, dag=dag) + discover = discover_from_s3_task() + get_files = get_files_to_process(payload=discover) + vector_ingest = ingest_vector_task.expand(payload=get_files) + discover.set_upstream(start) + vector_ingest.set_downstream(end) diff --git a/sm2a/infrastructure/.terraform.lock.hcl b/sm2a/infrastructure/.terraform.lock.hcl index b9eab3ab..2bdb5aef 100644 --- a/sm2a/infrastructure/.terraform.lock.hcl +++ b/sm2a/infrastructure/.terraform.lock.hcl @@ -1,6 +1,25 @@ # This file is maintained automatically by "terraform init". # Manual edits may be lost in future updates. 
+provider "registry.terraform.io/hashicorp/archive" { + version = "2.6.0" + hashes = [ + "h1:upAbF0KeKLAs3UImwwp5veC7jRcLnpKWVjkbd4ziWhM=", + "zh:29273484f7423b7c5b3f5df34ccfc53e52bb5e3d7f46a81b65908e7a8fd69072", + "zh:3cba58ec3aea5f301caf2acc31e184c55d994cc648126cac39c63ae509a14179", + "zh:55170cd17dbfdea842852c6ae2416d057fec631ba49f3bb6466a7268cd39130e", + "zh:7197db402ba35631930c3a4814520f0ebe980ae3acb7f8b5a6f70ec90dc4a388", + "zh:78d5eefdd9e494defcb3c68d282b8f96630502cac21d1ea161f53cfe9bb483b3", + "zh:8bf7fe0915d7fb152a3a6b9162614d2ec82749a06dba13fab3f98d33c020ec4f", + "zh:8ce811844fd53adb0dabc9a541f8cb43aacfa7d8e39324e4bd3592b3428f5bfb", + "zh:bca795bca815b8ac90e3054c0a9ab1ccfb16eedbb3418f8ad473fc5ad6bf0ef7", + "zh:d9355a18df5a36cf19580748b23249de2eb445c231c36a353709f8f40a6c8432", + "zh:dc32cc32cfd8abf8752d34f2a783de0d3f7200c573b885ecb64ece5acea173b4", + "zh:ef498e20391bf7a280d0fd6fd6675621c85fbe4e92f0f517ae4394747db89bde", + "zh:f2bc5226c765b0c8055a7b6207d0fe1eb9484e3ec8880649d158827ac6ed3b22", + ] +} + provider "registry.terraform.io/hashicorp/aws" { version = "4.67.0" constraints = "~> 4.0" diff --git a/sm2a/infrastructure/functions/s3_event_bridge_to_sfn_execute/lambda_function.py b/sm2a/infrastructure/functions/s3_event_bridge_to_sfn_execute/lambda_function.py new file mode 100644 index 00000000..d48e22c6 --- /dev/null +++ b/sm2a/infrastructure/functions/s3_event_bridge_to_sfn_execute/lambda_function.py @@ -0,0 +1,61 @@ +import http.client +import json +import os +import uuid +from base64 import b64encode + +import boto3 + + +def lambda_handler(event, context): + secrets_client = boto3.client("secretsmanager") + sm2a_secret_manager_name = os.getenv("SM2A_SECRET_MANAGER_NAME") + dag_name = os.getenv("TARGET_DAG_ID") + storage_bucket = os.getenv("STORAGE_BUCKET") + try: + secret_response = secrets_client.get_secret_value( + SecretId=sm2a_secret_manager_name + ) + secret_data = json.loads(secret_response["SecretString"]) + sm2a_domain_name = secret_data["airflow_webserver_url"] + username = secret_data["airflow_admin_username"] + password = secret_data["airflow_admin_password"] + record = event["Records"][0] + print(record) + # Create the HTTP connection + conn = http.client.HTTPSConnection(sm2a_domain_name) + except Exception as ex: + return {"statusCode": 500, "body": json.dumps(f"Error: {ex}")} + + s3_event_key = record["s3"]["object"]["key"] + s3_filename_target = os.path.split(s3_event_key)[-1] + s3_filename_no_ext = os.path.splitext(s3_filename_target)[0] + bucket_key_prefix = os.path.dirname(s3_event_key) + data = { + "conf": { + "discovery": "s3", + "collection": s3_filename_no_ext, + "prefix": bucket_key_prefix, + "bucket": storage_bucket, + "filename_regex": f"^(.*){s3_filename_target}$", + "vector_eis": True, + }, + "dag_run_id": f"{dag_name}-{uuid.uuid4()}", + "note": "Run from S3 Event bridge", + } + headers = { + "Content-Type": "application/json", + "Authorization": "Basic " + + b64encode(f"{username}:{password}".encode()).decode(), + } + + conn.request("POST", f"/api/v1/dags/{dag_name}/dagRuns", json.dumps(data), headers) + + # Get the response + response = conn.getresponse() + response_data = response.read() + + # Close the connection + conn.close() + + return {"statusCode": response.status, "body": response_data.decode()} diff --git a/sm2a/infrastructure/locals.tf b/sm2a/infrastructure/locals.tf new file mode 100644 index 00000000..8cd5b4eb --- /dev/null +++ b/sm2a/infrastructure/locals.tf @@ -0,0 +1,12 @@ +provider "aws" { + alias = "aws_current" + region = 
var.aws_region +} + +data "aws_caller_identity" "current" {} +data "aws_region" "current" {} + +locals { + aws_region = data.aws_region.current.name + account_id = data.aws_caller_identity.current.account_id +} diff --git a/sm2a/infrastructure/s3_event_bridge_lambda.tf b/sm2a/infrastructure/s3_event_bridge_lambda.tf new file mode 100644 index 00000000..66e41e96 --- /dev/null +++ b/sm2a/infrastructure/s3_event_bridge_lambda.tf @@ -0,0 +1,173 @@ +##################################################### + +##################################################### +# Execution Role +##################################################### +resource "aws_iam_role" "lambda_exec_role" { + provider = aws.aws_current + name = "lambda-exec-role-s3-event-bridge-veda-${var.stage}" + permissions_boundary = var.permission_boundaries_arn + + assume_role_policy = <