From 9319547d7b9d37e8843131b90ba20a5dd30be865 Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Wed, 20 Nov 2024 09:00:34 +0000 Subject: [PATCH] feat(airflow): Add renaming task --- .../services/airflow/dags/india/nwp-dag.py | 45 +++++++++++-------- .../modules/services/airflow/dags/utils/s3.py | 44 ++++++++++++++++++ terraform/modules/services/airflow/eb.tf | 7 +++ 3 files changed, 78 insertions(+), 18 deletions(-) create mode 100644 terraform/modules/services/airflow/dags/utils/s3.py diff --git a/terraform/modules/services/airflow/dags/india/nwp-dag.py b/terraform/modules/services/airflow/dags/india/nwp-dag.py index b12d1a69..bcd59069 100644 --- a/terraform/modules/services/airflow/dags/india/nwp-dag.py +++ b/terraform/modules/services/airflow/dags/india/nwp-dag.py @@ -5,6 +5,7 @@ from airflow.operators.latest_only import LatestOnlyOperator from utils.slack import on_failure_callback +from utils.s3 import determine_latest_zarr default_args = { 'owner': 'airflow', @@ -24,7 +25,13 @@ region = 'india' -with DAG(f'{region}-nwp-consumer', schedule_interval="0 * * * *", default_args=default_args, concurrency=10, max_active_tasks=10) as dag: +with DAG( + f'{region}-nwp-consumer', + schedule_interval="0 * * * *", + default_args=default_args, + concurrency=10, + max_active_tasks=10, +) as dag: dag.doc_md = "Get NWP data" latest_only = LatestOnlyOperator(task_id="latest_only") @@ -45,7 +52,6 @@ task_concurrency=10, ) - nwp_consumer_gfs = EcsRunTaskOperator( task_id=f'{region}-nwp-consumer-gfs-india', task_definition='nwp-consumer-gfs-india', @@ -62,23 +68,26 @@ task_concurrency=10, ) - # nwp_consumer_meteomatics = EcsRunTaskOperator( - # task_id=f'{region}-nwp-consumer-meteomatics-india', - # task_definition='nwp-consumer-meteomatics-india', - # cluster=cluster, - # overrides={}, - # launch_type="FARGATE", - # network_configuration={ - # "awsvpcConfiguration": { - # "subnets": [subnet], - # "securityGroups": [security_group], - # 
"assignPublicIp": "ENABLED", - # }, - # }, - # task_concurrency=10, - # ) + nwp_consumer_metoffice = EcsRunTaskOperator( + task_id=f'{region}-nwp-consumer-metoffice-india', + task_definition='nwp-consumer-metoffice-india', + cluster=cluster, + overrides={}, + launch_type="FARGATE", + network_configuration={ + "awsvpcConfiguration": { + "subnets": [subnet], + "securityGroups": [security_group], + "assignPublicIp": "ENABLED", + }, + }, + task_concurrency=10, + ) + rename_zarr_metoffice = determine_latest_zarr.override( + task_id="determine_latest_zarr_metoffice", + )(bucket=f"india-nwp-{env}", prefix="metoffice/data") latest_only >> nwp_consumer_ecmwf latest_only >> nwp_consumer_gfs - # latest_only >> nwp_consumer_meteomatics + latest_only >> nwp_consumer_metoffice >> rename_zarr_metoffice diff --git a/terraform/modules/services/airflow/dags/utils/s3.py b/terraform/modules/services/airflow/dags/utils/s3.py new file mode 100644 index 00000000..b8463b25 --- /dev/null +++ b/terraform/modules/services/airflow/dags/utils/s3.py @@ -0,0 +1,44 @@ +from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.decorators import task + +@task(task_id="determine_latest_zarr") +def determine_latest_zarr(bucket: str, prefix: str): + s3hook = S3Hook(aws_conn_id=None) # Use Boto3 default connection strategy + # Get a list of all the non-latest zarrs in the bucket prefix + prefixes = s3hook.list_prefixes(bucket_name=bucket, prefix=prefix + "/", delimiter='/') + zarrs = sorted([ + p for p in prefixes if p.endswith('.zarr/') and "latest" not in p + ], reverse=True) + # Get the size of the most recent zarr and the latest.zarr zarr + s3bucket = s3hook.get_bucket(bucket_name=bucket) + size_old, size_new = (0, 0) + print(s3hook.list_keys(bucket_name=bucket, prefix=prefix + "/latest.zarr/")) + if len(zarrs) == 0: + s3hook.log.info("No non-latest zarrs found in bucket, exiting") + return + + for obj in s3bucket.objects.filter(Prefix=zarrs[0]): + size_new += obj.size + + if prefix + 
"/latest.zarr/" in prefixes: + for obj in s3bucket.objects.filter(Prefix=prefix + "/latest.zarr/"): + size_old += obj.size + + # If the sizes are different, create a new latest.zarr + if size_old != size_new and size_new > 0: + # Delete the old latest.zarr, if it exists + if prefix + "/latest.zarr/" in prefixes: + s3hook.log.debug(f"Deleting {prefix}/latest.zarr/") + keys_to_delete = s3hook.list_keys(bucket_name=bucket, prefix=prefix + "/latest.zarr/") + s3hook.delete_objects(bucket_name=bucket, keys=keys_to_delete) + # Copy the new latest.zarr + s3hook.log.info(f"Copying {zarrs[0]} to {prefix}/latest.zarr/") + source_keys = s3hook.list_keys(bucket_name=bucket, prefix=zarrs[0]) + for key in source_keys: + s3hook.copy_object( + source_bucket_name=bucket, + source_bucket_key=key, + dest_bucket_name=bucket, + dest_bucket_key=prefix + "/latest.zarr/" + key.split(zarrs[0])[-1], + ) + diff --git a/terraform/modules/services/airflow/eb.tf b/terraform/modules/services/airflow/eb.tf index 4fdb626c..6b775ec8 100644 --- a/terraform/modules/services/airflow/eb.tf +++ b/terraform/modules/services/airflow/eb.tf @@ -141,6 +141,13 @@ resource "aws_elastic_beanstalk_environment" "eb-api-env" { resource = "" } + setting { + namespace = "aws:elasticbeanstalk:application:environment" + name = "AWS_DEFAULT_REGION" + value = var.aws-region + resource = "" + } + setting { namespace = "aws:ec2:vpc" name = "VPCId"