diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py index 925acfc16f0c1..9a5c1b563ce4d 100644 --- a/airflow/models/mappedoperator.py +++ b/airflow/models/mappedoperator.py @@ -821,6 +821,8 @@ def unmap(self, resolve: None | Mapping[str, Any] | tuple[Context, Session]) -> from airflow.serialization.serialized_objects import SerializedBaseOperator op = SerializedBaseOperator(task_id=self.task_id, params=self.params, _airflow_from_mapped=True) + for partial_attr, value in self.partial_kwargs.items(): + setattr(op, partial_attr, value) SerializedBaseOperator.populate_operator(op, self.operator_class) if self.dag is not None: # For Mypy; we only serialize tasks in a DAG so the check always satisfies. SerializedBaseOperator.set_task_dag_references(op, self.dag) diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index d10984556ff23..f04ff3e2568dd 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -2529,7 +2529,11 @@ def test_operator_expand_deserialized_unmap(): ser_normal = BaseSerialization.serialize(normal) deser_normal = BaseSerialization.deserialize(ser_normal) deser_normal.dag = None - assert deser_mapped.unmap(None) == deser_normal + unmapped_deser_mapped = deser_mapped.unmap(None) + + assert type(unmapped_deser_mapped) is type(deser_normal) is SerializedBaseOperator + assert unmapped_deser_mapped.task_id == deser_normal.task_id == "a" + assert unmapped_deser_mapped.executor_config == deser_normal.executor_config == {"a": "b"} @pytest.mark.db_test