-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RFC: Detect when a backfill has stopped producing runs but some of its target subset was never materialized and fail the backfill #27885
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,7 +53,6 @@ | |
ASSET_PARTITION_RANGE_START_TAG, | ||
BACKFILL_ID_TAG, | ||
PARTITION_NAME_TAG, | ||
WILL_RETRY_TAG, | ||
) | ||
from dagster._core.utils import make_new_run_id, toposort | ||
from dagster._core.workspace.context import BaseWorkspaceRequestContext, IWorkspaceProcessContext | ||
|
@@ -63,6 +62,7 @@ | |
|
||
if TYPE_CHECKING: | ||
from dagster._core.execution.backfill import PartitionBackfill | ||
from dagster._core.storage.dagster_run import DagsterRun | ||
|
||
|
||
def get_asset_backfill_run_chunk_size(): | ||
|
@@ -156,19 +156,14 @@ def with_latest_storage_id(self, latest_storage_id: Optional[int]) -> "AssetBack | |
def with_requested_runs_for_target_roots(self, requested_runs_for_target_roots: bool): | ||
return self._replace(requested_runs_for_target_roots=requested_runs_for_target_roots) | ||
|
||
def all_targeted_partitions_have_materialization_status(self) -> bool: | ||
def get_targeted_partitions_without_materialization_status(self) -> AssetGraphSubset: | ||
"""The asset backfill is complete when all runs to be requested have finished (success, | ||
failure, or cancellation). Since the AssetBackfillData object stores materialization states | ||
per asset partition, we can use the materialization states and whether any runs for the backfill are | ||
not finished to determine if the backfill is complete. We want the daemon to continue to update | ||
the backfill data until all runs have finished in order to display the final partition statuses in the UI. | ||
""" | ||
return ( | ||
( | ||
self.materialized_subset | self.failed_and_downstream_subset | ||
).num_partitions_and_non_partitioned_assets | ||
== self.target_subset.num_partitions_and_non_partitioned_assets | ||
) | ||
return self.target_subset - (self.materialized_subset | self.failed_and_downstream_subset) | ||
|
||
def all_requested_partitions_marked_as_materialized_or_failed(self) -> bool: | ||
return ( | ||
|
@@ -912,58 +907,31 @@ def _check_validity_and_deserialize_asset_backfill_data( | |
return asset_backfill_data | ||
|
||
|
||
def backfill_is_complete( | ||
backfill_id: str, | ||
backfill_data: AssetBackfillData, | ||
instance: DagsterInstance, | ||
def backfill_runs_are_complete( | ||
logger: logging.Logger, | ||
backfill_runs: Sequence["DagsterRun"], | ||
): | ||
"""A backfill is complete when: | ||
1. all asset partitions in the target subset have a materialization state (successful, failed, downstream of a failed partition). | ||
2. there are no in progress runs for the backfill. | ||
3. there are no failed runs that will result in an automatic retry, but have not yet been retried. | ||
"""Checks if a backfill has any more runs using the following conditions: | ||
1. there are no in progress runs for the backfill. | ||
2. there are no failed runs that will result in an automatic retry, but have not yet been retried. | ||
Condition 1 ensures that for each asset partition we have attempted to materialize it or have determined we | ||
cannot materialize it because of a failed dependency. Condition 2 ensures that no retries of failed runs are | ||
in progress. Condition 3 guards against a race condition where a failed run could be automatically retried | ||
but it was not added into the queue in time to be caught by condition 2. | ||
cannot materialize it because of a failed dependency. Condition 1 ensures that no retries of failed runs are | ||
in progress. Condition 2 guards against a race condition where a failed run could be automatically retried | ||
but it was not added into the queue in time to be caught by condition 1. | ||
Since the AssetBackfillData object stores materialization states per asset partition, we want to ensure the | ||
daemon continues to update the backfill data until all runs have finished in order to display the | ||
final partition statuses in the UI. | ||
""" | ||
# Condition 1 - if any asset partitions in the target subset do not have a materialization state, the backfill | ||
# is not complete | ||
if not backfill_data.all_targeted_partitions_have_materialization_status(): | ||
logger.info( | ||
"Not all targeted asset partitions have a materialization status. Backfill is still in progress." | ||
) | ||
return False | ||
# Condition 2 - if there are in progress runs for the backfill, the backfill is not complete | ||
if ( | ||
len( | ||
instance.get_run_ids( | ||
filters=RunsFilter( | ||
statuses=NOT_FINISHED_STATUSES, | ||
tags={BACKFILL_ID_TAG: backfill_id}, | ||
), | ||
limit=1, | ||
) | ||
) | ||
> 0 | ||
): | ||
# Condition 1 - if there are in progress runs for the backfill, the backfill is not complete | ||
if any(run.status in NOT_FINISHED_STATUSES for run in backfill_runs): | ||
logger.info("Backfill has in progress runs. Backfill is still in progress.") | ||
return False | ||
# Condition 3 - if there are runs that will be retried, but have not yet been retried, the backfill is not complete | ||
|
||
# Condition 2 - if there are runs that will be retried, but have not yet been retried, the backfill is not complete | ||
runs_waiting_to_retry = [ | ||
run.run_id | ||
for run in instance.get_runs( | ||
filters=RunsFilter( | ||
tags={BACKFILL_ID_TAG: backfill_id, WILL_RETRY_TAG: "true"}, | ||
statuses=[DagsterRunStatus.FAILURE], | ||
) | ||
) | ||
if run.is_complete_and_waiting_to_retry | ||
run.run_id for run in backfill_runs if run.is_complete_and_waiting_to_retry | ||
] | ||
if len(runs_waiting_to_retry) > 0: | ||
num_runs_to_log = 20 | ||
|
@@ -1023,6 +991,13 @@ def execute_asset_backfill_iteration( | |
) | ||
|
||
if backfill.status == BulkActionStatus.REQUESTED: | ||
backfill_runs = instance.get_runs( | ||
filters=RunsFilter( | ||
tags={BACKFILL_ID_TAG: backfill.backfill_id}, | ||
statuses=NOT_FINISHED_STATUSES + [DagsterRunStatus.FAILURE], | ||
) | ||
) | ||
|
||
if backfill.submitting_run_requests: | ||
# interrupted in the middle of executing run requests - re-construct the in-progress iteration result | ||
logger.warn( | ||
|
@@ -1097,12 +1072,32 @@ def execute_asset_backfill_iteration( | |
|
||
updated_backfill_data = updated_backfill.get_asset_backfill_data(asset_graph) | ||
|
||
if backfill_is_complete( | ||
backfill_id=backfill.backfill_id, | ||
backfill_data=updated_backfill_data, | ||
instance=instance, | ||
partitions_without_materialization_status = ( | ||
updated_backfill_data.get_targeted_partitions_without_materialization_status() | ||
) | ||
|
||
asset_backfill_data_unchanged = ( | ||
updated_backfill_data.materialized_subset | ||
== previous_asset_backfill_data.materialized_subset | ||
and updated_backfill_data.requested_subset | ||
== previous_asset_backfill_data.requested_subset | ||
and updated_backfill_data.failed_and_downstream_subset | ||
== previous_asset_backfill_data.failed_and_downstream_subset | ||
) | ||
|
||
if ( | ||
partitions_without_materialization_status.is_empty or asset_backfill_data_unchanged | ||
) and backfill_runs_are_complete( | ||
logger=logger, | ||
backfill_runs=backfill_runs, | ||
): | ||
if not partitions_without_materialization_status.is_empty: | ||
raise DagsterBackfillFailedError( | ||
"Backfill stopped producing runs, but the following partitions were never materialized or failed, " | ||
"possibly because the runs that were supposed to materialize them succeeded without creating the expected materializations: " | ||
f"{_asset_graph_subset_to_str(partitions_without_materialization_status, asset_graph)}." | ||
) | ||
Comment on lines
+1088
to
+1099
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there's probably a less confusing way to structure this and separate out the two cases case 1: backfill is not making progress, there are requested partitions not in a terminal state, and there are no more in progress or retrying runs - assume its stuck and fail the backfill case 2: all requested partitions have been materialized or failed and there are no in progress runs - move to a successful or failed state depending on whether there are any failed partitions |
||
|
||
if ( | ||
updated_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets | ||
> 0 | ||
|
@@ -1492,7 +1487,7 @@ def execute_asset_backfill_iteration_inner( | |
) | ||
logger.info( | ||
f"Assets materialized since last tick:\n{_asset_graph_subset_to_str(materialized_since_last_tick, asset_graph)}" | ||
if not materialized_since_last_tick.empty | ||
if not materialized_since_last_tick.is_empty | ||
else "No relevant assets materialized since last tick." | ||
) | ||
|
||
|
@@ -1538,7 +1533,7 @@ def execute_asset_backfill_iteration_inner( | |
|
||
logger.info( | ||
f"Asset partitions to request:\n{_asset_graph_subset_to_str(asset_subset_to_request, asset_graph)}" | ||
if not asset_subset_to_request.empty | ||
if not asset_subset_to_request.is_empty | ||
else "No asset partitions to request." | ||
) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The docstring has a small error. The sentence "Condition 1 ensures that no retries of failed runs are in progress" should be "Condition 2 ensures that no retries of failed runs are in progress", since this matches the actual conditions described in the docstring above.
Spotted by Graphite Reviewer
Is this helpful? React 👍 or 👎 to let us know.