Skip to content

Commit

Permalink
Merge branch 'main' into india-mo-global
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc authored Nov 20, 2024
2 parents 2a52c24 + 1b167f0 commit b369bc2
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 27 deletions.
59 changes: 50 additions & 9 deletions terraform/india/development/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
# 3.0 - Secret containing environment variables for the NWP consumer
# 3.1 - Secret containing environment variables for the Satellite consumer
# 3.2 - Secret containing HF read access
# 4.0 - ECS task definition for the NWP consumer
# 4.0 - ECS task definition for the ECMWF consumer
# 4.1 - ECS task definition for the GFS consumer
# 4.2 - ECS task definition for Collection RUVNL data
# 4.3 - Satellite Consumer
# 4.4 - ECS task definition for the Forecast - Client RU
# 4.5 - ECS task definition for the Forecast - Client AD
# 4.2 - ECS task definition for the MetOffice consumer
# 4.3 - ECS task definition for Collection RUVNL data
# 4.4 - Satellite Consumer
# 4.5 - ECS task definition for the Forecast - Client RU
# 4.6 - ECS task definition for the Forecast - Client AD
# 5.0 - Airflow EB Instance
# 5.1 - India API EB Instance
# 5.2 - India Analysis Dashboard
Expand Down Expand Up @@ -198,9 +199,49 @@ module "nwp_consumer_gfs_live_ecs_task" {
]
}

# 4.2
module "nwp-consumer-metoffice-live-ecs-task" {
source = "../../modules/services/ecs_task"

ecs-task_name = "nwp-consumer-metoffice-india"
ecs-task_type = "consumer"
ecs-task_execution_role_arn = module.ecs-cluster.ecs_task_execution_role_arn
ecs-task_size = {
cpu = 512
memory = 1024
}

# 4.2
aws-region = var.region
aws-environment = local.environment

s3-buckets = [
{
id : module.s3-nwp-bucket.bucket_id
access_policy_arn : module.s3-nwp-bucket.write_policy_arn
}
]

container-env_vars = [
{ "name" : "LOGLEVEL", "value" : "INFO" },
{ "name" : "METOFFICE_ORDER_ID", "value" : "india-11params-54steps" },
{ "name" : "MODEL_REPOSITORY", "value" : "metoffice-datahub" },
{ "name" : "CONCURRENCY", "value" : "false" },
{ "name" : "ZARRDIR", "value" : format("s3://%s/metoffice/data", module.s3-nwp-bucket.bucket_id) },
{ "name" : "SENTRY_DSN", "value" : var.sentry_dsn },
]
container-secret_vars = [
{
secret_policy_arn: aws_secretsmanager_secret.nwp_consumer_secret.arn,
values: ["METOFFICE_API_KEY"],
}
]
container-tag = "devsjc-major-refactor"
container-name = "openclimatefix/nwp-consumer"
container-command = ["consume"]
}


# 4.3
module "ruvnl_consumer_ecs" {
source = "../../modules/services/ecs_task"

Expand Down Expand Up @@ -234,7 +275,7 @@ module "ruvnl_consumer_ecs" {
]
}

