-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathexample.py
55 lines (44 loc) · 1.32 KB
/
example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from src.example_module import example_function
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime.now(),
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
"catchup": False,
"max_active_runs": 1
}
dag = DAG("example", default_args=default_args, schedule_interval=timedelta(1))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(task_id="print_date", bash_command="date", dag=dag)
t2 = BashOperator(task_id="sleep", bash_command="sleep 5", retries=3, dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id="templated",
bash_command=templated_command,
params={"my_param": "Parameter I passed in"},
dag=dag,
)
t4 = PythonOperator(
task_id="python_code",
python_callable=example_function,
dag=dag
)
t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream(t1)