Skip to content

Commit

Permalink
Add S3 event bridge
Browse files Browse the repository at this point in the history
  • Loading branch information
amarouane-ABDELHAK committed Oct 29, 2024
1 parent 305dd43 commit a619098
Show file tree
Hide file tree
Showing 45 changed files with 3,670 additions and 264 deletions.

This file was deleted.

This file was deleted.

203 changes: 0 additions & 203 deletions infrastructure/s3_event_bridge_lambda.tf

This file was deleted.

Empty file added sm2a/dags/__init__.py
Empty file.
94 changes: 94 additions & 0 deletions sm2a/dags/example_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import logging
import time

import pendulum
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator as EmptyOperator
from airflow.operators.python import PythonOperator


def log_task(text: str):
logging.info(text)


def discover_from_cmr_task(text):
log_task(text)


def discover_from_s3_task(text):
log_task("I am discovering")
time.sleep(1)
log_task("Done discovering")
log_task(text)


def move_files_to_maap_store_task(text):
log_task("I am moving files")
time.sleep(3)
log_task("Done moving files")
log_task(text)


def generate_cmr_metadata_task(text):
log_task(text)


def push_to_cmr_task(text):
log_task(text)


with DAG(
dag_id="example_etl_flow_test",
start_date=pendulum.today("UTC").add(days=-1),
schedule_interval=None,
tags=["example"],
) as dag:

start = EmptyOperator(task_id="start", dag=dag)

discover_from_cmr = PythonOperator(
task_id="discover_from_cmr",
python_callable=discover_from_cmr_task,
op_kwargs={"text": "Discover from CMR"},
dag=dag,
)

discover_from_s3 = PythonOperator(
task_id="discover_from_s3",
python_callable=discover_from_s3_task,
op_kwargs={"text": "Discover from S3"},
dag=dag,
)

move_files_to_maap_store = PythonOperator(
task_id="move_files_to_maap_store",
python_callable=move_files_to_maap_store_task,
op_kwargs={"text": "Moving Files to MAAP store"},
dag=dag,
)

generate_cmr_metadata = PythonOperator(
task_id="generate_cmr_metadata",
python_callable=generate_cmr_metadata_task,
op_kwargs={"text": "Generate CMR metadata"},
dag=dag,
)

push_to_cmr = PythonOperator(
task_id="push_to_cmr",
python_callable=push_to_cmr_task,
op_kwargs={"text": "Push to CMR"},
dag=dag,
)

end = EmptyOperator(task_id="end", dag=dag)

start >> discover_from_cmr

start >> discover_from_s3 >> move_files_to_maap_store
(
[discover_from_cmr, move_files_to_maap_store]
>> generate_cmr_metadata
>> push_to_cmr
>> end
)
Loading

0 comments on commit a619098

Please sign in to comment.