From 73cfd572149a54ebbc5193e1e8872b22f75aba2c Mon Sep 17 00:00:00 2001 From: Daniel Gibson Date: Mon, 17 Feb 2025 11:22:50 -0600 Subject: [PATCH 1/2] Fix asset backfill subset logging Summary: AssetGraphSubset.empty() returns an empty subset. What we wanted here was a boolean that tells you whether the subset was empty. This was (in master) causing asset backfills to always log that there was nothing to request, even if they then went on to request things Test Plan: New test case, Launch an asset backfill and view the logs, verify that output now includes the asset partitions being requested. --- .../dagster/dagster/_core/definitions/asset_graph_subset.py | 4 ++++ .../dagster/dagster/_core/execution/asset_backfill.py | 4 ++-- .../dagster/dagster_tests/daemon_tests/test_backfill.py | 5 +++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py b/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py index 3288537dcfb66..ea21840dab873 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py @@ -61,6 +61,10 @@ def num_partitions_and_non_partitioned_assets(self) -> int: len(subset) for subset in self.partitions_subsets_by_asset_key.values() ) + @property + def is_empty(self) -> bool: + return len(self.asset_keys) == 0 + def get_asset_subset( self, asset_key: AssetKey, asset_graph: BaseAssetGraph ) -> SerializableEntitySubset[AssetKey]: diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index 0e347bb8b01bd..1bb30465f129f 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -1492,7 +1492,7 @@ def execute_asset_backfill_iteration_inner( ) logger.info( f"Assets materialized since last tick:\n{_asset_graph_subset_to_str(materialized_since_last_tick, asset_graph)}" - if not materialized_since_last_tick.empty + if not materialized_since_last_tick.is_empty else "No relevant assets materialized since last tick." ) @@ -1538,7 +1538,7 @@ def execute_asset_backfill_iteration_inner( logger.info( f"Asset partitions to request:\n{_asset_graph_subset_to_str(asset_subset_to_request, asset_graph)}" - if not asset_subset_to_request.empty + if not asset_subset_to_request.is_empty else "No asset partitions to request." ) diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index 9b84ec028ddaa..bd7e25bda3db0 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -2902,6 +2902,11 @@ def test_asset_backfill_logging(caplog, instance, workspace_context): assert "DefaultPartitionsSubset(subset={'foo_b'})" in logs assert "latest_storage_id=None" in logs assert "AssetBackfillData" in logs + assert ( + """Asset partitions to request: +- asset_a: {foo_a}""" + in logs + ) def test_backfill_with_title_and_description( From dedccbf5a600a4b78bbae5eabfc9a2fc200bb739 Mon Sep 17 00:00:00 2001 From: Daniel Gibson Date: Mon, 17 Feb 2025 19:41:04 -0600 Subject: [PATCH 2/2] Detect when a backfill has stopped producing runs but some of its target subset was never materialized and fail the backfill --- .../dagster/_core/execution/asset_backfill.py | 99 +++++++++---------- .../execution_tests/test_asset_backfill.py | 8 +- .../daemon_tests/test_backfill.py | 12 ++- 3 files changed, 57 insertions(+), 62 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index 1bb30465f129f..1ceb4394aebf3 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -53,7 +53,6 @@ ASSET_PARTITION_RANGE_START_TAG, BACKFILL_ID_TAG, PARTITION_NAME_TAG, - WILL_RETRY_TAG, ) from dagster._core.utils import make_new_run_id, toposort from dagster._core.workspace.context import BaseWorkspaceRequestContext, IWorkspaceProcessContext @@ -63,6 +62,7 @@ if TYPE_CHECKING: from dagster._core.execution.backfill import PartitionBackfill + from dagster._core.storage.dagster_run import DagsterRun def get_asset_backfill_run_chunk_size(): @@ -156,19 +156,14 @@ def with_latest_storage_id(self, latest_storage_id: Optional[int]) -> "AssetBack def with_requested_runs_for_target_roots(self, requested_runs_for_target_roots: bool): return self._replace(requested_runs_for_target_roots=requested_runs_for_target_roots) - def all_targeted_partitions_have_materialization_status(self) -> bool: + def get_targeted_partitions_without_materialization_status(self) -> AssetGraphSubset: """The asset backfill is complete when all runs to be requested have finished (success, failure, or cancellation). Since the AssetBackfillData object stores materialization states per asset partition, we can use the materialization states and whether any runs for the backfill are not finished to determine if the backfill is complete. We want the daemon to continue to update the backfill data until all runs have finished in order to display the final partition statuses in the UI. """ - return ( - ( - self.materialized_subset | self.failed_and_downstream_subset - ).num_partitions_and_non_partitioned_assets - == self.target_subset.num_partitions_and_non_partitioned_assets - ) + return self.target_subset - (self.materialized_subset | self.failed_and_downstream_subset) def all_requested_partitions_marked_as_materialized_or_failed(self) -> bool: return ( @@ -912,58 +907,31 @@ def _check_validity_and_deserialize_asset_backfill_data( return asset_backfill_data -def backfill_is_complete( - backfill_id: str, - backfill_data: AssetBackfillData, - instance: DagsterInstance, +def backfill_runs_are_complete( logger: logging.Logger, + backfill_runs: Sequence["DagsterRun"], ): - """A backfill is complete when: - 1. all asset partitions in the target subset have a materialization state (successful, failed, downstream of a failed partition). - 2. there are no in progress runs for the backfill. - 3. there are no failed runs that will result in an automatic retry, but have not yet been retried. + """Checks if a backfill has any more runs using the following conditions: + 1. there are no in progress runs for the backfill. + 2. there are no failed runs that will result in an automatic retry, but have not yet been retried. Condition 1 ensures that for each asset partition we have attempted to materialize it or have determined we - cannot materialize it because of a failed dependency. Condition 2 ensures that no retries of failed runs are - in progress. Condition 3 guards against a race condition where a failed run could be automatically retried - but it was not added into the queue in time to be caught by condition 2. + cannot materialize it because of a failed dependency. Condition 1 ensures that no retries of failed runs are + in progress. Condition 2 guards against a race condition where a failed run could be automatically retried + but it was not added into the queue in time to be caught by condition 1. Since the AssetBackfillData object stores materialization states per asset partition, we want to ensure the daemon continues to update the backfill data until all runs have finished in order to display the final partition statuses in the UI. """ - # Condition 1 - if any asset partitions in the target subset do not have a materialization state, the backfill - # is not complete - if not backfill_data.all_targeted_partitions_have_materialization_status(): - logger.info( - "Not all targeted asset partitions have a materialization status. Backfill is still in progress." - ) - return False - # Condition 2 - if there are in progress runs for the backfill, the backfill is not complete - if ( - len( - instance.get_run_ids( - filters=RunsFilter( - statuses=NOT_FINISHED_STATUSES, - tags={BACKFILL_ID_TAG: backfill_id}, - ), - limit=1, - ) - ) - > 0 - ): + # Condition 1 - if there are in progress runs for the backfill, the backfill is not complete + if any(run.status in NOT_FINISHED_STATUSES for run in backfill_runs): logger.info("Backfill has in progress runs. Backfill is still in progress.") return False - # Condition 3 - if there are runs that will be retried, but have not yet been retried, the backfill is not complete + + # Condition 2 - if there are runs that will be retried, but have not yet been retried, the backfill is not complete runs_waiting_to_retry = [ - run.run_id - for run in instance.get_runs( - filters=RunsFilter( - tags={BACKFILL_ID_TAG: backfill_id, WILL_RETRY_TAG: "true"}, - statuses=[DagsterRunStatus.FAILURE], - ) - ) - if run.is_complete_and_waiting_to_retry + run.run_id for run in backfill_runs if run.is_complete_and_waiting_to_retry ] if len(runs_waiting_to_retry) > 0: num_runs_to_log = 20 @@ -1023,6 +991,13 @@ def execute_asset_backfill_iteration( ) if backfill.status == BulkActionStatus.REQUESTED: + backfill_runs = instance.get_runs( + filters=RunsFilter( + tags={BACKFILL_ID_TAG: backfill.backfill_id}, + statuses=NOT_FINISHED_STATUSES + [DagsterRunStatus.FAILURE], + ) + ) + if backfill.submitting_run_requests: # interrupted in the middle of executing run requests - re-construct the in-progress iteration result logger.warn( @@ -1097,12 +1072,32 @@ def execute_asset_backfill_iteration( updated_backfill_data = updated_backfill.get_asset_backfill_data(asset_graph) - if backfill_is_complete( - backfill_id=backfill.backfill_id, - backfill_data=updated_backfill_data, - instance=instance, + partitions_without_materialization_status = ( + updated_backfill_data.get_targeted_partitions_without_materialization_status() + ) + + asset_backfill_data_unchanged = ( + updated_backfill_data.materialized_subset + == previous_asset_backfill_data.materialized_subset + and updated_backfill_data.requested_subset + == previous_asset_backfill_data.requested_subset + and updated_backfill_data.failed_and_downstream_subset + == previous_asset_backfill_data.failed_and_downstream_subset + ) + + if ( + partitions_without_materialization_status.is_empty or asset_backfill_data_unchanged + ) and backfill_runs_are_complete( logger=logger, + backfill_runs=backfill_runs, ): + if not partitions_without_materialization_status.is_empty: + raise DagsterBackfillFailedError( + "Backfill stopped producing runs, but the following partitions were never materialized or failed, " + "possibly because the runs that were supposed to materialize them succeeded without creating the expected materializations: " + f"{_asset_graph_subset_to_str(partitions_without_materialization_status, asset_graph)}." + ) + if ( updated_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets > 0 diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py index 9a819d7508aa2..dc5bb4eddf22d 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py @@ -53,7 +53,6 @@ AssetBackfillData, AssetBackfillIterationResult, AssetBackfillStatus, - backfill_is_complete, execute_asset_backfill_iteration_inner, get_canceling_asset_backfill_iteration_data, ) @@ -676,12 +675,7 @@ def run_backfill_to_completion( fail_and_downstream_asset_graph_subset.iterate_asset_partitions() ) - while not backfill_is_complete( - backfill_id=backfill_id, - backfill_data=backfill_data, - instance=instance, - logger=logging.getLogger("fake_logger"), - ): + while not backfill_data.get_targeted_partitions_without_materialization_status().is_empty: iteration_count += 1 result1 = execute_asset_backfill_iteration_consume_generator( diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index bd7e25bda3db0..be0fa63229d47 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -3266,6 +3266,11 @@ def test_asset_backfill_not_complete_until_retries_complete( assert backfill.status == BulkActionStatus.REQUESTED list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.REQUESTED + assert instance.get_runs_count() == 3 wait_for_all_runs_to_start(instance, timeout=30) assert instance.get_runs_count() == 3 @@ -3296,8 +3301,9 @@ def test_asset_backfill_not_complete_until_retries_complete( backfill = instance.get_backfill(backfill_id) assert backfill assert backfill.asset_backfill_data - assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status() + assert backfill.status == BulkActionStatus.REQUESTED + assert backfill.asset_backfill_data.get_targeted_partitions_without_materialization_status().is_empty # manually mark the run as successful to show that the backfill will be marked as complete # since there are no in progress runs @@ -3373,7 +3379,7 @@ def test_asset_backfill_not_complete_if_automatic_retry_could_happen( backfill = instance.get_backfill(backfill_id) assert backfill assert backfill.asset_backfill_data - assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status() + assert backfill.asset_backfill_data.get_targeted_partitions_without_materialization_status().is_empty assert backfill.status == BulkActionStatus.REQUESTED # automatic retries wont get automatically run in test environment, so we run the function manually @@ -3443,7 +3449,7 @@ def test_asset_backfill_fails_if_retries_fail( backfill = instance.get_backfill(backfill_id) assert backfill assert backfill.asset_backfill_data - assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status() + assert backfill.asset_backfill_data.get_targeted_partitions_without_materialization_status().is_empty assert backfill.status == BulkActionStatus.REQUESTED runs = instance.get_run_records()