From 9849ecb2408e57e69d955cf71f34093440ce2a56 Mon Sep 17 00:00:00 2001
From: ErnestaP <ernesta.petraityte@yahoo.com>
Date: Tue, 13 Aug 2024 17:41:36 +0200
Subject: [PATCH] Elsevier: added kubernetes executor config

---
 dags/elsevier/elsevier_file_processing.py | 13 +++++++------
 dags/elsevier/elsevier_pull_sftp.py       |  5 +++--
 dags/executor_config.py                   | 17 +++++++++++++++++
 requirements-airflow.txt                  |  2 +-
 4 files changed, 28 insertions(+), 9 deletions(-)
 create mode 100644 dags/executor_config.py

diff --git a/dags/elsevier/elsevier_file_processing.py b/dags/elsevier/elsevier_file_processing.py
index 258a8f6f..93db2efc 100644
--- a/dags/elsevier/elsevier_file_processing.py
+++ b/dags/elsevier/elsevier_file_processing.py
@@ -11,6 +11,7 @@
 )
 from elsevier.parser import ElsevierParser
 from elsevier.repository import ElsevierRepository
+from executor_config import kubernetes_executor_config
 from inspire_utils.record import get_value
 from structlog import get_logger
 
@@ -43,20 +44,20 @@ def elsevier_process_file():
 
     s3_client = ElsevierRepository()
 
-    @task()
+    @task(executor_config=kubernetes_executor_config)
     def parse(**kwargs):
         xml_path = kwargs["params"]["file_name"]
         xml_content_bytes = s3_client.get_by_id(xml_path)
         kwargs["params"]["file_content"] = xml_content_bytes
         return parse_elsevier(**kwargs)
 
-    @task()
+    @task(executor_config=kubernetes_executor_config)
     def enhance(parsed_file):
         if parsed_file:
             return parsed_file and enhance_elsevier(parsed_file)
         raise EmptyOutputFromPreviousTask("parse_metadata")
 
-    @task()
+    @task(executor_config=kubernetes_executor_config)
     def populate_files(parsed_file):
         if "files" not in parsed_file:
             logger.info("No files to populate")
@@ -74,17 +75,17 @@ def populate_files(parsed_file):
         logger.info("Files populated", files=parsed_file["files"])
         return parsed_file
 
-    @task()
+    @task(executor_config=kubernetes_executor_config)
     def enrich(enhanced_file):
         if enhanced_file:
             return enrich_elsevier(enhanced_file)
         raise EmptyOutputFromPreviousTask("enhanced_file_with_metadata")
 
-    @task()
+    @task(executor_config=kubernetes_executor_config)
     def save_to_s3(enriched_file):
         upload_json_to_s3(json_record=enriched_file, repo=s3_client)
 
-    @task()
+    @task(executor_config=kubernetes_executor_config)
     def create_or_update(enriched_file):
         create_or_update_article(enriched_file)
 
diff --git a/dags/elsevier/elsevier_pull_sftp.py b/dags/elsevier/elsevier_pull_sftp.py
index e0e612cd..e9b539c1 100644
--- a/dags/elsevier/elsevier_pull_sftp.py
+++ b/dags/elsevier/elsevier_pull_sftp.py
@@ -5,6 +5,7 @@
 from elsevier.repository import ElsevierRepository
 from elsevier.sftp_service import ElsevierSFTPService
 from elsevier.trigger_file_processing import trigger_file_processing_elsevier
+from executor_config import kubernetes_executor_config
 from structlog import get_logger
 
 
@@ -20,7 +21,7 @@
 def elsevier_pull_sftp():
     logger = get_logger().bind(class_name="elsevier_pull_sftp")
 
-    @task()
+    @task(executor_config=kubernetes_executor_config)
     def migrate_from_ftp(
         sftp = ElsevierSFTPService(),
         repo = ElsevierRepository(),
@@ -41,7 +42,7 @@ def migrate_from_ftp(
                 sftp, repo, logger, publisher="elsevier", **kwargs
             )
 
-    @task()
+    @task(executor_config=kubernetes_executor_config)
     def trigger_file_processing(
         repo = ElsevierRepository(),
         filenames=None,
diff --git a/dags/executor_config.py b/dags/executor_config.py
new file mode 100644
index 00000000..ec944b79
--- /dev/null
+++ b/dags/executor_config.py
@@ -0,0 +1,17 @@
+from kubernetes import client as k8s
+
+kubernetes_executor_config = {
+    "pod_override": k8s.V1Pod(
+        spec=k8s.V1PodSpec(
+            containers=[
+                k8s.V1Container(
+                    name="base",
+                    resources=client.V1ResourceRequirements(
+                        requests={"memory": "1500Mi"},
+                        limits={"memory": "2Gi"},
+                    )
+                )
+            ],
+        )
+    ),
+}
\ No newline at end of file
diff --git a/requirements-airflow.txt b/requirements-airflow.txt
index e562a41c..bea8a315 100644
--- a/requirements-airflow.txt
+++ b/requirements-airflow.txt
@@ -1,2 +1,2 @@
 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.3/constraints-3.10.txt
-apache-airflow[celery, postgres]==2.8.3
+apache-airflow[celery, postgres, cncf.kubernetes]==2.8.3