Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: Detect when a backfill has stopped producing runs but some of its target subset was never materialized and fail the backfill #27885

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ def num_partitions_and_non_partitioned_assets(self) -> int:
len(subset) for subset in self.partitions_subsets_by_asset_key.values()
)

@property
def is_empty(self) -> bool:
return len(self.asset_keys) == 0

def get_asset_subset(
self, asset_key: AssetKey, asset_graph: BaseAssetGraph
) -> SerializableEntitySubset[AssetKey]:
Expand Down
103 changes: 49 additions & 54 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
ASSET_PARTITION_RANGE_START_TAG,
BACKFILL_ID_TAG,
PARTITION_NAME_TAG,
WILL_RETRY_TAG,
)
from dagster._core.utils import make_new_run_id, toposort
from dagster._core.workspace.context import BaseWorkspaceRequestContext, IWorkspaceProcessContext
Expand All @@ -63,6 +62,7 @@

if TYPE_CHECKING:
from dagster._core.execution.backfill import PartitionBackfill
from dagster._core.storage.dagster_run import DagsterRun


def get_asset_backfill_run_chunk_size():
Expand Down Expand Up @@ -156,19 +156,14 @@ def with_latest_storage_id(self, latest_storage_id: Optional[int]) -> "AssetBack
def with_requested_runs_for_target_roots(self, requested_runs_for_target_roots: bool):
return self._replace(requested_runs_for_target_roots=requested_runs_for_target_roots)

def all_targeted_partitions_have_materialization_status(self) -> bool:
def get_targeted_partitions_without_materialization_status(self) -> AssetGraphSubset:
"""The asset backfill is complete when all runs to be requested have finished (success,
failure, or cancellation). Since the AssetBackfillData object stores materialization states
per asset partition, we can use the materialization states and whether any runs for the backfill are
not finished to determine if the backfill is complete. We want the daemon to continue to update
the backfill data until all runs have finished in order to display the final partition statuses in the UI.
"""
return (
(
self.materialized_subset | self.failed_and_downstream_subset
).num_partitions_and_non_partitioned_assets
== self.target_subset.num_partitions_and_non_partitioned_assets
)
return self.target_subset - (self.materialized_subset | self.failed_and_downstream_subset)

