diff --git a/dags/elsevier/elsevier_file_processing.py b/dags/elsevier/elsevier_file_processing.py index 258a8f6f..93db2efc 100644 --- a/dags/elsevier/elsevier_file_processing.py +++ b/dags/elsevier/elsevier_file_processing.py @@ -11,6 +11,7 @@ ) from elsevier.parser import ElsevierParser from elsevier.repository import ElsevierRepository +from executor_config import kubernetes_executor_config from inspire_utils.record import get_value from structlog import get_logger @@ -43,20 +44,20 @@ def elsevier_process_file(): s3_client = ElsevierRepository() - @task() + @task(executor_config=kubernetes_executor_config) def parse(**kwargs): xml_path = kwargs["params"]["file_name"] xml_content_bytes = s3_client.get_by_id(xml_path) kwargs["params"]["file_content"] = xml_content_bytes return parse_elsevier(**kwargs) - @task() + @task(executor_config=kubernetes_executor_config) def enhance(parsed_file): if parsed_file: return parsed_file and enhance_elsevier(parsed_file) raise EmptyOutputFromPreviousTask("parse_metadata") - @task() + @task(executor_config=kubernetes_executor_config) def populate_files(parsed_file): if "files" not in parsed_file: logger.info("No files to populate") @@ -74,17 +75,17 @@ def populate_files(parsed_file): logger.info("Files populated", files=parsed_file["files"]) return parsed_file - @task() + @task(executor_config=kubernetes_executor_config) def enrich(enhanced_file): if enhanced_file: return enrich_elsevier(enhanced_file) raise EmptyOutputFromPreviousTask("enhanced_file_with_metadata") - @task() + @task(executor_config=kubernetes_executor_config) def save_to_s3(enriched_file): upload_json_to_s3(json_record=enriched_file, repo=s3_client) - @task() + @task(executor_config=kubernetes_executor_config) def create_or_update(enriched_file): create_or_update_article(enriched_file) diff --git a/dags/elsevier/elsevier_pull_sftp.py b/dags/elsevier/elsevier_pull_sftp.py index e0e612cd..e9b539c1 100644 --- a/dags/elsevier/elsevier_pull_sftp.py +++ b/dags/elsevier/elsevier_pull_sftp.py @@ -5,6 +5,7 @@ from elsevier.repository import ElsevierRepository from elsevier.sftp_service import ElsevierSFTPService from elsevier.trigger_file_processing import trigger_file_processing_elsevier +from executor_config import kubernetes_executor_config from structlog import get_logger @@ -20,7 +21,7 @@ def elsevier_pull_sftp(): logger = get_logger().bind(class_name="elsevier_pull_sftp") - @task() + @task(executor_config=kubernetes_executor_config) def migrate_from_ftp( sftp = ElsevierSFTPService(), repo = ElsevierRepository(), @@ -41,7 +42,7 @@ def migrate_from_ftp( sftp, repo, logger, publisher="elsevier", **kwargs ) - @task() + @task(executor_config=kubernetes_executor_config) def trigger_file_processing( repo = ElsevierRepository(), filenames=None, diff --git a/dags/executor_config.py b/dags/executor_config.py new file mode 100644 index 00000000..ec944b79 --- /dev/null +++ b/dags/executor_config.py @@ -0,0 +1,17 @@ +from kubernetes import client as k8s + +kubernetes_executor_config = { + "pod_override": k8s.V1Pod( + spec=k8s.V1PodSpec( + containers=[ + k8s.V1Container( + name="base", + resources=client.V1ResourceRequirements( + requests={"memory": "1500Mi"}, + limits={"memory": "2Gi"}, + ) + ) + ], + ) + ), +} \ No newline at end of file diff --git a/requirements-airflow.txt b/requirements-airflow.txt index e562a41c..bea8a315 100644 --- a/requirements-airflow.txt +++ b/requirements-airflow.txt @@ -1,2 +1,2 @@ -c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.3/constraints-3.10.txt -apache-airflow[celery, postgres]==2.8.3 +apache-airflow[celery, postgres, cncf.kubernetes]==2.8.3