Fix: Environment & snapshot cleanup order #4228
@@ -273,20 +273,35 @@ def unpause_snapshots(
    def invalidate_environment(self, name: str) -> None:
        self.environment_state.invalidate_environment(name)

    def get_expired_snapshots(
        self, ignore_ttl: bool = False
    ) -> t.Tuple[t.Set[SnapshotId], t.List[SnapshotTableCleanupTask]]:
I think the public interface should be clean and only return what external callers care about, which is the list of cleanup targets.
The only caller that cares about snapshot IDs is the facade itself. I suggest having a common private method that returns a tuple, and then using it in both public methods instead.
Actually, we don't even need a private method, since we have self.snapshot_state.get_expired_snapshots(
Ah, I see now that you might actually use the snapshot IDs in the context.
I think you were right initially: expired_snapshot_ids is needed in facade.py but not in context.py. Do we still want to break get_expired_snapshots apart in two?
Not sure if we revisited this during our sync after we went over the nuances.
        return expired_snapshot_ids, cleanup_targets

    def delete_expired_snapshots(self, expired_snapshot_ids: t.Set[SnapshotId]) -> None:
Why do we even need this method? Why not call delete_snapshots directly?
Synced over this: delete_expired_snapshots() is used as a standalone function in test_state_sync.py, so we will leave it as is.
sqlmesh/core/context.py
Outdated
        self.state_sync.compact_intervals()

    def _cleanup_environments(self) -> None:
        expired_environments = self.state_sync.delete_expired_environments()
        cleanup_targets = self.state_sync.get_expired_environments()
I think this should just return environment names that are then passed into delete_environments.
Synced over this: we need the full Environment in cleanup_expired_views, so we will keep it as is for now.
        ]:
            self.engine_adapter.delete_from(
                self.environment_statements_table,
                where=exp.or_(*expired_environments),
            )

        return environments
    def delete_expired_environments(self) -> t.List[Environment]:
Why do we even need this method? Shouldn't we just have delete_environments that takes a list of environment names?
Synced over this too: similar to delete_expired_snapshots(), we use it as a standalone function in tests, so we aligned on leaving it as is and cleaning it up in the future.

        return expired_environments

    def delete_environments(self, environments: t.List[Environment], current_ts: int) -> None:
Maybe the delete_environments function isn't needed anymore: since we pass the current_ts, delete_expired_environments can contain this logic and we can use that instead in _cleanup_environments. The argument against this, I suppose, would be that we'd perform the query to fetch the environments twice.
@izeigerman I don't recall if we ended up aligning on this exact case, but to reiterate:
- If we want to query the expired environments only once, we need delete_environments(), which can process that pre-computed list.
- If we don't care about the perf cost, we can remove delete_environments() and keep only delete_expired_environments(), with the implication that get_expired_environments(...) will now be called twice, e.g.:
def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> None:
    expired_environments = self.state_sync.get_expired_environments(current_ts=current_ts)
    cleanup_expired_views(
        ...,
        environments=expired_environments,
    )
    self.state_sync.delete_expired_environments(current_ts=current_ts)  # also calls get_expired_environments()
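For comparison, the first option (query once, then delete the pre-computed list) could look roughly like the sketch below. It is not the sqlmesh implementation: InMemoryStateSync, the simplified Environment dataclass, and the dropped_views list standing in for cleanup_expired_views are all hypothetical, and delete_environments here omits the current_ts parameter shown in the diff.

```python
import typing as t
from dataclasses import dataclass


@dataclass(frozen=True)
class Environment:
    # Hypothetical, simplified stand-in for the real Environment model.
    name: str
    expiration_ts: int


@dataclass
class InMemoryStateSync:
    # Hypothetical in-memory state, used only to count how often we query.
    environments: t.List[Environment]
    query_count: int = 0

    def get_expired_environments(self, current_ts: int) -> t.List[Environment]:
        self.query_count += 1
        return [e for e in self.environments if e.expiration_ts <= current_ts]

    def delete_environments(self, environments: t.List[Environment]) -> None:
        # Deletes a pre-computed list, so no second query is needed.
        names = {e.name for e in environments}
        self.environments = [e for e in self.environments if e.name not in names]


def cleanup_environments(
    state: InMemoryStateSync, current_ts: int, dropped_views: t.List[str]
) -> None:
    expired = state.get_expired_environments(current_ts)
    # Stand-in for cleanup_expired_views: record which environments were cleaned.
    dropped_views.extend(e.name for e in expired)
    # Reuse the same list for the state deletion.
    state.delete_environments(expired)
```

With this shape the expired environments are fetched once per cleanup, at the cost of keeping a second deletion method on the state interface.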
Fixes #4055
This PR changes the order of deletions so that the actual views and physical tables are deleted before their state sync entries. This ensures that if the former fails, the state sync is not modified and a later sqlmesh janitor invocation can pick up the expired snapshots and environments again.
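The ordering described above can be illustrated with a minimal sketch. It assumes a hypothetical in-memory JanitorState and a drop_physical callback; neither is part of sqlmesh, and the real janitor works with snapshots, environments, and an engine adapter rather than plain names.

```python
import typing as t


class JanitorState:
    """Hypothetical in-memory stand-in for the state sync."""

    def __init__(self, expired: t.Iterable[str]) -> None:
        self.expired: t.Set[str] = set(expired)

    def get_expired(self) -> t.Set[str]:
        return set(self.expired)

    def delete(self, names: t.Set[str]) -> None:
        self.expired -= names


def run_janitor(state: JanitorState, drop_physical: t.Callable[[str], None]) -> None:
    targets = state.get_expired()
    # 1. Drop the physical objects first; an exception here aborts the run.
    for name in sorted(targets):
        drop_physical(name)
    # 2. Only after every drop succeeded, remove the state entries.
    state.delete(targets)
```

If drop_physical raises, the state still lists every expired entry, so a later run retries them. This only works if dropping an already-dropped object is harmless, which this sketch assumes.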