def all_requested_partitions_marked_as_materialized_or_failed(self) -> bool:
return (
Expand Down Expand Up @@ -912,58 +907,31 @@ def _check_validity_and_deserialize_asset_backfill_data(
return asset_backfill_data


def backfill_is_complete(
backfill_id: str,
backfill_data: AssetBackfillData,
instance: DagsterInstance,
def backfill_runs_are_complete(
logger: logging.Logger,
backfill_runs: Sequence["DagsterRun"],
):
"""A backfill is complete when:
1. all asset partitions in the target subset have a materialization state (successful, failed, downstream of a failed partition).
2. there are no in progress runs for the backfill.
3. there are no failed runs that will result in an automatic retry, but have not yet been retried.
"""Checks if a backfill has any more runs using the following conditions:
1. there are no in progress runs for the backfill.
2. there are no failed runs that will result in an automatic retry, but have not yet been retried.
Condition 1 ensures that for each asset partition we have attempted to materialize it or have determined we
cannot materialize it because of a failed dependency. Condition 2 ensures that no retries of failed runs are
in progress. Condition 3 guards against a race condition where a failed run could be automatically retried
but it was not added into the queue in time to be caught by condition 2.
cannot materialize it because of a failed dependency. Condition 1 ensures that no retries of failed runs are
in progress. Condition 2 guards against a race condition where a failed run could be automatically retried
but it was not added into the queue in time to be caught by condition 1.
Comment on lines 918 to +921
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring has a small error. The sentence "Condition 1 ensures that no retries of failed runs are in progress" should be "Condition 2 ensures that no retries of failed runs are in progress", since this matches the actual conditions described in the docstring above.

Spotted by Graphite Reviewer

Is this helpful? React 👍 or 👎 to let us know.

Since the AssetBackfillData object stores materialization states per asset partition, we want to ensure the
daemon continues to update the backfill data until all runs have finished in order to display the
final partition statuses in the UI.
"""
# Condition 1 - if any asset partitions in the target subset do not have a materialization state, the backfill
# is not complete
if not backfill_data.all_targeted_partitions_have_materialization_status():
logger.info(
"Not all targeted asset partitions have a materialization status. Backfill is still in progress."
)
return False
# Condition 2 - if there are in progress runs for the backfill, the backfill is not complete
if (
len(
instance.get_run_ids(
filters=RunsFilter(
statuses=NOT_FINISHED_STATUSES,
tags={BACKFILL_ID_TAG: backfill_id},
),
limit=1,
)
)
> 0
):
# Condition 1 - if there are in progress runs for the backfill, the backfill is not complete
if any(run.status in NOT_FINISHED_STATUSES for run in backfill_runs):
logger.info("Backfill has in progress runs. Backfill is still in progress.")
return False
# Condition 3 - if there are runs that will be retried, but have not yet been retried, the backfill is not complete

# Condition 2 - if there are runs that will be retried, but have not yet been retried, the backfill is not complete
runs_waiting_to_retry = [
run.run_id
for run in instance.get_runs(
filters=RunsFilter(
tags={BACKFILL_ID_TAG: backfill_id, WILL_RETRY_TAG: "true"},
statuses=[DagsterRunStatus.FAILURE],
)
)
if run.is_complete_and_waiting_to_retry
run.run_id for run in backfill_runs if run.is_complete_and_waiting_to_retry
]
if len(runs_waiting_to_retry) > 0:
num_runs_to_log = 20
Expand Down Expand Up @@ -1023,6 +991,13 @@ def execute_asset_backfill_iteration(
)

if backfill.status == BulkActionStatus.REQUESTED:
backfill_runs = instance.get_runs(
filters=RunsFilter(
tags={BACKFILL_ID_TAG: backfill.backfill_id},
statuses=NOT_FINISHED_STATUSES + [DagsterRunStatus.FAILURE],
)
)

if backfill.submitting_run_requests:
# interrupted in the middle of executing run requests - re-construct the in-progress iteration result
logger.warn(
Expand Down Expand Up @@ -1097,12 +1072,32 @@ def execute_asset_backfill_iteration(

updated_backfill_data = updated_backfill.get_asset_backfill_data(asset_graph)

if backfill_is_complete(
backfill_id=backfill.backfill_id,
backfill_data=updated_backfill_data,
instance=instance,
partitions_without_materialization_status = (
updated_backfill_data.get_targeted_partitions_without_materialization_status()
)

asset_backfill_data_unchanged = (
updated_backfill_data.materialized_subset
== previous_asset_backfill_data.materialized_subset
and updated_backfill_data.requested_subset
== previous_asset_backfill_data.requested_subset
and updated_backfill_data.failed_and_downstream_subset
== previous_asset_backfill_data.failed_and_downstream_subset
)

if (
partitions_without_materialization_status.is_empty or asset_backfill_data_unchanged
) and backfill_runs_are_complete(
logger=logger,
backfill_runs=backfill_runs,
):
if not partitions_without_materialization_status.is_empty:
raise DagsterBackfillFailedError(
"Backfill stopped producing runs, but the following partitions were never materialized or failed, "
"possibly because the runs that were supposed to materialize them succeeded without creating the expected materializations: "
f"{_asset_graph_subset_to_str(partitions_without_materialization_status, asset_graph)}."
)
Comment on lines +1088 to +1099
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's probably a less confusing way to structure this and separate out the two cases

case 1: backfill is not making progress, there are requested partitions not in a terminal state, and there are no more in progress or retrying runs - assume its stuck and fail the backfill

case 2: all requested partitions have been materialized or failed and there are no in progress runs - move to a successful or failed state depending on whether there are any failed partitions


if (
updated_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets
> 0
Expand Down Expand Up @@ -1492,7 +1487,7 @@ def execute_asset_backfill_iteration_inner(
)
logger.info(
f"Assets materialized since last tick:\n{_asset_graph_subset_to_str(materialized_since_last_tick, asset_graph)}"
if not materialized_since_last_tick.empty
if not materialized_since_last_tick.is_empty
else "No relevant assets materialized since last tick."
)

Expand Down Expand Up @@ -1538,7 +1533,7 @@ def execute_asset_backfill_iteration_inner(

logger.info(
f"Asset partitions to request:\n{_asset_graph_subset_to_str(asset_subset_to_request, asset_graph)}"
if not asset_subset_to_request.empty
if not asset_subset_to_request.is_empty
else "No asset partitions to request."
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
AssetBackfillData,
AssetBackfillIterationResult,
AssetBackfillStatus,
backfill_is_complete,
execute_asset_backfill_iteration_inner,
get_canceling_asset_backfill_iteration_data,
)
Expand Down Expand Up @@ -676,12 +675,7 @@ def run_backfill_to_completion(
fail_and_downstream_asset_graph_subset.iterate_asset_partitions()
)

while not backfill_is_complete(
backfill_id=backfill_id,
backfill_data=backfill_data,
instance=instance,
logger=logging.getLogger("fake_logger"),
):
while not backfill_data.get_targeted_partitions_without_materialization_status().is_empty:
iteration_count += 1

result1 = execute_asset_backfill_iteration_consume_generator(
Expand Down
17 changes: 14 additions & 3 deletions python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -2902,6 +2902,11 @@ def test_asset_backfill_logging(caplog, instance, workspace_context):
assert "DefaultPartitionsSubset(subset={'foo_b'})" in logs
assert "latest_storage_id=None" in logs
assert "AssetBackfillData" in logs
assert (
"""Asset partitions to request:
- asset_a: {foo_a}"""
in logs
)


def test_backfill_with_title_and_description(
Expand Down Expand Up @@ -3261,6 +3266,11 @@ def test_asset_backfill_not_complete_until_retries_complete(
assert backfill.status == BulkActionStatus.REQUESTED

list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))

backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.REQUESTED

assert instance.get_runs_count() == 3
wait_for_all_runs_to_start(instance, timeout=30)
assert instance.get_runs_count() == 3
Expand Down Expand Up @@ -3291,8 +3301,9 @@ def test_asset_backfill_not_complete_until_retries_complete(
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.asset_backfill_data
assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status()

assert backfill.status == BulkActionStatus.REQUESTED
assert backfill.asset_backfill_data.get_targeted_partitions_without_materialization_status().is_empty

# manually mark the run as successful to show that the backfill will be marked as complete
# since there are no in progress runs
Expand Down Expand Up @@ -3368,7 +3379,7 @@ def test_asset_backfill_not_complete_if_automatic_retry_could_happen(
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.asset_backfill_data
assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status()
assert backfill.asset_backfill_data.get_targeted_partitions_without_materialization_status().is_empty
assert backfill.status == BulkActionStatus.REQUESTED

# automatic retries wont get automatically run in test environment, so we run the function manually
Expand Down Expand Up @@ -3438,7 +3449,7 @@ def test_asset_backfill_fails_if_retries_fail(
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.asset_backfill_data
assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status()
assert backfill.asset_backfill_data.get_targeted_partitions_without_materialization_status().is_empty
assert backfill.status == BulkActionStatus.REQUESTED

runs = instance.get_run_records()
Expand Down