-
Not possible, at least not without manually modifying the Airflow database and code. If you need to do this, you should instead manually run whatever code you need outside of Airflow.
-
Would it be possible with a plugin? I'd like to get to know the code better anyway, so this could be a good exercise.
-
I looked a bit at the code and the database. The simplest (and dirtiest) way to do it would be to update the conf field of the DagRun and then clear the task. Being able to edit a DagRun's conf in the UI or through the API would be sufficient. I guess this feature request is not very popular?
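
For anyone who wants to experiment with that approach, here is a rough sketch of "update the conf, then clear the task" done directly against the metadata database. It is an assumption-laden illustration, not a supported workflow: it assumes Airflow 2.2 or later (where `TaskInstance.run_id` exists), the names `merged_conf` and `rerun_task_with_conf` are hypothetical helpers made up for this example, and it has not been verified against every Airflow version, so try it on a test environment first.

```python
from typing import Any, Dict, Optional


def merged_conf(old_conf: Optional[Dict[str, Any]], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Return a NEW dict with the overrides applied on top of the old conf.

    A fresh dict is built (rather than mutating in place) so that assigning
    it to DagRun.conf is seen by the ORM as a change.
    """
    new_conf = dict(old_conf or {})
    new_conf.update(overrides)
    return new_conf


def rerun_task_with_conf(dag_id: str, run_id: str, task_id: str, overrides: Dict[str, Any]) -> None:
    """Hypothetical helper: edit a past DagRun's conf, then clear one task."""
    # Airflow is imported lazily so merged_conf() stays usable without it.
    from airflow.models import DagRun, TaskInstance
    from airflow.models.taskinstance import clear_task_instances
    from airflow.utils.session import create_session

    # create_session() commits when the block exits without an exception.
    with create_session() as session:
        dag_run = (
            session.query(DagRun)
            .filter(DagRun.dag_id == dag_id, DagRun.run_id == run_id)
            .one()
        )
        dag_run.conf = merged_conf(dag_run.conf, overrides)

        task_instances = (
            session.query(TaskInstance)
            .filter(
                TaskInstance.dag_id == dag_id,
                TaskInstance.run_id == run_id,
                TaskInstance.task_id == task_id,
            )
            .all()
        )
        # Clearing resets the task instances so the scheduler picks them up again.
        clear_task_instances(task_instances, session)
```

The task only sees the new values if it reads them from `dag_run.conf` at run time; anything baked in at DAG parse time is unaffected.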
-
I'm late, but for anyone still looking for a workaround: a BranchPythonOperator can select the tasks you want to run and skip the rest. Here is an example:

    from datetime import datetime, timedelta
    import time

    from airflow import DAG
    from airflow.operators.python import PythonOperator, BranchPythonOperator
    from airflow.models.param import Param

    FACT_TABLES = ['table1', 'table2', 'table3']


    def process_table(**context):
        # Derive the table name from the task id, e.g. "process_table1" -> "table1"
        task_id = context['task_instance'].task_id
        table_name = task_id.replace("process_", "")
        params = context['params']
        time.sleep(5)
        print(f"Processing {table_name} with date: {params['process_date']}")


    def select_tables(**context):
        # Return the task ids to run; the branch operator skips the others
        dag_conf = context['dag_run'].conf
        if not dag_conf or not dag_conf.get('tables'):
            # No selection given: run every table
            return [f'process_{table}' for table in FACT_TABLES]
        return [f'process_{table}' for table in dag_conf.get('tables')]


    default_args = {
        'start_date': datetime(2024, 1, 1),
    }

    with DAG(
        'selective_table_processing',
        default_args=default_args,
        schedule_interval=None,  # Manual trigger
        params={
            'process_date': Param(
                default=(datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d'),
                type='string'
            )
        }
    ) as dag:
        # provide_context is not needed in Airflow 2; the context is always passed
        select_tables_task = BranchPythonOperator(
            task_id='select_tables',
            python_callable=select_tables,
        )

        tasks = []
        for table in FACT_TABLES:
            task = PythonOperator(
                task_id=f'process_{table}',
                python_callable=process_table,
            )
            tasks.append(task)

        select_tables_task >> tasks

In the UI, click Trigger DAG and enter a JSON object in Configuration JSON whose `tables` key lists the tables to process.
Click Trigger, and you will see the selected tasks run while the rest are skipped.
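
The Configuration JSON from the post did not survive here, so the payload below is only a guess based on `select_tables` reading the `tables` key of `dag_run.conf`. The sketch pulls the selection logic out into a plain function (the name `select_task_ids` is made up for this illustration) so it can be tried without Airflow. Unlike the original callable, it also drops table names that have no matching task, which is a design choice of this sketch rather than something the example above does.

```python
import json

FACT_TABLES = ['table1', 'table2', 'table3']


def select_task_ids(conf, known_tables=FACT_TABLES):
    """Map a DagRun conf to the list of task ids the branch should follow."""
    requested = (conf or {}).get('tables')
    if not requested:
        # No selection given: run every table
        return [f'process_{table}' for table in known_tables]
    # Keep only names that correspond to a real task
    return [f'process_{table}' for table in requested if table in known_tables]


# Hypothetical Configuration JSON, as it might be pasted into the Trigger DAG form
example_conf = json.loads('{"tables": ["table1", "table3"]}')

print(select_task_ids(example_conf))  # ['process_table1', 'process_table3']
print(select_task_ids(None))          # ['process_table1', 'process_table2', 'process_table3']
```

Note that if every requested name is unknown, the function returns an empty list, so the branch would skip all of the downstream tasks.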
-
Hello, is it possible to rerun a failed task in a DAG with different params?
I've looked around and I don't think it is possible in Airflow. You can rerun the full DAG with different params, but you cannot rerun a single task in a past DagRun with different parameters.
Isn't this desirable? Any suggestions on how it could be achieved?