From abc0ea914005133f19d1759edacf7ee4e46ad5aa Mon Sep 17 00:00:00 2001 From: Drew Meyers Date: Wed, 6 Mar 2024 10:08:53 -0800 Subject: [PATCH] Volume mount testing in dag --- airflow/dags/hello_world.py | 66 ++++++++++++++++++++++++++++++------- 1 file changed, 54 insertions(+), 12 deletions(-) diff --git a/airflow/dags/hello_world.py b/airflow/dags/hello_world.py index 50d0a08..b8971e8 100644 --- a/airflow/dags/hello_world.py +++ b/airflow/dags/hello_world.py @@ -1,11 +1,4 @@ -""" -# DAG Name: Hello World - -# Purpose - -# Usage -""" # noqa: E501 - +import os import time from datetime import datetime @@ -24,13 +17,62 @@ def hello_world(): time.sleep(30) +def write_to_shared_data(): + file_path = "/path/to/shared-task-data/test_file.txt" # Adjust the path as necessary + with open(file_path, "w") as f: + f.write("This is a test file written at " + str(datetime.now()) + "\n") + print(f"Successfully written to {file_path}") + + +def read_from_shared_data(): + file_path = "/path/to/shared-task-data/test_file.txt" # Adjust the path as necessary + try: + with open(file_path, "r") as f: + contents = f.read() + print(f"File contents:\n{contents}") + except FileNotFoundError: + print("File not found. Make sure the file path is correct.") + + +def delete_shared_data_file(): + file_path = "/path/to/shared-task-data/test_file.txt" # Adjust the path as necessary + try: + os.remove(file_path) + print(f"Successfully deleted {file_path}") + except FileNotFoundError: + print("File not found. Make sure the file path is correct.") + + with DAG( - dag_id="hello_world", - doc_md=__doc__, + dag_id="full_workflow", default_args=default_args, schedule=None, is_paused_upon_creation=False, tags=["test"], ) as dag: - hello_world_task = PythonOperator(task_id="hello_world", python_callable=hello_world) - hello_world_task + hello_world_task = PythonOperator( + task_id="hello_world", + python_callable=hello_world, + ) + + write_to_shared_data_task = PythonOperator( + task_id="write_to_shared_data", + python_callable=write_to_shared_data, + ) + + read_from_shared_data_task = PythonOperator( + task_id="read_from_shared_data", + python_callable=read_from_shared_data, + ) + + delete_shared_data_file_task = PythonOperator( + task_id="delete_shared_data_file", + python_callable=delete_shared_data_file, + ) + + ( + hello_world_task + >> write_to_shared_data_task + >> read_from_shared_data_task + >> delete_shared_data_file_task + )