# 4.3 - Satellite Consumer
# 4.4 - Satellite Consumer
module "satellite_consumer_ecs" {
source = "../../modules/services/ecs_task"

Expand Down Expand Up @@ -277,7 +318,7 @@ module "satellite_consumer_ecs" {



# 4.4 - Forecast - Client RU
# 4.5 - Forecast - Client RU
module "forecast" {
source = "../../modules/services/forecast_generic"

Expand Down Expand Up @@ -317,7 +358,7 @@ module "forecast" {
}


# 4.5 - Forecast - Client AD
# 4.6 - Forecast - Client AD
module "forecast-ad" {
source = "../../modules/services/ecs_task"

Expand Down
45 changes: 27 additions & 18 deletions terraform/modules/services/airflow/dags/india/nwp-dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from airflow.operators.latest_only import LatestOnlyOperator
from utils.slack import on_failure_callback
from utils.s3 import determine_latest_zarr

default_args = {
'owner': 'airflow',
Expand All @@ -24,7 +25,13 @@

region = 'india'

with DAG(f'{region}-nwp-consumer', schedule_interval="0 * * * *", default_args=default_args, concurrency=10, max_active_tasks=10) as dag:
with DAG(
f'{region}-nwp-consumer',
schedule_interval="0 * * * *",
default_args=default_args,
concurrency=10,
max_active_tasks=10,
) as dag:
dag.doc_md = "Get NWP data"

latest_only = LatestOnlyOperator(task_id="latest_only")
Expand All @@ -45,7 +52,6 @@
task_concurrency=10,
)


nwp_consumer_gfs = EcsRunTaskOperator(
task_id=f'{region}-nwp-consumer-gfs-india',
task_definition='nwp-consumer-gfs-india',
Expand All @@ -62,23 +68,26 @@
task_concurrency=10,
)

# nwp_consumer_meteomatics = EcsRunTaskOperator(
# task_id=f'{region}-nwp-consumer-meteomatics-india',
# task_definition='nwp-consumer-meteomatics-india',
# cluster=cluster,
# overrides={},
# launch_type="FARGATE",
# network_configuration={
# "awsvpcConfiguration": {
# "subnets": [subnet],
# "securityGroups": [security_group],
# "assignPublicIp": "ENABLED",
# },
# },
# task_concurrency=10,
# )
nwp_consumer_metoffice = EcsRunTaskOperator(
task_id=f'{region}-nwp-consumer-metoffice-india',
task_definition='nwp-consumer-metoffice-india',
cluster=cluster,
overrides={},
launch_type="FARGATE",
network_configuration={
"awsvpcConfiguration": {
"subnets": [subnet],
"securityGroups": [security_group],
"assignPublicIp": "ENABLED",
},
},
task_concurrency=10,
)
rename_zarr_metoffice = determine_latest_zarr.override(
task_id="determine_latest_zarr_metoffice",
)(bucket=f"india-nwp-{env}", prefix="metoffice/data")

latest_only >> nwp_consumer_ecmwf
latest_only >> nwp_consumer_gfs
# latest_only >> nwp_consumer_meteomatics
latest_only >> nwp_consumer_metoffice >> rename_zarr_metoffice

44 changes: 44 additions & 0 deletions terraform/modules/services/airflow/dags/utils/s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.decorators import task

@task(task_id="determine_latest_zarr")
def determine_latest_zarr(bucket: str, prefix: str):
s3hook = S3Hook(aws_conn_id=None) # Use Boto3 default connection strategy
# Get a list of all the non-latest zarrs in the bucket prefix
prefixes = s3hook.list_prefixes(bucket_name=bucket, prefix=prefix + "/", delimiter='/')
zarrs = sorted([
p for p in prefixes if p.endswith('.zarr/') and "latest" not in p
], reverse=True)
# Get the size of the most recent zarr and the latest.zarr zarr
s3bucket = s3hook.get_bucket(bucket_name=bucket)
size_old, size_new = (0, 0)
print(s3hook.list_keys(bucket_name=bucket, prefix=prefix + "/latest.zarr/"))
if len(zarrs) == 0:
s3hook.log.info("No non-latest zarrs found in bucket, exiting")
return

for obj in s3bucket.objects.filter(Prefix=zarrs[0]):
size_new += obj.size

if prefix + "/latest.zarr/" in prefixes:
for obj in s3bucket.objects.filter(Prefix=prefix + "/latest.zarr/"):
size_old += obj.size

# If the sizes are different, create a new latest.zarr
if size_old != size_new and size_new > 0:
# Delete the old latest.zarr, if it exists
if prefix + "/latest.zarr/" in prefixes:
s3hook.log.debug(f"Deleting {prefix}/latest.zarr/")
keys_to_delete = s3hook.list_keys(bucket_name=bucket, prefix=prefix + "/latest.zarr/")
s3hook.delete_objects(bucket_name=bucket, keys=keys_to_delete)
# Copy the new latest.zarr
s3hook.log.info(f"Copying {zarrs[0]} to {prefix}/latest.zarr/")
source_keys = s3hook.list_keys(bucket_name=bucket, prefix=zarrs[0])
for key in source_keys:
s3hook.copy_object(
source_bucket_name=bucket,
source_bucket_key=key,
dest_bucket_name=bucket,
dest_bucket_key=prefix + "/latest.zarr/" + key.split(zarrs[0])[-1],
)

7 changes: 7 additions & 0 deletions terraform/modules/services/airflow/eb.tf
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ resource "aws_elastic_beanstalk_environment" "eb-api-env" {
resource = ""
}

setting {
namespace = "aws:elasticbeanstalk:application:environment"
name = "AWS_DEFAULT_REGION"
value = var.aws-region
resource = ""
}

setting {
namespace = "aws:ec2:vpc"
name = "VPCId"
Expand Down

0 comments on commit b369bc2

Please sign in to comment.