feat(airflow): Add renaming task
devsjc committed Nov 20, 2024
1 parent 775e255 commit 9319547
Showing 3 changed files with 78 additions and 18 deletions.
45 changes: 27 additions & 18 deletions terraform/modules/services/airflow/dags/india/nwp-dag.py
@@ -5,6 +5,7 @@

from airflow.operators.latest_only import LatestOnlyOperator
from utils.slack import on_failure_callback
from utils.s3 import determine_latest_zarr

default_args = {
    'owner': 'airflow',
@@ -24,7 +25,13 @@

region = 'india'

with DAG(f'{region}-nwp-consumer', schedule_interval="0 * * * *", default_args=default_args, concurrency=10, max_active_tasks=10) as dag:
with DAG(
    f'{region}-nwp-consumer',
    schedule_interval="0 * * * *",
    default_args=default_args,
    concurrency=10,
    max_active_tasks=10,
) as dag:
    dag.doc_md = "Get NWP data"

    latest_only = LatestOnlyOperator(task_id="latest_only")
@@ -45,7 +52,6 @@
        task_concurrency=10,
    )


    nwp_consumer_gfs = EcsRunTaskOperator(
        task_id=f'{region}-nwp-consumer-gfs-india',
        task_definition='nwp-consumer-gfs-india',
@@ -62,23 +68,26 @@
        task_concurrency=10,
    )

    # nwp_consumer_meteomatics = EcsRunTaskOperator(
    #     task_id=f'{region}-nwp-consumer-meteomatics-india',
    #     task_definition='nwp-consumer-meteomatics-india',
    #     cluster=cluster,
    #     overrides={},
    #     launch_type="FARGATE",
    #     network_configuration={
    #         "awsvpcConfiguration": {
    #             "subnets": [subnet],
    #             "securityGroups": [security_group],
    #             "assignPublicIp": "ENABLED",
    #         },
    #     },
    #     task_concurrency=10,
    # )
    nwp_consumer_metoffice = EcsRunTaskOperator(
        task_id=f'{region}-nwp-consumer-metoffice-india',
        task_definition='nwp-consumer-metoffice-india',
        cluster=cluster,
        overrides={},
        launch_type="FARGATE",
        network_configuration={
            "awsvpcConfiguration": {
                "subnets": [subnet],
                "securityGroups": [security_group],
                "assignPublicIp": "ENABLED",
            },
        },
        task_concurrency=10,
    )

    rename_zarr_metoffice = determine_latest_zarr.override(
        task_id="determine_latest_zarr_metoffice",
    )(bucket=f"india-nwp-{env}", prefix="metoffice/data")

    latest_only >> nwp_consumer_ecmwf
    latest_only >> nwp_consumer_gfs
    # latest_only >> nwp_consumer_meteomatics
    latest_only >> nwp_consumer_metoffice >> rename_zarr_metoffice
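For context, a minimal sketch of the TaskFlow pattern used to wire rename_zarr_metoffice above: .override() customises a decorated task's metadata (here the task_id) for a particular use, and calling the result registers the task in the enclosing DAG. All names below are illustrative, not from this commit.

    import pendulum
    from airflow import DAG
    from airflow.decorators import task

    @task(task_id="default_id")
    def my_task(bucket: str, prefix: str):
        print(bucket, prefix)

    with DAG("example", start_date=pendulum.datetime(2024, 1, 1), schedule=None) as dag:
        # Override the task_id for this use, then call with the runtime
        # arguments to create the task instance in this DAG.
        renamed = my_task.override(task_id="custom_id")(bucket="some-bucket", prefix="some/prefix")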

44 changes: 44 additions & 0 deletions terraform/modules/services/airflow/dags/utils/s3.py
@@ -0,0 +1,44 @@
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.decorators import task

@task(task_id="determine_latest_zarr")
def determine_latest_zarr(bucket: str, prefix: str):
    s3hook = S3Hook(aws_conn_id=None)  # Use Boto3 default connection strategy
    # Get a list of all the non-latest zarrs in the bucket prefix
    prefixes = s3hook.list_prefixes(bucket_name=bucket, prefix=prefix + "/", delimiter='/')
    zarrs = sorted(
        [p for p in prefixes if p.endswith('.zarr/') and "latest" not in p],
        reverse=True,
    )
    if len(zarrs) == 0:
        s3hook.log.info("No non-latest zarrs found in bucket, exiting")
        return

    # Get the size of the most recent zarr and of the existing latest.zarr
    s3bucket = s3hook.get_bucket(bucket_name=bucket)
    size_old, size_new = (0, 0)
    for obj in s3bucket.objects.filter(Prefix=zarrs[0]):
        size_new += obj.size

    if prefix + "/latest.zarr/" in prefixes:
        for obj in s3bucket.objects.filter(Prefix=prefix + "/latest.zarr/"):
            size_old += obj.size

    # If the sizes differ, recreate latest.zarr from the most recent zarr
    if size_old != size_new and size_new > 0:
        # Delete the old latest.zarr, if it exists
        if prefix + "/latest.zarr/" in prefixes:
            s3hook.log.debug(f"Deleting {prefix}/latest.zarr/")
            keys_to_delete = s3hook.list_keys(bucket_name=bucket, prefix=prefix + "/latest.zarr/")
            s3hook.delete_objects(bucket=bucket, keys=keys_to_delete)
        # Copy the new latest.zarr
        s3hook.log.info(f"Copying {zarrs[0]} to {prefix}/latest.zarr/")
        source_keys = s3hook.list_keys(bucket_name=bucket, prefix=zarrs[0])
        for key in source_keys:
            s3hook.copy_object(
                source_bucket_name=bucket,
                source_bucket_key=key,
                dest_bucket_name=bucket,
                dest_bucket_key=prefix + "/latest.zarr/" + key.split(zarrs[0])[-1],
            )
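The destination key computation in the copy loop strips the source zarr's prefix, so each object keeps its path relative to the zarr root. A small illustration with hypothetical key names:

    zarr_prefix = "metoffice/data/20241120T00.zarr/"
    key = "metoffice/data/20241120T00.zarr/temperature/0.0.0"
    # Everything after the source prefix is the object's path within the zarr store
    dest_key = "metoffice/data/latest.zarr/" + key.split(zarr_prefix)[-1]
    assert dest_key == "metoffice/data/latest.zarr/temperature/0.0.0"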

7 changes: 7 additions & 0 deletions terraform/modules/services/airflow/eb.tf
@@ -141,6 +141,13 @@ resource "aws_elastic_beanstalk_environment" "eb-api-env" {
    resource  = ""
  }

  setting {
    namespace = "aws:elasticbeanstalk:application:environment"
    name      = "AWS_DEFAULT_REGION"
    value     = var.aws-region
    resource  = ""
  }

  setting {
    namespace = "aws:ec2:vpc"
    name      = "VPCId"
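The AWS_DEFAULT_REGION setting above is what lets the new S3 task run with S3Hook(aws_conn_id=None): with no Airflow connection configured, the hook falls back to boto3's default resolution, which reads credentials and region from the environment. A sketch of the behaviour being relied on (not part of the commit):

    import boto3

    # With no explicit region_name, botocore resolves the region from the
    # AWS_DEFAULT_REGION environment variable set by the Beanstalk config.
    client = boto3.client("s3")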
